Anomaly Detection
Anomaly detection identifies data points, events, or observations that deviate significantly from the expected pattern, potentially indicating issues or important events.
Types of Anomalies
- Point Anomalies: Individual data points that are far from normal
- Contextual Anomalies: Anomalies that are context-dependent
- Collective Anomalies: A collection of data points that are anomalous together
Anomaly Detection Architecture
Statistical Methods
Z-Score Method
import numpy as np
from typing import Dict, List, Tuple
class ZScoreAnomalyDetector:
def __init__(self, threshold=3.0, window_size=None):
self.threshold = threshold
self.window_size = window_size
self.mean = None
self.std = None
def fit(self, data: np.ndarray):
"""Fit the detector to normal data"""
self.mean = np.mean(data)
self.std = np.std(data)
if self.std == 0:
self.std = 1e-10 # Avoid division by zero
def detect(self, data: np.ndarray) -> Dict:
"""Detect anomalies in data"""
if self.mean is None or self.std is None:
raise ValueError("Detector not fitted. Call fit() first.")
z_scores = np.abs((data - self.mean) / self.std)
anomalies = z_scores > self.threshold
return {
"anomalies": anomalies,
"z_scores": z_scores,
"anomaly_indices": np.where(anomalies)[0],
"anomaly_count": np.sum(anomalies),
"anomaly_rate": np.mean(anomalies)
}
def detect_streaming(self, value: float) -> bool:
"""Detect anomaly in streaming data"""
if self.mean is None or self.std is None:
return False
z_score = abs((value - self.mean) / self.std)
return z_score > self.threshold
class RollingZScoreDetector:
def __init__(self, window_size: int, threshold: float = 3.0):
self.window_size = window_size
self.threshold = threshold
self.data_buffer = []
def detect(self, value: float) -> Dict:
"""Detect anomaly using rolling statistics"""
self.data_buffer.append(value)
if len(self.data_buffer) > self.window_size:
self.data_buffer.pop(0)
if len(self.data_buffer) < 10: # Need minimum data
return {"is_anomaly": False, "z_score": 0.0}
mean = np.mean(self.data_buffer)
std = np.std(self.data_buffer)
if std == 0:
return {"is_anomaly": False, "z_score": 0.0}
z_score = abs((value - mean) / std)
is_anomaly = z_score > self.threshold
return {
"is_anomaly": is_anomaly,
"z_score": z_score,
"mean": mean,
"std": std
}
IQR Method
import numpy as np
from typing import Dict
class IQRAnomalyDetector:
def __init__(self, multiplier=1.5):
self.multiplier = multiplier
self.q1 = None
self.q3 = None
self.iqr = None
def fit(self, data: np.ndarray):
"""Fit the detector to normal data"""
self.q1 = np.percentile(data, 25)
self.q3 = np.percentile(data, 75)
self.iqr = self.q3 - self.q1
if self.iqr == 0:
self.iqr = 1e-10 # Avoid division by zero
def detect(self, data: np.ndarray) -> Dict:
"""Detect anomalies using IQR"""
if self.q1 is None or self.q3 is None:
raise ValueError("Detector not fitted. Call fit() first.")
lower_bound = self.q1 - self.multiplier * self.iqr
upper_bound = self.q3 + self.multiplier * self.iqr
anomalies = (data < lower_bound) | (data > upper_bound)
return {
"anomalies": anomalies,
"lower_bound": lower_bound,
"upper_bound": upper_bound,
"anomaly_indices": np.where(anomalies)[0],
"anomaly_count": np.sum(anomalies),
"anomaly_rate": np.mean(anomalies)
}
def detect_single(self, value: float) -> bool:
"""Detect if a single value is anomalous"""
if self.q1 is None or self.q3 is None:
return False
lower_bound = self.q1 - self.multiplier * self.iqr
upper_bound = self.q3 + self.multiplier * self.iqr
return value < lower_bound or value > upper_bound
ML-Based Methods
Isolation Forest
import numpy as np
from typing import Dict, List
import random
class IsolationTree:
def __init__(self, max_depth=10):
self.max_depth = max_depth
self.tree = None
def fit(self, data: np.ndarray, depth=0):
"""Build isolation tree"""
if depth >= self.max_depth or len(data) <= 1:
return {"type": "leaf", "size": len(data)}
# Random feature and split value
n_features = data.shape[1]
feature_idx = random.randint(0, n_features - 1)
feature_min = np.min(data[:, feature_idx])
feature_max = np.max(data[:, feature_idx])
if feature_min == feature_max:
return {"type": "leaf", "size": len(data)}
split_value = random.uniform(feature_min, feature_max)
left_mask = data[:, feature_idx] < split_value
right_mask = ~left_mask
left_data = data[left_mask]
right_data = data[right_mask]
return {
"type": "internal",
"feature_idx": feature_idx,
"split_value": split_value,
"left": self.fit(left_data, depth + 1),
"right": self.fit(right_data, depth + 1)
}
def path_length(self, point: np.ndarray, node=None, depth=0) -> float:
"""Calculate path length for a point"""
if node is None:
node = self.tree
if node["type"] == "leaf":
return depth
if point[node["feature_idx"]] < node["split_value"]:
return self.path_length(point, node["left"], depth + 1)
else:
return self.path_length(point, node["right"], depth + 1)
class IsolationForest:
def __init__(self, n_trees=100, max_depth=10, contamination=0.1):
self.n_trees = n_trees
self.max_depth = max_depth
self.contamination = contamination
self.trees = []
self.threshold = None
def fit(self, data: np.ndarray):
"""Fit isolation forest"""
self.trees = []
for _ in range(self.n_trees):
# Sample subset
sample_size = min(256, len(data))
sample_indices = random.sample(range(len(data)), sample_size)
sample_data = data[sample_indices]
# Build tree
tree = IsolationTree(self.max_depth)
tree.tree = tree.fit(sample_data)
self.trees.append(tree)
# Calculate threshold
scores = self._calculate_scores(data)
self.threshold = np.percentile(scores, 100 * self.contamination)
def _calculate_scores(self, data: np.ndarray) -> np.ndarray:
"""Calculate anomaly scores"""
scores = []
for point in data:
avg_path_length = np.mean([tree.path_length(point) for tree in self.trees])
scores.append(self._anomaly_score(avg_path_length))
return np.array(scores)
def _anomaly_score(self, path_length: float) -> float:
"""Calculate anomaly score from path length"""
c = self._average_path_length(256) # Average path length for 256 samples
return 2 ** (-path_length / c)
def _average_path_length(self, n: int) -> float:
"""Calculate average path length for unsuccessful search in BST"""
if n <= 1:
return 0
return 2.0 * (np.log(n - 1) + 0.5772156649) - (2.0 * (n - 1) / n)
def detect(self, data: np.ndarray) -> Dict:
"""Detect anomalies"""
scores = self._calculate_scores(data)
anomalies = scores > self.threshold
return {
"anomalies": anomalies,
"scores": scores,
"threshold": self.threshold,
"anomaly_indices": np.where(anomalies)[0],
"anomaly_count": np.sum(anomalies),
"anomaly_rate": np.mean(anomalies)
}
Autoencoder-based Detection
import numpy as np
from typing import Dict, Tuple
class AutoencoderDetector:
def __init__(self, input_dim: int, encoding_dim: int = 32):
self.input_dim = input_dim
self.encoding_dim = encoding_dim
self.encoder_weights = None
self.decoder_weights = None
self.threshold = None
def _sigmoid(self, x):
return 1 / (1 + np.exp(-np.clip(x, -500, 500)))
def _sigmoid_derivative(self, x):
return x * (1 - x)
def fit(self, data: np.ndarray, epochs=100, learning_rate=0.01):
"""Train autoencoder"""
n_samples = data.shape[0]
# Initialize weights
self.encoder_weights = np.random.randn(self.input_dim, self.encoding_dim) * 0.1
self.decoder_weights = np.random.randn(self.encoding_dim, self.input_dim) * 0.1
for epoch in range(epochs):
# Forward pass
encoded = self._sigmoid(np.dot(data, self.encoder_weights))
decoded = self._sigmoid(np.dot(encoded, self.decoder_weights))
# Calculate loss
loss = np.mean((data - decoded) ** 2)
# Backward pass
output_error = data - decoded
output_delta = output_error * self._sigmoid_derivative(decoded)
hidden_error = np.dot(output_delta, self.decoder_weights.T)
hidden_delta = hidden_error * self._sigmoid_derivative(encoded)
# Update weights
self.decoder_weights += learning_rate * np.dot(encoded.T, output_delta) / n_samples
self.encoder_weights += learning_rate * np.dot(data.T, hidden_delta) / n_samples
if epoch % 10 == 0:
print(f"Epoch {epoch}, Loss: {loss:.6f}")
# Calculate threshold
reconstructions = self._reconstruct(data)
mse = np.mean((data - reconstructions) ** 2, axis=1)
self.threshold = np.percentile(mse, 95) # 95th percentile
def _reconstruct(self, data: np.ndarray) -> np.ndarray:
"""Reconstruct data"""
encoded = self._sigmoid(np.dot(data, self.encoder_weights))
decoded = self._sigmoid(np.dot(encoded, self.decoder_weights))
return decoded
def detect(self, data: np.ndarray) -> Dict:
"""Detect anomalies using reconstruction error"""
reconstructions = self._reconstruct(data)
mse = np.mean((data - reconstructions) ** 2, axis=1)
anomalies = mse > self.threshold
return {
"anomalies": anomalies,
"reconstruction_errors": mse,
"threshold": self.threshold,
"anomaly_indices": np.where(anomalies)[0],
"anomaly_count": np.sum(anomalies),
"anomaly_rate": np.mean(anomalies)
}
Mathematical Foundation
Z-Score Formula
Z-Score
Where:
- ( X ) is the data point
- ( \mu ) is the mean
- ( \sigma ) is the standard deviation
IQR Formula
Interquartile Range
Anomaly boundaries:
IQR Boundaries
Isolation Forest Anomaly Score
Anomaly Score
Where:
- ( E(h(x)) ) is the average path length
- ( c(n) = 2H(n-1) - \frac{2(n-1)}{n} ) is the average path length of unsuccessful search in BST
- ( H(i) ) is the harmonic number
Ensemble Methods
import numpy as np
from typing import Dict, List
class AnomalyEnsemble:
def __init__(self):
self.detectors = []
self.weights = []
def add_detector(self, detector, weight=1.0):
"""Add detector to ensemble"""
self.detectors.append(detector)
self.weights.append(weight)
def detect(self, data: np.ndarray) -> Dict:
"""Detect anomalies using ensemble"""
all_scores = []
for detector, weight in zip(self.detectors, self.weights):
result = detector.detect(data)
scores = result.get("scores", result.get("reconstruction_errors", None))
if scores is not None:
# Normalize scores
normalized_scores = (scores - np.min(scores)) / (np.max(scores) - np.min(scores) + 1e-10)
all_scores.append(normalized_scores * weight)
if not all_scores:
return {"anomalies": np.array([]), "scores": np.array([])}
# Combine scores
combined_scores = np.mean(all_scores, axis=0)
# Determine threshold
threshold = np.percentile(combined_scores, 95)
anomalies = combined_scores > threshold
return {
"anomalies": anomalies,
"scores": combined_scores,
"threshold": threshold,
"anomaly_indices": np.where(anomalies)[0],
"anomaly_count": np.sum(anomalies),
"anomaly_rate": np.mean(anomalies)
}
Best Practices
1. Feature Engineering
class AnomalyFeatureEngine:
def __init__(self):
self.feature_extractors = []
def add_extractor(self, extractor):
"""Add feature extractor"""
self.feature_extractors.append(extractor)
def extract_features(self, data: np.ndarray) -> np.ndarray:
"""Extract features for anomaly detection"""
features = []
for extractor in self.feature_extractors:
feature = extractor(data)
features.append(feature)
return np.column_stack(features)
# Example feature extractors
def statistical_features(data):
"""Extract statistical features"""
return [
np.mean(data),
np.std(data),
np.min(data),
np.max(data),
np.median(data)
]
def trend_features(data):
"""Extract trend features"""
x = np.arange(len(data))
slope = np.polyfit(x, data, 1)[0]
return [slope]
2. Threshold Optimization
class ThresholdOptimizer:
def __init__(self):
self.thresholds = {}
def optimize_threshold(self, data: np.ndarray, labels: np.ndarray = None):
"""Optimize anomaly threshold"""
if labels is not None:
# Supervised optimization
return self._supervised_optimization(data, labels)
else:
# Unsupervised optimization
return self._unsupervised_optimization(data)
def _supervised_optimization(self, data: np.ndarray, labels: np.ndarray):
"""Optimize threshold with labeled data"""
best_threshold = 0
best_f1 = 0
for threshold in np.percentile(data, np.arange(80, 99, 1)):
predictions = data > threshold
# Calculate F1 score
tp = np.sum(predictions & labels)
fp = np.sum(predictions & ~labels)
fn = np.sum(~predictions & labels)
precision = tp / (tp + fp) if (tp + fp) > 0 else 0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
if f1 > best_f1:
best_f1 = f1
best_threshold = threshold
return best_threshold
def _unsupervised_optimization(self, data: np.ndarray):
"""Optimize threshold without labels"""
# Use elbow method or other heuristics
thresholds = np.percentile(data, np.arange(80, 99, 1))
scores = []
for threshold in thresholds:
anomaly_rate = np.mean(data > threshold)
scores.append(anomaly_rate)
# Find elbow point
diffs = np.diff(scores)
elbow_idx = np.argmax(np.abs(diffs))
return thresholds[elbow_idx]
Summary
Anomaly detection is essential for identifying unusual patterns in ML systems. By implementing statistical and ML-based methods, organizations can detect issues early and maintain system reliability through proactive monitoring and response.