Apache Airflow Operators and Hooks
Architecture Diagram
Formal Definitions
Definition: Operator
An operator is an atomic unit of work in Airflow. Formally, an operator consists of a unique task identifier, an execution function, and configuration parameters. The execute(context) method performs the actual computation.
Definition: Hook
A hook manages connection details and client creation for external systems. A hook encapsulates a connection object, a client instance, and methods for interacting with the external service. Hooks typically handle authentication and may add retries or connection reuse on top of the underlying client.
Definition: Sensor
A sensor is a specialized operator that polls an external condition at intervals until the condition is met or a timeout is reached. Its poke method returns True when the condition is satisfied and False otherwise; if the timeout elapses first, the sensor task fails.
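The polling behaviour in the sensor definition can be sketched without Airflow. The function name `poll_until` and its parameters are illustrative assumptions, not part of Airflow's API; real sensors subclass `BaseSensorOperator`, implement `poke(context)`, and raise `AirflowSensorTimeout` on timeout instead of returning False.

```python
import time
from typing import Callable


def poll_until(
    condition: Callable[[], bool],
    poke_interval: float,
    timeout: float,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Poll `condition` until it is true or `timeout` seconds have elapsed."""
    started = clock()
    while True:
        if condition():
            return True  # condition met
        if clock() - started >= timeout:
            return False  # gave up; a real sensor would fail the task here
        sleep(poke_interval)
```

Injecting `sleep` and `clock` keeps the loop testable without real waiting, which is also a convenient pattern when unit-testing custom sensors.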
Detailed Explanation
Built-in Operators
Airflow provides a rich set of built-in operators for common tasks.
Operator Categories
| Category | Operators | Description |
|---|---|---|
| Action | PythonOperator, BashOperator, EmailOperator, EmptyOperator (formerly DummyOperator) | Execute code/commands |
| Transfer | S3ToRedshiftOperator, GCSToLocalOperator, PostgresToGCSOperator | Move data between systems |
| Sensors | FileSensor, HttpSensor, SqlSensor | Poll for conditions |
| External Services | SlackWebhookOperator, JiraOperator, KubernetesPodOperator | API integrations |
Hooks
Hooks are low-level building blocks for external system interaction.
Connection Flow:
- Operator creates Hook with conn_id
- Hook retrieves Connection from metadata DB
- Hook creates authenticated client
- Client interacts with external service
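The four steps of the connection flow can be mimicked with plain Python objects. Everything in this sketch (`Connection`, `CONNECTIONS`, `ToyClient`, `ToyHook`, `run_task`) is a hypothetical stand-in chosen for illustration; a real hook would call `BaseHook.get_connection(conn_id)` instead of reading a dictionary.

```python
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass(frozen=True)
class Connection:
    """Hypothetical stand-in for a stored Airflow connection."""
    host: str
    login: str
    password: str


# Hypothetical in-memory replacement for the metadata database.
CONNECTIONS: Dict[str, Connection] = {
    "my_service": Connection(host="api.example.com", login="svc", password="s3cret"),
}


class ToyClient:
    """Pretend client that remembers the credentials it was built with."""

    def __init__(self, host: str, auth: Tuple[str, str]) -> None:
        self.host = host
        self.auth = auth

    def ping(self) -> str:
        return f"pong from {self.host}"


class ToyHook:
    """Looks up the connection, then builds an authenticated client."""

    def __init__(self, conn_id: str) -> None:
        self.conn_id = conn_id

    def get_conn(self) -> ToyClient:
        conn = CONNECTIONS[self.conn_id]  # retrieve the stored connection
        return ToyClient(conn.host, (conn.login, conn.password))


def run_task(conn_id: str) -> str:
    """The operator side: create the hook, then use its client."""
    client = ToyHook(conn_id).get_conn()
    return client.ping()
```

The operator only ever sees the `conn_id`; credentials stay inside the hook and the connection store.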
Common Hooks:
| Hook | Service | Key Methods |
|---|---|---|
| S3Hook | AWS S3 | load_file, download_file, list_keys |
| PostgresHook | PostgreSQL | get_conn, run, get_records |
| HttpHook | REST APIs | run, get_conn |
| SlackWebhookHook | Slack | send_text |
Custom Hooks: Inherit from BaseHook, implement connection logic for internal services.
Custom Operator Development
A well-designed operator should be idempotent, handle errors gracefully, and provide clear documentation.
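With exponential backoff (ignoring jitter and any max_retry_delay cap), the delay before retry attempt $n$ is

$$d_n = d_0 \cdot 2^{n}, \qquad 0 \le n < N$$

Here,
- $d_0$ = Base delay (seconds)
- $n$ = Attempt number (0-indexed)
- $N$ = Maximum retry count

Total Retry Window

$$T_{\max} = \sum_{n=0}^{N-1} d_0 \cdot 2^{n} = d_0 \left(2^{N} - 1\right)$$

Here,
- $T_{\max}$ = Maximum total time spent retrying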
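As a worked example of an exponential backoff schedule, the sketch below doubles the base delay on each attempt and sums the delays to get the total retry window. The helper `backoff_delays` and its `max_delay` cap are illustrative assumptions; Airflow's own schedule also applies jitter, so real delays will differ somewhat.

```python
from typing import List, Optional


def backoff_delays(
    base_delay: float,
    max_retries: int,
    max_delay: Optional[float] = None,
) -> List[float]:
    """Idealized delay before each retry: base_delay * 2**n for n = 0..max_retries-1."""
    delays = []
    for attempt in range(max_retries):
        delay = base_delay * (2 ** attempt)
        if max_delay is not None:
            delay = min(delay, max_delay)  # optional cap, like max_retry_delay
        delays.append(delay)
    return delays


# Base delay of 5 minutes (300 s) and 3 retries.
delays = backoff_delays(base_delay=300, max_retries=3)
total_window = sum(delays)
print(delays, total_window)  # [300, 600, 1200] 2100
```

The total of 2100 seconds equals 300 × (2³ − 1), which is the closed form of the geometric sum for an uncapped schedule.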
Theorem: Idempotency for Safe Retries
An operator $f$ is retry-safe if and only if it is idempotent: $f(f(x)) = f(x)$, that is, running it again on its own result leaves the state unchanged. Without idempotency, retries may cause data corruption, duplicate side effects, or inconsistent state. Implementation: Use unique identifiers, upsert operations, and atomic transactions to achieve idempotency.
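One way to make a load step safe to retry is to key each write on a unique identifier and use an upsert inside a transaction. The sketch below uses SQLite from the standard library; the table `daily_totals` and the function `load_daily_total` are hypothetical names chosen for illustration.

```python
import sqlite3


def load_daily_total(conn: sqlite3.Connection, run_date: str, total: float) -> None:
    """Write one row per run_date; a re-run replaces the row instead of adding another."""
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT OR REPLACE INTO daily_totals (run_date, total) VALUES (?, ?)",
            (run_date, total),
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_totals (run_date TEXT PRIMARY KEY, total REAL)")

load_daily_total(conn, "2024-01-01", 42.0)
load_daily_total(conn, "2024-01-01", 42.0)  # simulated retry of the same task run

rows = conn.execute("SELECT run_date, total FROM daily_totals").fetchall()
print(rows)  # [('2024-01-01', 42.0)]
```

A plain INSERT would have left two rows after the retry; keying on the run date is what makes the second execution a no-op in effect.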
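Hooks do not pool connections across tasks on their own: each task typically runs in its own process and opens its own client through get_conn. For SQLAlchemy-backed database hooks in high-throughput scenarios, pool_size and max_overflow can be passed as engine arguments to bound the connections a single task opens; to limit concurrency across tasks, use Airflow pools. Together these help prevent connection exhaustion under concurrent task execution.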
When implementing custom operators, always set template_fields for parameters that support Jinja templating. This enables runtime parameter injection and makes operators more flexible across different DAG configurations.
Operator Structure: Custom operators inherit from BaseOperator and implement the execute method. The constructor accepts configuration parameters, and the execute method performs the actual work. Use template_fields to enable Jinja templating for parameters.
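To illustrate what listing a parameter in template_fields means, here is a toy stand-in for the rendering step. It is not Jinja and not Airflow's implementation: `render_value`, `ToyOperator`, and the regex-based substitution are assumptions made to keep the sketch dependency-free.

```python
import re
from typing import Any, Dict, Tuple

# Matches simple placeholders such as {{ ds }}; real Jinja supports far more.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_value(value: str, context: Dict[str, Any]) -> str:
    """Replace each {{ name }} with context[name]; unknown names are left untouched."""
    return _PLACEHOLDER.sub(lambda m: str(context.get(m.group(1), m.group(0))), value)


class ToyOperator:
    """Hypothetical operator-like class; only fields in template_fields are rendered."""

    template_fields: Tuple[str, ...] = ("source_path",)

    def __init__(self, source_path: str, note: str) -> None:
        self.source_path = source_path
        self.note = note

    def render_template_fields(self, context: Dict[str, Any]) -> None:
        for name in self.template_fields:
            setattr(self, name, render_value(getattr(self, name), context))


op = ToyOperator(source_path="/data/{{ ds }}/orders.json", note="{{ ds }}")
op.render_template_fields({"ds": "2024-01-01"})
print(op.source_path)  # /data/2024-01-01/orders.json
print(op.note)         # {{ ds }}  (not listed in template_fields, so not rendered)
```

The second print shows the practical consequence of forgetting a field: the raw template string reaches execute unchanged.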
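Serialization: Airflow serializes DAGs before storing them in the metadata database, so a custom operator needs no dedicated serialization methods. Keep constructor arguments and templated field values JSON-serializable, and assign each argument to an instance attribute of the same name, so that the operator can be properly reconstructed in different contexts.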
Testing: Write unit tests for custom operators using Airflow's test utilities. Test both success and failure scenarios, and verify that the operator behaves correctly with different input parameters.
Connection and Hook Security
Security Best Practices:
| Practice | Description |
|---|---|
| Credential Management | Never hardcode credentials; use connections or environment variables |
| Secret Backends | Implement HashiCorp Vault, AWS Secrets Manager for production |
| Connection Encryption | Use HTTPS, SSL/TLS protocols |
| Access Control | Apply least-privilege principles for hook access |
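Airflow can read a connection from an environment variable named AIRFLOW_CONN_<CONN_ID> that holds a connection URI. The sketch below shows roughly what such a URI contains by parsing it with the standard library; `connection_from_env` is a hypothetical helper, and Airflow performs its own parsing internally.

```python
import os
from urllib.parse import unquote, urlsplit


def connection_from_env(conn_id: str) -> dict:
    """Parse AIRFLOW_CONN_<CONN_ID> (a URI) into its parts; raises KeyError if unset."""
    uri = os.environ[f"AIRFLOW_CONN_{conn_id.upper()}"]
    parts = urlsplit(uri)
    return {
        "conn_type": parts.scheme,
        "host": parts.hostname,
        "port": parts.port,
        "login": unquote(parts.username) if parts.username else None,
        "password": unquote(parts.password) if parts.password else None,
        "schema": parts.path.lstrip("/") or None,
    }


# Example only: in practice the deployment sets this variable, never the DAG code.
os.environ["AIRFLOW_CONN_MY_DB"] = "postgres://etl_user:p%40ss@db.example.com:5432/analytics"
conn = connection_from_env("my_db")
print(conn["host"], conn["port"], conn["login"])  # db.example.com 5432 etl_user
```

Special characters in the password must be percent-encoded in the URI, which is why the helper decodes them with `unquote`.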
Key Concepts Table
| Component | Purpose | Example | Use Case |
|---|---|---|---|
| BaseOperator | Base class for all operators | Custom operators | Extending functionality |
| PythonOperator | Execute Python functions | Data processing | Custom logic execution |
| BashOperator | Execute shell commands | System operations | Script execution |
| Sensors | Wait for conditions | File availability | External dependency |
| Transfer Operators | Move data between systems | S3 to Redshift | Data movement |
| Hooks | Connect to external services | S3Hook | Service integration |
| Connections | Store credentials | Database connections | Secure access |
| Templates | Dynamic parameters | {{ ds }} | Runtime configuration |
Operator Parameter Reference
# BaseOperator and PythonOperator parameters explained
from datetime import timedelta

from airflow.operators.python import PythonOperator  # Airflow 2.x import path

# my_function, the handle_* callbacks, and my_dag are assumed to be defined elsewhere.
task = PythonOperator(
    # Required parameters
    task_id='my_task',                       # Unique identifier within DAG
    python_callable=my_function,             # Function to execute
    # Execution parameters
    op_kwargs={'key': 'value'},              # Keyword arguments to callable
    op_args=[1, 2, 3],                       # Positional arguments to callable
    templates_dict={'key': '{{ ds }}'},      # Template-enabled dict
    # Retry configuration
    retries=3,                               # Number of retry attempts
    retry_delay=timedelta(minutes=5),        # Delay between retries
    retry_exponential_backoff=True,          # Exponential backoff
    max_retry_delay=timedelta(hours=1),      # Maximum retry delay
    # Execution control
    execution_timeout=timedelta(hours=2),    # Maximum execution time
    pool='my_pool',                          # Pool for resource management
    priority_weight=10,                      # Higher values are scheduled first
    # Resource requirements
    resources={'cpus': 2, 'ram': 4096},      # CPU cores, RAM in MB
    # Callbacks
    on_failure_callback=handle_failure,      # Called on task failure
    on_success_callback=handle_success,      # Called on task success
    on_retry_callback=handle_retry,          # Called on retry
    # Dependencies
    depends_on_past=False,                   # Require success of this task's previous run
    wait_for_downstream=False,               # Also wait for the previous run's downstream tasks
    # Other
    owner='data-team',                       # Task owner
    weight_rule='absolute',                  # Weight rule for priority
    dag=my_dag,                              # DAG object (optional in context)
)
Code Examples
Custom Operator with Advanced Features
# custom_operator.py
import json
from datetime import datetime
from typing import Any, Dict

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator


class DataValidationOperator(BaseOperator):
    """
    Custom operator for data validation with multiple validation rules.

    This operator validates data against defined rules and generates
    validation reports.

    :param data_source: Source of data to validate
    :param validation_rules: Dictionary of validation rules
    :param report_destination: Where to store validation report
    :param alert_on_failure: Whether to send alerts on validation failure
    """

    # Fields that support Jinja templating
    template_fields = ('data_source', 'report_destination')
    ui_color = '#4CAF50'
    ui_fgcolor = '#FFFFFF'

    # apply_defaults is no longer needed in Airflow 2; BaseOperator applies default_args itself
    def __init__(
        self,
        data_source: str,
        validation_rules: Dict[str, Any],
        report_destination: str = '/tmp/validation_report.json',
        alert_on_failure: bool = True,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.data_source = data_source
        self.validation_rules = validation_rules
        self.report_destination = report_destination
        self.alert_on_failure = alert_on_failure

    def execute(self, context: Dict[str, Any]) -> Any:
        """Execute the validation logic."""
        self.log.info(f"Starting validation for data source: {self.data_source}")
        try:
            # Load data
            data = self._load_data()
            # Run validation rules
            validation_results = self._validate_data(data)
            # Generate report
            report = self._generate_report(validation_results)
            # Store report
            self._store_report(report)
            # Check if validation passed
            if not validation_results['passed']:
                if self.alert_on_failure:
                    self._send_alert(validation_results)
                raise AirflowException(
                    f"Data validation failed: {validation_results['summary']}"
                )
            self.log.info("Data validation completed successfully")
            return report
        except Exception as e:
            self.log.error(f"Validation failed with error: {str(e)}")
            raise
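
    def _load_data(self) -> Any:
        """Load data from the source (assumed here to be a JSON document)."""
        from urllib.parse import urlsplit

        # Implementation depends on data source type
        if self.data_source.startswith('s3://'):
            from airflow.providers.amazon.aws.hooks.s3 import S3Hook
            hook = S3Hook(aws_conn_id='aws_default')
            # Load from S3: split the URL into bucket and key, then read the object
            bucket, key = S3Hook.parse_s3_url(self.data_source)
            return json.loads(hook.read_key(key=key, bucket_name=bucket))
        elif self.data_source.startswith('gs://'):
            from airflow.providers.google.cloud.hooks.gcs import GCSHook
            hook = GCSHook(gcp_conn_id='google_cloud_default')
            # Load from GCS: download() returns bytes when no filename is given
            parts = urlsplit(self.data_source)
            payload = hook.download(
                bucket_name=parts.netloc,
                object_name=parts.path.lstrip('/'),
            )
            return json.loads(payload)
        else:
            # Load from local file
            with open(self.data_source, 'r') as f:
                return json.load(f)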

    def _validate_data(self, data: Any) -> Dict[str, Any]:
        """Run validation rules against the data."""
        results = {
            'passed': True,
            'rules_checked': 0,
            'rules_passed': 0,
            'rules_failed': 0,
            'failed_rules': [],
            'summary': '',
        }
        for rule_name, rule_config in self.validation_rules.items():
            results['rules_checked'] += 1
            try:
                rule_passed = self._apply_rule(data, rule_config)
                if rule_passed:
                    results['rules_passed'] += 1
                else:
                    results['rules_failed'] += 1
                    results['passed'] = False
                    results['failed_rules'].append({
                        'rule': rule_name,
                        'message': rule_config.get('failure_message', 'Rule failed'),
                    })
            except Exception as e:
                # A rule that raises is recorded as failed rather than aborting the run
                results['rules_failed'] += 1
                results['passed'] = False
                results['failed_rules'].append({
                    'rule': rule_name,
                    'error': str(e),
                })
        results['summary'] = (
            f"{results['rules_passed']}/{results['rules_checked']} rules passed"
        )
        return results

    def _apply_rule(self, data: Any, rule_config: Dict[str, Any]) -> bool:
        """Apply a single validation rule."""
        rule_type = rule_config.get('type')
        if rule_type == 'not_null':
            return data is not None
        elif rule_type == 'range':
            min_val = rule_config.get('min')
            max_val = rule_config.get('max')
            return min_val <= data <= max_val
        elif rule_type == 'regex':
            import re
            pattern = rule_config.get('pattern')
            return bool(re.match(pattern, str(data)))
        elif rule_type == 'custom':
            # Custom validation logic
            custom_func = rule_config.get('function')
            return custom_func(data)
        else:
            raise ValueError(f"Unknown rule type: {rule_type}")

    def _generate_report(self, validation_results: Dict[str, Any]) -> Dict[str, Any]:
        """Generate a detailed validation report."""
        return {
            'timestamp': datetime.now().isoformat(),
            'data_source': self.data_source,
            'results': validation_results,
            'metadata': {
                'operator': self.__class__.__name__,
                'task_id': self.task_id,
            },
        }

    def _store_report(self, report: Dict[str, Any]) -> None:
        """Store the validation report."""
        with open(self.report_destination, 'w') as f:
            json.dump(report, f, indent=2)

    def _send_alert(self, validation_results: Dict[str, Any]) -> None:
        """Send alert about validation failure."""
        from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
        hook = SlackWebhookHook(slack_webhook_conn_id='slack_webhook')
        message = (
            f"Data Validation Failed\n"
            f"Source: {self.data_source}\n"
            f"Summary: {validation_results['summary']}\n"
            f"Failed Rules: {len(validation_results['failed_rules'])}"
        )
        # send_text posts a plain-text message
        hook.send_text(message)

    def on_kill(self) -> None:
        """Handle operator termination."""
        self.log.info("Operator was killed")
Custom Hook Implementation
# custom_hook.py
from typing import Any, Dict, Optional

import requests
from airflow.hooks.base import BaseHook
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


class CustomAPIHook(BaseHook):
    """
    Custom hook for interacting with external APIs.

    This hook provides a standardized interface for API calls with
    retry logic, authentication, and error handling.

    :param api_conn_id: Airflow connection ID for API credentials
    :param api_base_url: Base URL for API endpoints
    :param timeout: Request timeout in seconds
    :param max_retries: Maximum number of retry attempts
    """

    conn_name_attr = 'api_conn_id'
    default_conn_name = 'custom_api_default'
    conn_type = 'custom_api'
    hook_name = 'Custom API'

    def __init__(
        self,
        api_conn_id: str = default_conn_name,
        api_base_url: Optional[str] = None,
        timeout: int = 30,
        max_retries: int = 3,
    ):
        super().__init__()
        self.api_conn_id = api_conn_id
        self.api_base_url = api_base_url or self._get_base_url()
        self.timeout = timeout
        self.max_retries = max_retries
        self._session = None

    def get_conn(self) -> requests.Session:
        """Get a requests session with retry logic."""
        if self._session is None:
            self._session = requests.Session()
            # Configure retry strategy
            retry_strategy = Retry(
                total=self.max_retries,
                backoff_factor=1,
                status_forcelist=[429, 500, 502, 503, 504],
            )
            adapter = HTTPAdapter(max_retries=retry_strategy)
            self._session.mount("http://", adapter)
            self._session.mount("https://", adapter)
            # Set authentication headers
            headers = self._get_auth_headers()
            self._session.headers.update(headers)
        return self._session
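
    def _get_base_url(self) -> str:
        """Get base URL from connection."""
        conn = self.get_connection(self.api_conn_id)
        # The connection's schema field is used as the URL scheme;
        # defaulting to https is an assumption of this example.
        scheme = conn.schema or 'https'
        # Omit the port when the connection does not define one
        netloc = f"{conn.host}:{conn.port}" if conn.port else conn.host
        return f"{scheme}://{netloc}"

    def _get_auth_headers(self) -> Dict[str, str]:
        """Get authentication headers from connection."""
        conn = self.get_connection(self.api_conn_id)
        password = conn.get_password()
        return {
            'Authorization': f'Bearer {password}',
            'Content-Type': 'application/json',
        }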

    def make_request(
        self,
        method: str,
        endpoint: str,
        data: Optional[Dict[str, Any]] = None,
        params: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """
        Make an API request.

        :param method: HTTP method (GET, POST, PUT, DELETE)
        :param endpoint: API endpoint
        :param data: Request body data
        :param params: Query parameters
        :return: API response
        """
        session = self.get_conn()
        url = f"{self.api_base_url}{endpoint}"
        try:
            response = session.request(
                method=method,
                url=url,
                json=data,
                params=params,
                timeout=self.timeout,
            )
            response.raise_for_status()
            # Some endpoints (for example DELETE) may return an empty body
            return response.json() if response.content else {}
        except requests.exceptions.RequestException as e:
            self.log.error(f"API request failed: {str(e)}")
            raise

    def get(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Make a GET request."""
        return self.make_request('GET', endpoint, params=params)

    def post(self, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Make a POST request."""
        return self.make_request('POST', endpoint, data=data)

    def put(self, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Make a PUT request."""
        return self.make_request('PUT', endpoint, data=data)

    def delete(self, endpoint: str) -> Dict[str, Any]:
        """Make a DELETE request."""
        return self.make_request('DELETE', endpoint)

    def test_connection(self) -> tuple:
        """Test the connection to the API; returns (success, message)."""
        try:
            response = self.get('/health')
            if response.get('status') == 'ok':
                return True, 'Connection successfully tested'
            return False, f"Unexpected health response: {response}"
        except Exception as e:
            return False, str(e)
Operator with Hook Integration
# operator_with_hook.py
from typing import Any, Dict

from airflow.models import BaseOperator


class DataSynchronizationOperator(BaseOperator):
    """
    Operator for synchronizing data between systems using hooks.

    This operator demonstrates how to create operators that integrate
    with multiple hooks for complex data operations.

    :param source_system: Source system identifier
    :param target_system: Target system identifier
    :param sync_config: Synchronization configuration
    :param batch_size: Number of records to process in each batch
    """

    template_fields = ('source_system', 'target_system', 'sync_config')

    # apply_defaults is no longer needed in Airflow 2; BaseOperator applies default_args itself
    def __init__(
        self,
        source_system: str,
        target_system: str,
        sync_config: Dict[str, Any],
        batch_size: int = 1000,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.source_system = source_system
        self.target_system = target_system
        self.sync_config = sync_config
        self.batch_size = batch_size

    def execute(self, context: Dict[str, Any]) -> Any:
        """Execute the data synchronization."""
        self.log.info(
            f"Starting sync from {self.source_system} to {self.target_system}"
        )
        # Initialize hooks
        source_hook = self._get_source_hook()
        target_hook = self._get_target_hook()
        try:
            # Extract data from source
            source_data = self._extract_data(source_hook)
            # Transform data if needed
            transformed_data = self._transform_data(source_data)
            # Load data to target
            sync_results = self._load_data(target_hook, transformed_data)
            # Validate synchronization
            self._validate_sync(source_hook, target_hook, sync_results)
            self.log.info("Data synchronization completed successfully")
            return sync_results
        except Exception as e:
            self.log.error(f"Synchronization failed: {str(e)}")
            raise

    def _get_source_hook(self) -> Any:
        """Get appropriate hook for source system."""
        if self.source_system == 'postgresql':
            from airflow.providers.postgres.hooks.postgres import PostgresHook
            return PostgresHook(postgres_conn_id='source_postgres')
        elif self.source_system == 'mysql':
            from airflow.providers.mysql.hooks.mysql import MySqlHook
            return MySqlHook(mysql_conn_id='source_mysql')
        elif self.source_system == 's3':
            from airflow.providers.amazon.aws.hooks.s3 import S3Hook
            return S3Hook(aws_conn_id='source_aws')
        else:
            raise ValueError(f"Unsupported source system: {self.source_system}")

    def _get_target_hook(self) -> Any:
        """Get appropriate hook for target system."""
        if self.target_system == 'postgresql':
            from airflow.providers.postgres.hooks.postgres import PostgresHook
            return PostgresHook(postgres_conn_id='target_postgres')
        elif self.target_system == 'redshift':
            from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook
            return RedshiftSQLHook(redshift_conn_id='target_redshift')
        elif self.target_system == 'gcs':
            from airflow.providers.google.cloud.hooks.gcs import GCSHook
            return GCSHook(gcp_conn_id='target_gcs')
        else:
            raise ValueError(f"Unsupported target system: {self.target_system}")

    def _extract_data(self, source_hook: Any) -> Any:
        """Extract data from source system."""
        query = self.sync_config.get('extract_query')
        if hasattr(source_hook, 'get_records'):
            return source_hook.get_records(query)
        elif hasattr(source_hook, 'read_key'):
            return source_hook.read_key(self.sync_config.get('source_key'))
        else:
            raise ValueError("Source hook does not support data extraction")

    def _transform_data(self, data: Any) -> Any:
        """Transform data if needed."""
        transformations = self.sync_config.get('transformations', [])
        for transformation in transformations:
            if transformation['type'] == 'filter':
                data = [row for row in data if self._apply_filter(row, transformation)]
            elif transformation['type'] == 'map':
                data = [self._apply_mapping(row, transformation) for row in data]
            elif transformation['type'] == 'aggregate':
                data = self._apply_aggregation(data, transformation)
        return data

    def _load_data(self, target_hook: Any, data: Any) -> Dict[str, Any]:
        """Load data to target system."""
        import json

        results = {'records_loaded': 0, 'errors': []}
        # Process in batches
        for i in range(0, len(data), self.batch_size):
            batch = data[i:i + self.batch_size]
            try:
                if hasattr(target_hook, 'insert_rows'):
                    target_hook.insert_rows(
                        table=self.sync_config.get('target_table'),
                        rows=batch,
                    )
                elif hasattr(target_hook, 'upload'):
                    target_hook.upload(
                        bucket_name=self.sync_config.get('target_bucket'),
                        object_name=f"batch_{i}.json",
                        # Serialize as real JSON; default=str covers dates and decimals
                        data=json.dumps(batch, default=str),
                    )
                results['records_loaded'] += len(batch)
            except Exception as e:
                # Record the failed batch and continue; _validate_sync raises afterwards
                results['errors'].append({
                    'batch_start': i,
                    'error': str(e),
                })
        return results

    def _validate_sync(
        self,
        source_hook: Any,
        target_hook: Any,
        sync_results: Dict[str, Any],
    ) -> None:
        """Validate that synchronization was successful."""
        if sync_results['errors']:
            raise ValueError(
                f"Sync completed with errors: {sync_results['errors']}"
            )
        # Additional validation logic can be added here
        self.log.info(
            f"Sync validation passed: {sync_results['records_loaded']} records loaded"
        )

    def _apply_filter(self, row: Any, transformation: Dict[str, Any]) -> bool:
        """Apply filter transformation."""
        column = transformation.get('column')
        operator = transformation.get('operator')
        value = transformation.get('value')
        if operator == 'eq':
            return row[column] == value
        elif operator == 'gt':
            return row[column] > value
        elif operator == 'lt':
            return row[column] < value
        elif operator == 'contains':
            return value in str(row[column])
        else:
            # Unknown operators keep the row
            return True

    def _apply_mapping(self, row: Any, transformation: Dict[str, Any]) -> Dict[str, Any]:
        """Apply mapping transformation (renames keys; expects dict-like rows)."""
        # Note: get_records returns tuples, so convert rows to dicts before mapping
        mapping = transformation.get('mapping', {})
        return {mapping.get(k, k): v for k, v in row.items()}

    def _apply_aggregation(self, data: Any, transformation: Dict[str, Any]) -> Any:
        """Apply aggregation transformation."""
        group_by = transformation.get('group_by')
        aggregate_column = transformation.get('aggregate_column')
        aggregate_func = transformation.get('aggregate_function', 'sum')
        # Collect the values of aggregate_column per group key
        groups = {}
        for row in data:
            key = row[group_by]
            if key not in groups:
                groups[key] = []
            groups[key].append(row[aggregate_column])
        result = []
        for key, values in groups.items():
            if aggregate_func == 'sum':
                aggregated = sum(values)
            elif aggregate_func == 'avg':
                aggregated = sum(values) / len(values)
            elif aggregate_func == 'count':
                aggregated = len(values)
            else:
                aggregated = values
            result.append({group_by: key, aggregate_column: aggregated})
        return result
Performance Metrics
Operator Performance
| Metric | Description | Optimization Strategy | Warning Threshold |
|---|---|---|---|
| Operator Execution Time | Time to complete operator execution | Optimize external calls, use batching | > 1 hour |
| Hook Connection Time | Time to establish connection | Connection pooling, caching | > 30 seconds |
| API Response Time | External API response time | Async operations, retry logic | > 10 seconds |
| Data Transfer Rate | Amount of data moved | Compression, streaming, batching | < 1 MB/s |
| Error Rate | Percentage of failed operations | Retry logic, circuit breakers | > 5% |
| Memory Usage | Operator memory footprint | Streaming processing, pagination | > 1 GB |
| CPU Utilization | Processing power usage | Parallel execution, optimization | > 90% |
| Connection Pool Size | Active connections | Pool tuning, monitoring | Pool exhausted |
Hook Connection Patterns
Operator Execution Timing
# Measure operator execution time
import logging
import time
from functools import wraps

log = logging.getLogger(__name__)


def timing_operator(func):
    """Decorator to measure operator execution time."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so it is safer than time.time() for durations
        start = time.perf_counter()
        result = func(*args, **kwargs)
        duration = time.perf_counter() - start
        log.info("Operator executed in %.2f seconds", duration)
        return result
    return wrapper


@timing_operator
def my_operator_logic(**context):
    """Example operator with timing."""
    # Perform work
    time.sleep(2)
    return {"status": "success"}
Best Practices
1. Operator Design
Keep operators focused on single responsibilities. Implement idempotency to handle retries safely. Use template fields for dynamic parameters.
from airflow.models import BaseOperator


# Good: Single responsibility
class ExtractDataOperator(BaseOperator):
    """Extracts data from source system."""
    template_fields = ('source_path', 'extraction_query')


# Bad: Multiple responsibilities
class ETLCompleteOperator(BaseOperator):
    """Does everything - hard to retry, debug, reuse."""
    pass
2. Hook Implementation
Follow the hook pattern for connection management. Implement proper error handling and retry logic. Use connection pooling for high-throughput scenarios.
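import requests
from airflow.hooks.base import BaseHook


class CustomHook(BaseHook):
    """Hook for custom API integration."""

    # Required class attributes
    conn_name_attr = 'custom_conn_id'
    default_conn_name = 'custom_default'
    conn_type = 'custom'
    hook_name = 'Custom API'

    def __init__(self, custom_conn_id=None):
        super().__init__()
        self.custom_conn_id = custom_conn_id or self.default_conn_name

    def get_conn(self):
        """Get API client; a requests.Session reuses pooled HTTP connections."""
        conn = self.get_connection(self.custom_conn_id)
        # Illustrative client: a session carrying a bearer token taken from the
        # connection password. Replace with the client your service requires.
        client = requests.Session()
        client.headers.update({'Authorization': f'Bearer {conn.password}'})
        # Return configured client
        return client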
3. Error Handling
Implement comprehensive error handling with meaningful error messages. Use Airflow's exception classes for proper error propagation.
from airflow.exceptions import (
    AirflowException,
    AirflowFailException,  # Marks task as failed without retry
    AirflowSkipException,  # Skips the task
    AirflowSensorTimeout,  # Sensor timeout
    AirflowTaskTimeout,    # Task execution timeout
)


# Placeholder application errors for this example; substitute your own.
class ValidationError(Exception):
    """Input failed validation."""


class DataNotFound(Exception):
    """The source contained no data."""


# Method of a custom operator class
def execute(self, context):
    try:
        result = self._do_work()
    except ConnectionError as e:
        # Retryable error
        raise AirflowException(f"Connection failed: {e}") from e
    except ValidationError as e:
        # Non-retryable error
        raise AirflowFailException(f"Invalid input: {e}") from e
    except DataNotFound:
        # Skip task
        raise AirflowSkipException("No data to process")
    return result
4. Testing
Write unit tests for operators and hooks. Mock external dependencies to ensure test isolation. Test both success and failure scenarios.
import pytest
from unittest.mock import patch

from airflow.exceptions import AirflowException

# Hypothetical module under test: my_operator.py defines MyOperator
# and imports MyHook from my_hook.
from my_operator import MyOperator


class TestMyOperator:
    """Unit tests for custom operator."""

    def test_execute_success(self):
        """Test successful execution."""
        operator = MyOperator(
            task_id='test',
            source='test_source',
        )
        context = {'ds': '2024-01-01'}
        # Patch the name where the operator looks it up, not where it is defined
        with patch('my_operator.MyHook') as mock_hook:
            mock_hook.return_value.get_data.return_value = ['data']
            result = operator.execute(context)
        assert result == {'status': 'success'}

    def test_execute_failure(self):
        """Test failure handling."""
        operator = MyOperator(
            task_id='test',
            source='test_source',
        )
        context = {'ds': '2024-01-01'}
        with patch('my_operator.MyHook') as mock_hook:
            mock_hook.return_value.get_data.side_effect = ConnectionError
            with pytest.raises(AirflowException):
                operator.execute(context)
5. Documentation
Provide clear documentation for custom operators and hooks. Include usage examples and parameter descriptions. Document any external dependencies.
class DataQualityOperator(BaseOperator):
    """
    Operator for validating data quality against defined rules.

    This operator loads data from a source, applies validation rules,
    and generates a quality report. If validation fails, the task
    raises an exception.

    :param data_source: Path or URI to the data source
    :param validation_rules: Dict of rule_name -> rule_config
    :param report_path: Where to save the validation report
    :param fail_on_error: If True, raises exception on validation failure

    Example:
        >>> operator = DataQualityOperator(
        ...     task_id='validate_orders',
        ...     data_source='s3://bucket/orders.parquet',
        ...     validation_rules={
        ...         'not_null': {'columns': ['order_id', 'customer_id']},
        ...         'range': {'column': 'amount', 'min': 0, 'max': 10000}
        ...     },
        ...     report_path='/tmp/quality_report.json',
        ... )
    """

    template_fields = ('data_source', 'report_path')
6. Security
Never hardcode credentials. Use Airflow's connection system for credential management. Implement proper access controls for sensitive operations.
# Bad: Hardcoded credentials
hook = S3Hook(aws_access_key_id='AKIA...', aws_secret_access_key='...')
# Good: Use Airflow connections
hook = S3Hook(aws_conn_id='aws_default')
# Good: Use environment variables for sensitive config
import os
api_key = os.environ.get('API_KEY')
7. Performance
Optimize external calls to minimize latency. Use batching for bulk operations. Implement connection pooling for high-throughput scenarios.
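Batching can be sketched as a small generator that groups any iterable into fixed-size lists, so that each external call carries many records instead of one. The name `batched` and its signature are assumptions for this example.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most batch_size items; the last may be shorter."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


for batch in batched(range(5), 2):
    print(batch)  # [0, 1] then [2, 3] then [4]
```

Because it consumes the input lazily, the same helper works for database cursors or paginated API results without loading everything into memory first.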
8. Monitoring
Add logging for important operations. Implement custom metrics for monitoring. Use Airflow's callback system for alerting.
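A failure callback receives the task context as a dictionary. The sketch below builds an alert message from it and logs it; `build_failure_message` is a hypothetical helper, and forwarding the message to a chat or paging system is left out. It assumes the context carries a task_instance object and, for failures, an exception entry.

```python
import logging
from typing import Any, Dict

log = logging.getLogger(__name__)


def build_failure_message(context: Dict[str, Any]) -> str:
    """Format a short alert from the callback context."""
    ti = context["task_instance"]
    return (
        f"Task {ti.dag_id}.{ti.task_id} failed on try {ti.try_number}: "
        f"{context.get('exception')}"
    )


def handle_failure(context: Dict[str, Any]) -> None:
    """Intended for use as on_failure_callback=handle_failure."""
    message = build_failure_message(context)
    log.error(message)
    # A real callback might forward `message` to Slack or a pager here.
```

Keeping the formatting in a separate pure function makes the callback easy to unit-test with a stand-in context.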
9. Maintainability
Write clean, readable code with proper separation of concerns. Follow Python coding standards. Use type hints for better code clarity.
10. Reusability
Design operators and hooks for reuse across multiple DAGs. Use configuration to customize behavior. Implement proper abstraction layers.
Key Takeaways:
- Operators encapsulate unit-of-work logic; hooks manage external connections
- Idempotency ($f(f(x)) = f(x)$) is required for safe retries
- Total retry window grows exponentially: $T_{\max} = d_0 \left(2^{N} - 1\right)$, where $d_0$ is the base delay and $N$ the maximum retry count
- Hooks abstract connection details, authentication, and client creation
- Custom operators must implement execute() and define template_fields for any templated parameters
- Never hardcode credentials; use Airflow's connection system
See also: Kafka Connect (kafka/03), PySpark Submit (pyspark/19), Data Engineering Orchestration (data-engineering/017)
See Also
- Airflow Architecture: Core architecture and component overview
- DAG Design Patterns: DAG composition and dependency patterns
- XCom Communications: Task communication and data passing
- Sensors and Operators: Sensor-based operators and poke modes
- Kafka Connect: Kafka integration patterns