🪣 Bucketing Strategies in PySpark
Bucketing Architecture
Architecture Diagram
Detailed Explanation
What is Bucketing?
Bucketing distributes a table's rows into a fixed number of files (buckets) based on the hash value of one or more columns.
Key Takeaway: Unlike partitioning (directory structures), bucketing organizes data within a directory into hash-based file groups.
Core Benefits
| Benefit | Description |
|---|---|
| Eliminate Shuffle | When both tables are bucketed on the join key with the same bucket count |
| Bucket Pruning | Skip entire buckets for point queries |
| Sorted Order | Efficient range scans within buckets |
Hash Function and Bucket Assignment
- Uses murmur3hash (Spark's default), producing a 32-bit integer
- Bucket index: hash(column_value) % num_buckets
- All rows with the same bucketing column value → same bucket file
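The assignment rule can be sketched in plain Python. This is an illustration only: zlib.crc32 stands in for Spark's Murmur3 hash, so the bucket numbers will not match what Spark writes, and bucket_for is a hypothetical helper name. Spark takes the modulo so that the result is never negative; Python's % already behaves that way for a positive modulus.

```python
import zlib
from collections import defaultdict


def bucket_for(key: str, num_buckets: int) -> int:
    """Return a bucket index in [0, num_buckets) for a key.

    zlib.crc32 is a stand-in hash chosen for illustration; it is not
    the hash Spark uses.
    """
    return zlib.crc32(key.encode("utf-8")) % num_buckets


# Group a few sample keys into 4 buckets
buckets = defaultdict(list)
for key in ["CUST-000001", "CUST-000002", "CUST-000001", "CUST-000003"]:
    buckets[bucket_for(key, 4)].append(key)

# Both occurrences of CUST-000001 end up in the same bucket
print(dict(buckets))
```

Because the index depends only on the key and the bucket count, two tables written with the same rule and the same count place matching keys in buckets with the same index.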
Bucketed Join Logic:
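- Both tables must be bucketed on the join key with the same number of buckets (Spark 3.1+ can optionally coalesce buckets when one count is a multiple of the other, via spark.sql.bucketing.coalesceBucketsInJoin.enabled)
- Each bucket in Table A joins with the corresponding bucket in Table B
- No shuffle is required: Spark can plan a bucket-aware sort-merge join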
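The bucket-to-bucket pairing can be modelled in a few lines of Python. This is a conceptual sketch, not Spark's implementation: crc32 is a stand-in hash, the helper names are hypothetical, and a per-bucket dictionary lookup replaces the sort-merge step to keep the example short.

```python
import zlib
from collections import defaultdict

NUM_BUCKETS = 4  # both sides must use the same bucket count


def bucket_for(key, num_buckets=NUM_BUCKETS):
    # crc32 is a stand-in hash for illustration, not Spark's Murmur3
    return zlib.crc32(key.encode("utf-8")) % num_buckets


def bucketize(rows):
    """Group (key, value) rows by bucket index."""
    buckets = defaultdict(list)
    for key, value in rows:
        buckets[bucket_for(key)].append((key, value))
    return buckets


def bucketed_join(left_rows, right_rows):
    """Join bucket i of the left side only with bucket i of the right side."""
    left, right = bucketize(left_rows), bucketize(right_rows)
    out = []
    for i in range(NUM_BUCKETS):
        # Index the right-hand bucket by key
        right_index = defaultdict(list)
        for key, value in right.get(i, []):
            right_index[key].append(value)
        # Probe it with the rows of the matching left-hand bucket
        for key, lvalue in left.get(i, []):
            for rvalue in right_index.get(key, []):
                out.append((key, lvalue, rvalue))
    return out


orders = [("c1", 10), ("c2", 20), ("c1", 30), ("c9", 5)]
customers = [("c1", "Ann"), ("c2", "Bob"), ("c3", "Cy")]
joined = sorted(bucketed_join(orders, customers))
print(joined)
```

No row ever has to be compared with a row from a different bucket index, which is why the data does not need to be redistributed before the join.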
Bucket Pruning Optimization
When a query includes an equality predicate on the bucketing column (customer_id in these examples):
| Query Type | Pruning Triggered? |
|---|---|
| WHERE customer_id = 'CUST-001' | Yes |
| WHERE amount > 500 | No |
| WHERE region = 'North' | No |
Performance Impact: A point query reads roughly 1/num_buckets of the bucket files, so it can skip 99%+ of the data once the table has 100 or more buckets.
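To put a number on the pruning benefit, the following sketch assumes rows are spread evenly across buckets and that a point query touches a single bucket. The function name fraction_skipped is hypothetical.

```python
def fraction_skipped(num_buckets: int, buckets_read: int = 1) -> float:
    """Share of bucket files a query can skip when it reads `buckets_read` buckets."""
    if not 0 < buckets_read <= num_buckets:
        raise ValueError("buckets_read must be between 1 and num_buckets")
    return 1 - buckets_read / num_buckets


for n in (16, 100, 256):
    print(n, f"{fraction_skipped(n):.2%}")
# 16 93.75%
# 100 99.00%
# 256 99.61%
```

Under these assumptions the 99% mark is reached at 100 buckets; smaller bucket counts still prune, just less aggressively.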
Number of Buckets
| Too Few | Too Many | Optimal |
|---|---|---|
| Large files, slow scans | Many small files, metadata overhead | 128-256 MB per file |
Calculation: num_buckets = total_data_size / target_file_size
Important: Bucket count must match between tables for bucketed joins.
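The sizing formula can be turned into a small helper. This is a sketch: suggest_num_buckets is a hypothetical name, and rounding up is an assumption made here so that files do not exceed the target size.

```python
import math


def suggest_num_buckets(total_bytes: int, target_file_bytes: int) -> int:
    """num_buckets = total_data_size / target_file_size, rounded up, at least 1."""
    if total_bytes < 0 or target_file_bytes <= 0:
        raise ValueError("sizes must be non-negative and the target must be positive")
    return max(1, math.ceil(total_bytes / target_file_bytes))


GB = 1024 ** 3
MB = 1024 ** 2

print(suggest_num_buckets(100 * GB, 256 * MB))  # 400
print(suggest_num_buckets(100 * GB, 128 * MB))  # 800
```

When two tables of different sizes are joined, one count has to serve both, so in practice the figure is usually derived from the larger table and then reused for the smaller one.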
Sorted Bucketing
Additional optimization within each bucket:
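- Data sorted by the sortBy columns (typically the bucketing columns)
- Enables efficient range scans
- Sort order maintained at file level
- A bucket written as several files is not globally sorted: each file is sorted on its own, so concatenating files does not by itself preserve sort order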
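Why sorted data helps range scans can be shown with a binary search over one sorted file. This is a conceptual model only; columnar readers typically rely on per-block min/max statistics rather than an in-memory search, and range_scan is a hypothetical helper.

```python
from bisect import bisect_left, bisect_right


def range_scan(sorted_keys, low, high):
    """Return the keys in [low, high] from one sorted bucket file."""
    start = bisect_left(sorted_keys, low)   # first position with key >= low
    end = bisect_right(sorted_keys, high)   # first position with key > high
    return sorted_keys[start:end]


bucket_file = ["CUST-000003", "CUST-000010", "CUST-000010", "CUST-000042", "CUST-000077"]
print(range_scan(bucket_file, "CUST-000010", "CUST-000042"))
# ['CUST-000010', 'CUST-000010', 'CUST-000042']
```

The search finds both ends of the range in logarithmic time and then reads one contiguous slice, which is only valid because the file is sorted.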
Bucketing + Partitioning Interaction
| Layer | Creates |
|---|---|
| Partitioning | Directory structure (e.g., date=2024-01-15/) |
| Bucketing | Files within each partition directory |
Pruning: Bucket pruning only works within a partition—queries filtering on both benefit from both strategies.
Warning: Over-partitioning + bucketing can create too many small files.
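A quick estimate shows how fast the file count grows when both layers are combined. The figures below are illustrative assumptions (365 daily partitions, 256 buckets, 100 GB in total), and both helper names are hypothetical.

```python
def estimate_file_count(num_partitions: int, num_buckets: int, files_per_bucket: int = 1) -> int:
    """Upper bound on files: every partition directory holds every bucket."""
    return num_partitions * num_buckets * files_per_bucket


def avg_file_mb(total_gb: float, file_count: int) -> float:
    """Average file size in MB if the data is spread evenly."""
    return total_gb * 1024 / file_count


files = estimate_file_count(365, 256)
print(files, round(avg_file_mb(100, files), 2))
# 93440 1.1
```

At about 1 MB per file this layout is far below the 128-256 MB target, so either the partition granularity or the bucket count would need to come down.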
Mathematical Foundations
Definition: Hash Bucketing
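A hash bucketing strategy partitions a dataset $D$ into $b$ buckets via a hash function $h$ applied to each row's key. Bucket $B_i$ contains:

$$B_i = \{\, r \in D : h(\mathrm{key}(r)) \bmod b = i \,\}, \quad i = 0, \dots, b-1$$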
Bucket Pruning
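For an equi-join on the bucketed columns of tables $A$ and $B$, each with $b$ buckets, only matching buckets need scanning:

$$B_i^{A} \bowtie B_j^{B} \neq \emptyset \implies i = j$$

Join scans reduce from $O(b^2)$ bucket pairs to $O(b)$.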
Load Balance Theorem
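For a hash function mapping $n$ keys uniformly to $b$ buckets (with $n \gg b \log b$), the expected maximum bucket size is:

$$\mathbb{E}\left[\max_i |B_i|\right] = \frac{n}{b} + \Theta\!\left(\sqrt{\frac{n \log b}{b}}\right)$$

Relative load imbalance approaches 0 as $n / (b \log b) \to \infty$ for good hash functions.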
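The balls-into-bins behaviour can be checked with a small simulation. Uniformly random assignment is used here as a model of an ideal hash function, which is an assumption; relative_imbalance is a hypothetical helper.

```python
import random
from collections import Counter


def relative_imbalance(num_keys: int, num_buckets: int, seed: int = 0) -> float:
    """(largest bucket / average bucket) - 1 under uniformly random assignment."""
    rng = random.Random(seed)
    counts = Counter(rng.randrange(num_buckets) for _ in range(num_keys))
    largest = max(counts.values())
    return largest / (num_keys / num_buckets) - 1


# The imbalance should shrink as the number of keys per bucket grows
for n in (1_000, 10_000, 100_000):
    print(n, round(relative_imbalance(n, 10), 4))
```

Real key distributions are rarely this well behaved: a single very frequent key puts all of its rows into one bucket no matter how good the hash is.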
Sort-Merge Join Cost
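Without bucketing, the sort-merge join cost for inputs of $n$ and $m$ rows is:

$$O(n \log n + m \log m)$$

With sorted bucketing, the sort step can be skipped when the engine can rely on each bucket's stored order, leaving only the merge: $O(n + m)$.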
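The merge phase that remains once both inputs are already sorted can be sketched as a single forward pass over each side. This is a minimal illustration in plain Python, not Spark's operator; merge_join is a hypothetical name.

```python
def merge_join(left, right):
    """Inner-join two lists of (key, value) pairs that are already sorted by key."""
    out = []
    i = j = 0
    while i < len(left) and j < len(right):
        lkey, rkey = left[i][0], right[j][0]
        if lkey < rkey:
            i += 1
        elif lkey > rkey:
            j += 1
        else:
            # Find the run of rows with this key on the right side
            j_end = j
            while j_end < len(right) and right[j_end][0] == lkey:
                j_end += 1
            # Pair every left row with this key against that run
            while i < len(left) and left[i][0] == lkey:
                for k in range(j, j_end):
                    out.append((lkey, left[i][1], right[k][1]))
                i += 1
            j = j_end
    return out


orders = [("c1", 10), ("c1", 30), ("c2", 20), ("c9", 5)]
customers = [("c1", "Ann"), ("c2", "Bob"), ("c3", "Cy")]
print(merge_join(orders, customers))
# [('c1', 10, 'Ann'), ('c1', 30, 'Ann'), ('c2', 20, 'Bob')]
```

Neither cursor ever moves backwards past a key it has finished with, so apart from the output itself the work is linear in the two input sizes.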
Bucket Count Selection
Optimal bucket count minimizes total I/O:

$$b^{*} \approx \left\lceil \frac{|D|}{S} \right\rceil$$

where $|D|$ is the total data size and $S$ is the HDFS block size.
Key Insight
Bucketing trades write-time computation for read-time savings. The benefit is maximal when the same bucketed column is used for joins across multiple queries. Over-bucketing ($b \gg |D| / S$, where $b$ is the bucket count, $|D|$ the total data size and $S$ the block size) creates too many small files, hurting I/O.
Summary
Hash bucketing partitions data by key, enabling bucket pruning that reduces join scan by a factor of $b$, the number of buckets. Load balance follows a balls-into-bins model. Optimal bucket count balances block-aligned I/O against overhead. Bucketing is most beneficial for repeated equi-joins on the same key.
Key Concepts Table
| Concept | Description | Performance Impact |
|---|---|---|
| Bucket Column | Column used for hash-based distribution | Determines join and pruning efficiency |
| Number of Buckets | Fixed count of hash-based file groups | Must match for bucketed joins |
| Hash Function | murmur3 hash for bucket assignment | Uniform distribution across buckets |
| Bucket Pruning | Skip non-matching buckets on query | Up to 99%+ reduction in data scanned |
| Bucketed Join | Join without shuffle on matching buckets | Eliminates shuffle stage |
| Sorted Bucketing | Data sorted within each bucket | Enables efficient range scans |
| Bucket Metadata | Stored in table properties | Required for bucket-aware optimizations |
| File Size per Bucket | Controlled by num_buckets and data size | Target 128-256 MB per file |
| Partition + Bucket | Hierarchical layout | Both pruning strategies apply |
| Bucket Evolution | Changing bucket count | Requires table rewrite |
Code Examples
Creating Bucketed Tables
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, date_add, format_string, lit, rand, when

spark = SparkSession.builder \
    .appName("BucketingStrategies") \
    .config("spark.sql.sources.bucketing.enabled", "true") \
    .config("spark.sql.sources.bucketing.maxBuckets", "100000") \
    .getOrCreate()

# Create large dataset for bucketing demonstration
# Generate customer orders (10M rows)
# customer_id is zero-padded to 6 digits so it matches literals such as 'CUST-000042' used in the queries
# Passing a Column as the day offset to date_add is assumed to need PySpark 3.3 or later
orders_df = spark.range(0, 10_000_000) \
    .withColumn("order_id", concat(lit("ORD-"), col("id"))) \
    .withColumn("customer_id", format_string("CUST-%06d", col("id") % 1_000_000)) \
    .withColumn("product_id", concat(lit("PROD-"), (col("id") % 50_000))) \
    .withColumn("amount", (rand() * 1000 + 10).cast("decimal(10,2)")) \
    .withColumn("order_date", date_add(lit("2024-01-01"), (col("id") % 365).cast("int")))

# Create customers dataset (1M rows)
# Draw one random value per row so the segments split about 30% / 30% / 40%;
# calling rand() separately in each branch would give different proportions
customers_df = spark.range(0, 1_000_000) \
    .withColumn("customer_id", format_string("CUST-%06d", col("id"))) \
    .withColumn("name", concat(lit("Customer_"), col("id"))) \
    .withColumn("r", rand()) \
    .withColumn("segment",
        when(col("r") < 0.3, "Premium")
        .when(col("r") < 0.6, "Standard")
        .otherwise("Basic")) \
    .drop("r")
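# Write bucketed tables
# Bucket by customer_id with 256 buckets
# Repartitioning on the bucket column first keeps the output to about one file per bucket;
# without it, every write task can emit its own file for every bucket
orders_df.repartition(256, "customer_id").write \
    .bucketBy(256, "customer_id") \
    .sortBy("customer_id") \
    .mode("overwrite") \
    .saveAsTable("bucketed_orders")

customers_df.repartition(256, "customer_id").write \
    .bucketBy(256, "customer_id") \
    .sortBy("customer_id") \
    .mode("overwrite") \
    .saveAsTable("bucketed_customers")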
# Verify bucketing
spark.sql("DESCRIBE EXTENDED bucketed_orders").show(truncate=False)
spark.sql("DESCRIBE EXTENDED bucketed_customers").show(truncate=False)
Bucket Pruning Demonstration
# Bucketing support is on by default; setting it explicitly guards against a changed session config
spark.conf.set("spark.sql.sources.bucketing.enabled", "true")
# Query with bucket pruning (equality predicate on bucket column)
# This should only scan about 1/256 of the bucket files
import time
# Point query - bucket pruning enabled
start = time.time()
result1 = spark.sql("""
SELECT o.order_id, o.amount, c.name
FROM bucketed_orders o
JOIN bucketed_customers c ON o.customer_id = c.customer_id
WHERE o.customer_id = 'CUST-000042'
""")
result1.show(truncate=False)
pruned_time = time.time() - start
print(f"Bucket pruning query time: {pruned_time:.2f} seconds")
# Query without bucket pruning (range predicate)
start = time.time()
result2 = spark.sql("""
SELECT o.order_id, o.amount, c.name
FROM bucketed_orders o
JOIN bucketed_customers c ON o.customer_id = c.customer_id
WHERE o.amount > 500
""")
result2.show(5, truncate=False)
full_scan_time = time.time() - start
print(f"Full scan query time: {full_scan_time:.2f} seconds")
# Verify bucket pruning in the query plan: the file scan node should report
# a SelectedBucketsCount entry (for example "1 out of 256")
spark.sql("""
EXPLAIN SELECT * FROM bucketed_orders
WHERE customer_id = 'CUST-000042'
""").show(truncate=False)
Bucket-Aware Joins
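# Bucketed join - should NOT shuffle
# Disable broadcast joins so the small customers table does not hide the
# difference between the bucketed and the shuffled plan
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")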
start = time.time()
bucketed_join_df = spark.sql("""
SELECT
o.order_id,
o.amount,
o.order_date,
c.name,
c.segment
FROM bucketed_orders o
JOIN bucketed_customers c
ON o.customer_id = c.customer_id
""")
bucketed_join_df.count() # Force execution
bucketed_join_time = time.time() - start
print(f"Bucketed join time: {bucketed_join_time:.2f} seconds")
# Non-bucketed join for comparison
orders_df.write.mode("overwrite").saveAsTable("regular_orders")
customers_df.write.mode("overwrite").saveAsTable("regular_customers")
start = time.time()
regular_join_df = spark.sql("""
SELECT
o.order_id,
o.amount,
o.order_date,
c.name,
c.segment
FROM regular_orders o
JOIN regular_customers c
ON o.customer_id = c.customer_id
""")
regular_join_df.count() # Force execution
regular_join_time = time.time() - start
print(f"Regular join time: {regular_join_time:.2f} seconds")
print(f"Speedup: {regular_join_time / bucketed_join_time:.1f}x")
# Verify no shuffle in bucketed join: the physical plan should contain
# no Exchange operator on either side of the join
spark.sql("""
EXPLAIN SELECT * FROM bucketed_orders o
JOIN bucketed_customers c ON o.customer_id = c.customer_id
""").show(truncate=False)
Multi-Column Bucketing
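# Bucket by multiple columns for complex query patterns
# Draw one random value per row so each region receives about 25% of the rows
complex_orders_df = orders_df \
    .withColumn("r", rand()) \
    .withColumn("region",
        when(col("r") < 0.25, "North")
        .when(col("r") < 0.5, "South")
        .when(col("r") < 0.75, "East")
        .otherwise("West")) \
    .drop("r")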
# Write with multi-column bucketing
complex_orders_df.write \
.bucketBy(16, "region", "customer_id") \
.sortBy("region", "customer_id") \
.mode("overwrite") \
.saveAsTable("multi_bucket_orders")
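# Query filters and groups on both bucket columns, so the aggregation can reuse the bucket layout
# Note: Spark's bucket pruning appears to apply only to tables bucketed on a single column,
# so check the plan before expecting pruning here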
spark.sql("""
SELECT region, customer_id, SUM(amount) as total
FROM multi_bucket_orders
WHERE region = 'North' AND customer_id = 'CUST-000042'
GROUP BY region, customer_id
""").show(truncate=False)
# Verify multi-column bucketing
spark.sql("DESCRIBE EXTENDED multi_bucket_orders").show(truncate=False)
Performance Metrics
| Metric | Non-Bucketed | 64 Buckets | 256 Buckets | 1024 Buckets |
|---|---|---|---|---|
| File Count (100GB table) | ~800 files | 64 files | 256 files | 1024 files |
| Avg File Size | 128 MB (varies) | 1.5 GB | 400 MB | 100 MB |
| Write Time (10M rows) | 45 seconds | 60 seconds | 75 seconds | 90 seconds |
| Point Query (bucket pruning) | 8-12 seconds | 2-4 seconds | 1-2 seconds | 0.5-1 second |
| Range Query (no pruning) | 8-12 seconds | 10-15 seconds | 8-12 seconds | 12-18 seconds |
| Bucketed Join (2 tables) | 120-180 seconds | 8-15 seconds | 6-10 seconds | 10-20 seconds |
| Non-Bucketed Join | 120-180 seconds | 120-180 seconds | 120-180 seconds | 120-180 seconds |
| Shuffle Volume (join) | 100% of data | ~0% (bucketed) | ~0% (bucketed) | ~0% (bucketed) |
| Memory per Partition | High variance | Uniform | Uniform | Low per partition |
| Concurrent Query Performance | Degrades | Stable | Optimal | Degrades (too many files) |
Best Practices
- Match bucket counts between join tables to enable bucket-aware joins—mismatched counts force a shuffle even with bucketing
- Target 128-256 MB file size per bucket by calculating num_buckets = total_data_size / target_file_size
- Use sortBy within buckets for columns commonly used in range queries to enable efficient intra-bucket scans
- Bucket on a high-cardinality column (e.g., UUIDs) only if it is actually used as a join key or point-lookup filter; the hash spreads rows uniformly either way, but the layout pays off only when queries use that column
- Combine partitioning and bucketing when queries commonly filter on both temporal and entity columns
- Monitor small file counts—if buckets contain files < 10 MB, reduce the number of buckets
- Use INSERT INTO instead of write.saveAsTable for subsequent loads to maintain bucket structure
- Keep spark.sql.sources.bucketing.enabled=true (the default) so bucket-aware joins and bucket pruning stay active
- Avoid bucket evolution (changing bucket count) as it requires a full table rewrite
- Use ANALYZE TABLE after bucketing to update column statistics for the optimizer
- Test bucket pruning with EXPLAIN to verify the query plan shows bucket pruning (scan only matching buckets)
- Consider bucketing for CDC workloads where upserts on the bucket column benefit from targeted file updates
Mathematical Foundation Summary
Bucketing removes the shuffle from the join, which would otherwise move O(n + m) rows across the network, and splits the work into b independent per-bucket joins of about n/b and m/b rows each, where b is the bucket count. The hash function bucket = hash(key) mod b ensures deterministic partitioning across writes. The number of files per bucket is roughly files_per_bucket = bucket_size_bytes / target_file_size. As a rule of thumb, choose b at least as large as the available parallelism so that every task slot has a bucket to process, while keeping files near the target size. A ratio max_bucket_rows / avg_bucket_rows > 2 indicates skew within buckets and may call for salting techniques.
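The skew rule of thumb (max_bucket_rows / avg_bucket_rows > 2) is easy to check once per-bucket row counts are available. The sketch below assumes those counts have already been collected into a list; skew_ratio and is_skewed are hypothetical helpers.

```python
def skew_ratio(bucket_rows):
    """max_bucket_rows / avg_bucket_rows for a list of per-bucket row counts."""
    if not bucket_rows or sum(bucket_rows) == 0:
        raise ValueError("need at least one non-empty bucket")
    avg = sum(bucket_rows) / len(bucket_rows)
    return max(bucket_rows) / avg


def is_skewed(bucket_rows, threshold=2.0):
    """Apply the rule of thumb: flag skew when the ratio exceeds the threshold."""
    return skew_ratio(bucket_rows) > threshold


balanced = [100, 110, 95, 105]
hot_key = [100, 100, 100, 900]

print(is_skewed(balanced))                      # False
print(skew_ratio(hot_key), is_skewed(hot_key))  # 3.0 True
```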
See Also
- Partitioning Strategies — Data partitioning approaches
- Join Optimization — Join performance optimization
- Adaptive Query Execution — AQE and dynamic optimization
- Caching Persistence — Caching strategies