
Bucketing Strategies in PySpark


Bucketing Architecture

Architecture diagram: Tables A and B are each split into 4 buckets by hash(key) (Bucket 0 to Bucket 3, stored as part-00000 to part-00003). The same bucket number means the same file layout, so same-bucket joins need zero shuffle. Bucket pruning for a filter on the key computes hash(key) % 4, reads 1 file, and skips 3. Created with df.write.bucketBy(4, "key").saveAsTable("t").

Detailed Explanation

What is Bucketing?

Bucketing distributes rows into a fixed number of buckets based on the hash value of one or more columns; each bucket is stored as one or more files.

Key Takeaway: Unlike partitioning (directory structures), bucketing organizes data within a directory into hash-based file groups.


Core Benefits

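Benefit | Description
--- | ---
Eliminate Shuffle | Joins skip the shuffle when both tables are bucketed on the join key with the same bucket count
Bucket Pruning | Point queries skip entire buckets
Sorted Order | Data sorted within each bucket supports efficient range scans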

Hash Function and Bucket Assignment

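  • Uses a Murmur3 hash (Spark's default for bucketing) that produces a signed 32-bit integer
  • Bucket index: pmod(hash(column_value), num_buckets), a modulo that is always non-negative because the hash itself can be negative
  • All rows with the same bucketing column value → the same bucket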
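The following pure-Python sketch mirrors the assignment rule. It is an illustration, not Spark's code: a 32-bit BLAKE2b digest stands in for Murmur3, so the bucket numbers will differ from those of a real Spark table. It shows why the modulo must be non-negative (a signed hash can be negative, and a Java-style remainder would then be negative too) and that equal keys always land in the same bucket. All function names are hypothetical.

```python
import hashlib

def signed_hash32(key: str) -> int:
    """Stand-in for Spark's Murmur3: a 32-bit BLAKE2b digest read as a signed int."""
    digest = hashlib.blake2b(key.encode("utf-8"), digest_size=4).digest()
    return int.from_bytes(digest, "big", signed=True)

def java_remainder(a: int, n: int) -> int:
    """Remainder with the sign of the dividend, like Java's % operator (can be negative)."""
    r = abs(a) % n
    return -r if a < 0 else r

def pmod(a: int, n: int) -> int:
    """Non-negative modulo, like Spark SQL's pmod(); Python's % already behaves this way for n > 0."""
    return a % n

def bucket_id(key: str, num_buckets: int) -> int:
    """Bucket index = pmod(hash(key), num_buckets)."""
    return pmod(signed_hash32(key), num_buckets)

NUM_BUCKETS = 4
for key in ["CUST-000001", "CUST-000042", "CUST-000042", "CUST-123456"]:
    h = signed_hash32(key)
    # The repeated key prints the same hash and bucket; for a negative hash the
    # Java-style remainder is negative and therefore not a valid bucket index.
    print(key, h, java_remainder(h, NUM_BUCKETS), bucket_id(key, NUM_BUCKETS))
```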

Bucketed Join Logic:

  1. Both tables must be bucketed on the join key(s) with the same number of buckets
  2. Each bucket in Table A joins with the corresponding bucket in Table B
  3. No shuffle is required: Spark runs a bucket-aware sort-merge join
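The join rules above can be checked with a small pure-Python model (not Spark internals): both tables are split into the same number of buckets with one hash function, and each bucket of one table is joined only with the same-numbered bucket of the other. Because equal keys always hash to the same bucket, the union of the per-bucket joins equals the full join. A 32-bit BLAKE2b digest stands in for Murmur3, and the data and function names are hypothetical.

```python
import hashlib
from collections import defaultdict

def bucket_of(key: str, num_buckets: int) -> int:
    # Stand-in hash (32-bit BLAKE2b); Spark would use Murmur3 followed by pmod.
    digest = hashlib.blake2b(key.encode("utf-8"), digest_size=4).digest()
    return int.from_bytes(digest, "big", signed=True) % num_buckets

def bucketize(rows, num_buckets):
    """Group rows by the bucket of their key (column 0), as a bucketed write would."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[bucket_of(row[0], num_buckets)].append(row)
    return buckets

def hash_join(left, right):
    """In-memory equi-join on column 0; output = left row + right row without its key."""
    index = defaultdict(list)
    for rrow in right:
        index[rrow[0]].append(rrow)
    return [lrow + rrow[1:] for lrow in left for rrow in index.get(lrow[0], [])]

def bucketed_join(left, right, num_buckets):
    """Join bucket i of `left` only with bucket i of `right`; no row crosses buckets."""
    left_buckets = bucketize(left, num_buckets)
    right_buckets = bucketize(right, num_buckets)
    result = []
    for b in range(num_buckets):
        result.extend(hash_join(left_buckets.get(b, []), right_buckets.get(b, [])))
    return result

# Hypothetical data: (customer_id, order_id, amount) and (customer_id, name)
orders = [(f"CUST-{i % 50:06d}", f"ORD-{i}", i * 10) for i in range(500)]
customers = [(f"CUST-{i:06d}", f"Customer_{i}") for i in range(50)]

full = hash_join(orders, customers)
per_bucket = bucketed_join(orders, customers, num_buckets=8)
print(len(full), len(per_bucket), sorted(full) == sorted(per_bucket))  # 500 500 True
```

In Spark, each bucket pair becomes one task, which is why the bucket counts on both sides have to line up.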

Bucket Pruning Optimization

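Bucket pruning applies when a query has an equality (or IN) predicate on the bucketing column. For a table bucketed on customer_id:

Query Type | Pruning Triggered?
--- | ---
WHERE customer_id = 'CUST-001' | Yes
WHERE amount > 500 | No (range predicate)
WHERE region = 'North' | No (not the bucketing column)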

Performance Impact: With B buckets, a single-value equality filter reads about 1/B of the data; with 256 buckets, that skips over 99% of it.
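A toy model of the pruning decision, assuming a table bucketed on `customer_id` alone with 256 buckets: equality and IN filters on the bucketing column map to a known set of bucket files, while range filters and filters on other columns cannot be mapped and fall back to a full scan. The predicate representation and function names are hypothetical, and a 32-bit BLAKE2b digest again replaces Murmur3.

```python
import hashlib
from typing import Optional, Set

NUM_BUCKETS = 256
BUCKET_COLUMN = "customer_id"  # the table is assumed to be bucketed on this column only

def bucket_of(value: str) -> int:
    # Stand-in hash (32-bit BLAKE2b); Spark uses Murmur3 followed by pmod.
    digest = hashlib.blake2b(value.encode("utf-8"), digest_size=4).digest()
    return int.from_bytes(digest, "big", signed=True) % NUM_BUCKETS

def buckets_to_scan(column: str, op: str, operand) -> Optional[Set[int]]:
    """Bucket ids a scan must read, or None when the filter cannot prune (full scan)."""
    if column != BUCKET_COLUMN:
        return None                      # e.g. WHERE region = 'North'
    if op == "=":
        return {bucket_of(operand)}      # one key -> exactly one bucket
    if op == "IN":
        return {bucket_of(v) for v in operand}
    return None                          # ranges (>, <, BETWEEN): hash order is unrelated to key order

predicates = [
    ("customer_id", "=", "CUST-000001"),
    ("customer_id", "IN", ["CUST-000001", "CUST-000002"]),
    ("customer_id", ">", "CUST-000500"),
    ("amount", ">", 500),
    ("region", "=", "North"),
]
for column, op, operand in predicates:
    selected = buckets_to_scan(column, op, operand)
    files_read = NUM_BUCKETS if selected is None else len(selected)
    print(f"{column} {op} {operand!r}: reads {files_read}/{NUM_BUCKETS} bucket files")
```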


Number of Buckets

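Too Few | Too Many | Optimal
--- | --- | ---
Large files, slow scans | Many small files, metadata overhead | 128-256 MB per file

Calculation: num_buckets = total_data_size / target_file_size. This assumes one file per bucket: Spark writes up to one file per bucket per write task, so repartition on the bucketing column before writing to keep the file count close to num_buckets.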

Important: Bucket count must match between tables for bucketed joins.
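The sizing rule above as a small helper. Rounding up to a power of two is a convention assumed here, not a Spark requirement (Spark accepts any count up to `spark.sql.sources.bucketing.maxBuckets`); it simply makes bucket counts of related tables easy to align. The function name and the example sizes are hypothetical.

```python
import math

def suggest_num_buckets(total_bytes: int, target_file_bytes: int = 256 * 1024**2,
                        round_to_power_of_two: bool = True) -> int:
    """num_buckets = ceil(total_data_size / target_file_size), optionally rounded up to 2**k."""
    if total_bytes <= 0:
        return 1
    n = math.ceil(total_bytes / target_file_bytes)
    if round_to_power_of_two:
        n = 1 << (n - 1).bit_length()   # smallest power of two >= n
    return n

GB = 1024**3
print(suggest_num_buckets(100 * GB))                                   # 512
print(suggest_num_buckets(100 * GB, round_to_power_of_two=False))      # 400
print(suggest_num_buckets(100 * GB, target_file_bytes=128 * 1024**2))  # 1024
```

For a 100 GB table, 256 MB files give 400 buckets; rounding up to 512 yields files of about 200 MB, still inside the 128-256 MB target.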


Sorted Bucketing

Additional optimization within each bucket:

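  • Data sorted by the sortBy columns (usually the bucketing columns)
  • Enables efficient range scans
  • Sort order maintained at file level
  • Concatenating several sorted files does not preserve sort order, so a bucket is fully sorted only when it consists of a single file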
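A small check of how sort order behaves when a bucket holds several sorted files, as happens when several write tasks emit rows for the same bucket. Each file is sorted, but simply concatenating them does not give a sorted bucket; a k-way merge (here `heapq.merge`) is needed to restore the order. The file contents are hypothetical.

```python
import heapq

# Hypothetical contents of one bucket, written by three different tasks.
# Each file is sorted on customer_id (sortBy), but their key ranges overlap.
file_task_0 = ["CUST-000007", "CUST-000263", "CUST-000519"]
file_task_1 = ["CUST-000007", "CUST-000135", "CUST-000775"]
file_task_2 = ["CUST-000391", "CUST-000647"]

files = [file_task_0, file_task_1, file_task_2]
concatenated = [key for f in files for key in f]
merged = list(heapq.merge(*files))

print(concatenated == sorted(concatenated))  # False: concatenation breaks the order
print(merged == sorted(concatenated))        # True: a k-way merge restores it
```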

Bucketing + Partitioning Interaction

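Layer | Creates
--- | ---
Partitioning | Directory structure (e.g., date=2024-01-15/)
Bucketing | Files within each partition directory

Pruning: Partition pruning first selects directories, then bucket pruning selects files within each remaining partition; queries filtering on both the partition column and the bucketing column benefit from both strategies.

Warning: Every partition directory gets its own set of bucket files, so P partitions with B buckets produce roughly P × B files (more if several tasks write to the same bucket); over-partitioning combined with bucketing quickly creates too many small files.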
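A back-of-the-envelope file-count check for the warning above. It assumes each write task emits at most one file per (partition, bucket) pair it holds rows for, which is roughly how Spark's writers behave when `maxRecordsPerFile` is not set; the partition count, task count, and table size are hypothetical.

```python
def max_bucketed_files(num_partitions: int, num_buckets: int, write_tasks: int) -> int:
    """Upper bound: every task may write one file per (partition, bucket) pair it touches."""
    return num_partitions * num_buckets * write_tasks

def avg_file_mb(total_mb: float, num_files: int) -> float:
    return total_mb / num_files

daily_partitions = 365
buckets = 256
total_mb = 100 * 1024  # a hypothetical 100 GB table

# Best case: data repartitioned so a single task writes each (partition, bucket) pair.
best = max_bucketed_files(daily_partitions, buckets, write_tasks=1)
# Worst case: 200 unaligned tasks, each touching every partition and bucket.
worst = max_bucketed_files(daily_partitions, buckets, write_tasks=200)

print(best, f"files, {avg_file_mb(total_mb, best):.2f} MB each on average")
print(worst, f"files, {avg_file_mb(total_mb, worst):.4f} MB each on average")
```

Even in the best case, 365 daily partitions with 256 buckets give 93,440 files averaging about 1.1 MB for 100 GB, far below the 128-256 MB target.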

Mathematical Foundations

Definition: Hash Bucketing

A hash bucketing strategy partitions a dataset $D$ into $B$ buckets via a hash function $h: \text{key} \rightarrow \{0, 1, \ldots, B-1\}$. Bucket $b$ contains:

$$D_b = \{r \in D : h(r.\text{key}) = b\}$$

Bucket Pruning

With $B$ buckets and a uniform hash, each bucket holds on average $|D|/B$ rows, so a point query, or a single bucket pair in an equi-join, scans only:

$$\text{Scanned}(D) = \frac{|D|}{B} \quad \text{(per table, per matching bucket)}$$

Because rows can only match within the same bucket, the number of candidate row pairs in an equi-join drops from $|D_1| \times |D_2|$ to $B \times \frac{|D_1|}{B} \times \frac{|D_2|}{B} = \frac{|D_1| \times |D_2|}{B}$.
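The pair-count reduction can be checked numerically. This sketch assigns each row of two tables a uniformly random bucket (a stand-in for hashing distinct keys), then counts candidate pairs without bucketing, $|D_1| \times |D_2|$, and with bucketing, the sum over buckets of $|D_{1,b}| \times |D_{2,b}|$, and compares the ratio with $B$. The table sizes and the function name are arbitrary choices for illustration.

```python
import random
from collections import Counter

def candidate_pairs(n1: int, n2: int, num_buckets: int, seed: int = 0):
    """Return (unbucketed, bucketed) candidate-pair counts for tables of n1 and n2 rows."""
    rng = random.Random(seed)
    # Model pmod(hash(key), B) over distinct keys as a uniformly random bucket id per row.
    sizes1 = Counter(rng.randrange(num_buckets) for _ in range(n1))
    sizes2 = Counter(rng.randrange(num_buckets) for _ in range(n2))
    unbucketed = n1 * n2                                                # |D1| x |D2|
    bucketed = sum(sizes1[b] * sizes2[b] for b in range(num_buckets))  # sum_b |D1,b| x |D2,b|
    return unbucketed, bucketed

B = 64
full, per_bucket = candidate_pairs(20_000, 5_000, B)
print(f"{full:,} vs {per_bucket:,} pairs: reduction of about {full / per_bucket:.1f}x with B = {B}")
```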

Load Balance Theorem

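For a hash function $h$ mapping $n$ distinct keys uniformly at random to $B$ buckets, with $n \gg B \ln B$, the expected maximum bucket size is approximately:

$$\mathbb{E}[\max_b |D_b|] \approx \frac{n}{B} + \sqrt{\frac{2n \ln B}{B}}$$

The relative imbalance, $\sqrt{2B \ln B / n}$, approaches 0 as $n / (B \ln B) \rightarrow \infty$ for good hash functions. This assumes keys are not skewed: many rows sharing one key always land in a single bucket.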
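A quick simulation of the balls-into-bins approximation, assuming each of $n$ distinct keys lands in a uniformly random bucket (a stand-in for a well-mixed hash). It compares the largest simulated bucket with $\frac{n}{B} + \sqrt{\frac{2n \ln B}{B}}$; the approximation is asymptotic, so expect agreement within a modest margin rather than an exact match. The sizes and the seed are arbitrary.

```python
import math
import random
from collections import Counter

def max_bucket_size(n: int, num_buckets: int, seed: int = 42) -> int:
    """Throw n keys into num_buckets uniformly at random and return the largest bucket."""
    rng = random.Random(seed)
    counts = Counter(rng.randrange(num_buckets) for _ in range(n))
    return max(counts.values())

def predicted_max(n: int, num_buckets: int) -> float:
    """Balls-into-bins approximation: n/B + sqrt(2 n ln B / B)."""
    return n / num_buckets + math.sqrt(2 * n * math.log(num_buckets) / num_buckets)

n, B = 200_000, 256
observed = max_bucket_size(n, B)
print(f"mean {n / B:.1f}, observed max {observed}, predicted max {predicted_max(n, B):.1f}")
print(f"relative imbalance {observed / (n / B) - 1:.3f} vs sqrt(2 B ln B / n) = "
      f"{math.sqrt(2 * B * math.log(B) / n):.3f}")
```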

Sort-Merge Join Cost

Without bucketing, sort-merge join cost is:

$$C = O\left(|D_1| \log |D_1| + |D_2| \log |D_2| + |D_1| + |D_2|\right)$$

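With bucketing, the shuffle is eliminated. If each bucket is also stored sorted on the join key as a single file and the scan exposes that ordering (this depends on the Spark version), the sort is eliminated too: $C_{\text{bucketed}} = O(|D_1| + |D_2|)$. Otherwise each bucket is still sorted locally, for $O\left(|D_1| \log \frac{|D_1|}{B} + |D_2| \log \frac{|D_2|}{B} + |D_1| + |D_2|\right)$.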

Bucket Count Selection

Optimal bucket count $B^*$ minimizes total I/O:

$$B^* = \arg\min_B \left( B \times \left\lceil \frac{|D_1|/B}{S_{\text{block}}} \right\rceil \times \text{cost}_{\text{read}} + \text{overhead}(B) \right)$$

where $S_{\text{block}}$ is the HDFS block size and $\text{overhead}(B)$ is the per-file metadata and task overhead, which grows with $B$.

Key Insight

Bucketing trades write-time computation for read-time savings. The benefit is maximal when the same bucketed column is used for joins across multiple queries. Over-bucketing ($B > |D|/S_{\text{block}}$) creates too many small files, each smaller than one block, hurting I/O.

Summary

Hash bucketing partitions data by key, enabling bucket pruning (point queries read about $1/B$ of the data) and shuffle-free equi-joins (candidate row pairs drop by a factor of $B$). Load balance follows a balls-into-bins model. Optimal bucket count balances block-aligned I/O against overhead. Bucketing is most beneficial for repeated equi-joins on the same key.

Key Concepts Table

Concept | Description | Performance Impact
--- | --- | ---
Bucket Column | Column used for hash-based distribution | Determines join and pruning efficiency
Number of Buckets | Fixed count of hash-based file groups | Must match for bucketed joins
Hash Function | Murmur3 hash for bucket assignment | Roughly uniform distribution across buckets when keys are not skewed
Bucket Pruning | Skip non-matching buckets on query | Reads about 1/B of the data (over 99% skipped with 256 buckets)
Bucketed Join | Join without shuffle on matching buckets | Eliminates the shuffle stage
Sorted Bucketing | Data sorted within each bucket | Enables efficient range scans
Bucket Metadata | Stored in table properties | Required for bucket-aware optimizations
File Size per Bucket | Controlled by num_buckets and data size | Target 128-256 MB per file
Partition + Bucket | Hierarchical layout | Both pruning strategies apply
Bucket Evolution | Changing bucket count | Requires table rewrite

Code Examples

Creating Bucketed Tables

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, expr, lit, lpad, rand, when

spark = SparkSession.builder \
    .appName("BucketingStrategies") \
    .config("spark.sql.sources.bucketing.enabled", "true") \
    .config("spark.sql.sources.bucketing.maxBuckets", "100000") \
    .getOrCreate()

# Create large dataset for bucketing demonstration
# Generate customer orders (10M rows)
# customer_id is zero-padded (CUST-000042) so the point queries below match real rows
orders_df = spark.range(0, 10_000_000) \
    .withColumn("order_id", concat(lit("ORD-"), col("id"))) \
    .withColumn("customer_id", concat(lit("CUST-"), lpad((col("id") % 1_000_000).cast("string"), 6, "0"))) \
    .withColumn("product_id", concat(lit("PROD-"), (col("id") % 50_000))) \
    .withColumn("amount", (rand() * 1000 + 10).cast("decimal(10,2)")) \
    .withColumn("order_date", expr("date_add(DATE '2024-01-01', CAST(id % 365 AS INT))"))

# Create customers dataset (1M rows)
customers_df = spark.range(0, 1_000_000) \
    .withColumn("customer_id", concat(lit("CUST-"), lpad(col("id").cast("string"), 6, "0"))) \
    .withColumn("name", concat(lit("Customer_"), col("id"))) \
    .withColumn("r", rand(seed=42)) \
    .withColumn("segment",  # draw rand() once so the split really is 30/30/40
        when(col("r") < 0.3, "Premium")
        .when(col("r") < 0.6, "Standard")
        .otherwise("Basic")) \
    .drop("r")

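# Write bucketed tables
# Bucket by customer_id with 256 buckets; repartitioning on the same column first
# lets each task write a single file per bucket instead of up to 256 files per task
orders_df.repartition(256, "customer_id").write \
    .bucketBy(256, "customer_id") \
    .sortBy("customer_id") \
    .mode("overwrite") \
    .saveAsTable("bucketed_orders")

customers_df.repartition(256, "customer_id").write \
    .bucketBy(256, "customer_id") \
    .sortBy("customer_id") \
    .mode("overwrite") \
    .saveAsTable("bucketed_customers")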

# Verify bucketing
spark.sql("DESCRIBE EXTENDED bucketed_orders").show(truncate=False)
spark.sql("DESCRIBE EXTENDED bucketed_customers").show(truncate=False)

Bucket Pruning Demonstration

# Bucketed reads (and with them bucket pruning) are on by default; set explicitly for clarity
spark.conf.set("spark.sql.sources.bucketing.enabled", "true")

# Query with bucket pruning (equality predicate on bucket column)
# This should only scan 1/256 of the data
import time

# Point query - bucket pruning enabled
start = time.time()
result1 = spark.sql("""
    SELECT o.order_id, o.amount, c.name
    FROM bucketed_orders o
    JOIN bucketed_customers c ON o.customer_id = c.customer_id
    WHERE o.customer_id = 'CUST-000042'
""")
result1.show(truncate=False)
pruned_time = time.time() - start
print(f"Bucket pruning query time: {pruned_time:.2f} seconds")

# Query without bucket pruning (range predicate)
start = time.time()
result2 = spark.sql("""
    SELECT o.order_id, o.amount, c.name
    FROM bucketed_orders o
    JOIN bucketed_customers c ON o.customer_id = c.customer_id
    WHERE o.amount > 500
""")
result2.show(5, truncate=False)
full_scan_time = time.time() - start
print(f"Full scan query time: {full_scan_time:.2f} seconds")

# Verify bucket pruning in the query plan: the FileScan node should report SelectedBucketsCount: 1 out of 256
spark.sql("""
    EXPLAIN SELECT * FROM bucketed_orders 
    WHERE customer_id = 'CUST-000042'
""").show(truncate=False)

Bucket-Aware Joins

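# Disable broadcast joins so both queries below use a sort-merge join and the comparison is fair
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

# Bucketed join - should NOT shuffle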
start = time.time()
bucketed_join_df = spark.sql("""
    SELECT 
        o.order_id,
        o.amount,
        o.order_date,
        c.name,
        c.segment
    FROM bucketed_orders o
    JOIN bucketed_customers c 
    ON o.customer_id = c.customer_id
""")
bucketed_join_df.count()  # Force execution
bucketed_join_time = time.time() - start
print(f"Bucketed join time: {bucketed_join_time:.2f} seconds")

# Non-bucketed join for comparison
orders_df.write.mode("overwrite").saveAsTable("regular_orders")
customers_df.write.mode("overwrite").saveAsTable("regular_customers")

start = time.time()
regular_join_df = spark.sql("""
    SELECT 
        o.order_id,
        o.amount,
        o.order_date,
        c.name,
        c.segment
    FROM regular_orders o
    JOIN regular_customers c 
    ON o.customer_id = c.customer_id
""")
regular_join_df.count()  # Force execution
regular_join_time = time.time() - start
print(f"Regular join time: {regular_join_time:.2f} seconds")

print(f"Speedup: {regular_join_time / bucketed_join_time:.1f}x")

# Verify no shuffle in bucketed join: the plan should show no Exchange below the SortMergeJoin
spark.sql("""
    EXPLAIN SELECT * FROM bucketed_orders o
    JOIN bucketed_customers c ON o.customer_id = c.customer_id
""").show(truncate=False)

Multi-Column Bucketing

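# Bucket by multiple columns for complex query patterns
# Draw rand() once so each region gets roughly 25% of the rows
complex_orders_df = orders_df \
    .withColumn("r", rand(seed=7)) \
    .withColumn("region",
        when(col("r") < 0.25, "North")
        .when(col("r") < 0.5, "South")
        .when(col("r") < 0.75, "East")
        .otherwise("West")) \
    .drop("r")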

# Write with multi-column bucketing
complex_orders_df.write \
    .bucketBy(16, "region", "customer_id") \
    .sortBy("region", "customer_id") \
    .mode("overwrite") \
    .saveAsTable("multi_bucket_orders")

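# Filter on both bucket columns. Note: Spark's bucket pruning only applies to tables
# bucketed on a single column, so this query still scans all 16 buckets; multi-column
# bucketing mainly avoids shuffles for joins and aggregations on (region, customer_id)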
spark.sql("""
    SELECT region, customer_id, SUM(amount) as total
    FROM multi_bucket_orders
    WHERE region = 'North' AND customer_id = 'CUST-000042'
    GROUP BY region, customer_id
""").show(truncate=False)

# Verify multi-column bucketing
spark.sql("DESCRIBE EXTENDED multi_bucket_orders").show(truncate=False)

Performance Metrics

Metric | Non-Bucketed | 64 Buckets | 256 Buckets | 1024 Buckets
--- | --- | --- | --- | ---
File Count (100GB table) | ~800 files | 64 files | 256 files | 1024 files
Avg File Size | 128 MB (varies) | 1.5 GB | 400 MB | 100 MB
Write Time (10M rows) | 45 seconds | 60 seconds | 75 seconds | 90 seconds
Point Query (bucket pruning) | 8-12 seconds | 2-4 seconds | 1-2 seconds | 0.5-1 second
Range Query (no pruning) | 8-12 seconds | 10-15 seconds | 8-12 seconds | 12-18 seconds
Bucketed Join (2 tables) | 120-180 seconds | 8-15 seconds | 6-10 seconds | 10-20 seconds
Non-Bucketed Join | 120-180 seconds | 120-180 seconds | 120-180 seconds | 120-180 seconds
Shuffle Volume (join) | 100% of data | ~0% (bucketed) | ~0% (bucketed) | ~0% (bucketed)
Memory per Partition | High variance | Uniform | Uniform | Low per partition
Concurrent Query Performance | Degrades | Stable | Optimal | Degrades (too many files)

Best Practices

  1. Match bucket counts between join tables to enable bucket-aware joins; mismatched counts cause at least one side to be shuffled even with bucketing
  2. Target 128-256 MB file size per bucket by calculating num_buckets = total_data_size / target_file_size
  3. Use sortBy within buckets for columns commonly used in range queries to enable efficient intra-bucket scans
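  4. Avoid bucketing on low-cardinality columns (e.g., a region column with 4 values): only as many buckets as there are distinct values can receive data, and the rest stay empty. High-cardinality join keys such as customer_id hash uniformly and make good bucket columns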
  5. Combine partitioning and bucketing when queries commonly filter on both temporal and entity columns
  6. Monitor small file counts—if buckets contain files < 10 MB, reduce the number of buckets
  7. Use INSERT INTO instead of write.saveAsTable for subsequent loads to maintain bucket structure
  8. Keep spark.sql.sources.bucketing.enabled=true (the default); when it is false, Spark treats bucketed tables as ordinary tables, so neither bucket pruning nor shuffle-free bucketed joins apply
  9. Avoid bucket evolution (changing bucket count) as it requires a full table rewrite
  10. Use ANALYZE TABLE after bucketing to update column statistics for the optimizer
  11. Test bucket pruning with EXPLAIN to verify the query plan shows bucket pruning (scan only matching buckets)
  12. Consider bucketing for CDC workloads where upserts on the bucket column benefit from targeted file updates

Mathematical Foundation Summary

Bucketing removes the O(n + m) shuffle of both tables from every join on the bucket key; each of the b bucket pairs is then joined locally at O(n/b + m/b) cost, plus sorting if the buckets are not already sorted. The assignment bucket = pmod(hash(key), b) is deterministic, so rows with the same key land in the same bucket across writes. Files per bucket follow files_per_bucket = bytes_per_bucket / target_file_size. Choosing b at least equal to the available parallelism (total executor cores) keeps every core busy, since a bucketed scan reads each bucket as one task, while files should still stay large enough for efficient I/O. Skew within buckets (for example, max_bucket_rows / avg_bucket_rows > 2) comes from hot keys that bucketing cannot split and may require salting techniques.
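The skew ratio max_bucket_rows / avg_bucket_rows is easy to compute from per-bucket row counts. This pure-Python sketch (hypothetical data, a 32-bit BLAKE2b digest instead of Murmur3) shows why hot keys, not the hash function, cause skew: every row of a single hot key lands in one bucket, and adding buckets cannot split it.

```python
import hashlib
from collections import Counter

def bucket_of(key: str, num_buckets: int) -> int:
    # Stand-in hash (32-bit BLAKE2b); Spark uses Murmur3 followed by pmod.
    digest = hashlib.blake2b(key.encode("utf-8"), digest_size=4).digest()
    return int.from_bytes(digest, "big", signed=True) % num_buckets

def skew_ratio(keys, num_buckets: int) -> float:
    """max_bucket_rows / avg_bucket_rows over all num_buckets buckets."""
    counts = Counter(bucket_of(k, num_buckets) for k in keys)
    avg = len(keys) / num_buckets
    return max(counts.values()) / avg

# 50k customers with two rows each, then one hypothetical hot customer with 10k extra rows
uniform_keys = [f"CUST-{i % 50_000:06d}" for i in range(100_000)]
hot_keys = uniform_keys + ["CUST-000042"] * 10_000

for b in (16, 64, 256):
    print(b, round(skew_ratio(uniform_keys, b), 2), round(skew_ratio(hot_keys, b), 2))
```

The uniform keys stay close to a ratio of 1 at every bucket count, while the hot key pushes the ratio well above 2, and further above it as the bucket count grows.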

See also: Adaptive Query Execution (30), JSON/XML Parsing (28), Cost Optimization (39)
