Apache Airflow Executors Comparison
Formal Definitions
Definition (Executor)
An executor is the component that determines how task instances are dispatched and executed. An executor is characterized by its parallelism capacity, scheduling strategy, and fault-tolerance mechanism. Airflow's built-in executors include the Sequential, Local, Celery, and Kubernetes executors.
Definition (Task Parallelism)
Task parallelism is the number of tasks executing simultaneously across all workers. It is bounded by $P_{\text{task}} \le \min(P,\; W \cdot C)$, where $P$ is the executor slot count, $W$ is the number of workers, and $C$ is the per-worker concurrency limit.
Detailed Explanation
Executor Selection Criteria
Choosing the right executor depends on several factors.
Decision Factors:
| Factor | Consideration |
|---|---|
| Scale | DAG count, task volume, concurrency needs |
| Infrastructure | Existing systems (K8s, Redis, single server) |
| Cost | Hardware, operations, licensing |
| Team Expertise | Kubernetes, Celery, Python knowledge |
Scale Recommendations:
| Scale | DAGs | Recommended Executor |
|---|---|---|
| Development | < 50 | SequentialExecutor |
| Small Production | 50-200 | LocalExecutor |
| Medium Production | 200-500 | CeleryExecutor |
| Large Production | 500+ | KubernetesExecutor |
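The scale table reduces to a simple threshold rule. The sketch below encodes it directly; `recommend_executor` is a hypothetical helper, and because the table's ranges share their endpoints (50, 200, 500), it assigns each boundary value to the higher tier as an assumption.

```python
def recommend_executor(dag_count: int) -> str:
    """Map a DAG count to the executor suggested by the scale table.

    Boundary values (50, 200, 500) go to the higher tier; the table itself
    is ambiguous at those points, so this is an assumption.
    """
    if dag_count < 0:
        raise ValueError("dag_count must be non-negative")
    if dag_count < 50:
        return "SequentialExecutor"   # Development
    if dag_count < 200:
        return "LocalExecutor"        # Small production
    if dag_count < 500:
        return "CeleryExecutor"       # Medium production
    return "KubernetesExecutor"       # Large production


for count in (10, 120, 350, 800):
    print(count, recommend_executor(count))
```

DAG count is only one of the decision factors listed above; existing infrastructure and team expertise can reasonably override this rule.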
Detailed Executor Comparison
| Feature | Sequential | Local | Celery | Kubernetes |
|---|---|---|---|---|
| Parallelism | None (serial) | Process-based | Worker-based | Pod-based |
| Max Tasks | 1 concurrent | `parallelism` processes on one host | Workers × Concurrency | Dynamic (bounded by `parallelism` and cluster capacity) |
| Fault Tolerance | None | Single node | High (distributed) | Very High (isolated) |
| Resource Isolation | None | Process-level | Worker-level | Container-level |
| Startup Time | ~100ms | ~200ms | ~500ms | ~2-5s |
| Infrastructure | None | None | Redis/RabbitMQ | K8s Cluster |
| Complexity | Very Low | Low | Medium | High |
| Best For | Dev/Test | Small Production | Medium-Large | Cloud-Native |
Executor Deep Dive
| Executor | Use Case | Benefits | Challenges |
|---|---|---|---|
| Sequential | Dev/test, simple workflows | No dependencies, simple | No parallelism, single point of failure |
| Local | Small-medium production | Good performance, no external deps | Limited by single machine resources |
| Celery | Medium-large production | Horizontal scaling, fault tolerance | Requires message broker, worker monitoring |
| Kubernetes | Large-scale, dynamic workloads | Dynamic scaling, resource isolation | Requires K8s expertise, pod startup overhead |
Maximum Parallelism
$$P_{\max} = \min(P,\; W \cdot C,\; Q)$$
Here,
- $P$ = Executor-level parallelism setting
- $W$ = Number of active workers
- $C$ = Per-worker concurrency (e.g., Celery worker_concurrency)
- $Q$ = Number of tasks in a queueable state
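The maximum-parallelism bound $P_{\max} = \min(P, W \cdot C, Q)$ (parallelism setting, workers times per-worker concurrency, queueable tasks) can be evaluated directly. This is a minimal sketch; `max_parallelism` is a hypothetical helper name, not an Airflow API.

```python
def max_parallelism(parallelism: int, workers: int, concurrency: int, queueable: int) -> int:
    """Return min(P, W * C, Q): whichever limit binds first caps running tasks."""
    limits = {"parallelism": parallelism, "workers": workers,
              "concurrency": concurrency, "queueable": queueable}
    for name, value in limits.items():
        if value < 0:
            raise ValueError(f"{name} must be non-negative")
    return min(parallelism, workers * concurrency, queueable)


# parallelism=32; 3 Celery workers x 16 slots = 48; 100 queued tasks
print(max_parallelism(32, 3, 16, 100))  # 32: the global setting binds
# parallelism=64; 2 workers x 16 slots = 32; only 5 tasks ready
print(max_parallelism(64, 2, 16, 5))    # 5: not enough queued work
```

Raising `parallelism` alone helps only when it is the binding term; otherwise add workers, raise per-worker concurrency, or increase the supply of ready tasks.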
Task Throughput (Steady State)
$$\Theta = \frac{\bar{N}}{\bar{T}}$$
Here,
- $\bar{N}$ = Average number of concurrently executing tasks
- $\bar{T}$ = Mean task execution time
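Steady-state throughput is Little's law applied to executor slots: completed tasks per unit time equals the average number of busy slots divided by the mean task duration. A minimal sketch (the function name is hypothetical), reporting tasks per hour:

```python
def steady_state_throughput(avg_concurrent: float, mean_exec_seconds: float) -> float:
    """Little's law: throughput = N / T, converted to tasks per hour."""
    if mean_exec_seconds <= 0:
        raise ValueError("mean_exec_seconds must be positive")
    # Multiply before dividing to keep the arithmetic exact for whole numbers.
    return avg_concurrent * 3600 / mean_exec_seconds


# 16 busy slots running 2-minute tasks on average
print(steady_state_throughput(16, 120))  # 480.0
```

Halving mean task duration doubles throughput just as doubling concurrency does, which is why startup overhead matters most for short tasks.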
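Executor Overhead Ratio
$$\rho = \frac{T_{\text{queue}} + T_{\text{startup}}}{T_{\text{queue}} + T_{\text{startup}} + T_{\text{exec}}}$$
Here,
- $T_{\text{queue}}$ = Time for scheduler to queue the task
- $T_{\text{startup}}$ = Worker/container startup time
- $T_{\text{exec}}$ = Actual task execution time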
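To see how startup latency translates into overhead, the sketch below treats the overhead ratio as the share of a task's wall time spent queueing and starting up: (queue + startup) / (queue + startup + execution). Startup values come from the comparison table; using 3.5 s for Kubernetes (the midpoint of ~2-5 s) and a hypothetical 10 s task with 1 s of queueing delay are assumptions.

```python
# Approximate startup latencies in seconds, taken from the comparison table.
# Kubernetes uses the midpoint of its ~2-5 s range (an assumption).
STARTUP_SECONDS = {
    "Sequential": 0.1,
    "Local": 0.2,
    "Celery": 0.5,
    "Kubernetes": 3.5,
}


def overhead_ratio(queue_s: float, startup_s: float, exec_s: float) -> float:
    """Fraction of a task's wall time spent before useful work starts."""
    total = queue_s + startup_s + exec_s
    if total <= 0:
        raise ValueError("total time must be positive")
    return (queue_s + startup_s) / total


for name, startup in STARTUP_SECONDS.items():
    # Hypothetical workload: 1 s of queueing delay, 10 s of real work.
    print(f"{name:<10} {overhead_ratio(1.0, startup, 10.0):.1%}")
```

For a 10-minute task the same pod startup cost becomes negligible, so per-pod overhead mainly penalizes short, frequent tasks.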
Theorem (Parallelism Bound)
For any executor, the effective parallelism satisfies $P_{\text{eff}} \le \min(P, R)$, where $P$ is the configured parallelism and $R$ is the actually available compute capacity. Oversubscription beyond $R$ leads to context-switching overhead and degraded throughput.
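The bound $P_{\text{eff}} \le \min(P, R)$ can be checked for a single host as sketched below. Approximating $R$ by the CPU count is an assumption that fits CPU-bound tasks; I/O-bound tasks can usefully exceed it. Both helper names are hypothetical.

```python
import os
from typing import Optional


def effective_parallelism(configured: int, available_cpus: Optional[int] = None) -> int:
    """min(P, R), approximating R by the host's CPU count (an assumption)."""
    if available_cpus is None:
        available_cpus = os.cpu_count() or 1  # os.cpu_count() may return None
    return min(configured, available_cpus)


def is_oversubscribed(configured: int, available_cpus: int) -> bool:
    """True when configured parallelism exceeds the CPU-based capacity."""
    return configured > available_cpus


print(effective_parallelism(32, available_cpus=8))  # 8
print(is_oversubscribed(32, 8))                     # True
```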
The KubernetesExecutor has the highest startup overhead (roughly 2-5 s per pod, and longer when the node must first pull the image) but provides full resource isolation. The CeleryExecutor has moderate overhead (~500 ms) with shared worker resources. The LocalExecutor has minimal overhead (~200 ms) but is limited to a single machine.
For CPU-intensive tasks, set a lower worker_concurrency (4-8) on Celery workers; for I/O-intensive tasks, use a higher concurrency (16-32). Because worker_concurrency is a per-worker setting, run separate workers for each workload type and route tasks to them with queues, so that each worker's configuration matches its workload.
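One way to apply this guidance is to start one dedicated worker per queue with a concurrency chosen from the recommended range. The sketch builds `airflow celery worker --queues ... --concurrency ...` command lines; the clamping rule and the 4-slots-per-core factor for I/O work are hypothetical heuristics, not Airflow defaults.

```python
# Concurrency ranges from the guidance above.
CONCURRENCY_RANGE = {"cpu": (4, 8), "io": (16, 32)}
# Hypothetical heuristic: I/O-bound workers get 4 slots per core before clamping.
IO_SLOTS_PER_CORE = 4


def worker_command(queue: str, workload: str, cpu_count: int) -> str:
    """Build an `airflow celery worker` command for one dedicated queue."""
    low, high = CONCURRENCY_RANGE[workload]
    raw = cpu_count if workload == "cpu" else cpu_count * IO_SLOTS_PER_CORE
    concurrency = min(high, max(low, raw))  # clamp into the recommended range
    return f"airflow celery worker --queues {queue} --concurrency {concurrency}"


print(worker_command("cpu_intensive", "cpu", cpu_count=16))
print(worker_command("io_intensive", "io", cpu_count=2))
```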
SequentialExecutor
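Architecture: Runs one task at a time, synchronously, in a subprocess launched by the scheduler. No external dependencies; it is also the only executor that works with a SQLite metadata database.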
Use Cases: Development, testing, simple workflows, learning Airflow.
Limitations: No parallelism, single point of failure, not suitable for production.
LocalExecutor
Architecture: Runs tasks as separate processes on the same machine using Python's multiprocessing.
Use Cases: Small-medium production, single-server deployments, cost-sensitive environments.
Considerations: Limited by single machine resources, no fault tolerance across machines.
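The practical difference between sequential and local execution can be simulated with tasks that only sleep. In this toy sketch, threads stand in for the LocalExecutor's worker processes (acceptable here because sleeping releases the GIL); it models the scheduling effect only, not Airflow's implementation.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_task(seconds: float) -> float:
    """Simulated task that just waits (stands in for I/O-bound work)."""
    time.sleep(seconds)
    return seconds


def run_sequential(durations):
    """SequentialExecutor-like: one task at a time."""
    start = time.perf_counter()
    for d in durations:
        fake_task(d)
    return time.perf_counter() - start


def run_local(durations, parallelism: int):
    """LocalExecutor-like: up to `parallelism` tasks at once on one host."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        list(pool.map(fake_task, durations))
    return time.perf_counter() - start


durations = [0.1] * 8
seq_time = run_sequential(durations)
local_time = run_local(durations, parallelism=4)
print(f"sequential: {seq_time:.2f}s, local (4 slots): {local_time:.2f}s")
```

With CPU-bound tasks the speedup would instead be capped by the host's cores, which is the single-machine limit noted above.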
CeleryExecutor
Architecture: Distributes tasks across worker nodes using a message broker (Redis/RabbitMQ).
Use Cases: Medium-large deployments, horizontal scaling needs, existing Celery infrastructure.
Benefits: Horizontal scaling, fault tolerance, resource isolation, queue-based distribution.
Challenges: Broker management, worker monitoring, result backend configuration.
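Queue-based distribution means tasks are published to named queues on the broker and each worker consumes only the queues it subscribes to. The in-memory toy below illustrates that routing model; `ToyBroker` and `run_worker` are hypothetical stand-ins for Redis/RabbitMQ and Celery workers, not Celery's API.

```python
import queue
import threading
from collections import defaultdict


class ToyBroker:
    """In-memory stand-in for Redis/RabbitMQ: one FIFO per named queue."""

    def __init__(self):
        self._queues = defaultdict(queue.Queue)
        self._lock = threading.Lock()

    def _get_queue(self, name):
        with self._lock:  # avoid two threads creating the same queue
            return self._queues[name]

    def publish(self, queue_name, task_id):
        self._get_queue(queue_name).put(task_id)

    def consume(self, queue_name, timeout=0.1):
        return self._get_queue(queue_name).get(timeout=timeout)


def run_worker(broker, worker_name, queue_names, done, done_lock):
    """Drain only the subscribed queues, recording who ran what."""
    for name in queue_names:
        while True:
            try:
                task_id = broker.consume(name)
            except queue.Empty:
                break
            with done_lock:
                done.append((worker_name, name, task_id))


broker = ToyBroker()
for i in range(6):
    broker.publish("cpu_intensive" if i % 2 else "io_intensive", f"task_{i}")

done, done_lock = [], threading.Lock()
workers = [
    threading.Thread(target=run_worker, args=(broker, "cpu-worker", ["cpu_intensive"], done, done_lock)),
    threading.Thread(target=run_worker, args=(broker, "io-worker-1", ["io_intensive"], done, done_lock)),
    threading.Thread(target=run_worker, args=(broker, "io-worker-2", ["io_intensive"], done, done_lock)),
]
for w in workers:
    w.start()
for w in workers:
    w.join()
for record in sorted(done):
    print(*record)
```

Adding another `io_intensive` worker scales that queue horizontally without affecting the CPU queue, which is the property the CeleryExecutor relies on.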
KubernetesExecutor
Architecture: Launches a dedicated worker pod for each task instance via the Kubernetes API.
Use Cases: Large-scale deployments, dynamic workloads, cloud-native architectures.
Benefits: Dynamic scaling, resource isolation, cost optimization (pay-per-use).
Challenges: K8s expertise required, pod startup overhead (~2-5s), network complexity.
Key Concepts Table
| Executor | Parallelism | Scalability | Complexity | Fault Tolerance | Use Case |
|---|---|---|---|---|---|
| Sequential | None | None | Very Low | Single point | Development |
| Local | Limited | Single node | Low | Single node | Small production |
| Celery | High | Horizontal | Medium | High | Medium production |
| Kubernetes | Very High | Dynamic | High | Very High | Large scale |
Cost Analysis by Executor
| Cost Factor | Sequential | Local | Celery | Kubernetes |
|---|---|---|---|---|
| Infrastructure | 0 | 500-3000/mo | | |
| Operations | Minimal | Low | Medium | High |
| Licensing | Open source | Open source | Open source | Open source |
| Monitoring | Basic | Basic | Advanced | Advanced |
| Total Annual | 1-3K | 10-50K | | |
Code Examples
Executor Configuration Examples
# executor_configuration.py
import os
import socket
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# Sequential Executor Configuration
# airflow.cfg:
# [core]
# executor = SequentialExecutor

# Local Executor Configuration
# airflow.cfg:
# [core]
# executor = LocalExecutor
# parallelism = 32
# max_active_tasks_per_dag = 16

# Celery Executor Configuration
# airflow.cfg:
# [core]
# executor = CeleryExecutor
# [celery]
# broker_url = redis://redis:6379/0
# result_backend = db+postgresql://airflow:airflow@postgres/airflow
# worker_concurrency = 16

# Kubernetes Executor Configuration
# airflow.cfg:
# [core]
# executor = KubernetesExecutor
# [kubernetes_executor]
# namespace = airflow
# worker_container_repository = apache/airflow
# worker_container_tag = 2.8.0
# delete_worker_pods = True
# delete_worker_pods_on_failure = False


def executor_specific_task(**context):
    """Task that behaves differently based on the configured executor."""
    from airflow.configuration import conf

    executor = conf.get('core', 'executor')
    print(f"Running on executor: {executor}")
    if executor == 'KubernetesExecutor':
        # POD_NAME / NAMESPACE exist only if the worker pod template injects
        # them (e.g. via the Kubernetes downward API).
        pod_name = os.environ.get('POD_NAME', 'unknown')
        namespace = os.environ.get('NAMESPACE', 'default')
        print(f"Running in pod: {pod_name} in namespace: {namespace}")
    elif executor == 'CeleryExecutor':
        # The task runs on a Celery worker host; report that host's name.
        print(f"Worker hostname: {socket.gethostname()}")
    elif executor == 'LocalExecutor':
        import multiprocessing
        print(f"Running in process: {multiprocessing.current_process().name}")
    else:
        # SequentialExecutor (or any other executor)
        print("Running in sequential mode")


def resource_monitoring_task(**context):
    """Report resource usage on whichever host or pod runs this task."""
    import psutil

    resource_info = {
        # interval=1 samples for one second; a first call without an
        # interval returns a meaningless 0.0.
        'cpu_percent': psutil.cpu_percent(interval=1),
        'memory_percent': psutil.virtual_memory().percent,
        'disk_percent': psutil.disk_usage('/').percent,
        'hostname': socket.gethostname(),
        'pid': os.getpid(),
    }
    print(f"Resource usage: {resource_info}")
    return resource_info


with DAG(
    'executor_example_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Executor configuration examples',
    schedule=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['executor', 'configuration'],
) as dag:
    # Task to demonstrate executor behavior
    executor_task = PythonOperator(
        task_id='executor_specific_task',
        python_callable=executor_specific_task,
    )
    # Resource monitoring task
    resource_monitor = PythonOperator(
        task_id='resource_monitor',
        python_callable=resource_monitoring_task,
    )
    # BashOperator takes bash_command; $AIRFLOW__CORE__EXECUTOR is empty
    # if the executor is set in airflow.cfg rather than the environment.
    bash_task = BashOperator(
        task_id='bash_task',
        bash_command='echo "Hostname: $(hostname) | PID: $$ | Executor: $AIRFLOW__CORE__EXECUTOR"',
    )
    # Set dependencies
    executor_task >> resource_monitor >> bash_task
Kubernetes Executor Advanced Configuration
# kubernetes_executor_advanced.py
import json
import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.secret import Secret
from kubernetes.client import models as k8s


def kubernetes_specific_task(**context):
    """Task designed for the Kubernetes executor."""
    # These variables exist only if the pod spec injects them
    # (e.g. via the downward API); otherwise the defaults are printed.
    pod_info = {
        'pod_name': os.environ.get('POD_NAME', 'unknown'),
        'namespace': os.environ.get('NAMESPACE', 'default'),
        'node_name': os.environ.get('NODE_NAME', 'unknown'),
        'pod_ip': os.environ.get('POD_IP', 'unknown'),
        'service_account': os.environ.get('SERVICE_ACCOUNT', 'default'),
    }
    print(f"Kubernetes pod info: {json.dumps(pod_info, indent=2)}")
    # Listing pods requires RBAC permission for the pod's service account.
    try:
        from kubernetes import client, config
        config.load_incluster_config()
        v1 = client.CoreV1Api()
        pods = v1.list_namespaced_pod(namespace=pod_info['namespace'])
        print(f"Total pods in namespace: {len(pods.items)}")
    except Exception as e:
        print(f"Could not access Kubernetes API: {e}")
    return pod_info


# Advanced KubernetesPodOperator arguments
kubernetes_config = {
    # Resource requests/limits for the task container
    'container_resources': k8s.V1ResourceRequirements(
        requests={'cpu': '100m', 'memory': '128Mi'},
        limits={'cpu': '500m', 'memory': '512Mi'},
    ),
    # Scratch volume shared within the pod
    'volumes': [
        k8s.V1Volume(name='shared-data', empty_dir=k8s.V1EmptyDirVolumeSource()),
    ],
    'volume_mounts': [
        k8s.V1VolumeMount(name='shared-data', mount_path='/tmp/shared'),
    ],
    # Secret(deploy_type, deploy_target, secret_name, key)
    'secrets': [
        Secret('env', 'API_KEY', 'kubernetes-secrets', 'api-key'),
        # For volume secrets, deploy_target is the mount path.
        Secret('volume', '/etc/db-cert', 'kubernetes-secrets', 'db-cert'),
    ],
    # Tolerations
    'tolerations': [
        k8s.V1Toleration(key='dedicated', operator='Equal', value='airflow', effect='NoSchedule'),
    ],
    # Node selector
    'node_selector': {
        'node-type': 'airflow-worker',
        'zone': 'us-west-2a',
    },
    # Image pull secrets
    'image_pull_secrets': [k8s.V1LocalObjectReference(name='airflow-registry')],
    # Service account
    'service_account_name': 'airflow-worker',
    # Pod cleanup: 'delete_pod', 'delete_succeeded_pod', or 'keep_pod'
    # (recent cncf.kubernetes provider versions)
    'on_finish_action': 'delete_pod',
    # Logging
    'get_logs': True,
    'log_events_on_failure': True,
}

# Inline script for the pod; kept flush-left so Python parses it.
POD_SCRIPT = """
import os
print(f"Running in pod: {os.environ.get('POD_NAME', 'unknown')}")
print(f"Node: {os.environ.get('NODE_NAME', 'unknown')}")
print("Task completed successfully")
"""

with DAG(
    'kubernetes_advanced_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
    },
    description='Advanced Kubernetes executor configuration',
    schedule=timedelta(hours=2),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['kubernetes', 'advanced'],
) as dag:
    # KubernetesPodOperator with advanced configuration
    k8s_task = KubernetesPodOperator(
        task_id='kubernetes_pod_task',
        name='kubernetes-task-pod',
        namespace='airflow',
        image='apache/airflow:2.8.0',
        cmds=['python', '-c'],
        arguments=[POD_SCRIPT],
        **kubernetes_config,
    )
    # Python operator for Kubernetes-specific logic
    python_k8s_task = PythonOperator(
        task_id='python_kubernetes_task',
        python_callable=kubernetes_specific_task,
    )
    # Set dependencies
    k8s_task >> python_k8s_task
Celery Executor Optimization
# celery_executor_optimization.py
import os
import socket
import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator


def celery_optimized_task(**context):
    """Report which worker and queue ran the task, then do CPU work."""
    ti = context['ti']
    worker_info = {
        'hostname': socket.gethostname(),
        'pid': os.getpid(),
        'airflow_task_id': ti.task_id,
        'queue': ti.queue or 'default',
    }
    print(f"Running on Celery worker: {worker_info}")
    # Simulate CPU-intensive work
    start_time = time.time()
    result = sum(i * i for i in range(1_000_000))
    worker_info['computation_time'] = time.time() - start_time
    worker_info['result'] = result
    return worker_info


def queue_monitoring_task(**context):
    """Summarize Celery queue and worker health."""
    try:
        # Celery app used by the CeleryExecutor; this path applies to recent
        # apache-airflow-providers-celery releases (older Airflow versions
        # exposed it from airflow.executors.celery_executor).
        from airflow.providers.celery.executors.celery_executor_utils import app

        inspector = app.control.inspect()
        # Both calls return {worker_name: ...}, or None if no worker replies.
        active_queues = inspector.active_queues() or {}
        stats = inspector.stats() or {}
        queue_names = {q['name'] for queues in active_queues.values() for q in queues}
        monitoring_data = {
            'active_queues': sorted(queue_names),
            'workers': len(stats),
            'timestamp': datetime.now().isoformat(),
        }
        print(f"Celery monitoring data: {monitoring_data}")
        return monitoring_data
    except Exception as e:
        print(f"Error monitoring Celery: {e}")
        return {'error': str(e)}


# Intended queue layout (documentation only; Airflow does not read this dict).
# Tasks pick a queue with `queue=`, and concurrency is set per worker, e.g.
#   airflow celery worker --queues cpu_intensive --concurrency 4
celery_queues = {
    'default': {
        'description': 'Default queue for general tasks',
    },
    'cpu_intensive': {
        'description': 'Queue for CPU-intensive tasks',
        'worker_concurrency': 4,  # Lower concurrency for CPU tasks
    },
    'io_intensive': {
        'description': 'Queue for I/O-intensive tasks',
        'worker_concurrency': 32,  # Higher concurrency for I/O tasks
    },
    'priority': {
        'description': 'Queue for high-priority tasks',
    },
}

with DAG(
    'celery_optimization_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Celery executor optimization examples',
    schedule=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['celery', 'optimization'],
) as dag:
    # CPU-intensive task; pools must exist first, e.g.
    #   airflow pools set cpu_pool 8 "CPU tasks"
    cpu_task = PythonOperator(
        task_id='cpu_intensive_task',
        python_callable=celery_optimized_task,
        queue='cpu_intensive',  # Route to workers listening on this queue
        pool='cpu_pool',  # Pool slots cap concurrent CPU tasks
        priority_weight=10,
    )
    # I/O-intensive task
    io_task = PythonOperator(
        task_id='io_intensive_task',
        python_callable=celery_optimized_task,
        queue='io_intensive',
        pool='io_pool',
        priority_weight=5,
    )
    # High-priority task
    priority_task = PythonOperator(
        task_id='priority_task',
        python_callable=celery_optimized_task,
        queue='priority',
        priority_weight=100,
    )
    # Queue monitoring task
    monitor_task = PythonOperator(
        task_id='queue_monitor',
        python_callable=queue_monitoring_task,
    )
    # Set dependencies
    [cpu_task, io_task, priority_task] >> monitor_task
Executor Comparison Benchmark
# executor_benchmark.py
import json
import multiprocessing
import time
from datetime import datetime, timedelta

import psutil
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def benchmark_task(label: str, iterations: int = 1000, **context):
    """Measure wall time and memory growth for a simple CPU loop."""
    process = psutil.Process()
    start_time = time.time()
    # Track this process's resident memory, not system-wide usage.
    start_rss = process.memory_info().rss
    # Simulate work
    result = 0
    for i in range(iterations):
        result += i * i
    elapsed = time.time() - start_time
    rss_delta = process.memory_info().rss - start_rss
    benchmark_results = {
        'label': label,
        'iterations': iterations,
        'execution_time': elapsed,
        'rss_delta_bytes': rss_delta,
        'cpu_percent': psutil.cpu_percent(interval=0.1),
        'timestamp': datetime.now().isoformat(),
    }
    print(f"Benchmark results: {json.dumps(benchmark_results, indent=2)}")
    return benchmark_results


def _square_sum_worker(task_num):
    """Module-level so multiprocessing can pickle it (nested functions cannot be pickled)."""
    start_time = time.time()
    result = sum(i * i for i in range(100_000))
    return {
        'task_num': task_num,
        'execution_time': time.time() - start_time,
        'result': result,
    }


def parallel_benchmark(num_tasks: int = 10, **context):
    """Run num_tasks CPU loops in a process pool inside one Airflow task.

    This measures multiprocessing within a single task, not the executor's
    own parallelism. Some worker setups (e.g. daemonic Celery worker
    processes) may refuse to create child processes.
    """
    start_time = time.time()
    with multiprocessing.Pool(processes=num_tasks) as pool:
        results = pool.map(_square_sum_worker, range(num_tasks))
    total_time = time.time() - start_time
    parallel_results = {
        'num_tasks': num_tasks,
        'total_time': total_time,
        'avg_time_per_task': total_time / num_tasks,
        # Sum of per-task times over wall time, i.e. the effective speedup
        'parallel_speedup': sum(r['execution_time'] for r in results) / total_time,
        'results': results,
    }
    print(f"Parallel benchmark results: {json.dumps(parallel_results, indent=2)}")
    return parallel_results


with DAG(
    'executor_benchmark_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Executor performance benchmark',
    schedule=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['benchmark', 'performance'],
) as dag:
    # Single task benchmark
    single_benchmark = PythonOperator(
        task_id='single_task_benchmark',
        python_callable=benchmark_task,
        op_kwargs={'label': 'single', 'iterations': 10000},
    )
    # Parallel benchmark
    parallel_benchmark_task = PythonOperator(
        task_id='parallel_benchmark',
        python_callable=parallel_benchmark,
        op_kwargs={'num_tasks': 5},
    )
    # Bash benchmark (BashOperator takes bash_command)
    bash_benchmark = BashOperator(
        task_id='bash_benchmark',
        bash_command='time (for i in {1..100}; do echo "Processing $i"; done)',
    )
    # Set dependencies
    single_benchmark >> parallel_benchmark_task >> bash_benchmark
Performance Metrics
Executor Performance Comparison
| Executor | Task Startup Time | Parallelism | Resource Usage | Fault Tolerance | Scalability |
|---|---|---|---|---|---|
| Sequential | ~100ms | None | Minimal | Single point | None |
| Local | ~200ms | Limited | Single node | Single node | Limited |
| Celery | ~500ms | High | Distributed | High | Horizontal |
| Kubernetes | ~2-5s | Very High | Dynamic | Very High | Dynamic |
Throughput Analysis
| Metric | Sequential | Local | Celery | Kubernetes |
|---|---|---|---|---|
| Tasks/Hour | 60 | 600 | 6,000 | 60,000+ |
| Concurrent Tasks | 1 | 10-32 | 100-1000 | 1000+ |
| Task Duration | Unlimited | Unlimited | Depends on worker | Depends on pod |
| Queue Depth | N/A | N/A | Depends on broker | N/A |
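The throughput row follows from the concurrency row via Little's law: rearranged, mean task duration = concurrent tasks / throughput. Using the lower end of each concurrency range (an assumption where the table gives a range), every executor's figures imply the same 60-second mean task duration, so the Tasks/Hour row is best read as "concurrency × 60 tasks per slot per hour" rather than as measured results.

```python
# (tasks per hour, concurrent tasks) from the throughput table, taking the
# lower end of each concurrency range.
TABLE = {
    "Sequential": (60, 1),
    "Local": (600, 10),
    "Celery": (6_000, 100),
    "Kubernetes": (60_000, 1_000),
}


def implied_mean_duration(tasks_per_hour: float, concurrent: float) -> float:
    """Little's law rearranged: T = N / throughput, in seconds."""
    return concurrent * 3600 / tasks_per_hour


for name, (tph, n) in TABLE.items():
    print(f"{name:<10} {implied_mean_duration(tph, n):.0f} s per task")
```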
Latency Comparison
| Operation | Sequential | Local | Celery | Kubernetes |
|---|---|---|---|---|
| Task Startup | ~100ms | ~200ms | ~500ms | ~2-5s |
| Task Completion | ~100ms | ~200ms | ~500ms | ~1s |
| State Update | ~50ms | ~50ms | ~100ms | ~200ms |
| Log Retrieval | ~10ms | ~10ms | ~50ms | ~200ms |
Best Practices
1. Executor Selection
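Choose the executor based on scale requirements. SequentialExecutor (or LocalExecutor) suits development, LocalExecutor suits staging and small production deployments, and CeleryExecutor or KubernetesExecutor suits larger production deployments.
# Configuration based on environment
# Airflow reads the executor from airflow.cfg or environment variables, so
# these values must be exported (e.g. as AIRFLOW__CORE__EXECUTOR and
# AIRFLOW__CORE__PARALLELISM) before the scheduler starts.
import os

ENVIRONMENT = os.environ.get('AIRFLOW_ENVIRONMENT', 'development')
if ENVIRONMENT == 'development':
    EXECUTOR = 'SequentialExecutor'
    PARALLELISM = 1
elif ENVIRONMENT == 'staging':
    EXECUTOR = 'LocalExecutor'
    PARALLELISM = 16
elif ENVIRONMENT == 'production':
    EXECUTOR = 'CeleryExecutor'
    PARALLELISM = 64
else:
    raise ValueError(f"Unknown AIRFLOW_ENVIRONMENT: {ENVIRONMENT}")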
2. Resource Planning
Estimate resource requirements for each executor type. Monitor resource usage and adjust configurations accordingly.
3. Queue Management
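Use Celery queues to separate different types of workloads, and give critical tasks a higher priority_weight so the scheduler queues them ahead of other tasks.
# Task queue configuration (define the task inside a `with DAG(...)` block)
from airflow.operators.python import PythonOperator


def compute_ml_model(**context):
    """Placeholder for the real model-training callable (hypothetical)."""
    ...


task = PythonOperator(
    task_id='heavy_computation',
    python_callable=compute_ml_model,
    queue='ml_queue',  # Route to workers listening on ml_queue
    priority_weight=100,  # Scheduled ahead of lower-weight tasks
    pool='ml_pool',  # Pool slots limit concurrent ML tasks
)
# Airflow routes each task by its `queue` argument; the [celery] section
# has no task_routes option. Start workers per queue instead:
#   airflow celery worker --queues ml_queue
#   airflow celery worker --queues etl_queue,report_queue
# Tasks without an explicit queue use [operators] default_queue.
# Create the pool before use:
#   airflow pools set ml_pool 4 "ML workloads"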
4. Kubernetes Configuration
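Set appropriate resource requests and limits for Kubernetes pods, and use node selectors and tolerations to control workload placement. With the KubernetesExecutor, per-task settings are passed as a pod_override in executor_config.
# Kubernetes executor configuration (per-task pod_override)
from kubernetes.client import models as k8s

kubernetes_executor_config = {
    'pod_override': k8s.V1Pod(
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(
                    name='base',  # the task container is named "base"
                    resources=k8s.V1ResourceRequirements(
                        requests={'cpu': '100m', 'memory': '128Mi'},
                        limits={'cpu': '1000m', 'memory': '1Gi'},
                    ),
                ),
            ],
            node_selector={'workload-type': 'airflow'},
            tolerations=[
                k8s.V1Toleration(key='airflow', operator='Equal', value='true', effect='NoSchedule'),
            ],
        ),
    ),
}
# Attach to a task: PythonOperator(..., executor_config=kubernetes_executor_config)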
5. Monitoring
Implement comprehensive monitoring for all executors. Track task success rates, execution times, and resource utilization.
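Success rate and execution-time percentiles are straightforward to compute once task-instance records are collected, whatever the source. The sketch uses a hypothetical `TaskRecord` shape and treats only `success` and `failed` as finished states; both are assumptions, not Airflow's data model.

```python
from dataclasses import dataclass
from statistics import mean, quantiles
from typing import Dict, Iterable


@dataclass
class TaskRecord:
    """Hypothetical per-task-instance record; field names are illustrative."""
    task_id: str
    state: str          # e.g. "success", "failed", "running"
    duration_s: float   # wall-clock run time in seconds


def summarize(records: Iterable[TaskRecord]) -> Dict[str, float]:
    """Success rate, mean and p95 duration over finished task instances."""
    finished = [r for r in records if r.state in ("success", "failed")]
    if not finished:
        raise ValueError("no finished task instances to summarize")
    durations = sorted(r.duration_s for r in finished)
    # quantiles(n=20) returns 19 cut points; the last is the 95th percentile.
    # Older Python versions require at least two data points.
    if len(durations) > 1:
        p95 = quantiles(durations, n=20, method="inclusive")[-1]
    else:
        p95 = durations[0]
    return {
        "success_rate": sum(r.state == "success" for r in finished) / len(finished),
        "mean_duration_s": mean(durations),
        "p95_duration_s": p95,
    }


records = [
    TaskRecord("extract", "success", 12.0),
    TaskRecord("transform", "success", 30.0),
    TaskRecord("load", "failed", 5.0),
    TaskRecord("report", "success", 8.0),
    TaskRecord("notify", "running", 1.0),   # still running: excluded
]
print(summarize(records))
```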
6. Scaling
Configure auto-scaling for Celery workers and Kubernetes pods. Set appropriate scaling policies based on workload patterns.
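A common scaling policy for Celery workers sizes the pool so that every running or queued task has a slot: ceil((running + queued) / worker_concurrency), clamped to a minimum and maximum. As far as we understand, this mirrors the query used by the official Airflow Helm chart's KEDA autoscaling, but the helper below is a hypothetical sketch, not that implementation.

```python
import math


def desired_celery_workers(running: int, queued: int, worker_concurrency: int,
                           min_workers: int = 1, max_workers: int = 10) -> int:
    """Workers needed so every running or queued task has a slot, clamped."""
    if worker_concurrency <= 0:
        raise ValueError("worker_concurrency must be positive")
    if min_workers > max_workers:
        raise ValueError("min_workers cannot exceed max_workers")
    needed = math.ceil((running + queued) / worker_concurrency)
    return max(min_workers, min(max_workers, needed))


print(desired_celery_workers(running=10, queued=30, worker_concurrency=16))  # 3
```

The minimum keeps a warm worker available for new tasks, and the maximum caps cost during backlogs.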
7. Fault Tolerance
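Configure retries and failure handling (retries, retry_delay, on_failure_callback). Airflow has no built-in dead letter queue, so alert on tasks that exhaust their retries. Use Kubernetes pod disruption budgets for long-running components such as schedulers and Celery workers.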
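Retry behavior is set through BaseOperator arguments, typically in default_args. The values below are illustrative. The helper shows an idealized doubling schedule capped at max_retry_delay; Airflow's actual backoff calculation also spreads delays using a per-task hash, so real delays will differ from this sketch.

```python
from datetime import timedelta
from typing import List

# Retry-related BaseOperator arguments; the values are illustrative.
default_args = {
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(hours=1),
}


def idealized_backoff(base: timedelta, retries: int, cap: timedelta) -> List[timedelta]:
    """Doubling delay per attempt, capped (simplified; ignores Airflow's spread)."""
    return [min(base * 2 ** attempt, cap) for attempt in range(retries)]


print(idealized_backoff(default_args["retry_delay"], 6, default_args["max_retry_delay"]))
```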
8. Cost Optimization
Right-size resources based on actual usage. Use spot instances for non-critical workloads. Implement resource quotas.
9. Security
Implement proper authentication and authorization. Use network policies for Kubernetes. Secure message brokers for Celery.
10. Testing
Test executor configurations in staging environments. Validate failover and scaling scenarios. Performance test before production deployment.
Key Takeaways:
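- Maximum parallelism: $P_{\max} = \min(P, W \cdot C, Q)$ (parallelism setting, workers × per-worker concurrency, queueable tasks)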
- Throughput depends on executor parallelism and task duration
- Overhead ratio varies by executor type
- Sequential: no parallelism; Local: single-node; Celery: horizontal; Kubernetes: dynamic
- Kubernetes provides full isolation but highest startup cost (~2-5s per pod)
- Celery is a common production choice for medium-to-large deployments
See also: Kafka Connect (kafka/03), PySpark Submit (pyspark/19), Data Engineering Orchestration (data-engineering/017)
See Also
- Airflow Architecture: Executor architecture overview
- DAG Design Patterns: DAG-level executor considerations
- Kubernetes for Data Engineers: Container orchestration for executors
- Cloud Platforms Overview: Cloud-based executor deployment