20. Monitoring & Metrics in PySpark
Definition: Monitoring
Monitoring is the continuous process of collecting, analyzing, and acting on application metrics to ensure system health, performance, and reliability. In PySpark, monitoring provides visibility into job execution, resource usage, and application behavior.
Definition: Observability
Observability is the ability to understand the internal state of a system by examining its external outputs. It encompasses metrics, logs, and traces to provide comprehensive insights into application behavior.
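Monitoring Coverage
$$C = \frac{N_{\text{monitored}}}{N_{\text{total}}} \times 100\%$$
Here,
- $C$ = Monitoring coverage percentage (target > 90%)
- $N_{\text{monitored}}$ = Number of components with monitoring
- $N_{\text{total}}$ = Total number of application components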
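As a worked example of the coverage definition above (components with monitoring divided by total components, as a percentage), here is a minimal sketch. The function name and the component counts are illustrative assumptions, not part of any Spark API.

```python
def monitoring_coverage(monitored: int, total: int) -> float:
    """Return monitoring coverage as a percentage of all components."""
    if total <= 0:
        raise ValueError("total must be positive")
    if not 0 <= monitored <= total:
        raise ValueError("monitored must be between 0 and total")
    return 100.0 * monitored / total


# Hypothetical application: 20 components, 19 of them instrumented.
coverage = monitoring_coverage(19, 20)
print(f"coverage = {coverage:.1f}%, meets 90% target: {coverage > 90}")
# prints: coverage = 95.0%, meets 90% target: True
```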
Alert Response Time
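$$T_{\text{response}} = T_{\text{detect}} + T_{\text{notify}} + T_{\text{act}}$$
Here,
- $T_{\text{response}}$ = Total response time to issues
- $T_{\text{detect}}$ = Time to detect the issue
- $T_{\text{notify}}$ = Time to notify stakeholders
- $T_{\text{act}}$ = Time to take corrective action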
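The three phases listed above add up to the total response time. The sketch below shows that arithmetic with `datetime.timedelta`; the function name and the sample durations are made up for illustration.

```python
from datetime import timedelta


def alert_response_time(detect: timedelta, notify: timedelta, act: timedelta) -> timedelta:
    """Total response time is the sum of detection, notification, and action time."""
    return detect + notify + act


# Hypothetical incident: 2 min to detect, 30 s to notify, 10 min to fix.
total = alert_response_time(
    timedelta(minutes=2), timedelta(seconds=30), timedelta(minutes=10)
)
print(total)  # prints: 0:12:30
```

Breaking the total into phases shows where to invest: in this made-up incident, corrective action dominates, so faster detection alone would help little.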
Spark exposes several monitoring interfaces: the Spark UI (a web interface for running applications), the Spark History Server (for completed applications), the metrics system (with sinks such as JMX, Prometheus, and Graphite), and event logging. Each offers a different level of detail and suits different use cases.
Enable event logging in production to capture detailed execution data. Use the Spark History Server to analyze completed applications and identify performance bottlenecks.
Monitoring Triangle
Theorem: Effective monitoring requires three pillars: (1) Metrics: quantitative measurements of system state; (2) Logs: detailed event records for debugging; (3) Traces: end-to-end execution paths for distributed debugging. Together, these provide comprehensive observability into Spark applications.
- Monitoring provides visibility into application health and performance
- Observability encompasses metrics, logs, and traces
- Spark UI, History Server, and metrics system are primary monitoring tools
- Effective monitoring requires coverage of all critical components
- Alert response time depends on detection, notification, and action phases
[Figure: Observability Triangle]
[Figure: Monitoring Architecture]
Detailed Explanation
Why Monitoring & Observability?
- Maintains reliable, performant PySpark applications in production
- Provides visibility into application behavior
- Enables proactive issue detection
- Supports data-driven optimization decisions
Spark Monitoring Interfaces
| Interface | Purpose | Best For |
|---|---|---|
| Spark UI | Real-time application monitoring | Development, debugging |
| History Server | Completed application analysis | Post-mortem, performance tuning |
| Metrics System | Flexible metrics collection | Production monitoring |
| Event Logging | Detailed execution data capture | Analysis, auditing |
Metrics Categories
| Category | What It Tracks |
|---|---|
| Application | Job, stage, task completion rates |
| Resource | CPU, memory, disk I/O, network I/O |
| JVM | GC pause time, heap memory, threads |
| Shuffle | Read, write, spill size |
| Storage | Cache hit rate, cached data, memory fraction |
| Custom | User-defined counters and gauges |
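Several of these categories can be read from Spark's monitoring REST API, which serves per-executor summaries under `/api/v1/applications/[app-id]/executors` on the UI port. The sketch below works on an already-decoded JSON list, so it runs without a cluster. The field names (`totalGCTime`, `totalDuration`, `memoryUsed`, `maxMemory`) are the ones I recall from that endpoint; treat them as assumptions to check against your Spark version. The sample values are invented.

```python
from typing import Dict, List


def summarize_executors(executors: List[dict]) -> Dict[str, dict]:
    """Derive JVM and storage ratios from executor summaries (decoded JSON)."""
    summary = {}
    for ex in executors:
        duration = ex.get("totalDuration", 0)  # task time, ms
        max_mem = ex.get("maxMemory", 0)       # storage memory available, bytes
        summary[ex["id"]] = {
            # Share of task time spent in garbage collection
            "gc_fraction": ex.get("totalGCTime", 0) / duration if duration else 0.0,
            # Share of storage memory currently used for cached data
            "storage_fraction": ex.get("memoryUsed", 0) / max_mem if max_mem else 0.0,
        }
    return summary


# Invented sample payload with one driver entry and one executor.
sample = [
    {"id": "driver", "totalDuration": 0, "totalGCTime": 0, "memoryUsed": 0, "maxMemory": 1000},
    {"id": "1", "totalDuration": 20000, "totalGCTime": 3000, "memoryUsed": 250, "maxMemory": 1000},
]
print(summarize_executors(sample)["1"])
# prints: {'gc_fraction': 0.15, 'storage_fraction': 0.25}
```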
Metrics Export Formats
| Format | Use Case |
|---|---|
| JMX | Java management |
| Prometheus | Modern monitoring stacks |
| Graphite | Time-series data |
| Custom | Specialized requirements |
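To make the Prometheus row concrete: the Prometheus text format is line-oriented, with comment lines starting with `#` and sample lines of the form `name{labels} value`. The following is a deliberately simplified parser sketch, not a full implementation of the format (it ignores optional timestamps and does not split labels). The metric names in the sample are hypothetical.

```python
def parse_prometheus_text(text: str) -> dict:
    """Map each 'name{labels}' string to its float value, skipping comments."""
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # The value is the last whitespace-separated token on the line.
        name_and_labels, value = line.rsplit(None, 1)
        samples[name_and_labels] = float(value)
    return samples


# Hypothetical scrape output; real metric names depend on the Spark namespace.
sample_scrape = """
# HELP illustrative comment line
jvm_heap_used{type="gauges"} 1048576
shuffle_bytes_written{type="counters"} 2.5e6
"""
print(parse_prometheus_text(sample_scrape))
```

In practice a Prometheus server does this parsing itself; a helper like this is mainly useful for smoke-testing that an endpoint exposes the metrics you expect.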
Event Logging
- Records job submissions, stage transitions, task metrics, and configuration details
- Essential for:
  - Performance analysis
  - Capacity planning
  - Debugging complex issues
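Spark event logs are written as one JSON object per line, each carrying an `Event` field such as `SparkListenerJobStart` or `SparkListenerTaskEnd`. The sketch below counts events by type as a first step in analyzing a log. It assumes an uncompressed log and uses a small hand-written sample rather than a real file; the helper name is illustrative.

```python
import json
from collections import Counter


def count_events(lines) -> Counter:
    """Count event-log entries by their 'Event' type."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue
        counts[json.loads(line).get("Event", "unknown")] += 1
    return counts


# Hand-written sample; real entries carry many more fields.
sample_log = [
    '{"Event": "SparkListenerApplicationStart"}',
    '{"Event": "SparkListenerJobStart", "Job ID": 0}',
    '{"Event": "SparkListenerTaskEnd"}',
    '{"Event": "SparkListenerTaskEnd"}',
    '{"Event": "SparkListenerJobEnd", "Job ID": 0}',
]
for event, n in count_events(sample_log).most_common():
    print(event, n)
```

To apply it to a real log, pass an open file handle instead of the list, since iterating a text file yields lines.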
Alerting
- Notifies stakeholders of anomalies or threshold violations
- Requires:
  - Appropriate thresholds
  - Minimal false positives
  - Timely notification
- Common alerts:
  - Job failures
  - High resource utilization
  - Long GC pauses
  - Data quality issues
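One common way to balance sensitivity against false positives is to fire only when a threshold is breached for several consecutive samples. The sketch below illustrates that idea; the class, the rule name, and the 500 ms GC threshold are hypothetical choices, not recommended values.

```python
from dataclasses import dataclass


@dataclass
class ThresholdRule:
    name: str
    threshold: float
    consecutive: int = 3  # number of breaching samples required in a row

    def should_fire(self, samples) -> bool:
        """True if the most recent `consecutive` samples all exceed the threshold."""
        recent = list(samples)[-self.consecutive:]
        return len(recent) == self.consecutive and all(v > self.threshold for v in recent)


# Hypothetical rule on GC pause duration in milliseconds.
gc_rule = ThresholdRule(name="long_gc_pause_ms", threshold=500.0, consecutive=3)
print(gc_rule.should_fire([120, 900, 130, 140]))  # prints: False (single spike)
print(gc_rule.should_fire([120, 600, 700, 800]))  # prints: True (sustained breach)
```

The trade-off is latency: requiring three samples delays the alert by at least two collection intervals, which adds to detection time.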
Dashboards
- Visual representations of monitoring data
- Enable quick identification of trends, anomalies, and performance issues
- Tools: Grafana with Prometheus
- Organize by audience:
  - Operations
  - Development
  - Management
Distributed Tracing
- End-to-end visibility across services and components
- Helps:
  - Understand data flow through stages
  - Identify bottlenecks in shuffle operations
  - Debug performance in complex pipelines
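A full tracing setup exports spans to a dedicated backend, which is beyond a short example. As a rough stand-in, the sketch below records named, timed spans around pipeline steps on the driver and reports the slowest one. `StageTimer` is a hypothetical helper, not a Spark or tracing-library API. With Spark, a span is only meaningful if it wraps an action, because transformations are evaluated lazily.

```python
import time
from contextlib import contextmanager


class StageTimer:
    """Collects (name, duration_seconds) spans for pipeline steps."""

    def __init__(self, clock=time.perf_counter):
        self.clock = clock  # injectable so durations can be tested deterministically
        self.spans = []

    @contextmanager
    def span(self, name):
        start = self.clock()
        try:
            yield
        finally:
            # Record the span even if the wrapped step raises.
            self.spans.append((name, self.clock() - start))

    def slowest(self):
        """Name of the span with the longest duration."""
        return max(self.spans, key=lambda s: s[1])[0]


timer = StageTimer()
with timer.span("read"):
    time.sleep(0.01)  # placeholder for e.g. df.count()
with timer.span("aggregate"):
    time.sleep(0.03)  # placeholder for e.g. result.write...
for name, seconds in timer.spans:
    print(f"{name}: {seconds:.3f}s")
```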
Log Aggregation
| Tool | Capability |
|---|---|
| ELK Stack | Elasticsearch, Logstash, Kibana |
| Grafana Loki | Log search and analysis |
- Structured logging (JSON) improves searchability
- Centralizes logs from all cluster nodes
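For driver-side Python code, structured JSON logging can be sketched with the standard `logging` module, as below. The `JsonFormatter` class and the `app_id` extra field are illustrative assumptions. Spark's own JVM logs are configured separately through log4j and are not affected by this.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Format each log record as a single JSON object."""

    def format(self, record):
        payload = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # app_id is a hypothetical field supplied through `extra=`.
        if hasattr(record, "app_id"):
            payload["app_id"] = record.app_id
        return json.dumps(payload)


handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("pipeline")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.propagate = False  # avoid duplicate output through the root logger

logger.info("stage finished", extra={"app_id": "app-0001"})
```

One JSON object per line lets a log shipper index fields such as `level` and `app_id` directly instead of parsing free text.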
Capacity Planning
- Uses monitoring data to forecast resource needs
- Analyze historical trends:
  - Resource usage
  - Data volumes
  - Processing times
- Proactively scale infrastructure
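A simple way to turn historical trends into a forecast is a least-squares line through equally spaced observations. The sketch below does this in plain Python; the function name and the daily input volumes are invented for illustration, and a straight line is only a reasonable model when growth is roughly linear.

```python
def linear_forecast(values, steps_ahead):
    """Fit y = intercept + slope * x by least squares and extrapolate.

    x is the observation index (0, 1, 2, ...); the forecast is made for
    `steps_ahead` intervals after the last observation.
    """
    n = len(values)
    if n < 2:
        raise ValueError("need at least two observations")
    mean_x = (n - 1) / 2
    mean_y = sum(values) / n
    sxx = sum((x - mean_x) ** 2 for x in range(n))
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in enumerate(values))
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    return intercept + slope * (n - 1 + steps_ahead)


# Invented daily input volumes in GB, growing by 10 GB per day.
daily_input_gb = [100, 110, 120, 130, 140]
print(linear_forecast(daily_input_gb, 30))  # prints: 440.0
```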
Security Monitoring
Track:
- Authentication attempts
- Authorization failures
- Data access patterns
- Configuration changes
Essential for compliance, incident response, and security auditing.
Advanced Techniques
- Anomaly detection: statistical methods for unusual patterns
- Predictive monitoring: forecast issues before they occur
- Correlation analysis: identify relationships between metrics
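As one example of a statistical method for anomaly detection, the sketch below flags values whose z-score exceeds a threshold. The job durations are invented. In small samples a single outlier inflates the standard deviation, which is why the example uses a threshold of 2.0; more robust statistics (for example median-based ones) may be preferable in practice.

```python
from statistics import mean, stdev


def zscore_anomalies(values, threshold=3.0):
    """Return indices of values more than `threshold` sample std devs from the mean."""
    mu = mean(values)
    sigma = stdev(values)
    if sigma == 0:
        return []  # all values identical, nothing stands out
    return [i for i, v in enumerate(values) if abs(v - mu) / sigma > threshold]


# Invented job durations in seconds; the last run is unusually slow.
durations = [60, 62, 59, 61, 60, 58, 61, 240]
print(zscore_anomalies(durations, threshold=2.0))  # prints: [7]
```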
Key Concepts Table
| Concept | Description | Use Case |
|---|---|---|
| Spark UI | Real-time application monitoring | Development, debugging |
| History Server | Completed application analysis | Post-mortem, performance tuning |
| Metrics System | Flexible metrics collection | Production monitoring |
| Event Logging | Detailed execution data capture | Analysis, auditing |
| Alerting | Automated issue notification | Operations, incident response |
| Dashboards | Visual data representation | Monitoring, reporting |
| Distributed Tracing | End-to-end execution visibility | Debugging, optimization |
| Log Aggregation | Centralized log management | Debugging, compliance |
Code Examples
Basic Monitoring Configuration
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Configure Spark with monitoring
# spark.eventLog.enabled: enables event logging
# spark.eventLog.dir: directory for event logs (must already exist)
spark = SparkSession.builder \
    .appName("MonitoringExample") \
    .config("spark.eventLog.enabled", "true") \
    .config("spark.eventLog.dir", "hdfs:///spark-history") \
    .config("spark.yarn.historyServer.address", "history-server-host:18080") \
    .config("spark.ui.port", "4040") \
    .config("spark.ui.enabled", "true") \
    .config("spark.ui.killEnabled", "true") \
    .getOrCreate()

# Read data
df = spark.read \
    .format("parquet") \
    .load("hdfs:///data/input")

# Process data; every job, stage, and task shows up in the Spark UI and event log.
# Aliases give the output columns names that are valid in Parquet.
result = df \
    .repartition(100) \
    .groupBy("category") \
    .agg(F.sum("value").alias("total_value"), F.count("*").alias("row_count"))

# Write results
result.write \
    .mode("overwrite") \
    .parquet("hdfs:///data/output")

# Report the actual UI address (the port can differ from 4040 if it was in use)
print(f"Spark UI available at: {spark.sparkContext.uiWebUrl}")
spark.stop()
Prometheus Metrics Export
from pyspark.sql import SparkSession

# Metrics configuration. Spark reads this file when the metrics system starts,
# so it must be written before the SparkSession is created.
metrics_config = """
# Prometheus servlet sink (Spark 3.0+), served on the existing Spark UI port
*.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet
*.sink.prometheusServlet.path=/metrics/prometheus
# JVM metrics source for the driver and executors
driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
"""

# Write metrics configuration
with open("metrics.properties", "w") as f:
    f.write(metrics_config)

# Configure Spark with Prometheus metrics
# spark.metrics.conf: path to the metrics configuration file
# spark.metrics.namespace: prefix for metric names (defaults to the application ID)
# spark.ui.prometheus.enabled: exposes executor metrics through the driver UI
spark = SparkSession.builder \
    .appName("PrometheusMonitoring") \
    .config("spark.metrics.conf", "metrics.properties") \
    .config("spark.metrics.namespace", "spark_application") \
    .config("spark.ui.prometheus.enabled", "true") \
    .getOrCreate()

# Read data
df = spark.read \
    .format("parquet") \
    .load("hdfs:///data/input")

# Process data
result = df.groupBy("category").count()

# Write results
result.write \
    .mode("overwrite") \
    .parquet("hdfs:///data/output")

# Prometheus metrics are served from the driver UI, for example:
# http://driver-host:4040/metrics/prometheus
# http://driver-host:4040/metrics/executors/prometheus
spark.stop()
Custom Metrics with JMX
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# JMX metrics configuration. It must be written before the session is created.
# JmxSink registers metrics as MBeans and takes no port option; remote JMX access
# is enabled through JVM options such as -Dcom.sun.management.jmxremote.port.
jmx_config = """
# Enable the JMX sink for all instances (driver, executor, ...)
*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
"""

# Write JMX configuration
with open("metrics.properties", "w") as f:
    f.write(jmx_config)

# Configure Spark with JMX metrics
spark = SparkSession.builder \
    .appName("JMXMonitoring") \
    .config("spark.metrics.conf", "metrics.properties") \
    .getOrCreate()


# Function to collect custom metrics
def collect_custom_metrics():
    """Collect custom application metrics"""
    sc = spark.sparkContext
    # Get executor information via the JVM status tracker.
    # _jsc is a private attribute, and the result also includes the driver.
    executor_info = sc._jsc.sc().statusTracker().getExecutorInfos()
    # Log executor count
    print(f"Number of executors (including driver): {len(executor_info)}")

    # Monitor processing
    df = spark.read \
        .format("parquet") \
        .load("hdfs:///data/input")

    # Cache and count
    df.cache()
    count = df.count()
    print(f"Row count: {count}")

    # Process data; the alias keeps the output column name valid for Parquet
    result = df \
        .repartition(100) \
        .groupBy("category") \
        .agg(F.sum("value").alias("total_value"))
    result.write \
        .mode("overwrite") \
        .parquet("hdfs:///data/output")

    # Release the cached data
    df.unpersist()


# Run metrics collection
collect_custom_metrics()
spark.stop()
Comprehensive Monitoring Setup
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import json
import logging
import time
from datetime import datetime

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class SparkMonitor:
    def __init__(self, app_name, config):
        """Initialize Spark monitoring"""
        self.app_name = app_name
        self.config = config
        self.spark = self.create_spark_session()
        self.metrics = {}

    def create_spark_session(self):
        """Create Spark session with monitoring configuration"""
        # metrics.properties must exist before the session starts
        return SparkSession.builder \
            .appName(self.app_name) \
            .config("spark.eventLog.enabled", "true") \
            .config("spark.eventLog.dir", self.config["event_log_dir"]) \
            .config("spark.yarn.historyServer.address", self.config["history_server"]) \
            .config("spark.metrics.conf", "metrics.properties") \
            .config("spark.ui.port", "4040") \
            .getOrCreate()

    def collect_metrics(self):
        """Collect application metrics"""
        sc = self.spark.sparkContext
        # Get executor information (the result also includes the driver)
        executor_info = sc._jsc.sc().statusTracker().getExecutorInfos()
        self.metrics = {
            "timestamp": datetime.now().isoformat(),
            "app_name": self.app_name,
            "num_executors": len(executor_info),
            "application_id": sc.applicationId,
            "spark_version": sc.version
        }
        return self.metrics

    def monitor_processing(self, input_path, output_path):
        """Monitor data processing"""
        start_time = time.time()

        # Read data
        df = self.spark.read \
            .format("parquet") \
            .load(input_path)

        # Cache and count
        df.cache()
        count = df.count()

        # Process data; aliases keep the output column names valid for Parquet
        result = df \
            .repartition(100) \
            .groupBy("category") \
            .agg(F.sum("value").alias("total_value"), F.count("*").alias("row_count"))

        # Write results
        result.write \
            .mode("overwrite") \
            .parquet(output_path)
        end_time = time.time()

        # Update metrics
        self.metrics.update({
            "input_rows": count,
            "processing_time": end_time - start_time,
            "output_path": output_path,
            "status": "completed"
        })
        return self.metrics

    def generate_report(self):
        """Generate monitoring report"""
        report = {
            "summary": self.metrics,
            "recommendations": self.generate_recommendations()
        }
        # Save report
        with open(f"monitoring_report_{self.app_name}.json", "w") as f:
            json.dump(report, f, indent=2)
        return report

    def generate_recommendations(self):
        """Generate optimization recommendations"""
        recommendations = []
        # Analyze metrics and generate recommendations
        if self.metrics.get("processing_time", 0) > 3600:
            recommendations.append("Consider increasing parallelism or partitioning data differently")
        if self.metrics.get("num_executors", 0) < 5:
            recommendations.append("Consider adding more executors for better parallelism")
        return recommendations


# Example usage
if __name__ == "__main__":
    # Configuration
    config = {
        "event_log_dir": "hdfs:///spark-history",
        "history_server": "history-server-host:18080"
    }

    # Initialize monitor
    monitor = SparkMonitor("MonitoredApplication", config)

    # Collect metrics
    metrics = monitor.collect_metrics()
    print(f"Initial metrics: {metrics}")

    # Monitor processing
    processing_metrics = monitor.monitor_processing(
        "hdfs:///data/input",
        "hdfs:///data/output"
    )
    print(f"Processing metrics: {processing_metrics}")

    # Generate report
    report = monitor.generate_report()
    print("Monitoring report saved")
    monitor.spark.stop()
Performance Metrics
| Metric | Optimal | Typical | Poor | Optimization |
|---|---|---|---|---|
| Monitoring Coverage | > 95% | 80-95% | < 80% | Comprehensive instrumentation |
| Alert Response Time | < 5 min | 5-30 min | > 30 min | Automated alerting |
| Dashboard Refresh | < 1 min | 1-5 min | > 5 min | Real-time streaming |
| Log Retention | 30-90 days | 7-30 days | < 7 days | Log management |
| Metrics Granularity | 1 min | 5-15 min | > 15 min | High-frequency collection |
Best Practices
- Enable event logging: Capture detailed execution data for analysis
- Use appropriate metrics: Focus on actionable metrics that drive decisions
- Implement alerting: Automated notifications for critical issues
- Create dashboards: Visual representations for different audiences
- Aggregate logs: Centralize logs for search and analysis
- Monitor resource usage: Track CPU, memory, disk, and network
- Set meaningful thresholds: Balance between sensitivity and noise
- Regularly review monitoring: Update configurations based on feedback
- Integrate with incident management: Connect alerts to response processes
- Document monitoring setup: Maintain clear documentation of configurations
Related Topics
- 17-cluster-management.mdx: Cluster-level monitoring
- 18-gc-tuning.mdx: JVM monitoring and GC metrics
- 19-spark-submit.mdx: Application deployment monitoring
See Also
- 06-joins-optimization.mdx: Join performance monitoring
- 11-structured-streaming.mdx: Streaming application monitoring
- Kafka Streams (kafka/03): Stream processing monitoring
- Data Engineering Streaming (data-engineering/022): Production monitoring patterns