Scheduling and Triggers in Apache Airflow
Formal Definitions
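Definition: Schedule Interval
The schedule interval is the time between consecutive DAG runs. It can be defined as a cron expression, a timedelta object, or a predefined shorthand (@daily, @hourly). The scheduler creates a new DagRun when $t_{\text{now}} \ge t_{\text{last}} + \Delta t$, where $t_{\text{last}}$ is the end of the last completed data interval and $\Delta t$ is the schedule interval.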
Definition: Trigger
A trigger is an asynchronous event handler that allows operators to defer execution without consuming worker resources. A trigger implements an async run() method yielding TriggerEvent when a condition is met.
Definition: Timetable
A timetable is Airflow's scheduling abstraction that determines when DAG runs should be created. A timetable maps the last run to the next scheduled execution time, supporting cron, interval, and event-driven patterns.
Detailed Explanation
Cron Expressions
Cron expressions define recurring schedules using five fields.
Cron Format:
| Field | Range | Description |
|---|---|---|
| Minute | 0-59 | When in the hour |
| Hour | 0-23 | When in the day |
| Day of Month | 1-31 | Which day of month |
| Month | 1-12 | Which month |
| Day of Week | 0-6 | 0=Sunday |
Special Characters:
- `*`: Match any value
- `,`: List (e.g., `1,3,5`)
- `-`: Range (e.g., `1-5`)
- `/`: Steps (e.g., `*/15` = every 15 min)
Common Pitfall:
`0 */2 * * *` runs every 2 hours (at minute 0), not every 2 minutes.
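To make the pitfall concrete, here is a minimal sketch that expands a single cron field into the values it matches. The helper `expand_field` is hypothetical and written for illustration only; it covers just the four special characters listed above and is not how Airflow or croniter parse expressions.

```python
from typing import List


def expand_field(field: str, lo: int, hi: int) -> List[int]:
    """Expand one cron field into the sorted list of values it matches."""
    values = set()
    for part in field.split(","):
        # Split off an optional step, e.g. "*/15" -> base "*", step 15
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base == "*":
            start, end = lo, hi
        elif "-" in base:
            start_text, end_text = base.split("-")
            start, end = int(start_text), int(end_text)
        else:
            start = int(base)
            # "5/15" means "from 5 up to the maximum"; a bare "5" is just 5
            end = hi if step_text else start
        values.update(range(start, end + 1, step))
    return sorted(values)


minute_field, hour_field, *_ = "0 */2 * * *".split()
minutes = expand_field(minute_field, 0, 59)
hours = expand_field(hour_field, 0, 23)
print(minutes)  # [0]
print(hours)    # every second hour from 0 to 22
```

The step sits in the hour field, so the expression fires at minute 0 of every second hour. Moving the step to the minute field (`*/2 * * * *`) would give the every-2-minutes schedule.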
Predefined Schedules
| Alias | Cron | Description |
|---|---|---|
| @hourly | 0 * * * * | Every hour |
| @daily | 0 0 * * * | Daily at midnight |
| @weekly | 0 0 * * 0 | Weekly on Sunday |
| @monthly | 0 0 1 * * | 1st of month |
| @yearly | 0 0 1 1 * | Annually |
Cron Expression Quick Reference
| Pattern | Cron | Description | Common Use Case |
|---|---|---|---|
| @hourly | 0 * * * * | Every hour | Real-time sync |
| @daily | 0 0 * * * | Once daily at midnight | ETL jobs |
| @weekly | 0 0 * * 0 | Weekly on Sunday | Weekly reports |
| @monthly | 0 0 1 * * | Monthly on 1st | Monthly aggregations |
| @quarterly | 0 0 1 */3 * | Every 3 months | Quarterly reports |
| @yearly | 0 0 1 1 * | Annually | Annual data loads |
| Weekdays | 0 9 * * 1-5 | 9 AM Mon-Fri | Business hour jobs |
| Every 15 min | */15 * * * * | Every 15 minutes | High-frequency jobs |
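| First Monday | 0 0 * * 1#1 | 1st Monday of month (croniter `#` extension; `0 0 1-7 * 1` would also fire on every Monday, because cron ORs the two day fields) | Monthly processing |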
Advanced Cron Patterns
```python
# Common cron patterns with explanations
patterns = {
    # Every 15 minutes on weekdays, 09:00 through 17:45
    'business_hours_15min': '*/15 9-17 * * 1-5',
    # Every hour, only on weekdays
    'hourly_weekdays': '0 * * * 1-5',
    # 8 AM and 8 PM daily
    'twice_daily': '0 8,20 * * *',
    # Every 6 hours, only on first day of month
    '6hourly_monthly': '0 */6 1 * *',
    # 9 AM on 1st and 15th of each month
    'bi_monthly': '0 9 1,15 * *',
    # Every 5 minutes on weekdays, 09:00 through 17:55
    'frequent_business': '*/5 9-17 * * 1-5',
}
```
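The next scheduled execution time follows from the last completed data interval:
$$t_{\text{next}} = t_{\text{last}} + \Delta t$$
Here,
- $\Delta t$ = Schedule interval
- $t_{\text{next}}$ = Next scheduled execution time
- $t_{\text{last}}$ = End of last completed data interval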
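As a small illustration of this rule (next scheduled execution time = end of the last completed data interval + schedule interval), the sketch below uses only the standard library. The function names `next_run_time` and `is_run_due` are hypothetical and not part of Airflow.

```python
from datetime import datetime, timedelta, timezone


def next_run_time(last_interval_end: datetime, schedule_interval: timedelta) -> datetime:
    """Next scheduled execution time: last interval end plus one schedule interval."""
    return last_interval_end + schedule_interval


def is_run_due(now: datetime, last_interval_end: datetime, schedule_interval: timedelta) -> bool:
    """A new run is due once the current time reaches the next scheduled time."""
    return now >= next_run_time(last_interval_end, schedule_interval)


last_end = datetime(2024, 1, 1, tzinfo=timezone.utc)
interval = timedelta(days=1)

print(next_run_time(last_end, interval))  # 2024-01-02 00:00:00+00:00
print(is_run_due(datetime(2024, 1, 1, 23, 59, tzinfo=timezone.utc), last_end, interval))  # False
```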
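Catchup Run Count
$$N_{\text{catchup}} = \left\lfloor \frac{t_{\text{now}} - t_{\text{start}}}{\Delta t} \right\rfloor$$
Here,
- $t_{\text{now}}$ = Current time
- $t_{\text{start}}$ = DAG start_date
- $\Delta t$ = Schedule interval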
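A worked example may help here. The sketch below assumes the catchup count is the number of whole schedule intervals that fit between the DAG start_date and the current time; `catchup_run_count` is a hypothetical helper, not an Airflow API.

```python
from datetime import datetime, timedelta, timezone


def catchup_run_count(now: datetime, start_date: datetime, schedule_interval: timedelta) -> int:
    """Number of complete schedule intervals between start_date and now."""
    if now <= start_date:
        return 0
    # timedelta // timedelta performs floor division and returns an int
    return (now - start_date) // schedule_interval


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
now = datetime(2024, 1, 10, 12, 0, tzinfo=timezone.utc)

daily_runs = catchup_run_count(now, start, timedelta(days=1))
hourly_runs = catchup_run_count(now, start, timedelta(hours=1))
print(daily_runs)   # 9
print(hourly_runs)  # 228
```

Nine and a half days have elapsed, so a daily DAG with catchup enabled would backfill 9 runs, while an hourly DAG would backfill 228. This is why catchup on a DAG with an old start_date and a short interval can flood the scheduler.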
Theorem: Schedule Drift Bound
For a scheduler heartbeat interval $h$, the maximum schedule drift $\delta_{\max}$ is bounded by $\delta_{\max} \le h + p$, where $p$ is the DAG parsing latency. Reducing $h$ improves scheduling accuracy at the cost of increased CPU utilization.
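The bound can be checked against a deliberately simplified model. The sketch below assumes the bound is the heartbeat interval plus the parsing latency, and that the scheduler only notices a due run on its next heartbeat tick and then spends the parsing latency before creating it. Both functions are hypothetical, and a real scheduler has more sources of delay.

```python
import math


def observed_drift(scheduled_at: int, heartbeat: int, parse_latency: int) -> int:
    """Seconds between the scheduled time and run creation in the toy model."""
    # Heartbeat ticks happen at 0, heartbeat, 2 * heartbeat, ...
    ticks_needed = math.ceil(scheduled_at / heartbeat)
    noticed_at = ticks_needed * heartbeat
    return (noticed_at - scheduled_at) + parse_latency


def drift_bound(heartbeat: int, parse_latency: int) -> int:
    """Assumed upper bound: heartbeat interval plus parsing latency."""
    return heartbeat + parse_latency


heartbeat, parse_latency = 5, 2
drifts = [observed_drift(t, heartbeat, parse_latency) for t in range(1, 61)]
print(max(drifts), "<=", drift_bound(heartbeat, parse_latency))
```

In this model the worst case occurs when a run becomes due just after a tick, so it waits almost a full heartbeat before being noticed.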
The catchup=False parameter prevents Airflow from creating backfill runs for missed intervals. This is essential for new DAGs to avoid unintended historical execution. Use catchup=True only when explicitly needed for data recovery.
Use timezone-aware datetime objects throughout your DAGs. Airflow stores timestamps in UTC internally but displays them in the configured timezone. Be explicit about timezone handling to avoid scheduling inconsistencies.
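The difference between naive and timezone-aware datetimes can be seen with the standard library alone. This sketch uses a fixed UTC+05:30 offset purely for illustration; real deployments would normally use a named timezone with DST rules.

```python
from datetime import datetime, timedelta, timezone

naive = datetime(2024, 1, 1, 9, 0)                           # no tzinfo: ambiguous instant
aware_utc = datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)  # unambiguous instant

# Fixed offset chosen for illustration only (no DST handling)
local_tz = timezone(timedelta(hours=5, minutes=30))
aware_local = aware_utc.astimezone(local_tz)

print(aware_local.isoformat())   # 2024-01-01T14:30:00+05:30
print(aware_local == aware_utc)  # True: same instant, different wall-clock time

try:
    naive < aware_utc
except TypeError as exc:
    # Python refuses to order naive against aware datetimes
    print(f"Cannot compare: {exc}")
```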
Timetables
Timetables are Airflow's modern scheduling abstraction (Airflow 2.2+).
| Timetable | Description | Use Case |
|---|---|---|
| CronDataIntervalTimetable | Default for cron strings and presets | Backward compatibility |
| DeltaDataIntervalTimetable | Fixed intervals via timedelta | Simple interval scheduling |
| Custom Timetable | User-defined logic | Complex patterns |
| Dataset-triggered | Runs when upstream datasets are updated (Airflow 2.4+) | Data-driven workflows |
Trigger Mechanism
Triggers enable deferrable execution: operators pause and resume without consuming worker resources.
Trigger Architecture:
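- Operator defers → creates Trigger
- Triggerer monitors Trigger asynchronously
- Condition met → Triggerer resumes Operator
Built-in Triggers: DateTimeTrigger and TimeDeltaTrigger (airflow.triggers.temporal) and FileTrigger (airflow.triggers.file). Provider packages add others, such as HttpTrigger in the HTTP provider, and you can write custom triggers.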
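The defer-and-resume flow can be imitated with plain asyncio. The sketch below is a toy model, not Airflow code: `CountdownTrigger` and `wait_for_first_event` are hypothetical stand-ins for a trigger and the triggerer process.

```python
import asyncio


class CountdownTrigger:
    """Hypothetical stand-in for a trigger: fires once after `polls` checks."""

    def __init__(self, polls: int, poll_interval: float = 0.01):
        self.polls = polls
        self.poll_interval = poll_interval

    async def run(self):
        for _ in range(self.polls):
            # Awaiting hands control back to the event loop, so a single
            # process can watch many triggers at the same time
            await asyncio.sleep(self.poll_interval)
        yield {"status": "success", "polls": self.polls}


async def wait_for_first_event(trigger):
    """Toy triggerer: return the first event the trigger yields."""
    async for event in trigger.run():
        return event
    return None


async def main():
    triggers = [CountdownTrigger(polls=3), CountdownTrigger(polls=1)]
    # gather returns results in the order the triggers were passed in
    return await asyncio.gather(*(wait_for_first_event(t) for t in triggers))


events = asyncio.run(main())
print(events)
```

Both triggers wait concurrently in one event loop, which is the property that lets deferred tasks release their worker slots while waiting.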
Scheduling Best Practices
| Practice | Why |
|---|---|
| Idempotency | Safe retries and backfills |
| Timezone Awareness | Use timezone-aware datetime objects |
| Catchup=False | Avoid unintended backfill for new DAGs |
| Max Active Runs | Control concurrency, prevent resource contention |
| depends_on_past | Use carefully to avoid bottlenecks |
Key Concepts Table
| Component | Purpose | Example | Use Case |
|---|---|---|---|
| Cron Expression | Time-based scheduling | 0 0 * * * | Daily execution |
| DeltaDataIntervalTimetable | Interval scheduling | timedelta(hours=1) | Hourly processing |
| Trigger | Async waiting | FileTrigger | File availability |
| Sensor | Polling waiting | FileSensor | Simple conditions |
| Dataset | Data-driven scheduling | Dataset('s3://data') | Event-driven |
| Backfill | Historical runs | catchup=True | Data recovery |
| Max Active Runs | Concurrency control | max_active_runs=1 | Sequential execution |
| Timezone | Time handling | pendulum.timezone('UTC') | Global deployments |
Scheduling Configuration Reference
```python
# Complete DAG scheduling configuration
from datetime import datetime, timedelta

from airflow import DAG
from airflow.utils import timezone

with DAG(
    'comprehensive_scheduling_example',
    # Schedule configuration
    schedule_interval='@daily',  # Or timedelta(hours=1), or cron (named `schedule` on Airflow 2.4+)
    # timetable=CustomTimetable(),  # Advanced: custom timetable
    # Timezone handling
    start_date=datetime(2024, 1, 1, tzinfo=timezone.utc),  # Timezone-aware
    end_date=datetime(2024, 12, 31, tzinfo=timezone.utc),  # Optional end
    # Backfill control
    catchup=False,  # Don't backfill missed runs
    max_active_runs=1,  # Only 1 concurrent run
    # Task-level settings go in default_args; they are not DAG arguments
    default_args={
        'depends_on_past': False,  # Don't depend on previous run
        'wait_for_downstream': False,  # Don't wait for downstream
    },
    dagrun_timeout=timedelta(hours=2),  # Timeout for DAG run
    # Tags for filtering
    tags=['production', 'daily'],
) as dag:
    pass
```
Code Examples
Advanced Cron Patterns
```python
# advanced_cron_patterns.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from croniter import croniter


def business_day_schedule():
    """Generate schedule for business days only."""
    # Run at 9 AM on weekdays
    return '0 9 * * 1-5'


def month_end_schedule():
    """Generate schedule for last day of month."""
    # Run at 11 PM on the last day of the month.
    # 'L' is a croniter extension; '28-31' would fire on each of those days.
    return '0 23 L * *'


def quarterly_schedule():
    """Generate schedule for quarterly processing."""
    # Run on first day of each quarter at midnight
    return '0 0 1 1,4,7,10 *'


def custom_cron_parser(cron_expression: str):
    """Validate a cron expression and return its next five run times."""
    if not croniter.is_valid(cron_expression):
        raise ValueError(f"Invalid cron expression: {cron_expression}")
    # Iterate forward from the current time
    cron = croniter(cron_expression, datetime.now())
    return [cron.get_next(datetime) for _ in range(5)]


def advanced_cron_example():
    """Demonstrate advanced cron patterns."""
    # Cron ORs day-of-month and day-of-week when both are restricted, so
    # "nth weekday" schedules use croniter's '#' and 'L' extensions
    # (available in recent croniter versions) instead of day ranges.
    patterns = {
        'every_15_minutes': '*/15 * * * *',
        'every_2_hours_weekdays': '0 */2 * * 1-5',
        'first_monday_of_month': '0 0 * * 1#1',
        'last_friday_of_month': '0 0 * * L5',
        'every_6_hours_utc': '0 */6 * * *',
    }
    for name, pattern in patterns.items():
        print(f"\n{name}: {pattern}")
        for run_time in custom_cron_parser(pattern):
            print(f"  Next run: {run_time}")


def check_business_hours(**context):
    """Check if current time is within business hours."""
    from airflow.utils import timezone

    current_time = timezone.utcnow()
    # Business hours: 9 AM to 5 PM UTC
    return 9 <= current_time.hour < 17


with DAG(
    'advanced_cron_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Advanced cron scheduling patterns',
    schedule_interval=business_day_schedule(),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['cron', 'advanced'],
) as dag:
    # Task to demonstrate cron parsing
    parse_cron = PythonOperator(
        task_id='parse_cron',
        python_callable=advanced_cron_example,
    )
    # Business-hours check implemented as a plain Python task
    time_check = PythonOperator(
        task_id='business_hours_check',
        python_callable=check_business_hours,
    )
    parse_cron >> time_check
```
Custom Timetable Implementation
```python
# custom_timetable.py
from datetime import datetime, timedelta
from typing import Any, Dict, Optional

import pendulum
from airflow import DAG
from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable


class BusinessDaysTimetable(Timetable):
    """
    Custom timetable that only schedules on business days.
    Excludes weekends and major holidays.
    """

    def __init__(self, hour: int = 9, minute: int = 0, exclude_holidays: bool = True):
        super().__init__()
        self.hour = hour
        self.minute = minute
        self.exclude_holidays = exclude_holidays

    def serialize(self) -> Dict[str, Any]:
        """Store constructor arguments so the timetable can be rebuilt."""
        return {
            "hour": self.hour,
            "minute": self.minute,
            "exclude_holidays": self.exclude_holidays,
        }

    @classmethod
    def deserialize(cls, data: Dict[str, Any]) -> "BusinessDaysTimetable":
        return cls(**data)

    def _is_business_day(self, date: datetime) -> bool:
        """Check if date is a business day."""
        if date.weekday() >= 5:  # Saturday (5) or Sunday (6)
            return False
        if self.exclude_holidays and date.date() in self._get_holidays(date.year):
            return False
        return True

    def _get_holidays(self, year: int) -> set:
        """Get holidays for a given year (example fixed-date US holidays)."""
        return {
            datetime(year, 1, 1).date(),    # New Year's Day
            datetime(year, 7, 4).date(),    # Independence Day
            datetime(year, 12, 25).date(),  # Christmas
            # Add more holidays as needed
        }

    def get_next_dagrun_info(
        self,
        *,
        last_automated_data_interval: Optional[DataInterval],
        restriction: TimeRestriction,
    ) -> Optional[DagRunInfo]:
        """Get next DAG run information."""
        if last_automated_data_interval is not None:
            # Later runs: start looking on the day after the previous run
            earliest = last_automated_data_interval.end + timedelta(days=1)
        elif restriction.earliest is not None:
            # First run: start from the DAG's start_date
            earliest = restriction.earliest
        else:
            # No start_date: nothing to schedule
            return None
        if not restriction.catchup:
            # catchup=False: never schedule before today
            earliest = max(earliest, pendulum.now("UTC").start_of("day"))
        # Set to specified time, moving to the next day if that time has passed
        next_date = earliest.replace(
            hour=self.hour, minute=self.minute, second=0, microsecond=0
        )
        if next_date < earliest:
            next_date += timedelta(days=1)
        # Find next business day
        while not self._is_business_day(next_date):
            next_date += timedelta(days=1)
        # Check if we've exceeded the DAG's end_date
        if restriction.latest is not None and next_date > restriction.latest:
            return None
        # Zero-length data interval; the run is created at next_date
        return DagRunInfo.interval(start=next_date, end=next_date)

    def infer_manual_data_interval(self, *, run_after: datetime) -> DataInterval:
        """Infer data interval for manual runs."""
        # Use previous business day as data interval
        previous_date = run_after - timedelta(days=1)
        while not self._is_business_day(previous_date):
            previous_date -= timedelta(days=1)
        return DataInterval(start=previous_date, end=previous_date)


class BusinessDaysTimetablePlugin(AirflowPlugin):
    """Custom timetables must be registered through a plugin."""

    name = "business_days_timetable_plugin"
    timetables = [BusinessDaysTimetable]


# Usage in DAG (on Airflow 2.4+, pass the timetable via `schedule=` instead)
with DAG(
    'business_days_dag',
    timetable=BusinessDaysTimetable(hour=9, minute=0),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['timetable', 'business_days'],
) as dag:
    # Tasks here will only run on business days
    pass
```
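The business-day search at the heart of the timetable can be tried without Airflow. The following standard-library sketch mirrors that logic for plain dates; `HOLIDAYS`, `is_business_day`, and `next_business_day` are hypothetical names, and the holiday set is a small sample for 2024 only.

```python
from datetime import date, timedelta

# Sample fixed-date holidays for illustration only
HOLIDAYS = {date(2024, 1, 1), date(2024, 7, 4), date(2024, 12, 25)}


def is_business_day(day: date) -> bool:
    """Weekdays that are not in the holiday set count as business days."""
    return day.weekday() < 5 and day not in HOLIDAYS


def next_business_day(after: date) -> date:
    """First business day strictly after the given date."""
    candidate = after + timedelta(days=1)
    while not is_business_day(candidate):
        candidate += timedelta(days=1)
    return candidate


print(next_business_day(date(2024, 7, 3)))    # 2024-07-05 (skips the July 4 holiday)
print(next_business_day(date(2024, 7, 5)))    # 2024-07-08 (skips the weekend)
print(next_business_day(date(2023, 12, 29)))  # 2024-01-02 (skips weekend and New Year's Day)
```

Starting the search strictly after the previous date is what keeps a scheduler built on this logic from returning the same day twice.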
Trigger Implementation
```python
# trigger_implementation.py
import asyncio
from datetime import datetime, timedelta
from typing import Any, AsyncIterator, Dict, Optional, Tuple

import aiohttp
from aiohttp import web
from asgiref.sync import sync_to_async

from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.models.baseoperator import BaseOperator
from airflow.operators.python import PythonOperator
from airflow.triggers.base import BaseTrigger, TriggerEvent


class WebhookTrigger(BaseTrigger):
    """
    Custom trigger that waits for a webhook call.
    This trigger listens for incoming HTTP requests and resumes
    the operator when a valid webhook is received.
    """

    def __init__(
        self,
        webhook_path: str,
        method: str = 'POST',
        headers: Optional[Dict[str, str]] = None,
        timeout: int = 300,
    ):
        super().__init__()
        self.webhook_path = webhook_path
        self.method = method
        self.headers = headers or {}
        self.timeout = timeout
        self._webhook_data: Optional[Dict[str, Any]] = None

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        """Serialize the trigger for storage."""
        return (
            "trigger_implementation.WebhookTrigger",
            {
                "webhook_path": self.webhook_path,
                "method": self.method,
                "headers": self.headers,
                "timeout": self.timeout,
            },
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """Run the webhook listener."""
        # Created here so the event belongs to the triggerer's event loop
        self._received = asyncio.Event()
        # Create a simple web server to listen for webhooks
        app = web.Application()
        app.router.add_route(self.method, self.webhook_path, self.handle_webhook)
        runner = web.AppRunner(app)
        await runner.setup()
        # Host and port are illustrative; concurrent triggers would need distinct ports
        site = web.TCPSite(runner, 'localhost', 8080)
        await site.start()
        try:
            # Wait for webhook with timeout
            await asyncio.wait_for(self._received.wait(), timeout=self.timeout)
        except asyncio.TimeoutError:
            self._webhook_data = {
                'status': 'timeout',
                'error': 'No webhook received before timeout',
                'timestamp': datetime.now().isoformat(),
            }
        finally:
            await runner.cleanup()
        yield TriggerEvent(self._webhook_data)

    async def handle_webhook(self, request: web.Request) -> web.Response:
        """Handle incoming webhook request."""
        try:
            data = await request.json()
            self._webhook_data = {
                'status': 'success',
                'data': data,
                'timestamp': datetime.now().isoformat(),
            }
            return web.json_response({'status': 'received'})
        except Exception as e:
            self._webhook_data = {
                'status': 'error',
                'error': str(e),
                'timestamp': datetime.now().isoformat(),
            }
            return web.json_response({'status': 'error'}, status=400)
        finally:
            # Wake up run() whether the payload parsed or not
            self._received.set()


class PollingTrigger(BaseTrigger):
    """
    Custom trigger that polls an endpoint until condition is met.
    """

    def __init__(
        self,
        http_conn_id: str,
        endpoint: str,
        method: str = 'GET',
        headers: Optional[Dict[str, str]] = None,
        poll_interval: int = 30,
        timeout: int = 3600,
        success_criteria: Optional[Dict[str, Any]] = None,
    ):
        super().__init__()
        self.http_conn_id = http_conn_id
        self.endpoint = endpoint
        self.method = method
        self.headers = headers or {}
        self.poll_interval = poll_interval
        self.timeout = timeout
        self.success_criteria = success_criteria or {}

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        """Serialize the trigger for storage."""
        return (
            "trigger_implementation.PollingTrigger",
            {
                "http_conn_id": self.http_conn_id,
                "endpoint": self.endpoint,
                "method": self.method,
                "headers": self.headers,
                "poll_interval": self.poll_interval,
                "timeout": self.timeout,
                "success_criteria": self.success_criteria,
            },
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """Run the polling loop."""
        # The connection lookup is blocking, so keep it off the event loop
        conn = await sync_to_async(BaseHook.get_connection)(self.http_conn_id)
        scheme = conn.schema or 'http'
        port = f":{conn.port}" if conn.port else ""
        url = f"{scheme}://{conn.host}{port}{self.endpoint}"
        headers = {**self.headers}
        if conn.password:
            headers['Authorization'] = f"Bearer {conn.get_password()}"
        start_time = datetime.now()
        timeout_delta = timedelta(seconds=self.timeout)
        async with aiohttp.ClientSession() as session:
            while True:
                try:
                    async with session.request(
                        self.method,
                        url,
                        headers=headers,
                    ) as response:
                        data = await response.json()
                    # Check success criteria
                    if self._check_success_criteria(data):
                        yield TriggerEvent({
                            'status': 'success',
                            'data': data,
                            'timestamp': datetime.now().isoformat(),
                        })
                        return
                except Exception as e:
                    # Log error but continue polling
                    self.log.warning("Polling error: %s", e)
                # Check timeout
                if datetime.now() - start_time > timeout_delta:
                    yield TriggerEvent({
                        'status': 'timeout',
                        'error': 'Polling timeout exceeded',
                        'timestamp': datetime.now().isoformat(),
                    })
                    return
                # Wait before next poll
                await asyncio.sleep(self.poll_interval)

    def _check_success_criteria(self, data: Dict[str, Any]) -> bool:
        """Check if response meets success criteria."""
        for key, value in self.success_criteria.items():
            if key not in data or data[key] != value:
                return False
        return True


class DeferToTriggerOperator(BaseOperator):
    """Minimal deferrable operator: defers to a trigger and returns its event."""

    def __init__(self, *, wait_trigger: BaseTrigger, **kwargs):
        super().__init__(**kwargs)
        self.wait_trigger = wait_trigger

    def execute(self, context):
        # Frees the worker slot; the triggerer runs the trigger instead
        self.defer(trigger=self.wait_trigger, method_name='execute_complete')

    def execute_complete(self, context, event=None):
        if not event or event.get('status') != 'success':
            raise AirflowException(f"Trigger did not succeed: {event}")
        return event  # Stored in XCom as return_value


def process_trigger_result(**context):
    """Process the result from trigger."""
    trigger_event = context['ti'].xcom_pull(
        task_ids='wait_for_webhook',
        key='return_value',
    )
    print(f"Trigger event received: {trigger_event}")


# Usage in DAG
with DAG(
    'trigger_example_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Custom trigger example',
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['trigger', 'custom'],
) as dag:
    # Webhook trigger example (the path is a placeholder)
    wait_for_webhook = DeferToTriggerOperator(
        task_id='wait_for_webhook',
        wait_trigger=WebhookTrigger(webhook_path='/hooks/example'),
    )
    # Polling trigger example (connection id, endpoint, and criteria are placeholders)
    wait_for_condition = DeferToTriggerOperator(
        task_id='wait_for_condition',
        wait_trigger=PollingTrigger(
            http_conn_id='my_api',
            endpoint='/status',
            success_criteria={'state': 'ready'},
        ),
    )
    process_result = PythonOperator(
        task_id='process_result',
        python_callable=process_trigger_result,
    )
    wait_for_webhook >> wait_for_condition >> process_result
```
Dataset-Driven Scheduling
```python
# dataset_scheduling.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.python import PythonOperator

# Define datasets. URIs are static identifiers; they are not Jinja-templated.
raw_data_dataset = Dataset("s3://my-bucket/raw-data/")
processed_data_dataset = Dataset("s3://my-bucket/processed-data/")
report_dataset = Dataset("s3://my-bucket/reports/")

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


def extract_data(**context):
    """Extract data from source."""
    # Simulate data extraction
    print("Extracting data...")
    return {"status": "success", "records": 1000}


def transform_data(**context):
    """Transform extracted data."""
    # Simulate data transformation
    print("Transforming data...")
    return {"status": "success", "transformed_records": 950}


def generate_report(**context):
    """Generate report from transformed data."""
    # Simulate report generation
    print("Generating report...")
    return {"status": "success", "report_url": "s3://reports/report.pdf"}


# DAG 1: Data Extraction (time-based; writes to raw_data_dataset)
with DAG(
    'data_extraction_dag',
    default_args=default_args,
    description='Data extraction DAG',
    # The producer runs on a clock; `schedule` requires Airflow 2.4+
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['dataset', 'extraction'],
) as extraction_dag:
    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
        outlets=[raw_data_dataset],  # This DAG produces this dataset
    )

# DAG 2: Data Processing (reads from raw_data_dataset, writes to processed_data_dataset)
with DAG(
    'data_processing_dag',
    default_args=default_args,
    description='Data processing DAG',
    # This DAG runs when raw_data_dataset is updated.
    # `schedule` and `schedule_interval` cannot both be set.
    schedule=[raw_data_dataset],
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['dataset', 'processing'],
) as processing_dag:
    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
        inlets=[raw_data_dataset],  # This DAG reads this dataset
        outlets=[processed_data_dataset],  # This DAG writes this dataset
    )

# DAG 3: Report Generation (reads from processed_data_dataset)
with DAG(
    'report_generation_dag',
    default_args=default_args,
    description='Report generation DAG',
    # This DAG runs when processed_data_dataset is updated
    schedule=[processed_data_dataset],
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['dataset', 'reporting'],
) as report_dag:
    report = PythonOperator(
        task_id='generate_report',
        python_callable=generate_report,
        inlets=[processed_data_dataset],  # This DAG reads this dataset
        outlets=[report_dataset],  # This DAG writes this dataset
    )
```
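The producer and consumer chain above can be modeled without Airflow to see which DAGs fire on each dataset update. The sketch below is a toy model: `DatasetScheduler` is a hypothetical class, and it assumes a consumer scheduled on several datasets runs only after every one of them has been updated since its last run.

```python
from collections import defaultdict


class DatasetScheduler:
    """Toy model of dataset-driven scheduling."""

    def __init__(self):
        self.requirements = {}            # dag_id -> set of dataset URIs it waits on
        self.pending = defaultdict(set)   # dag_id -> URIs updated since its last run

    def register(self, dag_id, uris):
        self.requirements[dag_id] = set(uris)

    def dataset_updated(self, uri):
        """Record an update and return the DAG ids that become runnable."""
        triggered = []
        for dag_id, required in self.requirements.items():
            if uri in required:
                self.pending[dag_id].add(uri)
                if self.pending[dag_id] == required:
                    triggered.append(dag_id)
                    self.pending[dag_id].clear()  # wait for fresh updates next time
        return triggered


scheduler = DatasetScheduler()
scheduler.register("data_processing_dag", ["s3://my-bucket/raw-data/"])
scheduler.register("report_generation_dag", ["s3://my-bucket/processed-data/"])

print(scheduler.dataset_updated("s3://my-bucket/raw-data/"))        # ['data_processing_dag']
print(scheduler.dataset_updated("s3://my-bucket/processed-data/"))  # ['report_generation_dag']
print(scheduler.dataset_updated("s3://my-bucket/reports/"))         # []
```

Nothing in this model depends on the clock: each consumer runs as soon as its inputs are ready, which is what distinguishes dataset-driven scheduling from cron-based scheduling.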
Performance Metrics
Scheduling Performance
| Metric | Description | Optimization Strategy | Warning Threshold |
|---|---|---|---|
| Cron Parse Time | Time to parse cron expression | Use simple expressions | > 100ms |
| Schedule Accuracy | How closely actual run times match scheduled times | Optimize scheduler heartbeat | > 5 min drift |
| Trigger Latency | Time for trigger to fire | Use async triggers, optimize polling | > 30 seconds |
| DAG Run Creation | Time to create DagRun objects | Optimize database queries | > 1 second |
| Backfill Speed | Time to complete backfill | Parallelize, optimize task execution | < 100 runs/hour |
| Timezone Handling | Accuracy of timezone conversions | Use timezone-aware datetime objects | N/A |
| Schedule Drift | Deviation from intended schedule | Monitor and adjust scheduling parameters | > 1 minute |
| Resource Usage | Scheduler resource consumption | Optimize heartbeat interval, limit active runs | > 80% CPU |
Schedule Drift Analysis
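Drift Formula: $\text{drift} = t_{\text{actual}} - t_{\text{scheduled}}$, the gap between when a run actually starts and when it was scheduled to start.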
| Drift Amount | Impact | Action Required |
|---|---|---|
| < 1 second | Negligible | None |
| 1-30 seconds | Low | Monitor |
| 30 seconds - 5 minutes | Medium | Optimize scheduler |
| > 5 minutes | High | Investigate immediately |
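The table can be turned into a small check for monitoring scripts. This sketch assumes drift is measured as actual start time minus scheduled time, in seconds; `drift_seconds` and `classify_drift` are hypothetical helpers, and the boundary values (exactly 30 seconds, exactly 5 minutes) are assigned to the lower tier as a judgment call.

```python
from datetime import datetime, timezone
from typing import Tuple


def drift_seconds(scheduled: datetime, actual: datetime) -> float:
    """Drift as actual start time minus scheduled time, in seconds."""
    return (actual - scheduled).total_seconds()


def classify_drift(seconds: float) -> Tuple[str, str]:
    """Map a drift value to the (impact, action) tiers from the table."""
    if seconds < 1:
        return ("Negligible", "None")
    if seconds <= 30:
        return ("Low", "Monitor")
    if seconds <= 300:
        return ("Medium", "Optimize scheduler")
    return ("High", "Investigate immediately")


scheduled = datetime(2024, 1, 1, 9, 0, 0, tzinfo=timezone.utc)
actual = datetime(2024, 1, 1, 9, 0, 45, tzinfo=timezone.utc)

drift = drift_seconds(scheduled, actual)
print(drift, classify_drift(drift))  # 45.0 ('Medium', 'Optimize scheduler')
```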
Scheduler Resource Usage
| Scheduler Component | Typical CPU | Typical Memory | Optimization |
|---|---|---|---|
| Heartbeat Loop | 5-10% | 50-100 MB | Reduce frequency |
| DAG Parsing | 10-30% | 200-500 MB | Parallelize |
| Task Queuing | 5-15% | 100-200 MB | Batch operations |
| Database Queries | 10-20% | 50-100 MB | Add indexes |
| Total | 30-75% | 400MB-1GB | Monitor and tune |
Best Practices
- Cron Expression Validation: Always validate cron expressions before deploying. Use tools like `croniter` to test and verify schedules.
- Timezone Handling: Use timezone-aware datetime objects throughout your DAGs. Be explicit about timezone conversions and avoid naive datetime objects.
- Catchup Configuration: Set `catchup=False` for new DAGs to avoid unintended backfill. Use `catchup=True` only when you explicitly need historical runs.
- Max Active Runs: Configure `max_active_runs` to prevent resource contention. Use `max_active_runs=1` for sequential execution.
- Schedule Interval: Choose appropriate schedule intervals based on data requirements and resource constraints. Avoid over-scheduling.
- Trigger Usage: Use triggers for external dependencies instead of polling. This improves efficiency and reduces resource usage.
- Dataset Dependencies: Use dataset-driven scheduling for event-driven workflows. This provides more flexibility than time-based scheduling.
- Monitoring: Monitor schedule accuracy and trigger performance. Set up alerts for missed or delayed runs.
- Testing: Test scheduling logic thoroughly. Use Airflow's testing utilities to validate schedule behavior.
- Documentation: Document scheduling decisions and rationale. Include schedule descriptions in DAG documentation.
Key Takeaways:
- Schedule interval determines DAG run frequency
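- Catchup run count is $\left\lfloor (t_{\text{now}} - t_{\text{start}}) / \Delta t \right\rfloor$, the number of whole schedule intervals between the DAG start_date and the current time
- Schedule drift is bounded by $h + p$, the scheduler heartbeat interval plus the DAG parsing latency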
- Triggers enable async deferral without worker resource consumption
- Timetables provide flexible scheduling beyond cron expressions
- Always use timezone-aware datetime objects to avoid scheduling bugs
See also: Kafka Connect (kafka/03), PySpark Submit (pyspark/19), Data Engineering Orchestration (data-engineering/017)
See Also
- Airflow Architecture: Scheduler and trigger mechanisms
- DAG Design Patterns: Scheduling-related DAG patterns
- XCom Communications: Task communication for scheduled workflows
- Batch vs Streaming: Batch scheduling considerations