PySpark Advanced Interview Series
Module 15: Cluster Management - Optimizing Resource Utilization
Interview Questions
"At Amazon, we run thousands of Spark jobs daily on EMR. Walk us through Spark's cluster architecture, how executor memory is managed, and how you would optimize resource allocation for a job that alternates between CPU-intensive and memory-intensive phases." - Amazon Senior Data Engineer Interview
"At Meta, we manage massive Spark clusters with dynamic workloads. Explain dynamic allocation, how it interacts with YARN/Kubernetes, and the performance implications of over-provisioning vs under-provisioning executors." - Meta Data Engineer Interview
Spark Cluster Architecture
Memory Management
Executor Memory Layout
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("ClusterInterview") \
.config("spark.executor.memory", "16g") \
.config("spark.executor.cores", "4") \
.config("spark.executor.memoryOverhead", "2g") \
.config("spark.memory.fraction", "0.8") \
.config("spark.memory.storageFraction", "0.5") \
.getOrCreate()
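# Memory calculation for a 16GB executor
# (spark.memory.fraction = 0.8 as configured above; the Spark default is 0.6):
# Total container: 16GB + 2GB overhead = 18GB
# Reserved: 300MB
# Usable: 16GB - 300MB = 15.7GB
# User memory: 15.7GB * (1 - 0.8) = 3.14GB
# Unified memory: 15.7GB * 0.8 = 12.56GB
# Storage: 12.56GB * 0.5 = 6.28GB (share protected from eviction; can borrow free execution memory)
# Execution: 12.56GB * 0.5 = 6.28GB (can borrow free storage memory and evict cached blocks beyond the protected share)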
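The arithmetic above can be reproduced with a small helper. This is a minimal sketch in plain Python, not Spark's own code: `executor_memory_layout` and its parameter names are hypothetical, and it assumes the JVM exposes the full configured heap (in practice the JVM reports slightly less than the configured maximum).

```python
RESERVED_MB = 300  # fixed amount Spark sets aside before splitting the heap


def executor_memory_layout(heap_mb, overhead_mb, memory_fraction=0.6,
                           storage_fraction=0.5):
    """Approximate the memory regions of one executor, in MB."""
    usable = heap_mb - RESERVED_MB
    unified = usable * memory_fraction
    storage = unified * storage_fraction
    return {
        "container": heap_mb + overhead_mb,  # what the cluster manager is asked for
        "usable": usable,
        "user": usable - unified,            # user data structures, UDF objects
        "unified": unified,                  # shared by storage and execution
        "storage": storage,
        "execution": unified - storage,
    }


# 16GB heap, 2GB overhead, spark.memory.fraction = 0.8 as in the builder above
layout = executor_memory_layout(16 * 1024, 2 * 1024, memory_fraction=0.8)
for region, mb in layout.items():
    print(f"{region:>9}: {mb / 1024:.2f} GB")
```

Calling the helper with `memory_fraction=0.6` shows the layout under Spark's default setting, which makes it easy to compare how much user memory each choice leaves.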
Configuration Parameters
Core Parameters
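# Executor and driver sizing is read at launch. Set it on the builder or with
# spark-submit --conf; spark.conf.set() on a running session cannot change it.
spark = (
    SparkSession.builder
    # Executor configuration
    .config("spark.executor.instances", "100")      # Number of executors
    .config("spark.executor.cores", "4")            # Cores per executor
    .config("spark.executor.memory", "16g")         # JVM heap
    .config("spark.executor.memoryOverhead", "2g")  # Non-heap: JVM overhead, native buffers, Python workers
    # Driver configuration (in client mode, pass --driver-memory to spark-submit instead)
    .config("spark.driver.memory", "8g")
    .config("spark.driver.cores", "4")
    .config("spark.driver.maxResultSize", "4g")
    # Parallelism for RDD operations
    .config("spark.default.parallelism", "200")
    .getOrCreate()
)
# Shuffle partitions for DataFrame/SQL operations can be changed at runtime
spark.conf.set("spark.sql.shuffle.partitions", "200")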
Dynamic Allocation
# Enable dynamic allocation (launch-time settings: builder or spark-submit --conf)
spark = (
    SparkSession.builder
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "2")
    .config("spark.dynamicAllocation.maxExecutors", "100")
    .config("spark.dynamicAllocation.initialExecutors", "10")
    .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
    .config("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
    .config("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "1s")
    # Executors can only be released safely if their shuffle files stay available.
    # On YARN, run the external shuffle service; on Kubernetes (Spark 3.0+), use
    # spark.dynamicAllocation.shuffleTracking.enabled=true instead.
    .config("spark.shuffle.service.enabled", "true")
    .getOrCreate()
)
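To see what the two backlog timeouts mean in practice, the sketch below simulates how quickly executors are requested while tasks stay pending. Spark's documentation describes the request size as growing each round (1, 2, 4, 8, ...). The function `ramp_up_schedule` and its simplifications (every request is granted instantly, and the backlog never clears) are assumptions made for illustration.

```python
def ramp_up_schedule(initial, maximum, backlog_timeout_s=1.0,
                     sustained_timeout_s=1.0):
    """Return (seconds_elapsed, total_executors) after each request round."""
    schedule = []
    total = initial
    elapsed = backlog_timeout_s  # first request fires after the backlog timeout
    to_add = 1
    while total < maximum:
        total = min(maximum, total + to_add)
        schedule.append((elapsed, total))
        to_add *= 2                     # request size doubles every round
        elapsed += sustained_timeout_s  # later rounds use the sustained timeout
    return schedule


# Settings from the configuration above: 10 initial executors, at most 100
schedule = ramp_up_schedule(initial=10, maximum=100)
for seconds, executors in schedule:
    print(f"t={seconds:.0f}s -> {executors} executors")
```

Under these assumptions the application reaches its 100-executor ceiling in seven rounds. Real clusters are slower, because each granted container still has to start a JVM and register with the driver.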
Resource Sizing Formulas
Executor Count
Total Executors = Total Cluster Cores / Cores Per Executor
# Example: 1000-node cluster, 64 cores per node, 4 cores per executor
total_executors = (1000 * 64) // 4  # 16000 executors
# Reserve 1 for the driver: 15999 available
available_executors = total_executors - 1
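The formula above ignores what each node needs for itself. A common rule of thumb, and an assumption here rather than something the formula states, is to leave one core per node for the OS and node daemons. The helper `plan_executors` below is hypothetical and only illustrates how that reservation changes the count.

```python
def plan_executors(nodes, cores_per_node, cores_per_executor,
                   reserved_cores_per_node=1, driver_executors=1):
    """Estimate how many executors a cluster can host."""
    usable_cores = cores_per_node - reserved_cores_per_node
    per_node = usable_cores // cores_per_executor  # whole executors only
    total = nodes * per_node
    return {
        "executors_per_node": per_node,
        "total_executors": total,
        "available_executors": total - driver_executors,
    }


# Same cluster as the example: 1000 nodes, 64 cores each, 4 cores per executor
naive = plan_executors(1000, 64, 4, reserved_cores_per_node=0)
reserved = plan_executors(1000, 64, 4, reserved_cores_per_node=1)
print("no reservation:   ", naive)
print("1 core per node:  ", reserved)
```

Because executors are whole units, reserving a single core per node drops each node from 16 executors to 15, which is a larger difference than the one reserved core suggests.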
Memory Per Executor
Container Memory = spark.executor.memory + spark.executor.memoryOverhead
# For 16GB executor:
# Container: 16GB + 2GB = 18GB
# YARN requests 18GB container
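When `spark.executor.memoryOverhead` is not set explicitly, Spark derives it from the heap size. For JVM jobs the documented default is 10% of executor memory with a 384MB floor. The sketch below applies that rule; the function name is hypothetical, and it leaves out cluster-manager specifics.

```python
MIN_OVERHEAD_MB = 384           # documented floor for the default overhead
DEFAULT_OVERHEAD_FACTOR = 0.10  # documented default factor for JVM jobs


def container_memory_mb(executor_memory_mb, overhead_mb=None):
    """Container request = heap + overhead (explicit or derived)."""
    if overhead_mb is None:
        overhead_mb = max(MIN_OVERHEAD_MB,
                          int(executor_memory_mb * DEFAULT_OVERHEAD_FACTOR))
    return executor_memory_mb + overhead_mb


explicit = container_memory_mb(16 * 1024, 2 * 1024)  # 16GB heap, 2GB overhead
derived = container_memory_mb(16 * 1024)             # overhead derived from heap
small = container_memory_mb(2 * 1024)                # floor applies
print(explicit, derived, small)
```

YARN may round the request up to its allocation increment, so the granted container can be slightly larger than the computed value.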
Partition Count
Recommended Partitions = Total Data Size / Target Partition Size (128-256MB)
# For a 1TB dataset with 256MB partitions:
partitions = (1024 * 1024) // 256  # 1TB in MB / 256MB = 4096 partitions
spark.conf.set("spark.sql.shuffle.partitions", str(partitions))
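The same formula can be wrapped in a helper so the target partition size becomes an explicit knob. This is a small sketch with a hypothetical function name; it rounds up so that no partition exceeds the target.

```python
import math


def recommended_partitions(data_size_gb, target_partition_mb=256):
    """Partitions needed so that each holds at most target_partition_mb."""
    return math.ceil(data_size_gb * 1024 / target_partition_mb)


one_tb_256 = recommended_partitions(1024)            # 1TB at 256MB
one_tb_128 = recommended_partitions(1024, 128)       # 1TB at 128MB
five_tb_128 = recommended_partitions(5 * 1024, 128)  # 5TB at 128MB
print(one_tb_256, one_tb_128, five_tb_128)
```

The input should be the size of the data being shuffled, which after filtering or column pruning is often much smaller than the size of the files on disk.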
💡 Amazon Pro Tip
On Amazon EMR, a good starting point for most Spark workloads is 4-5 cores and 16-32GB of memory per executor. More cores per executor increase contention for the shared heap and I/O; fewer cores waste resources, because every executor JVM carries its own fixed overhead.
Real-World Scenario: Amazon EMR Optimization
Problem Statement
Optimize a 100-node EMR cluster for a Spark job that processes 5TB of data with alternating CPU and memory phases.
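from pyspark.sql import SparkSession
# Explicit imports instead of `import *`. Note that sum here is the Spark
# column function and shadows Python's built-in sum.
from pyspark.sql.functions import col, count, sum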
spark = SparkSession.builder \
.appName("AmazonEMROptimization") \
.config("spark.executor.instances", "196") \
.config("spark.executor.cores", "4") \
.config("spark.executor.memory", "20g") \
.config("spark.executor.memoryOverhead", "4g") \
.config("spark.driver.memory", "8g") \
.config("spark.sql.shuffle.partitions", "2000") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.config("spark.sql.autoBroadcastJoinThreshold", str(100 * 1024 * 1024)) \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.kryoserializer.buffer.max", "512m") \
.config("spark.memory.fraction", "0.8") \
.config("spark.memory.storageFraction", "0.3") \
.config("spark.shuffle.compress", "true") \
.config("spark.shuffle.spill.compress", "true") \
.config("spark.rdd.compress", "true") \
.getOrCreate()
# === PHASE 1: CPU-INTENSIVE (Filtering, Parsing) ===
raw_data = spark.read.parquet("s3a://amazon-data/raw-5tb/")
# Filter and prune columns first so Parquet pushdown shrinks the data,
# then spread the remaining rows over many small partitions for CPU parallelism.
# "category" is kept because Phase 2 groups by it.
filtered = raw_data \
    .filter(col("status") == "active") \
    .select("id", "category", "value", "timestamp") \
    .repartition(4000)
# Cache for reuse across phases
filtered.cache()
filtered.count()  # Force materialization
# === PHASE 2: MEMORY-INTENSIVE (Joins, Aggregations) ===
# The groupBy shuffle uses spark.sql.shuffle.partitions (2000, set above), so no
# explicit repartition is needed; fewer, larger partitions than in Phase 1
aggregated = filtered \
    .groupBy("category") \
    .agg(
        sum("value").alias("total"),
        count("*").alias("count")
    )
# === PHASE 3: Write Output ===
aggregated.write \
.mode("overwrite") \
.partitionBy("category") \
.parquet("s3a://amazon-data/output/")
# Clean up
filtered.unpersist()
spark.stop()
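One way to sanity-check the partition counts chosen for each phase is to compare them with the number of task slots (executors times cores per executor). The sketch below is a plain-Python estimate with a hypothetical helper name, using the executor settings and partition counts from the job above.

```python
import math


def task_waves(partitions, executors, cores_per_executor):
    """Return (task_slots, waves needed to run all partitions)."""
    slots = executors * cores_per_executor
    return slots, math.ceil(partitions / slots)


slots, cpu_waves = task_waves(4000, executors=196, cores_per_executor=4)
_, mem_waves = task_waves(2000, executors=196, cores_per_executor=4)
print(f"{slots} task slots")
print(f"Phase 1 (4000 partitions): {cpu_waves} waves")
print(f"Phase 2 (2000 partitions): {mem_waves} waves")
```

A partition count that is a small multiple of the slot count keeps every core busy without creating excessive scheduling overhead. In the memory-heavy phase, the four tasks running concurrently in each executor also share that executor's execution memory, which is why larger partitions there raise the risk of spills.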
Dynamic Allocation Deep Dive
How It Works
# Dynamic allocation scales executors based on workload:
# 1. If tasks stay pending longer than schedulerBacklogTimeout, request executors
#    (each round requests more: 1, 2, 4, 8, ...)
# 2. If an executor is idle longer than executorIdleTimeout, remove it
# 3. Keep the executor count within the min/max bounds
# These are launch-time settings; use one of the two configurations below.
# For bursty workloads
spark = (
    SparkSession.builder
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "5")
    .config("spark.dynamicAllocation.maxExecutors", "200")
    .config("spark.dynamicAllocation.initialExecutors", "10")
    .getOrCreate()
)
# For steady-state workloads (disable dynamic allocation)
spark = (
    SparkSession.builder
    .config("spark.dynamicAllocation.enabled", "false")
    .config("spark.executor.instances", "50")
    .getOrCreate()
)
When to Disable Dynamic Allocation
# Disable for:
# 1. Latency-sensitive workloads (executor startup adds delay)
# 2. Large joins (need a consistent executor count)
# 3. Stateful streaming (state is held in executors)
spark = (
    SparkSession.builder
    .config("spark.dynamicAllocation.enabled", "false")
    .config("spark.executor.instances", "100")
    .getOrCreate()
)
Memory Tuning
When Executors OOM
# Symptoms:
# - java.lang.OutOfMemoryError: Java heap space
# - java.lang.OutOfMemoryError: GC overhead limit exceeded
# - Container killed by YARN for exceeding memory limits
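# Solutions (memory settings are read at launch, so they go on the builder):
spark = (
    SparkSession.builder
    # 1. Increase executor memory (heap space and GC overhead errors)
    .config("spark.executor.memory", "32g")
    # 2. Increase memory overhead (containers killed by YARN)
    .config("spark.executor.memoryOverhead", "8g")
    # 3. Reduce partition size (more partitions, less data each)
    .config("spark.sql.shuffle.partitions", "4000")
    # 4. Execution memory spills to disk automatically; keep spill compression on
    .config("spark.shuffle.spill.compress", "true")
    # 5. Cache less data: shrink the storage share protected from eviction
    .config("spark.memory.storageFraction", "0.2")
    .getOrCreate()
)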
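The symptom-to-fix mapping above can be turned into a small triage helper for log lines. This is an illustrative sketch: `suggest_fix` and the wording of its suggestions are hypothetical, and the matching is a plain substring search on the three symptoms listed above.

```python
SYMPTOM_FIXES = [
    ("Java heap space",
     "raise spark.executor.memory or use more, smaller partitions"),
    ("GC overhead limit exceeded",
     "raise spark.executor.memory or cache less data"),
    ("Container killed by YARN",
     "raise spark.executor.memoryOverhead"),
]


def suggest_fix(log_line):
    """Return a suggested first step for a known OOM symptom, else None."""
    for symptom, fix in SYMPTOM_FIXES:
        if symptom in log_line:
            return fix
    return None


print(suggest_fix("java.lang.OutOfMemoryError: Java heap space"))
print(suggest_fix("Container killed by YARN for exceeding memory limits"))
```

The distinction matters because heap errors and container kills point at different settings: the first is about the JVM heap, the second about memory used outside it.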
GC Tuning
# Use G1GC for large heaps (JVM options are launch-time settings)
spark = (
    SparkSession.builder
    .config("spark.executor.extraJavaOptions",
            "-XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:MaxGCPauseMillis=200")
    .getOrCreate()
)
# Monitor GC through the Spark UI: check the GC time column in the Executors tab
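When reading the Executors tab, a useful number is GC time as a share of total task time. The sketch below flags executors above a threshold. The 10% cutoff is a common rule of thumb and is treated here as an assumption, and the sample tuples are made-up values standing in for numbers copied from the UI.

```python
def gc_heavy_executors(executor_stats, threshold=0.10):
    """executor_stats: iterable of (executor_id, task_time_s, gc_time_s)."""
    flagged = []
    for executor_id, task_time_s, gc_time_s in executor_stats:
        if task_time_s <= 0:
            continue  # executor has not run any tasks yet
        gc_share = gc_time_s / task_time_s
        if gc_share > threshold:
            flagged.append((executor_id, round(gc_share, 3)))
    return flagged


# Made-up sample values for illustration
stats = [("1", 1200.0, 60.0), ("2", 1150.0, 230.0), ("3", 0.0, 0.0)]
flagged = gc_heavy_executors(stats)
print(flagged)
```

An executor that is flagged repeatedly is usually a sign of too much data per task or too much cached data, so the memory fixes in the previous section are worth trying before tuning collector flags.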
Cluster Managers
YARN
# YARN configuration (launch-time settings)
spark = (
    SparkSession.builder
    .config("spark.yarn.historyServer.address", "history-server:18080")
    .config("spark.yarn.maxAppAttempts", "2")
    .config("spark.yarn.am.waitTime", "100s")
    # Queue configuration
    .config("spark.yarn.queue", "production")
    .getOrCreate()
)
Kubernetes
# Kubernetes configuration (launch-time settings)
spark = (
    SparkSession.builder
    .config("spark.kubernetes.namespace", "spark-jobs")
    .config("spark.kubernetes.executor.request.cores", "2")
    .config("spark.kubernetes.driver.request.cores", "2")
    .config("spark.kubernetes.executor.limit.cores", "4")
    .getOrCreate()
)
Edge Cases
1. Data Skew Causing Executor OOM
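# One executor gets all the skewed data -> OOM
# Solution: salt the skewed key (df is any DataFrame with a "skewed_key" column)
from pyspark.sql.functions import col, concat, lit, rand
salted_df = df.withColumn(
    "salt",
    (rand() * 100).cast("int")
).withColumn(
    "salted_key",
    concat(col("skewed_key"), lit("_"), col("salt").cast("string"))
)
# Repartition by salted key; repartition returns a new DataFrame, so keep the result
salted_df = salted_df.repartition(4000, "salted_key")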
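Salting only helps if the aggregation is then done in two stages: first by the salted key, then by the original key. The plain-Python sketch below imitates that on a toy dataset to show that the final totals are unchanged while the largest group shrinks. The data, the fixed random seed, and the variable names are made up for illustration, and the code does not use Spark.

```python
import random
from collections import defaultdict

rng = random.Random(42)  # fixed seed so the run is repeatable
NUM_SALTS = 100          # same salt range as the snippet above

# Toy dataset: one hot key with 10,000 rows, ten small keys with 10 rows each
rows = [("hot", 1)] * 10_000
for i in range(10):
    rows += [(f"key{i}", 1)] * 10

# Stage 1: aggregate by (key, salt) so the hot key is split into many groups
partial = defaultdict(int)
for key, value in rows:
    partial[(key, rng.randrange(NUM_SALTS))] += value

# Stage 2: drop the salt and aggregate the partial sums by the original key
final = defaultdict(int)
for (key, _salt), subtotal in partial.items():
    final[key] += subtotal

largest_salted_group = max(partial.values())
print("largest group without salting:", 10_000)
print("largest group with salting:   ", largest_salted_group)
print("total for hot key:            ", final["hot"])
```

In Spark the two stages correspond to a `groupBy` on the salted key followed by a second `groupBy` on the original key. This works for aggregations that can be combined from partial results, such as sums and counts.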
2. Too Many Tasks
# 10M partitions = 10M tasks -> scheduling overhead
# Solution: Coalesce before write
df.coalesce(1000).write.parquet("s3a://bucket/output/")
3. Executor Lost
# Executors can be preempted (YARN) or killed (OOM)
# Spark handles this through:
# 1. Task re-execution (spark.task.maxFailures)
# 2. Stage re-execution (spark.stage.maxConsecutiveAttempts)
# Both are launch-time settings
spark = (
    SparkSession.builder
    .config("spark.task.maxFailures", "4")
    .config("spark.stage.maxConsecutiveAttempts", "3")
    .getOrCreate()
)
Best Practices
💡 Cluster Tuning Checklist
- Use 4-5 cores per executor for most workloads
- Set executor memory based on data size per partition
- Enable AQE for automatic optimization
- Use dynamic allocation for bursty workloads
- Monitor Spark UI for stragglers and OOM
- Use Kryo serialization for large datasets
- Tune shuffle partitions based on data size
- Enable compression for shuffle and cached data
- Reserve 20-30% of executor memory as overhead on YARN/K8s (the YARN default is 10%, with a 384MB minimum)
- Test with production data volumes before deployment
Summary
Cluster management is critical for Spark performance at Amazon and Meta scale. Understanding memory layout, executor sizing, dynamic allocation, and resource tuning ensures optimal utilization and prevents failures. The key is balancing parallelism (more executors) with memory per executor (larger executors).