PySpark Advanced Interview Series
Module 12: Complex Aggregations for Multi-Dimensional Analysis
Interview Question
"At Google, we build multi-dimensional analytical cubes using rollup and cube operations. Walk us through the difference between rollup, cube, and grouping sets, and demonstrate how you would pivot a sales dataset to create a cross-tabulation report." (Google Data Engineer Interview)
"At Microsoft, we use complex aggregations for business intelligence. Explain how pivot and unpivot work, how you would compute running totals with groupBy, and what performance implications aggregation strategies have at scale." (Microsoft Senior Data Engineer Interview)
Aggregation Fundamentals
Basic groupBy
from pyspark.sql import SparkSession
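# Importing sum/min/max like this shadows Python's built-ins in this module;
# many codebases use `import pyspark.sql.functions as F` instead
from pyspark.sql.functions import (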
col, sum, avg, count, min, max,
collect_list, collect_set, first, last,
percentile_approx, stddev, variance
)
spark = SparkSession.builder.appName("AggInterview").getOrCreate()
# Create sample sales data
sales_data = [
("2024-01", "Electronics", "Laptop", "North", 1200),
("2024-01", "Electronics", "Phone", "North", 800),
("2024-01", "Clothing", "Shirt", "North", 50),
("2024-02", "Electronics", "Laptop", "South", 1300),
("2024-02", "Electronics", "Phone", "South", 850),
("2024-02", "Clothing", "Shirt", "South", 55),
("2024-01", "Electronics", "Laptop", "East", 1100),
("2024-01", "Clothing", "Shirt", "East", 45),
("2024-02", "Electronics", "Phone", "East", 780),
]
df = spark.createDataFrame(sales_data,
["month", "category", "product", "region", "revenue"])
# Basic aggregation
df.groupBy("category").agg(
sum("revenue").alias("total_revenue"),
avg("revenue").alias("avg_revenue"),
count("*").alias("transaction_count")
).show()
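The `.show()` call above has no expected output listed, so here is a minimal plain-Python model of the same `groupBy("category").agg(...)` over the sample rows, useful for checking the numbers by hand. It only mirrors the result semantics, not how Spark executes the query, and `group_by_category` is a hypothetical helper.

```python
from collections import defaultdict

# Same rows as sales_data above: (month, category, product, region, revenue)
sales_data = [
    ("2024-01", "Electronics", "Laptop", "North", 1200),
    ("2024-01", "Electronics", "Phone", "North", 800),
    ("2024-01", "Clothing", "Shirt", "North", 50),
    ("2024-02", "Electronics", "Laptop", "South", 1300),
    ("2024-02", "Electronics", "Phone", "South", 850),
    ("2024-02", "Clothing", "Shirt", "South", 55),
    ("2024-01", "Electronics", "Laptop", "East", 1100),
    ("2024-01", "Clothing", "Shirt", "East", 45),
    ("2024-02", "Electronics", "Phone", "East", 780),
]


def group_by_category(rows):
    """Model of groupBy("category").agg(sum, avg, count) over the revenue column."""
    revenues = defaultdict(list)
    for _month, category, _product, _region, revenue in rows:
        revenues[category].append(revenue)
    return {
        category: {
            "total_revenue": sum(values),
            "avg_revenue": sum(values) / len(values),
            "transaction_count": len(values),
        }
        for category, values in revenues.items()
    }


for category, stats in sorted(group_by_category(sales_data).items()):
    print(category, stats)
```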
rollup()
Computes hierarchical subtotals from the most detailed to the most general level.
# Rollup: creates subtotals at each hierarchy level + grand total
rollup_result = df.rollup("category", "region", "product").agg(
sum("revenue").alias("total_revenue"),
count("*").alias("transaction_count")
).orderBy("category", "region", "product")
rollup_result.show(30)
# Output includes:
# Electronics | North | Laptop | 1200 (detail)
# Electronics | North | Phone | 800 (detail)
# Electronics | North | NULL | 2000 (subtotal: category + region)
# Electronics | NULL | NULL | 6030 (subtotal: category only)
# NULL | NULL | NULL | 6180 (grand total)
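To see where the subtotal numbers come from, here is a plain-Python sketch of the rollup semantics: every input row is counted once for each prefix of the dimension list, with the dropped dimensions set to `None` (Spark displays them as NULL). `rollup_sum` is a hypothetical helper that models the result, not Spark's implementation.

```python
from collections import defaultdict

COLUMNS = ("month", "category", "product", "region", "revenue")
sales_data = [
    ("2024-01", "Electronics", "Laptop", "North", 1200),
    ("2024-01", "Electronics", "Phone", "North", 800),
    ("2024-01", "Clothing", "Shirt", "North", 50),
    ("2024-02", "Electronics", "Laptop", "South", 1300),
    ("2024-02", "Electronics", "Phone", "South", 850),
    ("2024-02", "Clothing", "Shirt", "South", 55),
    ("2024-01", "Electronics", "Laptop", "East", 1100),
    ("2024-01", "Clothing", "Shirt", "East", 45),
    ("2024-02", "Electronics", "Phone", "East", 780),
]


def rollup_sum(rows, dims):
    """Model of rollup(*dims).agg(sum("revenue")): one grouping set per prefix of dims."""
    totals = defaultdict(int)
    for raw in rows:
        row = dict(zip(COLUMNS, raw))
        # k == len(dims) is the detail level, k == 0 is the grand total
        for k in range(len(dims), -1, -1):
            key = tuple(row[d] if i < k else None for i, d in enumerate(dims))
            totals[key] += row["revenue"]
    return dict(totals)


result = rollup_sum(sales_data, ("category", "region", "product"))
print(len(result))                             # number of rollup rows
print(result[("Electronics", "North", None)])  # category + region subtotal
print(result[("Electronics", None, None)])     # category subtotal
print(result[(None, None, None)])              # grand total
```

On the sample data this yields 18 rows: 9 detail rows, 6 category + region subtotals, 2 category subtotals and 1 grand total.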
Rollup Hierarchy
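# Rollup creates a hierarchy: category → region → product
# NULL in a dimension column marks a subtotal level (if the source data can contain
# NULLs in these columns, use grouping() to tell the two apart)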
# Filter for specific levels
# Detail level only (no subtotals)
detail = rollup_result.filter(col("category").isNotNull() &
col("region").isNotNull() &
col("product").isNotNull())
# Category subtotals only
category_totals = rollup_result.filter(
col("category").isNotNull() &
col("region").isNull() &
col("product").isNull()
)
# Grand total
grand_total = rollup_result.filter(
col("category").isNull() &
col("region").isNull() &
col("product").isNull()
)
cube()
Computes subtotals for ALL possible combinations of dimensions.
# Cube: all possible dimension combinations
cube_result = df.cube("category", "region", "product").agg(
sum("revenue").alias("total_revenue"),
count("*").alias("transaction_count")
)
cube_result.show(50)
# Cube produces MORE rows than rollup:
# - Category + Region + Product (detail)
# - Category only subtotals
# - Region only subtotals
# - Product only subtotals
# - Category + Region subtotals
# - Category + Product subtotals
# - Region + Product subtotals
# - Grand total
# For 3 dimensions, cube produces 2^3 = 8 combinations
# Rollup produces 3 + 1 = 4 combinations (hierarchical)
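The 2^3 versus 3 + 1 counts follow directly from how the grouping sets are generated: rollup keeps every prefix of the column list, while cube keeps every subset. A small plain-Python sketch (the helper names are hypothetical) makes that concrete.

```python
from itertools import combinations


def rollup_sets(dims):
    """Grouping sets of rollup(*dims): every prefix of dims, longest first."""
    return [tuple(dims[:k]) for k in range(len(dims), -1, -1)]


def cube_sets(dims):
    """Grouping sets of cube(*dims): every subset of dims (the power set)."""
    return [combo for k in range(len(dims), -1, -1) for combo in combinations(dims, k)]


dims = ("category", "region", "product")
print(len(rollup_sets(dims)), rollup_sets(dims))
print(len(cube_sets(dims)), cube_sets(dims))
```

Every rollup grouping set is also a cube grouping set, which is why cube output is a superset of rollup output for the same columns.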
Rollup vs Cube
| Feature | rollup() | cube() |
|---|---|---|
| Combinations | Hierarchical | All possible |
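| For 3 dimensions | 4 combinations | 8 combinations |
| For n dimensions | n + 1 combinations | 2^n combinations |
| Use case | Known hierarchy | Ad-hoc analysis |
| Performance | Cheaper (each input row is expanded n + 1 times) | More expensive (each input row is expanded 2^n times) |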
grouping() and grouping_id()
Identify which columns are aggregated in subtotal rows.
# grouping() returns 1 when a column is rolled up (shown as NULL) in that row, 0 otherwise.
# grouping()/grouping_id() must be evaluated inside agg(); calling them in a later
# withColumn() raises an AnalysisException, so compute the flags first, then build labels.
from pyspark.sql.functions import col, concat, grouping, grouping_id, lit, sum, when
rollup_result = df.rollup("category", "region").agg(
sum("revenue").alias("total_revenue"),
grouping("category").alias("g_category"),
grouping("region").alias("g_region")
).withColumn(
"grouping_info",
concat(
when(col("g_category") == 1, lit("ALL")).otherwise(col("category")),
lit(" | "),
when(col("g_region") == 1, lit("ALL")).otherwise(col("region"))
)
).drop("g_category", "g_region")
rollup_result.show()
# Row order may vary (no orderBy):
# +-----------+------+-------------+-------------------+
# |   category|region|total_revenue|      grouping_info|
# +-----------+------+-------------+-------------------+
# |Electronics| North|         2000|Electronics | North|
# |Electronics| South|         2150|Electronics | South|
# |Electronics|  East|         1880| Electronics | East|
# |Electronics|  NULL|         6030|  Electronics | ALL|
# |   Clothing| North|           50|   Clothing | North|
# |   Clothing| South|           55|   Clothing | South|
# |   Clothing|  East|           45|    Clothing | East|
# |   Clothing|  NULL|          150|     Clothing | ALL|
# |       NULL|  NULL|         6180|          ALL | ALL|
# +-----------+------+-------------+-------------------+
# grouping_id() packs the grouping() flags into one integer (first column = highest bit)
rollup_result = df.rollup("category", "region").agg(
sum("revenue").alias("total_revenue"),
grouping_id("category", "region").alias("grouping_id")
)
# grouping_id values:
# 0: neither category nor region is aggregated (detail)
# 1: region is aggregated (category subtotal)
# 2: category is aggregated (region subtotal; only cube() produces this level)
# 3: both are aggregated (grand total)
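The grouping_id() values above come from treating each grouping() flag as one bit, with the first grouping column as the most significant bit. A plain-Python sketch of that encoding (`grouping_id_bits` is a hypothetical helper, not a Spark function):

```python
def grouping_id_bits(group_by_cols, aggregated):
    """Model of grouping_id(): one bit per grouping column, first column = most
    significant bit; a bit is 1 when that column is rolled up in the row."""
    gid = 0
    for name in group_by_cols:
        gid = (gid << 1) | (1 if name in aggregated else 0)
    return gid


cols = ("category", "region")
print(grouping_id_bits(cols, set()))                   # detail row
print(grouping_id_bits(cols, {"region"}))              # category subtotal
print(grouping_id_bits(cols, {"category"}))            # region subtotal (cube only)
print(grouping_id_bits(cols, {"category", "region"}))  # grand total
```

Filtering on a single integer such as `grouping_id == 1` is often simpler than combining several isNull() checks, and it is not confused by genuine NULLs in the data.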
grouping sets (SQL)
Explicitly define which combinations to aggregate.
# SQL grouping sets
df.createOrReplaceTempView("sales")
result = spark.sql("""
SELECT category, region, SUM(revenue) as total_revenue
FROM sales
GROUP BY category, region
GROUPING SETS (
(category, region), -- category + region detail
(category), -- category subtotal
(region), -- region subtotal
() -- grand total
)
ORDER BY category, region
""")
result.show()
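Conceptually, GROUPING SETS counts every input row once per listed set, with the columns missing from that set shown as NULL (Spark does this with an expand step before aggregating). The plain-Python sketch below models the query above on the sample rows; `grouping_sets_sum` is a hypothetical helper and ignores details such as output ordering.

```python
from collections import defaultdict

COLUMNS = ("month", "category", "product", "region", "revenue")
sales_data = [
    ("2024-01", "Electronics", "Laptop", "North", 1200),
    ("2024-01", "Electronics", "Phone", "North", 800),
    ("2024-01", "Clothing", "Shirt", "North", 50),
    ("2024-02", "Electronics", "Laptop", "South", 1300),
    ("2024-02", "Electronics", "Phone", "South", 850),
    ("2024-02", "Clothing", "Shirt", "South", 55),
    ("2024-01", "Electronics", "Laptop", "East", 1100),
    ("2024-01", "Clothing", "Shirt", "East", 45),
    ("2024-02", "Electronics", "Phone", "East", 780),
]


def grouping_sets_sum(rows, sets, dims):
    """Model of GROUP BY ... GROUPING SETS (...) with SUM(revenue).

    Each input row contributes once per grouping set; dimensions outside
    the set are replaced by None (NULL in Spark's output)."""
    totals = defaultdict(int)
    for raw in rows:
        row = dict(zip(COLUMNS, raw))
        for grouping_set in sets:
            key = tuple(row[d] if d in grouping_set else None for d in dims)
            totals[key] += row["revenue"]
    return dict(totals)


sets = [("category", "region"), ("category",), ("region",), ()]
result = grouping_sets_sum(sales_data, sets, dims=("category", "region"))
print(len(result))               # 6 detail + 2 category + 3 region + 1 grand total
print(result[(None, "North")])   # region subtotal
print(result[(None, None)])      # grand total
```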
pivot()
Transforms rows into columns (cross-tabulation).
# Basic pivot: rows to columns
pivot_result = df.groupBy("category").pivot("region").sum("revenue")
pivot_result.show()
# Row order may vary; without explicit values, the pivot columns are sorted
# +-----------+----+-----+-----+
# |   category|East|North|South|
# +-----------+----+-----+-----+
# |Electronics|1880| 2000| 2150|
# |   Clothing|  45|   50|   55|
# +-----------+----+-----+-----+
# Pivot with explicit values (skips the extra job that collects distinct regions;
# output columns follow the order of the list)
pivot_result = df.groupBy("category").pivot("region", ["North", "South", "East"]).sum("revenue")
# Pivot with multiple aggregations (output columns are named <value>_<alias>,
# e.g. North_total, North_average)
pivot_multi = df.groupBy("category").pivot("region").agg(
sum("revenue").alias("total"),
avg("revenue").alias("average")
)
# Pivot with expression
pivot_expr = df.groupBy("category").pivot("region").agg(
(sum("revenue") / count("*")).alias("avg_revenue")
)
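A plain-Python model of the basic pivot helps show what Spark builds: one output row per category, one column per pivot value, and NULL where a category has no rows for that value. `pivot_sum` is a hypothetical helper; the sorting step mirrors what Spark does when no values are passed.

```python
from collections import defaultdict

sales_data = [
    ("2024-01", "Electronics", "Laptop", "North", 1200),
    ("2024-01", "Electronics", "Phone", "North", 800),
    ("2024-01", "Clothing", "Shirt", "North", 50),
    ("2024-02", "Electronics", "Laptop", "South", 1300),
    ("2024-02", "Electronics", "Phone", "South", 850),
    ("2024-02", "Clothing", "Shirt", "South", 55),
    ("2024-01", "Electronics", "Laptop", "East", 1100),
    ("2024-01", "Clothing", "Shirt", "East", 45),
    ("2024-02", "Electronics", "Phone", "East", 780),
]


def pivot_sum(rows, values=None):
    """Model of groupBy("category").pivot("region", values).sum("revenue")."""
    if values is None:
        # Without explicit values, Spark first collects the distinct pivot
        # values (an extra job) and sorts them; this models that step.
        values = sorted({region for _m, _c, _p, region, _r in rows})
    cells = defaultdict(dict)
    for _month, category, _product, region, revenue in rows:
        row_cells = cells[category]  # every category gets an output row
        if region in values:
            row_cells[region] = row_cells.get(region, 0) + revenue
    # A (category, value) pair with no input rows becomes None, like NULL in Spark
    return {cat: {v: found.get(v) for v in values} for cat, found in cells.items()}


print(pivot_sum(sales_data))
print(pivot_sum(sales_data, ["North", "South", "East"]))
```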
Unpivot (Spark 3.4+)
Transforms columns back to rows.
# Unpivot: columns to rows
unpivot_result = pivot_result.unpivot(
["category"], # ID columns
["East", "North", "South"], # Value columns
"region", "revenue" # Output column names
)
unpivot_result.show()
# +-----------+------+-------+
# |   category|region|revenue|
# +-----------+------+-------+
# |Electronics|  East|   1880|
# |Electronics| North|   2000|
# |Electronics| South|   2150|
# |   Clothing|  East|     45|
# |   Clothing| North|     50|
# |   Clothing| South|     55|
# +-----------+------+-------+
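Unpivot is the inverse reshaping: each wide row becomes one long row per value column, with the column name stored in the variable column. A plain-Python sketch of that transformation, using the pivoted totals from above (`unpivot_rows` is a hypothetical helper modeling `DataFrame.unpivot(ids, values, variableColumnName, valueColumnName)` for a single ID column):

```python
def unpivot_rows(wide_rows, id_col, value_cols, variable_name, value_name):
    """Turn each wide row into one long row per value column."""
    long_rows = []
    for row in wide_rows:
        for value_col in value_cols:
            long_rows.append({
                id_col: row[id_col],
                variable_name: value_col,   # the former column name
                value_name: row[value_col], # the cell value
            })
    return long_rows


wide = [
    {"category": "Electronics", "North": 2000, "South": 2150, "East": 1880},
    {"category": "Clothing", "North": 50, "South": 55, "East": 45},
]
long_rows = unpivot_rows(wide, "category", ["East", "North", "South"], "region", "revenue")
for r in long_rows:
    print(r)
```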
Real-World Scenario: Google Multi-Dimensional Analytics
Problem Statement
Build a multi-dimensional analytics pipeline that computes revenue across multiple hierarchies, pivots for executive dashboards, and handles complex aggregation scenarios.
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
spark = SparkSession.builder \
.appName("GoogleMultiDimAnalytics") \
.config("spark.sql.shuffle.partitions", "200") \
.getOrCreate()
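# Read sales data (assumed columns: region, category, product, month, date,
# purchase_date, customer_id, revenue, quantity)
sales = spark.read.parquet("s3a://google-data/sales/")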
# === 1. Multi-level Rollup Analysis ===
# Hierarchy: region → category → product
revenue_rollup = sales.rollup("region", "category", "product").agg(
sum("revenue").alias("total_revenue"),
sum("quantity").alias("total_quantity"),
count("*").alias("transaction_count"),
avg("revenue").alias("avg_revenue")
)
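# Label aggregation levels (PySpark has no case(); chain when() calls instead).
# The isNotNull() checks assume the dimension columns contain no NULLs in the source
# data; otherwise compute grouping() flags inside agg() and branch on those.
revenue_labeled = revenue_rollup \
.withColumn(
"analysis_level",
when(col("product").isNotNull(), "Product Level")
.when(col("category").isNotNull(), "Category Level")
.when(col("region").isNotNull(), "Region Level")
.otherwise("Grand Total")
)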
# === 2. Cube Analysis for Ad-Hoc Reporting ===
# All possible dimension combinations; grouping()/grouping_id() must be computed inside agg()
cube_analysis = sales.cube("region", "category", "month").agg(
sum("revenue").alias("total_revenue"),
approx_count_distinct("customer_id").alias("unique_customers"),
grouping("region").alias("g_region"),
grouping("category").alias("g_category"),
grouping("month").alias("g_month"),
grouping_id("region", "category", "month").alias("aggregation_level")
)
# Add grouping labels (concat_ws skips the NULLs produced for rolled-up dimensions)
cube_labeled = cube_analysis \
.withColumn(
"dimensions",
concat_ws(", ",
when(col("g_region") == 0, col("region")),
when(col("g_category") == 0, col("category")),
when(col("g_month") == 0, col("month"))
)
) \
.drop("g_region", "g_category", "g_month")
# === 3. Executive Dashboard Pivot ===
# Regional performance by category
regional_pivot = sales.groupBy("region").pivot("category").agg(
sum("revenue").alias("total"),
round(avg("revenue"), 2).alias("avg"),
count("*").alias("count")
)
# Monthly trend by category
monthly_pivot = sales.groupBy("month").pivot("category").agg(
sum("revenue")
)
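# === 4. Cohort Analysis with Grouping Sets ===
# spark.sql() can only see registered views, so register the DataFrame first
sales.createOrReplaceTempView("sales")
cohort_analysis = spark.sql("""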
SELECT
DATE_FORMAT(first_purchase_date, 'yyyy-MM') as cohort_month,
DATE_FORMAT(purchase_date, 'yyyy-MM') as purchase_month,
COUNT(DISTINCT customer_id) as customers,
SUM(revenue) as revenue
FROM (
SELECT
*,
MIN(purchase_date) OVER (PARTITION BY customer_id) as first_purchase_date
FROM sales
)
GROUP BY
DATE_FORMAT(first_purchase_date, 'yyyy-MM'),
DATE_FORMAT(purchase_date, 'yyyy-MM')
GROUPING SETS (
(DATE_FORMAT(first_purchase_date, 'yyyy-MM'), DATE_FORMAT(purchase_date, 'yyyy-MM')),
(DATE_FORMAT(first_purchase_date, 'yyyy-MM'))
)
""")
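# === 5. Running Aggregates with Window Functions ===
# rowsBetween(-6, 0) covers the current row and the 6 rows before it, which is
# 7 calendar days only if no dates are missing. With orderBy() and no explicit frame,
# sum() runs from the start of the partition to the current row (cumulative total).
daily_window = Window.partitionBy("region", "category").orderBy("date")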
daily_trends = sales \
.groupBy("region", "category", "date") \
.agg(sum("revenue").alias("daily_revenue")) \
.withColumn("rolling_7d_avg",
avg("daily_revenue").over(daily_window.rowsBetween(-6, 0))) \
.withColumn("cumulative_revenue",
sum("daily_revenue").over(daily_window)) \
.withColumn("day_over_day_change",
col("daily_revenue") - lag("daily_revenue", 1).over(daily_window))
# Show results
revenue_labeled.orderBy("region", "category", "product").show(30, truncate=False)
cube_labeled.filter(col("total_revenue") > 0).show(30, truncate=False)
regional_pivot.show(truncate=False)
spark.stop()
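The three window columns in step 5 can be modeled in plain Python for a single (region, category) partition whose daily rows are already sorted by date. This is a sketch under that assumption; `daily_trends_model` and the revenue values are hypothetical, and it assumes one row per date (so the default RANGE frame and a ROWS frame coincide).

```python
def daily_trends_model(daily_revenue):
    """Model of the step-5 window columns for ONE partition ordered by date.

    rolling_7d_avg      ~ avg(...).over(w.rowsBetween(-6, 0))
    cumulative_revenue  ~ sum(...).over(w)  (partition start to current row)
    day_over_day_change ~ value - lag(value, 1); None for the first row
    """
    out = []
    running = 0
    for i, value in enumerate(daily_revenue):
        frame = daily_revenue[max(0, i - 6): i + 1]  # current row + up to 6 before
        running += value
        change = None if i == 0 else value - daily_revenue[i - 1]
        out.append((sum(frame) / len(frame), running, change))
    return out


# Hypothetical daily revenue for one (region, category) partition
for row in daily_trends_model([100, 200, 300, 400, 500, 600, 700, 800]):
    print(row)
```

Note that the first six rolling averages use fewer than seven rows, exactly as a `rowsBetween(-6, 0)` frame does at the start of a partition.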
Advanced Pivot Patterns
Dynamic Pivot
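# Generate pivot columns dynamically. This still runs one distinct-values job,
# but lets you drop NULLs, sort, or reuse the list across several pivots.
pivot_columns = df.select("region").where(col("region").isNotNull()).distinct().collect()
pivot_col_list = sorted(row.region for row in pivot_columns)
dynamic_pivot = df.groupBy("category").pivot("region", pivot_col_list).sum("revenue")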
Pivot with Multiple Aggregations
# Multiple aggregations in pivot
pivot_multi = df.groupBy("category").pivot("region").agg(
struct(
sum("revenue").alias("total"),
avg("revenue").alias("avg"),
count("*").alias("count")
)
)
# Access nested fields
pivot_multi.select(
"category",
col("North.total").alias("north_total"),
col("North.avg").alias("north_avg")
)
Pivot Performance Optimization
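# SLOWER: without explicit values, Spark first runs a separate job to collect
# (and sort) the distinct regions, so the input is read an extra time
df.groupBy("category").pivot("region").sum("revenue")
# FASTER: explicit values skip that extra job
df.groupBy("category").pivot("region", ["North", "South", "East"]).sum("revenue")
# If the values are not known in advance, cache the input so the distinct-values
# job and the aggregation do not both recompute it from source
df.cache()
df.count()  # Force materialization
pivot_result = df.groupBy("category").pivot("region").sum("revenue")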
Edge Cases
1. NULL Values in Pivot
# A NULL in the pivot column becomes its own output column (named "null")
# Filter NULLs before the pivot if that column is not wanted
df.filter(col("region").isNotNull()) \
.groupBy("category").pivot("region").sum("revenue")
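2. NULLs in Subtotal Rows
# rollup/cube mark subtotal rows with NULL in the rolled-up dimension columns
# na.fill(0) only touches numeric columns, so it replaces NULL metrics, not NULL keys
rollup_result.na.fill(0)
# na.drop() removes every row that contains a NULL, i.e. all subtotal and grand-total rows
rollup_result.na.drop()
# Label rolled-up keys instead (this also relabels genuine NULLs from the data)
rollup_result.na.fill("ALL", subset=["category", "region"])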
3. Large Pivot Tables
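# Many distinct values in the pivot column mean many output columns,
# which makes rows very wide and can cause OOM errors
# Without explicit values, Spark also fails if the column has more than
# spark.sql.pivotMaxValues distinct values (default 10000)
# Solution 1: Filter before pivot
df.filter(col("region").isin(["North", "South", "East"])) \
.groupBy("category").pivot("region").sum("revenue")
# Solution 2: keep the data in long form for downstream processing
# (e.g. groupBy("category", "region").sum("revenue"), or unpivot an existing wide table)
# and pivot only for the final report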
Performance Analysis
| Operation | Shuffle | Rows processed (before partial aggregation) | Relative cost | Use Case |
|---|---|---|---|---|
| groupBy().agg() | Yes | 1× input | Low | Standard aggregation |
| rollup() on n columns | Yes | (n + 1)× input | Medium | Hierarchical subtotals |
| cube() on n columns | Yes | 2^n × input | High | All combinations |
| pivot() | Yes | 1× input, plus a distinct-values job if values are omitted | Medium to high (wide rows) | Cross-tabulation |
| GROUPING SETS (SQL) with k sets | Yes | k× input | Grows with k | Custom combinations |
Performance Tip
For large datasets, specify pivot values explicitly so Spark skips the extra job that collects distinct values. Use cache() when the same DataFrame feeds several multi-step aggregations. Decide whether you need rollup (n + 1 grouping sets) or cube (2^n grouping sets); when a hierarchy is enough, rollup processes far fewer rows.
Best Practices
Aggregation Best Practices
- Use rollup() for known hierarchies and cube() for ad-hoc analysis
- Specify pivot values whenever they are known in advance
- Use grouping() or grouping_id() inside agg() to identify subtotal levels in rollup/cube results
- Cache a DataFrame that feeds several aggregation passes
- Handle NULLs in rollup/cube results: they mark subtotals, but can also be genuine NULLs from the data
- Use GROUPING SETS in SQL for custom aggregation combinations
- Consider partitioning the output by aggregation level (for example, by grouping_id)
Summary
Complex aggregations like rollup, cube, and pivot enable multi-dimensional analysis in PySpark. Knowing when to use each (rollup for hierarchical subtotals, cube for all combinations, pivot for cross-tabulation) is essential for building executive dashboards and analytical reports at Google and Microsoft scale.