πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Spark Structured Streaming: Triggers, Watermarks, and State Management

Data Pipelines & OrchestrationPipeline Engineering🟒 Free Lesson

Advertisement

Spark Structured Streaming: Real-Time Processing at Scale

Spark Structured Streaming is a scalable, fault-tolerant stream processing engine built on the Spark SQL engine.

Why Structured Streaming?


Key Abstraction:

  • Live data stream as a continuously appended unbounded table
  • Express stream processing as DataFrame/Dataset operations
  • Same code for batch and streaming

Advantages over Spark Streaming (DStream):

  1. DataFrame abstraction β€” Catalyst optimizer enables SQL-level optimizations
  2. Incremental processing β€” engine handles state management
  3. Fault tolerance β€” built-in recovery mechanisms
  4. Unified model β€” same code for batch and streaming

Key Insight: You write the same code for both batch and streaming β€” the engine handles incremental processing, fault tolerance, and state management.

Structured Streaming Micro-Batch Architecture

SourceKafka, FilesMicro-Batch 1Micro-Batch 2Micro-Batch 3Query PlanCatalyst OptimizedSinkKafka, ParquetTrigger: 30sOffset LogState StoreAppendCompleteUpdate

Watermark Diagram

Watermark: Late Data HandlingT=0T=5T=10T=15T=20T=25On-time EventsLate EventsWatermarkWindow: 5 min tumblingIncludedDroppedOn-timeLate (included)Dropped

Architecture Diagram

Structured Streaming is a stream processing engine that models incoming data as an unbounded input table. Each new data event appends a new row to the input table. A streaming query continuously computes a result table over the input and outputs the result to a sink. The engine handles incremental processing, fault tolerance via write-ahead logs, and state management for stateful operations.

A watermark is a threshold that defines how late an event can arrive and still be included in aggregation results. Formally: for a stream with event timestamps t_1, t_2, ..., the watermark at time T is defined as W(T) = T - delay_threshold. Events with timestamp < W(T) are considered too late and are dropped. Watermarks enable the engine to clean up state for completed windows.

Output modes determine what data is written to the sink on each trigger: (1) Append mode β€” only new rows since the last trigger are written. Suitable for stateless queries or queries with watermarking. (2) Complete mode β€” the entire result table is written on every trigger. Suitable for aggregations where the full result is needed. (3) Update mode β€” only rows that were updated since the last trigger are written. Suitable for incremental aggregations.

State management in Structured Streaming tracks intermediate results for stateful operations (aggregations, joins, deduplication). State is stored in an internal state store (RocksDB by default) and is fault-tolerant via write-ahead logs. State is cleaned up automatically when watermarks advance past the window end time. The stateStoreFormats configuration controls serialization format for state.

Micro-Batch Latency

For a micro-batch trigger interval of T_interval and data arrival rate of R (rows/second), the minimum latency for a row is: Latency_min = T_interval (row waits for next trigger). The maximum latency depends on processing time: Latency_max = T_interval + T_processing. For continuous processing mode: Latency β‰ˆ T_processing (sub-second).

State Store Size

For a windowed aggregation with window size W, grace period G, and event rate R, the state store holds events in the range [current_time - W - G, current_time]. State_size = R * (W + G) * avg_event_size. To prevent unbounded state growth, set a maximum watermark delay: withWatermark("event_time", "10 minutes").

Spark Structured Streaming provides exactly-once semantics for end-to-end processing when: (1) the source supports replay (Kafka, file-based), (2) the sink supports transactions or idempotent writes, (3) checkpointing is enabled for fault tolerance. The engine tracks offsets in the write-ahead log and replays from the last committed offset on failure, ensuring each record is processed exactly once.

A watermark of W minutes guarantees that all events arriving within W minutes of the latest event will be included in aggregation results. Events arriving later than W minutes after the latest event may be dropped. The trade-off: larger W -> more complete results, more state; smaller W -> lower latency, potential data loss for late events. Formal: P(included) = P(arrival_delay <= W).

Key Concepts

ConceptDescriptionAPI
Streaming DataFrameContinuously updated unbounded tablespark.readStream.format("kafka")
Micro-Batch TriggerProcess data in periodic batches.trigger(processingTime="30 seconds")
Continuous TriggerSub-second latency processing.trigger(continuous=True, checkpointLocation="...")
WatermarkThreshold for late event tolerance.withWatermark("event_time", "10 minutes")
Output ModeWhat data to write to sink.outputMode("append"), .outputMode("complete"), .outputMode("update")
CheckpointFault tolerance via offset tracking.option("checkpointLocation", "hdfs:///checkpoint")
State StorePersistent state for stateful operationsRocksDB (default), HDFS-backed
ForeachCustom sink logic per row.foreach(batcher -> ...)
ForeachBatchCustom sink logic per micro-batch.foreachBatch(lambda df, epoch: ...)
WindowTime-based grouping of eventswindow("event_time", "5 minutes")
Session WindowActivity-gap-based windowingwindow("event_time", gap="30 minutes")
Drop DuplicatesRemove duplicate events.dropDuplicates(["event_id"])
Stream-Static JoinJoin stream with batch DataFramestream_df.join(static_df, "key")
Stream-Stream JoinJoin two streaming DataFramesstream1.join(stream2, "key")
CachingCache streaming DataFrame for reuse.cache()
ExplainView query execution plan.explain()
  1. Read from Kafka: Configure spark.readStream.format("kafka") with subscribe, startingOffsets, and security options.
  2. Parse and validate: Apply schema to raw bytes, validate required fields, handle malformed records with drop() or route to dead letter.
  3. Add event timestamp: Extract event time from payload for watermarking and windowing.
  4. Define watermark: Use .withWatermark("event_time", "10 minutes") to handle late-arriving data.
  5. Apply transformations: Use standard DataFrame operations: select(), filter(), groupBy(), window().
  6. Choose output mode: Append for filtered streams, Complete for full aggregations, Update for incremental aggregations.
  7. Write to sink: Use .writeStream.format() with appropriate options for the target system.
  8. Configure checkpoint: Set checkpointLocation for fault tolerance. Use HDFS or S3 for durability.
  9. Set trigger interval: Use .trigger(processingTime="30 seconds") for micro-batch or continuous for low-latency.
  10. Monitor the streaming query: Use query.lastProgress, query.status, and Spark UI for metrics.

Production Code

Kafka-to-Sink Streaming Pipeline with Watermarking

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
import logging

logger = logging.getLogger(__name__)


def create_streaming_session() -> SparkSession:
    """Create SparkSession optimized for Structured Streaming."""
    return (
        SparkSession.builder
        .appName("KafkaStreamingPipeline")
        .config("spark.sql.streaming.schemaInference", "true")
        .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")
        .config("spark.sql.shuffle.partitions", "8")
        .config("spark.streaming.stopGracefullyOnShutdown", "true")
        .config("spark.sql.streaming.metricsEnabled", "true")
        .getOrCreate()
    )


def build_streaming_pipeline(spark: SparkSession) -> None:
    """Build a production streaming pipeline with watermarking and aggregation."""

    # Define event schema
    event_schema = StructType([
        StructField("event_id", StringType(), False),
        StructField("user_id", StringType(), False),
        StructField("event_type", StringType(), False),
        StructField("amount", DoubleType(), True),
        StructField("event_time", TimestampType(), False),
    ])

    # Read from Kafka
    # Parameters explained:
    #   format("kafka") - Use Kafka as the streaming source
    #   kafka.bootstrap.servers - Comma-separated list of Kafka broker addresses
    #   subscribe - Kafka topic(s) to consume from (comma-separated for multiple)
    #   startingOffsets - "latest" (new data only) or "earliest" (replay all)
    #   kafka.security.protocol - Security protocol for authentication
    #   maxOffsetsPerTrigger - Cap on records per micro-batch (prevents OOM)
    raw_stream = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka-broker-1:9092,kafka-broker-2:9092")
        .option("subscribe", "user-events")
        .option("startingOffsets", "latest")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("maxOffsetsPerTrigger", 100000)
        .load()
    )

    # Parse JSON payload
    parsed_stream = (
        raw_stream
        .select(
            F.col("key").cast("string").alias("kafka_key"),
            F.from_json(F.col("value").cast("string"), event_schema).alias("data"),
            F.col("timestamp").alias("kafka_timestamp"),
        )
        .select("kafka_key", "data.*")
        .filter(F.col("event_id").isNotNull())
        .filter(F.col("amount").isNotNull())
    )

    # Add watermark for late data tolerance (10 minutes)
    # Watermark defines how late an event can arrive and still be included
    # Events arriving > 10 minutes late will be dropped
    watermarked_stream = parsed_stream.withWatermark("event_time", "10 minutes")

    # Windowed aggregation: 5-minute tumbling windows
    # Parameters:
    #   F.window("event_time", "5 minutes") - Tumbling window of 5 minutes
    #   "event_type" - Group by event type within each window
    #   .agg() - Aggregation functions applied per window per group
    windowed_aggregation = (
        watermarked_stream
        .groupBy(
            F.window("event_time", "5 minutes"),
            "event_type",
        )
        .agg(
            F.count("*").alias("event_count"),
            F.sum("amount").alias("total_amount"),
            F.avg("amount").alias("avg_amount"),
            F.countDistinct("user_id").alias("unique_users"),
        )
        .select(
            F.col("window.start").alias("window_start"),
            F.col("window.end").alias("window_end"),
            "event_type",
            "event_count",
            "total_amount",
            "avg_amount",
            "unique_users",
        )
    )

    # Write to Kafka sink with checkpointing
    query = (
        windowed_aggregation
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka-broker-1:9092")
        .option("topic", "event-aggregations")
        .option("checkpointLocation", "s3://checkpoints/event-aggregation")
        .option("failOnDataLoss", "false")
        .trigger(processingTime="30 seconds")
        .outputMode("update")
        .start()
    )

    logger.info(f"Streaming query started: {query.id}")

    # Monitor the streaming query
    import time
    while query.isActive:
        progress = query.lastProgress
        if progress:
            logger.info(
                f"Batch {progress['batchId']}: "
                f"input_rows={progress['numInputRows']}, "
                f"processed_rows={progress['processedRowsPerSecond']:.0f}/s, "
                f"state_rows={progress.get('stateOperator', {}).get('numRowsTotal', 0)}"
            )
        time.sleep(30)

    query.awaitTermination()

Stream-Stream Join with Session Windows

def build_stream_stream_join(spark: SparkSession) -> None:
    """Join two streaming DataFrames with session windows."""
    from pyspark.sql.types import StructType, StructField, StringType, TimestampType

    # Click stream schema
    click_schema = StructType([
        StructField("click_id", StringType(), False),
        StructField("user_id", StringType(), False),
        StructField("page_url", StringType(), False),
        StructField("click_time", TimestampType(), False),
    ])

    # Purchase stream schema
    purchase_schema = StructType([
        StructField("purchase_id", StringType(), False),
        StructField("user_id", StringType(), False),
        StructField("product_id", StringType(), False),
        StructField("amount", StringType(), False),
        StructField("purchase_time", TimestampType(), False),
    ])

    # Read click stream
    click_stream = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka-broker-1:9092")
        .option("subscribe", "click-events")
        .load()
        .select(F.from_json(F.col("value").cast("string"), click_schema).alias("data"))
        .select("data.*")
        .withWatermark("click_time", "30 minutes")
    )

    # Read purchase stream
    purchase_stream = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka-broker-1:9092")
        .option("subscribe", "purchase-events")
        .load()
        .select(F.from_json(F.col("value").cast("string"), purchase_schema).alias("data"))
        .select("data.*")
        .withWatermark("purchase_time", "30 minutes")
    )

    # Stream-stream join: clicks within 30 minutes before purchase
    joined_stream = (
        click_stream
        .join(
            purchase_stream,
            (click_stream.user_id == purchase_stream.user_id) &
            (click_stream.click_time <= purchase_stream.purchase_time) &
            (click_stream.click_time >= F.expr("purchase_time - INTERVAL 30 MINUTES")),
            "leftOuter",
        )
        .select(
            click_stream.click_id,
            click_stream.user_id,
            click_stream.page_url,
            purchase_stream.purchase_id,
            purchase_stream.product_id,
            purchase_stream.amount,
            click_stream.click_time,
            purchase_stream.purchase_time,
        )
    )

    # Write to console for testing
    query = (
        joined_stream
        .writeStream
        .format("console")
        .option("truncate", "false")
        .option("checkpointLocation", "s3://checkpoints/click-purchase-join")
        .trigger(processingTime="10 seconds")
        .outputMode("append")
        .start()
    )

    query.awaitTermination()

Watermark Tuning: The watermark delay threshold determines the trade-off between completeness and state size. For a 5-minute window with a 10-minute watermark: (1) events arriving up to 10 minutes late are included, (2) state is retained for 10 minutes after the window closes, (3) state is cleaned up when the watermark advances past the window. If your data has events arriving more than 10 minutes late, increase the watermark but be aware of increased state store size.

Continuous Processing Mode: Spark 3.x supports continuous processing mode with sub-second end-to-end latency (typically 1-5ms). However, continuous mode only supports append output mode and a limited set of operations (no aggregations, no joins). Use continuous mode for low-latency filtering and projection; use micro-batch for stateful operations.

  • Structured Streaming treats streams as unbounded tables, using the same DataFrame API as batch processing.
  • Micro-batch triggers provide configurable latency (seconds to minutes); continuous mode provides sub-second latency.
  • Watermarks define how late events can arrive and still be included in results. Larger watermarks = more complete results, more state.
  • Output modes: Append (new rows only), Complete (full result), Update (changed rows only).
  • Checkpointing enables exactly-once semantics by tracking offsets and state in durable storage.
  • State management uses RocksDB for local state and write-ahead logs for fault tolerance.
  • Monitor streaming queries via query.lastProgress, query.status, and Spark UI streaming metrics.

Best Practices

  1. Always set checkpointLocation for production streaming queries. This enables fault tolerance and exactly-once semantics.
  2. Configure watermarks for any stateful operation to prevent unbounded state growth. Set the delay threshold based on observed event lateness.
  3. Use maxOffsetsPerTrigger to cap the amount of data processed per micro-batch, preventing OOM errors during data spikes.
  4. Enable streaming metrics (spark.sql.streaming.metricsEnabled=true) for monitoring input rates, processing rates, and state sizes.
  5. Use failOnDataLoss=false for production workloads where occasional data loss is acceptable. Investigate data loss alerts separately.
  6. Prefer Update mode for aggregations where only changed rows matter. Complete mode writes the full result every trigger, which is expensive.
  7. Set spark.sql.shuffle.partitions to a small number (4-8) for streaming aggregations to reduce task overhead.
  8. Monitor state store size via Spark UI state operator metrics. Alert on unexpected state growth.
  9. Use ForeachBatch for complex sink logic that cannot be expressed with built-in formats.
  10. Test with spark.sql.streaming.testing=true to enable deterministic testing of streaming queries.

Streaming Trigger Comparison

Trigger TypeLatencyThroughputUse CaseState Support
Micro-Batch (default)Seconds-minutesHighGeneral analyticsFull
Continuous1-5msLowLow-latency filteringLimited
OnceN/AHighestBatch replayFull
Processing TimeConfigurableHighScheduled processingFull
Once with DelayFixed delayHighBackpressure controlFull

Output Mode Compatibility

Output ModeAggregationsJoinsDedupWindowNon-Stateful
AppendNoNoNoNoYes
CompleteYesNoNoYesYes
UpdateYesYesYesYesYes

See Also

⭐

Premium Content

Spark Structured Streaming: Triggers, Watermarks, and State Management

Unlock this lesson and 900+ advanced tutorials with a Premium plan.

🎯End-to-end Projects
πŸ’ΌInterview Prep
πŸ“œCertificates
🀝Community Access

Already a member? Log in

Need Expert Data Engineering Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement