
Cloud Composer: Environments, DAGs & GKE Workers


Cloud Composer Advanced Patterns

Master Cloud Composer, including environment configuration, DAG development, GKE workers, sensors, and production patterns.


Environment Sizes

Cloud Composer (Airflow) Architecture

Cloud Composer: Managed Apache Airflow on GCP

GKE cluster (managed Kubernetes):
• Scheduler: parses DAG files, creates task instances, determines execution order
• Web server: DAG visualization, task history and logs, manual triggers
• Workers (Celery): execute tasks; worker pods scale horizontally
• Metadata DB (PostgreSQL): task state, connection info

Environment config:
• Machine type (Composer 1): n1-standard-1 to n1-standard-8
• Node count (Composer 1): 3 to 6 GKE nodes
• Composer 2: small, medium, or large environment size plus per-component CPU and memory, with autoscaling workers
• Airflow version: 2.x (managed upgrades)
• Environment variables: environment-wide settings

DAGs and operators:
• DAGs: Python files in the dags/ folder of the environment's bucket
• Operators: BigQueryInsertJobOperator, GCSToBigQueryOperator, etc.
• Hooks: GoogleBaseHook, BigQueryHook, GCSHook
• Sensors: wait for a condition (file, time), e.g. GCSObjectExistenceSensor

GCP service integrations: BigQuery, Cloud Storage, Dataflow, Dataproc, Pub/Sub, Cloud Functions, Vertex AI

Version management and upgrades:
• Auto-upgrade: minor versions
• Manual upgrade: major versions
• Canary deploy: test new versions
• Rollback: revert if needed
Interview Tip: Cloud Composer runs Airflow on GKE: you manage DAGs, Google manages the cluster. Use the Google provider hooks (BigQueryHook, GCSHook, built on GoogleBaseHook) for GCP integrations. Store DAGs in the dags/ folder of the environment's Cloud Storage bucket. Use environment variables for environment-wide config. Prefer Cloud Composer over self-managed Airflow for production workloads.
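Environment variables set on a Composer environment reach the Airflow components as ordinary OS environment variables, and Airflow also resolves a Variable `key` from an environment variable named `AIRFLOW_VAR_<KEY>`. A minimal sketch of reading such settings at DAG-parse time with a fallback; the names `AIRFLOW_VAR_TARGET_DATASET` and `GCP_PROJECT_ID` and the helper `env_setting` are hypothetical, and plain `os.environ` stands in for `Variable.get` so the sketch runs without Airflow.

```python
import os
from typing import Optional


def env_setting(name: str, default: Optional[str] = None) -> str:
    """Return an environment-wide setting, failing loudly if it is missing.

    Inside a DAG, Variable.get("target_dataset") would resolve the same
    AIRFLOW_VAR_TARGET_DATASET value; os.environ is used here so the sketch
    runs without Airflow installed.
    """
    value = os.environ.get(name, default)
    if value is None:
        raise KeyError(f"environment variable {name!r} is not set and has no default")
    return value


# Hypothetical names: set them on the Composer environment, not in the DAG file.
TARGET_DATASET = env_setting("AIRFLOW_VAR_TARGET_DATASET", default="analytics")
PROJECT_ID = env_setting("GCP_PROJECT_ID", default="my-project")
```

Keeping a default makes local parsing work, while omitting it for settings that must exist turns a misconfigured environment into an immediate import error.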

DAG Patterns

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.sensors.external_task import ExternalTaskSensor

# Applied to every task in the DAG unless a task overrides them.
default_args = {
    'owner': 'data-engineering',
    'retries': 3,                              # 4 attempts in total
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=2),   # kill any single attempt after 2 hours
}

with DAG(
    dag_id='production_etl_pipeline',
    default_args=default_args,
    schedule='0 2 * * *',          # daily at 02:00 (UTC by default); schedule_interval before Airflow 2.4
    start_date=datetime(2025, 1, 1),
    catchup=False,                 # don't backfill runs missed since start_date
    max_active_runs=1,             # never overlap two runs of this pipeline
    tags=['production', 'etl'],
) as dag:

    # Wait for a task in another DAG. By default the sensor looks for the upstream
    # run with the same logical date; set execution_delta if the schedules differ.
    wait_for_upstream = ExternalTaskSensor(
        task_id='wait_for_upstream',
        external_dag_id='upstream_pipeline',
        external_task_id='load_complete',
        mode='reschedule',     # release the worker slot between checks
        poke_interval=300,     # check every 5 minutes
        timeout=3600,          # fail after 1 hour of waiting
    )

    # Transform data with a BigQuery query job
    transform = BigQueryInsertJobOperator(
        task_id='transform_data',
        configuration={
            'query': {
                'query': '''
                    CREATE OR REPLACE TABLE `project.analytics.summary` AS
                    SELECT * FROM `project.staging.raw_data`
                ''',
                'useLegacySql': False,
            }
        },
        location='us-central1',  # must match the BigQuery datasets' location
    )

    wait_for_upstream >> transform
```
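ExternalTaskSensor matches runs by logical date: it checks the external DAG's run whose logical date equals this run's logical date minus `execution_delta` (or whatever `execution_date_fn` returns). A minimal sketch of that rule, assuming a hypothetical upstream DAG that runs daily at 01:00 while the pipeline above runs at 02:00; `upstream_logical_date` is an illustrative helper, not an Airflow function.

```python
from datetime import datetime, timedelta


def upstream_logical_date(logical_date: datetime,
                          execution_delta: timedelta = timedelta(0)) -> datetime:
    """Logical date of the upstream run an ExternalTaskSensor would check.

    Models the sensor's rule: look for the external run whose logical date is
    this run's logical date minus execution_delta.
    """
    return logical_date - execution_delta


# This DAG runs daily at 02:00; the upstream DAG (hypothetically) runs at 01:00.
this_run = datetime(2025, 1, 10, 2, 0)
print(upstream_logical_date(this_run, timedelta(hours=1)))  # 2025-01-10 01:00:00
print(upstream_logical_date(this_run))                      # 2025-01-10 02:00:00
```

With those schedules the sensor above would need `execution_delta=timedelta(hours=1)`; without it, it waits for an upstream run at 02:00 that never exists and fails when its one-hour timeout expires.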


Best Practice: Use ExternalTaskSensor for cross-DAG dependencies. Implement SLAs and timeouts. Use TaskGroups for organization. Tag DAGs for filtering. Use Jinja templates for dynamic parameters. Implement proper error handling with retries.
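Retries and timeouts together bound how long a failing task can hold up a run, which is the budget SLAs should be set against. A worked sketch using the `default_args` from the DAG above, assuming a fixed `retry_delay` (no exponential backoff), every attempt running until `execution_timeout` kills it, and queueing time ignored; `worst_case_task_duration` is a hypothetical helper.

```python
from datetime import timedelta


def worst_case_task_duration(retries: int,
                             retry_delay: timedelta,
                             execution_timeout: timedelta) -> timedelta:
    """Upper bound on wall time before a task is finally marked failed.

    Assumes a fixed retry_delay and that every attempt hits execution_timeout.
    """
    attempts = retries + 1  # the first try plus each retry
    return attempts * execution_timeout + retries * retry_delay


# retries=3, retry_delay=5 min, execution_timeout=2 h
print(worst_case_task_duration(3, timedelta(minutes=5), timedelta(hours=2)))  # 8:15:00
```

One such task can therefore delay a daily run by more than eight hours; with `max_active_runs=1`, a chain of several such tasks could push a run past the next scheduled start.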


Common Interview Questions

Q1: What is the difference between Cloud Composer 1 and 2?

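Answer: Both versions run Airflow on GKE. Composer 1 uses a standard GKE cluster whose machine type and node count you size yourself, without worker autoscaling. Composer 2 runs on GKE Autopilot, autoscales Airflow workers with load, lets you set CPU and memory separately for the scheduler, web server, and workers, and supports only Airflow 2. Use Composer 2 or later for new deployments.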

Q2: How do you handle cross-DAG dependencies?

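Answer: Use ExternalTaskSensor in the downstream DAG to wait for a task in another DAG. Use ExternalTaskMarker in the upstream DAG so that clearing an upstream task also clears the dependent tasks in the downstream DAG. For data-driven dependencies, schedule the downstream DAG on Airflow Datasets (Airflow 2.4+), which triggers it when upstream tasks update those datasets. Set sensor timeouts so a missing upstream run fails visibly instead of waiting indefinitely.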
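In Airflow itself, a producer task declares `outlets=[Dataset(uri)]` and the consumer DAG sets `schedule=[Dataset(uri), ...]` (with `Dataset` from `airflow.datasets`). The toy model below, a hypothetical class rather than Airflow code, shows the triggering rule for a list of datasets: the consumer is queued once every listed dataset has been updated at least once since its last run. The URIs are made up.

```python
from typing import Iterable, Set


class DatasetTriggeredDag:
    """Toy model of dataset-aware scheduling (not Airflow's implementation).

    A consumer DAG scheduled on a list of datasets is queued once every
    dataset in the list has received at least one update since its last run.
    """

    def __init__(self, dag_id: str, datasets: Iterable[str]) -> None:
        self.dag_id = dag_id
        self.datasets: Set[str] = set(datasets)
        self._updated_since_last_run: Set[str] = set()
        self.runs = 0

    def on_dataset_updated(self, uri: str) -> bool:
        """Record a producer's update; return True if it queues a run."""
        if uri not in self.datasets:
            return False  # this DAG does not depend on that dataset
        self._updated_since_last_run.add(uri)
        if self._updated_since_last_run == self.datasets:
            self.runs += 1
            self._updated_since_last_run.clear()  # start waiting again
            return True
        return False


# Hypothetical dataset URIs written by two upstream DAGs.
consumer = DatasetTriggeredDag("reporting", ["gs://bucket/orders", "gs://bucket/customers"])
print(consumer.on_dataset_updated("gs://bucket/orders"))     # False: still waiting on customers
print(consumer.on_dataset_updated("gs://bucket/orders"))     # False: repeat updates count once
print(consumer.on_dataset_updated("gs://bucket/customers"))  # True: both updated, run queued
```

Unlike ExternalTaskSensor, nothing polls here: the dependency is resolved when the producer finishes, so upstream and downstream schedules do not need to line up.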

Q3: What are sensors in Airflow?

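Answer: Sensors are operators that wait for a condition to become true before succeeding, so downstream tasks start only once it holds. Common sensors: FileSensor, ExternalTaskSensor, HttpSensor, GCSObjectExistenceSensor, BigQueryTableExistenceSensor. poke_interval sets how often the condition is checked and timeout how long to wait before failing; mode='reschedule' frees the worker slot between checks, which matters for long waits.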
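A simplified model of a sensor in the default `poke` mode, which holds its worker slot for the whole wait: check the condition, fail once the elapsed time exceeds `timeout`, otherwise sleep `poke_interval` and try again. `poke_until` is a hypothetical helper, not Airflow's implementation; in `reschedule` mode Airflow would instead end the task between checks and reschedule it.

```python
import time
from typing import Callable


def poke_until(condition: Callable[[], bool], poke_interval: float, timeout: float) -> int:
    """Call condition() until it returns True; return how many checks it took.

    Raises TimeoutError once more than `timeout` seconds have elapsed.
    """
    started = time.monotonic()
    pokes = 1
    while not condition():
        if time.monotonic() - started > timeout:
            raise TimeoutError(f"condition not met within {timeout}s")
        time.sleep(poke_interval)  # a poke-mode sensor keeps its worker slot here
        pokes += 1
    return pokes


# Hypothetical condition that becomes true on the third check.
calls = {"n": 0}


def file_has_landed() -> bool:
    calls["n"] += 1
    return calls["n"] >= 3


print(poke_until(file_has_landed, poke_interval=0.01, timeout=1.0))  # 3
```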

Q4: How do you optimize Cloud Composer costs?

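Answer: 1) Choose an environment size and worker limits that match the actual load, 2) enable worker autoscaling with sensible minimum and maximum counts, 3) keep DAG parsing cheap (avoid heavy imports and API calls at module top level), 4) set timeouts so stuck tasks don't occupy workers indefinitely, and run sensors in reschedule mode or as deferrable operators, 5) consolidate very small tasks, since each task instance carries scheduling overhead (TaskGroups only group tasks in the UI and do not reduce task count), 6) stagger schedules so load peaks don't force extra workers.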
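Because the scheduler re-parses DAG files continually, top-level work in a DAG file is paid on every parse, not once per run. A rough local check is to time one import of the file; the hypothetical helper below uses only the standard library. It executes the file, so run it only on trusted code, with Airflow installed if the DAGs import it, and in a fresh process for each file, because modules already cached in `sys.modules` make later measurements look cheaper.

```python
import importlib.util
import time
from pathlib import Path


def import_seconds(dag_file: str) -> float:
    """Time a single import of dag_file as a rough proxy for its parse cost."""
    path = Path(dag_file)
    spec = importlib.util.spec_from_file_location(f"_parse_check_{path.stem}", str(path))
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load {path}")
    module = importlib.util.module_from_spec(spec)
    started = time.perf_counter()
    spec.loader.exec_module(module)  # runs all top-level code, as a parse would
    return time.perf_counter() - started
```

A file that takes seconds to import usually has a heavy library import or a network call at module level; moving it inside the task callable defers the cost to execution time.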

Q5: How do you implement CI/CD for Cloud Composer?

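Answer: 1) Store DAGs in Git, 2) run CI/CD with Cloud Build, 3) deploy by syncing DAG files to the dags/ folder of the environment's Cloud Storage bucket (for example with gcloud composer environments storage dags import), 4) test DAGs with pytest before deploying, 5) use environment variables for configuration, 6) implement rollback procedures, such as redeploying the previous Git revision.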
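The usual pytest check loads the folder with Airflow's `DagBag` and asserts there are no import errors. As a lighter first gate that runs without Airflow installed, here is a hypothetical standard-library sketch: it parses each file with `ast` (nothing is executed), reports syntax errors, and flags string-literal `dag_id` values defined in more than one file. It misses DAGs built with the `@dag` decorator or non-literal IDs, so it complements rather than replaces the `DagBag` test.

```python
import ast
from pathlib import Path
from typing import Dict, List, Optional


def _literal_dag_id(call: ast.Call) -> Optional[str]:
    """Return the dag_id of a DAG(...) call if it is a string literal."""
    for kw in call.keywords:
        if kw.arg == "dag_id" and isinstance(kw.value, ast.Constant):
            return kw.value.value if isinstance(kw.value.value, str) else None
    if call.args and isinstance(call.args[0], ast.Constant):
        return call.args[0].value if isinstance(call.args[0].value, str) else None
    return None


def check_dag_folder(dags_dir: str) -> List[str]:
    """List syntax errors and duplicate literal dag_ids under dags_dir."""
    problems: List[str] = []
    files_by_id: Dict[str, List[str]] = {}
    for path in sorted(Path(dags_dir).rglob("*.py")):
        try:
            tree = ast.parse(path.read_text(), filename=str(path))
        except SyntaxError as exc:
            problems.append(f"{path.name}: syntax error on line {exc.lineno}")
            continue
        for node in ast.walk(tree):
            # Matches calls written as DAG(...), e.g. `with DAG(dag_id="x") as dag:`
            if isinstance(node, ast.Call) and getattr(node.func, "id", None) == "DAG":
                dag_id = _literal_dag_id(node)
                if dag_id is not None:
                    files_by_id.setdefault(dag_id, []).append(path.name)
    for dag_id, files in files_by_id.items():
        if len(files) > 1:
            problems.append(f"dag_id {dag_id!r} defined in {', '.join(files)}")
    return problems
```

In a Cloud Build pipeline, a step that fails when `check_dag_folder("dags")` returns anything could run before the step that syncs the folder to the bucket.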
