18. Garbage Collection Tuning in PySpark
Definition: Garbage Collection (GC)
Garbage collection is the automatic memory management process in the JVM that reclaims memory occupied by objects that are no longer referenced by the application. In PySpark, GC directly impacts application performance and stability.
Definition: Stop-the-World (STW) Pause
A stop-the-world pause occurs when the JVM suspends all application threads to perform garbage collection. Long STW pauses can cause straggler tasks, executor heartbeat timeouts, and overall performance degradation.
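GC Overhead
$$\text{GC}_{\text{overhead}} = \frac{T_{\text{GC}}}{T_{\text{total}}} \times 100\%$$
Here,
- $\text{GC}_{\text{overhead}}$ = GC overhead percentage (target < 5%)
- $T_{\text{GC}}$ = Time spent in garbage collection
- $T_{\text{total}}$ = Total application execution time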
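GC overhead is the time spent in garbage collection divided by total execution time, expressed as a percentage. The sketch below works through that arithmetic; the function name `gc_overhead_pct` and the sample durations are illustrative assumptions, not measurements from a real job.

```python
def gc_overhead_pct(gc_time_s: float, total_time_s: float) -> float:
    """Return GC time as a percentage of total execution time."""
    if total_time_s <= 0:
        raise ValueError("total_time_s must be positive")
    return 100.0 * gc_time_s / total_time_s


# Hypothetical job: 45 s of GC during a 20-minute (1200 s) run.
overhead = gc_overhead_pct(45.0, 1200.0)
print(f"GC overhead: {overhead:.2f}%")
print("within 5% target" if overhead < 5.0 else "above 5% target")
```

With these sample numbers the overhead is 3.75%, which is under the 5% target.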
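Heap Memory Distribution
$$M_{\text{total}} = M_{\text{young}} + M_{\text{old}} + M_{\text{off-heap}}$$
Here,
- $M_{\text{total}}$ = Total memory allocated (JVM heap plus off-heap)
- $M_{\text{young}}$ = Young generation space (eden + survivor)
- $M_{\text{old}}$ = Old generation space (tenured)
- $M_{\text{off-heap}}$ = Off-heap memory (for Tachyon, now Alluxio, and Netty)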
Spark uses both on-heap and off-heap memory. On-heap memory is managed by the JVM garbage collector, while off-heap memory is managed manually. Understanding this distinction is crucial for effective GC tuning.
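As a rough illustration of moving some memory off-heap, the sketch below builds the two Spark settings that enable off-heap execution and storage memory. The helper name `offheap_conf` and the 4 GB size are assumptions chosen for the example; suitable sizes depend on the workload and the container limits.

```python
def offheap_conf(offheap_gb: int) -> dict:
    """Build Spark settings that enable off-heap execution/storage memory."""
    if offheap_gb <= 0:
        raise ValueError("offheap_gb must be positive")
    return {
        "spark.memory.offHeap.enabled": "true",
        "spark.memory.offHeap.size": f"{offheap_gb}g",
    }


conf = offheap_conf(4)
for key, value in sorted(conf.items()):
    print(f"{key}={value}")

# Applying the settings (needs pyspark, so it is left as a comment here):
# from pyspark.sql import SparkSession
# builder = SparkSession.builder.appName("OffHeapSketch")
# for key, value in conf.items():
#     builder = builder.config(key, value)
# spark = builder.getOrCreate()
```

Memory configured this way is not scanned by the garbage collector, but it still counts toward the executor's overall footprint.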
Use the G1GC garbage collector for most Spark workloads. It provides a good balance between throughput and pause times. For low-latency requirements, consider ZGC or Shenandoah.
Theorem: GC Impact on Performance
GC overhead directly impacts Spark application performance. When GC overhead exceeds 5% of total execution time, applications experience significant performance degradation. Optimal GC configuration reduces overhead to below 2% while maintaining sufficient memory for application operations.
- Garbage collection reclaims unused JVM memory automatically
- Stop-the-world pauses cause application thread suspension
- GC overhead should be kept below 5% for optimal performance
- G1GC is recommended for most Spark workloads
- Off-heap memory reduces GC pressure but requires manual management
JVM Memory Model & GC Comparison
Garbage Collection Architecture
Detailed Explanation
Why GC Tuning Matters
- JVM's automatic memory management can become a bottleneck
- Untuned GC leads to long pause times, out-of-memory errors, and degraded performance
- Proper tuning keeps GC overhead below 5% of total execution time
JVM Memory Structure
| Region | Purpose | Contents |
|---|---|---|
| Young Generation | New object allocation | Eden + Survivor spaces |
| Old Generation | Long-lived objects | Promoted from young generation |
| Off-Heap Memory | Manual management | Outside JVM heap, reduces GC pressure |
Spark Executor Memory Layout
| Memory Type | Purpose |
|---|---|
| Execution Memory | Shuffles, joins, sorts |
| Storage Memory | Cached data, broadcast variables |
| User Memory | User data structures |
| Reserved Memory | Spark internals |
Understanding these divisions helps configure appropriate heap sizes.
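To make the layout concrete, the following sketch estimates how an executor heap is divided under Spark's unified memory model. It assumes the documented defaults (`spark.memory.fraction` = 0.6, `spark.memory.storageFraction` = 0.5) and the fixed 300 MB reservation; the function name is hypothetical, and the heap the JVM actually reports is usually slightly smaller than the configured value, so treat the output as an approximation.

```python
RESERVED_MB = 300  # fixed reservation for Spark internals


def executor_memory_layout(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Estimate the on-heap regions (in MB) for a given executor heap."""
    usable = heap_mb - RESERVED_MB
    if usable <= 0:
        raise ValueError("heap must be larger than the reserved memory")
    unified = usable * memory_fraction   # execution + storage
    storage = unified * storage_fraction  # storage share protected from eviction
    return {
        "reserved_mb": RESERVED_MB,
        "user_mb": usable - unified,
        "storage_mb": storage,
        "execution_mb": unified - storage,
    }


# Example: an 8 GB executor heap with default fractions.
for region, size in executor_memory_layout(8 * 1024).items():
    print(f"{region}: {size:.0f}")
```

Execution and storage can borrow from each other at runtime, so the split between them is a starting point rather than a hard boundary.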
Garbage Collector Comparison
| Collector | Best For | Trade-off |
|---|---|---|
| Serial GC | Single-core machines | Simple, low overhead; long pauses on large heaps |
| Parallel GC | Multi-core, throughput-focused | High throughput; longer stop-the-world pauses |
| G1GC | Most Spark workloads | Balances throughput and latency |
| ZGC/Shenandoah | Latency-sensitive apps | Very short pauses; some throughput cost, needs a recent JDK |
Key Takeaway: Use G1GC for most Spark workloads. Consider ZGC for low-latency requirements.
Key GC Tuning Parameters
| Parameter | Description |
|---|---|
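| -Xmx / -Xms | Heap size (max/initial); in Spark, set the maximum heap with spark.executor.memory rather than -Xmx |
| -XX:NewRatio | Ratio of old to young generation size (2 means old is twice the size of young) |
| -XX:+UseG1GC | Select G1 garbage collector |
| -XX:MaxGCPauseMillis | Target max pause time (a goal, not a guarantee) |
| -XX:G1HeapRegionSize | G1 region size |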
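One way to keep these flags consistent across jobs is to assemble the option string in a small helper, sketched below. The helper name `build_g1_options` and its default values are assumptions for illustration; it only checks that the region size is a power of two and leaves heap sizing to `spark.executor.memory`.

```python
def build_g1_options(max_pause_ms=200, region_size_mb=16, ihop_percent=35):
    """Join G1 flags into one string for spark.executor.extraJavaOptions."""
    if region_size_mb < 1 or region_size_mb & (region_size_mb - 1):
        raise ValueError("G1 region size must be a power of two (in MB)")
    if not 0 <= ihop_percent <= 100:
        raise ValueError("ihop_percent must be between 0 and 100")
    flags = [
        "-XX:+UseG1GC",
        f"-XX:MaxGCPauseMillis={max_pause_ms}",
        f"-XX:G1HeapRegionSize={region_size_mb}m",
        f"-XX:InitiatingHeapOccupancyPercent={ihop_percent}",
    ]
    return " ".join(flags)


print(build_g1_options())
print(build_g1_options(max_pause_ms=100, region_size_mb=32, ihop_percent=30))
```

The returned string can be passed as the value of `spark.executor.extraJavaOptions`.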
GC Logging & Monitoring
- Enable via JVM flags: -Xlog:gc* (JDK 9+) or -XX:+PrintGCDetails (JDK 8)
- Metrics available through Spark UI
- Reveals:
  - GC frequency
  - Pause times
  - Memory pressure
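GC frequency and pause times can be pulled out of a log with a few lines of parsing. The sketch below assumes the JDK 9+ unified logging format (`-Xlog:gc`); the sample lines are illustrative, modeled on that format rather than copied from a real run, and the regular expression would need adjusting for JDK 8 style logs.

```python
import re

# Matches pause events such as "GC(7) Pause Young ... 35.210ms".
PAUSE_RE = re.compile(r"GC\(\d+\) Pause (\w+).*?(\d+(?:\.\d+)?)ms\s*$")

# Illustrative log excerpt (hypothetical values).
SAMPLE_LOG = """\
[12.345s][info][gc] GC(7) Pause Young (Normal) (G1 Evacuation Pause) 512M->128M(2048M) 35.210ms
[15.002s][info][gc] GC(8) Pause Young (Normal) (G1 Evacuation Pause) 600M->140M(2048M) 41.900ms
[20.500s][info][gc] GC(9) Pause Full (G1 Compaction Pause) 1900M->900M(2048M) 1250.000ms
[21.000s][info][gc] GC(10) Concurrent Mark Cycle 80.000ms
"""


def summarize_pauses(log_text):
    """Count pause events and report the longest and total pause time."""
    pauses = []
    for line in log_text.splitlines():
        match = PAUSE_RE.search(line)
        if match:
            pauses.append((match.group(1), float(match.group(2))))
    durations = [ms for _, ms in pauses]
    return {
        "pause_count": len(pauses),
        "full_gc_count": sum(1 for kind, _ in pauses if kind == "Full"),
        "max_pause_ms": max(durations, default=0.0),
        "total_pause_ms": sum(durations),
    }


print(summarize_pauses(SAMPLE_LOG))
```

Concurrent phases are skipped on purpose, since they do not suspend application threads.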
Common GC Issues & Solutions
| Issue | Symptom | Solution |
|---|---|---|
| Frequent minor GC | High GC overhead | Increase young generation size |
| Long major GC pauses | Application stalls | Tune old generation, use G1GC |
| OutOfMemoryError | Crash | Increase heap, fix memory leaks |
Code-Level Optimizations
- Avoid unnecessary object creation
- Use primitive types instead of wrapper classes
- Reuse objects where possible
- Minimize broadcast variables for large datasets
Key Takeaway: Code-level changes can significantly reduce GC pressure.
Production Monitoring
Set alerts for:
- GC overhead > 5% of total execution time
- Pause times > 1 second
- Memory utilization trends
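A minimal sketch of the overhead alert, assuming executor summaries shaped like those returned by Spark's monitoring REST API (`/api/v1/applications/[app-id]/executors`), which report `totalGCTime` and `totalDuration` in milliseconds. The function name and sample values are hypothetical, and because `totalDuration` sums task time, the result approximates GC overhead relative to task time rather than wall-clock time.

```python
def gc_alerts(executors, max_overhead_pct=5.0):
    """Return (executor id, overhead %) for executors above the threshold."""
    alerts = []
    for ex in executors:
        duration = ex.get("totalDuration", 0)
        if duration <= 0:
            continue  # no task time recorded yet
        overhead = 100.0 * ex.get("totalGCTime", 0) / duration
        if overhead > max_overhead_pct:
            alerts.append((ex["id"], round(overhead, 1)))
    return alerts


# Hypothetical executor summaries (times in milliseconds).
sample = [
    {"id": "driver", "totalDuration": 0, "totalGCTime": 0},
    {"id": "1", "totalDuration": 600_000, "totalGCTime": 12_000},
    {"id": "2", "totalDuration": 580_000, "totalGCTime": 46_400},
]
print(gc_alerts(sample))
```

Here only executor 2 is flagged, at 8.0% against the 5% threshold. Pause-time alerts need per-event data, for example from GC logs.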
Advanced Techniques
- Off-heap memory for large datasets
- Memory-mapped files for external data
- Custom memory allocators for specific use cases
- These techniques provide more control but require careful implementation
Key Concepts Table
| Concept | Description | Use Case |
|---|---|---|
| Garbage Collection | Automatic memory reclamation | JVM memory management |
| Stop-the-World Pause | Thread suspension during GC | Performance impact |
| Young Generation | New object allocation space | Short-lived objects |
| Old Generation | Long-lived object storage | Persistent data |
| Off-Heap Memory | Non-JVM managed memory | Large datasets |
| GC Algorithm | Collection strategy selection | Performance optimization |
| Heap Size | Total memory allocation | Resource configuration |
| GC Overhead | Time spent in GC | Performance metric |
Code Examples
Basic GC Configuration
from pyspark.sql import SparkSession
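# Configure Spark with GC settings
# spark.executor.extraJavaOptions sets JVM options for executors
# Note: the GC logging flags below use JDK 8 syntax; on JDK 9+ use -Xlog:gc* instead
# Note: in client mode the driver JVM is already running when this code executes, so
# pass driver options via spark-submit --driver-java-options or spark-defaults.conf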
spark = SparkSession.builder \
.appName("GCConfiguration") \
.config("spark.executor.memory", "8g") \
.config("spark.executor.extraJavaOptions",
"-XX:+UseG1GC "
"-XX:MaxGCPauseMillis=200 "
"-XX:InitiatingHeapOccupancyPercent=35 "
"-XX:G1HeapRegionSize=16m "
"-XX:G1ReservePercent=15 "
"-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps") \
.config("spark.driver.extraJavaOptions",
"-XX:+UseG1GC "
"-XX:MaxGCPauseMillis=200 "
"-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps") \
.getOrCreate()
# Read data
df = spark.read \
.format("parquet") \
.load("hdfs:///data/input")
# Process data
result = df.groupBy("category").count()
# Write results
result.write \
.mode("overwrite") \
.parquet("hdfs:///data/output")
spark.stop()
Memory Configuration for Large Datasets
from pyspark.sql import SparkSession
# Configure memory for large dataset processing
# spark.memory.fraction is the fraction of (heap - 300 MB) shared by execution and storage
# Note: the GC logging flags below use JDK 8 syntax; on JDK 9+ use -Xlog:gc* instead
spark = SparkSession.builder \
.appName("LargeDatasetGC") \
.config("spark.executor.memory", "16g") \
.config("spark.executor.memoryOverhead", "4g") \
.config("spark.memory.fraction", "0.8") \
.config("spark.memory.storageFraction", "0.3") \
.config("spark.executor.extraJavaOptions",
"-XX:+UseG1GC "
"-XX:MaxGCPauseMillis=300 "
"-XX:InitiatingHeapOccupancyPercent=40 "
"-XX:G1HeapRegionSize=32m "
"-XX:G1ReservePercent=20 "
"-XX:ConcGCThreads=4 "
"-XX:ParallelGCThreads=8 "
"-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps") \
.getOrCreate()
# Read large dataset
df = spark.read \
.format("parquet") \
.load("hdfs:///data/large_dataset")
# Process with caching
df.cache()
df.count() # Materialize cache
# Perform complex transformations
result = df \
.repartition(500) \
.groupBy("category", "subcategory") \
.agg({"value": "sum", "count": "count"}) \
.orderBy("category")
# Write results
result.write \
.mode("overwrite") \
.parquet("hdfs:///data/output")
spark.stop()
GC Monitoring and Analysis
from pyspark.sql import SparkSession

# GC logging flags below use JDK 8 syntax; on JDK 9+ use unified logging
# instead, e.g. -Xlog:gc*:file=/tmp/spark-gc.log
spark = SparkSession.builder \
    .appName("GCMonitoring") \
    .config("spark.executor.extraJavaOptions",
            "-XX:+UseG1GC "
            "-XX:+PrintGCDetails "
            "-XX:+PrintGCDateStamps "
            "-Xloggc:/tmp/spark-gc.log") \
    .getOrCreate()


def monitor_memory():
    """Run a sample workload whose GC activity is recorded in the executor GC logs."""
    # Note: This is a simplified monitoring approach.
    # In production, use JMX or the Spark metrics system.
    print("Memory monitoring started")

    # Read input data
    df = spark.read \
        .format("parquet") \
        .load("hdfs:///data/input")

    # Cache and count to put pressure on the heap
    df.cache()
    count = df.count()
    print(f"Row count: {count}")

    # Perform operations that may trigger GC
    result = df \
        .repartition(100) \
        .groupBy("category") \
        .agg({"value": "sum"})
    result.write \
        .mode("overwrite") \
        .parquet("hdfs:///data/output")


def analyze_gc_logs():
    """Print a checklist of patterns to look for in GC log files."""
    # A real implementation would parse the GC log file;
    # for demonstration, this only lists what to look for.
    print("GC Log Analysis:")
    print("Look for patterns like:")
    print("- GC pause times > 500ms")
    print("- Full GC frequency > 1 per minute")
    print("- Memory allocation failures")
    print("- Promotion failures")


# Run monitoring
monitor_memory()
analyze_gc_logs()
spark.stop()
GC Optimization for Specific Workloads
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# The three configurations below are alternatives, each meant for its own application.
# Executor JVM options are fixed when executors launch, and getOrCreate() returns any
# session that is already active, so each session here is created only after the
# previous one has been stopped.
# GC logging flags use JDK 8 syntax; on JDK 9+ use -Xlog:gc*:file=<path> instead.

# Configuration for batch processing (throughput-optimized)
spark_batch = SparkSession.builder \
    .appName("BatchProcessingGC") \
    .config("spark.executor.memory", "32g") \
    .config("spark.executor.extraJavaOptions",
            "-XX:+UseParallelGC "
            "-XX:ParallelGCThreads=8 "
            "-XX:+PrintGCDetails "
            "-Xloggc:/tmp/spark-batch-gc.log") \
    .getOrCreate()

# Example: Batch processing
df_batch = spark_batch.read \
    .format("parquet") \
    .load("hdfs:///data/batch_input")
result_batch = df_batch \
    .repartition(200) \
    .groupBy("category") \
    .agg({"value": "sum"})
result_batch.write \
    .mode("overwrite") \
    .parquet("hdfs:///data/batch_output")
spark_batch.stop()

# Configuration for streaming (latency-optimized)
spark_streaming = SparkSession.builder \
    .appName("StreamingGC") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.extraJavaOptions",
            "-XX:+UseG1GC "
            "-XX:MaxGCPauseMillis=100 "
            "-XX:InitiatingHeapOccupancyPercent=30 "
            "-XX:+PrintGCDetails "
            "-Xloggc:/tmp/spark-streaming-gc.log") \
    .getOrCreate()

# Example: Streaming (requires the spark-sql-kafka connector on the classpath)
df_stream = spark_streaming.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topic1") \
    .load()
result_stream = df_stream \
    .selectExpr("CAST(value AS STRING)") \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()
# Blocks until the query is stopped or fails
result_stream.awaitTermination()
spark_streaming.stop()

# Configuration for iterative algorithms (memory-optimized)
spark_iterative = SparkSession.builder \
    .appName("IterativeAlgorithmGC") \
    .config("spark.executor.memory", "16g") \
    .config("spark.memory.fraction", "0.85") \
    .config("spark.executor.extraJavaOptions",
            "-XX:+UseG1GC "
            "-XX:G1HeapRegionSize=16m "
            "-XX:InitiatingHeapOccupancyPercent=35 "
            "-XX:+PrintGCDetails "
            "-Xloggc:/tmp/spark-iterative-gc.log") \
    .getOrCreate()

# Example: Iterative algorithm (e.g., PageRank)
df_iterative = spark_iterative.read \
    .format("parquet") \
    .load("hdfs:///data/graph_data")

# Simple iterative algorithm: sum ranks per node, then apply the damping factor
for i in range(10):
    df_iterative = df_iterative \
        .repartition(100) \
        .groupBy("node_id") \
        .agg(F.sum("rank").alias("rank_sum")) \
        .select("node_id", (F.col("rank_sum") * 0.85 + 0.15).alias("rank"))

df_iterative.write \
    .mode("overwrite") \
    .parquet("hdfs:///data/pagerank_output")
spark_iterative.stop()
Performance Metrics
| Metric | Optimal | Typical | Poor | Optimization |
|---|---|---|---|---|
| GC Overhead | < 2% | 2-5% | > 5% | Heap tuning, code optimization |
| Minor GC Pause | < 50ms | 50-200ms | > 200ms | Young gen sizing |
| Major GC Pause | < 200ms | 200ms-1s | > 1s | Algorithm selection |
| GC Frequency | < 1/min | 1-10/min | > 10/min | Memory allocation patterns |
| Memory Utilization | 60-80% | 40-60% | < 40% | Caching, memory config |
Best Practices
- Monitor GC metrics: Track overhead, pause times, and frequency
- Use appropriate collector: G1GC for most workloads, ZGC for low latency
- Right-size heap: Avoid over-provisioning or under-provisioning
- Tune generation ratios: Adjust young/old generation balance
- Enable GC logging: Use logs for analysis and tuning
- Optimize code: Reduce object creation and unnecessary allocations
- Use off-heap memory: For large datasets to reduce GC pressure
- Test with production data: Validate GC settings with realistic workloads
- Set up alerts: Monitor for high GC overhead and long pauses
- Document configurations: Maintain records of GC tuning changes
Related Topics
- 08-caching-persistence.mdx: Memory management and caching
- 10-serialization-kryo.mdx: Serialization impact on memory
- 17-cluster-management.mdx: Cluster-level resource management
See Also
- 09-udf-optimization.mdx: UDF memory usage patterns
- 11-structured-streaming.mdx: Streaming memory requirements
- Kafka Streams (kafka/03): JVM tuning in streaming systems
- Data Engineering Streaming (data-engineering/022): Production memory management