Change Data Capture (CDC) with PySpark
CDC Pipeline Flow
Architecture Diagram: CDC Pipeline Overview
Architecture Diagram: Debezium CDC Event Flow
Architecture Diagram: Event Sourcing Pattern
Detailed Explanation
Change Data Capture (CDC) is a design pattern that identifies and captures changes made to data in a database, then delivers those changes to downstream systems in near real time. Unlike batch ETL, which periodically re-queries the source table, CDC captures only the delta (inserts, updates, deletes) as it occurs, enabling near-real-time data synchronization with minimal impact on the source system.
What Challenge Does CDC Solve?
The fundamental challenge CDC solves is data synchronization between operational systems (OLTP) and analytical systems (OLAP/data lakes).
| Approach | Limitations |
|---|---|
| Traditional ETL | Polls source tables using timestamp columns or checksums, which misses deletes, creates race conditions, and requires full-table scans that degrade source system performance |
| CDC | Eliminates these issues by reading the database's transaction log (binlog for MySQL, WAL for PostgreSQL, redo logs for Oracle), which already records every change atomically |
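The difference between the two rows can be illustrated without a database. The following is a minimal pure-Python sketch, not a real connector: the in-memory `source` dict, the `change_log` list, and both helper functions are hypothetical stand-ins for a table, its transaction log, and the two sync strategies.

```python
from datetime import datetime

t0 = datetime(2024, 1, 1, 0, 0)
t1 = datetime(2024, 1, 1, 1, 0)

# Hypothetical source table: id -> (value, updated_at)
source = {1: ("alice", t0), 2: ("bob", t0)}
target = {key: value for key, (value, _) in source.items()}  # initial full copy

# Between two syncs: row 1 is updated and row 2 is deleted.
source[1] = ("alice-v2", t1)
del source[2]
change_log = [("u", 1, "alice-v2"), ("d", 2, None)]  # what a transaction log records


def poll_by_timestamp(source, target, last_poll):
    """Timestamp polling: copy rows whose updated_at is newer than the last poll."""
    result = dict(target)
    for key, (value, updated_at) in source.items():
        if updated_at > last_poll:
            result[key] = value
    return result  # a deleted row has no updated_at to find, so it survives


def apply_log(target, log):
    """Log-based CDC: replay every recorded change, including deletes."""
    result = dict(target)
    for op, key, value in log:
        if op == "d":
            result.pop(key, None)
        else:
            result[key] = value
    return result


polled = poll_by_timestamp(source, target, last_poll=t0)
logged = apply_log(target, change_log)
print(polled)  # row 2 is still present in the target
print(logged)  # row 2 has been removed
```

Polling only sees rows that still exist, which is why the delete is missed; the log replay sees the delete as an explicit event.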
What is Debezium?
Debezium is a widely used open-source CDC platform built on Apache Kafka Connect.
Key Features:
- Reads database transaction logs
- Produces change events in a standardized JSON envelope format
- Enables downstream consumers to reconstruct the exact sequence of changes
Envelope Format:
- `before` state (for updates/deletes)
- `after` state (for creates/updates)
- Operation type (`c` for create, `u` for update, `d` for delete)
- Source metadata (connector name, database, table, timestamp)
- Transaction information
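To make the envelope concrete, here is a small pure-Python sketch that parses one event shaped like the fields listed above. The sample values (database `shop`, table `customers`, the email addresses) and the `summarize` helper are made up for illustration; real events carry more fields and depend on connector configuration.

```python
import json

# A hand-written sample event; field names follow the envelope described above.
raw_event = json.dumps({
    "payload": {
        "before": {"id": 7, "email": "old@example.com"},
        "after": {"id": 7, "email": "new@example.com"},
        "op": "u",
        "source": {"connector": "postgresql", "db": "shop",
                   "table": "customers", "ts_ms": 1700000000000},
        "ts_ms": 1700000000123,
    }
})

OP_NAMES = {"c": "create", "u": "update", "d": "delete"}


def summarize(event_json):
    """Extract the operation, table, key, and relevant row image from one event."""
    payload = json.loads(event_json)["payload"]
    op = payload["op"]
    # Deletes carry the row in "before"; creates and updates carry it in "after".
    row = payload["before"] if op == "d" else payload["after"]
    return {
        "operation": OP_NAMES.get(op, "unknown"),
        "table": payload["source"]["table"],
        "key": row["id"],
        "row": row,
        "source_ts_ms": payload["source"]["ts_ms"],
    }


summary = summarize(raw_event)
print(summary["operation"], summary["table"], summary["row"])
```

Choosing the row image by operation type is the step that downstream MERGE logic depends on: a delete has no `after` state to read a key from.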
How Does PySpark Integrate with CDC?
PySpark Structured Streaming integrates with CDC through several approaches:
- Reading Kafka topics: Consume the topics that contain CDC events and apply them to Delta Lake tables using MERGE operations
- Delta Lake's Change Data Feed: Enable it with the `delta.enableChangeDataFeed` table property and read row-level changes with the `readChangeFeed` option for downstream consumption
- Custom watermark-based deduplication: For handling out-of-order and duplicate events
What is Event Sourcing?
Event Sourcing, closely related to CDC, stores every state change as an immutable event rather than overwriting current state.
Benefits:
- Complete audit trail
- Temporal queries ("what was the state at time T?")
- Event replay for rebuilding materialized views
- Decouples write and read models (CQRS pattern)
Implementation in PySpark:
- Write events to an append-only Delta table
- Use streaming aggregations to maintain materialized views
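Before looking at a Spark implementation, the replay idea can be sketched in plain Python. This is an illustrative fold over an in-memory list, assuming each event carries a monotonically increasing `seq` number; the event shapes and the `replay` helper are hypothetical.

```python
from functools import reduce

# Append-only event log; nothing is ever updated in place.
events = [
    {"seq": 1, "type": "CustomerCreated", "data": {"name": "Alice", "city": "NYC"}},
    {"seq": 2, "type": "CustomerAddressChanged", "data": {"city": "LA"}},
    {"seq": 3, "type": "CustomerEmailUpdated", "data": {"email": "alice@example.com"}},
]


def apply_event(state, event):
    """Return a new state with the event's fields applied; the input is not mutated."""
    return {**state, **event["data"]}


def replay(events, up_to_seq=None):
    """Rebuild state by folding events in sequence order, optionally stopping early."""
    selected = [
        e for e in sorted(events, key=lambda e: e["seq"])
        if up_to_seq is None or e["seq"] <= up_to_seq
    ]
    return reduce(apply_event, selected, {})


current = replay(events)               # materialized view of the latest state
as_of_2 = replay(events, up_to_seq=2)  # temporal query: state after event 2
print(current)
print(as_of_2)
```

The same fold serves both the current-state view and the temporal query; only the cut-off changes.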
How is Exactly-Once Semantics Achieved?
The key challenge in CDC processing is handling exactly-once semantics in distributed systems.
PySpark's Approach:
- Checkpoint-based offset tracking
- Idempotent MERGE operations on Delta Lake
- Watermark-based handling of late-arriving events
Key Takeaway: The combination of Kafka's at-least-once delivery with Delta Lake's ACID transactions and idempotent MERGE operations provides effectively exactly-once processing semantics.
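The takeaway can be checked with a small simulation. The sketch below is plain Python rather than Spark or Delta code: a dict stands in for the target table, and `idempotent_upsert` is a hypothetical version-guarded apply that mimics what a keyed MERGE achieves.

```python
def idempotent_upsert(table, event):
    """Apply an event only if it is newer than what the table holds for that key."""
    key, version, value = event["key"], event["version"], event["value"]
    current = table.get(key)
    if current is None or version > current["version"]:
        table[key] = {"version": version, "value": value}


events = [
    {"key": "C101", "version": 1, "value": "NYC"},
    {"key": "C101", "version": 2, "value": "LA"},
    {"key": "C202", "version": 1, "value": "SF"},
]

# Ideal case: every event is delivered exactly once.
once = {}
for event in events:
    idempotent_upsert(once, event)

# At-least-once case: the consumer failed after two events and replayed from
# its last checkpoint, so the first two events are delivered twice.
redelivered = events[:2] + events
at_least_once = {}
for event in redelivered:
    idempotent_upsert(at_least_once, event)

same_result = once == at_least_once
print(same_result)
```

Duplicates are absorbed by the apply step rather than prevented by the transport, which is what "effectively exactly-once" means here.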
Key Concepts Table
Mathematical Foundations
Definition: Change Data Capture
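CDC captures row-level changes (inserts, updates, deletes) from a source database and propagates them to a target $T$ such that:

$$T(t) = S_0 \oplus \Delta_1 \oplus \Delta_2 \oplus \cdots \oplus \Delta_t$$

where $S_0$ is the initial snapshot, $\Delta_t$ is the change set at time $t$, and $\oplus$ denotes applying a change set to the current state.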
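A worked instance of this definition, as a pure-Python sketch: the target state is obtained by starting from the initial snapshot and applying each change set in turn. The tuple layout `(op, key, row)` and the sample rows are assumptions for illustration.

```python
from functools import reduce


def apply_change_set(state, change_set):
    """Apply one change set (inserts, updates, deletes) to a copy of the state."""
    new_state = dict(state)
    for op, key, row in change_set:
        if op == "d":
            new_state.pop(key, None)
        else:  # "c" (create) or "u" (update)
            new_state[key] = row
    return new_state


snapshot = {1: {"city": "NYC"}, 2: {"city": "SF"}}        # initial snapshot
delta_1 = [("u", 1, {"city": "LA"})]                      # change set at time 1
delta_2 = [("d", 2, None), ("c", 3, {"city": "Austin"})]  # change set at time 2

# Target state at time 2: the snapshot with both change sets applied in order.
target = reduce(apply_change_set, [delta_1, delta_2], snapshot)
print(target)
```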
Event Ordering
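For CDC events $e_i$ and $e_j$ affecting the same key $k$, the ordering must satisfy:

$$\mathrm{ts}(e_i) < \mathrm{ts}(e_j) \implies \mathrm{apply}(e_i) \prec \mathrm{apply}(e_j)$$

Out-of-order events require buffering with timeout $\tau$: an event $e$ is released only once

$$t_{\max} - \mathrm{ts}(e) \ge \tau$$

where $t_{\max}$ is the largest event timestamp seen so far.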
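One way to picture the buffering rule is a small reorder buffer. The class below is a hedged pure-Python sketch, not how Spark implements watermarks: it holds events in a heap and releases them once they are at least `timeout_ms` older than the newest timestamp seen. The class name, method names, and sample timestamps are all hypothetical.

```python
import heapq


class ReorderBuffer:
    """Hold events for timeout_ms of event time, then release them in timestamp order."""

    def __init__(self, timeout_ms):
        self.timeout_ms = timeout_ms
        self._heap = []        # entries are (ts_ms, seq, event)
        self._max_seen_ts = 0
        self._seq = 0          # tie-breaker so events themselves are never compared

    def push(self, ts_ms, event):
        """Add an event and return the buffered events that are now safe to emit."""
        heapq.heappush(self._heap, (ts_ms, self._seq, event))
        self._seq += 1
        self._max_seen_ts = max(self._max_seen_ts, ts_ms)
        cutoff = self._max_seen_ts - self.timeout_ms
        ready = []
        while self._heap and self._heap[0][0] <= cutoff:
            ts, _, ev = heapq.heappop(self._heap)
            ready.append((ts, ev))
        return ready

    def flush(self):
        """Release everything that is still buffered, in timestamp order."""
        ready = [(ts, ev) for ts, _, ev in sorted(self._heap)]
        self._heap.clear()
        return ready


# Event "b" (ts=1020) arrives after "c" (ts=1050) but within the 100 ms timeout.
arrivals = [(1000, "a"), (1050, "c"), (1020, "b"), (1200, "d")]
buffer = ReorderBuffer(timeout_ms=100)
emitted = []
for ts, name in arrivals:
    emitted.extend(ev for _, ev in buffer.push(ts, name))
emitted.extend(ev for _, ev in buffer.flush())
print(emitted)
```

An event that arrives later than the timeout allows would still be emitted out of order by this sketch; a real pipeline has to decide whether to drop such events or route them elsewhere.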
Merge Correctness Theorem
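CDC merge is correct if the merge function $\oplus$ is commutative and associative:

$$\Delta_a \oplus \Delta_b = \Delta_b \oplus \Delta_a, \qquad (\Delta_a \oplus \Delta_b) \oplus \Delta_c = \Delta_a \oplus (\Delta_b \oplus \Delta_c)$$

This ensures that applying changes in any order produces the same result. A plain overwrite does not have this property; a last-writer-wins merge that compares source log positions or timestamps per key does.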
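The order-independence claim can be tested by brute force. The sketch below is illustrative pure Python and assumes each change carries a log position: it applies the same changes in every possible order with a last-writer-wins merge and with a naive overwrite. Both merge functions and the sample changes are hypothetical.

```python
from itertools import permutations


def lww_merge(state, change):
    """Last-writer-wins: keep the change with the highest log position per key."""
    key, position, value = change
    current = state.get(key)
    if current is None or position > current[0]:
        return {**state, key: (position, value)}
    return state


def overwrite_merge(state, change):
    """Naive merge: whichever change is applied last wins, regardless of position."""
    key, _, value = change
    return {**state, key: value}


changes = [("C101", 1, "NYC"), ("C101", 2, "LA"), ("C101", 3, "Austin"), ("C202", 1, "SF")]

lww_results = []
naive_final_values = set()
for order in permutations(changes):
    lww_state, naive_state = {}, {}
    for change in order:
        lww_state = lww_merge(lww_state, change)
        naive_state = overwrite_merge(naive_state, change)
    lww_results.append(lww_state)
    naive_final_values.add(naive_state["C101"])

all_equal = all(result == lww_results[0] for result in lww_results)
print(all_equal)                   # every ordering converges to the same state
print(sorted(naive_final_values))  # the naive merge depends on arrival order
```

Deletes need the same treatment in practice, for example by keeping a tombstone with its position so that a late update cannot resurrect a deleted row.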
Log Retention
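For CDC with retention period $R$ (seconds) and average change rate $r$ bytes/sec, the log storage required is:

$$S_{\log} = R \times r$$

The retention of the log that Debezium reads must exceed the maximum replication lag $L_{\max}$:

$$R > L_{\max}$$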
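A quick arithmetic sketch of this sizing rule follows. The numbers are made up, and the `safety_factor` margin is an added assumption rather than something the rule above prescribes.

```python
def required_log_storage_bytes(retention_seconds, change_rate_bytes_per_sec):
    """Storage needed to keep the change log for the whole retention period."""
    return retention_seconds * change_rate_bytes_per_sec


def retention_is_safe(retention_seconds, max_replication_lag_seconds, safety_factor=2.0):
    """True if retention covers the worst observed lag with some headroom.

    safety_factor is a hypothetical margin chosen for this sketch.
    """
    return retention_seconds >= safety_factor * max_replication_lag_seconds


retention = 3 * 24 * 3600  # 3 days of log retention, in seconds
change_rate = 50_000       # 50 KB of log written per second (assumed)
storage = required_log_storage_bytes(retention, change_rate)

print(storage / 1e9)                                   # storage in GB
print(retention_is_safe(retention, 6 * 3600))          # worst lag: 6 hours
print(retention_is_safe(3600, 6 * 3600))               # 1 hour retention is too short
```

If retention is shorter than the lag, the connector's saved log position no longer exists when it resumes, and a new snapshot is needed.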
Throughput Capacity
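Maximum CDC throughput with $P$ parallel consumers:

$$\Theta_{\max} = \min\left(\Theta_{\text{source}},\ \Theta_{\text{Kafka}},\ P \cdot \Theta_{\text{consumer}},\ \Theta_{\text{sink}}\right)$$

The bottleneck is typically the slowest component.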
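The bottleneck reasoning can be turned into a small calculator. This is a sketch with invented capacity numbers; it also assumes that consumer parallelism is capped by the number of topic partitions, as in a Kafka consumer group, which is an assumption added here.

```python
def pipeline_throughput(source_eps, kafka_eps, per_consumer_eps,
                        consumers, partitions, sink_eps):
    """Return (max events/sec, name of the limiting stage) for a CDC pipeline."""
    # Assumption: consumers beyond the partition count add no parallelism.
    effective_consumers = min(consumers, partitions)
    stages = {
        "source": source_eps,
        "kafka": kafka_eps,
        "consumers": per_consumer_eps * effective_consumers,
        "sink": sink_eps,
    }
    bottleneck = min(stages, key=stages.get)
    return stages[bottleneck], bottleneck


# Four consumers at 3,000 events/sec each are the limiting stage.
limit_4, stage_4 = pipeline_throughput(20_000, 50_000, 3_000, 4, 8, 15_000)
# Doubling the consumers moves the bottleneck to the sink.
limit_8, stage_8 = pipeline_throughput(20_000, 50_000, 3_000, 8, 8, 15_000)
print(limit_4, stage_4)
print(limit_8, stage_8)
```

Adding consumers only helps until another stage becomes the minimum, which is why scaling one component rarely scales the whole pipeline.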
Key Insight
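Event sourcing with CDC provides a complete audit trail but requires compaction to manage storage growth. Debezium's outbox pattern avoids dual writes by writing events to an outbox table in the same transaction as the business data, so an event is published if and only if the business change commits. Delivery to consumers is still at-least-once, so consumers must remain idempotent.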
Summary
CDC enables real-time data synchronization with ordering guarantees. Merge operations must be commutative for correctness. Log retention must exceed replication lag. Throughput is bounded by the slowest component in the pipeline.
Key Concepts Table (cont.)
| Component | Description | Failure Mode | Recovery Strategy |
|---|---|---|---|
| Source DB Transaction Log | Binlog/WAL recording all changes | Log rotation/truncation | Snapshot + resume from log position |
| Debezium Connector | Reads log, produces Kafka events | Connector crash | Restart from last committed offset |
| Kafka Topic | Durable event buffer | Broker failure | Replication factor 3+ |
| PySpark Structured Streaming | Micro-batch CDC processor | Executor failure | Checkpoint recovery |
| Delta Lake MERGE | Atomic upsert to target | Write failure | Retry from checkpoint |
| Checkpoint Directory | Offset tracking for exactly-once | Corruption | Restore from backup |
| Watermark | Late event handling | Clock skew | Grace period configuration |
Code Examples
Example 1: Reading CDC Events from Kafka with PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("CDC-From-Kafka") \
    .config("spark.sql.streaming.schemaInference", "true") \
    .getOrCreate()

# Define CDC event schema (Debezium JSON envelope with the schema section included)
cdc_event_schema = StructType([
    StructField("schema", StructType([
        StructField("type", StringType()),
        StructField("fields", ArrayType(StructType([
            StructField("name", StringType()),
            StructField("type", StringType()),
            StructField("fields", ArrayType(StructType([
                StructField("name", StringType()),
                StructField("type", StringType()),
            ])))
        ])))
    ])),
    StructField("payload", StructType([
        # Row images are kept as string maps so one schema fits every table
        StructField("before", MapType(StringType(), StringType())),
        StructField("after", MapType(StringType(), StringType())),
        StructField("source", StructType([
            StructField("version", StringType()),
            StructField("connector", StringType()),
            StructField("name", StringType()),
            StructField("ts_ms", LongType()),
            StructField("snapshot", StringType()),
            StructField("db", StringType()),
            StructField("table", StringType()),
        ])),
        StructField("op", StringType()),
        StructField("ts_ms", LongType()),
        StructField("transaction", StructType([
            StructField("id", StringType()),
            # Debezium emits these counters as 64-bit integers
            StructField("total_order", LongType()),
            StructField("data_collection_order", LongType()),
        ])),
    ]))
])
# Read CDC events from Kafka
cdc_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092")
    .option("subscribe", "dbserver1.public.customers,dbserver1.public.orders")
    # Only used on the first start; afterwards offsets come from the checkpoint.
    # Use "earliest" if events already in the topic (e.g. the snapshot) are needed.
    .option("startingOffsets", "latest")
    # "false" skips offsets Kafka has already deleted instead of failing the query
    .option("failOnDataLoss", "false")
    .option("maxOffsetsPerTrigger", 100000)
    .load()
)

# Parse and flatten CDC events
parsed_cdc = (
    cdc_stream
    .select(
        col("key").cast("string").alias("kafka_key"),
        from_json(col("value").cast("string"), cdc_event_schema).alias("cdc"),
        col("topic"),
        col("partition"),
        col("offset"),
        col("timestamp").alias("kafka_timestamp")
    )
    .select(
        col("kafka_key"),
        col("cdc.payload.before").alias("before"),
        col("cdc.payload.after").alias("after"),
        col("cdc.payload.op").alias("operation"),
        col("cdc.payload.source.ts_ms").alias("source_ts_ms"),
        col("cdc.payload.source.db").alias("source_db"),
        col("cdc.payload.source.table").alias("source_table"),
        col("cdc.payload.source.connector").alias("connector"),
        col("topic"),
        col("partition"),
        col("offset"),
        col("kafka_timestamp"),
        # Deduplication key: topic, partition, and offset identify a Kafka record
        concat(
            col("topic"),
            lit("-"),
            col("partition"),
            lit("-"),
            col("offset")
        ).alias("event_id"),
        # Processing timestamp
        current_timestamp().alias("processed_at")
    )
)
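# --- Write to Bronze Layer (Raw CDC Events) ---
def write_bronze_batch(batch_df, batch_id):
    """Append raw CDC events to the Bronze layer.

    foreachBatch may replay a batch after a failure, and a plain append would
    then write duplicates. The txnAppId/txnVersion options (Delta Lake 2.0+)
    let Delta skip a batch that it has already committed.
    """
    if batch_df.isEmpty():  # DataFrame.isEmpty() requires Spark 3.3+
        return
    (
        batch_df
        .write
        .format("delta")
        .mode("append")
        .option("txnAppId", "cdc_bronze")  # any stable ID unique to this stream
        .option("txnVersion", batch_id)
        .save("/mnt/lakehouse/bronze/cdc_events")
    )

bronze_query = (
    parsed_cdc
    .writeStream
    .foreachBatch(write_bronze_batch)
    .outputMode("append")
    .option("checkpointLocation", "/mnt/checkpoints/cdc_bronze")
    .trigger(processingTime="30 seconds")
    .start()
)
bronze_query.awaitTermination()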
Example 2: Applying CDC to Delta Lake with MERGE
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from delta.tables import DeltaTable

spark = SparkSession.builder \
    .appName("CDC-Apply-to-Delta") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true") \
    .getOrCreate()

# 'before' and 'after' are maps in the Bronze schema, so fields are extracted
# by name ("after.*" only expands structs). This column list is an assumption
# for illustration; adjust it to the real target table.
CUSTOMER_COLUMNS = ["id", "name", "email", "city"]

def apply_cdc_to_delta(micro_batch_df, batch_id):
    """Apply CDC events to the Delta Lake target table with a single MERGE."""
    if micro_batch_df.isEmpty():  # DataFrame.isEmpty() requires Spark 3.3+
        return

    # Deletes carry the row in 'before'; creates and updates carry it in 'after'
    row_image = when(col("operation") == "d", col("before")).otherwise(col("after"))
    keyed = micro_batch_df.withColumn("id", row_image["id"])

    # MERGE fails when several source rows match one target row, and separate
    # merges per operation type lose the order between them. Keep only the
    # latest event per key; Kafka offsets break timestamp ties.
    latest_first = Window.partitionBy("id").orderBy(
        col("source_ts_ms").desc(), col("offset").desc()
    )
    latest = (
        keyed
        .withColumn("row_num", row_number().over(latest_first))
        .filter(col("row_num") == 1)
        .select(
            col("id"),
            *[col("after")[c].alias(c) for c in CUSTOMER_COLUMNS if c != "id"],
            col("operation").alias("_cdc_operation"),
            # source_ts_ms is epoch milliseconds; a numeric cast expects seconds
            (col("source_ts_ms") / 1000).cast("timestamp").alias("_cdc_timestamp"),
            col("event_id").alias("_event_id"),
        )
    )

    target_table = DeltaTable.forPath(spark, "/mnt/lakehouse/silver/customers")
    (
        target_table.alias("target")
        .merge(latest.alias("source"), "target.id = source.id")
        .whenMatchedDelete(condition="source._cdc_operation = 'd'")
        .whenMatchedUpdateAll(condition="source._cdc_operation != 'd'")
        .whenNotMatchedInsertAll(condition="source._cdc_operation != 'd'")
        .execute()
    )

# --- Stream CDC Events and Apply to Target ---
# row_number() windows are not supported on streaming DataFrames, so
# deduplication happens per micro-batch inside apply_cdc_to_delta.
cdc_stream = (
    spark.readStream
    .format("delta")
    .load("/mnt/lakehouse/bronze/cdc_events")
    # This query maintains the customers table only; run a separate query
    # with its own checkpoint for orders.
    .filter(col("source_table") == "customers")
)

# Write with MERGE
merge_query = (
    cdc_stream
    .writeStream
    .foreachBatch(apply_cdc_to_delta)
    .option("checkpointLocation", "/mnt/checkpoints/cdc_merge")
    .trigger(processingTime="60 seconds")
    .start()
)
merge_query.awaitTermination()
Example 3: Event Sourcing Implementation with PySpark
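from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable

# Standard-library imports used by emit_event; placed after the wildcard
# imports so they cannot be shadowed
import uuid
from datetime import datetime

spark = SparkSession.builder \
    .appName("Event-Sourcing-CDC") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .getOrCreate()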
# --- Event Store (Append-Only) ---
event_schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("event_type", StringType(), False),
    StructField("aggregate_id", StringType(), False),
    StructField("aggregate_type", StringType(), False),
    StructField("event_data", MapType(StringType(), StringType())),
    StructField("metadata", MapType(StringType(), StringType())),
    StructField("event_version", IntegerType()),
    StructField("created_at", TimestampType()),
])

# Create event store. LOCATION ties the table to the path that the code
# below writes to and reads from.
spark.sql("""
    CREATE TABLE IF NOT EXISTS event_store (
        event_id STRING NOT NULL,
        event_type STRING NOT NULL,
        aggregate_id STRING NOT NULL,
        aggregate_type STRING NOT NULL,
        event_data MAP<STRING, STRING>,
        metadata MAP<STRING, STRING>,
        event_version INT,
        created_at TIMESTAMP
    )
    USING DELTA
    PARTITIONED BY (aggregate_type)
    LOCATION '/mnt/lakehouse/events'
    TBLPROPERTIES (
        'delta.appendOnly' = 'true',
        'delta.logRetentionDuration' = 'interval 365 days'
    )
""")
# --- Emit Events ---
def emit_event(aggregate_id, aggregate_type, event_type, event_data,
               metadata=None, event_version=1):
    """Append one event to the event store.

    event_version should increase with each event of an aggregate so that
    replay order does not depend on wall-clock timestamps alone.
    """
    event = spark.createDataFrame(
        [(
            str(uuid.uuid4()),
            event_type,
            aggregate_id,
            aggregate_type,
            event_data,
            metadata or {},
            event_version,
            datetime.now(),
        )],
        # Explicit schema: the type of an empty metadata dict cannot be inferred
        schema=event_schema,
    )
    event.write.format("delta").mode("append").save("/mnt/lakehouse/events")

# Example: Customer lifecycle events
emit_event(
    aggregate_id="C101",
    aggregate_type="customer",
    event_type="CustomerCreated",
    event_data={"name": "Alice Johnson", "email": "alice@email.com", "city": "NYC"},
    metadata={"source": "registration_api", "correlation_id": "req-123"},
    event_version=1,
)
emit_event(
    aggregate_id="C101",
    aggregate_type="customer",
    event_type="CustomerAddressChanged",
    event_data={"old_city": "NYC", "new_city": "LA", "change_reason": "relocation"},
    metadata={"source": "profile_api", "correlation_id": "req-456"},
    event_version=2,
)
emit_event(
    aggregate_id="C101",
    aggregate_type="customer",
    event_type="CustomerEmailUpdated",
    event_data={"old_email": "alice@email.com", "new_email": "alice.new@email.com"},
    metadata={"source": "profile_api", "correlation_id": "req-789"},
    event_version=3,
)
# --- Build Materialized View (Projection) ---
def latest_value(condition, value):
    """Value from the most recent matching event (by version, then time).

    orderBy() before groupBy() does not guarantee the order that last() sees,
    so the ordering columns are packed into a struct and reduced with max().
    """
    ranked = struct(col("event_version"), col("created_at"), value.alias("value"))
    return max(when(condition & value.isNotNull(), ranked))["value"]

def build_customer_view(events_df, aggregate_id=None):
    """Rebuild customer state from events (event replay)."""
    filtered = events_df if aggregate_id is None else \
        events_df.filter(col("aggregate_id") == aggregate_id)
    data = col("event_data")
    return filtered.groupBy("aggregate_id").agg(
        latest_value(col("event_type") == "CustomerCreated", data["name"]).alias("name"),
        # CustomerEmailUpdated stores the address under "new_email"
        latest_value(
            col("event_type").isin("CustomerCreated", "CustomerEmailUpdated"),
            coalesce(data["new_email"], data["email"]),
        ).alias("email"),
        latest_value(
            col("event_type").isin("CustomerCreated", "CustomerAddressChanged"),
            coalesce(data["new_city"], data["city"]),
        ).alias("city"),
        count("*").alias("total_events"),
        max("created_at").alias("last_updated"),
        min("created_at").alias("created_at"),
    )

# Build view from all events
events_df = spark.read.format("delta").load("/mnt/lakehouse/events")
customer_view = build_customer_view(events_df)
customer_view.show()
# Build view for a specific customer (point-in-time query)
def build_state_at_time(events_df, aggregate_id, as_of_timestamp):
    """Build state as of a specific timestamp by replaying only earlier events."""
    events_until = events_df.filter(
        col("created_at") <= lit(as_of_timestamp).cast("timestamp")
    )
    return build_customer_view(events_until, aggregate_id)

# Query state at a specific time
state_at_may = build_state_at_time(
    events_df, "C101",
    "2026-05-15"
)
state_at_may.show()
Performance Metrics
| Metric | Batch ETL (Polling) | CDC (Debezium) | Improvement |
|---|---|---|---|
| Latency (Source → Target) | 4-24 hours | 5-30 seconds | 99.9% reduction |
| Source DB Load | 15-30% CPU (full scans) | 1-3% CPU (log read) | 90% reduction |
| Data Transferred | 100% of table per run | Only changes (0.1-5%) | 95-99% reduction |
| Storage (Delta Lake) | Full copies per snapshot | Event log only | 80% reduction |
| Query Freshness | Stale by hours | Near real-time | Sub-minute |
| Delete Detection | Impossible (without flags) | Automatic | New capability |
| Ordering Guarantees | None (race conditions) | Transaction-ordered | Strict ordering |
| Exactly-Once | Difficult (dedup needed) | Checkpoints plus idempotent MERGE | Effectively exactly-once |
| Operational Complexity | Low | Medium-High | Trade-off |
| Cost (Compute) | | 0.15/hour (streaming) | 70% reduction |
Best Practices
- Use Debezium for database CDC: Debezium provides mature, well-tested CDC connectors for MySQL, PostgreSQL, SQL Server, Oracle, and MongoDB. It handles log reading, snapshotting, and event serialization.
- Enable Kafka topic compaction for latest-state queries: For use cases that need the current state without replaying all events, configure Kafka topic compaction (`cleanup.policy=compact`) to retain only the latest value per key.
- Implement watermark-based deduplication: Use PySpark's watermark feature (`withWatermark`) to handle late-arriving events and ensure idempotent processing within configurable time windows.
- Separate bronze and silver CDC processing: Write raw CDC events to Bronze first, then process and apply to Silver. This provides an audit trail and enables reprocessing if logic changes.
- Use Delta Lake MERGE for idempotent applies: A MERGE keyed on the primary key that carries the full row state is idempotent; running it twice with the same input produces the same result. This is critical for exactly-once semantics in distributed systems.
- Monitor Kafka consumer lag: Set up alerts when consumer lag exceeds thresholds. High lag indicates the CDC processor cannot keep up with the source change rate.
- Handle schema evolution in CDC events: Use Delta Lake's `mergeSchema` option and Debezium's schema history tracking to handle source schema changes without breaking the CDC pipeline.
- Implement dead-letter queues: Route events that fail deserialization or validation to a dead-letter queue for manual inspection rather than blocking the entire pipeline.
- Snapshot before CDC starts: When setting up CDC for the first time, take a consistent snapshot of the source database and replay it through the CDC pipeline to establish a baseline.
- Tune micro-batch frequency: Balance latency vs. throughput by adjusting the trigger interval. For most use cases, 30-60 seconds provides a good balance; for sub-second latency, consider continuous processing mode, which is experimental and supports only a subset of operations and sinks.
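The dead-letter practice can be sketched without any streaming framework. The pure-Python example below separates messages that parse and validate from those that do not; the `REQUIRED_FIELDS` rule, the `route` helper, and the sample messages are hypothetical, and a real pipeline would write the dead letters to a separate topic or table.

```python
import json

REQUIRED_FIELDS = ("op", "source")  # hypothetical validation rule for this sketch


def route(raw_messages):
    """Split raw messages into valid payloads and dead letters with an error reason."""
    valid, dead_letters = [], []
    for raw in raw_messages:
        try:
            payload = json.loads(raw)["payload"]
            missing = [field for field in REQUIRED_FIELDS if field not in payload]
            if missing:
                raise ValueError(f"missing fields: {missing}")
            valid.append(payload)
        except (KeyError, TypeError, ValueError) as exc:  # JSONDecodeError is a ValueError
            # Keep the original bytes and the reason so the event can be inspected later
            dead_letters.append({"raw": raw, "error": f"{type(exc).__name__}: {exc}"})
    return valid, dead_letters


messages = [
    '{"payload": {"op": "c", "source": {"table": "customers"}, "after": {"id": 1}}}',
    '{"payload": {"op": "u"}}',  # fails validation: no source metadata
    'not-json',                  # fails deserialization
]
valid, dead_letters = route(messages)
print(len(valid), len(dead_letters))
for letter in dead_letters:
    print(letter["error"])
```

Bad events are set aside with enough context to diagnose them, and the valid events in the same batch still flow through.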
Mathematical Foundation Summary
CDC latency follows latency = source_commit_time + replication_lag + processing_time. Kafka-based CDC achieves at-least-once delivery with throughput = batch_size / processing_time. A naive MERGE join costs O(n × m), where n is the number of source rows and m the number of target rows; this approaches O(n × log(m)) when matching target rows can be located without a full scan, for example through partition pruning and data skipping. Debezium log-based CDC has minimal source impact: source_overhead ≈ 0.01-0.05% of transaction throughput. Watermark-based handling accepts an out-of-order event as long as event_time ≥ max_event_time − allowed_late; older events may be dropped.
See also: Iceberg Integration (21), Delta Lake Operations (22), Hudi Operations (23), SCD (35), Real-Time Analytics (38)
See Also
- Structured Streaming: Real-time CDC processing
- Delta Lake: Delta Lake CDC support
- Merge Upsert: Merge and upsert operations
- Kafka Streams: Kafka-based CDC pipelines