Model Monitoring
Model monitoring is the continuous process of tracking ML model performance, data quality, and system health to ensure models remain accurate and reliable in production.
Key Monitoring Areas
- Data Drift: Changes in input data distribution
- Concept Drift: Changes in relationship between inputs and outputs
- Performance Monitoring: Model accuracy and latency metrics
- System Health: Infrastructure and resource utilization
- Business Metrics: Impact on business outcomes
Monitoring Architecture
Data Drift Detection
Implementation
import numpy as np
import pandas as pd
from scipy import stats
from typing import Dict, List, Tuple
class DataDriftDetector:
    """Detect distribution drift in numeric columns against a reference sample.

    Every numeric column that exists in both the reference and the current
    DataFrame is compared with a two-sample Kolmogorov-Smirnov test and a
    chi-squared test on binned counts. Only the KS p-value drives the
    ``drift_detected`` flag; the chi-squared result is reported next to it.

    Args:
        reference_data: pandas DataFrame holding the baseline sample.
        significance_level: KS p-values below this mark a column as drifted.
    """

    def __init__(self, reference_data, significance_level=0.05):
        self.reference_data = reference_data
        self.significance_level = significance_level
        self.reference_distributions = self._compute_distributions(reference_data)

    def _compute_distributions(self, data) -> Dict[str, Dict]:
        """Summarise each numeric column as mean, std and a 50-bin histogram."""
        distributions = {}
        for column in data.select_dtypes(include=[np.number]).columns:
            series = data[column]
            distributions[column] = {
                "mean": series.mean(),
                "std": series.std(),
                # np.histogram cannot derive a finite range when NaN is
                # present, so missing values are removed first.
                "histogram": np.histogram(series.dropna(), bins=50),
            }
        return distributions

    def detect_drift(self, current_data) -> Dict[str, Dict]:
        """Compare ``current_data`` with the reference, column by column.

        Args:
            current_data: pandas DataFrame with the sample to check.

        Returns:
            Mapping of column name to a dict with the keys ``ks_statistic``,
            ``ks_pvalue``, ``chi2_statistic``, ``chi2_pvalue``,
            ``drift_detected``, ``mean_shift`` and ``std_ratio``. When either
            sample has no non-missing values the test statistics are NaN and
            ``drift_detected`` is False.
        """
        drift_results = {}
        for column in current_data.select_dtypes(include=[np.number]).columns:
            if column not in self.reference_distributions:
                continue

            reference = self.reference_data[column].dropna()
            current = current_data[column].dropna()

            if reference.empty or current.empty:
                # Neither test is defined without data on both sides.
                ks_statistic = ks_pvalue = float("nan")
                chi2_stat = chi2_pvalue = float("nan")
            else:
                ks_statistic, ks_pvalue = stats.ks_2samp(reference, current)
                # Chi-squared test on binned numeric values
                chi2_stat, chi2_pvalue = self._chi_squared_test(reference, current)

            ref_stats = self.reference_distributions[column]
            drift_results[column] = {
                "ks_statistic": ks_statistic,
                "ks_pvalue": ks_pvalue,
                "chi2_statistic": chi2_stat,
                "chi2_pvalue": chi2_pvalue,
                # A NaN p-value compares False, i.e. "no drift detected".
                "drift_detected": bool(ks_pvalue < self.significance_level),
                "mean_shift": current.mean() - ref_stats["mean"],
                "std_ratio": self._std_ratio(current.std(), ref_stats["std"]),
            }
        return drift_results

    @staticmethod
    def _std_ratio(current_std, reference_std):
        """Return current/reference std without dividing by zero."""
        # NaN != 0 is True, so a NaN reference std still propagates as NaN.
        if reference_std != 0:
            return current_std / reference_std
        return float("inf") if current_std > 0 else float("nan")

    def _chi_squared_test(self, reference, current) -> Tuple[float, float]:
        """Chi-squared goodness-of-fit test on 19 shared equal-width bins.

        Args:
            reference: pandas Series with the baseline values.
            current: pandas Series with the values under test.

        Returns:
            ``(statistic, p_value)``, or ``(nan, nan)`` when either series
            has no non-missing values.
        """
        reference = reference.dropna()
        current = current.dropna()
        if reference.empty or current.empty:
            return float("nan"), float("nan")

        # Bin both samples on a common grid spanning their joint range
        bins = np.linspace(
            min(reference.min(), current.min()),
            max(reference.max(), current.max()),
            20,
        )
        ref_hist, _ = np.histogram(reference, bins=bins)
        cur_hist, _ = np.histogram(current, bins=bins)

        # Avoid division by zero in empty bins
        expected = ref_hist + 1e-10
        observed = cur_hist + 1e-10
        # stats.chisquare requires both totals to agree; rescale the
        # reference counts so samples of different sizes can be compared.
        expected = expected * (observed.sum() / expected.sum())

        chi2_stat, p_value = stats.chisquare(observed, expected)
        return chi2_stat, p_value
Concept Drift Detection
from collections import deque
import numpy as np
class ConceptDriftDetector:
    """Sliding-window concept drift detector based on prediction errors.

    The last ``window_size`` prediction/actual pairs are kept. Drift is
    flagged when the cumulative error count departs from what a constant
    error rate would produce by more than ``threshold * window_size``.

    Args:
        window_size: number of most recent observations retained.
        threshold: allowed cumulative deviation, as a fraction of the window.
    """

    def __init__(self, window_size=100, threshold=0.05):
        self.window_size = window_size
        self.threshold = threshold
        self.predictions = deque(maxlen=window_size)
        self.actuals = deque(maxlen=window_size)

    def add_observation(self, prediction, actual):
        """Add prediction-actual pair"""
        self.predictions.append(prediction)
        self.actuals.append(actual)

    def _errors(self):
        """Return a 0/1 list marking which buffered predictions were wrong."""
        return [1 if p != a else 0 for p, a in zip(self.predictions, self.actuals)]

    def detect_drift(self):
        """Detect concept drift with a Page-Hinkley-style cumulative test.

        Returns:
            ``(drift_detected, statistic)``. ``(False, 0.0)`` is returned
            until the window is full.
        """
        if len(self.predictions) < self.window_size or not self.predictions:
            return False, 0.0

        errors = self._errors()
        mean_error = np.mean(errors)
        # Deviation of the running error count from a constant-rate line.
        # A positive excursion means errors were concentrated early in the
        # window (rate falling); a negative one means they are concentrated
        # late (rate rising). Taking the absolute value flags both, whereas
        # the one-sided maximum alone never reacts to a rising error rate.
        deviation = np.cumsum(errors) - np.arange(1, len(errors) + 1) * mean_error
        ph_statistic = float(np.max(np.abs(deviation)))

        drift_detected = ph_statistic > self.threshold * self.window_size
        return bool(drift_detected), ph_statistic

    def get_drift_score(self):
        """Return the error rate over the buffered window (0.0 when empty)."""
        if len(self.predictions) == 0:
            return 0.0
        return float(np.mean(self._errors()))
Performance Monitoring
Comprehensive Monitoring
import time
from dataclasses import dataclass
from typing import Dict, List, Optional

import numpy as np
@dataclass
class ModelMetrics:
    """Snapshot of classification quality and serving performance."""

    accuracy: float  # (tp + tn) / all binary-labelled predictions
    precision: float  # tp / (tp + fp)
    recall: float  # tp / (tp + fn)
    f1_score: float  # harmonic mean of precision and recall
    latency_p50: float  # percentiles of the recorded latency_ms values
    latency_p95: float
    latency_p99: float
    throughput: float  # buffered predictions per second of buffered time span
    error_rate: float  # share of labelled predictions that differ from actual


class ModelPerformanceMonitor:
    """Collect predictions and derive quality, latency and anomaly signals.

    Nothing in this class trims ``prediction_buffer``, ``latency_buffer`` or
    ``metrics_history``; they grow with every call.
    """

    def __init__(self):
        self.metrics_history = []
        self.latency_buffer = []
        self.prediction_buffer = []

    def record_prediction(self, prediction, actual=None, latency_ms=None):
        """Record a prediction.

        Args:
            prediction: model output.
            actual: ground-truth label, or None when not yet known.
            latency_ms: serving latency, or None when not measured.
        """
        self.prediction_buffer.append({
            "prediction": prediction,
            "actual": actual,
            "latency": latency_ms,
            "timestamp": time.time(),
        })
        # Compare with None so that a measured latency of 0 is still kept.
        if latency_ms is not None:
            self.latency_buffer.append(latency_ms)

    def calculate_metrics(self) -> Optional[ModelMetrics]:
        """Calculate current performance metrics.

        The confusion-matrix figures count only pairs whose values are 0 or
        1. The result is also appended to ``metrics_history``.

        Returns:
            A ``ModelMetrics`` snapshot, or None when no recorded prediction
            has a ground-truth label yet.
        """
        # Only predictions with a known label can be scored
        pairs = [
            (p["prediction"], p["actual"])
            for p in self.prediction_buffer
            if p["actual"] is not None
        ]
        if not pairs:
            return None

        # Classification metrics
        tp = sum(1 for p, a in pairs if p == 1 and a == 1)
        fp = sum(1 for p, a in pairs if p == 1 and a == 0)
        fn = sum(1 for p, a in pairs if p == 0 and a == 1)
        tn = sum(1 for p, a in pairs if p == 0 and a == 0)
        scored = tp + fp + fn + tn

        accuracy = (tp + tn) / scored if scored > 0 else 0
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0
        f1_score = (
            2 * precision * recall / (precision + recall)
            if (precision + recall) > 0
            else 0
        )

        # Latency metrics
        if self.latency_buffer:
            latency_p50, latency_p95, latency_p99 = (
                np.percentile(self.latency_buffer, q) for q in (50, 95, 99)
            )
        else:
            latency_p50 = latency_p95 = latency_p99 = 0

        # Throughput over the time spanned by the buffer
        throughput = 0
        if len(self.prediction_buffer) > 1:
            time_window = (
                self.prediction_buffer[-1]["timestamp"]
                - self.prediction_buffer[0]["timestamp"]
            )
            if time_window > 0:
                throughput = len(self.prediction_buffer) / time_window

        # Error rate: unlabelled predictions cannot be judged right or wrong,
        # so the denominator is the labelled subset, not the whole buffer.
        errors = sum(1 for p, a in pairs if p != a)
        error_rate = errors / len(pairs)

        metrics = ModelMetrics(
            accuracy=accuracy,
            precision=precision,
            recall=recall,
            f1_score=f1_score,
            latency_p50=latency_p50,
            latency_p95=latency_p95,
            latency_p99=latency_p99,
            throughput=throughput,
            error_rate=error_rate,
        )
        self.metrics_history.append(metrics)
        return metrics

    @staticmethod
    def _sigma_distance(gap, std):
        """Express ``gap`` in standard deviations; infinite when std is 0."""
        return gap / std if std > 0 else float("inf")

    def detect_anomalies(self, window_size=100):
        """Detect performance anomalies in the latest metrics snapshot.

        The newest entry of ``metrics_history`` is compared with the mean and
        standard deviation of the preceding ``window_size - 1`` entries.

        Returns:
            A list of anomaly dicts (``type``, ``severity``, ``current``,
            ``expected``, ``deviation``); empty when fewer than
            ``window_size`` snapshots exist or nothing is out of range.
        """
        if len(self.metrics_history) < window_size:
            return []

        recent_metrics = self.metrics_history[-window_size:]
        anomalies = []

        # Accuracy more than 2 sigma below its recent mean
        accuracies = [m.accuracy for m in recent_metrics]
        if len(accuracies) > 1:
            mean_acc = np.mean(accuracies[:-1])
            std_acc = np.std(accuracies[:-1])
            current_acc = accuracies[-1]
            if current_acc < mean_acc - 2 * std_acc:
                anomalies.append({
                    "type": "accuracy_drop",
                    "severity": "high",
                    "current": current_acc,
                    "expected": mean_acc,
                    "deviation": self._sigma_distance(mean_acc - current_acc, std_acc),
                })

        # p95 latency more than 3 sigma above its recent mean
        latencies = [m.latency_p95 for m in recent_metrics]
        if len(latencies) > 1:
            mean_lat = np.mean(latencies[:-1])
            std_lat = np.std(latencies[:-1])
            current_lat = latencies[-1]
            if current_lat > mean_lat + 3 * std_lat:
                anomalies.append({
                    "type": "latency_increase",
                    "severity": "medium",
                    "current": current_lat,
                    "expected": mean_lat,
                    "deviation": self._sigma_distance(current_lat - mean_lat, std_lat),
                })

        return anomalies
Mathematical Foundation
Population Stability Index (PSI)
Population Stability Index:
\[ \mathrm{PSI} = \sum_{i=1}^{N} (Q_i - P_i) \, \ln\!\left( \frac{Q_i}{P_i} \right) \]
Where:
- \( P_i \) is the proportion of observations in bin \( i \) for the reference distribution
- \( Q_i \) is the proportion of observations in bin \( i \) for the current distribution
- \( N \) is the number of bins
Kolmogorov-Smirnov Statistic
KS Statistic:
\[ D = \sup_{x} \left| F_{ref}(x) - F_{curr}(x) \right| \]
Where:
- \( F_{ref}(x) \) is the empirical cumulative distribution function of the reference data
- \( F_{curr}(x) \) is the empirical cumulative distribution function of the current data
Alert Threshold Calculation
Dynamic thresholds based on historical variance:
Dynamic Threshold:
\[ T = \mu \pm k \sigma \]
Where:
- \( \mu \) is the historical mean
- \( \sigma \) is the historical standard deviation
- \( k \) is the sensitivity parameter (typically 2-3)
Monitoring Dashboard
class MonitoringDashboard:
    """Assemble a single status report from a monitor object.

    Args:
        monitor: object providing ``calculate_metrics()``, ``detect_drift()``
            and ``detect_anomalies()``. ``detect_drift()`` is read as a
            mapping with optional ``drift_detected`` and ``drift_score`` keys.
    """

    _PERFORMANCE_FIELDS = ("accuracy", "precision", "recall", "f1_score")

    def __init__(self, monitor):
        self.monitor = monitor

    def generate_report(self):
        """Generate comprehensive monitoring report.

        Returns:
            Dict with ``timestamp``, ``model_performance``,
            ``latency_metrics``, ``data_quality``, ``anomalies`` and
            ``health_status``. When the monitor has no metrics yet
            (``calculate_metrics()`` returned None) every performance and
            latency value is None instead of raising.
        """
        metrics = self.monitor.calculate_metrics()
        # Treat a missing drift result like an empty one.
        drift_results = self.monitor.detect_drift() or {}
        anomalies = self.monitor.detect_anomalies()

        def value(name):
            return None if metrics is None else getattr(metrics, name)

        report = {
            "timestamp": time.time(),
            "model_performance": {
                name: value(name) for name in self._PERFORMANCE_FIELDS
            },
            "latency_metrics": {
                "p50": value("latency_p50"),
                "p95": value("latency_p95"),
                "p99": value("latency_p99"),
                "throughput": value("throughput"),
            },
            "data_quality": {
                "drift_detected": drift_results.get("drift_detected", False),
                "drift_score": drift_results.get("drift_score", 0.0),
            },
            "anomalies": anomalies,
            "health_status": self._calculate_health_status(metrics, drift_results, anomalies),
        }
        return report

    def _calculate_health_status(self, metrics, drift_results, anomalies):
        """Calculate overall health status.

        Precedence: anomalies -> ``CRITICAL``, drift -> ``WARNING``, no
        metrics available -> ``UNKNOWN``, accuracy below 0.8 -> ``DEGRADED``,
        otherwise ``HEALTHY``.
        """
        if anomalies:
            return "CRITICAL"
        if drift_results.get("drift_detected", False):
            return "WARNING"
        if metrics is None:
            # Accuracy cannot be judged without labelled predictions.
            return "UNKNOWN"
        if metrics.accuracy < 0.8:
            return "DEGRADED"
        return "HEALTHY"
Best Practices
1. Comprehensive Logging
import logging
import json
class ModelLogger:
    """Write one JSON record per prediction to a log file.

    All instances share the ``"model_monitor"`` logger.

    Args:
        log_file: path of the file the records are appended to.
    """

    def __init__(self, log_file):
        import os

        self.logger = logging.getLogger("model_monitor")
        # Without an explicit level the logger inherits the root default
        # (WARNING) and every info() record would be discarded.
        if not self.logger.isEnabledFor(logging.INFO):
            self.logger.setLevel(logging.INFO)

        # The named logger is process-wide: attach at most one handler per
        # file, otherwise each new ModelLogger duplicates every line.
        target = os.path.abspath(os.fspath(log_file))
        already_attached = any(
            isinstance(existing, logging.FileHandler)
            and existing.baseFilename == target
            for existing in self.logger.handlers
        )
        if not already_attached:
            handler = logging.FileHandler(log_file)
            formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

    def log_prediction(self, request_id, prediction, features, latency):
        """Log prediction details as a JSON object at INFO level.

        All arguments must be JSON-serialisable; ``json.dumps`` raises
        ``TypeError`` otherwise.
        """
        log_entry = {
            "request_id": request_id,
            "prediction": prediction,
            "features": features,
            "latency_ms": latency,
            "timestamp": time.time(),
        }
        self.logger.info(json.dumps(log_entry))
2. Automated Alerting
class AlertManager:
    """Turn a metrics snapshot into alerts using fixed thresholds.

    Args:
        thresholds: mapping that provides ``accuracy_min`` and
            ``latency_max``.
    """

    def __init__(self, thresholds):
        self.thresholds = thresholds

    def check_alerts(self, metrics):
        """Return the alerts raised by ``metrics`` (possibly an empty list).

        An accuracy alert comes first when ``metrics.accuracy`` is below
        ``accuracy_min``; a latency alert follows when ``metrics.latency_p99``
        is above ``latency_max``.
        """
        raised = []

        accuracy_too_low = metrics.accuracy < self.thresholds["accuracy_min"]
        if accuracy_too_low:
            raised.append(
                dict(
                    type="accuracy",
                    severity="high",
                    message=f"Accuracy {metrics.accuracy} below threshold {self.thresholds['accuracy_min']}",
                )
            )

        latency_too_high = metrics.latency_p99 > self.thresholds["latency_max"]
        if latency_too_high:
            raised.append(
                dict(
                    type="latency",
                    severity="medium",
                    message=f"Latency P99 {metrics.latency_p99}ms above threshold {self.thresholds['latency_max']}ms",
                )
            )

        return raised
3. Model Retraining Triggers
class RetrainingTrigger:
    """Decide from a monitor's signals whether a model needs retraining.

    Args:
        monitor: object providing ``calculate_metrics()``, ``detect_drift()``
            and ``detect_anomalies()``.
    """

    def __init__(self, monitor):
        self.monitor = monitor

    def should_retrain(self):
        """Return ``(True, reason)`` when retraining is advised, else ``(False, None)``.

        All three monitor calls are made up front. The reason is the first
        match of: ``"data_drift"``, ``"performance_degradation"`` (accuracy
        below 0.75), ``"anomaly_detected"``.
        """
        monitor = self.monitor
        metrics = monitor.calculate_metrics()
        drift_results = monitor.detect_drift()
        anomalies = monitor.detect_anomalies()

        # Pick the highest-priority reason, if any
        if drift_results.get("drift_detected", False):
            reason = "data_drift"
        elif metrics and metrics.accuracy < 0.75:
            reason = "performance_degradation"
        elif anomalies:
            reason = "anomaly_detected"
        else:
            reason = None

        return reason is not None, reason
Summary
Model monitoring is essential for maintaining reliable ML systems in production. By implementing comprehensive monitoring for data drift, concept drift, and performance metrics, organizations can detect issues early and ensure model accuracy and reliability over time.