Apache Airflow: DAGs, Operators, and Scheduling

Apache Airflow: Building Production-Grade DAGs

Apache Airflow is a widely used open-source workflow orchestration platform for programmatically authoring, scheduling, and monitoring data pipelines. Originally developed at Airbnb in 2014, it is now an Apache Software Foundation project and runs production data pipelines at many organizations.

[Figure: Apache Airflow Architecture. Web Server (Flask UI dashboard; monitor and trigger DAGs), Scheduler (core orchestration engine; manages DAG runs and tasks), Metadata DB (PostgreSQL backend; DAG state and history), Workers 1-3 (Celery Executor; run tasks). Components: Web Server | Scheduler | Workers | Metadata DB | Redis (broker)]

Why Airflow Dominates Data Orchestration


Core Innovation:

  • Treats workflows as code: DAGs are defined in Python
  • Encodes dependencies, scheduling, retry logic, and alerting
  • Python-native authoring model leverages the full Python ecosystem

Key Capabilities:

  1. Task dependencies: branching and joining workflows
  2. Cross-task data passing: via XComs
  3. Scheduling: time-based and event-driven
  4. Distributed execution: across a worker pool
  5. Production-grade UI: monitoring and manual intervention

Key Insight: Unlike cron-based scheduling or custom orchestration scripts, Airflow provides built-in primitives for complex workflow logic: explicit task dependencies, retries, branching, sensors, and resource pools.

Architecture Diagram

A DAG is a finite directed graph with no directed cycles. In workflow orchestration, a DAG represents a collection of tasks with dependencies: each node is a task, and each directed edge represents a dependency (upstream -> downstream). The acyclic property guarantees that the workflow has a well-defined execution order. For a DAG with n tasks and e edges, topological sort produces a valid execution sequence in O(n + e) time.
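The O(n + e) ordering mentioned above can be sketched with Kahn's algorithm. This is a minimal plain-Python illustration, not Airflow's internal code; the function name `topological_order` and the task names are hypothetical.

```python
from collections import deque


def topological_order(tasks, edges):
    """Return one valid execution order using Kahn's algorithm, O(n + e)."""
    downstream = {t: [] for t in tasks}
    indegree = {t: 0 for t in tasks}
    for up, down in edges:
        downstream[up].append(down)
        indegree[down] += 1

    # Tasks with no unfinished upstream dependency are ready to run.
    ready = deque(t for t in tasks if indegree[t] == 0)
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for nxt in downstream[task]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)

    # If some task never became ready, the graph has a cycle.
    if len(order) != len(tasks):
        raise ValueError("graph contains a cycle")
    return order


tasks = ["extract", "transform", "load", "validate"]
edges = [("extract", "transform"), ("transform", "load"), ("load", "validate")]
print(topological_order(tasks, edges))
# ['extract', 'transform', 'load', 'validate']
```

Each task and each edge is visited once, which is where the O(n + e) bound comes from.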

An operator is an atomic unit of work in Airflow. Each operator encapsulates a single task: executing a bash command, running a Python function, submitting a Spark job, or querying a database. Operators are meant to be composable, and well-written operators are idempotent, so that re-running a task for the same execution date produces the same result. Airflow ships a set of core operators, provider packages add many more, and custom operators can be written by subclassing BaseOperator.

A sensor is a special operator that polls an external system until a condition is met (e.g., a file arrives in S3, a database table is created, an API returns a 200 status). Sensors are designed for event-driven workflows where tasks must wait for external triggers before proceeding. Sensors implement the poke() method, which returns True when the condition is satisfied.
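The polling behavior described above can be modeled in plain Python. The sketch below is not Airflow's BaseSensorOperator; `wait_until` and `SensorTimeout` are hypothetical names, and the `sleep` and `clock` parameters exist only so the loop can be exercised without real waiting.

```python
import time


class SensorTimeout(Exception):
    """Raised when the condition is not met before the timeout."""


def wait_until(poke, poke_interval=1.0, timeout=10.0,
               sleep=time.sleep, clock=time.monotonic):
    """Call poke() repeatedly; return the number of attempts once it is True."""
    started = clock()
    attempts = 0
    while True:
        attempts += 1
        if poke():
            return attempts
        if clock() - started >= timeout:
            raise SensorTimeout(f"condition not met after {attempts} attempts")
        sleep(poke_interval)


# Simulated external condition: becomes true on the third check.
checks = iter([False, False, True])
attempts = wait_until(lambda: next(checks), sleep=lambda seconds: None)
print(attempts)
# 3
```

In this model `poke_interval` and `timeout` play the same roles as the sensor arguments of the same names: how long to wait between checks, and when to give up.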

DAG Scheduling Frequency

The schedule is set with schedule_interval (Airflow 1.x and early 2.x) or schedule (Airflow 2.4 and later, which also accepts Timetable objects). For a @daily DAG, the run that starts at time T has execution date T - 1 day and covers the interval [T - 1 day, T): a run is created only after the interval it covers has ended. The catchup parameter controls whether DAG runs for past intervals are backfilled. With catchup=True, the number of DAG runs to execute is floor((current_date - start_date) / schedule_interval).
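The interval arithmetic can be checked with a few lines of standard-library Python. This is a simplified model of fixed-length intervals, not Airflow's timetable logic; `completed_runs` is a hypothetical helper name.

```python
from datetime import datetime, timedelta


def completed_runs(start_date, now, interval=timedelta(days=1)):
    """List (logical_date, interval_end) pairs for intervals that ended by `now`."""
    runs = []
    logical_date = start_date
    while logical_date + interval <= now:
        runs.append((logical_date, logical_date + interval))
        logical_date += interval
    return runs


start = datetime(2024, 1, 1)
now = datetime(2024, 1, 4, 6, 0)
runs = completed_runs(start, now)

print(len(runs))                        # 3
print((now - start) // timedelta(days=1))  # 3, the same count by division
print(runs[-1][0])                      # 2024-01-03 00:00:00
```

The most recent run has logical date January 3 even though it becomes eligible on January 4, which is the "T - 1" offset in practice.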

Airflow requires every DAG to be acyclic. When a DAG file is parsed, Airflow walks the task graph with a depth-first search and raises AirflowDagCycleException if it finds a cycle; the DAG then fails to import and none of its tasks are scheduled. Acyclicity guarantees that a valid execution order exists and that no task ends up waiting, directly or transitively, on itself.
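One common way to detect a cycle is a depth-first search that tracks which tasks are on the current path. The sketch below is an illustration of that idea in plain Python, not a copy of Airflow's implementation; `find_cycle` and the task ids are hypothetical, and the recursion assumes graphs small enough for Python's default recursion limit.

```python
def find_cycle(downstream):
    """Return a list of task ids forming a cycle, or None if the graph is acyclic."""
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on current path, fully explored
    color = {task: WHITE for task in downstream}
    path = []

    def visit(task):
        color[task] = GRAY
        path.append(task)
        for nxt in downstream.get(task, ()):
            state = color.get(nxt, WHITE)
            if state == GRAY:
                # Back edge: nxt is already on the current path.
                return path[path.index(nxt):] + [nxt]
            if state == WHITE:
                cycle = visit(nxt)
                if cycle:
                    return cycle
        path.pop()
        color[task] = BLACK
        return None

    for task in list(downstream):
        if color[task] == WHITE:
            cycle = visit(task)
            if cycle:
                return cycle
    return None


print(find_cycle({"a": ["b"], "b": ["c"], "c": []}))
# None
print(find_cycle({"a": ["b"], "b": ["c"], "c": ["a"]}))
# ['a', 'b', 'c', 'a']
```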

Key Concepts

Concept | Description | Example
--- | --- | ---
DAG | Directed Acyclic Graph defining task dependencies | etl_dag = DAG('daily_etl', schedule='@daily')
Operator | Atomic unit of work | BashOperator, PythonOperator, SparkSubmitOperator
Task | Instance of an operator within a DAG | extract_task = PythonOperator(task_id='extract', ...)
Task Instance | A specific run of a task in a DAG run | task_instance = TaskInstance(task=extract_task, execution_date=...)
DAG run | A logical execution of a DAG at a specific time | One @daily DAG run per day
Execution Date | The logical time the DAG run represents | execution_date = 2024-01-15 for a run covering Jan 15
Schedule Interval | Frequency of DAG runs | @hourly, @daily, @weekly, timedelta(hours=6)
Catchup | Whether to backfill historical DAG runs | catchup=False prevents backfilling
Pool | Resource slot limiting concurrent task execution | spark_pool = Pool(pool='spark', slots=4)
XCom | Cross-communication for passing data between tasks | ti.xcom_push(key='result', value=data)
Sensor | Operator that waits for an external condition | S3KeySensor(task_id='wait', bucket_name='data', bucket_key='file.csv')
Branching | Conditional task execution based on runtime logic | BranchPythonOperator(task_id='branch', ...)
Trigger Rule | Condition for task execution based on upstream state | trigger_rule='all_failed' runs the task only when every upstream task has failed
Callback | Functions invoked on task state changes | on_success_callback, on_failure_callback
Connection | Stored credentials for external systems | conn_id='postgres_default'
Variable | Key-value store for pipeline configuration | Variable.get('api_key')
SubDAG | Nested DAG for modular workflow composition (deprecated in Airflow 2.x in favor of TaskGroup) | SubDagOperator(task_id='extract', subdag=extract_subdag)
TaskGroup | Logical grouping of tasks in the UI (Airflow 2.x) | with TaskGroup('extract') as extract_group:

Pipeline Throughput

For a DAG with n tasks, where task i has execution time t_i, the critical path length L is the largest sum t_1 + t_2 + ... + t_k taken over all dependency paths through the DAG. L is the minimum possible pipeline duration: running independent tasks in parallel can bring wall-clock time down toward L, but never below it. Effective throughput = total_records / max(L, total_records / parallel_capacity), where parallel_capacity is the processing rate of the worker pool in records per unit time.
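The critical path can be computed by taking, for each task, the latest finish time among its upstream tasks and adding its own duration. The following is a small sketch with made-up task names and durations (in minutes); `critical_path_length` is a hypothetical helper, and it assumes the graph is acyclic.

```python
from functools import lru_cache


def critical_path_length(durations, upstream):
    """Longest total duration along any dependency path (assumes an acyclic graph)."""

    @lru_cache(maxsize=None)
    def finish(task):
        # A task can start only after its slowest upstream task has finished.
        start = max((finish(u) for u in upstream.get(task, ())), default=0)
        return start + durations[task]

    return max(finish(task) for task in durations)


# Hypothetical durations in minutes; clean and enrich can run in parallel.
durations = {"extract": 10, "clean": 5, "enrich": 20, "load": 8}
upstream = {"clean": ["extract"], "enrich": ["extract"], "load": ["clean", "enrich"]}

L = critical_path_length(durations, upstream)
print(L)                        # 38 (extract -> enrich -> load)
print(sum(durations.values()))  # 43 if every task ran sequentially
```

Here parallelism saves only the 5 minutes of `clean`; shortening the pipeline further would require speeding up a task on the critical path.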

  1. Define the DAG: Create a DAG object with schedule_interval, start_date, catchup, max_active_runs, and default_args.
  2. Configure default_args: Set owner, retries, retry_delay, execution_timeout, on_failure_callback, and email for all tasks.
  3. Create tasks: Instantiate operators for each pipeline step (extract, transform, load, validate).
  4. Define dependencies: Use >> and << operators to express task ordering: extract >> transform >> load >> validate.
  5. Add error handling: Configure trigger_rule, on_failure_callback, and on_retry_callback for graceful failure handling.
  6. Implement idempotency: Ensure each task produces the same output when re-run with the same execution_date.
  7. Test locally: Run airflow dags test <dag_id> <execution_date> to validate the DAG without scheduling.
  8. Deploy and monitor: Push to the dags/ directory, verify the DAG appears in the UI, and monitor the first run.
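Step 6 (idempotency) is often the least obvious in practice. One common pattern is to replace the whole partition for the execution date inside a single transaction, so a re-run overwrites rather than duplicates. The sketch below uses the standard-library sqlite3 module as a stand-in for a warehouse; the table layout and the `load_partition` helper are assumptions made for illustration.

```python
import sqlite3


def load_partition(conn, ds, rows):
    """Replace all rows for logical date `ds` in one transaction."""
    with conn:  # commits on success, rolls back if an exception is raised
        conn.execute("DELETE FROM orders_fact WHERE ds = ?", (ds,))
        conn.executemany(
            "INSERT INTO orders_fact (ds, order_id, amount) VALUES (?, ?, ?)",
            [(ds, row["order_id"], row["amount"]) for row in rows],
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders_fact (ds TEXT, order_id INTEGER, amount REAL)")

rows = [{"order_id": 1, "amount": 9.5}, {"order_id": 2, "amount": 20.0}]
load_partition(conn, "2024-01-15", rows)
load_partition(conn, "2024-01-15", rows)  # simulated retry or manual re-run

count = conn.execute("SELECT COUNT(*) FROM orders_fact").fetchone()[0]
print(count)
# 2
```

A plain append would leave four rows after the second call; delete-then-insert keyed on the execution date keeps the result identical across re-runs.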

Airflow 2.x vs 1.x: Airflow 2.x introduced the TaskFlow API (decorator-based task definition), dynamic task mapping, the TaskGroup concept, improved scheduler performance, and a redesigned web UI. Always use Airflow 2.x for new projects. The TaskFlow API (@dag and @task decorators) eliminates boilerplate for Python-heavy workflows.

Production DAG Example

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.email import EmailOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor  # from apache-airflow-providers-amazon
from airflow.utils.task_group import TaskGroup
from airflow.models import Variable
import logging

logger = logging.getLogger(__name__)

default_args = {
    "owner": "data-engineering",           # Team responsible for this DAG
    "retries": 3,                          # Auto-retry failed tasks up to 3 times
    "retry_delay": timedelta(minutes=5),   # Wait 5 minutes between retries
    "execution_timeout": timedelta(hours=2),  # Kill tasks after 2 hours
    "email_on_failure": True,              # Send email on final failure
    "email": ["data-alerts@company.com"],  # Alert recipients
    "on_failure_callback": lambda context: logger.error(
        f"Task {context['task_instance'].task_id} failed"
    ),
}


def extract_orders(**context):
    """
    Extract orders from the source database for the execution date.
    
    Parameters (via Airflow context):
        execution_date (datetime): The logical date for this DAG run
        ti (TaskInstance): Task instance for XCom operations
        
    Returns:
        list: Order records as list of dicts for XCom serialization
    """
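    execution_date = context["execution_date"]
    import pandas as pd
    import sqlalchemy

    engine = sqlalchemy.create_engine(
        Variable.get("source_db_connection")
    )
    # Bind the date as a query parameter rather than formatting it into the SQL
    query = sqlalchemy.text("""
        SELECT order_id, customer_id, amount, status, created_at
        FROM orders
        WHERE DATE(created_at) = :run_date
    """)
    df = pd.read_sql(
        query, engine, params={"run_date": execution_date.strftime("%Y-%m-%d")}
    )
    # XCom values are JSON-serialized by default, so store timestamps as strings
    df["created_at"] = df["created_at"].astype(str)
    logger.info(f"Extracted {len(df)} orders for {execution_date}")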

    # Store row count in XCom for downstream monitoring
    context["ti"].xcom_push(key="row_count", value=len(df))
    return df.to_dict("records")


def transform_orders(orders: list, **context):
    """
    Cleanse, deduplicate, and enrich order data.
    
    Parameters:
        orders (list): Raw order records from extract task
        context (dict): Airflow task context
        
    Returns:
        list: Transformed order records with customer tier enrichment
    """
    import pandas as pd

    df = pd.DataFrame(orders)
    df = df.drop_duplicates(subset=["order_id"])
    df = df[df["amount"] > 0]
    # ISO string keeps the records JSON-serializable when returned via XCom
    df["processed_at"] = datetime.utcnow().isoformat()

    # Enrich with customer tier via merge
    customer_df = pd.read_parquet(
        Variable.get("customer_data_path")
    )
    df = df.merge(customer_df[["customer_id", "tier"]], on="customer_id", how="left")

    context["ti"].xcom_push(key="transformed_count", value=len(df))
    return df.to_dict("records")


def load_to_warehouse(records: list, **context):
    """
    Load transformed records into the data warehouse.
    
    Parameters:
        records (list): Transformed order records from transform task
        context (dict): Airflow task context
        
    Note: Uses chunked inserts (chunksize=1000) for memory efficiency
          with large datasets. For production, consider MERGE/UPSERT.
    """
    import pandas as pd
    import sqlalchemy

    engine = sqlalchemy.create_engine(
        Variable.get("warehouse_connection")
    )
    df = pd.DataFrame(records)
    df.to_sql(
        "orders_fact",
        engine,
        if_exists="append",
        index=False,
        method="multi",     # Multi-row inserts for better performance
        chunksize=1000,     # Batch size to prevent memory issues
    )
    logger.info(f"Loaded {len(df)} records to warehouse")


with DAG(
    dag_id="daily_order_etl",
    default_args=default_args,
    description="Daily order extraction, transformation, and warehouse load",
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
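    catchup=False,
    # Render templated op_kwargs as native Python objects so the xcom_pull
    # templates below pass lists to the callables instead of strings
    render_template_as_native_obj=True,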
    max_active_runs=1,
    tags=["orders", "etl", "production"],
) as dag:

    # Sensor: wait for source data to be available in S3
    wait_for_data = S3KeySensor(
        task_id="wait_for_source_data",
        bucket_name="raw-data-lake",
        bucket_key="orders/{{ ds_nodash }}/orders.csv",
        timeout=3600,
        poke_interval=60,
        mode="reschedule",
    )

    with TaskGroup("etl_tasks") as etl_group:
        extract = PythonOperator(
            task_id="extract_orders",
            python_callable=extract_orders,
        )

        transform = PythonOperator(
            task_id="transform_orders",
            python_callable=transform_orders,
            op_kwargs={"orders": "{{ ti.xcom_pull(task_ids='etl_tasks.extract_orders') }}"},
        )

        load = PythonOperator(
            task_id="load_to_warehouse",
            python_callable=load_to_warehouse,
            op_kwargs={"records": "{{ ti.xcom_pull(task_ids='etl_tasks.transform_orders') }}"},
        )

        extract >> transform >> load

    notify_success = EmailOperator(
        task_id="notify_success",
        to=["data-alerts@company.com"],
        subject="Daily ETL completed for {{ ds }}",
        html_content="<p>Pipeline completed successfully.</p>",
        trigger_rule="all_success",
    )

    wait_for_data >> etl_group >> notify_success

Operator Comparison Table

Operator | Use Case | Execution | Retry Support
--- | --- | --- | ---
BashOperator | Shell commands | Local worker | Yes
PythonOperator | Python functions | Local worker | Yes
SparkSubmitOperator | Spark jobs | Remote cluster | Yes
PostgresOperator | SQL queries | Direct DB | Yes
S3KeySensor | Wait for S3 object | Polling | Yes (sensors accept retries like any operator)
HttpSensor | Wait for API endpoint | Polling | Yes (sensors accept retries like any operator)
DockerOperator | Container execution | Docker engine | Yes
KubernetesPodOperator | Pod execution | K8s cluster | Yes
BranchPythonOperator | Conditional branching | Local worker | Yes
DummyOperator (EmptyOperator in newer 2.x releases) | No-op placeholder | Local worker | N/A

  • Airflow treats workflows as Python code, enabling version control, testing, and CI/CD for pipeline definitions.
  • DAGs encode task dependencies and scheduling; operators encapsulate atomic work units; sensors handle event-driven triggers.
  • Airflow checks for cycles when it parses a DAG file; a DAG that contains a cycle fails to import and never produces a DAG run.
  • Pipeline duration is bounded by the critical path; parallelization reduces wall-clock time for non-critical tasks.
  • Use catchup=False in production to avoid accidental backfilling of historical data.
  • Always configure execution_timeout, retries, and on_failure_callback for production robustness.
  • Airflow 2.x TaskFlow API and TaskGroup significantly reduce boilerplate for Python-heavy workflows.

Best Practices

  1. Keep DAGs simple: Each DAG should represent a single logical workflow. Avoid nested SubDAGs; prefer TaskGroup for visual organization.
  2. Use catchup=False in production to prevent unintended backfilling. Enable catchup only when explicitly backfilling historical data.
  3. Set max_active_runs=1 for DAGs that modify shared state (e.g., database writes) to prevent race conditions.
  4. Parameterize with Variables and Connections, not hardcoded values. Store secrets in the Airflow metadata database or a vault.
  5. Implement idempotent tasks: every task must produce identical output when re-run with the same execution_date.
  6. Use Sensors in reschedule mode to free up worker slots while waiting. poke mode holds a slot for the entire wait duration.
  7. Configure execution_timeout on every task to prevent hung tasks from consuming resources indefinitely.
  8. Monitor with callbacks: set on_failure_callback and on_retry_callback to integrate with Slack, PagerDuty, or email.
  9. Test DAGs locally with airflow dags test before deploying to production. Validate task dependencies and data flow.
  10. Version control DAGs using Git. Deploy via CI/CD pipelines that run linting, type checking, and unit tests.

Airflow Configuration Reference

Parameter | Default | Recommended | Description
--- | --- | --- | ---
parallelism | 32 | 64-128 | Max tasks running across all DAGs
dag_concurrency (max_active_tasks_per_dag since Airflow 2.2) | 16 | 32-64 | Max tasks per DAG run
max_active_runs_per_dag | 16 | 1-4 | Concurrent DAG runs per DAG
worker_concurrency | 16 | 8-32 | Tasks per Celery worker
pool slots (default_pool) | 128 | Varies | Resource-limited task slots
email_on_retry | True | False | Reduce email noise
default_retry_delay | 5 min | 5-15 min | Wait between retries
max_retry_delay | 24 hours | 1-6 hours | Cap on exponential backoff
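
To see why a cap on exponential backoff matters, the sketch below doubles the delay on each retry and clamps it at a maximum. It is a simplified model, not Airflow's exact retry formula (which also adds jitter); `backoff_delay` and its default values are assumptions chosen for illustration.

```python
from datetime import timedelta


def backoff_delay(attempt, base=timedelta(minutes=5), cap=timedelta(hours=1)):
    """Delay before retry number `attempt` (1-based): doubles each time, up to `cap`."""
    return min(base * (2 ** (attempt - 1)), cap)


delays = [backoff_delay(n) for n in range(1, 7)]
for n, delay in enumerate(delays, start=1):
    print(n, delay)
# 1 0:05:00
# 2 0:10:00
# 3 0:20:00
# 4 0:40:00
# 5 1:00:00
# 6 1:00:00
```

Without the cap, the sixth retry in this model would wait 160 minutes; a lower cap keeps a flapping task from disappearing for most of a day.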
