Data Platform Design: Building Enterprise Data Infrastructure

Difficulty: Staff Level | Companies: LinkedIn, Uber, Netflix, Airbnb, Stripe

1. Data Platform Architecture

Architecture Diagram
Enterprise Data Platform
┌────────────────────────────────────────────────────────────┐
│                    Application Layer                       │
│   BI Tools │ ML Platform │ Data Apps │ Analytics           │
├────────────────────────────────────────────────────────────┤
│                    Serving Layer                           │
│   Data Warehouse │ Feature Store │ Data Catalog            │
├────────────────────────────────────────────────────────────┤
│                    Processing Layer                        │
│   Batch (Spark) │ Stream (Flink) │ ML Training             │
├────────────────────────────────────────────────────────────┤
│                    Storage Layer                           │
│   Data Lake │ Data Lakehouse │ Object Storage              │
├────────────────────────────────────────────────────────────┤
│                    Ingestion Layer                         │
│   CDC │ Batch Ingest │ Streaming │ APIs                    │
├────────────────────────────────────────────────────────────┤
│                    Governance Layer                        │
│   Metadata │ Lineage │ Quality │ Access Control            │
└────────────────────────────────────────────────────────────┘
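
The diagram can also be read as a lookup structure. The sketch below assumes the usual reading, in which data moves bottom-up from ingestion to applications while governance applies across all layers. The names `LAYERS`, `layer_of`, and `data_flow` are illustrative and do not come from the original.

```python
from typing import Dict, List, Optional

# Layers listed top to bottom, as in the diagram.
# Governance is drawn last but is assumed here to apply to every layer.
LAYERS: Dict[str, List[str]] = {
    "Application": ["BI Tools", "ML Platform", "Data Apps", "Analytics"],
    "Serving": ["Data Warehouse", "Feature Store", "Data Catalog"],
    "Processing": ["Batch (Spark)", "Stream (Flink)", "ML Training"],
    "Storage": ["Data Lake", "Data Lakehouse", "Object Storage"],
    "Ingestion": ["CDC", "Batch Ingest", "Streaming", "APIs"],
    "Governance": ["Metadata", "Lineage", "Quality", "Access Control"],
}

def layer_of(component: str) -> Optional[str]:
    """Return the layer that lists a component, or None if it is unknown."""
    for layer, components in LAYERS.items():
        if component in components:
            return layer
    return None

def data_flow() -> List[str]:
    """Order in which data moves through the platform (governance excluded)."""
    flow = [layer for layer in LAYERS if layer != "Governance"]
    return list(reversed(flow))

print(layer_of("Feature Store"))
print(" -> ".join(data_flow()))
```

Keeping the layer inventory as data makes it easy to check, for example, that every tool in use is assigned to exactly one layer.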

2. Data Mesh Architecture

Four Principles

Architecture Diagram
Data Mesh Principles:
├── Domain Ownership
│   └── Each business domain owns their data end-to-end
├── Data as a Product
│   └── Data must be discoverable, addressable, trustworthy
├── Self-Serve Platform
│   └── Platform team provides infrastructure as a service
└── Federated Computational Governance
    └── Policies are code, executed automatically

Domain-Based Organization

from dataclasses import dataclass
from typing import List, Dict

@dataclass
class DataDomain:
    name: str
    owner_team: str
    data_products: List[str]
    upstream_dependencies: List[str]
    downstream_consumers: List[str]
    
    def schema(self):
        return {
            "domain": self.name,
            "team": self.owner_team,
            "products": self.data_products,
            "upstream": self.upstream_dependencies,
            "downstream": self.downstream_consumers
        }

# Define domains
domains = [
    DataDomain(
        name="orders",
        owner_team="checkout-team",
        data_products=["orders_fact", "payments_fact", "cart_events"],
        upstream_dependencies=["inventory", "users"],
        downstream_consumers=["analytics", "ml", "finance"]
    ),
    DataDomain(
        name="users",
        owner_team="growth-team",
        data_products=["user_profiles", "user_segments", "user_events"],
        upstream_dependencies=[],
        downstream_consumers=["orders", "marketing", "ml"]
    ),
    DataDomain(
        name="inventory",
        owner_team="supply-chain-team",
        data_products=["stock_levels", "supplier_data", "warehouse_events"],
        upstream_dependencies=["orders"],
        downstream_consumers=["analytics", "operations"]
    ),
]
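
Declared dependencies are worth validating once several domains exist. The sketch below is one possible check, not part of the original design: it runs a depth-first search over the upstream dependencies and reports the first cycle it finds. The `UPSTREAM` mapping copies the values from the domain definitions above; `find_cycle` is a hypothetical helper name.

```python
from typing import Dict, List, Set

# Upstream dependencies copied from the domain definitions above.
UPSTREAM: Dict[str, List[str]] = {
    "orders": ["inventory", "users"],
    "users": [],
    "inventory": ["orders"],
}

def find_cycle(upstream: Dict[str, List[str]]) -> List[str]:
    """Return one dependency cycle as a list of domains, or [] if none exists."""
    visiting: Set[str] = set()   # domains on the current DFS path
    done: Set[str] = set()       # domains that are fully explored
    path: List[str] = []

    def visit(domain: str) -> List[str]:
        if domain in done:
            return []
        if domain in visiting:
            # Back edge found: the cycle starts at the first occurrence on the path
            return path[path.index(domain):] + [domain]
        visiting.add(domain)
        path.append(domain)
        for dep in upstream.get(domain, []):
            cycle = visit(dep)
            if cycle:
                return cycle
        path.pop()
        visiting.discard(domain)
        done.add(domain)
        return []

    for domain in upstream:
        cycle = visit(domain)
        if cycle:
            return cycle
    return []

print(find_cycle(UPSTREAM))
```

On the example domains this prints `['orders', 'inventory', 'orders']`: `orders` and `inventory` list each other as upstream. A mutual dependency like this may be acceptable when the two domains exchange events, but it is worth making explicit because it rules out a simple build order.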

Data Product Definition

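from dataclasses import dataclass
from typing import Any, Dict, List

@dataclass
class DataProduct:
    name: str
    domain: str
    description: str
    schema: Dict[str, str]
    sla: Dict[str, Any]  # freshness in minutes, availability and completeness in percent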
    access_patterns: List[str]
    quality_checks: List[str]
    owner: str
    
    def to_data_contract(self):
        return {
            "product": self.name,
            "domain": self.domain,
            "schema": self.schema,
            "sla": {
                "freshness_minutes": self.sla.get("freshness", 60),
                "availability_percent": self.sla.get("availability", 99.9),
                "completeness_percent": self.sla.get("completeness", 99.5),
            },
            "quality": self.quality_checks,
            "access": self.access_patterns,
        }

# Example data product
orders_product = DataProduct(
    name="orders_fact",
    domain="orders",
    description="Fact table of all customer orders",
    schema={
        "order_id": "string (PK)",
        "user_id": "string (FK)",
        "order_total": "decimal(12,2)",
        "order_status": "enum(pending,completed,cancelled)",
        "created_at": "timestamp",
    },
    sla={"freshness": 15, "availability": 99.95, "completeness": 99.9},
    access_patterns=["batch", "streaming"],
    quality_checks=["not_null(order_id)", "unique(order_id)", "valid_status"],
    owner="checkout-team"
)
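
A data contract is only useful if something checks it. The sketch below shows one way a consumer or monitor might compare measured values against the SLA section of a contract. It assumes the dictionary shape returned by `to_data_contract()`; the `check_sla` helper and the observed numbers are hypothetical.

```python
from typing import Any, Dict, List

def check_sla(contract: Dict[str, Any], observed: Dict[str, float]) -> List[str]:
    """Compare observed metrics with a contract's SLA and list the violations."""
    sla = contract["sla"]
    violations = []
    # Freshness is an upper bound: data older than the limit violates the SLA
    if observed["freshness_minutes"] > sla["freshness_minutes"]:
        violations.append("freshness")
    # Availability and completeness are lower bounds
    for metric in ("availability_percent", "completeness_percent"):
        if observed[metric] < sla[metric]:
            violations.append(metric.replace("_percent", ""))
    return violations

# Same shape as the "sla" section produced by to_data_contract()
contract = {
    "product": "orders_fact",
    "sla": {
        "freshness_minutes": 15,
        "availability_percent": 99.95,
        "completeness_percent": 99.9,
    },
}

# Hypothetical measurements, for illustration only
observed = {
    "freshness_minutes": 22,
    "availability_percent": 99.97,
    "completeness_percent": 99.92,
}

print(check_sla(contract, observed))
```

With these numbers only freshness is violated (22 minutes observed against a 15-minute limit), so the function returns `['freshness']`.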

3. Self-Serve Platform Design

class SelfServeDataPlatform:
    """Platform capabilities that domain teams can self-serve"""
    
    def __init__(self):
        self.capabilities = {
            "ingestion": self._ingestion_service,
            "transformation": self._transformation_service,
            "storage": self._storage_service,
            "serving": self._serving_service,
            "monitoring": self._monitoring_service,
        }
    
    def _ingestion_service(self, config: dict):
        """Domain teams create ingestion pipelines"""
        return {
            "type": "managed_ingestion",
            "sources": config.get("sources", []),
            "targets": config.get("targets", []),
            "schedule": config.get("schedule", "@daily"),
            "monitoring": "auto",
        }
    
    def _transformation_service(self, config: dict):
        """Domain teams define transformations in SQL/Python"""
        return {
            "type": "managed_transformation",
            "engine": config.get("engine", "spark"),
            "code_repo": config.get("repo"),
            "tests": config.get("tests", []),
            "lineage": "auto",
        }
    
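    def _storage_service(self, config: dict):
        """Domain teams get managed storage"""
        return {
            "type": "managed_storage",
            "format": config.get("format", "delta"),
            "partitioning": config.get("partitioning"),
            "retention": config.get("retention_days", 365),
            "compression": "auto",
        }
    
    def _serving_service(self, config: dict):
        """Domain teams expose data products to consumers"""
        # Placeholder sketch: field names are illustrative assumptions
        return {
            "type": "managed_serving",
            "interfaces": config.get("interfaces", ["sql"]),
            "access_control": "auto",
        }
    
    def _monitoring_service(self, config: dict):
        """Domain teams get dashboards and alerts"""
        # Placeholder sketch: field names are illustrative assumptions
        return {
            "type": "managed_monitoring",
            "metrics": config.get("metrics", ["freshness", "volume"]),
            "alerting": "auto",
        }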
    
    def provision_domain(self, domain_name: str, account_id: str = "123456789012"):
        """Provision all resources for a new domain.

        account_id defaults to a placeholder; pass the real AWS account ID.
        """
        return {
            "storage_bucket": f"s3://{domain_name}-data-lake",
            "schema_registry": f"{domain_name}-schemas",
            "monitoring": f"{domain_name}-dashboards",
            "access_role": f"arn:aws:iam::{account_id}:role/{domain_name}-data-access",
        }
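
The class above registers capabilities by name but does not show how a domain team would invoke one. A minimal sketch of such an entry point follows. The `request_capability` function is an assumption, not part of the original class, and `storage_service` is a trimmed stand-in for `_storage_service` so that the example runs on its own.

```python
from typing import Any, Callable, Dict

Handler = Callable[[Dict[str, Any]], Dict[str, Any]]

def request_capability(
    capabilities: Dict[str, Handler], name: str, config: Dict[str, Any]
) -> Dict[str, Any]:
    """Look up a capability by name and run it with the caller's config."""
    try:
        handler = capabilities[name]
    except KeyError:
        available = ", ".join(sorted(capabilities))
        raise ValueError(
            f"Unknown capability {name!r}; available: {available}"
        ) from None
    return handler(config)

# Stand-in handler mirroring part of _storage_service
def storage_service(config: Dict[str, Any]) -> Dict[str, Any]:
    return {
        "type": "managed_storage",
        "format": config.get("format", "delta"),
        "retention": config.get("retention_days", 365),
    }

capabilities = {"storage": storage_service}
spec = request_capability(capabilities, "storage", {"retention_days": 90})
print(spec)
```

Failing loudly on an unknown capability name, and listing the valid ones, keeps the self-serve interface discoverable without a platform engineer in the loop.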

4. Data Catalog & Discovery

from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass, field

@dataclass
class CatalogEntry:
    name: str
    domain: str
    description: str
    owner: str
    schema: Dict[str, str]
    tags: List[str] = field(default_factory=list)
    quality_score: float = 0.0
    last_updated: datetime = field(default_factory=datetime.now)
    lineage_upstream: List[str] = field(default_factory=list)
    lineage_downstream: List[str] = field(default_factory=list)

class DataCatalog:
    def __init__(self):
        self.entries: Dict[str, CatalogEntry] = {}
    
    def register(self, entry: CatalogEntry):
        self.entries[entry.name] = entry
    
    def search(self, query: str, tags: Optional[List[str]] = None) -> List[CatalogEntry]:
        results = []
        for entry in self.entries.values():
            match = query.lower() in entry.description.lower() or \
                   query.lower() in entry.name.lower()
            if tags:
                match = match and any(t in entry.tags for t in tags)
            if match:
                results.append(entry)
        return sorted(results, key=lambda e: e.quality_score, reverse=True)
    
    def impact_analysis(self, dataset_name: str) -> Dict:
        entry = self.entries.get(dataset_name)
        if not entry:
            return {}
        
        return {
            "dataset": dataset_name,
            "upstream": entry.lineage_upstream,
            "downstream": entry.lineage_downstream,
            "downstream_count": len(entry.lineage_downstream),
            "quality_score": entry.quality_score,
        }
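
`impact_analysis` reports only direct downstream datasets. For a change review it is often more useful to see everything reachable downstream. The sketch below does a breadth-first traversal over a plain dictionary of lineage edges. The `transitive_downstream` helper and the dataset names are hypothetical.

```python
from collections import deque
from typing import Dict, List

def transitive_downstream(lineage: Dict[str, List[str]], dataset: str) -> List[str]:
    """Breadth-first walk over downstream edges, returning every reachable dataset."""
    seen = {dataset}
    order: List[str] = []
    queue = deque([dataset])
    while queue:
        current = queue.popleft()
        for child in lineage.get(current, []):
            if child not in seen:  # guards against cycles in the lineage graph
                seen.add(child)
                order.append(child)
                queue.append(child)
    return order

# Hypothetical lineage: dataset -> direct downstream datasets
lineage = {
    "orders_fact": ["daily_revenue", "churn_features"],
    "daily_revenue": ["finance_dashboard"],
    "churn_features": ["churn_model"],
}

print(transitive_downstream(lineage, "orders_fact"))
```

The result lists direct consumers first, then consumers two hops away, which gives a rough ordering of who to notify about a breaking change.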

5. Federated Governance

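class FederatedGovernance:
    """Governance policies as code, executed automatically"""
    
    PII_TYPES = {"email", "phone", "ssn", "credit_card", "address"}
    SENSITIVE_TAGS = {"pii", "financial", "health", "personal"}
    
    def __init__(self):
        # (dataset_name, requester) pairs with explicit approval.
        # Placeholder store: a real platform would query an approval workflow.
        self.approvals = set()
    
    def _check_approval(self, dataset_name: str, requester: str) -> bool:
        """Return True if the requester has explicit approval for the dataset"""
        return (dataset_name, requester) in self.approvals
    
    def apply_access_policy(self, dataset: CatalogEntry, requester: str) -> bool:
        """Automated access control"""
        if any(tag in self.SENSITIVE_TAGS for tag in dataset.tags):
            # Require explicit approval for sensitive data
            return self._check_approval(dataset.name, requester)
        return True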
    
    def validate_quality(self, dataset: CatalogEntry, data) -> Dict:
        """Automated quality validation (data is a pandas DataFrame)"""
        results = {}
        
        # Check completeness: every column must have fewer than 5% nulls
        null_ratios = {col: data[col].isnull().mean() for col in data.columns}
        results["completeness"] = all(r < 0.05 for r in null_ratios.values())
        
        # Check uniqueness of ID columns that are present in the data
        for col in dataset.schema:
            if col.endswith("_id") and col in data.columns:
                results[f"unique_{col}"] = bool(data[col].is_unique)
        
        # Check freshness (assumes a timezone-naive "timestamp" column)
        if "timestamp" in data.columns:
            max_ts = data["timestamp"].max()
            results["freshness_hours"] = (datetime.now() - max_ts).total_seconds() / 3600
        
        return results
    
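    def compute_quality_score(self, validation_results: Dict) -> float:
        """Compute a 0-100 quality score from validate_quality() results"""
        weights = {"completeness": 30, "freshness": 30, "uniqueness": 20, "accuracy": 20}
        score = 0.0
        
        if validation_results.get("completeness"):
            score += weights["completeness"]
        
        # Missing freshness information is treated as stale
        freshness_hours = validation_results.get("freshness_hours", 999)
        if freshness_hours <= 1:
            score += weights["freshness"]
        elif freshness_hours <= 24:
            score += weights["freshness"] * 0.5
        
        # Uniqueness: every unique_<col> check must pass
        unique_checks = [v for k, v in validation_results.items() if k.startswith("unique_")]
        if unique_checks and all(unique_checks):
            score += weights["uniqueness"]
        
        # Accuracy: validate_quality() does not produce this key,
        # so it only counts when a caller supplies it
        if validation_results.get("accuracy"):
            score += weights["accuracy"]
        
        return score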
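
`PII_TYPES` is declared in the class above but no policy uses it. One plausible use, sketched here as an assumption rather than the author's design, is to flag likely PII columns by name so that datasets receive the `pii` tag before the access policy runs. The helpers `detect_pii_columns` and `suggest_tags` and the sample schema are hypothetical, and name matching is only a heuristic.

```python
from typing import Dict, List, Set

PII_TYPES: Set[str] = {"email", "phone", "ssn", "credit_card", "address"}

def detect_pii_columns(schema: Dict[str, str]) -> List[str]:
    """Flag columns whose name contains a PII_TYPES keyword (name-based heuristic)."""
    flagged = []
    for column in schema:
        name = column.lower()
        if any(pii in name for pii in PII_TYPES):
            flagged.append(column)
    return flagged

def suggest_tags(schema: Dict[str, str], existing: List[str]) -> List[str]:
    """Add the "pii" tag when any column looks like PII; keep existing tags."""
    tags = list(existing)
    if detect_pii_columns(schema) and "pii" not in tags:
        tags.append("pii")
    return tags

# Hypothetical schema, for illustration only
schema = {"user_id": "string", "contact_email": "string", "signup_ts": "timestamp"}
print(detect_pii_columns(schema))
print(suggest_tags(schema, ["users"]))
```

Here `contact_email` is flagged and the dataset gains the `pii` tag, which would route access requests through the approval check. Name matching misses PII stored under neutral column names, so a production setup would likely combine it with content sampling or manual review.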

Best Practice: Start with a centralized platform, then evolve toward data mesh as the organization grows. The key enabler is a self-serve platform that reduces the cognitive load on domain teams.

Follow-Up Questions

  1. How would you migrate from a centralized data platform to data mesh?
  2. Design a data platform for a 500-person engineering organization.
  3. How do you handle cross-domain data joins in a data mesh?
  4. Design a self-serve analytics platform for non-technical users.
  5. How would you measure the ROI of a data platform investment?
