CDC: ADF CDC, Debezium & Cosmos DB Change Feed
Change Data Capture patterns for real-time data synchronization across Azure data services
CDC Architecture
CDC ARCHITECTURE ON AZURE
SOURCE SYSTEMS       CDC CAPTURE                           TARGET
SQL Server       ──> ADF CDC (LSN-based, timestamp)    ──> ADLS Gen2 (Delta)
MySQL/PostgreSQL ──> Debezium (Kafka Connect)          ──> Event Hubs (Capture)
Cosmos DB        ──> Change Feed (partition-aware)     ──> ADLS Gen2 / Synapse
CHANGE TYPES:
• INSERT: New record added
• UPDATE: Existing record modified
• DELETE: Record removed
• UPSERT: Insert or update (merge)
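The four change types map onto operations against a keyed target table. The following is a minimal sketch, assuming a hypothetical record format with an op, a primary key, and the changed columns in row (no specific tool emits exactly this shape). UPDATE is treated as a partial update, and UPSERT as a merge that inserts when the key is absent:

```python
from typing import Any, Dict, List

# Hypothetical record shape: {"op": "INSERT"|"UPDATE"|"DELETE"|"UPSERT", "key": ..., "row": {...}}
Target = Dict[Any, Dict[str, Any]]


def apply_changes(target: Target, changes: List[Dict[str, Any]]) -> Target:
    """Apply CDC change records, in order, to an in-memory table keyed by primary key."""
    for change in changes:
        op, key = change["op"], change["key"]
        if op == "INSERT":
            if key in target:
                raise ValueError(f"duplicate key on INSERT: {key!r}")
            target[key] = dict(change["row"])
        elif op == "UPDATE":
            if key not in target:
                raise KeyError(f"UPDATE for missing key: {key!r}")
            target[key].update(change["row"])  # only the changed columns
        elif op == "DELETE":
            target.pop(key, None)  # deleting an absent key is a no-op
        elif op == "UPSERT":
            # Merge: update the row if present, insert it otherwise
            target.setdefault(key, {}).update(change["row"])
        else:
            raise ValueError(f"unknown change type: {op!r}")
    return target


table: Target = {}
apply_changes(table, [
    {"op": "INSERT", "key": 1, "row": {"id": 1, "qty": 5}},
    {"op": "UPDATE", "key": 1, "row": {"qty": 7}},
    {"op": "UPSERT", "key": 2, "row": {"id": 2, "qty": 1}},
    {"op": "DELETE", "key": 1, "row": {}},
])
print(table)  # {2: {'id': 2, 'qty': 1}}
```

Raising on an UPDATE for a missing key makes gaps in the change stream visible; a pipeline that prefers to self-heal could treat UPDATE like UPSERT instead.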
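ADF CDC Configuration
In a Copy activity, the LSN window is typically applied through the source query, which calls SQL Server's CDC table-valued function for the capture instance (dbo_Sales here is an assumed example):
{
  "name": "CopyWithCDC",
  "type": "Copy",
  "typeProperties": {
    "source": {
      "type": "AzureSqlSource",
      "sqlReaderQuery": "SELECT * FROM cdc.fn_cdc_get_all_changes_dbo_Sales(0x00000025000001280001, 0x00000025000001280020, N'all')"
    },
    "sink": {
      "type": "ParquetSink"
    }
  }
}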
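The LSNs in the configuration above mark the start and end of the change window. SQL Server stores an LSN as binary(10) (4-byte VLF sequence, 4-byte log block, 2-byte slot), while logs and tools such as Debezium print it as colon-separated hex, for example 00000025:00000128:0001. As a sketch, assuming a capture instance named dbo_Sales (hypothetical) and the standard cdc.fn_cdc_get_all_changes_<capture_instance> function, this helper converts the display form and builds a source query for one window:

```python
import re

# Display form "VLF:block:slot", each part in hex (8, 8 and 4 digits)
_LSN_PATTERN = re.compile(r"^([0-9A-Fa-f]{8}):([0-9A-Fa-f]{8}):([0-9A-Fa-f]{4})$")


def lsn_to_hex(lsn: str) -> str:
    """Convert a colon-separated LSN to a T-SQL binary(10) literal."""
    match = _LSN_PATTERN.match(lsn)
    if match is None:
        raise ValueError(f"not a VLF:block:slot LSN: {lsn!r}")
    return "0x" + "".join(match.groups()).upper()


def build_cdc_query(capture_instance: str, start_lsn: str, stop_lsn: str) -> str:
    """Build a source query returning all changes between two LSNs (both inclusive)."""
    if not re.fullmatch(r"\w+", capture_instance):
        raise ValueError(f"invalid capture instance name: {capture_instance!r}")
    start, stop = lsn_to_hex(start_lsn), lsn_to_hex(stop_lsn)
    # Fixed-width uppercase hex strings sort in the same order as the LSNs
    if start > stop:
        raise ValueError("start LSN is after stop LSN")
    return (
        f"SELECT * FROM cdc.fn_cdc_get_all_changes_{capture_instance}"
        f"({start}, {stop}, N'all')"
    )


print(build_cdc_query("dbo_Sales", "00000025:00000128:0001", "00000025:00000128:0020"))
# SELECT * FROM cdc.fn_cdc_get_all_changes_dbo_Sales(0x00000025000001280001, 0x00000025000001280020, N'all')
```

In a real pipeline the bounds usually come from a stored watermark and sys.fn_cdc_get_max_lsn(); the start LSN must not be lower than sys.fn_cdc_get_min_lsn('dbo_Sales'), or the function raises an error.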
Debezium Configuration
{
"name": "sql-server-cdc-connector",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"database.hostname": "sql-server.database.windows.net",
"database.port": "1433",
"database.user": "debezium",
"database.password": "${secrets:debezium-password}",
"database.names": "sales_db",
"topic.prefix": "sqlserver",
"table.include.list": "dbo.Sales,dbo.Customers",
"database.encrypt": "true",
"snapshot.mode": "initial",
"snapshot.isolation.mode": "read_committed",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.sales"
}
}
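Each Debezium change event carries an envelope with before and after row images, an op code (c = create, u = update, d = delete, r = read during the initial snapshot), and source metadata such as the SQL Server change_lsn. By default a delete is followed by a tombstone, a record with a null value. A minimal consumer-side sketch, assuming the JSON converter (with or without embedded schemas); mapping snapshot reads to UPSERT is a design choice here, and the column names in the example are hypothetical:

```python
import json
from typing import Any, Dict, Optional

# Debezium op codes mapped to the change types in the architecture diagram.
# Snapshot reads ("r") become UPSERT so that re-running a snapshot is harmless.
OP_TO_CHANGE = {"c": "INSERT", "r": "UPSERT", "u": "UPDATE", "d": "DELETE"}


def parse_debezium_value(raw: Optional[bytes]) -> Optional[Dict[str, Any]]:
    """Turn a Debezium change-event value into {"op", "row", "lsn"}; None for tombstones."""
    if raw is None:
        return None  # tombstone after a delete; exists for Kafka log compaction
    value = json.loads(raw)
    # With schemas enabled, the JSON converter wraps the envelope in "payload"
    envelope = value.get("payload", value)
    op = envelope["op"]
    if op not in OP_TO_CHANGE:
        raise ValueError(f"unsupported op code: {op!r}")  # e.g. "t" (truncate)
    # Deletes carry the last row image in "before"; other ops carry it in "after"
    row = envelope["before"] if op == "d" else envelope["after"]
    return {
        "op": OP_TO_CHANGE[op],
        "row": row,
        "lsn": envelope.get("source", {}).get("change_lsn"),
    }


event = b'{"before": null, "after": {"SaleId": 1, "Amount": 10}, "source": {"change_lsn": "00000025:00000128:0001"}, "op": "c"}'
print(parse_debezium_value(event))
# {'op': 'INSERT', 'row': {'SaleId': 1, 'Amount': 10}, 'lsn': '00000025:00000128:0001'}
```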
Cosmos DB Change Feed
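from datetime import datetime, timedelta, timezone
import time

from azure.cosmos import CosmosClient
from azure.identity import DefaultAzureCredential

client = CosmosClient(
    "https://cosmos-prod.documents.azure.com:443/",
    credential=DefaultAzureCredential(),
)
database = client.get_database_client("analytics")
container = database.get_container_client("events")

def process_change(event):
    # Placeholder (hypothetical): replace with real downstream processing
    print(event["id"])

# Process change feed: the first read starts 5 minutes back,
# later reads resume from the previous continuation token
continuation = None
while True:
    if continuation is None:
        response = container.query_items_change_feed(
            start_time=datetime.now(timezone.utc) - timedelta(minutes=5)
        )
    else:
        response = container.query_items_change_feed(continuation=continuation)
    for event in response:
        process_change(event)
    # The continuation token is returned in the ETag header of the last response
    continuation = container.client_connection.last_response_headers["etag"]
    time.sleep(5)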
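The polling loop above keeps its continuation token only in memory, so after a restart it begins again five minutes back and may reprocess or skip changes. A minimal sketch of a file-based checkpoint, assuming a local JSON file at a hypothetical path (a blob or table store would play the same role in production):

```python
import json
import os
import tempfile
from typing import Optional

# Hypothetical checkpoint location
CHECKPOINT_PATH = "cosmos_events_checkpoint.json"


def load_checkpoint(path: str = CHECKPOINT_PATH) -> Optional[str]:
    """Return the saved continuation token, or None if no checkpoint exists yet."""
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)["continuation"]
    except FileNotFoundError:
        return None


def save_checkpoint(token: str, path: str = CHECKPOINT_PATH) -> None:
    """Write the token to a temp file, then rename it over the previous checkpoint."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump({"continuation": token}, f)
        # os.replace is a single rename (atomic on POSIX within one filesystem),
        # so a reader never sees a half-written checkpoint
        os.replace(tmp_path, path)
    except BaseException:
        os.unlink(tmp_path)
        raise
```

Load the token once at startup and save it after each batch has been processed. Saving after processing gives at-least-once delivery: a crash can replay the last batch, so downstream writes should be idempotent.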
Pro Tip: Use Debezium for open-source, log-based CDC from SQL Server, MySQL, and PostgreSQL. Use ADF CDC for Azure-native, low-code pipelines. Use the Cosmos DB change feed to track changes in Cosmos DB (NoSQL) containers.
Interview Questions
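Q1: How do you handle schema changes with CDC? A: Debezium records DDL changes in its internal schema history topic (schema.history.internal.kafka.topic) so it can interpret older log entries, and it can emit schema change events for downstream consumers. On SQL Server, CDC keeps capturing the old column list until you create a new capture instance for the altered table; Debezium then switches to the new instance. ADF can also propagate schema evolution; in mapping data flows this is the "Allow schema drift" option. In every case, monitor schema changes and update downstream tables and consumers accordingly.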
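A lightweight way to monitor schema changes downstream is to compare each change record's columns with the schema the target expects, and alert or pause on any difference. A minimal sketch, assuming records arrive as flat dictionaries of column values:

```python
from typing import Any, Dict, Iterable, List, Set, Tuple


def detect_schema_drift(
    known_columns: Iterable[str], record: Dict[str, Any]
) -> Tuple[List[str], List[str]]:
    """Compare a change record's columns with the known schema.

    Returns (added, missing): columns present only in the record, and
    known columns the record no longer carries.
    """
    known: Set[str] = set(known_columns)
    seen = set(record)
    return sorted(seen - known), sorted(known - seen)


added, missing = detect_schema_drift(["id", "amount"], {"id": 1, "amount": 9.5, "region": "EU"})
print(added, missing)  # ['region'] []
```

For sparse sources such as Cosmos DB documents, a missing field may simply be unset rather than dropped, so treat the missing list as a hint rather than proof of a schema change.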
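Q2: What are the limitations of CDC? A: 1) The source system must support CDC (log-based capture needs access to the transaction log). 2) Change tables and retained logs consume extra storage on the source. 3) Latency depends on the polling interval when capture is poll-based. 4) Hard deletes need explicit handling: timestamp-based capture cannot see them without a soft-delete flag, the Cosmos DB change feed in its default latest-version mode does not record deletes, and Debezium emits a delete event followed by a tombstone that consumers must apply.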
Q3: How do you ensure idempotent CDC processing? A: Use unique change identifiers (LSN, sequence number, or a timestamp combined with the key) to detect duplicates. Use upserts instead of insert-only writes. Use checkpointing to track the last processed position.
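These techniques combine naturally: a per-key high-water mark of the change identifier detects duplicates, and upserts make reapplying a change harmless. A minimal in-memory sketch, assuming each change carries a key and a monotonically increasing sequence number (a hypothetical record format; in production the high-water marks would live in the target table or a checkpoint store):

```python
from typing import Any, Dict, List, Tuple

# Hypothetical change record: {"key": ..., "seq": ..., "op": "UPSERT" | "DELETE", "row": {...}}
# "seq" stands in for any monotonically increasing change identifier (for example an LSN).


def apply_idempotent(
    target: Dict[Any, Dict[str, Any]],
    applied_seq: Dict[Any, Any],
    changes: List[Dict[str, Any]],
) -> Tuple[int, int]:
    """Apply each change at most once; returns (applied, skipped) counts."""
    applied = skipped = 0
    for change in changes:
        key, seq = change["key"], change["seq"]
        last = applied_seq.get(key)
        if last is not None and seq <= last:
            skipped += 1  # duplicate or stale change: already reflected in target
            continue
        if change["op"] == "DELETE":
            target.pop(key, None)
        else:
            target[key] = dict(change["row"])  # upsert: insert or overwrite
        applied_seq[key] = seq  # per-key high-water mark, kept even after a delete
        applied += 1
    return applied, skipped


table: Dict[Any, Dict[str, Any]] = {}
seen: Dict[Any, Any] = {}
batch = [
    {"key": 1, "seq": 10, "op": "UPSERT", "row": {"qty": 5}},
    {"key": 1, "seq": 11, "op": "UPSERT", "row": {"qty": 7}},
]
print(apply_idempotent(table, seen, batch))  # (2, 0)
print(apply_idempotent(table, seen, batch))  # (0, 2): the replay changes nothing
print(table)  # {1: {'qty': 7}}
```

Keeping the high-water mark after a delete stops a late duplicate of an older change from resurrecting the row.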