
Topic: Structured Streaming and Real-Time Processing


PySpark Advanced Interview Series

Module 10: Structured Streaming - Real-Time Data Processing

Companies: Amazon, Uber | Difficulty: Hard

Interview Question

"At Amazon, we process millions of events per second using Structured Streaming. Walk us through the streaming execution model, the difference between output modes, and how you would implement exactly-once processing with checkpointing." - Amazon Senior Data Engineer Interview

"At Uber, we need real-time aggregation of ride events. Explain how stateful streaming works, how you would handle late-arriving data with watermarks, and what happens when a streaming query fails and restarts." - Uber Data Engineer Interview


Structured Streaming Architecture

Structured Streaming treats real-time data as an unbounded table that is continuously appended. You write a query on this "table" as if it were a static DataFrame, and Spark continuously updates the result.
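
The unbounded-table model can be illustrated without Spark. The following is a minimal pure-Python sketch, not Spark's implementation: the event rows and the function names `run_incrementally` and `run_as_batch` are hypothetical. It shows that folding each micro-batch into a running result gives the same answer as one static query over all rows seen so far.

```python
from collections import Counter


def run_incrementally(micro_batches):
    """Fold each micro-batch into a running result table (event_type -> count)."""
    result_table = Counter()
    for batch in micro_batches:
        # Only the newly appended rows are read; earlier rows are never rescanned.
        result_table.update(row["event_type"] for row in batch)
    return dict(result_table)


def run_as_batch(all_rows):
    """The equivalent static query over the full input table."""
    return dict(Counter(row["event_type"] for row in all_rows))


# Hypothetical input: three micro-batches of events appended over time.
batches = [
    [{"event_type": "click"}, {"event_type": "view"}],
    [{"event_type": "click"}],
    [{"event_type": "purchase"}, {"event_type": "view"}],
]
all_rows = [row for batch in batches for row in batch]

incremental = run_incrementally(batches)
static = run_as_batch(all_rows)
print(incremental == static)
```

The equality is the point of the model: the streaming result after any trigger should match what a batch query would return on the data received up to that trigger.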

Architecture Diagram
Input Source  →  Query        →  Sink
(Kafka,          (DataFrame      (Kafka, Console,
 Files)           operations)     Files, etc.)

Basic Streaming Example

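from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

spark = SparkSession.builder \
    .appName("StreamingInterview") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Streaming file sources require an explicit schema.
# These fields are assumed for the demo; adjust them to your data.
schema = StructType([
    StructField("user_id", StringType()),
    StructField("product_id", StringType()),
    StructField("event_type", StringType()),
    StructField("event_time", TimestampType()),
])

# Read from a file source (for demo)
streaming_df = spark.readStream \
    .format("csv") \
    .option("header", "true") \
    .schema(schema) \
    .load("s3a://bucket/input/")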

# Apply transformations
processed_df = streaming_df \
    .filter(col("event_type").isin(["click", "view", "purchase"])) \
    .withColumn("event_time", col("event_time").cast("timestamp")) \
    .withColumn("window", window(col("event_time"), "5 minutes")) \
    .groupBy("window", "event_type") \
    .count()

# Write to output
query = processed_df.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("checkpointLocation", "s3a://bucket/checkpoints/") \
    .trigger(processingTime="30 seconds") \
    .start()

query.awaitTermination()

Output Modes

Append Mode

Only new rows are written to the sink. Best for stateless operations.

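# Append mode: only new rows
query = streaming_df.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "s3a://bucket/output/append/") \
    .option("checkpointLocation", "s3a://bucket/checkpoints/append/") \
    .start()

# The file sink requires a path; the one above is a placeholder
# Limitations:
# - Aggregations need a watermark; a window's row is emitted only after the watermark passes the window end
# - Stream-stream outer joins also need watermarks
# - Rows already written are never updated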

Complete Mode

The entire result table is written to the sink after each trigger.

# Complete mode: entire result table
query = streaming_df \
    .groupBy("event_type") \
    .count() \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("checkpointLocation", "s3a://bucket/checkpoints/complete/") \
    .start()

# Use case: Small result tables, because the whole table is written on every trigger
# Only supported for aggregation queries; state is updated incrementally, not recomputed from scratch

Update Mode

Only rows that were updated since the last trigger are written.

# Update mode: only updated rows
query = streaming_df \
    .groupBy("user_id") \
    .agg(count("*").alias("event_count")) \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .option("checkpointLocation", "s3a://bucket/checkpoints/update/") \
    .start()

# Use case: Large result tables where most rows don't change each trigger

Output Mode   Aggregations            Latency   Use Case
Append        No (unless watermark)   Low       Stateless transforms
Complete      Yes                     Medium    Small result tables
Update        Yes                     Low       Large result tables
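
To make the difference between the three modes concrete, here is a small pure-Python sketch of what a sink would receive for one trigger. It is a simplified model, not Spark code: `rows_for_sink`, the window labels, and the counts are hypothetical, and `finalized` stands for the windows that the watermark closed during this trigger.

```python
def rows_for_sink(mode, previous, current, finalized=()):
    """Return the rows one trigger sends to the sink.

    previous / current: result table (window label -> count) before and
    after the trigger. finalized: windows closed by the watermark.
    """
    if mode == "complete":
        # The whole result table, every trigger.
        return dict(current)
    if mode == "update":
        # Only rows that are new or whose value changed.
        return {k: v for k, v in current.items() if previous.get(k) != v}
    if mode == "append":
        # Only rows that can never change again.
        return {k: current[k] for k in finalized}
    raise ValueError(f"unknown output mode: {mode}")


# Hypothetical result table before and after one trigger.
previous = {"12:00": 3, "12:05": 1}
current = {"12:00": 3, "12:05": 4, "12:10": 2}

complete_rows = rows_for_sink("complete", previous, current)
update_rows = rows_for_sink("update", previous, current)
append_rows = rows_for_sink("append", previous, current, finalized=["12:00"])
print(complete_rows, update_rows, append_rows, sep="\n")
```

In this model, append mode trades latency for stability: the 12:00 window reaches the sink only once, after it is final, which is why file sinks can use it safely.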

Trigger Types

Processing Time Trigger

# Process new data every 30 seconds
query = streaming_df.writeStream \
    .trigger(processingTime="30 seconds") \
    .start()

# Process as fast as possible (default)
query = streaming_df.writeStream \
    .trigger(processingTime="0 seconds") \
    .start()

Once Trigger

# Process all available data, then stop
query = streaming_df.writeStream \
    .trigger(once=True) \
    .start()

# Useful for backfilling or testing
# Deprecated since Spark 3.4 in favor of availableNow=True

Available Now Trigger

# Process all available data, then stop (Spark 3.3+)
query = streaming_df.writeStream \
    .trigger(availableNow=True) \
    .start()

# Better than once=True for large backfills
# Splits the backlog into multiple micro-batches that honor source rate limits
# such as maxOffsetsPerTrigger, which helps avoid OOM

Continuous Trigger (Experimental)

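# Continuous processing (experimental): roughly 1 ms end-to-end latency, at-least-once guarantees
# The interval below is the checkpoint interval, not a batch interval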
query = streaming_df.writeStream \
    .trigger(continuous="1 second") \
    .start()

# Supports only map-like operations (select, filter, and similar); no aggregations
# Not recommended for production

Stateful Streaming Operations

Windowed Aggregation

# Tumbling window: non-overlapping fixed-size windows
windowed_counts = streaming_df \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window(col("event_time"), "5 minutes"),  # 5-minute tumbling window
        "event_type"
    ) \
    .count()

# Sliding window: overlapping windows
sliding_counts = streaming_df \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window(col("event_time"), "10 minutes", "5 minutes"),  # 10-min window, slides every 5 min
        "event_type"
    ) \
    .count()

# Session window: dynamic windows based on activity gaps (Spark 3.2+)
session_counts = streaming_df \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        session_window(col("event_time"), "10 minutes"),  # Session closes after a 10-minute gap
        "user_id"
    ) \
    .count()
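
The window assignment used by tumbling and sliding windows can be sketched in plain Python. This is an illustrative model under the assumption that windows are aligned to the Unix epoch (Spark's default when no start offset is given); `windows_for` is a hypothetical helper, not a Spark function.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def windows_for(event_time, window, slide=None):
    """Return every [start, end) window that contains event_time.

    A tumbling window is the special case slide == window.
    """
    slide = slide or window
    # Latest window start at or before the event, aligned to the epoch.
    start = event_time - ((event_time - EPOCH) % slide)
    result = []
    # Walk backwards while the window still covers the event.
    while start > event_time - window:
        result.append((start, start + window))
        start -= slide
    return sorted(result)


event = datetime(2024, 1, 1, 12, 7, 30)  # arbitrary example timestamp

tumbling = windows_for(event, timedelta(minutes=5))
sliding = windows_for(event, timedelta(minutes=10), timedelta(minutes=5))
print(tumbling)
print(sliding)
```

An event lands in exactly one tumbling window but in window/slide sliding windows (two in this example), which is why sliding windows multiply both the output rows and the state that has to be kept.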

Watermarks for Late Data

# The watermark is the maximum event time seen so far minus the allowed delay
# Rows older than the watermark may be dropped; rows within the delay are never dropped
streaming_df \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window(col("event_time"), "5 minutes"),
        "event_type"
    ) \
    .count() \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

# Data arriving more than 10 minutes behind the latest event time may be dropped
# The watermark advances at the end of each micro-batch
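
The watermark arithmetic can be traced by hand with a toy model. The sketch below is not Spark's implementation: `WatermarkTracker` and `at` are hypothetical names, and it simplifies by comparing raw event times with the watermark, whereas Spark applies the check to the aggregation window. It does keep one important detail: the watermark used for a batch is the one computed at the end of the previous batch.

```python
from datetime import datetime, timedelta


class WatermarkTracker:
    """Toy model of an event-time watermark (not Spark's implementation)."""

    def __init__(self, delay):
        self.delay = delay
        self.max_event_time = None
        self.watermark = None  # undefined until the first batch finishes

    def process_batch(self, event_times):
        """Split a batch into (accepted, dropped), then advance the watermark."""
        accepted, dropped = [], []
        for t in event_times:
            # The watermark used here was fixed at the end of the previous batch.
            if self.watermark is not None and t < self.watermark:
                dropped.append(t)
            else:
                accepted.append(t)
        if event_times:
            batch_max = max(event_times)
            if self.max_event_time is None or batch_max > self.max_event_time:
                self.max_event_time = batch_max
            self.watermark = self.max_event_time - self.delay
        return accepted, dropped


def at(hour, minute):
    return datetime(2024, 1, 1, hour, minute)  # arbitrary example date


tracker = WatermarkTracker(delay=timedelta(minutes=10))
first = tracker.process_batch([at(12, 0), at(12, 14)])
watermark_after_first = tracker.watermark  # 12:14 minus 10 minutes
second = tracker.process_batch([at(12, 3), at(12, 6), at(12, 20)])
print(watermark_after_first, second[1], tracker.watermark)
```

In the second batch the 12:03 event is behind the 12:04 watermark and is dropped, while 12:06 is late but still accepted. The watermark then moves to 12:10.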

Streaming Joins

# Stream-static join
static_df = spark.read.parquet("s3a://bucket/products/")

joined = streaming_df.join(
    static_df,
    streaming_df.product_id == static_df.product_id,
    "left"
)

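# Stream-stream join. Watermarks plus a time-range condition let Spark expire
# old join state (required for outer joins, recommended for inner joins).
def read_events(topic):
    # The Kafka source exposes key/value/timestamp only, so parse the JSON value.
    # `schema` is assumed to contain user_id and event_time.
    return spark.readStream.format("kafka") \
        .option("kafka.bootstrap.servers", "kafka-broker1:9092") \
        .option("subscribe", topic).load() \
        .select(from_json(col("value").cast("string"), schema).alias("e")) \
        .select("e.*")

clicks = read_events("clicks").withWatermark("event_time", "10 minutes").alias("c")
purchases = read_events("purchases").withWatermark("event_time", "10 minutes").alias("p")

# The 1-hour bound is an example value; pick it from your business rules.
joined = clicks.join(
    purchases,
    expr("c.user_id = p.user_id AND p.event_time >= c.event_time "
         "AND p.event_time <= c.event_time + interval 1 hour")
)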

Real-World Scenario: Uber Real-Time Ride Analytics

Problem Statement

Build a real-time streaming pipeline that processes ride events from Kafka, computes real-time aggregations (rides per city, average fare, surge pricing), handles late data, and writes to a data lake.

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("UberRealTimeAnalytics") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Define schema for ride events
ride_schema = StructType([
    StructField("ride_id", StringType(), False),
    StructField("driver_id", StringType(), False),
    StructField("rider_id", StringType(), False),
    StructField("city", StringType(), False),
    StructField("pickup_lat", DoubleType(), False),
    StructField("pickup_lon", DoubleType(), False),
    StructField("dropoff_lat", DoubleType(), True),
    StructField("dropoff_lon", DoubleType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("status", StringType(), False),
    StructField("event_time", TimestampType(), False)
])

# Read from Kafka
raw_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092") \
    .option("subscribe", "ride-events") \
    .option("startingOffsets", "latest") \
    .option("maxOffsetsPerTrigger", 100000) \
    .load()

# Parse JSON payload
parsed_stream = raw_stream \
    .select(
        col("key").cast("string").alias("key"),
        from_json(col("value").cast("string"), ride_schema).alias("data"),
        col("timestamp").alias("kafka_timestamp")
    ) \
    .select("key", "data.*", "kafka_timestamp")

# Clean and enrich
clean_stream = parsed_stream \
    .filter(col("status").isin(["completed", "cancelled", "in_progress"])) \
    .withColumn("event_date", to_date(col("event_time"))) \
    .withColumn("hour_of_day", hour(col("event_time")))

# === REAL-TIME AGGREGATIONS ===

# 1. Rides per city per 5-minute window
city_rides = clean_stream \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window(col("event_time"), "5 minutes"),
        "city"
    ) \
    .agg(
        count("*").alias("ride_count"),
        sum(when(col("status") == "completed", 1).otherwise(0)).alias("completed_rides"),
        avg("fare_amount").alias("avg_fare"),
        approx_count_distinct("driver_id").alias("active_drivers")
    ) \
    .withColumn("completion_rate", 
                col("completed_rides") / col("ride_count"))

# 2. Surge pricing detection (high demand per city)
surge_detection = clean_stream \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window(col("event_time"), "5 minutes"),
        "city"
    ) \
    .agg(
        count("*").alias("total_requests"),
        sum(when(col("status") == "in_progress", 1).otherwise(0)).alias("active_rides"),
        approx_count_distinct("driver_id").alias("available_drivers")
    ) \
    .withColumn(
        "surge_multiplier",
        when(col("active_rides") / col("available_drivers") > 3, 2.0)
        .when(col("active_rides") / col("available_drivers") > 2, 1.5)
        .otherwise(1.0)
    )

# 3. Driver performance tracking
driver_metrics = clean_stream \
    .withWatermark("event_time", "30 minutes") \
    .filter(col("status") == "completed") \
    .groupBy(
        window(col("event_time"), "15 minutes"),
        "driver_id"
    ) \
    .agg(
        count("*").alias("completed_rides"),
        avg("fare_amount").alias("avg_fare"),
        approx_count_distinct("rider_id").alias("unique_riders")  # exact distinct aggregations are not supported on streams
    )

# === WRITE TO SINKS ===

# Write city rides to Kafka for real-time dashboard
city_rides_query = city_rides \
    .select(
        col("city").alias("key"),
        to_json(struct("*")).alias("value")
    ) \
    .writeStream \
    .outputMode("update") \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-broker1:9092") \
    .option("topic", "city-ride-metrics") \
    .option("checkpointLocation", "s3a://uber-streaming/checkpoints/city-rides/") \
    .trigger(processingTime="30 seconds") \
    .start()

# Write surge alerts to console for monitoring
surge_query = surge_detection \
    .filter(col("surge_multiplier") > 1.0) \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .option("checkpointLocation", "s3a://uber-streaming/checkpoints/surge/") \
    .trigger(processingTime="30 seconds") \
    .start()

# Write driver metrics to data lake (parquet)
driver_query = driver_metrics \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "s3a://uber-streaming/driver-metrics/") \
    .option("checkpointLocation", "s3a://uber-streaming/checkpoints/driver-metrics/") \
    .trigger(processingTime="5 minutes") \
    .start()

# Wait for termination
spark.streams.awaitAnyTermination()

Checkpointing and Exactly-Once Semantics

# The checkpoint stores:
# 1. Offset log (the offset range planned for each micro-batch)
# 2. Commit log (which micro-batches finished)
# 3. State store snapshots (for stateful operations)

# Checkpoint location must be on reliable storage
query = streaming_df.writeStream \
    .option("checkpointLocation", "s3a://bucket/checkpoints/query-1/") \
    .start()

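# On restart with the same checkpoint location:
# 1. Spark reads the last planned and committed batches from the checkpoint
# 2. Replays any uncommitted batch with the same offset range
# 3. End-to-end exactly-once also needs a replayable source and an idempotent sink

# WARNING: Changes to stateful logic (grouping keys, aggregates, window size) are
# incompatible with an existing checkpoint; use a new checkpoint location for them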
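
The recovery steps above can be sketched as a small simulation. This is a toy model under stated assumptions, not Spark internals: `Checkpoint`, `IdempotentSink`, and `run` are hypothetical names, the source is a plain list addressed by offset, and the crash is injected after the sink write but before the commit.

```python
class Checkpoint:
    """In-memory stand-in for the offset log and the commit log."""

    def __init__(self):
        self.offsets = {}     # batch_id -> (start, end), written before processing
        self.commits = set()  # batch_ids whose output is known to be complete


class IdempotentSink:
    """Stores output by batch_id, so a replayed batch overwrites itself."""

    def __init__(self):
        self.batches = {}

    def write(self, batch_id, rows):
        self.batches[batch_id] = list(rows)

    def all_rows(self):
        return [r for b in sorted(self.batches) for r in self.batches[b]]


def run(source, checkpoint, sink, batch_size, fail_before_commit_of=None):
    """Process source in micro-batches, resuming from the checkpoint."""
    batch_id = max(checkpoint.offsets, default=-1)
    if batch_id == -1 or batch_id in checkpoint.commits:
        batch_id += 1  # nothing to replay, plan the next batch
    while True:
        if batch_id not in checkpoint.offsets:
            start = checkpoint.offsets[batch_id - 1][1] if batch_id > 0 else 0
            end = min(start + batch_size, len(source))
            if start == end:
                return  # no new data
            checkpoint.offsets[batch_id] = (start, end)  # 1. plan offsets
        start, end = checkpoint.offsets[batch_id]
        sink.write(batch_id, source[start:end])          # 2. write output
        if batch_id == fail_before_commit_of:
            raise RuntimeError("simulated crash before commit")
        checkpoint.commits.add(batch_id)                 # 3. commit
        batch_id += 1


source = ["e0", "e1", "e2", "e3", "e4"]
checkpoint, sink = Checkpoint(), IdempotentSink()
try:
    run(source, checkpoint, sink, batch_size=2, fail_before_commit_of=1)
except RuntimeError:
    pass  # batch 1 was written but never committed
run(source, checkpoint, sink, batch_size=2)  # restart replays batch 1
print(sink.all_rows())
```

Batch 1 is written twice, but because the sink keys its output by batch id the replay overwrites instead of duplicating. With a sink that only appended, the same run would produce duplicate rows, which is why checkpointing alone gives at-least-once delivery.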

⚠️ Amazon Interview Warning

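Checkpoint incompatibility is a common production issue. If you change stateful query logic (add a new aggregation, change the grouping keys, or change the window), the state saved in the old checkpoint no longer matches the query, and Spark may fail on restart or produce unpredictable results. Restart such a query with a new checkpoint location and decide explicitly where it should start reading from. Changes that do not touch state, such as adding a filter or changing the trigger interval, can usually keep the existing checkpoint.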


Streaming Performance Tuning

# 1. Rate limiting: cap the records read per micro-batch on the Kafka source
#    (spark.streaming.kafka.maxRatePerPartition only affects the legacy DStream API)
#    .option("maxOffsetsPerTrigger", 100000)

# 2. Micro-batch optimization
#    For stateful queries this value is fixed by the checkpoint after the first run
spark.conf.set("spark.sql.shuffle.partitions", "200")

# 3. State store: RocksDB keeps large state off the JVM heap (Spark 3.2+)
#    Set it before the query starts; the default is HDFSBackedStateStoreProvider
spark.conf.set("spark.sql.streaming.stateStore.providerClass",
               "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

# 4. Monitor through the Spark UI
#    Open the Structured Streaming tab at http://driver-node:4040

Edge Cases

1. Late Arriving Data

# Without watermark: all late data is processed (unbounded state)
# With watermark: late data beyond threshold is dropped

streaming_df \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(window(col("event_time"), "5 minutes"), "city") \
    .count()

# Data arriving >10 minutes late is dropped
# State is cleaned up for windows older than watermark

2. State Size Management

# Unbounded state can cause OOM
# Use watermark to limit state size
# Use state store cleanup

streaming_df \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window(col("event_time"), "1 hour"),
        "user_id"
    ) \
    .agg(
        collect_list("event_type").alias("events")
    ) \
    .writeStream \
    .outputMode("update") \
    .start()

# Without watermark, user_id state grows unbounded
# With watermark, state is cleaned up for old windows

3. Query Restart After Failure

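# Restarting the query with the same checkpoint location resumes from the last committed batch
# (Spark itself does not relaunch a failed query; use your scheduler or a retry loop)
# Monitor for:
# - Batches that take longer than the trigger interval (the query falls behind)
# - Failed batches (exceptions)
# - State store corruption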

# Monitor through Spark UI streaming tab
# Or use metrics:
query.status  # Current status
query.recentProgress  # Recent batch progress
query.lastProgress  # Last batch progress
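
One way to act on these metrics is to flag batches that ran longer than the trigger interval. The helper below is a hedged sketch: `slow_batches` is a hypothetical function, the sample progress entries are made up, and it assumes dict-shaped progress records with the `batchId` and `durationMs.triggerExecution` fields, as PySpark 3.x returns from `query.recentProgress`.

```python
def slow_batches(progress_events, trigger_interval_ms):
    """Return (batchId, triggerExecution ms) for batches slower than the trigger."""
    slow = []
    for progress in progress_events:
        duration = progress.get("durationMs", {}).get("triggerExecution")
        if duration is not None and duration > trigger_interval_ms:
            slow.append((progress["batchId"], duration))
    return slow


# Made-up sample shaped like entries of query.recentProgress.
sample_progress = [
    {"batchId": 41, "numInputRows": 90000, "durationMs": {"triggerExecution": 12000}},
    {"batchId": 42, "numInputRows": 100000, "durationMs": {"triggerExecution": 45000}},
    {"batchId": 43, "numInputRows": 0, "durationMs": {}},
]

lagging = slow_batches(sample_progress, trigger_interval_ms=30000)
print(lagging)
```

With a live query the call would be `slow_batches(query.recentProgress, 30000)` for a 30-second trigger. A steadily growing list suggests lowering `maxOffsetsPerTrigger` or adding resources.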

Best Practices

💡 Production Streaming Checklist

  • Always use checkpointing for production queries
  • Set appropriate watermarks for late data
  • Use rate limiting to prevent overwhelming sinks
  • Monitor batch processing times (should be < trigger interval)
  • Use idempotent sinks for exactly-once semantics
  • Set maxOffsetsPerTrigger to prevent OOM
  • Test with production-like data volumes before deployment
  • Use separate checkpoint locations for each query
  • Monitor state store size for stateful operations

Summary

Structured Streaming is Spark's stream processing engine, built on the same DataFrame API as batch jobs. Understanding output modes, triggers, watermarks, and checkpointing is essential for building production streaming pipelines at Amazon and Uber. The key to success is bounding state growth with watermarks and combining checkpointing with replayable sources and idempotent sinks to get exactly-once results.
