πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Topic: Complex Aggregations and Pivot Operations

PySpark AdvancedAggregation Strategies⭐ Premium

Advertisement

PySpark Advanced Interview Series

Module 12: Complex Aggregations β€” Multi-Dimensional Analysis

GoogleMicrosoftDifficulty: Hard

Interview Question

"At Google, we build multi-dimensional analytical cubes using rollup and cube operations. Walk us through the difference between rollup, cube, and grouping sets, and demonstrate how you would pivot a sales dataset to create a cross-tabulation report." β€” Google Data Engineer Interview

"At Microsoft, we use complex aggregations for business intelligence. Explain how pivot and unpivot work, how you would compute running totals with groupBy, and what performance implications aggregation strategies have at scale." β€” Microsoft Senior Data Engineer Interview


Aggregation Fundamentals

Basic groupBy

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, sum, avg, count, min, max, 
    collect_list, collect_set, first, last,
    percentile_approx, stddev, variance
)

spark = SparkSession.builder.appName("AggInterview").getOrCreate()

# Create sample sales data
sales_data = [
    ("2024-01", "Electronics", "Laptop", "North", 1200),
    ("2024-01", "Electronics", "Phone", "North", 800),
    ("2024-01", "Clothing", "Shirt", "North", 50),
    ("2024-02", "Electronics", "Laptop", "South", 1300),
    ("2024-02", "Electronics", "Phone", "South", 850),
    ("2024-02", "Clothing", "Shirt", "South", 55),
    ("2024-01", "Electronics", "Laptop", "East", 1100),
    ("2024-01", "Clothing", "Shirt", "East", 45),
    ("2024-02", "Electronics", "Phone", "East", 780),
]

df = spark.createDataFrame(sales_data, 
    ["month", "category", "product", "region", "revenue"])

# Basic aggregation
df.groupBy("category").agg(
    sum("revenue").alias("total_revenue"),
    avg("revenue").alias("avg_revenue"),
    count("*").alias("transaction_count")
).show()

rollup()

Computes hierarchical subtotals from the most detailed to the most general level.

# Rollup: creates subtotals at each hierarchy level + grand total
rollup_result = df.rollup("category", "region", "product").agg(
    sum("revenue").alias("total_revenue"),
    count("*").alias("transaction_count")
).orderBy("category", "region", "product")

rollup_result.show(30)

# Output includes:
# Electronics | North | Laptop | 1200  (detail)
# Electronics | North | Phone  | 800   (detail)
# Electronics | North | NULL   | 2000  (subtotal: category + region)
# Electronics | NULL  | NULL   | 6030  (subtotal: category only)
# NULL        | NULL  | NULL   | 6180  (grand total)

Rollup Hierarchy

# Rollup creates a hierarchy: category β†’ region β†’ product
# NULL values indicate subtotal levels

# Filter for specific levels
# Detail level only (no subtotals)
detail = rollup_result.filter(col("category").isNotNull() & 
                               col("region").isNotNull() & 
                               col("product").isNotNull())

# Category subtotals only
category_totals = rollup_result.filter(
    col("category").isNotNull() & 
    col("region").isNull() & 
    col("product").isNull()
)

# Grand total
grand_total = rollup_result.filter(
    col("category").isNull() & 
    col("region").isNull() & 
    col("product").isNull()
)

cube()

Computes subtotals for ALL possible combinations of dimensions.

# Cube: all possible dimension combinations
cube_result = df.cube("category", "region", "product").agg(
    sum("revenue").alias("total_revenue"),
    count("*").alias("transaction_count")
)

cube_result.show(50)

# Cube produces MORE rows than rollup:
# - Category only subtotals
# - Region only subtotals
# - Product only subtotals
# - Category + Region subtotals
# - Category + Product subtotals
# - Region + Product subtotals
# - Grand total

# For 3 dimensions, cube produces 2^3 = 8 combinations
# Rollup produces 3 + 1 = 4 combinations (hierarchical)

Rollup vs Cube

Featurerollup()cube()
CombinationsHierarchicalAll possible
For 3 dimensions4 combinations8 combinations
Use caseKnown hierarchyAd-hoc analysis
PerformanceFasterSlower

grouping() and grouping_id()

Identify which columns are aggregated in subtotal rows.

# grouping() returns 1 for aggregated (NULL) columns, 0 otherwise
rollup_result = df.rollup("category", "region").agg(
    sum("revenue").alias("total_revenue")
).withColumn(
    "grouping_info",
    concat(
        when(grouping("category") == 1, "ALL").otherwise(col("category")),
        lit(" | "),
        when(grouping("region") == 1, "ALL").otherwise(col("region"))
    )
)

rollup_result.show()

# +----------+------+-------------+--------------+
# | category |region|total_revenue|grouping_info |
# +----------+------+-------------+--------------+
# |Electronics|North|         2000|Electronics   |
# |Electronics|South|         2150|Electronics   |
# |Electronics| East|         1880|Electronics   |
# |Electronics| NULL|         6030|ElectronicsALL|
# |  Clothing|North|           50|Clothing      |
# |  Clothing|South|           55|Clothing      |
# |  Clothing| East|           45|Clothing      |
# |  Clothing| NULL|          150|ClothingALL   |
# |      NULL| NULL|         6180|ALL |ALL      |
# +----------+------+-------------+--------------+

# grouping_id() combines grouping flags into a single integer
rollup_result = df.rollup("category", "region").agg(
    sum("revenue").alias("total_revenue")
).withColumn(
    "grouping_id",
    grouping_id("category", "region")
)

# grouping_id values:
# 0: both category and region are NOT aggregated (detail)
# 1: region is aggregated (category subtotal)
# 3: both are aggregated (grand total)

grouping sets (SQL)

Explicitly define which combinations to aggregate.

# SQL grouping sets
df.createOrReplaceTempView("sales")

result = spark.sql("""
    SELECT category, region, SUM(revenue) as total_revenue
    FROM sales
    GROUP BY category, region
    GROUPING SETS (
        (category, region),    -- category + region detail
        (category),            -- category subtotal
        (region),              -- region subtotal
        ()                     -- grand total
    )
    ORDER BY category, region
""")

result.show()

pivot()

Transforms rows into columns (cross-tabulation).

# Basic pivot: rows to columns
pivot_result = df.groupBy("category").pivot("region").sum("revenue")

pivot_result.show()

# +----------+-----+-----+------+
# | category | East|North| South|
# +----------+-----+-----+------+
# |Electronics| 1880| 2000|  2150|
# |  Clothing|   45|   50|    55|
# +----------+-----+-----+------+

# Pivot with specific values (faster)
pivot_result = df.groupBy("category").pivot("region", ["North", "South", "East"]).sum("revenue")

# Pivot with multiple aggregations
pivot_multi = df.groupBy("category").pivot("region").agg(
    sum("revenue").alias("total"),
    avg("revenue").alias("average")
)

# Pivot with expression
pivot_expr = df.groupBy("category").pivot("region").agg(
    (sum("revenue") / count("*")).alias("avg_revenue")
)

Unpivot (Spark 3.4+)

Transforms columns back to rows.

# Unpivot: columns to rows
unpivot_result = pivot_result.unpivot(
    ["category"],  # ID columns
    ["East", "North", "South"],  # Value columns
    "region", "revenue"  # Output column names
)

unpivot_result.show()

# +----------+------+-------+
# | category |region|revenue|
# +----------+------+-------+
# |Electronics| East|   1880|
# |Electronics|North|   2000|
# |Electronics|South|   2150|
# |  Clothing| East|     45|
# |  Clothing|North|     50|
# |  Clothing|South|     55|
# +----------+------+-------+

Real-World Scenario: Google Multi-Dimensional Analytics

Problem Statement

Build a multi-dimensional analytics pipeline that computes revenue across multiple hierarchies, pivots for executive dashboards, and handles complex aggregation scenarios.

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("GoogleMultiDimAnalytics") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Read sales data
sales = spark.read.parquet("s3a://google-data/sales/")

# === 1. Multi-level Rollup Analysis ===
# Hierarchy: region β†’ category β†’ product
revenue_rollup = sales.rollup("region", "category", "product").agg(
    sum("revenue").alias("total_revenue"),
    sum("quantity").alias("total_quantity"),
    count("*").alias("transaction_count"),
    avg("revenue").alias("avg_revenue")
)

# Label aggregation levels
revenue_labeled = revenue_rollup \
    .withColumn(
        "analysis_level",
        case()
        .when(col("product").isNotNull(), "Product Level")
        .when(col("category").isNotNull(), "Category Level")
        .when(col("region").isNotNull(), "Region Level")
        .otherwise("Grand Total")
    )

# === 2. Cube Analysis for Ad-Hoc Reporting ===
# All possible dimension combinations
cube_analysis = sales.cube("region", "category", "month").agg(
    sum("revenue").alias("total_revenue"),
    approx_count_distinct("customer_id").alias("unique_customers")
)

# Add grouping labels
cube_labeled = cube_analysis \
    .withColumn(
        "dimensions",
        concat_ws(", ",
            when(grouping("region") == 0, col("region")),
            when(grouping("category") == 0, col("category")),
            when(grouping("month") == 0, col("month"))
        )
    ) \
    .withColumn(
        "aggregation_level",
        grouping_id("region", "category", "month")
    )

# === 3. Executive Dashboard Pivot ===
# Regional performance by category
regional_pivot = sales.groupBy("region").pivot("category").agg(
    sum("revenue").alias("total"),
    round(avg("revenue"), 2).alias("avg"),
    count("*").alias("count")
)

# Monthly trend by category
monthly_pivot = sales.groupBy("month").pivot("category").agg(
    sum("revenue")
)

# === 4. Cohort Analysis with Grouping Sets ===
cohort_analysis = spark.sql("""
    SELECT 
        DATE_FORMAT(first_purchase_date, 'yyyy-MM') as cohort_month,
        DATE_FORMAT(purchase_date, 'yyyy-MM') as purchase_month,
        COUNT(DISTINCT customer_id) as customers,
        SUM(revenue) as revenue
    FROM (
        SELECT 
            *,
            MIN(purchase_date) OVER (PARTITION BY customer_id) as first_purchase_date
        FROM sales
    )
    GROUP BY 
        DATE_FORMAT(first_purchase_date, 'yyyy-MM'),
        DATE_FORMAT(purchase_date, 'yyyy-MM')
    GROUPING SETS (
        (DATE_FORMAT(first_purchase_date, 'yyyy-MM'), DATE_FORMAT(purchase_date, 'yyyy-MM')),
        (DATE_FORMAT(first_purchase_date, 'yyyy-MM'))
    )
""")

# === 5. Running Aggregates with Window Functions ===
daily_window = Window.partitionBy("region", "category").orderBy("date")

daily_trends = sales \
    .groupBy("region", "category", "date") \
    .agg(sum("revenue").alias("daily_revenue")) \
    .withColumn("rolling_7d_avg", 
                avg("daily_revenue").over(daily_window.rowsBetween(-6, 0))) \
    .withColumn("cumulative_revenue",
                sum("daily_revenue").over(daily_window)) \
    .withColumn("day_over_day_change",
                col("daily_revenue") - lag("daily_revenue", 1).over(daily_window))

# Show results
revenue_labeled.orderBy("region", "category", "product").show(30, truncate=False)
cube_labeled.filter(col("total_revenue") > 0).show(30, truncate=False)
regional_pivot.show(truncate=False)

spark.stop()

Advanced Pivot Patterns

Dynamic Pivot

# Generate pivot columns dynamically
pivot_columns = df.select("region").distinct().collect()
pivot_col_list = [row.region for row in pivot_columns]

dynamic_pivot = df.groupBy("category").pivot("region", pivot_col_list).sum("revenue")

Pivot with Multiple Aggregations

# Multiple aggregations in pivot
pivot_multi = df.groupBy("category").pivot("region").agg(
    struct(
        sum("revenue").alias("total"),
        avg("revenue").alias("avg"),
        count("*").alias("count")
    )
)

# Access nested fields
pivot_multi.select(
    "category",
    col("North.total").alias("north_total"),
    col("North.avg").alias("north_avg")
)

Pivot Performance Optimization

# BAD: Pivot without specifying values (scans data twice)
df.groupBy("category").pivot("region").sum("revenue")

# GOOD: Specify values (scans data once)
df.groupBy("category").pivot("region", ["North", "South", "East"]).sum("revenue")

# For large datasets, cache before pivot
df.cache()
df.count()  # Force materialization
pivot_result = df.groupBy("category").pivot("region").sum("revenue")

Edge Cases

1. NULL Values in Pivot

# NULL keys create a NULL column in pivot
# Filter NULLs before pivot
df.filter(col("region").isNotNull()) \
  .groupBy("category").pivot("region").sum("revenue")

2. Empty Groups

# rollup/cube include empty groups
# Use coalesce or filter to handle NULLs
rollup_result.na.fill(0)  # Replace NULL revenue with 0
rollup_result.na.drop()   # Remove rows with NULL keys

3. Large Pivot Tables

# Many unique values in pivot column = many output columns
# This can cause OOM or wide tables

# Solution 1: Filter before pivot
df.filter(col("region").isin(["North", "South", "East"])) \
  .groupBy("category").pivot("region").sum("revenue")

# Solution 2: Use unpivot for downstream processing

Performance Analysis

OperationShufflePartitionsMemoryUse Case
groupBy().agg()YesIncreasesLowStandard aggregation
rollup()YesIncreasesMediumHierarchical subtotals
cube()YesIncreasesHighAll combinations
pivot()YesIncreasesHighCross-tabulation
grouping_sets()YesIncreasesMediumCustom combinations

πŸ’‘Performance Tip

For large datasets, always specify pivot values explicitly to avoid scanning the data twice. Use cache() before complex multi-step aggregations. Consider whether you need rollup (hierarchical) or cube (all combinations) β€” rollup is faster.


Best Practices

πŸ’‘Aggregation Best Practices

  • Use rollup() for known hierarchies, cube() for ad-hoc analysis
  • Always specify pivot values for better performance
  • Use grouping() to identify subtotal levels in rollup/cube results
  • Cache DataFrame before multiple aggregation passes
  • Handle NULLs in rollup/cube results (they indicate subtotals)
  • Use grouping_sets() SQL for custom aggregation combinations
  • Consider partitioning output by aggregation level

Summary

Complex aggregations like rollup, cube, and pivot enable multi-dimensional analysis in PySpark. Understanding when to use each β€” rollup for hierarchical subtotals, cube for all combinations, pivot for cross-tabulation β€” is essential for building executive dashboards and analytical reports at Google and Microsoft scale.

Advertisement