PySpark RDD Fundamentals
- RDDs are immutable, partitioned, distributed collections with lineage-based fault tolerance
- Narrow transformations can be pipelined; wide transformations require shuffle barriers
- Optimal partition size is 128MB to 200MB; partition count = max(dataSize/partitionSize, totalCores)
- Recovery cost is proportional to lineage depth × data size per partition
- Use persist() for reuse; use coalesce() to reduce partitions without full shuffle
- Data skew causes stragglers: stage time = max(partition times)
Definition: Resilient Distributed Dataset (RDD)
An RDD is an immutable, partitioned collection of elements that can be operated on in parallel. Each RDD is defined by five properties: a list of partitions, a function to compute each split, a list of dependencies on parent RDDs, an optional partitioner (for key-value RDDs), and an optional list of preferred locations for each split.
Definition: Partition
A partition is a logical chunk of data stored on a single node. The number of partitions determines parallelism: each partition is processed by one task on one executor core.
Definition: Lineage
Lineage is the complete record of transformations used to build an RDD. It is stored as a DAG and enables fault tolerance by allowing Spark to recompute only the lost partitions without data replication.
Definition: Shuffle
A shuffle is the process of redistributing data across partitions, typically across the network. It occurs during wide transformations and is usually among the most expensive operations in Spark, because it involves disk I/O, network I/O, and serialization.
[Figures: Narrow vs Wide Transformation Partition Mapping; RDD Lineage Fault Tolerance; RDD Architecture Overview; RDD Transformation DAG]
Narrow vs Wide Transformations
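Partition Count
$$N = \max\left(\frac{D}{S},\ C_{\text{cores}}\right)$$
Here,
- $N$ = Number of partitions
- $D$ = Total data size in bytes
- $S$ = Target partition size (default 128MB for HDFS)
- $C_{\text{cores}}$ = Total available executor cores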
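The partition-count rule can be checked with a few lines of arithmetic. The sketch below is plain Python rather than Spark code; the function name `partition_count` and the choice to round the size-based count up to a whole number are assumptions made for illustration.

```python
import math

MB = 1024 ** 2
GB = 1024 ** 3

def partition_count(data_size_bytes: int, target_partition_bytes: int, total_cores: int) -> int:
    """max(data size / target partition size, total cores), rounded up (assumption)."""
    by_size = math.ceil(data_size_bytes / target_partition_bytes)
    return max(by_size, total_cores)

# 10 GB of input, 128 MB target partitions, 16 executor cores
print(partition_count(10 * GB, 128 * MB, 16))  # 80

# 1 GB of input on the same cluster: size alone gives 8, so the core count wins
print(partition_count(1 * GB, 128 * MB, 16))   # 16
```

In the second case the data would fit in 8 partitions, but using 16 keeps every core busy.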
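Recovery Cost (Lineage Recomputation)
$$C_{\text{recovery}} = \sum_{i=1}^{L} c(T_i) \cdot d_i$$
Here,
- $C_{\text{recovery}}$ = Total cost to recompute lost partitions
- $c(T_i)$ = Cost of transformation $T_i$ in the lineage
- $d_i$ = Data size processed at step $i$
- $L$ = Number of transformations in the lineage path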
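Read as a sum of per-step cost times per-step data size, the recovery cost is easy to compute by hand. The following plain-Python sketch assumes that reading; `recovery_cost`, the cost-per-byte values, and the byte counts are all hypothetical numbers chosen for illustration.

```python
def recovery_cost(steps):
    """Sum cost_per_byte * bytes_processed over each transformation in a lineage path.

    `steps` is a list of (cost_per_byte, bytes_processed) pairs, one per transformation.
    """
    return sum(cost * size for cost, size in steps)

# Hypothetical lineage for one lost partition: map -> filter -> map
lineage = [
    (2, 128_000_000),  # map over the full partition
    (1, 128_000_000),  # filter reads the full partition
    (2, 32_000_000),   # map over the records the filter kept
]

print(recovery_cost(lineage))  # 448000000
```

A longer lineage adds terms to the sum, which is why checkpointing (truncating the lineage) bounds the recovery cost.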
Narrow transformations (map, filter, flatMap) have 1:1 parent-child partition mapping and can be pipelined without shuffle. Wide transformations (groupByKey, reduceByKey, join) have M:N mapping and require a shuffle barrier, so they cannot be pipelined across it.
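The difference in partition mapping can be illustrated without a cluster. This is a plain-Python simulation, not Spark's implementation: `narrow_map` and `wide_group_by_key` are hypothetical helpers, and routing by `hash(key) % num_out` is an assumption standing in for a hash partitioner.

```python
from collections import defaultdict

partitions = [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]

def narrow_map(parts, fn):
    # Narrow: each output partition depends on exactly one input partition
    return [[fn(x) for x in part] for part in parts]

def wide_group_by_key(parts, key_fn, num_out):
    # Wide: each record is routed by key, so an output partition
    # may receive records from every input partition
    out = [defaultdict(list) for _ in range(num_out)]
    for part in parts:
        for x in part:
            key = key_fn(x)
            out[hash(key) % num_out][key].append(x)
    return [dict(d) for d in out]

doubled = narrow_map(partitions, lambda x: x * 2)
grouped = wide_group_by_key(partitions, lambda x: x % 3, num_out=3)

print(doubled)  # [[2, 4, 6], [8, 10, 12], [14, 16, 18, 20]]
print(grouped)  # [{0: [3, 6, 9]}, {1: [1, 4, 7, 10]}, {2: [2, 5, 8]}]
```

Every output partition of `grouped` contains values that originated in all three input partitions, which is the M:N dependency that forces a shuffle.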
The optimal partition size is 128MB to 200MB. Too few partitions leave cores underutilized; too many cause excessive task scheduling overhead. Use repartition() to increase partitions (this triggers a full shuffle) or coalesce() to decrease them without a full shuffle.
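To see why reducing partitions can avoid a full shuffle, compare merging whole partitions with redistributing individual records. The sketch below is a plain-Python simulation under simplifying assumptions: the two functions only borrow the names `coalesce` and `repartition`, neighbouring partitions are merged by index, and records are redistributed round-robin. Spark's real grouping and distribution logic is more involved.

```python
def coalesce(parts, n):
    """Merge whole neighbouring partitions into n groups; records stay with their partition."""
    out = [[] for _ in range(n)]
    for i, part in enumerate(parts):
        out[i * n // len(parts)].extend(part)
    return out

def repartition(parts, n):
    """Redistribute every record individually (round-robin here), as a full shuffle would."""
    out = [[] for _ in range(n)]
    records = [x for part in parts for x in part]
    for i, x in enumerate(records):
        out[i % n].append(x)
    return out

parts = [[1, 2], [3, 4], [5, 6], [7, 8]]

print(coalesce(parts, 2))     # [[1, 2, 3, 4], [5, 6, 7, 8]]
print(repartition(parts, 2))  # [[1, 3, 5, 7], [2, 4, 6, 8]]
```

In the coalesced result each original partition lands intact in one output partition, while the repartitioned result splits every original partition across outputs.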
Avoid collect() on large datasets: it brings all data to the driver node, which can cause an OutOfMemoryError. Use take(n) or foreach() on RDDs (or show(n) on DataFrames) instead.
Theorem: Fault Tolerance via Lineage
Any lost partition of an RDD can be recomputed from its lineage in at most O(L × D) time, where L is the lineage depth (number of transformations) and D is the data size at that partition. Provided the transformations are deterministic, this guarantees correctness without data replication, unlike HDFS, which replicates each block 3× by default.
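Lineage-based recovery can be sketched with a toy class that remembers only its parent and a per-partition function. This is an illustration in plain Python, not how Spark is implemented; `SketchRDD` and the `materialized` dictionary (standing in for partitions held in executor memory) are hypothetical.

```python
class SketchRDD:
    """Toy RDD: stores its parent and a per-partition function (its lineage), not the data."""

    def __init__(self, parent=None, fn=None, source=None):
        self.parent, self.fn, self.source = parent, fn, source

    def map(self, f):
        return SketchRDD(parent=self, fn=lambda part: [f(x) for x in part])

    def filter(self, pred):
        return SketchRDD(parent=self, fn=lambda part: [x for x in part if pred(x)])

    def compute(self, idx):
        """Compute partition idx by replaying the lineage from the source."""
        if self.parent is None:
            return list(self.source[idx])
        return self.fn(self.parent.compute(idx))

base = SketchRDD(source=[[1, 2, 3], [4, 5, 6]])
result = base.map(lambda x: x * 10).filter(lambda x: x > 20)

# Pretend both partitions were computed and held in executor memory
materialized = {i: result.compute(i) for i in range(2)}

del materialized[1]                  # simulate losing partition 1
materialized[1] = result.compute(1)  # replay the lineage for that partition only

print(materialized)  # {0: [30], 1: [40, 50, 60]}
```

Only partition 1 is recomputed; partition 0 is untouched, and the work done is one pass per transformation in the lineage.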
Code Examples
Basic RDD Operations
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("RDD_Basics").setMaster("local[*]")
sc = SparkContext(conf=conf)
# Create RDD from collection
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(data, 4) # 4 partitions
print(f"Partitions: {rdd.getNumPartitions()}")
# Narrow transformations (no shuffle)
mapped = rdd.map(lambda x: x * 2)
filtered = rdd.filter(lambda x: x > 5)
flattened = rdd.flatMap(lambda x: [x, x * 10])
# Wide transformation (shuffle)
pairs = rdd.map(lambda x: (x % 3, x))
grouped = pairs.groupByKey()
reduced = pairs.reduceByKey(lambda a, b: a + b)
# Actions (trigger execution)
print(f"Count: {rdd.count()}")
print(f"First: {rdd.first()}")
print(f"Take: {rdd.take(3)}")
print(f"Sum: {rdd.reduce(lambda a, b: a + b)}")
# Check lineage
print(rdd.toDebugString().decode())
sc.stop()
RDD Persistence Levels
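from pyspark import SparkContext, StorageLevel
sc = SparkContext.getOrCreate()
rdd = sc.parallelize(range(10), 4)
# cache() is shorthand for persist(StorageLevel.MEMORY_ONLY).
# PySpark always pickles stored objects, so recent versions expose no separate *_SER levels.
rdd.cache()
rdd.count() # The first action materializes the cache
# An RDD holds one storage level at a time: unpersist before assigning a different one
rdd.unpersist()
rdd.persist(StorageLevel.MEMORY_AND_DISK) # Spill to disk when memory is full
# Other levels include StorageLevel.DISK_ONLY and StorageLevel.OFF_HEAP
# Unpersist when done
rdd.unpersist()
sc.stop()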
Key Concepts Table
| Concept | Description | Example |
|---|---|---|
| Partition | Logical chunk of data for parallel processing | rdd.getNumPartitions() |
| Lineage | DAG of transformations for fault tolerance | rdd.toDebugString() |
| Narrow Transform | 1:1 partition mapping, no shuffle | map(), filter(), flatMap() |
| Wide Transform | M:N partition mapping, requires shuffle | groupByKey(), reduceByKey(), join() |
| Lazy Evaluation | Transforms built but not executed until action | Build DAG → Action triggers execution |
| Action | Triggers computation, returns result | collect(), count(), first() |
| Cache/Persist | Store RDD in memory/disk for reuse | rdd.cache() or rdd.persist() |
| Checkpoint | Write RDD to reliable storage, truncate lineage | rdd.checkpoint() |
| Broadcast | Read-only variable cached on each executor | sc.broadcast(variable) |
| Accumulator | Variable that tasks can only add to; its value is read on the driver | sc.accumulator(0) |
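Lazy evaluation, listed in the table above, can be imitated with a Python generator: defining the pipeline does no work, and pulling results from it does. This is an analogy in plain Python rather than Spark code; `traced_double` and `log` are hypothetical names used to observe when the function actually runs.

```python
log = []

def traced_double(x):
    log.append(x)  # record that this element was actually processed
    return x * 2

data = [1, 2, 3, 4, 5]

# "Transformation": builds the pipeline, processes nothing yet
pipeline = (traced_double(x) for x in data)
print(log)        # []

# "Action" (similar in spirit to take(2)): pulls only what it needs
first_two = [next(pipeline), next(pipeline)]
print(first_two)  # [2, 4]
print(log)        # [1, 2]
```

Only the two requested elements were processed, which mirrors how an action determines how much of the DAG is executed.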
Best Practices
- Prefer DataFrames over RDDs: DataFrames use the Catalyst optimizer and Tungsten engine for automatic optimization
- Use reduceByKey over groupByKey: reduceByKey combines locally before shuffle, reducing network I/O
- Cache wisely: only cache RDDs that are reused across multiple actions
- Partition appropriately: aim for 128MB to 200MB per partition
- Avoid collect() on large datasets: use take(n) or foreach() instead
- Use coalesce() to reduce partitions: it avoids a full shuffle, unlike repartition()
- Enable Kryo serialization: it can be up to 10x faster than Java serialization
- Monitor shuffle spill: it indicates memory pressure
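The reduceByKey advice can be made concrete by counting how many records would cross the shuffle in each case. The sketch below is a plain-Python simulation of map-side combining, not Spark itself; the two helper functions and the sample data are hypothetical.

```python
from collections import Counter

partitions = [
    [("a", 1), ("b", 1), ("a", 1), ("a", 1)],
    [("a", 1), ("b", 1), ("b", 1), ("c", 1)],
]

def shuffled_records_group_by_key(parts):
    # groupByKey ships every (key, value) record across the shuffle
    return sum(len(part) for part in parts)

def shuffled_records_reduce_by_key(parts):
    # reduceByKey pre-aggregates inside each partition, so at most one
    # record per distinct key leaves each partition
    total = 0
    for part in parts:
        combined = Counter()
        for key, value in part:
            combined[key] += value
        total += len(combined)
    return total

print(shuffled_records_group_by_key(partitions))   # 8
print(shuffled_records_reduce_by_key(partitions))  # 5
```

The gap widens as the number of records per key grows, since the pre-aggregated count depends only on the number of distinct keys per partition.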
Key Takeaways
- RDDs are the foundation of Spark's distributed computing model
- Narrow transformations pipeline without shuffle; wide transformations require shuffle
- Lineage enables fault tolerance without data replication
- Optimal partition size: 128MB to 200MB
- Use persist() for reuse; use coalesce() to reduce partitions
- Data skew causes stragglers: stage time = max(partition times)
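The straggler effect follows directly from stage time = max(partition times). The plain-Python sketch below assumes every partition runs in parallel on its own core; the timings are hypothetical.

```python
def stage_time(partition_times):
    """A stage finishes only when its slowest task finishes."""
    return max(partition_times)

balanced = [10, 10, 10, 10]
skewed = [4, 4, 4, 28]  # same total work (40), but one hot partition

print(stage_time(balanced))  # 10
print(stage_time(skewed))    # 28
```

Both layouts do the same total work, yet the skewed one takes almost three times as long because three cores sit idle while the hot partition finishes.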
See Also
- SparkSession Architecture: The entry point that creates and manages RDDs
- DataFrame Operations: Higher-level API with Catalyst optimizations
- SparkSQL Engine: SQL queries over RDDs and DataFrames
- Transformation Types: Comprehensive guide to all transformations
- Kafka Architecture: Stream processing with RDD-based DStreams