πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Anomaly Detection

AIOps CoreAnomaly Detection🟒 Free Lesson

Advertisement

Anomaly Detection

Anomaly detection identifies data points, events, or observations that deviate significantly from the expected pattern, potentially indicating issues or important events.

Types of Anomalies

  • Point Anomalies: Individual data points that are far from normal
  • Contextual Anomalies: Anomalies that are context-dependent
  • Collective Anomalies: A collection of data points that are anomalous together

Anomaly Detection Architecture

Statistical Methods

Z-Score Method

import numpy as np
from typing import Dict, List, Tuple

class ZScoreAnomalyDetector:
    def __init__(self, threshold=3.0, window_size=None):
        self.threshold = threshold
        self.window_size = window_size
        self.mean = None
        self.std = None
    
    def fit(self, data: np.ndarray):
        """Fit the detector to normal data"""
        self.mean = np.mean(data)
        self.std = np.std(data)
        
        if self.std == 0:
            self.std = 1e-10  # Avoid division by zero
    
    def detect(self, data: np.ndarray) -> Dict:
        """Detect anomalies in data"""
        if self.mean is None or self.std is None:
            raise ValueError("Detector not fitted. Call fit() first.")
        
        z_scores = np.abs((data - self.mean) / self.std)
        anomalies = z_scores > self.threshold
        
        return {
            "anomalies": anomalies,
            "z_scores": z_scores,
            "anomaly_indices": np.where(anomalies)[0],
            "anomaly_count": np.sum(anomalies),
            "anomaly_rate": np.mean(anomalies)
        }
    
    def detect_streaming(self, value: float) -> bool:
        """Detect anomaly in streaming data"""
        if self.mean is None or self.std is None:
            return False
        
        z_score = abs((value - self.mean) / self.std)
        return z_score > self.threshold

class RollingZScoreDetector:
    def __init__(self, window_size: int, threshold: float = 3.0):
        self.window_size = window_size
        self.threshold = threshold
        self.data_buffer = []
    
    def detect(self, value: float) -> Dict:
        """Detect anomaly using rolling statistics"""
        self.data_buffer.append(value)
        
        if len(self.data_buffer) > self.window_size:
            self.data_buffer.pop(0)
        
        if len(self.data_buffer) < 10:  # Need minimum data
            return {"is_anomaly": False, "z_score": 0.0}
        
        mean = np.mean(self.data_buffer)
        std = np.std(self.data_buffer)
        
        if std == 0:
            return {"is_anomaly": False, "z_score": 0.0}
        
        z_score = abs((value - mean) / std)
        is_anomaly = z_score > self.threshold
        
        return {
            "is_anomaly": is_anomaly,
            "z_score": z_score,
            "mean": mean,
            "std": std
        }

IQR Method

import numpy as np
from typing import Dict

class IQRAnomalyDetector:
    def __init__(self, multiplier=1.5):
        self.multiplier = multiplier
        self.q1 = None
        self.q3 = None
        self.iqr = None
    
    def fit(self, data: np.ndarray):
        """Fit the detector to normal data"""
        self.q1 = np.percentile(data, 25)
        self.q3 = np.percentile(data, 75)
        self.iqr = self.q3 - self.q1
        
        if self.iqr == 0:
            self.iqr = 1e-10  # Avoid division by zero
    
    def detect(self, data: np.ndarray) -> Dict:
        """Detect anomalies using IQR"""
        if self.q1 is None or self.q3 is None:
            raise ValueError("Detector not fitted. Call fit() first.")
        
        lower_bound = self.q1 - self.multiplier * self.iqr
        upper_bound = self.q3 + self.multiplier * self.iqr
        
        anomalies = (data < lower_bound) | (data > upper_bound)
        
        return {
            "anomalies": anomalies,
            "lower_bound": lower_bound,
            "upper_bound": upper_bound,
            "anomaly_indices": np.where(anomalies)[0],
            "anomaly_count": np.sum(anomalies),
            "anomaly_rate": np.mean(anomalies)
        }
    
    def detect_single(self, value: float) -> bool:
        """Detect if a single value is anomalous"""
        if self.q1 is None or self.q3 is None:
            return False
        
        lower_bound = self.q1 - self.multiplier * self.iqr
        upper_bound = self.q3 + self.multiplier * self.iqr
        
        return value < lower_bound or value > upper_bound

ML-Based Methods

Isolation Forest

import numpy as np
from typing import Dict, List
import random

class IsolationTree:
    def __init__(self, max_depth=10):
        self.max_depth = max_depth
        self.tree = None
    
    def fit(self, data: np.ndarray, depth=0):
        """Build isolation tree"""
        if depth >= self.max_depth or len(data) <= 1:
            return {"type": "leaf", "size": len(data)}
        
        # Random feature and split value
        n_features = data.shape[1]
        feature_idx = random.randint(0, n_features - 1)
        
        feature_min = np.min(data[:, feature_idx])
        feature_max = np.max(data[:, feature_idx])
        
        if feature_min == feature_max:
            return {"type": "leaf", "size": len(data)}
        
        split_value = random.uniform(feature_min, feature_max)
        
        left_mask = data[:, feature_idx] < split_value
        right_mask = ~left_mask
        
        left_data = data[left_mask]
        right_data = data[right_mask]
        
        return {
            "type": "internal",
            "feature_idx": feature_idx,
            "split_value": split_value,
            "left": self.fit(left_data, depth + 1),
            "right": self.fit(right_data, depth + 1)
        }
    
    def path_length(self, point: np.ndarray, node=None, depth=0) -> float:
        """Calculate path length for a point"""
        if node is None:
            node = self.tree
        
        if node["type"] == "leaf":
            return depth
        
        if point[node["feature_idx"]] < node["split_value"]:
            return self.path_length(point, node["left"], depth + 1)
        else:
            return self.path_length(point, node["right"], depth + 1)

class IsolationForest:
    def __init__(self, n_trees=100, max_depth=10, contamination=0.1):
        self.n_trees = n_trees
        self.max_depth = max_depth
        self.contamination = contamination
        self.trees = []
        self.threshold = None
    
    def fit(self, data: np.ndarray):
        """Fit isolation forest"""
        self.trees = []
        
        for _ in range(self.n_trees):
            # Sample subset
            sample_size = min(256, len(data))
            sample_indices = random.sample(range(len(data)), sample_size)
            sample_data = data[sample_indices]
            
            # Build tree
            tree = IsolationTree(self.max_depth)
            tree.tree = tree.fit(sample_data)
            self.trees.append(tree)
        
        # Calculate threshold
        scores = self._calculate_scores(data)
        self.threshold = np.percentile(scores, 100 * self.contamination)
    
    def _calculate_scores(self, data: np.ndarray) -> np.ndarray:
        """Calculate anomaly scores"""
        scores = []
        
        for point in data:
            avg_path_length = np.mean([tree.path_length(point) for tree in self.trees])
            scores.append(self._anomaly_score(avg_path_length))
        
        return np.array(scores)
    
    def _anomaly_score(self, path_length: float) -> float:
        """Calculate anomaly score from path length"""
        c = self._average_path_length(256)  # Average path length for 256 samples
        return 2 ** (-path_length / c)
    
    def _average_path_length(self, n: int) -> float:
        """Calculate average path length for unsuccessful search in BST"""
        if n <= 1:
            return 0
        return 2.0 * (np.log(n - 1) + 0.5772156649) - (2.0 * (n - 1) / n)
    
    def detect(self, data: np.ndarray) -> Dict:
        """Detect anomalies"""
        scores = self._calculate_scores(data)
        anomalies = scores > self.threshold
        
        return {
            "anomalies": anomalies,
            "scores": scores,
            "threshold": self.threshold,
            "anomaly_indices": np.where(anomalies)[0],
            "anomaly_count": np.sum(anomalies),
            "anomaly_rate": np.mean(anomalies)
        }

Autoencoder-based Detection

import numpy as np
from typing import Dict, Tuple

class AutoencoderDetector:
    def __init__(self, input_dim: int, encoding_dim: int = 32):
        self.input_dim = input_dim
        self.encoding_dim = encoding_dim
        self.encoder_weights = None
        self.decoder_weights = None
        self.threshold = None
    
    def _sigmoid(self, x):
        return 1 / (1 + np.exp(-np.clip(x, -500, 500)))
    
    def _sigmoid_derivative(self, x):
        return x * (1 - x)
    
    def fit(self, data: np.ndarray, epochs=100, learning_rate=0.01):
        """Train autoencoder"""
        n_samples = data.shape[0]
        
        # Initialize weights
        self.encoder_weights = np.random.randn(self.input_dim, self.encoding_dim) * 0.1
        self.decoder_weights = np.random.randn(self.encoding_dim, self.input_dim) * 0.1
        
        for epoch in range(epochs):
            # Forward pass
            encoded = self._sigmoid(np.dot(data, self.encoder_weights))
            decoded = self._sigmoid(np.dot(encoded, self.decoder_weights))
            
            # Calculate loss
            loss = np.mean((data - decoded) ** 2)
            
            # Backward pass
            output_error = data - decoded
            output_delta = output_error * self._sigmoid_derivative(decoded)
            
            hidden_error = np.dot(output_delta, self.decoder_weights.T)
            hidden_delta = hidden_error * self._sigmoid_derivative(encoded)
            
            # Update weights
            self.decoder_weights += learning_rate * np.dot(encoded.T, output_delta) / n_samples
            self.encoder_weights += learning_rate * np.dot(data.T, hidden_delta) / n_samples
            
            if epoch % 10 == 0:
                print(f"Epoch {epoch}, Loss: {loss:.6f}")
        
        # Calculate threshold
        reconstructions = self._reconstruct(data)
        mse = np.mean((data - reconstructions) ** 2, axis=1)
        self.threshold = np.percentile(mse, 95)  # 95th percentile
    
    def _reconstruct(self, data: np.ndarray) -> np.ndarray:
        """Reconstruct data"""
        encoded = self._sigmoid(np.dot(data, self.encoder_weights))
        decoded = self._sigmoid(np.dot(encoded, self.decoder_weights))
        return decoded
    
    def detect(self, data: np.ndarray) -> Dict:
        """Detect anomalies using reconstruction error"""
        reconstructions = self._reconstruct(data)
        mse = np.mean((data - reconstructions) ** 2, axis=1)
        anomalies = mse > self.threshold
        
        return {
            "anomalies": anomalies,
            "reconstruction_errors": mse,
            "threshold": self.threshold,
            "anomaly_indices": np.where(anomalies)[0],
            "anomaly_count": np.sum(anomalies),
            "anomaly_rate": np.mean(anomalies)
        }

Mathematical Foundation

Z-Score Formula

Z-Score

Z=Xβˆ’ΞΌΟƒZ = \frac{X - \mu}{\sigma}

Where:

  • ( X ) is the data point
  • ( \mu ) is the mean
  • ( \sigma ) is the standard deviation

IQR Formula

Interquartile Range

IQR=Q3βˆ’Q1IQR = Q_3 - Q_1

Anomaly boundaries:

IQR Boundaries

Lower=Q1βˆ’kβ‹…IQR,Upper=Q3+kβ‹…IQRLower = Q_1 - k \cdot IQR, \quad Upper = Q_3 + k \cdot IQR

Isolation Forest Anomaly Score

Anomaly Score

s(x,n)=2βˆ’E(h(x))c(n)s(x, n) = 2^{-\frac{E(h(x))}{c(n)}}

Where:

  • ( E(h(x)) ) is the average path length
  • ( c(n) = 2H(n-1) - \frac{2(n-1)}{n} ) is the average path length of unsuccessful search in BST
  • ( H(i) ) is the harmonic number

Ensemble Methods

import numpy as np
from typing import Dict, List

class AnomalyEnsemble:
    def __init__(self):
        self.detectors = []
        self.weights = []
    
    def add_detector(self, detector, weight=1.0):
        """Add detector to ensemble"""
        self.detectors.append(detector)
        self.weights.append(weight)
    
    def detect(self, data: np.ndarray) -> Dict:
        """Detect anomalies using ensemble"""
        all_scores = []
        
        for detector, weight in zip(self.detectors, self.weights):
            result = detector.detect(data)
            scores = result.get("scores", result.get("reconstruction_errors", None))
            
            if scores is not None:
                # Normalize scores
                normalized_scores = (scores - np.min(scores)) / (np.max(scores) - np.min(scores) + 1e-10)
                all_scores.append(normalized_scores * weight)
        
        if not all_scores:
            return {"anomalies": np.array([]), "scores": np.array([])}
        
        # Combine scores
        combined_scores = np.mean(all_scores, axis=0)
        
        # Determine threshold
        threshold = np.percentile(combined_scores, 95)
        anomalies = combined_scores > threshold
        
        return {
            "anomalies": anomalies,
            "scores": combined_scores,
            "threshold": threshold,
            "anomaly_indices": np.where(anomalies)[0],
            "anomaly_count": np.sum(anomalies),
            "anomaly_rate": np.mean(anomalies)
        }

Best Practices

1. Feature Engineering

class AnomalyFeatureEngine:
    def __init__(self):
        self.feature_extractors = []
    
    def add_extractor(self, extractor):
        """Add feature extractor"""
        self.feature_extractors.append(extractor)
    
    def extract_features(self, data: np.ndarray) -> np.ndarray:
        """Extract features for anomaly detection"""
        features = []
        
        for extractor in self.feature_extractors:
            feature = extractor(data)
            features.append(feature)
        
        return np.column_stack(features)

# Example feature extractors
def statistical_features(data):
    """Extract statistical features"""
    return [
        np.mean(data),
        np.std(data),
        np.min(data),
        np.max(data),
        np.median(data)
    ]

def trend_features(data):
    """Extract trend features"""
    x = np.arange(len(data))
    slope = np.polyfit(x, data, 1)[0]
    return [slope]

2. Threshold Optimization

class ThresholdOptimizer:
    def __init__(self):
        self.thresholds = {}
    
    def optimize_threshold(self, data: np.ndarray, labels: np.ndarray = None):
        """Optimize anomaly threshold"""
        if labels is not None:
            # Supervised optimization
            return self._supervised_optimization(data, labels)
        else:
            # Unsupervised optimization
            return self._unsupervised_optimization(data)
    
    def _supervised_optimization(self, data: np.ndarray, labels: np.ndarray):
        """Optimize threshold with labeled data"""
        best_threshold = 0
        best_f1 = 0
        
        for threshold in np.percentile(data, np.arange(80, 99, 1)):
            predictions = data > threshold
            
            # Calculate F1 score
            tp = np.sum(predictions & labels)
            fp = np.sum(predictions & ~labels)
            fn = np.sum(~predictions & labels)
            
            precision = tp / (tp + fp) if (tp + fp) > 0 else 0
            recall = tp / (tp + fn) if (tp + fn) > 0 else 0
            f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
            
            if f1 > best_f1:
                best_f1 = f1
                best_threshold = threshold
        
        return best_threshold
    
    def _unsupervised_optimization(self, data: np.ndarray):
        """Optimize threshold without labels"""
        # Use elbow method or other heuristics
        thresholds = np.percentile(data, np.arange(80, 99, 1))
        scores = []
        
        for threshold in thresholds:
            anomaly_rate = np.mean(data > threshold)
            scores.append(anomaly_rate)
        
        # Find elbow point
        diffs = np.diff(scores)
        elbow_idx = np.argmax(np.abs(diffs))
        
        return thresholds[elbow_idx]

Summary

Anomaly detection is essential for identifying unusual patterns in ML systems. By implementing statistical and ML-based methods, organizations can detect issues early and maintain system reliability through proactive monitoring and response.

⭐

Premium Content

Anomaly Detection

Unlock this lesson and 900+ advanced tutorials with a Premium plan.

🎯End-to-end Projects
πŸ’ΌInterview Prep
πŸ“œCertificates
🀝Community Access

Already a member? Log in

Need Expert AI Ops & LLM Ops Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement