Apache Airflow Architecture
Formal Definitions
Directed Acyclic Graph (DAG)
A DAG is a finite directed graph with no directed cycles. In Airflow, a DAG is a collection of tasks with organized dependencies, defining the execution order and schedule of a workflow. Formally, $G = (V, E)$, where $V = \{t_1, \dots, t_n\}$ is the set of tasks and $E \subseteq V \times V$ is the set of dependency edges, with no cycle $t_{i_1} \to t_{i_2} \to \cdots \to t_{i_k} \to t_{i_1}$.
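The sketch below shows how $G = (V, E)$ defines an execution order. It assumes the DAG is a plain dict that maps each task to its upstream tasks, which is a stand-in for the edge set $E$ and not Airflow's internal structure. The standard-library graphlib.TopologicalSorter groups tasks into "waves": every task in a wave depends only on tasks in earlier waves. The task names are hypothetical.

```python
from graphlib import TopologicalSorter


def execution_waves(upstream):
    """Group tasks into waves; each task depends only on tasks in earlier waves.

    upstream maps task -> iterable of upstream tasks (the edge set E).
    Raises graphlib.CycleError if the graph is not acyclic.
    """
    sorter = TopologicalSorter(upstream)
    sorter.prepare()  # detects cycles before any task is handed out
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks whose upstreams are all done
        waves.append(ready)
        sorter.done(*ready)  # mark the whole wave as finished
    return waves


# Hypothetical ETL DAG: extract -> {clean, validate} -> load
dag = {
    "extract": set(),
    "clean": {"extract"},
    "validate": {"extract"},
    "load": {"clean", "validate"},
}
print(execution_waves(dag))  # [['extract'], ['clean', 'validate'], ['load']]
```

Tasks in the same wave have no edges between them, so an executor with free slots may run them concurrently.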
Scheduler
The scheduler is the core orchestrator component responsible for triggering scheduled workflows, monitoring task instances, and managing dependencies. It runs a heartbeat-driven loop with interval $\tau_{\text{hb}}$ (the scheduler_heartbeat_sec setting, 5 seconds by default), creating DagRuns and queuing task instances for execution.
Executor
An executor determines how task instances are executed. It abstracts the execution mechanism, mapping tasks to compute resources. Parallelism is bounded by $P_{\max} = \min(S, R)$, where $S$ is the number of available executor slots and $R$ is the count of tasks ready for execution.
Metadata Database
The metadata database (typically PostgreSQL) stores all state information about DAGs, task instances, connections, variables, and other Airflow objects. It serves as the central coordination point ensuring consistency across all distributed components via ACID transactions.
Detailed Explanation
Apache Airflow is a platform for programmatically authoring, scheduling, and monitoring workflows. It follows a modular architecture with several key components working together.
Core Components
| Component | Description |
|---|---|
| Web Server | Flask-based UI for monitoring DAGs, task instances, and logs |
| Scheduler | Heartbeat-based orchestrator that triggers workflows and manages dependencies |
| Metadata Database | Central state store (PostgreSQL or MySQL in production; SQLite only for local development) |
| Executor | Abstraction layer for task execution strategies |
| Workers | Compute nodes that execute tasks (Celery workers or Kubernetes pods) |
| Triggerer | Runs asyncio triggers for deferred operators and sensors, freeing worker slots while they wait |
Key Insight: The executor layer lets you switch execution strategies without modifying DAGs.
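The executor is a deployment setting: [core] executor in airflow.cfg, which the AIRFLOW__CORE__EXECUTOR environment variable can override. DAG files therefore stay the same when the executor changes. The sketch below is a simplified model of that lookup, not Airflow's configuration code. resolve_option is hypothetical, and the real parser also checks _CMD and _SECRET variants and built-in defaults.

```python
import os


def resolve_option(section, key, cfg, environ=os.environ):
    """Simplified Airflow-style lookup: AIRFLOW__{SECTION}__{KEY} beats airflow.cfg.

    cfg is a {section: {key: value}} dict standing in for airflow.cfg.
    Returns None when the option is set nowhere.
    """
    env_name = f"AIRFLOW__{section.upper()}__{key.upper()}"
    if env_name in environ:
        return environ[env_name]  # environment variable takes precedence
    return cfg.get(section, {}).get(key)


cfg = {"core": {"executor": "LocalExecutor"}}
print(resolve_option("core", "executor", cfg, environ={}))  # LocalExecutor
print(resolve_option("core", "executor", cfg,
                     environ={"AIRFLOW__CORE__EXECUTOR": "CeleryExecutor"}))  # CeleryExecutor
```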
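Scheduling Interval
$$t_{\text{end}} = t_{\text{start}} + \Delta t$$
Here,
- $\Delta t$ = Time between consecutive DAG runs
- $t_{\text{start}}$ = First possible execution date (the DAG's start_date)
- $t_{\text{end}}$ = End of the first scheduling interval; Airflow triggers a run only after its interval has ended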
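Applying the relation repeatedly gives each run's data interval. This is a minimal sketch. It assumes a fixed timedelta schedule such as schedule=timedelta(hours=1) and ignores cron timetables, timezones, and catchup. data_intervals is a hypothetical helper, not an Airflow API.

```python
from datetime import datetime, timedelta


def data_intervals(t_start, delta, count):
    """Return the first `count` (interval_start, interval_end) pairs.

    Run k covers [t_start + k*delta, t_start + (k+1)*delta) and becomes
    eligible to run once interval_end has passed.
    """
    return [(t_start + k * delta, t_start + (k + 1) * delta) for k in range(count)]


for start, end in data_intervals(datetime(2024, 1, 1), timedelta(hours=1), 3):
    print(f"run covers {start:%H:%M}-{end:%H:%M}, triggered after {end:%H:%M}")
```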
Total Task Execution Time
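$$T_{\text{total}} = \sum_{i=1}^{n} \left(T_i + O_i\right)$$
Here,
- $n$ = Number of tasks in the DAG
- $T_i$ = Execution time of task $i$
- $O_i$ = Scheduling and queuing overhead for task $i$
This sum is the wall-clock time when tasks run one after another. When independent branches run in parallel, wall-clock time is set by the longest dependency path instead.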
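The sketch below contrasts the sequential sum $T_{\text{total}}$ with the critical-path time of the same DAG. It assumes each task's cost is $T_i + O_i$ and that enough executor slots are free. The durations and the helper names are hypothetical.

```python
def total_sequential_time(exec_times, overheads):
    """T_total = sum_i (T_i + O_i): wall-clock time if tasks run one after another."""
    return sum(t + o for t, o in zip(exec_times, overheads))


def critical_path_time(upstream, cost):
    """Length of the longest path, where cost[task] = T_i + O_i.

    With unlimited slots this is the wall-clock lower bound.
    upstream maps task -> iterable of upstream tasks (must be acyclic).
    """
    finish = {}

    def finish_time(task):
        if task not in finish:
            # A task starts only after its slowest upstream task finishes
            start = max((finish_time(u) for u in upstream[task]), default=0)
            finish[task] = start + cost[task]
        return finish[task]

    return max(finish_time(task) for task in upstream)


dag = {"extract": set(), "clean": {"extract"}, "validate": {"extract"}, "load": {"clean", "validate"}}
exec_times = {"extract": 60, "clean": 120, "validate": 30, "load": 90}  # seconds (hypothetical)
overheads = {task: 5 for task in dag}  # assumed 5 s of queuing overhead per task
cost = {task: exec_times[task] + overheads[task] for task in dag}

print(total_sequential_time(exec_times.values(), overheads.values()))  # 320
print(critical_path_time(dag, cost))  # 285
```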
Maximum Parallelism
$$P_{\max} = \min(S, R)$$
Here,
- $S$ = Number of available executor slots
- $R$ = Total tasks ready for execution
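A small numeric sketch of $P_{\max} = \min(S, R)$. The rounds estimate assumes every task takes the same time and ignores pools and per-DAG limits, which Airflow also applies. Both helpers are hypothetical.

```python
import math


def max_parallelism(slots, ready):
    """P_max = min(S, R): running tasks can exceed neither bound."""
    return min(slots, ready)


def scheduling_rounds(slots, ready):
    """Rounds needed to drain R ready tasks through S slots (equal durations assumed)."""
    return math.ceil(ready / slots) if ready else 0


print(max_parallelism(slots=32, ready=100))  # 32
print(scheduling_rounds(slots=32, ready=100))  # 4
```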
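DAG Correctness (Acyclicity Invariant)
A valid Airflow DAG must be a directed acyclic graph. If the dependency graph contains a cycle, the DAG fails to load when its file is parsed and is reported as an import error, so the scheduler never schedules it. Formally, there is no path $t_{i_1} \to t_{i_2} \to \cdots \to t_{i_k} \to t_{i_1}$ for any subset of tasks $\{t_{i_1}, \dots, t_{i_k}\} \subseteq V$.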
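Airflow runs its own cycle check when it loads a DAG. The sketch below illustrates the same invariant independently, using a standard three-color depth-first search rather than Airflow's code. It assumes the same dict-of-upstreams representation as the earlier examples, and find_cycle is a hypothetical helper.

```python
def find_cycle(upstream):
    """Return one cycle as a list of tasks, or None if the graph is acyclic.

    upstream maps task -> iterable of upstream tasks. In a returned cycle,
    each task depends on the task that follows it in the list.
    """
    ON_PATH, DONE = 1, 2
    state = {}  # unvisited tasks are absent
    path = []

    def visit(task):
        state[task] = ON_PATH
        path.append(task)
        for parent in upstream.get(task, ()):
            if state.get(parent) == ON_PATH:
                # Back edge: the cycle is the path segment starting at `parent`
                return path[path.index(parent):] + [parent]
            if parent not in state:
                cycle = visit(parent)
                if cycle:
                    return cycle
        path.pop()
        state[task] = DONE
        return None

    for task in upstream:
        if task not in state:
            cycle = visit(task)
            if cycle:
                return cycle
    return None


acyclic = {"extract": [], "clean": ["extract"], "load": ["clean"]}
cyclic = {"a": ["c"], "b": ["a"], "c": ["b"]}
print(find_cycle(acyclic))  # None
print(find_cycle(cyclic))  # ['a', 'c', 'b', 'a']
```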
The scheduler re-parses each DAG file at a configurable interval (min_file_process_interval). Reducing this interval makes DAG changes visible sooner but increases CPU overhead. For most production deployments, the default of 30 seconds is reasonable.
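A back-of-envelope sketch of that trade-off. It assumes parsing is CPU-bound and spreads evenly across parser processes, so re-parsing every file once per interval needs about (files × parse time) / interval processes, which relates to the parsing_processes setting. The helper and the numbers are hypothetical.

```python
import math


def parsing_processes_needed(num_files, avg_parse_seconds, min_file_process_interval=30):
    """Rough number of parser processes to re-parse every file once per interval."""
    cpu_seconds_per_interval = num_files * avg_parse_seconds
    return max(1, math.ceil(cpu_seconds_per_interval / min_file_process_interval))


print(parsing_processes_needed(num_files=300, avg_parse_seconds=2))  # 20
print(parsing_processes_needed(num_files=300, avg_parse_seconds=2,
                               min_file_process_interval=120))  # 5
```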
Component Interactions
DAG Processing Flow:
- Scheduler lists the dags/ directory every dag_dir_list_interval seconds and re-parses each file at most every min_file_process_interval seconds
- DAG files are parsed in parallel (parsing_processes)
- Serialized DAGs are stored in the metadata database
- Scheduler creates DagRuns based on each DAG's schedule
- Tasks are queued based on dependencies and resource availability
- Executor dispatches tasks to workers/pods
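The queuing step can be sketched as follows. This is a simplified model of the default all_success trigger rule: a task is queued only when it has no state yet, every upstream task succeeded, and a slot is free. Pools, priority weights, and per-DAG limits are ignored. tasks_to_queue is a hypothetical helper, not scheduler code.

```python
def tasks_to_queue(upstream, states, free_slots):
    """Return up to `free_slots` tasks that are ready to be queued."""
    ready = [
        task
        for task, parents in upstream.items()
        if states.get(task) is None  # not scheduled yet
        and all(states.get(parent) == "success" for parent in parents)
    ]
    # Sorted for determinism here; Airflow itself prioritizes by priority_weight
    return sorted(ready)[:free_slots]


dag = {"extract": [], "clean": ["extract"], "validate": ["extract"], "load": ["clean", "validate"]}
states = {"extract": "success"}  # clean, validate and load have no state yet
print(tasks_to_queue(dag, states, free_slots=1))  # ['clean']
print(tasks_to_queue(dag, states, free_slots=4))  # ['clean', 'validate']
```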
Critical Role of Metadata Database: Ensures consistency across all distributed components by tracking states, managing concurrency, and storing configuration.
Scalability Considerations
Horizontal Scaling Options:
| Component | Scaling Method | Use Case |
|---|---|---|
| Celery workers | Distribute across worker nodes | Medium-large deployments |
| Kubernetes pods | Dynamic pod provisioning | Cloud-native, bursty workloads |
| Web Server | Stateless, add load balancer | High-traffic UI access |
| Scheduler | Multiple instances with DB coordination | High-availability |
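Note: Since Airflow 2.0, several schedulers can run at the same time (active-active) for high availability and throughput. They avoid scheduling conflicts through row-level locks in the metadata DB (SELECT ... FOR UPDATE SKIP LOCKED), which requires PostgreSQL 12+ or MySQL 8.0+.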
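The sketch below imitates SKIP LOCKED coordination with threading locks that are acquired without blocking. A scheduler skips any row another scheduler currently holds, and it claims a row only if the row has not been claimed yet. TaskRow and schedule_pass are hypothetical stand-ins for database rows and a scheduling transaction, not Airflow code.

```python
import threading


class TaskRow:
    """Stand-in for a task_instance row that a scheduler may lock."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.lock = threading.Lock()
        self.claimed_by = None


def schedule_pass(scheduler_name, rows):
    """Claim every free row, skipping rows locked by another scheduler."""
    claimed = []
    for row in rows:
        if row.lock.acquire(blocking=False):  # like FOR UPDATE SKIP LOCKED
            if row.claimed_by is None:  # state change is visible to later passes
                row.claimed_by = scheduler_name
                claimed.append(row.task_id)
            row.lock.release()  # "commit": release the row lock
    return claimed


rows = [TaskRow(f"task_{i}") for i in range(1000)]
results = {}


def run(name):
    results[name] = schedule_pass(name, rows)


threads = [threading.Thread(target=run, args=(n,)) for n in ("scheduler-1", "scheduler-2")]
for t in threads:
    t.start()
for t in threads:
    t.join()

all_claimed = results["scheduler-1"] + results["scheduler-2"]
print(len(all_claimed), len(set(all_claimed)))  # 1000 1000: each row claimed exactly once
```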
Reliability and Fault Tolerance
Key Mechanisms:
- Heartbeat Detection: Schedulers and running task processes write periodic heartbeats to the metadata DB; a scheduler whose heartbeat goes stale is reported unhealthy, and tasks whose heartbeats stop (zombies) are failed or retried
- Task Retries: Configure retries and retry_delay to handle transient failures
- State Recovery: Metadata DB stores enough info to resume after component failures
- Unique Run IDs: DAG runs tracked with unique identifiers for proper restart/resume
- Centralized Logging: Task logs stored centrally, accessible via web server
- Callback System: on_success_callback and on_failure_callback hooks integrate with external monitoring tools for proactive alerting
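Heartbeat-based health checks come down to comparing the age of the last heartbeat with a threshold. Airflow's [scheduler] scheduler_health_check_threshold setting, 30 seconds by default, plays this role. The function below is a hypothetical sketch of the idea, not Airflow's implementation.

```python
from datetime import datetime, timedelta, timezone


def scheduler_is_healthy(latest_heartbeat, now, threshold_seconds=30):
    """Healthy if a heartbeat exists and is newer than the threshold."""
    if latest_heartbeat is None:
        return False
    return now - latest_heartbeat < timedelta(seconds=threshold_seconds)


now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
print(scheduler_is_healthy(now - timedelta(seconds=10), now))  # True
print(scheduler_is_healthy(now - timedelta(seconds=45), now))  # False
```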
Key Concepts Table
| Component | Purpose | Configuration | Scalability |
|---|---|---|---|
| Web Server | User interface and API | webserver_config.py | Horizontal scaling with load balancer |
| Scheduler | Workflow orchestration | airflow.cfg [scheduler] | Multiple active-active instances (Airflow 2.0+) |
| Metadata DB | State management | airflow.cfg [database] | Read replicas for queries |
| Executor | Task execution | airflow.cfg [core] | Depends on executor type |
| Worker | Task processing | Celery/K8s config | Horizontal scaling |
| Triggerer | Async trigger handling | airflow.cfg [triggerer] | Horizontal scaling |
| DAG Processor | File parsing | airflow.cfg [scheduler] (parsing_processes, min_file_process_interval) | Parallel parsing |
Deep Dive: Scheduler Internals
The scheduler operates on a heartbeat-based loop:
- Heartbeat: every scheduler_heartbeat_sec (5 s by default)
- Scan DAG files: check the dags/ directory
- Parse DAGs: parallel processing (in Airflow 2.x this runs in the DAG file processor, a scheduler subprocess or a standalone process)
- Create DagRuns: insert into the metadata DB
- Queue tasks: based on dependencies
- Dispatch to executor: send to workers/pods
- Process callbacks: notify downstream systems
- Emit heartbeat: report scheduler health
Task Instance Lifecycle
Task instances progress through these states:
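- No Status → Scheduled → Queued → Running → Success
- From Running, tasks can transition to Failed or Up for Retry (when retries remain); a task up for retry returns to Scheduled once retry_delay has passed
- Skipped state occurs when an upstream branching or short-circuit operator skips the task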
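The lifecycle above can be written as a transition table. This is a deliberately simplified sketch: real Airflow has more states, such as upstream_failed, deferred, up_for_reschedule, and removed. TRANSITIONS and is_valid_path are hypothetical names.

```python
# Simplified task instance transitions (None = no status yet)
TRANSITIONS = {
    None: {"scheduled", "skipped"},
    "scheduled": {"queued"},
    "queued": {"running"},
    "running": {"success", "failed", "up_for_retry"},
    "up_for_retry": {"scheduled"},
    "success": set(),
    "failed": set(),
    "skipped": set(),
}


def is_valid_path(states):
    """True if every consecutive pair of states is an allowed transition."""
    return all(nxt in TRANSITIONS.get(cur, set()) for cur, nxt in zip(states, states[1:]))


retry_then_success = [None, "scheduled", "queued", "running", "up_for_retry",
                      "scheduled", "queued", "running", "success"]
print(is_valid_path(retry_then_success))  # True
print(is_valid_path([None, "running"]))  # False
```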
Code Examples
Architecture Demo DAG
```python
# dags/custom_executor_example.py
import os
import platform
import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.configuration import conf
from airflow.models import TaskInstance
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.session import create_session

# Default arguments applied to every task in the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


def scheduler_info(**context):
    """Print information about the DagRun the scheduler created for this task."""
    dag_run = context['dag_run']
    print(f"Scheduler triggered DAG run: {dag_run.run_id}")
    print(f"Logical date: {context['logical_date']}")  # replaces deprecated execution_date
    print(f"Run type: {dag_run.run_type}")

    # Read task instance states for this run from the metadata database (ORM)
    with create_session() as session:
        task_instances = session.query(TaskInstance).filter(
            TaskInstance.dag_id == dag_run.dag_id,
            TaskInstance.run_id == dag_run.run_id,
        ).all()
        for ti in task_instances:
            print(f"Task: {ti.task_id}, State: {ti.state}")


def executor_demo(**context):
    """Report where the executor ran this task."""
    executor_info = {
        'hostname': platform.node(),
        'pid': os.getpid(),
        # Executor configured for the deployment ([core] executor)
        'executor': conf.get('core', 'executor'),
        'worker_id': os.environ.get('HOSTNAME', 'unknown'),
    }
    print(f"Executing on: {executor_info['hostname']}")
    print(f"Process ID: {executor_info['pid']}")
    print(f"Executor: {executor_info['executor']}")
    time.sleep(2)  # Simulate work
    return executor_info  # Pushed to XCom as the return value


with DAG(
    'architecture_demo_dag',
    default_args=default_args,
    description='Demonstration of Airflow architecture concepts',
    schedule=timedelta(hours=1),  # `schedule` replaces `schedule_interval` (Airflow 2.4+)
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['architecture', 'demo'],
) as dag:
    task1 = PythonOperator(task_id='scheduler_info', python_callable=scheduler_info)
    task2 = PythonOperator(task_id='executor_demo', python_callable=executor_demo)
    task3 = BashOperator(
        task_id='bash_task',
        bash_command='echo "Task executed on $(hostname) at $(date)"',
    )

    # Dependencies: scheduler_info -> executor_demo -> bash_task
    task1 >> task2 >> task3
```
Metadata Database Optimization
```python
# metadata_db_optimization.py
from sqlalchemy import text

from airflow.utils.session import create_session

# Diagnostic queries (PostgreSQL). pg_stat_statements must be installed and
# preloaded; on PostgreSQL 13+ the column is mean_exec_time (mean_time before 13).
ANALYSIS_QUERIES = {
    'slowest task_instance queries': """
        SELECT query, mean_exec_time, calls
        FROM pg_stat_statements
        WHERE query LIKE '%task_instance%'
        ORDER BY mean_exec_time DESC
        LIMIT 10;
    """,
    'index usage': """
        SELECT indexrelname, idx_scan, idx_tup_read, idx_tup_fetch
        FROM pg_stat_user_indexes
        WHERE schemaname = 'public'
        ORDER BY idx_scan DESC;
    """,
    'table statistics': """
        SELECT relname, n_tup_ins, n_tup_upd, n_tup_del, n_live_tup, n_dead_tup
        FROM pg_stat_user_tables
        WHERE schemaname = 'public'
        ORDER BY n_live_tup DESC;
    """,
}

# Candidate indexes. Airflow's own migrations already create similar ones
# (e.g. ti_state, ti_dag_run), so check pg_indexes first to avoid duplicates.
# Plain CREATE INDEX blocks writes to the table while it builds.
ADDITIONAL_INDEXES = [
    "CREATE INDEX IF NOT EXISTS idx_task_instance_dag_run ON task_instance(dag_id, run_id);",
    "CREATE INDEX IF NOT EXISTS idx_task_instance_state ON task_instance(state);",
    "CREATE INDEX IF NOT EXISTS idx_dag_run_state ON dag_run(state);",
]


def optimize_metadata_db():
    """Print metadata DB diagnostics and create optional indexes."""
    with create_session() as session:
        for name, query in ANALYSIS_QUERIES.items():
            try:
                result = session.execute(text(query))
                print(f"Query results: {name}")
                for row in result:
                    print(row)
            except Exception as e:  # e.g. pg_stat_statements not installed
                print(f"Query '{name}' failed: {e}")
                session.rollback()

        # Connection pool settings live in airflow.cfg [database]
        # (sql_alchemy_pool_size, sql_alchemy_max_overflow, ...). They are read
        # when the engine is created, so they cannot be changed from here.

        for index_sql in ADDITIONAL_INDEXES:
            try:
                session.execute(text(index_sql))
                session.commit()
            except Exception as e:
                print(f"Index creation failed: {e}")
                session.rollback()


if __name__ == "__main__":
    optimize_metadata_db()
```
Web Server Customization
```python
# web_server_customization.py
import airflow
from flask import Blueprint, jsonify

from airflow.models import DagRun, TaskInstance
from airflow.utils.session import create_session
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.www.app import create_app

# Custom blueprint for additional endpoints
custom_bp = Blueprint(
    'custom_bp',  # Blueprint name for URL generation
    __name__,  # Module name for template/static resolution
    template_folder='templates',
    static_folder='static',
)


# This route performs no authentication or RBAC check; protect it before
# production use (or expose it through an AirflowPlugin's flask_blueprints).
@custom_bp.route('/api/v1/custom/metrics')
def get_custom_metrics():
    """Custom API endpoint for metrics.

    Returns JSON with key performance indicators:
    - total_dag_runs: count of all DAG runs in the system
    - running_tasks: currently executing task instances
    - failed_tasks: task instances in failed state
    - success_rate: percentage of finished DAG runs that succeeded
    """
    with create_session() as session:
        total_dag_runs = session.query(DagRun).count()
        successful_runs = session.query(DagRun).filter(
            DagRun.state == DagRunState.SUCCESS
        ).count()
        failed_runs = session.query(DagRun).filter(
            DagRun.state == DagRunState.FAILED
        ).count()
        running_tasks = session.query(TaskInstance).filter(
            TaskInstance.state == TaskInstanceState.RUNNING
        ).count()
        failed_tasks = session.query(TaskInstance).filter(
            TaskInstance.state == TaskInstanceState.FAILED
        ).count()

    finished_runs = successful_runs + failed_runs
    metrics = {
        'total_dag_runs': total_dag_runs,
        'running_tasks': running_tasks,
        'failed_tasks': failed_tasks,
        # Compare DAG runs with DAG runs (not with task instances)
        'success_rate': successful_runs / finished_runs * 100 if finished_runs else 0,
    }
    return jsonify(metrics)


def create_custom_app(config=None):
    """Create Airflow web app with customizations.

    Args:
        config: Optional dict of config values to override airflow.cfg

    Returns:
        Flask app instance with custom blueprint registered
    """
    app = create_app(config)

    # Register custom blueprint; the endpoint is served at /airflow/api/v1/custom/metrics
    app.register_blueprint(custom_bp, url_prefix='/airflow')

    # Add custom template filters
    @app.template_filter('datetime_format')
    def datetime_format(value, format='%Y-%m-%d %H:%M:%S'):
        """Format datetime objects for template display."""
        return value.strftime(format) if value else ''

    # Add custom context processor
    @app.context_processor
    def inject_custom_variables():
        """Inject variables available in all templates."""
        return {
            'custom_title': 'Airflow Dashboard',
            'version': airflow.__version__,  # Installed version instead of a hard-coded string
        }

    return app
```
Scheduler Monitoring and Metrics
```python
# scheduler_monitoring.py
from datetime import timedelta

from sqlalchemy import func

from airflow.models import DagModel, DagRun, TaskInstance
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import DagRunState, TaskInstanceState


def get_scheduler_metrics():
    """Collect scheduler workload metrics from the metadata database.

    Returns:
        dict: active DAG count, DAG runs started in the last hour,
        and task instance counts by state
    """
    now = timezone.utcnow()  # Airflow stores timezone-aware UTC timestamps
    with create_session() as session:
        # Active DAG count
        active_dags = session.query(DagModel).filter(DagModel.is_active.is_(True)).count()

        # DAG runs that started in the last hour
        recent_runs = session.query(DagRun).filter(
            DagRun.start_date >= now - timedelta(hours=1)
        ).count()

        # Task instance counts per state (None = no status yet)
        task_stats = dict(
            session.query(TaskInstance.state, func.count(TaskInstance.task_id))
            .group_by(TaskInstance.state)
            .all()
        )

    return {
        'active_dags': active_dags,
        'recent_runs': recent_runs,
        'tasks_by_state': task_stats,
        # Point-in-time counts, not throughput
        'tasks_running': task_stats.get('running', 0),
        'tasks_queued': task_stats.get('queued', 0),
        'timestamp': now.isoformat(),
    }


def monitor_scheduler_health(stale_hours=24):
    """Monitor scheduler health and detect anomalies.

    Checks for:
    - Stale DAG runs (in the running state for more than `stale_hours`)
    - Orphaned task instances (running, but with no hostname recorded)
    """
    stale_threshold = timezone.utcnow() - timedelta(hours=stale_hours)
    with create_session() as session:
        stale_runs = session.query(DagRun).filter(
            DagRun.state == DagRunState.RUNNING,
            DagRun.start_date < stale_threshold,
        ).count()
        orphaned_tasks = session.query(TaskInstance).filter(
            TaskInstance.state == TaskInstanceState.RUNNING,
            TaskInstance.hostname == '',
        ).count()

    return {
        'stale_dag_runs': stale_runs,
        'orphaned_tasks': orphaned_tasks,
        'health_status': 'healthy' if stale_runs == 0 and orphaned_tasks == 0 else 'degraded',
    }
```
Performance Metrics
System-Level Metrics
| Metric | Value | Optimization Strategy |
|---|---|---|
| Scheduler Heartbeat Interval | 5 seconds | Adjust based on workload |
| DAG Parsing Time | 1-10 seconds per file | Keep top-level DAG code light (no DB or API calls at import time) |
| Database Query Time | < 100ms | Proper indexing, connection pooling |
| Task Queue Latency | < 5 seconds | Optimized executor configuration |
| Web Server Response Time | < 500ms | Caching, load balancing |
| Metadata DB Connections | 50-100 | Connection pooling, read replicas |
| Worker Memory Usage | 1-4 GB | Resource limits, monitoring |
| Log Storage | 1-10 GB/day | Log rotation, external storage |
Component Startup Times
| Component | Cold Start | Warm Start | Resource Footprint |
|---|---|---|---|
| Scheduler | 5-15s | 2-5s | 512MB-2GB RAM |
| Web Server | 3-10s | 1-3s | 256MB-1GB RAM |
| Triggerer | 2-5s | 1-2s | 128MB-512MB RAM |
| Celery Worker | 10-30s | 5-10s | 512MB-4GB RAM |
| K8s Pod | 30-120s | 15-30s | Configurable |
Scaling Thresholds
| DAG Count | Task Count | Recommended Executor | Min Resources |
|---|---|---|---|
| 1-50 | 1-500 | LocalExecutor | 2 CPU, 4GB RAM |
| 50-200 | 500-5000 | CeleryExecutor | 4 CPU, 8GB RAM |
| 200-1000 | 5000-50000 | CeleryExecutor | 8 CPU, 16GB RAM |
| 1000+ | 50000+ | KubernetesExecutor | Dynamic scaling |
Best Practices
Database Optimization
- PostgreSQL Tuning: Use PostgreSQL with proper indexing. Monitor query performance and add indexes for frequently accessed tables. Consider read replicas for heavy read workloads.
```sql
-- Essential indexes for Airflow metadata database
-- (recent Airflow versions already create several of these; check pg_indexes first)
CREATE INDEX IF NOT EXISTS idx_task_instance_dag_id ON task_instance(dag_id);
CREATE INDEX IF NOT EXISTS idx_task_instance_state ON task_instance(state);
CREATE INDEX IF NOT EXISTS idx_dag_run_dag_id ON dag_run(dag_id);
CREATE INDEX IF NOT EXISTS idx_dag_run_execution_date ON dag_run(execution_date);
-- Since Airflow 2.2, task_instance references its run via run_id and has no
-- execution_date column, so index and filter on dag_run instead.
-- For large deployments, purge old metadata rather than partitioning
-- Airflow-managed tables (schema migrations do not expect partitions):
--   airflow db clean --clean-before-timestamp '2024-01-01' --yes
```
- Connection Pooling: Configure SQLAlchemy connection pool for optimal performance.
```ini
# airflow.cfg adjustments
[database]
# Pool size: number of connections to maintain
sql_alchemy_pool_size = 10
# Max overflow: maximum extra connections beyond pool_size
sql_alchemy_max_overflow = 20
# Connection timeout in seconds
sql_alchemy_pool_timeout = 30
# Recycle connections after this many seconds
sql_alchemy_pool_recycle = 1800
# Verify connections before use
sql_alchemy_pool_pre_ping = True
```
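Pool settings apply per process, because each Airflow process creates its own SQLAlchemy engine and pool. The worst-case connection count therefore grows with the number of processes. The sketch below uses the pool values above. The process counts are hypothetical, and 100 is PostgreSQL's default max_connections.

```python
def max_db_connections(process_count, pool_size=10, max_overflow=20):
    """Worst case: every process exhausts its pool plus overflow."""
    return process_count * (pool_size + max_overflow)


# Hypothetical deployment: 2 schedulers, 4 webserver workers, 8 task processes
demand = max_db_connections(process_count=2 + 4 + 8)
postgres_max_connections = 100  # PostgreSQL's default max_connections
print(demand, demand > postgres_max_connections)  # 420 True
```

When the worst case exceeds the server limit, either shrink the per-process pools or put a pooler such as PgBouncer in front of PostgreSQL.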
Executor Selection
- Choose based on scale requirements:
| Scale | DAGs | Tasks/Day | Recommended Executor |
|---|---|---|---|
| Development | < 10 | < 100 | SequentialExecutor |
| Small Production | 10-50 | 100-1000 | LocalExecutor |
| Medium Production | 50-500 | 1000-10000 | CeleryExecutor |
| Large Production | 500+ | 10000+ | KubernetesExecutor |
Scheduler Configuration
- Tune scheduling parameters:
```ini
[core]
# Maximum number of task instances that can run concurrently per scheduler
parallelism = 32
# Maximum number of task instances allowed to run concurrently in each DAG
# (across all of that DAG's active runs)
max_active_tasks_per_dag = 16
# Maximum number of active DAG runs per DAG
max_active_runs_per_dag = 16

[scheduler]
# Minimum number of seconds between re-parses of the same DAG file
min_file_process_interval = 30
# How often (in seconds) to scan the DAGs directory for new files
dag_dir_list_interval = 300
# Number of processes used to parse DAG files in parallel
parsing_processes = 2
# Scheduler heartbeat interval in seconds
scheduler_heartbeat_sec = 5
```
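The [core] limits combine as nested caps, which extends $P_{\max} = \min(S, R)$. In the sketch below, each DAG contributes at most max_active_tasks_per_dag ready tasks, and the total is capped by parallelism and, optionally, by free executor slots. The model is simplified: it ignores pools and max_active_runs_per_dag, and effective_concurrency is a hypothetical helper.

```python
def effective_concurrency(ready_per_dag, parallelism=32, max_active_tasks_per_dag=16,
                          executor_slots=None):
    """Tasks that can run at once under the [core] limits (simplified model)."""
    per_dag = sum(min(ready, max_active_tasks_per_dag) for ready in ready_per_dag.values())
    cap = parallelism if executor_slots is None else min(parallelism, executor_slots)
    return min(per_dag, cap)


print(effective_concurrency({"etl": 40, "reports": 5}))  # 21
print(effective_concurrency({"etl": 40, "reports": 30, "ml": 20}))  # 32
```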
Connection Management
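- Implement connection pooling: Monitor connection usage (for example via PostgreSQL's pg_stat_activity view) and adjust pool sizes to workload patterns. With many Airflow processes, a connection pooler such as PgBouncer in front of PostgreSQL keeps the server-side connection count bounded.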
Monitoring and Alerting
- Set up comprehensive monitoring:
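```python
# Example: custom gauges sent through Airflow's StatsD client ([metrics] statsd_on = True),
# which Prometheus can scrape via statsd_exporter
from sqlalchemy import func

from airflow.jobs.job import Job  # Airflow 2.6+ location of the Job model
from airflow.models import DagModel, TaskInstance
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.session import create_session


def emit_scheduler_metrics():
    """Emit scheduler metrics for monitoring."""
    with create_session() as session:
        # Track scheduler lag as the age of the newest running scheduler's heartbeat
        latest = session.query(func.max(Job.latest_heartbeat)).filter(
            Job.job_type == 'SchedulerJob', Job.state == 'running'
        ).scalar()
        if latest is not None:
            Stats.gauge('scheduler_lag_seconds', (timezone.utcnow() - latest).total_seconds())
        # Track task instance counts by state
        for state in ['queued', 'running', 'success', 'failed']:
            count = session.query(TaskInstance).filter(TaskInstance.state == state).count()
            Stats.gauge(f'task_instances_{state}', count)
        # Track DAG count
        Stats.gauge('total_dags', session.query(DagModel).filter(DagModel.is_active.is_(True)).count())
```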
Security
- Implement role-based access control:
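```python
# webserver_config.py
from flask_appbuilder.security.manager import AUTH_OAUTH

# Enable OAuth for authentication
AUTH_TYPE = AUTH_OAUTH

# Role granted to unauthenticated users. 'Viewer' gives read-only UI access
# to anyone who can reach the web server; leave unset to require login.
AUTH_ROLE_PUBLIC = 'Viewer'

# Create users on first OAuth login and map provider groups to Airflow roles.
# Google does not return groups by default, so a custom security manager is
# needed to populate them; the group names below are placeholders.
AUTH_USER_REGISTRATION = True
AUTH_USER_REGISTRATION_ROLE = 'Viewer'
AUTH_ROLES_MAPPING = {
    'admin': ['Admin'],
    'operator': ['Op'],
    'viewer': ['Viewer'],
}

OAUTH_PROVIDERS = [{
    'name': 'google',  # Flask-AppBuilder matches the lowercase provider name
    'icon': 'fa-google',
    'token_key': 'access_token',
    'remote_app': {
        'client_id': 'YOUR_CLIENT_ID',
        'client_secret': 'YOUR_CLIENT_SECRET',
        'api_base_url': 'https://www.googleapis.com/oauth2/v2/',
        'client_kwargs': {'scope': 'email profile'},
        'access_token_url': 'https://accounts.google.com/o/oauth2/token',
        'authorize_url': 'https://accounts.google.com/o/oauth2/auth',
    },
}]
```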
Backup Strategy
- Implement regular backups:
```bash
#!/bin/bash
# backup_airflow_db.sh
BACKUP_DIR="/backups/airflow"
DATE=$(date +%Y%m%d_%H%M%S)
RETENTION_DAYS=30
BACKUP_FILE="$BACKUP_DIR/airflow_$DATE.dump"

mkdir -p "$BACKUP_DIR"

# Full database backup. The custom format (-F c) is already compressed and
# pg_restore reads it directly, so no separate gzip step is needed.
if ! pg_dump -U airflow -F c -f "$BACKUP_FILE" airflow_db; then
    echo "BACKUP FAILED!" >&2
    exit 1
fi

# Remove backups older than retention period
find "$BACKUP_DIR" -name "airflow_*.dump" -mtime +"$RETENTION_DAYS" -delete

# Verify backup integrity by listing the archive's table of contents
if pg_restore -l "$BACKUP_FILE" > /dev/null; then
    echo "Backup verified successfully: $BACKUP_FILE"
else
    echo "BACKUP VERIFICATION FAILED!" >&2
    # Send alert
    exit 1
fi
```
Resource Planning
- Estimate resource requirements:
```python
def estimate_airflow_resources(num_dags, avg_tasks_per_dag, avg_task_duration_minutes):
    """Estimate resource requirements for Airflow deployment.

    Args:
        num_dags: Number of DAGs to manage
        avg_tasks_per_dag: Average number of tasks per DAG
        avg_task_duration_minutes: Average task execution duration in minutes
            (not used by the heuristics below)

    Returns:
        dict: Recommended resource configuration
    """
    total_tasks = num_dags * avg_tasks_per_dag

    # Scheduler requirements
    scheduler_cpu = max(2, num_dags // 100)
    scheduler_ram = max(2, num_dags // 50)  # GB

    # Worker requirements
    worker_count = max(2, total_tasks // 100)
    worker_cpu_per = 4  # cores per worker
    worker_ram_per = 8  # GB per worker

    # Database requirements
    db_connections = min(100, total_tasks // 10)
    db_ram = max(4, total_tasks // 1000)  # GB

    return {
        'scheduler': {
            'cpu': scheduler_cpu,
            'ram_gb': scheduler_ram,
        },
        'workers': {
            'count': worker_count,
            'cpu_per_worker': worker_cpu_per,
            'ram_per_worker_gb': worker_ram_per,
        },
        'database': {
            'connections': db_connections,
            'ram_gb': db_ram,
        },
    }


# Example usage
resources = estimate_airflow_resources(
    num_dags=200,
    avg_tasks_per_dag=25,
    avg_task_duration_minutes=10,
)
# -> scheduler: 2 CPU / 4 GB; workers: 50; database: 100 connections / 5 GB
```
Logging Configuration
- Centralize logs for debugging and compliance:
```ini
# airflow.cfg logging configuration (keep a single [logging] section)
[logging]
# Base log folder
base_log_folder = /opt/airflow/logs
# Remote logging
remote_logging = True

# For S3 logging
remote_log_conn_id = aws_default
remote_base_log_folder = s3://airflow-logs/production/

# For GCS logging, use these values instead of the S3 ones
# remote_log_conn_id = google_cloud_default
# remote_base_log_folder = gs://airflow-logs/production/

# Log rotation
# Use an external log rotation tool (e.g., logrotate) for local logs in production
```
High Availability
- Deploy redundant components:
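```yaml
# docker-compose HA configuration (Docker Swarm: docker stack deploy)
# Airflow settings such as AIRFLOW__DATABASE__SQL_ALCHEMY_CONN are omitted for brevity.
version: '3.8'
services:
  scheduler:
    image: apache/airflow:2.8.0
    command: scheduler
    deploy:
      replicas: 2  # Active-active schedulers (Airflow 2.0+, PostgreSQL 12+ or MySQL 8.0+)
      resources:
        limits:
          cpus: '2'
          memory: 4G
  webserver:
    image: apache/airflow:2.8.0
    command: webserver
    ports:
      - "8080:8080"  # Swarm's routing mesh spreads requests across replicas
    deploy:
      replicas: 3  # Multiple web server instances behind a load balancer
      resources:
        limits:
          cpus: '1'
          memory: 2G
  postgres:
    image: postgres:15
    # A single database instance remains a single point of failure;
    # use a replicated or managed PostgreSQL for full HA
    deploy:
      placement:
        constraints:
          - node.labels.db == true
    environment:
      POSTGRES_PASSWORD_FILE: /run/secrets/db_password
    secrets:
      - db_password
    volumes:
      - postgres_data:/var/lib/postgresql/data
volumes:
  postgres_data:
secrets:
  db_password:
    external: true
```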
Key Takeaways:
- Airflow follows a modular architecture: Scheduler, Executor, Web Server, Metadata DB, Workers
- The DAG correctness theorem requires acyclic dependency graphs for valid scheduling
- Maximum parallelism is bounded by $P_{\max} = \min(S, R)$, the smaller of available executor slots and ready tasks
- Total execution time includes both task computation and scheduling overhead
- The metadata database is the central coordination point for all distributed components
- Heartbeats let Airflow detect stalled schedulers and zombie tasks, while transactions in the metadata database keep state consistent
See Also
- DAG Design Patterns: Composition, dependencies, and design patterns for robust pipelines
- Operators and Hooks: Built-in operators, custom operators, and hooks for extensibility
- XCom Communications: Task communication, XCom backends, and data passing patterns
- Scheduling and Triggers: Cron expressions, triggers, timetables, and scheduling patterns
- Executors Comparison: Sequential, Local, Celery, and Kubernetes executor comparison
- Sensors and Operators: Sensors, poke modes, reschedule mode, and sensor patterns
- Branching Logic: BranchPythonOperator, ShortCircuitOperator, and conditional workflows
- PySpark Submit: Submitting PySpark jobs with Airflow orchestration
- Kafka Connect: Kafka integration patterns for data pipelines
- Data Engineering Orchestration: End-to-end orchestration best practices