
Capstone Project: Real-Time Streaming Pipeline


Capstone Project: Building a Production Real-Time Streaming Pipeline

This capstone project brings together all concepts from the Data Pipelines & Orchestration module into a complete, production-grade streaming pipeline.


What You Will Build

  • Real-time e-commerce event processing system
  • Using Apache Kafka, Spark Structured Streaming, and production monitoring
  • Demonstrates mastery of stream processing, exactly-once semantics, state management, and operational excellence

Key Insight: This project requires end-to-end implementation from architecture design to production deployment.

Streaming Pipeline Architecture

[Figure: End-to-End Streaming Pipeline. Stages: Ingest (Kafka producer, JSON events) -> Stream (Flink/Spark, windowed aggregation) -> Enrich (join with DIM user data) -> Store (ClickHouse, real-time OLAP) -> Alert (PagerDuty, Slack). Monitoring stack: Kafka lag monitor, throughput counter, latency histogram, Grafana dashboards, dead letter queue alerts, checkpoint validator.]

Project Architecture

Expected Performance Characteristics

| Metric | Target | Acceptable Range | Measurement |
|---|---|---|---|
| End-to-End Latency | < 30 seconds | 10-60 seconds | Event time to serving |
| Throughput | 10K events/sec | 5-15K events/sec | Sustained processing rate |
| Consumer Lag | < 1000 records | < 5000 records | Kafka consumer lag |
| Uptime | 99.9% | 99.5-99.99% | Pipeline availability |
| Data Loss | 0 events | < 0.01% | Exactly-once verification |
| Processing Latency | < 5 seconds | 1-10 seconds | Micro-batch duration |
| DLQ Rate | < 0.1% | < 0.5% | Failed events / total events |

Project Requirements

| Requirement | Description | Priority | Lesson Reference |
|---|---|---|---|
| Event Ingestion | Ingest clickstream, purchase, and inventory events from Kafka topics | P0 | 019 - Apache Kafka |
| Session Aggregation | Compute session-based aggregations (clicks per session, conversion rate) | P0 | 020 - Kafka Streams |
| Fraud Detection | Real-time anomaly detection for suspicious transactions | P0 | 022 - Spark Streaming |
| Feature Engineering | Compute ML features for real-time scoring | P1 | 021 - Spark Fundamentals |
| Exactly-Once | End-to-end exactly-once processing semantics | P0 | 019, 020, 022 |
| Watermarking | Handle late-arriving events with configurable watermarks | P0 | 022 - Spark Streaming |
| Dead Letter Queue | Route failed events to DLQ for debugging | P0 | 028 - Error Handling |
| Monitoring | Prometheus metrics, Grafana dashboards, alerting | P0 | 027 - Monitoring |
| Testing | Unit tests for transformations, integration tests for pipeline | P1 | 026 - Testing |
| Documentation | Architecture docs, runbooks, API contracts | P1 | 025 - Data Quality |

A capstone project is a comprehensive, integrative exercise that demonstrates mastery of a subject by applying knowledge and skills to a real-world problem. In data engineering, a capstone project requires end-to-end implementation: architecture design, code implementation, testing, deployment, monitoring, and documentation. The project must demonstrate production-grade quality, not just functional correctness.

Event-driven architecture (EDA) is a software design pattern where the production, detection, and consumption of events drive the flow of the system. In EDA, components communicate by producing and consuming events through a message broker (Kafka), enabling loose coupling, scalability, and resilience. The key properties: (1) producers and consumers are independent, (2) events are immutable facts, (3) the event log provides durability and replayability.

Exactly-once end-to-end processing guarantees that each event is processed exactly once from source to sink. This requires: (1) idempotent producers (Kafka enable.idempotence=true), (2) transactional writes (Kafka Transactions), (3) consumer offset commit only after successful processing, (4) idempotent sink writes (UPSERT/MERGE). The guarantee holds for the complete input-process-output cycle.
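A minimal sketch of point (4), idempotent sink writes, using Python's built-in sqlite3 as a stand-in for the real sink. The table name purchases and the helpers upsert_events and demo_replay are hypothetical names chosen for illustration. The point is only that a write keyed on event_id can be replayed without creating duplicates.

```python
import sqlite3


def upsert_events(conn, events):
    """Write (event_id, amount) rows so that a replay overwrites instead of duplicating."""
    conn.executemany(
        "INSERT OR REPLACE INTO purchases (event_id, amount) VALUES (?, ?)",
        events,
    )
    conn.commit()


def demo_replay():
    """Deliver the same batch twice and return (row_count, total_amount)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE purchases (event_id TEXT PRIMARY KEY, amount REAL)"
    )
    batch = [("EVT-1", 19.99), ("EVT-2", 5.00)]
    upsert_events(conn, batch)
    upsert_events(conn, batch)  # simulated redelivery after a failure
    return conn.execute(
        "SELECT COUNT(*), SUM(amount) FROM purchases"
    ).fetchone()
```

Because the primary key is the event ID, the second delivery leaves the table with the same two rows and the same total.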

End-to-End Latency

E2E_latency = T_ingestion + T_processing + T_serving. For Kafka -> Spark -> Redis: T_ingestion ≈ 10-50ms (Kafka produce/consume), T_processing ≈ 100ms-5s (Spark micro-batch), T_serving ≈ 1-10ms (Redis write). Total E2E: 111ms-5.06s depending on processing complexity and trigger interval.
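The budget above can be checked with a few lines of arithmetic. This sketch sums the per-stage bounds quoted in the text; the names STAGES and e2e_latency_bounds are illustrative only.

```python
# Per-stage latency bounds in seconds (best case, worst case), from the text.
STAGES = {
    "ingestion": (0.010, 0.050),   # Kafka produce/consume
    "processing": (0.100, 5.000),  # Spark micro-batch
    "serving": (0.001, 0.010),     # Redis write
}


def e2e_latency_bounds(stages):
    """Return (best_case, worst_case) end-to-end latency in seconds."""
    best = sum(low for low, _ in stages.values())
    worst = sum(high for _, high in stages.values())
    return best, worst
```

The processing stage dominates the worst case, so the trigger interval is usually the first thing to tune.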

Throughput Target

Target throughput = Peak_event_rate * (1 + safety_factor). For peak_rate = 10K events/second and safety_factor = 0.5: Target = 15K events/second. Partition count = ceil(Target / per_partition_throughput). For per_partition = 2K events/second: Partitions = ceil(15K / 2K) = ceil(7.5) = 8 partitions.
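The same sizing calculation as a small sketch. The function names are illustrative; the rounding up matters because a fractional partition cannot be provisioned.

```python
import math


def target_throughput(peak_rate, safety_factor):
    """Peak event rate plus headroom, in events per second."""
    return peak_rate * (1 + safety_factor)


def partition_count(target, per_partition):
    """Smallest whole number of partitions that can carry the target rate."""
    return math.ceil(target / per_partition)
```

For example, partition_count(target_throughput(10_000, 0.5), 2_000) reproduces the figure in the text.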

A streaming pipeline is production-ready when: (1) Exactly-once semantics are verified, (2) Consumer lag is monitored and alerting is configured, (3) Dead letter queues capture and enable debugging of failed events, (4) Watermarks handle late-arriving data within bounded time, (5) Circuit breakers protect downstream services, (6) Load testing confirms throughput targets are met, (7) Runbooks exist for common failure scenarios, (8) Monitoring dashboards provide real-time visibility.

Implementation Guide

Kafka Topics

  1. Create topics: click-events, purchase-events, inventory-events with appropriate partition counts.
  2. Configure retention.ms=604800000 (7 days) for replay capability.
  3. Set min.insync.replicas=2 on the topics and acks=all on the producers for durability.
  4. Register schemas with Confluent Schema Registry using Avro format.
  5. Configure dead letter topics: click-events.dlq, purchase-events.dlq.
  6. Test topic creation and verify partition distribution with kafka-topics --describe.
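The topic setup steps above can be scripted. This sketch only builds the kafka-topics command lines so they can be reviewed before running. The partition counts, the replication factor of 3 (needed for min.insync.replicas=2 to tolerate one broker failure), and the broker address are assumptions, not values prescribed by the lesson.

```python
# Hypothetical partition counts; size them from the throughput target.
TOPICS = {
    "click-events": 8,
    "purchase-events": 8,
    "inventory-events": 8,
    "click-events.dlq": 1,
    "purchase-events.dlq": 1,
}


def create_topic_command(topic, partitions, bootstrap="kafka-1:9092",
                         replication_factor=3, retention_ms=604_800_000,
                         min_insync_replicas=2):
    """Build the argument list for one `kafka-topics --create` call."""
    return [
        "kafka-topics", "--bootstrap-server", bootstrap,
        "--create", "--topic", topic,
        "--partitions", str(partitions),
        "--replication-factor", str(replication_factor),
        "--config", f"retention.ms={retention_ms}",
        "--config", f"min.insync.replicas={min_insync_replicas}",
    ]


def all_commands():
    """One command per topic, in a stable order."""
    return [create_topic_command(t, p) for t, p in sorted(TOPICS.items())]
```

Each list can be passed to subprocess.run once a broker is reachable, or printed with " ".join(cmd) for a runbook.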

Event Producers

  1. Implement Kafka producers with idempotent writes (enable.idempotence=true).
  2. Use Avro serialization with Schema Registry for schema evolution.
  3. Configure batching (linger.ms=10, batch.size=32768) for throughput.
  4. Implement error handling with DLQ routing for failed events.
  5. Add metrics: events produced, errors, throughput, latency.
  6. Load test producers to verify throughput targets.
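A minimal sketch of the DLQ routing step, independent of any Kafka client library. It decides which topic a raw event should go to and wraps failures in an envelope that keeps the original message, the error details, and a timestamp. The function names and the envelope fields are assumptions for illustration.

```python
import json
import time


def to_dlq_record(raw_value, source_topic, error, now=None):
    """Wrap a failed event with enough context to debug and replay it."""
    return {
        "source_topic": source_topic,
        "original_value": raw_value.decode("utf-8", errors="replace"),
        "error_type": type(error).__name__,
        "error_message": str(error),
        "failed_at": now if now is not None else time.time(),
    }


def route(raw_value, source_topic):
    """Return (destination_topic, payload) for one raw JSON event."""
    try:
        event = json.loads(raw_value)
        if not isinstance(event, dict) or "event_id" not in event:
            raise KeyError("event_id")
        return source_topic, event
    except (ValueError, KeyError) as exc:
        # json.JSONDecodeError is a subclass of ValueError.
        dlq_topic = source_topic + ".dlq"
        return dlq_topic, to_dlq_record(raw_value, source_topic, exc)
```

A producer wrapper would call route() first and then send the payload to whichever topic it returns.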
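
Stream Processing

  1. Implement Spark Structured Streaming with Kafka source.
  2. Configure watermarks for event-time processing (withWatermark("event_time", "10 minutes")).
  3. Implement session aggregation with session windows (30-minute gap).
  4. Implement fraud detection with sliding window aggregations.
  5. Configure fault-tolerant delivery: give every query its own checkpointLocation and make sink writes idempotent. Note that processing.guarantee=exactly_once_v2 is a Kafka Streams setting and has no effect in Spark. Spark's Kafka sink is at-least-once, so downstream consumers must deduplicate.
  6. Implement DLQ routing for events that fail processing.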
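To make the watermark step concrete, here is a simplified pure-Python model of a 10-minute watermark. It is a sketch, not Spark's implementation: Spark advances the watermark once per micro-batch and applies it to window state, whereas this model updates it after every event. The function name split_by_watermark is hypothetical.

```python
from datetime import datetime, timedelta


def split_by_watermark(event_times, delay=timedelta(minutes=10)):
    """Split events into (accepted, dropped) under a max-event-time watermark.

    The watermark trails the largest event time seen so far by `delay`.
    An event older than the current watermark counts as too late.
    """
    max_seen = None
    accepted, dropped = [], []
    for ts in event_times:
        watermark = max_seen - delay if max_seen is not None else None
        if watermark is not None and ts < watermark:
            dropped.append(ts)
        else:
            accepted.append(ts)
            max_seen = ts if max_seen is None else max(max_seen, ts)
    return accepted, dropped
```

An event 8 minutes behind the newest one is still accepted; one 15 minutes behind is dropped. Widening the delay trades state size and output latency for fewer dropped events.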

Serving Layer

  1. Implement Redis sink for low-latency feature serving.
  2. Implement S3/Parquet sink for analytical storage.
  3. Implement real-time dashboard with Grafana connected to Prometheus metrics.
  4. Configure materialized views for common query patterns.
  5. Implement cache invalidation for stale features.

Monitoring and Alerting

  1. Instrument all components with Prometheus metrics.
  2. Build Grafana dashboards: pipeline health, throughput, latency, consumer lag.
  3. Configure SLO-based alerts that fire when data freshness exceeds 2 hours, error rate exceeds 1%, or consumer lag exceeds 10K records.
  4. Set up DLQ depth monitoring with alerting.
  5. Create runbooks for common failure scenarios.
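The alert thresholds from the monitoring steps can be expressed as data and evaluated against a metrics snapshot. In practice these would live in Prometheus alerting rules. The sketch below only illustrates the comparison logic, and the metric names are hypothetical.

```python
# Alert limits from the monitoring steps above; metric names are illustrative.
ALERT_RULES = {
    "freshness_seconds": 2 * 60 * 60,  # data older than 2 hours
    "error_rate": 0.01,                # more than 1% failed events
    "consumer_lag_records": 10_000,    # more than 10K records behind
}


def breached_slos(metrics, rules=ALERT_RULES):
    """Return the sorted names of metrics whose value exceeds its limit."""
    return sorted(
        name for name, limit in rules.items()
        if metrics.get(name, 0) > limit
    )
```

A snapshot with a 2% error rate and a lag of 12,000 records breaches two rules, while a 2-minute freshness value does not.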

Production Code

Complete Streaming Pipeline

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType, StructField, StringType, DoubleType,
    IntegerType, TimestampType, BooleanType, MapType
)
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List
from dataclasses import dataclass

logger = logging.getLogger(__name__)


# ------------------------------------------------------
# SCHEMA DEFINITIONS
# ------------------------------------------------------
CLICK_SCHEMA = StructType([
    StructField("event_id", StringType(), False),
    StructField("user_id", StringType(), False),
    StructField("session_id", StringType(), False),
    StructField("page_url", StringType(), False),
    StructField("element", StringType(), True),
    StructField("event_time", TimestampType(), False),
    StructField("properties", MapType(StringType(), StringType()), True),
])

PURCHASE_SCHEMA = StructType([
    StructField("event_id", StringType(), False),
    StructField("user_id", StringType(), False),
    StructField("session_id", StringType(), False),
    StructField("order_id", StringType(), False),
    StructField("product_id", StringType(), False),
    StructField("amount", DoubleType(), False),
    StructField("quantity", IntegerType(), False),
    StructField("event_time", TimestampType(), False),
])


# ------------------------------------------------------
# METRICS
# ------------------------------------------------------
EVENTS_PROCESSED = Counter(
    "streaming_events_processed_total",
    "Total events processed",
    ["topic", "status"],
)

EVENTS_FAILED = Counter(
    "streaming_events_failed_total",
    "Total events that failed processing",
    ["topic", "error_type"],
)

PROCESSING_LATENCY = Histogram(
    "streaming_processing_latency_seconds",
    "Processing latency in seconds",
    ["pipeline"],
    buckets=[0.1, 0.5, 1, 2, 5, 10, 30],
)

CONSUMER_LAG = Gauge(
    "streaming_consumer_lag",
    "Consumer lag in records",
    ["topic", "partition"],
)

SESSION_COUNT = Gauge(
    "streaming_active_sessions",
    "Number of active sessions",
)


# ------------------------------------------------------
# STREAM PROCESSING PIPELINE
# ------------------------------------------------------
class EcommerceStreamingPipeline:
    """Complete e-commerce streaming pipeline with monitoring."""

    def __init__(self, spark: SparkSession, config: Dict):
        self.spark = spark
        self.config = config

    def build_clickstream_pipeline(self):
        """
        Build the clickstream processing pipeline.
        
        This pipeline:
        1. Reads click events from Kafka
        2. Parses JSON payload and validates schema
        3. Applies watermark for late data tolerance
        4. Computes session aggregations (clicks per session)
        5. Detects fraud signals (high-frequency clicking)
        6. Writes results to Kafka output topics
        """
        # Kafka source configuration
        # Parameters:
        #   subscribe: Topic to consume click events from
        #   startingOffsets: "latest" = only new events (no replay)
        #   maxOffsetsPerTrigger: Cap at 100K events per micro-batch
        #   kafka.security.protocol: SASL_SSL for production security
        raw_stream = (
            self.spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", self.config["kafka_brokers"])
            .option("subscribe", "click-events")
            .option("startingOffsets", "latest")
            .option("maxOffsetsPerTrigger", 100000)
            .option("kafka.security.protocol", "SASL_SSL")
            .load()
        )

        parsed = (
            raw_stream
            .select(
                F.col("key").cast("string").alias("kafka_key"),
                F.from_json(F.col("value").cast("string"), CLICK_SCHEMA).alias("data"),
            )
            .select("kafka_key", "data.*")
            .filter(F.col("event_id").isNotNull())
            .withWatermark("event_time", "10 minutes")
        )

        # Session aggregation: clicks per session.
        # session_window (Spark 3.2+) closes a session after a 30-minute gap
        # in activity, which matches the session-window requirement. A sliding
        # window would count each click in several overlapping windows.
        session_aggregation = (
            parsed
            .groupBy(
                F.session_window("event_time", "30 minutes"),
                "session_id",
                "user_id",
            )
            .agg(
                F.count("*").alias("click_count"),
                F.collect_list("page_url").alias("pages_visited"),
                # Exact distinct counts are not supported on streaming
                # DataFrames, so use the approximate variant.
                F.approx_count_distinct("element").alias("unique_elements_clicked"),
                F.max("event_time").alias("last_click_time"),
            )
        )

        # Fraud detection: rapid clicking pattern
        # Uses 1-minute tumbling window to detect users clicking > 60 times/minute
        # This threshold can be tuned based on normal user behavior analysis
        fraud_signals = (
            parsed
            .groupBy(
                F.window("event_time", "1 minute"),   # 1-minute detection window
                "user_id",                             # Per-user analysis
            )
            .agg(
                F.count("*").alias("clicks_per_minute"),
                F.approx_count_distinct("page_url").alias("pages_per_minute"),
            )
            .filter(F.col("clicks_per_minute") > 60)  # Threshold: > 60 clicks/min
        )

        # Write session aggregation to Kafka.
        # Session windows do not support update mode in streaming queries, so
        # each session is emitted once, in append mode, after the watermark
        # passes the end of the session.
        session_query = (
            session_aggregation
            .select(
                F.to_json(F.struct("*")).alias("value"),
            )
            .writeStream
            .format("kafka")
            .option("kafka.bootstrap.servers", self.config["kafka_brokers"])
            .option("topic", "session-aggregations")
            .option("checkpointLocation", f"{self.config['checkpoint_base']}/sessions")
            .trigger(processingTime="30 seconds")
            .outputMode("append")
            .start()
        )

        # Write fraud signals to alert topic
        fraud_query = (
            fraud_signals
            .select(
                F.to_json(F.struct("*")).alias("value"),
            )
            .writeStream
            .format("kafka")
            .option("kafka.bootstrap.servers", self.config["kafka_brokers"])
            .option("topic", "fraud-alerts")
            .option("checkpointLocation", f"{self.config['checkpoint_base']}/fraud")
            .trigger(processingTime="10 seconds")
            .outputMode("update")
            .start()
        )

        return session_query, fraud_query

    def build_purchase_pipeline(self):
        """Build the purchase processing pipeline."""
        raw_stream = (
            self.spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", self.config["kafka_brokers"])
            .option("subscribe", "purchase-events")
            .option("startingOffsets", "latest")
            .load()
        )

        parsed = (
            raw_stream
            .select(
                F.from_json(F.col("value").cast("string"), PURCHASE_SCHEMA).alias("data"),
            )
            .select("data.*")
            .filter(F.col("event_id").isNotNull())
            .withWatermark("event_time", "10 minutes")
        )

        # Revenue aggregation per product per hour
        revenue_agg = (
            parsed
            .groupBy(
                F.window("event_time", "1 hour"),
                "product_id",
            )
            .agg(
                F.sum("amount").alias("total_revenue"),
                F.sum("quantity").alias("total_quantity"),
                F.count("*").alias("order_count"),
                F.avg("amount").alias("avg_order_value"),
            )
        )

        # Write to analytics topic
        query = (
            revenue_agg
            .select(F.to_json(F.struct("*")).alias("value"))
            .writeStream
            .format("kafka")
            .option("kafka.bootstrap.servers", self.config["kafka_brokers"])
            .option("topic", "revenue-aggregations")
            .option("checkpointLocation", f"{self.config['checkpoint_base']}/revenue")
            .trigger(processingTime="1 minute")
            .outputMode("update")
            .start()
        )

        return query

    def run_all(self):
        """Start all streaming pipelines."""
        click_queries = self.build_clickstream_pipeline()
        purchase_query = self.build_purchase_pipeline()

        logger.info("All streaming pipelines started")

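        # Monitor all queries; the loop exits as soon as any query stops.
        queries = list(click_queries) + [purchase_query]
        import time
        while all(q.isActive for q in queries):
            for q in queries:
                progress = q.lastProgress  # None until the first batch completes
                if progress:
                    logger.info(
                        f"Query {q.id}: "
                        f"input_rows={progress.get('numInputRows', 0)}, "
                        f"processed={progress.get('processedRowsPerSecond', 0):.0f}/s"
                    )
            time.sleep(30)

        # At least one query has stopped. Stop the rest so the job does not
        # hang on the healthy ones, then let awaitTermination() surface any
        # failure as an exception.
        for q in queries:
            if q.isActive:
                q.stop()
        for q in queries:
            q.awaitTermination()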


# ------------------------------------------------------
# ENTRY POINT
# ------------------------------------------------------
if __name__ == "__main__":
    spark = (
        SparkSession.builder
        .appName("EcommerceStreamingPipeline")
        .config("spark.sql.streaming.schemaInference", "true")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.shuffle.partitions", "8")
        .config("spark.streaming.stopGracefullyOnShutdown", "true")
        .getOrCreate()
    )

    config = {
        "kafka_brokers": "kafka-1:9092,kafka-2:9092,kafka-3:9092",
        "checkpoint_base": "s3://checkpoints/ecommerce-streaming",
    }

    pipeline = EcommerceStreamingPipeline(spark, config)

    # Start Prometheus metrics server
    start_http_server(8000)

    pipeline.run_all()

Testing the Pipeline

import pytest
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from datetime import datetime
import json


@pytest.fixture(scope="module")
def spark():
    return (
        SparkSession.builder
        .master("local[*]")
        .appName("StreamingPipelineTests")
        .getOrCreate()
    )


class TestSessionAggregation:
    """Test session aggregation logic."""

    def test_single_session(self, spark):
        """Test aggregation for a single session."""
        data = [
            ("EVT-1", "USR-1", "SES-1", "/home", "button", datetime(2024, 1, 15, 10, 0, 0)),
            ("EVT-2", "USR-1", "SES-1", "/products", "link", datetime(2024, 1, 15, 10, 5, 0)),
            ("EVT-3", "USR-1", "SES-1", "/checkout", "button", datetime(2024, 1, 15, 10, 10, 0)),
        ]
        schema = StructType([
            StructField("event_id", StringType()),
            StructField("user_id", StringType()),
            StructField("session_id", StringType()),
            StructField("page_url", StringType()),
            StructField("element", StringType()),
            StructField("event_time", TimestampType()),
        ])
        df = spark.createDataFrame(data, schema)

        result = (
            df.groupBy("session_id", "user_id")
            .agg(
                F.count("*").alias("click_count"),
                F.collect_list("page_url").alias("pages_visited"),
            )
        )

        assert result.count() == 1
        assert result.first()["click_count"] == 3
        assert len(result.first()["pages_visited"]) == 3

    def test_empty_input(self, spark):
        """Test aggregation with empty input."""
        schema = StructType([
            StructField("event_id", StringType()),
            StructField("session_id", StringType()),
            StructField("user_id", StringType()),
        ])
        df = spark.createDataFrame([], schema)

        result = df.groupBy("session_id").agg(F.count("*").alias("count"))
        assert result.count() == 0


class TestFraudDetection:
    """Test fraud detection thresholds."""

    def test_high_frequency_detection(self, spark):
        """Test that high-frequency clicking is detected."""
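        # 70 clicks inside the same minute (above the 60 threshold).
        # i % 60 keeps every timestamp within 10:00:00-10:00:59, so all
        # events fall into a single 1-minute window.
        data = [
            (f"EVT-{i}", "USR-1", datetime(2024, 1, 15, 10, 0, i % 60))
            for i in range(70)
        ]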
        schema = StructType([
            StructField("event_id", StringType()),
            StructField("user_id", StringType()),
            StructField("event_time", TimestampType()),
        ])
        df = spark.createDataFrame(data, schema)

        result = (
            df.groupBy(
                F.window("event_time", "1 minute"),
                "user_id",
            )
            .agg(F.count("*").alias("clicks_per_minute"))
            .filter(F.col("clicks_per_minute") > 60)
        )

        assert result.count() > 0, "High-frequency clicking should be detected"

    def test_normal_frequency_not_flagged(self, spark):
        """Test that normal clicking is not flagged."""
        data = [
            (f"EVT-{i}", "USR-1", datetime(2024, 1, 15, 10, 0, i * 10))
            for i in range(5)  # 5 clicks in 1 minute
        ]
        schema = StructType([
            StructField("event_id", StringType()),
            StructField("user_id", StringType()),
            StructField("event_time", TimestampType()),
        ])
        df = spark.createDataFrame(data, schema)

        result = (
            df.groupBy(
                F.window("event_time", "1 minute"),
                "user_id",
            )
            .agg(F.count("*").alias("clicks_per_minute"))
            .filter(F.col("clicks_per_minute") > 60)
        )

        assert result.count() == 0, "Normal clicking should not be flagged"


class TestEndToEnd:
    """End-to-end integration tests."""

    def test_schema_compliance(self, spark):
        """Verify event schemas are valid."""
        valid_click = {
            "event_id": "EVT-001",
            "user_id": "USR-001",
            "session_id": "SES-001",
            "page_url": "/home",
            "element": "button",
            "event_time": "2024-01-15T10:00:00",
        }
        # Validate required fields
        required_fields = ["event_id", "user_id", "session_id", "event_time"]
        for field in required_fields:
            assert field in valid_click, f"Missing required field: {field}"

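    def test_idempotent_processing(self):
        """Verify that reprocessing produces same results."""
        # Simulate an UPSERT keyed by event_id: replaying the same batch
        # must leave the sink unchanged.
        # In production, verify the real UPSERT/MERGE operation the same way.
        batch = [("EVT-1", 10.0), ("EVT-2", 25.0)]
        sink = {}
        for event_id, amount in batch:
            sink[event_id] = amount
        first_pass = dict(sink)
        for event_id, amount in batch:  # replay after a simulated failure
            sink[event_id] = amount
        assert sink == first_pass, "Idempotency violated"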

Load Testing: Before production deployment, load test the pipeline at 2-3x expected peak throughput. Use tools like kafka-producer-perf-test or custom Python producers to simulate high event volumes. Verify that: (1) consumer lag stays within its target (under 5000 records) and does not grow without bound, (2) processing latency stays within SLA, (3) no data is lost or duplicated, (4) resource utilization stays within limits.
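For the custom Python producer option, a deterministic event generator is a useful starting point. This sketch yields JSON click events shaped like the click schema in the production code. The function name, the user count, and the page list are assumptions, and sending the events to Kafka is left to whichever client library the project uses.

```python
import json
import random
from datetime import datetime, timedelta, timezone


def synthetic_clicks(n, users=100, seed=42, start=None):
    """Yield n JSON-encoded click events with unique event IDs."""
    rng = random.Random(seed)  # seeded so load tests are repeatable
    if start is None:
        start = datetime(2024, 1, 15, 10, 0, tzinfo=timezone.utc)
    for i in range(n):
        user = rng.randrange(users)
        yield json.dumps({
            "event_id": f"EVT-{i}",
            "user_id": f"USR-{user}",
            "session_id": f"SES-{user}",
            "page_url": rng.choice(["/home", "/products", "/checkout"]),
            "element": rng.choice(["button", "link"]),
            # One event per millisecond of event time.
            "event_time": (start + timedelta(milliseconds=i)).isoformat(),
        })
```

Unique event IDs make it possible to check for loss and duplication afterwards by counting distinct IDs at the sink.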

Disaster Recovery: Test pipeline recovery by: (1) killing Kafka brokers and verifying automatic leader election, (2) killing Spark executors and verifying task rescheduling, (3) deleting or corrupting local state and verifying recovery from the checkpoint location (changelog topics play this role in Kafka Streams), (4) simulating network partitions and verifying graceful degradation.

Project Deliverables

| Deliverable | Description | Acceptance Criteria |
|---|---|---|
| Kafka Topics | Configured topics with appropriate partitions and retention | Topics created, verified with kafka-topics --describe |
| Event Producers | Idempotent producers with Avro serialization | Producers send events, DLQ handles failures |
| Stream Processing | Spark Structured Streaming with watermarks and EOS | Exactly-once processing verified |
| Session Aggregation | Session window aggregation with configurable gap | Aggregation results match expected values |
| Fraud Detection | Real-time anomaly detection for high-frequency events | Detection triggers within 10 seconds |
| Serving Layer | Redis for low-latency, S3 for analytics | Data available in both sinks |
| Monitoring | Prometheus metrics, Grafana dashboards | Dashboards show real-time pipeline health |
| Alerting | SLO-based alerts for freshness, error rate, consumer lag | Alerts fire within 5 minutes of SLO breach |
| Tests | Unit tests for transformations, integration tests | > 90% coverage, all tests pass |
| Documentation | Architecture docs, runbooks, API contracts | Docs reviewed and approved |
  • A production streaming pipeline requires: ingestion, processing, serving, monitoring, and error handling.
  • Kafka provides durable, replayable event storage. Configure acks=all, min.insync.replicas=2, and idempotent producers.
  • Spark Structured Streaming provides fault-tolerant processing through checkpointing and handles late data with watermarks. End-to-end exactly-once also requires an idempotent or transactional sink.
  • Session windows group events by activity gaps; tumbling windows use fixed intervals.
  • Dead letter queues capture failed events for debugging and reprocessing.
  • Monitoring with Prometheus and Grafana provides real-time visibility into pipeline health.
  • Load testing before production deployment confirms throughput targets are met.
  • Disaster recovery testing verifies pipeline resilience under failure conditions.
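The difference between session windows and tumbling windows noted in the list above is easiest to see on a small example. This pure-Python sketch groups the same timestamps both ways. The function names are illustrative, and the logic mirrors the window semantics rather than Spark's implementation.

```python
from datetime import datetime, timedelta


def tumbling_windows(times, size):
    """Count events per fixed window, keyed by window start time."""
    epoch = datetime(1970, 1, 1)
    counts = {}
    for ts in times:
        start = ts - ((ts - epoch) % size)  # align to a multiple of `size`
        counts[start] = counts.get(start, 0) + 1
    return counts


def session_windows(times, gap):
    """Group events into sessions that end after `gap` of inactivity."""
    sessions = []
    for ts in sorted(times):
        if sessions and ts - sessions[-1][-1] < gap:
            sessions[-1].append(ts)  # still inside the current session
        else:
            sessions.append([ts])    # gap exceeded: start a new session
    return sessions
```

With clicks at 10:00, 10:20, 10:40, and 11:30 and a 30-minute size or gap, the tumbling version produces three windows while the session version produces two sessions, because the first three clicks are never more than 20 minutes apart.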

Best Practices

  1. Design for failure from day one. Every component should handle retries, DLQs, and graceful degradation.
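  2. Use exactly-once semantics for financial and transactional data. In Spark Structured Streaming this means checkpointing plus idempotent or transactional sinks; processing.guarantee=exactly_once_v2 applies only to Kafka Streams.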
  3. Set watermarks to handle late-arriving data within bounded timeframes. Tune based on observed event lateness.
  4. Implement DLQs for every pipeline stage. Store original message, error details, and timestamps for debugging.
  5. Monitor consumer lag as a primary SLA metric. Alert well before the lag approaches what the retention period can hold, because events that age out of the topic before being consumed are lost.
  6. Load test before production with 2-3x expected peak throughput. Verify latency, throughput, and resource usage.
  7. Test disaster recovery by killing brokers, executors, and simulating network partitions.
  8. Document everything: architecture decisions, runbooks, API contracts, and deployment procedures.
  9. Implement idempotent processing so that reprocessing produces identical results.
  10. Iterate and improve: monitor production metrics, identify bottlenecks, and optimize continuously.

Project Component Summary

| Component | Technology | Purpose | Key Configuration |
|---|---|---|---|
| Event Source | Kafka Producers | Ingest events | acks=all, enable.idempotence=true |
| Event Storage | Kafka Topics | Durable event log | min.insync.replicas=2, 7-day retention |
| Stream Processing | Spark Structured Streaming | Real-time processing | checkpointLocation per query, idempotent sink writes |
| Session Aggregation | Spark windowed aggregation | User session metrics | 30-min session window |
| Fraud Detection | Spark sliding window | Anomaly detection | 1-min window, 60 events/min threshold |
| Feature Serving | Redis | Low-latency features | TTL-based expiration |
| Analytics Storage | S3/Parquet | Historical analysis | Partitioned by date |
| Monitoring | Prometheus + Grafana | Observability | SLO-based alerting |
| Error Handling | DLQ Topics | Failed event management | 3 retries before DLQ |

Learning Outcomes Checklist

| Skill | Lesson Reference | Proficiency |
|---|---|---|
| Kafka topic design and partitioning | 019 - Apache Kafka | ☐ |
| Kafka Streams DSL | 020 - Kafka Streams | ☐ |
| Spark Structured Streaming | 022 - Spark Streaming | ☐ |
| Watermarks for late data | 022 - Spark Streaming | ☐ |
| Exactly-once semantics | 019, 020, 022 | ☐ |
| Dead letter queues | 028 - Error Handling | ☐ |
| Monitoring with Prometheus | 027 - Monitoring | ☐ |
| Data quality validation | 025 - Data Quality | ☐ |
| Pipeline testing | 026 - Testing | ☐ |
| Architecture documentation | This project | ☐ |
