11. Structured Streaming in PySpark

Definition: Structured Streaming

Structured Streaming is a stream processing engine built on the Spark SQL engine that treats a live data stream as an unbounded table to which new rows are continuously appended. With a replayable source and an idempotent sink, it provides end-to-end exactly-once fault-tolerance guarantees, and it supports event-time processing via watermarks.

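Definition: Trigger

A trigger defines when the engine processes new data. If no trigger is set, a new micro-batch starts as soon as the previous one finishes. The alternatives are a fixed interval (processingTime), Once (process all available data in a single batch, then stop; deprecated in favor of AvailableNow in recent Spark releases), AvailableNow (process all available data, possibly in several batches, then stop), and Continuous (experimental low-latency processing).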

Definition: Watermark

A watermark is a moving threshold that tracks the progress of event time in a stream. It determines how long the engine waits for late-arriving data before it finalizes a window aggregation and discards that window's state. Watermark = max event time seen - delay threshold.

Watermark Calculation
$$W = \max(E_{time}) - D_{threshold}$$

Here,

  • $W$ = Current watermark value
  • $\max(E_{time})$ = Maximum event time seen across all partitions
  • $D_{threshold}$ = Maximum delay threshold (how late data can arrive)
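
As a minimal sketch of the watermark formula, the following plain-Python snippet (not Spark code) computes the watermark from the event times seen so far. The function name `current_watermark` and the sample timestamps are illustrative assumptions.

```python
from datetime import datetime, timedelta


def current_watermark(event_times, delay_threshold):
    """Return max(event_time) - delay_threshold, or None if nothing was seen."""
    if not event_times:
        return None
    return max(event_times) - delay_threshold


# Hypothetical event times observed so far (they may arrive out of order).
seen = [
    datetime(2024, 1, 1, 12, 0),
    datetime(2024, 1, 1, 12, 7),
    datetime(2024, 1, 1, 12, 3),
]
watermark = current_watermark(seen, timedelta(minutes=10))
print(watermark)  # 2024-01-01 11:57:00
```

Because the watermark follows the maximum event time, it only moves forward: an out-of-order event such as 12:03 does not pull it back.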

Micro-Batch Processing Time

$$T_{batch} = \max(T_{source}, T_{processing}, T_{sink})$$

Here,

  • $T_{batch}$ = Total micro-batch processing time
  • $T_{source}$ = Time to read input data from source
  • $T_{processing}$ = Time to execute query transformations
  • $T_{sink}$ = Time to write output to sink
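
A small worked example of this model, in plain Python. The stage timings and the 2-second trigger interval are made-up values, and `batch_time` is a hypothetical helper that simply applies the formula as stated here.

```python
def batch_time(t_source, t_processing, t_sink):
    """Batch time under the model above: the slowest stage dominates."""
    return max(t_source, t_processing, t_sink)


# Hypothetical stage timings, in seconds.
t_batch = batch_time(t_source=0.8, t_processing=2.5, t_sink=1.2)
trigger_interval = 2.0  # assumed fixed trigger interval, in seconds

print(t_batch)                      # 2.5
print(t_batch <= trigger_interval)  # False
```

When the batch time exceeds the trigger interval, as in this example, the query cannot keep up with the configured cadence and batches start to queue behind each other.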

End-to-End Latency

$$T_{e2e} = T_{batch} \times N_{batches} + T_{network} + T_{sink\_commit}$$

Here,

  • $T_{e2e}$ = End-to-end latency from event to output
  • $T_{batch}$ = Single micro-batch processing time
  • $N_{batches}$ = Number of batches to wait
  • $T_{network}$ = Network latency for source reads
  • $T_{sink\_commit}$ = Sink commit latency
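
To make the latency formula concrete, here is a short arithmetic sketch in plain Python. All input values are invented for illustration, and `end_to_end_latency` is a hypothetical helper, not a Spark API.

```python
def end_to_end_latency(t_batch, n_batches, t_network, t_sink_commit):
    """Apply T_e2e = T_batch * N_batches + T_network + T_sink_commit (seconds)."""
    return t_batch * n_batches + t_network + t_sink_commit


# An event that waits for two 2.5 s batches, plus network and commit overhead.
latency = end_to_end_latency(
    t_batch=2.5, n_batches=2, t_network=0.05, t_sink_commit=0.2
)
print(f"{latency:.2f} s")  # 5.25 s
```

The batch term dominates here, which suggests that shortening batches (or waiting for fewer of them) is the main lever on latency under this model.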

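Structured Streaming provides exactly-once semantics by coordinating source offsets, state updates, and sink writes within each micro-batch. The guarantee holds end to end only when the source is replayable and the sink is idempotent or transactional. For at-least-once sinks (e.g., Kafka), a replayed batch can write duplicates, so either the writes must be made idempotent or downstream consumers must deduplicate on a unique record key.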

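For low-latency use cases (< 100 ms), use trigger(continuous="1 second") instead of micro-batch mode. The "1 second" argument is the checkpoint interval, not the latency. However, continuous mode is experimental, provides at-least-once rather than exactly-once guarantees, and supports only a limited set of map-like operations (no aggregations).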

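Theorem: Late Data Handling

With a watermark threshold of D_{threshold}, any event whose event time is within D_{threshold} of the maximum event time seen so far is guaranteed to be included in the window its event time falls into. An event whose event time is more than D_{threshold} behind that maximum is older than the watermark and may be dropped from window aggregations; the engine does not guarantee exactly when such an event stops being accepted.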
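
The rule above can be sketched in plain Python. This is a simplified model, not Spark's implementation: it advances the watermark after every event, whereas Spark recomputes it once per micro-batch. The names `split_late_events` and `at` are hypothetical.

```python
from datetime import datetime, timedelta


def split_late_events(arrivals, delay_threshold):
    """Partition event times (in arrival order) into accepted and dropped."""
    accepted, dropped = [], []
    max_event_time = None
    for event_time in arrivals:
        too_late = (
            max_event_time is not None
            and event_time < max_event_time - delay_threshold
        )
        if too_late:
            dropped.append(event_time)
            continue
        accepted.append(event_time)
        if max_event_time is None or event_time > max_event_time:
            max_event_time = event_time
    return accepted, dropped


def at(hour, minute):
    """Build a timestamp on an arbitrary sample day."""
    return datetime(2024, 1, 1, hour, minute)


# Arrival order differs from event-time order.
arrivals = [at(12, 0), at(12, 20), at(12, 12), at(12, 5)]
accepted, dropped = split_late_events(arrivals, timedelta(minutes=10))
print([t.strftime("%H:%M") for t in accepted])  # ['12:00', '12:20', '12:12']
print([t.strftime("%H:%M") for t in dropped])   # ['12:05']
```

After the 12:20 event the watermark is 12:10, so the 12:12 event is still accepted while the 12:05 event is older than the watermark and is dropped.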

  • Structured Streaming = unbounded table with exactly-once guarantees
  • Triggers: default (next batch starts as soon as the previous one finishes), FixedInterval, Once, AvailableNow, Continuous
  • Watermark = max(event_time) - delay_threshold; controls late data handling
  • Output modes: Append (new rows only), Complete (full result table), Update (changed rows)
  • Checkpointing is required for fault tolerance and exactly-once processing
  • Enable schema inference carefully; prefer explicit schemas for performance

Micro-Batch Processing Flow

[Figure: Streaming architecture diagram. Source (Kafka / files) -> micro-batch (1. read offsets, 2. process data, 3. write output) -> sink (Kafka / Delta) -> commit (offset log). Trigger options: default, Once / AvailableNow, Continuous. Output modes: Append (new rows only), Complete (full result table), Update (changed rows).]

📚 Detailed Explanation

What is Structured Streaming?

  • A scalable, fault-tolerant stream processing engine built on the Spark SQL engine
  • Treats streaming data as an unbounded table being continuously appended
  • Uses the same DataFrame/Dataset API for both batch and streaming queries
  • Dramatically simplifies development of real-time data pipelines

Core Abstractions

  • Input Stream: continuously receives data from sources (Kafka, files, sockets)
  • Result Table: updated as new data arrives; represents the query output
  • Micro-batch or Continuous processing depending on trigger configuration
  • Exactly-once guarantees via checkpointing and write-ahead logs

Trigger Mechanisms

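| Trigger Type | Behavior | Best For |
| --- | --- | --- |
| Default | Starts each micro-batch as soon as the previous one completes | Maximum throughput |
| Fixed Interval | Starts micro-batches at a fixed processing-time interval | Consistent latency |
| Once | Processes all available data in a single batch, then stops | Batch-style processing |
| AvailableNow | Processes all available data, possibly in several batches, then stops | Scheduled incremental runs |
| Continuous | Lowest latency (~1 ms); experimental | Ultra-low latency use cases |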
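
The difference between the default and fixed-interval triggers can be illustrated with a small scheduling model in plain Python. This is a sketch under simplifying assumptions (time starts at 0, batch durations are known in advance), and `batch_start_times` is a hypothetical helper rather than a Spark API. It assumes that a fixed-interval batch waits for the next interval boundary, or starts immediately if the previous batch overran that boundary.

```python
def batch_start_times(durations, interval=0.0):
    """Return the start time of each micro-batch, given each batch's duration.

    interval == 0 models the default trigger (start as soon as the previous
    batch finishes). interval > 0 models a fixed-interval trigger.
    """
    starts = []
    now = 0.0
    for duration in durations:
        starts.append(now)
        finished = now + duration
        if interval > 0:
            next_boundary = (now // interval + 1) * interval
            # Wait for the boundary, or start at once if it was missed.
            now = max(finished, next_boundary)
        else:
            now = finished
    return starts


durations = [4.0, 15.0, 3.0]  # hypothetical batch durations, in seconds
print(batch_start_times(durations))               # [0.0, 4.0, 19.0]
print(batch_start_times(durations, interval=10))  # [0.0, 10.0, 25.0]
```

With a 10-second interval, the second batch waits for the boundary at 10 s, while the third starts at 25 s because the 15-second batch overran the boundary at 20 s.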

Watermarking

  • Defines a threshold beyond which events are considered too late
  • Prevents unbounded state growth for event-time windowing
  • Allows the system to clean up state and provide memory guarantees
  • Formula: Watermark = max(event_time) - delay_threshold
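
The state cleanup described above can be sketched as a plain-Python model of a tumbling-window count. It is a simplified illustration, not Spark's state store: times are integer minutes, the watermark is updated after every event instead of once per micro-batch, and `run_windowed_count` is a hypothetical name.

```python
def run_windowed_count(event_minutes, window_size, delay):
    """Count events per tumbling window, evicting windows behind the watermark."""
    state = {}      # window start -> running count (windows still open)
    finalized = {}  # window start -> final count (state already released)
    max_seen = None
    for minute in event_minutes:
        if max_seen is not None and minute < max_seen - delay:
            continue  # older than the watermark: dropped
        start = (minute // window_size) * window_size
        state[start] = state.get(start, 0) + 1
        max_seen = minute if max_seen is None else max(max_seen, minute)
        watermark = max_seen - delay
        # A window is closed once the watermark reaches its end.
        for closed in [w for w in state if w + window_size <= watermark]:
            finalized[closed] = state.pop(closed)
    return state, finalized


state, finalized = run_windowed_count(
    [1, 3, 7, 2, 18, 4], window_size=5, delay=10
)
print(state)      # {5: 1, 15: 1}
print(finalized)  # {0: 3}
```

Without the eviction step, `state` would keep one entry for every window ever seen, which is the unbounded growth that watermarks are meant to prevent.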

Checkpoint Mechanism

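  • Persists query progress: the source offsets planned for each micro-batch, a commit log of completed batches, and accumulated state
  • Enables fault tolerance: the query resumes from the last checkpoint on failure
  • Ensures exactly-once semantics across failures when combined with a replayable source and an idempotent sink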
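
The recovery behavior above can be illustrated with a toy model in plain Python. It is only a sketch of the idea (log the planned offsets, write to an idempotent sink, then record a commit) and not Spark's actual checkpoint format. The function `run_query`, the in-memory `sink` dictionary, and the simulated crash are all assumptions made for the example.

```python
import json
import tempfile
from pathlib import Path


def run_query(source, sink, checkpoint_dir, batch_size, crash_before_commit_of=None):
    """Process `source` in micro-batches, resuming from `checkpoint_dir`."""
    offsets_dir = checkpoint_dir / "offsets"
    commits_dir = checkpoint_dir / "commits"
    offsets_dir.mkdir(parents=True, exist_ok=True)
    commits_dir.mkdir(parents=True, exist_ok=True)

    batch_id, start = 0, 0
    while True:
        offset_file = offsets_dir / str(batch_id)
        if offset_file.exists():
            # The batch was planned before a failure: replay the same range.
            start, end = json.loads(offset_file.read_text())
        else:
            end = min(start + batch_size, len(source))
            if end == start:
                return  # no new data
            offset_file.write_text(json.dumps([start, end]))
        commit_file = commits_dir / str(batch_id)
        if not commit_file.exists():
            sink[batch_id] = source[start:end]  # idempotent: keyed by batch id
            if crash_before_commit_of == batch_id:
                raise RuntimeError("simulated crash before commit")
            commit_file.write_text("ok")
        start, batch_id = end, batch_id + 1


source = list(range(5))  # stand-in for a replayable source
sink = {}                # stand-in for an external idempotent sink
with tempfile.TemporaryDirectory() as tmp:
    checkpoint = Path(tmp)
    try:
        run_query(source, sink, checkpoint, batch_size=2, crash_before_commit_of=1)
    except RuntimeError:
        pass  # batch 1 reached the sink but was never committed
    run_query(source, sink, checkpoint, batch_size=2)  # restart from checkpoint

rows = [row for batch_id in sorted(sink) for row in sink[batch_id]]
print(rows)  # [0, 1, 2, 3, 4]
```

Batch 1 is written twice, once before the crash and once after the restart, but because the sink is keyed by batch id the second write overwrites the first and every row appears exactly once.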

Output Modes

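| Mode | Behavior | Use Case |
| --- | --- | --- |
| Append | Outputs only rows added to the result table since the last trigger; emitted rows never change | Stateless queries (select, filter); windowed aggregations with a watermark |
| Complete | Outputs the entire result table after every trigger | Aggregations whose full result must be rewritten each time |
| Update | Outputs only rows that changed since the last trigger | Aggregations where only changed rows need to be written |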

Key Takeaway: The choice of output mode depends on the operation type and the sink's capabilities.
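
To show how the three modes differ, the following plain-Python sketch models what each trigger would write for a running word count. It is an illustration only: `emitted_rows` is a hypothetical helper, and the append branch models a stateless pass-through query, since append mode cannot be used for an aggregation without a watermark.

```python
from collections import Counter


def emitted_rows(batches, mode):
    """Return what each trigger would write to the sink for the given mode."""
    totals = Counter()
    outputs = []
    for batch in batches:
        totals.update(batch)
        if mode == "append":
            # Stateless pass-through: only newly arrived rows are written.
            outputs.append(list(batch))
        elif mode == "complete":
            # The whole result table is rewritten on every trigger.
            outputs.append(dict(totals))
        elif mode == "update":
            # Only keys touched by this batch are written.
            outputs.append({word: totals[word] for word in dict.fromkeys(batch)})
        else:
            raise ValueError(f"unknown output mode: {mode}")
    return outputs


batches = [["a", "b", "a"], ["b", "c"]]
for mode in ("append", "complete", "update"):
    print(mode, emitted_rows(batches, mode))
```

In the second trigger, complete mode rewrites all three counts, while update mode writes only the counts for "b" and "c", the keys that changed.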


Performance Tuning Considerations

  1. Partitioning of the source data
  2. Parallelism of the query
  3. State store configuration for stateful operations
  4. Memory management for large state
  5. Monitor processing times, input rates, and state sizes

Advanced Features

  • Mixed-API queries: combine SQL and DataFrame operations in one pipeline
  • Streaming joins: join streaming data with static (batch) data or with other streams
  • Together, these extend streaming use cases beyond simple transformations

📊 Key Concepts Table

| Concept | Description | Use Case |
| --- | --- | --- |
| Micro-batch Processing | Processes data in small batches at intervals | High throughput, moderate latency |
| Continuous Processing | Processes records individually as they arrive | Ultra-low latency (~1 ms) |
| Trigger | Determines when a new batch is processed | Control batch timing and frequency |
| Watermark | Threshold for handling late-arriving data | Event-time windowing operations |
| Output Mode | How results are written to the sink | Append, Complete, Update modes |
| Checkpoint | Persists query progress for fault tolerance | Exactly-once processing guarantees |
| State Store | Stores state for stateful operations | Window aggregations, joins |
| Event Time | Timestamp embedded in the data | Time-based windowing and ordering |
| Processing Time | Time when the record is processed | Monitoring and debugging |
| Late Data | Events whose event time is older than the current watermark | Requires special handling |

💻 Code Examples

Basic Structured Streaming Setup

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

# Initialize Spark Session
# The socket source has a fixed schema (a single string column named "value"),
# so no schema inference setting is needed.
spark = SparkSession.builder \
    .appName("StructuredStreamingExample") \
    .config("spark.sql.streaming.checkpointLocation", "/checkpoint/path") \
    .getOrCreate()

# Read from a socket stream
# Parameter: format("socket") - source type
# Parameter: option("host", "localhost") - socket host
# Parameter: option("port", 9999) - socket port
lines = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Split the lines into words
words = lines.select(
    explode(
        split(lines.value, " ")
    ).alias("word")
)

# Maintain a running count per word across all batches (not per batch)
word_counts = words.groupBy("word").count()

# Start the streaming query
# Parameter: outputMode("complete") - output full result table
# Parameter: format("console") - sink type
# Parameter: trigger(processingTime="10 seconds") - batch interval
query = word_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime="10 seconds") \
    .start()

query.awaitTermination()

Windowed Aggregation with Watermark

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, window

spark = SparkSession.builder \
    .appName("WindowedAggregation") \
    .getOrCreate()

# Read from Kafka
lines = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .load()

# Parse JSON data
events = lines.select(
    col("value").cast("string").alias("json")
).select(
    from_json(col("json"), "timestamp TIMESTAMP, value DOUBLE").alias("data")
).select("data.*")

# Windowed aggregation with watermark
# Parameter: withWatermark("timestamp", "10 minutes") - delay threshold
# Parameter: window("timestamp", "5 minutes", "1 minute") - window size, slide interval
windowed_counts = events \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window("timestamp", "5 minutes", "1 minute"),
        "value"
    ).count()

# Write to console
query = windowed_counts.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()

Streaming Join with Batch Data

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json

spark = SparkSession.builder \
    .appName("StreamingJoin") \
    .getOrCreate()

# Read streaming data from Kafka
stream_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "user_events") \
    .load() \
    .selectExpr("CAST(value AS STRING) as event_json") \
    .select(from_json(col("event_json"), "user_id INT, action STRING, timestamp TIMESTAMP").alias("data")) \
    .select("data.*")

# Read batch reference data
batch_df = spark.read.parquet("/path/to/user_profiles")

# Stream-static join: each micro-batch of the stream is joined with the batch DataFrame
joined_df = stream_df.alias("s").join(
    batch_df.alias("b"),
    col("s.user_id") == col("b.user_id"),
    "inner"
).select("s.user_id", "s.action", "b.profile_name", "s.timestamp")

# Write to Delta sink
# Parameter: checkpointLocation - required for fault tolerance
query = joined_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoint/join") \
    .start("/output/path")

query.awaitTermination()

Advanced: Multi-Query Streaming

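from pyspark.sql import SparkSession
# Explicit imports instead of a star import. Note that this `sum` is Spark's
# aggregate function and shadows Python's built-in sum in this script.
from pyspark.sql.functions import col, count, from_json, sum, window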

spark = SparkSession.builder \
    .appName("MultiQueryStreaming") \
    .getOrCreate()

# Single source
source_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .load()

# Multiple transformations from same source
parsed_df = source_df.select(
    col("value").cast("string").alias("json"),
    col("timestamp").alias("event_time")
).select(
    from_json(col("json"), "user_id INT, action STRING, amount DOUBLE").alias("data"),
    "event_time"
).select("data.*", "event_time")

# Query 1: Real-time aggregation
aggregated = parsed_df \
    .withWatermark("event_time", "5 minutes") \
    .groupBy(
        window("event_time", "1 minute"),
        "action"
    ).agg(
        sum("amount").alias("total_amount"),
        count("*").alias("event_count")
    )

# Query 2: Detailed event log
detailed_log = parsed_df.select(
    "user_id", "action", "amount", "event_time"
)

# Start multiple queries
query1 = aggregated.writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

query2 = detailed_log.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "/checkpoint/detailed") \
    .start("/detailed_log_path")

# Block until any one of the active queries terminates
spark.streams.awaitAnyTermination()

📈 Performance Metrics

| Metric | Micro-batch | Continuous | Notes |
| --- | --- | --- | --- |
| Latency | 100 ms-10 s | ~1 ms | Micro-batch latency depends on the batch interval |
| Throughput | 100K-1M records/sec | 10K-100K records/sec | Micro-batch has higher throughput |
| Fault Recovery | seconds-minutes | seconds | Checkpoint-based recovery |
| State Management | Optimized | Limited | Continuous mode supports only map-like operations, so stateful queries need micro-batch |
| Exactly-once | Yes | No (at-least-once) | Only micro-batch mode guarantees exactly-once |
| Resource Usage | Higher | Lower | Micro-batch uses more resources |

🏆 Best Practices

  1. Always set checkpoint location: Critical for fault tolerance and exactly-once processing
  2. Use appropriate output mode: Append for stateless queries and watermarked aggregations, Complete for full result tables, Update for changed rows only
  3. Configure watermarks: Essential for event-time windowing and state management
  4. Monitor streaming metrics: Track processing times, input rates, and state sizes
  5. Tune micro-batch interval: Balance between latency and throughput requirements
  6. Use schema inference carefully: Can be expensive; prefer explicit schemas when possible
  7. Handle late data explicitly: Define watermark thresholds based on data patterns
  8. Optimize state store: Configure RocksDB or other state stores for large state
  9. Use multiple queries: Separate different processing logic into multiple queries
  10. Test with realistic data: Validate performance with production-like data volumes

🔗 Related Topics

  • 12-state-management.mdx: Advanced stateful operations and checkpointing
  • 13-window-operations.mdx: Detailed windowing strategies and implementations
  • 14-merge-upsert.mdx: Delta Lake merge operations for streaming data
  • 15-data-quality.mdx: Data validation in streaming pipelines

See Also

  • 06-joins-optimization.mdx: Join strategies for stream-stream joins
  • 08-caching-persistence.mdx: Caching in streaming applications
  • Kafka Streams (kafka/03): Kafka integration with Structured Streaming
  • Data Engineering Streaming (data-engineering/022): End-to-end streaming pipeline architecture