Sensors and Operators in Apache Airflow
Architecture Diagram
Formal Definitions
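Definition: Sensor
A sensor is a specialized operator that polls an external condition at intervals until the condition returns True or a timeout is exceeded. A sensor is defined by its poke() method, the condition check function, which is called once per poke and returns True when the condition is met.
Definition: Poke Interval
The poke interval is the time (in seconds) between consecutive condition checks by a sensor. In poke mode the sensor holds its worker slot for the entire wait and sleeps between pokes; in reschedule mode the slot is released after each poke and the task is re-queued for the next one.
Definition: Deferred Mode
Deferred mode is a sensor execution strategy where the operator creates an async trigger and releases the worker slot entirely. The triggerer service monitors the condition without consuming worker resources. This is the most efficient mode for long waits. Deferral is not a value of the mode parameter, which accepts only 'poke' or 'reschedule'; it is enabled by using a deferrable sensor, typically through deferrable=True on sensors that support it.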
Detailed Explanation
Sensor Fundamentals
Sensors are specialized operators that poll external conditions until met or timeout.
Sensor Lifecycle:
- Sensor starts → initializes poke
- Execute poke() method → check condition
- If True → Sensor succeeds
- If False → Check timeout
- If timeout exceeded → Sensor fails
- If no timeout → Wait poke_interval, repeat
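The lifecycle above can be sketched as a plain Python loop. This is a simplified simulation, not Airflow's implementation: `run_sensor`, `FakeClock`, and `make_poke` are hypothetical names introduced here, and the clock is injected so the example finishes instantly.

```python
from typing import Callable


def run_sensor(
    poke: Callable[[], bool],
    poke_interval: float,
    timeout: float,
    clock: Callable[[], float],
    sleep: Callable[[float], None],
) -> str:
    """Simulate the poke loop and return 'success' or 'failed'."""
    started = clock()
    while True:
        if poke():                        # condition met -> sensor succeeds
            return "success"
        if clock() - started > timeout:   # timeout exceeded -> sensor fails
            return "failed"
        sleep(poke_interval)              # otherwise wait and repeat


class FakeClock:
    """Deterministic clock: sleeping just advances the current time."""

    def __init__(self) -> None:
        self.now = 0.0

    def time(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


def make_poke(succeed_on_attempt: int) -> Callable[[], bool]:
    """Return a poke() that first returns True on the given attempt (1-indexed)."""
    attempts = {"count": 0}

    def poke() -> bool:
        attempts["count"] += 1
        return attempts["count"] >= succeed_on_attempt

    return poke


clock = FakeClock()
print(run_sensor(make_poke(3), 60, 300, clock.time, clock.sleep))    # success
clock = FakeClock()
print(run_sensor(make_poke(100), 60, 300, clock.time, clock.sleep))  # failed
```

Because the timeout is checked only after an unsuccessful poke, the failing run ends at simulated time 360, one poke interval past the 300-second timeout.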
Key Parameters:
| Parameter | Default | Description |
|---|---|---|
| poke_interval | 60s | Time between checks |
| timeout | 7 days | Max wait time |
| soft_fail | False | Mark as skipped (not failed) |
| mode | 'poke' | Execution strategy |
Sensor Modes
| Mode | Worker Slot | Latency | Best For |
|---|---|---|---|
| Poke | Held entire wait | Low | Short waits (< 5 min) |
| Reschedule | Released between pokes | Medium | Medium waits (5-60 min) |
| Deferred | None (uses triggerer) | High | Long waits (> 60 min) |
Resource Usage:
- Poke: Occupies 1 worker slot for entire wait
- Reschedule: Occupies slot only during poke (~100ms)
- Deferred: 0 worker slots, uses triggerer process
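To make the resource comparison concrete, the following sketch estimates worker slot-seconds for one sensor under each mode. It is a rough model using the figures above (about 100 ms per poke in reschedule mode); `worker_slot_seconds` is a hypothetical helper, and real reschedule overhead also includes task startup time.

```python
def worker_slot_seconds(
    mode: str,
    wait_seconds: float,
    poke_interval: float,
    poke_duration: float = 0.1,  # assumed ~100 ms per poke
) -> float:
    """Estimate how long one sensor occupies a worker slot."""
    if mode == "poke":
        # The slot is held for the entire wait, including sleeps.
        return float(wait_seconds)
    if mode == "reschedule":
        # The slot is held only while each poke runs.
        pokes = int(wait_seconds // poke_interval) + 1
        return pokes * poke_duration
    if mode == "deferred":
        # The triggerer does the waiting; no worker slot is used.
        return 0.0
    raise ValueError(f"unknown mode: {mode!r}")


# One-hour wait with a 60-second poke interval.
for mode in ("poke", "reschedule", "deferred"):
    print(f"{mode:<10} {worker_slot_seconds(mode, 3600, 60):8.1f} slot-seconds")
```

Under these assumptions a one-hour wait costs 3600 slot-seconds in poke mode, roughly 6 in reschedule mode, and none in deferred mode.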
Tip: Use deferred mode for long waits to free worker resources.
Advanced Sensor Patterns
| Pattern | Description |
|---|---|
| Exponential Backoff | Double poke interval each attempt (up to max_wait) |
| Sensor Chaining | Chain sensors for complex conditions |
| Sensor Aggregation | Parallel sensor waits with BranchPythonOperator |
| Custom Sensors | Inherit BaseSensorOperator, implement poke() |
External Task Sensor
Waits for a task in another DAG (or same DAG) to complete.
Configuration:
| Parameter | Description |
|---|---|
| external_dag_id | Target DAG identifier |
| external_task_id | Target task identifier |
| execution_delta | Time offset from execution date |
| allowed_states | List of acceptable states |
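Total Sensor Wait Time
$$T_{\text{wait}} = \sum_{i=1}^{n} \left(\Delta t + t_i\right)$$
Here,
- $n$ = Number of pokes before condition is met
- $\Delta t$ = Configured poke interval
- $t_i$ = Execution time of the i-th poke check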
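As a small worked example, the sketch below adds up a total wait from the quantities just listed, assuming each poke contributes one poke interval plus its own execution time. `total_wait_time` is a hypothetical helper and the poke durations are made-up sample values.

```python
from typing import Sequence


def total_wait_time(poke_interval: float, poke_durations: Sequence[float]) -> float:
    """Sum one poke interval plus the poke's execution time for every poke."""
    return sum(poke_interval + duration for duration in poke_durations)


# Three pokes with a 60-second interval; the checks took 0.5 s, 0.5 s, and 1.0 s.
print(total_wait_time(60, [0.5, 0.5, 1.0]))  # 182.0
```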
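Exponential Backoff Interval
$$\Delta t_k = \min\left(\Delta t_0 \cdot m^{k},\ \Delta t_{\max}\right)$$
Here,
- $\Delta t_0$ = Initial poke interval
- $m$ = Multiplier (typically 2.0)
- $k$ = Poke attempt number (0-indexed)
- $\Delta t_{\max}$ = Maximum poke interval cap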
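The capped backoff schedule can be computed directly: the initial interval is multiplied by the multiplier once per attempt and clipped at the maximum. This is an idealized sketch with a hypothetical `backoff_interval` helper; Airflow's built-in exponential backoff also applies jitter, so its actual intervals will differ.

```python
def backoff_interval(
    initial: float, multiplier: float, attempt: int, max_interval: float
) -> float:
    """Interval before the next poke for a 0-indexed attempt number."""
    return float(min(initial * multiplier ** attempt, max_interval))


# Initial interval 10 s, multiplier 2.0, capped at 300 s.
intervals = [backoff_interval(10, 2.0, k, 300) for k in range(7)]
print(intervals)  # [10.0, 20.0, 40.0, 80.0, 160.0, 300.0, 300.0]
```

The cap takes effect from the sixth attempt onward, where uncapped doubling would give 320 seconds.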
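Worker Slot Utilization (Poke Mode)
$$U_{\text{slot}} = \frac{T_{\text{wait}}}{T_{\text{wait}} + T_{\text{exec}}}$$
Here,
- $T_{\text{wait}}$ = Total sensor wait time
- $T_{\text{exec}}$ = Downstream task execution time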
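Theorem: Sensor Timeout Guarantee
If a sensor has timeout $T$ and poke interval $\Delta t$, the sensor is guaranteed to either succeed or fail within $T + \Delta t + t_{\max}$, where $t_{\max}$ is the maximum poke execution time. Proof: The timeout is evaluated after each unsuccessful poke. The last timeout check that still passes happens no later than $T$; the sensor then waits $\Delta t$ and runs one more poke lasting at most $t_{\max}$ before the timeout is detected. The sensor therefore checks at most $\lfloor T / \Delta t \rfloor + 2$ times before exceeding the timeout.
In poke mode, the sensor occupies a worker slot for the entire wait duration. For $k$ sensors waiting simultaneously, the effective parallelism is reduced by $k$. Use reschedule or deferred mode to free slots during waits.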
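The effect on capacity can be estimated with two small helpers. This is a sketch under an assumption: utilization is modeled as the share of wait plus downstream execution time that is spent waiting. `poke_slot_utilization` and `effective_parallelism` are hypothetical names, and the slot counts are sample values.

```python
def poke_slot_utilization(wait_seconds: float, exec_seconds: float) -> float:
    """Share of the combined wait and execution time spent waiting."""
    total = wait_seconds + exec_seconds
    if total <= 0:
        raise ValueError("total time must be positive")
    return wait_seconds / total


def effective_parallelism(total_slots: int, waiting_poke_sensors: int) -> int:
    """Slots left for real work while poke-mode sensors are waiting."""
    return max(total_slots - waiting_poke_sensors, 0)


# A 55-minute wait followed by a 5-minute downstream task.
print(f"{poke_slot_utilization(3300, 300):.0%}")  # 92%
# 16 worker slots with 6 poke-mode sensors waiting at once.
print(effective_parallelism(16, 6))               # 10
```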
For exponential backoff sensors, set max_wait to prevent excessively long intervals. A common pattern is max_wait = 300 seconds (5 minutes) to balance responsiveness with external system load.
Advanced Sensor Patterns
Exponential Backoff: Sensors can use exponential backoff for poke intervals. The interval starts at poke_interval and doubles with each poke, up to max_wait. This reduces load on external systems while maintaining responsiveness.
Sensor Chaining: Multiple sensors can be chained to wait for complex conditions. Each sensor waits for its specific condition before allowing the next sensor to start.
Sensor Aggregation: Use BranchPythonOperator or custom logic to wait for multiple conditions in parallel. This pattern reduces overall wait time compared to sequential sensor chaining.
Custom Sensors: Create custom sensors for specialized conditions. Inherit from BaseSensorOperator and implement the poke method with your specific logic.
External Task Sensor
The ExternalTaskSensor waits for a task in another DAG or the same DAG to complete. It's useful for cross-DAG dependencies and complex workflow orchestration.
Configuration: Specify the external_dag_id, external_task_id, and optionally execution_delta or execution_date_fn. The sensor checks if the external task has reached the expected state for the matching run.
States: By default, the sensor waits for the task to succeed. Use allowed_states to wait for specific states like success, failed, or skipped.
Execution Date Matching: The sensor can match execution dates to ensure proper synchronization between DAGs. Use execution_delta or execution_date_fn for flexible date matching.
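The date arithmetic behind these two options can be shown without Airflow. The sketch below assumes the documented behavior that execution_delta is subtracted from the current run's logical date, while execution_date_fn receives that date and returns the one to look for. `target_from_delta` and `previous_day` are hypothetical helpers, and the 01:00 downstream schedule is an illustrative assumption.

```python
from datetime import datetime, timedelta


def target_from_delta(logical_date: datetime, execution_delta: timedelta) -> datetime:
    """Upstream logical date matched when execution_delta is set."""
    return logical_date - execution_delta


def previous_day(logical_date: datetime) -> datetime:
    """An execution_date_fn-style callable: match the run one day earlier."""
    return logical_date - timedelta(days=1)


# Downstream run at 01:00, upstream assumed to run daily at 00:00.
downstream_run = datetime(2024, 1, 2, 1, 0)
print(target_from_delta(downstream_run, timedelta(hours=1)))  # 2024-01-02 00:00:00
print(previous_day(downstream_run))                           # 2024-01-01 01:00:00
```

If the computed date does not correspond to an actual upstream run, the sensor finds nothing to match and waits until it times out, so the offset has to reflect the real difference between the two schedules.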
Key Concepts Table
| Sensor Type | Mode | Best For | Resource Usage | Latency |
|---|---|---|---|---|
| FileSensor | Poke | Short file waits | High | Low |
| S3KeySensor | Reschedule | S3 object waits | Medium | Medium |
| HttpSensor | Deferred | API availability | Low | High |
| SqlSensor | Poke | Database conditions | High | Low |
| ExternalTaskSensor | Poke/Reschedule | Cross-DAG deps | Medium | Medium |
| TimeSensor | Poke | Scheduled waits | High | Low |
Sensor Comparison Matrix
| Sensor | Poke Interval | Timeout | Mode | Use Case |
|---|---|---|---|---|
| FileSensor | 1s-60s | 7 days | Poke | Wait for file arrival |
| S3KeySensor | 5-300s | 7 days | Reschedule | Wait for S3 object |
| GCSObjectExistenceSensor | 5-300s | 7 days | Reschedule | Wait for GCS object |
| HttpSensor | 30-300s | 7 days | Deferred | Wait for API endpoint |
| SqlSensor | 10-60s | 7 days | Poke | Wait for database condition |
| ExternalTaskSensor | 60-300s | 7 days | Reschedule | Wait for other DAG task |
| TimeSensor | 60s | N/A | Poke | Wait for specific time |
| BaseSensorOperator | Custom | Custom | Custom | Custom conditions |
Code Examples
Advanced Sensor Implementation
```python
# advanced_sensors.py
import base64
from datetime import datetime, timedelta
from typing import Any, Optional

import requests
from airflow import DAG
from airflow.hooks.base import BaseHook
from airflow.operators.python import PythonOperator
from airflow.sensors.base import BaseSensorOperator


class AdvancedHttpSensor(BaseSensorOperator):
    """
    Advanced HTTP sensor with multiple condition support.

    This sensor supports multiple conditions, custom headers,
    and response validation.
    """

    template_fields = ('endpoint', 'headers', 'expected_status')
    ui_color = '#4285F4'

    def __init__(
        self,
        endpoint: str,
        http_conn_id: str = 'http_default',
        method: str = 'GET',
        headers: Optional[dict] = None,
        expected_status: int = 200,
        json_path: Optional[str] = None,
        expected_value: Any = None,
        auth_type: str = 'bearer',
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.endpoint = endpoint
        # Connection ID used in poke(); it was referenced but never set before.
        self.http_conn_id = http_conn_id
        self.method = method
        self.headers = headers or {}
        self.expected_status = expected_status
        self.json_path = json_path
        self.expected_value = expected_value
        self.auth_type = auth_type

    def poke(self, context):
        """Check if the HTTP condition is met."""
        try:
            # Get connection details
            conn = BaseHook.get_connection(self.http_conn_id)

            # Build request (default to http and omit the port when unset)
            scheme = conn.schema or 'http'
            port = f":{conn.port}" if conn.port else ''
            url = f"{scheme}://{conn.host}{port}{self.endpoint}"
            headers = {**self.headers}

            # Add authentication
            if self.auth_type == 'bearer':
                headers['Authorization'] = f"Bearer {conn.get_password()}"
            elif self.auth_type == 'basic':
                credentials = base64.b64encode(
                    f"{conn.login}:{conn.get_password()}".encode()
                ).decode()
                headers['Authorization'] = f"Basic {credentials}"

            # Make request
            response = requests.request(
                method=self.method,
                url=url,
                headers=headers,
                timeout=30,
            )

            # Check status code
            if response.status_code != self.expected_status:
                self.log.warning(
                    f"Expected status {self.expected_status}, "
                    f"got {response.status_code}"
                )
                return False

            # Check JSON path if specified
            if self.json_path and self.expected_value is not None:
                data = response.json()
                actual_value = self._get_json_value(data, self.json_path)
                if actual_value != self.expected_value:
                    self.log.warning(
                        f"Expected value {self.expected_value} at path "
                        f"{self.json_path}, got {actual_value}"
                    )
                    return False

            self.log.info("HTTP condition met successfully")
            return True
        except Exception as e:
            # Any error counts as "not yet met"; the sensor pokes again later.
            self.log.error(f"HTTP sensor failed: {str(e)}")
            return False

    def _get_json_value(self, data: dict, path: str):
        """Get value from JSON using dot notation path."""
        keys = path.split('.')
        value = data
        for key in keys:
            if isinstance(value, dict):
                value = value.get(key)
            else:
                return None
        return value


class MultiConditionSensor(BaseSensorOperator):
    """
    Sensor that waits for multiple conditions to be met.
    """

    def __init__(
        self,
        conditions: list,
        operator: str = 'AND',
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.conditions = conditions
        self.operator = operator

    def poke(self, context):
        """Check if conditions are met."""
        results = []
        for condition in self.conditions:
            condition_type = condition.get('type')
            result = False  # unknown condition types count as not met
            if condition_type == 'file':
                result = self._check_file_condition(condition)
            elif condition_type == 'http':
                result = self._check_http_condition(condition)
            elif condition_type == 'database':
                result = self._check_database_condition(condition)
            results.append(result)

        if self.operator == 'AND':
            return all(results)
        else:  # OR
            return any(results)

    def _check_file_condition(self, condition: dict) -> bool:
        """Check file-based condition."""
        import os

        path = condition.get('path')
        exists = condition.get('exists', True)
        file_exists = os.path.exists(path)
        return file_exists == exists

    def _check_http_condition(self, condition: dict) -> bool:
        """Check HTTP-based condition."""
        url = condition.get('url')
        expected_status = condition.get('expected_status', 200)
        try:
            response = requests.get(url, timeout=10)
            return response.status_code == expected_status
        except Exception:
            return False

    def _check_database_condition(self, condition: dict) -> bool:
        """Check database-based condition."""
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        hook = PostgresHook(postgres_conn_id=condition.get('conn_id'))
        query = condition.get('query')
        result = hook.get_first(query)
        return bool(result)


# Usage in DAG
with DAG(
    'advanced_sensor_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Advanced sensor examples',
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['sensors', 'advanced'],
) as dag:
    # Advanced HTTP sensor
    api_sensor = AdvancedHttpSensor(
        task_id='api_availability_sensor',
        endpoint='/api/v1/status',
        expected_status=200,
        json_path='status',
        expected_value='healthy',
        mode='reschedule',
        poke_interval=30,
        timeout=300,
    )

    # Multi-condition sensor
    multi_condition = MultiConditionSensor(
        task_id='multi_condition_sensor',
        conditions=[
            {'type': 'file', 'path': '/data/input.csv', 'exists': True},
            {'type': 'http', 'url': 'http://api:8080/health', 'expected_status': 200},
            {'type': 'database', 'conn_id': 'postgres', 'query': 'SELECT 1'},
        ],
        operator='AND',
        mode='poke',
        poke_interval=60,
    )

    # Process task
    process = PythonOperator(
        task_id='process_data',
        python_callable=lambda: print("Processing data..."),
    )

    api_sensor >> multi_condition >> process
```
External Task Sensor Patterns
```python
# external_task_patterns.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.sensors.time_sensor import TimeSensor
from airflow.utils.state import State


def process_data(**context):
    """Process data after external task completes."""
    print("Processing data after external dependency met")


def generate_report(**context):
    """Generate report after processing."""
    print("Generating report...")


# DAG 1: Upstream DAG (external dependency)
with DAG(
    'upstream_data_pipeline',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Upstream data pipeline',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['external', 'upstream'],
) as upstream_dag:

    def extract_data(**context):
        """Extract data from source."""
        print("Extracting data...")
        return {"status": "success"}

    def transform_data(**context):
        """Transform extracted data."""
        print("Transforming data...")
        return {"status": "success"}

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )
    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
    )

    extract >> transform

# DAG 2: Downstream DAG (waits for upstream)
with DAG(
    'downstream_data_pipeline',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Downstream data pipeline',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['external', 'downstream'],
) as downstream_dag:
    # Wait for upstream DAG to complete.
    # Both DAGs run @daily, so their logical dates already match and no
    # execution_delta is set. Add execution_delta=timedelta(hours=1) only if
    # this DAG is scheduled one hour after the upstream DAG; otherwise the
    # sensor looks for a run that does not exist and times out.
    wait_for_upstream = ExternalTaskSensor(
        task_id='wait_for_upstream',
        external_dag_id='upstream_data_pipeline',
        external_task_id='transform',
        allowed_states=[State.SUCCESS],
        failed_states=[State.FAILED],
        mode='reschedule',
        poke_interval=300,
        timeout=3600,
    )

    # Wait for specific execution date (the previous day's upstream run)
    wait_for_yesterday = ExternalTaskSensor(
        task_id='wait_for_yesterday',
        external_dag_id='upstream_data_pipeline',
        external_task_id='transform',
        execution_date_fn=lambda exec_date: exec_date - timedelta(days=1),
        mode='reschedule',
        poke_interval=300,
    )

    # Process after upstream completes
    process = PythonOperator(
        task_id='process',
        python_callable=process_data,
    )

    # Generate report
    report = PythonOperator(
        task_id='report',
        python_callable=generate_report,
    )

    [wait_for_upstream, wait_for_yesterday] >> process >> report

# DAG 3: Complex external dependencies
with DAG(
    'complex_external_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Complex external dependency example',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['external', 'complex'],
) as complex_dag:
    # Wait for multiple external tasks
    wait_task_1 = ExternalTaskSensor(
        task_id='wait_task_1',
        external_dag_id='upstream_data_pipeline',
        external_task_id='extract',
        mode='reschedule',
        poke_interval=300,
    )
    wait_task_2 = ExternalTaskSensor(
        task_id='wait_task_2',
        external_dag_id='another_pipeline',
        external_task_id='load',
        mode='reschedule',
        poke_interval=300,
    )

    # Time sensor
    time_check = TimeSensor(
        task_id='time_check',
        target_time=datetime.strptime('09:00', '%H:%M').time(),
        mode='reschedule',
    )

    # Process after all conditions met
    process = PythonOperator(
        task_id='process',
        python_callable=process_data,
    )

    [wait_task_1, wait_task_2, time_check] >> process
```
Sensor Optimization Patterns
```python
# sensor_optimization.py
import os
import random
import time
from datetime import datetime, timedelta
from typing import Optional

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.base import BaseSensorOperator


class OptimizedFileSensor(BaseSensorOperator):
    """
    File sensor with optimization features.
    """

    template_fields = ('filepath',)
    ui_color = '#FF9800'

    def __init__(
        self,
        filepath: str,
        recursive: bool = False,
        min_file_size: int = 0,
        max_file_age_days: Optional[int] = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.filepath = filepath
        self.recursive = recursive  # stored but not used by poke()
        self.min_file_size = min_file_size
        self.max_file_age_days = max_file_age_days

    def poke(self, context):
        """Check if file meets all criteria."""
        if not os.path.exists(self.filepath):
            return False

        # Check file size
        file_size = os.path.getsize(self.filepath)
        if file_size < self.min_file_size:
            self.log.info(
                f"File size {file_size} < minimum {self.min_file_size}"
            )
            return False

        # Check file age
        if self.max_file_age_days:
            file_mtime = os.path.getmtime(self.filepath)
            file_age = datetime.now() - datetime.fromtimestamp(file_mtime)
            if file_age > timedelta(days=self.max_file_age_days):
                self.log.info(f"File is too old: {file_age}")
                return False

        self.log.info(f"File {self.filepath} meets all criteria")
        return True


class ExponentialBackoffSensor(BaseSensorOperator):
    """
    Sensor with exponential backoff.

    The backoff state lives on the instance, so this only works in poke mode;
    in reschedule mode each poke runs in a fresh process and the interval
    resets. The sleep inside poke() is added on top of poke_interval. The
    built-in exponential_backoff=True option is usually the better choice.
    """

    def __init__(
        self,
        initial_interval: int = 10,
        max_interval: int = 300,
        multiplier: float = 2.0,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.initial_interval = initial_interval
        self.max_interval = max_interval
        self.multiplier = multiplier
        self._current_interval = initial_interval

    def poke(self, context):
        """Implement exponential backoff logic."""
        # Check condition
        if self._check_condition():
            return True

        self.log.info(
            f"Condition not met. Backing off for {self._current_interval} seconds"
        )
        # Sleep for the current interval, then grow it for the next poke
        time.sleep(self._current_interval)
        self._current_interval = min(
            self._current_interval * self.multiplier,
            self.max_interval,
        )
        return False

    def _check_condition(self):
        """Override this method to implement your condition check."""
        # Example: Random condition for demonstration
        return random.random() < 0.1  # 10% chance of success


with DAG(
    'sensor_optimization_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Sensor optimization patterns',
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['sensors', 'optimization'],
) as dag:
    # Optimized file sensor
    file_sensor = OptimizedFileSensor(
        task_id='optimized_file_sensor',
        filepath='/data/input.csv',
        min_file_size=1024,  # At least 1KB
        max_file_age_days=1,  # Not older than 1 day
        mode='reschedule',
        poke_interval=60,
        timeout=3600,
    )

    # Exponential backoff sensor
    backoff_sensor = ExponentialBackoffSensor(
        task_id='exponential_backoff_sensor',
        initial_interval=10,
        max_interval=300,
        multiplier=2.0,
        mode='poke',
        timeout=3600,
    )

    # Process task
    process = PythonOperator(
        task_id='process',
        python_callable=lambda: print("Processing data..."),
    )

    file_sensor >> backoff_sensor >> process
```
Performance Metrics
Sensor Performance Characteristics
| Metric | Description | Optimization Strategy | Warning Threshold |
|---|---|---|---|
| Poke Interval | Time between condition checks | Adjust based on expected wait time | > 5 min for time-sensitive |
| Worker Slot Usage | Resources consumed during wait | Use reschedule or deferred mode | > 50% slot utilization |
| Latency | Time from condition met to task start | Minimize poke interval for time-sensitive tasks | > 5 min latency |
| Timeout Rate | Percentage of sensors that timeout | Set appropriate timeouts, monitor external systems | > 10% timeout |
| Resource Efficiency | Worker resources used per wait | Use deferred mode for long waits | < 50% efficiency |
| Error Rate | Percentage of failed sensor checks | Implement proper error handling | > 5% errors |
| False Positive Rate | Incorrect condition detection | Validate sensor logic thoroughly | > 1% false positives |
| Throughput | Number of sensors handled concurrently | Optimize poke intervals, use deferred mode | < 10 concurrent |
Resource Usage by Mode
| Mode | Worker Slot | CPU Usage | Memory | Duration |
|---|---|---|---|---|
| Poke | 1.0 | 1-5% | 10-50 MB | Entire wait |
| Reschedule | 0.1 | < 1% | 5-20 MB | Poke only |
| Deferred | 0.0 | < 0.1% | 1-5 MB | None |
Throughput Analysis
| Scenario | Recommended Mode | Max Concurrent | Throughput |
|---|---|---|---|
| File arrival (< 5 min) | Poke | 50 | 50 sensors |
| S3 object (5-60 min) | Reschedule | 100 | 100 sensors |
| API endpoint (> 60 min) | Deferred | 1000 | 1000 sensors |
| Database condition | Poke | 50 | 50 sensors |
| Cross-DAG dependency | Reschedule | 100 | 100 sensors |
Best Practices
1. Mode Selection
Choose the appropriate mode based on wait time. Use poke for short waits (< 5 min), reschedule for medium waits (5-60 min), and deferred for long waits (> 1 hour).
```python
# Mode selection based on expected wait time
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor


def select_sensor_mode(expected_wait_minutes):
    """Return sensor keyword arguments suited to the expected wait time."""
    if expected_wait_minutes < 5:
        return {'mode': 'poke'}
    elif expected_wait_minutes < 60:
        return {'mode': 'reschedule'}
    else:
        # 'deferred' is not a valid value for mode; deferral is enabled with
        # deferrable=True on sensors (and provider versions) that support it.
        return {'deferrable': True}


# Example usage
sensor = S3KeySensor(
    task_id='wait_for_data',
    bucket_name='my-bucket',
    bucket_key='data/{{ ds }}/data.parquet',
    poke_interval=300,  # 5 minutes for reschedule mode
    timeout=7200,  # 2 hours timeout
    **select_sensor_mode(expected_wait_minutes=30),
)
```
2. Timeout Configuration
Set reasonable timeouts based on expected wait times. Consider external system reliability and implement graceful failure handling.
3. Poke Interval Optimization
Balance between responsiveness and resource usage. Use exponential backoff for unpredictable wait times.
```python
# Exponential backoff sensor
from airflow.providers.http.sensors.http import HttpSensor

sensor = HttpSensor(
    task_id='wait_for_api',
    http_conn_id='api_default',
    endpoint='/health',
    mode='reschedule',
    poke_interval=60,  # Start with 1 minute
    exponential_backoff=True,  # Enable backoff
    timeout=3600,  # 1 hour timeout
)
# Idealized doubling sequence: 60s, 120s, 240s, 480s, 960s, 1920s.
# Airflow adds jitter, so actual intervals differ; set max_wait to cap them.
```
4. Error Handling
Implement proper error handling in sensor logic. Use soft_fail for optional conditions. Log meaningful error messages.
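One way to apply this inside a poke check is to treat only known transient errors as "not yet met" and let everything else fail the task, so that a misconfiguration is not retried silently until the timeout. This is a plain Python sketch: `safe_poke` and `TRANSIENT_ERRORS` are hypothetical names, and which exceptions count as transient is an assumption to adapt per external system.

```python
from typing import Callable

# Assumed set of errors worth retrying on the next poke.
TRANSIENT_ERRORS = (ConnectionError, TimeoutError)


def safe_poke(check: Callable[[], bool]) -> bool:
    """Run a condition check, mapping transient errors to False."""
    try:
        return bool(check())
    except TRANSIENT_ERRORS as exc:
        # Meaningful log message; the sensor will poke again later.
        print(f"Transient error, retrying on next poke: {exc!r}")
        return False


def flaky_check() -> bool:
    raise ConnectionError("upstream API unreachable")


def misconfigured_check() -> bool:
    raise ValueError("endpoint URL is empty")


print(safe_poke(lambda: True))  # True
print(safe_poke(flaky_check))   # False (after logging the transient error)
try:
    safe_poke(misconfigured_check)
except ValueError as exc:
    print(f"Permanent error surfaced: {exc}")
```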
5. Resource Management
Monitor sensor resource usage. Use pools to limit concurrent sensors. Implement sensor prioritization.
```python
# Pool-limited sensors
from airflow.sensors.filesystem import FileSensor

sensor = FileSensor(
    task_id='wait_for_files',
    filepath='/data/input.csv',
    pool='file_sensors',  # Pool must exist; e.g. 10 slots = max 10 concurrent sensors
    poke_interval=60,
    mode='reschedule',
)
```
6. Testing
Test sensors in isolation with mock external dependencies. Verify timeout behavior and error handling. Test different modes.
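A minimal sketch of such an isolated test follows. It keeps the condition check as a plain function that receives its HTTP client, so `unittest.mock` can stand in for the external API. `api_is_healthy` and `fake_response` are hypothetical helpers, not part of Airflow.

```python
from unittest.mock import Mock


def api_is_healthy(http_get, url: str) -> bool:
    """poke-style check: True when the endpoint reports status 'healthy'."""
    response = http_get(url, timeout=10)
    return response.status_code == 200 and response.json().get("status") == "healthy"


def fake_response(status_code: int, payload: dict) -> Mock:
    """Build a stand-in for a requests.Response object."""
    response = Mock()
    response.status_code = status_code
    response.json.return_value = payload
    return response


# The mocked client returns one canned response per call, in order.
http_get = Mock(side_effect=[
    fake_response(503, {}),
    fake_response(200, {"status": "starting"}),
    fake_response(200, {"status": "healthy"}),
])

results = [api_is_healthy(http_get, "http://api:8080/health") for _ in range(3)]
print(results)  # [False, False, True]
```

Injecting the client keeps the check testable without network access; a custom sensor's poke() can then delegate to the same function.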
7. Monitoring
Track sensor performance metrics. Monitor poke intervals, success rates, and resource usage. Set up alerts for anomalies.
8. Documentation
Document sensor dependencies and expected behavior. Include timeout and retry configurations. Provide troubleshooting guidance.
9. Maintenance
Regularly review and update sensor configurations. Remove obsolete sensors. Update timeout values based on changing external systems.
10. Security
Implement proper authentication for external system access. Use secrets management for credentials. Validate input parameters.
Key Takeaways:
- Sensors poll conditions at interval $\Delta t$ until success or timeout
- Exponential backoff: $\Delta t_k = \min(\Delta t_0 \cdot m^{k}, \Delta t_{\max})$
- Timeout guarantee: sensor resolves within $T + \Delta t + t_{\max}$
- Poke mode uses worker slots; reschedule mode releases between pokes; deferred mode uses triggerer
- Worker slot utilization in poke mode: $U_{\text{slot}} = T_{\text{wait}} / (T_{\text{wait}} + T_{\text{exec}})$
- Use ExternalTaskSensor for cross-DAG dependencies with execution date matching
See also: Kafka Connect (kafka/03), PySpark Submit (pyspark/19), Data Engineering Orchestration (data-engineering/017)
See Also
- Airflow Architecture: Sensor architecture overview
- Operators and Hooks: Operator and hook patterns
- Scheduling and Triggers: Scheduling and trigger-based sensors
- Error Handling and Retries: Sensor retry patterns