πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Cloud Composer: Managed Airflow for Data Pipelines

GCP Data EngineeringCloud Composer⭐ Premium

Advertisement

Google Cloud Composer Deep Dive

Master Google Cloud Composer including managed Apache Airflow, DAGs, GKE workers, operators, and data pipeline orchestration patterns.

20 min readAdvanced

Cloud Composer Architecture

Cloud Composer is a fully managed workflow orchestration service built on Apache Airflow. It automates data pipeline scheduling, monitoring, and management.

Architecture Overview

🎡 Cloud Composer (Airflow) Architecture
Cloud Composer: Managed Apache Airflow on GCPGKE CLUSTER (Managed Kubernetes)Schedulerβ€’ Parse DAG filesβ€’ Create task instancesβ€’ Determine execution orderWeb Serverβ€’ DAG visualizationβ€’ Task history & logsβ€’ Manual triggersWorker (Celery)β€’ Execute tasksβ€’ Auto-scaling podsβ€’ Horizontal scalingMetadata DBβ€’ PostgreSQLβ€’ Task stateβ€’ Connection infoENVIRONMENT CONFIGMachine type:n1-standard-1 to n1-standard-8Node count:3-6 GKE nodes (auto-scaling)Airflow version:2.x (managed upgrades)Environment variables:Project-wide settingsDAGs & OPERATORSDAGs:Python files in dags/ bucketOperators:GCSObjectExistsOperator, etc.Hooks:GcpHook, BigQueryHook, GCSHookSensors:Wait for condition (file, time)GCP SERVICE INTEGRATIONSBigQueryCloud StorageDataflowDataprocPub/SubCloud FunctionsVertex AIVERSION MANAGEMENT & UPGRADESAuto-upgradeMinor versionsManual upgradeMajor versionsCanary deployTest new versionsRollbackRevert if needed
Interview Tip: Cloud Composer runs Airflow on GKE β€” you manage DAGs, Google manages the cluster. Use GcpHook/BigQueryHook for GCP integrations. Store DAGs in a GCS bucket. Use environment variables for project-wide config. Prefer Cloud Composer over self-managed Airflow for production workloads.

Environment Creation

# Create Cloud Composer environment
gcloud composer environments create my-composer-env \
  --location=us-central1 \
  --airflow-version=2.7.3 \
  --python-version=3.11 \
  --environment-size=medium \
  --disk-size=30GB \
  --master-machine-type=n1-standard-2 \
  --worker-machine-type=n1-standard-2 \
  --num-workers=3 \
  --image-version=composer-2.7.3-airflow-2.7.3 \
  --pypi-packages apache-airflow-providers-google \
  --env-variables=CORE_LOAD_EXAMPLES=false \
  --web-server-allowlist=0.0.0.0/0

# Check environment status
gcloud composer environments describe my-composer-env \
  --location=us-central1 \
  --format="value(state)"

DAG Development

Basic DAG Structure

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.utils.task_group import TaskGroup

# Default DAG arguments
default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'email': ['data-team@company.com'],
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=2),
    'sla': timedelta(hours=4),
}

# Define DAG
with DAG(
    dag_id='daily_etl_pipeline',
    default_args=default_args,
    description='Daily ETL pipeline for sales data',
    schedule_interval='0 2 * * *',  # Daily at 2 AM UTC
    start_date=datetime(2025, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=['etl', 'daily', 'sales'],
    params={
        'source_date': '{{ ds }}',
        'target_date': '{{ ds }}',
    }
) as dag:

    # Task 1: Extract data from GCS
    extract_task = GCSToBigQueryOperator(
        task_id='extract_to_bronze',
        bucket='my-data-lake',
        source_objects=['raw/sales/{{ ds }}/*.csv'],
        destination_project_dataset_table='project.bronze.sales',
        source_format='CSV',
        skip_leading_rows=1,
        autodetect=True,
        write_disposition='WRITE_TRUNCATE',
        location='us-central1',
    )

    # Task 2: Transform with SQL
    with TaskGroup('transformations') as transform_group:
        transform_sql = BigQueryInsertJobOperator(
            task_id='silver_transform',
            configuration={
                'query': {
                    'query': '''
                        CREATE OR REPLACE TABLE `project.silver.sales` AS
                        SELECT
                            SAFE_CAST(order_id AS STRING) as order_id,
                            SAFE_CAST(customer_id AS STRING) as customer_id,
                            SAFE_CAST(product_id AS STRING) as product_id,
                            SAFE_CAST(amount AS FLOAT64) as amount,
                            SAFE_CAST(order_date AS DATE) as order_date,
                            TIMESTAMP '{{ ts }}' as processed_at
                        FROM `project.bronze.sales`
                        WHERE amount > 0
                          AND customer_id IS NOT NULL
                    ''',
                    'useLegacySql': False,
                }
            },
            location='us-central1',
        )

        quality_check = BigQueryInsertJobOperator(
            task_id='quality_check',
            configuration={
                'query': {
                    'query': '''
                        SELECT
                            COUNT(*) as total_rows,
                            COUNTIF(amount IS NULL) as null_amounts,
                            COUNTIF(amount <= 0) as invalid_amounts,
                            COUNT(DISTINCT customer_id) as unique_customers
                        FROM `project.silver.sales`
                        WHERE DATE(processed_at) = '{{ ds }}'
                    ''',
                    'useLegacySql': False,
                }
            },
            location='us-central1',
        )

        transform_sql >> quality_check

    # Task 3: Load to Gold
    load_gold = BigQueryInsertJobOperator(
        task_id='load_gold',
        configuration={
            'query': {
                'query': '''
                    CREATE OR REPLACE TABLE `project.gold.sales_summary`
                    PARTITION BY order_date
                    CLUSTER BY product_id
                    AS
                    SELECT
                        order_date,
                        product_id,
                        COUNT(*) as order_count,
                        SUM(amount) as total_revenue,
                        AVG(amount) as avg_order_value,
                        COUNT(DISTINCT customer_id) as unique_customers
                    FROM `project.silver.sales`
                    WHERE DATE(processed_at) = '{{ ds }}'
                    GROUP BY 1, 2
                ''',
                'useLegacySql': False,
            }
        },
        location='us-central1',
    )

    # Define dependencies
    extract_task >> transform_group >> load_gold

✨

Best Practice: Use TaskGroups to organize related tasks and improve DAG readability. Implement SLAs and timeouts to prevent stuck tasks. Use Jinja templates for dynamic parameters. Tag DAGs for better organization and filtering.

Advanced DAG Patterns

Dynamic Task Generation

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime

def generate_table_tasks(**context):
    """Dynamically generate tasks based on table list."""
    tables = ['orders', 'customers', 'products', 'inventory']
    return tables

with DAG(
    'dynamic_table_pipeline',
    start_date=datetime(2025, 1, 1),
    schedule_interval='@daily',
) as dag:

    get_tables = PythonOperator(
        task_id='get_tables',
        python_callable=generate_table_tasks,
    )

    def process_table(table_name, **context):
        """Process individual table."""
        print(f"Processing table: {table_name}")
        # Table-specific logic here

    # Dynamic task generation
    @dag.task_group
    def process_all_tables(tables):
        for table in tables:
            PythonOperator(
                task_id=f'process_{table}',
                python_callable=process_table,
                op_kwargs={'table_name': table},
            )

    tables = get_tables.output
    process_all_tables(tables)

Error Handling and Retries

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime

def validate_data(**context):
    """Validate data quality."""
    # Validation logic
    is_valid = True  # From actual validation
    return is_valid

def on_success_callback(context):
    """Handle successful task completion."""
    print(f"Task {context['task'].task_id} succeeded")

def on_failure_callback(context):
    """Handle task failure."""
    print(f"Task {context['task'].task_id} failed")
    # Send alert, update monitoring, etc.

with DAG(
    'error_handling_pipeline',
    start_date=datetime(2025, 1, 1),
    on_success_callback=on_success_callback,
    on_failure_callback=on_failure_callback,
) as dag:

    validate = PythonOperator(
        task_id='validate',
        python_callable=validate_data,
    )

    def branch_on_validation(is_valid, **context):
        if is_valid:
            return 'process_valid'
        return 'handle_invalid'

    branch = BranchPythonOperator(
        task_id='branch_on_validation',
        python_callable=branch_on_validation,
        op_args=[validate.output],
    )

    process_valid = PythonOperator(
        task_id='process_valid',
        python_callable=lambda: print("Processing valid data"),
    )

    handle_invalid = PythonOperator(
        task_id='handle_invalid',
        python_callable=lambda: print("Handling invalid data"),
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    validate >> branch >> [process_valid, handle_invalid]

GKE Worker Configuration

# Custom node pools for different workloads
gcloud composer environments update my-composer-env \
  --location=us-central1 \
  --update-worker-pool-config=type=CUSTOM,customize=cpu=4:memory=16:disk=50

# Pre-emptible workers for cost optimization
gcloud composer environments update my-composer-env \
  --location=us-central1 \
  --update-worker-pool-config=type=PREEMPTIBLE

Monitoring and Alerting

# Cloud Monitoring alert for DAG failures
from google.cloud import monitoring_v3

def create_alert_policy(project_id):
    """Create alert policy for DAG failures."""
    client = monitoring_v3.AlertPolicyServiceClient()

    alert_policy = monitoring_v3.AlertPolicy(
        display_name="Cloud Composer DAG Failure",
        conditions=[
            monitoring_v3.AlertPolicy.Condition(
                display_name="DAG Run Failure",
                condition_threshold=monitoring_v3.AlertPolicy.Condition.MetricThreshold(
                    filter='resource.type="cloud_composer_environment" AND '
                           'metric.type="composer.googleapis.com/environment/dag_runs/status" AND '
                           'metric.labels.state="failed"',
                    comparison=monitoring_v3.ComparisonType.COMPARISON_GT,
                    threshold_value=0,
                    duration="60s",
                    aggregations=[
                        monitoring_v3.Aggregation(
                            alignment_period="300s",
                            per_series_aligner=monitoring_v3.Aggregation.Aligner.ALIGN_RATE,
                        )
                    ],
                ),
            )
        ],
        notification_channels=[],
        alert_strategy=monitoring_v3.AlertPolicy.AlertStrategy(
            auto_close="1800s"
        ),
    )

    request = monitoring_v3.CreateAlertPolicyRequest(
        name=f"projects/{project_id}",
        alert_policy=alert_policy,
    )

    response = client.create_alert_policy(request=request)
    print(f"Created alert policy: {response.name}")

ℹ️

Cost Tip: Cloud Composer charges based on environment size and worker configuration. Use small/medium environments for development, large for production. Enable autoscaling and use pre-emptible workers for cost optimization. Schedule DAGs during off-peak hours when possible.

πŸ’¬

Common Interview Questions

Q1: What is the difference between Cloud Composer and Cloud Workflows?

Answer: Cloud Composer is for complex data pipeline orchestration with dependencies, branching, and error handling. Cloud Workflows is for simpler serverless workflow automation. Use Composer for ETL/ELT pipelines requiring Apache Airflow's rich features. Use Workflows for API orchestration and simple automation.

Q2: How do you handle dependencies in Cloud Composer?

Answer: Use the >> operator to define task dependencies. For complex dependencies, use trigger_rule settings. Use BranchPythonOperator for conditional execution. Use TaskGroups for logical grouping. Implement sensors for external dependencies (file existence, API availability).

Q3: What are sensors in Airflow?

Answer: Sensors are operators that wait for a condition to be met before executing. Common sensors: FileSensor (wait for file), ExternalTaskSensor (wait for another DAG), HttpSensor (wait for API endpoint), BigQueryTableSensor (wait for table). Use poke_interval and timeout to control polling behavior.

Q4: How do you optimize Cloud Composer costs?

Answer: 1) Use appropriate environment size (small/medium/large), 2) Enable autoscaling for worker nodes, 3) Use pre-emptible workers, 4) Optimize DAG parsing (avoid heavy imports at module level), 5) Implement SLAs to prevent stuck tasks, 6) Use TaskGroups to reduce task count.

Q5: How do you implement CI/CD for Cloud Composer?

Answer: Store DAGs in a Git repository, use Cloud Build for CI/CD, deploy to GCS bucket via Composer's DAG folder. Use Cloud Composer's environment versioning for rollback. Implement testing frameworks (pytest, unittest) for DAG validation. Use environment variables for configuration management.

Advertisement