
Multi-Cloud Data Mesh Implementation



Multi-Cloud Data Mesh

Federated Governance + Self-Serve Platforms + Domain Ownership


Project Difficulty: Expert | Duration: 6-8 weeks | Cloud: Multi-Cloud (AWS + GCP + Azure)

Implement a production data mesh architecture that enables domain teams to own and serve their data as products across multiple cloud providers.

Project Overview

Problem Statement

Centralized data teams become bottlenecks as organizations scale. Data quality issues proliferate, domain expertise is lost, and time-to-value for new data products increases. Organizations need a decentralized approach where domain teams own their data.

Objectives

  1. Enable domain-driven data ownership
  2. Implement self-serve data infrastructure
  3. Establish federated computational governance
  4. Support cross-domain data discovery and access
  5. Maintain data quality and compliance across clouds

Tech Stack

| Component | Technology | Purpose |
|---|---|---|
| Data Platform | Kubernetes + Terraform | Multi-cloud orchestration |
| Data Catalog | DataHub + Amundsen | Metadata management |
| Data Products | Domain-specific APIs | Data as a service |
| Governance | Open Policy Agent | Policy enforcement |
| Discovery | Data Marketplace | Data product discovery |
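The table lists Open Policy Agent for policy enforcement. OPA is commonly queried over its REST Data API: a service sends `POST /v1/data/<policy path>` with a JSON body of the form `{"input": ...}` and reads the decision from the `result` key of the response. The sketch below shows that round trip using only the standard library. The sidecar address `http://localhost:8181` and the policy path `datamesh/access/allow` are hypothetical placeholders, and the sketch assumes the policy evaluates to a boolean.

```python
import json
import urllib.request
from typing import Any, Dict

# Hypothetical OPA sidecar address and policy path; adjust to your deployment.
OPA_URL = "http://localhost:8181"
POLICY_PATH = "datamesh/access/allow"


def build_opa_request(policy_path: str, input_doc: Dict[str, Any],
                      base_url: str = OPA_URL) -> urllib.request.Request:
    """Build a request for OPA's Data API: POST /v1/data/<path> with {"input": ...}."""
    body = json.dumps({"input": input_doc}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/v1/data/{policy_path}",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def parse_decision(response_body: bytes) -> bool:
    """OPA answers {"result": <value>}; a missing result means the rule is undefined, so deny."""
    payload = json.loads(response_body)
    return payload.get("result") is True


def is_allowed(input_doc: Dict[str, Any], timeout: float = 2.0) -> bool:
    """Ask OPA for a decision and fail closed on any network error."""
    request = build_opa_request(POLICY_PATH, input_doc)
    try:
        with urllib.request.urlopen(request, timeout=timeout) as resp:
            return parse_decision(resp.read())
    except OSError:  # URLError and socket timeouts are OSError subclasses
        return False
```

Failing closed keeps an unreachable policy engine from silently granting access; whether that trade-off suits a given data product depends on its availability requirements.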

Architecture Diagram

Figure: Data mesh overview
- Domains: Customer Domain (AWS), Product Domain (GCP), Order Domain (Azure), Analytics Domain (AWS)
- Inter-domain mesh: Data Products (REST/gRPC), Data Contracts (Schema), Event Streaming (Kafka)
- Platform layer: Self-Serve Platform (K8s + GitOps), Governance Engine (OPA), Discovery Service (Catalog)
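In this layout the Analytics Domain builds on data products owned by other domains, so a composite product can only be refreshed after its upstream products. One way to derive a safe refresh order is a topological sort over the product dependency graph. The sketch below uses Python's standard `graphlib` module (Python 3.9+); the `domain.product` keys and the dependency map are illustrative assumptions.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Hypothetical dependency map keyed by "domain.product": each value lists the
# upstream products a product is built from.
DEPENDENCIES: Dict[str, Set[str]] = {
    "customer.customer-profile": set(),
    "customer.customer-behavior": set(),
    "order.order-transactions": set(),
    "analytics.customer-360": {
        "customer.customer-profile",
        "customer.customer-behavior",
        "order.order-transactions",
    },
}


def refresh_order(deps: Dict[str, Set[str]]) -> List[str]:
    """Return products so that every dependency precedes its consumers.

    Raises graphlib.CycleError if products depend on each other circularly.
    """
    return list(TopologicalSorter(deps).static_order())


def undeclared_dependencies(deps: Dict[str, Set[str]]) -> Set[str]:
    """Dependencies that no domain has declared as a product (broken contracts)."""
    referenced = {upstream for needs in deps.values() for upstream in needs}
    return referenced - set(deps)
```

Checking for undeclared dependencies before sorting catches a consumer that points at a product no domain publishes.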

Data Source Setup and Schema

Domain Definitions

# domains/domain_definitions.yaml
domains:
  customer:
    owner: "customer-team@company.com"
    cloud: "aws"
    region: "us-east-1"
    data_products:
      - name: "customer-profile"
        type: "batch"
        freshness_sla: "1h"
        quality_sla: 0.99
        schema:
          type: record
          fields:
            - name: customer_id
              type: string
              nullable: false
            - name: email
              type: string
              nullable: false
            - name: full_name
              type: string
              nullable: true
            - name: segment
              type: string
              enum: ["VIP", "Premium", "Regular", "New"]
            - name: lifetime_value
              type: float
              min: 0
              max: 1000000
        endpoints:
          - type: rest
            url: "https://customer-api.internal/customers"
            auth: "oauth2"
          - type: graphql
            url: "https://customer-api.internal/graphql"
        access_policies:
          - role: "data-analyst"
            permissions: ["read"]
          - role: "ml-engineer"
            permissions: ["read", "export"]
    
      - name: "customer-behavior"
        type: "streaming"
        freshness_sla: "5m"
        quality_sla: 0.95
        schema:
          type: record
          fields:
            - name: customer_id
              type: string
            - name: event_type
              type: string
              enum: ["page_view", "add_to_cart", "purchase", "search"]
            - name: timestamp
              type: timestamp
            - name: session_id
              type: string
            - name: metadata
              type: map
              values: string
        endpoints:
          - type: kafka
            topic: "customer-events"
            brokers: ["kafka.aws.internal:9092"]

  product:
    owner: "product-team@company.com"
    cloud: "gcp"
    region: "us-central1"
    data_products:
      - name: "product-catalog"
        type: "batch"
        freshness_sla: "24h"
        quality_sla: 0.98
        schema:
          type: record
          fields:
            - name: product_id
              type: string
              nullable: false
            - name: name
              type: string
              nullable: false
            - name: category
              type: string
            - name: price
              type: float
              min: 0
            - name: inventory_count
              type: integer
              min: 0
        endpoints:
          - type: rest
            url: "https://product-api.internal/products"
            auth: "api-key"
          - type: bigquery
            project: "product-analytics"
            dataset: "catalog"
            table: "products"

  order:
    owner: "order-team@company.com"
    cloud: "azure"
    region: "eastus"
    data_products:
      - name: "order-transactions"
        type: "batch"
        freshness_sla: "15m"
        quality_sla: 0.999
        schema:
          type: record
          fields:
            - name: order_id
              type: string
              nullable: false
            - name: customer_id
              type: string
              references: "customer.customer-profile.customer_id"
            - name: product_ids
              type: array
              items: string
            - name: total_amount
              type: float
              min: 0
            - name: status
              type: string
              enum: ["pending", "processing", "shipped", "delivered", "cancelled"]
            - name: order_date
              type: timestamp
        endpoints:
          - type: rest
            url: "https://order-api.internal/orders"
            auth: "oauth2"
          - type: cosmosdb
            account: "order-analytics"
            database: "orders"
            container: "transactions"

  analytics:
    owner: "analytics-team@company.com"
    cloud: "aws"
    region: "us-east-1"
    data_products:
      - name: "customer-360"
        type: "composite"
        description: "Unified customer view combining all domains"
        dependencies:
          - domain: customer
            product: customer-profile
          - domain: customer
            product: customer-behavior
          - domain: order
            product: order-transactions
        freshness_sla: "1h"
        quality_sla: 0.95
        schema:
          type: record
          fields:
            - name: customer_id
              type: string
            - name: profile
              type: record
              source: "customer.customer-profile"
            - name: behavior_summary
              type: record
              source: "customer.customer-behavior"
            - name: order_summary
              type: record
              source: "order.order-transactions"
            - name: computed_metrics
              type: record
              fields:
                - name: engagement_score
                  type: float
                - name: churn_risk
                  type: float
                - name: recommended_products
                  type: array
                  items: string
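The `nullable`, `enum`, `min`, and `max` keys in these definitions are constraints a domain team can check before publishing. A minimal validator sketch follows, using the `customer-profile` fields above. Two choices are assumptions of this sketch rather than rules from the definitions: fields that omit `nullable` are treated as optional, and the measured quality compared against `quality_sla` is simply the share of records with no violations.

```python
from typing import Any, Dict, List

# Field specs copied from the "customer-profile" product above.
CUSTOMER_PROFILE_FIELDS: List[Dict[str, Any]] = [
    {"name": "customer_id", "type": "string", "nullable": False},
    {"name": "email", "type": "string", "nullable": False},
    {"name": "full_name", "type": "string", "nullable": True},
    {"name": "segment", "type": "string", "enum": ["VIP", "Premium", "Regular", "New"]},
    {"name": "lifetime_value", "type": "float", "min": 0, "max": 1000000},
]

# Mapping from the YAML type names to Python types (an assumption of this sketch).
PY_TYPES = {"string": str, "float": (int, float), "integer": int}


def record_errors(record: Dict[str, Any], fields: List[Dict[str, Any]]) -> List[str]:
    """Return human-readable constraint violations for one record."""
    errors = []
    for spec in fields:
        name = spec["name"]
        value = record.get(name)
        if value is None:
            # Fields without an explicit nullable flag are treated as optional here.
            if spec.get("nullable", True) is False:
                errors.append(f"{name}: null not allowed")
            continue
        expected = PY_TYPES.get(spec["type"])
        if expected and not isinstance(value, expected):
            errors.append(f"{name}: expected {spec['type']}")
            continue
        if "enum" in spec and value not in spec["enum"]:
            errors.append(f"{name}: {value!r} not in enum")
        if "min" in spec and value < spec["min"]:
            errors.append(f"{name}: below min {spec['min']}")
        if "max" in spec and value > spec["max"]:
            errors.append(f"{name}: above max {spec['max']}")
    return errors


def meets_quality_sla(records: List[Dict[str, Any]],
                      fields: List[Dict[str, Any]], quality_sla: float) -> bool:
    """Compare the share of violation-free records against quality_sla."""
    if not records:
        return True
    valid = sum(1 for r in records if not record_errors(r, fields))
    return valid / len(records) >= quality_sla
```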

Data Product Schema Registry

# schema_registry/registry.py
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from enum import Enum
import json
from datetime import datetime
import hashlib

class SchemaType(Enum):
    AVRO = "avro"
    PROTOBUF = "protobuf"
    JSON_SCHEMA = "json_schema"

class CompatibilityMode(Enum):
    BACKWARD = "backward"
    FORWARD = "forward"
    FULL = "full"
    NONE = "none"

@dataclass
class SchemaField:
    name: str
    type: str
    nullable: bool = False
    description: str = ""
    metadata: Dict = field(default_factory=dict)
    constraints: Dict = field(default_factory=dict)

@dataclass
class DataProductSchema:
    name: str
    version: str
    schema_type: SchemaType
    fields: List[SchemaField]
    compatibility: CompatibilityMode = CompatibilityMode.BACKWARD
    description: str = ""
    owner: str = ""
    created_at: datetime = field(default_factory=datetime.utcnow)
    updated_at: datetime = field(default_factory=datetime.utcnow)
    
    def to_avro(self) -> Dict:
        """Convert to Avro schema format."""
        return {
            "type": "record",
            "name": self.name,
            "fields": [
                {
                    "name": f.name,
                    "type": ["null", f.type] if f.nullable else f.type,
                    "doc": f.description
                }
                for f in self.fields
            ]
        }
    
    def to_json_schema(self) -> Dict:
        """Convert to JSON Schema format."""
        properties = {}
        required = []
        
        for f in self.fields:
            prop = {"type": f.type}
            if f.description:
                prop["description"] = f.description
            if f.constraints:
                prop.update(f.constraints)
            
            properties[f.name] = prop
            if not f.nullable:
                required.append(f.name)
        
        return {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "title": self.name,
            "type": "object",
            "properties": properties,
            "required": required
        }
    
    def calculate_fingerprint(self) -> str:
        """Calculate schema fingerprint for versioning."""
        schema_str = json.dumps(self.to_avro(), sort_keys=True)
        return hashlib.sha256(schema_str.encode()).hexdigest()[:16]

class SchemaRegistry:
    def __init__(self):
        self.schemas: Dict[str, List[DataProductSchema]] = {}
        self.compatibility_cache: Dict[str, bool] = {}
    
    def register_schema(self, schema: DataProductSchema) -> str:
        """Register a new schema version and return its fingerprint."""
        versions = self.schemas.setdefault(schema.name, [])
        
        # Schemas are stored per name, so duplicates are detected by version
        if any(s.version == schema.version for s in versions):
            raise ValueError(f"Schema {schema.name}:{schema.version} already exists")
        
        # Check compatibility with the latest registered version
        if versions and not self._check_compatibility(versions[-1], schema):
            raise ValueError(f"Schema {schema.name} is not compatible with previous version")
        
        versions.append(schema)
        
        return schema.calculate_fingerprint()
    
    def _check_compatibility(self, old_schema: DataProductSchema,
                             new_schema: DataProductSchema) -> bool:
        """Check compatibility using the mode declared on the previous version."""
        if old_schema.compatibility == CompatibilityMode.NONE:
            return True
        
        old_fields = {f.name: f for f in old_schema.fields}
        new_fields = {f.name: f for f in new_schema.fields}
        
        # Fields present in both versions must keep the same type
        for name in old_fields.keys() & new_fields.keys():
            if old_fields[name].type != new_fields[name].type:
                return False
        
        if old_schema.compatibility in (CompatibilityMode.BACKWARD, CompatibilityMode.FULL):
            # New schema must be able to read old data: fields added in the new
            # version have no value in old records, so they must be nullable
            for name, new_field in new_fields.items():
                if name not in old_fields and not new_field.nullable:
                    return False
        
        if old_schema.compatibility in (CompatibilityMode.FORWARD, CompatibilityMode.FULL):
            # Old schema must be able to read new data: fields removed in the new
            # version must have been nullable in the old one
            for name, old_field in old_fields.items():
                if name not in new_fields and not old_field.nullable:
                    return False
        
        return True
    
    def get_schema(self, name: str, version: Optional[str] = None) -> Optional[DataProductSchema]:
        """Get schema by name and optional version."""
        if name not in self.schemas:
            return None
        
        if version:
            for schema in self.schemas[name]:
                if schema.version == version:
                    return schema
            return None
        
        return self.schemas[name][-1] if self.schemas[name] else None
    
    def list_schemas(self) -> List[Dict]:
        """List all registered schemas."""
        return [
            {
                "name": name,
                "versions": len(versions),
                "latest_version": versions[-1].version if versions else None,
                "owner": versions[-1].owner if versions else None
            }
            for name, versions in self.schemas.items()
        ]
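The compatibility modes share their names with common schema registries such as Confluent Schema Registry, where BACKWARD means consumers on the new schema can read data written with the previous one, and FORWARD means consumers on the previous schema can read data written with the new one. The standalone sketch below states those rules directly on a simplified, hypothetical `{field: (type, nullable)}` representation in which a nullable field stands in for a field with a default value.

```python
from typing import Dict, Tuple

# Hypothetical simplified schema: field name -> (type name, nullable).
Schema = Dict[str, Tuple[str, bool]]


def backward_compatible(old: Schema, new: Schema) -> bool:
    """Readers on NEW can read data written with OLD: added fields must be
    nullable, removed fields are fine, shared fields must keep their type."""
    for name, (ftype, nullable) in new.items():
        if name not in old:
            if not nullable:
                return False
        elif old[name][0] != ftype:
            return False
    return True


def forward_compatible(old: Schema, new: Schema) -> bool:
    """Readers on OLD can read data written with NEW: the backward rule with
    the roles swapped, so fields removed from NEW must be nullable in OLD."""
    return backward_compatible(new, old)


def full_compatible(old: Schema, new: Schema) -> bool:
    """FULL requires both directions."""
    return backward_compatible(old, new) and forward_compatible(old, new)
```

Defining FORWARD as BACKWARD with the arguments swapped keeps the two rules symmetric, and FULL becomes their conjunction. For example, adding a required field is forward compatible but not backward compatible.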

Step-by-Step Implementation Guide

Step 1: Self-Serve Platform Setup

# platform/self_serve.py
from kubernetes import client, config
from typing import Dict, List, Optional
from datetime import datetime
import yaml
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class SelfServePlatform:
    def __init__(self, kubeconfig_path: Optional[str] = None):
        if kubeconfig_path:
            config.load_kube_config(config_file=kubeconfig_path)
        else:
            config.load_incluster_config()
        
        self.core_api = client.CoreV1Api()
        self.apps_api = client.AppsV1Api()
        self.custom_api = client.CustomObjectsApi()
    
    def create_data_product_namespace(self, domain: str, product_name: str) -> str:
        """Create isolated namespace for a data product."""
        namespace = f"data-product-{domain}-{product_name}"
        
        namespace_body = client.V1Namespace(
            metadata=client.V1ObjectMeta(
                name=namespace,
                labels={
                    "mesh.company.com/domain": domain,
                    "mesh.company.com/product": product_name,
                    "mesh.company.com/managed-by": "data-mesh-platform"
                },
                annotations={
                    "mesh.company.com/owner": f"{domain}-team@company.com",
                    "mesh.company.com/created-at": datetime.utcnow().isoformat()
                }
            )
        )
        
        try:
            self.core_api.create_namespace(body=namespace_body)
            logger.info(f"Created namespace: {namespace}")
        except client.exceptions.ApiException as e:
            if e.status == 409:
                logger.info(f"Namespace {namespace} already exists")
            else:
                raise
        
        return namespace
    
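    def deploy_data_product(self, domain: str, product_name: str,
                            config: Dict) -> Dict:
        """Deploy a data product with its infrastructure."""
        # The helper methods read config['product_name'], so make sure it is set
        config = {**config, "product_name": product_name}
        namespace = self.create_data_product_namespace(domain, product_name)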
        
        # Deploy compute resources
        self._deploy_compute_resources(namespace, config)
        
        # Deploy storage resources
        self._deploy_storage_resources(namespace, config)
        
        # Deploy networking resources
        self._deploy_networking_resources(namespace, config)
        
        # Deploy monitoring resources
        self._deploy_monitoring_resources(namespace, config)
        
        # Register with service mesh
        self._register_with_service_mesh(namespace, product_name, config)
        
        return {
            "namespace": namespace,
            "status": "deployed",
            "endpoints": self._get_endpoints(namespace)
        }
    
    def _deploy_compute_resources(self, namespace: str, config: Dict):
        """Deploy compute resources (Deployment)."""
        deployment = client.V1Deployment(
            metadata=client.V1ObjectMeta(
                name=f"{config['product_name']}-api",
                namespace=namespace,
                labels={"app": config['product_name']}
            ),
            spec=client.V1DeploymentSpec(
                replicas=config.get('replicas', 3),
                selector=client.V1LabelSelector(
                    match_labels={"app": config['product_name']}
                ),
                template=client.V1PodTemplateSpec(
                    metadata=client.V1ObjectMeta(
                        labels={"app": config['product_name']}
                    ),
                    spec=client.V1PodSpec(
                        containers=[
                            client.V1Container(
                                name="api",
                                image=config['image'],
                                ports=[client.V1ContainerPort(container_port=8080)],
                                env=[
                                    client.V1EnvVar(name="DOMAIN", value=namespace.split("-")[2]),
                                    client.V1EnvVar(name="PRODUCT", value=config['product_name'])
                                ],
                                resources=client.V1ResourceRequirements(
                                    requests={"cpu": "100m", "memory": "256Mi"},
                                    limits={"cpu": "500m", "memory": "512Mi"}
                                )
                            )
                        ]
                    )
                )
            )
        )
        
        self.apps_api.create_namespaced_deployment(namespace=namespace, body=deployment)
    
    def _deploy_storage_resources(self, namespace: str, config: Dict):
        """Deploy storage resources (PersistentVolumeClaim)."""
        pvc = client.V1PersistentVolumeClaim(
            metadata=client.V1ObjectMeta(
                name=f"{config['product_name']}-storage",
                namespace=namespace
            ),
            spec=client.V1PersistentVolumeClaimSpec(
                access_modes=["ReadWriteOnce"],
                resources=client.V1ResourceRequirements(
                    requests={"storage": config.get('storage_size', '10Gi')}
                ),
                storage_class_name=config.get('storage_class', 'standard')
            )
        )
        
        self.core_api.create_namespaced_persistent_volume_claim(
            namespace=namespace, body=pvc
        )
    
    def _deploy_networking_resources(self, namespace: str, config: Dict):
        """Deploy networking resources (ClusterIP Service)."""
        service = client.V1Service(
            metadata=client.V1ObjectMeta(
                name=f"{config['product_name']}-service",
                namespace=namespace,
                # The ServiceMonitor selects Services by this label
                labels={"app": config['product_name']},
                annotations={
                    "mesh.company.com/protocol": config.get('protocol', 'grpc'),
                    "mesh.company.com/version": config.get('version', 'v1')
                }
            ),
            spec=client.V1ServiceSpec(
                selector={"app": config['product_name']},
                ports=[
                    client.V1ServicePort(
                        name="http",  # named so the ServiceMonitor can reference it
                        port=8080,
                        target_port=8080,
                        protocol="TCP"
                    )
                ],
                type="ClusterIP"
            )
        )
        
        self.core_api.create_namespaced_service(namespace=namespace, body=service)
    
    def _deploy_monitoring_resources(self, namespace: str, config: Dict):
        """Deploy monitoring resources (Prometheus Operator ServiceMonitor)."""
        service_monitor = {
            "apiVersion": "monitoring.coreos.com/v1",
            "kind": "ServiceMonitor",
            "metadata": {
                "name": f"{config['product_name']}-monitor",
                "namespace": namespace,
                "labels": {
                    "release": "prometheus"
                }
            },
            "spec": {
                "selector": {
                    "matchLabels": {"app": config['product_name']}
                },
                "endpoints": [
                    {
                        # Assumes the API container serves Prometheus metrics at /metrics on port 8080
                        "port": "http",
                        "path": "/metrics",
                        "interval": "30s"
                    }
                ]
            }
        }
        
        self.custom_api.create_namespaced_custom_object(
            group="monitoring.coreos.com",
            version="v1",
            namespace=namespace,
            plural="servicemonitors",
            body=service_monitor
        )
    
    def _register_with_service_mesh(self, namespace: str, product_name: str,
                                   config: Dict):
        """Register data product with Istio service mesh."""
        # Must match the Service created in _deploy_networking_resources
        service_host = f"{product_name}-service.{namespace}.svc.cluster.local"
        virtual_service = {
            "apiVersion": "networking.istio.io/v1beta1",
            "kind": "VirtualService",
            "metadata": {
                "name": f"{product_name}-vs",
                "namespace": namespace
            },
            "spec": {
                "hosts": [service_host],
                "http": [
                    {
                        "match": [{"headers": {"x-data-product-version": {"exact": "v1"}}}],
                        "route": [
                            {
                                "destination": {
                                    "host": service_host,
                                    "port": {"number": 8080}
                                }
                            }
                        ]
                    }
                ]
            }
        }
        
        self.custom_api.create_namespaced_custom_object(
            group="networking.istio.io",
            version="v1beta1",
            namespace=namespace,
            plural="virtualservices",
            body=virtual_service
        )
    
    def _get_endpoints(self, namespace: str) -> Dict:
        """Get deployed endpoints for the data product."""
        services = self.core_api.list_namespaced_service(namespace)
        
        endpoints = {}
        for service in services.items:
            endpoints[service.metadata.name] = {
                "cluster_ip": service.spec.cluster_ip,
                "ports": [
                    {"port": p.port, "protocol": p.protocol}
                    for p in service.spec.ports
                ]
            }
        
        return endpoints
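`create_data_product_namespace` builds names as `data-product-<domain>-<product>`, and Kubernetes rejects namespace names that are not RFC 1123 DNS labels (at most 63 characters of lowercase alphanumerics and `-`, starting and ending with an alphanumeric). Long domain or product names can exceed that limit. The sketch below normalizes the name and, when needed, shortens it with a hash suffix; the normalization and truncation scheme are hypothetical choices, not something the platform above does.

```python
import hashlib
import re

# RFC 1123 label: lowercase alphanumerics or '-', alphanumeric at both ends.
DNS1123_LABEL = re.compile(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$")
MAX_LABEL_LEN = 63


def data_product_namespace(domain: str, product_name: str) -> str:
    """Build 'data-product-<domain>-<product>' as a valid namespace name.

    Hypothetical scheme: lowercase, map '_' to '-', and if the result is too
    long, keep a prefix and append 8 hex chars of a hash of the full name.
    """
    raw = f"data-product-{domain}-{product_name}".lower().replace("_", "-")
    if len(raw) > MAX_LABEL_LEN:
        digest = hashlib.sha256(raw.encode()).hexdigest()[:8]
        # 54-char prefix + '-' + 8-char digest = at most 63 characters
        raw = f"{raw[:MAX_LABEL_LEN - 9].rstrip('-')}-{digest}"
    if not DNS1123_LABEL.match(raw):
        raise ValueError(f"invalid namespace name: {raw!r}")
    return raw
```

Hashing the full original name keeps two long names with the same prefix distinct. Note that `_deploy_compute_resources` recovers `DOMAIN` by splitting the namespace on `-`, which only works when the domain name itself contains no hyphens.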

Step 2: Federated Governance Engine

# governance/governance_engine.py
from dataclasses import dataclass, field
from typing import Dict, List, Any, Callable
from enum import Enum
import json
import re
from datetime import datetime
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class PolicyType(Enum):
    ACCESS = "access"
    QUALITY = "quality"
    PRIVACY = "privacy"
    RETENTION = "retention"
    COMPLIANCE = "compliance"

@dataclass
class Policy:
    name: str
    policy_type: PolicyType
    description: str
    rules: List[Dict[str, Any]]
    enforcement: str = "deny"  # deny, warn, audit
    priority: int = 0
    enabled: bool = True
    metadata: Dict = field(default_factory=dict)

@dataclass
class PolicyEvaluation:
    policy_name: str
    allowed: bool
    reason: str
    details: Dict = field(default_factory=dict)
    evaluated_at: datetime = field(default_factory=datetime.utcnow)

class FederatedGovernanceEngine:
    def __init__(self):
        self.policies: Dict[str, Policy] = {}
        self.custom_validators: Dict[str, Callable] = {}
        self.evaluation_cache: Dict[str, PolicyEvaluation] = {}
    
    def register_policy(self, policy: Policy):
        """Register a governance policy."""
        self.policies[policy.name] = policy
        logger.info(f"Registered policy: {policy.name}")
    
    def register_custom_validator(self, policy_name: str, validator: Callable):
        """Register a custom validator function for a policy."""
        self.custom_validators[policy_name] = validator
    
    def evaluate_access_policy(self, context: Dict) -> PolicyEvaluation:
        """Evaluate access control policy."""
        policies = [p for p in self.policies.values() 
                   if p.policy_type == PolicyType.ACCESS and p.enabled]
        
        for policy in sorted(policies, key=lambda p: p.priority, reverse=True):
            if self._matches_policy(context, policy):
                # Check custom validator if exists
                if policy.name in self.custom_validators:
                    allowed = self.custom_validators[policy.name](context)
                    return PolicyEvaluation(
                        policy_name=policy.name,
                        allowed=allowed,
                        reason=f"Custom validation: {'passed' if allowed else 'failed'}"
                    )
                
                # Default rule evaluation
                for rule in policy.rules:
                    if self._evaluate_rule(rule, context):
                        return PolicyEvaluation(
                            policy_name=policy.name,
                            allowed=rule.get('action') == 'allow',
                            reason=rule.get('reason', 'Rule matched')
                        )
        
        # Default deny if no policy matches
        return PolicyEvaluation(
            policy_name="default",
            allowed=False,
            reason="No matching policy found, default deny"
        )
    
    def evaluate_quality_policy(self, data_product: Dict, 
                              metrics: Dict) -> PolicyEvaluation:
        """Evaluate data quality policy."""
        policies = [p for p in self.policies.values() 
                   if p.policy_type == PolicyType.QUALITY and p.enabled]
        
        violations = []
        
        for policy in policies:
            for rule in policy.rules:
                metric_name = rule.get('metric')
                threshold = rule.get('threshold')
                operator = rule.get('operator', 'gte')
                
                if metric_name in metrics:
                    actual_value = metrics[metric_name]
                    
                    if operator == 'gte' and actual_value < threshold:
                        violations.append({
                            'policy': policy.name,
                            'metric': metric_name,
                            'expected': f">= {threshold}",
                            'actual': actual_value
                        })
                    elif operator == 'lte' and actual_value > threshold:
                        violations.append({
                            'policy': policy.name,
                            'metric': metric_name,
                            'expected': f"<= {threshold}",
                            'actual': actual_value
                        })
        
        if violations:
            return PolicyEvaluation(
                policy_name="quality",
                allowed=False,
                reason=f"Quality violations: {len(violations)}",
                details={'violations': violations}
            )
        
        return PolicyEvaluation(
            policy_name="quality",
            allowed=True,
            reason="All quality checks passed"
        )
    
    def evaluate_privacy_policy(self, data: Dict, 
                              classification: str) -> PolicyEvaluation:
        """Evaluate privacy and data classification policy."""
        policies = [p for p in self.policies.values() 
                   if p.policy_type == PolicyType.PRIVACY and p.enabled]
        
        for policy in policies:
            for rule in policy.rules:
                if rule.get('classification') == classification:
                    # Check required protections
                    required_protections = rule.get('required_protections', [])
                    
                    actual_protections = data.get('applied_protections', [])
                    
                    missing = set(required_protections) - set(actual_protections)
                    
                    if missing:
                        return PolicyEvaluation(
                            policy_name=policy.name,
                            allowed=False,
                            reason=f"Missing privacy protections: {', '.join(missing)}",
                            details={'missing_protections': list(missing)}
                        )
        
        return PolicyEvaluation(
            policy_name="privacy",
            allowed=True,
            reason="Privacy requirements met"
        )
    
    def _matches_policy(self, context: Dict, policy: Policy) -> bool:
        """Check if context matches policy scope."""
        scope = policy.metadata.get('scope', {})
        
        if 'domains' in scope:
            if context.get('domain') not in scope['domains']:
                return False
        
        if 'roles' in scope:
            if context.get('role') not in scope['roles']:
                return False
        
        return True
    
    def _evaluate_rule(self, rule: Dict, context: Dict) -> bool:
        """Evaluate a single policy rule."""
        conditions = rule.get('conditions', [])
        
        for condition in conditions:
            field = condition.get('field')
            operator = condition.get('operator')
            value = condition.get('value')
            
            actual_value = context.get(field)
            
            if operator == 'equals' and actual_value != value:
                return False
            elif operator == 'in' and actual_value not in value:
                return False
            elif operator == 'not_in' and actual_value in value:
                return False
        
        return True
    
    def generate_policy_report(self) -> Dict:
        """Generate comprehensive policy report."""
        report = {
            'timestamp': datetime.utcnow().isoformat(),
            'total_policies': len(self.policies),
            'policies_by_type': {},
            'evaluation_summary': {
                'total_evaluations': len(self.evaluation_cache),
                'allowed': sum(1 for e in self.evaluation_cache.values() if e.allowed),
                'denied': sum(1 for e in self.evaluation_cache.values() if not e.allowed)
            }
        }
        
        for policy_type in PolicyType:
            count = sum(1 for p in self.policies.values() 
                       if p.policy_type == policy_type)
            report['policies_by_type'][policy_type.value] = count
        
        return report
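`evaluate_access_policy` walks policies from highest to lowest priority, lets the first matching rule decide, and falls back to deny when nothing matches. The self-contained sketch below isolates that evaluation order with a flattened, hypothetical `AccessRule` type whose roles and permissions mirror the `access_policies` in the domain definitions; it illustrates the pattern and is not a drop-in replacement for the engine.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class AccessRule:
    """Hypothetical flattened rule: higher priority is evaluated first."""
    name: str
    priority: int
    action: str  # "allow" or "deny"
    conditions: Dict[str, List[Any]] = field(default_factory=dict)

    def matches(self, context: Dict[str, Any]) -> bool:
        # Every condition is an "in" check: context[key] must be a listed value.
        return all(context.get(key) in allowed
                   for key, allowed in self.conditions.items())


def decide(rules: List[AccessRule], context: Dict[str, Any]) -> str:
    """Return the action of the highest-priority matching rule, else 'deny'."""
    for rule in sorted(rules, key=lambda r: r.priority, reverse=True):
        if rule.matches(context):
            return rule.action
    return "deny"


RULES = [
    AccessRule("analysts-read", 10, "allow",
               {"role": ["data-analyst", "ml-engineer"], "permission": ["read"]}),
    AccessRule("ml-export", 10, "allow",
               {"role": ["ml-engineer"], "permission": ["export"]}),
    # A high-priority deny overrides any allow for restricted products.
    AccessRule("block-restricted", 100, "deny", {"access_tier": ["restricted"]}),
]
```

Putting explicit denies at a higher priority than allows means a single rule can lock down a sensitive product without editing every grant.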

Step 3: Data Product Marketplace

# marketplace/data_marketplace.py
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from datetime import datetime
from enum import Enum
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ProductStatus(Enum):
    ACTIVE = "active"
    DEPRECATED = "deprecated"
    ARCHIVED = "archived"
    DRAFT = "draft"

class AccessTier(Enum):
    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"

@dataclass
class DataProduct:
    product_id: str
    name: str
    domain: str
    owner: str
    description: str
    status: ProductStatus = ProductStatus.DRAFT
    access_tier: AccessTier = AccessTier.INTERNAL
    version: str = "1.0.0"
    schema: Dict = field(default_factory=dict)
    endpoints: List[Dict] = field(default_factory=list)
    quality_metrics: Dict = field(default_factory=dict)
    sla: Dict = field(default_factory=dict)
    tags: List[str] = field(default_factory=list)
    created_at: datetime = field(default_factory=datetime.utcnow)
    updated_at: datetime = field(default_factory=datetime.utcnow)
    documentation_url: str = ""
    contact_email: str = ""
    
    def to_catalog_entry(self) -> Dict:
        return {
            "product_id": self.product_id,
            "name": self.name,
            "domain": self.domain,
            "owner": self.owner,
            "description": self.description,
            "status": self.status.value,
            "access_tier": self.access_tier.value,
            "version": self.version,
            "tags": self.tags,
            "quality_score": self.quality_metrics.get('score', 0),
            "freshness_sla": self.sla.get('freshness', 'N/A'),
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat()
        }

@dataclass
class AccessRequest:
    request_id: str
    product_id: str
    requester: str
    requester_email: str
    business_justification: str
    requested_permissions: List[str]
    status: str = "pending"
    reviewed_by: Optional[str] = None
    reviewed_at: Optional[datetime] = None
    created_at: datetime = field(default_factory=datetime.utcnow)

class DataMarketplace:
    def __init__(self):
        self.products: Dict[str, DataProduct] = {}
        self.access_requests: Dict[str, AccessRequest] = {}
        self.usage_metrics: Dict[str, List[Dict]] = {}
        self.search_index: Dict[str, List[str]] = {}
    
    def register_product(self, product: DataProduct) -> str:
        """Register a new data product in the marketplace."""
        if product.product_id in self.products:
            raise ValueError(f"Product {product.product_id} already exists")
        
        self.products[product.product_id] = product
        self._update_search_index(product)
        
        logger.info(f"Registered data product: {product.name}")
        return product.product_id
    
    def update_product(self, product_id: str, updates: Dict):
        """Update an existing data product."""
        if product_id not in self.products:
            raise ValueError(f"Product {product_id} not found")
        
        product = self.products[product_id]
        
        for key, value in updates.items():
            if hasattr(product, key):
                setattr(product, key, value)
        
        product.updated_at = datetime.utcnow()
        self._update_search_index(product)
    
    def search_products(self, query: str = "",
                        filters: Optional[Dict] = None) -> List[Dict]:
        """Search for data products."""
        results = []
        
        for product in self.products.values():
            if product.status != ProductStatus.ACTIVE:
                continue
            
            # Apply filters
            if filters:
                if 'domain' in filters and product.domain != filters['domain']:
                    continue
                if 'access_tier' in filters and product.access_tier.value != filters['access_tier']:
                    continue
                if 'tags' in filters:
                    if not set(filters['tags']).intersection(set(product.tags)):
                        continue
            
            # Apply text search
            if query:
                searchable_text = f"{product.name} {product.description} {' '.join(product.tags)}".lower()
                if query.lower() not in searchable_text:
                    continue
            
            results.append(product.to_catalog_entry())
        
        return sorted(results, key=lambda x: x.get('quality_score', 0), reverse=True)
    
    def request_access(self, product_id: str, requester: str,
                      requester_email: str, justification: str,
                      permissions: List[str]) -> AccessRequest:
        """Request access to a data product."""
        if product_id not in self.products:
            raise ValueError(f"Product {product_id} not found")
        
        product = self.products[product_id]
        
        # Check if product is accessible
        if product.access_tier == AccessTier.RESTRICTED:
            raise PermissionError("Restricted products require special approval")
        
        request = AccessRequest(
            request_id=f"REQ-{len(self.access_requests) + 1:06d}",
            product_id=product_id,
            requester=requester,
            requester_email=requester_email,
            business_justification=justification,
            requested_permissions=permissions
        )
        
        self.access_requests[request.request_id] = request
        
        # Auto-approve for public/internal products
        if product.access_tier in [AccessTier.PUBLIC, AccessTier.INTERNAL]:
            request.status = "approved"
            request.reviewed_by = "system"
            request.reviewed_at = datetime.utcnow()
        
        logger.info(f"Access request created: {request.request_id}")
        return request
    
    def approve_access_request(self, request_id: str, 
                             reviewer: str) -> AccessRequest:
        """Approve an access request."""
        if request_id not in self.access_requests:
            raise ValueError(f"Request {request_id} not found")
        
        request = self.access_requests[request_id]
        request.status = "approved"
        request.reviewed_by = reviewer
        request.reviewed_at = datetime.utcnow()
        
        logger.info(f"Access request approved: {request_id}")
        return request
    
    def get_product_usage_metrics(self, product_id: str) -> Dict:
        """Get usage metrics for a data product."""
        metrics = self.usage_metrics.get(product_id)
        # Guard against missing or empty lists (avoids ZeroDivisionError below)
        if not metrics:
            return {"error": "No usage data available"}
        
        now = datetime.utcnow()
        # Entries without a timestamp are treated as current
        recent = [
            m for m in metrics
            if (now - datetime.fromisoformat(m.get('timestamp', now.isoformat()))).days <= 30
        ]
        
        return {
            "total_queries": sum(m.get('queries', 0) for m in metrics),
            "unique_users": len(set(m.get('user_id') for m in metrics)),
            "avg_response_time_ms": sum(m.get('response_time_ms', 0) for m in metrics) / len(metrics),
            "last_30_days": recent
        }
    
    def _update_search_index(self, product: DataProduct):
        """Update search index for a product."""
        searchable_text = f"{product.name} {product.description} {' '.join(product.tags)}".lower()
        
        words = set(searchable_text.split())
        
        for word in words:
            if word not in self.search_index:
                self.search_index[word] = []
            if product.product_id not in self.search_index[word]:
                self.search_index[word].append(product.product_id)
    
    def generate_marketplace_report(self) -> Dict:
        """Generate marketplace analytics report."""
        report = {
            'timestamp': datetime.utcnow().isoformat(),
            'summary': {
                'total_products': len(self.products),
                'active_products': sum(1 for p in self.products.values() 
                                     if p.status == ProductStatus.ACTIVE),
                'total_access_requests': len(self.access_requests),
                'pending_requests': sum(1 for r in self.access_requests.values() 
                                      if r.status == 'pending')
            },
            'by_domain': {},
            'by_access_tier': {},
            'by_status': {}
        }
        
        for product in self.products.values():
            # By domain
            if product.domain not in report['by_domain']:
                report['by_domain'][product.domain] = 0
            report['by_domain'][product.domain] += 1
            
            # By access tier
            tier = product.access_tier.value
            if tier not in report['by_access_tier']:
                report['by_access_tier'][tier] = 0
            report['by_access_tier'][tier] += 1
            
            # By status
            status = product.status.value
            if status not in report['by_status']:
                report['by_status'][status] = 0
            report['by_status'][status] += 1
        
        return report
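`_update_search_index` builds a word-to-product index, but `search_products` never reads it (it scans every product with a substring match), and updates never remove words a product no longer contains. A minimal standalone sketch of an index that is actually queried and kept in sync, assuming the same lowercase whitespace tokenization and AND semantics across query terms; the `InvertedIndex` class is hypothetical and not part of the marketplace module above:

```python
from collections import defaultdict
from typing import Dict, Iterable, Optional, Set


class InvertedIndex:
    """Hypothetical token -> product_id index that is kept in sync on updates."""

    def __init__(self) -> None:
        self._postings: Dict[str, Set[str]] = defaultdict(set)
        self._tokens_by_product: Dict[str, Set[str]] = {}

    @staticmethod
    def _tokenize(text: str) -> Set[str]:
        # Same lowercase whitespace split as _update_search_index
        return set(text.lower().split())

    def index(self, product_id: str, name: str, description: str,
              tags: Iterable[str]) -> None:
        # Remove previous tokens first so renamed products leave no stale entries
        self.remove(product_id)
        tokens = self._tokenize(f"{name} {description} {' '.join(tags)}")
        self._tokens_by_product[product_id] = tokens
        for token in tokens:
            self._postings[token].add(product_id)

    def remove(self, product_id: str) -> None:
        for token in self._tokens_by_product.pop(product_id, set()):
            self._postings[token].discard(product_id)
            if not self._postings[token]:
                del self._postings[token]

    def search(self, query: str) -> Set[str]:
        """Return ids of products containing every query token (AND semantics)."""
        tokens = self._tokenize(query)
        if not tokens:
            return set(self._tokens_by_product)
        result: Optional[Set[str]] = None
        for token in tokens:
            ids = self._postings.get(token, set())
            result = set(ids) if result is None else result & ids
        return result or set()
```

Lookups cost one set intersection per query token instead of a pass over every product, but matching is exact-token: `cust` no longer finds `customer` the way the substring scan does. The marketplace would still need to apply its ACTIVE-status and filter checks to the returned ids.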

Infrastructure Setup (Terraform)

# infrastructure/multi_cloud_data_mesh.tf
terraform {
  required_version = ">= 1.5.0"
  
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
    google = {
      source  = "hashicorp/google"
      version = "~> 4.0"
    }
    azurerm = {
      source  = "hashicorp/azurerm"
      version = "~> 3.0"
    }
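    kubernetes = {
      source  = "hashicorp/kubernetes"
      version = "~> 2.0"
    }
    # Required by the helm_release resources below
    helm = {
      source  = "hashicorp/helm"
      version = "~> 2.0"
    }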
  }
}

# AWS Provider
provider "aws" {
  region = "us-east-1"
  
  default_tags {
    tags = {
      Project     = "data-mesh"
      Environment = "production"
      ManagedBy   = "terraform"
    }
  }
}

# GCP Provider
provider "google" {
  project = "data-mesh-project"
  region  = "us-central1"
}

# Azure Provider
provider "azurerm" {
  features {}
  
  subscription_id = var.azure_subscription_id
  tenant_id       = var.azure_tenant_id
}

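# One Kubernetes cluster per cloud. Federation across them (for example,
# Istio multi-cluster) is a separate step, not something these modules set up.
# module.vpc, module.vpc_gcp, module.rds, module.msk, module.elasticsearch,
# azurerm_resource_group.data_mesh and the var.* inputs are assumed to be
# defined elsewhere in this configuration.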
module "eks_cluster" {
  source  = "terraform-aws-modules/eks/aws"
  version = "19.0.0"
  
  cluster_name    = "data-mesh-eks"
  cluster_version = "1.28"
  
  vpc_id     = module.vpc.vpc_id
  subnet_ids = module.vpc.private_subnets
  
  eks_managed_node_groups = {
    general = {
      instance_types = ["m5.xlarge"]
      min_size       = 3
      max_size       = 10
      desired_size   = 5
    }
  }
}

module "gke_cluster" {
  source  = "terraform-google-modules/kubernetes-engine/google"
  version = "26.0"
  
  project_id = "data-mesh-project"
  name       = "data-mesh-gke"
  region     = "us-central1"
  
  network    = module.vpc_gcp.network_name
  subnetwork = module.vpc_gcp.subnets_names[0]
  
  node_pools = [
    {
      name         = "default"
      machine_type = "e2-standard-4"
      min_count    = 2
      max_count    = 10
    }
  ]
}

module "aks_cluster" {
  source  = "Azure/aks/azurerm"
  version = "7.0"
  
  resource_group_name = azurerm_resource_group.data_mesh.name
  cluster_name        = "data-mesh-aks"
  kubernetes_version  = "1.28"
  
  # Node pool sizing is set through the module's agents_* inputs
  agents_size  = "Standard_D4s_v3"
  agents_count = 3
}

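# AWS side of the site-to-site VPN to GCP and Azure. A working link also needs
# customer gateways, VPN connections, and the matching GCP/Azure VPN gateways.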
resource "aws_vpn_gateway" "data_mesh" {
  vpc_id = module.vpc.vpc_id
  
  tags = {
    Name = "data-mesh-vpn"
  }
}

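# Istio control plane for the cluster the helm provider points at. The Istio
# "base" chart (CRDs) must be installed first, and each cluster needs its own
# release plus multi-cluster configuration to form a cross-cloud mesh.
resource "helm_release" "istio" {
  name             = "istio"
  repository       = "https://istio-release.storage.googleapis.com/charts"
  chart            = "istiod"
  version          = "1.20.0"
  namespace        = "istio-system"
  create_namespace = true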
  
  set {
    name  = "meshConfig.enableAutoMtls"
    value = "true"
  }
  
  set {
    name  = "meshConfig.defaultConfig.holdApplicationUntilProxyStarts"
    value = "true"
  }
}

# DataHub for metadata management
resource "helm_release" "datahub" {
  name             = "datahub"
  repository       = "https://helm.datahubproject.io/"
  chart            = "datahub"
  version          = "0.2.0"
  namespace        = "datahub"
  create_namespace = true
  
  values = [
    templatefile("${path.module}/values/datahub.yaml", {
      postgres_host = module.rds.endpoint
      kafka_brokers = module.msk.bootstrap_brokers
      elasticsearch_endpoint = module.elasticsearch.endpoint
    })
  ]
  
  depends_on = [
    module.rds,
    module.msk,
    module.elasticsearch
  ]
}

# OPA Gatekeeper: admission-control policies for Kubernetes resources
resource "helm_release" "opa" {
  name             = "opa"
  repository       = "https://open-policy-agent.github.io/gatekeeper/charts"
  chart            = "gatekeeper"
  version          = "3.14.0"
  namespace        = "gatekeeper-system"
  create_namespace = true
  
  set {
    name  = "replicas"
    value = "3"
  }
}

# Cross-cloud DNS
resource "aws_route53_zone" "data_mesh" {
  name = "data-mesh.internal"
  
  vpc {
    vpc_id = module.vpc.vpc_id
  }
}

# Outputs
output "eks_cluster_endpoint" {
  description = "EKS cluster endpoint"
  value       = module.eks_cluster.cluster_endpoint
}

output "gke_cluster_endpoint" {
  description = "GKE cluster endpoint"
  value       = module.gke_cluster.endpoint
}

output "aks_cluster_endpoint" {
  description = "AKS cluster endpoint"
  value       = module.aks_cluster.endpoint
}

output "datahub_endpoint" {
  description = "DataHub frontend endpoint (cluster-internal)"
  value       = "http://datahub-datahub-frontend.datahub.svc.cluster.local:9002"
}
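Platform tooling that onboards domain teams needs the endpoints these outputs expose. `terraform output -json` prints every output as an object with `sensitive`, `type`, and `value` keys; a minimal sketch that turns that JSON into a name-to-endpoint map, assuming the `_endpoint` naming used above (the `load_endpoints` helper is hypothetical):

```python
import json
import subprocess
from typing import Dict, Optional


def load_endpoints(raw_json: Optional[str] = None) -> Dict[str, str]:
    """Return {output_name: value} for non-sensitive outputs ending in '_endpoint'.

    When raw_json is None this shells out to `terraform output -json`,
    which needs an initialized workspace with applied state.
    """
    if raw_json is None:
        raw_json = subprocess.run(
            ["terraform", "output", "-json"],
            check=True, capture_output=True, text=True,
        ).stdout
    # Each output is an object with "sensitive", "type" and "value" keys
    outputs = json.loads(raw_json)
    return {
        name: spec["value"]
        for name, spec in outputs.items()
        # Skip sensitive outputs so secrets never end up in configs or logs
        if name.endswith("_endpoint") and not spec.get("sensitive", False)
    }
```

Passing `raw_json` explicitly keeps the parsing testable without a Terraform workspace.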

Testing and Validation

# tests/test_data_mesh.py
import pytest
from datetime import datetime

class TestDataMesh:
    @pytest.fixture
    def data_product(self):
        from marketplace.data_marketplace import DataProduct, ProductStatus, AccessTier
        
        return DataProduct(
            product_id="customer-360",
            name="Customer 360 View",
            domain="analytics",
            owner="analytics-team@company.com",
            description="Unified customer view across all touchpoints",
            status=ProductStatus.ACTIVE,
            access_tier=AccessTier.INTERNAL,
            tags=["customer", "analytics", "360-view"],
            sla={"freshness": "1h", "quality": 0.95}
        )
    
    @pytest.fixture
    def governance_engine(self):
        from governance.governance_engine import FederatedGovernanceEngine
        return FederatedGovernanceEngine()
    
    def test_product_registration(self, data_product):
        """Test data product registration in marketplace."""
        from marketplace.data_marketplace import DataMarketplace
        
        marketplace = DataMarketplace()
        product_id = marketplace.register_product(data_product)
        
        assert product_id == data_product.product_id
        assert len(marketplace.products) == 1
    
    def test_product_search(self, data_product):
        """Test data product search functionality."""
        from marketplace.data_marketplace import DataMarketplace
        
        marketplace = DataMarketplace()
        marketplace.register_product(data_product)
        
        # Search by text
        results = marketplace.search_products(query="customer")
        assert len(results) == 1
        
        # Search by domain
        results = marketplace.search_products(filters={"domain": "analytics"})
        assert len(results) == 1
        
        # Search with no results
        results = marketplace.search_products(query="nonexistent")
        assert len(results) == 0
    
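    def test_access_request_flow(self, data_product):
        """Test access request and approval flow."""
        from marketplace.data_marketplace import AccessTier, DataMarketplace
        
        marketplace = DataMarketplace()
        marketplace.register_product(data_product)
        
        # Request access
        request = marketplace.request_access(
            product_id="customer-360",
            requester="data-analyst",
            requester_email="analyst@company.com",
            justification="Need for quarterly analysis",
            permissions=["read"]
        )
        
        assert request.status == "approved"  # Auto-approved for internal
        
        # Confidential products need manual review. The registered object is
        # the same instance, so changing its tier is enough (re-registering
        # would raise ValueError for a duplicate product_id).
        data_product.access_tier = AccessTier.CONFIDENTIAL
        
        request = marketplace.request_access(
            product_id="customer-360",
            requester="external-user",
            requester_email="external@partner.com",
            justification="Partner analysis",
            permissions=["read"]
        )
        
        assert request.status == "pending"
        
        # A reviewer can then approve it manually
        approved = marketplace.approve_access_request(request.request_id, "data-steward")
        assert approved.status == "approved"
        
        # Restricted products are rejected outright
        data_product.access_tier = AccessTier.RESTRICTED
        with pytest.raises(PermissionError):
            marketplace.request_access(
                product_id="customer-360",
                requester="external-user",
                requester_email="external@partner.com",
                justification="Partner analysis",
                permissions=["read"]
            )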
    
    def test_policy_evaluation(self, governance_engine):
        """Test governance policy evaluation."""
        from governance.governance_engine import Policy, PolicyType
        
        # Register access policy
        policy = Policy(
            name="analyst-access",
            policy_type=PolicyType.ACCESS,
            description="Allow analysts to read customer data",
            rules=[
                {
                    "conditions": [
                        {"field": "role", "operator": "equals", "value": "data-analyst"},
                        {"field": "domain", "operator": "equals", "value": "customer"}
                    ],
                    "action": "allow",
                    "reason": "Analysts can read customer data"
                }
            ]
        )
        
        governance_engine.register_policy(policy)
        
        # Evaluate access
        context = {"role": "data-analyst", "domain": "customer", "resource": "profiles"}
        result = governance_engine.evaluate_access_policy(context)
        
        assert result.allowed
        
        # Evaluate unauthorized access
        context = {"role": "external-user", "domain": "customer", "resource": "profiles"}
        result = governance_engine.evaluate_access_policy(context)
        
        assert not result.allowed
    
    def test_schema_compatibility(self):
        """Test schema compatibility checking."""
        from schema_registry.registry import (
            SchemaRegistry, DataProductSchema, SchemaField, CompatibilityMode
        )
        
        registry = SchemaRegistry()
        
        # Register v1 schema
        v1_schema = DataProductSchema(
            name="customer",
            version="1.0.0",
            schema_type="avro",
            fields=[
                SchemaField(name="id", type="string", nullable=False),
                SchemaField(name="email", type="string", nullable=False)
            ],
            compatibility=CompatibilityMode.BACKWARD
        )
        
        registry.register_schema(v1_schema)
        
        # Register compatible v2 schema (adding optional field)
        v2_schema = DataProductSchema(
            name="customer",
            version="2.0.0",
            schema_type="avro",
            fields=[
                SchemaField(name="id", type="string", nullable=False),
                SchemaField(name="email", type="string", nullable=False),
                SchemaField(name="phone", type="string", nullable=True)  # Optional
            ],
            compatibility=CompatibilityMode.BACKWARD
        )
        
        fingerprint = registry.register_schema(v2_schema)
        assert fingerprint is not None
        
        # Try to register incompatible schema
        v3_schema = DataProductSchema(
            name="customer",
            version="3.0.0",
            schema_type="avro",
            fields=[
                SchemaField(name="id", type="integer", nullable=False),  # Type changed
                SchemaField(name="email", type="string", nullable=False)
            ],
            compatibility=CompatibilityMode.BACKWARD
        )
        
        with pytest.raises(ValueError):
            registry.register_schema(v3_schema)
    
    def test_marketplace_metrics(self, data_product):
        """Test marketplace usage metrics."""
        from marketplace.data_marketplace import DataMarketplace
        
        marketplace = DataMarketplace()
        marketplace.register_product(data_product)
        
        # Simulate usage
        marketplace.usage_metrics["customer-360"] = [
            {
                "timestamp": datetime.utcnow().isoformat(),
                "user_id": "user1",
                "queries": 10,
                "response_time_ms": 50
            },
            {
                "timestamp": datetime.utcnow().isoformat(),
                "user_id": "user2",
                "queries": 5,
                "response_time_ms": 75
            }
        ]
        
        metrics = marketplace.get_product_usage_metrics("customer-360")
        
        assert metrics["total_queries"] == 15
        assert metrics["unique_users"] == 2
    
    def test_cross_domain_discovery(self):
        """Test cross-domain data product discovery."""
        from marketplace.data_marketplace import DataMarketplace, DataProduct, ProductStatus
        
        marketplace = DataMarketplace()
        
        # Register products from different domains. Status must be ACTIVE:
        # search_products skips DRAFT products, and DRAFT is the default.
        customer_product = DataProduct(
            product_id="customer-profile",
            name="Customer Profile",
            domain="customer",
            owner="customer-team@company.com",
            description="Customer master data",
            tags=["customer", "profile"],
            status=ProductStatus.ACTIVE
        )
        
        order_product = DataProduct(
            product_id="order-history",
            name="Order History",
            domain="order",
            owner="order-team@company.com",
            description="Historical order data",
            tags=["order", "history"],
            status=ProductStatus.ACTIVE
        )
        
        marketplace.register_product(customer_product)
        marketplace.register_product(order_product)
        
        # Search across domains
        results = marketplace.search_products(query="customer")
        assert len(results) == 1
        assert results[0]["domain"] == "customer"
        
        # Get marketplace report
        report = marketplace.generate_marketplace_report()
        assert report["summary"]["total_products"] == 2
        assert "customer" in report["by_domain"]
        assert "order" in report["by_domain"]

Cost Analysis

Monthly Cost Breakdown (Production)

| Component | Specification | Monthly Cost |
|---|---|---|
| EKS Cluster | 5 nodes, m5.xlarge | $800 |
| GKE Cluster | 5 nodes, e2-standard-4 | $750 |
| AKS Cluster | 5 nodes, Standard_D4s_v3 | $700 |
| DataHub | 3 replicas | $300 |
| OPA/Gatekeeper | 3 replicas | $200 |
| Cross-Cloud VPN | Site-to-site | $150 |
| Data Transfer | Cross-cloud | $500 |
| Total | | $3,400 |

Cost Optimization Strategies

💡

Tip: Optimize multi-cloud data mesh costs:

  1. Right-sizing: Monitor and adjust instance types
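  2. Spot capacity: Use AWS Spot Instances, GCP Spot VMs, or Azure Spot VMs for interruptible, non-critical workloads
  3. Commitments: 1-year Reserved Instances or Savings Plans (AWS), committed use discounts (GCP), or reservations (Azure) for up to roughly 40% savings on steady compute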
  4. Data Egress: Minimize cross-cloud data transfer
  5. Auto-scaling: Scale based on actual usage
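To put tip 3 against the breakdown above: assume a flat 40% commitment discount applies to the three cluster lines only. That is a simplification, since real rates vary by provider, term, and payment option, and managed control-plane fees are generally not discounted.

```python
# Monthly costs from the breakdown table (USD)
monthly_costs = {
    "EKS Cluster": 800,
    "GKE Cluster": 750,
    "AKS Cluster": 700,
    "DataHub": 300,
    "OPA/Gatekeeper": 200,
    "Cross-Cloud VPN": 150,
    "Data Transfer": 500,
}

# Assumption: only the three cluster lines are covered by a commitment,
# at the 40% figure from tip 3 (real discounts vary)
COMMITTED = ("EKS Cluster", "GKE Cluster", "AKS Cluster")
DISCOUNT_PCT = 40

total = sum(monthly_costs.values())
committed_spend = sum(monthly_costs[name] for name in COMMITTED)
savings = committed_spend * DISCOUNT_PCT / 100
new_total = total - savings

print(f"total=${total:,} committed=${committed_spend:,} "
      f"savings=${savings:,.0f} new_total=${new_total:,.0f}")
```

Under these assumptions the commitment saves $900 of the $2,250 cluster spend, bringing the monthly total from $3,400 to about $2,500. Data transfer is untouched by commitments, which is why tip 4 matters separately.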

Performance Metrics

| Metric | Before Data Mesh | After Data Mesh | Improvement |
|---|---|---|---|
| Data Discovery Time | 2 weeks | 5 minutes | 4032x faster |
| Data Access Provisioning | 1 week | 1 hour | 168x faster |
| Data Quality Issues | 100/month | 10/month | 10x reduction |
| Cross-Domain Analytics | 3 months | 1 week | 12x faster |
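The improvement figures for the time-based rows are ratios of the before and after durations in a common unit. A quick check with `datetime.timedelta`, counting a month as four weeks as the 12x figure implies (30-day months would give about 13x):

```python
from datetime import timedelta

# (before, after) pairs from the table; a "month" is taken as 4 weeks here
rows = {
    "Data Discovery Time": (timedelta(weeks=2), timedelta(minutes=5)),
    "Data Access Provisioning": (timedelta(weeks=1), timedelta(hours=1)),
    "Cross-Domain Analytics": (timedelta(weeks=3 * 4), timedelta(weeks=1)),
}

for metric, (before, after) in rows.items():
    # Dividing one timedelta by another yields a float ratio
    print(f"{metric}: {before / after:.0f}x")
```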

Interview Talking Points

Architecture Decisions

ℹ️

Best Practice: Focus on these data mesh concepts in interviews:

  1. Why Data Mesh over Centralized Data Lake?

    • Removes the central data team as a bottleneck
    • Domain expertise preserved
    • Scalable governance
    • Faster time-to-value
  2. Why Federated Governance?

    • Consistent policies across domains
    • Local ownership with global standards
    • Automated compliance
    • Audit trail for all access
  3. Why Self-Serve Platform?

    • Domain autonomy
    • Reduced operational overhead
    • Standardized infrastructure
    • Cost transparency
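"Local ownership with global standards" can be illustrated as policy composition: a platform-wide rule set every domain inherits, plus rules each domain owns, with every decision appended to an audit log. A minimal sketch with hypothetical rule names and deny-overrides semantics; it is independent of the `FederatedGovernanceEngine` used in the tests and is not a description of how that engine works:

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Tuple

# A rule inspects a request context and returns (allowed, reason)
Rule = Callable[[Dict], Tuple[bool, str]]


def require_pii_approval(ctx: Dict) -> Tuple[bool, str]:
    """Hypothetical global standard: PII needs an approved access request."""
    if ctx.get("contains_pii") and not ctx.get("approved"):
        return False, "global: PII requires an approved access request"
    return True, "global: ok"


def order_analysts_only(ctx: Dict) -> Tuple[bool, str]:
    """Hypothetical local rule owned by the order domain."""
    if ctx.get("role") != "analyst":
        return False, "order: analysts only"
    return True, "order: ok"


@dataclass
class FederatedPolicySet:
    global_rules: List[Rule]
    domain_rules: Dict[str, List[Rule]] = field(default_factory=dict)
    audit_log: List[Dict] = field(default_factory=list)

    def evaluate(self, ctx: Dict) -> bool:
        # Global standards run first, then the requesting domain's own rules
        rules = self.global_rules + self.domain_rules.get(ctx["domain"], [])
        for rule in rules:
            allowed, reason = rule(ctx)
            if not allowed:
                # Deny-overrides: the first failing rule decides, and is logged
                self.audit_log.append({**ctx, "allowed": False, "reason": reason})
                return False
        self.audit_log.append({**ctx, "allowed": True, "reason": "all rules passed"})
        return True


policies = FederatedPolicySet(
    global_rules=[require_pii_approval],
    domain_rules={"order": [order_analysts_only]},
)
```

Domains can add rules but cannot drop the global ones, which is the "global standards" half of the trade-off; the audit log gives the trail for every decision, allowed or denied.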

Common Interview Questions

Q: "How do you handle cross-domain queries in a data mesh?"

cross_domain_strategies = {
    "Data Products": "Expose cross-domain data as new data products",
    "Event Streaming": "Kafka topics for real-time cross-domain events",
    "Federated Queries": "Trino/Presto for distributed SQL queries",
    "Materialized Views": "Pre-computed cross-domain aggregations"
}
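The "Data Products" and "Materialized Views" answers come down to the same move: a consuming domain joins upstream products and publishes the result as a new product it owns. A minimal in-memory sketch, with hypothetical record shapes standing in for the customer-profile and order-history products; in practice the join would run in Trino, Spark, or a warehouse rather than in Python:

```python
from collections import defaultdict
from typing import Dict, List

# Hypothetical snapshots of two upstream data products
customer_profile = [
    {"customer_id": "c1", "name": "Ada", "segment": "enterprise"},
    {"customer_id": "c2", "name": "Lin", "segment": "smb"},
]
order_history = [
    {"order_id": "o1", "customer_id": "c1", "amount": 120.0},
    {"order_id": "o2", "customer_id": "c1", "amount": 80.0},
    {"order_id": "o3", "customer_id": "c2", "amount": 45.5},
]


def build_customer_order_summary(customers: List[Dict],
                                 orders: List[Dict]) -> List[Dict]:
    """Pre-compute per-customer order totals as a new cross-domain product."""
    totals: Dict[str, float] = defaultdict(float)
    counts: Dict[str, int] = defaultdict(int)
    for order in orders:
        totals[order["customer_id"]] += order["amount"]
        counts[order["customer_id"]] += 1
    # Left join: customers without orders still appear with zero values
    return [
        {
            "customer_id": c["customer_id"],
            "segment": c["segment"],
            "order_count": counts.get(c["customer_id"], 0),
            "total_spend": totals.get(c["customer_id"], 0.0),
        }
        for c in customers
    ]


summary = build_customer_order_summary(customer_profile, order_history)
```

Publishing `summary` under its own product id, owner, and freshness SLA is what turns it from an ad hoc query into a data product that other domains can discover and depend on.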

Q: "How do you ensure data quality across autonomous domains?"

quality_framework = {
    "SLAs": "Domain-specific quality SLAs with monitoring",
    "Contracts": "Schema contracts between producers and consumers",
    "Testing": "Automated quality tests in CI/CD pipelines",
    "Observability": "Centralized quality dashboards and alerting"
}
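A schema contract pays off when the producer's CI pipeline enforces it on every change. A minimal sketch of a contract check plus a pass-rate score, assuming a hypothetical contract format of field name to (type, nullable); a real setup would more likely validate against the schema registry or a dedicated data-quality tool:

```python
from typing import Any, Dict, List, Tuple

# Hypothetical contract: field -> (expected Python type, nullable)
CUSTOMER_CONTRACT: Dict[str, Tuple[type, bool]] = {
    "id": (str, False),
    "email": (str, False),
    "phone": (str, True),
}


def validate_record(record: Dict[str, Any],
                    contract: Dict[str, Tuple[type, bool]]) -> List[str]:
    """Return the contract violations for one record (empty list if valid)."""
    errors = []
    for name, (expected_type, nullable) in contract.items():
        value = record.get(name)
        if value is None:
            if not nullable:
                errors.append(f"{name}: missing or null")
        elif not isinstance(value, expected_type):
            errors.append(f"{name}: expected {expected_type.__name__}, "
                          f"got {type(value).__name__}")
    # Fields the contract does not declare also count as violations
    for name in record.keys() - contract.keys():
        errors.append(f"{name}: not in contract")
    return errors


def quality_score(records: List[Dict[str, Any]],
                  contract: Dict[str, Tuple[type, bool]]) -> float:
    """Share of records with zero violations (1.0 for an empty batch)."""
    if not records:
        return 1.0
    valid = sum(1 for r in records if not validate_record(r, contract))
    return valid / len(records)
```

A CI step could fail the build when `quality_score` drops below the product's SLA, such as the `"quality": 0.95` used in the test fixture earlier.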

Q: "How do you manage costs in a multi-cloud data mesh?"

cost_management = {
    "Chargeback": "Domain-level cost attribution",
    "Budgets": "Automated budget alerts and enforcement",
    "Optimization": "Right-sizing recommendations",
    "Governance": "Cost-aware resource provisioning"
}
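Chargeback usually separates direct costs (resources a domain provisions itself) from shared platform costs (catalog, policy engine, networking) that have to be apportioned. A minimal sketch that allocates shared costs in proportion to each domain's direct spend; the figures are hypothetical, and usage-based keys such as query counts are an equally common choice:

```python
from typing import Dict


def allocate_chargeback(direct: Dict[str, float],
                        shared_total: float) -> Dict[str, float]:
    """Direct cost per domain plus a share of platform cost weighted by direct spend."""
    if not direct:
        return {}
    direct_total = sum(direct.values())
    if direct_total == 0:
        # No spend to weight by: split the shared cost evenly
        even = shared_total / len(direct)
        return {domain: even for domain in direct}
    return {
        domain: cost + shared_total * cost / direct_total
        for domain, cost in direct.items()
    }


# Hypothetical monthly figures (USD): direct spend per domain, shared platform cost
bill = allocate_chargeback({"customer": 600, "order": 300, "analytics": 100}, 500)
```

Weighting by direct spend is simple, but it can overcharge domains with heavy compute and light catalog use; usage-based keys are fairer at the cost of metering each domain's platform consumption.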

Deployment Checklist

  • Set up multi-cloud Kubernetes clusters
  • Configure cross-cloud networking (VPN/peering)
  • Deploy service mesh (Istio) across clusters
  • Set up DataHub for metadata management
  • Configure OPA for governance policies
  • Deploy self-serve platform for domain teams
  • Establish domain ownership and SLAs
  • Set up monitoring and alerting
  • Document data product templates
  • Train domain teams on self-serve tools

⚠️

Warning: Data mesh requires significant organizational change. Start with a pilot domain before full rollout. Ensure executive sponsorship and domain team buy-in.


This project demonstrates advanced data architecture skills and is highly relevant for data platform and architecture interviews at large enterprises.
