Change Data Capture (CDC) with PySpark


CDC Pipeline Flow

Diagram (Change Data Capture Pipeline): Source DB (MySQL/PG) → Debezium (read WAL/binlog, capture changes) → Kafka (CDC events) → Spark Streaming (read CDC stream, apply changes) → Delta Lake (MERGE/UPSERT).

Diagram (Debezium Event Lifecycle): INSERT event, UPDATE event, DELETE event, schema change, snapshot sync. Every event carries: before state, after state, timestamp, operation type.

Architecture Diagram: CDC Pipeline Overview

Architecture Diagram: Debezium CDC Event Flow

Architecture Diagram: Event Sourcing Pattern

Detailed Explanation

Change Data Capture (CDC) is a design pattern that identifies and captures changes made to data in a database, then delivers those changes in real time to downstream systems. Unlike batch-based ETL, which periodically queries the entire source table, CDC captures only the delta (inserts, updates, deletes) as it occurs, enabling near-real-time data synchronization with minimal source system impact.


What Challenge Does CDC Solve?

The fundamental challenge CDC solves is data synchronization between operational systems (OLTP) and analytical systems (OLAP/data lakes).

Approach | How it works
Traditional ETL | Polls source tables using timestamp columns or checksums, which misses deletes, creates race conditions, and requires full-table scans that degrade source system performance
CDC | Reads the database's transaction log (binlog for MySQL, WAL for PostgreSQL, redo logs for Oracle), which already records every change atomically, and so avoids these issues
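
The missed-delete problem can be seen in a toy simulation. This is only a sketch: plain Python dictionaries stand in for a real database, and the `updated_at` column, the `poll_changes` helper, and the in-memory `change_log` list are illustrative assumptions rather than part of any library.

```python
# Toy comparison of timestamp polling vs. log-based capture.
source = {1: {"name": "Alice", "updated_at": 10},
          2: {"name": "Bob", "updated_at": 12}}
change_log = []  # stand-in for the database transaction log


def delete_row(key, ts):
    """Delete a row and record the change in the log, as a real DB would."""
    before = source.pop(key)
    change_log.append({"op": "d", "key": key, "before": before, "ts": ts})


def poll_changes(last_seen_ts):
    """Timestamp polling: only rows that still exist can be returned."""
    return {k: v for k, v in source.items() if v["updated_at"] > last_seen_ts}


delete_row(2, ts=20)

polled = poll_changes(last_seen_ts=15)
logged = [e for e in change_log if e["ts"] > 15]

print(polled)                                  # {} : the delete is invisible
print([(e["op"], e["key"]) for e in logged])   # [('d', 2)]
```

The poller returns nothing because the deleted row no longer exists to be queried, while the log still holds the delete together with the row's last state.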

What is Debezium?

Debezium is one of the most widely used open-source CDC platforms and is built on Apache Kafka Connect.

Key Features:

  • Reads database transaction logs
  • Produces change events in a standardized JSON envelope format
  • Enables downstream consumers to reconstruct the exact sequence of changes

Envelope Format:

  • before state (for updates/deletes)
  • after state (for creates/updates)
  • Operation type (c for create, u for update, d for delete, r for a row read during the initial snapshot)
  • Source metadata (connector name, database, table, timestamp)
  • Transaction information
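
As a rough illustration of how a consumer might read this envelope, the sketch below parses a hand-written sample event with the standard `json` module. The sample values, the `OP_NAMES` mapping, and the `parse_change_event` helper are assumptions made for this example; real events carry more fields.

```python
import json

# A hand-written sample in the shape of a Debezium change event payload.
raw_event = json.dumps({
    "payload": {
        "before": {"id": 7, "email": "old@example.com"},
        "after": {"id": 7, "email": "new@example.com"},
        "op": "u",
        "ts_ms": 1700000000000,
        "source": {"connector": "postgresql", "db": "shop", "table": "customers"},
    }
})

OP_NAMES = {"c": "insert", "r": "snapshot_read", "u": "update", "d": "delete"}


def parse_change_event(raw):
    """Return (action, table, row) where row is the state to apply downstream."""
    payload = json.loads(raw)["payload"]
    action = OP_NAMES[payload["op"]]
    # Deletes carry only 'before'; creates, reads and updates carry 'after'.
    row = payload["before"] if payload["op"] == "d" else payload["after"]
    return action, payload["source"]["table"], row


print(parse_change_event(raw_event))
```

Choosing `before` for deletes matters because a delete event has no `after` state, so the key of the removed row is only available there.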

How Does PySpark Integrate with CDC?

PySpark Structured Streaming integrates with CDC through several approaches:

  1. Reading Kafka topics: Consumes the topics that contain CDC events and applies them to Delta Lake tables using MERGE operations
  2. Delta Lake's built-in Change Data Feed: Enabled with the delta.enableChangeDataFeed table property and read with the readChangeFeed option for downstream consumption
  3. Custom watermark-based deduplication: For handling out-of-order and duplicate events

What is Event Sourcing?

Event Sourcing, closely related to CDC, stores every state change as an immutable event rather than overwriting current state.

Benefits:

  • Complete audit trail
  • Temporal queries ("what was the state at time T?")
  • Event replay for rebuilding materialized views
  • Decouples write and read models (CQRS pattern)

Implementation in PySpark:

  • Write events to an append-only Delta table
  • Use streaming aggregations to maintain materialized views

How is Exactly-Once Semantics Achieved?

The key challenge in CDC processing is handling exactly-once semantics in distributed systems.

PySpark's Approach:

  • Checkpoint-based offset tracking
  • Idempotent MERGE operations on Delta Lake
  • Watermark-based handling of late-arriving events

Key Takeaway: The combination of Kafka's at-least-once delivery with Delta Lake's ACID transactions and idempotent MERGE operations provides effectively exactly-once processing semantics.
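
A small simulation shows why redelivery is harmless when the apply step is idempotent. This is a sketch under stated assumptions: a dictionary stands in for the Delta table, and the per-key `version` field and the `idempotent_upsert` helper are hypothetical names chosen for illustration.

```python
# At-least-once delivery + idempotent apply = effectively-once result.
target = {}  # key -> (version, row); stand-in for the Delta table


def idempotent_upsert(event):
    """Apply an event only if it is newer than what the target already holds."""
    key, version = event["key"], event["version"]
    current = target.get(key)
    if current is not None and current[0] >= version:
        return False  # duplicate or stale redelivery: no effect
    target[key] = (version, event["row"])
    return True


batch = [
    {"key": "C101", "version": 1, "row": {"city": "NYC"}},
    {"key": "C101", "version": 2, "row": {"city": "LA"}},
]

# The first attempt "crashes" after applying the batch but before the offset
# is committed, so the same batch is delivered again on restart.
first_attempt = [idempotent_upsert(e) for e in batch]
redelivery = [idempotent_upsert(e) for e in batch]

print(first_attempt)  # [True, True]
print(redelivery)     # [False, False]
print(target)         # {'C101': (2, {'city': 'LA'})}
```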

Key Concepts Table

Mathematical Foundations

Definition: Change Data Capture

CDC captures row-level changes $\Delta R = \{I, U, D\}$ (inserts, updates, deletes) from source database $S$ and propagates them to target $T$ such that:

$$T(t) = T(t_0) \cup \bigcup_{t_i \leq t} \Delta R(t_i)$$

where $T(t_0)$ is the initial snapshot and $\Delta R(t_i)$ is the change set at time $t_i$.
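
The definition can be read as a fold of the change sets over the initial snapshot. In the sketch below the union is interpreted operationally (an update replaces a row and a delete removes it); the tuple layout `(op, key, row)` and the `apply_changes` helper are assumptions made for illustration.

```python
def apply_changes(snapshot, changes):
    """Return T(t): the snapshot T(t0) with every change applied in order."""
    target = dict(snapshot)
    for op, key, row in changes:
        if op in ("I", "U"):
            target[key] = row          # insert or replace the row
        elif op == "D":
            target.pop(key, None)      # remove the row if present
    return target


t0 = {1: "Alice", 2: "Bob"}
delta = [("I", 3, "Carol"), ("U", 1, "Alicia"), ("D", 2, None)]
print(apply_changes(t0, delta))  # {1: 'Alicia', 3: 'Carol'}
```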

Event Ordering

For CDC events $e_1, e_2$ affecting the same key $k$, the ordering must satisfy:

$$\text{Version}(e_1) < \text{Version}(e_2) \implies \text{Apply}(e_1) \text{ before } \text{Apply}(e_2)$$

Out-of-order events require buffering with timeout $T_{\text{buffer}}$:

$$\text{Buffer}(e) \iff \exists e' : e'.\text{seq} = e.\text{seq} - 1 \land e' \text{ not yet received}$$
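
The buffering rule can be sketched as a small reorder buffer for a single key. The `ReorderBuffer` class and the `seq` field are hypothetical names for this illustration, and the timeout is left out to keep the sketch short; a real implementation would also need to give up on a gap after the timeout expires.

```python
class ReorderBuffer:
    """Releases events for one key in strict sequence order."""

    def __init__(self, first_seq=1):
        self.next_seq = first_seq
        self.pending = {}  # seq -> event, held until its predecessor arrives

    def accept(self, event):
        """Buffer the event and return every event that is now ready."""
        self.pending[event["seq"]] = event
        ready = []
        while self.next_seq in self.pending:
            ready.append(self.pending.pop(self.next_seq))
            self.next_seq += 1
        return ready


buf = ReorderBuffer()
released = [[e["seq"] for e in buf.accept({"seq": s})] for s in (2, 3, 1, 4)]
print(released)  # [[], [], [1, 2, 3], [4]]
```

Events 2 and 3 are held because their predecessor has not arrived; once event 1 shows up, all three are released together in order.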

Merge Correctness Theorem

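CDC merge is correct regardless of arrival order if the merge function $M$ is commutative and associative:

$$M(M(A, b), c) = M(M(A, c), b) = M(A, b \cup c)$$

This ensures that applying changes in any order produces the same result. A merge that keeps the highest version per key (last-writer-wins) has this property. A merge that simply overwrites in arrival order does not, which is why the ordering rule above is needed in that case.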
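
One merge function with this property is last-writer-wins by version. The sketch below checks order independence by brute force over every permutation of three changes; the `(key, version, value)` tuple layout is an assumption for this example, and versions are assumed unique per key.

```python
from itertools import permutations


def merge(state, change):
    """Last-writer-wins: keep the change with the highest version per key."""
    key, version, value = change
    current = state.get(key)
    if current is None or version > current[0]:
        return {**state, key: (version, value)}
    return state


changes = [("k", 1, "a"), ("k", 2, "b"), ("k", 3, "c")]

results = set()
for order in permutations(changes):
    state = {}
    for change in order:
        state = merge(state, change)
    results.add(state["k"])

print(results)  # {(3, 'c')}
```

All six orderings converge on the same final value, which is what the identity requires.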

Log Retention

For CDC with retention period $R$ (in days) and average change rate $\lambda$ bytes/sec:

$$\text{Storage}_{\text{log}} = R \times \lambda \times 86400 \quad \text{(bytes)}$$

The source database's log retention must exceed the maximum replication lag $\text{lag}_{\max}$. Otherwise the log position Debezium needs may already be gone, and the connector has to take a new snapshot.
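
As a worked example of the storage formula, the sketch below plugs in an assumed workload (7 days of retention at 50 KB/s of log writes). The numbers and helper names are illustrative only.

```python
SECONDS_PER_DAY = 86_400


def log_storage_bytes(retention_days, change_rate_bytes_per_sec):
    """Storage_log = R x lambda x 86400."""
    return retention_days * change_rate_bytes_per_sec * SECONDS_PER_DAY


def retention_is_safe(retention_days, max_lag_hours):
    """Retention must exceed the worst-case replication lag."""
    return retention_days * 24 > max_lag_hours


# Assumed workload: 7 days of retention at 50 KB/s of log writes.
size = log_storage_bytes(7, 50_000)
print(f"{size / 1e9:.2f} GB")      # 30.24 GB
print(retention_is_safe(7, 36))    # True
print(retention_is_safe(1, 36))    # False
```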

Throughput Capacity

Maximum CDC throughput with $p$ parallel consumers, where each $T$ term is that stage's per-consumer throughput:

$$T_{\text{max}} = p \times \min(T_{\text{source}}, T_{\text{network}}, T_{\text{sink}})$$

The slowest stage is the bottleneck, so adding capacity anywhere else does not raise $T_{\text{max}}$.
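
A quick calculation makes the bottleneck visible. The throughput figures below are invented for illustration, and `max_throughput` is a hypothetical helper, not a library function.

```python
def max_throughput(parallelism, source, network, sink):
    """T_max = p x min(T_source, T_network, T_sink), in events/sec."""
    stages = {"source": source, "network": network, "sink": sink}
    bottleneck = min(stages, key=stages.get)
    return parallelism * stages[bottleneck], bottleneck


print(max_throughput(4, source=20_000, network=50_000, sink=8_000))
# (32000, 'sink')
```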

Key Insight

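Event sourcing with CDC provides a complete audit trail but requires compaction to manage storage growth. Debezium's outbox pattern avoids dual writes by inserting each event into an outbox table in the same transaction as the business data, so an event is published if and only if the business change commits. Delivery through Kafka is still at-least-once, so consumers must remain idempotent.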
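
The transactional part of the outbox pattern can be sketched with the standard `sqlite3` module standing in for the source database. The table layouts, the `change_city` helper, and the simulated failure are assumptions for this example; in a real deployment Debezium would read the outbox table from the transaction log.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id TEXT PRIMARY KEY, city TEXT)")
conn.execute("CREATE TABLE outbox (event_id INTEGER PRIMARY KEY, "
             "aggregate_id TEXT, event_type TEXT, payload TEXT)")


def change_city(customer_id, city, fail=False):
    """Update the business row and record the event in one transaction."""
    try:
        with conn:  # commits on success, rolls back on any exception
            conn.execute("INSERT OR REPLACE INTO customers VALUES (?, ?)",
                         (customer_id, city))
            if fail:
                raise RuntimeError("simulated crash before commit")
            conn.execute(
                "INSERT INTO outbox (aggregate_id, event_type, payload) "
                "VALUES (?, ?, ?)",
                (customer_id, "CustomerAddressChanged",
                 json.dumps({"city": city})))
    except RuntimeError:
        pass


change_city("C101", "NYC")
change_city("C101", "LA", fail=True)  # neither the row nor the event persists

city = conn.execute("SELECT city FROM customers").fetchone()[0]
events = conn.execute("SELECT COUNT(*) FROM outbox").fetchone()[0]
print(city, events)  # NYC 1
```

The failed call leaves both tables untouched, so the outbox never contains an event for a change that did not commit, and never misses one that did.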

Summary

CDC enables real-time data synchronization with ordering guarantees. Merge operations must be commutative for correctness. Log retention must exceed replication lag. Throughput is bounded by the slowest component in the pipeline.

Key Concepts Table (cont.)

Component | Description | Failure Mode | Recovery Strategy
Source DB Transaction Log | Binlog/WAL recording all changes | Log rotation/truncation | Snapshot + resume from log position
Debezium Connector | Reads log, produces Kafka events | Connector crash | Restart from last committed offset
Kafka Topic | Durable event buffer | Broker failure | Replication factor 3+
PySpark Structured Streaming | Micro-batch CDC processor | Executor failure | Checkpoint recovery
Delta Lake MERGE | Atomic upsert to target | Write failure | Retry from checkpoint
Checkpoint Directory | Offset tracking for exactly-once | Corruption | Restore from backup
Watermark | Late event handling | Clock skew | Grace period configuration

Code Examples

Example 1: Reading CDC Events from Kafka with PySpark

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("CDC-From-Kafka") \
    .config("spark.sql.streaming.schemaInference", "true") \
    .getOrCreate()

# Define CDC event schema
cdc_event_schema = StructType([
    StructField("schema", StructType([
        StructField("type", StringType()),
        StructField("fields", ArrayType(StructType([
            StructField("name", StringType()),
            StructField("type", StringType()),
            StructField("fields", ArrayType(StructType([
                StructField("name", StringType()),
                StructField("type", StringType()),
            ])))
        ])))
    ])),
    StructField("payload", StructType([
        StructField("before", MapType(StringType(), StringType())),
        StructField("after", MapType(StringType(), StringType())),
        StructField("source", StructType([
            StructField("version", StringType()),
            StructField("connector", StringType()),
            StructField("name", StringType()),
            StructField("ts_ms", LongType()),
            StructField("snapshot", StringType()),
            StructField("db", StringType()),
            StructField("table", StringType()),
        ])),
        StructField("op", StringType()),
        StructField("ts_ms", LongType()),
        StructField("transaction", StructType([
            StructField("id", StringType()),
            StructField("total_order", IntegerType()),
            StructField("data_collection_order", IntegerType()),
        ])),
    ]))
])

# Read CDC events from Kafka
cdc_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092")
    .option("subscribe", "dbserver1.public.customers,dbserver1.public.orders")
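    # Start from the earliest retained offset on the first run so the initial
    # snapshot and existing changes are not skipped; later runs resume from
    # the checkpoint.
    .option("startingOffsets", "earliest")
    # "false" keeps the query running if offsets were aged out of Kafka, at
    # the cost of silently missing those events; use "true" to fail instead.
    .option("failOnDataLoss", "false")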
    .option("maxOffsetsPerTrigger", 100000)
    .load()
)

# Parse and flatten CDC events
parsed_cdc = (
    cdc_stream
    .select(
        col("key").cast("string").alias("kafka_key"),
        from_json(col("value").cast("string"), cdc_event_schema).alias("cdc"),
        col("topic"),
        col("partition"),
        col("offset"),
        col("timestamp").alias("kafka_timestamp")
    )
    .select(
        col("kafka_key"),
        col("cdc.payload.before").alias("before"),
        col("cdc.payload.after").alias("after"),
        col("cdc.payload.op").alias("operation"),
        col("cdc.payload.source.ts_ms").alias("source_ts_ms"),
        col("cdc.payload.source.db").alias("source_db"),
        col("cdc.payload.source.table").alias("source_table"),
        col("cdc.payload.source.connector").alias("connector"),
        col("topic"),
        col("partition"),
        col("offset"),
        col("kafka_timestamp"),
        # Deduplication key
        concat(
            col("topic"),
            lit("-"),
            col("partition"),
            lit("-"),
            col("offset")
        ).alias("event_id"),
        # Processing timestamp
        current_timestamp().alias("processed_at")
    )
)

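# ─── Write to Bronze Layer (Raw CDC Events) ───
def write_bronze_batch(batch_df, batch_id):
    """Append raw CDC events to the Bronze layer idempotently."""
    if batch_df.isEmpty():  # PySpark 3.3+; avoids a full count()
        return

    # txnAppId + txnVersion (Delta Lake 2.0+) make the append idempotent: if
    # this micro-batch is retried after a failure, Delta skips the write
    # because the same (app id, version) pair was already committed.
    (
        batch_df
        .write
        .format("delta")
        .mode("append")
        .option("txnAppId", "cdc_bronze")
        .option("txnVersion", batch_id)
        .save("/mnt/lakehouse/bronze/cdc_events")
    )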

bronze_query = (
    parsed_cdc
    .writeStream
    .foreachBatch(write_bronze_batch)
    .outputMode("append")
    .option("checkpointLocation", "/mnt/checkpoints/cdc_bronze")
    .trigger(processingTime="30 seconds")
    .start()
)

bronze_query.awaitTermination()

Example 2: Applying CDC to Delta Lake with MERGE

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from delta.tables import DeltaTable

spark = SparkSession.builder \
    .appName("CDC-Apply-to-Delta") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true") \
    .getOrCreate()

# Target columns to pull out of the Debezium 'after' map. These names are an
# assumption for illustration; replace them with the real customers schema.
CUSTOMER_COLUMNS = ["name", "email", "city"]

def apply_cdc_to_delta(micro_batch_df, batch_id):
    """Apply CDC events to the Delta Lake target table with a single MERGE."""
    if micro_batch_df.isEmpty():  # PySpark 3.3+
        return

    # Keep only the latest event per primary key, so a key touched several
    # times in one micro-batch (e.g. update then delete) resolves correctly.
    # Window functions are allowed here because the micro-batch is a static
    # DataFrame; they are not supported directly on a streaming DataFrame.
    latest_first = Window.partitionBy("id").orderBy(
        col("source_ts_ms").desc(), col("offset").desc()
    )
    latest = (
        micro_batch_df
        # 'r' events are rows read during Debezium's initial snapshot
        .filter(col("operation").isin("c", "r", "u", "d"))
        # Deletes carry the key only in 'before'
        .withColumn("id", coalesce(col("after")["id"], col("before")["id"]))
        .withColumn("row_num", row_number().over(latest_first))
        .filter(col("row_num") == 1)
    )

    # 'after' is a map, so it cannot be star-expanded; pull fields by key.
    # Map values are strings; cast them here if the target uses other types.
    source = latest.select(
        col("id"),
        *[col("after")[c].alias(c) for c in CUSTOMER_COLUMNS],
        col("operation").alias("_cdc_operation"),
        # source_ts_ms is epoch milliseconds
        (col("source_ts_ms") / 1000).cast("timestamp").alias("_cdc_timestamp"),
        col("event_id").alias("_event_id"),
    )

    target_table = DeltaTable.forPath(spark, "/mnt/lakehouse/silver/customers")
    (
        target_table.alias("target")
        .merge(source.alias("source"), "target.id = source.id")
        .whenMatchedDelete(condition="source._cdc_operation = 'd'")
        .whenMatchedUpdateAll(condition="source._cdc_operation != 'd'")
        .whenNotMatchedInsertAll(condition="source._cdc_operation != 'd'")
        .execute()
    )

# ─── Stream CDC Events and Apply to Target ───
cdc_stream = (
    spark.readStream
    .format("delta")
    .load("/mnt/lakehouse/bronze/cdc_events")
    # This job maintains the customers table only
    .filter(col("source_table") == "customers")
)

# Write with MERGE (deduplication happens inside apply_cdc_to_delta)
merge_query = (
    cdc_stream
    .writeStream
    .foreachBatch(apply_cdc_to_delta)
    .outputMode("update")
    .option("checkpointLocation", "/mnt/checkpoints/cdc_merge")
    .trigger(processingTime="60 seconds")
    .start()
)

merge_query.awaitTermination()

Example 3: Event Sourcing Implementation with PySpark

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable

# Imported after the wildcard imports so these names cannot be shadowed
from datetime import datetime
from uuid import uuid4

spark = SparkSession.builder \
    .appName("Event-Sourcing-CDC") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# ─── Event Store (Append-Only) ───
event_schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("event_type", StringType(), False),
    StructField("aggregate_id", StringType(), False),
    StructField("aggregate_type", StringType(), False),
    StructField("event_data", MapType(StringType(), StringType())),
    StructField("metadata", MapType(StringType(), StringType())),
    StructField("event_version", IntegerType()),
    StructField("created_at", TimestampType()),
])

# Create event store at the same path that emit_event writes to, so the
# append-only property applies to the data actually being written
spark.sql("""
    CREATE TABLE IF NOT EXISTS event_store (
        event_id STRING NOT NULL,
        event_type STRING NOT NULL,
        aggregate_id STRING NOT NULL,
        aggregate_type STRING NOT NULL,
        event_data MAP<STRING, STRING>,
        metadata MAP<STRING, STRING>,
        event_version INT,
        created_at TIMESTAMP
    )
    USING DELTA
    PARTITIONED BY (aggregate_type)
    LOCATION '/mnt/lakehouse/events'
    TBLPROPERTIES (
        'delta.appendOnly' = 'true',
        'delta.logRetentionDuration' = 'interval 365 days'
    )
""")

# ─── Emit Events ───
def emit_event(aggregate_id, aggregate_type, event_type, event_data, metadata=None):
    """Emit an event to the event store."""
    # Pass event_schema explicitly: type inference cannot handle an empty
    # metadata dict.
    event = spark.createDataFrame([{
        "event_id": str(uuid4()),
        "event_type": event_type,
        "aggregate_id": aggregate_id,
        "aggregate_type": aggregate_type,
        "event_data": event_data,
        "metadata": metadata or {},
        "event_version": 1,
        "created_at": datetime.now(),
    }], schema=event_schema)
    
    event.write.format("delta").mode("append").save("/mnt/lakehouse/events")

# Example: Customer lifecycle events
emit_event(
    aggregate_id="C101",
    aggregate_type="customer",
    event_type="CustomerCreated",
    event_data={"name": "Alice Johnson", "email": "alice@email.com", "city": "NYC"},
    metadata={"source": "registration_api", "correlation_id": "req-123"}
)

emit_event(
    aggregate_id="C101",
    aggregate_type="customer",
    event_type="CustomerAddressChanged",
    event_data={"old_city": "NYC", "new_city": "LA", "change_reason": "relocation"},
    metadata={"source": "profile_api", "correlation_id": "req-456"}
)

emit_event(
    aggregate_id="C101",
    aggregate_type="customer",
    event_type="CustomerEmailUpdated",
    event_data={"old_email": "alice@email.com", "new_email": "alice.new@email.com"},
    metadata={"source": "profile_api", "correlation_id": "req-789"}
)

# ─── Build Materialized View (Projection) ───
def latest_value(event_types, *keys):
    """Value of the first non-null key from the most recent matching event.

    Wrapping (created_at, value) in a struct and taking max() picks the latest
    event deterministically. Calling orderBy() before groupBy() does not
    guarantee the order seen by first()/last().
    """
    value = coalesce(*[col("event_data")[k] for k in keys])
    return max(
        when(col("event_type").isin(*event_types) & value.isNotNull(),
             struct(col("created_at"), value.alias("value")))
    )["value"]

def build_customer_view(events_df, aggregate_id=None):
    """Rebuild customer state from events (event replay)."""
    filtered = events_df if aggregate_id is None else \
        events_df.filter(col("aggregate_id") == aggregate_id)
    
    return filtered.groupBy("aggregate_id").agg(
        latest_value(["CustomerCreated"], "name").alias("name"),
        # Update events store the new value under 'new_email' / 'new_city'
        latest_value(["CustomerCreated", "CustomerEmailUpdated"],
                     "new_email", "email").alias("email"),
        latest_value(["CustomerCreated", "CustomerAddressChanged"],
                     "new_city", "city").alias("city"),
        count("*").alias("total_events"),
        max("created_at").alias("last_updated"),
        min("created_at").alias("created_at"),
    )

# Build view from all events
events_df = spark.read.format("delta").load("/mnt/lakehouse/events")
customer_view = build_customer_view(events_df)
customer_view.show()

# Build view for specific customer (point-in-time query)
def build_state_at_time(events_df, aggregate_id, as_of_timestamp):
    """Build state as of a specific timestamp."""
    # Replay only the events recorded up to the requested time
    as_of = events_df.filter(col("created_at") <= as_of_timestamp)
    return build_customer_view(as_of, aggregate_id)

# Query state at specific time
state_at_may = build_state_at_time(
    events_df, "C101", 
    "2026-05-15"
)
state_at_may.show()

Performance Metrics

Metric | Batch ETL (Polling) | CDC (Debezium) | Improvement
Latency (Source → Target) | 4-24 hours | 5-30 seconds | 99.9% reduction
Source DB Load | 15-30% CPU (full scans) | 1-3% CPU (log read) | 90% reduction
Data Transferred | 100% of table per run | Only changes (0.1-5%) | 95-99% reduction
Storage (Delta Lake) | Full copies per snapshot | Event log only | 80% reduction
Query Freshness | Stale by hours | Near real-time | Sub-minute
Delete Detection | Impossible (without flags) | Automatic | New capability
Ordering Guarantees | None (race conditions) | Transaction-ordered | Strict ordering
Exactly-Once | Difficult (dedup needed) | Checkpoint-based | Guaranteed
Operational Complexity | Low | Medium-High | Trade-off
Cost (Compute) | $0.50/hour (batch) | $0.15/hour (streaming) | 70% reduction

Best Practices

  1. Use Debezium for database CDC: Debezium provides mature, well-tested CDC connectors for MySQL, PostgreSQL, SQL Server, Oracle, and MongoDB. It handles log reading, snapshotting, and event serialization.

  2. Enable Kafka topic compaction for latest-state queries: For use cases that need the current state without replaying all events, configure Kafka topic compaction (cleanup.policy=compact) to retain only the latest value per key.

  3. Implement watermark-based deduplication: Use PySpark's watermark feature (withWatermark) together with dropDuplicates to handle late-arriving and duplicate events within a configurable time window.

  4. Separate bronze and silver CDC processing: Write raw CDC events to Bronze first, then process and apply to Silver. This provides an audit trail and enables reprocessing if logic changes.

  5. Use Delta Lake MERGE for idempotent applies: A MERGE keyed on the primary key, with deterministic update and insert clauses, produces the same result when the same batch is applied twice. This is critical for exactly-once semantics in distributed systems.

  6. Monitor Kafka consumer lag: Set up alerts when consumer lag exceeds thresholds. High lag indicates the CDC processor cannot keep up with the source change rate.

  7. Handle schema evolution in CDC events: Use Delta Lake's mergeSchema option and Debezium's schema history tracking to handle source schema changes without breaking the CDC pipeline.

  8. Implement dead-letter queues: Route events that fail deserialization or validation to a dead-letter queue for manual inspection rather than blocking the entire pipeline.

  9. Snapshot before CDC starts: When setting up CDC for the first time, take a consistent snapshot of the source database and replay it through the CDC pipeline to establish a baseline.

  10. Tune micro-batch frequency: Balance latency vs. throughput by adjusting the trigger interval. For most use cases, 30-60 seconds provides a good balance. For sub-second latency, consider continuous processing mode, keeping in mind that it is experimental and does not support foreachBatch.
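
The dead-letter practice (item 8) can be sketched in plain Python. The `REQUIRED_FIELDS` tuple, the `route` helper, and the sample messages are assumptions for illustration; in a real pipeline the dead letters would typically go to a separate Kafka topic or Delta table.

```python
import json

REQUIRED_FIELDS = ("op", "ts_ms")


def route(raw_messages):
    """Split raw messages into parsed events and dead letters."""
    good, dead_letters = [], []
    for raw in raw_messages:
        try:
            payload = json.loads(raw)["payload"]
            missing = [f for f in REQUIRED_FIELDS if f not in payload]
            if missing:
                raise ValueError(f"missing fields: {missing}")
            good.append(payload)
        except (json.JSONDecodeError, KeyError, TypeError, ValueError) as exc:
            # Keep the original message and the reason for later inspection.
            dead_letters.append({"raw": raw, "error": str(exc)})
    return good, dead_letters


messages = [
    '{"payload": {"op": "c", "ts_ms": 1, "after": {"id": 1}}}',
    '{"payload": {"op": "u"}}',   # fails validation
    'not json at all',            # fails deserialization
]
good, dead = route(messages)
print(len(good), len(dead))  # 1 2
```

Bad messages are set aside with their error text, and the one valid event still flows through instead of being blocked behind them.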

Mathematical Foundation Summary

The time at which a change lands in the target follows arrival_time = source_commit_time + replication_lag + processing_time, so end-to-end latency is replication_lag + processing_time. Kafka-based CDC achieves at-least-once delivery with throughput = batch_size / processing_time. A naive MERGE join is O(n × m), where n is source rows and m is target rows. Spark avoids this in practice by using hash or sort-merge joins, and file pruning on the merge key reduces how much of the target is scanned. Debezium log-based CDC has minimal source impact: source_overhead ≈ 0.01-0.05% of transaction throughput. With a watermark, an event is accepted only if event_time ≥ max_event_time_seen − allowed_lateness; older events are dropped.
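
The watermark rule can be illustrated with a simplified per-event filter. This is a sketch, not how Spark implements it: Structured Streaming advances the watermark between micro-batches rather than per event, and the `WatermarkFilter` class is a hypothetical name used only here.

```python
class WatermarkFilter:
    """Tracks the max event time seen and rejects events behind the watermark."""

    def __init__(self, allowed_lateness):
        self.allowed_lateness = allowed_lateness
        self.max_event_time = None

    def accept(self, event_time):
        """Return True if the event is at or after the current watermark."""
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        watermark = self.max_event_time - self.allowed_lateness
        return event_time >= watermark


wm = WatermarkFilter(allowed_lateness=10)
decisions = [(t, wm.accept(t)) for t in (100, 105, 97, 120, 109, 111)]
print(decisions)
```

Event 97 is late but still within the 10-unit allowance, so it is kept. Event 109 arrives after event 120 has pushed the watermark to 110, so it is dropped.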

See also: Iceberg Integration (21), Delta Lake Operations (22), Hudi Operations (23), SCD (35), Real-Time Analytics (38)
