⏰ Time Series Analysis in PySpark
Time Series Pipeline
Architecture Diagram
Detailed Explanation
Fundamental Challenge
Time series data has an inherent ordering that must be preserved throughout processing.
Key Takeaway: Unlike standard DataFrame operations, time series operations must respect temporal sequence and handle gaps, irregularities, and varying frequencies.
Window Functions for Time Series
| Frame Type | Definition | Use Case |
|---|---|---|
| Range-based | rangeBetween(-3600, 0) | Irregular time series, actual time values |
| Row-based | rowsBetween(-5, 0) | Fixed row count, evenly spaced data |
Range vs Row Frames:
- Range frames handle irregular timestamps correctly
- Row frames always include exactly specified number of rows
- For most time series, range frames provide correct semantics
Gap Detection and Filling
Real-world data frequently has missing timestamps.
| Strategy | Description | Best For |
|---|---|---|
| Forward Fill | Carry last known value forward | Slowly changing metrics |
| Backward Fill | Propagate next known value backward | Known future values |
| Interpolation | Estimate based on surrounding values | Continuously varying signals |
Resampling Operations
Resampling changes the frequency of a time series.
| Operation | Method | Aggregation |
|---|---|---|
| Downsampling | Tumbling windows | mean, sum, last, first |
| Upsampling | Create target timestamps + join | Interpolation |
Best Practice: Choose aggregation based on metric semantics—sum for counts, mean/last for levels.
Seasonal Decomposition
Separates time series into components using window functions.
| Component | Calculation |
|---|---|
| Trend | Centered moving average |
| Seasonal | Average deviation from trend at each seasonal position |
| Residual | What remains after removing trend and seasonal |
Use Case: Anomaly detection, forecasting preprocessing, pattern identification.
Mathematical Foundations
Definition: Time Series Decomposition
A time series can be decomposed as:
where is the trend component, is the seasonal component with period (), and is the residual.
Moving Average Smoothing
For window size (odd), the centered moving average is:
This removes seasonal variation when equals the seasonal period.
Stationarity Theorem (Wold's Decomposition)
A weakly stationary process (constant mean , autocovariance depending only on lag ) can be represented as:
where are white noise and . Non-stationary series require differencing times: where is the backshift operator.
ARIMA Model
The ARIMA model is:
where and .
Interpolation Error Bound
For linear interpolation between points and at time :
Key Insight
Spark's Window.rowsBetween() enables distributed moving average computation, but partitioning by time series ID is critical. Without proper partitioning, rows from different series mix across partitions, producing incorrect results.
Summary
Time series analysis on Spark leverages window functions for decomposition, moving averages, and lag/lead operations. Stationarity is required for ARIMA models and is tested via ADF/KPSS tests. Interpolation accuracy depends on the gap size relative to the underlying signal's smoothness.
Key Concepts Table
| Concept | Description | PySpark Implementation |
|---|---|---|
| Tumbling Window | Non-overlapping fixed-size windows | Window.orderBy("ts").rangeBetween(-interval, 0) |
| Sliding Window | Overlapping windows with fixed step | Window.orderBy("ts").rowsBetween(-size, 0) |
| Session Window | Activity-based windows with gap threshold | Custom implementation with gap detection |
| Range Frame | Window defined by value range | rangeBetween(start, end) |
| Row Frame | Window defined by row count | rowsBetween(start, end) |
| Forward Fill | Carry last known value forward | last("value", ignorenulls=True) |
| Backward Fill | Propagate next known value backward | first("value", ignorenulls=True) |
| Linear Interpolation | Estimate missing values linearly | Custom UDF with linear regression |
| Resampling | Change frequency of time series | Tumbling window + aggregation |
| Seasonal Decomposition | Separate trend/seasonal/residual | Centered MA + seasonal averaging |
Code Examples
Window Functions for Time Series
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
spark = SparkSession.builder \
.appName("TimeSeriesAnalysis") \
.getOrCreate()
# Create sensor data with irregular timestamps
data = [
("sensor_1", "2024-01-15 08:00:00", 22.5, 45.0),
("sensor_1", "2024-01-15 08:03:00", 22.7, 44.8),
("sensor_1", "2024-01-15 08:15:00", 23.1, 43.5),
("sensor_1", "2024-01-15 08:20:00", 22.9, 44.2),
("sensor_1", "2024-01-15 08:35:00", 23.5, 42.0),
("sensor_1", "2024-01-15 08:40:00", 23.2, 43.1),
("sensor_1", "2024-01-15 09:00:00", 24.0, 41.5),
("sensor_1", "2024-01-15 09:10:00", 23.8, 42.3),
]
df = spark.createDataFrame(data, ["sensor_id", "timestamp", "temperature", "humidity"])
df = df.withColumn("ts", col("timestamp").cast("timestamp"))
# Define window specifications
# 30-minute tumbling window
tumbling_window = Window \
.partitionBy("sensor_id") \
.orderBy(col("ts").cast("long")) \
.rangeBetween(-1800, 0) # 30 minutes in seconds
# 5-row sliding window
sliding_window = Window \
.partitionBy("sensor_id") \
.orderBy("ts") \
.rowsBetween(-4, 0) # Current row + 4 previous
# 1-hour range window
hourly_range = Window \
.partitionBy("sensor_id") \
.orderBy(col("ts").cast("long")) \
.rangeBetween(-3600, 0)
# Apply window functions
result = df \
.withColumn("rolling_avg_temp", avg("temperature").over(tumbling_window)) \
.withColumn("rolling_max_temp", max("temperature").over(tumbling_window)) \
.withColumn("rolling_min_humidity", min("humidity").over(tumbling_window)) \
.withColumn("row_avg_temp", avg("temperature").over(sliding_window)) \
.withColumn("hourly_stddev", stddev("temperature").over(hourly_range)) \
.withColumn("temp_rank", rank().over(
Window.partitionBy("sensor_id").orderBy(desc("temperature"))
))
result.show(truncate=False)
Gap Detection and Filling
# Generate expected timestamps (every 5 minutes)
from pyspark.sql.functions import sequence, explode, collect_list
# Create reference time series (regular 5-minute intervals)
min_ts = df.agg(min("ts")).first()[0]
max_ts = df.agg(max("ts")).first()[0]
reference_df = spark.sql(f"""
SELECT explode(sequence(
'{min_ts}',
'{max_ts}',
interval 5 minutes
)) AS expected_ts
""")
# Detect gaps
gaps_df = df.select("sensor_id", "ts") \
.withColumnRenamed("ts", "actual_ts") \
.crossJoin(reference_df) \
.withColumn("diff_seconds",
abs(col("expected_ts").cast("long") - col("actual_ts").cast("long"))
) \
.filter(col("diff_seconds") > 120) # More than 2 minutes off
print("Detected gaps:")
gaps_df.groupBy("sensor_id").count().show()
# Forward fill missing values
forward_fill_window = Window \
.partitionBy("sensor_id") \
.orderBy("ts") \
.rowsBetween(Window.unboundedPreceding, 0)
filled_df = df \
.withColumn("temp_filled", last("temperature", ignorenulls=True).over(forward_fill_window)) \
.withColumn("humidity_filled", last("humidity", ignorenulls=True).over(forward_fill_window))
# Linear interpolation for temperature
@udf(returnType=DoubleType())
def linear_interpolate(values, timestamps, target_ts):
"""Linear interpolation between two points."""
if values is None or len(values) < 2:
return values[0] if values else None
for i in range(len(timestamps) - 1):
if timestamps[i] <= target_ts <= timestamps[i + 1]:
t0, t1 = timestamps[i], timestamps[i + 1]
v0, v1 = values[i], values[i + 1]
if t1 == t0:
return v0
ratio = (target_ts - t0) / (t1 - t0)
return v0 + ratio * (v1 - v0)
return values[-1]
# Apply interpolation using window functions
interpolated_df = df \
.withColumn("ts_long", col("ts").cast("long")) \
.withColumn("temp_array", collect_list("temperature").over(
Window.partitionBy("sensor_id").orderBy("ts")
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)) \
.withColumn("ts_array", collect_list("ts_long").over(
Window.partitionBy("sensor_id").orderBy("ts")
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)) \
.withColumn("temp_interpolated",
linear_interpolate(col("temp_array"), col("ts_array"), col("ts_long"))
)
Resampling and Seasonal Analysis
# Resample to hourly frequency
hourly_df = df \
.withColumn("hour", date_trunc("hour", col("ts"))) \
.groupBy("sensor_id", "hour") \
.agg(
avg("temperature").alias("avg_temp"),
max("temperature").alias("max_temp"),
min("temperature").alias("min_temp"),
avg("humidity").alias("avg_humidity"),
count("*").alias("readings_count")
)
# Compute daily aggregates
daily_df = df \
.withColumn("date", to_date(col("ts"))) \
.groupBy("sensor_id", "date") \
.agg(
avg("temperature").alias("daily_avg_temp"),
stddev("temperature").alias("daily_stddev_temp"),
(max("temperature") - min("temperature")).alias("daily_range")
)
# Seasonal decomposition (hourly pattern)
hour_of_day = Window \
.partitionBy("sensor_id") \
.orderBy(hour(col("ts"))) \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
seasonal_df = df \
.withColumn("hour_of_day", hour(col("ts"))) \
.withColumn("daily_avg", avg("temperature").over(
Window.partitionBy("sensor_id", to_date(col("ts")))
)) \
.withColumn("hourly_seasonal", avg("temperature").over(hour_of_day)) \
.withColumn("trend", avg("temperature").over(
Window.partitionBy("sensor_id").orderBy("ts")
.rowsBetween(-12, 12) # 1-hour centered moving average
)) \
.withColumn("residual", col("temperature") - col("trend") - col("hourly_seasonal"))
# Anomaly detection using z-score
stats_df = df \
.withColumn("rolling_mean", avg("temperature").over(hourly_range)) \
.withColumn("rolling_std", stddev("temperature").over(hourly_range)) \
.withColumn("z_score",
(col("temperature") - col("rolling_mean")) / col("rolling_std")
) \
.withColumn("is_anomaly", abs(col("z_score")) > 3)
anomalies = stats_df.filter(col("is_anomaly"))
print(f"Detected {anomalies.count()} anomalies")
anomalies.show(truncate=False)
Performance Metrics
| Operation | 1K Records | 100K Records | 10M Records | 100M Records |
|---|---|---|---|---|
| Tumbling Window Aggregation | < 1 sec | 2-5 sec | 30-60 sec | 5-10 min |
| Sliding Window (size 100) | < 1 sec | 3-8 sec | 45-90 sec | 8-15 min |
| Range Window (1 hour) | < 1 sec | 2-5 sec | 20-40 sec | 3-8 min |
| Gap Detection | < 1 sec | 1-3 sec | 10-20 sec | 2-5 min |
| Forward Fill | < 1 sec | 1-2 sec | 8-15 sec | 1-3 min |
| Linear Interpolation | < 1 sec | 5-10 sec | 60-120 sec | 10-20 min |
| Resampling (Downsample) | < 1 sec | 1-3 sec | 10-20 sec | 2-5 min |
| Seasonal Decomposition | < 1 sec | 3-8 sec | 30-60 sec | 5-10 min |
| Z-Score Anomaly Detection | < 1 sec | 2-5 sec | 20-40 sec | 3-8 min |
| Timezone Conversion | < 1 sec | 1-2 sec | 10-20 sec | 2-5 min |
Best Practices
- Always cast timestamps to proper TimestampType before window operations to ensure correct time arithmetic
- Use range-based frames for irregular time series to handle missing timestamps correctly
- Cache intermediate results when performing multiple window operations on the same DataFrame to avoid recomputation
- Partition by time first, then by entity (sensor_id, user_id) for optimal data locality in window functions
- Use
rangeBetweenwith seconds (long values) rather than string intervals for better performance - Implement watermarks for streaming time series to handle late-arriving data gracefully
- Avoid UDFs for interpolation when possible—use built-in window functions with collect_list for better performance
- Set
spark.sql.shuffle.partitionsappropriately for time series joins to avoid skew from temporal patterns - Use
date_truncfor resampling instead of custom UDFs to leverage Spark's optimized datetime functions - Monitor for temporal skew when windowing—concentrated timestamps in certain periods can cause partition imbalance
- Use
Sessionizepattern for session windows: sort by timestamp, compute gap, assign session ID using cumulative sum of gaps - Pre-compute time features (hour, day_of_week, month) and persist them to avoid repeated extraction across analyses
Mathematical Foundation Summary
Time series analysis in Spark uses window functions with ROWS BETWEEN for causal filtering and RANGE BETWEEN for temporal intervals. Linear interpolation computes missing values as y(t) = y(t₁) + (t - t₁) × (y(t₂) - y(t₁))/(t₂ - t₁). Anomaly detection uses z-score thresholding: z(t) = (x(t) - μ) / σ > threshold. Autocorrelation measures temporal dependency: ACF(k) = Σ(x(t) - μ)(x(t+k) - μ) / Σ(x(t) - μ)² enabling pattern detection and forecasting model selection.
See also: Real-Time Analytics (38), ML Feature Engineering (32), Adaptive Query Execution (30)
See Also
- Window Operations — Window functions for time series
- Structured Streaming — Real-time time series processing
- Advanced Aggregations — Time-based aggregations
- Real-time Analytics — Real-time analytics patterns