11. Structured Streaming in PySpark
Definition: Structured Streaming
Structured Streaming is a stream processing engine built on the Spark SQL engine. It treats a live data stream as an unbounded table that is continuously appended to. It provides end-to-end exactly-once fault-tolerance guarantees (given a replayable source and an idempotent or transactional sink) and supports event-time processing via watermarks.
Definition: Trigger
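A trigger defines when processing occurs:
- Default (no trigger set): the next micro-batch starts as soon as the previous one completes.
- FixedInterval (`trigger(processingTime=...)`): micro-batches start at a user-specified interval.
- Once: all available data is processed in a single micro-batch, then the query stops.
- AvailableNow: all available data is processed, possibly across several micro-batches, then the query stops.
- Continuous (experimental): long-running tasks process records for low latency.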
Definition: Watermark
A watermark is a threshold that tracks the progress of event time in a stream. It determines how long the engine waits for late-arriving data before finalizing a window aggregation and removing its state. Watermark = max event time seen - delay threshold.
$$W = T_{\max} - D_{threshold}$$
Here,
- $W$ = current watermark value
- $T_{\max}$ = maximum event time seen across all partitions
- $D_{threshold}$ = maximum delay threshold (how late data can arrive)
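The formula (watermark = max event time seen - delay threshold) is easiest to see with concrete timestamps. The sketch below is plain Python with hypothetical event times. It is not Spark code; it only applies the arithmetic.

```python
from datetime import datetime, timedelta


def watermark(event_times, delay):
    """Watermark = latest event time seen minus the allowed delay threshold."""
    return max(event_times) - delay


# Hypothetical event times observed so far (arrival order does not matter).
seen = [
    datetime(2024, 1, 1, 12, 7),
    datetime(2024, 1, 1, 12, 21),
    datetime(2024, 1, 1, 12, 14),
]
w = watermark(seen, timedelta(minutes=10))
print(w)  # 2024-01-01 12:11:00
```

With a 10-minute threshold and a latest event at 12:21, the watermark is 12:11. Aggregation state for windows ending at or before 12:11 can then be finalized and removed.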
Micro-Batch Processing Time
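$$T_{batch} = T_{read} + T_{process} + T_{write}$$
Here,
- $T_{batch}$ = total micro-batch processing time
- $T_{read}$ = time to read input data from the source
- $T_{process}$ = time to execute query transformations
- $T_{write}$ = time to write output to the sink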
End-to-End Latency
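$$L_{e2e} = N \cdot T_{batch} + T_{network} + T_{commit}$$
Here,
- $L_{e2e}$ = end-to-end latency from event to output
- $T_{batch}$ = single micro-batch processing time
- $N$ = number of batches to wait
- $T_{network}$ = network latency for source reads
- $T_{commit}$ = sink commit latency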
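Two formulas describe timing here. Micro-batch time is read time + query execution time + sink write time. End-to-end latency is batches waited × batch time + source network latency + sink commit latency. A worked example with hypothetical timings (in milliseconds, so the arithmetic stays exact) shows how they combine.

The sketch assumes N = 2 as a pessimistic case. An event that arrives just after a batch has started waits for that batch to finish and is then processed in the next one.

```python
def micro_batch_time_ms(t_read: int, t_process: int, t_write: int) -> int:
    """Micro-batch time = read time + query execution time + sink write time."""
    return t_read + t_process + t_write


def end_to_end_latency_ms(n_batches: int, t_batch: int, t_network: int, t_commit: int) -> int:
    """Latency = batches waited * batch time + source network latency + sink commit latency."""
    return n_batches * t_batch + t_network + t_commit


# Hypothetical timings in milliseconds.
t_batch_ms = micro_batch_time_ms(t_read=200, t_process=1500, t_write=300)
latency_ms = end_to_end_latency_ms(n_batches=2, t_batch=t_batch_ms, t_network=50, t_commit=100)
print(t_batch_ms, latency_ms)  # 2000 4150
```

With these numbers, query execution dominates the batch time. Because batch time is multiplied by the number of batches waited, shortening execution lowers latency more than trimming network or commit time does.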
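Structured Streaming provides exactly-once semantics by recording the source offsets, state updates, and sink commits of each micro-batch in the checkpoint. This requires a replayable source and an idempotent or transactional sink. Some sinks, such as Kafka, provide only at-least-once delivery, so a micro-batch replayed after a failure can write duplicate records. In that case, downstream consumers must deduplicate, for example on a unique event ID plus event time, or the writes must be idempotent.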
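For very low latency (below roughly 100 ms), use the experimental continuous mode, `trigger(continuous="1 second")`. Here the interval is the checkpoint interval, not a batch interval. Continuous mode has several restrictions:
- It supports only map-like operations (projections and selections), with no aggregations or other stateful operations.
- It works with a limited set of sources and sinks (Kafka, rate, memory, console).
- It provides at-least-once rather than exactly-once guarantees.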
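Theorem: Late Data Handling
With a watermark delay threshold of $D_{threshold}$, any event whose event time is less than $D_{threshold}$ behind the maximum event time seen so far is guaranteed to be included in the aggregation for its own event-time window. The guarantee holds in one direction only: an event delayed by more than $D_{threshold}$ is not guaranteed to be dropped. It is ignored once the state for its window has been finalized and removed, which happens when the window's end time falls behind the watermark.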
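The late-data rule can be made concrete with a simplified pure-Python model of a tumbling-window count. It is not Spark's implementation. It assumes integer event times (minutes after 12:00), a single partition, and Spark's ordering of steps:
1. Each batch is filtered with the watermark computed at the end of the previous batch.
2. The watermark then advances.
3. Windows whose end is at or behind the watermark are finalized and evicted, as in append mode.

```python
from typing import Dict, List, Optional, Tuple


def simulate_windowed_count(
    batches: List[List[int]], window_size: int, delay: int
) -> Tuple[Dict[int, int], List[int]]:
    """Simplified tumbling-window count with a watermark (not Spark's code).

    Returns the finalized windows (start -> count) and the events ignored as too late.
    """
    state: Dict[int, int] = {}       # open windows: start -> running count
    finalized: Dict[int, int] = {}   # windows emitted once the watermark passed their end
    dropped: List[int] = []
    max_seen: Optional[int] = None
    watermark: Optional[int] = None  # no watermark before the first batch

    for batch in batches:
        # 1. Assign events to windows, using the watermark from the previous batch.
        for t in batch:
            start = t - t % window_size
            if watermark is not None and start + window_size <= watermark:
                dropped.append(t)  # its window was already finalized and evicted
            else:
                state[start] = state.get(start, 0) + 1
        # 2. Advance the watermark: max event time seen minus the delay (never decreases).
        if batch:
            max_seen = max(batch) if max_seen is None else max(max_seen, max(batch))
            watermark = max_seen - delay
        # 3. Finalize and evict windows whose end is at or behind the watermark.
        if watermark is not None:
            for start in sorted(state):
                if start + window_size <= watermark:
                    finalized[start] = state.pop(start)
    return finalized, dropped


# 10-minute windows, 10-minute delay threshold, three micro-batches.
finalized, dropped = simulate_windowed_count([[7, 14], [21, 4], [3, 12]], 10, 10)
print(finalized, dropped)  # {0: 2} [3]
```

The event at minute 4 lags the latest event (14) by exactly the threshold, so it still lands in the [0, 10) window. By the third batch the watermark is 11, that window has been finalized, and the event at minute 3 is ignored.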
- Structured Streaming = unbounded table with exactly-once guarantees
- Triggers: default (next micro-batch as soon as the previous one finishes), FixedInterval, Once, AvailableNow, Continuous (experimental)
- Watermark = max(event_time) - delay_threshold; controls late data handling
- Output modes: Append (new rows only), Complete (full result table), Update (changed rows)
- Checkpointing is required for fault tolerance and exactly-once processing
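- File sources require an explicit schema by default; `spark.sql.streaming.schemaInference` can enable inference, but explicit schemas avoid the inference cost and keep the schema consistent across restarts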
[Figure: Micro-Batch Processing Flow (streaming architecture diagram)]
Detailed Explanation
What is Structured Streaming?
- A scalable, fault-tolerant stream processing engine built on the Spark SQL engine
- Treats streaming data as an unbounded table being continuously appended
- Uses the same DataFrame/Dataset API for both batch and streaming queries
- Lets real-time pipelines reuse the same query logic as batch jobs instead of requiring a separate streaming system
Core Abstractions
- Input stream: continuously receives data from sources (Kafka, files, sockets)
- Result table: updated as new data arrives; represents the query output
- Micro-batch or Continuous processing depending on trigger configuration
- Exactly-once guarantees via checkpointing and write-ahead logs
Trigger Mechanisms
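| Trigger Type | Behavior | Best For |
|---|---|---|
| Default (unspecified) | Starts the next micro-batch as soon as the previous one completes | Maximum throughput |
| Fixed Interval (`processingTime`) | Starts micro-batches at a user-specified interval | Predictable batch timing |
| Once | Processes all available data in a single micro-batch, then stops (deprecated in recent releases in favor of AvailableNow) | Batch-style scheduled runs |
| AvailableNow | Processes all available data, possibly in several micro-batches that respect source rate limits, then stops | Batch-style scheduled runs |
| Continuous (experimental) | Long-running tasks process records as they arrive (~1ms latency) | Ultra-low-latency, map-only queries |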
Watermarking
- Defines a threshold beyond which events are considered too late
- Prevents unbounded state growth for event-time windowing
- Lets the engine remove state for windows that have fallen behind the watermark, keeping state size bounded
- Formula:
Watermark = max(event_time) - delay_threshold
Checkpoint Mechanism
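- Persists query progress: the offsets planned for each micro-batch, a commit log of completed batches, and the state store data of stateful operators
- Enables fault tolerance: on restart, the query resumes from the last committed batch recorded in the checkpoint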
- Ensures exactly-once semantics across failures
Output Modes
| Mode | Behavior | Use Case |
|---|---|---|
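| Append | Outputs only rows added since the last trigger; each row is emitted once and never changed | Queries without aggregation; aggregations with a watermark (rows emitted once finalized) |
| Complete | Outputs the entire result table on every trigger | Aggregation queries with a small full result (all state is kept) |
| Update | Outputs only rows that changed since the last trigger | Aggregations written to sinks that can upsert |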
Key Takeaway: The choice of output mode depends on the operation type and the sink's capabilities.
Performance Tuning Considerations
- Partitioning of the source data
- Shuffle parallelism (`spark.sql.shuffle.partitions`), which cannot be changed between restarts from the same checkpoint
- State store configuration for stateful operations
- Memory management for large state
- Monitor processing times, input rates, and state sizes
Advanced Features
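- Mixed SQL and DataFrame queries: register a streaming DataFrame as a temporary view and query it with `spark.sql`
- Streaming joins: join streaming data with static (batch) data or with other streams
- Together, these extend pipelines beyond per-record transformations to enrichment and correlation across data sources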
Key Concepts Table
| Concept | Description | Use Case |
|---|---|---|
| Micro-batch Processing | Processes data in small batches at intervals | High throughput, moderate latency |
| Continuous Processing | Processes records with long-running tasks as they arrive (experimental) | Ultra-low latency (~1ms), map-only queries |
| Trigger | Determines when a new batch is processed | Control batch timing and frequency |
| Watermark | Threshold for handling late-arriving data | Event-time windowing operations |
| Output Mode | How results are written to the sink | Append, Complete, Update modes |
| Checkpoint | Persists query progress for fault tolerance | Exactly-once processing guarantees |
| State Store | Stores state for stateful operations | Window aggregations, joins |
| Event Time | Timestamp embedded in the data | Time-based windowing and ordering |
| Processing Time | Time when the record is processed | Monitoring and debugging |
| Late Data | Events whose event time is behind the current watermark | Requires special handling |
Code Examples
Basic Structured Streaming Setup
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

# Initialize the Spark session.
# spark.sql.streaming.checkpointLocation sets a default root directory for
# query checkpoints when a query does not set its own checkpointLocation.
spark = SparkSession.builder \
    .appName("StructuredStreamingExample") \
    .config("spark.sql.streaming.checkpointLocation", "/checkpoint/path") \
    .getOrCreate()

# Read from a socket stream (testing only: the socket source cannot replay
# data, so it offers no fault tolerance)
# Parameter: format("socket") -> source type
# Parameter: option("host", "localhost") -> socket host
# Parameter: option("port", 9999) -> socket port
lines = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Split each line into words (one row per word)
words = lines.select(
    explode(
        split(lines.value, " ")
    ).alias("word")
)

# Running count per word across all data received so far
word_counts = words.groupBy("word").count()

# Start the streaming query
# Parameter: outputMode("complete") -> output the full result table each trigger
# Parameter: format("console") -> sink type
# Parameter: trigger(processingTime="10 seconds") -> batch interval
query = word_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime="10 seconds") \
    .start()

query.awaitTermination()
```
Windowed Aggregation with Watermark
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, count, from_json, window

spark = SparkSession.builder \
    .appName("WindowedAggregation") \
    .getOrCreate()

# Read from Kafka (requires the spark-sql-kafka-0-10 connector on the classpath)
lines = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .load()

# Parse the JSON payload carried in the Kafka value column
events = lines.select(
    col("value").cast("string").alias("json")
).select(
    from_json(col("json"), "timestamp TIMESTAMP, value DOUBLE").alias("data")
).select("data.*")

# Windowed aggregation with watermark
# Parameter: withWatermark("timestamp", "10 minutes") -> delay threshold
# Parameter: window("timestamp", "5 minutes", "1 minute") -> window size, slide interval
# Group by window only: grouping by the raw DOUBLE value would create one
# group per distinct measurement.
windowed_stats = events \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window("timestamp", "5 minutes", "1 minute")
    ).agg(
        count("*").alias("event_count"),
        avg("value").alias("avg_value")
    )

# Write to console; update mode emits only windows whose aggregates changed
query = windowed_stats.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()
```
Streaming Join with Batch Data
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json

spark = SparkSession.builder \
    .appName("StreamingJoin") \
    .getOrCreate()

# Read streaming events from Kafka (requires the spark-sql-kafka-0-10 connector)
stream_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "user_events") \
    .load() \
    .selectExpr("CAST(value AS STRING) as event_json") \
    .select(from_json(col("event_json"), "user_id INT, action STRING, timestamp TIMESTAMP").alias("data")) \
    .select("data.*")

# Read static (batch) reference data, assumed to contain user_id and profile_name
batch_df = spark.read.parquet("/path/to/user_profiles")

# Stream-static join: each micro-batch of events is joined with the profile table
joined_df = stream_df.alias("s").join(
    batch_df.alias("b"),
    col("s.user_id") == col("b.user_id"),
    "inner"
).select("s.user_id", "s.action", "b.profile_name", "s.timestamp")

# Write to a Delta sink (requires the delta-spark package)
# Parameter: checkpointLocation -> required for fault tolerance
query = joined_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoint/join") \
    .start("/output/path")

query.awaitTermination()
```
Advanced: Multi-Query Streaming
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, from_json, sum as sum_, window

spark = SparkSession.builder \
    .appName("MultiQueryStreaming") \
    .getOrCreate()

# Single source definition; each started query reads the topic independently
source_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .load()

# Parse the JSON payload; the Kafka record timestamp is used as event time
parsed_df = source_df.select(
    col("value").cast("string").alias("json"),
    col("timestamp").alias("event_time")
).select(
    from_json(col("json"), "user_id INT, action STRING, amount DOUBLE").alias("data"),
    "event_time"
).select("data.*", "event_time")

# Query 1: Real-time aggregation per 1-minute window and action
aggregated = parsed_df \
    .withWatermark("event_time", "5 minutes") \
    .groupBy(
        window("event_time", "1 minute"),
        "action"
    ).agg(
        sum_("amount").alias("total_amount"),
        count("*").alias("event_count")
    )

# Query 2: Detailed event log
detailed_log = parsed_df.select(
    "user_id", "action", "amount", "event_time"
)

# Start multiple queries
query1 = aggregated.writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

query2 = detailed_log.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "/checkpoint/detailed") \
    .start("/detailed_log_path")

# Block until any one of the active queries terminates or fails
spark.streams.awaitAnyTermination()
```
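Performance Metrics
The figures below are indicative only; actual numbers depend heavily on the workload, cluster size, source, and sink.
| Metric | Micro-batch | Continuous | Notes |
|---|---|---|---|
| Latency | 100ms-10s | ~1ms | Micro-batch latency depends on the batch interval and batch duration |
| Throughput | 100K-1M records/sec | 10K-100K records/sec | Micro-batch generally achieves higher throughput |
| Fault Recovery | seconds-minutes | seconds | Checkpoint-based recovery |
| State Management | Supported (aggregations, joins, deduplication) | Not supported | Continuous mode allows only map-like operations |
| Fault-tolerance guarantee | Exactly-once | At-least-once | End-to-end exactly-once also needs a replayable source and idempotent sink |
| Resource Usage | Cores used per micro-batch | One core held per source partition for the query's lifetime | Continuous tasks run for as long as the query runs |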
π Best Practices
- Always set a checkpoint location: critical for fault tolerance and exactly-once processing
- Use the appropriate output mode: Append for queries without aggregation or with watermarked aggregations, Complete for small full result tables, Update for sinks that can upsert
- Configure watermarks: essential for event-time windowing and for bounding state
- Monitor streaming metrics: track processing times, input rates, and state sizes
- Tune the micro-batch interval: balance latency against throughput requirements
- Prefer explicit schemas: schema inference for file sources is off by default, and explicit schemas keep the query's schema consistent across restarts
- Handle late data explicitly: set watermark thresholds based on observed data delays
- Optimize the state store: configure RocksDB or other state store providers for large state
- Use multiple queries: separate independent processing logic, keeping in mind that each query reads the source separately
- Test with realistic data: validate performance with production-like data volumes
Related Topics
- 12-state-management.mdx: Advanced stateful operations and checkpointing
- 13-window-operations.mdx: Detailed windowing strategies and implementations
- 14-merge-upsert.mdx: Delta Lake merge operations for streaming data
- 15-data-quality.mdx: Data validation in streaming pipelines
See Also
- 06-joins-optimization.mdx: Join strategies for stream-stream joins
- 08-caching-persistence.mdx: Caching in streaming applications
- Kafka Streams (kafka/03): Kafka integration with Structured Streaming
- Data Engineering Streaming (data-engineering/022): End-to-end streaming pipeline architecture