Interview Question
ℹ️ Interview Context
Company: Google Cloud / Microsoft Azure
Role: Senior Data Engineer / Cloud Architect
Difficulty: Advanced
Time: 45-60 minutes
Question: "Explain the difference between poke mode and reschedule mode in Sensors. When would you use each? How do you handle sensor timeouts and failures in production?"
Detailed Theory
Sensor Fundamentals
# sensor_fundamentals.py
"""
Sensors in Airflow:
A Sensor is a special type of Operator that waits for a condition
to be met before completing. It "pokes" the condition at regular
intervals.
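Key Parameters:
- poke_interval: Time between checks in seconds (default 60)
- timeout: Maximum total wait in seconds before the sensor gives up
  (default 7 days)
- mode: 'poke' (default) or 'reschedule'
- soft_fail: If True, a timeout marks the task as skipped instead of failed
Sensor Modes:
1. POKE (default): Holds a worker slot for the entire wait, sleeping
   between checks
2. RESCHEDULE: Releases the worker slot between pokes; the task sits in
   the up_for_reschedule state until the next check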
"""
1. Poke Mode vs Reschedule Mode
# poke_vs_reschedule.py
from datetime import datetime, timedelta
from airflow.decorators import dag
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.http.sensors.http import HttpSensor
from airflow.sensors.external_task import ExternalTaskSensor
@dag(dag_id='sensor_modes_example', start_date=datetime(2024, 1, 1), schedule='@daily', catchup=False)
def sensor_modes():
    """
    POKE MODE (default):
    - Keeps the worker slot occupied for the whole wait
    - Suitable for short waits (< 5 minutes)
    - Cheaper for short waits: no scheduler round-trip per check
    RESCHEDULE MODE:
    - Releases the worker slot between pokes
    - Suitable for long waits (> 5 minutes)
    - Cheaper for long waits: a slot is used only while a poke runs
    - Between pokes the task is in the up_for_reschedule state; each poke
      runs in a fresh task execution, so in-memory state is not kept
    """
    # Poke mode (default)
    poke_sensor = HttpSensor(
        task_id='poke_sensor',
        http_conn_id='api_default',
        endpoint='/api/health',
        poke_interval=30,  # Check every 30 seconds
        timeout=300,  # Time out after 5 minutes
        mode='poke',  # Default mode
    )
    # Reschedule mode
    reschedule_sensor = HttpSensor(
        task_id='reschedule_sensor',
        http_conn_id='api_default',
        endpoint='/api/health',
        poke_interval=60,  # Check every minute
        timeout=3600,  # Time out after 1 hour
        mode='reschedule',  # Release slot between pokes
    )
    # S3 sensor with reschedule
    s3_sensor = S3KeySensor(
        task_id='s3_sensor',
        bucket_name='my-bucket',
        key='data/{{ ds }}/file.csv',
        poke_interval=300,  # Check every 5 minutes
        timeout=7200,  # Time out after 2 hours
        mode='reschedule',  # Long wait, use reschedule
        soft_fail=True,  # Skip instead of fail on timeout
    )
    # ExternalTaskSensor: waits for a task in another DAG's run
    external_sensor = ExternalTaskSensor(
        task_id='external_sensor',
        external_dag_id='upstream_dag',
        external_task_id='final_task',
        poke_interval=60,
        timeout=3600,
        mode='reschedule',
        # Look for the upstream run whose logical date is 1 hour earlier;
        # this must match the offset between the two DAGs' schedules.
        execution_delta=timedelta(hours=1),
    )
    poke_sensor >> reschedule_sensor >> s3_sensor >> external_sensor
sensor_modes()
⚠️ Important
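Use mode='reschedule' for sensors that wait more than a few minutes. This releases the worker slot between pokes so other tasks can run; in poke mode, the slot stays occupied for the entire wait. Because every reschedule goes back through the scheduler, keep poke_interval at 60 seconds or more in reschedule mode.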
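A rough way to see the difference is to count worker-slot seconds. The sketch below is a toy model under stated assumptions: poke mode holds one slot for the whole wait, while reschedule mode holds a slot only while each poke runs, plus an assumed per-poke startup cost. `slot_seconds` and `startup_s` are hypothetical; real overhead depends on your executor and DAG parsing time. For a 2-hour wait checked every 5 minutes with 2-second checks, it gives 7200 slot-seconds in poke mode versus 50 in reschedule mode.

```python
import math


def slot_seconds(wait_s: float, poke_interval_s: float, poke_cost_s: float,
                 mode: str, startup_s: float = 0.0) -> float:
    """Approximate worker-slot seconds a sensor holds while waiting.

    Toy model: poke mode holds one slot for the entire wait; reschedule
    mode holds a slot only while each poke runs, plus an assumed
    per-poke startup cost (starting a task process, loading the DAG).
    """
    if mode == "poke":
        return wait_s
    if mode == "reschedule":
        pokes = math.floor(wait_s / poke_interval_s) + 1  # first poke at t=0
        return pokes * (poke_cost_s + startup_s)
    raise ValueError(f"unknown mode: {mode!r}")


# A 2-hour wait, checking every 5 minutes, each check taking about 2 s.
for mode in ("poke", "reschedule"):
    print(mode, slot_seconds(7200, 300, 2, mode))
```

The startup term, together with the scheduler round-trip each reschedule needs, is why poke mode remains reasonable for waits of only a few minutes.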
2. Sensor Timeout Handling
# sensor_timeouts.py
from datetime import datetime
from airflow.decorators import dag, task
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.utils.trigger_rule import TriggerRule
@dag(dag_id='sensor_timeout_handling', start_date=datetime(2024, 1, 1), schedule='@daily', catchup=False)
def sensor_timeout_handling():
    """
    Sensor Timeout Strategies:
    1. soft_fail=True: Skip the task on timeout
    2. soft_fail=False: Fail the task on timeout (default)
    3. Exponential backoff: Increase the wait between pokes over time
    4. Retries: retries/retry_delay cover errors raised while poking
       (e.g., network failures). A timeout (AirflowSensorTimeout) fails the
       task without retrying, and in reschedule mode a retry does not
       reset the timeout clock.
    """
    # Strategy 1: Soft fail (skip on timeout)
    soft_fail_sensor = S3KeySensor(
        task_id='soft_fail_sensor',
        bucket_name='my-bucket',
        key='data/{{ ds }}/file.csv',
        poke_interval=60,
        timeout=300,
        mode='reschedule',
        soft_fail=True,  # Skip task if timeout
    )
    # Strategy 2: Exponential backoff (plus retries for transient poke errors)
    exponential_sensor = S3KeySensor(
        task_id='exponential_sensor',
        bucket_name='my-bucket',
        key='data/{{ ds }}/file.csv',
        poke_interval=60,
        timeout=3600,
        mode='reschedule',
        exponential_backoff=True,  # Increase interval over time
        retries=2,  # Retries poke errors, not the timeout itself
    )
    # Strategy 3: Custom timeout handling with trigger rules.
    # Under the default all_success rule these tasks would never run after
    # the sensor fails, so they fire only on upstream failure.
    @task(trigger_rule=TriggerRule.ONE_FAILED)
    def handle_timeout():
        """Handle sensor timeout"""
        print("Sensor timed out, handling gracefully")
        return {'status': 'timeout_handled'}
    @task(trigger_rule=TriggerRule.ALL_FAILED)
    def cleanup_on_failure():
        """Cleanup after sensor failure"""
        print("Cleaning up after sensor failure")
    # If soft_fail_sensor is skipped, the skip propagates to exponential_sensor
    soft_fail_sensor >> exponential_sensor
    # Both handlers run only when exponential_sensor fails (e.g., times out)
    exponential_sensor >> [handle_timeout(), cleanup_on_failure()]
sensor_timeout_handling()
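exponential_backoff=True makes Airflow lengthen the gap between pokes, but its exact formula is an internal detail and is not reproduced here. To illustrate why backoff reduces request volume, the sketch below uses a generic capped-doubling schedule, which is an assumption rather than Airflow's algorithm; `backoff_schedule` and `max_interval` are hypothetical names.

```python
def backoff_schedule(poke_interval: float, timeout: float,
                     max_interval: float) -> list[float]:
    """Return the waits between successive pokes under capped doubling.

    Generic illustration: each wait doubles, is capped at max_interval,
    and never extends past the overall timeout.
    """
    waits: list[float] = []
    elapsed = 0.0
    wait = poke_interval
    while elapsed < timeout:
        step = min(wait, max_interval, timeout - elapsed)
        waits.append(step)
        elapsed += step
        wait *= 2
    return waits


waits = backoff_schedule(poke_interval=60, timeout=3600, max_interval=600)
# One poke at t=0 plus one after each wait.
print(len(waits) + 1, "pokes with backoff vs", 3600 // 60 + 1, "at a fixed 60 s interval")
```

With a 60 s base interval, a one-hour timeout, and a 10-minute cap, this schedule makes 10 checks instead of 61, at the cost of detecting a late-arriving file up to 10 minutes later.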
3. Custom Sensors
# custom_sensors.py
from typing import Dict, Optional
from airflow.sensors.base import BaseSensorOperator
class CustomApiSensor(BaseSensorOperator):
    """
    Custom sensor that waits for an API to return the expected status.
    :param api_url: URL to check
    :param expected_status: Expected HTTP status code
    :param headers: Request headers
    """
    template_fields = ('api_url', 'expected_status')
    # No @apply_defaults: it is deprecated in Airflow 2, where default_args
    # are applied automatically.
    def __init__(
        self,
        *,
        api_url: str,
        expected_status: int = 200,
        headers: Optional[Dict[str, str]] = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.api_url = api_url
        self.expected_status = expected_status
        self.headers = headers or {}
    def poke(self, context) -> bool:
        """Return True when the API is ready; False means poke again later."""
        # Imported here to keep DAG-file parsing fast
        import requests
        try:
            response = requests.get(
                self.api_url,
                headers=self.headers,
                timeout=10,  # Per-request timeout, separate from the sensor timeout
            )
        except requests.RequestException as e:
            # Network errors count as "not ready"; the sensor timeout still
            # bounds the total wait.
            self.log.warning("API check failed: %s", e)
            return False
        if response.status_code == self.expected_status:
            self.log.info("API ready: %s", self.api_url)
            return True
        self.log.info(
            "API not ready: %s (expected %s)",
            response.status_code,
            self.expected_status,
        )
        return False
# Usage (inside a DAG definition)
custom_sensor = CustomApiSensor(
    task_id='wait_for_api',
    api_url='https://api.example.com/health',
    expected_status=200,
    poke_interval=60,
    timeout=300,
    mode='reschedule',
)
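The poke above treats every non-matching status as "not ready", so a permanent problem such as a 401 or 404 keeps the sensor poking until its timeout. One option is to classify the response first and raise on statuses that waiting will not fix; inside a real sensor, an exception raised from `poke` fails the attempt (subject to retries), and Airflow's `AirflowFailException` fails it without retries. The status split and the names `poke_decision` and `PermanentSensorError` below are illustrative assumptions, shown in plain Python so the logic can be unit-tested without Airflow.

```python
class PermanentSensorError(Exception):
    """Hypothetical error for conditions that waiting will not fix."""


# Assumed split: these statuses usually signal a config or auth problem.
PERMANENT_STATUSES = frozenset({400, 401, 403, 404})


def poke_decision(status_code: int, expected_status: int = 200) -> bool:
    """Return True if ready, False to poke again; raise on permanent errors."""
    if status_code == expected_status:
        return True
    if status_code in PERMANENT_STATUSES:
        raise PermanentSensorError(f"HTTP {status_code} will not resolve by waiting")
    # Anything else (5xx, 429, ...) is treated as transient: poke again later.
    return False


print(poke_decision(200))  # True
print(poke_decision(503))  # False
try:
    poke_decision(401)
except PermanentSensorError as exc:
    print("fail fast:", exc)  # fail fast: HTTP 401 will not resolve by waiting
```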
4. Database Sensor
# database_sensor.py
from typing import List
from airflow.hooks.base import BaseHook
from airflow.sensors.base import BaseSensorOperator
class DatabaseRecordSensor(BaseSensorOperator):
    """
    Sensor that waits for a record to exist in a database table.
    :param table: Table name (must be trusted; it is interpolated into SQL)
    :param columns: Columns to check (trusted identifiers)
    :param values: Expected values, bound as query parameters
    :param db_conn_id: Airflow connection ID for the database
    """
    template_fields = ('table', 'columns', 'values')
    def __init__(
        self,
        *,
        table: str,
        columns: List[str],
        values: list,
        db_conn_id: str,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.table = table
        self.columns = columns
        self.values = values
        self.db_conn_id = db_conn_id
    def poke(self, context) -> bool:
        """Check if the record exists"""
        # Resolve the connection to its concrete DB-API hook
        # (e.g., a PostgresHook for a Postgres connection).
        hook = BaseHook.get_connection(self.db_conn_id).get_hook()
        # Only values can be bound as parameters. The %s placeholder suits
        # psycopg2/MySQL drivers; other drivers may use a different style.
        conditions = ' AND '.join(f"{col} = %s" for col in self.columns)
        query = f"SELECT 1 FROM {self.table} WHERE {conditions} LIMIT 1"
        # get_first runs the query and returns the first row, or None
        result = hook.get_first(query, parameters=self.values)
        if result:
            self.log.info("Record found in %s", self.table)
            return True
        self.log.info("Record not found in %s", self.table)
        return False
# Usage (inside a DAG definition). For simple cases, the built-in SqlSensor
# (airflow.providers.common.sql.sensors.sql) covers the same pattern.
db_sensor = DatabaseRecordSensor(
    task_id='wait_for_record',
    table='daily_loads',
    columns=['load_date', 'status'],
    values=['{{ ds }}', 'completed'],
    db_conn_id='warehouse',
    poke_interval=60,
    timeout=3600,
    mode='reschedule',
)
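The existence check at the heart of this sensor can be tried without Airflow. The sketch below assumes an in-memory SQLite database (SQLite binds parameters with `?` rather than `%s`) and adds an identifier whitelist, because table and column names are interpolated into the SQL text rather than bound. `record_exists` is a hypothetical helper, not part of any Airflow hook.

```python
import re
import sqlite3
from typing import Sequence

# Conservative identifier rule: letters, digits, underscores; no leading digit.
_IDENT = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def record_exists(conn: sqlite3.Connection, table: str,
                  columns: Sequence[str], values: Sequence[object]) -> bool:
    """Return True if a row matching every column = value pair exists."""
    if not columns or len(columns) != len(values):
        raise ValueError("need matching, non-empty columns and values")
    for name in (table, *columns):
        if not _IDENT.fullmatch(name):
            raise ValueError(f"unsafe identifier: {name!r}")
    conditions = " AND ".join(f"{col} = ?" for col in columns)
    query = f"SELECT 1 FROM {table} WHERE {conditions} LIMIT 1"
    # Values are bound by the driver, never formatted into the string.
    return conn.execute(query, tuple(values)).fetchone() is not None


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_loads (load_date TEXT, status TEXT)")
conn.execute("INSERT INTO daily_loads VALUES ('2024-01-01', 'completed')")

print(record_exists(conn, "daily_loads", ["load_date", "status"], ["2024-01-01", "completed"]))  # True
print(record_exists(conn, "daily_loads", ["load_date", "status"], ["2024-01-02", "completed"]))  # False
```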
Real-World Scenarios
Scenario 1: Google Cloud Pipeline
# google_cloud_pipeline.py
"""
Google Cloud pipeline with sensors:
- Wait for a GCS file
- Wait for the loaded partition to appear in BigQuery
(A Dataflow job can be awaited the same way with DataflowJobStatusSensor.)
"""
from datetime import datetime
from airflow.decorators import dag, task
from airflow.providers.google.cloud.sensors.bigquery import BigQueryTablePartitionExistenceSensor
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor
@dag(
    dag_id='gcp_pipeline_with_sensors',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['gcp', 'production'],
)
def gcp_pipeline():
    # Wait for GCS file
    wait_gcs = GCSObjectExistenceSensor(
        task_id='wait_for_gcs_file',
        bucket='data-lake',
        object='incoming/{{ ds }}/data.csv',
        poke_interval=60,
        timeout=3600,
        mode='reschedule',
    )
    @task
    def process_data() -> dict:
        """Process data from GCS (placeholder)"""
        return {'records': 1000}
    @task
    def load_to_bigquery(data: dict) -> str:
        """Start a BigQuery load job (placeholder) and return its job ID"""
        job_id = 'bq_job_123'
        return job_id
    # The Google provider has no generic BigQuery job-status sensor
    # (BigQueryInsertJobOperator already waits for its own job). When the
    # load runs asynchronously, wait for its output instead: here, the day's
    # partition of a placeholder project/dataset/table.
    wait_bq = BigQueryTablePartitionExistenceSensor(
        task_id='wait_for_bigquery',
        project_id='my-project',
        dataset_id='analytics',
        table_id='daily_data',
        partition_id='{{ ds_nodash }}',  # Daily partitions are named YYYYMMDD
        poke_interval=60,
        timeout=1800,
        mode='reschedule',
    )
    # Pipeline: the GCS sensor gates processing; the BigQuery sensor runs
    # after the load has been started.
    processed = process_data()
    job_id = load_to_bigquery(processed)
    wait_gcs >> processed
    job_id >> wait_bq
gcp_pipeline()
Scenario 2: Microsoft Azure Pipeline
# azure_pipeline.py
"""
Azure pipeline with sensors:
- Wait for Blob Storage (shown below)
- Wait for Azure Data Factory (e.g., AzureDataFactoryPipelineRunStatusSensor)
- Wait for SQL Database (e.g., SqlSensor)
"""
from datetime import datetime
from airflow.decorators import dag, task
from airflow.providers.microsoft.azure.sensors.wasb import WasbBlobSensor
@dag(
    dag_id='azure_pipeline_with_sensors',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['azure', 'production'],
)
def azure_pipeline():
    # Wait for Blob Storage
    wait_blob = WasbBlobSensor(
        task_id='wait_for_blob',
        container_name='data',
        blob_name='incoming/{{ ds }}/data.csv',
        wasb_conn_id='azure_default',
        poke_interval=60,
        timeout=3600,
        mode='reschedule',
    )
    @task
    def process_blob() -> dict:
        """Process blob data (placeholder)"""
        return {'records': 1000}
    @task
    def load_to_synapse(data: dict) -> bool:
        """Load to Synapse (placeholder)"""
        return True
    # Pipeline: the blob sensor gates processing
    processed = process_blob()
    loaded = load_to_synapse(processed)
    wait_blob >> processed >> loaded
azure_pipeline()
ℹ️ Pro Tip
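In cloud environments, use cloud-specific sensors (GCSObjectExistenceSensor, WasbBlobSensor) instead of generic file sensors. A generic FileSensor only sees paths on the worker's filesystem, whereas the cloud sensors query the storage service's API directly and authenticate through an Airflow connection (gcp_conn_id, wasb_conn_id).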
Edge Cases
⚠️ Common Pitfalls
- Worker Slot Exhaustion: Too many sensors in poke mode can exhaust worker slots. Use reschedule mode for long waits.
- Timeout Calculation: Set the timeout from the expected wait for the external condition, not from task execution time, and add a buffer for unexpected delays.
- Exponential Backoff: Use exponential backoff to avoid overwhelming external systems with requests.
- Soft Fail vs Hard Fail: Use soft_fail=True for non-critical sensors so a timeout skips the task instead of failing the run. Under the default all_success trigger rule, the skip also propagates to downstream tasks.
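One way to apply the timeout-calculation advice is to derive the timeout from observed arrival delays rather than guessing. The helper below is a hypothetical sketch: it takes the nearest-rank 95th percentile of past delays and multiplies it by a buffer factor. Both the percentile and the 1.5x buffer are assumptions to tune per pipeline, and the delay history is made-up example data.

```python
import math
from typing import Sequence


def recommend_timeout(delays_s: Sequence[float], percentile: float = 95,
                      buffer: float = 1.5) -> float:
    """Suggest a sensor timeout (seconds) from historical arrival delays.

    Uses the nearest-rank percentile of past delays, scaled by a buffer.
    """
    if not delays_s:
        raise ValueError("need at least one observed delay")
    ordered = sorted(delays_s)
    rank = math.ceil(percentile / 100 * len(ordered))  # 1-based nearest rank
    return ordered[max(rank, 1) - 1] * buffer


# Hypothetical delays (seconds) between the run starting and the file landing.
history = [600, 900, 1200, 1500, 1800, 2400, 3000, 3600, 4200, 5400]
print(recommend_timeout(history))  # 8100.0
```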
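# edge_cases.py
from datetime import datetime
from airflow.decorators import dag, task
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.utils.trigger_rule import TriggerRule
@dag(dag_id='sensor_edge_cases', start_date=datetime(2024, 1, 1), schedule='@daily', catchup=False)
def sensor_edge_cases():
    # Worker slot issue
    # BAD: Too many sensors in poke mode
    # for i in range(100):
    #     S3KeySensor(
    #         task_id=f'sensor_{i}',
    #         mode='poke',  # Occupies worker slot for the whole wait!
    #         ...
    #     )
    # GOOD: Use reschedule mode. Each poke still briefly takes a slot and a
    # scheduler round-trip, so keep poke_interval generous.
    sensors = [
        S3KeySensor(
            task_id=f'sensor_{i}',
            bucket_name='my-bucket',  # Placeholder bucket
            key='data/{{ ds }}/part_%d.csv' % i,  # Placeholder key per sensor
            mode='reschedule',  # Releases slot between pokes
        )
        for i in range(100)
    ]
    # Timeout issue: alert when any sensor fails or times out
    @task(trigger_rule=TriggerRule.ONE_FAILED)
    def check_sensor_timeout():
        """Monitor sensor timeouts"""
        # Send an alert here (alerting integration not shown)
        print("At least one sensor failed or timed out")
    sensors >> check_sensor_timeout()
sensor_edge_cases()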
Best Practices
# best_practices.py
"""
Sensor Best Practices:
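1. Mode Selection:
- Use reschedule mode for waits > 5 minutes
- Use poke mode for waits < 5 minutes
- Consider worker slot availability
- In reschedule mode, keep poke_interval >= 60 seconds to limit scheduler load
2. Timeout Configuration:
- Set realistic timeouts based on the expected wait
- Add a buffer for unexpected delays
- Use soft_fail for non-critical sensors
- Remember that a timeout is not retried; retries only cover poke errors
3. Performance:
- Use exponential backoff for long waits
- Batch checks when possible (e.g., one sensor on a prefix or wildcard key
  instead of many sensors on individual files)
- Use cloud-specific sensors
4. Error Handling:
- Use on_failure_callback or failure-triggered tasks to alert and clean up
- Log meaningful messages
- Handle network failures gracefully
5. Monitoring:
- Track sensor success, failure, and timeout rates
- Track how long sensors wait relative to their timeout
- Alert on excessive wait times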
"""
ℹ️ Google Interview Tip
At Google, they emphasize efficient resource usage. When discussing sensors, highlight the importance of reschedule mode for long waits and how it helps with worker slot management. Also discuss exponential backoff to avoid overwhelming external systems.
Summary
Sensors are essential for waiting on external conditions in Airflow. Key takeaways:
- Poke Mode - Occupies worker slot, good for short waits
- Reschedule Mode - Releases slot, good for long waits
- Timeout Handling - Use soft_fail and callbacks
- Custom Sensors - Create for specific use cases
- Cloud Sensors - Use cloud-specific sensors when available
For Google and Microsoft interviews, focus on:
- Efficient resource usage with reschedule mode
- Proper timeout handling
- Cloud-specific sensor implementations
- Monitoring and alerting strategies
This question is part of the Apache Airflow Advanced interview preparation series. Practice explaining these concepts before your interview.