Streaming Analytics: Real-Time Data Processing Patterns

Difficulty: Senior Level | Companies: Netflix, Uber, Lyft, DoorDash, Stripe

1. Streaming Architecture Patterns

Streaming Patterns:
├── Event Time vs Processing Time
├── Windowing (Tumbling, Sliding, Session)
├── State Management
├── Watermarks (Handling Late Data)
└── Exactly-Once Semantics

Event Time vs Processing Time

Event Time: When the event actually happened
Processing Time: When the event is processed by the system

Example:
- User clicks button at 10:00:00 (event time)
- Event arrives at the system at 10:00:05 (ingestion time)
- System processes the event at 10:00:10 (processing time)

For analytics, event time is usually what you want: results stay correct even when events arrive late or out of order.
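To make the difference concrete, here is a minimal pure-Python sketch that counts the same events per minute twice, once by event time and once by processing time. The timestamps and the `minute_bucket` helper are hypothetical and chosen only for illustration.

```python
from collections import Counter
from datetime import datetime


def minute_bucket(ts: datetime) -> str:
    """Truncate a timestamp to its minute, e.g. 10:00:58 -> '10:00'."""
    return ts.strftime("%H:%M")


# Hypothetical (event_time, processing_time) pairs. The second click happened
# at 10:00:58 but was only processed at 10:01:03.
events = [
    (datetime(2024, 1, 1, 10, 0, 0), datetime(2024, 1, 1, 10, 0, 10)),
    (datetime(2024, 1, 1, 10, 0, 58), datetime(2024, 1, 1, 10, 1, 3)),
    (datetime(2024, 1, 1, 10, 1, 20), datetime(2024, 1, 1, 10, 1, 25)),
]

by_event_time = Counter(minute_bucket(event) for event, _ in events)
by_processing_time = Counter(minute_bucket(processed) for _, processed in events)

print("by event time:     ", dict(by_event_time))
print("by processing time:", dict(by_processing_time))
```

The click at 10:00:58 lands in the 10:00 bucket under event time but in the 10:01 bucket under processing time, so a per-minute chart built on processing time would misattribute it.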

2. Windowing Strategies

Spark Structured Streaming

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("StreamingAnalytics") \
    .getOrCreate()

# Read from Kafka
stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "events") \
    .load()

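# Assumed event schema (illustrative); adjust to the real payload
schema = "event_id STRING, user_id STRING, event_type STRING, amount DOUBLE, event_timestamp TIMESTAMP"

events = stream.select(
    F.from_json(F.col("value").cast("string"), schema).alias("data")
).select("data.*")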

# Tumbling window (fixed, non-overlapping)
tumbling = events \
    .withWatermark("event_timestamp", "10 minutes") \
    .groupBy(
        F.window("event_timestamp", "5 minutes"),
        "event_type"
    ).count()

# Sliding window (overlapping)
sliding = events \
    .withWatermark("event_timestamp", "10 minutes") \
    .groupBy(
        F.window("event_timestamp", "10 minutes", "5 minutes"),
        "event_type"
    ).count()

# Session window (activity-based)
session = events \
    .withWatermark("event_timestamp", "10 minutes") \
    .groupBy(
        F.session_window("event_timestamp", "30 minutes"),
        "user_id"
    ).agg(
        F.count("*").alias("events_in_session"),
        F.sum("amount").alias("session_revenue")
    )
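The three window types can also be sketched without Spark to show which windows a single event falls into. This is a simplified model, not Spark's implementation: the function names are hypothetical, and windows are assumed to be aligned to the Unix epoch.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def tumbling_window(ts, size):
    """Return the (start, end) of the single fixed window that contains ts."""
    start = ts - ((ts - EPOCH) % size)
    return (start, start + size)


def sliding_windows(ts, size, slide):
    """Return every (start, end) window of length `size`, opened every `slide`, that contains ts."""
    start = ts - ((ts - EPOCH) % slide)  # latest window start at or before ts
    windows = []
    while start > ts - size:  # window [start, start + size) still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


def session_windows(timestamps, gap):
    """Group timestamps into sessions; a session closes after `gap` of inactivity."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] < gap:
            sessions[-1].append(ts)
        else:
            sessions.append([ts])
    return sessions


ts = datetime(2024, 1, 1, 10, 7)
fixed = tumbling_window(ts, timedelta(minutes=5))
overlapping = sliding_windows(ts, timedelta(minutes=10), timedelta(minutes=5))
sessions = session_windows(
    [datetime(2024, 1, 1, 10, 0), datetime(2024, 1, 1, 10, 10), datetime(2024, 1, 1, 10, 50)],
    timedelta(minutes=30),
)
print(fixed, overlapping, sessions, sep="\n")
```

An event at 10:07 belongs to exactly one 5-minute tumbling window, to two 10-minute windows that slide every 5 minutes, and to whichever session its neighbours place it in.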

Flink Windowing

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
tenv = StreamTableEnvironment.create(env)

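# Tumbling window in Flink SQL. Assumes a `user_events` table is already
# registered and that `event_timestamp` is its event-time attribute
# (declared with a WATERMARK clause in the table DDL).
tenv.execute_sql("""
    SELECT
        event_type,
        TUMBLE_START(event_timestamp, INTERVAL '5' MINUTE) AS window_start,
        COUNT(*) AS event_count,
        SUM(amount) AS total_amount
    FROM user_events
    GROUP BY event_type, TUMBLE(event_timestamp, INTERVAL '5' MINUTE)
""").print()  # print() streams the result rows to stdout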

3. Watermarks & Late Data

# Spark watermark handling
streaming_df = events \
    .withWatermark("event_timestamp", "10 minutes") \
    .groupBy(
        F.window("event_timestamp", "5 minutes"),
        "user_id"
    ).agg(F.count("*").alias("count"))

# Late data handling in Spark Structured Streaming:
# 1. Events older than the watermark are dropped from the aggregation (default).
# 2. Events that arrive late but within the watermark update their window;
#    "update" output mode re-emits the changed rows.
# 3. Spark has no built-in side output for late records (Flink's DataStream
#    API does). To keep them, write the raw events to a separate sink and
#    reconcile in batch.

# Emit updated window counts as late-but-allowed events arrive
# (console sink is for illustration only)
query = streaming_df.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("checkpointLocation", "s3://checkpoints/") \
    .start()

ℹ️

Key Insight: A watermark tells the system how long to wait for late data. Set the delay too short and late events are dropped; set it too long and the engine holds more state and, in append mode, emits results later.
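The trade-off is easier to see in a small simulation. The sketch below assumes the common definition watermark = (maximum event time seen) minus (allowed delay) and applies it per event. That is a simplification: Spark advances the watermark between micro-batches, not after every record. `WatermarkTracker` is a hypothetical name.

```python
from datetime import datetime, timedelta


class WatermarkTracker:
    """Tracks watermark = max event time seen so far - allowed delay."""

    def __init__(self, delay: timedelta):
        self.delay = delay
        self.max_event_time = None

    @property
    def watermark(self):
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.delay

    def accept(self, event_time: datetime) -> bool:
        """Return False if the event is older than the watermark (too late)."""
        watermark = self.watermark
        if watermark is not None and event_time < watermark:
            return False
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        return True


tracker = WatermarkTracker(delay=timedelta(minutes=10))
arrivals = [
    datetime(2024, 1, 1, 10, 0),
    datetime(2024, 1, 1, 10, 20),  # moves the watermark to 10:10
    datetime(2024, 1, 1, 10, 12),  # late, but still inside the allowed delay
    datetime(2024, 1, 1, 10, 5),   # older than the watermark: dropped
]
decisions = [tracker.accept(ts) for ts in arrivals]
print(decisions, tracker.watermark)
```

With a 2-minute delay instead of 10, the 10:12 event would also be dropped, which is the "too short loses data" half of the trade-off.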

4. State Management

Managed State (Spark)

# Spark manages state automatically with checkpoints
query = events \
    .withWatermark("event_timestamp", "10 minutes") \
    .groupBy(
        F.window("event_timestamp", "10 minutes"),
        "user_id"
    ).agg(
        F.count("*").alias("event_count"),
        F.collect_list("event_type").alias("event_types")
    ) \
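    .writeStream \
    .outputMode("update") \
    .format("console") \
    .option("checkpointLocation", "s3://checkpoints/user-sessions") \
    .start()  # console sink is for illustration; the default file sink does not support update mode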

# State cleanup is automatic with watermarks
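As a rough model of that cleanup, the sketch below keeps per-window counts in a dict and evicts every window whose end is at or before the watermark. The function name and the exact boundary condition are assumptions of this sketch, not a description of Spark's state store.

```python
from datetime import datetime, timedelta


def evict_expired(state: dict, watermark: datetime, window_size: timedelta) -> dict:
    """Remove and return the windows whose end is at or before the watermark."""
    expired = {
        start: count
        for start, count in state.items()
        if start + window_size <= watermark
    }
    for start in expired:
        del state[start]
    return expired


# Hypothetical state: window start -> running event count
state = {
    datetime(2024, 1, 1, 10, 0): 3,
    datetime(2024, 1, 1, 10, 10): 5,
    datetime(2024, 1, 1, 10, 20): 1,
}
finalized = evict_expired(
    state,
    watermark=datetime(2024, 1, 1, 10, 20),
    window_size=timedelta(minutes=10),
)
print("finalized:", finalized)
print("still open:", state)
```

Only the 10:20 window stays in state, so memory use is bounded by the watermark delay plus the window size, not by the lifetime of the stream.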

External State (Redis)

import redis
import json
from datetime import datetime

class ExternalStateManager:
    def __init__(self, redis_host: str):
        self.client = redis.Redis(host=redis_host, port=6379)
    
    def update_window_state(self, window_key: str, event: dict):
        """Update window state in Redis"""
        pipe = self.client.pipeline()
        
        # Increment counter
        pipe.hincrby(f"window:{window_key}", "count", 1)
        
        # Append event
        pipe.rpush(f"window:{window_key}:events", json.dumps(event))
        
        # Set TTL on both keys so the counter and the event list expire together
        pipe.expire(f"window:{window_key}", 3600)
        pipe.expire(f"window:{window_key}:events", 3600)
        
        pipe.execute()
    
    def get_window_result(self, window_key: str) -> dict:
        """Get aggregated window result"""
        data = self.client.hgetall(f"window:{window_key}")
        return {
            "count": int(data.get(b"count", 0)),
            "window_key": window_key
        }
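The class above takes a `window_key` but does not show how to build one. One possible scheme, sketched here under the assumption of epoch-aligned tumbling windows keyed per user, derives the key from the event timestamp. The `window_key` helper and its key format are hypothetical.

```python
from datetime import datetime, timezone


def window_key(user_id: str, event_ts: float, window_seconds: int = 300) -> str:
    """Build a key such as 'u1:20240101T100500Z' for the window containing event_ts."""
    # Round the epoch timestamp down to the start of its window
    window_start = int(event_ts // window_seconds) * window_seconds
    start = datetime.fromtimestamp(window_start, tz=timezone.utc)
    return f"{user_id}:{start.strftime('%Y%m%dT%H%M%SZ')}"


event_ts = datetime(2024, 1, 1, 10, 7, tzinfo=timezone.utc).timestamp()
key = window_key("u1", event_ts)
print(key)
```

Every event from the same user in the same 5-minute window maps to the same key, so `update_window_state` accumulates them in one Redis hash.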

5. Exactly-Once Semantics

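class ExactlyOnceProcessor:
    """Deduplicate by event ID so each event takes effect at most once per process.

    Illustrative only: the ID set lives in memory and is lost on restart.
    True exactly-once needs the ID store and the output committed atomically.
    """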
    
    def __init__(self, checkpoint_path: str):
        self.checkpoint_path = checkpoint_path
        self.processed_events = set()
    
    def process_event(self, event: dict):
        event_id = event["event_id"]
        
        # Check if already processed
        if self._is_processed(event_id):
            return None  # Skip duplicate
        
        # Process event
        result = self._process(event)
        
        # Mark as processed (in production this must commit atomically with the result)
        self._mark_processed(event_id)
        
        return result
    
    def _is_processed(self, event_id: str) -> bool:
        # In-memory stand-in for a durable checkpoint store
        return event_id in self.processed_events
    
    def _mark_processed(self, event_id: str):
        self.processed_events.add(event_id)
    
    def _process(self, event: dict) -> dict:
        # Business logic
        return {"processed": True, "event_id": event["event_id"]}
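The in-memory set above loses its contents on restart. A minimal sketch of a durable variant follows, using SQLite as a stand-in for the checkpoint store: the "mark as processed" write and the result write share one transaction, so a crash cannot leave one without the other. The class name, table names, and the running-total business logic are all hypothetical.

```python
import sqlite3


class DurableDedupProcessor:
    """Applies each event at most once by committing its ID together with its effect."""

    def __init__(self, db_path: str = ":memory:"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("CREATE TABLE IF NOT EXISTS processed (event_id TEXT PRIMARY KEY)")
        self.conn.execute("CREATE TABLE IF NOT EXISTS totals (user_id TEXT PRIMARY KEY, amount REAL)")
        self.conn.commit()

    def process_event(self, event: dict) -> bool:
        """Return True if the event was applied, False if it was a duplicate."""
        with self.conn:  # one transaction: commits on success, rolls back on error
            cur = self.conn.execute(
                "INSERT OR IGNORE INTO processed (event_id) VALUES (?)",
                (event["event_id"],),
            )
            if cur.rowcount == 0:
                return False  # ID already recorded: skip the duplicate
            self.conn.execute(
                "INSERT OR IGNORE INTO totals (user_id, amount) VALUES (?, 0)",
                (event["user_id"],),
            )
            self.conn.execute(
                "UPDATE totals SET amount = amount + ? WHERE user_id = ?",
                (event["amount"], event["user_id"]),
            )
        return True

    def total(self, user_id: str) -> float:
        row = self.conn.execute(
            "SELECT amount FROM totals WHERE user_id = ?", (user_id,)
        ).fetchone()
        return row[0] if row else 0.0


processor = DurableDedupProcessor()
first = processor.process_event({"event_id": "e1", "user_id": "u1", "amount": 10.0})
replay = processor.process_event({"event_id": "e1", "user_id": "u1", "amount": 10.0})
second = processor.process_event({"event_id": "e2", "user_id": "u1", "amount": 5.0})
print(first, replay, second, processor.total("u1"))
```

Replaying `e1` after a restart leaves the total unchanged because its ID is already stored. This only covers sinks that can share a transaction with the ID store; other sinks need idempotent writes or a two-phase commit.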

6. Real-Time Dashboard Pipeline

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("RealTimeDashboard") \
    .getOrCreate()

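# Assumed event schema (illustrative); adjust to the real payload
schema = "user_id STRING, page STRING, event_type STRING, load_time_ms DOUBLE, event_timestamp TIMESTAMP"

# Read events from all three topics
events = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "page_views,clicks,purchases") \
    .load() \
    .select(F.from_json(F.col("value").cast("string"), schema).alias("data")) \
    .select("data.*")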

# Real-time metrics
metrics = events \
    .withWatermark("event_timestamp", "5 minutes") \
    .groupBy(
        F.window("event_timestamp", "1 minute"),
        "page"
    ).agg(
        F.count("*").alias("view_count"),
        # Exact distinct counts are not supported in streaming aggregations
        F.approx_count_distinct("user_id").alias("unique_users"),
        F.sum(F.when(F.col("event_type") == "purchase", 1).otherwise(0)).alias("purchases"),
        F.avg("load_time_ms").alias("avg_load_time")
    )

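# Write to the real-time dashboard backend. Spark ships no built-in Redis
# streaming sink, so each micro-batch is handed to a writer via foreachBatch.
def write_to_dashboard(batch_df, batch_id):
    # Placeholder: replace with a write to the dashboard store (e.g. Redis)
    batch_df.show(truncate=False)

query = metrics.writeStream \
    .outputMode("update") \
    .foreachBatch(write_to_dashboard) \
    .option("checkpointLocation", "s3://checkpoints/dashboard") \
    .start()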

ℹ️

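Best Practice: For real-time dashboards, use "update" output mode. Each trigger emits only the rows whose aggregates changed, so the dashboard backend receives far fewer writes than with "complete" mode, which re-emits the full result table.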
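The effect of "update" mode can be modelled as a diff between the result table at two consecutive triggers. The sketch below is only a model of that behaviour, with hypothetical data and a hypothetical `changed_rows` helper; Spark computes the changed rows internally.

```python
_MISSING = object()


def changed_rows(previous: dict, current: dict) -> dict:
    """Return the rows in `current` that are new or whose value differs from `previous`."""
    return {
        key: value
        for key, value in current.items()
        if previous.get(key, _MISSING) != value
    }


# Hypothetical result tables: (window, page) -> view_count
trigger_1 = {("10:00", "/home"): 12, ("10:00", "/cart"): 3}
trigger_2 = {("10:00", "/home"): 15, ("10:00", "/cart"): 3, ("10:01", "/home"): 1}

to_send = changed_rows(trigger_1, trigger_2)
print(to_send)
```

Only two of the three rows are sent at the second trigger; "complete" mode would resend all three.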

Follow-Up Questions

  1. Design a real-time fraud detection system processing 100K events/second.
  2. How would you handle exactly-once processing across multiple Kafka topics?
  3. Design a streaming pipeline that can handle backpressure.
  4. How would you implement a real-time A/B testing analysis system?
  5. Design a streaming data quality monitoring system.
