Real-Time Streaming Dashboards
Kafka + Flink + Prometheus + Grafana for Live Monitoring
ℹ️
Project Difficulty: Advanced | Duration: 2-3 weeks | Cloud: AWS/GCP
Build real-time operational dashboards that update every second, enabling instant visibility into system health, business metrics, and user behavior.
Project Overview
Problem Statement
Traditional dashboards rely on batch data, creating visibility gaps that delay incident response and business decisions. Operations teams need real-time visibility into system performance, while business teams need instant access to live metrics.
Objectives
- Achieve sub-second dashboard refresh rates
- Process 1M+ metrics per second
- Support complex windowed aggregations
- Enable drill-down from summary to detail
- Maintain 99.99% dashboard availability
Tech Stack
| Component | Technology | Purpose |
|---|---|---|
| Event Streaming | Apache Kafka | Metric ingestion |
| Stream Processing | Apache Flink | Real-time aggregation |
| Time-Series DB | Prometheus + Thanos | Metric storage |
| Visualization | Grafana | Dashboards and alerts |
| Alerting | AlertManager | Incident management |
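Architecture Diagram
Data flow: producers → Kafka (metrics-raw topic) → Flink (windowed aggregation and anomaly detection) → Prometheus Pushgateway → Prometheus + Thanos → Grafana, with AlertManager routing alerts.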
Data Source Setup and Schema
Metric Schema Definitions
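# schemas/metrics.py
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List


class MetricType(Enum):
    COUNTER = "counter"
    GAUGE = "gauge"
    HISTOGRAM = "histogram"
    SUMMARY = "summary"


@dataclass
class MetricDefinition:
    name: str
    metric_type: MetricType
    description: str
    labels: List[str] = field(default_factory=list)
    retention_days: int = 30


def _escape_label_value(value: str) -> str:
    # The Prometheus text format requires escaping backslashes, double quotes and newlines
    return value.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")


@dataclass
class MetricPoint:
    metric_name: str
    value: float
    timestamp: datetime
    labels: Dict[str, str] = field(default_factory=dict)

    def to_prometheus_text(self) -> str:
        """Render one sample line: name{labels} value timestamp_ms."""
        label_str = ",".join(f'{k}="{_escape_label_value(str(v))}"' for k, v in self.labels.items())
        if label_str:
            label_str = "{" + label_str + "}"
        return f"{self.metric_name}{label_str} {self.value} {int(self.timestamp.timestamp() * 1000)}"


@dataclass
class DashboardMetric:
    """Minimal panel spec used by the tests; thresholds map level names to values."""
    name: str
    query: str
    unit: str = "short"
    thresholds: Dict[str, float] = field(default_factory=dict)
    panel_type: str = "timeseries"

    def to_grafana_panel(self) -> Dict[str, Any]:
        # Base step is green; "warning"/"critical" thresholds become yellow/red steps
        colors = {"warning": "yellow", "critical": "red"}
        steps: List[Dict[str, Any]] = [{"color": "green", "value": None}]
        for level, threshold in sorted(self.thresholds.items(), key=lambda kv: kv[1]):
            steps.append({"color": colors.get(level, "orange"), "value": threshold})
        return {
            "title": self.name,
            "type": self.panel_type,
            "targets": [{"expr": self.query, "refId": "A"}],
            "fieldConfig": {"defaults": {"unit": self.unit, "thresholds": {"steps": steps}}},
        }


METRIC_DEFINITIONS = {
    "http_requests_total": MetricDefinition(
        name="http_requests_total",
        metric_type=MetricType.COUNTER,
        description="Total HTTP requests",
        labels=["method", "path", "status", "instance"]
    ),
    "http_request_duration_seconds": MetricDefinition(
        name="http_request_duration_seconds",
        metric_type=MetricType.HISTOGRAM,
        description="HTTP request duration",
        labels=["method", "path", "instance"]
    ),
    "cpu_usage_percent": MetricDefinition(
        name="cpu_usage_percent",
        metric_type=MetricType.GAUGE,
        description="CPU usage percentage",
        labels=["instance", "job"]
    ),
    "memory_usage_bytes": MetricDefinition(
        name="memory_usage_bytes",
        metric_type=MetricType.GAUGE,
        description="Memory usage in bytes",
        labels=["instance", "job"]
    ),
    # Emitted by MetricProducer.produce_system_metrics, so it needs a definition too
    "disk_usage_percent": MetricDefinition(
        name="disk_usage_percent",
        metric_type=MetricType.GAUGE,
        description="Disk usage percentage",
        labels=["instance", "job"]
    ),
    "orders_per_minute": MetricDefinition(
        name="orders_per_minute",
        metric_type=MetricType.COUNTER,
        description="Orders processed per minute",
        labels=["region", "product_line"]
    ),
    "revenue_per_hour": MetricDefinition(
        name="revenue_per_hour",
        metric_type=MetricType.COUNTER,
        description="Revenue generated per hour",
        labels=["region", "payment_method"]
    ),
    "user_sessions_active": MetricDefinition(
        name="user_sessions_active",
        metric_type=MetricType.GAUGE,
        description="Active user sessions",
        labels=["platform", "country"]
    )
}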
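METRIC_DEFINITIONS already records which label keys each metric should carry, so samples can be checked against it before they reach Kafka. For example, the sample_metrics test fixture emits http_requests_total without path or instance. The helper below is a hypothetical sketch, not part of the original module. It takes the expected labels as a plain dict so it runs on its own, and it could be fed {name: d.labels for name, d in METRIC_DEFINITIONS.items()}.

```python
from typing import Dict, List


def validate_labels(metric_name: str, labels: Dict[str, str],
                    expected_labels: Dict[str, List[str]]) -> List[str]:
    """Return human-readable problems; an empty list means the label keys match."""
    if metric_name not in expected_labels:
        return [f"unknown metric: {metric_name}"]
    expected = set(expected_labels[metric_name])
    actual = set(labels)
    # Missing labels fragment series; unexpected ones inflate cardinality
    problems = [f"missing label: {key}" for key in sorted(expected - actual)]
    problems += [f"unexpected label: {key}" for key in sorted(actual - expected)]
    return problems


if __name__ == "__main__":
    spec = {"http_requests_total": ["method", "path", "status", "instance"]}
    print(validate_labels("http_requests_total", {"method": "GET", "status": "200"}, spec))
    # ['missing label: instance', 'missing label: path']
```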
Step-by-Step Implementation Guide
Step 1: Kafka Metric Producer
# producers/metric_producer.py
import logging
import random
import time
from typing import Dict

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import MessageField, SerializationContext

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

TOPIC = "metrics-raw"


class MetricProducer:
    def __init__(self, config: Dict):
        self.producer = Producer({
            'bootstrap.servers': config['kafka_brokers'],
            'client.id': 'metric-producer',
            'acks': 'all',
            'retries': 5,
            'compression.type': 'snappy'
        })
        self.schema_registry = SchemaRegistryClient({'url': config['schema_registry_url']})
        self.metric_schema = """
        {
            "type": "record", "name": "Metric",
            "namespace": "com.company.metrics",
            "fields": [
                {"name": "metric_name", "type": "string"},
                {"name": "value", "type": "double"},
                {"name": "timestamp", "type": "long"},
                {"name": "labels", "type": {"type": "map", "values": "string"}},
                {"name": "source", "type": "string"}
            ]
        }
        """
        self.serializer = AvroSerializer(self.schema_registry, self.metric_schema,
                                         conf={'auto.register.schemas': True})

    @staticmethod
    def _on_delivery(err, msg):
        # Invoked from poll()/flush() once the broker acknowledges or rejects the message
        if err is not None:
            logger.error(f"Delivery failed: {err}")

    def produce_metric(self, metric_name: str, value: float,
                       labels: Dict[str, str], source: str = "app"):
        metric = {
            "metric_name": metric_name, "value": float(value),
            "timestamp": int(time.time() * 1000),
            "labels": labels, "source": source
        }
        # Keying by series keeps each metric/instance pair on one partition, preserving order
        key = f"{metric_name}:{labels.get('instance', 'default')}"
        try:
            self.producer.produce(
                topic=TOPIC,
                key=key.encode('utf-8'),
                # The serializer expects a SerializationContext (topic + field), not a dict;
                # the default subject name becomes "metrics-raw-value"
                value=self.serializer(metric, SerializationContext(TOPIC, MessageField.VALUE)),
                on_delivery=self._on_delivery)
            self.producer.poll(0)  # serve pending delivery callbacks without blocking
        except Exception as e:
            logger.error(f"Failed to produce metric: {e}")

    def produce_http_metrics(self, endpoint: str, method: str,
                             status_code: int, duration_ms: float, instance: str):
        self.produce_metric("http_requests_total", 1,
                            {"method": method, "path": endpoint, "status": str(status_code), "instance": instance})
        self.produce_metric("http_request_duration_seconds", duration_ms / 1000,
                            {"method": method, "path": endpoint, "instance": instance})

    def produce_system_metrics(self, instance: str, cpu_percent: float,
                               memory_bytes: int, disk_usage_percent: float):
        self.produce_metric("cpu_usage_percent", cpu_percent, {"instance": instance, "job": "system"})
        self.produce_metric("memory_usage_bytes", memory_bytes, {"instance": instance, "job": "system"})
        self.produce_metric("disk_usage_percent", disk_usage_percent, {"instance": instance, "job": "system"})

    def produce_business_metrics(self, region: str, orders: int,
                                 revenue: float, active_sessions: int):
        self.produce_metric("orders_per_minute", orders, {"region": region, "product_line": "all"})
        self.produce_metric("revenue_per_hour", revenue, {"region": region, "payment_method": "all"})
        self.produce_metric("user_sessions_active", active_sessions, {"platform": "web", "country": "US"})

    def generate_synthetic_metrics(self, duration_seconds: int = 60,
                                   metrics_per_second: int = 1000):
        # Each iteration emits 5 metrics (+3 business metrics about 10% of the time)
        instances = [f"instance-{i}" for i in range(10)]
        regions = ["us-east-1", "us-west-2", "eu-west-1"]
        end_time = time.time() + duration_seconds
        while time.time() < end_time:
            tick_start = time.time()
            for _ in range(metrics_per_second):
                instance = random.choice(instances)
                self.produce_http_metrics(
                    endpoint=random.choice(["/api/users", "/api/orders", "/api/products"]),
                    method=random.choice(["GET", "POST", "PUT"]),
                    status_code=random.choice([200, 200, 200, 200, 400, 404, 500]),
                    duration_ms=random.expovariate(1/100), instance=instance)
                self.produce_system_metrics(instance=instance,
                                            cpu_percent=random.gauss(50, 20),
                                            memory_bytes=random.randint(1_000_000_000, 4_000_000_000),
                                            disk_usage_percent=random.gauss(60, 15))
                if random.random() < 0.1:
                    region = random.choice(regions)
                    self.produce_business_metrics(region=region, orders=random.randint(1, 10),
                                                  revenue=random.uniform(100, 10000),
                                                  active_sessions=random.randint(100, 1000))
            # Sleep only for what remains of the one-second tick so the target rate holds
            time.sleep(max(0.0, 1.0 - (time.time() - tick_start)))

    def close(self):
        self.producer.flush()
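The metrics_per_second argument counts loop iterations, not Kafka messages. Each iteration sends two HTTP metrics and three system metrics, and it adds three business metrics with probability 0.1. A quick calculation relates that to the 1M+ metrics-per-second objective. The function names are illustrative, and the estimate assumes every producer process actually sustains its target iteration rate.

```python
import math


def messages_per_iteration(business_probability: float = 0.1) -> float:
    """Expected Kafka messages per loop iteration of generate_synthetic_metrics."""
    http_metrics = 2      # http_requests_total + http_request_duration_seconds
    system_metrics = 3    # cpu, memory, disk
    business_metrics = 3  # orders, revenue, sessions (only on some iterations)
    return http_metrics + system_metrics + business_probability * business_metrics


def producers_needed(target_msgs_per_sec: float, iterations_per_sec: int,
                     business_probability: float = 0.1) -> int:
    """Producer processes required to reach a target message rate."""
    per_producer = iterations_per_sec * messages_per_iteration(business_probability)
    return math.ceil(target_msgs_per_sec / per_producer)


if __name__ == "__main__":
    print(messages_per_iteration() * 1000)    # about 5,300 messages/s per producer
    print(producers_needed(1_000_000, 1000))  # 189
```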
Step 2: Flink Stream Processing
# flink/stream_processor.py
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment


class StreamingMetricsProcessor:
    def __init__(self):
        self.env = StreamExecutionEnvironment.get_execution_environment()
        self.env.set_parallelism(4)
        self.t_env = StreamTableEnvironment.create(self.env)

    def setup_kafka_source(self):
        # `value` and `timestamp` are reserved words in Flink SQL, so they are quoted.
        # The producer writes Confluent Avro via the Schema Registry, so the source uses the
        # matching 'avro-confluent' format (the registry hostname here is an assumption).
        self.t_env.execute_sql("""
            CREATE TABLE metrics_raw (
                metric_name STRING,
                `value` DOUBLE,
                `timestamp` BIGINT,
                labels MAP<STRING, STRING>,
                source STRING,
                event_time AS TO_TIMESTAMP_LTZ(`timestamp`, 3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
            ) WITH (
                'connector' = 'kafka',
                'topic' = 'metrics-raw',
                'properties.bootstrap.servers' = 'kafka:9092',
                'properties.group.id' = 'flink-metrics-processor',
                'scan.startup.mode' = 'latest-offset',
                'format' = 'avro-confluent',
                'avro-confluent.url' = 'http://schema-registry:8081'
            )
        """)

    def setup_prometheus_sink(self):
        # Assumption: Flink ships no built-in 'prometheus' table connector. These options
        # presume a custom connector that pushes rows to a Prometheus Pushgateway; replace
        # them with the sink your deployment actually provides.
        self.t_env.execute_sql("""
            CREATE TABLE prometheus_sink (
                metric_name STRING,
                labels MAP<STRING, STRING>,
                `value` DOUBLE,
                `timestamp` BIGINT
            ) WITH (
                'connector' = 'prometheus',
                'pushgateway.url' = 'prometheus-pushgateway:9091',
                'delete.after.write' = 'true'
            )
        """)

    def create_real_time_aggregation(self):
        # 1-second tumbling windows per series. A TEMPORARY VIEW needs no sink connector,
        # whereas CREATE TABLE ... AS SELECT would require connector options in WITH (...).
        self.t_env.execute_sql("""
            CREATE TEMPORARY VIEW real_time_metrics AS
            SELECT
                metric_name,
                labels,
                TUMBLE_START(event_time, INTERVAL '1' SECOND) AS window_start,
                TUMBLE_END(event_time, INTERVAL '1' SECOND) AS window_end,
                MAX(`timestamp`) AS last_event_ms,
                SUM(`value`) AS total_value,
                COUNT(*) AS event_count,
                AVG(`value`) AS avg_value,
                MAX(`value`) AS max_value,
                MIN(`value`) AS min_value
            FROM metrics_raw
            GROUP BY
                TUMBLE(event_time, INTERVAL '1' SECOND),
                metric_name,
                labels
        """)

    def create_windowed_aggregation(self):
        # 5-minute tumbling windows per metric for trend panels
        self.t_env.execute_sql("""
            CREATE TEMPORARY VIEW windowed_metrics AS
            SELECT
                metric_name,
                TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
                TUMBLE_END(event_time, INTERVAL '5' MINUTE) AS window_end,
                SUM(`value`) AS total_value,
                COUNT(*) AS sample_count,
                AVG(`value`) AS avg_value,
                STDDEV_POP(`value`) AS stddev_value
            FROM metrics_raw
            GROUP BY
                TUMBLE(event_time, INTERVAL '5' MINUTE),
                metric_name
        """)

    def create_anomaly_detection(self):
        # Rolling 3-sigma test over each metric's last 101 rows (100 preceding + current).
        # The named window w is shared by both OVER aggregates.
        self.t_env.execute_sql("""
            CREATE TEMPORARY VIEW anomaly_alerts AS
            SELECT
                metric_name,
                labels,
                event_time,
                `value`,
                moving_avg,
                moving_stddev,
                ABS(`value` - moving_avg) > 3 * moving_stddev AS is_anomaly
            FROM (
                SELECT
                    metric_name,
                    labels,
                    event_time,
                    `value`,
                    AVG(`value`) OVER w AS moving_avg,
                    STDDEV_POP(`value`) OVER w AS moving_stddev
                FROM metrics_raw
                WINDOW w AS (
                    PARTITION BY metric_name
                    ORDER BY event_time
                    ROWS BETWEEN 100 PRECEDING AND CURRENT ROW
                )
            ) AS rolling_stats
        """)

    def run(self):
        self.setup_kafka_source()
        self.setup_prometheus_sink()
        self.create_real_time_aggregation()
        self.create_windowed_aggregation()
        self.create_anomaly_detection()
        # execute_sql() already runs each DDL statement, so a separate execute() call has
        # nothing to submit. A StatementSet bundles INSERTs into one Flink job; here the
        # 1-second aggregates are pushed to the sink.
        statements = self.t_env.create_statement_set()
        statements.add_insert_sql("""
            INSERT INTO prometheus_sink
            SELECT metric_name, labels, avg_value, last_event_ms
            FROM real_time_metrics
        """)
        statements.execute().wait()
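The anomaly view's logic can be sanity-checked outside Flink by mirroring the rolling 3-sigma test in plain Python. This is a reference sketch, not the Flink implementation, and the class and method names are hypothetical. Like the SQL frame ROWS BETWEEN 100 PRECEDING AND CURRENT ROW, it keeps the last 101 values per metric_name. It includes the current value in the mean and population standard deviation, and it flags values more than three standard deviations from the mean.

```python
import statistics
from collections import defaultdict, deque
from typing import Deque, Dict


class RollingAnomalyDetector:
    """Per-metric rolling 3-sigma check mirroring the anomaly_alerts view (hypothetical helper)."""

    def __init__(self, window_rows: int = 101, k: float = 3.0):
        self.k = k
        # 100 PRECEDING + CURRENT ROW = 101 rows per metric_name partition
        self.windows: Dict[str, Deque[float]] = defaultdict(lambda: deque(maxlen=window_rows))

    def observe(self, metric_name: str, value: float) -> bool:
        window = self.windows[metric_name]
        window.append(value)  # the current row is part of the frame, as in the SQL
        mean = statistics.mean(window)
        stddev = statistics.pstdev(window)  # population stddev, like STDDEV_POP
        return abs(value - mean) > self.k * stddev


if __name__ == "__main__":
    detector = RollingAnomalyDetector()
    for i in range(100):
        detector.observe("cpu_usage_percent", 10 + (i % 2) * 2)
    print(detector.observe("cpu_usage_percent", 100.0))  # True
```

Including the current row has a consequence. Among n values, any single value lies at most √(n−1) population standard deviations from the mean. A metric therefore needs more than ten rows in its window before any point can exceed the 3σ threshold.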
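Step 3: Grafana Dashboard Configuration
Grafana enforces a minimum dashboard refresh interval (min_refresh_interval, 5s by default), so "refresh": "1s" only takes effect after lowering that setting in grafana.ini. The alerting.rules entries use Prometheus alerting-rule syntax, and Grafana does not evaluate them from dashboard JSON, so they must also be loaded into Prometheus as a rule file.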
{
"dashboard": {
"title": "Real-Time Operations Dashboard",
"tags": ["streaming", "real-time", "operations"],
"timezone": "browser",
"refresh": "1s",
"time": {"from": "now-1h", "to": "now"},
"panels": [
{
"title": "Request Rate (per second)",
"type": "timeseries",
"gridPos": {"h": 8, "w": 12, "x": 0, "y": 0},
"targets": [
{
"expr": "sum(rate(http_requests_total[1m])) by (instance)",
"legendFormat": "{{instance}}",
"refId": "A"
}
],
"fieldConfig": {
"defaults": {
"unit": "reqps",
"thresholds": {
"steps": [
{"color": "green", "value": null},
{"color": "yellow", "value": 1000},
{"color": "red", "value": 5000}
]
}
}
}
},
{
"title": "Response Latency (P99)",
"type": "timeseries",
"gridPos": {"h": 8, "w": 12, "x": 12, "y": 0},
"targets": [
{
"expr": "histogram_quantile(0.99, sum(rate(http_request_duration_seconds_bucket[5m])) by (le, instance))",
"legendFormat": "P99 {{instance}}",
"refId": "A"
}
],
"fieldConfig": {
"defaults": {
"unit": "s",
"thresholds": {
"steps": [
{"color": "green", "value": null},
{"color": "yellow", "value": 0.5},
{"color": "red", "value": 1.0}
]
}
}
}
},
{
"title": "CPU Usage by Instance",
"type": "gauge",
"gridPos": {"h": 8, "w": 8, "x": 0, "y": 8},
"targets": [
{
"expr": "avg(cpu_usage_percent) by (instance)",
"legendFormat": "{{instance}}",
"refId": "A"
}
],
"fieldConfig": {
"defaults": {
"unit": "percent",
"min": 0, "max": 100,
"thresholds": {
"steps": [
{"color": "green", "value": null},
{"color": "yellow", "value": 70},
{"color": "red", "value": 90}
]
}
}
}
},
{
"title": "Active Sessions",
"type": "stat",
"gridPos": {"h": 8, "w": 8, "x": 8, "y": 8},
"targets": [
{
"expr": "sum(user_sessions_active)",
"legendFormat": "Total Active",
"refId": "A"
}
],
"fieldConfig": {
"defaults": {
"thresholds": {
"steps": [
{"color": "blue", "value": null},
{"color": "green", "value": 100},
{"color": "yellow", "value": 500}
]
}
}
}
},
{
"title": "Revenue per Hour",
"type": "bargauge",
"gridPos": {"h": 8, "w": 8, "x": 16, "y": 8},
"targets": [
{
"expr": "sum(increase(revenue_per_hour[1h])) by (region)",
"legendFormat": "{{region}}",
"refId": "A"
}
],
"fieldConfig": {
"defaults": {
"unit": "currencyUSD"
}
}
},
{
"title": "Anomaly Alerts",
"type": "table",
"gridPos": {"h": 10, "w": 24, "x": 0, "y": 16},
"targets": [
{
"expr": "anomaly_alerts{is_anomaly=\"true\"}",
"format": "table",
"instant": true,
"refId": "A"
}
]
}
],
"templating": {
"list": [
{
"name": "instance",
"type": "query",
"query": "label_values(cpu_usage_percent, instance)",
"refresh": 2,
"multi": true,
"includeAll": true
}
]
},
"alerting": {
"rules": [
{
"alert": "HighErrorRate",
"expr": "sum(rate(http_requests_total{status=~\"5..\"}[5m])) / sum(rate(http_requests_total[5m])) > 0.05",
"for": "2m",
"labels": {"severity": "critical"},
"annotations": {
"summary": "High error rate detected (>5%)"
}
},
{
"alert": "HighLatency",
"expr": "histogram_quantile(0.99, sum(rate(http_request_duration_seconds_bucket[5m])) by (le)) > 1",
"for": "5m",
"labels": {"severity": "warning"},
"annotations": {
"summary": "P99 latency above 1 second"
}
}
]
}
}
}
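The alerting.rules entries in the dashboard above are in Prometheus alerting-rule syntax. To move them out of the dashboard and into a Prometheus rule file, they can be split off before the dashboard is provisioned. The sketch below assumes the {"dashboard": {...}} layout shown above; the function name and the group name are hypothetical. Because JSON is valid YAML, the resulting rule file can be written with json.dump.

```python
import copy
import json
from typing import Any, Dict, Tuple


def split_alert_rules(doc: Dict[str, Any],
                      group_name: str = "real-time-ops") -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Return (dashboard document without 'alerting', Prometheus rule-file dict)."""
    cleaned = copy.deepcopy(doc)  # leave the caller's document untouched
    alerting = cleaned["dashboard"].pop("alerting", {})
    rule_file = {"groups": [{"name": group_name, "rules": alerting.get("rules", [])}]}
    return cleaned, rule_file


if __name__ == "__main__":
    doc = {"dashboard": {"title": "Real-Time Operations Dashboard",
                         "alerting": {"rules": [{"alert": "TargetDown", "expr": "up == 0", "for": "1m"}]}}}
    dashboard, rules = split_alert_rules(doc)
    print(json.dumps(rules, indent=2))
```

The cleaned document can then be provisioned as the dashboard. The rule file can be listed under rule_files in the Prometheus configuration or, with kube-prometheus-stack, wrapped in a PrometheusRule resource.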
Infrastructure Setup (Terraform)
# infrastructure/streaming_dashboards.tf
# Assumes aws_security_group.kafka / .flink, aws_kms_key.kafka, the IAM roles and the
# Helm/Kubernetes provider configuration are defined elsewhere in the module.
variable "environment" {
  default = "production"
}

variable "region" {
  type = string
}

variable "private_subnets" {
  type = list(string)
}

variable "ecr_url" {
  type = string
}

variable "grafana_password" {
  type      = string
  sensitive = true
}

resource "aws_msk_cluster" "metrics_kafka" {
  cluster_name           = "metrics-streaming-${var.environment}"
  kafka_version          = "3.5.1"
  number_of_broker_nodes = 3

  broker_node_group_info {
    instance_type   = "kafka.m5.large"
    client_subnets  = var.private_subnets
    security_groups = [aws_security_group.kafka.id]
    storage_info {
      ebs_storage_info { volume_size = 100 }
    }
  }

  encryption_info {
    # A single-line HCL block may hold only one argument, so these go on separate lines
    encryption_in_transit {
      client_broker = "TLS"
      in_cluster    = true
    }
    encryption_at_rest_kms_key_arn = aws_kms_key.kafka.arn
  }
}

resource "aws_ecs_cluster" "flink_cluster" {
  name = "flink-streaming-${var.environment}"

  setting {
    name  = "containerInsights"
    value = "enabled"
  }
}

resource "aws_ecs_task_definition" "flink_jobmanager" {
  family                   = "flink-jobmanager"
  network_mode             = "awsvpc"
  requires_compatibilities = ["FARGATE"]
  cpu                      = "2048"
  memory                   = "4096"
  # "jobmanager" must resolve to this task, e.g. via ECS service discovery (Cloud Map)
  container_definitions = jsonencode([{
    name         = "jobmanager"
    image        = "${var.ecr_url}/flink-jobmanager:latest"
    portMappings = [{ containerPort = 8081, hostPort = 8081 }]
    command      = ["jobmanager"]
    environment = [
      { name = "FLINK_PROPERTIES", value = "jobmanager.rpc.address: jobmanager\nstate.backend: rocksdb\nstate.checkpoints.dir: s3://${aws_s3_bucket.flink_checkpoints.bucket}/checkpoints" }
    ]
    logConfiguration = {
      logDriver = "awslogs"
      # Fargate requires awslogs-stream-prefix
      options = { "awslogs-group" = aws_cloudwatch_log_group.flink.name, "awslogs-region" = var.region, "awslogs-stream-prefix" = "jobmanager" }
    }
  }])
  execution_role_arn = aws_iam_role.ecs_execution.arn
  task_role_arn      = aws_iam_role.flink_task.arn
}

resource "aws_ecs_task_definition" "flink_taskmanager" {
  family                   = "flink-taskmanager"
  network_mode             = "awsvpc"
  requires_compatibilities = ["FARGATE"]
  cpu                      = "4096"
  memory                   = "8192"
  container_definitions = jsonencode([{
    name    = "taskmanager"
    image   = "${var.ecr_url}/flink-taskmanager:latest"
    command = ["taskmanager"]
    environment = [
      { name = "FLINK_PROPERTIES", value = "jobmanager.rpc.address: jobmanager\ntaskmanager.numberOfTaskSlots: 4\nstate.backend: rocksdb\nstate.checkpoints.dir: s3://${aws_s3_bucket.flink_checkpoints.bucket}/checkpoints" }
    ]
    logConfiguration = {
      logDriver = "awslogs"
      options   = { "awslogs-group" = aws_cloudwatch_log_group.flink.name, "awslogs-region" = var.region, "awslogs-stream-prefix" = "taskmanager" }
    }
  }])
  execution_role_arn = aws_iam_role.ecs_execution.arn
  task_role_arn      = aws_iam_role.flink_task.arn
}

resource "aws_ecs_service" "jobmanager" {
  name            = "flink-jobmanager"
  cluster         = aws_ecs_cluster.flink_cluster.id
  task_definition = aws_ecs_task_definition.flink_jobmanager.arn
  desired_count   = 1
  network_configuration {
    subnets          = var.private_subnets
    security_groups  = [aws_security_group.flink.id]
    assign_public_ip = false
  }
}

resource "aws_ecs_service" "taskmanager" {
  name            = "flink-taskmanager"
  cluster         = aws_ecs_cluster.flink_cluster.id
  task_definition = aws_ecs_task_definition.flink_taskmanager.arn
  desired_count   = 3
  network_configuration {
    subnets          = var.private_subnets
    security_groups  = [aws_security_group.flink.id]
    assign_public_ip = false
  }
}

resource "helm_release" "prometheus" {
  name             = "prometheus"
  repository       = "https://prometheus-community.github.io/helm-charts"
  chart            = "kube-prometheus-stack"
  version          = "55.0.0"
  namespace        = "monitoring"
  create_namespace = true
  values = [yamlencode({
    prometheus = {
      prometheusSpec = {
        retention = "30d"
        storageSpec = {
          volumeClaimTemplate = { spec = { storageClassName = "gp3", resources = { requests = { storage = "100Gi" } } } }
        }
      }
    }
    grafana = {
      adminPassword = var.grafana_password
      dashboards    = { default = { "real-time-ops" = { json = file("dashboards/real-time-ops.json") } } }
    }
  })]
}

resource "aws_cloudwatch_log_group" "flink" {
  name              = "/ecs/flink-streaming"
  retention_in_days = 30
}

resource "aws_s3_bucket" "flink_checkpoints" {
  bucket = "flink-checkpoints-${var.environment}"
}

# kube-prometheus-stack names services after the release ("prometheus" here)
output "grafana_endpoint" {
  value = "http://prometheus-grafana.monitoring.svc.cluster.local"
}

output "prometheus_endpoint" {
  value = "http://prometheus-kube-prometheus-prometheus.monitoring.svc.cluster.local:9090"
}
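The Terraform above runs three TaskManagers, each with taskmanager.numberOfTaskSlots: 4 and 4096 Fargate CPU units (4 vCPU), while the Flink job sets parallelism 4. A small arithmetic check confirms that the slots cover the job and shows the headroom left for rescaling. This is a hypothetical helper that assumes Flink's default slot sharing, under which a job needs as many slots as its highest operator parallelism.

```python
from dataclasses import dataclass


@dataclass
class FlinkCapacity:
    total_slots: int
    free_slots: int
    vcpu_per_slot: float


def check_capacity(taskmanagers: int, slots_per_tm: int, tm_cpu_units: int,
                   job_parallelism: int) -> FlinkCapacity:
    """Fargate expresses CPU in units of 1/1024 vCPU."""
    total = taskmanagers * slots_per_tm
    if job_parallelism > total:
        raise ValueError(f"parallelism {job_parallelism} exceeds {total} available slots")
    return FlinkCapacity(total_slots=total,
                         free_slots=total - job_parallelism,
                         vcpu_per_slot=(tm_cpu_units / 1024) / slots_per_tm)


if __name__ == "__main__":
    print(check_capacity(taskmanagers=3, slots_per_tm=4, tm_cpu_units=4096, job_parallelism=4))
    # FlinkCapacity(total_slots=12, free_slots=8, vcpu_per_slot=1.0)
```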
Testing and Validation
# tests/test_streaming_dashboards.py
import math
import time
from datetime import datetime
from unittest.mock import Mock, patch

import pytest


class TestStreamingDashboards:
    @pytest.fixture
    def metric_producer(self):
        from producers.metric_producer import MetricProducer
        return MetricProducer({'kafka_brokers': 'localhost:9092', 'schema_registry_url': 'http://localhost:8081'})

    @pytest.fixture
    def sample_metrics(self):
        return [
            {"metric_name": "http_requests_total", "value": 1, "labels": {"method": "GET", "status": "200"}, "timestamp": int(time.time() * 1000)},
            {"metric_name": "cpu_usage_percent", "value": 75.5, "labels": {"instance": "web-1"}, "timestamp": int(time.time() * 1000)},
            {"metric_name": "orders_per_minute", "value": 42, "labels": {"region": "us-east-1"}, "timestamp": int(time.time() * 1000)}
        ]

    def test_metric_schema_compliance(self, sample_metrics):
        for metric in sample_metrics:
            assert "metric_name" in metric
            assert "value" in metric
            assert "labels" in metric
            assert "timestamp" in metric
            assert isinstance(metric["value"], (int, float))
            assert isinstance(metric["labels"], dict)

    def test_prometheus_format_conversion(self):
        from schemas.metrics import MetricPoint
        point = MetricPoint(
            metric_name="http_requests_total",
            value=100.0,
            timestamp=datetime.now(),
            labels={"method": "GET", "status": "200"}
        )
        text = point.to_prometheus_text()
        assert "http_requests_total" in text
        assert 'method="GET"' in text
        assert "100.0" in text

    def test_grafana_panel_generation(self):
        from schemas.metrics import DashboardMetric
        panel = DashboardMetric(
            name="Request Rate",
            query="sum(rate(http_requests_total[1m]))",
            unit="reqps",
            thresholds={"warning": 1000, "critical": 5000}
        )
        grafana_panel = panel.to_grafana_panel()
        assert grafana_panel["title"] == "Request Rate"
        assert grafana_panel["type"] == "timeseries"
        assert len(grafana_panel["targets"]) == 1

    def test_alerting_rule_configuration(self):
        rules = [
            {"alert": "HighErrorRate", "expr": "error_rate > 0.05", "for": "2m", "severity": "critical"},
            {"alert": "HighLatency", "expr": "p99_latency > 1", "for": "5m", "severity": "warning"}
        ]
        for rule in rules:
            assert "alert" in rule
            assert "expr" in rule
            assert "for" in rule
            assert "severity" in rule

    def test_dashboard_refresh_rate(self):
        dashboard_config = {"refresh": "1s", "time": {"from": "now-1h", "to": "now"}}
        assert dashboard_config["refresh"] == "1s"

    # Patch the names where producers.metric_producer looks them up, and stub the Avro
    # serializer so the test never contacts Kafka or the Schema Registry
    @patch('producers.metric_producer.AvroSerializer')
    @patch('producers.metric_producer.SchemaRegistryClient')
    @patch('producers.metric_producer.Producer')
    def test_kafka_metric_production(self, mock_producer_class, mock_registry_class, mock_serializer_class):
        mock_producer = Mock()
        mock_producer_class.return_value = mock_producer
        mock_serializer_class.return_value = Mock(return_value=b"serialized")
        from producers.metric_producer import MetricProducer
        producer = MetricProducer({'kafka_brokers': 'localhost:9092', 'schema_registry_url': 'http://localhost:8081'})
        producer.produce_metric("test_metric", 42.0, {"instance": "test"})
        mock_producer.produce.assert_called_once()

    def test_anomaly_detection_threshold(self):
        # The baseline excludes the spike so it reflects normal behaviour
        baseline = [10, 12, 11, 13, 10]
        mean = sum(baseline) / len(baseline)
        stddev = math.sqrt(sum((x - mean) ** 2 for x in baseline) / len(baseline))
        threshold = 3 * stddev
        assert abs(100 - mean) > threshold   # the spike is flagged
        assert abs(12 - mean) <= threshold   # a normal value is not

    def test_windowed_aggregation_logic(self):
        events = [
            {"timestamp": 1000, "value": 10},
            {"timestamp": 2000, "value": 20},
            {"timestamp": 3000, "value": 15},
            {"timestamp": 4000, "value": 25}
        ]
        window_size_ms = 2500
        windows = {}
        for event in events:
            window_key = event["timestamp"] // window_size_ms
            windows.setdefault(window_key, []).append(event["value"])
        # 1000 and 2000 fall in window 0; 3000 and 4000 fall in window 1
        assert windows == {0: [10, 20], 1: [15, 25]}
        averages = {key: sum(values) / len(values) for key, values in windows.items()}
        assert averages == {0: 15.0, 1: 20.0}

    def test_metric_retention_policy(self):
        retention_config = {
            "real_time": {"retention": "1h", "resolution": "1s"},
            "short_term": {"retention": "24h", "resolution": "1m"},
            "long_term": {"retention": "30d", "resolution": "5m"}
        }
        assert len(retention_config) == 3
        for key, config in retention_config.items():
            assert "retention" in config
            assert "resolution" in config
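The same test suite can catch layout mistakes in the dashboard JSON, such as panels wider than Grafana's 24-column grid or overlapping gridPos rectangles. Below is a hypothetical helper in that spirit, which a pytest test could call on the parsed panels list.

```python
from typing import Any, Dict, List

GRID_COLUMNS = 24  # Grafana dashboards use a 24-column grid


def grid_problems(panels: List[Dict[str, Any]]) -> List[str]:
    """Report panels that overflow the grid or overlap a previously listed panel."""
    problems = []
    boxes = []
    for panel in panels:
        pos = panel["gridPos"]
        x, y, w, h = pos["x"], pos["y"], pos["w"], pos["h"]
        if x < 0 or x + w > GRID_COLUMNS:
            problems.append(f"{panel['title']}: exceeds {GRID_COLUMNS} columns")
        for other_title, ox, oy, ow, oh in boxes:
            # Two rectangles overlap when they intersect on both axes
            if x < ox + ow and ox < x + w and y < oy + oh and oy < y + h:
                problems.append(f"{panel['title']}: overlaps {other_title}")
        boxes.append((panel["title"], x, y, w, h))
    return problems
```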
Cost Analysis
Monthly Cost Breakdown (Production)
| Component | Specification | Monthly Cost |
|---|---|---|
| MSK (Kafka) | 3x kafka.m5.large | $450 |
| ECS Flink | JobManager + 3 TaskManagers | $800 |
| Prometheus | Managed, 100GB storage | $200 |
| Grafana Cloud | Pro tier | $50 |
| S3 (Checkpoints) | 500GB | $12 |
| Data Transfer | Cross-AZ | $75 |
| Total | | $1,587 |
Cost Optimization Strategies
💡
Tip: Reduce streaming dashboard costs by 30-40%:
- Flink Checkpointing: Optimize checkpoint intervals to reduce S3 costs
- Prometheus Downsampling: Downsample old metrics automatically
- Kafka Retention: Set appropriate retention periods per topic
- Grafana Caching: Enable query caching to reduce Prometheus load
- Auto-scaling: Scale Flink TaskManagers based on lag
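Scaling Flink TaskManagers on consumer lag reduces to a small calculation. Provision enough throughput to keep up with incoming traffic and to drain the current backlog within a target time. The helper below is a hypothetical sketch. Per-TaskManager throughput has to be measured for your own job, and the example numbers are illustrative, not benchmarks.

```python
import math


def desired_taskmanagers(total_lag: int, incoming_rate: float,
                         records_per_sec_per_tm: float, target_catchup_s: float,
                         min_tm: int = 1, max_tm: int = 10) -> int:
    """TaskManager count needed to absorb ingest and clear Kafka lag in target_catchup_s."""
    # Required throughput = steady-state ingest + backlog spread over the catch-up window
    required_rate = incoming_rate + total_lag / target_catchup_s
    needed = math.ceil(required_rate / records_per_sec_per_tm)
    return max(min_tm, min(max_tm, needed))


if __name__ == "__main__":
    # Illustrative: 1M records/s ingest, 400k records/s per TaskManager, 30M records of lag
    print(desired_taskmanagers(30_000_000, 1_000_000, 400_000, 60))  # 4
```

In practice the result should be smoothed, for example by acting only when it disagrees with the current count for several minutes. Each rescale restarts the Flink job from its latest checkpoint, so frequent changes cost more than they save.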
Performance Metrics
| Metric | Batch Dashboards | Streaming Dashboards | Improvement |
|---|---|---|---|
| Data Freshness | 5-15 minutes | < 1 second | 300-900x faster |
| Alert Latency | 10-30 minutes | < 5 seconds | 120-360x faster |
| Dashboard Load | 3-5 seconds | < 1 second | 3-5x faster |
| Query Performance | 2-10 seconds | < 100ms | 20-100x faster |
Interview Talking Points
Architecture Decisions
ℹ️
Best Practice: Focus on these streaming concepts in interviews:
- Why Flink over Spark Streaming?
  - True streaming (not micro-batch)
  - Lower latency (< 100ms vs seconds)
  - Exactly-once semantics
  - Better state management
- Why Prometheus over InfluxDB?
  - Pull-based model reduces complexity
  - Rich query language (PromQL)
  - Excellent Kubernetes integration
  - Large ecosystem of exporters
- Why Thanos for long-term storage?
  - Unlimited retention on object storage
  - Multi-cluster support
  - Global view across Prometheus instances
  - Cost-effective long-term storage
Common Interview Questions
Q: "How do you handle late-arriving metrics?"
late_data_strategies = {
    "Watermarks": "Allow 5 seconds of out-of-orderness with Flink watermarks",
    "Windows": "Sliding windows smooth the dashboard, but lateness is still bounded by the watermark",
    "Allowed Lateness": "Keep window state past the watermark (a grace period) and emit updated results",
    "Side Outputs": "Route data later than the allowed lateness to a separate stream"
}
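The watermark and side-output answers can be illustrated with a simplified event-time model in plain Python. The watermark trails the highest timestamp seen by 5 seconds. A 1-second tumbling window fires once the watermark covers its last millisecond, and events for windows that have already fired go to a late list instead of being dropped silently. This is a teaching sketch, not Flink's implementation; for one thing, Flink emits watermarks periodically rather than after every event.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Event = Tuple[int, float]  # (event timestamp in ms, value)


def tumbling_with_watermark(events: List[Event], window_ms: int = 1000,
                            max_out_of_order_ms: int = 5000
                            ) -> Tuple[Dict[int, List[float]], List[Event]]:
    """Return (fired windows keyed by start ms, late events routed to a side output).

    Windows still open when the input ends are not returned.
    """
    open_windows: Dict[int, List[float]] = defaultdict(list)
    fired: Dict[int, List[float]] = {}
    late: List[Event] = []
    watermark = float("-inf")
    for ts, value in events:
        start = ts - ts % window_ms
        if start + window_ms - 1 <= watermark:
            late.append((ts, value))  # its window already fired: side output
            continue
        open_windows[start].append(value)
        watermark = max(watermark, ts - max_out_of_order_ms)
        # Fire every window whose last millisecond is covered by the watermark
        for s in sorted(k for k in open_windows if k + window_ms - 1 <= watermark):
            fired[s] = open_windows.pop(s)
    return fired, late
```

In Flink SQL, the 5-second bound comes from the WATERMARK clause on metrics_raw. In the DataStream API, allowedLateness() and sideOutputLateData() on a windowed stream provide the grace period and the late-data stream.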
Q: "How do you ensure dashboard availability during failures?"
availability_strategy = {
"Multi-AZ": "Deploy across 3 availability zones",
"Replication": "Prometheus with 2 replicas + Thanos",
"Fallback": "Grafana shows stale data instead of errors",
"Circuit Breaker": "Stop querying a failing backend for a cool-down period to prevent cascade failures"
}
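The Fallback and Circuit Breaker items combine naturally. After repeated query failures, stop calling the backend for a cool-down period and serve the last good result flagged as stale. The class below is a hypothetical sketch of that pattern for a dashboard backend or query proxy. It is not a Grafana feature, and the thresholds are placeholders.

```python
import time
from typing import Any, Callable, Optional, Tuple


class StaleFallbackBreaker:
    """Circuit breaker that serves the last good value while the backend is failing."""

    def __init__(self, fetch: Callable[[], Any], failure_threshold: int = 3,
                 reset_after_s: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.fetch = fetch
        self.failure_threshold = failure_threshold
        self.reset_after_s = reset_after_s
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None
        self.last_good: Any = None

    def get(self) -> Tuple[Any, bool]:
        """Return (value, is_stale)."""
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_after_s:
                return self.last_good, True  # open: skip the backend entirely
            self.opened_at = None  # half-open: allow one trial request
        try:
            value = self.fetch()
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = self.clock()  # trip, or re-trip after a failed trial
            return self.last_good, True
        self.failures = 0  # a success closes the breaker
        self.last_good = value
        return value, False
```

Until the first successful fetch, last_good is None, so callers should render a "no data" state in that case rather than a stale value.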
Q: "How do you optimize Prometheus query performance?"
optimization_techniques = {
"Recording Rules": "Pre-compute expensive queries",
"Label Optimization": "Minimize high-cardinality labels",
"Sharding": "Shard by metric type or label",
"Caching": "Grafana query caching + Thanos query frontend"
}
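Label optimization is easier to argue with numbers. The number of series a metric creates equals its number of distinct label combinations, which is bounded by the product of each label's distinct values. For the synthetic producer, http_requests_total has 3 methods × 3 paths × 4 status codes × 10 instances, or up to 360 series. The helper below is a hypothetical sketch that measures both figures from sample label sets.

```python
import itertools
import math
from typing import Dict, Iterable, Set, Tuple


def cardinality(label_sets: Iterable[Dict[str, str]]) -> Tuple[int, int]:
    """Return (observed series, upper bound from per-label distinct values)."""
    series = set()
    values_per_label: Dict[str, Set[str]] = {}
    for labels in label_sets:
        series.add(tuple(sorted(labels.items())))  # one series per distinct label set
        for key, value in labels.items():
            values_per_label.setdefault(key, set()).add(value)
    upper_bound = math.prod(len(v) for v in values_per_label.values()) if values_per_label else 0
    return len(series), upper_bound


if __name__ == "__main__":
    samples = [{"method": m, "path": p, "status": s, "instance": f"instance-{i}"}
               for m, p, s, i in itertools.product(
                   ["GET", "POST", "PUT"],
                   ["/api/users", "/api/orders", "/api/products"],
                   ["200", "400", "404", "500"],
                   range(10))]
    print(cardinality(samples))  # (360, 360)
```

Adding a per-user or per-request label multiplies that bound by its number of distinct values. That is why such identifiers belong in logs or traces rather than in Prometheus labels.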
Deployment Checklist
- Deploy Kafka cluster for metric ingestion
- Set up Flink cluster with job and task managers
- Configure Prometheus with retention policies
- Deploy Grafana with pre-built dashboards
- Set up AlertManager with notification channels
- Configure recording rules for common queries
- Test end-to-end metric flow
- Validate alerting thresholds
- Set up monitoring for the monitoring stack
- Document runbooks for common issues
⚠️
Warning: Always test alerting rules in staging first. False positives erode trust in the monitoring system. Start with warning-level alerts and escalate to critical after tuning.
This project demonstrates real-time monitoring and observability skills and is highly relevant for SRE and data engineering interviews at companies with large-scale distributed systems.