
Spark SQL: Predicate Pushdown, Column Pruning, Join Reorder

Difficulty: Expert | Companies: Databricks, Snowflake, Meta, Google, Amazon

ℹ️Interview Context

SQL optimization questions test understanding of Catalyst optimizer rules. Interviewers expect knowledge of how each optimization rule works and when it doesn't apply.

Question

Explain how Spark SQL's Catalyst optimizer applies predicate pushdown, column pruning, and join reordering. What are the limitations of each optimization? How does the cost-based optimizer (CBO) decide between different physical plans?


Detailed Answer

1. Catalyst Optimization Rules Overview

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("SQLOptimization") \
    .config("spark.sql.cbo.enabled", "true") \
    .config("spark.sql.statistics.histogram.enabled", "true") \
    .getOrCreate()

# Example query with multiple optimization opportunities:
result = spark.sql("""
    SELECT o.order_id, c.name, p.product_name, o.amount
    FROM orders o
    JOIN customers c ON o.customer_id = c.id
    JOIN products p ON o.product_id = p.id
    WHERE c.country = 'USA' 
      AND o.amount > 100
      AND p.category = 'Electronics'
    ORDER BY o.amount DESC
    LIMIT 100
""")

2. Predicate Pushdown

Predicate pushdown moves filter conditions as close to the data source as possible, reducing I/O.

# Without predicate pushdown:
# Scan ALL rows → Join → Filter → Sort → Limit

# With predicate pushdown:
# Scan (with filter) → Join → Sort → Limit
# Rows (or whole row groups) that cannot match are skipped at the source

# Catalyst rule: PushPredicateThroughJoin (part of PushDownPredicates in Spark 3.x)
# Pushes a predicate below a join when it references columns from only one side

# Example execution plan (illustrative and abridged; operator IDs and layout vary by Spark version):
result.explain(True)
# == Physical Plan ==
# TakeOrderedAndProject(limit=100, orderBy=[amount#45 DESC NULLS LAST], output=[order_id#42, name#51, product_name#62, amount#45])
# +- *(2) Project [order_id#42, name#51, product_name#62, amount#45]
#    +- *(2) BroadcastHashJoin [product_id#44], [id#60], Inner, BuildRight
#       :- *(2) BroadcastHashJoin [customer_id#43], [id#49], Inner, BuildRight
#       :  :- *(2) Filter ((amount#45 > 100) AND isnotnull(customer_id#43))
#       :  :  +- *(2) FileScan parquet [order_id, customer_id, product_id, amount]
#       :  :     PushedFilters: [IsNotNull(customer_id), GreaterThan(amount,100)]
#       :  :     ReadSchema: <struct with only needed columns>
#       :  +- BroadcastExchange ...
#       :     +- *(1) Filter (isnotnull(id#49) AND (country#53 = USA))
#       :        +- *(1) FileScan parquet [id, name, country]
#       :           PushedFilters: [IsNotNull(id), EqualTo(country,USA)]
#       +- BroadcastExchange ...
#          +- *(1) Filter (isnotnull(id#60) AND (category#64 = Electronics))
#             +- *(1) FileScan parquet [id, product_name, category]
#                PushedFilters: [IsNotNull(id), EqualTo(category,Electronics)]

# Predicate pushdown cost analysis (simplified model):
#
# Let:
# N = total rows in the table
# F = filter selectivity (fraction of rows passing the filter, 0 < F ≤ 1)
# C = I/O cost per row read
# filter_cost = CPU cost of evaluating the predicate on one row in Spark
#
# Without pushdown:
#   I/O cost = N × C  (read all rows)
#   Processing cost = N × filter_cost
#   Total = N × (C + filter_cost)
#
# With pushdown (best case: the source skips every non-matching row):
#   I/O cost = N × F × C  (only matching rows read)
#   Processing cost = N × F × filter_cost  (Spark re-checks only the rows it receives)
#   Total = N × F × (C + filter_cost)
#
# Savings = N × (1 - F) × (C + filter_cost)
# For F = 0.01 (1% selectivity): up to 99% of the I/O cost is saved.
# Parquet and ORC skip whole row groups or stripes using min/max statistics, so real
# savings depend on how well the data is clustered on the filtered column.

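# Limitations of predicate pushdown:
# 1. UDFs: Catalyst cannot see inside a Python UDF, so the predicate is never pushed to the scan
udf_filter = F.udf(lambda x: x is not None and x > 100, "boolean")
df = spark.table("orders").filter(udf_filter(F.col("amount")))
df.explain(True)  # UDF will NOT be pushed to scan; it runs after the rows are read

# 2. Non-deterministic functions
df = spark.table("orders").filter(F.rand() > 0.5)  # NOT pushed down

# 3. Expressions the source filter API cannot represent (arithmetic or functions on a column)
df = spark.table("orders").filter(F.col("amount") * 1.1 > 100)  # evaluated in Spark after the scan
# Note: current_timestamp() is replaced by a constant during optimization, so a
# comparison such as F.col("deadline") < F.current_timestamp() can still qualify for pushdown.

# 4. Complex types: predicates on array or map columns are generally not pushed to the source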
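
To make "only relevant rows are read from storage" more concrete, here is a toy model of how a columnar source can skip whole row groups using min/max statistics. `RowGroup` and `groups_to_read` are hypothetical names invented for this illustration; it is a sketch of the idea under simplified assumptions, not Parquet's or Spark's actual reader.

```python
from dataclasses import dataclass


@dataclass
class RowGroup:
    """Hypothetical stand-in for one row group and its footer statistics."""
    min_amount: float
    max_amount: float
    num_rows: int


def groups_to_read(groups, threshold):
    """Return the row groups that may contain rows with amount > threshold."""
    # A group can be skipped only if even its largest value fails the predicate.
    return [g for g in groups if g.max_amount > threshold]


groups = [
    RowGroup(min_amount=1.0, max_amount=80.0, num_rows=1000),     # skipped entirely
    RowGroup(min_amount=50.0, max_amount=150.0, num_rows=1000),   # read: some rows may match
    RowGroup(min_amount=120.0, max_amount=900.0, num_rows=1000),  # read: every row matches
]

kept = groups_to_read(groups, threshold=100)
rows_read = sum(g.num_rows for g in kept)
total_rows = sum(g.num_rows for g in groups)
print(f"read {rows_read} of {total_rows} rows")
```

The second group shows why Spark keeps a Filter operator above the scan even when the predicate appears in PushedFilters: the source may return row groups that contain non-matching rows.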

3. Column Pruning

Column pruning eliminates reading columns that are not needed for the query.

# Without column pruning:
# SELECT name FROM users → Read ALL columns, then project name

# With column pruning:
# SELECT name FROM users → Read only the name column

# Catalyst rule: ColumnPruning
# Applied during logical optimization, before physical planning

# Example:
df = spark.table("wide_table")  # 100 columns
result = df.select("col1", "col2")  # Only need 2 columns

result.explain(True)
# == Physical Plan == (illustrative and abridged)
# *(1) Project [col1#1, col2#2]
# +- *(1) FileScan parquet [col1#1, col2#2]  ← only 2 columns read
#    ReadSchema: <struct with 2 fields>

# Column pruning benefits:
# 1. Reduced I/O: read fewer bytes from storage
# 2. Reduced memory: less data in executor memory
# 3. Reduced network: less data transferred in shuffle

# Quantitative impact (assuming a columnar format and equally sized columns):
# Wide table: 100 columns, ~1 KB per column value
# Without pruning: 100 KB read per row
# With pruning (2 columns): 2 KB read per row
# I/O reduction: 98%

# Limitations of column pruning:
# 1. Star schema with many joins: fact-table columns may be needed
#    for joins even if they are not in the SELECT list

# 2. Columns used in expressions
df = spark.table("events").withColumn(
    "score", F.col("col_a") * F.col("col_b") + F.col("col_c")
).select("score")
# col_a, col_b, col_c must all be read to compute score

# 3. Columns in the WHERE clause
df = spark.table("events").filter(F.col("status") == "active").select("name")
# status must be read even though it is not in the output

# 4. Shuffle: join keys and every column needed downstream must be materialized
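
The limitations above all come down to one rule: a column must be read if anything in the query references it, not only if it appears in the output. The sketch below computes that set and the resulting I/O fraction. The helper names `required_columns` and `io_fraction`, and the assumption of about 1 KB per column value, are made up for illustration and do not correspond to any Spark API.

```python
def required_columns(select_cols, filter_cols=(), join_keys=()):
    """Columns the scan must read: outputs plus anything filters or joins reference."""
    return set(select_cols) | set(filter_cols) | set(join_keys)


def io_fraction(needed, all_columns, bytes_per_column):
    """Fraction of per-row bytes read, assuming a columnar file format."""
    total = sum(bytes_per_column[c] for c in all_columns)
    read = sum(bytes_per_column[c] for c in needed)
    return read / total


all_columns = [f"col{i}" for i in range(1, 101)]  # 100-column table
sizes = {c: 1024 for c in all_columns}            # assumed ~1 KB per value

# SELECT col1 FROM wide_table WHERE col2 = ...
needed = required_columns(select_cols=["col1"], filter_cols=["col2"])
fraction = io_fraction(needed, all_columns, sizes)
print(sorted(needed), f"{fraction:.0%} of bytes read")
```

Even though only `col1` is selected, `col2` is still read because the filter references it, which is the situation described in limitation 3.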

4. Join Reordering

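Catalyst can reorder inner joins to reduce intermediate result sizes.

# Original join order: A ⋈ B ⋈ C
# Reordered: A ⋈ C ⋈ B (if the intermediate result is smaller)

# Catalyst rules:
# - ReorderJoin (always on): heuristic that places joins with a join condition ahead of
#   condition-less joins to avoid cartesian products; it does not look at table sizes
# - CostBasedJoinReorder (needs CBO and statistics): dynamic programming over inner joins

# Example (row counts after filters are hypothetical):
# orders (1M rows) ⋈ customers (100K rows, 20K after country = 'USA')
#                  ⋈ products (10K rows, 500 after category = 'Electronics')

# Order as written:
# Step 1: orders ⋈ customers → ~200K intermediate rows
# Step 2: (200K intermediate) ⋈ products → ~10K result rows

# Reordered:
# Step 1: orders ⋈ products → ~50K intermediate rows (smaller)
# Step 2: (50K intermediate) ⋈ customers → ~10K result rows
# customers ⋈ products is not a candidate first step: the two tables share no join key,
# so joining them directly would be a cartesian product.

# Without CBO, Spark keeps the written join order apart from the ReorderJoin heuristic,
# so listing the most selective joins first still matters.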

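# Cost-based join reordering (available since Spark 2.2; all three flags are off by default):
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
spark.conf.set("spark.sql.statistics.histogram.enabled", "true")

# CBO uses table and column statistics to estimate join output size:
# |A ⋈ B| ≈ |A| × |B| / max(distinct_keys(A), distinct_keys(B))
# Candidate join orders are compared on estimated row count and size in bytes
# (weighted by spark.sql.cbo.joinReorder.card.weight, default 0.7).
#
# Rough complexity of the physical join strategies:
# Sort-merge join:     O(|A| × log(|A|) + |B| × log(|B|)), plus a shuffle of both sides
# Broadcast hash join: O(|A| + |B|), with no shuffle of the larger side
# Spark picks a broadcast join when the estimated size of one side is at most
# spark.sql.autoBroadcastJoinThreshold (default 10 MB).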
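
As a worked example of cardinality-based join ordering, the sketch below estimates join output with the common formula |A| × |B| / max(distinct keys) and brute-forces every left-deep order, rejecting orders that would need a cartesian product. The table sizes and distinct counts are hypothetical, and the brute-force search and "sum of intermediate rows" cost are simplifications chosen for readability; Spark's CostBasedJoinReorder uses dynamic programming and a cost that also accounts for size in bytes.

```python
from itertools import permutations

# Hypothetical row counts after filters have been applied.
rows = {"orders": 1_000_000, "customers": 20_000, "products": 500}

# Join edges: pair of tables -> distinct join-key counts on the two sides.
edges = {
    frozenset({"orders", "customers"}): (100_000, 20_000),
    frozenset({"orders", "products"}): (10_000, 500),
}


def join_rows(left_rows, right_rows, ndv_pair):
    """Estimated output rows of an equi-join."""
    return left_rows * right_rows / max(ndv_pair)


def plan_cost(order):
    """Sum of join output sizes for a left-deep order; None if a cross join is needed."""
    joined = {order[0]}
    current = rows[order[0]]
    cost = 0
    for table in order[1:]:
        # Find an edge connecting the new table to something already joined.
        edge = next((e for e in edges if table in e and (e - {table}) <= joined), None)
        if edge is None:
            return None
        current = join_rows(current, rows[table], edges[edge])
        cost += current
        joined.add(table)
    return cost


costs = {order: plan_cost(order) for order in permutations(rows)}
valid = {order: cost for order, cost in costs.items() if cost is not None}
best_order = min(valid, key=valid.get)
best_cost = valid[best_order]
print(best_order, best_cost)
```

Joining orders with the heavily filtered products table first keeps the intermediate result at about 50K rows instead of 200K, which is the kind of difference cost-based reordering is meant to find.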

5. Cost-Based Optimizer (CBO)

# CBO requires table statistics:
# 1. Row count and size in bytes
# 2. Column statistics (min, max, distinct count, null count)
# 3. Histograms (for skewed data distributions)

# Compute statistics:
spark.sql("ANALYZE TABLE orders COMPUTE STATISTICS")
spark.sql("ANALYZE TABLE orders COMPUTE STATISTICS FOR COLUMNS customer_id, amount")
# Histograms have no separate syntax: the FOR COLUMNS command collects them
# when spark.sql.statistics.histogram.enabled is true.

# Verify statistics:
spark.sql("DESCRIBE FORMATTED orders").show(100, truncate=False)
# Look for the "Statistics" row (size in bytes and row count)
spark.sql("DESCRIBE FORMATTED orders customer_id").show(truncate=False)
# Column-level statistics: min, max, num_nulls, distinct_count

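# CBO decision example:
# Query: SELECT * FROM orders o JOIN customers c ON o.customer_id = c.id
#        WHERE c.country = 'USA'
#
# Spark has no B-tree style indexes; statistics matter because they change
# the size estimates that drive join strategy and join order.
#
# Plan A: SortMergeJoin
#   Chosen when the estimated size of customers exceeds the broadcast threshold.
#   Without CBO the estimate comes from file size, and the filter does not reduce it.
#
# Plan B: BroadcastHashJoin
#   With column statistics, estimated rows after filter ≈ N / distinct(country).
#   Chosen when the filtered size fits under the broadcast threshold.
#
# Enable CBO features:
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.statistics.histogram.enabled", "true")
# Adaptive Query Execution is separate from CBO: it re-plans at runtime using actual shuffle sizes
spark.conf.set("spark.sql.adaptive.enabled", "true")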
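
The effect of statistics on plan choice can be sketched with a little arithmetic. The example below assumes a uniform distribution (an equality filter keeps 1/distinct of the rows) and compares the estimated size against the default 10 MB broadcast threshold. The function names and the table figures are hypothetical, and the real estimator also uses histograms, null counts, and per-column sizes.

```python
BROADCAST_THRESHOLD_BYTES = 10 * 1024 * 1024  # default spark.sql.autoBroadcastJoinThreshold


def size_after_equality_filter(row_count, distinct_values, avg_row_bytes):
    """Estimated bytes left after `col = constant`, assuming uniformly distributed values."""
    est_rows = row_count / distinct_values
    return est_rows * avg_row_bytes


def choose_join(est_bytes, threshold=BROADCAST_THRESHOLD_BYTES):
    """Pick a join strategy from the estimated size of the smaller side."""
    return "BroadcastHashJoin" if est_bytes <= threshold else "SortMergeJoin"


# Hypothetical customers table: 1M rows, 50 distinct countries, ~200 bytes per row.
ROWS, DISTINCT_COUNTRIES, ROW_BYTES = 1_000_000, 50, 200

unfiltered_bytes = ROWS * ROW_BYTES
filtered_bytes = size_after_equality_filter(ROWS, DISTINCT_COUNTRIES, ROW_BYTES)

without_stats = choose_join(unfiltered_bytes)  # filter ignored in the estimate
with_stats = choose_join(filtered_bytes)       # filter selectivity applied
print(without_stats, with_stats)
```

The same arithmetic explains the stale-statistics risk: if the distinct count or row count is out of date, the estimate can fall on the wrong side of the threshold.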

6. Advanced Optimization Rules

# Rule 1: Constant Folding
# Evaluates constant expressions once, at optimization time
df = spark.table("events").filter(F.lit(1) + F.lit(1) == F.lit(2))
# Becomes: filter(true) → filter removed entirely

# Rule 2: Null Propagation
# Simplifies expressions whose nullability is known from the schema
df = spark.table("events").filter(F.col("id").isNotNull())
# If the schema marks id as non-nullable, isNotNull(id) folds to true and the filter is removed

# Rule 3: Limit Pushdown
# Pushes LIMIT below operators such as UNION ALL and outer joins, and into sources that support it
df = spark.table("events").limit(100)
# Spark typically stops scanning partitions once 100 rows are collected;
# some DataSource V2 sources (for example JDBC) can also apply the limit at the source

# Rule 4: Subquery Rewriting
# Rewrites IN / EXISTS subqueries as semi joins
df = spark.sql("""
    SELECT * FROM orders 
    WHERE customer_id IN (SELECT id FROM customers WHERE country = 'USA')
""")
# Becomes: orders LEFT SEMI JOIN customers ON customer_id = id, with country = 'USA' filtered on customers

# Rule 5: Aggregate Pushdown
# Pushes aggregations into data sources that support it
df = spark.table("parquet_table").groupBy("category").count()
# For Parquet, only MIN/MAX/COUNT can be answered from footer statistics, and only when
# spark.sql.parquet.aggregatePushdown is enabled (off by default). A GROUP BY on a regular
# (non-partition) column like this one is generally still aggregated in Spark after the scan.
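
Constant folding is easy to demonstrate on a tiny expression tree. The sketch below uses nested tuples as a stand-in for Catalyst expressions; the node names (`lit`, `col`, `add`, `eq`, `gt`) and the `fold` function are invented for this illustration and are unrelated to Catalyst's actual classes.

```python
import operator

# Binary operators the toy optimizer knows how to evaluate.
OPS = {"add": operator.add, "eq": operator.eq, "gt": operator.gt}


def fold(expr):
    """Recursively replace operator nodes whose children are both literals."""
    kind = expr[0]
    if kind in ("lit", "col"):
        return expr  # leaves are already as simple as they can be
    left, right = fold(expr[1]), fold(expr[2])
    if left[0] == "lit" and right[0] == "lit":
        return ("lit", OPS[kind](left[1], right[1]))
    return (kind, left, right)


# (1 + 1) == 2 folds to a literal True, so a filter on it can be dropped.
always_true = fold(("eq", ("add", ("lit", 1), ("lit", 1)), ("lit", 2)))

# amount > (50 + 50) keeps the column reference but folds the right-hand side.
partial = fold(("gt", ("col", "amount"), ("add", ("lit", 50), ("lit", 50))))

print(always_true)
print(partial)
```

The second case matters for pushdown: once the right-hand side is a single literal, the predicate has the "column compared with constant" shape that data source filters can represent.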

7. Query Plan Analysis

# Analyzing query plans for optimization opportunities:
def analyze_query_plan(df, label=""):
    print(f"\n{'='*60}")
    print(f"Query Plan: {label}")
    print('='*60)
    
    # Prints the parsed, analyzed, and optimized logical plans plus the physical plan
    df.explain(extended=True)
    
    # Key items to look for:
    # 1. PushedFilters: predicates pushed to the scan
    # 2. ReadSchema: columns being read
    # 3. PartitionFilters: partition pruning applied
    # 4. PushedAggregates: aggregation pushdown
    # 5. BroadcastExchange: broadcast joins used
    # 6. SortMergeJoin vs BroadcastHashJoin: join strategy

# Compare two ways of writing the same query:
orders = spark.table("orders")
customers = spark.table("customers")
join_cond = orders["customer_id"] == customers["id"]

df_filter_after = orders.join(customers, join_cond).filter(F.col("amount") > 100)
df_filter_before = orders.filter(F.col("amount") > 100).join(customers, join_cond)

# Catalyst pushes the filter below the join in both cases,
# so the optimized and physical plans should be the same.
analyze_query_plan(df_filter_after, "Filter written after the join")
analyze_query_plan(df_filter_before, "Filter written before the join")
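
Reading long plans by eye is error-prone, so it can help to pull out the scan details programmatically. The sketch below extracts PushedFilters and ReadSchema entries from plan text with regular expressions. `scan_summary` and the sample plan string are hypothetical, the parsing assumes simple filters without nested brackets, and the exact text layout of explain output varies between Spark versions.

```python
import re


def scan_summary(plan_text):
    """Collect pushed filters and read schema for each scan found in plan text."""
    pushed = re.findall(r"PushedFilters: \[(.*?)\]", plan_text)
    schemas = re.findall(r"ReadSchema: (\S+)", plan_text)
    return [
        {
            # Filters are separated by ", "; arguments inside a filter use "," alone.
            "pushed_filters": p.split(", ") if p else [],
            "read_schema": s,
        }
        for p, s in zip(pushed, schemas)
    ]


# Hand-written sample in the style of a physical plan (not real Spark output).
sample_plan = """
+- FileScan parquet [order_id#42,amount#45] PushedFilters: [IsNotNull(amount), GreaterThan(amount,100)], ReadSchema: struct<order_id:bigint,amount:double>
+- FileScan parquet [id#49,name#51] PushedFilters: [], ReadSchema: struct<id:bigint,name:string>
"""

summary = scan_summary(sample_plan)
unfiltered_scans = [s["read_schema"] for s in summary if not s["pushed_filters"]]
print(summary)
print(unfiltered_scans)
```

Scans that end up in `unfiltered_scans` are the ones worth a second look: either no filter applies to that table, or a predicate (for example one wrapped in a UDF) failed to push down.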

Common Pitfall

Predicate pushdown doesn't work with Python UDFs because Catalyst treats them as opaque functions. Prefer built-in functions wherever possible. Pandas UDFs reduce serialization overhead but are just as opaque to Catalyst, so filters that use them are not pushed down either.

Interview Tip

When discussing CBO, mention that it requires accurate statistics. If statistics are stale or missing, CBO may make worse decisions than the rule-based optimizer. Always ANALYZE TABLE after large data changes.


Summary

| Optimization | What It Does | When It Works | Limitations |
| --- | --- | --- | --- |
| Predicate Pushdown | Reduces I/O by filtering early | Column filters on source | UDFs, non-deterministic functions |
| Column Pruning | Reads only needed columns | Column projections | Complex expressions, shuffle |
| Join Reorder | Minimizes intermediate results | Multi-way joins | Statistics dependency |
| CBO | Cost-based plan selection | With accurate statistics | Stale statistics can hurt |

These optimizations work together in Catalyst to produce highly efficient execution plans from declarative SQL queries.
