CDC with Kafka Connect
Difficulty: Senior Level | Companies: LinkedIn, Uber, Netflix, Spotify, Confluent
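Change Data Capture (CDC) captures row-level changes (inserts, updates, and deletes) from a database and streams them to Kafka as events. Debezium is the most widely used open-source CDC connector for Kafka Connect; it reads the database's transaction log instead of polling tables.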
CDC Architecture
Architecture Diagram
Database → Debezium Connector → Kafka → Consumers
    │
    ├── Transaction Log (PostgreSQL WAL, MySQL Binlog)
    ├── Debezium Captures: INSERT, UPDATE, DELETE
    └── Schema Registry Stores: Avro schemas for the before/after states
Debezium Flow
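1. Debezium connects to the database
2. Reads the transaction log (WAL/binlog)
3. Captures row changes with before/after images
4. Converts each change to a Kafka record with:
   - Key: Primary key column(s)
   - Value: Change envelope with the before image (optional), the after image, source metadata, and the operation type op (c = create, u = update, d = delete, r = snapshot read)
   - Headers: Empty by default; the ExtractNewRecordState SMT can copy op and other metadata into headers (add.headers) or into value fields (add.fields)
5. Publishes the record to a Kafka topic, by default one topic per table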
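To make the record layout concrete, here is a minimal sketch of how a consumer might interpret an unflattened Debezium change envelope. The envelope fields (before, after, source, op, ts_ms) follow Debezium's event format; the helper name summarize_change and the sample order values are hypothetical and only serve the illustration.

```python
from typing import Any, Dict

# Debezium operation codes mapped to readable names
OP_NAMES = {"c": "INSERT", "u": "UPDATE", "d": "DELETE", "r": "SNAPSHOT_READ"}


def summarize_change(envelope: Dict[str, Any]) -> Dict[str, Any]:
    """Summarize one change envelope (hypothetical helper, not a Debezium API)."""
    before = envelope.get("before")
    after = envelope.get("after")
    # Deletes carry only a before image; inserts and snapshot reads only an after image
    row = after if after is not None else before
    changed = []
    if before is not None and after is not None:
        changed = sorted(k for k in after if before.get(k) != after[k])
    return {
        "operation": OP_NAMES.get(envelope["op"], "UNKNOWN"),
        "row": row,
        "changed_columns": changed,
    }


# Sample update event with made-up values
example = {
    "before": {"order_id": "o-1", "amount": 10.0},
    "after": {"order_id": "o-1", "amount": 12.5},
    "source": {"table": "orders", "ts_ms": 1700000000000},
    "op": "u",
    "ts_ms": 1700000000123,
}
summary = summarize_change(example)
print(summary)
```

Whether the before image is populated depends on the source database settings (for PostgreSQL, the table's REPLICA IDENTITY), so the sketch treats it as optional.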
PostgreSQL CDC Setup
Debezium Connector Configuration
{
  "name": "postgres-cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres.example.com",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "${secrets:postgres-password}",
    "database.dbname": "orders_db",
    "topic.prefix": "cdc",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.name": "debezium_publication",
    "schema.include.list": "public",
    "table.include.list": "public.orders,public.customers,public.products",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "transforms": "route,unwrap",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$1.$3",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.fields": "op,table,lsn,source.ts_ms",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "snapshot.mode": "initial",
    "snapshot.locking.mode": "none",
    "heartbeat.interval.ms": "10000",
    "tombstones.on.delete": "true"
  }
}
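A connector definition like this is typically submitted to the Kafka Connect REST API. The sketch below uses PUT /connectors/{name}/config, which creates the connector or updates it if it already exists, and takes the flat config map (the contents of "config", without the outer "name" wrapper). The worker URL http://connect:8083 and the helper names are assumptions for illustration; only the request is built here, nothing is sent until register_connector is called.

```python
import json
import urllib.request


def build_connector_request(connect_url: str, name: str, config: dict) -> urllib.request.Request:
    """Build an idempotent create-or-update request for a connector."""
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors/{name}/config",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


def register_connector(connect_url: str, name: str, config: dict) -> dict:
    """Send the request and return the parsed JSON response from the worker."""
    request = build_connector_request(connect_url, name, config)
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.load(response)


# Abbreviated config for illustration; a real one needs the full property set
sample_config = {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "topic.prefix": "cdc",
}
request = build_connector_request("http://connect:8083/", "postgres-cdc", sample_config)
print(request.get_method(), request.full_url)
```

Using PUT rather than POST /connectors keeps deployments repeatable: re-running the same script after a config change updates the connector instead of failing with a conflict.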
PostgreSQL Setup
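-- Prerequisite: logical decoding requires wal_level = logical in postgresql.conf (server restart needed)
-- Create Debezium user
CREATE USER debezium WITH REPLICATION PASSWORD 'password';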
-- Grant privileges
GRANT CONNECT ON DATABASE orders_db TO debezium;
GRANT USAGE ON SCHEMA public TO debezium;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
-- Create replication slot (optional: Debezium creates the slot on first start if it does not exist)
SELECT pg_create_logical_replication_slot('debezium_slot', 'pgoutput');
-- Create publication
CREATE PUBLICATION debezium_publication FOR TABLE orders, customers, products;
MySQL CDC Setup
{
  "name": "mysql-cdc",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql.example.com",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "${secrets:mysql-password}",
    "database.server.id": "184054",
    "database.include.list": "orders_db",
    "table.include.list": "orders_db.orders,orders_db.customers",
    "database.ssl.mode": "required",
    "database.ssl.truststore": "/etc/kafka/secrets/truststore.jks",
    "database.ssl.truststore.password": "${secrets:truststore-password}",
    "topic.prefix": "cdc",
    "schema.history.internal.kafka.bootstrap.servers": "kafka1:9092",
    "schema.history.internal.kafka.topic": "schema-changes.orders_db",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "transforms": "route,unwrap",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$1.$3",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.fields": "op,table,ts_ms",
    "snapshot.mode": "initial",
    "snapshot.locking.mode": "none",
    "gtid.source.includes": ".*"
  }
}
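The RegexRouter transform in both configurations rewrites Debezium's default topic names (prefix.schema.table or prefix.database.table). The following sketch mimics that behavior in Python to show what a given replacement produces; route_topic is a hypothetical helper, not part of Kafka Connect. The consumers in this article subscribe to cdc.orders, which corresponds to a replacement of $1.$3, whereas $3 alone yields just the table name.

```python
import re

# The regex from the connector config, after JSON unescaping
TOPIC_PATTERN = r"([^.]+)\.([^.]+)\.([^.]+)"


def route_topic(topic: str, regex: str, replacement: str) -> str:
    """Rename a topic the way RegexRouter does: only when the whole name matches."""
    match = re.fullmatch(regex, topic)
    if match is None:
        return topic  # Non-matching topics keep their name
    # Translate Java-style $1 group references into Python's \g<1> form
    py_replacement = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
    return match.expand(py_replacement)


print(route_topic("cdc.public.orders", TOPIC_PATTERN, "$3"))
print(route_topic("cdc.public.orders", TOPIC_PATTERN, "$1.$3"))
print(route_topic("cdc.orders_db.customers", TOPIC_PATTERN, "$1.$3"))
```

Note that dropping the schema or database segment makes tables with the same name in different schemas collide on one topic, so this routing is only safe when table names are unique across the captured schemas.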
Java Consumer for CDC Events
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class CDCConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka1:9092");
        props.put("group.id", "cdc-processor");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://schema-registry:8081");
        // GenericRecord values need the generic Avro reader, so this must be false
        props.put("specific.avro.reader", "false");

        // Keys are Avro structs holding the primary key column(s), not plain strings
        try (KafkaConsumer<Object, GenericRecord> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("cdc.orders"));
            while (true) {
                ConsumerRecords<Object, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<Object, GenericRecord> record : records) {
                    GenericRecord value = record.value();
                    if (value == null) {
                        continue; // Tombstone record emitted after a delete
                    }
                    // ExtractNewRecordState with add.fields=op stores the operation
                    // in the value field __op, not in a record header
                    String operation = String.valueOf(value.get("__op"));
                    switch (operation) {
                        case "c": // Create
                        case "r": // Snapshot read of an existing row
                            handleInsert(value);
                            break;
                        case "u": // Update
                            handleUpdate(value);
                            break;
                        case "d": // Delete
                            handleDelete(value);
                            break;
                        default:
                            System.err.println("Unknown operation: " + operation);
                    }
                }
                // Commit after the batch is processed (at-least-once delivery)
                consumer.commitSync();
            }
        }
    }

    private static void handleInsert(GenericRecord record) {
        System.out.println("INSERT: " + record.get("order_id"));
        // Process new record
    }

    private static void handleUpdate(GenericRecord record) {
        System.out.println("UPDATE: " + record.get("order_id"));
        // Process updated record
    }

    private static void handleDelete(GenericRecord record) {
        System.out.println("DELETE: " + record.get("order_id"));
        // Process deletion
    }
}
Python CDC Consumer
from confluent_kafka import KafkaError
# AvroConsumer is the legacy Avro API of confluent-kafka; newer releases favor
# AvroDeserializer from confluent_kafka.schema_registry.avro
from confluent_kafka.avro import AvroConsumer


class CDCProcessor:
    def __init__(self, bootstrap_servers, schema_registry_url):
        self.consumer = AvroConsumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': 'cdc-processor',
            'enable.auto.commit': False,
            'schema.registry.url': schema_registry_url
        })
        self.consumer.subscribe(['cdc.orders'])

    def process(self):
        try:
            while True:
                msg = self.consumer.poll(1.0)
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    print(f"Error: {msg.error()}")
                    break
                record = msg.value()
                if record is None:
                    continue  # Tombstone record emitted after a delete
                # ExtractNewRecordState with add.fields=op stores the operation
                # in the value field __op, not in a message header
                operation = record.get('__op')
                if operation in ('c', 'r'):  # Create or snapshot read
                    self.handle_insert(record)
                elif operation == 'u':
                    self.handle_update(record)
                elif operation == 'd':
                    self.handle_delete(record)
                # Commit synchronously once the message has been processed
                self.consumer.commit(message=msg, asynchronous=False)
        finally:
            self.consumer.close()

    def handle_insert(self, record):
        print(f"INSERT: {record['order_id']}")
        # Sync to Elasticsearch, cache, etc.

    def handle_update(self, record):
        print(f"UPDATE: {record['order_id']}")
        # Update downstream systems

    def handle_delete(self, record):
        print(f"DELETE: {record['order_id']}")
        # Soft delete or remove from downstream


if __name__ == '__main__':
    # Run processor
    processor = CDCProcessor('kafka1:9092', 'http://schema-registry:8081')
    processor.process()
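Because offsets are committed after processing, a crash can cause the same change event to be delivered again, so the handlers should be idempotent. The sketch below shows one possible approach under stated assumptions: records are flattened by ExtractNewRecordState, carry the __op and __lsn fields added by add.fields in the PostgreSQL configuration, and are keyed by order_id. The in-memory dict stands in for a real downstream store, and apply_change is a hypothetical helper.

```python
from typing import Any, Dict


def apply_change(store: Dict[str, Dict[str, Any]], record: Dict[str, Any]) -> bool:
    """Apply one flattened change event; return False if it was a duplicate or stale."""
    key = record["order_id"]
    lsn = record["__lsn"]
    current = store.get(key)
    # Skip events at or below the log position already applied for this row
    if current is not None and current["__lsn"] >= lsn:
        return False
    if record.get("__op") == "d" or record.get("__deleted") == "true":
        # Keep a soft-delete marker so replayed older events are still ignored
        store[key] = {"order_id": key, "__lsn": lsn, "deleted": True}
    else:
        store[key] = {**record, "deleted": False}
    return True


store: Dict[str, Dict[str, Any]] = {}
created = {"order_id": "o-1", "amount": 10.0, "__op": "c", "__lsn": 100}
updated = {"order_id": "o-1", "amount": 12.5, "__op": "u", "__lsn": 120}
print(apply_change(store, created))
print(apply_change(store, created))  # redelivery of the same event
print(apply_change(store, updated))
```

Comparing log positions per key is only one option; an upsert keyed on the primary key is often sufficient when events for a key arrive in order on a single partition.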
Schema Evolution with CDC
// Original schema (v1)
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "amount", "type": "double"}
  ]
}
// Evolved schema (v2) - adds field
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "currency", "type": "string", "default": "USD"}
  ]
}
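Key Insight: Debezium detects table schema changes and, with the Avro converter, a new schema version is registered in Schema Registry automatically. Whether consumers on the old schema can still read the data depends on the change being compatible: adding a field with a default (as in v2) is safe, while dropping a field that has no default or changing a field's type can break them.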
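The role of the default value can be illustrated with a simplified version of Avro's reader/writer schema resolution. This sketch only handles flat records and ignores type promotion, unions, and aliases, so it is an approximation of the rule rather than a replacement for an Avro library; resolve is a hypothetical helper.

```python
from typing import Any, Dict

ORDER_V1 = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "amount", "type": "double"},
    ],
}

ORDER_V2 = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "currency", "type": "string", "default": "USD"},
    ],
}


def resolve(record: Dict[str, Any], reader_schema: Dict[str, Any]) -> Dict[str, Any]:
    """Project a decoded record onto the reader's schema (simplified resolution)."""
    resolved = {}
    for field in reader_schema["fields"]:
        name = field["name"]
        if name in record:
            resolved[name] = record[name]
        elif "default" in field:
            # Field missing in the written data: fall back to the reader's default
            resolved[name] = field["default"]
        else:
            raise ValueError(f"field {name!r} is missing and has no default")
    # Fields the reader does not declare are dropped
    return resolved


old_record = {"order_id": "o-1", "amount": 10.0}
new_record = {"order_id": "o-2", "amount": 5.0, "currency": "EUR"}
print(resolve(old_record, ORDER_V2))  # v2 reader, v1 data: default fills currency
print(resolve(new_record, ORDER_V1))  # v1 reader, v2 data: currency is ignored
```

Without the default on currency, the first call would fail, which is the situation Schema Registry's compatibility checks are meant to reject at registration time.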
Monitoring CDC
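import time

import requests
from prometheus_client import Gauge, Counter, start_http_server

cdc_lag = Gauge(
    'kafka_connect_cdc_lag',
    'CDC replication lag in milliseconds',
    ['connector', 'database', 'table']
)
cdc_operations = Counter(
    'kafka_connect_cdc_operations_total',
    'Total CDC operations',
    ['connector', 'operation', 'table']
)
cdc_errors = Counter(
    'kafka_connect_cdc_errors_total',
    'Total CDC errors',
    ['connector', 'error_type']
)
# Added for this example: 1 if a task is RUNNING, 0 otherwise
cdc_task_running = Gauge(
    'kafka_connect_cdc_task_running',
    'Whether a connector task is RUNNING (1) or not (0)',
    ['connector', 'task']
)


def monitor_cdc_connector(connector_name, poll_interval_s=10):
    """Poll the Kafka Connect REST API and export connector task health"""
    while True:
        try:
            response = requests.get(
                f'http://connect:8083/connectors/{connector_name}/status',
                timeout=5
            )
            response.raise_for_status()
            status = response.json()
            for task in status.get('tasks', []):
                running = task['state'] == 'RUNNING'
                cdc_task_running.labels(
                    connector=connector_name, task=str(task['id'])
                ).set(1 if running else 0)
        except requests.RequestException:
            cdc_errors.labels(connector=connector_name, error_type='status_request').inc()
        # The status endpoint reports state only. cdc_lag and cdc_operations have to be
        # fed from elsewhere, e.g. Debezium's JMX metrics (MilliSecondsBehindSource)
        time.sleep(poll_interval_s)


if __name__ == '__main__':
    start_http_server(8000)  # Metrics port is an assumption for this example
    monitor_cdc_connector('postgres-cdc')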
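One way to obtain a value for a lag gauge on the consumer side is to compare the wall clock with the source timestamp carried in each event. The sketch below assumes flattened records that include __source_ts_ms, the field ExtractNewRecordState produces for source.ts_ms in add.fields; replication_lag_ms is a hypothetical helper, and the result is only as accurate as the clock synchronization between the database host and the consumer.

```python
import time
from typing import Any, Dict, Optional


def replication_lag_ms(record: Dict[str, Any], now_ms: Optional[int] = None) -> Optional[int]:
    """Return end-to-end lag in milliseconds, or None if the record has no source timestamp."""
    source_ts = record.get("__source_ts_ms")
    if source_ts is None:
        return None
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    # Clamp at zero so small clock skew does not produce negative lag
    return max(0, now_ms - int(source_ts))


sample = {"order_id": "o-1", "__op": "u", "__source_ts_ms": 1700000000000}
print(replication_lag_ms(sample, now_ms=1700000000250))
```

A consumer could pass this value to a Prometheus gauge for each processed record. Events produced during the initial snapshot may report a large lag because their source timestamps do not reflect when the rows were originally written.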
Follow-Up Questions
- What is the difference between logical replication and CDC?
- How does Debezium handle schema evolution in databases?
- Explain the purpose of the ExtractNewRecordState SMT.
- How would you implement exactly-once CDC from PostgreSQL to Kafka?
- What are the performance implications of CDC on the source database?