Alerting & Incident Response
Effective alerting and incident response systems are crucial for maintaining reliable ML systems. They ensure issues are detected quickly and resolved systematically.
Key Components
- Alert Rules: Define conditions that trigger alerts
- Escalation Policies: Determine who gets notified and when
- Runbooks: Step-by-step guides for incident resolution
- Incident Workflows: Structured processes for handling incidents
Alerting Architecture
Alert Rule Implementation
Alert Rule Engine
import copy
import json
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional
@dataclass
class AlertRule:
    """Declarative description of one alerting condition.

    ``condition`` is called by ``AlertRuleEngine.evaluate_rules`` with the
    metrics dictionary being evaluated; a truthy return value means the
    rule's condition is met.
    """

    # Unique key; the engine stores rules by this id.
    rule_id: str
    # Human-readable alert title.
    name: str
    # Predicate over the metrics snapshot.
    condition: Callable[[Dict], bool]
    # Free-form severity label ("high", "medium", ... in the examples).
    severity: str
    description: str
    # Link to the runbook responders should follow.
    runbook_url: str
    labels: Dict[str, str]
    annotations: Dict[str, str]
    # Intended hold time before firing (5 minutes).
    # NOTE(review): AlertRuleEngine in this file never reads this field.
    for_duration: timedelta = timedelta(seconds=300)
    # Minimum gap between repeated firings of the same rule (1 hour).
    repeat_interval: timedelta = timedelta(seconds=3600)
class AlertRuleEngine:
    """Evaluate alert rules against metric snapshots and track alert state.

    On every call to :meth:`evaluate_rules`, each registered rule is checked:

    * condition true  -> an alert is created and fired, unless the same rule
      already fired within its ``repeat_interval``;
    * condition false -> any active alert for that rule is marked
      ``"resolved"`` and removed from ``active_alerts``.

    ``alert_history`` keeps every fired alert (the same dict objects, so a
    later resolution is visible there as well). ``active_alerts`` maps
    ``alert_id`` to the currently firing alert, with at most one entry per
    rule.

    NOTE: ``for_duration`` on a rule is not enforced; a rule fires on the
    first evaluation in which its condition is true.
    """

    def __init__(self):
        self.rules: Dict[str, "AlertRule"] = {}
        self.alert_history: List[Dict] = []
        self.active_alerts: Dict[str, Dict] = {}

    def add_rule(self, rule: "AlertRule"):
        """Register ``rule``, replacing any rule with the same ``rule_id``."""
        self.rules[rule.rule_id] = rule

    def evaluate_rules(self, metrics: Dict[str, Any]) -> List[Dict]:
        """Evaluate all rules against ``metrics``.

        Args:
            metrics: Current metric values, passed to each rule's ``condition``.

        Returns:
            The alerts that fired during this evaluation (possibly empty).
            A rule whose evaluation raises is logged and skipped, so one
            broken rule cannot prevent the others from being evaluated.
        """
        triggered_alerts = []
        for rule_id, rule in self.rules.items():
            try:
                alert = self._evaluate_rule(rule, metrics)
            except Exception:
                # Deliberately best-effort: record the traceback and continue.
                logging.getLogger(__name__).exception(
                    "Error evaluating rule %s", rule_id
                )
                continue
            if alert is not None:
                triggered_alerts.append(alert)
        return triggered_alerts

    def _evaluate_rule(self, rule: "AlertRule", metrics: Dict[str, Any]) -> Optional[Dict]:
        """Evaluate one rule; return the alert if it fired, otherwise None."""
        if not rule.condition(metrics):
            self._resolve_alerts(rule.rule_id)
            return None
        alert = self._create_alert(rule, metrics)
        if not self._should_fire_alert(alert):
            return None
        self._record_alert(alert)
        return alert

    def _create_alert(self, rule: "AlertRule", metrics: Dict) -> Dict:
        """Build a firing alert dict for ``rule`` from the current ``metrics``."""
        # Single clock read so alert_id and timestamp always agree.
        now = datetime.now()
        return {
            "alert_id": f"{rule.rule_id}_{now.strftime('%Y%m%d_%H%M%S')}",
            "rule_id": rule.rule_id,
            "name": rule.name,
            "severity": rule.severity,
            "description": rule.description,
            "runbook_url": rule.runbook_url,
            "labels": rule.labels,
            "annotations": rule.annotations,
            # Snapshot copy: callers commonly reuse/mutate their metrics dict,
            # which must not rewrite alerts already in the history.
            "metrics": dict(metrics),
            "timestamp": now.isoformat(),
            "status": "firing",
        }

    def _should_fire_alert(self, alert: Dict) -> bool:
        """Return False if the same rule fired within its ``repeat_interval``."""
        rule = self.rules[alert["rule_id"]]
        now = datetime.now()
        for active in self.active_alerts.values():
            if active["rule_id"] != alert["rule_id"] or active["status"] != "firing":
                continue
            last_fired = datetime.fromisoformat(active["timestamp"])
            if now - last_fired < rule.repeat_interval:
                return False
        return True

    def _record_alert(self, alert: Dict):
        """Append ``alert`` to the history and make it its rule's active alert."""
        self.alert_history.append(alert)
        # A re-fire after repeat_interval supersedes the previous active entry;
        # dropping it keeps active_alerts bounded by the number of rules.
        for alert_id in self._active_ids_for(alert["rule_id"]):
            del self.active_alerts[alert_id]
        self.active_alerts[alert["alert_id"]] = alert

    def _resolve_alerts(self, rule_id: str):
        """Mark the rule's active alerts resolved and remove them from ``active_alerts``."""
        resolved_at = datetime.now().isoformat()
        for alert_id in self._active_ids_for(rule_id):
            alert = self.active_alerts.pop(alert_id)
            alert["status"] = "resolved"
            alert["resolved_at"] = resolved_at

    def _active_ids_for(self, rule_id: str) -> List[str]:
        """Return (as a list, safe to mutate over) the active alert ids of ``rule_id``."""
        return [
            alert_id
            for alert_id, active in self.active_alerts.items()
            if active["rule_id"] == rule_id
        ]
Example Alert Rules
def create_model_alert_rules():
    """Build the example model-monitoring rules.

    Returns a list of three ``AlertRule`` objects, in this order:
    accuracy below 0.85, P99 latency above 1000 (milliseconds, per the rule
    text), and PSI drift score above 0.2. A metric absent from the snapshot
    never triggers its rule, because each default lies on the safe side of
    its threshold.

    NOTE: the ``{{ $value }}`` placeholders in the annotations are not
    rendered by any code in this file.
    """

    def accuracy_too_low(snapshot):
        return snapshot.get("accuracy", 1.0) < 0.85

    def latency_too_high(snapshot):
        # 1000 ms == 1 second
        return snapshot.get("latency_p99", 0) > 1000

    def drift_too_large(snapshot):
        return snapshot.get("psi_score", 0) > 0.2

    return [
        AlertRule(
            rule_id="model_accuracy_low",
            name="Model Accuracy Below Threshold",
            condition=accuracy_too_low,
            severity="high",
            description="Model accuracy has dropped below 85%",
            runbook_url="https://wiki.company.com/runbooks/model-accuracy",
            labels={"team": "ml-ops", "service": "prediction-api"},
            annotations={
                "summary": "Model accuracy degraded",
                "description": "Current accuracy: {{ $value }}",
            },
        ),
        AlertRule(
            rule_id="model_latency_high",
            name="High Model Latency",
            condition=latency_too_high,
            severity="medium",
            description="Model latency P99 exceeds 1 second",
            runbook_url="https://wiki.company.com/runbooks/model-latency",
            labels={"team": "ml-ops", "service": "prediction-api"},
            annotations={
                "summary": "High latency detected",
                "description": "P99 latency: {{ $value }}ms",
            },
        ),
        AlertRule(
            rule_id="data_drift_detected",
            name="Significant Data Drift",
            condition=drift_too_large,
            severity="high",
            description="Data drift detected with PSI > 0.2",
            runbook_url="https://wiki.company.com/runbooks/data-drift",
            labels={"team": "data-science", "service": "feature-pipeline"},
            annotations={
                "summary": "Data drift detected",
                "description": "PSI score: {{ $value }}",
            },
        ),
    ]
Escalation Policies
Escalation Manager
from enum import Enum
from dataclasses import dataclass
from typing import List, Dict
import asyncio
class EscalationLevel(Enum):
    """Escalation tiers, numbered from first responder upwards.

    L1 -- on-call engineer
    L2 -- senior engineer
    L3 -- team lead
    L4 -- management
    """

    L1 = 1
    L2 = 2
    L3 = 3
    L4 = 4
@dataclass
class EscalationPolicy:
    """Ordered notification tiers used to escalate an unacknowledged alert."""

    # Key under which EscalationManager stores and looks up the policy.
    policy_id: str
    name: str
    # One dict per tier, in escalation order; EscalationManager reads the
    # "level", "recipients" and "channel" keys of each entry.
    levels: List[Dict]
    # Minutes to wait for acknowledgement before notifying the next tier.
    timeout_minutes: int
class EscalationManager:
    """Notify escalation tiers in order until an alert is acknowledged.

    ``start_escalation`` notifies the first tier of a policy immediately. When
    called from inside a running asyncio event loop, it also schedules a timer
    that notifies the next tier every ``policy.timeout_minutes`` until the
    escalation is acknowledged or every tier has been notified.

    Delivery goes through ``_send_email`` / ``_send_slack`` /
    ``_send_pagerduty``. The default implementations here only log the
    notification; override them to integrate real channels.
    """

    def __init__(self):
        self.policies = {}
        self.active_escalations = {}
        # escalation_id -> running timer task. The event loop keeps only weak
        # references to tasks, so a strong reference is held until completion.
        self._timers = {}

    def add_policy(self, policy: "EscalationPolicy"):
        """Register ``policy`` under its ``policy_id`` (replacing any previous one)."""
        self.policies[policy.policy_id] = policy

    def start_escalation(self, alert: Dict, policy_id: str) -> Dict:
        """Start escalating ``alert`` according to policy ``policy_id``.

        Args:
            alert: Alert dict; only ``alert["alert_id"]`` is read here.
            policy_id: Key of a policy previously passed to ``add_policy``.

        Returns:
            The escalation record (also stored in ``active_escalations``).

        Raises:
            KeyError: if ``policy_id`` is unknown.
            ValueError: if the policy has no levels to notify.
        """
        policy = self.policies[policy_id]
        if not policy.levels:
            raise ValueError(f"Escalation policy {policy_id!r} has no levels")

        escalation_id = f"esc_{alert['alert_id']}"
        # Restarting an escalation for the same alert replaces the old timer.
        self._cancel_timer(escalation_id)
        escalation = {
            "escalation_id": escalation_id,
            "alert": alert,
            "policy": policy,
            "current_level": 0,
            "started_at": datetime.now(),
            "notifications_sent": [],
            "acknowledged": False,
        }
        self.active_escalations[escalation_id] = escalation

        # Send initial notification
        self._send_notification(escalation, policy.levels[0])
        self._schedule_timer(escalation)
        return escalation

    def _schedule_timer(self, escalation: Dict):
        """Start the escalation timer if an event loop is running in this thread."""
        escalation_id = escalation["escalation_id"]
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # Synchronous caller: asyncio.create_task would raise here, so
            # only the first tier is notified.
            logging.getLogger(__name__).warning(
                "No running event loop; escalation %s will not advance automatically",
                escalation_id,
            )
            return
        task = loop.create_task(self._escalation_timer(escalation))
        self._timers[escalation_id] = task
        task.add_done_callback(lambda done: self._forget_timer(escalation_id, done))

    def _forget_timer(self, escalation_id: str, task: "asyncio.Task"):
        """Drop the stored timer reference unless a newer task replaced it."""
        if self._timers.get(escalation_id) is task:
            del self._timers[escalation_id]

    def _cancel_timer(self, escalation_id: str):
        """Cancel and forget the timer task for ``escalation_id``, if any."""
        task = self._timers.pop(escalation_id, None)
        if task is not None:
            task.cancel()

    async def _escalation_timer(self, escalation: Dict):
        """Notify the next tier after each timeout until acknowledged or exhausted."""
        policy = escalation["policy"]
        while not escalation["acknowledged"]:
            await asyncio.sleep(policy.timeout_minutes * 60)
            if escalation["acknowledged"]:
                break
            next_level = escalation["current_level"] + 1
            if next_level >= len(policy.levels):
                # Every tier notified; current_level stays on the last valid index.
                break
            escalation["current_level"] = next_level
            self._send_notification(escalation, policy.levels[next_level])

    def _send_notification(self, escalation: Dict, level: Dict):
        """Record a notification for ``level`` on the escalation and dispatch it."""
        notification = {
            "escalation_id": escalation["escalation_id"],
            "level": level["level"],
            "recipients": level["recipients"],
            "channel": level["channel"],
            "timestamp": datetime.now().isoformat(),
        }
        escalation["notifications_sent"].append(notification)
        self._dispatch_notification(notification)

    def _dispatch_notification(self, notification: Dict):
        """Route ``notification`` to the sender for its ``channel``."""
        senders = {
            "email": self._send_email,
            "slack": self._send_slack,
            "pagerduty": self._send_pagerduty,
        }
        sender = senders.get(notification["channel"])
        if sender is None:
            # Do not drop notifications for misconfigured channels silently.
            logging.getLogger(__name__).warning(
                "Unknown notification channel %r for escalation %s",
                notification["channel"],
                notification["escalation_id"],
            )
            return
        sender(notification)

    def _send_email(self, notification: Dict):
        """Deliver by email. Default only logs; override to integrate."""
        self._log_delivery("email", notification)

    def _send_slack(self, notification: Dict):
        """Deliver to Slack. Default only logs; override to integrate."""
        self._log_delivery("slack", notification)

    def _send_pagerduty(self, notification: Dict):
        """Deliver to PagerDuty. Default only logs; override to integrate."""
        self._log_delivery("pagerduty", notification)

    def _log_delivery(self, channel: str, notification: Dict):
        """Log a notification that a default sender would have delivered."""
        logging.getLogger(__name__).info(
            "[%s] escalation %s level %s -> %s",
            channel,
            notification["escalation_id"],
            notification["level"],
            notification["recipients"],
        )

    def acknowledge_alert(self, escalation_id: str, acknowledger: str) -> bool:
        """Mark an escalation acknowledged and stop its timer.

        Returns:
            True if ``escalation_id`` was a known escalation, False otherwise.
        """
        escalation = self.active_escalations.get(escalation_id)
        if escalation is None:
            return False
        escalation["acknowledged"] = True
        escalation["acknowledged_by"] = acknowledger
        escalation["acknowledged_at"] = datetime.now().isoformat()
        self._cancel_timer(escalation_id)
        return True
Runbooks
Runbook Management
from dataclasses import dataclass
from typing import List, Dict, Optional
import yaml
@dataclass
class RunbookStep:
    """One step of a runbook, as listed under ``steps`` in a runbook YAML file.

    ``troubleshooting`` and ``automation_script`` default to None so that
    steps which omit them (e.g. manual steps with no script) can be built
    with ``RunbookStep(**step_dict)``.
    """

    step_id: str
    # What the step is for; echoed back to responders for manual steps.
    description: str
    # The action a responder should take.
    action: str
    expected_result: str
    # Hint for what to do if the step does not produce expected_result.
    troubleshooting: Optional[str] = None
    # Script to run automatically; None means the step is manual.
    automation_script: Optional[str] = None
@dataclass
class Runbook:
    """A titled, ordered list of runbook steps attached to one alert rule."""

    runbook_id: str
    title: str
    description: str
    # rule_id of the alert rule this runbook is meant for.
    alert_rule_id: str
    # Steps, executed in list order.
    steps: List[RunbookStep]
    escalation_contacts: List[str]
    # Free-form duration text, e.g. "30-60 minutes".
    estimated_time: str
    prerequisites: List[str]
class RunbookManager:
    """Load runbooks from YAML files and execute their steps in order."""

    def __init__(self):
        self.runbooks = {}

    def load_runbook(self, filepath: str) -> "Runbook":
        """Load a runbook from a YAML file and register it.

        The document must be a mapping with ``id``, ``title``,
        ``description``, ``alert_rule_id`` and ``steps``. The keys
        ``escalation_contacts``, ``estimated_time`` and ``prerequisites`` are
        optional. Within a step, ``troubleshooting`` and ``automation_script``
        may be omitted.

        Raises:
            ValueError: if the document is not a mapping.
            KeyError: if a required top-level key is missing.
            TypeError: if a step has unknown fields or lacks a required one.
        """
        with open(filepath, "r", encoding="utf-8") as f:
            # safe_load: a runbook file must not be able to construct objects.
            runbook_data = yaml.safe_load(f)
        if not isinstance(runbook_data, dict):
            raise ValueError(f"Runbook file {filepath!r} must contain a YAML mapping")

        runbook = Runbook(
            runbook_id=runbook_data["id"],
            title=runbook_data["title"],
            description=runbook_data["description"],
            alert_rule_id=runbook_data["alert_rule_id"],
            steps=[self._build_step(step) for step in runbook_data["steps"]],
            escalation_contacts=runbook_data.get("escalation_contacts", []),
            estimated_time=runbook_data.get("estimated_time", "Unknown"),
            prerequisites=runbook_data.get("prerequisites", []),
        )
        self.runbooks[runbook.runbook_id] = runbook
        return runbook

    @staticmethod
    def _build_step(step_data: Dict) -> "RunbookStep":
        """Create a RunbookStep, defaulting the optional fields to None."""
        # Manual steps usually omit automation_script; passing the raw dict
        # would raise TypeError for the missing field. Unknown keys still
        # raise, so typos in a runbook are reported.
        return RunbookStep(**{"troubleshooting": None, "automation_script": None, **step_data})

    def get_runbook_for_alert(self, alert_rule_id: str) -> Optional["Runbook"]:
        """Return the first registered runbook for ``alert_rule_id``, or None."""
        return next(
            (rb for rb in self.runbooks.values() if rb.alert_rule_id == alert_rule_id),
            None,
        )

    def execute_runbook(self, runbook_id: str, context: Dict) -> Dict:
        """Execute a runbook's steps in order, stopping at the first failure.

        Returns:
            ``{"success": True, "results": [...]}`` when every step succeeds;
            ``{"success": False, "failed_step": step_id, "results": [...]}``
            at the first failing step; or
            ``{"success": False, "error": "Runbook not found"}``.
        """
        runbook = self.runbooks.get(runbook_id)
        if not runbook:
            return {"success": False, "error": "Runbook not found"}

        results = []
        for step in runbook.steps:
            step_result = self._execute_step(step, context)
            results.append(step_result)
            if not step_result["success"]:
                return {
                    "success": False,
                    "failed_step": step.step_id,
                    "results": results,
                }
        return {"success": True, "results": results}

    def _execute_step(self, step: "RunbookStep", context: Dict) -> Dict:
        """Execute one step; any exception becomes ``{"success": False, "error": ...}``.

        Steps without an ``automation_script`` are manual: they report success
        and return the action the responder must take.
        """
        try:
            if step.automation_script:
                return self._run_automation_script(step.automation_script, context)
            return {
                "success": True,
                "message": f"Manual step: {step.description}",
                "action_required": step.action,
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

    def _run_automation_script(self, script: str, context: Dict) -> Dict:
        """Run a step's automation script; override to provide an executor.

        The scripts come straight from runbook YAML files, so an override is
        running code from those files and should treat them as trusted code
        (and sandbox execution accordingly). It must return a dict with a
        boolean ``"success"`` key, which ``execute_runbook`` checks.
        """
        raise NotImplementedError(
            "No automation executor configured; "
            "override RunbookManager._run_automation_script"
        )
Example Runbook YAML
# Example runbook file for RunbookManager.load_runbook.
# Steps without automation_script are treated as manual steps.
id: runbook_model_accuracy
title: "Model Accuracy Degradation Runbook"
description: "Steps to diagnose and resolve model accuracy issues"
alert_rule_id: model_accuracy_low
estimated_time: "30-60 minutes"
prerequisites:
  - Access to model monitoring dashboard
  - Access to prediction logs
  - Access to training data
steps:
  - step_id: step1
    description: "Verify the accuracy drop is real"
    action: "Check monitoring dashboard for accuracy trend"
    expected_result: "Confirm accuracy is below threshold"
    troubleshooting: "If dashboard shows different values, check data pipeline"
    automation_script: |
      #!/bin/bash
      curl -s "http://monitoring.internal/api/metrics/accuracy" | jq '.current_value'
  - step_id: step2
    description: "Check for data drift"
    action: "Review data drift metrics in dashboard"
    expected_result: "Identify if data drift is causing the issue"
    troubleshooting: "If no drift, check for concept drift"
    automation_script: |
      python -c "
      from drift_detector import check_drift
      result = check_drift()
      print(f'Drift detected: {result[\"drift_detected\"]}')
      "
  - step_id: step3
    description: "Review recent predictions"
    action: "Analyze prediction distribution"
    expected_result: "Identify patterns in incorrect predictions"
    troubleshooting: "If patterns found, investigate specific feature changes"
  - step_id: step4
    description: "Check model version"
    action: "Verify correct model version is deployed"
    expected_result: "Confirm expected version is running"
    troubleshooting: "If wrong version, rollback to correct version"
  - step_id: step5
    description: "Retrain model if needed"
    action: "Trigger model retraining pipeline"
    expected_result: "New model trained with improved performance"
    troubleshooting: "If retraining fails, escalate to L2"
Incident Workflow
Incident Manager
from enum import Enum
from dataclasses import dataclass
from typing import List, Dict
from datetime import datetime
class IncidentStatus(Enum):
    """Incident lifecycle states; each value is the lowercase member name."""

    # Initial state assigned by IncidentManager.create_incident.
    OPEN = "open"
    ACKNOWLEDGED = "acknowledged"
    INVESTIGATING = "investigating"
    IDENTIFIED = "identified"
    MONITORING = "monitoring"
    # Terminal state set by IncidentManager.resolve_incident.
    RESOLVED = "resolved"
@dataclass
class Incident:
    """Mutable record of one incident raised from an alert."""

    incident_id: str
    title: str
    description: str
    severity: str
    status: IncidentStatus
    # The alert dict the incident was created from.
    alert: Dict
    # Attached runbook; may be None when no runbook matches the alert.
    runbook: Optional[Runbook]
    # Empty string until someone is assigned.
    assignee: str
    # ISO-8601 timestamps (datetime.isoformat()).
    created_at: str
    updated_at: str
    resolution_notes: str
class IncidentManager:
    """Create, update and resolve incidents and report simple metrics.

    Open incidents live in ``incidents`` (keyed by ``incident_id``). Resolved
    incidents are moved to ``incident_history``.
    """

    def __init__(self):
        self.incidents = {}
        self.incident_history = []
        # State used to keep incident ids unique within the same second.
        self._last_id_stamp = ""
        self._id_sequence = 0

    def create_incident(self, alert: Dict, runbook: Optional["Runbook"]) -> "Incident":
        """Open a new incident from ``alert`` and register it.

        Args:
            alert: Alert dict; its ``name``, ``description`` and ``severity``
                are copied onto the incident.
            runbook: Runbook to attach, or None if no runbook applies.
        """
        now = datetime.now().isoformat()
        incident = Incident(
            incident_id=self._generate_incident_id(),
            title=alert["name"],
            description=alert["description"],
            severity=alert["severity"],
            status=IncidentStatus.OPEN,
            alert=alert,
            runbook=runbook,
            assignee="",
            created_at=now,
            updated_at=now,
            resolution_notes="",
        )
        self.incidents[incident.incident_id] = incident
        return incident

    def _generate_incident_id(self) -> str:
        """Return ``INC-YYYYmmdd-HHMMSS``, suffixed ``-N`` for same-second repeats."""
        # Alert storms open several incidents per second; without the suffix
        # the later one would overwrite the earlier one in self.incidents.
        stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        if stamp == self._last_id_stamp:
            self._id_sequence += 1
            return f"INC-{stamp}-{self._id_sequence}"
        self._last_id_stamp = stamp
        self._id_sequence = 0
        return f"INC-{stamp}"

    def update_incident(self, incident_id: str, updates: Dict):
        """Set existing attributes of an open incident and refresh ``updated_at``.

        Unknown incident ids and keys that are not incident attributes are
        ignored.
        """
        incident = self.incidents.get(incident_id)
        if incident is None:
            return
        for key, value in updates.items():
            if hasattr(incident, key):
                setattr(incident, key, value)
        incident.updated_at = datetime.now().isoformat()

    def resolve_incident(self, incident_id: str, resolution_notes: str):
        """Mark an open incident resolved and move it to ``incident_history``.

        ``updated_at`` is stamped at resolution time; ``_calculate_mttr`` uses
        it as the resolution timestamp.

        Raises:
            KeyError: if ``incident_id`` is not an open incident.
        """
        if incident_id not in self.incidents:
            raise KeyError(incident_id)
        self.update_incident(incident_id, {
            "status": IncidentStatus.RESOLVED,
            "resolution_notes": resolution_notes,
        })
        self.incident_history.append(self.incidents.pop(incident_id))

    def get_incident_metrics(self) -> Dict:
        """Summarise open incidents, MTTR (seconds) and incidents from the last 24h."""
        open_incidents = [
            i for i in self.incidents.values() if i.status != IncidentStatus.RESOLVED
        ]
        return {
            "open_incidents": len(open_incidents),
            "by_severity": self._count_by_severity(open_incidents),
            "mean_time_to_resolve": self._calculate_mttr(),
            "incidents_last_24h": self._count_recent_incidents(24),
        }

    def _count_by_severity(self, incidents: List["Incident"]) -> Dict:
        """Count ``incidents`` per severity label."""
        counts = {}
        for incident in incidents:
            counts[incident.severity] = counts.get(incident.severity, 0) + 1
        return counts

    def _calculate_mttr(self) -> float:
        """Mean time to resolve, in seconds, over resolved incidents (0.0 if none).

        Duration per incident is ``updated_at - created_at``. This assumes
        ``updated_at`` holds the resolution time, which ``resolve_incident``
        guarantees because resolved incidents are no longer updatable.
        """
        if not self.incident_history:
            return 0.0
        total_seconds = sum(
            (
                datetime.fromisoformat(incident.updated_at)
                - datetime.fromisoformat(incident.created_at)
            ).total_seconds()
            for incident in self.incident_history
        )
        return total_seconds / len(self.incident_history)

    def _count_recent_incidents(self, hours: int) -> int:
        """Count incidents (open and resolved) created within the last ``hours``."""
        cutoff = datetime.now().timestamp() - (hours * 3600)
        every_incident = [*self.incidents.values(), *self.incident_history]
        return sum(
            1
            for incident in every_incident
            if datetime.fromisoformat(incident.created_at).timestamp() > cutoff
        )
Mathematical Foundation
Mean Time to Detect (MTTD)
$$\text{MTTD} = \frac{1}{N} \sum_{i=1}^{N} \left( t_{detect_i} - t_{occur_i} \right)$$
Where:
- $t_{detect_i}$ is the detection time of incident $i$
- $t_{occur_i}$ is the occurrence time of incident $i$
- $N$ is the number of incidents
Mean Time to Resolve (MTTR)
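$$\text{MTTR} = \frac{1}{N} \sum_{i=1}^{N} \left( t_{resolve_i} - t_{detect_i} \right)$$
where $t_{resolve_i}$ is the resolution time of incident $i$, and $t_{detect_i}$ and $N$ are as defined above.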
Alert Fatigue Score
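$$\text{Alert Fatigue Score} = \frac{N_{\text{non-actionable}}}{N_{\text{total}}}$$
where $N_{\text{non-actionable}}$ is the number of alerts in a period that required no action (false positives, duplicates, noise) and $N_{\text{total}}$ is the total number of alerts in that period. A score close to 1 means most alerts are noise, which trains responders to ignore them.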
Best Practices
1. Alert Grouping
class AlertGrouper:
    """Group alerts that share a name, a ``service`` label and a ``team`` label.

    NOTE(review): rules registered with ``add_grouping_rule`` are stored but
    not consulted by ``_generate_group_key``; grouping always uses the fixed
    keys above.
    """

    def __init__(self):
        # rule_name -> group_by_keys, as passed to add_grouping_rule.
        self.grouping_rules = {}

    def add_grouping_rule(self, rule_name, group_by_keys):
        """Store ``group_by_keys`` under ``rule_name``."""
        self.grouping_rules[rule_name] = group_by_keys

    def group_alerts(self, alerts):
        """Group related alerts.

        Returns:
            Dict mapping group key to ``{"alerts", "first_seen", "last_seen",
            "count"}``. ``first_seen``/``last_seen`` are the earliest and latest
            ``timestamp`` in the group regardless of input order. Timestamps
            are compared directly, which orders ISO-8601 strings from one
            timezone (as ``datetime.isoformat()`` produces) chronologically.
        """
        groups = {}
        for alert in alerts:
            timestamp = alert["timestamp"]
            group_key = self._generate_group_key(alert)
            group = groups.get(group_key)
            if group is None:
                group = groups[group_key] = {
                    "alerts": [],
                    "first_seen": timestamp,
                    "last_seen": timestamp,
                    "count": 0,
                }
            group["alerts"].append(alert)
            group["first_seen"] = min(group["first_seen"], timestamp)
            group["last_seen"] = max(group["last_seen"], timestamp)
            group["count"] += 1
        return groups

    def _generate_group_key(self, alert):
        """Return ``"name|service|team"`` for ``alert`` (missing parts become "")."""
        # `or {}` also covers an explicit labels=None.
        labels = alert.get("labels") or {}
        key_parts = [
            alert.get("name", ""),
            labels.get("service", ""),
            labels.get("team", ""),
        ]
        return "|".join(key_parts)
2. Incident Postmortem
class PostmortemGenerator:
    """Produce postmortem skeleton dicts pre-filled from incidents."""

    def __init__(self):
        # Shape of every generated postmortem; generate_postmortem never mutates it.
        self.postmortem_template = {
            "incident_summary": "",
            "timeline": [],
            "root_cause": "",
            "impact": "",
            "resolution": "",
            "action_items": [],
            "lessons_learned": [],
        }

    def generate_postmortem(self, incident: "Incident") -> Dict:
        """Return a new postmortem dict pre-filled from ``incident``.

        The result starts as a deep copy of ``postmortem_template``. A shallow
        ``dict.copy()`` would share the ``action_items`` and
        ``lessons_learned`` lists with the template, so appending to one
        postmortem would leak into every later one.
        """
        postmortem = copy.deepcopy(self.postmortem_template)

        postmortem["incident_summary"] = f"Incident {incident.incident_id}: {incident.title}"
        postmortem["root_cause"] = incident.resolution_notes
        postmortem["impact"] = f"Severity: {incident.severity}"
        postmortem["timeline"] = [
            {"time": incident.created_at, "event": "Incident created"},
            {"time": incident.updated_at, "event": "Incident updated"},
        ]
        return postmortem
Summary
Effective alerting and incident response systems are critical for maintaining reliable ML operations. By implementing comprehensive alert rules, escalation policies, runbooks, and incident workflows, organizations can quickly detect and resolve issues, minimizing impact on business outcomes.