13. Window Operations in PySpark
Definition: Tumbling Window
A tumbling window is a fixed-size, non-overlapping window that partitions the event timeline into equal intervals. Each event belongs to exactly one window: `window_start = floor(event_time / window_size) × window_size`.
Definition: Sliding Window
A sliding window has a fixed size but advances by a slide interval smaller than the window size. Events can therefore belong to multiple overlapping windows, and computation grows in proportion to window_size / slide_interval.
Definition: Session Window
A session window groups events based on activity gaps. The window closes when no events arrive within a specified gap threshold. Window size is dynamic and depends on event arrival patterns.
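Tumbling Window Assignment
$$w_i = \left\lfloor \frac{t_i}{S_{window}} \right\rfloor \times S_{window}$$
Here,
- $w_i$ = Window start time for event $i$
- $t_i$ = Event timestamp
- $S_{window}$ = Window size (e.g., 10 minutes)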
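To make the assignment formula concrete, here is a minimal pure-Python sketch. It assumes timestamps are integer epoch seconds and that windows are aligned to the epoch, which corresponds to Spark's default when `window()` is called without a `startTime` offset. The helper name `tumbling_window` is hypothetical, not a Spark API.

```python
def tumbling_window(event_time: int, window_size: int) -> tuple[int, int]:
    """Return the [start, end) tumbling window containing event_time.

    Hypothetical helper (not a Spark API). Assumes epoch-second timestamps
    and epoch-aligned windows, i.e. no startTime offset.
    """
    start = (event_time // window_size) * window_size  # floor(t / S) * S
    return start, start + window_size


# 10-minute windows (600 s): 12:03:20 and 12:09:59 share a window,
# while 12:10:00 opens the next one.
WINDOW = 600
for t in (43400, 43799, 43800):
    print(t, tumbling_window(t, WINDOW))
```

Because the start is computed with integer floor division, every event maps to exactly one window, which is why tumbling windows are the cheapest to maintain.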
Sliding Window Overlap Factor
$$F_{overlap} = \frac{S_{window}}{S_{slide}}$$
Here,
- $F_{overlap}$ = Number of windows each event appears in (on average)
- $S_{window}$ = Window size
- $S_{slide}$ = Slide interval
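The overlap factor can be checked by enumerating the windows that contain a given event. The sketch below is plain Python and assumes epoch-second timestamps with window starts at multiples of the slide (no `startTime` offset). `sliding_windows` is a hypothetical helper, not Spark's internal implementation.

```python
def sliding_windows(event_time: int, size: int, slide: int) -> list[tuple[int, int]]:
    """Return every [start, end) sliding window that contains event_time.

    Hypothetical helper. Window starts are assumed to be multiples of `slide`.
    """
    start = (event_time // slide) * slide  # latest window start at or before the event
    windows = []
    # Walk backwards one slide at a time while the window still covers the event.
    while start + size > event_time:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


# Window size a multiple of the slide: every event lands in exactly 600 / 120 = 5 windows.
print(len(sliding_windows(43400, 600, 120)))  # 5
# Not a multiple: 600 / 240 = 2.5, so events land in 2 or 3 windows (at most ceil(2.5) = 3).
print(len(sliding_windows(43400, 600, 240)), len(sliding_windows(43200, 600, 240)))  # 2 3
```

The second case shows why $F_{overlap}$ is an average: when the window size is not a multiple of the slide, the per-event count alternates between the floor and the ceiling of the ratio.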
Tumbling windows are the most efficient because each event maps to exactly one window. Sliding windows cause each event to appear in multiple windows, increasing computation and state size proportionally.
Use watermarks with window operations to bound state growth. Without a watermark, Spark must retain state indefinitely to handle arbitrarily late data, which causes unbounded memory growth.
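Window State Growth
Theorem: For sliding windows with overlap factor $F_{overlap}$, each event is assigned to at most $\lceil S_{window} / S_{slide} \rceil$ windows (exactly $S_{window} / S_{slide}$ when the window size is a multiple of the slide interval), so the number of open windows per key, and with it state storage, grows in proportion to $F_{overlap}$. State for a window is cleaned up only after the watermark passes that window's end time.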
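A small simulation illustrates the theorem. It is a simplified, hypothetical model rather than Spark's state store: a single grouping key, one event per minute, and a watermark recomputed after every event (Spark advances it once per micro-batch). A window is evicted as soon as the watermark reaches its end.

```python
def windows_for(t, size, slide):
    """Starts of all sliding windows [start, start + size) that contain t."""
    start = (t // slide) * slide
    starts = []
    while start + size > t:
        starts.append(start)
        start -= slide
    return starts


def peak_state_size(event_times, size, slide, delay):
    """Largest number of open windows held for one key (simplified model)."""
    state = {}                    # window start -> running count
    max_seen = float("-inf")
    peak = 0
    for t in event_times:
        max_seen = max(max_seen, t)
        watermark = max_seen - delay  # simplification: updated per event, not per micro-batch
        for start in windows_for(t, size, slide):
            if start + size > watermark:  # ignore windows that are already closed
                state[start] = state.get(start, 0) + 1
        # Cleanup: drop every window whose end the watermark has reached.
        for start in [s for s in state if s + size <= watermark]:
            del state[start]
        peak = max(peak, len(state))
    return peak


events = list(range(0, 3600, 60))  # one event per minute for an hour, single key
for delay in (0, 600):
    tumbling = peak_state_size(events, 600, 600, delay)
    sliding = peak_state_size(events, 600, 120, delay)
    print(f"delay={delay}s tumbling={tumbling} sliding={sliding}")
# delay=0s tumbling=1 sliding=5
# delay=600s tumbling=2 sliding=10
```

With no delay the sliding configuration holds 5 times as many windows as the tumbling one, matching $F_{overlap} = 600 / 120 = 5$. A 10-minute delay doubles both, because closed windows must wait for the watermark before they can be dropped.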
- Tumbling: non-overlapping, fixed size, most efficient (1 window per event)
- Sliding: overlapping, fixed size, higher computation proportional to overlap factor
- Session: dynamic size, gap-based, highest complexity
- Watermarks bound state growth by triggering window cleanup
- Session windows merge when events fall within the gap threshold; a gap longer than the threshold starts a new session
Detailed Explanation
Why Window Operations?
- Fundamental for time-based aggregations in streaming data
- Group events into temporal windows for sum, count, average, and more
- Understanding window types is crucial for effective real-time analytics
Tumbling Windows
- Simplest form of windowing
- Fixed-size, non-overlapping intervals
- Each event belongs to exactly one window based on event time
- Ideal for: periodic reporting (hourly sales, minute-by-minute sensor readings)
- Window size determines aggregation granularity
Sliding Windows
- More flexibility with overlapping windows
- Defined by two parameters:
| Parameter | Description |
|---|---|
| Window Size | Duration of each window |
| Slide Interval | How often a new window starts |
- When slide < window size → windows overlap → events belong to multiple windows
- Ideal for: rolling averages, moving statistics
- Computation increases in proportion to window_size / slide_interval
Session Windows
- Dynamic windows based on activity patterns
- Define a maximum gap between events
- If the gap exceeds the threshold → a new session starts
- Ideal for: user behavior analysis, continuous activity periods
- Automatically handles varying session lengths
- Merges two sessions when a (possibly late) event falls within the gap threshold of both
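The gap rule and session merging can be sketched in plain Python for one user. The sketch assumes, as Spark's `session_window` does, that a session starts at its first event and ends at its last event plus the gap, so an event arriving before the current session's end extends it. `sessionize` is a hypothetical helper, and replaying all events after a late arrival stands in for Spark's incremental merge.

```python
def sessionize(event_times, gap):
    """Group event times (seconds) into [start, end) sessions.

    Hypothetical helper. A session ends at its last event + gap; an event at
    or after that end starts a new session.
    """
    sessions = []
    for t in sorted(event_times):
        if sessions and t < sessions[-1][1]:
            start, _ = sessions[-1]
            sessions[-1] = (start, t + gap)  # event inside the session: extend its end
        else:
            sessions.append((t, t + gap))    # gap exceeded: open a new session
    return sessions


GAP = 600  # 10-minute inactivity gap
clicks = [0, 300, 500, 1500, 1700]
print(sessionize(clicks, GAP))           # [(0, 1100), (1500, 2300)]
# A late event at 1000 s falls within the gap of both sessions and merges them.
print(sessionize(clicks + [1000], GAP))  # [(0, 2300)]
```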
Watermarking for Windows
- Defines a threshold for handling late-arriving data
- Events that fall behind the watermark may be excluded from aggregations (Spark guarantees only that events within the threshold are aggregated)
- Allows the system to clean up old state and bound memory usage
- Choose threshold based on expected lateness of events in data
Key Takeaway: Without watermarks, Spark must retain state indefinitely to handle arbitrarily late data, causing unbounded memory growth.
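The watermark rule can be modeled as follows. This is a simplified sketch, not Spark's implementation: the watermark used for a micro-batch is the maximum event time seen in earlier batches minus the delay threshold, and rows whose tumbling window has already closed are always dropped, whereas Spark only guarantees to keep rows within the threshold. `run_batches` and the sample timestamps (epoch seconds) are hypothetical.

```python
def run_batches(batches, window_size, delay):
    """Classify rows as accepted or dropped across micro-batches (simplified model)."""
    max_seen = float("-inf")
    accepted, dropped = [], []
    for batch in batches:
        current_wm = max_seen - delay  # watermark computed from earlier batches
        for t in batch:
            window_end = (t // window_size) * window_size + window_size
            if window_end <= current_wm:
                dropped.append(t)      # its window has already closed
            else:
                accepted.append(t)
        # The watermark advances only after the batch completes.
        max_seen = max([max_seen, *batch])
    return accepted, dropped


# 5-minute windows (300 s) with a 10-minute delay threshold (600 s)
batches = [[1000, 1100], [2000], [1150, 1500]]
accepted, dropped = run_batches(batches, window_size=300, delay=600)
print("accepted:", accepted)  # [1000, 1100, 2000, 1500]
print("dropped:", dropped)    # [1150]: window [900, 1200) closed once the watermark reached 1400
```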
Window Aggregation Pipeline
- Event time extraction: extract the timestamp from each record
- Window assignment: assign each event to one or more windows
- State management: maintain aggregation state for each window
- Output generation: emit results according to the output mode (in append mode, once the watermark passes the window end)
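The four pipeline steps can be traced in a toy append-mode implementation. It is a hypothetical, single-process model meant only to show the order of operations: per-`(window_start, key)` counts stand in for Spark's state store, and a window is emitted exactly once when the watermark passes its end.

```python
from collections import defaultdict


def windowed_counts(batches, window_size, delay):
    """Toy append-mode tumbling-window count per key (not Spark's implementation)."""
    state = defaultdict(int)  # (window_start, key) -> count
    max_seen = float("-inf")
    emitted = []
    for batch in batches:
        for record in batch:
            t = record["ts"]                              # 1. event-time extraction
            start = (t // window_size) * window_size      # 2. window assignment
            if start + window_size > max_seen - delay:    # skip rows for closed windows
                state[(start, record["key"])] += 1        # 3. state update
        max_seen = max([max_seen, *(r["ts"] for r in batch)])
        watermark = max_seen - delay
        for start, key in sorted(state):                  # 4. output generation
            if start + window_size <= watermark:
                emitted.append((start, key, state.pop((start, key))))
    return emitted, dict(state)


batches = [
    [{"ts": 5, "key": "a"}, {"ts": 20, "key": "b"}, {"ts": 50, "key": "a"}],
    [{"ts": 70, "key": "a"}],
    [{"ts": 130, "key": "b"}],
]
emitted, remaining = windowed_counts(batches, window_size=60, delay=30)
print(emitted)    # [(0, 'a', 2), (0, 'b', 1)]
print(remaining)  # {(60, 'a'): 1, (120, 'b'): 1}
```

Window `[0, 60)` is only emitted in the third batch, when the watermark (130 - 30 = 100) passes its end; the later windows stay in state until the watermark catches up.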
Choosing the Right Window Type
| Use Case | Recommended Window | Why |
|---|---|---|
| Periodic reporting | Tumbling | Simple, non-overlapping |
| Rolling calculations | Sliding | Overlapping windows |
| Activity-based analysis | Session | Dynamic, gap-based |
Performance Considerations
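- Window size: larger windows hold state longer before the watermark can evict it; for sliding windows with a fixed slide, they also multiply the number of overlapping windows
- State size: impacts memory usage and checkpoint duration
- Watermark configuration: balances late-data handling against state growth
- Balance configuration with available resources and **expected data volume**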
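For a back-of-the-envelope sizing, the number of state rows can be bounded from the window size, slide, and watermark delay. The estimate below is an assumption-laden sketch rather than a Spark formula: it supposes every key is active in every window and that the watermark trails the newest event by exactly the delay $D$, so each key keeps at most $\lceil (S_{window} + D) / S_{slide} \rceil$ windows open. `estimate_state_rows` is a hypothetical helper.

```python
import math


def estimate_state_rows(num_keys: int, window_s: int, slide_s: int, delay_s: int) -> int:
    """Rough upper bound on windowed-aggregation state rows (hypothetical helper).

    Assumes all keys are active in every window and the watermark trails the
    newest event time by exactly delay_s seconds.
    """
    windows_per_key = math.ceil((window_s + delay_s) / slide_s)
    return num_keys * windows_per_key


# 10,000 devices, 10-minute window, 15-minute watermark delay
print(estimate_state_rows(10_000, 600, 600, 900))  # tumbling: 30000
print(estimate_state_rows(10_000, 600, 120, 900))  # 2-minute slide: 130000
```

Under these assumptions, switching from a tumbling window to a 2-minute slide raises the bound from 30,000 to 130,000 rows, which also lengthens state checkpoints.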
Advanced Features
- Multi-window aggregations: apply multiple window types or sizes to the same stream
- Window joins: join two streams on a shared event-time window
- Enable complex real-time analytics use cases
Key Concepts Table
| Window Type | Size | Overlap | Use Case | State Management |
|---|---|---|---|---|
| Tumbling | Fixed | No | Periodic reporting | Simple, bounded |
| Sliding | Fixed | Yes | Rolling calculations | Moderate, multiple windows |
| Session | Dynamic | No (sessions within the gap merge) | User activity analysis | Complex, variable size |
| Global | Entire stream | N/A | Cumulative statistics | Grows with the number of keys; a watermark cannot evict it |
Code Examples
Tumbling Window Aggregation
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# The Kafka source requires the spark-sql-kafka-0-10 package on the classpath.
spark = SparkSession.builder \
    .appName("TumblingWindow") \
    .getOrCreate()

# Read streaming data and parse the JSON payload
stream_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "sensor_data") \
    .load() \
    .selectExpr("CAST(value AS STRING) AS json") \
    .select(
        F.from_json(F.col("json"), "device_id INT, temperature DOUBLE, timestamp TIMESTAMP").alias("data")
    ).select("data.*")

# Tumbling window aggregation (5-minute windows)
# window("timestamp", "5 minutes") → tumbling window (no slide argument)
# withWatermark("timestamp", "10 minutes") → late-data delay threshold
tumbling_agg = stream_df \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        F.window("timestamp", "5 minutes"),  # 5-minute tumbling window
        "device_id"
    ).agg(
        F.avg("temperature").alias("avg_temp"),
        F.min("temperature").alias("min_temp"),
        F.max("temperature").alias("max_temp"),
        F.count("*").alias("reading_count")
    ).select(
        F.col("window.start").alias("window_start"),
        F.col("window.end").alias("window_end"),
        "device_id", "avg_temp", "min_temp", "max_temp", "reading_count"
    )

# Write to console; "update" emits each changed window on every trigger
query = tumbling_agg.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("checkpointLocation", "/checkpoint/tumbling") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()
```
Sliding Window with Multiple Aggregations
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("SlidingWindow") \
    .getOrCreate()

# Synthetic stream from the built-in "rate" source (columns: timestamp, value)
stream_df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 100) \
    .load() \
    .withColumn("event_time", F.col("timestamp")) \
    .withColumn("user_id", (F.col("value") % 10).cast("int")) \
    .withColumn("amount", (F.col("value") * 1.5).cast("double"))

# Sliding window aggregation (10-minute window, 2-minute slide)
# window("event_time", "10 minutes", "2 minutes") → window size, slide interval
# Grouping by the window only, so unique_users counts distinct users per window.
# Exact distinct aggregations are not supported on streaming DataFrames,
# so approx_count_distinct is used instead of count_distinct.
sliding_agg = stream_df \
    .withWatermark("event_time", "15 minutes") \
    .groupBy(
        F.window("event_time", "10 minutes", "2 minutes")  # 10-min window, 2-min slide
    ).agg(
        F.sum("amount").alias("total_amount"),
        F.avg("amount").alias("avg_amount"),
        F.approx_count_distinct("user_id").alias("unique_users")
    ).select(
        F.col("window.start").alias("window_start"),
        F.col("window.end").alias("window_end"),
        "total_amount", "avg_amount", "unique_users"
    )

# Write to console
query = sliding_agg.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("checkpointLocation", "/checkpoint/sliding") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()
```
Session Window Analysis
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# session_window requires Spark 3.2+; the Kafka source needs spark-sql-kafka-0-10.
spark = SparkSession.builder \
    .appName("SessionWindow") \
    .getOrCreate()

# Read streaming data (user events)
stream_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "user_events") \
    .load() \
    .selectExpr("CAST(value AS STRING) AS json") \
    .select(
        F.from_json(F.col("json"), "user_id INT, action STRING, event_time TIMESTAMP").alias("data")
    ).select("data.*")

# Session window aggregation (10-minute session gap)
# session_window("event_time", "10 minutes") → gap threshold
session_agg = stream_df \
    .withWatermark("event_time", "20 minutes") \
    .groupBy(
        F.session_window("event_time", "10 minutes"),  # 10-minute session gap
        "user_id"
    ).agg(
        F.collect_list("action").alias("actions"),
        F.count("*").alias("event_count"),
        F.min("event_time").alias("session_start"),  # first event in the session
        F.max("event_time").alias("session_end")     # last event in the session
    ).select(
        F.col("session_start"),
        F.col("session_end"),
        "user_id", "actions", "event_count",
        (F.unix_timestamp("session_end") - F.unix_timestamp("session_start")).alias("session_duration_seconds")
    )

# Session windows do not support "update" mode; "append" emits each
# session once the watermark passes the session's end.
query = session_agg.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("checkpointLocation", "/checkpoint/session") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()
```
Advanced: Multi-Window Aggregation
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("MultiWindowAggregation") \
    .getOrCreate()

# Synthetic stream from the built-in "rate" source
stream_df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 50) \
    .load() \
    .withColumn("event_time", F.col("timestamp")) \
    .withColumn("metric_value", F.col("value").cast("double"))

# Apply the watermark once; every aggregation below inherits it
watermarked_df = stream_df.withWatermark("event_time", "30 minutes")

# Spark does not allow more than one time window expression in a single
# groupBy, so each granularity runs as its own query with its own checkpoint.
def start_window_query(duration, name):
    agg = watermarked_df \
        .groupBy(F.window("event_time", duration)) \
        .agg(
            F.avg("metric_value").alias("avg_value"),
            F.count("*").alias("count")
        ).select(
            F.lit(name).alias("granularity"),
            F.col("window.start").alias("window_start"),
            F.col("window.end").alias("window_end"),
            "avg_value", "count"
        )
    return agg.writeStream \
        .outputMode("update") \
        .format("console") \
        .option("checkpointLocation", f"/checkpoint/multi_window/{name}") \
        .option("truncate", "false") \
        .start()

queries = [
    start_window_query("1 minute", "1min"),
    start_window_query("5 minutes", "5min"),
    start_window_query("15 minutes", "15min"),
]

# Block until any of the queries stops or fails
spark.streams.awaitAnyTermination()
```
Performance Metrics
| Window Type | Latency | Throughput | State Size | Memory Usage |
|---|---|---|---|---|
| Tumbling (1min) | ~1min | High | Small | Low |
| Tumbling (5min) | ~5min | High | Medium | Medium |
| Sliding (10min/2min) | ~10min | Medium | Large | High |
| Session (10min gap) | Variable | Medium | Variable | Variable |
| Global | Unbounded | Low | Very Large | Very High |
Best Practices
- Use appropriate window size: balance granularity against performance
- Configure watermarks properly: essential for state management and late-data handling
- Monitor state sizes: track state growth to prevent memory issues
- Choose output mode deliberately: Append emits each window once, after the watermark passes its end; Update emits changed windows on every trigger; Complete re-emits the full result table on every trigger and cannot drop old state
- Handle late data explicitly: define a strategy for events that arrive behind the watermark
- Optimize window intervals: use slide intervals that match business requirements
- Test with realistic data: validate window behavior with production-like event patterns
- Consider session merging: for session windows, tune the gap threshold based on activity patterns
- Use checkpointing: essential for fault tolerance in windowed aggregations
- Profile performance: monitor processing times and state sizes for optimization
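For the state-size monitoring practice, a small helper can read the state-store metrics that Structured Streaming reports in each progress update. It assumes `query.lastProgress` returns a dict, as it does in PySpark 3.x, whose `stateOperators` entries carry `numRowsTotal` and `memoryUsedBytes`. `summarize_state` itself is hypothetical.

```python
def summarize_state(progress):
    """Summarize state-store metrics from a streaming progress dict (hypothetical helper).

    Returns None before the first progress update, when lastProgress is None.
    """
    if not progress:
        return None
    ops = progress.get("stateOperators", [])
    return {
        "batch_id": progress.get("batchId"),
        "state_rows": sum(op.get("numRowsTotal", 0) for op in ops),
        "state_memory_mb": sum(op.get("memoryUsedBytes", 0) for op in ops) / (1024 * 1024),
    }


# Usage with a running query, e.g. one started in the code examples above:
# print(summarize_state(query.lastProgress))
```

Logging this summary on a schedule makes it easy to see whether state rows plateau (the watermark is evicting windows) or keep climbing.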
Related Topics
- 11-structured-streaming.mdx: Core streaming architecture and triggers
- 12-state-management.mdx: Stateful operations and checkpointing
- 14-merge-upsert.mdx: Delta Lake merge operations for windowed data
- 15-data-quality.mdx: Data validation in windowed aggregations
See Also
- 06-joins-optimization.mdx: Join strategies for windowed data
- 07-partitioning-strategies.mdx: Partitioning for window operations
- Kafka Streams (kafka/03): Window operations in Kafka Streams
- Data Engineering Streaming (data-engineering/022): Windowed aggregation patterns in streaming