Kafka Data Governance
Overview
Data governance in Kafka ensures data quality, compliance, security, and accountability across your event streaming platform. This guide covers schema evolution, access control, data lineage, and regulatory compliance.
Governance Goals
- Data Quality: Ensure data meets standards
- Compliance: Meet regulatory requirements
- Security: Protect sensitive data
- Accountability: Track data ownership and changes
- Discoverability: Make data easy to find and understand
Schema Evolution
Schema Compatibility Modes
| Mode | Description | Use Case |
|---|---|---|
| BACKWARD | New schema can read old data | Consumer upgrades first |
| FORWARD | Old schema can read new data | Producer upgrades first |
| FULL | Both directions compatible | Maximum flexibility |
| NONE | No compatibility checks | Development only |
Avro Schema Evolution
// Schema v1
{
"type": "record",
"name": "Order",
"namespace": "com.company.events",
"fields": [
{"name": "id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "status", "type": "string"}
]
}
// Schema v2 (backward compatible: every added field has a default)
{
  "type": "record",
  "name": "Order",
  "namespace": "com.company.events",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "status", "type": "string"},
    {"name": "currency", "type": "string", "default": "USD"},
    {"name": "created_at", "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}], "default": null}
  ]
}
Schema Registry Configuration
# Schema Registry configuration
schema.registry.url=http://schema-registry:8081
# Compatibility level for new schema versions. Set exactly one level: BACKWARD here; FULL is the stricter two-way alternative.
schema.registry.compatibility.level=BACKWARD
schema.registry.auto.register.schemas=false
schema.registry.topic=_schemas
Schema Validation
from confluent_kafka.avro import AvroProducer, AvroConsumer
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
class SchemaValidator:
    """Check and register Avro schemas through a Schema Registry client."""

    def __init__(self, schema_registry_url):
        """Create the registry client.

        Args:
            schema_registry_url: Base URL of the Schema Registry.
        """
        self.registry = CachedSchemaRegistryClient({'url': schema_registry_url})

    @staticmethod
    def _parse_schema(schema_str):
        """Parse an Avro schema JSON string into a schema object.

        The registry client works with parsed Avro schema objects, not with
        the plain dict that ``json.loads`` would produce.
        """
        # Imported lazily so the class can be defined and unit-tested without
        # the avro extra of confluent-kafka being installed.
        from confluent_kafka import avro

        return avro.loads(schema_str)

    def validate_schema(self, subject, schema_str):
        """Validate a schema against the subject's compatibility rules.

        Args:
            subject: Registry subject name.
            schema_str: Candidate Avro schema as a JSON string.

        Returns:
            ``{'compatible': <bool>, 'latest_version': <version>}`` on success,
            or ``{'compatible': False, 'error': <message>}`` when any step
            raises (lookup, parsing, or the compatibility request).
        """
        try:
            # get_latest_schema returns (schema_id, schema, version).
            _schema_id, _latest_schema, latest_version = (
                self.registry.get_latest_schema(subject)
            )
            new_schema = self._parse_schema(schema_str)
            is_compatible = self.registry.test_compatibility(subject, new_schema)
        except Exception as e:
            # Deliberately broad: callers receive a result dict, never a raise.
            return {'compatible': False, 'error': str(e)}
        return {
            'compatible': is_compatible,
            'latest_version': latest_version,
        }

    def register_schema(self, subject, schema_str):
        """Register a new schema version under ``subject``.

        Args:
            subject: Registry subject name.
            schema_str: Avro schema as a JSON string.

        Returns:
            The schema id returned by the registry client.
        """
        schema = self._parse_schema(schema_str)
        return self.registry.register(subject, schema)
Data Classification
Classification Levels
# data_classification.yml
classification_levels:
  PUBLIC:
    description: "Non-sensitive data"
    encryption: false
    retention: "unlimited"
    access: "all_employees"
  INTERNAL:
    description: "Internal business data"
    encryption: true
    retention: "7_years"
    access: "employees"
  CONFIDENTIAL:
    description: "Sensitive business data"
    encryption: true
    retention: "5_years"
    access: "authorized_teams"
  RESTRICTED:
    description: "PII and regulated data"
    encryption: true
    retention: "3_years"
    access: "compliance_team"
    audit: true
Topic Classification
# Governance metadata per topic, keyed by topic name: classification level,
# owning team, description, PII-bearing fields, retention and search tags.
TOPIC_CLASSIFICATIONS = {
    'orders': dict(
        classification='CONFIDENTIAL',
        owner='payments-team',
        description='Customer order events',
        pii_fields=['customer_email', 'shipping_address'],
        retention_days=7 * 365,  # 7 years
        tags=['revenue', 'customer-data'],
    ),
    'user-events': dict(
        classification='RESTRICTED',
        owner='platform-team',
        description='User behavior tracking',
        pii_fields=['user_id', 'ip_address', 'device_id'],
        retention_days=365,  # 1 year
        tags=['analytics', 'user-tracking'],
    ),
}
PII Detection and Masking
import re
from functools import wraps
class PIIDetector:
    """Regex-based scanner reporting which PII categories occur in a value."""

    # PII category name -> regular expression that recognises it.
    PATTERNS = {
        'email': r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
        'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
        'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
        'credit_card': r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',
    }

    @classmethod
    def detect_pii(cls, data):
        """Return the PII categories whose pattern is found in ``data``.

        ``data`` is converted with ``str()`` before searching, so any object
        can be scanned. Categories are listed in ``PATTERNS`` order; the
        list is empty when nothing matches.
        """
        text = str(data)
        return [
            category
            for category, regex in cls.PATTERNS.items()
            if re.search(regex, text) is not None
        ]
class PIIMasker:
    """Mask PII values held in dictionary records."""

    def mask_email(self, email):
        """Mask an email address, keeping the first character and the domain.

        The value is split on its LAST ``@`` so text containing several
        ``@`` characters is still masked rather than raising.

        Args:
            email: String containing at least one ``@``.

        Returns:
            ``"<first char>***@<domain>"``; the first character is omitted
            when the part before ``@`` is empty.

        Raises:
            ValueError: If ``email`` contains no ``@``.
        """
        local, separator, domain = email.rpartition('@')
        if not separator:
            # Do not echo the value in the message: it may itself be PII.
            raise ValueError("value does not contain '@'")
        # Slicing (not indexing) tolerates an empty local part.
        return f"{local[:1]}***@{domain}"

    def mask_phone(self, phone):
        """Mask a phone number, keeping only its last four characters."""
        return f"***-***-{phone[-4:]}"

    def mask_pii(self, data, fields_to_mask):
        """Return a shallow copy of ``data`` with the listed fields masked.

        Only string values are inspected. A value detected as an email is
        masked with ``mask_email``; otherwise one detected as a phone number
        is masked with ``mask_phone``.

        Args:
            data: Mapping of field name to value; it is not modified.
            fields_to_mask: Iterable of field names to inspect.

        Returns:
            The masked copy.
        """
        masked_data = data.copy()
        for field in fields_to_mask:
            value = masked_data.get(field)
            if not isinstance(value, str):
                # Missing fields and non-string values are left untouched.
                continue
            pii_types = PIIDetector.detect_pii(value)
            if 'email' in pii_types:
                masked_data[field] = self.mask_email(value)
            elif 'phone' in pii_types:
                masked_data[field] = self.mask_phone(value)
            # NOTE: values matching only 'ssn' or 'credit_card' are left
            # unchanged because no masker is defined for those categories.
        return masked_data
GDPR Compliance
Right to Erasure Implementation
import time
import uuid


class GDPRComplianceManager:
    """Coordinate GDPR right-to-erasure requests across Kafka topics."""

    def __init__(self, kafka_producer, schema_registry):
        """Store the collaborators.

        Args:
            kafka_producer: Object exposing ``send(topic, value=...)``.
            schema_registry: Schema registry handle; stored on the instance
                and not used by the methods shown here.
        """
        self.producer = kafka_producer
        self.schema_registry = schema_registry

    def process_erasure_request(self, user_id):
        """Process a GDPR erasure request for ``user_id``.

        Publishes a ``USER_ERASURE_REQUEST`` event to ``gdpr-events`` and then
        starts a redaction job for every topic holding the user's data.

        Returns:
            The generated request id (a UUID4 string).
        """
        # 1. Publish the erasure event.
        erasure_event = {
            'event_type': 'USER_ERASURE_REQUEST',
            'user_id': user_id,
            'timestamp': int(time.time() * 1000),  # epoch milliseconds
            'request_id': str(uuid.uuid4()),
        }
        self.producer.send('gdpr-events', value=erasure_event)

        # 2. Mark the user's data for deletion in every affected topic.
        for topic in self.get_topics_with_user_data(user_id):
            self.mark_for_deletion(topic, user_id)

        return erasure_event['request_id']

    def mark_for_deletion(self, topic, user_id):
        """Start a redaction job that writes ``topic`` to ``<topic>-redacted``."""
        deletion_config = {
            'input_topic': topic,
            'output_topic': f'{topic}-redacted',
            'user_id': user_id,
            'action': 'REDACT',
        }
        self.start_redaction_job(deletion_config)

    def start_redaction_job(self, deletion_config):
        """Launch the job that filters the user's records out of a topic.

        Extension point: override this with the stream-processing job your
        deployment uses (for example a Kafka Streams application).

        Args:
            deletion_config: Dict with ``input_topic``, ``output_topic``,
                ``user_id`` and ``action`` keys.

        Raises:
            NotImplementedError: Always, until overridden.
        """
        raise NotImplementedError(
            "start_redaction_job must be implemented for your redaction pipeline"
        )

    def get_topics_with_user_data(self, user_id):
        """Return the topics containing user data.

        Placeholder for a data-catalog lookup: the list is hard-coded and
        does not depend on ``user_id``.
        """
        return ['orders', 'user-events', 'analytics', 'profiles']
Data Retention Policies
class RetentionPolicyManager:
    """Apply per-topic ``retention.ms`` settings through a Kafka admin client."""

    def __init__(self, admin_client):
        """Store the admin client used for every configuration change.

        Args:
            admin_client: Admin client whose ``alter_configs(resources)``
                returns a mapping of resource to future (the shape of
                ``confluent_kafka.admin.AdminClient``).
        """
        self.admin = admin_client

    def apply_retention_policies(self):
        """Apply the retention policy of every known topic."""
        day_ms = 24 * 3600 * 1000
        policies = {
            'orders': {'retention_ms': 2555 * day_ms},      # 7 years
            'user-events': {'retention_ms': 365 * day_ms},  # 1 year
            'logs': {'retention_ms': 30 * day_ms},          # 30 days
            'temp': {'retention_ms': 7 * day_ms},           # 7 days
        }
        for topic, policy in policies.items():
            self.update_topic_config(topic, policy)

    @staticmethod
    def _build_retention_resource(topic, retention_ms):
        """Build the config resource that sets ``retention.ms`` on ``topic``."""
        # Imported lazily so the class can be defined without the client
        # library being installed.
        from confluent_kafka.admin import ConfigResource

        return ConfigResource(
            ConfigResource.Type.TOPIC,
            topic,
            set_config={'retention.ms': str(retention_ms)},
        )

    def update_topic_config(self, topic, config):
        """Set the retention of ``topic`` and wait for the broker's answer.

        Uses the admin client given to the constructor rather than opening a
        new connection per call.

        Args:
            topic: Topic name.
            config: Dict containing ``retention_ms`` (milliseconds).

        Raises:
            Whatever the per-resource future raises when the change fails.
        """
        resource = self._build_retention_resource(topic, config['retention_ms'])
        # NOTE(review): alter_configs is non-incremental in confluent-kafka,
        # i.e. settings not listed here revert to their defaults. Confirm
        # this is acceptable or switch to incremental_alter_configs.
        futures = self.admin.alter_configs([resource])
        # The result maps each resource to its future; wait on the futures.
        for future in futures.values():
            future.result()
Audit Logging
Audit Event Structure
{
"audit_id": "audit-12345",
"timestamp": "2024-01-15T10:30:00Z",
"event_type": "TOPIC_ACCESS",
"actor": {
"user_id": "user-789",
"service": "order-service",
"ip_address": "10.0.1.100"
},
"resource": {
"type": "topic",
"name": "orders",
"operation": "WRITE"
},
"details": {
"message_count": 150,
"bytes_written": 153600,
"partition_keys": ["order-123", "order-124"]
},
"result": "SUCCESS"
}
Audit Logger
import json
import uuid
from datetime import datetime, timezone

from kafka import KafkaProducer
class AuditLogger:
    """Publish audit events as JSON to the ``audit-log`` topic."""

    def __init__(self, bootstrap_servers):
        """Create the producer; values are JSON-encoded as UTF-8 bytes.

        Args:
            bootstrap_servers: Broker address(es) passed to ``KafkaProducer``.
        """
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

    def _publish(self, event_type, actor, resource, **details):
        """Build one audit event, send it, and return its ``audit_id``.

        ``details`` are appended after the common fields in the order given.
        """
        audit_event = {
            'audit_id': str(uuid.uuid4()),
            # Timezone-aware UTC timestamp so the offset is explicit.
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'event_type': event_type,
            'actor': actor,
            'resource': resource,
            **details,
        }
        self.producer.send('audit-log', value=audit_event)
        return audit_event['audit_id']

    def log_access(self, actor, resource, operation, result):
        """Record an ``ACCESS`` event.

        Args:
            actor: Who performed the operation.
            resource: What was accessed.
            operation: Operation performed.
            result: Outcome of the operation.

        Returns:
            The generated audit id (a UUID4 string).
        """
        return self._publish(
            'ACCESS', actor, resource, operation=operation, result=result
        )

    def log_data_change(self, actor, resource, change_type, before, after):
        """Record a ``DATA_CHANGE`` event with before/after values.

        Args:
            actor: Who made the change.
            resource: What was changed.
            change_type: Kind of change.
            before: Value before the change.
            after: Value after the change.

        Returns:
            The generated audit id (a UUID4 string).
        """
        return self._publish(
            'DATA_CHANGE', actor, resource,
            change_type=change_type, before=before, after=after,
        )
Access Control Policies
# access_policies.yml
policies:
  - name: "order-service-producer"
    principal: "User:order-service"
    resource:
      type: "topic"
      name: "orders"
    operations:
      - "WRITE"
      - "DESCRIBE"
    effect: "ALLOW"
  - name: "analytics-consumer"
    principal: "User:analytics-service"
    resource:
      type: "topic"
      name: "orders"
    operations:
      - "READ"
      - "DESCRIBE"
    effect: "ALLOW"
  - name: "admin-full-access"
    principal: "User:admin"
    resource:
      type: "topic"
      name: "*"
    operations:
      - "ALL"
    effect: "ALLOW"
Data Lineage
Lineage Tracking
import time


class DataLineageTracker:
    """Publish lineage and consumption events to the ``data-lineage`` topic."""

    def __init__(self, kafka_producer):
        """Store the producer, an object exposing ``send(topic, value=...)``."""
        self.producer = kafka_producer

    @staticmethod
    def _now_ms():
        """Return the current time as integer epoch milliseconds."""
        return int(time.time() * 1000)

    def track_transformation(self, input_topics, output_topic, transformation):
        """Record that ``transformation`` reads ``input_topics`` and writes ``output_topic``.

        Args:
            input_topics: Topics read by the transformation.
            output_topic: Topic written by the transformation.
            transformation: Mapping describing the transformation; its
                ``service_name`` entry (``None`` when absent) is copied to
                ``producer_service``.
        """
        lineage_event = {
            'event_type': 'DATA_LINEAGE',
            'timestamp': self._now_ms(),
            'input_topics': input_topics,
            'output_topic': output_topic,
            'transformation': transformation,
            'producer_service': transformation.get('service_name'),
        }
        self.producer.send('data-lineage', value=lineage_event)

    def track_consumption(self, topic, consumer_group, message_count):
        """Record that ``consumer_group`` consumed ``message_count`` messages from ``topic``."""
        consumption_event = {
            'event_type': 'DATA_CONSUMPTION',
            'timestamp': self._now_ms(),
            'topic': topic,
            'consumer_group': consumer_group,
            'message_count': message_count,
        }
        self.producer.send('data-lineage', value=consumption_event)
Lineage Visualization
def generate_lineage_graph(lineage_events):
    """Generate a data lineage graph from lineage events.

    Only events whose ``event_type`` is ``'DATA_LINEAGE'`` contribute. Each
    yields a ``transformation`` node for its service, a ``topic`` node for
    every input topic and the output topic, and ``input`` / ``output`` edges.
    Nodes and edges are de-duplicated, so repeated events from one service
    do not repeat graph elements, and every edge endpoint has a node.

    Args:
        lineage_events: Iterable of event dicts. Lineage events must provide
            ``transformation['service_name']``, ``input_topics`` and
            ``output_topic``.

    Returns:
        ``{'nodes': [...], 'edges': [...]}`` in first-seen order. Nodes have
        ``id``, ``type`` and ``label``; edges have ``source``, ``target`` and
        ``type``.
    """
    graph = {'nodes': [], 'edges': []}
    seen_nodes = set()
    seen_edges = set()

    def add_node(node_id, node_type):
        # The first registration of an id wins; later duplicates are ignored.
        if node_id in seen_nodes:
            return
        seen_nodes.add(node_id)
        graph['nodes'].append(
            {'id': node_id, 'type': node_type, 'label': node_id}
        )

    def add_edge(source, target, edge_type):
        key = (source, target, edge_type)
        if key in seen_edges:
            return
        seen_edges.add(key)
        graph['edges'].append(
            {'source': source, 'target': target, 'type': edge_type}
        )

    for event in lineage_events:
        if event['event_type'] != 'DATA_LINEAGE':
            continue

        service = event['transformation']['service_name']
        add_node(service, 'transformation')

        # Edges from every input topic into the transformation.
        for input_topic in event['input_topics']:
            add_node(input_topic, 'topic')
            add_edge(input_topic, service, 'input')

        # Edge from the transformation to its output topic.
        output_topic = event['output_topic']
        add_node(output_topic, 'topic')
        add_edge(service, output_topic, 'output')

    return graph
Topic Naming Conventions
Naming Standards
import re

# Topic naming convention
TOPIC_NAMING_CONVENTION = {
    'pattern': '{domain}.{subdomain}.{entity}.{event-type}',
    'examples': [
        'orders.payment.order.completed',
        'users.profile.user.created',
        'inventory.stock.product.updated',
    ],
    'rules': [
        'Use lowercase letters only',
        'Use dots as separators',
        'Keep names at or under 249 characters (Kafka limit)',
        'Include domain and entity',
        'Describe the event type',
    ],
}

# Kafka rejects topic names longer than 249 characters.
_MAX_TOPIC_NAME_LENGTH = 249

# Dot-separated segments of lowercase ASCII letters.
_TOPIC_NAME_RE = re.compile(r'[a-z]+(\.[a-z]+)*')


def validate_topic_name(topic_name):
    """Validate a topic name against the naming convention.

    Checks, in order: the name consists only of dot-separated lowercase
    segments, it does not exceed the maximum length, and it has at least
    three segments.

    Args:
        topic_name: Candidate topic name.

    Returns:
        ``(True, "Valid")`` when the name passes, otherwise ``(False, reason)``.
    """
    # fullmatch rather than match + '$': '$' also matches just before a
    # trailing newline, which would let "a.b.c\n" through.
    if not _TOPIC_NAME_RE.fullmatch(topic_name):
        return False, f"Invalid topic name: {topic_name}"

    if len(topic_name) > _MAX_TOPIC_NAME_LENGTH:
        return False, f"Topic name too long: {len(topic_name)} characters"

    if len(topic_name.split('.')) < 3:
        return False, f"Topic name must have at least 3 parts: {topic_name}"

    return True, "Valid"
Best Practices
Governance Checklist
- Schema Registry configured with compatibility rules
- Access control policies implemented
- Data classification applied to all topics
- PII detection and masking in place
- Audit logging enabled
- Retention policies configured
- Lineage tracking implemented
- Topic naming conventions enforced
Regular Governance Reviews
def governance_review():
    """Run every governance check and summarise the outcome.

    Returns:
        A ``(passed, results)`` tuple. ``results`` maps each check name to
        the value its check function returned; ``passed`` is True only when
        all of those values are truthy.
    """
    results = dict(
        schema_compatibility=check_schema_compatibility(),
        access_control=audit_access_policies(),
        retention_compliance=verify_retention_policies(),
        pii_detection=scan_for_pii(),
        audit_coverage=check_audit_logging(),
    )
    passed = all(results.values())
    return passed, results
Summary
Kafka data governance requires comprehensive implementation of schema evolution, access control, data lineage, and compliance measures. Establish clear policies for data classification, retention, and audit logging to maintain data quality and regulatory compliance.