The Interview Question
ℹ️
Question: You're designing a data pipeline for a real-time analytics platform:
- Source: Multiple streaming sources (Kafka, APIs, databases)
- Processing: Real-time and batch processing
- Destination: Data warehouse, dashboards, ML models
- Requirements: 99.9% uptime, data quality guarantees, 1-hour SLA
Walk through your pipeline design:
- How do you architect the pipeline for both real-time and batch?
- How do you ensure data quality throughout the pipeline?
- How do you handle pipeline failures and recovery?
- How do you monitor and alert on pipeline health?
Detailed Answer
1. Pipeline Architecture Design
import json
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Optional, Any

import pandas as pd
class PipelineType(Enum):
    """Processing modes a pipeline can run in."""

    # Continuous, event-at-a-time processing.
    REALTIME = "realtime"
    # Scheduled processing over bounded data sets.
    BATCH = "batch"
    # Small batches processed at short intervals.
    MICRO_BATCH = "micro_batch"
class DataSourceType(Enum):
    """Kinds of upstream systems a pipeline can ingest from."""

    KAFKA = "kafka"
    API = "api"
    DATABASE = "database"
    FILE = "file"
    S3 = "s3"
@dataclass
class DataSource:
    """Description of one upstream data source."""

    # Identifier used to refer to the source.
    name: str
    # Which kind of system the source is.
    source_type: DataSourceType
    # Connection settings, passed through as-is.
    connection_config: Dict[str, Any]
    # Optional schema description; None when not supplied.
    schema: Optional[Dict] = None
    # Optional refresh interval in seconds; None when not supplied.
    refresh_interval: Optional[int] = None
@dataclass
class DataDestination:
    """Description of one downstream sink the pipeline writes to."""

    # Identifier used to refer to the destination.
    name: str
    # Free-form destination kind.
    destination_type: str
    # Connection settings, passed through as-is.
    connection_config: Dict[str, Any]
    # One of: append, upsert, overwrite.
    write_mode: str = "append"
class DataPipelineArchitect:
    """Design scalable data pipeline architecture.

    Keeps lists of registered sources and destinations and produces
    dictionary blueprints describing real-time, batch and hybrid pipelines.
    """

    def __init__(self):
        self.sources: List[DataSource] = []
        self.destinations: List[DataDestination] = []
        self.processing_stages = []
        self.quality_checks = []

    def add_source(self, source: DataSource):
        """Add data source."""
        self.sources.append(source)

    def add_destination(self, destination: DataDestination):
        """Add data destination."""
        self.destinations.append(destination)

    def design_realtime_pipeline(self, source: DataSource) -> Dict:
        """Design real-time pipeline architecture for ``source``.

        Args:
            source: The source to design for. Only ``DataSourceType.KAFKA``
                and ``DataSourceType.DATABASE`` have a real-time blueprint.

        Returns:
            A nested dict describing the ingestion, processing and (for
            Kafka) storage and serving layers.

        Raises:
            ValueError: If ``source.source_type`` has no real-time blueprint.
        """
        if source.source_type == DataSourceType.KAFKA:
            return {
                'ingestion': {
                    'technology': 'Apache Kafka',
                    'pattern': 'Event-driven',
                    'components': [
                        'Kafka Producer',
                        'Kafka Broker (3+ nodes)',
                        'Schema Registry',
                        'Consumer Groups'
                    ]
                },
                'processing': {
                    'technology': 'Apache Flink / Kafka Streams',
                    'pattern': 'Stream processing',
                    'components': [
                        'Event deserialization',
                        'Windowed aggregations',
                        'State management',
                        'Watermark handling'
                    ]
                },
                'storage': {
                    'hot_data': {
                        'technology': 'Redis / Cassandra',
                        'use_case': 'Real-time dashboards',
                        'retention': '24 hours'
                    },
                    'warm_data': {
                        'technology': 'TimescaleDB / ClickHouse',
                        'use_case': 'Recent analytics',
                        'retention': '30 days'
                    }
                },
                'serving': {
                    'technology': 'REST API + WebSocket',
                    'pattern': 'CQRS',
                    'latency_target': '< 100ms'
                }
            }

        if source.source_type == DataSourceType.DATABASE:
            return {
                'ingestion': {
                    'technology': 'Debezium (CDC)',
                    'pattern': 'Change Data Capture',
                    'components': [
                        'Debezium Connector',
                        'Kafka Connect',
                        'Schema evolution handling'
                    ]
                },
                'processing': {
                    'technology': 'Apache Beam',
                    'pattern': 'Unified batch and stream',
                    'components': [
                        'Record deduplication',
                        'Late data handling',
                        'Exactly-once semantics'
                    ]
                }
            }

        # Previously this fell through to `return architecture` with the
        # name unbound, surfacing as an opaque UnboundLocalError.
        raise ValueError(
            f"No real-time pipeline design for source type "
            f"{source.source_type!r}; supported types are KAFKA and DATABASE"
        )

    def design_batch_pipeline(self, schedule: str = "0 * * * *") -> Dict:
        """Design batch pipeline architecture.

        Args:
            schedule: Schedule string stored under
                ``['orchestration']['schedule']``; defaults to hourly cron.

        Returns:
            A nested dict describing orchestration, ingestion, processing
            and storage layers.
        """
        return {
            'orchestration': {
                'technology': 'Apache Airflow',
                'schedule': schedule,
                'components': [
                    'DAG definitions',
                    'Task dependencies',
                    'Retry logic',
                    'Alerting'
                ]
            },
            'ingestion': {
                'technology': 'Apache NiFi / custom',
                'patterns': {
                    'incremental': 'Delta loads using timestamps',
                    'full_refresh': 'Complete table replacement',
                    'cdc': 'Change data capture'
                }
            },
            'processing': {
                'technology': 'Apache Spark',
                'execution': {
                    'local': 'For development',
                    'yarn': 'For on-premise',
                    'emr': 'For AWS',
                    'dataproc': 'For GCP'
                },
                'optimizations': [
                    'Partition pruning',
                    'Broadcast joins',
                    'Cache management',
                    'Adaptive query execution'
                ]
            },
            'storage': {
                'data_lake': {
                    'technology': 'Delta Lake / Iceberg',
                    'format': 'Parquet',
                    'partitioning': 'Date-based partitioning',
                    'compaction': 'Daily compaction jobs'
                },
                'data_warehouse': {
                    'technology': 'BigQuery / Snowflake',
                    'pattern': 'Kimball dimensional model',
                    'refresh': 'Hourly / Daily'
                }
            }
        }

    def design_hybrid_pipeline(self) -> Dict:
        """Design hybrid (real-time + batch) pipeline.

        Returns:
            A dict with a ``layered_architecture`` section (raw, bronze,
            silver, gold zones) and a ``processing_modes`` section
            (real_time, micro_batch, batch).
        """
        return {
            'layered_architecture': {
                'raw_zone': {
                    'purpose': 'Ingest raw data',
                    'technology': 'S3 / GCS',
                    'format': 'JSON / Avro',
                    'retention': '90 days'
                },
                'bronze_zone': {
                    'purpose': 'Validated and deduplicated',
                    'technology': 'Delta Lake',
                    'operations': ['Schema validation', 'Deduplication', 'Basic quality checks']
                },
                'silver_zone': {
                    'purpose': 'Cleaned and enriched',
                    'technology': 'Delta Lake',
                    'operations': ['Data cleaning', 'Joining', 'Feature engineering']
                },
                'gold_zone': {
                    'purpose': 'Business-ready aggregates',
                    'technology': 'Data Warehouse',
                    'operations': ['Aggregations', 'Business logic', 'ML features']
                }
            },
            'processing_modes': {
                'real_time': {
                    'use_cases': ['Fraud detection', 'Real-time dashboards', 'Alerts'],
                    'technology': 'Flink / Kafka Streams',
                    'latency': '< 1 minute'
                },
                'micro_batch': {
                    'use_cases': ['Near-real-time analytics', 'Feature updates'],
                    'technology': 'Spark Structured Streaming',
                    'latency': '1-5 minutes'
                },
                'batch': {
                    'use_cases': ['Daily reports', 'ML training', 'Backfills'],
                    'technology': 'Airflow + Spark',
                    'latency': 'Hours'
                }
            }
        }
# Example usage
# architect = DataPipelineArchitect()
# kafka_source = DataSource(
# name="user_events",
# source_type=DataSourceType.KAFKA,
# connection_config={"bootstrap.servers": "kafka:9092", "topic": "events"}
# )
# architect.add_source(kafka_source)
# realtime_arch = architect.design_realtime_pipeline(kafka_source)
2. Data Quality Framework
from dataclasses import dataclass
from typing import Callable, List, Dict, Any
from enum import Enum
import great_expectations as ge
from datetime import datetime
class QualityCheckType(Enum):
    """Data-quality dimensions a check can measure."""

    COMPLETENESS = "completeness"
    ACCURACY = "accuracy"
    CONSISTENCY = "consistency"
    TIMELINESS = "timeliness"
    UNIQUENESS = "uniqueness"
@dataclass
class QualityCheck:
    """Definition of a single data-quality check."""

    # Name of the check.
    name: str
    # Quality dimension the check measures.
    check_type: QualityCheckType
    # Human-readable statement of what is expected.
    expectation: str
    # One of: critical, warning, info.
    severity: str
    # Pass threshold for the check.
    threshold: float
class DataQualityFramework:
    """Comprehensive data quality framework.

    Holds a list of registered ``QualityCheck`` objects, runs them against
    a DataFrame, keeps every run's result in ``self.results`` and records
    an alert in ``self.alerts`` whenever a critical check fails.
    """

    # Column order of the frame returned by generate_quality_report().
    _REPORT_COLUMNS = [
        'source',
        'timestamp',
        'total_rows',
        'quality_score',
        'checks_passed',
        'checks_failed',
        'checks_warning',
    ]

    def __init__(self):
        self.checks = []
        self.results = []
        self.alerts = []

    def add_check(self, check: QualityCheck):
        """Add quality check."""
        self.checks.append(check)

    def create_expectations(self, df):
        """Create Great Expectations suite.

        Builds not-null expectations for every column, a uniqueness
        expectation for an ``id`` column when present, 1st-99th percentile
        range expectations for numeric columns, and a regex expectation for
        object columns whose name contains "email".

        Returns:
            The list of values returned by the individual ``expect_*`` calls.
        """
        ge_df = ge.from_pandas(df)
        expectations = []

        # Completeness checks
        for column in df.columns:
            expectations.append(
                ge_df.expect_column_values_to_not_be_null(column)
            )

        # Uniqueness checks
        if 'id' in df.columns:
            expectations.append(
                ge_df.expect_column_values_to_be_unique('id')
            )

        # Range checks for numeric columns
        for column in df.select_dtypes(include=['number']).columns:
            expectations.append(
                ge_df.expect_column_values_to_be_between(
                    column,
                    min_value=df[column].quantile(0.01),
                    max_value=df[column].quantile(0.99)
                )
            )

        # Pattern checks for string columns
        for column in df.select_dtypes(include=['object']).columns:
            if 'email' in column.lower():
                expectations.append(
                    ge_df.expect_column_values_to_match_regex(
                        column,
                        r'^[\w\.-]+@[\w\.-]+\.\w+$'
                    )
                )

        return expectations

    def run_quality_checks(self, df, source_name: str) -> Dict:
        """Run all registered quality checks against ``df``.

        Args:
            df: Data to validate; ``len(df)`` is recorded as ``total_rows``.
            source_name: Label stored in the result and in any alert.

        Returns:
            A result dict with pass/fail/warning counters, per-check
            ``details`` and a ``quality_score`` percentage. The same dict is
            appended to ``self.results``.
        """
        results = {
            'source': source_name,
            'timestamp': datetime.now().isoformat(),
            'total_rows': len(df),
            'checks_passed': 0,
            'checks_failed': 0,
            'checks_warning': 0,
            'details': []
        }

        for check in self.checks:
            check_result = self._execute_check(df, check)
            results['details'].append(check_result)
            if check_result['passed']:
                results['checks_passed'] += 1
            elif check_result['severity'] == 'critical':
                results['checks_failed'] += 1
            else:
                results['checks_warning'] += 1

        # Calculate overall quality score. With no checks registered nothing
        # can have failed, so report 100 instead of dividing by zero.
        total_checks = len(self.checks)
        if total_checks:
            results['quality_score'] = results['checks_passed'] / total_checks * 100
        else:
            results['quality_score'] = 100.0

        # Generate alerts
        if results['checks_failed'] > 0:
            self._generate_alert(results)

        self.results.append(results)
        return results

    def _execute_check(self, df, check: QualityCheck) -> Dict:
        """Execute an individual quality check.

        ``check.name`` is used as the column to inspect for completeness and
        uniqueness checks. Any exception raised while evaluating the check is
        reported as a failed, critical result carrying the error text.

        Returns:
            A dict with ``check_name``, ``check_type``, ``passed``,
            ``severity``, ``threshold`` and ``actual_value`` (plus ``error``
            when evaluation raised).
        """
        try:
            actual_value = None
            if check.check_type == QualityCheckType.COMPLETENESS:
                # Share of non-null values in the column.
                actual_value = float(1 - df[check.name].isnull().mean())
                passed = actual_value >= check.threshold
            elif check.check_type == QualityCheckType.UNIQUENESS:
                # Share of values that are not repeats of an earlier value.
                actual_value = float(1 - df[check.name].duplicated().mean())
                passed = actual_value >= check.threshold
            elif check.check_type == QualityCheckType.ACCURACY:
                # Custom accuracy check
                passed = True  # Implement based on business rules
            elif check.check_type == QualityCheckType.CONSISTENCY:
                # Check format consistency
                passed = True  # Implement based on business rules
            else:
                passed = True

            return {
                'check_name': check.name,
                'check_type': check.check_type.value,
                'passed': bool(passed),
                'severity': check.severity,
                'threshold': check.threshold,
                'actual_value': actual_value
            }
        except Exception as e:
            # A check that cannot be evaluated (e.g. missing column) is
            # treated as a critical failure rather than aborting the run.
            return {
                'check_name': check.name,
                'check_type': check.check_type.value,
                'passed': False,
                'severity': 'critical',
                'threshold': check.threshold,
                'actual_value': None,
                'error': str(e)
            }

    def _generate_alert(self, results: Dict):
        """Generate alert for failed critical checks and store it in ``self.alerts``."""
        failed_checks = [
            d for d in results['details']
            if not d['passed'] and d['severity'] == 'critical'
        ]
        alert = {
            'timestamp': datetime.now().isoformat(),
            'source': results['source'],
            'severity': 'critical',
            'message': f"{len(failed_checks)} critical quality checks failed",
            'failed_checks': [c['check_name'] for c in failed_checks],
            'quality_score': results['quality_score']
        }
        self.alerts.append(alert)

        # In production, send to alerting system (PagerDuty, Slack, etc.)
        print(f"ALERT: {alert['message']}")
        print(f"Failed checks: {alert['failed_checks']}")

    def generate_quality_report(self) -> pd.DataFrame:
        """Generate quality report.

        Returns:
            One row per recorded run with the summary counters. The columns
            are fixed, so an empty history still yields a frame with the
            expected columns.
        """
        report_data = [
            {column: result[column] for column in self._REPORT_COLUMNS}
            for result in self.results
        ]
        return pd.DataFrame(report_data, columns=self._REPORT_COLUMNS)
# Example usage
# quality_framework = DataQualityFramework()
# quality_framework.add_check(QualityCheck(
# name="user_id",
# check_type=QualityCheckType.UNIQUENESS,
# expectation="values must be unique",
# severity="critical",
# threshold=0.99
# ))
# results = quality_framework.run_quality_checks(df, "user_events")
3. Airflow DAG Design
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta
class AirflowDAGBuilder:
    """Build production-ready Airflow DAGs.

    Holds a DAG id, a schedule and a shared ``default_args`` dict, and
    builds an ETL DAG and a streaming-monitoring DAG from them.
    """

    def __init__(self, dag_id: str, schedule: str = "@daily"):
        self.dag_id = dag_id
        self.schedule = schedule
        self.default_args = {
            'owner': 'data_engineering',
            'depends_on_past': False,
            'email': ['alerts@company.com'],
            'email_on_failure': True,
            'email_on_retry': False,
            'retries': 2,
            'retry_delay': timedelta(minutes=5),
            'execution_timeout': timedelta(hours=1),
        }

    def create_etl_dag(self,
                       extract_func,
                       transform_func,
                       load_func,
                       quality_check_func) -> DAG:
        """Create standard ETL DAG.

        Task order is: start -> extract -> transform -> quality_check ->
        load -> update_metadata -> end.

        Args:
            extract_func: Callable run by the ``extract_data`` task.
            transform_func: Callable run by the ``transform_data`` task.
            load_func: Callable run by the ``load_data`` task.
            quality_check_func: Callable run by the ``quality_check`` task.

        Returns:
            The constructed DAG.
        """
        with DAG(
            self.dag_id,
            default_args=self.default_args,
            description='ETL pipeline',
            schedule_interval=self.schedule,
            start_date=datetime(2024, 1, 1),
            catchup=False,
            max_active_runs=1,
            tags=['etl', 'production']
        ) as dag:
            # Start
            start = DummyOperator(task_id='start')

            # Extract tasks
            with TaskGroup("extract") as extract_group:
                PythonOperator(
                    task_id='extract_data',
                    python_callable=extract_func,
                    execution_timeout=timedelta(minutes=30)
                )

            # Transform tasks
            with TaskGroup("transform") as transform_group:
                PythonOperator(
                    task_id='transform_data',
                    python_callable=transform_func
                )

            # Data quality check
            quality_check = PythonOperator(
                task_id='quality_check',
                python_callable=quality_check_func
            )

            # Load tasks
            with TaskGroup("load") as load_group:
                PythonOperator(
                    task_id='load_data',
                    python_callable=load_func
                )

            # Update metadata
            update_metadata = PythonOperator(
                task_id='update_metadata',
                python_callable=self._update_metadata
            )

            # End
            end = DummyOperator(task_id='end')

            # Task dependencies. quality_check and update_metadata were
            # previously created but never linked, so they ran unordered
            # and the load was not gated on the quality check.
            (
                start
                >> extract_group
                >> transform_group
                >> quality_check
                >> load_group
                >> update_metadata
                >> end
            )

        return dag

    def create_streaming_dag(self) -> DAG:
        """Create streaming pipeline monitoring DAG.

        Runs three health checks every five minutes and then the
        ``send_alert`` task.

        Returns:
            The constructed DAG, named ``"<dag_id>_monitoring"``.
        """
        with DAG(
            f"{self.dag_id}_monitoring",
            default_args=self.default_args,
            description='Monitor streaming pipeline health',
            schedule_interval='*/5 * * * *',  # Every 5 minutes
            start_date=datetime(2024, 1, 1),
            catchup=False,
            tags=['streaming', 'monitoring']
        ) as dag:
            check_lag = PythonOperator(
                task_id='check_consumer_lag',
                python_callable=self._check_kafka_lag
            )
            check_throughput = PythonOperator(
                task_id='check_throughput',
                python_callable=self._check_throughput
            )
            check_errors = PythonOperator(
                task_id='check_error_rate',
                python_callable=self._check_error_rate
            )
            alert = PythonOperator(
                task_id='send_alert',
                python_callable=self._send_alert,
                # With the default all_success rule the alert task would be
                # skipped exactly when a check fails; run it once all checks
                # have finished, whatever their outcome.
                trigger_rule='all_done'
            )

            [check_lag, check_throughput, check_errors] >> alert

        return dag

    def _update_metadata(self):
        """Update pipeline metadata (stub)."""
        metadata = {
            'last_run': datetime.now().isoformat(),
            'status': 'success',
            'rows_processed': 0  # Would be passed from previous tasks
        }
        # Store in metadata database
        pass

    def _check_kafka_lag(self):
        """Check Kafka consumer lag (stub)."""
        # Implement Kafka lag monitoring
        pass

    def _check_throughput(self):
        """Check processing throughput (stub)."""
        pass

    def _check_error_rate(self):
        """Check error rate (stub)."""
        pass

    def _send_alert(self, **context):
        """Send alert based on checks (stub)."""
        # Implement alerting logic
        pass
# Example usage
# builder = AirflowDAGBuilder("user_analytics_etl", schedule="0 * * * *")
# dag = builder.create_etl_dag(extract_func, transform_func, load_func, quality_check_func)
4. Pipeline Monitoring and Observability
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
from contextlib import contextmanager
class PipelineMonitor:
    """Monitor pipeline health and performance.

    Registers Prometheus metrics prefixed with the pipeline name and offers
    helpers to time stages, count records, track data freshness and produce
    a summarised health report.
    """

    def __init__(self, pipeline_name: str, freshness_sla_seconds: float = 3600):
        """Create the metrics for ``pipeline_name``.

        Args:
            pipeline_name: Prefix for every metric name.
            freshness_sla_seconds: Staleness above which data is considered
                stale by ``update_freshness``; defaults to one hour.
        """
        self.pipeline_name = pipeline_name
        self.freshness_sla_seconds = freshness_sla_seconds

        # Define metrics
        self.records_processed = Counter(
            f'{pipeline_name}_records_processed_total',
            'Total records processed',
            ['stage', 'status']
        )
        self.processing_time = Histogram(
            f'{pipeline_name}_processing_seconds',
            'Processing time in seconds',
            ['stage'],
            buckets=[0.1, 0.5, 1, 2, 5, 10, 30, 60, 300]
        )
        self.pipeline_status = Gauge(
            f'{pipeline_name}_status',
            'Pipeline status (1=healthy, 0=unhealthy)'
        )
        self.data_freshness = Gauge(
            f'{pipeline_name}_data_freshness_seconds',
            'Data freshness in seconds'
        )
        self.error_count = Counter(
            f'{pipeline_name}_errors_total',
            'Total errors',
            ['error_type']
        )

        # A gauge starts at 0, which this metric defines as "unhealthy";
        # start from healthy until something reports otherwise.
        self.pipeline_status.set(1)

    @staticmethod
    def _sample_total(metric, suffix: str) -> float:
        """Sum the values of all samples of ``metric`` whose name ends with ``suffix``.

        Labelled parent metrics hold no value of their own; the per-label
        children are only reachable through ``collect()``.
        """
        return sum(
            sample.value
            for family in metric.collect()
            for sample in family.samples
            if sample.name.endswith(suffix)
        )

    @contextmanager
    def track_processing_time(self, stage: str):
        """Context manager to track processing time.

        Observes the elapsed seconds for ``stage`` when the body completes.
        If the body raises, increments the error counter labelled with the
        exception class name and re-raises.
        """
        start_time = time.perf_counter()
        try:
            yield
            duration = time.perf_counter() - start_time
            self.processing_time.labels(stage=stage).observe(duration)
        except Exception as e:
            self.error_count.labels(error_type=type(e).__name__).inc()
            raise

    def record_processed(self, stage: str, count: int, status: str = 'success'):
        """Record processed records."""
        self.records_processed.labels(stage=stage, status=status).inc(count)

    def update_freshness(self, seconds: float):
        """Update data freshness and the status gauge.

        Marks the pipeline unhealthy and sends a stale-data alert when
        ``seconds`` exceeds the freshness SLA; otherwise marks it healthy.
        """
        self.data_freshness.set(seconds)

        if seconds > self.freshness_sla_seconds:
            # Alert if data is stale
            self.pipeline_status.set(0)
            self._send_stale_data_alert(seconds)
        else:
            self.pipeline_status.set(1)

    def _send_stale_data_alert(self, staleness_seconds: float):
        """Send alert for stale data."""
        alert = {
            'pipeline': self.pipeline_name,
            'severity': 'warning',
            'message': f'Data is {staleness_seconds:.0f} seconds stale',
            'timestamp': datetime.now().isoformat()
        }
        # Send to alerting system
        print(f"ALERT: {alert['message']}")

    def health_check(self) -> Dict:
        """Perform health check.

        Returns:
            A dict with the pipeline name, a timestamp, aggregated
            ``metrics`` (total records, average processing time, error rate)
            and a ``status`` of ``healthy``, ``degraded`` (average
            processing time above 300 s) or ``unhealthy`` (error rate
            above 1%).
        """
        total_records = self._sample_total(self.records_processed, '_total')
        total_errors = self._sample_total(self.error_count, '_total')
        time_sum = self._sample_total(self.processing_time, '_sum')
        time_count = self._sample_total(self.processing_time, '_count')

        health = {
            'pipeline': self.pipeline_name,
            'timestamp': datetime.now().isoformat(),
            'status': 'healthy',
            'metrics': {
                'total_records': total_records,
                # max(..., 1) keeps the ratios defined before any data exists.
                'avg_processing_time': time_sum / max(time_count, 1),
                'error_rate': total_errors / max(total_records, 1)
            }
        }

        # Determine status
        if health['metrics']['error_rate'] > 0.01:  # > 1% error rate
            health['status'] = 'unhealthy'
        elif health['metrics']['avg_processing_time'] > 300:  # > 5 min processing
            health['status'] = 'degraded'

        return health
# Example usage
# monitor = PipelineMonitor("user_analytics")
# with monitor.track_processing_time("extract"):
# # Extract data
# monitor.record_processed("extract", 1000)
# health = monitor.health_check()
5. Real-World Application: Complete Pipeline
class ProductionPipeline:
    """Complete production data pipeline.

    Wires a ``PipelineMonitor`` and a ``DataQualityFramework`` around an
    extract -> validate -> transform -> load sequence.
    """

    def __init__(self):
        self.monitor = PipelineMonitor("production_pipeline")
        self.quality_framework = DataQualityFramework()
        self.setup_quality_checks()

    def setup_quality_checks(self):
        """Setup data quality checks."""
        self.quality_framework.add_check(QualityCheck(
            name="id",
            check_type=QualityCheckType.UNIQUENESS,
            expectation="values must be unique",
            severity="critical",
            threshold=0.99
        ))
        self.quality_framework.add_check(QualityCheck(
            name="timestamp",
            check_type=QualityCheckType.TIMELINESS,
            expectation="data must be recent",
            severity="critical",
            threshold=0.95
        ))

    def run_pipeline(self, execution_date: datetime):
        """Execute complete pipeline.

        Args:
            execution_date: Passed through to ``extract_data``.

        Returns:
            On success, a dict with ``status='success'``, the number of rows
            processed and the quality score. On any exception, a dict with
            ``status='failed'`` and the error text; the exception is logged
            and not re-raised.
        """
        try:
            # Every stage does all of its work inside the tracking context:
            # track_processing_time() already counts an exception once, so
            # the handler below must not count it a second time.

            # Stage 1: Extract
            with self.monitor.track_processing_time("extract"):
                raw_data = self.extract_data(execution_date)
                self.monitor.record_processed("extract", len(raw_data))

            # Stage 2: Validate
            with self.monitor.track_processing_time("validate"):
                quality_results = self.quality_framework.run_quality_checks(
                    raw_data, "raw_data"
                )
                if quality_results['checks_failed'] > 0:
                    raise ValueError(f"Quality checks failed: {quality_results}")

            # Stage 3: Transform
            with self.monitor.track_processing_time("transform"):
                transformed_data = self.transform_data(raw_data)
                self.monitor.record_processed("transform", len(transformed_data))

            # Stage 4: Load
            with self.monitor.track_processing_time("load"):
                self.load_data(transformed_data)
                self.monitor.record_processed("load", len(transformed_data))

            # Update freshness
            self.monitor.update_freshness(0)
            # A successful run restores the healthy status that an earlier
            # failed run may have cleared.
            self.monitor.pipeline_status.set(1)

            return {
                'status': 'success',
                'rows_processed': len(transformed_data),
                'quality_score': quality_results['quality_score']
            }
        except Exception as e:
            # Keep the traceback; the caller only receives the message text.
            logging.getLogger(__name__).exception("Pipeline run failed")
            self.monitor.pipeline_status.set(0)
            return {
                'status': 'failed',
                'error': str(e)
            }

    def extract_data(self, execution_date: datetime):
        """Extract data from sources (stub: returns an empty DataFrame)."""
        # Implement extraction logic
        return pd.DataFrame()

    def transform_data(self, raw_data: pd.DataFrame):
        """Transform data (stub: returns the input unchanged)."""
        # Implement transformation logic
        return raw_data

    def load_data(self, data: pd.DataFrame):
        """Load data to destination (stub)."""
        # Implement loading logic
        pass
# Example usage
# pipeline = ProductionPipeline()
# result = pipeline.run_pipeline(datetime.now())
💡
Pro Tip: Always implement idempotent operations in your pipeline. This ensures that re-running failed tasks doesn't create duplicate data or inconsistent states.
6. Common Follow-Up Questions
Follow-up 1: How do you handle schema evolution?
class SchemaEvolutionHandler:
    """Handle schema changes in data pipelines.

    Keeps an in-memory, per-dataset list of schema versions. A schema is a
    dict mapping column name to a type name.
    """

    # Fill values for columns missing from incoming data, keyed by type name.
    _DEFAULTS = {
        'string': '',
        'integer': 0,
        'float': 0.0,
        'boolean': False,
    }

    def __init__(self):
        self.schema_registry = {}

    def register_schema(self, dataset_name: str, schema: Dict):
        """Register new schema version.

        Versions are numbered from 1 per dataset. Every version after the
        first records whether it is backward compatible with the version
        registered immediately before it.
        """
        versions = self.schema_registry.setdefault(dataset_name, [])

        schema_version = {
            'version': len(versions) + 1,
            'schema': schema,
            'created_at': datetime.now().isoformat(),
            'compatible': True
        }

        # Check compatibility
        if versions:
            last_schema = versions[-1]['schema']
            schema_version['compatible'] = self.check_compatibility(
                last_schema, schema
            )

        versions.append(schema_version)

    def check_compatibility(self, old_schema: Dict, new_schema: Dict) -> bool:
        """Check if new schema is backward compatible.

        Backward compatible: can add columns, cannot remove.
        """
        return set(old_schema.keys()) <= set(new_schema.keys())

    def handle_schema_change(self, data: pd.DataFrame, dataset_name: str):
        """Conform ``data`` to the latest registered schema of ``dataset_name``.

        Columns not in the schema are dropped. Schema columns missing from
        ``data`` are added with a type-specific default ('' / 0 / 0.0 /
        False), or ``None`` for unrecognised type names. The result's
        columns follow schema order. ``data`` itself is not modified.

        Raises:
            KeyError: If no schema has been registered for ``dataset_name``.
        """
        versions = self.schema_registry.get(dataset_name)
        if not versions:
            raise KeyError(f"No schema registered for dataset '{dataset_name}'")
        current_schema = versions[-1]['schema']

        # Work on a copy so the caller's frame is never mutated.
        present = [col for col in current_schema if col in data.columns]
        result = data[present].copy()

        # Add missing columns with defaults
        for column, dtype in current_schema.items():
            if column in result.columns:
                continue
            # isinstance guard: a non-string type spec may be unhashable.
            default = self._DEFAULTS.get(dtype) if isinstance(dtype, str) else None
            result[column] = default

        # Restore schema column order.
        return result[list(current_schema)]
Follow-up 2: How do you implement exactly-once semantics?
class ExactlyOnceProcessor:
    """Implement exactly-once processing semantics.

    Wraps a processing or write callable with transaction and
    already-processed hooks; the hooks themselves are stubs.
    """

    def __init__(self, transaction_id: str):
        self.transaction_id = transaction_id

    def process_with_transactions(self, data, processing_func):
        """Run ``processing_func(data)`` between begin and commit.

        Returns the callable's result. If processing or commit raises, the
        transaction is rolled back and the exception re-raised. A failure
        in ``begin_transaction`` propagates without a rollback.
        """
        # No transaction exists yet, so a failure here needs no rollback.
        self.begin_transaction()

        try:
            outcome = processing_func(data)
            self.commit_transaction()
        except Exception:
            self.rollback_transaction()
            raise

        return outcome

    def begin_transaction(self):
        """Begin database transaction (stub)."""
        # Implement based on your database
        pass

    def commit_transaction(self):
        """Commit transaction (stub)."""
        pass

    def rollback_transaction(self):
        """Rollback transaction on failure (stub)."""
        pass

    def idempotent_write(self, data, write_func):
        """Write ``data`` via ``write_func`` unless already processed.

        Returns ``None`` when the batch is skipped, otherwise the result of
        ``write_func(data)``; the batch is marked processed after the write.
        """
        if not self.is_already_processed():
            written = write_func(data)
            self.mark_as_processed()
            return written

        print("Batch already processed, skipping")
        return None

    def is_already_processed(self) -> bool:
        """Check if batch was already processed (stub: always False)."""
        # Check transaction log
        return False

    def mark_as_processed(self):
        """Mark batch as processed (stub)."""
        # Write to transaction log
        pass
Company-Specific Tips
ℹ️
Amazon Tips:
- Amazon heavily tests on pipeline scalability and reliability
- Know how to design idempotent operations
- Understand exactly-once vs at-least-once semantics
- Be familiar with AWS services (Kinesis, Glue, Redshift)
Google Tips:
- Google values data quality and monitoring
- Know how to use Dataflow for stream processing
- Understand BigQuery best practices
- Be comfortable with Cloud Composer (Airflow)
Quiz Section
Related Topics
- Data Quality — Data cleaning and validation
- Orchestration — Workflow management
- Real-time Processing — Stream processing patterns
- Data Governance — Data management best practices