πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Apache Airflow Operators and Hooks

🟒 Free Lesson

Advertisement

Apache Airflow Operators and Hooks

Operator Hierarchy TreeBaseOperatortask_id, owner, retriesActionPython, BashTransferS3ToRedshiftSensorFile, HTTP, SQLExternalSlack, JiraCustomUser-definedBaseOperator β†’ Action | Transfer | Sensor | External | Custom
Hook Architecture: Operator β†’ Hook β†’ Connection β†’ ClientOperatorHookConnectionClient SDKExternal APIOperator uses Hook β†’ Hook reads Connection from Metadata DB β†’ Creates Client

Architecture Diagram

Formal Definitions

DfOperator

An operator is an atomic unit of work in Airflow. Formally, an operator O=(tid,execute,params)O = (t_{\text{id}}, \text{execute}, \text{params}) consists of a unique task identifier, an execution function, and configuration parameters. The execute(context) method performs the actual computation.

DfHook

A hook manages connection details and client creation for external systems. A hook H=(C,client,methods)H = (C, \text{client}, \text{methods}) encapsulates a connection object CC, a client instance, and methods for interacting with the external service. Hooks handle retries, authentication, and connection pooling.

DfSensor

A sensor is a specialized operator that polls an external condition at intervals Ξ”tpoke\Delta t_{\text{poke}} until the condition is met or a timeout Ο„timeout\tau_{\text{timeout}} is reached. It returns True when satisfied, False otherwise.

Detailed Explanation

Built-in Operators

Airflow provides a rich set of built-in operators for common tasks.


Operator Categories

CategoryOperatorsDescription
ActionPythonOperator, BashOperator, EmailOperator, DummyOperatorExecute code/commands
TransferS3ToRedshiftOperator, GCSToLocalOperator, PostgresToGCSOperatorMove data between systems
SensorsFileSensor, HttpSensor, SqlSensorPoll for conditions
External ServicesSlackWebhookOperator, JiraOperator, KubernetesPodOperatorAPI integrations

Hooks

Hooks are low-level building blocks for external system interaction.


Connection Flow:

  1. Operator creates Hook with conn_id
  2. Hook retrieves Connection from metadata DB
  3. Hook creates authenticated client
  4. Client interacts with external service

Common Hooks:

HookServiceKey Methods
S3HookAWS S3upload_data, download_file, list_keys
PostgresHookPostgreSQLget_conn, run, get_records
HttpHookREST APIsrun, get_conn
SlackWebhookHookSlacksend_text

Custom Hooks: Inherit from BaseHook, implement connection logic for internal services.


Custom Operator Development

A well-designed operator should be idempotent, handle errors gracefully, and provide clear documentation.

Operator Retry Backoff
wn=bβ‹…2nforΒ attemptΒ n=0,1,2,…,Nmax⁑w_n = b \cdot 2^n \quad \text{for attempt } n = 0, 1, 2, \ldots, N_{\max}

Here,

  • bb=Base delay (seconds)
  • nn=Attempt number (0-indexed)
  • Nmax⁑N_{\max}=Maximum retry count

Total Retry Window

Wtotal=βˆ‘n=0Nmax⁑bβ‹…2n=b(2Nmax⁑+1βˆ’1)W_{\text{total}} = \sum_{n=0}^{N_{\max}} b \cdot 2^n = b(2^{N_{\max}+1} - 1)

Here,

  • WtotalW_{\text{total}}=Maximum total time spent retrying

ThIdempotency for Safe Retries

An operator OO is retry-safe if and only if it is idempotent: O(O(x))=O(x)O(O(x)) = O(x). Without idempotency, retries may cause data corruption, duplicate side effects, or inconsistent state. Implementation: Use unique identifiers, upsert operations, and atomic transactions to achieve idempotency.

Hooks manage connection pooling automatically. For high-throughput scenarios, configure pool_size and max_overflow in the hook's connection parameters. This prevents connection exhaustion under concurrent task execution.

When implementing custom operators, always set template_fields for parameters that support Jinja templating. This enables runtime parameter injection and makes operators more flexible across different DAG configurations.

Operator Structure: Custom operators inherit from BaseOperator and implement the execute method. The constructor accepts configuration parameters, and the execute method performs the actual work. Use template_fields to enable Jinja templating for parameters.

Serialization: For dynamic DAGs, implement serialize and serialize_downstream methods to enable proper serialization of operator arguments. This ensures that the operator can be properly reconstructed in different contexts.

Testing: Write unit tests for custom operators using Airflow's test utilities. Test both success and failure scenarios, and verify that the operator behaves correctly with different input parameters.

Connection and Hook Security

Security Best Practices:

PracticeDescription
Credential ManagementNever hardcode credentials β€” use connections or environment variables
Secret BackendsImplement HashiCorp Vault, AWS Secrets Manager for production
Connection EncryptionUse HTTPS, SSL/TLS protocols
Access ControlApply least-privilege principles for hook access

Key Concepts Table

ComponentPurposeExampleUse Case
BaseOperatorBase class for all operatorsCustom operatorsExtending functionality
PythonOperatorExecute Python functionsData processingCustom logic execution
BashOperatorExecute shell commandsSystem operationsScript execution
SensorsWait for conditionsFile availabilityExternal dependency
Transfer OperatorsMove data between systemsS3 to RedshiftData movement
HooksConnect to external servicesS3HookService integration
ConnectionsStore credentialsDatabase connectionsSecure access
TemplatesDynamic parameters{{ ds }}Runtime configuration

Operator Parameter Reference

# BaseOperator parameters explained
from datetime import timedelta

task = PythonOperator(
    # Required parameters
    task_id='my_task',                    # Unique identifier within DAG
    python_callable=my_function,          # Function to execute

    # Execution parameters
    op_kwargs={'key': 'value'},           # Keyword arguments to callable
    op_args=[1, 2, 3],                   # Positional arguments to callable
    templates_dict={'key': '{{ ds }}'},  # Template-enabled dict

    # Retry configuration
    retries=3,                           # Number of retry attempts
    retry_delay=timedelta(minutes=5),    # Delay between retries
    retry_exponential_backoff=True,      # Exponential backoff
    max_retry_delay=timedelta(hours=1),  # Maximum retry delay

    # Execution control
    execution_timeout=timedelta(hours=2),# Maximum execution time
    pool='my_pool',                      # Pool for resource management
    priority_weight=10,                  # Priority in queue (1-1000)

    # Resource requirements
    resources={'cpu': 2, 'memory': 4096},  # CPU cores, Memory in MB

    # Callbacks
    on_failure_callback=handle_failure,   # Called on task failure
    on_success_callback=handle_success,  # Called on task success
    on_retry_callback=handle_retry,      # Called on retry

    # Dependencies
    depends_on_past=False,               # Wait for previous run
    wait_for_downstream=False,           # Wait for downstream tasks

    # Other
    owner='data-team',                   # Task owner
    weight_rule='absolute',              # Weight rule for priority
    dag=my_dag,                          # DAG object (optional in context)
)

Code Examples

Custom Operator with Advanced Features

# custom_operator.py
from typing import Any, Dict, Optional
from datetime import datetime
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.exceptions import AirflowException
import requests
import json

class DataValidationOperator(BaseOperator):
    """
    Custom operator for data validation with multiple validation rules.

    This operator validates data against defined rules and generates
    validation reports.

    :param data_source: Source of data to validate
    :param validation_rules: Dictionary of validation rules
    :param report_destination: Where to store validation report
    :param alert_on_failure: Whether to send alerts on validation failure
    """

    # Fields that support Jinja templating
    template_fields = ('data_source', 'report_destination')
    ui_color = '#4CAF50'
    ui_fgcolor = '#FFFFFF'

    @apply_defaults
    def __init__(
        self,
        data_source: str,
        validation_rules: Dict[str, Any],
        report_destination: str = '/tmp/validation_report.json',
        alert_on_failure: bool = True,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.data_source = data_source
        self.validation_rules = validation_rules
        self.report_destination = report_destination
        self.alert_on_failure = alert_on_failure

    def execute(self, context: Dict[str, Any]) -> Any:
        """Execute the validation logic."""
        self.log.info(f"Starting validation for data source: {self.data_source}")

        try:
            # Load data
            data = self._load_data()

            # Run validation rules
            validation_results = self._validate_data(data)

            # Generate report
            report = self._generate_report(validation_results)

            # Store report
            self._store_report(report)

            # Check if validation passed
            if not validation_results['passed']:
                if self.alert_on_failure:
                    self._send_alert(validation_results)
                raise AirflowException(
                    f"Data validation failed: {validation_results['summary']}"
                )

            self.log.info("Data validation completed successfully")
            return report

        except Exception as e:
            self.log.error(f"Validation failed with error: {str(e)}")
            raise

    def _load_data(self) -> Any:
        """Load data from the source."""
        # Implementation depends on data source type
        if self.data_source.startswith('s3://'):
            from airflow.providers.amazon.aws.hooks.s3 import S3Hook
            hook = S3Hook(aws_conn_id='aws_default')
            # Load from S3
        elif self.data_source.startswith('gs://'):
            from airflow.providers.google.cloud.hooks.gcs import GCSHook
            hook = GCSHook(gcp_conn_id='google_cloud_default')
            # Load from GCS
        else:
            # Load from local file
            with open(self.data_source, 'r') as f:
                return json.load(f)

    def _validate_data(self, data: Any) -> Dict[str, Any]:
        """Run validation rules against the data."""
        results = {
            'passed': True,
            'rules_checked': 0,
            'rules_passed': 0,
            'rules_failed': 0,
            'failed_rules': [],
            'summary': '',
        }

        for rule_name, rule_config in self.validation_rules.items():
            results['rules_checked'] += 1

            try:
                rule_passed = self._apply_rule(data, rule_config)
                if rule_passed:
                    results['rules_passed'] += 1
                else:
                    results['rules_failed'] += 1
                    results['passed'] = False
                    results['failed_rules'].append({
                        'rule': rule_name,
                        'message': rule_config.get('failure_message', 'Rule failed'),
                    })
            except Exception as e:
                results['rules_failed'] += 1
                results['passed'] = False
                results['failed_rules'].append({
                    'rule': rule_name,
                    'error': str(e),
                })

        results['summary'] = (
            f"{results['rules_passed']}/{results['rules_checked']} rules passed"
        )

        return results

    def _apply_rule(self, data: Any, rule_config: Dict[str, Any]) -> bool:
        """Apply a single validation rule."""
        rule_type = rule_config.get('type')

        if rule_type == 'not_null':
            return data is not None
        elif rule_type == 'range':
            min_val = rule_config.get('min')
            max_val = rule_config.get('max')
            return min_val <= data <= max_val
        elif rule_type == 'regex':
            import re
            pattern = rule_config.get('pattern')
            return bool(re.match(pattern, str(data)))
        elif rule_type == 'custom':
            # Custom validation logic
            custom_func = rule_config.get('function')
            return custom_func(data)
        else:
            raise ValueError(f"Unknown rule type: {rule_type}")

    def _generate_report(self, validation_results: Dict[str, Any]) -> Dict[str, Any]:
        """Generate a detailed validation report."""
        return {
            'timestamp': datetime.now().isoformat(),
            'data_source': self.data_source,
            'results': validation_results,
            'metadata': {
                'operator': self.__class__.__name__,
                'task_id': self.task_id,
            },
        }

    def _store_report(self, report: Dict[str, Any]) -> None:
        """Store the validation report."""
        with open(self.report_destination, 'w') as f:
            json.dump(report, f, indent=2)

    def _send_alert(self, validation_results: Dict[str, Any]) -> None:
        """Send alert about validation failure."""
        from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook

        hook = SlackWebhookHook(slack_webhook_conn_id='slack_webhook')
        message = (
            f"Data Validation Failed\n"
            f"Source: {self.data_source}\n"
            f"Summary: {validation_results['summary']}\n"
            f"Failed Rules: {len(validation_results['failed_rules'])}"
        )
        hook.send(message)

    def on_kill(self) -> None:
        """Handle operator termination."""
        self.log.info("Operator was killed")

Custom Hook Implementation

# custom_hook.py
from typing import Any, Dict, Optional
from airflow.hooks.base import BaseHook
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class CustomAPIHook(BaseHook):
    """
    Custom hook for interacting with external APIs.

    This hook provides a standardized interface for API calls with
    retry logic, authentication, and error handling.

    :param api_conn_id: Airflow connection ID for API credentials
    :param api_base_url: Base URL for API endpoints
    :param timeout: Request timeout in seconds
    :param max_retries: Maximum number of retry attempts
    """

    conn_name_attr = 'api_conn_id'
    default_conn_name = 'custom_api_default'
    conn_type = 'custom_api'
    hook_name = 'Custom API'

    def __init__(
        self,
        api_conn_id: str = default_conn_name,
        api_base_url: Optional[str] = None,
        timeout: int = 30,
        max_retries: int = 3,
    ):
        super().__init__()
        self.api_conn_id = api_conn_id
        self.api_base_url = api_base_url or self._get_base_url()
        self.timeout = timeout
        self.max_retries = max_retries
        self._session = None

    def get_conn(self) -> requests.Session:
        """Get a requests session with retry logic."""
        if self._session is None:
            self._session = requests.Session()

            # Configure retry strategy
            retry_strategy = Retry(
                total=self.max_retries,
                backoff_factor=1,
                status_forcelist=[429, 500, 502, 503, 504],
            )

            adapter = HTTPAdapter(max_retries=retry_strategy)
            self._session.mount("http://", adapter)
            self._session.mount("https://", adapter)

            # Set authentication headers
            headers = self._get_auth_headers()
            self._session.headers.update(headers)

        return self._session

    def _get_base_url(self) -> str:
        """Get base URL from connection."""
        conn = self.get_connection(self.api_conn_id)
        return f"{conn.schema}://{conn.host}:{conn.port}"

    def _get_auth_headers(self) -> Dict[str, str]:
        """Get authentication headers from connection."""
        conn = self.get_connection(self.api_conn_id)
        password = conn.get_password()

        return {
            'Authorization': f'Bearer {password}',
            'Content-Type': 'application/json',
        }

    def make_request(
        self,
        method: str,
        endpoint: str,
        data: Optional[Dict[str, Any]] = None,
        params: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """
        Make an API request.

        :param method: HTTP method (GET, POST, PUT, DELETE)
        :param endpoint: API endpoint
        :param data: Request body data
        :param params: Query parameters
        :return: API response
        """
        session = self.get_conn()
        url = f"{self.api_base_url}{endpoint}"

        try:
            response = session.request(
                method=method,
                url=url,
                json=data,
                params=params,
                timeout=self.timeout,
            )
            response.raise_for_status()

            return response.json()

        except requests.exceptions.RequestException as e:
            self.log.error(f"API request failed: {str(e)}")
            raise

    def get(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Make a GET request."""
        return self.make_request('GET', endpoint, params=params)

    def post(self, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Make a POST request."""
        return self.make_request('POST', endpoint, data=data)

    def put(self, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Make a PUT request."""
        return self.make_request('PUT', endpoint, data=data)

    def delete(self, endpoint: str) -> Dict[str, Any]:
        """Make a DELETE request."""
        return self.make_request('DELETE', endpoint)

    def test_connection(self) -> bool:
        """Test the connection to the API."""
        try:
            response = self.get('/health')
            return response.get('status') == 'ok'
        except Exception:
            return False

Operator with Hook Integration

# operator_with_hook.py
from typing import Any, Dict, Optional
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from datetime import datetime

class DataSynchronizationOperator(BaseOperator):
    """
    Operator for synchronizing data between systems using hooks.

    This operator demonstrates how to create operators that integrate
    with multiple hooks for complex data operations.

    :param source_system: Source system identifier
    :param target_system: Target system identifier
    :param sync_config: Synchronization configuration
    :param batch_size: Number of records to process in each batch
    """

    template_fields = ('source_system', 'target_system', 'sync_config')

    @apply_defaults
    def __init__(
        self,
        source_system: str,
        target_system: str,
        sync_config: Dict[str, Any],
        batch_size: int = 1000,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.source_system = source_system
        self.target_system = target_system
        self.sync_config = sync_config
        self.batch_size = batch_size

    def execute(self, context: Dict[str, Any]) -> Any:
        """Execute the data synchronization."""
        self.log.info(
            f"Starting sync from {self.source_system} to {self.target_system}"
        )

        # Initialize hooks
        source_hook = self._get_source_hook()
        target_hook = self._get_target_hook()

        try:
            # Extract data from source
            source_data = self._extract_data(source_hook)

            # Transform data if needed
            transformed_data = self._transform_data(source_data)

            # Load data to target
            sync_results = self._load_data(target_hook, transformed_data)

            # Validate synchronization
            self._validate_sync(source_hook, target_hook, sync_results)

            self.log.info("Data synchronization completed successfully")
            return sync_results

        except Exception as e:
            self.log.error(f"Synchronization failed: {str(e)}")
            raise

    def _get_source_hook(self) -> Any:
        """Get appropriate hook for source system."""
        if self.source_system == 'postgresql':
            from airflow.providers.postgres.hooks.postgres import PostgresHook
            return PostgresHook(postgres_conn_id='source_postgres')
        elif self.source_system == 'mysql':
            from airflow.providers.mysql.hooks.mysql import MySqlHook
            return MySqlHook(mysql_conn_id='source_mysql')
        elif self.source_system == 's3':
            from airflow.providers.amazon.aws.hooks.s3 import S3Hook
            return S3Hook(aws_conn_id='source_aws')
        else:
            raise ValueError(f"Unsupported source system: {self.source_system}")

    def _get_target_hook(self) -> Any:
        """Get appropriate hook for target system."""
        if self.target_system == 'postgresql':
            from airflow.providers.postgres.hooks.postgres import PostgresHook
            return PostgresHook(postgres_conn_id='target_postgres')
        elif self.target_system == 'redshift':
            from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook
            return RedshiftSQLHook(redshift_conn_id='target_redshift')
        elif self.target_system == 'gcs':
            from airflow.providers.google.cloud.hooks.gcs import GCSHook
            return GCSHook(gcp_conn_id='target_gcs')
        else:
            raise ValueError(f"Unsupported target system: {self.target_system}")

    def _extract_data(self, source_hook: Any) -> Any:
        """Extract data from source system."""
        query = self.sync_config.get('extract_query')

        if hasattr(source_hook, 'get_records'):
            return source_hook.get_records(query)
        elif hasattr(source_hook, 'read_key'):
            return source_hook.read_key(self.sync_config.get('source_key'))
        else:
            raise ValueError("Source hook does not support data extraction")

    def _transform_data(self, data: Any) -> Any:
        """Transform data if needed."""
        transformations = self.sync_config.get('transformations', [])

        for transformation in transformations:
            if transformation['type'] == 'filter':
                data = [row for row in data if self._apply_filter(row, transformation)]
            elif transformation['type'] == 'map':
                data = [self._apply_mapping(row, transformation) for row in data]
            elif transformation['type'] == 'aggregate':
                data = self._apply_aggregation(data, transformation)

        return data

    def _load_data(self, target_hook: Any, data: Any) -> Dict[str, Any]:
        """Load data to target system."""
        results = {'records_loaded': 0, 'errors': []}

        # Process in batches
        for i in range(0, len(data), self.batch_size):
            batch = data[i:i + self.batch_size]

            try:
                if hasattr(target_hook, 'insert_rows'):
                    target_hook.insert_rows(
                        table=self.sync_config.get('target_table'),
                        rows=batch,
                    )
                elif hasattr(target_hook, 'upload'):
                    target_hook.upload(
                        bucket_name=self.sync_config.get('target_bucket'),
                        object_name=f"batch_{i}.json",
                        data=str(batch),
                    )

                results['records_loaded'] += len(batch)

            except Exception as e:
                results['errors'].append({
                    'batch_start': i,
                    'error': str(e),
                })

        return results

    def _validate_sync(
        self,
        source_hook: Any,
        target_hook: Any,
        sync_results: Dict[str, Any]
    ) -> None:
        """Validate that synchronization was successful."""
        if sync_results['errors']:
            raise ValueError(
                f"Sync completed with errors: {sync_results['errors']}"
            )

        # Additional validation logic can be added here
        self.log.info(
            f"Sync validation passed: {sync_results['records_loaded']} records loaded"
        )

    def _apply_filter(self, row: Any, transformation: Dict[str, Any]) -> bool:
        """Apply filter transformation."""
        column = transformation.get('column')
        operator = transformation.get('operator')
        value = transformation.get('value')

        if operator == 'eq':
            return row[column] == value
        elif operator == 'gt':
            return row[column] > value
        elif operator == 'lt':
            return row[column] < value
        elif operator == 'contains':
            return value in str(row[column])
        else:
            return True

    def _apply_mapping(self, row: Any, transformation: Dict[str, Any]) -> Dict[str, Any]:
        """Apply mapping transformation."""
        mapping = transformation.get('mapping', {})
        return {mapping.get(k, k): v for k, v in row.items()}

    def _apply_aggregation(self, data: Any, transformation: Dict[str, Any]) -> Any:
        """Apply aggregation transformation."""
        group_by = transformation.get('group_by')
        aggregate_column = transformation.get('aggregate_column')
        aggregate_func = transformation.get('aggregate_function', 'sum')

        groups = {}
        for row in data:
            key = row[group_by]
            if key not in groups:
                groups[key] = []
            groups[key].append(row[aggregate_column])

        result = []
        for key, values in groups.items():
            if aggregate_func == 'sum':
                aggregated = sum(values)
            elif aggregate_func == 'avg':
                aggregated = sum(values) / len(values)
            elif aggregate_func == 'count':
                aggregated = len(values)
            else:
                aggregated = values

            result.append({group_by: key, aggregate_column: aggregated})

        return result

Performance Metrics

Operator Performance

MetricDescriptionOptimization StrategyWarning Threshold
Operator Execution TimeTime to complete operator executionOptimize external calls, use batching> 1 hour
Hook Connection TimeTime to establish connectionConnection pooling, caching> 30 seconds
API Response TimeExternal API response timeAsync operations, retry logic> 10 seconds
Data Transfer RateAmount of data movedCompression, streaming, batching< 1 MB/s
Error RatePercentage of failed operationsRetry logic, circuit breakers> 5%
Memory UsageOperator memory footprintStreaming processing, pagination> 1 GB
CPU UtilizationProcessing power usageParallel execution, optimization> 90%
Connection Pool SizeActive connectionsPool tuning, monitoringPool exhausted

Hook Connection Patterns

Operator Execution Timing

# Measure operator execution time
import time
from functools import wraps

def timing_operator(func):
    """Decorator to measure operator execution time."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        duration = time.time() - start
        print(f"Operator executed in {duration:.2f} seconds")
        return result
    return wrapper

@timing_operator
def my_operator_logic(**context):
    """Example operator with timing."""
    # Perform work
    time.sleep(2)
    return {"status": "success"}

Best Practices

1. Operator Design

Keep operators focused on single responsibilities. Implement idempotency to handle retries safely. Use template fields for dynamic parameters.

# Good: Single responsibility
class ExtractDataOperator(BaseOperator):
    """Extracts data from source system."""
    template_fields = ('source_path', 'extraction_query')

# Bad: Multiple responsibilities
class ETLCompleteOperator(BaseOperator):
    """Does everything - hard to retry, debug, reuse."""
    pass

2. Hook Implementation

Follow the hook pattern for connection management. Implement proper error handling and retry logic. Use connection pooling for high-throughput scenarios.

class CustomHook(BaseHook):
    """Hook for custom API integration."""

    # Required class attributes
    conn_name_attr = 'custom_conn_id'
    default_conn_name = 'custom_default'
    conn_type = 'custom'
    hook_name = 'Custom API'

    def __init__(self, custom_conn_id=None):
        super().__init__()
        self.custom_conn_id = custom_conn_id or self.default_conn_name

    def get_conn(self):
        """Get API client with connection pooling."""
        conn = self.get_connection(self.custom_conn_id)
        # Return configured client
        return client

3. Error Handling

Implement comprehensive error handling with meaningful error messages. Use Airflow's exception classes for proper error propagation.

from airflow.exceptions import (
    AirflowException,
    AirflowFailException,      # Marks task as failed without retry
    AirflowSkipException,      # Skips the task
    AirflowSensorTimeout,      # Sensor timeout
    AirflowTaskTimeout,        # Task execution timeout
)

def execute(self, context):
    try:
        result = self._do_work()
    except ConnectionError as e:
        # Retryable error
        raise AirflowException(f"Connection failed: {e}")
    except ValidationError as e:
        # Non-retryable error
        raise AirflowFailException(f"Invalid input: {e}")
    except DataNotFound:
        # Skip task
        raise AirflowSkipException("No data to process")

4. Testing

Write unit tests for operators and hooks. Mock external dependencies to ensure test isolation. Test both success and failure scenarios.

import pytest
from unittest.mock import Mock, patch
from airflow.models import DagBag

class TestMyOperator:
    """Unit tests for custom operator."""

    def test_execute_success(self):
        """Test successful execution."""
        operator = MyOperator(
            task_id='test',
            source='test_source',
        )
        context = {'ds': '2024-01-01'}

        with patch('my_hook.MyHook') as mock_hook:
            mock_hook.return_value.get_data.return_value = ['data']
            result = operator.execute(context)

        assert result == {'status': 'success'}

    def test_execute_failure(self):
        """Test failure handling."""
        operator = MyOperator(
            task_id='test',
            source='test_source',
        )
        context = {'ds': '2024-01-01'}

        with patch('my_hook.MyHook') as mock_hook:
            mock_hook.return_value.get_data.side_effect = ConnectionError
            with pytest.raises(AirflowException):
                operator.execute(context)

5. Documentation

Provide clear documentation for custom operators and hooks. Include usage examples and parameter descriptions. Document any external dependencies.

class DataQualityOperator(BaseOperator):
    """
    Operator for validating data quality against defined rules.

    This operator loads data from a source, applies validation rules,
    and generates a quality report. If validation fails, the task
    raises an exception.

    :param data_source: Path or URI to the data source
    :param validation_rules: Dict of rule_name -> rule_config
    :param report_path: Where to save the validation report
    :param fail_on_error: If True, raises exception on validation failure

    Example:
        >>> operator = DataQualityOperator(
        ...     task_id='validate_orders',
        ...     data_source='s3://bucket/orders.parquet',
        ...     validation_rules={
        ...         'not_null': {'columns': ['order_id', 'customer_id']},
        ...         'range': {'column': 'amount', 'min': 0, 'max': 10000}
        ...     },
        ...     report_path='/tmp/quality_report.json',
        ... )
    """

    template_fields = ('data_source', 'report_path')

6. Security

Never hardcode credentials. Use Airflow's connection system for credential management. Implement proper access controls for sensitive operations.

# Bad: Hardcoded credentials
hook = S3Hook(aws_access_key_id='AKIA...', aws_secret_access_key='...')

# Good: Use Airflow connections
hook = S3Hook(aws_conn_id='aws_default')

# Good: Use environment variables for sensitive config
import os
api_key = os.environ.get('API_KEY')

7. Performance

Optimize external calls to minimize latency. Use batching for bulk operations. Implement connection pooling for high-throughput scenarios.

8. Monitoring

Add logging for important operations. Implement custom metrics for monitoring. Use Airflow's callback system for alerting.

9. Maintainability

Write clean, readable code with proper separation of concerns. Follow Python coding standards. Use type hints for better code clarity.

10. Reusability

Design operators and hooks for reuse across multiple DAGs. Use configuration to customize behavior. Implement proper abstraction layers.

Key Takeaways:

  • Operators encapsulate unit-of-work logic; hooks manage external connections
  • Idempotency (O(O(x))=O(x)O(O(x)) = O(x)) is required for safe retries
  • Total retry window grows exponentially: W=b(2Nmax⁑+1βˆ’1)W = b(2^{N_{\max}+1} - 1)
  • Hooks abstract connection details, authentication, and client creation
  • Custom operators must implement execute() and define template_fields
  • Never hardcode credentials; use Airflow's connection system

See also: Kafka Connect (kafka/03), PySpark Submit (pyspark/19), Data Engineering Orchestration (data-engineering/017)

See Also

⭐

Premium Content

Apache Airflow Operators and Hooks

Unlock this lesson and 900+ advanced tutorials with a Premium plan.

🎯End-to-end Projects
πŸ’ΌInterview Prep
πŸ“œCertificates
🀝Community Access

Already a member? Log in

Need Expert Airflow Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement