Interview Question
Interview Context
Company: Amazon / Meta
Role: Senior Data Engineer / Platform Engineer
Difficulty: Advanced
Time: 45-60 minutes
Question: "Explain the different scheduling mechanisms in Airflow. How do cron expressions, timedelta, and custom timetables work? When would you use each approach?"
Detailed Theory
Scheduling Fundamentals
# scheduling_fundamentals.py
"""
Airflow Scheduling Mechanisms:
1. Cron Expressions:
- Standard cron syntax
- Good for simple schedules
- Limited flexibility
2. timedelta:
- Python timedelta objects
- Good for fixed intervals
- Easy to understand
3. Custom Timetables:
- Airflow 2.2+ feature
- Maximum flexibility
- Complex scheduling logic
4. Preset Intervals:
- @daily, @hourly, @weekly
- Convenient shortcuts
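- Common schedules
All four are passed through the DAG's `schedule` argument in Airflow 2.4+
(`schedule_interval` and `timetable` are the older, deprecated spellings).
"""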
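Airflow picks the mechanism from the type of the schedule value. The sketch below is a hypothetical helper (resolve_schedule is not an Airflow API) that mirrors that dispatch; the preset table is, to our knowledge, the same mapping as cron_presets in Airflow 2.x's airflow.utils.dates.

```python
from datetime import timedelta
from typing import Any, Optional, Tuple

# Presets and the cron strings they stand for (as in Airflow 2.x's cron_presets)
CRON_PRESETS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@quarterly": "0 0 1 */3 *",
    "@yearly": "0 0 1 1 *",
}


def resolve_schedule(schedule: Any) -> Tuple[str, Optional[Any]]:
    """Hypothetical helper: classify a schedule value by mechanism."""
    if schedule is None:
        return ("manual", None)  # no automatic runs; trigger by hand or via API
    if isinstance(schedule, timedelta):
        return ("interval", schedule)  # fixed-length step between runs
    if isinstance(schedule, str):
        if schedule in ("@once", "@continuous"):
            return ("special", schedule)  # presets that are not cron aliases
        if schedule.startswith("@"):
            if schedule not in CRON_PRESETS:
                raise ValueError(f"unknown preset {schedule!r}")
            return ("cron", CRON_PRESETS[schedule])  # preset expands to cron
        return ("cron", schedule)  # plain cron expression
    return ("timetable", schedule)  # assumed to be a Timetable instance
```

So '@daily' and '0 0 * * *' are the same schedule, while timedelta(days=1) is a different mechanism with different alignment.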
1. Cron Expressions
# cron_expressions.py
from airflow.decorators import dag
from datetime import datetime
# Standard cron format: minute hour day_of_month month day_of_week
# * * * * *
# | | | | |
# | | | | +-- Day of week (0-6) (Sunday=0)
# | | | +---- Month (1-12)
# | | +------ Day of month (1-31)
# | +-------- Hour (0-23)
# +---------- Minute (0-59)
# Common cron examples
CRON_EXAMPLES = {
# Every day at 2 AM
'daily_2am': '0 2 * * *',
# Every hour
'hourly': '0 * * * *',
# Every 15 minutes
'every_15_min': '*/15 * * * *',
# Monday at 9 AM
'weekly_monday_9am': '0 9 * * 1',
# First day of month at midnight
'monthly_first': '0 0 1 * *',
# Every weekday at 6 AM
'weekdays_6am': '0 6 * * 1-5',
# Every 6 hours
'every_6_hours': '0 */6 * * *',
# 1st and 15th of month at 8 AM
'twice_monthly': '0 8 1,15 * *',
}
# DAG with cron schedule
# (`schedule` is the Airflow 2.4+ argument; older versions use `schedule_interval`)
@dag(
    dag_id='cron_schedule_example',
    schedule='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def cron_dag():
    pass


cron_dag()  # calling the decorated function creates the DAG object

# DAG with preset schedule
@dag(
    dag_id='preset_schedule_example',
    schedule='@daily',  # Same as '0 0 * * *'
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def preset_dag():
    pass


preset_dag()
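To make the five fields concrete, here is a simplified cron matcher using only the standard library. It is an illustrative sketch, not what Airflow runs (Airflow 2.x delegates cron parsing to the croniter library): it ignores names such as MON or JAN and extensions such as L or #, but it does follow classic cron's rule that when both day-of-month and day-of-week are restricted, a day matching either one qualifies.

```python
from datetime import datetime, timedelta
from typing import NamedTuple, Set


class CronSpec(NamedTuple):
    minutes: Set[int]
    hours: Set[int]
    days: Set[int]
    months: Set[int]
    weekdays: Set[int]  # cron numbering: Sunday=0
    day_or: bool        # both day fields restricted -> either may match


def _parse_field(field: str, lo: int, hi: int) -> Set[int]:
    """Expand one field such as '*', '*/15', '1-5', '1,15' or '5' into its values."""
    values: Set[int] = set()
    for part in field.split(","):
        step, has_step = 1, "/" in part
        if has_step:
            part, step_text = part.split("/")
            step = int(step_text)
        if part == "*":
            start, end = lo, hi
        elif "-" in part:
            first, last = part.split("-")
            start, end = int(first), int(last)
        else:
            start = int(part)
            end = hi if has_step else start  # '5/15' means 5, 20, 35, ...
        values.update(range(start, end + 1, step))
    return values


def parse_cron(expr: str) -> CronSpec:
    minute, hour, dom, month, dow = expr.split()
    return CronSpec(
        minutes=_parse_field(minute, 0, 59),
        hours=_parse_field(hour, 0, 23),
        days=_parse_field(dom, 1, 31),
        months=_parse_field(month, 1, 12),
        weekdays={v % 7 for v in _parse_field(dow, 0, 7)},  # 7 also means Sunday
        day_or=not dom.startswith("*") and not dow.startswith("*"),
    )


def matches(spec: CronSpec, dt: datetime) -> bool:
    cron_weekday = (dt.weekday() + 1) % 7  # Python: Monday=0, cron: Sunday=0
    dom_ok, dow_ok = dt.day in spec.days, cron_weekday in spec.weekdays
    day_ok = (dom_ok or dow_ok) if spec.day_or else (dom_ok and dow_ok)
    return (dt.minute in spec.minutes and dt.hour in spec.hours
            and dt.month in spec.months and day_ok)


def next_fire(expr: str, after: datetime) -> datetime:
    """First whole minute strictly after `after` that matches `expr` (naive wall-clock)."""
    spec = parse_cron(expr)
    t = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    for _ in range(366 * 24 * 60):  # search roughly one year ahead
        if matches(spec, t):
            return t
        t += timedelta(minutes=1)
    raise ValueError(f"no match for {expr!r} within a year")
```

For example, next_fire('0 6 * * 1-5', datetime(2024, 1, 5, 7, 0)), a Friday morning, returns Monday 2024-01-08 06:00, skipping the weekend.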
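Cron Limitations
A cron expression on its own can't express:
- Complex business logic (holidays, trading calendars)
- Variable intervals
- A timezone: Airflow evaluates the expression in the DAG's timezone, taken from a timezone-aware start_date (otherwise the [core] default_timezone setting, UTC by default). The DAG has no separate tz parameter, although timetables such as CronTriggerTimetable accept a timezone argument.
- DST edge cases: local times that are skipped or repeated on transition days need explicit testing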
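The DST point is easy to see with the standard library's zoneinfo module (Python 3.9+, needing the system tz database or the tzdata package). This sketch converts a 2 AM America/New_York run time to UTC in winter and summer, and classifies local times that DST skips or repeats; to_utc and classify_local_time are hypothetical helpers, and how Airflow itself resolves a schedule landing on such a time is not modeled here.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

NEW_YORK = ZoneInfo("America/New_York")


def to_utc(local_naive: datetime, tz: ZoneInfo = NEW_YORK) -> datetime:
    """Attach `tz` to a naive wall-clock time and convert it to UTC."""
    return local_naive.replace(tzinfo=tz).astimezone(timezone.utc)


def classify_local_time(local_naive: datetime, tz: ZoneInfo = NEW_YORK) -> str:
    """Return 'normal', 'ambiguous' (occurs twice) or 'nonexistent' (skipped)."""
    earlier = local_naive.replace(tzinfo=tz, fold=0)
    later = local_naive.replace(tzinfo=tz, fold=1)
    if earlier.utcoffset() == later.utcoffset():
        return "normal"
    # A skipped wall-clock time does not survive a round trip through UTC
    round_trip = earlier.astimezone(timezone.utc).astimezone(tz).replace(tzinfo=None)
    return "ambiguous" if round_trip == local_naive else "nonexistent"
```

A local 2 AM run lands at 07:00 UTC in January but 06:00 UTC in July, and in 2024 the New York clock skips 2:30 AM on March 10 and repeats 1:30 AM on November 3.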
2. timedelta Scheduling
# timedelta_scheduling.py
from airflow.decorators import dag
from datetime import datetime, timedelta
# timedelta schedule
@dag(
    dag_id='timedelta_schedule_example',
    schedule=timedelta(hours=6),  # Every 6 hours
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def timedelta_dag():
    pass


timedelta_dag()

# Common timedelta examples
TIMEDELTA_EXAMPLES = {
    'every_15_min': timedelta(minutes=15),
    'every_hour': timedelta(hours=1),
    'every_6_hours': timedelta(hours=6),
    'daily': timedelta(days=1),
    'weekly': timedelta(weeks=1),
    'bi_weekly': timedelta(weeks=2),
    # Approximate: a fixed 30-day step drifts against calendar months;
    # use cron '0 0 1 * *' (or '@monthly') for true monthly runs
    'monthly': timedelta(days=30),
}
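The "Approximate" label on the monthly entry matters more than it looks. Assuming intervals are counted in fixed steps from an anchor such as start_date (as when catchup replays history), this standard-library sketch compares a 30-day step with real calendar months and shows that a timedelta is not aligned to clock boundaries the way cron is. Both function names are hypothetical.

```python
from datetime import datetime, timedelta
from typing import List


def fixed_interval_starts(anchor: datetime, step: timedelta, count: int) -> List[datetime]:
    """Interval starts produced by a fixed step from an anchor (e.g. start_date)."""
    return [anchor + step * i for i in range(count)]


def calendar_month_starts(year: int, month: int, count: int) -> List[datetime]:
    """The 1st of `count` consecutive months, for comparison with cron '0 0 1 * *'."""
    starts = []
    for i in range(count):
        extra_years, month_index = divmod(month - 1 + i, 12)
        starts.append(datetime(year + extra_years, month_index + 1, 1))
    return starts
```

Starting 2024-01-01, the 30-day step yields Jan 1, Jan 31, Mar 1 and Mar 31: February gets no run on its first day and March gets two. A 6-hour step anchored at 01:30 likewise fires at 01:30, 07:30 and 13:30 rather than at cron's 00:00, 06:00 and 12:00.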
3. Custom Timetables
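# custom_timetables.py
# Sketch for Airflow 2.4+ (unified `schedule` argument). Shown as one file for
# brevity; in a deployment the timetable classes and the plugin live in plugins/
# and the DAG files import them, because the scheduler rebuilds timetables from
# serialized DAGs and must be able to import them.
from __future__ import annotations

from typing import Any, Optional

import pendulum
from pendulum import DateTime

from airflow.decorators import dag
from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable


class BusinessHourTimetable(Timetable):
    """
    Custom timetable for business hours only.
    Hourly runs from 9 AM to 5 PM, Mon-Fri, in the given timezone; the last
    interval of each day is 16:00-17:00.
    """

    def __init__(self, timezone: str = 'UTC', start_hour: int = 9, end_hour: int = 17):
        self.timezone = timezone
        self.start_hour = start_hour
        self.end_hour = end_hour

    def _align(self, candidate: DateTime) -> DateTime:
        """First business-hour slot at or after `candidate`, returned in UTC."""
        # Interval boundaries arrive in UTC; business hours are local
        local = pendulum.instance(candidate).in_timezone(self.timezone)
        if (local.minute, local.second, local.microsecond) != (0, 0, 0):
            local = local.start_of('hour').add(hours=1)  # round up to the hour
        while True:
            # Weekend and after-hours share one branch, so Friday 17:00 rolls to Monday
            if local.weekday() >= 5 or local.hour >= self.end_hour:
                local = local.add(days=1).set(hour=self.start_hour)
            elif local.hour < self.start_hour:
                local = local.set(hour=self.start_hour)
            else:
                return local.in_timezone('UTC')

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        # Manual trigger: cover the most recent full hour
        end = pendulum.instance(run_after).start_of('hour')
        return DataInterval(start=end.subtract(hours=1), end=end)

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: Optional[DataInterval],
        restriction: TimeRestriction,
    ) -> Optional[DagRunInfo]:
        """Calculate next DAG run time"""
        if last_automated_data_interval is not None:
            candidate = last_automated_data_interval.end  # continue after the last run
        elif restriction.earliest is not None:
            candidate = restriction.earliest  # first run: start from start_date
        else:
            return None  # no start_date: never schedule
        if not restriction.catchup:
            # catchup=False: skip intervals that began before the current hour
            candidate = max(candidate, pendulum.now('UTC').start_of('hour'))
        start = self._align(candidate)
        if restriction.latest is not None and start > restriction.latest:
            return None  # past end_date
        return DagRunInfo.interval(start=start, end=start.add(hours=1))

    def serialize(self) -> dict[str, Any]:
        # Constructor arguments must survive DAG serialization
        return {'timezone': self.timezone, 'start_hour': self.start_hour, 'end_hour': self.end_hour}

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> BusinessHourTimetable:
        return cls(**data)


# Usage
@dag(
    dag_id='business_hours_dag',
    schedule=BusinessHourTimetable(timezone='America/New_York'),
    start_date=pendulum.datetime(2024, 1, 1, tz='UTC'),
    catchup=False,
)
def business_hours_dag():
    pass


business_hours_dag()


class MonthlyFirstDayTimetable(Timetable):
    """
    Custom timetable for first day of month: each run's data interval is the
    1st of the month (00:00-24:00 UTC). Illustrative only; cron '0 0 1 * *'
    already gives a plain monthly schedule.
    """

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        start = pendulum.instance(run_after).in_timezone('UTC').start_of('month')
        return DataInterval(start=start, end=start.add(days=1))

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: Optional[DataInterval],
        restriction: TimeRestriction,
    ) -> Optional[DagRunInfo]:
        """Calculate next DAG run time"""
        if last_automated_data_interval is not None:
            # Next run: first day of the following month
            start = pendulum.instance(last_automated_data_interval.start).add(months=1)
        elif restriction.earliest is not None:
            earliest = pendulum.instance(restriction.earliest).in_timezone('UTC')
            if not restriction.catchup:
                earliest = max(earliest, pendulum.now('UTC').start_of('month'))
            # First run: the first 1st-of-month at or after `earliest`
            start = earliest.start_of('month')
            if start < earliest:
                start = start.add(months=1)
        else:
            return None  # no start_date: never schedule
        if restriction.latest is not None and start > restriction.latest:
            return None  # past end_date
        return DagRunInfo.interval(start=start, end=start.add(days=1))


# Usage
@dag(
    dag_id='monthly_first_dag',
    schedule=MonthlyFirstDayTimetable(),
    start_date=pendulum.datetime(2024, 1, 1, tz='UTC'),
    catchup=False,
)
def monthly_first_dag():
    pass


monthly_first_dag()


# Registration: custom timetables must be exposed through a plugin
class CustomTimetablesPlugin(AirflowPlugin):
    name = 'custom_timetables_plugin'
    timetables = [BusinessHourTimetable, MonthlyFirstDayTimetable]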
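Pro Tip
Custom Timetables (Airflow 2.2+) are the most flexible scheduling option. Use them for logic that cron or timedelta can't express, such as skipping weekends or holidays. A working timetable returns DagRunInfo from next_dagrun_info, implements infer_manual_data_interval for manually triggered runs, serializes its constructor arguments, and is registered through an AirflowPlugin, because the scheduler works from serialized DAGs.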
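The hour-alignment rule at the core of a business-hours timetable can be tested without Airflow. This standard-library sketch (next_business_slot and business_slots are hypothetical names; 9-17, Monday to Friday, in America/New_York is assumed) converts a UTC candidate to local time, rounds up to the hour, and rolls weekend and after-hours times forward to the next opening. Checking the weekend in the same loop as the after-hours rollover is what keeps Friday 17:00 from landing on Saturday.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo


def next_business_slot(candidate: datetime, tz_name: str = "America/New_York",
                       start_hour: int = 9, end_hour: int = 17) -> datetime:
    """First full hour at or after `candidate` (aware) inside Mon-Fri business hours, in UTC."""
    tz = ZoneInfo(tz_name)
    local = candidate.astimezone(tz)
    if (local.minute, local.second, local.microsecond) != (0, 0, 0):
        local = local.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
    while True:
        if local.weekday() >= 5 or local.hour >= end_hour:
            # Weekend or after hours: jump to the next day's opening and re-check
            next_day = (local + timedelta(days=1)).date()
            local = datetime(next_day.year, next_day.month, next_day.day, start_hour, tzinfo=tz)
        elif local.hour < start_hour:
            local = local.replace(hour=start_hour)  # later the same morning
        else:
            return local.astimezone(timezone.utc)


def business_slots(after: datetime, count: int, **kwargs) -> list:
    """The next `count` hourly slot starts, each one hour after the previous."""
    slots = []
    slot = next_business_slot(after, **kwargs)
    for _ in range(count):
        slots.append(slot)
        slot = next_business_slot(slot + timedelta(hours=1), **kwargs)
    return slots
```

For instance, a candidate of Friday 2024-01-05 17:00 EST (22:00 UTC) yields Monday 2024-01-08 09:00 EST (14:00 UTC).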
4. Backfilling and Catchup
# backfilling.py
from airflow.decorators import dag
from datetime import datetime, timedelta
# DAG with catchup
@dag(
    dag_id='catchup_example',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=True,  # Schedule every missed interval since start_date
    max_active_runs=1,  # Only one run at a time
)
def catchup_dag():
    pass


catchup_dag()

# DAG without catchup
@dag(
    dag_id='no_catchup_example',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,  # Skip missed intervals; schedule only the latest one
)
def no_catchup_dag():
    pass


no_catchup_dag()

# Manual backfill command (Airflow 2.x CLI)
# airflow dags backfill -s 2024-01-01 -e 2024-01-31 my_dag
# Backfill-related airflow.cfg settings
BACKFILL_CONFIG = """
[core]
# Default max_active_runs for DAGs that don't set it
max_active_runs_per_dag = 1
# Max task instances running at once across the whole installation
# (applies to all runs, not only backfills)
parallelism = 32
"""
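Before enabling catchup=True on an old start_date, it helps to count what the scheduler will create. A standard-library sketch, assuming Airflow 2.x data-interval semantics for cron and timedelta schedules (the run for [start, end) is created once end has passed); catchup_intervals is a hypothetical helper.

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def catchup_intervals(start_date: datetime, now: datetime,
                      step: timedelta) -> List[Tuple[datetime, datetime]]:
    """Complete [start, end) intervals between start_date and now."""
    intervals = []
    start = start_date
    while start + step <= now:  # only intervals that have already ended
        intervals.append((start, start + step))
        start += step
    return intervals
```

Between 2024-01-01 and noon on 2024-01-31, a daily schedule has 30 complete intervals; with max_active_runs=1 and a hypothetical 20-minute run time, working through them takes about 10 hours.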
5. Timezone Handling
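# timezone_handling.py
from airflow.decorators import dag
from datetime import datetime
import pendulum
import pytz
# DAG with timezone. Build start_date with pendulum: passing a pytz zone via
# tzinfo= silently uses the zone's historical LMT offset (-4:56 for New York).
@dag(
    dag_id='timezone_example',
    schedule='0 2 * * *',  # 2 AM in the DAG's timezone (America/New_York)
    start_date=pendulum.datetime(2024, 1, 1, tz='America/New_York'),
    catchup=False,
    tags=['timezone'],
)
def timezone_dag():
    pass


timezone_dag()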
# Timezone utilities
TIMEZONE_EXAMPLES = {
'UTC': 'UTC',
'US_Eastern': 'America/New_York',
'US_Pacific': 'America/Los_Angeles',
'Europe_London': 'Europe/London',
'Asia_Tokyo': 'Asia/Tokyo',
}
# Get current time in timezone
def get_current_time(timezone: str = 'UTC') -> datetime:
    """Get current time in specified timezone"""
    tz = pytz.timezone(timezone)
    return datetime.now(tz)


# Convert between timezones
def convert_timezone(
    dt: datetime,
    from_tz: str,
    to_tz: str
) -> datetime:
    """Convert a naive datetime from `from_tz` to `to_tz`.

    `dt` must be naive: pytz's localize() raises ValueError for aware datetimes.
    """
    from_timezone = pytz.timezone(from_tz)
    to_timezone = pytz.timezone(to_tz)
    # Localize to source timezone (the pytz-safe way to attach a zone)
    dt = from_timezone.localize(dt)
    # Convert to target timezone
    return dt.astimezone(to_timezone)
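The pytz helpers above are correct (localize() is the right pytz call), but on Python 3.9+ the standard-library zoneinfo module does not need localize(): attaching a zone with replace(tzinfo=...) gives the proper offset. A sketch of the same two helpers rewritten that way, assuming the tz database is available on the host:

```python
from datetime import datetime
from zoneinfo import ZoneInfo


def get_current_time(tz_name: str = "UTC") -> datetime:
    """Current time as an aware datetime in `tz_name`."""
    return datetime.now(ZoneInfo(tz_name))


def convert_timezone(dt: datetime, from_tz: str, to_tz: str) -> datetime:
    """Convert `dt` to `to_tz`; a naive `dt` is read as wall-clock time in `from_tz`."""
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=ZoneInfo(from_tz))  # safe with zoneinfo, unlike pytz
    return dt.astimezone(ZoneInfo(to_tz))
```

Unlike a pytz zone passed through tzinfo=, datetime(2024, 1, 1, tzinfo=ZoneInfo('America/New_York')) carries the expected UTC-5 offset, and already-aware inputs are converted instead of raising.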
Real-World Scenarios
Scenario 1: Amazon's Multi-Region Scheduling
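# amazon_multi_region.py
"""
Amazon-style multi-region scheduling:
- Different schedules per region
- Timezone handling
- Business hours only
"""
from __future__ import annotations

from typing import Any, Optional

import pendulum
from pendulum import DateTime

from airflow.decorators import dag
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable


class MultiRegionTimetable(Timetable):
    """
    Hourly runs during the primary region's local business hours (Mon-Fri).
    Like any custom timetable, it must also be registered through an AirflowPlugin.
    """

    def __init__(self, regions: dict, primary_region: str = 'us-east-1'):
        self.regions = regions
        self.primary_region = primary_region
        primary = regions[primary_region]
        self.timezone = primary['timezone']
        self.open_hour, self.close_hour = primary['business_hours']

    def _next_business_hour(self, candidate: DateTime) -> DateTime:
        """First full hour at or after `candidate` inside business hours, in UTC."""
        # Interval boundaries arrive in UTC; compare hours in the region's local time
        local = pendulum.instance(candidate).in_timezone(self.timezone)
        if (local.minute, local.second, local.microsecond) != (0, 0, 0):
            local = local.start_of('hour').add(hours=1)
        while True:
            if local.weekday() >= 5 or local.hour >= self.close_hour:
                local = local.add(days=1).set(hour=self.open_hour)  # next day's opening
            elif local.hour < self.open_hour:
                local = local.set(hour=self.open_hour)  # later the same morning
            else:
                return local.in_timezone('UTC')

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        # Manual trigger: cover the most recent full hour
        end = pendulum.instance(run_after).start_of('hour')
        return DataInterval(start=end.subtract(hours=1), end=end)

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: Optional[DataInterval],
        restriction: TimeRestriction,
    ) -> Optional[DagRunInfo]:
        """Calculate next DAG run based on the primary region's local time"""
        if last_automated_data_interval is not None:
            candidate = last_automated_data_interval.end
        elif restriction.earliest is not None:
            candidate = restriction.earliest
        else:
            return None  # no start_date: never schedule
        if not restriction.catchup:
            candidate = max(candidate, pendulum.now('UTC').start_of('hour'))
        start = self._next_business_hour(candidate)
        if restriction.latest is not None and start > restriction.latest:
            return None  # past end_date
        return DagRunInfo.interval(start=start, end=start.add(hours=1))

    def serialize(self) -> dict[str, Any]:
        return {'regions': self.regions, 'primary_region': self.primary_region}

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> MultiRegionTimetable:
        return cls(**data)


# Usage
REGIONS = {
    'us-east-1': {'timezone': 'America/New_York', 'business_hours': (9, 17)},
    'eu-west-1': {'timezone': 'Europe/London', 'business_hours': (9, 17)},
    'ap-southeast-1': {'timezone': 'Asia/Singapore', 'business_hours': (9, 17)},
}
@dag(
    dag_id='amazon_multi_region',
    schedule=MultiRegionTimetable(
        regions=REGIONS,
        primary_region='us-east-1'
    ),
    start_date=pendulum.datetime(2024, 1, 1, tz='UTC'),
    catchup=False,
    tags=['amazon', 'multi-region', 'production'],
)
def multi_region_dag():
    pass


multi_region_dag()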
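The business_hours tuples in REGIONS can also drive a per-region check, for instance in a task that decides which regional loads to launch. A standard-library sketch; regions_in_business_hours is a hypothetical helper, and Monday-to-Friday working days are assumed for every region.

```python
from datetime import datetime
from typing import Dict, List
from zoneinfo import ZoneInfo

REGIONS = {
    "us-east-1": {"timezone": "America/New_York", "business_hours": (9, 17)},
    "eu-west-1": {"timezone": "Europe/London", "business_hours": (9, 17)},
    "ap-southeast-1": {"timezone": "Asia/Singapore", "business_hours": (9, 17)},
}


def regions_in_business_hours(instant: datetime, regions: Dict[str, dict]) -> List[str]:
    """Regions whose local time at `instant` (aware) is Mon-Fri within business hours."""
    open_regions = []
    for name, cfg in regions.items():
        local = instant.astimezone(ZoneInfo(cfg["timezone"]))
        open_hour, close_hour = cfg["business_hours"]
        if local.weekday() < 5 and open_hour <= local.hour < close_hour:
            open_regions.append(name)
    return sorted(open_regions)
```

At 14:00 UTC on Monday 2024-01-08, New York (09:00) and London (14:00) are open while Singapore (22:00) is closed.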
Scenario 2: Meta's Event-Driven Scheduling
# meta_event_driven.py
"""
Meta-style event-driven scheduling:
- Trigger on external events
- Variable intervals
- Dynamic scheduling
A custom Timetable is a poor fit for this. Airflow rebuilds timetables from
serialized DAGs (a Redis client created in __init__ does not survive that),
next_dagrun_info should stay fast and avoid network I/O, and returning None
from it means "no next run", not "check again later". Instead, poll cheaply
and short-circuit when the queue is empty. (Alternatively, set schedule=None
and let the event producer trigger runs through the REST API.)
"""
from airflow.decorators import dag, task
from datetime import datetime
import json
import redis
REDIS_HOST = 'redis-cluster'
QUEUE_NAME = 'data_events'
@dag(
    dag_id='meta_event_driven',
    schedule='*/5 * * * *',  # poll every 5 minutes
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,  # avoid two runs draining the same queue at once
    tags=['meta', 'event-driven', 'production'],
)
def event_driven_dag():
    @task.short_circuit
    def has_events() -> bool:
        """Return False (skipping downstream tasks) when the queue is empty"""
        r = redis.Redis(host=REDIS_HOST)
        return r.llen(QUEUE_NAME) > 0

    @task
    def process_events():
        """Process events from queue"""
        r = redis.Redis(host=REDIS_HOST)
        events = []
        # Drain the queue; LPOP returns None once it is empty
        while True:
            event = r.lpop(QUEUE_NAME)
            if event is None:
                break
            events.append(json.loads(event))
        return {'events_processed': len(events)}

    has_events() >> process_events()


event_driven_dag()
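The queue-handling logic can be unit-tested without a Redis server by swapping in an in-memory deque. In this sketch, has_events and drain_events are hypothetical stand-ins for the LLEN check and the LPOP loop, and max_events is a hypothetical cap (not in the original) that bounds how long a single run can take.

```python
import json
from collections import deque
from typing import Deque, Optional


def has_events(queue: Deque[bytes]) -> bool:
    """Stand-in for the LLEN check: True when the queue is non-empty."""
    return len(queue) > 0


def drain_events(queue: Deque[bytes], max_events: Optional[int] = None) -> dict:
    """Stand-in for the LPOP loop; `max_events` optionally caps one run's batch."""
    events = []
    while queue and (max_events is None or len(events) < max_events):
        events.append(json.loads(queue.popleft()))  # json.loads accepts bytes
    return {"events_processed": len(events)}
```

Events left behind by a capped run stay queued and are picked up by the next poll.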
Edge Cases
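Common Pitfalls
- Timezone Issues: Always specify the timezone explicitly with a timezone-aware start_date. Don't rely on the installation's default_timezone setting.
- DST Transitions: In zones with daylight saving time, local times that are skipped or repeated on transition days (for example 2:30 AM in America/New_York) are ambiguous; schedule in UTC or outside the transition window.
- Catchup Runs: With catchup=True and an old start_date, Airflow creates a run for every missed interval once the DAG is unpaused.
- Overlapping Runs: Set max_active_runs so a new run cannot start while earlier runs are still active.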
# edge_cases.py
from airflow.decorators import dag
from datetime import datetime
import pendulum
# Timezone issue
@dag(
    dag_id='timezone_issue',
    schedule='0 2 * * *',
    start_date=datetime(2024, 1, 1),  # RISKY: naive, so read in [core] default_timezone (UTC unless changed)
    catchup=False,
)
def timezone_issue_dag():
    pass


timezone_issue_dag()

# Timezone correct
@dag(
    dag_id='timezone_correct',
    schedule='0 2 * * *',
    start_date=pendulum.datetime(2024, 1, 1, tz='America/New_York'),  # GOOD: explicit zone
    catchup=False,
)
def timezone_correct_dag():
    pass


timezone_correct_dag()

# Overlapping runs issue
@dag(
    dag_id='overlapping_issue',
    schedule='@hourly',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    # BAD: no max_active_runs, so it falls back to [core] max_active_runs_per_dag (16 by default)
)
def overlapping_issue_dag():
    pass


overlapping_issue_dag()

# Overlapping correct
@dag(
    dag_id='overlapping_correct',
    schedule='@hourly',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,  # GOOD: at most one active run
)
def overlapping_correct_dag():
    pass


overlapping_correct_dag()
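A tiny model makes the max_active_runs trade-off concrete. Times are in minutes and every run is assumed to take the same fixed duration (both simplifications): peak_concurrency counts overlapping runs when nothing limits them, and serialized_starts shows when runs actually begin under max_active_runs=1. Both helpers are hypothetical.

```python
from typing import List


def peak_concurrency(starts: List[int], duration: int) -> int:
    """Max runs in flight at once if every run starts on schedule."""
    events = [(s, 1) for s in starts] + [(s + duration, -1) for s in starts]
    events.sort()  # at equal times, -1 (a run finishing) sorts before +1 (a run starting)
    running = peak = 0
    for _, change in events:
        running += change
        peak = max(peak, running)
    return peak


def serialized_starts(starts: List[int], duration: int) -> List[int]:
    """Actual start times when each run must wait for the previous one to finish."""
    actual: List[int] = []
    for scheduled in starts:
        begin = scheduled if not actual else max(scheduled, actual[-1] + duration)
        actual.append(begin)
    return actual
```

With hourly runs lasting 90 minutes, two runs overlap when unlimited; with max_active_runs=1 they start at 0, 90, 180 and 270 minutes, so each run falls 30 minutes further behind schedule. Limiting runs prevents overlap but does not fix a job that is slower than its interval.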
Best Practices
# best_practices.py
"""
Scheduling Best Practices:
1. Timezone Handling:
- Always specify timezone explicitly
- Use UTC for internal processing
- Convert to local time for display
2. Catchup Configuration:
- Use catchup=False for new DAGs
- Use catchup=True for historical data
- Set max_active_runs for backfills
3. Schedule Selection:
- Use cron for specific times
- Use timedelta for fixed intervals
- Use custom Timetables for complex logic
4. Performance:
- Avoid very frequent schedules (< 5 min)
- Use appropriate max_active_runs
- Monitor scheduler performance
5. Error Handling:
- Handle missed runs gracefully
- Implement retry logic
- Monitor scheduling delays
"""
Amazon Interview Tip
When discussing scheduling in an Amazon interview, be ready to explain when a custom Timetable is justified, why explicit timezone handling matters, how max_active_runs prevents overlapping runs, and how you would backfill historical data.
Summary
Scheduling is fundamental to Airflow. Key takeaways:
- Cron expressions for specific times
- Timedelta for fixed intervals
- Custom Timetables for complex logic
- Timezone handling is critical
- Catchup and backfilling for historical data
For Amazon and Meta interviews, focus on:
- Timezone handling
- Custom Timetable implementation
- Preventing overlapping runs
- Backfilling strategies
- Performance optimization
This question is part of the Apache Airflow Advanced interview preparation series. Practice explaining these concepts before your interview.