Schema Registry & Evolution

Difficulty: Senior Level | Companies: LinkedIn, Uber, Netflix, Spotify, Confluent

Content

Schema Registry provides a centralized repository for managing schemas and enabling data compatibility across producers and consumers.

Architecture

┌─────────────────────────────────────────────────┐
│  Schema Registry Cluster                        │
│  ┌─────────────┐  ┌─────────────┐               │
│  │ Primary     │  │ Secondary   │               │
│  │ (Leader)    │  │ (Follower)  │               │
│  └─────────────┘  └─────────────┘               │
└─────────────────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────────────┐
│  Kafka Cluster                                  │
│  _schemas topic (internal)                      │
└─────────────────────────────────────────────────┘

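Flow:
1. Producer registers the schema with Schema Registry (or looks it up if it is already registered)
2. Schema Registry assigns a globally unique schema ID and a per-subject version number
3. Producer sends the serialized data prefixed with a 5-byte header (magic byte + 4-byte schema ID)
4. Consumer reads the schema ID from the header and fetches the schema from the Registry (clients cache schemas by ID)
5. Consumer deserializes the data using that schema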
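
The registration part of the flow above can be sketched with a toy in-memory stand-in. This is not the real service or its client API: the class `InMemoryRegistry` and its method names are hypothetical, and the rule that identical schema text maps to one global ID is a simplifying assumption of this model.

```python
import json


class InMemoryRegistry:
    """Toy stand-in for Schema Registry (hypothetical, for illustration only)."""

    def __init__(self):
        self._ids = {}       # canonical schema text -> global schema ID
        self._schemas = {}   # global schema ID -> canonical schema text
        self._versions = {}  # subject -> list of schema IDs (index + 1 = version)

    def register(self, subject, schema):
        """Register a schema under a subject and return its global ID."""
        canonical = json.dumps(schema, sort_keys=True)
        schema_id = self._ids.get(canonical)
        if schema_id is None:
            # First time this schema text is seen: allocate the next ID.
            schema_id = len(self._ids) + 1
            self._ids[canonical] = schema_id
            self._schemas[schema_id] = canonical
        versions = self._versions.setdefault(subject, [])
        if schema_id not in versions:
            # New schema for this subject: it becomes the next version.
            versions.append(schema_id)
        return schema_id

    def get_by_id(self, schema_id):
        """What a consumer does with the ID found in the message header."""
        return json.loads(self._schemas[schema_id])

    def version_of(self, subject, schema_id):
        """Per-subject version number of an already registered schema."""
        return self._versions[subject].index(schema_id) + 1
```

Registering the same schema twice returns the same ID, while a changed schema gets a new ID and the next version number for that subject.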

Wire Format

Avro Wire Format:
┌─────────────────────┬─────────────────────┬───────────┐
│ Magic Byte (1 byte) │ Schema ID (4 bytes) │ Avro Data │
│        0x00         │     0x00000001      │    ...    │
└─────────────────────┴─────────────────────┴───────────┘

Total overhead: 5 bytes per message
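
The 5-byte header can be packed and unpacked with the standard library alone. The sketch below assumes the schema ID is stored big-endian; the helper names `frame` and `unframe` are hypothetical, and the payload is treated as opaque bytes rather than real Avro data.

```python
import struct

MAGIC_BYTE = 0
HEADER = struct.Struct(">bI")  # 1-byte magic + 4-byte big-endian schema ID


def frame(schema_id: int, payload: bytes) -> bytes:
    """Prefix an already serialized payload with the 5-byte header."""
    return HEADER.pack(MAGIC_BYTE, schema_id) + payload


def unframe(message: bytes):
    """Split a framed message into (schema_id, payload)."""
    if len(message) < HEADER.size:
        raise ValueError("message is shorter than the 5-byte header")
    magic, schema_id = HEADER.unpack(message[:HEADER.size])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[HEADER.size:]
```

A consumer-side deserializer would call something like `unframe` first, look up the schema for the returned ID, and only then decode the remaining bytes.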

Avro Schema Definition

{
  "type": "record",
  "name": "Order",
  "namespace": "com.example.events",
  "fields": [
    {
      "name": "order_id",
      "type": "string"
    },
    {
      "name": "user_id",
      "type": "string"
    },
    {
      "name": "amount",
      "type": {
        "type": "bytes",
        "logicalType": "decimal",
        "precision": 10,
        "scale": 2
      }
    },
    {
      "name": "timestamp",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "status",
      "type": {
        "type": "enum",
        "name": "OrderStatus",
        "symbols": ["PENDING", "CONFIRMED", "SHIPPED", "DELIVERED"]
      }
    },
    {
      "name": "metadata",
      "type": ["null", "string"],
      "default": null
    }
  ]
}
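
The `amount` field uses the `decimal` logical type, which Avro stores as the two's-complement, big-endian bytes of the unscaled integer. A minimal sketch of that encoding, using only the standard library (the helper names are hypothetical, and real Avro libraries handle this conversion for you):

```python
from decimal import Decimal


def encode_decimal(value: Decimal, scale: int = 2, precision: int = 10) -> bytes:
    """Encode a Decimal as the bytes of its unscaled integer value."""
    scaled = value.scaleb(scale)  # shift the decimal point right by `scale`
    if scaled != scaled.to_integral_value():
        raise ValueError(f"{value} has more than {scale} fractional digits")
    unscaled = int(scaled)
    if len(str(abs(unscaled))) > precision:
        raise ValueError(f"{value} exceeds precision {precision}")
    # One extra bit is needed for the sign in two's complement.
    length = (unscaled.bit_length() + 8) // 8
    return unscaled.to_bytes(length, byteorder="big", signed=True)


def decode_decimal(data: bytes, scale: int = 2) -> Decimal:
    """Inverse of encode_decimal: bytes -> unscaled int -> Decimal."""
    unscaled = int.from_bytes(data, byteorder="big", signed=True)
    return Decimal(unscaled).scaleb(-scale)
```

With `scale` 2, the amount `99.99` becomes the unscaled integer 9999, which is why the schema, not the payload, must carry the scale.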

Java Producer with Schema Registry

import java.util.Properties;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class OrderProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka1:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        // KafkaAvroSerializer registers the schema and prepends the 5-byte header
        props.put("value.serializer", KafkaAvroSerializer.class.getName());
        props.put("schema.registry.url", "http://schema-registry:8081");

        // Parse the writer schema (a simplified Order with two fields)
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"order_id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"
        );

        // Create a record that conforms to the schema
        GenericRecord order = new GenericData.Record(schema);
        order.put("order_id", "12345");
        order.put("amount", 99.99);

        ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(
            "orders",
            "order-12345",
            order
        );

        // try-with-resources closes the producer, which waits for pending sends
        try (Producer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            producer.send(record);
        }
    }
}

Python Producer with Schema Registry

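from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

# Note: AvroProducer is the legacy confluent_kafka Avro API; it is kept here
# to match the original example.

# Define the value schema
order_schema = avro.loads('''
{
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "user_id", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "timestamp", "type": "long"}
    ]
}
''')

# AvroProducer Avro-encodes keys too, so a non-null key needs its own schema
key_schema = avro.loads('{"type": "string"}')

producer = AvroProducer({
    'bootstrap.servers': 'kafka1:9092',
    'schema.registry.url': 'http://schema-registry:8081'
})

# Create a record matching the value schema (timestamp in epoch milliseconds)
order = {
    'order_id': '12345',
    'user_id': 'user-42',
    'amount': 99.99,
    'timestamp': 1625097600000
}

producer.produce(
    topic='orders',
    key='order-12345',
    value=order,
    key_schema=key_schema,
    value_schema=order_schema
)

# Block until all buffered messages are delivered
producer.flush()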

Schema Evolution Strategies

Backward Compatible (Default)

// Original schema (v1)
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "amount", "type": "double"}
  ]
}

// New schema (v2) - adds a field with a default, so a v2 reader can fill it in when reading v1 data
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "currency", "type": "string", "default": "USD"}
  ]
}

Forward Compatible

// Original schema (v1)
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "amount", "type": "double"}
  ]
}

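// New schema (v2) - adds a field; v1 readers ignore it (the default is not needed for forward compatibility, but keeps the change backward compatible too)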
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "status", "type": "string", "default": "UNKNOWN"}
  ]
}

Full Compatible

// Combines backward and forward compatibility
// Old and new schemas can each read data written with the other
// Requires: every added field has a default
//           only fields that have defaults are removed

Key Insight: Backward compatibility means consumers using the new schema can read data written with the old schema, so consumers are upgraded first. Forward compatibility means consumers still using the old schema can read data written with the new schema, so producers are upgraded first.
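
To make the two directions concrete, here is a deliberately simplified sketch. It models only the add/remove-field rules (a reader needs a default for any field the writer did not write) and ignores type promotion, aliases, unions, and everything else a real Avro compatibility checker handles. All function names are hypothetical.

```python
def can_read(reader: dict, writer: dict) -> bool:
    """True if `reader` can decode data written with `writer` (field rules only)."""
    written = {field["name"] for field in writer["fields"]}
    for field in reader["fields"]:
        # A field missing from the written data must be filled from a default.
        if field["name"] not in written and "default" not in field:
            return False
    return True


def is_backward_compatible(new: dict, old: dict) -> bool:
    # Backward: the NEW schema reads data written with the OLD schema.
    return can_read(reader=new, writer=old)


def is_forward_compatible(new: dict, old: dict) -> bool:
    # Forward: the OLD schema reads data written with the NEW schema.
    return can_read(reader=old, writer=new)


def is_fully_compatible(new: dict, old: dict) -> bool:
    return is_backward_compatible(new, old) and is_forward_compatible(new, old)
```

Under this model, adding a field without a default is forward compatible only, and removing a field without a default is backward compatible only.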

Compatibility Configuration

# Set compatibility level for subject
curl -X PUT http://schema-registry:8081/config/orders-value \
  -H "Content-Type: application/json" \
  -d '{"compatibility": "BACKWARD"}'

# Compatibility levels:
# NONE - No compatibility checking
# BACKWARD - New schema can read data written with the latest registered schema (default)
# FORWARD - Latest registered schema can read data written with the new schema
# FULL - Both backward and forward compatible with the latest registered schema
# BACKWARD_TRANSITIVE - Backward compatible with all registered versions
# FORWARD_TRANSITIVE - Forward compatible with all registered versions
# FULL_TRANSITIVE - Full compatible with all registered versions
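
The practical difference between a level and its `_TRANSITIVE` variant is which earlier versions a new schema is checked against: only the latest one, or all of them. The sketch below assumes a toy schema model (a dict mapping field name to "has a default") and hypothetical helper names; it is not how the Registry is implemented.

```python
def can_read(reader: dict, writer: dict) -> bool:
    """Toy rule: every reader field was written, or the reader has a default."""
    return all(name in writer or has_default for name, has_default in reader.items())


def is_allowed(level: str, new: dict, history: list) -> bool:
    """Check `new` against registered versions in `history` (oldest first)."""
    if level == "NONE":
        return True
    direction, _, suffix = level.partition("_")
    # Transitive levels compare against every version, others only the latest.
    previous = history if suffix == "TRANSITIVE" else history[-1:]
    for old in previous:
        backward = can_read(reader=new, writer=old)
        forward = can_read(reader=old, writer=new)
        passed = {
            "BACKWARD": backward,
            "FORWARD": forward,
            "FULL": backward and forward,
        }[direction]
        if not passed:
            return False
    return True


# v2 adds "email" with a default; v3 then drops that default.
v1 = {"id": False}
v2 = {"id": False, "email": True}
v3 = {"id": False, "email": False}
print(is_allowed("BACKWARD", v3, [v1, v2]))
print(is_allowed("BACKWARD_TRANSITIVE", v3, [v1, v2]))
```

Here `v3` passes `BACKWARD` because `v2` already wrote `email`, but fails `BACKWARD_TRANSITIVE` because data written with `v1` has no `email` and `v3` has no default for it.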

Protobuf Schema Example

syntax = "proto3";

message Order {
  string order_id = 1;
  string user_id = 2;
  double amount = 3;
  int64 timestamp = 4;
  OrderStatus status = 5;
  optional string currency = 6;  // New field, optional
}

enum OrderStatus {
  UNKNOWN = 0;
  PENDING = 1;
  CONFIRMED = 2;
  SHIPPED = 3;
  DELIVERED = 4;
}

JSON Schema Example

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Order",
  "type": "object",
  "properties": {
    "order_id": {
      "type": "string"
    },
    "amount": {
      "type": "number"
    },
    "currency": {
      "type": "string",
      "default": "USD"
    }
  },
  "required": ["order_id", "amount"]
}

Schema Registry Administration

# List subjects
curl http://schema-registry:8081/subjects

# Get schema versions
curl http://schema-registry:8081/subjects/orders-value/versions

# Get specific version
curl http://schema-registry:8081/subjects/orders-value/versions/1

# Delete subject (soft delete; repeat with ?permanent=true to hard-delete)
curl -X DELETE http://schema-registry:8081/subjects/orders-value

# Check compatibility
curl -X POST http://schema-registry:8081/compatibility/subjects/orders-value/versions/latest \
  -H "Content-Type: application/json" \
  -d '{"schema": "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"order_id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"currency\",\"type\":\"string\",\"default\":\"USD\"}]}"}'
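
The compatibility check sends the schema as a JSON string nested inside a JSON body, which is why the curl payload is so heavily escaped. A small sketch that builds the same request with the standard library avoids hand-escaping; the function name `build_compatibility_request` is hypothetical, and the host name is the placeholder used in the commands above.

```python
import json
import urllib.request


def build_compatibility_request(base_url, subject, schema, version="latest"):
    """Build (but do not send) a POST request for the compatibility endpoint."""
    # The inner json.dumps turns the schema into a string; the outer one
    # wraps it in the {"schema": "..."} body and handles the escaping.
    body = json.dumps({"schema": json.dumps(schema)}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/compatibility/subjects/{subject}/versions/{version}",
        data=body,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )


candidate = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "currency", "type": "string", "default": "USD"},
    ],
}
request = build_compatibility_request(
    "http://schema-registry:8081", "orders-value", candidate
)
```

Passing `request` to `urllib.request.urlopen` would send it to a running Registry; the response body is expected to contain an `is_compatible` boolean.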

Follow-Up Questions

  1. What is the difference between backward and forward compatibility?
  2. How does Schema Registry handle schema evolution for Avro vs Protobuf?
  3. Explain the wire format for messages using Schema Registry.
  4. What happens when a producer uses a schema that is not compatible?
  5. How would you migrate from one schema version to another in production?
