
Adaptive Query Execution in PySpark


AQE Architecture & Skew Resolution

[Figure: Architecture Diagram. AQE runtime optimization: Query Plan → Stage 1 (collect stats) → AQE Decision (optimize plan) → Stage 2 (execute optimized). Skew detection & resolution: skewed partition → detect via stats → split into sub-parts → balance workload. Before: 1 partition 10GB, 3 partitions 1GB → After: 4 partitions ~3GB each.]

Detailed Explanation

What is AQE?

Adaptive Query Execution (AQE) is Spark's framework for dynamically optimizing query plans at runtime based on actual data statistics.

Key Takeaway: AQE addresses inaccurate statistics by re-evaluating and modifying the physical plan at stage boundaries.


Three Types of Dynamic Optimizations

| Optimization | Trigger | Benefit |
|---|---|---|
| Join Strategy Switching | Actual size differs from estimate | Sort-merge → broadcast hash join |
| Shuffle Partition Coalescing | Over-partitioning detected | Merge small partitions |
| Skew Partition Splitting | Partition size exceeds threshold | Split skewed partitions |

Join Strategy Switching

AQE switches join types based on actual data size:

| Condition | Strategy |
|---|---|
| Smaller side < spark.sql.adaptive.autoBroadcastJoinThreshold | Broadcast Hash Join |
| Otherwise | Sort-Merge Join |

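Impact: Converting at runtime skips sorting both join sides and, with spark.sql.adaptive.localShuffleReader.enabled=true (the default), reads the already-written shuffle files locally instead of over the network. This is less efficient than planning a broadcast hash join up front, but it can still reduce query time from minutes to seconds.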
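The decision in the table above is small enough to state in code. This is a minimal sketch, assuming the inputs are the observed output sizes of the two join sides and that `threshold` stands in for spark.sql.adaptive.autoBroadcastJoinThreshold (10MB here); `choose_join_strategy` is a hypothetical helper, not a Spark API.

```python
# A minimal, simplified model of AQE's runtime join-strategy check (not Spark code).
# Assumptions: sizes are the observed output sizes of each join side in bytes, and
# `threshold` stands in for spark.sql.adaptive.autoBroadcastJoinThreshold.
from typing import Optional, Tuple

MB = 1024 * 1024


def choose_join_strategy(left_bytes: int, right_bytes: int,
                         threshold: int = 10 * MB) -> Tuple[str, Optional[str]]:
    """Return (strategy, build_side) for an inner equi-join (hypothetical helper)."""
    smaller_side = "left" if left_bytes <= right_bytes else "right"
    smaller_bytes = min(left_bytes, right_bytes)
    # A negative threshold disables broadcasting, mirroring Spark's -1 convention.
    if threshold >= 0 and smaller_bytes < threshold:
        return "BroadcastHashJoin", smaller_side
    return "SortMergeJoin", None


# The static plan expected a large customers side, but after filtering it is 4MB.
print(choose_join_strategy(left_bytes=8_000 * MB, right_bytes=4 * MB))
# A 300MB side stays above the threshold, so the sort-merge join is kept.
print(choose_join_strategy(left_bytes=8_000 * MB, right_bytes=300 * MB))
```

For an inner join either side can serve as the broadcast (build) side, which is why the helper reports which one was chosen.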


Shuffle Partition Coalescing

Addresses over-partitioning when spark.sql.shuffle.partitions is too high.

| Problem | Solution |
|---|---|
| 200 small files created | AQE merges partitions below threshold |
| Unnecessary I/O overhead | Reduces to optimal partition count |

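Configuration: spark.sql.adaptive.advisoryPartitionSizeInBytes sets the target size of coalesced partitions (default 64MB), and spark.sql.adaptive.coalescePartitions.minPartitionSize sets their lower bound (default 1MB). With spark.sql.adaptive.coalescePartitions.parallelismFirst=true (the default), Spark ignores the advisory size when coalescing and only respects the minimum size, to maximize parallelism.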
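The core of coalescing can be sketched as a greedy pass over adjacent post-shuffle partitions: keep merging until the next partition would push the group past the target size. This is a simplified sketch; `coalesce_partitions` is a hypothetical helper, and Spark's implementation additionally respects the minimum partition size and coalesces consistently across all shuffles feeding the same stage.

```python
# Simplified sketch of post-shuffle partition coalescing (not Spark's exact algorithm).
# Assumption: `target` plays the role of the advisory partition size.
from typing import List

MB = 1024 * 1024


def coalesce_partitions(sizes: List[int], target: int) -> List[List[int]]:
    """Group adjacent partition indices so each group stays <= target bytes
    (a single oversized partition forms its own group)."""
    groups: List[List[int]] = []
    current: List[int] = []
    current_size = 0
    for idx, size in enumerate(sizes):
        # Start a new group if adding this partition would exceed the target.
        if current and current_size + size > target:
            groups.append(current)
            current, current_size = [], 0
        current.append(idx)
        current_size += size
    if current:
        groups.append(current)
    return groups


# Ten shuffle partitions with mostly small outputs, coalesced toward a 64MB target.
sizes = [5 * MB, 3 * MB, 40 * MB, 30 * MB, 1 * MB, 2 * MB, 60 * MB, 0, 0, 4 * MB]
groups = coalesce_partitions(sizes, target=64 * MB)
print(len(groups), groups)
```

Only adjacent partitions are merged, so each coalesced partition still reads a contiguous range of the original shuffle output.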


Skew Detection and Resolution

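| Step | Action |
|---|---|
| Detection | Compare each partition's size to the median partition size |
| Threshold | Size > skewedPartitionFactor (default 5) × median and > skewedPartitionThresholdInBytes (default 256MB) |
| Splitting | Split the skewed partition into sub-partitions of roughly advisoryPartitionSizeInBytes along map-output boundaries (keys are not sampled) |
| Redistribution | Replicate the matching partition of the other join side for each sub-partition, so each pair is joined by its own task |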
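Splitting works on map-output boundaries: the skewed reduce partition is divided into contiguous ranges of mapper outputs, and each range is joined against the full matching partition of the other side. The sketch below shows that bookkeeping under stated assumptions: the helper names are hypothetical, and the 256MB target is chosen only for illustration (in Spark the split target is derived from spark.sql.adaptive.advisoryPartitionSizeInBytes).

```python
# Sketch of splitting one skewed reduce partition by contiguous map-output ranges.
# Hypothetical helpers; simplified from Spark's behaviour.
from typing import List, Tuple

MB = 1024 * 1024


def split_by_map_outputs(map_block_sizes: List[int], target: int) -> List[Tuple[int, int]]:
    """Group contiguous mapper blocks into [start, end) ranges of about `target` bytes."""
    ranges: List[Tuple[int, int]] = []
    start, acc = 0, 0
    for i, size in enumerate(map_block_sizes):
        # Close the current range when adding this block would overshoot the target.
        if i > start and acc + size > target:
            ranges.append((start, i))
            start, acc = i, 0
        acc += size
    if map_block_sizes:
        ranges.append((start, len(map_block_sizes)))
    return ranges


def skew_join_tasks(map_block_sizes: List[int], reducer_id: int,
                    target: int) -> List[Tuple[Tuple[int, int], int]]:
    """Pair each slice of the skewed side with the full matching partition of the other side."""
    return [(mapper_range, reducer_id)
            for mapper_range in split_by_map_outputs(map_block_sizes, target)]


# Reduce partition 7 of the large side received 8 map blocks of 100MB each (800MB).
tasks = skew_join_tasks([100 * MB] * 8, reducer_id=7, target=256 * MB)
for mapper_range, other_side_partition in tasks:
    print(f"skewed side mappers {mapper_range} x other side partition {other_side_partition}")
```

Because the split happens at mapper granularity, a skewed partition whose data came from a single map task cannot be divided this way.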

Stage Boundaries

AQE operates at shuffle boundaries in the query plan.

  • Collects runtime statistics from completed stage
  • Re-optimizes plan for subsequent stages
  • Cannot optimize within a single stage
  • More opportunities with multiple joins/aggregations

Essential Configuration

| Setting | Purpose |
|---|---|
| spark.sql.adaptive.enabled | Master switch |
| spark.sql.adaptive.coalescePartitions.enabled | Enable partition coalescing |
| spark.sql.adaptive.skewJoin.enabled | Enable skew handling |
| spark.sql.adaptive.autoBroadcastJoinThreshold | Max size for auto broadcast |

Best Practice: AQE is enabled by default since Spark 3.2.0; keep it on for Spark 3.x+ workloads, since it provides automatic optimization with minimal overhead.

Mathematical Foundations

Definition: Adaptive Query Execution

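AQE dynamically re-optimizes query plans during execution based on runtime statistics. As a simplified model, a plan $P$ is re-optimized at stage boundaries when observed statistics $\hat{S}$ differ from estimated statistics $S$ by more than a threshold $\tau$:

$$\text{Reoptimize}(P) \iff \|S - \hat{S}\| > \tau$$

In Spark itself there is no explicit $\tau$: the remaining plan is re-planned as query stages complete, and the new plan is adopted when its estimated cost is no higher than the current one.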
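The re-optimization condition can be checked numerically. This is a minimal sketch, assuming the statistics are a vector of stage output sizes and that the norm is Euclidean; the text fixes neither, and the values are hypothetical.

```python
# Worked check of Reoptimize(P) <=> ||S - S_hat|| > tau (Euclidean norm assumed).
import math
from typing import Sequence


def should_reoptimize(estimated: Sequence[float], observed: Sequence[float],
                      tau: float) -> bool:
    """Reoptimize(P) iff ||S - S_hat|| > tau, with the Euclidean norm (an assumption)."""
    return math.dist(estimated, observed) > tau


# Statistics as output sizes in GB of two upstream stages (hypothetical values).
S = [2.0, 50.0]  # estimated before execution
print(should_reoptimize(S, [2.1, 0.5], tau=1.0))   # filter was far more selective
print(should_reoptimize(S, [2.1, 49.8], tau=1.0))  # estimates were close
```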

Skew Detection

A partition $i$ is skewed if its size exceeds the median by factor $\alpha$ and also exceeds an absolute size threshold $\theta$ (spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes):

$$\text{Skewed}(i) \iff |P_i| > \alpha \cdot \text{median}(|P_1|, \ldots, |P_k|) \;\land\; |P_i| > \theta$$

AQE splits skewed partitions into sub-partitions of target size $T$.
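The detection rule and the resulting number of sub-partitions can be checked with concrete numbers. This sketch assumes $\alpha = 5$ and $\theta = 256$MB (the defaults of skewedPartitionFactor and skewedPartitionThresholdInBytes; Spark requires the partition to exceed both the scaled median and $\theta$) and $T = 64$MB; `find_skewed` and `split_counts` are hypothetical helpers.

```python
# Worked example of Skewed(i) and the split count ceil(|P_i| / T).
import math
from statistics import median
from typing import Dict, List

MB = 1024 * 1024


def find_skewed(sizes: List[int], alpha: float = 5.0, theta: int = 256 * MB) -> List[int]:
    """Indices i with |P_i| > alpha * median(|P|) and |P_i| > theta."""
    cutoff = max(alpha * median(sizes), theta)
    return [i for i, size in enumerate(sizes) if size > cutoff]


def split_counts(sizes: List[int], skewed: List[int], target: int) -> Dict[int, int]:
    """Number of sub-partitions of size T = target for each skewed partition."""
    return {i: math.ceil(sizes[i] / target) for i in skewed}


# Nine 60MB partitions and one 2000MB partition: median = 60MB, cutoff = max(300MB, 256MB).
sizes = [60 * MB] * 9 + [2000 * MB]
skewed = find_skewed(sizes)
print(skewed, split_counts(sizes, skewed, target=64 * MB))
```

The absolute threshold matters on small data: a 100MB partition beside 1MB partitions is 100x the median, yet stays below 256MB and is left alone.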

Cost-Based Plan Selection

Given plan alternatives $\{P_1, \ldots, P_m\}$ with estimated costs $\{C_1, \ldots, C_m\}$, AQE selects:

$$P^* = \arg\min_{P_j} C_j(\hat{S})$$

at each re-optimization point, where $\hat{S}$ is the current runtime statistics estimate.
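The selection rule can be illustrated with two toy cost functions evaluated on runtime statistics. Both cost models below are hypothetical (network bytes only) and far cruder than anything real; Spark uses its own internal cost evaluator. The sketch only shows the argmin step.

```python
# Sketch of P* = argmin_j C_j(S_hat) with hypothetical cost functions.
from typing import Callable, Dict

Stats = Dict[str, float]


# Hypothetical cost models: bytes moved over the network under each strategy.
def sort_merge_cost(s: Stats) -> float:
    return s["left_bytes"] + s["right_bytes"]          # both sides shuffled


def broadcast_cost(s: Stats) -> float:
    return s["right_bytes"] * s["num_executors"]       # small side copied to every executor


def select_plan(candidates: Dict[str, Callable[[Stats], float]], s_hat: Stats) -> str:
    """P* = argmin_j C_j(S_hat) over the candidate plans."""
    return min(candidates, key=lambda name: candidates[name](s_hat))


plans = {"SortMergeJoin": sort_merge_cost, "BroadcastHashJoin": broadcast_cost}
small = {"left_bytes": 8e9, "right_bytes": 4e6, "num_executors": 50}
large = {"left_bytes": 8e9, "right_bytes": 3e8, "num_executors": 50}
print(select_plan(plans, small))
print(select_plan(plans, large))
```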

Dynamic Join Strategy

For a join of tables $A$ and $B$ with runtime sizes $\widehat{|A|}$ and $\widehat{|B|}$, where $B$ is the smaller side:

$$\text{Strategy} = \begin{cases} \text{BroadcastHash} & \text{if } \widehat{|B|} < \text{threshold} \\ \text{SortMerge} & \text{otherwise} \end{cases}$$

AQE switches strategies mid-query when actual sizes differ from estimates.

AQE Overhead

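The overhead of AQE is:

$$O_{\text{AQE}} = \sum_{i=1}^{r} \left( c_{\text{stats}} \cdot |S_i| + c_{\text{plan}} \cdot |P_i| \right)$$

where $r$ is the number of re-optimization points, $|S_i|$ and $|P_i|$ are the sizes of the statistics collected and of the plan re-optimized at point $i$, and $c_{\text{stats}}$ and $c_{\text{plan}}$ are per-unit costs. Target: $O_{\text{AQE}} / C_{\text{total}} < 0.05$, where $C_{\text{total}}$ is the total query cost.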
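Plugging numbers into the overhead formula shows how the 5% target is checked. Every constant below (per-unit costs, statistics and plan sizes, the 120-second total) is invented for illustration, and `aqe_overhead` is a hypothetical helper.

```python
# Worked example of O_AQE and the O_AQE / C_total < 0.05 target (hypothetical inputs).
from typing import List


def aqe_overhead(stats_sizes: List[int], plan_sizes: List[int],
                 c_stats: float, c_plan: float) -> float:
    """O_AQE = sum over i of (c_stats * |S_i| + c_plan * |P_i|)."""
    if len(stats_sizes) != len(plan_sizes):
        raise ValueError("need one (|S_i|, |P_i|) pair per re-optimization point")
    return sum(c_stats * s + c_plan * p for s, p in zip(stats_sizes, plan_sizes))


# Hypothetical inputs: r = 3 re-optimization points, |S_i| counted as partition-size
# entries, |P_i| as plan nodes, costs in seconds per unit, and C_total = 120 seconds.
overhead = aqe_overhead([200, 200, 50], [12, 9, 5], c_stats=1e-4, c_plan=0.05)
c_total = 120.0
ratio = overhead / c_total
print(f"O_AQE = {overhead:.3f}s, ratio = {ratio:.4f}, within 5% target: {ratio < 0.05}")
```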

Key Insight

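AQE is most beneficial for multi-stage queries with intermediate shuffles where cardinality estimates are unreliable. Spark only applies it to queries that contain an exchange (typically from a join, aggregation, or window) or a subquery. For simple queries with accurate statistics, AQE overhead may exceed its benefit, so measure the effect by toggling spark.sql.adaptive.enabled.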

Summary

AQE can improve query performance by 20-50% through dynamic plan re-optimization. Key mechanisms are skew splitting, dynamic join strategy selection, and post-shuffle partition coalescing. The cost of re-optimization must be justified by the improvement in plan quality.

Key Concepts Table

| Concept | Description | Configuration |
|---|---|---|
| AQE Enabled | Master switch for adaptive execution (on by default since Spark 3.2.0) | spark.sql.adaptive.enabled=true |
| Join Strategy Switching | Dynamically change join type based on runtime stats | spark.sql.adaptive.autoBroadcastJoinThreshold |
| Partition Coalescing | Merge small partitions after shuffle | spark.sql.adaptive.coalescePartitions.enabled=true |
| Skew Detection | Identify partitions with disproportionate size | spark.sql.adaptive.skewJoin.skewedPartitionFactor=5 |
| Skew Splitting | Minimum size before a partition is treated as skewed and split into sub-partitions | spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB |
| Local Shuffle Reader | Read shuffle files locally, without a network fetch, when shuffle partitioning is no longer needed (e.g., after a sort-merge join becomes a broadcast hash join) | spark.sql.adaptive.localShuffleReader.enabled=true |
| Stage Boundary | Point where AQE collects stats and re-optimizes | Automatic at shuffle boundaries |
| Runtime Statistics | Actual output sizes and row counts of completed stages, including per-partition shuffle sizes | Collected at each stage boundary |
| Min Partition Size | Minimum size for coalesced partitions | spark.sql.adaptive.coalescePartitions.minPartitionSize=1MB |
| Advisory Partition Size | Target size of post-shuffle partitions, used for coalescing and skew splitting | spark.sql.adaptive.advisoryPartitionSizeInBytes=64MB |

Code Examples

Enabling and Configuring AQE

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("AdaptiveQueryExecution") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.minPartitionSize", "1MB") \
    .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5") \
    .config("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB") \
    .config("spark.sql.adaptive.autoBroadcastJoinThreshold", "10MB") \
    .config("spark.sql.adaptive.localShuffleReader.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Create skewed dataset
# Orders with a skewed customer distribution: about 0.1% of orders go to 10 VIP
# customers and about 1% to 100 PREMIUM customers, so each of those customers has
# roughly 10x the orders of a regular customer (~1,000 vs ~100)
orders_df = spark.range(0, 10_000_000) \
    .withColumn("order_id", concat(lit("ORD-"), col("id"))) \
    .withColumn("customer_id",
        when(rand() < 0.001, concat(lit("VIP-"), (col("id") % 10)))  # ~10,000 orders over 10 keys
        .when(rand() < 0.01, concat(lit("PREMIUM-"), (col("id") % 100)))  # ~100,000 orders over 100 keys
        .otherwise(concat(lit("REGULAR-"), (col("id") % 100000)))  # ~9.9M orders over 100,000 keys
    ) \
    .withColumn("amount", (rand() * 1000 + 10).cast("decimal(10,2)"))

# Small customers table (only REGULAR- keys exist here, so VIP-/PREMIUM- orders
# find no match in an inner join)
customers_df = spark.range(0, 100_000) \
    .withColumn("customer_id", concat(lit("REGULAR-"), col("id"))) \
    .withColumn("name", concat(lit("Customer_"), col("id"))) \
    .withColumn("segment", 
        when(rand() < 0.1, "VIP")
        .when(rand() < 0.3, "Premium")
        .otherwise("Regular"))

# Write tables
orders_df.write.mode("overwrite").saveAsTable("skewed_orders")
customers_df.write.mode("overwrite").saveAsTable("skewed_customers")

AQE Join Strategy Switching

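# Query that can benefit from AQE join switching
# Without AQE: SortMergeJoin if the static size estimate of skewed_customers
#   exceeds the broadcast threshold
# With AQE: BroadcastHashJoin once the actual size of the filtered customers side is known
# (for a table this small, the static planner may already choose a broadcast)

# Raise log verbosity; AQE's plan-change messages are logged at DEBUG by default,
# so the Spark UI SQL tab is usually the easier place to see its decisions
spark.sparkContext.setLogLevel("INFO")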

# Execute join query
result = spark.sql("""
    SELECT 
        o.order_id,
        o.amount,
        c.name,
        c.segment
    FROM skewed_orders o
    JOIN skewed_customers c ON o.customer_id = c.customer_id
    WHERE c.segment = 'VIP'
""")

# Force execution by writing the result (AQE finalizes the plan during this run)
result.write.mode("overwrite").saveAsTable("aqe_result")

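# EXPLAIN on a new query shows the initial adaptive plan (AdaptiveSparkPlan isFinalPlan=false);
# the post-AQE plan of the query executed above is shown in the Spark UI SQL tab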
spark.sql("""
    EXPLAIN FORMATTED
    SELECT o.order_id, o.amount, c.name
    FROM skewed_orders o
    JOIN skewed_customers c ON o.customer_id = c.customer_id
""").show(truncate=False)

# Verify AQE is active
spark.sql("SET spark.sql.adaptive.enabled").show(truncate=False)

Skew Detection and Resolution

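# Create heavily skewed data for demonstration (generated in Spark, not in a Python list)
# Normal keys: key_0 .. key_99 with 100 rows each
normal_df = spark.range(0, 10_000).select(
    concat(lit("key_"), (col("id") / 100).cast("int")).alias("key"),
    concat(lit("value_"), col("id") % 100).alias("value"),
    (col("id") % 100).cast("double").alias("amount"),
)

# Skewed key (1M rows)
hot_df = spark.range(0, 1_000_000).select(
    lit("skewed_key").alias("key"),
    concat(lit("value_"), col("id")).alias("value"),
    col("id").cast("double").alias("amount"),
)

skewed_df = normal_df.unionByName(hot_df)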

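# In a sort-merge join on `key`, skewed_key lands in one oversized shuffle partition;
# AQE's skew join optimization can split that partition across several tasks
# (it does not apply to the write or the aggregation below)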

# Write skewed data
skewed_df.write.mode("overwrite").saveAsTable("heavily_skewed")

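# Aggregation over the skewed key: AQE's skew join optimization does not apply here,
# but map-side partial aggregation already collapses skewed_key to one row per task
# before the shuffle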
spark.sql("""
    SELECT key, COUNT(*) as cnt, SUM(amount) as total
    FROM heavily_skewed
    GROUP BY key
    ORDER BY cnt DESC
""").show(10, truncate=False)

# Join with skew
skewed_df2 = spark.createDataFrame(
    [(f"key_{i}", f"lookup_{i}") for i in range(100)] + [("skewed_key", "skewed_lookup")],
    ["key", "lookup_value"]
)

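# skewed_df2 has only 101 rows, so this join is normally broadcast and needs no skew
# splitting; splitting applies to sort-merge joins whose skewed partition exceeds both
# skewedPartitionFactor x median and skewedPartitionThresholdInBytes (256MB by default,
# more than this demo's skewed key produces)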
result = skewed_df.join(skewed_df2, "key")
result.write.mode("overwrite").saveAsTable("skew_resolved_result")

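# Inspect the plan: EXPLAIN shows the initial plan only; a join that AQE split for skew
# appears in the final plan (Spark UI SQL tab) as SortMergeJoin(skew=true)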
spark.sql("""
    EXPLAIN FORMATTED
    SELECT * FROM heavily_skewed h
    JOIN (SELECT key, lookup_value FROM 
          (VALUES ('key_0', 'v0'), ('skewed_key', 'sv')) AS t(key, lookup_value)
    ) l ON h.key = l.key
""").show(truncate=False)

Partition Coalescing

# Demonstrate partition coalescing
# Start with high shuffle partition count
spark.conf.set("spark.sql.shuffle.partitions", "500")

# Small dataset that doesn't need 500 partitions
small_df = spark.range(0, 10_000) \
    .withColumn("key", col("id") % 100) \
    .withColumn("value", rand())

# Without AQE: 500 partitions (most empty or tiny)
# With AQE: Coalesced to optimal count based on actual data size

result = small_df.groupBy("key").agg(sum("value").alias("total"))

# Write the result; each non-empty post-shuffle partition becomes one output file
result.write.mode("overwrite").saveAsTable("coalesced_result")

# Verify partition count was reduced: far fewer than 500 files means AQE coalesced partitions
spark.sql("""
    SELECT 
        input_file_name(),
        count(*) as row_count
    FROM coalesced_result
    GROUP BY input_file_name()
""").show(truncate=False)

# For comparison: fixing the partition count manually with repartition (not executed here)
spark.conf.set("spark.sql.shuffle.partitions", "200")
result_explicit = small_df.repartition(20, "key") \
    .groupBy("key").agg(sum("value").alias("total"))

# AQE automatically determines optimal partition count
spark.conf.set("spark.sql.shuffle.partitions", "500")  # Over-partition
spark.conf.set("spark.sql.adaptive.enabled", "true")

# AQE will coalesce to appropriate count
adaptive_result = small_df.groupBy("key").agg(sum("value").alias("total"))
adaptive_result.write.mode("overwrite").saveAsTable("adaptive_coalesced")

Performance Metrics

| Metric | Without AQE | With AQE (Join Switch) | With AQE (Skew Resolve) | With AQE (Coalesce) |
|---|---|---|---|---|
| Join Execution Time | 120-180 sec | 5-15 sec | 30-60 sec | N/A |
| Skew Impact | 10-50x slower | N/A | 2-5x faster | N/A |
| Shuffle Partition Count | Fixed (200) | Fixed (200) | Fixed (200) | Dynamic (10-50) |
| Small File Count | High (200) | High (200) | High (200) | Low (10-50) |
| Memory Usage | High (skew) | Low (broadcast) | Moderate (split) | Low (coalesced) |
| Query Planning Time | 1-2 sec | 2-4 sec | 3-6 sec | 2-3 sec |
| Stage Re-optimization | None | 1-2 per query | 1-3 per query | 1-2 per query |
| Statistics Collection | Pre-execution only | Runtime per stage | Runtime per stage | Runtime per stage |
| Plan Adaptation | Static | Dynamic join type | Dynamic partition split | Dynamic partition count |
| Total Query Improvement | Baseline | 10-30x faster | 2-10x faster | 20-50% faster |

Best Practices

  1. Always enable AQE (spark.sql.adaptive.enabled=true) for Spark 3.x+ workloads; it is on by default since Spark 3.2.0 and provides automatic optimization with minimal overhead
  2. Set spark.sql.adaptive.autoBroadcastJoinThreshold to 10-100MB based on driver and executor memory (broadcast tables are collected on the driver and sent to every executor) to enable automatic broadcast join switching
  3. Configure skew detection with spark.sql.adaptive.skewJoin.skewedPartitionFactor=5 for most workloads; increase to 10 for naturally skewed data
  4. Use spark.sql.adaptive.coalescePartitions.minPartitionSize=1MB to prevent creating partitions smaller than 1MB
  5. Monitor AQE decisions in the Spark UI SQL tab: the final plan (AdaptiveSparkPlan isFinalPlan=true) shows nodes such as AQEShuffleRead (CustomShuffleReader before Spark 3.2) annotated as coalesced, skewed, or local
  6. Keep spark.sql.shuffle.partitions high initially (200-1000) and let AQE coalesce to the optimal count; this avoids under-partitioning
  7. Test with spark.sql.adaptive.enabled=false periodically to measure AQE's impact on your specific workloads
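  8. Check the post-AQE plan after execution in the Spark UI, or call explain() on a DataFrame after an action on that same DataFrame; EXPLAIN FORMATTED on a new query shows only the initial plan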
  9. Combine AQE with dynamic partition pruning for maximum benefit on partitioned tables
  10. Size driver memory with broadcast joins in mind: AQE's runtime statistics are small, but relations it converts to broadcast are collected on the driver
  11. Avoid disabling AQE for production workloads unless you have specific compatibility issues; overriding defaults should be rare
  12. Use spark.sql.adaptive.skewJoin.enabled=true explicitly even though it's the default, to ensure it's not overridden by other configurations

Mathematical Foundation Summary

AQE optimizes queries dynamically using runtime statistics collected at stage boundaries. The coalesced partition count is roughly optimal_partitions = ceil(total_shuffle_size / target_shuffle_size). Skew detection flags partitions where partition_size > median_size × skewedPartitionFactor and partition_size > skewedPartitionThresholdInBytes. Skewed partitions are split into split_count = ceil(partition_size / target_size) sub-partitions. AQE reduces query cost by ΔCost = Σ(actual_shuffle_bytes − optimal_shuffle_bytes), the shuffle work the static plan does beyond what the re-optimized plan needs, which can amount to a 20-50% improvement on skewed datasets.

See also: Bucketing Strategies (29), Cost Optimization (39), Production Hardening (40)
