πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Observability in Data Pipelines: Monitoring & Alerting

Data EngineeringPipeline Operations⭐ Premium

Advertisement

Observability in Data Pipelines: Monitoring & Alerting

Difficulty: Senior Level | Companies: Netflix, Uber, Airbnb, Spotify, Stripe

1. The Three Pillars of Data Observability

Architecture Diagram
Data Observability
β”œβ”€β”€ Metrics (Quantitative)
β”‚   β”œβ”€β”€ Row counts, null rates, freshness
β”‚   β”œβ”€β”€ Pipeline duration, throughput
β”‚   └── Resource utilization (CPU, memory, storage)
β”œβ”€β”€ Logs (Discrete Events)
β”‚   β”œβ”€β”€ Pipeline start/end events
β”‚   β”œβ”€β”€ Schema changes, data quality failures
β”‚   └── Error traces, stack traces
└── Lineage (Provenance)
    β”œβ”€β”€ Upstream dependencies
    β”œβ”€β”€ Downstream impact
    └── Column-level transformations

ℹ️

Key Insight: Data observability is NOT just monitoring. It's the ability to understand, diagnose, and manage data health across the entire pipeline lifecycle.

2. Key Data Metrics (SLIs)

Freshness Metrics

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional

@dataclass
class FreshnessSLI:
    table_name: str
    last_updated: datetime
    expected_frequency: timedelta
    timestamp_column: str = "updated_at"
    
    @property
    def delay(self) -> timedelta:
        return datetime.now() - self.last_updated
    
    @property
    def is_healthy(self) -> bool:
        return self.delay <= self.expected_frequency
    
    @property
    def sli_value(self) -> float:
        """1.0 = perfectly on time, 0.0 = way overdue"""
        if self.delay <= self.expected_frequency:
            return 1.0
        overdue_ratio = self.delay.total_seconds() / self.expected_frequency.total_seconds()
        return max(0, 1.0 - (overdue_ratio - 1) * 0.5)

# Usage
sli = FreshnessSLI(
    table_name="fact_orders",
    last_updated=datetime.now() - timedelta(minutes=45),
    expected_frequency=timedelta(hours=1)
)
print(f"Freshness: {sli.sli_value:.2f}")  # 0.5

Volume Metrics

@dataclass
class VolumeSLI:
    table_name: str
    current_count: int
    historical_avg: int
    historical_std: int
    window_hours: int = 24
    
    @property
    def z_score(self) -> float:
        if self.historical_std == 0:
            return 0
        return (self.current_count - self.historical_avg) / self.historical_std
    
    @property
    def is_healthy(self) -> bool:
        return abs(self.z_score) <= 3  # Within 3 std devs
    
    @property
    def anomaly_type(self) -> Optional[str]:
        if self.z_score > 3:
            return "VOLUME_SPIKE"
        elif self.z_score < -3:
            return "VOLUME_DROP"
        return None

Quality Metrics

@dataclass
class QualityMetrics:
    total_rows: int
    null_counts: dict  # column -> null_count
    duplicate_count: int
    schema_changes: list  # list of detected changes
    
    @property
    def null_ratio(self) -> float:
        total_cells = sum(self.null_counts.values())
        return total_cells / (self.total_rows * len(self.null_counts)) if self.total_rows > 0 else 0
    
    @property
    def duplicate_ratio(self) -> float:
        return self.duplicate_count / self.total_rows if self.total_rows > 0 else 0
    
    def column_null_rates(self) -> dict:
        return {col: count / self.total_rows for col, count in self.null_counts.items()}

3. Pipeline Health Dashboard

Metrics Collection

import time
from functools import wraps
from typing import Callable
from prometheus_client import Counter, Histogram, Gauge

# Prometheus metrics
PIPELINE_DURATION = Histogram(
    'pipeline_duration_seconds',
    'Pipeline execution duration',
    ['pipeline_name', 'stage']
)

PIPELINE_ROWS = Counter(
    'pipeline_rows_processed',
    'Total rows processed',
    ['pipeline_name', 'stage', 'status']
)

PIPELINE_ERRORS = Counter(
    'pipeline_errors_total',
    'Total pipeline errors',
    ['pipeline_name', 'error_type']
)

DATA_FRESHNESS = Gauge(
    'data_freshness_minutes',
    'Data freshness in minutes',
    ['table_name']
)

def monitor_pipeline(pipeline_name: str):
    def decorator(func: Callable):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start = time.time()
            try:
                result = func(*args, **kwargs)
                duration = time.time() - start
                PIPELINE_DURATION.labels(pipeline_name=pipeline_name, stage='complete').observe(duration)
                PIPELINE_ROWS.labels(pipeline_name=pipeline_name, stage='complete', status='success').inc()
                return result
            except Exception as e:
                PIPELINE_ERRORS.labels(pipeline_name=pipeline_name, error_type=type(e).__name__).inc()
                raise
        return wrapper
    return decorator

# Usage
@monitor_pipeline("daily_sales_etl")
def run_daily_sales():
    # Your pipeline code
    pass

Alert Rules

# prometheus/alerts.yml
groups:
  - name: data_pipeline_alerts
    rules:
      - alert: PipelineDelayed
        expr: pipeline_duration_seconds{stage="complete"} > 3600
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Pipeline {{ $labels.pipeline_name }} running for >1 hour"
          
      - alert: DataStale
        expr: data_freshness_minutes > 120
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "Table {{ $labels.table_name }} is stale for >2 hours"
          
      - alert: PipelineFailing
        expr: rate(pipeline_errors_total[5m]) > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Pipeline {{ $labels.pipeline_name }} has errors"
          
      - alert: VolumeAnomaly
        expr: abs(pipeline_rows - avg_over_time(pipeline_rows[24h])) / avg_over_time(pipeline_rows[24h]) > 0.3
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Volume anomaly detected for {{ $labels.pipeline_name }}"

4. Distributed Tracing for Data Pipelines

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanExporter
from opentelemetry.exporter.jaeger.thrift import JaegerExporter

# Setup tracing
provider = TracerProvider()
jaeger_exporter = JaegerExporter(
    agent_host_name="jaeger",
    agent_port=6831,
)
provider.add_span_processor(BatchSpanExporter(jaeger_exporter))
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("data-pipeline")

class PipelineTracer:
    def __init__(self, pipeline_name: str):
        self.pipeline_name = pipeline_name
        self.tracer = trace.get_tracer(pipeline_name)
    
    def trace_stage(self, stage_name: str):
        def decorator(func):
            def wrapper(*args, **kwargs):
                with self.tracer.start_as_current_span(
                    f"{self.pipeline_name}.{stage_name}",
                    attributes={
                        "pipeline.name": self.pipeline_name,
                        "stage.name": stage_name,
                    }
                ) as span:
                    try:
                        result = func(*args, **kwargs)
                        span.set_status(trace.Status(trace.StatusCode.OK))
                        return result
                    except Exception as e:
                        span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
                        span.record_exception(e)
                        raise
            return wrapper
        return decorator

# Usage
tracer = PipelineTracer("daily_etl")

@tracer.trace_stage("extract")
def extract_data():
    # Extraction logic
    pass

@tracer.trace_stage("transform")
def transform_data():
    # Transformation logic
    pass

5. Incident Response for Data Issues

Automated Triage

from enum import Enum
from dataclasses import dataclass

class Severity(Enum):
    P1 = "critical"    # Revenue impact, dashboard broken
    P2 = "high"        # Data delayed, partial impact
    P3 = "medium"      # Quality issue, no immediate impact
    P4 = "low"         # Cosmetic, no data impact

@dataclass
class DataIncident:
    incident_id: str
    severity: Severity
    description: str
    affected_tables: list
    root_cause: str
    mitigation: str
    owner: str

class IncidentTriager:
    SEVERITY_RULES = {
        "PIPELINE_FAILURE": Severity.P1,
        "SCHEMA_CHANGE": Severity.P2,
        "VOLUME_DROP": Severity.P2,
        "FRESHNESS_BREACH": Severity.P2,
        "NULL_SPIKE": Severity.P3,
        "DUPLICATE_ROWS": Severity.P3,
    }
    
    def triage(self, alert_type: str, context: dict) -> DataIncident:
        severity = self.SEVERITY_RULES.get(alert_type, Severity.P4)
        
        # Escalate if multiple downstream consumers affected
        if context.get("downstream_count", 0) > 10:
            severity = Severity.P1
        
        return DataIncident(
            incident_id=f"INC-{int(time.time())}",
            severity=severity,
            description=f"{alert_type} on {context.get('table', 'unknown')}",
            affected_tables=[context.get('table', '')],
            root_cause="Under investigation",
            mitigation="Pending",
            owner=context.get('owner', 'data-platform')
        )

6. Observability Stack Architecture

Grafana β€” Dashboards, Alerts, CorrelationPrometheus (Metrics)Jaeger (Traces)OpenLineage (Lineage)↓↓↓Pipeline Instrumentation: Spark, Airflow, dbt, Kafka, FlinkStart with freshness + volume monitoring β†’ catch 80% of data issues

ℹ️

Best Practice: Start with freshness and volume monitoring β€” they catch 80% of data issues. Add quality checks and lineage tracking incrementally.

Follow-Up Questions

  1. How would you implement data observability for real-time streaming pipelines?
  2. Design an incident response runbook for data quality issues.
  3. How do you correlate metrics across pipeline stages?
  4. How would you reduce alert fatigue in a data platform?
  5. Design a data health scorecard for executive reporting.

Advertisement