Batch Pipeline Architecture: GCS → Dataflow → BigQuery

Build production-ready batch data pipelines using GCS as the data lake, Dataflow for processing, and BigQuery for analytics.

Batch Pipeline Architecture

[Figure: GCP Data Engineering Reference Architecture]
Data sources: On-Prem DB, SaaS APIs, IoT Sensors, Mobile Apps, REST APIs
Ingestion layer: Dataflow (CDC), Pub/Sub, Cloud Tasks, Storage Transfer, Transfer Appliance
Raw data zone (Cloud Storage): landing/ (ingested data), bronze/ (unvalidated), archive/ (historical), raw/ (original format), staging/ (temp processing)
Processing layer: Dataflow (stream + batch), Dataproc (Spark/Hadoop), Cloud Functions (event-driven), Data Prep (visual ETL), Cloud Composer (orchestration)
Curated data zone: silver/ (cleaned, validated), gold/ (business-ready), aggregates/ (pre-computed), features/ (ML features)
Analytics and serving: BigQuery (Warehouse), Looker (BI), Vertex AI (ML), Data Studio, Dataplex
Interview Tip: GCP's data engineering stack is serverless-first. Dataflow (Apache Beam) handles both streaming and batch. BigQuery is the flagship analytics service.

Implementation

GCS Bucket Structure

# Create the data lake bucket. GCS has a flat namespace and gsutil has no
# mkdir command: "folders" such as bronze/raw/ are object-name prefixes that
# come into existence when the first object is written under them.
gsutil mb -l us-central1 gs://my-data-lake

# Prefix layout used by the pipeline:
#   bronze/raw/   bronze/archive/
#   silver/validated/   silver/cleaned/
#   gold/curated/
#   temp/   staging/

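# Set lifecycle rules. A lifecycle configuration applies to the whole bucket,
# so each rule is scoped to bronze/raw/ with a matchesPrefix condition.
# The Delete rule targets noncurrent versions only ("isLive": false), so it
# has an effect only when object versioning is enabled on the bucket.
cat > lifecycle.json << 'EOF'
{
  "rule": [
    {
      "action": {"type": "SetStorageClass", "storageClass": "NEARLINE"},
      "condition": {"age": 30, "matchesPrefix": ["bronze/raw/"]}
    },
    {
      "action": {"type": "SetStorageClass", "storageClass": "COLDLINE"},
      "condition": {"age": 90, "matchesPrefix": ["bronze/raw/"]}
    },
    {
      "action": {"type": "Delete"},
      "condition": {"age": 730, "isLive": false, "matchesPrefix": ["bronze/raw/"]}
    }
  ]
}
EOF

gsutil lifecycle set lifecycle.json gs://my-data-lake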
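
To see how the three rules interact as an object ages, here is a small local model in Python. It is only a sketch: `resolve_action` and `matches` are hypothetical helpers, not part of any GCS client library, and the precedence they encode (Delete wins over SetStorageClass, and the coldest target class wins among several SetStorageClass rules) is an assumption taken from the Cloud Storage lifecycle documentation.

```python
# Storage classes ranked from warmest to coldest (coldest = cheapest at rest).
COLDNESS = {"STANDARD": 0, "NEARLINE": 1, "COLDLINE": 2, "ARCHIVE": 3}

# Same rules as lifecycle.json, expressed as Python dicts.
RULES = [
    {"action": {"type": "SetStorageClass", "storageClass": "NEARLINE"},
     "condition": {"age": 30}},
    {"action": {"type": "SetStorageClass", "storageClass": "COLDLINE"},
     "condition": {"age": 90}},
    {"action": {"type": "Delete"},
     "condition": {"age": 730, "isLive": False}},
]


def matches(condition, age_days, is_live):
    """A rule matches only if every condition it lists is satisfied."""
    if age_days < condition.get("age", 0):
        return False
    if "isLive" in condition and condition["isLive"] != is_live:
        return False
    return True


def resolve_action(age_days, is_live=True, rules=RULES):
    """Return 'Delete', a target storage class, or None if no rule matches."""
    matched = [r["action"] for r in rules
               if matches(r["condition"], age_days, is_live)]
    if any(a["type"] == "Delete" for a in matched):
        return "Delete"
    classes = [a["storageClass"] for a in matched
               if a["type"] == "SetStorageClass"]
    return max(classes, key=COLDNESS.__getitem__) if classes else None


for age, live in [(10, True), (45, True), (120, True), (800, True), (800, False)]:
    print(age, live, resolve_action(age, live))
```

Note that a live object older than 730 days is never deleted by these rules; it stays in COLDLINE, because the Delete rule only matches noncurrent versions.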

Dataflow Batch Pipeline

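import hashlib
import json
from datetime import datetime, timezone

import apache_beam as beam
import pyarrow as pa
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.gcp.bigquery import BigQueryDisposition
from apache_beam.options.pipeline_options import PipelineOptions

# Columns shared by the Silver Parquet files and the BigQuery table.
OUTPUT_FIELDS = ['event_id', 'event_type', 'user_id', 'timestamp', 'amount', 'processed_at']

BQ_SCHEMA = (
    'event_id:STRING,event_type:STRING,user_id:STRING,'
    'timestamp:TIMESTAMP,amount:FLOAT64,processed_at:TIMESTAMP'
)

# WriteToParquet expects a pyarrow schema, not a BigQuery schema string.
# Timestamps stay as ISO-8601 strings because the pipeline never parses them.
PARQUET_SCHEMA = pa.schema([
    ('event_id', pa.string()),
    ('event_type', pa.string()),
    ('user_id', pa.string()),
    ('timestamp', pa.string()),
    ('amount', pa.float64()),
    ('processed_at', pa.string()),
])

class ParseAndValidate(beam.DoFn):
    """Parse JSON lines, validate required fields, and add metadata."""

    REQUIRED_FIELDS = ('event_id', 'event_type', 'timestamp', 'user_id')

    def process(self, element):
        try:
            record = json.loads(element)
        except json.JSONDecodeError:
            return  # Drop malformed lines

        # Validate required fields
        if not isinstance(record, dict):
            return
        if not all(field in record for field in self.REQUIRED_FIELDS):
            return

        # Validate data types (amount is optional)
        if not isinstance(record.get('amount', 0), (int, float)):
            return

        # Hash the input fields before enrichment so the hash is stable
        # across reruns (processed_at changes on every run).
        record['record_hash'] = hashlib.md5(
            json.dumps(record, sort_keys=True).encode()
        ).hexdigest()

        # Enrich with metadata. ReadFromText yields plain strings, so the
        # source file name is not available at this point.
        record['processed_at'] = datetime.now(timezone.utc).isoformat()
        record['source_file'] = 'unknown'

        yield record

class DeduplicateRecords(beam.PTransform):
    """Keep one record per record_hash.

    A set stored on a DoFn instance only sees the elements handled by that
    instance, so it cannot deduplicate across workers. Grouping by the hash
    lets the runner bring duplicates together.
    """

    def expand(self, records):
        return (
            records
            | 'Key By Hash' >> beam.Map(lambda r: (r['record_hash'], r))
            | 'Group By Hash' >> beam.GroupByKey()
            | 'Take First' >> beam.Map(lambda kv: next(iter(kv[1])))
        )

def to_output_row(record):
    """Project a record onto the output columns; missing values become None."""
    row = {field: record.get(field) for field in OUTPUT_FIELDS}
    if row['amount'] is not None:
        row['amount'] = float(row['amount'])
    return row

def run_batch_pipeline():
    """Run batch ETL pipeline: GCS → Dataflow → BigQuery."""
    pipeline_options = PipelineOptions([
        '--project', 'my-project',
        '--runner', 'DataflowRunner',
        '--region', 'us-central1',
        '--temp_location', 'gs://my-data-lake/temp/',
        '--staging_location', 'gs://my-data-lake/staging/',
        '--machine_type', 'n1-standard-4',
        '--max_num_workers', '20',
        '--autoscaling_algorithm', 'THROUGHPUT_BASED',
        '--number_of_worker_harness_threads', '4',
        '--experiments', 'use_runner_v2'
    ])

    with beam.Pipeline(options=pipeline_options) as pipeline:
        # Read from GCS (Bronze)
        raw_data = (
            pipeline
            | 'Read Raw Data' >> beam.io.ReadFromText(
                'gs://my-data-lake/bronze/raw/**/*.json.gz',
                compression_type=CompressionTypes.GZIP
            )
        )

        # Parse, validate, deduplicate, and keep only the output columns
        validated_data = (
            raw_data
            | 'Parse and Validate' >> beam.ParDo(ParseAndValidate())
            | 'Deduplicate' >> DeduplicateRecords()
            | 'Select Output Columns' >> beam.Map(to_output_row)
        )

        # Write to GCS as Parquet (Silver). The last path component is a
        # file name prefix, so output files are named events-00000-of-....
        (
            validated_data
            | 'Write Silver Parquet' >> beam.io.WriteToParquet(
                'gs://my-data-lake/silver/validated/events',
                schema=PARQUET_SCHEMA,
                file_name_suffix='.parquet'
            )
        )

        # Write to BigQuery (Gold)
        (
            validated_data
            | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                'my-project:analytics.events',
                schema=BQ_SCHEMA,
                write_disposition=BigQueryDisposition.WRITE_APPEND,
                create_disposition=BigQueryDisposition.CREATE_IF_NEEDED
            )
        )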

if __name__ == '__main__':
    run_batch_pipeline()

Best Practice: Always implement the Bronze/Silver/Gold pattern. Bronze stores raw data, Silver contains validated/cleaned data, and Gold is business-ready. This provides data lineage, enables reprocessing, and separates concerns.
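
The Bronze-to-Silver step can be tried locally without Beam. The following is a minimal single-process sketch using only the standard library; `parse_and_validate`, `bronze_to_silver`, and the sample lines are illustrative assumptions, and the in-memory `seen` set is only valid because everything runs in one process.

```python
import hashlib
import json

REQUIRED_FIELDS = ("event_id", "event_type", "timestamp", "user_id")


def parse_and_validate(line):
    """Return a validated record with a stable hash, or None if invalid."""
    try:
        record = json.loads(line)
    except json.JSONDecodeError:
        return None
    if not isinstance(record, dict):
        return None
    if not all(field in record for field in REQUIRED_FIELDS):
        return None
    if not isinstance(record.get("amount", 0), (int, float)):
        return None
    # Hash only the input fields, with sorted keys, so key order and
    # run-time metadata do not change the hash.
    payload = json.dumps(record, sort_keys=True).encode()
    record["record_hash"] = hashlib.md5(payload).hexdigest()
    return record


def bronze_to_silver(lines):
    """Validate raw lines and keep the first record seen for each hash."""
    seen = set()
    silver = []
    for line in lines:
        record = parse_and_validate(line)
        if record is None or record["record_hash"] in seen:
            continue
        seen.add(record["record_hash"])
        silver.append(record)
    return silver


BRONZE_LINES = [
    '{"event_id": "e1", "event_type": "purchase", "timestamp": "2025-01-01T00:00:00Z", "user_id": "u1", "amount": 9.5}',
    # Same event as e1 with a different key order: treated as a duplicate.
    '{"user_id": "u1", "amount": 9.5, "event_id": "e1", "timestamp": "2025-01-01T00:00:00Z", "event_type": "purchase"}',
    '{"event_id": "e2", "event_type": "view", "timestamp": "2025-01-01T00:05:00Z", "user_id": "u2"}',
    '{"event_id": "e3", "event_type": "purchase"}',
    'not json',
    '{"event_id": "e4", "event_type": "purchase", "timestamp": "2025-01-01T00:10:00Z", "user_id": "u4", "amount": "12"}',
]

silver = bronze_to_silver(BRONZE_LINES)
print([record["event_id"] for record in silver])  # ['e1', 'e2']
```

Because Bronze keeps the rejected lines untouched, a later fix to the validation rules can be applied by rerunning this step over the same input.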

Cloud Composer DAG

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator

default_args = {
    'owner': 'data-engineering',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email': ['data-team@company.com'],
    'sla': timedelta(hours=4),
}

with DAG(
    dag_id='batch_etl_pipeline',
    default_args=default_args,
    schedule_interval='0 2 * * *',
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['batch', 'etl', 'daily'],
) as dag:

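    # Task 1: Trigger Dataflow job
    # The Flex Template launch request nests its settings under
    # 'launchParameter' and must point at the template spec file in GCS.
    run_dataflow = DataflowStartFlexTemplateOperator(
        task_id='run_dataflow_batch',
        body={
            'launchParameter': {
                'jobName': 'batch-etl-{{ ds }}',
                # Placeholder path: replace with the location of your template spec
                'containerSpecGcsPath': 'gs://my-data-lake/templates/batch_etl.json',
                'parameters': {
                    'input': 'gs://my-data-lake/bronze/raw/{{ ds_nodash }}/*.json.gz',
                    'output_table': 'my-project:analytics.events',
                    'temp_location': 'gs://my-data-lake/temp/'
                },
                'environment': {
                    'numWorkers': 10,
                    'maxWorkers': 50,
                    'machineType': 'n1-standard-4',
                    'zone': 'us-central1-a'
                }
            }
        },
        location='us-central1',
    )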

    # Task 2: Data quality check
    quality_check = BigQueryInsertJobOperator(
        task_id='data_quality_check',
        configuration={
            'query': {
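                'query': '''
                    DECLARE row_count INT64;
                    DECLARE null_count INT64;

                    -- BigQuery scripting assigns variables with SET, not SELECT ... INTO.
                    -- {{ ds }} is the logical date of the run; adjust the filter
                    -- if the job executes on a later calendar day.
                    SET row_count = (
                        SELECT COUNT(*)
                        FROM `my-project.analytics.events`
                        WHERE DATE(processed_at) = '{{ ds }}'
                    );

                    SET null_count = (
                        SELECT COUNT(*)
                        FROM `my-project.analytics.events`
                        WHERE DATE(processed_at) = '{{ ds }}'
                          AND event_id IS NULL
                    );

                    -- Fail the task when NULL event IDs were loaded.
                    ASSERT null_count = 0 AS 'NULL event_id values found';

                    SELECT
                        row_count,
                        null_count,
                        SAFE_DIVIDE(null_count, row_count) AS null_rate;
                ''',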
                'useLegacySql': False,
            }
        },
        location='us-central1',
    )

    # Task 3: Alert on failure
    alert_failure = EmailOperator(
        task_id='alert_failure',
        to=['data-team@company.com'],
        subject='Batch Pipeline Failed - {{ ds }}',
        html_content='<p>Batch pipeline failed. Check Cloud Composer logs.</p>',
        trigger_rule='one_failed',
    )

    run_dataflow >> quality_check >> alert_failure
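
The DAG relies on Airflow's `{{ ds }}` and `{{ ds_nodash }}` template variables to make each run deterministic. As a rough illustration of what they expand to, the sketch below reproduces the rendering in plain Python; `render_run_parameters` is a hypothetical helper, and it assumes raw files are laid out under one prefix per day, as the `input` parameter above implies.

```python
from datetime import date


def render_run_parameters(logical_date):
    """Mimic how {{ ds }} and {{ ds_nodash }} render for one DAG run."""
    ds = logical_date.isoformat()                 # e.g. 2025-01-01
    ds_nodash = logical_date.strftime("%Y%m%d")   # e.g. 20250101
    return {
        "jobName": f"batch-etl-{ds}",
        "input": f"gs://my-data-lake/bronze/raw/{ds_nodash}/*.json.gz",
    }


params = render_run_parameters(date(2025, 1, 1))
print(params["jobName"])  # batch-etl-2025-01-01
print(params["input"])    # gs://my-data-lake/bronze/raw/20250101/*.json.gz
```

Rerunning a task for the same logical date produces the same job name and input path, which is what makes retries and backfills safe to reason about.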

Monitoring and Alerting

from google.cloud import monitoring_v3
from google.protobuf import duration_pb2

def create_batch_pipeline_alert(project_id):
    """Create an alert policy that fires when a Dataflow job runs longer than 4 hours."""
    client = monitoring_v3.AlertPolicyServiceClient()

    alert_policy = monitoring_v3.AlertPolicy(
        display_name="Batch Pipeline Duration Alert",
        # combiner is required even when the policy has a single condition
        combiner=monitoring_v3.AlertPolicy.ConditionCombinerType.OR,
        conditions=[
            monitoring_v3.AlertPolicy.Condition(
                display_name="Pipeline Duration > 4 hours",
                condition_threshold=monitoring_v3.AlertPolicy.Condition.MetricThreshold(
                    # The monitored resource type for Dataflow is dataflow_job
                    filter='resource.type="dataflow_job" AND '
                           'metric.type="dataflow.googleapis.com/job/elapsed_time"',
                    comparison=monitoring_v3.ComparisonType.COMPARISON_GT,
                    threshold_value=14400,  # 4 hours in seconds
                    # Duration is a protobuf message, not a string
                    duration=duration_pb2.Duration(seconds=60),
                ),
            )
        ],
    )

    request = monitoring_v3.CreateAlertPolicyRequest(
        name=f"projects/{project_id}",
        alert_policy=alert_policy,
    )

    response = client.create_alert_policy(request=request)
    print(f"Created alert: {response.name}")

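Cost Tip: For non-urgent batch jobs, use FlexRS (up to 50% savings), which runs the job on a mix of preemptible and regular VMs in exchange for a delayed start. Keep autoscaling enabled and cap it with --max_num_workers so the job scales down as the remaining work shrinks. Dataflow worker pricing does not depend on the time of day, so schedule jobs around data availability and SLAs rather than expecting an off-peak discount.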

Common Interview Questions

Q1: Why use GCS as the source for batch pipelines?

Answer: GCS provides durable, scalable storage for raw data at low cost. It supports multiple formats, lifecycle management, and direct access from Dataflow. Using GCS as the Bronze layer enables reprocessing, data lineage, and separation of storage and compute.

Q2: How do you handle schema evolution in batch pipelines?

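Answer: Store data in Avro or Parquet, both of which support schema evolution, and track schema versions in a schema registry. Beam's WriteToBigQuery can pass schema auto-detection through to BigQuery load jobs, but for production tables explicit, additive changes are safer: add new columns as NULLABLE, for example with the ALLOW_FIELD_ADDITION schema update option. Always validate data after schema changes.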
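
One way to enforce "additive changes only" is a small check that runs before a new schema version is deployed. This is a sketch under stated assumptions: schemas are represented as lists of `(name, type, mode)` tuples, `check_additive_change` is a hypothetical helper, and the example column `channel` is invented for illustration.

```python
def check_additive_change(old_schema, new_schema):
    """Return a list of problems; an empty list means the change is additive."""
    problems = []
    old = {name: (ftype, mode) for name, ftype, mode in old_schema}
    new = {name: (ftype, mode) for name, ftype, mode in new_schema}

    # Existing columns must survive with the same type.
    for name, (ftype, _mode) in old.items():
        if name not in new:
            problems.append(f"column removed: {name}")
        elif new[name][0] != ftype:
            problems.append(f"type changed: {name} {ftype} -> {new[name][0]}")

    # New columns must be optional, because existing rows have no value for them.
    for name, (_ftype, mode) in new.items():
        if name not in old and mode == "REQUIRED":
            problems.append(f"new column must not be REQUIRED: {name}")
    return problems


V1 = [
    ("event_id", "STRING", "REQUIRED"),
    ("amount", "FLOAT64", "NULLABLE"),
]
V2_ADDITIVE = V1 + [("channel", "STRING", "NULLABLE")]
V2_BREAKING = [
    ("event_id", "INT64", "REQUIRED"),
    ("channel", "STRING", "REQUIRED"),
]

print(check_additive_change(V1, V2_ADDITIVE))  # []
for problem in check_additive_change(V1, V2_BREAKING):
    print(problem)
```

Running a check like this in CI keeps breaking changes from reaching the warehouse, where they are much harder to undo.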

Q3: What are the key metrics to monitor in batch pipelines?

Answer: 1) Pipeline duration vs. SLA, 2) Records processed per second, 3) Error rates and types, 4) Data quality metrics (null rates, completeness), 5) Resource utilization (CPU, memory), 6) Cost per run.
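
Several of these metrics can be derived from data the pipeline already has at the end of a run. The sketch below computes a few of them in plain Python; `run_metrics`, its parameters, and the sample records are assumptions for illustration, and the 4-hour SLA mirrors the value used in the Composer DAG.

```python
def run_metrics(records, duration_seconds, sla_seconds, key_field="event_id"):
    """Summarize one batch run: volume, null rate, throughput, SLA status."""
    total = len(records)
    nulls = sum(1 for record in records if record.get(key_field) is None)
    return {
        "rows": total,
        "null_rate": nulls / total if total else None,
        "records_per_second": total / duration_seconds if duration_seconds > 0 else None,
        "within_sla": duration_seconds <= sla_seconds,
    }


SAMPLE = [
    {"event_id": "e1"},
    {"event_id": "e2"},
    {"event_id": None},
    {"event_id": "e4"},
]

metrics = run_metrics(SAMPLE, duration_seconds=2, sla_seconds=4 * 60 * 60)
print(metrics["null_rate"])           # 0.25
print(metrics["records_per_second"])  # 2.0
print(metrics["within_sla"])          # True
```

Emitting a dictionary like this per run makes it straightforward to track trends and alert on regressions instead of inspecting individual jobs.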

Q4: How do you implement idempotency in batch pipelines?

Answer: Use date-based partitioning to prevent duplicate processing. Implement record hashing for deduplication. Use BigQuery partitioned tables with WRITE_TRUNCATE for daily loads. Check pipeline state before reprocessing. Use Cloud Composer's execution dates for deterministic runs.
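
The difference between truncating a daily partition and appending to it can be shown with an in-memory stand-in for the warehouse. This is only a sketch: `load_partition` and the `warehouse` dict are hypothetical, while the `table$YYYYMMDD` form follows BigQuery's partition decorator syntax.

```python
def partition_decorator(table, ds):
    """Address one daily partition, e.g. my-project:analytics.events$20250101."""
    return f"{table}${ds.replace('-', '')}"


def load_partition(warehouse, table, ds, rows, truncate=True):
    """Load rows for one day into a dict that stands in for the warehouse."""
    target = partition_decorator(table, ds)
    if truncate:
        # WRITE_TRUNCATE-style: replace the partition's contents.
        warehouse[target] = list(rows)
    else:
        # WRITE_APPEND-style: add to whatever is already there.
        warehouse.setdefault(target, []).extend(rows)
    return target


rows = [{"event_id": "e1"}, {"event_id": "e2"}]

idempotent = {}
for _ in range(2):  # the same run executed twice, e.g. after a retry
    target = load_partition(idempotent, "my-project:analytics.events", "2025-01-01", rows)

appended = {}
for _ in range(2):
    load_partition(appended, "my-project:analytics.events", "2025-01-01", rows, truncate=False)

print(target)                   # my-project:analytics.events$20250101
print(len(idempotent[target]))  # 2
print(len(appended[target]))    # 4
```

With truncation, a retry or backfill leaves the partition in the same state as a single successful run; with append, every rerun doubles the day's data unless a later deduplication step removes it.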

Q5: What is the difference between FlexRS and standard Dataflow for batch?

Answer: FlexRS (Flexible Resource Scheduling) provides up to 50% cost savings by running batch jobs on a mix of preemptible and on-demand VMs. In exchange, the job is queued and may start up to 6 hours after submission, so completion time is less predictable. Standard Dataflow starts right away on regular VMs and finishes sooner. Use FlexRS for non-urgent batch jobs and standard Dataflow for time-sensitive processing.
