PySpark Transformation Types
Narrow Transformation
A narrow transformation is one where each partition of the parent RDD contributes to at most one partition of the child RDD. No data movement (shuffle) is required. Examples: map, filter, flatMap, mapPartitions, union.
Wide Transformation
A wide transformation (shuffle transformation) is one where each partition of the parent RDD may be depended on by multiple child partitions. Data must be redistributed across the network via shuffle. Examples: groupByKey, reduceByKey, join, distinct, repartition.
Shuffle Barrier
A shuffle barrier is a point in the execution plan where data must be redistributed across partitions. Wide transformations create shuffle barriers, forcing the DAG Scheduler to split the plan into separate stages.
Stage
A stage is a set of tasks that can be executed without data exchange (shuffle). Stages are created at shuffle barriers. Within each stage, narrow transformations are pipelined into a single task per partition.
Task
A task is the smallest unit of execution in Spark. Each task processes one partition of data within a stage. The number of tasks equals the number of partitions in the stage.
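Shuffle Cost
$$C_{shuffle} = N_{partitions} \times \left(C_{write} + C_{network} + C_{read}\right) + C_{sort}$$
Here,
- $N_{partitions}$ = Number of partitions involved in the shuffle
- $C_{write}$ = Cost of serializing and writing shuffle blocks to disk
- $C_{network}$ = Network transfer cost across executors
- $C_{read}$ = Cost of deserializing shuffle blocks on the reducer side
- $C_{sort}$ = Cost of sorting within each partition (if applicable)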
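To make the cost model concrete, here is a minimal plain-Python sketch (no Spark required) that evaluates it as partitions × (write + network + read) + sort. The function name `shuffle_cost` is hypothetical and the per-partition costs are invented inputs for illustration, not measurements.

```python
def shuffle_cost(n_partitions, c_write, c_network, c_read, c_sort=0.0):
    """Shuffle cost model: n_partitions * (write + network + read) + sort.

    All costs share one unit (e.g. seconds). The model measures total work,
    not wall-clock time: it ignores parallelism across executors and any
    overlap between the write, network and read phases.
    """
    return n_partitions * (c_write + c_network + c_read) + c_sort


# Hypothetical per-partition costs in seconds (invented for illustration)
cost = shuffle_cost(n_partitions=200, c_write=0.05, c_network=0.10, c_read=0.04, c_sort=2.0)
print(f"Estimated shuffle cost: {cost:.1f}s")  # Estimated shuffle cost: 40.0s
```

Because the partition count multiplies the per-partition terms, shaving a few milliseconds off serialization or transfer matters more as the shuffle gets wider.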
Shuffle Partition Size Estimate
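$$S_{partition} = \frac{N_{rows} \times W_{row} \times F_{skew}}{N_{out}}$$
Here,
- $S_{partition}$ = Estimated size of each output partition after shuffle
- $N_{rows}$ = Total number of rows
- $W_{row}$ = Average row width in bytes
- $F_{skew}$ = Skew factor (1.0 = uniform, >1.0 = skewed)
- $N_{out}$ = Number of output partitions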
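The partition-size estimate multiplies row count, average row width and skew factor, then divides by the number of output partitions. The plain-Python sketch below evaluates it and, inverting the formula, picks a partition count for a target size. Both function names are hypothetical, and the 128 MiB target is an assumed value chosen for the example, not a Spark default.

```python
import math


def estimated_partition_bytes(n_rows, row_width_bytes, skew_factor, n_out_partitions):
    """rows * average row width * skew factor / number of output partitions."""
    return n_rows * row_width_bytes * skew_factor / n_out_partitions


def partitions_for_target(n_rows, row_width_bytes, skew_factor, target_bytes):
    """Hypothetical helper: smallest partition count keeping the estimate <= target_bytes."""
    total = n_rows * row_width_bytes * skew_factor
    return max(1, math.ceil(total / target_bytes))


TARGET = 128 * 1024 * 1024  # assumed target of 128 MiB per partition

# 1 billion rows of ~100 bytes, uniform keys, 200 output partitions
print(estimated_partition_bytes(1_000_000_000, 100, 1.0, 200))  # 500000000.0
print(partitions_for_target(1_000_000_000, 100, 1.0, TARGET))   # 746
# The same data with a skew factor of 3.0 needs roughly three times as many partitions
print(partitions_for_target(1_000_000_000, 100, 3.0, TARGET))   # 2236
```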
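Pipeline Efficiency
$$E_{pipeline} = \frac{T_{compute}}{T_{compute} + T_{shuffle}}$$
Here,
- $E_{pipeline}$ = Pipeline efficiency (0 to 1, higher is better)
- $T_{compute}$ = Time spent on actual computation
- $T_{shuffle}$ = Time spent on shuffle operations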
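Pipeline efficiency is compute time divided by compute plus shuffle time. A short sketch of the arithmetic, using hypothetical timings (for instance, totals you might read off the Spark UI), shows how a shuffle-dominated job drops toward 0:

```python
def pipeline_efficiency(t_compute, t_shuffle):
    """Share of time spent computing rather than shuffling, between 0 and 1."""
    total = t_compute + t_shuffle
    if total <= 0:
        raise ValueError("t_compute + t_shuffle must be positive")
    return t_compute / total


# Hypothetical timings in seconds
print(pipeline_efficiency(t_compute=30.0, t_shuffle=10.0))  # 0.75
print(pipeline_efficiency(t_compute=30.0, t_shuffle=90.0))  # 0.25
```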
Map-Side Combine Reduction
$$R_{combine} = \frac{S_{before}}{S_{after}}$$
Here,
- $R_{combine}$ = Reduction factor achieved by map-side combine
- $S_{before}$ = Shuffle data size without combine
- $S_{after}$ = Shuffle data size after combine
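The reduction factor depends entirely on how often keys repeat inside each map partition. This plain-Python model of a word count (a simplification that counts shuffled records rather than bytes; `shuffled_records` is a hypothetical name) compares groupByKey-style shuffling with reduceByKey-style map-side combine:

```python
from collections import Counter


def shuffled_records(partitions, map_side_combine):
    """Count (word, count) records a word count sends through the shuffle.

    partitions: one list of words per map-side partition.
    """
    sent = 0
    for words in partitions:
        if map_side_combine:
            # reduceByKey-style: one partial (word, n) record per distinct word per partition
            sent += len(Counter(words))
        else:
            # groupByKey-style: every (word, 1) record crosses the network
            sent += len(words)
    return sent


partitions = [
    ["spark"] * 500 + ["hello"] * 500,
    ["spark"] * 700 + ["world"] * 300,
]
before = shuffled_records(partitions, map_side_combine=False)  # 2000
after = shuffled_records(partitions, map_side_combine=True)    # 4
print(f"Reduction factor: {before / after:.0f}x")              # Reduction factor: 500x
```

If every word were distinct within its partition, both counts would be equal and the factor would be 1, which is why the reduction factors quoted in this section are typical rather than guaranteed.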
Wide transformations introduce shuffle barriers in the DAG. The DAG Scheduler splits the execution plan into stages at each shuffle boundary. Within each stage, narrow transformations are pipelined into a single task.
Prefer reduceByKey over groupByKey when possible. reduceByKey aggregates values on the map side (before the shuffle), which often cuts the data sent over the network by 10x to 100x when keys repeat many times within each partition.
Data skew ($F_{skew} \gg 1$) causes stragglers: the slowest partition determines overall stage completion time. Use salting, AQE skew join handling, or custom partitioning to mitigate skew.
Shuffle Bottleneck Theorem
Theorem: The running time of any wide transformation is limited by its slowest phase, $\max(T_{write}, T_{network}, T_{read})$, and each phase finishes only when its slowest partition does. This is why data skew ($F_{skew} \gg 1$) causes stragglers: the slowest partition determines overall stage completion time.
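The straggler effect can be checked with a few lines of arithmetic. The sketch below assumes enough cores to run every task at once and a uniform processing rate per record; it also uses one plausible definition of the skew factor (largest partition divided by the mean), which the text does not pin down. Function names are hypothetical.

```python
def skew_factor(partition_sizes):
    """One possible definition: largest partition divided by the mean (1.0 = uniform)."""
    mean = sum(partition_sizes) / len(partition_sizes)
    return max(partition_sizes) / mean


def stage_seconds(partition_sizes, records_per_second):
    """Stage time if all tasks run in parallel: the slowest task decides."""
    return max(size / records_per_second for size in partition_sizes)


uniform = [250_000, 250_000, 250_000, 250_000]
skewed = [850_000, 50_000, 50_000, 50_000]  # same 1,000,000 records in total

for name, sizes in [("uniform", uniform), ("skewed", skewed)]:
    print(f"{name}: F_skew={skew_factor(sizes):.1f}, "
          f"stage time={stage_seconds(sizes, 100_000):.1f}s")
# uniform: F_skew=1.0, stage time=2.5s
# skewed: F_skew=3.4, stage time=8.5s
```

The total work is identical in both cases; only the largest partition changes, and the stage time grows with it.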
- Narrow transformations: 1:1 partition mapping, no shuffle, pipelineable
- Wide transformations: M:N partition mapping, shuffle required, define stage boundaries
- Shuffle cost = N_partitions × (write + network + read) + sort
- reduceByKey reduces shuffle volume by aggregating map-side before the shuffle
- Data skew (F_{skew} >> 1) causes stragglers and OOM in shuffle operations
- Pipeline efficiency = compute / (compute + shuffle)
- Map-side combine: R_combine = before / after (10-100x reduction typical)
[Figure: Narrow vs Wide Partition Mapping (architecture diagram)]
Detailed Explanation
1. Narrow Transformations
Narrow transformations are operations where each input partition contributes to at most one output partition.
Characteristics:
- 1:1 mapping between input and output partitions
- No data movement across the network
- Can be pipelined (executed together in one stage)
- Efficient memory usage and low latency
Examples:
- map(func): Apply function to each element
- flatMap(func): Apply function that returns an iterator, then flatten the results
- filter(func): Keep elements where function returns True
- mapPartitions(func): Apply function to each partition
- mapPartitionsWithIndex(func): Like mapPartitions, but the function also receives the partition index
- union(other): Return the union of two RDDs
- sample(withReplacement, fraction): Random sample
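Pipelining can be pictured with a plain-Python analogy: chaining generators lets each record flow through map, filter and flatMap before the next record is read, without building an intermediate collection between the steps. This is only a model of the idea, not Spark's internal code, and `run_narrow_pipeline` is a hypothetical name.

```python
def run_narrow_pipeline(partition):
    """map -> filter -> flatMap over one partition, chained lazily with generators.

    Each record passes through all three steps before the next one is read,
    so no intermediate collection is built between the steps.
    """
    mapped = (x * 2 for x in partition)                # map(lambda x: x * 2)
    filtered = (x for x in mapped if x % 3 == 0)        # filter(lambda x: x % 3 == 0)
    return [y for x in filtered for y in (x, x * 10)]   # flatMap(lambda x: [x, x * 10])


partitions = [[1, 2, 3], [4, 5, 6]]
# One "task" per partition; records never leave their partition.
results = [run_narrow_pipeline(p) for p in partitions]
print(results)  # [[6, 60], [12, 120]]
```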
2. Wide Transformations
Wide transformations are operations where a single input partition may be needed by multiple output partitions.
Characteristics:
- M:N mapping between input and output partitions
- Data movement across the network (shuffle)
- Creates stage boundaries
- Expensive (network I/O, disk I/O, serialization)
Examples:
- groupByKey(): Group values by key
- reduceByKey(func): Reduce values by key
- join(other): Join two RDDs by key
- repartition(n): Reshuffle into n partitions
- distinct(): Remove duplicates
- sortByKey() / sortBy(keyfunc): Sort elements
- coalesce(n, shuffle=True): Reshuffle into n partitions (the default coalesce(n) only merges partitions and is narrow)
3. Shuffle Deep Dive
A shuffle is the process of redistributing data across partitions, typically across the cluster network. Shuffles are usually the most expensive operations in a Spark job because they combine serialization, disk I/O, and network I/O.
Shuffle Stages:
- Map Phase: Each executor writes shuffle files to local disk
- Fetch Phase: Executors fetch shuffle data from other executors
- Reduce Phase: Process fetched data
Shuffle Write:
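- Each map task sorts its records by target reduce partition and writes one data file plus an index file (sort-based shuffle; the older hash-based shuffle wrote one file per reducer)
- Records are assigned to reduce partitions by the partitioner (hash partitioning by default; sortByKey uses range partitioning)
- Output is buffered in memory and then written (or spilled) to local disk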
Shuffle Read:
- Each reducer fetches from all mappers
- Data is deserialized and merged
- Memory pressure can cause spill to disk
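The write and read phases can be modeled in plain Python to show the M:N data flow: every map task splits its output into one segment per reducer, and every reducer pulls its segment from every map task. This models the data flow only, not Spark's on-disk format; the function names are hypothetical, and integer keys are used so the hash partitioning is easy to follow.

```python
from collections import defaultdict


def reducer_for(key, num_reducers):
    # Hash partitioning; small non-negative ints hash to themselves in Python
    return hash(key) % num_reducers


def shuffle_write(map_partition, num_reducers):
    """Map side: group one map task's records by target reducer, ordered by reducer id."""
    segments = defaultdict(list)
    for key, value in map_partition:
        segments[reducer_for(key, num_reducers)].append((key, value))
    return dict(sorted(segments.items()))


def shuffle_read(map_outputs, reducer_id):
    """Reduce side: fetch this reducer's segment from every map output."""
    fetched = []
    for output in map_outputs:
        fetched.extend(output.get(reducer_id, []))
    return fetched


map_partitions = [
    [(1, "a"), (2, "b"), (3, "c")],
    [(2, "d"), (4, "e"), (1, "f")],
]
map_outputs = [shuffle_write(p, num_reducers=2) for p in map_partitions]
print(shuffle_read(map_outputs, 0))  # [(2, 'b'), (2, 'd'), (4, 'e')]
print(shuffle_read(map_outputs, 1))  # [(1, 'a'), (3, 'c'), (1, 'f')]
```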
4. Shuffle Optimization Techniques
Broadcast Join:
# Avoid shuffle by broadcasting small table
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "key")
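Conceptually, a broadcast hash join turns the small side into a lookup table that every executor holds, so each large-side partition is joined in place. The plain-Python sketch below models that idea for an inner join; it is not Spark's implementation, and `broadcast_hash_join` is a hypothetical name.

```python
def broadcast_hash_join(large_partitions, small_rows):
    """Inner join: hash the small side once, then probe it from each large partition.

    The dict plays the role of the broadcast copy each executor receives;
    large-side rows stay in their original partitions (no shuffle).
    """
    lookup = {}
    for key, value in small_rows:
        lookup.setdefault(key, []).append(value)
    return [
        [(key, (left, right)) for key, left in partition for right in lookup.get(key, [])]
        for partition in large_partitions
    ]


large = [[(1, "order-a"), (2, "order-b")], [(1, "order-c"), (3, "order-d")]]
small = [(1, "US"), (2, "DE")]
joined = broadcast_hash_join(large, small)
print(joined)
# [[(1, ('order-a', 'US')), (2, ('order-b', 'DE'))], [(1, ('order-c', 'US'))]]
```

The output keeps the large side's two partitions; only the small table had to be copied.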
Partitioning:
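# Repartition by key once (this is itself a shuffle) so later key-based
# operations in the same job can reuse the partitioning
df = df.repartition(100, "key")
# Note: plain Parquet output does not record this layout for future reads
df.write.parquet("output/")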
Bucketing:
# Bucket data by join key
df.write.bucketBy(100, "user_id").saveAsTable("users")
Coalesce vs Repartition:
# Coalesce: Reduce partitions without full shuffle (narrow)
df.coalesce(10)
# Repartition: Increase or decrease with full shuffle (wide)
df.repartition(100)
5. Stage Boundaries
Spark divides execution into stages at shuffle boundaries:
| Stage | Content |
|---|---|
| Stage 0 | Input + narrow transformations |
| Stage 1 | After first shuffle + narrow transformations |
| Stage N | After N-th shuffle + narrow transformations |
| Final Stage | Last shuffle + output |
Task Scheduling:
- One task per partition per stage
- Tasks are scheduled with data locality preferences (on the node that holds the data when possible)
- A stage runs only after all of its parent stages have completed
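The stage-boundary rule can be sketched for the simple case of a linear lineage. This is a hypothetical simplification: Spark's DAG Scheduler walks the dependency graph backwards from the final RDD and also handles operations with several parents, such as joins.

```python
def split_into_stages(lineage):
    """Split a linear chain of (op_name, is_wide) pairs into stages.

    A wide op starts a new stage because it reads the previous stage's shuffle
    output. Simplification: the wide op's map-side work (the shuffle write and
    any map-side combine) really runs at the end of the previous stage.
    """
    stages = [[]]
    for op_name, is_wide in lineage:
        if is_wide and stages[-1]:
            stages.append([])
        stages[-1].append(op_name)
    return stages


lineage = [
    ("textFile", False),
    ("flatMap", False),
    ("map", False),
    ("reduceByKey", True),
    ("mapValues", False),
    ("sortByKey", True),
]
for i, ops in enumerate(split_into_stages(lineage)):
    print(f"Stage {i}: {' -> '.join(ops)}")
# Stage 0: textFile -> flatMap -> map
# Stage 1: reduceByKey -> mapValues
# Stage 2: sortByKey
```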
6. Performance Implications
| Transformation Type | Latency | Throughput | Cache Locality | Memory Overhead |
|---|---|---|---|---|
| Narrow | Low | High | Good | Minimal |
| Wide | High | Lower | Poor | Higher |
7. Common Shuffle Patterns
GroupBy + Aggregate:
# BAD: groupByKey + mapValues (one shuffle, but every value is sent over the network)
rdd.groupByKey().mapValues(sum)
# GOOD: reduceByKey (one shuffle, with map-side combine)
rdd.reduceByKey(lambda a, b: a + b)
Join:
# Join shuffles both RDDs unless they already share the same partitioner (DataFrames can also use a broadcast join)
joined = rdd1.join(rdd2)
Distinct:
# Requires shuffle to deduplicate across partitions
distinct = rdd.distinct()
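Why distinct needs a shuffle: duplicates may sit in different partitions, so equal values must be routed to the same place before they can collapse. The plain-Python model below dedupes locally, routes each value by its hash, and dedupes again. PySpark's RDD.distinct() is built on reduceByKey along similar lines; the helper name here is hypothetical.

```python
def distinct_across_partitions(partitions, num_reducers):
    """Model of distinct(): dedupe locally, shuffle by value, dedupe again.

    Equal values hash to the same reducer, so duplicates that started in
    different partitions meet there and collapse to one copy.
    """
    reducers = [set() for _ in range(num_reducers)]
    for partition in partitions:
        for value in set(partition):                          # map side: local dedupe
            reducers[hash(value) % num_reducers].add(value)   # shuffle by hash(value)
    return [sorted(r) for r in reducers]


print(distinct_across_partitions([[1, 2, 2, 3], [3, 3, 4]], num_reducers=2))
# [[2, 4], [1, 3]]
```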
Rule of Thumb: Prefer reduceByKey over groupByKey, because it combines values locally before the shuffle, often reducing network I/O by 10x to 100x.
Key Concepts Table
| Concept | Type | Shuffle? | Example |
|---|---|---|---|
| map | Narrow | No | rdd.map(lambda x: x * 2) |
| filter | Narrow | No | rdd.filter(lambda x: x > 0) |
| flatMap | Narrow | No | rdd.flatMap(lambda x: x.split()) |
| mapPartitions | Narrow | No | rdd.mapPartitions(process_partition) |
| union | Narrow | No | rdd1.union(rdd2) |
| groupByKey | Wide | Yes | rdd.groupByKey() |
| reduceByKey | Wide | Yes | rdd.reduceByKey(lambda a,b: a+b) |
| join | Wide | Yes | rdd1.join(rdd2) |
| repartition | Wide | Yes | rdd.repartition(100) |
| distinct | Wide | Yes | rdd.distinct() |
| coalesce | Mixed | No (default) / Yes (shuffle=True) | rdd.coalesce(10) |
| sortByKey | Wide | Yes | rdd.sortByKey() |
| aggregateByKey | Wide | Yes | rdd.aggregateByKey(zero, seq, comb) |
| combineByKey | Wide | Yes | rdd.combineByKey(create, merge, comb) |
Code Examples
Example 1: Narrow Transformations
from pyspark import SparkContext
sc = SparkContext("local", "TransformationTypes")
# Create RDD with 4 partitions
# parallelize: Distributes data across partitions
rdd = sc.parallelize(range(1, 101), 4)
# Narrow transformations (no shuffle)
# map: Apply function to each element
mapped = rdd.map(lambda x: x * 2) # Double each element
# filter: Keep elements where function returns True
filtered = rdd.filter(lambda x: x % 2 == 0) # Keep even numbers
# flatMap: Apply function that returns iterator, flatten results
flatMapped = rdd.flatMap(lambda x: [x, x * 10]) # Expand each element
# sample: Random sample
# Parameters: withReplacement (False=no replacement), fraction (0.1=10%)
sampled = rdd.sample(False, 0.1) # 10% sample
# mapPartitions: Apply function to each partition (amortizes per-partition setup such as opening a connection)
# Parameters: func - function that takes iterator, returns iterator
mappedPartitions = rdd.mapPartitions(lambda it: [sum(it)]) # Sum per partition
# mapPartitionsWithIndex: Like mapPartitions with partition index
# Parameters: func - function taking (index, iterator)
indexed = rdd.mapPartitionsWithIndex(lambda idx, it: [(idx, sum(it))])
# Execute actions
print(f"Mapped sum: {mapped.sum()}")
print(f"Filtered count: {filtered.count()}")
print(f"FlatMapped count: {flatMapped.count()}")
print(f"Sampled count: {sampled.count()}")
print(f"Per-partition sums: {mappedPartitions.collect()}")
print(f"Indexed sums: {indexed.collect()}")
# No shuffle means these can be pipelined
# Single stage execution
Example 2: Wide Transformations and Shuffle
# Wide transformations (require shuffle)
words = sc.parallelize(["hello world", "hello spark", "world spark"], 2)
# Split words (narrow) then create key-value pairs (narrow)
word_pairs = words.flatMap(lambda line: line.split()) \
.map(lambda word: (word, 1))
# reduceByKey (wide) - single shuffle
# Parameters: func - binary function to combine values
# Performs map-side combine, which shrinks shuffle data when keys repeat within a partition
word_counts = word_pairs.reduceByKey(lambda a, b: a + b)
print(f"Word counts: {word_counts.collect()}")
# groupByKey (wide) - less efficient (no map-side combine)
# Parameters: numPartitions (optional)
# Materializes all values for each key on the reducer; can OOM for keys with many values
word_groups = word_pairs.groupByKey()
for word, counts in word_groups.collect():
    print(f"{word}: {list(counts)}")
# join (wide)
# Parameters: other - RDD to join with
rdd1 = sc.parallelize([(1, "a"), (2, "b"), (3, "c")])
rdd2 = sc.parallelize([(1, "x"), (2, "y"), (3, "z")])
joined = rdd1.join(rdd2) # Shuffle required
print(f"Joined: {joined.collect()}")
# distinct (wide)
# Removes duplicates across all partitions (requires shuffle)
duplicated = sc.parallelize([1, 2, 2, 3, 3, 3])
distinct = duplicated.distinct() # Shuffle required
print(f"Distinct: {distinct.collect()}")
# sortByKey (wide)
# Parameters: ascending (default True), numPartitions, keyfunc
sorted_rdd = word_counts.sortByKey(ascending=True)
print(f"Sorted: {sorted_rdd.collect()}")
Example 3: Shuffle Optimization
# Create large dataset
large_rdd = sc.parallelize(range(1000000), 8)
# Create key-value pairs
word_pairs = large_rdd.map(lambda x: (x % 100, x))
# BAD: groupByKey + mapValues (one shuffle, no map-side combine)
# groupByKey ships every value across the network, then mapValues sums them
grouped = word_pairs.groupByKey()
result_bad = grouped.mapValues(sum)
# GOOD: reduceByKey (single shuffle)
# Performs map-side combine, reducing shuffle data significantly
result_good = word_pairs.reduceByKey(lambda a, b: a + b)
# Compare execution plans
print("=== BAD (groupByKey) ===")
print(result_bad.toDebugString().decode()[:500])
print("\n=== GOOD (reduceByKey) ===")
print(result_good.toDebugString().decode()[:500])
# Performance comparison
import time
start = time.time()
_ = result_bad.collect()
print(f"groupByKey: {time.time() - start:.2f}s")
start = time.time()
_ = result_good.collect()
print(f"reduceByKey: {time.time() - start:.2f}s")
# combineByKey: the most general per-key aggregation; here it computes per-key averages
# Parameters:
# createCombiner: Initialize accumulator from first value
# mergeValue: Merge value into accumulator within partition
# mergeCombiners: Merge accumulators across partitions
combined = word_pairs.combineByKey(
lambda v: (v, 1), # Create: (value, count)
lambda acc, v: (acc[0] + v, acc[1] + 1), # Merge value
lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]) # Merge combiners
)
averages = combined.mapValues(lambda x: x[0] / x[1])
print(f"Averages: {averages.take(5)}")
Example 4: Partitioning Strategies
# Repartition vs Coalesce
rdd = sc.parallelize(range(1000), 10)
print(f"Initial partitions: {rdd.getNumPartitions()}")
# Coalesce: Reduce partitions without full shuffle (narrow transformation)
# Parameters: numPartitions, shuffle=False (default)
# Merges existing partitions locally; the resulting partitions may be uneven
coalesced = rdd.coalesce(5)
print(f"After coalesce(5): {coalesced.getNumPartitions()}")
# Repartition: Increase or decrease with full shuffle (wide transformation)
# Parameters: numPartitions
# Redistributes all data evenly
repartitioned = rdd.repartition(20)
print(f"After repartition(20): {repartitioned.getNumPartitions()}")
# Partition by key (for key-value RDDs)
# Parameters: numPartitions, partitionFunc (default: hash)
key_value_rdd = sc.parallelize([(i % 10, i) for i in range(100)])
partitioned = key_value_rdd.partitionBy(5, lambda k: k % 5)
# Check partition distribution
def count_per_partition(rdd):
    return rdd.mapPartitionsWithIndex(
        lambda idx, it: [(idx, sum(1 for _ in it))]
    ).collect()
print(f"Partition counts: {count_per_partition(partitioned)}")
# Custom partition function (e.g. to control where key ranges land)
# PySpark's partitionBy takes a function mapping key -> int (Spark then applies
# % numPartitions), not a Scala-style Partitioner object
import bisect

def range_partition(key, boundaries=(2, 4, 6, 8)):
    # Number of boundaries <= key: keys 0-1 -> 0, 2-3 -> 1, ..., 8 and above -> 4
    return bisect.bisect_right(boundaries, key)

# Use the custom partition function (keys here are 0-9, split into 5 ranges)
custom_partitioned = key_value_rdd.partitionBy(5, range_partition)
print(f"Custom partition counts: {count_per_partition(custom_partitioned)}")
Example 5: Advanced Wide Transformations
# aggregateByKey: per-key aggregation with a zero value plus separate
# within-partition and across-partition functions
# Parameters: zeroValue, seqFunc, combFunc
# Zero value: Initial accumulator (sum=0, count=0)
# seqFunc: Add value to accumulator within partition
# combFunc: Merge accumulators across partitions
result = word_pairs.aggregateByKey(
(0, 0), # Zero value
lambda acc, v: (acc[0] + v, acc[1] + 1), # Within partition
lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]) # Across partitions
).mapValues(lambda x: x[0] / x[1]) # Calculate average
print(f"Key averages: {result.take(5)}")
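# cartesian: All pairs between two RDDs (very expensive: output size is len(a) * len(b);
# Spark computes it without a shuffle, but pairs every partition with every other-side partition)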
rdd_a = sc.parallelize([1, 2, 3])
rdd_b = sc.parallelize(["a", "b"])
cartesian = rdd_a.cartesian(rdd_b)
print(f"Cartesian product: {cartesian.collect()}")
# [(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b'), (3, 'a'), (3, 'b')]
# intersection: Common elements between two RDDs (requires shuffle)
rdd_x = sc.parallelize([1, 2, 3, 4, 5])
rdd_y = sc.parallelize([3, 4, 5, 6, 7])
intersection = rdd_x.intersection(rdd_y)
print(f"Intersection: {intersection.collect()}")  # e.g. [3, 4, 5]; order not guaranteed
# subtract: Elements in first RDD but not in second (requires shuffle)
subtracted = rdd_x.subtract(rdd_y)
print(f"Subtract: {subtracted.collect()}")  # e.g. [1, 2]; order not guaranteed
# groupWith: Group two RDDs by key (requires shuffle)
grouped = rdd1.groupWith(rdd2)
for key, (values1, values2) in grouped.collect():
    print(f"Key {key}: {list(values1)}, {list(values2)}")
Performance Metrics
| Operation | Narrow (ms) | Wide (ms) | Shuffle Size (MB) | Network (MB/s) |
|---|---|---|---|---|
| map() | 45 | N/A | 0 | 0 |
| filter() | 35 | N/A | 0 | 0 |
| flatMap() | 50 | N/A | 0 | 0 |
| mapPartitions() | 40 | N/A | 0 | 0 |
| union() | 30 | N/A | 0 | 0 |
| groupByKey() | N/A | 850 | 250 | 80 |
| reduceByKey() | N/A | 620 | 180 | 120 |
| combineByKey() | N/A | 580 | 160 | 130 |
| aggregateByKey() | N/A | 600 | 170 | 125 |
| join() | N/A | 1200 | 400 | 100 |
| repartition() | N/A | 750 | 300 | 90 |
| distinct() | N/A | 680 | 200 | 110 |
| sortByKey() | N/A | 900 | 350 | 85 |
Best Practices
1. Minimize Wide Transformations
# BAD: groupByKey + sortByKey (two shuffles, no map-side combine)
result = rdd.groupByKey().mapValues(sum).sortByKey()
# GOOD: reduceByKey + sortByKey (still two shuffles, but the first is much smaller)
result = rdd.reduceByKey(lambda a, b: a + b).sortByKey()
# For aggregations that need more than a binary reduce (e.g. per-key averages), use aggregateByKey
result = rdd.aggregateByKey(
(0, 0),
lambda acc, v: (acc[0] + v, acc[1] + 1),
lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])
).mapValues(lambda x: x[0] / x[1])
2. Use reduceByKey Instead of groupByKey
# BAD: groupByKey collects all values to memory (can OOM)
grouped = rdd.groupByKey().mapValues(sum)
# GOOD: reduceByKey combines per partition first (often 10-100x less shuffle data)
reduced = rdd.reduceByKey(lambda a, b: a + b)
# GOOD: combineByKey for complex aggregations (most flexible)
combined = rdd.combineByKey(
lambda v: (v, 1),
lambda acc, v: (acc[0] + v, acc[1] + 1),
lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])
)
3. Broadcast Small Tables
from pyspark.sql.functions import broadcast
# Avoids shuffle for join with small table
result = large_df.join(broadcast(small_df), "key")
# Configure threshold for auto-broadcast
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "52428800") # 50MB
4. Repartition for Parallelism
# Increase partitions for more parallelism
rdd = rdd.repartition(100)
# Decrease partitions with coalesce (no shuffle)
rdd = rdd.coalesce(10)
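# Partition by key for future joins (the default partitionFunc is portable_hash;
# Python's built-in hash() is randomized per process for strings, so avoid it)
key_value_rdd = key_value_rdd.partitionBy(100)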
5. Cache Intermediate Results
# Cache if RDD is reused across multiple actions
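from pyspark import StorageLevel
rdd.cache()  # MEMORY_ONLY
# or, instead (an RDD's storage level cannot be changed once assigned):
rdd.persist(StorageLevel.MEMORY_AND_DISK)  # With disk spill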
# Uncache when no longer needed
rdd.unpersist()
6. Monitor Shuffle Metrics
# Check shuffle metrics in Spark UI
# Look for:
# - Shuffle Read Size / Records
# - Shuffle Write Size / Records
# - Shuffle Spill (Memory/Disk)
# - Task duration distribution (identify stragglers)
# Get RDD statistics
print(f"Partition count: {rdd.getNumPartitions()}")
print(f"Storage level: {rdd.getStorageLevel()}")
print(f"Is cached: {rdd.is_cached}")
# Approximate lineage depth as the number of lines in the debug string
lineage = rdd.toDebugString().decode()
print(f"Lineage depth: {len(lineage.splitlines())}")
7. Handle Data Skew
# SALTING: Add a random salt to each key so a hot key spreads over several reducers
import random
# Salt the keys: (key, salt) tuples avoid string parsing and keep the original key type
salted = rdd.map(lambda x: ((x[0], random.randint(0, 9)), x[1]))
# Reduce with salted keys
salted_reduced = salted.reduceByKey(lambda a, b: a + b)
# Remove salt and reduce again
result = salted_reduced.map(lambda x: (x[0][0], x[1])) \
    .reduceByKey(lambda a, b: a + b)
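# Alternative for DataFrame/SQL joins: AQE skew join handling (Spark 3.0+)
spark.conf.set("spark.sql.adaptive.enabled", "true")  # AQE must be on (default since Spark 3.2)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")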
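To see what salting buys, this plain-Python model counts records per reducer for a hot key with and without salting. It is a hypothetical sketch: the salted key (key, salt) is packed into the integer key * salt_buckets + salt, and salts are assigned round-robin by record position (the snippet above uses random.randint instead).

```python
def reducer_loads(keys, num_reducers, salt_buckets=1):
    """Records per reducer after hash-partitioning (optionally salted) int keys."""
    loads = [0] * num_reducers
    for position, key in enumerate(keys):
        salt = position % salt_buckets      # round-robin salt (hypothetical choice)
        sub_key = key * salt_buckets + salt  # hypothetical packing of (key, salt)
        loads[sub_key % num_reducers] += 1
    return loads


# Key 0 is hot: 8,000 records; keys 1-7 have 100 records each
keys = [0] * 8000 + [k for k in range(1, 8) for _ in range(100)]
print(reducer_loads(keys, num_reducers=8))                  # [8000, 100, 100, 100, 100, 100, 100, 100]
print(reducer_loads(keys, num_reducers=8, salt_buckets=4))  # [2075, 2075, 2075, 2075, 100, 100, 100, 100]
```

With four salt buckets the hot key spreads over four reducers instead of one, cutting the largest load from 8,000 to 2,075 records. A second reduce on the unsalted key is still needed to merge the partial results.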
Key Takeaways
- Narrow transformations: 1:1 partition mapping, no shuffle, pipelineable
- Wide transformations: M:N partition mapping, shuffle required, define stage boundaries
- Shuffle cost = N_partitions × (write + network + read) + sort
- Pipeline efficiency = compute / (compute + shuffle)
- Map-side combine: R_combine = before / after (10-100x reduction)
- reduceByKey beats groupByKey (map-side combine reduces shuffle volume)
- Data skew causes stragglers: stage time = max(partition times)
- Use broadcast joins to avoid shuffle for small tables
- Use coalesce to reduce partitions (no shuffle), repartition to increase (shuffle)
- Cache RDDs reused across multiple actions; unpersist when done