Interview Question
ℹ️Interview Context
Company: Netflix / Meta
Role: Senior Data Engineer / Data Platform Engineer
Difficulty: Advanced
Time: 45-60 minutes
Question: "Design a modular, maintainable DAG for a complex data pipeline. How do you handle dynamic task generation, task dependencies, and ensure idempotency? Walk us through your design patterns."
Detailed Theory
Core Design Principles
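```python
# design_principles.py
"""
Core DAG design principles (the ones Netflix/Meta-style interviews probe):

1. MODULARITY:
   - Single responsibility per DAG
   - Reusable task libraries
   - Abstract common patterns into mixins or factories

2. IDEMPOTENCY:
   - A re-run for the same logical date must produce the same result
   - Partition output by the run's logical date / data interval
     (e.g. {{ ds }}), not by wall-clock time or a per-run ID, so a
     re-run overwrites the same partition instead of adding a duplicate
   - Make writes atomic (stage, then swap) so a partial failure leaves
     no half-written output

3. OBSERVABILITY:
   - Clear task naming
   - Informative logs
   - Explicit error handling and alerting

4. SCALABILITY:
   - Dynamic task generation
   - Efficient resource usage
   - Parallel execution where possible
"""
```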
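To make the idempotency principle concrete, here is a minimal stdlib-only sketch. It assumes a local directory stands in for the warehouse: output is keyed only by the logical date string (ds), written to a temporary file and atomically swapped into place, so running it twice for the same date leaves exactly one partition. The helper name write_partition and the ds=YYYY-MM-DD layout are hypothetical.

```python
import json
import os
import tempfile


def write_partition(base_dir: str, ds: str, rows: list) -> str:
    """Idempotently write rows for one logical date (ds, e.g. '2024-01-01').

    Hypothetical helper: the partition path depends only on ds, so a re-run
    for the same date overwrites the same file instead of appending.
    """
    partition_dir = os.path.join(base_dir, f"ds={ds}")
    os.makedirs(partition_dir, exist_ok=True)
    final_path = os.path.join(partition_dir, "data.json")

    # Stage to a temp file in the same directory, then atomically replace,
    # so a crash mid-write never leaves a half-written partition visible.
    fd, tmp_path = tempfile.mkstemp(dir=partition_dir, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(rows, f)
        os.replace(tmp_path, final_path)
    except BaseException:
        os.remove(tmp_path)
        raise
    return final_path


with tempfile.TemporaryDirectory() as warehouse:
    write_partition(warehouse, "2024-01-01", [{"id": 1}])
    write_partition(warehouse, "2024-01-01", [{"id": 1}])  # re-run: same end state
    print(os.listdir(os.path.join(warehouse, "ds=2024-01-01")))  # ['data.json']
```

In a real DAG the same idea usually maps to an overwrite of a date-keyed table partition (for example INSERT OVERWRITE or delete-then-insert in one transaction).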
Pattern 1: TaskFactory Pattern
```python
# task_factory_pattern.py
from datetime import datetime
from typing import Dict, List

from airflow.decorators import dag, task


class TaskFactory:
    """Factory for creating reusable, parameterized task sets."""

    @staticmethod
    def create_extract_tasks(sources: List[str]) -> list:
        """Create one extraction task per source.

        Must be called inside a DAG context; returns the task outputs (XComArgs).
        """
        outputs = []
        for source in sources:
            # Bind `source` as a default argument so each task keeps its own
            # value (a plain closure would only see the last loop value).
            @task(task_id=f'extract_{source}')
            def extract(source_name: str = source) -> Dict:
                # Extraction logic goes here
                return {'source': source_name, 'rows': 1000}

            outputs.append(extract())
        return outputs

    @staticmethod
    def create_load_tasks(targets: List[str], upstream: list) -> list:
        """Create one load task per target, each consuming all upstream outputs."""
        outputs = []
        for target in targets:
            @task(task_id=f'load_{target}')
            def load(data: List[Dict], target_name: str = target) -> Dict:
                # Loading logic goes here
                total_rows = sum(d['rows'] for d in data)
                return {'target': target_name, 'rows': total_rows, 'status': 'success'}

            outputs.append(load(upstream))
        return outputs


# Usage in DAG
@dag(
    dag_id='factory_pattern_dag',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def factory_dag():
    sources = ['mysql', 'postgres', 'api']
    targets = ['s3', 'bigquery', 'snowflake']
    extracts = TaskFactory.create_extract_tasks(sources)
    # Passing the list of extract outputs to each load both supplies the data
    # and wires every extract upstream of every load (3 loads, not 9).
    TaskFactory.create_load_tasks(targets, extracts)


factory_dag()
```
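The default-argument binding inside the factory loops matters because Python closures capture variables, not values. This stdlib-only demonstration (no Airflow needed) shows the difference; the function names are illustrative.

```python
def make_callables_late(names):
    # Each lambda looks up `name` when called, after the loop has finished.
    return [lambda: name for name in names]


def make_callables_bound(names):
    # The default argument is evaluated once per iteration, freezing the value.
    return [lambda name=name: name for name in names]


sources = ['mysql', 'postgres', 'api']
late = [f() for f in make_callables_late(sources)]
bound = [f() for f in make_callables_bound(sources)]
print(late)   # ['api', 'api', 'api']
print(bound)  # ['mysql', 'postgres', 'api']
```

Without the binding, every generated extract task in the factory would read the same (last) source.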
Pattern 2: Dynamic Task Generation
```python
# dynamic_tasks.py
from datetime import datetime
from typing import List

from airflow.decorators import dag, task


@dag(
    dag_id='dynamic_generation_dag',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['dynamic', 'template'],
)
def dynamic_generation():
    """
    Dynamic task mapping pattern (Airflow 2.3+):
    - Mapped task instances are created at RUNTIME from upstream output
    - Handles a variable number of inputs without editing the DAG file
    - Each mapped instance is independent and runs in parallel
    """

    @task
    def get_sources() -> List[str]:
        """Fetch the list of sources (hard-coded here; could come from config or an API)"""
        return ['source_a', 'source_b', 'source_c', 'source_d']

    @task
    def process_source(source: str) -> dict:
        """Process one source; runs once per element of the mapped list"""
        # Processing logic
        return {
            'source': source,
            'records': 1000,
            'status': 'processed'
        }

    @task
    def aggregate(results: List[dict]) -> dict:
        """Aggregate (reduce) the outputs of all mapped instances"""
        results = list(results)  # materialize the lazy sequence of mapped XComs
        total_records = sum(r['records'] for r in results)
        return {
            'total_sources': len(results),
            'total_records': total_records
        }

    # Dynamic task mapping: expand() takes keyword arguments and creates
    # one mapped task instance per list element
    sources = get_sources()
    processed = process_source.expand(source=sources)
    aggregate(processed)


dynamic_generation()
```
ℹ️Pro Tip
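Prefer .expand() (dynamic task mapping, Airflow 2.3+) when the number of tasks depends on data known only at runtime. A Python for loop in the DAG file creates a fixed set of tasks at parse time, whereas .expand() creates mapped task instances at runtime from an upstream task's output. Arguments to .expand() must be passed by keyword, and .partial() fixes the arguments that stay constant across instances. With the TaskFlow API's @task decorator this reads as plain function calls.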
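When .expand() receives more than one keyword argument, Airflow creates one mapped instance per combination (a cross product), so instance counts grow multiplicatively. The stdlib sketch below only models that fan-out with itertools.product; it is not Airflow code, and the table/region values are made up.

```python
from itertools import product


def mapped_instances(**kwargs):
    """Model expand(**kwargs): one dict of arguments per combination."""
    keys = list(kwargs)
    return [dict(zip(keys, combo)) for combo in product(*kwargs.values())]


instances = mapped_instances(table=['users', 'orders'], region=['us', 'eu', 'apac'])
print(len(instances))  # 6
print(instances[0])    # {'table': 'users', 'region': 'us'}
```

If you want matched pairs rather than every combination, Airflow 2.4+ offers .expand_kwargs() (and XComArg.zip()) instead of a multi-argument .expand().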
Pattern 3: Branching Pattern
```python
# branching_pattern.py
from datetime import datetime

from airflow.decorators import dag, task
from airflow.utils.trigger_rule import TriggerRule


@dag(
    dag_id='branching_pattern_dag',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def branching_pattern():
    """
    Branching pattern:
    - Choose an execution path at runtime based on a condition
    - Useful for A/B testing and feature flags
    - The branch not taken is marked 'skipped', not 'failed'
    """

    @task
    def get_config() -> dict:
        """Stand-in for reading a feature flag (hard-coded here)"""
        return {'use_new_path': True}

    @task.branch  # Airflow 2.3+; replaces BranchPythonOperator in TaskFlow code
    def choose_path(config: dict) -> str:
        """Return the task_id to follow; the other direct downstream task is skipped"""
        if config.get('use_new_path'):
            return 'process_new'
        return 'process_legacy'

    @task
    def process_new() -> dict:
        """New processing path"""
        return {'path': 'new', 'result': 'new_result'}

    @task
    def process_legacy() -> dict:
        """Legacy processing path"""
        return {'path': 'legacy', 'result': 'legacy_result'}

    # With the default rule (all_success) the join would be skipped, because
    # one upstream branch is always skipped. This rule runs it when nothing
    # failed and at least one upstream succeeded.
    @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
    def merge_results(new_result: dict = None, legacy_result: dict = None) -> dict:
        """Return whichever branch produced a result (in recent Airflow 2.x
        releases the skipped branch's return value resolves to None)"""
        return new_result or legacy_result

    branch = choose_path(get_config())
    new_path = process_new()
    legacy_path = process_legacy()
    branch >> [new_path, legacy_path]
    merge_results(new_result=new_path, legacy_result=legacy_path)


branching_pattern()
```
Pattern 4: Error Handling Pattern
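```python
# error_handling.py
import logging
from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.utils.trigger_rule import TriggerRule

logger = logging.getLogger(__name__)


# Callbacks receive the Airflow context dict. The log calls are placeholders;
# swap in your real alerting (Slack, PagerDuty, email).
def dag_failure_callback(context):
    logger.error("DAG run failed: %s", context['dag_run'].run_id)


def task_failure_callback(context):
    logger.error("Task failed: %s", context['task_instance'].task_id)


def task_retry_callback(context):
    logger.warning("Task will retry: %s", context['task_instance'].task_id)


def task_success_callback(context):
    logger.info("Task succeeded: %s", context['task_instance'].task_id)


@dag(
    dag_id='error_handling_dag',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    # DAG-level callbacks are on_success_callback / on_failure_callback;
    # retry callbacks are set per task (or through default_args).
    on_failure_callback=dag_failure_callback,
)
def error_handling_pattern():
    """
    Error handling pattern:
    - Retries with exponential backoff
    - Callback functions at task and DAG level
    - Trigger rules for failure handling and cleanup
    - Alerting integration via callbacks
    """

    @task(
        retries=3,
        retry_delay=timedelta(minutes=5),
        retry_exponential_backoff=True,
        max_retry_delay=timedelta(minutes=30),
        on_failure_callback=task_failure_callback,
        on_retry_callback=task_retry_callback,
        on_success_callback=task_success_callback,
    )
    def critical_task() -> dict:
        """Task with retries and callbacks"""
        try:
            result = {'rows': 1000}  # placeholder for the real critical operation
            return {'status': 'success', 'result': result}
        except Exception:
            # Log with traceback, then re-raise so Airflow can retry
            logger.exception("critical_task failed")
            raise

    @task(trigger_rule=TriggerRule.ONE_FAILED)
    def handle_failure() -> dict:
        """Runs only if an upstream task failed; skipped otherwise"""
        logger.error("Upstream failure detected")  # placeholder for send_alert(...)
        return {'status': 'failure_handled'}

    @task(trigger_rule=TriggerRule.ALL_DONE)
    def cleanup() -> dict:
        """Runs once all upstream tasks finish, whatever their state"""
        # Remove temp files, release resources, etc.
        return {'status': 'cleaned'}

    critical = critical_task()
    # Both hang directly off critical_task, so cleanup does not wait on the
    # alert path. Caveat: DAG-run state is derived from the leaf tasks, so if
    # these leaves succeed the run can be marked successful even though
    # critical_task failed.
    critical >> [handle_failure(), cleanup()]


error_handling_pattern()
```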
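To see roughly what retry_delay, retry_exponential_backoff and max_retry_delay imply, here is a simplified model in which the delay doubles on each retry and is capped at max_retry_delay. It is an approximation for intuition only: Airflow's own computation also adds jitter, so real wait times will not match these numbers exactly.

```python
from datetime import timedelta


def backoff_delays(retries: int, retry_delay: timedelta, max_retry_delay: timedelta) -> list:
    """Simplified capped exponential backoff: retry_delay * 2**n, capped (no jitter)."""
    return [min(retry_delay * (2 ** n), max_retry_delay) for n in range(retries)]


delays = backoff_delays(3, timedelta(minutes=5), timedelta(minutes=30))
print([int(d.total_seconds() // 60) for d in delays])  # [5, 10, 20]
```

With a fourth retry the cap kicks in (5, 10, 20, 30 minutes), which is why max_retry_delay is worth setting whenever backoff is enabled.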
⚠️Important
Always set trigger_rule=TriggerRule.ALL_DONE for cleanup tasks. This ensures they run regardless of upstream task status, which is critical for resource cleanup.
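The trigger rules used in these patterns can be read as predicates over the final states of a task's direct upstream tasks. The sketch below is a deliberately simplified model, not Airflow's implementation: it assumes every upstream task has already finished (Airflow can fire one_failed earlier) and covers only the four rules used here.

```python
FAILED = {'failed', 'upstream_failed'}


def should_run(rule: str, upstream_states: list) -> bool:
    """Simplified trigger-rule check, assuming all upstream tasks are finished."""
    if rule == 'all_success':
        return all(s == 'success' for s in upstream_states)
    if rule == 'all_done':
        return True  # every upstream finished, whatever the outcome
    if rule == 'one_failed':
        return any(s in FAILED for s in upstream_states)
    if rule == 'none_failed_min_one_success':
        return (not any(s in FAILED for s in upstream_states)
                and any(s == 'success' for s in upstream_states))
    raise ValueError(f"rule not modelled: {rule}")


# Branch join: one branch succeeded, the other was skipped
print(should_run('all_success', ['success', 'skipped']))                  # False
print(should_run('none_failed_min_one_success', ['success', 'skipped']))  # True
# Cleanup after a failure
print(should_run('all_done', ['failed']))  # True
```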
Pattern 5: Cross-DAG Dependencies
```python
# cross_dag_deps.py
from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.sensors.external_task import ExternalTaskSensor


@dag(
    dag_id='cross_dag_dependency_dag',
    schedule_interval='0 2 * * *',  # daily at 02:00
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def cross_dag_dependency():
    """
    Cross-DAG dependency pattern:
    - Wait for a task in an upstream DAG to succeed
    - Align logical dates when the two DAGs run on different schedules
    - Exchange data through shared storage (tables, object-store paths);
      the sensor only waits, it does not pass data
    """

    @task
    def process_data() -> dict:
        """Process data produced by the upstream DAG"""
        return {'processed': True}

    # ExternalTaskSensor for cross-DAG dependency
    wait_task = ExternalTaskSensor(
        task_id='wait_for_upstream_dag',
        external_dag_id='upstream_etl_dag',
        external_task_id='final_task',
        # Assumes upstream_etl_dag runs daily at 01:00: this DAG's logical date
        # minus execution_delta must equal the upstream run's logical date.
        # Omit it when both DAGs share the same schedule.
        execution_delta=timedelta(hours=1),
        allowed_states=['success'],
        failed_states=['failed'],  # fail fast instead of waiting for the timeout
        poke_interval=60,
        timeout=3600,
        mode='reschedule',  # frees the worker slot between pokes
    )

    wait_task >> process_data()


cross_dag_dependency()
```
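Misaligned logical dates are the most common reason an ExternalTaskSensor waits until it times out: the sensor looks for the upstream run whose logical date equals its own logical date minus execution_delta. This stdlib sketch shows that arithmetic; the helper names are hypothetical and it models only daily schedules that run on the hour.

```python
from datetime import datetime, timedelta


def upstream_logical_date(this_logical_date: datetime, execution_delta: timedelta) -> datetime:
    """Logical date the sensor searches for in the upstream DAG."""
    return this_logical_date - execution_delta


def upstream_run_exists(logical_date: datetime, upstream_hour: int) -> bool:
    """True if a daily upstream DAG scheduled at `upstream_hour`:00 has a run at this logical date."""
    return (logical_date.hour == upstream_hour and logical_date.minute == 0
            and logical_date.second == 0 and logical_date.microsecond == 0)


target = upstream_logical_date(datetime(2024, 1, 1, 2, 0), timedelta(hours=1))
print(target)                          # 2024-01-01 01:00:00
print(upstream_run_exists(target, 1))  # True: upstream scheduled at 01:00
print(upstream_run_exists(target, 0))  # False: upstream at midnight, the sensor would time out
```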
Pattern 6: SubDAG Pattern (Airflow 1.x Style)
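```python
# subdag_pattern.py
"""
Note: SubDAGs are deprecated since Airflow 2.0 and removed in Airflow 3.
Use TaskGroups instead for better UI and performance.
"""

# Old SubDAG pattern (deprecated; shown for reference only, not executed):
#
# from airflow import DAG
# from airflow.operators.subdag import SubDagOperator
#
# def create_subdag(parent_dag_id, child_dag_name, start_date, schedule_interval):
#     # A SubDAG is a separate DAG whose dag_id must be '<parent>.<child>'
#     subdag = DAG(
#         dag_id=f'{parent_dag_id}.{child_dag_name}',
#         start_date=start_date,
#         schedule_interval=schedule_interval,
#     )
#     # ...add tasks to subdag, e.g. task1 >> task2...
#     return subdag
#
# In the parent DAG:
# SubDagOperator(task_id=child_dag_name, subdag=create_subdag(...))

# New TaskGroup pattern (recommended)
from datetime import datetime

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator  # Airflow 2.3+; replaces DummyOperator
from airflow.utils.task_group import TaskGroup


@dag(
    dag_id='taskgroup_pattern_dag',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def taskgroup_pattern():
    start = EmptyOperator(task_id='start')

    # Task IDs inside the group are prefixed: 'processing.extract', etc.
    with TaskGroup(group_id='processing') as processing:
        extract = EmptyOperator(task_id='extract')
        transform = EmptyOperator(task_id='transform')
        load = EmptyOperator(task_id='load')
        extract >> transform >> load

    end = EmptyOperator(task_id='end')
    start >> processing >> end


taskgroup_pattern()
```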
Real-World Scenarios
Scenario 1: Netflix ETL Pipeline
```python
# netflix_etl_pattern.py
"""
Netflix-style ETL pipeline:
- Ingest from multiple sources
- Apply business logic
- Load to the data warehouse
- Notify stakeholders
"""
from datetime import datetime
from typing import Dict, List

from airflow.decorators import dag, task


@dag(
    dag_id='netflix_etl_pipeline',
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['netflix', 'etl', 'production'],
    max_active_runs=1,  # Prevent concurrent runs
    doc_md="""
    ## Netflix ETL Pipeline
    This pipeline ingests data from multiple sources,
    transforms it, and loads it into the data warehouse.

    **Sources:** MySQL, Kafka
    **Target:** BigQuery
    """,
)
def netflix_etl():
    @task
    def extract_from_mysql() -> List[Dict]:
        """Extract from MySQL source"""
        # Implementation
        return [{'table': 'users', 'rows': 10000}]

    @task
    def extract_from_kafka() -> List[Dict]:
        """Extract from Kafka topic"""
        # Implementation
        return [{'topic': 'events', 'records': 50000}]

    @task
    def transform_mysql(data: List[Dict]) -> List[Dict]:
        """Transform MySQL data"""
        # Business logic
        return data

    @task
    def transform_kafka(data: List[Dict]) -> List[Dict]:
        """Transform Kafka data"""
        # Business logic
        return data

    @task
    def load_to_bigquery(mysql_data: List[Dict], kafka_data: List[Dict]) -> Dict:
        """Load combined data to BigQuery"""
        # Combine and load (write logic omitted); count rows from the inputs
        combined = mysql_data + kafka_data
        rows = sum(d.get('rows', d.get('records', 0)) for d in combined)
        return {'status': 'loaded', 'rows': rows}

    @task
    def notify_stakeholders(load_result: Dict) -> None:
        """Send notification to stakeholders"""
        # Send email/Slack notification
        pass

    # Extract
    mysql_data = extract_from_mysql()
    kafka_data = extract_from_kafka()
    # Transform
    transformed_mysql = transform_mysql(mysql_data)
    transformed_kafka = transform_kafka(kafka_data)
    # Load
    load_result = load_to_bigquery(transformed_mysql, transformed_kafka)
    # Notify
    notify_stakeholders(load_result)


netflix_etl()
```
Scenario 2: Meta Data Pipeline with Quality Checks
```python
# meta_quality_pipeline.py
"""
Meta-style pipeline with data quality checks:
- Validate data before processing
- Alert on quality issues
- Quarantine bad data
"""
from datetime import datetime

from airflow.decorators import dag, task
from airflow.utils.trigger_rule import TriggerRule


@dag(
    dag_id='meta_quality_pipeline',
    schedule_interval='@hourly',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['meta', 'quality', 'production'],
)
def meta_quality_pipeline():
    @task
    def extract() -> dict:
        """Extract raw data"""
        return {'records': 1000, 'quality_score': 0.85}

    @task
    def validate(data: dict) -> dict:
        """Validate data quality; raising here sends the run down the failure path"""
        quality_score = data.get('quality_score', 0)
        if quality_score < 0.7:
            raise ValueError(f"Quality score too low: {quality_score}")
        return {
            'valid': True,
            'quality_score': quality_score,
            'records': data['records']
        }

    @task
    def process(validated_data: dict) -> dict:
        """Process valid data"""
        return {
            'processed': True,
            'records': validated_data['records']
        }

    # Failure-path tasks run only if an upstream task failed
    @task(trigger_rule=TriggerRule.ONE_FAILED)
    def quarantine(data: dict) -> None:
        """Quarantine the invalid batch"""
        # Move to quarantine table
        pass

    @task(trigger_rule=TriggerRule.ONE_FAILED)
    def alert(data: dict) -> None:
        """Send quality alert"""
        # Send alert
        pass

    # Happy path
    raw = extract()
    validated = validate(raw)
    process(validated)

    # Error handling: both receive the raw batch and sit downstream of validate
    validated >> [quarantine(raw), alert(raw)]


meta_quality_pipeline()
```
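In practice a validate task computes the quality score from the records rather than receiving it precomputed. A minimal stdlib sketch, assuming a hypothetical rule (every required field must be present and non-null) and the same 0.7 threshold: valid rows continue, bad rows go to quarantine, and the batch-level gate decides whether the task should raise.

```python
def split_by_quality(records: list, required_fields: tuple, min_score: float = 0.7):
    """Route records to (valid, quarantined) and gate the batch on its score.

    Hypothetical rule: a record is valid if every required field is present
    and not None. The batch fails the gate if the valid fraction < min_score.
    """
    valid, quarantined = [], []
    for rec in records:
        ok = all(rec.get(f) is not None for f in required_fields)
        (valid if ok else quarantined).append(rec)
    score = len(valid) / len(records) if records else 1.0
    return valid, quarantined, score, score >= min_score


records = [
    {'user_id': 1, 'event': 'play'},
    {'user_id': 2, 'event': None},
    {'user_id': 3, 'event': 'pause'},
    {'user_id': None, 'event': 'stop'},
]
valid, bad, score, passed = split_by_quality(records, ('user_id', 'event'))
print(len(valid), len(bad), score, passed)  # 2 2 0.5 False
```

Quarantining row by row keeps a few bad records from blocking the whole batch, while the score gate still fails loudly when quality drops broadly.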
Edge Cases and Gotchas
⚠️Common Pitfalls
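- Mutable default arguments: Never use mutable defaults (lists, dicts) in task functions. Use None as the default and initialize inside the function.
- Global state: Don't rely on module-level variables to share data between tasks. Tasks run in separate processes, often on different workers, so a value set in one task is not visible in another; pass data through XCom or external storage.
- Task serialization: Values passed between TaskFlow tasks travel through XCom and must be serializable by the XCom backend (JSON-based by default). Keep them small and pass large datasets by reference, such as a storage path.
- Circular dependencies: Airflow rejects a DAG that contains a cycle when it parses the file, so verify your dependency graph before deploying.
- Dynamic start_date: Never set start_date to datetime.now() or any other value computed at parse time; use a fixed date. Inside tasks, use the run's logical date (logical_date or data_interval_start, formerly execution_date) instead of the wall clock.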
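The circular-dependency pitfall comes down to a graph check: a valid DAG has a topological order, and a cycle makes one impossible. As an illustration only (this is standalone Kahn's algorithm, not Airflow's implementation, and the task names are made up):

```python
from collections import deque


def topological_order(edges: dict) -> list:
    """Return a topological order of tasks, or raise ValueError on a cycle.

    `edges` maps each task_id to the list of its downstream task_ids.
    """
    nodes = set(edges) | {d for ds in edges.values() for d in ds}
    indegree = {n: 0 for n in nodes}
    for downstreams in edges.values():
        for d in downstreams:
            indegree[d] += 1

    # Start from tasks with no upstream dependencies
    ready = deque(sorted(n for n in nodes if indegree[n] == 0))
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for d in edges.get(node, []):
            indegree[d] -= 1
            if indegree[d] == 0:
                ready.append(d)

    # Any node never released is part of (or blocked by) a cycle
    if len(order) != len(nodes):
        stuck = sorted(n for n in nodes if indegree[n] > 0)
        raise ValueError(f"cycle detected among: {stuck}")
    return order


print(topological_order({'extract': ['transform'], 'transform': ['load']}))
# ['extract', 'transform', 'load']
try:
    topological_order({'a': ['b'], 'b': ['a']})
except ValueError as exc:
    print(exc)  # cycle detected among: ['a', 'b']
```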
```python
# edge_cases.py
from datetime import datetime

from airflow.decorators import dag, task


# Bad: mutable default argument (one list shared by every call in the process)
@task
def bad_task(items=[]):  # This will cause issues!
    items.append('new')
    return items


# Good: Use None and initialize
@task
def good_task(items=None):
    if items is None:
        items = []
    items.append('new')
    return items


# Edge case: task naming conflicts
@dag(dag_id='naming_dag', schedule_interval='@daily', start_date=datetime(2024, 1, 1), catchup=False)
def naming_dag():
    # Use explicit, unique task IDs. If the same decorated function is called
    # twice, TaskFlow appends a suffix (e.g. 'process__1'), which is less predictable.
    @task(task_id='process_data_v1')
    def process_v1():
        return {'version': 'v1'}

    @task(task_id='process_data_v2')
    def process_v2():
        return {'version': 'v2'}

    process_v1() >> process_v2()


naming_dag()
```
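The mutable-default problem is plain Python behavior and easy to reproduce outside Airflow: the default list is created once, when the function is defined, and every call that relies on the default shares it. The function names below are illustrative.

```python
def bad_append(items=[]):
    items.append('new')
    return items


def good_append(items=None):
    if items is None:
        items = []  # fresh list on every call
    items.append('new')
    return items


print(bad_append(), bad_append())    # ['new', 'new'] ['new', 'new']
print(good_append(), good_append())  # ['new'] ['new']
```

Both bad_append calls return the same list object, which is why the first result also shows two items.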
Best Practices
Naming Conventions
```python
# naming_conventions.py
"""
Naming Best Practices:
1. DAG IDs:
   - Use snake_case
   - Include team/domain prefix
   - Be descriptive
   - Example: 'marketing_email_campaign_daily'
2. Task IDs:
   - Use snake_case
   - Include action verb
   - Be specific
   - Example: 'extract_users_from_mysql'
3. Tags:
   - Use consistent tags
   - Include team, type, priority
   - Example: ['marketing', 'etl', 'high-priority']
"""
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    dag_id='marketing_email_campaign_daily',  # Clear naming
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    tags=['marketing', 'etl', 'high-priority'],  # Consistent tagging
    doc_md="## Email Campaign Pipeline\nDaily extraction and processing of email campaign data.",
)
def well_named_dag():
    @task(task_id='extract_campaign_data')  # Descriptive task ID
    def extract():
        return {'data': 'value'}

    @task(task_id='transform_campaign_metrics')
    def transform(data):
        return data

    @task(task_id='load_to_warehouse')
    def load(data):
        return {'status': 'loaded'}

    # Passing each output as the next task's argument wires the dependencies
    # and the data (transform/load require an argument, so bare calls would fail)
    load(transform(extract()))


well_named_dag()
```
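Conventions like these are easiest to keep when they are checked automatically, for example in a CI test over your DAG IDs and task IDs. A minimal stdlib sketch, assuming a hypothetical rule set (snake_case with at least two words, and a DAG ID that starts with an approved team/domain prefix):

```python
import re

# Hypothetical convention: lowercase snake_case with at least two segments
SNAKE_CASE_ID = re.compile(r'^[a-z][a-z0-9]*(_[a-z0-9]+)+$')


def check_ids(dag_id: str, task_ids: list, allowed_prefixes: set) -> list:
    """Return human-readable problems; an empty list means the IDs pass."""
    problems = []
    for ident in [dag_id, *task_ids]:
        if not SNAKE_CASE_ID.match(ident):
            problems.append(f"not snake_case with 2+ words: {ident!r}")
    if dag_id.split('_', 1)[0] not in allowed_prefixes:
        problems.append(f"DAG id lacks a team/domain prefix: {dag_id!r}")
    return problems


print(check_ids('marketing_email_campaign_daily',
                ['extract_campaign_data', 'load_to_warehouse'],
                {'marketing', 'finance'}))  # []
print(check_ids('EmailDAG', ['load'], {'marketing'}))
```

The second call reports three problems: the DAG ID is not snake_case, 'load' is a single word, and the DAG ID has no approved prefix.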
Documentation Standards
```python
# documentation.py
"""
Documentation Best Practices:
1. Use doc_md for DAG documentation
2. Use doc_md for task documentation
3. Include purpose, inputs, outputs
4. Document any special considerations
"""
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    dag_id='documented_pipeline',
    # Matches "daily at 2 AM UTC" below, assuming the default UTC timezone
    schedule_interval='0 2 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    doc_md="""
    ## Documented Pipeline
    ### Purpose
    This pipeline processes customer data for analytics.
    ### Inputs
    - MySQL: customer_updates table
    - S3: raw_events bucket
    ### Outputs
    - BigQuery: analytics.customer_metrics
    ### Schedule
    Runs daily at 2 AM UTC
    ### Contacts
    - Team: data-engineering@company.com
    - On-call: #data-engineering
    """,
)
def documented_pipeline():
    @task(
        doc_md="""
        ### Extract Customer Updates
        Extracts incremental customer updates from MySQL.
        **Logic:**
        - Query where updated_at > last_run
        - Handle NULL values
        - Return as JSON
        """,
    )
    def extract_customers() -> dict:
        """Extract customer updates from MySQL."""
        # Implementation
        return {'records': 1000}

    extract_customers()


documented_pipeline()
```
ℹ️Netflix Interview Tip
Netflix-style interviews emphasize modularity and reusability. When discussing DAG design, explain how a TaskFactory pattern lets you reuse the same task sets across multiple pipelines, and walk through your error-handling strategy: retries with backoff, callbacks, trigger rules, and alerting.
Summary
Mastering DAG design patterns is essential for building production-ready data pipelines. Key patterns include:
- TaskFactory Pattern - Create reusable task sets
- Dynamic Task Generation - Use .expand() for variable task counts
- Branching Pattern - Handle conditional logic
- Error Handling Pattern - Comprehensive error handling with callbacks
- Cross-DAG Dependencies - Use ExternalTaskSensor
- TaskGroup Pattern - Organize related tasks
For Netflix and Meta interviews, focus on:
- Modular, maintainable design
- Comprehensive error handling
- Proper documentation
- Scalability considerations
- Production best practices
This question is part of the Apache Airflow Advanced interview preparation series. Practice implementing these patterns before your interview.