Multi-Cloud Data Mesh
Federated Governance + Self-Serve Platforms + Domain Ownership
Project Difficulty: Expert | Duration: 6-8 weeks | Cloud: Multi-Cloud (AWS + GCP + Azure)
Implement a production data mesh architecture that lets domain teams own and serve their data as products across multiple cloud providers.
Project Overview
Problem Statement
Centralized data teams become bottlenecks as organizations scale. Data quality issues proliferate, domain expertise is lost, and time-to-value for new data products increases. Organizations need a decentralized approach where domain teams own their data.
Objectives
- Enable domain-driven data ownership
- Implement self-serve data infrastructure
- Establish federated computational governance
- Support cross-domain data discovery and access
- Maintain data quality and compliance across clouds
Tech Stack
| Component | Technology | Purpose |
|---|---|---|
| Data Platform | Kubernetes + Terraform | Multi-cloud orchestration |
| Data Catalog | DataHub + Amundsen | Metadata management |
| Data Products | Domain-specific APIs | Data as a service |
| Governance | Open Policy Agent | Policy enforcement |
| Discovery | Data Marketplace | Data product discovery |
Architecture Diagram
Data Source Setup and Schema
Domain Definitions
# domains/domain_definitions.yaml
domains:
  customer:
    owner: "customer-team@company.com"
    cloud: "aws"
    region: "us-east-1"
    data_products:
      - name: "customer-profile"
        type: "batch"
        freshness_sla: "1h"
        quality_sla: 0.99
        schema:
          type: record
          fields:
            - name: customer_id
              type: string
              nullable: false
            - name: email
              type: string
              nullable: false
            - name: full_name
              type: string
              nullable: true
            - name: segment
              type: string
              enum: ["VIP", "Premium", "Regular", "New"]
            - name: lifetime_value
              type: float
              min: 0
              max: 1000000
        endpoints:
          - type: rest
            url: "https://customer-api.internal/customers"
            auth: "oauth2"
          - type: graphql
            url: "https://customer-api.internal/graphql"
        access_policies:
          - role: "data-analyst"
            permissions: ["read"]
          - role: "ml-engineer"
            permissions: ["read", "export"]
      - name: "customer-behavior"
        type: "streaming"
        freshness_sla: "5m"
        quality_sla: 0.95
        schema:
          type: record
          fields:
            - name: customer_id
              type: string
            - name: event_type
              type: string
              enum: ["page_view", "add_to_cart", "purchase", "search"]
            - name: timestamp
              type: timestamp
            - name: session_id
              type: string
            - name: metadata
              type: map
              values: string
        endpoints:
          - type: kafka
            topic: "customer-events"
            brokers: ["kafka.aws.internal:9092"]
  product:
    owner: "product-team@company.com"
    cloud: "gcp"
    region: "us-central1"
    data_products:
      - name: "product-catalog"
        type: "batch"
        freshness_sla: "24h"
        quality_sla: 0.98
        schema:
          type: record
          fields:
            - name: product_id
              type: string
              nullable: false
            - name: name
              type: string
              nullable: false
            - name: category
              type: string
            - name: price
              type: float
              min: 0
            - name: inventory_count
              type: integer
              min: 0
        endpoints:
          - type: rest
            url: "https://product-api.internal/products"
            auth: "api-key"
          - type: bigquery
            project: "product-analytics"
            dataset: "catalog"
            table: "products"
  order:
    owner: "order-team@company.com"
    cloud: "azure"
    region: "eastus"
    data_products:
      - name: "order-transactions"
        type: "batch"
        freshness_sla: "15m"
        quality_sla: 0.999
        schema:
          type: record
          fields:
            - name: order_id
              type: string
              nullable: false
            - name: customer_id
              type: string
              references: "customer.customer-profile.customer_id"
            - name: product_ids
              type: array
              items: string
            - name: total_amount
              type: float
              min: 0
            - name: status
              type: string
              enum: ["pending", "processing", "shipped", "delivered", "cancelled"]
            - name: order_date
              type: timestamp
        endpoints:
          - type: rest
            url: "https://order-api.internal/orders"
            auth: "oauth2"
          - type: cosmosdb
            account: "order-analytics"
            database: "orders"
            container: "transactions"
  analytics:
    owner: "analytics-team@company.com"
    cloud: "aws"
    region: "us-east-1"
    data_products:
      - name: "customer-360"
        type: "composite"
        description: "Unified customer view combining all domains"
        dependencies:
          - domain: customer
            product: customer-profile
          - domain: customer
            product: customer-behavior
          - domain: order
            product: order-transactions
        freshness_sla: "1h"
        quality_sla: 0.95
        schema:
          type: record
          fields:
            - name: customer_id
              type: string
            - name: profile
              type: record
              source: "customer.customer-profile"
            - name: behavior_summary
              type: record
              source: "customer.customer-behavior"
            - name: order_summary
              type: record
              source: "order.order-transactions"
            - name: computed_metrics
              type: record
              fields:
                - name: engagement_score
                  type: float
                - name: churn_risk
                  type: float
                - name: recommended_products
                  type: array
                  items: string
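Composite products such as customer-360 only work if every entry under `dependencies` points at a product that some domain actually publishes. The sketch below shows one way to check that before deployment. It assumes the YAML has already been parsed into a dictionary (for example with `yaml.safe_load`); the function name `find_missing_dependencies` and the trimmed-down sample data are illustrative and not part of the project.

```python
from typing import Dict, List


def find_missing_dependencies(definitions: Dict) -> List[str]:
    """Return 'domain.product' strings that are declared as dependencies but never defined."""
    domains = definitions.get("domains", {})
    # Every (domain, product) pair that some domain actually publishes
    published = {
        (domain_name, product["name"])
        for domain_name, domain in domains.items()
        for product in domain.get("data_products", [])
    }
    missing = []
    for domain in domains.values():
        for product in domain.get("data_products", []):
            for dep in product.get("dependencies", []):
                if (dep["domain"], dep["product"]) not in published:
                    missing.append(f"{dep['domain']}.{dep['product']}")
    return missing


# Hypothetical, shortened stand-in for the parsed domain_definitions.yaml
definitions = {
    "domains": {
        "customer": {"data_products": [{"name": "customer-profile"}]},
        "analytics": {
            "data_products": [
                {
                    "name": "customer-360",
                    "dependencies": [
                        {"domain": "customer", "product": "customer-profile"},
                        {"domain": "order", "product": "order-transactions"},
                    ],
                }
            ]
        },
    }
}

print(find_missing_dependencies(definitions))  # ['order.order-transactions']
```

Running a check like this in CI would catch a composite product that references a renamed or removed upstream product before it reaches the platform.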
Data Product Schema Registry
# schema_registry/registry.py
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from enum import Enum
import json
from datetime import datetime
import hashlib


class SchemaType(Enum):
    AVRO = "avro"
    PROTOBUF = "protobuf"
    JSON_SCHEMA = "json_schema"


class CompatibilityMode(Enum):
    BACKWARD = "backward"
    FORWARD = "forward"
    FULL = "full"
    NONE = "none"


@dataclass
class SchemaField:
    name: str
    type: str
    nullable: bool = False
    description: str = ""
    metadata: Dict = field(default_factory=dict)
    constraints: Dict = field(default_factory=dict)


@dataclass
class DataProductSchema:
    name: str
    version: str
    schema_type: SchemaType
    fields: List[SchemaField]
    compatibility: CompatibilityMode = CompatibilityMode.BACKWARD
    description: str = ""
    owner: str = ""
    created_at: datetime = field(default_factory=datetime.utcnow)
    updated_at: datetime = field(default_factory=datetime.utcnow)

    def to_avro(self) -> Dict:
        """Convert to Avro schema format."""
        return {
            "type": "record",
            "name": self.name,
            "fields": [
                {
                    "name": f.name,
                    "type": ["null", f.type] if f.nullable else f.type,
                    "doc": f.description
                }
                for f in self.fields
            ]
        }

    def to_json_schema(self) -> Dict:
        """Convert to JSON Schema format."""
        properties = {}
        required = []
        for f in self.fields:
            prop = {"type": f.type}
            if f.description:
                prop["description"] = f.description
            if f.constraints:
                prop.update(f.constraints)
            properties[f.name] = prop
            if not f.nullable:
                required.append(f.name)
        return {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "title": self.name,
            "type": "object",
            "properties": properties,
            "required": required
        }

    def calculate_fingerprint(self) -> str:
        """Calculate schema fingerprint for versioning."""
        schema_str = json.dumps(self.to_avro(), sort_keys=True)
        return hashlib.sha256(schema_str.encode()).hexdigest()[:16]


class SchemaRegistry:
    def __init__(self):
        # Maps schema name -> list of versions in registration order
        self.schemas: Dict[str, List[DataProductSchema]] = {}
        self.compatibility_cache: Dict[str, bool] = {}

    def register_schema(self, schema: DataProductSchema) -> str:
        """Register a new schema version."""
        versions = self.schemas.setdefault(schema.name, [])
        # self.schemas is keyed by name only, so duplicates are detected per version
        if any(v.version == schema.version for v in versions):
            raise ValueError(f"Schema {schema.name}:{schema.version} already exists")
        # Check compatibility with previous version
        if versions and not self._check_compatibility(versions[-1], schema):
            raise ValueError(f"Schema {schema.name} is not compatible with previous version")
        versions.append(schema)
        return schema.calculate_fingerprint()

    def _check_compatibility(self, old_schema: DataProductSchema,
                             new_schema: DataProductSchema) -> bool:
        """Check schema compatibility."""
        if old_schema.compatibility == CompatibilityMode.NONE:
            return True
        old_fields = {f.name: f for f in old_schema.fields}
        new_fields = {f.name: f for f in new_schema.fields}
        if old_schema.compatibility in [CompatibilityMode.BACKWARD, CompatibilityMode.FULL]:
            # New schema must be able to read old data
            for field_name, old_field in old_fields.items():
                if field_name not in new_fields:
                    if not old_field.nullable:
                        return False
                elif old_field.type != new_fields[field_name].type:
                    return False
        if old_schema.compatibility in [CompatibilityMode.FORWARD, CompatibilityMode.FULL]:
            # Old schema must be able to read new data
            for field_name, new_field in new_fields.items():
                if field_name not in old_fields:
                    if not new_field.nullable:
                        return False
        return True

    def get_schema(self, name: str, version: Optional[str] = None) -> Optional[DataProductSchema]:
        """Get schema by name and optional version."""
        if name not in self.schemas:
            return None
        if version:
            for schema in self.schemas[name]:
                if schema.version == version:
                    return schema
            return None
        return self.schemas[name][-1] if self.schemas[name] else None

    def list_schemas(self) -> List[Dict]:
        """List all registered schemas."""
        return [
            {
                "name": name,
                "versions": len(versions),
                "latest_version": versions[-1].version if versions else None,
                "owner": versions[-1].owner if versions else None
            }
            for name, versions in self.schemas.items()
        ]
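To make the BACKWARD rule concrete, here is a minimal standalone sketch of the same check applied to two candidate versions of a customer-profile schema. It does not import the registry; the `Field` class and `is_backward_compatible` function are simplified stand-ins written for this illustration, and the `segment` and `long` changes are made-up examples.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Field:
    name: str
    type: str
    nullable: bool = False


def is_backward_compatible(old: List[Field], new: List[Field]) -> bool:
    """Mirror the registry's BACKWARD rule: required fields may not be dropped or retyped."""
    new_by_name = {f.name: f for f in new}
    for old_field in old:
        replacement = new_by_name.get(old_field.name)
        if replacement is None:
            # Dropping a field is only tolerated when it was nullable
            if not old_field.nullable:
                return False
        elif replacement.type != old_field.type:
            return False
    return True


v1 = [Field("customer_id", "string"), Field("full_name", "string", nullable=True)]
# Drops the nullable full_name and adds a nullable segment: accepted
v2_ok = [Field("customer_id", "string"), Field("segment", "string", nullable=True)]
# Changes the type of a required field: rejected
v2_bad = [Field("customer_id", "long"), Field("full_name", "string", nullable=True)]

print(is_backward_compatible(v1, v2_ok))   # True
print(is_backward_compatible(v1, v2_bad))  # False
```

Note that this rule is stricter about type changes than Avro's own resolution rules, which allow some promotions such as int to long; the registry above compares type names literally.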
Step-by-Step Implementation Guide
Step 1: Self-Serve Platform Setup
# platform/self_serve.py
from kubernetes import client, config
from typing import Dict, List, Optional
from datetime import datetime  # used for the created-at namespace annotation
import yaml
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class SelfServePlatform:
    def __init__(self, kubeconfig_path: Optional[str] = None):
        # Use an explicit kubeconfig when given, otherwise assume we run inside a cluster
        if kubeconfig_path:
            config.load_kube_config(config_file=kubeconfig_path)
        else:
            config.load_incluster_config()
        self.core_api = client.CoreV1Api()
        self.apps_api = client.AppsV1Api()
        self.custom_api = client.CustomObjectsApi()

    def create_data_product_namespace(self, domain: str, product_name: str) -> str:
        """Create isolated namespace for a data product."""
        namespace = f"data-product-{domain}-{product_name}"
        namespace_body = client.V1Namespace(
            metadata=client.V1ObjectMeta(
                name=namespace,
                labels={
                    "mesh.company.com/domain": domain,
                    "mesh.company.com/product": product_name,
                    "mesh.company.com/managed-by": "data-mesh-platform"
                },
                annotations={
                    "mesh.company.com/owner": f"{domain}-team@company.com",
                    "mesh.company.com/created-at": datetime.utcnow().isoformat()
                }
            )
        )
        try:
            self.core_api.create_namespace(body=namespace_body)
            logger.info(f"Created namespace: {namespace}")
        except client.exceptions.ApiException as e:
            # 409 Conflict means the namespace is already there, which is fine
            if e.status == 409:
                logger.info(f"Namespace {namespace} already exists")
            else:
                raise
        return namespace

    def deploy_data_product(self, domain: str, product_name: str,
                            config: Dict) -> Dict:
        """Deploy a data product with its infrastructure."""
        namespace = self.create_data_product_namespace(domain, product_name)
        # Deploy compute resources
        self._deploy_compute_resources(namespace, config)
        # Deploy storage resources
        self._deploy_storage_resources(namespace, config)
        # Deploy networking resources
        self._deploy_networking_resources(namespace, config)
        # Deploy monitoring resources
        self._deploy_monitoring_resources(namespace, config)
        # Register with service mesh
        self._register_with_service_mesh(namespace, product_name, config)
        return {
            "namespace": namespace,
            "status": "deployed",
            "endpoints": self._get_endpoints(namespace)
        }

    def _deploy_compute_resources(self, namespace: str, config: Dict):
        """Deploy compute resources (Deployment)."""
        deployment = client.V1Deployment(
            metadata=client.V1ObjectMeta(
                name=f"{config['product_name']}-api",
                namespace=namespace,
                labels={"app": config['product_name']}
            ),
            spec=client.V1DeploymentSpec(
                replicas=config.get('replicas', 3),
                selector=client.V1LabelSelector(
                    match_labels={"app": config['product_name']}
                ),
                template=client.V1PodTemplateSpec(
                    metadata=client.V1ObjectMeta(
                        labels={"app": config['product_name']}
                    ),
                    spec=client.V1PodSpec(
                        containers=[
                            client.V1Container(
                                name="api",
                                image=config['image'],
                                ports=[client.V1ContainerPort(container_port=8080)],
                                env=[
                                    # Relies on the "data-product-<domain>-<product>" naming scheme
                                    # and on domain names that contain no hyphen
                                    client.V1EnvVar(name="DOMAIN", value=namespace.split("-")[2]),
                                    client.V1EnvVar(name="PRODUCT", value=config['product_name'])
                                ],
                                resources=client.V1ResourceRequirements(
                                    requests={"cpu": "100m", "memory": "256Mi"},
                                    limits={"cpu": "500m", "memory": "512Mi"}
                                )
                            )
                        ]
                    )
                )
            )
        )
        self.apps_api.create_namespaced_deployment(namespace=namespace, body=deployment)

    def _deploy_storage_resources(self, namespace: str, config: Dict):
        """Deploy storage resources (PersistentVolumeClaim)."""
        pvc = client.V1PersistentVolumeClaim(
            metadata=client.V1ObjectMeta(
                name=f"{config['product_name']}-storage",
                namespace=namespace
            ),
            spec=client.V1PersistentVolumeClaimSpec(
                access_modes=["ReadWriteOnce"],
                resources=client.V1ResourceRequirements(
                    requests={"storage": config.get('storage_size', '10Gi')}
                ),
                storage_class_name=config.get('storage_class', 'standard')
            )
        )
        self.core_api.create_namespaced_persistent_volume_claim(
            namespace=namespace, body=pvc
        )

    def _deploy_networking_resources(self, namespace: str, config: Dict):
        """Deploy networking resources (Service)."""
        service = client.V1Service(
            metadata=client.V1ObjectMeta(
                name=f"{config['product_name']}-service",
                namespace=namespace,
                # Label the Service so the ServiceMonitor selector below can find it
                labels={"app": config['product_name']},
                annotations={
                    "mesh.company.com/protocol": config.get('protocol', 'grpc'),
                    "mesh.company.com/version": config.get('version', 'v1')
                }
            ),
            spec=client.V1ServiceSpec(
                selector={"app": config['product_name']},
                ports=[
                    client.V1ServicePort(
                        port=8080,
                        target_port=8080,
                        protocol="TCP"
                    )
                ],
                type="ClusterIP"
            )
        )
        self.core_api.create_namespaced_service(namespace=namespace, body=service)

    def _deploy_monitoring_resources(self, namespace: str, config: Dict):
        """Deploy monitoring resources (ServiceMonitor)."""
        service_monitor = {
            "apiVersion": "monitoring.coreos.com/v1",
            "kind": "ServiceMonitor",
            "metadata": {
                "name": f"{config['product_name']}-monitor",
                "namespace": namespace,
                "labels": {
                    "release": "prometheus"
                }
            },
            "spec": {
                "selector": {
                    "matchLabels": {"app": config['product_name']}
                },
                "endpoints": [
                    {
                        # Assumes the Service exposes a port named "metrics"
                        "port": "metrics",
                        "interval": "30s"
                    }
                ]
            }
        }
        self.custom_api.create_namespaced_custom_object(
            group="monitoring.coreos.com",
            version="v1",
            namespace=namespace,
            plural="servicemonitors",
            body=service_monitor
        )

    def _register_with_service_mesh(self, namespace: str, product_name: str,
                                    config: Dict):
        """Register data product with Istio service mesh."""
        # Must match the Service name created in _deploy_networking_resources
        service_name = f"{product_name}-service"
        virtual_service = {
            "apiVersion": "networking.istio.io/v1beta1",
            "kind": "VirtualService",
            "metadata": {
                "name": f"{product_name}-vs",
                "namespace": namespace
            },
            "spec": {
                "hosts": [f"{service_name}.{namespace}.svc.cluster.local"],
                "http": [
                    {
                        "match": [{"headers": {"x-data-product-version": {"exact": "v1"}}}],
                        "route": [
                            {
                                "destination": {
                                    "host": service_name,
                                    "port": {"number": 8080}
                                }
                            }
                        ]
                    }
                ]
            }
        }
        self.custom_api.create_namespaced_custom_object(
            group="networking.istio.io",
            version="v1beta1",
            namespace=namespace,
            plural="virtualservices",
            body=virtual_service
        )

    def _get_endpoints(self, namespace: str) -> Dict:
        """Get deployed endpoints for the data product."""
        services = self.core_api.list_namespaced_service(namespace)
        endpoints = {}
        for service in services.items:
            endpoints[service.metadata.name] = {
                "cluster_ip": service.spec.cluster_ip,
                "ports": [
                    {"port": p.port, "protocol": p.protocol}
                    for p in service.spec.ports
                ]
            }
        return endpoints
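The platform builds namespace names by concatenating the domain and product name, but Kubernetes only accepts namespace names that are valid RFC 1123 DNS labels: at most 63 characters, lowercase alphanumerics and hyphens, starting and ending with an alphanumeric. The sketch below shows one way to validate a name before calling the API. The helper `build_namespace` and its normalization choices (lowercasing, replacing underscores) are assumptions for illustration, not part of the platform code.

```python
import re

# RFC 1123 label: lowercase alphanumerics and '-', alphanumeric at both ends
_DNS_LABEL = re.compile(r"[a-z0-9]([-a-z0-9]*[a-z0-9])?")
_MAX_LABEL_LENGTH = 63
_PREFIX = "data-product"  # same prefix the platform uses


def build_namespace(domain: str, product_name: str) -> str:
    """Build a namespace name and fail early if Kubernetes would reject it."""
    # Normalization is a convenience assumed here; stricter setups may prefer to reject instead
    name = f"{_PREFIX}-{domain}-{product_name}".lower().replace("_", "-")
    if len(name) > _MAX_LABEL_LENGTH or not _DNS_LABEL.fullmatch(name):
        raise ValueError(f"Invalid Kubernetes namespace name: {name!r}")
    return name


print(build_namespace("customer", "customer-profile"))
# data-product-customer-customer-profile
print(build_namespace("Order", "order_transactions"))
# data-product-order-order-transactions
```

Failing early gives domain teams a readable error instead of an API rejection in the middle of a deployment. Because product names often contain hyphens, the domain is more safely read back from the `mesh.company.com/domain` label than by splitting the namespace string.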
Step 2: Federated Governance Engine
# governance/governance_engine.py
from dataclasses import dataclass, field
from typing import Dict, List, Any, Callable
from enum import Enum
import json
import re
from datetime import datetime
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class PolicyType(Enum):
    ACCESS = "access"
    QUALITY = "quality"
    PRIVACY = "privacy"
    RETENTION = "retention"
    COMPLIANCE = "compliance"


@dataclass
class Policy:
    name: str
    policy_type: PolicyType
    description: str
    rules: List[Dict[str, Any]]
    enforcement: str = "deny"  # deny, warn, audit
    priority: int = 0
    enabled: bool = True
    metadata: Dict = field(default_factory=dict)


@dataclass
class PolicyEvaluation:
    policy_name: str
    allowed: bool
    reason: str
    details: Dict = field(default_factory=dict)
    evaluated_at: datetime = field(default_factory=datetime.utcnow)


class FederatedGovernanceEngine:
    def __init__(self):
        self.policies: Dict[str, Policy] = {}
        self.custom_validators: Dict[str, Callable] = {}
        self.evaluation_cache: Dict[str, PolicyEvaluation] = {}

    def register_policy(self, policy: Policy):
        """Register a governance policy."""
        self.policies[policy.name] = policy
        logger.info(f"Registered policy: {policy.name}")

    def register_custom_validator(self, policy_name: str, validator: Callable):
        """Register a custom validator function for a policy."""
        self.custom_validators[policy_name] = validator

    def evaluate_access_policy(self, context: Dict) -> PolicyEvaluation:
        """Evaluate access control policy."""
        policies = [p for p in self.policies.values()
                    if p.policy_type == PolicyType.ACCESS and p.enabled]
        # Highest-priority policy is evaluated first
        for policy in sorted(policies, key=lambda p: p.priority, reverse=True):
            if self._matches_policy(context, policy):
                # Check custom validator if exists
                if policy.name in self.custom_validators:
                    allowed = self.custom_validators[policy.name](context)
                    return PolicyEvaluation(
                        policy_name=policy.name,
                        allowed=allowed,
                        reason=f"Custom validation: {'passed' if allowed else 'failed'}"
                    )
                # Default rule evaluation
                for rule in policy.rules:
                    if self._evaluate_rule(rule, context):
                        return PolicyEvaluation(
                            policy_name=policy.name,
                            allowed=rule.get('action') == 'allow',
                            reason=rule.get('reason', 'Rule matched')
                        )
        # Default deny if no policy matches
        return PolicyEvaluation(
            policy_name="default",
            allowed=False,
            reason="No matching policy found, default deny"
        )

    def evaluate_quality_policy(self, data_product: Dict,
                                metrics: Dict) -> PolicyEvaluation:
        """Evaluate data quality policy."""
        policies = [p for p in self.policies.values()
                    if p.policy_type == PolicyType.QUALITY and p.enabled]
        violations = []
        for policy in policies:
            for rule in policy.rules:
                metric_name = rule.get('metric')
                threshold = rule.get('threshold')
                operator = rule.get('operator', 'gte')
                # Metrics that were not reported are skipped, not treated as violations
                if metric_name in metrics:
                    actual_value = metrics[metric_name]
                    if operator == 'gte' and actual_value < threshold:
                        violations.append({
                            'policy': policy.name,
                            'metric': metric_name,
                            'expected': f">= {threshold}",
                            'actual': actual_value
                        })
                    elif operator == 'lte' and actual_value > threshold:
                        violations.append({
                            'policy': policy.name,
                            'metric': metric_name,
                            'expected': f"<= {threshold}",
                            'actual': actual_value
                        })
        if violations:
            return PolicyEvaluation(
                policy_name="quality",
                allowed=False,
                reason=f"Quality violations: {len(violations)}",
                details={'violations': violations}
            )
        return PolicyEvaluation(
            policy_name="quality",
            allowed=True,
            reason="All quality checks passed"
        )

    def evaluate_privacy_policy(self, data: Dict,
                                classification: str) -> PolicyEvaluation:
        """Evaluate privacy and data classification policy."""
        policies = [p for p in self.policies.values()
                    if p.policy_type == PolicyType.PRIVACY and p.enabled]
        for policy in policies:
            for rule in policy.rules:
                if rule.get('classification') == classification:
                    # Check required protections
                    required_protections = rule.get('required_protections', [])
                    actual_protections = data.get('applied_protections', [])
                    missing = set(required_protections) - set(actual_protections)
                    if missing:
                        return PolicyEvaluation(
                            policy_name=policy.name,
                            allowed=False,
                            reason=f"Missing privacy protections: {', '.join(missing)}",
                            details={'missing_protections': list(missing)}
                        )
        return PolicyEvaluation(
            policy_name="privacy",
            allowed=True,
            reason="Privacy requirements met"
        )

    def _matches_policy(self, context: Dict, policy: Policy) -> bool:
        """Check if context matches policy scope."""
        scope = policy.metadata.get('scope', {})
        if 'domains' in scope:
            if context.get('domain') not in scope['domains']:
                return False
        if 'roles' in scope:
            if context.get('role') not in scope['roles']:
                return False
        return True

    def _evaluate_rule(self, rule: Dict, context: Dict) -> bool:
        """Evaluate a single policy rule; all conditions must hold."""
        conditions = rule.get('conditions', [])
        for condition in conditions:
            # Named field_name to avoid shadowing dataclasses.field
            field_name = condition.get('field')
            operator = condition.get('operator')
            value = condition.get('value')
            actual_value = context.get(field_name)
            if operator == 'equals' and actual_value != value:
                return False
            elif operator == 'in' and actual_value not in value:
                return False
            elif operator == 'not_in' and actual_value in value:
                return False
        return True

    def generate_policy_report(self) -> Dict:
        """Generate comprehensive policy report."""
        report = {
            'timestamp': datetime.utcnow().isoformat(),
            'total_policies': len(self.policies),
            'policies_by_type': {},
            'evaluation_summary': {
                'total_evaluations': len(self.evaluation_cache),
                'allowed': sum(1 for e in self.evaluation_cache.values() if e.allowed),
                'denied': sum(1 for e in self.evaluation_cache.values() if not e.allowed)
            }
        }
        for policy_type in PolicyType:
            count = sum(1 for p in self.policies.values()
                        if p.policy_type == policy_type)
            report['policies_by_type'][policy_type.value] = count
        return report
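The access evaluation above combines three ideas: rules are lists of conditions that must all hold, policies are tried in descending priority, and anything unmatched is denied. The standalone sketch below reproduces that flow with plain dictionaries so the ordering is easy to follow. The policy names, the `action_requested` context key, and the sample roles are hypothetical and chosen only for this example.

```python
from typing import Any, Dict, List


def rule_matches(rule: Dict[str, Any], context: Dict[str, Any]) -> bool:
    """A rule matches only if every one of its conditions holds."""
    for cond in rule.get("conditions", []):
        actual = context.get(cond["field"])
        op, expected = cond["operator"], cond["value"]
        if op == "equals" and actual != expected:
            return False
        if op == "in" and actual not in expected:
            return False
        if op == "not_in" and actual in expected:
            return False
    return True


def decide(policies: List[Dict[str, Any]], context: Dict[str, Any]) -> bool:
    """Highest-priority matching rule wins; no match means deny."""
    for policy in sorted(policies, key=lambda p: p["priority"], reverse=True):
        for rule in policy["rules"]:
            if rule_matches(rule, context):
                return rule.get("action") == "allow"
    return False


policies = [
    {
        "name": "analyst-read",
        "priority": 1,
        "rules": [{
            "conditions": [
                {"field": "role", "operator": "in", "value": ["data-analyst", "ml-engineer"]},
            ],
            "action": "allow",
        }],
    },
    {
        "name": "block-export",
        "priority": 10,
        "rules": [{
            "conditions": [
                {"field": "action_requested", "operator": "equals", "value": "export"},
                {"field": "role", "operator": "not_in", "value": ["ml-engineer"]},
            ],
            "action": "deny",
        }],
    },
]

print(decide(policies, {"role": "data-analyst", "action_requested": "read"}))    # True
print(decide(policies, {"role": "data-analyst", "action_requested": "export"}))  # False
print(decide(policies, {"role": "ml-engineer", "action_requested": "export"}))   # True
print(decide(policies, {"role": "intern", "action_requested": "read"}))          # False
```

Giving the deny policy the higher priority is what lets a narrow restriction override a broad allow. With the priorities swapped, the analyst export request would be allowed.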
Step 3: Data Product Marketplace
# marketplace/data_marketplace.py
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from datetime import datetime
from enum import Enum
import json
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ProductStatus(Enum):
    ACTIVE = "active"
    DEPRECATED = "deprecated"
    ARCHIVED = "archived"
    DRAFT = "draft"


class AccessTier(Enum):
    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"


@dataclass
class DataProduct:
    product_id: str
    name: str
    domain: str
    owner: str
    description: str
    status: ProductStatus = ProductStatus.DRAFT
    access_tier: AccessTier = AccessTier.INTERNAL
    version: str = "1.0.0"
    schema: Dict = field(default_factory=dict)
    endpoints: List[Dict] = field(default_factory=list)
    quality_metrics: Dict = field(default_factory=dict)
    sla: Dict = field(default_factory=dict)
    tags: List[str] = field(default_factory=list)
    created_at: datetime = field(default_factory=datetime.utcnow)
    updated_at: datetime = field(default_factory=datetime.utcnow)
    documentation_url: str = ""
    contact_email: str = ""

    def to_catalog_entry(self) -> Dict:
        return {
            "product_id": self.product_id,
            "name": self.name,
            "domain": self.domain,
            "owner": self.owner,
            "description": self.description,
            "status": self.status.value,
            "access_tier": self.access_tier.value,
            "version": self.version,
            "tags": self.tags,
            "quality_score": self.quality_metrics.get('score', 0),
            "freshness_sla": self.sla.get('freshness', 'N/A'),
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat()
        }


@dataclass
class AccessRequest:
    request_id: str
    product_id: str
    requester: str
    requester_email: str
    business_justification: str
    requested_permissions: List[str]
    status: str = "pending"
    reviewed_by: Optional[str] = None
    reviewed_at: Optional[datetime] = None
    created_at: datetime = field(default_factory=datetime.utcnow)


class DataMarketplace:
    def __init__(self):
        self.products: Dict[str, DataProduct] = {}
        self.access_requests: Dict[str, AccessRequest] = {}
        self.usage_metrics: Dict[str, List[Dict]] = {}
        # Inverted index: word -> list of product IDs
        self.search_index: Dict[str, List[str]] = {}

    def register_product(self, product: DataProduct) -> str:
        """Register a new data product in the marketplace."""
        if product.product_id in self.products:
            raise ValueError(f"Product {product.product_id} already exists")
        self.products[product.product_id] = product
        self._update_search_index(product)
        logger.info(f"Registered data product: {product.name}")
        return product.product_id

    def update_product(self, product_id: str, updates: Dict):
        """Update an existing data product."""
        if product_id not in self.products:
            raise ValueError(f"Product {product_id} not found")
        product = self.products[product_id]
        # Unknown keys are ignored rather than added as new attributes
        for key, value in updates.items():
            if hasattr(product, key):
                setattr(product, key, value)
        product.updated_at = datetime.utcnow()
        self._update_search_index(product)

    def search_products(self, query: str = "",
                        filters: Optional[Dict] = None) -> List[Dict]:
        """Search for active data products, best quality score first."""
        results = []
        for product in self.products.values():
            if product.status != ProductStatus.ACTIVE:
                continue
            # Apply filters
            if filters:
                if 'domain' in filters and product.domain != filters['domain']:
                    continue
                if 'access_tier' in filters and product.access_tier.value != filters['access_tier']:
                    continue
                if 'tags' in filters:
                    if not set(filters['tags']).intersection(set(product.tags)):
                        continue
            # Apply text search (case-insensitive substring match)
            if query:
                searchable_text = f"{product.name} {product.description} {' '.join(product.tags)}".lower()
                if query.lower() not in searchable_text:
                    continue
            results.append(product.to_catalog_entry())
        return sorted(results, key=lambda x: x.get('quality_score', 0), reverse=True)

    def request_access(self, product_id: str, requester: str,
                       requester_email: str, justification: str,
                       permissions: List[str]) -> AccessRequest:
        """Request access to a data product."""
        if product_id not in self.products:
            raise ValueError(f"Product {product_id} not found")
        product = self.products[product_id]
        # Restricted products cannot be requested through the self-service flow
        if product.access_tier == AccessTier.RESTRICTED:
            raise PermissionError("Restricted products require special approval")
        request = AccessRequest(
            request_id=f"REQ-{len(self.access_requests) + 1:06d}",
            product_id=product_id,
            requester=requester,
            requester_email=requester_email,
            business_justification=justification,
            requested_permissions=permissions
        )
        self.access_requests[request.request_id] = request
        # Auto-approve for public/internal products; confidential ones stay pending
        if product.access_tier in [AccessTier.PUBLIC, AccessTier.INTERNAL]:
            request.status = "approved"
            request.reviewed_by = "system"
            request.reviewed_at = datetime.utcnow()
        logger.info(f"Access request created: {request.request_id}")
        return request

    def approve_access_request(self, request_id: str,
                               reviewer: str) -> AccessRequest:
        """Approve an access request."""
        if request_id not in self.access_requests:
            raise ValueError(f"Request {request_id} not found")
        request = self.access_requests[request_id]
        request.status = "approved"
        request.reviewed_by = reviewer
        request.reviewed_at = datetime.utcnow()
        logger.info(f"Access request approved: {request_id}")
        return request

    def get_product_usage_metrics(self, product_id: str) -> Dict:
        """Get usage metrics for a data product."""
        metrics = self.usage_metrics.get(product_id)
        # A missing key and an empty list are handled alike to avoid dividing by zero
        if not metrics:
            return {"error": "No usage data available"}
        now = datetime.utcnow()
        return {
            "total_queries": sum(m.get('queries', 0) for m in metrics),
            "unique_users": len(set(m.get('user_id') for m in metrics)),
            "avg_response_time_ms": sum(m.get('response_time_ms', 0) for m in metrics) / len(metrics),
            "last_30_days": [
                m for m in metrics
                if (now - datetime.fromisoformat(m.get('timestamp', now.isoformat()))).days <= 30
            ]
        }

    def _update_search_index(self, product: DataProduct):
        """Update search index for a product."""
        searchable_text = f"{product.name} {product.description} {' '.join(product.tags)}".lower()
        words = set(searchable_text.split())
        for word in words:
            if word not in self.search_index:
                self.search_index[word] = []
            if product.product_id not in self.search_index[word]:
                self.search_index[word].append(product.product_id)

    def generate_marketplace_report(self) -> Dict:
        """Generate marketplace analytics report."""
        report = {
            'timestamp': datetime.utcnow().isoformat(),
            'summary': {
                'total_products': len(self.products),
                'active_products': sum(1 for p in self.products.values()
                                       if p.status == ProductStatus.ACTIVE),
                'total_access_requests': len(self.access_requests),
                'pending_requests': sum(1 for r in self.access_requests.values()
                                        if r.status == 'pending')
            },
            'by_domain': {},
            'by_access_tier': {},
            'by_status': {}
        }
        for product in self.products.values():
            # By domain
            if product.domain not in report['by_domain']:
                report['by_domain'][product.domain] = 0
            report['by_domain'][product.domain] += 1
            # By access tier
            tier = product.access_tier.value
            if tier not in report['by_access_tier']:
                report['by_access_tier'][tier] = 0
            report['by_access_tier'][tier] += 1
            # By status
            status = product.status.value
            if status not in report['by_status']:
                report['by_status'][status] = 0
            report['by_status'][status] += 1
        return report
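The marketplace maintains `search_index`, an inverted index from words to product IDs, but `search_products` scans every product with a substring match instead of consulting it. As a rough sketch of how the index could answer a query directly, the function below intersects the posting lists of all query terms. The name `lookup` and the sample index contents are assumptions made for this illustration.

```python
from typing import Dict, List, Set


def lookup(search_index: Dict[str, List[str]], query: str) -> Set[str]:
    """Return product IDs whose indexed words contain every query term (AND semantics)."""
    terms = query.lower().split()
    if not terms:
        return set()
    # One set of product IDs per term; unknown terms contribute an empty set
    postings = [set(search_index.get(term, [])) for term in terms]
    return set.intersection(*postings)


# Hypothetical index in the shape produced by _update_search_index
search_index = {
    "customer": ["customer-360", "customer-profile"],
    "analytics": ["customer-360"],
    "catalog": ["product-catalog"],
}

print(lookup(search_index, "customer analytics"))  # {'customer-360'}
print(lookup(search_index, "orders"))              # set()
```

Unlike the substring scan, this matches whole words only, so a query for "custom" would find nothing. Which behavior is preferable depends on how users search; the index also needs stale entries removed when a product's name, description, or tags change.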
Infrastructure Setup (Terraform)
# infrastructure/multi_cloud_data_mesh.tf
terraform {
  required_version = ">= 1.5.0"
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
    google = {
      source  = "hashicorp/google"
      version = "~> 4.0"
    }
    azurerm = {
      source  = "hashicorp/azurerm"
      version = "~> 3.0"
    }
    kubernetes = {
      source  = "hashicorp/kubernetes"
      version = "~> 2.0"
    }
  }
}

# AWS Provider
provider "aws" {
  region = "us-east-1"
  default_tags {
    tags = {
      Project     = "data-mesh"
      Environment = "production"
      ManagedBy   = "terraform"
    }
  }
}

# GCP Provider
provider "google" {
  project = "data-mesh-project"
  region  = "us-central1"
}

# Azure Provider
provider "azurerm" {
  features {}
  subscription_id = var.azure_subscription_id
  tenant_id       = var.azure_tenant_id
}

# Kubernetes Cluster (Multi-Cloud Federation)
module "eks_cluster" {
  source          = "terraform-aws-modules/eks/aws"
  version         = "19.0.0"
  cluster_name    = "data-mesh-eks"
  cluster_version = "1.28"
  vpc_id          = module.vpc.vpc_id
  subnet_ids      = module.vpc.private_subnets
  eks_managed_node_groups = {
    general = {
      instance_types = ["m5.xlarge"]
      min_size       = 3
      max_size       = 10
      desired_size   = 5
    }
  }
}

module "gke_cluster" {
  source     = "terraform-google-modules/kubernetes-engine/google"
  version    = "26.0"
  project_id = "data-mesh-project"
  name       = "data-mesh-gke"
  region     = "us-central1"
  network    = module.vpc_gcp.network_name
  subnetwork = module.vpc_gcp.subnets_names[0]
  node_pools = [
    {
      name         = "default"
      machine_type = "e2-standard-4"
      min_count    = 2
      max_count    = 10
    }
  ]
}

module "aks_cluster" {
  source              = "Azure/aks/azurerm"
  version             = "7.0"
  resource_group_name = azurerm_resource_group.data_mesh.name
  cluster_name        = "data-mesh-aks"
  kubernetes_version  = "1.28"
  default_node_pool = {
    vm_size    = "Standard_D4s_v3"
    node_count = 3
  }
}

# VPC Peering between clouds (using VPN)
resource "aws_vpn_gateway" "data_mesh" {
  vpc_id = module.vpc.vpc_id
  tags = {
    Name = "data-mesh-vpn"
  }
}

# Cross-cloud service mesh (Istio)
resource "helm_release" "istio" {
  name       = "istio"
  repository = "https://istio-release.storage.googleapis.com/charts"
  chart      = "istiod"
  version    = "1.20.0"
  namespace  = "istio-system"
  set {
    name  = "meshConfig.enableAutoMtls"
    value = "true"
  }
  set {
    name  = "meshConfig.defaultConfig.holdApplicationUntilProxyStarts"
    value = "true"
  }
}

# DataHub for metadata management
resource "helm_release" "datahub" {
  name       = "datahub"
  repository = "https://helm.datahubproject.io/"
  chart      = "datahub"
  version    = "0.2.0"
  namespace  = "datahub"
  values = [
    templatefile("${path.module}/values/datahub.yaml", {
      postgres_host          = module.rds.endpoint
      kafka_brokers          = module.msk.bootstrap_brokers
      elasticsearch_endpoint = module.elasticsearch.endpoint
    })
  ]
  depends_on = [
    module.rds,
    module.msk,
    module.elasticsearch
  ]
}

# Open Policy Agent (Gatekeeper) for governance
resource "helm_release" "opa" {
  name       = "opa"
  # The gatekeeper chart is published from the Gatekeeper project's own chart repository
  repository = "https://open-policy-agent.github.io/gatekeeper/charts"
  chart      = "gatekeeper"
  version    = "3.14.0"
  namespace  = "gatekeeper-system"
  set {
    name  = "replicas"
    value = "3"
  }
}

# Cross-cloud DNS
resource "aws_route53_zone" "data_mesh" {
  name = "data-mesh.internal"
  vpc {
    vpc_id = module.vpc.vpc_id
  }
}

# Outputs
output "eks_cluster_endpoint" {
  description = "EKS cluster endpoint"
  value       = module.eks_cluster.cluster_endpoint
}

output "gke_cluster_endpoint" {
  description = "GKE cluster endpoint"
  value       = module.gke_cluster.endpoint
}

output "aks_cluster_endpoint" {
  description = "AKS cluster endpoint"
  value       = module.aks_cluster.endpoint
}

output "datahub_endpoint" {
  description = "DataHub endpoint"
  value       = "http://datahub.datahub.svc.cluster.local:9002"
}
Testing and Validation
# tests/test_data_mesh.py
import pytest
from datetime import datetime
from unittest.mock import Mock, patch
class TestDataMesh:
@pytest.fixture
def data_product(self):
from marketplace.data_marketplace import DataProduct, ProductStatus, AccessTier
return DataProduct(
product_id="customer-360",
name="Customer 360 View",
domain="analytics",
owner="analytics-team@company.com",
description="Unified customer view across all touchpoints",
status=ProductStatus.ACTIVE,
access_tier=AccessTier.INTERNAL,
tags=["customer", "analytics", "360-view"],
sla={"freshness": "1h", "quality": 0.95}
)
@pytest.fixture
def governance_engine(self):
from governance.governance_engine import FederatedGovernanceEngine
return FederatedGovernanceEngine()

    def test_product_registration(self, data_product):
        """Test data product registration in marketplace."""
        from marketplace.data_marketplace import DataMarketplace
        marketplace = DataMarketplace()
        product_id = marketplace.register_product(data_product)
        assert product_id == data_product.product_id
        assert len(marketplace.products) == 1

    def test_product_search(self, data_product):
        """Test data product search functionality."""
        from marketplace.data_marketplace import DataMarketplace
        marketplace = DataMarketplace()
        marketplace.register_product(data_product)
        # Search by text
        results = marketplace.search_products(query="customer")
        assert len(results) == 1
        # Search by domain
        results = marketplace.search_products(filters={"domain": "analytics"})
        assert len(results) == 1
        # Search with no results
        results = marketplace.search_products(query="nonexistent")
        assert len(results) == 0

    def test_access_request_flow(self, data_product):
        """Test access request and approval flow."""
        from marketplace.data_marketplace import DataMarketplace
        marketplace = DataMarketplace()
        marketplace.register_product(data_product)
        # Request access
        request = marketplace.request_access(
            product_id="customer-360",
            requester="data-analyst",
            requester_email="analyst@company.com",
            justification="Need for quarterly analysis",
            permissions=["read"]
        )
        assert request.status == "approved"  # Auto-approved for internal
        # Request access to restricted product
        data_product.access_tier = "restricted"
        marketplace.register_product(data_product)
        request = marketplace.request_access(
            product_id="customer-360",
            requester="external-user",
            requester_email="external@partner.com",
            justification="Partner analysis",
            permissions=["read"]
        )
        assert request.status == "pending"

    def test_policy_evaluation(self, governance_engine):
        """Test governance policy evaluation."""
        from governance.governance_engine import Policy, PolicyType
        # Register access policy
        policy = Policy(
            name="analyst-access",
            policy_type=PolicyType.ACCESS,
            description="Allow analysts to read customer data",
            rules=[
                {
                    "conditions": [
                        {"field": "role", "operator": "equals", "value": "data-analyst"},
                        {"field": "domain", "operator": "equals", "value": "customer"}
                    ],
                    "action": "allow",
                    "reason": "Analysts can read customer data"
                }
            ]
        )
        governance_engine.register_policy(policy)
        # Evaluate access
        context = {"role": "data-analyst", "domain": "customer", "resource": "profiles"}
        result = governance_engine.evaluate_access_policy(context)
        assert result.allowed
        # Evaluate unauthorized access
        context = {"role": "external-user", "domain": "customer", "resource": "profiles"}
        result = governance_engine.evaluate_access_policy(context)
        assert not result.allowed

    def test_schema_compatibility(self):
        """Test schema compatibility checking."""
        from schema_registry.registry import (
            SchemaRegistry, DataProductSchema, SchemaField, CompatibilityMode
        )
        registry = SchemaRegistry()
        # Register v1 schema
        v1_schema = DataProductSchema(
            name="customer",
            version="1.0.0",
            schema_type="avro",
            fields=[
                SchemaField(name="id", type="string", nullable=False),
                SchemaField(name="email", type="string", nullable=False)
            ],
            compatibility=CompatibilityMode.BACKWARD
        )
        registry.register_schema(v1_schema)
        # Register compatible v2 schema (adding optional field)
        v2_schema = DataProductSchema(
            name="customer",
            version="2.0.0",
            schema_type="avro",
            fields=[
                SchemaField(name="id", type="string", nullable=False),
                SchemaField(name="email", type="string", nullable=False),
                SchemaField(name="phone", type="string", nullable=True)  # Optional
            ],
            compatibility=CompatibilityMode.BACKWARD
        )
        fingerprint = registry.register_schema(v2_schema)
        assert fingerprint is not None
        # Try to register incompatible schema
        v3_schema = DataProductSchema(
            name="customer",
            version="3.0.0",
            schema_type="avro",
            fields=[
                SchemaField(name="id", type="integer", nullable=False),  # Type changed
                SchemaField(name="email", type="string", nullable=False)
            ],
            compatibility=CompatibilityMode.BACKWARD
        )
        with pytest.raises(ValueError):
            registry.register_schema(v3_schema)

    def test_marketplace_metrics(self, data_product):
        """Test marketplace usage metrics."""
        from marketplace.data_marketplace import DataMarketplace
        marketplace = DataMarketplace()
        marketplace.register_product(data_product)
        # Simulate usage
        marketplace.usage_metrics["customer-360"] = [
            {
                "timestamp": datetime.utcnow().isoformat(),
                "user_id": "user1",
                "queries": 10,
                "response_time_ms": 50
            },
            {
                "timestamp": datetime.utcnow().isoformat(),
                "user_id": "user2",
                "queries": 5,
                "response_time_ms": 75
            }
        ]
        metrics = marketplace.get_product_usage_metrics("customer-360")
        assert metrics["total_queries"] == 15
        assert metrics["unique_users"] == 2

    def test_cross_domain_discovery(self):
        """Test cross-domain data product discovery."""
        from marketplace.data_marketplace import DataMarketplace, DataProduct
        marketplace = DataMarketplace()
        # Register products from different domains
        customer_product = DataProduct(
            product_id="customer-profile",
            name="Customer Profile",
            domain="customer",
            owner="customer-team@company.com",
            description="Customer master data",
            tags=["customer", "profile"]
        )
        order_product = DataProduct(
            product_id="order-history",
            name="Order History",
            domain="order",
            owner="order-team@company.com",
            description="Historical order data",
            tags=["order", "history"]
        )
        marketplace.register_product(customer_product)
        marketplace.register_product(order_product)
        # Search across domains
        results = marketplace.search_products(query="customer")
        assert len(results) == 1
        assert results[0]["domain"] == "customer"
        # Get marketplace report
        report = marketplace.generate_marketplace_report()
        assert report["summary"]["total_products"] == 2
        assert "customer" in report["by_domain"]
        assert "order" in report["by_domain"]
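The schema compatibility test above expects two behaviors: adding a nullable field is accepted, and changing an existing field's type is rejected. A minimal sketch of such a check follows. It assumes fields are plain `(type, nullable)` pairs and that removing a field is allowed, as in Avro-style backward compatibility. `backward_compat_violations` is a hypothetical helper for illustration, not the `SchemaRegistry` implementation the tests exercise.

```python
from typing import Dict, List, Tuple

# Hypothetical field description: field name -> (type, nullable).
Fields = Dict[str, Tuple[str, bool]]


def backward_compat_violations(old: Fields, new: Fields) -> List[str]:
    """Return the reasons why `new` is not backward compatible with `old`.

    Assumed rules (a simplification of what a real registry checks):
      * a field present in both versions must keep its type
      * a newly added field must be nullable (optional)
      * removing a field is allowed
    """
    violations = []
    for name, (new_type, nullable) in new.items():
        if name in old:
            old_type = old[name][0]
            if new_type != old_type:
                violations.append(
                    f"field '{name}' changed type {old_type} -> {new_type}"
                )
        elif not nullable:
            violations.append(f"new field '{name}' must be nullable")
    return violations


v1 = {"id": ("string", False), "email": ("string", False)}
v2 = {**v1, "phone": ("string", True)}                        # adds an optional field
v3 = {"id": ("integer", False), "email": ("string", False)}   # changes a type

print(backward_compat_violations(v1, v2))
print(backward_compat_violations(v2, v3))
```

A registry built on this idea would raise an error (the tests expect `ValueError`) whenever the returned list is non-empty.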
Cost Analysis
Monthly Cost Breakdown (Production)
| Component | Specification | Monthly Cost |
|---|---|---|
| EKS Cluster | 5 nodes, m5.xlarge | $800 |
| GKE Cluster | 5 nodes, e2-standard-4 | $750 |
| AKS Cluster | 5 nodes, Standard_D4s_v3 | $700 |
| DataHub | 3 replicas | $300 |
| OPA/Gatekeeper | 3 replicas | $200 |
| Cross-Cloud VPN | Site-to-site | $150 |
| Data Transfer | Cross-cloud | $500 |
| Total | | $3,400 |
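As a quick sanity check, the total can be recomputed from the line items. The sketch below uses only the figures from the table; grouping VPN and data transfer as "cross-cloud networking" is an illustrative choice, not something the table itself defines.

```python
# Monthly figures (USD) copied from the cost table above.
monthly_costs_usd = {
    "EKS Cluster": 800,
    "GKE Cluster": 750,
    "AKS Cluster": 700,
    "DataHub": 300,
    "OPA/Gatekeeper": 200,
    "Cross-Cloud VPN": 150,
    "Data Transfer": 500,
}

total = sum(monthly_costs_usd.values())
print(f"Total: ${total:,}")  # Total: $3,400

# Share of each line item, largest first.
for component, cost in sorted(
    monthly_costs_usd.items(), key=lambda item: item[1], reverse=True
):
    print(f"{component:<16} ${cost:>5,}  {cost / total:6.1%}")

# Assumed grouping: costs that exist only because the mesh spans clouds.
cross_cloud = monthly_costs_usd["Cross-Cloud VPN"] + monthly_costs_usd["Data Transfer"]
print(f"Cross-cloud networking: ${cross_cloud:,} ({cross_cloud / total:.1%} of total)")
```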
Cost Optimization Strategies
💡
Tip: Optimize multi-cloud data mesh costs:
- Right-sizing: Monitor and adjust instance types
- Spot Instances: Use for non-critical workloads
- Reserved Instances: 1-year commitments can cut compute costs by up to roughly 40%, depending on provider and instance type
- Data Egress: Minimize cross-cloud data transfer
- Auto-scaling: Scale based on actual usage
Performance Metrics
| Metric | Before Data Mesh | After Data Mesh | Improvement |
|---|---|---|---|
| Data Discovery Time | 2 weeks | 5 minutes | 4032x faster |
| Data Access Provisioning | 1 week | 1 hour | 168x faster |
| Data Quality Issues | 100/month | 10/month | 10x reduction |
| Cross-Domain Analytics | 3 months | 1 week | 12x faster |
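The improvement factors in the table are simple ratios of the before and after values. A small sketch of the arithmetic, assuming "3 months" means 12 weeks:

```python
from datetime import timedelta

# Before/after durations taken from the table above.
# Assumption: "3 months" is approximated as 12 weeks.
durations = {
    "Data Discovery Time": (timedelta(weeks=2), timedelta(minutes=5)),
    "Data Access Provisioning": (timedelta(weeks=1), timedelta(hours=1)),
    "Cross-Domain Analytics": (timedelta(weeks=12), timedelta(weeks=1)),
}


def speedup(before: timedelta, after: timedelta) -> float:
    """Ratio of the old duration to the new one."""
    return before / after


for metric, (before, after) in durations.items():
    print(f"{metric}: {speedup(before, after):,.0f}x faster")

# Quality issues are counts per month rather than durations.
quality_reduction = 100 / 10
print(f"Data Quality Issues: {quality_reduction:.0f}x reduction")
```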
Interview Talking Points
Architecture Decisions
ℹ️
Best Practice: Focus on these data mesh concepts in interviews:
- Why Data Mesh over Centralized Data Lake?
  - Eliminates data team bottleneck
  - Domain expertise preserved
  - Scalable governance
  - Faster time-to-value
- Why Federated Governance?
  - Consistent policies across domains
  - Local ownership with global standards
  - Automated compliance
  - Audit trail for all access
- Why Self-Serve Platform?
  - Domain autonomy
  - Reduced operational overhead
  - Standardized infrastructure
  - Cost transparency
Common Interview Questions
Q: "How do you handle cross-domain queries in a data mesh?"
cross_domain_strategies = {
    "Data Products": "Expose cross-domain data as new data products",
    "Event Streaming": "Kafka topics for real-time cross-domain events",
    "Federated Queries": "Trino/Presto for distributed SQL queries",
    "Materialized Views": "Pre-computed cross-domain aggregations"
}
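To make the "Materialized Views" strategy concrete, here is a minimal in-memory sketch that pre-computes revenue per customer segment from two domain outputs. The record shapes, the sample data and the `build_revenue_by_segment` helper are assumptions for illustration; in practice the inputs would come from the customer and order data products, and the result would be published as a data product of its own.

```python
from collections import defaultdict
from typing import Dict, List

# Hypothetical sample output of the customer domain's data product.
customers = [
    {"customer_id": "c1", "segment": "VIP"},
    {"customer_id": "c2", "segment": "New"},
]

# Hypothetical sample output of the order domain's data product.
orders = [
    {"order_id": "o1", "customer_id": "c1", "amount": 120.0},
    {"order_id": "o2", "customer_id": "c1", "amount": 80.0},
    {"order_id": "o3", "customer_id": "c2", "amount": 30.0},
    {"order_id": "o4", "customer_id": "c9", "amount": 15.0},  # unknown customer
]


def build_revenue_by_segment(
    customers: List[dict], orders: List[dict]
) -> Dict[str, float]:
    """Join orders to customers on customer_id and sum amounts per segment."""
    segment_of = {c["customer_id"]: c["segment"] for c in customers}
    totals: Dict[str, float] = defaultdict(float)
    for order in orders:
        segment = segment_of.get(order["customer_id"])
        if segment is None:
            continue  # skip orders whose customer the customer domain does not know
        totals[segment] += order["amount"]
    return dict(totals)


revenue_by_segment = build_revenue_by_segment(customers, orders)
print(revenue_by_segment)
```

Consumers then read the pre-computed aggregate instead of querying both source domains on every request.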
Q: "How do you ensure data quality across autonomous domains?"
quality_framework = {
    "SLAs": "Domain-specific quality SLAs with monitoring",
    "Contracts": "Schema contracts between producers and consumers",
    "Testing": "Automated quality tests in CI/CD pipelines",
    "Observability": "Centralized quality dashboards and alerting"
}
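The "Contracts" and "SLAs" entries can be combined into an automated check: validate each record against the schema contract, then compare the share of valid records with the quality SLA. The sketch below borrows the field rules and the 0.99 target from the `customer-profile` definition earlier in this project. The `CONTRACT` layout and both helper functions are hypothetical, and treating `segment` and `lifetime_value` as nullable is an assumption.

```python
from typing import Any, Dict, List

# Hypothetical contract layout, with rules taken from the customer-profile schema.
CONTRACT: Dict[str, Dict[str, Any]] = {
    "customer_id": {"nullable": False},
    "segment": {"nullable": True, "enum": {"VIP", "Premium", "Regular", "New"}},
    "lifetime_value": {"nullable": True, "min": 0, "max": 1_000_000},
}
QUALITY_SLA = 0.99  # minimum share of valid records


def record_is_valid(record: Dict[str, Any]) -> bool:
    """Check one record against every field rule in the contract."""
    for field, rule in CONTRACT.items():
        value = record.get(field)
        if value is None:
            if not rule["nullable"]:
                return False
            continue
        if "enum" in rule and value not in rule["enum"]:
            return False
        if "min" in rule and value < rule["min"]:
            return False
        if "max" in rule and value > rule["max"]:
            return False
    return True


def quality_score(records: List[Dict[str, Any]]) -> float:
    """Share of records that satisfy the contract (1.0 for an empty batch)."""
    if not records:
        return 1.0
    return sum(record_is_valid(r) for r in records) / len(records)


batch = [
    {"customer_id": "c1", "segment": "VIP", "lifetime_value": 5200.0},
    {"customer_id": "c2", "segment": "New", "lifetime_value": 0},
    {"customer_id": "c3", "segment": None, "lifetime_value": None},
    {"customer_id": None, "segment": "Gold", "lifetime_value": -5},  # violates the contract
]

score = quality_score(batch)
meets_sla = score >= QUALITY_SLA
print(f"quality score: {score:.2f}, meets SLA: {meets_sla}")
```

Running a check like this in the producer's CI/CD pipeline would block a release whose sample batch falls below the SLA.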
Q: "How do you manage costs in a multi-cloud data mesh?"
cost_management = {
    "Chargeback": "Domain-level cost attribution",
    "Budgets": "Automated budget alerts and enforcement",
    "Optimization": "Right-sizing recommendations",
    "Governance": "Cost-aware resource provisioning"
}
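One way to implement the "Chargeback" and "Budgets" entries is to split shared platform costs across domains in proportion to usage, then flag any domain whose attributed cost exceeds its budget. In the sketch below, the $500 shared cost is the DataHub plus OPA/Gatekeeper lines from the cost table. The usage figures, the budgets and both helper functions are hypothetical.

```python
from typing import Dict, List


def allocate_shared_cost(
    shared_cost: float, usage_by_domain: Dict[str, float]
) -> Dict[str, float]:
    """Split a shared cost across domains in proportion to their usage."""
    total_usage = sum(usage_by_domain.values())
    if total_usage == 0:
        # No usage recorded: fall back to an even split.
        even_share = shared_cost / len(usage_by_domain)
        return {domain: even_share for domain in usage_by_domain}
    return {
        domain: shared_cost * usage / total_usage
        for domain, usage in usage_by_domain.items()
    }


def over_budget(
    costs: Dict[str, float], budgets: Dict[str, float]
) -> List[str]:
    """Return the domains whose attributed cost exceeds their budget."""
    return sorted(d for d, cost in costs.items() if cost > budgets.get(d, 0.0))


shared_platform_cost = 300 + 200  # DataHub + OPA/Gatekeeper, USD per month

# Hypothetical usage signal, e.g. thousands of catalog queries per month.
usage = {"customer": 50, "order": 30, "analytics": 20}
# Hypothetical monthly budgets for shared platform costs, USD.
budgets = {"customer": 200, "order": 200, "analytics": 200}

attributed = allocate_shared_cost(shared_platform_cost, usage)
print(attributed)
print("Over budget:", over_budget(attributed, budgets))
```

Usage-proportional allocation is only one option; an even split or a fixed platform fee per domain are common alternatives when usage data is unreliable.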
Deployment Checklist
- Set up multi-cloud Kubernetes clusters
- Configure cross-cloud networking (VPN/peering)
- Deploy service mesh (Istio) across clusters
- Set up DataHub for metadata management
- Configure OPA for governance policies
- Deploy self-serve platform for domain teams
- Establish domain ownership and SLAs
- Set up monitoring and alerting
- Document data product templates
- Train domain teams on self-serve tools
⚠️
Warning: Data mesh requires significant organizational change. Start with a pilot domain before full rollout. Ensure executive sponsorship and domain team buy-in.
This project demonstrates advanced data architecture skills and is highly relevant for data platform and architecture interviews at large enterprises.