πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

ETL Pipeline Patterns: Dataproc, Dataflow & Cloud Functions

GCP Data EngineeringETL Pipelines⭐ Premium

Advertisement

ETL Pipeline Patterns on GCP

Master ETL patterns using Dataproc Spark, Dataflow Beam, Cloud Functions, and orchestrated workflows on GCP.

20 min readAdvanced

ETL Architecture Patterns on GCP

πŸ“Š BigQuery Architecture for Data Engineering
COLUMNAR STORAGE (Capacitor)Column 1Int64Column 2StringColumn 3Float64Column 4TimestampColumn 5JSONColumn N...QUERY ENGINE (Dremel)Tree ArchitectureDistributed executionSlot-basedAuto-scaling computeColumn pruningRead only needed columnsPredicate pushdownFilter earlyKEY FEATURESBI EngineIn-memory analyticsStreaming BufferReal-time insertsPartitioningTime-unit / IntegerClusteringAuto-sort columnsSLOT USAGEStandardShared slotsEnterpriseReserved slotsFlex SlotsPay per useAutoscaleDynamic allocation
Interview Tip: BigQuery separates storage and compute. Queries are charged by slots (compute) + bytes scanned. Always partition and cluster tables to reduce costs.

Dataproc Spark ETL

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("ETL_Sales_Data") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# EXTRACT: Read raw data from GCS
raw_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("gs://my-data-lake/bronze/sales/*.csv")

# TRANSFORM: Clean and enrich
transformed_df = raw_df \
    .withColumn("order_date", to_date(col("order_date"), "yyyy-MM-dd")) \
    .withColumn("amount", col("quantity") * col("unit_price")) \
    .withColumn("tax", col("amount") * 0.08) \
    .withColumn("total", col("amount") + col("tax")) \
    .withColumn("customer_domain", regexp_extract(col("email"), "@(.+)$", 1)) \
    .filter(col("amount") > 0) \
    .filter(col("email").isNotNull())

# Aggregate by product category
summary_df = transformed_df \
    .groupBy("order_date", "product_category") \
    .agg(
        count("*").alias("order_count"),
        sum("total").alias("total_revenue"),
        avg("total").alias("avg_order_value"),
        countDistinct("customer_email").alias("unique_customers")
    )

# LOAD: Write to BigQuery
summary_df.write \
    .format("bigquery") \
    .option("table", "project.analytics.sales_summary") \
    .option("temporaryGcsBucket", "my-temp-bucket") \
    .mode("overwrite") \
    .save()

# Also write detailed data to GCS as Parquet
transformed_df.write \
    .mode("overwrite") \
    .partitionBy("order_date") \
    .parquet("gs://my-data-lake/silver/sales/")

spark.stop()

✨

Best Practice: For Dataproc ETL, use dynamic allocation and adaptive query execution. Write to both BigQuery (for analytics) and GCS (for reprocessing). Use partitionBy for efficient data pruning. Always validate data quality before loading.

Dataflow Beam ETL

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery, BigQueryDisposition
import json

class TransformSalesData(beam.DoFn):
    """Transform raw sales data."""
    def process(self, element):
        try:
            record = json.loads(element)
            yield {
                'order_id': record['order_id'],
                'customer_id': record['customer_id'],
                'product_category': record['product_category'],
                'amount': float(record['quantity']) * float(record['unit_price']),
                'order_date': record['order_date'],
                'processed_at': datetime.utcnow().isoformat()
            }
        except (KeyError, ValueError):
            return

def run_etl_pipeline():
    pipeline_options = PipelineOptions([
        '--project', 'my-project',
        '--runner', 'DataflowRunner',
        '--region', 'us-central1',
        '--temp_location', 'gs://my-data-lake/temp/',
        '--machine_type', 'n1-standard-4',
        '--max_num_workers', '10'
    ])

    with beam.Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | 'Read' >> beam.io.ReadFromText('gs://my-data-lake/bronze/sales/*.json')
            | 'Transform' >> beam.ParDo(TransformSalesData())
            | 'Write BQ' >> WriteToBigQuery(
                'my-project:analytics.sales',
                schema='order_id:STRING,customer_id:STRING,product_category:STRING,amount:FLOAT64,order_date:TIMESTAMP',
                write_disposition=BigQueryDisposition.WRITE_APPEND
            )
        )

if __name__ == '__main__':
    run_etl_pipeline()
πŸ’¬

Common Interview Questions

Q1: When would you choose Dataproc over Dataflow for ETL?

Answer: Dataproc is better for existing Spark/Hadoop workloads, complex ML pipelines, or when you need cluster control. Dataflow is better for new development, unified batch/streaming, or when you want serverless operation. Dataproc provides more flexibility; Dataflow eliminates management overhead.

Q2: How do you implement error handling in ETL pipelines?

Answer: 1) Use try/catch blocks in transforms, 2) Implement dead-letter queues for failed records, 3) Use quality checks at each stage, 4) Log errors to Cloud Logging, 5) Implement retry logic for transient failures, 6) Set up alerts for error rates.

Q3: What is the difference between ETL and ELT?

Answer: ETL transforms data before loading into the target system. ELT loads raw data first, then transforms within the target. ELT is common with BigQuery because it separates storage and compute. Use ETL for complex transformations, ELT for simple transformations leveraging BigQuery's compute.

Q4: How do you optimize ETL pipeline costs?

Answer: 1) Use preemptible VMs for batch processing, 2) Enable autoscaling, 3) Use FlexRS for non-urgent jobs, 4) Right-size machine types, 5) Implement incremental processing, 6) Use columnar formats (Parquet) for efficiency.

Q5: How do you test ETL pipelines?

Answer: 1) Unit test transforms with sample data, 2) Integration test with small datasets, 3) Use data quality frameworks (Great Expectations), 4) Implement schema validation, 5) Test error handling paths, 6) Monitor production metrics.

Advertisement