Capstone Project: Building a Production Real-Time Streaming Pipeline
This capstone project brings together all concepts from the Data Pipelines & Orchestration module into a complete, production-grade streaming pipeline.
What You Will Build
- Real-time e-commerce event processing system
- Using Apache Kafka, Spark Structured Streaming, and production monitoring
- Demonstrates mastery of stream processing, exactly-once semantics, state management, and operational excellence
Key Insight: This project requires end-to-end implementation from architecture design to production deployment.
Streaming Pipeline Architecture
Project Architecture
Expected Performance Characteristics
| Metric | Target | Acceptable Range | Measurement |
|---|---|---|---|
| End-to-End Latency | < 30 seconds | 10-60 seconds | Event time to serving |
| Throughput | 10K events/sec | 5-15K events/sec | Sustained processing rate |
| Consumer Lag | < 1000 records | < 5000 records | Kafka consumer lag |
| Uptime | 99.9% | 99.5-99.99% | Pipeline availability |
| Data Loss | 0 events | < 0.01% | Exactly-once verification |
| Processing Latency | < 5 seconds | 1-10 seconds | Micro-batch duration |
| DLQ Rate | < 0.1% | < 0.5% | Failed events / total events |
Project Requirements
| Requirement | Description | Priority | Lesson Reference |
|---|---|---|---|
| Event Ingestion | Ingest clickstream, purchase, and inventory events from Kafka topics | P0 | 019 - Apache Kafka |
| Session Aggregation | Compute session-based aggregations (clicks per session, conversion rate) | P0 | 020 - Kafka Streams |
| Fraud Detection | Real-time anomaly detection for suspicious transactions | P0 | 022 - Spark Streaming |
| Feature Engineering | Compute ML features for real-time scoring | P1 | 021 - Spark Fundamentals |
| Exactly-Once | End-to-end exactly-once processing semantics | P0 | 019, 020, 022 |
| Watermarking | Handle late-arriving events with configurable watermarks | P0 | 022 - Spark Streaming |
| Dead Letter Queue | Route failed events to DLQ for debugging | P0 | 028 - Error Handling |
| Monitoring | Prometheus metrics, Grafana dashboards, alerting | P0 | 027 - Monitoring |
| Testing | Unit tests for transformations, integration tests for pipeline | P1 | 026 - Testing |
| Documentation | Architecture docs, runbooks, API contracts | P1 | 025 - Data Quality |
A capstone project is a comprehensive, integrative exercise that demonstrates mastery of a subject by applying knowledge and skills to a real-world problem. In data engineering, a capstone project requires end-to-end implementation: architecture design, code implementation, testing, deployment, monitoring, and documentation. The project must demonstrate production-grade quality, not just functional correctness.
Event-driven architecture (EDA) is a software design pattern where the production, detection, and consumption of events drive the flow of the system. In EDA, components communicate by producing and consuming events through a message broker (Kafka), enabling loose coupling, scalability, and resilience. The key properties: (1) producers and consumers are independent, (2) events are immutable facts, (3) the event log provides durability and replayability.
Exactly-once end-to-end processing guarantees that each event is processed exactly once from source to sink. This requires: (1) idempotent producers (Kafka enable.idempotence=true), (2) transactional writes (Kafka Transactions), (3) consumer offset commit only after successful processing, (4) idempotent sink writes (UPSERT/MERGE). The guarantee holds for the complete input-process-output cycle.
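Requirement (4) can be illustrated with a minimal sketch. It assumes an in-memory dictionary stands in for a real UPSERT/MERGE target, and `upsert_events` is a hypothetical helper name, not part of any library. The point is only that a keyed overwrite leaves the sink unchanged when the same batch is redelivered.

```python
from typing import Dict, Iterable, Tuple


def upsert_events(sink: Dict[str, dict], batch: Iterable[Tuple[str, dict]]) -> int:
    """Apply a batch keyed by event_id and return how many keys were new."""
    inserted = 0
    for event_id, payload in batch:
        if event_id not in sink:
            inserted += 1
        # Overwriting by key is the UPSERT: a replayed event changes nothing.
        sink[event_id] = payload
    return inserted


sink: Dict[str, dict] = {}
batch = [("EVT-1", {"amount": 10.0}), ("EVT-2", {"amount": 25.0})]

first = upsert_events(sink, batch)
replay = upsert_events(sink, batch)  # simulate redelivery after a failure
print(first, replay, len(sink))
```

An append-only sink would hold four rows after the replay; the keyed sink still holds two, which is why at-least-once delivery plus an idempotent sink behaves like exactly-once from the reader's point of view.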
End-to-End Latency
E2E_latency = T_ingestion + T_processing + T_serving. For Kafka -> Spark -> Redis: T_ingestion ≈ 10-50 ms (Kafka produce/consume), T_processing ≈ 100 ms-5 s (Spark micro-batch), T_serving ≈ 1-10 ms (Redis write). Total E2E: 111 ms to 5.06 s, depending on processing complexity and trigger interval.
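The arithmetic above can be checked with a short sketch. The stage names and the `e2e_bounds_ms` helper are illustrative assumptions; the numbers are the ranges quoted in the text.

```python
from typing import Dict, Tuple

# (best case, worst case) per stage in milliseconds, taken from the text above.
STAGES_MS: Dict[str, Tuple[int, int]] = {
    "ingestion": (10, 50),      # Kafka produce/consume
    "processing": (100, 5000),  # Spark micro-batch
    "serving": (1, 10),         # Redis write
}


def e2e_bounds_ms(stages: Dict[str, Tuple[int, int]]) -> Tuple[int, int]:
    """Sum the per-stage bounds to get the end-to-end latency range."""
    best = sum(low for low, _ in stages.values())
    worst = sum(high for _, high in stages.values())
    return best, worst


best_ms, worst_ms = e2e_bounds_ms(STAGES_MS)
print(f"E2E latency: {best_ms} ms to {worst_ms / 1000:.2f} s")
```

Because the stages run in sequence, the bounds simply add; the processing stage dominates the worst case, so the trigger interval is the first thing to tune.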
Throughput Target
Target throughput = Peak_event_rate * (1 + safety_factor). For peak_rate = 10K events/second and safety_factor = 0.5: Target = 15K events/second. Partition count = ceil(Target / per_partition_throughput). For per_partition = 2K events/second: Partitions = ceil(15K / 2K) = ceil(7.5) = 8 partitions.
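As a quick check of the sizing rule, here is a small sketch; the function names are hypothetical and the inputs are the figures from the text. The partition count is rounded up because a fractional partition cannot exist.

```python
import math


def target_throughput(peak_rate: float, safety_factor: float) -> float:
    """Peak rate plus headroom, in events per second."""
    return peak_rate * (1 + safety_factor)


def partition_count(target: float, per_partition: float) -> int:
    """Smallest whole number of partitions that can carry the target rate."""
    return math.ceil(target / per_partition)


target = target_throughput(peak_rate=10_000, safety_factor=0.5)
partitions = partition_count(target, per_partition=2_000)
print(target, partitions)
```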
A streaming pipeline is production-ready when: (1) Exactly-once semantics are verified, (2) Consumer lag is monitored and alerting is configured, (3) Dead letter queues capture and enable debugging of failed events, (4) Watermarks handle late-arriving data within bounded time, (5) Circuit breakers protect downstream services, (6) Load testing confirms throughput targets are met, (7) Runbooks exist for common failure scenarios, (8) Monitoring dashboards provide real-time visibility.
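Item (5) is the least self-explanatory entry in that list, so here is a minimal circuit breaker sketch. It is an illustration under stated assumptions, not a production implementation: the class name, the threshold of consecutive failures, and the reset timeout are all hypothetical choices, and the clock is injectable only to make the behavior easy to test.

```python
import time
from typing import Any, Callable, Optional


class CircuitBreaker:
    """Stop calling a failing downstream service until a cool-down has passed."""

    def __init__(self, failure_threshold: int = 3, reset_timeout_s: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout_s = reset_timeout_s
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    @property
    def is_open(self) -> bool:
        if self._opened_at is None:
            return False
        # Once the timeout has elapsed, one trial call is allowed (half-open).
        return self._clock() - self._opened_at < self.reset_timeout_s

    def call(self, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self.is_open:
            raise RuntimeError("circuit open: downstream call skipped")
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self._failures += 1
            if self._failures >= self.failure_threshold:
                self._opened_at = self._clock()  # open (or re-open) the circuit
            raise
        # A success closes the circuit and clears the failure count.
        self._failures = 0
        self._opened_at = None
        return result
```

In this pipeline the wrapped call would typically be the Redis write or another downstream request; while the circuit is open, events can be routed to a DLQ or retried later instead of piling up against a service that is already failing.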
Implementation Guide
- Create topics: `click-events`, `purchase-events`, `inventory-events` with appropriate partition counts.
- Configure `retention.ms=604800000` (7 days) for replay capability.
- Set `min.insync.replicas=2` on the topics and `acks=all` on the producers for durability.
- Register schemas with Confluent Schema Registry using Avro format.
- Configure dead letter topics: `click-events.dlq`, `purchase-events.dlq`.
- Test topic creation and verify partition distribution with `kafka-topics --describe`.
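The topic setup above can be scripted. The sketch below only builds the `kafka-topics` command lines rather than running them; the partition counts, the replication factor of 3, and the `create_topic_command` helper are assumptions for illustration, while the topic names and config values come from the list above.

```python
from typing import Dict, List

# Partition counts are assumed values; size them from the throughput target.
TOPICS: Dict[str, int] = {
    "click-events": 8,
    "purchase-events": 8,
    "inventory-events": 8,
    "click-events.dlq": 1,
    "purchase-events.dlq": 1,
}

# Topic-level settings from the setup steps (acks=all is a producer setting).
TOPIC_CONFIG: Dict[str, str] = {
    "retention.ms": "604800000",  # 7 days
    "min.insync.replicas": "2",
}


def create_topic_command(topic: str, partitions: int, bootstrap: str,
                         replication_factor: int = 3) -> List[str]:
    """Build the argument list for one `kafka-topics --create` invocation."""
    cmd = [
        "kafka-topics", "--bootstrap-server", bootstrap, "--create",
        "--topic", topic,
        "--partitions", str(partitions),
        "--replication-factor", str(replication_factor),
    ]
    for key, value in TOPIC_CONFIG.items():
        cmd += ["--config", f"{key}={value}"]
    return cmd


for name, count in TOPICS.items():
    print(" ".join(create_topic_command(name, count, "kafka-1:9092")))
```

Printing the commands first makes the configuration reviewable; the same argument lists could be passed to `subprocess.run` once the values are agreed.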
- Implement Kafka producers with idempotent writes (`enable.idempotence=true`).
- Use Avro serialization with Schema Registry for schema evolution.
- Configure batching (`linger.ms=10`, `batch.size=32768`) for throughput.
- Implement error handling with DLQ routing for failed events.
- Add metrics: events produced, errors, throughput, latency.
- Load test producers to verify throughput targets.
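The DLQ routing step can be sketched independently of any Kafka client. The example below is an assumption-laden illustration: `process_with_dlq` and the envelope field names are hypothetical, the retry count of 3 is a default chosen for the sketch, and the `.dlq` suffix follows the dead letter topic naming used in the setup steps. A real producer would then publish the returned envelope to the DLQ topic.

```python
import json
import time
from typing import Callable, Optional, Tuple


def process_with_dlq(raw: bytes, handler: Callable[[bytes], object],
                     source_topic: str, max_retries: int = 3,
                     now: Callable[[], float] = time.time
                     ) -> Tuple[bool, Optional[dict]]:
    """Try the handler up to max_retries times; on failure build a DLQ envelope."""
    last_error: Optional[Exception] = None
    for _ in range(max_retries):
        try:
            handler(raw)
            return True, None
        except Exception as exc:  # broad on purpose: anything unhandled goes to the DLQ
            last_error = exc
    envelope = {
        "source_topic": source_topic,
        "dlq_topic": f"{source_topic}.dlq",
        "original_value": raw.decode("utf-8", errors="replace"),
        "error_type": type(last_error).__name__,
        "error_message": str(last_error),
        "attempts": max_retries,
        "failed_at": now(),
    }
    return False, envelope


ok, _ = process_with_dlq(b'{"event_id": "EVT-1"}', json.loads, "click-events")
failed_ok, dlq_record = process_with_dlq(b"not json", json.loads, "click-events")
print(ok, failed_ok, dlq_record["dlq_topic"], dlq_record["error_type"])
```

Keeping the original payload, the error details, and a timestamp in the envelope is what makes later debugging and reprocessing possible.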
- Implement Spark Structured Streaming with Kafka source.
- Configure watermarks for event-time processing (`withWatermark("event_time", "10 minutes")`).
- Implement session aggregation with session windows (30-minute gap).
- Implement fraud detection with sliding window aggregations.
- Configure exactly-once in Spark with checkpointing (`checkpointLocation`) and idempotent or transactional sinks. `processing.guarantee=exactly_once_v2` is a Kafka Streams setting and applies only to Kafka Streams components.
- Implement DLQ routing for events that fail processing.
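To make the session-window step concrete, here is a plain-Python sketch of gap-based sessionization. It is not how Spark implements it; `sessionize` is a hypothetical helper that only demonstrates the rule that a new session starts once the gap since the previous event reaches 30 minutes.

```python
from datetime import datetime, timedelta
from typing import List


def sessionize(event_times: List[datetime],
               gap: timedelta = timedelta(minutes=30)) -> List[List[datetime]]:
    """Group timestamps into sessions separated by at least `gap` of inactivity."""
    sessions: List[List[datetime]] = []
    for ts in sorted(event_times):
        if sessions and ts - sessions[-1][-1] < gap:
            sessions[-1].append(ts)  # still inside the current session
        else:
            sessions.append([ts])    # gap reached: start a new session
    return sessions


clicks = [
    datetime(2024, 1, 15, 10, 0),
    datetime(2024, 1, 15, 10, 5),
    datetime(2024, 1, 15, 10, 10),
    datetime(2024, 1, 15, 11, 0),  # 50 minutes after the previous click
]
print([len(s) for s in sessionize(clicks)])
```

Unlike a tumbling or sliding window, the session boundaries here depend on the data itself, which is why session results can only be finalized after the gap (plus the watermark delay) has passed.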
- Implement Redis sink for low-latency feature serving.
- Implement S3/Parquet sink for analytical storage.
- Implement real-time dashboard with Grafana connected to Prometheus metrics.
- Configure materialized views for common query patterns.
- Implement cache invalidation for stale features.
- Instrument all components with Prometheus metrics.
- Build Grafana dashboards: pipeline health, throughput, latency, consumer lag.
- Configure SLO-based alerts that fire when freshness exceeds 2 hours, error rate exceeds 1%, or consumer lag exceeds 10K records.
- Set up DLQ depth monitoring with alerting.
- Create runbooks for common failure scenarios.
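The alerting step can be sketched as two small functions. The thresholds are the ones listed above (2 hours, 1%, 10K records); the function names and the idea of passing offsets in as dictionaries are assumptions for illustration. In practice the offsets would come from the Kafka admin API or an exporter, and the rules would live in the Prometheus alerting configuration.

```python
from typing import Dict, List


def partition_lag(end_offsets: Dict[int, int],
                  committed_offsets: Dict[int, int]) -> Dict[int, int]:
    """Lag per partition = latest offset minus last committed offset."""
    return {
        partition: max(end - committed_offsets.get(partition, 0), 0)
        for partition, end in end_offsets.items()
    }


def breached_alerts(freshness_hours: float, error_rate: float,
                    total_lag: int) -> List[str]:
    """Return the names of the alert conditions that are currently breached."""
    alerts: List[str] = []
    if freshness_hours > 2:
        alerts.append("freshness")
    if error_rate > 0.01:
        alerts.append("error_rate")
    if total_lag > 10_000:
        alerts.append("consumer_lag")
    return alerts


lag = partition_lag({0: 9_000, 1: 7_500}, {0: 2_000, 1: 2_500})
print(lag, breached_alerts(0.5, 0.02, sum(lag.values())))
```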
Production Code
Complete Streaming Pipeline
import logging
from typing import Dict

from prometheus_client import Counter, Histogram, Gauge, start_http_server
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType, StructField, StringType, DoubleType,
    IntegerType, TimestampType, MapType,
)

logger = logging.getLogger(__name__)

# ------------------------------------------------------
# SCHEMA DEFINITIONS
# ------------------------------------------------------
CLICK_SCHEMA = StructType([
    StructField("event_id", StringType(), False),
    StructField("user_id", StringType(), False),
    StructField("session_id", StringType(), False),
    StructField("page_url", StringType(), False),
    StructField("element", StringType(), True),
    StructField("event_time", TimestampType(), False),
    StructField("properties", MapType(StringType(), StringType()), True),
])

PURCHASE_SCHEMA = StructType([
    StructField("event_id", StringType(), False),
    StructField("user_id", StringType(), False),
    StructField("session_id", StringType(), False),
    StructField("order_id", StringType(), False),
    StructField("product_id", StringType(), False),
    StructField("amount", DoubleType(), False),
    StructField("quantity", IntegerType(), False),
    StructField("event_time", TimestampType(), False),
])

# ------------------------------------------------------
# METRICS
# ------------------------------------------------------
EVENTS_PROCESSED = Counter(
    "streaming_events_processed_total",
    "Total events processed",
    ["topic", "status"],
)
EVENTS_FAILED = Counter(
    "streaming_events_failed_total",
    "Total events that failed processing",
    ["topic", "error_type"],
)
PROCESSING_LATENCY = Histogram(
    "streaming_processing_latency_seconds",
    "Processing latency in seconds",
    ["pipeline"],
    buckets=[0.1, 0.5, 1, 2, 5, 10, 30],
)
CONSUMER_LAG = Gauge(
    "streaming_consumer_lag",
    "Consumer lag in records",
    ["topic", "partition"],
)
SESSION_COUNT = Gauge(
    "streaming_active_sessions",
    "Number of active sessions",
)

# ------------------------------------------------------
# STREAM PROCESSING PIPELINE
# ------------------------------------------------------
class EcommerceStreamingPipeline:
    """Complete e-commerce streaming pipeline with monitoring."""

    def __init__(self, spark: SparkSession, config: Dict):
        self.spark = spark
        self.config = config

    def build_clickstream_pipeline(self):
        """
        Build the clickstream processing pipeline.

        This pipeline:
        1. Reads click events from Kafka
        2. Parses JSON payload and validates schema
        3. Applies watermark for late data tolerance
        4. Computes session aggregations (clicks per session)
        5. Detects fraud signals (high-frequency clicking)
        6. Writes results to Kafka output topics
        """
        # Kafka source configuration
        # Parameters:
        #   subscribe: Topic to consume click events from
        #   startingOffsets: "latest" = only new events (no replay)
        #   maxOffsetsPerTrigger: Cap at 100K events per micro-batch
        #   kafka.security.protocol: SASL_SSL for production security
        raw_stream = (
            self.spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", self.config["kafka_brokers"])
            .option("subscribe", "click-events")
            .option("startingOffsets", "latest")
            .option("maxOffsetsPerTrigger", 100000)
            .option("kafka.security.protocol", "SASL_SSL")
            .load()
        )

        # Parse the JSON payload; rows that fail to parse have a null event_id
        # and are dropped here (route them to the DLQ in a full implementation).
        parsed = (
            raw_stream
            .select(
                F.col("key").cast("string").alias("kafka_key"),
                F.from_json(F.col("value").cast("string"), CLICK_SCHEMA).alias("data"),
            )
            .select("kafka_key", "data.*")
            .filter(F.col("event_id").isNotNull())
            .withWatermark("event_time", "10 minutes")
        )

        # Session aggregation: clicks per session, using a session window
        # with a 30-minute inactivity gap (session_window needs Spark 3.2+).
        session_aggregation = (
            parsed
            .groupBy(
                F.session_window("event_time", "30 minutes"),
                "session_id",
                "user_id",
            )
            .agg(
                F.count("*").alias("click_count"),
                F.collect_list("page_url").alias("pages_visited"),
                # Exact distinct counts are not supported on streaming DataFrames.
                F.approx_count_distinct("element").alias("unique_elements_clicked"),
                F.max("event_time").alias("last_click_time"),
            )
        )

        # Fraud detection: rapid clicking pattern
        # Uses 1-minute tumbling window to detect users clicking > 60 times/minute
        # This threshold can be tuned based on normal user behavior analysis
        fraud_signals = (
            parsed
            .groupBy(
                F.window("event_time", "1 minute"),  # 1-minute detection window
                "user_id",                           # Per-user analysis
            )
            .agg(
                F.count("*").alias("clicks_per_minute"),
                F.approx_count_distinct("page_url").alias("pages_per_minute"),
            )
            .filter(F.col("clicks_per_minute") > 60)  # Threshold: > 60 clicks/min
        )

        # Write session aggregation to Kafka.
        # Streaming session windows do not support update mode, so finished
        # sessions are emitted in append mode once the watermark passes them.
        session_query = (
            session_aggregation
            .select(
                F.to_json(F.struct("*")).alias("value"),
            )
            .writeStream
            .format("kafka")
            .option("kafka.bootstrap.servers", self.config["kafka_brokers"])
            .option("topic", "session-aggregations")
            .option("checkpointLocation", f"{self.config['checkpoint_base']}/sessions")
            .trigger(processingTime="30 seconds")
            .outputMode("append")
            .start()
        )

        # Write fraud signals to alert topic
        fraud_query = (
            fraud_signals
            .select(
                F.to_json(F.struct("*")).alias("value"),
            )
            .writeStream
            .format("kafka")
            .option("kafka.bootstrap.servers", self.config["kafka_brokers"])
            .option("topic", "fraud-alerts")
            .option("checkpointLocation", f"{self.config['checkpoint_base']}/fraud")
            .trigger(processingTime="10 seconds")
            .outputMode("update")
            .start()
        )
        return session_query, fraud_query

    def build_purchase_pipeline(self):
        """Build the purchase processing pipeline."""
        raw_stream = (
            self.spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", self.config["kafka_brokers"])
            .option("subscribe", "purchase-events")
            .option("startingOffsets", "latest")
            .load()
        )

        parsed = (
            raw_stream
            .select(
                F.from_json(F.col("value").cast("string"), PURCHASE_SCHEMA).alias("data"),
            )
            .select("data.*")
            .filter(F.col("event_id").isNotNull())
            .withWatermark("event_time", "10 minutes")
        )

        # Revenue aggregation per product per hour
        revenue_agg = (
            parsed
            .groupBy(
                F.window("event_time", "1 hour"),
                "product_id",
            )
            .agg(
                F.sum("amount").alias("total_revenue"),
                F.sum("quantity").alias("total_quantity"),
                F.count("*").alias("order_count"),
                F.avg("amount").alias("avg_order_value"),
            )
        )

        # Write to analytics topic
        query = (
            revenue_agg
            .select(F.to_json(F.struct("*")).alias("value"))
            .writeStream
            .format("kafka")
            .option("kafka.bootstrap.servers", self.config["kafka_brokers"])
            .option("topic", "revenue-aggregations")
            .option("checkpointLocation", f"{self.config['checkpoint_base']}/revenue")
            .trigger(processingTime="1 minute")
            .outputMode("update")
            .start()
        )
        return query

    def run_all(self):
        """Start all streaming pipelines."""
        import time

        click_queries = self.build_clickstream_pipeline()
        purchase_query = self.build_purchase_pipeline()
        logger.info("All streaming pipelines started")

        # Monitor all queries: log progress every 30 seconds while all are active
        queries = list(click_queries) + [purchase_query]
        while all(q.isActive for q in queries):
            for q in queries:
                progress = q.lastProgress
                if progress:
                    logger.info(
                        f"Query {q.id}: "
                        f"input_rows={progress.get('numInputRows', 0)}, "
                        f"processed={progress.get('processedRowsPerSecond', 0):.0f}/s"
                    )
            time.sleep(30)

        # A query has stopped; awaitTermination re-raises its failure, if any
        for q in queries:
            q.awaitTermination()

# ------------------------------------------------------
# ENTRY POINT
# ------------------------------------------------------
if __name__ == "__main__":
    spark = (
        SparkSession.builder
        .appName("EcommerceStreamingPipeline")
        .config("spark.sql.streaming.schemaInference", "true")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.shuffle.partitions", "8")
        .config("spark.streaming.stopGracefullyOnShutdown", "true")
        .getOrCreate()
    )
    config = {
        "kafka_brokers": "kafka-1:9092,kafka-2:9092,kafka-3:9092",
        "checkpoint_base": "s3://checkpoints/ecommerce-streaming",
    }
    pipeline = EcommerceStreamingPipeline(spark, config)

    # Start Prometheus metrics server
    start_http_server(8000)
    pipeline.run_all()
Testing the Pipeline
from datetime import datetime

import pytest
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType


@pytest.fixture(scope="module")
def spark():
    return (
        SparkSession.builder
        .master("local[*]")
        .appName("StreamingPipelineTests")
        .getOrCreate()
    )


class TestSessionAggregation:
    """Test session aggregation logic."""

    def test_single_session(self, spark):
        """Test aggregation for a single session."""
        data = [
            ("EVT-1", "USR-1", "SES-1", "/home", "button", datetime(2024, 1, 15, 10, 0, 0)),
            ("EVT-2", "USR-1", "SES-1", "/products", "link", datetime(2024, 1, 15, 10, 5, 0)),
            ("EVT-3", "USR-1", "SES-1", "/checkout", "button", datetime(2024, 1, 15, 10, 10, 0)),
        ]
        schema = StructType([
            StructField("event_id", StringType()),
            StructField("user_id", StringType()),
            StructField("session_id", StringType()),
            StructField("page_url", StringType()),
            StructField("element", StringType()),
            StructField("event_time", TimestampType()),
        ])
        df = spark.createDataFrame(data, schema)
        result = (
            df.groupBy("session_id", "user_id")
            .agg(
                F.count("*").alias("click_count"),
                F.collect_list("page_url").alias("pages_visited"),
            )
        )
        assert result.count() == 1
        assert result.first()["click_count"] == 3
        assert len(result.first()["pages_visited"]) == 3

    def test_empty_input(self, spark):
        """Test aggregation with empty input."""
        schema = StructType([
            StructField("event_id", StringType()),
            StructField("session_id", StringType()),
            StructField("user_id", StringType()),
        ])
        df = spark.createDataFrame([], schema)
        result = df.groupBy("session_id").agg(F.count("*").alias("count"))
        assert result.count() == 0


class TestFraudDetection:
    """Test fraud detection thresholds."""

    def test_high_frequency_detection(self, spark):
        """Test that high-frequency clicking is detected."""
        # 70 clicks inside the same minute (above the 60 threshold).
        # i % 60 keeps every timestamp within 10:00:00-10:00:59, so all
        # 70 events fall into a single 1-minute window.
        data = [
            (f"EVT-{i}", "USR-1", datetime(2024, 1, 15, 10, 0, i % 60))
            for i in range(70)
        ]
        schema = StructType([
            StructField("event_id", StringType()),
            StructField("user_id", StringType()),
            StructField("event_time", TimestampType()),
        ])
        df = spark.createDataFrame(data, schema)
        result = (
            df.groupBy(
                F.window("event_time", "1 minute"),
                "user_id",
            )
            .agg(F.count("*").alias("clicks_per_minute"))
            .filter(F.col("clicks_per_minute") > 60)
        )
        assert result.count() > 0, "High-frequency clicking should be detected"

    def test_normal_frequency_not_flagged(self, spark):
        """Test that normal clicking is not flagged."""
        data = [
            (f"EVT-{i}", "USR-1", datetime(2024, 1, 15, 10, 0, i * 10))
            for i in range(5)  # 5 clicks in 1 minute
        ]
        schema = StructType([
            StructField("event_id", StringType()),
            StructField("user_id", StringType()),
            StructField("event_time", TimestampType()),
        ])
        df = spark.createDataFrame(data, schema)
        result = (
            df.groupBy(
                F.window("event_time", "1 minute"),
                "user_id",
            )
            .agg(F.count("*").alias("clicks_per_minute"))
            .filter(F.col("clicks_per_minute") > 60)
        )
        assert result.count() == 0, "Normal clicking should not be flagged"


class TestEndToEnd:
    """End-to-end integration tests."""

    def test_schema_compliance(self, spark):
        """Verify event schemas are valid."""
        valid_click = {
            "event_id": "EVT-001",
            "user_id": "USR-001",
            "session_id": "SES-001",
            "page_url": "/home",
            "element": "button",
            "event_time": "2024-01-15T10:00:00",
        }
        # Validate required fields
        required_fields = ["event_id", "user_id", "session_id", "event_time"]
        for field in required_fields:
            assert field in valid_click, f"Missing required field: {field}"

    def test_idempotent_processing(self, spark):
        """Verify that reprocessing produces same results."""
        # Replay the same batch and deduplicate by event_id, which mirrors
        # what an UPSERT/MERGE keyed on event_id does at the sink.
        events = [("EVT-1", "USR-1"), ("EVT-2", "USR-1")]
        first = spark.createDataFrame(events, ["event_id", "user_id"])
        replayed = first.union(first)  # simulate redelivery of the batch
        count_once = first.dropDuplicates(["event_id"]).count()
        count_replayed = replayed.dropDuplicates(["event_id"]).count()
        assert count_once == count_replayed == 2, "Idempotency violated"

Load Testing: Before production deployment, load test the pipeline with 2-3x expected peak throughput. Use tools like kafka-producer-perf-test or custom Python producers to simulate high event volumes. Verify: (1) consumer lag stays bounded, so unconsumed records never age past the retention period, (2) processing latency stays within SLA, (3) no data loss or duplication, (4) resource utilization stays within limits.
Disaster Recovery: Test pipeline recovery by: (1) killing Kafka brokers and verifying automatic leader election, (2) killing Spark executors and verifying task rescheduling, (3) corrupting state stores and verifying recovery from checkpoints (or from changelog topics for Kafka Streams components), (4) simulating network partitions and verifying graceful degradation.
Project Deliverables
| Deliverable | Description | Acceptance Criteria |
|---|---|---|
| Kafka Topics | Configured topics with appropriate partitions and retention | Topics created, verified with kafka-topics --describe |
| Event Producers | Idempotent producers with Avro serialization | Producers send events, DLQ handles failures |
| Stream Processing | Spark Structured Streaming with watermarks and EOS | Exactly-once processing verified |
| Session Aggregation | Session window aggregation with configurable gap | Aggregation results match expected values |
| Fraud Detection | Real-time anomaly detection for high-frequency events | Detection triggers within 10 seconds |
| Serving Layer | Redis for low-latency, S3 for analytics | Data available in both sinks |
| Monitoring | Prometheus metrics, Grafana dashboards | Dashboards show real-time pipeline health |
| Alerting | SLO-based alerts for freshness, error rate, consumer lag | Alerts fire within 5 minutes of SLO breach |
| Tests | Unit tests for transformations, integration tests | > 90% coverage, all tests pass |
| Documentation | Architecture docs, runbooks, API contracts | Docs reviewed and approved |
- A production streaming pipeline requires: ingestion, processing, serving, monitoring, and error handling.
- Kafka provides durable, replayable event storage. Configure `acks=all`, `min.insync.replicas=2`, and idempotent producers.
- Spark Structured Streaming provides exactly-once processing when checkpointing is combined with idempotent sinks, and watermarks for late data handling.
- Session windows group events by activity gaps; tumbling windows use fixed intervals.
- Dead letter queues capture failed events for debugging and reprocessing.
- Monitoring with Prometheus and Grafana provides real-time visibility into pipeline health.
- Load testing before production deployment confirms throughput targets are met.
- Disaster recovery testing verifies pipeline resilience under failure conditions.
Best Practices
- Design for failure from day one. Every component should handle retries, DLQs, and graceful degradation.
- Use exactly-once semantics for financial and transactional data: checkpointing plus idempotent or transactional sinks in Spark, and `processing.guarantee=exactly_once_v2` in Kafka Streams.
- Set watermarks to handle late-arriving data within bounded timeframes. Tune based on observed event lateness.
- Implement DLQs for every pipeline stage. Store original message, error details, and timestamps for debugging.
- Monitor consumer lag as a primary SLA metric. Alert well before the oldest unconsumed record approaches the message retention period, because records that age out of retention are lost.
- Load test before production with 2-3x expected peak throughput. Verify latency, throughput, and resource usage.
- Test disaster recovery by killing brokers, executors, and simulating network partitions.
- Document everything: architecture decisions, runbooks, API contracts, and deployment procedures.
- Implement idempotent processing so that reprocessing produces identical results.
- Iterate and improve: monitor production metrics, identify bottlenecks, and optimize continuously.
Project Component Summary
| Component | Technology | Purpose | Key Configuration |
|---|---|---|---|
| Event Source | Kafka Producers | Ingest events | acks=all, enable.idempotence=true |
| Event Storage | Kafka Topics | Durable event log | min.insync.replicas=2, 7-day retention |
| Stream Processing | Spark Structured Streaming | Real-time processing | checkpointLocation, idempotent sinks |
| Session Aggregation | Spark windowed aggregation | User session metrics | 30-min session window |
| Fraud Detection | Spark sliding window | Anomaly detection | 1-min window, 60 events/min threshold |
| Feature Serving | Redis | Low-latency features | TTL-based expiration |
| Analytics Storage | S3/Parquet | Historical analysis | Partitioned by date |
| Monitoring | Prometheus + Grafana | Observability | SLO-based alerting |
| Error Handling | DLQ Topics | Failed event management | 3 retries before DLQ |
Learning Outcomes Checklist
| Skill | Lesson Reference | Proficiency |
|---|---|---|
| Kafka topic design and partitioning | 019 - Apache Kafka | ☐ |
| Kafka Streams DSL | 020 - Kafka Streams | ☐ |
| Spark Structured Streaming | 022 - Spark Streaming | ☐ |
| Watermarks for late data | 022 - Spark Streaming | ☐ |
| Exactly-once semantics | 019, 020, 022 | ☐ |
| Dead letter queues | 028 - Error Handling | ☐ |
| Monitoring with Prometheus | 027 - Monitoring | ☐ |
| Data quality validation | 025 - Data Quality | ☐ |
| Pipeline testing | 026 - Testing | ☐ |
| Architecture documentation | This project | ☐ |
See Also
- 019 - Apache Kafka: Topics, Producers, and Consumers - Kafka fundamentals used in this project
- 020 - Kafka Streams: DSL, Windowed Aggregation, and Exactly-Once - Stream processing patterns
- 022 - Spark Structured Streaming - Streaming engine details
- 028 - Error Handling, Retries, and Dead Letter Queues - DLQ implementation patterns
- 027 - Pipeline Monitoring and Observability - Production monitoring setup
- 025 - Data Quality: Validation Frameworks - Quality validation in pipelines
- 026 - Data Pipeline Testing - Testing streaming pipelines