
20. Monitoring & Metrics in PySpark


Definition: Monitoring

Monitoring is the continuous process of collecting, analyzing, and acting on application metrics to ensure system health, performance, and reliability. In PySpark, monitoring provides visibility into job execution, resource usage, and application behavior.

Definition: Observability

Observability is the ability to understand the internal state of a system by examining its external outputs. It encompasses metrics, logs, and traces to provide comprehensive insights into application behavior.

Monitoring Coverage Formula
$$C_{monitor} = \frac{N_{monitored\_components}}{N_{total\_components}} \times 100\%$$

Here,

  • $C_{monitor}$ = Monitoring coverage percentage (target > 90%)
  • $N_{monitored\_components}$ = Number of components with monitoring
  • $N_{total\_components}$ = Total number of application components
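The coverage formula can be evaluated directly. This small helper (its name is illustrative, not part of any library) also rejects impossible inputs such as an empty component inventory.

```python
def monitoring_coverage(monitored_components: int, total_components: int) -> float:
    """Return C_monitor as a percentage: monitored / total * 100."""
    if total_components <= 0:
        raise ValueError("total_components must be positive")
    if not 0 <= monitored_components <= total_components:
        raise ValueError("monitored_components must be between 0 and total_components")
    return monitored_components / total_components * 100.0


# Hypothetical inventory: 19 of 20 components (driver, executors, shuffle service, ...)
# have metrics, logs, or alerts attached.
coverage = monitoring_coverage(19, 20)
print(f"Coverage: {coverage:.1f}%, meets 90% target: {coverage > 90}")
```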

Alert Response Time

$$T_{response} = T_{detection} + T_{notification} + T_{action}$$

Here,

  • $T_{response}$ = Total response time to issues
  • $T_{detection}$ = Time to detect the issue
  • $T_{notification}$ = Time to notify stakeholders
  • $T_{action}$ = Time to take corrective action
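Summing the three phases with `datetime.timedelta` makes it easy to see which one dominates. The phase durations below describe a hypothetical incident, not measurements.

```python
from datetime import timedelta


def alert_response_time(detection: timedelta,
                        notification: timedelta,
                        action: timedelta) -> timedelta:
    """T_response = T_detection + T_notification + T_action."""
    return detection + notification + action


# Hypothetical incident: the scrape interval delays detection by 2 minutes,
# paging takes 1 minute, and restarting the failed job takes 10 minutes.
total = alert_response_time(timedelta(minutes=2),
                            timedelta(minutes=1),
                            timedelta(minutes=10))
print(total)
print("within 30 minutes:", total <= timedelta(minutes=30))
```

Here the action phase is most of the total, so faster detection alone would not bring the response under five minutes.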

Spark provides several monitoring interfaces: the Spark UI (a web interface for running applications), the Spark History Server (for completed applications), the metrics system (with sinks such as JMX, Prometheus, and Graphite), and event logging. Each offers a different level of detail and suits different use cases.

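Enable event logging in production to capture detailed execution data. Use the Spark History Server to analyze completed applications and identify performance bottlenecks; it can only display applications whose event logs it can read, so its spark.history.fs.logDirectory should point at the same location as spark.eventLog.dir.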

Principle: Monitoring Triangle

Effective monitoring rests on three pillars: (1) metrics, quantitative measurements of system state; (2) logs, detailed event records for debugging; (3) traces, end-to-end execution paths for distributed debugging. Together, these provide comprehensive observability into Spark applications.

  • Monitoring provides visibility into application health and performance
  • Observability encompasses metrics, logs, and traces
  • Spark UI, History Server, and metrics system are primary monitoring tools
  • Effective monitoring requires coverage of all critical components
  • Alert response time depends on detection, notification, and action phases

[Figure: Observability Triangle. Metrics (CPU, memory; JMX, Prometheus), logs (event logging, driver/stdout), and traces (end-to-end, distributed debugging; Spark listener, OpenTelemetry) surround observability, with the Spark UI and History Server as the main viewing tools.]


📚 Detailed Explanation

Why Monitoring & Observability?

  • Maintains reliable, performant PySpark applications in production
  • Provides visibility into application behavior
  • Enables proactive issue detection
  • Supports data-driven optimization decisions

Spark Monitoring Interfaces

| Interface | Purpose | Best For |
|---|---|---|
| Spark UI | Real-time application monitoring | Development, debugging |
| History Server | Completed application analysis | Post-mortem, performance tuning |
| Metrics System | Flexible metrics collection | Production monitoring |
| Event Logging | Detailed execution data capture | Analysis, auditing |
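Both the live Spark UI (port 4040 by default) and the History Server (port 18080 by default) expose the same monitoring REST API under /api/v1, which makes the interfaces above scriptable. The sketch below lists applications and failed stages; the helper names are hypothetical, `fetch` is injectable so the logic can be tested without a cluster, and only the `id`, `name`, `stageId`, and `status` fields of the JSON responses are relied on.

```python
import json
from typing import Callable, List
from urllib.request import urlopen


def fetch_json(url: str, timeout: float = 10.0):
    """GET a URL and decode its JSON body."""
    with urlopen(url, timeout=timeout) as resp:
        return json.load(resp)


def list_applications(base_url: str,
                      fetch: Callable[[str], object] = fetch_json) -> List[dict]:
    """Return [{'id': ..., 'name': ...}] for each application the server knows."""
    apps = fetch(f"{base_url.rstrip('/')}/api/v1/applications")
    return [{"id": app["id"], "name": app["name"]} for app in apps]


def failed_stage_ids(base_url: str, app_id: str,
                     fetch: Callable[[str], object] = fetch_json) -> List[int]:
    """Return the IDs of stages whose status is FAILED."""
    stages = fetch(f"{base_url.rstrip('/')}/api/v1/applications/{app_id}/stages")
    return [stage["stageId"] for stage in stages if stage.get("status") == "FAILED"]


# Usage against a real server (not executed here):
#   for app in list_applications("http://history-server-host:18080"):
#       print(app["id"], failed_stage_ids("http://history-server-host:18080", app["id"]))
```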

Metrics Categories

| Category | What It Tracks |
|---|---|
| Application | Job, stage, task completion rates |
| Resource | CPU, memory, disk I/O, network I/O |
| JVM | GC pause time, heap memory, threads |
| Shuffle | Read, write, spill size |
| Storage | Cache hit rate, cached data, memory fraction |
| Custom | User-defined counters and gauges |

Metrics Export Formats

| Format | Use Case |
|---|---|
| JMX | Java management tooling (e.g., JConsole) |
| Prometheus | Modern monitoring stacks |
| Graphite | Time-series data |
| Custom | Specialized requirements |
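To sanity-check what a Prometheus endpoint exposes, a few lines of Python can parse the text exposition format (metric name, optional `{labels}`, value). This is a minimal sketch: it ignores timestamps, does not unescape label values, and assumes no `}` inside label values. The official `prometheus_client` library ships a complete parser, and the metric names in the sample are hypothetical.

```python
import re
from typing import Dict, List, Tuple

# metric name, optional {label="value",...} block, then the sample value
_SAMPLE_RE = re.compile(r'^([A-Za-z_:][A-Za-z0-9_:]*)(\{[^}]*\})?\s+(\S+)')
_LABEL_RE = re.compile(r'([A-Za-z_][A-Za-z0-9_]*)="((?:[^"\\]|\\.)*)"')


def parse_prometheus_text(text: str) -> List[Tuple[str, Dict[str, str], float]]:
    """Return (name, labels, value) tuples from Prometheus text format."""
    samples = []
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and # HELP / # TYPE metadata
        match = _SAMPLE_RE.match(line)
        if match is None:
            continue  # not a sample line this sketch understands
        name, label_block, value = match.groups()
        labels = dict(_LABEL_RE.findall(label_block or ""))
        samples.append((name, labels, float(value)))  # float() also accepts NaN, +Inf
    return samples


# Hypothetical scrape output
sample = '''
# TYPE jvm_heap_used gauge
jvm_heap_used{instance="driver"} 5.24288e+08
jvm_heap_used{instance="executor-1"} 3.1e+08
'''
for name, labels, value in parse_prometheus_text(sample):
    print(name, labels, value)
```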

Event Logging

  • Records job submissions, stage transitions, task metrics, configuration details
  • Essential for:
    • Performance analysis
    • Capacity planning
    • Debugging complex issues
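Event logs are newline-delimited JSON, one listener event per line, so they can be mined without the History Server. This minimal sketch assumes an uncompressed, single-file (non-rolling) log and reads only the `Event`, `Stage ID`, and `Task Metrics` fields of `SparkListenerTaskEnd` events (`Executor Run Time` and `JVM GC Time`, both in milliseconds); the function names are illustrative.

```python
import json
from collections import defaultdict
from typing import Dict, Iterable


def stage_summary(event_log_lines: Iterable[str]) -> Dict[int, Dict[str, int]]:
    """Aggregate task count, executor run time and GC time (ms) per stage."""
    summary = defaultdict(lambda: {"tasks": 0, "run_time_ms": 0, "gc_time_ms": 0})
    for line in event_log_lines:
        line = line.strip()
        if not line:
            continue
        event = json.loads(line)
        if event.get("Event") != "SparkListenerTaskEnd":
            continue  # job, stage, and config events are ignored here
        metrics = event.get("Task Metrics") or {}  # may be missing for failed tasks
        stage = summary[event["Stage ID"]]
        stage["tasks"] += 1
        stage["run_time_ms"] += metrics.get("Executor Run Time", 0)
        stage["gc_time_ms"] += metrics.get("JVM GC Time", 0)
    return dict(summary)


def summarize_file(path: str) -> Dict[int, Dict[str, int]]:
    """Apply stage_summary to an event log file copied out of spark.eventLog.dir."""
    with open(path, encoding="utf-8") as f:
        return stage_summary(f)
```

A stage whose `gc_time_ms` is a large share of `run_time_ms` is a candidate for the GC tuning covered elsewhere in this series.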

Alerting

  • Notifies stakeholders of anomalies or threshold violations
  • Requires:
    • Appropriate thresholds
    • Minimal false positives
    • Timely notification
  • Common alerts:
    • Job failures
    • High resource utilization
    • Long GC pauses
    • Data quality issues
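The tension between sensitive thresholds and false positives can be made concrete with a rule that fires only after several consecutive breaches. This is a minimal sketch, not any particular alerting tool's semantics (Prometheus alerting rules express a similar idea with their `for` duration); `ThresholdRule`, `evaluate`, and the GC-time-fraction series are hypothetical.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass
class ThresholdRule:
    name: str
    threshold: float
    consecutive: int = 3  # breaches in a row required before firing


def evaluate(rule: ThresholdRule, samples: Iterable[float]) -> List[int]:
    """Return the sample indices at which the rule fires.

    Requiring several consecutive breaches suppresses one-off spikes
    (fewer false positives) at the cost of a longer T_detection.
    """
    fired, streak = [], 0
    for i, value in enumerate(samples):
        streak = streak + 1 if value > rule.threshold else 0
        if streak == rule.consecutive:
            fired.append(i)  # fire once per sustained breach
    return fired


# Hypothetical per-minute fraction of executor time spent in GC
gc_fraction = [0.05, 0.30, 0.06, 0.25, 0.28, 0.31, 0.33, 0.04]
rule = ThresholdRule("long_gc_pauses", threshold=0.2, consecutive=3)
print(evaluate(rule, gc_fraction))
```

The isolated spike at index 1 is ignored; the sustained breach starting at index 3 fires once, at index 5.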

Dashboards

  • Visual representations of monitoring data
  • Enable quick identification of trends, anomalies, performance issues
  • Tools: Grafana with Prometheus
  • Organize by audience:
    • Operations
    • Development
    • Management

Distributed Tracing

  • End-to-end visibility across services and components
  • Helps:
    • Understand data flow through stages
    • Identify bottlenecks in shuffle operations
    • Debug performance in complex pipelines
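To illustrate how a trace ties the steps of a pipeline together, the sketch below records nested spans that share a trace ID and link to their parent, using only the standard library. It is a stand-in for a real tracer such as OpenTelemetry, not its API; `span` and `finished_spans` are hypothetical names, and wrapping driver-side calls this way measures whole actions, not individual tasks on executors.

```python
import time
import uuid
from contextlib import contextmanager
from contextvars import ContextVar
from typing import List

# The span currently open in this context (None at the top level)
_current_span = ContextVar("current_span", default=None)
finished_spans: List[dict] = []


@contextmanager
def span(name: str):
    """Record a span; nested spans inherit the trace ID and link to their parent."""
    parent = _current_span.get()
    record = {
        "name": name,
        "trace_id": parent["trace_id"] if parent else uuid.uuid4().hex,
        "span_id": uuid.uuid4().hex[:16],
        "parent_id": parent["span_id"] if parent else None,
    }
    token = _current_span.set(record)
    start = time.perf_counter()
    try:
        yield record
    finally:
        record["duration_s"] = time.perf_counter() - start
        _current_span.reset(token)
        finished_spans.append(record)


# Hypothetical pipeline steps; in PySpark these would wrap reads and writes on the driver
with span("pipeline"):
    with span("read_input"):
        pass
    with span("aggregate_and_write"):
        pass

for s in finished_spans:
    print(s["name"], s["parent_id"], f"{s['duration_s']:.6f}s")
```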

Log Aggregation

| Tool | Capability |
|---|---|
| ELK Stack | Log collection (Logstash), indexing and search (Elasticsearch), visualization (Kibana) |
| Grafana Loki | Log search and analysis |
  • Structured logging (JSON) improves searchability
  • Centralizes logs from all cluster nodes
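A minimal structured-logging setup for the Python side of a PySpark driver: each record becomes one JSON object per line, which log shippers feeding the ELK Stack or Loki can index without custom parsing. The formatter and the `app_id`/`stage`/`rows` context keys are illustrative assumptions; JVM-side Spark logs are configured separately through Spark's log4j configuration.

```python
import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each log record as a single-line JSON object."""

    CONTEXT_KEYS = ("app_id", "stage", "rows")  # hypothetical context fields

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Values passed via logging's extra= argument become record attributes
        for key in self.CONTEXT_KEYS:
            if hasattr(record, key):
                payload[key] = getattr(record, key)
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload)


handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
pipeline_logger = logging.getLogger("pipeline")
pipeline_logger.addHandler(handler)
pipeline_logger.setLevel(logging.INFO)

pipeline_logger.info("stage finished", extra={"app_id": "app-123", "stage": "aggregate", "rows": 1042})
```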

Capacity Planning

  • Uses monitoring data to forecast resource needs
  • Analyze historical trends:
    • Resource usage
    • Data volumes
    • Processing times
  • Proactively scale infrastructure

Security Monitoring

Track:

  • Authentication attempts
  • Authorization failures
  • Data access patterns
  • Configuration changes

This data is essential for compliance, incident response, and security auditing.


Advanced Techniques

  • Anomaly detection: statistical methods for unusual patterns
  • Predictive monitoring: forecast issues before they occur
  • Correlation analysis: identify relationships between metrics
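For the statistical approach to anomaly detection, a rolling z-score is one of the simplest options: flag a point that lies more than `threshold` standard deviations from the mean of the preceding window. The window size, threshold, and stage-duration series below are assumptions for illustration.

```python
from statistics import mean, stdev
from typing import List, Sequence


def zscore_anomalies(values: Sequence[float], window: int = 10,
                     threshold: float = 3.0) -> List[int]:
    """Return indices whose value deviates from the preceding window's mean
    by more than `threshold` standard deviations."""
    anomalies = []
    for i in range(window, len(values)):
        history = values[i - window:i]
        mu, sigma = mean(history), stdev(history)
        if sigma == 0:
            continue  # a perfectly flat window gives no usable scale; skip it
        if abs(values[i] - mu) / sigma > threshold:
            anomalies.append(i)
    return anomalies


# Hypothetical duration (seconds) of the same stage across 13 daily runs
durations = [60, 62, 59, 61, 60, 58, 63, 61, 60, 62, 61, 180, 60]
print(zscore_anomalies(durations))
```

The spike at index 11 is flagged, but it also inflates the baseline for the following windows; median and MAD-based scores are a common, more robust alternative.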

📊 Key Concepts Table

| Concept | Description | Use Case |
|---|---|---|
| Spark UI | Real-time application monitoring | Development, debugging |
| History Server | Completed application analysis | Post-mortem, performance tuning |
| Metrics System | Flexible metrics collection | Production monitoring |
| Event Logging | Detailed execution data capture | Analysis, auditing |
| Alerting | Automated issue notification | Operations, incident response |
| Dashboards | Visual data representation | Monitoring, reporting |
| Distributed Tracing | End-to-end execution visibility | Debugging, optimization |
| Log Aggregation | Centralized log management | Debugging, compliance |

💻 Code Examples

Basic Monitoring Configuration

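from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Configure Spark with monitoring
# Parameter: spark.eventLog.enabled: enables event logging
# Parameter: spark.eventLog.dir: directory for event logs (must already exist;
#   the History Server should read the same location via spark.history.fs.logDirectory)
# Parameter: spark.yarn.historyServer.address: only used when running on YARN
spark = SparkSession.builder \
    .appName("MonitoringExample") \
    .config("spark.eventLog.enabled", "true") \
    .config("spark.eventLog.dir", "hdfs:///spark-history") \
    .config("spark.yarn.historyServer.address", "history-server-host:18080") \
    .config("spark.ui.port", "4040") \
    .config("spark.ui.enabled", "true") \
    .config("spark.ui.killEnabled", "true") \
    .getOrCreate()

# Read data
df = spark.read \
    .format("parquet") \
    .load("hdfs:///data/input")

# Aggregate per category; groupBy already shuffles, so no explicit repartition is needed
result = df \
    .groupBy("category") \
    .agg(F.sum("value").alias("total_value"),
         F.count("*").alias("row_count"))

# Write results (the jobs and stages of this action appear in the Spark UI)
result.write \
    .mode("overwrite") \
    .parquet("hdfs:///data/output")

# If port 4040 is taken, Spark binds the next free port, so ask the SparkContext
print(f"Spark UI available at: {spark.sparkContext.uiWebUrl}")

# The live UI shuts down with the application; afterwards, use the History Server
spark.stop()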

Prometheus Metrics Export

from pyspark.sql import SparkSession

# Configure the Prometheus servlet sink (Spark 3.0+) through Spark properties of the
# form spark.metrics.conf.[instance|*].sink.[sink_name].[parameter]. This avoids
# shipping a metrics.properties file to every node. Metrics settings are read when
# the JVM starts, so they must be set before the session is created.
#
# Equivalent lines in a metrics.properties file (referenced via spark.metrics.conf,
# which must be readable on every node):
#   *.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet
#   *.sink.prometheusServlet.path=/metrics/prometheus
spark = SparkSession.builder \
    .appName("PrometheusMonitoring") \
    .config("spark.metrics.conf.*.sink.prometheusServlet.class",
            "org.apache.spark.metrics.sink.PrometheusServlet") \
    .config("spark.metrics.conf.*.sink.prometheusServlet.path",
            "/metrics/prometheus") \
    .config("spark.metrics.conf.driver.source.jvm.class",
            "org.apache.spark.metrics.source.JvmSource") \
    .config("spark.metrics.conf.executor.source.jvm.class",
            "org.apache.spark.metrics.source.JvmSource") \
    .config("spark.ui.prometheus.enabled", "true") \
    .config("spark.metrics.namespace", "spark_application") \
    .getOrCreate()

# Read data
df = spark.read \
    .format("parquet") \
    .load("hdfs:///data/input")

# Process data
result = df.groupBy("category").count()

# Write results
result.write \
    .mode("overwrite") \
    .parquet("hdfs:///data/output")

# PrometheusServlet adds endpoints to the driver's Spark UI (not a separate port):
#   driver metrics:   <driver UI>/metrics/prometheus/
#   executor metrics: <driver UI>/metrics/executors/prometheus
#                     (requires spark.ui.prometheus.enabled)
# Prometheus must scrape them while the application is running.
print(f"Scrape target: {spark.sparkContext.uiWebUrl}/metrics/prometheus/")

spark.stop()

Custom Metrics with JMX

from pyspark.sql import SparkSession

# Enable the JMX sink for every instance (driver, executors, ...). JmxSink only
# registers MBeans inside each JVM and has no port setting; to connect remotely,
# enable JMX remoting with JVM options such as -Dcom.sun.management.jmxremote.port
# in spark.driver.extraJavaOptions / spark.executor.extraJavaOptions.
# Metrics settings are read at startup, so set them before creating the session.
spark = SparkSession.builder \
    .appName("JMXMonitoring") \
    .config("spark.metrics.conf.*.sink.jmx.class",
            "org.apache.spark.metrics.sink.JmxSink") \
    .getOrCreate()


def collect_custom_metrics():
    """Log a few driver-side metrics around a simple job"""
    sc = spark.sparkContext

    # _jsc is an internal PySpark handle to the JVM SparkContext; PySpark's public
    # StatusTracker does not expose executor info. The returned list includes the driver.
    executor_info = sc._jsc.sc().statusTracker().getExecutorInfos()
    print(f"Number of executors (including driver): {len(executor_info)}")

    df = spark.read \
        .format("parquet") \
        .load("hdfs:///data/input")

    # Cache, then count to materialize the cache
    df.cache()
    count = df.count()
    print(f"Row count: {count}")

    # Process data (groupBy already shuffles by category)
    result = df \
        .groupBy("category") \
        .agg({"value": "sum"})

    result.write \
        .mode("overwrite") \
        .parquet("hdfs:///data/output")

    # Release the cached data once it is no longer needed
    df.unpersist()


# Run metrics collection
collect_custom_metrics()

spark.stop()

Comprehensive Monitoring Setup

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import json
import logging
import time
from datetime import datetime

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class SparkMonitor:
    def __init__(self, app_name, config):
        """Initialize Spark monitoring"""
        self.app_name = app_name
        self.config = config
        self.spark = self.create_spark_session()
        self.metrics = {}
    
    def create_spark_session(self):
        """Create Spark session with monitoring configuration"""
        # spark.yarn.historyServer.address only takes effect on YARN
        return SparkSession.builder \
            .appName(self.app_name) \
            .config("spark.eventLog.enabled", "true") \
            .config("spark.eventLog.dir", self.config["event_log_dir"]) \
            .config("spark.yarn.historyServer.address", self.config["history_server"]) \
            .config("spark.ui.port", "4040") \
            .getOrCreate()
    
    def collect_metrics(self):
        """Collect application metrics"""
        sc = self.spark.sparkContext
        
        # Get executor information through the internal JVM handle (_jsc);
        # the returned list includes the driver
        executor_info = sc._jsc.sc().statusTracker().getExecutorInfos()
        
        self.metrics = {
            "timestamp": datetime.now().isoformat(),
            "app_name": self.app_name,
            "num_executors": len(executor_info),
            "application_id": sc.applicationId,
            "spark_version": sc.version
        }
        
        return self.metrics
    
    def monitor_processing(self, input_path, output_path):
        """Run the job and record row count, duration, and final status"""
        start_time = time.perf_counter()
        count, status = None, "failed"
        
        # Read data and cache it for reuse by the count and the aggregation
        df = self.spark.read \
            .format("parquet") \
            .load(input_path)
        df.cache()
        
        try:
            count = df.count()
            
            # Process data (groupBy already shuffles by category)
            result = df \
                .groupBy("category") \
                .agg(F.sum("value").alias("total_value"),
                     F.count("*").alias("row_count"))
            
            # Write results
            result.write \
                .mode("overwrite") \
                .parquet(output_path)
            status = "completed"
        except Exception:
            logger.exception("Processing failed for %s", input_path)
            raise
        finally:
            # Always release the cache and record metrics, even on failure
            df.unpersist()
            self.metrics.update({
                "input_rows": count,
                "processing_time": time.perf_counter() - start_time,
                "output_path": output_path,
                "status": status
            })
        
        return self.metrics
    
    def generate_report(self):
        """Generate monitoring report"""
        report = {
            "summary": self.metrics,
            "recommendations": self.generate_recommendations()
        }
        
        # Save report
        with open(f"monitoring_report_{self.app_name}.json", "w") as f:
            json.dump(report, f, indent=2)
        
        return report
    
    def generate_recommendations(self):
        """Generate optimization recommendations (thresholds are examples)"""
        recommendations = []
        
        # processing_time is in seconds
        if self.metrics.get("processing_time", 0) > 3600:
            recommendations.append("Consider increasing parallelism or partitioning data differently")
        
        # num_executors includes the driver
        if self.metrics.get("num_executors", 0) < 5:
            recommendations.append("Consider adding more executors for better parallelism")
        
        return recommendations

# Example usage
if __name__ == "__main__":
    # Configuration
    config = {
        "event_log_dir": "hdfs:///spark-history",
        "history_server": "history-server-host:18080"
    }
    
    # Initialize monitor
    monitor = SparkMonitor("MonitoredApplication", config)
    
    try:
        # Collect metrics
        metrics = monitor.collect_metrics()
        print(f"Initial metrics: {metrics}")
        
        # Monitor processing
        processing_metrics = monitor.monitor_processing(
            "hdfs:///data/input",
            "hdfs:///data/output"
        )
        print(f"Processing metrics: {processing_metrics}")
        
        # Generate report
        report = monitor.generate_report()
        print(f"Monitoring report saved to monitoring_report_{monitor.app_name}.json")
    finally:
        monitor.spark.stop()

📈 Performance Metrics

| Metric | Optimal | Typical | Poor | Optimization |
|---|---|---|---|---|
| Monitoring Coverage | > 95% | 80-95% | < 80% | Comprehensive instrumentation |
| Alert Response Time | < 5 min | 5-30 min | > 30 min | Automated alerting |
| Dashboard Refresh | < 1 min | 1-5 min | > 5 min | Real-time streaming |
| Log Retention | 30-90 days | 7-30 days | < 7 days | Log management |
| Metrics Granularity | 1 min | 5-15 min | > 15 min | High-frequency collection |

πŸ† Best Practices

  1. Enable event logging β€” Capture detailed execution data for analysis
  2. Use appropriate metrics β€” Focus on actionable metrics that drive decisions
  3. Implement alerting β€” Automated notifications for critical issues
  4. Create dashboards β€” Visual representations for different audiences
  5. Aggregate logs β€” Centralize logs for search and analysis
  6. Monitor resource usage β€” Track CPU, memory, disk, and network
  7. Set meaningful thresholds β€” Balance between sensitivity and noise
  8. Regularly review monitoring β€” Update configurations based on feedback
  9. Integrate with incident management β€” Connect alerts to response processes
  10. Document monitoring setup β€” Maintain clear documentation of configurations

🔗 Related Topics

  • 17-cluster-management.mdx: Cluster-level monitoring
  • 18-gc-tuning.mdx: JVM monitoring and GC metrics
  • 19-spark-submit.mdx: Application deployment monitoring

See Also

  • 06-joins-optimization.mdx: Join performance monitoring
  • 11-structured-streaming.mdx: Streaming application monitoring
  • Kafka Streams (kafka/03): Stream processing monitoring
  • Data Engineering Streaming (data-engineering/022): Production monitoring patterns
⭐

Premium Content

20. Monitoring & Metrics in PySpark

Unlock this lesson and 900+ advanced tutorials with a Premium plan.

🎯End-to-end Projects
πŸ’ΌInterview Prep
πŸ“œCertificates
🀝Community Access

Already a member? Log in

Need Expert PySpark Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement