Performance Tuning and Optimization
Formal Definitions
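Definition: Scheduler Throughput
Scheduler throughput is the number of tasks the scheduler can queue per unit time. It is bounded by the DAG parsing rate $R_{\text{parse}}$, the database query rate $R_{\text{db}}$, and the executor dispatch rate $R_{\text{exec}}$. The effective throughput is $T_{\text{eff}} = \min(R_{\text{parse}}, R_{\text{db}}, R_{\text{exec}})$.
Definition: Task Latency
Task latency is the time from task queuing to execution start. It includes the executor queue time $t_{\text{queue}}$, the worker assignment time $t_{\text{assign}}$, and the task startup time $t_{\text{startup}}$. The total latency is $L_{\text{task}} = t_{\text{queue}} + t_{\text{assign}} + t_{\text{startup}}$.
Definition: Database Connection Pool Efficiency
Connection pool efficiency measures how effectively database connections are reused. The efficiency is $\eta_{\text{pool}} = N_{\text{reused}} / N_{\text{total}}$, where $N_{\text{reused}}$ is the number of connections reused from the pool and $N_{\text{total}}$ is the total number of connection attempts.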
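The three definitions reduce to a few lines of arithmetic. The sketch below is illustrative only: the function names and sample numbers are assumptions made for the example, not Airflow APIs or measurements.

```python
def effective_throughput(parse_rate: float, db_rate: float, dispatch_rate: float) -> float:
    """Throughput is capped by the slowest of the three stages."""
    return min(parse_rate, db_rate, dispatch_rate)


def task_latency(queue_s: float, assign_s: float, startup_s: float) -> float:
    """Time from queuing to execution start, in seconds."""
    return queue_s + assign_s + startup_s


def pool_efficiency(reused: int, total_attempts: int) -> float:
    """Fraction of connection attempts served by a reused pooled connection."""
    if total_attempts == 0:
        return 0.0
    return reused / total_attempts


# Hypothetical sample values, chosen only to exercise the formulas.
throughput = effective_throughput(parse_rate=50.0, db_rate=200.0, dispatch_rate=120.0)
latency = task_latency(queue_s=1.5, assign_s=0.5, startup_s=2.0)
efficiency = pool_efficiency(reused=950, total_attempts=1000)

print(f"throughput={throughput}/s latency={latency}s efficiency={efficiency:.0%}")
```

In this example parsing is the bottleneck, so raising the database or dispatch rate would not change the effective throughput.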
Detailed Explanation
Scheduler Optimization
Key Scheduler Settings:
| Parameter | Default | Recommended | Description |
|---|---|---|---|
| `min_file_process_interval` | 30 | 30-60 | Seconds between DAG file scans |
| `dag_dir_list_interval` | 300 | 300-600 | Seconds between directory listings |
| `parsing_processes` | 2 | 2-4 | Parallel DAG parsing processes |
| `scheduler_heartbeat_sec` | 5 | 5 | Scheduler heartbeat interval |
| `parallelism` | 32 | 32-128 | Max concurrent tasks |
| `max_active_tasks_per_dag` | 16 | 16-64 | Max tasks per DAG |
| `max_active_runs_per_dag` | 16 | 16-32 | Max DAG runs per DAG |
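```ini
[core]
# parallelism and max_active_tasks_per_dag belong to [core], not [scheduler]
parallelism = 32
max_active_tasks_per_dag = 16
# Airflow 1.10.x only: DAG serialization is always on from Airflow 2.0,
# where this option no longer exists.
# store_serialized_dags = True

[scheduler]
min_file_process_interval = 30
parsing_processes = 2
```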
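One way to keep these settings honest is to compare a configuration file against the recommended ranges from the table. The following is a minimal sketch using only the standard library. The `RECOMMENDED` mapping, the sample configuration text, and the `out_of_range` helper are assumptions made for illustration, and the sketch assumes `parallelism` and `max_active_tasks_per_dag` are read from `[core]`, as in Airflow 2.x.

```python
import configparser

# Recommended ranges from the settings table: (section, option) -> (low, high)
RECOMMENDED = {
    ("scheduler", "min_file_process_interval"): (30, 60),
    ("scheduler", "parsing_processes"): (2, 4),
    ("core", "parallelism"): (32, 128),
    ("core", "max_active_tasks_per_dag"): (16, 64),
}

# Hypothetical configuration text standing in for a real airflow.cfg
SAMPLE_CFG = """
[core]
parallelism = 256
max_active_tasks_per_dag = 16

[scheduler]
min_file_process_interval = 30
parsing_processes = 2
"""


def out_of_range(cfg_text: str) -> list[str]:
    """Return one message per option that falls outside its recommended range."""
    # Interpolation is disabled so that literal '%' characters do not raise.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    findings = []
    for (section, option), (low, high) in RECOMMENDED.items():
        if not parser.has_option(section, option):
            continue  # option not set, so the default applies
        value = parser.getint(section, option)
        if not low <= value <= high:
            findings.append(
                f"[{section}] {option} = {value} (recommended {low}-{high})"
            )
    return findings


findings = out_of_range(SAMPLE_CFG)
for message in findings:
    print(message)
```

A value outside the range is not necessarily wrong; the check only flags settings that deserve a second look.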
Database Optimization
Essential Indexes:
| Index | Table | Purpose |
|---|---|---|
| `idx_task_instance_dag_run` | `task_instance` | Speeds up DAG run queries |
| `idx_task_instance_state` | `task_instance` | Fast state filtering |
| `idx_dag_run_state` | `dag_run` | Fast DAG run state queries |
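The effect of a state index can be seen in a query plan. The sketch below uses an in-memory SQLite database as a stand-in; it is not Airflow's metadata database, and the simplified `task_instance` schema is an assumption made for the demonstration. Only the index name is taken from the table above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified, hypothetical schema: the real task_instance table has many more columns.
conn.execute(
    "CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, run_id TEXT, state TEXT)"
)
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [("dag_a", f"t{i}", "run_1", "success" if i % 10 else "failed") for i in range(1000)],
)

QUERY = "SELECT task_id FROM task_instance WHERE state = 'failed'"


def query_plan(connection: sqlite3.Connection, sql: str) -> str:
    """Return the human-readable plan SQLite would use for the statement."""
    rows = connection.execute("EXPLAIN QUERY PLAN " + sql).fetchall()
    return " | ".join(row[3] for row in rows)  # the fourth column is the detail text


plan_before = query_plan(conn, QUERY)
conn.execute("CREATE INDEX idx_task_instance_state ON task_instance (state)")
plan_after = query_plan(conn, QUERY)
conn.close()

print("before:", plan_before)
print("after: ", plan_after)
```

Before the index exists the plan reports a full scan of the table; afterwards it names the index. On PostgreSQL or MySQL the equivalent check would use that database's own `EXPLAIN` output.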
Connection Pool Settings:
| Parameter | Recommended | Description |
|---|---|---|
| `pool_size` | 20 | Base connections |
| `max_overflow` | 30 | Extra connections for peaks |
| `pool_timeout` | 30 | Wait time for a connection (seconds) |
| `pool_recycle` | 1800 | Recycle after 30 min |
| `pool_pre_ping` | True | Verify connections |
```ini
[database]
sql_alchemy_pool_size = 20
sql_alchemy_max_overflow = 30
sql_alchemy_pool_recycle = 1800
sql_alchemy_pool_pre_ping = True
```
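Pool settings apply per process, so it is worth checking the total against the database server's connection limit. With a SQLAlchemy queue pool, one process can hold at most `pool_size + max_overflow` connections at once. The sketch below works through that arithmetic; the process count and the server limit are hypothetical numbers chosen for the example.

```python
def max_connections_per_process(pool_size: int, max_overflow: int) -> int:
    """Upper bound on simultaneous connections held by one pool."""
    return pool_size + max_overflow


def total_connection_budget(pool_size: int, max_overflow: int, process_count: int) -> int:
    """Worst case when every process exhausts its own pool at the same time."""
    return max_connections_per_process(pool_size, max_overflow) * process_count


per_process = max_connections_per_process(pool_size=20, max_overflow=30)

# Hypothetical deployment: 6 processes (schedulers, webserver workers, ...)
# sharing a database server that accepts 100 connections.
total = total_connection_budget(pool_size=20, max_overflow=30, process_count=6)
db_max_connections = 100
fits = total <= db_max_connections

print(f"per process: {per_process}, worst case total: {total}, fits: {fits}")
```

When the worst case exceeds the server limit, as it does here, the usual options are a smaller pool, fewer processes, or a connection pooler in front of the database.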
Worker Optimization
Celery Worker Settings:
| Parameter | Description | Recommendation |
|---|---|---|
| `worker_concurrency` | Tasks per worker | 4-32 (CPU-bound: 4-8, I/O-bound: 16-32) |
| `worker_prefetch_multiplier` | Tasks prefetched | 1 (fair scheduling) |
| `worker_max_tasks_per_child` | Recycle worker after N tasks | 1000-2000 |
| `task_acks_late` | Ack after execution | True (fault tolerance) |
```ini
[celery]
worker_concurrency = 16
worker_prefetch_multiplier = 1
worker_max_tasks_per_child = 1000
task_acks_late = True
```
Tip: For CPU-bound tasks, use lower concurrency (4-8). For I/O-bound, use higher (16-32).
### Worker Optimization
```python
# worker_optimization.py
import psutil


def get_worker_recommendations():
    """Get resource recommendations based on system specs."""
    # psutil.cpu_count() can return None when the count cannot be determined
    cpu_count = psutil.cpu_count() or 1
    memory = psutil.virtual_memory()

    # Celery worker configuration
    worker_config = {
        # Concurrency = CPU cores (for CPU-bound tasks), capped at 16 here
        # Concurrency = 2 * CPU cores (for I/O-bound tasks)
        'concurrency': min(cpu_count, 16),
        # Prefetch multiplier - how many tasks to prefetch
        'prefetch_multiplier': 1,
        # Maximum tasks per child before worker restart
        'max_tasks_per_child': 200,
        # Worker memory limit in KiB (Celery expects kilobytes):
        # 80% of total RAM split evenly across the CPU cores
        'max_memory_per_child': int(memory.total * 0.8 / cpu_count / 1024),
        # Task time limit (seconds)
        'task_time_limit': 3600,
        # Soft time limit (seconds) - raises SoftTimeLimitExceeded
        'task_soft_time_limit': 3000,
    }
    return worker_config


def monitor_worker_health():
    """Monitor worker health metrics."""
    process = psutil.Process()
    metrics = {
        'cpu_percent': psutil.cpu_percent(interval=1),
        'memory_percent': psutil.virtual_memory().percent,
        'disk_usage': psutil.disk_usage('/').percent,
        'open_files': len(process.open_files()),
        'connections': len(process.connections()),
    }

    # Alert thresholds
    alerts = []
    if metrics['cpu_percent'] > 90:
        alerts.append(f"High CPU: {metrics['cpu_percent']}%")
    if metrics['memory_percent'] > 85:
        alerts.append(f"High Memory: {metrics['memory_percent']}%")
    if metrics['disk_usage'] > 90:
        alerts.append(f"High Disk: {metrics['disk_usage']}%")

    return {
        'metrics': metrics,
        'alerts': alerts,
        'healthy': len(alerts) == 0,
    }
```
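Effective Scheduler Throughput
$$T_{\text{eff}} = \min(R_{\text{parse}}, R_{\text{db}}, R_{\text{exec}})$$
Here,
- $T_{\text{eff}}$ = Effective scheduler throughput
- $R_{\text{parse}}$ = DAG parsing rate (DAGs/second)
- $R_{\text{db}}$ = Database query rate (queries/second)
- $R_{\text{exec}}$ = Executor dispatch rate (tasks/second)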
Task End-to-End Latency
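$$L_{\text{e2e}} = t_{\text{queue}} + t_{\text{assign}} + t_{\text{startup}} + t_{\text{exec}}$$
Here,
- $L_{\text{e2e}}$ = End-to-end task latency
- $t_{\text{queue}}$ = Time in executor queue
- $t_{\text{assign}}$ = Worker assignment time
- $t_{\text{startup}}$ = Task process startup time
- $t_{\text{exec}}$ = Actual task execution time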
Database Query Optimization Score
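$$H_{\text{index}} = \frac{N_{\text{index\_hits}}}{N_{\text{index\_hits}} + N_{\text{total\_scans}}}$$
Here,
- $H_{\text{index}}$ = Index hit ratio (higher is better)
- $N_{\text{index\_hits}}$ = Queries using indexes
- $N_{\text{total\_scans}}$ = Total table scans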
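A small sketch of the index hit ratio follows, assuming it is computed as index hits divided by the sum of index hits and table scans. The function name and the sample counts are hypothetical. On PostgreSQL, counters of this kind could plausibly be taken from the `idx_scan` and `seq_scan` columns of `pg_stat_user_tables`.

```python
def index_hit_ratio(index_hits: int, table_scans: int) -> float:
    """Share of lookups answered through an index (1.0 means no table scans)."""
    total = index_hits + table_scans
    if total == 0:
        return 0.0  # no activity recorded yet
    return index_hits / total


# Hypothetical counters for one table
ratio = index_hit_ratio(index_hits=9_000, table_scans=1_000)
print(f"index hit ratio: {ratio:.1%}")
```

A low ratio on a large, frequently filtered table is a hint that an index is missing; on a small table a full scan can be the cheaper plan, so the ratio is best read per table.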
For CPU-bound tasks, set worker concurrency equal to CPU cores. For I/O-bound tasks, set concurrency to 2-4x CPU cores. Monitor worker memory usage to prevent OOM kills.
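The concurrency rule can be written as a small helper. This is a sketch under the assumptions stated in the text (one slot per core for CPU-bound work, 2-4 slots per core for I/O-bound work); the function name and its parameters are hypothetical.

```python
import os
from typing import Optional


def recommend_concurrency(task_type: str,
                          cpu_cores: Optional[int] = None,
                          io_multiplier: int = 2) -> int:
    """Suggest a worker concurrency for 'cpu' or 'io' bound workloads."""
    # os.cpu_count() can return None, so fall back to a single core.
    cores = cpu_cores or os.cpu_count() or 1
    if task_type == "cpu":
        return cores
    if task_type == "io":
        if not 2 <= io_multiplier <= 4:
            raise ValueError("io_multiplier should be between 2 and 4")
        return cores * io_multiplier
    raise ValueError(f"unknown task type: {task_type!r}")


print(recommend_concurrency("cpu", cpu_cores=8))
print(recommend_concurrency("io", cpu_cores=8, io_multiplier=4))
```

The result is a starting point; per-task memory use decides whether the worker can actually sustain that many slots.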
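DAG serialization stores DAG definitions in the database instead of re-parsing files, which reduces scheduler memory usage. In Airflow 1.10.x it is enabled with `store_serialized_dags = True` under `[core]`; from Airflow 2.0 onward serialization is always on and the option no longer exists.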
Key Concepts Table
| Optimization Area | Metric | Target | Impact |
|---|---|---|---|
| DAG Parsing | Parse time | < 1s per DAG | High |
| Task Latency | Queue to start | < 5s | High |
| DB Query Time | Average query | < 100ms | High |
| Worker Memory | Per-worker | < 4GB | Medium |
| XCom Size | Per operation | < 48KB | Medium |
| Log Storage | Daily volume | < 10GB/day | Low |
| Scheduler Heartbeat | Interval | 5s | Low |
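The targets in the table can be encoded as data and checked against measured values. The sketch below assumes the measurements are collected elsewhere; the metric key names and the sample readings are hypothetical.

```python
# Upper limits taken from the table; a value at or above the limit misses the target.
TARGETS = {
    "dag_parse_seconds": 1.0,
    "task_queue_to_start_seconds": 5.0,
    "db_query_ms": 100.0,
    "worker_memory_gb": 4.0,
    "xcom_size_kb": 48.0,
    "log_volume_gb_per_day": 10.0,
}


def missed_targets(measured: dict) -> dict:
    """Return the measured metrics that are not strictly below their target."""
    return {
        name: value
        for name, value in measured.items()
        if name in TARGETS and value >= TARGETS[name]
    }


# Hypothetical readings
sample = {
    "dag_parse_seconds": 0.4,
    "task_queue_to_start_seconds": 7.2,
    "db_query_ms": 85.0,
    "xcom_size_kb": 120.0,
}
violations = missed_targets(sample)
for name, value in violations.items():
    print(f"{name}: {value} (target < {TARGETS[name]})")
```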
Code Examples
Performance Monitoring Dashboard
```python
# performance_monitoring.py
from datetime import timedelta

from airflow.models import DagModel, DagRun, TaskInstance
from airflow.utils import timezone
from airflow.utils.session import create_session
from sqlalchemy import func


def get_performance_metrics():
    """Collect comprehensive performance metrics."""
    # Airflow stores timezone-aware UTC timestamps, so use utcnow()
    # rather than a naive datetime.now().
    now = timezone.utcnow()
    since_24h = now - timedelta(hours=24)

    # create_session() closes the session when the block exits.
    with create_session() as session:
        # Scheduler metrics
        scheduler_metrics = {
            'active_dags': session.query(DagModel).filter(
                DagModel.is_active.is_(True)
            ).count(),
            'total_dag_runs': session.query(DagRun).count(),
            # start_date is when the run actually began; the logical
            # (execution) date usually lags by one schedule interval.
            'recent_runs_1h': session.query(DagRun).filter(
                DagRun.start_date >= now - timedelta(hours=1)
            ).count(),
        }

        # Task metrics: number of task instances per state
        task_stats = session.query(
            TaskInstance.state,
            func.count(TaskInstance.task_id)
        ).group_by(TaskInstance.state).all()
        task_metrics = {state: count for state, count in task_stats}

        # Performance metrics (None when no task succeeded in the window)
        avg_task_duration = session.query(
            func.avg(TaskInstance.duration)
        ).filter(
            TaskInstance.state == 'success',
            TaskInstance.start_date >= since_24h
        ).scalar()

        # Error rate
        total_tasks_24h = session.query(TaskInstance).filter(
            TaskInstance.start_date >= since_24h
        ).count()
        failed_tasks_24h = session.query(TaskInstance).filter(
            TaskInstance.state == 'failed',
            TaskInstance.start_date >= since_24h
        ).count()

    error_rate = failed_tasks_24h / total_tasks_24h if total_tasks_24h > 0 else 0

    return {
        'scheduler': scheduler_metrics,
        'tasks': task_metrics,
        'avg_duration_seconds': avg_task_duration,
        'error_rate_24h': error_rate,
        'timestamp': now.isoformat(),
    }


def identify_slow_dags(top_n=10):
    """Identify slowest DAGs by average task duration."""
    since_7d = timezone.utcnow() - timedelta(days=7)

    with create_session() as session:
        slow_dags = session.query(
            TaskInstance.dag_id,
            func.avg(TaskInstance.duration).label('avg_duration'),
            func.count(TaskInstance.task_id).label('task_count')
        ).filter(
            TaskInstance.state == 'success',
            TaskInstance.start_date >= since_7d
        ).group_by(
            TaskInstance.dag_id
        ).order_by(
            func.avg(TaskInstance.duration).desc()
        ).limit(top_n).all()

    return [
        {
            'dag_id': row[0],
            'avg_duration': row[1],
            'task_count': row[2],
        }
        for row in slow_dags
    ]


if __name__ == "__main__":
    metrics = get_performance_metrics()
    slow_dags = identify_slow_dags()

    # Averages are None when no matching rows exist, so default to 0.0
    avg_duration = metrics['avg_duration_seconds'] or 0.0

    print("Performance Metrics:")
    print(f"  Active DAGs: {metrics['scheduler']['active_dags']}")
    print(f"  Avg task duration: {avg_duration:.2f}s")
    print(f"  Error rate (24h): {metrics['error_rate_24h']:.2%}")
    print("\nSlowest DAGs:")
    for dag in slow_dags:
        print(f"  {dag['dag_id']}: {dag['avg_duration'] or 0.0:.2f}s avg ({dag['task_count']} tasks)")
```
DAG Optimization Patterns
```python
from airflow.decorators import task, dag
from datetime import datetime


@dag(
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['performance', 'optimization'],
)
def optimized_dag():

    @task
    def batch_processing():
        """Process data in batches for better performance."""
        import pandas as pd

        # Read data in chunks so the whole file never sits in memory
        chunk_size = 10000
        total_processed = 0
        for chunk in pd.read_csv('/data/large_file.csv', chunksize=chunk_size):
            # Process chunk (duplicates are only removed within each chunk)
            processed = chunk.dropna().drop_duplicates()
            total_processed += len(processed)
        return {'processed': total_processed}

    @task
    def parallel_processing():
        """Process independent items in parallel."""
        import concurrent.futures

        def process_item(item):
            # Simulate processing
            return item * 2

        items = list(range(1000))
        # Use thread pool for I/O-bound tasks
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            results = list(executor.map(process_item, items))
        return {'processed': len(results)}

    @task
    def cached_computation():
        """Cache expensive computations."""
        from airflow.models import Variable
        import json

        # Check cache (the entry never expires; delete the Variable to refresh)
        cache_key = "expensive_computation_result"
        cached = Variable.get(cache_key, default_var=None)
        if cached:
            return json.loads(cached)

        # Perform expensive computation
        result = sum(i ** 2 for i in range(100000))

        # Cache result
        Variable.set(cache_key, json.dumps({'result': result}))
        return {'result': result}

    batch_processing() >> parallel_processing() >> cached_computation()


optimized_dag()
```
Resource-Aware Task Scheduling
```python
from airflow.decorators import task, dag
from datetime import datetime
import psutil


@dag(
    schedule_interval="@hourly",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['performance', 'resource-aware'],
)
def resource_aware_dag():

    @task
    def check_resources():
        """Check available resources before processing."""
        # Measures the worker that runs this task; with a distributed
        # executor the downstream task may land on a different worker.
        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        return {
            'cpu_available': 100 - cpu_percent,
            'memory_available_percent': 100 - memory.percent,
            'should_process': cpu_percent < 80 and memory.percent < 85,
        }

    @task
    def adaptive_processing(resources: dict):
        """Adapt processing based on available resources."""
        if not resources['should_process']:
            return {'status': 'skipped', 'reason': 'insufficient_resources'}

        # Adjust batch size based on available memory
        # (100 rows per percentage point of free memory)
        batch_size = int(resources['memory_available_percent'] * 100)
        return {
            'status': 'processing',
            'batch_size': batch_size,
        }

    resources = check_resources()
    adaptive_processing(resources)


resource_aware_dag()
```
Performance Metrics
Optimization Impact
| Optimization | Before | After | Improvement |
|---|---|---|---|
| DAG Serialization | 10s parse | 2s parse | 80% faster |
| DB Indexing | 500ms query | 50ms query | 90% faster |
| Connection Pooling | 100ms connect | 10ms connect | 90% faster |
| Worker Concurrency | 4 tasks | 16 tasks | 4x throughput |
| XCom Backend | 500ms push | 50ms push | 90% faster |
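The Improvement column follows from the Before and After columns. The short sketch below reproduces that arithmetic; the helper names are assumptions made for the example, and the inputs are the figures from the table.

```python
def percent_faster(before: float, after: float) -> float:
    """Reduction in elapsed time, as a percentage of the original time."""
    return (before - after) * 100 / before


def throughput_gain(before: float, after: float) -> float:
    """How many times more work is done per unit time."""
    return after / before


serialization = percent_faster(before=10, after=2)    # seconds per parse
indexing = percent_faster(before=500, after=50)       # ms per query
pooling = percent_faster(before=100, after=10)        # ms per connect
concurrency = throughput_gain(before=4, after=16)     # concurrent tasks

print(f"{serialization:.0f}% {indexing:.0f}% {pooling:.0f}% {concurrency:.0f}x")
```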
Resource Utilization
| Resource | Recommended | Warning | Critical |
|---|---|---|---|
| CPU | < 70% | 70-85% | > 85% |
| Memory | < 70% | 70-85% | > 85% |
| Disk I/O | < 70% | 70-85% | > 85% |
| Network | < 50% | 50-80% | > 80% |
| DB Connections | < 70% | 70-85% | > 85% |
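The thresholds in this table map directly onto a three-level classifier. A minimal sketch, assuming utilization is reported as a percentage; the resource keys and the `classify` helper are hypothetical names.

```python
# (warning_from, critical_above) per resource, copied from the table
THRESHOLDS = {
    "cpu": (70, 85),
    "memory": (70, 85),
    "disk_io": (70, 85),
    "network": (50, 80),
    "db_connections": (70, 85),
}


def classify(resource: str, percent: float) -> str:
    """Return 'recommended', 'warning', or 'critical' for a utilization reading."""
    warning_from, critical_above = THRESHOLDS[resource]
    if percent > critical_above:
        return "critical"
    if percent >= warning_from:
        return "warning"
    return "recommended"


# Hypothetical readings
for resource, reading in [("cpu", 65), ("network", 60), ("memory", 90)]:
    print(f"{resource}: {reading}% -> {classify(resource, reading)}")
```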
Key Takeaways:
- Tune scheduler parameters: `min_file_process_interval`, `parsing_processes`, `parallelism`
- Optimize database with proper indexing and connection pooling
- Set worker concurrency based on task type (CPU vs I/O bound)
- Use DAG serialization to reduce scheduler memory usage
- Monitor resource utilization and set alert thresholds
- Batch large operations and cache expensive computations
See Also
- Airflow Architecture: Core architecture components
- Executors Comparison: Choosing the right executor
- Kubernetes Executor: Dynamic scaling with K8s
- Monitoring and Alerting: Performance monitoring