PySpark Partitioning Strategies
Definition: Partitioning
Partitioning is the process of distributing data across multiple partitions for parallel processing. The goal is to minimize shuffle during joins and aggregations while maintaining balanced partition sizes.
Definition: Hash Partitioning
Hash partitioning assigns each record to a partition based on hash(key) % num_partitions. It provides uniform distribution for well-distributed keys but causes skew when key distribution is non-uniform.
Definition: Range Partitioning
Range partitioning assigns records to partitions based on key ranges. Each partition covers a contiguous range of keys. It is used for ORDER BY operations and range-based queries but is vulnerable to skew if data is clustered.
Definition: Coalescing
Coalescing reduces the number of partitions without a full shuffle by combining existing partitions. It is only useful for decreasing partition count and does not help with load balancing.
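Why coalescing does not rebalance data can be illustrated without a cluster. The following is a minimal pure-Python sketch, not Spark's implementation: the helper `coalesce_partitions` and its contiguous grouping rule are assumptions made for illustration. It merges whole partitions and never splits one, so an oversized partition stays oversized.

```python
def coalesce_partitions(partitions, n):
    """Merge existing partitions into at most n groups without splitting any.

    Hypothetical helper: neighbouring partitions are grouped by index.
    """
    if n >= len(partitions):
        # Coalescing never increases the partition count
        return [list(p) for p in partitions]
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        # Map source partition i to one of n contiguous groups
        merged[i * n // len(partitions)].extend(part)
    return merged


# One large partition followed by three small ones
parts = [[1] * 1000, [2] * 10, [3] * 10, [4] * 10]
sizes = [len(p) for p in coalesce_partitions(parts, 2)]
print(sizes)  # [1010, 20]
```

The partition count drops from 4 to 2, but the sizes remain badly unbalanced, which is the case where a full repartition is the better tool.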
$$p = h(k) \bmod N$$
Here,
- $p$ = Partition index assigned to the record
- $h(k)$ = Hash function applied to the partition key $k$
- $N$ = Total number of partitions
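The hash(key) % num_partitions rule can be tried out in plain Python. This is a sketch under stated assumptions: `zlib.crc32` stands in for Spark's internal hash function (Spark does not use CRC32), and `hash_partition` and `partition_counts` are hypothetical helper names. It contrasts well-distributed keys with a dataset dominated by one hot key.

```python
import zlib
from collections import Counter


def hash_partition(key: str, num_partitions: int) -> int:
    """Return a partition index in [0, num_partitions) for a key.

    crc32 is a stand-in hash chosen because it is stable across runs.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def partition_counts(keys, num_partitions):
    """Count how many records land in each partition."""
    counts = Counter(hash_partition(k, num_partitions) for k in keys)
    return [counts.get(i, 0) for i in range(num_partitions)]


# Well-distributed keys: 10,000 distinct ids
uniform_keys = [f"user_{i}" for i in range(10_000)]
# Skewed keys: one hot key accounts for 90% of the records
skewed_keys = ["hot_user"] * 9_000 + [f"user_{i}" for i in range(1_000)]

uniform = partition_counts(uniform_keys, 8)
skewed = partition_counts(skewed_keys, 8)
print("uniform:", uniform)
print("skewed: ", skewed)
```

Because every record with the same key maps to the same index, the partition that receives `hot_user` holds at least 9,000 of the 10,000 skewed records no matter how many partitions are configured.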
Partition Balance Metric
$$B = \frac{\max_i S_i}{\frac{1}{N} \sum_{i=1}^{N} S_i}$$
Here,
- $B$ = Balance factor (1.0 = perfectly balanced, higher = more skewed)
- $S_i$ = Size (in rows or bytes) of partition $i$
- $N$ = Total number of partitions
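The balance factor (largest partition size divided by the average partition size) is simple to compute once per-partition sizes are known. A small sketch, where `balance_factor` is a hypothetical helper name and the sample sizes are made up for illustration:

```python
def balance_factor(partition_sizes):
    """Return max partition size divided by average partition size."""
    if not partition_sizes:
        raise ValueError("at least one partition is required")
    avg = sum(partition_sizes) / len(partition_sizes)
    if avg == 0:
        # All partitions are empty, so nothing is skewed
        return 1.0
    return max(partition_sizes) / avg


print(balance_factor([100, 100, 100, 100]))  # 1.0
print(balance_factor([10, 10, 10, 370]))     # 3.7
```

A value of 3.7 means the slowest task processes 3.7 times the average amount of data, so the stage takes roughly that much longer than a balanced layout would.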
Optimal Partition Count
$$N_{opt} = \max\left(\frac{D}{S_{target}},\; 2C\right)$$
Here,
- $N_{opt}$ = Optimal number of partitions
- $D$ = Total data size in bytes
- $S_{target}$ = Target partition size (128MB-200MB)
- $C$ = Total number of executor cores
Round-robin partitioning (repartition(n)) distributes records evenly across partitions without regard to key values. It evens out partition sizes even when keys are skewed, but it does not co-locate equal keys, so a later key-based join or aggregation still needs a shuffle.
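When joining two datasets, repartition both by the join key, with the same number of partitions, to co-locate matching records on the same executor. Each repartition call is itself a shuffle, but the join can then reuse that partitioning without an additional exchange: df1.repartition("key").join(df2.repartition("key"), "key"). The benefit is largest when the repartitioned data is cached or reused across several joins.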
Co-Partitioning Theorem
Theorem: If two datasets are partitioned by the same partitioner with the same number of partitions, joining them requires zero shuffle: each executor joins its local partitions independently. The total join cost is $P \times (C_{local} + I_{local})$, where $P$ is the partition count, $C_{local}$ is the local compute cost, and $I_{local}$ is the local I/O cost.
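The theorem can be checked on toy data. The sketch below is plain Python rather than Spark, and `partition_of`, `hash_partition_rows`, and `local_join` are hypothetical helpers; `zlib.crc32` is an assumed stand-in for the partitioner's hash. Both datasets are split with the same function and the same partition count, then each partition pair is joined in isolation.

```python
import zlib
from collections import defaultdict


def partition_of(key, num_partitions):
    """Shared partitioner: both datasets must use exactly this function."""
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


def hash_partition_rows(rows, num_partitions):
    """Split (key, value) rows into num_partitions lists by key hash."""
    parts = [[] for _ in range(num_partitions)]
    for key, value in rows:
        parts[partition_of(key, num_partitions)].append((key, value))
    return parts


def local_join(left_part, right_part):
    """Hash join of two partitions that live on the same worker."""
    index = defaultdict(list)
    for key, value in left_part:
        index[key].append(value)
    return [(k, lv, rv) for k, rv in right_part for lv in index.get(k, [])]


users = [(i, f"user_{i}") for i in range(100)]
orders = [(i % 100, f"order_{i}") for i in range(500)]

P = 4
users_parts = hash_partition_rows(users, P)
orders_parts = hash_partition_rows(orders, P)

# Partition p of users only ever meets partition p of orders
joined = [row for p in range(P)
          for row in local_join(users_parts[p], orders_parts[p])]
print(len(joined))  # 500
```

No row crosses a partition boundary during the join, yet every order finds its user, because equal keys were guaranteed to share a partition index on both sides.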
- Hash partitioning: uniform distribution for well-distributed keys
- Range partitioning: contiguous key ranges for ORDER BY and range queries
- Round-robin: best for eliminating skew (use repartition(n))
- Co-partition by join key to eliminate shuffle
- Balance factor B = max_partition_size / avg_partition_size; target B ≈ 1.0
- Target 128MB-200MB per partition for optimal performance
- Use coalesce() to reduce partitions without shuffle; repartition() for full shuffle
Detailed Explanation
1. Why Partitioning Matters
Partitioning determines how data is distributed across the cluster.
Key Benefits:
- Performance: Reduces data shuffling and network I/O
- Parallelism: Enables concurrent processing across executors
- Data locality: Keeps related data together for efficient joins
- Query optimization: Enables partition pruning
2. Round-Robin Partitioning
Round-robin partitioning distributes data evenly across partitions regardless of key values.
Characteristics:
- Equal partition sizes
- No data locality
- Simple distribution
- Used by repartition(n)
When to Use:
- When data is evenly distributed and you need more partitions
- When you don't have a natural partition key
- For load balancing across executors
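The characteristics above (equal sizes, no data locality) follow directly from how records are dealt out. A simplified pure-Python sketch, assuming a plain index-modulo rule; `round_robin` is a hypothetical helper and Spark's actual implementation differs in detail:

```python
def round_robin(records, num_partitions):
    """Deal records to partitions in turn, ignoring their keys."""
    parts = [[] for _ in range(num_partitions)]
    for idx, record in enumerate(records):
        parts[idx % num_partitions].append(record)
    return parts


# 90% of the records share the key "hot"
records = ["hot"] * 9_000 + [f"k{i}" for i in range(1_000)]
parts = round_robin(records, 8)

sizes = [len(p) for p in parts]
partitions_with_hot = sum(1 for p in parts if "hot" in p)
print(sizes)                # [1250, 1250, 1250, 1250, 1250, 1250, 1250, 1250]
print(partitions_with_hot)  # 8
```

Sizes are perfectly even despite the hot key, but that key now appears in all 8 partitions, so any later operation that groups or joins on it must shuffle again.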
3. Hash Partitioning
Hash partitioning assigns records to partitions based on the hash of a key value.
Formula: partition = hash(key) % num_partitions
Characteristics:
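- Same key → same partition
- Enables pruning only when the layout is persisted (bucketed tables, or directories written with DataFrameWriter.partitionBy()); an in-memory repartition() does not reduce the data read
- May cause data skew
- Used by repartition(n, col); RDD.partitionBy() also uses a hash partitioner by default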
When to Use:
- When you frequently filter or join on a specific key
- When you want to co-locate related data
- For partition-level operations
4. Range Partitioning
Range partitioning divides data into ordered ranges.
Characteristics:
- Data ordered across partitions
- Enables range pruning
- Requires boundary calculation
- Used for ordered data
When to Use:
- When you frequently query ranges
- For time-series data
- For ordered analytics
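The boundary calculation mentioned above can be sketched in plain Python. This is an illustration under assumptions, not Spark's code: `range_boundaries` and `range_partition` are hypothetical helpers, and boundaries are taken as quantiles of a random sample, which mirrors the general idea of estimating ranges from a sample.

```python
import bisect
import random


def range_boundaries(sample, num_partitions):
    """Pick num_partitions - 1 split points as quantiles of a sample."""
    ordered = sorted(sample)
    return [ordered[len(ordered) * i // num_partitions]
            for i in range(1, num_partitions)]


def range_partition(key, boundaries):
    """Return the index of the contiguous range that contains key."""
    return bisect.bisect_right(boundaries, key)


random.seed(0)
keys = [random.randint(0, 999) for _ in range(10_000)]
bounds = range_boundaries(random.sample(keys, 500), 4)

parts = [[] for _ in range(4)]
for k in keys:
    parts[range_partition(k, bounds)].append(k)

print("boundaries:", bounds)
print("sizes:", [len(p) for p in parts])
```

Every key in partition i is less than or equal to every key in partition i + 1, so a range filter only needs the partitions whose ranges overlap it. If the keys were clustered around a few values, the sampled quantiles could coincide and leave some partitions nearly empty, which is the skew risk noted for this strategy.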
5. Bucket Partitioning
Bucket partitioning (bucketing) is a file-based partitioning scheme that assigns each row to one of a fixed number of buckets based on the hash of a key and stores each bucket in its own files.
Characteristics:
- Fixed number of buckets per table
- Files are pre-partitioned by key
- Enables bucket joins (no shuffle)
- Persistent across writes
When to Use:
- When you frequently join two large tables
- When you want to avoid shuffle in joins
- For repeated join operations
6. Partition Count Optimization
The number of partitions should be based on:
- Data size: Target 128MB-200MB per partition
- Executor cores: 2-4 partitions per core
- Cluster size: Total partitions = executors × cores × 2-4
- Operation type: More partitions for parallel operations
Formula:
optimal_partitions = max(total_data_size / target_partition_size,
num_executors * cores_per_executor * 2)
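The formula above translates directly into a small helper. This is a sketch with assumed defaults (128 MB target, 10 executors with 4 cores each, 2 partitions per core); `optimal_partitions` is a hypothetical name, and rounding up rather than down is a choice made here so that no partition exceeds the target size.

```python
import math


def optimal_partitions(total_bytes,
                       target_partition_bytes=128 * 1024**2,
                       num_executors=10,
                       cores_per_executor=4,
                       partitions_per_core=2):
    """Return max(size-based count, core-based count)."""
    by_size = math.ceil(total_bytes / target_partition_bytes)
    by_cores = num_executors * cores_per_executor * partitions_per_core
    return max(by_size, by_cores)


GB = 1024**3
print(optimal_partitions(1 * GB))    # 80  (core-based floor wins)
print(optimal_partitions(10 * GB))   # 80  (both terms equal 80)
print(optimal_partitions(100 * GB))  # 800 (size-based term wins)
```

For small inputs the core-based term keeps every core busy; for large inputs the size-based term keeps individual partitions near the target size.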
7. Data Skew and Mitigation
Data skew occurs when some partitions have significantly more data than others.
Symptoms:
- Longer task durations for skewed partitions
- Underutilization of cluster resources
- Memory pressure on executors processing skewed data
Detection:
- Monitor task duration in Spark UI
- Check partition sizes
- Use df.groupBy(spark_partition_id()).count(), where spark_partition_id comes from pyspark.sql.functions
Mitigation:
- Salting: Add random prefix to skewed keys
- Broadcast join: Avoid shuffle for small tables
- AQE: Adaptive Query Execution for automatic skew handling
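Salting can be illustrated without Spark. In the sketch below, `add_salt`, the salt count of 16, and the sample keys are all assumptions for illustration, and `zlib.crc32` is a stand-in for the real partition hash. Only the known hot key is salted, which turns one huge key group into several smaller ones that can hash to different partitions.

```python
import random
import zlib
from collections import Counter

NUM_SALTS = 16
NUM_PARTITIONS = 8


def partition_of(key, num_partitions):
    """Stand-in hash partitioner (not Spark's hash function)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def add_salt(key, num_salts, rng):
    """Prefix the key with a random salt in [0, num_salts)."""
    return f"{rng.randrange(num_salts)}_{key}"


rng = random.Random(42)
keys = ["hot"] * 9_000 + [f"k{i}" for i in range(1_000)]
# Salt only the skewed key; other keys are left untouched
salted_keys = [add_salt(k, NUM_SALTS, rng) if k == "hot" else k for k in keys]

rows_per_key = Counter(keys)
rows_per_salted_key = Counter(salted_keys)
print("largest key group before:", max(rows_per_key.values()))
print("largest key group after: ", max(rows_per_salted_key.values()))

per_partition = Counter(partition_of(k, NUM_PARTITIONS) for k in salted_keys)
print("rows per partition after salting:", sorted(per_partition.values()))
```

In a join, the other side has to be replicated once per salt value so that every salted key still finds its match, and aggregations need a second pass that strips the salt and combines the partial results.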
8. Partition Pruning
Partition pruning eliminates unnecessary partition reads based on filter predicates.
How it Works:
- Filter predicate is analyzed
- Matching partitions are identified
- Only matching partitions are read
- Other partitions are skipped entirely
Example:
-- Table partitioned by year, month
SELECT * FROM events
WHERE year = 2024 AND month = 1
-- Only reads partitions for year=2024, month=1
-- Skips all other partitions
Performance Tip: Partition pruning can provide a 10x-50x speedup when the filter selects a small fraction of the partitions, because only the matching directories are read.
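The pruning steps can be mimicked on a list of file paths. This is a conceptual sketch only: `parse_partition_values` and `prune` are hypothetical helpers, the paths are invented, and a real engine resolves partitions from table metadata or directory listings rather than from this code.

```python
def parse_partition_values(path):
    """Extract {'year': '2024', 'month': '1'} from a key=value directory path."""
    values = {}
    for segment in path.split("/"):
        if "=" in segment:
            name, _, value = segment.partition("=")
            values[name] = value
    return values


def prune(paths, **filters):
    """Keep only the paths whose partition values match every filter."""
    wanted = {name: str(value) for name, value in filters.items()}
    return [
        p for p in paths
        if all(parse_partition_values(p).get(n) == v for n, v in wanted.items())
    ]


paths = [
    f"events/year={year}/month={month}/part-00000.parquet"
    for year in (2023, 2024)
    for month in range(1, 13)
]

selected = prune(paths, year=2024, month=1)
print(selected)  # ['events/year=2024/month=1/part-00000.parquet']
print(f"{len(selected)} of {len(paths)} files read")  # 1 of 24 files read
```

The filter is answered from directory names alone, so the 23 non-matching files are never opened.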
Key Concepts Table
| Strategy | Distribution | Pruning | Skew Risk | Use Case |
|---|---|---|---|---|
| Round-Robin | Equal sizes | None | Low | Load balancing |
| Hash | Key-based | Yes (point) | Medium | Key-based queries |
| Range | Ordered | Yes (range) | Low-Medium | Range queries |
| Bucket | File-based | Yes (join) | Medium | Repeated joins |
| Dynamic | Runtime | Yes (column) | Variable | Adaptive queries |
Code Examples
Example 1: Basic Partitioning
from pyspark.sql import SparkSession
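# Import names explicitly: a wildcard import would shadow Python built-ins
# such as sum, min, and max, which the partition-size helpers below rely on
from pyspark.sql.functions import col, concat, date_add, floor, lit, unix_timestamp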
spark = SparkSession.builder.appName("PartitioningStrategies").getOrCreate()
# Create sample data
df = spark.range(1000000).withColumn(
    "key", col("id") % 100
).withColumn(
    "value", concat(lit("value_"), col("id"))
)
print(f"Initial partitions: {df.rdd.getNumPartitions()}")
# Round-robin repartition (equal sizes)
# Parameter: numPartitions: target number of partitions
# Full shuffle: all data is redistributed
df_rr = df.repartition(20)
print(f"After repartition(20): {df_rr.rdd.getNumPartitions()}")
# Hash repartition by key
# Parameters: numPartitions, *cols: hash partition by specified columns
# Same key always goes to same partition
df_hash = df.repartition(20, "key")
print(f"After repartition(20, key): {df_hash.rdd.getNumPartitions()}")
# Coalesce to reduce partitions
# Parameter: numPartitions: target count; a value above the current count leaves the partitioning unchanged
# No shuffle: combines existing partitions
df_coalesced = df_rr.coalesce(10)
print(f"After coalesce(10): {df_coalesced.rdd.getNumPartitions()}")
# Check partition sizes
def get_partition_sizes(rdd):
    # Count the rows in each partition and return the counts to the driver
    return rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()
print(f"Partition sizes (rr): {get_partition_sizes(df_rr.rdd)[:5]}...")
Example 2: Hash Partitioning for Joins
# Create two large DataFrames
users = spark.range(1000000).withColumn(
    "user_id", col("id")
).withColumn(
    "name", concat(lit("user_"), col("id"))
)
orders = spark.range(5000000).withColumn(
    "order_id", col("id")
).withColumn(
    "user_id", col("id") % 1000000
).withColumn(
    "amount", (col("id") * 10.0) % 1000
)
# Method 1: Repartition both by join key
# Parameters: numPartitions, col: hash partition by join key
users_repartitioned = users.repartition(100, "user_id")
orders_repartitioned = orders.repartition(100, "user_id")
# Join: no additional shuffle at the join, because both sides already share
# the same hash partitioning (100 partitions on user_id)
result = users_repartitioned.join(orders_repartitioned, "user_id")
result.explain()
# Method 2: Use bucketing for persistent partitioning
# Parameters:
# bucketBy(50, "user_id"): 50 hash buckets by user_id
# sortBy("user_id"): sort within each bucket
# saveAsTable("users_bucketed"): persist as Hive table
users.write \
    .bucketBy(50, "user_id") \
    .sortBy("user_id") \
    .saveAsTable("users_bucketed")
orders.write \
    .bucketBy(50, "user_id") \
    .sortBy("user_id") \
    .saveAsTable("orders_bucketed")
# Join bucketed tables (no shuffle!)
users_bucketed = spark.table("users_bucketed")
orders_bucketed = spark.table("orders_bucketed")
result_bucketed = users_bucketed.join(orders_bucketed, "user_id")
result_bucketed.explain()
Example 3: Range Partitioning
# Create time-series data: one row per hour starting at 2024-01-01
from datetime import datetime
base_date = datetime(2024, 1, 1)
time_series = spark.range(1000000).withColumn(
    "timestamp",
    # lit(base_date) is a timestamp literal; cast it to epoch seconds
    # before adding the hourly offset, then cast back to a timestamp
    (lit(base_date).cast("long") + col("id") * 3600).cast("timestamp")
).withColumn(
    "value", col("id") * 1.0
)
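# Range partition by timestamp
# Parameters: numPartitions, *cols: target partition count and range columns
# repartitionByRange samples the data to estimate range boundaries;
# partitions are ordered by range, but rows within a partition are not sorted
df_range = time_series.repartitionByRange(4, "timestamp")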
# Check how rows spread over time (approximate years elapsed since 1970)
df_range.groupBy(
    floor(unix_timestamp("timestamp") / (365 * 24 * 3600)).alias("years_since_epoch")
).count().show()
Example 4: Partition Pruning
# Create partitioned table
events = spark.range(1000000).withColumn(
    "event_id", col("id")
).withColumn(
    "event_date", date_add(lit("2024-01-01"), (col("id") % 365).cast("int"))
).withColumn(
    "user_id", col("id") % 10000
)
# Write with partitioning
# Parameter: partitionBy("event_date"): creates one directory per distinct event_date
# Output: event_date=2024-01-01/part-00000.parquet
events.write \
    .partitionBy("event_date") \
    .mode("overwrite") \
    .parquet("events_partitioned/")
# Read and query with partition pruning
events_df = spark.read.parquet("events_partitioned/")
# This query only reads partitions for January 2024
result = events_df.filter(
    col("event_date").between("2024-01-01", "2024-01-31")
)
# Check physical plan for partition pruning
result.explain()
# Verify partition pruning in Spark UI
# Look for "PartitionFilters" in the scan
Performance Metrics
| Strategy | 1GB Data | 10GB Data | Shuffle (MB) | Pruning Speedup |
|---|---|---|---|---|
| Round-Robin | 5.0s | 45s | 1000 | None |
| Hash (100 parts) | 4.5s | 40s | 1000 | 10-50x |
| Range (100 parts) | 4.8s | 42s | 1000 | 5-20x |
| Bucket (50 buckets) | 3.0s | 25s | 0 (join) | 10-50x |
| Coalesce (↓) | 3.5s | 30s | 0 | None |
| Repartition (↑) | 6.0s | 55s | 1000 | None |
Best Practices
1. Choose Right Partition Count
# Target 128MB-200MB per partition
data_size_gb = 10 # 10GB
target_partition_mb = 128
optimal_partitions = int(data_size_gb * 1024 / target_partition_mb)
print(f"Optimal partitions: {optimal_partitions}") # ~80
# Consider executor cores
num_executors = 10
cores_per_executor = 4
max_partitions = num_executors * cores_per_executor * 4
print(f"Max partitions: {max_partitions}") # 160
2. Use Hash Partitioning for Joins
# Partition both DataFrames by join key
# Parameters: numPartitions, col: hash partition
df1_partitioned = df1.repartition(100, "join_key")
df2_partitioned = df2.repartition(100, "join_key")
# Join (no additional shuffle; the repartition calls above already shuffled both sides)
result = df1_partitioned.join(df2_partitioned, "join_key")
3. Use Bucketing for Repeated Joins
# Write bucketed tables
# bucketBy(numBuckets, *cols): hash into fixed buckets
# sortBy(*cols): sort within buckets
df1.write.bucketBy(100, "key").sortBy("key").saveAsTable("t1_bucketed")
df2.write.bucketBy(100, "key").sortBy("key").saveAsTable("t2_bucketed")
# Join bucketed tables (no shuffle)
result = spark.table("t1_bucketed").join(spark.table("t2_bucketed"), "key")
4. Partition Large Tables for Query Optimization
# Partition by frequently filtered columns
# Parameter: partitionBy(*cols): creates directory hierarchy
df.write.partitionBy("year", "month").parquet("output/")
# Enables partition pruning
df_filtered = spark.read.parquet("output/").filter(
    (col("year") == 2024) & (col("month") == 1)
)
5. Monitor Partition Distribution
# Check partition sizes
partition_sizes = df.rdd.mapPartitions(
    lambda it: [sum(1 for _ in it)]
).collect()
print(f"Partition sizes: {partition_sizes}")
print(f"Min: {min(partition_sizes)}, Max: {max(partition_sizes)}")
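# Use max / avg (the balance factor B); max / min fails when a partition is empty
avg_size = sum(partition_sizes) / len(partition_sizes)
print(f"Balance factor (max / avg): {max(partition_sizes) / avg_size:.2f}")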
6. Avoid Over-Partitioning
# BAD: Too many small partitions
df.repartition(10000) # Creates 10K tiny partitions
# GOOD: Right-sized partitions
df.repartition(100) # Creates 100 manageable partitions