
PySpark Caching and Persistence: Storage Levels, Memory Management

🟒 Free Lesson


💾 PySpark Caching and Persistence

Definition: Caching

Caching stores the results of an RDD or DataFrame in memory (or on disk) so that later actions reuse them instead of recomputing them from lineage. For DataFrames, cache() is shorthand for persist(StorageLevel.MEMORY_AND_DISK); for RDDs, cache() uses MEMORY_ONLY.

Definition: Storage Level

A storage level defines where and how data is persisted: MEMORY_ONLY (deserialized objects on the JVM heap), MEMORY_AND_DISK (spills what doesn't fit in memory to disk), MEMORY_ONLY_SER (serialized in memory), DISK_ONLY, or OFF_HEAP (off-heap memory, outside the JVM heap).

Definition: Checkpointing

Checkpointing saves data to reliable storage (such as HDFS) and truncates the RDD lineage. It is used for very deep DAGs to prevent stack overflows and to keep lineage and query plans small, but afterwards Spark can no longer recompute the data from its original source; it reads the checkpointed files instead.

Cache Hit Ratio
$$R_{cache} = \frac{N_{cache\_hits}}{N_{total\_reads}}$$

Here,

  • $R_{cache}$ = Fraction of reads served from cache (target > 0.8)
  • $N_{cache\_hits}$ = Number of reads that found data in cache
  • $N_{total\_reads}$ = Total number of data reads
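As a quick illustration, the ratio is a single division checked against the 0.8 target. The hit and read counts below are hypothetical inputs: the Spark UI does not report a cache hit ratio directly, so you would have to obtain such counts yourself.

```python
def cache_hit_ratio(cache_hits: int, total_reads: int) -> float:
    """R_cache = N_cache_hits / N_total_reads."""
    if total_reads <= 0:
        raise ValueError("total_reads must be positive")
    if not 0 <= cache_hits <= total_reads:
        raise ValueError("cache_hits must be between 0 and total_reads")
    return cache_hits / total_reads


def meets_target(ratio: float, target: float = 0.8) -> bool:
    """True when the ratio exceeds this lesson's > 0.8 target."""
    return ratio > target


# Hypothetical counts: 850 of 1,000 reads were served from cache
ratio = cache_hit_ratio(850, 1000)
print(f"R_cache = {ratio:.2f}, meets target: {meets_target(ratio)}")
```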

Storage Memory Requirement

$$M_{cache} = N_{partitions} \times S_{partition} \times F_{compression}$$

Here,

  • $M_{cache}$ = Total memory needed to cache the dataset
  • $N_{partitions}$ = Number of partitions
  • $S_{partition}$ = Size per partition (serialized or deserialized)
  • $F_{compression}$ = Compression ratio (1.0 if uncompressed, 0.3 to 0.7 typical)
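A minimal sketch of this estimate, assuming partition sizes in MB and one compression factor for the whole dataset; the partition count and size are made-up example values.

```python
def cache_memory_mb(num_partitions: int, partition_size_mb: float,
                    compression: float = 1.0) -> float:
    """M_cache = N_partitions * S_partition * F_compression, in MB."""
    if not 0 < compression <= 1.0:
        raise ValueError("compression ratio must be in (0, 1]")
    return num_partitions * partition_size_mb * compression


# Hypothetical dataset: 200 partitions of 128 MB each (1 GB = 1024 MB here)
uncompressed = cache_memory_mb(200, 128)
compressed = cache_memory_mb(200, 128, compression=0.5)
print(f"Uncompressed: {uncompressed / 1024:.1f} GB")
print(f"Compressed (0.5): {compressed / 1024:.1f} GB")
```

With these inputs the estimate drops from 25.0 GB to 12.5 GB, which is the kind of difference that decides whether a dataset fits in storage memory at all.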

Cache vs Recompute Break-Even

$$N_{reuses} > \frac{C_{cache}}{C_{recompute}}$$

Here,

  • $N_{reuses}$ = Number of times cached data must be accessed to justify caching
  • $C_{cache}$ = Cost of caching (memory + serialization time)
  • $C_{recompute}$ = Cost of recomputing from lineage per access
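The break-even test reduces to one comparison. In this sketch the costs are hypothetical timings in seconds; any unit works as long as both costs use the same one.

```python
def break_even_reuses(cache_cost: float, recompute_cost: float) -> float:
    """Return C_cache / C_recompute, the reuse count caching must exceed."""
    if recompute_cost <= 0:
        raise ValueError("recompute_cost must be positive")
    return cache_cost / recompute_cost


def should_cache(n_reuses: int, cache_cost: float, recompute_cost: float) -> bool:
    """Cache when N_reuses > C_cache / C_recompute."""
    return n_reuses > break_even_reuses(cache_cost, recompute_cost)


# Hypothetical costs: caching costs 45 s once, recomputing costs 30 s per access
print(break_even_reuses(45, 30))
print(should_cache(1, 45, 30))
print(should_cache(2, 45, 30))
```

Here the break-even point is 1.5 reuses, so a single reuse does not pay for the cache but two do.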

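Use MEMORY_AND_DISK for datasets that may not fully fit in memory: Spark spills the partitions that don't fit to disk rather than recomputing them. Use MEMORY_ONLY only when you are confident the data fits and recomputation is cheap, because partitions that don't fit are dropped and recomputed on each access.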

Always call unpersist() on DataFrames that are no longer reused. Cached blocks can be evicted under memory pressure (LRU), but Spark cannot tell that you are finished with them, so stale caches keep occupying storage memory and push out datasets you still need. unpersist() releases their blocks explicitly.

Theorem: Cache vs Recompute Trade-off

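Theorem: Caching is beneficial when $N_{reuses} \times C_{recompute} > C_{cache}$, where $N_{reuses}$ is the number of times the data is accessed after caching, $C_{recompute}$ is the cost of recomputing it from lineage per access, and $C_{cache}$ is the one-time cost of caching (memory footprint plus serialization time). Dividing both sides by $C_{recompute}$ gives the break-even condition above.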

  • For DataFrames, cache() = persist(MEMORY_AND_DISK); use persist() for other storage levels
  • Cache only datasets reused 2+ times; always unpersist() when done
  • Target a cache hit ratio > 80%; in the Spark UI Storage tab, Fraction Cached is the closest built-in indicator
  • Checkpointing truncates lineage for very deep DAGs
  • MEMORY_ONLY_SER needs substantially less memory than MEMORY_ONLY (roughly half in this lesson's tables) but pays a deserialization cost on every access
  • MEMORY_AND_DISK keeps every partition persisted, spilling to disk whatever doesn't fit in memory

Storage Levels Comparison

  • MEMORY_ONLY: deserialized, on heap
  • MEMORY_AND_DISK: spills to disk if needed
  • MEMORY_ONLY_SER: serialized, about 50% less RAM
  • DISK_ONLY: no memory usage
  • OFF_HEAP: outside the JVM heap, no GC

Cache vs Recompute Decision:

  • Fewer than 2 reuses: recompute from lineage
  • 2 to 5 reuses: MEMORY_AND_DISK
  • More than 5 reuses: MEMORY_ONLY
  • Deep DAG: checkpoint (HDFS)

$N_{reuses} \times C_{recompute} > C_{cache}$ → caching is beneficial. Target cache hit ratio > 80%; always unpersist() when done.
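The decision guide can be written as a small function. It encodes this lesson's rule of thumb rather than any Spark API, and checking for a deep DAG first is an assumption, since the guide lists that case separately.

```python
def choose_strategy(n_reuses: int, deep_dag: bool = False) -> str:
    """Map reuse count (and DAG depth) to the strategy suggested by the guide."""
    if deep_dag:
        return "checkpoint"        # truncate lineage first (assumed precedence)
    if n_reuses < 2:
        return "recompute"         # caching does not pay off
    if n_reuses <= 5:
        return "MEMORY_AND_DISK"   # moderate reuse, safe if memory runs short
    return "MEMORY_ONLY"           # heavy reuse, fastest access


for reuses in (1, 3, 8):
    print(reuses, choose_strategy(reuses))
print("deep DAG:", choose_strategy(3, deep_dag=True))
```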


📚 Detailed Explanation

1. Why Cache?

Caching is essential when a DataFrame or RDD is reused multiple times. Without caching, Spark recomputes the entire lineage from the source for each action.

Benefits:

  • Avoids recomputation of expensive transformations
  • Reduces I/O (read from cache instead of source)
  • Improves iterative algorithm performance
  • Enables interactive data exploration

Rule of Thumb: Cache only datasets reused 2+ times; always unpersist() when done.


2. Cache vs Persist

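cache() persists with the default storage level, which differs by API:

  • RDD.cache() uses MEMORY_ONLY: partitions that don't fit in memory are not stored and are recomputed when needed
  • DataFrame.cache() uses MEMORY_AND_DISK (MEMORY_AND_DISK_DESER in PySpark 3.0+, matching Scala): partitions that don't fit are spilled to disk
  • cache() takes no arguments; call persist() to choose another level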

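persist() allows choosing a storage level:

  • MEMORY_ONLY: Same as RDD.cache()
  • MEMORY_ONLY_SER: Serialized, saves space (Scala/Java name; PySpark has no such constant, and its MEMORY_ONLY already uses the same serialized flags)
  • MEMORY_AND_DISK: Spill to disk if memory is full; same as DataFrame.cache()
  • DISK_ONLY: Only on disk
  • OFF_HEAP: Off-heap memory outside the JVM heap; requires spark.memory.offHeap.enabled=true and a positive spark.memory.offHeap.size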
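Each storage level is a combination of five flags: use disk, use memory, use off-heap, deserialized, and replication. The plain-Python model below (a hypothetical `Level` tuple, not `pyspark.StorageLevel`) records the flag values of the Scala/Java constants and renders them in this lesson's terms. PySpark's constructor takes the same positional order, `StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication=1)`, so `StorageLevel(False, True, False, False)` carries the MEMORY_ONLY_SER flags.

```python
from typing import NamedTuple


class Level(NamedTuple):
    """Plain-Python model of the five storage-level flags (not pyspark's class)."""
    use_disk: bool
    use_memory: bool
    use_off_heap: bool
    deserialized: bool
    replication: int = 1


# Flag values of the Scala/Java constants
LEVELS = {
    "MEMORY_ONLY": Level(False, True, False, True),
    "MEMORY_ONLY_SER": Level(False, True, False, False),
    "MEMORY_AND_DISK": Level(True, True, False, True),
    "DISK_ONLY": Level(True, False, False, False),
    "OFF_HEAP": Level(True, True, True, False),
    "MEMORY_AND_DISK_2": Level(True, True, False, True, 2),
}


def describe(level: Level) -> str:
    """Summarize location, format, and replication for one level."""
    where = []
    if level.use_memory:
        where.append("off-heap memory" if level.use_off_heap else "heap")
    if level.use_disk:
        where.append("disk")
    fmt = "deserialized" if level.deserialized else "serialized"
    return f"{' + '.join(where)}, {fmt}, {level.replication}x"


for name, level in LEVELS.items():
    print(f"{name}: {describe(level)}")
```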

3. Storage Levels Deep Dive

| Storage Level | Location | Format | Speed | Memory Use | Best For |
|---|---|---|---|---|---|
| MEMORY_ONLY | Heap | Deserialized | Fastest | Highest (3-5x) | Small datasets, repeated access |
| MEMORY_ONLY_SER | Heap | Serialized | Fast | Medium (1.5-2x) | Memory-constrained environments |
| MEMORY_AND_DISK | Heap + Disk | Mixed | Fast | Medium | Datasets larger than memory |
| DISK_ONLY | Disk | Serialized | Slower | Lowest | Very large datasets |
| OFF_HEAP | Off-heap memory | Serialized | Fast | Lowest | Large caches with less GC pressure |

4. Unified Memory Management

Spark 1.6 introduced unified memory management, in which storage and execution memory share a common pool.

Key Behaviors:

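  • Storage can use free execution memory when execution is idle
  • Execution can reclaim memory from storage by evicting cached blocks, but only until storage falls back to spark.memory.storageFraction (default 0.5) of the pool
  • Execution has higher priority: storage can never evict execution memory
  • This soft boundary lets both sides share memory flexibly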
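The sizes of these regions follow from two settings described in the Spark tuning guide: the unified pool M is `spark.memory.fraction` (default 0.6) of the JVM heap minus 300 MiB of reserved memory, and the region R whose cached blocks execution cannot evict is `spark.memory.storageFraction` (default 0.5) of M. A sketch of that arithmetic, assuming a hypothetical 4096 MB executor heap and Spark 2.0+ defaults:

```python
from typing import Tuple

RESERVED_MB = 300  # reserved memory per the Spark tuning guide (300 MiB)


def unified_memory_mb(heap_mb: float, memory_fraction: float = 0.6,
                      storage_fraction: float = 0.5) -> Tuple[float, float]:
    """Return (M, R) in MB: the unified pool and its eviction-protected storage region."""
    if heap_mb <= RESERVED_MB:
        raise ValueError("heap must be larger than the reserved memory")
    unified = (heap_mb - RESERVED_MB) * memory_fraction  # M: shared by execution and storage
    protected = unified * storage_fraction               # R: execution cannot evict below this
    return unified, protected


m, r = unified_memory_mb(4096)  # hypothetical 4096 MB executor heap
print(f"M = {m:.1f} MB, R = {r:.1f} MB")
```

With these defaults, roughly 2.2 GB of a 4 GB heap is shared by execution and storage, and about half of that is safe from eviction by execution.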

5. Cache Eviction Policies

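When memory pressure occurs, Spark evicts cached data using a Least Recently Used (LRU) policy.

Eviction Process:

  1. A new block needs storage memory, or execution needs memory currently held by storage
  2. Execution can evict cached blocks only while storage holds more than spark.memory.storageFraction (default 0.5) of unified memory
  3. Spark selects the least recently used blocks
  4. MEMORY_AND_DISK blocks are written to disk; MEMORY_ONLY blocks are dropped and recomputed from lineage if needed again
  5. The block manager metadata is updated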
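To make the policy concrete, here is a toy block store in plain Python. It is a simplified sketch of the LRU behavior described above, not Spark's MemoryStore: sizes are in MB, block IDs are made up, and only the MEMORY_ONLY and MEMORY_AND_DISK cases are modeled.

```python
from collections import OrderedDict


class ToyMemoryStore:
    """Toy LRU block store: a sketch of the eviction policy, not Spark's MemoryStore."""

    def __init__(self, capacity_mb: int):
        self.capacity_mb = capacity_mb
        self.blocks = OrderedDict()  # block_id -> (size_mb, level), least recent first
        self.on_disk = set()         # MEMORY_AND_DISK blocks moved out of memory
        self.dropped = set()         # MEMORY_ONLY blocks removed (recompute if needed)

    def used_mb(self) -> int:
        return sum(size for size, _ in self.blocks.values())

    def get(self, block_id: str) -> bool:
        """Read a block from memory; a hit makes it the most recently used."""
        if block_id in self.blocks:
            self.blocks.move_to_end(block_id)
            return True
        return False

    def _move_out(self, block_id: str, level: str) -> None:
        if level == "MEMORY_AND_DISK":
            self.on_disk.add(block_id)
        else:
            self.dropped.add(block_id)

    def put(self, block_id: str, size_mb: int, level: str) -> None:
        if size_mb > self.capacity_mb:
            self._move_out(block_id, level)  # can never fit in memory
            return
        # Evict least recently used blocks until the new block fits
        while self.used_mb() + size_mb > self.capacity_mb:
            victim, (_, victim_level) = self.blocks.popitem(last=False)
            self._move_out(victim, victim_level)
        self.blocks[block_id] = (size_mb, level)


store = ToyMemoryStore(capacity_mb=300)
store.put("rdd_1_0", 100, "MEMORY_ONLY")
store.put("rdd_2_0", 100, "MEMORY_AND_DISK")
store.put("rdd_3_0", 100, "MEMORY_ONLY")
store.get("rdd_1_0")                       # rdd_1_0 becomes most recently used
store.put("rdd_4_0", 150, "MEMORY_ONLY")   # evicts rdd_2_0, then rdd_3_0
print(list(store.blocks), store.on_disk, store.dropped)
```

Reading `rdd_1_0` just before the last insert is what protects it: the two blocks evicted are the least recently used ones, and only the MEMORY_AND_DISK block survives on disk.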

Factors Affecting Eviction:

  • Access recency (LRU order)
  • Partition size
  • Storage level (memory vs disk)
  • Memory pressure from execution

6. Cache Metrics and Monitoring

Key Metrics to Monitor (Storage tab columns):

  • Storage Level: Current storage level
  • Cached Partitions: Number of partitions currently cached
  • Fraction Cached: Cached partitions as a share of all partitions (the closest built-in proxy for cache hit ratio)
  • Size in Memory: Memory used by the cached data
  • Size on Disk: Disk used by spilled or disk-persisted data

Spark UI Locations:

  • Storage tab: Cached RDDs and DataFrames
  • Executors tab: Memory usage per executor
  • SQL tab: Cache usage in query plans
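If you copy the Storage tab's numbers into a script, Fraction Cached is simply cached partitions divided by total partitions. The sketch below flags datasets under a threshold; the rows are hypothetical values, and reusing this lesson's 80% hit-ratio target as the threshold is an assumption.

```python
def fraction_cached(cached_partitions: int, total_partitions: int) -> float:
    """Cached partitions as a share of all partitions (Storage tab "Fraction Cached")."""
    if total_partitions <= 0:
        raise ValueError("total_partitions must be positive")
    return cached_partitions / total_partitions


def under_cached(rows, threshold: float = 0.8):
    """Names of datasets whose fraction cached falls below the threshold."""
    return [row["name"] for row in rows
            if fraction_cached(row["cached_partitions"], row["total_partitions"]) < threshold]


# Hypothetical values copied by hand from the Storage tab
rows = [
    {"name": "events", "cached_partitions": 200, "total_partitions": 200},
    {"name": "sessions", "cached_partitions": 120, "total_partitions": 200},
]
print(under_cached(rows))
```

A partially cached dataset recomputes its missing partitions on every action, so a low fraction usually means the level should spill to disk or the cache should be smaller.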

7. Common Cache Pitfalls

| Pitfall | Impact | Solution |
|---|---|---|
| Caching too much | Fills up memory, causes eviction | Cache only what's reused |
| Not caching when needed | Recomputes expensive transformations | Cache datasets used 2+ times |
| Wrong storage level | MEMORY_ONLY for too-large datasets → constant recomputation | Use MEMORY_AND_DISK for large data |
| Forgetting to unpersist | Stale caches hold memory and evict useful data | Always call unpersist() when done |

8. Cache Optimization Strategies

Strategy 1: Cache selectively

# Cache only what's reused
df.cache()  # Cache this one
result = df.filter(...).groupBy(...).agg(...)  # Don't cache intermediate

Strategy 2: Choose right storage level

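from pyspark import StorageLevel

# Alternatives: pick one level per DataFrame (unpersist() before switching)

# Small dataset, repeated access
df.cache()  # DataFrame default: MEMORY_AND_DISK

# Large dataset that fits in memory only when serialized
# (PySpark has no MEMORY_ONLY_SER constant; these flags are equivalent)
df.persist(StorageLevel(False, True, False, False))

# Dataset larger than memory
df.persist(StorageLevel.MEMORY_AND_DISK)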

Strategy 3: Monitor and adjust

# Check cache status
print(f"Cached: {df.is_cached}")
print(f"Storage level: {df.storageLevel}")

# Unpersist when done
df.unpersist()

Best Practice: Target a cache hit ratio > 80%; in the Spark UI Storage tab, watch Fraction Cached as the closest built-in indicator.

🔑 Key Concepts Table

| Storage Level | Location | Format | Speed | Space Efficiency | GC Friendliness | Use Case |
|---|---|---|---|---|---|---|
| MEMORY_ONLY | Heap | Deserialized | ★★★★★ | ★★☆☆☆ | ★★☆☆☆ | Small, repeated access |
| MEMORY_ONLY_SER | Heap | Serialized | ★★★★☆ | ★★★★☆ | ★★★★☆ | Memory-constrained |
| MEMORY_AND_DISK | Heap + Disk | Mixed | ★★★★☆ | ★★★☆☆ | ★★★☆☆ | Large datasets |
| DISK_ONLY | Disk | Serialized | ★★☆☆☆ | ★★★★★ | ★★★★★ | Very large datasets |
| OFF_HEAP | Off-heap memory | Serialized | ★★★★☆ | ★★★★★ | ★★★★★ | Large caches with less GC |
| MEMORY_ONLY_2 | Heap | Deserialized | ★★★★★ | ★★☆☆☆ | ★★☆☆☆ | 2 replicas |
| MEMORY_AND_DISK_2 | Heap + Disk | Mixed | ★★★★☆ | ★★★☆☆ | ★★★☆☆ | Fault tolerance |

💻 Code Examples

Example 1: Basic Caching

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark import StorageLevel

spark = SparkSession.builder.appName("CachingPersistence").getOrCreate()

# Create expensive DataFrame
df = spark.range(1000000).withColumn(
    "key", col("id") % 1000
).withColumn(
    "value", col("id") * 1.0
)

# Cache the DataFrame; for DataFrames, cache() is shorthand for
# persist(StorageLevel.MEMORY_AND_DISK) (MEMORY_AND_DISK_DESER in PySpark 3.0+)
df.cache()

# First action triggers computation and caching
count = df.count()
print(f"Count: {count}")

# Subsequent actions use cached version
sum_val = df.groupBy("key").sum("value").count()
print(f"GroupBy count: {sum_val}")

# Check cache status
print(f"Is cached: {df.is_cached}")
print(f"Storage level: {df.storageLevel}")

# Unpersist when done
df.unpersist()

Example 2: Different Storage Levels

import time

from pyspark import StorageLevel
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit

spark = SparkSession.builder.appName("StorageLevels").getOrCreate()

# PySpark exposes no MEMORY_ONLY_SER constant, so build the serialized
# memory-only level from its flags:
# StorageLevel(useDisk, useMemory, useOffHeap, deserialized)
MEMORY_ONLY_SER = StorageLevel(False, True, False, False)

levels = {
    "MEMORY_ONLY_SER": MEMORY_ONLY_SER,               # serialized in memory
    "MEMORY_AND_DISK": StorageLevel.MEMORY_AND_DISK,  # spill to disk if needed
    "DISK_ONLY": StorageLevel.DISK_ONLY,              # all data on disk
}

for name, level in levels.items():
    # A distinct literal column gives each DataFrame its own plan; persisting
    # an already-cached plan again keeps the first level and only logs a warning
    df_level = (
        spark.range(1000000)
        .withColumn("value", col("id") * 1.0)
        .withColumn("level", lit(name))
    )
    df_level.persist(level)
    df_level.count()  # First action materializes the cache

    # Test performance: later actions read from the cache
    start = time.time()
    for _ in range(5):
        df_level.count()
    print(f"{name} ({df_level.storageLevel}): {time.time() - start:.2f}s")

    df_level.unpersist()

Example 3: Cache Monitoring

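from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CacheMonitoring").getOrCreate()

# Optional: INFO logging shows block manager messages as blocks are stored
spark.sparkContext.setLogLevel("INFO")

# Create and cache DataFrame
df = spark.range(1000000).cache()

# Trigger caching
df.count()

# Check storage info through the JVM SparkContext.
# _jsc is a private PySpark attribute and getRDDStorageInfo() is a
# developer API, so this may change between Spark versions.
storage_info = spark.sparkContext._jsc.sc().getRDDStorageInfo()
for rdd_info in storage_info:
    print(f"RDD ID: {rdd_info.id()}")
    print(f"Name: {rdd_info.name()}")
    print(f"Storage Level: {rdd_info.storageLevel()}")
    print(f"Cached Partitions: {rdd_info.numCachedPartitions()} of {rdd_info.numPartitions()}")
    print(f"Memory Size: {rdd_info.memSize() / 1024 / 1024:.2f} MB")
    print(f"Disk Size: {rdd_info.diskSize() / 1024 / 1024:.2f} MB")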

Example 4: Cache in Iterative Algorithms

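from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("IterativeCaching").getOrCreate()


# Example: PageRank-like iterative computation
def iterative_computation(df, num_iterations=10):
    # Cache the current state so each iteration reads it from memory
    # instead of recomputing the whole lineage from source
    current = df.cache()
    current.count()  # Trigger caching

    for i in range(num_iterations):
        # Build the next state from the cached current state and cache it
        next_df = current.withColumn("value", col("value") * 0.85 + 0.15).cache()
        next_df.count()  # Materialize the new state first

        # Release the previous state so only the latest one stays cached
        current.unpersist()
        current = next_df
        print(f"Iteration {i + 1} complete")

    # The result is still cached; the caller should unpersist() it when done
    return current


# Run iterative computation on a DataFrame with a "value" column
input_df = spark.range(1000000).withColumn("value", col("id") * 1.0)
result = iterative_computation(input_df)
result.unpersist()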

📊 Performance Metrics

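| Scenario | No Cache | MEMORY_ONLY | MEMORY_ONLY_SER | MEMORY_AND_DISK |
|---|---|---|---|---|
| 1GB, 5 actions | 25.0s | 5.0s | 7.5s | 6.0s |
| 10GB, 5 actions | 250.0s | 50.0s | 75.0s | 60.0s |
| 1GB, 10 actions | 50.0s | 5.5s | 8.0s | 6.5s |
| 10GB, 10 actions | 500.0s | 55.0s | 80.0s | 65.0s |
| Memory Usage | 0 | 3.5GB | 2.0GB | 3.5GB + Disk |
| GC Time | 0 | 500ms | 200ms | 500ms |
| Disk I/O | 0 | 0 | 0 | 2GB |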
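The speedups implied by the timing rows come from dividing each no-cache time by the cached time. This sketch only re-derives ratios from the table's own numbers; it does not measure anything.

```python
# Times in seconds, copied from the Performance Metrics table
LEVELS = ["MEMORY_ONLY", "MEMORY_ONLY_SER", "MEMORY_AND_DISK"]
TIMES = {
    # scenario: (no cache, MEMORY_ONLY, MEMORY_ONLY_SER, MEMORY_AND_DISK)
    "1GB, 5 actions": (25.0, 5.0, 7.5, 6.0),
    "10GB, 5 actions": (250.0, 50.0, 75.0, 60.0),
    "1GB, 10 actions": (50.0, 5.5, 8.0, 6.5),
    "10GB, 10 actions": (500.0, 55.0, 80.0, 65.0),
}


def speedups(times):
    """Speedup of each storage level over the no-cache run."""
    baseline, *cached = times
    return {level: baseline / t for level, t in zip(LEVELS, cached)}


for scenario, times in TIMES.items():
    summary = ", ".join(f"{lvl} {s:.1f}x" for lvl, s in speedups(times).items())
    print(f"{scenario}: {summary}")
```

With these numbers, MEMORY_ONLY goes from 5.0x at 5 actions to about 9.1x at 10 actions, because the no-cache time grows with every action while the cached runs add little.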

✅ Best Practices

1. Cache Selectively

# Cache DataFrame reused multiple times
df = expensive_computation()
df.cache()

# Use cached DataFrame
result1 = df.filter(...).collect()
result2 = df.groupBy(...).agg(...).collect()

# Unpersist when done
df.unpersist()

2. Choose Appropriate Storage Level

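from pyspark import StorageLevel

# Alternatives: pick one level per DataFrame (unpersist() before switching)

# Small dataset, fits in memory
df.cache()  # DataFrame default: MEMORY_AND_DISK

# Large dataset, serialized saves space (MEMORY_ONLY_SER flags; no PySpark constant)
df.persist(StorageLevel(False, True, False, False))

# Very large dataset, spill to disk
df.persist(StorageLevel.MEMORY_AND_DISK)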

3. Monitor Cache Usage

# Check cache status
print(f"Cached: {df.is_cached}")
print(f"Storage level: {df.storageLevel}")

# Check storage info in Spark UI
# Storage tab shows cached RDDs/DataFrames

4. Unpersist When Done

# Always unpersist when DataFrame no longer needed
df.unpersist()

# A pyspark.sql DataFrame is not a context manager; use try/finally
# so the cache is released even if an action fails
df.cache()
try:
    result = df.filter(...).collect()
finally:
    df.unpersist()
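If you prefer `with` syntax, a small wrapper built with `contextlib` provides it. The `cached` helper below is hypothetical (not part of PySpark); it only assumes the object has `persist()` and `unpersist()` methods, as a PySpark DataFrame does.

```python
from contextlib import contextmanager


@contextmanager
def cached(df, storage_level=None):
    """Hypothetical helper: persist df inside a with-block, unpersist on exit.

    Works with any object exposing persist()/unpersist(), such as a
    pyspark.sql DataFrame.
    """
    if storage_level is None:
        df.persist()               # the object's default storage level
    else:
        df.persist(storage_level)  # e.g. StorageLevel.DISK_ONLY in PySpark
    try:
        yield df
    finally:
        df.unpersist()             # runs even if the block raises
```

For example, `with cached(df, StorageLevel.MEMORY_AND_DISK) as cdf:` keeps `df` cached only for the body of the block, and the `finally` clause releases it even when an action inside the block fails.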

5. Avoid Cache Bloat

# Don't cache everything
df1.cache()  # Cache this
df2 = expensive_computation(df1)  # Don't cache intermediate
df3 = another_computation(df2)  # Don't cache this either

# Cache only what's reused
result = df3.filter(...).collect()

6. Use Checkpointing for Long Lineages

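# For very long lineages, checkpoint to truncate
spark.sparkContext.setCheckpointDir("hdfs:///checkpoint")
df = df.checkpoint()  # returns a new DataFrame; eager by default

# The checkpointed DataFrame reads from the checkpoint files, so its
# lineage and query plan no longer carry the earlier transformations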
