Kafka Data Governance: Retention, Tiered Storage, Audit
Difficulty: Senior | Asked at: Finance, Healthcare, Enterprise companies
ℹ️ Interview Context
Data governance is critical for regulated industries. Interviewers expect you to understand retention strategies, tiered storage, audit logging, and compliance requirements.
The Question
How do you implement data governance in Kafka? Explain retention policies, tiered storage, and audit logging strategies for compliance.
Data Governance Framework
Kafka Data Governance Components:
1. Data Classification - What data do we have?
2. Retention Policies - How long to keep data?
3. Access Control - Who can access data?
4. Encryption - Protect sensitive data
5. Audit Logging - Track all access
6. Compliance - Meet regulatory requirements
Retention Policies
Retention Configuration
# Topic-level retention configuration (reference values)
retention_configs = {
    # Time-based retention
    'retention.ms': 604800000,  # 7 days
    # Size-based retention
    'retention.bytes': 1073741824,  # 1GB per partition
    # Delete vs compact
    'cleanup.policy': 'delete',  # or 'compact' or 'compact,delete'
    # Segment configuration: retention deletes whole closed segments, so data can
    # outlive retention.ms by up to one segment.ms; keep it well below retention.ms
    'segment.ms': 86400000,  # 1 day
    'segment.bytes': 1073741824,  # 1GB
    # Minimum cleanable dirty ratio (only used when cleanup.policy includes 'compact')
    'min.cleanable.dirty.ratio': 0.5
}

# Create topic with retention
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
topic = NewTopic(
    name='user-events',
    num_partitions=12,
    replication_factor=3,
    topic_configs={
        # Config values are sent to the broker as strings
        'retention.ms': '2592000000',  # 30 days
        'retention.bytes': '-1',  # Unlimited size
        'cleanup.policy': 'delete'
    }
)
admin.create_topics([topic])
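Because time-based retention removes whole closed segments, how long a record actually stays on disk depends on both `retention.ms` and `segment.ms`. The sketch below checks a topic config dict for a few common mistakes; the function names and the rules it applies are illustrative assumptions, and the worst-case bound ignores the broker's retention check interval (`log.retention.check.interval.ms`).

```python
VALID_CLEANUP_POLICIES = {"delete", "compact"}
DEFAULT_MS = 604_800_000  # broker defaults for retention.ms and segment.ms: 7 days


def worst_case_record_age_ms(retention_ms: int, segment_ms: int) -> int:
    """Approximate upper bound on how long a record can stay on disk.

    A segment is deleted only once its newest record is older than
    retention.ms, and one segment can span up to segment.ms of data.
    """
    return retention_ms + segment_ms


def check_retention_config(cfg: dict) -> list:
    """Return warnings for a topic config dict (hypothetical helper)."""
    warnings = []
    policies = {p.strip() for p in str(cfg.get("cleanup.policy", "delete")).split(",")}
    unknown = policies - VALID_CLEANUP_POLICIES
    if unknown:
        warnings.append(f"unknown cleanup.policy: {sorted(unknown)}")

    retention_ms = int(cfg.get("retention.ms", DEFAULT_MS))
    segment_ms = int(cfg.get("segment.ms", DEFAULT_MS))
    if retention_ms < -1:
        warnings.append("retention.ms must be -1 (infinite) or a non-negative value")
    elif retention_ms != -1 and "delete" in policies and segment_ms >= retention_ms:
        days = worst_case_record_age_ms(retention_ms, segment_ms) / 86_400_000
        warnings.append(f"segment.ms >= retention.ms: records may survive ~{days:.0f} days")

    if "compact" not in policies and "min.cleanable.dirty.ratio" in cfg:
        warnings.append("min.cleanable.dirty.ratio has no effect without compaction")
    return warnings


# 7-day segments with 7-day retention, plus an unused compaction setting
print(check_retention_config({
    "retention.ms": 604800000, "segment.ms": 604800000,
    "cleanup.policy": "delete", "min.cleanable.dirty.ratio": 0.5,
}))
```

Keeping `segment.ms` well below `retention.ms` makes deletion track the configured retention more closely, at the cost of more segment files per partition.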
Retention Strategies
retention_strategies = {
    'time_based': {
        'description': 'Delete segments once their newest record is older than the limit',
        'config': 'retention.ms',
        'use_case': 'Log data, events with time relevance',
        'example': 'retention.ms=604800000 (7 days)'
    },
    'size_based': {
        'description': 'Delete the oldest segments when a partition exceeds the size limit',
        'config': 'retention.bytes',
        'use_case': 'Storage-constrained environments',
        'example': 'retention.bytes=1073741824 (1GB per partition)'
    },
    'compact': {
        'description': 'Keep at least the latest value per key; null-value tombstones remove a key',
        'config': 'cleanup.policy=compact',
        'use_case': 'State stores, configuration topics',
        'example': 'cleanup.policy=compact'
    },
    'compact_delete': {
        'description': 'Compact per key and also delete segments past retention.ms / retention.bytes',
        'config': 'cleanup.policy=compact,delete',
        'use_case': 'Changelogs that only need recent history',
        'example': 'cleanup.policy=compact,delete'
    }
}
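To make the difference between `delete` and `compact` concrete, here is a small in-memory model of what a fully compacted partition converges to: the latest record per key, with keys whose latest value is a tombstone (`None`) removed. It is a simplification (real compaction never touches the active segment and keeps tombstones for `delete.retention.ms` before dropping them), and `compacted_view` is a hypothetical name.

```python
from typing import Hashable, List, Optional, Tuple

Record = Tuple[Hashable, Optional[object]]  # (key, value); value None = tombstone


def compacted_view(records: List[Record]) -> List[Tuple[int, Hashable, object]]:
    """Return (offset, key, value) for records that survive full compaction.

    Surviving records keep their original offsets and order.
    """
    latest_offset = {}
    for offset, (key, _value) in enumerate(records):
        latest_offset[key] = offset  # later records overwrite earlier ones

    survivors = []
    for offset, (key, value) in enumerate(records):
        if latest_offset[key] == offset and value is not None:
            survivors.append((offset, key, value))
    return survivors


log = [
    ("user-1", {"plan": "free"}),
    ("user-2", {"plan": "pro"}),
    ("user-1", {"plan": "pro"}),
    ("user-2", None),  # tombstone: erase user-2
]
print(compacted_view(log))  # [(2, 'user-1', {'plan': 'pro'})]
```

Note that offsets are preserved: compaction removes records but never renumbers the ones it keeps.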
def calculate_retention_size(
    messages_per_day: int,
    avg_message_size: int,
    retention_days: int,
    num_partitions: int
) -> dict:
    """
    Calculate storage requirements for a retention policy.

    Sizes are logical (one replica) and use binary units (GiB/TiB, labeled
    gb/tb). Multiply by the replication factor to get raw disk usage.
    """
    # Daily data per partition (assumes an even spread across partitions)
    daily_per_partition = (messages_per_day * avg_message_size) / num_partitions
    # Total per partition
    total_per_partition = daily_per_partition * retention_days
    # Total across partitions
    total_bytes = total_per_partition * num_partitions
    return {
        'daily_per_partition_gb': daily_per_partition / (1024**3),
        'total_per_partition_gb': total_per_partition / (1024**3),
        'total_gb': total_bytes / (1024**3),
        'total_tb': total_bytes / (1024**4)
    }

# Example: 100M messages/day x 500 bytes x 30 days
retention = calculate_retention_size(
    messages_per_day=100000000,
    avg_message_size=500,
    retention_days=30,
    num_partitions=12
)
print(f"Total storage: {retention['total_tb']:.2f} TB")  # Total storage: 1.36 TB (per replica)
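The same arithmetic can be run in reverse to answer a common planning question: given a disk budget, how many days of retention fit? This sketch includes the replication factor, since every replica stores a full copy; the function name and the 20% headroom for indexes, segment overhead, and rebalancing are assumptions, not Kafka settings.

```python
def max_retention_days(
    disk_budget_bytes: float,
    messages_per_day: int,
    avg_message_size: int,
    replication_factor: int = 3,
    headroom: float = 0.2,
) -> float:
    """Days of retention that fit in a cluster-wide disk budget (hypothetical helper).

    headroom reserves a fraction of the budget that is never filled with log data.
    """
    if not 0 <= headroom < 1:
        raise ValueError("headroom must be in [0, 1)")
    usable = disk_budget_bytes * (1 - headroom)
    daily_raw = messages_per_day * avg_message_size * replication_factor
    return usable / daily_raw


# 100M messages/day x 500 bytes x RF 3 = 150 GB (decimal) of raw writes per day
days = max_retention_days(
    disk_budget_bytes=10 * 10**12,  # 10 TB (decimal) of broker disk
    messages_per_day=100_000_000,
    avg_message_size=500,
)
print(f"Retention that fits: {days:.1f} days")  # Retention that fits: 53.3 days
```

The result can then be turned into `retention.ms`, or into a per-partition `retention.bytes` cap so that a traffic spike cannot fill the disks.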
⚠️ Retention Warning
Setting retention too short can lose valuable data; setting it too long increases storage costs. Always consider:
- Business requirements
- Regulatory compliance
- Storage costs
- Query patterns
Tiered Storage
How Tiered Storage Works
Tiered Storage Architecture

Hot Tier (Local Disk)
┌─────────────────────────────────────────────────┐
│ Recent data (last 7 days)                       │
│ - Fast access                                   │
│ - Full read/write performance                   │
│ - Expensive storage                             │
└─────────────────────────────────────────────────┘
                         │
                         ▼ After local retention period
Warm Tier (S3/GCS/Azure Blob)
┌─────────────────────────────────────────────────┐
│ Older data (7-90 days)                          │
│ - Slower access                                 │
│ - Read-only (mostly)                            │
│ - Medium cost                                   │
└─────────────────────────────────────────────────┘
                         │
                         ▼ After extended period (bucket lifecycle rule)
Cold Tier (Glacier/Archive)
┌─────────────────────────────────────────────────┐
│ Archived data (90+ days)                        │
│ - Very slow access                              │
│ - Read-only                                     │
│ - Cheapest storage                              │
└─────────────────────────────────────────────────┘

Kafka's tiered storage manages only the first two tiers: closed segments are copied to object storage after they roll, and the local copy is removed once local retention expires. The cold tier is an object-store lifecycle rule on the bucket, outside Kafka; objects moved to archive classes such as S3 Glacier Flexible Retrieval must be restored before they can be read, so brokers cannot serve them directly.
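A rough model of where a segment lives under Kafka tiered storage, mapping the hot tier to local disk and the warm tier to object storage: the active segment is local only, closed segments are copied to remote storage, the local copy is dropped after local retention, and the segment is deleted everywhere after total retention. This sketch assumes uploads complete immediately after a segment rolls and ignores size-based limits; `Segment` and `segment_location` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class Segment:
    base_offset: int
    max_timestamp_ms: int  # timestamp of the newest record in the segment
    active: bool = False   # the segment currently being written to


def segment_location(seg: Segment, now_ms: int,
                     local_retention_ms: int, retention_ms: int) -> str:
    """Classify a segment as 'local', 'local+remote', 'remote', or 'deleted'."""
    if seg.active:
        return "local"  # the active segment is never tiered
    age_ms = now_ms - seg.max_timestamp_ms
    if retention_ms != -1 and age_ms > retention_ms:
        return "deleted"  # past total retention: removed from every tier
    if age_ms > local_retention_ms:
        return "remote"  # local copy dropped, still readable from object storage
    return "local+remote"


DAY = 86_400_000
now = 400 * DAY
segments = [
    Segment(0, now - 380 * DAY),
    Segment(1000, now - 30 * DAY),
    Segment(2000, now - 2 * DAY),
    Segment(3000, now, active=True),
]
for s in segments:
    print(s.base_offset, segment_location(s, now, local_retention_ms=7 * DAY,
                                          retention_ms=365 * DAY))
```

Under this model, consumers reading recent data hit local disk, while a consumer replaying from offset 1000 would be served from object storage.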
Tiered Storage Configuration
# Confluent Platform Tiered Storage: broker settings (server.properties),
# shown as a dict for reference
tiered_storage_config = {
    # Enable the Tiered Storage feature on the broker
    'confluent.tier.feature': 'true',
    # Tier newly created topics by default (can be overridden per topic)
    'confluent.tier.enable': 'true',
    # Remote object storage settings
    'confluent.tier.backend': 'S3',
    'confluent.tier.s3.bucket': 'kafka-tiered-storage',
    'confluent.tier.s3.region': 'us-east-1',
    # Hot set: how long data stays on local disk
    'confluent.tier.local.hotset.ms': '604800000',  # 7 days
}
# Apache Kafka 3.6+ (KIP-405) uses different names: broker
# remote.log.storage.system.enable=true plus a RemoteStorageManager plugin;
# topic remote.storage.enable=true and local.retention.ms.

# Create topic with tiered storage (reuses `admin` and NewTopic from above)
topic = NewTopic(
    name='events-tiered',
    num_partitions=12,
    replication_factor=3,
    topic_configs={
        'retention.ms': '31536000000',  # 1 year total (local + remote)
        'confluent.tier.enable': 'true',
        'confluent.tier.local.hotset.ms': '604800000',  # 7 days on local disk
    }
)
admin.create_topics([topic])
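For clusters using Apache Kafka's built-in tiered storage (KIP-405, available from Kafka 3.6), the relevant topic settings are `remote.storage.enable`, `retention.ms` (total retention), and `local.retention.ms` (local disk), where `local.retention.ms=-2`, the default, means "same as `retention.ms`". The sketch below mirrors, as far as we know, the consistency rules the broker applies to these values; the helper names are hypothetical.

```python
DEFAULT_RETENTION_MS = 604_800_000  # 7 days, the broker default
INHERIT = -2  # local.retention.ms = -2 means "same as retention.ms"


def validate_tiered_topic(cfg: dict) -> list:
    """Return errors for a KIP-405 tiered topic config (sketch)."""
    errors = []
    if str(cfg.get("remote.storage.enable", "false")).lower() != "true":
        errors.append("remote.storage.enable is not true; topic will not be tiered")
    retention_ms = int(cfg.get("retention.ms", DEFAULT_RETENTION_MS))
    local_ms = int(cfg.get("local.retention.ms", INHERIT))
    if retention_ms != -1 and local_ms != INHERIT:
        if local_ms == -1:
            errors.append("local.retention.ms cannot be -1 when retention.ms is finite")
        elif local_ms > retention_ms:
            errors.append("local.retention.ms must not exceed retention.ms")
    return errors


def effective_local_retention_ms(cfg: dict) -> int:
    """Resolve -2 (inherit) to the topic's retention.ms."""
    local_ms = int(cfg.get("local.retention.ms", INHERIT))
    if local_ms == INHERIT:
        return int(cfg.get("retention.ms", DEFAULT_RETENTION_MS))
    return local_ms


good = {"remote.storage.enable": "true",
        "retention.ms": "31536000000",       # 1 year in total
        "local.retention.ms": "604800000"}   # 7 days on local disk
print(validate_tiered_topic(good), effective_local_retention_ms(good))
```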
Tiered Storage Cost Savings
def calculate_tiered_storage_savings(
    total_data_tb: float,
    hot_tier_days: int,
    total_retention_days: int,
    hot_storage_cost_per_gb: float,
    warm_storage_cost_per_gb: float,
    cold_storage_cost_per_gb: float
) -> dict:
    """
    Calculate cost savings from tiered storage.

    Prices are per GB-month. Replication, request, and retrieval charges
    are ignored.
    """
    # Calculate data distribution (assumes data is spread evenly over time)
    hot_fraction = hot_tier_days / total_retention_days
    warm_fraction = 0.5 * (1 - hot_fraction)  # Assumption: half of the remainder is warm
    cold_fraction = 1 - hot_fraction - warm_fraction

    # Calculate storage in each tier
    hot_tb = total_data_tb * hot_fraction
    warm_tb = total_data_tb * warm_fraction
    cold_tb = total_data_tb * cold_fraction

    # Calculate costs
    hot_cost = hot_tb * 1024 * hot_storage_cost_per_gb
    warm_cost = warm_tb * 1024 * warm_storage_cost_per_gb
    cold_cost = cold_tb * 1024 * cold_storage_cost_per_gb

    # Total tiered cost
    tiered_cost = hot_cost + warm_cost + cold_cost
    # Traditional cost (all hot)
    traditional_cost = total_data_tb * 1024 * hot_storage_cost_per_gb

    # Savings
    savings = traditional_cost - tiered_cost
    savings_percent = (savings / traditional_cost) * 100
    return {
        'hot_storage_tb': hot_tb,
        'warm_storage_tb': warm_tb,
        'cold_storage_tb': cold_tb,
        'traditional_monthly_cost': traditional_cost,
        'tiered_monthly_cost': tiered_cost,
        'monthly_savings': savings,
        'savings_percent': savings_percent
    }

# Example
savings = calculate_tiered_storage_savings(
    total_data_tb=10,
    hot_tier_days=7,
    total_retention_days=365,
    hot_storage_cost_per_gb=0.10,
    warm_storage_cost_per_gb=0.025,
    cold_storage_cost_per_gb=0.004
)
print(f"Traditional cost: ${savings['traditional_monthly_cost']:,.0f}/month")  # $1,024/month
print(f"Tiered cost: ${savings['tiered_monthly_cost']:,.0f}/month")  # $165/month
print(f"Savings: ${savings['monthly_savings']:,.0f}/month ({savings['savings_percent']:.1f}%)")  # $859/month (83.9%)
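Because every tier's cost scales with the same data volume, `total_data_tb` cancels out and the savings percentage depends only on the tier fractions and price ratios. With $f_h, f_w, f_c$ the hot, warm, and cold fractions computed in the function and $c_h, c_w, c_c$ the per-GB prices, a short derivation checks the example's result:

```latex
\[
\text{savings} = 1 - \frac{f_h c_h + f_w c_w + f_c c_c}{c_h}
= 1 - \left( f_h + f_w \frac{c_w}{c_h} + f_c \frac{c_c}{c_h} \right)
\]
With $f_h = 7/365 \approx 0.0192$, $f_w = f_c = \tfrac{1}{2}(1 - f_h) \approx 0.4904$,
$c_w/c_h = 0.025/0.10 = 0.25$ and $c_c/c_h = 0.004/0.10 = 0.04$:
\[
1 - (0.0192 + 0.4904 \times 0.25 + 0.4904 \times 0.04) \approx 1 - 0.1614 = 0.8386
\]
```

This matches the roughly 83.9% the function reports, and shows that the warm-tier price ratio dominates the result once the hot fraction is small.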
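ℹ️ Tiered Storage Benefit
For long retention periods, tiered storage can cut storage costs substantially: the model above estimates about 84% savings for 10 TB kept for one year with 7 days on local disk. Real savings also depend on object-store request and retrieval charges. The trade-off is slower access to older data. Use it for:
- Compliance data requiring long retention
- Historical analysis workloads
- Replaying history for disaster recovery (not a substitute for backups: remote data follows the topic's retention and is deleted with the topic)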
Audit Logging
Audit Event Schema
{
  "event_id": "uuid-1234",
  "timestamp": "2024-01-15T10:30:00Z",
  "event_type": "topic.produce",
  "actor": {
    "type": "user",
    "id": "alice@example.com",
    "ip": "192.168.1.100",
    "user_agent": "kafka-producer/3.6.1"
  },
  "resource": {
    "type": "topic",
    "name": "user-events",
    "cluster": "production"
  },
  "action": {
    "type": "write",
    "records_count": 100,
    "bytes_written": 50000
  },
  "result": {
    "status": "success",
    "partition_offsets": {
      "0": 12345,
      "1": 67890
    }
  },
  "metadata": {
    "request_id": "req-5678",
    "duration_ms": 45,
    "schema_id": 42
  }
}
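Before audit events reach a long-retention topic, it helps to reject malformed ones at the producer. A minimal check against the schema above, assuming the listed top-level fields are mandatory and `timestamp` is ISO 8601 with a UTC offset (both assumptions about this schema, not Kafka requirements):

```python
from datetime import datetime

REQUIRED_FIELDS = ("event_id", "timestamp", "event_type", "actor",
                   "resource", "action", "result")


def validate_audit_event(event: dict) -> list:
    """Return a list of problems; an empty list means the event looks well-formed."""
    problems = [f"missing field: {name}" for name in REQUIRED_FIELDS if name not in event]

    ts = event.get("timestamp")
    if ts is not None:
        try:
            # fromisoformat() before Python 3.11 does not accept a trailing 'Z'
            parsed = datetime.fromisoformat(ts.replace("Z", "+00:00"))
            if parsed.utcoffset() is None:
                problems.append("timestamp has no UTC offset")
        except (AttributeError, ValueError):
            problems.append(f"unparseable timestamp: {ts!r}")

    actor = event.get("actor")
    if actor is not None and not (isinstance(actor, dict) and actor.get("id")):
        problems.append("actor.id is missing or empty")
    return problems


sample = {
    "event_id": "uuid-1234",
    "timestamp": "2024-01-15T10:30:00Z",
    "event_type": "topic.produce",
    "actor": {"type": "user", "id": "alice@example.com"},
    "resource": {"type": "topic", "name": "user-events"},
    "action": {"type": "write"},
    "result": {"status": "success"},
}
print(validate_audit_event(sample))  # []
```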
Audit Logger Implementation
import socket
import time
import uuid
from datetime import datetime, timezone


class KafkaAuditLogger:
    """
    Client-side audit logger for Kafka operations.

    `producer` is expected to be a kafka-python KafkaProducer (or any object
    with a compatible send()) whose value_serializer encodes dicts, for example
    lambda v: json.dumps(v).encode('utf-8'). This only records operations that
    go through this code; broker-side audit logs cover everything else.
    """
    def __init__(self, audit_topic, producer, cluster_name='production',
                 user_agent='kafka-python'):
        self.audit_topic = audit_topic
        self.producer = producer
        # Client metadata supplied by the caller
        self.cluster_name = cluster_name
        self.user_agent = user_agent

    def _get_client_ip(self):
        # Best-effort local address; NAT or proxies may change what the broker sees
        try:
            return socket.gethostbyname(socket.gethostname())
        except OSError:
            return 'unknown'

    def _get_user_agent(self):
        return self.user_agent

    def _get_cluster_name(self):
        return self.cluster_name

    @staticmethod
    def _now_iso():
        # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
        return datetime.now(timezone.utc).isoformat()

    def log_produce(self, user, topic, records_count, bytes_written, result):
        """Log produce operation"""
        audit_event = {
            'event_id': str(uuid.uuid4()),
            'timestamp': self._now_iso(),
            'event_type': 'topic.produce',
            'actor': {
                'type': 'user',
                'id': user,
                'ip': self._get_client_ip(),
                'user_agent': self._get_user_agent()
            },
            'resource': {
                'type': 'topic',
                'name': topic,
                'cluster': self._get_cluster_name()
            },
            'action': {
                'type': 'write',
                'records_count': records_count,
                'bytes_written': bytes_written
            },
            'result': {
                'status': 'success' if result else 'failure',
                'details': result
            },
            'metadata': {
                'timestamp_ms': int(time.time() * 1000)
            }
        }
        self.producer.send(self.audit_topic, value=audit_event)

    def log_consume(self, user, topic, group, records_count, result):
        """Log consume operation"""
        audit_event = {
            'event_id': str(uuid.uuid4()),
            'timestamp': self._now_iso(),
            'event_type': 'topic.consume',
            'actor': {
                'type': 'user',
                'id': user,
                'ip': self._get_client_ip()
            },
            'resource': {
                'type': 'topic',
                'name': topic,
                'cluster': self._get_cluster_name()
            },
            'action': {
                'type': 'read',
                'group': group,
                'records_count': records_count
            },
            'result': {
                'status': 'success' if result else 'failure'
            }
        }
        self.producer.send(self.audit_topic, value=audit_event)

    def log_admin(self, user, operation, resource, result):
        """Log administrative operation"""
        audit_event = {
            'event_id': str(uuid.uuid4()),
            'timestamp': self._now_iso(),
            'event_type': f'admin.{operation}',
            'actor': {
                'type': 'user',
                'id': user,
                'ip': self._get_client_ip()
            },
            'resource': resource,
            'action': {
                'type': operation
            },
            'result': {
                'status': 'success' if result else 'failure',
                'details': result
            }
        }
        self.producer.send(self.audit_topic, value=audit_event)
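The logger calls `producer.send(topic, value=dict)`, so the producer needs a value serializer that turns each event into bytes. Below is a sketch of a JSON serializer plus a hypothetical in-memory stand-in with the same `send()` shape, handy for unit-testing audit code without a broker. With kafka-python you would pass the serializer as `KafkaProducer(value_serializer=...)`; for audit data, `acks='all'` is a sensible choice.

```python
import json
from typing import Any, List, Tuple


def audit_json_serializer(event: Any) -> bytes:
    """Serialize an audit event to compact, key-sorted UTF-8 JSON.

    default=str covers values json cannot encode natively (UUIDs, datetimes, sets).
    """
    return json.dumps(event, default=str, sort_keys=True,
                      separators=(",", ":")).encode("utf-8")


class InMemoryProducer:
    """Test double exposing send(topic, value=..., key=...) (hypothetical)."""

    def __init__(self, value_serializer=audit_json_serializer):
        self.value_serializer = value_serializer
        self.sent: List[Tuple[str, bytes]] = []

    def send(self, topic, value=None, key=None):
        self.sent.append((topic, self.value_serializer(value)))

    def flush(self):
        pass  # nothing is buffered in memory


producer = InMemoryProducer()
producer.send("audit-events",
              value={"event_type": "admin.create_topic", "actor": {"id": "alice"}})
topic, payload = producer.sent[0]
print(topic, payload)
```

Sorting keys gives byte-identical output for identical events, which makes audit payloads easier to diff and deduplicate.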
Audit Retention
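audit_retention_config = {
    'topic': 'audit-events',
    'retention_days': 365,  # 1 year for compliance (retention.ms=31536000000)
    'cleanup_policy': 'delete',  # cleanup.policy
    # Tiered storage for cost optimization
    'tiered_storage': True,
    'hot_tier_days': 30,  # days kept on local broker disk
    # Compression for audit logs
    'compression_type': 'zstd',  # compression.type
    # Replication for durability (pair with acks=all on the audit producer)
    'replication_factor': 3,
    'min_insync_replicas': 2,  # min.insync.replicas
    # Encryption: TLS listeners for transit; Kafka has no built-in at-rest
    # encryption, so use disk/volume (and object-store) encryption
    'encryption_at_rest': True,
    'encryption_in_transit': True
}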
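A dictionary like this is a policy description rather than a Kafka config. Here is a sketch of translating it into topic-level settings for `NewTopic(topic_configs=...)`: `retention.ms`, `cleanup.policy`, `compression.type`, and `min.insync.replicas` are standard topic configs, while the tiered-storage keys assume Confluent Platform (`confluent.tier.enable`, `confluent.tier.local.hotset.ms`). Replication factor is a `NewTopic` argument, and encryption is not a topic setting, so both are left out; the function name is hypothetical.

```python
MS_PER_DAY = 86_400_000


def audit_policy_to_topic_configs(policy: dict) -> dict:
    """Map a policy dict like audit_retention_config to Kafka topic configs.

    Values are strings, as the Kafka admin APIs expect.
    """
    configs = {
        "retention.ms": str(policy["retention_days"] * MS_PER_DAY),
        "cleanup.policy": policy.get("cleanup_policy", "delete"),
        "compression.type": policy.get("compression_type", "producer"),  # topic default
        "min.insync.replicas": str(policy.get("min_insync_replicas", 1)),
    }
    if policy.get("tiered_storage"):
        # Assumes Confluent Platform Tiered Storage
        configs["confluent.tier.enable"] = "true"
        configs["confluent.tier.local.hotset.ms"] = str(policy["hot_tier_days"] * MS_PER_DAY)
    return configs


policy = {
    "retention_days": 365, "cleanup_policy": "delete", "tiered_storage": True,
    "hot_tier_days": 30, "compression_type": "zstd", "min_insync_replicas": 2,
}
print(audit_policy_to_topic_configs(policy))
```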
⚠️ Audit Requirements
Many compliance frameworks require audit logging:
- PCI DSS: Log all access to cardholder data
- HIPAA: Log all access to PHI
- SOC 2: Log all administrative actions
- GDPR: Log data processing activities
Compliance Strategies
Data Classification
data_classification = {
    'Public': {
        'description': 'Data publicly available',
        'retention': 'Business need',
        'encryption': 'Optional',
        'audit': 'Minimal'
    },
    'Internal': {
        'description': 'Internal business data',
        'retention': '7 years',
        'encryption': 'In transit',
        'audit': 'Administrative actions'
    },
    'Confidential': {
        'description': 'Sensitive business data',
        'retention': '7 years',
        'encryption': 'In transit and at rest',
        'audit': 'All access'
    },
    'Restricted': {
        'description': 'PII, PHI, PCI data',
        'retention': 'As required by regulation',
        'encryption': 'In transit and at rest (strong)',
        'audit': 'All access with details'
    }
}

# Classify topics
topic_classification = {
    'user-events': 'Confidential',
    'payment-events': 'Restricted',
    'public-announcements': 'Public',
    'internal-metrics': 'Internal'
}
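Classification only helps if it drives controls. A sketch that looks up the required controls for a topic and treats unclassified topics as `Restricted`, a fail-closed choice assumed here rather than anything Kafka enforces. The two dicts are trimmed copies of the ones above so the block runs on its own; the function names are hypothetical.

```python
DATA_CLASSIFICATION = {
    "Public": {"encryption": "Optional", "audit": "Minimal"},
    "Internal": {"encryption": "In transit", "audit": "Administrative actions"},
    "Confidential": {"encryption": "In transit and at rest", "audit": "All access"},
    "Restricted": {"encryption": "In transit and at rest (strong)",
                   "audit": "All access with details"},
}
TOPIC_CLASSIFICATION = {
    "user-events": "Confidential",
    "payment-events": "Restricted",
    "public-announcements": "Public",
    "internal-metrics": "Internal",
}
DEFAULT_CLASSIFICATION = "Restricted"  # fail closed for unknown topics


def controls_for_topic(topic: str) -> dict:
    """Required controls for a topic, based on its classification."""
    level = TOPIC_CLASSIFICATION.get(topic, DEFAULT_CLASSIFICATION)
    return {"topic": topic, "classification": level, **DATA_CLASSIFICATION[level]}


def unclassified_topics(all_topics) -> list:
    """Topics that still need an explicit classification decision."""
    return sorted(t for t in all_topics if t not in TOPIC_CLASSIFICATION)


print(controls_for_topic("payment-events"))
print(unclassified_topics(["user-events", "orders", "clickstream"]))  # ['clickstream', 'orders']
```

Failing closed means a newly created topic gets the strictest controls until someone classifies it, which errs toward over-protection rather than silent exposure.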
Compliance Checklist
compliance_checklist = {
    'GDPR': [
        'Right to erasure (tombstones)',  # effective only on compacted topics
        'Data minimization (retention policies)',
        'Purpose limitation (topic access controls)',
        'Data portability (export capabilities)'
    ],
    'PCI_DSS': [
        'Encrypt cardholder data at rest',
        'Encrypt cardholder data in transit',
        'Restrict access to cardholder data',
        'Log all access to cardholder data',
        'Regular security testing'
    ],
    'HIPAA': [
        'Encrypt PHI at rest',
        'Encrypt PHI in transit',
        'Audit all access to PHI',
        'Business Associate Agreements',
        'Breach notification procedures'
    ],
    'SOC_2': [
        'Access controls',
        'Audit logging',
        'Change management',
        'Incident response',
        'Data retention policies'
    ]
}


def assess_compliance(compliance_framework, kafka_config):
    """
    Assess Kafka configuration against compliance requirements.
    """
    requirements = compliance_checklist.get(compliance_framework, [])
    assessment = []
    for requirement in requirements:
        status = check_requirement(requirement, kafka_config)
        assessment.append({
            'requirement': requirement,
            'status': status,
            'recommendation': get_recommendation(requirement, status)
        })
    return assessment


def check_requirement(requirement, config):
    """Check if a requirement is met (simplified keyword matching)"""
    req = requirement.lower()
    if 'encrypt' in req:
        return 'met' if config.get('encryption') else 'not_met'
    elif 'audit' in req or req.startswith('log '):
        return 'met' if config.get('audit_logging') else 'not_met'
    elif 'retention' in req:
        return 'met' if config.get('retention_policy') else 'not_met'
    return 'needs_review'


def get_recommendation(requirement, status):
    """Suggest a next step for a requirement"""
    if status == 'met':
        return 'No action; re-verify at the next review'
    if status == 'not_met':
        return f'Implement: {requirement}'
    return f'Manual review: {requirement}'
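The GDPR item "Right to erasure (tombstones)" only holds for compacted topics: on a `delete`-only topic a tombstone is just another record, and the earlier value stays until its whole segment ages out. Even on compacted topics, the old value disappears only after the log cleaner compacts that segment. A sketch that flags topics where tombstone-based erasure will not work; the `contains_pii` flag is a hypothetical tag, not a Kafka setting.

```python
def tombstone_erasure_supported(topic_config: dict) -> bool:
    """True if null-value tombstones can eventually remove earlier values for a key."""
    policies = {p.strip() for p in topic_config.get("cleanup.policy", "delete").split(",")}
    return "compact" in policies


def erasure_gaps(topic_configs: dict) -> list:
    """Topics tagged as holding personal data where a tombstone will not erase anything.

    topic_configs maps topic name -> config dict.
    """
    return sorted(
        name for name, cfg in topic_configs.items()
        if cfg.get("contains_pii") and not tombstone_erasure_supported(cfg)
    )


topics = {
    "user-profiles": {"cleanup.policy": "compact", "contains_pii": True},
    "user-events": {"cleanup.policy": "delete", "contains_pii": True},
    "internal-metrics": {"cleanup.policy": "delete", "contains_pii": False},
}
print(erasure_gaps(topics))  # ['user-events']
```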
Data Lineage
import time


class DataLineageTracker:
    """
    Track data lineage across Kafka topics (in memory).
    """
    def __init__(self):
        self.lineage = {}

    def _entry(self, topic):
        # Create the lineage record for a topic on first use
        return self.lineage.setdefault(topic, {
            'sources': [],
            'transformations': [],
            'consumers': []
        })

    def record_transformation(self, source_topic, target_topic, transformation):
        """
        Record a transformation from source_topic into target_topic.
        Tracks how data flows through the system.
        """
        entry = self._entry(target_topic)
        if source_topic not in entry['sources']:
            entry['sources'].append(source_topic)
        entry['transformations'].append({
            'source': source_topic,
            'type': transformation,
            'timestamp': int(time.time() * 1000)
        })

    def record_consumer(self, topic, consumer_group):
        """Record a consumer group that reads from a topic"""
        entry = self._entry(topic)
        if consumer_group not in entry['consumers']:
            entry['consumers'].append(consumer_group)

    def get_lineage(self, topic):
        """Get lineage for a topic"""
        return self.lineage.get(topic, {})

    def visualize_lineage(self):
        """Print a text summary of the lineage graph"""
        for topic, info in self.lineage.items():
            print(f"Topic: {topic}")
            print(f"  Sources: {info['sources']}")
            print(f"  Transformations: {len(info['transformations'])}")
            print(f"  Consumers: {info['consumers']}")
            print()
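Lineage pays off when you can walk it. A sketch of finding every upstream topic that feeds a given topic, for example to answer "which sources could have put PII here?". It works on a plain `{target: {'sources': [...]}}` dict shaped like `DataLineageTracker.lineage`; `upstream_topics` is an illustrative name.

```python
from collections import deque


def upstream_topics(lineage: dict, topic: str) -> set:
    """All direct and transitive sources of `topic` (breadth-first, cycle-safe)."""
    seen = set()
    queue = deque([topic])
    while queue:
        current = queue.popleft()
        for source in lineage.get(current, {}).get("sources", []):
            if source not in seen:
                seen.add(source)
                queue.append(source)
    seen.discard(topic)  # a cycle could lead back to the starting topic
    return seen


lineage = {
    "orders-enriched": {"sources": ["orders-raw", "customers"]},
    "customers": {"sources": ["crm-export"]},
    "daily-revenue": {"sources": ["orders-enriched"]},
}
print(sorted(upstream_topics(lineage, "daily-revenue")))
# ['crm-export', 'customers', 'orders-enriched', 'orders-raw']
```

Running the same traversal over consumers instead of sources gives downstream impact analysis, for instance before changing a schema or deleting a topic.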
ℹ️ Data Lineage
Data lineage helps you understand:
- Where data comes from
- How it's transformed
- Where it goes
- Who uses it
This is critical for debugging, compliance, and data governance.
Governance Metrics
governance_metrics = {
    'Data Quality': {
        'schema_compliance_rate': 'Percentage of messages matching schema',
        'data_freshness': 'Age of newest message',
        'completeness': 'Percentage of required fields present'
    },
    'Retention': {
        'topics_exceeding_retention': 'Topics with data beyond retention',
        'storage_utilization': 'Storage usage vs capacity',
        'deletion_rate': 'Data deleted per day'
    },
    'Access Control': {
        'unauthorized_access_attempts': 'Failed authorization checks',
        'privileged_user_count': 'Users with admin access',
        'acl_coverage': 'Percentage of topics with ACLs'
    },
    'Compliance': {
        'audit_log_completeness': 'Percentage of operations logged',
        'encryption_coverage': 'Percentage of topics encrypted',
        'retention_compliance': 'Topics meeting retention requirements'
    }
}


def calculate_governance_score(metrics):
    """
    Calculate an overall governance score from 0 to 100.

    Each metric is expected as a fraction between 0.0 and 1.0;
    missing metrics count as 0.
    """
    weights = {
        'schema_compliance_rate': 0.3,
        'audit_log_completeness': 0.25,
        'encryption_coverage': 0.25,
        'acl_coverage': 0.2
    }  # weights sum to 1.0
    score = 0
    for metric, weight in weights.items():
        value = metrics.get(metric, 0)
        score += value * weight
    return score * 100
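Coverage metrics such as `acl_coverage` need a precise definition of "covered". A sketch that counts a topic as covered when at least one ACL matches it, using the resource pattern types Kafka ACLs support (`LITERAL`, where the name `*` matches every topic, and `PREFIXED`). Representing ACLs as `(pattern_type, name)` tuples is an assumption for illustration; the result is a fraction in [0, 1], the scale `calculate_governance_score` expects.

```python
def acl_matches(pattern_type: str, pattern: str, topic: str) -> bool:
    """Whether an ACL resource pattern applies to a topic name."""
    if pattern_type == "LITERAL":
        return pattern == "*" or pattern == topic
    if pattern_type == "PREFIXED":
        return topic.startswith(pattern)
    raise ValueError(f"unsupported pattern type: {pattern_type}")


def acl_coverage(topics, acls) -> float:
    """Fraction of topics matched by at least one ACL (0.0 to 1.0)."""
    if not topics:
        return 1.0  # vacuously covered: there is nothing to protect
    covered = [t for t in topics if any(acl_matches(pt, p, t) for pt, p in acls)]
    return len(covered) / len(topics)


topics = ["payments-in", "payments-out", "clickstream", "audit-events"]
acls = [("PREFIXED", "payments-"), ("LITERAL", "audit-events")]
print(acl_coverage(topics, acls))  # 0.75
```

A wildcard `LITERAL *` ACL makes coverage 1.0 instantly, so it is worth tracking alongside this metric how many topics are covered only by wildcards.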
⚠️ Key Insight
Data governance is not just about technology. It requires:
- Clear policies and procedures
- Training and awareness
- Regular audits and reviews
- Continuous improvement
- Executive sponsorship