🎉 75% of content is free forever — Unlock Premium from $10/mo →
CW
Search courses…
💼 Services · ℹ️ About · ✉️ Contact · View Pricing Plans from $10

Alerting & Incident Response

AIOps Root Cause Analysis · Incident Management · 🟢 Free Lesson

Advertisement

Alerting & Incident Response

Effective alerting and incident response systems are crucial for maintaining reliable ML systems. They ensure issues are detected quickly and resolved systematically.

Key Components

  • Alert Rules: Define conditions that trigger alerts
  • Escalation Policies: Determine who gets notified and when
  • Runbooks: Step-by-step guides for incident resolution
  • Incident Workflows: Structured processes for handling incidents

Alerting Architecture

Alert Rule Implementation

Alert Rule Engine

import copy
import json
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Callable, Any

@dataclass
class AlertRule:
    """Declarative description of one alert: what to check and how to label it.

    Instances are registered with ``AlertRuleEngine.add_rule`` and turned into
    alert dicts by ``AlertRuleEngine._create_alert``.
    """
    # Unique key; AlertRuleEngine stores rules under this id.
    rule_id: str
    # Human-readable alert title.
    name: str
    # Predicate called with the current metrics dict; a truthy result means
    # the rule's condition holds.
    condition: Callable[[Dict], bool]
    # Free-form severity label (the example rules use "high" and "medium").
    severity: str
    description: str
    # Link to the runbook describing how to handle this alert.
    runbook_url: str
    # Included in every alert created from this rule.
    labels: Dict[str, str]
    annotations: Dict[str, str]
    # NOTE(review): not read by AlertRuleEngine in this section - presumably
    # how long the condition must hold before firing; confirm before relying
    # on it.
    for_duration: timedelta = timedelta(minutes=5)
    # Minimum gap between two alerts for the same rule while one is firing.
    repeat_interval: timedelta = timedelta(hours=1)

class AlertRuleEngine:
    """Evaluate registered alert rules against metric snapshots.

    Attributes:
        rules: Registered rules keyed by ``rule_id``.
        alert_history: Every alert that has fired, in firing order.
        active_alerts: Fired alerts keyed by ``alert_id``; used to suppress
            repeats of the same rule inside its ``repeat_interval``.

    NOTE: ``AlertRule.for_duration`` is not applied here; an alert fires on
    the first evaluation whose condition holds.
    """

    def __init__(self):
        self.rules = {}
        self.alert_history = []
        self.active_alerts = {}

    def add_rule(self, rule: "AlertRule"):
        """Register ``rule``, replacing any rule with the same ``rule_id``."""
        self.rules[rule.rule_id] = rule

    def evaluate_rules(self, metrics: Dict[str, Any]) -> List[Dict]:
        """Evaluate all rules against the current metrics.

        Args:
            metrics: Metric name -> value snapshot passed to each rule's
                ``condition``.

        Returns:
            The alerts that fired during this evaluation (possibly empty).
            Each fired alert is also recorded in ``alert_history`` and
            ``active_alerts``.

        A rule whose evaluation raises is logged with its traceback and
        skipped, so one broken rule cannot block the others.
        """
        triggered_alerts = []

        for rule_id, rule in self.rules.items():
            try:
                if not rule.condition(metrics):
                    continue

                alert = self._create_alert(rule, metrics)
                if self._should_fire_alert(alert):
                    triggered_alerts.append(alert)
                    self._record_alert(alert)
            except Exception:
                # Best-effort by design: keep evaluating the remaining rules.
                logging.getLogger(__name__).exception(
                    "Error evaluating rule %s", rule_id
                )

        return triggered_alerts

    def _create_alert(self, rule: "AlertRule", metrics: Dict) -> Dict:
        """Build the alert dict for ``rule`` from the given metrics snapshot."""
        # Read the clock once so alert_id and timestamp always agree.
        now = datetime.now()
        return {
            "alert_id": f"{rule.rule_id}_{now.strftime('%Y%m%d_%H%M%S')}",
            "rule_id": rule.rule_id,
            "name": rule.name,
            "severity": rule.severity,
            "description": rule.description,
            "runbook_url": rule.runbook_url,
            # Copy the mappings: the alert is kept in history, and must not
            # change when the caller reuses its metrics dict or a rule's
            # labels are edited later.
            "labels": dict(rule.labels),
            "annotations": dict(rule.annotations),
            "metrics": dict(metrics),
            "timestamp": now.isoformat(),
            "status": "firing"
        }

    def _should_fire_alert(self, alert: Dict) -> bool:
        """Return False if the same rule already fired within its repeat interval."""
        rule = self.rules[alert["rule_id"]]
        now = datetime.now()

        for active_alert in self.active_alerts.values():
            if active_alert["rule_id"] != alert["rule_id"]:
                continue
            if active_alert["status"] != "firing":
                continue

            last_fired = datetime.fromisoformat(active_alert["timestamp"])
            if now - last_fired < rule.repeat_interval:
                return False

        return True

    def _record_alert(self, alert: Dict):
        """Append ``alert`` to the history and mark it active."""
        self.alert_history.append(alert)
        self.active_alerts[alert["alert_id"]] = alert

Example Alert Rules

def create_model_alert_rules():
    """Build the three sample model-monitoring rules.

    Returns:
        A list with, in order, the accuracy, latency and data-drift rules.
        Each predicate treats a missing metric as healthy.
    """

    def accuracy_condition(metrics):
        # A missing accuracy reading defaults to 1.0 and never triggers.
        return metrics.get("accuracy", 1.0) < 0.85

    def latency_condition(metrics):
        return metrics.get("latency_p99", 0) > 1000  # 1 second

    def drift_condition(metrics):
        return metrics.get("psi_score", 0) > 0.2

    accuracy_rule = AlertRule(
        rule_id="model_accuracy_low",
        name="Model Accuracy Below Threshold",
        condition=accuracy_condition,
        severity="high",
        description="Model accuracy has dropped below 85%",
        runbook_url="https://wiki.company.com/runbooks/model-accuracy",
        labels=dict(team="ml-ops", service="prediction-api"),
        annotations=dict(
            summary="Model accuracy degraded",
            description="Current accuracy: {{ $value }}",
        ),
    )

    latency_rule = AlertRule(
        rule_id="model_latency_high",
        name="High Model Latency",
        condition=latency_condition,
        severity="medium",
        description="Model latency P99 exceeds 1 second",
        runbook_url="https://wiki.company.com/runbooks/model-latency",
        labels=dict(team="ml-ops", service="prediction-api"),
        annotations=dict(
            summary="High latency detected",
            description="P99 latency: {{ $value }}ms",
        ),
    )

    drift_rule = AlertRule(
        rule_id="data_drift_detected",
        name="Significant Data Drift",
        condition=drift_condition,
        severity="high",
        description="Data drift detected with PSI > 0.2",
        runbook_url="https://wiki.company.com/runbooks/data-drift",
        labels=dict(team="data-science", service="feature-pipeline"),
        annotations=dict(
            summary="Data drift detected",
            description="PSI score: {{ $value }}",
        ),
    )

    return [accuracy_rule, latency_rule, drift_rule]

Escalation Policies

Escalation Manager

from enum import Enum
from dataclasses import dataclass
from typing import List, Dict
import asyncio

class EscalationLevel(Enum):
    """Escalation tiers, from the first responder (L1) up to management (L4).

    NOTE(review): nothing else in this section references this enum; the
    level dicts read by EscalationManager carry their own "level" value.
    """
    L1 = 1  # On-call engineer
    L2 = 2  # Senior engineer
    L3 = 3  # Team lead
    L4 = 4  # Management

@dataclass
class EscalationPolicy:
    """Ordered list of who to notify and how long to wait between levels."""
    # Key under which EscalationManager.add_policy stores this policy.
    policy_id: str
    name: str
    # One dict per escalation level, in escalation order. EscalationManager
    # reads the "level", "recipients" and "channel" keys of each entry.
    levels: List[Dict]
    # Minutes EscalationManager waits for an acknowledgement before it
    # notifies the next level.
    timeout_minutes: int

class EscalationManager:
    """Notify successive escalation levels until an alert is acknowledged.

    Attributes:
        policies: Registered policies keyed by ``policy_id``.
        active_escalations: Escalation state dicts keyed by
            ``escalation_id`` (``"esc_" + alert_id``).

    NOTE: the channel senders ``_send_email``, ``_send_slack`` and
    ``_send_pagerduty`` are not defined in this section; they must be
    supplied (for example by a subclass) before a notification for that
    channel can be dispatched.
    """

    def __init__(self):
        self.policies = {}
        self.active_escalations = {}

    def add_policy(self, policy: "EscalationPolicy"):
        """Register ``policy``, replacing any policy with the same id."""
        self.policies[policy.policy_id] = policy

    def start_escalation(self, alert: Dict, policy_id: str):
        """Notify the first level and start the escalation timer.

        Must be called while an asyncio event loop is running, because the
        timer runs as a task on that loop.

        Args:
            alert: Alert dict; its ``"alert_id"`` names the escalation.
            policy_id: Id of a policy registered with ``add_policy``.

        Raises:
            KeyError: Unknown ``policy_id`` or alert without ``"alert_id"``.
            IndexError: The policy has no levels.
            RuntimeError: No event loop is running.
        """
        policy = self.policies[policy_id]

        # Resolve everything that can fail before touching any state, so a
        # failure never leaves a registered escalation without a timer.
        first_level = policy.levels[0]
        loop = asyncio.get_running_loop()
        escalation_id = f"esc_{alert['alert_id']}"

        escalation = {
            "escalation_id": escalation_id,
            "alert": alert,
            "policy": policy,
            "current_level": 0,
            "started_at": datetime.now(),
            "notifications_sent": [],
            "acknowledged": False
        }

        self.active_escalations[escalation_id] = escalation

        # Send initial notification
        self._send_notification(escalation, first_level)

        # The event loop keeps only a weak reference to its tasks; store the
        # task so it cannot be garbage-collected while it is still sleeping.
        escalation["timer_task"] = loop.create_task(
            self._escalation_timer(escalation)
        )

    async def _escalation_timer(self, escalation: Dict):
        """Notify the next level after each unacknowledged timeout.

        Stops once the escalation is acknowledged or every level of the
        policy has been notified.
        """
        policy = escalation["policy"]

        while not escalation["acknowledged"]:
            await asyncio.sleep(policy.timeout_minutes * 60)

            # The acknowledgement may have arrived during the sleep.
            if escalation["acknowledged"]:
                break

            escalation["current_level"] += 1
            if escalation["current_level"] >= len(policy.levels):
                # All levels exhausted
                break

            self._send_notification(
                escalation,
                policy.levels[escalation["current_level"]]
            )

    def _send_notification(self, escalation: Dict, level: Dict):
        """Record a notification for ``level`` on the escalation and dispatch it."""
        notification = {
            "escalation_id": escalation["escalation_id"],
            "level": level["level"],
            "recipients": level["recipients"],
            "channel": level["channel"],
            "timestamp": datetime.now().isoformat()
        }

        escalation["notifications_sent"].append(notification)

        # Send notification (implementation depends on channel)
        self._dispatch_notification(notification)

    def _dispatch_notification(self, notification: Dict):
        """Hand ``notification`` to the sender for its channel.

        An unrecognised channel is logged as a warning instead of being
        dropped silently; it does not raise.
        """
        channel = notification["channel"]

        if channel == "email":
            self._send_email(notification)
        elif channel == "slack":
            self._send_slack(notification)
        elif channel == "pagerduty":
            self._send_pagerduty(notification)
        else:
            logging.getLogger(__name__).warning(
                "No sender for notification channel %r; escalation %s not delivered",
                channel,
                notification.get("escalation_id"),
            )

    def acknowledge_alert(self, escalation_id: str, acknowledger: str):
        """Mark the escalation acknowledged so the timer stops escalating.

        Unknown escalation ids are ignored.
        """
        escalation = self.active_escalations.get(escalation_id)
        if escalation is None:
            return

        escalation["acknowledged"] = True
        escalation["acknowledged_by"] = acknowledger
        escalation["acknowledged_at"] = datetime.now().isoformat()

Runbooks

Runbook Management

from dataclasses import dataclass
from typing import List, Dict, Optional
import yaml

@dataclass
class RunbookStep:
    """One step of a runbook.

    ``troubleshooting`` and ``automation_script`` default to ``None`` so a
    step mapping that omits them (a purely manual step) can be loaded with
    ``RunbookStep(**step)``.
    """
    step_id: str
    description: str
    # What the responder should do for this step.
    action: str
    expected_result: str
    # Optional hint for when the expected result is not observed.
    troubleshooting: Optional[str] = None
    # Optional script text; when absent the step is treated as manual.
    automation_script: Optional[str] = None

@dataclass
class Runbook:
    """A titled, ordered procedure for handling one alert rule."""
    # Key under which RunbookManager stores this runbook.
    runbook_id: str
    title: str
    description: str
    # Id of the alert rule this runbook handles; RunbookManager matches on
    # it in get_runbook_for_alert.
    alert_rule_id: str
    # Steps in execution order.
    steps: List[RunbookStep]
    escalation_contacts: List[str]
    # Free-text estimate, e.g. "30-60 minutes".
    estimated_time: str
    prerequisites: List[str]

class RunbookManager:
    """Load runbooks, look them up by alert rule, and run their steps.

    Attributes:
        runbooks: Loaded runbooks keyed by ``runbook_id``.

    NOTE: ``_run_automation_script`` is not defined in this section. Until
    it is supplied, automated steps fail with the resulting error message.
    NOTE(security): automation scripts come straight from the runbook file;
    only load runbooks from trusted sources.
    """

    def __init__(self):
        self.runbooks = {}

    def load_runbook(self, filepath: str):
        """Load a runbook from a YAML file and register it.

        Args:
            filepath: Path to a YAML document with ``id``, ``title``,
                ``description``, ``alert_rule_id`` and ``steps``; the keys
                ``escalation_contacts``, ``estimated_time`` and
                ``prerequisites`` are optional.

        Returns:
            The loaded ``Runbook``.

        Raises:
            OSError: The file cannot be opened.
            KeyError: A required top-level key is missing.
        """
        # Explicit UTF-8 so non-ASCII runbooks load the same on every
        # platform instead of depending on the locale's default encoding.
        with open(filepath, 'r', encoding="utf-8") as f:
            runbook_data = yaml.safe_load(f)

        runbook = Runbook(
            runbook_id=runbook_data["id"],
            title=runbook_data["title"],
            description=runbook_data["description"],
            alert_rule_id=runbook_data["alert_rule_id"],
            steps=[RunbookStep(**step) for step in runbook_data["steps"]],
            escalation_contacts=runbook_data.get("escalation_contacts", []),
            estimated_time=runbook_data.get("estimated_time", "Unknown"),
            prerequisites=runbook_data.get("prerequisites", [])
        )

        self.runbooks[runbook.runbook_id] = runbook
        return runbook

    def get_runbook_for_alert(self, alert_rule_id: str) -> "Optional[Runbook]":
        """Return the first loaded runbook for ``alert_rule_id``, or None."""
        return next(
            (
                runbook
                for runbook in self.runbooks.values()
                if runbook.alert_rule_id == alert_rule_id
            ),
            None,
        )

    def execute_runbook(self, runbook_id: str, context: Dict) -> Dict:
        """Run a runbook's steps in order, stopping at the first failure.

        Returns:
            ``{"success": True, "results": [...]}`` when every step
            succeeds; ``{"success": False, "failed_step": ..., "results":
            [...]}`` at the first failing step; or ``{"success": False,
            "error": "Runbook not found"}`` for an unknown id.
        """
        runbook = self.runbooks.get(runbook_id)
        if runbook is None:
            return {"success": False, "error": "Runbook not found"}

        results = []
        for step in runbook.steps:
            step_result = self._execute_step(step, context)
            results.append(step_result)

            # A result that does not report "success" counts as a failure
            # rather than crashing the whole run with a KeyError.
            if not step_result.get("success", False):
                return {
                    "success": False,
                    "failed_step": step.step_id,
                    "results": results
                }

        return {"success": True, "results": results}

    def _execute_step(self, step: "RunbookStep", context: Dict) -> Dict:
        """Execute a single step and return its result dict.

        Steps with an ``automation_script`` are passed to
        ``_run_automation_script``; any exception becomes a failure result.
        Steps without one are reported as successful manual steps.
        """
        if not step.automation_script:
            return {
                "success": True,
                "message": f"Manual step: {step.description}",
                "action_required": step.action
            }

        try:
            return self._run_automation_script(step.automation_script, context)
        except Exception as e:
            return {"success": False, "error": str(e)}

Example Runbook YAML

# Example runbook document in the shape RunbookManager.load_runbook reads:
# top-level id/title/description/alert_rule_id plus a list of steps.
id: runbook_model_accuracy
title: "Model Accuracy Degradation Runbook"
description: "Steps to diagnose and resolve model accuracy issues"
# Ties this runbook to the "model_accuracy_low" alert rule.
alert_rule_id: model_accuracy_low
estimated_time: "30-60 minutes"
prerequisites:
  - Access to model monitoring dashboard
  - Access to prediction logs
  - Access to training data

# Steps are listed in the order they should be carried out. Steps 1 and 2
# carry an automation_script; steps 3 to 5 have none.
steps:
  - step_id: step1
    description: "Verify the accuracy drop is real"
    action: "Check monitoring dashboard for accuracy trend"
    expected_result: "Confirm accuracy is below threshold"
    troubleshooting: "If dashboard shows different values, check data pipeline"
    automation_script: |
      #!/bin/bash
      curl -s "http://monitoring.internal/api/metrics/accuracy" | jq '.current_value'

  - step_id: step2
    description: "Check for data drift"
    action: "Review data drift metrics in dashboard"
    expected_result: "Identify if data drift is causing the issue"
    troubleshooting: "If no drift, check for concept drift"
    automation_script: |
      python -c "
      from drift_detector import check_drift
      result = check_drift()
      print(f'Drift detected: {result[\"drift_detected\"]}')
      "

  - step_id: step3
    description: "Review recent predictions"
    action: "Analyze prediction distribution"
    expected_result: "Identify patterns in incorrect predictions"
    troubleshooting: "If patterns found, investigate specific feature changes"

  - step_id: step4
    description: "Check model version"
    action: "Verify correct model version is deployed"
    expected_result: "Confirm expected version is running"
    troubleshooting: "If wrong version, rollback to correct version"

  - step_id: step5
    description: "Retrain model if needed"
    action: "Trigger model retraining pipeline"
    expected_result: "New model trained with improved performance"
    troubleshooting: "If retraining fails, escalate to L2"

Incident Workflow

Incident Manager

from enum import Enum
from dataclasses import dataclass
from typing import List, Dict
from datetime import datetime

class IncidentStatus(Enum):
    """Lifecycle states of an incident.

    IncidentManager sets OPEN when an incident is created and RESOLVED when
    it is resolved; the intermediate states are not set anywhere in this
    section and are presumably applied through update_incident.
    """
    OPEN = "open"
    ACKNOWLEDGED = "acknowledged"
    INVESTIGATING = "investigating"
    IDENTIFIED = "identified"
    MONITORING = "monitoring"
    RESOLVED = "resolved"

@dataclass
class Incident:
    """A tracked incident created from an alert."""
    # Key under which IncidentManager stores the incident.
    incident_id: str
    title: str
    description: str
    severity: str
    status: IncidentStatus
    # The alert dict this incident was created from.
    alert: Dict
    # Runbook attached to the incident when it was created.
    runbook: Runbook
    # Empty string until someone is assigned.
    assignee: str
    # ISO-8601 strings produced with datetime.isoformat().
    created_at: str
    updated_at: str
    # Empty string until the incident is resolved.
    resolution_notes: str

class IncidentManager:
    """Create, update, resolve and summarise incidents.

    Attributes:
        incidents: Open incidents keyed by ``incident_id``.
        incident_history: Resolved incidents, in resolution order.
    """

    def __init__(self):
        self.incidents = {}
        self.incident_history = []

    def create_incident(self, alert: Dict, runbook: "Runbook") -> "Incident":
        """Create and register an OPEN incident from ``alert``.

        Args:
            alert: Alert dict providing ``"name"``, ``"description"`` and
                ``"severity"``.
            runbook: Runbook to attach to the incident.

        Returns:
            The new incident, already stored in ``incidents``.
        """
        # One clock read so the id, created_at and updated_at all agree.
        now = datetime.now()
        timestamp = now.isoformat()

        incident = Incident(
            incident_id=self._new_incident_id(now),
            title=alert["name"],
            description=alert["description"],
            severity=alert["severity"],
            status=IncidentStatus.OPEN,
            alert=alert,
            runbook=runbook,
            assignee="",
            created_at=timestamp,
            updated_at=timestamp,
            resolution_notes=""
        )

        self.incidents[incident.incident_id] = incident
        return incident

    def _new_incident_id(self, now: datetime) -> str:
        """Return an unused ``INC-YYYYMMDD-HHMMSS`` id for ``now``.

        The id has one-second resolution, so incidents created within the
        same second would otherwise overwrite each other in ``incidents``;
        a ``-N`` suffix is appended only when the plain id is taken.
        """
        base = f"INC-{now.strftime('%Y%m%d-%H%M%S')}"
        taken = set(self.incidents)
        taken.update(past.incident_id for past in self.incident_history)

        if base not in taken:
            return base

        suffix = 1
        while f"{base}-{suffix}" in taken:
            suffix += 1
        return f"{base}-{suffix}"

    def update_incident(self, incident_id: str, updates: Dict):
        """Apply ``updates`` to an open incident and refresh ``updated_at``.

        Keys that are not attributes of the incident are ignored, as is an
        unknown ``incident_id``.
        """
        incident = self.incidents.get(incident_id)
        if incident is None:
            return

        for key, value in updates.items():
            if hasattr(incident, key):
                setattr(incident, key, value)

        incident.updated_at = datetime.now().isoformat()

    def resolve_incident(self, incident_id: str, resolution_notes: str):
        """Mark an incident RESOLVED and move it to ``incident_history``.

        Raises:
            KeyError: ``incident_id`` is not an open incident.
        """
        self.update_incident(incident_id, {
            "status": IncidentStatus.RESOLVED,
            "resolution_notes": resolution_notes
        })

        # Move to history
        incident = self.incidents.pop(incident_id)
        self.incident_history.append(incident)

    def get_incident_metrics(self) -> Dict:
        """Summarise open incidents, MTTR and the last 24 hours."""
        open_incidents = [
            incident
            for incident in self.incidents.values()
            if incident.status != IncidentStatus.RESOLVED
        ]

        return {
            "open_incidents": len(open_incidents),
            "by_severity": self._count_by_severity(open_incidents),
            "mean_time_to_resolve": self._calculate_mttr(),
            "incidents_last_24h": self._count_recent_incidents(24)
        }

    def _count_by_severity(self, incidents: "List[Incident]") -> Dict:
        """Return a ``{severity: count}`` mapping for ``incidents``."""
        counts = {}
        for incident in incidents:
            counts[incident.severity] = counts.get(incident.severity, 0) + 1
        return counts

    def _calculate_mttr(self) -> float:
        """Return the mean time to resolve, in seconds.

        Resolution time is ``updated_at - created_at``: ``resolve_incident``
        refreshes ``updated_at`` immediately before moving the incident to
        the history, so for a resolved incident it is the resolution time.
        Returns 0.0 when nothing has been resolved yet.
        """
        if not self.incident_history:
            return 0.0

        total_seconds = 0.0
        for incident in self.incident_history:
            created = datetime.fromisoformat(incident.created_at)
            resolved = datetime.fromisoformat(incident.updated_at)
            total_seconds += (resolved - created).total_seconds()

        return total_seconds / len(self.incident_history)

    def _count_recent_incidents(self, hours: int) -> int:
        """Count incidents created within the last ``hours`` hours.

        Both open and resolved incidents are counted; an incident that is
        still open is no less recent than one that was already resolved.
        """
        cutoff = datetime.now().timestamp() - (hours * 3600)
        everything = list(self.incidents.values()) + list(self.incident_history)

        return sum(
            1
            for incident in everything
            if datetime.fromisoformat(incident.created_at).timestamp() > cutoff
        )

Mathematical Foundation

Mean Time to Detect (MTTD)

MTTD

$$MTTD = \frac{1}{N}\sum_{i=1}^{N}\left(t_{detect_i} - t_{occur_i}\right)$$

Where:

  • \( t_{detect_i} \) is the detection time of incident \( i \)
  • \( t_{occur_i} \) is the occurrence time of incident \( i \)
  • \( N \) is the number of incidents

Mean Time to Resolve (MTTR)

MTTR

$$MTTR = \frac{1}{N}\sum_{i=1}^{N}\left(t_{resolve_i} - t_{detect_i}\right)$$

Alert Fatigue Score

Alert Fatigue Score

$$AFS = \frac{\text{False Positives}}{\text{Total Alerts}} \times 100$$

Best Practices

1. Alert Grouping

class AlertGrouper:
    """Group related alerts by name, service and team.

    NOTE(review): rules stored with ``add_grouping_rule`` are not consulted
    by ``group_alerts``; the group key is always name|service|team.
    """

    def __init__(self):
        self.grouping_rules = {}

    def add_grouping_rule(self, rule_name, group_by_keys):
        """Store ``group_by_keys`` under ``rule_name``."""
        self.grouping_rules[rule_name] = group_by_keys

    def group_alerts(self, alerts):
        """Group alerts that share a name, service and team.

        Args:
            alerts: Iterable of alert dicts, each with a ``"timestamp"``
                whose values can be compared with one another (for example
                ISO-8601 strings of one format).

        Returns:
            Dict mapping group key -> ``{"alerts", "first_seen",
            "last_seen", "count"}``. ``first_seen``/``last_seen`` are the
            earliest and latest timestamps in the group, whatever order the
            alerts arrive in.
        """
        groups = {}

        for alert in alerts:
            group_key = self._generate_group_key(alert)
            seen_at = alert["timestamp"]

            group = groups.get(group_key)
            if group is None:
                group = groups[group_key] = {
                    "alerts": [],
                    "first_seen": seen_at,
                    "last_seen": seen_at,
                    "count": 0
                }

            group["alerts"].append(alert)
            # min/max rather than "first/last processed": the input is not
            # guaranteed to be in chronological order.
            group["first_seen"] = min(group["first_seen"], seen_at)
            group["last_seen"] = max(group["last_seen"], seen_at)
            group["count"] += 1

        return groups

    def _generate_group_key(self, alert):
        """Return the ``name|service|team`` key for ``alert``.

        Missing or ``None`` parts become empty strings and other values are
        converted with ``str`` so the join cannot fail.
        """
        labels = alert.get("labels") or {}
        key_parts = (
            alert.get("name", ""),
            labels.get("service", ""),
            labels.get("team", ""),
        )
        return "|".join("" if part is None else str(part) for part in key_parts)

2. Incident Postmortem

class PostmortemGenerator:
    """Produce postmortem documents from resolved incidents.

    Attributes:
        postmortem_template: Skeleton copied for every generated postmortem.
    """

    def __init__(self):
        self.postmortem_template = {
            "incident_summary": "",
            "timeline": [],
            "root_cause": "",
            "impact": "",
            "resolution": "",
            "action_items": [],
            "lessons_learned": []
        }

    def generate_postmortem(self, incident: "Incident") -> Dict:
        """Generate a postmortem document for ``incident``.

        Args:
            incident: Incident providing ``incident_id``, ``title``,
                ``severity``, ``resolution_notes``, ``created_at`` and
                ``updated_at``.

        Returns:
            A new dict based on ``postmortem_template``. It shares no
            mutable state with the template or with other postmortems, so
            callers can append action items freely.
        """
        # Deep copy: a shallow copy would share the template's lists
        # ("action_items", "lessons_learned") between every postmortem.
        postmortem = copy.deepcopy(self.postmortem_template)

        # Fill in details from incident
        postmortem["incident_summary"] = f"Incident {incident.incident_id}: {incident.title}"
        # NOTE(review): resolution notes populate "root_cause" while
        # "resolution" stays empty - confirm this is the intended mapping.
        postmortem["root_cause"] = incident.resolution_notes
        postmortem["impact"] = f"Severity: {incident.severity}"

        # Generate timeline
        postmortem["timeline"] = [
            {"time": incident.created_at, "event": "Incident created"},
            {"time": incident.updated_at, "event": "Incident updated"}
        ]

        return postmortem

Summary

Effective alerting and incident response systems are critical for maintaining reliable ML operations. By implementing comprehensive alert rules, escalation policies, runbooks, and incident workflows, organizations can quickly detect and resolve issues, minimizing impact on business outcomes.

⭐

Premium Content

Alerting & Incident Response

Unlock this lesson and 900+ advanced tutorials with a Premium plan.

🎯 End-to-end Projects
💼 Interview Prep
📜 Certificates
🤝 Community Access

Already a member? Log in

Need Expert AI Ops & LLM Ops Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement