Change Data Capture: Debezium, Log-based, Trigger-based

LinkedIn & Uber Interview

Capturing and streaming database changes in real-time

Interview Question

"Design a CDC pipeline that: (1) captures changes from PostgreSQL, (2) handles schema evolution, (3) provides exactly-once delivery, (4) supports 100K changes/second, (5) allows point-in-time recovery. Compare log-based vs trigger-based approaches and justify your choice."

Difficulty: Hard | Frequently asked at LinkedIn, Uber, Netflix, Stripe


Theoretical Foundation

What is Change Data Capture (CDC)?

CDC captures changes (INSERT, UPDATE, DELETE) made to a database and delivers them to downstream systems.

Architecture Diagram
┌──────────────────────────────────────────────────────────────────────┐
│                           CDC Architecture                           │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  Source Database ──▶ CDC Connector ──▶ Message Queue ──▶ Sink        │
│                                                                      │
│  PostgreSQL          Debezium          Kafka             S3/         │
│  MySQL               Maxwell           Kinesis           Delta       │
│  SQL Server          GoldenGate        EventBridge       Lake        │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘

CDC Approaches

1. Log-based CDC (Debezium)

Reads the database's transaction log (the WAL in PostgreSQL) to capture changes.

Architecture Diagram
┌──────────────────────────────────────────────────────────────────────┐
│                       Log-based CDC (Debezium)                       │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  PostgreSQL WAL ──▶ Debezium Connector ──▶ Kafka                     │
│                                                                      │
│  WAL (Write-Ahead Log):                                              │
│    LSN 100: INSERT INTO orders (id, amount) VALUES (1, 100)          │
│    LSN 101: UPDATE orders SET amount = 150 WHERE id = 1              │
│    LSN 102: DELETE FROM orders WHERE id = 2                          │
│    LSN 103: COMMIT                                                   │
│                                                                      │
│  Debezium reads WAL and converts to Kafka messages:                  │
│    {"op": "c", "before": null, "after": {"id": 1, "amount": 100}}    │
│    {"op": "u", "before": {"amount": 100}, "after": {"amount": 150}}  │
│    {"op": "d", "before": {"id": 2}, "after": null}                   │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘

Pros:

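  • Minimal impact on the source database (no extra queries or writes per change), although an unconsumed replication slot makes PostgreSQL retain WAL
  • Captures all changes (including deletes)
  • Low latency (milliseconds)
  • Preserves commit order, including the order of changes within a transaction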

Cons:

  • Requires database-specific connector
  • May expose sensitive data in logs
  • Log retention affects recovery window
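
To make the event format from the diagram concrete, here is a minimal sketch of how a downstream consumer might replay such events against an in-memory replica. The `apply_event` helper and the dict-based replica are illustrative assumptions, not part of Debezium; only the `op`, `before`, and `after` field names come from the event format shown above.

```python
from typing import Any, Dict

Row = Dict[str, Any]


def apply_event(replica: Dict[int, Row], event: Dict[str, Any]) -> None:
    """Apply one Debezium-style change event to a replica keyed by primary key."""
    op = event["op"]
    if op in ("c", "r", "u"):  # create, snapshot read, update: upsert the new row image
        after = event["after"]
        replica[after["id"]] = dict(after)
    elif op == "d":  # delete: only the old row image is present
        replica.pop(event["before"]["id"], None)
    else:
        raise ValueError(f"unknown op: {op!r}")


# Replica starts with one row that the event stream will delete
replica: Dict[int, Row] = {2: {"id": 2, "amount": 75}}
events = [
    {"op": "c", "before": None, "after": {"id": 1, "amount": 100}},
    {"op": "u", "before": {"id": 1, "amount": 100}, "after": {"id": 1, "amount": 150}},
    {"op": "d", "before": {"id": 2, "amount": 75}, "after": None},
]
for event in events:
    apply_event(replica, event)

print(replica)  # {1: {'id': 1, 'amount': 150}}
```

Because each event carries the full new row image, replaying the same events twice leaves the replica unchanged, which is what makes at-least-once delivery tolerable for this kind of sink.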

2. Trigger-based CDC

Database triggers fire on changes and write to a change table.

Architecture Diagram
┌──────────────────────────────────────────────────────────────────────┐
│                          Trigger-based CDC                           │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  Source Table: orders                                                │
│    CREATE TRIGGER orders_cdc_trigger                                 │
│    AFTER INSERT OR UPDATE OR DELETE ON orders                        │
│    FOR EACH ROW                                                      │
│    EXECUTE FUNCTION log_orders_change();                             │
│                                                                      │
│  Change Table: orders_changelog                                      │
│    changelog_id │ operation │ old_data │ new_data │ changed_at       │
│    1            │ INSERT    │ NULL     │ {...}    │ 2024-01-15       │
│    2            │ UPDATE    │ {...}    │ {...}    │ 2024-01-15       │
│    3            │ DELETE    │ {...}    │ NULL     │ 2024-01-15       │
│                                                                      │
│  Connector reads change table and sends to Kafka:                    │
│  Polling query: SELECT * FROM orders_changelog                       │
│                 WHERE changelog_id > :last_processed_id              │
│                 ORDER BY changelog_id                                │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘

Pros:

  • Works on any database that supports triggers
  • No replication privileges or transaction log access needed
  • Simple to implement

Cons:

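  • Performance impact on source: every change is written twice inside the same transaction
  • A failing trigger aborts the application's own write
  • Misses changes that bypass row-level triggers (disabled triggers, TRUNCATE)
  • Requires polling, which adds latency and load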
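
The trigger-plus-polling loop can be tried end to end without a database server. The sketch below uses SQLite from the Python standard library as a stand-in for PostgreSQL, so the trigger syntax differs from the PostgreSQL version, and the changelog stores plain columns instead of JSON. Table and function names are illustrative assumptions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE orders (id INTEGER PRIMARY KEY, amount REAL);
CREATE TABLE orders_changelog (
    changelog_id INTEGER PRIMARY KEY AUTOINCREMENT,
    operation TEXT, order_id INTEGER, old_amount REAL, new_amount REAL
);
CREATE TRIGGER orders_ins AFTER INSERT ON orders BEGIN
    INSERT INTO orders_changelog (operation, order_id, new_amount)
    VALUES ('INSERT', NEW.id, NEW.amount);
END;
CREATE TRIGGER orders_upd AFTER UPDATE ON orders BEGIN
    INSERT INTO orders_changelog (operation, order_id, old_amount, new_amount)
    VALUES ('UPDATE', NEW.id, OLD.amount, NEW.amount);
END;
CREATE TRIGGER orders_del AFTER DELETE ON orders BEGIN
    INSERT INTO orders_changelog (operation, order_id, old_amount)
    VALUES ('DELETE', OLD.id, OLD.amount);
END;
""")


def poll_changes(last_processed_id: int):
    """Return changelog rows newer than the checkpoint, ordered by changelog_id."""
    return conn.execute(
        "SELECT changelog_id, operation, order_id, old_amount, new_amount "
        "FROM orders_changelog WHERE changelog_id > ? ORDER BY changelog_id",
        (last_processed_id,),
    ).fetchall()


conn.execute("INSERT INTO orders VALUES (1, 100)")
conn.execute("UPDATE orders SET amount = 150 WHERE id = 1")
first_batch = poll_changes(0)
checkpoint = first_batch[-1][0]  # highest changelog_id seen so far

conn.execute("DELETE FROM orders WHERE id = 1")
second_batch = poll_changes(checkpoint)
print([row[1] for row in first_batch], [row[1] for row in second_batch])
```

The poller only needs to persist `checkpoint` between runs. On a multi-writer database, concurrent transactions can commit out of sequence order, so a production poller would need extra care around the most recent ids.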

3. Timestamp-based CDC

Uses an updated_at timestamp column to find rows changed since the last run.

-- Query for changes since last run
SELECT * FROM orders 
WHERE updated_at > :last_run_timestamp
ORDER BY updated_at;

Pros:

  • Simple to implement
  • No triggers or log access needed, only a timestamp column

Cons:

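  • Misses hard deletes (a deleted row no longer exists to be queried)
  • Misses intermediate updates between polls, and rows committed late with an earlier timestamp
  • Requires an indexed timestamp column that every writer maintains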
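
The first two drawbacks are easy to reproduce. The following sketch models the source table as a Python dict (an assumption for illustration; timestamps are plain integers) and runs the same predicate as the query above.

```python
from typing import Dict, List

# Hypothetical source table: primary key -> row with an integer updated_at timestamp
orders: Dict[int, dict] = {
    1: {"id": 1, "amount": 100, "updated_at": 10},
    2: {"id": 2, "amount": 50, "updated_at": 12},
}


def poll_since(last_run_timestamp: int) -> List[dict]:
    """Mirror of: SELECT * FROM orders WHERE updated_at > :ts ORDER BY updated_at."""
    rows = [row for row in orders.values() if row["updated_at"] > last_run_timestamp]
    return sorted(rows, key=lambda row: row["updated_at"])


first_poll = poll_since(0)  # sees both rows
checkpoint = max(row["updated_at"] for row in first_poll)

orders[1] = {"id": 1, "amount": 120, "updated_at": 20}  # update 1
orders[1] = {"id": 1, "amount": 150, "updated_at": 21}  # update 2 overwrites update 1
del orders[2]                                            # hard delete leaves nothing to query

second_poll = poll_since(checkpoint)
print(second_poll)  # [{'id': 1, 'amount': 150, 'updated_at': 21}]
```

The second poll returns only the final state of order 1: the intermediate amount of 120 and the deletion of order 2 are invisible to the consumer.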

4. Version-based CDC

Adds a version number that increments on each change.

-- Query for changes since last version
SELECT * FROM orders 
WHERE version > :last_version
ORDER BY version;

Pros:

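  • Captures every insert and update that bumps the version (hard deletes still need a soft-delete flag)
  • Simple to query
  • No clock-skew or timestamp-precision issues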

Cons:

  • Requires schema change
  • Version column must be maintained
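
A small sketch of version-based polling follows, assuming a global counter as a stand-in for a database sequence and a hypothetical `is_deleted` soft-delete flag so that deletions remain visible to the poller. Neither is prescribed by the approach itself.

```python
import itertools
from typing import Dict, List

_next_version = itertools.count(1)  # stand-in for a database sequence
orders: Dict[int, dict] = {}


def upsert(order_id: int, amount: float) -> None:
    """Insert or update a row and stamp it with the next version."""
    orders[order_id] = {"id": order_id, "amount": amount,
                        "is_deleted": False, "version": next(_next_version)}


def soft_delete(order_id: int) -> None:
    """Mark a row deleted instead of removing it, so the poller can still see it."""
    orders[order_id].update(is_deleted=True, version=next(_next_version))


def poll_since(last_version: int) -> List[dict]:
    """Mirror of: SELECT * FROM orders WHERE version > :last_version ORDER BY version."""
    return sorted((r for r in orders.values() if r["version"] > last_version),
                  key=lambda r: r["version"])


upsert(1, 100)  # version 1
upsert(2, 50)   # version 2
last_version = poll_since(0)[-1]["version"]

upsert(1, 150)  # version 3
soft_delete(2)  # version 4
changes = poll_since(last_version)
print([(r["id"], r["version"], r["is_deleted"]) for r in changes])
# [(1, 3, False), (2, 4, True)]
```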

Debezium Architecture

Architecture Diagram
┌──────────────────────────────────────────────────────────────────────┐
│                        Debezium Architecture                         │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  PostgreSQL ──▶ Kafka Connect (Debezium connector) ──▶ Kafka         │
│                                                                      │
│  Debezium Components:                                                │
│    1. Snapshotter: Initial data snapshot                             │
│    2. WAL Reader: Reads transaction log                              │
│    3. Schema History: Tracks schema changes                          │
│    4. Offset Storage: Tracks reading position                        │
│                                                                      │
│  Kafka Topics:                                                       │
│  - {server}.{schema}.{table}: Change events                          │
│  - Schema history topic: DDL history (MySQL etc., not PostgreSQL)    │
│  - Kafka Connect offsets topic: Connector read position              │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘

Exactly-Once CDC

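Exactly-Once CDC Flow

PostgreSQL WAL → Debezium → Kafka Transaction → Sink (offset + data)

On failure: the Kafka transaction is aborted and Debezium resumes from the last committed offset, so no event is committed to Kafka twice. This relies on Kafka Connect's exactly-once source support (KIP-618); without it, Debezium delivers at-least-once and a restart can produce duplicates. End-to-end exactly-once also requires the sink to store its offset atomically with the data, or to apply events idempotently.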
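
The "Sink (offset + data)" step can be sketched as follows: the sink writes the rows and the consumed offset in one database transaction, so a redelivered batch is recognized and skipped. This is a minimal illustration using SQLite and a single partition; the table names and the `apply_batch` helper are assumptions, and a real Kafka sink would track one offset per topic partition.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE orders (id INTEGER PRIMARY KEY, amount REAL);
CREATE TABLE sink_offsets (topic TEXT PRIMARY KEY, last_offset INTEGER);
""")


def apply_batch(topic, records):
    """Apply (offset, id, amount) records and advance the offset in one transaction."""
    row = conn.execute(
        "SELECT last_offset FROM sink_offsets WHERE topic = ?", (topic,)
    ).fetchone()
    committed = row[0] if row else -1
    fresh = [r for r in records if r[0] > committed]  # skip records already applied
    if not fresh:
        return 0
    with conn:  # commits on success, rolls back if anything raises
        for _, order_id, amount in fresh:
            conn.execute(
                "INSERT OR REPLACE INTO orders (id, amount) VALUES (?, ?)",
                (order_id, amount),
            )
        conn.execute(
            "INSERT OR REPLACE INTO sink_offsets (topic, last_offset) VALUES (?, ?)",
            (topic, fresh[-1][0]),
        )
    return len(fresh)


batch = [(0, 1, 100.0), (1, 1, 150.0), (2, 2, 75.0)]
applied_first = apply_batch("orders", batch)
applied_again = apply_batch("orders", batch)  # redelivery after a simulated retry
print(applied_first, applied_again)  # 3 0
```

If the process crashes before the transaction commits, neither the rows nor the offset are stored, so the retry starts from the same point and the outcome is the same as a single successful run.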

Code Implementation

Debezium PostgreSQL Connector

{
  "name": "postgres-cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "${secrets:postgres-password}",
    "database.dbname": "orders_db",
    "database.server.name": "orders_server",
    "schema.include.list": "public",
    "table.include.list": "public.orders,public.customers",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.name": "debezium_publication",
    "snapshot.mode": "initial",
    "snapshot.locking.mode": "none",
    "tombstones.on.delete": true,
    "transforms": "route,unwrap",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.fields": "op,table,lsn,source.ts_ms",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "producer.override.max.request.size": "10485760",
    "producer.override.batch.size": "65536",
    "producer.override.linger.ms": "10"
  }
}
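
The two transforms in this config change both the topic name and the message shape, which matters for any consumer downstream. The sketch below approximates their effect in plain Python. It is a rough model for intuition, not Debezium's or Kafka Connect's implementation, and it does not model delete handling, which depends on the transform's delete options and Debezium version.

```python
import re
from typing import Any, Dict, Optional

ROUTE_REGEX = r"([^.]+)\.([^.]+)\.([^.]+)"  # same pattern as transforms.route.regex


def route_topic(topic: str) -> str:
    """Rename a topic only if the whole name matches; group 3 plays the role of $3."""
    match = re.fullmatch(ROUTE_REGEX, topic)
    return match.group(3) if match else topic


def unwrap(envelope: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Approximate ExtractNewRecordState with add.fields=op,table,lsn,source.ts_ms."""
    if envelope["after"] is None:
        return None  # delete handling is configurable and not modelled here
    flat = dict(envelope["after"])
    source = envelope["source"]
    flat.update({
        "__op": envelope["op"],
        "__table": source["table"],
        "__lsn": source["lsn"],
        "__source_ts_ms": source["ts_ms"],
    })
    return flat


envelope = {
    "op": "u",
    "before": {"id": 1, "amount": 100},
    "after": {"id": 1, "amount": 150},
    "source": {"table": "orders", "lsn": 101, "ts_ms": 1705312800000},
}
print(route_topic("orders_server.public.orders"))  # orders
print(unwrap(envelope))
```

A consumer of the unwrapped topic sees flat rows with `__`-prefixed metadata and no `before`/`after` fields, so its parsing schema has to match whichever shape the connector actually emits.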

Debezium Kafka Connect Setup

import requests

# ============================================================
# KAFKA CONNECT SETUP
# ============================================================

KAFKA_CONNECT_URL = "http://kafka-connect:8083"

# Create connector
def create_debezium_connector():
    """Create Debezium PostgreSQL connector"""
    
    config = {
        "name": "postgres-cdc",
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "database.hostname": "postgres",
            "database.port": "5432",
            "database.user": "debezium",
            "database.password": "password",
            "database.dbname": "orders_db",
            "database.server.name": "orders_server",
            "table.include.list": "public.orders",
            "plugin.name": "pgoutput",
            "snapshot.mode": "initial",
            "tombstones.on.delete": "true",
            "transforms": "unwrap",
            "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        }
    }
    
    response = requests.post(
        f"{KAFKA_CONNECT_URL}/connectors",
        json=config,  # json= also sets the Content-Type header
        timeout=10
    )
    
    if response.status_code == 201:
        print("Connector created successfully")
    elif response.status_code == 409:
        print("Connector already exists")
    else:
        print(f"Error {response.status_code}: {response.text}")

# Check connector status
def get_connector_status(connector_name):
    """Get connector status"""
    
    response = requests.get(
        f"{KAFKA_CONNECT_URL}/connectors/{connector_name}/status"
    )
    
    return response.json()

# Restart connector
def restart_connector(connector_name):
    """Restart connector"""
    
    response = requests.post(
        f"{KAFKA_CONNECT_URL}/connectors/{connector_name}/restart"
    )
    
    return response.status_code == 204
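
The status payload reports the connector and each task separately, and a connector can show RUNNING while its task has failed. A small helper along these lines, shown here against a hand-written sample payload rather than a live cluster, makes that check explicit. The `failed_components` name and the sample values are assumptions.

```python
from typing import Any, Dict, List


def failed_components(status: Dict[str, Any]) -> List[str]:
    """List the connector and task entries whose state is not RUNNING."""
    problems = []
    if status["connector"]["state"] != "RUNNING":
        problems.append(f"connector: {status['connector']['state']}")
    for task in status.get("tasks", []):
        if task["state"] != "RUNNING":
            problems.append(f"task {task['id']}: {task['state']}")
    return problems


# Sample payload in the shape returned by GET /connectors/{name}/status
sample_status = {
    "name": "postgres-cdc",
    "connector": {"state": "RUNNING", "worker_id": "kafka-connect:8083"},
    "tasks": [
        {"id": 0, "state": "FAILED", "worker_id": "kafka-connect:8083",
         "trace": "org.apache.kafka.connect.errors.ConnectException: ..."},
    ],
}
print(failed_components(sample_status))  # ['task 0: FAILED']
```

Restarting the connector alone does not necessarily restart a failed task; Kafka Connect also exposes a per-task restart endpoint under `/connectors/{name}/tasks/{id}/restart`.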

Trigger-based CDC Implementation

-- ============================================================
-- TRIGGER-BASED CDC IN POSTGRESQL
-- ============================================================

-- Create change log table
CREATE TABLE orders_changelog (
    changelog_id BIGSERIAL PRIMARY KEY,
    table_name VARCHAR(100),
    operation VARCHAR(10),  -- INSERT, UPDATE, DELETE
    old_data JSONB,
    new_data JSONB,
    changed_by VARCHAR(100),
    changed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Create CDC function
CREATE OR REPLACE FUNCTION log_orders_change()
RETURNS TRIGGER AS $$
BEGIN
    IF TG_OP = 'INSERT' THEN
        INSERT INTO orders_changelog (table_name, operation, new_data, changed_by)
        VALUES ('orders', 'INSERT', to_jsonb(NEW), current_user);
        RETURN NEW;
    ELSIF TG_OP = 'UPDATE' THEN
        INSERT INTO orders_changelog (table_name, operation, old_data, new_data, changed_by)
        VALUES ('orders', 'UPDATE', to_jsonb(OLD), to_jsonb(NEW), current_user);
        RETURN NEW;
    ELSIF TG_OP = 'DELETE' THEN
        INSERT INTO orders_changelog (table_name, operation, old_data, changed_by)
        VALUES ('orders', 'DELETE', to_jsonb(OLD), current_user);
        RETURN OLD;
    END IF;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

-- Create trigger
CREATE TRIGGER orders_cdc_trigger
AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW
EXECUTE FUNCTION log_orders_change();

-- Index for time-range queries on the change log
-- (polling by changelog_id needs no extra index: the PRIMARY KEY already creates one)
CREATE INDEX idx_orders_changelog_timestamp ON orders_changelog(changed_at);

CDC Consumer in Python

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# ============================================================
# CDC CONSUMER
# ============================================================

spark = SparkSession.builder \
    .appName("CDCConsumer") \
    .getOrCreate()

# Read CDC events from Kafka
cdc_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "orders") \
    .option("startingOffsets", "earliest") \
    .load()

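# Parse CDC events
# This schema expects the full Debezium envelope (op/before/after/source), so the
# ExtractNewRecordState ("unwrap") transform must not be applied to this topic.
schema = "op STRING, before STRUCT<id BIGINT, amount DOUBLE>, after STRUCT<id BIGINT, amount DOUBLE>, source STRUCT<ts_ms BIGINT>"

parsed_stream = cdc_stream \
    .select(
        F.from_json(F.col("value").cast("string"), schema).alias("data"),
        F.col("offset").alias("kafka_offset")
    ) \
    .select("data.*", "kafka_offset") \
    .filter(F.col("op").isNotNull())  # drops tombstones and unparseable records

# Process INSERT operations ("r" marks rows read during the initial snapshot)
inserts = parsed_stream.filter(F.col("op").isin("c", "r")) \
    .select("after.*")

# Process UPDATE operations
updates = parsed_stream.filter(F.col("op") == "u") \
    .select("after.*")

# Process DELETE operations
deletes = parsed_stream.filter(F.col("op") == "d") \
    .select("before.*")

# Write to Delta Lake
def process_cdc_batch(batch_df, batch_id):
    """Apply a micro-batch of CDC events to the Delta table"""
    from delta.tables import DeltaTable
    from pyspark.sql import Window
    
    # Keep only the latest event per key: MERGE fails when several source rows
    # match one target row. Assumes all events for a key share a Kafka partition
    # (Debezium keys messages by primary key), so the offset orders them.
    latest = Window.partitionBy("id").orderBy(F.col("kafka_offset").desc())
    after_cols = [f.name for f in batch_df.schema["after"].dataType.fields if f.name != "id"]
    
    changes = batch_df \
        .withColumn("id", F.coalesce(F.col("after.id"), F.col("before.id"))) \
        .withColumn("rn", F.row_number().over(latest)) \
        .filter(F.col("rn") == 1) \
        .select("op", "id", *[F.col(f"after.{c}").alias(c) for c in after_cols])
    
    assignments = {c: f"source.{c}" for c in ["id"] + after_cols}
    
    target_table = DeltaTable.forPath(spark, "/delta-lake/orders/")
    
    # One MERGE handles deletes, updates, and inserts (including snapshot reads)
    target_table.alias("target").merge(
        changes.alias("source"),
        "target.id = source.id"
    ).whenMatchedDelete(condition="source.op = 'd'") \
     .whenMatchedUpdate(set=assignments) \
     .whenNotMatchedInsert(condition="source.op != 'd'", values=assignments) \
     .execute()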

# Write stream with foreachBatch
query = parsed_stream.writeStream \
    .foreachBatch(process_cdc_batch) \
    .outputMode("update") \
    .option("checkpointLocation", "/checkpoints/cdc/") \
    .start()
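
Delta's MERGE raises an error when several source rows match the same target row, so a micro-batch generally has to be reduced to one event per key before merging. A plain-Python sketch of that reduction is shown below; the `key` and `offset` field names are illustrative assumptions about how the batch is represented.

```python
from typing import Dict, Iterable, List


def latest_per_key(events: Iterable[dict]) -> List[dict]:
    """Keep only the highest-offset event for each primary key."""
    latest: Dict[int, dict] = {}
    for event in events:
        current = latest.get(event["key"])
        if current is None or event["offset"] > current["offset"]:
            latest[event["key"]] = event
    return sorted(latest.values(), key=lambda e: e["key"])


batch = [
    {"key": 1, "offset": 10, "op": "c", "after": {"id": 1, "amount": 100}},
    {"key": 2, "offset": 11, "op": "c", "after": {"id": 2, "amount": 50}},
    {"key": 1, "offset": 12, "op": "u", "after": {"id": 1, "amount": 150}},
    {"key": 2, "offset": 13, "op": "d", "after": None},
]
reduced = latest_per_key(batch)
upserts = [e["after"] for e in reduced if e["op"] != "d"]
delete_keys = [e["key"] for e in reduced if e["op"] == "d"]
print(upserts, delete_keys)  # [{'id': 1, 'amount': 150}] [2]
```

Order 2 is created and deleted inside the same batch, so after reduction it appears only as a delete and is never inserted into the target.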

Schema Evolution in CDC

# ============================================================
# SCHEMA EVOLUTION IN CDC
# ============================================================

def handle_cdc_schema_evolution(batch_df, batch_id):
    """Handle schema evolution in CDC events"""
    
    # Get current schema
    target_schema = spark.read.format("delta").load("/delta-lake/orders/").schema
    
    # Check for new columns in the row image ("after"), not in the envelope itself
    after_fields = {f.name: f.dataType.simpleString() for f in batch_df.schema["after"].dataType.fields}
    target_columns = set(target_schema.fieldNames())
    
    new_columns = set(after_fields) - target_columns
    
    if new_columns:
        print(f"New columns detected: {new_columns}")
        
        # Add new columns to target table, using the type seen in the event schema
        for col_name in sorted(new_columns):
            spark.sql(f"""
                ALTER TABLE delta.`/delta-lake/orders/` 
                ADD COLUMNS (`{col_name}` {after_fields[col_name]})
            """)
    
    # Process CDC events
    process_cdc_batch(batch_df, batch_id)

# Use schema evolution handler
query = parsed_stream.writeStream \
    .foreachBatch(handle_cdc_schema_evolution) \
    .outputMode("update") \
    .option("checkpointLocation", "/checkpoints/cdc-schema-evolution/") \
    .start()
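
The column comparison at the heart of this handler can be checked in isolation. The sketch below plans additive schema changes from two name-to-type mappings; the `plan_add_columns` helper and the example `currency` column are hypothetical, and dropped or retyped columns are deliberately out of scope.

```python
from typing import Dict, List


def plan_add_columns(table: str, target: Dict[str, str], incoming: Dict[str, str]) -> List[str]:
    """Return ALTER TABLE statements for columns present in incoming but not in target."""
    return [
        f"ALTER TABLE {table} ADD COLUMNS (`{name}` {dtype})"
        for name, dtype in sorted(incoming.items())
        if name not in target
    ]


target_schema = {"id": "bigint", "amount": "double"}
incoming_schema = {"id": "bigint", "amount": "double", "currency": "string"}

statements = plan_add_columns("delta.`/delta-lake/orders/`", target_schema, incoming_schema)
print(statements)
# ['ALTER TABLE delta.`/delta-lake/orders/` ADD COLUMNS (`currency` string)']
```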

Point-in-Time Recovery

# ============================================================
# POINT-IN-TIME RECOVERY
# ============================================================

def recover_to_timestamp(target_table_path, recovery_timestamp):
    """Recover table to a specific timestamp"""
    from delta.tables import DeltaTable
    
    # RESTORE rolls the table back in a single commit, schema included
    # (DeltaTable.restoreToTimestamp requires Delta Lake 1.2 or later).
    # Only versions whose data files have not been removed by VACUUM are reachable.
    DeltaTable.forPath(spark, target_table_path) \
        .restoreToTimestamp(recovery_timestamp)
    
    print(f"Table recovered to {recovery_timestamp}")

def recover_to_version(target_table_path, version):
    """Recover table to a specific version"""
    from delta.tables import DeltaTable
    
    DeltaTable.forPath(spark, target_table_path) \
        .restoreToVersion(version)
    
    print(f"Table recovered to version {version}")

# Get table versions
def get_table_versions(table_path):
    """Get all available versions of a table"""
    
    from delta.tables import DeltaTable
    
    delta_table = DeltaTable.forPath(spark, table_path)
    history = delta_table.history()
    
    return history.select("version", "timestamp", "operation").collect()
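
Time travel by timestamp resolves to the latest version committed at or before the requested time. The selection rule can be sketched over a list of (version, timestamp) pairs such as the ones returned by the history query; the sample history and the `version_as_of` helper are assumptions for illustration.

```python
from datetime import datetime
from typing import List, Optional, Tuple


def version_as_of(history: List[Tuple[int, datetime]], target: datetime) -> Optional[int]:
    """Return the latest version committed at or before target, or None if none exists."""
    eligible = [version for version, committed_at in history if committed_at <= target]
    return max(eligible) if eligible else None


history = [
    (0, datetime(2024, 1, 15, 9, 0)),
    (1, datetime(2024, 1, 15, 10, 0)),
    (2, datetime(2024, 1, 15, 11, 30)),
]
print(version_as_of(history, datetime(2024, 1, 15, 11, 0)))  # 1
print(version_as_of(history, datetime(2024, 1, 15, 8, 0)))   # None
```

A `None` result corresponds to asking for a point in time before the table's first retained commit, which is the case a recovery runbook should handle explicitly.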

Performance Optimization

# ============================================================
# PERFORMANCE OPTIMIZATION
# ============================================================

# 1. Batch CDC events
cdc_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "orders") \
    .option("maxOffsetsPerTrigger", 100000) \
    .load()

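# 2. Use micro-batch processing
# (parsed_stream and process_cdc_batch come from the CDC consumer section;
#  a streaming write always needs a checkpoint location)
query = parsed_stream.writeStream \
    .foreachBatch(process_cdc_batch) \
    .trigger(processingTime="30 seconds") \
    .option("checkpointLocation", "/checkpoints/cdc/") \
    .start()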

# 3. Optimize Delta Lake after CDC
def optimize_delta_table(table_path):
    """Optimize Delta Lake table after CDC processing"""
    
    spark.sql(f"""
        OPTIMIZE delta.`{table_path}`
        ZORDER BY (id)
    """)

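💡

Production Tip: For high-volume CDC (100K+ changes/second), use: (1) Kafka partitions for parallelism, (2) Delta Lake MERGE for efficient upserts, (3) Z-Order for query performance, and (4) Auto-optimize for small file compaction. Note that the Debezium PostgreSQL connector reads one replication slot with a single task, so parallelism starts at the Kafka topic, not at the connector.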
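
A quick sizing calculation helps turn "use Kafka partitions for parallelism" into a number. The sketch below is back-of-the-envelope arithmetic only: the 1 KB average event size, the 10 MB/s per-partition throughput, and the 2x headroom factor are assumptions to be replaced with measured values.

```python
import math


def partitions_needed(events_per_sec: int, avg_event_bytes: int,
                      per_partition_mb_per_sec: float, headroom: float = 2.0) -> int:
    """Partitions required to carry the load with a safety factor for bursts."""
    required_mb_per_sec = events_per_sec * avg_event_bytes / 1_000_000
    return math.ceil(required_mb_per_sec * headroom / per_partition_mb_per_sec)


# 100K changes/s at an assumed 1 KB per event is 100 MB/s of change data
print(partitions_needed(100_000, 1_000, per_partition_mb_per_sec=10.0))  # 20
```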


Common Follow-Up Questions

Q1: How do you handle out-of-order CDC events?

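# Use watermarking to bound how late an event may arrive.
# The Kafka source exposes "timestamp" and "topic" columns; this example counts
# changes per table and window. Keeping row state correct additionally requires
# comparing a per-row position such as the LSN before applying an event.
windowed_stream = cdc_stream \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        F.window("timestamp", "5 minutes"),
        "topic"
    ) \
    .agg(F.count("*").alias("change_count"))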
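
For keeping the target rows themselves correct, a common complement to watermarking is to track the last applied log position per key and ignore anything older. A minimal sketch, assuming each event carries the source LSN (the `LsnGuard` class is hypothetical):

```python
from typing import Dict


class LsnGuard:
    """Drop events that are older than what was already applied for the same key."""

    def __init__(self) -> None:
        self.applied_lsn: Dict[int, int] = {}
        self.state: Dict[int, dict] = {}

    def apply(self, key: int, lsn: int, row: dict) -> bool:
        """Apply the row if it is newer than the stored one; return whether it was applied."""
        if lsn <= self.applied_lsn.get(key, -1):
            return False  # stale or duplicate event: ignore
        self.applied_lsn[key] = lsn
        self.state[key] = row
        return True


guard = LsnGuard()
first = guard.apply(1, 101, {"id": 1, "amount": 150})   # newer update arrives first
late = guard.apply(1, 100, {"id": 1, "amount": 100})    # older insert arrives late
print(first, late, guard.state[1]["amount"])  # True False 150
```

The same comparison also makes redelivered events harmless, since a duplicate carries an LSN that is not greater than the one already applied.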

Q2: How do you handle CDC from multiple tables?

# Subscribe to multiple tables
cdc_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "orders,customers,products") \
    .load()

# Route to different handlers
def route_cdc_events(batch_df, batch_id):
    """Route CDC events to appropriate handlers"""
    
    batch_df.persist()  # the batch is scanned once per table below
    for table_name in ["orders", "customers", "products"]:
        table_events = batch_df.filter(F.col("topic") == table_name)
        
        if table_events.count() > 0:
            # handle_table_cdc is a placeholder for the per-table merge logic
            handle_table_cdc(table_name, table_events)
    batch_df.unpersist()

Q3: How do you monitor CDC pipelines?

# Monitor CDC lag
def monitor_cdc_lag():
    """Monitor CDC lag between source and target"""
    
    # Get latest source timestamp
    source_latest = spark.sql("SELECT MAX(updated_at) FROM orders").collect()[0][0]
    
    # Get latest target timestamp
    target_latest = spark.read.format("delta") \
        .load("/delta-lake/orders/") \
        .agg(F.max("_cdc_timestamp")).collect()[0][0]
    
    lag = (source_latest - target_latest).total_seconds()
    
    if lag > 300:  # 5 minutes
        print(f"WARNING: CDC lag is {lag} seconds")
    
    return lag
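
Lag can also be measured per event, without querying the source database, because Debezium events carry the source commit time in `source.ts_ms`. A minimal sketch of that calculation follows; the function name and the sample timestamps are assumptions.

```python
import time
from typing import Optional


def event_lag_seconds(source_ts_ms: int, now_ms: Optional[int] = None) -> float:
    """Seconds between the source commit time and now (or a supplied clock reading)."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return max(0.0, (now_ms - source_ts_ms) / 1000.0)


# Event committed at 1705312800000 ms, observed 30.5 seconds later
print(event_lag_seconds(1_705_312_800_000, now_ms=1_705_312_830_500))  # 30.5
```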

Q4: How do you handle schema changes in CDC?

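# Schema evolution with Debezium
# Debezium picks up column changes from the logical decoding stream and
# reflects them in the schema of subsequent change events.
# On the sink side, mergeSchema is a write option: it lets an append add
# new columns to the Delta table (new_rows_df is a placeholder DataFrame).

new_rows_df.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("/delta-lake/orders/")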

⚠️

Critical Consideration: CDC can capture sensitive data (PII, financial). Always: (1) encrypt data in transit and at rest, (2) implement access controls, (3) mask sensitive fields, and (4) comply with regulations (GDPR, CCPA).


Company-Specific Tips

LinkedIn Interview Tips

  • Discuss Debezium and Kafka Connect
  • Explain schema evolution in CDC
  • Mention exactly-once delivery guarantees
  • Talk about multi-database CDC

Uber Interview Tips

  • Focus on real-time ride data CDC
  • Discuss geospatial data replication
  • Mention multi-region CDC
  • Talk about payment data consistency

Netflix Interview Tips

  • Discuss content metadata CDC
  • Explain A/B testing data replication
  • Mention multi-cloud CDC
  • Talk about data lake ingestion

ℹ️

Final Takeaway: CDC is essential for real-time data integration. Log-based CDC (Debezium) is preferred for most use cases due to low impact and high throughput. Trigger-based CDC is simpler but has performance implications. Always consider: (1) throughput requirements, (2) latency requirements, (3) schema evolution, and (4) exactly-once delivery.
