πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Model Monitoring

AIOps CoreMonitoring Systems🟒 Free Lesson

Advertisement

Model Monitoring

Model monitoring is the continuous process of tracking ML model performance, data quality, and system health to ensure models remain accurate and reliable in production.

Key Monitoring Areas

  • Data Drift: Changes in input data distribution
  • Concept Drift: Changes in relationship between inputs and outputs
  • Performance Monitoring: Model accuracy and latency metrics
  • System Health: Infrastructure and resource utilization
  • Business Metrics: Impact on business outcomes

Monitoring Architecture

Data Drift Detection

Implementation

import numpy as np
import pandas as pd
from scipy import stats
from typing import Dict, List, Tuple

class DataDriftDetector:
    def __init__(self, reference_data, significance_level=0.05):
        self.reference_data = reference_data
        self.significance_level = significance_level
        self.reference_distributions = self._compute_distributions(reference_data)
    
    def _compute_distributions(self, data):
        """Compute reference distributions"""
        distributions = {}
        for column in data.select_dtypes(include=[np.number]).columns:
            distributions[column] = {
                "mean": data[column].mean(),
                "std": data[column].std(),
                "histogram": np.histogram(data[column], bins=50)
            }
        return distributions
    
    def detect_drift(self, current_data) -> Dict[str, Dict]:
        """Detect drift in current data"""
        drift_results = {}
        
        for column in current_data.select_dtypes(include=[np.number]).columns:
            if column in self.reference_distributions:
                # KS test
                ks_statistic, ks_pvalue = stats.ks_2samp(
                    self.reference_data[column].dropna(),
                    current_data[column].dropna()
                )
                
                # Chi-squared test for categorical
                chi2_stat, chi2_pvalue = self._chi_squared_test(
                    self.reference_data[column],
                    current_data[column]
                )
                
                drift_results[column] = {
                    "ks_statistic": ks_statistic,
                    "ks_pvalue": ks_pvalue,
                    "chi2_statistic": chi2_stat,
                    "chi2_pvalue": chi2_pvalue,
                    "drift_detected": ks_pvalue < self.significance_level,
                    "mean_shift": current_data[column].mean() - self.reference_distributions[column]["mean"],
                    "std_ratio": current_data[column].std() / self.reference_distributions[column]["std"]
                }
        
        return drift_results
    
    def _chi_squared_test(self, reference, current):
        """Chi-squared test for distribution comparison"""
        # Bin the data
        bins = np.linspace(
            min(reference.min(), current.min()),
            max(reference.max(), current.max()),
            20
        )
        
        ref_hist, _ = np.histogram(reference.dropna(), bins=bins)
        cur_hist, _ = np.histogram(current.dropna(), bins=bins)
        
        # Avoid division by zero
        ref_hist = ref_hist + 1e-10
        cur_hist = cur_hist + 1e-10
        
        chi2_stat, p_value = stats.chisquare(cur_hist, ref_hist)
        return chi2_stat, p_value

Concept Drift Detection

from collections import deque
import numpy as np

class ConceptDriftDetector:
    def __init__(self, window_size=100, threshold=0.05):
        self.window_size = window_size
        self.threshold = threshold
        self.predictions = deque(maxlen=window_size)
        self.actuals = deque(maxlen=window_size)
    
    def add_observation(self, prediction, actual):
        """Add prediction-actual pair"""
        self.predictions.append(prediction)
        self.actuals.append(actual)
    
    def detect_drift(self):
        """Detect concept drift using Page-Hinkley test"""
        if len(self.predictions) < self.window_size:
            return False, 0.0
        
        # Calculate cumulative error
        errors = [1 if p != a else 0 for p, a in zip(self.predictions, self.actuals)]
        cumulative_error = np.cumsum(errors)
        
        # Page-Hinkley test
        mean_error = np.mean(errors)
        ph_statistic = np.max(cumulative_error - np.arange(1, len(errors) + 1) * mean_error)
        
        drift_detected = ph_statistic > self.threshold * self.window_size
        
        return drift_detected, ph_statistic
    
    def get_drift_score(self):
        """Get current drift score"""
        if len(self.predictions) == 0:
            return 0.0
        
        errors = [1 if p != a else 0 for p, a in zip(self.predictions, self.actuals)]
        return np.mean(errors)

Performance Monitoring

Comprehensive Monitoring

import time
from dataclasses import dataclass
from typing import Dict, List
import numpy as np

@dataclass
class ModelMetrics:
    accuracy: float
    precision: float
    recall: float
    f1_score: float
    latency_p50: float
    latency_p95: float
    latency_p99: float
    throughput: float
    error_rate: float

class ModelPerformanceMonitor:
    def __init__(self):
        self.metrics_history = []
        self.latency_buffer = []
        self.prediction_buffer = []
    
    def record_prediction(self, prediction, actual=None, latency_ms=None):
        """Record a prediction"""
        self.prediction_buffer.append({
            "prediction": prediction,
            "actual": actual,
            "latency": latency_ms,
            "timestamp": time.time()
        })
        
        if latency_ms:
            self.latency_buffer.append(latency_ms)
    
    def calculate_metrics(self) -> ModelMetrics:
        """Calculate current performance metrics"""
        # Filter predictions with actuals
        labeled = [p for p in self.prediction_buffer if p["actual"] is not None]
        
        if not labeled:
            return None
        
        predictions = [p["prediction"] for p in labeled]
        actuals = [p["actual"] for p in labeled]
        
        # Calculate classification metrics
        tp = sum(1 for p, a in zip(predictions, actuals) if p == 1 and a == 1)
        fp = sum(1 for p, a in zip(predictions, actuals) if p == 1 and a == 0)
        fn = sum(1 for p, a in zip(predictions, actuals) if p == 0 and a == 1)
        tn = sum(1 for p, a in zip(predictions, actuals) if p == 0 and a == 0)
        
        accuracy = (tp + tn) / (tp + fp + fn + tn) if (tp + fp + fn + tn) > 0 else 0
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0
        f1_score = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
        
        # Calculate latency metrics
        if self.latency_buffer:
            latency_p50 = np.percentile(self.latency_buffer, 50)
            latency_p95 = np.percentile(self.latency_buffer, 95)
            latency_p99 = np.percentile(self.latency_buffer, 99)
        else:
            latency_p50 = latency_p95 = latency_p99 = 0
        
        # Calculate throughput
        if len(self.prediction_buffer) > 1:
            time_window = self.prediction_buffer[-1]["timestamp"] - self.prediction_buffer[0]["timestamp"]
            throughput = len(self.prediction_buffer) / time_window if time_window > 0 else 0
        else:
            throughput = 0
        
        # Calculate error rate
        errors = sum(1 for p in self.prediction_buffer if p["actual"] is not None and p["prediction"] != p["actual"])
        error_rate = errors / len(self.prediction_buffer) if self.prediction_buffer else 0
        
        metrics = ModelMetrics(
            accuracy=accuracy,
            precision=precision,
            recall=recall,
            f1_score=f1_score,
            latency_p50=latency_p50,
            latency_p95=latency_p95,
            latency_p99=latency_p99,
            throughput=throughput,
            error_rate=error_rate
        )
        
        self.metrics_history.append(metrics)
        return metrics
    
    def detect_anomalies(self, window_size=100):
        """Detect performance anomalies"""
        if len(self.metrics_history) < window_size:
            return []
        
        recent_metrics = self.metrics_history[-window_size:]
        anomalies = []
        
        # Check accuracy drop
        accuracies = [m.accuracy for m in recent_metrics]
        if len(accuracies) > 1:
            mean_acc = np.mean(accuracies[:-1])
            std_acc = np.std(accuracies[:-1])
            current_acc = accuracies[-1]
            
            if current_acc < mean_acc - 2 * std_acc:
                anomalies.append({
                    "type": "accuracy_drop",
                    "severity": "high",
                    "current": current_acc,
                    "expected": mean_acc,
                    "deviation": (mean_acc - current_acc) / std_acc
                })
        
        # Check latency increase
        latencies = [m.latency_p95 for m in recent_metrics]
        if len(latencies) > 1:
            mean_lat = np.mean(latencies[:-1])
            std_lat = np.std(latencies[:-1])
            current_lat = latencies[-1]
            
            if current_lat > mean_lat + 3 * std_lat:
                anomalies.append({
                    "type": "latency_increase",
                    "severity": "medium",
                    "current": current_lat,
                    "expected": mean_lat,
                    "deviation": (current_lat - mean_lat) / std_lat
                })
        
        return anomalies

Mathematical Foundation

Population Stability Index (PSI)

Population Stability Index

PSI=βˆ‘i=1N(Piβˆ’Qi)β‹…ln⁑(PiQi)PSI = \sum_{i=1}^{N}(P_i - Q_i) \cdot \ln\left(\frac{P_i}{Q_i}\right)

Where:

  • ( P_i ) is the proportion of observations in bin ( i ) for the reference distribution
  • ( Q_i ) is the proportion of observations in bin ( i ) for the current distribution
  • ( N ) is the number of bins

Kolmogorov-Smirnov Statistic

KS Statistic

DKS=sup⁑x∣Fref(x)βˆ’Fcurr(x)∣D_{KS} = \sup_x |F_{ref}(x) - F_{curr}(x)|

Where:

  • ( F_{ref}(x) ) is the reference distribution function
  • ( F_{curr}(x) ) is the current distribution function

Alert Threshold Calculation

Dynamic thresholds based on historical variance:

Dynamic Threshold

T=ΞΌ+kβ‹…ΟƒT = \mu + k \cdot \sigma

Where:

  • ( \mu ) is the historical mean
  • ( \sigma ) is the historical standard deviation
  • ( k ) is the sensitivity parameter (typically 2-3)

Monitoring Dashboard

class MonitoringDashboard:
    def __init__(self, monitor):
        self.monitor = monitor
    
    def generate_report(self):
        """Generate comprehensive monitoring report"""
        metrics = self.monitor.calculate_metrics()
        drift_results = self.monitor.detect_drift()
        anomalies = self.monitor.detect_anomalies()
        
        report = {
            "timestamp": time.time(),
            "model_performance": {
                "accuracy": metrics.accuracy,
                "precision": metrics.precision,
                "recall": metrics.recall,
                "f1_score": metrics.f1_score
            },
            "latency_metrics": {
                "p50": metrics.latency_p50,
                "p95": metrics.latency_p95,
                "p99": metrics.latency_p99,
                "throughput": metrics.throughput
            },
            "data_quality": {
                "drift_detected": drift_results.get("drift_detected", False),
                "drift_score": drift_results.get("drift_score", 0.0)
            },
            "anomalies": anomalies,
            "health_status": self._calculate_health_status(metrics, drift_results, anomalies)
        }
        
        return report
    
    def _calculate_health_status(self, metrics, drift_results, anomalies):
        """Calculate overall health status"""
        if anomalies:
            return "CRITICAL"
        elif drift_results.get("drift_detected", False):
            return "WARNING"
        elif metrics.accuracy < 0.8:
            return "DEGRADED"
        else:
            return "HEALTHY"

Best Practices

1. Comprehensive Logging

import logging
import json

class ModelLogger:
    def __init__(self, log_file):
        self.logger = logging.getLogger("model_monitor")
        handler = logging.FileHandler(log_file)
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)
    
    def log_prediction(self, request_id, prediction, features, latency):
        """Log prediction details"""
        log_entry = {
            "request_id": request_id,
            "prediction": prediction,
            "features": features,
            "latency_ms": latency,
            "timestamp": time.time()
        }
        self.logger.info(json.dumps(log_entry))

2. Automated Alerting

class AlertManager:
    def __init__(self, thresholds):
        self.thresholds = thresholds
    
    def check_alerts(self, metrics):
        """Check if metrics exceed thresholds"""
        alerts = []
        
        if metrics.accuracy < self.thresholds["accuracy_min"]:
            alerts.append({
                "type": "accuracy",
                "severity": "high",
                "message": f"Accuracy {metrics.accuracy} below threshold {self.thresholds['accuracy_min']}"
            })
        
        if metrics.latency_p99 > self.thresholds["latency_max"]:
            alerts.append({
                "type": "latency",
                "severity": "medium",
                "message": f"Latency P99 {metrics.latency_p99}ms above threshold {self.thresholds['latency_max']}ms"
            })
        
        return alerts

3. Model Retraining Triggers

class RetrainingTrigger:
    def __init__(self, monitor):
        self.monitor = monitor
    
    def should_retrain(self):
        """Determine if model should be retrained"""
        metrics = self.monitor.calculate_metrics()
        drift_results = self.monitor.detect_drift()
        anomalies = self.monitor.detect_anomalies()
        
        # Retrain if significant drift or performance degradation
        if drift_results.get("drift_detected", False):
            return True, "data_drift"
        
        if metrics and metrics.accuracy < 0.75:
            return True, "performance_degradation"
        
        if anomalies:
            return True, "anomaly_detected"
        
        return False, None

Summary

Model monitoring is essential for maintaining reliable ML systems in production. By implementing comprehensive monitoring for data drift, concept drift, and performance metrics, organizations can detect issues early and ensure model accuracy and reliability over time.

⭐

Premium Content

Model Monitoring

Unlock this lesson and 900+ advanced tutorials with a Premium plan.

🎯End-to-end Projects
πŸ’ΌInterview Prep
πŸ“œCertificates
🀝Community Access

Already a member? Log in

Need Expert AI Ops & LLM Ops Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement