Structured Streaming
Difficulty: Senior Level | Companies: Databricks, Netflix, Uber, Apple, Airbnb
Structured Streaming Fundamentals
Structured Streaming treats a live data stream as an unbounded table to which new rows are continuously appended. You write a streaming computation with the same DataFrame and SQL operations used for batch queries, and Spark runs it incrementally as new data arrives.
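The "unbounded table" model can be made concrete with a toy pure-Python sketch. This is not Spark code, and `IncrementalCount` and `batch_count` are hypothetical names. Each trigger appends a micro-batch of rows and updates a running aggregate using only the new rows. The result always equals what a one-shot batch query over every row seen so far would return.

```python
from collections import Counter


class IncrementalCount:
    """Toy model of a streaming groupBy(key).count() over an unbounded input table."""

    def __init__(self):
        # Spark does not retain the input table; it is kept here only to check results
        self.input_table = []
        self.state = Counter()  # running aggregate carried between triggers

    def process_batch(self, rows):
        """Append one micro-batch and update the result using only the new rows."""
        self.input_table.extend(rows)
        self.state.update(rows)
        return dict(self.state)


def batch_count(rows):
    """The equivalent one-shot batch query over the whole table."""
    return dict(Counter(rows))


query = IncrementalCount()
query.process_batch(["click", "view"])
result = query.process_batch(["click", "click"])
print(result)  # {'click': 3, 'view': 1}
print(result == batch_count(query.input_table))  # True
```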
Basic Streaming Application
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder \
.appName("StructuredStreaming") \
.config("spark.sql.streaming.schemaInference", "true") \
.config("spark.sql.streaming.checkpointLocation", "hdfs://checkpoints/streaming-app") \
.getOrCreate()
# Read from Kafka
raw_stream = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") \
.option("subscribe", "events") \
.option("startingOffsets", "latest") \
.option("maxOffsetsPerTrigger", 1000000) \
.load()
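# Parse JSON messages (assumed payload schema; adjust to your event format)
schema = "event_type STRING, user_id STRING, amount DOUBLE"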
events = raw_stream \
.select(
F.col("key").cast("string").alias("event_key"),
F.from_json(F.col("value").cast("string"), schema).alias("data"),
F.col("timestamp").alias("event_time")
) \
.select("event_key", "data.*", "event_time")
# Perform transformations
processed = events \
.filter(F.col("event_type") == "click") \
.withColumn("processing_time", F.current_timestamp()) \
.withColumn("window", F.window("event_time", "1 hour"))
# Write to sink
query = processed \
.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "hdfs://checkpoints/click-aggregation") \
.option("path", "hdfs://output/click-aggregations") \
.trigger(processingTime="30 seconds") \
.start()
query.awaitTermination()
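Interview Insight: Structured Streaming achieves end-to-end exactly-once semantics when two conditions hold. The source must be replayable (such as Kafka, whose offsets can be re-read), and the sink must be idempotent or transactional (such as Delta Lake). Before each micro-batch runs, Spark records its offset range in a write-ahead log in the checkpoint, so a failed batch can be replayed exactly. Always set a checkpoint location for production workloads.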
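The following toy pure-Python model is not Spark's implementation. It shows why the pieces matter together:

* The offset log records each micro-batch's range before the batch runs.
* The commit log marks the batch as done.
* The sink ignores a batch ID it has already written.

`IdempotentSink`, `run`, and the `crash_after_write` flag are hypothetical. Delta Lake's streaming sink behaves similarly by tracking the last committed batch ID per query in its transaction log.

```python
class IdempotentSink:
    """Hypothetical sink that skips a batch ID it has already written."""

    def __init__(self):
        self.rows = []
        self.written_batches = set()

    def add_batch(self, batch_id, rows):
        if batch_id in self.written_batches:
            return  # replay of a batch that already reached the sink
        self.rows.extend(rows)
        self.written_batches.add(batch_id)


def run(source, sink, offset_log, commit_log, max_per_batch, crash_after_write=None):
    """Process a replayable source in micro-batches, using offset and commit logs."""
    while True:
        batch_id = len(commit_log)  # first batch not yet committed
        if batch_id in offset_log:
            start, end = offset_log[batch_id]  # planned before a crash: replay same range
        else:
            start = offset_log[batch_id - 1][1] if batch_id > 0 else 0
            end = min(start + max_per_batch, len(source))
            if start == end:
                return  # no new data
            offset_log[batch_id] = (start, end)  # 1. write-ahead: record the plan first
        sink.add_batch(batch_id, source[start:end])  # 2. write the output
        if batch_id == crash_after_write:
            raise RuntimeError(f"crash before committing batch {batch_id}")
        commit_log.add(batch_id)  # 3. mark the batch as complete


source = list(range(10))  # replayable log, like Kafka offsets 0..9
sink, offsets, commits = IdempotentSink(), {}, set()
try:
    run(source, sink, offsets, commits, max_per_batch=4, crash_after_write=1)
except RuntimeError as err:
    print(err)  # crash before committing batch 1
run(source, sink, offsets, commits, max_per_batch=4)  # restart from the same logs
print(sink.rows == source)  # True: each record written exactly once
```

Without the batch-ID check in the sink, the replayed batch 1 would write records 4 to 7 twice. A non-idempotent sink therefore only gets at-least-once delivery.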
Output Modes
# Three output modes for streaming queries
# 1. Append Mode (default): only rows added to the result table since the last trigger
# Best for: stateless queries (filters, projections) and watermarked aggregations
query_append = events \
.filter(F.col("amount") > 100) \
.writeStream \
.format("console") \
.outputMode("append") \
.start()
# 2. Complete Mode: the entire result table is output every trigger
# Best for: aggregations with a small result (only supported for aggregation queries)
query_complete = events \
.groupBy("event_type") \
.count() \
.writeStream \
.format("console") \
.outputMode("complete") \
.start()
# 3. Update Mode: Only rows that were updated since last trigger
# Best for: Stateful aggregations
query_update = events \
.groupBy("user_id") \
.agg(F.count("*").alias("event_count")) \
.writeStream \
.format("console") \
.outputMode("update") \
.start()
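The difference between complete and update mode can be sketched with a toy pure-Python model. It is not Spark code, and `run_output_modes` is a hypothetical name. The model shows what a running `groupBy(key).count()` would emit at each trigger in each mode. Append mode is omitted on purpose: it emits each row only once, so an aggregation needs a watermark to know when a group is final, and Spark rejects append mode for un-watermarked aggregations.

```python
from collections import Counter


def run_output_modes(batches):
    """Return what a groupBy(key).count() query would emit per trigger in each mode."""
    totals = Counter()
    emitted = {"complete": [], "update": []}
    for batch in batches:
        changed = set(batch)  # keys touched in this trigger
        totals.update(batch)
        # Complete: the whole result table, every trigger
        emitted["complete"].append(dict(sorted(totals.items())))
        # Update: only the rows whose count changed in this trigger
        emitted["update"].append({k: totals[k] for k in sorted(changed)})
    return emitted


out = run_output_modes([["a", "b", "a"], ["b"], ["c"]])
print(out["complete"][-1])  # {'a': 2, 'b': 2, 'c': 1}
print(out["update"])        # [{'a': 2, 'b': 1}, {'b': 2}, {'c': 1}]
```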
Windowed Aggregations
# Tumbling windows: Fixed-size, non-overlapping
tumbling = events \
.withColumn("window", F.window("event_time", "10 minutes")) \
.groupBy("window", "event_type") \
.agg(F.count("*").alias("count")) \
.select(
F.col("window.start").alias("window_start"),
F.col("window.end").alias("window_end"),
"event_type",
"count"
)
# Sliding windows: Fixed-size, overlapping
sliding = events \
.withColumn("window", F.window("event_time", "1 hour", "15 minutes")) \
.groupBy("window", "event_type") \
.agg(F.count("*").alias("count"))
# Session windows: Activity-based, dynamic size
# Spark 3.2+ supports session windows
session = events \
.withColumn("window", F.session_window("event_time", "30 minutes")) \
.groupBy("window", "user_id") \
.agg(F.count("*").alias("event_count"))
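This pure-Python sketch shows how an event time maps to windows of each kind. It assumes windows aligned to the Unix epoch (the documented default of `window()` when no `startTime` is given) and naive timestamps treated as UTC. The helper names are hypothetical, and this is a model of the semantics rather than Spark's code.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # naive timestamps are treated as UTC here


def sliding_windows(ts, size, slide):
    """All [start, end) windows of length `size`, starting every `slide`, that contain ts."""
    start = ts - (ts - EPOCH) % slide  # latest epoch-aligned start <= ts
    windows = []
    while start > ts - size:  # window [start, start + size) still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


def tumbling_window(ts, size):
    """A tumbling window is a sliding window whose slide equals its size."""
    return sliding_windows(ts, size, size)[0]


def session_windows(times, gap):
    """Merge event times into sessions: each event opens [t, t + gap); overlaps merge."""
    sessions = []
    for t in sorted(times):
        if sessions and t < sessions[-1][1]:
            sessions[-1] = (sessions[-1][0], t + gap)  # extend the open session
        else:
            sessions.append((t, t + gap))  # gap exceeded: start a new session
    return sessions


def fmt(windows):
    return [(s.strftime("%H:%M"), e.strftime("%H:%M")) for s, e in windows]


t = datetime(2024, 1, 1, 12, 7)
print(fmt([tumbling_window(t, timedelta(minutes=10))]))
# [('12:00', '12:10')]
print(fmt(sliding_windows(t, timedelta(hours=1), timedelta(minutes=15))))
# [('11:15', '12:15'), ('11:30', '12:30'), ('11:45', '12:45'), ('12:00', '13:00')]
clicks = [datetime(2024, 1, 1, 12, 0), datetime(2024, 1, 1, 12, 10), datetime(2024, 1, 1, 12, 50)]
print(fmt(session_windows(clicks, timedelta(minutes=30))))
# [('12:00', '12:40'), ('12:50', '13:20')]
```

With a 1-hour window sliding every 15 minutes, each event lands in four windows. Sliding-window state is therefore roughly size/slide times larger than tumbling-window state for the same data.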
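Warning: Streaming window aggregations need a watermark to bound their state. Without one, Spark must keep every window open indefinitely, so state grows without bound and can eventually cause out-of-memory failures. A watermark is also required to use append output mode with aggregations.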
Watermarking for Late Data
# Watermark tells Spark to discard state older than watermark
watermarked = events \
.withWatermark("event_time", "10 minutes") \
.withColumn("window", F.window("event_time", "1 hour")) \
.groupBy("window", "event_type") \
.agg(F.count("*").alias("count"))
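# Watermark handling (watermark = max event_time seen so far - threshold):
# - Rows older than the watermark may be dropped (dropping is not guaranteed)
# - State for windows that end before the watermark is cleaned up
# - Late rows within the threshold are still aggregated into their window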
# Configure watermark
watermarked = events \
.withWatermark("event_time", "5 minutes") \
.groupBy(
F.window("event_time", "10 minutes"),
"event_type"
) \
.agg(F.count("*").alias("count"))
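# Write with watermark: append mode emits each window once, after the watermark
# passes the window's end (the Delta sink supports append and complete, not update)
query = watermarked \
.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "hdfs://checkpoints/watermarked") \
.start("hdfs://output/watermarked-counts")  # illustrative output path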
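A toy pure-Python model makes the watermark rules concrete for a 10-minute tumbling count with a 5-minute threshold in append mode. Event times are integer minutes, and `WatermarkedWindowCount` is a hypothetical name. One simplification: the model advances the watermark and emits finalized windows in the same step. Spark applies a new watermark from the next micro-batch on, running an empty micro-batch if needed.

```python
from collections import defaultdict


class WatermarkedWindowCount:
    """Toy model of a windowed count with a watermark, in append mode."""

    def __init__(self, window_size, delay):
        self.window_size = window_size
        self.delay = delay
        self.max_event_time = float("-inf")
        self.state = defaultdict(int)  # window start -> count, open windows only

    @property
    def watermark(self):
        return self.max_event_time - self.delay

    def process_batch(self, event_times):
        dropped = []
        wm = self.watermark  # watermark from previous batches
        for t in event_times:
            start = t - t % self.window_size
            if start + self.window_size <= wm:
                dropped.append(t)  # its window is already finalized: too late
                continue
            self.state[start] += 1
        self.max_event_time = max([self.max_event_time, *event_times])
        # Append mode: emit and evict windows that end at or before the new watermark
        wm = self.watermark
        final = {s: c for s, c in sorted(self.state.items()) if s + self.window_size <= wm}
        for s in final:
            del self.state[s]
        return final, dropped


q = WatermarkedWindowCount(window_size=10, delay=5)
print(q.process_batch([1, 4, 12]))  # ({}, [])
print(q.process_batch([8, 16]))     # ({0: 3}, []): 8 is late but within the threshold
print(q.process_batch([3, 21]))     # ({}, [3]): window [0, 10) was already emitted
print(dict(q.state))                # {10: 2, 20: 1}
```

Only windows newer than the watermark stay in `state`, which is why a watermark keeps memory bounded.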
State Management
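# Use applyInPandasWithState (PySpark, Spark 3.4+) for custom state management;
# it is the Python counterpart of Scala's flatMapGroupsWithState
from typing import Iterator, Tuple
import pandas as pd
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout
def update_user_state(key: Tuple, pdfs: Iterator[pd.DataFrame], state: GroupState) -> Iterator[pd.DataFrame]:
    """Count events per user; emit the final count when the user's session times out."""
    (user_id,) = key
    if state.hasTimedOut:
        # No events for 1 hour: emit the final result and drop the state
        if state.exists:
            count, _last_event = state.get
            yield pd.DataFrame({"user_id": [user_id], "event_count": [count], "status": ["timeout"]})
        state.remove()
    else:
        # Initialize new state, or update existing state with this trigger's events
        count, last_event = state.get if state.exists else (0, None)
        batch = pd.concat(list(pdfs))
        count += len(batch)
        batch_last = batch["event_time"].max().to_pydatetime()
        last_event = batch_last if last_event is None else max(last_event, batch_last)
        state.update((count, last_event))
        # ProcessingTimeTimeout uses setTimeoutDuration (setTimeoutTimestamp is for EventTimeTimeout)
        state.setTimeoutDuration(3600000)  # 1 hour, in milliseconds
# Apply stateful operation (rows are emitted only on timeout, so append mode fits)
user_sessions = events \
.groupBy("user_id") \
.applyInPandasWithState(
    update_user_state,
    outputStructType="user_id STRING, event_count LONG, status STRING",
    stateStructType="event_count LONG, last_event TIMESTAMP",
    outputMode="append",
    timeoutConf=GroupStateTimeout.ProcessingTimeTimeout,
)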
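The engine-side contract is easy to lose among the API details. Each trigger, the state function runs once for every key that has new rows. It then runs once more, with no rows and the timed-out flag set, for each key whose processing-time timeout has passed. The following toy pure-Python model illustrates this. It is not PySpark: `ToyGroupState`, `update_user`, and `run_trigger` are hypothetical stand-ins, and times are in milliseconds.

```python
class ToyGroupState:
    """Hypothetical stand-in for GroupState: a value plus a timeout deadline."""

    def __init__(self):
        self.value = None
        self.timeout_at = None
        self.has_timed_out = False

    @property
    def exists(self):
        return self.value is not None

    def update(self, value):
        self.value = value

    def remove(self):
        self.value = None
        self.timeout_at = None

    def set_timeout_duration(self, now_ms, duration_ms):
        self.timeout_at = now_ms + duration_ms


def update_user(user_id, events, state, now_ms):
    """Count events per user; emit the final count on timeout."""
    if state.has_timed_out:
        count = state.value
        state.remove()
        return [(user_id, count, "timeout")]
    state.update((state.value or 0) + len(events))
    state.set_timeout_duration(now_ms, 3_600_000)  # 1 hour
    return []


def run_trigger(states, batch, now_ms):
    """Call the function for keys with new data, then for keys whose timeout expired."""
    output = []
    for user_id, events in batch.items():
        state = states.setdefault(user_id, ToyGroupState())
        output += update_user(user_id, events, state, now_ms)
    for user_id, state in list(states.items()):
        if user_id not in batch and state.exists and state.timeout_at < now_ms:
            state.has_timed_out = True
            output += update_user(user_id, [], state, now_ms)
            del states[user_id]
    return output


states = {}
print(run_trigger(states, {"u1": ["e1", "e2"]}, now_ms=0))                  # []
print(run_trigger(states, {"u1": ["e3"], "u2": ["e4"]}, now_ms=1_000))      # []
print(run_trigger(states, {"u2": ["e5"]}, now_ms=3_700_000))  # [('u1', 3, 'timeout')]
```

A timeout only fires when a trigger actually runs. With a long trigger interval, or no new data and no empty micro-batches, results are emitted later than the nominal timeout.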
Trigger Strategies
# Different trigger options
# Default (no trigger() call): start the next micro-batch as soon as the previous one finishes
trigger_default = events.writeStream
# Fixed interval: Process every N seconds
trigger_interval = events.writeStream.trigger(processingTime="30 seconds")
# Once: process all available data in a single micro-batch, then stop (deprecated since Spark 3.4)
trigger_once = events.writeStream.trigger(once=True)
# Available now: Process all available data and stop (Spark 3.3+)
trigger_available = events.writeStream.trigger(availableNow=True)
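# Continuous (experimental): millisecond-level latency; "1 second" is the checkpoint
# interval, and only map-like queries (select, filter) are supported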
trigger_continuous = events.writeStream.trigger(continuous="1 second")
# Choose based on latency requirements:
# - Batch-like: trigger(processingTime="1 minute")
# - Near real-time: trigger(processingTime="10 seconds")
# - Low-latency: trigger(continuous="1 second")
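A fixed processing-time trigger does not queue up missed intervals. The Structured Streaming guide documents the rule: if a micro-batch finishes early, the engine waits for the next interval boundary; if it overruns, the next batch starts as soon as it completes. The simplified model below assumes the query starts on an interval boundary at t=0. `batch_start_times` is a hypothetical name, and `interval=0` stands for the default trigger.

```python
def batch_start_times(durations, interval):
    """Start time of each micro-batch, given batch durations and a trigger interval."""
    starts, now = [], 0
    for duration in durations:
        start = now
        starts.append(start)
        # Next interval boundary after this batch started (default trigger: none)
        next_trigger = start if interval == 0 else (start // interval + 1) * interval
        # Wait for the boundary, or start right away if the batch overran it
        now = max(start + duration, next_trigger)
    return starts


print(batch_start_times([10, 45, 5], interval=30))  # [0, 30, 75]
print(batch_start_times([10, 45, 5], interval=0))   # [0, 10, 55]
```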
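Pro Tip: Use trigger(availableNow=True) for backfills. Like trigger(once=True), it processes all data available when the query starts and then stops. However, it splits the work into multiple micro-batches that respect rate limits such as maxOffsetsPerTrigger. By contrast, trigger(once=True) ignores those limits and runs a single, potentially huge batch.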
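The practical difference shows up in batch sizes. This pure-Python sketch plans a backfill of a given number of records under each trigger, assuming a single rate limit equivalent to `maxOffsetsPerTrigger`. `plan_batches` is a hypothetical helper, not a Spark API.

```python
def plan_batches(available, max_offsets_per_trigger, mode):
    """Sizes of the micro-batches a backfill of `available` records would run."""
    if mode == "once":
        return [available]  # one batch; rate limits are ignored
    if mode == "availableNow":
        sizes, remaining = [], available
        while remaining > 0:
            size = min(remaining, max_offsets_per_trigger)  # respect the rate limit
            sizes.append(size)
            remaining -= size
        return sizes
    raise ValueError(f"unknown mode: {mode}")


print(plan_batches(2_500_000, 1_000_000, "once"))          # [2500000]
print(plan_batches(2_500_000, 1_000_000, "availableNow"))  # [1000000, 1000000, 500000]
```

Bounded batches keep memory use and per-batch recovery cost predictable, which is why availableNow scales better for large backfills.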
Fault Tolerance and Checkpointing
# Checkpointing is essential for fault tolerance
query = events \
.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "hdfs://checkpoints/my-streaming-app") \
.start("hdfs://output/my-streaming-app")  # the Delta sink needs an output path (illustrative)
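# The checkpoint directory stores:
# 1. offsets/ - write-ahead log of the offset range planned for each micro-batch
# 2. commits/ - log of micro-batches that completed successfully
# 3. state/   - state store data for stateful operations
# plus metadata (the query ID) and, for some sources, source-specific logs
# Recovery after failure:
# Restart the query with the same checkpoint location; Spark resumes after the
# last committed batch and re-runs a batch that was planned but not committed.
# Some query changes (e.g. to stateful operators) are incompatible with an existing checkpoint.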
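# Monitor checkpoint size per subdirectory (offsets/, commits/, state/, ...)
# Uses the Hadoop FileSystem API through Spark's py4j gateway (spark._jvm is internal)
def monitor_checkpoint(checkpoint_path):
    jvm = spark._jvm
    path = jvm.org.apache.hadoop.fs.Path(checkpoint_path)
    # Resolve the file system that owns this path (HDFS, S3A, local, ...)
    fs = path.getFileSystem(spark._jsc.hadoopConfiguration())
    for status in fs.listStatus(path):
        # getContentSummary sums file sizes recursively; status.getLen() is 0 for directories
        size_mb = fs.getContentSummary(status.getPath()).getLength() / 1024 / 1024
        print(f"{status.getPath().getName()}: {size_mb:.2f} MB")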
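The restart rule follows from comparing the two logs. This simplified pure-Python model takes the batch IDs found in `offsets/` and `commits/` (files there are named by batch ID) and decides what a restarted query does. `recovery_plan` is a hypothetical helper that models the behavior and does not read a real checkpoint.

```python
def recovery_plan(offset_batches, commit_batches):
    """Decide what a restarted query does, given batch IDs in offsets/ and commits/."""
    if not offset_batches:
        return ("start", 0)  # fresh checkpoint: plan batch 0
    latest_planned = max(offset_batches)
    latest_committed = max(commit_batches, default=-1)
    if latest_committed < latest_planned:
        # Planned but never committed: re-run it with the same recorded offsets
        return ("rerun", latest_planned)
    return ("start", latest_planned + 1)  # last batch finished: plan the next one


print(recovery_plan([0, 1, 2], [0, 1]))     # ('rerun', 2)
print(recovery_plan([0, 1, 2], [0, 1, 2]))  # ('start', 3)
print(recovery_plan([], []))                # ('start', 0)
```

Because the re-run batch reuses the recorded offset range, it reads exactly the same input. This is what lets an idempotent sink deduplicate it.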
Performance Optimization
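# Optimize streaming performance
# - spark.sql.shuffle.partitions: for stateful queries, this is fixed by the checkpoint on first start
# - RocksDB state store (Spark 3.2+): keeps large state out of the JVM heap
# - noDataMicroBatches.enabled (default true): runs empty micro-batches so the
#   watermark advances and expired state is evicted even when no new data arrives
spark = SparkSession.builder \
.appName("StreamingOptimization") \
.config("spark.sql.shuffle.partitions", "200") \
.config("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider") \
.config("spark.sql.streaming.noDataMicroBatches.enabled", "true") \
.getOrCreate()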
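# Optimize Kafka integration
# - maxOffsetsPerTrigger caps the records read per micro-batch (across all topic partitions)
# - minPartitions splits large Kafka partitions into more Spark tasks
# - failOnDataLoss=false keeps the query running when offsets were deleted or aged out,
#   silently skipping that data; use it only if such loss is acceptable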
kafka_stream = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker1:9092") \
.option("subscribe", "events") \
.option("maxOffsetsPerTrigger", 1000000) \
.option("minPartitions", "100") \
.option("failOnDataLoss", "false") \
.load()
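# repartition() spreads downstream work over more tasks than there are Kafka
# partitions (at the cost of a shuffle); a longer trigger interval yields fewer,
# larger micro-batches and output files
batched = kafka_stream \
.repartition(200) \
.withColumn("processing_time", F.current_timestamp()) \
.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "hdfs://checkpoints/batched") \
.trigger(processingTime="60 seconds") \
.start("hdfs://output/batched")  # illustrative paths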
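`maxOffsetsPerTrigger` interacts with the trigger interval. With a 60-second trigger and a cap of 1,000,000 records, the query can ingest at most about 16,667 records per second, even if every batch finishes early. Here is a back-of-the-envelope check. It assumes the cap is the only limit, and that a batch running longer than the interval delays the next one. The function names are hypothetical.

```python
def max_sustained_rate(max_offsets_per_trigger, trigger_interval_s, batch_duration_s=0.0):
    """Upper bound on records/second with a per-batch cap and a processing-time trigger."""
    # A batch that runs longer than the interval pushes back the next batch
    effective_interval_s = max(trigger_interval_s, batch_duration_s)
    return max_offsets_per_trigger / effective_interval_s


def keeps_up(incoming_rate, max_offsets_per_trigger, trigger_interval_s, batch_duration_s=0.0):
    """True if the capped read rate can match the incoming rate, so backlog does not grow."""
    return incoming_rate <= max_sustained_rate(
        max_offsets_per_trigger, trigger_interval_s, batch_duration_s
    )


print(round(max_sustained_rate(1_000_000, 60)))                 # 16667
print(keeps_up(20_000, 1_000_000, 60))                          # False
print(keeps_up(20_000, 1_000_000, 30))                          # True
print(max_sustained_rate(1_000_000, 60, batch_duration_s=100))  # 10000.0
```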
Key Takeaway: Structured Streaming provides a unified API for batch and streaming. Use watermarking for state management, checkpointing for fault tolerance, and appropriate triggers for latency requirements.
Follow-Up Questions
- How does Structured Streaming achieve exactly-once semantics?
- Explain the difference between processing time and event time.
- How would you handle schema evolution in a streaming application?
- Describe strategies for backfilling data in a streaming pipeline.
- How does continuous processing differ from micro-batch processing?