Real-Time Analytics with PySpark
Real-Time Stack & Streaming Windows
Architecture Diagram: Real-Time Analytics Stack
Architecture Diagram: Streaming Window Operations
Architecture Diagram: Materialized View Incremental Refresh
Detailed Explanation
Real-time analytics with PySpark is achieved through Structured Streaming, a micro-batch processing engine built on the Spark SQL engine that processes streaming data as a series of small batch queries. Unlike traditional batch processing that operates on complete datasets, streaming analytics processes data incrementally, maintaining state across micro-batches to provide continuously updated results.
What is Structured Streaming?
The fundamental abstraction in Structured Streaming is the streaming DataFrame: an unbounded table that grows as new data arrives.
How it Works:
- Each micro-batch reads the data that arrived since the previous batch
- It applies transformations (select, filter, join, aggregate)
- It writes results to a sink (console, Kafka, Delta Lake, etc.)
- Fault tolerance comes from a WAL (Write-Ahead Log) of offsets plus checkpointed state
- End-to-end exactly-once semantics hold when the source is replayable and the sink is idempotent or transactional
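The loop described above can be imitated in plain Python to make the moving parts concrete. This is a toy model, not Spark code: `run_micro_batches`, the in-memory `source` list, and the `checkpoint` dictionary are hypothetical stand-ins for a replayable source, offset tracking, and the aggregation state.

```python
from collections import Counter


def run_micro_batches(source, batch_size, checkpoint=None):
    """Toy micro-batch loop: read new records, update state, record progress."""
    # The checkpoint holds the next offset to read and the running counts.
    checkpoint = checkpoint or {"offset": 0, "counts": Counter()}
    emitted = []  # stands in for the sink
    while checkpoint["offset"] < len(source):
        start = checkpoint["offset"]
        batch = source[start:start + batch_size]                # 1. take a chunk of new data
        clicks = [e for e in batch if e["action"] == "click"]   # 2. transform (filter)
        for e in clicks:
            checkpoint["counts"][e["page"]] += 1                # 3. update state
        emitted.append(dict(checkpoint["counts"]))              # 4. write the current result
        checkpoint["offset"] = start + len(batch)               # 5. commit progress
    return emitted, checkpoint


events = [
    {"page": "/home", "action": "click"},
    {"page": "/home", "action": "scroll"},
    {"page": "/cart", "action": "click"},
    {"page": "/home", "action": "click"},
]
results, ckpt = run_micro_batches(events, batch_size=2)
print(results)          # one snapshot of the running counts per micro-batch
print(ckpt["offset"])   # next offset to read after a restart
```

Calling `run_micro_batches(events, 2, ckpt)` again processes nothing, because the saved offset already points past the last record. That is the property a checkpoint is meant to provide after a restart.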
What are Windowed Aggregations?
Windowed aggregations are the cornerstone of streaming analytics. PySpark supports three window types:
| Window Type | Description | Use Case |
|---|---|---|
| Tumbling Windows | Fixed-size, non-overlapping | Hourly/daily reports |
| Sliding Windows | Fixed-size, overlapping | Rolling averages |
| Session Windows | Dynamic-size based on activity gaps | User session analysis |
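Each window type is defined over an event-time column. A watermark (maximum lateness tolerance) is not strictly required to define a window, but without one the engine can neither discard old window state nor finalize results in append mode, so in practice every windowed aggregation should declare one to handle late-arriving events.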
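To see how fixed-size windows are assigned, the following plain-Python sketch computes which windows contain a given timestamp. The function name `assign_windows` is hypothetical, timestamps are plain seconds, and windows are assumed to be aligned to time zero. It illustrates the assignment rule only and is not Spark's implementation.

```python
def assign_windows(ts, size, slide=None):
    """Return the [start, end) windows (in seconds) that contain timestamp ts.

    slide=None means tumbling windows, where the slide equals the size.
    """
    slide = slide or size
    windows = []
    # Latest window start at or before ts, aligned to the slide interval.
    start = (ts // slide) * slide
    # Walk backwards while the window starting at `start` still covers ts.
    while start > ts - size:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


print(assign_windows(430, size=300))              # tumbling: 5-minute windows
print(assign_windows(430, size=600, slide=300))   # sliding: 10 minutes every 5
```

A tumbling window always yields exactly one window per event, while a sliding window yields `size / slide` of them, which is why sliding aggregations hold proportionally more state.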
What are Watermarks?
Watermarks are critical for bounded state management in streaming. Without watermarks, the engine must retain state for all possible event times indefinitely.
How Watermarks Work:
- Define a threshold beyond which late events are either dropped or handled separately
- Example: A watermark of "10 minutes" means that events whose event time is more than 10 minutes behind the maximum event time seen so far are considered late and may be dropped from window aggregations
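The rule can be sketched in a few lines of plain Python. `WatermarkTracker` is a hypothetical class written for illustration, with times in seconds. It updates the watermark on every event, whereas Spark advances it between micro-batches.

```python
class WatermarkTracker:
    """Toy event-time watermark: max event time seen so far minus an allowed delay."""

    def __init__(self, delay):
        self.delay = delay
        self.max_event_time = None

    @property
    def watermark(self):
        if self.max_event_time is None:
            return float("-inf")  # nothing seen yet, so nothing can be late
        return self.max_event_time - self.delay

    def accept(self, event_time):
        """Return True if the event is on time, False if it is behind the watermark."""
        on_time = event_time >= self.watermark
        # The maximum never decreases, so the watermark is monotonic.
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        return on_time


tracker = WatermarkTracker(delay=600)  # "10 minutes"
decisions = [tracker.accept(t) for t in (1000, 1500, 800, 950)]
print(decisions, tracker.watermark)
```

The event at 800 is rejected because the watermark had already advanced to 900, while the out-of-order event at 950 is still accepted.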
What are Materialized Views in Streaming?
Materialized views in the streaming context are pre-computed aggregations that are incrementally updated as new data arrives.
| Approach | Description | Benefit |
|---|---|---|
| Full Refresh | Recomputes the entire aggregation on every refresh | Simple but expensive |
| Incremental Refresh | Tracks changes since the last refresh and applies only the delta | Orders-of-magnitude improvements in both latency and cost |
Delta Lake's MERGE operation enables incremental refresh by allowing atomic upserts: it inserts new aggregation groups and updates existing ones in a single transaction.
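The difference between the two refresh strategies can be shown with a dictionary standing in for the materialized view. This is a conceptual sketch only: `full_refresh` and `incremental_refresh` are hypothetical helpers, and the dictionary update plays the role that an upsert would play against a real table.

```python
from collections import Counter


def full_refresh(all_events):
    """Recompute the whole aggregate from every source row."""
    return dict(Counter(e["page"] for e in all_events))


def incremental_refresh(view, new_events):
    """Apply only the delta: update existing groups and insert new ones."""
    delta = Counter(e["page"] for e in new_events)
    for page, n in delta.items():
        view[page] = view.get(page, 0) + n
    return view


history = [{"page": "/home"}, {"page": "/cart"}, {"page": "/home"}]
new = [{"page": "/home"}, {"page": "/pricing"}]

view = full_refresh(history)            # initial build touches every row
view = incremental_refresh(view, new)   # later refreshes touch only the delta
print(view)
```

Both paths end in the same view, but the incremental path reads two rows instead of five. Note that this additive update would double-count if the same delta were applied twice, which is why real pipelines pair it with checkpointed offsets or batch identifiers.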
What is State Management in Streaming?
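State management in Structured Streaming goes through the StateStore abstraction, which maintains intermediate aggregation state across micro-batches. The default provider keeps state in executor memory backed by checkpoint files; since Spark 3.2, a RocksDB-based provider can be selected for state that is too large to hold comfortably on the JVM heap.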
| Operation Type | State Requirements |
|---|---|
| Simple aggregations (count, sum) | Minimal state |
| Complex stateful operations (session windows, stream-stream joins) | Significant state growth, requires TTL configuration and periodic state cleanup |
What are Output Modes?
The output mode determines how results are written:
| Output Mode | Description | Ideal Use Case |
|---|---|---|
| Append mode | Writes only new rows that will never change again | Non-aggregation queries; watermarked window aggregations |
| Update mode | Writes only rows that changed since the last trigger | Aggregation queries |
| Complete mode | Rewrites the entire result table | Small result sets |
Key Takeaway: The choice of output mode affects both correctness and performance in streaming analytics.
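A small plain-Python model makes the three modes easier to compare. The `emit` function below is hypothetical: it takes the result table before and after a trigger and returns what a sink would receive. The append branch is deliberately simplified to "rows that did not exist before", which matches non-aggregating queries; for watermarked aggregations, append instead waits until a window is final.

```python
def emit(previous, current, mode):
    """Rows a sink would receive for one trigger, given the result table before and after."""
    if mode == "complete":
        # The entire result table is rewritten every trigger.
        return dict(current)
    if mode == "update":
        # Only rows that are new or whose value changed since the last trigger.
        return {k: v for k, v in current.items() if previous.get(k) != v}
    if mode == "append":
        # Only rows that did not exist before; existing rows are never rewritten.
        return {k: v for k, v in current.items() if k not in previous}
    raise ValueError(f"unknown output mode: {mode}")


before = {"/home": 2, "/cart": 1}
after = {"/home": 3, "/cart": 1, "/pricing": 1}
for mode in ("complete", "update", "append"):
    print(mode, emit(before, after, mode))
```

The unchanged `/cart` row appears only in complete mode, which is why complete mode is reserved for small result sets.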
Key Concepts Table
Mathematical Foundations
Definition: Streaming Window
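A streaming window assigns each event with timestamp $t$ to a set of windows. For windows of size $w$ that start every $s$ time units:

$$W(t) = \{\, [ks,\ ks + w) : k \in \mathbb{Z},\ ks \le t < ks + w \,\}$$

Types: Tumbling (non-overlapping, $s = w$, so each event falls in exactly one window), Sliding (overlapping, $s < w$), Session (activity-based, with boundaries set by gaps between events rather than by a fixed grid).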
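Session windows are the one type whose boundaries depend on the data itself, so a short sketch may help. The hypothetical `sessionize` function below groups one user's event times (in seconds) into sessions, assuming a session ends once `gap` seconds pass without an event.

```python
def sessionize(timestamps, gap):
    """Group event times into sessions; a new session starts after `gap` seconds of inactivity."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] < gap:
            sessions[-1].append(ts)   # still within the gap: extend the current session
        else:
            sessions.append([ts])     # gap exceeded (or first event): open a new session
    # Each session is reported as (start, end, event_count), where the window
    # stays open until `gap` seconds after its last event.
    return [(s[0], s[-1] + gap, len(s)) for s in sessions]


print(sessionize([0, 600, 1500, 4000, 4100], gap=1800))
```

Because an arriving event can extend or merge sessions, session state cannot be discarded until the watermark guarantees that no further event can fall inside the gap.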
Watermark Definition
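A watermark is a monotonically non-decreasing function of processing time $\tau$ that gives a lower bound on the event times still expected to arrive. With allowed lateness $\delta$ and $t_e$ the event time of event $e$:

$$\mathrm{WM}(\tau) = \max_{e \text{ seen by } \tau} t_e - \delta$$

Late events, those with $t_e < \mathrm{WM}(\tau)$, are dropped or routed to a side output.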
Window Correctness Theorem
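A window computation over a window $[a, b)$ is correct if:
- Every event assigned to the window is processed exactly once
- No event is missed: every event with $a \le t_e < b$ that arrives before the watermark passes $b$ is included in the result
- Watermark progression guarantees eventual completion: the watermark eventually passes $b$, after which the window's result is final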
Materialized View Refresh
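Incremental refresh cost for a materialized view with source $S$, where $\Delta S$ is the set of source rows changed since the last refresh: $C_{\text{incr}} = O(|\Delta S|)$
vs. full recomputation: $C_{\text{full}} = O(|S|)$. Break-even when $|\Delta S| \approx |S|$, that is, when nearly the whole source has changed.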
End-to-End Latency
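Total latency from event arrival to query result:

$$L_{\text{total}} = L_{\text{ingest}} + L_{\text{process}} + L_{\text{commit}} + L_{\text{query}}$$

Target: $L_{\text{total}}$ within the application's freshness budget (typically seconds for real-time).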
Key Insight
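Structured Streaming uses micro-batch execution by default. For sub-second latency, there is a Continuous processing mode (experimental), but it provides only at-least-once guarantees and supports only map-like operations such as projections and filters, not aggregations. The trade-off is exactly-once guarantees and full operator support vs. latency.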
Summary
Real-time analytics relies on windowing for temporal aggregation, watermarks for late data handling, and incremental refresh for materialized views. End-to-end latency is the sum of ingestion, processing, commit, and query phases. Window correctness requires exactly-once processing with watermark-driven completion.
Key Concepts Table (cont.)
| Concept | Description | Configuration | Use Case |
|---|---|---|---|
| Micro-Batch | Discrete processing intervals | trigger(processingTime="10 seconds") | General streaming |
| Continuous | Low-latency processing (experimental, at-least-once) | trigger(continuous="1 second") | Sub-second latency |
| Watermark | Late event tolerance | withWatermark("event_time", "10 minutes") | Window aggregations |
| Tumbling Window | Fixed non-overlapping windows | window("event_time", "5 minutes") | Hourly reports |
| Sliding Window | Fixed overlapping windows | window("event_time", "10 minutes", "5 minutes") | Rolling averages |
| Session Window | Dynamic gap-based windows | session_window("event_time", "30 minutes") (Spark 3.2+) | User sessions |
| State Store | Pluggable state backend (in-memory default, RocksDB optional) | spark.sql.streaming.stateStore.providerClass | Stateful operations |
| Checkpoint | Offset + state persistence | .option("checkpointLocation", "/path") | Fault tolerance |
| Trigger | Micro-batch frequency | .trigger(processingTime="30 seconds") | Throughput tuning |
| Output Mode | Result writing strategy | .outputMode("update"/"append"/"complete") | Result correctness |
Code Examples
Example 1: Real-Time Click Stream Analytics with Windowed Aggregations
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("RealTime-ClickAnalytics") \
    .config("spark.sql.streaming.stateStore.providerClass",
            "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider") \
    .getOrCreate()

# Define click event schema
click_schema = StructType([
    StructField("user_id", StringType()),
    StructField("page_url", StringType()),
    StructField("action", StringType()),  # click, scroll, hover
    StructField("timestamp", TimestampType()),
    StructField("session_id", StringType()),
    StructField("device_type", StringType()),
    StructField("geo_location", StringType()),
])

# Read click stream from Kafka and parse the JSON payload
click_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "click-events")
    .option("startingOffsets", "latest")
    .load()
    .select(
        from_json(col("value").cast("string"), click_schema).alias("data")
    )
    .select("data.*")
    .withColumn("event_time", col("timestamp"))
)

# --- Tumbling Window: Page views per 5-minute window ---
page_views_5min = (
    click_stream
    .withWatermark("event_time", "10 minutes")
    .groupBy(
        window(col("event_time"), "5 minutes"),
        col("page_url")
    )
    .agg(
        count("*").alias("view_count"),
        approx_count_distinct("user_id").alias("unique_users"),
        count(when(col("action") == "click", 1)).alias("click_count"),
    )
    .select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("page_url"),
        col("view_count"),
        col("unique_users"),
        col("click_count"),
        (col("click_count") / col("view_count")).alias("click_through_rate"),
    )
)

# Write to Delta Lake. The Delta sink accepts append and complete output modes,
# not update. In append mode each window is written once, after the watermark
# has passed the end of the window.
(
    page_views_5min
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/checkpoints/page_views_5min")
    .trigger(processingTime="30 seconds")
    .start("/mnt/analytics/gold/page_views_5min")
)

# --- Sliding Window: Rolling 10-minute average with 5-minute slide ---
rolling_metrics = (
    click_stream
    .withWatermark("event_time", "15 minutes")
    .groupBy(
        window(col("event_time"), "10 minutes", "5 minutes"),
        col("device_type")
    )
    .agg(
        count("*").alias("total_events"),
        avg(when(col("action") == "click", 1).otherwise(0)).alias("avg_click_rate"),
        # Exact distinct counts are not supported on streaming DataFrames,
        # so use the approximate variant.
        approx_count_distinct("session_id").alias("active_sessions"),
    )
)

(
    rolling_metrics
    .writeStream
    .format("delta")
    .outputMode("append")  # the Delta sink does not support update mode
    .option("checkpointLocation", "/mnt/checkpoints/rolling_metrics")
    .trigger(processingTime="30 seconds")
    .start("/mnt/analytics/gold/rolling_metrics")
)

# --- Session Window: User session analytics ---
# Row-based Window functions (first/sum ... over a Window spec) are not
# supported on streaming DataFrames, so this uses the built-in session_window
# (Spark 3.2+): a session closes after 30 minutes without events from the user.
session_analytics = (
    click_stream
    .withWatermark("event_time", "30 minutes")
    .groupBy(
        session_window(col("event_time"), "30 minutes"),
        col("user_id"),
    )
    .agg(
        min("event_time").alias("session_start"),
        max("event_time").alias("session_end"),
        count("*").alias("event_count"),
        approx_count_distinct("page_url").alias("pages_visited"),
        (max("event_time").cast("long") - min("event_time").cast("long")).alias(
            "session_duration_seconds"
        ),
    )
)
Example 2: Real-Time Anomaly Detection with Stateful Processing
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("RealTime-AnomalyDetection") \
    .getOrCreate()

# Read sensor data stream
sensor_schema = StructType([
    StructField("sensor_id", StringType()),
    StructField("metric_name", StringType()),
    StructField("value", DoubleType()),
    StructField("timestamp", TimestampType()),
])

sensor_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "sensor-metrics")
    .load()
    .select(from_json(col("value").cast("string"), sensor_schema).alias("data"))
    .select("data.*")
)

# --- Rolling Statistics for Anomaly Detection ---
# 15-minute windows sliding every minute; bounds are mean +/- 3 standard deviations
windowed_stats = (
    sensor_stream
    .withWatermark("timestamp", "5 minutes")
    .groupBy(
        window(col("timestamp"), "15 minutes", "1 minute"),
        col("sensor_id"),
        col("metric_name"),
    )
    .agg(
        avg("value").alias("avg_value"),
        stddev("value").alias("stddev_value"),
        min("value").alias("min_value"),
        max("value").alias("max_value"),
        count("*").alias("reading_count"),
    )
    .withColumn("upper_bound", col("avg_value") + 3 * col("stddev_value"))
    .withColumn("lower_bound", col("avg_value") - 3 * col("stddev_value"))
)

# --- Detect Anomalies in Real-Time ---
# Aliases keep the two sides apart: windowed_stats is derived from
# sensor_stream, so unqualified column references would be ambiguous.
# Assumption: feeding a streaming aggregation into a stream-stream join needs
# multiple-stateful-operator support (Spark 3.4+, append mode); verify on your
# version. Each reading falls into 15 overlapping windows, so it can be
# reported more than once.
readings = sensor_stream.withWatermark("timestamp", "5 minutes").alias("r")
stats = windowed_stats.alias("s")

anomaly_stream = (
    readings
    .join(
        stats,
        (col("r.sensor_id") == col("s.sensor_id")) &
        (col("r.metric_name") == col("s.metric_name")) &
        (col("r.timestamp") >= col("s.window.start")) &
        (col("r.timestamp") < col("s.window.end")),  # window end is exclusive
        "inner"  # unmatched readings were discarded by the anomaly filter anyway
    )
    .withColumn("is_anomaly",
        (col("r.value") > col("s.upper_bound")) |
        (col("r.value") < col("s.lower_bound"))
    )
    .filter(col("is_anomaly"))
    .select(
        col("r.sensor_id"),
        col("r.metric_name"),
        col("r.value"),
        col("r.timestamp"),
        col("s.avg_value").alias("expected_value"),
        col("s.stddev_value").alias("expected_stddev"),
        (abs(col("r.value") - col("s.avg_value")) / col("s.stddev_value")).alias("z_score"),
    )
)

# Write anomalies to Delta Lake + send alerts
(
    anomaly_stream
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/checkpoints/anomaly_alerts")
    .trigger(processingTime="10 seconds")
    .start("/mnt/analytics/gold/anomaly_alerts")
)

# --- Real-Time Dashboard Query (Read from Materialized View) ---
dashboard_metrics = spark.read.format("delta").load("/mnt/analytics/gold/page_views_5min")

# Top pages in last hour
top_pages = (
    dashboard_metrics
    .filter(col("window_start") >= current_timestamp() - expr("INTERVAL 1 HOUR"))
    .groupBy("page_url")
    .agg(
        sum("view_count").alias("total_views"),
        sum("click_count").alias("total_clicks"),
        avg("click_through_rate").alias("avg_ctr"),
    )
    .orderBy(desc("total_views"))
    .limit(20)
)
top_pages.show()
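
The three-sigma rule used in Example 2 can be checked on a handful of numbers without a cluster. The sketch below is plain Python, `find_anomalies` is a hypothetical helper, and the readings are made-up illustrative values. It uses the sample standard deviation, which is also what Spark's `stddev` computes.

```python
from statistics import mean, stdev


def find_anomalies(values, k=3.0):
    """Return (value, z_score) pairs lying more than k sample standard deviations from the mean."""
    mu = mean(values)
    sigma = stdev(values)  # sample standard deviation; needs at least two values
    if sigma == 0:
        return []  # all readings identical: z-scores are undefined, nothing to flag
    return [(v, abs(v - mu) / sigma) for v in values if abs(v - mu) > k * sigma]


# Twenty ordinary readings around 20 plus one spike (illustrative data).
readings = [20.0, 20.5, 19.5, 20.2, 19.8] * 4 + [35.0]
anomalies = find_anomalies(readings)
print(anomalies)
```

One caveat worth keeping in mind: the spike is part of the window it is judged against, so it inflates both the mean and the standard deviation. With very few readings per window, a single outlier can never reach three standard deviations, so small windows may need a lower threshold or statistics computed from preceding windows.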
Example 3: Streaming JOIN between Two Data Streams
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("StreamStream-Join") \
    .getOrCreate()

# click_schema is the schema defined in Example 1.
# purchase_schema is not given in the original text; this is an assumed
# minimal schema covering only the fields used below.
purchase_schema = StructType([
    StructField("user_id", StringType()),
    StructField("product_id", StringType()),
    StructField("amount", DoubleType()),
    StructField("timestamp", TimestampType()),
])

# Stream 1: Click events
clicks = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "click-events")
    .load()
    .select(
        from_json(col("value").cast("string"), click_schema).alias("data")
    )
    .select("data.*")
    .withColumn("event_time", col("timestamp"))
    .withWatermark("event_time", "10 minutes")
)

# Stream 2: Purchase events
purchases = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "purchase-events")
    .load()
    .select(
        from_json(col("value").cast("string"), purchase_schema).alias("data")
    )
    .select("data.*")
    .withColumn("event_time", col("timestamp"))
    .withWatermark("event_time", "10 minutes")
)

# Stream-Stream Join: Match clicks to purchases within 30-minute window.
# A left outer join needs watermarks on both sides plus the time-range
# condition so that unmatched clicks can eventually be emitted.
click_purchase_joined = (
    clicks.alias("c")
    .join(
        purchases.alias("p"),
        expr("""
            c.user_id = p.user_id AND
            p.event_time BETWEEN c.event_time AND c.event_time + INTERVAL 30 MINUTES
        """),
        "left"
    )
    .select(
        col("c.user_id"),
        col("c.page_url").alias("clicked_page"),
        col("c.event_time").alias("click_time"),
        col("p.product_id"),
        col("p.amount").alias("purchase_amount"),
        col("p.event_time").alias("purchase_time"),
        (col("p.event_time").cast("long") - col("c.event_time").cast("long")).alias(
            "time_to_purchase_seconds"
        ),
    )
)

# Write joined stream to Delta Lake
(
    click_purchase_joined
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/checkpoints/click_purchase_join")
    .trigger(processingTime="30 seconds")
    .start("/mnt/analytics/gold/click_purchase_attribution")
)
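
The join condition in Example 3 is easier to reason about on a few rows. The following plain-Python sketch applies the same rule (same user, purchase between the click and 30 minutes after it) to small in-memory lists. `attribute_purchases` and the sample rows are hypothetical, and times are plain seconds.

```python
def attribute_purchases(clicks, purchases, max_delay):
    """Left-join clicks to purchases by user within [click_time, click_time + max_delay]."""
    rows = []
    for c in clicks:
        matches = [
            p for p in purchases
            if p["user_id"] == c["user_id"]
            and c["time"] <= p["time"] <= c["time"] + max_delay
        ]
        if matches:
            for p in matches:
                rows.append((c["user_id"], c["time"], p["time"], p["time"] - c["time"]))
        else:
            # Left outer join: keep the click, with empty purchase columns.
            rows.append((c["user_id"], c["time"], None, None))
    return rows


clicks = [{"user_id": "u1", "time": 100}, {"user_id": "u2", "time": 200}]
purchases = [
    {"user_id": "u1", "time": 700},    # within 30 minutes of u1's click
    {"user_id": "u1", "time": 5000},   # too late to be attributed
    {"user_id": "u2", "time": 150},    # happened before u2's click
]
joined = attribute_purchases(clicks, purchases, max_delay=1800)
print(joined)
```

On a bounded list the unmatched row for `u2` can be produced immediately. In a streaming join it can only be emitted once the watermark shows that no matching purchase can still arrive, which is why outer join results appear with a delay.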
Performance Metrics
| Metric | Batch Processing | Streaming (Micro-Batch) | Streaming (Continuous) | Improvement |
|---|---|---|---|---|
| End-to-End Latency | 1-24 hours | 10-60 seconds | 100-500 ms | 99.9% reduction |
| Throughput (events/sec) | 100K (batch) | 500K (streaming) | 1M (continuous) | 10x improvement |
| Cost per Million Events | $0.50 | $0.10 | $0.08 | 84% reduction |
| State Size (1B events) | N/A (stateless) | 2-10 GB (RocksDB) | 2-10 GB (RocksDB) | Bounded |
| Recovery Time | Re-run entire batch | Replay from checkpoint | Replay from checkpoint | Same |
| Late Event Handling | N/A (re-process) | Watermark-based | N/A (no aggregations) | New capability |
| Concurrent Queries | 1 (batch) | Multiple (streaming) | Multiple (streaming) | Scaling |
| Exactly-Once | Difficult | Checkpoint-based | At-least-once only | Guaranteed (micro-batch) |
| Resource Utilization | 100% during batch | 30-50% (micro-batch) | 60-80% (continuous) | More efficient |
| Complex Event Processing | Not supported | Window aggregations | Map-like operations only | New capability |
Best Practices
- Use watermarks for all windowed aggregations: Without watermarks, state grows without bound. Set watermarks slightly larger than the maximum expected event lateness (e.g., 10 minutes for events that typically arrive within 5 minutes).
- Tune micro-batch size for latency vs. throughput: Smaller trigger intervals (1-10 seconds) reduce latency but increase overhead; larger intervals (30-60 seconds) improve throughput but increase latency. Profile your workload to find the optimal balance.
- Use Delta Lake as the primary streaming sink: Delta Lake provides exactly-once semantics, ACID transactions, time travel, and schema evolution. It integrates natively with Structured Streaming as both source and sink.
- Implement idempotent writes: Design streaming writes to be idempotent so that replaying from checkpoints produces the same results. A Delta Lake MERGE keyed on a unique identifier is idempotent as long as it overwrites matched rows rather than incrementing them.
- Monitor state size growth: For stateful operations (windowed aggregations, stream-stream joins), monitor RocksDB state size. Bound state with watermarks and timeouts so that old entries are cleaned up and OOM errors are avoided.
- Size shuffle partitions explicitly: Adaptive Query Execution (AQE) is generally not applied to stateful streaming queries, so it cannot be relied on to optimize partition counts or handle data skew in streaming aggregations. Choose spark.sql.shuffle.partitions (default 200) before the first run, because for stateful queries the partition count is fixed by the checkpoint.
- Separate raw and aggregated streams: Write raw events to a Bronze layer first, then process aggregations in a separate streaming query. This provides fault isolation and enables reprocessing.
- Implement dead-letter queues: Route events that fail deserialization or validation to a dead-letter topic/table for manual inspection rather than blocking the entire pipeline.
- Use Structured Streaming for CDC: Combine Debezium CDC with Structured Streaming to apply database changes to Delta Lake targets in near-real-time with exactly-once semantics.
- Optimize for your query pattern: For dashboards that query recent data, partition by time and use Z-ORDER on frequently filtered columns. For point-in-time queries, leverage Delta Lake time travel.
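The dead-letter practice above can be sketched independently of any streaming engine. In the plain-Python example below, `route`, the `required` field list, and the sample messages are hypothetical; the point is only that bad records are set aside together with a reason instead of raising and stopping the batch.

```python
import json


def route(raw_messages, required=("user_id", "timestamp")):
    """Split raw JSON strings into parsed records and dead-letter entries."""
    good, dead = [], []
    for raw in raw_messages:
        try:
            record = json.loads(raw)
        except json.JSONDecodeError as exc:
            dead.append({"raw": raw, "error": f"invalid JSON: {exc.msg}"})
            continue
        if not isinstance(record, dict):
            dead.append({"raw": raw, "error": "not a JSON object"})
            continue
        missing = [f for f in required if f not in record]
        if missing:
            dead.append({"raw": raw, "error": f"missing fields: {missing}"})
        else:
            good.append(record)
    return good, dead


messages = [
    '{"user_id": "u1", "timestamp": "2024-01-01T00:00:00"}',
    '{"user_id": "u2"}',
    'not json',
]
good, dead = route(messages)
print(len(good), "valid;", [d["error"] for d in dead])
```

Keeping the original payload next to the error makes it possible to replay dead-letter records after the producer or the schema has been fixed.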
Mathematical Foundation Summary
Streaming analytics latency follows end_to_end_latency = ingestion + processing + query_latency. Watermark-based aggregation handles late data: with watermark = max_event_time - delay, an event is late when event_time < watermark. Throughput scales as throughput = parallelism × batch_size / processing_time. Materialized view freshness depends on refresh frequency: freshness = current_time - last_refresh_time. Structured Streaming has no automatic backpressure, so when processing_time > trigger_interval the batch size is capped with the source's maxOffsetsPerTrigger option, keeping throughput stable at optimal_throughput ≈ 0.8 × peak_capacity.
See also: Change Data Capture (36), Cost Optimization (39), Production Hardening (40)
See Also
- Structured Streaming: Real-time streaming framework
- Kafka Architecture: Kafka as the streaming backbone
- Streaming Pipeline: End-to-end streaming design
- Monitoring Metrics: Real-time monitoring patterns