πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Topic: Window Functions and Analytical Operations

PySpark AdvancedWindow Functions⭐ Premium

Advertisement

PySpark Advanced Interview Series

Module 11: Window Functions β€” Advanced Analytical Patterns

MetaAppleDifficulty: Hard

Interview Question

"At Meta, we compute user engagement metrics using window functions over billions of events. Walk us through all window function types, explain frame specifications, and demonstrate how you would compute a 7-day rolling average and session gap analysis." β€” Meta Data Engineer Interview

"At Apple, we use window functions for time-series analytics across our services. Explain the difference between ROWS and RANGE frames, how NULLS are handled in window functions, and how you would detect gaps in user activity sequences." β€” Apple Senior Data Engineer Interview


Window Function Fundamentals

Window functions perform calculations across a set of rows related to the current row, without collapsing them (unlike GROUP BY).

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import (
    col, row_number, rank, dense_rank, ntile,
    lead, lag, sum as _sum, avg, count,
    min, max, first, last, percent_rank, cume_dist
)

spark = SparkSession.builder.appName("WindowInterview").getOrCreate()

# Create sample data
data = [
    ("Alice", "2024-01-01", 100),
    ("Alice", "2024-01-02", 150),
    ("Alice", "2024-01-03", 200),
    ("Alice", "2024-01-04", 120),
    ("Bob", "2024-01-01", 80),
    ("Bob", "2024-01-02", 90),
    ("Bob", "2024-01-03", 110),
    ("Bob", "2024-01-04", 130),
]

df = spark.createDataFrame(data, ["user_id", "date", "value"])

# Define window specification
window_spec = Window.partitionBy("user_id").orderBy("date")

Ranking Functions

row_number()

Assigns unique sequential integers starting from 1.

df.withColumn("row_num", row_number().over(window_spec)).show()

# +-------+----------+-----+--------+
# |user_id|      date|value|row_num |
# +-------+----------+-----+--------+
# |  Alice|2024-01-01|  100|       1|
# |  Alice|2024-01-02|  150|       2|
# |  Alice|2024-01-03|  200|       3|
# |  Alice|2024-01-04|  120|       4|
# |    Bob|2024-01-01|   80|       1|
# |    Bob|2024-01-02|   90|       2|
# |    Bob|2024-01-03|  110|       3|
# |    Bob|2024-01-04|  130|       4|
# +-------+----------+-----+--------+

rank()

Assigns ranks with gaps for ties.

df.withColumn("rank", rank().over(window_spec)).show()
# Ties get same rank, next rank is skipped
# Example: ranks 1, 2, 2, 4 (rank 3 is skipped)

dense_rank()

Assigns ranks without gaps for ties.

df.withColumn("dense_rank", dense_rank().over(window_spec)).show()
# Ties get same rank, no gaps
# Example: ranks 1, 2, 2, 3 (rank 3 follows rank 2)

ntile()

Divides rows into N roughly equal groups.

# Divide into 4 quartiles
df.withColumn("quartile", ntile(4).over(window_spec)).show()

# +-------+----------+-----+--------+
# |user_id|      date|value|quartile|
# +-------+----------+-----+--------+
# |  Alice|2024-01-01|  100|       1|
# |  Alice|2024-01-02|  150|       2|
# |  Alice|2024-01-03|  200|       3|
# |  Alice|2024-01-04|  120|       4|
# +-------+----------+-----+--------+

percent_rank() and cume_dist()

# percent_rank: (rank - 1) / (total_rows - 1)
df.withColumn("percent_rank", percent_rank().over(window_spec)).show()

# cume_dist: cumulative distribution (rank / total_rows)
df.withColumn("cume_dist", cume_dist().over(window_spec)).show()

Navigation Functions

lead() and lag()

Access values from rows before or after the current row.

# lead: look ahead
df.withColumn("next_value", lead("value", 1).over(window_spec)).show()

# +-------+----------+-----+-----------+
# |user_id|      date|value|next_value |
# +-------+----------+-----+-----------+
# |  Alice|2024-01-01|  100|        150|
# |  Alice|2024-01-02|  150|        200|
# |  Alice|2024-01-03|  200|        120|
# |  Alice|2024-01-04|  120|       null|
# +-------+----------+-----+-----------+

# lag: look behind
df.withColumn("prev_value", lag("value", 1).over(window_spec)).show()

# +-------+----------+-----+-----------+
# |user_id|      date|value|prev_value |
# +-------+----------+-----+-----------+
# |  Alice|2024-01-01|  100|       null|
# |  Alice|2024-01-02|  150|        100|
# |  Alice|2024-01-03|  200|        150|
# |  Alice|2024-01-04|  120|        200|
# +-------+----------+-----+-----------+

# lead with default value
df.withColumn("next_value", lead("value", 1, 0).over(window_spec)).show()

# lead with offset
df.withColumn("next_2_days", lead("value", 2).over(window_spec)).show()

Aggregate Window Functions

Running Totals and Moving Averages

# Running total
df.withColumn(
    "running_total",
    _sum("value").over(window_spec)
).show()

# +-------+----------+-----+-------------+
# |user_id|      date|value|running_total|
# +-------+----------+-----+-------------+
# |  Alice|2024-01-01|  100|          100|
# |  Alice|2024-01-02|  150|          250|
# |  Alice|2024-01-03|  200|          450|
# |  Alice|2024-01-04|  120|          570|
# +-------+----------+-----+-------------+

# 3-day moving average
moving_avg_window = Window.partitionBy("user_id") \
    .orderBy("date") \
    .rowsBetween(-2, 0)  # Current row and 2 preceding

df.withColumn(
    "moving_avg_3d",
    avg("value").over(moving_avg_window)
).show()

# +-------+----------+-----+-------------+
# |user_id|      date|value|moving_avg_3d|
# +-------+----------+-----+-------------+
# |  Alice|2024-01-01|  100|        100.0|
# |  Alice|2024-01-02|  150|        125.0|
# |  Alice|2024-01-03|  200|        150.0|
# |  Alice|2024-01-04|  120|        156.67|
# +-------+----------+-----+-------------+

Frame Specifications

# rowsBetween: physical rows
window_rows = Window.partitionBy("user_id") \
    .orderBy("date") \
    .rowsBetween(-6, 0)  # Last 6 rows + current

# rangeBetween: logical value range
window_range = Window.partitionBy("user_id") \
    .orderBy(col("date").cast("long")) \
    .rangeBetween(-6 * 86400, 0)  # Last 6 days (in seconds)

# UNBOUNDED boundaries
window_unbounded = Window.partitionBy("user_id") \
    .orderBy("date") \
    .rowsBetween(Window.unboundedPreceding, 0)  # All rows before + current

# Current row only
window_current = Window.partitionBy("user_id") \
    .orderBy("date") \
    .rowsBetween(0, 0)

ℹ️Meta Interview Insight

At Meta, interviewers often ask about the difference between rowsBetween and rangeBetween. rowsBetween counts physical rows (e.g., last 6 rows), while rangeBetween counts logical values (e.g., last 6 days). If data has gaps, rangeBetween is more semantically correct.


Real-World Scenario: Meta User Engagement Analytics

Problem Statement

Compute comprehensive user engagement metrics including daily active users, session analysis, retention cohorts, and engagement scoring using window functions.

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("MetaEngagementAnalytics") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Read user events
events = spark.read.parquet("s3a://meta-data/user-events/")

# === 1. Daily Active Users (DAU) with Rolling Metrics ===

# Window for daily aggregation
daily_window = Window.partitionBy("user_id").orderBy("event_date")

# Daily event counts
daily_events = events \
    .groupBy("user_id", "event_date") \
    .agg(count("*").alias("daily_events"))

# Add rolling metrics
daily_with_metrics = daily_events \
    .withColumn("events_7d_avg", 
                avg("daily_events").over(daily_window.rowsBetween(-6, 0))) \
    .withColumn("events_30d_avg",
                avg("daily_events").over(daily_window.rowsBetween(-29, 0))) \
    .withColumn("events_trend",
                col("daily_events") - lag("daily_events", 1).over(daily_window))

# === 2. Session Analysis ===

# Define session window (30-minute inactivity = new session)
session_window = Window.partitionBy("user_id").orderBy("event_timestamp")

# Detect session boundaries
with_session_boundaries = events \
    .withColumn("prev_timestamp", 
                lag("event_timestamp", 1).over(session_window)) \
    .withColumn(
        "is_new_session",
        when(
            (col("event_timestamp").cast("long") - col("prev_timestamp").cast("long")) > 1800,
            1
        ).otherwise(0)
    ) \
    .withColumn(
        "session_id",
        _sum("is_new_session").over(session_window)
    )

# Session-level metrics
session_metrics = with_session_boundaries \
    .groupBy("user_id", "session_id") \
    .agg(
        min("event_timestamp").alias("session_start"),
        max("event_timestamp").alias("session_end"),
        count("*").alias("session_events"),
        countDistinct("event_type").alias("unique_event_types")
    ) \
    .withColumn(
        "session_duration_minutes",
        (col("session_end").cast("long") - col("session_start").cast("long")) / 60
    )

# === 3. User Engagement Scoring ===

# Compute engagement components
user_engagement = events \
    .groupBy("user_id") \
    .agg(
        count("*").alias("total_events"),
        countDistinct("event_date").alias("active_days"),
        countDistinct("event_type").alias("unique_event_types"),
        max("event_timestamp").alias("last_event"),
        min("event_timestamp").alias("first_event")
    )

# Add engagement score using window functions
engagement_window = Window.orderBy(col("total_events").desc())

user_scored = user_engagement \
    .withColumn(
        "engagement_rank",
        rank().over(engagement_window)
    ) \
    .withColumn(
        "engagement_percentile",
        percent_rank().over(engagement_window)
    ) \
    .withColumn(
        "engagement_tier",
        ntile(4).over(engagement_window)
    )

# === 4. Retention Cohort Analysis ===

# Define cohort by first event date
cohort_definition = user_engagement \
    .withColumn("cohort_date", to_date(col("first_event")))

# Join with daily events for retention
retention_analysis = events \
    .join(cohort_definition.select("user_id", "cohort_date"), "user_id") \
    .withColumn("event_date", to_date(col("event_timestamp"))) \
    .withColumn(
        "days_since_cohort",
        datediff(col("event_date"), col("cohort_date"))
    ) \
    .withColumn(
        "cohort_period",
        floor(col("days_since_cohort") / 7).cast("int")  # Weekly cohorts
    )

# Retention rates
retention_rates = retention_analysis \
    .groupBy("cohort_date", "cohort_period") \
    .agg(
        countDistinct("user_id").alias("active_users")
    ) \
    .withColumn("cohort_size",
        first("active_users").over(
            Window.partitionBy("cohort_date").orderBy("cohort_period")
        )
    ) \
    .withColumn(
        "retention_rate",
        col("active_users") / col("cohort_size")
    )

# Show results
daily_with_metrics.show(20, truncate=False)
session_metrics.show(20, truncate=False)
user_scored.show(20, truncate=False)
retention_rates.show(50, truncate=False)

spark.stop()

Advanced Window Patterns

Gap Detection

# Find gaps in sequential data
gap_window = Window.partitionBy("user_id").orderBy("date")

with_gaps = df \
    .withColumn("next_date", 
                lead("date", 1).over(gap_window)) \
    .withColumn(
        "days_until_next",
        datediff(col("next_date"), col("date"))
    ) \
    .withColumn(
        "has_gap",
        when(col("days_until_next") > 1, 1).otherwise(0)
    )

# Find users with gaps
users_with_gaps = with_gaps.filter(col("has_gap") == 1)

Running Difference

# Compute difference from previous row
df.withColumn(
    "diff_from_prev",
    col("value") - lag("value", 1).over(window_spec)
).withColumn(
    "pct_change",
    (col("value") - lag("value", 1).over(window_spec)) / lag("value", 1).over(window_spec)
)

First and Last Value in Window

# Get first and last value in each partition
first_last_window = Window.partitionBy("user_id").orderBy("date") \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df.withColumn("first_value", first("value").over(first_last_window)) \
  .withColumn("last_value", last("value").over(first_last_window))

Conditional Window

# Different windows for different conditions
normal_window = Window.partitionBy("user_id").orderBy("date")
premium_window = Window.partitionBy("user_id").orderBy("date").rowsBetween(-29, 0)

df.withColumn(
    "rolling_avg_normal",
    avg("value").over(normal_window.rowsBetween(-6, 0))
).withColumn(
    "rolling_avg_premium",
    avg("value").over(premium_window)
)

Edge Cases

1. NULL Handling

# Window functions handle NULLs based on function
# lead/lag: returns NULL for out-of-range
# sum/avg: ignores NULLs
# row_number: NULLs get ranks based on sort order

# Handle NULLs explicitly
df.withColumn(
    "next_value",
    lead("value", 1, 0).over(window_spec)  # Default 0 for NULL
)

2. Empty Partitions

# Window functions return NULL for empty partitions
# Always handle potential NULLs
df.withColumn(
    "rank",
    when(count("*").over(window_spec) > 0, 
         rank().over(window_spec))
    .otherwise(None)
)

3. Large Windows

# Very large windows can cause memory issues
# rowsBetween(-1000000, 0) processes 1M rows per partition

# Use rangeBetween for time-based windows
window_range = Window.partitionBy("user_id") \
    .orderBy(col("timestamp").cast("long")) \
    .rangeBetween(-30 * 86400, 0)  # Last 30 days

Performance Considerations

TechniqueWhen to UseImpact
Partition by join keyJoin after windowReduces shuffle
Order by within partitionCorrect orderingRequired
Cache after window opsReuse resultsAvoids recomputation
Use rangeBetween for timeTime-series analysisMore efficient
Limit window sizeLarge datasetsPrevents OOM

πŸ’‘Production Checklist

  • Always specify both partitionBy and orderBy for deterministic results
  • Use rowsBetween for fixed-count windows, rangeBetween for time-based
  • Handle NULLs explicitly in lead/lag with default values
  • Cache DataFrame after expensive window operations
  • Monitor partition sizes when using large windows
  • Consider rangeBetween for time-series with gaps

Summary

Window functions are essential for analytical queries in PySpark. Mastering ranking, navigation, and aggregate window functions with proper frame specifications enables complex time-series analysis at Meta and Apple scale. Understanding the difference between rowsBetween and rangeBetween is critical for correct results.

Advertisement