PySpark SQL Engine
Spark SQL Engine
The Spark SQL Engine is the computational backend that translates SQL queries and DataFrame operations into optimized RDD computations. It consists of the Catalyst optimizer (logical/physical plan optimization) and the Tungsten execution engine (memory management and code generation).
Whole-Stage Code Generation
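Whole-stage code generation (part of Project Tungsten) fuses a chain of operators into a single generated Java function, which is compiled to JVM bytecode at runtime. This eliminates virtual function calls between operators and keeps intermediate values in CPU registers rather than in memory. The resulting tight loops can be optimized further by the JIT compiler, for example with SIMD instructions. This can improve query performance by 2x to 10x.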
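Here is a toy Python model that makes the contrast concrete. It is not Spark code. The `Scan`, `Filter`, and `Project` classes are hypothetical stand-ins for the iterator-per-operator ("Volcano") style. `fused_sum` shows the single-loop shape that whole-stage codegen aims for in a scan → filter → project → sum pipeline. Python will not reproduce the JVM speedup; the point is the structure.

```python
from typing import Callable, Iterable, Iterator

class Scan:
    """Leaf operator: yields rows from an in-memory list."""
    def __init__(self, rows: Iterable[int]) -> None:
        self.rows = rows

    def __iter__(self) -> Iterator[int]:
        yield from self.rows

class Filter:
    """Keeps rows for which the predicate is true."""
    def __init__(self, child: Iterable[int], predicate: Callable[[int], bool]) -> None:
        self.child = child
        self.predicate = predicate

    def __iter__(self) -> Iterator[int]:
        for row in self.child:
            if self.predicate(row):
                yield row

class Project:
    """Applies an expression to every row."""
    def __init__(self, child: Iterable[int], expr: Callable[[int], int]) -> None:
        self.child = child
        self.expr = expr

    def __iter__(self) -> Iterator[int]:
        for row in self.child:
            yield self.expr(row)

def volcano_sum(data: list[int]) -> int:
    # Iterator model: every row passes through a chain of generic per-operator calls.
    plan = Project(Filter(Scan(data), lambda x: x > 30), lambda x: x * 2)
    return sum(plan)

def fused_sum(data: list[int]) -> int:
    # Shape of whole-stage generated code: one loop, operators inlined, state in locals.
    total = 0
    for x in data:
        if x > 30:          # Filter
            total += x * 2  # Project + aggregate
    return total

ages = [30, 25, 35, 28, 32, 29, 31]
print(volcano_sum(ages), fused_sum(ages))
```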
Adaptive Query Execution (AQE)
Adaptive Query Execution is a Spark 3.0+ feature that re-optimizes query plans at runtime, using statistics collected from completed shuffle stages. It can coalesce shuffle partitions, convert sort-merge joins to broadcast joins, and split skewed partitions in sort-merge joins.
Physical Plan
A Physical Plan is the concrete execution strategy chosen by Catalyst, specifying exactly how each operation will be executed (e.g., which join algorithm, which scan method, which aggregation strategy).
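A physical plan's cost can be modeled as the sum of its operator costs:
$$C_{\text{total}} = C_{\text{scan}} + C_{\text{filter}} + C_{\text{project}} + C_{\text{join}} + C_{\text{agg}}$$
Here,
- $C_{\text{scan}}$ = I/O cost of reading source data
- $C_{\text{filter}}$ = CPU cost of applying filter predicates
- $C_{\text{project}}$ = CPU cost of column projection
- $C_{\text{join}}$ = Shuffle + compute cost of join operations
- $C_{\text{agg}}$ = Shuffle + compute cost of aggregations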
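As a minimal sketch, the additive model can rank candidate plans. The per-operator numbers below are made-up placeholders, and `PlanCost` and `cheapest` are hypothetical names. This is not Spark's internal cost function, which works from table and column statistics.

```python
from dataclasses import dataclass

@dataclass
class PlanCost:
    """Per-operator cost estimates in arbitrary units (hypothetical values)."""
    scan: float
    filter: float
    project: float
    join: float
    agg: float

    def total(self) -> float:
        # C_total = C_scan + C_filter + C_project + C_join + C_agg
        return self.scan + self.filter + self.project + self.join + self.agg

def cheapest(plans: dict[str, PlanCost]) -> str:
    """Name of the candidate plan with the lowest total cost."""
    return min(plans, key=lambda name: plans[name].total())

plans = {
    # Filter applied after the join: every row is shuffled for the join.
    "filter_after_join": PlanCost(scan=100, filter=20, project=5, join=400, agg=50),
    # Filter pushed below the join: fewer rows reach the join's shuffle.
    "filter_before_join": PlanCost(scan=100, filter=20, project=5, join=120, agg=50),
}
print(cheapest(plans))
```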
Tungsten Memory Format
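A Tungsten binary row is the concatenation (∥) of four regions:
$$\text{Row} = B_{\text{null}} \,\|\, V_{\text{fixed}} \,\|\, O_{\text{var}} \,\|\, D_{\text{var}}$$
Here,
- $B_{\text{null}}$ = Bit array marking NULL values (1 bit per column, padded to 8-byte words)
- $V_{\text{fixed}}$ = Fixed-width values stored sequentially (8 bytes each)
- $O_{\text{var}}$ = Offset pointers to variable-length data
- $D_{\text{var}}$ = Variable-width values (strings, arrays, maps)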
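This layout translates into a size estimate. The sketch assumes the rules of Spark's UnsafeRow:

- The null bitmap is rounded up to whole 8-byte words.
- Each field gets one 8-byte slot. For a variable-width field, that slot packs the offset and length, so the offset pointers live inside the fixed-width region.
- Variable-width bytes are padded to an 8-byte boundary.

The function name is hypothetical.

```python
def unsafe_row_size(num_fields: int, var_lengths: list[int]) -> int:
    """Estimate the byte size of an UnsafeRow-style binary row (hypothetical helper).

    num_fields  -- total number of columns in the row
    var_lengths -- byte length of each variable-width value (e.g. UTF-8 strings)
    """
    null_bitmap = ((num_fields + 63) // 64) * 8   # 1 bit per field, whole 8-byte words
    fixed_region = 8 * num_fields                 # one 8-byte slot per field
    # Variable-width data is word-aligned: round each value up to a multiple of 8.
    var_region = sum(((n + 7) // 8) * 8 for n in var_lengths)
    return null_bitmap + fixed_region + var_region

# Row (id: int, salary: long, name: string "Alice"): 8 + 24 + 8 bytes
print(unsafe_row_size(3, [len("Alice".encode("utf-8"))]))
```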
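Join Selectivity
Assuming uniformly distributed join keys:
$$\text{sel}_{\bowtie} = \frac{|R \bowtie S|}{|R| \times |S|} \approx \frac{1}{|U|}, \qquad |R \bowtie S| \approx \frac{|R| \times |S|}{|U|}$$
Here,
- $\text{sel}_{\bowtie}$ = Selectivity of the join (fraction of cross product)
- $|R|$ = Number of rows in left relation
- $|S|$ = Number of rows in right relation
- $|U|$ = Number of distinct join key values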
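Here is a worked example of the estimate, assuming uniformly distributed join keys. The helper names are hypothetical. Spark's cost-based optimizer derives its own estimates from statistics collected with `ANALYZE TABLE`.

```python
def join_selectivity(distinct_keys: int) -> float:
    """Fraction of the cross product |R| x |S| expected to survive an equi-join."""
    return 1.0 / distinct_keys

def estimated_join_rows(r_rows: int, s_rows: int, distinct_keys: int) -> float:
    """|R join S| ~= |R| * |S| / |U| under the uniform-key assumption."""
    return r_rows * s_rows / distinct_keys

# 7 rows joined with 3 rows on a key with 3 distinct values
print(join_selectivity(3), estimated_join_rows(7, 3, 3))
```

When every left row matches exactly one right row (a foreign-key join), the estimate equals the true join size.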
AQE Partition Coalescing
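$$P_{\text{final}} = \max\left(\left\lceil \frac{S_{\text{shuffle}}}{S_{\text{target}}} \right\rceil,\; P_{\min}\right)$$
Here,
- $P_{\text{final}}$ = Final number of partitions after coalescing
- $S_{\text{shuffle}}$ = Total shuffle output size in bytes
- $S_{\text{target}}$ = Target partition size (default 64MB, set by spark.sql.adaptive.advisoryPartitionSizeInBytes)
- $P_{\min}$ = Minimum number of partitions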
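Here is a worked example of the formula. It is a simplified model: Spark's coalescing logic packs adjacent shuffle partitions greedily toward the advisory size (`spark.sql.adaptive.advisoryPartitionSizeInBytes`), so real counts can differ slightly. The function name is hypothetical.

```python
import math

MB = 1024 * 1024

def coalesced_partitions(shuffle_bytes: int, target_bytes: int = 64 * MB,
                         min_partitions: int = 1) -> int:
    # P_final = max(ceil(S_shuffle / S_target), P_min)
    return max(math.ceil(shuffle_bytes / target_bytes), min_partitions)

# 1 GB of shuffle output with the default 64 MB target
print(coalesced_partitions(1024 * MB))
```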
Catalyst optimization rules include Constant Folding, Boolean Simplification, Filter (Predicate) Pushdown, Column Pruning, Join Reordering, and Subquery Elimination. Rules are grouped into batches. Each batch is applied repeatedly until the plan stops changing (a fixed point) or an iteration limit is reached.
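Here is a toy model of a rule batch running to a fixed point, assuming a tiny expression language. The `Lit`, `Col`, and `BinOp` classes and the rule functions are hypothetical Python stand-ins for Catalyst's Scala tree nodes. Only constant folding and one Boolean simplification are modeled.

```python
from dataclasses import dataclass
from typing import Callable, Union

@dataclass(frozen=True)
class Lit:
    value: object

@dataclass(frozen=True)
class Col:
    name: str

@dataclass(frozen=True)
class BinOp:
    op: str          # one of "+", ">", "AND"
    left: "Expr"
    right: "Expr"

Expr = Union[Lit, Col, BinOp]
Rule = Callable[[Expr], Expr]

def transform_up(e: Expr, rule: Rule) -> Expr:
    # Rewrite children first, then the node itself (bottom-up traversal).
    if isinstance(e, BinOp):
        e = BinOp(e.op, transform_up(e.left, rule), transform_up(e.right, rule))
    return rule(e)

def constant_folding(e: Expr) -> Expr:
    # Evaluate an operator whose inputs are both literals.
    if isinstance(e, BinOp) and isinstance(e.left, Lit) and isinstance(e.right, Lit):
        if e.op == "+":
            return Lit(e.left.value + e.right.value)
        if e.op == ">":
            return Lit(e.left.value > e.right.value)
    return e

def boolean_simplification(e: Expr) -> Expr:
    # TRUE AND x  ->  x  (and the symmetric case)
    if isinstance(e, BinOp) and e.op == "AND":
        if isinstance(e.left, Lit) and e.left.value is True:
            return e.right
        if isinstance(e.right, Lit) and e.right.value is True:
            return e.left
    return e

def run_to_fixed_point(e: Expr, rules: list[Rule], max_iters: int = 100) -> Expr:
    for _ in range(max_iters):
        before = e
        for rule in rules:
            e = transform_up(e, rule)
        if e == before:  # no rule changed the tree: fixed point reached
            break
    return e

# (1 > 0) AND (age > 25 + 5)  simplifies to  age > 30
expr = BinOp("AND", BinOp(">", Lit(1), Lit(0)),
             BinOp(">", Col("age"), BinOp("+", Lit(25), Lit(5))))
result = run_to_fixed_point(expr, [constant_folding, boolean_simplification])
print(result)
```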
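Enable AQE (Adaptive Query Execution) in Spark 3.x to automatically optimize the shuffle partition count, convert sort-merge joins to broadcast joins at runtime, and handle skewed joins. AQE is on by default since Spark 3.2. On Spark 3.0 and 3.1, set spark.sql.adaptive.enabled to true.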
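Python UDFs run in separate Python worker processes, which has three costs:
- Rows must be serialized between the JVM and Python.
- Catalyst cannot optimize inside the UDF.
- Whole-stage code generation cannot fuse operators across the UDF boundary.

For performance-critical operations, use built-in Spark SQL functions instead of Python UDFs so the work stays inside Tungsten's generated code.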
Tungsten Memory Efficiency
Theorem: Tungsten's compact binary row format (UnsafeRow) achieves memory efficiency ≥ 2x compared to storing rows as Java objects. It does this by eliminating object headers and pointer indirection and by reducing GC pressure. When off-heap memory is enabled (spark.memory.offHeap.enabled), Spark manages this memory outside the JVM heap, which allows processing datasets larger than the heap size.
- Spark SQL = Catalyst (optimizer) + Tungsten (execution engine)
- Catalyst: Analysis → Optimization → Physical Planning → Code Generation
- Tungsten: Binary row format reduces GC overhead; whole-stage codegen eliminates virtual calls
- AQE (Adaptive Query Execution) provides runtime optimization
- Predicate pushdown reduces data volume before shuffle/join operations
- Join selectivity ≈ 1 / |U|, so |R ⋈ S| ≈ (|R| × |S|) / |U| (assuming uniformly distributed join keys)
- AQE partition coalescing merges small shuffle partitions: P_final = max(⌈shuffleSize / targetSize⌉, minPartitions)
[Figures: SQL Execution Flow; Architecture Diagram]
Detailed Explanation
1. Spark SQL Engine Overview
Spark SQL is a Spark module for structured data processing.
Key Capabilities:
- Programming abstractions: DataFrames and Datasets (typed Datasets are available in Scala and Java only)
- SQL query engine that can run SQL/HiveQL queries
- Integration with Spark's ecosystem (Spark Streaming, MLlib)
- Connectivity to various data sources (Parquet, JSON, JDBC, Hive)
Three Main Components:
- Catalyst Optimizer: Transforms logical plans into optimized physical plans
- Tungsten Execution Engine: Efficient runtime execution with code generation
- Data Source API: Integration with various storage systems
2. Catalyst Optimizer Deep Dive
The Catalyst optimizer is a framework for manipulating query plans.
Core Concepts:
- Trees: Representations of query plans
- Rules: Transformations applied to trees
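- Batches: Groups of rules applied in a fixed order, each repeated until the tree stops changing
- Strategies: Planning rules that turn optimized logical plans into physical plans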
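Here is a sketch of trees and rules applied to a logical plan. One hypothetical rule moves a Filter beneath a Project when the filter reads only columns the Project passes through, in the spirit of Catalyst's PushDownPredicates. The node classes are illustrative Python, not Spark's API.

```python
from dataclasses import dataclass
from typing import Callable, Union

@dataclass(frozen=True)
class Scan:
    table: str

@dataclass(frozen=True)
class Project:
    columns: tuple[str, ...]
    child: "Plan"

@dataclass(frozen=True)
class Filter:
    condition: str               # kept as text for brevity
    references: frozenset[str]   # columns the condition reads
    child: "Plan"

Plan = Union[Scan, Project, Filter]

def push_filter_below_project(plan: Plan) -> Plan:
    # Filter(Project(child)) -> Project(Filter(child)) when the filter
    # only references columns produced by the Project.
    if (isinstance(plan, Filter) and isinstance(plan.child, Project)
            and plan.references <= set(plan.child.columns)):
        proj = plan.child
        return Project(proj.columns, Filter(plan.condition, plan.references, proj.child))
    return plan

def transform_down(plan: Plan, rule: Callable[[Plan], Plan]) -> Plan:
    # Apply the rule to this node first, then recurse into its child.
    plan = rule(plan)
    if isinstance(plan, Project):
        return Project(plan.columns, transform_down(plan.child, rule))
    if isinstance(plan, Filter):
        return Filter(plan.condition, plan.references, transform_down(plan.child, rule))
    return plan

before = Filter("age > 30", frozenset({"age"}), Project(("name", "age"), Scan("employees")))
after = transform_down(before, push_filter_below_project)
print(after)
```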
Optimization Techniques:
| Technique | Before | After |
|---|---|---|
| Predicate Pushdown | Project(name) → Filter(age > 30) → Scan(all) | Project(name) → Filter(age > 30) → Scan(name, age; PushedFilters: age > 30) |
| Column Pruning | Project(name, age) β Scan(id, name, age, salary, dept) | Project(name, age) β Scan(name, age) |
| Constant Folding | Filter(age > 25 + 5) | Filter(age > 30) |
| Join Reordering | (A ⋈ B) ⋈ C | A ⋈ (B ⋈ C), if B ⋈ C is smaller (cost-based; requires spark.sql.cbo.enabled and spark.sql.cbo.joinReorder.enabled) |
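Here is a minimal sketch of why join order matters. It uses the textbook estimate |R ⋈ S| ≈ |R| × |S| / |U|, where |U| is the number of distinct join-key values and keys are assumed uniform. Table sizes and key counts are invented. Comparing intermediate row counts is a simplification of what Spark's cost-based reordering evaluates.

```python
def join_rows(left_rows: float, right_rows: float, distinct_keys: int) -> float:
    # Textbook equi-join estimate: |R join S| ~= |R| * |S| / |U|
    return left_rows * right_rows / distinct_keys

# Hypothetical sizes: A joins B on key k1, B joins C on key k2.
A, B, C = 1_000_000, 10_000, 100
K1_DISTINCT, K2_DISTINCT = 10_000, 100

# Plan 1: (A join B) join C, so the intermediate result is A join B.
ab = join_rows(A, B, K1_DISTINCT)
# Plan 2: A join (B join C), so the intermediate result is B join C.
bc = join_rows(B, C, K2_DISTINCT)

print(f"(A join B) intermediate rows: {ab:,.0f}")
print(f"(B join C) intermediate rows: {bc:,.0f}")
best = "A join (B join C)" if bc < ab else "(A join B) join C"
print("cheaper order:", best)
```

Both orders produce the same final estimate (about 1,000,000 rows here). Only the intermediate result differs, and shrinking it is what a reordering rule aims for.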
3. Tungsten Execution Engine
Tungsten focuses on three areas:
| Area | Description |
|---|---|
| Memory Management | Off-heap memory; binary row format; cache-aware algorithms |
| Code Generation | Whole-stage codegen; tight JVM bytecode loops; eliminates virtual calls |
| Columnar Processing | Vectorized operations; SIMD-friendly layout; efficient compression |
4. Physical Plan Strategies
Join Selection:
| Strategy | When Used |
|---|---|
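| Broadcast Hash Join | One side smaller than spark.sql.autoBroadcastJoinThreshold (10MB default) |
| Sort-Merge Join | Two large tables joined on equality keys (the default for large equi-joins; Spark sorts both sides itself) |
| Shuffle Hash Join | One side small enough to build a hash table per partition; used when spark.sql.join.preferSortMergeJoin is false or with a SHUFFLE_HASH hint |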
| Cartesian Product | Cross joins (avoid!) |
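The table reduces to a size-based rule of thumb, sketched below. This is a simplification, not Spark's JoinSelection logic, which also weighs join type, hints, and build-side size relative to the other side. The function and parameter names are hypothetical. The 10MB default matches `spark.sql.autoBroadcastJoinThreshold`.

```python
MB = 1024 * 1024

def choose_join_strategy(left_bytes: int, right_bytes: int, has_equi_keys: bool,
                         broadcast_threshold: int = 10 * MB,
                         prefer_sort_merge: bool = True) -> str:
    if not has_equi_keys:
        # Simplified: without an equality condition this model falls back to a
        # Cartesian product (real Spark may pick a broadcast nested loop join).
        return "Cartesian Product"
    if min(left_bytes, right_bytes) <= broadcast_threshold:
        # The smaller side can be shipped to every executor.
        return "Broadcast Hash Join"
    if not prefer_sort_merge:
        # Mirrors spark.sql.join.preferSortMergeJoin=false.
        return "Shuffle Hash Join"
    return "Sort-Merge Join"

print(choose_join_strategy(5 * MB, 500 * MB, has_equi_keys=True))
```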
Aggregation Strategies:
- Hash Aggregation: Fast, uses hash table
- Sort Aggregation: Better for sorted input
- Tungsten Aggregation: Optimized with code generation
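The two core strategies compute the same result in different ways. This plain-Python illustration uses hypothetical function names and is not Spark internals. Hash aggregation keeps one running buffer per key in a dictionary. Sort aggregation sorts by key and then aggregates each contiguous run, so only one group is open at a time.

```python
from itertools import groupby
from operator import itemgetter

def hash_aggregate(rows: list[tuple[str, int]]) -> dict[str, int]:
    # One running sum per key, held in a hash table.
    sums: dict[str, int] = {}
    for key, value in rows:
        sums[key] = sums.get(key, 0) + value
    return sums

def sort_aggregate(rows: list[tuple[str, int]]) -> dict[str, int]:
    # Sort by key, then sum each run of equal keys; one group is open at a time.
    ordered = sorted(rows, key=itemgetter(0))
    return {key: sum(v for _, v in group)
            for key, group in groupby(ordered, key=itemgetter(0))}

rows = [("Engineering", 75000), ("Marketing", 65000), ("Engineering", 90000),
        ("Marketing", 70000), ("Engineering", 85000), ("Sales", 60000), ("Sales", 68000)]
print(hash_aggregate(rows))
print(sort_aggregate(rows))
```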
5. Shuffle and Exchange Operations
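Shuffles occur when data needs to be redistributed across partitions.
Triggers:
- Wide transformations (groupBy, join, repartition)
- Data exchange between stages (an Exchange node in the physical plan)
Shuffles are typically the most expensive operation in Spark because they involve serialization, disk I/O, and network transfer.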
Shuffle Optimization:
- Use broadcast joins to avoid shuffle
- Partition data to minimize shuffle
- Use bucketing for repeated joins
- Tune shuffle partition count
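Conceptually, a shuffle routes each row to an output partition by hashing its key, so every row with the same key lands in the same partition. This sketch uses `zlib.crc32` as a stand-in hash for reproducibility. Spark's hash partitioning uses Murmur3, so actual partition numbers differ. The function name is hypothetical.

```python
import zlib
from collections import defaultdict

def shuffle_by_key(rows: list[tuple[str, int]],
                   num_partitions: int) -> dict[int, list[tuple[str, int]]]:
    partitions: dict[int, list[tuple[str, int]]] = defaultdict(list)
    for key, value in rows:
        # Same key -> same hash -> same partition (stand-in hash, not Spark's Murmur3).
        pid = zlib.crc32(key.encode("utf-8")) % num_partitions
        partitions[pid].append((key, value))
    return dict(partitions)

rows = [("Engineering", 1), ("Marketing", 2), ("Engineering", 3), ("Sales", 4)]
parts = shuffle_by_key(rows, num_partitions=4)
for pid, part in sorted(parts.items()):
    print(pid, part)
```

Bucketing applies the same idea at write time. That is why repeated joins on a bucketed column can skip this step.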
6. Data Source API
Spark supports multiple data sources:
| Type | Formats |
|---|---|
| File formats | Parquet, ORC, JSON, CSV, Text |
| JDBC | Relational databases |
| Hive | Hive tables |
| Custom | DataSource API v2 |
Predicate Pushdown to Sources:
- Parquet: Row group filtering via statistics
- ORC: Stripe and row-group statistics (plus Bloom filters, when written)
- JDBC: SQL WHERE clause pushdown
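Statistics-based pushdown compares the predicate with each row group's min/max values and skips groups that cannot contain a match. A plain-Python model follows. `RowGroup` and `scan_with_pushdown` are hypothetical, not the Parquet reader's API.

```python
from dataclasses import dataclass

@dataclass
class RowGroup:
    min_age: int
    max_age: int
    rows: list[int]

def scan_with_pushdown(groups: list[RowGroup], threshold: int) -> tuple[list[int], int]:
    """Return ages > threshold and the number of row groups actually read."""
    result: list[int] = []
    groups_read = 0
    for g in groups:
        if g.max_age <= threshold:
            continue  # statistics prove no row can match: skip the whole group
        groups_read += 1
        # Row-level filtering is still needed inside a group that was read.
        result.extend(a for a in g.rows if a > threshold)
    return result, groups_read

groups = [RowGroup(20, 29, [20, 25, 29]),
          RowGroup(28, 35, [28, 31, 35]),
          RowGroup(36, 50, [36, 50])]
matches, read = scan_with_pushdown(groups, 30)
print(matches, read)
```

Skipping works at row-group granularity, so a group that is read can still contain non-matching rows. That is why Spark keeps a Filter operator above the scan even when filters are pushed down.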
7. Performance Monitoring
Spark UI Locations:
- SQL tab: Query plans, execution times
- Stages tab: Task distribution, shuffle read/write
- Storage tab: Cached data, memory usage
Key Metrics to Watch:
- scanTime: Time to read data
- shuffleReadTime: Time to read shuffle data
- shuffleWriteTime: Time to write shuffle data
- taskDuration: Time per task
- gcTime: Garbage collection time
Best Practice: Always use df.explain(True) to verify that Catalyst optimizations are being applied.
Key Concepts Table
| Concept | Description | Optimization |
|---|---|---|
| Catalyst | Query optimizer for logical/physical plans | Automatic plan optimization |
| Tungsten | Execution engine with code generation | Whole-stage code generation |
| Predicate Pushdown | Filter pushed to data source | Reduce I/O |
| Column Pruning | Read only required columns | Reduce memory/I/O |
| Constant Folding | Evaluate constants at compile time | Reduce runtime computation |
| Broadcast Join | Small table broadcast to executors | Avoid shuffle |
| Sort-Merge Join | Join sorted partitions | Efficient for large tables |
| Hash Aggregation | Use hash table for aggregation | Fast for groupBy |
| Shuffle | Data redistribution across partitions | Most expensive operation |
| Whole-Stage Codegen | Generate tight JVM bytecode | Eliminate virtual calls |
| AQE | Runtime query plan optimization | Adaptive to data statistics |
Code Examples
Example 1: SQL Queries with Catalyst
from pyspark.sql import SparkSession
# Create SparkSession with AQE enabled
# (wrapping the chain in parentheses allows an inline comment on each line)
spark = (
    SparkSession.builder
    .appName("SparkSQLEngine")
    .config("spark.sql.adaptive.enabled", "true")  # Enable AQE
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")  # Coalesce small partitions
    .getOrCreate()
)
# Create tables
employees = spark.createDataFrame([
(1, "Alice", "Engineering", 30, 75000),
(2, "Bob", "Marketing", 25, 65000),
(3, "Charlie", "Engineering", 35, 90000),
(4, "Diana", "Marketing", 28, 70000),
(5, "Eve", "Engineering", 32, 85000),
(6, "Frank", "Sales", 29, 60000),
(7, "Grace", "Sales", 31, 68000)
], ["id", "name", "department", "age", "salary"])
# Create temporary view for SQL access
employees.createOrReplaceTempView("employees")
# Simple query with GROUP BY and HAVING
result = spark.sql("""
SELECT department,
COUNT(*) as emp_count,
AVG(salary) as avg_salary,
MAX(age) as max_age
FROM employees
WHERE age > 25
GROUP BY department
HAVING COUNT(*) > 1
ORDER BY avg_salary DESC
""")
result.show()
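# Explain the query plan (True = all plans)
# Shows: Parsed → Analyzed → Optimized → Physical
result.explain(True)
# Time an action: DataFrames are lazy, so work happens only when an action runs
import time
start = time.perf_counter()
row_count = result.count()
elapsed = time.perf_counter() - start
print(f"Number of output rows: {row_count}")
print(f"Query execution time: {elapsed:.3f} s")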
Example 2: Catalyst Optimizer Rules
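# Create a small departments table (sample data) so the join below can run
departments = spark.createDataFrame([
    ("Engineering", "Building A"),
    ("Marketing", "Building B"),
    ("Sales", "Building C")
], ["name", "location"])
departments.createOrReplaceTempView("departments")
# Join query whose plans we will inspect
df = spark.sql("""
SELECT e.name, e.salary, d.location
FROM employees e
JOIN departments d ON e.department = d.name
WHERE e.age > 30
AND e.salary > 70000
""")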
# Print all plans: parsed, analyzed, optimized logical, and physical
df.explain(True)
# Output shows:
# == Parsed Logical Plan ==
# ...
# == Analyzed Logical Plan ==
# ... (resolved references)
# == Optimized Logical Plan ==
# ... (with predicate pushdown, column pruning)
# == Physical Plan ==
# ... (with join strategy, exchange)
# Verify predicate pushdown
# Look for "PushedFilters" in the Parquet scan node
employees.write.mode("overwrite").parquet("/tmp/employees")
parquet_df = spark.read.parquet("/tmp/employees")
filtered = parquet_df.filter((parquet_df.age > 30) & (parquet_df.salary > 70000))
filtered.explain()  # Should show PushedFilters in the FileScan node
Example 3: Adaptive Query Execution (Spark 3.0+)
# Enable AQE
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")  # Target size for coalesced partitions
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
# Complex query that benefits from AQE
result = spark.sql("""
SELECT
department,
COUNT(*) as cnt,
AVG(salary) as avg_sal,
PERCENTILE(salary, 0.5) as median_sal
FROM employees
GROUP BY department
HAVING COUNT(*) >= 2
""")
# AQE will:
# 1. Coalesce the small post-shuffle partitions produced by GROUP BY (fewer, larger tasks)
# 2. Split skewed partitions, but only in sort-merge joins (this query has no join)
# 3. Switch join strategies based on runtime stats (also join-only)
result.show()
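# Partition count of the result (reflects AQE coalescing when it applies)
print(f"Number of partitions: {result.rdd.getNumPartitions()}")
# List AQE-related configuration settings; runtime AQE decisions appear in the SQL tab of the Spark UI
spark.sql("SET -v").filter("key like '%adaptive%'").show(truncate=False)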
Example 4: Data Source Optimization
# Read Parquet with predicate pushdown
parquet_df = spark.read.parquet("employee_data.parquet")
# Filter pushed to Parquet reader (PushedFilters in plan)
filtered = parquet_df.filter(
(parquet_df.age > 30) &
(parquet_df.salary > 70000)
)
# Only reads relevant row groups (Parquet statistics)
filtered.explain()
# Write with bucketing for future joins
# bucketBy: Partition data by hash of column
# sortBy: Sort within each bucket
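employees.write \
    .mode("overwrite") \
    .bucketBy(10, "department") \
    .sortBy("id") \
    .saveAsTable("bucketed_employees")
# A bucketed join avoids a shuffle only when both sides share the same bucketing
# scheme (same bucket column and bucket count), so join the bucketed table with itself.
# Disable auto-broadcast so Spark doesn't broadcast this small demo table instead.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
joined = spark.sql("""
SELECT e1.name, e2.name as colleague
FROM bucketed_employees e1
JOIN bucketed_employees e2 ON e1.department = e2.department
""")
joined.explain()  # Expect SortMergeJoin with no Exchange on either side
joined.show()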
# Write with partitioning for partition pruning
employees.write \
.partitionBy("department") \
.parquet("partitioned_employees/")
# Read with partition pruning
# Only reads partitions matching filter
pruned = spark.read.parquet("partitioned_employees/") \
.filter("department = 'Engineering'")
pruned.explain() # Shows partition pruning
Example 5: Advanced SQL Features
# CTEs (Common Table Expressions)
spark.sql("""
WITH dept_stats AS (
SELECT department, AVG(salary) as avg_sal
FROM employees
GROUP BY department
)
SELECT e.name, e.salary, d.avg_sal,
e.salary - d.avg_sal as diff_from_avg
FROM employees e
JOIN dept_stats d ON e.department = d.department
""").show()
# Window functions in SQL
spark.sql("""
SELECT name, department, salary,
ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) as salary_rank,
LAG(salary) OVER (PARTITION BY department ORDER BY salary) as prev_salary
FROM employees
""").show()
# Subqueries
spark.sql("""
SELECT name, salary
FROM employees
WHERE salary > (SELECT AVG(salary) FROM employees)
""").show()
# Correlated subqueries
spark.sql("""
SELECT e.name, e.salary
FROM employees e
WHERE e.salary > (
SELECT AVG(e2.salary)
FROM employees e2
WHERE e2.department = e.department
)
""").show()
Performance Metrics
| Query Pattern | Without Catalyst | With Catalyst | Improvement |
|---|---|---|---|
| Simple SELECT | 150ms | 45ms | 3.3x |
| Filter + Agg | 320ms | 85ms | 3.8x |
| JOIN + WHERE | 850ms | 210ms | 4.0x |
| Subquery | 1200ms | 280ms | 4.3x |
| Window Function | 450ms | 120ms | 3.8x |
| Read Parquet | 200ms | 60ms | 3.3x |
| Write Parquet | 300ms | 150ms | 2.0x |
| Shuffle | 500ms | 350ms | 1.4x |
| Whole-Stage Codegen | 100ms | 30ms | 3.3x |
| AQE Optimization | 400ms | 250ms | 1.6x |
Best Practices
1. Enable Adaptive Query Execution
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")
2. Use Broadcast Joins for Small Tables
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "key")
# Or configure threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "52428800") # 50MB
3. Cache Frequently Used DataFrames
df.cache() # For DataFrames reused multiple times
# Or persist with specific storage level
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)  # Keep in memory; spill to disk when memory is full
4. Use Appropriate File Formats
# Parquet for columnar storage (recommended)
df.write.parquet("output/", compression="snappy")
# ORC for Hive integration
df.write.orc("output/")
# Avoid CSV/JSON for large datasets (text formats are parsed in full; no columnar layout or min/max statistics for skipping data)
5. Partition Data Strategically
# Partition by frequently filtered columns (enables partition pruning)
df.write.partitionBy("year", "month").parquet("output/")
# Use bucketing for join optimization (avoids shuffle)
df.write.bucketBy(100, "user_id").saveAsTable("users_bucketed")
6. Monitor Query Plans
# Always check explain plans
df.explain(True)
# Look for:
# - BroadcastHashJoin (good for small tables)
# - FileScan with PushedFilters (predicate pushdown working)
# - Project with only needed columns (column pruning working)
# - AQE optimizations in physical plan
7. Avoid Python UDFs When Possible
# BAD: Python UDF (opaque to Catalyst; rows are serialized to a Python worker)
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
@udf(returnType=IntegerType())
def double_it(x):
    return x * 2 if x is not None else None  # handle NULLs explicitly
df.withColumn("doubled", double_it(df.age))
# GOOD: Built-in function (Catalyst optimized)
from pyspark.sql.functions import col
df.withColumn("doubled", col("age") * 2)
# BETTER THAN A PYTHON UDF: Pandas UDF (vectorized via Apache Arrow)
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf(IntegerType())
def double_it_pandas(s: pd.Series) -> pd.Series:
    return s * 2
df.withColumn("doubled", double_it_pandas(df.age))
Key Takeaways
- Spark SQL = Catalyst (optimizer) + Tungsten (execution engine)
- Catalyst: Analysis → Optimization → Physical Planning → Code Generation
- Tungsten: Compact binary row format reduces GC overhead
- Whole-stage codegen eliminates virtual calls (2-10x speedup)
- AQE provides runtime optimization (coalesce partitions, handle skew)
- Predicate pushdown reduces data volume at source
- Column pruning reduces I/O by only reading needed columns
- Broadcast joins avoid shuffle for small tables
- Use built-in functions over Python UDFs for Catalyst optimization
- Monitor query plans with df.explain(True) to verify optimizations