Batch Pipeline Architecture
Implementation
GCS Bucket Structure
# Create the data lake bucket. Cloud Storage has a flat namespace: "folders"
# such as bronze/raw/ are only object-name prefixes that exist as soon as an
# object is written under them, so there is no mkdir step (gsutil has no
# mkdir command). `mb` fails if the bucket already exists.
gsutil mb -l us-central1 gs://my-data-lake

# Optional: write an empty placeholder object under each layer so the layout
# is visible in the Cloud Console before any data lands.
for prefix in bronze/raw bronze/archive silver/validated silver/cleaned \
              gold/curated temp staging; do
  printf '' | gsutil cp - "gs://my-data-lake/${prefix}/.keep"
done

# Set lifecycle rules. Lifecycle configuration is bucket-level, so every rule
# is scoped to the Bronze raw prefix with matchesPrefix. matchesStorageClass
# keeps the transitions one-directional (STANDARD -> NEARLINE -> COLDLINE).
# The Delete rule removes raw objects two years after creation; adding
# "isLive": false would restrict it to noncurrent versions, which only exist
# when object versioning is enabled on the bucket.
cat > lifecycle.json << 'EOF'
{
  "rule": [
    {
      "action": {"type": "SetStorageClass", "storageClass": "NEARLINE"},
      "condition": {
        "age": 30,
        "matchesPrefix": ["bronze/raw/"],
        "matchesStorageClass": ["STANDARD"]
      }
    },
    {
      "action": {"type": "SetStorageClass", "storageClass": "COLDLINE"},
      "condition": {
        "age": 90,
        "matchesPrefix": ["bronze/raw/"],
        "matchesStorageClass": ["NEARLINE"]
      }
    },
    {
      "action": {"type": "Delete"},
      "condition": {"age": 730, "matchesPrefix": ["bronze/raw/"]}
    }
  ]
}
EOF
gsutil lifecycle set lifecycle.json gs://my-data-lake
Dataflow Batch Pipeline
import hashlib
import json
from datetime import datetime, timezone

import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryDisposition
from apache_beam.options.pipeline_options import PipelineOptions
class ParseAndValidate(beam.DoFn):
    """Parse one JSON line, validate it, and enrich it with lineage metadata.

    Input elements are either a raw JSON line (as emitted by
    ``beam.io.ReadFromText``) or a ``(file_path, line)`` tuple (as emitted by
    ``beam.io.ReadFromTextWithFilename``). In the tuple case ``file_path`` is
    recorded as ``source_file``; otherwise ``source_file`` is ``'unknown'``.

    Nothing is yielded when the line is not valid JSON, is not a JSON object,
    lacks one of ``REQUIRED_FIELDS``, or has an ``amount`` that is not an
    int/float (JSON booleans are rejected as well).

    Yields:
        The parsed dict with ``record_hash`` (MD5 of the parsed payload),
        ``processed_at`` (timezone-aware UTC ISO-8601 string) and
        ``source_file`` added.
    """

    REQUIRED_FIELDS = ('event_id', 'event_type', 'timestamp', 'user_id')

    def process(self, element):
        if isinstance(element, tuple) and len(element) == 2:
            source_file, line = element
        else:
            source_file, line = 'unknown', element

        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            return

        # json.loads accepts any JSON value; only objects can be records.
        if not isinstance(record, dict):
            return
        if not all(field in record for field in self.REQUIRED_FIELDS):
            return
        amount = record.get('amount', 0)
        # bool is a subclass of int, but true/false is not a numeric amount.
        if isinstance(amount, bool) or not isinstance(amount, (int, float)):
            return

        # Hash the parsed payload BEFORE adding per-run metadata: processed_at
        # changes on every execution, so including it would give every copy
        # of an event a distinct hash and defeat hash-based deduplication.
        record['record_hash'] = hashlib.md5(
            json.dumps(record, sort_keys=True).encode()
        ).hexdigest()
        record['processed_at'] = datetime.now(timezone.utc).isoformat()
        record['source_file'] = source_file
        yield record
class DeduplicateRecords(beam.DoFn):
    """Drop records whose ``record_hash`` was already seen in this bundle.

    A DoFn instance only sees the elements routed to it, so this is a
    best-effort, bundle-local filter: duplicates handled by different workers
    or different bundles still pass through. For pipeline-wide deduplication,
    key elements by ``record_hash`` and apply ``GroupByKey`` instead.

    Records without a truthy ``record_hash`` are dropped.
    """

    def __init__(self):
        super().__init__()
        self.seen_hashes = set()

    def start_bundle(self):
        # Runners may reuse one DoFn instance for many bundles; starting each
        # bundle with a fresh set keeps memory bounded instead of retaining
        # every hash the worker has ever processed.
        self.seen_hashes = set()

    def process(self, element):
        record_hash = element.get('record_hash')
        if record_hash and record_hash not in self.seen_hashes:
            self.seen_hashes.add(record_hash)
            yield element
def run_batch_pipeline():
"""Run batch ETL pipeline: GCS → Dataflow → BigQuery."""
pipeline_options = PipelineOptions([
'--project', 'my-project',
'--runner', 'DataflowRunner',
'--region', 'us-central1',
'--temp_location', 'gs://my-data-lake/temp/',
'--staging_location', 'gs://my-data-lake/staging/',
'--machine_type', 'n1-standard-4',
'--max_num_workers', '20',
'--autoscaling_algorithm', 'THROUGHPUT_BASED',
'--number_of_worker_harness_threads', '4',
'--experiments', 'use_runner_v2'
])
with beam.Pipeline(options=pipeline_options) as pipeline:
# Read from GCS (Bronze)
raw_data = (
pipeline
| 'Read Raw Data' >> beam.io.ReadFromText(
'gs://my-data-lake/bronze/raw/**/*.json.gz',
compression_type=beam.io.fileio.CompressionTypes.GZIP
)
)
# Parse and validate
validated_data = (
raw_data
| 'Parse and Validate' >> beam.ParDo(ParseAndValidate())
)
# Write to GCS as Parquet (Silver)
(
validated_data
| 'Write Silver Parquet' >> beam.io.WriteToParquet(
'gs://my-data-lake/silver/validated/',
schema='event_id:STRING,event_type:STRING,user_id:STRING,timestamp:TIMESTAMP,amount:FLOAT64,processed_at:TIMESTAMP',
file_name_suffix='.parquet',
coders=beam.codersregistry.get_coder('TableRow')
)
)
# Write to BigQuery (Gold)
(
validated_data
| 'Write to BigQuery' >> beam.io.WriteToBigQuery(
'my-project:analytics.events',
schema='event_id:STRING,event_type:STRING,user_id:STRING,timestamp:TIMESTAMP,amount:FLOAT64,processed_at:TIMESTAMP',
write_disposition=BigQueryDisposition.WRITE_APPEND,
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED
)
)
if __name__ == '__main__':
run_batch_pipeline()
✨
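Best Practice: Organize the lake into Bronze/Silver/Gold (medallion) layers. Bronze stores raw data exactly as received, Silver holds validated and cleaned data, and Gold holds business-ready tables. Keeping the layers separate preserves data lineage, lets you reprocess from the raw data, and separates ingestion concerns from transformation concerns.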
Cloud Composer DAG
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
# Task-level defaults handed to the DAG via default_args=: up to three
# retries five minutes apart, e-mail to the data team when a task fails, and
# a four-hour SLA.
default_args = dict(
    owner='data-engineering',
    retries=3,
    retry_delay=timedelta(minutes=5),
    email_on_failure=True,
    email=['data-team@company.com'],
    sla=timedelta(hours=4),
)
with DAG(
    dag_id='batch_etl_pipeline',
    default_args=default_args,
    schedule_interval='0 2 * * *',
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['batch', 'etl', 'daily'],
) as dag:
    # Task 1: Launch the Dataflow Flex Template for this run's Bronze partition.
    # `body` is the flexTemplates.launch request, so every launch setting sits
    # under "launchParameter", and containerSpecGcsPath is mandatory.
    run_dataflow = DataflowStartFlexTemplateOperator(
        task_id='run_dataflow_batch',
        body={
            'launchParameter': {
                'jobName': 'batch-etl-{{ ds }}',
                # TODO: point at the template spec produced by
                # `gcloud dataflow flex-template build`.
                'containerSpecGcsPath': 'gs://my-data-lake/templates/batch-etl.json',
                'parameters': {
                    'input': 'gs://my-data-lake/bronze/raw/{{ ds_nodash }}/*.json.gz',
                    'output_table': 'my-project:analytics.events',
                },
                'environment': {
                    'numWorkers': 10,
                    'maxWorkers': 50,
                    'machineType': 'n1-standard-4',
                    'tempLocation': 'gs://my-data-lake/temp/',
                    'workerZone': 'us-central1-a',
                },
            },
        },
        location='us-central1',
    )

    # Task 2: Data quality check. BigQuery scripting assigns variables with
    # SET (it has no SELECT ... INTO); ASSERT fails the query job, and with it
    # this task, when a check does not hold.
    # Filters on the event timestamp rather than processed_at: processed_at is
    # the load wall-clock time, which for a scheduled run falls after ds.
    # NOTE(review): assumes events under bronze/raw/<ds_nodash>/ are stamped
    # on ds -- confirm against the producer.
    quality_check = BigQueryInsertJobOperator(
        task_id='data_quality_check',
        configuration={
            'query': {
                'query': '''
                    DECLARE row_count INT64;
                    DECLARE null_count INT64;

                    SET (row_count, null_count) = (
                      SELECT AS STRUCT COUNT(*), COUNTIF(event_id IS NULL)
                      FROM `my-project.analytics.events`
                      WHERE DATE(`timestamp`) = '{{ ds }}'
                    );

                    ASSERT row_count > 0
                      AS 'No rows in analytics.events for {{ ds }}';
                    ASSERT null_count = 0
                      AS 'Rows with NULL event_id in analytics.events for {{ ds }}';

                    SELECT
                      row_count,
                      null_count,
                      SAFE_DIVIDE(null_count, row_count) AS null_rate;
                ''',
                'useLegacySql': False,
            }
        },
        location='us-central1',
    )

    # Task 3: Alert when either upstream task fails. EmailOperator's body
    # argument is html_content.
    alert_failure = EmailOperator(
        task_id='alert_failure',
        to=['data-team@company.com'],
        subject='Batch Pipeline Failed - {{ ds }}',
        html_content='<p>Batch pipeline failed. Check Cloud Composer logs.</p>',
        trigger_rule='one_failed',
    )

    # NOTE(review): alert_failure is the only leaf task. Airflow derives the
    # DAG-run state from leaf tasks, so a run in which the alert fires may be
    # reported as successful; failures remain visible on the upstream task
    # instances and via email_on_failure -- confirm for your Airflow version.
    run_dataflow >> quality_check
    [run_dataflow, quality_check] >> alert_failure
Monitoring and Alerting
from google.cloud import monitoring_v3
from google.protobuf import timestamp_pb2
def create_batch_pipeline_alert(project_id, notification_channels=None, max_duration_hours=4):
    """Create a Cloud Monitoring alert for Dataflow jobs that run too long.

    The policy's condition matches every ``cloud_dataflow_job`` resource in
    the project and fires when ``job/elapsed_time`` exceeds
    ``max_duration_hours`` for 60 seconds.

    Args:
        project_id: GCP project that owns the alert policy.
        notification_channels: Optional iterable of notification channel
            resource names (``projects/{project}/notificationChannels/{id}``).
            Without any, incidents open but nobody is notified.
        max_duration_hours: Elapsed-time threshold in hours (default 4).

    Returns:
        The created ``monitoring_v3.AlertPolicy`` as returned by the API.
    """
    from datetime import timedelta

    client = monitoring_v3.AlertPolicyServiceClient()
    alert_policy = monitoring_v3.AlertPolicy(
        display_name="Batch Pipeline Duration Alert",
        # Only mandatory with several conditions; explicit so adding one later
        # does not make the policy invalid.
        combiner=monitoring_v3.AlertPolicy.ConditionCombinerType.OR,
        conditions=[
            monitoring_v3.AlertPolicy.Condition(
                display_name=f"Pipeline Duration > {max_duration_hours} hours",
                condition_threshold=monitoring_v3.AlertPolicy.Condition.MetricThreshold(
                    filter='resource.type="cloud_dataflow_job" AND '
                    'metric.type="dataflow.googleapis.com/job/elapsed_time"',
                    comparison=monitoring_v3.ComparisonType.COMPARISON_GT,
                    # elapsed_time is reported in seconds.
                    threshold_value=max_duration_hours * 3600,
                    duration=timedelta(seconds=60),
                ),
            )
        ],
        notification_channels=list(notification_channels or []),
    )
    request = monitoring_v3.CreateAlertPolicyRequest(
        name=f"projects/{project_id}",
        alert_policy=alert_policy,
    )
    response = client.create_alert_policy(request=request)
    print(f"Created alert: {response.name}")
    return response
ℹ️
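Cost Tip: For batch pipelines, use FlexRS (`--flexrs_goal=COST_OPTIMIZED`) for non-urgent jobs. It bills vCPU and memory at a discounted rate in exchange for delayed scheduling. Enable autoscaling so workers are released as the backlog drains. Dataflow does not let you request preemptible workers directly; FlexRS is the mechanism that runs jobs on a mix of preemptible and regular VMs. Dataflow pricing does not vary by time of day, so schedule jobs to meet the SLA and upstream data availability rather than to save money.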
Common Interview Questions
Q1: Why use GCS as the source for batch pipelines?
Answer: GCS provides durable, scalable storage for raw data at low cost. It supports multiple formats, lifecycle management, and direct access from Dataflow. Using GCS as the Bronze layer enables reprocessing, data lineage, and separation of storage and compute.
Q2: How do you handle schema evolution in batch pipelines?
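Answer: Store data in Avro or Parquet, which embed their schema and support compatible evolution (for example, adding optional fields). Keep schemas under version control in a schema registry. Make Dataflow parsing tolerant of unknown and missing optional fields instead of depending on a rigid record shape. For BigQuery, apply additive changes explicitly, or let load jobs evolve the table with `schemaUpdateOptions` (`ALLOW_FIELD_ADDITION`, `ALLOW_FIELD_RELAXATION`). Always validate data after schema changes.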
Q3: What are the key metrics to monitor in batch pipelines?
Answer: 1) Pipeline duration vs. SLA, 2) Records processed per second, 3) Error rates and types, 4) Data quality metrics (null rates, completeness), 5) Resource utilization (CPU, memory), 6) Cost per run.
Q4: How do you implement idempotency in batch pipelines?
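Answer: Give each run a deterministic slice of data, such as the date partition for Cloud Composer's logical date (`{{ ds }}`), so that a rerun replaces that slice instead of adding to it. In BigQuery, load into a date-partitioned table and target the partition decorator (for example `events$20250101`) with WRITE_TRUNCATE. WRITE_TRUNCATE on the bare table name replaces the whole table, and WRITE_APPEND (as used in the pipeline above) duplicates rows on every rerun. Use record hashing, or a MERGE on a business key such as `event_id`, to deduplicate within and across loads. Check pipeline state before reprocessing.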
Q5: What is the difference between FlexRS and standard Dataflow for batch?
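Answer: FlexRS lowers cost by billing vCPU and memory at a discounted rate and running workers on a mix of preemptible and regular VMs. In exchange, Dataflow queues the job and may delay its start by up to 6 hours after submission; the 6 hours is a scheduling delay, not an execution-time limit. Standard batch jobs start as soon as workers are provisioned and run only on regular VMs. Use FlexRS for non-urgent batch jobs and standard execution for time-sensitive processing.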