πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Topic: End-to-End Data Pipeline Design

Apache Airflow AdvancedData Pipeline Design⭐ Premium

Advertisement

End-to-End Data Pipeline Design

Building Production-Ready Pipelines

AmazonNetflixDifficulty: Advanced

Interview Question

ℹ️Interview Context

Company: Amazon / Netflix Role: Senior Data Engineer / Data Architect Difficulty: Advanced Time: 60-90 minutes

Question: "Design an end-to-end data pipeline for a large-scale e-commerce platform. How would you handle data ingestion, transformation, storage, and monitoring? Consider scalability, reliability, and cost optimization."


Detailed Theory

Pipeline Design Fundamentals

# pipeline_fundamentals.py
"""
End-to-End Pipeline Design:

1. Data Ingestion:
   - Batch vs streaming
   - Multiple sources
   - Schema management

2. Data Processing:
   - Transformation logic
   - Data quality
   - Error handling

3. Data Storage:
   - Data lake/lakehouse
   - Data warehouse
   - Caching layer

4. Data Serving:
   - APIs
   - Dashboards
   - ML models

5. Monitoring:
   - Data quality
   - Pipeline health
   - Cost optimization
"""

1. Complete Pipeline Example

# complete_pipeline.py
from airflow.decorators import dag, task
from airflow.operators.python import BranchPythonOperator
from airflow.sensors.s3_key_sensor import S3KeySensor
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from datetime import datetime, timedelta
from typing import Dict, Any, List

@dag(
    dag_id='ecommerce_pipeline',
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
    default_args={
        'owner': 'data-engineering',
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
        'on_failure_callback': pipeline_failure_callback,
    },
    tags=['ecommerce', 'production', 'daily'],
    doc_md="""
    ## E-Commerce Data Pipeline
    
    ### Overview
    This pipeline processes daily e-commerce data:
    - Ingests from multiple sources (MySQL, API, S3)
    - Transforms and validates data
    - Loads to data warehouse
    - Sends notifications
    
    ### Schedule
    Runs daily at 2 AM UTC
    
    ### Dependencies
    - Upstream: MySQL, S3, External APIs
    - Downstream: BI dashboards, ML models
    """,
)
def ecommerce_pipeline():
    @task
    def extract_orders() -> List[Dict[str, Any]]:
        """Extract orders from MySQL"""
        from airflow.hooks.postgres_hook import PostgresHook
        
        hook = PostgresHook(postgres_conn_id='ecommerce_db')
        
        query = """
            SELECT order_id, customer_id, order_date, total_amount
            FROM orders
            WHERE order_date = CURRENT_DATE - INTERVAL '1 day'
        """
        
        records = hook.get_records(query)
        
        return [
            {
                'order_id': r[0],
                'customer_id': r[1],
                'order_date': str(r[2]),
                'total_amount': float(r[3]),
            }
            for r in records
        ]
    
    @task
    def extract_products() -> List[Dict[str, Any]]:
        """Extract product catalog from S3"""
        import boto3
        import json
        
        s3 = boto3.client('s3')
        
        response = s3.get_object(
            Bucket='product-catalog',
            Key='latest/products.json'
        )
        
        return json.loads(response['Body'].read())
    
    @task
    def transform_data(
        orders: List[Dict[str, Any]],
        products: List[Dict[str, Any]],
    ) -> Dict[str, Any]:
        """Transform and enrich data"""
        # Create product lookup
        product_lookup = {p['product_id']: p for p in products}
        
        # Enrich orders with product details
        enriched_orders = []
        for order in orders:
            # Add business logic
            order['processed_at'] = datetime.now().isoformat()
            order['status'] = 'processed'
            
            enriched_orders.append(order)
        
        # Calculate metrics
        metrics = {
            'total_orders': len(enriched_orders),
            'total_revenue': sum(o['total_amount'] for o in enriched_orders),
            'avg_order_value': sum(o['total_amount'] for o in enriched_orders) / len(enriched_orders) if enriched_orders else 0,
        }
        
        return {
            'orders': enriched_orders,
            'metrics': metrics,
        }
    
    @task
    def validate_data(data: Dict[str, Any]) -> Dict[str, Any]:
        """Validate data quality"""
        orders = data['orders']
        
        # Quality checks
        checks = {
            'has_data': len(orders) > 0,
            'no_nulls': all(o.get('order_id') is not None for o in orders),
            'valid_amounts': all(o.get('total_amount', 0) > 0 for o in orders),
        }
        
        if not all(checks.values()):
            failed_checks = [k for k, v in checks.items() if not v]
            raise ValueError(f"Data quality checks failed: {failed_checks}")
        
        return data
    
    @task
    def load_to_warehouse(data: Dict[str, Any]) -> Dict[str, Any]:
        """Load to data warehouse"""
        from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook
        
        hook = RedshiftSQLHook(redshift_conn_id='redshift')
        
        # Insert data
        for order in data['orders']:
            insert_query = """
                INSERT INTO orders (order_id, customer_id, order_date, total_amount, processed_at)
                VALUES (%s, %s, %s, %s, %s)
            """
            hook.run(
                insert_query,
                parameters=(
                    order['order_id'],
                    order['customer_id'],
                    order['order_date'],
                    order['total_amount'],
                    order['processed_at'],
                ),
            )
        
        return {
            'rows_loaded': len(data['orders']),
            'status': 'success',
        }
    
    @task
    def send_notifications(result: Dict[str, Any]) -> None:
        """Send pipeline notifications"""
        from airflow.operators.email import EmailOperator
        
        # Send success email
        EmailOperator(
            to=['data-team@company.com'],
            subject=f'Pipeline Success: {result["rows_loaded"]} rows loaded',
            html_content=f"""
            <h2>Pipeline Completed</h2>
            <p>Rows loaded: {result['rows_loaded']}</p>
            <p>Status: {result['status']}</p>
            """,
        ).execute(context={})
    
    # Pipeline dependencies
    orders = extract_orders()
    products = extract_products()
    
    transformed = transform_data(orders, products)
    validated = validate_data(transformed)
    loaded = load_to_warehouse(validated)
    
    send_notifications(loaded)

ecommerce_pipeline()

ℹ️Pro Tip

Design pipelines with idempotency in mind. Each run should produce the same result regardless of how many times it runs.

2. Data Quality Framework

# data_quality_framework.py
from airflow.decorators import dag, task
from datetime import datetime
from typing import Dict, Any, List

class DataQualityFramework:
    """Reusable data quality framework"""
    
    @staticmethod
    def check_nulls(data: List[Dict], columns: List[str]) -> bool:
        """Check for null values"""
        for row in data:
            for col in columns:
                if row.get(col) is None:
                    return False
        return True
    
    @staticmethod
    def check_duplicates(data: List[Dict], key_column: str) -> bool:
        """Check for duplicates"""
        keys = [row.get(key_column) for row in data]
        return len(keys) == len(set(keys))
    
    @staticmethod
    def check_range(
        data: List[Dict],
        column: str,
        min_val: float,
        max_val: float,
    ) -> bool:
        """Check value range"""
        for row in data:
            val = row.get(column, 0)
            if not (min_val <= val <= max_val):
                return False
        return True

@dag(
    dag_id='quality_framework_example',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
)
def quality_example():
    @task
    def validate_orders(data: List[Dict]) -> Dict[str, Any]:
        """Validate order data"""
        framework = DataQualityFramework()
        
        checks = {
            'no_nulls': framework.check_nulls(data, ['order_id', 'customer_id']),
            'no_duplicates': framework.check_duplicates(data, 'order_id'),
            'valid_amounts': framework.check_range(data, 'total_amount', 0, 1000000),
        }
        
        failed = [k for k, v in checks.items() if not v]
        
        if failed:
            raise ValueError(f"Quality checks failed: {failed}")
        
        return {'checks_passed': len(checks), 'data': data}
    
    validate_orders()

quality_example()

3. Error Handling Patterns

# error_patterns.py
from airflow.decorators import dag, task
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime

@dag(
    dag_id='error_handling_patterns',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
)
def error_patterns():
    @task
    def critical_task() -> Dict[str, Any]:
        """Critical task with error handling"""
        try:
            result = perform_operation()
            return {'status': 'success', 'result': result}
        except Exception as e:
            # Log error
            logging.error(f"Task failed: {e}")
            
            # Send alert
            send_alert(f"Critical task failed: {e}")
            
            # Re-raise for retry
            raise
    
    @task(trigger_rule=TriggerRule.ALL_FAILED)
    def failure_handler() -> Dict[str, Any]:
        """Handle pipeline failure"""
        # Send failure notification
        send_failure_notification()
        
        # Log failure
        logging.error("Pipeline failed")
        
        return {'status': 'failure_handled'}
    
    @task(trigger_rule=TriggerRule.ALL_DONE)
    def cleanup() -> Dict[str, Any]:
        """Cleanup resources"""
        # Clean temp files
        cleanup_temp_files()
        
        # Log cleanup
        logging.info("Cleanup completed")
        
        return {'status': 'cleanup_done'}
    
    critical = critical_task()
    failure = failure_handler()
    cleanup_task = cleanup()
    
    critical >> failure >> cleanup_task

error_patterns()

Real-World Scenarios

Scenario 1: Amazon's E-Commerce Pipeline

# amazon_pipeline.py
"""
Amazon-style e-commerce pipeline:
- High throughput
- Real-time monitoring
- Cost optimization
"""

from airflow.decorators import dag, task
from datetime import datetime, timedelta
from typing import Dict, Any

@dag(
    dag_id='amazon_ecommerce',
    schedule_interval='*/15 * * * *',  # Every 15 minutes
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=3,
    tags=['amazon', 'ecommerce', 'production'],
)
def amazon_pipeline():
    @task
    def extract_realtime() -> List[Dict]:
        """Extract real-time data"""
        # Kafka consumer logic
        return [{'order_id': '123', 'amount': 99.99}]
    
    @task
    def process_orders(orders: List[Dict]) -> Dict[str, Any]:
        """Process orders"""
        # Process logic
        return {'processed': len(orders)}
    
    @task
    def update_inventory(processed: Dict) -> Dict[str, Any]:
        """Update inventory"""
        # Inventory update logic
        return {'inventory_updated': True}
    
    @task
    def send_notifications(processed: Dict) -> None:
        """Send notifications"""
        # Notification logic
        pass
    
    orders = extract_realtime()
    processed = process_orders(orders)
    inventory = update_inventory(processed)
    send_notifications(processed)

amazon_pipeline()

QuizBox


Best Practices

# best_practices.py
"""
Pipeline Design Best Practices:

1. Idempotency:
   - Design for re-runs
   - Use unique identifiers
   - Handle partial failures

2. Data Quality:
   - Validate early
   - Fail fast on quality issues
   - Log quality metrics

3. Error Handling:
   - Implement retry logic
   - Use callbacks for alerts
   - Clean up resources

4. Monitoring:
   - Track key metrics
   - Set up alerts
   - Monitor costs

5. Documentation:
   - Document pipeline purpose
   - Document data contracts
   - Document runbooks
"""

ℹ️Amazon Interview Tip

At Amazon, they emphasize operational excellence. When discussing pipeline design, highlight idempotency, data quality, and monitoring. Also mention how they handle failures and optimize costs.


Summary

End-to-end pipeline design requires careful consideration of multiple factors. Key takeaways:

  1. Idempotency for reliability
  2. Data quality for accuracy
  3. Error handling for resilience
  4. Monitoring for observability
  5. Modularity for maintainability

For Amazon and Netflix interviews, focus on:

  • Scalability patterns
  • Cost optimization
  • Operational excellence
  • Data quality frameworks
  • Monitoring strategies

This question is part of the Apache Airflow Advanced interview preparation series. Practice designing pipelines before your interview.

Advertisement