⚡ Adaptive Query Execution in PySpark
AQE Architecture & Skew Resolution
Detailed Explanation
What is AQE?
Adaptive Query Execution (AQE) is Spark's framework for dynamically optimizing query plans at runtime based on actual data statistics.
Key Takeaway: AQE addresses inaccurate statistics by re-evaluating and modifying the physical plan at stage boundaries.
Three Types of Dynamic Optimizations
| Optimization | Trigger | Benefit |
|---|---|---|
| Join Strategy Switching | Runtime size of one join side is at or below the broadcast threshold | Sort-merge → broadcast hash join |
| Shuffle Partition Coalescing | Over-partitioning detected | Merge small partitions |
| Skew Partition Splitting | Partition size exceeds threshold | Split skewed partitions |
Join Strategy Switching
AQE switches join types based on actual data size:
| Condition | Strategy |
|---|---|
| Smaller side ≤ spark.sql.adaptive.autoBroadcastJoinThreshold | Broadcast Hash Join |
| Otherwise | Sort-Merge Join |
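Impact: Avoids the sort and shuffle-read cost of a sort-merge join; when static size estimates are far off, this can reduce a join from minutes to seconds.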
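The switching rule in the table above reduces to a size comparison made once runtime sizes are known. The following is a simplified model in plain Python, not Spark's planner: `choose_join_strategy` is a hypothetical helper, sizes are byte counts, and it ignores join-type restrictions (such as which side of an outer join may be broadcast) and join hints.

```python
def choose_join_strategy(left_bytes: int, right_bytes: int, threshold_bytes: int) -> str:
    """Toy model of a runtime join choice (hypothetical helper, not a Spark API).

    A negative threshold disables broadcasting, mirroring the -1 convention
    used by spark.sql.adaptive.autoBroadcastJoinThreshold.
    """
    smaller = min(left_bytes, right_bytes)
    if threshold_bytes >= 0 and smaller <= threshold_bytes:
        return "BroadcastHashJoin"
    return "SortMergeJoin"


MB = 1024 * 1024
# Static estimate: 2 GB vs 500 MB, so a sort-merge join is planned
print(choose_join_strategy(2048 * MB, 500 * MB, 10 * MB))  # SortMergeJoin
# At runtime a selective filter leaves only 4 MB on the small side
print(choose_join_strategy(2048 * MB, 4 * MB, 10 * MB))  # BroadcastHashJoin
```

The second call shows why re-planning after a stage completes matters: the same query flips strategy once the real size of the filtered side is observed.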
Shuffle Partition Coalescing
Addresses over-partitioning when spark.sql.shuffle.partitions is too high.
| Problem | Solution |
|---|---|
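| Many tiny post-shuffle partitions (200 by default), producing many small tasks and files | AQE merges contiguous partitions that fall below the target size |
| Unnecessary scheduling and I/O overhead | Reduces the partition count toward the advisory partition size |
Configuration: spark.sql.adaptive.coalescePartitions.minPartitionSize (minimum coalesced partition size) and spark.sql.adaptive.advisoryPartitionSizeInBytes (target partition size)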
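Coalescing can be pictured as greedy packing of contiguous shuffle partitions up to a target size. This sketch assumes the simplified rule stated in its docstring; `coalesce_partitions` is a hypothetical helper, and Spark's real implementation has additional inputs (for example, spark.sql.adaptive.coalescePartitions.parallelismFirst, which in recent versions makes it favor parallelism over the advisory size).

```python
from typing import List


def coalesce_partitions(sizes: List[int], target: int, min_size: int) -> List[List[int]]:
    """Greedily pack contiguous shuffle partitions into groups of about `target` bytes.

    Simplified model (not Spark's exact algorithm): a new group starts when adding
    the next partition would exceed `target`; afterwards, any group smaller than
    `min_size` is merged into its left neighbour. Returns original partition indices.
    """
    groups: List[List[int]] = []
    current: List[int] = []
    current_size = 0
    for idx, size in enumerate(sizes):
        if current and current_size + size > target:
            groups.append(current)
            current, current_size = [], 0
        current.append(idx)
        current_size += size
    if current:
        groups.append(current)

    # Fold undersized groups into the previous group
    merged: List[List[int]] = []
    for group in groups:
        group_size = sum(sizes[i] for i in group)
        if merged and group_size < min_size:
            merged[-1].extend(group)
        else:
            merged.append(group)
    return merged


MB = 1024 * 1024
print(coalesce_partitions([30 * MB] * 4 + [5 * MB], target=64 * MB, min_size=1 * MB))
# [[0, 1], [2, 3], [4]]
```

Only contiguous partitions are merged, which lets a coalesced partition be read as one range of the original shuffle output.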
Skew Detection and Resolution
| Step | Action |
|---|---|
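| Detection | Compare each partition's size to the median partition size |
| Threshold | Larger than skewedPartitionFactor (default 5) × median and larger than skewedPartitionThresholdInBytes (default 256MB) |
| Splitting | Split the skewed partition into sub-partitions of roughly the advisory partition size, along map-output boundaries |
| Redistribution | Join each sub-partition with a copy of the matching partition from the other side |
| Scope | Shuffled joins such as sort-merge joins; aggregations are not split |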
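The detection and splitting steps can be sketched on a list of shuffle partition sizes. This is a simplified model: `find_skewed` and `split_plan` are hypothetical helpers, and splitting into `ceil(size / target)` pieces only approximates Spark, which splits along map-output boundaries and so cannot produce more pieces than there are map tasks.

```python
import math
from statistics import median
from typing import Dict, List


def find_skewed(sizes: List[int], factor: float, threshold: int) -> List[int]:
    """Indices of partitions larger than factor * median AND larger than threshold bytes."""
    med = median(sizes)
    return [i for i, size in enumerate(sizes) if size > factor * med and size > threshold]


def split_plan(sizes: List[int], factor: float, threshold: int, target: int) -> Dict[int, int]:
    """Map each skewed partition index to ceil(size / target) sub-partitions."""
    return {i: math.ceil(sizes[i] / target) for i in find_skewed(sizes, factor, threshold)}


MB = 1024 * 1024
sizes = [50 * MB] * 9 + [600 * MB]  # one hot partition among ten
print(split_plan(sizes, factor=5, threshold=256 * MB, target=64 * MB))  # {9: 10}
```

The absolute threshold keeps small tables from being "optimized": a 100 MB partition next to 1 MB neighbours is relatively skewed but stays below 256 MB, so it is left alone.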
Stage Boundaries
AQE operates at shuffle boundaries in the query plan.
- Collects runtime statistics from completed stage
- Re-optimizes plan for subsequent stages
- Cannot optimize within a single stage
- More opportunities with multiple joins/aggregations
Essential Configuration
| Setting | Purpose |
|---|---|
| spark.sql.adaptive.enabled | Master switch |
| spark.sql.adaptive.coalescePartitions.enabled | Enable partition coalescing |
| spark.sql.adaptive.skewJoin.enabled | Enable skew handling |
| spark.sql.adaptive.autoBroadcastJoinThreshold | Max size for auto broadcast |
Best Practice: Keep AQE enabled for Spark 3.x workloads (it is on by default since Spark 3.2.0); it provides automatic optimization with minimal overhead.
Mathematical Foundations
Definition: Adaptive Query Execution
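AQE dynamically re-optimizes query plans during execution based on runtime statistics. Conceptually, a plan is re-optimized at a stage boundary when the observed statistics $\hat{S}$ differ from the estimated statistics $S$ by more than a threshold $\tau$:
$$\frac{\lvert \hat{S} - S \rvert}{S} > \tau$$
In practice, Spark re-optimizes the remaining plan at every stage boundary rather than testing an explicit threshold, so $\tau$ is a modeling device, not a Spark setting.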
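The re-optimization trigger in this definition can be written as a small predicate. This is a conceptual sketch that assumes the trigger is a relative-error threshold `tau`; `should_reoptimize` is a hypothetical helper, and Spark itself simply re-plans at every stage boundary.

```python
def should_reoptimize(observed: float, estimated: float, tau: float) -> bool:
    """Conceptual trigger: |observed - estimated| / estimated > tau.

    Hypothetical helper; Spark exposes no such check.
    """
    if estimated <= 0:
        # No usable estimate, so any observed statistic is new information
        return True
    return abs(observed - estimated) / estimated > tau


# Estimate of 1M rows, but only 50k rows actually came out of the stage
print(should_reoptimize(observed=50_000, estimated=1_000_000, tau=0.5))  # True
```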
Skew Detection
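A partition $P_i$ is skewed if its size exceeds the median partition size by a factor $k$ (skewedPartitionFactor); Spark additionally requires it to exceed an absolute floor $T$ (skewedPartitionThresholdInBytes):
$$\lvert P_i \rvert > k \cdot \operatorname{median}_j \lvert P_j \rvert \quad \text{and} \quad \lvert P_i \rvert > T$$
AQE splits each skewed partition into sub-partitions of roughly the target size $t$ (advisoryPartitionSizeInBytes), giving approximately $\lceil \lvert P_i \rvert / t \rceil$ sub-partitions.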
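A worked example of the skew rule with illustrative numbers (not measurements): nineteen shuffle partitions of 40 MB and one of 1,024 MB, using factor $k = 5$, Spark's default absolute floor $T = 256$ MB and target size $t = 64$ MB:

$$\operatorname{median} = 40\ \text{MB}, \qquad 1024 > 5 \cdot 40 = 200 \ \text{and} \ 1024 > 256 \;\Rightarrow\; \text{skewed}, \qquad \left\lceil \tfrac{1024}{64} \right\rceil = 16 \ \text{sub-partitions}$$

The nineteen 40 MB partitions fall below $5 \cdot 40 = 200$ MB and are left unchanged.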
Cost-Based Optimization Theorem
Given plan alternatives $p_1, \dots, p_m$ with estimated costs $C(p_i \mid \hat{S})$, AQE selects:
$$p^{*} = \arg\min_{p_i} C(p_i \mid \hat{S})$$
at each re-optimization point, where $\hat{S}$ is the current runtime statistics estimate.
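The argmin selection can be illustrated with an invented cost model. `select_plan` and `toy_cost` are hypothetical: `toy_cost` counts bytes moved over the network for two join strategies and is only an assumption for illustration; Spark's AQE uses its own internal cost evaluation, not this formula.

```python
from typing import Callable, Dict, Sequence

Stats = Dict[str, float]


def select_plan(plans: Sequence[str], cost: Callable[[str, Stats], float], stats: Stats) -> str:
    """p* = argmin over candidate plans of cost(p | stats)."""
    return min(plans, key=lambda p: cost(p, stats))


def toy_cost(plan: str, stats: Stats) -> float:
    """Hypothetical cost model: bytes moved over the network by each join strategy."""
    if plan == "broadcast":
        # Ship the small side to every executor
        return stats["small_bytes"] * stats["num_executors"]
    if plan == "sort_merge":
        # Shuffle both sides
        return stats["small_bytes"] + stats["large_bytes"]
    raise ValueError(f"unknown plan: {plan}")


MB = 1024 * 1024
plans = ["sort_merge", "broadcast"]
runtime = {"small_bytes": 4 * MB, "large_bytes": 2048 * MB, "num_executors": 50}
print(select_plan(plans, toy_cost, runtime))  # broadcast
```

Re-running `select_plan` with a 500 MB small side makes broadcasting cost 25,000 MB of transfer, so the same function returns `sort_merge`; the choice depends entirely on the statistics passed in.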
Dynamic Join Strategy
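For a join of tables $A$ and $B$ with runtime sizes $\lvert A \rvert$ and $\lvert B \rvert$ and broadcast threshold $\theta$ (spark.sql.adaptive.autoBroadcastJoinThreshold):
$$\text{strategy}(A, B) = \begin{cases} \text{BroadcastHashJoin} & \text{if } \min(\lvert A \rvert, \lvert B \rvert) \le \theta \\ \text{SortMergeJoin} & \text{otherwise} \end{cases}$$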
AQE switches strategies mid-query when actual sizes differ from estimates.
AQE Overhead
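The overhead of AQE is:
$$T_{\text{AQE}} = \sum_{i=1}^{n} \left( t_i^{\text{stats}} + t_i^{\text{replan}} \right)$$
where $n$ is the number of re-optimization points and $t_i^{\text{stats}}$, $t_i^{\text{replan}}$ are the time spent collecting statistics and re-planning at point $i$. Target: $T_{\text{AQE}} \ll T_{\text{query}}$.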
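The overhead as a sum over re-optimization points (statistics-collection time plus re-planning time at each point, compared against total query time) can be computed directly. The helper names and the timings below are hypothetical and illustrative, not measurements.

```python
from typing import Sequence


def aqe_overhead(stats_times: Sequence[float], replan_times: Sequence[float]) -> float:
    """T_AQE: sum over re-optimization points of (statistics time + re-planning time)."""
    if len(stats_times) != len(replan_times):
        raise ValueError("expected one stats time and one re-plan time per point")
    return sum(s + r for s, r in zip(stats_times, replan_times))


def overhead_ratio(stats_times: Sequence[float], replan_times: Sequence[float],
                   query_time: float) -> float:
    """Fraction of total query time spent on AQE bookkeeping (target: much less than 1)."""
    return aqe_overhead(stats_times, replan_times) / query_time


# Illustrative timings in seconds for three re-optimization points
ratio = overhead_ratio([0.05, 0.04, 0.06], [0.10, 0.12, 0.08], query_time=60.0)
print(f"{ratio:.4f}")  # 0.0075
```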
Key Insight
AQE is most beneficial for multi-stage queries with intermediate shuffles where cardinality estimates are unreliable. For simple queries with accurate statistics, the gain is small and the re-planning overhead may outweigh it; measure the difference by toggling spark.sql.adaptive.enabled for the session.
Summary
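AQE typically improves query performance by 20-50% through dynamic plan re-optimization, with larger gains when a poorly estimated join can switch to a broadcast join. Its key mechanisms are skew-partition splitting in sort-merge joins, dynamic join strategy selection, and post-shuffle partition coalescing. The cost of re-optimization must be justified by the improvement in plan quality.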
Key Concepts Table
| Concept | Description | Configuration |
|---|---|---|
| AQE Enabled | Master switch for adaptive execution | spark.sql.adaptive.enabled=true |
| Join Strategy Switching | Dynamically change join type based on runtime stats | spark.sql.adaptive.autoBroadcastJoinThreshold |
| Partition Coalescing | Merge small partitions after shuffle | spark.sql.adaptive.coalescePartitions.enabled=true |
| Skew Detection | Identify partitions with disproportionate size | spark.sql.adaptive.skewJoin.skewedPartitionFactor=5 |
| Skew Splitting | Split skewed partitions into sub-partitions | spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB |
| Local Shuffle Reader | Read shuffle output locally (per mapper) once a sort-merge join is converted to a broadcast join, avoiding a network shuffle read | spark.sql.adaptive.localShuffleReader.enabled=true |
| Stage Boundary | Point where AQE collects stats and re-optimizes | Automatic at shuffle boundaries |
| Runtime Statistics | Actual shuffle output sizes, per-partition sizes and row counts | Collected at each stage boundary |
| Min Partition Size | Minimum size for coalesced partitions | spark.sql.adaptive.coalescePartitions.minPartitionSize=1MB |
| Advisory Partition Size | Target size of post-shuffle partitions when coalescing or splitting | spark.sql.adaptive.advisoryPartitionSizeInBytes=64MB |
Code Examples
Enabling and Configuring AQE
from pyspark.sql import SparkSession
from pyspark.sql.functions import *  # col, concat, lit, rand, when, sum, ...

# Configure AQE explicitly (several of these values are already defaults in Spark 3.2+)
spark = SparkSession.builder \
    .appName("AdaptiveQueryExecution") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.minPartitionSize", "1MB") \
    .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5") \
    .config("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB") \
    .config("spark.sql.adaptive.autoBroadcastJoinThreshold", "10MB") \
    .config("spark.sql.adaptive.localShuffleReader.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Create a skewed orders dataset. One random draw per row keeps the buckets exclusive:
#   r < 0.5   -> 10 VIP customers          (~50% of orders, ~500k each)
#   r < 0.6   -> 100 PREMIUM customers     (~10% of orders, ~10k each)
#   otherwise -> 100,000 REGULAR customers (~40% of orders, ~40 each)
r = col("r")
customer_id = when(r < 0.5, concat(lit("VIP-"), (col("id") % 10).cast("string"))) \
    .when(r < 0.6, concat(lit("PREMIUM-"), (col("id") % 100).cast("string"))) \
    .otherwise(concat(lit("REGULAR-"), (col("id") % 100_000).cast("string")))

orders_df = spark.range(0, 10_000_000) \
    .withColumn("r", rand(seed=42)) \
    .withColumn("order_id", concat(lit("ORD-"), col("id").cast("string"))) \
    .withColumn("customer_id", customer_id) \
    .withColumn("amount", (rand(seed=7) * 1000 + 10).cast("decimal(10,2)")) \
    .drop("r")

# Small customers table (REGULAR customers only, so VIP/PREMIUM orders find no match;
# they still inflate the orders-side shuffle partitions)
customers_df = spark.range(0, 100_000) \
    .withColumn("customer_id", concat(lit("REGULAR-"), col("id").cast("string"))) \
    .withColumn("name", concat(lit("Customer_"), col("id").cast("string"))) \
    .withColumn("segment",
                when(rand() < 0.1, "VIP")
                .when(rand() < 0.3, "Premium")
                .otherwise("Regular"))

# Write tables (uses the session's default catalog/warehouse)
orders_df.write.mode("overwrite").saveAsTable("skewed_orders")
customers_df.write.mode("overwrite").saveAsTable("skewed_customers")
AQE Join Strategy Switching
# Query that can benefit from AQE join switching.
# Without AQE: the join strategy is fixed from catalog/file-size estimates, which can
# lead to a SortMergeJoin when an estimate is stale or too high.
# With AQE: once the filtered customers side is measured at runtime, a SortMergeJoin
# can be replaced by a BroadcastHashJoin.

# Raise log verbosity to see planner output (very noisy; the Spark UI SQL tab is easier to read)
spark.sparkContext.setLogLevel("INFO")

# Execute join query
result = spark.sql("""
    SELECT
        o.order_id,
        o.amount,
        c.name,
        c.segment
    FROM skewed_orders o
    JOIN skewed_customers c ON o.customer_id = c.customer_id
    WHERE c.segment = 'VIP'
""")

# Force execution; the final (runtime-adjusted) plan is shown in the Spark UI SQL tab
result.write.mode("overwrite").saveAsTable("aqe_result")

# EXPLAIN plans the statement afresh, so it shows the initial adaptive plan
# (AdaptiveSparkPlan with isFinalPlan=false), not the plan AQE settled on at runtime
spark.sql("""
    EXPLAIN FORMATTED
    SELECT o.order_id, o.amount, c.name
    FROM skewed_orders o
    JOIN skewed_customers c ON o.customer_id = c.customer_id
""").show(truncate=False)

# Verify AQE is active
print(spark.conf.get("spark.sql.adaptive.enabled"))
Skew Detection and Resolution
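from pyspark.sql.functions import col, concat, floor, lit

# Create heavily skewed data for demonstration (generated with spark.range rather
# than a large driver-side Python list)
# Normal keys: key_0 .. key_99, 100 rows each
normal_df = spark.range(0, 10_000).select(
    concat(lit("key_"), floor(col("id") / 100).cast("string")).alias("key"),
    concat(lit("value_"), (col("id") % 100).cast("string")).alias("value"),
    (col("id") % 100).cast("double").alias("amount"),
)
# Skewed key: 1M rows
hot_df = spark.range(0, 1_000_000).select(
    lit("skewed_key").alias("key"),
    concat(lit("value_"), col("id").cast("string")).alias("value"),
    col("id").cast("double").alias("amount"),
)
skewed_df = normal_df.unionByName(hot_df)

# Write skewed data
skewed_df.write.mode("overwrite").saveAsTable("heavily_skewed")

# Aggregation that exposes the skew. AQE's skew handling applies to joins, so this
# GROUP BY is not split; it only shows how lopsided the key distribution is.
spark.sql("""
    SELECT key, COUNT(*) AS cnt, SUM(amount) AS total
    FROM heavily_skewed
    GROUP BY key
    ORDER BY cnt DESC
""").show(10, truncate=False)

# Join with skew
skewed_df2 = spark.createDataFrame(
    [(f"key_{i}", f"lookup_{i}") for i in range(100)] + [("skewed_key", "skewed_lookup")],
    ["key", "lookup_value"],
)

# Skew-join splitting only applies to shuffled joins (e.g. sort-merge). A 101-row table
# would normally be broadcast, so disable broadcasting for this demo, and lower the skew
# threshold and target size because this data is far smaller than the defaults.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "1MB")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "1MB")

# AQE can now split the shuffle partition holding skewed_key into several tasks
result = skewed_df.join(skewed_df2, "key")
result.write.mode("overwrite").saveAsTable("skew_resolved_result")

# EXPLAIN shows only the initial plan; whether the skewed partition was split is
# visible in the final plan in the Spark UI SQL tab after execution
spark.sql("""
    EXPLAIN FORMATTED
    SELECT * FROM heavily_skewed h
    JOIN (SELECT key, lookup_value FROM
          (VALUES ('key_0', 'v0'), ('skewed_key', 'sv')) AS t(key, lookup_value)
    ) l ON h.key = l.key
""").show(truncate=False)

# Restore the settings configured at session start
spark.conf.unset("spark.sql.autoBroadcastJoinThreshold")
spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "10MB")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")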
Partition Coalescing
from pyspark.sql.functions import col, rand
from pyspark.sql.functions import sum as spark_sum

# Demonstrate partition coalescing
# Start with high shuffle partition count
spark.conf.set("spark.sql.shuffle.partitions", "500")

# Small dataset that doesn't need 500 partitions
small_df = spark.range(0, 10_000) \
    .withColumn("key", col("id") % 100) \
    .withColumn("value", rand())

# Without AQE: 500 shuffle partitions (most empty or tiny)
# With AQE: coalesced to a partition count based on actual data size
result = small_df.groupBy("key").agg(spark_sum("value").alias("total"))

# Execute the aggregation and write the result
result.write.mode("overwrite").saveAsTable("coalesced_result")

# Verify the partition count was reduced: each non-empty write task produces one file,
# so a handful of files (instead of up to 100) indicates coalescing
spark.sql("""
    SELECT
        input_file_name() AS file,
        count(*) AS row_count
    FROM coalesced_result
    GROUP BY input_file_name()
""").show(truncate=False)

# For comparison: manual repartitioning to a hand-picked count (lazy; not executed here)
spark.conf.set("spark.sql.shuffle.partitions", "200")
result_explicit = small_df.repartition(20, "key") \
    .groupBy("key").agg(spark_sum("value").alias("total"))

# AQE automatically determines the partition count
spark.conf.set("spark.sql.shuffle.partitions", "500")  # Over-partition on purpose
spark.conf.set("spark.sql.adaptive.enabled", "true")

# AQE will coalesce to an appropriate count
adaptive_result = small_df.groupBy("key").agg(spark_sum("value").alias("total"))
adaptive_result.write.mode("overwrite").saveAsTable("adaptive_coalesced")
Performance Metrics
| Metric | Without AQE | With AQE (Join Switch) | With AQE (Skew Resolve) | With AQE (Coalesce) |
|---|---|---|---|---|
| Join Execution Time | 120-180 sec | 5-15 sec | 30-60 sec | N/A |
| Skew Impact | 10-50x slower | N/A | 2-5x faster | N/A |
| Shuffle Partition Count | Fixed (200) | Fixed (200) | Fixed (200) | Dynamic (10-50) |
| Small File Count | High (200) | High (200) | High (200) | Low (10-50) |
| Memory Usage | High (skew) | Low (broadcast) | Moderate (split) | Low (coalesced) |
| Query Planning Time | 1-2 sec | 2-4 sec | 3-6 sec | 2-3 sec |
| Stage Re-optimization | None | 1-2 per query | 1-3 per query | 1-2 per query |
| Statistics Collection | Pre-execution only | Runtime per stage | Runtime per stage | Runtime per stage |
| Plan Adaptation | Static | Dynamic join type | Dynamic partition split | Dynamic partition count |
| Total Query Improvement | Baseline | 10-30x faster | 2-10x faster | 20-50% faster |
Best Practices
- Keep AQE enabled (spark.sql.adaptive.enabled=true) for Spark 3.x workloads; it is on by default since Spark 3.2.0 and provides automatic optimization with minimal overhead.
- Set spark.sql.adaptive.autoBroadcastJoinThreshold to 10-100MB, depending on driver and executor memory, to enable automatic broadcast join switching.
- Configure skew detection with spark.sql.adaptive.skewJoin.skewedPartitionFactor=5 for most workloads; increase it to 10 for naturally skewed data so that only extreme outliers are split.
- Use spark.sql.adaptive.coalescePartitions.minPartitionSize=1MB to keep AQE from coalescing into partitions smaller than 1MB.
- Monitor AQE decisions in the Spark UI's SQL tab, whose query details show the final adaptive plan (for example, coalesced or skewed shuffle reads and switched join types).
- Keep spark.sql.shuffle.partitions high initially (200-1000) and let AQE coalesce to the optimal count; this avoids under-partitioning.
- Test with spark.sql.adaptive.enabled=false periodically to measure AQE's impact on your specific workloads.
- To see the post-AQE plan, inspect the executed query in the Spark UI; a separate EXPLAIN FORMATTED shows only the initial plan (isFinalPlan=false).
- Combine AQE with dynamic partition pruning for maximum benefit on partitioned tables.
- Give the driver adequate memory, since it gathers and processes runtime statistics at each stage boundary and that volume grows with the number of shuffle partitions.
- Avoid disabling AQE for production workloads unless you have specific compatibility issues; overriding the defaults should be rare.
- Set spark.sql.adaptive.skewJoin.enabled=true explicitly even though it is the default, to ensure it is not overridden by other configurations.
Mathematical Foundation Summary
AQE optimizes queries dynamically using runtime statistics collected at stage boundaries. The coalesced partition count is approximately optimal_partitions = total_shuffle_size / target_shuffle_size. Skew detection flags partitions where partition_size > median_size × skew_factor (and partition_size also exceeds the absolute skew threshold). Skewed partitions are split into split_count = ceil(partition_size / target_size) sub-partitions. AQE reduces query cost by ΔCost = Σ(actual_shuffle_bytes − optimal_shuffle_bytes), which typically amounts to a 20-50% improvement on skewed datasets.
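The partition-count and cost-saving formulas translate directly into code. A minimal sketch with hypothetical helper names; rounding up and the per-stage byte inputs are assumptions, since the formulas do not say how to round or where the byte counts come from.

```python
import math
from typing import Sequence


def optimal_partitions(total_shuffle_size: int, target_shuffle_size: int) -> int:
    """total_shuffle_size / target_shuffle_size, rounded up, never below 1."""
    return max(1, math.ceil(total_shuffle_size / target_shuffle_size))


def delta_cost(actual_shuffle_bytes: Sequence[int], optimal_shuffle_bytes: Sequence[int]) -> int:
    """Sum over stages of (actual - optimal) shuffle bytes; positive means bytes saved."""
    if len(actual_shuffle_bytes) != len(optimal_shuffle_bytes):
        raise ValueError("expected one entry per stage in both sequences")
    return sum(a - o for a, o in zip(actual_shuffle_bytes, optimal_shuffle_bytes))


MB = 1024 * 1024
# 10 GB of shuffle data with a 64 MB target size
print(optimal_partitions(10 * 1024 * MB, 64 * MB))  # 160
```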
See Also
- Partitioning Strategies: Data partitioning approaches
- Join Optimization: Join performance optimization
- Bucketing Strategies (29): Data bucketing approaches
- Cost Optimization (39): Cost optimization patterns
- Production Hardening (40)