Data Ingestion Patterns: Bringing Data into the Pipeline
Data ingestion is the process of moving data from source systems into a data platform for processing, analysis, and storage.
The Ingestion Landscape
Data Sources:
- Relational databases: PostgreSQL, MySQL
- NoSQL stores: MongoDB, Cassandra
- SaaS APIs: Salesforce, HubSpot
- Streaming platforms: Kafka, Kinesis
- File systems: SFTP, S3
- Message queues: RabbitMQ, SQS
Key Considerations:
- Source capabilities: what does the source support?
- Data volume: how much data needs to be ingested?
- Latency requirements: real-time or batch?
- Impact tolerance: can the source handle the load?
Key Insight: Each source demands a different ingestion strategy based on its capabilities, data volume, latency requirements, and impact tolerance.
Data Ingestion Patterns Overview
[Figure: CDC flow diagram]
[Figure: Architecture diagram]
Batch ingestion loads data in discrete chunks at scheduled intervals (hourly, daily, weekly). Data is extracted from source systems as complete snapshots or delta files and loaded into the target platform. Batch ingestion is simple and well understood, and it can be scheduled for off-peak hours to limit source system impact, but it incurs high latency: data can be stale by up to one batch interval.
Streaming ingestion processes data as it arrives, event by event or in micro-batches. Data is ingested continuously from source systems (Kafka, Kinesis, event logs) with low latency (seconds to minutes). Streaming ingestion provides real-time freshness but requires more complex infrastructure (message queues, state management, exactly-once processing).
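To make the micro-batch idea concrete, here is a minimal sketch that groups a continuous event stream into fixed-size batches. The function name `micro_batches` and the size-only flush rule are assumptions for illustration; real streaming engines usually also flush on a time interval.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def micro_batches(events: Iterable[T], max_size: int) -> Iterator[List[T]]:
    """Group an event stream into batches of at most max_size events."""
    batch: List[T] = []
    for event in events:
        batch.append(event)
        if len(batch) >= max_size:
            yield batch
            batch = []
    if batch:
        # Emit the final, possibly smaller, batch
        yield batch


# Seven events with a batch size of three
batches = list(micro_batches(range(7), max_size=3))
print(batches)
```

Because the function is a generator, it holds at most one batch in memory at a time, which is the property that lets micro-batching work on unbounded streams.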
Change data capture (CDC) captures changes (inserts, updates, deletes) made to data in a source database and delivers them to downstream systems in near real time. Log-based CDC reads the database transaction log (WAL, binlog, redo log), so it adds little load to the source compared with repeated queries. Unlike polling-based approaches, CDC provides low-latency, low-impact, and complete change capture, including deletes.
API polling is the process of periodically calling an API endpoint to retrieve new or updated data. The polling frequency determines data freshness. Common patterns: (1) Timestamp-based: request records modified since the last poll. (2) Cursor-based: use pagination cursors to iterate through results. (3) Webhook-based: strictly an alternative to polling, a push model in which the source notifies the consumer of changes.
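The timestamp-based and cursor-based patterns are often combined. The sketch below shows one poll cycle under that assumption. `poll_once`, `fake_fetch_page`, and the `updated_at` field are hypothetical names, and the in-memory list stands in for a real SaaS endpoint.

```python
from datetime import datetime, timezone
from typing import Callable, Dict, List, Optional, Tuple

Record = Dict[str, object]
FetchPage = Callable[[datetime, Optional[str]], Tuple[List[Record], Optional[str]]]


def poll_once(fetch_page: FetchPage, last_seen: datetime) -> Tuple[List[Record], datetime]:
    """Fetch every record modified after last_seen, following pagination cursors."""
    records: List[Record] = []
    cursor: Optional[str] = None
    while True:
        page, cursor = fetch_page(last_seen, cursor)
        records.extend(page)
        if cursor is None:
            break
    # Advance the watermark only to the newest timestamp actually received
    new_watermark = max((r["updated_at"] for r in records), default=last_seen)
    return records, new_watermark


# Hypothetical in-memory "API": five records, one per day
_DATA = [
    {"id": i, "updated_at": datetime(2024, 1, i, tzinfo=timezone.utc)}
    for i in range(1, 6)
]


def fake_fetch_page(since, cursor, page_size=2):
    """Return one page of records newer than `since` plus the next cursor."""
    matching = [r for r in _DATA if r["updated_at"] > since]
    start = int(cursor or 0)
    page = matching[start:start + page_size]
    next_cursor = str(start + page_size) if start + page_size < len(matching) else None
    return page, next_cursor


records, watermark = poll_once(fake_fetch_page, datetime(2024, 1, 2, tzinfo=timezone.utc))
print([r["id"] for r in records], watermark.date())
```

Persisting `watermark` between runs is what makes the next poll incremental; if it is lost, the safe fallback is a full re-poll combined with idempotent loading.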
File watching monitors a directory for new, modified, or deleted files and triggers ingestion pipelines when changes are detected. Common implementations: inotify (Linux), WatchService (Java), Watchdog (Python). File watching is used for SFTP drops, CSV exports, and legacy system integrations where API access is unavailable.
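As a rough illustration of the detection step, the sketch below uses plain standard-library polling rather than inotify or Watchdog, so it is a stand-in for those event-driven tools and not an example of their APIs. The function name `scan_for_changes` and the file name are hypothetical.

```python
import os
import tempfile
from typing import Dict, List


def scan_for_changes(directory: str, seen: Dict[str, float]) -> List[str]:
    """Return paths that are new or modified since the previous scan.

    `seen` maps path -> last observed modification time and is updated in place.
    """
    changed = []
    for entry in os.scandir(directory):
        if not entry.is_file():
            continue
        mtime = entry.stat().st_mtime
        if seen.get(entry.path) != mtime:
            seen[entry.path] = mtime
            changed.append(entry.path)
    return sorted(changed)


with tempfile.TemporaryDirectory() as drop_dir:
    state: Dict[str, float] = {}
    first_scan = scan_for_changes(drop_dir, state)   # directory is empty
    with open(os.path.join(drop_dir, "orders.csv"), "w") as fh:
        fh.write("order_id,amount\n1,9.99\n")
    second_scan = scan_for_changes(drop_dir, state)  # picks up the new file
    third_scan = scan_for_changes(drop_dir, state)   # nothing changed since
    detected = [os.path.basename(p) for p in second_scan]

print(detected)
```

This sketch does not detect deletions, and it can fire on a file that is still being written. A common mitigation for SFTP drops is to have the sender upload under a temporary name and rename the file once the upload completes.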
Batch Ingestion Throughput
Batch ingestion throughput = Total_records / (T_extract + T_transfer + T_load). For a daily batch: T_batch = T_extract + T_transfer + T_load < 24 hours. Parallel extraction from N source shards: T_extract = max(T_shard_i) for i in 1..N. Parallel loading to M target partitions: T_load = Total_records / (M * Load_rate_per_partition).
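A worked example of the formulas above, using made-up numbers (three source shards, eight target partitions) purely for illustration:

```python
from typing import Sequence


def batch_duration_hours(
    shard_extract_hours: Sequence[float],
    transfer_hours: float,
    total_records: int,
    partitions: int,
    load_rate_per_partition: float,
) -> float:
    """T_batch = T_extract + T_transfer + T_load, with parallel extract and load."""
    t_extract = max(shard_extract_hours)  # slowest shard bounds extraction
    t_load = total_records / (partitions * load_rate_per_partition)
    return t_extract + transfer_hours + t_load


total_records = 120_000_000
duration = batch_duration_hours(
    shard_extract_hours=[1.0, 1.5, 0.75],   # hypothetical per-shard extract times
    transfer_hours=0.5,
    total_records=total_records,
    partitions=8,
    load_rate_per_partition=5_000_000,      # records per hour per partition
)
throughput = total_records / duration       # records per hour
fits_daily_window = duration < 24

print(duration, throughput, fits_daily_window)
```

Here T_extract is 1.5 hours (the slowest shard), T_load is 120M / (8 × 5M) = 3 hours, and the batch takes 5 hours in total, well inside the 24-hour window.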
CDC Latency
CDC end-to-end latency = T_log_read + T_network + T_deserialize + T_transform + T_load. For Debezium with Kafka: T_CDC ≈ 100 ms to 1 s (typical). For batch polling: T_polling = poll_interval / 2 (average). CDC latency is largely independent of table size, because only the changes are read from the log; polling latency grows with data volume, because each poll must query the source.
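The two latency formulas can be compared with a small calculation. The stage timings below are invented for illustration and are not measurements of any real deployment.

```python
def cdc_latency_ms(log_read: float, network: float, deserialize: float,
                   transform: float, load: float) -> float:
    """Sum of the per-stage latencies in the CDC path, in milliseconds."""
    return log_read + network + deserialize + transform + load


def avg_polling_latency_s(poll_interval_s: float) -> float:
    """A change lands, on average, halfway through a poll interval."""
    return poll_interval_s / 2


# Hypothetical stage timings for a CDC pipeline (milliseconds)
cdc_ms = cdc_latency_ms(log_read=50, network=20, deserialize=5, transform=25, load=100)
# A five-minute polling schedule
polling_s = avg_polling_latency_s(poll_interval_s=300)

print(f"CDC: {cdc_ms} ms, polling: {polling_s} s")
```

With these assumed numbers, CDC delivers a change in about 200 ms, while the polling pipeline averages 150 s before the change is even read.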
CDC based on transaction log reading provides a completeness guarantee: all committed changes to the source database are captured, including inserts, updates, and deletes. The guarantee holds if: (1) the CDC connector reads the complete transaction log, (2) the log retention period exceeds the maximum CDC lag, and (3) the connector is not restarted from a stale offset. Formal: Captured_changes = {all committed writes to source} within log retention window.
Any ingestion pipeline must be idempotent: re-running the same ingestion with the same input produces identical output. Formally: f(input, run_id) = f(input, run_id') for all run_id, run_id'. Idempotency is achieved via: (1) deterministic deduplication keys, (2) UPSERT/MERGE operations instead of INSERT, (3) transactional writes, (4) idempotent consumers in streaming systems.
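The idempotency property can be demonstrated with an upsert keyed on a deterministic deduplication key. This is a minimal sketch using SQLite from the standard library (its `ON CONFLICT` upsert needs SQLite 3.24 or later); the `orders` table and its columns are hypothetical.

```python
import sqlite3


def load_batch(conn: sqlite3.Connection, rows) -> None:
    """Upsert rows keyed on order_id inside a single transaction."""
    with conn:  # commits on success, rolls back on error
        conn.executemany(
            """
            INSERT INTO orders (order_id, status, amount)
            VALUES (?, ?, ?)
            ON CONFLICT(order_id) DO UPDATE SET
                status = excluded.status,
                amount = excluded.amount
            """,
            rows,
        )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders (order_id INTEGER PRIMARY KEY, status TEXT, amount REAL)"
)

# The batch contains two versions of order 1; the later one wins
batch = [(1, "paid", 10.0), (2, "shipped", 25.5), (1, "refunded", 10.0)]

load_batch(conn, batch)
after_first = conn.execute("SELECT * FROM orders ORDER BY order_id").fetchall()

load_batch(conn, batch)  # re-run with the same input
after_second = conn.execute("SELECT * FROM orders ORDER BY order_id").fetchall()

print(after_first == after_second, after_second)
```

With a plain `INSERT`, the second run would fail on the primary key or, without a key, silently double the row count. The upsert makes the re-run a no-op.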
Key Concepts
| Concept | Description | Use Case |
|---|---|---|
| Batch Loading | Scheduled bulk data transfer | Daily ETL, data warehouse refresh |
| Streaming Ingestion | Real-time event processing | Clickstream, IoT, fraud detection |
| CDC (Change Data Capture) | Capture database changes via log | Real-time replication, data sync |
| Debezium | Open-source CDC connector | MySQL, PostgreSQL, MongoDB CDC |
| Maxwell | MySQL CDC daemon | MySQL binlog streaming |
| Debezium vs Maxwell | Debezium supports more databases; Maxwell is MySQL-only | Database-specific needs |
| API Polling | Periodic API calls for data retrieval | SaaS integrations (Salesforce, HubSpot) |
| Webhooks | Push-based event notifications | Real-time SaaS event delivery |
| File Watching | Monitor directories for new files | SFTP drops, legacy system exports |
| inotify | Linux kernel file system events | File creation, modification, deletion |
| Airbyte | Open-source data integration platform | ELT from 300+ sources |
| Fivetran | Managed ELT platform | Automated SaaS connector management |
| Deduplication | Remove duplicate records | Event replay, at-least-once delivery |
| Upsert/MERGE | Insert-or-update operations | Idempotent loads, SCD Type 1 |
| Watermark | Track ingestion progress | Gap detection, backfill triggers |
| Schema Registry | Enforce schema at ingestion time | Prevent malformed data from entering pipeline |
| Dead Letter Queue | Store failed ingestion records | Debugging, retry, manual intervention |
| Backpressure | Flow control for overwhelmed consumers | Prevent memory exhaustion in streaming |
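Two of the concepts in the table, deduplication and the dead letter queue, can be sketched together in a few lines. The function name `ingest`, the `id` key, and the in-memory lists are assumptions; a real pipeline would write dead letters to a durable queue or table.

```python
import json
from typing import Dict, List, Set, Tuple


def ingest(raw_records: List[str], seen_keys: Set[object]) -> Tuple[List[Dict], List[Dict]]:
    """Parse records, skip duplicates, and route failures to a dead letter list."""
    loaded: List[Dict] = []
    dead_letters: List[Dict] = []
    for raw in raw_records:
        try:
            record = json.loads(raw)
            key = record["id"]
        except (json.JSONDecodeError, KeyError, TypeError) as exc:
            # Never drop silently: keep the raw payload and the reason
            dead_letters.append({"raw": raw, "error": repr(exc)})
            continue
        if key in seen_keys:
            continue  # duplicate delivery, e.g. from an at-least-once source
        seen_keys.add(key)
        loaded.append(record)
    return loaded, dead_letters


raw_batch = [
    '{"id": 1, "v": "a"}',
    "not json",
    '{"id": 1, "v": "a"}',      # replayed event
    '{"v": "missing id"}',
    '{"id": 2, "v": "b"}',
]
loaded, dead_letters = ingest(raw_batch, seen_keys=set())
print([r["id"] for r in loaded], len(dead_letters))
```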
- Assess source capabilities: Does the source support CDC (database logs)? Does it have an API? Does it emit events?
- Determine latency requirement: Real-time (< 5 min) -> streaming/CDC. Near-real-time (5-60 min) -> micro-batch. Batch (1-24 hours) -> batch loading.
- Evaluate source impact tolerance: Can the source handle polling queries? Is it OK to read transaction logs? Is there API rate limiting?
- Consider data volume: High-volume (> 1TB/day) -> streaming/CDC to avoid batch window pressure. Low-volume -> batch is simpler.
- Assess data complexity: Structured -> CDC or batch. Semi-structured (JSON) -> streaming or API. Unstructured -> file watching.
- Check for deletes: If capturing deletes from a database is required, log-based CDC is the most reliable pattern. Batch and API polling cannot capture deletes without soft-delete flags or a full snapshot comparison.
- Evaluate operational maturity: Batch is simplest. CDC requires Debezium, Kafka, and connector management. Webhooks require endpoint infrastructure.
- Design for idempotency: All ingestion patterns must handle re-runs gracefully. Use deduplication keys and upsert operations.
- Implement monitoring: Track ingestion lag, error rates, and data completeness. Alert on anomalies.
- Document data contracts: Define expected schemas, SLAs, and error handling with source system owners.
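The first few checklist items can be expressed as a small decision function. This is a deliberately simplified sketch: `SourceProfile` and `choose_pattern` are hypothetical names, the thresholds are the ones from the checklist above, and volume, complexity, and operational maturity are left out.

```python
from dataclasses import dataclass


@dataclass
class SourceProfile:
    supports_log_cdc: bool        # transaction log is readable
    emits_events: bool            # source publishes to Kafka, Kinesis, etc.
    max_staleness_minutes: float  # latency requirement
    needs_deletes: bool


def choose_pattern(p: SourceProfile) -> str:
    """Pick an ingestion pattern from source capabilities and latency needs."""
    if p.needs_deletes and p.supports_log_cdc:
        return "cdc"
    if p.max_staleness_minutes < 5:
        if p.supports_log_cdc:
            return "cdc"
        if p.emits_events:
            return "streaming"
        # Real-time was requested but the source cannot push changes;
        # frequent micro-batches are the closest achievable option.
        return "micro-batch"
    if p.max_staleness_minutes <= 60:
        return "micro-batch"
    return "batch"


postgres_orders = SourceProfile(True, False, 1, needs_deletes=True)
clickstream = SourceProfile(False, True, 1, needs_deletes=False)
saas_crm = SourceProfile(False, False, 30, needs_deletes=False)
nightly_export = SourceProfile(False, False, 24 * 60, needs_deletes=False)

for name, profile in [("postgres_orders", postgres_orders), ("clickstream", clickstream),
                      ("saas_crm", saas_crm), ("nightly_export", nightly_export)]:
    print(name, "->", choose_pattern(profile))
```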
Production Code
CDC Pipeline with Debezium and Kafka
```python
import json
import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, Optional

from confluent_kafka import Consumer, KafkaException, Producer

logger = logging.getLogger(__name__)


@dataclass
class CDCEvent:
    """Represents a Change Data Capture event from Debezium."""

    operation: str  # c=create, u=update, d=delete, r=snapshot read
    before: Optional[Dict]  # None for creates
    after: Optional[Dict]  # None for deletes
    source: Dict
    ts_ms: int
    table: str


class DebeziumCDCProcessor:
    """Process CDC events from Debezium via Kafka."""

    def __init__(self, kafka_config: Dict, topic: str):
        self.topic = topic
        self.consumer = Consumer({
            "bootstrap.servers": kafka_config["bootstrap.servers"],
            "group.id": "cdc-processor",
            "auto.offset.reset": "earliest",
            "enable.auto.commit": False,  # offsets are committed manually in run()
        })
        self.producer = Producer({
            "bootstrap.servers": kafka_config["bootstrap.servers"],
            "acks": "all",
            "enable.idempotence": True,
        })

    def parse_cdc_event(self, raw_value: str) -> CDCEvent:
        """Parse Debezium CDC envelope format."""
        envelope = json.loads(raw_value)
        # With schemas enabled in the JSON converter, the event sits under "payload"
        envelope = envelope.get("payload", envelope)
        return CDCEvent(
            operation=envelope["op"],
            before=envelope.get("before"),
            after=envelope.get("after"),
            source=envelope["source"],
            ts_ms=envelope["ts_ms"],
            table=envelope["source"]["table"],
        )

    @staticmethod
    def _event_time(event: CDCEvent) -> datetime:
        """Convert the epoch-millisecond timestamp to a UTC datetime."""
        return datetime.fromtimestamp(event.ts_ms / 1000, tz=timezone.utc)

    def process_create(self, event: CDCEvent) -> Dict:
        """Handle new record creation."""
        return {
            "action": "insert",
            "table": event.table,
            "data": event.after,
            "cdc_timestamp": self._event_time(event),
        }

    def process_update(self, event: CDCEvent) -> Dict:
        """Handle record update with change detection."""
        changes = {}
        if event.before and event.after:
            for key in event.after:
                if event.before.get(key) != event.after.get(key):
                    changes[key] = {
                        "old": event.before.get(key),
                        "new": event.after.get(key),
                    }
        return {
            "action": "update",
            "table": event.table,
            "data": event.after,
            "changes": changes,
            "cdc_timestamp": self._event_time(event),
        }

    def process_delete(self, event: CDCEvent) -> Dict:
        """Handle record deletion."""
        return {
            "action": "delete",
            "table": event.table,
            "data": event.before,
            "cdc_timestamp": self._event_time(event),
        }

    def run(self, max_events: Optional[int] = None):
        """Main consumption loop for CDC events."""
        self.consumer.subscribe([self.topic])
        event_count = 0
        try:
            while True:
                msg = self.consumer.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    logger.error(f"Consumer error: {msg.error()}")
                    continue
                if msg.value() is None:
                    # Tombstone that follows a delete (tombstones.on.delete=true)
                    continue
                event = self.parse_cdc_event(msg.value().decode("utf-8"))
                if event.operation in ("c", "r"):
                    # Snapshot reads carry a full row image, so treat them as inserts
                    result = self.process_create(event)
                elif event.operation == "u":
                    result = self.process_update(event)
                elif event.operation == "d":
                    result = self.process_delete(event)
                else:
                    logger.warning(f"Unknown operation: {event.operation}")
                    continue
                # Produce processed event to downstream topic. Reusing the
                # Debezium message key (the row's primary key) keeps all
                # changes to one row, including deletes, on the same partition.
                self.producer.produce(
                    topic=f"{self.topic}.processed",
                    key=msg.key(),
                    value=json.dumps(result, default=str).encode(),
                )
                self.producer.poll(0)  # serve delivery callbacks
                event_count += 1
                if max_events and event_count >= max_events:
                    break
                if event_count % 1000 == 0:
                    logger.info(f"Processed {event_count} CDC events")
                    # Flush before committing so a crash cannot lose results
                    self.producer.flush()
                    self.consumer.commit(asynchronous=False)
        finally:
            self.producer.flush()
            try:
                self.consumer.commit(asynchronous=False)
            except KafkaException as exc:
                # Raised, for example, when there are no offsets to commit
                logger.warning(f"Final commit skipped: {exc}")
            self.consumer.close()
```
```python
# Debezium connector configuration (JSON)
DEBEZIUM_CONFIG = {
    "name": "postgres-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres-host",
        "database.port": "5432",
        "database.user": "debezium",
        "database.password": "${secrets:debezium-password}",
        "database.dbname": "production",
        # Debezium 2.x replaces this property with "topic.prefix"
        "database.server.name": "production",
        "plugin.name": "pgoutput",
        "slot.name": "debezium_slot",
        "publication.name": "debezium_publication",
        "table.include.list": "public.orders,public.customers",
        "tombstones.on.delete": "true",
        # Route "<server>.<schema>.<table>" topics to just "<table>"
        "transforms": "route",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "$3",
    },
}
```
Batch Ingestion with Deduplication
```python
import hashlib
import logging
from datetime import datetime, timezone
from typing import Optional

import pandas as pd
from sqlalchemy import bindparam, text

logger = logging.getLogger(__name__)


class BatchIngestor:
    """Batch ingestion with idempotent deduplication and upsert operations.

    Assumes SQLAlchemy engines and a PostgreSQL-compatible target: the upsert
    uses INSERT ... ON CONFLICT, which needs a unique constraint on dedup_key.
    """

    def __init__(self, target_connection, dedup_key: str = "id"):
        self.target_connection = target_connection
        self.dedup_key = dedup_key

    def extract_with_watermark(
        self,
        source_connection,
        query: str,
        watermark_table: str,
        last_watermark: Optional[datetime] = None,
    ) -> pd.DataFrame:
        """Extract data from source using watermark for incremental loading."""
        if last_watermark is None:
            # No watermark supplied: use the newest timestamp already in the target
            watermark_query = f"SELECT MAX(updated_at) FROM {watermark_table}"
            last_watermark = pd.read_sql(watermark_query, self.target_connection).iloc[0, 0]
        if pd.isna(last_watermark):
            # No existing data (MAX returned NULL): full load
            return pd.read_sql(query, source_connection)
        # Incremental: only rows modified since last watermark. The base query
        # is wrapped so the filter stays valid if it already has a WHERE clause,
        # and the watermark is passed as a bound parameter.
        incremental_query = text(
            f"SELECT * FROM ({query}) AS src WHERE updated_at > :watermark"
        )
        logger.info(f"Extracting incremental data since {last_watermark}")
        return pd.read_sql(
            incremental_query,
            source_connection,
            params={"watermark": pd.Timestamp(last_watermark).to_pydatetime()},
        )

    def compute_record_hash(self, df: pd.DataFrame) -> pd.DataFrame:
        """Compute a hash of all non-key columns for change detection."""
        df = df.copy()  # do not mutate the caller's frame
        hash_columns = [col for col in df.columns if col != self.dedup_key]
        df["_record_hash"] = df[hash_columns].apply(
            lambda row: hashlib.md5(
                "|".join(str(v) for v in row).encode()
            ).hexdigest(),
            axis=1,
        )
        return df

    def upsert(self, df: pd.DataFrame, target_table: str) -> dict:
        """Perform idempotent upsert (insert-or-update) operation."""
        if df.empty:
            return {"inserted": 0, "updated": 0, "unchanged": 0}
        df = self.compute_record_hash(df)
        # Read existing records for comparison (keys passed as bound parameters)
        existing_query = text(
            f"SELECT {self.dedup_key}, _record_hash FROM {target_table} "
            f"WHERE {self.dedup_key} IN :keys"
        ).bindparams(bindparam("keys", expanding=True))
        try:
            existing = pd.read_sql(
                existing_query,
                self.target_connection,
                params={"keys": df[self.dedup_key].tolist()},
            )
        except Exception as exc:
            # Typically the target table does not exist yet on the first run
            logger.warning(f"Could not read existing records, treating all as new: {exc}")
            existing = pd.DataFrame(columns=[self.dedup_key, "_record_hash"])
        # Merge existing hashes with new data
        merged = df.merge(
            existing, on=self.dedup_key, how="left", suffixes=("", "_existing")
        )
        # Classify records
        has_existing = merged["_record_hash_existing"].notna()
        same_hash = merged["_record_hash"] == merged["_record_hash_existing"]
        new_records = merged[~has_existing]
        changed_records = merged[has_existing & ~same_hash]
        unchanged_records = merged[has_existing & same_hash]
        # Insert new records
        if not new_records.empty:
            insert_df = new_records.drop(columns=["_record_hash_existing"])
            insert_df.to_sql(
                target_table,
                self.target_connection,
                if_exists="append",
                index=False,
                method="multi",
                chunksize=500,
            )
            logger.info(f"Inserted {len(new_records)} new records")
        # Update changed records (using temporary table for efficiency)
        if not changed_records.empty:
            update_df = changed_records.drop(columns=["_record_hash_existing"])
            temp_table = f"{target_table}_temp_update"
            update_df.to_sql(
                temp_table,
                self.target_connection,
                if_exists="replace",
                index=False,
            )
            # Execute MERGE/UPSERT via SQL, naming columns explicitly so the
            # statement does not depend on column order in either table
            columns = list(update_df.columns)
            column_list = ", ".join(columns)
            set_clause = ", ".join(
                f"{col} = EXCLUDED.{col}" for col in columns if col != self.dedup_key
            )
            merge_query = text(f"""
                INSERT INTO {target_table} ({column_list})
                SELECT {column_list} FROM {temp_table}
                ON CONFLICT ({self.dedup_key})
                DO UPDATE SET {set_clause}
            """)
            with self.target_connection.begin() as conn:
                conn.execute(merge_query)
                conn.execute(text(f"DROP TABLE {temp_table}"))
            logger.info(f"Updated {len(changed_records)} changed records")
        return {
            "inserted": len(new_records),
            "updated": len(changed_records),
            "unchanged": len(unchanged_records),
        }

    def update_watermark(self, watermark_table: str, max_timestamp: datetime):
        """Update the watermark to track ingestion progress."""
        watermark_df = pd.DataFrame([{
            "pipeline": watermark_table,
            "last_watermark": max_timestamp,
            "updated_at": datetime.now(timezone.utc),
        }])
        watermark_df.to_sql(
            "ingestion_watermarks",
            self.target_connection,
            if_exists="append",
            index=False,
        )
        logger.info(f"Watermark updated to {max_timestamp}")


# Usage (engine and source_engine are assumed to be existing SQLAlchemy engines)
ingestor = BatchIngestor(target_connection=engine, dedup_key="order_id")
df = ingestor.extract_with_watermark(
    source_connection=source_engine,
    query="SELECT * FROM orders",
    watermark_table="orders_fact",
)
results = ingestor.upsert(df, target_table="orders_fact")
logger.info(f"Ingestion results: {results}")
```
CDC vs Batch Polling: CDC reads the database transaction log, which provides: (1) low latency (typically about a second or less), (2) minimal source system load, (3) capture of deletes, (4) ordering guarantees that follow the commit order in the log. Batch polling is simpler but has: (1) higher latency (minutes to hours), (2) query load on the source system, (3) no way to capture deletes without soft-delete flags, (4) no ordering guarantees.
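The delete problem is easy to reproduce. In the sketch below, with made-up rows and integer timestamps, an incremental poll on `updated_at` returns nothing after a hard delete, while a full snapshot comparison finds the missing key at the cost of reading every source key.

```python
from typing import Dict, List, Set


def incremental_poll(source_rows: List[Dict], watermark: int) -> List[Dict]:
    """What a timestamp-based poll sees: rows changed after the watermark."""
    return [r for r in source_rows if r["updated_at"] > watermark]


def detect_deletes_by_snapshot(source_rows: List[Dict], target_ids: Set[int]) -> Set[int]:
    """Keys present in the target but no longer in the source."""
    return target_ids - {r["id"] for r in source_rows}


# Target was loaded when the source held ids 1, 2, 3 (watermark = 30)
target_ids = {1, 2, 3}
watermark = 30

# Row 2 has since been hard-deleted from the source
source_rows = [
    {"id": 1, "updated_at": 10},
    {"id": 3, "updated_at": 30},
]

seen_by_poll = incremental_poll(source_rows, watermark)
deleted_ids = detect_deletes_by_snapshot(source_rows, target_ids)
print(seen_by_poll, deleted_ids)
```

A deleted row has no `updated_at` left to compare, so the poll cannot see it. Log-based CDC avoids this because the delete itself is an entry in the transaction log.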
Webhook Best Practices: Webhooks are push-based notifications from source systems. Implement: (1) signature verification for security, (2) idempotent processing (webhooks may be retried), (3) a durable queue (SQS, Kafka) between webhook receiver and processing pipeline, (4) retry with exponential backoff for transient failures, (5) a dead letter queue for persistent failures.
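Points (1) to (3) can be sketched with the standard library. The class name `WebhookReceiver`, the `event_id` field, and the in-memory list standing in for a durable queue are assumptions; real providers document their own signature header and encoding.

```python
import hashlib
import hmac
import json
from typing import List, Set


def verify_signature(secret: bytes, body: bytes, signature_hex: str) -> bool:
    """Check an HMAC-SHA256 signature using a constant-time comparison."""
    expected = hmac.new(secret, body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature_hex)


class WebhookReceiver:
    """Verify, deduplicate, and enqueue incoming webhook events."""

    def __init__(self, secret: bytes):
        self.secret = secret
        self.seen_event_ids: Set[str] = set()
        self.queue: List[dict] = []  # stand-in for a durable queue (SQS, Kafka)

    def handle(self, body: bytes, signature_hex: str) -> str:
        if not verify_signature(self.secret, body, signature_hex):
            return "rejected"
        event = json.loads(body)
        if event["event_id"] in self.seen_event_ids:
            return "duplicate"  # provider retry: acknowledge without reprocessing
        self.seen_event_ids.add(event["event_id"])
        self.queue.append(event)
        return "accepted"


secret = b"shared-secret"  # hypothetical shared secret
body = json.dumps({"event_id": "evt_1", "type": "contact.updated"}).encode()
signature = hmac.new(secret, body, hashlib.sha256).hexdigest()

receiver = WebhookReceiver(secret)
first = receiver.handle(body, signature)
retry = receiver.handle(body, signature)
forged = receiver.handle(body, "0" * 64)
print(first, retry, forged)
```

The signature is computed over the raw request bytes, so verification has to happen before any JSON parsing or re-serialization changes the body.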
- Batch ingestion is simple, well understood, and suitable for high-volume workloads that can tolerate latency.
- Streaming ingestion provides real-time freshness but requires more complex infrastructure and expertise.
- CDC (Debezium, Maxwell) captures database changes via transaction logs with low latency and minimal source impact.
- API polling is the simplest integration pattern but limited by rate limits and cannot capture deletes without soft-delete flags.
- File watching (inotify, Watchdog) handles SFTP drops and legacy system integrations.
- All ingestion pipelines must be idempotent: re-running produces identical results via deduplication keys and upsert operations.
- Monitor ingestion lag, error rates, and data completeness. Alert on anomalies.
Best Practices
- Implement idempotent ingestion: use deduplication keys and upsert/MERGE operations so re-runs produce identical results.
- Use CDC (Debezium) for relational databases when real-time freshness and delete capture are required.
- Set watermarks to track ingestion progress and detect gaps. Alert when gaps exceed SLA thresholds.
- Use schema registry at ingestion time to reject malformed data before it enters the pipeline.
- Implement dead letter queues for records that fail ingestion. Never silently drop failed records.
- Rate-limit API polling to respect source system rate limits. Use exponential backoff for 429 responses.
- Verify webhook signatures to prevent unauthorized data injection. Use HMAC-SHA256 for signature validation.
- Monitor source system impact: CDC should have negligible impact; polling should not exceed 10% of source CPU.
- Design for backfill: support full reprocessing from raw data without modifying the ingestion pipeline.
- Document data contracts with source system owners: expected schemas, update frequencies, and SLAs.
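The rate-limiting practice above can be sketched as a retry wrapper with exponential backoff and jitter. `call_with_backoff` and the `(status, body)` return shape are assumptions for illustration, not any particular HTTP client's API; the `sleep` parameter is injectable so the example runs instantly.

```python
import random
import time
from typing import Callable, Tuple


def call_with_backoff(
    request: Callable[[], Tuple[int, object]],
    max_attempts: int = 5,
    base_delay_s: float = 1.0,
    max_delay_s: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Tuple[int, object]:
    """Retry a request on HTTP 429, doubling the delay ceiling each attempt."""
    for attempt in range(max_attempts):
        status, body = request()
        if status != 429:
            return status, body
        if attempt < max_attempts - 1:
            ceiling = min(max_delay_s, base_delay_s * 2 ** attempt)
            # Full jitter: a random delay up to the ceiling avoids synchronized retries
            sleep(random.uniform(0, ceiling))
    raise RuntimeError(f"still rate limited after {max_attempts} attempts")


# Fake endpoint: rate limited twice, then succeeds
responses = iter([(429, None), (429, None), (200, {"ok": True})])
delays = []  # records the requested sleeps instead of actually sleeping

result = call_with_backoff(lambda: next(responses), sleep=delays.append)
print(result, len(delays))
```

If the API returns a `Retry-After` header, honoring it is usually better than a computed delay; this sketch leaves that out.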
Ingestion Pattern Selection Matrix
| Pattern | Latency | Source Impact | Delete Capture | Complexity | Best For |
|---|---|---|---|---|---|
| Batch Loading | Hours | High (queries) | No | Low | Daily ETL, analytics |
| CDC (Debezium) | Seconds | Low (log read) | Yes | High | Real-time replication |
| API Polling | Minutes | Medium (rate limits) | No | Low | SaaS integrations |
| Webhooks | Seconds | None (push) | Yes | Medium | Real-time SaaS events |
| File Watching | Minutes | None | Partial | Low | Legacy systems, SFTP |
| Streaming Ingest | Sub-second | None | Yes | High | IoT, clickstream |
CDC Tool Comparison
| Feature | Debezium | Maxwell | Fivetran | Airbyte |
|---|---|---|---|---|
| Database Support | MySQL, PostgreSQL, MongoDB, Oracle, SQL Server, and others | MySQL only | 200+ sources | 300+ sources |
| Deployment | Kafka Connect | Standalone daemon | Managed SaaS | Self-hosted/Cloud |
| Log Reading | WAL, binlog, redo | binlog | Proprietary | Varies |
| Schema Evolution | Yes | Limited | Yes | Yes |
| Cost | Free (OSS) | Free (OSS) | Paid | Free/Paid |
See Also
- 019 - Apache Kafka: Topics, Producers, and Consumers - Kafka as ingestion backbone
- 016 - ETL vs ELT - Transformation paradigm after ingestion
- 025 - Data Quality: Validation Frameworks - Quality checks at ingestion
- 022 - Spark Structured Streaming - Streaming ingestion processing
- 028 - Error Handling, Retries, and Dead Letter Queues - Error handling for ingestion