Change Data Capture: Debezium, Log-based, Trigger-based
Capturing and streaming database changes in real time
Interview Question
"Design a CDC pipeline that: (1) captures changes from PostgreSQL, (2) handles schema evolution, (3) provides exactly-once delivery, (4) supports 100K changes/second, (5) allows point-in-time recovery. Compare log-based vs trigger-based approaches and justify your choice."
Difficulty: Hard | Frequently asked at LinkedIn, Uber, Netflix, Stripe
Theoretical Foundation
What is Change Data Capture (CDC)?
CDC captures changes (INSERT, UPDATE, DELETE) made to a database and delivers them to downstream systems.
┌─────────────────────────────────────────────────────────────────┐
│                        CDC Architecture                         │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Source Database ──▶ CDC Connector ──▶ Message Queue ──▶ Sink   │
│                                                                 │
│  PostgreSQL          Debezium          Kafka             S3/    │
│  MySQL               Maxwell           Kinesis           Delta  │
│  SQL Server          GoldenGate        EventBridge       Lake   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
CDC Approaches
1. Log-based CDC (Debezium)
Reads the database's transaction log (the write-ahead log, or WAL, in PostgreSQL) to capture committed changes.
┌─────────────────────────────────────────────────────────────────┐
│                    Log-based CDC (Debezium)                     │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  PostgreSQL WAL ──▶ Debezium Connector ──▶ Kafka                │
│                                                                 │
│  WAL (Write-Ahead Log), logically decoded (simplified):         │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │ LSN 100: INSERT INTO orders (id, amount) VALUES (1, 100) │   │
│  │ LSN 101: UPDATE orders SET amount = 150 WHERE id = 1     │   │
│  │ LSN 102: DELETE FROM orders WHERE id = 2                 │   │
│  │ LSN 103: COMMIT                                          │   │
│  └──────────────────────────────────────────────────────────┘   │
│                                                                 │
│  Debezium reads the WAL and emits Kafka messages:               │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │ {"op": "c", "before": null,                              │   │
│  │             "after": {"id": 1, "amount": 100}}           │   │
│  │ {"op": "u", "before": {"id": 1, "amount": 100},          │   │
│  │             "after": {"id": 1, "amount": 150}}           │   │
│  │ {"op": "d", "before": {"id": 2}, "after": null}          │   │
│  └──────────────────────────────────────────────────────────┘   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
The LSNs are simplified for readability; real LSNs are byte positions written like 0/16B3748. The update's "before" image is only populated when the table uses REPLICA IDENTITY FULL: with the default replica identity, Debezium emits "before": null for updates and only the primary key for deletes.
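Pros:
- Minimal load on the source: no triggers or polling queries in the write path
- Captures every committed change, including deletes
- Low latency (typically sub-second)
- Preserves commit order, and the order of changes within each transaction
Cons:
- Requires a database-specific connector and setup (for PostgreSQL: wal_level = logical, a replication slot, and a user with replication privileges)
- Streams every captured column, including sensitive data, unless columns are filtered or masked
- A stalled or abandoned replication slot forces PostgreSQL to retain WAL, which can fill the source's disk
- Log retention bounds the recovery window: if the slot is dropped or invalidated, the connector must re-snapshot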
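To make the envelope concrete, here is a minimal sketch that applies Debezium-style events to an in-memory replica keyed by primary key. It assumes the JSON envelope shown in the diagram (op, before, after) and a single key column named id; apply_event and the dict-based replica are illustrative, not part of Debezium.

```python
import json
from typing import Any, Dict, Optional

Row = Dict[str, Any]


def apply_event(replica: Dict[Any, Row], event_json: Optional[str]) -> None:
    """Apply one Debezium-style change event to an in-memory replica.

    Assumes a JSON envelope with "op", "before", and "after" fields and a
    primary key column named "id" (both assumptions for this sketch).
    """
    if event_json is None:
        # Tombstone (null value) that follows a delete for log compaction: nothing to apply
        return
    event = json.loads(event_json)
    op = event["op"]
    if op in ("c", "r", "u"):
        # Create, snapshot read, and update all carry the new row state in "after"
        row = event["after"]
        replica[row["id"]] = row
    elif op == "d":
        # Deletes carry at least the primary key in "before"
        replica.pop(event["before"]["id"], None)
    else:
        # e.g. "t" (truncate) events, if enabled, need table-level handling
        raise ValueError(f"unsupported op: {op!r}")


replica: Dict[Any, Row] = {}
messages = [
    '{"op": "c", "before": null, "after": {"id": 1, "amount": 100}}',
    '{"op": "c", "before": null, "after": {"id": 2, "amount": 40}}',
    '{"op": "u", "before": null, "after": {"id": 1, "amount": 150}}',
    '{"op": "d", "before": {"id": 2}, "after": null}',
    None,  # tombstone for id 2
]
for message in messages:
    apply_event(replica, message)

print(replica)  # {1: {'id': 1, 'amount': 150}}
```

Because inserts, snapshot reads, and updates all carry the full new row, the replica can treat them identically as upserts; only deletes need the key from "before".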
2. Trigger-based CDC
Database triggers fire on changes and write to a change table.
┌─────────────────────────────────────────────────────────────────┐
│                        Trigger-based CDC                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Source Table: orders                                           │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │ CREATE TRIGGER orders_cdc_trigger                           ││
│  │ AFTER INSERT OR UPDATE OR DELETE ON orders                  ││
│  │ FOR EACH ROW                                                ││
│  │ EXECUTE FUNCTION log_orders_change();                       ││
│  └─────────────────────────────────────────────────────────────┘│
│                                                                 │
│  Change Table: orders_changelog                                 │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │ changelog_id │ operation │ old_data │ new_data │ changed_at ││
│  │ 1            │ INSERT    │ NULL     │ {...}    │ 2024-01-15 ││
│  │ 2            │ UPDATE    │ {...}    │ {...}    │ 2024-01-15 ││
│  │ 3            │ DELETE    │ {...}    │ NULL     │ 2024-01-15 ││
│  └─────────────────────────────────────────────────────────────┘│
│                                                                 │
│  Connector polls the change table and sends rows to Kafka:      │
│    SELECT * FROM orders_changelog                               │
│    WHERE changelog_id > :last_processed_id                      │
│    ORDER BY changelog_id                                        │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
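Pros:
- Works on any database that supports triggers; no access to the transaction log is needed
- No replication privileges or log configuration required (only permission to create triggers on the source tables)
- Simple to implement
Cons:
- Adds write overhead on the source: every row change performs an extra insert in the same transaction
- The change table grows continuously and must be purged after consumption
- Can miss changes when triggers are bypassed (TRUNCATE does not fire row-level triggers, and triggers can be disabled); conversely, a failing trigger aborts the application's transaction
- Requires polling, which adds latency and query load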
3. Timestamp-based CDC
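Uses an updated_at timestamp column, maintained by the application or a trigger, to find rows that changed since the last run.
-- Query for changes since last run
SELECT * FROM orders
WHERE updated_at > :last_run_timestamp
ORDER BY updated_at;
Pros:
- Simple to implement
- No database changes needed if the table already has a reliable updated_at column
Cons:
- Misses hard deletes (a deleted row leaves nothing to match)
- Sees only the latest state of each row; intermediate updates between polls are lost
- Can miss rows from long-running transactions: a row stamped before :last_run_timestamp but committed after the poll is never picked up
- Requires an index on the timestamp column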
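The first two blind spots are easy to reproduce. This sketch uses an in-memory SQLite database as a stand-in for the source and integers in place of real timestamps to keep the output deterministic; poll_since is an illustrative helper, not a library function.

```python
import sqlite3

# In-memory SQLite stands in for the source database in this sketch
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, amount REAL, updated_at INTEGER)")
conn.execute("CREATE INDEX idx_orders_updated_at ON orders(updated_at)")


def poll_since(conn, last_run_ts):
    """Timestamp-based CDC poll: return changed rows and the new high-water mark."""
    rows = conn.execute(
        "SELECT id, amount, updated_at FROM orders WHERE updated_at > ? ORDER BY updated_at",
        (last_run_ts,),
    ).fetchall()
    new_mark = rows[-1][2] if rows else last_run_ts
    return rows, new_mark


# Integer "timestamps" keep the example deterministic
conn.execute("INSERT INTO orders VALUES (1, 100, 10)")
conn.execute("INSERT INTO orders VALUES (2, 40, 11)")
first_batch, mark = poll_since(conn, 0)

conn.execute("UPDATE orders SET amount = 120, updated_at = 12 WHERE id = 1")
conn.execute("UPDATE orders SET amount = 150, updated_at = 13 WHERE id = 1")  # overwrites the 120 state
conn.execute("DELETE FROM orders WHERE id = 2")  # leaves no row behind to match
second_batch, mark = poll_since(conn, mark)

print(first_batch)   # [(1, 100.0, 10), (2, 40.0, 11)]
print(second_batch)  # [(1, 150.0, 13)]: the 120 state and the delete of id 2 are never seen
```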
4. Version-based CDC
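Adds a version column that is set from a table-wide sequence (not a per-row counter) on every insert or update, so versions are comparable across rows.
-- Query for changes since last version
SELECT * FROM orders
WHERE version > :last_version
ORDER BY version;
Pros:
- Catches every row whose version advanced since the last poll
- Simple to query
- No clock-skew or timestamp-precision issues
Cons:
- Requires a schema change
- The version column must be maintained on every write (by the application or a trigger)
- Like timestamps, misses hard deletes and intermediate updates between polls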
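Using one table-wide counter is the detail that makes version > :last_version work. The toy class below (hypothetical, in memory, no database) stamps each write with the next value of a single counter, and also shows that the poll still cannot see a hard delete.

```python
import itertools
from typing import Dict, List, Tuple


class VersionedTable:
    """Toy table whose rows get a version from one table-wide counter on every write.

    A per-row counter (1, 2, 3 for each row) would not be comparable across rows,
    so `version > last_version` would skip or repeat changes.
    """

    def __init__(self) -> None:
        self._next_version = itertools.count(1)
        self.rows: Dict[int, Tuple[float, int]] = {}  # id -> (amount, version)

    def upsert(self, row_id: int, amount: float) -> None:
        self.rows[row_id] = (amount, next(self._next_version))

    def delete(self, row_id: int) -> None:
        # A hard delete removes the row, so no version query can ever see it
        self.rows.pop(row_id, None)

    def changes_since(self, last_version: int) -> List[Tuple[int, float, int]]:
        """Equivalent of: SELECT * FROM orders WHERE version > :last_version ORDER BY version."""
        changed = [(rid, amt, ver) for rid, (amt, ver) in self.rows.items() if ver > last_version]
        return sorted(changed, key=lambda r: r[2])


orders = VersionedTable()
orders.upsert(1, 100.0)  # version 1
orders.upsert(2, 40.0)   # version 2
batch1 = orders.changes_since(0)
last_version = batch1[-1][2]

orders.upsert(1, 150.0)  # version 3
orders.delete(2)         # invisible to the poller
batch2 = orders.changes_since(last_version)
print(batch2)  # [(1, 150.0, 3)]
```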
Debezium Architecture
┌─────────────────────────────────────────────────────────────────┐
│                      Debezium Architecture                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  PostgreSQL ──▶ Kafka Connect (Debezium connector) ──▶ Kafka    │
│                                                                 │
│  Debezium components:                                           │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │ 1. Snapshotter: initial consistent snapshot              │   │
│  │ 2. WAL Reader: reads WAL via a logical replication slot  │   │
│  │ 3. Schema tracking: from pgoutput relation messages      │   │
│  │ 4. Offset storage: last processed LSN per connector      │   │
│  └──────────────────────────────────────────────────────────┘   │
│                                                                 │
│  Kafka topics:                                                  │
│  - {topic.prefix}.{schema}.{table}: change events               │
│  - offset.storage.topic (e.g. connect-offsets): offsets         │
│  - schema history topic: not used by the PostgreSQL connector   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
{topic.prefix} was named database.server.name before Debezium 2.0. Unlike the MySQL and SQL Server connectors, the PostgreSQL connector needs no schema history topic because pgoutput sends each table's structure (relation messages) in the replication stream.
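Offsets and Debezium's source.lsn field identify positions in the WAL. PostgreSQL prints an LSN as two hexadecimal halves separated by a slash (for example 0/16B3748), so comparing the strings directly gives the wrong order ("0/9" sorts after "0/10"). A small sketch of the conversion to a comparable 64-bit integer; the helper names are illustrative:

```python
def lsn_to_int(lsn: str) -> int:
    """Convert PostgreSQL's textual LSN ('hi/lo', both hexadecimal) into a 64-bit integer."""
    hi, lo = lsn.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)


def int_to_lsn(value: int) -> str:
    """Inverse of lsn_to_int, using PostgreSQL's uppercase, unpadded hex format."""
    return f"{value >> 32:X}/{value & 0xFFFFFFFF:X}"


print(lsn_to_int("0/16B3748"))               # 23803720
print(int_to_lsn(lsn_to_int("16/B374D848")))  # 16/B374D848
print("0/9" > "0/10", lsn_to_int("0/9") > lsn_to_int("0/10"))  # True False
```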
Exactly-Once CDC
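Debezium and Kafka Connect deliver change events at least once by default: after a crash or restart, the connector resumes from its last committed offset and can resend events that were already written. A common way to get exactly-once results at the sink is to make the apply step idempotent by recording, for each key, the LSN of the last change applied and ignoring anything at or below it. The sketch below shows the idea in plain Python with integer LSNs; IdempotentSink is a hypothetical stand-in for a table that stores a last-applied LSN per row (for example, an extra column checked in a MERGE condition).

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class IdempotentSink:
    """Applies change events so that replays and duplicates are harmless.

    For every key we keep the LSN of the last applied event; deleted rows keep
    their LSN too (as a tombstone) so a replayed older insert cannot resurrect them.
    """
    rows: Dict[Any, Optional[dict]] = field(default_factory=dict)  # None marks a deleted row
    applied_lsn: Dict[Any, int] = field(default_factory=dict)

    def apply(self, key: Any, op: str, after: Optional[dict], lsn: int) -> bool:
        if lsn <= self.applied_lsn.get(key, -1):
            return False  # duplicate or stale event: already reflected in the sink
        self.rows[key] = None if op == "d" else after
        self.applied_lsn[key] = lsn
        return True

    def live_rows(self) -> Dict[Any, dict]:
        return {k: v for k, v in self.rows.items() if v is not None}


sink = IdempotentSink()
events = [
    (1, "c", {"id": 1, "amount": 100}, 100),
    (1, "u", {"id": 1, "amount": 150}, 101),
    (2, "c", {"id": 2, "amount": 40}, 102),
    (2, "d", None, 103),
]
for event in events:
    sink.apply(*event)

# Simulate a connector restart that redelivers everything from the start
replayed = [sink.apply(*event) for event in events]
print(replayed)          # [False, False, False, False]
print(sink.live_rows())  # {1: {'id': 1, 'amount': 150}}
```

The same check also protects against out-of-order delivery across batches, since an older change for a key can never overwrite a newer one.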
Code Implementation
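Debezium PostgreSQL Connector
The configuration below uses Debezium 2.x property names (topic.prefix replaced database.server.name in 2.0). The ${file:...} password placeholder assumes the Connect worker has the FileConfigProvider enabled (config.providers=file); the path is illustrative.
{
  "name": "postgres-cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "${file:/opt/kafka/secrets/postgres.properties:password}",
    "database.dbname": "orders_db",
    "topic.prefix": "orders_server",
    "schema.include.list": "public",
    "table.include.list": "public.orders,public.customers",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.name": "debezium_publication",
    "snapshot.mode": "initial",
    "snapshot.locking.mode": "none",
    "tombstones.on.delete": "true",
    "transforms": "route,unwrap",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.fields": "op,table,lsn,source.ts_ms",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "producer.override.max.request.size": "10485760",
    "producer.override.batch.size": "65536",
    "producer.override.linger.ms": "10"
  }
}
Note that ExtractNewRecordState (the unwrap transform) flattens the Debezium envelope: each record becomes the new row state plus __op, __table, __lsn, and __source_ts_ms fields, and, depending on its delete-handling settings, deletes are dropped, kept as tombstones, or rewritten with a __deleted flag. The Spark consumer below parses the raw envelope (op, before, after, source), so remove the unwrap transform if you use the two together.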
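The route transform rewrites Debezium's default topic names ({topic prefix}.{schema}.{table}) down to the bare table name. Kafka Connect's RegexRouter only renames topics that match the whole regex; the sketch below mimics that with Python's re.fullmatch so a pattern can be checked before deploying it. route_topic is illustrative, not part of Kafka Connect.

```python
import re

# Same pattern and replacement as the "route" transform in the connector config.
# Java's "$3" is written \3 in Python replacement templates.
ROUTE_REGEX = r"([^.]+)\.([^.]+)\.([^.]+)"
ROUTE_REPLACEMENT = r"\3"


def route_topic(topic: str) -> str:
    """Rename a topic only when the whole name matches, as RegexRouter does."""
    match = re.fullmatch(ROUTE_REGEX, topic)
    return match.expand(ROUTE_REPLACEMENT) if match else topic


print(route_topic("orders_server.public.orders"))     # orders
print(route_topic("orders_server.public.customers"))  # customers
print(route_topic("orders"))                          # orders (no match, unchanged)
```

Dropping the schema from the topic name means tables with the same name in different schemas would share a topic, so this routing suits a single-schema source like the one configured here.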
Debezium Kafka Connect Setup
import os

import requests

# ============================================================
# KAFKA CONNECT SETUP
# ============================================================
KAFKA_CONNECT_URL = "http://kafka-connect:8083"


def create_debezium_connector():
    """Create a Debezium PostgreSQL connector through the Kafka Connect REST API."""
    config = {
        "name": "postgres-cdc",
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "database.hostname": "postgres",
            "database.port": "5432",
            "database.user": "debezium",
            # Read the password from the environment instead of hard-coding it
            "database.password": os.environ["POSTGRES_CDC_PASSWORD"],
            "database.dbname": "orders_db",
            "topic.prefix": "orders_server",  # database.server.name before Debezium 2.0
            "table.include.list": "public.orders",
            "plugin.name": "pgoutput",
            "snapshot.mode": "initial",
            "tombstones.on.delete": "true",
            "transforms": "unwrap",
            "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        },
    }
    # json= serializes the body and sets Content-Type: application/json
    response = requests.post(f"{KAFKA_CONNECT_URL}/connectors", json=config, timeout=10)
    if response.status_code == 201:
        print("Connector created successfully")
    elif response.status_code == 409:
        print("Connector already exists (or the Connect cluster is rebalancing)")
    else:
        print(f"Error {response.status_code}: {response.text}")


def get_connector_status(connector_name):
    """Return the connector and task states reported by Kafka Connect."""
    response = requests.get(
        f"{KAFKA_CONNECT_URL}/connectors/{connector_name}/status", timeout=10
    )
    response.raise_for_status()
    return response.json()


def restart_connector(connector_name):
    """Restart the connector instance (its tasks are restarted separately)."""
    response = requests.post(
        f"{KAFKA_CONNECT_URL}/connectors/{connector_name}/restart", timeout=10
    )
    return response.status_code == 204
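The status endpoint reports the connector and each task separately, and a connector can show RUNNING while one of its tasks has FAILED. A minimal health check over the response shape (connector.state plus a tasks list); failed_parts is a hypothetical helper and the example payload is made up:

```python
from typing import Any, Dict, List


def failed_parts(status: Dict[str, Any]) -> List[str]:
    """List what is not RUNNING in a Kafka Connect /connectors/<name>/status response."""
    problems = []
    connector_state = status["connector"]["state"]
    if connector_state != "RUNNING":
        problems.append(f"connector is {connector_state}")
    tasks = status.get("tasks", [])
    if not tasks:
        problems.append("no tasks assigned")
    for task in tasks:
        if task["state"] != "RUNNING":
            problems.append(f"task {task['id']} is {task['state']}")
    return problems


example_status = {
    "name": "postgres-cdc",
    "connector": {"state": "RUNNING", "worker_id": "10.0.0.5:8083"},
    "tasks": [{"id": 0, "state": "FAILED", "worker_id": "10.0.0.5:8083", "trace": "stack trace"}],
    "type": "source",
}
print(failed_parts(example_status))  # ['task 0 is FAILED']
```

A plain POST to /connectors/{name}/restart restarts only the connector instance; a failed task is restarted with POST /connectors/{name}/tasks/{task_id}/restart, or, on Kafka 3.0 and later, with the includeTasks=true&onlyFailed=true query parameters on the connector restart call.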
Trigger-based CDC Implementation
-- ============================================================
-- TRIGGER-BASED CDC IN POSTGRESQL
-- ============================================================
-- Create change log table
CREATE TABLE orders_changelog (
    changelog_id BIGSERIAL PRIMARY KEY,
    table_name   VARCHAR(100) NOT NULL,
    operation    VARCHAR(10)  NOT NULL,  -- INSERT, UPDATE, DELETE
    old_data     JSONB,
    new_data     JSONB,
    changed_by   VARCHAR(100),
    changed_at   TIMESTAMPTZ  NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- Create CDC function; it runs inside the same transaction as the row change
CREATE OR REPLACE FUNCTION log_orders_change()
RETURNS TRIGGER AS $$
BEGIN
    IF TG_OP = 'INSERT' THEN
        INSERT INTO orders_changelog (table_name, operation, new_data, changed_by)
        VALUES ('orders', 'INSERT', to_jsonb(NEW), current_user);
        RETURN NEW;
    ELSIF TG_OP = 'UPDATE' THEN
        INSERT INTO orders_changelog (table_name, operation, old_data, new_data, changed_by)
        VALUES ('orders', 'UPDATE', to_jsonb(OLD), to_jsonb(NEW), current_user);
        RETURN NEW;
    ELSIF TG_OP = 'DELETE' THEN
        INSERT INTO orders_changelog (table_name, operation, old_data, changed_by)
        VALUES ('orders', 'DELETE', to_jsonb(OLD), current_user);
        RETURN OLD;
    END IF;
    RETURN NULL;  -- the return value of an AFTER ... FOR EACH ROW trigger is ignored
END;
$$ LANGUAGE plpgsql;
-- Create trigger (EXECUTE FUNCTION needs PostgreSQL 11+; older versions use EXECUTE PROCEDURE)
CREATE TRIGGER orders_cdc_trigger
AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW
EXECUTE FUNCTION log_orders_change();
-- The PRIMARY KEY already indexes changelog_id, so polling needs no extra index;
-- index changed_at to support purging old change rows
CREATE INDEX idx_orders_changelog_timestamp ON orders_changelog(changed_at);
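To see the trigger-to-poll loop run end to end, here is a sketch that uses SQLite (from Python's standard library) as a stand-in for PostgreSQL. The trigger syntax and column set differ from the PostgreSQL version above, and poll_changelog is an illustrative helper; the pattern of writing to a changelog inside the same transaction and polling by an increasing id is the same.

```python
import sqlite3

# SQLite stands in for PostgreSQL so the sketch runs anywhere
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE orders (id INTEGER PRIMARY KEY, amount REAL);
CREATE TABLE orders_changelog (
    changelog_id INTEGER PRIMARY KEY AUTOINCREMENT,
    operation    TEXT NOT NULL,
    order_id     INTEGER NOT NULL,
    new_amount   REAL
);
CREATE TRIGGER orders_ins AFTER INSERT ON orders BEGIN
    INSERT INTO orders_changelog (operation, order_id, new_amount) VALUES ('INSERT', NEW.id, NEW.amount);
END;
CREATE TRIGGER orders_upd AFTER UPDATE ON orders BEGIN
    INSERT INTO orders_changelog (operation, order_id, new_amount) VALUES ('UPDATE', NEW.id, NEW.amount);
END;
CREATE TRIGGER orders_del AFTER DELETE ON orders BEGIN
    INSERT INTO orders_changelog (operation, order_id, new_amount) VALUES ('DELETE', OLD.id, NULL);
END;
""")


def poll_changelog(conn, last_processed_id, limit=1000):
    """Fetch the next batch of change rows after the stored high-water mark."""
    rows = conn.execute(
        "SELECT changelog_id, operation, order_id, new_amount FROM orders_changelog "
        "WHERE changelog_id > ? ORDER BY changelog_id LIMIT ?",
        (last_processed_id, limit),
    ).fetchall()
    # A real connector persists this mark only after the batch is safely published
    new_mark = rows[-1][0] if rows else last_processed_id
    return rows, new_mark


with conn:  # commit the application's writes as one transaction
    conn.execute("INSERT INTO orders VALUES (1, 100)")
    conn.execute("UPDATE orders SET amount = 150 WHERE id = 1")
    conn.execute("INSERT INTO orders VALUES (2, 40)")
    conn.execute("DELETE FROM orders WHERE id = 2")

batch, mark = poll_changelog(conn, 0)
for row in batch:
    print(row)
# (1, 'INSERT', 1, 100.0)
# (2, 'UPDATE', 1, 150.0)
# (3, 'INSERT', 2, 40.0)
# (4, 'DELETE', 2, None)
```

One caveat carries over to PostgreSQL: BIGSERIAL values are assigned when a row is inserted, not when its transaction commits, so a slow transaction can commit a lower changelog_id after the poller has already moved past it. A common mitigation is to re-read a trailing window of ids on each poll and deduplicate by changelog_id.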
CDC Consumer in Python
from delta.tables import DeltaTable
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

# ============================================================
# CDC CONSUMER
# ============================================================
spark = SparkSession.builder \
    .appName("CDCConsumer") \
    .getOrCreate()

# Read CDC events from Kafka (requires the spark-sql-kafka package on the classpath)
cdc_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "orders") \
    .option("startingOffsets", "earliest") \
    .load()

# Parse CDC events. Assumes the raw Debezium envelope (no ExtractNewRecordState SMT)
# and that amount arrives as a JSON number (e.g. decimal.handling.mode=double).
schema = ("op STRING, "
          "before STRUCT<id BIGINT, amount DOUBLE>, "
          "after STRUCT<id BIGINT, amount DOUBLE>, "
          "source STRUCT<ts_ms BIGINT, lsn BIGINT>")
parsed_stream = cdc_stream \
    .select(F.from_json(F.col("value").cast("string"), schema).alias("data")) \
    .select("data.*")


def process_cdc_batch(batch_df, batch_id):
    """Apply one micro-batch of CDC events to the Delta table with a single MERGE."""
    # Tombstones have a null value, so their parsed op is null: skip them.
    # Deletes carry the key only in "before"; other operations carry it in "after".
    events = batch_df \
        .filter(F.col("op").isNotNull()) \
        .withColumn("_key", F.coalesce(F.col("after.id"), F.col("before.id")))

    # Keep only the newest event per key: MERGE fails if several source rows
    # match the same target row, and only the latest state matters.
    newest_first = Window.partitionBy("_key").orderBy(F.col("source.lsn").desc())
    latest = events \
        .withColumn("_rn", F.row_number().over(newest_first)) \
        .filter(F.col("_rn") == 1) \
        .select(F.col("op").alias("_op"), "_key", "after.*")

    # UpdateAll/InsertAll map target columns by name; the helper columns
    # _op and _key are ignored as long as schema auto-merge is disabled.
    target_table = DeltaTable.forPath(spark, "/delta-lake/orders/")
    target_table.alias("target").merge(
        latest.alias("source"),
        "target.id = source._key"
    ).whenMatchedDelete(condition="source._op = 'd'") \
        .whenMatchedUpdateAll(condition="source._op != 'd'") \
        .whenNotMatchedInsertAll(condition="source._op != 'd'") \
        .execute()


# Write stream with foreachBatch; the checkpoint records which Kafka offsets were processed
query = parsed_stream.writeStream \
    .foreachBatch(process_cdc_batch) \
    .option("checkpointLocation", "/checkpoints/cdc/") \
    .start()
Schema Evolution in CDC
# ============================================================
# SCHEMA EVOLUTION IN CDC
# ============================================================
TARGET_PATH = "/delta-lake/orders/"


def handle_cdc_schema_evolution(batch_df, batch_id):
    """Add newly seen source columns to the Delta table, then apply the batch."""
    target_columns = set(spark.read.format("delta").load(TARGET_PATH).schema.fieldNames())

    # Row columns live inside the "after" struct, not at the envelope's top level.
    # They only appear here if the parsing schema includes them (e.g. a schema taken
    # from a schema registry); a fixed from_json schema silently drops unknown fields.
    after_fields = batch_df.schema["after"].dataType.fields
    new_fields = [f for f in after_fields if f.name not in target_columns]

    for new_field in new_fields:
        type_name = new_field.dataType.simpleString()
        print(f"New column detected: {new_field.name} ({type_name})")
        # Use the source type rather than forcing every new column to STRING
        spark.sql(
            f"ALTER TABLE delta.`{TARGET_PATH}` "
            f"ADD COLUMNS (`{new_field.name}` {type_name})"
        )

    # Process CDC events
    process_cdc_batch(batch_df, batch_id)


# Use schema evolution handler
query = parsed_stream.writeStream \
    .foreachBatch(handle_cdc_schema_evolution) \
    .option("checkpointLocation", "/checkpoints/cdc-schema-evolution/") \
    .start()
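Adding columns is only one kind of schema change. A small, hypothetical helper that classifies the differences between the incoming schema and the target, using plain name-to-type dictionaries (in Spark, these could be built from StructField.name and StructField.dataType.simpleString()):

```python
from typing import Dict, List, Tuple


def diff_schemas(source: Dict[str, str], target: Dict[str, str]) -> Dict[str, List]:
    """Classify column differences between an incoming CDC schema and the target table.

    Both arguments map column name -> type name (e.g. "bigint", "string").
    """
    added = sorted(c for c in source if c not in target)
    dropped = sorted(c for c in target if c not in source)
    retyped: List[Tuple[str, str, str]] = sorted(
        (c, target[c], source[c]) for c in source if c in target and source[c] != target[c]
    )
    return {"added": added, "dropped": dropped, "retyped": retyped}


incoming = {"id": "bigint", "amount": "decimal(10,2)", "currency": "string"}
current = {"id": "bigint", "amount": "double", "legacy_flag": "boolean"}
print(diff_schemas(incoming, current))
# {'added': ['currency'], 'dropped': ['legacy_flag'], 'retyped': [('amount', 'double', 'decimal(10,2)')]}
```

A common policy is to apply additions automatically, keep dropped columns in the target as nullable, and send type changes for review, since most type changes require rewriting the table.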
Point-in-Time Recovery
# ============================================================
# POINT-IN-TIME RECOVERY
# ============================================================
from delta.tables import DeltaTable


def recover_to_timestamp(target_table_path, recovery_timestamp):
    """Restore the table to its state at a timestamp such as "2024-01-15 09:45:00"."""
    # RESTORE (delta-spark 1.2+) commits a new version pointing back at the old files,
    # so history is preserved and the restore itself can be undone.
    # Restoring the sink does not rewind the CDC stream: replaying the lost changes
    # means re-reading Kafka from the matching position.
    DeltaTable.forPath(spark, target_table_path).restoreToTimestamp(recovery_timestamp)
    print(f"Table recovered to {recovery_timestamp}")


def recover_to_version(target_table_path, version):
    """Restore the table to a specific version."""
    DeltaTable.forPath(spark, target_table_path).restoreToVersion(version)
    print(f"Table recovered to version {version}")


def get_table_versions(table_path):
    """List the versions recorded in the table history."""
    # Versions whose data files were removed by VACUUM can no longer be restored
    history = DeltaTable.forPath(spark, table_path).history()
    return history.select("version", "timestamp", "operation").collect()
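Time travel by timestamp resolves to the latest version committed at or before that instant. When choosing a restore point from the history returned by get_table_versions, the same rule can be applied explicitly; version_as_of is an illustrative helper and the history below is made-up sample data.

```python
from bisect import bisect_right
from datetime import datetime
from typing import List, Optional, Tuple


def version_as_of(history: List[Tuple[int, datetime]], ts: datetime) -> Optional[int]:
    """Return the newest version committed at or before ts, or None if ts predates the table."""
    ordered = sorted(history, key=lambda vt: vt[1])
    timestamps = [t for _, t in ordered]
    idx = bisect_right(timestamps, ts)
    return ordered[idx - 1][0] if idx else None


history = [
    (0, datetime(2024, 1, 15, 9, 0)),
    (1, datetime(2024, 1, 15, 9, 30)),
    (2, datetime(2024, 1, 15, 10, 0)),  # e.g. a bad MERGE to undo
]
print(version_as_of(history, datetime(2024, 1, 15, 9, 45)))  # 1
```

With real history rows, the input could be built as [(r["version"], r["timestamp"]) for r in get_table_versions(path)].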
Performance Optimization
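# ============================================================
# PERFORMANCE OPTIMIZATION
# ============================================================
# 1. Cap each micro-batch, but above the arrival rate: at 100K events/s and a
#    30-second trigger, each batch must read at least 3M offsets just to keep up
#    (6M leaves 2x headroom for catching up)
cdc_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "orders") \
    .option("maxOffsetsPerTrigger", 6000000) \
    .load()

# 2. Use a fixed micro-batch interval (here landing raw events in an illustrative bronze table)
query = cdc_stream.writeStream \
    .trigger(processingTime="30 seconds") \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/cdc-raw/") \
    .start("/delta-lake/orders_raw/")


# 3. Compact small files and cluster by key after CDC merges
def optimize_delta_table(table_path):
    """Optimize Delta Lake table after CDC processing"""
    spark.sql(f"""
        OPTIMIZE delta.`{table_path}`
        ZORDER BY (id)
    """)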
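💡 Production Tip: For high-volume CDC (100K+ changes/second), use: (1) enough Kafka partitions for parallelism, keyed by primary key so each row's changes stay in order, (2) a single Delta Lake MERGE per micro-batch for upserts and deletes, (3) Z-Order on frequently filtered columns for query performance, and (4) auto-optimize (optimized writes and auto compaction) to compact small files.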
Common Follow-Up Questions
Q1: How do you handle out-of-order CDC events?
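# Ordering comes from the source: Debezium keys each record by primary key, so all
# changes to one row land in the same Kafka partition in commit order. Within a
# micro-batch, resolve duplicates and reordering by keeping the highest LSN per key
# before merging. Watermarks only bound state for event-time aggregations, such as
# counting changes per topic per window:
windowed_stream = cdc_stream \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        F.window("timestamp", "5 minutes"),
        "topic"
    ) \
    .agg(F.count("*").alias("change_count"))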
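Within one micro-batch, the fix for reordering and duplicates is a last-writer-wins reduction keyed by primary key and ordered by LSN (in Spark, a row_number() window partitioned by key and ordered by LSN descending). The same rule in plain Python, with made-up events; latest_per_key is an illustrative helper:

```python
from typing import Any, Dict, Iterable, List


def latest_per_key(events: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Keep only the newest event (highest LSN) per primary key, regardless of arrival order."""
    newest: Dict[Any, Dict[str, Any]] = {}
    for event in events:
        current = newest.get(event["id"])
        if current is None or event["lsn"] > current["lsn"]:
            newest[event["id"]] = event
    return sorted(newest.values(), key=lambda e: e["lsn"])


arrived = [
    {"id": 1, "op": "u", "amount": 150, "lsn": 101},
    {"id": 1, "op": "c", "amount": 100, "lsn": 100},   # arrived late
    {"id": 2, "op": "d", "amount": None, "lsn": 103},
    {"id": 2, "op": "c", "amount": 40, "lsn": 102},    # arrived late
]
print([(e["id"], e["op"], e["lsn"]) for e in latest_per_key(arrived)])  # [(1, 'u', 101), (2, 'd', 103)]
```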
Q2: How do you handle CDC from multiple tables?
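# Subscribe to multiple tables
cdc_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "orders,customers,products") \
    .load()


# Route to different handlers
def route_cdc_events(batch_df, batch_id):
    """Route CDC events to per-table handlers based on the Kafka topic."""
    batch_df.persist()  # the batch is scanned once per table
    try:
        for table_name in ["orders", "customers", "products"]:
            table_events = batch_df.filter(F.col("topic") == table_name)
            if table_events.head(1):
                # handle_table_cdc is a placeholder for per-table parse + MERGE logic
                handle_table_cdc(table_name, table_events)
    finally:
        batch_df.unpersist()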
Q3: How do you monitor CDC pipelines?
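# Monitor CDC lag inside foreachBatch: compare "now" with the source commit time
# (source.ts_ms) of the newest event in the parsed batch. An idle source makes this
# estimate grow even when the pipeline is caught up.
import time


def monitor_cdc_lag(batch_df, max_lag_seconds=300):
    """Estimate end-to-end CDC lag in seconds from Debezium's source.ts_ms."""
    latest_ms = batch_df.agg(F.max("source.ts_ms")).collect()[0][0]
    if latest_ms is None:  # empty batch or tombstones only
        return None
    lag = time.time() - latest_ms / 1000.0
    if lag > max_lag_seconds:
        print(f"WARNING: CDC lag is {lag:.0f} seconds")
    return lag

# On the source side, watch how much WAL each replication slot is holding back:
#   SELECT slot_name,
#          pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
#   FROM pg_replication_slots;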
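Another lag signal is the Kafka offset gap: per partition, the log-end offset minus the offset the consumer has processed. The end and processed offsets would come from Kafka's admin tooling or the streaming query's progress reports (Spark keeps its offsets in the checkpoint rather than committing them to a consumer group by default); consumer_lag is an illustrative helper and the numbers are made up.

```python
from typing import Dict


def consumer_lag(end_offsets: Dict[int, int], committed: Dict[int, int]) -> Dict[int, int]:
    """Per-partition lag = log-end offset minus the consumer's processed offset."""
    # Assumption: a partition with no processed offset yet is counted from offset 0
    return {p: end - committed.get(p, 0) for p, end in end_offsets.items()}


lag = consumer_lag({0: 1_200, 1: 950, 2: 1_000}, {0: 1_150, 1: 950})
print(lag, sum(lag.values()))  # {0: 50, 1: 0, 2: 1000} 1050
```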
Q4: How do you handle schema changes in CDC?
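# Debezium's PostgreSQL connector picks up column changes from the relation metadata
# in the logical replication stream; new columns appear in events once rows with the
# new shape change (PostgreSQL does not emit the DDL statements themselves).
# On the Delta side, mergeSchema is a write option, not a read option.
# new_rows_df is a hypothetical DataFrame that contains the new columns:
new_rows_df.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("/delta-lake/orders/")

# For MERGE-based upserts, enable automatic schema evolution instead; note that every
# extra source column used by UpdateAll/InsertAll is then added to the target
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")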
⚠️ Critical Consideration: CDC can capture sensitive data (PII, financial). Always: (1) encrypt data in transit and at rest, (2) implement access controls, (3) mask sensitive fields, and (4) comply with regulations (GDPR, CCPA).
Company-Specific Tips
LinkedIn Interview Tips
- Discuss Debezium and Kafka Connect
- Explain schema evolution in CDC
- Mention exactly-once delivery guarantees
- Talk about multi-database CDC
Uber Interview Tips
- Focus on real-time ride data CDC
- Discuss geospatial data replication
- Mention multi-region CDC
- Talk about payment data consistency
Netflix Interview Tips
- Discuss content metadata CDC
- Explain A/B testing data replication
- Mention multi-cloud CDC
- Talk about data lake ingestion
ℹ️ Final Takeaway: CDC is essential for real-time data integration. Log-based CDC (Debezium) is preferred for most use cases due to low impact and high throughput. Trigger-based CDC is simpler but has performance implications. Always consider: (1) throughput requirements, (2) latency requirements, (3) schema evolution, and (4) exactly-once delivery.