
Time Series Analysis in PySpark


Time Series Pipeline

Architecture diagram: Time Series Processing Pipeline. Ingest (raw timestamps) → Resample (regular intervals) → Interpolate (fill gaps) → Window Agg (rolling stats) → Detect (anomalies). Window function types shown: lead() / lag(), row_number(), rank() / dense_rank(), running totals.

Detailed Explanation

Fundamental Challenge

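Time series data has an inherent temporal ordering that must be preserved throughout processing. Spark DataFrames carry no guaranteed row order, so every order-dependent step has to state the ordering explicitly, for example with Window.orderBy("ts").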

Key Takeaway: Unlike standard DataFrame operations, time series operations must respect temporal sequence and handle gaps, irregularities, and varying frequencies.


Window Functions for Time Series

Frame Type | Definition | Use Case
--- | --- | ---
Range-based | rangeBetween(-3600, 0) | Irregular time series, actual time values
Row-based | rowsBetween(-5, 0) | Fixed row count, evenly spaced data

Range vs Row Frames:

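  • Range frames select rows by time distance, so they handle irregular timestamps correctly; the ordering column must be numeric, so timestamps are usually cast to epoch seconds with col("ts").cast("long")
  • Row frames include a fixed number of rows (fewer at the start of a partition), regardless of how much time those rows span
  • For most time series, range frames provide the correct semantics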
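A small pure-Python sketch can make the difference concrete. The helpers below are hypothetical stand-ins that mimic the two frame types on a list of epoch-second offsets; the timestamps and temperatures are the sensor_1 readings from the code example later in this lesson, expressed as seconds after 08:00.

```python
# Seconds after 08:00 and temperatures for the sensor_1 readings used in this lesson
times = [0, 180, 900, 1200, 2100, 2400, 3600, 4200]
temps = [22.5, 22.7, 23.1, 22.9, 23.5, 23.2, 24.0, 23.8]


def row_frame(values, i, preceding):
    """Mimic rowsBetween(-preceding, 0): current row plus up to `preceding` earlier rows."""
    return values[max(0, i - preceding): i + 1]


def range_frame(times, values, i, seconds):
    """Mimic rangeBetween(-seconds, 0) on an epoch-seconds ordering column:
    every row whose time lies in [times[i] - seconds, times[i]]."""
    lo, hi = times[i] - seconds, times[i]
    return [v for t, v in zip(times, values) if lo <= t <= hi]


i_last = len(times) - 1  # the 09:10 reading
print(row_frame(temps, i_last, 4))             # five rows, spanning 08:20 to 09:10
print(range_frame(times, temps, i_last, 1800))  # only readings from 08:40 onward
```

For the same 09:10 reading, the row frame reaches back 50 minutes while the range frame reaches back exactly 30, which is why range frames are the safer default for irregular data.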

Gap Detection and Filling

Real-world data frequently has missing timestamps.

Strategy | Description | Best For
--- | --- | ---
Forward Fill | Carry last known value forward | Slowly changing metrics
Backward Fill | Propagate next known value backward | Known future values
Interpolation | Estimate based on surrounding values | Continuously varying signals
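The three strategies can be sketched in plain Python on a time-sorted list where None marks a missing reading. This is an illustrative sketch, not Spark code (in Spark the same ideas map to last() and first() with ignorenulls=True over ordered windows), and the function names are hypothetical.

```python
def forward_fill(values):
    """Replace each None with the most recent non-None value (None if there is none yet)."""
    filled, last_seen = [], None
    for v in values:
        if v is not None:
            last_seen = v
        filled.append(last_seen)
    return filled


def backward_fill(values):
    """Replace each None with the next non-None value: a forward fill on the reversed list."""
    return forward_fill(values[::-1])[::-1]


def linear_fill(times, values):
    """Fill each None by interpolating linearly in time between the nearest known
    neighbours (times must be sorted ascending); gaps at either edge stay None
    because one neighbour is missing."""
    known = [(t, v) for t, v in zip(times, values) if v is not None]
    filled = []
    for t, v in zip(times, values):
        if v is not None:
            filled.append(v)
            continue
        before = [p for p in known if p[0] < t]
        after = [p for p in known if p[0] > t]
        if not before or not after:
            filled.append(None)
            continue
        (t0, v0), (t1, v1) = before[-1], after[0]
        filled.append(v0 + (t - t0) * (v1 - v0) / (t1 - t0))
    return filled


times = [0, 300, 600, 900]          # seconds
values = [22.5, None, None, 23.1]   # two missing 5-minute readings
print(forward_fill(values))         # [22.5, 22.5, 22.5, 23.1]
print(backward_fill(values))        # [22.5, 23.1, 23.1, 23.1]
print(linear_fill(times, values))   # roughly [22.5, 22.7, 22.9, 23.1]
```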

Resampling Operations

Resampling changes the frequency of a time series.

Operation | Method | Aggregation
--- | --- | ---
Downsampling | Tumbling windows | mean, sum, last, first
Upsampling | Create target timestamps + join | Interpolation

Best Practice: Choose aggregation based on metric semantics—sum for counts, mean/last for levels.
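Downsampling with tumbling windows amounts to assigning each reading to a bucket and aggregating per bucket. The plain-Python sketch below (the resample helper is hypothetical) uses 15-minute buckets on the sensor_1 readings, expressed as seconds after 08:00, and shows how the aggregation choice changes the result.

```python
from collections import defaultdict
from statistics import mean


def resample(times, values, bucket_seconds, agg):
    """Downsample: group readings into tumbling buckets [k*size, (k+1)*size)
    and apply `agg` to each bucket. Empty buckets are simply absent."""
    buckets = defaultdict(list)
    for t, v in sorted(zip(times, values)):
        buckets[t - t % bucket_seconds].append(v)
    return {start: agg(vs) for start, vs in sorted(buckets.items())}


times = [0, 180, 900, 1200, 2100, 2400, 3600, 4200]
temps = [22.5, 22.7, 23.1, 22.9, 23.5, 23.2, 24.0, 23.8]

print(resample(times, temps, 900, mean))               # mean level per 15 minutes
print(resample(times, temps, 900, lambda vs: vs[-1]))  # last reading per bucket
print(resample(times, temps, 900, len))                # reading count per bucket
```

The 2700-second (08:45) bucket is missing from every result: downsampling cannot create buckets that have no readings, so upsampling or gap filling needs the target timestamps to be generated separately.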


Seasonal Decomposition

Seasonal decomposition separates a time series into trend, seasonal, and residual components, each of which can be computed with window functions.

Component | Calculation
--- | ---
Trend | Centered moving average
Seasonal | Average deviation from trend at each seasonal position
Residual | What remains after removing trend and seasonal

Use Case: Anomaly detection, forecasting preprocessing, pattern identification.
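The three steps in the table can be checked on a synthetic series. The sketch below is plain Python with hypothetical helper names; it assumes an additive model, an odd seasonal period p = 3, and a seasonal pattern that sums to zero over one period, so the centered moving average recovers a linear trend exactly.

```python
def centered_ma(ys, w):
    """Centered moving average with odd window w; None where the window is incomplete."""
    h = (w - 1) // 2
    return [sum(ys[t - h: t + h + 1]) / w if h <= t < len(ys) - h else None
            for t in range(len(ys))]


def decompose(ys, period):
    """Classical additive decomposition: trend = centered MA of width `period`,
    seasonal = mean deviation from trend at each position t % period,
    residual = what remains."""
    trend = centered_ma(ys, period)
    deviations = {}
    for t, (y, tr) in enumerate(zip(ys, trend)):
        if tr is not None:
            deviations.setdefault(t % period, []).append(y - tr)
    pattern = [sum(d) / len(d) for _, d in sorted(deviations.items())]
    seasonal = [pattern[t % period] for t in range(len(ys))]
    residual = [None if tr is None else y - tr - s
                for y, tr, s in zip(ys, trend, seasonal)]
    return trend, seasonal, residual


# Synthetic series: linear trend t plus a repeating pattern [1, -2, 1] (sums to zero)
season = [1, -2, 1]
ys = [t + season[t % 3] for t in range(12)]
trend, seasonal, residual = decompose(ys, 3)
print(trend[1:4])    # [1.0, 2.0, 3.0]: the linear trend is recovered
print(seasonal[:3])  # [1.0, -2.0, 1.0]
```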

Mathematical Foundations

Definition: Time Series Decomposition

A time series $Y_t$ can be decomposed as:

$$Y_t = T_t + S_t + R_t \quad \text{(additive)} \qquad \text{or} \qquad Y_t = T_t \times S_t \times R_t \quad \text{(multiplicative)}$$

where $T_t$ is the trend component, $S_t$ is the seasonal component with period $p$ ($S_{t+p} = S_t$), and $R_t$ is the residual.

Moving Average Smoothing

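For an odd window size $w$, the centered moving average is:

$$\text{MA}_t = \frac{1}{w} \sum_{i=-(w-1)/2}^{(w-1)/2} Y_{t+i}$$

This removes seasonal variation when $w$ equals the seasonal period $p$ and the seasonal component is normalized to sum to zero over one period. Because the formula requires an odd $w$, even periods (such as 24 hourly readings per day) are usually handled with a $2 \times p$ moving average, which averages two adjacent $p$-term averages so that the result is centered.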
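To see why, substitute the additive model into the moving average with $w = p$ (odd). Because $S$ has period $p$, the $p$ consecutive indices $t+i$ hit every seasonal position exactly once. This worked step assumes the usual normalization $\sum_{j=0}^{p-1} S_j = 0$ and, for the last line, a trend that is linear over the window; the residuals are averaged rather than removed.

```latex
\begin{aligned}
\text{MA}_t &= \frac{1}{p}\sum_{i=-(p-1)/2}^{(p-1)/2} \left(T_{t+i} + S_{t+i} + R_{t+i}\right) \\
&= \frac{1}{p}\sum_{i} T_{t+i} \;+\; \underbrace{\frac{1}{p}\sum_{i} S_{t+i}}_{=\,\frac{1}{p}\sum_{j=0}^{p-1} S_j \,=\, 0} \;+\; \frac{1}{p}\sum_{i} R_{t+i} \\
&= T_t + \frac{1}{p}\sum_{i} R_{t+i} \qquad \text{if } T \text{ is linear over the window}
\end{aligned}
```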

Stationarity Theorem (Wold's Decomposition)

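A weakly stationary process $X_t$ (constant mean $\mu$, autocovariance $\gamma(h)$ depending only on lag $h$) that is purely nondeterministic, meaning it has no component that can be predicted exactly from its own past, can be represented as:

$$X_t = \mu + \sum_{j=0}^{\infty} \psi_j \epsilon_{t-j}$$

where $\epsilon_t$ is white noise, $\psi_0 = 1$, and $\sum_j \psi_j^2 < \infty$. (In general, Wold's theorem adds a deterministic component to this sum.) Series with a stochastic trend (a unit root) are made stationary by differencing $d$ times, $(1-B)^d X_t$, where $B$ is the backshift operator ($B X_t = X_{t-1}$).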
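Applying $(1-B)^d$ just means taking first differences $d$ times. The plain-Python sketch below (hypothetical helper name) shows that one difference turns a linear trend into a constant, while a quadratic trend needs two.

```python
def difference(xs, d=1):
    """Apply (1 - B)^d: each pass maps x_t to x_t - x_{t-1}, so the output
    is d elements shorter than the input."""
    for _ in range(d):
        xs = [b - a for a, b in zip(xs, xs[1:])]
    return xs


linear = [3 + 2 * t for t in range(6)]   # 3, 5, 7, 9, 11, 13
quadratic = [t * t for t in range(6)]    # 0, 1, 4, 9, 16, 25
print(difference(linear, 1))     # [2, 2, 2, 2, 2]
print(difference(quadratic, 1))  # [1, 3, 5, 7, 9] (still trending)
print(difference(quadratic, 2))  # [2, 2, 2, 2]
```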

ARIMA Model

The ARIMA$(p, d, q)$ model is:

$$\phi(B)(1-B)^d X_t = \theta(B)\epsilon_t$$

where $\phi(B) = 1 - \phi_1 B - \cdots - \phi_p B^p$ and $\theta(B) = 1 + \theta_1 B + \cdots + \theta_q B^q$. Here $p$ is the autoregressive order and $q$ the moving-average order; this $p$ is unrelated to the seasonal period $p$ used in the decomposition above.
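As a worked instance, ARIMA(1, 1, 0) has $\phi(B) = 1 - \phi_1 B$, $d = 1$, and $\theta(B) = 1$. Expanding $(1 - \phi_1 B)(1 - B)X_t = \epsilon_t$ gives $X_t = X_{t-1} + \phi_1 (X_{t-1} - X_{t-2}) + \epsilon_t$, so a one-step forecast replaces the unknown future shock with its mean of zero. The sketch below assumes $\phi_1$ is already known (estimating it is not shown), and the function name is hypothetical.

```python
def arima_110_forecast(xs, phi1):
    """One-step-ahead forecast for ARIMA(1, 1, 0) with a known coefficient phi1:
    X_{t+1} = X_t + phi1 * (X_t - X_{t-1}), with the future shock set to 0."""
    if len(xs) < 2:
        raise ValueError("need at least two observations")
    return xs[-1] + phi1 * (xs[-1] - xs[-2])


print(arima_110_forecast([10.0, 12.0, 15.0], 0.5))  # 16.5
```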

Interpolation Error Bound

For linear interpolation $\hat{Y}$ of a twice-differentiable signal $Y$ between points $(t_i, Y_i)$ and $(t_{i+1}, Y_{i+1})$, evaluated at a time $t \in [t_i, t_{i+1}]$:

$$|Y(t) - \hat{Y}(t)| \leq \frac{(t_{i+1} - t_i)^2}{8} \max_{s \in [t_i, t_{i+1}]} |Y''(s)|$$
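The bound can be checked numerically. The sketch below (plain Python, hypothetical helper names) measures the worst interpolation error on a fine grid and compares it with the bound. For $Y(t) = t^2$ on $[0, 1]$, $|Y''| = 2$ and the bound of $1/4$ is attained at the midpoint; for $\sin t$ the bound holds with room to spare.

```python
import math


def max_interp_error(f, t0, t1, steps=1000):
    """Largest |f(t) - linear interpolant| over an evenly spaced grid on [t0, t1]."""
    y0, y1 = f(t0), f(t1)
    worst = 0.0
    for k in range(steps + 1):
        t = t0 + k * (t1 - t0) / steps
        estimate = y0 + (t - t0) * (y1 - y0) / (t1 - t0)
        worst = max(worst, abs(f(t) - estimate))
    return worst


def error_bound(t0, t1, max_abs_second_derivative):
    """Right-hand side of the bound: (t1 - t0)^2 / 8 * max |Y''|."""
    return (t1 - t0) ** 2 / 8 * max_abs_second_derivative


print(max_interp_error(lambda t: t * t, 0.0, 1.0), error_bound(0.0, 1.0, 2.0))  # 0.25 0.25
# |sin''(s)| = |sin(s)| <= sin(0.5) on [0, 0.5]
print(max_interp_error(math.sin, 0.0, 0.5) <= error_bound(0.0, 0.5, math.sin(0.5)))  # True
```

Halving the gap cuts the bound by a factor of four, which is the sense in which accuracy depends on gap size relative to the signal's curvature.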

Key Insight

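Spark's Window.rowsBetween() enables distributed moving average computation, but partitioning by time series ID is critical. Without partitionBy, every row belongs to one window partition: frames slide across readings from different series and blend them into incorrect results, and Spark moves all data to a single partition, which removes parallelism.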
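A plain-Python sketch of this failure mode, using hypothetical data and helper names: two series interleaved in time, averaged with a two-row trailing frame once across all rows and once per series.

```python
from collections import defaultdict


def trailing_means(values, size):
    """Mean of the current value and up to size - 1 previous values,
    like avg(...).over(window.rowsBetween(-(size - 1), 0))."""
    out = []
    for i in range(len(values)):
        frame = values[max(0, i - size + 1): i + 1]
        out.append(sum(frame) / len(frame))
    return out


# (series_id, seconds, value), sorted by time; the two series have very different levels
rows = [("a", 0, 10.0), ("b", 0, 100.0), ("a", 60, 12.0), ("b", 60, 102.0)]

# No partitioning: a single frame slides over both series and blends them
mixed = trailing_means([v for _, _, v in rows], 2)

# Partitioned by series_id: each series gets its own frames
by_series = defaultdict(list)
for sid, _, v in rows:
    by_series[sid].append(v)
partitioned = {sid: trailing_means(vals, 2) for sid, vals in by_series.items()}

print(mixed)        # [10.0, 55.0, 56.0, 57.0]
print(partitioned)  # {'a': [10.0, 11.0], 'b': [100.0, 101.0]}
```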

Summary

Time series analysis on Spark leverages window functions for decomposition, moving averages, and lag/lead operations. ARIMA models assume the series is stationary after $d$ rounds of differencing, which is commonly checked with the Augmented Dickey-Fuller (ADF) or KPSS tests. Interpolation accuracy depends on the gap size relative to the underlying signal's smoothness.

Key Concepts Table

Concept | Description | PySpark Implementation
--- | --- | ---
Tumbling Window | Non-overlapping fixed-size windows | groupBy(window("ts", "30 minutes"))
Sliding Window | Overlapping windows with fixed step | groupBy(window("ts", "30 minutes", "5 minutes")), or a per-row frame such as rowsBetween(-4, 0)
Session Window | Activity-based windows with gap threshold | session_window("ts", "10 minutes") (Spark 3.2+), or gap detection with lag() and a running sum
Range Frame | Window defined by value range | rangeBetween(start, end)
Row Frame | Window defined by row count | rowsBetween(start, end)
Forward Fill | Carry last known value forward | last("value", ignorenulls=True) over rowsBetween(Window.unboundedPreceding, Window.currentRow)
Backward Fill | Propagate next known value backward | first("value", ignorenulls=True) over rowsBetween(Window.currentRow, Window.unboundedFollowing)
Linear Interpolation | Estimate missing values linearly | Neighboring known points via last()/first() with ignorenulls=True, then an arithmetic expression
Resampling | Change frequency of time series | date_trunc() or window() + aggregation
Seasonal Decomposition | Separate trend/seasonal/residual | Centered MA + seasonal averaging
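The difference between tumbling and sliding windows comes down to how many windows each timestamp falls into. The sketch below is plain Python with a hypothetical helper; it assumes windows of length size whose starts are multiples of slide counted from t = 0, which mirrors how Spark's window() aligns windows to the Unix epoch when no startTime offset is given.

```python
def window_starts(t, size, slide):
    """Start times of every window [start, start + size) containing t,
    where window starts are multiples of `slide`."""
    starts = []
    start = (t // slide) * slide   # latest window start at or before t
    while start > t - size:        # this window still covers t
        starts.append(start)
        start -= slide
    return sorted(starts)


# A reading 20 minutes (1200 s) after the origin
print(window_starts(1200, 1800, 1800))  # tumbling 30 min: [0]
print(window_starts(1200, 1800, 300))   # sliding 30 min every 5 min: [-300, 0, 300, 600, 900, 1200]
```

A tumbling window (slide equal to size) puts each reading in exactly one bucket, while a sliding window puts it in size / slide overlapping buckets, which multiplies the rows Spark has to aggregate.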

Code Examples

Window Functions for Time Series

from pyspark.sql import SparkSession
from pyspark.sql.functions import *  # note: shadows Python's built-in min, max, abs, round, sum
from pyspark.sql.types import *
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("TimeSeriesAnalysis") \
    .getOrCreate()

# Create sensor data with irregular timestamps
data = [
    ("sensor_1", "2024-01-15 08:00:00", 22.5, 45.0),
    ("sensor_1", "2024-01-15 08:03:00", 22.7, 44.8),
    ("sensor_1", "2024-01-15 08:15:00", 23.1, 43.5),
    ("sensor_1", "2024-01-15 08:20:00", 22.9, 44.2),
    ("sensor_1", "2024-01-15 08:35:00", 23.5, 42.0),
    ("sensor_1", "2024-01-15 08:40:00", 23.2, 43.1),
    ("sensor_1", "2024-01-15 09:00:00", 24.0, 41.5),
    ("sensor_1", "2024-01-15 09:10:00", 23.8, 42.3),
]

df = spark.createDataFrame(data, ["sensor_id", "timestamp", "temperature", "humidity"])
df = df.withColumn("ts", col("timestamp").cast("timestamp"))

# Define window specifications
# 30-minute trailing range window: each row sees the readings from the previous 30 minutes.
# Frames overlap from row to row, so this is a rolling window, not a tumbling one.
trailing_30min = Window \
    .partitionBy("sensor_id") \
    .orderBy(col("ts").cast("long")) \
    .rangeBetween(-1800, 0)  # 30 minutes in seconds

# 5-row sliding window
sliding_window = Window \
    .partitionBy("sensor_id") \
    .orderBy("ts") \
    .rowsBetween(-4, 0)  # Current row + 4 previous

# 1-hour range window
hourly_range = Window \
    .partitionBy("sensor_id") \
    .orderBy(col("ts").cast("long")) \
    .rangeBetween(-3600, 0)

# Apply window functions
result = df \
    .withColumn("rolling_avg_temp", avg("temperature").over(trailing_30min)) \
    .withColumn("rolling_max_temp", max("temperature").over(trailing_30min)) \
    .withColumn("rolling_min_humidity", min("humidity").over(trailing_30min)) \
    .withColumn("row_avg_temp", avg("temperature").over(sliding_window)) \
    .withColumn("hourly_stddev", stddev("temperature").over(hourly_range)) \
    .withColumn("temp_rank", rank().over(
        Window.partitionBy("sensor_id").orderBy(desc("temperature"))
    ))

result.show(truncate=False)

Gap Detection and Filling

# Build a regular 5-minute reference grid per sensor, starting on a 5-minute boundary
bounds_df = df.groupBy("sensor_id").agg(
    timestamp_seconds(floor(min("ts").cast("long") / 300) * 300).alias("start_ts"),
    max("ts").alias("end_ts")
)

expected_df = bounds_df.select(
    "sensor_id",
    explode(sequence(col("start_ts"), col("end_ts"), expr("INTERVAL 5 MINUTES"))).alias("expected_ts")
)

# Detect gaps: expected slots with no actual reading within 2 minutes
actual_df = df.select(
    col("sensor_id").alias("actual_sensor_id"),
    col("ts").alias("actual_ts")
)

gaps_df = expected_df.join(
    actual_df,
    (col("sensor_id") == col("actual_sensor_id")) &
    (abs(col("expected_ts").cast("long") - col("actual_ts").cast("long")) <= 120),
    "left_anti"
)

print("Detected gaps:")
gaps_df.groupBy("sensor_id").count().show()

# Snap each reading to its nearest 5-minute slot, then left-join onto the grid
# so that slots without a reading carry null values
slotted_df = df \
    .withColumn("expected_ts", timestamp_seconds(round(col("ts").cast("long") / 300) * 300)) \
    .groupBy("sensor_id", "expected_ts") \
    .agg(avg("temperature").alias("temperature"), avg("humidity").alias("humidity"))

grid_df = expected_df.join(slotted_df, ["sensor_id", "expected_ts"], "left")

# Forward fill: carry the last known value forward within each sensor
forward_fill_window = Window \
    .partitionBy("sensor_id") \
    .orderBy("expected_ts") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

filled_df = grid_df \
    .withColumn("temp_filled", last("temperature", ignorenulls=True).over(forward_fill_window)) \
    .withColumn("humidity_filled", last("humidity", ignorenulls=True).over(forward_fill_window))

# Linear interpolation for temperature with built-in window functions (no UDF):
# find the previous and next known points, then interpolate linearly in time
next_window = Window \
    .partitionBy("sensor_id") \
    .orderBy("expected_ts") \
    .rowsBetween(Window.currentRow, Window.unboundedFollowing)

interpolated_df = grid_df \
    .withColumn("t", col("expected_ts").cast("long")) \
    .withColumn("known_t", when(col("temperature").isNotNull(), col("t"))) \
    .withColumn("t0", last("known_t", ignorenulls=True).over(forward_fill_window)) \
    .withColumn("v0", last("temperature", ignorenulls=True).over(forward_fill_window)) \
    .withColumn("t1", first("known_t", ignorenulls=True).over(next_window)) \
    .withColumn("v1", first("temperature", ignorenulls=True).over(next_window)) \
    .withColumn("temp_interpolated",
        when(col("temperature").isNotNull(), col("temperature"))
        .otherwise(col("v0") + (col("t") - col("t0")) * (col("v1") - col("v0")) / (col("t1") - col("t0")))
    )  # stays null before the first or after the last known reading

Resampling and Seasonal Analysis

# Resample to hourly frequency
hourly_df = df \
    .withColumn("hour", date_trunc("hour", col("ts"))) \
    .groupBy("sensor_id", "hour") \
    .agg(
        avg("temperature").alias("avg_temp"),
        max("temperature").alias("max_temp"),
        min("temperature").alias("min_temp"),
        avg("humidity").alias("avg_humidity"),
        count("*").alias("readings_count")
    )

# Compute daily aggregates
daily_df = df \
    .withColumn("date", to_date(col("ts"))) \
    .groupBy("sensor_id", "date") \
    .agg(
        avg("temperature").alias("daily_avg_temp"),
        stddev("temperature").alias("daily_stddev_temp"),
        (max("temperature") - min("temperature")).alias("daily_range")
    )

# Seasonal decomposition (hour-of-day pattern, additive model).
# Meaningful only with several days of readings; the 70-minute sample above is far too short.
# Trend: centered moving average spanning one daily cycle (12 hours either side)
trend_window = Window \
    .partitionBy("sensor_id") \
    .orderBy(col("ts").cast("long")) \
    .rangeBetween(-12 * 3600, 12 * 3600)

# Seasonal: average deviation from the trend at each hour of the day
hour_of_day_window = Window.partitionBy("sensor_id", "hour_of_day")

seasonal_df = df \
    .withColumn("hour_of_day", hour(col("ts"))) \
    .withColumn("daily_avg", avg("temperature").over(
        Window.partitionBy("sensor_id", to_date(col("ts")))
    )) \
    .withColumn("trend", avg("temperature").over(trend_window)) \
    .withColumn("detrended", col("temperature") - col("trend")) \
    .withColumn("hourly_seasonal", avg("detrended").over(hour_of_day_window)) \
    .withColumn("residual", col("temperature") - col("trend") - col("hourly_seasonal"))

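# Anomaly detection using a z-score against the preceding hour.
# The baseline excludes the current reading: if a point is part of its own
# baseline of n readings, |z| can never exceed (n - 1) / sqrt(n), which is
# below 3 for n <= 10, so no anomaly could ever be flagged.
baseline_window = Window \
    .partitionBy("sensor_id") \
    .orderBy(col("ts").cast("long")) \
    .rangeBetween(-3600, -1)

stats_df = df \
    .withColumn("rolling_mean", avg("temperature").over(baseline_window)) \
    .withColumn("rolling_std", stddev("temperature").over(baseline_window)) \
    .withColumn("z_score",
        when(col("rolling_std") > 0,
             (col("temperature") - col("rolling_mean")) / col("rolling_std"))
    ) \
    .withColumn("is_anomaly", coalesce(abs(col("z_score")) > 3, lit(False)))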

anomalies = stats_df.filter(col("is_anomaly"))
print(f"Detected {anomalies.count()} anomalies")
anomalies.show(truncate=False)

Performance Metrics

Operation | 1K Records | 100K Records | 10M Records | 100M Records
--- | --- | --- | --- | ---
Tumbling Window Aggregation | < 1 sec | 2-5 sec | 30-60 sec | 5-10 min
Sliding Window (size 100) | < 1 sec | 3-8 sec | 45-90 sec | 8-15 min
Range Window (1 hour) | < 1 sec | 2-5 sec | 20-40 sec | 3-8 min
Gap Detection | < 1 sec | 1-3 sec | 10-20 sec | 2-5 min
Forward Fill | < 1 sec | 1-2 sec | 8-15 sec | 1-3 min
Linear Interpolation | < 1 sec | 5-10 sec | 60-120 sec | 10-20 min
Resampling (Downsample) | < 1 sec | 1-3 sec | 10-20 sec | 2-5 min
Seasonal Decomposition | < 1 sec | 3-8 sec | 30-60 sec | 5-10 min
Z-Score Anomaly Detection | < 1 sec | 2-5 sec | 20-40 sec | 3-8 min
Timezone Conversion | < 1 sec | 1-2 sec | 10-20 sec | 2-5 min

Best Practices

  1. Always cast timestamps to proper TimestampType before window operations to ensure correct time arithmetic
  2. Use range-based frames for irregular time series to handle missing timestamps correctly
  3. Cache intermediate results when performing multiple window operations on the same DataFrame to avoid recomputation
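  4. Partition windows by entity (sensor_id, user_id) and order by time; adding a time bucket to partitionBy splits each series, so frames cannot cross bucket boundaries
  5. Order range frames by the timestamp cast to seconds (col("ts").cast("long")) and express rangeBetween bounds in seconds, since the DataFrame API's rangeBetween accepts numeric bounds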
  6. Implement watermarks for streaming time series to handle late-arriving data gracefully
  7. Avoid Python UDFs for interpolation when possible; built-in window functions such as last() and first() with ignorenulls=True run inside the JVM and avoid Python serialization overhead
  8. Set spark.sql.shuffle.partitions appropriately for time series joins to avoid skew from temporal patterns
  9. Use date_trunc for resampling instead of custom UDFs to leverage Spark's optimized datetime functions
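  10. Monitor for skew when windowing: an entity with far more readings than the others (or a busy time bucket, if time is part of the partition key) produces an oversized partition
  11. Sessionize for session windows: sort by timestamp, compute the gap to the previous event with lag(), flag gaps above the threshold, and assign session IDs as the running sum of those flags (Spark 3.2+ also provides session_window())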
  12. Pre-compute time features (hour, day_of_week, month) and persist them to avoid repeated extraction across analyses
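The sessionization steps in practice 11 can be sketched in plain Python (hypothetical function name, timestamps in seconds). In Spark the same steps map onto lag() for the previous timestamp, when() for the gap flag, and sum() over a rowsBetween(Window.unboundedPreceding, Window.currentRow) frame for the running total.

```python
def sessionize(timestamps, gap_seconds):
    """Assign session IDs: sort the events, flag each one whose gap to the previous
    event exceeds gap_seconds, and take the running sum of those flags."""
    session_ids = []
    running_sum = 0
    previous = None
    for t in sorted(timestamps):
        new_session = previous is not None and t - previous > gap_seconds
        running_sum += int(new_session)
        session_ids.append(running_sum)
        previous = t
    return session_ids


print(sessionize([0, 60, 120, 1000, 1030, 5000], gap_seconds=300))  # [0, 0, 0, 1, 1, 2]
```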

Mathematical Foundation Summary

Time series analysis in Spark uses window functions with ROWS BETWEEN for fixed-count frames and RANGE BETWEEN for time-based intervals; both are causal when the upper bound is the current row. Linear interpolation computes missing values as y(t) = y(t₁) + (t - t₁) × (y(t₂) - y(t₁))/(t₂ - t₁). Anomaly detection uses z-score thresholding, flagging a point when |z(t)| = |x(t) - μ| / σ > threshold. Autocorrelation measures temporal dependency, ACF(k) = Σ(x(t) - μ)(x(t+k) - μ) / Σ(x(t) - μ)², which supports pattern detection and forecasting model selection.
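The autocorrelation formula translates directly into plain Python. This sketch (hypothetical helper name) uses the sample mean for μ and sums the numerator over the n − k available pairs; on a series that alternates sign, ACF(1) is strongly negative and ACF(2) positive, exposing the period-2 pattern.

```python
def acf(xs, k):
    """Sample autocorrelation at lag k:
    sum_t (x_t - mu)(x_{t+k} - mu) / sum_t (x_t - mu)^2."""
    n = len(xs)
    mu = sum(xs) / n
    numerator = sum((xs[t] - mu) * (xs[t + k] - mu) for t in range(n - k))
    denominator = sum((x - mu) ** 2 for x in xs)
    return numerator / denominator


alternating = [1.0, -1.0, 1.0, -1.0, 1.0, -1.0]
print(acf(alternating, 0))  # 1.0
print(acf(alternating, 1))  # about -0.833 (= -5/6)
print(acf(alternating, 2))  # about 0.667 (= 4/6)
```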

See also: Real-Time Analytics (38), ML Feature Engineering (32), Adaptive Query Execution (30)
