
Real-Time Analytics with PySpark


Real-Time Stack & Streaming Windows

[Figure: Real-Time Analytics Stack (Producers: IoT/Apps → Kafka event stream → Spark Streaming: micro-batch, stateful ops → Delta Lake gold tables → Dashboards: real-time BI) and Streaming Window Operations (Tumbling: fixed; Sliding: overlap; Session: gap-based; Global: all events). Use watermarks to bound state; target latency < 1 second for real-time.]

Architecture Diagram: Real-Time Analytics Stack

Architecture Diagram: Streaming Window Operations

Architecture Diagram: Materialized View Incremental Refresh

Detailed Explanation

Real-time analytics with PySpark is achieved through Structured Streaming, a micro-batch processing engine built on the Spark SQL engine that processes streaming data as a series of small batch queries. Unlike traditional batch processing that operates on complete datasets, streaming analytics processes data incrementally, maintaining state across micro-batches to provide continuously updated results.


What is Structured Streaming?

The fundamental abstraction in Structured Streaming is the streaming DataFrame: an unbounded table that grows as new data arrives.

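How it Works:

  • Each micro-batch reads the chunk of new data that arrived since the previous batch
  • Applies transformations (select, filter, join, aggregate)
  • Writes results to a sink (console, Kafka, Delta Lake, etc.)
  • Records source offsets in a write-ahead log and persists them, together with operator state, in the checkpoint directory for fault tolerance
  • Provides end-to-end exactly-once guarantees when the source is replayable (e.g., Kafka) and the sink is idempotent or transactional (e.g., Delta Lake)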
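The micro-batch cycle above can be modeled in a few lines of plain Python. This is a simplified sketch, not Spark's implementation: the in-memory checkpoint dict stands in for the checkpoint directory, and run_micro_batch and max_records are hypothetical names (max_records plays a role similar to a per-trigger rate limit). Because offsets and state advance together, replaying a batch from the last committed checkpoint yields the same result, which is the property exactly-once processing relies on.

```python
from collections import Counter


def run_micro_batch(log, checkpoint, max_records):
    """Process one micro-batch of a running page-view count.

    Reads records after the committed offset, updates a copy of the
    aggregation state, then commits offset and state together.
    Returns the new state, or None when there is no new data.
    """
    start = checkpoint["offset"]
    end = min(start + max_records, len(log))
    if start == end:
        return None  # nothing new since the last commit
    state = Counter(checkpoint["state"])  # work on a copy until commit
    for page in log[start:end]:
        state[page] += 1
    # Commit: offset and state advance together. A crash before this point
    # leaves the old checkpoint intact, so the same range is simply replayed.
    checkpoint["offset"] = end
    checkpoint["state"] = dict(state)
    return dict(state)


# The "source": an append-only log of page views, like a Kafka partition
log = ["/home", "/cart", "/home", "/checkout", "/home"]
checkpoint = {"offset": 0, "state": {}}
while (result := run_micro_batch(log, checkpoint, max_records=2)) is not None:
    print(checkpoint["offset"], result)
```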

What are Windowed Aggregations?

Windowed aggregations are the cornerstone of streaming analytics. PySpark supports three window types:

| Window Type | Description | Use Case |
|---|---|---|
| Tumbling windows | Fixed-size, non-overlapping | Hourly/daily reports |
| Sliding windows | Fixed-size, overlapping | Rolling averages |
| Session windows | Dynamic size, based on activity gaps | User session analysis |

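Each window type is defined over an event-time column. Pairing it with a watermark (the maximum lateness tolerance) determines how long late-arriving events are still accepted and lets Spark drop state for windows that can no longer change; append output mode requires a watermark for windowed aggregations.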
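Session windows are the least intuitive of the three, so here is a plain-Python sketch of gap-based sessionization. It illustrates the idea under simplifying assumptions (one user, all events already collected) and is not how Spark implements it; sessionize is a hypothetical helper. In PySpark 3.2+, grouping by session_window(col("event_time"), "30 minutes") provides gap-based sessions natively on streams.

```python
from datetime import datetime, timedelta


def sessionize(timestamps, gap):
    """Group event times into gap-based sessions.

    Each event opens an interval [ts, ts + gap); an event that arrives
    before the current session's end extends it, otherwise it starts a
    new session. Returns (start, end) pairs with end = last event + gap.
    """
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:
            sessions[-1][1] = ts + gap  # still active: push the end out
        else:
            sessions.append([ts, ts + gap])  # gap exceeded: new session
    return [tuple(s) for s in sessions]


events = [datetime(2024, 1, 1, 9, 0), datetime(2024, 1, 1, 9, 10),
          datetime(2024, 1, 1, 9, 50), datetime(2024, 1, 1, 10, 5)]
for start, end in sessionize(events, timedelta(minutes=30)):
    print(start.time(), "->", end.time())
```

With a 30-minute gap, the 40-minute pause between 9:10 and 9:50 closes the first session, so these four events form two sessions.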


What are Watermarks?

Watermarks are critical for bounded state management in streaming. Without watermarks, the engine must retain state for all possible event times indefinitely.

How Watermarks Work:

  • Define how far behind the latest event time Spark keeps waiting for late data; events older than that can be dropped from stateful operations (Structured Streaming has no built-in side output for them)
  • Example: a watermark of "10 minutes" means that events whose event time is more than 10 minutes behind the maximum event time seen so far are considered late and may be dropped from window aggregations
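The watermark rule can be simulated across micro-batches in plain Python. This sketch assumes, as Structured Streaming does, that the watermark applied to a batch is computed at the end of the previous batch as the maximum event time seen minus the delay. apply_watermark is a hypothetical helper, and it is stricter than Spark: Spark only guarantees not to drop data within the threshold, while this model drops every event older than the watermark.

```python
from datetime import datetime, timedelta


def apply_watermark(batches, delay):
    """Split events into accepted and dropped using an event-time watermark.

    The watermark used while processing a batch comes from the end of the
    previous batch: max event time seen so far minus `delay`.
    Returns (accepted, dropped, final_watermark).
    """
    max_seen, watermark = None, None
    accepted, dropped = [], []
    for batch in batches:
        for ts in batch:
            if watermark is not None and ts < watermark:
                dropped.append(ts)  # too late for stateful operators
            else:
                accepted.append(ts)
        batch_max = max(batch, default=None)
        if batch_max is not None and (max_seen is None or batch_max > max_seen):
            max_seen = batch_max
        if max_seen is not None:
            watermark = max_seen - delay  # advances only at batch boundaries
    return accepted, dropped, watermark


t = lambda h, m: datetime(2024, 1, 1, h, m)
batches = [[t(12, 0), t(12, 7)], [t(12, 20), t(11, 55)], [t(12, 5), t(12, 12)]]
accepted, dropped, wm = apply_watermark(batches, timedelta(minutes=10))
print(dropped, wm)
```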

What are Materialized Views in Streaming?

Materialized views in the streaming context are pre-computed aggregations that are incrementally updated as new data arrives.

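| Approach | Description | Benefit |
|---|---|---|
| Full refresh | Recomputes the entire aggregation on every refresh | Simple, but cost grows with the full source size |
| Incremental refresh | Tracks changes since the last refresh and applies only the delta | Cost grows with the size of the change, so it can be orders of magnitude cheaper and faster when changes are small |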

Delta Lake's MERGE operation enables incremental refresh by allowing atomic upserts: it inserts new aggregation groups and updates existing ones in a single transaction.
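The upsert semantics of an incremental refresh can be sketched with a Python dict standing in for the Delta table: only the newly arrived rows are aggregated, then matched groups are updated and unmatched groups inserted. The function names are hypothetical and this is not the Delta Lake API. The batch-id check mimics the idea behind Delta's txnAppId/txnVersion idempotent writes, because adding counts twice on a replay would corrupt the view.

```python
from collections import Counter


def aggregate_delta(new_events):
    """Aggregate only the newly arrived rows: (window_start, page) -> views."""
    return Counter((e["window_start"], e["page"]) for e in new_events)


def merge_into_view(view, delta_agg, batch_id, applied_batches):
    """MERGE-style upsert: update matched groups, insert unmatched ones.

    Incrementing counts is not idempotent, so each batch id is recorded
    and a replayed batch is skipped.
    """
    if batch_id in applied_batches:
        return view  # replay after a failure: already applied
    for key, views in delta_agg.items():
        view[key] = view.get(key, 0) + views  # matched: update, else insert
    applied_batches.add(batch_id)
    return view


view, applied = {}, set()
batch0 = [{"window_start": "12:00", "page": "/home"},
          {"window_start": "12:00", "page": "/cart"}]
batch1 = [{"window_start": "12:00", "page": "/home"},
          {"window_start": "12:05", "page": "/home"}]
merge_into_view(view, aggregate_delta(batch0), 0, applied)
merge_into_view(view, aggregate_delta(batch1), 1, applied)
merge_into_view(view, aggregate_delta(batch1), 1, applied)  # replayed, ignored
print(view)
```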


What is State Management in Streaming?

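State management in Structured Streaming uses a StateStore to maintain intermediate aggregation state across micro-batches. The default provider (HDFSBackedStateStoreProvider) keeps state in executor memory; the RocksDB provider (Spark 3.2+), as configured in Example 1, keeps large state off the JVM heap.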

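| Operation Type | State Requirements |
|---|---|
| Simple aggregations (count, sum) | Minimal state: one small aggregate per group (per window, if windowed) |
| Complex stateful operations (session windows, stream-stream joins) | Significant state growth; bound it with watermarks (and timeouts in arbitrary stateful operators) and monitor state size |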
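Why watermarks keep state bounded can be seen in a small plain-Python model of a tumbling-window count. It is a sketch, not Spark's StateStore: window_start and run_windowed_count are hypothetical helpers, windows are aligned to the Unix epoch, and a window's state is evicted (and its final row emitted, as append mode would) once the watermark passes the window end.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def window_start(ts, size):
    """Start of the tumbling window containing ts, aligned to the epoch."""
    return ts - (ts - EPOCH) % size


def run_windowed_count(batches, size, delay):
    """Count events per window and evict windows the watermark has passed.

    Returns (finalized_rows, peak_state_size, remaining_state).
    """
    state, finalized, peak = {}, [], 0
    max_seen = None
    for batch in batches:
        watermark = None if max_seen is None else max_seen - delay
        for ts in batch:
            if watermark is not None and ts < watermark:
                continue  # late event: its window may already be gone
            start = window_start(ts, size)
            state[start] = state.get(start, 0) + 1
            max_seen = ts if max_seen is None else max(max_seen, ts)
        peak = max(peak, len(state))
        # End of micro-batch: advance the watermark, evict closed windows
        if max_seen is not None:
            watermark = max_seen - delay
            for start in sorted(state):
                if start + size <= watermark:
                    finalized.append((start, state.pop(start)))
    return finalized, peak, state


t = lambda h, m: datetime(2024, 1, 1, h, m)
batches = [[t(12, 1), t(12, 3)], [t(12, 6), t(12, 12)], [t(12, 21)], [t(12, 4), t(12, 33)]]
rows, peak, open_windows = run_windowed_count(batches, timedelta(minutes=5), timedelta(minutes=10))
print(len(rows), "windows finalized; peak state size:", peak)
```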

What are Output Modes?

The output mode determines how results are written:

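| Output Mode | Description | Ideal Use Case |
|---|---|---|
| Append mode | Writes only new rows that will not change again | Non-aggregation queries; windowed aggregations with a watermark (each window is written once it closes) |
| Update mode | Writes only rows that changed since the last trigger | Aggregation queries, with sinks that can apply updates |
| Complete mode | Rewrites the entire result table on every trigger | Small aggregated result sets (aggregation queries only) |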

Key Takeaway: The choice of output mode affects both correctness and performance in streaming analytics.
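To make the difference concrete, here is a plain-Python simulation of what a running count-by-key query would write per trigger in complete and update mode. It is an illustrative model with a hypothetical emit helper; append mode is left out because Spark allows it for aggregations only when a watermark is defined on the event-time grouping key.

```python
from collections import Counter


def emit(batches, mode):
    """Rows a running count-by-key query would write on each trigger."""
    if mode not in ("complete", "update"):
        raise ValueError("append needs a watermarked aggregation; not modeled here")
    result, outputs = Counter(), []
    for batch in batches:
        changed = Counter(batch)
        result.update(changed)  # fold the new micro-batch into the result table
        if mode == "complete":
            outputs.append(dict(sorted(result.items())))  # whole table every time
        else:
            outputs.append({k: result[k] for k in sorted(changed)})  # changed rows only
    return outputs


batches = [["a", "b"], ["a"], ["c"]]
print("complete:", emit(batches, "complete"))
print("update:  ", emit(batches, "update"))
```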

Key Concepts Table

Mathematical Foundations

Definition: Streaming Window

A streaming window $W$ assigns each event $e$ with timestamp $t$ to a set of windows:

$$W(e) = \{w : t \in w.\text{interval}\}$$

Types: Tumbling (non-overlapping), Sliding (overlapping), Session (activity-based).
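The definition of $W(e)$ translates directly into code. This plain-Python sketch assumes window starts are multiples of the slide duration counted from the Unix epoch (how Spark's window() aligns windows when no start offset is given) and treats intervals as half-open [start, end); assign_windows is a hypothetical helper. A tumbling window is the case slide = size, and an event falls into size / slide sliding windows when the slide divides the size.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def assign_windows(t, size, slide=None):
    """Return every [start, end) window containing t, i.e. W(e).

    Window starts are multiples of `slide` from the epoch; slide=None
    gives tumbling windows (slide == size).
    """
    slide = slide or size
    start = t - (t - EPOCH) % slide  # latest window start at or before t
    windows = []
    while start + size > t:  # this window still covers t
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


t = datetime(2024, 1, 1, 12, 7)
print(assign_windows(t, timedelta(minutes=5)))                         # tumbling
print(assign_windows(t, timedelta(minutes=10), timedelta(minutes=5)))  # sliding
```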

Watermark Definition

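A watermark $w(t)$ is a monotonically non-decreasing function that gives a lower bound on the event times of events yet to arrive:

$$w(t) \leq \min_{e \in \text{unprocessed}} e.\text{event\_time}$$

Structured Streaming estimates it heuristically as $w(t) = \max_{e \in \text{seen}} e.\text{event\_time} - \delta$, where $\delta$ is the allowed lateness. Events whose event time falls below $w(t)$ are late and are dropped from stateful operators (there is no built-in side output for them).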

Window Correctness Theorem

A window computation is correct if:

  1. Every event assigned to window $w$ is processed exactly once
  2. No event is missed: $\forall e: e.\text{event\_time} \in w.\text{interval} \implies e \in \text{processed}(w)$
  3. Watermark progression guarantees eventual completion: $\lim_{t \rightarrow \infty} w(t) = \max(e.\text{event\_time})$

Materialized View Refresh

Incremental refresh cost for materialized view $V$ with source $S$:

$$\text{Cost}_{\text{refresh}} = |\Delta S| \times C_{\text{transform}} + |V| \times C_{\text{merge}}$$

vs. full recomputation: $C_{\text{full}} = |S| \times C_{\text{transform}}$. Incremental refresh is cheaper when $\text{Cost}_{\text{refresh}} < C_{\text{full}}$; dividing both sides by $|S| \times C_{\text{transform}}$ gives the break-even condition

$$\frac{|\Delta S|}{|S|} < 1 - \frac{|V|}{|S|} \cdot \frac{C_{\text{merge}}}{C_{\text{transform}}}$$
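A quick numeric check of the cost model in Python. The row counts and the 2:1 merge-to-transform cost ratio are hypothetical values chosen for illustration, not measurements.

```python
def refresh_costs(source_rows, delta_rows, view_rows, c_transform, c_merge):
    """Return (incremental, full) refresh cost under the linear cost model."""
    incremental = delta_rows * c_transform + view_rows * c_merge  # |dS|*C_t + |V|*C_m
    full = source_rows * c_transform                              # |S|*C_t
    return incremental, full


def break_even_ratio(source_rows, view_rows, c_transform, c_merge):
    """Largest changed fraction |dS|/|S| for which incremental still wins."""
    return 1 - (view_rows / source_rows) * (c_merge / c_transform)


# Hypothetical: 1M source rows, 10K new rows, 50K view rows, merge costs 2x transform
inc, full = refresh_costs(1_000_000, 10_000, 50_000, c_transform=1.0, c_merge=2.0)
print(inc, full, break_even_ratio(1_000_000, 50_000, 1.0, 2.0))
```

With these numbers, incremental refresh costs 110,000 units against 1,000,000 for full recomputation, and it stays cheaper until about 90% of the source has changed.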

End-to-End Latency

Total latency from event arrival to query result:

$$L_{\text{e2e}} = L_{\text{ingest}} + L_{\text{process}} + L_{\text{commit}} + L_{\text{query}}$$

Target: $L_{\text{e2e}} < \text{SLA}$ (typically seconds for real-time).
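The latency budget is simple arithmetic to check. The component values below are hypothetical, and folding up to one full trigger interval of waiting into L_process is a simplifying assumption for a micro-batch pipeline.

```python
def e2e_latency_ms(ingest_ms, process_ms, commit_ms, query_ms):
    """L_e2e = L_ingest + L_process + L_commit + L_query, in milliseconds."""
    return ingest_ms + process_ms + commit_ms + query_ms


def meets_sla(components_ms, sla_ms):
    """Return (total latency, whether it stays under the SLA)."""
    total = e2e_latency_ms(**components_ms)
    return total, total < sla_ms


# Hypothetical budget with a 10 s trigger: worst case, an event waits one
# full trigger interval before its micro-batch (3 s of compute) starts.
budget = {"ingest_ms": 200, "process_ms": 10_000 + 3_000,
          "commit_ms": 1_500, "query_ms": 500}
print(meets_sla(budget, sla_ms=30_000))
```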

Key Insight

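Structured Streaming uses micro-batch execution by default. For millisecond-level latency, use Continuous processing mode (experimental), started with .trigger(continuous="1 second"). The trade-off is guarantees and features vs. latency: continuous mode provides only at-least-once guarantees and supports only map-like operations (projections and selections), not aggregations or windows.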

Summary

Real-time analytics relies on windowing for temporal aggregation, watermarks for late data handling, and incremental refresh for materialized views. End-to-end latency is the sum of ingestion, processing, commit, and query phases. Window correctness requires exactly-once processing with watermark-driven completion.

Key Concepts Table (cont.)

| Concept | Description | Configuration | Use Case |
|---|---|---|---|
| Micro-Batch | Discrete processing intervals | .trigger(processingTime="10 seconds") | General streaming |
| Continuous | Low-latency processing (experimental, at-least-once, map-like operations only) | .trigger(continuous="1 second") (checkpoint interval) | Sub-second latency |
| Watermark | Late event tolerance | .withWatermark("event_time", "10 minutes") | Window aggregations |
| Tumbling Window | Fixed non-overlapping windows | window("event_time", "5 minutes") | Hourly reports |
| Sliding Window | Fixed overlapping windows | window("event_time", "10 minutes", "5 minutes") | Rolling averages |
| Session Window | Dynamic gap-based windows | session_window("event_time", "30 minutes") (Spark 3.2+) | User sessions |
| State Store | State backend (HDFS-backed by default, RocksDB optional) | spark.sql.streaming.stateStore.providerClass | Stateful operations |
| Checkpoint | Offset + state persistence | .option("checkpointLocation", "/path") | Fault tolerance |
| Trigger | Micro-batch frequency | .trigger(processingTime="30 seconds") | Throughput tuning |
| Output Mode | Result writing strategy | .outputMode("append" / "update" / "complete") | Result correctness |

Code Examples

Example 1: Real-Time Click Stream Analytics with Windowed Aggregations

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("RealTime-ClickAnalytics") \
    .config("spark.sql.streaming.stateStore.providerClass",
            "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider") \
    .getOrCreate()

# Define click event schema
click_schema = StructType([
    StructField("user_id", StringType()),
    StructField("page_url", StringType()),
    StructField("action", StringType()),  # click, scroll, hover
    StructField("timestamp", TimestampType()),
    StructField("session_id", StringType()),
    StructField("device_type", StringType()),
    StructField("geo_location", StringType()),
])

# Read click stream from Kafka
click_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "click-events")
    .option("startingOffsets", "latest")
    .load()
    .select(
        from_json(col("value").cast("string"), click_schema).alias("data")
    )
    .select("data.*")
    .withColumn("event_time", col("timestamp"))
)

# ─── Tumbling Window: Page views per 5-minute window ───
page_views_5min = (
    click_stream
    .withWatermark("event_time", "10 minutes")
    .groupBy(
        window(col("event_time"), "5 minutes"),
        col("page_url")
    )
    .agg(
        count("*").alias("view_count"),
        approx_count_distinct("user_id").alias("unique_users"),
        count(when(col("action") == "click", 1)).alias("click_count"),
    )
    .select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("page_url"),
        col("view_count"),
        col("unique_users"),
        col("click_count"),
        (col("click_count") / col("view_count")).alias("click_through_rate"),
    )
)

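# Write to Delta Lake. Append mode emits each window once, after the watermark passes
# its end; the Delta sink supports append and complete modes, not update.
(
    page_views_5min
    .writeStream
    .format("delta")
    .outputMode("append")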
    .option("checkpointLocation", "/mnt/checkpoints/page_views_5min")
    .trigger(processingTime="30 seconds")
    .start("/mnt/analytics/gold/page_views_5min")
)

# ─── Sliding Window: Rolling 10-minute average with 5-minute slide ───
rolling_metrics = (
    click_stream
    .withWatermark("event_time", "15 minutes")
    .groupBy(
        window(col("event_time"), "10 minutes", "5 minutes"),
        col("device_type")
    )
    .agg(
        count("*").alias("total_events"),
        avg(when(col("action") == "click", 1).otherwise(0)).alias("avg_click_rate"),
        # Exact countDistinct is not supported in streaming aggregations
        approx_count_distinct("session_id").alias("active_sessions"),
    )
)

(
    rolling_metrics
    .writeStream
    .format("delta")
    .outputMode("append")  # Delta sink: append or complete only
    .option("checkpointLocation", "/mnt/checkpoints/rolling_metrics")
    .trigger(processingTime="30 seconds")
    .start("/mnt/analytics/gold/rolling_metrics")
)

# ─── Session Window: User session analytics ───
# session_window (Spark 3.2+) closes a user's session after 30 minutes of inactivity.
# Row-based Window functions (Window.partitionBy().orderBy()) are not supported
# on streaming DataFrames, so the session grouping is done by the engine instead.
session_analytics = (
    click_stream
    .withWatermark("event_time", "10 minutes")
    .groupBy(
        session_window(col("event_time"), "30 minutes"),
        col("user_id"),
    )
    .agg(
        min("event_time").alias("first_event"),
        max("event_time").alias("last_event"),
        count("*").alias("event_count"),
        approx_count_distinct("page_url").alias("pages_visited"),
    )
    .select(
        col("user_id"),
        col("session_window.start").alias("session_start"),
        col("session_window.end").alias("session_end"),
        col("event_count"),
        col("pages_visited"),
        (col("last_event").cast("long") - col("first_event").cast("long")).alias(
            "session_duration_seconds"
        ),
    )
)

Example 2: Real-Time Anomaly Detection with Stateful Processing

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("RealTime-AnomalyDetection") \
    .getOrCreate()

# Read sensor data stream
sensor_schema = StructType([
    StructField("sensor_id", StringType()),
    StructField("metric_name", StringType()),
    StructField("value", DoubleType()),
    StructField("timestamp", TimestampType()),
])

sensor_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "sensor-metrics")
    .load()
    .select(from_json(col("value").cast("string"), sensor_schema).alias("data"))
    .select("data.*")
)

# ─── Rolling Statistics for Anomaly Detection ───
windowed_stats = (
    sensor_stream
    .withWatermark("timestamp", "5 minutes")
    .groupBy(
        window(col("timestamp"), "15 minutes", "1 minute"),
        col("sensor_id"),
        col("metric_name"),
    )
    .agg(
        avg("value").alias("avg_value"),
        stddev("value").alias("stddev_value"),
        min("value").alias("min_value"),
        max("value").alias("max_value"),
        count("*").alias("reading_count"),
    )
    .withColumn("upper_bound", col("avg_value") + 3 * col("stddev_value"))
    .withColumn("lower_bound", col("avg_value") - 3 * col("stddev_value"))
)

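# Persist the rolling statistics. Append mode writes each window once the watermark
# passes its end (the Delta sink accepts append or complete mode, not update).
(
    windowed_stats
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/checkpoints/sensor_stats")
    .trigger(processingTime="1 minute")
    .start("/mnt/analytics/gold/sensor_stats")
)

# ─── Detect Anomalies in Real-Time ───
# Instead of self-joining the stream with its own aggregation (ambiguous columns, and
# stream-stream outer joins need watermarks and time bounds on both sides), each
# micro-batch of raw readings is compared against the latest completed stats window.
from delta.tables import DeltaTable
from pyspark.sql.window import Window

STATS_PATH = "/mnt/analytics/gold/sensor_stats"

def detect_anomalies(batch_df, batch_id):
    if not DeltaTable.isDeltaTable(spark, STATS_PATH):
        return  # no statistics window has closed yet, so there is no baseline
    # batch_df and the stats table are static DataFrames here, so ranking is allowed
    latest_first = Window.partitionBy("sensor_id", "metric_name").orderBy(col("window.end").desc())
    baseline = (
        spark.read.format("delta").load(STATS_PATH)
        .withColumn("rn", row_number().over(latest_first))
        .filter(col("rn") == 1)
        .select("sensor_id", "metric_name", "avg_value", "stddev_value",
                "upper_bound", "lower_bound")
    )
    anomalies = (
        batch_df
        .join(baseline, ["sensor_id", "metric_name"], "inner")
        .filter((col("value") > col("upper_bound")) | (col("value") < col("lower_bound")))
        .select(
            "sensor_id", "metric_name", "value", "timestamp",
            col("avg_value").alias("expected_value"),
            col("stddev_value").alias("expected_stddev"),
            (abs(col("value") - col("avg_value")) / col("stddev_value")).alias("z_score"),
        )
    )
    # txnAppId/txnVersion make the append idempotent if this batch is replayed
    (
        anomalies.write.format("delta").mode("append")
        .option("txnAppId", "anomaly_alerts")
        .option("txnVersion", batch_id)
        .save("/mnt/analytics/gold/anomaly_alerts")
    )

# Write anomalies to Delta Lake (alerts can be sent from the same foreachBatch function)
(
    sensor_stream
    .writeStream
    .foreachBatch(detect_anomalies)
    .option("checkpointLocation", "/mnt/checkpoints/anomaly_alerts")
    .trigger(processingTime="10 seconds")
    .start()
)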

# ─── Real-Time Dashboard Query (Read from Materialized View) ───
dashboard_metrics = spark.read.format("delta").load("/mnt/analytics/gold/page_views_5min")

# Top pages in last hour
top_pages = (
    dashboard_metrics
    .filter(col("window_start") >= current_timestamp() - expr("INTERVAL 1 HOUR"))
    .groupBy("page_url")
    .agg(
        sum("view_count").alias("total_views"),
        sum("click_count").alias("total_clicks"),
        avg("click_through_rate").alias("avg_ctr"),
    )
    .orderBy(desc("total_views"))
    .limit(20)
)

top_pages.show()

Example 3: Streaming JOIN between Two Data Streams

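from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("StreamStream-Join") \
    .getOrCreate()

# click_schema matches Example 1; purchase_schema is an assumed layout for purchase events
click_schema = StructType([
    StructField("user_id", StringType()),
    StructField("page_url", StringType()),
    StructField("action", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("session_id", StringType()),
    StructField("device_type", StringType()),
    StructField("geo_location", StringType()),
])

purchase_schema = StructType([
    StructField("user_id", StringType()),
    StructField("product_id", StringType()),
    StructField("amount", DoubleType()),
    StructField("timestamp", TimestampType()),
])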

# Stream 1: Click events
clicks = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "click-events")
    .load()
    .select(
        from_json(col("value").cast("string"), click_schema).alias("data")
    )
    .select("data.*")
    .withColumn("event_time", col("timestamp"))
    .withWatermark("event_time", "10 minutes")
)

# Stream 2: Purchase events
purchases = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "purchase-events")
    .load()
    .select(
        from_json(col("value").cast("string"), purchase_schema).alias("data")
    )
    .select("data.*")
    .withColumn("event_time", col("timestamp"))
    .withWatermark("event_time", "10 minutes")
)

# Stream-Stream Join: Match clicks to purchases within 30-minute window
click_purchase_joined = (
    clicks.alias("c")
    .join(
        purchases.alias("p"),
        expr("""
            c.user_id = p.user_id AND
            p.event_time BETWEEN c.event_time AND c.event_time + INTERVAL 30 MINUTES
        """),
        "left"
    )
    .select(
        col("c.user_id"),
        col("c.page_url").alias("clicked_page"),
        col("c.event_time").alias("click_time"),
        col("p.product_id"),
        col("p.amount").alias("purchase_amount"),
        col("p.event_time").alias("purchase_time"),
        (col("p.event_time").cast("long") - col("c.event_time").cast("long")).alias(
            "time_to_purchase_seconds"
        ),
    )
)

# Write joined stream to Delta Lake
(
    click_purchase_joined
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/checkpoints/click_purchase_join")
    .trigger(processingTime="30 seconds")
    .start("/mnt/analytics/gold/click_purchase_attribution")
)

Performance Metrics

| Metric | Batch Processing | Streaming (Micro-Batch) | Streaming (Continuous) | Improvement |
|---|---|---|---|---|
| End-to-End Latency | 1-24 hours | 10-60 seconds | 100-500 ms | 99.9% reduction |
| Throughput (events/sec) | 100K (batch) | 500K (streaming) | 1M (continuous) | 10x improvement |
| Cost per Million Events | $0.50 | $0.10 | $0.08 | 84% reduction |
| State Size (1B events) | N/A (stateless) | 2-10 GB (RocksDB) | N/A (no stateful operators) | Bounded |
| Recovery Time | Re-run entire batch | Replay from checkpoint | Replay from checkpoint | Same |
| Late Event Handling | N/A (re-process) | Watermark-based | N/A (no aggregations) | New capability |
| Concurrent Queries | 1 (batch) | Multiple (streaming) | Multiple (streaming) | Scaling |
| Exactly-Once | Difficult | Checkpoint-based | At-least-once only | Guaranteed (micro-batch) |
| Resource Utilization | 100% during batch | 30-50% (micro-batch) | 60-80% (continuous) | More efficient |
| Complex Event Processing | Not supported | Window aggregations | Not supported (map-like operations only) | New capability |

Best Practices

  1. Use watermarks for all windowed aggregations: without watermarks, state grows unbounded. Set the watermark slightly larger than the maximum expected event lateness (e.g., 10 minutes for events that typically arrive within 5 minutes).

  2. Tune the trigger interval for latency vs. throughput: smaller trigger intervals (1-10 seconds) reduce latency but add per-batch overhead; larger intervals (30-60 seconds) improve throughput but increase latency. Profile your workload to find the balance.

  3. Use Delta Lake as the primary streaming sink: Delta Lake provides exactly-once semantics, ACID transactions, time travel, and schema evolution, and it integrates natively with Structured Streaming as both source and sink.

  4. Implement idempotent writes: design streaming writes so that replaying a micro-batch from the checkpoint produces the same result. A MERGE that upserts final values by key is idempotent, but one that increments counters is not; in foreachBatch, set the Delta writer options txnAppId and txnVersion so a replayed batch is skipped.

  5. Monitor state size growth: for stateful operations (windowed aggregations, stream-stream joins), track state store size in the streaming query progress metrics. Bound state with watermarks (and timeouts in arbitrary stateful operators), and use the RocksDB state store provider for large state to avoid executor OOM errors.

  6. Size shuffle partitions deliberately: Adaptive Query Execution is disabled for streaming queries, and for stateful queries the number of shuffle partitions (spark.sql.shuffle.partitions, which also sets the number of state store partitions) is fixed once the query has a checkpoint. Choose it before the first run and address data skew in the data model rather than relying on AQE.

  7. Separate raw and aggregated streams: write raw events to a Bronze layer first, then run aggregations as a separate streaming query. This provides fault isolation and enables reprocessing.

  8. Implement dead-letter queues: route events that fail deserialization or validation to a dead-letter topic or table for manual inspection instead of blocking the entire pipeline.

  9. Use Structured Streaming for CDC: combine Debezium CDC with Structured Streaming to apply database changes to Delta Lake targets in near real time with exactly-once semantics.

  10. Optimize for your query pattern: for dashboards that query recent data, partition by time and use Z-ORDER on frequently filtered columns. For point-in-time queries, use Delta Lake time travel.
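Best practice 8 can be illustrated with a plain-Python routing function that separates parseable, valid records from dead letters. The required fields mirror the click schema used in the examples, but route and REQUIRED are hypothetical names; in a real pipeline the dead-letter list would be written to a Kafka topic or Delta table rather than returned.

```python
import json

# Hypothetical validation rule: required fields and their JSON types
REQUIRED = {"user_id": str, "page_url": str, "action": str, "timestamp": str}


def route(raw_messages):
    """Split raw payloads into valid records and dead letters with a reason."""
    valid, dead = [], []
    for raw in raw_messages:
        try:
            rec = json.loads(raw)
            if not isinstance(rec, dict):
                raise ValueError("payload is not a JSON object")
            missing = [f for f, typ in REQUIRED.items() if not isinstance(rec.get(f), typ)]
            if missing:
                raise ValueError(f"missing or mistyped fields: {missing}")
            valid.append(rec)
        except ValueError as exc:  # json.JSONDecodeError is a ValueError subclass
            dead.append({"raw": raw, "error": str(exc)})
    return valid, dead


good = json.dumps({"user_id": "u1", "page_url": "/", "action": "click",
                   "timestamp": "2024-01-01T00:00:00"})
valid, dead = route([good, "{not json", json.dumps({"user_id": "u2"})])
print(len(valid), "valid;", [d["error"] for d in dead])
```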

Mathematical Foundation Summary

Streaming analytics latency follows end-to-end_latency = ingestion + processing + commit + query_latency. Watermark-based aggregation handles late data with watermark = max_event_time_seen - allowed_lateness; an event is late when event_time < watermark. Throughput scales as throughput = parallelism × batch_size / processing_time. Materialized view freshness depends on refresh frequency: freshness = current_time - last_refresh_time. Structured Streaming does not size micro-batches automatically, so cap the intake per trigger with the Kafka source option maxOffsetsPerTrigger to keep processing_time below trigger_interval, targeting optimal_throughput ≈ 0.8 × peak_capacity to leave headroom.
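The throughput formula and the 0.8 × peak sizing rule can be worked through in Python. The measured figures are hypothetical, and reading batch_size as records handled per task per micro-batch is an assumption; the resulting cap would be passed to the Kafka reader as .option("maxOffsetsPerTrigger", ...), which limits the total offsets read per trigger across all partitions.

```python
def throughput(parallelism, records_per_task, processing_time_sec):
    """throughput = parallelism × batch_size / processing_time (records/sec)."""
    return parallelism * records_per_task / processing_time_sec


def max_offsets_per_trigger(peak_records_per_sec, trigger_interval_sec, headroom=0.8):
    """Records to admit per micro-batch so each batch finishes within the
    trigger interval while running at about headroom × peak capacity."""
    return round(peak_records_per_sec * trigger_interval_sec * headroom)


# Hypothetical saturation test: 8 tasks, 25,000 records each, batch took 4 s
peak = throughput(parallelism=8, records_per_task=25_000, processing_time_sec=4.0)
print(peak, max_offsets_per_trigger(peak, trigger_interval_sec=30))
```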

See also: Change Data Capture (36), Cost Optimization (39), Production Hardening (40)
