Interview Question (Hard) — Asked at: Google, Netflix, Uber, Amazon, Stripe
"Design a comprehensive ML monitoring system that tracks latency, throughput, accuracy decay, and data drift. How do you implement alerting, dashboards, and automated remediation?"
ML Monitoring Architecture
ML monitoring is critical for maintaining model performance in production. It encompasses infrastructure monitoring, model performance monitoring, and business metrics tracking.
Monitoring Architecture Diagram
┌─────────────────────────────────────────────────────────────────┐
│                   ML Monitoring Architecture                    │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌───────────┐  │
│  │   Data   │───▶│  Model   │───▶│ Business │───▶│   Alert   │  │
│  │Collection│    │ Metrics  │    │ Metrics  │    │  Manager  │  │
│  └──────────┘    └──────────┘    └──────────┘    └───────────┘  │
│       │               │               │                │        │
│       ▼               ▼               ▼                ▼        │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌───────────┐  │
│  │Prometheus│    │ Grafana  │    │PagerDuty │    │   Auto    │  │
│  │Thanos    │    │Dashboards│    │Slack     │    │Remediation│  │
│  └──────────┘    └──────────┘    └──────────┘    └───────────┘  │
│                                                                 │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                   Time Series Database                    │  │
│  │           (Prometheus / InfluxDB / CloudWatch)            │  │
│  └───────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘
Infrastructure Monitoring
Prometheus Metrics Collection
from prometheus_client import (
Counter, Histogram, Gauge, Summary,
start_http_server, REGISTRY
)
from prometheus_client.core import GaugeMetricFamily
import time
import numpy as np
from typing import Dict, Optional
from datetime import datetime
class MLPrometheusMetrics:
    """Prometheus metrics for ML serving systems.

    Every metric name starts with ``prefix`` (``ml`` -> ``ml_inference_...``).

    Args:
        prefix: Metric-name prefix.
        registry: Collector registry the metrics are registered with. Defaults
            to prometheus_client's global ``REGISTRY`` (the original behavior).
            prometheus_client refuses to register the same metric name twice
            in one registry, so a second instance with the same prefix needs
            its own registry (e.g. a fresh ``CollectorRegistry`` in tests).
    """

    # Prediction-histogram edges 0.0, 0.1, ..., 1.0. Rounding strips the float
    # noise np.arange produces (0.30000000000000004, ...) from the `le` labels.
    _PREDICTION_BUCKETS = [round(float(edge), 1) for edge in np.arange(0, 1.1, 0.1)]

    def __init__(self, prefix: str = "ml", registry=REGISTRY):
        self.prefix = prefix

        # Inference metrics
        self.inference_latency = Histogram(
            f'{prefix}_inference_latency_seconds',
            'Inference latency in seconds',
            ['model_name', 'model_version', 'endpoint'],
            buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5],
            registry=registry,
        )
        self.inference_requests = Counter(
            f'{prefix}_inference_requests_total',
            'Total inference requests',
            ['model_name', 'model_version', 'status'],
            registry=registry,
        )
        self.inference_batch_size = Histogram(
            f'{prefix}_inference_batch_size',
            'Batch size distribution',
            ['model_name', 'model_version'],
            buckets=[1, 2, 4, 8, 16, 32, 64, 128, 256],
            registry=registry,
        )

        # Model performance metrics
        self.model_accuracy = Gauge(
            f'{prefix}_model_accuracy',
            'Current model accuracy',
            ['model_name', 'model_version', 'metric_type'],
            registry=registry,
        )
        self.prediction_distribution = Histogram(
            f'{prefix}_prediction_distribution',
            'Distribution of predictions',
            ['model_name', 'model_version'],
            buckets=self._PREDICTION_BUCKETS,
            registry=registry,
        )

        # Data drift metrics
        self.feature_drift_score = Gauge(
            f'{prefix}_feature_drift_score',
            'Feature drift score (KS statistic)',
            ['model_name', 'feature_name'],
            registry=registry,
        )
        self.data_quality_score = Gauge(
            f'{prefix}_data_quality_score',
            'Data quality score',
            ['model_name', 'quality_check'],
            registry=registry,
        )

        # Resource metrics
        self.gpu_utilization = Gauge(
            f'{prefix}_gpu_utilization_percent',
            'GPU utilization percentage',
            ['model_name', 'gpu_id'],
            registry=registry,
        )
        self.memory_usage = Gauge(
            f'{prefix}_memory_usage_bytes',
            'Memory usage in bytes',
            ['model_name', 'container'],
            registry=registry,
        )
        self.queue_depth = Gauge(
            f'{prefix}_queue_depth',
            'Request queue depth',
            ['model_name', 'queue_name'],
            registry=registry,
        )

    def record_inference(self, model_name: str, model_version: str,
                         latency: float, batch_size: int,
                         predictions: np.ndarray, status: str = "success"):
        """Record latency, request count, batch size and prediction values.

        ``latency`` is observed into a histogram named ``*_seconds``, so pass
        seconds. Every element of ``predictions`` (flattened, so e.g. an
        ``(n, 1)`` array works) is observed into the prediction histogram.
        """
        self.inference_latency.labels(
            model_name=model_name,
            model_version=model_version,
            endpoint="predict",
        ).observe(latency)

        self.inference_requests.labels(
            model_name=model_name,
            model_version=model_version,
            status=status,
        ).inc()

        self.inference_batch_size.labels(
            model_name=model_name,
            model_version=model_version,
        ).observe(batch_size)

        # Resolve the labelled child once instead of once per prediction.
        distribution = self.prediction_distribution.labels(
            model_name=model_name,
            model_version=model_version,
        )
        for pred in np.asarray(predictions, dtype=float).ravel():
            distribution.observe(float(pred))

    def update_model_performance(self, model_name: str,
                                 model_version: str,
                                 metrics: Dict[str, float]):
        """Set one ``model_accuracy`` gauge per entry, labelled by metric name."""
        for metric_name, value in metrics.items():
            self.model_accuracy.labels(
                model_name=model_name,
                model_version=model_version,
                metric_type=metric_name,
            ).set(value)

    def update_drift_scores(self, model_name: str,
                            drift_scores: Dict[str, float]):
        """Set one ``feature_drift_score`` gauge per feature."""
        for feature_name, score in drift_scores.items():
            self.feature_drift_score.labels(
                model_name=model_name,
                feature_name=feature_name,
            ).set(score)

    def update_resource_metrics(self, model_name: str,
                                gpu_util: float,
                                memory_bytes: int,
                                queue_depth: int,
                                gpu_id: str = "0"):
        """Update GPU utilization, memory usage and queue-depth gauges.

        Args:
            gpu_id: Label for the GPU being reported; defaults to ``"0"`` (the
                previously hard-coded value).
        """
        self.gpu_utilization.labels(
            model_name=model_name,
            gpu_id=gpu_id,
        ).set(gpu_util)

        self.memory_usage.labels(
            model_name=model_name,
            container="serving",
        ).set(memory_bytes)

        self.queue_depth.labels(
            model_name=model_name,
            queue_name="inference",
        ).set(queue_depth)
class CustomCollector:
    """Custom Prometheus collector for model-registry metadata and staleness.

    ``collect`` reads the keys ``name``, ``version``, ``framework``,
    ``created_at`` and ``last_retrained`` (an ISO-8601 string) from each model
    returned by ``model_registry.get_all_models()``.
    """

    def __init__(self, model_registry):
        self.model_registry = model_registry

    @staticmethod
    def _hours_since(iso_timestamp: str) -> float:
        """Return hours elapsed since ``iso_timestamp``.

        "Now" is taken in the timestamp's own timezone, because subtracting a
        naive datetime from an aware one raises ``TypeError``. Naive
        timestamps are compared with naive local time.
        """
        then = datetime.fromisoformat(iso_timestamp)
        return (datetime.now(then.tzinfo) - then).total_seconds() / 3600

    def collect(self):
        """Yield the ``ml_model_info`` and ``ml_model_staleness_hours`` families."""
        # Materialise once: both families iterate over the models, and a
        # generator returned by the registry would be empty on the second pass.
        models = list(self.model_registry.get_all_models())

        model_info = GaugeMetricFamily(
            'ml_model_info',
            'Model metadata',
            labels=['model_name', 'version', 'framework', 'created_at'],
        )
        for model in models:
            # Label values must be strings; registry fields may not be
            # (e.g. integer versions).
            model_info.add_metric(
                [str(model['name']), str(model['version']),
                 str(model['framework']), str(model['created_at'])],
                1,
            )
        yield model_info

        staleness = GaugeMetricFamily(
            'ml_model_staleness_hours',
            'Hours since model was last retrained',
            labels=['model_name', 'version'],
        )
        for model in models:
            staleness.add_metric(
                [str(model['name']), str(model['version'])],
                self._hours_since(model['last_retrained']),
            )
        yield staleness
# Metrics HTTP endpoint
def start_metrics_server(port: int = 8000):
    """Expose the Prometheus metrics endpoint on ``port`` and announce it on stdout."""
    listen_port = port
    start_http_server(listen_port)
    print("Metrics server started on port {}".format(listen_port))
ℹ️
Use Prometheus for time-series metrics collection and Grafana for visualization. Implement custom collectors for ML-specific metadata such as model versions and model staleness, and export per-feature drift scores as labelled gauges.
Latency Monitoring
Latency Tracking Implementation
import time
import numpy as np
from typing import List, Dict, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
import threading
from collections import deque
@dataclass
class LatencyMeasurement:
    """A single inference-latency sample for one model version."""

    timestamp: datetime  # when the sample was recorded
    latency_ms: float  # observed latency, in milliseconds
    model_name: str
    model_version: str
    batch_size: int  # batch size reported by the caller
class LatencyMonitor:
    """Monitor and analyze inference latency over a bounded sliding buffer.

    Measurements for all models share one buffer of ``window_size`` entries.
    Once it is full the oldest entry is dropped, so a time-window query may
    cover less than the requested number of minutes under heavy traffic.
    Buffer access is guarded by ``self.lock``.
    """

    def __init__(self, window_size: int = 1000):
        self.window_size = window_size
        self.measurements = deque(maxlen=window_size)
        self.lock = threading.Lock()

        # Percentiles reported by get_latency_statistics as 'p50', ..., 'p99.9'.
        self.percentiles = [50, 90, 95, 99, 99.9]

    def record_latency(self, latency_ms: float, model_name: str,
                       model_version: str, batch_size: int = 1):
        """Append one measurement, timestamped with local ``datetime.now()``."""
        measurement = LatencyMeasurement(
            timestamp=datetime.now(),
            latency_ms=latency_ms,
            model_name=model_name,
            model_version=model_version,
            batch_size=batch_size,
        )
        with self.lock:
            self.measurements.append(measurement)

    def _recent(self, model_name: Optional[str], window_minutes: float) -> List:
        """Return buffered measurements newer than ``window_minutes`` ago.

        ``model_name=None`` matches every model. Caller must hold ``self.lock``.
        """
        cutoff_time = datetime.now() - timedelta(minutes=window_minutes)
        return [
            m for m in self.measurements
            if m.timestamp > cutoff_time
            and (model_name is None or m.model_name == model_name)
        ]

    def get_latency_statistics(self, model_name: str = None,
                               window_minutes: int = 5,
                               slow_threshold_ms: float = 1000.0) -> Dict:
        """Summarise latencies recorded in the last ``window_minutes``.

        Args:
            model_name: Restrict to one model; ``None`` uses all models.
            window_minutes: Look-back window.
            slow_threshold_ms: Latency above which a request counts as slow
                (defaults to the previously hard-coded 1 second).

        Returns:
            ``{'status': 'no_data'}`` if nothing is in the window. Otherwise
            count/mean/std/min/max, one ``p<N>`` key per configured
            percentile, ``slow_requests`` and ``slow_request_rate``, and
            ``throughput_rps`` when the samples span a non-zero time.
        """
        with self.lock:
            recent = self._recent(model_name, window_minutes)
        if not recent:
            return {'status': 'no_data'}

        latencies = np.array([m.latency_ms for m in recent], dtype=float)
        stats = {
            'count': len(latencies),
            'mean': np.mean(latencies),
            'std': np.std(latencies),
            'min': np.min(latencies),
            'max': np.max(latencies),
        }
        for p in self.percentiles:
            stats[f'p{p}'] = np.percentile(latencies, p)

        # Measurements are appended in arrival order, so first/last bound the span.
        time_span = (recent[-1].timestamp - recent[0].timestamp).total_seconds()
        if time_span > 0:
            stats['throughput_rps'] = len(latencies) / time_span

        slow_requests = int(np.count_nonzero(latencies > slow_threshold_ms))
        stats['slow_requests'] = slow_requests
        stats['slow_request_rate'] = slow_requests / len(latencies)
        return stats

    def detect_latency_anomalies(self, model_name: str,
                                 window_minutes: int = 5,
                                 threshold_std: float = 3.0) -> List[Dict]:
        """Flag the model's most recent latency if its z-score is extreme.

        The z-score is computed against the model's own statistics over
        ``window_minutes``. Severity is ``'high'`` when ``|z| > 4``, otherwise
        ``'medium'``.
        """
        stats = self.get_latency_statistics(model_name, window_minutes)
        if 'status' in stats:
            return []

        with self.lock:
            # The buffer is shared across models: pick the newest sample of
            # *this* model, not whatever was recorded last overall.
            latest = next(
                (m for m in reversed(self.measurements)
                 if model_name is None or m.model_name == model_name),
                None,
            )
        if latest is None:
            return []

        mean = stats['mean']
        std = stats['std']
        z_score = (latest.latency_ms - mean) / std if std > 0 else 0
        if abs(z_score) <= threshold_std:
            return []

        return [{
            'type': 'latency_spike',
            'timestamp': latest.timestamp,
            'latency_ms': latest.latency_ms,
            'z_score': z_score,
            'mean': mean,
            'std': std,
            'severity': 'high' if abs(z_score) > 4 else 'medium',
        }]

    def predict_sla_breach(self, model_name: str,
                           sla_latency_ms: float,
                           prediction_window_minutes: int = 30) -> Dict:
        """Extrapolate p99 latency linearly and score the risk of an SLA breach.

        ``breach_probability`` is a heuristic, not a calibrated probability:
        it is the relative overshoot of the predicted latency over the SLA,
        clamped to [0, 1].
        """
        stats = self.get_latency_statistics(model_name, window_minutes=5)
        if 'status' in stats:
            return {'prediction': 'insufficient_data'}

        recent_trend = self._calculate_trend(model_name, window_minutes=5)  # ms/minute
        predicted_latency = stats['p99'] + (recent_trend * prediction_window_minutes)

        breach_probability = min(1.0, max(0.0,
            (predicted_latency - sla_latency_ms) / sla_latency_ms
        ))

        return {
            'current_p99': stats['p99'],
            'predicted_latency': predicted_latency,
            'sla_latency': sla_latency_ms,
            'breach_probability': breach_probability,
            'prediction_window_minutes': prediction_window_minutes,
            'recommendation': self._get_recommendation(breach_probability),
        }

    def _calculate_trend(self, model_name: str,
                         window_minutes: int) -> float:
        """Return the latency trend in ms per minute (least-squares slope).

        The fit is over wall-clock minutes since the first sample in the
        window, not sample index. Returns 0.0 with fewer than 10 samples or
        when all samples share one timestamp.
        """
        with self.lock:
            recent = self._recent(model_name, window_minutes)
        if len(recent) < 10:
            return 0.0

        start = recent[0].timestamp
        minutes = np.array(
            [(m.timestamp - start).total_seconds() / 60.0 for m in recent]
        )
        if minutes.max() == minutes.min():
            return 0.0
        latencies = np.array([m.latency_ms for m in recent], dtype=float)
        return float(np.polyfit(minutes, latencies, 1)[0])

    def _get_recommendation(self, breach_probability: float) -> str:
        """Map a breach score to a human-readable recommendation."""
        if breach_probability > 0.8:
            return "URGENT: Immediate action required. Scale up or rollback."
        if breach_probability > 0.5:
            return "WARNING: High risk of SLA breach. Consider scaling up."
        if breach_probability > 0.2:
            return "MONITOR: Moderate risk. Continue monitoring closely."
        return "OK: Low risk of SLA breach."
class LatencyAlertManager:
    """Turn latency statistics into threshold alerts.

    Config keys (defaults): ``p99_threshold_ms`` (200), ``min_throughput_rps``
    (100), ``max_slow_request_rate`` (0.01). Every alert returned by
    ``check_latency_alerts`` is also appended to ``alert_history``.
    """

    def __init__(self, config: Dict):
        self.config = config
        self.alert_history = []

    def check_latency_alerts(self, latency_stats: Dict) -> List[Dict]:
        """Return alerts for every threshold breached by ``latency_stats``.

        Keys absent from ``latency_stats`` (e.g. ``throughput_rps`` when the
        window spans zero time) are simply not checked.
        """
        alerts = []

        # P99 latency alert
        if 'p99' in latency_stats:
            threshold = self.config.get('p99_threshold_ms', 200)
            if latency_stats['p99'] > threshold:
                alerts.append({
                    'type': 'high_p99_latency',
                    'severity': 'high',
                    'metric': 'p99_latency',
                    'value': latency_stats['p99'],
                    'threshold': threshold,
                    'message': f"P99 latency {latency_stats['p99']:.2f}ms exceeds threshold {threshold}ms",
                })

        # Throughput drop alert
        if 'throughput_rps' in latency_stats:
            threshold = self.config.get('min_throughput_rps', 100)
            if latency_stats['throughput_rps'] < threshold:
                alerts.append({
                    'type': 'low_throughput',
                    'severity': 'medium',
                    'metric': 'throughput_rps',
                    'value': latency_stats['throughput_rps'],
                    'threshold': threshold,
                    'message': f"Throughput {latency_stats['throughput_rps']:.2f} rps below threshold {threshold} rps",
                })

        # Slow request rate alert
        if 'slow_request_rate' in latency_stats:
            threshold = self.config.get('max_slow_request_rate', 0.01)
            if latency_stats['slow_request_rate'] > threshold:
                alerts.append({
                    'type': 'high_slow_request_rate',
                    'severity': 'medium',
                    'metric': 'slow_request_rate',
                    'value': latency_stats['slow_request_rate'],
                    'threshold': threshold,
                    'message': f"Slow request rate {latency_stats['slow_request_rate']:.4f} exceeds threshold {threshold}",
                })

        # Previously alert_history was initialised but never written.
        self.alert_history.extend(alerts)
        return alerts
⚠️
Monitor latency at multiple percentiles (p50, p90, p95, p99). Mean latency can be misleading - focus on tail latencies for SLA compliance.
Accuracy Decay Detection
Model Performance Monitoring
import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Tuple
from datetime import datetime, timedelta
from scipy import stats
from sklearn.metrics import (
roc_auc_score, precision_score, recall_score,
f1_score, confusion_matrix
)
class AccuracyDecayDetector:
    """Detect model accuracy decay in production.

    ``add_observation`` scores one labelled batch, appends the metrics to
    ``metric_history`` and runs ``check_decay``. ``check_decay`` appends an
    alert dict to ``alerts`` for each metric that has dropped significantly
    below its baseline.
    """

    def __init__(self, baseline_metrics: Dict[str, float],
                 decay_thresholds: Dict[str, float] = None):
        """
        Args:
            baseline_metrics: Baseline value per metric name (e.g. ``'auc_roc'``).
            decay_thresholds: Absolute drop below baseline, per metric, that
                triggers an alert. Defaults to 0.05 for ``auc_roc`` and 0.10
                for ``precision``, ``recall`` and ``f1_score``.
        """
        self.baseline_metrics = baseline_metrics
        self.decay_thresholds = decay_thresholds or {
            'auc_roc': 0.05,  # 5% absolute drop
            'precision': 0.10,
            'recall': 0.10,
            'f1_score': 0.10,
        }
        self.metric_history = []
        self.alerts = []

    def add_observation(self, y_true: np.ndarray, y_pred: np.ndarray,
                        y_proba: np.ndarray = None,
                        timestamp: datetime = None):
        """Score one batch of labelled predictions and record the metrics.

        Args:
            y_true: True labels for the batch.
            y_pred: Predicted labels for the batch.
            y_proba: Optional scores used for AUC-ROC. AUC is skipped when the
                batch contains a single class, because it is undefined there.
            timestamp: Batch time; defaults to local ``datetime.now()``.

        Returns:
            The metrics dict that was appended to ``metric_history``.
        """
        if timestamp is None:
            timestamp = datetime.now()

        metrics = {
            'timestamp': timestamp,
            'n_samples': len(y_true),
            'positive_rate': float(np.mean(y_true)),
            'prediction_rate': float(np.mean(y_pred)),
        }

        # roc_auc_score raises ValueError when y_true holds only one class.
        if y_proba is not None and len(np.unique(y_true)) > 1:
            metrics['auc_roc'] = float(roc_auc_score(y_true, y_proba))

        metrics['precision'] = float(precision_score(y_true, y_pred, zero_division=0))
        metrics['recall'] = float(recall_score(y_true, y_pred, zero_division=0))
        metrics['f1_score'] = float(f1_score(y_true, y_pred, zero_division=0))

        # Confusion-matrix counts are only recorded for the binary 2x2 case.
        cm = confusion_matrix(y_true, y_pred)
        if cm.shape == (2, 2):
            tn, fp, fn, tp = cm.ravel()
            metrics['true_positives'] = int(tp)
            metrics['false_positives'] = int(fp)
            metrics['true_negatives'] = int(tn)
            metrics['false_negatives'] = int(fn)

        self.metric_history.append(metrics)

        # Any detected decay is recorded in self.alerts.
        self.check_decay()

        return metrics

    def check_decay(self) -> bool:
        """Compare the last (up to) 100 observations against the baselines.

        A metric is flagged when its recent mean is more than its threshold
        below baseline AND a one-sample t-test against the baseline gives
        p < 0.05. A NaN p-value never flags. An alert is appended on every
        call while the condition holds, so persistent decay yields repeated
        alerts. Needs at least 10 observations overall and 10 values for the
        metric.

        Returns:
            True if any metric was flagged on this call.
        """
        if len(self.metric_history) < 10:
            return False

        decay_detected = False
        window = self.metric_history[-100:]

        for metric_name, threshold in self.decay_thresholds.items():
            if metric_name not in self.baseline_metrics:
                continue

            recent_values = [
                m[metric_name] for m in window
                if m.get(metric_name) is not None
            ]
            if len(recent_values) < 10:
                continue

            baseline = self.baseline_metrics[metric_name]
            current_mean = np.mean(recent_values)
            decay = baseline - current_mean

            _t_stat, p_value = stats.ttest_1samp(recent_values, baseline)

            if decay > threshold and p_value < 0.05:
                decay_detected = True
                self.alerts.append({
                    'type': 'accuracy_decay',
                    'timestamp': datetime.now(),
                    'metric': metric_name,
                    'baseline_value': baseline,
                    'current_value': current_mean,
                    'decay': decay,
                    'threshold': threshold,
                    'p_value': p_value,
                    'severity': 'high' if decay > threshold * 2 else 'medium',
                })

        return decay_detected

    def get_performance_trend(self, metric_name: str,
                              window_hours: int = 24) -> Dict:
        """Fit a linear trend to ``metric_name`` over the last ``window_hours``.

        The fit uses elapsed hours since the earliest observation in the
        window, so ``slope`` is in metric units per hour. ``predicted_1h``
        extrapolates the line one hour past the newest observation. If all
        observations share one timestamp, the slope is 0 and the intercept
        is the mean.

        Returns:
            ``{'status': 'insufficient_data'}`` with fewer than 2 observations,
            otherwise summary statistics plus ``slope``, ``intercept`` and
            ``predicted_1h``.
        """
        cutoff_time = datetime.now() - timedelta(hours=window_hours)
        recent_metrics = [
            m for m in self.metric_history
            if m['timestamp'] > cutoff_time and metric_name in m
        ]
        if len(recent_metrics) < 2:
            return {'status': 'insufficient_data'}

        values = np.array([m[metric_name] for m in recent_metrics], dtype=float)
        start = min(m['timestamp'] for m in recent_metrics)
        hours = np.array(
            [(m['timestamp'] - start).total_seconds() / 3600.0 for m in recent_metrics]
        )

        if hours.max() > hours.min():
            slope, intercept = np.polyfit(hours, values, 1)
            latest_hour = hours.max()
        else:
            slope, intercept = 0.0, float(np.mean(values))
            latest_hour = 0.0

        stats_dict = {
            'mean': np.mean(values),
            'std': np.std(values),
            'min': np.min(values),
            'max': np.max(values),
            'slope': float(slope),
            'intercept': float(intercept),
            'n_observations': len(values),
            'window_hours': window_hours,
        }
        stats_dict['predicted_1h'] = float(slope * (latest_hour + 1.0) + intercept)
        return stats_dict

    def generate_decay_report(self) -> Dict:
        """Build a report of baselines, current metrics, 24h trends and recent alerts."""
        report = {
            'timestamp': datetime.now().isoformat(),
            'total_observations': len(self.metric_history),
            'baseline_metrics': self.baseline_metrics,
            'current_metrics': self._get_current_metrics(),
            'decay_analysis': {},
            'alerts': self.alerts[-10:],  # Last 10 alerts
            'recommendations': [],
        }

        for metric_name in self.baseline_metrics:
            trend = self.get_performance_trend(metric_name, window_hours=24)
            if 'status' not in trend:
                report['decay_analysis'][metric_name] = {
                    'baseline': self.baseline_metrics[metric_name],
                    'current_mean': trend['mean'],
                    'decay': self.baseline_metrics[metric_name] - trend['mean'],
                    'trend_slope': trend['slope'],  # per hour
                    'predicted_1h': trend['predicted_1h'],
                }

        # Recommendations are driven by the severity of the last 5 alerts.
        if self.alerts:
            recent_alerts = self.alerts[-5:]
            high_severity = [a for a in recent_alerts if a['severity'] == 'high']
            if high_severity:
                report['recommendations'].append(
                    "URGENT: Multiple high-severity accuracy decay alerts. "
                    "Consider immediate model rollback or retraining."
                )
            else:
                report['recommendations'].append(
                    "Monitor accuracy trends closely. "
                    "Schedule retraining if decay continues."
                )

        return report

    def _get_current_metrics(self) -> Dict:
        """Return the most recent metrics dict, or ``{}`` if none recorded."""
        if self.metric_history:
            return self.metric_history[-1]
        return {}
class ProductionAccuracyTracker:
    """Track accuracy using ground-truth labels as they become available.

    Predictions are logged first. Labels are attached later by
    ``prediction_id``, and ``calculate_accuracy`` scores the labelled subset.
    Predictions are treated as binary scores thresholded at ``threshold``.
    """

    def __init__(self, lag_hours: int = 24):
        # Stored for callers; no method in this class reads it.
        self.lag_hours = lag_hours
        self.predictions = []
        # Not written by this class; kept for backward compatibility.
        self.ground_truth = []
        # prediction_id -> first logged record with that id, for O(1) joins.
        self._by_id = {}

    def log_prediction(self, prediction_id: str,
                       prediction: float,
                       features: Dict,
                       timestamp: datetime):
        """Log a prediction so a later ground-truth label can be joined to it."""
        record = {
            'prediction_id': prediction_id,
            'prediction': prediction,
            'features': features,
            'timestamp': timestamp,
            'ground_truth_received': False,
        }
        self.predictions.append(record)
        # Keep the first record for a duplicated id, as a front-to-back scan would.
        self._by_id.setdefault(prediction_id, record)

    def log_ground_truth(self, prediction_id: str,
                         actual_value: float,
                         timestamp: datetime):
        """Attach a ground-truth label to a logged prediction.

        Unknown ids are ignored.
        """
        record = self._by_id.get(prediction_id)
        if record is None:
            # Fallback for records appended to self.predictions directly.
            record = next(
                (p for p in self.predictions if p['prediction_id'] == prediction_id),
                None,
            )
        if record is None:
            return
        record['actual_value'] = actual_value
        record['ground_truth_timestamp'] = timestamp
        record['ground_truth_received'] = True

    def calculate_accuracy(self, window_hours: int = 24,
                           threshold: float = 0.5) -> Dict:
        """Score labelled predictions made within the last ``window_hours``.

        Args:
            window_hours: Look-back window on the prediction timestamp.
            threshold: Score above which a prediction counts as positive.

        Returns:
            ``{'status': 'no_labeled_data'}`` if nothing is labelled in the
            window. Otherwise ``n_samples``, ``accuracy``, ``positive_rate`` and
            ``prediction_rate``; AUC-ROC/precision/recall/F1 are added only
            when both classes occur in the labels.
        """
        cutoff_time = datetime.now() - timedelta(hours=window_hours)
        labeled_predictions = [
            p for p in self.predictions
            if p['ground_truth_received'] and p['timestamp'] > cutoff_time
        ]
        if not labeled_predictions:
            return {'status': 'no_labeled_data'}

        y_true = np.array([p['actual_value'] for p in labeled_predictions])
        y_score = np.array([p['prediction'] for p in labeled_predictions])
        y_hat = (y_score > threshold).astype(int)

        metrics = {
            'n_samples': len(y_true),
            'accuracy': float(np.mean(y_true == y_hat)),
            'positive_rate': float(np.mean(y_true)),
            'prediction_rate': float(np.mean(y_hat)),
        }

        if len(np.unique(y_true)) == 2:
            metrics['auc_roc'] = float(roc_auc_score(y_true, y_score))
            # zero_division=0 matches AccuracyDecayDetector and avoids
            # warnings when one class is never predicted.
            metrics['precision'] = float(precision_score(y_true, y_hat, zero_division=0))
            metrics['recall'] = float(recall_score(y_true, y_hat, zero_division=0))
            metrics['f1_score'] = float(f1_score(y_true, y_hat, zero_division=0))

        return metrics
ℹ️
Accuracy decay detection requires ground-truth labels, which often arrive hours or days after the prediction. Track proxy metrics (e.g., the prediction distribution) for early warning, and compute actual accuracy once labels arrive.
Alerting System
Multi-Channel Alerting
import smtplib
import json
from email.mime.text import MIMEText
from typing import Dict, List, Optional
from datetime import datetime
import requests
class MLAlertManager:
    """Create ML alerts and route them to Slack, PagerDuty and email.

    Routing by severity:
    - ``critical``/``high`` go to PagerDuty, Slack and email.
    - ``medium`` goes to Slack.
    - Anything else goes to the Slack ``info`` channel.

    A channel whose config entry is missing is skipped. A delivery failure on
    one channel is an ``OSError``; ``requests`` and ``smtplib`` exceptions
    derive from it. Such a failure does not stop the other channels. It is
    recorded on the alert under ``delivery_errors`` instead of being raised.

    Config keys read:
    - ``slack_webhook``, ``slack_channel``, ``pagerduty_key``.
    - ``email_config``: ``from``, ``to``, ``smtp_host``, optional ``smtp_port``.
    - Optional ``timeout_seconds`` (default 10).
    """

    _SLACK_COLORS = {
        'critical': '#FF0000',
        'high': '#FF6600',
        'medium': '#FFCC00',
        'low': '#00CC00',
    }

    _PAGERDUTY_SEVERITY = {
        'critical': 'critical',
        'high': 'error',
        'medium': 'warning',
        'low': 'info',
    }

    def __init__(self, config: Dict):
        self.config = config
        self.alert_history = []

        # Initialize channels
        self.slack_webhook = config.get('slack_webhook')
        self.pagerduty_key = config.get('pagerduty_key')
        self.email_config = config.get('email_config')
        # Bounds every HTTP/SMTP call so a hung endpoint cannot block alerting.
        self.timeout = config.get('timeout_seconds', 10)

    def create_alert(self, alert_type: str, severity: str,
                     message: str, metrics: Dict = None,
                     recommendations: List[str] = None) -> Dict:
        """Create an alert, store it in ``alert_history`` and dispatch it."""
        alert = {
            # The seconds-resolution timestamp alone collides for alerts
            # created in the same second, which made resolve_alert() hit the
            # wrong alert; the history position makes ids unique per manager.
            'id': f"alert_{datetime.now():%Y%m%d%H%M%S}_{len(self.alert_history)}",
            'type': alert_type,
            'severity': severity,
            'message': message,
            'metrics': metrics or {},
            'recommendations': recommendations or [],
            'timestamp': datetime.now().isoformat(),
            'status': 'active',
        }
        self.alert_history.append(alert)

        if severity in ('critical', 'high'):
            self._deliver(alert, 'pagerduty', self._send_pagerduty)
            self._deliver(alert, 'slack', self._send_slack)
            self._deliver(alert, 'email', self._send_email)
        elif severity == 'medium':
            self._deliver(alert, 'slack', self._send_slack)
        else:
            self._deliver(alert, 'slack', self._send_slack, channel='info')

        return alert

    def _deliver(self, alert: Dict, channel_name: str, send, **kwargs):
        """Run one channel sender, recording transport failures on the alert."""
        try:
            send(alert, **kwargs)
        except OSError as exc:
            alert.setdefault('delivery_errors', []).append(f"{channel_name}: {exc}")

    def _slack_payload(self, alert: Dict, channel: str = None) -> Dict:
        """Build the Slack webhook payload for ``alert``."""
        channel = channel or self.config.get('slack_channel', '#ml-alerts')
        color = self._SLACK_COLORS.get(alert['severity'], '#808080')

        payload = {
            'channel': channel,
            'attachments': [{
                'color': color,
                'title': f"ML Alert: {alert['type']}",
                'text': alert['message'],
                'fields': [
                    {'title': 'Severity', 'value': alert['severity'], 'short': True},
                    {'title': 'Time', 'value': alert['timestamp'], 'short': True},
                ],
                'footer': 'ML Monitoring System',
            }],
        }
        if alert['recommendations']:
            payload['attachments'][0]['fields'].append({
                'title': 'Recommendations',
                'value': '\n'.join(alert['recommendations'][:3]),
                'short': False,
            })
        return payload

    def _send_slack(self, alert: Dict, channel: str = None):
        """Post ``alert`` to the Slack webhook; no-op if none is configured."""
        if not self.slack_webhook:
            return
        response = requests.post(
            self.slack_webhook,
            json=self._slack_payload(alert, channel),
            timeout=self.timeout,
        )
        response.raise_for_status()

    def _pagerduty_payload(self, alert: Dict) -> Dict:
        """Build the PagerDuty Events v2 trigger payload for ``alert``."""
        return {
            'routing_key': self.pagerduty_key,
            'event_action': 'trigger',
            'payload': {
                'summary': f"{alert['type']}: {alert['message'][:100]}",
                'severity': self._PAGERDUTY_SEVERITY.get(alert['severity'], 'info'),
                'source': 'ml-monitoring',
                'component': alert['type'],
                'group': 'ml-production',
                # Was hard-coded to 'accuracy_decay' for every alert type.
                'class': alert['type'],
                'custom_details': {
                    'alert_id': alert['id'],
                    'metrics': alert['metrics'],
                    'recommendations': alert['recommendations'],
                },
            },
        }

    def _send_pagerduty(self, alert: Dict):
        """Trigger a PagerDuty event; no-op if no routing key is configured."""
        if not self.pagerduty_key:
            return
        response = requests.post(
            'https://events.pagerduty.com/v2/enqueue',
            json=self._pagerduty_payload(alert),
            timeout=self.timeout,
        )
        response.raise_for_status()

    def _email_message(self, alert: Dict) -> MIMEText:
        """Build the plain-text email for ``alert`` (sender/recipients from config)."""
        recommendation_lines = chr(10).join(f"- {r}" for r in alert['recommendations'])
        msg = MIMEText(f"""
ML Alert: {alert['type']}
Severity: {alert['severity']}
Time: {alert['timestamp']}
Message:
{alert['message']}
Metrics:
{json.dumps(alert['metrics'], indent=2)}
Recommendations:
{recommendation_lines}
Alert ID: {alert['id']}
""")
        msg['Subject'] = f"[ML Alert - {alert['severity'].upper()}] {alert['type']}"
        msg['From'] = self.email_config['from']
        msg['To'] = ', '.join(self.email_config['to'])
        return msg

    def _send_email(self, alert: Dict):
        """Send ``alert`` by SMTP; no-op if no email config is present."""
        if not self.email_config:
            return
        msg = self._email_message(alert)
        # Port 0 makes smtplib use the standard SMTP port.
        with smtplib.SMTP(self.email_config['smtp_host'],
                          self.email_config.get('smtp_port', 0),
                          timeout=self.timeout) as server:
            server.send_message(msg)

    def resolve_alert(self, alert_id: str, resolution: str = "") -> bool:
        """Mark the alert with ``alert_id`` as resolved.

        Returns:
            True if a matching alert was found, False otherwise.
        """
        for alert in self.alert_history:
            if alert['id'] == alert_id:
                alert['status'] = 'resolved'
                alert['resolved_at'] = datetime.now().isoformat()
                alert['resolution'] = resolution
                return True
        return False
⚠️
Implement alert escalation policies. Start with low-severity alerts and escalate if issues persist. Use deduplication and grouping to avoid alert fatigue.
Dashboards
Grafana Dashboard Configuration
{
"dashboard": {
"title": "ML Model Monitoring",
"tags": ["ml", "production", "monitoring"],
"timezone": "browser",
"panels": [
{
"title": "Inference Latency",
"type": "graph",
"targets": [
{
"expr": "histogram_quantile(0.99, rate(ml_inference_latency_seconds_bucket[5m]))",
"legendFormat": "P99 Latency",
"refId": "A"
},
{
"expr": "histogram_quantile(0.95, rate(ml_inference_latency_seconds_bucket[5m]))",
"legendFormat": "P95 Latency",
"refId": "B"
},
{
"expr": "histogram_quantile(0.50, rate(ml_inference_latency_seconds_bucket[5m]))",
"legendFormat": "P50 Latency",
"refId": "C"
}
],
"yaxes": [
{"label": "Latency (seconds)", "format": "s"},
{"show": false}
],
"thresholds": [
{"value": 0.5, "colorMode": "warning", "op": "gt"},
{"value": 1.0, "colorMode": "critical", "op": "gt"}
]
},
{
"title": "Throughput",
"type": "graph",
"targets": [
{
"expr": "rate(ml_inference_requests_total[5m])",
"legendFormat": "Requests/second",
"refId": "A"
}
],
"yaxes": [
{"label": "Requests/second", "format": "reqps"},
{"show": false}
]
},
{
"title": "Model Accuracy",
"type": "gauge",
"targets": [
{
"expr": "ml_model_accuracy{metric_type='auc_roc'}",
"legendFormat": "AUC-ROC",
"refId": "A"
}
],
"options": {
"thresholds": [
{"value": 0, "color": "red"},
{"value": 0.85, "color": "yellow"},
{"value": 0.90, "color": "green"}
],
"min": 0,
"max": 1
}
},
{
"title": "Feature Drift Scores",
"type": "heatmap",
"targets": [
{
"expr": "ml_feature_drift_score",
"legendFormat": "{{feature_name}}",
"refId": "A"
}
],
"options": {
"calculate": false,
"dataFormat": "tsbuckets",
"yAxis": {
"unit": "short"
}
}
},
{
"title": "GPU Utilization",
"type": "graph",
"targets": [
{
"expr": "ml_gpu_utilization_percent",
"legendFormat": "GPU {{gpu_id}}",
"refId": "A"
}
],
"yaxes": [
{"label": "Utilization %", "format": "percent"},
{"show": false}
],
"thresholds": [
{"value": 80, "colorMode": "warning", "op": "gt"},
{"value": 95, "colorMode": "critical", "op": "gt"}
]
}
],
"time": {
"from": "now-1h",
"to": "now"
},
"refresh": "30s"
}
}
Summary
ML production monitoring requires:
- Infrastructure Monitoring: GPU, CPU, memory, network
- Latency Monitoring: P50, P90, P95, P99 percentiles
- Accuracy Decay Detection: Statistical tests for performance degradation
- Alerting: Multi-channel alerts with escalation policies
- Dashboards: Real-time visualization of key metrics
Implement comprehensive monitoring to maintain model reliability in production.