Anomaly Detection in Streaming Data
Data streams arrive continuously — IoT sensors, network traffic, financial transactions. Batch processing can't keep up. Streaming algorithms process data in real time with bounded memory, detecting anomalies as they occur.
Why Streaming Anomaly Detection Matters
A DDoS attack generates millions of requests per second. A fraudulent transaction pattern emerges in milliseconds. Hourly batch processing is too slow. Streaming detection catches threats in real time.
import numpy as np
import pandas as pd
from collections import defaultdict, deque
from scipy import stats
import warnings
warnings.filterwarnings('ignore')
Streaming Data Fundamentals
class DataStream:
    """Iterator that emits synthetic readings whose distribution shifts mid-stream.

    Points with an index below ``drift_point`` are drawn from N(50, 5) and
    labelled ``'normal'``. From ``drift_point`` onward, roughly 10% of points
    are drawn from N(80, 10) and labelled ``'anomaly'``; the remainder come
    from N(60, 8) and keep the ``'normal'`` label.

    Each emitted item is a dict with the keys ``'timestamp'``, ``'value'`` and
    ``'label'``. Iteration stops after ``n_points`` items.
    """

    def __init__(self, n_points=10000, drift_point=7000):
        self.n_points = n_points
        self.drift_point = drift_point
        self.current = 0  # index of the next point to emit

    def __iter__(self):
        return self

    def __next__(self):
        index = self.current
        if index >= self.n_points:
            raise StopIteration
        self.current = index + 1

        if index < self.drift_point:
            # Pre-drift regime: a single stable distribution.
            mean, spread, label = 50, 5, 'normal'
        elif np.random.random() < 0.1:
            # Concept drift — about one point in ten is an injected anomaly.
            mean, spread, label = 80, 10, 'anomaly'
        else:
            # Concept drift — the "normal" mean and spread have shifted.
            mean, spread, label = 60, 8, 'normal'

        return {
            'timestamp': index,
            'value': np.random.normal(mean, spread),
            'label': label,
        }
# Smoke-test the stream: pull the first 1000 readings and summarise them.
stream = DataStream()
# range() is listed first so zip stops before drawing a 1001st point.
values = [point['value'] for _, point in zip(range(1000), stream)]
print(f"Stream values: mean={np.mean(values):.2f}, std={np.std(values):.2f}")
Exponential Moving Average (EMA) Detector
class EMADetector:
    """Anomaly detector based on an exponentially weighted mean and variance.

    Each incoming value is scored against the statistics accumulated from the
    values seen *before* it, and only then folded into those statistics.
    Scoring after the update (as an earlier revision did) bounds the z-score
    by ``sqrt((1 - alpha) / alpha)``; with ``alpha=0.1`` that bound is exactly
    3, so the default threshold of 3.0 could never be exceeded.

    Args:
        alpha: Smoothing factor applied to each new value.
        threshold: A value is anomalous when its absolute z-score exceeds this.
        warmup: Values are never flagged until this many have been seen.
    """

    def __init__(self, alpha=0.1, threshold=3.0, warmup=100):
        self.alpha = alpha
        self.threshold = threshold
        self.warmup = warmup
        self.ema = None      # exponentially weighted mean; None until first value
        self.ema_var = None  # exponentially weighted variance
        self.count = 0       # number of values processed so far

    def update(self, value):
        """Score ``value`` against the running statistics, then absorb it.

        Returns:
            bool: True when the detector is past warm-up and the value lies
            more than ``threshold`` weighted standard deviations from the
            running mean; False otherwise (always False for the first value).
        """
        self.count += 1

        if self.ema is None:
            # The first observation only seeds the statistics.
            self.ema = value
            self.ema_var = 0
            return False

        delta = value - self.ema

        # Score BEFORE updating so the value cannot mask itself.
        is_anomaly = False
        if self.count >= self.warmup:
            std = np.sqrt(self.ema_var)
            z_score = abs(delta) / (std + 1e-10)  # epsilon guards std == 0
            is_anomaly = bool(z_score > self.threshold)

        # Incremental exponentially weighted mean / variance update.
        self.ema += self.alpha * delta
        self.ema_var = (1 - self.alpha) * (self.ema_var + self.alpha * delta ** 2)

        return is_anomaly
# Run the EMA detector over a full default stream and keep the flagged points.
ema_detector = EMADetector(alpha=0.1, threshold=3.0)
stream = DataStream()
anomalies = [point for point in stream if ema_detector.update(point['value'])]
print(f"EMA detected {len(anomalies)} anomalies")
print(f"Anomaly rate: {len(anomalies) / 10000:.2%}")
Sliding Window Statistics
class SlidingWindowDetector:
    """Flag values that deviate strongly from the preceding sliding window.

    A value is scored against the mean and standard deviation of the values
    that came *before* it and is appended to the window afterwards. Scoring
    with the value already in the window caps the z-score at
    ``(n - 1) / sqrt(n)``, so a window of 10 or fewer could never exceed a
    threshold of 3.

    Args:
        window_size: Maximum number of recent values retained.
        threshold: A value is anomalous when its absolute z-score exceeds this.
    """

    def __init__(self, window_size=100, threshold=3.0):
        self.window_size = window_size
        self.threshold = threshold
        self.window = deque(maxlen=window_size)

    def update(self, value):
        """Score ``value`` against the current window, then append it.

        Returns:
            bool: True when enough history exists and the value's absolute
            z-score exceeds ``threshold``; False otherwise.
        """
        # Need at least two prior values (and half a window) for a usable std.
        min_history = max(2, self.window_size // 2)

        is_anomaly = False
        if len(self.window) >= min_history:
            history = np.array(self.window)
            std = np.std(history) + 1e-10  # epsilon guards a constant window
            z_score = abs(value - np.mean(history)) / std
            is_anomaly = bool(z_score > self.threshold)

        self.window.append(value)
        return is_anomaly

    def get_statistics(self):
        """Return mean, std, min and max of the current window.

        Returns:
            dict: Keys ``'mean'``, ``'std'``, ``'min'``, ``'max'``. All four
            are NaN while the window is empty.
        """
        if not self.window:
            nan = float('nan')
            return {'mean': nan, 'std': nan, 'min': nan, 'max': nan}

        arr = np.array(self.window)
        return {
            'mean': np.mean(arr),
            'std': np.std(arr),
            'min': np.min(arr),
            'max': np.max(arr),
        }
# Count how many of 10000 stream readings the sliding-window detector flags.
window_detector = SlidingWindowDetector(window_size=100, threshold=3.0)
stream = DataStream()
anomalies = 0
for _ in range(10000):
    reading = next(stream)['value']
    if window_detector.update(reading):
        anomalies += 1
print(f"Window detector: {anomalies} anomalies")
Count-Min Sketch
Probabilistic data structure for frequency estimation in streams.
class CountMinSketch:
    """Count-Min Sketch for approximate frequency estimation.

    The sketch is a ``depth x width`` table of counters. Every item maps to
    one counter per row; adding an item increments those counters and the
    estimate is their minimum, so estimates never undercount when only
    non-negative counts are added.

    Items are identified by ``str(item)``. Each row mixes its own seed with
    the item key inside a single ``hash`` call so that two items colliding in
    one row do not automatically collide in every other row. A plain
    ``(seed + hash(item)) % width`` has exactly that flaw and makes
    ``depth`` useless.

    NOTE: Python salts ``hash()`` of strings per process, so a sketch's
    column layout is not reproducible across interpreter runs.

    Args:
        width: Number of counters per row.
        depth: Number of rows (hash functions).
        max_tracked: Upper bound on the number of candidate item keys kept
            for ``get_heavy_hitters``. Zero disables tracking.
    """

    def __init__(self, width=1000, depth=5, max_tracked=100):
        self.width = width
        self.depth = depth
        self.table = np.zeros((depth, width), dtype=int)
        self.hashes = [hash(f"seed_{i}") for i in range(depth)]
        self.max_tracked = max_tracked
        # key -> estimate at last insertion; bounded by max_tracked.
        self._tracked = {}

    def _columns(self, key):
        """Return the column index used for ``key`` in each row."""
        return [hash((seed, key)) % self.width for seed in self.hashes]

    def _track(self, key, estimate):
        """Remember ``key`` as a heavy-hitter candidate within the size bound."""
        if self.max_tracked <= 0:
            return
        tracked = self._tracked
        if key in tracked or len(tracked) < self.max_tracked:
            tracked[key] = estimate
            return
        # Full: replace the weakest candidate only if this one is heavier.
        weakest = min(tracked, key=tracked.get)
        if estimate > tracked[weakest]:
            del tracked[weakest]
            tracked[key] = estimate

    def add(self, item, count=1):
        """Add ``count`` occurrences of ``item`` to the sketch."""
        key = str(item)
        columns = self._columns(key)
        for row, col in enumerate(columns):
            self.table[row, col] += count
        estimate = int(min(self.table[row, col] for row, col in enumerate(columns)))
        self._track(key, estimate)

    def estimate(self, item):
        """Return the estimated count of ``item`` (minimum over its counters)."""
        columns = self._columns(str(item))
        return int(min(self.table[row, col] for row, col in enumerate(columns)))

    def get_heavy_hitters(self, threshold=100):
        """Find tracked items whose estimated count is at least ``threshold``.

        A sketch cannot enumerate the items it has seen, so candidates come
        from the bounded set of keys recorded during ``add``. Estimates are
        recomputed at query time because later collisions may have raised them.

        Returns:
            list[tuple[str, int]]: ``(item_key, estimate)`` pairs, largest
            estimate first. ``item_key`` is ``str(item)``.
        """
        heavy = []
        for key in self._tracked:
            current = self.estimate(key)
            if current >= threshold:
                heavy.append((key, current))
        return sorted(heavy, key=lambda pair: -pair[1])
# Feed a skewed stream of four labels into a sketch and print the estimates.
cms = CountMinSketch(width=10000, depth=5)
# Simulated stream: item_A is the most frequent, item_D the rarest.
known_items = ['item_A', 'item_B', 'item_C', 'item_D']
stream_items = np.random.choice(known_items, 10000,
                                p=[0.5, 0.3, 0.15, 0.05])
for item in stream_items:
    cms.add(item)
print("Frequency estimates:")
# item_E never appears in the stream, so its estimate shows collision noise only.
for item in known_items + ['item_E']:
    print(f" {item}: ~{cms.estimate(item)}")
Count-Min Sketch for Anomaly Detection
class CMSAnomalyDetector:
    """Flag items whose sketched frequency is far above a uniform baseline.

    Frequencies come from a Count-Min Sketch that is only ever incremented,
    so they cover the whole stream. The number of distinct items is taken
    from the last ``window_size`` items only.

    Args:
        window_size: Number of recent items kept for the distinct-item count;
            half of it is also the warm-up length.
        threshold: An item is anomalous when its z-score exceeds this.
    """

    def __init__(self, window_size=1000, threshold=3.0):
        self.window_size = window_size
        self.threshold = threshold
        self.cms = CountMinSketch(width=5000, depth=5)
        self.total = 0  # items processed since construction
        self.window_items = deque(maxlen=window_size)

    def update(self, item):
        """Record ``item`` and report whether its prior frequency is anomalous.

        Returns:
            tuple: ``(is_anomaly, freq)`` where ``freq`` is the sketch's
            estimate for ``item`` taken before this occurrence was added.
        """
        # Frequency as of just before this occurrence.
        seen_before = self.cms.estimate(item)

        self.cms.add(item)
        self.total += 1
        self.window_items.append(item)

        warmed_up = self.total >= self.window_size // 2
        if not warmed_up:
            return False, seen_before

        # Baseline: what each item would have if all were equally frequent.
        distinct = max(len(set(self.window_items)), 1)
        baseline = self.total / distinct
        if not baseline > 0:
            return False, seen_before

        score = (seen_before - baseline) / (np.sqrt(baseline) + 1e-10)
        return score > self.threshold, seen_before
# Shuffle a 900/100 mix of two labels and count how many updates get flagged.
cms_detector = CMSAnomalyDetector()
items = ['normal'] * 900 + ['burst_item'] * 100
np.random.shuffle(items)
# update() returns (is_anomaly, freq); only the flag matters here.
anomalies = sum(1 for item in items if cms_detector.update(item)[0])
print(f"CMS anomaly detector: {anomalies} anomalies detected")
Sketching for Distinct Counting
class HyperLogLog:
    """HyperLogLog sketch for approximate distinct counting.

    Items are identified by ``str(item)`` and hashed with the built-in
    ``hash``. The low ``precision`` bits of the hash select one of
    ``m = 2 ** precision`` registers. The register stores the largest "rank"
    seen, where rank is the 1-based position of the lowest set bit in the
    remaining hash bits.

    NOTE: assumes 64-bit hash values (the usual CPython build). ``hash()`` of
    strings is salted per process, so estimates vary slightly between runs.

    Args:
        precision: Number of hash bits used to pick a register.
    """

    _HASH_BITS = 64

    def __init__(self, precision=10):
        self.precision = precision
        self.m = 1 << precision
        self.registers = np.zeros(self.m, dtype=int)

    def add(self, item):
        """Record one occurrence of ``item``."""
        # hash() may be negative; mask to an unsigned 64-bit value.
        h = hash(str(item)) & ((1 << self._HASH_BITS) - 1)
        idx = h & (self.m - 1)
        remaining = h >> self.precision

        if remaining == 0:
            # No set bit at all is the rarest outcome, so it gets the largest
            # rank rather than the smallest.
            rank = self._HASH_BITS - self.precision + 1
        else:
            # Isolate the lowest set bit; its bit_length is trailing zeros + 1.
            rank = (remaining & -remaining).bit_length()

        if rank > self.registers[idx]:
            self.registers[idx] = rank

    def count(self):
        """Estimate the number of distinct items added so far.

        Returns:
            float: The estimate; 0.0 for an empty sketch.
        """
        m = self.m
        # Bias-correction constants from the HyperLogLog paper.
        if m >= 128:
            alpha = 0.7213 / (1 + 1.079 / m)
        elif m == 64:
            alpha = 0.709
        elif m == 32:
            alpha = 0.697
        else:
            alpha = 0.673

        # A float base is required: NumPy refuses to raise an integer to a
        # negative integer power.
        harmonic = np.sum(2.0 ** (-self.registers.astype(float)))
        raw = alpha * m * m / harmonic

        # Small-range correction: fall back to linear counting.
        if raw <= 2.5 * m:
            zeros = int(np.count_nonzero(self.registers == 0))
            if zeros > 0:
                return float(m * np.log(m / zeros))
        return float(raw)
# Add 10000 items drawn from 5000 distinct keys and compare with the truth.
hll = HyperLogLog(precision=10)
for i in range(10000):
    hll.add(f"item_{i % 5000}")
estimated = hll.count()
print(f"HyperLogLog estimated distinct: {estimated:.0f}")
print("Actual distinct: 5000")
Reservoir Sampling
class ReservoirSampling:
    """Keep a uniform random sample of up to ``k`` items from a stream.

    The first ``k`` items fill the reservoir. After that, the n-th item draws
    a slot uniformly from ``[0, n)`` and overwrites that slot only when it
    falls inside the reservoir.
    """

    def __init__(self, k=100):
        self.k = k
        self.reservoir = []
        self.count = 0  # total items offered so far

    def update(self, item):
        """Offer ``item`` to the reservoir."""
        self.count += 1

        if len(self.reservoir) < self.k:
            # Still filling: every item is kept.
            self.reservoir.append(item)
            return

        slot = np.random.randint(0, self.count)  # upper bound is exclusive
        if slot < self.k:
            self.reservoir[slot] = item

    def get_sample(self):
        """Return the reservoir list itself (not a copy)."""
        return self.reservoir
# Sample 100 of the integers 0..9999 and report the sample's size and span.
reservoir = ReservoirSampling(k=100)
for i in range(10000):
    reservoir.update(i)
sample = reservoir.get_sample()
lowest, highest = min(sample), max(sample)
print(f"Reservoir sample: {len(sample)} items")
print(f"Sample range: {lowest} to {highest}")
Z-Score Streaming Detector
class StreamingZScore:
    """Z-score detector using Welford's online mean/variance over a window.

    The running mean and sum of squared deviations are updated incrementally.
    Once ``window`` values are held, the oldest value is removed from the
    statistics before the new one is added, so the z-score reflects only the
    most recent ``window`` values. Previously ``window`` was stored but had no
    effect and the statistics covered the whole stream.

    Args:
        threshold: A value is anomalous when its absolute z-score exceeds this.
        window: Number of recent values the statistics cover. ``None`` or 0
            disables eviction (all-time statistics).

    Attributes:
        n: Number of values currently contributing to the statistics.
        mean: Mean of those values.
        M2: Sum of squared deviations of those values from ``mean``.
    """

    def __init__(self, threshold=3.0, window=1000):
        self.threshold = threshold
        self.window = window
        self.n = 0
        self.mean = 0
        self.M2 = 0
        self.values = deque(maxlen=window)

    def _forget(self, old):
        """Remove ``old`` from the running statistics (inverse Welford step)."""
        if self.n <= 1:
            self.n = 0
            self.mean = 0
            self.M2 = 0
            return
        prev_mean = self.mean
        self.n -= 1
        self.mean = prev_mean - (old - prev_mean) / self.n
        # Clamp: rounding can push the sum of squares slightly below zero.
        self.M2 = max(self.M2 - (old - prev_mean) * (old - self.mean), 0.0)

    def update(self, value):
        """Absorb ``value`` and report whether it is anomalous.

        Returns:
            bool: True when at least two values are held, their standard
            deviation is non-zero, and the value's absolute z-score exceeds
            ``threshold``; False otherwise.
        """
        capacity = self.values.maxlen
        if capacity and len(self.values) == capacity:
            # The deque drops values[0] on append; drop it from the stats too.
            self._forget(self.values[0])
        self.values.append(value)

        # Welford's online update.
        self.n += 1
        delta = value - self.mean
        self.mean += delta / self.n
        self.M2 += delta * (value - self.mean)

        if self.n < 2:
            return False

        std = np.sqrt(self.M2 / (self.n - 1))
        if std == 0:
            return False

        return bool(abs(value - self.mean) / std > self.threshold)
# Count how many of 10000 stream readings the z-score detector flags.
z_detector = StreamingZScore(threshold=3.0)
stream = DataStream()
anomalies = sum(
    1 for _ in range(10000) if z_detector.update(next(stream)['value'])
)
print(f"Z-score detector: {anomalies} anomalies")
Best Practices
- Choose window size carefully — too small misses patterns, too large is slow to adapt
- Use EMA for concept drift — adapts to changing distributions
- Count-Min Sketch for frequencies — O(1) updates, bounded memory
- HyperLogLog for distinct counts — error of a few percent (about 1.04/√m for m registers) with a tiny memory footprint
- Combine detectors — ensemble for robustness
- Set thresholds on validation data — don't guess thresholds
Summary
Streaming anomaly detection requires algorithms with bounded memory and O(1) updates. EMA and sliding windows detect point anomalies, Count-Min Sketch tracks frequencies, and HyperLogLog counts distinct elements. Master these techniques for real-time monitoring and threat detection.