Interview Question
ℹ️ Interview Context
Company: Google / Apple Role: Senior Data Engineer / Platform Engineer Difficulty: Advanced Time: 45-60 minutes
Question: "Design a comprehensive error handling strategy for a critical data pipeline. How do you handle retries, callbacks, alerts, and failures? What metrics should you track?"
Detailed Theory
Error Handling Fundamentals
# error_handling_fundamentals.py
"""
Error Handling in Airflow:
1. Retries:
- Automatic retry on failure
- Exponential backoff
- Retry delays
2. Callbacks:
- on_success_callback
- on_failure_callback
- on_retry_callback
3. Trigger Rules:
- all_success (default)
- all_failed
- all_done
- one_success
- one_failed
- none_failed
- none_failed_min_one_success
4. Alerting:
- Email notifications
- Slack integration
- Custom webhooks
"""
1. Retry Configuration
# retry_configuration.py
from airflow.decorators import dag, task
from datetime import datetime, timedelta
@dag(
    dag_id='retry_configuration',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
)
def retry_config():
    """Build a DAG contrasting two per-task retry configurations."""

    # NOTE(review): task_retry_callback is not defined or imported in this
    # snippet; it must be in scope before retry_config() is invoked below.
    @task(
        retries=3,
        retry_delay=timedelta(minutes=5),
        retry_exponential_backoff=True,
        max_retry_delay=timedelta(minutes=30),
        execution_timeout=timedelta(hours=1),
        on_retry_callback=task_retry_callback,
    )
    def task_with_retries() -> dict:
        """Fail about half of the time so the retry settings get exercised."""
        import random

        # Simulated flakiness: a draw below 0.5 counts as a failure.
        if random.random() >= 0.5:
            return {'status': 'success'}
        raise ValueError("Random failure")

    @task(
        retries=5,
        retry_delay=timedelta(seconds=30),
        retry_exponential_backoff=False,
    )
    def task_with_custom_retries() -> dict:
        """Always succeed; shows a fixed (non-exponential) retry delay."""
        return {'status': 'success'}

    # Task ordering: the flaky task runs first, then the fixed-delay task.
    flaky = task_with_retries()
    steady = task_with_custom_retries()
    flaky >> steady


retry_config()
# Retry Configuration Options
RETRY_CONFIG = """
# Default retry settings in airflow.cfg
# (values are plain numbers of seconds; airflow.cfg cannot contain Python
# expressions)
[core]
# Default number of retries (a task's own `retries` argument overrides it)
default_task_retries = 1
# Default retry delay, in seconds
default_task_retry_delay = 60
# Max retry delay, in seconds
max_task_retry_delay = 900
# Execution timeout, in seconds (leave empty for no timeout)
default_task_execution_timeout =
# Retry exponential backoff has no airflow.cfg option: set
# retry_exponential_backoff=True on the task or in the DAG's default_args.
"""
ℹ️ Pro Tip
Use exponential backoff for external API calls. This prevents overwhelming the API with too many retry attempts.
2. Callback Functions
# callback_functions.py
from airflow.decorators import dag, task
from datetime import datetime
from typing import Dict, Any
import logging
def task_success_callback(context: Dict[str, Any]) -> None:
    """Log a success and send an informational notification.

    Args:
        context: Airflow callback context; must provide ``task_instance``,
            ``dag`` and ``run_id``.

    A failure while sending the notification is logged and swallowed so that
    a broken notifier can never turn a successful run into an error.
    """
    task_id = context['task_instance'].task_id
    dag_id = context['dag'].dag_id
    run_id = context['run_id']
    logging.info(f"Task {task_id} succeeded in {dag_id} ({run_id})")
    # Send success notification (best effort)
    try:
        send_notification(
            title=f"Task Success: {task_id}",
            message=f"DAG: {dag_id}, Run: {run_id}",
            severity="info"
        )
    except Exception:
        # Callbacks must not raise: record the problem and carry on.
        logging.exception(f"Success notification for task {task_id} could not be sent")
def task_failure_callback(context: Dict[str, Any]) -> None:
    """Log a task failure and raise a critical alert.

    Args:
        context: Airflow callback context; must provide ``task_instance``,
            ``dag`` and ``run_id``. ``exception`` is read if present.

    A failure while sending the alert is logged and swallowed so that the
    callback itself never raises.
    """
    task_id = context['task_instance'].task_id
    dag_id = context['dag'].dag_id
    run_id = context['run_id']
    exception = context.get('exception')
    logging.error(f"Task {task_id} failed: {exception}")
    # Send failure alert (best effort)
    try:
        send_alert(
            title=f"Task Failed: {task_id}",
            message=f"DAG: {dag_id}, Run: {run_id}\nException: {exception}",
            severity="critical"
        )
    except Exception:
        # The task failure is already logged above; do not lose it behind an
        # alerting error.
        logging.exception(f"Failure alert for task {task_id} could not be sent")
def task_retry_callback(context: Dict[str, Any]) -> None:
    """Log a retry and send a warning notification.

    Args:
        context: Airflow callback context; must provide ``task_instance``
            (its ``task_id`` and ``try_number`` are reported).

    A failure while sending the notification is logged and swallowed so that
    the callback itself never raises.
    """
    task_id = context['task_instance'].task_id
    try_number = context['task_instance'].try_number
    logging.warning(f"Task {task_id} retrying (attempt {try_number})")
    # Send retry notification (best effort)
    try:
        send_notification(
            title=f"Task Retry: {task_id}",
            message=f"Attempt {try_number}",
            severity="warning"
        )
    except Exception:
        # A notifier problem must not interfere with the retry itself.
        logging.exception(f"Retry notification for task {task_id} could not be sent")
def dag_failure_callback(context: Dict[str, Any]) -> None:
    """Log a DAG-run failure and raise a critical alert.

    Args:
        context: Airflow callback context; must provide ``dag`` and
            ``run_id``.

    A failure while sending the alert is logged and swallowed so that the
    callback itself never raises.
    """
    dag_id = context['dag'].dag_id
    run_id = context['run_id']
    logging.error(f"DAG {dag_id} failed ({run_id})")
    # Send DAG failure alert (best effort)
    try:
        send_alert(
            title=f"DAG Failed: {dag_id}",
            message=f"Run: {run_id}",
            severity="critical"
        )
    except Exception:
        # The DAG failure is already logged above; keep the callback quiet.
        logging.exception(f"Failure alert for DAG {dag_id} could not be sent")
@dag(
    dag_id='callback_example',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    on_success_callback=task_success_callback,
    on_failure_callback=dag_failure_callback,
)
def callback_dag():
    """Build a DAG whose single task registers success, failure and retry callbacks."""
    # Collected in one place so the task decorator stays short.
    task_callbacks = {
        'on_success_callback': task_success_callback,
        'on_failure_callback': task_failure_callback,
        'on_retry_callback': task_retry_callback,
    }

    @task(**task_callbacks)
    def task_with_callbacks() -> dict:
        """Return a success marker; exists only to carry the callbacks."""
        return {'status': 'success'}

    task_with_callbacks()


callback_dag()
3. Alerting Integration
# alerting_integration.py
from typing import Dict, Any
import logging
import json
import requests
def send_slack_alert(
    title: str,
    message: str,
    severity: str = "info",
    channel: str = "#airflow-alerts"
) -> None:
    """Send alert to Slack through an incoming webhook.

    Args:
        title: Attachment title.
        message: Attachment body text.
        severity: One of "info", "warning" or "critical"; selects the
            attachment colour (any other value falls back to black).
        channel: Channel name placed in the payload.

    The webhook URL is read from the SLACK_WEBHOOK_URL environment variable.
    This function never raises for delivery problems: a missing URL, a
    network error or a non-200 response is logged and the call returns.
    """
    import os
    import time

    webhook_url = os.environ.get('SLACK_WEBHOOK_URL')
    if not webhook_url:
        # Without a URL requests.post() would raise; report it clearly instead.
        logging.error(f"SLACK_WEBHOOK_URL is not set; Slack alert not sent: {title}")
        return
    # Color based on severity
    colors = {
        "info": "#36a64f",
        "warning": "#ff9900",
        "critical": "#ff0000"
    }
    payload = {
        "channel": channel,
        "attachments": [{
            "color": colors.get(severity, "#000000"),
            "title": title,
            "text": message,
            "footer": "Airflow Alert",
            # Epoch seconds; time.time() avoids relying on a datetime import.
            "ts": int(time.time())
        }]
    }
    try:
        response = requests.post(
            webhook_url,
            json=payload,
            timeout=10
        )
    except requests.RequestException as exc:
        # Timeouts / connection errors must not break the calling task or callback.
        logging.error(f"Failed to send Slack alert: {exc}")
        return
    if response.status_code != 200:
        logging.error(f"Failed to send Slack alert: {response.text}")
def send_pagerduty_alert(
    title: str,
    message: str,
    severity: str = "critical"
) -> None:
    """Send alert to PagerDuty via the Events API v2 enqueue endpoint.

    Args:
        title: Short description; combined with ``message`` into the summary.
        message: Detail text appended to the summary.
        severity: Severity string placed in the event payload.

    The routing key is read from the PAGERDUTY_ROUTING_KEY environment
    variable. This function never raises for delivery problems: a missing
    key, a network error or a non-202 response is logged and the call returns.
    """
    import os

    routing_key = os.environ.get('PAGERDUTY_ROUTING_KEY')
    if not routing_key:
        # Sending a null routing key would only produce a rejected request.
        logging.error(f"PAGERDUTY_ROUTING_KEY is not set; PagerDuty alert not sent: {title}")
        return
    payload = {
        "routing_key": routing_key,
        "event_action": "trigger",
        "payload": {
            "summary": f"{title}: {message}",
            "severity": severity,
            "source": "airflow",
            "component": "data-pipeline",
            "group": "production",
            "class": "pipeline-failure",
        }
    }
    try:
        response = requests.post(
            "https://events.pagerduty.com/v2/enqueue",
            json=payload,
            timeout=10
        )
    except requests.RequestException as exc:
        # Timeouts / connection errors must not break the calling task or callback.
        logging.error(f"Failed to send PagerDuty alert: {exc}")
        return
    if response.status_code != 202:
        logging.error(f"Failed to send PagerDuty alert: {response.text}")
def send_email_alert(
    to: list,
    title: str,
    message: str
) -> None:
    """Send email alert through Airflow's configured email backend.

    Args:
        to: Recipient addresses; nothing is sent when the list is empty.
        title: Email subject.
        message: Email body (passed as the HTML content).

    Delivery problems are logged and swallowed so that an alerting failure
    never propagates into the calling task or callback.
    """
    # Imported lazily so that merely importing this module does not require
    # the email machinery.
    from airflow.utils.email import send_email

    if not to:
        logging.warning(f"No recipients given; email alert not sent: {title}")
        return
    try:
        send_email(to=to, subject=title, html_content=message)
    except Exception:
        logging.exception(f"Failed to send email alert: {title}")
# Alert configuration
ALERT_CONFIG = """
# Alerting configuration in airflow.cfg
[email]
email_backend = airflow.utils.email.send_email_smtp
email_conn_id = smtp_default
# SMTP settings
[smtp]
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
smtp_port = 587
smtp_user = airflow@example.com
# Do not put the password in this file. airflow.cfg is not templated, so a
# Jinja-style placeholder would be used literally as the password. Supply it
# through the AIRFLOW__SMTP__SMTP_PASSWORD environment variable or the
# smtp_password_cmd option instead.
smtp_mail_from = airflow@example.com
"""
⚠️ Important
Never hardcode credentials in DAG files. Use Airflow Connections or Secrets Backends for sensitive information.
4. Error Handling Patterns
# error_handling_patterns.py
from airflow.decorators import dag, task
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime
from typing import Dict, Any
@dag(
    dag_id='error_handling_patterns',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
)
def error_patterns():
    """Build a DAG illustrating trigger-rule based failure handling and cleanup."""

    @task
    def critical_task() -> Dict[str, Any]:
        """Critical task that must succeed"""
        try:
            result = perform_critical_operation()
            return {'status': 'success', 'result': result}
        except Exception as e:
            logging.error(f"Critical task failed: {e}")
            raise  # Re-raise for retry

    @task(trigger_rule=TriggerRule.ALL_FAILED)
    def failure_handler() -> Dict[str, Any]:
        """Handle failure from upstream tasks"""
        # Send alert, using the same title/message/severity keywords as the
        # other send_alert calls in this file.
        send_alert(
            title="Critical task failed",
            message="Upstream critical task failed; failure handler is running",
            severity="critical"
        )
        # Log failure
        logging.error("Pipeline failed, handling gracefully")
        return {'status': 'failure_handled'}

    @task(trigger_rule=TriggerRule.ALL_DONE)
    def cleanup() -> Dict[str, Any]:
        """Cleanup regardless of upstream status"""
        # Cleanup resources
        cleanup_temp_files()
        return {'status': 'cleaned'}

    # Defined for illustration only: it is not wired into the dependency
    # chain below.
    @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
    def partial_success_handler(
        task1_result: Dict[str, Any] = None,
        task2_result: Dict[str, Any] = None
    ) -> Dict[str, Any]:
        """Handle partial success"""
        if task1_result and task2_result:
            return {'status': 'complete'}
        elif task1_result or task2_result:
            return {'status': 'partial'}
        else:
            return {'status': 'failed'}

    # Dependencies
    critical = critical_task()
    failure = failure_handler()
    cleanup_task = cleanup()
    critical >> failure >> cleanup_task


error_patterns()
5. Circuit Breaker Pattern
# circuit_breaker.py
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
from typing import Any, Dict
import time
import logging
class CircuitBreakerOperator(BaseOperator):
    """
    Circuit breaker pattern for external systems.

    States:
    - CLOSED: Normal operation, requests pass through
    - OPEN: System is failing, requests are blocked
    - HALF_OPEN: Testing if system recovered

    The breaker state (state name, failure count, last failure time) is
    persisted in an Airflow Variable and reloaded at the start of every
    ``execute`` call. Attributes set only on the operator object would not
    be shared between separate task runs, so the failure count could never
    accumulate up to the threshold.

    Args:
        failure_threshold: Number of recorded failures at which the circuit
            opens.
        recovery_timeout: Seconds to wait after the last failure before a
            trial (HALF_OPEN) execution is allowed.
        state_variable: Name of the Airflow Variable holding the state;
            defaults to ``circuit_breaker__<task_id>``.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        *args,
        state_variable=None,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state_variable = state_variable or f"circuit_breaker__{self.task_id}"
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'CLOSED'

    def execute(self, context) -> Any:
        """Execute with circuit breaker logic.

        Raises:
            Exception: "Circuit breaker is OPEN" while the circuit is open and
                the recovery timeout has not elapsed; otherwise whatever
                ``_execute_with_retry`` raises (after recording the failure).
        """
        self._load_state()
        if self.state == 'OPEN':
            if not self._should_try_reset():
                raise Exception("Circuit breaker is OPEN")
            self.state = 'HALF_OPEN'
        try:
            result = self._execute_with_retry(context)
        except Exception:
            self._on_failure()
            raise
        # Outside the try block so a problem while recording success is not
        # counted as a failure of the protected call.
        self._on_success()
        return result

    def _execute_with_retry(self, context) -> Any:
        """Execute with retry logic.

        Placeholder hook: the protected call belongs here. As written it
        does nothing and returns None.
        """
        # Implementation here
        pass

    def _on_success(self):
        """Handle successful execution: reset the counter and close the circuit."""
        self.failure_count = 0
        self.state = 'CLOSED'
        self._save_state()

    def _on_failure(self):
        """Handle failed execution: count it and open the circuit at the threshold."""
        self.failure_count += 1
        # Wall-clock time, because the value is compared across processes.
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = 'OPEN'
            logging.warning("Circuit breaker OPENED")
        self._save_state()

    def _should_try_reset(self) -> bool:
        """Check if we should try to reset"""
        if self.last_failure_time is None:
            return True
        return time.time() - self.last_failure_time > self.recovery_timeout

    def _load_state(self) -> None:
        """Restore breaker state from the Airflow Variable, if one exists."""
        from airflow.models import Variable

        try:
            saved = Variable.get(
                self.state_variable, default_var=None, deserialize_json=True
            )
        except Exception as exc:
            # Best effort: fall back to the in-memory state rather than
            # failing the task because the breaker store is unavailable.
            logging.warning(f"Could not load circuit breaker state: {exc}")
            return
        if not saved:
            return
        self.state = saved.get('state', 'CLOSED')
        self.failure_count = saved.get('failure_count', 0)
        self.last_failure_time = saved.get('last_failure_time')

    def _save_state(self) -> None:
        """Persist breaker state to the Airflow Variable."""
        from airflow.models import Variable

        snapshot = {
            'state': self.state,
            'failure_count': self.failure_count,
            'last_failure_time': self.last_failure_time,
        }
        try:
            Variable.set(self.state_variable, snapshot, serialize_json=True)
        except Exception as exc:
            # Best effort: a bookkeeping problem must not mask the task outcome.
            logging.warning(f"Could not save circuit breaker state: {exc}")
# Usage
# Example instantiation with the default threshold and timeout spelled out.
# NOTE(review): no DAG is visible around this call — presumably it belongs
# inside a DAG definition; confirm.
circuit_breaker = CircuitBreakerOperator(
    failure_threshold=5,
    recovery_timeout=60,
    task_id='circuit_breaker',
)
Real-World Scenarios
Scenario 1: Google's Data Pipeline
# google_pipeline.py
"""
Google-style data pipeline with error handling:
- Comprehensive retry logic
- Multi-channel alerting
- Circuit breaker pattern
"""
from airflow.decorators import dag, task
from datetime import datetime, timedelta
from typing import Dict, Any
@dag(
    dag_id='google_error_handling',
    schedule_interval='@hourly',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['google', 'production', 'error-handling'],
)
def google_pipeline():
    """Build an hourly extract -> transform -> load DAG with retries and alerting."""

    @task(
        retries=3,
        retry_delay=timedelta(minutes=5),
        retry_exponential_backoff=True,
        max_retry_delay=timedelta(minutes=30),
        on_failure_callback=task_failure_callback,
        on_retry_callback=task_retry_callback,
    )
    def extract_data() -> Dict[str, Any]:
        """Extract data with retries"""
        try:
            data = fetch_from_api()
            return {'records': len(data), 'status': 'extracted'}
        except Exception as e:
            logging.error(f"Extraction failed: {e}")
            raise

    @task(
        retries=2,
        retry_delay=timedelta(minutes=2),
        on_failure_callback=task_failure_callback,
    )
    def transform_data(data: Dict[str, Any]) -> Dict[str, Any]:
        """Transform data with error handling"""
        try:
            transformed = apply_transformations(data)
            return {'records': len(transformed), 'status': 'transformed'}
        except Exception as e:
            logging.error(f"Transformation failed: {e}")
            # Send alert, but never let an alerting problem replace the
            # transformation error that is re-raised below.
            try:
                send_slack_alert(
                    title="Transformation Failed",
                    message=str(e),
                    severity="critical"
                )
            except Exception as alert_error:
                logging.error(f"Could not send Slack alert: {alert_error}")
            raise

    @task(
        retries=3,
        retry_delay=timedelta(minutes=1),
        on_failure_callback=task_failure_callback,
    )
    def load_data(data: Dict[str, Any]) -> Dict[str, Any]:
        """Load data into the warehouse, logging and re-raising any failure."""
        try:
            loaded = load_to_warehouse(data)
            return {'rows': loaded, 'status': 'loaded'}
        except Exception as e:
            logging.error(f"Load failed: {e}")
            raise

    @task(trigger_rule='none_failed_min_one_success')
    def notify_success(
        extracted: Dict[str, Any],
        transformed: Dict[str, Any],
        loaded: Dict[str, Any]
    ) -> None:
        """Notify on success"""
        send_slack_alert(
            title="Pipeline Success",
            message=f"Processed {loaded['rows']} rows",
            severity="info"
        )

    # Pipeline with error handling
    extracted = extract_data()
    transformed = transform_data(extracted)
    loaded = load_data(transformed)
    notify_success(extracted, transformed, loaded)


google_pipeline()
Scenario 2: Apple's Mission-Critical Pipeline
# apple_pipeline.py
"""
Apple-style mission-critical pipeline:
- Zero tolerance for failures
- Comprehensive monitoring
- Automatic recovery
"""
from airflow.decorators import dag, task
from datetime import datetime, timedelta
from typing import Dict, Any
@dag(
    dag_id='apple_mission_critical',
    schedule_interval='*/5 * * * *',  # Every 5 minutes
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=['apple', 'mission-critical', 'production'],
)
def apple_pipeline():
    """Build a five-minute DAG: one critical processing task, then cleanup and monitoring."""

    @task(
        retries=5,
        retry_delay=timedelta(seconds=30),
        retry_exponential_backoff=True,
        max_retry_delay=timedelta(minutes=5),
        execution_timeout=timedelta(minutes=2),
        on_failure_callback=critical_failure_handler,
        on_retry_callback=critical_retry_handler,
    )
    def process_critical_data() -> Dict[str, Any]:
        """Process critical data with zero tolerance"""
        start_time = time.time()
        try:
            # Critical processing
            result = perform_critical_processing()
            # Track metrics
            track_metric(
                'processing_time',
                time.time() - start_time
            )
            return {'status': 'success', 'result': result}
        except Exception as e:
            # Send PagerDuty alert, but never let an alerting problem replace
            # the processing error that is re-raised below.
            try:
                send_pagerduty_alert(
                    title="Critical Pipeline Failure",
                    message=str(e),
                    severity="critical"
                )
            except Exception as alert_error:
                logging.error(f"Could not send PagerDuty alert: {alert_error}")
            raise

    @task(trigger_rule='all_done')
    def cleanup_and_monitor() -> Dict[str, Any]:
        """Cleanup and monitor"""
        # Cleanup resources
        cleanup_temp_files()
        # Monitor pipeline health
        check_pipeline_health()
        return {'status': 'monitored'}

    # Pipeline
    result = process_critical_data()
    monitor = cleanup_and_monitor()
    result >> monitor


apple_pipeline()
Edge Cases
⚠️ Common Pitfalls
- Infinite Retries: Always set a finite retries count, and set max_retry_delay so that exponential backoff cannot stretch the wait between attempts indefinitely.
- Callback Failures: Handle callback failures gracefully. Don't let callback failures affect pipeline execution.
- Alert Fatigue: Implement alert throttling to avoid alert fatigue.
- State Management: Track circuit breaker state properly across retries.
# edge_cases.py
from airflow.decorators import dag, task
from datetime import datetime, timedelta
@dag(
    dag_id='error_edge_cases',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
)
def edge_cases():
    """Contrast problematic and recommended retry/callback configurations."""

    # Retry issue
    @task(
        retries=10,
        retry_delay=timedelta(seconds=1),
        # BAD: ten retries one second apart, with no backoff and no
        # max_retry_delay
    )
    def infinite_retry_issue():
        pass

    # Correct retry configuration
    @task(
        retries=3,
        retry_delay=timedelta(minutes=1),
        retry_exponential_backoff=True,
        max_retry_delay=timedelta(minutes=10),  # GOOD: caps the backoff delay
    )
    def correct_retry_config():
        pass

    # Callback failure issue
    def failing_callback(context):
        # BAD: a callback that raises
        raise Exception("Callback failed!")

    def robust_callback(context):
        # GOOD: any error while alerting is logged instead of propagating
        try:
            send_alert(context)
        except Exception as exc:
            logging.error(f"Callback failed: {exc}")

    # Only the correctly configured task is added to the DAG.
    correct_retry_config()


edge_cases()
QuizBox
Best Practices
# best_practices.py
"""
Error Handling Best Practices:
1. Retry Strategy:
- Use exponential backoff
- Set appropriate retry counts
- Implement max_retry_delay
2. Callback Design:
- Make callbacks robust
- Handle callback failures
- Log callback execution
3. Alerting:
- Implement alert throttling
- Use multiple channels
- Track alert metrics
4. Circuit Breaker:
- Implement for external systems
- Track state properly
- Test recovery regularly
5. Monitoring:
- Track retry rates
- Monitor failure patterns
- Alert on anomalies
"""
ℹ️ Google Interview Tip
At Google, they emphasize resilience and observability. When discussing error handling, highlight the importance of comprehensive retry logic, circuit breaker patterns, and multi-channel alerting. Also mention how they track metrics to identify failure patterns.
Summary
Error handling is critical for production pipelines. Key takeaways:
- Retries with exponential backoff
- Callbacks for success/failure/retry
- Alerting with multiple channels
- Circuit breaker for external systems
- Monitoring and metrics tracking
For Google and Apple interviews, focus on:
- Comprehensive retry strategies
- Robust callback implementation
- Alert throttling and management
- Circuit breaker patterns
- Metrics and monitoring
This question is part of the Apache Airflow Advanced interview preparation series. Practice implementing these patterns before your interview.