
Performance Tuning and Optimization in Apache Airflow


Performance Tuning and Optimization

Architecture Diagram: performance optimization layers, from DAG design (task granularity) through the scheduler (heartbeat tuning), metadata DB (indexing, pooling), executor (worker scaling), and caching (DAG and Variable cache).

Formal Definitions

Definition: Scheduler Throughput

Scheduler Throughput is the number of tasks the scheduler can queue per unit time. It is bounded by the parsing rate $R_{\text{parse}}$, the database query rate $R_{\text{db}}$, and the executor dispatch rate $R_{\text{dispatch}}$. With all three expressed in the same unit (tasks per second), the effective throughput is $T_{\text{eff}} = \min(R_{\text{parse}}, R_{\text{db}}, R_{\text{dispatch}})$.
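As a quick illustration, the sketch below evaluates $T_{\text{eff}}$ and reports which stage is the bottleneck. The function name and the sample rates are hypothetical, and it assumes all three rates have already been converted to tasks per second.

```python
def effective_throughput(r_parse: float, r_db: float, r_dispatch: float) -> tuple[float, str]:
    """Return (T_eff, bottleneck) where T_eff = min(R_parse, R_db, R_dispatch).

    All rates are assumed to be in the same unit (tasks per second).
    """
    rates = {"parse": r_parse, "db": r_db, "dispatch": r_dispatch}
    # The slowest stage caps the whole pipeline.
    bottleneck = min(rates, key=rates.get)
    return rates[bottleneck], bottleneck


# Hypothetical measurements: parsing is the limiting stage here.
t_eff, stage = effective_throughput(r_parse=12.0, r_db=40.0, r_dispatch=25.0)
print(f"T_eff = {t_eff} tasks/s, bottleneck: {stage}")  # T_eff = 12.0 tasks/s, bottleneck: parse
```

Raising any rate other than the bottleneck leaves $T_{\text{eff}}$ unchanged, which is why tuning should start with the slowest stage.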

Definition: Task Latency

Task Latency is the time from task queuing to execution start. It is the sum of the executor queue time $T_{\text{queue}}$, the worker assignment time $T_{\text{assign}}$, and the task startup time $T_{\text{startup}}$: $L_{\text{task}} = T_{\text{queue}} + T_{\text{assign}} + T_{\text{startup}}$.
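A small sketch of the latency sum, with a helper that names the largest component (usually the first thing worth tuning). The class and the example timings are hypothetical; all values are in seconds.

```python
from dataclasses import dataclass


@dataclass
class TaskLatency:
    """Components of L_task, all in seconds (hypothetical example values)."""
    t_queue: float    # time waiting in the executor queue
    t_assign: float   # time until a worker picks the task up
    t_startup: float  # task process startup time

    @property
    def total(self) -> float:
        # L_task = T_queue + T_assign + T_startup
        return self.t_queue + self.t_assign + self.t_startup

    def dominant(self) -> str:
        """Name of the largest component."""
        parts = {"queue": self.t_queue, "assign": self.t_assign, "startup": self.t_startup}
        return max(parts, key=parts.get)


latency = TaskLatency(t_queue=2.5, t_assign=0.5, t_startup=1.5)
print(latency.total, latency.dominant())  # 4.5 queue
```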

Definition: Database Connection Pool Efficiency

Connection Pool Efficiency measures how effectively database connections are reused. The efficiency is $\eta = \frac{N_{\text{reused}}}{N_{\text{total}}} \times 100\%$, where $N_{\text{reused}}$ is the number of connections reused from the pool and $N_{\text{total}}$ is the total number of connection attempts.
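The formula translates directly into code. This is a minimal sketch with a hypothetical function name; treating zero attempts as 0% efficiency is an arbitrary choice made here to avoid division by zero.

```python
def pool_efficiency(n_reused: int, n_total: int) -> float:
    """Return eta = N_reused / N_total * 100, in percent."""
    if n_total == 0:
        return 0.0  # no connection attempts yet; defined as 0% by choice
    if not 0 <= n_reused <= n_total:
        raise ValueError("n_reused must be between 0 and n_total")
    return 100 * n_reused / n_total


print(pool_efficiency(950, 1000))  # 95.0
```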

Detailed Explanation

Scheduler Optimization


Key Scheduler Settings:

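| Parameter | Section | Default | Recommended | Description |
|---|---|---|---|---|
| min_file_process_interval | [scheduler] | 30 | 30-60 | Seconds before each DAG file is re-parsed |
| dag_dir_list_interval | [scheduler] | 300 | 300-600 | Seconds between scans of the DAGs folder for new files |
| parsing_processes | [scheduler] | 2 | 2-4 | Parallel DAG parsing processes |
| scheduler_heartbeat_sec | [scheduler] | 5 | 5 | Scheduler heartbeat interval (seconds) |
| parallelism | [core] | 32 | 32-128 | Max concurrently running task instances (per scheduler) |
| max_active_tasks_per_dag | [core] | 16 | 16-64 | Max concurrently running tasks per DAG |
| max_active_runs_per_dag | [core] | 16 | 16-32 | Max active DAG runs per DAG |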
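
```ini
[core]
parallelism = 32
max_active_tasks_per_dag = 16

[scheduler]
min_file_process_interval = 30
parsing_processes = 2

# Airflow 1.10.x only: DAG serialization was enabled with
# store_serialized_dags = True under [core]. From Airflow 2.0 it is always on.
```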
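When Airflow runs in containers it is often easier to set these options as environment variables: Airflow maps variables named `AIRFLOW__{SECTION}__{KEY}` onto the matching `airflow.cfg` option, and the environment variable takes precedence over the file. The helper below is a hypothetical convenience, and the values are example picks from the recommended ranges above.

```python
def to_airflow_env(settings: dict[str, dict[str, object]]) -> dict[str, str]:
    """Map {section: {key: value}} to AIRFLOW__SECTION__KEY environment variables."""
    env = {}
    for section, options in settings.items():
        for key, value in options.items():
            # Upper-case names with double underscores separating the parts.
            name = f"AIRFLOW__{section.upper()}__{key.upper()}"
            env[name] = str(value)
    return env


tuned = {
    "core": {"parallelism": 64, "max_active_tasks_per_dag": 32},
    "scheduler": {"min_file_process_interval": 60, "parsing_processes": 4},
}
for name, value in to_airflow_env(tuned).items():
    print(f"{name}={value}")
```

The printed lines can be pasted into a Docker `env_file` or a Kubernetes manifest.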

Database Optimization


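Essential Indexes (index names are illustrative; Airflow's own schema migrations already create several indexes on these tables, so check what exists before adding more):

| Index | Table | Purpose |
|---|---|---|
| idx_task_instance_dag_run | task_instance | Speeds up DAG run queries |
| idx_task_instance_state | task_instance | Fast state filtering |
| idx_dag_run_state | dag_run | Fast DAG run state queries |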

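Connection Pool Settings (SQLAlchemy pool parameters):

| Parameter | Recommended | Description |
|---|---|---|
| pool_size | 20 | Base connections kept in the pool |
| max_overflow | 30 | Extra connections allowed during peaks |
| pool_timeout | 30 | Seconds to wait for a free connection |
| pool_recycle | 1800 | Recycle connections after 1800 s (30 min) |
| pool_pre_ping | True | Test each connection before use |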

```ini
# Airflow 2.3+ reads these from [database]; earlier versions use [core].
[database]
sql_alchemy_pool_size = 20
sql_alchemy_max_overflow = 30
sql_alchemy_pool_recycle = 1800
sql_alchemy_pool_pre_ping = True
```
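Pool settings multiply across processes, so it is worth checking the worst case against the database server's connection limit. The sketch below is a rough upper bound under stated assumptions: every listed Airflow process keeps its own SQLAlchemy pool and fills it completely, connections opened by running tasks are ignored, and the process count is hypothetical. For reference, PostgreSQL's default `max_connections` is 100.

```python
def max_db_connections(process_count: int, pool_size: int, max_overflow: int) -> int:
    """Worst-case connections if every process fills its pool plus overflow."""
    return process_count * (pool_size + max_overflow)


def fits_database(process_count: int, pool_size: int, max_overflow: int,
                  db_max_connections: int, reserve: int = 10) -> bool:
    """True if the worst case fits, keeping `reserve` slots for admin/other clients."""
    needed = max_db_connections(process_count, pool_size, max_overflow)
    return needed + reserve <= db_max_connections


# Hypothetical deployment: 2 schedulers + 4 webserver workers + 1 triggerer.
procs = 7
print(max_db_connections(procs, pool_size=20, max_overflow=30))  # 350
print(fits_database(procs, 20, 30, db_max_connections=100))      # False
```

If the worst case exceeds the server limit, lower `pool_size`/`max_overflow` or place a connection pooler such as PgBouncer in front of the database.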

Worker Optimization


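Celery Worker Settings:

| Parameter | Description | Recommendation |
|---|---|---|
| worker_concurrency | Task slots per worker | CPU-bound: 4-8; I/O-bound: 16-32 |
| worker_prefetch_multiplier | Messages prefetched per worker process | 1 (fair scheduling) |
| worker_max_tasks_per_child | Replace a worker process after N tasks | 1000-2000 |
| task_acks_late | Acknowledge messages after the task finishes | True (fault tolerance) |

Depending on the Airflow version, not every one of these keys is read from [celery]; any that are not can be set in a custom Celery configuration referenced by [celery] celery_config_options.

```ini
[celery]
worker_concurrency = 16
worker_prefetch_multiplier = 1
worker_max_tasks_per_child = 1000
task_acks_late = True
```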

Tip: For CPU-bound tasks, use lower concurrency (4-8). For I/O-bound, use higher (16-32).
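Worker concurrency only helps up to the scheduler's cap: Celery offers `workers * worker_concurrency` slots, while `[core] parallelism` limits running task instances per scheduler in Airflow 2.x. A minimal sketch of that upper bound; the function is hypothetical and deliberately ignores pools, `max_active_tasks_per_dag`, and queue routing, which can only lower the real number.

```python
def max_running_tasks(workers: int, worker_concurrency: int,
                      parallelism: int, schedulers: int = 1) -> int:
    """Upper bound on concurrently running task instances.

    Celery capacity is workers * worker_concurrency; [core] parallelism caps
    running tasks per scheduler. Pools and per-DAG limits are not modeled.
    """
    worker_slots = workers * worker_concurrency
    scheduler_cap = parallelism * schedulers
    return min(worker_slots, scheduler_cap)


# 4 workers x 16 slots = 64 slots, but parallelism = 32 caps it at 32.
print(max_running_tasks(workers=4, worker_concurrency=16, parallelism=32))  # 32
```

When the worker side is the larger number, adding workers does nothing until `parallelism` is raised as well.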


Worker Resource Recommendations

```python
# worker_optimization.py
import psutil


def get_worker_recommendations():
    """Get Celery worker recommendations based on system specs."""
    cpu_count = psutil.cpu_count() or 1  # cpu_count() may return None
    memory = psutil.virtual_memory()

    # Concurrency = CPU cores (for CPU-bound tasks)
    # Concurrency = 2-4x CPU cores (for I/O-bound tasks)
    concurrency = min(cpu_count, 16)

    # Celery worker configuration
    worker_config = {
        'concurrency': concurrency,

        # Prefetch multiplier - how many tasks each process prefetches
        'prefetch_multiplier': 1,

        # Maximum tasks per child process before it is replaced
        'max_tasks_per_child': 1000,

        # Per-child memory limit in KiB (Celery's unit; psutil reports bytes),
        # splitting 80% of RAM across the concurrent child processes
        'max_memory_per_child': int(memory.total * 0.8 / concurrency / 1024),

        # Hard time limit (seconds) - the child process is killed and replaced
        'task_time_limit': 3600,

        # Soft time limit (seconds) - raises SoftTimeLimitExceeded in the task
        'task_soft_time_limit': 3000,
    }

    return worker_config


def monitor_worker_health():
    """Monitor worker health metrics."""
    proc = psutil.Process()
    # psutil 6.0 renamed Process.connections() to Process.net_connections()
    get_connections = getattr(proc, 'net_connections', None) or proc.connections

    metrics = {
        'cpu_percent': psutil.cpu_percent(interval=1),
        'memory_percent': psutil.virtual_memory().percent,
        'disk_usage': psutil.disk_usage('/').percent,
        'open_files': len(proc.open_files()),
        'connections': len(get_connections()),
    }

    # Alert thresholds
    alerts = []
    if metrics['cpu_percent'] > 90:
        alerts.append(f"High CPU: {metrics['cpu_percent']}%")
    if metrics['memory_percent'] > 85:
        alerts.append(f"High Memory: {metrics['memory_percent']}%")
    if metrics['disk_usage'] > 90:
        alerts.append(f"High Disk: {metrics['disk_usage']}%")

    return {
        'metrics': metrics,
        'alerts': alerts,
        'healthy': len(alerts) == 0,
    }
```

Scheduler Throughput

$$T_{\text{eff}} = \min(R_{\text{parse}}, R_{\text{db}}, R_{\text{dispatch}})$$

Here, with every rate expressed in tasks/second before taking the minimum,

  • $T_{\text{eff}}$ = Effective scheduler throughput
  • $R_{\text{parse}}$ = DAG parsing rate
  • $R_{\text{db}}$ = Database query rate
  • $R_{\text{dispatch}}$ = Executor dispatch rate

Task End-to-End Latency

$$L_{\text{e2e}} = L_{\text{task}} + T_{\text{exec}} = T_{\text{queue}} + T_{\text{assign}} + T_{\text{startup}} + T_{\text{exec}}$$

Here,

  • $L_{\text{e2e}}$ = End-to-end task latency
  • $L_{\text{task}}$ = Task latency from queuing to execution start
  • $T_{\text{queue}}$ = Time in executor queue
  • $T_{\text{assign}}$ = Worker assignment time
  • $T_{\text{startup}}$ = Task process startup time
  • $T_{\text{exec}}$ = Actual task execution time
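In practice these terms can be estimated from the `queued_dttm`, `start_date`, and `end_date` timestamps Airflow records on each task instance. The sketch below takes those timestamps as plain `datetime` values (fetching them is left out, and the function is hypothetical). Because the timestamps cannot separate them, the queue-to-start span lumps the queue, assignment, and startup terms together.

```python
from datetime import datetime, timedelta, timezone


def latency_breakdown(queued: datetime, started: datetime, ended: datetime) -> dict[str, float]:
    """Split a task's wall time into queue-to-start and execution seconds."""
    if not queued <= started <= ended:
        raise ValueError("expected queued <= started <= ended")
    queue_to_start = (started - queued).total_seconds()  # approximates L_task
    exec_time = (ended - started).total_seconds()        # T_exec
    return {
        "queue_to_start": queue_to_start,
        "exec": exec_time,
        "e2e": queue_to_start + exec_time,
    }


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
print(latency_breakdown(t0, t0 + timedelta(seconds=3), t0 + timedelta(seconds=63)))
# {'queue_to_start': 3.0, 'exec': 60.0, 'e2e': 63.0}
```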

Database Query Optimization Score

$$S_{\text{db}} = \frac{N_{\text{index\_hits}}}{N_{\text{total\_scans}}} \times 100\%$$

Here,

  • $S_{\text{db}}$ = Index hit ratio (higher is better)
  • $N_{\text{index\_hits}}$ = Scans served by an index
  • $N_{\text{total\_scans}}$ = All scans (index scans plus sequential table scans)
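On PostgreSQL, the `pg_stat_user_tables` view exposes per-table `idx_scan` and `seq_scan` counters, which give one way to estimate $S_{\text{db}}$ with $N_{\text{total\_scans}}$ = `idx_scan + seq_scan`. The sketch assumes those two counters were already fetched (for example with `SELECT relname, idx_scan, seq_scan FROM pg_stat_user_tables;`, treating a NULL `idx_scan` as 0); the sample numbers and the 90% flag threshold are made up.

```python
def index_hit_ratio(idx_scan: int, seq_scan: int) -> float:
    """S_db in percent: index scans as a share of all scans on a table."""
    total = idx_scan + seq_scan
    if total == 0:
        return 100.0  # table never scanned; treated as nothing to fix
    return 100 * idx_scan / total


# Hypothetical counters for two metadata tables.
stats = {"task_instance": (98_000, 2_000), "dag_run": (4_000, 6_000)}
for table, (idx, seq) in stats.items():
    ratio = index_hit_ratio(idx, seq)
    flag = "  <- many sequential scans" if ratio < 90 else ""
    print(f"{table}: {ratio:.1f}%{flag}")
```

Small tables are often cheaper to scan sequentially, so a low ratio on a tiny table is not necessarily a problem.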

For CPU-bound tasks, set worker concurrency equal to CPU cores. For I/O-bound tasks, set concurrency to 2-4x CPU cores. Monitor worker memory usage to prevent OOM kills.

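Rely on DAG serialization to avoid repeated DAG parsing. In Airflow 1.10.x it is enabled with store_serialized_dags = True under [core]; from Airflow 2.0 it is always on. Serialized DAGs are stored in the metadata database, so the webserver (and, in Airflow 2.x, the scheduler's scheduling loop) reads them instead of re-parsing DAG files, while the DAG file processor still parses the files periodically to keep them current.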

Key Concepts Table

| Optimization Area | Metric | Target | Impact |
|---|---|---|---|
| DAG Parsing | Parse time | < 1 s per DAG | High |
| Task Latency | Queue to start | < 5 s | High |
| DB Query Time | Average query | < 100 ms | High |
| Worker Memory | Per worker | < 4 GB | Medium |
| XCom Size | Per operation | < 48 KB | Medium |
| Log Storage | Daily volume | < 10 GB/day | Low |
| Scheduler Heartbeat | Interval | 5 s | Low |

Code Examples

Performance Monitoring Dashboard

```python
# performance_monitoring.py
from datetime import timedelta

from sqlalchemy import func

from airflow.models import DagModel, DagRun, TaskInstance
from airflow.utils import timezone
from airflow.utils.session import create_session


def get_performance_metrics():
    """Collect performance metrics from the metadata database."""
    now = timezone.utcnow()  # Airflow stores timezone-aware UTC timestamps
    last_24h = now - timedelta(hours=24)

    with create_session() as session:
        # Scheduler metrics
        scheduler_metrics = {
            'active_dags': session.query(DagModel).filter(
                DagModel.is_active.is_(True)
            ).count(),
            'total_dag_runs': session.query(DagRun).count(),
            'recent_runs_1h': session.query(DagRun).filter(
                DagRun.start_date >= now - timedelta(hours=1)
            ).count(),
        }

        # Task counts per state
        task_metrics = dict(
            session.query(TaskInstance.state, func.count(TaskInstance.task_id))
            .group_by(TaskInstance.state)
            .all()
        )

        # start_date reflects when a task actually ran; execution_date is the
        # run's logical date (and only a proxy on TaskInstance since 2.2).
        avg_task_duration = session.query(func.avg(TaskInstance.duration)).filter(
            TaskInstance.state == 'success',
            TaskInstance.start_date >= last_24h,
        ).scalar()

        # Error rate over the last 24 hours
        total_tasks_24h = session.query(TaskInstance).filter(
            TaskInstance.start_date >= last_24h
        ).count()
        failed_tasks_24h = session.query(TaskInstance).filter(
            TaskInstance.state == 'failed',
            TaskInstance.start_date >= last_24h,
        ).count()

    error_rate = failed_tasks_24h / total_tasks_24h if total_tasks_24h > 0 else 0.0

    return {
        'scheduler': scheduler_metrics,
        'tasks': task_metrics,
        'avg_duration_seconds': avg_task_duration,  # None if no successes
        'error_rate_24h': error_rate,
        'timestamp': now.isoformat(),
    }


def identify_slow_dags(top_n=10):
    """Identify slowest DAGs by average successful task duration (7 days)."""
    since = timezone.utcnow() - timedelta(days=7)
    avg_duration = func.avg(TaskInstance.duration)

    with create_session() as session:
        slow_dags = (
            session.query(
                TaskInstance.dag_id,
                avg_duration.label('avg_duration'),
                func.count(TaskInstance.task_id).label('task_count'),
            )
            .filter(
                TaskInstance.state == 'success',
                TaskInstance.start_date >= since,
                TaskInstance.duration.isnot(None),
            )
            .group_by(TaskInstance.dag_id)
            .order_by(avg_duration.desc())
            .limit(top_n)
            .all()
        )

    return [
        {'dag_id': dag_id, 'avg_duration': float(avg), 'task_count': count}
        for dag_id, avg, count in slow_dags
    ]


if __name__ == "__main__":
    metrics = get_performance_metrics()
    slow_dags = identify_slow_dags()

    avg = metrics['avg_duration_seconds']
    print("Performance Metrics:")
    print(f"  Active DAGs: {metrics['scheduler']['active_dags']}")
    print(f"  Avg task duration: {avg:.2f}s" if avg is not None else "  Avg task duration: n/a")
    print(f"  Error rate (24h): {metrics['error_rate_24h']:.2%}")

    print("\nSlowest DAGs:")
    for dag in slow_dags:
        print(f"  {dag['dag_id']}: {dag['avg_duration']:.2f}s avg ({dag['task_count']} tasks)")
```

DAG Optimization Patterns

```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    schedule="@daily",  # Airflow 2.4+; older versions use schedule_interval
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['performance', 'optimization'],
)
def optimized_dag():

    @task
    def batch_processing():
        """Process data in batches to bound memory use."""
        import pandas as pd  # importing inside the task keeps DAG parsing fast

        chunk_size = 10000
        total_processed = 0

        # Read the file in chunks instead of loading it all at once
        for chunk in pd.read_csv('/data/large_file.csv', chunksize=chunk_size):
            # Note: drop_duplicates() only de-duplicates within each chunk
            processed = chunk.dropna().drop_duplicates()
            total_processed += len(processed)

        return {'processed': total_processed}

    @task
    def parallel_processing():
        """Process independent items in parallel."""
        import concurrent.futures

        def process_item(item):
            # Placeholder for I/O-bound work (e.g. an API call)
            return item * 2

        items = list(range(1000))

        # Threads suit I/O-bound work; use processes for CPU-bound work
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            results = list(executor.map(process_item, items))

        return {'processed': len(results)}

    @task
    def cached_computation():
        """Cache an expensive computation in an Airflow Variable."""
        import json

        from airflow.models import Variable

        cache_key = "expensive_computation_result"
        cached = Variable.get(cache_key, default_var=None)

        if cached is not None:
            return json.loads(cached)

        # Perform the expensive computation
        result = sum(i ** 2 for i in range(100000))

        # Cache the result; it never expires, so delete the Variable
        # whenever the inputs change
        Variable.set(cache_key, json.dumps({'result': result}))

        return {'result': result}

    batch_processing() >> parallel_processing() >> cached_computation()


optimized_dag()
```

Resource-Aware Task Scheduling

```python
from datetime import datetime

import psutil
from airflow.decorators import dag, task


@dag(
    schedule="@hourly",  # Airflow 2.4+; older versions use schedule_interval
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['performance', 'resource-aware'],
)
def resource_aware_dag():

    @task
    def check_resources():
        """Check resources on the worker that runs this task."""
        # psutil only sees the current host; with Celery, the next task
        # may run on a different worker
        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()

        return {
            'cpu_available': 100 - cpu_percent,
            'memory_available_percent': 100 - memory.percent,
            'should_process': cpu_percent < 80 and memory.percent < 85,
        }

    @task
    def adaptive_processing(resources: dict):
        """Adapt processing based on available resources."""
        if not resources['should_process']:
            return {'status': 'skipped', 'reason': 'insufficient_resources'}

        # Illustrative heuristic: scale batch size with free memory
        batch_size = int(resources['memory_available_percent'] * 100)

        return {
            'status': 'processing',
            'batch_size': batch_size,
        }

    resources = check_resources()
    adaptive_processing(resources)


resource_aware_dag()
```

Performance Metrics

Optimization Impact

| Optimization | Before | After | Improvement |
|---|---|---|---|
| DAG Serialization | 10 s parse | 2 s parse | 80% faster |
| DB Indexing | 500 ms query | 50 ms query | 90% faster |
| Connection Pooling | 100 ms connect | 10 ms connect | 90% faster |
| Worker Concurrency | 4 tasks | 16 tasks | 4x throughput |
| XCom Backend | 500 ms push | 50 ms push | 90% faster |

Resource Utilization

| Resource | Recommended | Warning | Critical |
|---|---|---|---|
| CPU | < 70% | 70-85% | > 85% |
| Memory | < 70% | 70-85% | > 85% |
| Disk I/O | < 70% | 70-85% | > 85% |
| Network | < 50% | 50-80% | > 80% |
| DB Connections | < 70% | 70-85% | > 85% |
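These bands can be turned into a simple classifier for alerting. A minimal sketch: the resource keys are hypothetical names, the bands come from the table above (boundary values such as exactly 85% count as warning), and collecting the readings, for example with `psutil` as in the worker health check earlier, is out of scope.

```python
# (warning_from, critical_above) in percent, taken from the table above.
THRESHOLDS = {
    "cpu": (70, 85),
    "memory": (70, 85),
    "disk_io": (70, 85),
    "network": (50, 80),
    "db_connections": (70, 85),
}


def classify(resource: str, usage_percent: float) -> str:
    """Return 'recommended', 'warning' or 'critical' for a utilization reading."""
    warning_from, critical_above = THRESHOLDS[resource]
    if usage_percent > critical_above:
        return "critical"
    if usage_percent >= warning_from:
        return "warning"
    return "recommended"


print(classify("cpu", 65), classify("memory", 80), classify("network", 85))
# recommended warning critical
```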

Key Takeaways:

  • Tune scheduler parameters: min_file_process_interval, parsing_processes, parallelism
  • Optimize database with proper indexing and connection pooling
  • Set worker concurrency based on task type (CPU vs I/O bound)
  • Use DAG serialization (always on from Airflow 2.0) to avoid repeated DAG parsing
  • Monitor resource utilization and set alert thresholds
  • Batch large operations and cache expensive computations
