
Data Pipelines: ETL, Airflow, Data Quality


AMAZON & GOOGLE INTERVIEW QUESTION


Data Engineering & Pipeline Design

The Interview Question

ℹ️

Question: You're designing a data pipeline for a real-time analytics platform:

  • Source: Multiple streaming sources (Kafka, APIs, databases)
  • Processing: Real-time and batch processing
  • Destination: Data warehouse, dashboards, ML models
  • Requirements: 99.9% uptime, data quality guarantees, 1-hour data freshness SLA
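
As a rough sanity check on the requirements above, the 99.9% uptime target can be turned into a downtime budget. The sketch below is plain arithmetic; the 30-day month and the helper name `downtime_budget_minutes` are illustrative assumptions, not part of the question.

```python
def downtime_budget_minutes(availability: float, period_days: int = 30) -> float:
    """Return the allowed downtime in minutes for an availability target."""
    total_minutes = period_days * 24 * 60
    return total_minutes * (1 - availability)


monthly = downtime_budget_minutes(0.999)       # assumed 30-day month
yearly = downtime_budget_minutes(0.999, 365)   # non-leap year

print(f"monthly budget: {monthly:.1f} min")
print(f"yearly budget:  {yearly / 60:.2f} h")
```

A budget of well under an hour per month suggests that retries, alerting, and recovery need to be automated rather than manual.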

Walk through your pipeline design:

  1. How do you architect the pipeline for both real-time and batch?
  2. How do you ensure data quality throughout the pipeline?
  3. How do you handle pipeline failures and recovery?
  4. How do you monitor and alert on pipeline health?

Detailed Answer

1. Pipeline Architecture Design

from dataclasses import dataclass
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta
from enum import Enum
import json
import logging

class PipelineType(Enum):
    REALTIME = "realtime"
    BATCH = "batch"
    MICRO_BATCH = "micro_batch"

class DataSourceType(Enum):
    KAFKA = "kafka"
    API = "api"
    DATABASE = "database"
    FILE = "file"
    S3 = "s3"

@dataclass
class DataSource:
    name: str
    source_type: DataSourceType
    connection_config: Dict[str, Any]
    schema: Optional[Dict] = None
    refresh_interval: Optional[int] = None  # seconds

@dataclass
class DataDestination:
    name: str
    destination_type: str
    connection_config: Dict[str, Any]
    write_mode: str = "append"  # append, upsert, overwrite

class DataPipelineArchitect:
    """Design scalable data pipeline architecture"""
    
    def __init__(self):
        self.sources = []
        self.destinations = []
        self.processing_stages = []
        self.quality_checks = []
    
    def add_source(self, source: DataSource):
        """Add data source"""
        self.sources.append(source)
    
    def add_destination(self, destination: DataDestination):
        """Add data destination"""
        self.destinations.append(destination)
    
    def design_realtime_pipeline(self, source: DataSource) -> Dict:
        """Design real-time pipeline architecture"""
        
        if source.source_type == DataSourceType.KAFKA:
            architecture = {
                'ingestion': {
                    'technology': 'Apache Kafka',
                    'pattern': 'Event-driven',
                    'components': [
                        'Kafka Producer',
                        'Kafka Broker (3+ nodes)',
                        'Schema Registry',
                        'Consumer Groups'
                    ]
                },
                'processing': {
                    'technology': 'Apache Flink / Kafka Streams',
                    'pattern': 'Stream processing',
                    'components': [
                        'Event deserialization',
                        'Windowed aggregations',
                        'State management',
                        'Watermark handling'
                    ]
                },
                'storage': {
                    'hot_data': {
                        'technology': 'Redis / Cassandra',
                        'use_case': 'Real-time dashboards',
                        'retention': '24 hours'
                    },
                    'warm_data': {
                        'technology': 'TimescaleDB / ClickHouse',
                        'use_case': 'Recent analytics',
                        'retention': '30 days'
                    }
                },
                'serving': {
                    'technology': 'REST API + WebSocket',
                    'pattern': 'CQRS',
                    'latency_target': '< 100ms'
                }
            }
        
        elif source.source_type == DataSourceType.DATABASE:
            architecture = {
                'ingestion': {
                    'technology': 'Debezium (CDC)',
                    'pattern': 'Change Data Capture',
                    'components': [
                        'Debezium Connector',
                        'Kafka Connect',
                        'Schema evolution handling'
                    ]
                },
                'processing': {
                    'technology': 'Apache Beam',
                    'pattern': 'Unified batch and stream',
                    'components': [
                        'Record deduplication',
                        'Late data handling',
                        'Exactly-once semantics'
                    ]
                }
            }
        
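        else:
            # Only Kafka and database sources have a real-time design here;
            # fail loudly instead of returning an unassigned variable.
            raise ValueError(f"No real-time design for source type: {source.source_type}")
        
        return architecture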
    
    def design_batch_pipeline(self, schedule: str = "0 * * * *") -> Dict:
        """Design batch pipeline architecture"""
        
        architecture = {
            'orchestration': {
                'technology': 'Apache Airflow',
                'schedule': schedule,
                'components': [
                    'DAG definitions',
                    'Task dependencies',
                    'Retry logic',
                    'Alerting'
                ]
            },
            'ingestion': {
                'technology': 'Apache NiFi / custom',
                'patterns': {
                    'incremental': 'Delta loads using timestamps',
                    'full_refresh': 'Complete table replacement',
                    'cdc': 'Change data capture'
                }
            },
            'processing': {
                'technology': 'Apache Spark',
                'execution': {
                    'local': 'For development',
                    'yarn': 'For on-premise',
                    'emr': 'For AWS',
                    'dataproc': 'For GCP'
                },
                'optimizations': [
                    'Partition pruning',
                    'Broadcast joins',
                    'Cache management',
                    'Adaptive query execution'
                ]
            },
            'storage': {
                'data_lake': {
                    'technology': 'Delta Lake / Iceberg',
                    'format': 'Parquet',
                    'partitioning': 'Date-based partitioning',
                    'compaction': 'Daily compaction jobs'
                },
                'data_warehouse': {
                    'technology': 'BigQuery / Snowflake',
                    'pattern': 'Kimball dimensional model',
                    'refresh': 'Hourly / Daily'
                }
            }
        }
        
        return architecture
    
    def design_hybrid_pipeline(self) -> Dict:
        """Design hybrid (real-time + batch) pipeline"""
        
        architecture = {
            'layered_architecture': {
                'raw_zone': {
                    'purpose': 'Ingest raw data',
                    'technology': 'S3 / GCS',
                    'format': 'JSON / Avro',
                    'retention': '90 days'
                },
                'bronze_zone': {
                    'purpose': 'Validated and deduplicated',
                    'technology': 'Delta Lake',
                    'operations': ['Schema validation', 'Deduplication', 'Basic quality checks']
                },
                'silver_zone': {
                    'purpose': 'Cleaned and enriched',
                    'technology': 'Delta Lake',
                    'operations': ['Data cleaning', 'Joining', 'Feature engineering']
                },
                'gold_zone': {
                    'purpose': 'Business-ready aggregates',
                    'technology': 'Data Warehouse',
                    'operations': ['Aggregations', 'Business logic', 'ML features']
                }
            },
            'processing_modes': {
                'real_time': {
                    'use_cases': ['Fraud detection', 'Real-time dashboards', 'Alerts'],
                    'technology': 'Flink / Kafka Streams',
                    'latency': '< 1 minute'
                },
                'micro_batch': {
                    'use_cases': ['Near-real-time analytics', 'Feature updates'],
                    'technology': 'Spark Structured Streaming',
                    'latency': '1-5 minutes'
                },
                'batch': {
                    'use_cases': ['Daily reports', 'ML training', 'Backfills'],
                    'technology': 'Airflow + Spark',
                    'latency': 'Hours'
                }
            }
        }
        
        return architecture

# Example usage
# architect = DataPipelineArchitect()
# kafka_source = DataSource(
#     name="user_events",
#     source_type=DataSourceType.KAFKA,
#     connection_config={"bootstrap.servers": "kafka:9092", "topic": "events"}
# )
# architect.add_source(kafka_source)
# realtime_arch = architect.design_realtime_pipeline(kafka_source)
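
To connect the three processing modes in `design_hybrid_pipeline` to a concrete decision, the sketch below maps a latency tolerance to a mode. The band edges (1 minute and 5 minutes) follow the latency figures in the dictionary above; the function name `choose_processing_mode` is a hypothetical helper, and the mapping is a rough illustration rather than a sizing rule.

```python
from enum import Enum


class PipelineType(Enum):
    REALTIME = "realtime"
    MICRO_BATCH = "micro_batch"
    BATCH = "batch"


def choose_processing_mode(max_latency_seconds: float) -> PipelineType:
    """Pick a processing mode from the latency a use case can tolerate."""
    if max_latency_seconds < 60:        # under 1 minute: stream processing
        return PipelineType.REALTIME
    if max_latency_seconds <= 300:      # 1-5 minutes: micro-batches
        return PipelineType.MICRO_BATCH
    return PipelineType.BATCH           # anything slower: scheduled batch


print(choose_processing_mode(5))      # fraud-detection style requirement
print(choose_processing_mode(120))    # near-real-time analytics
print(choose_processing_mode(3600))   # hourly reporting
```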

2. Data Quality Framework

from dataclasses import dataclass
from typing import Callable, List, Dict, Any
from enum import Enum
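import pandas as pd  # needed by generate_quality_report
import great_expectations as ge  # ge.from_pandas below is the legacy (pre-1.0) API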
from datetime import datetime

class QualityCheckType(Enum):
    COMPLETENESS = "completeness"
    ACCURACY = "accuracy"
    CONSISTENCY = "consistency"
    TIMELINESS = "timeliness"
    UNIQUENESS = "uniqueness"

@dataclass
class QualityCheck:
    name: str
    check_type: QualityCheckType
    expectation: str
    severity: str  # critical, warning, info
    threshold: float

class DataQualityFramework:
    """Comprehensive data quality framework"""
    
    def __init__(self):
        self.checks = []
        self.results = []
        self.alerts = []
    
    def add_check(self, check: QualityCheck):
        """Add quality check"""
        self.checks.append(check)
    
    def create_expectations(self, df):
        """Create Great Expectations suite"""
        ge_df = ge.from_pandas(df)
        
        expectations = []
        
        # Completeness checks
        for column in df.columns:
            expectations.append(
                ge_df.expect_column_values_to_not_be_null(column)
            )
        
        # Uniqueness checks
        if 'id' in df.columns:
            expectations.append(
                ge_df.expect_column_values_to_be_unique('id')
            )
        
        # Range checks for numeric columns
        for column in df.select_dtypes(include=['number']).columns:
            expectations.append(
                ge_df.expect_column_values_to_be_between(
                    column,
                    min_value=df[column].quantile(0.01),
                    max_value=df[column].quantile(0.99)
                )
            )
        
        # Pattern checks for string columns
        for column in df.select_dtypes(include=['object']).columns:
            if 'email' in column.lower():
                expectations.append(
                    ge_df.expect_column_values_to_match_regex(
                        column,
                        r'^[\w\.-]+@[\w\.-]+\.\w+$'
                    )
                )
        
        return expectations
    
    def run_quality_checks(self, df, source_name: str) -> Dict:
        """Run all quality checks"""
        results = {
            'source': source_name,
            'timestamp': datetime.now().isoformat(),
            'total_rows': len(df),
            'checks_passed': 0,
            'checks_failed': 0,
            'checks_warning': 0,
            'details': []
        }
        
        for check in self.checks:
            check_result = self._execute_check(df, check)
            results['details'].append(check_result)
            
            if check_result['passed']:
                results['checks_passed'] += 1
            elif check_result['severity'] == 'critical':
                results['checks_failed'] += 1
            else:
                results['checks_warning'] += 1
        
        # Calculate overall quality score (guard against an empty check list)
        total_checks = len(self.checks)
        results['quality_score'] = (
            results['checks_passed'] / total_checks * 100 if total_checks else 0.0
        )
        
        # Generate alerts
        if results['checks_failed'] > 0:
            self._generate_alert(results)
        
        self.results.append(results)
        return results
    
    def _execute_check(self, df, check: QualityCheck) -> Dict:
        """Execute individual quality check"""
        try:
            actual_value = None  # measured ratio, set by checks that compute one
            
            if check.check_type == QualityCheckType.COMPLETENESS:
                actual_value = 1 - df[check.name].isnull().mean()
                passed = actual_value >= check.threshold
            
            elif check.check_type == QualityCheckType.UNIQUENESS:
                actual_value = 1 - df[check.name].duplicated().mean()
                passed = actual_value >= check.threshold
            
            elif check.check_type == QualityCheckType.ACCURACY:
                # Custom accuracy check
                passed = True  # Implement based on business rules
            
            elif check.check_type == QualityCheckType.CONSISTENCY:
                # Check format consistency
                passed = True  # Implement based on business rules
            
            else:
                # TIMELINESS has no implementation here and passes by default
                passed = True
            
            return {
                'check_name': check.name,
                'check_type': check.check_type.value,
                'passed': passed,
                'severity': check.severity,
                'threshold': check.threshold,
                'actual_value': actual_value
            }
        
        except Exception as e:
            return {
                'check_name': check.name,
                'check_type': check.check_type.value,
                'passed': False,
                'severity': 'critical',
                'error': str(e)
            }
    
    def _generate_alert(self, results: Dict):
        """Generate alert for failed checks"""
        failed_checks = [d for d in results['details'] if not d['passed'] and d['severity'] == 'critical']
        
        alert = {
            'timestamp': datetime.now().isoformat(),
            'source': results['source'],
            'severity': 'critical',
            'message': f"{len(failed_checks)} critical quality checks failed",
            'failed_checks': [c['check_name'] for c in failed_checks],
            'quality_score': results['quality_score']
        }
        
        self.alerts.append(alert)
        
        # In production, send to alerting system (PagerDuty, Slack, etc.)
        print(f"ALERT: {alert['message']}")
        print(f"Failed checks: {alert['failed_checks']}")
    
    def generate_quality_report(self) -> pd.DataFrame:
        """Generate quality report"""
        report_data = []
        
        for result in self.results:
            report_data.append({
                'source': result['source'],
                'timestamp': result['timestamp'],
                'total_rows': result['total_rows'],
                'quality_score': result['quality_score'],
                'checks_passed': result['checks_passed'],
                'checks_failed': result['checks_failed'],
                'checks_warning': result['checks_warning']
            })
        
        return pd.DataFrame(report_data)

# Example usage
# quality_framework = DataQualityFramework()
# quality_framework.add_check(QualityCheck(
#     name="user_id",
#     check_type=QualityCheckType.UNIQUENESS,
#     expectation="values must be unique",
#     severity="critical",
#     threshold=0.99
# ))
# results = quality_framework.run_quality_checks(df, "user_events")
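
The completeness and uniqueness ratios computed in `_execute_check` are easier to reason about on a tiny example. The sketch below reproduces the same two ratios on plain dictionaries, without pandas; the function names and the sample rows are made up for illustration.

```python
from typing import Any, Dict, List


def completeness(rows: List[Dict[str, Any]], column: str) -> float:
    """Fraction of rows whose value for `column` is present and not None."""
    if not rows:
        return 0.0
    non_null = sum(1 for row in rows if row.get(column) is not None)
    return non_null / len(rows)


def uniqueness(rows: List[Dict[str, Any]], column: str) -> float:
    """Fraction of rows that do not repeat an earlier value of `column`."""
    if not rows:
        return 0.0
    seen = set()
    repeats = 0
    for row in rows:
        value = row.get(column)
        if value in seen:
            repeats += 1      # a later occurrence counts as a duplicate
        else:
            seen.add(value)
    return 1 - repeats / len(rows)


rows = [
    {"user_id": 1},
    {"user_id": 2},
    {"user_id": 2},      # duplicate of the previous row
    {"user_id": None},   # missing value
]
completeness_ratio = completeness(rows, "user_id")
uniqueness_ratio = uniqueness(rows, "user_id")
print(completeness_ratio, uniqueness_ratio)
```

With a threshold of 0.99, both ratios here would fail their checks, which is the behaviour the `threshold` field on `QualityCheck` is meant to express.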

3. Airflow DAG Design

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator  # deprecated since Airflow 2.3 in favor of airflow.operators.empty.EmptyOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta

class AirflowDAGBuilder:
    """Build production-ready Airflow DAGs"""
    
    def __init__(self, dag_id: str, schedule: str = "@daily"):
        self.dag_id = dag_id
        self.schedule = schedule
        self.default_args = {
            'owner': 'data_engineering',
            'depends_on_past': False,
            'email': ['alerts@company.com'],
            'email_on_failure': True,
            'email_on_retry': False,
            'retries': 2,
            'retry_delay': timedelta(minutes=5),
            'execution_timeout': timedelta(hours=1),
        }
    
    def create_etl_dag(self, 
                       extract_func,
                       transform_func,
                       load_func,
                       quality_check_func) -> DAG:
        """Create standard ETL DAG"""
        
        with DAG(
            self.dag_id,
            default_args=self.default_args,
            description='ETL pipeline',
            schedule_interval=self.schedule,
            start_date=datetime(2024, 1, 1),
            catchup=False,
            max_active_runs=1,
            tags=['etl', 'production']
        ) as dag:
            
            # Start
            start = DummyOperator(task_id='start')
            
            # Extract tasks
            with TaskGroup("extract") as extract_group:
                extract = PythonOperator(
                    task_id='extract_data',
                    python_callable=extract_func,
                    execution_timeout=timedelta(minutes=30)
                )
            
            # Transform tasks
            with TaskGroup("transform") as transform_group:
                transform = PythonOperator(
                    task_id='transform_data',
                    python_callable=transform_func
                )
                
                # Data quality check
                quality_check = PythonOperator(
                    task_id='quality_check',
                    python_callable=quality_check_func
                )
                
                # Validate only after the transform has finished
                transform >> quality_check
            
            # Load tasks
            with TaskGroup("load") as load_group:
                load = PythonOperator(
                    task_id='load_data',
                    python_callable=load_func
                )
                
                # Update metadata
                update_metadata = PythonOperator(
                    task_id='update_metadata',
                    python_callable=self._update_metadata
                )
                
                # Record metadata only after the load has finished
                load >> update_metadata
            
            # End
            end = DummyOperator(task_id='end')
            
            # Task dependencies
            start >> extract_group >> transform_group >> load_group >> end
        
        return dag
    
    def create_streaming_dag(self) -> DAG:
        """Create streaming pipeline monitoring DAG"""
        
        with DAG(
            f"{self.dag_id}_monitoring",
            default_args=self.default_args,
            description='Monitor streaming pipeline health',
            schedule_interval='*/5 * * * *',  # Every 5 minutes
            start_date=datetime(2024, 1, 1),
            catchup=False,
            tags=['streaming', 'monitoring']
        ) as dag:
            
            check_lag = PythonOperator(
                task_id='check_consumer_lag',
                python_callable=self._check_kafka_lag
            )
            
            check_throughput = PythonOperator(
                task_id='check_throughput',
                python_callable=self._check_throughput
            )
            
            check_errors = PythonOperator(
                task_id='check_error_rate',
                python_callable=self._check_error_rate
            )
            
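            alert = PythonOperator(
                task_id='send_alert',
                python_callable=self._send_alert,
                # Run once all checks have finished, even if some failed; the
                # default rule (all_success) would keep the alert from running
                # exactly when a check fails.
                trigger_rule='all_done'
            )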
            
            [check_lag, check_throughput, check_errors] >> alert
        
        return dag
    
    def _update_metadata(self):
        """Update pipeline metadata"""
        metadata = {
            'last_run': datetime.now().isoformat(),
            'status': 'success',
            'rows_processed': 0  # Would be passed from previous tasks
        }
        # Store in metadata database
        pass
    
    def _check_kafka_lag(self):
        """Check Kafka consumer lag"""
        # Implement Kafka lag monitoring
        pass
    
    def _check_throughput(self):
        """Check processing throughput"""
        pass
    
    def _check_error_rate(self):
        """Check error rate"""
        pass
    
    def _send_alert(self, **context):
        """Send alert based on checks"""
        # Implement alerting logic
        pass

# Example usage
# builder = AirflowDAGBuilder("user_analytics_etl", schedule="0 * * * *")
# dag = builder.create_etl_dag(extract_func, transform_func, load_func, quality_check_func)
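
The `retries` and `retry_delay` entries in `default_args` mean that a task gets one initial attempt plus up to two more. The sketch below illustrates those semantics in plain Python. It is not how Airflow implements retries internally, and `run_with_retries` and its `sleep` parameter are hypothetical names used only for this example.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(
    task: Callable[[], T],
    retries: int = 2,
    retry_delay_seconds: float = 0.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run `task`, retrying up to `retries` times after the first failure."""
    attempts = retries + 1  # one initial attempt plus the retries
    for attempt in range(1, attempts + 1):
        try:
            return task()
        except Exception:
            if attempt == attempts:
                raise  # retries exhausted: surface the last error
            sleep(retry_delay_seconds)
    raise RuntimeError("unreachable")  # keeps type checkers satisfied


calls = {"count": 0}


def flaky_extract() -> str:
    """Fail twice with a transient error, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "extracted"


result = run_with_retries(flaky_extract, retries=2)
print(result, "after", calls["count"], "attempts")
```

Retries like this are only safe when the task itself is idempotent, since a failed attempt may already have written part of its output.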

4. Pipeline Monitoring and Observability

from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
from contextlib import contextmanager
from datetime import datetime  # used for alert and health-check timestamps
from typing import Dict

class PipelineMonitor:
    """Monitor pipeline health and performance"""
    
    def __init__(self, pipeline_name: str):
        self.pipeline_name = pipeline_name
        
        # Define metrics
        self.records_processed = Counter(
            f'{pipeline_name}_records_processed_total',
            'Total records processed',
            ['stage', 'status']
        )
        
        self.processing_time = Histogram(
            f'{pipeline_name}_processing_seconds',
            'Processing time in seconds',
            ['stage'],
            buckets=[0.1, 0.5, 1, 2, 5, 10, 30, 60, 300]
        )
        
        self.pipeline_status = Gauge(
            f'{pipeline_name}_status',
            'Pipeline status (1=healthy, 0=unhealthy)'
        )
        
        self.data_freshness = Gauge(
            f'{pipeline_name}_data_freshness_seconds',
            'Data freshness in seconds'
        )
        
        self.error_count = Counter(
            f'{pipeline_name}_errors_total',
            'Total errors',
            ['error_type']
        )
    
    @contextmanager
    def track_processing_time(self, stage: str):
        """Context manager to track processing time"""
        start_time = time.time()
        try:
            yield
            duration = time.time() - start_time
            self.processing_time.labels(stage=stage).observe(duration)
        except Exception as e:
            self.error_count.labels(error_type=type(e).__name__).inc()
            raise
    
    def record_processed(self, stage: str, count: int, status: str = 'success'):
        """Record processed records"""
        self.records_processed.labels(stage=stage, status=status).inc(count)
    
    def update_freshness(self, seconds: float):
        """Update data freshness"""
        self.data_freshness.set(seconds)
        
        # Alert if data is stale, otherwise mark the pipeline healthy
        if seconds > 3600:  # More than 1 hour
            self.pipeline_status.set(0)
            self._send_stale_data_alert(seconds)
        else:
            self.pipeline_status.set(1)
    
    def _send_stale_data_alert(self, staleness_seconds: float):
        """Send alert for stale data"""
        alert = {
            'pipeline': self.pipeline_name,
            'severity': 'warning',
            'message': f'Data is {staleness_seconds:.0f} seconds stale',
            'timestamp': datetime.now().isoformat()
        }
        # Send to alerting system
        print(f"ALERT: {alert['message']}")
    
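    def _sample_total(self, metric, suffix: str) -> float:
        """Sum the samples ending in `suffix` across all label sets of a metric"""
        return sum(
            sample.value
            for family in metric.collect()
            for sample in family.samples
            if sample.name.endswith(suffix)
        )
    
    def health_check(self) -> Dict:
        """Perform health check"""
        # Labeled metrics have no parent-level _value or _sum, so aggregate
        # the collected samples instead of reading private attributes.
        total_records = self._sample_total(self.records_processed, '_total')
        total_errors = self._sample_total(self.error_count, '_total')
        time_sum = self._sample_total(self.processing_time, '_sum')
        time_count = self._sample_total(self.processing_time, '_count')
        
        health = {
            'pipeline': self.pipeline_name,
            'timestamp': datetime.now().isoformat(),
            'status': 'healthy',
            'metrics': {
                'total_records': total_records,
                'avg_processing_time': time_sum / max(time_count, 1),
                'error_rate': total_errors / max(total_records, 1)
            }
        }
        
        # Determine status
        if health['metrics']['error_rate'] > 0.01:  # > 1% error rate
            health['status'] = 'unhealthy'
        elif health['metrics']['avg_processing_time'] > 300:  # > 5 min processing
            health['status'] = 'degraded'
        
        return health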

# Example usage
# monitor = PipelineMonitor("user_analytics")
# with monitor.track_processing_time("extract"):
#     # Extract data
#     monitor.record_processed("extract", 1000)
# health = monitor.health_check()
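
The streaming monitoring DAG in section 3 leaves `_check_kafka_lag` unimplemented. Consumer lag is commonly computed per partition as the log-end offset minus the committed offset. The sketch below assumes both sets of offsets have already been fetched from a Kafka client and are passed in as plain dictionaries; the function names and the threshold are illustrative.

```python
from typing import Dict


def consumer_lag(end_offsets: Dict[int, int], committed: Dict[int, int]) -> Dict[int, int]:
    """Per-partition lag: log-end offset minus committed offset.

    Assumption: a partition with no committed offset is treated as
    committed at 0, so its whole log counts as lag.
    """
    return {
        partition: max(end - committed.get(partition, 0), 0)
        for partition, end in end_offsets.items()
    }


def lag_exceeds(lag: Dict[int, int], threshold: int) -> bool:
    """Return True when the total lag across partitions is above `threshold`."""
    return sum(lag.values()) > threshold


end_offsets = {0: 100, 1: 250}   # latest offset in each partition
committed = {0: 90, 1: 200}      # last offset committed by the consumer group
lag = consumer_lag(end_offsets, committed)
print(lag, "alert" if lag_exceeds(lag, threshold=50) else "ok")
```

Total lag is a coarse signal. A per-partition threshold can catch a single stuck partition that a sum would hide.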

5. Real-World Application: Complete Pipeline

class ProductionPipeline:
    """Complete production data pipeline"""
    
    def __init__(self):
        self.monitor = PipelineMonitor("production_pipeline")
        self.quality_framework = DataQualityFramework()
        self.setup_quality_checks()
    
    def setup_quality_checks(self):
        """Setup data quality checks"""
        self.quality_framework.add_check(QualityCheck(
            name="id",
            check_type=QualityCheckType.UNIQUENESS,
            expectation="values must be unique",
            severity="critical",
            threshold=0.99
        ))
        
        self.quality_framework.add_check(QualityCheck(
            name="timestamp",
            check_type=QualityCheckType.TIMELINESS,
            expectation="data must be recent",
            severity="critical",
            threshold=0.95
        ))
    
    def run_pipeline(self, execution_date: datetime):
        """Execute complete pipeline"""
        try:
            # Stage 1: Extract
            with self.monitor.track_processing_time("extract"):
                raw_data = self.extract_data(execution_date)
                self.monitor.record_processed("extract", len(raw_data))
            
            # Stage 2: Validate
            with self.monitor.track_processing_time("validate"):
                quality_results = self.quality_framework.run_quality_checks(
                    raw_data, "raw_data"
                )
                
                if quality_results['checks_failed'] > 0:
                    raise ValueError(f"Quality checks failed: {quality_results}")
            
            # Stage 3: Transform
            with self.monitor.track_processing_time("transform"):
                transformed_data = self.transform_data(raw_data)
                self.monitor.record_processed("transform", len(transformed_data))
            
            # Stage 4: Load
            with self.monitor.track_processing_time("load"):
                self.load_data(transformed_data)
                self.monitor.record_processed("load", len(transformed_data))
            
            # Update freshness
            self.monitor.update_freshness(0)
            
            return {
                'status': 'success',
                'rows_processed': len(transformed_data),
                'quality_score': quality_results['quality_score']
            }
        
        except Exception as e:
            # track_processing_time already incremented error_count for a
            # failure raised inside a stage, so only flip the status here.
            self.monitor.pipeline_status.set(0)
            
            return {
                'status': 'failed',
                'error': str(e)
            }
    
    def extract_data(self, execution_date: datetime):
        """Extract data from sources"""
        # Implement extraction logic
        return pd.DataFrame()
    
    def transform_data(self, raw_data: pd.DataFrame):
        """Transform data"""
        # Implement transformation logic
        return raw_data
    
    def load_data(self, data: pd.DataFrame):
        """Load data to destination"""
        # Implement loading logic
        pass

# Example usage
# pipeline = ProductionPipeline()
# result = pipeline.run_pipeline(datetime.now())


Pro Tip: Always implement idempotent operations in your pipeline. This ensures that re-running failed tasks doesn't create duplicate data or inconsistent states.
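
One common way to make a load step idempotent is to upsert on a stable key, so that replaying the same batch overwrites rows instead of appending them. The sketch below uses an in-memory SQLite database purely for illustration; the `events` table, its columns, and `load_batch` are hypothetical names.

```python
import sqlite3


def load_batch(conn: sqlite3.Connection, rows) -> None:
    """Upsert rows keyed by event_id so re-running a batch changes nothing."""
    with conn:  # commits on success, rolls back if an exception is raised
        conn.executemany(
            "INSERT OR REPLACE INTO events (event_id, amount) VALUES (?, ?)",
            rows,
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (event_id TEXT PRIMARY KEY, amount REAL)")

batch = [("e1", 10.0), ("e2", 20.0)]
load_batch(conn, batch)
load_batch(conn, batch)  # simulated retry of the same task

row_count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
total = conn.execute("SELECT SUM(amount) FROM events").fetchone()[0]
print(row_count, total)
```

A plain `INSERT` would have doubled both the row count and the total after the retry.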

6. Common Follow-Up Questions

Follow-up 1: How do you handle schema evolution?

class SchemaEvolutionHandler:
    """Handle schema changes in data pipelines"""
    
    def __init__(self):
        self.schema_registry = {}
    
    def register_schema(self, dataset_name: str, schema: Dict):
        """Register new schema version"""
        if dataset_name not in self.schema_registry:
            self.schema_registry[dataset_name] = []
        
        schema_version = {
            'version': len(self.schema_registry[dataset_name]) + 1,
            'schema': schema,
            'created_at': datetime.now().isoformat(),
            'compatible': True
        }
        
        # Check compatibility
        if self.schema_registry[dataset_name]:
            last_schema = self.schema_registry[dataset_name][-1]['schema']
            schema_version['compatible'] = self.check_compatibility(
                last_schema, schema
            )
        
        self.schema_registry[dataset_name].append(schema_version)
    
    def check_compatibility(self, old_schema: Dict, new_schema: Dict) -> bool:
        """Check if new schema is backward compatible"""
        # Check for added columns (backward compatible)
        old_columns = set(old_schema.keys())
        new_columns = set(new_schema.keys())
        
        added_columns = new_columns - old_columns
        removed_columns = old_columns - new_columns
        
        # Backward compatible: can add columns, cannot remove
        return len(removed_columns) == 0
    
    def handle_schema_change(self, data: pd.DataFrame, dataset_name: str):
        """Handle schema changes during processing"""
        current_schema = self.schema_registry[dataset_name][-1]['schema']
        data = data.copy()  # avoid mutating the caller's DataFrame
        
        # Add missing columns with defaults
        for column, dtype in current_schema.items():
            if column not in data.columns:
                if dtype == 'string':
                    data[column] = ''
                elif dtype == 'integer':
                    data[column] = 0
                elif dtype == 'float':
                    data[column] = 0.0
                elif dtype == 'boolean':
                    data[column] = False
        
        # Remove extra columns
        data = data[[col for col in current_schema.keys() if col in data.columns]]
        
        return data
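
`check_compatibility` above only looks at removed columns. A slightly stricter rule, shown here as one possible variant rather than the handler's own behaviour, also treats a changed column type as incompatible. The sketch assumes the same schema shape as above (a dictionary of column name to type string); `diff_schemas` and `is_backward_compatible` are hypothetical helper names.

```python
from typing import Dict, Set


def diff_schemas(old: Dict[str, str], new: Dict[str, str]) -> Dict[str, Set[str]]:
    """Classify the column-level differences between two schema versions."""
    shared = set(old) & set(new)
    return {
        "added": set(new) - set(old),
        "removed": set(old) - set(new),
        "type_changed": {col for col in shared if old[col] != new[col]},
    }


def is_backward_compatible(old: Dict[str, str], new: Dict[str, str]) -> bool:
    """Allow added columns; reject removed columns and changed types."""
    diff = diff_schemas(old, new)
    return not diff["removed"] and not diff["type_changed"]


v1 = {"id": "integer", "email": "string"}
v2 = {"id": "integer", "email": "string", "country": "string"}  # adds a column
v3 = {"id": "string", "email": "string"}                        # changes a type

print(is_backward_compatible(v1, v2))
print(is_backward_compatible(v1, v3))
print(diff_schemas(v1, v3))
```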

Follow-up 2: How do you implement exactly-once semantics?

class ExactlyOnceProcessor:
    """Implement exactly-once processing semantics"""
    
    def __init__(self, transaction_id: str):
        self.transaction_id = transaction_id
    
    def process_with_transactions(self, data, processing_func):
        """Process data with transaction guarantees"""
        # Track whether a transaction is open so that rollback is only
        # attempted after begin_transaction has succeeded
        transaction_started = False
        
        try:
            # Begin transaction
            self.begin_transaction()
            transaction_started = True
            
            # Process data
            result = processing_func(data)
            
            # Commit transaction
            self.commit_transaction()
            
            return result
        
        except Exception:
            if transaction_started:
                self.rollback_transaction()
            raise  # re-raise with the original traceback
    
    def begin_transaction(self):
        """Begin database transaction"""
        # Implement based on your database
        pass
    
    def commit_transaction(self):
        """Commit transaction"""
        pass
    
    def rollback_transaction(self):
        """Rollback transaction on failure"""
        pass
    
    def idempotent_write(self, data, write_func):
        """Write data idempotently using deduplication"""
        # Check if this batch was already processed
        if self.is_already_processed():
            print("Batch already processed, skipping")
            return
        
        # Process and write
        result = write_func(data)
        
        # Mark as processed
        self.mark_as_processed()
        
        return result
    
    def is_already_processed(self) -> bool:
        """Check if batch was already processed"""
        # Check transaction log
        return False
    
    def mark_as_processed(self):
        """Mark batch as processed"""
        # Write to transaction log
        pass
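
In practice, "exactly-once" is often approximated as at-least-once delivery plus a deduplication marker that is committed in the same transaction as the output. The sketch below shows that pattern with SQLite standing in for the destination; the table names and `process_batch_once` are hypothetical, and a real system would apply the same idea to its own transactional store.

```python
import sqlite3


def process_batch_once(conn: sqlite3.Connection, batch_id: str, rows) -> bool:
    """Apply a batch at most once. Return True if applied, False if skipped."""
    try:
        with conn:  # one transaction: the marker and the data commit together
            conn.execute(
                "INSERT INTO processed_batches (batch_id) VALUES (?)",
                (batch_id,),
            )
            conn.executemany(
                "INSERT INTO totals (key, value) VALUES (?, ?)",
                rows,
            )
        return True
    except sqlite3.IntegrityError:
        # The batch_id already exists, so an earlier attempt committed.
        # The failed transaction was rolled back and nothing was written twice.
        return False


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed_batches (batch_id TEXT PRIMARY KEY)")
conn.execute("CREATE TABLE totals (key TEXT, value INTEGER)")

rows = [("clicks", 5), ("views", 9)]
first = process_batch_once(conn, "batch-001", rows)
second = process_batch_once(conn, "batch-001", rows)  # redelivered batch

written = conn.execute("SELECT COUNT(*) FROM totals").fetchone()[0]
print(first, second, written)
```

Because the marker and the rows share a transaction, a crash between the two writes cannot leave the batch half-recorded. That gap exists when the write and the "mark as processed" step are separate operations.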

Company-Specific Tips

ℹ️

Amazon Tips:

  • Amazon tests heavily on pipeline scalability and reliability
  • Know how to design idempotent operations
  • Understand exactly-once vs at-least-once semantics
  • Be familiar with AWS services (Kinesis, Glue, Redshift)

Google Tips:

  • Google values data quality and monitoring
  • Know how to use Dataflow for stream processing
  • Understand BigQuery best practices
  • Be comfortable with Cloud Composer (Airflow)
