DAG Design Patterns
Architecture Diagram
Formal Definitions
Definition: Task Dependency
A task dependency is a directed edge $(t_i, t_j)$ in the DAG indicating that task $t_j$ cannot execute until $t_i$ completes. Dependencies enforce execution ordering via $t_i \rightarrow t_j$, meaning $t_i$ must finish before $t_j$ begins.
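The ordering constraint can be illustrated without Airflow. The sketch below uses Python's standard-library graphlib to derive one valid execution order from a dependency map and to reject a graph that contains a cycle. The task names and the execution_order helper are hypothetical, chosen only for illustration.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks).
# Task names are hypothetical.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "validate": {"extract"},
    "load": {"transform", "validate"},
}


def execution_order(deps):
    """Return one valid execution order, or raise ValueError on a cycle."""
    try:
        return list(TopologicalSorter(deps).static_order())
    except CycleError as exc:
        # The second element of args holds the detected cycle.
        raise ValueError(f"not a DAG, cycle detected: {exc.args[1]}") from exc


order = execution_order(dependencies)
print(order)
```

Any order it returns places every upstream task before its downstream tasks. A scheduler makes the same guarantee when it resolves dependencies.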
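Definition: Idempotency
A task $f$ is idempotent if executing it $n$ times with the same input $x$ produces the same output as executing it once: $f^n(x) = f(x)$ for all $n \geq 1$, where $f^n$ denotes $n$ successive executions. This property is essential for safe retries in distributed workflow systems.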
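As a minimal sketch of the difference, the example below contrasts a blind append with an upsert keyed on a unique ID. The in-memory "table" and both helper functions are hypothetical stand-ins for a real database write.

```python
def append_row(table, row):
    """Non-idempotent: every retry adds another copy of the row."""
    return table + [row]


def upsert_row(table, row):
    """Idempotent: rows are keyed by a unique ID, so a retry overwrites in place."""
    updated = dict(table)
    updated[row["id"]] = row
    return updated


row = {"id": 42, "amount": 10.0}

once = upsert_row({}, row)
twice = upsert_row(once, row)  # simulated retry of the same task

appended_twice = append_row(append_row([], row), row)

print(once == twice)        # the upsert leaves the state unchanged on retry
print(len(appended_twice))  # the append duplicates the row
```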
Definition: Fan-Out / Fan-In
Fan-out is the pattern where a single task spawns $k$ parallel downstream tasks. Fan-in is the convergence of $k$ parallel branches into a single aggregation task. The fan-out factor is $k$.
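The shape of the pattern can be sketched in plain Python, with a thread pool standing in for parallel task execution. The partition, process_chunk, and fan_out_fan_in helpers are hypothetical; in Airflow each chunk would be a separate task.

```python
from concurrent.futures import ThreadPoolExecutor


def partition(items, k):
    """Split items into k nearly equal chunks (round-robin)."""
    return [items[i::k] for i in range(k)]


def process_chunk(chunk):
    """Stand-in for one parallel branch: sum of squares of its chunk."""
    return sum(x * x for x in chunk)


def fan_out_fan_in(items, k):
    chunks = partition(items, k)
    with ThreadPoolExecutor(max_workers=k) as pool:
        partials = list(pool.map(process_chunk, chunks))  # fan-out: k branches
    return sum(partials)  # fan-in: single aggregation step


total = fan_out_fan_in(list(range(10)), k=3)
print(total)
```

The aggregate is independent of how the work is partitioned, which is what allows the fan-out factor to be tuned freely.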
Detailed Explanation
DAG Composition Fundamentals
A DAG in Airflow represents a collection of tasks with organized dependencies. The DAG object defines structure and schedule, while tasks perform actual work.
DAG Definition Requirements:
| Parameter | Description | Example |
|---|---|---|
| dag_id | Unique identifier | 'my_etl_pipeline' |
| start_date | First possible execution | datetime(2024, 1, 1) |
| schedule_interval | When to run | '@daily', timedelta(hours=1) |
| catchup | Backfill missed runs | True or False |
Task Dependencies:
- Use >> and << operators: task1 >> task2
- Fan-in: [task1, task2] >> task3
- Programmatic: set_upstream() / set_downstream()
Default Arguments: Common configuration applied to all tasks, such as retry behavior, email settings, and timeouts. Task-specific arguments override the defaults.
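The override rule can be pictured as a dictionary merge in which task-level values win. This is a conceptual model only, not Airflow's internal implementation, and the effective_args helper is hypothetical.

```python
from datetime import timedelta

default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def effective_args(defaults, **task_args):
    """Task-level arguments take precedence over DAG-level defaults."""
    return {**defaults, **task_args}


extract_args = effective_args(default_args)          # inherits every default
load_args = effective_args(default_args, retries=3)  # overrides retries only

print(extract_args["retries"], load_args["retries"])
```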
Advanced Design Patterns
| Pattern | Description | Use Case |
|---|---|---|
| Branching | Conditional paths based on runtime | Decision logic |
| Fan-Out/Fan-In | Parallel processing + aggregation | Data partitioning |
| SubDAG | Reusable workflow templates (deprecated) | Legacy systems |
| Dynamic DAGs | Config-driven generation | Multi-tenant environments |
| TaskGroup | Logical grouping (preferred over SubDAG) | Complex pipelines |
Task Configuration Best Practices
| Practice | Why | How |
|---|---|---|
| Idempotency | Safe retries without corruption | Use upserts, unique IDs |
| Atomic Operations | Easier debugging & retries | One task = one operation |
| Data Passing | Performance optimization | XCom (small) vs S3/GCS (large) |
| Resource Management | Fair scheduling | Set resources parameter |
Error Handling and Recovery
Retry Logic: Configure exponential backoff for transient failures.
$$d_n = d_0 \cdot 2^n$$
Here,
- $d_0$ = Base retry delay in seconds
- $n$ = Zero-indexed attempt number
- $d_n$ = Wait time before the next retry
Use retry_exponential_backoff=True with retry_delay=timedelta(seconds=30) for production workloads. The nominal delay starts at 30s and doubles each attempt (30s, 60s, 120s, 240s, ...); set max_retry_delay to cap it. Spacing retries out this way reduces thundering-herd load on external services.
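The nominal schedule is easy to tabulate. The sketch below computes a doubling delay per attempt with an optional cap. The backoff_delays helper is hypothetical, and Airflow's actual retry timing may differ slightly (for example through jitter), so treat the numbers as the idealized schedule rather than exact wait times.

```python
def backoff_delays(base_seconds, attempts, max_seconds=None):
    """Nominal delay before each retry: base * 2**n, optionally capped."""
    delays = []
    for n in range(attempts):  # n is the zero-indexed attempt number
        delay = base_seconds * 2 ** n
        if max_seconds is not None:
            delay = min(delay, max_seconds)
        delays.append(delay)
    return delays


uncapped = backoff_delays(30, 4)
capped = backoff_delays(30, 6, max_seconds=300)

print(uncapped)  # [30, 60, 120, 240]
print(capped)    # [30, 60, 120, 240, 300, 300]
```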
Advanced DAG Patterns
TaskGroup is preferred over SubDAG for new implementations: it provides better UI integration and simpler debugging, and it avoids the separate scheduling overhead that a SubDAG incurs. SubDAG is deprecated.
DAG Critical Path Length
$$L_{\text{crit}} = \max_{p \in P} \sum_{t \in p} w(t)$$
Here,
- $P$ = Set of all paths from source to sink
- $w(t)$ = Execution time of task $t$
- $L_{\text{crit}}$ = Longest path through the DAG (minimum possible completion time)
Theorem: Idempotency (Retry Safety)
If a task $f$ is idempotent, then for any number of retries $n \geq 1$, the final state satisfies $f^n(x) = f(x)$. This guarantees that retries do not cause data corruption or duplication. Proof sketch: By induction, $f^{n+1}(x) = f(f^n(x)) = f(f(x))$, and $f(f(x)) = f(x)$ by idempotency.
When designing fan-out/fan-in patterns, aim for a balanced fan-out factor. If the $k$ parallel tasks have highly variable execution times $w(t_1), \dots, w(t_k)$, the aggregate task must wait for $\max_i w(t_i)$. Consider partitioning work evenly to minimize idle time.
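The cost of imbalance can be quantified with a short calculation. The durations below are hypothetical, and fan_in_wait and idle_time are illustrative helpers rather than Airflow functions.

```python
def fan_in_wait(durations):
    """The aggregation task can start only after the slowest branch finishes."""
    return max(durations)


def idle_time(durations):
    """Total time that finished branches spend waiting on the slowest one."""
    slowest = max(durations)
    return sum(slowest - d for d in durations)


skewed = [2, 3, 25]      # minutes per branch, 30 minutes of work in total
balanced = [10, 10, 10]  # the same 30 minutes, partitioned evenly

print(fan_in_wait(skewed), idle_time(skewed))      # 25 45
print(fan_in_wait(balanced), idle_time(balanced))  # 10 0
```

With the same total work, the balanced partition lets the aggregation start 15 minutes earlier and leaves no branch idle.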
Timeouts: Always set execution_timeout to prevent tasks from hanging indefinitely. This protects against infinite loops and external service failures.
Callbacks: Use on_failure_callback and on_success_callback for custom notification and logging. Callbacks can integrate with external monitoring systems and alerting mechanisms.
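A failure callback receives the task context as a dictionary. The sketch below builds an alert message from the task_instance and exception entries of that context. The build_failure_message helper and the fake context are hypothetical, and print stands in for whatever alerting client is actually used.

```python
from types import SimpleNamespace


def build_failure_message(context):
    """Format a short alert from the context passed to a failure callback."""
    ti = context["task_instance"]
    exception = context.get("exception")
    return f"Task {ti.dag_id}.{ti.task_id} failed (try {ti.try_number}): {exception!r}"


def alert_on_failure(context):
    """Suitable as on_failure_callback; swap print for a real alerting call."""
    message = build_failure_message(context)
    print(message)
    return message


# Simulated context for local testing; Airflow supplies the real one at runtime.
fake_context = {
    "task_instance": SimpleNamespace(dag_id="etl", task_id="load", try_number=2),
    "exception": ValueError("connection reset"),
}
message = alert_on_failure(fake_context)
```

Keeping the formatting in a separate pure function makes the callback easy to unit test without a running scheduler.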
Partial Failure Handling: In complex workflows, implement strategies for handling partial failures. Use TriggerRule to control task execution based on upstream task states.
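To see why the trigger rule matters at a merge point, the simplified model below evaluates three rules against a list of upstream states. It is a sketch of the documented semantics, not Airflow's scheduler code, and it only checks tasks whose upstream tasks have all finished. The should_run helper is hypothetical.

```python
FINISHED = {"success", "failed", "skipped", "upstream_failed"}


def should_run(trigger_rule, upstream_states):
    """Simplified trigger-rule check once every upstream task has finished."""
    states = list(upstream_states)
    if not all(s in FINISHED for s in states):
        raise ValueError("this model only handles finished upstream tasks")
    any_failed = any(s in ("failed", "upstream_failed") for s in states)
    if trigger_rule == "all_success":
        return all(s == "success" for s in states)
    if trigger_rule == "none_failed_min_one_success":
        return not any_failed and "success" in states
    if trigger_rule == "all_done":
        return True
    raise ValueError(f"unsupported trigger rule: {trigger_rule}")


after_branch = ["success", "skipped", "skipped"]  # one branch chosen, two skipped
after_failure = ["success", "failed", "skipped"]

print(should_run("all_success", after_branch))                   # False
print(should_run("none_failed_min_one_success", after_branch))   # True
print(should_run("none_failed_min_one_success", after_failure))  # False
```

Under the default all_success rule a merge task after a branch would not run, because the unselected branches are skipped rather than successful.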
Key Concepts Table
| Pattern | Use Case | Complexity | Parallelism | Best For |
|---|---|---|---|---|
| Linear | Sequential processing | Low | None | Simple ETL |
| Branching | Conditional logic | Medium | Limited | Decision workflows |
| Fan-Out/Fan-In | Parallel processing | Medium | High | Data partitioning |
| TaskGroup | Logical grouping | Low | Full | Complex pipelines |
| SubDAG | Reusable components | High | Full | Template workflows (deprecated) |
| Dynamic DAGs | Configuration-driven | High | Full | Multi-tenant systems |
| Cross-Sensor | External dependencies | Medium | Limited | Event-driven workflows |
| Backfill | Historical data recovery | Low | Configurable | Data recovery |
Pattern Selection Guide
Code Examples
Advanced Branching Pattern
# branching_pattern.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.utils.trigger_rule import TriggerRule

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


def determine_processing_path(**context):
    """Determine which processing path to take based on data characteristics."""
    import random

    # Simulate data analysis
    data_volume = random.randint(1, 1000)
    data_quality = random.choice(['high', 'medium', 'low'])

    # Store metadata for downstream tasks
    context['ti'].xcom_push(key='data_volume', value=data_volume)
    context['ti'].xcom_push(key='data_quality', value=data_quality)

    # Decision logic: return the task_id of the branch to follow
    if data_volume > 500:
        return 'process_large_dataset'
    elif data_quality == 'low':
        return 'data_quality_check'
    else:
        return 'standard_processing'


def process_large_dataset(**context):
    """Process large datasets with distributed processing."""
    data_volume = context['ti'].xcom_pull(
        task_ids='determine_path',
        key='data_volume'
    )
    print(f"Processing large dataset with volume: {data_volume}")
    # Implementation for large dataset processing


def data_quality_check(**context):
    """Perform data quality checks and remediation."""
    data_quality = context['ti'].xcom_pull(
        task_ids='determine_path',
        key='data_quality'
    )
    print(f"Data quality is {data_quality}, performing checks...")
    # Implementation for data quality checks


def standard_processing(**context):
    """Standard data processing pipeline."""
    print("Executing standard processing pipeline")
    # Standard processing implementation


def aggregate_results(**context):
    """Aggregate results from all processing paths."""
    # Use TriggerRule to execute even if some branches are skipped
    print("Aggregating results from all processing paths")


with DAG(
    'advanced_branching_pattern',
    default_args=default_args,
    description='Advanced branching pattern with multiple conditions',
    schedule_interval=timedelta(hours=6),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['branching', 'advanced'],
) as dag:
    start = EmptyOperator(task_id='start')

    determine_path = BranchPythonOperator(
        task_id='determine_path',
        python_callable=determine_processing_path,
    )

    process_large = PythonOperator(
        task_id='process_large_dataset',
        python_callable=process_large_dataset,
    )

    quality_check = PythonOperator(
        task_id='data_quality_check',
        python_callable=data_quality_check,
    )

    standard_proc = PythonOperator(
        task_id='standard_processing',
        python_callable=standard_processing,
    )

    # Merge point - uses TriggerRule to handle skipped branches
    aggregate = PythonOperator(
        task_id='aggregate_results',
        python_callable=aggregate_results,
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    end = EmptyOperator(task_id='end')

    # Define dependencies
    start >> determine_path
    determine_path >> [process_large, quality_check, standard_proc]
    [process_large, quality_check, standard_proc] >> aggregate >> end
Dynamic DAG Generation
# dynamic_dag_generation.py
import json
import os
from datetime import datetime, timedelta
from typing import Any, Dict

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


class DynamicDAGFactory:
    """Factory for creating dynamic DAGs based on configuration."""

    def __init__(self, config_path: str):
        self.config = self._load_config(config_path)

    def _load_config(self, config_path: str) -> Dict[str, Any]:
        """Load DAG configuration from JSON file."""
        with open(config_path, 'r') as f:
            return json.load(f)

    def create_dag(self, dag_id: str, schedule: str) -> DAG:
        """Create a DAG based on configuration."""
        default_args = {
            'owner': self.config.get('owner', 'airflow'),
            'depends_on_past': False,
            'retries': self.config.get('retries', 1),
            'retry_delay': timedelta(
                minutes=self.config.get('retry_delay_minutes', 5)
            ),
        }

        with DAG(
            dag_id=dag_id,
            default_args=default_args,
            description=self.config.get('description', 'Dynamic DAG'),
            schedule_interval=schedule,
            start_date=datetime(2024, 1, 1),
            catchup=self.config.get('catchup', False),
            tags=self.config.get('tags', []),
        ) as dag:
            # Create tasks from configuration
            tasks = {}
            for task_config in self.config['tasks']:
                task_id = task_config['id']
                task_type = task_config['type']
                if task_type == 'python':
                    tasks[task_id] = PythonOperator(
                        task_id=task_id,
                        python_callable=self._get_callable(
                            task_config['callable']
                        ),
                        op_kwargs=task_config.get('kwargs', {}),
                    )
                elif task_type == 'bash':
                    # BashOperator takes bash_command, not command
                    tasks[task_id] = BashOperator(
                        task_id=task_id,
                        bash_command=task_config['command'],
                    )
                else:
                    raise ValueError(f"Unsupported task type: {task_type}")

            # Set dependencies in a second pass so that the order of
            # task entries in the configuration does not matter
            for task_config in self.config['tasks']:
                for dep in task_config.get('dependencies', []):
                    tasks[dep] >> tasks[task_config['id']]

        return dag

    def _get_callable(self, callable_name: str):
        """Get callable function by name."""
        # In practice, this would map to actual functions
        def generic_callable(**context):
            print(f"Executing {callable_name}")
            return {"status": "success"}
        return generic_callable


# Example configuration: the expected contents of config.json, shown here
# as a Python dict (in the JSON file itself, write false instead of False)
config = {
    "owner": "data_engineering",
    "description": "Dynamic ETL pipeline",
    "retries": 2,
    "retry_delay_minutes": 10,
    "catchup": False,
    "tags": ["dynamic", "etl"],
    "tasks": [
        {
            "id": "extract",
            "type": "python",
            "callable": "extract_data",
            "dependencies": [],
            "kwargs": {"source": "database"}
        },
        {
            "id": "transform",
            "type": "python",
            "callable": "transform_data",
            "dependencies": ["extract"],
            "kwargs": {"transformations": ["clean", "aggregate"]}
        },
        {
            "id": "load",
            "type": "bash",
            "command": "echo 'Loading data to warehouse'",
            "dependencies": ["transform"]
        }
    ]
}

# Create DAGs for multiple datasets
datasets = ['users', 'orders', 'products']

# Resolve config.json relative to this file; the scheduler's working
# directory is not necessarily the DAGs folder
factory = DynamicDAGFactory(
    os.path.join(os.path.dirname(__file__), 'config.json')
)

for dataset in datasets:
    dag_id = f'dynamic_etl_{dataset}'
    # Registering the DAG in globals() lets Airflow discover it
    globals()[dag_id] = factory.create_dag(
        dag_id=dag_id,
        schedule='@daily'
    )
TaskGroup with Advanced Dependencies
# taskgroup_advanced.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from airflow.utils.trigger_rule import TriggerRule

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


def extract_from_source(source_name: str, **context):
    """Extract data from a specific source."""
    print(f"Extracting data from {source_name}")
    # Simulate extraction
    import time
    time.sleep(2)
    return {"source": source_name, "records": 1000}


def transform_data(data: dict, **context):
    """Transform extracted data."""
    print(f"Transforming data from {data['source']}")
    # Simulate transformation
    return {"transformed": True, "source": data['source']}


def load_to_warehouse(data: dict, **context):
    """Load transformed data to data warehouse."""
    print(f"Loading {data['source']} data to warehouse")
    # Simulate loading
    return {"loaded": True, "source": data['source']}


def validate_load(data: dict, **context):
    """Validate loaded data."""
    print(f"Validating load for {data['source']}")
    # Simulate validation
    return {"validated": True, "source": data['source']}


with DAG(
    'taskgroup_advanced_example',
    default_args=default_args,
    description='Advanced TaskGroup pattern with complex dependencies',
    schedule_interval=timedelta(hours=12),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['taskgroup', 'advanced'],
) as dag:
    start = EmptyOperator(task_id='start')

    # Define multiple data sources
    data_sources = ['database', 'api', 'file']

    # Create TaskGroups for each source
    with TaskGroup('extraction_group') as extraction_group:
        extraction_tasks = {}
        for source in data_sources:
            extraction_tasks[source] = PythonOperator(
                task_id=f'extract_{source}',
                python_callable=extract_from_source,
                op_kwargs={'source_name': source},
            )

    # Transform TaskGroup
    with TaskGroup('transformation_group') as transformation_group:
        transform_tasks = {}
        for source in data_sources:
            transform_tasks[source] = PythonOperator(
                task_id=f'transform_{source}',
                python_callable=transform_data,
                op_kwargs={
                    'data': extraction_tasks[source].output
                },
            )

    # Loading TaskGroup
    with TaskGroup('loading_group') as loading_group:
        load_tasks = {}
        for source in data_sources:
            load_tasks[source] = PythonOperator(
                task_id=f'load_{source}',
                python_callable=load_to_warehouse,
                op_kwargs={
                    'data': transform_tasks[source].output
                },
            )

    # Validation TaskGroup
    with TaskGroup('validation_group') as validation_group:
        validation_tasks = {}
        for source in data_sources:
            validation_tasks[source] = PythonOperator(
                task_id=f'validate_{source}',
                python_callable=validate_load,
                op_kwargs={
                    'data': load_tasks[source].output
                },
                trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
            )

    end = EmptyOperator(
        task_id='end',
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    # Connect TaskGroups
    start >> extraction_group
    extraction_group >> transformation_group
    transformation_group >> loading_group
    loading_group >> validation_group
    validation_group >> end
Performance Metrics
DAG Performance Characteristics
| Metric | Description | Best Practice | Warning Threshold |
|---|---|---|---|
| DAG Parse Time | Time to parse DAG file | < 5 seconds | > 10 seconds |
| Task Dependency Resolution | Time to resolve dependencies | < 1 second per task | > 2 seconds |
| Parallel Execution | Concurrent task execution | Maximize based on resources | 0 (no parallelism) |
| Data Transfer | XCom data size | < 48KB | > 48KB |
| Memory Usage | DAG memory footprint | < 100MB per DAG | > 500MB |
| Task Instance Overhead | Per-task overhead | < 100ms for Python tasks | > 500ms |
| Dependency Chain Depth | Maximum nesting level | < 10 levels | > 15 levels |
| Task Count per DAG | Number of tasks | < 100 | > 200 |
Critical Path Analysis
The critical path determines the minimum execution time for a DAG. Understanding this helps optimize workflow duration.
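Critical Path Calculation: $L_{\text{crit}} = \max_{p \in P} \sum_{t \in p} w(t)$, where $P$ is the set of source-to-sink paths and $w(t)$ is the execution time of task $t$.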
For the example above: minutes
Optimization Strategy: Focus optimization efforts on critical path tasks to reduce overall DAG duration.
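To find which tasks lie on the critical path, the longest path can be computed directly from task durations and downstream edges. The sketch below uses hypothetical task names and durations in minutes, and assumes the graph is acyclic. The critical_path helper is illustrative, not an Airflow API.

```python
from functools import lru_cache

# Hypothetical durations (minutes) and downstream edges for a small DAG.
durations = {"extract": 5, "transform_a": 12, "transform_b": 7, "load": 4}
downstream = {
    "extract": ["transform_a", "transform_b"],
    "transform_a": ["load"],
    "transform_b": ["load"],
    "load": [],
}


def critical_path(durations, downstream):
    """Return (length, path) of the longest duration-weighted path."""

    @lru_cache(maxsize=None)
    def longest_from(task):
        best_len, best_path = 0, ()
        for nxt in downstream[task]:
            length, path = longest_from(nxt)
            if length > best_len:
                best_len, best_path = length, path
        return durations[task] + best_len, (task,) + best_path

    # Trying every task as a starting point covers DAGs with several sources.
    return max((longest_from(t) for t in durations), key=lambda r: r[0])


length, path = critical_path(durations, downstream)
print(length, path)  # 21 ('extract', 'transform_a', 'load')
```

Here speeding up transform_b would not shorten the run, because it is not on the critical path. Only extract, transform_a, and load determine the 21-minute lower bound.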
Best Practices
1. DAG Naming Conventions
Use consistent naming patterns like {team}_{frequency}_{use_case}, as in the example below. Include descriptive tags for filtering and organization.
# Good naming convention
DAG_ID = "marketing_daily_email_campaigns"

# Include descriptive tags
tags = ['marketing', 'daily', 'email', 'production']

# Include owner and team information
default_args = {
    'owner': 'marketing-team',
    'team': 'customer-engagement',
    'priority_weight': 10,
}
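A naming convention is easier to keep when it is checked automatically, for example in CI. The sketch below follows the layout of the example ID above (team, then frequency, then use case). The parse_dag_id helper and the list of allowed frequencies are assumptions made for illustration, and the parser assumes team names contain no underscores.

```python
import re

KNOWN_FREQUENCIES = {"hourly", "daily", "weekly", "monthly"}  # assumed vocabulary
SEGMENT = re.compile(r"[a-z0-9]+")


def parse_dag_id(dag_id):
    """Split a DAG ID into (team, frequency, use_case) or raise ValueError."""
    parts = dag_id.split("_")
    if len(parts) < 3 or not all(SEGMENT.fullmatch(p) for p in parts):
        raise ValueError(f"expected lowercase team_frequency_use_case, got {dag_id!r}")
    team, frequency, use_case = parts[0], parts[1], "_".join(parts[2:])
    if frequency not in KNOWN_FREQUENCIES:
        raise ValueError(f"unknown frequency {frequency!r} in {dag_id!r}")
    return team, frequency, use_case


parsed = parse_dag_id("marketing_daily_email_campaigns")
print(parsed)  # ('marketing', 'daily', 'email_campaigns')
```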
2. Task Granularity
Balance between too many small tasks and too few large tasks. Aim for tasks that complete in 5-30 minutes with clear boundaries.
# Too fine-grained (many tiny tasks)
task1 = PythonOperator(task_id='validate_column_a', ...)
task2 = PythonOperator(task_id='validate_column_b', ...)
task3 = PythonOperator(task_id='validate_column_c', ...)
# This creates excessive overhead
# Too coarse-grained (monolithic task)
task1 = PythonOperator(task_id='validate_all_data', ...)
# This loses retry granularity and visibility
# Good granularity (balanced)
task1 = PythonOperator(task_id='validate_schema', ...) # 5 min
task2 = PythonOperator(task_id='validate_business_rules', ...) # 10 min
task3 = PythonOperator(task_id='validate_referential', ...) # 8 min
3. Dependency Management
Use explicit dependencies over implicit ones. Prefer >> operator syntax for readability. Avoid complex dependency patterns that are hard to understand.
# Clear, readable dependencies
task_a >> task_b >> task_c
[task_d, task_e] >> task_f
task_f >> [task_g, task_h]
# Avoid this (hard to read)
task_a.set_downstream(task_b)
task_b.set_downstream(task_c)
task_c.set_downstream(task_d)
4. Error Handling
Implement comprehensive error handling with retries and callbacks. Use TriggerRule to handle partial failures gracefully.
from datetime import timedelta

from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

# alert_on_failure and notify_success are callback functions defined elsewhere
default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30),
    'on_failure_callback': alert_on_failure,
    'on_success_callback': notify_success,
    'execution_timeout': timedelta(hours=2),
}

# Merge point with appropriate trigger rule
merge_task = PythonOperator(
    task_id='merge_results',
    trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    # ... remaining arguments elided
)
5. Testing
Write unit tests for individual tasks and integration tests for DAG structures. Use Airflow's test utilities to validate task dependencies.
# tests/test_dag_integrity.py
import pytest
from airflow.models import DagBag


@pytest.fixture(scope="session")
def dagbag():
    return DagBag(dag_folder="/opt/airflow/dags", include_examples=False)


def test_dags_import(dagbag):
    """Verify all DAGs can be imported."""
    assert dagbag.import_errors == {}


def test_dag_dependencies(dagbag):
    """Verify DAG dependencies are valid."""
    for dag_id, dag in dagbag.dags.items():
        assert dag is not None
        for task in dag.tasks:
            # Verify all upstream tasks exist
            for upstream in task.upstream_list:
                assert upstream.task_id in [t.task_id for t in dag.tasks]
6. Documentation
Include docstrings for DAGs and tasks. Use descriptive task IDs and add notes for complex logic. Maintain a data catalog for pipeline documentation.
"""
Daily Email Campaign DAG
This DAG processes email campaigns daily:
1. Extracts recipient lists from CRM
2. Validates email addresses
3. Sends personalized emails via SendGrid
4. Logs delivery status
SLA: Must complete by 6 AM UTC
Owner: marketing-team
On-Call: #marketing-oncall Slack channel
"""
7. Monitoring
Implement task-level monitoring with custom metrics. Set up alerts for task failures and performance degradation. Use Airflow's callback system for notifications.
8. Resource Management
Set appropriate resource requirements for tasks. Use pools to control concurrency for resource-intensive operations.
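# Resource-limited task (train_model is defined elsewhere)
heavy_computation = PythonOperator(
    task_id='ml_training',
    python_callable=train_model,
    pool='ml_pool',  # Limit to 2 concurrent instances (pool configured with 2 slots)
    # Keys must match Airflow's Resources arguments (cpus, ram in MB, disk, gpus);
    # the setting only has an effect on executors that honor it
    resources={'cpus': 4, 'ram': 8192},  # 4 CPU cores, 8GB RAM
    priority_weight=10,  # Higher priority in queue
)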
9. Version Control
Treat DAGs as code with proper version control. Use branching strategies for development and deployment. Implement CI/CD pipelines for DAG deployment.
10. Performance Optimization
Profile DAG performance regularly. Optimize task dependencies to maximize parallelism. Use caching for expensive operations and external storage for large datasets.
Key Takeaways:
- DAGs must be acyclic; the critical path determines minimum completion time
- Retry backoff follows exponential growth: $d_n = d_0 \cdot 2^n$
- Idempotency ensures retry safety: $f^n(x) = f(x)$
- Fan-out/fan-in patterns maximize parallelism; balance task sizes for optimal throughput
- Use TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS at merge points after branching
- TaskGroup provides logical grouping with better UI integration than SubDAG
See Also
- Airflow Architecture: Core architecture, scheduler, executor, and metadata database
- Operators and Hooks: Built-in operators, custom operators, and hooks for extensibility
- XCom Communications: Task communication, XCom backends, and data passing patterns
- Branching Logic: BranchPythonOperator, ShortCircuitOperator, and conditional workflows
- PySpark Submit: Submitting PySpark jobs with Airflow orchestration
- Kafka Connect: Kafka integration patterns for data pipelines
- Data Engineering Orchestration: End-to-end orchestration best practices