Data Governance: Lineage, Cataloging, Access Control
Managing data assets, compliance, and security at scale
Interview Question
"Design a data governance framework that: (1) tracks data lineage from source to consumption, (2) catalogs all data assets, (3) implements role-based access control, (4) ensures GDPR compliance, (5) monitors data usage. How do you balance governance with agility?"
Difficulty: Hard | Frequently asked at Google, Microsoft, Amazon, Meta
Theoretical Foundation
What is Data Governance?
Data governance is the set of policies, processes, and roles used to manage the availability, usability, integrity, and security of data.
Data Governance Framework
Components:
1. Data Lineage: Where data comes from and goes
2. Data Catalog: What data exists and its metadata
3. Access Control: Who can access what
4. Data Quality: Is data reliable
5. Compliance: Does data follow regulations
6. Data Lifecycle: How long to keep data
Stakeholders:
- Data Engineers: Build pipelines, implement governance
- Data Analysts: Consume data, follow policies
- Data Scientists: ML models, feature engineering
- Compliance Officers: Regulatory requirements
- Business Users: Data consumers
Data Lineage
Data lineage tracks the journey of data from source to destination.
Lineage metadata:
- Source system and table
- Transformation logic
- Downstream dependencies
- Timestamp of last update
- Data quality metrics
Data Catalog
A data catalog is a metadata repository that inventories data assets.
Access Control Models
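1. Role-Based Access Control (RBAC)
Permissions are granted to roles, and users receive permissions through the roles assigned to them.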
2. Attribute-Based Access Control (ABAC)
Policy: IF (user.department == "marketing" AND
            data.classification == "confidential" AND
            time.hour >= 9 AND time.hour <= 17)
        THEN ALLOW
Attributes:
- User: role, department, clearance level
- Resource: classification, owner, domain
- Environment: time, location, device
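The ABAC policy above can be sketched as a predicate over request attributes. This is a minimal illustration, not a production policy engine: the `AccessRequest` class, its field names, and the default-deny combination rule are assumptions made for the example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AccessRequest:
    """Attributes of one access attempt (field names are illustrative)."""
    user_department: str
    data_classification: str
    hour: int  # local hour of the request, 0-23


def marketing_confidential_policy(req: AccessRequest) -> bool:
    """Same conditions as the policy above: marketing department,
    confidential data, and an hour between 9 and 17 inclusive."""
    return (
        req.user_department == "marketing"
        and req.data_classification == "confidential"
        and 9 <= req.hour <= 17
    )


def is_allowed(req: AccessRequest, policies) -> bool:
    """Default deny: allow only if at least one policy matches."""
    return any(policy(req) for policy in policies)


POLICIES = [marketing_confidential_policy]

in_hours = AccessRequest("marketing", "confidential", hour=10)
after_hours = AccessRequest("marketing", "confidential", hour=20)
other_team = AccessRequest("finance", "confidential", hour=10)

print(is_allowed(in_hours, POLICIES))     # True
print(is_allowed(after_hours, POLICIES))  # False
print(is_allowed(other_team, POLICIES))   # False
```

Keeping each policy as a small function makes it easy to unit test, and the default-deny rule means a missing policy never grants access by accident.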
3. Data Masking
Field    Original       Masked
SSN      123-45-6789    ***-**-6789
Email    j@co.com       j@***.com
Name     John Doe       J*** D**
Phone    555-1234       ***-1234
Masking Types:
- Static: Data is masked permanently in a stored copy, so every reader sees the same masked values
- Dynamic: Masking is applied at query time and varies by user role
- Partial: Show some characters (e.g., last 4 of SSN)
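The masking examples above can be reproduced with a few small functions. This is a sketch under assumptions: the function names, the `MASKERS` registry, and the `compliance_officer` role are hypothetical, and the input formats are assumed to match the sample values shown.

```python
def mask_ssn(ssn: str) -> str:
    """Partial masking: keep only the last four digits."""
    return "***-**-" + ssn[-4:]


def mask_email(email: str) -> str:
    """Keep the local part and the top-level domain, hide the domain name."""
    local, _, domain = email.partition("@")
    tld = domain.rsplit(".", 1)[-1]
    return f"{local}@***.{tld}"


def mask_name(name: str) -> str:
    """Keep the first letter of each word and mask the remaining letters."""
    return " ".join(word[0] + "*" * (len(word) - 1) for word in name.split())


def mask_phone(phone: str) -> str:
    """Keep only the last four digits."""
    return "***-" + phone[-4:]


# Hypothetical registry of column name -> masking function.
MASKERS = {"ssn": mask_ssn, "email": mask_email, "name": mask_name, "phone": mask_phone}

# Hypothetical roles that may see unmasked values.
UNMASKED_ROLES = {"compliance_officer"}


def apply_dynamic_masking(record: dict, role: str) -> dict:
    """Dynamic masking: the same record is returned masked or unmasked
    depending on the caller's role."""
    if role in UNMASKED_ROLES:
        return dict(record)
    return {key: MASKERS[key](value) if key in MASKERS else value
            for key, value in record.items()}


row = {"ssn": "123-45-6789", "email": "j@co.com", "name": "John Doe",
       "phone": "555-1234", "order_count": 3}

print(apply_dynamic_masking(row, "data_analyst"))
print(apply_dynamic_masking(row, "compliance_officer"))
```

Columns without a registered masker (here `order_count`) pass through unchanged, so the registry doubles as the list of sensitive columns.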
GDPR Compliance
GDPR Requirements:
1. Right to Access: Data subjects can request a copy of their data
2. Right to Erasure: Data subjects can request deletion
3. Right to Rectification: Data subjects can have inaccurate data corrected
4. Right to Portability: Data subjects can export their data
5. Lawful Basis: Processing requires a legal basis, such as consent
6. Data Minimization: Only collect what's needed
7. Privacy by Design: Build privacy into systems
Implementation:
- Data inventory: Know what PII you have
- Consent management: Track user consent
- Data retention: Automatically delete data past its retention period
- Encryption: Protect data at rest and in transit
- Audit logs: Track who accessed what
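Consent management from the list above can be sketched as an append-only ledger in which the most recent decision per user and purpose wins. The class names, the purpose strings, and the rule that missing consent means "no" are assumptions for illustration, not a statement of what the regulation requires in every case.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional


@dataclass(frozen=True)
class ConsentEvent:
    user_id: str
    purpose: str        # e.g. "marketing_email" (illustrative purpose name)
    granted: bool
    recorded_at: datetime


class ConsentLedger:
    """Append-only log of consent decisions.

    Events are never updated or removed, so the ledger also serves as
    evidence of when consent was given or withdrawn.
    """

    def __init__(self):
        self.events: List[ConsentEvent] = []

    def record(self, user_id: str, purpose: str, granted: bool,
               when: Optional[datetime] = None) -> None:
        timestamp = when or datetime.now(timezone.utc)
        self.events.append(ConsentEvent(user_id, purpose, granted, timestamp))

    def has_consent(self, user_id: str, purpose: str) -> bool:
        """Return the latest recorded decision; no record means no consent."""
        for event in reversed(self.events):
            if event.user_id == user_id and event.purpose == purpose:
                return event.granted
        return False


ledger = ConsentLedger()
ledger.record("user123", "marketing_email", granted=True)
print(ledger.has_consent("user123", "marketing_email"))  # True

ledger.record("user123", "marketing_email", granted=False)  # withdrawal
print(ledger.has_consent("user123", "marketing_email"))  # False
print(ledger.has_consent("user123", "analytics"))        # False
```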
Code Implementation
Data Lineage Tracking
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Dict, List, Optional


@dataclass
class LineageNode:
    name: str
    type: str  # table, view, pipeline, report
    source_system: str
    schema: Optional[Dict] = None
    owner: Optional[str] = None
    description: Optional[str] = None


@dataclass
class LineageEdge:
    source: str
    target: str
    transformation: Optional[str] = None
    timestamp: Optional[datetime] = None


class DataLineageTracker:
    def __init__(self):
        self.nodes: Dict[str, LineageNode] = {}
        self.edges: List[LineageEdge] = []

    def register_table(self, table_name: str, source_system: str,
                       schema: Dict, owner: str, description: str = ""):
        """Register a table as a node in the lineage graph."""
        self.nodes[table_name] = LineageNode(
            name=table_name,
            type="table",
            source_system=source_system,
            schema=schema,
            owner=owner,
            description=description
        )

    def add_edge(self, source: str, target: str, transformation: str = ""):
        """Add a lineage edge from source to target."""
        self.edges.append(LineageEdge(
            source=source,
            target=target,
            transformation=transformation,
            timestamp=datetime.now()
        ))

    def _traverse(self, start: str, upstream: bool) -> List[str]:
        """Walk the graph from start. The visited set prevents infinite
        loops if the lineage graph contains a cycle."""
        visited = set()
        stack = [start]
        while stack:
            current = stack.pop()
            for edge in self.edges:
                near, far = ((edge.target, edge.source) if upstream
                             else (edge.source, edge.target))
                if near == current and far not in visited:
                    visited.add(far)
                    stack.append(far)
        visited.discard(start)
        return sorted(visited)

    def get_upstream(self, table_name: str) -> List[str]:
        """Get all direct and transitive upstream dependencies."""
        return self._traverse(table_name, upstream=True)

    def get_downstream(self, table_name: str) -> List[str]:
        """Get all direct and transitive downstream dependencies."""
        return self._traverse(table_name, upstream=False)

    def export_lineage(self, format: str = "json"):
        """Export the lineage graph as a dict ("json") or a DOT string ("graphviz")."""
        if format == "json":
            # Edge timestamps are datetime objects; pass default=str to
            # json.dumps when serializing the result.
            return {
                "nodes": {k: asdict(v) for k, v in self.nodes.items()},
                "edges": [asdict(e) for e in self.edges]
            }
        elif format == "graphviz":
            return self._to_graphviz()
        raise ValueError(f"Unsupported format: {format}")

    def _to_graphviz(self):
        """Export to Graphviz DOT format."""
        lines = ["digraph lineage {"]
        for edge in self.edges:
            lines.append(f'  "{edge.source}" -> "{edge.target}";')
        lines.append("}")
        return "\n".join(lines)


# Usage
tracker = DataLineageTracker()

# Register tables
tracker.register_table(
    "raw_orders",
    "PostgreSQL",
    {"order_id": "INT", "customer_id": "INT", "amount": "DECIMAL"},
    "data-engineering",
    "Raw orders from production database"
)
tracker.register_table(
    "silver_orders",
    "Spark",
    {"order_id": "INT", "customer_id": "INT", "amount": "DECIMAL", "cleaned_at": "TIMESTAMP"},
    "data-engineering",
    "Cleaned orders with data quality checks"
)

# Add lineage
tracker.add_edge("raw_orders", "silver_orders", "Spark transformation with deduplication")

# Get dependencies
upstream = tracker.get_upstream("silver_orders")
print(f"Upstream dependencies: {upstream}")  # ['raw_orders']

# Export lineage
lineage_json = tracker.export_lineage("json")
Data Catalog Implementation
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Dict, List, Optional


@dataclass
class TableMetadata:
    name: str
    database: str
    schema: str
    owner: str
    description: str
    columns: List[Dict]
    tags: List[str]
    classification: str  # public, internal, confidential, restricted
    created_at: datetime
    updated_at: datetime
    row_count: Optional[int] = None
    size_bytes: Optional[int] = None
    last_accessed: Optional[datetime] = None


class DataCatalog:
    def __init__(self):
        self.tables: Dict[str, TableMetadata] = {}
        self.glossary: Dict[str, Dict] = {}

    def register_table(self, metadata: TableMetadata):
        """Register a table under its fully qualified name."""
        key = f"{metadata.database}.{metadata.schema}.{metadata.name}"
        self.tables[key] = metadata

    def search_tables(self, query: str,
                      tags: Optional[List[str]] = None) -> List[TableMetadata]:
        """Search by name or description; if tags are given, a table must
        also carry at least one of them."""
        needle = query.lower()
        results = []
        for table in self.tables.values():
            text_match = (needle in table.name.lower()
                          or needle in table.description.lower())
            tag_match = tags is None or any(tag in table.tags for tag in tags)
            if text_match and tag_match:
                results.append(table)
        return results

    def get_table_lineage(self, table_name: str) -> Dict:
        """Get lineage for a table."""
        # Placeholder: this would integrate with the lineage tracker
        return {"upstream": [], "downstream": []}

    def add_glossary_term(self, term: str, definition: str, domain: str):
        """Add a term to the business glossary."""
        self.glossary[term] = {
            "definition": definition,
            "domain": domain,
            "created_at": datetime.now()
        }

    def export_catalog(self) -> Dict:
        """Export catalog metadata as plain dictionaries."""
        return {
            "tables": {k: asdict(v) for k, v in self.tables.items()},
            "glossary": self.glossary
        }


# Usage
catalog = DataCatalog()

# Register a table
catalog.register_table(TableMetadata(
    name="orders",
    database="production",
    schema="public",
    owner="data-engineering",
    description="Customer orders from e-commerce platform",
    columns=[
        {"name": "order_id", "type": "INT", "description": "Unique order identifier"},
        {"name": "customer_id", "type": "INT", "description": "Foreign key to customers"},
        {"name": "amount", "type": "DECIMAL", "description": "Order total amount"},
        {"name": "created_at", "type": "TIMESTAMP", "description": "Order creation time"}
    ],
    tags=["business-critical", "pii", "financial"],
    classification="confidential",
    created_at=datetime.now(),
    updated_at=datetime.now(),
    row_count=1000000,
    size_bytes=1024*1024*100
))

# Search tables
results = catalog.search_tables("order", tags=["financial"])
print(f"Found {len(results)} tables")  # Found 1 tables
Access Control Implementation
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Dict, List, Set


class Permission(Enum):
    SELECT = "select"
    INSERT = "insert"
    UPDATE = "update"
    DELETE = "delete"
    CREATE = "create"
    DROP = "drop"
    ADMIN = "admin"


@dataclass
class Role:
    name: str
    permissions: Set[Permission]
    description: str


@dataclass
class User:
    user_id: str
    email: str
    roles: List[str]
    department: str
    clearance_level: int


class AccessController:
    def __init__(self):
        self.roles: Dict[str, Role] = {}
        self.users: Dict[str, User] = {}
        # Each policy is a dict, e.g.
        # {"resource": "orders", "type": "clearance", "value": 2}
        self.policies: List[Dict] = []
        self.audit_log: List[Dict] = []

    def create_role(self, role: Role):
        """Create a new role."""
        self.roles[role.name] = role

    def assign_role(self, user_id: str, role_name: str):
        """Assign an existing role to an existing user."""
        if user_id not in self.users:
            raise ValueError(f"Unknown user: {user_id}")
        if role_name not in self.roles:
            raise ValueError(f"Unknown role: {role_name}")
        user = self.users[user_id]
        if role_name not in user.roles:  # avoid duplicate assignments
            user.roles.append(role_name)
        self._log_audit(user_id, "role_assigned", role_name)

    def check_permission(self, user_id: str, resource: str,
                         permission: Permission) -> bool:
        """Check if the user has the permission on the resource."""
        user = self.users.get(user_id)
        if not user:
            return False
        for role_name in user.roles:
            role = self.roles.get(role_name)
            if role and permission in role.permissions:
                # The role grants the permission; now apply resource-specific policies
                if self._check_resource_policy(user, resource, permission):
                    self._log_audit(user_id, "access_granted", resource)
                    return True
        self._log_audit(user_id, "access_denied", resource)
        return False

    def _check_resource_policy(self, user: User, resource: str,
                               permission: Permission) -> bool:
        """Return False if any policy on the resource rejects the user."""
        for policy in self.policies:
            if policy["resource"] == resource:
                if policy["type"] == "department":
                    if user.department != policy["value"]:
                        return False
                elif policy["type"] == "clearance":
                    if user.clearance_level < policy["value"]:
                        return False
        return True

    def _log_audit(self, user_id: str, action: str, resource: str):
        """Log an audit event."""
        self.audit_log.append({
            "user_id": user_id,
            "action": action,
            "resource": resource,
            "timestamp": datetime.now()
        })


# Usage
ac = AccessController()

# Create roles
ac.create_role(Role(
    name="data_analyst",
    permissions={Permission.SELECT},
    description="Can read data"
))
ac.create_role(Role(
    name="data_engineer",
    permissions={Permission.SELECT, Permission.INSERT, Permission.UPDATE, Permission.DELETE},
    description="Can read and write data"
))

# Create user
ac.users["user1"] = User(
    user_id="user1",
    email="analyst@company.com",
    roles=["data_analyst"],
    department="analytics",
    clearance_level=1
)

# Check permission
can_read = ac.check_permission("user1", "orders", Permission.SELECT)
print(f"Can read orders: {can_read}")  # True
can_write = ac.check_permission("user1", "orders", Permission.INSERT)
print(f"Can write orders: {can_write}")  # False
GDPR Compliance Implementation
from datetime import datetime, timedelta
from typing import Dict


class GDPRCompliance:
    # DataCatalog and the catalog instance come from the Data Catalog
    # Implementation section above.
    def __init__(self, catalog: DataCatalog):
        self.catalog = catalog
        self.consent_records = {}
        self.retention_policies = {}

    def register_retention_policy(self, table_name: str,
                                  retention_days: int,
                                  auto_delete: bool = True):
        """Register a data retention policy."""
        self.retention_policies[table_name] = {
            "retention_days": retention_days,
            "auto_delete": auto_delete,
            "registered_at": datetime.now()
        }

    def handle_access_request(self, user_id: str) -> Dict:
        """Handle a GDPR right-of-access request."""
        user_data = {}
        for table_name in self.catalog.tables:
            # Simplified placeholder: in practice, query each table for
            # the user's rows and include the actual records.
            user_data[table_name] = {"status": "data_found"}
        return {
            "user_id": user_id,
            "data": user_data,
            "generated_at": datetime.now()
        }

    def handle_deletion_request(self, user_id: str) -> Dict:
        """Handle a GDPR right-to-erasure request."""
        deleted_from = []
        for table_name in self.catalog.tables:
            # Simplified placeholder: in practice, delete the user's rows
            # from each table before recording it here.
            deleted_from.append(table_name)
        return {
            "user_id": user_id,
            "deleted_from": deleted_from,
            "deleted_at": datetime.now()
        }

    def enforce_retention(self):
        """Enforce data retention policies."""
        from pyspark.sql import SparkSession
        spark = SparkSession.builder.getOrCreate()
        for table_name, policy in self.retention_policies.items():
            if policy["auto_delete"]:
                cutoff_date = datetime.now() - timedelta(days=policy["retention_days"])
                # Row-level DELETE requires a table format that supports it
                # (for example Delta Lake or Apache Iceberg). Table names
                # must come from trusted configuration, not user input.
                spark.sql(f"""
                    DELETE FROM {table_name}
                    WHERE created_at < '{cutoff_date:%Y-%m-%d %H:%M:%S}'
                """)
                print(f"Deleted data older than {policy['retention_days']} days from {table_name}")


# Usage
gdpr = GDPRCompliance(catalog)

# Register retention policy
gdpr.register_retention_policy("orders", retention_days=365)

# Handle access request
access_response = gdpr.handle_access_request("user123")

# Handle deletion request
deletion_response = gdpr.handle_deletion_request("user123")
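The deletion handler above is a placeholder. One way the actual erasure step might look is sketched below, using an in-memory SQLite database so the example runs on its own. The `SUBJECT_COLUMNS` registry, the table layouts, and the `erase_subject` function are hypothetical; a real system would derive the table-to-column mapping from catalog metadata and would also need to cover backups and downstream copies.

```python
import sqlite3
from typing import Dict

# Hypothetical registry: table name -> column identifying the data subject.
SUBJECT_COLUMNS: Dict[str, str] = {
    "customers": "customer_id",
    "orders": "customer_id",
}


def erase_subject(conn: sqlite3.Connection, subject_id: int,
                  subject_columns: Dict[str, str]) -> Dict[str, int]:
    """Delete the subject's rows from every registered table.

    All deletes run in one transaction, so a failure leaves no table
    partially erased. Returns the number of rows deleted per table.
    """
    deleted = {}
    with conn:  # commits on success, rolls back on exception
        for table, column in subject_columns.items():
            # Identifiers come from the trusted registry; the subject ID
            # is bound as a parameter rather than formatted into the SQL.
            cursor = conn.execute(
                f'DELETE FROM "{table}" WHERE "{column}" = ?', (subject_id,)
            )
            deleted[table] = cursor.rowcount
    return deleted


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (customer_id INTEGER, email TEXT)")
conn.execute("CREATE TABLE orders (order_id INTEGER, customer_id INTEGER, amount REAL)")
conn.executemany("INSERT INTO customers VALUES (?, ?)",
                 [(1, "a@example.com"), (2, "b@example.com")])
conn.executemany("INSERT INTO orders VALUES (?, ?, ?)",
                 [(10, 1, 9.5), (11, 1, 20.0), (12, 2, 5.0)])
conn.commit()

result = erase_subject(conn, 1, SUBJECT_COLUMNS)
print(result)  # {'customers': 1, 'orders': 2}

remaining_orders = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
print(remaining_orders)  # 1
```

The per-table counts give the audit trail something concrete to record alongside the request.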
Data Usage Monitoring
from datetime import datetime, timedelta
from typing import Dict


class DataUsageMonitor:
    def __init__(self):
        self.usage_logs = []
        self.alerts = []

    def log_usage(self, user_id: str, table_name: str,
                  query: str, rows_scanned: int):
        """Log one query and check it against the user's history."""
        self.usage_logs.append({
            "user_id": user_id,
            "table_name": table_name,
            "query": query,
            "rows_scanned": rows_scanned,
            "timestamp": datetime.now()
        })
        # Check for anomalies
        self._check_anomalies(user_id, table_name, rows_scanned)

    def _check_anomalies(self, user_id: str, table_name: str,
                         rows_scanned: int):
        """Check for unusual data access patterns."""
        # Historical usage for this user/table, including the entry just logged
        historical = [log for log in self.usage_logs
                      if log["user_id"] == user_id
                      and log["table_name"] == table_name]
        # Require at least 10 earlier entries before judging the current one
        if len(historical) > 10:
            previous = historical[:-1]  # exclude the current scan from the baseline
            avg_rows = sum(log["rows_scanned"] for log in previous) / len(previous)
            # Alert if the current scan exceeds 10x the historical average
            if rows_scanned > avg_rows * 10:
                self.alerts.append({
                    "type": "unusual_access",
                    "user_id": user_id,
                    "table_name": table_name,
                    "rows_scanned": rows_scanned,
                    "avg_rows": avg_rows,
                    "timestamp": datetime.now()
                })

    def get_usage_report(self, start_date: datetime, end_date: datetime) -> Dict:
        """Generate a usage report for the given time window."""
        filtered_logs = [log for log in self.usage_logs
                         if start_date <= log["timestamp"] <= end_date]
        report = {
            "total_queries": len(filtered_logs),
            "total_rows_scanned": sum(log["rows_scanned"] for log in filtered_logs),
            "unique_users": len(set(log["user_id"] for log in filtered_logs)),
            "tables_accessed": len(set(log["table_name"] for log in filtered_logs)),
            # Alert count covers all recorded alerts, not only this window
            "alerts": len(self.alerts)
        }
        return report


# Usage
monitor = DataUsageMonitor()

# Log usage
monitor.log_usage("user1", "orders", "SELECT * FROM orders", 1000000)

# Get report
report = monitor.get_usage_report(
    datetime.now() - timedelta(days=30),
    datetime.now()
)
print(f"Usage report: {report}")
Production Tip: Start with basic governance (catalog, access control) and gradually add more sophisticated features (lineage, compliance). Don't try to implement everything at once; iterate based on actual needs.
Common Follow-Up Questions
Q1: How do you balance governance with agility?
- Self-service catalog: Enable discovery without gatekeepers
- Automated policies: Reduce manual approval processes
- Tiered governance: Different rules for different data classifications
- Data contracts: Clear expectations between teams
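Tiered governance and automated policies from the list above can be combined into a small routing rule: low-sensitivity data is approved automatically, and only sensitive tiers go to a human. This is a sketch under assumptions; the `Decision` values and the `TIER_RULES` table are hypothetical, and the tier names reuse the classification levels from the catalog example (public, internal, confidential, restricted).

```python
from enum import Enum


class Decision(Enum):
    AUTO_APPROVE = "auto_approve"
    OWNER_APPROVAL = "owner_approval"
    COMPLIANCE_REVIEW = "compliance_review"


# Hypothetical routing table keyed by classification tier.
TIER_RULES = {
    "public": Decision.AUTO_APPROVE,
    "internal": Decision.AUTO_APPROVE,
    "confidential": Decision.OWNER_APPROVAL,
    "restricted": Decision.COMPLIANCE_REVIEW,
}


def route_access_request(classification: str) -> Decision:
    """Return the approval path for a read request.

    Unknown classifications fall back to the strictest path, so data
    that was never classified is not auto-approved.
    """
    return TIER_RULES.get(classification.lower(), Decision.COMPLIANCE_REVIEW)


for tier in ["public", "confidential", "unlabeled"]:
    print(tier, "->", route_access_request(tier).value)
# public -> auto_approve
# confidential -> owner_approval
# unlabeled -> compliance_review
```

Most requests in a typical warehouse touch the lower tiers, so automating those removes the bulk of the approval queue while keeping review where it matters.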
Q2: How do you handle cross-system lineage?
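# Use OpenLineage for cross-system lineage
# Sketch assuming the openlineage-python client, which emits RunEvent
# objects rather than plain dicts; import paths can differ between
# client versions.
from datetime import datetime, timezone
from uuid import uuid4
from openlineage.client import OpenLineageClient
from openlineage.client.run import Dataset, Job, Run, RunEvent, RunState

client = OpenLineageClient(url="http://openlineage:5000")

# Emit lineage event
client.emit(RunEvent(
    eventType=RunState.COMPLETE,
    eventTime=datetime.now(timezone.utc).isoformat(),
    run=Run(runId=str(uuid4())),  # run IDs must be UUIDs
    job=Job(namespace="etl", name="etl_pipeline"),  # namespace is illustrative
    producer="https://example.com/etl",  # illustrative producer URI
    inputs=[Dataset(namespace="postgres", name="orders")],
    outputs=[Dataset(namespace="s3", name="silver/orders")],
))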
Q3: How do you handle data governance in a data mesh?
- Domain ownership: Each domain owns its data
- Federated governance: Central policies, local implementation
- Data products: Governed, discoverable data assets
- Self-serve platform: Enable domains to publish data
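Federated governance ("central policies, local implementation") can be sketched as a set of centrally defined checks that every domain runs against its own data product descriptors before publishing. The `DataProduct` fields and the three policies below are assumptions chosen for illustration, not a standard schema.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class DataProduct:
    """Descriptor a domain team fills in for each product it publishes."""
    name: str
    domain: str
    owner: str
    classification: str
    contains_pii: bool = False
    retention_days: Optional[int] = None


# A policy returns an error message, or None when the product complies.
Policy = Callable[[DataProduct], Optional[str]]

ALLOWED_CLASSIFICATIONS = {"public", "internal", "confidential", "restricted"}


def require_owner(product: DataProduct) -> Optional[str]:
    return None if product.owner.strip() else "owner is required"


def require_valid_classification(product: DataProduct) -> Optional[str]:
    if product.classification in ALLOWED_CLASSIFICATIONS:
        return None
    return f"unknown classification: {product.classification!r}"


def pii_requires_retention(product: DataProduct) -> Optional[str]:
    if product.contains_pii and product.retention_days is None:
        return "products with PII must declare retention_days"
    return None


# Defined centrally, executed by each domain in its own publishing pipeline.
CENTRAL_POLICIES: List[Policy] = [
    require_owner,
    require_valid_classification,
    pii_requires_retention,
]


def validate_product(product: DataProduct) -> List[str]:
    """Return all policy violations; an empty list means publishable."""
    violations = []
    for policy in CENTRAL_POLICIES:
        message = policy(product)
        if message:
            violations.append(message)
    return violations


good = DataProduct("orders_daily", "sales", "sales-data-team", "internal")
bad = DataProduct("customer_profiles", "crm", "", "secret", contains_pii=True)

print(validate_product(good))  # []
print(validate_product(bad))
```

The central team owns only the policy list; each domain keeps control of how and when its products are built.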
Q4: How do you audit data access?
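# Open-source Spark has no built-in, queryable SQL audit table.
# Event logging is configured at startup (spark.eventLog.enabled,
# spark.eventLog.dir), and access audit records usually come from the
# platform (for example cloud audit logs) or from your own query log.
# Assumption: audit events are landed in a table named
# governance.audit_log with the columns used below.
spark.sql("""
    SELECT user_id, action, resource, event_time
    FROM governance.audit_log
    WHERE resource LIKE 'production.%'
    ORDER BY event_time DESC
""")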
Critical Consideration: Data governance is not just about technology; it is also about people and processes. Invest in training, clear documentation, and cultural change to ensure governance is followed.
Company-Specific Tips
Google Interview Tips
- Discuss data classification and sensitivity levels
- Explain access control with IAM
- Mention data retention policies
- Talk about audit logging with Cloud Audit Logs
Microsoft Interview Tips
- Focus on Microsoft Purview (formerly Azure Purview) for governance and cataloging
- Discuss Microsoft Purview compliance features
- Mention Microsoft Entra ID (formerly Azure AD) for access control
- Talk about data residency requirements
Amazon Interview Tips
- Discuss AWS Lake Formation for governance
- Explain the AWS Glue Data Catalog
- Mention AWS IAM for access control
- Talk about AWS CloudTrail for auditing
Final Takeaway: Data governance is essential for trusted, compliant data. Start with basic catalog and access control, then add lineage and compliance. Balance governance with agility by automating policies and enabling self-service.