πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Monitoring: Cloud Monitoring, Log Explorer & Alerting

GCP Data EngineeringMonitoring⭐ Premium

Advertisement

Monitoring Data Pipelines on GCP

Master monitoring data pipelines on GCP including Cloud Monitoring, Log Explorer, alerting, dashboards, and operational best practices.

16 min readIntermediate

Monitoring Architecture

πŸ“ˆ GCP Operations Suite: Monitoring, Logging & Trace
GCP Operations Suite (formerly Stackdriver)Cloud MonitoringMetrics ExplorerQuery & visualizeDashboardsCustom & built-inAlerting PoliciesThreshold, SLO, uptimeUptime ChecksHTTP, TCP, SSL, DNSMonitored ResourcesVMs, GKE, Cloud SQLCloud LoggingLog ExplorerSearch & filter logsLog-based MetricsCustom metrics from logsLog SinksExport to GCS, BigQuery, Pub/SubLog ViewsProject, folder, org-levelAudit LogsAdmin, Data Access, SystemCloud TraceDistributed TracingRequest flow across servicesLatency AnalysisP50, P95, P99 percentilesTrace SamplingAutomatic & customCloud Trace APIOpenTelemetry supportProfilerCPU, memory profilingALERTING POLICIESThresholdmetric > value for durationSLO-basedError budget burn rateUptime CheckService availabilityDASHBOARDS & REPORTINGOverview DashboardGCP project healthCustom DashboardsMetrics, charts, textCloud Monitoring APIProgrammatic accessOPERATIONS SUITE INTEGRATIONSError ReportingCrash analysisCloud DebuggerLive inspectionCloud ProfilerPerformanceCloud DebuggerSnapshotsGKE LoggingContainer logs
Interview Tip: GCP Operations Suite (formerly Stackdriver) provides unified monitoring, logging, and tracing. Use Cloud Monitoring for metrics/alerts, Cloud Logging for centralized log management, and Cloud Trace for distributed tracing. Always set up alerting policies for critical data pipeline failures.

Key Metrics to Monitor

# Key metrics for data pipeline monitoring
metrics = {
    "bigquery": {
        "query_count": "Number of queries executed",
        "bytes_scanned": "Total bytes scanned by queries",
        "slots_used": "Number of slots in use",
        "query_duration": "Query execution time",
        "load_job_count": "Number of load jobs",
        "storage_bytes": "Total storage used"
    },
    "dataflow": {
        "element_count": "Elements processed",
        "data_volume": "Data volume processed",
        "system_lag": "Processing lag",
        "worker_count": "Number of workers",
        "cpu_utilization": "Worker CPU usage",
        "pending_elements": "Elements in backlog"
    },
    "dataproc": {
        "cluster_status": "Cluster health",
        "yarn_apps": "Running YARN applications",
        "hdfs_usage": "HDFS storage usage",
        "job_duration": "Spark job execution time",
        "worker_count": "Number of workers"
    },
    "pubsub": {
        "message_count": "Messages published",
        "unacked_messages": "Unacknowledged messages",
        "publish_latency": "Time to publish",
        "subscription_backlog": "Subscription queue depth"
    }
}

Custom Metrics

from google.cloud import monitoring_v3
from google.protobuf import timestamp_pb2
import time

def create_custom_metric(project_id, metric_type, value):
    """Create a custom metric for pipeline monitoring."""
    client = monitoring_v3.MetricServiceClient()
    project_name = f"projects/{project_id}"

    series = monitoring_v3.TimeSeries()
    series.metric.type = f"custom.googleapis.com/pipeline/{metric_type}"
    series.resource.type = "global"
    series.resource.labels["project_id"] = project_id

    now = time.time()
    seconds = int(now)
    nanos = int((now - seconds) * 10**9)

    interval = monitoring_v3.TimeInterval(
        end_time={"seconds": seconds, "nanos": nanos}
    )

    point = monitoring_v3.Point(
        interval=interval,
        value=monitoring_v3.TypedValue(double_value=value)
    )

    series.points = [point]

    client.create_time_series(
        request={
            "name": project_name,
            "time_series": [series]
        }
    )

Alerting Policies

from google.cloud import monitoring_v3

def create_pipeline_alert(project_id, metric_name, threshold):
    """Create alert for pipeline metric."""
    client = monitoring_v3.AlertPolicyServiceClient()

    alert_policy = monitoring_v3.AlertPolicy(
        display_name=f"Pipeline Alert: {metric_name}",
        conditions=[
            monitoring_v3.AlertPolicy.Condition(
                display_name=f"{metric_name} threshold",
                condition_threshold=monitoring_v3.AlertPolicy.Condition.MetricThreshold(
                    filter=f'resource.type="global" AND metric.type="custom.googleapis.com/pipeline/{metric_name}"',
                    comparison=monitoring_v3.ComparisonType.COMPARISON_GT,
                    threshold_value=threshold,
                    duration="60s",
                ),
            )
        ],
        notification_channels=[],
        alert_strategy=monitoring_v3.AlertPolicy.AlertStrategy(
            auto_close="1800s"
        ),
    )

    request = monitoring_v3.CreateAlertPolicyRequest(
        name=f"projects/{project_id}",
        alert_policy=alert_policy,
    )

    response = client.create_alert_policy(request=request)
    print(f"Created alert: {response.name}")

✨

Best Practice: Monitor key pipeline metrics: 1) Processing lag, 2) Error rates, 3) Data quality metrics, 4) Cost metrics, 5) Resource utilization. Set up alerts for anomalies. Use dashboards for operational visibility. Export logs to BigQuery for analysis.

πŸ’¬

Common Interview Questions

Q1: What are the key metrics to monitor for data pipelines?

Answer: 1) Processing lag (latency), 2) Error rates, 3) Data volume processed, 4) Resource utilization (CPU, memory), 5) Data quality metrics, 6) Cost metrics, 7) SLA compliance.

Q2: How do you set up effective alerting?

Answer: 1) Define clear thresholds based on historical baselines, 2) Use multiple notification channels, 3) Implement escalation policies, 4) Avoid alert fatigue with proper thresholds, 5) Test alerts regularly, 6) Document runbooks for each alert.

Q3: What is the benefit of exporting logs to BigQuery?

Answer: BigQuery enables complex log analysis, correlation across services, custom dashboards, and long-term retention. You can analyze patterns, troubleshoot issues, and create operational reports that aren't possible in Log Explorer.

Q4: How do you monitor data quality in production?

Answer: 1) Track completeness, accuracy, and consistency metrics, 2) Set up Dataplex quality scans, 3) Monitor for schema changes, 4) Alert on quality threshold violations, 5) Track quality trends over time.

Q5: What dashboards should data engineers maintain?

Answer: 1) Pipeline health (status, duration, errors), 2) Data quality (completeness, accuracy), 3) Cost monitoring (daily/monthly spend), 4) Resource utilization (CPU, memory, slots), 5) SLA compliance (latency, availability).

Advertisement