13. Window Operations in PySpark

Definition: Tumbling Window

A tumbling window is a fixed-size, non-overlapping window that partitions the event timeline into equal intervals. Each event belongs to exactly one window: window_start = floor(event_time / window_size) × window_size.

Definition: Sliding Window

A sliding window has a fixed size but slides by a step interval smaller than the window size. Events can belong to multiple overlapping windows, increasing computation proportional to window_size / slide_interval.

Definition: Session Window

A session window groups events based on activity gaps. The window closes when no events arrive within a specified gap threshold. Window size is dynamic and depends on event arrival patterns.

Tumbling Window Assignment
$$W_i = \left\lfloor \frac{E_{time}}{S_{window}} \right\rfloor \times S_{window}$$

Here,

  • $W_i$ = Window start time for event i
  • $E_{time}$ = Event timestamp
  • $S_{window}$ = Window size (e.g., 10 minutes)
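Here is a minimal plain-Python sketch of the assignment formula. It assumes timestamps are integer epoch seconds and that windows are aligned to the epoch, which is Spark's default when `window()` is called without a `startTime` offset. `tumbling_window` is a hypothetical helper that models the rule. It is not a PySpark API.

```python
def tumbling_window(event_time: int, window_size: int) -> tuple[int, int]:
    """Return the half-open [start, end) tumbling window containing event_time.

    Implements W_i = floor(E_time / S_window) * S_window with integer seconds.
    """
    start = (event_time // window_size) * window_size  # floor division = floor()
    return start, start + window_size


# 10-minute (600 s) windows: an event exactly on a boundary opens the next window
for t in (0, 599, 600, 1250):
    print(t, tumbling_window(t, 600))
# 0 (0, 600)
# 599 (0, 600)
# 600 (600, 1200)
# 1250 (1200, 1800)
```

Because the windows are half-open, each timestamp maps to exactly one window. This is why tumbling windows keep the least state.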

Sliding Window Overlap Factor

$$F_{overlap} = \frac{S_{window}}{S_{slide}}$$

Here,

  • $F_{overlap}$ = Number of windows each event appears in (on average)
  • $S_{window}$ = Window size
  • $S_{slide}$ = Slide interval
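The following plain-Python sketch lists every sliding window that contains a given event and checks the overlap factor empirically. It assumes integer epoch-second timestamps and window starts at multiples of the slide, with no `startTime` offset. `sliding_windows` is a hypothetical helper. It is not a PySpark API.

```python
import math


def sliding_windows(event_time: int, window_size: int, slide: int) -> list[tuple[int, int]]:
    """Return every [start, end) sliding window that contains event_time.

    Window starts are epoch-aligned multiples of `slide`. An event belongs to a
    window when start <= event_time < start + window_size.
    """
    start = (event_time // slide) * slide  # latest window start at or before the event
    windows = []
    while start > event_time - window_size:  # this window still covers the event
        windows.append((start, start + window_size))
        start -= slide
    return sorted(windows)


# 20-minute windows sliding every 10 minutes (times in seconds)
print(sliding_windows(1250, 1200, 600))  # [(600, 1800), (1200, 2400)]

# When the size is not a multiple of the slide, the per-event count varies.
# It averages S_window / S_slide and never exceeds the ceiling of that ratio.
counts = [len(sliding_windows(t, 600, 180)) for t in range(0, 180)]
print(min(counts), max(counts), sum(counts) / len(counts))  # 3 4 3.3333333333333335
print(math.ceil(600 / 180))  # 4
```

If the slide equals the window size, the function returns a single window, which is the tumbling case.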

Tumbling windows are the most efficient because each event maps to exactly one window. Sliding windows cause each event to appear in multiple windows, increasing computation and state size proportionally.

Use watermarks with window operations to bound state growth. Without a watermark, Spark must retain state indefinitely to handle arbitrarily late data, which causes unbounded memory growth.

Theorem: Window State Growth

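For sliding windows with overlap factor $F_{overlap}$, each event falls into at most $\lceil S_{window} / S_{slide} \rceil$ windows. When the window size is a multiple of the slide, it falls into exactly $S_{window} / S_{slide}$ windows. Per-event state updates and stored window state therefore grow by roughly that factor. State for a window is cleaned up only after the watermark passes the window's end time.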
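The following rough simulation illustrates the state-growth statement. It makes several simplifying assumptions:

- a single key,
- in-order events with integer-second timestamps,
- a watermark that advances after every event rather than once per micro-batch as Spark does.

Both helper functions are hypothetical and model the semantics only.

```python
def windows_for(t: int, size: int, slide: int) -> list[tuple[int, int]]:
    """All epoch-aligned [start, end) windows (starts every `slide`) that contain t."""
    start = (t // slide) * slide
    result = []
    while start > t - size:
        result.append((start, start + size))
        start -= slide
    return result


def peak_state_windows(event_times, size: int, slide: int, delay: int) -> int:
    """Simulate window state with watermark eviction and return the peak window count."""
    state = set()
    max_event_time = None
    peak = 0
    for t in event_times:
        max_event_time = t if max_event_time is None else max(max_event_time, t)
        watermark = max_event_time - delay
        if t < watermark:
            continue  # late event: dropped, so it creates no new state
        state.update(windows_for(t, size, slide))
        # State cleanup: drop windows whose end the watermark has reached
        state = {w for w in state if w[1] > watermark}
        peak = max(peak, len(state))
    return peak


events = range(0, 3600, 10)  # one in-order event every 10 seconds for an hour
print(peak_state_windows(events, 600, 600, 0))    # tumbling, no delay: 1
print(peak_state_windows(events, 600, 120, 0))    # sliding, F_overlap = 5: 5
print(peak_state_windows(events, 600, 120, 300))  # sliding plus 5-minute delay: 8
```

In this simulation the peak is at most $\lceil (S_{window} + \text{delay}) / S_{slide} \rceil$. Both the overlap factor and the watermark delay therefore drive state size.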

  • Tumbling: non-overlapping, fixed size, most efficient (1 window per event)
  • Sliding: overlapping, fixed size, higher computation proportional to overlap factor
  • Session: dynamic size, gap-based, highest complexity
  • Watermarks bound state growth by triggering window cleanup
  • Session windows merge when events arrive within the gap threshold of an existing session. A gap larger than the threshold starts a new session.

Window Timeline Comparison

Figure (Window Types: Event Timeline):

- Tumbling: W1 [0, 10 min), W2 [10, 20 min), W3 [20, 30 min). No overlap.
- Sliding: W1 [0, 20 min), W2 [10, 30 min), W3 [20, 40 min). Overlap = W/S.
- Session: S1 (active while gaps < 5 min), S2 (a longer session with more events). Dynamic size.
- Use watermarks to bound state growth for sliding and session windows.

πŸ—οΈ Window Types Architecture

📚 Detailed Explanation

Why Window Operations?

  • Fundamental for time-based aggregations in streaming data
  • Group events into temporal windows for sum, count, average, and more
  • Understanding window types is crucial for effective real-time analytics

Tumbling Windows

  • Simplest form of windowing
  • Fixed-size, non-overlapping intervals
  • Each event belongs to exactly one window based on event time
  • Ideal for: periodic reporting (hourly sales, minute-by-minute sensor readings)
  • Window size determines aggregation granularity

Sliding Windows

  • More flexibility with overlapping windows
  • Defined by two parameters:

| Parameter | Description |
|---|---|
| Window Size | Duration of each window |
| Slide Interval | How often a new window starts |

  • When slide < window size, windows overlap, so each event belongs to multiple windows
  • Ideal for: rolling averages, moving statistics
  • Computation increases proportional to window_size / slide_interval

Session Windows

  • Dynamic windows based on activity patterns
  • Define a maximum gap between events
  • If the gap exceeds the threshold, a new session starts
  • Ideal for: user behavior analysis, continuous activity periods
  • Automatically handles varying session lengths
  • Merges existing sessions when a new (possibly late) event falls within the gap of both
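Here is a minimal plain-Python model of gap-based sessions, including the merge that happens when a late event bridges two sessions. It assumes integer-second timestamps and a single key.

- Each event opens a provisional window [t, t + gap).
- Overlapping windows are merged, so a session ends `gap` after its last event.
- Treating an event exactly `gap` after a session's last event as the start of a new session is a boundary choice of this sketch.

`add_event` is hypothetical and is not Spark's `session_window` implementation.

```python
Session = tuple[int, int, int]  # (start, end, event_count)


def add_event(sessions: list[Session], t: int, gap: int) -> list[Session]:
    """Insert event time t into sorted, non-overlapping sessions, merging overlaps."""
    start, end, count = t, t + gap, 1
    kept = []
    for s_start, s_end, s_count in sessions:
        if s_start < end and start < s_end:  # half-open intervals overlap: merge
            start, end, count = min(start, s_start), max(end, s_end), count + s_count
        else:
            kept.append((s_start, s_end, s_count))
    kept.append((start, end, count))
    return sorted(kept)


sessions: list[Session] = []
for t in (0, 30, 130):
    sessions = add_event(sessions, t, 60)
print(sessions)  # [(0, 90, 2), (130, 190, 1)]

# A late event at t=80 is within the gap of both sessions, so they merge
sessions = add_event(sessions, 80, 60)
print(sessions)  # [(0, 190, 4)]
```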

Watermarking for Windows

  • Defines a threshold for handling late-arriving data
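  • Events whose event time falls behind the watermark may be excluded from aggregations. Spark only guarantees that data within the threshold is aggregated.
  • Allows the system to clean up old state, which bounds memory usage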
  • Choose threshold based on expected lateness of events in data
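The following plain-Python sketch shows how a watermark separates on-time and late events. The watermark is the maximum event time seen so far minus the delay threshold. The sketch simplifies Spark's behavior in two ways:

- It advances the watermark after every event rather than once per micro-batch.
- It always drops late events, whereas Spark may still aggregate some of them.

`split_by_watermark` is a hypothetical helper.

```python
def split_by_watermark(arrivals: list[int], delay: int) -> tuple[list[int], list[int]]:
    """Split event times (given in arrival order) into accepted and dropped lists."""
    accepted, dropped = [], []
    max_event_time = None
    for t in arrivals:
        watermark = None if max_event_time is None else max_event_time - delay
        if watermark is not None and t < watermark:
            dropped.append(t)  # behind the watermark: treated as too late
        else:
            accepted.append(t)
        max_event_time = t if max_event_time is None else max(max_event_time, t)
    return accepted, dropped


# Event times in seconds, listed in arrival order, with a 5-minute (300 s) threshold
print(split_by_watermark([100, 700, 650, 120, 710], 300))
# ([100, 700, 650, 710], [120])
```

The event at 650 arrives out of order but is still within the threshold (watermark 400), so it is accepted. The event at 120 falls behind the watermark and is dropped.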

Key Takeaway: Without watermarks, Spark must retain state indefinitely to handle arbitrarily late data, causing unbounded memory growth.


Window Aggregation Pipeline

  1. Event time extraction: extract the timestamp from the data
  2. Window assignment: assign events to windows
  3. State management: maintain state for each window
  4. Output generation: emit results. In append mode, a window is emitted once the watermark passes its end. Update mode emits partial results on every trigger.
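The four pipeline steps can be sketched in plain Python as a tumbling-window average with append-style output. The sketch assumes:

- records are dicts with hypothetical `ts` (integer epoch seconds) and `value` fields,
- windows are epoch-aligned,
- the watermark advances per record instead of per micro-batch,
- a window is finalized once its end is at or behind the watermark.

This models the semantics only. It is not how Spark executes the query.

```python
from collections import defaultdict


def windowed_average(records, window_size: int, delay: int):
    """Return (window_start, window_end, count, average) rows in emission order."""
    state = defaultdict(lambda: [0, 0.0])  # window start -> [count, running total]
    emitted = []
    max_event_time = None
    for record in records:
        # 1. Event time extraction
        t = record["ts"]
        max_event_time = t if max_event_time is None else max(max_event_time, t)
        watermark = max_event_time - delay
        if t < watermark:
            continue  # late record: dropped before it touches state
        # 2. Window assignment (tumbling, epoch-aligned)
        start = (t // window_size) * window_size
        # 3. State management: update the running aggregate for that window
        state[start][0] += 1
        state[start][1] += record["value"]
        # 4. Output generation: finalize and evict windows the watermark has passed
        for w in sorted(k for k in state if k + window_size <= watermark):
            count, total = state.pop(w)
            emitted.append((w, w + window_size, count, total / count))
    return emitted


records = [
    {"ts": 5, "value": 1.0},
    {"ts": 20, "value": 3.0},
    {"ts": 70, "value": 10.0},
    {"ts": 100, "value": 4.0},
    {"ts": 10, "value": 99.0},  # arrives after the watermark has reached 70
    {"ts": 130, "value": 2.0},
]
print(windowed_average(records, window_size=60, delay=30))
# [(0, 60, 2, 2.0)]
```

Window [60, 120) is not emitted yet. Its state stays in memory until the watermark reaches 120.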

Choosing the Right Window Type

| Use Case | Recommended Window | Why |
|---|---|---|
| Periodic reporting | Tumbling | Simple, non-overlapping |
| Rolling calculations | Sliding | Overlapping windows |
| Activity-based analysis | Session | Dynamic, gap-based |

Performance Considerations

  • Window size: larger windows mean larger state
  • State size: impacts memory usage and checkpoint duration
  • Watermark configuration: balances late-data handling against state growth
  • Balance configuration with available resources and expected data volume

Advanced Features

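  • Multi-window aggregations: compute several window sizes over the same stream. Use one aggregation per window size, because Spark rejects multiple window expressions in a single groupBy.
  • Window joins: join data from different windows together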
  • Enable complex real-time analytics use cases

📊 Key Concepts Table

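| Window Type | Size | Overlap | Use Case | State Management |
|---|---|---|---|---|
| Tumbling | Fixed | No | Periodic reporting | Simple, bounded |
| Sliding | Fixed | Yes | Rolling calculations | Moderate, multiple windows per event |
| Session | Dynamic | No (sessions within the gap merge) | User activity analysis | Complex, variable size |
| Global | Entire stream | N/A | Cumulative statistics | Kept for the life of the query; a watermark cannot evict it |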

💻 Code Examples

Tumbling Window Aggregation

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
    .appName("TumblingWindow") \
    .getOrCreate()

# Read streaming data
stream_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "sensor_data") \
    .load() \
    .selectExpr("CAST(value AS STRING) as json") \
    .select(
        from_json(col("json"), "device_id INT, temperature DOUBLE, timestamp TIMESTAMP").alias("data")
    ).select("data.*")

# Tumbling window aggregation (5-minute windows)
# Parameter: window("timestamp", "5 minutes") - tumbling window
# Parameter: withWatermark("timestamp", "10 minutes") - delay threshold
tumbling_agg = stream_df \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window("timestamp", "5 minutes"),  # 5-minute tumbling window
        "device_id"
    ).agg(
        avg("temperature").alias("avg_temp"),
        min("temperature").alias("min_temp"),
        max("temperature").alias("max_temp"),
        count("*").alias("reading_count")
    ).select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        "device_id", "avg_temp", "min_temp", "max_temp", "reading_count"
    )

# Write to console
query = tumbling_agg.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("checkpointLocation", "/checkpoint/tumbling") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()

Sliding Window with Multiple Aggregations

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType, IntegerType

spark = SparkSession.builder \
    .appName("SlidingWindow") \
    .getOrCreate()

# Read streaming data
stream_df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 100) \
    .load() \
    .withColumn("event_time", col("timestamp")) \
    .withColumn("user_id", (col("value") % 10).cast(IntegerType())) \
    .withColumn("amount", (col("value") * 1.5).cast(DoubleType()))

# Sliding window aggregation (10-minute window, 2-minute slide)
# Parameter: window("event_time", "10 minutes", "2 minutes") - window size, slide interval
# Exact distinct counts (count_distinct) are not supported in streaming
# aggregations, so approx_count_distinct is used instead. Grouping by the
# window only keeps unique_users meaningful (per user_id it would always be 1).
sliding_agg = stream_df \
    .withWatermark("event_time", "15 minutes") \
    .groupBy(
        window("event_time", "10 minutes", "2 minutes")  # 10min window, 2min slide
    ).agg(
        sum("amount").alias("total_amount"),
        avg("amount").alias("avg_amount"),
        approx_count_distinct("user_id").alias("unique_users")
    ).select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        "total_amount", "avg_amount", "unique_users"
    )

# Write to console
query = sliding_agg.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("checkpointLocation", "/checkpoint/sliding") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()

Session Window Analysis

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
    .appName("SessionWindow") \
    .getOrCreate()

# Read streaming data (user events)
stream_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "user_events") \
    .load() \
    .selectExpr("CAST(value AS STRING) as json") \
    .select(
        from_json(col("json"), "user_id INT, action STRING, event_time TIMESTAMP").alias("data")
    ).select("data.*")

# Session window aggregation (10-minute session gap)
# Parameter: session_window("event_time", "10 minutes") - gap threshold (Spark 3.2+)
session_agg = stream_df \
    .withWatermark("event_time", "20 minutes") \
    .groupBy(
        session_window("event_time", "10 minutes"),  # 10-minute session gap
        "user_id"
    ).agg(
        collect_list("action").alias("actions"),
        count("*").alias("event_count"),
        min("event_time").alias("session_start"),
        max("event_time").alias("session_end")
    ).select(
        col("session_start"),
        col("session_end"),
        "user_id", "actions", "event_count",
        (unix_timestamp("session_end") - unix_timestamp("session_start")).alias("session_duration_seconds")
    )

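# Write to console. Session windows do not support "update" output mode,
# so "append" emits each session once the watermark passes its end.
query = session_agg.writeStream \
    .outputMode("append") \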
    .format("console") \
    .option("checkpointLocation", "/checkpoint/session") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()

Advanced: Multi-Window Aggregation

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, count, window

spark = SparkSession.builder \
    .appName("MultiWindowAggregation") \
    .getOrCreate()

# Read streaming data
stream_df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 50) \
    .load() \
    .withColumn("event_time", col("timestamp")) \
    .withColumn("metric_value", col("value").cast("double"))

# Apply watermark
watermarked_df = stream_df.withWatermark("event_time", "30 minutes")

# Spark rejects several window() expressions in one groupBy (they would produce
# a cartesian product of windows), so build one aggregation per window size.
def window_agg(df, duration):
    return df \
        .groupBy(window("event_time", duration)) \
        .agg(
            avg("metric_value").alias("avg_value"),
            count("*").alias("count")
        ).select(
            col("window.start").alias("window_start"),
            col("window.end").alias("window_end"),
            "avg_value", "count"
        )

# Start one streaming query per window size, each with its own checkpoint
queries = []
for name, duration in [("1min", "1 minute"), ("5min", "5 minutes"), ("15min", "15 minutes")]:
    query = window_agg(watermarked_df, duration).writeStream \
        .queryName(f"window_{name}") \
        .outputMode("update") \
        .format("console") \
        .option("checkpointLocation", f"/checkpoint/multi_window_{name}") \
        .option("truncate", "false") \
        .start()
    queries.append(query)

# Block until any of the queries stops or fails
spark.streams.awaitAnyTermination()

📈 Performance Metrics

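| Window Type | Latency | Throughput | State Size | Memory Usage |
|---|---|---|---|---|
| Tumbling (1 min) | ~1 min | High | Small | Low |
| Tumbling (5 min) | ~5 min | High | Medium | Medium |
| Sliding (10 min / 2 min) | ~10 min | Medium | Large | High |
| Session (10 min gap) | Variable | Medium | Variable | Variable |
| Global | Unbounded | Low | Very Large | Very High |

These are qualitative comparisons. In append mode, each window's result also waits for the watermark delay.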

πŸ† Best Practices

  1. Use appropriate window size: Balance granularity against performance
  2. Configure watermarks properly: Essential for state management and late-data handling
  3. Monitor state sizes: Track state growth to prevent memory issues
  4. Choose output mode wisely:
     - Append emits each window once, after the watermark passes its end.
     - Update emits changed rows on every trigger. Session windows do not support Update.
     - Complete re-emits the full result table on every trigger, so watermarks cannot drop its state.
  5. Handle late data explicitly: Define a strategy for events arriving behind the watermark
  6. Optimize window intervals: Use slide intervals that match business requirements
  7. Test with realistic data: Validate window behavior with production-like event patterns
  8. Consider session merging: For session windows, tune the gap threshold based on activity patterns
  9. Use checkpointing: Essential for fault tolerance in windowed aggregations
  10. Profile performance: Monitor processing times and state sizes for optimization

🔗 Related Topics

  • 11-structured-streaming.mdx: Core streaming architecture and triggers
  • 12-state-management.mdx: Stateful operations and checkpointing
  • 14-merge-upsert.mdx: Delta Lake merge operations for windowed data
  • 15-data-quality.mdx: Data validation in windowed aggregations

See Also

  • 06-joins-optimization.mdx: Join strategies for windowed data
  • 07-partitioning-strategies.mdx: Partitioning for window operations
  • Kafka Streams (kafka/03): Window operations in Kafka Streams
  • Data Engineering Streaming (data-engineering/022): Windowed aggregation patterns in streaming