Spark UI & Monitoring
Difficulty: Senior Level | Companies: Databricks, Netflix, Uber, Apple, Airbnb
Navigating the Spark UI
The Spark UI provides detailed execution information across the Jobs, Stages, Storage, Environment, Executors, and SQL tabs; per-task metrics are shown on each stage's detail page.
Key UI Sections
from pyspark.sql import SparkSession, functions as F
# Session with the web UI and event logging switched on explicitly.
# The chain is wrapped in parentheses so each option sits on its own line
# without relying on trailing backslashes.
spark = (
    SparkSession.builder
    .appName("MonitoringExample")
    .config("spark.ui.enabled", "true")
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", "hdfs://logs/spark-events")
    .config("spark.ui.port", "4040")
    .getOrCreate()
)
# Access Spark UI at http://driver-host:4040
# Job tab: Shows all jobs and their status
# - Job ID, Description, Stages, Tasks, Duration
# - Click to see stage details
# Stage tab: Shows stage-level metrics
# - Input/Output sizes
# - Shuffle Read/Write
# - Task duration distribution
# - GC time
# Tasks table (on each stage's detail page): Shows per-task metrics
# - Task duration (identify stragglers)
# - Shuffle Read/Write per task
# - GC time per task
# - Executor ID and host
# Storage tab: Shows cached DataFrames
# - Cached RDDs and DataFrames
# - Memory usage per partition
# - Storage level
# Environment tab: Shows configuration
# - All Spark configs
# - System properties
# - Classpath entries
ℹ️
Interview Insight: The Spark UI is your first debugging tool. Learn to quickly identify slow tasks, high shuffle, and memory issues from the UI.
Programmatic Monitoring
# Access Spark metrics programmatically
def get_spark_metrics(spark_context):
    """Extract key metrics from Spark.

    Args:
        spark_context: The active PySpark ``SparkContext``.

    Returns:
        A dict with two entries:

        * ``"executors"`` - the JVM object returned by
          ``getExecutorMemoryStatus()`` on the underlying Scala
          ``SparkContext``, passed through unchanged.
        * ``"accumulators"`` - ``{accumulator id: current value}`` for the
          accumulators registered in this Python driver process.
    """
    # Imported locally so the block is self-contained.
    # NOTE(review): ``_accumulatorRegistry`` is a private PySpark module
    # global; confirm it is still present in the Spark version you deploy.
    from pyspark.accumulators import _accumulatorRegistry

    # Get executor information
    executor_info = spark_context._jsc.sc().getExecutorMemoryStatus()

    # Get accumulator values.  The JVM SparkContext has no
    # ``getAccumulators()`` method, so read the driver-side registry of
    # Python accumulators instead of calling into the JVM.
    accumulators = {
        accumulator_id: accumulator.value
        for accumulator_id, accumulator in _accumulatorRegistry.items()
    }

    return {
        "executors": executor_info,
        "accumulators": accumulators,
    }
# Monitor job execution
# Load the input dataset.
df = spark.read.parquet("hdfs://data/large")

# Track execution metrics: start time as reported by the JVM SparkContext.
jvm_spark_context = spark.sparkContext._jsc.sc()
start_time = jvm_spark_context.startTime()

# Sum "value" per "key"; count() is the action that triggers execution.
grouped_by_key = df.groupBy("key")
result = grouped_by_key.agg(F.sum("value"))
result.count()
# Access job metrics via SparkListener
class MetricsListener:
    """Collect job and stage durations from Spark scheduler events.

    Each callback receives the JVM event object and reads it through its
    accessor methods (``jobId()``, ``time()``, ``stageInfo()``, ...).

    Results are appended to two public lists:

    * ``job_durations``   - ``{"job_id": ..., "duration": ...}`` per job
    * ``stage_durations`` - ``{"stage_id": ..., "duration": ...}`` per stage

    ``duration`` is the difference between the end and start timestamps
    reported by Spark, or ``None`` when one of them is unavailable.

    NOTE(review): delivering JVM events to a Python object requires Py4J's
    callback server to be running before the listener is registered.
    """

    class Java:
        # Py4J only accepts a Python object as a JVM argument when it
        # declares the Java interface(s) it implements.
        implements = ["org.apache.spark.scheduler.SparkListenerInterface"]

    def __init__(self):
        # Finished jobs, in completion order.
        self.job_durations = []
        # Finished stages, in completion order.
        self.stage_durations = []
        # job id -> start timestamp, held until the matching onJobEnd.
        self._job_start_times = {}

    def __getattr__(self, name):
        # The listener interface declares many more ``on*`` callbacks than
        # this class cares about; answer those with a no-op so the JVM side
        # does not fail when it dispatches them.  Anything else is a
        # genuine missing attribute.
        if name.startswith("on"):
            return lambda *args, **kwargs: None
        raise AttributeError(name)

    def onJobStart(self, jobStart):
        """Remember when a job started so its duration can be computed."""
        self._job_start_times[jobStart.jobId()] = jobStart.time()

    def onJobEnd(self, jobEnd):
        """Record the finished job's id and elapsed time."""
        # The job id lives on the event itself, not on its job result.
        job_id = jobEnd.jobId()
        finished_at = jobEnd.time()
        # pop() also keeps the start-time map from growing without bound.
        started_at = self._job_start_times.pop(job_id, None)
        self.job_durations.append({
            "job_id": job_id,
            "duration": None if started_at is None else finished_at - started_at,
        })

    def onStageCompleted(self, stageCompleted):
        """Record the finished stage's id and elapsed time."""
        info = stageCompleted.stageInfo()
        # Both timestamps are optional values on the JVM side and must be
        # unwrapped before they can be subtracted.
        submitted = info.submissionTime()
        completed = info.completionTime()
        if submitted.isDefined() and completed.isDefined():
            duration = completed.get() - submitted.get()
        else:
            duration = None
        self.stage_durations.append({
            "stage_id": info.stageId(),
            "duration": duration,
        })
# Register listener
# The JVM delivers listener events by calling back into this Python
# process, which only works once Py4J's callback server is running.
# NOTE(review): helper location verified for Spark 2.4+/3.x; confirm for
# the version you deploy.
from pyspark.java_gateway import ensure_callback_server_started

ensure_callback_server_started(spark.sparkContext._gateway)

listener = MetricsListener()
spark.sparkContext._jsc.sc().addSparkListener(listener)
Event Logging and History Server
# Enable event logging
# Session that writes its event log to HDFS, including per-stage executor
# metrics.  Parenthesized chain instead of backslash continuations.
spark = (
    SparkSession.builder
    .appName("EventLogging")
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", "hdfs://logs/spark-events")
    .config("spark.eventLog.logStageExecutorMetrics", "true")
    .getOrCreate()
)
# Events are logged to HDFS/S3
# Access via Spark History Server (port 18080)
# Query event logs programmatically
def analyze_event_log(log_path):
    """Parse a Spark event log and return per-task metrics.

    Prints the job-end events (job id, result, completion time) as a side
    effect, then builds a DataFrame with one row per finished task.

    Args:
        log_path: Path of the JSON event log, readable by
            ``spark.read.json``.

    Returns:
        DataFrame with columns ``Task ID``, ``Executor Run Time``,
        ``Total Bytes Read`` and ``Total Bytes Written``.
    """
    # Read JSON event log (one JSON object per line).
    events_df = spark.read.json(log_path)

    # Analyze job durations.  Job-end events carry their timestamp in
    # "Completion Time"; a "Timestamp" field belongs to other event types
    # and would be null on these rows.
    job_events = events_df.filter(F.col("Event") == "SparkListenerJobEnd")
    job_events.select(
        "Job ID",
        "Job Result",
        "Completion Time",
    ).show()

    # Analyze task metrics.
    task_events = events_df.filter(F.col("Event") == "SparkListenerTaskEnd")
    shuffle_read = "Task Metrics.Shuffle Read Metrics"
    shuffle_write = "Task Metrics.Shuffle Write Metrics"
    task_metrics = task_events.select(
        "Task Info.Task ID",
        "Task Metrics.Executor Run Time",
        # The log stores remote and local shuffle reads separately; there
        # is no pre-summed total, so add them and keep the column name
        # callers already expect.
        (
            F.col(f"{shuffle_read}.Remote Bytes Read")
            + F.col(f"{shuffle_read}.Local Bytes Read")
        ).alias("Total Bytes Read"),
        # Shuffle writes are logged as "Shuffle Bytes Written".
        F.col(f"{shuffle_write}.Shuffle Bytes Written").alias("Total Bytes Written"),
    )
    return task_metrics
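⚠️
Warning: Event logs can grow large. Enable rolling event logs (spark.eventLog.rolling.enabled, sized by spark.eventLog.rolling.maxFileSize) and configure History Server retention (spark.history.fs.cleaner.enabled, spark.history.fs.cleaner.maxAge) to manage storage.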
Custom Metrics with Dropwizard
# Spark integrates with Dropwizard metrics
from pyspark.sql import SparkSession
# Session that loads metrics.properties and additionally enables the JMX
# sink for every metrics instance.
spark = (
    SparkSession.builder
    .appName("DropwizardMetrics")
    .config("spark.metrics.conf", "metrics.properties")
    # Sink settings are keyed as <instance>.sink.<sink name>.<option>; the
    # sink name ("jmx" here) is required or the setting is not picked up.
    .config("spark.metrics.conf.*.sink.jmx.class", "org.apache.spark.metrics.sink.JmxSink")
    .getOrCreate()
)
# Custom metrics in your code
# com.codahale.metrics is a JVM package, so a Python ``import`` cannot load
# it; bind the classes through the Py4J gateway instead.
_codahale = spark.sparkContext._jvm.com.codahale.metrics
Counter = _codahale.Counter
Timer = _codahale.Timer
Gauge = _codahale.Gauge
# Access metrics registry
# The JVM SparkContext has no getMetricsSystem(); the metrics system is
# reached through its SparkEnv.
# NOTE(review): this is an internal Spark API - confirm for your version.
metrics = spark.sparkContext._jsc.sc().env().metricsSystem()
# Create custom counters
def process_batch(batch_df, batch_id):
    """Process streaming batch with metrics.

    Counts the incoming records, then writes the rows whose ``status`` is
    ``"active"`` to Parquet in append mode while timing the filter+write.

    Metrics are JVM-side Dropwizard objects taken from the shared registry
    named ``"process_batch"``, so the counter and timer accumulate across
    calls instead of being recreated for every batch.  The registry is not
    attached to any reporter here; export it separately if the values need
    to be visible outside the driver JVM.

    Args:
        batch_df: DataFrame holding the records of one batch.
        batch_id: Identifier of the batch; not used by this function
            (presumably supplied by ``foreachBatch`` - confirm at caller).
    """
    # Dropwizard classes live in the JVM and are reached through Py4J.
    codahale = spark.sparkContext._jvm.com.codahale.metrics
    registry = codahale.SharedMetricRegistries.getOrCreate("process_batch")
    record_count = registry.counter("records.processed")
    processing_time = registry.timer("processing.time")

    # The batch is evaluated twice (count + write); persist it so the
    # second pass does not recompute it from the source.
    batch_df.persist()
    try:
        record_count.inc(batch_df.count())

        # Timer.time() returns a JVM Timer.Context, which is not a Python
        # context manager, so it is stopped explicitly.
        timer_context = processing_time.time()
        try:
            result = batch_df.filter(F.col("status") == "active")
            result.write.mode("append").parquet("hdfs://output/processed")
        finally:
            timer_context.stop()
    finally:
        batch_df.unpersist()
Performance Profiling
# Profile Spark applications
import cProfile
import pstats
from io import StringIO
def profile_spark_job():
    """Profile a Spark job with cProfile and print the top 20 entries.

    The profiler runs in this Python process, so the report covers the
    Python calls made here while the job is built and triggered.  Entries
    are sorted by cumulative time and printed to stdout.
    """
    profiler = cProfile.Profile()
    profiler.enable()
    try:
        # Your Spark job
        df = spark.read.parquet("hdfs://data/events")
        result = (
            df.filter(F.col("amount") > 100)
            .groupBy("category")
            .agg(F.sum("amount"))
        )
        result.count()
    finally:
        # Always switch the profiler off, even when the job raises;
        # otherwise it keeps instrumenting everything that runs afterwards.
        profiler.disable()

    # Print profiling results
    stream = StringIO()
    stats = pstats.Stats(profiler, stream=stream)
    stats.sort_stats("cumulative")
    stats.print_stats(20)
    print(stream.getvalue())
# Memory profiling
def profile_memory():
    """Monitor memory usage during execution.

    Prints the resident set size (in MB) of the current Python process
    before loading, after caching, and after an aggregation.  Only the
    process identified by ``os.getpid()`` is measured.
    """
    import os

    import psutil

    process = psutil.Process(os.getpid())

    def get_memory():
        # RSS is reported in bytes; convert to MB.
        return process.memory_info().rss / 1024 / 1024

    print(f"Before: {get_memory():.1f} MB")

    df = spark.read.parquet("hdfs://data/large")
    df.cache()
    try:
        # count() is the action that actually materializes the cache.
        df.count()
        print(f"After cache: {get_memory():.1f} MB")

        result = df.groupBy("key").agg(F.sum("value"))
        result.count()
        print(f"After aggregation: {get_memory():.1f} MB")
    finally:
        # ``df`` is local to this function, so nothing can reuse the cached
        # data afterwards; release it instead of leaving it cached.
        df.unpersist()
Common Performance Issues and Solutions
# Issue 1: Skewed tasks
# Symptom: Some tasks take much longer than others
# Solution: Check task duration distribution in Stage tab
# Issue 2: High shuffle
# Symptom: Large shuffle read/write
# Solution: Optimize partitioning, use broadcast joins
# Issue 3: OOM errors
# Symptom: Executor OOM, GC overhead
# Solution: Increase memory, reduce data per partition
# Issue 4: Data skew
# Symptom: Uneven partition sizes
# Solution: Repartition, use salting techniques
# Issue 5: Small files
# Symptom: Many small partitions
# Solution: Coalesce, optimize file sizes
# Example: Diagnosing a slow job
def diagnose_slow_job(spark, df):
    """Common diagnostic checks.

    Prints the partition count, summary statistics of the per-partition
    row counts, and the max/avg row-count ratio as a skew indicator (a
    ratio well above 1 means some partitions hold far more rows).

    Args:
        spark: Spark session; not used by the checks, kept so existing
            callers keep working.
        df: DataFrame to inspect.
    """
    # Check partition count
    print(f"Partitions: {df.rdd.getNumPartitions()}")

    # Rows per partition, defined once and shared by both checks below.
    partition_counts = (
        df.withColumn("partition_id", F.spark_partition_id())
        .groupBy("partition_id")
        .agg(F.count("*").alias("count"))
    )

    # Check data distribution
    partition_counts.describe().show()

    # Check for skew: fetch max and average in a single job.
    summary = partition_counts.agg(
        F.max("count").alias("max_count"),
        F.avg("count").alias("avg_count"),
    ).collect()[0]
    max_count = summary["max_count"]
    avg_count = summary["avg_count"]

    # With no rows there are no partition counts and both aggregates are
    # null; report that instead of failing on the division.
    if avg_count is None:
        print("Max/Avg ratio: n/a (no rows)")
        return
    print(f"Max/Avg ratio: {max_count/avg_count:.2f}")
ℹ️
Key Takeaway: Master the Spark UI for quick diagnostics. Use programmatic monitoring for automated alerts. Event logs provide historical analysis. Custom metrics give application-specific insights.
Follow-Up Questions
- How would you set up monitoring for a production Spark cluster?
- Explain how to use the Spark History Server for post-mortem analysis.
- How do you monitor Spark on Kubernetes vs YARN?
- Describe strategies for alerting on Spark job failures.
- How would you track data lineage across Spark applications?