Interview Question
ℹ️ Interview Context
Company: Amazon / Netflix
Role: Senior Data Engineer / Data Architect
Difficulty: Advanced
Time: 60-90 minutes
Question: "Design an end-to-end data pipeline for a large-scale e-commerce platform. How would you handle data ingestion, transformation, storage, and monitoring? Consider scalability, reliability, and cost optimization."
Detailed Theory
Pipeline Design Fundamentals
# pipeline_fundamentals.py
"""
End-to-End Pipeline Design:

1. Data Ingestion:
   - Batch vs streaming
   - Multiple sources
   - Schema management

2. Data Processing:
   - Transformation logic
   - Data quality
   - Error handling

3. Data Storage:
   - Data lake/lakehouse
   - Data warehouse
   - Caching layer

4. Data Serving:
   - APIs
   - Dashboards
   - ML models

5. Monitoring:
   - Data quality
   - Pipeline health
   - Cost optimization
"""
1. Complete Pipeline Example
# complete_pipeline.py
import logging
from datetime import datetime, timedelta
from typing import Any, Dict, List

from airflow.decorators import dag, task


def pipeline_failure_callback(context: Dict[str, Any]) -> None:
    """Placeholder failure callback: log the failed task; swap in real alerting."""
    ti = context.get('task_instance')
    logging.error("Task failed: %s", ti.task_id if ti else 'unknown')


@dag(
    dag_id='ecommerce_pipeline',
    schedule_interval='0 2 * * *',  # Daily at 2 AM (the argument is named `schedule` from Airflow 2.4)
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
    default_args={
        'owner': 'data-engineering',
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
        'on_failure_callback': pipeline_failure_callback,
    },
    tags=['ecommerce', 'production', 'daily'],
    doc_md="""
## E-Commerce Data Pipeline

### Overview
This pipeline processes daily e-commerce data:
- Ingests from multiple sources (MySQL, API, S3)
- Transforms and validates data
- Loads to data warehouse
- Sends notifications

### Schedule
Runs daily at 2 AM UTC

### Dependencies
- Upstream: MySQL, S3, External APIs
- Downstream: BI dashboards, ML models
""",
)
def ecommerce_pipeline():

    @task
    def extract_orders() -> List[Dict[str, Any]]:
        """Extract yesterday's orders from MySQL"""
        from airflow.providers.mysql.hooks.mysql import MySqlHook

        hook = MySqlHook(mysql_conn_id='ecommerce_db')
        # MySQL interval syntax. CURRENT_DATE ties the result to wall-clock time;
        # filtering on the run's logical date instead would keep re-runs idempotent.
        query = """
            SELECT order_id, customer_id, order_date, total_amount
            FROM orders
            WHERE order_date = CURRENT_DATE - INTERVAL 1 DAY
        """
        records = hook.get_records(query)
        # Convert to JSON-friendly types so the rows can be passed between tasks
        return [
            {
                'order_id': r[0],
                'customer_id': r[1],
                'order_date': str(r[2]),
                'total_amount': float(r[3]),
            }
            for r in records
        ]

    @task
    def extract_products() -> List[Dict[str, Any]]:
        """Extract product catalog from S3"""
        import json

        import boto3

        s3 = boto3.client('s3')
        response = s3.get_object(
            Bucket='product-catalog',
            Key='latest/products.json',
        )
        return json.loads(response['Body'].read())

    @task
    def transform_data(
        orders: List[Dict[str, Any]],
        products: List[Dict[str, Any]],
    ) -> Dict[str, Any]:
        """Transform and enrich data"""
        # Product lookup keyed by product_id. The orders query above selects no
        # product_id, so the lookup is built here but not yet joined to orders.
        product_lookup = {p['product_id']: p for p in products}

        enriched_orders = []
        for order in orders:
            # Add processing metadata
            order['processed_at'] = datetime.now().isoformat()
            order['status'] = 'processed'
            enriched_orders.append(order)

        # Calculate metrics
        total_revenue = sum(o['total_amount'] for o in enriched_orders)
        metrics = {
            'total_orders': len(enriched_orders),
            'total_revenue': total_revenue,
            'avg_order_value': total_revenue / len(enriched_orders) if enriched_orders else 0,
        }
        return {
            'orders': enriched_orders,
            'metrics': metrics,
        }

    @task
    def validate_data(data: Dict[str, Any]) -> Dict[str, Any]:
        """Validate data quality"""
        orders = data['orders']

        # Quality checks
        checks = {
            'has_data': len(orders) > 0,
            'no_nulls': all(o.get('order_id') is not None for o in orders),
            'valid_amounts': all(o.get('total_amount', 0) > 0 for o in orders),
        }
        if not all(checks.values()):
            failed_checks = [k for k, v in checks.items() if not v]
            raise ValueError(f"Data quality checks failed: {failed_checks}")
        return data

    @task
    def load_to_warehouse(data: Dict[str, Any]) -> Dict[str, Any]:
        """Load to data warehouse"""
        from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook

        hook = RedshiftSQLHook(redshift_conn_id='redshift')
        insert_query = """
            INSERT INTO orders (order_id, customer_id, order_date, total_amount, processed_at)
            VALUES (%s, %s, %s, %s, %s)
        """
        # Row-by-row inserts are fine for a small example; for large volumes,
        # stage files in S3 and load them with COPY instead.
        for order in data['orders']:
            hook.run(
                insert_query,
                parameters=(
                    order['order_id'],
                    order['customer_id'],
                    order['order_date'],
                    order['total_amount'],
                    order['processed_at'],
                ),
            )
        return {
            'rows_loaded': len(data['orders']),
            'status': 'success',
        }

    @task
    def send_notifications(result: Dict[str, Any]) -> None:
        """Send pipeline notifications"""
        # An EmailOperator needs a task_id and a task context, so inside a
        # task it is simpler to call Airflow's email helper directly.
        from airflow.utils.email import send_email

        send_email(
            to=['data-team@company.com'],
            subject=f'Pipeline Success: {result["rows_loaded"]} rows loaded',
            html_content=f"""
            <h2>Pipeline Completed</h2>
            <p>Rows loaded: {result['rows_loaded']}</p>
            <p>Status: {result['status']}</p>
            """,
        )

    # Pipeline dependencies. Return values travel between tasks via XCom,
    # which suits small payloads; pass S3 paths instead for large datasets.
    orders = extract_orders()
    products = extract_products()
    transformed = transform_data(orders, products)
    validated = validate_data(transformed)
    loaded = load_to_warehouse(validated)
    send_notifications(loaded)


ecommerce_pipeline()
ℹ️ Pro Tip
Design pipelines with idempotency in mind. Running the pipeline for a given date should produce the same result no matter how many times that run is executed, including retries and backfills.
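One common way to get idempotent loads is to treat each run date as a partition and replace it wholesale: delete the rows for that date and insert the new ones in a single transaction. The following is a minimal sketch of that idea, using an in-memory SQLite table as a stand-in for the warehouse. The function name `load_partition` and the table layout are assumptions made for illustration; in an Airflow task the run date would typically come from the run's logical date (`ds`) rather than the wall clock.

```python
import sqlite3
from typing import Dict, List


def load_partition(conn: sqlite3.Connection, run_date: str, orders: List[Dict]) -> int:
    """Replace all rows for run_date in one transaction, so re-runs do not duplicate data."""
    with conn:  # commits on success, rolls back if any statement raises
        conn.execute("DELETE FROM orders WHERE order_date = ?", (run_date,))
        conn.executemany(
            "INSERT INTO orders (order_id, order_date, total_amount) VALUES (?, ?, ?)",
            [(o["order_id"], run_date, o["total_amount"]) for o in orders],
        )
    # Report how many rows the partition holds after the load
    return conn.execute(
        "SELECT COUNT(*) FROM orders WHERE order_date = ?", (run_date,)
    ).fetchone()[0]


# In-memory table standing in for the warehouse (hypothetical layout)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER, order_date TEXT, total_amount REAL)")
batch = [{"order_id": 1, "total_amount": 20.0}, {"order_id": 2, "total_amount": 35.5}]

first = load_partition(conn, "2024-01-01", batch)
second = load_partition(conn, "2024-01-01", batch)  # simulated retry or backfill
print(first, second)  # 2 2
```

Because the delete and the insert share a transaction, a failure midway leaves the previous contents of the partition in place, which also covers the partial-failure case.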
2. Data Quality Framework
# data_quality_framework.py
from datetime import datetime
from typing import Any, Dict, List

from airflow.decorators import dag, task


class DataQualityFramework:
    """Reusable data quality framework"""

    @staticmethod
    def check_nulls(data: List[Dict], columns: List[str]) -> bool:
        """Return True when no row has a null in any of the given columns"""
        for row in data:
            for col in columns:
                if row.get(col) is None:
                    return False
        return True

    @staticmethod
    def check_duplicates(data: List[Dict], key_column: str) -> bool:
        """Return True when the values of key_column are unique"""
        keys = [row.get(key_column) for row in data]
        return len(keys) == len(set(keys))

    @staticmethod
    def check_range(
        data: List[Dict],
        column: str,
        min_val: float,
        max_val: float,
    ) -> bool:
        """Return True when every value lies within [min_val, max_val]"""
        for row in data:
            val = row.get(column)
            # A missing value fails the check instead of silently defaulting to 0
            if val is None or not (min_val <= val <= max_val):
                return False
        return True


@dag(
    dag_id='quality_framework_example',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
)
def quality_example():

    @task
    def get_orders() -> List[Dict]:
        """Placeholder source; replace with a real extract task"""
        return [{'order_id': 1, 'customer_id': 10, 'total_amount': 49.99}]

    @task
    def validate_orders(data: List[Dict]) -> Dict[str, Any]:
        """Validate order data"""
        framework = DataQualityFramework()
        checks = {
            'no_nulls': framework.check_nulls(data, ['order_id', 'customer_id']),
            'no_duplicates': framework.check_duplicates(data, 'order_id'),
            'valid_amounts': framework.check_range(data, 'total_amount', 0, 1000000),
        }
        failed = [k for k, v in checks.items() if not v]
        if failed:
            raise ValueError(f"Quality checks failed: {failed}")
        return {'checks_passed': len(checks), 'data': data}

    # validate_orders needs input, so feed it the extracted rows
    validate_orders(get_orders())


quality_example()
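The framework's checks return a single True or False, which says that something failed but not how badly. When logging quality metrics, it can help to count failing rows per check. The sketch below shows one way to do that in plain Python, independent of Airflow; `run_checks`, `duplicate_keys`, and the check names are hypothetical and not part of the framework above.

```python
from collections import Counter
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]


def run_checks(rows: List[Row], checks: Dict[str, Callable[[Row], bool]]) -> Dict[str, Any]:
    """Apply each row-level predicate and count the rows that fail it."""
    failures: Counter = Counter()
    for row in rows:
        for name, predicate in checks.items():
            if not predicate(row):
                failures[name] += 1
    total = len(rows)
    return {
        "rows": total,
        "failures": {name: failures[name] for name in checks},
        "pass_rate": {
            name: (total - failures[name]) / total if total else 0.0 for name in checks
        },
    }


def duplicate_keys(rows: List[Row], key: str) -> List[Any]:
    """Return the key values that occur more than once (a dataset-level check)."""
    counts = Counter(row.get(key) for row in rows)
    return [value for value, n in counts.items() if n > 1]


# Sample rows with one missing customer, one negative amount, one repeated id
orders = [
    {"order_id": 1, "customer_id": "a", "total_amount": 10.0},
    {"order_id": 2, "customer_id": None, "total_amount": 5.0},
    {"order_id": 2, "customer_id": "c", "total_amount": -1.0},
]
report = run_checks(
    orders,
    {
        "customer_id_present": lambda r: r.get("customer_id") is not None,
        "amount_in_range": lambda r: isinstance(r.get("total_amount"), (int, float))
        and 0 <= r["total_amount"] <= 1_000_000,
    },
)
print(report["failures"], duplicate_keys(orders, "order_id"))
```

A report like this can be logged on every run and compared against a threshold, so the task fails only when the pass rate drops below an agreed level.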
3. Error Handling Patterns
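# error_patterns.py
import logging
from datetime import datetime
from typing import Any, Dict

from airflow.decorators import dag, task
from airflow.utils.trigger_rule import TriggerRule


# Placeholder helpers so the example is self-contained; replace with real logic.
def perform_operation() -> str:
    return 'done'


def send_alert(message: str) -> None:
    logging.warning("ALERT: %s", message)


def send_failure_notification() -> None:
    logging.warning("Pipeline failure notification sent")


def cleanup_temp_files() -> None:
    logging.info("Temp files removed")


@dag(
    dag_id='error_handling_patterns',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
)
def error_patterns():

    @task
    def critical_task() -> Dict[str, Any]:
        """Critical task with error handling"""
        try:
            result = perform_operation()
            return {'status': 'success', 'result': result}
        except Exception as e:
            # Log error
            logging.error(f"Task failed: {e}")
            # Send alert
            send_alert(f"Critical task failed: {e}")
            # Re-raise so Airflow marks the task failed and can retry it
            raise

    @task(trigger_rule=TriggerRule.ALL_FAILED)
    def failure_handler() -> Dict[str, Any]:
        """Handle pipeline failure (runs only when all upstream tasks failed)"""
        # Send failure notification
        send_failure_notification()
        # Log failure
        logging.error("Pipeline failed")
        return {'status': 'failure_handled'}

    @task(trigger_rule=TriggerRule.ALL_DONE)
    def cleanup() -> Dict[str, Any]:
        """Cleanup resources (runs once upstream tasks finish, whatever their state)"""
        # Clean temp files
        cleanup_temp_files()
        # Log cleanup
        logging.info("Cleanup completed")
        return {'status': 'cleanup_done'}

    critical = critical_task()
    failure = failure_handler()
    cleanup_task = cleanup()

    # failure_handler is skipped when critical_task succeeds; cleanup still runs
    critical >> failure >> cleanup_task


error_patterns()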
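Task-level settings such as `retries` and `retry_delay` re-run a whole task. For a single flaky call inside a task (a network request, for example), a small retry wrapper with exponential backoff can be enough. The sketch below is plain Python and makes several assumptions: the helper name `retry_with_backoff`, the choice of retryable exception types, and the delay values are all illustrative.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    func: Callable[[], T],
    retryable: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    max_attempts: int = 4,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying retryable errors with delays of base_delay * 2**attempt."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return func()
        except retryable:
            if attempt == max_attempts - 1:
                raise  # out of attempts: let the caller (or the scheduler) see the error
            sleep(base_delay * (2 ** attempt))
    raise AssertionError("unreachable")


# Demo: a call that fails twice with a transient error, then succeeds
calls = {"n": 0}
delays = []  # records requested delays instead of actually sleeping


def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"


result = retry_with_backoff(flaky, sleep=delays.append)
print(result, calls["n"], delays)  # ok 3 [0.5, 1.0]
```

Errors outside the `retryable` tuple (a `ValueError` from bad data, say) propagate immediately, which keeps the "fail fast on quality issues" behavior intact while still absorbing transient faults.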
Real-World Scenarios
Scenario 1: Amazon's E-Commerce Pipeline
# amazon_pipeline.py
"""
Amazon-style e-commerce pipeline:
- High throughput
- Real-time monitoring
- Cost optimization
"""
from datetime import datetime
from typing import Any, Dict, List

from airflow.decorators import dag, task


@dag(
    dag_id='amazon_ecommerce',
    schedule_interval='*/15 * * * *',  # Every 15 minutes (micro-batch, not true streaming)
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=3,
    tags=['amazon', 'ecommerce', 'production'],
)
def amazon_pipeline():

    @task
    def extract_realtime() -> List[Dict]:
        """Extract the latest batch of order events"""
        # Kafka consumer logic goes here; a static record stands in for it
        return [{'order_id': '123', 'amount': 99.99}]

    @task
    def process_orders(orders: List[Dict]) -> Dict[str, Any]:
        """Process orders"""
        # Process logic
        return {'processed': len(orders)}

    @task
    def update_inventory(processed: Dict) -> Dict[str, Any]:
        """Update inventory"""
        # Inventory update logic
        return {'inventory_updated': True}

    @task
    def send_notifications(processed: Dict) -> None:
        """Send notifications"""
        # Notification logic
        pass

    orders = extract_realtime()
    processed = process_orders(orders)
    # Inventory update and notifications both depend on processed orders
    # and can run in parallel
    update_inventory(processed)
    send_notifications(processed)


amazon_pipeline()
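The `extract_realtime` task leaves the consumer logic as a stub. Two details usually matter in a micro-batch consumer: the offset is committed only after the batch is processed, and duplicate deliveries are removed before processing, since at-least-once delivery can repeat a message. The sketch below illustrates both with an in-memory list standing in for one topic partition. It uses no Kafka client, and the names `read_batch` and `dedupe_by_key` are hypothetical.

```python
from typing import Any, Dict, List, Tuple

Message = Dict[str, Any]


def read_batch(log: List[Message], offset: int, max_messages: int) -> Tuple[List[Message], int]:
    """Read up to max_messages starting at offset; return the batch and the next offset."""
    batch = log[offset : offset + max_messages]
    return batch, offset + len(batch)


def dedupe_by_key(batch: List[Message], key: str) -> List[Message]:
    """Keep the last message seen per key, in the order keys first appeared."""
    latest: Dict[Any, Message] = {}
    for message in batch:
        latest[message[key]] = message
    return list(latest.values())


# In-memory stand-in for one topic partition; order 123 was delivered twice.
partition_log = [
    {"order_id": "123", "amount": 99.99},
    {"order_id": "124", "amount": 15.00},
    {"order_id": "123", "amount": 99.99},
    {"order_id": "125", "amount": 42.50},
]

committed_offset = 0
batch, next_offset = read_batch(partition_log, committed_offset, max_messages=3)
orders = dedupe_by_key(batch, "order_id")
# Commit the offset only after the batch has been processed successfully,
# so a crash before this line causes a re-read rather than data loss.
committed_offset = next_offset
print([o["order_id"] for o in orders], committed_offset)  # ['123', '124'] 3
```

Deduplicating within a batch does not catch repeats across batches; for that, the downstream write would also need to be idempotent, for example keyed on `order_id`.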
Best Practices
# best_practices.py
"""
Pipeline Design Best Practices:

1. Idempotency:
   - Design for re-runs
   - Use unique identifiers
   - Handle partial failures

2. Data Quality:
   - Validate early
   - Fail fast on quality issues
   - Log quality metrics

3. Error Handling:
   - Implement retry logic
   - Use callbacks for alerts
   - Clean up resources

4. Monitoring:
   - Track key metrics
   - Set up alerts
   - Monitor costs

5. Documentation:
   - Document pipeline purpose
   - Document data contracts
   - Document runbooks
"""
ℹ️ Amazon Interview Tip
Amazon emphasizes operational excellence. When discussing pipeline design, highlight idempotency, data quality, and monitoring, and explain how you would handle failures and optimize costs.
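The monitoring practices listed above (track key metrics, set up alerts) can be made concrete with a simple volume check: compare the number of rows loaded today against a trailing average and alert when the deviation is large. This is a minimal sketch; the function name `row_count_alert`, the 50% tolerance, and the sample history are assumptions chosen for illustration, not recommended values.

```python
from statistics import mean
from typing import List, Optional


def row_count_alert(history: List[int], today: int, tolerance: float = 0.5) -> Optional[str]:
    """Return an alert message if today's count deviates from the trailing mean by more than tolerance."""
    if not history:
        return None  # nothing to compare against yet
    baseline = mean(history)
    if baseline == 0:
        return None if today == 0 else f"row count {today} but baseline is 0"
    deviation = abs(today - baseline) / baseline
    if deviation > tolerance:
        return f"row count {today} deviates {deviation:.0%} from baseline {baseline:.0f}"
    return None


# Hypothetical row counts from the previous seven daily runs
last_week = [1000, 1100, 950, 1050, 1000, 980, 1020]
print(row_count_alert(last_week, 1010))  # None
print(row_count_alert(last_week, 300))   # row count 300 deviates 70% from baseline 1014
```

In an interview answer, the same shape of check extends to other metrics worth naming, such as run duration or bytes scanned as a proxy for cost.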
Summary
End-to-end pipeline design requires careful consideration of multiple factors. Key takeaways:
- Idempotency for reliability
- Data quality for accuracy
- Error handling for resilience
- Monitoring for observability
- Modularity for maintainability
For Amazon and Netflix interviews, focus on:
- Scalability patterns
- Cost optimization
- Operational excellence
- Data quality frameworks
- Monitoring strategies
This question is part of the Apache Airflow Advanced interview preparation series. Practice designing pipelines before your interview.