Spark Internals: Catalyst, Tungsten, DAG Scheduler
Difficulty: Expert | Companies: Databricks, Meta, Netflix, Uber, Airbnb
βΉοΈInterview Context
This question is commonly asked for Senior/Staff Data Engineer roles at FAANG+ companies. Expect 45-60 minutes on this topic alone, with deep follow-ups on each subsystem.
Question
Explain the complete lifecycle of a Spark job from code submission to task execution. Cover the Catalyst optimizer's four phases, Tungsten's memory management and code generation, and the DAG Scheduler's stage boundary decisions. How does Spark achieve near-native performance without JIT compilation of user code?
Detailed Answer
1. Spark Application Lifecycle
The journey from spark-submit to physical execution involves multiple translation layers:
User Code (DataFrame/SQL/RDD)
β Unresolved Logical Plan
β Resolved Logical Plan
β Optimized Logical Plan
β Physical Plan
β RDD DAG
β Stage/Task DAG
β Task Binary + Partition Info
β Executor JVM execution
2. Catalyst Optimizer β Four Phases
Catalyst is a rule-based + cost-based optimizer framework. Each phase transforms the query plan:
Phase 1: Analysis
The analyzer resolves names and columns against the Catalog (metastore):
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder \
.appName("CatalystDeepDive") \
.config("spark.sql.analyzer.enabled", "true") \
.getOrCreate()
# Unresolved plan β column "name" not yet validated
df = spark.table("users").select("name", "age")
# After analysis β catalog lookup confirms "users.name" and "users.age" exist
df.explain(True)
The analysis phase performs:
- Table resolution: Maps
"users"βCatalogTable(identifier=Database.default, table=users) - Column resolution: Maps
"name"βAttributeReference(name, StringType, nullable=true) - Type checking: Validates compatible types for operations
Phase 2: Optimization (Logical)
Rule-based transformations applied repeatedly until no more rules fire:
# Rules applied include:
# 1. Constant Folding: lit(2) + lit(3) β lit(5)
# 2.θ°θ―δΈζ¨ (Predicate Pushdown): filter before join
# 3. Column Pruning: remove unused columns early
# 4. Limit Pushdown: push LIMIT into scans
# Example: predicate pushdown
df = spark.table("orders").filter(F.col("amount") > 100) \
.join(spark.table("customers"), "customer_id")
# Catalyst pushes filter into the orders scan
df.explain(True)
# == Optimized Logical Plan ==
# *(2) Project [customer_id, amount, name]
# *(2) BroadcastHashJoin [customer_id], [customer_id], BuildLeft
# :- BroadcastExchange HashedRelationBroadcastMode
# : +- *(1) Filter (amount > 100) β pushed down
# : +- *(1) FileScan parquet [amount, customer_id]
# +- *(2) FileScan parquet [customer_id, name] β pruned columns
Phase 3: Physical Planning
Generates one or more physical plans and selects the best one using a cost model:
# Cost model considers:
# - Estimated row count (from table statistics)
# - Estimated data size per partition
# - Join selectivity estimates
# - Sort cost vs hash cost
# Spark may choose between:
# - SortMergeJoin (cost = O(n log n + m log m))
# - BroadcastHashJoin (cost = O(n Γ m) but m is tiny)
# - ShuffleHashJoin (cost = O(n Γ m) with hash table)
# The cost formula for SortMergeJoin:
# Cost_SMJ = (n Γ logβ(n)) + (m Γ logβ(m)) + (n + m)
# where n, m are estimated row counts of left/right
# For BroadcastHashJoin:
# Cost_BHJ = n + B
# where B = broadcast threshold (default 10MB)
Phase 4: Code Generation (Tungsten)
The physical plan is converted to an optimized RDD DAG with whole-stage code generation:
# Spark 3.x enables whole-stage code generation by default
# Multiple operators are fused into a single JVM method
# Without codegen (Spark 1.x):
# Scan β Filter β Project β Sort β HashAggregate
# Each operator is a separate virtual function call
# With whole-stage codegen (Spark 3.x):
# One generated Java method: doProcess(row)
# Inlined filter, projection, sort key extraction, aggregation
# To inspect generated code:
spark.conf.set("spark.sql.codegen.wholeStage", "true")
spark.conf.set("spark.sql.codegen.maxFields", "100")
# View generated code in Spark UI β SQL tab β expand stage
3. Tungsten Execution Engine
Tungsten provides three key optimizations:
3a. Off-Heap Memory Management
# Tungsten uses UnsafeRow β a compact binary format
# Instead of Java objects (which have 16-byte headers + padding)
# Object row size (approximate):
# Each field: 8 bytes (reference) + actual object
# String: 40 bytes minimum (StringObject header + char array ref)
# Total per row with 5 fields: ~200+ bytes
# UnsafeRow size:
# Header (8 bytes) + fixed-width fields (8 bytes each) + offset array
# Total per row with 5 fields: ~56 bytes
<div className="my-6 flex justify-center">
<svg viewBox="0 0 800 80" width="100%" style={{ maxWidth: 700 }} xmlns="http://www.w3.org/2000/svg">
<defs>
<linearGradient id="sa-mem-grad" x1="0" y1="0" x2="1" y2="1">
<stop offset="0%" stopColor="#6366f1"/>
<stop offset="100%" stopColor="#8b5cf6"/>
</linearGradient>
<filter id="sa-mem-shadow">
<feDropShadow dx="0" dy="2" stdDeviation="3" floodOpacity="0.15"/>
</filter>
</defs>
<rect x="0" y="10" width="160" height="50" rx="12" fill="url(#sa-mem-grad)" filter="url(#sa-mem-shadow)"/>
<text x="80" y="40" textAnchor="middle" fill="#fff" fontFamily="Inter,system-ui,sans-serif" fontSize="13" fontWeight="600">Header (8B)</text>
<rect x="170" y="10" width="160" height="50" rx="12" fill="#3b82f6" filter="url(#sa-mem-shadow)"/>
<text x="250" y="40" textAnchor="middle" fill="#fff" fontFamily="Inter,system-ui,sans-serif" fontSize="13" fontWeight="600">Field0 (8B)</text>
<rect x="340" y="10" width="160" height="50" rx="12" fill="#3b82f6" filter="url(#sa-mem-shadow)"/>
<text x="420" y="40" textAnchor="middle" fill="#fff" fontFamily="Inter,system-ui,sans-serif" fontSize="13" fontWeight="600">Field1 (8B)</text>
<rect x="510" y="10" width="160" height="50" rx="12" fill="#3b82f6" filter="url(#sa-mem-shadow)"/>
<text x="590" y="40" textAnchor="middle" fill="#fff" fontFamily="Inter,system-ui,sans-serif" fontSize="13" fontWeight="600">Field2 (8B)</text>
<text x="700" y="40" textAnchor="middle" fill="#6b7280" fontFamily="Inter,system-ui,sans-serif" fontSize="16" fontWeight="700">...</text>
</svg>
</div>
# Enable off-heap memory:
spark.conf.set("spark.memory.offHeap.enabled", "true")
spark.conf.set("spark.memory.offHeap.size", "4g")
# Verify memory usage via Spark UI β Storage/Execution tabs
3b. Whole-Stage Code Generation
# The code generator produces a single long method that:
# 1. Inlines all operator logic
# 2. Eliminates virtual function dispatch
# 3. Enables CPU cache-friendly sequential access
# 4. Allows JVM JIT to optimize the fused code
# Mathematical benefit:
# Original: O(n Γ k) where k = number of operators (virtual calls)
# Codegen: O(n) β single pass, no call overhead
# Benchmark results (typical):
# Without codegen: ~100M rows/sec/operator
# With codegen: ~500M rows/sec/operator (5x improvement)
3c. Cache-Aware Computation
# Tungsten Sort uses a specialized radix sort
# that is aware of CPU cache hierarchy:
#
# L1 Cache: 32KB (per core)
# L2 Cache: 256KB (per core)
# L3 Cache: 8MB (shared)
#
# Radix sort algorithm:
# 1. Determine number of passes: ceil(bits / pass_bits)
# 2. For 64-bit keys with 8-bit passes: 8 passes
# 3. Each pass creates histogram β prefix sum β scatter
#
# Time complexity: O(n Γ passes) β O(8n) = O(n)
# Compare with comparison sort: O(n Γ logβ(n))
# For n = 1 billion: 8B vs 30B operations β 3.75x faster
4. DAG Scheduler β Stage Boundaries
The DAG Scheduler divides the RDD graph into stages based on shuffle boundaries:
# Stage boundary rule:
# A new stage is created whenever a shuffle dependency exists
# (i.e., data must be redistributed across partitions)
# Example pipeline:
df = spark.read.parquet("events") # Stage 0: Scan
.filter(F.col("type") == "click") # Stage 0: Filter (same partition)
.groupBy("user_id") \ # β SHUFFLE BOUNDARY
.agg(F.count("*")) # Stage 1: Aggregation
.join(broadcast_df, "user_id") # β SHUFFLE (or broadcast)
.select("user_id", "count") # Stage 1 or 2
<div className="my-6 flex justify-center">
<svg viewBox="0 0 800 380" width="100%" style={{ maxWidth: 600 }} xmlns="http://www.w3.org/2000/svg">
<defs>
<linearGradient id="sa-stage0" x1="0" y1="0" x2="0" y2="1">
<stop offset="0%" stopColor="#3b82f6"/>
<stop offset="100%" stopColor="#2563eb"/>
</linearGradient>
<linearGradient id="sa-stage1" x1="0" y1="0" x2="0" y2="1">
<stop offset="0%" stopColor="#8b5cf6"/>
<stop offset="100%" stopColor="#7c3aed"/>
</linearGradient>
<linearGradient id="sa-stage2" x1="0" y1="0" x2="0" y2="1">
<stop offset="0%" stopColor="#10b981"/>
<stop offset="100%" stopColor="#059669"/>
</linearGradient>
<filter id="sa-stg-shadow">
<feDropShadow dx="0" dy="3" stdDeviation="4" floodOpacity="0.15"/>
</filter>
<marker id="sa-arrow" viewBox="0 0 10 7" refX="10" refY="3.5" markerWidth="8" markerHeight="6" orient="auto-start-reverse">
<polygon points="0 0, 10 3.5, 0 7" fill="#6b7280"/>
</marker>
</defs>
<rect x="50" y="10" width="700" height="90" rx="18" fill="url(#sa-stage0)" filter="url(#sa-stg-shadow)"/>
<text x="400" y="38" textAnchor="middle" fill="#fff" fontFamily="Inter,system-ui,sans-serif" fontSize="15" fontWeight="700">Stage 0</text>
<text x="400" y="62" textAnchor="middle" fill="rgba(255,255,255,0.9)" fontFamily="Inter,system-ui,sans-serif" fontSize="12">Scan β Filter β HashPartitioner</text>
<line x1="400" y1="100" x2="400" y2="140" stroke="#6b7280" strokeWidth="2" markerEnd="url(#sa-arrow)"/>
<text x="430" y="125" fill="#6b7280" fontFamily="Inter,system-ui,sans-serif" fontSize="11" fontWeight="500">Shuffle Write</text>
<rect x="50" y="145" width="700" height="110" rx="18" fill="url(#sa-stage1)" filter="url(#sa-stg-shadow)"/>
<text x="400" y="173" textAnchor="middle" fill="#fff" fontFamily="Inter,system-ui,sans-serif" fontSize="15" fontWeight="700">Stage 1</text>
<text x="400" y="197" textAnchor="middle" fill="rgba(255,255,255,0.9)" fontFamily="Inter,system-ui,sans-serif" fontSize="12">Shuffle Read β Partial Agg β Combine β Final Agg</text>
<line x1="400" y1="255" x2="400" y2="295" stroke="#6b7280" strokeWidth="2" markerEnd="url(#sa-arrow)"/>
<rect x="50" y="300" width="700" height="70" rx="18" fill="url(#sa-stage2)" filter="url(#sa-stg-shadow)"/>
<text x="400" y="328" textAnchor="middle" fill="#fff" fontFamily="Inter,system-ui,sans-serif" fontSize="15" fontWeight="700">Stage 2</text>
<text x="400" y="350" textAnchor="middle" fill="rgba(255,255,255,0.9)" fontFamily="Inter,system-ui,sans-serif" fontSize="12">BroadcastHashJoin β Project β Write</text>
</svg>
</div>
# The DAG Scheduler creates a Stage object for each stage:
# - MapStage: for map-side operations (scan, filter, project)
# - ShuffleMapStage: produces shuffle output
# - ResultStage: produces the final result
# Task distribution:
# - One Task per partition within a Stage
# - Tasks are serialized and sent to executors
# - TaskSetManager tracks task state (pending/running/done/failed)
# Stage submission logic:
# A stage can only submit when all parent stages are complete
# This creates a DAG of stage dependencies
5. Key Metrics and Diagnostics
# Enable detailed metrics collection
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.eventLog.enabled", "true")
# Monitor via Spark UI:
# - SQL tab: query plan visualization
# - Stages tab: task-level metrics (shuffle read/write, GC time)
# - Executors tab: memory usage, task distribution
# - Storage tab: cached RDD/DataFrame information
# Programmatic metrics access:
result = df.groupBy("category").count().collect()
# Spark automatically tracks:
# - recordsRead, bytesRead
# - shuffleRead, shuffleWrite
# - peakExecutionMemory
# - resultRows, resultSerializationTime
β οΈCommon Pitfall
A common mistake is assuming Catalyst optimizes across UDF boundaries. Catalyst cannot optimize inside Python UDFs β it treats them as opaque black boxes. Always prefer built-in functions or Pandas UDFs for optimization-friendly code.
π‘Interview Tip
When discussing Tungsten, mention that UnsafeRow is not truly "unsafe" β it's "unsafe" because it bypasses Java's memory safety guarantees for performance. This is a deliberate engineering trade-off, not a design flaw.
6. Spark 3.x Adaptive Query Execution (AQE)
# AQE dynamically re-optimizes queries at runtime:
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# AQE can:
# 1. Coalesce post-shuffle partitions (reduce small files)
# 2. Convert sort-merge join to broadcast join at runtime
# 3. Handle skewed joins by splitting large partitions
# 4. Re-optimize subqueries based on runtime statistics
# Before AQE (static plan):
# Stage 0: Scan (200 partitions) β Stage 1: Shuffle (200 partitions)
# Many partitions may be nearly empty β wasted scheduling overhead
# After AQE (dynamic):
# Stage 0: Scan (200 partitions) β Stage 1: Coalesced Shuffle (12 partitions)
# 16.7x reduction in scheduling overhead
βΉοΈPerformance Impact
AQE typically improves query performance by 10-40% on production workloads, with the biggest gains on queries with skewed data or many small partitions.
Summary
| Component | Purpose | Key Innovation |
|---|---|---|
| Catalyst | Query optimization | Four-phase rule-based + cost-based optimization |
| Tungsten | Execution engine | Off-heap memory, codegen, cache-aware algorithms |
| DAG Scheduler | Stage management | Shuffle-based stage boundaries, task serialization |
| AQE | Runtime optimization | Dynamic partition coalescing, skew handling |
The synergy between these components allows Spark to approach native performance while maintaining the flexibility of a general-purpose distributed computing framework.