
Sensors and Operators in Apache Airflow


Architecture Diagram (Sensor Modes Comparison): poke mode holds a worker slot (best for < 5 min); reschedule mode releases the worker slot between pokes (best for 5-60 min); deferred mode uses no worker slot (best for > 60 min); ExternalTaskSensor waits for an upstream DAG (cross-DAG dependencies). Poke sequence: Start -> Poke -> Wait -> Repeat until True or timeout. Deferred flow: Defer -> Trigger fires -> Resume operator. Key: use mode='reschedule' for medium waits and deferred sensors for long polling.

Formal Definitions

Definition: Sensor

A sensor is a specialized operator that polls an external condition at intervals $\Delta t_{\text{poke}}$ until the condition returns True or a timeout $\tau$ is exceeded. A sensor is defined as $S = (\text{poke}, \Delta t_{\text{poke}}, \tau, \text{mode})$, where poke() is the condition check function.

Definition: Poke Interval

The poke interval $\Delta t_{\text{poke}}$ is the time (in seconds) between consecutive condition checks by a sensor. In poke mode the sensor keeps its worker slot for the whole wait, including the sleep between checks. In reschedule mode it releases the slot after each check and is re-queued for the next one.

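Definition: Deferred Mode

Deferred mode is a sensor execution strategy in which the operator hands the wait to an async trigger and releases its worker slot entirely. The triggerer service monitors the condition without consuming worker resources, which makes this the most efficient option for long waits. In Airflow it is not a value of the mode parameter (which accepts only 'poke' and 'reschedule'). Sensors that support it are typically switched over with deferrable=True.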
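To see why deferral saves resources, here is a stdlib-only sketch that mimics what a triggerer does: many async waits share one event-loop thread instead of each holding a worker. `poll_until` and the counter-based condition are hypothetical stand-ins, not Airflow's trigger API. Real triggers subclass `BaseTrigger` and yield `TriggerEvent` objects.

```python
import asyncio
import threading


async def poll_until(check, interval: float, timeout: float) -> bool:
    """Call check() every `interval` seconds; give up after `timeout` seconds."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while loop.time() < deadline:
        if check():
            return True
        # Yield control so other waits make progress on the same thread.
        await asyncio.sleep(interval)
    return False


async def run_many_waits(n: int) -> tuple[list[bool], set[int]]:
    """Run n concurrent waits and record which threads executed the checks."""
    threads: set[int] = set()
    counters = [0] * n

    def make_check(i: int):
        def check() -> bool:
            threads.add(threading.get_ident())
            counters[i] += 1
            return counters[i] >= 3  # condition becomes true on the 3rd poll
        return check

    results = await asyncio.gather(
        *(poll_until(make_check(i), interval=0.01, timeout=1.0) for i in range(n))
    )
    return list(results), threads


if __name__ == "__main__":
    results, threads = asyncio.run(run_many_waits(100))
    print(all(results), len(threads))  # prints: True 1
```

All 100 waits finish while every check runs on a single thread. This is the property that lets one triggerer process replace many occupied worker slots.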

Detailed Explanation

Sensor Fundamentals

Sensors are specialized operators that poll external conditions until the condition is met or a timeout expires.


Sensor Lifecycle:

  1. Sensor starts → initializes poke
  2. Execute poke() method → check condition
  3. If True → sensor succeeds
  4. If False → check timeout
  5. If timeout exceeded → sensor fails (or is marked skipped when soft_fail=True)
  6. If timeout not yet exceeded → wait poke_interval, then repeat from step 2
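The lifecycle can be modeled as a plain loop. This is a simplified sketch of poke-mode behaviour driven by a simulated clock. `SimClock` and `run_sensor` are hypothetical helpers, not Airflow classes, and the sketch ignores retries and poke execution time.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class SimClock:
    """A fake clock so the loop can be exercised without real waiting."""
    now: float = 0.0

    def sleep(self, seconds: float) -> None:
        self.now += seconds


def run_sensor(poke: Callable[[], bool], poke_interval: float, timeout: float,
               clock: SimClock, soft_fail: bool = False) -> str:
    """Follow the six lifecycle steps; return 'success', 'failed' or 'skipped'."""
    started_at = clock.now                      # step 1: sensor starts
    while True:
        if poke():                              # steps 2-3: check condition
            return "success"
        if clock.now - started_at > timeout:    # steps 4-5: timeout check
            return "skipped" if soft_fail else "failed"
        clock.sleep(poke_interval)              # step 6: wait, then repeat


clock = SimClock()
print(run_sensor(lambda: clock.now >= 180, poke_interval=60, timeout=600, clock=clock))
```

With poke_interval=60 and timeout=300, a condition that never holds fails at t = 360 s. The check after the poke at 300 s still passes because the elapsed time is not yet greater than the timeout.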

Key Parameters:

| Parameter | Default | Description |
|---|---|---|
| poke_interval | 60 s | Time between checks |
| timeout | 7 days | Max wait time |
| soft_fail | False | Mark as skipped (not failed) |
| mode | 'poke' | Execution strategy |

Sensor Modes

| Mode | Worker Slot | Latency | Best For |
|---|---|---|---|
| Poke | Held entire wait | Low | Short waits (< 5 min) |
| Reschedule | Released between pokes | Medium | Medium waits (5-60 min) |
| Deferred | None (uses triggerer) | High | Long waits (> 60 min) |

Resource Usage:

  • Poke: Occupies 1 worker slot for the entire wait
  • Reschedule: Occupies a slot only during each poke (~100 ms)
  • Deferred: 0 worker slots; uses the triggerer process
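The arithmetic behind these bullets fits in a few lines. `worker_slot_seconds` is a hypothetical helper that estimates worker-slot seconds for one sensor. It ignores scheduler overhead and the brief worker time a deferrable sensor spends starting and resuming.

```python
import math


def worker_slot_seconds(mode: str, wait_s: float, poke_interval_s: float,
                        poke_duration_s: float) -> float:
    """Approximate worker-slot seconds consumed while a sensor waits."""
    if mode == "poke":
        return wait_s  # the slot is held for the whole wait
    if mode == "reschedule":
        pokes = math.ceil(wait_s / poke_interval_s) + 1  # includes the first poke
        return pokes * poke_duration_s  # slot held only while each poke runs
    if mode == "deferred":
        return 0.0  # the triggerer does the waiting
    raise ValueError(f"unknown mode: {mode}")


for mode in ("poke", "reschedule", "deferred"):
    # A 30-minute wait, polled every 60 s, with ~100 ms pokes
    print(mode, worker_slot_seconds(mode, 1800, 60, 0.1))
```

For a 30-minute wait, poke mode costs 1800 slot-seconds and reschedule mode about 3.1 (31 pokes of 0.1 s).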

Tip: Use deferred mode for long waits to free worker resources.


Advanced Sensor Patterns

| Pattern | Description |
|---|---|
| Exponential Backoff | Double the poke interval on each attempt (up to max_wait) |
| Sensor Chaining | Chain sensors for complex conditions |
| Sensor Aggregation | Run sensors in parallel and join them in one downstream task |
| Custom Sensors | Inherit BaseSensorOperator, implement poke() |

External Task Sensor

Waits for a task in another DAG (or same DAG) to complete.

Configuration:

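| Parameter | Description |
|---|---|
| external_dag_id | Target DAG identifier |
| external_task_id | Target task identifier (None waits for the whole DAG run) |
| execution_delta | Time offset subtracted from the current logical date |
| allowed_states | List of acceptable states (default: success) |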

Sensor Total Wait Time

$$T_{\text{wait}} = n \cdot \Delta t_{\text{poke}} + \sum_{i=1}^{n} t_{\text{poke},i}$$

Here,

  • $n$ = Number of pokes before the condition is met
  • $\Delta t_{\text{poke}}$ = Configured poke interval
  • $t_{\text{poke},i}$ = Execution time of the i-th poke check
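Transcribed directly, the formula becomes a short function. The list holds the measured $t_{\text{poke},i}$ values, and the function name is illustrative.

```python
def total_wait_time(poke_interval: float, poke_durations: list[float]) -> float:
    """T_wait = n * poke_interval + sum of the n poke execution times."""
    n = len(poke_durations)
    return n * poke_interval + sum(poke_durations)


print(total_wait_time(60, [0.2, 0.3, 0.5]))
```

Take $\Delta t_{\text{poke}} = 60$ s and three pokes lasting 0.2, 0.3 and 0.5 s. Then $T_{\text{wait}} = 3 \cdot 60 + 1.0 = 181$ s, and the poke interval dominates.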

Exponential Backoff Interval

$$\Delta t_i = \min\left(\Delta t_0 \cdot m^i,\; \Delta t_{\max}\right)$$

Here,

  • $\Delta t_0$ = Initial poke interval
  • $m$ = Multiplier (typically 2.0)
  • $i$ = Poke attempt number (0-indexed)
  • $\Delta t_{\max}$ = Maximum poke interval cap
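The following sketch evaluates the formula for the first few attempts. It covers the idealized formula only. Airflow's built-in exponential_backoff option adds jitter, so real intervals differ somewhat.

```python
def backoff_intervals(dt0: float, multiplier: float, dt_max: float,
                      count: int) -> list[float]:
    """Return [dt_0, ..., dt_{count-1}] with dt_i = min(dt0 * m**i, dt_max)."""
    return [min(dt0 * multiplier ** i, dt_max) for i in range(count)]


print(backoff_intervals(10, 2.0, 300, 7))
```

With $\Delta t_0 = 10$ s, $m = 2$ and $\Delta t_{\max} = 300$ s, the sequence is 10, 20, 40, 80, 160, then the cap of 300 for every later attempt (the uncapped value 320 is clipped).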

Worker Slot Utilization (Poke Mode)

$$U_{\text{slot}} = \frac{T_{\text{wait}}}{T_{\text{wait}} + T_{\text{task}}} \times 100\%$$

Here,

  • $T_{\text{wait}}$ = Total sensor wait time
  • $T_{\text{task}}$ = Downstream task execution time
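As a worked example, the utilization formula is shown below as a function (the name is illustrative).

```python
def slot_utilization(t_wait: float, t_task: float) -> float:
    """Percentage of slot time a poke-mode sensor spends waiting: U_slot."""
    return 100 * t_wait / (t_wait + t_task)


print(slot_utilization(1800, 200))  # prints: 90.0
```

Take a sensor that waits 30 minutes (1800 s) for input, followed by a 200 s task. That gives $U_{\text{slot}} = 90\%$, so nine tenths of that slot time does no useful work in poke mode.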

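Theorem: Sensor Timeout Guarantee

In poke mode, if a sensor has timeout $\tau$ and poke interval $\Delta t_{\text{poke}}$, it is guaranteed to either succeed or fail within $T_{\max} = \tau + \Delta t_{\text{poke}} + t_{\text{poke}}$, where $t_{\text{poke}}$ is the maximum poke execution time. Proof: the timeout is checked after every unsuccessful poke. The last check that passes happens at some time $t \le \tau$. The sensor then sleeps for $\Delta t_{\text{poke}}$ and runs one more poke (taking at most $t_{\text{poke}}$) before the next check fails, so it resolves by $\tau + \Delta t_{\text{poke}} + t_{\text{poke}}$. Because consecutive pokes are at least $\Delta t_{\text{poke}}$ apart, the sensor checks at most $\lfloor \tau / \Delta t_{\text{poke}} \rfloor + 2$ times.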
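The bound can be checked numerically by simulating a poke-mode loop whose condition never succeeds. `simulate_timeout` is a hypothetical helper, and it treats every poke as taking the same constant time.

```python
import math


def simulate_timeout(timeout: float, poke_interval: float,
                     poke_time: float) -> tuple[int, float]:
    """Poke-mode loop whose condition never becomes true.

    Returns (number of pokes, elapsed time when the sensor fails).
    """
    now, pokes = 0.0, 0
    while True:
        now += poke_time        # run one poke; it always returns False here
        pokes += 1
        if now > timeout:       # timeout is checked after each failed poke
            return pokes, now
        now += poke_interval    # sleep before the next poke


for tau, dt, tp in [(3600, 300, 2.0), (600, 60, 0.5), (100, 7, 0.1)]:
    pokes, failed_at = simulate_timeout(tau, dt, tp)
    print(f"tau={tau}: {pokes} pokes, failed at {failed_at:.1f}s, "
          f"bound {tau + dt + tp:.1f}s, max pokes {math.floor(tau / dt) + 2}")
```

For example, with $\tau = 3600$ s, $\Delta t_{\text{poke}} = 300$ s and 2 s pokes, the sensor fails after 13 pokes at 3626 s. That is well inside the $\tau + \Delta t_{\text{poke}} + t_{\text{poke}}$ bound.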

In poke mode, the sensor occupies a worker slot for the entire wait duration. For $k$ sensors waiting simultaneously, the effective parallelism is reduced by $k$. Use reschedule or deferred mode to free slots during waits.

For exponential backoff sensors, set max_wait to prevent excessively long intervals. A common choice is $\Delta t_{\max} = 300$ seconds (5 minutes), which balances responsiveness against load on the external system.

Advanced Sensor Patterns

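Exponential Backoff: Sensors can use exponential backoff for poke intervals. The interval starts around poke_interval and roughly doubles with each poke, up to max_wait. Airflow's built-in exponential_backoff=True adds hash-based jitter, so growth is not an exact doubling. This reduces load on external systems while maintaining responsiveness.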

Sensor Chaining: Multiple sensors can be chained to wait for complex conditions. Each sensor waits for its specific condition before allowing the next sensor to start.

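Sensor Aggregation: Run several sensors in parallel and make one downstream task depend on all of them. The default all_success trigger rule waits for every condition. A rule such as one_success proceeds as soon as any condition holds, and a BranchPythonOperator placed after the sensors can choose a path based on which conditions were met. Because the waits overlap, this pattern reduces overall wait time compared to sequential sensor chaining.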
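The difference between chaining and aggregation is easy to quantify in an idealized model. Each condition becomes true after a fixed delay and is detected immediately. The helpers below are hypothetical, and real waits also include poke-interval latency.

```python
def chained_wait(waits: list[float]) -> float:
    """Sensors in a chain: each starts only after the previous one succeeds."""
    return sum(waits)


def aggregated_wait(waits: list[float], require_all: bool = True) -> float:
    """Parallel sensors joined by one downstream task.

    require_all=True models the default all_success trigger rule;
    require_all=False models one_success (any single condition suffices).
    """
    return max(waits) if require_all else min(waits)


waits_min = [10, 30, 20]  # minutes until each condition becomes true
print(chained_wait(waits_min), aggregated_wait(waits_min))
```

Take three conditions that become ready after 10, 30 and 20 minutes. Chaining the sensors waits 60 minutes, aggregation waits 30, and one_success waits only 10.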

Custom Sensors: Create custom sensors for specialized conditions. Inherit from BaseSensorOperator and implement the poke method with your specific logic.

External Task Sensor

The ExternalTaskSensor waits for a task in another DAG or the same DAG to complete. It's useful for cross-DAG dependencies and complex workflow orchestration.

Configuration: Specify external_dag_id and external_task_id (set external_task_id=None to wait for the whole DAG run), and optionally execution_delta or execution_date_fn. The sensor checks whether the external task has reached one of the expected states.

States: By default, the sensor waits for the task to succeed. Use allowed_states to wait for specific states like success, failed, or skipped.

Execution Date Matching: The sensor can match execution dates to ensure proper synchronization between DAGs. Use execution_delta or execution_date_fn for flexible date matching.
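Date matching reduces to a small date computation. This pure-Python sketch mirrors how the target logical date is derived. `target_logical_date` is a hypothetical helper, and the real sensor may also pass context values to execution_date_fn.

```python
from datetime import datetime, timedelta
from typing import Callable, Optional


def target_logical_date(
    logical_date: datetime,
    execution_delta: Optional[timedelta] = None,
    execution_date_fn: Optional[Callable[[datetime], datetime]] = None,
) -> datetime:
    """Logical date of the upstream run an ExternalTaskSensor-style check looks for."""
    if execution_delta is not None and execution_date_fn is not None:
        raise ValueError("use execution_delta or execution_date_fn, not both")
    if execution_delta is not None:
        return logical_date - execution_delta
    if execution_date_fn is not None:
        return execution_date_fn(logical_date)
    return logical_date  # default: same logical date as the waiting DAG


run = datetime(2024, 1, 2)
print(target_logical_date(run, execution_delta=timedelta(hours=1)))
```

Suppose a midnight @daily DAG waits on an upstream DAG that runs daily at 23:00. In that case, execution_delta=timedelta(hours=1) points each run at the previous evening's upstream run.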

Key Concepts Table

| Sensor Type | Mode | Best For | Resource Usage | Latency |
|---|---|---|---|---|
| FileSensor | Poke | Short file waits | High | Low |
| S3KeySensor | Reschedule | S3 object waits | Medium | Medium |
| HttpSensor | Deferred | API availability | Low | High |
| SqlSensor | Poke | Database conditions | High | Low |
| ExternalTaskSensor | Poke/Reschedule | Cross-DAG deps | Medium | Medium |
| TimeSensor | Poke | Scheduled waits | High | Low |

Sensor Comparison Matrix

| Sensor | Poke Interval | Timeout | Mode | Use Case |
|---|---|---|---|---|
| FileSensor | 1-60 s | 7 days | Poke | Wait for file arrival |
| S3KeySensor | 5-300 s | 7 days | Reschedule | Wait for S3 object |
| GCSObjectExistenceSensor | 5-300 s | 7 days | Reschedule | Wait for GCS object |
| HttpSensor | 30-300 s | 7 days | Deferred | Wait for API endpoint |
| SqlSensor | 10-60 s | 7 days | Poke | Wait for database condition |
| ExternalTaskSensor | 60-300 s | 7 days | Reschedule | Wait for other DAG task |
| TimeSensor | 60 s | N/A | Poke | Wait for specific time |
| BaseSensorOperator | Custom | Custom | Custom | Custom conditions |

Code Examples

Advanced Sensor Implementation

# advanced_sensors.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
import requests
import json

class AdvancedHttpSensor(BaseSensorOperator):
    """
    Advanced HTTP sensor with multiple condition support.

    This sensor supports multiple conditions, custom headers,
    and response validation.
    """

    template_fields = ('endpoint', 'headers', 'expected_status')
    ui_color = '#4285F4'

    @apply_defaults
    def __init__(
        self,
        endpoint: str,
        http_conn_id: str = 'http_default',
        method: str = 'GET',
        headers: dict = None,
        expected_status: int = 200,
        json_path: str = None,
        expected_value=None,  # any JSON-comparable value
        auth_type: str = 'bearer',
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.endpoint = endpoint
        self.http_conn_id = http_conn_id  # poke() looks up the connection by this ID
        self.method = method
        self.headers = headers or {}
        self.expected_status = expected_status
        self.json_path = json_path
        self.expected_value = expected_value
        self.auth_type = auth_type

    def poke(self, context):
        """Check if the HTTP condition is met."""
        try:
            # Get connection details
            from airflow.hooks.base import BaseHook
            conn = BaseHook.get_connection(self.http_conn_id)

            # Build request
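            # HTTP connections store the scheme in `schema`; omit an unset port
            scheme = conn.schema or 'http'
            port = f":{conn.port}" if conn.port else ""
            url = f"{scheme}://{conn.host}{port}{self.endpoint}"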
            headers = {**self.headers}

            # Add authentication
            if self.auth_type == 'bearer':
                headers['Authorization'] = f"Bearer {conn.get_password()}"
            elif self.auth_type == 'basic':
                import base64
                credentials = base64.b64encode(
                    f"{conn.login}:{conn.get_password()}".encode()
                ).decode()
                headers['Authorization'] = f"Basic {credentials}"

            # Make request
            response = requests.request(
                method=self.method,
                url=url,
                headers=headers,
                timeout=30,
            )

            # Check status code
            if response.status_code != self.expected_status:
                self.log.warning(
                    f"Expected status {self.expected_status}, "
                    f"got {response.status_code}"
                )
                return False

            # Check JSON path if specified
            if self.json_path and self.expected_value is not None:
                data = response.json()
                actual_value = self._get_json_value(data, self.json_path)

                if actual_value != self.expected_value:
                    self.log.warning(
                        f"Expected value {self.expected_value} at path "
                        f"{self.json_path}, got {actual_value}"
                    )
                    return False

            self.log.info("HTTP condition met successfully")
            return True

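        except (requests.RequestException, ValueError) as e:
            # Network errors and invalid JSON count as "not ready yet";
            # configuration errors (e.g. a missing connection) still fail the task.
            self.log.warning(f"HTTP check failed: {e}")
            return False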

    def _get_json_value(self, data: dict, path: str):
        """Get value from JSON using dot notation path."""
        keys = path.split('.')
        value = data
        for key in keys:
            if isinstance(value, dict):
                value = value.get(key)
            else:
                return None
        return value

class MultiConditionSensor(BaseSensorOperator):
    """
    Sensor that waits for multiple conditions to be met.
    """

    @apply_defaults
    def __init__(
        self,
        conditions: list,
        operator: str = 'AND',
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.conditions = conditions
        self.operator = operator

    def poke(self, context):
        """Check if conditions are met."""
        results = []

        for condition in self.conditions:
            condition_type = condition.get('type')
            result = False

            if condition_type == 'file':
                result = self._check_file_condition(condition)
            elif condition_type == 'http':
                result = self._check_http_condition(condition)
            elif condition_type == 'database':
                result = self._check_database_condition(condition)

            results.append(result)

        if self.operator == 'AND':
            return all(results)
        else:  # OR
            return any(results)

    def _check_file_condition(self, condition: dict) -> bool:
        """Check file-based condition."""
        import os
        path = condition.get('path')
        exists = condition.get('exists', True)

        file_exists = os.path.exists(path)
        return file_exists == exists

    def _check_http_condition(self, condition: dict) -> bool:
        """Check HTTP-based condition."""
        url = condition.get('url')
        expected_status = condition.get('expected_status', 200)

        try:
            response = requests.get(url, timeout=10)
            return response.status_code == expected_status
        except Exception:
            return False

    def _check_database_condition(self, condition: dict) -> bool:
        """Check database-based condition."""
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        hook = PostgresHook(postgres_conn_id=condition.get('conn_id'))
        query = condition.get('query')

        result = hook.get_first(query)
        return bool(result)

# Usage in DAG
with DAG(
    'advanced_sensor_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Advanced sensor examples',
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['sensors', 'advanced'],
) as dag:

    # Advanced HTTP sensor
    api_sensor = AdvancedHttpSensor(
        task_id='api_availability_sensor',
        endpoint='/api/v1/status',
        expected_status=200,
        json_path='status',
        expected_value='healthy',
        mode='reschedule',
        poke_interval=30,
        timeout=300,
    )

    # Multi-condition sensor
    multi_condition = MultiConditionSensor(
        task_id='multi_condition_sensor',
        conditions=[
            {'type': 'file', 'path': '/data/input.csv', 'exists': True},
            {'type': 'http', 'url': 'http://api:8080/health', 'expected_status': 200},
            {'type': 'database', 'conn_id': 'postgres', 'query': 'SELECT 1'},
        ],
        operator='AND',
        mode='poke',
        poke_interval=60,
    )

    # Process task
    process = PythonOperator(
        task_id='process_data',
        python_callable=lambda: print("Processing data..."),
    )

    api_sensor >> multi_condition >> process

External Task Sensor Patterns

# external_task_patterns.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.sensors.time_sensor import TimeSensor
from airflow.utils.state import State

def process_data(**context):
    """Process data after external task completes."""
    print("Processing data after external dependency met")

def generate_report(**context):
    """Generate report after processing."""
    print("Generating report...")

# DAG 1: Upstream DAG (external dependency)
with DAG(
    'upstream_data_pipeline',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Upstream data pipeline',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['external', 'upstream'],
) as upstream_dag:

    def extract_data(**context):
        """Extract data from source."""
        print("Extracting data...")
        return {"status": "success"}

    def transform_data(**context):
        """Transform extracted data."""
        print("Transforming data...")
        return {"status": "success"}

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )

    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
    )

    extract >> transform

# DAG 2: Downstream DAG (waits for upstream)
with DAG(
    'downstream_data_pipeline',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Downstream data pipeline',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['external', 'downstream'],
) as downstream_dag:

    # Wait for upstream DAG to complete
    wait_for_upstream = ExternalTaskSensor(
        task_id='wait_for_upstream',
        external_dag_id='upstream_data_pipeline',
        external_task_id='transform',
        allowed_states=[State.SUCCESS],
        failed_states=[State.FAILED],
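        # No execution_delta: both DAGs run @daily, so their logical dates already
        # match (a 1-hour delta would point at a 23:00 run that never exists)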
        mode='reschedule',
        poke_interval=300,
        timeout=3600,
    )

    # Wait for specific execution date
    wait_for_yesterday = ExternalTaskSensor(
        task_id='wait_for_yesterday',
        external_dag_id='upstream_data_pipeline',
        external_task_id='transform',
        execution_date_fn=lambda exec_date: exec_date - timedelta(days=1),
        mode='reschedule',
        poke_interval=300,
    )

    # Process after upstream completes
    process = PythonOperator(
        task_id='process',
        python_callable=process_data,
    )

    # Generate report
    report = PythonOperator(
        task_id='report',
        python_callable=generate_report,
    )

    [wait_for_upstream, wait_for_yesterday] >> process >> report

# DAG 3: Complex external dependencies
with DAG(
    'complex_external_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Complex external dependency example',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['external', 'complex'],
) as complex_dag:

    # Wait for multiple external tasks
    wait_task_1 = ExternalTaskSensor(
        task_id='wait_task_1',
        external_dag_id='upstream_data_pipeline',
        external_task_id='extract',
        mode='reschedule',
        poke_interval=300,
    )

    wait_task_2 = ExternalTaskSensor(
        task_id='wait_task_2',
        external_dag_id='another_pipeline',
        external_task_id='load',
        mode='reschedule',
        poke_interval=300,
    )

    # Time sensor
    time_check = TimeSensor(
        task_id='time_check',
        target_time=datetime.strptime('09:00', '%H:%M').time(),
        mode='reschedule',
    )

    # Process after all conditions met
    process = PythonOperator(
        task_id='process',
        python_callable=process_data,
    )

    [wait_task_1, wait_task_2, time_check] >> process

Sensor Optimization Patterns

# sensor_optimization.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
import time
import random

class OptimizedFileSensor(BaseSensorOperator):
    """
    File sensor with optimization features.
    """

    template_fields = ('filepath',)
    ui_color = '#FF9800'

    @apply_defaults
    def __init__(
        self,
        filepath: str,
        recursive: bool = False,
        min_file_size: int = 0,
        max_file_age_days: int = None,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.filepath = filepath
        self.recursive = recursive
        self.min_file_size = min_file_size
        self.max_file_age_days = max_file_age_days

    def poke(self, context):
        """Check if file meets all criteria."""
        import os
        from datetime import datetime, timedelta

        if not os.path.exists(self.filepath):
            return False

        # Check file size
        file_size = os.path.getsize(self.filepath)
        if file_size < self.min_file_size:
            self.log.info(
                f"File size {file_size} < minimum {self.min_file_size}"
            )
            return False

        # Check file age
        if self.max_file_age_days:
            file_mtime = os.path.getmtime(self.filepath)
            file_age = datetime.now() - datetime.fromtimestamp(file_mtime)
            if file_age > timedelta(days=self.max_file_age_days):
                self.log.info(f"File is too old: {file_age}")
                return False

        self.log.info(f"File {self.filepath} meets all criteria")
        return True

class ExponentialBackoffSensor(BaseSensorOperator):
    """
    Sensor with exponential backoff.
    """

    @apply_defaults
    def __init__(
        self,
        initial_interval: int = 10,
        max_interval: int = 300,
        multiplier: float = 2.0,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.initial_interval = initial_interval
        self.max_interval = max_interval
        self.multiplier = multiplier
        # Start polling at the initial interval instead of the 60 s default
        self.poke_interval = initial_interval

    def poke(self, context):
        """Check the condition and grow the interval for the next poke."""
        if self._check_condition():
            return True

        # In poke mode, BaseSensorOperator sleeps for self.poke_interval
        # between pokes, so grow it here rather than calling time.sleep()
        # (which would double the wait). In reschedule mode the operator is
        # re-created for every poke and this state is lost; use the built-in
        # exponential_backoff=True and max_wait arguments there instead.
        self.poke_interval = min(
            self.poke_interval * self.multiplier,
            self.max_interval,
        )

        self.log.info(
            f"Condition not met. Next poke in {self.poke_interval} seconds"
        )
        return False

    def _check_condition(self):
        """Override this method to implement your condition check."""
        # Example: Random condition for demonstration
        return random.random() < 0.1  # 10% chance of success

with DAG(
    'sensor_optimization_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Sensor optimization patterns',
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['sensors', 'optimization'],
) as dag:

    # Optimized file sensor
    file_sensor = OptimizedFileSensor(
        task_id='optimized_file_sensor',
        filepath='/data/input.csv',
        min_file_size=1024,  # At least 1KB
        max_file_age_days=1,  # Not older than 1 day
        mode='reschedule',
        poke_interval=60,
        timeout=3600,
    )

    # Exponential backoff sensor
    backoff_sensor = ExponentialBackoffSensor(
        task_id='exponential_backoff_sensor',
        initial_interval=10,
        max_interval=300,
        multiplier=2.0,
        mode='poke',
        timeout=3600,
    )

    # Process task
    process = PythonOperator(
        task_id='process',
        python_callable=lambda: print("Processing data..."),
    )

    file_sensor >> backoff_sensor >> process

Performance Metrics

Sensor Performance Characteristics

| Metric | Description | Optimization Strategy | Warning Threshold |
|---|---|---|---|
| Poke Interval | Time between condition checks | Adjust based on expected wait time | > 5 min for time-sensitive tasks |
| Worker Slot Usage | Resources consumed during wait | Use reschedule or deferred mode | > 50% slot utilization |
| Latency | Time from condition met to task start | Minimize poke interval for time-sensitive tasks | > 5 min latency |
| Timeout Rate | Percentage of sensors that time out | Set appropriate timeouts, monitor external systems | > 10% timeouts |
| Resource Efficiency | Worker resources used per wait | Use deferred mode for long waits | < 50% efficiency |
| Error Rate | Percentage of failed sensor checks | Implement proper error handling | > 5% errors |
| False Positive Rate | Incorrect condition detection | Validate sensor logic thoroughly | > 1% false positives |
| Throughput | Number of sensors handled concurrently | Optimize poke intervals, use deferred mode | < 10 concurrent |

Resource Usage by Mode

| Mode | Worker Slot | CPU Usage | Memory | Duration |
|---|---|---|---|---|
| Poke | 1.0 | 1-5% | 10-50 MB | Entire wait |
| Reschedule | 0.1 | < 1% | 5-20 MB | Poke only |
| Deferred | 0.0 | < 0.1% | 1-5 MB | None |

Throughput Analysis

| Scenario | Recommended Mode | Max Concurrent | Throughput |
|---|---|---|---|
| File arrival (< 5 min) | Poke | 50 | 50 sensors |
| S3 object (5-60 min) | Reschedule | 100 | 100 sensors |
| API endpoint (> 60 min) | Deferred | 1000 | 1000 sensors |
| Database condition | Poke | 50 | 50 sensors |
| Cross-DAG dependency | Reschedule | 100 | 100 sensors |

Best Practices

1. Mode Selection

Choose the appropriate mode based on wait time. Use poke for short waits (< 5 min), reschedule for medium waits (5-60 min), and deferred for long waits (> 1 hour).

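# Mode selection based on expected wait time.
# BaseSensorOperator accepts only mode='poke' or mode='reschedule';
# deferral is a separate switch (deferrable=True) on sensors that support it.
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

def select_sensor_mode(expected_wait_minutes):
    """Select sensor mode based on expected wait time."""
    if expected_wait_minutes < 5:
        return 'poke'
    return 'reschedule'

# Example usage: a ~30 minute wait
sensor = S3KeySensor(
    task_id='wait_for_data',
    bucket_name='my-bucket',
    bucket_key='data/{{ ds }}/data.parquet',
    mode=select_sensor_mode(expected_wait_minutes=30),
    poke_interval=300,  # 5 minutes for reschedule mode
    timeout=7200,       # 2 hours timeout
)

# For waits longer than an hour, hand the wait to the triggerer instead
long_wait_sensor = S3KeySensor(
    task_id='wait_for_late_data',
    bucket_name='my-bucket',
    bucket_key='late/{{ ds }}/data.parquet',
    deferrable=True,
    poke_interval=300,
    timeout=12 * 60 * 60,  # 12 hours timeout
)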

2. Timeout Configuration

Set reasonable timeouts based on expected wait times. Consider external system reliability and implement graceful failure handling.
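Here is a sketch of one way to derive a timeout from observed waits. The rule is an illustrative assumption, not an Airflow default: take the longest observed wait, multiply by a safety factor, and round up to whole poke intervals. `suggest_timeout` is a hypothetical helper.

```python
import math


def suggest_timeout(observed_waits_s: list[float], poke_interval_s: float,
                    safety_factor: float = 1.5) -> int:
    """Suggest a sensor timeout (seconds) from historical wait times.

    Heuristic (an assumption, not an Airflow rule): longest observed wait
    times a safety factor, rounded up to a whole number of poke intervals.
    """
    if not observed_waits_s:
        raise ValueError("need at least one observed wait")
    target = max(observed_waits_s) * safety_factor
    return int(math.ceil(target / poke_interval_s) * poke_interval_s)


print(suggest_timeout([600, 900, 1200], poke_interval_s=300))  # prints: 1800
```

Rounding to whole poke intervals means the timeout check never falls between two pokes. The effect is cosmetic but makes the logs easier to read.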

3. Poke Interval Optimization

Balance between responsiveness and resource usage. Use exponential backoff for unpredictable wait times.

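from datetime import timedelta
from airflow.providers.http.sensors.http import HttpSensor

# Exponential backoff sensor
sensor = HttpSensor(
    task_id='wait_for_api',
    http_conn_id='api_default',
    endpoint='/health',
    mode='reschedule',
    poke_interval=60,                # Start with 1 minute
    exponential_backoff=True,        # Enable backoff
    max_wait=timedelta(minutes=15),  # Cap any single interval at 15 minutes
    timeout=3600,                    # 1 hour timeout
)
# Intervals grow roughly geometrically with each poke (Airflow adds
# hash-based jitter, so they are not exact doublings) and never exceed max_wait.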

4. Error Handling

Implement proper error handling in sensor logic. Use soft_fail for optional conditions. Log meaningful error messages.

5. Resource Management

Monitor sensor resource usage. Use pools to limit concurrent sensors. Implement sensor prioritization.

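from airflow.sensors.filesystem import FileSensor

# Pool-limited sensors: create the pool first, e.g.
#   airflow pools set file_sensors 10 "File sensors"
# so that at most 10 tasks in this pool hold slots at once.
sensor = FileSensor(
    task_id='wait_for_files',
    filepath='/data/input.csv',
    pool='file_sensors',
    poke_interval=60,
    mode='reschedule',
)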
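The effect of a pool can be pictured as a semaphore. In this stdlib sketch, `asyncio.Semaphore` stands in for an Airflow pool and `run_with_pool` is a hypothetical helper. It shows that no matter how many sensors are queued, at most the pool's slot count run at the same time.

```python
import asyncio


async def run_with_pool(n_tasks: int, pool_slots: int) -> int:
    """Run n_tasks short simulated pokes through a pool; return peak concurrency."""
    pool = asyncio.Semaphore(pool_slots)  # stands in for an Airflow pool
    running = 0
    peak = 0

    async def poke() -> None:
        nonlocal running, peak
        async with pool:                  # wait for a free slot
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)     # simulated poke work
            running -= 1

    await asyncio.gather(*(poke() for _ in range(n_tasks)))
    return peak


if __name__ == "__main__":
    print(asyncio.run(run_with_pool(50, 10)))  # prints: 10
```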

6. Testing

Test sensors in isolation with mock external dependencies. Verify timeout behavior and error handling. Test different modes.
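One way to follow this advice is to keep the condition logic in a plain function that the sensor's poke() calls, then test it against a mocked HTTP session. `api_is_ready` is hypothetical, and the session only needs a requests-like get() method. The example catches the built-in ConnectionError for the mock. With a real requests session you would catch requests.RequestException instead.

```python
from unittest import mock


def api_is_ready(session, url: str) -> bool:
    """Poke-style check: True once the endpoint answers 200 with status 'healthy'."""
    try:
        response = session.get(url, timeout=10)
    except ConnectionError:
        return False  # treat network failures as "not ready yet"
    if response.status_code != 200:
        return False
    return response.json().get("status") == "healthy"


def test_api_is_ready():
    session = mock.Mock()
    session.get.return_value.status_code = 200
    session.get.return_value.json.return_value = {"status": "healthy"}
    assert api_is_ready(session, "http://api/health") is True

    session.get.return_value.json.return_value = {"status": "starting"}
    assert api_is_ready(session, "http://api/health") is False

    session.get.return_value.status_code = 503
    assert api_is_ready(session, "http://api/health") is False

    session.get.side_effect = ConnectionError("down")
    assert api_is_ready(session, "http://api/health") is False


test_api_is_ready()
```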

7. Monitoring

Track sensor performance metrics. Monitor poke intervals, success rates, and resource usage. Set up alerts for anomalies.
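As a sketch of alerting, the snippet below compares a batch of sensor outcomes against the timeout-rate and error-rate thresholds from the Sensor Performance Characteristics table. The outcome encoding (one of 'success', 'timeout' or 'error' per sensor run) is a simplifying assumption. In practice these values would come from task-instance history.

```python
from collections import Counter

# Warning thresholds from the "Sensor Performance Characteristics" table
TIMEOUT_RATE_LIMIT = 0.10
ERROR_RATE_LIMIT = 0.05


def sensor_alerts(outcomes: list[str]) -> list[str]:
    """Return alert messages for a batch of sensor run outcomes."""
    if not outcomes:
        return []
    counts = Counter(outcomes)
    total = len(outcomes)
    timeout_rate = counts["timeout"] / total
    error_rate = counts["error"] / total
    alerts = []
    if timeout_rate > TIMEOUT_RATE_LIMIT:
        alerts.append(f"timeout rate {timeout_rate:.0%} exceeds {TIMEOUT_RATE_LIMIT:.0%}")
    if error_rate > ERROR_RATE_LIMIT:
        alerts.append(f"error rate {error_rate:.0%} exceeds {ERROR_RATE_LIMIT:.0%}")
    return alerts


print(sensor_alerts(["success"] * 7 + ["timeout"] * 2 + ["error"]))
```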

8. Documentation

Document sensor dependencies and expected behavior. Include timeout and retry configurations. Provide troubleshooting guidance.

9. Maintenance

Regularly review and update sensor configurations. Remove obsolete sensors. Update timeout values based on changing external systems.

10. Security

Implement proper authentication for external system access. Use secrets management for credentials. Validate input parameters.

Key Takeaways:

  • Sensors poll conditions at interval $\Delta t_{\text{poke}}$ until success or timeout $\tau$
  • Exponential backoff: $\Delta t_i = \min(\Delta t_0 \cdot m^i, \Delta t_{\max})$
  • Timeout guarantee: in poke mode a sensor resolves within $T_{\max} = \tau + \Delta t_{\text{poke}} + t_{\text{poke}}$
  • Poke mode holds a worker slot; reschedule mode releases it between pokes; deferred mode hands the wait to the triggerer
  • Worker slot utilization in poke mode: $U = T_{\text{wait}} / (T_{\text{wait}} + T_{\text{task}})$
  • Use ExternalTaskSensor for cross-DAG dependencies with execution date matching

See also: Kafka Connect (kafka/03), PySpark Submit (pyspark/19), Data Engineering Orchestration (data-engineering/017)
