πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

PySpark SQL Engine: Execution, Catalyst Optimizer, and Tungsten

pysparkSQL Engine🟒 Free Lesson

Advertisement

PySpark SQL Engine

DfSpark SQL Engine

The Spark SQL Engine is the computational backend that translates SQL queries and DataFrame operations into optimized RDD computations. It consists of the Catalyst optimizer (logical/physical plan optimization) and the Tungsten execution engine (memory management and code generation).

DfWhole-Stage Code Generation

Whole-stage code generation (Tungsten) compiles multiple operators into a single optimized JVM function, eliminating virtual function calls and leveraging CPU registers and SIMD instructions. This can improve query performance by 2x–10x.

DfAdaptive Query Execution (AQE)

Adaptive Query Execution is a Spark 3.0+ feature that optimizes query plans at runtime based on actual data statistics. It can coalesce shuffle partitions, convert sort-merge joins to broadcast joins, and handle data skew automatically.

DfPhysical Plan

A Physical Plan is the concrete execution strategy chosen by Catalyst, specifying exactly how each operation will be executed (e.g., which join algorithm, which scan method, which aggregation strategy).

Catalyst Cost Model
Costplan=Costscan+Costfilter+Costproject+Costjoin+CostaggCost_{plan} = Cost_{scan} + Cost_{filter} + Cost_{project} + Cost_{join} + Cost_{agg}

Here,

  • CostscanCost_{scan}=I/O cost of reading source data
  • CostfilterCost_{filter}=CPU cost of applying filter predicates
  • CostprojectCost_{project}=CPU cost of column projection
  • CostjoinCost_{join}=Shuffle + compute cost of join operations
  • CostaggCost_{agg}=Shuffle + compute cost of aggregations

Tungsten Memory Format

Rowbinary=[null_bitmap∣fixed_length_columns∣variable_length_offsets∣variable_length_columns]Row_{binary} = [null\_bitmap | fixed\_length\_columns | variable\_length\_offsets | variable\_length\_columns]

Here,

  • null_bitmapnull\_bitmap=Bit array marking NULL values (1 bit per column)
  • fixed_length_columnsfixed\_length\_columns=Fixed-width values stored sequentially (8 bytes each)
  • variable_length_offsetsvariable\_length\_offsets=Offset pointers to variable-length data
  • variable_length_columnsvariable\_length\_columns=Variable-width values (strings, arrays, maps)
Join Selectivity
Sjoin=∣Rβˆ£Γ—βˆ£S∣max⁑(∣R∣,∣S∣)Γ—βˆ£U∣S_{join} = \frac{|R| \times |S|}{\max(|R|, |S|) \times |U|}

Here,

  • SjoinS_{join}=Selectivity of the join (fraction of cross product)
  • ∣R∣|R|=Number of rows in left relation
  • ∣S∣|S|=Number of rows in right relation
  • ∣U∣|U|=Number of distinct join key values

AQE Partition Coalescing

Pfinal=max⁑(Sshuffle_outputTtarget,Pmin)P_{final} = \max\left(\frac{S_{shuffle\_output}}{T_{target}}, P_{min}\right)

Here,

  • PfinalP_{final}=Final number of partitions after coalescing
  • Sshuffle_outputS_{shuffle\_output}=Total shuffle output size in bytes
  • TtargetT_{target}=Target partition size (default 64MB)
  • PminP_{min}=Minimum number of partitions

Catalyst optimization rules include: Constant Folding, Boolean Simplification, Filter Pushdown, Column Pruning, Join Reordering, and Subquery Elimination. These rules are applied iteratively until no further improvements are found.

Enable AQE (Adaptive Query Execution) in Spark 3.x to automatically optimize shuffle partition count, convert sort-merge joins to broadcast joins at runtime, and handle data skew.

Whole-stage code generation is disabled when using Python UDFs. For performance-critical operations, use built-in Spark SQL functions instead of Python UDFs to leverage Tungsten optimizations.

ThTungsten Memory Efficiency

Theorem: Tungsten's off-heap binary row format achieves memory efficiency β‰₯ 2x compared to Java object serialization by eliminating object headers, pointer indirection, and GC pressure. This allows processing datasets larger than heap size through managed off-heap allocation.

  • Spark SQL = Catalyst (optimizer) + Tungsten (execution engine)
  • Catalyst: Analysis β†’ Optimization β†’ Physical Planning β†’ Code Generation
  • Tungsten: Off-heap binary format eliminates GC overhead, whole-stage codegen eliminates virtual calls
  • AQE (Adaptive Query Execution) provides runtime optimization
  • Predicate pushdown reduces data volume before shuffle/join operations
  • Join selectivity = (|R| Γ— |S|) / (max(|R|, |S|) Γ— |U|)
  • AQE partition coalescing reduces small files: P_final = max(shuffleSize/targetSize, minPartitions)

SQL Execution Flow

Spark SQLUnresolved PlanAnalyzerResolved PlanOptimizerLogical PlanPlannerPhysical PlanCodeGenRDDsAQE Decision PointsCoalesce PartitionsMerge small shufflesConvert to BroadcastSMJ β†’ BroadcastSkew JoinSplit skewed partitionsDynamic JoinAdaptive strategy

Architecture Diagram

Detailed Explanation

1. Spark SQL Engine Overview

Spark SQL is a Spark module for structured data processing.

Key Capabilities:

  • Programming abstraction called DataFrames and Datasets
  • SQL query engine that can run SQL/HiveQL queries
  • Integration with Spark's ecosystem (Spark Streaming, MLlib)
  • Connectivity to various data sources (Parquet, JSON, JDBC, Hive)

Three Main Components:

  1. Catalyst Optimizer: Transforms logical plans into optimized physical plans
  2. Tungsten Execution Engine: Efficient runtime execution with code generation
  3. Data Source API: Integration with various storage systems

2. Catalyst Optimizer Deep Dive

The Catalyst optimizer is a framework for manipulating query plans.

Core Concepts:

  • Trees: Representations of query plans
  • Rules: Transformations applied to trees
  • Strategies: Ordering of rule application

Optimization Techniques:

TechniqueBeforeAfter
Predicate PushdownProject(name) β†’ Filter(age > 30) β†’ Scan(all)Project(name) β†’ Scan(name, age) β†’ Filter(age > 30)
Column PruningProject(name, age) β†’ Scan(id, name, age, salary, dept)Project(name, age) β†’ Scan(name, age)
Constant FoldingFilter(age > 25 + 5)Filter(age > 30)
Join Reordering(A β‹ˆ B) β‹ˆ CA β‹ˆ (B β‹ˆ C) β€” if Bβ‹ˆC is smaller

3. Tungsten Execution Engine

Tungsten focuses on three areas:

AreaDescription
Memory ManagementOff-heap memory; binary row format; cache-aware algorithms
Code GenerationWhole-stage codegen; tight JVM bytecode loops; eliminates virtual calls
Columnar ProcessingVectorized operations; SIMD-friendly layout; efficient compression

4. Physical Plan Strategies

Join Selection:

StrategyWhen Used
Broadcast Hash JoinSmall tables (< 10MB default)
Sort-Merge JoinLarge tables with sorted data
Shuffle Hash JoinMedium-sized tables
Cartesian ProductCross joins (avoid!)

Aggregation Strategies:

  • Hash Aggregation: Fast, uses hash table
  • Sort Aggregation: Better for sorted input
  • Tungsten Aggregation: Optimized with code generation

5. Shuffle and Exchange Operations

Shuffles occur when data needs to be redistributed across partitions.

Triggers:

  • Wide transformations (groupBy, join, repartition)
  • Data exchange between stages
  • Most expensive operation in Spark

Shuffle Optimization:

  1. Use broadcast joins to avoid shuffle
  2. Partition data to minimize shuffle
  3. Use bucketing for repeated joins
  4. Tune shuffle partition count

6. Data Source API

Spark supports multiple data sources:

TypeFormats
File formatsParquet, ORC, JSON, CSV, Text
JDBCRelational databases
HiveHive tables
CustomDataSource API v2

Predicate Pushdown to Sources:

  • Parquet: Row group filtering via statistics
  • ORC: Bloom filter pushdown
  • JDBC: SQL WHERE clause pushdown

7. Performance Monitoring

Spark UI Locations:

  • SQL tab: Query plans, execution times
  • Stages tab: Task distribution, shuffle read/write
  • Storage tab: Cached data, memory usage

Key Metrics to Watch:

  • scanTime: Time to read data
  • shuffleReadTime: Time to read shuffle data
  • shuffleWriteTime: Time to write shuffle data
  • taskDuration: Time per task
  • gcTime: Garbage collection time

Best Practice: Always use df.explain(True) to verify Catalyst optimizations are being applied.

Key Concepts Table

ConceptDescriptionOptimization
CatalystQuery optimizer for logical/physical plansAutomatic plan optimization
TungstenExecution engine with code generationWhole-stage code generation
Predicate PushdownFilter pushed to data sourceReduce I/O
Column PruningRead only required columnsReduce memory/I/O
Constant FoldingEvaluate constants at compile timeReduce runtime computation
Broadcast JoinSmall table broadcast to executorsAvoid shuffle
Sort-Merge JoinJoin sorted partitionsEfficient for large tables
Hash AggregationUse hash table for aggregationFast for groupBy
ShuffleData redistribution across partitionsMost expensive operation
Whole-Stage CodegenGenerate tight JVM bytecodeEliminate virtual calls
AQERuntime query plan optimizationAdaptive to data statistics

Code Examples

Example 1: SQL Queries with Catalyst

from pyspark.sql import SparkSession

# Create SparkSession with AQE enabled
spark = SparkSession.builder \
    .appName("SparkSQLEngine") \
    .config("spark.sql.adaptive.enabled", "true") \  # Enable AQE
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \  # Coalesce small partitions
    .getOrCreate()

# Create tables
employees = spark.createDataFrame([
    (1, "Alice", "Engineering", 30, 75000),
    (2, "Bob", "Marketing", 25, 65000),
    (3, "Charlie", "Engineering", 35, 90000),
    (4, "Diana", "Marketing", 28, 70000),
    (5, "Eve", "Engineering", 32, 85000),
    (6, "Frank", "Sales", 29, 60000),
    (7, "Grace", "Sales", 31, 68000)
], ["id", "name", "department", "age", "salary"])

# Create temporary view for SQL access
employees.createOrReplaceTempView("employees")

# Simple query with GROUP BY and HAVING
result = spark.sql("""
    SELECT department, 
           COUNT(*) as emp_count,
           AVG(salary) as avg_salary,
           MAX(age) as max_age
    FROM employees
    WHERE age > 25
    GROUP BY department
    HAVING COUNT(*) > 1
    ORDER BY avg_salary DESC
""")

result.show()

# Explain the query plan (True = all plans)
# Shows: Parsed β†’ Analyzed β†’ Optimized β†’ Physical
result.explain(True)

# Get query execution details
print(f"Number of output rows: {result.count()}")
print(f"Query execution time: {result.head(1)}")

Example 2: Catalyst Optimizer Rules

# Show optimized plan
df = spark.sql("""
    SELECT e.name, e.salary, d.location
    FROM employees e
    JOIN departments d ON e.department = d.name
    WHERE e.age > 30
    AND e.salary > 70000
""")

# Print physical plan with optimizations
df.explain(True)

# Output shows:
# == Parsed Logical Plan ==
# ...
# == Analyzed Logical Plan ==
# ... (resolved references)
# == Optimized Logical Plan ==
# ... (with predicate pushdown, column pruning)
# == Physical Plan ==
# ... (with join strategy, exchange)

# Verify predicate pushdown
# Look for "PushedFilters" in Parquet scan
df.write.parquet("/tmp/employees")
parquet_df = spark.read.parquet("/tmp/employees")
filtered = parquet_df.filter((parquet_df.age > 30) & (parquet_df.salary > 70000))
filtered.explain()  # Should show PushedFilters in scan

Example 3: Adaptive Query Execution (Spark 3.0+)

# Enable AQE
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.targetPartitionSize", "64MB")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")

# Complex query that benefits from AQE
result = spark.sql("""
    SELECT 
        department,
        COUNT(*) as cnt,
        AVG(salary) as avg_sal,
        PERCENTILE(salary, 0.5) as median_sal
    FROM employees
    GROUP BY department
    HAVING COUNT(*) >= 2
""")

# AQE will:
# 1. Coalesce partitions after shuffle (reduces small files)
# 2. Handle data skew automatically (splits large partitions)
# 3. Optimize join strategies based on runtime stats
result.show()

# Check AQE optimizations in UI
print(f"Number of partitions: {result.rdd.getNumPartitions()}")

# AQE runtime statistics
spark.sql("SET -v").filter("key like '%adaptive%'").show(truncate=False)

Example 4: Data Source Optimization

# Read Parquet with predicate pushdown
parquet_df = spark.read.parquet("employee_data.parquet")

# Filter pushed to Parquet reader (PushedFilters in plan)
filtered = parquet_df.filter(
    (parquet_df.age > 30) & 
    (parquet_df.salary > 70000)
)

# Only reads relevant row groups (Parquet statistics)
filtered.explain()

# Write with bucketing for future joins
# bucketBy: Partition data by hash of column
# sortBy: Sort within each bucket
employees.write \
    .bucketBy(10, "department") \
    .sortBy("id") \
    .saveAsTable("bucketed_employees")

# Bucketed join avoids shuffle (same bucketing scheme)
spark.sql("""
    SELECT /*+ MAPJOIN(e2) */
        e1.name, e2.name as manager
    FROM bucketed_employees e1
    JOIN employees e2 ON e1.department = e2.department
""").show()

# Write with partitioning for partition pruning
employees.write \
    .partitionBy("department") \
    .parquet("partitioned_employees/")

# Read with partition pruning
# Only reads partitions matching filter
pruned = spark.read.parquet("partitioned_employees/") \
    .filter("department = 'Engineering'")
pruned.explain()  # Shows partition pruning

Example 5: Advanced SQL Features

# CTEs (Common Table Expressions)
spark.sql("""
    WITH dept_stats AS (
        SELECT department, AVG(salary) as avg_sal
        FROM employees
        GROUP BY department
    )
    SELECT e.name, e.salary, d.avg_sal,
           e.salary - d.avg_sal as diff_from_avg
    FROM employees e
    JOIN dept_stats d ON e.department = d.department
""").show()

# Window functions in SQL
spark.sql("""
    SELECT name, department, salary,
           ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) as rank,
           LAG(salary) OVER (PARTITION BY department ORDER BY salary) as prev_salary
    FROM employees
""").show()

# Subqueries
spark.sql("""
    SELECT name, salary
    FROM employees
    WHERE salary > (SELECT AVG(salary) FROM employees)
""").show()

# Correlated subqueries
spark.sql("""
    SELECT e.name, e.salary
    FROM employees e
    WHERE e.salary > (
        SELECT AVG(salary) 
        FROM employees 
        WHERE department = e.department
    )
""").show()

Performance Metrics

Query PatternWithout CatalystWith CatalystImprovement
Simple SELECT150ms45ms3.3x
Filter + Agg320ms85ms3.8x
JOIN + WHERE850ms210ms4.0x
Subquery1200ms280ms4.3x
Window Function450ms120ms3.8x
Read Parquet200ms60ms3.3x
Write Parquet300ms150ms2.0x
Shuffle500ms350ms1.4x
Whole-Stage Codegen100ms30ms3.3x
AQE Optimization400ms250ms1.6x

Best Practices

1. Enable Adaptive Query Execution

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.targetPartitionSize", "64MB")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")

2. Use Broadcast Joins for Small Tables

from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "key")
# Or configure threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "52428800")  # 50MB

3. Cache Frequently Used DataFrames

df.cache()  # For DataFrames reused multiple times
# Or persist with specific storage level
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK_SER)  # Serialized + disk spill

4. Use Appropriate File Formats

# Parquet for columnar storage (recommended)
df.write.parquet("output/", compression="snappy")

# ORC for Hive integration
df.write.orc("output/")

# Avoid CSV/JSON for large datasets (no predicate pushdown)

5. Partition Data Strategically

# Partition by frequently filtered columns (enables partition pruning)
df.write.partitionBy("year", "month").parquet("output/")

# Use bucketing for join optimization (avoids shuffle)
df.write.bucketBy(100, "user_id").saveAsTable("users_bucketed")

6. Monitor Query Plans

# Always check explain plans
df.explain(True)

# Look for:
# - BroadcastHashJoin (good for small tables)
# - FileScan with PushedFilters (predicate pushdown working)
# - Project with only needed columns (column pruning working)
# - AQE optimizations in physical plan

7. Avoid Python UDFs When Possible

# BAD: Python UDF (loses Catalyst optimization)
from pyspark.sql.functions import udf
@udf(returnType=IntegerType())
def double_it(x):
    return x * 2
df.withColumn("doubled", double_it(df.age))

# GOOD: Built-in function (Catalyst optimized)
from pyspark.sql.functions import col
df.withColumn("doubled", col("age") * 2)

# GOOD: Pandas UDF (Vectorized, better than Python UDF)
from pyspark.sql.functions import pandas_udf
@pandas_udf(IntegerType())
def double_it_pandas(s: pd.Series) -> pd.Series:
    return s * 2
df.withColumn("doubled", double_it_pandas(df.age))

Key Takeaways

  • Spark SQL = Catalyst (optimizer) + Tungsten (execution engine)
  • Catalyst: Analysis β†’ Optimization β†’ Physical Planning β†’ Code Generation
  • Tungsten: Off-heap binary format eliminates GC overhead
  • Whole-stage codegen eliminates virtual calls (2-10x speedup)
  • AQE provides runtime optimization (coalesce partitions, handle skew)
  • Predicate pushdown reduces data volume at source
  • Column pruning reduces I/O by only reading needed columns
  • Broadcast joins avoid shuffle for small tables
  • Use built-in functions over Python UDFs for Catalyst optimization
  • Monitor query plans with df.explain(True) to verify optimizations

See Also

⭐

Premium Content

PySpark SQL Engine: Execution, Catalyst Optimizer, and Tungsten

Unlock this lesson and 900+ advanced tutorials with a Premium plan.

🎯End-to-end Projects
πŸ’ΌInterview Prep
πŸ“œCertificates
🀝Community Access

Already a member? Log in

Need Expert PySpark Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement