
Apache Kafka Schema Registry: Avro Schemas, Compatibility, and Evolution


Figure: Schema Registry: central schema management with compatibility enforcement. Producers and consumers (Avro/JSON) use a Schema Registry cluster (a leader and a follower node, both on port 8081) backed by the _schemas Kafka topic.
Figure: Wire format: 0x0 (1 byte) + 4-byte schema ID + serialized payload (Avro, JSON, or Protobuf). The consumer fetches the schema by ID from the registry for deserialization.

Architecture Diagram: Schema Registry Components

Architecture Diagram: Schema Evolution Compatibility Matrix

Architecture Diagram: Schema Serialization Flow

Architecture Diagram: Schema Registry Client Cache

Formal Definitions

Schema Compatibility

Schema compatibility is a set of rules that govern how schemas can evolve over time without breaking existing producers or consumers. The Schema Registry checks each new schema version against the configured compatibility mode before registering it. The main modes are:

- BACKWARD: consumers using the new schema can read data written with the previous schema.
- FORWARD: consumers still using the previous schema can read data written with the new schema.
- FULL: both BACKWARD and FORWARD.
- NONE: no checks.

Wire Format

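The wire format is the byte layout that the Confluent serializers write into a Kafka record's key or value. It starts with a 5-byte header: 1 magic byte (0x0) followed by a 4-byte schema ID (big-endian integer). The Avro, JSON Schema, or Protobuf payload comes after the header. Protobuf additionally writes a list of message indexes between the schema ID and the payload. Because the ID travels with every message, a consumer can look up the writer's schema in the registry and deserialize the record. This decouples producers and consumers as schemas evolve.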

Subject Name Strategy

A subject name strategy determines the subject (the named, versioned lineage of schemas) under which a schema is registered. There are three strategies:

- TopicNameStrategy (the default) uses <topic>-key or <topic>-value, so all records in a topic share one schema lineage.
- TopicRecordNameStrategy uses <topic>-<fully qualified record name>, allowing multiple record types per topic.
- RecordNameStrategy uses only the fully qualified record name, so one record type shares a single lineage across topics.
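The three strategies come down to simple string rules. The Python sketch below mirrors the naming conventions of the Confluent strategies as we understand them. `subject_name` is a hypothetical helper, not part of any client library.

```python
def subject_name(strategy: str, topic: str, record_fullname: str,
                 is_key: bool = False) -> str:
    """Return the subject a schema would be registered under.

    Hypothetical helper that mimics the naming rules of the three
    built-in subject name strategies.
    """
    if strategy == "TopicNameStrategy":
        # <topic>-key or <topic>-value
        return f"{topic}-{'key' if is_key else 'value'}"
    if strategy == "TopicRecordNameStrategy":
        # <topic>-<fully qualified record name>
        return f"{topic}-{record_fullname}"
    if strategy == "RecordNameStrategy":
        # fully qualified record name only, shared across topics
        return record_fullname
    raise ValueError(f"unknown strategy: {strategy}")


for strategy in ("TopicNameStrategy", "TopicRecordNameStrategy", "RecordNameStrategy"):
    print(strategy, subject_name(strategy, "order-events", "com.example.Order"))
# TopicNameStrategy order-events-value
# TopicRecordNameStrategy order-events-com.example.Order
# RecordNameStrategy com.example.Order
```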

Schema Reference

A schema reference is a dependency between schemas: one schema refers to another by name, subject, and version. References enable schema composition (e.g., an Order schema referencing a User schema). The wire format still carries only the ID of the top-level schema. Clients resolve referenced schemas through the registry, recursively if needed, when they parse that schema.

Schema Linking

Schema linking is the process of synchronizing schemas across Schema Registry clusters, enabling multi-cluster deployments, migrations, and disaster recovery. Linked schemas maintain compatibility across clusters and can be used for data replication between environments (dev, staging, production).

Key Formulas

Wire Format Encoding
$$\text{Wire} = [\texttt{0x0}] \;\|\; [\text{SchemaID}]_{4\,\text{B}} \;\|\; [\text{AvroPayload}]_{N\,\text{B}}$$

Here,

  • $\texttt{0x0}$ = magic byte (1 byte); identifies the Schema Registry wire format
  • $\text{SchemaID}$ = 4-byte big-endian integer; unique schema identifier assigned by the registry
  • $\text{AvroPayload}$ = binary-encoded record payload ($N$ bytes) written with the identified schema
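The framing can be sketched in a few lines of Python with the standard `struct` module. The helper names `encode_wire` and `decode_wire` are hypothetical. The payload is treated as opaque bytes rather than real Avro output, and Protobuf framing (which adds message indexes after the ID) is not covered.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0x0
HEADER = struct.Struct(">BI")  # 1 unsigned byte + 4-byte big-endian unsigned int


def encode_wire(schema_id: int, payload: bytes) -> bytes:
    """Prefix an already-serialized payload with magic byte + schema ID."""
    return HEADER.pack(MAGIC_BYTE, schema_id) + payload


def decode_wire(message: bytes) -> Tuple[int, bytes]:
    """Split a framed message into (schema_id, payload)."""
    if len(message) < HEADER.size:
        raise ValueError("message shorter than the 5-byte header")
    magic, schema_id = HEADER.unpack_from(message)
    if magic != MAGIC_BYTE:
        raise ValueError(f"unknown magic byte: {magic}")
    return schema_id, message[HEADER.size:]


framed = encode_wire(42, b"opaque-avro-bytes")
print(framed[:5].hex())  # 000000002a
schema_id, payload = decode_wire(framed)
print(schema_id, len(framed) - len(payload))  # 42 5
```

The second print shows the constant 5-byte overhead: magic byte plus schema ID, independent of payload size.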

Schema Storage Cost

$$S_{\text{total}} = N_{\text{subjects}} \times V_{\text{avg\_versions}} \times S_{\text{avg\_schema}}$$

Here,

  • $N_{\text{subjects}}$ = number of registered subjects
  • $V_{\text{avg\_versions}}$ = average number of schema versions per subject
  • $S_{\text{avg\_schema}}$ = average schema size in bytes
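As a worked illustration with assumed numbers (not measurements), take a registry with 200 subjects, an average of 5 versions per subject, and an average schema size of 2 KB:

$$S_{\text{total}} = N_{\text{subjects}} \times V_{\text{avg\_versions}} \times S_{\text{avg\_schema}} = 200 \times 5 \times 2\,\text{KB} = 2000\,\text{KB} \approx 2\,\text{MB}$$

Schema storage is therefore tiny compared with the data topics the schemas describe.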

Cache Hit Rate

$$\text{Hit\_Rate} = \frac{N_{\text{cache\_hits}}}{N_{\text{cache\_hits}} + N_{\text{cache\_misses}}} \times 100\%$$

Here,

  • $N_{\text{cache\_hits}}$ = number of schema lookups served from the local cache
  • $N_{\text{cache\_misses}}$ = number of schema lookups that required an HTTP request to the registry

Serialization Overhead

$$O_{\text{ser}} = S_{\text{wire}} - S_{\text{payload}}$$

Here,

  • $S_{\text{wire}}$ = total wire-format size (bytes)
  • $S_{\text{payload}}$ = raw payload size (bytes)
  • $O_{\text{ser}}$ = overhead from the magic byte and schema ID (5 bytes for Avro and JSON Schema; Protobuf also adds message indexes)

Schema Registry stores all schemas in the _schemas topic, which is replicated for durability. Only the leader handles writes (registration); all nodes serve reads (retrieval). Each client caches schemas locally (ID->schema and schema->ID mappings), so it contacts the registry only the first time it sees a given schema or ID. In steady state, with a stable set of schemas, nearly all lookups are therefore cache hits.
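The two client-side mappings and the hit-rate formula can be illustrated with a toy cache. `CachingSchemaClient` is a hypothetical class, not the Confluent client. The two callables stand in for registry REST calls (assumed here to be `GET /schemas/ids/{id}` and `POST /subjects/{subject}`) and are replaced by fakes in the usage example.

```python
from typing import Callable, Dict, Tuple


class CachingSchemaClient:
    """Toy client-side schema cache with hit/miss counters (hypothetical)."""

    def __init__(self, fetch_by_id: Callable[[int], str],
                 lookup_id: Callable[[str, str], int]):
        self._fetch_by_id = fetch_by_id  # stands in for GET /schemas/ids/{id}
        self._lookup_id = lookup_id      # stands in for POST /subjects/{subject}
        self._schema_by_id: Dict[int, str] = {}
        self._id_by_schema: Dict[Tuple[str, str], int] = {}
        self.hits = 0
        self.misses = 0

    def schema_for_id(self, schema_id: int) -> str:
        """Consumer path: resolve the ID found in the wire format."""
        if schema_id in self._schema_by_id:
            self.hits += 1
        else:
            self.misses += 1
            self._schema_by_id[schema_id] = self._fetch_by_id(schema_id)
        return self._schema_by_id[schema_id]

    def id_for_schema(self, subject: str, schema: str) -> int:
        """Producer path: find the ID to embed for (subject, schema)."""
        key = (subject, schema)
        if key in self._id_by_schema:
            self.hits += 1
        else:
            self.misses += 1
            schema_id = self._lookup_id(subject, schema)
            self._id_by_schema[key] = schema_id
            self._schema_by_id.setdefault(schema_id, schema)  # warm the reverse map
        return self._id_by_schema[key]

    def hit_rate(self) -> float:
        """Hit_Rate = hits / (hits + misses) * 100%."""
        total = self.hits + self.misses
        return 100.0 * self.hits / total if total else 0.0


# Usage with a fake registry that records every remote call
remote_calls = []

def fake_fetch(schema_id: int) -> str:
    remote_calls.append(schema_id)
    return '{"type": "string"}'

client = CachingSchemaClient(fake_fetch, lambda subject, schema: 1)
for _ in range(1000):
    client.schema_for_id(1)
print(len(remote_calls), client.hit_rate())  # 1 99.9
```

Only the first lookup of an ID goes "over the network". Every later lookup is a dictionary hit, which is why steady-state hit rates approach 100%.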

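For production producers, set auto.register.schemas=false so that schemas are registered through a controlled process (for example, a CI pipeline) instead of by whichever client happens to run first. With use.latest.version=true, the serializer looks up the latest registered version of the subject and uses it, and its ID, instead of the schema attached to the record. It is typically combined with auto.register.schemas=false. Test compatibility with the registry's compatibility endpoint before deploying schema changes.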

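Avro encodes data as binary without field names or separators; the reader needs the writer's schema to decode it. This usually makes Avro the most compact of the three formats. JSON Schema payloads are plain, human-readable JSON text and therefore the largest. Protobuf is also binary and uses varint encoding for integers. Because it tags every field with its field number, its payloads are typically slightly larger than Avro's.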
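The compactness comes from Avro's binary encoding rules. Longs use zig-zag varints, strings are a length prefix plus UTF-8 bytes, doubles take 8 little-endian bytes, and fields are written in schema order with no names. The hand-rolled Python sketch below applies those rules to the Order schema used in this lesson's producer example. It illustrates the Avro specification and is not a replacement for a real Avro library; the helper names are hypothetical.

```python
import json
import struct


def zigzag_varint(n: int) -> bytes:
    """Avro int/long encoding: zig-zag map to unsigned, then base-128 varint."""
    z = (n << 1) ^ (n >> 63)  # 0,-1,1,-2,... -> 0,1,2,3,... (64-bit longs)
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # high bit set: more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_string(s: str) -> bytes:
    data = s.encode("utf-8")
    return zigzag_varint(len(data)) + data  # length prefix, then UTF-8 bytes


def encode_double(x: float) -> bytes:
    return struct.pack("<d", x)  # 8 bytes, little-endian IEEE 754


def encode_order(order: dict) -> bytes:
    # Fields are written in schema order with no names or delimiters:
    # orderId (string), userId (string), amount (double), status (string)
    return (encode_string(order["orderId"]) + encode_string(order["userId"])
            + encode_double(order["amount"]) + encode_string(order["status"]))


order = {"orderId": "order-1", "userId": "user-1", "amount": 99.5, "status": "PENDING"}
avro_bytes = encode_order(order)
json_bytes = json.dumps(order).encode("utf-8")
print(len(avro_bytes), len(json_bytes))  # 31 79
```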

Schema Evolution Invariants

The following invariants must hold for safe schema evolution:

  1. Adding a field with a default value is backward compatible: a consumer on the new schema fills the missing field from the default when reading old data. It is also forward compatible: a consumer on the old schema ignores the unknown field.
  2. Removing a field without breaking forward compatibility requires the removed field to have a default value. Otherwise, consumers still on the old schema fail when reading new data.
  3. Type changes are only compatible if Avro can promote the written type to the reader's type (e.g., int->long is backward compatible; string->int is not). Promotions work in one direction only, so int->long is not forward compatible.
  4. BACKWARD compatibility requires: every field in the new schema that did not exist in the old schema must have a default value.
  5. FORWARD compatibility requires: every field in the old schema that is removed in the new schema must have had a default value.
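The invariants can be checked mechanically. The Python sketch below is a deliberately simplified checker for flat records with primitive field types. It ignores unions, nested records, aliases, and named-type rules, which the registry's real checker handles through full Avro schema resolution. All names are hypothetical.

```python
# Avro type promotions: writer type -> reader types it may be read as
PROMOTIONS = {
    "int": {"long", "float", "double"},
    "long": {"float", "double"},
    "float": {"double"},
    "string": {"bytes"},
    "bytes": {"string"},
}


def can_read(reader: dict, writer: dict) -> bool:
    """True if data written with `writer` decodes under `reader` (simplified)."""
    writer_fields = {f["name"]: f for f in writer["fields"]}
    for field in reader["fields"]:
        written = writer_fields.get(field["name"])
        if written is None:
            if "default" not in field:
                return False  # reader expects a value the writer never wrote
        elif (written["type"] != field["type"]
              and field["type"] not in PROMOTIONS.get(written["type"], set())):
            return False  # type change that Avro cannot promote
    return True  # fields only the writer has are skipped by the reader


def backward(new: dict, old: dict) -> bool:
    return can_read(reader=new, writer=old)


def forward(new: dict, old: dict) -> bool:
    return can_read(reader=old, writer=new)


def full(new: dict, old: dict) -> bool:
    return backward(new, old) and forward(new, old)


v1 = {"fields": [{"name": "orderId", "type": "string"},
                 {"name": "amount", "type": "int"}]}
added = {"fields": v1["fields"] + [{"name": "priority", "type": "string", "default": "NORMAL"}]}
widened = {"fields": [{"name": "orderId", "type": "string"},
                      {"name": "amount", "type": "long"}]}
no_default = {"fields": v1["fields"] + [{"name": "region", "type": "string"}]}
removed = {"fields": [{"name": "orderId", "type": "string"}]}

for name, new in [("added", added), ("widened", widened),
                  ("no_default", no_default), ("removed", removed)]:
    print(name, backward(new, v1), forward(new, v1))
# added True True
# widened True False
# no_default False True
# removed True False
```

The four cases line up with the invariants above. Additive fields with defaults pass both directions. int->long passes only BACKWARD. A new field without a default fails BACKWARD. Removing a field that has no default fails FORWARD.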

Wire Format Efficiency

For Avro and JSON Schema, the wire-format overhead is a constant 5 bytes (1 magic byte + 4-byte schema ID), regardless of schema complexity. For a 100-byte Avro payload that is 5% of the payload size; for payloads over 1 KB it is below 0.5%. The schema ID also lets clients find the schema with a constant-time lookup in the local cache, so the overhead is negligible for most workloads.

Detailed Explanation

Schema Registration and Storage

Confluent Schema Registry is a centralized schema management service for Kafka that enables schema evolution while maintaining data quality and compatibility. When a producer registers a schema, the registry assigns it a unique integer ID and stores the schema definition (Avro, JSON Schema, or Protobuf) associated with the subject name. The subject name can follow different strategies: TopicNameStrategy (default), TopicRecordNameStrategy, or RecordNameStrategy.

Before registering, the registry checks the new schema against the latest version of the subject (or against all versions, for the transitive modes) according to the configured compatibility mode. If the check passes, the schema is written to the _schemas topic and added to each registry node's in-memory store.
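The registration flow can be modeled with a toy in-memory registry. `ToyRegistry` and `backward_additive` are hypothetical and simplified. The compatibility rule only checks that added fields carry defaults, and schemas are compared as raw strings. The real service compares parsed schemas and answers an incompatible registration with HTTP 409. As we understand the registry's behavior, IDs are global, so the same schema registered under two subjects gets one ID, and re-registering an existing schema returns its ID without creating a new version.

```python
import json


def backward_additive(new: str, old: str) -> bool:
    """Simplified BACKWARD rule: every field added in `new` needs a default."""
    old_names = {f["name"] for f in json.loads(old)["fields"]}
    return all("default" in f for f in json.loads(new)["fields"]
               if f["name"] not in old_names)


class ToyRegistry:
    """In-memory stand-in for the registration flow (hypothetical)."""

    def __init__(self, check):
        self._check = check   # (new, latest) -> bool
        self._ids = {}        # schema text -> global ID, shared by all subjects
        self._versions = {}   # subject -> [schema text, ...]; version = index + 1

    def register(self, subject: str, schema: str) -> int:
        versions = self._versions.setdefault(subject, [])
        if schema in versions:
            return self._ids[schema]  # already registered: same ID, no new version
        if versions and not self._check(schema, versions[-1]):
            raise ValueError(f"schema incompatible with latest version of {subject}")
        schema_id = self._ids.setdefault(schema, len(self._ids) + 1)
        versions.append(schema)
        return schema_id

    def latest_version(self, subject: str) -> int:
        return len(self._versions[subject])


def order_schema(*extra_fields):
    fields = [{"name": "orderId", "type": "string"}, *extra_fields]
    return json.dumps({"type": "record", "name": "Order", "fields": fields})


v1 = order_schema()
v2 = order_schema({"name": "status", "type": "string", "default": "PENDING"})
bad = order_schema({"name": "region", "type": "string"})  # no default

registry = ToyRegistry(backward_additive)
print(registry.register("order-events-value", v1))  # 1
print(registry.register("order-events-value", v2))  # 2
print(registry.register("order-events-value", v1))  # 1 (no new version)
print(registry.register("audit-events-value", v2))  # 2 (same schema, same ID)
```

Registering `bad` under order-events-value would raise, because its new field has no default.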

Compatibility Modes

The registry supports several compatibility modes to control how schemas can evolve:

  • BACKWARD ensures that consumers using the new schema can read data written with the previous version. Upgrade consumers before producers.
  • FORWARD ensures that consumers still using the previous version can read data written with the new schema. Upgrade producers before consumers.
  • FULL ensures both backward and forward compatibility with the previous version.
  • NONE disables compatibility checking (not recommended for production).
  • BACKWARD_TRANSITIVE ensures backward compatibility with all previous versions, not just the latest.
  • FORWARD_TRANSITIVE ensures forward compatibility with all previous versions.
  • FULL_TRANSITIVE ensures both, against all previous versions.
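The modes differ in two ways: which direction must decode (new reader on old data, old reader on new data, or both) and whether only the latest version or every previous version is checked. The Python sketch below lists the (reader, writer) pairs each mode requires to be decodable. It treats schemas as opaque labels and mirrors the documented semantics; it is not the registry's implementation, and the function name is hypothetical.

```python
from typing import List, Tuple


def pairs_to_check(mode: str, new: str, history: List[str]) -> List[Tuple[str, str]]:
    """(reader, writer) pairs that must be decodable before `new` is accepted.

    `history` lists previously registered versions, oldest first.
    """
    if mode == "NONE" or not history:
        return []
    transitive = mode.endswith("_TRANSITIVE")
    base = mode[: -len("_TRANSITIVE")] if transitive else mode
    olds = history if transitive else history[-1:]
    pairs = []
    for old in olds:
        if base in ("BACKWARD", "FULL"):
            pairs.append((new, old))  # new reader, old data
        if base in ("FORWARD", "FULL"):
            pairs.append((old, new))  # old reader, new data
    return pairs


history = ["v1", "v2"]
for mode in ("BACKWARD", "BACKWARD_TRANSITIVE", "FORWARD", "FULL"):
    print(mode, pairs_to_check(mode, "v3", history))
# BACKWARD [('v3', 'v2')]
# BACKWARD_TRANSITIVE [('v3', 'v1'), ('v3', 'v2')]
# FORWARD [('v2', 'v3')]
# FULL [('v3', 'v2'), ('v2', 'v3')]
```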

Wire Format and Schema IDs

Each key or value written by the Confluent serializers starts with a 5-byte prefix: a magic byte (0x0) followed by a 4-byte schema ID (big-endian integer). This prefix is part of the record's bytes, not a Kafka record header. Consumers deserialize messages by looking up the schema in the registry with the embedded ID. The registry client caches schemas locally, reducing latency and network overhead.

Schema Evolution Strategies

There are several patterns for evolving schemas in production:

  • Additive Evolution adds new fields with default values (backward and forward compatible).
  • Field Deprecation documents fields as deprecated (for example, in the field's doc attribute) without removing them, so the field list, and therefore compatibility, is unchanged.
  • Type Evolution changes field types (limited to Avro's promotions, such as int->long; requires careful planning).
  • Schema Branching uses TopicRecordNameStrategy to allow multiple schemas per topic.

Subject Name Strategies

The subject name determines how schemas are organized in the registry:

  • TopicNameStrategy (the default) uses <topic>-key or <topic>-value as the subject, meaning all records in a topic share one schema lineage.
  • TopicRecordNameStrategy uses <topic>-<fully qualified record name>, allowing multiple record types per topic.
  • RecordNameStrategy uses only the fully qualified record name, enabling schema sharing across topics.

Schema Registry High Availability

The registry runs as a cluster with a single leader. Current versions elect the leader through Kafka's group membership protocol; older releases could also use ZooKeeper. Only the leader handles writes (schema registration), and follower nodes forward registration requests to it. All nodes serve reads (schema retrieval), and every node consumes the _schemas topic to stay synchronized. If the leader fails, a new leader is elected automatically from the leader-eligible nodes. Reads scale horizontally by adding nodes, but write throughput is limited by the single-leader design.

Serialization Format Comparison

| Format | Payload Size | Schema Evolution | Human Readable | Performance |
|---|---|---|---|---|
| Avro | Smallest | Excellent (schema resolution) | No | Fast encode/decode |
| JSON Schema | Largest | Good (default values) | Yes | Moderate |
| Protobuf | Medium | Good (field numbers) | Partial | Fast encode/decode |

Key Concepts Table

| Component | Description | Key Configurations | Use Cases |
|---|---|---|---|
| Subject | Schema namespace (topic or record name) | key.subject.name.strategy, value.subject.name.strategy | Schema organization |
| Schema ID | Unique integer identifier for a schema | Auto-assigned | Wire format, caching |
| Compatibility Mode | Rules for schema evolution | schema.compatibility.level (server default), PUT /config/{subject} | Data quality assurance |
| Wire Format | 5-byte header (magic byte + schema ID) | Automatic | Message serialization |
| Schema Cache | Client-side cache of schemas and IDs | max.schemas.per.subject (Java serializers) | Performance optimization |
| Subject Name Strategy | How subjects are named | key.subject.name.strategy, value.subject.name.strategy | Schema organization |
| Compatibility Checker | Validates schema evolution | schema.compatibility.level | Prevent breaking changes |
| Schema Reference | Dependencies between schemas | references (in the registration request) | Complex schema hierarchies |
| Mode | READWRITE, READONLY, or IMPORT | PUT /mode (global or per subject) | Schema management control |
| Schema Linking | Sync schemas across clusters | Schema exporters | Multi-cluster, migration |

Code Examples

Schema Registration with Avro

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import java.util.HashMap;
import java.util.Map;

public class SchemaRegistrationExample {
    
    public static void main(String[] args) throws Exception {
        // Define Avro schema using builder API
        Schema schema = SchemaBuilder.record("Order")
            .namespace("com.example.orders")
            .fields()
                .name("orderId")
                .type()
                .stringType()
                .noDefault()
                // orderId: required string, no default
                .name("userId")
                .type()
                .stringType()
                .noDefault()
                // userId: required string, no default
                .name("amount")
                .type()
                .doubleType()
                .noDefault()
                // amount: required double, no default
                .name("currency")
                .type()
                .stringType()
                .stringDefault("USD")
                // currency: optional with default "USD"
                .name("status")
                .type()
                .stringType()
                .stringDefault("PENDING")
                // status: optional with default "PENDING"
                .name("createdAt")
                .type()
                .longType()
                .noDefault()
                // createdAt: required long (epoch millis)
                .name("metadata")
                .type()
                .optional()
                .map()
                .values()
                .stringType()
                // metadata: union of null and map<string>, default null
            .endRecord();
        
        // Create Schema Registry client
        String registryUrl = "http://localhost:8081";
        // Comma-separated for HA: "http://sr-1:8081,http://sr-2:8081"
        
        Map<String, Object> clientConfig = new HashMap<>();
        clientConfig.put("basic.auth.credentials.source", "USER_INFO");
        // Authentication source: USER_INFO, SASL_INHERIT, URL
        
        clientConfig.put("basic.auth.user.info", "admin:secret");
        // Username:password for basic auth
        
        // Second argument: capacity of the client's local schema cache
        SchemaRegistryClient client =
            new CachedSchemaRegistryClient(registryUrl, 100, clientConfig);
        
        String subject = "order-events-value";
        // Subject naming: <topic>-key or <topic>-value (TopicNameStrategy)
        
        AvroSchema avroSchema = new AvroSchema(schema);
        
        // Check compatibility against the subject's latest version BEFORE registering
        boolean isCompatible = client.testCompatibility(subject, avroSchema);
        System.out.println("Schema is compatible: " + isCompatible);
        if (!isCompatible) {
            System.err.println("Not registering an incompatible schema");
            return;
        }
        
        // Register schema
        int schemaId = client.register(subject, avroSchema);
        // Returns the unique integer ID (used in wire format)
        System.out.println("Registered schema with ID: " + schemaId);
        
        // Get schema metadata
        SchemaMetadata metadata = client.getSchemaMetadata(subject, 1);
        // version=1 returns first version
        System.out.println("Schema version: " + metadata.getVersion());
        System.out.println("Schema ID: " + metadata.getId());
    }
}

Avro Producer with Schema Registry

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class AvroProducerExample {
    
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Key serializer: String (not Avro)
        
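        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        // Value serializer: KafkaAvroSerializer
        // Embeds the schema ID in the wire format. With the default
        // auto.register.schemas=true it also registers the schema on first use;
        // set it to false in production and register schemas through CI instead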
        
        // Schema Registry configuration
        props.put("schema.registry.url", "http://localhost:8081");
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", "admin:secret");
        
        // Avro-specific configurations
        props.put("avro.use.logical.type.converters", true);
        // Convert Avro logical types (date, timestamp) to Java types
        
        // Note: specific.avro.reader is a deserializer (consumer-side) setting;
        // the producer serializes whatever GenericRecord or SpecificRecord it is given
        
        KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);
        
        // Define schema inline
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Order\"," +
            "\"namespace\":\"com.example\"," +
            "\"fields\":[" +
            "{\"name\":\"orderId\",\"type\":\"string\"}," +
            "{\"name\":\"userId\",\"type\":\"string\"}," +
            "{\"name\":\"amount\",\"type\":\"double\"}," +
            "{\"name\":\"status\",\"type\":\"string\",\"default\":\"PENDING\"}" +
            "]}");
        
        for (int i = 0; i < 100; i++) {
            // Create Avro record
            GenericRecord order = new GenericData.Record(schema);
            order.put("orderId", "order-" + i);
            order.put("userId", "user-" + (i % 10));
            order.put("amount", Math.random() * 1000);
            order.put("status", "PENDING");
            
            ProducerRecord<String, GenericRecord> record = 
                new ProducerRecord<>("order-events", "order-" + i, order);
            
            // Add headers (independent of Avro schema)
            record.headers().add("schema.version", "1".getBytes());
            record.headers().add("content-type", "avro".getBytes());
            
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("Error: " + exception.getMessage());
                } else {
                    System.out.printf("Sent to partition %d, offset %d%n",
                        metadata.partition(), metadata.offset());
                }
            });
        }
        
        producer.flush();
        producer.close();
    }
}

Avro Consumer with Schema Evolution

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;

public class AvroConsumerExample {
    
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        
        // Schema Registry configuration
        props.put("schema.registry.url", "http://localhost:8081");
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", "admin:secret");
        
        // Avro deserializer configuration
        props.put("specific.avro.reader", false);
        // Use GenericRecord for flexibility across schema versions
        
        props.put("avro.use.logical.type.converters", true);
        // Convert Avro logical types to Java types
        
        // Note: auto.register.schemas and use.latest.version configure serializers
        // (producer side). The deserializer never registers schemas; it decodes each
        // record with the writer's schema, fetched by the ID embedded in the message.
        
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("order-events"));
        
        try {
            while (true) {
                ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
                
                for (ConsumerRecord<String, GenericRecord> record : records) {
                    GenericRecord order = record.value();
                    
                    // Required fields present in every schema version
                    String orderId = order.get("orderId").toString();
                    String userId = order.get("userId").toString();
                    double amount = (double) order.get("amount");
                    
                    // GenericRecord values follow the WRITER's schema, so defaults are
                    // not filled in for fields the producer's schema lacked.
                    // Check the record's schema before reading optional or newer fields.
                    String status = "PENDING";
                    if (order.getSchema().getField("status") != null && order.get("status") != null) {
                        status = order.get("status").toString();
                    }
                    
                    // Optional map field (Avro decodes map keys/values as Utf8, not String)
                    String metadata = "";
                    if (order.getSchema().getField("metadata") != null && order.get("metadata") != null) {
                        Map<?, ?> metaMap = (Map<?, ?>) order.get("metadata");
                        metadata = metaMap.toString();
                    }
                    
                    // Field added in v2: absent from records written with v1
                    String priority = "NORMAL";
                    if (order.getSchema().getField("priority") != null && order.get("priority") != null) {
                        priority = order.get("priority").toString();
                    }
                    
                    System.out.printf("Order: %s, User: %s, Amount: %.2f, " +
                        "Status: %s, Priority: %s%n",
                        orderId, userId, amount, status, priority);
                }
                
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }
}

Schema Evolution Script

#!/bin/bash
# Schema Registry management script

SCHEMA_REGISTRY_URL="http://localhost:8081"
AUTH="admin:secret"

echo "=== SCHEMA REGISTRY MANAGEMENT ==="
echo "Timestamp: $(date)"
echo ""

# List all subjects
echo "--- All Subjects ---"
curl -s -u $AUTH "$SCHEMA_REGISTRY_URL/subjects" | jq .

echo ""
echo "--- Get Subject Schema ---"
SUBJECT="order-events-value"
SCHEMA=$(curl -s -u $AUTH "$SCHEMA_REGISTRY_URL/subjects/$SUBJECT/versions/latest")
echo "Subject: $SUBJECT"
echo "$SCHEMA" | jq .

echo ""
echo "--- Compatibility Check ---"
NEW_SCHEMA='{
  "type": "record",
  "name": "Order",
  "namespace": "com.example",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "userId", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "status", "type": "string", "default": "PENDING"},
    {"name": "priority", "type": "string", "default": "NORMAL"}
  ]
}'
# Adding "priority" field with default = backward compatible

COMPATIBILITY=$(curl -s -u $AUTH \
  -X POST \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d "{\"schema\": $(printf '%s' "$NEW_SCHEMA" | jq -Rs .)}" \
  "$SCHEMA_REGISTRY_URL/compatibility/subjects/$SUBJECT/versions/latest")

echo "Compatibility result:"
echo "$COMPATIBILITY" | jq .

echo ""
echo "--- Register New Schema ---"
if [ "$(echo "$COMPATIBILITY" | jq -r '.is_compatible')" = "true" ]; then
  REGISTRATION=$(curl -s -u "$AUTH" \
    -X POST \
    -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    -d "{\"schema\": $(printf '%s' "$NEW_SCHEMA" | jq -Rs .)}" \
    "$SCHEMA_REGISTRY_URL/subjects/$SUBJECT/versions")
  echo "Registration result:"
  echo "$REGISTRATION" | jq .
else
  echo "Schema is NOT compatible - cannot register"
fi

echo ""
echo "--- Schema Compatibility Modes ---"
curl -s -u $AUTH "$SCHEMA_REGISTRY_URL/config" | jq .

echo ""
echo "--- Set Compatibility Mode ---"
curl -u $AUTH \
  -X PUT \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "BACKWARD"}' \
  "$SCHEMA_REGISTRY_URL/config/$SUBJECT"

echo ""
echo "--- List Schema Versions ---"
curl -s -u $AUTH "$SCHEMA_REGISTRY_URL/subjects/$SUBJECT/versions" | jq .

echo ""
echo "--- Get Specific Version ---"
VERSION=1
curl -s -u $AUTH "$SCHEMA_REGISTRY_URL/subjects/$SUBJECT/versions/$VERSION" | jq .

echo ""
echo "--- Delete Schema Version (soft delete) ---"
curl -s -u $AUTH \
  -X DELETE \
  "$SCHEMA_REGISTRY_URL/subjects/$SUBJECT/versions/1"

echo ""
echo "--- Get Schema by ID ---"
SCHEMA_ID=1
curl -s -u $AUTH "$SCHEMA_REGISTRY_URL/schemas/ids/$SCHEMA_ID" | jq .

echo ""
echo "--- Get Schema Subjects by ID ---"
curl -s -u $AUTH "$SCHEMA_REGISTRY_URL/schemas/ids/$SCHEMA_ID/subjects" | jq .

Kafka Streams with Schema Registry

import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.WindowStore;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class StreamsSchemaRegistryExample {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "schema-registry-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        
        // Schema Registry configuration (also used by the default value serde)
        props.put("schema.registry.url", "http://localhost:8081");
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", "admin:secret");
        
        // Default serdes: String keys, GenericAvroSerde values
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Configure an explicit Avro serde
        Map<String, String> serdeConfig = new HashMap<>();
        serdeConfig.put("schema.registry.url", "http://localhost:8081");
        serdeConfig.put("basic.auth.credentials.source", "USER_INFO");
        serdeConfig.put("basic.auth.user.info", "admin:secret");
        
        GenericAvroSerde avroSerde = new GenericAvroSerde();
        avroSerde.configure(serdeConfig, false);
        // false = value serde (not key serde)
        
        // Source stream with Avro deserialization
        KStream<String, GenericRecord> orders = builder.stream("order-events",
            Consumed.with(Serdes.String(), avroSerde));
        
        // Output record type, built once instead of once per record. This is a new
        // record type (registered under high-value-orders-value by default), not a
        // new version of Order.
        Schema highValueSchema = SchemaBuilder.record("HighValueOrder")
            .namespace("com.example")
            .fields()
                .requiredString("orderId")
                .requiredString("userId")
                .requiredDouble("amount")
                .name("verified").type().booleanType().booleanDefault(false)
            .endRecord();
        
        // Transform and filter
        KStream<String, GenericRecord> highValueOrders = orders
            .filter((key, order) -> (double) order.get("amount") > 1000)
            .mapValues(order -> {
                GenericRecord highValueOrder = new GenericData.Record(highValueSchema);
                highValueOrder.put("orderId", order.get("orderId"));
                highValueOrder.put("userId", order.get("userId"));
                highValueOrder.put("amount", order.get("amount"));
                highValueOrder.put("verified", false);
                return highValueOrder;
            });
        
        // Sink with Avro serialization
        highValueOrders.to("high-value-orders", 
            Produced.with(Serdes.String(), avroSerde));
        
        // Aggregation with windowing (grouping uses the default serdes)
        KTable<Windowed<String>, Long> orderCounts = orders
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(
                    "order-count-store")
                .withValueSerde(Serdes.Long()));
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Python: Avro Producer with Schema Registry

from confluent_kafka import Consumer, Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer, AvroSerializer
from confluent_kafka.serialization import MessageField, SerializationContext

# Schema Registry client (caches schemas and IDs locally)
schema_registry = SchemaRegistryClient({
    'url': 'http://localhost:8081',
    'basic.auth.user.info': 'admin:secret',
})

# Define Avro schema inline (or read it from an order.avsc file)
order_schema_str = '''
{
    "type": "record",
    "name": "Order",
    "namespace": "com.example",
    "fields": [
        {"name": "orderId", "type": "string"},
        {"name": "userId", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "status", "type": "string", "default": "PENDING"}
    ]
}
'''

# Avro serializer: serialize with the latest registered version, never auto-register
avro_serializer = AvroSerializer(
    schema_registry_client=schema_registry,
    schema_str=order_schema_str,
    conf={'auto.register.schemas': False, 'use.latest.version': True},
)

producer = Producer({'bootstrap.servers': 'localhost:9092'})

# Send Avro records
for i in range(100):
    order = {
        'orderId': f'order-{i}',
        'userId': f'user-{i % 10}',
        'amount': 1000.0,
        'status': 'PENDING'
    }
    
    producer.produce(
        topic='order-events',
        key=f'order-{i}',
        value=avro_serializer(order, SerializationContext('order-events', MessageField.VALUE)),
    )
    producer.poll(0)  # serve delivery callbacks

producer.flush()

# Avro deserializer: looks up the writer's schema by the ID embedded in each message
avro_deserializer = AvroDeserializer(schema_registry_client=schema_registry)

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'avro-consumer-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
})

consumer.subscribe(['order-events'])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Error: {msg.error()}")
            continue
        
        # Deserialize the Avro value into a dict
        order = avro_deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))
        print(f"Order: {order['orderId']}, Amount: {order['amount']}")
        
        consumer.commit(message=msg, asynchronous=False)
finally:
    consumer.close()

Performance Metrics

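The figures below are rough, illustrative orders of magnitude, not results from a specific benchmark. Real numbers depend on hardware, network, schema shape, and client versions.

| Operation | Latency (p99) | Throughput | Cache Hit Rate | Memory Usage |
|---|---|---|---|---|
| Schema Registration | 100ms | 100/sec | N/A | 100MB |
| Schema Retrieval (cache hit) | 1ms | 1M/sec | 99.9% | 100MB |
| Schema Retrieval (cache miss) | 50ms | 10K/sec | 0.1% | 100MB |
| Compatibility Check | 10ms | 10K/sec | N/A | 100MB |
| Avro Serialization | 0.1ms | 10M/sec | N/A | 64MB |
| Avro Deserialization | 0.1ms | 10M/sec | N/A | 64MB |
| Registry Cluster (3 nodes) | 5ms | 300K/sec | 99.9% | 300MB |
| Registry Cluster (5 nodes) | 5ms | 500K/sec | 99.9% | 500MB |

| Serialization Format | Payload Size (100 fields) | Encode Time | Decode Time | Schema Evolution |
|---|---|---|---|---|
| Avro | 50 bytes | 0.05ms | 0.05ms | Excellent |
| JSON Schema | 500 bytes | 0.5ms | 0.5ms | Good |
| Protobuf | 80 bytes | 0.08ms | 0.08ms | Good |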

Best Practices

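  1. Compatibility Mode Selection: Use BACKWARD compatibility (the registry's default) for most use cases. Consumers on the new schema can read data written with the previous one, so upgrade consumers first. Use FULL compatibility when producers and consumers must be upgradable in either order. Avoid NONE in production.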

  2. Subject Name Strategy: Use TopicRecordNameStrategy for topics with multiple record types. Use TopicNameStrategy for simple topics. Use RecordNameStrategy for schema sharing across topics.

  3. Schema Evolution Pattern: Add new fields with default values. Remove only fields that have defaults, and only after confirming no consumer still needs them. Avoid type changes other than Avro's promotions (e.g., int->long), which are backward but not forward compatible. To rename a field, add a new field under the new name and deprecate the old one.

  4. Wire Format Efficiency: Use Avro for compact binary encoding (smallest payload). Use JSON Schema for human-readable debugging. Use Protobuf for cross-language compatibility. Only the 4-byte schema ID travels with each message, so payload size, not schema size, drives network bandwidth.

  5. Registry High Availability: Run Schema Registry in a cluster (3+ nodes) for production. Use a load balancer in front of the cluster. Monitor leader election and failover. Test failover scenarios regularly.

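  6. Client Configuration: Schema caching is built into the registry client; tune its capacity (for example, max.schemas.per.subject in the Java serializers) if you use many schemas. Use auto.register.schemas=false in production to prevent accidental schema registration. Use use.latest.version=true together with it so that producers serialize with the latest registered schema.

  7. Monitoring: Track registry request rate, error rate, and latency, plus client-side cache behavior; exact metric names depend on your Schema Registry version and monitoring stack. Monitor schema compatibility failures. Alert on schema registration errors.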

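  8. Testing: Test schema evolution with realistic data. Verify backward and forward compatibility. Use schema registry test utilities (for example, MockSchemaRegistryClient, or a mock:// URL for schema.registry.url in the Java serializers). Test with multiple consumer versions running simultaneously.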

  9. Security: Enable authentication for schema registry access. Use SSL for encrypted communication. Implement ACLs for schema modification. Audit schema registration and compatibility checks.

  10. Migration: Use schema linking for multi-cluster migrations. Plan schema evolution in advance. Document schema changes. Coordinate with all consumer teams before schema changes. Use compatibility mode BACKWARD_TRANSITIVE for safe rollout.

Key Takeaways:

  • Wire format = magic byte (0x0) + 4-byte schema ID + binary payload; enables schema lookup by ID
  • BACKWARD compatibility: consumers on the new schema read data written with the old schema; FORWARD: consumers on the old schema read data written with the new schema; FULL: both
  • Additive evolution (new fields with defaults) passes BACKWARD, FORWARD, and FULL checks; removing fields requires defaults; type changes must be Avro promotions such as int->long
  • Client-side caching means a client contacts the registry only the first time it sees a schema or ID, so steady-state schema lookups are local and cheap
  • auto.register.schemas=false prevents accidental registration in production
  • Schema Registry cluster (3+ nodes) provides HA; only leader handles writes, all nodes serve reads

See also: Kafka Architecture (kafka/01) | Producer & Consumer (kafka/02) | Kafka Streams & Connect (kafka/03) | Exactly-Once Semantics (kafka/04)
