Streaming Analytics: Real-Time Data Processing Patterns
Difficulty: Senior Level | Companies: Netflix, Uber, Lyft, DoorDash, Stripe
1. Streaming Architecture Patterns
Architecture Diagram
Streaming Patterns:
├── Event Time vs Processing Time
├── Windowing (Tumbling, Sliding, Session)
├── State Management
├── Watermarks (Handling Late Data)
└── Exactly-Once Semantics
Event Time vs Processing Time
Architecture Diagram
Event Time: When the event actually happened
Processing Time: When the event is processed by the system
Example:
- User clicks button at 10:00:00 (event time)
- Event arrives at system at 10:00:05 (processing time)
- System processes at 10:00:10 (processing time)
For analytics, event time is usually what you want.
2. Windowing Strategies
Spark Structured Streaming
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

spark = SparkSession.builder \
    .appName("StreamingAnalytics") \
    .getOrCreate()

# Schema of the JSON payload carried in each Kafka message's `value`.
# The window examples below read these columns; adjust to your event contract.
schema = T.StructType([
    T.StructField("event_id", T.StringType()),
    T.StructField("user_id", T.StringType()),
    T.StructField("event_type", T.StringType()),
    T.StructField("amount", T.DoubleType()),
    T.StructField("event_timestamp", T.TimestampType()),
])

# Read from Kafka. `value` arrives as binary, so cast it to string before
# parsing the JSON into typed columns.
stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "events") \
    .load()

events = stream.select(
    F.from_json(F.col("value").cast("string"), schema).alias("data")
).select("data.*")

# Tumbling window: fixed 5-minute buckets; every event lands in exactly one.
tumbling = events \
    .withWatermark("event_timestamp", "10 minutes") \
    .groupBy(
        F.window("event_timestamp", "5 minutes"),
        "event_type"
    ).count()

# Sliding window: 10-minute windows that start every 5 minutes, so each
# event is counted in two overlapping windows.
sliding = events \
    .withWatermark("event_timestamp", "10 minutes") \
    .groupBy(
        F.window("event_timestamp", "10 minutes", "5 minutes"),
        "event_type"
    ).count()

# Session window: per-user window that closes after a 30-minute gap with no
# events (F.session_window requires Spark 3.2+).
session = events \
    .withWatermark("event_timestamp", "10 minutes") \
    .groupBy(
        F.session_window("event_timestamp", "30 minutes"),
        "user_id"
    ).agg(
        F.count("*").alias("events_in_session"),
        F.sum("amount").alias("session_revenue")
    )
Flink Windowing
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
tenv = StreamTableEnvironment.create(env)

# Tumbling window using Flink's windowing table-valued function (TVF).
# The legacy `GROUP BY TUMBLE(...)` / `TUMBLE_START(...)` group-window syntax
# is deprecated in favor of TVFs, which must group by window_start/window_end.
# NOTE: assumes `user_events` is already registered (e.g. via CREATE TABLE)
# with `event_timestamp` declared as an event-time attribute (WATERMARK).
result = tenv.execute_sql("""
    SELECT
        event_type,
        window_start,
        COUNT(*) AS event_count,
        SUM(amount) AS total_amount
    FROM TABLE(
        TUMBLE(TABLE user_events, DESCRIPTOR(event_timestamp), INTERVAL '5' MINUTES)
    )
    GROUP BY window_start, window_end, event_type
""")

# execute_sql submits the job and returns a TableResult; print() streams the
# rows to stdout (and blocks for as long as the source is unbounded).
result.print()
3. Watermarks & Late Data
# Spark watermark handling: a window is finalized once the watermark
# (max event_timestamp seen so far minus 10 minutes) passes the window's end.
# Events older than the watermark are dropped from the aggregation.
streaming_df = events \
    .withWatermark("event_timestamp", "10 minutes") \
    .groupBy(
        F.window("event_timestamp", "5 minutes"),
        "user_id"
    ).agg(F.count("*").alias("count"))

# Late data handling options:
# 1. Drop late data: Spark's behavior for events behind the watermark.
# 2. Side output (collect late data separately): native in Flink
#    (sideOutputLateData / OutputTag). Spark has no equivalent, so it must be
#    approximated, as below.
# 3. Update results: events that arrive late but are still within the
#    watermark delay update their window, and "update" mode re-emits the row.

# Approximate side output in Spark: a separate, stateless query that flags
# events whose event time already lags wall-clock time by more than the
# watermark delay. This is a heuristic. Spark's watermark follows the max
# event time seen, not wall-clock time, so this will not exactly match the
# rows the aggregation dropped. For an exact count of dropped rows, check
# query.lastProgress["stateOperators"][i]["numRowsDroppedByWatermark"]
# (available in recent Spark versions).
late_data = events \
    .where(F.expr("event_timestamp < current_timestamp() - INTERVAL 10 MINUTES"))

late_query = late_data.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("checkpointLocation", "s3://checkpoints/late-events") \
    .start()

# Main aggregation. An explicit sink format is required: the default file sink
# needs a `path` and does not support "update" mode. Each query gets its own
# checkpoint directory, because checkpoints must never be shared or nested.
query = streaming_df.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("checkpointLocation", "s3://checkpoints/user-window-counts") \
    .start()
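ℹ️
Key Insight: The watermark tells the engine how long to wait for late data before it finalizes a window and discards that window's state. If the delay is too short, legitimately late events are dropped. If it is too long, more state stays in memory and final results arrive later.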
4. State Management
Managed State (Spark)
# Spark manages aggregation state itself and persists it under the checkpoint
# location, so a restarted query resumes from the same state.
query = events \
    .withWatermark("event_timestamp", "10 minutes") \
    .groupBy(
        F.window("event_timestamp", "10 minutes"),
        "user_id"
    ).agg(
        F.count("*").alias("event_count"),
        # collect_list keeps every event_type in state until the window is
        # evicted: fine for small windows, expensive for very active users.
        F.collect_list("event_type").alias("event_types")
    ) \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .option("checkpointLocation", "s3://checkpoints/user-sessions") \
    .start()
# An explicit sink format is required: the default file sink needs a `path`
# and does not support "update" mode.
# State for a window is dropped once the watermark passes the window's end.
External State (Redis)
import redis
import json
from datetime import datetime
class ExternalStateManager:
    """Keep per-window aggregation state in Redis.

    Each window uses two keys:
      * ``window:{window_key}``: a hash whose ``count`` field counts events.
      * ``window:{window_key}:events``: a list of the raw events as JSON.
    Both keys get a TTL, so windows that stop receiving events are removed
    by Redis automatically.
    """

    def __init__(self, redis_host: str, port: int = 6379, ttl_seconds: int = 3600):
        """Connect to Redis.

        Args:
            redis_host: Redis server hostname.
            port: Redis server port (default 6379).
            ttl_seconds: Expiry applied to both window keys on every update
                (default 3600).
        """
        self.client = redis.Redis(host=redis_host, port=port)
        self.ttl_seconds = ttl_seconds

    def update_window_state(self, window_key: str, event: dict):
        """Record one event for ``window_key``.

        Increments the window's count, appends the JSON-encoded event, and
        refreshes the TTL on both keys. All commands are queued on one
        pipeline and sent together by ``execute()``.
        """
        count_key = f"window:{window_key}"
        events_key = f"window:{window_key}:events"

        pipe = self.client.pipeline()
        pipe.hincrby(count_key, "count", 1)
        pipe.rpush(events_key, json.dumps(event))
        # Expire BOTH keys. Setting a TTL only on the hash would leave the
        # events list in Redis forever.
        pipe.expire(count_key, self.ttl_seconds)
        pipe.expire(events_key, self.ttl_seconds)
        pipe.execute()

    def get_window_result(self, window_key: str) -> dict:
        """Return ``{"count": int, "window_key": str}`` for ``window_key``.

        A window with no stored state (never seen, or expired) reports
        count 0.
        """
        data = self.client.hgetall(f"window:{window_key}")
        # Without decode_responses the client returns bytes keys/values.
        return {
            "count": int(data.get(b"count", 0)),
            "window_key": window_key,
        }
5. Exactly-Once Semantics
import json
import os
from pathlib import Path


class ExactlyOnceProcessor:
    """Skip events whose ``event_id`` has already been processed.

    Processed IDs are held in memory and also appended, one JSON value per
    line, to a local log file at ``checkpoint_path``. Deduplication therefore
    survives restarts. ``event_id`` values must be JSON-serializable
    scalars (e.g. str or int).

    Caveat: processing an event and recording its ID are two separate steps.
    If the process crashes after ``_process`` but before the ID is written,
    the event is processed again on restart. This gives effectively-once
    behavior only if ``_process`` is idempotent, or if its output is committed
    in the same transaction as the ID.
    """

    def __init__(self, checkpoint_path: str):
        """Load previously processed IDs from ``checkpoint_path`` if it exists."""
        self.checkpoint_path = checkpoint_path
        self.processed_events = self._load_checkpoint()

    def process_event(self, event: dict):
        """Process ``event`` unless its ``event_id`` was already seen.

        Returns the processing result, or ``None`` for a duplicate.
        """
        event_id = event["event_id"]
        if self._is_processed(event_id):
            return None  # duplicate: skip

        result = self._process(event)
        # Recorded only after processing succeeds. See the class caveat for
        # the crash window between these two steps.
        self._mark_processed(event_id)
        return result

    def _load_checkpoint(self) -> set:
        """Read the set of processed IDs from the checkpoint log, if present."""
        path = Path(self.checkpoint_path)
        if not path.is_file():
            return set()
        with path.open(encoding="utf-8") as fh:
            return {json.loads(line) for line in fh if line.strip()}

    def _is_processed(self, event_id: str) -> bool:
        return event_id in self.processed_events

    def _mark_processed(self, event_id: str):
        """Durably append ``event_id`` to the log, then remember it in memory."""
        path = Path(self.checkpoint_path)
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open("a", encoding="utf-8") as fh:
            # json.dumps escapes newlines, so each ID stays on its own line.
            fh.write(json.dumps(event_id) + "\n")
            fh.flush()
            os.fsync(fh.fileno())  # make sure the ID survives a crash
        self.processed_events.add(event_id)

    def _process(self, event: dict) -> dict:
        # Business logic
        return {"processed": True, "event_id": event["event_id"]}
6. Real-Time Dashboard Pipeline
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

spark = SparkSession.builder \
    .appName("RealTimeDashboard") \
    .getOrCreate()

# JSON payload schema shared by the three topics; adjust to your contract.
schema = T.StructType([
    T.StructField("user_id", T.StringType()),
    T.StructField("page", T.StringType()),
    T.StructField("event_type", T.StringType()),
    T.StructField("load_time_ms", T.DoubleType()),
    T.StructField("event_timestamp", T.TimestampType()),
])

# Read events from all three topics and parse the JSON payload.
events = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "page_views,clicks,purchases") \
    .load() \
    .select(F.from_json(F.col("value").cast("string"), schema).alias("data")) \
    .select("data.*")

# Real-time metrics per page per 1-minute window.
metrics = events \
    .withWatermark("event_timestamp", "5 minutes") \
    .groupBy(
        F.window("event_timestamp", "1 minute"),
        "page"
    ).agg(
        # Counts every event on the page: views, clicks and purchases alike.
        F.count("*").alias("view_count"),
        # Exact distinct aggregations are rejected on streaming DataFrames;
        # approx_count_distinct (HyperLogLog++) is the supported alternative.
        F.approx_count_distinct("user_id").alias("unique_users"),
        F.sum(F.when(F.col("event_type") == "purchase", 1).otherwise(0)).alias("purchases"),
        F.avg("load_time_ms").alias("avg_load_time")
    )


def write_metrics_to_redis(batch_df, batch_id):
    """Upsert one micro-batch of metric rows into Redis hashes.

    Spark has no built-in Redis streaming sink, so writes go through
    foreachBatch. Keys are ``dashboard:{page}:{window_start}``, so replaying
    a batch after a failure overwrites the same hashes. foreachBatch is
    at-least-once, and this makes retries harmless.
    """
    def _write_partition(rows):
        import redis  # imported where the partition actually runs

        client = redis.Redis(host="redis", port=6379)  # placeholder host
        pipe = client.pipeline()
        for row in rows:
            key = f"dashboard:{row['page']}:{row['window']['start'].isoformat()}"
            fields = {
                "view_count": row["view_count"],
                "unique_users": row["unique_users"],
                "purchases": row["purchases"],
                "avg_load_time": row["avg_load_time"],
            }
            # Redis cannot store None (e.g. avg of all-null load times).
            pipe.hset(key, mapping={k: ("" if v is None else v) for k, v in fields.items()})
        pipe.execute()

    batch_df.foreachPartition(_write_partition)


# "update" mode emits only the windows whose aggregates changed in a trigger.
query = metrics.writeStream \
    .outputMode("update") \
    .foreachBatch(write_metrics_to_redis) \
    .option("checkpointLocation", "s3://checkpoints/dashboard") \
    .start()
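ℹ️
Best Practice: For real-time dashboards, use the "update" output mode. Each trigger then emits only the rows whose aggregates changed, which keeps writes to the dashboard backend small. The sink must upsert by key (e.g., window start + page) so that repeated updates to the same window overwrite each other instead of appending duplicates.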
Follow-Up Questions
- Design a real-time fraud detection system processing 100K events/second.
- How would you handle exactly-once processing across multiple Kafka topics?
- Design a streaming pipeline that can handle backpressure.
- How would you implement a real-time A/B testing analysis system?
- Design a streaming data quality monitoring system.