
Airflow DAG Design Patterns & Error Handling



Airbnb & Spotify Interview


Building production-ready data pipelines with Airflow

Interview Question

"Design an Airflow DAG that: (1) runs daily at 2 AM, (2) extracts data from 3 sources (API, database, S3), (3) validates data quality, (4) transforms and loads into a warehouse, (5) sends alerts on failure, (6) handles late-arriving data. Include error handling, retry logic, and testing strategy."

Difficulty: Medium-Hard | Frequently asked at Airbnb, Spotify, Lyft, DoorDash


Theoretical Foundation

Airflow Architecture

Architecture Diagram
┌─────────────────────────────────────────────────────────────┐
│                  Airflow Architecture                       │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐      │
│  │   Web Server│    │   Scheduler │    │   Metadata  │      │
│  │   (UI)      │    │             │    │   Database  │      │
│  └──────┬──────┘    └──────┬──────┘    └──────┬──────┘      │
│         │                  │                  │             │
│         └──────────────────┼──────────────────┘             │
│                            │                                │
│                   ┌────────┴────────┐                       │
│                   │   Executor      │                       │
│                   │   (Celery/K8s)  │                       │
│                   └────────┬────────┘                       │
│                            │                                │
│         ┌──────────────────┼──────────────────┐             │
│         │                  │                  │             │
│  ┌──────┴──────┐    ┌──────┴──────┐    ┌──────┴──────┐      │
│  │  Worker 1   │    │  Worker 2   │    │  Worker 3   │      │
│  │  (Task)     │    │  (Task)     │    │  (Task)     │      │
│  └─────────────┘    └─────────────┘    └─────────────┘      │
│                                                             │
└─────────────────────────────────────────────────────────────┘

DAG (Directed Acyclic Graph)

A DAG defines the workflow structure: its tasks and the dependencies between them.

[Figure: DAG daily_etl_pipeline, scheduled @daily, with tasks extract_api, extract_db, extract_s3, validate_api, validate_db, validate_s3, transform, and load]

Key properties:

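  • Directed: every edge points one way, from an upstream task to a downstream task
  • Acyclic: no circular dependencies, so a task can never depend (directly or indirectly) on itself
  • Graph: tasks are the nodes and dependencies are the edges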
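As a quick illustration of the "acyclic" property, here is a minimal sketch using Python's standard-library `graphlib` rather than Airflow itself (Airflow runs its own cycle check when it parses a DAG file). The task names come from the figure above; the edges are an assumption that mirrors the production example later in this section.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of upstream tasks that must finish first.
# Edges are illustrative, mirroring the daily_etl_pipeline example.
pipeline = {
    "validate_api": {"extract_api"},
    "validate_db": {"extract_db"},
    "validate_s3": {"extract_s3"},
    "transform": {"validate_api", "validate_db", "validate_s3"},
    "load": {"transform"},
}


def execution_order(graph):
    """Return one valid run order; raises CycleError if the graph has a cycle."""
    return list(TopologicalSorter(graph).static_order())


order = execution_order(pipeline)
print(order)  # the three extracts come first and load comes last

# A back edge from load to extract_api closes a loop, so no valid order exists.
cyclic = dict(pipeline, extract_api={"load"})
try:
    execution_order(cyclic)
except CycleError as err:
    print("cycle detected:", err.args[1])
```

Tasks with no path between them (the three extracts, the three validations) can run in any relative order, which is exactly the freedom the scheduler uses to run them in parallel.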

Task Dependencies

# Dependency patterns
task_a >> task_b  # A must complete before B
task_a >> [task_b, task_c]  # A before B and C (parallel)
[task_a, task_b] >> task_c  # A and B before C
task_a >> task_b >> task_c  # Sequential chain
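The `>>` syntax is ordinary Python operator overloading. The toy `Task` class below is not Airflow code; it is a minimal sketch of the same idea (Airflow's operators implement `__rshift__` and `__rrshift__` in a similar spirit), showing why `[a, b] >> c` works even though a list has no `>>` of its own.

```python
class Task:
    """Toy stand-in for an operator: it only records downstream task ids."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = set()

    def __rshift__(self, other):
        # Handles `task >> other` and `task >> [t1, t2]` (fan-out).
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            self.downstream.add(target.task_id)
        # Returning the right-hand side makes `a >> b >> c` chain left to right.
        return other

    def __rrshift__(self, other):
        # Handles `[t1, t2] >> task` (fan-in): list has no __rshift__,
        # so Python falls back to this reflected method on the right operand.
        for upstream in other:
            upstream >> self
        return self


extract = Task("extract")
validate_a, validate_b = Task("validate_a"), Task("validate_b")
load = Task("load")

extract >> [validate_a, validate_b]   # A before B and C
[validate_a, validate_b] >> load      # A and B before C
print(sorted(extract.downstream))     # ['validate_a', 'validate_b']
```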

Operators

Operators define what work to do:

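Operator               | Use Case
---------------------- | ------------------------------------------
PythonOperator         | Run a Python function
BashOperator           | Run a shell command
SparkSubmitOperator    | Submit a Spark job
S3ToRedshiftOperator   | Copy data from S3 into Redshift
PostgresOperator       | Run PostgreSQL queries
HttpSensor             | Wait for an API (HTTP endpoint) to respond
S3KeySensor            | Wait for a file (key) to appear in S3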

Executors

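Executor             | Use Case
-------------------- | ------------------------------------------------
SequentialExecutor   | One task at a time on a single machine (testing)
LocalExecutor        | Parallel tasks as processes on a single machine
CeleryExecutor       | Distributed workers fed by a message broker
KubernetesExecutor   | One pod per task, scaled dynamically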

Error Handling Patterns

1. Retries with Exponential Backoff

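from datetime import timedelta
from airflow.operators.python import PythonOperator

task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_func,          # your extraction function
    retries=3,                             # up to 3 retries after the first failure
    retry_delay=timedelta(minutes=5),      # base wait before the first retry
    retry_exponential_backoff=True,        # wait roughly doubles on each retry
    max_retry_delay=timedelta(hours=1),    # cap on any single wait
)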
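To see what these settings mean in wall-clock terms, here is a simplified model (an approximation, not Airflow's code): the n-th retry waits `retry_delay * 2**(n-1)`, capped at `max_retry_delay`. Recent Airflow 2.x versions also add jitter to exponential backoff, so real waits can be somewhat longer than this model while still respecting the cap.

```python
from datetime import timedelta


def retry_delays(retries, retry_delay, max_retry_delay):
    """Approximate wait before each retry under exponential backoff (no jitter)."""
    return [
        min(retry_delay * 2 ** (attempt - 1), max_retry_delay)
        for attempt in range(1, retries + 1)
    ]


schedule = retry_delays(3, timedelta(minutes=5), timedelta(hours=1))
print([d.total_seconds() / 60 for d in schedule])  # [5.0, 10.0, 20.0]

# With more retries the cap kicks in: 5, 10, 20, 40, then 60, 60 minutes.
longer = retry_delays(6, timedelta(minutes=5), timedelta(hours=1))
```

With `retries=3` the task spends about 35 minutes waiting between attempts (plus its own run time), which is worth comparing against the schedule interval and any downstream deadlines.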

2. Callbacks

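# Each callback receives the task's context dict: def failure_alert(context): ...
task = PythonOperator(
    task_id='process_data',
    python_callable=process_func,
    on_success_callback=success_alert,  # after the task succeeds
    on_failure_callback=failure_alert,  # after the final failed try
    on_retry_callback=retry_alert,      # each time a failed try is retried
)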
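The three callbacks are not defined in the snippet. A minimal sketch of what they might look like follows; Airflow calls each one with the task's context dictionary. The logging-only bodies are placeholders (swap in Slack, PagerDuty, etc.), and the fake context at the end is a hypothetical stand-in for trying the functions outside Airflow.

```python
import logging
from types import SimpleNamespace

log = logging.getLogger("etl.alerts")


def describe(context):
    """One-line summary built from the context dict passed to callbacks."""
    ti = context["task_instance"]
    return f"{ti.dag_id}.{ti.task_id} (try {ti.try_number})"


def success_alert(context):
    log.info("succeeded: %s", describe(context))


def failure_alert(context):
    # Failure callbacks also receive the raised exception under "exception".
    log.error("failed: %s: %r", describe(context), context.get("exception"))


def retry_alert(context):
    log.warning("retrying: %s", describe(context))


# Hypothetical context for a local dry run; inside Airflow the real TaskInstance is passed.
fake_context = {
    "task_instance": SimpleNamespace(dag_id="daily_etl", task_id="process_data", try_number=2),
    "exception": ValueError("bad row"),
}
print(describe(fake_context))  # daily_etl.process_data (try 2)
failure_alert(fake_context)
```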

3. Trigger Rules

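from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

# Default: all_success
task_b >> task_c  # C runs only if B succeeds

# Run only if every upstream task failed (e.g. a fallback or rollback path)
fallback = EmptyOperator(task_id='fallback', trigger_rule=TriggerRule.ALL_FAILED)
task_b >> fallback

# Run as soon as any upstream task succeeds
any_ok = EmptyOperator(task_id='any_ok', trigger_rule=TriggerRule.ONE_SUCCESS)

# Run if no upstream task failed (each succeeded or was skipped)
join = EmptyOperator(task_id='join', trigger_rule=TriggerRule.NONE_FAILED)

# Always run once all upstream tasks finish, even if some failed (e.g. cleanup)
cleanup = EmptyOperator(task_id='cleanup', trigger_rule=TriggerRule.ALL_DONE)
[task_a, task_b] >> cleanup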
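Each trigger rule boils down to a predicate over the upstream task states. The sketch below is a simplified model, not Airflow's scheduler code: it covers `all_success`, `all_failed`, `one_success`, `none_failed` and `all_done`, assumes every upstream task has already finished, and ignores details such as `one_success` firing before the other upstreams are done.

```python
# Upstream states considered by this simplified model (values match Airflow's names).
SUCCESS, FAILED, SKIPPED, UPSTREAM_FAILED = "success", "failed", "skipped", "upstream_failed"
FAILED_STATES = {FAILED, UPSTREAM_FAILED}


def should_run(trigger_rule, upstream_states):
    """Return True if a task with this rule would run, given finished upstreams."""
    states = list(upstream_states)
    if trigger_rule == "all_success":
        return all(s == SUCCESS for s in states)
    if trigger_rule == "all_failed":
        return all(s in FAILED_STATES for s in states)
    if trigger_rule == "one_success":
        return any(s == SUCCESS for s in states)
    if trigger_rule == "none_failed":
        # Succeeded or skipped are both fine; any failure blocks the task.
        return not any(s in FAILED_STATES for s in states)
    if trigger_rule == "all_done":
        return True  # every upstream finished, whatever the outcome
    raise ValueError(f"unsupported trigger rule: {trigger_rule}")


print(should_run("all_success", [SUCCESS, SKIPPED]))  # False
print(should_run("none_failed", [SUCCESS, SKIPPED]))  # True
```

The `all_success` versus `none_failed` difference on a skipped upstream is the usual reason a task after a branch never runs.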

4. Branching

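from airflow.operators.python import BranchPythonOperator, PythonOperator

def choose_branch(**context):
    """Return the task_id to run; the other direct downstream tasks are skipped"""
    if context['dag_run'].conf.get('full_refresh'):
        return 'full_load'
    return 'incremental_load'

branch = BranchPythonOperator(
    task_id='branch',
    python_callable=choose_branch,
)

full_load = PythonOperator(task_id='full_load', ...)
incremental = PythonOperator(task_id='incremental_load', ...)  # must match the returned id

# A task joining both paths needs trigger_rule='none_failed_min_one_success',
# otherwise the skipped branch causes it to be skipped too
branch >> [full_load, incremental]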
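Because the branch callable is plain Python, its logic can be checked without a scheduler. The sketch below re-declares `choose_branch` with a None-safe `conf` lookup and feeds it hypothetical `dag_run` stand-ins; a manual run would carry the flag with something like `airflow dags trigger --conf '{"full_refresh": true}' <dag_id>`.

```python
from types import SimpleNamespace


def choose_branch(**context):
    """Return the task_id to follow; conf may be empty or None for scheduled runs."""
    conf = context["dag_run"].conf or {}
    if conf.get("full_refresh"):
        return "full_load"
    return "incremental_load"


# Stand-ins for the dag_run object Airflow would put in the context.
manual_full = SimpleNamespace(conf={"full_refresh": True})
scheduled = SimpleNamespace(conf={})
no_conf = SimpleNamespace(conf=None)

print(choose_branch(dag_run=manual_full))  # full_load
print(choose_branch(dag_run=scheduled))    # incremental_load
```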

TaskFlow API (Airflow 2.0+)

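import pendulum
from airflow.decorators import dag, task

@dag(
    schedule_interval='@daily',
    start_date=pendulum.datetime(2024, 1, 1, tz='UTC'),  # static; days_ago() is deprecated
    catchup=False,
    tags=['etl', 'production'],
)
def etl_pipeline():
    # call_api, query_database, combine and write_to_warehouse stand in for your own code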
    
    @task
    def extract_api():
        """Extract data from API"""
        data = call_api()
        return data  # pushed to XCom automatically; keep XCom payloads small
    
    @task
    def extract_db():
        """Extract data from database"""
        data = query_database()
        return data
    
    @task
    def validate(api_data, db_data):
        """Validate data quality"""
        assert len(api_data) > 0
        assert len(db_data) > 0
        return {'valid': True}
    
    @task
    def transform(api_data, db_data, validation):
        """Transform data"""
        combined = combine(api_data, db_data)
        return combined
    
    @task
    def load(transformed_data):
        """Load into warehouse"""
        write_to_warehouse(transformed_data)
    
    # Define dependencies
    api_data = extract_api()
    db_data = extract_db()
    validation = validate(api_data, db_data)
    transformed = transform(api_data, db_data, validation)
    load(transformed)

etl_pipeline()

Dynamic DAG Generation

import importlib
import json

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator

# Load configuration from external source
def load_config():
    """Load DAG configuration from JSON file"""
    with open('/opt/airflow/config/dags.json') as f:
        return json.load(f)

def resolve_callable(path):
    """Import 'package.module.func' instead of eval()-ing strings from config"""
    module_path, func_name = path.rsplit('.', 1)
    return getattr(importlib.import_module(module_path), func_name)

# Generate DAGs from configuration
for dag_config in load_config():

    dag = DAG(
        dag_id=dag_config['name'],
        schedule_interval=dag_config['schedule'],
        start_date=pendulum.datetime(2024, 1, 1, tz='UTC'),  # fixed, not dynamic
        catchup=False,
        tags=dag_config.get('tags', []),
    )

    tasks = {}
    for task_config in dag_config['tasks']:
        tasks[task_config['name']] = PythonOperator(
            task_id=task_config['name'],
            python_callable=resolve_callable(task_config['callable']),
            dag=dag,
        )

    # Set dependencies: each listed dependency runs *before* the task
    for task_config in dag_config['tasks']:
        for dep in task_config.get('dependencies', []):
            tasks[dep] >> tasks[task_config['name']]

    # Module-level globals are how the DagBag discovers each generated DAG
    globals()[dag_config['name']] = dag
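Config-driven generation fails in confusing ways when the JSON is wrong: a typo in a dependency name surfaces as a `KeyError` during DAG parsing. Below is a minimal sketch of a pre-flight check. The config shape (a list of DAGs, each with `name`, `schedule` and `tasks`, where tasks carry `name`, `callable` and optional `dependencies`) is inferred from the loop above, and `validate_dag_config` is a hypothetical helper you could call on each entry before building it.

```python
def validate_dag_config(dag_config):
    """Raise ValueError describing the first problem found in one DAG entry."""
    for key in ("name", "schedule", "tasks"):
        if key not in dag_config:
            raise ValueError(f"DAG entry missing {key!r}")

    names = [t["name"] for t in dag_config["tasks"]]
    duplicates = {n for n in names if names.count(n) > 1}
    if duplicates:
        raise ValueError(f"{dag_config['name']}: duplicate task names {sorted(duplicates)}")

    known = set(names)
    for task in dag_config["tasks"]:
        unknown = set(task.get("dependencies", [])) - known
        if unknown:
            raise ValueError(
                f"{dag_config['name']}.{task['name']} depends on unknown {sorted(unknown)}"
            )


# Hypothetical config entry in the shape the generator loop expects.
good = {
    "name": "orders_etl",
    "schedule": "@daily",
    "tasks": [
        {"name": "extract", "callable": "etl.orders.extract"},
        {"name": "load", "callable": "etl.orders.load", "dependencies": ["extract"]},
    ],
}
validate_dag_config(good)  # passes silently
```

Running the same check in CI against the real `dags.json` catches config mistakes before they reach the scheduler.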

Testing Strategy

# ============================================================
# UNIT TESTING DAGs
# ============================================================

from datetime import timedelta

import pytest
from airflow.models import DagBag

@pytest.fixture(scope='session')
def dag_bag():
    """Load all DAGs once per test session (parsing is slow)"""
    return DagBag(dag_folder='/opt/airflow/dags', include_examples=False)

def test_dag_import(dag_bag):
    """Test that all DAGs can be imported"""
    assert len(dag_bag.import_errors) == 0, \
        f"DAG import errors: {dag_bag.import_errors}"

def test_dag_structure(dag_bag):
    """Test DAG structure"""
    dag = dag_bag.get_dag('daily_etl_pipeline')
    assert dag is not None

    # Check schedule (2 AM daily)
    assert dag.schedule_interval == '0 2 * * *'

    # Check tasks exist
    expected = {
        'extract_api', 'extract_db', 'extract_s3',
        'validate_api', 'validate_db', 'validate_s3',
        'transform', 'load',
    }
    assert expected <= set(dag.task_ids)

def test_task_dependencies(dag_bag):
    """Test task dependencies"""
    dag = dag_bag.get_dag('daily_etl_pipeline')

    # validate_api must run after extract_api
    extract_api = dag.get_task('extract_api')
    assert 'validate_api' in extract_api.downstream_task_ids

def test_retry_config(dag_bag):
    """Test retry configuration (matches default_args in the production DAG)"""
    dag = dag_bag.get_dag('daily_etl_pipeline')

    for task in dag.tasks:
        if task.task_id.startswith('extract'):
            assert task.retries == 2
            assert task.retry_delay == timedelta(minutes=5)
Production DAG Example

from datetime import timedelta
import logging

from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.utils.dates import days_ago

# call_api, query_database and transform_data are placeholders for your own code

logger = logging.getLogger(__name__)

# ============================================================
# DAG CONFIGURATION
# ============================================================

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'email': ['alerts@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(hours=1),
    'execution_timeout': timedelta(hours=2),
    'on_failure_callback': None,  # no per-task alerts; the DAG-level alert_failure fires once per failed run
    'on_retry_callback': None,
}

# ============================================================
# CALLBACK FUNCTIONS
# ============================================================

def alert_failure(context):
    """Send alert on task failure"""
    logger.error(f"Task {context['task_instance'].task_id} failed")
    
    # Send to Slack/PagerDuty (webhook URL stored as an Airflow Variable)
    import requests
    requests.post(
        Variable.get('slack_webhook_url'),
        json={
            'text': f"🚨 ETL Pipeline Failed\n"
                    f"Task: {context['task_instance'].task_id}\n"
                    f"DAG: {context['dag'].dag_id}\n"
                    f"Execution: {context['execution_date']}\n"
                    f"Log: {context['task_instance'].log_url}"
        },
        timeout=10,  # never let an alert call hang the callback
    )

def alert_success(context):
    """Send alert on DAG success"""
    logger.info(f"DAG {context['dag'].dag_id} completed successfully")

# ============================================================
# DATA QUALITY CHECKS
# ============================================================

def validate_api_data(**context):
    """Validate API data quality"""
    ti = context['task_instance']
    data = ti.xcom_pull(task_ids='extract_api')
    
    # Check row count (None means the extract pushed nothing to XCom)
    if not data:
        raise ValueError("API returned no data")

    # Check required fields (only the first row is sampled here)
    required_fields = ['id', 'name', 'timestamp']
    for field in required_fields:
        if field not in data[0]:
            raise ValueError(f"Missing required field: {field}")

    # Check data freshness (assumes 'timestamp' values are timezone-aware datetimes)
    latest_timestamp = max(row['timestamp'] for row in data)
    if latest_timestamp < context['execution_date'] - timedelta(days=1):
        raise ValueError("API data is stale")
    
    return {'valid': True, 'row_count': len(data)}

def validate_db_data(**context):
    """Validate database data quality"""
    ti = context['task_instance']
    data = ti.xcom_pull(task_ids='extract_db')
    
    # Check for nulls in critical columns
    for row in data:
        if row.get('user_id') is None:
            raise ValueError(f"Null user_id found: {row}")
    
    return {'valid': True, 'row_count': len(data)}

# ============================================================
# DAG DEFINITION
# ============================================================

with DAG(
    dag_id='daily_etl_pipeline',
    default_args=default_args,
    description='Daily ETL pipeline with data quality checks',
    schedule_interval='0 2 * * *',  # 2 AM daily
    start_date=days_ago(1),
    catchup=False,
    max_active_runs=1,
    tags=['etl', 'production', 'daily'],
    on_success_callback=alert_success,
    on_failure_callback=alert_failure,
) as dag:

    # ============================================================
    # EXTRACT TASKS
    # ============================================================
    
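    extract_api = PythonOperator(
        task_id='extract_api',
        # Variables are read inside the callable, i.e. at run time, not at parse time
        python_callable=lambda: call_api(
            url=Variable.get('api_url'),
            api_key=Variable.get('api_key')
        ),
    )

    extract_db = PythonOperator(
        task_id='extract_db',
        python_callable=query_database,
        # op_kwargs is templated, so each run gets its own watermark;
        # 'query_params' is a hypothetical argument of the placeholder helper
        op_kwargs={
            'connection_id': 'postgres_source',
            'query': "SELECT * FROM users WHERE updated_at > %s",
            'query_params': ['{{ data_interval_start }}'],
        },
    )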
    
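    # Note: /tmp lives on whichever worker runs this task; with Celery or
    # Kubernetes executors, downstream tasks may not see the file, so in practice
    # load it straight into a staging table such as stg.s3_data
    extract_s3 = BashOperator(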
        task_id='extract_s3',
        bash_command="""
            aws s3 cp s3://source-bucket/data/{{ ds }}/data.csv \
                /tmp/extracted_data.csv
        """,
    )
    
    # ============================================================
    # VALIDATE TASKS
    # ============================================================
    
    # Airflow 2 passes the context to **kwargs automatically (no provide_context)
    validate_api = PythonOperator(
        task_id='validate_api',
        python_callable=validate_api_data,
    )

    validate_db = PythonOperator(
        task_id='validate_db',
        python_callable=validate_db_data,
    )
    
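    validate_s3 = PostgresOperator(
        task_id='validate_s3',
        postgres_conn_id='postgres_warehouse',
        # A bare COUNT(*) succeeds even when it returns 0. Dividing by it raises
        # "division by zero" and fails the task if no rows landed for this date.
        # (SQLCheckOperator from the common.sql provider is a more explicit option.)
        sql="""
            SELECT 1 / COUNT(*) FROM stg.s3_data
            WHERE date = '{{ ds }}'
        """,
    )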
    
    # ============================================================
    # TRANSFORM TASKS
    # ============================================================
    
    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,  # assumed to write stg.users_staging for the load step
        execution_timeout=timedelta(hours=1),
    )
    
    # ============================================================
    # LOAD TASKS
    # ============================================================
    
    load = PostgresOperator(
        task_id='load',
        postgres_conn_id='postgres_warehouse',
        sql="""
            INSERT INTO warehouse.users (id, name, email, updated_at)
            SELECT id, name, email, updated_at
            FROM stg.users_staging
            ON CONFLICT (id) DO UPDATE
            SET name = EXCLUDED.name,
                email = EXCLUDED.email,
                updated_at = EXCLUDED.updated_at;
        """,
    )
    
    # ============================================================
    # DEPENDENCIES
    # ============================================================
    
    # Extract in parallel; each validation waits only for its own source
    extract_api >> validate_api
    extract_db >> validate_db
    extract_s3 >> validate_s3

    # Validate then transform
    [validate_api, validate_db, validate_s3] >> transform

    # Transform then load
    transform >> load
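The `load` task is safe to retry because `INSERT ... ON CONFLICT ... DO UPDATE` is an idempotent upsert: running it twice with the same staging data leaves the warehouse unchanged. The sketch below demonstrates that property with an in-memory SQLite database standing in for Postgres (SQLite 3.24+ supports the same `ON CONFLICT` clause); the table and rows are made up for illustration.

```python
import sqlite3

UPSERT = """
    INSERT INTO users (id, name, email) VALUES (?, ?, ?)
    ON CONFLICT (id) DO UPDATE
    SET name = excluded.name,
        email = excluded.email
"""


def load_batch(conn, rows):
    """Upsert rows in one transaction; replaying the same batch changes nothing."""
    with conn:  # commits on success, rolls back if any row fails
        conn.executemany(UPSERT, rows)


def row_count(conn):
    return conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)")

batch = [(1, "Ada", "ada@example.com"), (2, "Lin", "lin@example.com")]
load_batch(conn, batch)
load_batch(conn, batch)  # simulated retry of the same run
print(row_count(conn))  # 2

load_batch(conn, [(1, "Ada L.", "ada@example.com")])  # a later correction updates in place
print(row_count(conn))  # 2
```

A plain `INSERT` in the same position would duplicate rows on every retry or backfill, which is why the retry settings earlier in this section are only safe on top of idempotent writes.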

Advanced Patterns

TaskGroups (replacing SubDAGs)

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

# SubDAGs are deprecated in Airflow 2; TaskGroups give the same visual grouping
# without a separate DAG run. Call these helpers inside a DAG context.

def create_extraction_tasks():
    """Create extraction task group"""
    with TaskGroup("extraction") as extraction:
        extract_api = PythonOperator(task_id='extract_api', ...)
        extract_db = PythonOperator(task_id='extract_db', ...)
        extract_s3 = PythonOperator(task_id='extract_s3', ...)
    # Return the group itself: groups support >>, plain lists of tasks do not
    return extraction

def create_validation_tasks():
    """Create validation task group"""
    with TaskGroup("validation") as validation:
        validate_api = PythonOperator(task_id='validate_api', ...)
        validate_db = PythonOperator(task_id='validate_db', ...)
        validate_s3 = PythonOperator(task_id='validate_s3', ...)
    return validation

# Main DAG
with DAG('etl_pipeline', ...) as dag:
    extraction = create_extraction_tasks()
    validation = create_validation_tasks()
    transform = PythonOperator(task_id='transform', ...)
    load = PythonOperator(task_id='load', ...)

    # Task ids get the group prefix, e.g. 'extraction.extract_api'
    extraction >> validation >> transform >> load

Sensor Pattern

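from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.http.sensors.http import HttpSensor

# Wait for data availability (also absorbs late-arriving files)
wait_for_s3 = S3KeySensor(
    task_id='wait_for_s3',
    bucket_name='source-bucket',
    bucket_key='data/{{ ds }}/data.csv',
    timeout=3600,        # give up after 1 hour (seconds)
    poke_interval=60,    # check once a minute
    mode='reschedule',   # release the worker slot between checks
)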

# Wait for API availability
wait_for_api = HttpSensor(
    task_id='wait_for_api',
    http_conn_id='api_endpoint',
    endpoint='/health',
    timeout=300,
    poke_interval=30,
)
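Conceptually, a sensor in poke mode is a loop: check, sleep `poke_interval`, and give up after `timeout` seconds. The sketch below models that loop in plain Python with an injectable clock so it can be exercised instantly. It is a simplification, not Airflow's implementation; in `mode='reschedule'` the task is rescheduled between checks and frees its worker slot instead of sleeping.

```python
import time


def wait_until(condition, timeout, poke_interval, clock=time.monotonic, sleep=time.sleep):
    """Call condition() every poke_interval seconds until it returns True;
    raise TimeoutError once timeout seconds have passed without success."""
    deadline = clock() + timeout
    while True:
        if condition():
            return True
        if clock() >= deadline:
            raise TimeoutError(f"condition not met within {timeout}s")
        sleep(poke_interval)


class FakeClock:
    """Simulated time: sleep() advances the clock instead of blocking."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


# The file "arrives" at t=120s; with poke_interval=60 the third check succeeds.
clock = FakeClock()
wait_until(lambda: clock.now >= 120, timeout=300, poke_interval=60,
           clock=clock.time, sleep=clock.sleep)
print(clock.now)  # 120.0
```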

Monitoring and Observability

# ============================================================
# CUSTOM METRICS
# ============================================================

from airflow.stats import Stats  # emits to StatsD only if [metrics] statsd_on = True

def extract_data(**context):
    """Extract data with metrics"""

    # Increment counter
    Stats.incr('etl.extract.api.calls')

    # Time the operation (the timer records the elapsed time when the block exits)
    with Stats.timer('etl.extract.api.duration'):
        data = call_api()

    # Record row count
    Stats.gauge('etl.extract.api.rows', len(data))

    return data

# ============================================================
# LOGGING
# ============================================================

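import logging

logger = logging.getLogger(__name__)

def log_pipeline_status(**context):
    """Log pipeline status from inside a running task"""
    ti = context['task_instance']

    # While the task runs, state reads 'running' and duration may still be None
    logger.info(f"""
    Pipeline Status:
    - DAG: {context['dag'].dag_id}
    - Task: {ti.task_id}
    - Execution Date: {context['execution_date']}
    - State: {ti.state}
    - Duration: {ti.duration}
    - Try Number: {ti.try_number}
    """)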

💡

Production Tip: Use Airflow Variables and Connections for sensitive configuration. Never hardcode credentials in DAGs. Use environment variables or a secrets backend (HashiCorp Vault, AWS Secrets Manager).


Common Follow-Up Questions

Q1: How do you handle backfills in Airflow?

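# Option 1: catchup=True with a fixed start_date
from datetime import datetime
from airflow import DAG

dag = DAG(
    'backfill_dag',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=True,  # scheduler creates a run for every missed interval since start_date
)

# Option 2: Manual backfill for a date range (shell, Airflow 2 CLI)
# airflow dags backfill -s 2024-01-01 -e 2024-01-31 daily_etl_pipeline

# Option 3: Re-run existing runs by clearing their task instances (shell)
# airflow tasks clear -s 2024-01-01 -e 2024-01-31 daily_etl_pipeline

# Either way, tasks must be idempotent and keyed on {{ ds }} / the data interval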
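With `catchup=True` the scheduler creates one run per schedule interval whose end has already passed, which is why backfilled tasks must key their work on the interval rather than on "today". The sketch below lists those intervals for a `@daily` schedule in plain Python; it assumes a midnight-aligned UTC start_date and ignores other timetables.

```python
from datetime import datetime, timedelta, timezone


def daily_intervals(start, now):
    """Return the [begin, end) daily intervals whose end is at or before `now`."""
    intervals = []
    begin = start
    while begin + timedelta(days=1) <= now:
        intervals.append((begin, begin + timedelta(days=1)))
        begin += timedelta(days=1)
    return intervals


runs = daily_intervals(
    datetime(2024, 1, 1, tzinfo=timezone.utc),
    datetime(2024, 1, 4, tzinfo=timezone.utc),
)
for begin, end in runs:
    print(begin.date(), "->", end.date())
# 2024-01-01 -> 2024-01-02
# 2024-01-02 -> 2024-01-03
# 2024-01-03 -> 2024-01-04
```

Note the off-by-one that often surprises people: the run whose `ds` is 2024-01-03 only starts after 2024-01-04 00:00, at the end of its interval.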

Q2: How do you handle data dependencies?

# Use ExternalTaskSensor
from datetime import timedelta
from airflow.sensors.external_task import ExternalTaskSensor

wait_for_upstream = ExternalTaskSensor(
    task_id='wait_for_upstream',
    external_dag_id='upstream_dag',
    external_task_id='load_task',
    # upstream run to wait for: this run's logical date minus 1 hour
    execution_delta=timedelta(hours=1),
    mode='reschedule',
)

Q3: How do you manage DAG versioning?

# Use Git for version control
# Use tags for releases
dag = DAG(
    'etl_pipeline',
    tags=['v1.2.3', 'production'],
)

# Use MD5 hash of DAG file for change detection
import hashlib
with open(__file__, 'rb') as f:
    dag_hash = hashlib.md5(f.read()).hexdigest()

Q4: How do you handle secrets?

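# Use Airflow Variables (read them inside tasks or via {{ var.value.api_key }}
# templates; module-level calls hit the backend on every DAG parse)
from airflow.models import Variable
api_key = Variable.get('api_key')

# Use Connections
from airflow.hooks.base import BaseHook
conn = BaseHook.get_connection('my_connection')

# Use a Secrets Backend: configured in airflow.cfg (or AIRFLOW__SECRETS__BACKEND),
# not imported in DAG code; Variables and Connections are then looked up there first
# [secrets]
# backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
# backend_kwargs = {"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}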

⚠️

Critical Consideration: Airflow is not designed for real-time streaming. Use it for batch workflows and orchestration. For streaming, use Kafka, Flink, or Spark Streaming. Airflow can trigger streaming jobs but shouldn't process events itself.


Company-Specific Tips

Airbnb Interview Tips

  • Discuss TaskGroups for complex workflows (SubDAGs are deprecated in Airflow 2)
  • Explain data quality with Great Expectations
  • Mention cost optimization with spot instances
  • Talk about multi-tenancy with role-based access

Spotify Interview Tips

  • Focus on data product delivery pipelines
  • Discuss A/B testing data pipelines
  • Mention ML pipeline orchestration
  • Talk about data freshness monitoring

Lyft Interview Tips

  • Discuss geospatial data processing pipelines
  • Explain how Airflow triggers and monitors real-time analytics jobs without processing the streams itself
  • Mention cost optimization for ride data
  • Talk about multi-region deployment

ℹ️

Final Takeaway: Airflow is one of the most widely used tools for batch data pipeline orchestration. Master DAG design patterns, error handling, and testing to build production-ready pipelines. Always consider: (1) idempotency, (2) retry logic, (3) monitoring, and (4) cost optimization.
