
Real-Time Pipeline: Event Hubs → Stream Analytics → Cosmos DB


End-to-end real-time streaming pipeline with Event Hubs, Stream Analytics, and Cosmos DB for sub-second analytics

Streaming Pipeline Architecture

Architecture Diagram
┌─────────────────────────────────────────────────────────────────────┐
│                    REAL-TIME STREAMING PIPELINE                     │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  DATA SOURCES           INGESTION            PROCESSING             │
│  ┌──────────┐           ┌──────────┐         ┌──────────────┐       │
│  │ IoT      │──────────>│ Event    │────────>│ Stream       │       │
│  │ Devices  │           │ Hubs     │         │ Analytics    │       │
│  └──────────┘           └──────────┘         │              │       │
│                                              │ • Windowing  │       │
│  ┌──────────┐           ┌──────────┐         │ • Joins      │       │
│  │ App      │──────────>│ Event    │────────>│ • Aggregation│       │
│  │ Events   │           │ Hubs     │         │ • Filtering  │       │
│  └──────────┘           └──────────┘         │              │       │
│                                              │              │       │
│  ┌──────────┐           ┌──────────┐         │              │       │
│  │ Log      │──────────>│ Event    │────────>│              │       │
│  │ Streams  │           │ Hubs     │         │              │       │
│  └──────────┘           └──────────┘         └──────┬───────┘       │
│                                                     │               │
│       ┌─────────────────────────────────────────────┴─────┐         │
│       │ STORAGE        VISUALIZATION             ALERTING │         │
│       ▼                                                   ▼         │
│  ┌──────────┐          ┌──────────────┐             ┌────────────┐  │
│  │ Cosmos   │─────────>│ Power BI     │             │ Azure      │  │
│  │ DB       │          │ Dashboard    │             │ Functions  │  │
│  └──────────┘          └──────────────┘             │ (Alerts)   │  │
│       │                                             └────────────┘  │
│       ▼                                                             │
│  ┌──────────┐          ┌──────────────┐                             │
│  │ ADLS     │<─────────│ Event Hubs   │                             │
│  │ Gen2     │          │ Capture      │                             │
│  └──────────┘          └──────────────┘                             │
└─────────────────────────────────────────────────────────────────────┘

Event Hubs Configuration

{
  "name": "iot-telemetry",
  "properties": {
    "partitionCount": 16,
    "messageRetentionInDays": 7,
    "captureDescription": {
      "enabled": true,
      "encoding": "Avro",
      "intervalInSeconds": 300,
      "sizeLimitInBytes": 104857600,
      "destination": {
        "name": "EventHubArchive.AzureBlockBlob",
        "properties": {
          "storageAccountResourceId": "/subscriptions/xxx/resourceGroups/rg/providers/Microsoft.Storage/storageAccounts/stdatalake001",
          "blobContainer": "event-capture",
          "archiveNameFormat": "{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}"
        }
      }
    }
  }
}
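Capture writes a file for a partition as soon as either the time window (5 minutes here) or the size limit (104857600 bytes, i.e. 100 MiB) is reached, whichever comes first. The sketch below estimates which limit fires and how large each file gets. It assumes traffic is spread evenly over the 16 partitions and, as we read the Capture docs, that each partition applies both limits independently; `capture_flush` is a hypothetical helper, not part of any Azure SDK.

```python
def capture_flush(ingress_bytes_per_s: float,
                  partitions: int = 16,
                  interval_s: int = 300,
                  size_limit_bytes: int = 104_857_600):
    """Return (trigger, seconds_until_flush, bytes_per_file) for one partition.

    Assumes ingress is spread evenly across partitions and that each
    partition applies the time and size limits independently, with the
    first limit reached triggering the write.
    """
    per_partition = ingress_bytes_per_s / partitions
    if per_partition <= 0:
        # No traffic: the size limit can never be reached
        return ("time", float(interval_s), 0.0)
    seconds_to_fill = size_limit_bytes / per_partition
    if seconds_to_fill < interval_s:
        return ("size", seconds_to_fill, float(size_limit_bytes))
    return ("time", float(interval_s), per_partition * interval_s)


# 1 MiB/s in total: each partition sees 64 KiB/s, so the 5-minute window fires first
print(capture_flush(1_048_576))   # ('time', 300.0, 19660800.0)
# 10 MiB/s in total: each partition fills 100 MiB in 160 s, so the size limit fires first
print(capture_flush(10_485_760))  # ('size', 160.0, 104857600.0)
```

At low volume the time window dominates and produces many small files per partition, which is worth knowing before pointing batch jobs at the capture container.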

Stream Analytics Query

-- Real-time aggregation: 1-minute tumbling windows per device, on event time
SELECT
    System.Timestamp() AS window_end,
    device_id,
    COUNT(*) AS event_count,
    AVG(temperature) AS avg_temperature,
    MAX(temperature) AS max_temperature,
    MIN(humidity) AS min_humidity
INTO [cosmos-output]
FROM [eventhub-input] TIMESTAMP BY [timestamp]
GROUP BY
    TumblingWindow(minute, 1),
    device_id;

-- Anomaly detection: spike_result is a record with Score and IsAnomaly fields
-- LIMIT DURATION should cover at least historySize (100) events per device
SELECT
    device_id,
    System.Timestamp() AS event_time,
    temperature,
    AnomalyDetection_SpikeAndDip(CAST(temperature AS float), 95, 100, 'spikesanddips')
        OVER (PARTITION BY device_id LIMIT DURATION(minute, 20)) AS spike_result
INTO [alert-output]
FROM [eventhub-input] TIMESTAMP BY [timestamp];

-- Enrichment with reference data: a reference-data join needs no DATEDIFF bound
SELECT
    e.device_id,
    e.temperature,
    e.[timestamp] AS event_time,
    r.location,
    r.threshold_min,
    r.threshold_max
INTO [enriched-output]
FROM [eventhub-input] e
JOIN [device-reference] r
    ON e.device_id = r.device_id;
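To check the first query's output against raw events (for example in a unit test of downstream code), its per-device tumbling-window aggregation can be reproduced in plain Python. This is a sketch, not Stream Analytics itself: it assumes windows exclude their start and include their end, which is how we understand the Stream Analytics windowing convention, and `window_end`, `tumbling_aggregate` and the sample readings are hypothetical.

```python
import math
from collections import defaultdict
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_end(ts: datetime, size: timedelta = timedelta(minutes=1)) -> datetime:
    """End of the tumbling window holding ts; windows are (start, end]."""
    step = size.total_seconds()
    seconds = (ts - EPOCH).total_seconds()
    # ceil: a timestamp exactly on a boundary belongs to the window ending there
    return EPOCH + timedelta(seconds=math.ceil(seconds / step) * step)


def tumbling_aggregate(events):
    """Per-device COUNT, AVG/MAX temperature and MIN humidity per 1-minute window."""
    groups = defaultdict(list)
    for e in events:
        groups[(window_end(e["timestamp"]), e["device_id"])].append(e)

    rows = []
    for (end, device_id), evs in sorted(groups.items()):
        temps = [e["temperature"] for e in evs]
        rows.append({
            "window_end": end,
            "device_id": device_id,
            "event_count": len(evs),
            "avg_temperature": sum(temps) / len(temps),
            "max_temperature": max(temps),
            "min_humidity": min(e["humidity"] for e in evs),
        })
    return rows


def utc(h: int, m: int, s: int) -> datetime:
    return datetime(2024, 1, 1, h, m, s, tzinfo=timezone.utc)


sample = [
    {"device_id": "sensor-0001", "timestamp": utc(12, 0, 10), "temperature": 70.0, "humidity": 40.0},
    {"device_id": "sensor-0001", "timestamp": utc(12, 0, 50), "temperature": 80.0, "humidity": 35.0},
    {"device_id": "sensor-0001", "timestamp": utc(12, 1, 0), "temperature": 90.0, "humidity": 50.0},
    {"device_id": "sensor-0001", "timestamp": utc(12, 1, 5), "temperature": 60.0, "humidity": 30.0},
]

for row in tumbling_aggregate(sample):
    print(row["window_end"].strftime("%H:%M"), row["event_count"], row["avg_temperature"])
# 12:01 3 80.0
# 12:02 1 60.0
```

The 12:01:00 reading is counted in the window ending at 12:01, not the next one; getting that boundary rule wrong is the most common reason a hand-rolled check disagrees with the job.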

Python Producer

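import json
import random
import time
from datetime import datetime, timezone

from azure.eventhub import EventData, EventHubProducerClient

producer = EventHubProducerClient.from_connection_string(
    conn_str="Endpoint=sb://ns-iot.servicebus.windows.net/;SharedAccessKeyName=...;",
    eventhub_name="iot-telemetry"
)

with producer:  # closes the AMQP connection when the loop exits
    while True:
        # create_batch() sizes the batch to the hub's limit; add() raises ValueError when it is full
        batch = producer.create_batch()
        for _ in range(100):
            event = {
                "device_id": f"sensor-{random.randint(1, 1000):04d}",
                "temperature": round(random.uniform(60, 100), 2),
                "humidity": round(random.uniform(30, 80), 2),
                # UTC timestamp; the "Z" suffix marks UTC
                "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
            }
            batch.add(EventData(json.dumps(event)))
        producer.send_batch(batch)
        time.sleep(1)  # about 100 events per second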

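Pro Tip: Choose partition keys deliberately. If the producer sets partition_key to the device_id (for example, producer.create_batch(partition_key=device_id)), all of a device's events land on the same partition, which preserves per-device ordering. Hot partitions come from skew, a few keys carrying most of the traffic, so monitor how events are distributed across partitions.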
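A partition key is hashed to pick a partition, so a fixed key always lands on the same partition (ordering), while a few dominant keys overload one partition (skew). The sketch below shows both effects. The MD5-mod-N hash is a hypothetical stand-in: Event Hubs uses its own internal hashing, so real partition numbers will differ, and `assign_partition` and `partition_skew` are illustrative names.

```python
import hashlib
from collections import Counter


def assign_partition(partition_key: str, partition_count: int = 16) -> int:
    """HYPOTHETICAL stand-in for Event Hubs' internal key hashing (MD5 mod N)."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % partition_count


def partition_skew(keys, partition_count: int = 16) -> float:
    """Busiest partition's load divided by the mean load (1.0 means perfectly even)."""
    counts = Counter(assign_partition(k, partition_count) for k in keys)
    if not counts:
        return 0.0
    mean = sum(counts.values()) / partition_count
    return max(counts.values()) / mean


# The same key always maps to the same partition, which is what preserves per-device order
assert assign_partition("sensor-0042") == assign_partition("sensor-0042")

even = [f"sensor-{i:04d}" for i in range(1, 1001)]  # 1,000 devices, one event each
hot = ["sensor-0001"] * 900 + even[:100]             # one device sends most of the traffic
print(f"even traffic skew: {partition_skew(even):.2f}")
print(f"hot-key skew:      {partition_skew(hot):.2f}")  # above 14: one partition is hot
```

Many distinct device IDs spread well; the hot-key case shows that skew, not the number of keys, is what creates a hot partition.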

Interview Questions

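Q1: How do you ensure exactly-once processing in this streaming pipeline? A: Combine a replayable source, checkpointed processing, and idempotent writes. Event Hubs retains events for the configured retention period, and consumers in each consumer group checkpoint their own offset per partition, so a reader can resume from a stored position. Stream Analytics checkpoints its state and resumes from the last checkpoint after a restart; it guarantees exactly-once processing but at-least-once delivery to most outputs, so a result may be written more than once. For Cosmos DB, set the output's Document ID to a deterministic key (for example, device_id plus window_end) so writes become upserts and replayed results overwrite the existing document instead of creating duplicates.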
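The upsert argument can be made concrete with a minimal in-memory sketch. `InMemoryContainer`, `document_id` and `write_window_results` are hypothetical names; `upsert_item` only mimics the insert-or-overwrite behaviour of a Cosmos DB container keyed by partition key plus id and does not call the azure-cosmos SDK.

```python
from typing import Dict, Iterable, Tuple


class InMemoryContainer:
    """HYPOTHETICAL stand-in for a Cosmos DB container; keys documents by (partition key, id)."""

    def __init__(self) -> None:
        self.docs: Dict[Tuple[str, str], dict] = {}

    def upsert_item(self, doc: dict) -> None:
        # Upsert: insert when the key is new, overwrite when it already exists
        self.docs[(doc["device_id"], doc["id"])] = doc


def document_id(row: dict) -> str:
    # Deterministic: a replayed result for the same device and window gets the same id
    return f"{row['device_id']}|{row['window_end']}"


def write_window_results(container: InMemoryContainer, rows: Iterable[dict]) -> None:
    for row in rows:
        container.upsert_item({**row, "id": document_id(row)})


rows = [
    {"device_id": "sensor-0001", "window_end": "2024-01-01T12:01:00Z", "event_count": 3},
    {"device_id": "sensor-0002", "window_end": "2024-01-01T12:01:00Z", "event_count": 5},
]
container = InMemoryContainer()
write_window_results(container, rows)
write_window_results(container, rows)  # replay, e.g. after restarting from an older checkpoint
print(len(container.docs))  # 2
```

With a random or auto-generated id, the replay would have produced four documents; the deterministic id is what turns at-least-once delivery into an effectively-once result.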

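Q2: What are the scaling limits for this architecture? A: Event Hubs Standard supports up to 40 throughput units (TUs) per namespace, each allowing 1 MB/s or 1,000 events/s of ingress, so roughly 40 MB/s; beyond that, move to Premium or Dedicated. Stream Analytics throughput per streaming unit (SU) depends on query complexity, so benchmark rather than assume a fixed MB/s per SU, and keep the query parallel by aligning it with the input partitions. Cosmos DB throughput (provisioned or autoscale) scales out across physical partitions, but a single logical partition is capped at 10,000 RU/s, so choose a high-cardinality partition key such as device_id. Scale each component independently based on measured throughput.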
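A quick way to size Event Hubs is to check both per-TU ingress limits and take the larger requirement. The sketch assumes the 1 MB/s or 1,000 events/s per-TU figures above and treats 1 MB as 1,048,576 bytes; `required_tus` and the sample workloads are hypothetical.

```python
import math

# Assumed Standard-tier limits: each TU allows 1 MB/s or 1,000 events/s of ingress,
# whichever is reached first. "1 MB" is taken as 1,048,576 bytes here.
INGRESS_BYTES_PER_TU = 1_048_576
INGRESS_EVENTS_PER_TU = 1_000
STANDARD_MAX_TUS = 40


def required_tus(events_per_s: float, avg_event_bytes: float) -> int:
    """Smallest TU count that satisfies both the byte limit and the event-count limit."""
    by_bytes = math.ceil(events_per_s * avg_event_bytes / INGRESS_BYTES_PER_TU)
    by_events = math.ceil(events_per_s / INGRESS_EVENTS_PER_TU)
    return max(1, by_bytes, by_events)


for rate, size in [(100, 120), (5_000, 200), (2_000, 2_048), (60_000, 1_024)]:
    tus = required_tus(rate, size)
    tier = "fits Standard" if tus <= STANDARD_MAX_TUS else "needs Premium or Dedicated"
    print(f"{rate:>6} events/s x {size:>5} B -> {tus:>2} TU ({tier})")
```

Small events hit the event-count limit first: 5,000 events/s of 200 bytes is under 1 MB/s yet needs 5 TUs, so sizing on MB/s alone can under-provision.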

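Q3: How do you handle backpressure in the streaming pipeline? A: Event Hubs acts as the buffer: unread events remain available for the configured retention period (7 days in this configuration). Stream Analytics reads as fast as its streaming units allow; if processing falls behind, events accumulate in Event Hubs. Monitor the job's Watermark Delay and Backlogged Input Events metrics and add SUs before the backlog approaches the retention window, because events older than the retention period are deleted unprocessed.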
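Lag arithmetic makes the scaling decision concrete: a backlog only shrinks when processing outpaces ingress, and it has to clear before the oldest unread events age past retention. This sketch assumes steady rates; `seconds_to_drain` and `backlog_at_risk` are hypothetical helpers, and in practice the inputs would come from backlog metrics and measured job throughput.

```python
import math


def seconds_to_drain(backlog_events: int, ingress_per_s: float, processing_per_s: float) -> float:
    """Time for the consumer to catch up; math.inf when it cannot outpace ingress."""
    if backlog_events <= 0:
        return 0.0
    surplus = processing_per_s - ingress_per_s
    if surplus <= 0:
        return math.inf
    return backlog_events / surplus


def backlog_at_risk(backlog_events: int, ingress_per_s: float, retention_s: float) -> bool:
    """True when the oldest unread event is at least as old as the retention period."""
    if ingress_per_s <= 0:
        return False
    # With steady ingress, the unread tail spans backlog / ingress seconds
    return backlog_events / ingress_per_s >= retention_s


# 600,000 events behind, 1,000 events/s arriving, job scaled to process 1,500 events/s
print(seconds_to_drain(600_000, 1_000, 1_500) / 60)             # 20.0 (minutes)
print(backlog_at_risk(600_000, 1_000, retention_s=7 * 86_400))  # False
```

If processing only matches ingress, the backlog never shrinks (`math.inf`), which is the signal to add streaming units rather than wait.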
