
Topic: Cluster Management and Resource Allocation

PySpark Advanced Interview Series

Module 15: Cluster Management - Optimizing Resource Utilization

Companies: Amazon, Meta | Difficulty: Hard

Interview Question

"At Amazon, we run thousands of Spark jobs daily on EMR. Walk us through Spark's cluster architecture, how executor memory is managed, and how you would optimize resource allocation for a job that alternates between CPU-intensive and memory-intensive phases." (Amazon Senior Data Engineer Interview)

"At Meta, we manage massive Spark clusters with dynamic workloads. Explain dynamic allocation, how it interacts with YARN/Kubernetes, and the performance implications of over-provisioning vs under-provisioning executors." (Meta Data Engineer Interview)


Spark Cluster Architecture

[Diagram: the Driver (SparkContext) talks to a Cluster Manager (YARN / K8s / Mesos / Standalone), which launches Executors 1 through N. Each executor is a JVM process that runs several tasks at once (four per executor in the figure).]

Memory Management

Executor Memory Layout

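[Diagram: Total Container Memory = spark.executor.memory (heap) + memoryOverhead (off-heap, max(384MB, 10%)). The heap is divided into Reserved Memory (300MB), User Memory (40%), and Unified Memory (60%); the 60/40 split corresponds to the default spark.memory.fraction of 0.6. Unified Memory is shared between Storage Memory and Execution Memory.]
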
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ClusterInterview") \
    .config("spark.executor.memory", "16g") \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.memoryOverhead", "2g") \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.memory.storageFraction", "0.5") \
    .getOrCreate()

# Memory calculation for a 16GB executor with spark.memory.fraction = 0.8:
# Total container: 16GB + 2GB overhead = 18GB
# Reserved: 300MB
# Usable: 16GB - 300MB = 15.7GB
# User memory: 15.7GB * 0.2 = 3.14GB
# Unified memory: 15.7GB * 0.8 = 12.56GB
#   Storage: 12.56GB * 0.5 = 6.28GB
#   Execution: 12.56GB * 0.5 = 6.28GB
# (With the default fraction of 0.6, unified memory would be 9.42GB and user memory 6.28GB.)
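
The memory arithmetic can be reproduced with a small helper. This is a minimal sketch of the layout described above, not Spark's own code: the function name `executor_memory_layout` and its parameters are illustrative assumptions, and the example call uses the `spark.memory.fraction` of 0.8 from the session configuration.

```python
RESERVED_MB = 300  # fixed reserved memory, per the layout described above


def executor_memory_layout(heap_mb, overhead_mb,
                           memory_fraction=0.6, storage_fraction=0.5):
    """Estimate the memory regions of one executor, in MB (hypothetical helper)."""
    usable = heap_mb - RESERVED_MB
    unified = usable * memory_fraction
    storage = unified * storage_fraction
    return {
        "container": heap_mb + overhead_mb,  # what the cluster manager allocates
        "usable": usable,
        "user": usable - unified,
        "unified": unified,
        "storage": storage,
        "execution": unified - storage,
    }


# 16GB heap, 2GB overhead, spark.memory.fraction = 0.8
layout = executor_memory_layout(16 * 1024, 2 * 1024, memory_fraction=0.8)
for region, size_mb in layout.items():
    print(f"{region:>10}: {size_mb / 1024:.2f} GB")
```

The storage/execution split is only an initial boundary: in the unified model either side can borrow unused space from the other, so treat these figures as estimates.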

Configuration Parameters

Core Parameters

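from pyspark import SparkConf
from pyspark.sql import SparkSession

# Executor, driver, and parallelism settings are read at application launch.
# Set them on a SparkConf (or with spark-submit --conf) before the session is
# created; spark.conf.set() on a running session does not apply them.
conf = SparkConf()

# Executor configuration
conf.set("spark.executor.instances", "100")  # Number of executors
conf.set("spark.executor.cores", "4")  # Cores per executor
conf.set("spark.executor.memory", "16g")  # Heap memory
conf.set("spark.executor.memoryOverhead", "2g")  # Off-heap memory

# Driver configuration (in client mode, pass driver memory with
# spark-submit --driver-memory, because the driver JVM is already running)
conf.set("spark.driver.memory", "8g")
conf.set("spark.driver.cores", "4")
conf.set("spark.driver.maxResultSize", "4g")

# Parallelism
conf.set("spark.sql.shuffle.partitions", "200")
conf.set("spark.default.parallelism", "200")

spark = SparkSession.builder.config(conf=conf).getOrCreate()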

Dynamic Allocation

from pyspark import SparkConf

# Enable dynamic allocation (launch-time settings: SparkConf or spark-submit --conf)
conf = SparkConf()
conf.set("spark.dynamicAllocation.enabled", "true")
conf.set("spark.dynamicAllocation.minExecutors", "2")
conf.set("spark.dynamicAllocation.maxExecutors", "100")
conf.set("spark.dynamicAllocation.initialExecutors", "10")
conf.set("spark.dynamicAllocation.executorIdleTimeout", "60s")
conf.set("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
conf.set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "1s")

# Executors can only be removed safely if their shuffle files stay available.
# On YARN, enable the external shuffle service:
conf.set("spark.shuffle.service.enabled", "true")
# On Kubernetes (Spark 3.0+), shuffle tracking is the usual alternative:
# conf.set("spark.dynamicAllocation.shuffleTracking.enabled", "true")

Resource Sizing Formulas

Executor Count

Total Executors = Total Cluster Cores / Cores Per Executor
# Example: 1000-node cluster, 64 cores per node, 4 cores per executor
total_executors = (1000 * 64) // 4  # 16000 executors
# Reserve 1 for the driver: 15999 available
available_executors = total_executors - 1
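
The formula can be wrapped in a function so that different cluster shapes are easy to compare. This is a sketch under stated assumptions: `executor_count` is a hypothetical helper, and the `reserved_cores_per_node` parameter (cores left for the operating system and node daemons) is a common sizing convention that does not appear in the formula above.

```python
def executor_count(nodes, cores_per_node, cores_per_executor,
                   reserved_cores_per_node=0, driver_slots=1):
    """Executors available to the job (hypothetical sizing helper)."""
    usable_cores = cores_per_node - reserved_cores_per_node
    executors_per_node = usable_cores // cores_per_executor
    return nodes * executors_per_node - driver_slots


# The example above: 1000 nodes, 64 cores each, 4 cores per executor
print(executor_count(1000, 64, 4))
# Same cluster, leaving one core per node for the OS and daemons (assumption)
print(executor_count(1000, 64, 4, reserved_cores_per_node=1))
```

Computing executors per node first, rather than dividing total cluster cores, avoids counting an executor that would have to span two machines.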

Memory Per Executor

Container Memory = spark.executor.memory + spark.executor.memoryOverhead
# For 16GB executor:
# Container: 16GB + 2GB = 18GB
# YARN requests 18GB container
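
A short sketch of the same calculation, including the default overhead rule of max(384MB, 10%) from the memory layout. The helper names and the 256GB node size are illustrative assumptions; on YARN the per-node figure to use would be the memory the NodeManager is allowed to hand out, not the machine's physical RAM.

```python
def default_overhead_mb(heap_mb, factor=0.10, minimum_mb=384):
    """Overhead applied when spark.executor.memoryOverhead is not set explicitly."""
    return max(minimum_mb, int(heap_mb * factor))


def container_mb(heap_mb, overhead_mb=None):
    """Memory requested from the cluster manager for one executor."""
    if overhead_mb is None:
        overhead_mb = default_overhead_mb(heap_mb)
    return heap_mb + overhead_mb


def executors_per_node(node_memory_mb, heap_mb, overhead_mb=None):
    """How many executor containers fit into one node's allocatable memory."""
    return node_memory_mb // container_mb(heap_mb, overhead_mb)


print(container_mb(16 * 1024, 2 * 1024))   # 16GB heap with an explicit 2GB overhead
print(container_mb(16 * 1024))             # 16GB heap with the default overhead
print(executors_per_node(256 * 1024, 16 * 1024, 2 * 1024))  # hypothetical 256GB node
```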

Partition Count

Recommended Partitions = Total Data Size / Target Partition Size (128-256MB)
# For a 1TB dataset with 256MB partitions (sizes in MB):
partitions = (1024 * 1024) // 256  # 4096 partitions
spark.conf.set("spark.sql.shuffle.partitions", str(partitions))
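
The partition formula generalizes to any data size. The sketch below is an illustration, not a Spark API: `recommended_partitions` applies the formula directly, while `round_up_to_waves` adds an optional heuristic (an assumption, not stated above) that rounds the count up to a multiple of the total task slots so the last wave of tasks is not mostly idle.

```python
import math

MB = 1024 ** 2
TB = 1024 ** 4


def recommended_partitions(data_size_bytes, target_partition_bytes=256 * MB):
    """Partitions needed so each holds roughly the target size."""
    return max(1, math.ceil(data_size_bytes / target_partition_bytes))


def round_up_to_waves(partitions, total_cores):
    """Round up to a multiple of the available task slots (heuristic)."""
    return math.ceil(partitions / total_cores) * total_cores


print(recommended_partitions(1 * TB))            # 256MB target
print(recommended_partitions(1 * TB, 128 * MB))  # 128MB target
print(recommended_partitions(5 * TB))            # the 5TB job discussed later
print(round_up_to_waves(4096, 196 * 4))          # 196 executors with 4 cores each
```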

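Amazon Pro Tip

On Amazon EMR, a common starting point is 4-5 cores and 16-32GB of memory per executor. More cores per executor increase contention for I/O and garbage-collection pressure inside a single JVM; fewer cores waste resources, because every executor carries its own fixed memory overhead and its own copy of broadcast data. Treat 4-5 cores as a starting point for most Spark workloads and adjust from measurements.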


Real-World Scenario: Amazon EMR Optimization

Problem Statement

Optimize a 100-node EMR cluster for a Spark job that processes 5TB of data with alternating CPU and memory phases.

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
    .appName("AmazonEMROptimization") \
    .config("spark.executor.instances", "196") \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.memory", "20g") \
    .config("spark.executor.memoryOverhead", "4g") \
    .config("spark.driver.memory", "8g") \
    .config("spark.sql.shuffle.partitions", "2000") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.autoBroadcastJoinThreshold", str(100 * 1024 * 1024)) \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryoserializer.buffer.max", "512m") \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.memory.storageFraction", "0.3") \
    .config("spark.shuffle.compress", "true") \
    .config("spark.shuffle.spill.compress", "true") \
    .config("spark.rdd.compress", "true") \
    .getOrCreate()

# === PHASE 1: CPU-INTENSIVE (Filtering, Parsing) ===
# Increase parallelism for CPU work
raw_data = spark.read.parquet("s3a://amazon-data/raw-5tb/")

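# Filter and project first so less data is shuffled, then use many
# small partitions for CPU parallelism ("category" is kept for Phase 2)
filtered = raw_data \
    .filter(col("status") == "active") \
    .select("id", "category", "value", "timestamp") \
    .repartition(4000)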

# Cache for reuse across phases
filtered.cache()
filtered.count()  # Force materialization

# === PHASE 2: MEMORY-INTENSIVE (Joins, Aggregations) ===
# Reduce parallelism for memory-heavy operations
aggregated = filtered \
    .repartition(2000, "category") \
    .groupBy("category") \
    .agg(
        sum("value").alias("total"),
        count("*").alias("count")
    )

# === PHASE 3: Write Output ===
aggregated.write \
    .mode("overwrite") \
    .partitionBy("category") \
    .parquet("s3a://amazon-data/output/")

# Clean up
filtered.unpersist()

spark.stop()

Dynamic Allocation Deep Dive

How It Works

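# Dynamic allocation scales executors based on workload:
# 1. If tasks stay pending longer than schedulerBacklogTimeout, request executors;
#    while the backlog persists, each further round requests more (1, 2, 4, 8, ...)
# 2. If an executor is idle longer than executorIdleTimeout, remove it
# 3. The executor count always stays within the minExecutors/maxExecutors bounds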

from pyspark import SparkConf

# These are launch-time settings: build the SparkConf before creating the session.

# For bursty workloads
bursty_conf = SparkConf()
bursty_conf.set("spark.dynamicAllocation.enabled", "true")
bursty_conf.set("spark.dynamicAllocation.minExecutors", "5")
bursty_conf.set("spark.dynamicAllocation.maxExecutors", "200")
bursty_conf.set("spark.dynamicAllocation.initialExecutors", "10")

# For steady-state workloads (disable dynamic allocation)
steady_conf = SparkConf()
steady_conf.set("spark.dynamicAllocation.enabled", "false")
steady_conf.set("spark.executor.instances", "50")
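
To get a feel for how quickly a bursty job scales up, the request policy can be simulated in plain Python. This is a simplified model, not Spark's scheduler: `ramp_up_schedule` is a hypothetical function that assumes the backlog never clears and that the cluster manager grants every request immediately. It doubles the number of executors requested in each round, starting with one.

```python
def ramp_up_schedule(initial, max_executors,
                     backlog_timeout_s=1.0, sustained_timeout_s=1.0):
    """Return (elapsed_seconds, target_executors) pairs while a backlog persists."""
    schedule = []
    target = initial
    to_add = 1                    # first round requests a single executor
    elapsed = backlog_timeout_s   # first request fires after the backlog timeout
    while target < max_executors:
        target = min(max_executors, target + to_add)
        schedule.append((elapsed, target))
        to_add *= 2               # each later round requests twice as many
        elapsed += sustained_timeout_s
    return schedule


# Bursty settings from above: 10 initial executors, at most 200
for seconds, executors in ramp_up_schedule(initial=10, max_executors=200):
    print(f"t={seconds:>4.1f}s  target executors: {executors}")
```

Under these assumptions the job reaches its maximum after a handful of rounds, which is why short backlog timeouts matter mostly for latency-sensitive bursts; in a real cluster, container start-up time usually dominates.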

When to Disable Dynamic Allocation

from pyspark import SparkConf

# Disable for:
# 1. Latency-sensitive workloads (startup overhead)
# 2. Large joins (need consistent executor count)
# 3. Stateful streaming (state stored in executors)

# Launch-time settings (SparkConf or spark-submit --conf)
conf = SparkConf()
conf.set("spark.dynamicAllocation.enabled", "false")
conf.set("spark.executor.instances", "100")

Memory Tuning

When Executors OOM

# Symptoms:
# - java.lang.OutOfMemoryError: Java heap space
# - java.lang.OutOfMemoryError: GC overhead limit exceeded
# - Container killed by YARN for exceeding memory limits

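# Solutions (launch-time settings unless noted):
from pyspark import SparkConf
conf = SparkConf()

# 1. Increase executor memory (heap space and GC overhead errors)
conf.set("spark.executor.memory", "32g")

# 2. Increase memory overhead (containers killed by YARN)
conf.set("spark.executor.memoryOverhead", "8g")

# 3. Reduce partition size (more partitions, less data each);
#    this one can also be changed at runtime with spark.conf.set()
conf.set("spark.sql.shuffle.partitions", "4000")

# 4. Compress data spilled to disk during shuffles (spilling itself is
#    automatic; this setting only shrinks the spill files)
conf.set("spark.shuffle.spill.compress", "true")

# 5. Protect less cached data from eviction, leaving more room for execution
conf.set("spark.memory.storageFraction", "0.2")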
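
Each symptom points to a different first remedy, which a small lookup can make explicit. This is a hypothetical triage helper, not part of Spark: the function name, the marker strings, and the ordering of suggestions are assumptions based on the symptoms and solutions listed above.

```python
# Ordered list of (log marker, settings to review first)
REMEDIES = [
    ("exceeding memory limits", ["spark.executor.memoryOverhead"]),
    ("GC overhead limit exceeded",
     ["spark.executor.memory", "spark.sql.shuffle.partitions"]),
    ("Java heap space",
     ["spark.executor.memory", "spark.sql.shuffle.partitions"]),
]


def suggest_settings(log_line):
    """Return the settings worth reviewing first for a given error line."""
    for marker, settings in REMEDIES:
        if marker in log_line:
            return settings
    return []  # not a recognized memory symptom


print(suggest_settings("java.lang.OutOfMemoryError: Java heap space"))
print(suggest_settings("Container killed by YARN for exceeding memory limits"))
```

The distinction matters because a container killed by YARN exceeded its total allocation (heap plus overhead), so raising only the heap can leave the problem in place.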

GC Tuning

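from pyspark import SparkConf

# Use G1GC for large heaps (JVM options are launch-time settings)
conf = SparkConf()
conf.set("spark.executor.extraJavaOptions",
         "-XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:MaxGCPauseMillis=200")

# Monitor GC through the Spark UI:
# compare GC Time with Task Time in the Executors tab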
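
GC time is easier to judge as a share of task time than as an absolute number. The sketch below assumes you have copied GC time and task time per executor from the UI; the helper names, the sample figures, and the 10% threshold are illustrative assumptions, a rule of thumb and not a limit defined by Spark.

```python
def gc_time_fraction(gc_time_ms, task_time_ms):
    """Share of total task time an executor spent in garbage collection."""
    if task_time_ms <= 0:
        return 0.0
    return gc_time_ms / task_time_ms


def needs_gc_attention(gc_time_ms, task_time_ms, threshold=0.10):
    """Flag executors whose GC share exceeds the chosen threshold."""
    return gc_time_fraction(gc_time_ms, task_time_ms) > threshold


# Hypothetical (gc_ms, task_ms) pairs read from the UI
executors = {"exec-1": (12_000, 600_000), "exec-2": (95_000, 610_000)}
for name, (gc_ms, task_ms) in executors.items():
    share = gc_time_fraction(gc_ms, task_ms)
    flag = "review" if needs_gc_attention(gc_ms, task_ms) else "ok"
    print(f"{name}: {share:.1%} of task time in GC ({flag})")
```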

Cluster Managers

YARN

from pyspark import SparkConf

# YARN configuration (launch-time settings)
conf = SparkConf()
conf.set("spark.yarn.historyServer.address", "history-server:18080")
conf.set("spark.yarn.maxAppAttempts", "2")
conf.set("spark.yarn.am.waitTime", "100s")

# Queue configuration
conf.set("spark.yarn.queue", "production")

Kubernetes

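from pyspark import SparkConf

# Kubernetes configuration (launch-time settings)
conf = SparkConf()
conf.set("spark.kubernetes.namespace", "spark-jobs")
conf.set("spark.kubernetes.executor.request.cores", "2")  # CPU request per executor pod
conf.set("spark.kubernetes.driver.request.cores", "2")  # CPU request for the driver pod
conf.set("spark.kubernetes.executor.limit.cores", "4")  # Hard CPU limit per executor pod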

Edge Cases

1. Data Skew Causing Executor OOM

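# One executor gets all skewed data -> OOM
# Solution: Salt the skewed key
from pyspark.sql.functions import col, concat, lit, rand

# df is an existing DataFrame with a heavily skewed column "skewed_key"
salted_df = df.withColumn(
    "salt",
    (rand() * 100).cast("int")
).withColumn(
    "salted_key",
    concat(col("skewed_key"), lit("_"), col("salt").cast("string"))
)

# Repartition by salted key (repartition returns a new DataFrame)
salted_df = salted_df.repartition(4000, "salted_key")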
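
Salting only helps an aggregation if the salted partial results are combined again afterwards. The following plain-Python sketch illustrates that two-stage pattern without a cluster; the sample rows are made up, and it uses a round-robin salt instead of `rand()` so the result is deterministic.

```python
from collections import Counter, defaultdict

NUM_SALTS = 100

# Made-up data: one hot key with 10,000 rows and one small key
rows = [("hot", 1)] * 10_000 + [("cold", 1)] * 100


def salted(rows, num_salts):
    """Attach a salt to each key (round-robin here; the Spark snippet uses rand())."""
    for i, (key, value) in enumerate(rows):
        yield (key, i % num_salts), value


# Stage 1: partial aggregation per (key, salt); this is the work that is
# spread across many tasks once the key is salted
partial = defaultdict(int)
group_sizes = Counter()
for salted_key, value in salted(rows, NUM_SALTS):
    partial[salted_key] += value
    group_sizes[salted_key] += 1

# Stage 2: drop the salt and combine the partial sums per original key
totals = defaultdict(int)
for (key, _salt), subtotal in partial.items():
    totals[key] += subtotal

print("largest group after salting:", max(group_sizes.values()))
print("totals:", dict(totals))
```

The second stage handles at most `NUM_SALTS` rows per key, so it stays cheap even for the hot key.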

2. Too Many Tasks

# 10M partitions = 10M tasks -> scheduling overhead
# Solution: Coalesce before write
df.coalesce(1000).write.parquet("s3a://bucket/output/")

3. Executor Lost

from pyspark import SparkConf

# Executors can be preempted (YARN) or killed (OOM)
# Spark handles this through:
# 1. Task re-execution (spark.task.maxFailures)
# 2. Stage re-execution (spark.stage.maxConsecutiveAttempts)

# Both are launch-time settings (SparkConf or spark-submit --conf)
conf = SparkConf()
conf.set("spark.task.maxFailures", "4")
conf.set("spark.stage.maxConsecutiveAttempts", "3")

Best Practices

Cluster Tuning Checklist

  • Use 4-5 cores per executor for most workloads
  • Set executor memory based on data size per partition
  • Enable AQE for automatic optimization
  • Use dynamic allocation for bursty workloads
  • Monitor Spark UI for stragglers and OOM
  • Use Kryo serialization for large datasets
  • Tune shuffle partitions based on data size
  • Enable compression for shuffle and cached data
  • Reserve 20-30% memory overhead for YARN/K8s
  • Test with production data volumes before deployment

Summary

Cluster management is critical for Spark performance at Amazon and Meta scale. Understanding memory layout, executor sizing, dynamic allocation, and resource tuning ensures optimal utilization and prevents failures. The key is balancing parallelism (more executors) with memory per executor (larger executors).
