PySpark Advanced Interview Series
Module 10: Structured Streaming - Real-Time Data Processing
Interview Question
"At Amazon, we process millions of events per second using Structured Streaming. Walk us through the streaming execution model, the difference between output modes, and how you would implement exactly-once processing with checkpointing." - Amazon Senior Data Engineer Interview
"At Uber, we need real-time aggregation of ride events. Explain how stateful streaming works, how you would handle late-arriving data with watermarks, and what happens when a streaming query fails and restarts." - Uber Data Engineer Interview
Structured Streaming Architecture
Structured Streaming treats real-time data as an unbounded table that is continuously appended. You write a query on this "table" as if it were a static DataFrame, and Spark continuously updates the result.
Input Source → Query → Sink
(Kafka, Files) → (DataFrame operations) → (Kafka, Console, Files, etc.)
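One way to picture the unbounded-table model: each trigger appends a micro-batch of rows to the input, and only those new rows are folded into the running result. The sketch below is a plain-Python toy, not Spark's implementation; `run_micro_batches` and the sample events are hypothetical names chosen for illustration.

```python
from collections import Counter

def run_micro_batches(batches):
    """Fold each micro-batch into a running count per event type.

    Returns one snapshot of the result table per trigger.
    """
    result = Counter()            # the continuously updated result table
    snapshots = []
    for batch in batches:         # each batch = rows appended since the last trigger
        result.update(batch)      # incremental update: only the new rows are read
        snapshots.append(dict(result))
    return snapshots

snapshots = run_micro_batches([
    ["click", "view"],            # trigger 1
    ["click", "purchase"],        # trigger 2
])
print(snapshots[-1])
```

The point of the toy is that the second trigger never rereads the first batch: the result table carries the earlier work forward as state.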
Basic Streaming Example
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder \
.appName("StreamingInterview") \
.config("spark.sql.shuffle.partitions", "200") \
.getOrCreate()
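# Read from a file source (for demo)
# Streaming file sources need an explicit schema; these columns are illustrative
schema = "event_type STRING, event_time STRING, user_id STRING, product_id STRING"
streaming_df = spark.readStream \
    .format("csv") \
    .option("header", "true") \
    .schema(schema) \
    .load("s3a://bucket/input/")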
# Apply transformations
processed_df = streaming_df \
.filter(col("event_type").isin(["click", "view", "purchase"])) \
.withColumn("event_time", col("event_time").cast("timestamp")) \
.withColumn("window", window(col("event_time"), "5 minutes")) \
.groupBy("window", "event_type") \
.count()
# Write to output
query = processed_df.writeStream \
.outputMode("update") \
.format("console") \
.option("checkpointLocation", "s3a://bucket/checkpoints/") \
.trigger(processingTime="30 seconds") \
.start()
query.awaitTermination()
Output Modes
Append Mode
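Only rows added to the result table since the last trigger are written to the sink, and written rows never change. Best for stateless operations.
# Append mode: only new rows
query = streaming_df.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "s3a://bucket/output/append/") \
    .option("checkpointLocation", "s3a://bucket/checkpoints/append/") \
    .start()
# Note: the parquet sink needs an output path; the one above is a placeholder
# Limitations:
# - Aggregations require a watermark; a window is emitted only after the watermark passes its end
# - Stream-stream outer joins require watermarks and an event-time constraint
# - Rows already written are never updated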
Complete Mode
The entire result table is written to the sink after each trigger.
# Complete mode: entire result table
query = streaming_df \
.groupBy("event_type") \
.count() \
.writeStream \
.outputMode("complete") \
.format("console") \
.option("checkpointLocation", "s3a://bucket/checkpoints/complete/") \
.start()
# Use case: Small result tables that fit in memory (requires an aggregation)
# The whole result table is rewritten each trigger; the aggregation state itself is updated incrementally
Update Mode
Only rows that were updated since the last trigger are written.
# Update mode: only updated rows
query = streaming_df \
.groupBy("user_id") \
.agg(count("*").alias("event_count")) \
.writeStream \
.outputMode("update") \
.format("console") \
.option("checkpointLocation", "s3a://bucket/checkpoints/update/") \
.start()
# Use case: Large result tables where most rows don't change each trigger
| Output Mode | Aggregations | Latency | Use Case |
|---|---|---|---|
| Append | No (unless watermark) | Low | Stateless transforms |
| Complete | Yes | Medium | Small result tables |
| Update | Yes | Low | Large result tables |
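To make the difference between complete and update mode concrete, here is a small plain-Python sketch of which rows a sink would receive for one trigger. It is a toy model: `emit` is a hypothetical helper, and append mode is left out because for aggregations it depends on a watermark finalizing each window.

```python
def emit(mode, previous, current):
    """Rows a sink would receive for one trigger of a running count.

    previous/current: result table (key -> count) before and after the trigger.
    """
    if mode == "complete":
        return dict(current)                        # whole result table, every trigger
    if mode == "update":
        return {k: v for k, v in current.items()    # only rows that changed
                if previous.get(k) != v}
    raise ValueError(f"unsupported mode for this sketch: {mode}")

before = {"click": 3, "view": 5}
after = {"click": 4, "view": 5, "purchase": 1}

complete_rows = emit("complete", before, after)
update_rows = emit("update", before, after)
print(complete_rows)
print(update_rows)
```

The unchanged `view` row appears in the complete output but not in the update output, which is why update mode suits large result tables where few rows change per trigger.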
Trigger Types
Processing Time Trigger
# Process new data every 30 seconds
query = streaming_df.writeStream \
.trigger(processingTime="30 seconds") \
.start()
# Process as fast as possible (default)
query = streaming_df.writeStream \
.trigger(processingTime="0 seconds") \
.start()
Once Trigger
# Process all available data, then stop
query = streaming_df.writeStream \
.trigger(once=True) \
.start()
# Useful for backfilling or testing; deprecated since Spark 3.4 in favor of availableNow
Available Now Trigger
# Process all available data, then stop (Spark 3.3+)
query = streaming_df.writeStream \
    .trigger(availableNow=True) \
    .start()
# Better than once=True for large backfills
# Honors source rate limits (e.g. maxOffsetsPerTrigger), so the backlog is split into several batches and OOM risk drops
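The practical difference between `once=True` and `availableNow=True` is how the backlog is cut into micro-batches. The following plain-Python sketch models only that batching behavior; `plan_batches` and the record counts are hypothetical, and real batch boundaries depend on the source and its options.

```python
def plan_batches(backlog, max_per_trigger, trigger):
    """Return the micro-batch sizes used to drain `backlog` records."""
    if trigger == "once":
        return [backlog] if backlog else []         # one batch, rate limit ignored
    if trigger == "availableNow":
        full, rest = divmod(backlog, max_per_trigger)
        return [max_per_trigger] * full + ([rest] if rest else [])
    raise ValueError(trigger)

once_plan = plan_batches(250_000, 100_000, "once")
available_now_plan = plan_batches(250_000, 100_000, "availableNow")
print(once_plan)
print(available_now_plan)
```

With a 250,000-record backlog and a 100,000-record limit, the one-shot trigger takes everything in a single batch while the available-now trigger drains it in three bounded batches.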
Continuous Trigger (Experimental)
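# Roughly 1 ms end-to-end latency (experimental)
query = streaming_df.writeStream \
    .trigger(continuous="1 second") \
    .start()
# "1 second" is the checkpoint interval, not the latency
# Limited operations support (map-like operations, no aggregations); at-least-once guarantees
# Not recommended for production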
Stateful Streaming Operations
Windowed Aggregation
# Tumbling window: non-overlapping fixed-size windows
windowed_counts = streaming_df \
.withWatermark("event_time", "10 minutes") \
.groupBy(
window(col("event_time"), "5 minutes"), # 5-minute tumbling window
"event_type"
) \
.count()
# Sliding window: overlapping windows
sliding_counts = streaming_df \
.withWatermark("event_time", "10 minutes") \
.groupBy(
window(col("event_time"), "10 minutes", "5 minutes"), # 10-min window, slides every 5 min
"event_type"
) \
.count()
# Session window: dynamic windows based on activity gaps (session_window, Spark 3.2+)
session_counts = streaming_df \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        session_window(col("event_time"), "10 minutes"),  # session closes after a 10-minute gap
        "user_id"
    ) \
    .count()
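The three window types differ in how a timestamp maps to windows. The plain-Python sketch below shows that mapping with times in seconds; it assumes epoch-aligned, half-open `[start, end)` windows, and `assign_windows` and `sessionize` are hypothetical helpers rather than Spark functions.

```python
def assign_windows(ts, size, slide=None):
    """Return the [start, end) windows (in seconds) that contain timestamp `ts`.

    slide=None means a tumbling window (slide == size).
    """
    slide = slide or size
    start = (ts // slide) * slide           # latest window start at or before ts
    windows = []
    while start > ts - size:                # this window still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)

def sessionize(timestamps, gap):
    """Group timestamps into sessions that close after `gap` seconds of inactivity."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:   # falls inside the open session
            sessions[-1][1] = ts + gap          # extend the session end
        else:
            sessions.append([ts, ts + gap])     # start a new session
    return [tuple(s) for s in sessions]

tumbling = assign_windows(ts=430, size=300)              # 5-minute tumbling
sliding = assign_windows(ts=430, size=600, slide=300)    # 10-minute, slides every 5
sessions = sessionize([0, 200, 500, 2000], gap=600)      # 10-minute session gap
print(tumbling, sliding, sessions)
```

An event lands in exactly one tumbling window, in `size / slide` sliding windows, and in a session whose end keeps moving as long as events arrive within the gap.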
Watermarks for Late Data
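# The watermark bounds how late data may arrive and still be aggregated
# Watermark = max event time seen so far - delay threshold
streaming_df \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window(col("event_time"), "5 minutes"),
        "event_type"
    ) \
    .count() \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .start()
# Events within the 10-minute threshold are always aggregated
# Events later than that may be dropped (the drop is not guaranteed to be immediate)
# The watermark is based on the maximum event time seen and advances at the end of each micro-batch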
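The drop rule is easier to reason about with numbers. This plain-Python sketch is a simplified model, not Spark's code: it counts events per tumbling window, advances the watermark only between micro-batches, and drops an event once its window has already ended at or before the watermark. `process_batches` and the sample timestamps (in seconds) are hypothetical.

```python
def process_batches(batches, delay, window):
    """Count events per tumbling window and drop events whose window is
    already older than the watermark. All times are in seconds."""
    counts, dropped = {}, []
    watermark = 0          # nothing is considered late before the first batch
    max_event_time = 0
    for batch in batches:
        for ts in batch:
            start = (ts // window) * window
            if start + window <= watermark:     # window already finalized
                dropped.append(ts)
                continue
            counts[start] = counts.get(start, 0) + 1
            max_event_time = max(max_event_time, ts)
        # the watermark only advances between micro-batches
        watermark = max(watermark, max_event_time - delay)
    return counts, dropped, watermark

counts, dropped, watermark = process_batches(
    batches=[[700, 1000, 1500], [950, 550, 2000]],
    delay=600,      # 10-minute watermark
    window=300,     # 5-minute tumbling windows
)
print(counts, dropped, watermark)
```

In the second batch, the event at 950 is late but still counted because its window (900 to 1200) is open, while the event at 550 is dropped because its window ended before the watermark of 900.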
Streaming Joins
# Stream-static join
static_df = spark.read.parquet("s3a://bucket/products/")
joined = streaming_df.join(
static_df,
streaming_df.product_id == static_df.product_id,
"left"
)
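# Stream-stream join: watermarks plus an event-time range let Spark purge old join state
# (optional for inner joins, required for outer joins)
# Illustrative assumptions: the Kafka record key carries the user id; the 1-hour bound is arbitrary
def read_topic(topic, prefix):
    return spark.readStream.format("kafka") \
        .option("kafka.bootstrap.servers", "kafka-broker1:9092") \
        .option("subscribe", topic).load() \
        .select(col("key").cast("string").alias(prefix + "_user_id"),
                col("timestamp").alias(prefix + "_time"))
clicks = read_topic("clicks", "click").withWatermark("click_time", "10 minutes")
purchases = read_topic("purchases", "purchase").withWatermark("purchase_time", "10 minutes")
joined = clicks.join(
    purchases,
    expr("""click_user_id = purchase_user_id AND
            purchase_time BETWEEN click_time AND click_time + INTERVAL 1 HOUR""")
)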
Real-World Scenario: Uber Real-Time Ride Analytics
Problem Statement
Build a real-time streaming pipeline that processes ride events from Kafka, computes real-time aggregations (rides per city, average fare, surge pricing), handles late data, and writes to a data lake.
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder \
    .appName("UberRealTimeAnalytics") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()
# Rate limiting is set on the Kafka source below via maxOffsetsPerTrigger;
# spark.streaming.kafka.maxRatePerPartition only affects the legacy DStream API
# Define schema for ride events
ride_schema = StructType([
StructField("ride_id", StringType(), False),
StructField("driver_id", StringType(), False),
StructField("rider_id", StringType(), False),
StructField("city", StringType(), False),
StructField("pickup_lat", DoubleType(), False),
StructField("pickup_lon", DoubleType(), False),
StructField("dropoff_lat", DoubleType(), True),
StructField("dropoff_lon", DoubleType(), True),
StructField("fare_amount", DoubleType(), True),
StructField("status", StringType(), False),
StructField("event_time", TimestampType(), False)
])
# Read from Kafka
raw_stream = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092") \
.option("subscribe", "ride-events") \
.option("startingOffsets", "latest") \
.option("maxOffsetsPerTrigger", 100000) \
.load()
# Parse JSON payload
parsed_stream = raw_stream \
.select(
col("key").cast("string").alias("key"),
from_json(col("value").cast("string"), ride_schema).alias("data"),
col("timestamp").alias("kafka_timestamp")
) \
.select("key", "data.*", "kafka_timestamp")
# Clean and enrich
clean_stream = parsed_stream \
.filter(col("status").isin(["completed", "cancelled", "in_progress"])) \
.withColumn("event_date", to_date(col("event_time"))) \
.withColumn("hour_of_day", hour(col("event_time")))
# === REAL-TIME AGGREGATIONS ===
# 1. Rides per city per 5-minute window
city_rides = clean_stream \
.withWatermark("event_time", "10 minutes") \
.groupBy(
window(col("event_time"), "5 minutes"),
"city"
) \
.agg(
count("*").alias("ride_count"),
sum(when(col("status") == "completed", 1).otherwise(0)).alias("completed_rides"),
avg("fare_amount").alias("avg_fare"),
approx_count_distinct("driver_id").alias("active_drivers")
) \
.withColumn("completion_rate",
col("completed_rides") / col("ride_count"))
# 2. Surge pricing detection (high demand per city)
surge_detection = clean_stream \
.withWatermark("event_time", "10 minutes") \
.groupBy(
window(col("event_time"), "5 minutes"),
"city"
) \
.agg(
count("*").alias("total_requests"),
sum(when(col("status") == "in_progress", 1).otherwise(0)).alias("active_rides"),
approx_count_distinct("driver_id").alias("available_drivers")
) \
.withColumn(
"surge_multiplier",
when(col("active_rides") / col("available_drivers") > 3, 2.0)
.when(col("active_rides") / col("available_drivers") > 2, 1.5)
.otherwise(1.0)
)
# 3. Driver performance tracking
driver_metrics = clean_stream \
.withWatermark("event_time", "30 minutes") \
.filter(col("status") == "completed") \
.groupBy(
window(col("event_time"), "15 minutes"),
"driver_id"
) \
.agg(
count("*").alias("completed_rides"),
avg("fare_amount").alias("avg_fare"),
# exact countDistinct is not supported in streaming aggregations
approx_count_distinct("rider_id").alias("unique_riders")
)
# === WRITE TO SINKS ===
# Write city rides to Kafka for real-time dashboard
city_rides_query = city_rides \
.select(
col("city").alias("key"),
to_json(struct("*")).alias("value")
) \
.writeStream \
.outputMode("update") \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka-broker1:9092") \
.option("topic", "city-ride-metrics") \
.option("checkpointLocation", "s3a://uber-streaming/checkpoints/city-rides/") \
.trigger(processingTime="30 seconds") \
.start()
# Write surge alerts to console for monitoring
surge_query = surge_detection \
.filter(col("surge_multiplier") > 1.0) \
.writeStream \
.outputMode("update") \
.format("console") \
.option("truncate", "false") \
.option("checkpointLocation", "s3a://uber-streaming/checkpoints/surge/") \
.trigger(processingTime="30 seconds") \
.start()
# Write driver metrics to data lake (parquet)
driver_query = driver_metrics \
.writeStream \
.outputMode("append") \
.format("parquet") \
.option("path", "s3a://uber-streaming/driver-metrics/") \
.option("checkpointLocation", "s3a://uber-streaming/checkpoints/driver-metrics/") \
.trigger(processingTime="5 minutes") \
.start()
# Wait for termination
spark.streams.awaitAnyTermination()
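The surge thresholds in the pipeline are easy to get wrong at the boundaries, so it can help to mirror the `when`/`otherwise` chain in a plain function and unit-test it outside Spark. This is a sketch under assumptions: `surge_multiplier` here is a hypothetical Python helper, and the zero-driver branch assumes Spark's non-ANSI behavior, where division by zero yields NULL and the `otherwise` value applies.

```python
def surge_multiplier(active_rides, available_drivers):
    """Plain-Python mirror of the when/otherwise chain for surge_multiplier."""
    if available_drivers == 0:
        # Assumption: non-ANSI Spark returns NULL for x / 0, so otherwise(1.0) applies
        return 1.0
    ratio = active_rides / available_drivers
    if ratio > 3:
        return 2.0
    if ratio > 2:
        return 1.5
    return 1.0

print(surge_multiplier(40, 10), surge_multiplier(25, 10), surge_multiplier(20, 10))
```

Note that a ratio of exactly 2 does not trigger surge pricing because both comparisons are strict.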
Checkpointing and Exactly-Once Semantics
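# The checkpoint directory stores:
# 1. Offset log (the source offsets planned for each micro-batch)
# 2. Commit log (which micro-batches finished)
# 3. State store snapshots (for stateful operations)
# Checkpoint location must be on reliable, fault-tolerant storage
query = streaming_df.writeStream \
    .option("checkpointLocation", "s3a://bucket/checkpoints/query-1/") \
    .start()
# On restart with the same checkpoint location:
# 1. Spark reads the last planned and committed offsets from the checkpoint
# 2. Reprocesses only the batch that was not committed
# 3. End-to-end exactly-once then holds if the source is replayable and the sink is idempotent or transactional
# WARNING: Some query changes are incompatible with an existing checkpoint
# (for example changing grouping keys or aggregates); those need a fresh checkpoint location
⚠️ Amazon Interview Warning
At Amazon, checkpoint corruption is a common production issue. If you change the stateful logic of a query (add or change an aggregation, change the grouping keys or the window), the existing checkpoint is no longer compatible: restart the query with a new checkpoint location and accept that state is rebuilt from scratch. Otherwise the restart may fail or behave unpredictably. Changes such as adjusting a filter or the trigger interval are generally safe.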
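The restart sequence can be sketched as a toy in plain Python: log the offsets for a batch, write the output, then commit. If the query dies between the write and the commit, the restart replays the same offset range, and an idempotent sink keyed by batch id makes the replay harmless. `Checkpoint`, `IdempotentSink`, and `run` are hypothetical names; this models the idea, not Spark's on-disk format.

```python
class Checkpoint:
    """Toy stand-in for a checkpoint directory: planned offsets plus commits."""
    def __init__(self):
        self.offsets = {}      # batch_id -> (start, end), written before processing
        self.commits = set()   # batch_ids whose output reached the sink

class IdempotentSink:
    """Keeps one output per batch_id, so a replayed batch overwrites itself."""
    def __init__(self):
        self.by_batch = {}
    def write(self, batch_id, rows):
        self.by_batch[batch_id] = list(rows)

def run(source, checkpoint, sink, batch_size, fail_before_commit=None):
    """Process `source` in micro-batches; optionally crash before one commit."""
    batch_id = max(checkpoint.commits, default=-1) + 1   # resume after last commit
    while True:
        if batch_id in checkpoint.offsets:
            start, end = checkpoint.offsets[batch_id]    # same range as before the crash
        else:
            start = checkpoint.offsets[batch_id - 1][1] if batch_id > 0 else 0
            end = min(start + batch_size, len(source))
            if start == end:
                return                                   # nothing left to read
            checkpoint.offsets[batch_id] = (start, end)  # 1. log offsets first
        sink.write(batch_id, source[start:end])          # 2. write output
        if batch_id == fail_before_commit:
            raise RuntimeError("simulated crash")
        checkpoint.commits.add(batch_id)                 # 3. mark the batch done
        batch_id += 1

events = list(range(10))
cp, sink = Checkpoint(), IdempotentSink()
try:
    run(events, cp, sink, batch_size=4, fail_before_commit=1)
except RuntimeError:
    pass                               # died after writing batch 1, before committing it
run(events, cp, sink, batch_size=4)    # restart with the same checkpoint
delivered = [row for b in sorted(sink.by_batch) for row in sink.by_batch[b]]
print(delivered)
```

If the sink appended blindly instead of overwriting by batch id, rows 4 to 7 would be delivered twice, which is why checkpointing alone gives at-least-once delivery and the sink has to supply the rest.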
Streaming Performance Tuning
# 1. Rate limiting (a Kafka source option in Structured Streaming, not a session config)
# spark.readStream.format("kafka").option("maxOffsetsPerTrigger", 100000)
# 2. Micro-batch optimization (fixed by the checkpoint once a stateful query has started)
spark.conf.set("spark.sql.shuffle.partitions", "200")
# 3. State store optimization: RocksDB keeps large state off the JVM heap (Spark 3.2+)
spark.conf.set("spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
# 4. Memory management for stateful operations (RocksDB provider, Spark 3.5+)
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage", "true")
# 5. Monitor through the Spark UI
# Open http://driver-node:4040 and use the Structured Streaming tab
Edge Cases
1. Late Arriving Data
# Without watermark: all late data is processed (unbounded state)
# With watermark: late data beyond threshold is dropped
streaming_df \
.withWatermark("event_time", "10 minutes") \
.groupBy(window(col("event_time"), "5 minutes"), "city") \
.count()
# Data behind the watermark (max event time - 10 minutes) may be dropped
# State is cleaned up for windows that end before the watermark
2. State Size Management
# Unbounded state can cause OOM
# Use a watermark to limit state size
# Expired state is removed automatically once the watermark passes a window's end
streaming_df \
.withWatermark("event_time", "10 minutes") \
.groupBy(
window(col("event_time"), "1 hour"),
"user_id"
) \
.agg(
collect_list("event_type").alias("events")
) \
.writeStream \
.outputMode("update") \
.start()
# Without watermark, user_id state grows unbounded
# With watermark, state is cleaned up for old windows
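To see why the watermark bounds state, the plain-Python sketch below counts how many window states are held after each event, with and without a watermark. It is a simplified model: `state_sizes` is a hypothetical helper, times are in seconds, and the watermark is recomputed per event rather than per micro-batch.

```python
def state_sizes(event_times, window, delay=None):
    """Number of window states held after each event; delay=None means no watermark."""
    open_windows, sizes, max_seen = set(), [], 0
    for ts in event_times:
        max_seen = max(max_seen, ts)
        open_windows.add((ts // window) * window)
        if delay is not None:
            watermark = max_seen - delay
            # evict windows that ended at or before the watermark
            open_windows = {w for w in open_windows if w + window > watermark}
        sizes.append(len(open_windows))
    return sizes

events = list(range(0, 3300, 300))                      # one event every 5 minutes
unbounded = state_sizes(events, window=300)             # no watermark
bounded = state_sizes(events, window=300, delay=600)    # 10-minute watermark
print(unbounded[-1], max(bounded))
```

Without a watermark the state grows by one window per interval forever; with a 10-minute watermark it levels off at three 5-minute windows in this example.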
3. Query Restart After Failure
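# Spark does not restart a failed query by itself: restart it (scheduler, retry loop)
# with the same checkpointLocation and it resumes from the last committed batch
# After a restart, monitor for:
# - Batches that take longer than the trigger interval (processing too slow)
# - Failed batches (exceptions)
# - State store corruption
# Monitor through the Spark UI Structured Streaming tab
# Or use metrics: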
query.status # Current status
query.recentProgress # Recent batch progress
query.lastProgress # Last batch progress
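A progress record can be turned into a simple health check. The sketch below works on a plain dict shaped like `query.lastProgress`; `batch_health` is a hypothetical helper, the sample values are made up, and the field names are the ones Spark commonly reports, so verify them against your Spark version.

```python
def batch_health(progress, trigger_interval_ms):
    """Summarize one progress record (a dict shaped like query.lastProgress)."""
    duration_ms = progress["durationMs"]["triggerExecution"]
    return {
        "batch_id": progress["batchId"],
        # a batch that outlasts the trigger interval means the query is lagging
        "falling_behind": duration_ms > trigger_interval_ms,
        "input_faster_than_processing":
            progress["inputRowsPerSecond"] > progress["processedRowsPerSecond"],
    }

# Hand-written sample record; the values are invented for illustration.
sample = {
    "batchId": 42,
    "numInputRows": 90000,
    "inputRowsPerSecond": 3000.0,
    "processedRowsPerSecond": 2500.0,
    "durationMs": {"triggerExecution": 36000},
}
health = batch_health(sample, trigger_interval_ms=30_000)
print(health)
```

In a live job, `query.lastProgress` is `None` until the first batch completes, so guard for that before calling a helper like this.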
Best Practices
💡 Production Streaming Checklist
- Always use checkpointing for production queries
- Set appropriate watermarks for late data
- Use rate limiting to prevent overwhelming sinks
- Monitor batch processing times (should be < trigger interval)
- Use idempotent sinks for exactly-once semantics
- Set maxOffsetsPerTrigger to prevent OOM
- Test with production-like data volumes before deployment
- Use separate checkpoint locations for each query
- Monitor state store size for stateful operations
Summary
Structured Streaming is Spark's stream processing engine built on the DataFrame API, so batch and streaming jobs share the same operations. Understanding output modes, triggers, watermarks, and checkpointing is essential for building production streaming pipelines at Amazon and Uber. The key to success is managing state growth with watermarks and getting exactly-once results by combining checkpointing with replayable sources and idempotent sinks.