PySpark Caching and Persistence
Definition: Caching
Caching stores the results of an RDD or DataFrame in memory (or on disk) to avoid recomputing them from lineage when reused. For DataFrames, cache() is shorthand for persist(StorageLevel.MEMORY_AND_DISK); for RDDs, it is shorthand for persist(StorageLevel.MEMORY_ONLY).
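Definition: Storage Level
A storage level defines where and how data is persisted: MEMORY_ONLY (deserialized objects on the JVM heap), MEMORY_AND_DISK (partitions that do not fit in memory spill to disk), MEMORY_ONLY_SER (serialized bytes on the heap), DISK_ONLY, or OFF_HEAP (outside the JVM heap). The _SER levels belong to the Scala and Java APIs; PySpark always stores Python data in serialized form, and PySpark 3.x does not expose MEMORY_ONLY_SER.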
Definition: Checkpointing
Checkpointing truncates the RDD lineage by saving data to reliable storage (for example, HDFS). It is used for very deep DAGs to prevent stack overflow and reduce memory usage, but the checkpointed data can no longer be recomputed from the original source lineage.
Cache Hit Ratio
$$H = \frac{N_{hits}}{N_{total}}$$
Here,
- $H$ = Fraction of reads served from cache (target > 0.8)
- $N_{hits}$ = Number of reads that found data in cache
- $N_{total}$ = Total number of data reads
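As a small worked example, the hit ratio is the number of cache hits divided by the total number of reads. The sketch below assumes the two counts come from your own instrumentation; the function name and the sample numbers are hypothetical.

```python
def cache_hit_ratio(cache_hits: int, total_reads: int) -> float:
    """Return the fraction of reads that were served from cache."""
    if cache_hits < 0 or total_reads < 0 or cache_hits > total_reads:
        raise ValueError("expected 0 <= cache_hits <= total_reads")
    if total_reads == 0:
        return 0.0  # no reads yet, so nothing was served from cache
    return cache_hits / total_reads


# Hypothetical counts: 850 of 1000 reads found their data in cache
ratio = cache_hit_ratio(cache_hits=850, total_reads=1000)
print(f"hit ratio = {ratio:.2f}, meets 0.8 target: {ratio > 0.8}")
```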
Storage Memory Requirement
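$$M_{cache} = N_{partitions} \times S_{partition} \times R_{compression}$$
Here,
- $M_{cache}$ = Total memory needed to cache the dataset
- $N_{partitions}$ = Number of partitions
- $S_{partition}$ = Size per partition (serialized or deserialized)
- $R_{compression}$ = Compression ratio (1.0 if uncompressed, 0.3 to 0.7 typical)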
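To make the sizing rule concrete, the following sketch multiplies the partition count, the per-partition size, and the compression ratio. The function name and the sample figures (200 partitions of 128 MiB at a ratio of 0.5) are assumptions for illustration, not measurements.

```python
def estimate_cache_memory(
    num_partitions: int,
    partition_bytes: float,
    compression_ratio: float = 1.0,
) -> float:
    """Estimate the bytes needed to cache a dataset."""
    if num_partitions < 0 or partition_bytes < 0:
        raise ValueError("partition count and size must be non-negative")
    if not 0 < compression_ratio <= 1.0:
        raise ValueError("compression_ratio must be in (0, 1]")
    return num_partitions * partition_bytes * compression_ratio


MIB = 1024 ** 2
GIB = 1024 ** 3

# Hypothetical dataset: 200 partitions of 128 MiB each, compressed to half size
needed = estimate_cache_memory(200, 128 * MIB, compression_ratio=0.5)
print(f"estimated cache footprint: {needed / GIB:.1f} GiB")
```

Comparing the estimate with the storage memory available across executors gives a first indication of whether the dataset fits in memory or will spill.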
Cache vs Recompute Break-Even
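$$N_{break\text{-}even} = \frac{Cost_{cache}}{Cost_{recompute}}$$
Here,
- $N_{break\text{-}even}$ = Number of times cached data must be accessed to justify caching
- $Cost_{cache}$ = Cost of caching (memory + serialization time)
- $Cost_{recompute}$ = Cost of recomputing from lineage per access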
Use MEMORY_AND_DISK for datasets that may not fully fit in memory; Spark will spill to disk rather than recompute. Use MEMORY_ONLY only when you are certain the data fits and recomputation cost is low.
Always call unpersist() on DataFrames that are no longer reused to free cache memory for other datasets. Cached blocks stay in storage memory until they are explicitly unpersisted or evicted under memory pressure (LRU), so relying on eviction alone leaves stale data competing with datasets you still need.
Theorem: Cache vs Recompute Trade-off
Caching is beneficial when $N_{reuses} \times Cost_{recompute} > Cost_{cache}$, where $N_{reuses}$ is the number of times the data is accessed after caching, $Cost_{recompute}$ is the time to recompute from lineage per access, and $Cost_{cache}$ is the cost of caching (memory footprint plus serialization time).
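The inequality can be checked numerically. The sketch below assumes both costs are expressed in the same unit (seconds here); the function names and the sample costs are hypothetical.

```python
import math


def break_even_reuses(cost_cache: float, cost_recompute: float) -> float:
    """Number of reuses at which caching and recomputing cost the same."""
    if cost_recompute <= 0:
        raise ValueError("cost_recompute must be positive")
    return cost_cache / cost_recompute


def is_caching_beneficial(n_reuses: int, cost_cache: float, cost_recompute: float) -> bool:
    """True when n_reuses * cost_recompute exceeds cost_cache."""
    return n_reuses * cost_recompute > cost_cache


# Hypothetical costs: caching takes 12 s once, recomputing takes 5 s per access
threshold = break_even_reuses(cost_cache=12.0, cost_recompute=5.0)
min_reuses = math.floor(threshold) + 1  # smallest whole number strictly above the threshold
print(f"break-even at {threshold:.1f} reuses; caching pays off from {min_reuses} reuses")
```

With these numbers, two reuses do not justify the cache (10 s saved against 12 s spent), while three do.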
- cache() = persist(MEMORY_AND_DISK); use persist() for custom storage levels
- Cache only datasets reused 2+ times; always unpersist() when done
- Target cache hit ratio > 80%; monitor via Spark UI Storage tab
- Checkpointing truncates lineage for very deep DAGs
- MEMORY_ONLY_SER (Scala/Java API) cuts the memory footprint roughly in half but adds deserialization CPU cost on every access
- MEMORY_AND_DISK ensures all data is persisted (spills to disk if needed)
[Figure: Storage Levels Comparison]
[Figure: Architecture Diagram]
Detailed Explanation
1. Why Cache?
Caching is essential when a DataFrame or RDD is reused multiple times. Without caching, Spark recomputes the entire lineage from the source for each action.
Benefits:
- Avoids recomputation of expensive transformations
- Reduces I/O (read from cache instead of source)
- Improves iterative algorithm performance
- Enables interactive data exploration
Rule of Thumb: Cache only datasets reused 2+ times; always unpersist() when done.
2. Cache vs Persist
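The default storage level of cache() depends on the API. For RDDs, cache() is shorthand for persist(StorageLevel.MEMORY_ONLY):
- Stores data in JVM heap as deserialized objects
- Fastest access but highest memory overhead
- Good for small-to-medium datasets that fit in memory
For DataFrames, cache() is shorthand for persist(StorageLevel.MEMORY_AND_DISK), so partitions that do not fit in memory spill to disk instead of being dropped.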
persist() allows choosing a storage level:
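- MEMORY_ONLY: Same as cache() on an RDD
- MEMORY_ONLY_SER: Serialized, saves space (Scala/Java API; not exposed in PySpark 3.x)
- MEMORY_AND_DISK: Spill to disk if memory is full (same as cache() on a DataFrame)
- DISK_ONLY: Only on disk
- OFF_HEAP: Outside the JVM heap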
3. Storage Levels Deep Dive
| Storage Level | Location | Format | Speed | Space | Best For |
|---|---|---|---|---|---|
| MEMORY_ONLY | Heap | Deserialized | Fastest | Highest (3-5x) | Small datasets, repeated access |
| MEMORY_ONLY_SER | Heap | Serialized | Fast | Medium (1.5-2x) | Memory-constrained environments |
| MEMORY_AND_DISK | Heap + Disk | Mixed | Fast | Medium | Datasets larger than memory |
| DISK_ONLY | Disk | Serialized | Slower | Lowest | Very large datasets |
| OFF_HEAP | Off-heap memory (outside the JVM heap) | Serialized | Fast | Lowest | Reducing GC pressure on large heaps |
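Each storage level in the table is a combination of a few flags. The plain-Python sketch below mirrors the flag values of the JVM-side StorageLevel constants as I understand them (useDisk, useMemory, useOffHeap, deserialized, replication); it does not import Spark, and the LevelFlags and describe names are hypothetical. PySpark's own constants differ in the deserialized flag because Python data is stored pickled.

```python
from typing import NamedTuple


class LevelFlags(NamedTuple):
    use_disk: bool
    use_memory: bool
    use_off_heap: bool
    deserialized: bool  # format of the in-memory copy; disk copies are always serialized
    replication: int = 1


# Assumed to match the Scala/Java constants of the same names
JVM_STORAGE_LEVELS = {
    "MEMORY_ONLY": LevelFlags(False, True, False, True),
    "MEMORY_ONLY_SER": LevelFlags(False, True, False, False),
    "MEMORY_AND_DISK": LevelFlags(True, True, False, True),
    "DISK_ONLY": LevelFlags(True, False, False, False),
    "OFF_HEAP": LevelFlags(True, True, True, False),
    "MEMORY_ONLY_2": LevelFlags(False, True, False, True, 2),
}


def describe(name: str) -> str:
    """Render a storage level's flags as a readable summary."""
    flags = JVM_STORAGE_LEVELS[name]
    memory = "off-heap memory" if flags.use_off_heap else "heap memory"
    places = ([memory] if flags.use_memory else []) + (["disk"] if flags.use_disk else [])
    fmt = "deserialized" if flags.deserialized else "serialized"
    return f"{name}: {' + '.join(places)}, {fmt}, {flags.replication}x replicated"


for level_name in JVM_STORAGE_LEVELS:
    print(describe(level_name))
```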
4. Unified Memory Management
Spark 1.6+ introduced unified memory management where storage and execution memory share a common pool.
Key Behaviors:
- Storage can borrow from execution (when execution is idle)
- Execution can borrow from storage (forces eviction)
- Execution has higher priority for memory
- Soft boundary allows flexible memory sharing
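The size of the shared pool can be estimated from the executor heap size. The sketch below assumes Spark's documented defaults (300 MiB reserved, spark.memory.fraction = 0.6, spark.memory.storageFraction = 0.5) and ignores off-heap memory; the function name and the 4 GiB heap are hypothetical.

```python
RESERVED_BYTES = 300 * 1024 ** 2  # memory Spark sets aside before sizing the unified pool


def unified_memory_regions(
    heap_bytes: int,
    memory_fraction: float = 0.6,   # spark.memory.fraction
    storage_fraction: float = 0.5,  # spark.memory.storageFraction
) -> dict:
    """Estimate the unified pool and the storage share that is immune to eviction."""
    usable = max(heap_bytes - RESERVED_BYTES, 0)
    unified = usable * memory_fraction
    protected_storage = unified * storage_fraction
    return {
        "unified": unified,
        "protected_storage": protected_storage,
        "execution_initial": unified - protected_storage,
    }


MIB = 1024 ** 2
regions = unified_memory_regions(4 * 1024 ** 3)  # hypothetical 4 GiB executor heap
for region_name, size in regions.items():
    print(f"{region_name}: {size / MIB:.1f} MiB")
```

The protected storage share is the part of the cache that execution cannot reclaim; cached blocks beyond it may be evicted when execution needs the memory.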
5. Cache Eviction Policies
When memory pressure occurs, Spark evicts cached data using LRU (Least Recently Used) policy.
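Eviction Process:
- Spark tracks storage memory usage as blocks are cached
- Eviction starts when a new block does not fit, or when execution reclaims memory that storage has borrowed beyond its protected share (spark.memory.storageFraction, by default 50% of unified memory)
- Identify least recently used partitions
- Evict or spill to disk based on storage level
- Update block manager metadata
Factors Affecting Eviction:
- Access recency (LRU considers when a block was last used, not how often)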
- Partition size
- Storage level (memory vs disk)
- Memory pressure from execution
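The LRU behaviour described above can be illustrated with a toy model. This is a simplified sketch in plain Python, not Spark's block manager: the LruBlockCache class, the block names, and the byte sizes are hypothetical, and real eviction also depends on the storage level and on execution memory demand.

```python
from collections import OrderedDict


class LruBlockCache:
    """Toy storage pool that evicts the least recently used block first."""

    def __init__(self, capacity_bytes: int):
        self.capacity_bytes = capacity_bytes
        self.used_bytes = 0
        self._blocks = OrderedDict()  # block_id -> size, least recently used first

    def get(self, block_id: str) -> bool:
        """Return True on a hit and mark the block as most recently used."""
        if block_id not in self._blocks:
            return False
        self._blocks.move_to_end(block_id)
        return True

    def put(self, block_id: str, size_bytes: int) -> list:
        """Store a block and return the ids evicted to make room."""
        if size_bytes > self.capacity_bytes:
            return []  # block can never fit; a real system would recompute or spill
        if block_id in self._blocks:
            self.used_bytes -= self._blocks.pop(block_id)
        evicted = []
        while self.used_bytes + size_bytes > self.capacity_bytes:
            victim, victim_size = self._blocks.popitem(last=False)
            self.used_bytes -= victim_size
            evicted.append(victim)
        self._blocks[block_id] = size_bytes
        self.used_bytes += size_bytes
        return evicted


cache = LruBlockCache(capacity_bytes=100)
cache.put("a", 40)
cache.put("b", 40)
cache.get("a")               # "a" becomes the most recently used block
evicted = cache.put("c", 40)  # does not fit, so the least recently used block goes
print(f"evicted: {evicted}")
```

Because "a" was read after "b" was stored, "b" is the least recently used block and is the one evicted.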
6. Cache Metrics and Monitoring
Key Metrics to Monitor:
- storageLevel: Current storage level
- cachedPartitions: Number of cached partitions
- cacheSize: Total cached data size
- cacheMemory: Memory used for caching
- cacheDisk: Disk used for spilling
- cacheHitRatio: Percentage of cache hits
Spark UI Locations:
- Storage tab: Cached RDDs and DataFrames
- Executors tab: Memory usage per executor
- SQL tab: Cache usage in query plans
7. Common Cache Pitfalls
| Pitfall | Impact | Solution |
|---|---|---|
| Caching too much | Fills up memory, causes eviction | Cache only what's reused |
| Not caching when needed | Recomputes expensive transformations | Cache datasets used 2+ times |
| Wrong storage level | MEMORY_ONLY for too-large datasets causes constant recomputation | Use MEMORY_AND_DISK for large data |
| Forgetting to unpersist | Memory leaks over time | Always call unpersist() when done |
8. Cache Optimization Strategies
Strategy 1: Cache selectively
# Cache only what's reused
df.cache() # Cache this one
result = df.filter(...).groupBy(...).agg(...) # Don't cache intermediate
Strategy 2: Choose right storage level
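from pyspark import StorageLevel
# Pick one level per DataFrame; call unpersist() before switching levels
# Small dataset, repeated access
df.cache()  # DataFrame default: MEMORY_AND_DISK
# Large dataset that fits in memory when serialized
# (PySpark 3.x has no MEMORY_ONLY_SER; its MEMORY_ONLY is already serialized)
df.persist(StorageLevel.MEMORY_ONLY)
# Dataset larger than memory
df.persist(StorageLevel.MEMORY_AND_DISK)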
Strategy 3: Monitor and adjust
# Check cache status
print(f"Cached: {df.is_cached}")
print(f"Storage level: {df.storageLevel}")
# Unpersist when done
df.unpersist()
Best Practice: Target cache hit ratio > 80%; monitor via Spark UI Storage tab.
Key Concepts Table
| Storage Level | Location | Format | Speed | Space | GC | Use Case |
|---|---|---|---|---|---|---|
| MEMORY_ONLY | Heap | Deserialized | β β β β β | β β βββ | β β βββ | Small, repeated access |
| MEMORY_ONLY_SER | Heap | Serialized | β β β β β | β β β β β | β β β β β | Memory-constrained |
| MEMORY_AND_DISK | Heap + Disk | Mixed | β β β β β | β β β ββ | β β β ββ | Large datasets |
| DISK_ONLY | Disk | Serialized | β β βββ | β β β β β | β β β β β | Very large datasets |
| OFF_HEAP | Off-heap memory | Serialized | β β β β β | β β β β β | β β β β β | Reduced GC pressure |
| MEMORY_ONLY_2 | Heap | Deserialized | β β β β β | β β βββ | β β βββ | 2 replicas |
| MEMORY_AND_DISK_2 | Heap + Disk | Mixed | β β β β β | β β β ββ | β β β ββ | Fault tolerance |
Code Examples
Example 1: Basic Caching
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark import StorageLevel

spark = SparkSession.builder.appName("CachingPersistence").getOrCreate()

# Create expensive DataFrame
df = (
    spark.range(1000000)
    .withColumn("key", col("id") % 1000)
    .withColumn("value", col("id") * 1.0)
)

# Cache the DataFrame. For DataFrames, cache() uses the default storage level
# (MEMORY_AND_DISK), not MEMORY_ONLY.
df.cache()

# First action triggers computation and caching
count = df.count()
print(f"Count: {count}")

# Subsequent actions use cached version
sum_val = df.groupBy("key").sum("value").count()
print(f"GroupBy count: {sum_val}")

# Check cache status
print(f"Is cached: {df.is_cached}")
print(f"Storage level: {df.storageLevel}")

# Unpersist when done
df.unpersist()
Example 2: Different Storage Levels
import time
from pyspark import StorageLevel

# persist() returns the same DataFrame, and calling it again on a DataFrame that is
# already cached does not change its level, so unpersist before trying the next one.
# PySpark 3.x does not expose MEMORY_ONLY_SER: Python data is always stored
# serialized, so StorageLevel.MEMORY_ONLY already covers that case.
levels = {
    "MEMORY_ONLY": StorageLevel.MEMORY_ONLY,          # in memory only
    "MEMORY_AND_DISK": StorageLevel.MEMORY_AND_DISK,  # spill to disk if needed
    "DISK_ONLY": StorageLevel.DISK_ONLY,              # all data on disk
}

# Test performance: time 5 counts per storage level
for name, level in levels.items():
    df.persist(level)
    df.count()  # First action materializes the cache
    print(f"{name}: {df.storageLevel}")
    start = time.time()
    for _ in range(5):
        df.count()
    print(f"{name} time: {time.time() - start:.2f}s")
    df.unpersist(blocking=True)
Example 3: Cache Monitoring
# INFO logging surfaces block manager messages about cached blocks
spark.sparkContext.setLogLevel("INFO")
# Create and cache DataFrame
df = spark.range(1000000).cache()
# Trigger caching
df.count()
# Check storage info. getRDDStorageInfo() is reached through the internal _jsc
# handle, which is not a stable public PySpark API; treat this as a debugging aid.
storage_info = spark._jsc.sc().getRDDStorageInfo()
for rdd_info in storage_info:
    print(f"RDD ID: {rdd_info.id()}")
    print(f"Name: {rdd_info.name()}")
    print(f"Storage Level: {rdd_info.storageLevel()}")
    print(f"Number of Partitions: {rdd_info.numCachedPartitions()}")
    print(f"Memory Size: {rdd_info.memSize() / 1024 / 1024:.2f} MB")
    print(f"Disk Size: {rdd_info.diskSize() / 1024 / 1024:.2f} MB")
Example 4: Cache in Iterative Algorithms
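from pyspark.sql.functions import col

# Example: PageRank-like iterative computation
def iterative_computation(df, num_iterations=10):
    # Cache input data so it is not recomputed from source each iteration
    cached = df.cache()
    cached.count()  # Trigger caching
    result = cached
    for i in range(num_iterations):
        # Each iteration builds on the cached input
        result = result.withColumn("value", col("value") * 0.85 + 0.15)
        # Trigger computation
        result.count()
        print(f"Iteration {i + 1} complete")
    # Unpersist the cached input, not the derived result: the loop variable no
    # longer points at the DataFrame that was cached
    cached.unpersist()
    return result

# Run iterative computation on an input that has a "value" column
input_df = spark.range(1000000).withColumn("value", col("id") * 1.0)
result = iterative_computation(input_df)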
Performance Metrics
| Scenario | No Cache | MEMORY_ONLY | MEMORY_ONLY_SER | MEMORY_AND_DISK |
|---|---|---|---|---|
| 1GB, 5 actions | 25.0s | 5.0s | 7.5s | 6.0s |
| 10GB, 5 actions | 250.0s | 50.0s | 75.0s | 60.0s |
| 1GB, 10 actions | 50.0s | 5.5s | 8.0s | 6.5s |
| 10GB, 10 actions | 500.0s | 55.0s | 80.0s | 65.0s |
| Memory Usage | 0 | 3.5GB | 2.0GB | 3.5GB + Disk |
| GC Time | 0 | 500ms | 200ms | 500ms |
| Disk I/O | 0 | 0 | 0 | 2GB |
Best Practices
1. Cache Selectively
# Cache DataFrame reused multiple times
df = expensive_computation()
df.cache()
# Use cached DataFrame
result1 = df.filter(...).collect()
result2 = df.groupBy(...).agg(...).collect()
# Unpersist when done
df.unpersist()
2. Choose Appropriate Storage Level
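from pyspark import StorageLevel
# Pick one level per DataFrame; call unpersist() before switching levels
# Small dataset, fits in memory
df.cache()  # DataFrame default: MEMORY_AND_DISK
# Large dataset, serialized saves space
# (PySpark 3.x has no MEMORY_ONLY_SER; its MEMORY_ONLY is already serialized)
df.persist(StorageLevel.MEMORY_ONLY)
# Very large dataset, spill to disk
df.persist(StorageLevel.MEMORY_AND_DISK)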
3. Monitor Cache Usage
# Check cache status
print(f"Cached: {df.is_cached}")
print(f"Storage level: {df.storageLevel}")
# Check storage info in Spark UI
# Storage tab shows cached RDDs/DataFrames
4. Unpersist When Done
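# Always unpersist when DataFrame no longer needed
df.unpersist()
# A DataFrame is not a context manager, so "with df:" raises an error.
# Use try/finally to guarantee the cache is released.
df.cache()
try:
    result = df.filter(...).collect()
finally:
    df.unpersist()  # runs even if the action above fails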
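A DataFrame does not implement the context manager protocol itself, but a small helper built on contextlib can provide the with-block pattern. This is a minimal sketch: the cached name is hypothetical, and it assumes only that the object passed in offers cache(), persist(level), and unpersist(), as PySpark DataFrames do.

```python
from contextlib import contextmanager


@contextmanager
def cached(df, storage_level=None):
    """Cache df for the duration of a with-block, then release it."""
    if storage_level is None:
        df.cache()
    else:
        df.persist(storage_level)
    try:
        yield df
    finally:
        df.unpersist()  # runs even if the body of the with-block raises


# Usage with a real DataFrame (hypothetical names):
# with cached(expensive_df) as df:
#     total = df.count()
#     by_key = df.groupBy("key").count().collect()
```

Anything that still needs the cached data must run inside the with-block, because the cache is released as soon as the block exits.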
5. Avoid Cache Bloat
# Don't cache everything
df1.cache() # Cache this
df2 = expensive_computation(df1) # Don't cache intermediate
df3 = another_computation(df2) # Don't cache this either
# Cache only what's reused
result = df3.filter(...).collect()
6. Use Checkpointing for Long Lineages
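# For very long lineages, checkpoint to truncate
spark.sparkContext.setCheckpointDir("hdfs:///checkpoint")
# checkpoint() returns a new DataFrame; keep using the returned one
df = df.checkpoint()
# The returned DataFrame reads from the checkpoint files, so its lineage is truncated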
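In iterative jobs, one common approach is to checkpoint every few iterations rather than on every step, since each checkpoint writes the data out. The helper below is a sketch under that assumption: the checkpoint_every name and the default interval of 5 are hypothetical, and it relies only on the object having a checkpoint() method that returns a new DataFrame, with the checkpoint directory already configured.

```python
def checkpoint_every(df, iteration: int, interval: int = 5):
    """Return a checkpointed DataFrame every `interval` iterations, else df unchanged."""
    if interval <= 0:
        raise ValueError("interval must be positive")
    if iteration > 0 and iteration % interval == 0:
        # checkpoint() returns a new DataFrame with truncated lineage
        return df.checkpoint()
    return df


# Usage inside a loop (hypothetical transformation step):
# for i in range(1, num_iterations + 1):
#     df = step(df)
#     df = checkpoint_every(df, i, interval=5)
```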
See Also
- 01-sparksession-architecture: SparkSession initialization and memory configuration
- 02-rdd-fundamentals: RDD lineage and persistence primitives
- 03-dataframe-operations: DataFrame transformations and actions
- 05-transformation-types: Lazy evaluation and transformation chains
- 07-partitioning-strategies: Partitioning impact on cache efficiency
- 09-udf-optimization: Caching UDF results for reuse
- 12-state-management: State caching in streaming applications
- Kafka Streams: Caching strategies in stream processing state stores
- Data Engineering Streaming: Persistence patterns in streaming pipelines