Model Monitoring & Drift Detection
A model that worked perfectly in staging can silently fail in production. Learn to detect when your model is degrading and implement safe deployment strategies.
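Types of Model Degradation
This guide covers two kinds of degradation: data drift, where the distribution of the input features shifts away from the training data, and concept drift, where the relationship between features and target changes.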
Statistical Drift Detection
import numpy as np
from scipy import stats
from dataclasses import dataclass
from typing import List, Optional
from datetime import datetime
@dataclass
class DriftResult:
    """Outcome of one drift test applied to a single feature."""

    # Label of the tested feature, e.g. "feature_3".
    feature: str
    # Test statistic reported by the drift test.
    statistic: float
    # p-value that accompanies the statistic.
    p_value: float
    # True when the test flagged the feature as drifted.
    drifted: bool
    # Name of the test that produced this result, e.g. "ks_test".
    method: str
class DriftDetector:
    """Compare new feature data against a fixed reference sample.

    Args:
        reference_data: 2-D array indexed as ``[:, feature_idx]``; one
            column per feature.
        significance_level: p-value cutoff below which ``ks_test`` marks a
            feature as drifted.
    """

    def __init__(self, reference_data, significance_level=0.05):
        self.reference = reference_data
        self.alpha = significance_level

    def ks_test(self, current_data: np.ndarray, feature_idx: int) -> "DriftResult":
        """Run a two-sample Kolmogorov-Smirnov test on one feature column.

        Args:
            current_data: 2-D array with the same column layout as the
                reference data.
            feature_idx: Column to compare.

        Returns:
            A ``DriftResult`` whose ``drifted`` flag is set when the p-value
            is below the configured significance level.
        """
        stat, p_value = stats.ks_2samp(
            self.reference[:, feature_idx],
            current_data[:, feature_idx],
        )
        return DriftResult(
            feature=f"feature_{feature_idx}",
            statistic=float(stat),
            p_value=float(p_value),
            drifted=bool(p_value < self.alpha),
            method="ks_test",
        )

    def psi(self, expected, actual, buckets=10) -> float:
        """Population Stability Index between two 1-D samples.

        Both samples are binned on the SAME edges, spanning their combined
        range. Binning each sample on its own range would compare unrelated
        intervals and report ~0 for a pure shift or rescale.

        Args:
            expected: Baseline sample.
            actual: Sample to compare against the baseline.
            buckets: Number of equal-width bins (or any ``bins`` value
                accepted by ``numpy.histogram_bin_edges``).

        Returns:
            The PSI value; 0 means the binned distributions are identical.

        Raises:
            ValueError: If either sample is empty.
        """
        expected = np.asarray(expected, dtype=float).ravel()
        actual = np.asarray(actual, dtype=float).ravel()
        if expected.size == 0 or actual.size == 0:
            raise ValueError("psi() requires non-empty expected and actual samples")

        edges = np.histogram_bin_edges(
            np.concatenate((expected, actual)), bins=buckets
        )
        # The 1e-6 offset keeps empty bins from producing log(0) or a
        # division by zero.
        expected_pct = np.histogram(expected, bins=edges)[0] / expected.size + 1e-6
        actual_pct = np.histogram(actual, bins=edges)[0] / actual.size + 1e-6
        return float(
            np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct))
        )

    def wasserstein_distance(self, current_data: np.ndarray, feature_idx: int) -> float:
        """Return the Wasserstein distance for one feature column."""
        return float(
            stats.wasserstein_distance(
                self.reference[:, feature_idx],
                current_data[:, feature_idx],
            )
        )

    def detect_all(self, current_data: np.ndarray) -> List["DriftResult"]:
        """Run ``ks_test`` on every column of ``current_data``."""
        return [self.ks_test(current_data, i) for i in range(current_data.shape[1])]

    def should_alert(self, results: List["DriftResult"], threshold: float = 0.3) -> bool:
        """Decide whether enough features drifted to raise an alert.

        Args:
            results: Per-feature drift results.
            threshold: Fraction of features that must be exceeded.

        Returns:
            True when strictly more than ``threshold`` of the results are
            flagged as drifted (more than 30% by default). An empty result
            list never alerts.
        """
        drifted_count = sum(1 for r in results if r.drifted)
        return drifted_count > len(results) * threshold
Concept Drift Detection
Concept drift means the relationship between features and target changes.
from sklearn.linear_model import SGDClassifier
from collections import deque
import numpy as np
class ADWIN:
    """Simplified adaptive-windowing detector for concept drift.

    Every observed value is kept in ``window``. Once the window holds more
    than 10 values, each update looks for a split point where the means of
    the older and newer parts differ by more than a bound derived from
    ``delta``. When such a split exists, the older part is dropped.

    Note: the window is unbounded until drift is detected, so each check
    costs time linear in the window length.

    Args:
        delta: Confidence parameter used in the cut threshold; smaller
            values make the threshold larger.
    """

    def __init__(self, delta=0.002):
        self.delta = delta
        self.window = deque()

    def update(self, value):
        """Add ``value`` to the window and report whether drift was detected.

        Returns:
            True if this update caused the window to be cut, else False.
        """
        self.window.append(value)
        if len(self.window) > 10:
            return self._check_drift()
        return False

    def _check_drift(self):
        """Cut the window at the first split whose halves differ significantly.

        Returns:
            True if a cut was made (``window`` then holds only the newer
            part), otherwise False.
        """
        snapshot = list(self.window)
        n = len(snapshot)
        if n < 2:
            return False

        total = sum(snapshot)
        # This factor depends only on n, so compute it once per check.
        log_term = 2 * np.log(2 * n / self.delta)

        # A running prefix sum yields both means in O(1) per split point.
        left_sum = 0.0
        for n_left in range(1, n):
            left_sum += snapshot[n_left - 1]
            n_right = n - n_left
            mean_left = left_sum / n_left
            mean_right = (total - left_sum) / n_right
            epsilon = np.sqrt(
                (1.0 / (2 * n_left) + 1.0 / (2 * n_right)) * log_term
            )
            if abs(mean_left - mean_right) > epsilon:
                # Drift detected: keep only the newer part of the window.
                self.window = deque(snapshot[n_left:])
                return True
        return False
class ConceptDriftDetector:
    """Track recent predictions against actual labels and feed errors to ADWIN.

    Args:
        window_size: Maximum number of recent prediction/actual pairs kept
            for ``get_error_rate``.
    """

    def __init__(self, window_size=1000):
        self.adwin = ADWIN()
        self.window_size = window_size
        # Bounded histories: the oldest entries fall off once full.
        self.actuals = deque(maxlen=window_size)
        self.predictions = deque(maxlen=window_size)

    def update(self, prediction, actual):
        """Record one prediction/actual pair.

        The pair is converted to an error indicator (1.0 for a mismatch,
        0.0 for a match) and passed to the ADWIN detector.

        Returns:
            Whatever the ADWIN detector's ``update`` call returns.
        """
        if prediction != actual:
            error = 1.0
        else:
            error = 0.0
        self.predictions.append(prediction)
        self.actuals.append(actual)
        return self.adwin.update(error)

    def get_error_rate(self):
        """Return the fraction of stored pairs that mismatch (0.0 if none stored)."""
        total = len(self.actuals)
        if total == 0:
            return 0.0
        mismatches = 0
        for predicted, observed in zip(self.predictions, self.actuals):
            mismatches += predicted != observed
        return mismatches / total
class OnlineLearningDetector:
    """Buffer incoming samples and train a model incrementally in batches.

    Args:
        model: Estimator exposing ``partial_fit(X, y, classes=...)``. When
            omitted, an ``SGDClassifier(loss='log_loss')`` is created.
        classes: Complete set of target labels. When omitted, the labels
            seen in the first flushed batch are used, so that batch must
            contain every class the model will ever see.
        batch_size: Number of buffered samples that triggers a fit.
    """

    def __init__(self, model=None, classes=None, batch_size=100):
        # Use an explicit None check: `model or ...` would call the model's
        # __bool__/__len__, which can misfire or raise on some estimators.
        self.model = model if model is not None else SGDClassifier(loss='log_loss')
        self.batch_X = []
        self.batch_y = []
        self.batch_size = batch_size
        self._classes = None if classes is None else np.asarray(classes)

    def partial_fit(self, X, y):
        """Buffer ``X``/``y`` and fit the model once a full batch is available.

        The same ``classes`` array is passed on every fit. Recomputing it
        from each batch breaks incremental estimators as soon as a batch
        lacks one of the labels seen earlier.
        """
        self.batch_X.extend(X)
        self.batch_y.extend(y)
        if len(self.batch_X) < self.batch_size:
            return

        if self._classes is None:
            self._classes = np.unique(self.batch_y)
        self.model.partial_fit(
            np.array(self.batch_X),
            np.array(self.batch_y),
            classes=self._classes,
        )
        self.batch_X = []
        self.batch_y = []
Shadow Deployments
Run the new model alongside production without affecting users.
import time
import json
from datetime import datetime
from typing import Dict, Any
class ShadowDeployment:
    """Serve the production model while evaluating a candidate in its shadow.

    Every request is scored by both models. Only the production prediction
    is returned; the candidate's latency, predictions and failures are
    recorded in ``metrics`` for later comparison.

    NOTE: the candidate runs synchronously inside ``predict``, so its
    latency still adds to the caller's wall-clock time.

    Args:
        production_model: Object with a ``predict(features)`` method whose
            result is returned to the caller.
        candidate_model: Object with a ``predict(features)`` method whose
            result is only recorded.
    """

    def __init__(self, production_model, candidate_model):
        self.production = production_model
        self.candidate = candidate_model
        self.metrics = {
            "production": {"latencies": [], "predictions": []},
            "candidate": {"latencies": [], "predictions": [], "errors": []},
        }
        # Production predictions for the requests where the candidate also
        # succeeded; kept parallel to metrics["candidate"]["predictions"].
        self._paired_production = []

    def predict(self, features):
        """Return the production prediction and record shadow metrics.

        A candidate failure is recorded under
        ``metrics["candidate"]["errors"]`` and never reaches the caller.
        Exceptions from the production model propagate unchanged.
        """
        # perf_counter is monotonic, so latencies cannot be skewed by
        # system clock adjustments.
        start = time.perf_counter()
        prod_pred = self.production.predict(features)
        prod_latency = (time.perf_counter() - start) * 1000
        self.metrics["production"]["latencies"].append(prod_latency)
        self.metrics["production"]["predictions"].append(prod_pred)

        start = time.perf_counter()
        try:
            cand_pred = self.candidate.predict(features)
        except Exception as exc:  # shadow traffic must never break serving
            self.metrics["candidate"]["errors"].append(repr(exc))
            return prod_pred
        cand_latency = (time.perf_counter() - start) * 1000

        self.metrics["candidate"]["latencies"].append(cand_latency)
        self.metrics["candidate"]["predictions"].append(cand_pred)
        self._paired_production.append(prod_pred)
        return prod_pred  # Only production goes to user

    @staticmethod
    def _percentile(values, q):
        """Return the q-th percentile of ``values``, or NaN when empty."""
        if not values:
            return float("nan")
        return float(np.percentile(values, q))

    def get_comparison(self):
        """Summarise latency (milliseconds) and prediction agreement.

        Returns:
            Dict with p50/p99 latencies for both models, the share of
            predictions that match on requests where the candidate
            succeeded, and the number of candidate failures. Entries with
            no data are NaN.
        """
        prod = self.metrics["production"]
        cand = self.metrics["candidate"]

        if cand["predictions"]:
            agreement = float(
                np.mean(
                    np.array(self._paired_production)
                    == np.array(cand["predictions"])
                )
            )
        else:
            agreement = float("nan")

        return {
            "production_latency_p50": self._percentile(prod["latencies"], 50),
            "production_latency_p99": self._percentile(prod["latencies"], 99),
            "candidate_latency_p50": self._percentile(cand["latencies"], 50),
            "candidate_latency_p99": self._percentile(cand["latencies"], 99),
            "prediction_agreement": agreement,
            "candidate_error_count": len(cand["errors"]),
        }
Canary Releases
Gradually shift traffic to the new model.
import random
from dataclasses import dataclass
@dataclass
class CanaryConfig:
    """Tunable settings for a canary rollout."""

    # Share of traffic sent to the canary at the start (fraction of 1.0).
    initial_traffic_pct: float = 0.05
    # Amount added to the canary's traffic share on each promotion step.
    increment_pct: float = 0.05
    # Upper bound for the canary's traffic share.
    max_traffic_pct: float = 1.0
    # Intended length of one evaluation window, in hours.
    evaluation_window_hours: float = 24
    # Number of canary outcomes required before a decision is made.
    min_samples: int = 1000
    # Roll back when the canary's success rate falls more than this far
    # (absolute difference) below production's.
    rollback_threshold: float = 0.05
class CanaryRelease:
    """Route a growing share of traffic to a canary model.

    Args:
        production_model: Model serving the bulk of the traffic.
        canary_model: Model under evaluation.
        config: Rollout settings (traffic shares, sample size, rollback
            threshold).
    """

    def __init__(self, production_model, canary_model, config: "CanaryConfig"):
        self.production = production_model
        self.canary = canary_model
        self.config = config
        self.current_traffic_pct = config.initial_traffic_pct
        self.production_metrics = []
        self.canary_metrics = []

    def route_request(self, request_id):
        """Pick the model for one request.

        Routing is a fresh random draw per call; ``request_id`` is not
        used, so the same id may be routed differently on repeated calls.

        Returns:
            ``("canary", canary_model)`` with probability
            ``current_traffic_pct``, otherwise
            ``("production", production_model)``.
        """
        if random.random() < self.current_traffic_pct:
            return "canary", self.canary
        return "production", self.production

    def record_outcome(self, variant, prediction, actual):
        """Record whether ``prediction`` matched ``actual`` for a variant.

        Raises:
            ValueError: If ``variant`` is neither "production" nor "canary".
                Unknown labels used to be counted as canary outcomes, which
                silently skewed the comparison.
        """
        correct = prediction == actual
        if variant == "production":
            self.production_metrics.append(correct)
        elif variant == "canary":
            self.canary_metrics.append(correct)
        else:
            raise ValueError(f"unknown variant: {variant!r}")

    def should_promote(self):
        """Evaluate the canary and adjust its traffic share.

        Compares the success rate over the last ``min_samples`` outcomes of
        each variant. Each call that finds the canary at least as good as
        production advances traffic by one increment, so call this once
        per evaluation window.

        Returns:
            None: fewer than ``min_samples`` canary outcomes, or no
                production outcomes to compare against.
            "rollback": canary is worse than production by more than
                ``rollback_threshold``; canary traffic is set to 0.
            "promote": canary is at least as good as production and its
                traffic share was increased.
            "maintain": otherwise (including a canary already at
                ``max_traffic_pct``).
        """
        window = self.config.min_samples
        # Without a production baseline the mean would be NaN and every
        # comparison below would be meaningless.
        if len(self.canary_metrics) < window or not self.production_metrics:
            return None

        prod_rate = np.mean(self.production_metrics[-window:])
        canary_rate = np.mean(self.canary_metrics[-window:])

        if canary_rate < prod_rate - self.config.rollback_threshold:
            # Stop exposing users to the regressed model immediately.
            self.current_traffic_pct = 0.0
            return "rollback"

        if (
            canary_rate >= prod_rate
            and self.current_traffic_pct < self.config.max_traffic_pct
        ):
            self.current_traffic_pct = min(
                self.current_traffic_pct + self.config.increment_pct,
                self.config.max_traffic_pct,
            )
            return "promote"
        return "maintain"
Key Takeaways
- Monitor both data distributions and model performance metrics
- Use ADWIN or Page-Hinkley tests for automated concept drift detection
- Shadow deployments let you validate models without user impact
- Canary releases provide gradual, reversible rollouts with automatic rollback triggers