Spark Structured Streaming: Real-Time Processing at Scale
Spark Structured Streaming is a scalable, fault-tolerant stream processing engine built on the Spark SQL engine.
Why Structured Streaming?
Key Abstraction:
- Live data stream as a continuously appended unbounded table
- Express stream processing as DataFrame/Dataset operations
- Same code for batch and streaming
Advantages over Spark Streaming (DStream):
- DataFrame abstraction: the Catalyst optimizer enables SQL-level optimizations
- Incremental processing: the engine handles state management
- Fault tolerance: built-in recovery mechanisms
- Unified model: same code for batch and streaming
Key Insight: You write largely the same DataFrame code for both batch and streaming (readStream/writeStream replace read/write); the engine handles incremental processing, fault tolerance, and state management.
Structured Streaming Micro-Batch Architecture
[Figures: watermark diagram; architecture diagram]
Structured Streaming is a stream processing engine that models incoming data as an unbounded input table. Each new data event appends a new row to the input table. A streaming query continuously computes a result table over the input and outputs the result to a sink. The engine handles incremental processing, fault tolerance via write-ahead logs, and state management for stateful operations.
A watermark is a moving event-time threshold that defines how late an event can arrive and still be included in aggregation results. Formally, for a stream with event timestamps t_1, t_2, ..., let T be the maximum event time the engine has seen so far; the watermark is W(T) = T - delay_threshold. Events with timestamp < W(T) are considered too late and may be dropped. Because no accepted event can fall into a window that ends before the watermark, the engine can finalize those windows and clean up their state.
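The watermark rule can be illustrated with a small pure-Python model. This is a sketch, not Spark's implementation: it reads T in the formula as the maximum event time observed so far, and it updates the watermark after every event, whereas Spark advances it between micro-batches. The class name `WatermarkTracker` and its `accept` method are hypothetical.

```python
from datetime import datetime, timedelta
from typing import Optional


class WatermarkTracker:
    """Toy model of an event-time watermark (hypothetical helper, not a Spark API)."""

    def __init__(self, delay: timedelta) -> None:
        self.delay = delay
        self.max_event_time: Optional[datetime] = None

    @property
    def watermark(self) -> Optional[datetime]:
        # W = max event time seen so far - delay threshold
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.delay

    def accept(self, event_time: datetime) -> bool:
        """Return False if the event is older than the watermark, else record it."""
        watermark = self.watermark
        if watermark is not None and event_time < watermark:
            return False  # too late: a real engine may drop this event
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        return True
```

With a 10-minute delay, an event stamped 12:20 moves the watermark to 12:10, so a later-arriving event stamped 12:05 is rejected while one stamped 12:10 is still accepted.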
Output modes determine what data is written to the sink on each trigger: (1) Append mode: only rows added to the result table since the last trigger are written, and each row is written once. Suitable for stateless queries, and for aggregations when a watermark lets the engine finalize windows. (2) Complete mode: the entire result table is written on every trigger. Suitable for aggregations where the full result is needed. (3) Update mode: only rows that were updated since the last trigger are written. Suitable for incremental aggregations.
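To make the difference between the modes concrete, here is a toy model that compares two snapshots of a result table and returns what each mode would send to the sink. It is a simplification under stated assumptions: the result table is a plain dict, and append is modeled as "keys that did not exist before", which ignores the watermark-driven finalization Spark uses for aggregations. The function name `rows_to_emit` is hypothetical.

```python
from typing import Dict


def rows_to_emit(mode: str, previous: Dict[str, int], current: Dict[str, int]) -> Dict[str, int]:
    """Return the rows a sink would receive for one trigger in this toy model."""
    if mode == "complete":
        # Whole result table, every trigger
        return dict(current)
    if mode == "update":
        # Only rows that are new or whose value changed since the last trigger
        return {k: v for k, v in current.items() if previous.get(k) != v}
    if mode == "append":
        # Only rows that did not exist before (assumes existing rows never change)
        return {k: v for k, v in current.items() if k not in previous}
    raise ValueError(f"unknown output mode: {mode}")
```

For a running count that goes from `{"a": 1, "b": 2}` to `{"a": 1, "b": 3, "c": 1}`, complete emits all three rows, update emits `b` and `c`, and append emits only `c`.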
State management in Structured Streaming tracks intermediate results for stateful operations (aggregations, joins, deduplication). State is stored in a versioned state store (HDFS-backed by default in open-source Spark; a RocksDB provider is available since Spark 3.2) and is made fault-tolerant by checkpointing state changes alongside the write-ahead log of offsets. State is cleaned up automatically when the watermark advances past the window end time. The spark.sql.streaming.stateStore.providerClass configuration selects the state store provider.
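A minimal sketch of selecting the state store provider explicitly, so the choice does not depend on version or platform defaults. The configuration key and the RocksDB provider class name are the ones documented for Spark 3.2 and later; the helper names `state_store_conf` and `apply_conf` are hypothetical.

```python
from typing import Dict

# Provider class shipped with Spark 3.2+ for RocksDB-backed state
ROCKSDB_PROVIDER = (
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
)


def state_store_conf(use_rocksdb: bool) -> Dict[str, str]:
    """Return Spark settings that pin the state store provider (empty = engine default)."""
    conf: Dict[str, str] = {}
    if use_rocksdb:
        conf["spark.sql.streaming.stateStore.providerClass"] = ROCKSDB_PROVIDER
    return conf


def apply_conf(builder, conf: Dict[str, str]):
    """Apply settings to any builder exposing .config(key, value), such as SparkSession.builder."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder
```

Typical use would be `apply_conf(SparkSession.builder.appName("app"), state_store_conf(True)).getOrCreate()`. The provider should be chosen before the first run of a query, since state written under one provider lives in that query's checkpoint.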
Micro-Batch Latency
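For a micro-batch trigger interval of T_interval and a per-batch processing time of T_processing, a row that arrives just before a trigger fires waits almost no time, so the minimum latency is Latency_min ≈ T_processing. A row that arrives just after a trigger fires waits a full interval for the next one, so the maximum latency is Latency_max = T_interval + T_processing. The data arrival rate R (rows/second) affects latency only through T_processing. For continuous processing mode: Latency ≈ T_processing (sub-second).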
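The worst-case bound can be turned into a quick sanity check. This is a back-of-the-envelope sketch with hypothetical function names; it also checks whether a batch finishes within the trigger interval, because when processing takes longer than the interval the next batch starts late and the backlog grows.

```python
def worst_case_latency_s(trigger_interval_s: float, processing_time_s: float) -> float:
    """Latency_max = T_interval + T_processing, for a row that just missed a trigger."""
    return trigger_interval_s + processing_time_s


def keeps_up(trigger_interval_s: float, processing_time_s: float) -> bool:
    """True if each micro-batch finishes before the next trigger is due."""
    return processing_time_s <= trigger_interval_s
```

For example, a 30-second trigger with 4 seconds of processing gives a worst case of 34 seconds, while 45 seconds of processing on the same trigger means the query cannot keep up.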
State Store Size
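For a windowed aggregation with window size W, grace period G (the watermark delay), and event rate R, the state store must cover event times in the range [current_time - W - G, current_time]. When state is kept per event (as in deduplication or stream-stream joins), State_size ≈ R * (W + G) * avg_event_size; a windowed aggregation stores one row per group per open window, so for it this figure is an upper bound. To prevent unbounded state growth, set a watermark delay: withWatermark("event_time", "10 minutes").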
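As a worked example of the state size formula, the sketch below plugs in hypothetical numbers (1,000 events per second, a 5-minute window, a 10-minute delay, 200 bytes per event). The function name is an assumption, and the result is best read as a rough capacity estimate rather than a measurement.

```python
def estimate_state_bytes(
    event_rate_per_s: float,
    window_s: float,
    delay_s: float,
    avg_event_bytes: float,
) -> float:
    """State_size = R * (W + G) * avg_event_size, with W and G in seconds."""
    return event_rate_per_s * (window_s + delay_s) * avg_event_bytes


# Hypothetical workload: 1,000 events/s, W = 300 s, G = 600 s, 200 bytes per event
estimate = estimate_state_bytes(1_000, 300, 600, 200)
```

Here the estimate is 1,000 * 900 * 200 = 180,000,000 bytes, or about 180 MB, and doubling the delay to 20 minutes raises it to 300 MB.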
Spark Structured Streaming provides exactly-once semantics for end-to-end processing when: (1) the source supports replay (Kafka, file-based), (2) the sink supports transactions or idempotent writes, (3) checkpointing is enabled for fault tolerance. The engine records the offsets of each micro-batch in the write-ahead log before processing it and, on failure, reprocesses from the last committed offsets; the transactional or idempotent sink then ensures each record affects the output exactly once. A sink without either property, such as the built-in Kafka sink, gives at-least-once delivery.
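Condition (2) is the part application code usually has to supply. The sketch below shows one common idea in plain Python: remember which batch IDs were already committed and ignore replays. It assumes the batch ID is stable across retries, which is what `foreachBatch` passes as its second argument; the `IdempotentSink` class is hypothetical, and a real sink would commit the rows and the batch ID in a single transaction.

```python
from typing import List, Set


class IdempotentSink:
    """In-memory stand-in for a sink that skips batches it has already committed."""

    def __init__(self) -> None:
        self.committed_batches: Set[int] = set()
        self.rows: List[dict] = []

    def write_batch(self, batch_id: int, rows: List[dict]) -> bool:
        """Write rows once per batch_id; return False when a replay is ignored."""
        if batch_id in self.committed_batches:
            return False  # replay after a failure: output already contains this batch
        self.rows.extend(rows)
        self.committed_batches.add(batch_id)
        return True
```

If batch 0 is replayed after a restart, the second `write_batch(0, ...)` call returns `False` and the output is unchanged, which is the behavior exactly-once delivery relies on.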
A watermark delay of W minutes guarantees that every event whose event time is within W minutes of the latest event time seen so far will be included in aggregation results. Events that lag further behind may be dropped. The trade-off: larger W -> more complete results, more state, and later output in append mode; smaller W -> less state and earlier results, with potential data loss for late events. Formally: P(included) = P(arrival_delay <= W), where arrival_delay is how far an event's timestamp lags the latest event time seen when it arrives.
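One way to choose W is to measure arrival delays on real traffic and read the value off their empirical distribution. The sketch below does this with the standard library only; the delay values are made up for illustration, and the function names are hypothetical.

```python
import math
from typing import Sequence


def inclusion_probability(delays_s: Sequence[float], watermark_s: float) -> float:
    """Empirical P(arrival_delay <= W) over a sample of observed delays."""
    return sum(d <= watermark_s for d in delays_s) / len(delays_s)


def watermark_for_target(delays_s: Sequence[float], target: float) -> float:
    """Smallest observed delay W such that at least `target` of the sample is included."""
    ordered = sorted(delays_s)
    index = max(math.ceil(target * len(ordered)) - 1, 0)
    return ordered[index]


# Hypothetical sample of arrival delays, in seconds
observed_delays_s = [5, 20, 45, 60, 120, 300, 540, 700, 30, 10]
```

On this sample a 600-second watermark includes 9 of 10 events, and a 90% inclusion target is met by a 540-second watermark.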
Key Concepts
| Concept | Description | API |
|---|---|---|
| Streaming DataFrame | Continuously updated unbounded table | spark.readStream.format("kafka") |
| Micro-Batch Trigger | Process data in periodic batches | .trigger(processingTime="30 seconds") |
| Continuous Trigger | Sub-second latency processing (experimental) | .trigger(continuous="1 second") |
| Watermark | Threshold for late event tolerance | .withWatermark("event_time", "10 minutes") |
| Output Mode | What data to write to sink | .outputMode("append"), .outputMode("complete"), .outputMode("update") |
| Checkpoint | Fault tolerance via offset tracking | .option("checkpointLocation", "hdfs:///checkpoint") |
| State Store | Persistent state for stateful operations | HDFS-backed (default), RocksDB (opt-in, Spark 3.2+) |
| Foreach | Custom sink logic per row | .foreach(lambda row: ...) |
| ForeachBatch | Custom sink logic per micro-batch | .foreachBatch(lambda df, epoch: ...) |
| Window | Time-based grouping of events | window("event_time", "5 minutes") |
| Session Window | Activity-gap-based windowing (Spark 3.2+) | session_window("event_time", "30 minutes") |
| Drop Duplicates | Remove duplicate events | .dropDuplicates(["event_id"]) |
| Stream-Static Join | Join stream with batch DataFrame | stream_df.join(static_df, "key") |
| Stream-Stream Join | Join two streaming DataFrames | stream1.join(stream2, "key") |
| Caching | Reuse one micro-batch for several writes (a streaming DataFrame itself cannot be cached) | batch_df.persist() inside foreachBatch |
| Explain | View query execution plan | .explain() |
- Read from Kafka: Configure spark.readStream.format("kafka") with subscribe, startingOffsets, and security options.
- Parse and validate: Apply schema to raw bytes, validate required fields, and handle malformed records by dropping them (for example with dropna()) or routing them to a dead-letter topic.
- Add event timestamp: Extract event time from payload for watermarking and windowing.
- Define watermark: Use .withWatermark("event_time", "10 minutes") to handle late-arriving data.
- Apply transformations: Use standard DataFrame operations: select(), filter(), groupBy(), window().
- Choose output mode: Append for filtered streams, Complete for full aggregations, Update for incremental aggregations.
- Write to sink: Use .writeStream.format() with appropriate options for the target system.
- Configure checkpoint: Set checkpointLocation for fault tolerance. Use HDFS or S3 for durability.
- Set trigger interval: Use .trigger(processingTime="30 seconds") for micro-batch or .trigger(continuous="1 second") for low-latency.
- Monitor the streaming query: Use query.lastProgress, query.status, and Spark UI for metrics.
Production Code
Kafka-to-Sink Streaming Pipeline with Watermarking
```python
import logging
import time

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

logger = logging.getLogger(__name__)


def create_streaming_session() -> SparkSession:
    """Create SparkSession optimized for Structured Streaming."""
    return (
        SparkSession.builder
        .appName("KafkaStreamingPipeline")
        # Only affects file sources; the Kafka source has a fixed schema
        .config("spark.sql.streaming.schemaInference", "true")
        .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")
        .config("spark.sql.shuffle.partitions", "8")
        # DStream-era setting; Structured Streaming queries are stopped with query.stop()
        .config("spark.streaming.stopGracefullyOnShutdown", "true")
        .config("spark.sql.streaming.metricsEnabled", "true")
        .getOrCreate()
    )


def build_streaming_pipeline(spark: SparkSession) -> None:
    """Build a production streaming pipeline with watermarking and aggregation."""
    # Define event schema
    event_schema = StructType([
        StructField("event_id", StringType(), False),
        StructField("user_id", StringType(), False),
        StructField("event_type", StringType(), False),
        StructField("amount", DoubleType(), True),
        StructField("event_time", TimestampType(), False),
    ])

    # Read from Kafka
    # Parameters explained:
    #   format("kafka")          - Use Kafka as the streaming source
    #   kafka.bootstrap.servers  - Comma-separated list of Kafka broker addresses
    #   subscribe                - Kafka topic(s) to consume from (comma-separated for multiple)
    #   startingOffsets          - "latest" (new data only) or "earliest" (replay all)
    #   kafka.security.protocol  - Security protocol; SASL_SSL also needs the
    #                              kafka.sasl.* settings for your cluster (not shown)
    #   maxOffsetsPerTrigger     - Cap on records per micro-batch (prevents OOM)
    #   failOnDataLoss           - Source option: keep running if offsets were lost or aged out
    raw_stream = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka-broker-1:9092,kafka-broker-2:9092")
        .option("subscribe", "user-events")
        .option("startingOffsets", "latest")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("maxOffsetsPerTrigger", 100000)
        .option("failOnDataLoss", "false")
        .load()
    )

    # Parse JSON payload; from_json yields nulls for malformed records,
    # which the filters below remove
    parsed_stream = (
        raw_stream
        .select(
            F.col("key").cast("string").alias("kafka_key"),
            F.from_json(F.col("value").cast("string"), event_schema).alias("data"),
            F.col("timestamp").alias("kafka_timestamp"),
        )
        .select("kafka_key", "data.*")
        .filter(F.col("event_id").isNotNull())
        .filter(F.col("amount").isNotNull())
    )

    # Add watermark for late data tolerance (10 minutes)
    # Events more than 10 minutes behind the latest event time seen may be dropped
    watermarked_stream = parsed_stream.withWatermark("event_time", "10 minutes")

    # Windowed aggregation: 5-minute tumbling windows
    # Parameters:
    #   F.window("event_time", "5 minutes") - Tumbling window of 5 minutes
    #   "event_type"                        - Group by event type within each window
    #   .agg()                              - Aggregation functions applied per window per group
    windowed_aggregation = (
        watermarked_stream
        .groupBy(
            F.window("event_time", "5 minutes"),
            "event_type",
        )
        .agg(
            F.count("*").alias("event_count"),
            F.sum("amount").alias("total_amount"),
            F.avg("amount").alias("avg_amount"),
            # Exact distinct counts are not supported on streaming DataFrames
            F.approx_count_distinct("user_id").alias("unique_users"),
        )
        .select(
            F.col("window.start").alias("window_start"),
            F.col("window.end").alias("window_end"),
            "event_type",
            "event_count",
            "total_amount",
            "avg_amount",
            "unique_users",
        )
    )

    # The Kafka sink expects a string or binary "value" column (and optional "key"),
    # so serialize each result row to JSON
    kafka_output = windowed_aggregation.select(
        F.col("event_type").alias("key"),
        F.to_json(F.struct(*windowed_aggregation.columns)).alias("value"),
    )

    # Write to Kafka sink with checkpointing
    query = (
        kafka_output
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka-broker-1:9092")
        .option("topic", "event-aggregations")
        .option("checkpointLocation", "s3://checkpoints/event-aggregation")
        .trigger(processingTime="30 seconds")
        .outputMode("update")
        .start()
    )
    logger.info(f"Streaming query started: {query.id}")

    # Monitor the streaming query
    while query.isActive:
        progress = query.lastProgress
        if progress:
            # stateOperators is a list with one entry per stateful operator
            state_rows = sum(op.get("numRowsTotal", 0) for op in progress.get("stateOperators", []))
            logger.info(
                f"Batch {progress['batchId']}: "
                f"input_rows={progress['numInputRows']}, "
                f"processed_rows={progress.get('processedRowsPerSecond', 0.0):.0f}/s, "
                f"state_rows={state_rows}"
            )
        time.sleep(30)

    # Re-raises the failure if the query stopped because of an error
    query.awaitTermination()
```
Stream-Stream Join with a Time-Range Condition
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType


def build_stream_stream_join(spark: SparkSession) -> None:
    """Join two streaming DataFrames on user_id with a 30-minute time-range condition."""
    # Click stream schema
    click_schema = StructType([
        StructField("click_id", StringType(), False),
        StructField("user_id", StringType(), False),
        StructField("page_url", StringType(), False),
        StructField("click_time", TimestampType(), False),
    ])

    # Purchase stream schema
    purchase_schema = StructType([
        StructField("purchase_id", StringType(), False),
        StructField("user_id", StringType(), False),
        StructField("product_id", StringType(), False),
        StructField("amount", DoubleType(), False),
        StructField("purchase_time", TimestampType(), False),
    ])

    # Read click stream; the watermark bounds how long click state is kept
    click_stream = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka-broker-1:9092")
        .option("subscribe", "click-events")
        .load()
        .select(F.from_json(F.col("value").cast("string"), click_schema).alias("data"))
        .select("data.*")
        .withWatermark("click_time", "30 minutes")
    )

    # Read purchase stream
    purchase_stream = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka-broker-1:9092")
        .option("subscribe", "purchase-events")
        .load()
        .select(F.from_json(F.col("value").cast("string"), purchase_schema).alias("data"))
        .select("data.*")
        .withWatermark("purchase_time", "30 minutes")
    )

    # Stream-stream join: clicks within 30 minutes before purchase.
    # A left outer join needs watermarks on both sides plus a time-range
    # condition so the engine knows when an unmatched click can be emitted.
    joined_stream = (
        click_stream
        .join(
            purchase_stream,
            (click_stream.user_id == purchase_stream.user_id) &
            (click_stream.click_time <= purchase_stream.purchase_time) &
            (click_stream.click_time >= F.expr("purchase_time - INTERVAL 30 MINUTES")),
            "leftOuter",
        )
        .select(
            click_stream.click_id,
            click_stream.user_id,
            click_stream.page_url,
            purchase_stream.purchase_id,
            purchase_stream.product_id,
            purchase_stream.amount,
            click_stream.click_time,
            purchase_stream.purchase_time,
        )
    )

    # Write to console for testing
    query = (
        joined_stream
        .writeStream
        .format("console")
        .option("truncate", "false")
        .option("checkpointLocation", "s3://checkpoints/click-purchase-join")
        .trigger(processingTime="10 seconds")
        .outputMode("append")
        .start()
    )
    query.awaitTermination()
```
Watermark Tuning: The watermark delay threshold determines the trade-off between completeness and state size. For a 5-minute window with a 10-minute watermark: (1) events up to 10 minutes behind the latest event time seen are still included, (2) state for a window is retained until the watermark passes the window end, that is, until the latest event time seen is more than 10 minutes past the end of the window, (3) the window's state is then cleaned up and, in append mode, its final result is emitted. If your data has events arriving more than 10 minutes late, increase the watermark but be aware of increased state store size.
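The timing in this example can be checked with a few lines of date arithmetic. The sketch below is a model, not Spark code: it assumes tumbling windows aligned to the Unix epoch and uses the documented retention rule that a window is finished once (max event time seen - delay) is greater than the window end. Both function names are hypothetical.

```python
from datetime import datetime, timedelta
from typing import Tuple

EPOCH = datetime(1970, 1, 1)


def window_bounds(event_time: datetime, size: timedelta) -> Tuple[datetime, datetime]:
    """Return the [start, end) tumbling window that contains event_time."""
    index = (event_time - EPOCH) // size  # whole windows elapsed since the epoch
    start = EPOCH + index * size
    return start, start + size


def is_window_finalized(window_end: datetime, max_event_time: datetime, delay: timedelta) -> bool:
    """True once the watermark (max event time - delay) has passed the window end."""
    return max_event_time - delay > window_end
```

An event at 12:03 falls in the 12:00 to 12:05 window; with a 10-minute delay that window is still open when the latest event time seen is 12:15 and is finalized once an event later than 12:15 arrives.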
Continuous Processing Mode: Continuous processing (experimental, introduced in Spark 2.3) offers end-to-end latency as low as about 1 ms, with at-least-once rather than exactly-once guarantees. It supports only map-like operations such as select and filter (no aggregations, no joins) and a limited set of sources and sinks. Use continuous mode for low-latency filtering and projection; use micro-batch for stateful operations.
- Structured Streaming treats streams as unbounded tables, using the same DataFrame API as batch processing.
- Micro-batch triggers provide configurable latency (seconds to minutes); continuous mode provides sub-second latency.
- Watermarks define how late events can arrive and still be included in results. Larger watermarks = more complete results, more state.
- Output modes: Append (new rows only), Complete (full result), Update (changed rows only).
- Checkpointing enables exactly-once semantics by tracking offsets and state in durable storage.
- State management uses a state store (HDFS-backed by default, RocksDB optionally) that is checkpointed together with the write-ahead log for fault tolerance.
- Monitor streaming queries via query.lastProgress, query.status, and Spark UI streaming metrics.
Best Practices
- Always set checkpointLocation for production streaming queries. This enables fault tolerance and exactly-once semantics.
- Configure watermarks for any stateful operation to prevent unbounded state growth. Set the delay threshold based on observed event lateness.
- Use maxOffsetsPerTrigger to cap the amount of data processed per micro-batch, preventing OOM errors during data spikes.
- Enable streaming metrics (spark.sql.streaming.metricsEnabled=true) for monitoring input rates, processing rates, and state sizes.
- Use failOnDataLoss=false only for production workloads where occasional data loss is acceptable. Investigate data loss alerts separately.
- Prefer Update mode for aggregations where only changed rows matter. Complete mode writes the full result every trigger, which is expensive.
- Set spark.sql.shuffle.partitions to a small number (4-8) for streaming aggregations to reduce task overhead. Choose it before the first run, because the number of state partitions is fixed once a checkpoint exists.
- Monitor state store size via Spark UI state operator metrics. Alert on unexpected state growth.
- Use foreachBatch for complex sink logic that cannot be expressed with built-in formats.
- Test with a bounded input and the memory sink, and call query.processAllAvailable() so assertions run only after all available data has been processed.
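For the monitoring practices above, a small helper can reduce a progress report to the numbers worth alerting on. This sketch assumes the report is the dictionary form of a streaming query progress event, where `stateOperators` is a list with one entry per stateful operator; the helper names and the alert threshold are hypothetical.

```python
from typing import Any, Dict


def summarize_progress(progress: Dict[str, Any]) -> Dict[str, Any]:
    """Extract batch id, input rows, and total state size from one progress report."""
    operators = progress.get("stateOperators", [])
    return {
        "batch_id": progress.get("batchId"),
        "input_rows": progress.get("numInputRows", 0),
        # Sum across operators: a query may contain several stateful stages
        "state_rows": sum(op.get("numRowsTotal", 0) for op in operators),
        "state_bytes": sum(op.get("memoryUsedBytes", 0) for op in operators),
    }


def state_growth_alert(previous_rows: int, current_rows: int, max_ratio: float = 2.0) -> bool:
    """True when state grew by more than max_ratio between two reports."""
    return previous_rows > 0 and current_rows > previous_rows * max_ratio
```

In a monitoring loop this could be called as `summarize_progress(query.lastProgress)` whenever `lastProgress` is not `None`, feeding `state_rows` from consecutive reports into `state_growth_alert`.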
Streaming Trigger Comparison
| Trigger Type | Latency | Throughput | Use Case | State Support |
|---|---|---|---|---|
| Micro-Batch (default) | Seconds-minutes | High | General analytics | Full |
| Continuous (experimental) | ~1 ms | Low | Low-latency filtering | None (map-like operations only) |
| Once | N/A | Highest | Batch replay | Full |
| Processing Time | Configurable | High | Scheduled processing | Full |
| Available Now (Spark 3.3+) | N/A | High | Batch replay in multiple batches that honor rate limits such as maxOffsetsPerTrigger | Full |
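The sketch below maps trigger choices to the keyword arguments that PySpark's `DataStreamWriter.trigger` accepts (`processingTime`, `once`, `continuous`, and, from Spark 3.3, `availableNow`). The `kind` labels and both helper names are hypothetical. The default trigger is selected by not calling `.trigger()` at all, so the helper skips the call in that case.

```python
from typing import Any, Dict, Optional


def trigger_kwargs(kind: str, interval: Optional[str] = None) -> Dict[str, Any]:
    """Translate a trigger kind into keyword arguments for DataStreamWriter.trigger."""
    if kind == "default":
        return {}  # no .trigger() call: run each micro-batch as soon as the last one ends
    if kind == "once":
        return {"once": True}
    if kind == "available_now":
        return {"availableNow": True}
    if kind in ("processing_time", "continuous"):
        if not interval:
            raise ValueError(f"{kind} trigger requires an interval such as '30 seconds'")
        key = "processingTime" if kind == "processing_time" else "continuous"
        return {key: interval}
    raise ValueError(f"unknown trigger kind: {kind}")


def apply_trigger(writer, kind: str, interval: Optional[str] = None):
    """Apply the trigger to a DataStreamWriter-like object, or leave it untouched for default."""
    kwargs = trigger_kwargs(kind, interval)
    return writer.trigger(**kwargs) if kwargs else writer
```

For instance, `apply_trigger(df.writeStream, "processing_time", "30 seconds")` is equivalent to `df.writeStream.trigger(processingTime="30 seconds")`.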
Output Mode Compatibility
| Output Mode | Aggregations | Joins | Dedup | Window | Non-Stateful |
|---|---|---|---|---|---|
| Append | Yes (with watermark) | Yes | Yes | Yes (with watermark) | Yes |
| Complete | Yes | No | No | Yes | No |
| Update | Yes | Stream-static only | Yes | Yes | Yes |
See Also
- 021 - Apache Spark: RDDs, DataFrames, and the Catalyst Optimizer - Spark batch fundamentals
- 023 - Batch vs Streaming: Lambda, Kappa, and Architecture Trade-offs - Streaming architecture patterns
- 019 - Apache Kafka: Topics, Producers, and Consumers - Kafka as streaming source
- 020 - Kafka Streams: DSL, Windowed Aggregation, and Exactly-Once - Alternative streaming approach
- 030 - Capstone Project: Real-Time Streaming Pipeline - Production streaming pipeline project