Design a Unified Data Platform: Ingestion to Analytics
Difficulty: Staff Level | Companies: Netflix, Uber, Airbnb, Databricks, Snowflake
Interview Question
"Design a unified data platform that handles batch and streaming data for analytics, machine learning, and real-time dashboards. How do you ensure data quality, governance, and cost efficiency?"
ℹ️ Key Concepts
This question tests your understanding of data architecture patterns, the modern data stack, and the lakehouse architecture.
Complete Data Platform Architecture
Architecture Overview
UNIFIED DATA PLATFORM ARCHITECTURE

DATA SOURCES       Databases | APIs | Files | IoT | Logs | Events
        │
        ▼
INGESTION LAYER    Batch Ingestion (Airflow, Glue, EMR)
                   Stream Ingestion (Kinesis, Kafka, Flink)
        │
        ▼
STORAGE LAYER      Data Lake (S3) | Data Warehouse (Redshift) | Data Lakehouse (Delta)
        │
        ▼
PROCESSING LAYER   Spark (Batch) | Flink (Stream) | dbt (Transform)
        │
        ▼
SERVING LAYER      Analytics/BI | ML Platform | Real-Time Dashboards
Mathematical Foundation: Data Volume
Data Ingestion Rate:
- Batch data: B = 1TB/day
- Streaming data: S = 100GB/hour = 2.4TB/day
- Total daily ingestion: T = B + S = 3.4TB/day
- Monthly ingestion: M = T × 30 = 102TB/month
- Yearly ingestion: Y = T × 365 = 1,241TB/year
Storage Requirements:
- Raw data: R = Y = 1,241TB
- Processed data: P = Y × 1.5 = 1,861.5TB (1.5× for transformations)
- Total storage: S_total = R + P = 3,102.5TB
Processing Capacity:
- Batch processing: B_proc = 1TB/day ÷ 24 hours = 41.7GB/hour
- Stream processing: S_proc = 100GB/hour
- Total processing: T_proc = B_proc + S_proc = 141.7GB/hour
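The estimates above can be reproduced in a few lines, which also makes it easy to carry them one step further: converting the streaming rate into a sustained write throughput and a minimum Kinesis shard count. This is a sketch that assumes decimal units (1 TB = 1,000 GB) and uses the published Kinesis Data Streams write limit of 1 MB/s (or 1,000 records/s) per provisioned shard; the function and constant names are illustrative.

```python
import math

# Decimal units, matching the estimates above: 1 TB = 1,000 GB, 1 GB = 1,000 MB.
GB_PER_TB = 1_000
MB_PER_GB = 1_000

# Inputs copied from the estimates above.
BATCH_TB_PER_DAY = 1.0
STREAM_GB_PER_HOUR = 100.0
PROCESSED_MULTIPLIER = 1.5  # processed data assumed to be 1.5x raw

# Kinesis Data Streams write limit per provisioned shard: 1 MB/s.
KINESIS_SHARD_MB_PER_SEC = 1.0


def capacity_plan() -> dict:
    """Reproduce the volume estimates and derive the stream's shard floor."""
    stream_tb_per_day = STREAM_GB_PER_HOUR * 24 / GB_PER_TB
    total_tb_per_day = BATCH_TB_PER_DAY + stream_tb_per_day
    yearly_raw_tb = total_tb_per_day * 365
    # Average (not peak) streaming throughput in MB per second
    stream_mb_per_sec = STREAM_GB_PER_HOUR * MB_PER_GB / 3600
    return {
        "total_tb_per_day": total_tb_per_day,
        "monthly_tb": total_tb_per_day * 30,
        "yearly_raw_tb": yearly_raw_tb,
        "total_storage_tb": yearly_raw_tb * (1 + PROCESSED_MULTIPLIER),
        "batch_gb_per_hour": BATCH_TB_PER_DAY * GB_PER_TB / 24,
        "stream_mb_per_sec": stream_mb_per_sec,
        "min_kinesis_shards": math.ceil(stream_mb_per_sec / KINESIS_SHARD_MB_PER_SEC),
    }
```

At 100 GB/hour the stream averages roughly 27.8 MB/s, so a provisioned stream needs at least 28 shards for the average load alone, before any headroom for peaks.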
AWS Data Platform Implementation
# S3 Data Lake
resource "aws_s3_bucket" "data_lake" {
  bucket = "company-data-lake-${var.environment}"
  tags = {
    Environment = var.environment
    ManagedBy   = "Terraform"
  }
}
resource "aws_s3_bucket_lifecycle_configuration" "data_lake" {
  bucket = aws_s3_bucket.data_lake.id
  rule {
    id     = "archive-old-data"
    status = "Enabled"
    filter {} # empty filter: the rule applies to every object in the bucket
    transition {
      days          = 30
      storage_class = "STANDARD_IA"
    }
    transition {
      days          = 90
      storage_class = "GLACIER"
    }
    transition {
      days          = 365
      storage_class = "DEEP_ARCHIVE"
    }
    expiration {
      days = 2555 # 7 years
    }
  }
}
resource "aws_s3_bucket_server_side_encryption_configuration" "data_lake" {
  bucket = aws_s3_bucket.data_lake.id
  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm     = "aws:kms"
      kms_master_key_id = aws_kms_key.data_lake.arn
    }
  }
}
# Glue Data Catalog
resource "aws_glue_catalog_database" "data_lake" {
  name = "data_lake_${var.environment}"
}
resource "aws_glue_catalog_table" "raw_events" {
  name          = "raw_events"
  database_name = aws_glue_catalog_database.data_lake.name
  storage_descriptor {
    location      = "s3://${aws_s3_bucket.data_lake.id}/raw/events/"
    input_format  = "org.apache.hadoop.mapred.TextInputFormat"
    output_format = "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"
    ser_de_info {
      serialization_library = "org.openx.data.jsonserde.JsonSerDe"
    }
    columns {
      name = "event_id"
      type = "string"
    }
    columns {
      name = "event_type"
      type = "string"
    }
    columns {
      name = "timestamp"
      type = "bigint"
    }
    columns {
      name = "user_id"
      type = "string"
    }
    columns {
      name = "payload"
      type = "string"
    }
  }
  table_type = "EXTERNAL_TABLE"
  parameters = {
    "classification" = "json"
  }
}
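# Kinesis Data Stream for real-time ingestion
resource "aws_kinesis_stream" "events" {
  name = "events-stream-${var.environment}"
  # 100 GB/hour is ~28 MB/s of writes and each provisioned shard accepts up to
  # 1 MB/s (1,000 records/s), so 10 shards would throttle; 32 leaves headroom.
  shard_count      = 32
  retention_period = 24 # hours
  stream_mode_details {
    stream_mode = "PROVISIONED"
  }
  encryption_type = "KMS"
  kms_key_id      = aws_kms_key.kinesis.arn
  tags = {
    Environment = var.environment
  }
}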
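# Redshift Data Warehouse
resource "aws_redshift_cluster" "analytics" {
  cluster_identifier = "analytics-cluster-${var.environment}"
  database_name      = "analytics"
  master_username    = var.redshift_username
  master_password    = var.redshift_password
  # A single dc2.large node suits development only; at the volumes estimated
  # above, RA3 node types (managed storage scales separately from compute) fit better.
  node_type                 = "dc2.large"
  cluster_type              = "single-node"
  encrypted                 = true
  kms_key_id                = aws_kms_key.redshift.arn
  vpc_security_group_ids    = [aws_security_group.redshift.id]
  cluster_subnet_group_name = aws_redshift_subnet_group.analytics.name
  skip_final_snapshot       = var.environment != "production"
  # Required when skip_final_snapshot is false (production), and must be unset otherwise
  final_snapshot_identifier = var.environment == "production" ? "analytics-cluster-final" : null
  tags = {
    Environment = var.environment
  }
}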
# EMR Cluster for Spark processing
resource "aws_emr_cluster" "spark" {
  name          = "spark-cluster-${var.environment}"
  release_label = "emr-6.10.0"
  applications  = ["Spark", "Hive", "Hue"]
  service_role  = aws_iam_role.emr_service.arn
  master_instance_group {
    instance_type  = "m5.xlarge"
    instance_count = 1
  }
  core_instance_group {
    instance_type  = "m5.2xlarge"
    instance_count = 3
    ebs_config {
      size                 = 100
      type                 = "gp3"
      volumes_per_instance = 2
    }
  }
  ec2_attributes {
    instance_profile = aws_iam_instance_profile.emr.arn
    subnet_id        = aws_subnet.private[0].id
  }
  tags = {
    Environment = var.environment
  }
}
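To see what the lifecycle rule does to the bucket over time, the sketch below maps an object's age to its storage class and estimates how much data sits in each tier once the bucket is older than the 7-year expiration. It assumes a constant daily ingest rate, that every object follows this single rule, and that a transition happens exactly at the configured day (S3 actually schedules transitions asynchronously); the function names are illustrative.

```python
from typing import Dict, List, Tuple

# Tier boundaries copied from the lifecycle rule above:
# (age in days at which the tier starts, S3 storage class).
TIERS: List[Tuple[int, str]] = [
    (0, "STANDARD"),
    (30, "STANDARD_IA"),
    (90, "GLACIER"),
    (365, "DEEP_ARCHIVE"),
]
EXPIRATION_DAYS = 2555  # expiration { days = 2555 }


def storage_class_for_age(age_days: int) -> str:
    """Return the storage class an object of this age sits in (or EXPIRED)."""
    if age_days >= EXPIRATION_DAYS:
        return "EXPIRED"
    current = TIERS[0][1]
    for start, storage_class in TIERS:
        if age_days >= start:
            current = storage_class
    return current


def steady_state_tb(tb_per_day: float) -> Dict[str, float]:
    """TB resident in each tier once the oldest data has started expiring."""
    bounds = [start for start, _ in TIERS] + [EXPIRATION_DAYS]
    return {
        storage_class: (bounds[i + 1] - bounds[i]) * tb_per_day
        for i, (_, storage_class) in enumerate(TIERS)
    }
```

For the 3.4 TB/day estimated earlier, this gives about 102 TB in STANDARD, 204 TB in STANDARD_IA, 935 TB in GLACIER and 7,446 TB in DEEP_ARCHIVE. Objects in GLACIER or DEEP_ARCHIVE cannot be read in place until they are restored, so query engines such as Athena or Spark cannot scan them directly; that is one reason lifecycle rules are often scoped to raw prefixes rather than the whole bucket.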
Data Ingestion with Apache Kafka
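# Kafka producer for data ingestion (kafka-python client)
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from kafka import KafkaProducer


class DataIngestionProducer:
    """Kafka producer for data ingestion"""

    def __init__(self, bootstrap_servers: List[str]):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8'),
            # str() so non-string keys (e.g. integer user IDs) serialize too
            key_serializer=lambda k: str(k).encode('utf-8') if k is not None else None,
            acks='all',  # wait for all in-sync replicas to acknowledge
            retries=3,
        )

    def _enrich(self, event: Dict[str, Any]) -> Dict[str, Any]:
        """Copy the event and stamp ingestion metadata without mutating the caller's dict"""
        enriched = dict(event)
        enriched['ingestion_timestamp'] = datetime.now(timezone.utc).isoformat()
        # Keep a caller-supplied event_id so re-sends of the same event can be deduplicated
        enriched.setdefault('event_id', str(uuid.uuid4()))
        return enriched

    def ingest_event(self, topic: str, event: Dict[str, Any],
                     partition_key: Optional[str] = None):
        """Ingest a single event and block until the broker acknowledges it"""
        event = self._enrich(event)
        if partition_key is None:
            # Keying by user_id keeps each user's events ordered within one partition
            partition_key = event.get('user_id', event['event_id'])
        future = self.producer.send(topic, key=partition_key, value=event)
        return future.get(timeout=10)

    def ingest_batch(self, topic: str, events: List[Dict[str, Any]]):
        """Ingest a batch of events: sends are pipelined, then awaited together"""
        futures = []
        for event in events:
            event = self._enrich(event)
            future = self.producer.send(
                topic, key=event.get('user_id', event['event_id']), value=event
            )
            futures.append(future)
        # Wait for all to complete; raises on the first failed send
        for future in futures:
            future.get(timeout=10)

    def flush(self):
        self.producer.flush()

    def close(self):
        self.producer.close()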
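# Kafka consumer for stream processing
import json
import logging
from typing import Any, Callable, Dict, List

from kafka import KafkaConsumer

logger = logging.getLogger(__name__)


class DataIngestionConsumer:
    """Kafka consumer for stream processing (commits offsets only after handling)"""

    def __init__(self, topics: List[str], bootstrap_servers: List[str],
                 group_id: str):
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            auto_offset_reset='earliest',
            enable_auto_commit=False,  # offsets are committed manually below
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        )

    def consume(self, handler: Callable[[Dict[str, Any]], None]):
        """Consume messages, committing after each successfully handled message"""
        try:
            for message in self.consumer:
                try:
                    handler(message.value)
                    self.consumer.commit()
                except Exception:
                    # Not committing here is not enough to retry: the next successful
                    # commit() moves past this offset, so the message is effectively
                    # skipped. Route failures to a dead-letter topic in production.
                    logger.exception("Error processing %s[%d]@%d",
                                     message.topic, message.partition, message.offset)
        except KeyboardInterrupt:
            pass
        finally:
            self.consumer.close()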
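Because the consumer commits only after the handler returns, delivery is at-least-once: a crash between the handler and the commit redelivers the message. Since the producer stamps every event with an `event_id`, one common mitigation is to make the handler idempotent on that ID. The sketch below is a hypothetical in-memory wrapper (`IdempotentHandler` is not part of kafka-python); it remembers only the most recent IDs and forgets them on restart, so a production version would keep the keys in a durable store such as a table with a unique constraint.

```python
from collections import OrderedDict
from typing import Any, Callable, Dict


class IdempotentHandler:
    """Wraps a handler so redelivered events (same event_id) are skipped.

    Hypothetical helper: keeps the last `max_keys` event IDs in memory, so it
    only catches duplicates that arrive within that window.
    """

    def __init__(self, handler: Callable[[Dict[str, Any]], None],
                 max_keys: int = 100_000):
        self.handler = handler
        self.max_keys = max_keys
        self._seen: "OrderedDict[str, None]" = OrderedDict()

    def __call__(self, event: Dict[str, Any]) -> bool:
        """Process the event; return False if it was a duplicate."""
        event_id = event["event_id"]
        if event_id in self._seen:
            self._seen.move_to_end(event_id)  # refresh its position in the LRU
            return False
        self.handler(event)  # mark as seen only after the handler succeeds
        self._seen[event_id] = None
        if len(self._seen) > self.max_keys:
            self._seen.popitem(last=False)  # evict the oldest remembered ID
        return True
```

Usage would look like `consumer.consume(IdempotentHandler(process_order))`, where `process_order` is a placeholder for the real business logic. Marking the ID as seen only after the handler succeeds keeps a failed event eligible for reprocessing.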
Data Transformation with dbt
-- dbt model: stg_orders.sql
WITH source AS (
    SELECT * FROM {{ source('raw', 'orders') }}
),
transformed AS (
    SELECT
        order_id,
        user_id,
        order_date,
        status,
        total_amount,
        currency,
        created_at,
        updated_at,
        -- Cleansing rule: negative amounts are replaced with 0
        CASE
            WHEN total_amount < 0 THEN 0
            ELSE total_amount
        END AS clean_amount,
        -- Derived columns
        DATE_TRUNC('day', order_date) AS order_day,
        DATE_TRUNC('week', order_date) AS order_week,
        DATE_TRUNC('month', order_date) AS order_month
    FROM source
    -- Data quality check: drop rows without a primary key
    WHERE order_id IS NOT NULL
)
SELECT * FROM transformed
-- dbt model: fct_daily_revenue.sql
WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
),
daily_metrics AS (
    SELECT
        order_day,
        COUNT(DISTINCT order_id) AS total_orders,
        COUNT(DISTINCT user_id) AS unique_users,
        SUM(clean_amount) AS total_revenue,
        AVG(clean_amount) AS avg_order_value,
        MIN(clean_amount) AS min_order_value,
        MAX(clean_amount) AS max_order_value,
        -- Percentiles
        PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY clean_amount) AS median_order_value,
        PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY clean_amount) AS p95_order_value
    FROM orders
    GROUP BY order_day
),
with_prev AS (
    SELECT
        *,
        -- Growth metrics: revenue of the previous row, i.e. the previous day that had orders
        LAG(total_revenue, 1) OVER (ORDER BY order_day) AS prev_day_revenue
    FROM daily_metrics
)
SELECT
    *,
    -- NULLIF turns a zero denominator into NULL instead of a division error
    (total_revenue - prev_day_revenue) / NULLIF(prev_day_revenue, 0) AS revenue_growth_rate
FROM with_prev
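The growth-rate column relies on two SQL behaviours: `LAG` returns NULL on the first row, and `NULLIF(prev, 0)` turns a zero denominator into NULL, so the division yields NULL rather than an error. The Python sketch below mirrors that logic over `(order_day, total_revenue)` rows, with `None` standing in for NULL; it could serve as the reference implementation when writing a unit test for the model. The function name is illustrative.

```python
from typing import List, Optional, Tuple


def revenue_growth(
    rows: List[Tuple[str, float]],
) -> List[Tuple[str, Optional[float], Optional[float]]]:
    """Mirror LAG(...) OVER (ORDER BY order_day) and the NULLIF-guarded ratio.

    Returns (order_day, prev_day_revenue, revenue_growth_rate) per row.
    """
    out = []
    prev: Optional[float] = None
    for day, revenue in sorted(rows):  # ORDER BY order_day (ISO dates sort correctly)
        if prev is None or prev == 0:
            rate = None  # LAG is NULL on the first row; NULLIF(0, 0) is NULL
        else:
            rate = (revenue - prev) / prev
        out.append((day, prev, rate))
        prev = revenue
    return out
```

One consequence worth knowing: `LAG` looks at the previous row, not the previous calendar day. A day with no orders has no row in `daily_metrics`, so the next day's growth is measured against the last day that did have orders; joining to a date spine first would change that.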
Real-Time Analytics with Apache Flink
# Flink stream processing (PyFlink Table API)
from pyflink.table import EnvironmentSettings, StreamTableEnvironment


def create_stream_processing_job():
    """Create and submit the Flink stream processing job"""
    # Create Flink table environment in streaming mode
    env_settings = EnvironmentSettings.in_streaming_mode()
    t_env = StreamTableEnvironment.create(environment_settings=env_settings)

    # Define source table (Kafka; needs the Kafka SQL connector JAR on the classpath).
    # The watermark tolerates events arriving up to 5 seconds out of order.
    t_env.execute_sql("""
        CREATE TABLE kafka_orders (
            order_id STRING,
            user_id STRING,
            amount DOUBLE,
            event_time TIMESTAMP(3),
            WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'orders',
            'properties.bootstrap.servers' = 'kafka:9092',
            'properties.group.id' = 'flink-processor',
            'format' = 'json',
            'scan.startup.mode' = 'latest-offset'
        )
    """)

    # Define sink table (Redis for real-time dashboard).
    # Apache Flink does not ship a Redis SQL connector: this assumes a third-party
    # connector is installed, and its option names may differ from these.
    t_env.execute_sql("""
        CREATE TABLE redis_metrics (
            window_start TIMESTAMP(3),
            window_end TIMESTAMP(3),
            order_count BIGINT,
            total_amount DOUBLE,
            avg_amount DOUBLE,
            PRIMARY KEY (window_start) NOT ENFORCED
        ) WITH (
            'connector' = 'redis',
            'host' = 'redis',
            'port' = '6379'
        )
    """)

    # Windowed aggregation: one output row per 1-minute tumbling window
    result = t_env.execute_sql("""
        INSERT INTO redis_metrics
        SELECT
            TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
            TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS window_end,
            COUNT(*) AS order_count,
            SUM(amount) AS total_amount,
            AVG(amount) AS avg_amount
        FROM kafka_orders
        GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE)
    """)
    # execute_sql submits the INSERT asynchronously; wait() keeps a local script
    # alive while the unbounded streaming job runs
    result.wait()


if __name__ == "__main__":
    create_stream_processing_job()
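To see when the one-minute windows in this job actually emit a row, the sketch below simulates the same event-time semantics in plain Python: each event is assigned to the window `[t - t % 60, +60)`, the watermark trails the largest event time seen by 5 seconds (the `WATERMARK` clause), a window fires once the watermark reaches its end, and an event whose window has already fired is dropped as late. This is a simplified model (single partition, second granularity, no allowed lateness), not Flink's implementation.

```python
from typing import Dict, List, Tuple

WINDOW_SECONDS = 60  # TUMBLE(event_time, INTERVAL '1' MINUTE)
WATERMARK_DELAY = 5  # event_time - INTERVAL '5' SECOND


def simulate_tumbling(
    events: List[Tuple[int, float]],
) -> Tuple[List[Tuple[int, int, int, float]], List[Tuple[int, float]]]:
    """Simulate COUNT/SUM per tumbling window over (event_time_s, amount) events.

    Events are given in arrival order. Returns the fired windows as
    (window_start, window_end, count, total) and the late events that were dropped.
    """
    open_windows: Dict[int, List[float]] = {}
    emitted: List[Tuple[int, int, int, float]] = []
    late: List[Tuple[int, float]] = []
    watermark = float("-inf")
    for ts, amount in events:
        start = ts - ts % WINDOW_SECONDS
        if start + WINDOW_SECONDS <= watermark:
            late.append((ts, amount))  # its window has already fired
            continue
        open_windows.setdefault(start, []).append(amount)
        watermark = max(watermark, ts - WATERMARK_DELAY)
        # Fire every open window whose end the watermark has reached
        for s in sorted(open_windows):
            if s + WINDOW_SECONDS <= watermark:
                amounts = open_windows.pop(s)
                emitted.append((s, s + WINDOW_SECONDS, len(amounts), sum(amounts)))
    return emitted, late
```

For example, with arrivals at t = 10, 50, 62, 58, 66 and 59 seconds, the event at 58 s still lands in the first window because the watermark is only at 57 s, the first window fires when the event at 66 s pushes the watermark to 61 s, and the event at 59 s is then dropped as late. The trade-off is visible here: a larger watermark delay admits more stragglers but delays every dashboard update by the same amount.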
# Data quality checks
from typing import Any, Callable, Dict, List


class DataQualityChecker:
    """Rule-based data quality validation"""

    def __init__(self):
        self.rules: List[Dict[str, Any]] = []

    def add_rule(self, name: str, check_func: Callable[[Any], bool],
                 severity: str = 'error'):
        self.rules.append({
            'name': name,
            'check': check_func,
            'severity': severity
        })

    def validate(self, data) -> Dict[str, Any]:
        """Validate data against rules; only 'error' violations fail the record"""
        results = {
            'passed': True,
            'violations': []
        }
        for rule in self.rules:
            try:
                ok = rule['check'](data)
                violation = None if ok else {'rule': rule['name'],
                                             'severity': rule['severity']}
            except Exception as e:
                # A rule that raises (e.g. on a missing field) counts as an error
                violation = {'rule': rule['name'], 'error': str(e), 'severity': 'error'}
            if violation:
                results['violations'].append(violation)
                if violation['severity'] == 'error':
                    results['passed'] = False
        return results


# Usage
checker = DataQualityChecker()
checker.add_rule('not_null', lambda x: x is not None)
checker.add_rule('positive_amount', lambda x: x.get('amount', 0) > 0)
checker.add_rule('valid_email', lambda x: '@' in x.get('email', ''))
⚠️ Data Quality
Implement data quality checks at ingestion, transformation, and serving layers. Use data contracts to ensure schema compatibility between producers and consumers.
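A data contract becomes enforceable once a proposed schema change can be checked automatically, for example in the producer's CI pipeline. The sketch below models a schema as a mapping from field name to type and required flag, and lists changes that could break existing consumers. The rules are a simplified assumption (required fields must not be removed or made optional, no field may change type, new fields are allowed because consumers are assumed to ignore unknown fields); schema registries such as Confluent Schema Registry apply more precise, format-specific compatibility modes.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class Field:
    dtype: str
    required: bool = True


Schema = Dict[str, Field]


def breaking_changes(old: Schema, new: Schema) -> List[str]:
    """List changes in `new` that could break consumers written against `old`."""
    problems = []
    for name, old_field in old.items():
        new_field = new.get(name)
        if new_field is None:
            if old_field.required:
                problems.append(f"required field '{name}' removed")
            continue  # dropping an optional field is tolerated
        if new_field.dtype != old_field.dtype:
            problems.append(
                f"field '{name}' changed type {old_field.dtype} -> {new_field.dtype}")
        if old_field.required and not new_field.required:
            problems.append(f"field '{name}' became optional")
    return problems
```

A non-empty result would block the producer's deploy until the change is negotiated with consumers, for example by publishing a new versioned topic instead.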
Data Governance
# Data catalog and governance
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, List, Optional


@dataclass
class DataAsset:
    asset_id: str
    name: str
    description: str
    owner: str
    classification: str
    pii_fields: List[str]
    retention_days: int
    created_at: datetime
    tags: Dict[str, str]


class DataCatalog:
    """In-memory data catalog for governance"""

    def __init__(self):
        self.assets: Dict[str, DataAsset] = {}

    def register_asset(self, asset: DataAsset):
        """Register (or overwrite) a data asset"""
        self.assets[asset.asset_id] = asset

    def get_asset(self, asset_id: str) -> Optional[DataAsset]:
        """Get a data asset, or None if it is not registered"""
        return self.assets.get(asset_id)

    def search_assets(self, query: str) -> List[DataAsset]:
        """Case-insensitive substring search over name and description"""
        q = query.lower()
        return [a for a in self.assets.values()
                if q in a.name.lower() or q in a.description.lower()]

    def get_assets_by_classification(self, classification: str) -> List[DataAsset]:
        """Get assets by classification"""
        return [a for a in self.assets.values()
                if a.classification == classification]


class DataRetentionManager:
    """Deletes S3 objects older than the retention policy for their asset type.

    For purely prefix-based retention, S3 lifecycle expiration rules do the same
    job without listing and deleting objects yourself.
    """

    def __init__(self, default_retention_days: int = 365):
        self.retention_policies: Dict[str, int] = {}
        self.default_retention_days = default_retention_days

    def add_policy(self, asset_type: str, retention_days: int):
        """Add retention policy"""
        self.retention_policies[asset_type] = retention_days

    def enforce_retention(self, s3_client, bucket: str, prefix: str) -> List[str]:
        """Delete expired objects under prefix; returns the deleted keys"""
        now = datetime.now(timezone.utc)  # boto3 returns timezone-aware LastModified
        deleted = []
        # list_objects_v2 returns at most 1,000 keys per call, so paginate
        paginator = s3_client.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for obj in page.get('Contents', []):
                asset_type = self._get_asset_type(obj['Key'])
                retention_days = self.retention_policies.get(
                    asset_type, self.default_retention_days)
                if (now - obj['LastModified']).days > retention_days:
                    s3_client.delete_object(Bucket=bucket, Key=obj['Key'])
                    deleted.append(obj['Key'])
        return deleted

    @staticmethod
    def _get_asset_type(key: str) -> str:
        # Assumed key layout: <layer>/<asset_type>/..., e.g. "raw/events/2024/01/..."
        parts = key.split('/')
        return parts[1] if len(parts) > 1 else parts[0]
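The catalog records `pii_fields` for each asset, and one way to act on that metadata is to mask those fields before data reaches broadly accessible layers. The sketch below replaces PII values with an HMAC-SHA256 token: the same value and key always produce the same token, so masked columns can still be joined or counted distinct, while anyone without the secret key cannot test guesses (for example, candidate email addresses) against the tokens. The function name is illustrative, and in practice the key would come from a secrets manager.

```python
import hashlib
import hmac
from typing import Any, Dict, Iterable


def mask_pii(record: Dict[str, Any], pii_fields: Iterable[str],
             key: bytes) -> Dict[str, Any]:
    """Return a copy of `record` with PII fields replaced by HMAC-SHA256 tokens."""
    masked = dict(record)  # never mutate the caller's record
    for field in pii_fields:
        value = masked.get(field)
        if value is None:
            continue  # leave missing or NULL values as they are
        digest = hmac.new(key, str(value).encode("utf-8"), hashlib.sha256)
        masked[field] = digest.hexdigest()
    return masked
```

Combined with the catalog, a serving job might call `mask_pii(row, catalog.get_asset("orders").pii_fields, key)`, where the `"orders"` asset ID is a placeholder.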
✅ Data Platform Benefits
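A unified data platform enables data-driven decision making, ML model training, and real-time analytics from a single governed copy of the data. Use the medallion architecture to raise data quality in stages: bronze holds raw data as ingested, silver holds cleaned and conformed data, and gold holds business-level aggregates.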
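The medallion layering can be illustrated in miniature. In the platform above the layers would be Delta or S3 tables built by Spark or dbt; this pure-Python sketch only shows what each hop is responsible for. The function names, the field names (borrowed from `stg_orders`), and the rule of keeping the row with the latest `updated_at` per `order_id` are assumptions for illustration.

```python
import json
from collections import defaultdict
from typing import Any, Dict, List


def to_silver(bronze_lines: List[str]) -> List[Dict[str, Any]]:
    """Bronze -> silver: parse raw JSON, drop bad rows, keep latest row per order_id."""
    latest: Dict[str, Dict[str, Any]] = {}
    for line in bronze_lines:
        try:
            row = json.loads(line)
        except json.JSONDecodeError:
            continue  # a real pipeline would quarantine this line, not discard it
        if not isinstance(row, dict) or row.get("order_id") is None:
            continue  # same filter as stg_orders
        # Same cleansing rule as stg_orders: negative amounts become 0
        row["clean_amount"] = max(float(row.get("total_amount") or 0), 0.0)
        prev = latest.get(row["order_id"])
        # Assumes updated_at values are ISO-8601 strings, which sort chronologically
        if prev is None or row.get("updated_at", "") >= prev.get("updated_at", ""):
            latest[row["order_id"]] = row
    return list(latest.values())


def to_gold(silver_rows: List[Dict[str, Any]]) -> Dict[str, float]:
    """Silver -> gold: daily revenue keyed by the date part of an ISO order_date."""
    revenue: Dict[str, float] = defaultdict(float)
    for row in silver_rows:
        revenue[row["order_date"][:10]] += row["clean_amount"]
    return dict(revenue)
```

Keeping bronze untouched is what makes this layering safe: if a silver rule turns out to be wrong, silver and gold can be rebuilt from bronze instead of re-ingesting from the sources.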
Summary
| Layer | Purpose | AWS Services and Tools |
|---|---|---|
| Ingestion | Data collection | Kinesis, MSK, Glue |
| Storage | Data lake/warehouse | S3, Redshift, Delta Lake |
| Processing | Batch/Stream | EMR, Flink, dbt |
| Serving | Analytics/ML/Real-time dashboards | QuickSight, SageMaker, Redis |
| Governance | Quality/Security | Glue Data Catalog, Lake Formation |