
CDC with Kafka Connect

Difficulty: Senior Level | Companies: LinkedIn, Uber, Netflix, Spotify, Confluent

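Change Data Capture (CDC) captures row-level changes (inserts, updates, and deletes) from a database, typically by reading its transaction log, and streams them to Kafka. Debezium is a widely used family of open-source CDC source connectors for Kafka Connect.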

CDC Architecture

Architecture Diagram
Database → Debezium Connector → Kafka → Consumers
    │
    ├── Transaction Log (PostgreSQL WAL, MySQL binlog)
    ├── Debezium Captures: INSERT, UPDATE, DELETE
    └── Schema Registry Stores: Avro key/value schemas (the before/after states travel in the Kafka records)

Debezium Flow

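1. Debezium connects to the database
2. Snapshots existing rows if configured (snapshot.mode), then reads the transaction log (WAL/binlog)
3. Captures row changes with before/after images
4. Converts each change to a Kafka record with:
   - Key: Primary key column(s)
   - Value: An envelope containing before (its availability depends on database settings, e.g. PostgreSQL REPLICA IDENTITY), after, source metadata, op, and ts_ms
   - Operation type: the op field inside the value, one of c (create), u (update), d (delete), r (snapshot read); it is not a header unless an SMT copies it there (e.g. ExtractNewRecordState's add.headers)
5. Publishes to a Kafka topic named <topic.prefix>.<schema>.<table> (PostgreSQL) or <topic.prefix>.<database>.<table> (MySQL)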
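To make the envelope from step 4 concrete, here is a minimal Python sketch that models an unflattened Debezium update event as a plain dict and summarizes it. The field layout (before, after, source, op, ts_ms) follows Debezium's change-event envelope; the column names and values are hypothetical, and a real event reaches a consumer through a converter (Avro in this section) rather than as a Python dict.

```python
# Hypothetical unflattened Debezium change event for an UPDATE on public.orders.
# Column names and values are illustrative only.
update_event = {
    "before": {"order_id": "o-1", "amount": 10.0},  # may be None, depending on REPLICA IDENTITY
    "after": {"order_id": "o-1", "amount": 12.5},
    "source": {"connector": "postgresql", "schema": "public", "table": "orders", "lsn": 24023128},
    "op": "u",
    "ts_ms": 1700000000123,
}

OP_NAMES = {"c": "create", "u": "update", "d": "delete", "r": "snapshot read"}


def describe(event: dict) -> str:
    """Return a one-line summary of a Debezium envelope."""
    op = event["op"]
    # Deletes carry the old row in "before"; every other operation carries the new row in "after".
    row = event["before"] if op == "d" else event["after"]
    table = f'{event["source"]["schema"]}.{event["source"]["table"]}'
    return f"{OP_NAMES[op]} on {table}: {row}"


print(describe(update_event))
```

Choosing the row by op, rather than always reading after, matters because after is null for delete events.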

PostgreSQL CDC Setup

Debezium Connector Configuration

{
  "name": "postgres-cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres.example.com",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "${secrets:postgres-password}",
    "database.dbname": "orders_db",
    
    "topic.prefix": "cdc",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.name": "debezium_publication",
    
    "schema.include.list": "public",
    "table.include.list": "public.orders,public.customers,public.products",
    
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    
    "transforms": "route,unwrap",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$1.$3",
    
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.fields": "op,table,lsn,source.ts_ms",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    
    "snapshot.mode": "initial",
    "snapshot.locking.mode": "none",
    
    "heartbeat.interval.ms": "10000",
    "tombstones.on.delete": true
  }
}
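The two transforms are easier to picture with an emulation. The Python sketch below imitates what they do to a single record: re.sub stands in for RegexRouter (a Java replacement such as $1.$3 is written \1.\3 in Python), and flatten is a simplified model of ExtractNewRecordState with add.fields and delete.handling.mode=rewrite. It is an illustration, not Debezium's implementation, and ignores schemas, tombstones, and the SMT's other options. The replacement $1.$3 keeps the prefix and yields cdc.orders, the topic the consumers later in this section subscribe to; a bare $3 would yield orders.

```python
import re

# RegexRouter equivalent. Java replacement "$1.$3" is written r"\1.\3" for Python's re.sub.
ROUTE_REGEX = r"([^.]+)\.([^.]+)\.([^.]+)"


def route(topic: str, replacement: str = r"\1.\3") -> str:
    """Rename a topic like RegexRouter: only names the regex matches in full are changed."""
    if re.fullmatch(ROUTE_REGEX, topic):
        return re.sub(ROUTE_REGEX, replacement, topic)
    return topic


def flatten(event: dict, add_fields=("op", "table", "lsn", "source.ts_ms")) -> dict:
    """Simplified model of ExtractNewRecordState with delete.handling.mode=rewrite."""
    deleted = event["op"] == "d"
    # Deletes are rewritten to the old row; all other operations keep the new row.
    row = dict(event["before"] if deleted else event["after"])
    for field in add_fields:
        if field.startswith("source."):
            value = event["source"][field[len("source."):]]
        elif field in ("op", "ts_ms"):
            value = event[field]  # envelope-level fields
        else:
            value = event["source"][field]  # other names resolve to the source block
        # "source.ts_ms" becomes "__source_ts_ms", "op" becomes "__op", and so on.
        row["__" + field.replace(".", "_")] = value
    row["__deleted"] = "true" if deleted else "false"
    return row


event = {
    "before": None,
    "after": {"order_id": "o-1", "amount": 12.5},
    "source": {"table": "orders", "lsn": 24023128, "ts_ms": 1700000000000},
    "op": "c",
    "ts_ms": 1700000000123,
}
print(route("cdc.public.orders"))
print(flatten(event))
```

Topics that do not match the regex in full, such as Debezium's heartbeat topic, pass through unchanged.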

PostgreSQL Setup

-- Prerequisite: set wal_level = logical in postgresql.conf (requires a server restart)

-- Create Debezium user
CREATE USER debezium WITH REPLICATION PASSWORD 'password';

-- Grant privileges
GRANT CONNECT ON DATABASE orders_db TO debezium;
GRANT USAGE ON SCHEMA public TO debezium;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;

-- Create replication slot
SELECT pg_create_logical_replication_slot('debezium_slot', 'pgoutput');

-- Create publication (requires CREATE on the database and ownership of the listed tables, or a superuser)
CREATE PUBLICATION debezium_publication FOR TABLE orders, customers, products;
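Because the publication is created by hand, it must list the same tables as the connector's table.include.list: pgoutput only sends changes for tables in the publication. A small, hypothetical pre-deployment check in Python can catch drift between the two. The function name and its inputs are illustrative; in practice the publication side could come from SELECT schemaname, tablename FROM pg_publication_tables WHERE pubname = 'debezium_publication'.

```python
def publication_drift(table_include_list: str, publication_tables, default_schema="public"):
    """Compare a connector's table.include.list with a publication's tables.

    Returns (missing_from_publication, extra_in_publication) as sorted lists.
    Entries are treated as literal schema.table names here, although Debezium
    actually interprets table.include.list entries as regular expressions.
    """
    wanted = {t.strip() for t in table_include_list.split(",") if t.strip()}
    # Publication tables may be written without a schema, as in the SQL above.
    published = {t if "." in t else f"{default_schema}.{t}" for t in publication_tables}
    return sorted(wanted - published), sorted(published - wanted)


missing, extra = publication_drift(
    "public.orders,public.customers,public.products",
    ["orders", "customers", "products"],
)
print(missing, extra)
```

An empty pair means the connector and the publication agree.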

MySQL CDC Setup

{
  "name": "mysql-cdc",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql.example.com",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "${secrets:mysql-password}",
    "database.server.id": "184054",
    "database.include.list": "orders_db",
    "table.include.list": "orders_db.orders,orders_db.customers",
    
    "database.ssl.mode": "required",
    "database.ssl.truststore": "/etc/kafka/secrets/truststore.jks",
    "database.ssl.truststore.password": "${secrets:truststore-password}",
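    
    "topic.prefix": "cdc",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",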
    
    "transforms": "route,unwrap",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3",
    
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.fields": "op,table,ts_ms",
    
    "snapshot.mode": "initial",
    "snapshot.locking.mode": "none",
    "gtid.source.includes": ".*",
    
    "schema.history.internal.kafka.bootstrap.servers": "kafka1:9092",
    "schema.history.internal.kafka.topic": "schema-history.orders_db"
  }
}
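The MySQL connector reads row-based binlog events, so the server needs binary logging enabled with binlog_format=ROW and binlog_row_image=FULL. The binlog file base name (for example mysql-bin) is a server setting (log_bin in the MySQL configuration), not a connector property. The sketch below checks values you might collect with SHOW GLOBAL VARIABLES; the function name and the dict input are assumptions for illustration.

```python
# Server settings the Debezium MySQL connector depends on (from SHOW GLOBAL VARIABLES).
REQUIRED = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
}


def check_mysql_cdc_settings(variables: dict) -> list:
    """Return human-readable problems; an empty list means the basics are in place."""
    problems = []
    for name, expected in REQUIRED.items():
        actual = str(variables.get(name, "<missing>")).upper()
        if actual != expected:
            problems.append(f"{name} is {actual}, expected {expected}")
    # gtid.source.includes is only meaningful when the server uses GTIDs.
    if str(variables.get("gtid_mode", "OFF")).upper() != "ON":
        problems.append("gtid_mode is not ON; gtid.source.includes will have no effect")
    return problems


print(check_mysql_cdc_settings({
    "log_bin": "ON", "binlog_format": "ROW", "binlog_row_image": "FULL", "gtid_mode": "ON",
}))
```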

Java Consumer for CDC Events

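import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class CDCConsumer {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka1:9092");
        props.put("group.id", "cdc-processor");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://schema-registry:8081");
        // GenericRecord needs specific.avro.reader=false (true requires generated Avro classes)
        props.put("specific.avro.reader", "false");
        
        // Keys are Avro records holding the primary key, so both types are GenericRecord
        try (KafkaConsumer<GenericRecord, GenericRecord> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("cdc.orders"));
            
            while (true) {
                ConsumerRecords<GenericRecord, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
                
                for (ConsumerRecord<GenericRecord, GenericRecord> record : records) {
                    GenericRecord value = record.value();
                    if (value == null) {
                        continue;  // Tombstone: nothing to process
                    }
                    
                    // ExtractNewRecordState with add.fields=op stores the operation in "__op";
                    // Avro strings arrive as Utf8, hence String.valueOf
                    String operation = String.valueOf(value.get("__op"));
                    
                    switch (operation) {
                        case "c":  // Create
                        case "r":  // Snapshot read: treat like an insert
                            handleInsert(value);
                            break;
                        case "u":  // Update
                            handleUpdate(value);
                            break;
                        case "d":  // Delete (rewritten to the old row with __deleted=true)
                            handleDelete(value);
                            break;
                        default:
                            System.err.println("Unknown operation: " + operation);
                    }
                }
                
                // Commit only after the whole batch is processed (at-least-once)
                consumer.commitSync();
            }
        }
    }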
    
    private static void handleInsert(GenericRecord record) {
        System.out.println("INSERT: " + record.get("order_id"));
        // Process new record
    }
    
    private static void handleUpdate(GenericRecord record) {
        System.out.println("UPDATE: " + record.get("order_id"));
        // Process updated record
    }
    
    private static void handleDelete(GenericRecord record) {
        System.out.println("DELETE: " + record.get("order_id"));
        // Process deletion
    }
}

Python CDC Consumer

from confluent_kafka import KafkaError
# Legacy (deprecated) Avro API; newer code uses confluent_kafka.schema_registry.avro.AvroDeserializer
from confluent_kafka.avro import AvroConsumer

class CDCProcessor:
    def __init__(self, bootstrap_servers, schema_registry_url):
        self.consumer = AvroConsumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': 'cdc-processor',
            'enable.auto.commit': False,
            'schema.registry.url': schema_registry_url
        })
        
        self.consumer.subscribe(['cdc.orders'])
    
    def process(self):
        try:
            while True:
                msg = self.consumer.poll(1.0)
                
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    print(f"Error: {msg.error()}")
                    break
                
                record = msg.value()
                if record is None:  # Tombstone: nothing to process
                    continue
                
                # ExtractNewRecordState with add.fields=op stores the operation in '__op'
                operation = record.get('__op')
                
                if operation in ('c', 'r'):  # 'r' = snapshot read, treated like an insert
                    self.handle_insert(record)
                elif operation == 'u':
                    self.handle_update(record)
                elif operation == 'd':
                    self.handle_delete(record)
                
                # Synchronously commit this message's offset after processing (at-least-once)
                self.consumer.commit(message=msg, asynchronous=False)
        finally:
            self.consumer.close()
    
    def handle_insert(self, record):
        print(f"INSERT: {record['order_id']}")
        # Sync to Elasticsearch, cache, etc.
    
    def handle_update(self, record):
        print(f"UPDATE: {record['order_id']}")
        # Update downstream systems
    
    def handle_delete(self, record):
        print(f"DELETE: {record['order_id']}")
        # Soft delete or remove from downstream

# Run processor
if __name__ == '__main__':
    processor = CDCProcessor('kafka1:9092', 'http://schema-registry:8081')
    processor.process()
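Both consumers commit after processing, which gives at-least-once delivery: after a crash or rebalance, some events are delivered again. One common way to make replays harmless is to apply events idempotently. The sketch below keeps an in-memory view keyed by order_id and ignores any event whose __lsn (added by the PostgreSQL connector's add.fields setting) is not newer than the last one applied for that key. The class name and the in-memory dicts are hypothetical stand-ins for a real downstream store such as a database table or search index.

```python
class OrderView:
    """Hypothetical in-memory materialized view that tolerates replayed CDC events."""

    def __init__(self):
        self.rows = {}      # order_id -> latest row (without __ metadata fields)
        self.last_lsn = {}  # order_id -> LSN of the last applied event

    def apply(self, record: dict) -> bool:
        """Apply one flattened event; return False if it was a duplicate or stale event."""
        key, lsn = record["order_id"], record["__lsn"]
        if lsn <= self.last_lsn.get(key, -1):
            return False  # already applied (replay) or older than what we hold
        # Remember the LSN even for deletes so a replayed older event cannot resurrect the row.
        self.last_lsn[key] = lsn
        if record.get("__deleted") == "true" or record["__op"] == "d":
            self.rows.pop(key, None)
        else:  # c, u, and r (snapshot read) are all upserts
            self.rows[key] = {k: v for k, v in record.items() if not k.startswith("__")}
        return True


view = OrderView()
create = {"order_id": "o-1", "amount": 10.0, "__op": "c", "__lsn": 100, "__deleted": "false"}
update = {"order_id": "o-1", "amount": 12.5, "__op": "u", "__lsn": 200, "__deleted": "false"}
view.apply(create)
view.apply(update)
view.apply(create)  # replayed after a rebalance: ignored
print(view.rows)
```

Keying the guard on the source LSN rather than on Kafka offsets keeps it valid even if the same change is re-published, for example after a connector restart.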

Schema Evolution with CDC

// Original schema (v1)
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "amount", "type": "double"}
  ]
}

// Evolved schema (v2) - adds field
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "currency", "type": "string", "default": "USD"}
  ]
}

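Key Insight: Debezium detects schema changes in captured tables and emits later events with the updated schema, which the Avro converter registers as a new version in Schema Registry. Whether the registry accepts that version depends on the subject's compatibility mode (BACKWARD by default). A new nullable column, or one with a database default, becomes an optional or defaulted field as in v2 above; that change is compatible in both directions, so consumers still on v1 ignore currency and consumers on v2 read v1 records with currency set to "USD". A new NOT NULL column without a default is not BACKWARD compatible, and the registry rejects the new version.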
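The add-and-remove-field rule can be expressed compactly. The following is a deliberately simplified Python model of Schema Registry's BACKWARD and FORWARD checks that looks only at field names and defaults; it ignores type changes, unions, aliases, and nested records, so it illustrates the rule rather than replacing the registry's own check. The schemas are the v1 and v2 Order records above plus a hypothetical v2 variant without a default.

```python
def missing_defaults(reader: dict, writer: dict) -> list:
    """Fields the reader expects, the writer did not write, and that have no default."""
    written = {f["name"] for f in writer["fields"]}
    return [f["name"] for f in reader["fields"]
            if f["name"] not in written and "default" not in f]


def is_backward_compatible(new: dict, old: dict) -> bool:
    # BACKWARD: consumers on the new schema can read data written with the old one.
    return not missing_defaults(reader=new, writer=old)


def is_forward_compatible(new: dict, old: dict) -> bool:
    # FORWARD: consumers still on the old schema can read data written with the new one.
    return not missing_defaults(reader=old, writer=new)


v1 = {"type": "record", "name": "Order", "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "amount", "type": "double"},
]}
v2 = {"type": "record", "name": "Order", "fields": v1["fields"] + [
    {"name": "currency", "type": "string", "default": "USD"},
]}
# Hypothetical variant: the same new field without a default.
v2_no_default = {"type": "record", "name": "Order", "fields": v1["fields"] + [
    {"name": "currency", "type": "string"},
]}

print(is_backward_compatible(v2, v1), is_forward_compatible(v2, v1))
print(is_backward_compatible(v2_no_default, v1), is_forward_compatible(v2_no_default, v1))
```

The default on currency is what turns a FORWARD-only change into one that is compatible in both directions.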

Monitoring CDC

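import time

import requests
from prometheus_client import Counter, Gauge, start_http_server

cdc_lag = Gauge(
    'kafka_connect_cdc_lag',
    'CDC replication lag in milliseconds',
    ['connector', 'database', 'table']
)

cdc_operations = Counter(
    'kafka_connect_cdc_operations_total',
    'Total CDC operations',
    ['connector', 'operation', 'table']
)

cdc_errors = Counter(
    'kafka_connect_cdc_errors_total',
    'Total CDC errors',
    ['connector', 'error_type']
)

cdc_task_running = Gauge(
    'kafka_connect_cdc_task_running',
    '1 if the connector task is RUNNING, otherwise 0',
    ['connector', 'task']
)

def monitor_cdc_connector(connector_name, connect_url='http://connect:8083', metrics_port=8000):
    """Poll the Kafka Connect REST API and export connector health to Prometheus.

    The status endpoint reports only connector and task state. Debezium exposes
    replication lag as the JMX streaming metric MilliSecondsBehindSource, which
    must be collected separately (e.g. with a JMX exporter) to populate cdc_lag.
    """
    start_http_server(metrics_port)  # Serve /metrics for Prometheus to scrape
    
    while True:
        try:
            response = requests.get(
                f'{connect_url}/connectors/{connector_name}/status', timeout=5
            )
            response.raise_for_status()
            status = response.json()
        except requests.RequestException as exc:
            cdc_errors.labels(connector=connector_name, error_type=type(exc).__name__).inc()
        else:
            # Each task entry looks like {"id": 0, "state": "RUNNING", "worker_id": "..."}
            for task in status.get('tasks', []):
                running = 1 if task['state'] == 'RUNNING' else 0
                cdc_task_running.labels(connector=connector_name, task=str(task['id'])).set(running)
        
        time.sleep(10)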
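Without JMX, end-to-end replication lag can be approximated on the consumer side by comparing the __source_ts_ms field (added by add.fields in the PostgreSQL configuration, and holding the time the change was made in the database) with the consumer's clock when the event is processed. The sketch assumes flattened records as dicts and reasonably synchronized clocks; the function name is hypothetical, and in a real consumer the result would feed the cdc_lag gauge via cdc_lag.labels(connector=..., database=..., table=...).set(lag).

```python
import time
from typing import Optional


def replication_lag_ms(record: dict, now_ms: Optional[int] = None) -> Optional[int]:
    """Approximate end-to-end lag: consumer wall clock minus database change time.

    Returns None when the record has no __source_ts_ms (e.g. add.fields omits it).
    Clock skew between hosts shows up directly in the result, so the value is
    clamped at zero rather than reported as negative lag.
    """
    source_ts = record.get("__source_ts_ms")
    if source_ts is None:
        return None
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return max(0, now_ms - source_ts)


record = {"order_id": "o-1", "__op": "u", "__source_ts_ms": 1_700_000_000_000}
print(replication_lag_ms(record, now_ms=1_700_000_000_250))
```

During an initial snapshot this value reflects snapshot time rather than true replication delay, so it is most meaningful once the connector is streaming.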

Follow-Up Questions

  1. What is the difference between logical replication and CDC?
  2. How does Debezium handle schema evolution in databases?
  3. Explain the purpose of the ExtractNewRecordState SMT.
  4. How would you implement exactly-once CDC from PostgreSQL to Kafka?
  5. What are the performance implications of CDC on the source database?
