
Topic: Airflow Retry Strategies and Error Handling



Retry Strategies & Error Handling

Production-Grade Resilience Patterns

Companies: Google, Apple | Difficulty: Advanced

Interview Question

ℹ️Interview Context

Company: Google / Apple
Role: Senior Data Engineer / Platform Engineer
Difficulty: Advanced
Time: 45-60 minutes

Question: "Design a comprehensive error handling strategy for a critical data pipeline. How do you handle retries, callbacks, alerts, and failures? What metrics should you track?"


Detailed Theory

Error Handling Fundamentals

# error_handling_fundamentals.py
"""
Error Handling in Airflow:

1. Retries:
   - Automatic retry on failure
   - Exponential backoff
   - Retry delays

2. Callbacks:
   - on_success_callback
   - on_failure_callback
   - on_retry_callback

3. Trigger Rules:
   - all_success (default)
   - all_failed
   - all_done
   - one_success
   - one_failed
   - none_failed
   - none_failed_min_one_success

4. Alerting:
   - Email notifications
   - Slack integration
   - Custom webhooks
"""
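The trigger rules listed above are easier to compare when written as a single decision function. The following is a simplified model, not Airflow's scheduler logic: it assumes every upstream task has already finished, and the function name `should_run` and the plain string states are illustrative only.

```python
from typing import Iterable

# States that count as a failure for trigger-rule purposes.
FAILED_STATES = {"failed", "upstream_failed"}


def should_run(rule: str, upstream_states: Iterable[str]) -> bool:
    """Decide whether a task would run, given the final states of its upstream tasks."""
    states = list(upstream_states)
    n_success = sum(s == "success" for s in states)
    n_failed = sum(s in FAILED_STATES for s in states)

    if rule == "all_success":
        return n_success == len(states)
    if rule == "all_failed":
        return n_failed == len(states)
    if rule == "all_done":
        return True  # every upstream task has finished, by assumption
    if rule == "one_success":
        return n_success >= 1
    if rule == "one_failed":
        return n_failed >= 1
    if rule == "none_failed":
        return n_failed == 0  # successes and skips are both acceptable
    if rule == "none_failed_min_one_success":
        return n_failed == 0 and n_success >= 1
    raise ValueError(f"unknown trigger rule: {rule}")


if __name__ == "__main__":
    for rule in ("all_success", "none_failed", "none_failed_min_one_success"):
        print(rule, should_run(rule, ["success", "skipped"]))
```

The difference between `none_failed` and `none_failed_min_one_success` only shows up when every upstream task was skipped: the first still runs, the second does not.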

1. Retry Configuration

# retry_configuration.py
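import logging
from datetime import datetime, timedelta

from airflow.decorators import dag, task


def task_retry_callback(context) -> None:
    """Log each retry; section 2 shows a fuller version of this callback."""
    ti = context['task_instance']
    logging.warning(f"Task {ti.task_id} retrying (attempt {ti.try_number})")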

@dag(
    dag_id='retry_configuration',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
)
def retry_config():
    @task(
        retries=3,
        retry_delay=timedelta(minutes=5),
        retry_exponential_backoff=True,
        max_retry_delay=timedelta(minutes=30),
        execution_timeout=timedelta(hours=1),
        on_retry_callback=task_retry_callback,
    )
    def task_with_retries() -> dict:
        """Task with retry configuration"""
        import random
        
        # Simulate failure
        if random.random() < 0.5:
            raise ValueError("Random failure")
        
        return {'status': 'success'}
    
    @task(
        retries=5,
        retry_delay=timedelta(seconds=30),
        retry_exponential_backoff=False,
    )
    def task_with_custom_retries() -> dict:
        """Task with custom retry settings"""
        return {'status': 'success'}
    
    # Run the two tasks in sequence
    task_with_retries() >> task_with_custom_retries()

retry_config()

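# Retry Configuration Options
RETRY_CONFIG = """
# Default retry settings in airflow.cfg (values are plain numbers, not Python
# expressions; option availability depends on the Airflow version)
[core]
# Default number of retries per task
default_task_retries = 1

# Default retry delay, in seconds
default_task_retry_delay = 60

# Upper bound on the delay between retries, in seconds
max_task_retry_delay = 900

# Default execution timeout, in seconds (empty means no timeout)
default_task_execution_timeout =

# Exponential backoff has no airflow.cfg option; set
# retry_exponential_backoff on the task or in the DAG's default_args.
"""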

ℹ️Pro Tip

Use exponential backoff for external API calls. Spacing retries further apart gives a struggling service time to recover instead of hitting it with a burst of immediate retries.
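To make the effect of backoff concrete, the sketch below computes an approximate delay schedule. It is a simplified doubling model under stated assumptions: Airflow's own computation also adds jitter, so real delays will differ, and `backoff_schedule` is an illustrative name rather than an Airflow function.

```python
from datetime import timedelta
from typing import List


def backoff_schedule(
    retries: int,
    retry_delay: timedelta,
    max_retry_delay: timedelta,
) -> List[timedelta]:
    """Return the approximate wait before each retry: doubling, then capped."""
    delays = []
    for attempt in range(retries):
        delay = retry_delay * (2 ** attempt)  # 1x, 2x, 4x, ...
        delays.append(min(delay, max_retry_delay))
    return delays


if __name__ == "__main__":
    # Same settings as task_with_retries: 5 minute base delay, 30 minute cap.
    for i, d in enumerate(backoff_schedule(3, timedelta(minutes=5), timedelta(minutes=30)), 1):
        print(f"retry {i}: wait about {d}")
```

With the settings used for `task_with_retries`, the three retries wait roughly 5, 10, and 20 minutes, so the 30 minute cap only matters once more retries are allowed.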

2. Callback Functions

# callback_functions.py
from airflow.decorators import dag, task
from datetime import datetime
from typing import Dict, Any
import logging

def task_success_callback(context: Dict[str, Any]) -> None:
    """Callback on task success"""
    task_id = context['task_instance'].task_id
    dag_id = context['dag'].dag_id
    run_id = context['run_id']
    
    logging.info(f"Task {task_id} succeeded in {dag_id} ({run_id})")
    
    # Send success notification
    send_notification(
        title=f"Task Success: {task_id}",
        message=f"DAG: {dag_id}, Run: {run_id}",
        severity="info"
    )

def task_failure_callback(context: Dict[str, Any]) -> None:
    """Callback on task failure"""
    task_id = context['task_instance'].task_id
    dag_id = context['dag'].dag_id
    run_id = context['run_id']
    exception = context.get('exception')
    
    logging.error(f"Task {task_id} failed: {exception}")
    
    # Send failure alert
    send_alert(
        title=f"Task Failed: {task_id}",
        message=f"DAG: {dag_id}, Run: {run_id}\nException: {exception}",
        severity="critical"
    )

def task_retry_callback(context: Dict[str, Any]) -> None:
    """Callback on task retry"""
    task_id = context['task_instance'].task_id
    try_number = context['task_instance'].try_number
    
    logging.warning(f"Task {task_id} retrying (attempt {try_number})")
    
    # Send retry notification
    send_notification(
        title=f"Task Retry: {task_id}",
        message=f"Attempt {try_number}",
        severity="warning"
    )

def dag_failure_callback(context: Dict[str, Any]) -> None:
    """Callback on DAG failure"""
    dag_id = context['dag'].dag_id
    run_id = context['run_id']
    
    logging.error(f"DAG {dag_id} failed ({run_id})")
    
    # Send DAG failure alert
    send_alert(
        title=f"DAG Failed: {dag_id}",
        message=f"Run: {run_id}",
        severity="critical"
    )

@dag(
    dag_id='callback_example',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    on_success_callback=task_success_callback,
    on_failure_callback=dag_failure_callback,
)
def callback_dag():
    @task(
        on_success_callback=task_success_callback,
        on_failure_callback=task_failure_callback,
        on_retry_callback=task_retry_callback,
    )
    def task_with_callbacks() -> dict:
        """Task with all callbacks"""
        return {'status': 'success'}
    
    task_with_callbacks()

callback_dag()
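The callbacks above call `send_notification` and `send_alert`, which the listing does not define. One possible shape for them is a small router that fans an alert out by severity. Everything here is a hypothetical sketch: `AlertRouter`, `log_channel`, and the wiring are assumptions, and a real deployment would presumably register Slack or PagerDuty senders in place of the log-only channel.

```python
import logging
from typing import Callable, Dict, List

# A channel is any callable taking (title, message, severity).
Channel = Callable[[str, str, str], None]


class AlertRouter:
    """Fan an alert out to every channel registered for its severity."""

    def __init__(self, routes: Dict[str, List[Channel]]) -> None:
        self.routes = routes

    def send(self, title: str, message: str, severity: str = "info") -> int:
        """Deliver to each channel; return how many deliveries succeeded."""
        delivered = 0
        for channel in self.routes.get(severity, []):
            try:
                channel(title, message, severity)
                delivered += 1
            except Exception:
                # One broken channel must not block the remaining ones.
                logging.exception("Alert channel failed for %r", title)
        return delivered


def log_channel(title: str, message: str, severity: str) -> None:
    """Hypothetical stand-in channel that only writes to the log."""
    logging.info("[%s] %s: %s", severity, title, message)


# Hypothetical wiring: every severity goes to the log-only channel.
router = AlertRouter(
    {"info": [log_channel], "warning": [log_channel], "critical": [log_channel]}
)


def send_notification(title: str, message: str, severity: str = "info") -> None:
    router.send(title, message, severity)


def send_alert(title: str, message: str, severity: str = "critical") -> None:
    router.send(title, message, severity)
```

Catching exceptions per channel keeps a Slack outage, for example, from suppressing the page that should still go out on another channel.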

3. Alerting Integration

# alerting_integration.py
from datetime import datetime
from typing import Dict, Any
import logging
import json
import requests

def send_slack_alert(
    title: str,
    message: str,
    severity: str = "info",
    channel: str = "#airflow-alerts"
) -> None:
    """Send alert to Slack"""
    import os
    
    webhook_url = os.environ.get('SLACK_WEBHOOK_URL')
    
    # Color based on severity
    colors = {
        "info": "#36a64f",
        "warning": "#ff9900",
        "critical": "#ff0000"
    }
    
    payload = {
        "channel": channel,
        "attachments": [{
            "color": colors.get(severity, "#000000"),
            "title": title,
            "text": message,
            "footer": "Airflow Alert",
            "ts": int(datetime.now().timestamp())
        }]
    }
    
    response = requests.post(
        webhook_url,
        json=payload,
        timeout=10
    )
    
    if response.status_code != 200:
        logging.error(f"Failed to send Slack alert: {response.text}")

def send_pagerduty_alert(
    title: str,
    message: str,
    severity: str = "critical"
) -> None:
    """Send alert to PagerDuty"""
    import os
    
    routing_key = os.environ.get('PAGERDUTY_ROUTING_KEY')
    
    payload = {
        "routing_key": routing_key,
        "event_action": "trigger",
        "payload": {
            "summary": f"{title}: {message}",
            "severity": severity,
            "source": "airflow",
            "component": "data-pipeline",
            "group": "production",
            "class": "pipeline-failure",
        }
    }
    
    response = requests.post(
        "https://events.pagerduty.com/v2/enqueue",
        json=payload,
        timeout=10
    )
    
    if response.status_code != 202:
        logging.error(f"Failed to send PagerDuty alert: {response.text}")

def send_email_alert(
    to: list,
    title: str,
    message: str
) -> None:
    """Send email alert"""
    # airflow.utils.email.send_email uses the backend configured under [email]
    from airflow.utils.email import send_email

    send_email(to=to, subject=title, html_content=message)

# Alert configuration
ALERT_CONFIG = """
# Alerting configuration in airflow.cfg
[email]
email_backend = airflow.utils.email.send_email_smtp
email_conn_id = smtp_default

# SMTP settings
[smtp]
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
smtp_port = 587
smtp_user = airflow@example.com
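# Do not put the password here; airflow.cfg does not expand templates.
# Supply it through the smtp_default connection or the
# AIRFLOW__SMTP__SMTP_PASSWORD environment variable.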
smtp_mail_from = airflow@example.com
"""

⚠️Important

Never hardcode credentials in DAG files. Use Airflow Connections or Secrets Backends for sensitive information.

4. Error Handling Patterns

# error_handling_patterns.py
import logging
from airflow.decorators import dag, task
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime
from typing import Dict, Any

@dag(
    dag_id='error_handling_patterns',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
)
def error_patterns():
    @task
    def critical_task() -> Dict[str, Any]:
        """Critical task that must succeed"""
        try:
            result = perform_critical_operation()
            return {'status': 'success', 'result': result}
        except Exception as e:
            logging.error(f"Critical task failed: {e}")
            raise  # Re-raise for retry
    
    @task(trigger_rule=TriggerRule.ALL_FAILED)
    def failure_handler() -> Dict[str, Any]:
        """Handle failure from upstream tasks"""
        # Send alert
        send_alert("Critical task failed", severity="critical")
        
        # Log failure
        logging.error("Pipeline failed, handling gracefully")
        
        return {'status': 'failure_handled'}
    
    @task(trigger_rule=TriggerRule.ALL_DONE)
    def cleanup() -> Dict[str, Any]:
        """Cleanup regardless of upstream status"""
        # Cleanup resources
        cleanup_temp_files()
        
        return {'status': 'cleaned'}
    
    @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
    def partial_success_handler(
        task1_result: Dict[str, Any] = None,
        task2_result: Dict[str, Any] = None
    ) -> Dict[str, Any]:
        """Handle partial success"""
        if task1_result and task2_result:
            return {'status': 'complete'}
        elif task1_result or task2_result:
            return {'status': 'partial'}
        else:
            return {'status': 'failed'}
    
    # Dependencies
    critical = critical_task()
    failure = failure_handler()
    cleanup_task = cleanup()
    
    critical >> failure >> cleanup_task

error_patterns()

5. Circuit Breaker Pattern

# circuit_breaker.py
from airflow.models.baseoperator import BaseOperator
from typing import Any, Dict
import time
import logging

class CircuitBreakerOperator(BaseOperator):
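    """
    Circuit breaker pattern for external systems.
    
    States:
    - CLOSED: Normal operation, requests pass through
    - OPEN: System is failing, requests are blocked
    - HALF_OPEN: Testing if system recovered

    Note: each task attempt runs in a new process with a freshly built
    operator, so the counters kept on self below do not survive retries
    or later runs. Persist them in an external store for the breaker to
    take effect.
    """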
    
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'CLOSED'
    
    def execute(self, context) -> Any:
        """Execute with circuit breaker logic"""
        if self.state == 'OPEN':
            if self._should_try_reset():
                self.state = 'HALF_OPEN'
            else:
                raise Exception("Circuit breaker is OPEN")
        
        try:
            result = self._execute_with_retry(context)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise
    
    def _execute_with_retry(self, context) -> Any:
        """Execute with retry logic"""
        # Implementation here
        pass
    
    def _on_success(self):
        """Handle successful execution"""
        self.failure_count = 0
        self.state = 'CLOSED'
    
    def _on_failure(self):
        """Handle failed execution"""
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.failure_count >= self.failure_threshold:
            self.state = 'OPEN'
            logging.warning("Circuit breaker OPENED")
    
    def _should_try_reset(self) -> bool:
        """Check if we should try to reset"""
        if self.last_failure_time is None:
            return True
        
        return time.time() - self.last_failure_time > self.recovery_timeout

# Usage
circuit_breaker = CircuitBreakerOperator(
    task_id='circuit_breaker',
    failure_threshold=5,
    recovery_timeout=60,
)
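The `CircuitBreakerOperator` above keeps its state on the operator instance, which does not carry over between task attempts. A minimal sketch of the same state machine with the state held in an injected store follows. The class name `PersistentCircuitBreaker` and the dict-like `store` are assumptions for illustration; in Airflow the store might be backed by a Variable or an external key-value service, which is not shown here.

```python
import time
from typing import Any, Callable, MutableMapping


class PersistentCircuitBreaker:
    """Circuit breaker whose state lives in a caller-supplied mapping."""

    def __init__(
        self,
        store: MutableMapping[str, Any],
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        clock: Callable[[], float] = time.time,
    ) -> None:
        self.store = store
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock

    def allow(self) -> bool:
        """Return True if a call may proceed; move OPEN to HALF_OPEN after the timeout."""
        if self.store.get("state", "CLOSED") != "OPEN":
            return True
        if self.clock() - self.store.get("opened_at", 0.0) >= self.recovery_timeout:
            self.store["state"] = "HALF_OPEN"
            return True
        return False

    def record_success(self) -> None:
        """Close the breaker and reset the failure counter."""
        self.store["state"] = "CLOSED"
        self.store["failures"] = 0

    def record_failure(self) -> None:
        """Count a failure; open the breaker at the threshold or on a failed probe."""
        failures = self.store.get("failures", 0) + 1
        self.store["failures"] = failures
        probing = self.store.get("state") == "HALF_OPEN"
        if probing or failures >= self.failure_threshold:
            self.store["state"] = "OPEN"
            self.store["opened_at"] = self.clock()


if __name__ == "__main__":
    shared = {}  # stands in for a store that outlives the task process
    breaker = PersistentCircuitBreaker(shared, failure_threshold=2)
    breaker.record_failure()
    breaker.record_failure()
    # A second instance sharing the store sees the breaker as open.
    print(PersistentCircuitBreaker(shared).allow())
```

Because every instance reads and writes the same store, a retry that starts in a new process sees the failures recorded by earlier attempts.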

Real-World Scenarios

Scenario 1: Google's Data Pipeline

# google_pipeline.py
"""
Google-style data pipeline with error handling:
- Comprehensive retry logic
- Multi-channel alerting
- Circuit breaker pattern
"""

import logging
from airflow.decorators import dag, task
from datetime import datetime, timedelta
from typing import Dict, Any

@dag(
    dag_id='google_error_handling',
    schedule_interval='@hourly',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['google', 'production', 'error-handling'],
)
def google_pipeline():
    @task(
        retries=3,
        retry_delay=timedelta(minutes=5),
        retry_exponential_backoff=True,
        max_retry_delay=timedelta(minutes=30),
        on_failure_callback=task_failure_callback,
        on_retry_callback=task_retry_callback,
    )
    def extract_data() -> Dict[str, Any]:
        """Extract data with retries"""
        try:
            data = fetch_from_api()
            return {'records': len(data), 'status': 'extracted'}
        except Exception as e:
            logging.error(f"Extraction failed: {e}")
            raise
    
    @task(
        retries=2,
        retry_delay=timedelta(minutes=2),
        on_failure_callback=task_failure_callback,
    )
    def transform_data(data: Dict[str, Any]) -> Dict[str, Any]:
        """Transform data with error handling"""
        try:
            transformed = apply_transformations(data)
            return {'records': len(transformed), 'status': 'transformed'}
        except Exception as e:
            logging.error(f"Transformation failed: {e}")
            # Send alert
            send_slack_alert(
                title="Transformation Failed",
                message=str(e),
                severity="critical"
            )
            raise
    
    @task(
        retries=3,
        retry_delay=timedelta(minutes=1),
        on_failure_callback=task_failure_callback,
    )
    def load_data(data: Dict[str, Any]) -> Dict[str, Any]:
        """Load data into the warehouse with retries"""
        try:
            loaded = load_to_warehouse(data)
            return {'rows': loaded, 'status': 'loaded'}
        except Exception as e:
            logging.error(f"Load failed: {e}")
            raise
    
    @task(trigger_rule='none_failed_min_one_success')
    def notify_success(
        extracted: Dict[str, Any],
        transformed: Dict[str, Any],
        loaded: Dict[str, Any]
    ) -> None:
        """Notify on success"""
        send_slack_alert(
            title="Pipeline Success",
            message=f"Processed {loaded['rows']} rows",
            severity="info"
        )
    
    # Pipeline with error handling
    extracted = extract_data()
    transformed = transform_data(extracted)
    loaded = load_data(transformed)
    notify = notify_success(extracted, transformed, loaded)

google_pipeline()

Scenario 2: Apple's Mission-Critical Pipeline

# apple_pipeline.py
"""
Apple-style mission-critical pipeline:
- Zero tolerance for failures
- Comprehensive monitoring
- Automatic recovery
"""

import time
from airflow.decorators import dag, task
from datetime import datetime, timedelta
from typing import Dict, Any

@dag(
    dag_id='apple_mission_critical',
    schedule_interval='*/5 * * * *',  # Every 5 minutes
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=['apple', 'mission-critical', 'production'],
)
def apple_pipeline():
    @task(
        retries=5,
        retry_delay=timedelta(seconds=30),
        retry_exponential_backoff=True,
        max_retry_delay=timedelta(minutes=5),
        execution_timeout=timedelta(minutes=2),
        on_failure_callback=critical_failure_handler,
        on_retry_callback=critical_retry_handler,
    )
    def process_critical_data() -> Dict[str, Any]:
        """Process critical data with zero tolerance"""
        start_time = time.time()
        
        try:
            # Critical processing
            result = perform_critical_processing()
            
            # Track metrics
            track_metric(
                'processing_time',
                time.time() - start_time
            )
            
            return {'status': 'success', 'result': result}
        except Exception as e:
            # Send PagerDuty alert
            send_pagerduty_alert(
                title="Critical Pipeline Failure",
                message=str(e),
                severity="critical"
            )
            raise
    
    @task(trigger_rule='all_done')
    def cleanup_and_monitor() -> Dict[str, Any]:
        """Cleanup and monitor"""
        # Cleanup resources
        cleanup_temp_files()
        
        # Monitor pipeline health
        check_pipeline_health()
        
        return {'status': 'monitored'}
    
    # Pipeline
    result = process_critical_data()
    monitor = cleanup_and_monitor()
    
    result >> monitor

apple_pipeline()
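It is worth checking the retry settings in this scenario against its five-minute schedule. The sketch below estimates the worst-case wall-clock time of one task when every attempt runs until `execution_timeout` and every retry is used. It is a rough model under stated assumptions: it uses plain doubling for the backoff, ignores Airflow's jitter and scheduling latency, and `worst_case_duration` is an illustrative helper, not an Airflow API.

```python
from datetime import timedelta


def worst_case_duration(
    retries: int,
    retry_delay: timedelta,
    max_retry_delay: timedelta,
    execution_timeout: timedelta,
) -> timedelta:
    """Upper-bound estimate: every attempt times out and every retry is used."""
    attempts = retries + 1  # the first try plus each retry
    run_time = execution_timeout * attempts
    wait_time = sum(
        (min(retry_delay * (2 ** i), max_retry_delay) for i in range(retries)),
        timedelta(),
    )
    return run_time + wait_time


if __name__ == "__main__":
    total = worst_case_duration(
        retries=5,
        retry_delay=timedelta(seconds=30),
        max_retry_delay=timedelta(minutes=5),
        execution_timeout=timedelta(minutes=2),
    )
    print(f"worst case: {total}")
```

For `process_critical_data` this comes to about 24.5 minutes (12 minutes of attempts plus 12.5 minutes of waiting), several times the five-minute interval. With `max_active_runs=1`, later runs would queue behind a failing one, so the retry budget and the schedule should be chosen together.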

Edge Cases

⚠️Common Pitfalls

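  1. Aggressive Retries: The retries setting already bounds the number of attempts, so a task cannot retry forever. The real risks are many retries spaced too closely and, with exponential backoff, delays that grow beyond what the schedule can absorb. Set max_retry_delay to cap the backoff.

  2. Callback Failures: Wrap callback bodies in try/except and log the error. An exception inside a callback means the alert never goes out, and nobody is told that it was lost.

  3. Alert Fatigue: Throttle repeated alerts for the same failure so that real incidents are not buried.

  4. State Management: Circuit breaker state kept on an operator instance is lost between task attempts. Persist it outside the task process.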
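Pitfall 3 can be addressed with a small throttle in front of the alert sender. The sketch below suppresses repeats of the same alert key inside a time window. `AlertThrottler` and its parameters are hypothetical, and the state is kept in memory for brevity; since Airflow tasks run in separate processes, a real version would likely need a shared store.

```python
import time
from typing import Callable, Dict


class AlertThrottler:
    """Allow at most one alert per key within a minimum interval."""

    def __init__(
        self,
        min_interval_seconds: float = 900.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.min_interval_seconds = min_interval_seconds
        self.clock = clock
        self._last_sent: Dict[str, float] = {}

    def should_send(self, key: str) -> bool:
        """Return True and record the time if the key is outside its quiet window."""
        now = self.clock()
        last = self._last_sent.get(key)
        if last is not None and now - last < self.min_interval_seconds:
            return False  # same alert fired recently, suppress it
        self._last_sent[key] = now
        return True


if __name__ == "__main__":
    throttler = AlertThrottler(min_interval_seconds=900)
    key = "my_dag.my_task"  # hypothetical key: one per DAG and task
    for attempt in range(3):
        if throttler.should_send(key):
            print(f"attempt {attempt}: alert sent")
        else:
            print(f"attempt {attempt}: alert suppressed")
```

Keying on the DAG and task rather than on the exception text keeps one flapping task from producing a new alert for every slightly different error message.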

# edge_cases.py
import logging
from airflow.decorators import dag, task
from datetime import datetime, timedelta

@dag(dag_id='error_edge_cases', start_date=datetime(2024, 1, 1), schedule_interval='@daily')
def edge_cases():
    # Aggressive retry issue
    @task(
        retries=10,
        retry_delay=timedelta(seconds=1),
        # BAD: 10 retries one second apart, with no backoff, hammer a failing dependency
    )
    def aggressive_retry_issue():
        pass
    
    # Correct retry configuration
    @task(
        retries=3,
        retry_delay=timedelta(minutes=1),
        retry_exponential_backoff=True,
        max_retry_delay=timedelta(minutes=10),  # GOOD: Max delay
    )
    def correct_retry_config():
        pass
    
    # Callback failure issue
    def failing_callback(context):
        # BAD: Callback that can fail
        raise Exception("Callback failed!")
    
    def robust_callback(context):
        # GOOD: Robust callback
        try:
            send_alert(context)
        except Exception as e:
            logging.error(f"Callback failed: {e}")
    
    correct_retry_config()

edge_cases()



Best Practices

# best_practices.py
"""
Error Handling Best Practices:

1. Retry Strategy:
   - Use exponential backoff
   - Set appropriate retry counts
   - Implement max_retry_delay

2. Callback Design:
   - Make callbacks robust
   - Handle callback failures
   - Log callback execution

3. Alerting:
   - Implement alert throttling
   - Use multiple channels
   - Track alert metrics

4. Circuit Breaker:
   - Implement for external systems
   - Track state properly
   - Test recovery regularly

5. Monitoring:
   - Track retry rates
   - Monitor failure patterns
   - Alert on anomalies
"""
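The monitoring items above, and the interview question's "what metrics should you track", can be made concrete with a few rates computed over recent task runs. This is an illustrative sketch: the `TaskRun` record and `summarize` function are assumptions, and in practice the inputs would come from Airflow's metadata database or an external metrics system.

```python
from dataclasses import dataclass
from typing import Dict, Iterable


@dataclass
class TaskRun:
    """Hypothetical record of one finished task instance."""
    task_id: str
    attempts: int            # total tries used; 1 means no retry was needed
    succeeded: bool
    duration_seconds: float


def summarize(runs: Iterable[TaskRun]) -> Dict[str, float]:
    """Compute retry rate, failure rate, mean attempts, and mean duration."""
    runs = list(runs)
    total = len(runs)
    if total == 0:
        return {"retry_rate": 0.0, "failure_rate": 0.0,
                "avg_attempts": 0.0, "avg_duration_seconds": 0.0}
    retried = sum(r.attempts > 1 for r in runs)
    failed = sum(not r.succeeded for r in runs)
    return {
        "retry_rate": retried / total,      # share of runs that needed a retry
        "failure_rate": failed / total,     # share of runs that ended in failure
        "avg_attempts": sum(r.attempts for r in runs) / total,
        "avg_duration_seconds": sum(r.duration_seconds for r in runs) / total,
    }


if __name__ == "__main__":
    sample = [
        TaskRun("extract", 1, True, 10.0),
        TaskRun("extract", 2, True, 20.0),
        TaskRun("extract", 3, False, 30.0),
        TaskRun("extract", 1, True, 40.0),
    ]
    print(summarize(sample))
```

A rising retry rate with a flat failure rate is an early warning: retries are still masking the problem, but the dependency is degrading.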

ℹ️Google Interview Tip

Google interviews emphasize resilience and observability. When discussing error handling, cover retry logic, circuit breaker patterns, and multi-channel alerting, and explain which metrics you would track to identify failure patterns.


Summary

Error handling is critical for production pipelines. Key takeaways:

  1. Retries with exponential backoff
  2. Callbacks for success/failure/retry
  3. Alerting with multiple channels
  4. Circuit breaker for external systems
  5. Monitoring and metrics tracking

For Google and Apple interviews, focus on:

  • Comprehensive retry strategies
  • Robust callback implementation
  • Alert throttling and management
  • Circuit breaker patterns
  • Metrics and monitoring

This question is part of the Apache Airflow Advanced interview preparation series. Practice implementing these patterns before your interview.
