
Data Ingestion Patterns: Batch, Streaming, CDC, and APIs



Data Ingestion Patterns: Bringing Data into the Pipeline

Data ingestion is the process of moving data from source systems into a data platform for processing, analysis, and storage.

The Ingestion Landscape


Data Sources:

  • Relational databases: PostgreSQL, MySQL
  • NoSQL stores: MongoDB, Cassandra
  • SaaS APIs: Salesforce, HubSpot
  • Streaming platforms: Kafka, Kinesis
  • File systems: SFTP, S3
  • Message queues: RabbitMQ, SQS

Key Considerations:

  1. Source capabilities: what does the source support?
  2. Data volume: how much data needs to be ingested?
  3. Latency requirements: real-time or batch?
  4. Impact tolerance: can the source handle the load?

Key Insight: Each source demands a different ingestion strategy based on its capabilities, data volume, latency requirements, and impact tolerance.

Data Ingestion Patterns Overview

[Figure: Sources (RDBMS, SaaS APIs, files such as CSV/JSON, event streams) feed four ingestion patterns (batch loading, CDC via Debezium, API polling, streaming ingest), which land data in raw, staging, and curated zones. Batch: scheduled bulk loads | CDC: real-time database log capture | API: periodic polling | Streaming: event-driven.]

CDC Flow Diagram

[Figure: CDC flow from the database log (WAL/binlog/redo log) to Debezium (CDC connector, log read ~100 ms) to Kafka (event topics, durable queue) to Spark/Flink (transform) to the warehouse target (load). End-to-end CDC latency: ~100 ms to 1 s, versus minutes for batch polling.]


Batch ingestion loads data in discrete chunks at scheduled intervals (hourly, daily, weekly). Data is extracted from source systems as complete snapshots or delta files and loaded into the target platform. Batch ingestion is simple and well understood, and scheduling it off-peak keeps the load on source systems predictable, but it has high latency: data can be as stale as the batch interval.

Streaming ingestion processes data as it arrives, either event by event or in micro-batches. Data is ingested continuously from sources such as Kafka, Kinesis, and event logs with low latency (sub-second to seconds per event; micro-batch intervals can stretch this to minutes). Streaming ingestion provides near-real-time freshness but requires more complex infrastructure: message queues, state management, and delivery guarantees such as exactly-once processing.
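The micro-batch mode can be sketched as a small grouping function. This is a minimal, single-process illustration, assuming events arrive already ordered by a timestamp field; the `Event` type and `micro_batches` function are hypothetical names, and unlike a real stream processor it only closes a batch when a new event arrives, not on a wall-clock timer.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, List


@dataclass
class Event:
    ts: float  # event time in seconds (hypothetical field)
    payload: str


def micro_batches(
    events: Iterable[Event], max_size: int, max_wait_s: float
) -> Iterator[List[Event]]:
    """Group an ordered event stream into micro-batches.

    A batch is emitted when it reaches max_size events, or when the next
    event arrives more than max_wait_s after the batch's first event.
    """
    batch: List[Event] = []
    for event in events:
        # Close the current batch if the new event falls outside its time budget
        if batch and event.ts - batch[0].ts > max_wait_s:
            yield batch
            batch = []
        batch.append(event)
        # Close the batch as soon as it is full
        if len(batch) >= max_size:
            yield batch
            batch = []
    if batch:
        # Flush whatever is left when the stream ends
        yield batch


stream = [Event(t, f"e{t}") for t in (0, 1, 2, 10, 11, 30)]
for b in micro_batches(stream, max_size=3, max_wait_s=5):
    print([e.ts for e in b])
```

The size limit bounds memory per batch, while the time budget bounds how long the first event in a batch waits before being written.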

CDC is a technique that captures changes (inserts, updates, deletes) made to data in a source database and delivers them to downstream systems in near real time. CDC reads the database transaction log (PostgreSQL WAL, MySQL binlog, Oracle redo log) instead of querying tables, so its impact on source performance is minimal. Unlike polling-based approaches, CDC provides low-latency, low-impact, and complete change capture, including deletes.
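To make the delete-capture point concrete, here is a minimal sketch that applies Debezium-style change events (op codes `c`, `u`, `d`, and `r` for snapshot reads) to an in-memory replica keyed by primary key. The `apply_change` helper and the `id` key column are hypothetical; a real sink would express the same logic as MERGE statements against the warehouse.

```python
from typing import Any, Dict, Optional

Row = Dict[str, Any]


def apply_change(
    table: Dict[Any, Row],
    op: str,
    before: Optional[Row],
    after: Optional[Row],
    key: str = "id",
) -> None:
    """Apply one Debezium-style change event to an in-memory replica.

    op codes: c=create, u=update, d=delete, r=snapshot read.
    """
    if op in ("c", "r", "u"):
        # Creates, snapshot reads, and updates all carry the new row image
        table[after[key]] = dict(after)
    elif op == "d":
        # Deletes carry only the old row image; remove that key from the replica
        table.pop(before[key], None)
    else:
        raise ValueError(f"unsupported operation: {op}")


replica: Dict[Any, Row] = {}
apply_change(replica, "c", None, {"id": 1, "status": "new"})
apply_change(replica, "c", None, {"id": 2, "status": "new"})
apply_change(replica, "u", {"id": 1, "status": "new"}, {"id": 1, "status": "paid"})
apply_change(replica, "d", {"id": 2, "status": "new"}, None)
print(replica)
```

Row 2 disappears from the replica because the log contains an explicit delete event, which a timestamp-based poll would never return.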

API polling is the process of periodically calling an API endpoint to retrieve new or updated data; the polling frequency determines data freshness. Common patterns are (1) timestamp-based: request records modified since the last poll, and (2) cursor-based: follow pagination cursors to iterate through results. Webhooks are the push-based alternative: the source notifies the consumer of changes instead of being polled.
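The timestamp- and cursor-based patterns usually run together: each cycle asks for records modified since the last watermark and follows cursors until the API reports no further pages. The sketch below assumes a hypothetical `fetch_page(modified_since, cursor)` callable returning `(records, next_cursor)`, with `next_cursor` set to `None` on the last page, and ISO 8601 UTC timestamps in one fixed format so that string comparison matches time order.

```python
from typing import Callable, Dict, List, Optional, Tuple

Page = Tuple[List[Dict], Optional[str]]
FetchPage = Callable[[str, Optional[str]], Page]  # hypothetical API client signature


def poll_once(fetch_page: FetchPage, modified_since: str) -> Tuple[List[Dict], str]:
    """Run one polling cycle: fetch every page modified since the watermark."""
    records: List[Dict] = []
    cursor: Optional[str] = None
    while True:
        page, cursor = fetch_page(modified_since, cursor)
        records.extend(page)
        if cursor is None:  # last page of this cycle
            break
    # Advance the watermark to the newest modification time seen in this cycle
    new_since = max((r["updated_at"] for r in records), default=modified_since)
    return records, new_since


# Hypothetical in-memory API returning two records per page
DATA = [{"id": i, "updated_at": f"2024-01-0{i}T00:00:00Z"} for i in range(1, 6)]


def fake_fetch(since: str, cursor: Optional[str]) -> Page:
    rows = [r for r in DATA if r["updated_at"] > since]
    start = int(cursor or 0)
    nxt = str(start + 2) if start + 2 < len(rows) else None
    return rows[start:start + 2], nxt


records, watermark = poll_once(fake_fetch, "2024-01-02T00:00:00Z")
print([r["id"] for r in records], watermark)
```

Persisting `watermark` only after the records are loaded means a failed cycle is simply repeated from the old watermark.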

File watching monitors a directory for new, modified, or deleted files and triggers ingestion pipelines when changes are detected. Common implementations: inotify (Linux), WatchService (Java), Watchdog (Python). File watching is used for SFTP drops, CSV exports, and legacy system integrations where API access is unavailable.
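Where kernel notifications such as inotify are not available (for example, on some network file systems), a common fallback is to poll the directory and diff snapshots. The following is a standard-library sketch of that fallback, not of inotify or watchdog themselves; `snapshot` and `diff` are hypothetical helper names.

```python
import os
from typing import Dict, Set, Tuple

Snapshot = Dict[str, Tuple[float, int]]  # path -> (mtime, size)


def snapshot(directory: str) -> Snapshot:
    """Record modification time and size for every regular file in directory."""
    result: Snapshot = {}
    with os.scandir(directory) as entries:
        for entry in entries:
            if entry.is_file():
                st = entry.stat()
                result[entry.path] = (st.st_mtime, st.st_size)
    return result


def diff(old: Snapshot, new: Snapshot) -> Dict[str, Set[str]]:
    """Classify files as created, modified, or deleted between two snapshots."""
    return {
        "created": set(new) - set(old),
        "deleted": set(old) - set(new),
        # A file counts as modified if its mtime or size changed
        "modified": {p for p in set(old) & set(new) if old[p] != new[p]},
    }
```

Files still being uploaded over SFTP show up as modified on successive scans, so a common refinement is to ingest a file only once its size has stopped changing between two scans.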

Batch Ingestion Throughput

Batch ingestion throughput = Total_records / (T_extract + T_transfer + T_load). For a daily batch: T_batch = T_extract + T_transfer + T_load < 24 hours. Parallel extraction from N source shards: T_extract = max(T_shard_i) for i in 1..N. Parallel loading to M target partitions: T_load = Total_records / (M * Load_rate_per_partition).
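A worked example of these formulas, with made-up numbers (three source shards, eight target partitions, 5,000 rows/s per partition) chosen only for illustration:

```python
from typing import List, Tuple


def batch_window(
    total_records: int,
    shard_extract_s: List[float],
    transfer_s: float,
    partitions: int,
    load_rate_per_partition: float,
) -> Tuple[float, float]:
    """Return (T_batch in seconds, throughput in records/s) for one batch run."""
    t_extract = max(shard_extract_s)  # shards are extracted in parallel
    t_load = total_records / (partitions * load_rate_per_partition)
    t_batch = t_extract + transfer_s + t_load
    return t_batch, total_records / t_batch


# Illustrative numbers only: 100M records, 3 shards, 8 partitions at 5,000 rows/s each
t_batch, throughput = batch_window(100_000_000, [1800.0, 2400.0, 2100.0], 600.0, 8, 5000.0)
print(f"T_batch = {t_batch:.0f} s, fits daily window: {t_batch < 24 * 3600}")
print(f"Throughput = {throughput:,.0f} records/s")
```

Here T_load = 100,000,000 / (8 × 5,000) = 2,500 s, so T_batch = 2,400 + 600 + 2,500 = 5,500 s, well inside the 24-hour window. The slowest shard, not the average, sets T_extract.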

CDC Latency

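CDC end-to-end latency = T_log_read + T_network + T_deserialize + T_transform + T_load. For Debezium with Kafka, T_CDC ≈ 100 ms to 1 s is typical. For batch polling, a change waits on average T_polling ≈ poll_interval / 2 for the next poll, plus the query and load time. CDC latency stays roughly constant as tables grow because it depends on the rate of changes, not on table size; polling query time grows with the volume of data scanned.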
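The poll_interval / 2 term assumes changes arrive uniformly between polls. This small simulation (a sketch that ignores query and load time; `mean_polling_delay` is a hypothetical helper) spreads events evenly over ten 60-second intervals and measures how long each waits for the next poll:

```python
import math
from typing import Iterable


def mean_polling_delay(event_times: Iterable[float], poll_interval: float) -> float:
    """Average wait between an event occurring and the next scheduled poll.

    Polls run at t = poll_interval, 2 * poll_interval, ...; query and load time are ignored.
    """
    waits = [math.ceil(t / poll_interval) * poll_interval - t for t in event_times]
    return sum(waits) / len(waits)


# Events spread evenly over ten polling intervals of 60 s each
events = [k + 0.5 for k in range(600)]
print(mean_polling_delay(events, poll_interval=60.0))  # prints 30.0
```

Halving the poll interval halves this average wait but doubles the number of queries against the source.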

CDC based on transaction log reading provides a completeness guarantee: all committed changes to the source database are captured, including inserts, updates, and deletes. The guarantee holds if: (1) the CDC connector reads the complete transaction log, (2) the log retention period exceeds the maximum CDC lag, and (3) the connector is not restarted from a stale offset. Formally: Captured_changes = {all committed writes to source} within the log retention window.

Any ingestion pipeline must be idempotent: re-running the same ingestion with the same input produces identical output. Formally, if ingest(S, x) is the target state after loading input x into state S, then ingest(ingest(S, x), x) = ingest(S, x), and the result must not depend on which run produced it: f(input, run_id) = f(input, run_id') for all run_id, run_id'. Idempotency is achieved via: (1) deterministic deduplication keys, (2) UPSERT/MERGE operations instead of INSERT, (3) transactional writes, (4) idempotent consumers in streaming systems.
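The difference between a plain append and a keyed upsert is easy to see on in-memory data. In this minimal sketch (hypothetical helpers, with a list and a dict standing in for the target table), running the upsert twice leaves the state unchanged, while running the append twice duplicates every row:

```python
from typing import Dict, List

Record = Dict[str, object]


def append_load(target: List[Record], batch: List[Record]) -> List[Record]:
    """Non-idempotent load: every re-run adds the batch again."""
    return target + batch


def upsert_load(
    target: Dict[object, Record], batch: List[Record], key: str = "id"
) -> Dict[object, Record]:
    """Idempotent load: rows are keyed by a deterministic dedup key, so re-runs overwrite."""
    result = dict(target)
    for row in batch:
        result[row[key]] = row
    return result


batch = [{"id": 1, "amount": 10}, {"id": 2, "amount": 20}]
once = upsert_load({}, batch)
twice = upsert_load(once, batch)
print(once == twice)                                   # True: re-running changes nothing
print(len(append_load(append_load([], batch), batch)))  # 4: every row duplicated
```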

Key Concepts

| Concept | Description | Use Case |
| --- | --- | --- |
| Batch Loading | Scheduled bulk data transfer | Daily ETL, data warehouse refresh |
| Streaming Ingestion | Real-time event processing | Clickstream, IoT, fraud detection |
| CDC (Change Data Capture) | Capture database changes via log | Real-time replication, data sync |
| Debezium | Open-source CDC connector | MySQL, PostgreSQL, MongoDB CDC |
| Maxwell | MySQL CDC daemon | MySQL binlog streaming |
| Debezium vs Maxwell | Debezium supports more databases; Maxwell is MySQL-only | Database-specific needs |
| API Polling | Periodic API calls for data retrieval | SaaS integrations (Salesforce, HubSpot) |
| Webhooks | Push-based event notifications | Real-time SaaS event delivery |
| File Watching | Monitor directories for new files | SFTP drops, legacy system exports |
| inotify | Linux kernel file system events | File creation, modification, deletion |
| Airbyte | Open-source data integration platform | ELT from 300+ sources |
| Fivetran | Managed ELT platform | Automated SaaS connector management |
| Deduplication | Remove duplicate records | Event replay, at-least-once delivery |
| Upsert/MERGE | Insert-or-update operations | Idempotent loads, SCD Type 1 |
| Watermark | Track ingestion progress | Gap detection, backfill triggers |
| Schema Registry | Enforce schema at ingestion time | Prevent malformed data from entering pipeline |
| Dead Letter Queue | Store failed ingestion records | Debugging, retry, manual intervention |
| Backpressure | Flow control for overwhelmed consumers | Prevent memory exhaustion in streaming |
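As an illustration of the dead letter queue row, here is a minimal sketch that routes records failing parsing or validation to a separate list together with the error, instead of dropping them. The `require_order_fields` check is a hypothetical stand-in for a schema registry lookup, and in production the dead letters would go to a durable topic or table rather than a Python list.

```python
import json
from typing import Callable, Dict, List, Tuple


def ingest_with_dlq(
    raw_records: List[str], validate: Callable[[Dict], None]
) -> Tuple[List[Dict], List[Dict]]:
    """Split raw records into valid rows and dead letters (payload plus error)."""
    good: List[Dict] = []
    dead: List[Dict] = []
    for raw in raw_records:
        try:
            record = json.loads(raw)
            validate(record)
            good.append(record)
        except (ValueError, KeyError, TypeError) as exc:  # JSONDecodeError is a ValueError
            # Keep the original payload and the reason so the record can be retried later
            dead.append({"payload": raw, "error": f"{type(exc).__name__}: {exc}"})
    return good, dead


def require_order_fields(record: Dict) -> None:
    """Hypothetical schema check: an order needs an integer id and a numeric amount."""
    if not isinstance(record["id"], int):
        raise ValueError("id must be an integer")
    if not isinstance(record["amount"], (int, float)):
        raise ValueError("amount must be numeric")


good, dead = ingest_with_dlq(
    ['{"id": 1, "amount": 9.5}', '{"id": "x", "amount": 1}', "not json", '{"amount": 3}'],
    require_order_fields,
)
print(len(good), len(dead))
```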
Choosing an Ingestion Pattern

  1. Assess source capabilities: Does the source support CDC (database logs)? Does it have an API? Does it emit events?
  2. Determine latency requirement: Real-time (< 5 min) -> streaming/CDC. Near-real-time (5-60 min) -> micro-batch. Batch (1-24 hours) -> batch loading.
  3. Evaluate source impact tolerance: Can the source handle polling queries? Is it acceptable to read transaction logs? Is the API rate-limited?
  4. Consider data volume: High volume (> 1 TB/day) -> streaming/CDC to avoid batch window pressure. Low volume -> batch is simpler.
  5. Assess data complexity: Structured -> CDC or batch. Semi-structured (JSON) -> streaming or API. Unstructured -> file watching.
  6. Check for deletes: If hard deletes in a source database must be captured, CDC is the only reliable pattern. Batch polling cannot capture deletes without soft-delete flags.
  7. Evaluate operational maturity: Batch is simplest. CDC requires Debezium, Kafka, and connector management. Webhooks require endpoint infrastructure.
  8. Design for idempotency: All ingestion patterns must handle re-runs gracefully. Use deduplication keys and upsert operations.
  9. Implement monitoring: Track ingestion lag, error rates, and data completeness. Alert on anomalies.
  10. Document data contracts: Define expected schemas, SLAs, and error handling with source system owners.
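Steps 2 and 6 of this checklist can be encoded as a small decision function. This is a simplified sketch under stated assumptions: the `SourceProfile` fields and the returned labels are hypothetical, it only covers latency and delete requirements, and the thresholds are the ones given in step 2.

```python
from dataclasses import dataclass


@dataclass
class SourceProfile:
    is_database: bool        # relational/NoSQL source with a transaction log
    supports_log_cdc: bool   # log access is enabled (e.g. WAL or binlog)
    latency_minutes: float   # maximum tolerable staleness
    needs_deletes: bool      # hard deletes must reach the target


def choose_pattern(src: SourceProfile) -> str:
    """Pick an ingestion pattern from latency and delete requirements only."""
    # Step 6: reliable delete capture from a database needs log-based CDC
    if src.is_database and src.needs_deletes:
        return "cdc" if src.supports_log_cdc else "batch_with_soft_deletes"
    # Step 2: latency thresholds
    if src.latency_minutes < 5:
        return "cdc" if src.is_database and src.supports_log_cdc else "streaming"
    if src.latency_minutes <= 60:
        return "micro_batch"
    return "batch"


print(choose_pattern(SourceProfile(True, True, 1440, True)))    # cdc
print(choose_pattern(SourceProfile(False, False, 30, False)))   # micro_batch
```

Volume, source impact, and operational maturity (steps 3, 4, and 7) still need human judgment; a function like this is at most a first filter.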

Production Code

CDC Pipeline with Debezium and Kafka

from confluent_kafka import Consumer, KafkaException, Producer
import json
import logging
from typing import Any, Dict, Optional
from dataclasses import dataclass
from datetime import datetime, timezone

logger = logging.getLogger(__name__)


@dataclass
class CDCEvent:
    """Represents a Change Data Capture event from Debezium."""
    operation: str  # c=create, u=update, d=delete, r=snapshot read
    before: Optional[Dict[str, Any]]  # None for creates and snapshot reads
    after: Optional[Dict[str, Any]]  # None for deletes
    source: Dict[str, Any]
    ts_ms: int
    table: str


def to_utc(ts_ms: int) -> datetime:
    """Convert an epoch-millisecond timestamp to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)


class DebeziumCDCProcessor:
    """Process CDC events from Debezium via Kafka."""

    def __init__(self, kafka_config: Dict, topic: str):
        self.topic = topic
        self.consumer = Consumer({
            "bootstrap.servers": kafka_config["bootstrap.servers"],
            "group.id": "cdc-processor",
            "auto.offset.reset": "earliest",
            # Commit manually, and store an offset only once its message is handled,
            # so a crash mid-message causes reprocessing rather than data loss
            "enable.auto.commit": False,
            "enable.auto.offset.store": False,
        })
        self.producer = Producer({
            "bootstrap.servers": kafka_config["bootstrap.servers"],
            "acks": "all",
            "enable.idempotence": True,
        })

    def parse_cdc_event(self, raw_value: str) -> CDCEvent:
        """Parse Debezium CDC envelope format."""
        envelope = json.loads(raw_value)
        # With the JSON converter's schemas enabled, the envelope is nested under "payload"
        envelope = envelope.get("payload", envelope)
        return CDCEvent(
            operation=envelope["op"],
            before=envelope.get("before"),
            after=envelope.get("after"),
            source=envelope["source"],
            ts_ms=envelope["ts_ms"],
            table=envelope["source"]["table"],
        )

    def process_create(self, event: CDCEvent) -> Dict:
        """Handle new record creation (also used for snapshot reads)."""
        return {
            "action": "insert",
            "table": event.table,
            "data": event.after,
            "cdc_timestamp": to_utc(event.ts_ms),
        }

    def process_update(self, event: CDCEvent) -> Dict:
        """Handle record update with change detection."""
        changes = {}
        if event.before and event.after:
            for key in event.after:
                if event.before.get(key) != event.after.get(key):
                    changes[key] = {
                        "old": event.before.get(key),
                        "new": event.after.get(key),
                    }
        return {
            "action": "update",
            "table": event.table,
            "data": event.after,
            "changes": changes,
            "cdc_timestamp": to_utc(event.ts_ms),
        }

    def process_delete(self, event: CDCEvent) -> Dict:
        """Handle record deletion."""
        return {
            "action": "delete",
            "table": event.table,
            "data": event.before,
            "cdc_timestamp": to_utc(event.ts_ms),
        }

    def commit(self):
        """Flush produced output first, then synchronously commit stored offsets."""
        self.producer.flush()
        try:
            self.consumer.commit(asynchronous=False)
        except KafkaException as exc:
            # Raised, for example, when no new offsets have been stored yet
            logger.debug(f"Offset commit skipped: {exc}")

    def run(self, max_events: Optional[int] = None):
        """Main consumption loop for CDC events."""
        self.consumer.subscribe([self.topic])
        event_count = 0

        try:
            while True:
                msg = self.consumer.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    logger.error(f"Consumer error: {msg.error()}")
                    continue
                if msg.value() is None:
                    # Tombstone that follows a delete (tombstones.on.delete=true): nothing to do
                    self.consumer.store_offsets(message=msg)
                    continue

                event = self.parse_cdc_event(msg.value().decode("utf-8"))

                if event.operation in ("c", "r"):
                    # Initial snapshot reads ("r") are treated like inserts
                    result = self.process_create(event)
                elif event.operation == "u":
                    result = self.process_update(event)
                elif event.operation == "d":
                    result = self.process_delete(event)
                else:
                    logger.warning(f"Unknown operation: {event.operation}")
                    self.consumer.store_offsets(message=msg)
                    continue

                # Key by primary key (assumed to be "id") so every change to a row lands in
                # the same partition; deletes have no "after" image, so fall back to "before"
                row = event.after or event.before or {}
                key = row.get("id")
                self.producer.produce(
                    topic=f"{self.topic}.processed",
                    key=str(key).encode() if key is not None else None,
                    value=json.dumps(result, default=str).encode(),
                )
                self.producer.poll(0)  # serve delivery callbacks without blocking
                self.consumer.store_offsets(message=msg)

                event_count += 1
                if event_count % 1000 == 0:
                    logger.info(f"Processed {event_count} CDC events")
                    self.commit()
                if max_events and event_count >= max_events:
                    break

        finally:
            # Flush output before committing offsets, then release the consumer
            self.commit()
            self.consumer.close()


# Debezium PostgreSQL connector configuration (sent to the Kafka Connect REST API as JSON)
DEBEZIUM_CONFIG = {
    "name": "postgres-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres-host",
        "database.port": "5432",
        "database.user": "debezium",
        # Resolved by a Kafka Connect config provider (assumes one named "secrets" is configured)
        "database.password": "${secrets:debezium-password}",
        "database.dbname": "production",
        # Topic name prefix: Debezium 2.x uses topic.prefix; 1.x used database.server.name
        "topic.prefix": "production",
        "plugin.name": "pgoutput",
        "slot.name": "debezium_slot",
        "publication.name": "debezium_publication",
        "table.include.list": "public.orders,public.customers",
        # Emit a null-value tombstone after each delete so compacted topics can drop the key
        "tombstones.on.delete": "true",
        # Rename "<prefix>.<schema>.<table>" topics to just "<table>"
        "transforms": "route",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "$3",
    },
}

Batch Ingestion with Deduplication

import hashlib
import logging
from datetime import datetime, timezone
from typing import Optional

import pandas as pd
from sqlalchemy import bindparam, inspect, text
from sqlalchemy.engine import Engine

logger = logging.getLogger(__name__)


class BatchIngestor:
    """Batch ingestion with idempotent deduplication and upsert operations.

    Assumes SQLAlchemy engines and a PostgreSQL-style target that supports
    INSERT ... ON CONFLICT, with a unique constraint on dedup_key and a
    _record_hash column.
    """

    def __init__(self, target_connection: Engine, dedup_key: str = "id"):
        self.target_connection = target_connection
        self.dedup_key = dedup_key

    def extract_with_watermark(
        self,
        source_connection: Engine,
        query: str,
        watermark_table: str,
        last_watermark: Optional[datetime] = None,
    ) -> pd.DataFrame:
        """Extract data from source using watermark for incremental loading."""
        if last_watermark is None and inspect(self.target_connection).has_table(watermark_table):
            # No watermark supplied: use the newest updated_at already loaded into the target
            watermark_query = f"SELECT MAX(updated_at) FROM {watermark_table}"
            last_watermark = pd.read_sql(watermark_query, self.target_connection).iloc[0, 0]

        if last_watermark is None or pd.isna(last_watermark):
            # No existing data: full load
            logger.info("No watermark found; running a full load")
            return pd.read_sql(query, source_connection)

        # Incremental: wrap the base query so it may contain its own WHERE clause,
        # and bind the watermark as a parameter instead of formatting it into SQL.
        # ">=" re-reads rows sharing the boundary timestamp; the upsert makes that harmless.
        incremental_query = text(
            f"SELECT * FROM ({query}) AS src WHERE updated_at >= :watermark"
        )
        logger.info(f"Extracting incremental data since {last_watermark}")
        return pd.read_sql(
            incremental_query, source_connection, params={"watermark": last_watermark}
        )

    def compute_record_hash(self, df: pd.DataFrame) -> pd.DataFrame:
        """Compute an MD5 hash of all non-key columns for change detection."""
        df = df.copy()  # do not mutate the caller's frame
        hash_columns = [
            col for col in df.columns if col not in (self.dedup_key, "_record_hash")
        ]
        df["_record_hash"] = df[hash_columns].apply(
            lambda row: hashlib.md5(
                "|".join(str(v) for v in row).encode()
            ).hexdigest(),
            axis=1,
        )
        return df

    def upsert(self, df: pd.DataFrame, target_table: str) -> dict:
        """Perform idempotent upsert (insert-or-update) operation."""
        if df.empty:
            return {"inserted": 0, "updated": 0, "unchanged": 0}

        # In-batch deduplication: keep the latest version of each key; a duplicate key
        # would otherwise make ON CONFLICT update the same row twice in one statement
        if "updated_at" in df.columns:
            df = df.sort_values("updated_at", kind="stable")
        df = df.drop_duplicates(subset=[self.dedup_key], keep="last")
        df = self.compute_record_hash(df)

        # Read existing hashes for the incoming keys, using bound parameters
        if inspect(self.target_connection).has_table(target_table):
            existing_query = text(
                f"SELECT {self.dedup_key}, _record_hash FROM {target_table} "
                f"WHERE {self.dedup_key} IN :keys"
            ).bindparams(bindparam("keys", expanding=True))
            existing = pd.read_sql(
                existing_query,
                self.target_connection,
                params={"keys": df[self.dedup_key].tolist()},
            )
        else:
            # First load: nothing exists yet (typed key column keeps the merge valid)
            existing = pd.DataFrame({
                self.dedup_key: pd.Series(dtype=df[self.dedup_key].dtype),
                "_record_hash": pd.Series(dtype=object),
            })

        # Merge existing hashes with new data
        merged = df.merge(
            existing, on=self.dedup_key, how="left", suffixes=("", "_existing")
        )

        # Classify records
        new_records = merged[merged["_record_hash_existing"].isna()]
        changed_records = merged[
            (merged["_record_hash_existing"].notna()) &
            (merged["_record_hash"] != merged["_record_hash_existing"])
        ]
        unchanged_records = merged[
            (merged["_record_hash_existing"].notna()) &
            (merged["_record_hash"] == merged["_record_hash_existing"])
        ]

        # Insert new records
        if not new_records.empty:
            insert_df = new_records.drop(columns=["_record_hash_existing"])
            insert_df.to_sql(
                target_table,
                self.target_connection,
                if_exists="append",
                index=False,
                method="multi",
                chunksize=500,
            )
            logger.info(f"Inserted {len(new_records)} new records")

        # Update changed records through a staging table
        if not changed_records.empty:
            update_df = changed_records.drop(columns=["_record_hash_existing"])
            temp_table = f"{target_table}_temp_update"
            col_list = ", ".join(update_df.columns)
            set_clause = ", ".join(
                f"{col} = EXCLUDED.{col}"
                for col in update_df.columns
                if col != self.dedup_key
            )
            # Explicit column list: SELECT * would depend on both tables' column order
            merge_query = text(f"""
                INSERT INTO {target_table} ({col_list})
                SELECT {col_list} FROM {temp_table}
                ON CONFLICT ({self.dedup_key})
                DO UPDATE SET {set_clause}
            """)
            # Stage, merge, and drop the staging table in one transaction
            with self.target_connection.begin() as conn:
                update_df.to_sql(temp_table, conn, if_exists="replace", index=False)
                conn.execute(merge_query)
                conn.execute(text(f"DROP TABLE {temp_table}"))
            logger.info(f"Updated {len(changed_records)} changed records")

        return {
            "inserted": len(new_records),
            "updated": len(changed_records),
            "unchanged": len(unchanged_records),
        }

    def update_watermark(self, watermark_table: str, max_timestamp: datetime):
        """Append the new watermark to an audit table to track ingestion progress."""
        watermark_df = pd.DataFrame([{
            "pipeline": watermark_table,
            "last_watermark": max_timestamp,
            "updated_at": datetime.now(timezone.utc),
        }])
        watermark_df.to_sql(
            "ingestion_watermarks",
            self.target_connection,
            if_exists="append",
            index=False,
        )
        logger.info(f"Watermark updated to {max_timestamp}")


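# Usage (engine and source_engine are SQLAlchemy engines created elsewhere with create_engine)
ingestor = BatchIngestor(target_connection=engine, dedup_key="order_id")
df = ingestor.extract_with_watermark(
    source_connection=source_engine,
    query="SELECT * FROM orders",
    watermark_table="orders_fact",
)
results = ingestor.upsert(df, target_table="orders_fact")
if not df.empty:
    # Record progress only after the load has succeeded
    ingestor.update_watermark("orders_fact", df["updated_at"].max())
logger.info(f"Ingestion results: {results}")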

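CDC vs Batch Polling: CDC reads the database transaction log, which provides: (1) low latency (typically under a second), (2) little additional query load on the source, (3) capture of deletes, and (4) ordering guarantees (changes are emitted in commit order). Batch polling is simpler but has: (1) higher latency (minutes to hours), (2) query load on the source system, (3) no way to capture hard deletes without soft-delete flags, and (4) no ordering guarantees. Polling also sees only the latest state of each row, so intermediate updates between two polls are lost.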
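The delete gap in polling is visible with two tiny snapshots of a source table. In this sketch (hypothetical helpers and data), row 2 is hard-deleted and row 3 is updated; the timestamp-based poll only sees row 3, and the delete can be recovered only by comparing the full key set against the previous snapshot, which means scanning every key on each cycle:

```python
from typing import Dict, List, Set


def incremental_poll(rows: List[Dict], watermark: str) -> List[Dict]:
    """Timestamp polling: rows modified after the watermark. Hard-deleted rows are invisible."""
    return [r for r in rows if r["updated_at"] > watermark]


def deleted_keys(previous_keys: Set[int], current_rows: List[Dict], key: str = "id") -> Set[int]:
    """Infer hard deletes by comparing primary keys against the previous full snapshot."""
    return previous_keys - {r[key] for r in current_rows}


# Source table after row 2 was hard-deleted and row 3 was updated
current = [
    {"id": 1, "updated_at": "2024-01-01"},
    {"id": 3, "updated_at": "2024-01-03"},
]
print([r["id"] for r in incremental_poll(current, "2024-01-02")])  # [3]
print(deleted_keys({1, 2, 3}, current))                           # {2}
```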

Webhook Best Practices: Webhooks are push-based notifications from source systems. Implement: (1) signature verification for security, (2) idempotent processing (webhooks may be retried), (3) a durable queue (SQS, Kafka) between webhook receiver and processing pipeline, (4) retry with exponential backoff for transient failures, (5) a dead letter queue for persistent failures.
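Signature verification with HMAC-SHA256 needs only the standard library. The sketch below assumes the provider signs the raw request body with a shared secret and sends a hex digest; header names, encodings (hex vs. base64), and prefixes such as `sha256=` differ between providers, so check the provider's documentation. `sign` and `verify_webhook` are hypothetical helper names, and verification should run on the raw bytes before any JSON parsing.

```python
import hashlib
import hmac


def sign(secret: bytes, body: bytes) -> str:
    """Hex-encoded HMAC-SHA256 of the raw request body."""
    return hmac.new(secret, body, hashlib.sha256).hexdigest()


def verify_webhook(secret: bytes, body: bytes, received_signature: str) -> bool:
    """Compare in constant time so response timing does not leak the expected signature."""
    expected = sign(secret, body)
    return hmac.compare_digest(expected, received_signature)


secret = b"shared-secret"  # illustrative value; load the real secret from a secret store
body = b'{"event": "order.created", "id": 1}'
signature = sign(secret, body)
print(verify_webhook(secret, body, signature))         # True
print(verify_webhook(secret, body + b" ", signature))  # False: body was altered
```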

  • Batch ingestion is simple, well understood, and suitable for high-volume workloads that can tolerate latency.
  • Streaming ingestion provides real-time freshness but requires more complex infrastructure and expertise.
  • CDC (Debezium, Maxwell) captures database changes via transaction logs with low latency and minimal source impact.
  • API polling is often the simplest way to integrate SaaS sources, but it is constrained by rate limits and cannot capture deletes without soft-delete flags.
  • File watching (inotify, Watchdog) handles SFTP drops and legacy system integrations.
  • All ingestion pipelines must be idempotent: re-running produces identical results via deduplication keys and upsert operations.
  • Monitor ingestion lag, error rates, and data completeness. Alert on anomalies.

Best Practices

  1. Implement idempotent ingestion: use deduplication keys and upsert/MERGE operations so re-runs produce identical results.
  2. Use CDC (Debezium) for relational databases when real-time freshness and delete capture are required.
  3. Set watermarks to track ingestion progress and detect gaps. Alert when gaps exceed SLA thresholds.
  4. Use schema registry at ingestion time to reject malformed data before it enters the pipeline.
  5. Implement dead letter queues for records that fail ingestion. Never silently drop failed records.
  6. Rate-limit API polling to respect source system rate limits. Use exponential backoff for 429 responses.
  7. Verify webhook signatures to prevent unauthorized data injection. Use HMAC-SHA256 for signature validation.
  8. Monitor source system impact: CDC should have negligible impact; polling should not exceed 10% of source CPU.
  9. Design for backfill β€” support full reprocessing from raw data without modifying the ingestion pipeline.
  10. Document data contracts with source system owners: expected schemas, update frequencies, and SLAs.
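For practice 6, a retry loop with exponential backoff and jitter might look like the sketch below. It assumes a hypothetical `request()` callable that returns `(status_code, retry_after_seconds)`, with the server's `Retry-After` header already parsed into seconds; the delay uses the "full jitter" variant (a random wait up to the exponential cap), and `sleep` is injectable so the loop can be tested without waiting.

```python
import random
import time
from typing import Callable, Optional, Tuple

Response = Tuple[int, Optional[float]]  # (status code, Retry-After in seconds if provided)


def call_with_backoff(
    request: Callable[[], Response],
    max_retries: int = 5,
    base_delay_s: float = 1.0,
    max_delay_s: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Retry a rate-limited call on HTTP 429; return the first non-429 status."""
    for attempt in range(max_retries + 1):
        status, retry_after = request()
        if status != 429:
            return status
        if attempt == max_retries:
            break
        if retry_after is not None:
            delay = retry_after  # honour the server's explicit hint
        else:
            # Full jitter: random wait between 0 and the capped exponential delay
            delay = random.uniform(0, min(max_delay_s, base_delay_s * 2 ** attempt))
        sleep(delay)
    raise RuntimeError(f"still rate-limited after {max_retries} retries")
```

Jitter spreads retries from many workers over time, so they do not all hit the API again at the same moment.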

Ingestion Pattern Selection Matrix

| Pattern | Latency | Source Impact | Delete Capture | Complexity | Best For |
| --- | --- | --- | --- | --- | --- |
| Batch Loading | Hours | High (queries) | No | Low | Daily ETL, analytics |
| CDC (Debezium) | Seconds | Low (log read) | Yes | High | Real-time replication |
| API Polling | Minutes | Medium (rate limits) | No | Low | SaaS integrations |
| Webhooks | Seconds | None (push) | Yes | Medium | Real-time SaaS events |
| File Watching | Minutes | None | Partial | Low | Legacy systems, SFTP |
| Streaming Ingest | Sub-second | None | Yes | High | IoT, clickstream |

CDC Tool Comparison

| Feature | Debezium | Maxwell | Fivetran | Airbyte |
| --- | --- | --- | --- | --- |
| Database Support | MySQL, PostgreSQL, MongoDB, Oracle | MySQL only | 200+ sources | 300+ sources |
| Deployment | Kafka Connect | Standalone daemon | Managed SaaS | Self-hosted/Cloud |
| Log Reading | WAL, binlog, redo | binlog | Proprietary | Varies |
| Schema Evolution | Yes | Limited | Yes | Yes |
| Cost | Free (OSS) | Free (OSS) | Paid | Free/Paid |
