Catalyst Optimizer
Difficulty: Senior Level | Companies: Databricks, Netflix, Uber, Apple, Airbnb
Understanding Catalyst's Pipeline
Catalyst turns SQL and DataFrame queries into executable code in four phases: Analysis, Logical Optimization, Physical Planning, and Code Generation.
The Four Phases
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder \
.appName("CatalystOptimizer") \
.config("spark.sql.adaptive.enabled", "true") \
.getOrCreate()
# Create a query that exercises Catalyst
df = spark.read.parquet("hdfs://data/orders") \
.filter(F.col("amount") > 100) \
.join(spark.read.parquet("hdfs://data/customers"), "customer_id") \
.groupBy("region") \
.agg(F.sum("amount").alias("total"))
# View all plan stages: "extended" prints, in order, the
# Parsed (unresolved) Logical Plan, Analyzed Logical Plan,
# Optimized Logical Plan, and Physical Plan
df.explain(mode="extended")
# Other explain modes show narrower views:
df.explain(mode="simple")     # physical plan only
df.explain(mode="formatted")  # physical plan outline plus per-node details
df.explain(mode="cost")       # optimized logical plan with statistics (when available)
df.explain(mode="codegen")    # generated Java code for whole-stage codegen
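Interview Insight: Catalyst is primarily rule-based. Cost-based optimization (CBO), driven by table and column statistics, arrived in Spark 2.2 and is still disabled by default. Spark 3.0 added Adaptive Query Execution (AQE), which re-optimizes the plan at runtime using shuffle statistics and is enabled by default since Spark 3.2.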
Analysis Phase
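The analyzer resolves table and column references against the catalog and checks types. In the DataFrame API this happens eagerly, so a misspelled column fails immediately with an AnalysisException.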
# Catalyst's analysis phase:
# 1. Resolves UnresolvedRelations to concrete tables
# 2. Looks up column references in the schema
# 3. Type checking and coercion
# 4. Resolves functions to their implementations
# Example: Column resolution
df = spark.read.parquet("hdfs://data/users")
result = df.select("name", "age") # Columns resolved here
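# Explicit casts become Cast expressions in the analyzed plan
casted = df.withColumn("age_str", F.col("age").cast("string"))
# Implicit type coercion: adding an integer column to a double literal
# makes the analyzer insert cast(age as double) automatically
coerced = df.withColumn("age_plus", F.col("age") + F.lit(1.5))
# Check resolution and coercion in the "Analyzed Logical Plan" section
coerced.explain(mode="extended")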
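The analysis steps above can be sketched as a toy analyzer in plain Python. This is an illustration, not Spark's API: the `CATALOG` dict, the type names, and the helper functions are all hypothetical. It resolves column names case-insensitively (Spark's default, since `spark.sql.caseSensitive` is false) and widens an int operand to double, the way the real analyzer inserts a `Cast`.

```python
# Toy model of Catalyst's analysis phase (illustrative, not Spark's API).
CATALOG = {  # hypothetical catalog: table name -> {column: type}
    "users": {"name": "string", "age": "int"},
}


class AnalysisError(Exception):
    pass


def resolve_column(table, column):
    """Resolve a column name case-insensitively, like Spark's default."""
    schema = CATALOG.get(table)
    if schema is None:
        raise AnalysisError(f"Table or view not found: {table}")
    for name, dtype in schema.items():
        if name.lower() == column.lower():
            return name, dtype
    raise AnalysisError(f"Column '{column}' cannot be resolved in {table}")


def analyze_add(table, column, literal):
    """Resolve `column + literal`, inserting a cast when the types differ."""
    name, dtype = resolve_column(table, column)
    lit_type = "double" if isinstance(literal, float) else "int"
    if dtype == lit_type:
        return f"({name} + {literal})", dtype
    if {dtype, lit_type} == {"int", "double"}:
        # Widen the int side to double, as Spark's type coercion does
        left = name if dtype == "double" else f"cast({name} as double)"
        right = literal if lit_type == "double" else float(literal)
        return f"({left} + {right})", "double"
    raise AnalysisError(f"Cannot add {dtype} and {lit_type}")


print(analyze_add("users", "AGE", 1.5))  # ('(cast(age as double) + 1.5)', 'double')
```

An unknown column such as `salary` raises `AnalysisError` before anything runs, which mirrors how the DataFrame API reports resolution failures at definition time.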
Logical Optimization Rules
Catalyst applies dozens of optimization rules to the logical plan.
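Catalyst groups these rules into batches and re-runs most batches until the plan stops changing (a fixed point) or an iteration cap is hit. A minimal sketch of that loop, assuming a toy plan made of hypothetical `Scan` and `Filter` classes (not Catalyst's): `combine_filters` is a simplified version of the CombineFilters rule that merges one stacked pair per node per pass, so a chain of three filters needs two passes to collapse and a third to confirm nothing changed. `max_iterations` plays the role of `spark.sql.optimizer.maxIterations`.

```python
# Toy fixed-point rule executor, loosely modeled on Catalyst's RuleExecutor.
from dataclasses import dataclass
from typing import Callable, List, Tuple, Union


@dataclass(frozen=True)
class Scan:
    table: str


@dataclass(frozen=True)
class Filter:
    condition: str
    child: "Plan"


Plan = Union[Scan, Filter]
Rule = Callable[[Plan], Plan]


def combine_filters(plan: Plan) -> Plan:
    """Merge each Filter with a Filter directly below it (one pair per pass)."""
    if isinstance(plan, Filter) and isinstance(plan.child, Filter):
        inner = plan.child
        merged = f"({inner.condition}) AND ({plan.condition})"
        return Filter(merged, combine_filters(inner.child))
    if isinstance(plan, Filter):
        return Filter(plan.condition, combine_filters(plan.child))
    return plan


def execute(plan: Plan, rules: List[Rule], max_iterations: int = 100) -> Tuple[Plan, int]:
    """Run all rules repeatedly until the plan stops changing (a fixed point)."""
    for iteration in range(1, max_iterations + 1):
        new_plan = plan
        for rule in rules:
            new_plan = rule(new_plan)
        if new_plan == plan:  # no rule changed anything: fixed point
            return plan, iteration
        plan = new_plan
    return plan, max_iterations  # gave up at the iteration cap


plan = Filter("amount > 100",
              Filter("region = 'EU'",
                     Filter("status = 'active'", Scan("orders"))))
optimized, passes = execute(plan, [combine_filters])
print(optimized.condition)  # (status = 'active') AND ((region = 'EU') AND (amount > 100))
print(passes)  # 3
```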
Predicate Pushdown
# Catalyst pushes filters as close to data source as possible
df = spark.read.parquet("hdfs://data/events")
# Both filters are combined and pushed down to the Parquet reader
result = df \
.filter(F.col("event_type") == "click") \
.filter(F.col("amount") > 100)
# Check that filters are pushed to the data source
result.explain(mode="formatted")
# Look for "PushedFilters" in the Parquet scan
# Catalyst also pushes past joins
orders = spark.read.parquet("hdfs://data/orders")
customers = spark.read.parquet("hdfs://data/customers")
result = orders \
.join(customers, "customer_id") \
.filter(F.col("order_date") > "2024-01-01")
# The order_date filter references only orders columns, so it is pushed below the join onto the orders scan
result.explain(mode="formatted")
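The pushdown decision itself is a small tree rewrite. In this plain-Python sketch (hypothetical classes, not Catalyst's), a `Filter` sitting on an inner `Join` moves to whichever side produces every column the condition references. Real Catalyst also splits conjunctions, treats outer joins more carefully, and infers extra filters from join constraints; the toy omits all of that.

```python
# Toy predicate pushdown through an inner join.
from dataclasses import dataclass
from typing import FrozenSet, Union


@dataclass(frozen=True)
class Scan:
    table: str
    columns: FrozenSet[str]


@dataclass(frozen=True)
class Join:  # inner equi-join
    left: "Plan"
    right: "Plan"
    key: str


@dataclass(frozen=True)
class Filter:
    condition: str
    references: FrozenSet[str]  # columns the condition reads
    child: "Plan"


Plan = Union[Scan, Join, Filter]


def output(plan: Plan) -> FrozenSet[str]:
    """Columns produced by a plan node."""
    if isinstance(plan, Scan):
        return plan.columns
    if isinstance(plan, Join):
        return output(plan.left) | output(plan.right)
    return output(plan.child)


def push_filter_through_join(plan: Plan) -> Plan:
    """Move Filter(Join) below the join when one side can evaluate it alone."""
    if isinstance(plan, Filter) and isinstance(plan.child, Join):
        join = plan.child
        if plan.references <= output(join.left):
            pushed = Filter(plan.condition, plan.references, join.left)
            return Join(pushed, join.right, join.key)
        if plan.references <= output(join.right):
            pushed = Filter(plan.condition, plan.references, join.right)
            return Join(join.left, pushed, join.key)
    return plan  # condition needs both sides: keep it above the join


orders = Scan("orders", frozenset({"order_id", "customer_id", "order_date", "amount"}))
customers = Scan("customers", frozenset({"customer_id", "region"}))
plan = Filter("order_date > '2024-01-01'", frozenset({"order_date"}),
              Join(orders, customers, "customer_id"))
optimized = push_filter_through_join(plan)
print(type(optimized).__name__)  # Join
```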
Column Pruning
# Catalyst eliminates unnecessary columns early
df = spark.read.parquet("hdfs://data/wide-table") # 100+ columns
# Only read the columns you need
result = df.select("col1", "col2", "col3") \
.filter(F.col("col1") > 100)
# Check that only needed columns are read
result.explain(mode="formatted")
# Look for column list in the scan node
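# Column pruning also applies to nested structures (nested schema pruning,
# spark.sql.optimizer.nestedSchemaPruning.enabled): only nested_struct.field1
# is read, while array_col[0] still reads the whole array column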
df = spark.read.parquet("hdfs://data/nested")
result = df.select(
F.col("nested_struct.field1"),
F.col("array_col")[0]
)
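Pruning boils down to computing which columns, and which struct fields, a query references. The sketch below models that in plain Python under simple assumptions: a schema is a dict mapping a column name to a type string, or to a dict of fields for a struct, and references are `struct.field` dotted paths. The function name and schema format are hypothetical. Indexing an array, as in `array_col[0]`, still needs the whole array column, so the toy treats it as a whole-column reference.

```python
# Toy column pruning: keep only the columns and struct fields a query references.
from typing import Dict, Iterable, Set, Union

ColumnType = Union[str, Dict[str, str]]  # type name, or struct field -> type


def prune_schema(schema: Dict[str, ColumnType], references: Iterable[str]) -> Dict[str, ColumnType]:
    """Return the read schema needed for references like "col" or "struct.field"."""
    whole: Set[str] = set()
    nested: Dict[str, Set[str]] = {}
    for ref in references:
        root, _, field = ref.partition(".")
        if field and isinstance(schema[root], dict):
            nested.setdefault(root, set()).add(field)
        else:
            whole.add(root)  # bare column reference: read the entire column
    pruned: Dict[str, ColumnType] = {}
    for root, col_type in schema.items():  # keep the table's column order
        if root in whole:
            pruned[root] = col_type
        elif root in nested:
            pruned[root] = {f: t for f, t in col_type.items() if f in nested[root]}
    return pruned


schema = {
    "id": "bigint",
    "payload": "string",
    "nested_struct": {"field1": "int", "field2": "string", "field3": "double"},
    "array_col": "array<int>",
}
# select(nested_struct.field1, array_col[0]) references these paths
print(prune_schema(schema, ["nested_struct.field1", "array_col"]))
# {'nested_struct': {'field1': 'int'}, 'array_col': 'array<int>'}
```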
Constant Folding and Strength Reduction
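# Catalyst evaluates literal-only expressions once, at planning time
df = spark.read.parquet("hdfs://data/sales")
result = df.withColumn("tax", F.col("amount") * 0.08) \
.withColumn("total", F.col("amount") + F.col("tax")) \
.filter(F.col("total") > 100)
# CollapseProject inlines "tax" into "total", and the filter is pushed
# below the projection by substituting the alias, giving roughly:
#   (amount + (amount * 0.08)) > 100
# Catalyst does NOT rearrange arithmetic, so it never rewrites this to
# amount > 92.59. ConstantFolding only evaluates subexpressions made
# entirely of literals, e.g. F.lit(1) + F.lit(0.08) becomes 1.08
result.explain(mode="extended")
# Strength reduction: converts expensive operations to cheaper ones,
# e.g. LikeSimplification turns LIKE 'abc%' into a StartsWith check
result = df.filter(F.col("product_name").like("abc%"))  # hypothetical column
# Do not rely on year(date) == 2024 becoming a date-range filter: a function
# on the column blocks Parquet filter pushdown, so write the range directly
result = df.filter((F.col("date") >= "2024-01-01") & (F.col("date") < "2025-01-01"))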
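To see exactly what constant folding can and cannot do, here is a toy folder over a tiny expression tree (hypothetical `Lit`, `Col`, and `BinOp` classes, not Catalyst's). Like Catalyst's ConstantFolding rule, it evaluates a subexpression only when all of its inputs are literals, so `amount * (1 + 0.08)` folds to `amount * 1.08`, while `amount + amount * 0.08` is left untouched.

```python
# Toy constant folding over a tiny expression tree.
import operator
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class Lit:
    value: float


@dataclass(frozen=True)
class Col:
    name: str


@dataclass(frozen=True)
class BinOp:
    op: str  # "+", "*", or ">"
    left: "Expr"
    right: "Expr"


Expr = Union[Lit, Col, BinOp]
OPS = {"+": operator.add, "*": operator.mul, ">": operator.gt}


def fold(expr: Expr) -> Expr:
    """Bottom-up: replace any operator whose inputs are all literals by its value."""
    if isinstance(expr, BinOp):
        left, right = fold(expr.left), fold(expr.right)
        if isinstance(left, Lit) and isinstance(right, Lit):
            return Lit(OPS[expr.op](left.value, right.value))
        return BinOp(expr.op, left, right)
    return expr  # literals and column references are already folded


def show(expr: Expr) -> str:
    if isinstance(expr, Lit):
        return repr(expr.value)
    if isinstance(expr, Col):
        return expr.name
    return f"({show(expr.left)} {expr.op} {show(expr.right)})"


amount = Col("amount")
# amount * (1 + 0.08) > 100: the literal-only part folds
e1 = BinOp(">", BinOp("*", amount, BinOp("+", Lit(1), Lit(0.08))), Lit(100))
print(show(fold(e1)))  # ((amount * 1.08) > 100)
# amount + amount * 0.08 > 100: no literal-only subexpression, no change
e2 = BinOp(">", BinOp("+", amount, BinOp("*", amount, Lit(0.08))), Lit(100))
print(show(fold(e2)))  # ((amount + (amount * 0.08)) > 100)
```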
Key Rule: Write filter conditions that Catalyst can see through. Prefer direct column comparisons and built-in functions over UDFs: a UDF is opaque to Catalyst, so it cannot be folded, simplified, or pushed down to the data source.
Physical Planning
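The planner converts the optimized logical plan into physical operators. Its strategies can yield several candidate plans, but Spark currently takes the first one; join strategies are chosen by heuristics (hints, size estimates, configuration) rather than by costing whole plans, and AQE can revise the choice at runtime.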
# Catalyst considers multiple join strategies
orders = spark.read.parquet("hdfs://data/orders")
customers = spark.read.parquet("hdfs://data/customers")
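# Physical join strategies (simplified selection rules):
# 1. BroadcastHashJoin: equi-join where one side is below
#    spark.sql.autoBroadcastJoinThreshold (10 MB by default) or has a broadcast hint
# 2. ShuffleHashJoin: equi-join with spark.sql.join.preferSortMergeJoin=false
#    (or a SHUFFLE_HASH hint) and one side much smaller than the other
# 3. SortMergeJoin: the default equi-join strategy for large tables
# 4. BroadcastNestedLoopJoin / CartesianProduct: non-equi joins or no join key
# Check which strategy was chosen in the physical plan
result = orders.join(customers, "customer_id")
result.explain(mode="formatted")
# Request a broadcast hash join with a hint
from pyspark.sql.functions import broadcast
result = orders.join(broadcast(customers), "customer_id")
result.explain(mode="formatted")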
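The selection order can be summarized as a small decision function. This is a simplified sketch, not Spark's JoinSelection code: it takes estimated sizes in bytes and ignores join type, most hints, and AQE. The function and parameter names are hypothetical, and the 3x ratio standing in for the "much smaller" check is an assumption for illustration. The 10 MB default mirrors `spark.sql.autoBroadcastJoinThreshold`, where -1 disables size-based broadcasting.

```python
# Simplified sketch of physical join selection (not Spark's JoinSelection code).
MB = 1024 * 1024
GB = 1024 * MB
DEFAULT_BROADCAST_THRESHOLD = 10 * MB  # default of spark.sql.autoBroadcastJoinThreshold


def choose_join(left_bytes: int, right_bytes: int, *, equi_join: bool = True,
                broadcast_hint: bool = False,
                threshold: int = DEFAULT_BROADCAST_THRESHOLD,
                prefer_sort_merge: bool = True) -> str:
    """Pick a join strategy from estimated input sizes in bytes."""
    smaller, larger = sorted((left_bytes, right_bytes))
    # threshold = -1 disables size-based broadcasting, as in Spark
    can_broadcast = broadcast_hint or 0 <= smaller <= threshold
    if not equi_join:
        return "BroadcastNestedLoopJoin" if can_broadcast else "CartesianProduct"
    if can_broadcast:
        return "BroadcastHashJoin"
    # Assumed 3x ratio stands in for Spark's "much smaller" check
    if not prefer_sort_merge and smaller * 3 <= larger:
        return "ShuffleHashJoin"
    return "SortMergeJoin"


print(choose_join(50 * GB, 5 * MB))  # BroadcastHashJoin
print(choose_join(50 * GB, 2 * GB))  # SortMergeJoin
```

Setting `threshold=-1` in the sketch reproduces the common tuning step of disabling automatic broadcasts, after which only an explicit hint can force one.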
Cost-Based Optimization (Spark 2.2+)
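# Enable CBO (off by default) for statistics-driven plan selection
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
spark.conf.set("spark.sql.statistics.histogram.enabled", "true")
# Collect statistics; ANALYZE TABLE targets catalog tables (e.g. created
# with saveAsTable), not DataFrames read directly from a path
spark.sql("ANALYZE TABLE orders COMPUTE STATISTICS")
spark.sql("ANALYZE TABLE orders COMPUTE STATISTICS FOR COLUMNS customer_id, amount")
spark.sql("ANALYZE TABLE customers COMPUTE STATISTICS FOR COLUMNS customer_id")
# Read through the catalog so the optimizer can use the stored statistics
orders = spark.table("orders")
customers = spark.table("customers")
result = orders.join(customers, "customer_id")
result.explain(mode="cost")  # plan annotated with row counts and sizes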
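With column statistics in hand, CBO estimates how many rows each operator produces. The sketch below shows simplified textbook versions of two such estimates: a range predicate's selectivity assuming values are uniform between the column's min and max (roughly what an optimizer falls back to without histograms), and the classic inner equi-join estimate |L| * |R| / max(ndv_L, ndv_R). The statistics are made-up example values, not measurements, and the toy ignores how filters change distinct counts.

```python
# Toy cardinality estimation in the style of a cost-based optimizer.
def gt_selectivity(value: float, col_min: float, col_max: float) -> float:
    """Estimated fraction of rows with column > value, assuming uniform values."""
    if value < col_min:
        return 1.0
    if value >= col_max:
        return 0.0
    return (col_max - value) / (col_max - col_min)


def inner_join_rows(left_rows: float, right_rows: float, left_ndv: int, right_ndv: int) -> float:
    """Classic equi-join estimate: |L| * |R| / max(distinct keys on each side)."""
    return left_rows * right_rows / max(left_ndv, right_ndv)


# Hypothetical statistics, as ANALYZE TABLE ... FOR COLUMNS might report them
orders_rows, amount_min, amount_max = 1_000_000, 0.0, 500.0
customers_rows, customer_ndv = 50_000, 50_000

filtered = orders_rows * gt_selectivity(100.0, amount_min, amount_max)
joined = inner_join_rows(filtered, customers_rows, customer_ndv, customer_ndv)
print(filtered, joined)  # 800000.0 800000.0
```

Estimates like these drive CBO's join reordering: the cheaper intermediate results are joined first.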
Whole-Stage Code Generation
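Since Spark 2.0, whole-stage code generation (part of Project Tungsten) fuses a chain of operators into a single generated Java function, compiled at runtime with Janino. This removes per-row virtual calls between operators and keeps intermediate values in local variables.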
# Whole-stage code generation merges multiple operators
df = spark.read.parquet("hdfs://data/events")
# This query benefits from code generation
result = df \
.filter(F.col("amount") > 100) \
.withColumn("tax", F.col("amount") * 0.08) \
.groupBy("category") \
.agg(F.sum("amount"), F.count("*"))
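# Check code generation in physical plan
result.explain(mode="formatted")
# Operators fused into a codegen stage are marked with "*" in the plan
# tree and "[codegen id : N]" in the node details
# Control code generation
spark.conf.set("spark.sql.codegen.wholeStage", "true")  # on by default
# Deactivate whole-stage codegen above this many fields, nested included
spark.conf.set("spark.sql.codegen.maxFields", "100")
# Fall back for a subtree whose generated method exceeds this bytecode size
spark.conf.set("spark.sql.codegen.hugeMethodLimit", "65535")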
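Warning: Python UDFs run in separate Python worker processes, so the codegen stage is split around a BatchEvalPython (or ArrowEvalPython, for pandas UDFs) operator. Very wide or complex plans can also exceed limits such as spark.sql.codegen.maxFields and fall back to non-fused execution.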
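The difference between operator-at-a-time execution and a fused stage can be sketched in plain Python. Both functions below compute the filter-then-aggregate from the whole-stage codegen query above on a hypothetical list of `(category, amount)` rows. The first chains one generator per operator, as in the classic Volcano iterator model; the second is a single loop with the filter inlined, which is conceptually what whole-stage codegen emits. The real speedup comes from Spark generating and compiling Java, which this toy does not attempt.

```python
# Toy comparison: Volcano-style iterators versus one fused loop.
from typing import Dict, Iterable, Iterator, List, Tuple

Row = Tuple[str, float]                # (category, amount)
Totals = Dict[str, Tuple[float, int]]  # category -> (sum(amount), count(*))


def scan(rows: List[Row]) -> Iterator[Row]:
    yield from rows


def filter_op(child: Iterable[Row], min_amount: float) -> Iterator[Row]:
    for row in child:
        if row[1] > min_amount:
            yield row


def aggregate(child: Iterable[Row]) -> Totals:
    totals: Totals = {}
    for category, amount in child:
        total, count = totals.get(category, (0.0, 0))
        totals[category] = (total + amount, count + 1)
    return totals


def volcano(rows: List[Row], min_amount: float) -> Totals:
    """One iterator per operator: every row crosses each operator boundary."""
    return aggregate(filter_op(scan(rows), min_amount))


def fused(rows: List[Row], min_amount: float) -> Totals:
    """One loop with scan, filter, and aggregate inlined, as codegen would emit."""
    totals: Totals = {}
    for category, amount in rows:
        if amount > min_amount:
            total, count = totals.get(category, (0.0, 0))
            totals[category] = (total + amount, count + 1)
    return totals


rows = [("books", 150.0), ("games", 80.0), ("books", 200.0), ("games", 120.0)]
print(volcano(rows, 100.0) == fused(rows, 100.0))  # True
```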
Custom Catalyst Rules (Advanced)
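# You can extend Catalyst with custom rules (Scala/Java only)
# This is typically done in library development
# Example: a custom optimizer rule and the extension that registers it (Scala)
# import org.apache.spark.sql.SparkSessionExtensions
# import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
# import org.apache.spark.sql.catalyst.rules.Rule
#
# object MyCustomRule extends Rule[LogicalPlan] {
#   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
#     case Filter(condition, child) =>
#       optimizeFilter(condition, child)  // hypothetical helper: your logic here
#   }
# }
#
# class MyExtensions extends (SparkSessionExtensions => Unit) {
#   def apply(extensions: SparkSessionExtensions): Unit =
#     extensions.injectOptimizerRule(_ => MyCustomRule)
# }
# From PySpark, register the compiled extensions class (not the rule itself).
# Its JAR must be on the classpath, and spark.sql.extensions is a static
# config, so it only takes effect when the session is first created.
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("CustomCatalyst") \
.config("spark.sql.extensions", "com.example.MyExtensions") \
.getOrCreate()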
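The Scala `plan transform { case ... }` pattern can be mirrored in plain Python to show what a rule does to the tree. In this sketch (hypothetical classes and helpers, not a PySpark API), `transform_down` applies a rule to a node and then recurses into the children of the result, which is how Catalyst's `transform` (an alias for `transformDown`) behaves; the rule returns None where a Scala partial function would not match. `remove_true_filters` is a hypothetical custom rule in the spirit of Spark's built-in PruneFilters.

```python
# Toy analog of Catalyst's plan.transform with a custom rule.
from dataclasses import dataclass, replace
from typing import Callable, Optional, Union


@dataclass(frozen=True)
class Scan:
    table: str


@dataclass(frozen=True)
class Filter:
    condition: str
    child: "Plan"


Plan = Union[Scan, Filter]


def transform_down(plan: Plan, rule: Callable[[Plan], Optional[Plan]]) -> Plan:
    """Apply `rule` to a node, then recurse into the (possibly new) node's children."""
    rewritten = rule(plan)
    if rewritten is None:  # rule did not match this node
        rewritten = plan
    if isinstance(rewritten, Filter):
        return replace(rewritten, child=transform_down(rewritten.child, rule))
    return rewritten


def remove_true_filters(node: Plan) -> Optional[Plan]:
    """Hypothetical custom rule: drop filters whose condition is literally true."""
    if isinstance(node, Filter) and node.condition == "true":
        return node.child
    return None


plan = Filter("amount > 100", Filter("true", Scan("orders")))
print(transform_down(plan, remove_true_filters))
# Filter(condition='amount > 100', child=Scan(table='orders'))
```

As in Catalyst, a single top-down pass does not revisit a node the rule just produced; stacked rewrites are left for the next iteration of the optimizer's fixed-point loop.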
Debugging Catalyst Optimizations
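# Detailed plan analysis. To log each rule that changes the plan
# (Spark 3.1+), set this before building the query:
spark.conf.set("spark.sql.planChangeLog.level", "WARN")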
df = spark.read.parquet("hdfs://data/complex-query")
result = df \
.join(spark.read.parquet("hdfs://data/dim1"), "key1") \
.join(spark.read.parquet("hdfs://data/dim2"), "key2") \
.filter(F.col("status") == "active") \
.groupBy("category") \
.agg(F.sum("amount"))
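# Physical plan only
result.explain(mode="simple")
# Parsed, analyzed, and optimized logical plans plus the physical plan
result.explain(mode="extended")
# Physical plan outline with per-node details
result.explain(mode="formatted")
# Optimized logical plan annotated with statistics (when available)
result.explain(mode="cost")
# Generated Java code for each whole-stage codegen subtree
result.explain(mode="codegen")
# Inspect the optimized logical plan through the JVM QueryExecution
# (_jdf is a private PySpark attribute and may change between versions)
plan = result._jdf.queryExecution().optimizedPlan()
print(plan.toString())  # optimized logical plan as text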
Key Takeaway: Catalyst's optimization pipeline transforms your query through analysis, logical optimization, physical planning, and code generation. Understanding this pipeline helps you write queries that Catalyst can optimize effectively.
Follow-Up Questions
- How does Catalyst handle UDFs in the optimization pipeline?
- Explain the difference between logical plan and physical plan. When is each created?
- What are the limitations of Catalyst's cost-based optimization in Spark 3.x?
- How does predicate pushdown interact with partitioned tables?
- Describe how Catalyst optimizes window functions.