
Kafka Data Governance


[Figure: Kafka Data Governance Framework. Four pillars: Schema Registry (Version Control, Compatibility, Validation, Evolution Rules, Schema History, Backward Compat); Access Control (ACLs, RBAC, Topic Policies, Consumer Groups, Service Accounts, Role Assignments); Data Lineage (Source Tracking, Flow Mapping, Transformation, Dependency Graph, Impact Analysis, Data Catalog); Audit & Compliance (Access Logs, Change Tracking, GDPR Compliance, Data Retention, Encryption, Right to Erasure).]

Overview

Data governance in Kafka ensures data quality, compliance, security, and accountability across your event streaming platform. This guide covers schema evolution, access control, data lineage, and regulatory compliance.

Governance Goals

  • Data Quality: Ensure data meets standards
  • Compliance: Meet regulatory requirements
  • Security: Protect sensitive data
  • Accountability: Track data ownership and changes
  • Discoverability: Make data easy to find and understand

Schema Evolution

Schema Compatibility Modes

Mode      | Description                   | Use Case
BACKWARD  | New schema can read old data  | Consumer upgrades first
FORWARD   | Old schema can read new data  | Producer upgrades first
FULL      | Both directions compatible    | Maximum flexibility
NONE      | No compatibility checks       | Development only
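
A compatibility mode can also be set per subject through the Schema Registry REST API (`PUT /config/{subject}`). The following is a minimal sketch that only builds the request, so it can be inspected without a running registry. The base URL and the subject name `orders-value` are assumptions for illustration, and `build_compatibility_request` is a hypothetical helper, not part of any client library.

```python
import json
import urllib.request
from urllib.parse import quote

VALID_LEVELS = {
    "BACKWARD", "BACKWARD_TRANSITIVE",
    "FORWARD", "FORWARD_TRANSITIVE",
    "FULL", "FULL_TRANSITIVE",
    "NONE",
}

def build_compatibility_request(base_url, subject, level):
    """Build (but do not send) a PUT /config/{subject} request."""
    level = level.upper()
    if level not in VALID_LEVELS:
        raise ValueError(f"Unknown compatibility level: {level}")
    body = json.dumps({"compatibility": level}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/config/{quote(subject, safe='')}",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    )

# Assumed registry address and subject name
request = build_compatibility_request(
    "http://schema-registry:8081", "orders-value", "backward"
)
# To apply it against a live registry: urllib.request.urlopen(request)
```

Building the request separately from sending it keeps the level validation testable in isolation.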

Avro Schema Evolution

// Schema v1
{
  "type": "record",
  "name": "Order",
  "namespace": "com.company.events",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "status", "type": "string"}
  ]
}

// Schema v2 (backward compatible: every added field has a default)
{
  "type": "record",
  "name": "Order",
  "namespace": "com.company.events",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "status", "type": "string"},
    {"name": "currency", "type": "string", "default": "USD"},
    {"name": "created_at", "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}], "default": null}
  ]
}
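
One rule behind BACKWARD compatibility is that a field added to the new schema needs a default, because records written with the old schema do not carry that field. The sketch below checks only that single rule on plain dictionaries. It is a simplification, not the Schema Registry's own checker: type changes, aliases, and nested records are ignored, and `backward_incompatible_fields` is a hypothetical helper name.

```python
def backward_incompatible_fields(old_schema, new_schema):
    """Return names of fields added in new_schema that lack a default.

    Simplified rule only: a reader using new_schema cannot fill in an
    added field for old records unless that field declares a default.
    """
    old_names = {field["name"] for field in old_schema["fields"]}
    return [
        field["name"]
        for field in new_schema["fields"]
        if field["name"] not in old_names and "default" not in field
    ]

order_v1 = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "amount", "type": "double"},
    ],
}

# Added field with a default: old records can still be read
order_v2_ok = {
    "type": "record",
    "name": "Order",
    "fields": order_v1["fields"]
    + [{"name": "currency", "type": "string", "default": "USD"}],
}

# Added field without a default: old records cannot be read
order_v2_bad = {
    "type": "record",
    "name": "Order",
    "fields": order_v1["fields"] + [{"name": "created_at", "type": "long"}],
}

ok_result = backward_incompatible_fields(order_v1, order_v2_ok)
bad_result = backward_incompatible_fields(order_v1, order_v2_bad)
```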

Schema Registry Configuration

# Schema Registry server settings (schema-registry.properties)
kafkastore.topic=_schemas
schema.compatibility.level=backward

# Client (serializer) settings
schema.registry.url=http://schema-registry:8081
auto.register.schemas=false

Schema Validation

# Uses the legacy confluent_kafka.avro client
from confluent_kafka import avro
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient

class SchemaValidator:
    def __init__(self, schema_registry_url):
        self.registry = CachedSchemaRegistryClient({'url': schema_registry_url})
    
    def validate_schema(self, subject, schema_str):
        """Validate schema against compatibility rules"""
        try:
            # Get latest schema; the client returns (schema_id, schema, version)
            latest_id, latest_schema, latest_version = self.registry.get_latest_schema(subject)
            
            # Parse new schema into an Avro schema object
            new_schema = avro.loads(schema_str)
            
            # Check compatibility against the latest registered version
            is_compatible = self.registry.test_compatibility(subject, new_schema)
            
            return {
                'compatible': is_compatible,
                'latest_version': latest_version
            }
        except Exception as e:
            return {'compatible': False, 'error': str(e)}
    
    def register_schema(self, subject, schema_str):
        """Register new schema version"""
        schema = avro.loads(schema_str)
        schema_id = self.registry.register(subject, schema)
        return schema_id

Data Classification

Classification Levels

# data_classification.yml
classification_levels:
  PUBLIC:
    description: "Non-sensitive data"
    encryption: false
    retention: "unlimited"
    access: "all_employees"
    
  INTERNAL:
    description: "Internal business data"
    encryption: true
    retention: "7_years"
    access: "employees"
    
  CONFIDENTIAL:
    description: "Sensitive business data"
    encryption: true
    retention: "5_years"
    access: "authorized_teams"
    
  RESTRICTED:
    description: "PII and regulated data"
    encryption: true
    retention: "3_years"
    access: "compliance_team"
    audit: true

Topic Classification

# Topic classification metadata
TOPIC_CLASSIFICATIONS = {
    'orders': {
        'classification': 'CONFIDENTIAL',
        'owner': 'payments-team',
        'description': 'Customer order events',
        'pii_fields': ['customer_email', 'shipping_address'],
        'retention_days': 2555,  # 7 years
        'tags': ['revenue', 'customer-data']
    },
    'user-events': {
        'classification': 'RESTRICTED',
        'owner': 'platform-team',
        'description': 'User behavior tracking',
        'pii_fields': ['user_id', 'ip_address', 'device_id'],
        'retention_days': 365,
        'tags': ['analytics', 'user-tracking']
    }
}
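
Classification metadata like this is only useful if it stays complete and consistent. As a rough sketch, the check below reports topics whose metadata is missing keys or names a level outside the four defined above. The required-key set and the function name `audit_topic_metadata` are assumptions chosen for illustration, and the `clickstream` entry is invented sample data.

```python
ALLOWED_LEVELS = {"PUBLIC", "INTERNAL", "CONFIDENTIAL", "RESTRICTED"}
# Assumed minimum set of keys every topic entry should carry
REQUIRED_KEYS = {"classification", "owner", "description", "pii_fields", "retention_days"}

def audit_topic_metadata(classifications):
    """Return {topic: [problems]} for entries that fail basic checks."""
    problems = {}
    for topic, meta in classifications.items():
        issues = []
        missing = sorted(REQUIRED_KEYS - meta.keys())
        if missing:
            issues.append(f"missing keys: {', '.join(missing)}")
        level = meta.get("classification")
        if level is not None and level not in ALLOWED_LEVELS:
            issues.append(f"unknown classification: {level}")
        if issues:
            problems[topic] = issues
    return problems

sample = {
    "orders": {
        "classification": "CONFIDENTIAL",
        "owner": "payments-team",
        "description": "Customer order events",
        "pii_fields": ["customer_email", "shipping_address"],
        "retention_days": 2555,
    },
    # Invented entry with an unknown level and missing keys
    "clickstream": {"classification": "SECRET", "owner": "web-team"},
}

report = audit_topic_metadata(sample)
```

Running a check like this in CI is one way to keep the metadata from drifting as topics are added.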

PII Detection and Masking

import re

class PIIDetector:
    PATTERNS = {
        'email': r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
        'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
        'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
        'credit_card': r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b'
    }
    
    @classmethod
    def detect_pii(cls, data):
        pii_found = []
        for pii_type, pattern in cls.PATTERNS.items():
            if re.search(pattern, str(data)):
                pii_found.append(pii_type)
        return pii_found

class PIIMasker:
    def mask_email(self, email):
        # Split on the last '@' so an unusual local part cannot break unpacking
        local, domain = email.rsplit('@', 1)
        return f"{local[:1]}***@{domain}"
    
    def mask_phone(self, phone):
        return f"***-***-{phone[-4:]}"
    
    def mask_pii(self, data, fields_to_mask):
        masked_data = data.copy()
        for field in fields_to_mask:
            if field in masked_data:
                value = masked_data[field]
                if isinstance(value, str):
                    pii_types = PIIDetector.detect_pii(value)
                    if 'email' in pii_types:
                        masked_data[field] = self.mask_email(value)
                    elif 'phone' in pii_types:
                        masked_data[field] = self.mask_phone(value)
        return masked_data
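
The detector above also recognizes SSNs and credit card numbers, which the masker does not handle. One possible way to cover them, sketched here under the assumption that showing the last four digits is acceptable for your policy, is a generic digit masker that keeps separators in place. `mask_keep_last4` is a hypothetical helper name.

```python
def mask_keep_last4(value):
    """Replace every digit except the last four with '*', keeping separators."""
    total_digits = sum(ch.isdigit() for ch in value)
    digits_seen = 0
    masked = []
    for ch in value:
        if ch.isdigit():
            digits_seen += 1
            # Keep a digit only if it is among the final four digits
            masked.append(ch if digits_seen > total_digits - 4 else "*")
        else:
            masked.append(ch)
    return "".join(masked)

masked_ssn = mask_keep_last4("123-45-6789")
masked_card = mask_keep_last4("4111 1111 1111 1111")
```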

GDPR Compliance

Right to Erasure Implementation

import time
import uuid

class GDPRComplianceManager:
    def __init__(self, kafka_producer, schema_registry):
        self.producer = kafka_producer
        self.schema_registry = schema_registry
    
    def process_erasure_request(self, user_id):
        """Process GDPR erasure request"""
        # 1. Publish erasure event
        erasure_event = {
            'event_type': 'USER_ERASURE_REQUEST',
            'user_id': user_id,
            'timestamp': int(time.time() * 1000),
            'request_id': str(uuid.uuid4())
        }
        
        self.producer.send('gdpr-events', value=erasure_event)
        
        # 2. Mark user data for deletion in all topics
        topics_with_user_data = self.get_topics_with_user_data(user_id)
        
        for topic in topics_with_user_data:
            self.mark_for_deletion(topic, user_id)
        
        return erasure_event['request_id']
    
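    def mark_for_deletion(self, topic, user_id):
        """Mark user records for deletion"""
        # Kafka logs are append-only, so the job writes a filtered copy of the
        # topic (for example with Kafka Streams) instead of editing it in place
        deletion_config = {
            'input_topic': topic,
            'output_topic': f'{topic}-redacted',
            'user_id': user_id,
            'action': 'REDACT'
        }
        
        self.start_redaction_job(deletion_config)
    
    def start_redaction_job(self, deletion_config):
        """Placeholder: submit the redaction job to your stream processor"""
        raise NotImplementedError
    
    def get_topics_with_user_data(self, user_id):
        """Find all topics containing user data"""
        # Placeholder: a real implementation would query the data catalog
        # for topics with user PII
        return ['orders', 'user-events', 'analytics', 'profiles']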
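
The redaction job itself is left open above. Its core step, copying a topic while dropping one user's records, can be sketched on in-memory dictionaries as follows. This is only an illustration of the filter logic, not a stream-processing job; the `user_id` field name and the helper `redact_user_records` are assumptions.

```python
def redact_user_records(records, user_id, key="user_id"):
    """Return (records to keep, number of records dropped) for one user."""
    kept = [record for record in records if record.get(key) != user_id]
    return kept, len(records) - len(kept)

events = [
    {"user_id": "u-1", "action": "login"},
    {"user_id": "u-2", "action": "purchase"},
    {"user_id": "u-1", "action": "logout"},
]

kept, dropped = redact_user_records(events, "u-1")
```

Returning the dropped count gives the erasure request something concrete to record in the audit log.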

Data Retention Policies

from kafka.admin import ConfigResource, ConfigResourceType

class RetentionPolicyManager:
    def __init__(self, admin_client):
        # admin_client: a kafka-python KafkaAdminClient, for example
        # KafkaAdminClient(bootstrap_servers='kafka:9092')
        self.admin = admin_client
    
    def apply_retention_policies(self):
        """Apply retention policies to all topics"""
        policies = {
            'orders': {'retention_ms': 2555 * 24 * 3600 * 1000},  # 7 years
            'user-events': {'retention_ms': 365 * 24 * 3600 * 1000},  # 1 year
            'logs': {'retention_ms': 30 * 24 * 3600 * 1000},  # 30 days
            'temp': {'retention_ms': 7 * 24 * 3600 * 1000}  # 7 days
        }
        
        for topic, policy in policies.items():
            self.update_topic_config(topic, policy)
    
    def update_topic_config(self, topic, config):
        """Update topic configuration"""
        # Create config resource
        config_resource = ConfigResource(
            ConfigResourceType.TOPIC,
            topic,
            configs={'retention.ms': str(config['retention_ms'])}
        )
        
        # Apply configuration. alter_configs is not incremental: it replaces
        # the topic's existing overrides, so include any others you want to keep.
        self.admin.alter_configs([config_resource])
Audit Logging

Audit Event Structure

{
  "audit_id": "audit-12345",
  "timestamp": "2024-01-15T10:30:00Z",
  "event_type": "TOPIC_ACCESS",
  "actor": {
    "user_id": "user-789",
    "service": "order-service",
    "ip_address": "10.0.1.100"
  },
  "resource": {
    "type": "topic",
    "name": "orders",
    "operation": "WRITE"
  },
  "details": {
    "message_count": 150,
    "bytes_written": 153600,
    "partition_keys": ["order-123", "order-124"]
  },
  "result": "SUCCESS"
}
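
Audit events are easier to query when every producer follows the same structure. The following sketch validates an event against the fields shown above before it is published. Treating exactly these six top-level fields as required is an assumption, and `validate_audit_event` is a hypothetical helper.

```python
from datetime import datetime

# Assumed required top-level fields, taken from the sample event
REQUIRED_FIELDS = {"audit_id", "timestamp", "event_type", "actor", "resource", "result"}

def validate_audit_event(event):
    """Return a list of problems; an empty list means the event looks valid."""
    errors = []
    missing = sorted(REQUIRED_FIELDS - event.keys())
    if missing:
        errors.append(f"missing fields: {', '.join(missing)}")
    timestamp = event.get("timestamp")
    if isinstance(timestamp, str):
        try:
            # fromisoformat on older Python versions rejects a trailing 'Z'
            datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
        except ValueError:
            errors.append("timestamp is not ISO 8601")
    resource = event.get("resource")
    if isinstance(resource, dict) and "name" not in resource:
        errors.append("resource.name is required")
    return errors

sample_event = {
    "audit_id": "audit-12345",
    "timestamp": "2024-01-15T10:30:00Z",
    "event_type": "TOPIC_ACCESS",
    "actor": {"user_id": "user-789", "service": "order-service"},
    "resource": {"type": "topic", "name": "orders", "operation": "WRITE"},
    "result": "SUCCESS",
}

valid_errors = validate_audit_event(sample_event)
invalid_errors = validate_audit_event({"audit_id": "a1", "timestamp": "yesterday"})
```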

Audit Logger

import json
import uuid
from datetime import datetime, timezone
from kafka import KafkaProducer

class AuditLogger:
    def __init__(self, bootstrap_servers):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
    
    def log_access(self, actor, resource, operation, result):
        audit_event = {
            'audit_id': str(uuid.uuid4()),
            # Timezone-aware UTC timestamp in ISO 8601 form
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'event_type': 'ACCESS',
            'actor': actor,
            'resource': resource,
            'operation': operation,
            'result': result
        }
        
        self.producer.send('audit-log', value=audit_event)
        return audit_event['audit_id']
    
    def log_data_change(self, actor, resource, change_type, before, after):
        audit_event = {
            'audit_id': str(uuid.uuid4()),
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'event_type': 'DATA_CHANGE',
            'actor': actor,
            'resource': resource,
            'change_type': change_type,
            'before': before,
            'after': after
        }
        
        self.producer.send('audit-log', value=audit_event)
        return audit_event['audit_id']

Access Control Policies

# access_policies.yml
policies:
  - name: "order-service-producer"
    principal: "User:order-service"
    resource:
      type: "topic"
      name: "orders"
    operations:
      - "WRITE"
      - "DESCRIBE"
    effect: "ALLOW"
    
  - name: "analytics-consumer"
    principal: "User:analytics-service"
    resource:
      type: "topic"
      name: "orders"
    operations:
      - "READ"
      - "DESCRIBE"
    effect: "ALLOW"
    
  - name: "admin-full-access"
    principal: "User:admin"
    resource:
      type: "topic"
      name: "*"
    operations:
      - "ALL"
    effect: "ALLOW"
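
A policy file in this shape is not something Kafka reads directly; it has to be translated into ACLs. One possible translation, sketched below, renders each topic policy as `kafka-acls` command lines. The bootstrap address is an assumption, the script is named `kafka-acls.sh` in Apache Kafka distributions, and `policy_to_acl_commands` is a hypothetical helper that handles topic resources only.

```python
import shlex

def policy_to_acl_commands(policy, bootstrap_server="kafka:9092"):
    """Render one `kafka-acls --add` command per operation in a topic policy."""
    if policy["resource"]["type"] != "topic":
        raise ValueError("this sketch only handles topic resources")
    principal_flags = {"ALLOW": "--allow-principal", "DENY": "--deny-principal"}
    principal_flag = principal_flags[policy["effect"]]
    commands = []
    for operation in policy["operations"]:
        args = [
            "kafka-acls", "--bootstrap-server", bootstrap_server, "--add",
            principal_flag, policy["principal"],
            "--operation", operation.capitalize(),
            "--topic", policy["resource"]["name"],
        ]
        # Quote each argument so a wildcard like * is not expanded by the shell
        commands.append(" ".join(shlex.quote(arg) for arg in args))
    return commands

producer_policy = {
    "name": "order-service-producer",
    "principal": "User:order-service",
    "resource": {"type": "topic", "name": "orders"},
    "operations": ["WRITE", "DESCRIBE"],
    "effect": "ALLOW",
}

commands = policy_to_acl_commands(producer_policy)
```

Generating the commands as text first makes it possible to review them before anything is applied to the cluster.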

Data Lineage

Lineage Tracking

import time

class DataLineageTracker:
    def __init__(self, kafka_producer):
        self.producer = kafka_producer
    
    def track_transformation(self, input_topics, output_topic, transformation):
        lineage_event = {
            'event_type': 'DATA_LINEAGE',
            'timestamp': int(time.time() * 1000),
            'input_topics': input_topics,
            'output_topic': output_topic,
            'transformation': transformation,
            'producer_service': transformation.get('service_name')
        }
        
        self.producer.send('data-lineage', value=lineage_event)
    
    def track_consumption(self, topic, consumer_group, message_count):
        consumption_event = {
            'event_type': 'DATA_CONSUMPTION',
            'timestamp': int(time.time() * 1000),
            'topic': topic,
            'consumer_group': consumer_group,
            'message_count': message_count
        }
        
        self.producer.send('data-lineage', value=consumption_event)

Lineage Visualization

def generate_lineage_graph(lineage_events):
    """Generate data lineage graph"""
    nodes = {}  # keyed by id so repeated events do not create duplicates
    edges = {}  # keyed by (source, target, type) for the same reason
    
    def add_node(node_id, node_type):
        nodes.setdefault(node_id, {'id': node_id, 'type': node_type, 'label': node_id})
    
    def add_edge(source, target, edge_type):
        edges.setdefault((source, target, edge_type),
                         {'source': source, 'target': target, 'type': edge_type})
    
    for event in lineage_events:
        if event['event_type'] != 'DATA_LINEAGE':
            continue
        
        # Add transformation node
        service = event['transformation']['service_name']
        add_node(service, 'transformation')
        
        # Add topic nodes and edges from input topics
        for input_topic in event['input_topics']:
            add_node(input_topic, 'topic')
            add_edge(input_topic, service, 'input')
        
        # Add output topic node and the edge to it
        add_node(event['output_topic'], 'topic')
        add_edge(service, event['output_topic'], 'output')
    
    return {'nodes': list(nodes.values()), 'edges': list(edges.values())}
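
A lineage graph in this `nodes`/`edges` shape also supports impact analysis: given a topic, find everything downstream that a change could affect. The sketch below does this with a breadth-first traversal. The sample graph and the helper name `downstream_of` are invented for illustration.

```python
from collections import deque

def downstream_of(graph, start):
    """Return every node reachable from start, in breadth-first order."""
    adjacency = {}
    for edge in graph["edges"]:
        adjacency.setdefault(edge["source"], []).append(edge["target"])

    visited = {start}
    reachable = []
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for target in adjacency.get(node, []):
            if target not in visited:
                visited.add(target)
                reachable.append(target)
                queue.append(target)
    return reachable

# Invented sample: orders -> enricher -> orders-enriched -> reporting-job -> daily-report
sample_graph = {
    "nodes": [],
    "edges": [
        {"source": "orders", "target": "enricher", "type": "input"},
        {"source": "enricher", "target": "orders-enriched", "type": "output"},
        {"source": "orders-enriched", "target": "reporting-job", "type": "input"},
        {"source": "reporting-job", "target": "daily-report", "type": "output"},
    ],
}

impacted = downstream_of(sample_graph, "orders")
```

The visited set also protects against cycles, which can occur when a service writes back to a topic it reads from.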

Topic Naming Conventions

Naming Standards

import re

# Topic naming convention
TOPIC_NAMING_CONVENTION = {
    'pattern': '{domain}.{subdomain}.{entity}.{event-type}',
    'examples': [
        'orders.payment.order.completed',
        'users.profile.user.created',
        'inventory.stock.product.updated'
    ],
    'rules': [
        'Use lowercase letters only',
        'Use dots as separators',
        'Keep names within 249 characters (the Kafka limit)',
        'Include domain and entity',
        'Describe the event type'
    ]
}

MAX_TOPIC_NAME_LENGTH = 249  # Kafka rejects longer topic names

def validate_topic_name(topic_name):
    """Validate topic name against convention"""
    pattern = r'[a-z]+(\.[a-z]+)*'
    
    # fullmatch avoids accepting a trailing newline
    if not re.fullmatch(pattern, topic_name):
        return False, f"Invalid topic name: {topic_name}"
    
    if len(topic_name) > MAX_TOPIC_NAME_LENGTH:
        return False, f"Topic name too long: {len(topic_name)} characters"
    
    # The pattern has four segments: domain, subdomain, entity, event type
    parts = topic_name.split('.')
    if len(parts) < 4:
        return False, f"Topic name must have at least 4 parts: {topic_name}"
    
    return True, "Valid"
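
Validation catches bad names after the fact. A complementary approach, sketched here, is to build names from their four segments so that invalid ones cannot be produced in the first place. `build_topic_name` is a hypothetical helper, and the 249-character limit is Kafka's maximum topic name length.

```python
import re

SEGMENT = re.compile(r"[a-z]+")
MAX_TOPIC_NAME_LENGTH = 249  # Kafka's maximum topic name length

def build_topic_name(domain, subdomain, entity, event_type):
    """Join four lowercase segments into a dotted topic name."""
    parts = [domain, subdomain, entity, event_type]
    for part in parts:
        if not SEGMENT.fullmatch(part):
            raise ValueError(f"Invalid segment: {part!r}")
    name = ".".join(parts)
    if len(name) > MAX_TOPIC_NAME_LENGTH:
        raise ValueError(f"Topic name too long: {len(name)} characters")
    return name

topic = build_topic_name("orders", "payment", "order", "completed")
```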

Best Practices

Governance Checklist

  • Schema Registry configured with compatibility rules
  • Access control policies implemented
  • Data classification applied to all topics
  • PII detection and masking in place
  • Audit logging enabled
  • Retention policies configured
  • Lineage tracking implemented
  • Topic naming conventions enforced

Regular Governance Reviews

def governance_review():
    """Conduct regular governance review"""
    # Each helper below is a placeholder for a deployment-specific check
    # that returns True when the check passes
    checks = {
        'schema_compatibility': check_schema_compatibility(),
        'access_control': audit_access_policies(),
        'retention_compliance': verify_retention_policies(),
        'pii_detection': scan_for_pii(),
        'audit_coverage': check_audit_logging()
    }
    
    # Overall pass/fail plus the individual results
    return all(checks.values()), checks

Summary

Kafka data governance combines schema evolution rules, access control, data lineage, and compliance measures. Establish clear policies for data classification, retention, and audit logging to maintain data quality and regulatory compliance.
