Apache Kafka Schema Registry: Avro Schemas, Compatibility, and Evolution
Architecture Diagram: Schema Registry Components
Architecture Diagram: Schema Evolution Compatibility Matrix
Architecture Diagram: Schema Serialization Flow
Architecture Diagram: Schema Registry Client Cache
Formal Definitions
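Schema Compatibility
Schema compatibility is a set of rules that govern how schemas can evolve over time without breaking existing producers or consumers. The Schema Registry enforces compatibility checks before registering a new schema version. The compatibility modes are:
- BACKWARD: consumers using the new schema can read data written with the previous schema.
- FORWARD: consumers still using the previous schema can read data written with the new schema.
- FULL: both BACKWARD and FORWARD.
- NONE: no checks.

BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, and FULL_TRANSITIVE apply the same checks against every earlier version, not just the latest one.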
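Wire Format
The wire format is the binary encoding that the Schema Registry serializers write into a Kafka message. It starts with a 5-byte header: 1 magic byte (0x0) followed by a 4-byte big-endian schema ID. The serialized payload comes next.

For Avro and JSON Schema, the payload follows the header immediately. The Protobuf serializer also writes a short list of message indexes after the schema ID, which identifies the message type within the .proto file.

Because the ID travels with every record, consumers can fetch the writer's schema from the registry for deserialization. This decouples producers from consumers and lets their schemas evolve independently.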
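The following minimal Python sketch makes the byte layout concrete. It uses only the standard `struct` module. The names `frame` and `unframe` are hypothetical helpers, not part of any client library. The payload is assumed to be already-serialized Avro or JSON Schema bytes; Protobuf's extra message indexes are deliberately not handled.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0
# ">" = big-endian with no padding; "B" = 1 unsigned byte, "I" = 4-byte unsigned int
HEADER = struct.Struct(">BI")


def frame(schema_id: int, payload: bytes) -> bytes:
    """Prefix an already-serialized payload with the magic byte and schema ID."""
    return HEADER.pack(MAGIC_BYTE, schema_id) + payload


def unframe(message: bytes) -> Tuple[int, bytes]:
    """Split a framed message into (schema_id, payload), rejecting unknown framing."""
    if len(message) < HEADER.size:
        raise ValueError("message is shorter than the 5-byte header")
    magic, schema_id = HEADER.unpack_from(message)
    if magic != MAGIC_BYTE:
        raise ValueError(f"unknown magic byte: {magic}")
    return schema_id, message[HEADER.size:]


framed = frame(42, b"\x0eorder-1")
print(framed.hex())     # 000000002a0e6f726465722d31
print(unframe(framed))  # (42, b'\x0eorder-1')
```

The ID sits at a fixed offset, so a consumer can pick the right cached schema without parsing the payload.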
Subject Name Strategy
A subject name strategy determines how schemas are organized and versioned in the registry:
- TopicNameStrategy (default) derives the subject from the topic name (<topic>-key or <topic>-value). All records in a topic therefore share one schema lineage.
- TopicRecordNameStrategy uses <topic>-<fully-qualified record name>, which allows multiple record types per topic.
- RecordNameStrategy uses only the fully-qualified record name, which enables schema sharing across topics.
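The three strategies differ only in how the subject string is derived. The sketch below mirrors the naming rules just described. `subject_name` is a hypothetical helper, not part of any client library, and the record name is assumed to be fully qualified.

```python
def subject_name(strategy: str, topic: str, record_fullname: str, is_key: bool = False) -> str:
    """Derive the registry subject for a record under a given naming strategy."""
    if strategy == "TopicNameStrategy":
        # One lineage per topic: "<topic>-key" or "<topic>-value"
        return f"{topic}-{'key' if is_key else 'value'}"
    if strategy == "TopicRecordNameStrategy":
        # One lineage per (topic, record type) pair
        return f"{topic}-{record_fullname}"
    if strategy == "RecordNameStrategy":
        # One lineage per record type, shared by every topic that carries it
        return record_fullname
    raise ValueError(f"unknown strategy: {strategy}")


for strategy in ("TopicNameStrategy", "TopicRecordNameStrategy", "RecordNameStrategy"):
    print(strategy, subject_name(strategy, "order-events", "com.example.Order"))
```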
Schema Reference
A schema reference is a dependency between schemas, in which one schema refers to another by name, subject, and version. References enable schema composition (e.g., an Order schema referencing a User schema).

The Schema Registry resolves references recursively when a schema is fetched, so clients can assemble the complete type. Only the top-level schema ID is written into the wire format; referenced schemas are never embedded in messages.
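On the REST API, references are declared next to the schema text when the schema is registered. The sketch below only builds the JSON body that would be POSTed to `/subjects/order-value/versions`; it does not send it. The subjects `user-value` and `order-value` and the version number are hypothetical. For Avro, a reference's `name` is the fully qualified name of the referenced type.

```python
import json

# Reference to version 1 of a User schema registered under a hypothetical subject
user_ref = {"name": "com.example.User", "subject": "user-value", "version": 1}

order_schema = {
    "type": "record",
    "name": "Order",
    "namespace": "com.example",
    "fields": [
        {"name": "orderId", "type": "string"},
        # Resolved through the reference instead of being inlined
        {"name": "buyer", "type": "com.example.User"},
    ],
}

# The schema itself travels as a JSON-encoded string inside the request body
body = json.dumps({
    "schemaType": "AVRO",
    "schema": json.dumps(order_schema),
    "references": [user_ref],
})
print(body)
```

Messages written with the resulting Order schema still carry only Order's ID. When a client fetches that ID, the registry returns the reference list alongside the schema, so the client can fetch User as well.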
Schema Linking
Schema linking is the process of synchronizing schemas across Schema Registry clusters. It enables multi-cluster deployments, migrations, and disaster recovery. In Confluent Platform and Confluent Cloud it is implemented with schema exporters.

Because the linked registries hold the same schemas, data replicated between clusters or environments (dev, staging, production) can still be deserialized on the destination side.
Key Formulas
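Wire Format Layout
$$M = b_{\text{magic}} \;\Vert\; \mathrm{id} \;\Vert\; p$$
Here, $\Vert$ denotes byte concatenation, and
- $b_{\text{magic}}$ = magic byte (1 byte), which identifies Schema Registry encoding
- $\mathrm{id}$ = 4-byte big-endian integer, the unique schema identifier from the registry
- $p$ = binary-encoded record payload using the identified schema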
Schema Storage Cost
$$S = N \cdot V \cdot \bar{s}$$
Here,
- $N$ = number of registered subjects
- $V$ = average schema versions per subject
- $\bar{s}$ = average schema size in bytes
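Multiplying the three quantities listed above gives a rough size for the schema store. With plausible numbers, it shows why the whole schema set fits easily in each registry node's memory. The fleet size below is a hypothetical assumption, not a measurement.

```python
def schema_storage_bytes(subjects: int, avg_versions: float, avg_schema_bytes: float) -> float:
    """Estimate total bytes of schema text across all subjects and versions."""
    return subjects * avg_versions * avg_schema_bytes


# Hypothetical fleet: 500 subjects, 8 versions each, 2 KiB per schema
total = schema_storage_bytes(500, 8, 2048)
print(f"{total:,.0f} bytes = {total / 1024**2:.2f} MiB")  # 8,192,000 bytes = 7.81 MiB
```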
Cache Hit Rate
$$H = \frac{h}{h + m}$$
Here,
- $h$ = number of schema lookups served from cache
- $m$ = number of schema lookups requiring an HTTP request
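The ratio can be checked with a quick calculation. The scenario is hypothetical: one million records whose values use three schema versions. Each version is fetched over HTTP exactly once and served from the cache afterwards.

```python
def cache_hit_rate(hits: int, misses: int) -> float:
    """hits / (hits + misses); defined as 0.0 before any lookup has happened."""
    total = hits + misses
    return hits / total if total else 0.0


lookups = 1_000_000
misses = 3                      # one HTTP fetch per distinct schema version
hits = lookups - misses
print(f"{cache_hit_rate(hits, misses):.4%}")  # 99.9997%
```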
Serialization Overhead
$$W = P + \Delta, \qquad \text{relative overhead} = \frac{\Delta}{P}$$
Here,
- $W$ = total wire format size (bytes)
- $P$ = raw payload size (bytes)
- $\Delta$ = overhead from magic byte + schema ID (typically 5 bytes)
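The sketch below assumes the 5-byte header used for Avro and JSON Schema. It computes the total size on the wire and the header's share relative to the payload for a few payload sizes.

```python
HEADER_BYTES = 5  # magic byte + 4-byte schema ID (Avro / JSON Schema framing)


def wire_size(payload_bytes: int) -> int:
    """Total bytes on the wire: payload plus fixed header."""
    return payload_bytes + HEADER_BYTES


def overhead_ratio(payload_bytes: int) -> float:
    """Header bytes relative to the payload size."""
    return HEADER_BYTES / payload_bytes


for payload in (100, 1_024, 10_240):
    print(payload, wire_size(payload), f"{overhead_ratio(payload):.2%}")
# 100 105 5.00%
# 1024 1029 0.49%
# 10240 10245 0.05%
```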
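Schema Registry stores all schemas in the _schemas topic, a compacted single-partition topic that Kafka replicates for HA.

Only the leader handles writes (registration). The other nodes forward write requests to the leader, and all nodes serve reads (retrieval) from an in-memory copy of the topic.

The client caches schemas locally (ID->schema and schema->ID mappings) to minimize round trips. Each schema is fetched over HTTP only the first time it is seen, so cache hit rates above 99.9% are typical in steady state.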
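The two client-side mappings can be sketched as a small Python class. `SchemaCache` is hypothetical. Its two callables stand in for the registry's HTTP endpoints (`GET /schemas/ids/{id}` and `POST /subjects/{subject}`), which the real Confluent clients call and cache internally.

```python
from typing import Callable, Dict, Tuple


class SchemaCache:
    """Hypothetical two-way client cache: ID -> schema and (subject, schema) -> ID."""

    def __init__(self, fetch_by_id: Callable[[int], str],
                 lookup_id: Callable[[str, str], int]) -> None:
        self._fetch_by_id = fetch_by_id  # stands in for GET /schemas/ids/{id}
        self._lookup_id = lookup_id      # stands in for POST /subjects/{subject}
        self._by_id: Dict[int, str] = {}
        self._by_schema: Dict[Tuple[str, str], int] = {}
        self.hits = 0
        self.misses = 0

    def schema_for_id(self, schema_id: int) -> str:
        """Deserializer path: resolve the ID read from the wire format."""
        if schema_id in self._by_id:
            self.hits += 1
        else:
            self.misses += 1
            self._by_id[schema_id] = self._fetch_by_id(schema_id)
        return self._by_id[schema_id]

    def id_for_schema(self, subject: str, schema: str) -> int:
        """Serializer path: find the ID to embed for a schema under a subject."""
        key = (subject, schema)
        if key in self._by_schema:
            self.hits += 1
        else:
            self.misses += 1
            schema_id = self._lookup_id(subject, schema)
            self._by_schema[key] = schema_id
            self._by_id.setdefault(schema_id, schema)  # prime the reverse mapping
        return self._by_schema[key]


# Fake registry holding one schema; every lookup after the first is a cache hit
registry = {1: '{"type": "string"}'}
cache = SchemaCache(registry.__getitem__, lambda subject, schema: 1)
for _ in range(1_000):
    cache.schema_for_id(1)
print(cache.hits, cache.misses)  # 999 1
```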
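For production, set auto.register.schemas=false on serializers so that schemas are registered only through a controlled process (CI or an administrator). Pair it with use.latest.version=true if the serializer should write with the latest registered version of the subject, instead of looking up the ID of the record's own schema. Before deploying a schema change, test it against the registry's compatibility endpoint (POST /compatibility/subjects/{subject}/versions/latest).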
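Avro encodes data as binary without field names, tags, or separators, because the reader relies on the writer's schema. Integers and lengths use zig-zag varints. This makes Avro one of the most compact serialization formats.

JSON Schema messages are plain JSON text: human-readable but larger.

Protobuf also uses varints, but it prefixes each field with a tag (field number and wire type). It is therefore slightly larger than Avro, but a reader can skip unknown fields without the writer's schema.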
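To see where Avro's compactness comes from, here is a hand-written encoder following the binary rules of the Avro specification:
- zig-zag varint lengths
- UTF-8 strings
- 8-byte little-endian doubles
- fields in schema order with no names

It is for illustration only; in practice use an Avro library. The record mirrors the four-field Order schema used in the code examples further down.

```python
import json
import struct


def zigzag_varint(n: int) -> bytes:
    """Avro int/long encoding: zig-zag map to unsigned, then 7-bit groups, low first."""
    z = (n << 1) ^ (n >> 63)  # 0, -1, 1, -2, ... -> 0, 1, 2, 3, ...
    out = bytearray()
    while True:
        low7 = z & 0x7F
        z >>= 7
        if z:
            out.append(low7 | 0x80)  # high bit set: more bytes follow
        else:
            out.append(low7)
            return bytes(out)


def avro_string(s: str) -> bytes:
    data = s.encode("utf-8")
    return zigzag_varint(len(data)) + data  # length prefix, then UTF-8 bytes


def avro_double(x: float) -> bytes:
    return struct.pack("<d", x)  # 8-byte IEEE 754, little-endian


def encode_order(order: dict) -> bytes:
    # Fields concatenated in schema order; no names, tags, or separators
    return (avro_string(order["orderId"]) + avro_string(order["userId"])
            + avro_double(order["amount"]) + avro_string(order["status"]))


order = {"orderId": "order-1", "userId": "user-1", "amount": 42.5, "status": "PENDING"}
avro_bytes = encode_order(order)
json_bytes = json.dumps(order, separators=(",", ":")).encode("utf-8")
print(len(avro_bytes), len(json_bytes))  # 31 72
```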
Schema Evolution Invariants
The following invariants must hold for safe schema evolution:
- Adding a field with a default value is compatible in both directions. A new reader fills in the default when reading old data, and an old reader skips the unknown field when reading new data.
- Removing a field requires that the field had a default value. Otherwise old consumers, whose schema still contains the field, fail when reading new data (forward incompatibility).
- Type changes are compatible only where Avro defines a promotion from the writer's type to the reader's type. For example, widening int->long is backward compatible, because a new reader promotes old int values. It is not forward compatible. Unrelated changes such as string->int are incompatible in both directions.
- BACKWARD compatibility requires: for every field in the new schema that did not exist in the old schema, the new field must have a default value.
- FORWARD compatibility requires: for every field in the old schema that is removed in the new schema, the old field must have had a default value.
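The BACKWARD and FORWARD rules listed above can be checked mechanically: both reduce to a single "can this reader decode that writer's data?" test, applied in opposite directions. This is a deliberately simplified model with these assumptions:
- flat records of primitive types only
- no unions, aliases, or type promotion

`Field`, `can_read`, and `is_compatible` are hypothetical names; the registry runs Avro's full schema-resolution check.

```python
from typing import Dict, NamedTuple


class Field(NamedTuple):
    avro_type: str             # primitive type name, e.g. "string"
    has_default: bool = False


Schema = Dict[str, Field]  # field name -> definition (flat record only)


def can_read(reader: Schema, writer: Schema) -> bool:
    """True if data written with `writer` can be decoded with `reader`."""
    for name, field in reader.items():
        if name not in writer:
            if not field.has_default:
                return False  # reader expects a value the writer never wrote
        elif writer[name].avro_type != field.avro_type:
            return False      # simplification: no type promotion in this sketch
    return True               # writer-only fields are skipped by the reader


def is_compatible(mode: str, new: Schema, old: Schema) -> bool:
    backward = can_read(reader=new, writer=old)  # new consumer, old data
    forward = can_read(reader=old, writer=new)   # old consumer, new data
    if mode == "BACKWARD":
        return backward
    if mode == "FORWARD":
        return forward
    if mode == "FULL":
        return backward and forward
    if mode == "NONE":
        return True
    raise ValueError(f"unsupported mode: {mode}")


v1 = {"orderId": Field("string"), "amount": Field("double")}
v2 = {**v1, "priority": Field("string", has_default=True)}  # add with default
v3 = {**v1, "priority": Field("string")}                     # add without default
v4 = {"orderId": Field("string")}                            # drop "amount" (no default)
print(is_compatible("FULL", v2, v1))      # True
print(is_compatible("BACKWARD", v3, v1))  # False
print(is_compatible("BACKWARD", v4, v1))  # True
print(is_compatible("FORWARD", v4, v1))   # False
```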
Wire Format Efficiency
For Avro and JSON Schema, the wire format overhead is a constant 5 bytes (1 magic byte + 4-byte schema ID), regardless of schema complexity. Protobuf adds at least one more byte for its message indexes.

For a typical Avro payload of 100 bytes, the overhead is 5%. For payloads above 1 KB, it is under 0.5%. The schema ID enables O(1) schema lookup in the client cache, so the overhead is negligible for most workloads.
Detailed Explanation
Schema Registration and Storage
Confluent Schema Registry is a centralized schema management service for Kafka that enables schema evolution while maintaining data quality and compatibility.

A schema can be registered by a producer's serializer or by an administrator. The registry assigns it a unique integer ID, or returns the existing ID if an identical schema was registered before. It stores the schema definition (Avro, JSON Schema, or Protobuf) under the subject name. The subject name can follow different strategies: TopicNameStrategy (default), TopicRecordNameStrategy, or RecordNameStrategy.

The registry checks compatibility before registration. It compares the new schema with the latest registered version (or with all versions under a *_TRANSITIVE mode), according to the subject's configured compatibility mode. Accepted schemas are written to the _schemas topic, from which every registry node builds its in-memory store.
Compatibility Modes
The registry supports several compatibility modes to control how schemas can evolve:
- BACKWARD (the default) ensures that consumers using the new schema can read data written with the previous schema, so consumers are upgraded first.
- FORWARD ensures that consumers still using the previous schema can read data written with the new schema, so producers are upgraded first.
- FULL ensures both backward and forward compatibility, so producers and consumers can be upgraded in any order.
- NONE disables compatibility checking (not recommended for production).
- BACKWARD_TRANSITIVE ensures backward compatibility with all previously registered versions, not just the latest.
- FORWARD_TRANSITIVE ensures forward compatibility with all previously registered versions.
- FULL_TRANSITIVE ensures full compatibility with all previously registered versions.
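A mode and its transitive variant differ only in which earlier versions are checked. This toy sketch models a schema as a map from field name to "has a default" and implements only the BACKWARD pair rule; all names are hypothetical. In the example, v3 can read v2 data but not v1 data. Plain BACKWARD therefore accepts it, while BACKWARD_TRANSITIVE rejects it.

```python
from typing import Callable, Dict, List, Sequence

Schema = Dict[str, bool]  # toy model: field name -> has a default value


def backward_ok(new: Schema, old: Schema) -> bool:
    """BACKWARD pair check: every field the new reader expects exists or has a default."""
    return all(name in old or has_default for name, has_default in new.items())


def versions_to_check(mode: str, history: Sequence[Schema]) -> List[Schema]:
    """Non-transitive modes compare against the latest version, *_TRANSITIVE against all."""
    if mode == "NONE" or not history:
        return []
    if mode.endswith("_TRANSITIVE"):
        return list(history)
    return [history[-1]]


def is_allowed(mode: str, new: Schema, history: Sequence[Schema],
               pair_ok: Callable[[Schema, Schema], bool]) -> bool:
    return all(pair_ok(new, old) for old in versions_to_check(mode, history))


v1 = {"x": False}
v2 = {"x": False, "y": True}   # adds y with a default
v3 = {"x": False, "y": False}  # removes y's default
history = [v1, v2]
print(is_allowed("BACKWARD", v3, history, backward_ok))             # True
print(is_allowed("BACKWARD_TRANSITIVE", v3, history, backward_ok))  # False
```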
Wire Format and Schema IDs
Messages written by the Schema Registry serializers start with a 5-byte header: the first byte is a magic byte (0x0), followed by a 4-byte schema ID (big-endian integer). This design allows consumers to deserialize messages by looking up the schema from the registry using the embedded ID. The registry client caches schemas locally, reducing latency and network overhead.
Schema Evolution Strategies
There are several patterns for evolving schemas in production:
- Additive Evolution adds new fields with default values (compatible in both directions).
- Field Deprecation keeps a no-longer-used field in the schema, with a default value, instead of deleting it, so consumers that still read it keep working.
- Type Evolution changes field types. Only Avro's type promotions (such as int->long) are safe, and most of them work in one direction only.
- Schema Branching uses TopicRecordNameStrategy to allow multiple schemas per topic.
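Avro's schema-resolution rules let a reader promote a few primitive types written by an older or newer writer. This sketch encodes the promotions listed in the Avro specification and classifies a type change by direction. It ignores unions, logical types, and complex types, and `type_change` is a hypothetical helper.

```python
# Writer type -> reader types it may be promoted to, per the Avro specification's
# schema-resolution rules (primitive types only)
PROMOTIONS = {
    "int": {"long", "float", "double"},
    "long": {"float", "double"},
    "float": {"double"},
    "string": {"bytes"},
    "bytes": {"string"},
}


def promotable(writer: str, reader: str) -> bool:
    return writer == reader or reader in PROMOTIONS.get(writer, set())


def type_change(old: str, new: str) -> str:
    """Classify changing a field's type from `old` to `new`."""
    backward = promotable(writer=old, reader=new)  # new consumers read old data
    forward = promotable(writer=new, reader=old)   # old consumers read new data
    if backward and forward:
        return "FULL"
    if backward:
        return "BACKWARD only"
    if forward:
        return "FORWARD only"
    return "incompatible"


for old, new in [("int", "long"), ("long", "int"), ("string", "bytes"), ("string", "int")]:
    print(f"{old} -> {new}: {type_change(old, new)}")
# int -> long: BACKWARD only
# long -> int: FORWARD only
# string -> bytes: FULL
# string -> int: incompatible
```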
Subject Name Strategies
The subject name determines how schemas are organized in the registry:
- TopicNameStrategy uses the topic name as the subject (default), meaning all records in a topic share a schema.
- TopicRecordNameStrategy combines the topic name with the record name, allowing multiple record types per topic.
- RecordNameStrategy uses only the record name, enabling schema sharing across topics.
Schema Registry High Availability
The registry runs as a cluster with a single leader, elected through Kafka's group membership protocol (older releases could use ZooKeeper instead).

Only the leader handles writes (schema registration): followers forward registration requests to it, and all nodes serve reads (schema retrieval). Every node consumes the _schemas topic to stay synchronized. If the leader fails, another leader-eligible node (leader.eligibility=true) takes over automatically.

Read capacity scales horizontally by adding nodes, but write throughput is limited by the single-leader design.
Serialization Format Comparison
| Format | Binary Size | Schema Evolution | Human Readable | Performance |
|---|---|---|---|---|
| Avro | Smallest | Excellent (schema resolution) | No | Fast encode/decode |
| JSON Schema | Largest | Good (default values) | Yes | Moderate |
| Protobuf | Medium | Good (field numbers) | Partial | Fast encode/decode |
Key Concepts Table
| Component | Description | Key Configurations | Use Cases |
|---|---|---|---|
| Subject | Schema namespace (topic or record name) | key.subject.name.strategy, value.subject.name.strategy | Schema organization |
| Schema ID | Unique integer identifier for schema | Auto-assigned | Wire format, caching |
| Compatibility Mode | Rules for schema evolution | schema.compatibility.level (registry default), /config/{subject} (per subject) | Data quality assurance |
| Wire Format | 5-byte header (magic + schema ID) | Automatic | Message serialization |
| Schema Cache | Client-side cache of schemas and IDs | max.schemas.per.subject | Performance optimization |
| Subject Name Strategy | How subjects are named | key.subject.name.strategy, value.subject.name.strategy | Schema organization |
| Compatibility Checker | Validates schema evolution | schema.compatibility.level | Prevent breaking changes |
| Schema Reference | Dependencies between schemas | references | Complex schema hierarchies |
| Mode | READWRITE, READONLY, or IMPORT | /mode API | Schema management control |
| Schema Linking | Sync schemas across clusters | Schema exporters (/exporters API) | Multi-cluster, migration |
Code Examples
Schema Registration with Avro
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import java.util.HashMap;
import java.util.Map;
public class SchemaRegistrationExample {
    public static void main(String[] args) throws Exception {
        // Define Avro schema using the builder API
        Schema schema = SchemaBuilder.record("Order")
            .namespace("com.example.orders")
            .fields()
            // orderId: required string, no default
            .name("orderId").type().stringType().noDefault()
            // userId: required string, no default
            .name("userId").type().stringType().noDefault()
            // amount: required double, no default
            .name("amount").type().doubleType().noDefault()
            // currency: optional with default "USD"
            .name("currency").type().stringType().stringDefault("USD")
            // status: optional with default "PENDING"
            .name("status").type().stringType().stringDefault("PENDING")
            // createdAt: required long (epoch millis)
            .name("createdAt").type().longType().noDefault()
            // metadata: union of null and map<string>, default null
            .name("metadata").type().unionOf().nullType().and()
                .map().values().stringType().endUnion().nullDefault()
            .endRecord();
        // Create a caching Schema Registry client (RestService is the low-level
        // HTTP wrapper and does not implement SchemaRegistryClient)
        Map<String, Object> config = new HashMap<>();
        config.put("basic.auth.credentials.source", "USER_INFO");
        // Authentication source: USER_INFO, SASL_INHERIT, URL
        config.put("basic.auth.user.info", "admin:secret");
        // Username:password for basic auth
        SchemaRegistryClient client = new CachedSchemaRegistryClient(
            "http://localhost:8081", // comma-separated for HA: "http://sr-1:8081,http://sr-2:8081"
            100,                     // max schemas cached per subject
            config);
        String subject = "order-events-value";
        // Subject naming: <topic>-key or <topic>-value (TopicNameStrategy)
        AvroSchema avroSchema = new AvroSchema(schema);
        // Test compatibility BEFORE registering (a new subject has nothing to check against)
        if (client.getAllSubjects().contains(subject)
                && !client.testCompatibility(subject, avroSchema)) {
            throw new IllegalStateException("Schema is not compatible with subject " + subject);
        }
        int schemaId = client.register(subject, avroSchema);
        // Returns unique integer ID (used in wire format)
        System.out.println("Registered schema with ID: " + schemaId);
        // Get schema metadata; version=1 returns the first version
        SchemaMetadata metadata = client.getSchemaMetadata(subject, 1);
        System.out.println("Schema version: " + metadata.getVersion());
        System.out.println("Schema ID: " + metadata.getId());
    }
}
Avro Producer with Schema Registry
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class AvroProducerExample {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Key serializer: String (not Avro)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
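// Value serializer: KafkaAvroSerializer embeds the schema ID in the wire format.
// With the default auto.register.schemas=true it also registers the record's schema;
// in production, set it to false and register schemas through a controlled process.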
// Schema Registry configuration
props.put("schema.registry.url", "http://localhost:8081");
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", "admin:secret");
// Avro-specific configurations
props.put("avro.use.logical.type.converters", true);
// Convert Avro logical types (date, timestamp) to Java types
KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);
// Define schema inline
Schema schema = new Schema.Parser().parse(
"{\"type\":\"record\",\"name\":\"Order\"," +
"\"namespace\":\"com.example\"," +
"\"fields\":[" +
"{\"name\":\"orderId\",\"type\":\"string\"}," +
"{\"name\":\"userId\",\"type\":\"string\"}," +
"{\"name\":\"amount\",\"type\":\"double\"}," +
"{\"name\":\"status\",\"type\":\"string\",\"default\":\"PENDING\"}" +
"]}");
for (int i = 0; i < 100; i++) {
// Create Avro record
GenericRecord order = new GenericData.Record(schema);
order.put("orderId", "order-" + i);
order.put("userId", "user-" + (i % 10));
order.put("amount", Math.random() * 1000);
order.put("status", "PENDING");
ProducerRecord<String, GenericRecord> record =
new ProducerRecord<>("order-events", "order-" + i, order);
// Add headers (independent of Avro schema)
record.headers().add("schema.version", "1".getBytes());
record.headers().add("content-type", "avro".getBytes());
producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.err.println("Error: " + exception.getMessage());
} else {
System.out.printf("Sent to partition %d, offset %d%n",
metadata.partition(), metadata.offset());
}
});
}
producer.flush();
producer.close();
}
}
Avro Consumer with Schema Evolution
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
public class AvroConsumerExample {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
// Schema Registry configuration
props.put("schema.registry.url", "http://localhost:8081");
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", "admin:secret");
// Avro deserializer configuration
props.put("specific.avro.reader", false);
// Use GenericRecord for flexibility across schema versions
props.put("avro.use.logical.type.converters", true);
// Convert Avro logical types to Java types
// auto.register.schemas and use.latest.version are serializer (producer-side)
// settings; deserializers never register schemas. With specific.avro.reader=false,
// each GenericRecord is decoded with the writer's schema looked up by its embedded ID.
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("order-events"));
try {
while (true) {
ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, GenericRecord> record : records) {
GenericRecord order = record.value();
// Access fields safely: each GenericRecord carries its writer's schema, and
// depending on the Avro version, get() on a field that schema lacks returns
// null or throws AvroRuntimeException, so check the schema first
String orderId = order.get("orderId").toString();
String userId = order.get("userId").toString();
double amount = (double) order.get("amount");
// Field with a default that older writer schemas may lack
String status = order.getSchema().getField("status") != null
? order.get("status").toString() : "PENDING";
// Optional (nullable) field
Object meta = order.getSchema().getField("metadata") != null
? order.get("metadata") : null;
String metadata = meta != null ? meta.toString() : "";
// Field added in v2; absent from records written with v1
String priority = order.getSchema().getField("priority") != null
? order.get("priority").toString() : "NORMAL";
System.out.printf("Order: %s, User: %s, Amount: %.2f, " +
"Status: %s, Priority: %s%n",
orderId, userId, amount, status, priority);
}
consumer.commitSync();
}
} finally {
consumer.close();
}
}
}
Schema Evolution Script
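#!/bin/bash
# Schema Registry management script (requires curl and jq)
# Exit on errors, unset variables, and failed pipeline stages
set -euo pipefail
SCHEMA_REGISTRY_URL="http://localhost:8081"
AUTH="admin:secret"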
echo "=== SCHEMA REGISTRY MANAGEMENT ==="
echo "Timestamp: $(date)"
echo ""
# List all subjects
echo "--- All Subjects ---"
curl -s -u $AUTH "$SCHEMA_REGISTRY_URL/subjects" | jq .
echo ""
echo "--- Get Subject Schema ---"
SUBJECT="order-events-value"
SCHEMA=$(curl -s -u $AUTH "$SCHEMA_REGISTRY_URL/subjects/$SUBJECT/versions/latest")
echo "Subject: $SUBJECT"
echo "$SCHEMA" | jq .
echo ""
echo "--- Compatibility Check ---"
NEW_SCHEMA='{
"type": "record",
"name": "Order",
"namespace": "com.example",
"fields": [
{"name": "orderId", "type": "string"},
{"name": "userId", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "status", "type": "string", "default": "PENDING"},
{"name": "priority", "type": "string", "default": "NORMAL"}
]
}'
# Adding "priority" field with default = backward compatible
COMPATIBILITY=$(curl -s -u $AUTH \
-X POST \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d "{\"schema\": $(echo $NEW_SCHEMA | jq -Rs .)}" \
"$SCHEMA_REGISTRY_URL/compatibility/subjects/$SUBJECT/versions/latest")
echo "$COMPATIBILITY" | jq .
echo ""
echo "--- Register New Schema ---"
if [ $(echo $COMPATIBILITY | jq -r '.is_compatible') == "true" ]; then
REGISTRATION=$(curl -s -u $AUTH \
-X POST \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d "{\"schema\": $(echo $NEW_SCHEMA | jq -Rs .)}" \
"$SCHEMA_REGISTRY_URL/subjects/$SUBJECT/versions")
echo "$REGISTRATION" | jq .
else
echo "Schema is NOT compatible - cannot register"
fi
echo ""
echo "--- Schema Compatibility Modes ---"
curl -s -u $AUTH "$SCHEMA_REGISTRY_URL/config" | jq .
echo ""
echo "--- Set Compatibility Mode ---"
curl -u $AUTH \
-X PUT \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "BACKWARD"}' \
"$SCHEMA_REGISTRY_URL/config/$SUBJECT"
echo ""
echo "--- List Schema Versions ---"
curl -s -u $AUTH "$SCHEMA_REGISTRY_URL/subjects/$SUBJECT/versions" | jq .
echo ""
echo "--- Get Specific Version ---"
VERSION=1
curl -s -u $AUTH "$SCHEMA_REGISTRY_URL/subjects/$SUBJECT/versions/$VERSION" | jq .
echo ""
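echo "--- Delete Schema Version (soft delete) ---"
# Soft delete hides version 1 from the subject's version list;
# the schema itself remains retrievable by its ID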
curl -s -u $AUTH \
-X DELETE \
"$SCHEMA_REGISTRY_URL/subjects/$SUBJECT/versions/1"
echo ""
echo "--- Get Schema by ID ---"
SCHEMA_ID=1
curl -s -u $AUTH "$SCHEMA_REGISTRY_URL/schemas/ids/$SCHEMA_ID" | jq .
echo ""
echo "--- Get Schema Subjects by ID ---"
curl -s -u $AUTH "$SCHEMA_REGISTRY_URL/schemas/ids/$SCHEMA_ID/subjects" | jq .
Kafka Streams with Schema Registry
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.WindowStore;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class StreamsSchemaRegistryExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "schema-registry-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Schema Registry configuration (also passed to the default serdes)
        props.put("schema.registry.url", "http://localhost:8081");
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", "admin:secret");
        // Default serdes: String keys, GenericAvroSerde values
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
        StreamsBuilder builder = new StreamsBuilder();
        // Explicitly configured Avro serde for source and sink
        Map<String, String> serdeConfig = new HashMap<>();
        serdeConfig.put("schema.registry.url", "http://localhost:8081");
        serdeConfig.put("basic.auth.credentials.source", "USER_INFO");
        serdeConfig.put("basic.auth.user.info", "admin:secret");
        GenericAvroSerde avroSerde = new GenericAvroSerde();
        avroSerde.configure(serdeConfig, false);
        // false = value serde (not key serde)
        // Source stream with Avro deserialization
        KStream<String, GenericRecord> orders = builder.stream("order-events",
            Consumed.with(Serdes.String(), avroSerde));
        // Output record type: a new schema (not an evolution of Order), built once
        // instead of once per record
        Schema highValueSchema = SchemaBuilder.record("HighValueOrder")
            .namespace("com.example")
            .fields()
            .requiredString("orderId")
            .requiredString("userId")
            .requiredDouble("amount")
            .name("verified").type().booleanType().booleanDefault(false)
            .endRecord();
        // Transform and filter
        KStream<String, GenericRecord> highValueOrders = orders
            .filter((key, order) -> (double) order.get("amount") > 1000)
            .mapValues(order -> {
                GenericRecord highValueOrder = new GenericData.Record(highValueSchema);
                highValueOrder.put("orderId", order.get("orderId"));
                highValueOrder.put("userId", order.get("userId"));
                highValueOrder.put("amount", order.get("amount"));
                highValueOrder.put("verified", false);
                return highValueOrder;
            });
        // Sink with Avro serialization
        highValueOrders.to("high-value-orders",
            Produced.with(Serdes.String(), avroSerde));
        // Aggregation with windowing (uses the default serdes from props)
        KTable<Windowed<String>, Long> orderCounts = orders
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("order-count-store")
                .withValueSerde(Serdes.Long()));
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
Python: Avro Producer with Schema Registry
# Uses the confluent_kafka.schema_registry API; the legacy confluent_kafka.avro
# AvroProducer rejects auto.register.schemas / use.latest.version and needs a key schema
from confluent_kafka import Consumer, Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer, AvroSerializer
from confluent_kafka.serialization import MessageField, SerializationContext

# Schema Registry client
schema_registry = SchemaRegistryClient({
    'url': 'http://localhost:8081',
    'basic.auth.user.info': 'admin:secret',
})

# Define Avro schema inline (or read it from an order.avsc file)
order_schema = '''
{
  "type": "record",
  "name": "Order",
  "namespace": "com.example",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "userId", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "status", "type": "string", "default": "PENDING"}
  ]
}
'''

# Avro serializer: never auto-register; write with the latest version registered
# under order-events-value (the subject must already exist)
avro_serializer = AvroSerializer(
    schema_registry_client=schema_registry,
    schema_str=order_schema,
    conf={'auto.register.schemas': False, 'use.latest.version': True},
)
producer = Producer({'bootstrap.servers': 'localhost:9092'})

# Send Avro records (string keys are sent as UTF-8 bytes)
for i in range(100):
    order = {
        'orderId': f'order-{i}',
        'userId': f'user-{i % 10}',
        'amount': 1000.0,
        'status': 'PENDING',
    }
    producer.produce(
        topic='order-events',
        key=f'order-{i}',
        value=avro_serializer(order, SerializationContext('order-events', MessageField.VALUE)),
    )
producer.flush()

# Avro Consumer: the deserializer reads the schema ID from the wire format
avro_deserializer = AvroDeserializer(schema_registry_client=schema_registry)
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'avro-consumer-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
})
consumer.subscribe(['order-events'])
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Error: {msg.error()}")
            continue
        # Decoded into a dict using the writer's schema fetched from the registry
        order = avro_deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))
        print(f"Order: {order['orderId']}, Amount: {order['amount']}")
        consumer.commit(message=msg, asynchronous=False)
finally:
    consumer.close()
Performance Metrics
| Operation | Latency (p99) | Throughput | Cache Hit Rate | Memory Usage |
|---|---|---|---|---|
| Schema Registration | 100ms | 100/sec | N/A | 100MB |
| Schema Retrieval (cache hit) | 1ms | 1M/sec | 99.9% | 100MB |
| Schema Retrieval (cache miss) | 50ms | 10K/sec | 0.1% | 100MB |
| Compatibility Check | 10ms | 10K/sec | N/A | 100MB |
| Avro Serialization | 0.1ms | 10M/sec | N/A | 64MB |
| Avro Deserialization | 0.1ms | 10M/sec | N/A | 64MB |
| Registry Cluster (3 nodes) | 5ms | 300K/sec | 99.9% | 300MB |
| Registry Cluster (5 nodes) | 5ms | 500K/sec | 99.9% | 500MB |
| Serialization Format | Payload Size (sample record) | Encode Time | Decode Time | Schema Evolution |
|---|---|---|---|---|
| Avro | 50 bytes | 0.05ms | 0.05ms | Excellent |
| JSON Schema | 500 bytes | 0.5ms | 0.5ms | Good |
| Protobuf | 80 bytes | 0.08ms | 0.08ms | Good |
Best Practices
- Compatibility Mode Selection: Use BACKWARD compatibility (the default) for most use cases. Consumers on the new schema can read data written with the previous one, so upgrade consumers first. Use FULL compatibility when producers and consumers must be deployable in any order. Avoid NONE in production.
- Subject Name Strategy: Use TopicRecordNameStrategy for topics with multiple record types. Use TopicNameStrategy for simple topics. Use RecordNameStrategy for schema sharing across topics.
- Schema Evolution Pattern: Always add new fields with default values. Remove only fields that have default values. Change field types only through Avro's promotions (such as int->long), and only in the direction your compatibility mode allows. To rename or retype a field, add a new field with a different name and retire the old one later.
- Wire Format Efficiency: Use Avro for compact binary encoding (smallest payload). Use JSON Schema for human-readable debugging. Use Protobuf for cross-language compatibility. Only the 4-byte schema ID travels with each message, so schema size affects registry storage and client memory, not per-message bandwidth.
- Registry High Availability: Run Schema Registry in a cluster (3+ nodes) for production. Use a load balancer in front of the cluster. Monitor leader election and failover. Test failover scenarios regularly.
- Client Configuration: Clients cache schemas by default; raise max.schemas.per.subject if many versions of one subject are in use at once. Use auto.register.schemas=false in production to prevent accidental schema registration. Set use.latest.version=true on serializers that should write with the subject's latest registered version.
- Monitoring: Track registry request rate, error rate, and latency, plus the client cache hit ratio. Monitor schema compatibility failures. Alert on schema registration errors.
- Testing: Test schema evolution with realistic data. Verify backward and forward compatibility. Use schema registry test utilities. Test with multiple consumer versions running simultaneously.
- Security: Enable authentication for schema registry access. Use SSL for encrypted communication. Implement ACLs for schema modification. Audit schema registration and compatibility checks.
- Migration: Use schema linking for multi-cluster migrations. Plan schema evolution in advance. Document schema changes. Coordinate with all consumer teams before schema changes. Use BACKWARD_TRANSITIVE compatibility for safe rollout.
Key Takeaways:
- Wire format = magic byte (0x0) + 4-byte schema ID + binary payload; enables schema lookup by ID
- BACKWARD compatibility: new consumers can read data written with the previous schema; FORWARD: old consumers can read data written with the new schema; FULL: both
- Additive evolution (new fields with defaults) is always safe; removing fields requires defaults; type changes need wire compatibility
- Client-side caching achieves >99.9% cache hit rate; schema retrieval latency is ~1ms with cache, ~50ms without
- auto.register.schemas=false prevents accidental registration in production
- Schema Registry cluster (3+ nodes) provides HA; only leader handles writes, all nodes serve reads
See Also
- Kafka Architecture (kafka/01): Schema in Kafka architecture
- Producer Consumer Patterns (kafka/02): Schema-aware producers/consumers
- Kafka Streams and Connect (kafka/03): Schema in stream processing
- Exactly-Once Semantics (kafka/04)
- Data Formats: Data serialization formats