Scheduling and Triggers in Apache Airflow

[Figure: Triggerer architecture (Defer → Trigger → Resume). Operator defers → Trigger created in DB → Triggerer event loop monitors asynchronously → Event → Scheduler resumes task → Worker executes task. No worker slot is used while the task is deferred.]

Formal Definitions

Definition: Schedule Interval

The schedule interval $\Delta t$ is the time between consecutive DAG runs. It can be defined as a cron expression, a timedelta object, or a predefined shorthand (@daily, @hourly). The scheduler creates a new DagRun when $\text{now} \geq \text{last\_run\_end} + \Delta t$.
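The run-creation condition can be sketched in plain Python. This only illustrates the inequality in the definition; it is not Airflow's scheduler code, and `is_run_due` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone


def is_run_due(now: datetime, last_run_end: datetime, interval: timedelta) -> bool:
    """Return True when now >= last_run_end + interval (hypothetical helper)."""
    return now >= last_run_end + interval


last_end = datetime(2024, 1, 1, tzinfo=timezone.utc)
daily = timedelta(days=1)

# 23:59 on the same day: the daily interval has not elapsed yet
not_yet = is_run_due(datetime(2024, 1, 1, 23, 59, tzinfo=timezone.utc), last_end, daily)
# Exactly one interval later: a new run is due
due = is_run_due(datetime(2024, 1, 2, tzinfo=timezone.utc), last_end, daily)
print(not_yet, due)  # False True
```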

Definition: Trigger

A trigger is an asynchronous event handler that allows operators to defer execution without consuming worker resources. A trigger $T = (\text{run}, \text{serialize}, \text{event})$ implements an async run() method that yields a TriggerEvent when a condition is met.

Definition: Timetable

A timetable is Airflow's scheduling abstraction that determines when DAG runs should be created. A timetable $M = (\text{next\_run}, \text{infer\_interval})$ maps the last run to the next scheduled execution time, supporting cron, interval, and event-driven patterns.

Detailed Explanation

Cron Expressions

Cron expressions define recurring schedules using five fields.


Cron Format:

| Field | Range | Description |
|---|---|---|
| Minute | 0-59 | When in the hour |
| Hour | 0-23 | When in the day |
| Day of Month | 1-31 | Which day of month |
| Month | 1-12 | Which month |
| Day of Week | 0-6 | 0 = Sunday |

Special Characters:

  • * : Match any value
  • , : List (e.g., 1,3,5)
  • - : Range (e.g., 1-5)
  • / : Steps (e.g., */15 every 15 min)

Common Pitfall: 0 */2 * * * runs every 2 hours (at minute 0), not every 2 minutes.
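To make the pitfall concrete, here is a minimal sketch of how a single cron field expands into the values it matches. `expand_field` is a hypothetical helper written for this example (it is not how croniter or Airflow parse cron), and it handles only the four special characters listed above.

```python
def expand_field(field: str, low: int, high: int) -> list[int]:
    """Expand one cron field (supports *, lists, ranges, and /step)."""
    values = set()
    for part in field.split(","):
        base, _, step = part.partition("/")
        stride = int(step) if step else 1
        if base == "*":
            start, end = low, high
        elif "-" in base:
            lo_text, hi_text = base.split("-")
            start, end = int(lo_text), int(hi_text)
        else:
            start = int(base)
            # "5/15" means "from 5 up to the maximum, every 15"; a bare "5" is just 5
            end = high if step else start
        values.update(range(start, end + 1, stride))
    return sorted(values)


# "0 */2 * * *": the step sits in the HOUR field, so minute is fixed at 0
minute_field, hour_field = "0 */2 * * *".split()[:2]
print(expand_field(minute_field, 0, 59))  # [0]
print(expand_field(hour_field, 0, 23))    # [0, 2, 4, ..., 22]

# "*/2 * * * *": the step sits in the MINUTE field, so it fires every 2 minutes
print(len(expand_field("*/2", 0, 59)))    # 30
```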


Predefined Schedules

| Alias | Cron | Description |
|---|---|---|
| @hourly | 0 * * * * | Every hour |
| @daily | 0 0 * * * | Daily at midnight |
| @weekly | 0 0 * * 0 | Weekly on Sunday |
| @monthly | 0 0 1 * * | 1st of month |
| @yearly | 0 0 1 1 * | Annually |

Cron Expression Quick Reference

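| Pattern | Cron | Description | Common Use Case |
|---|---|---|---|
| @hourly | 0 * * * * | Every hour | Real-time sync |
| @daily | 0 0 * * * | Once daily at midnight | ETL jobs |
| @weekly | 0 0 * * 0 | Weekly on Sunday | Weekly reports |
| @monthly | 0 0 1 * * | Monthly on 1st | Monthly aggregations |
| @quarterly | 0 0 1 */3 * | Every 3 months | Quarterly reports |
| @yearly | 0 0 1 1 * | Annually | Annual data loads |
| Weekdays | 0 9 * * 1-5 | 9 AM Mon-Fri | Business hour jobs |
| Every 15 min | */15 * * * * | Every 15 minutes | High-frequency jobs |
| Days 1-7 or Monday | 0 0 1-7 * 1 | Midnight on days 1-7 and on every Monday; standard cron ORs the two day fields, so this is not "first Monday only" | Monthly processing (needs an in-task first-Monday check) |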
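A pattern such as 0 0 1-7 * 1 is often read as "first Monday of the month". Under standard cron semantics, when both the day-of-month and day-of-week fields are restricted, a date matches if either field matches. The sketch below shows the difference with the standard library only; `day_matches` and `is_first_monday` are hypothetical helpers, and it is an assumption here that your Airflow version follows the standard OR rule.

```python
from datetime import date


def day_matches(day: date, days_of_month: set[int], cron_weekdays: set[int]) -> bool:
    """Cron-style OR match applied when both day fields are restricted."""
    cron_dow = (day.weekday() + 1) % 7  # Python: Monday=0; cron: Sunday=0
    return day.day in days_of_month or cron_dow in cron_weekdays


def is_first_monday(day: date) -> bool:
    """Stricter AND check that a task could apply itself."""
    return day.weekday() == 0 and day.day <= 7


january = [date(2024, 1, d) for d in range(1, 32)]
or_matches = [d.day for d in january if day_matches(d, set(range(1, 8)), {1})]
first_monday = [d.day for d in january if is_first_monday(d)]

print(or_matches)    # [1, 2, 3, 4, 5, 6, 7, 8, 15, 22, 29]
print(first_monday)  # [1]
```

One way to get true first-Monday behaviour is to schedule on every Monday and skip the run inside the task unless a check like `is_first_monday` passes.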

Advanced Cron Patterns

# Common cron patterns with explanations
patterns = {
    # Every 15 minutes from 09:00 through 17:45 on weekdays (hour range 9-17 is inclusive)
    'business_hours_15min': '*/15 9-17 * * 1-5',

    # Every hour, only on weekdays
    'hourly_weekdays': '0 * * * 1-5',

    # 8 AM and 8 PM daily
    'twice_daily': '0 8,20 * * *',

    # Every 6 hours, only on first day of month
    '6hourly_monthly': '0 */6 1 * *',

    # 9 AM on 1st and 15th of each month
    'bi_monthly': '0 9 1,15 * *',

    # Every 5 minutes from 09:00 through 17:55 on weekdays (hour range 9-17 is inclusive)
    'frequent_business': '*/5 9-17 * * 1-5',
}

Schedule Interval (Delta)

$$\Delta t = t_{\text{next}} - t_{\text{last\_completed}}$$

Here,

  • $\Delta t$ = Schedule interval
  • $t_{\text{next}}$ = Next scheduled execution time
  • $t_{\text{last\_completed}}$ = End of last completed data interval

Catchup Run Count

$$N_{\text{catchup}} = \left\lfloor \frac{t_{\text{now}} - t_{\text{start}}}{\Delta t} \right\rfloor$$

Here,

  • $t_{\text{now}}$ = Current time
  • $t_{\text{start}}$ = DAG start_date
  • $\Delta t$ = Schedule interval
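As a worked example of the catchup formula, the sketch below computes the floor of the elapsed time divided by the schedule interval. The dates are made up for illustration and `catchup_run_count` is a hypothetical helper, not an Airflow function.

```python
from datetime import datetime, timedelta, timezone


def catchup_run_count(now: datetime, start: datetime, interval: timedelta) -> int:
    """floor((now - start) / interval), clamped at zero for future start dates."""
    return max(0, (now - start) // interval)


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
now = datetime(2024, 1, 11, 6, 0, tzinfo=timezone.utc)  # 10 days and 6 hours later

daily_runs = catchup_run_count(now, start, timedelta(days=1))
hourly_runs = catchup_run_count(now, start, timedelta(hours=1))
print(daily_runs)   # 10
print(hourly_runs)  # 246
```

The hourly case shows why an old start_date combined with catchup=True and a short interval can queue hundreds of runs at once.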

Theorem: Schedule Drift Bound

For a scheduler heartbeat interval $\Delta t_{\text{hb}}$, the maximum schedule drift is bounded by $\delta_{\max} = \Delta t_{\text{hb}} + \epsilon_{\text{parse}}$, where $\epsilon_{\text{parse}}$ is the DAG parsing latency. Reducing $\Delta t_{\text{hb}}$ improves scheduling accuracy at the cost of increased CPU utilization.
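The bound is simple arithmetic, sketched here with made-up numbers. The heartbeat and parsing values are assumptions chosen for illustration, not measured or default Airflow settings, and `max_schedule_drift` is a hypothetical helper.

```python
def max_schedule_drift(heartbeat_s: float, parse_latency_s: float) -> float:
    """Upper bound on drift: heartbeat interval plus DAG parsing latency (seconds)."""
    return heartbeat_s + parse_latency_s


# Hypothetical: 30 s of parsing latency with two different heartbeat settings
slow_heartbeat = max_schedule_drift(heartbeat_s=5, parse_latency_s=30)
fast_heartbeat = max_schedule_drift(heartbeat_s=1, parse_latency_s=30)
print(slow_heartbeat)  # 35
print(fast_heartbeat)  # 31
```

With these numbers the parsing latency dominates, so shortening the heartbeat buys little accuracy while still costing CPU.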

Setting catchup=False stops Airflow from creating a run for every missed interval since start_date; the scheduler creates a run only for the most recent interval. This is essential for new DAGs to avoid unintended historical execution. Use catchup=True only when you explicitly need historical runs, for example for data recovery.

Use timezone-aware datetime objects throughout your DAGs. Airflow stores timestamps in UTC internally but displays them in the configured timezone. Be explicit about timezone handling to avoid scheduling inconsistencies.
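The difference between naive and aware datetimes can be seen with the standard library alone. In this sketch a fixed UTC-5 offset stands in for a local zone (an assumption made to keep the example free of timezone-database dependencies; real DAGs would normally use a named zone).

```python
from datetime import datetime, timedelta, timezone

naive = datetime(2024, 1, 1, 9, 0)                          # no tzinfo attached
aware_utc = datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)

# Fixed UTC-5 offset used as a stand-in for a local zone
local_zone = timezone(timedelta(hours=-5))
aware_local = datetime(2024, 1, 1, 9, 0, tzinfo=local_zone)

# The same wall-clock time in two zones is two different instants
as_utc = aware_local.astimezone(timezone.utc)
print(as_utc)                   # 2024-01-01 14:00:00+00:00
print(aware_local - aware_utc)  # 5:00:00

# Ordering a naive datetime against an aware one fails outright
try:
    naive < aware_utc
    comparable = True
except TypeError:
    comparable = False
print(comparable)  # False
```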

Timetables

Timetables are Airflow's modern scheduling abstraction (Airflow 2.2+).

| Timetable | Description | Use Case |
|---|---|---|
| Cron (CronDataIntervalTimetable) | Default, uses cron syntax | Backward compatibility |
| Delta (DeltaDataIntervalTimetable) | Fixed intervals via timedelta | Simple interval scheduling |
| Custom Timetable | User-defined logic | Complex patterns |
| Event-driven | Triggered by external events | Data-driven workflows |

Trigger Mechanism

Triggers enable deferrable execution: operators pause and resume without consuming worker resources.

Trigger Architecture:

  1. Operator defers → creates Trigger
  2. Triggerer monitors Trigger asynchronously
  3. Condition met → Trigger fires an event → Scheduler resumes the task on a worker

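Built-in triggers include DateTimeTrigger and TimeDeltaTrigger (airflow.triggers.temporal) and FileTrigger (airflow.triggers.file). Provider packages supply more, such as the HTTP provider's HttpTrigger, and you can write custom triggers by subclassing BaseTrigger.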
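The defer → trigger → resume flow can be imitated with plain asyncio. This is a toy model under stated assumptions, not Airflow code: `SleepTrigger`, `triggerer`, and `resume_task` are hypothetical names, and a real trigger would subclass BaseTrigger and yield a TriggerEvent.

```python
import asyncio


class SleepTrigger:
    """Toy stand-in for a trigger: waits asynchronously, then yields one event."""

    def __init__(self, delay: float, payload: str):
        self.delay = delay
        self.payload = payload

    async def run(self):
        await asyncio.sleep(self.delay)  # nothing is blocked while waiting
        yield {"status": "success", "payload": self.payload}


async def triggerer(triggers):
    """Toy triggerer: drives many triggers concurrently on one event loop."""

    async def first_event(trigger):
        events = [event async for event in trigger.run()]
        return events[0]

    # gather() returns results in the order the triggers were passed in
    return await asyncio.gather(*(first_event(t) for t in triggers))


def resume_task(event: dict) -> str:
    """Toy 'resume' step: the task continues with the event payload."""
    return f"resumed with {event['payload']}"


events = asyncio.run(
    triggerer([SleepTrigger(0.02, "file_a"), SleepTrigger(0.01, "file_b")])
)
results = [resume_task(event) for event in events]
print(results)  # ['resumed with file_a', 'resumed with file_b']
```

Both waits overlap on a single event loop, which is the property that lets one triggerer process watch many deferred tasks.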


Scheduling Best Practices

| Practice | Why |
|---|---|
| Idempotency | Safe retries and backfills |
| Timezone Awareness | Use timezone-aware datetime objects |
| Catchup=False | Avoid unintended backfill for new DAGs |
| Max Active Runs | Control concurrency, prevent resource contention |
| depends_on_past | Use carefully to avoid bottlenecks |

Key Concepts Table

| Component | Purpose | Example | Use Case |
|---|---|---|---|
| Cron Expression | Time-based scheduling | 0 0 * * * | Daily execution |
| DeltaTimetable | Interval scheduling | timedelta(hours=1) | Hourly processing |
| Trigger | Async waiting | FileTrigger | File availability |
| Sensor | Polling waiting | FileSensor | Simple conditions |
| Dataset | Data-driven scheduling | Dataset('s3://data') | Event-driven |
| Backfill | Historical runs | catchup=True | Data recovery |
| Max Active Runs | Concurrency control | max_active_runs=1 | Sequential execution |
| Timezone | Time handling | timezone('UTC') | Global deployments |

Scheduling Configuration Reference

# Complete DAG scheduling configuration
from datetime import datetime, timedelta
from airflow import DAG
from airflow.utils import timezone

with DAG(
    'comprehensive_scheduling_example',

    # Schedule configuration
    schedule_interval='@daily',           # Or timedelta(hours=1), or cron
    # timetable=CustomTimetable(),        # Advanced: custom timetable

    # Timezone handling
    start_date=datetime(2024, 1, 1, tzinfo=timezone.utc),  # Timezone-aware
    end_date=datetime(2024, 12, 31, tzinfo=timezone.utc),  # Optional end

    # Backfill control
    catchup=False,                        # Don't backfill missed runs
    max_active_runs=1,                    # Only 1 concurrent run

    # Task-level defaults (these are operator arguments, not DAG arguments)
    default_args={
        'depends_on_past': False,         # Don't depend on previous run
        'wait_for_downstream': False,     # Don't wait for downstream
    },

    # DAG-level settings
    dagrun_timeout=timedelta(hours=2),    # Timeout for DAG run

    # Tags for filtering
    tags=['production', 'daily'],
) as dag:
    pass

Code Examples

Advanced Cron Patterns

# advanced_cron_patterns.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from croniter import croniter

def business_day_schedule():
    """Generate schedule for business days only."""
    # Run at 9 AM on weekdays
    return '0 9 * * 1-5'

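def month_end_schedule():
    """Generate a candidate schedule for month-end processing."""
    # Runs at 11 PM on days 28-31; standard cron has no "last day of month"
    # token, so the task must skip days that are not the final day
    return '0 23 28-31 * *'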

def quarterly_schedule():
    """Generate schedule for quarterly processing."""
    # Run on first day of each quarter at midnight
    return '0 0 1 1,4,7,10 *'

def custom_cron_parser(cron_expression: str):
    """Parse and validate cron expression."""
    from croniter import croniter
    from datetime import datetime

    # Validate cron expression
    if not croniter.is_valid(cron_expression):
        raise ValueError(f"Invalid cron expression: {cron_expression}")

    # Get next execution times
    cron = croniter(cron_expression, datetime.now())
    next_runs = []
    for _ in range(5):
        next_runs.append(cron.get_next(datetime))

    return next_runs

def advanced_cron_example():
    """Demonstrate advanced cron patterns."""
    # Complex cron patterns
    patterns = {
        'every_15_minutes': '*/15 * * * *',
        'every_2_hours_weekdays': '0 */2 * * 1-5',
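        # Caution: when both day fields are restricted, standard cron matches
        # EITHER one, so these also fire on every Monday / every Friday
        'first_monday_of_month': '0 0 1-7 * 1',
        'last_friday_of_month': '0 0 25-31 * 5',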
        'every_6_hours_utc': '0 */6 * * *',
    }

    for name, pattern in patterns.items():
        print(f"\n{name}: {pattern}")
        next_runs = custom_cron_parser(pattern)
        for run_time in next_runs:
            print(f"  Next run: {run_time}")

with DAG(
    'advanced_cron_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Advanced cron scheduling patterns',
    schedule_interval=business_day_schedule(),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['cron', 'advanced'],
) as dag:

    # Task to demonstrate cron parsing
    parse_cron = PythonOperator(
        task_id='parse_cron',
        python_callable=advanced_cron_example,
    )

    # Business-hours check (a plain Python callable, not a TimeSensor)
    def check_business_hours(**context):
        """Check if current time is within business hours."""
        from airflow.utils import timezone
        current_time = timezone.utcnow()

        # Business hours: 9 AM to 5 PM UTC
        if 9 <= current_time.hour < 17:
            return True
        return False

    time_check = PythonOperator(
        task_id='business_hours_check',
        python_callable=check_business_hours,
    )

    parse_cron >> time_check

Custom Timetable Implementation

# custom_timetable.py
from datetime import datetime, timedelta
from typing import Optional
from airflow import DAG
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable

class BusinessDaysTimetable(Timetable):
    """
    Custom timetable that only schedules on business days.
    Excludes weekends and major holidays.
    """

    def __init__(
        self,
        hour: int = 9,
        minute: int = 0,
        exclude_holidays: bool = True,
    ):
        super().__init__()
        self.hour = hour
        self.minute = minute
        self.exclude_holidays = exclude_holidays

    def _is_business_day(self, date: datetime) -> bool:
        """Check if date is a business day."""
        # Check if weekend
        if date.weekday() >= 5:  # Saturday (5) or Sunday (6)
            return False

        if self.exclude_holidays:
            # Check against holiday list
            holidays = self._get_holidays(date.year)
            if date.date() in holidays:
                return False

        return True

    def _get_holidays(self, year: int) -> set:
        """Get holidays for a given year."""
        # Example US federal holidays
        holidays = set()

        # New Year's Day
        holidays.add(datetime(year, 1, 1).date())

        # Independence Day
        holidays.add(datetime(year, 7, 4).date())

        # Christmas
        holidays.add(datetime(year, 12, 25).date())

        # Add more holidays as needed

        return holidays

    def get_next_dagrun_info(
        self,
        *,
        last_automated_data_interval: Optional[DataInterval],
        restriction: TimeRestriction,
    ) -> Optional[DagRunInfo]:
        """Get next DAG run information."""
        if last_automated_data_interval is None:
            # First run: start from the earliest allowed date (the DAG start_date)
            if restriction.earliest is None:
                return None  # No start_date, so nothing to schedule
            candidate = restriction.earliest
        else:
            # Later runs: move to the day after the previous run
            candidate = last_automated_data_interval.end + timedelta(days=1)

        # Set the configured time of day, then skip non-business days
        next_date = candidate.replace(
            hour=self.hour, minute=self.minute, second=0, microsecond=0,
        )
        if next_date < candidate:
            next_date += timedelta(days=1)
        while not self._is_business_day(next_date):
            next_date += timedelta(days=1)

        # Stop once the DAG end_date has been passed
        if restriction.latest is not None and next_date > restriction.latest:
            return None

        # Note: restriction.catchup is not consulted in this simplified example;
        # a production timetable would skip past dates when it is False.
        # Zero-length data interval; the run is created at next_date
        return DagRunInfo.exact(next_date)

    def infer_manual_data_interval(self, run_after: datetime) -> DataInterval:
        """Infer data interval for manual runs."""
        # Use previous business day as data interval
        previous_date = run_after - timedelta(days=1)
        while not self._is_business_day(previous_date):
            previous_date -= timedelta(days=1)

        return DataInterval(start=previous_date, end=previous_date)

# Usage in DAG (a custom timetable must also be registered through an Airflow plugin)
with DAG(
    'business_days_dag',
    timetable=BusinessDaysTimetable(hour=9, minute=0),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['timetable', 'business_days'],
) as dag:
    # Tasks here will only run on business days
    pass

Trigger Implementation

# trigger_implementation.py
from datetime import datetime, timedelta
from typing import Any, AsyncIterator, Dict, Optional, Tuple
from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.models import Connection
import asyncio
import aiohttp
import aiohttp.web  # the web submodule is not loaded by "import aiohttp" alone

class WebhookTrigger(BaseTrigger):
    """
    Custom trigger that waits for a webhook call.

    This trigger listens for incoming HTTP requests and resumes
    the operator when a valid webhook is received.
    """

    def __init__(
        self,
        webhook_path: str,
        method: str = 'POST',
        headers: Optional[Dict[str, str]] = None,
        timeout: int = 300,
    ):
        super().__init__()
        self.webhook_path = webhook_path
        self.method = method
        self.headers = headers or {}
        self.timeout = timeout

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        """Serialize the trigger for storage."""
        return (
            "trigger_implementation.WebhookTrigger",
            {
                "webhook_path": self.webhook_path,
                "method": self.method,
                "headers": self.headers,
                "timeout": self.timeout,
            },
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """Run the webhook listener."""
        import aiohttp.web

        # Create a simple web server to listen for webhooks
        app = aiohttp.web.Application()
        app.router.add_route(self.method, self.webhook_path, self.handle_webhook)

        runner = aiohttp.web.AppRunner(app)
        await runner.setup()
        site = aiohttp.web.TCPSite(runner, 'localhost', 8080)
        await site.start()

        try:
            # Wait for webhook with timeout (asyncio.timeout requires Python 3.11+)
            async with asyncio.timeout(self.timeout):
                while True:
                    await asyncio.sleep(1)
                    # Check for webhook data
                    if hasattr(self, '_webhook_data'):
                        yield TriggerEvent(self._webhook_data)
                        break
        finally:
            await runner.cleanup()

    async def handle_webhook(self, request: aiohttp.web.Request):
        """Handle incoming webhook request."""
        try:
            data = await request.json()
            self._webhook_data = {
                'status': 'success',
                'data': data,
                'timestamp': datetime.now().isoformat(),
            }
            return aiohttp.web.json_response({'status': 'received'})
        except Exception as e:
            self._webhook_data = {
                'status': 'error',
                'error': str(e),
                'timestamp': datetime.now().isoformat(),
            }
            return aiohttp.web.json_response({'status': 'error'}, status=400)

class PollingTrigger(BaseTrigger):
    """
    Custom trigger that polls an endpoint until condition is met.
    """

    def __init__(
        self,
        http_conn_id: str,
        endpoint: str,
        method: str = 'GET',
        headers: Optional[Dict[str, str]] = None,
        poll_interval: int = 30,
        timeout: int = 3600,
        success_criteria: Optional[Dict[str, Any]] = None,
    ):
        super().__init__()
        self.http_conn_id = http_conn_id
        self.endpoint = endpoint
        self.method = method
        self.headers = headers or {}
        self.poll_interval = poll_interval
        self.timeout = timeout
        self.success_criteria = success_criteria or {}

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        """Serialize the trigger for storage."""
        return (
            "trigger_implementation.PollingTrigger",
            {
                "http_conn_id": self.http_conn_id,
                "endpoint": self.endpoint,
                "method": self.method,
                "headers": self.headers,
                "poll_interval": self.poll_interval,
                "timeout": self.timeout,
                "success_criteria": self.success_criteria,
            },
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """Run the polling loop."""
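        # Connection lookup is a blocking call, so run it in a worker thread
        # rather than on the triggerer's event loop
        from airflow.hooks.base import BaseHook
        from asgiref.sync import sync_to_async

        conn = await sync_to_async(BaseHook.get_connection)(self.http_conn_id)
        url = f"{conn.schema}://{conn.host}:{conn.port}{self.endpoint}"

        headers = {**self.headers}
        if conn.password:
            headers['Authorization'] = f"Bearer {conn.password}"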

        start_time = datetime.now()
        timeout_delta = timedelta(seconds=self.timeout)

        async with aiohttp.ClientSession() as session:
            while True:
                try:
                    async with session.request(
                        self.method,
                        url,
                        headers=headers,
                    ) as response:
                        data = await response.json()

                        # Check success criteria
                        if self._check_success_criteria(data):
                            yield TriggerEvent({
                                'status': 'success',
                                'data': data,
                                'timestamp': datetime.now().isoformat(),
                            })
                            return

                except Exception as e:
                    # Log error but continue polling
                    self.log.warning("Polling error: %s", e)

                # Check timeout
                if datetime.now() - start_time > timeout_delta:
                    yield TriggerEvent({
                        'status': 'timeout',
                        'error': 'Polling timeout exceeded',
                        'timestamp': datetime.now().isoformat(),
                    })
                    return

                # Wait before next poll
                await asyncio.sleep(self.poll_interval)

    def _check_success_criteria(self, data: Dict[str, Any]) -> bool:
        """Check if response meets success criteria."""
        for key, value in self.success_criteria.items():
            if key not in data or data[key] != value:
                return False
        return True

# Usage in DAG
from airflow.operators.python import PythonOperator
from airflow.models import DAG

def process_trigger_result(**context):
    """Process the result from trigger."""
    trigger_event = context['ti'].xcom_pull(
        task_ids='wait_for_webhook',
        key='return_value',
    )
    print(f"Trigger event received: {trigger_event}")

with DAG(
    'trigger_example_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Custom trigger example',
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['trigger', 'custom'],
) as dag:

    # Webhook trigger example
    from airflow.operators.python import PythonOperator

    wait_for_webhook = PythonOperator(
        task_id='wait_for_webhook',
        python_callable=process_trigger_result,
        # In practice, you'd use a sensor with the custom trigger
    )

    # Polling trigger example
    wait_for_condition = PythonOperator(
        task_id='wait_for_condition',
        python_callable=process_trigger_result,
    )

    wait_for_webhook >> wait_for_condition

Dataset-Driven Scheduling

# dataset_scheduling.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.datasets import Dataset

# Define datasets (URIs are opaque identifiers; they are not Jinja-templated)
raw_data_dataset = Dataset("s3://my-bucket/raw-data/")
processed_data_dataset = Dataset("s3://my-bucket/processed-data/")
report_dataset = Dataset("s3://my-bucket/reports/")

def extract_data(**context):
    """Extract data from source."""
    # Simulate data extraction
    print("Extracting data...")
    return {"status": "success", "records": 1000}

def transform_data(**context):
    """Transform extracted data."""
    # Simulate data transformation
    print("Transforming data...")
    return {"status": "success", "transformed_records": 950}

def generate_report(**context):
    """Generate report from transformed data."""
    # Simulate report generation
    print("Generating report...")
    return {"status": "success", "report_url": "s3://reports/report.pdf"}

# DAG 1: Data Extraction (writes to raw_data_dataset)
with DAG(
    'data_extraction_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Data extraction DAG',
    # Time-based schedule: this DAG produces raw_data_dataset, so it must not
    # also be scheduled on it (schedule and schedule_interval cannot be combined)
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['dataset', 'extraction'],
) as extraction_dag:

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
        outlets=[raw_data_dataset],  # This DAG produces this dataset
    )

# DAG 2: Data Processing (reads from raw_data_dataset, writes to processed_data_dataset)
with DAG(
    'data_processing_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Data processing DAG',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['dataset', 'processing'],
    # This DAG runs when raw_data_dataset is updated; schedule replaces
    # schedule_interval, and the two cannot be passed together
    schedule=[raw_data_dataset],
) as processing_dag:

    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
        inlets=[raw_data_dataset],  # This DAG reads this dataset
        outlets=[processed_data_dataset],  # This DAG writes this dataset
    )

# DAG 3: Report Generation (reads from processed_data_dataset)
with DAG(
    'report_generation_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Report generation DAG',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['dataset', 'reporting'],
    # This DAG runs when processed_data_dataset is updated; schedule replaces
    # schedule_interval, and the two cannot be passed together
    schedule=[processed_data_dataset],
) as report_dag:

    report = PythonOperator(
        task_id='generate_report',
        python_callable=generate_report,
        inlets=[processed_data_dataset],  # This DAG reads this dataset
        outlets=[report_dataset],  # This DAG writes this dataset
    )
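The chain of dataset-triggered runs can be traced with a small simulation. This sketch assumes the extraction DAG is time-scheduled and the other two are dataset-scheduled; the dictionaries and `downstream_runs` are hypothetical stand-ins, not Airflow's scheduling logic.

```python
from collections import defaultdict, deque

# Which datasets each DAG updates (its task outlets)
produces = {
    "data_extraction_dag": ["raw"],
    "data_processing_dag": ["processed"],
    "report_generation_dag": ["reports"],
}
# Which datasets each DAG is scheduled on
scheduled_on = {
    "data_processing_dag": ["raw"],
    "report_generation_dag": ["processed"],
}


def downstream_runs(first_dag: str) -> list[str]:
    """Return the DAGs that run, in order, once first_dag completes."""
    consumers = defaultdict(list)
    for dag_id, datasets in scheduled_on.items():
        for dataset in datasets:
            consumers[dataset].append(dag_id)

    order, queue = [], deque([first_dag])
    while queue:
        dag_id = queue.popleft()
        order.append(dag_id)
        # Every dataset this DAG updates wakes the DAGs scheduled on it
        for dataset in produces.get(dag_id, []):
            queue.extend(consumers[dataset])
    return order


run_order = downstream_runs("data_extraction_dag")
print(run_order)
# ['data_extraction_dag', 'data_processing_dag', 'report_generation_dag']
```

Note that a DAG scheduled on a dataset it also produces would re-trigger itself in this model, which is why the producer at the head of the chain is kept on a time-based schedule.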

Performance Metrics

Scheduling Performance

| Metric | Description | Optimization Strategy | Warning Threshold |
|---|---|---|---|
| Cron Parse Time | Time to parse cron expression | Use simple expressions | > 100ms |
| Schedule Accuracy | How closely actual run times match scheduled times | Optimize scheduler heartbeat | > 5 min drift |
| Trigger Latency | Time for trigger to fire | Use async triggers, optimize polling | > 30 seconds |
| DAG Run Creation | Time to create DagRun objects | Optimize database queries | > 1 second |
| Backfill Speed | Time to complete backfill | Parallelize, optimize task execution | < 100 runs/hour |
| Timezone Handling | Accuracy of timezone conversions | Use timezone-aware datetime objects | N/A |
| Schedule Drift | Deviation from intended schedule | Monitor and adjust scheduling parameters | > 1 minute |
| Resource Usage | Scheduler resource consumption | Optimize heartbeat interval, limit active runs | > 80% CPU |

Schedule Drift Analysis

Drift Formula: $\delta = t_{\text{actual}} - t_{\text{scheduled}}$
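The drift formula can be applied directly to a pair of timestamps. The sketch below also sorts the result into the impact bands used in this section's drift table; `classify_drift` is a hypothetical helper and the sample times are invented.

```python
from datetime import datetime, timedelta, timezone


def classify_drift(scheduled: datetime, actual: datetime) -> str:
    """Compute drift = actual - scheduled and map it to an impact band."""
    drift = actual - scheduled
    if drift < timedelta(seconds=1):
        return "Negligible"
    if drift <= timedelta(seconds=30):
        return "Low"
    if drift <= timedelta(minutes=5):
        return "Medium"
    return "High"


scheduled = datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)
print(classify_drift(scheduled, scheduled + timedelta(milliseconds=500)))  # Negligible
print(classify_drift(scheduled, scheduled + timedelta(seconds=10)))        # Low
print(classify_drift(scheduled, scheduled + timedelta(minutes=2)))         # Medium
print(classify_drift(scheduled, scheduled + timedelta(minutes=10)))        # High
```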

| Drift Amount | Impact | Action Required |
|---|---|---|
| < 1 second | Negligible | None |
| 1-30 seconds | Low | Monitor |
| 30 seconds - 5 minutes | Medium | Optimize scheduler |
| > 5 minutes | High | Investigate immediately |

Scheduler Resource Usage

| Scheduler Component | Typical CPU | Typical Memory | Optimization |
|---|---|---|---|
| Heartbeat Loop | 5-10% | 50-100 MB | Reduce frequency |
| DAG Parsing | 10-30% | 200-500 MB | Parallelize |
| Task Queuing | 5-15% | 100-200 MB | Batch operations |
| Database Queries | 10-20% | 50-100 MB | Add indexes |
| Total | 30-75% | 400 MB - 1 GB | Monitor and tune |

Best Practices

  1. Cron Expression Validation: Always validate cron expressions before deploying. Use tools like croniter to test and verify schedules.

  2. Timezone Handling: Use timezone-aware datetime objects throughout your DAGs. Be explicit about timezone conversions and avoid naive datetime objects.

  3. Catchup Configuration: Set catchup=False for new DAGs to avoid unintended backfill. Use catchup=True only when you explicitly need historical runs.

  4. Max Active Runs: Configure max_active_runs to prevent resource contention. Use max_active_runs=1 for sequential execution.

  5. Schedule Interval: Choose appropriate schedule intervals based on data requirements and resource constraints. Avoid over-scheduling.

  6. Trigger Usage: Use triggers for external dependencies instead of polling. This improves efficiency and reduces resource usage.

  7. Dataset Dependencies: Use dataset-driven scheduling for event-driven workflows. This provides more flexibility than time-based scheduling.

  8. Monitoring: Monitor schedule accuracy and trigger performance. Set up alerts for missed or delayed runs.

  9. Testing: Test scheduling logic thoroughly. Use Airflow's testing utilities to validate schedule behavior.

  10. Documentation: Document scheduling decisions and rationale. Include schedule descriptions in DAG documentation.

Key Takeaways:

  • Schedule interval $\Delta t = t_{\text{next}} - t_{\text{last\_completed}}$ determines DAG run frequency
  • Catchup run count is $\lfloor (t_{\text{now}} - t_{\text{start}}) / \Delta t \rfloor$
  • Schedule drift is bounded by $\delta_{\max} = \Delta t_{\text{hb}} + \epsilon_{\text{parse}}$
  • Triggers enable async deferral without worker resource consumption
  • Timetables provide flexible scheduling beyond cron expressions
  • Always use timezone-aware datetime objects to avoid scheduling bugs

See also: Kafka Connect (kafka/03), PySpark Submit (pyspark/19), Data Engineering Orchestration (data-engineering/017)
