Apache Airflow: Building Production-Grade DAGs
Apache Airflow is the industry-standard open-source workflow orchestration platform for programmatically authoring, scheduling, and monitoring data pipelines. Originally developed at Airbnb in 2014, Airflow has become the backbone of data infrastructure at thousands of organizations.
Why Airflow Dominates Data Orchestration
Core Innovation:
- Treats workflows as code: DAGs are defined in Python
- Encodes dependencies, scheduling, retry logic, and alerting
- Python-native authoring model leverages the full Python ecosystem
Key Capabilities:
- Task dependencies: branching and joining workflows
- Cross-task data passing: via XComs
- Scheduling: time-based and event-driven
- Distributed execution: across a worker pool
- Production-grade UI: monitoring and manual intervention
Key Insight: Unlike cron, which only fires jobs on a timetable, or custom orchestration scripts, Airflow provides built-in primitives for dependencies, retries, backfills, and branching, so complex workflow logic is declared rather than hand-coded.
Architecture: DAGs, Operators, and Sensors
A DAG is a finite directed graph with no directed cycles. In workflow orchestration, a DAG represents a collection of tasks with dependencies: each node is a task, and each directed edge represents a dependency (upstream -> downstream). The acyclic property guarantees that the workflow has a well-defined execution order. For a DAG with n tasks and e edges, topological sort produces a valid execution sequence in O(n + e) time.
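Kahn's algorithm is one standard way to obtain that O(n + e) topological order. The sketch below is plain Python with illustrative task names, not Airflow's internal code: it repeatedly schedules a task whose upstream tasks are all done, and reports a cycle if some tasks never become ready.

```python
from collections import deque


def topological_order(tasks, edges):
    """Return tasks in an order that respects every (upstream, downstream) edge.

    Kahn's algorithm: O(n + e) for n tasks and e edges.
    Raises ValueError if the graph contains a cycle.
    """
    downstream = {t: [] for t in tasks}
    in_degree = {t: 0 for t in tasks}
    for upstream, down in edges:
        downstream[upstream].append(down)
        in_degree[down] += 1

    # Start with the tasks that have no upstream dependencies.
    ready = deque(t for t in tasks if in_degree[t] == 0)
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        # Completing this task removes one unmet dependency from each child.
        for child in downstream[task]:
            in_degree[child] -= 1
            if in_degree[child] == 0:
                ready.append(child)

    if len(order) != len(tasks):
        raise ValueError("graph has a cycle; no valid execution order exists")
    return order


tasks = ["extract", "transform", "validate", "load"]
edges = [("extract", "transform"), ("transform", "load"), ("transform", "validate")]
print(topological_order(tasks, edges))  # ['extract', 'transform', 'load', 'validate']
```

Here transform fans out to load and validate, so either order of those two is valid; the algorithm simply returns one of them.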
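An operator is the template for an atomic unit of work in Airflow. Each operator encapsulates a single kind of task: executing a bash command, running a Python function, submitting a Spark job, or querying a database. Operators are composable, and the tasks built from them should be written to be idempotent and to keep no state between runs; Airflow does not enforce either property. Airflow core and its provider packages supply 100+ operators, and custom operators subclass BaseOperator and implement its execute(context) method.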
A sensor is a special operator that polls an external system until a condition is met (e.g., a file arrives in S3, a database table is created, an API returns a 200 status). Sensors are designed for event-driven workflows where tasks must wait for external triggers before proceeding. Sensors implement the poke() method, which returns True when the condition is satisfied.
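The poke loop can be modeled in a few lines of plain Python. This is a hypothetical sketch, not BaseSensorOperator's code: run_sensor, SensorTimeout, and the injectable clock and sleep are illustration names. It models poke mode, where the waiting task holds its worker slot for the whole wait; reschedule mode instead releases the slot between pokes.

```python
import time


class SensorTimeout(Exception):
    """Raised when the condition is still unmet after the timeout (illustrative name)."""


def run_sensor(poke, poke_interval=60.0, timeout=3600.0,
               clock=time.monotonic, sleep=time.sleep):
    """Call poke() every poke_interval seconds until it returns True.

    Models poke mode: the caller blocks (holding its worker slot) for the
    whole wait. Returns the number of pokes; raises SensorTimeout once
    timeout seconds have elapsed without success.
    """
    started = clock()
    pokes = 0
    while True:
        pokes += 1
        if poke():
            return pokes
        if clock() - started >= timeout:
            raise SensorTimeout(f"condition not met within {timeout} seconds")
        sleep(poke_interval)


class FakeClock:
    """Deterministic clock so the example runs instantly."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


clock = FakeClock()
checks = iter([False, False, True])  # the S3 object "arrives" on the third poke
print(run_sensor(lambda: next(checks), poke_interval=60, timeout=3600,
                 clock=clock, sleep=clock.sleep))  # 3
```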
DAG Scheduling Frequency
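The scheduling interval is defined by schedule_interval (Airflow 1.x and 2.x before 2.4) or by the unified schedule argument (Airflow 2.4+), which accepts cron expressions, presets such as @daily, timedelta objects, and Timetables. For a @daily DAG, the DAGrun triggered at time T has execution date (logical date) T - 1 day: it covers the interval [T - 1 day, T) and starts only once that interval has closed. The catchup parameter controls whether historical DAGruns are backfilled. With catchup=True, the scheduler creates one DAGrun per completed interval since start_date, so total DAGruns to execute = floor((current_date - start_date) / schedule_interval); with catchup=False, it creates only the DAGrun for the most recent completed interval.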
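To make the catchup arithmetic concrete, here is a simplified model in plain Python of which logical dates the scheduler would create runs for. pending_logical_dates is a hypothetical helper: it assumes a fixed timedelta interval aligned to start_date and ignores runs that already exist, cron alignment, and end_date.

```python
from datetime import datetime, timedelta


def pending_logical_dates(start_date, now, interval, catchup=True):
    """Return the logical dates of the runs the scheduler would create.

    A run for the interval [d, d + interval) becomes due only once
    d + interval <= now, i.e. after its data interval has closed.
    """
    completed = []
    d = start_date
    while d + interval <= now:
        completed.append(d)
        d += interval
    if catchup:
        return completed  # backfill every completed interval
    return completed[-1:]  # only the most recent completed interval


runs = pending_logical_dates(datetime(2024, 1, 1), datetime(2024, 1, 5, 6), timedelta(days=1))
print([d.date().isoformat() for d in runs])
# ['2024-01-01', '2024-01-02', '2024-01-03', '2024-01-04']
```

Four runs matches floor(4.25 days / 1 day) = 4; the partial interval starting 2024-01-05 is not yet due. With catchup=False, the same call returns only 2024-01-04.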
Airflow enforces that every DAG is acyclic. When Airflow parses a DAG file, it runs a depth-first cycle check; if a cycle exists, the DAG fails to import, the failure is shown in the UI as an import error, and none of its tasks are scheduled. Acyclicity guarantees that a valid execution order exists and rules out circular waits between tasks. It does not by itself guarantee termination, since an individual task can still hang; that is what execution_timeout is for.
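A depth-first cycle check can be sketched as follows. This is a plain-Python illustration, not Airflow's source: find_cycle is a hypothetical helper that tracks whether each task is unvisited, on the current DFS path, or finished, and reports a cycle when it reaches a task that is still on the path (a back edge). It assumes every task id appears as a key in the downstream mapping.

```python
def find_cycle(downstream):
    """Return one cycle as a list of task ids, or None if the graph is acyclic.

    downstream maps each task id to the ids of its downstream tasks.
    Iterative DFS, so deep DAGs do not hit Python's recursion limit.
    """
    UNVISITED, ON_PATH, DONE = 0, 1, 2
    state = {t: UNVISITED for t in downstream}
    for root in downstream:
        if state[root] != UNVISITED:
            continue
        path = [root]
        state[root] = ON_PATH
        stack = [iter(downstream[root])]
        while stack:
            child = next(stack[-1], None)
            if child is None:
                # All children explored: this task leaves the current path.
                state[path.pop()] = DONE
                stack.pop()
            elif state[child] == ON_PATH:
                # Back edge: child is an ancestor on the current path.
                return path[path.index(child):] + [child]
            elif state[child] == UNVISITED:
                state[child] = ON_PATH
                path.append(child)
                stack.append(iter(downstream[child]))
            # DONE children were fully explored earlier and cannot close a cycle.
    return None


print(find_cycle({"extract": ["transform"], "transform": ["load"], "load": ["extract"]}))
# ['extract', 'transform', 'load', 'extract']
print(find_cycle({"a": ["b", "c"], "b": ["c"], "c": []}))  # None
```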
Key Concepts
| Concept | Description | Example |
|---|---|---|
| DAG | Directed Acyclic Graph defining task dependencies | etl_dag = DAG('daily_etl', schedule='@daily') |
| Operator | Atomic unit of work | BashOperator, PythonOperator, SparkSubmitOperator |
| Task | Instance of an operator within a DAG | extract_task = PythonOperator(task_id='extract', ...) |
| Task Instance | A specific run of a task in a DAGrun | ti = context['ti'] inside a running task |
| DAGrun | A logical execution of a DAG at a specific time | One @daily DAGrun per day |
| Execution Date | The logical time the DAGrun represents | execution_date = 2024-01-15 for a run covering Jan 15 |
| Schedule Interval | Frequency of DAGruns | @hourly, @daily, @weekly, timedelta(hours=6) |
| Catchup | Whether to backfill historical DAGruns | catchup=False prevents backfilling |
| Pool | Resource slot limiting concurrent task execution | airflow pools set spark 4 "Spark jobs", then pool='spark' on the task |
| XCom | Cross-Communication for passing data between tasks | ti.xcom_push(key='result', value=data) |
| Sensor | Operator that waits for an external condition | S3KeySensor(task_id='wait', bucket_name='data', bucket_key='file.csv') |
| Branching | Conditional task execution based on runtime logic | BranchPythonOperator(task_id='branch', ...) |
| Trigger Rule | Condition for task execution based on upstream state | trigger_rule='all_failed' runs only if every upstream task failed |
| Callback | Functions invoked on task state changes | on_success_callback, on_failure_callback |
| Connection | Stored credentials for external systems | conn_id='postgres_default' |
| Variable | Key-value store for pipeline configuration | Variable.get('api_key') |
| SubDAG | Nested DAG for modular workflow composition (deprecated; prefer TaskGroup) | SubDagOperator(task_id='extract', subdag=extract_subdag) |
| TaskGroup | Logical grouping of tasks in the UI (Airflow 2.x) | with TaskGroup('extract') as extract_group: |
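Trigger rules decide whether a task runs based on its upstream states. The function below is a simplified, hypothetical model (should_run is not an Airflow API) that evaluates several of Airflow's built-in rule names once every upstream task has finished. Real Airflow also fires one_success and one_failed as soon as the condition is met, and records why a task did not run (for example upstream_failed or skipped) instead of returning a boolean.

```python
FAILED = {"failed", "upstream_failed"}

# Each rule maps the final upstream states to "run" (True) or "do not run" (False).
RULES = {
    "all_success": lambda states: all(s == "success" for s in states),
    "all_failed": lambda states: all(s in FAILED for s in states),
    "all_done": lambda states: True,  # every upstream has finished, by assumption
    "one_success": lambda states: any(s == "success" for s in states),
    "one_failed": lambda states: any(s in FAILED for s in states),
    "none_failed": lambda states: not any(s in FAILED for s in states),
    "none_skipped": lambda states: "skipped" not in states,
}


def should_run(trigger_rule, upstream_states):
    """Return True if a task with this trigger rule would run.

    Simplified model: assumes all upstream tasks are already in a final state.
    """
    if trigger_rule not in RULES:
        raise ValueError(f"unknown trigger rule: {trigger_rule}")
    return RULES[trigger_rule](list(upstream_states))


upstream = ["success", "failed", "skipped"]
for rule in ("all_success", "all_done", "one_failed", "none_failed"):
    print(rule, should_run(rule, upstream))
# all_success False, all_done True, one_failed True, none_failed False
```

A cleanup or alerting task typically uses all_done or one_failed so that it still runs after an upstream failure, which the default all_success would prevent.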
Pipeline Throughput
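For a DAG with n tasks, where task i takes time t_i, the critical path is the path with the largest total duration: L = max over all paths p of (sum of t_i for the tasks i on p). Even with unlimited workers, a run cannot finish faster than L, because each task on that path must wait for its predecessor. With P parallel worker slots and total work W = t_1 + t_2 + ... + t_n, a run also cannot finish faster than W / P, so the minimum pipeline duration is at least max(L, W / P). Adding parallelism helps only until W / P drops below L; beyond that point, the only way to shorten a run is to speed up tasks on the critical path. Effective throughput is therefore at most total_records / max(L, W / P).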
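The critical path and the max(L, W / P) bound can be computed directly. This sketch uses the standard library's graphlib.TopologicalSorter (Python 3.9+) to visit tasks in dependency order and a longest-path recurrence to find L; critical_path and min_duration_bound are hypothetical helpers, the task names and durations are made up, and the bound assumes every task occupies exactly one slot.

```python
from graphlib import TopologicalSorter


def critical_path(durations, edges):
    """Longest duration-weighted path through a task DAG.

    durations: {task_id: minutes}; edges: [(upstream, downstream), ...].
    Returns (L, [task ids on the critical path]).
    """
    upstream = {t: set() for t in durations}
    for up, down in edges:
        upstream[down].add(up)

    finish = {}       # earliest finish time of each task with unlimited workers
    best_parent = {}  # the upstream task that finishes last, i.e. gates this task
    for task in TopologicalSorter(upstream).static_order():
        start, best_parent[task] = 0, None
        for up in upstream[task]:
            if finish[up] > start:
                start, best_parent[task] = finish[up], up
        finish[task] = start + durations[task]

    # Walk back from the task that finishes last.
    end = max(finish, key=finish.get)
    path = []
    while end is not None:
        path.append(end)
        end = best_parent[end]
    return finish[path[0]], path[::-1]


def min_duration_bound(durations, edges, slots):
    """Lower bound on wall-clock duration: max(L, W / P)."""
    length, _ = critical_path(durations, edges)
    total_work = sum(durations.values())
    return max(length, total_work / slots)


durations = {"extract": 10, "transform_a": 30, "transform_b": 5, "load": 10}  # minutes
edges = [("extract", "transform_a"), ("extract", "transform_b"),
         ("transform_a", "load"), ("transform_b", "load")]
print(critical_path(durations, edges))          # (50, ['extract', 'transform_a', 'load'])
print(min_duration_bound(durations, edges, 1))  # 55.0
print(min_duration_bound(durations, edges, 2))  # 50
```

In this example transform_b has slack: with two slots it overlaps with transform_a and the bound stays at L = 50 minutes, while with one slot the total work (55 minutes) dominates.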
- Define the DAG: Create a DAG object with schedule_interval, start_date, catchup, max_active_runs, and default_args.
- Configure default_args: Set owner, retries, retry_delay, execution_timeout, on_failure_callback, and email for all tasks.
- Create tasks: Instantiate operators for each pipeline step (extract, transform, load, validate).
- Define dependencies: Use the >> and << operators to express task ordering: extract >> transform >> load >> validate.
- Add error handling: Configure trigger_rule, on_failure_callback, and on_retry_callback for graceful failure handling.
- Implement idempotency: Ensure each task produces the same output when re-run with the same execution_date.
- Test locally: Run airflow dags test <dag_id> <execution_date> to validate the DAG without scheduling.
- Deploy and monitor: Push to the dags/ directory, verify the DAG appears in the UI, and monitor the first run.
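The >> and << syntax works through Python operator overloading. The toy Task class below is hypothetical and is not Airflow's implementation, but it follows the same idea as Airflow's dependency operators: a >> b records b as downstream of a and returns b, so chains read left to right, and __rrshift__ handles a list on the left-hand side, since Python lists do not define >>.

```python
class Task:
    """Toy stand-in for an Airflow task; only tracks dependency edges."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.downstream = set()

    def set_downstream(self, other):
        # Accept a single task or a list of tasks (fan-out).
        for t in (other if isinstance(other, list) else [other]):
            self.downstream.add(t.task_id)
            t.upstream.add(self.task_id)

    def __rshift__(self, other):
        # a >> b: b runs after a. Returning b makes a >> b >> c chain.
        self.set_downstream(other)
        return other

    def __lshift__(self, other):
        # a << b: a runs after b.
        for t in (other if isinstance(other, list) else [other]):
            t.set_downstream(self)
        return other

    def __rrshift__(self, other):
        # [a, b] >> c: list has no >>, so Python calls c.__rrshift__([a, b]).
        for t in other:
            t.set_downstream(self)
        return self


extract, transform, load = Task("extract"), Task("transform"), Task("load")
validate_a, validate_b = Task("validate_a"), Task("validate_b")
extract >> transform >> [validate_a, validate_b]  # fan-out
[validate_a, validate_b] >> load                  # fan-in
print(sorted(load.upstream))  # ['validate_a', 'validate_b']
```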
Airflow 2.x vs 1.x: Airflow 2.x introduced the TaskFlow API (decorator-based task definition), the TaskGroup concept, a highly available scheduler with improved performance, a redesigned web UI, and, from 2.3, dynamic task mapping. Airflow 1.x is end-of-life, so use Airflow 2.x or later for new projects. The TaskFlow API (@dag and @task decorators) eliminates boilerplate for Python-heavy workflows by passing function return values between tasks as XComs automatically.
Production DAG Example
```python
from datetime import datetime, timedelta, timezone
import json
import logging

from airflow import DAG
from airflow.models import Variable
from airflow.operators.email import EmailOperator
from airflow.operators.python import PythonOperator
# Airflow 2.x location (apache-airflow-providers-amazon); airflow.sensors.s3_key is the 1.x path
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.utils.task_group import TaskGroup

logger = logging.getLogger(__name__)

default_args = {
    "owner": "data-engineering",  # Team responsible for this DAG
    "retries": 3,  # Auto-retry failed tasks up to 3 times
    "retry_delay": timedelta(minutes=5),  # Wait 5 minutes between retries
    "execution_timeout": timedelta(hours=2),  # Kill tasks after 2 hours
    "email_on_failure": True,  # Send email on final failure (requires SMTP configuration)
    "email": ["data-alerts@company.com"],  # Alert recipients
    "on_failure_callback": lambda context: logger.error(
        f"Task {context['task_instance'].task_id} failed"
    ),
}


def extract_orders(**context):
    """
    Extract orders from the source database for the run's logical date.

    Parameters (via Airflow context):
        ds (str): Logical date of this DAG run, formatted YYYY-MM-DD
        ti (TaskInstance): Task instance for XCom operations

    Returns:
        list: Order records as JSON-safe dicts for XCom serialization
    """
    # Import heavy libraries inside the task so DAG parsing stays fast
    import pandas as pd
    import sqlalchemy

    ds = context["ds"]
    engine = sqlalchemy.create_engine(Variable.get("source_db_connection"))
    # Bind the date as a query parameter instead of formatting it into the SQL string
    query = sqlalchemy.text(
        """
        SELECT order_id, customer_id, amount, status, created_at
        FROM orders
        WHERE DATE(created_at) = :ds
        """
    )
    df = pd.read_sql(query, engine, params={"ds": ds})
    logger.info(f"Extracted {len(df)} orders for {ds}")
    # Store row count in XCom for downstream monitoring
    context["ti"].xcom_push(key="row_count", value=len(df))
    # JSON round trip turns timestamps into ISO strings and NaN into None.
    # XComs live in the metadata DB; stage large extracts in object storage instead.
    return json.loads(df.to_json(orient="records", date_format="iso"))


def transform_orders(orders: list, **context):
    """
    Cleanse, deduplicate, and enrich order data.

    Parameters:
        orders (list): Raw order records from the extract task
        context (dict): Airflow task context

    Returns:
        list: Transformed order records with customer tier enrichment
    """
    import pandas as pd

    if not orders:  # Nothing extracted for this date
        context["ti"].xcom_push(key="transformed_count", value=0)
        return []
    df = pd.DataFrame(orders)
    df = df.drop_duplicates(subset=["order_id"])
    df = df[df["amount"] > 0]
    df["processed_at"] = datetime.now(timezone.utc)
    # Enrich with customer tier via a left merge
    customer_df = pd.read_parquet(Variable.get("customer_data_path"))
    df = df.merge(customer_df[["customer_id", "tier"]], on="customer_id", how="left")
    context["ti"].xcom_push(key="transformed_count", value=len(df))
    return json.loads(df.to_json(orient="records", date_format="iso"))


def load_to_warehouse(records: list, **context):
    """
    Load transformed records into the data warehouse.

    Parameters:
        records (list): Transformed order records from the transform task
        context (dict): Airflow task context

    Note: Deletes the run's date partition before inserting, so retries and
    re-runs do not duplicate rows (assumes orders_fact already exists).
    Uses chunked inserts (chunksize=1000) for memory efficiency with large
    datasets. Warehouses with MERGE/UPSERT support can use that instead.
    """
    import pandas as pd
    import sqlalchemy

    ds = context["ds"]
    engine = sqlalchemy.create_engine(Variable.get("warehouse_connection"))
    df = pd.DataFrame(records)
    for col in ("created_at", "processed_at"):
        if col in df:
            df[col] = pd.to_datetime(df[col])  # ISO strings from XCom back to timestamps
    # One transaction: the delete and the insert commit together or not at all
    with engine.begin() as conn:
        conn.execute(
            sqlalchemy.text("DELETE FROM orders_fact WHERE DATE(created_at) = :ds"),
            {"ds": ds},
        )
        if not df.empty:
            df.to_sql(
                "orders_fact",
                conn,
                if_exists="append",
                index=False,
                method="multi",  # Multi-row inserts for better performance
                chunksize=1000,  # Batch size to prevent memory issues
            )
    logger.info(f"Loaded {len(df)} records to warehouse")


with DAG(
    dag_id="daily_order_etl",
    default_args=default_args,
    description="Daily order extraction, transformation, and warehouse load",
    schedule="@daily",  # Use schedule_interval on Airflow versions before 2.4
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
    # Render templates as Python objects, so xcom_pull yields a list rather than its string form
    render_template_as_native_obj=True,
    tags=["orders", "etl", "production"],
) as dag:
    # Sensor: wait for source data to be available in S3
    wait_for_data = S3KeySensor(
        task_id="wait_for_source_data",
        bucket_name="raw-data-lake",
        bucket_key="orders/{{ ds_nodash }}/orders.csv",
        timeout=3600,
        poke_interval=60,
        mode="reschedule",  # Release the worker slot between pokes
    )

    with TaskGroup("etl_tasks") as etl_group:
        extract = PythonOperator(
            task_id="extract_orders",
            python_callable=extract_orders,
        )
        transform = PythonOperator(
            task_id="transform_orders",
            python_callable=transform_orders,
            op_kwargs={"orders": "{{ ti.xcom_pull(task_ids='etl_tasks.extract_orders') }}"},
        )
        load = PythonOperator(
            task_id="load_to_warehouse",
            python_callable=load_to_warehouse,
            op_kwargs={"records": "{{ ti.xcom_pull(task_ids='etl_tasks.transform_orders') }}"},
        )
        extract >> transform >> load

    notify_success = EmailOperator(
        task_id="notify_success",
        to=["data-alerts@company.com"],
        subject="Daily ETL completed for {{ ds }}",
        html_content="<p>Pipeline completed successfully.</p>",
        trigger_rule="all_success",
    )

    wait_for_data >> etl_group >> notify_success
```
Operator Comparison Table
| Operator | Use Case | Execution | Retry Support |
|---|---|---|---|
| BashOperator | Shell commands | Airflow worker | Yes |
| PythonOperator | Python functions | Airflow worker | Yes |
| SparkSubmitOperator | Spark jobs | Remote cluster | Yes |
| PostgresOperator (deprecated; use SQLExecuteQueryOperator) | SQL queries | Direct DB | Yes |
| S3KeySensor | Wait for S3 object | Polling | Yes; a sensor timeout fails without retrying |
| HttpSensor | Wait for API endpoint | Polling | Yes; a sensor timeout fails without retrying |
| DockerOperator | Container execution | Docker engine | Yes |
| KubernetesPodOperator | Pod execution | K8s cluster | Yes |
| BranchPythonOperator | Conditional branching | Airflow worker | Yes |
| EmptyOperator (DummyOperator before Airflow 2.3) | No-op placeholder | Scheduler (not sent to a worker) | N/A |
- Airflow treats workflows as Python code, enabling version control, testing, and CI/CD for pipeline definitions.
- DAGs encode task dependencies and scheduling; operators encapsulate atomic work units; sensors handle event-driven triggers.
- Airflow rejects a cyclic DAG when it parses the DAG file, so no run of a cyclic DAG is ever scheduled.
- Pipeline duration is bounded below by the critical path; parallelism lets off-critical-path tasks overlap with it, but only speeding up critical-path tasks shortens a run once worker slots are plentiful.
- Use catchup=False in production to avoid accidental backfilling of historical data.
- Always configure execution_timeout, retries, and on_failure_callback for production robustness.
- Airflow 2.x's TaskFlow API and TaskGroup significantly reduce boilerplate for Python-heavy workflows.
Best Practices
- Keep DAGs simple: Each DAG should represent a single logical workflow. Avoid nested SubDAGs; prefer TaskGroup for visual organization.
- Use catchup=False in production to prevent unintended backfilling. Enable catchup only when explicitly backfilling historical data.
- Set max_active_runs=1 for DAGs that modify shared state (e.g., database writes) to prevent race conditions.
- Parameterize with Variables and Connections, not hardcoded values. Store secrets in Airflow Connections or in an external secrets backend such as HashiCorp Vault.
- Implement idempotent tasks: every task must produce identical output when re-run with the same execution_date.
- Use Sensors in reschedule mode to free up worker slots while waiting; poke mode holds a slot for the entire wait duration.
- Configure execution_timeout on every task to prevent hung tasks from consuming resources indefinitely.
- Monitor with callbacks: set on_failure_callback and on_retry_callback to integrate with Slack, PagerDuty, or email.
- Test DAGs locally with airflow dags test before deploying to production. Validate task dependencies and data flow.
- Version control DAGs using Git. Deploy via CI/CD pipelines that run linting, type checking, and unit tests.
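Idempotency usually comes from overwriting a run's partition instead of appending to it. This sketch uses the standard library's sqlite3 module and a hypothetical orders_fact table with a ds column as a stand-in for a warehouse: the delete and the insert for one logical date run in a single transaction, so a retry or manual re-run with the same ds leaves the table in the same state.

```python
import sqlite3


def load_partition(conn, ds, rows):
    """Replace the rows for one logical date (ds) in a single transaction.

    Re-running with the same ds is safe, unlike a plain append,
    which would insert the same rows a second time.
    """
    with conn:  # commits on success, rolls back if either statement fails
        conn.execute("DELETE FROM orders_fact WHERE ds = ?", (ds,))
        conn.executemany(
            "INSERT INTO orders_fact (ds, order_id, amount) VALUES (?, ?, ?)",
            [(ds, r["order_id"], r["amount"]) for r in rows],
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders_fact (ds TEXT, order_id INTEGER, amount REAL)")
rows = [{"order_id": 1, "amount": 9.5}, {"order_id": 2, "amount": 20.0}]
load_partition(conn, "2024-01-15", rows)
load_partition(conn, "2024-01-15", rows)  # a retry or manual re-run
print(conn.execute("SELECT COUNT(*) FROM orders_fact").fetchone()[0])  # 2
```

Warehouses that support MERGE can achieve the same effect with an upsert keyed on the business key (here, order_id).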
Airflow Configuration Reference
| Parameter | Default | Recommended | Description |
|---|---|---|---|
| parallelism | 32 | 64-128 | Max tasks running across all DAGs |
| dag_concurrency (max_active_tasks_per_dag in Airflow 2.2+) | 16 | 32-64 | Max concurrently running tasks per DAG, across all of its runs |
| max_active_runs_per_dag | 16 | 1-4 | Concurrent DAG runs per DAG |
| worker_concurrency | 16 | 8-32 | Tasks per Celery worker |
| default_pool slots | 128 | Varies | Resource-limited task slots |
| email_on_retry | True | False | Reduce email noise |
| default_retry_delay | 5 min | 5-15 min | Wait between retries |
| max_retry_delay | 24 hours | 1-6 hours | Cap on exponential backoff |
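When retry_exponential_backoff=True is set on a task, retry delays grow with each attempt and max_retry_delay caps them. The sketch below assumes a simple doubling schedule to show how the cap interacts with retries and retry_delay; retry_delays is a hypothetical helper, and Airflow's own computation differs in detail (for example, it adds jitter), so treat the numbers as illustrative.

```python
from datetime import timedelta


def retry_delays(retries, retry_delay, max_retry_delay):
    """Delay before each retry under plain doubling, capped at max_retry_delay.

    Simplified model, not Airflow's exact formula.
    """
    return [min(retry_delay * 2 ** attempt, max_retry_delay) for attempt in range(retries)]


delays = retry_delays(5, timedelta(minutes=5), timedelta(hours=1))
print([int(d.total_seconds() // 60) for d in delays])  # [5, 10, 20, 40, 60]
```

Without the cap, the fifth retry would wait 80 minutes; with a 1-hour max_retry_delay, later retries stay at 60 minutes, which keeps a flaky task from silently pushing a daily run past its SLA.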
See Also
- 018 - Advanced Airflow: XComs, Dynamic DAGs, and the TaskFlow API - Advanced patterns for complex workflows
- 029 - Prefect: Modern Workflow Orchestration - Alternative Python-native orchestration
- 016 - ETL vs ELT: Choosing the Right Approach - Pipeline paradigm selection
- 028 - Error Handling, Retries, and Dead Letter Queues - Production error handling patterns
- 027 - Pipeline Monitoring and Observability - Monitoring Airflow pipelines