Stream Analytics: Window Functions & Anomaly Detection
Advanced Stream Analytics with windowing, anomaly detection, and reference data joins
Advanced Windowing
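-- Session Window: per-device sessions with a 10-min inactivity timeout
SELECT
device_id,
MIN(event_time) AS session_start,
MAX(event_time) AS session_end,
COUNT(*) AS events_in_session,
AVG(temperature) AS avg_temp
INTO [session-output]
FROM [eventhub-input] TIMESTAMP BY event_time
GROUP BY
SessionWindow(minute, 10, 60) OVER (PARTITION BY device_id), -- 10-min inactivity timeout, 60-min maximum session duration (required argument)
device_id;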
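A small Python simulation can make the session semantics concrete. A session stays open while events keep arriving within the inactivity timeout. It closes on a longer gap or once it reaches a maximum duration; ASA's SessionWindow also takes a max-duration argument. The sketch makes three assumptions: events arrive as hypothetical `(device_id, event_time_seconds, temperature)` tuples, the maximum duration is measured from each session's first event, and ASA's exact boundary alignment is not reproduced. `session_windows` is an illustrative helper, not an ASA API.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List, Tuple


@dataclass
class Session:
    device_id: str
    start: float     # event time of the first event in the session (seconds)
    end: float       # event time of the latest event in the session (seconds)
    count: int
    temp_sum: float

    @property
    def avg_temp(self) -> float:
        return self.temp_sum / self.count


def session_windows(events: Iterable[Tuple[str, float, float]],
                    timeout: float, max_duration: float) -> List[Session]:
    """Group (device_id, event_time_seconds, temperature) events into per-device sessions."""
    open_sessions: Dict[str, Session] = {}
    closed: List[Session] = []
    for device_id, ts, temp in sorted(events, key=lambda e: e[1]):
        s = open_sessions.get(device_id)
        # Close the current session on an inactivity gap longer than `timeout`,
        # or when including this event would stretch it past `max_duration`.
        if s is not None and (ts - s.end > timeout or ts - s.start > max_duration):
            closed.append(s)
            s = None
        if s is None:
            open_sessions[device_id] = Session(device_id, ts, ts, 1, temp)
        else:
            s.end = ts
            s.count += 1
            s.temp_sum += temp
    closed.extend(open_sessions.values())  # flush sessions still open at end of input
    return closed
```

Take `timeout=600` and `max_duration=3600`. Device `d1` events at 0, 60 and 120 s form one session (3 events, average 22.0). Its event at 1000 s starts a new session, because the 880 s gap exceeds the timeout.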
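-- Hopping Window: overlapping 10-min windows, emitted every 5 min
-- (late and out-of-order events are governed by the job's event ordering policy, not by the query)
SELECT
System.Timestamp() AS window_end,
device_id,
AVG(temperature) AS avg_temp,
COUNT(*) AS event_count
INTO [hopping-output]
FROM [eventhub-input] TIMESTAMP BY event_time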
GROUP BY
HoppingWindow(minute, 10, 5), -- 10-min window, 5-min hop
device_id;
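The 5-minute hop is shorter than the 10-minute window, so every event is counted in two windows. The arithmetic can be sketched in Python under two assumptions: window ends are aligned to multiples of the hop from time 0, and each window is the interval `(end - size, end]`. ASA's own alignment reference and boundary inclusivity are not modeled, and `hopping_window_ends` is a hypothetical helper.

```python
from typing import List


def hopping_window_ends(t: int, size: int, hop: int) -> List[int]:
    """End times of every hopping window (end - size, end] that contains time t.

    Assumes window ends are aligned to multiples of `hop`, counted from time 0.
    A tumbling window is the special case size == hop (exactly one window).
    """
    first_end = -(-t // hop) * hop  # smallest aligned window end >= t (ceiling division)
    # Every aligned end in [first_end, t + size) has its start (end - size) before t,
    # so each of those windows contains t.
    return list(range(first_end, t + size, hop))
```

For example, `hopping_window_ends(7, 10, 5)` returns `[10, 15]`. An event at minute 7 lands in the windows that end at minutes 10 and 15. With `size == hop` the function returns a single window, which is the tumbling case.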
-- Sliding Window: rolling 5-min average and max, emitted whenever an event enters or leaves the window
SELECT
System.Timestamp() AS window_end,
device_id,
AVG(temperature) AS rolling_5min_avg,
MAX(temperature) AS rolling_5min_max
INTO [stacked-output]
FROM [eventhub-input] TIMESTAMP BY event_time
GROUP BY
SlidingWindow(minute, 5),
device_id;
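Here is a rough Python model of the rolling aggregates. Each arriving event is added, events older than the window length are evicted, and the average and maximum are recomputed. It is a simplified sketch over hypothetical `(event_time, temperature)` pairs for a single device. It emits only when an event enters, whereas ASA's sliding window also emits when an event leaves the window. `sliding_stats` is a hypothetical helper.

```python
from collections import deque
from typing import Deque, Iterable, List, Tuple


def sliding_stats(events: Iterable[Tuple[float, float]],
                  size: float) -> List[Tuple[float, float, float]]:
    """For each (event_time, temperature) in time order, return (event_time, avg, max)
    over the events in the window (event_time - size, event_time]."""
    window: Deque[Tuple[float, float]] = deque()
    out: List[Tuple[float, float, float]] = []
    for ts, temp in events:
        window.append((ts, temp))
        # Evict events that have slid out of the window (never the one just added).
        while window[0][0] <= ts - size:
            window.popleft()
        temps = [v for _, v in window]
        out.append((ts, sum(temps) / len(temps), max(temps)))
    return out
```

Recomputing over the whole deque keeps the example short. A production implementation would maintain a running sum, plus a monotonic deque for the maximum.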
Anomaly Detection
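-- Spike and Dip Detection (an analytic function: it uses OVER, not GROUP BY)
WITH AnomalyDetectionStep AS (
SELECT
device_id,
System.Timestamp() AS event_time,
CAST(temperature AS float) AS temperature,
AnomalyDetection_SpikeAndDip(
CAST(temperature AS float), -- input must be a float
95, -- Confidence level (percent)
120, -- History size: number of past events the model learns from
'spikesanddips' -- Mode: 'spikes', 'dips', or 'spikesanddips'
) OVER (PARTITION BY device_id LIMIT DURATION(second, 120)) AS spike_result -- duration should cover historySize events (~1 event/s per device assumed)
FROM [eventhub-input] TIMESTAMP BY event_time
)
SELECT
device_id,
event_time,
temperature,
CAST(GetRecordPropertyValue(spike_result, 'Score') AS float) AS spike_score,
CAST(GetRecordPropertyValue(spike_result, 'IsAnomaly') AS bigint) AS is_anomaly
INTO [anomaly-output]
FROM AnomalyDetectionStep;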
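AnomalyDetection_SpikeAndDip scores events with a built-in machine-learning model, which is not reproduced here. Its inputs are a confidence level, a history of past events to learn from, and a spikes/dips mode. To show what those mean, and why early events cannot be scored reliably, here is a deliberately simple rolling z-score detector in Python. It is a swapped-in technique, not ASA's algorithm. The `warmup` parameter and the mapping from confidence to a two-sided normal threshold are assumptions of this sketch.

```python
import statistics
from collections import deque
from typing import Deque, Iterable, List, Tuple


def spike_and_dip(values: Iterable[float], confidence: float, history_size: int,
                  mode: str = "spikesanddips", warmup: int = 5) -> List[Tuple[int, float]]:
    """Rolling z-score stand-in for spike/dip scoring (NOT the model ASA uses).

    Returns one (is_anomaly, z_score) pair per value. Each value is scored against
    the previous `history_size` values only; nothing is flagged until at least
    `warmup` values of history exist.
    """
    # Two-sided normal threshold for the confidence level, e.g. 95 -> about 1.96.
    threshold = statistics.NormalDist().inv_cdf(0.5 + confidence / 200)
    history: Deque[float] = deque(maxlen=history_size)
    results: List[Tuple[int, float]] = []
    for v in values:
        z = 0.0
        if len(history) >= max(2, warmup):
            mean, stdev = statistics.fmean(history), statistics.stdev(history)
            z = (v - mean) / stdev if stdev > 0 else 0.0
        is_spike = z > threshold and mode in ("spikes", "spikesanddips")
        is_dip = z < -threshold and mode in ("dips", "spikesanddips")
        results.append((int(is_spike or is_dip), z))
        history.append(v)  # learn from v only after scoring it
    return results
```

Run on `[20.0, 20.5, 19.5, 20.0, 20.5, 19.5, 20.0, 35.0, 20.0]` with confidence 95, the detector flags only the 35.0 reading. With `mode="dips"` it flags nothing, because that reading is a spike.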
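-- Change Point Detection (also an analytic function with OVER)
WITH ChangePointStep AS (
SELECT
device_id,
System.Timestamp() AS event_time,
AnomalyDetection_ChangePoint(
CAST(temperature AS float), -- input must be a float
95, -- Confidence level (percent)
400 -- History size: number of past events the model learns from
) OVER (PARTITION BY device_id LIMIT DURATION(minute, 10)) AS change_point_result -- duration should cover historySize events
FROM [eventhub-input] TIMESTAMP BY event_time
)
SELECT
device_id,
event_time,
CAST(GetRecordPropertyValue(change_point_result, 'Score') AS float) AS change_point_score,
CAST(GetRecordPropertyValue(change_point_result, 'IsAnomaly') AS bigint) AS is_change_point
INTO [changepoint-output]
FROM ChangePointStep;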
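Change-point detection looks for a persistent shift in level rather than a single outlier. As an illustration only, here is a two-sided CUSUM detector in Python. CUSUM is a classic change-detection technique substituted for ASA's built-in model. The baseline length, the slack `k` and the threshold `h` (both in baseline standard deviations) are assumptions of this sketch, and `cusum_change_point` is a hypothetical helper.

```python
import statistics
from typing import List, Optional


def cusum_change_point(values: List[float], baseline_n: int,
                       k: float, h: float) -> Optional[int]:
    """Two-sided tabular CUSUM (a stand-in, NOT ASA's change-point model).

    The mean and standard deviation of the first `baseline_n` values define
    "normal"; k (slack) and h (decision threshold) are in units of that stdev.
    Returns the index where a persistent level shift is first signaled, or None.
    """
    mu = statistics.fmean(values[:baseline_n])
    sigma = statistics.stdev(values[:baseline_n]) or 1.0  # avoid dividing by zero
    hi = lo = 0.0
    for i in range(baseline_n, len(values)):
        z = (values[i] - mu) / sigma
        hi = max(0.0, hi + z - k)  # accumulated evidence of an upward shift
        lo = max(0.0, lo - z - k)  # accumulated evidence of a downward shift
        if hi > h or lo > h:
            return i
    return None
```

Suppose a baseline averaging 20.0 is followed by five readings of 21.5. None of them stands out on its own, yet CUSUM accumulates them and signals at the fourth shifted reading (index 11).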
Reference Data Join
-- Enrich the stream with reference data (device metadata and thresholds)
SELECT
e.device_id,
e.temperature,
e.event_time,
r.device_location,
r.device_type,
r.threshold_min,
r.threshold_max,
CASE
WHEN e.temperature > r.threshold_max THEN 'OVER_THRESHOLD'
WHEN e.temperature < r.threshold_min THEN 'UNDER_THRESHOLD'
ELSE 'NORMAL'
END AS status
INTO [enriched-output]
FROM [eventhub-input] e
JOIN [device-reference] r
ON e.device_id = r.device_id; -- no DATEDIFF bound: only stream-to-stream joins need one
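Conceptually, the reference data join acts like a keyed lookup into a table that ASA keeps in memory, followed by the CASE classification. Below is a minimal Python sketch with a hypothetical `DeviceRef` record and sample data. As with the query's inner `JOIN`, events with no matching reference row are dropped.

```python
from typing import Dict, Iterable, Iterator, NamedTuple


class DeviceRef(NamedTuple):  # hypothetical shape of one reference-data row
    device_location: str
    device_type: str
    threshold_min: float
    threshold_max: float


def enrich(events: Iterable[dict], reference: Dict[str, DeviceRef]) -> Iterator[dict]:
    """Inner-join each event to its reference row by device_id and add a status."""
    for e in events:
        r = reference.get(e["device_id"])
        if r is None:
            continue  # inner JOIN: events without a reference row are dropped
        if e["temperature"] > r.threshold_max:
            status = "OVER_THRESHOLD"
        elif e["temperature"] < r.threshold_min:
            status = "UNDER_THRESHOLD"
        else:
            status = "NORMAL"
        yield {**e, **r._asdict(), "status": status}
```

To keep unmatched events instead of dropping them, the SQL would use a `LEFT OUTER JOIN` instead. The Python equivalent emits the event with empty reference fields when the lookup misses.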
SU Monitoring
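# Monitor Stream Analytics SU % utilization through Azure Monitor metrics
# (pip install azure-identity azure-mgmt-monitor)
import os
from datetime import datetime, timedelta, timezone
from azure.identity import DefaultAzureCredential
from azure.mgmt.monitor import MonitorManagementClient
subscription_id = os.environ["AZURE_SUBSCRIPTION_ID"]  # assumes the ID is set in the environment
job_resource_id = (
    f"/subscriptions/{subscription_id}/resourceGroups/rg-dataengineering-prod"
    "/providers/Microsoft.StreamAnalytics/streamingjobs/sa-iot-processor"
)
monitor_client = MonitorManagementClient(DefaultAzureCredential(), subscription_id)
# Last hour of "SU % Utilization" (Azure Monitor metric name: ResourceUtilization), 1-minute grain
end = datetime.now(timezone.utc)
metrics = monitor_client.metrics.list(
    job_resource_id,
    timespan=f"{(end - timedelta(hours=1)).isoformat()}/{end.isoformat()}",
    interval=timedelta(minutes=1),
    metricnames="ResourceUtilization",
    aggregation="Average",
)
for metric in metrics.value:
    for series in metric.timeseries:
        # The newest bucket may still be empty, so report the latest non-null average
        points = [p.average for p in series.data if p.average is not None]
        if points:
            print(f"SU Utilization: {points[-1]:.1f}%")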
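Pro Tip: Monitor SU % utilization. If it stays consistently above 70%, scale up the job's streaming units (SUs) to keep headroom for load spikes. If end-to-end latency grows (for example, a rising Watermark Delay metric), check for data skew across input partitions and consider repartitioning.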
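The Pro Tip's rules of thumb can be made explicit. This sketch assumes that "consistently above 70%" means every one of the last 15 one-minute SU % utilization samples exceeds the threshold; such samples could come from per-minute averages retrieved from Azure Monitor. Skew is measured as the busiest partition's event count divided by the mean. Both functions, the 15-sample window, and the per-partition counts input are hypothetical choices, not ASA features.

```python
from typing import Dict, List


def should_scale_up(su_samples: List[float], threshold: float = 70.0,
                    window: int = 15) -> bool:
    """True when each of the last `window` SU % utilization samples exceeds `threshold`."""
    recent = su_samples[-window:]
    return len(recent) == window and all(s > threshold for s in recent)


def partition_skew(events_per_partition: Dict[str, int]) -> float:
    """Busiest partition's event count divided by the mean; 1.0 means perfectly even."""
    counts = list(events_per_partition.values())
    if not counts or sum(counts) == 0:
        return 0.0
    return max(counts) / (sum(counts) / len(counts))
```

Requiring a full window of high samples avoids scaling on a single spike. A skew ratio well above 1.0 suggests that one partition is the bottleneck, and adding SUs alone may not fix that.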
Interview Questions
Q1: When would you use Session Window over Tumbling Window? A: Session Window for user/device activity analysis where session boundaries are defined by inactivity gaps. Tumbling Window for fixed-interval periodic aggregations.
Q2: How do you optimize Stream Analytics performance? A: 1) Increase SU if utilization is high, 2) Use PARTITION BY for parallelism, 3) Optimize query (reduce unnecessary joins), 4) Use reference data instead of self-joins, 5) Monitor SU metrics.
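Q3: What are the limitations of Stream Analytics anomaly detection? A: 1) Requires sufficient history before scores are reliable (the model learns from the last historySize events), 2) May produce false positives, so flagged events need validation, 3) Limited to a single numeric time series per partition, 4) Cannot detect complex anomalies, such as multivariate patterns that only appear across several signals together, 5) Requires tuning of the confidence and history-size parameters.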