

Airflow Sensors Deep Dive

Waiting Strategies and Best Practices

Google | Microsoft | Difficulty: Advanced

Interview Question

ℹ️ Interview Context

Company: Google Cloud / Microsoft Azure
Role: Senior Data Engineer / Cloud Architect
Difficulty: Advanced
Time: 45-60 minutes

Question: "Explain the difference between poke mode and reschedule mode in Sensors. When would you use each? How do you handle sensor timeouts and failures in production?"


Detailed Theory

Sensor Fundamentals

# sensor_fundamentals.py
"""
Sensors in Airflow:

A Sensor is a special type of Operator that waits for a condition
to be met before completing. It "pokes" the condition at regular
intervals.

Key Parameters:
- poke_interval: Time between checks (seconds)
- timeout: Maximum wait time (seconds)
- mode: 'poke' or 'reschedule'
- soft_fail: If True, marks task as skipped instead of failed

Sensor Modes:
1. POKE (default): Keeps worker slot while waiting
2. RESCHEDULE: Releases worker slot between pokes
"""

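The parameters above can be pictured with a small stand-alone model of the waiting loop. This is a simplified sketch, not Airflow's implementation: `wait_for`, `SensorTimeout`, and the injectable `clock` and `sleep` arguments are hypothetical names introduced here so the loop can be exercised without real waiting.

```python
import time
from typing import Callable


class SensorTimeout(Exception):
    """Raised when the condition is not met before the timeout."""


def wait_for(
    condition: Callable[[], bool],
    poke_interval: float,
    timeout: float,
    soft_fail: bool = False,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Check `condition` every `poke_interval` seconds until `timeout`."""
    started = clock()
    while not condition():
        if clock() - started >= timeout:
            if soft_fail:
                # soft_fail turns a timeout into a skip instead of a failure
                return "skipped"
            raise SensorTimeout(f"condition not met after {timeout} seconds")
        sleep(poke_interval)
    return "success"
```

With the defaults, `wait_for(lambda: True, poke_interval=30, timeout=300)` returns immediately because the first check already succeeds.
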
1. Poke Mode vs Reschedule Mode

# poke_vs_reschedule.py
# Airflow 2.x import paths; the HTTP and Amazon sensors live in provider packages
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime, timedelta
from airflow.decorators import dag

@dag(dag_id='sensor_modes_example', start_date=datetime(2024, 1, 1), schedule_interval='@daily')
def sensor_modes():
    """
    POKE MODE (default):
    - Keeps worker slot occupied during wait
    - Suitable for short waits (< 5 minutes)
    - Avoids the overhead of rescheduling the task on every check
    
    RESCHEDULE MODE:
    - Releases worker slot between pokes
    - Suitable for long waits (> 5 minutes)
    - Costs one scheduling round-trip per poke but holds no slot while idle
    - Task sits in the up_for_reschedule state between pokes
    """
    
    # Poke mode (default)
    poke_sensor = HttpSensor(
        task_id='poke_sensor',
        http_conn_id='api_default',
        endpoint='/api/health',
        poke_interval=30,  # Check every 30 seconds
        timeout=300,  # Timeout after 5 minutes
        mode='poke',  # Default mode
    )
    
    # Reschedule mode
    reschedule_sensor = HttpSensor(
        task_id='reschedule_sensor',
        http_conn_id='api_default',
        endpoint='/api/health',
        poke_interval=60,  # Check every minute
        timeout=3600,  # Timeout after 1 hour
        mode='reschedule',  # Release slot between pokes
    )
    
    # S3 sensor with reschedule
    s3_sensor = S3KeySensor(
        task_id='s3_sensor',
        bucket_name='my-bucket',
        bucket_key='data/{{ ds }}/file.csv',
        poke_interval=300,  # Check every 5 minutes
        timeout=7200,  # Timeout after 2 hours
        mode='reschedule',  # Long wait, use reschedule
        soft_fail=True,  # Skip if timeout instead of fail
    )
    
    # ExternalTaskSensor
    external_sensor = ExternalTaskSensor(
        task_id='external_sensor',
        external_dag_id='upstream_dag',
        external_task_id='final_task',
        poke_interval=60,
        timeout=3600,
        mode='reschedule',
        execution_delta=timedelta(hours=1),
    )
    
    poke_sensor >> reschedule_sensor >> s3_sensor >> external_sensor

sensor_modes()

⚠️ Important

Use mode='reschedule' for sensors that wait more than a few minutes. Between pokes the task gives up its worker slot so other tasks can run. In poke mode, the worker slot remains occupied for the entire wait duration.
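
The slot cost of each mode can be estimated with simple arithmetic. The sketch below is a rough model under stated assumptions: `slot_seconds` is a hypothetical helper, each poke is assumed to take a fixed `poke_duration`, and the scheduler overhead of re-queuing a rescheduled task is ignored.

```python
def slot_seconds(wait_seconds: int, poke_interval: int, poke_duration: int, mode: str) -> int:
    """Estimate how many worker-slot seconds one sensor holds while waiting."""
    if mode == "poke":
        # The slot is held from the first check until the condition is met
        return wait_seconds
    if mode == "reschedule":
        # The slot is held only while a check is actually running
        pokes = wait_seconds // poke_interval + 1
        return pokes * poke_duration
    raise ValueError(f"unknown mode: {mode!r}")


one_hour_poke = slot_seconds(3600, poke_interval=60, poke_duration=2, mode="poke")
one_hour_reschedule = slot_seconds(3600, poke_interval=60, poke_duration=2, mode="reschedule")
print(one_hour_poke, one_hour_reschedule)
```

Under these assumptions a one-hour wait holds a slot for 3600 seconds in poke mode and for 122 seconds (61 checks of 2 seconds each) in reschedule mode.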

2. Sensor Timeout Handling

# sensor_timeouts.py
# Airflow 2.x import path; S3KeySensor ships in the Amazon provider package
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from datetime import datetime
from airflow.decorators import dag, task
from airflow.utils.trigger_rule import TriggerRule

@dag(dag_id='sensor_timeout_handling', start_date=datetime(2024, 1, 1), schedule_interval='@daily')
def sensor_timeout_handling():
    """
    Sensor Timeout Strategies:
    
    1. soft_fail=True: Skip task on timeout
    2. soft_fail=False: Fail task on timeout (default)
    3. Exponential backoff: Increase poke_interval over time
    4. Multiple retries: Use retry parameters
    """
    
    # Strategy 1: Soft fail (skip on timeout)
    soft_fail_sensor = S3KeySensor(
        task_id='soft_fail_sensor',
        bucket_name='my-bucket',
        bucket_key='data/{{ ds }}/file.csv',
        poke_interval=60,
        timeout=300,
        mode='reschedule',
        soft_fail=True,  # Skip task if timeout
    )
    
    # Strategy 2: Exponential backoff
    exponential_sensor = S3KeySensor(
        task_id='exponential_sensor',
        bucket_name='my-bucket',
        bucket_key='data/{{ ds }}/file.csv',
        poke_interval=60,
        timeout=3600,
        mode='reschedule',
        exponential_backoff=True,  # Increase interval over time
    )
    
    # Strategy 3: Custom timeout handling
    # ONE_FAILED lets this task run when the upstream sensor fails (a timeout
    # is a failure); with the default all_success rule it would never run.
    @task(trigger_rule=TriggerRule.ONE_FAILED)
    def handle_timeout():
        """Handle sensor timeout"""
        # Custom logic for timeout
        print("Sensor timed out, handling gracefully")
        return {'status': 'timeout_handled'}
    
    @task(trigger_rule=TriggerRule.ALL_FAILED)
    def cleanup_on_failure():
        """Cleanup after sensor failure"""
        print("Cleaning up after sensor failure")
    
    # Dependencies
    soft_fail_sensor >> exponential_sensor
    
    # Handle failures: both tasks hang directly off the sensor so that its
    # failure is what triggers them
    handle_timeout_task = handle_timeout()
    cleanup_task = cleanup_on_failure()
    
    exponential_sensor >> [handle_timeout_task, cleanup_task]

sensor_timeout_handling()
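
To see why exponential backoff reduces load on the external system, it helps to list the waits it produces. The following is a simplified doubling schedule with an optional cap, written as a sketch: `backoff_intervals` is a hypothetical helper, and Airflow's own `exponential_backoff` computes its delays differently (it also adds jitter), so the numbers are illustrative only.

```python
from typing import List, Optional


def backoff_intervals(
    base: float,
    timeout: float,
    factor: float = 2.0,
    max_interval: Optional[float] = None,
) -> List[float]:
    """Return the waits between checks that fit inside `timeout` seconds."""
    intervals: List[float] = []
    elapsed = 0.0
    current = float(base)
    while elapsed + current <= timeout:
        intervals.append(current)
        elapsed += current
        # Grow the next wait, optionally capping it
        current *= factor
        if max_interval is not None:
            current = min(current, max_interval)
    return intervals


print(backoff_intervals(60, 3600))
print(len(backoff_intervals(60, 3600, max_interval=300)))
```

In this model, a base interval of 60 seconds and a one-hour timeout yields five waits (60, 120, 240, 480, 960 seconds), whereas a fixed 60-second interval would check about sixty times. Capping the interval trades some of that saving for faster detection once the data arrives.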

3. Custom Sensors

# custom_sensors.py
from airflow.sensors.base import BaseSensorOperator
from typing import Dict, Optional

class CustomApiSensor(BaseSensorOperator):
    """
    Custom sensor that waits for API response.
    
    :param api_url: URL to check
    :param expected_status: Expected status code
    :param headers: Request headers
    """
    
    template_fields = ('api_url', 'expected_status')
    
    def __init__(
        self,
        api_url: str,
        expected_status: int = 200,
        headers: Optional[Dict[str, str]] = None,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.api_url = api_url
        self.expected_status = expected_status
        self.headers = headers or {}
    
    def poke(self, context) -> bool:
        """Check if condition is met"""
        import requests
        
        try:
            response = requests.get(
                self.api_url,
                headers=self.headers,
                timeout=10
            )
            
            if response.status_code == self.expected_status:
                self.log.info(f"API ready: {self.api_url}")
                return True
            
            self.log.info(
                f"API not ready: {response.status_code} "
                f"(expected {self.expected_status})"
            )
            return False
            
        except Exception as e:
            self.log.warning(f"API check failed: {str(e)}")
            return False

# Usage
custom_sensor = CustomApiSensor(
    task_id='wait_for_api',
    api_url='https://api.example.com/health',
    expected_status=200,
    poke_interval=30,
    timeout=300,
    mode='reschedule',
)
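
The readiness check inside `poke` is easier to unit test when the decision is separated from the HTTP call. The sketch below shows one way to do that; `api_is_ready` and its `fetch_status` callable are hypothetical names, not part of Airflow or of the sensor above.

```python
from typing import Callable


def api_is_ready(fetch_status: Callable[[], int], expected_status: int = 200) -> bool:
    """Return True only when the fetched status code matches the expected one.

    A connection error is treated as "not ready yet" so the sensor keeps waiting.
    """
    try:
        return fetch_status() == expected_status
    except ConnectionError:
        return False


# Stand-ins for a real HTTP call, used only to exercise the logic
print(api_is_ready(lambda: 200))
print(api_is_ready(lambda: 503))
```

A sensor's `poke` method could then delegate to such a function, passing a callable that performs the real request and returns its status code.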

4. Database Sensor

# database_sensor.py
from airflow.hooks.base import BaseHook
from airflow.sensors.base import BaseSensorOperator

class DatabaseRecordSensor(BaseSensorOperator):
    """
    Sensor that waits for a record to exist in database.
    
    :param table: Table name
    :param columns: Columns to check
    :param values: Expected values
    :param db_conn_id: Database connection ID
    """
    
    template_fields = ('table', 'columns', 'values')
    
    def __init__(
        self,
        table: str,
        columns: list,
        values: list,
        db_conn_id: str = 'default',
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.table = table
        self.columns = columns
        self.values = values
        self.db_conn_id = db_conn_id
    
    def poke(self, context) -> bool:
        """Check if record exists"""
        # Resolve the hook from the connection type. This assumes the
        # connection maps to a SQL hook (a DbApiHook subclass).
        hook = BaseHook.get_connection(self.db_conn_id).get_hook()
        
        # Build query. Table and column names are interpolated directly, so
        # they must come from trusted DAG code; only the values are bound.
        # The %s placeholder style depends on the database driver.
        conditions = ' AND '.join([
            f"{col} = %s" for col in self.columns
        ])
        query = f"SELECT 1 FROM {self.table} WHERE {conditions} LIMIT 1"
        
        # Execute: get_first returns the first row, or None if nothing matched
        result = hook.get_first(query, parameters=self.values)
        
        if result:
            self.log.info(f"Record found in {self.table}")
            return True
        
        self.log.info(f"Record not found in {self.table}")
        return False

# Usage
db_sensor = DatabaseRecordSensor(
    task_id='wait_for_record',
    table='daily_loads',
    columns=['load_date', 'status'],
    values=['{{ ds }}', 'completed'],
    db_conn_id='warehouse',
    poke_interval=60,
    timeout=3600,
    mode='reschedule',
)
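
Because the sensor above interpolates the table and column names into the SQL string, it is worth validating them before building the query. The sketch below shows one possible guard; `build_exists_query` is a hypothetical helper, and the identifier rule (letters, digits, and underscores only) is an assumption that would reject schema-qualified names such as `analytics.daily_loads`.

```python
import re
from typing import List

# Assumed rule: a plain SQL identifier, no dots, quotes, or whitespace
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def build_exists_query(table: str, columns: List[str]) -> str:
    """Build a parameterized existence query after validating identifiers."""
    if not columns:
        raise ValueError("at least one column is required")
    for name in [table, *columns]:
        if not _IDENTIFIER.fullmatch(name):
            raise ValueError(f"unsafe SQL identifier: {name!r}")
    # Values stay as %s placeholders and are bound by the database driver
    conditions = " AND ".join(f"{col} = %s" for col in columns)
    return f"SELECT 1 FROM {table} WHERE {conditions} LIMIT 1"


print(build_exists_query("daily_loads", ["load_date", "status"]))
```

The values themselves never enter the string, so they remain protected by the driver's parameter binding.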

Real-World Scenarios

Scenario 1: Google Cloud Pipeline

# google_cloud_pipeline.py
"""
Google Cloud pipeline with sensors:
- Wait for GCS file
- Wait for BigQuery load result
- Wait for Dataflow pipeline
"""

from airflow.decorators import dag, task
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor
from airflow.providers.google.cloud.sensors.bigquery import BigQueryTableExistenceSensor
from datetime import datetime

@dag(
    dag_id='gcp_pipeline_with_sensors',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['gcp', 'production'],
)
def gcp_pipeline():
    # Wait for GCS file
    wait_gcs = GCSObjectExistenceSensor(
        task_id='wait_for_gcs_file',
        bucket='data-lake',
        object='incoming/{{ ds }}/data.csv',
        poke_interval=60,
        timeout=3600,
        mode='reschedule',
    )
    
    @task
    def process_data() -> dict:
        """Process data from GCS"""
        return {'records': 1000}
    
    @task
    def load_to_bigquery(data: dict) -> str:
        """Load to BigQuery and return job ID"""
        # Start BigQuery load job
        job_id = 'bq_job_123'
        return job_id
    
    # Wait for the BigQuery load to land.
    # The Google provider does not appear to ship a job-status sensor, so this
    # waits for the destination table instead (a table that already exists
    # passes immediately). Project, dataset, and table names are placeholders.
    wait_bq = BigQueryTableExistenceSensor(
        task_id='wait_for_bigquery',
        project_id='my-project',
        dataset_id='analytics',
        table_id='daily_data',
        poke_interval=30,
        timeout=1800,
        mode='reschedule',
    )
    
    # Pipeline: file arrives -> process -> load -> confirm the load
    processed = process_data()
    job_id = load_to_bigquery(processed)
    wait_gcs >> processed
    job_id >> wait_bq

gcp_pipeline()

Scenario 2: Microsoft Azure Pipeline

# azure_pipeline.py
"""
Azure pipeline with sensors:
- Wait for Blob Storage
- Wait for Azure Data Factory
- Wait for SQL Database
"""

from airflow.decorators import dag, task
from airflow.providers.microsoft.azure.sensors.wasb import WasbBlobSensor
from datetime import datetime

@dag(
    dag_id='azure_pipeline_with_sensors',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['azure', 'production'],
)
def azure_pipeline():
    # Wait for Blob Storage
    wait_blob = WasbBlobSensor(
        task_id='wait_for_blob',
        container_name='data',
        blob_name='incoming/{{ ds }}/data.csv',
        wasb_conn_id='azure_default',
        poke_interval=60,
        timeout=3600,
        mode='reschedule',
    )
    
    @task
    def process_blob() -> dict:
        """Process blob data"""
        return {'records': 1000}
    
    @task
    def load_to_synapse(data: dict) -> bool:
        """Load to Synapse"""
        return True
    
    # Pipeline
    processed = process_blob()
    loaded = load_to_synapse(processed)
    
    wait_blob >> processed >> loaded

azure_pipeline()

ℹ️ Pro Tip

In cloud environments, use cloud-specific sensors (GCSObjectExistenceSensor, WasbBlobSensor) instead of generic file sensors. They handle authentication and are optimized for cloud APIs.


Edge Cases

⚠️ Common Pitfalls

  1. Worker Slot Exhaustion: Too many sensors in poke mode can exhaust worker slots. Use reschedule mode for long waits.

  2. Timeout Calculation: Set timeout based on expected wait time, not task execution time. Add buffer for unexpected delays.

  3. Exponential Backoff: Use exponential backoff to avoid overwhelming external systems with too many requests.

  4. Soft Fail vs Hard Fail: Use soft_fail=True for non-critical sensors to avoid failing entire DAG.
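
The timeout guidance in pitfall 2 can be turned into a small calculation. This is a sketch under assumptions: `sensor_timeout` is a hypothetical helper, the 50% default buffer is an arbitrary illustration rather than a recommendation from Airflow, and the result is rounded up so the last check is not cut off partway through an interval.

```python
import math


def sensor_timeout(expected_wait: int, poke_interval: int, buffer_ratio: float = 0.5) -> int:
    """Expected wait plus a buffer, rounded up to a whole number of poke intervals."""
    padded = expected_wait * (1 + buffer_ratio)
    pokes = math.ceil(padded / poke_interval)
    return pokes * poke_interval


# A file that usually lands within 40 minutes, checked every 5 minutes
print(sensor_timeout(expected_wait=2400, poke_interval=300))
```

For an expected wait of 2400 seconds and a 300-second poke interval, the default buffer gives a timeout of 3600 seconds.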

# edge_cases.py
from airflow.decorators import dag, task
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from datetime import datetime

@dag(dag_id='sensor_edge_cases', start_date=datetime(2024, 1, 1), schedule_interval='@daily')
def sensor_edge_cases():
    # Worker slot issue
    
    # BAD: Too many sensors in poke mode
    # for i in range(100):
    #     S3KeySensor(
    #         task_id=f'sensor_{i}',
    #         mode='poke',  # Occupies worker slot!
    #         ...
    #     )
    
    # GOOD: Use reschedule mode
    # (bucket, key pattern, and intervals are placeholder values)
    for i in range(100):
        S3KeySensor(
            task_id=f'sensor_{i}',
            bucket_name='my-bucket',
            bucket_key=f'data/{{{{ ds }}}}/part_{i}.csv',
            poke_interval=300,
            timeout=7200,
            mode='reschedule',  # Releases slot between pokes
        )
    
    # Timeout issue
    @task
    def check_sensor_timeout():
        """Monitor sensor timeouts"""
        # Send alert if sensor times out
        pass

sensor_edge_cases()



Best Practices

# best_practices.py
"""
Sensor Best Practices:

1. Mode Selection:
   - Use reschedule mode for waits > 5 minutes
   - Use poke mode for waits < 5 minutes
   - Consider worker slot availability

2. Timeout Configuration:
   - Set realistic timeouts based on expected wait
   - Add buffer for unexpected delays
   - Use soft_fail for non-critical sensors

3. Performance:
   - Use exponential backoff for long waits
   - Batch sensors when possible
   - Use cloud-specific sensors

4. Error Handling:
   - Implement proper callbacks
   - Log meaningful messages
   - Handle network failures gracefully

5. Monitoring:
   - Track sensor success/failure rates
   - Monitor poke intervals
   - Alert on excessive wait times
"""

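The monitoring points above can be sketched as two small checks over recorded sensor runs. Everything here is hypothetical: `SensorRun`, `success_rate`, and `excessive_waits` are illustrative names, the sample records are made up, and in practice the data would come from whatever metrics or metadata store you already use.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class SensorRun:
    task_id: str
    waited_seconds: float
    timeout_seconds: float
    succeeded: bool


def success_rate(runs: List[SensorRun]) -> float:
    """Fraction of runs that succeeded (0.0 when there are no runs)."""
    if not runs:
        return 0.0
    return sum(run.succeeded for run in runs) / len(runs)


def excessive_waits(runs: List[SensorRun], threshold: float = 0.8) -> List[str]:
    """Task IDs whose wait used more than `threshold` of their timeout."""
    return [
        run.task_id
        for run in runs
        if run.waited_seconds > threshold * run.timeout_seconds
    ]


# Made-up sample records for illustration
runs = [
    SensorRun("wait_gcs", 600, 3600, True),
    SensorRun("wait_blob", 3300, 3600, True),
    SensorRun("wait_api", 300, 300, False),
    SensorRun("wait_db", 120, 3600, True),
]
print(success_rate(runs), excessive_waits(runs))
```

A sensor that regularly uses most of its timeout is a candidate for either a longer timeout or a conversation with the upstream team.
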
ℹ️ Google Interview Tip

At Google, they emphasize efficient resource usage. When discussing sensors, highlight the importance of reschedule mode for long waits and how it helps with worker slot management. Also discuss exponential backoff to avoid overwhelming external systems.


Summary

Sensors are essential for waiting on external conditions in Airflow. Key takeaways:

  1. Poke Mode - Occupies worker slot, good for short waits
  2. Reschedule Mode - Releases slot, good for long waits
  3. Timeout Handling - Use soft_fail and callbacks
  4. Custom Sensors - Create for specific use cases
  5. Cloud Sensors - Use cloud-specific sensors when available

For Google and Microsoft interviews, focus on:

  • Efficient resource usage with reschedule mode
  • Proper timeout handling
  • Cloud-specific sensor implementations
  • Monitoring and alerting strategies

This question is part of the Apache Airflow Advanced interview preparation series. Practice explaining these concepts before your interview.
