
Kafka Multi-Datacenter Replication



[Figure: Datacenter 1 (Primary), with producers, a Kafka cluster of 3 brokers / 9 partitions, and local consumers, replicates through MirrorMaker 2 (replication policies, offset sync, topic translation, checkpoint connector) to Datacenter 2 (Secondary), with a Kafka cluster of 3 brokers / 9 replicated partitions, replicated topics, and failover consumers. Replication is bi-directional.]

Overview

Multi-datacenter Kafka replication enables disaster recovery, geo-redundancy, and data locality across distributed infrastructure. This guide covers MirrorMaker 2 replication, failover strategies, and active-active architectures.

Use Cases

  • Disaster Recovery: Failover to secondary DC during outages
  • Geo-Redundancy: Serve consumers from nearest datacenter
  • Compliance: Meet data residency requirements
  • Performance: Reduce latency for geographically distributed users

MirrorMaker 2 Architecture

Core Components

Component            | Function
Source Connector     | Reads from source cluster
Sink Connector       | Writes to target cluster
Checkpoint Connector | Syncs consumer offsets
Heartbeat Connector  | Monitors cluster health

MirrorMaker 2 Configuration

# mm2.properties
clusters = dc1, dc2

# DC1 connection
dc1.bootstrap.servers = kafka-dc1:9092
dc1.security.protocol = SSL

# DC2 connection
dc2.bootstrap.servers = kafka-dc2:9092
dc2.security.protocol = SSL

# Replication flow
dc1->dc2.enabled = true
dc1->dc2.topics = orders.*, payments.*

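# Topic naming: DefaultReplicationPolicy prefixes replicated topics
# with the source cluster alias, e.g. orders -> dc1.orders
replication.policy.class = org.apache.kafka.connect.mirror.DefaultReplicationPolicy
replication.policy.separator = .

# Replicated topic settings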
replication.factor = 3
sync.topic.configs.enabled = true
refresh.topics.interval.seconds = 60

Kafka Connect Configuration

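{
  "name": "mirror-source-dc1-to-dc2",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "tasks.max": "5",
    "source.cluster.alias": "dc1",
    "target.cluster.alias": "dc2",
    "source.cluster.bootstrap.servers": "kafka-dc1-1:9092,kafka-dc1-2:9092,kafka-dc1-3:9092",
    "target.cluster.bootstrap.servers": "kafka-dc2-1:9092,kafka-dc2-2:9092,kafka-dc2-3:9092",
    "topics": "orders,payments,users",
    "sync.topic.configs.enabled": "true",
    "replication.factor": "3",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

When MirrorMaker 2 runs on an existing Kafka Connect cluster, each connector is configured with source.cluster.* and target.cluster.* keys instead of the clusters and dc1->dc2.* keys used by the dedicated mm2.properties driver. Checkpoints and heartbeats come from separate MirrorCheckpointConnector and MirrorHeartbeatConnector instances, each deployed with its own configuration.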

Replication Policies

Topic Translation

# Custom topic naming convention
# Source: orders (DC1)
# Replicated: dc1.orders (DC2)

TOPIC_TRANSLATION = {
    'source_pattern': '{topic}',
    'target_pattern': '{source_cluster}.{topic}',
    'examples': {
        'orders': 'dc1.orders',
        'payments': 'dc1.payments',
        'users': 'dc1.users'
    }
}
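
The naming convention above can be sketched as two small helpers. This is a minimal illustration of the prefixing scheme, not MirrorMaker 2's own code; the function names and the `known_aliases` parameter are assumptions made for the example.

```python
def remote_topic(source_alias: str, topic: str, separator: str = ".") -> str:
    """Name a topic receives on the target cluster under the prefixing convention."""
    return f"{source_alias}{separator}{topic}"


def split_remote_topic(name: str, known_aliases: set, separator: str = "."):
    """Return (source_alias, original_topic), or (None, name) for a local topic."""
    alias, sep, rest = name.partition(separator)
    if sep and alias in known_aliases:
        return alias, rest
    return None, name


print(remote_topic("dc1", "orders"))                      # dc1.orders
print(split_remote_topic("dc1.orders", {"dc1", "dc2"}))   # ('dc1', 'orders')
print(split_remote_topic("orders.eu", {"dc1", "dc2"}))    # (None, 'orders.eu')
```

Checking the prefix against the set of known cluster aliases keeps a local topic whose name happens to contain a dot from being mistaken for a replicated one.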

Replication Factor

# Check replication status
kafka-topics.sh --bootstrap-server kafka-dc2:9092 \
  --describe --topic dc1.orders

# Output:
# Topic: dc1.orders  Partition: 0  Leader: 1  Replicas: 1,2,3  Isr: 1,2,3
# Topic: dc1.orders  Partition: 1  Leader: 2  Replicas: 2,3,1  Isr: 2,3,1

Filter Policies

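# Replicate only specific topics (comma-separated regexes)
dc1->dc2.topics = orders.*, payments.*

# Exclude certain topics ([.] matches a literal dot without needing
# a backslash, which .properties files treat as an escape character)
dc1->dc2.topics.exclude = .*[.]internal, .*[.]temp

# Replicate by pattern: a single regex with alternation, used in place
# of the list above
# dc1->dc2.topics = orders|payments|users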
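
To preview which topics a pair of include and exclude lists would select, the filter logic can be approximated offline. The sketch below assumes each comma-separated entry is a regex that must match the whole topic name and that exclusions take precedence; `compile_list` and `should_replicate` are hypothetical helper names, not part of any Kafka library.

```python
import re


def compile_list(patterns: str):
    """Split a comma-separated pattern list and compile each entry."""
    return [re.compile(p.strip()) for p in patterns.split(",") if p.strip()]


def should_replicate(topic: str, include, exclude) -> bool:
    """A topic is selected when it fully matches an include pattern and no exclude pattern."""
    included = any(p.fullmatch(topic) for p in include)
    excluded = any(p.fullmatch(topic) for p in exclude)
    return included and not excluded


include = compile_list(r"orders.*, payments.*")
exclude = compile_list(r".*\.internal, .*\.temp")

for topic in ["orders", "orders.eu", "orders.internal", "payments.temp", "users"]:
    print(topic, should_replicate(topic, include, exclude))
# orders True
# orders.eu True
# orders.internal False
# payments.temp False
# users False
```

Running a topic list from the source cluster through a check like this before deployment can catch patterns that are broader or narrower than intended.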

Offset Synchronization

Checkpoint Connector

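# Offset sync configuration
emit.checkpoints.enabled = true
emit.checkpoints.interval.seconds = 60

# Consumer groups whose offsets are translated
groups = order-processor, payment-processor

# Write translated offsets to the target cluster's __consumer_offsets
# (Kafka 2.7+; only applied while the group is inactive on the target)
sync.group.offsets.enabled = true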

Offset Sync Example

# DC1 Consumer Group Offsets
{
    "group": "order-processor",
    "topic": "orders",
    "partition": 0,
    "offset": 152345,
    "timestamp": 1640995200000
}

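# Checkpoint emitted to DC2. The group name is unchanged, and the offset
# is translated, so it usually differs from the source offset.
{
    "group": "order-processor",
    "topic": "dc1.orders",
    "partition": 0,
    "upstream_offset": 152345,    # Committed offset in DC1
    "downstream_offset": 151980,  # Equivalent offset in DC2 (illustrative value)
    "timestamp": 1640995200000
}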

Failover with Offset Reset

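from kafka import KafkaConsumer, TopicPartition

# After failover, connect to DC2 without subscribing. Partitions are
# assigned manually so that seek() can be called immediately.
consumer = KafkaConsumer(
    bootstrap_servers=['kafka-dc2:9092'],
    group_id='order-processor-failover',
    auto_offset_reset='earliest',
    enable_auto_commit=False
)

# get_synced_offsets() is a placeholder for a lookup of the translated
# offsets (for example from the dc1.checkpoints.internal topic in DC2).
# It is expected to return {partition_number: offset}.
offsets = get_synced_offsets('order-processor', 'orders')
positions = {TopicPartition('dc1.orders', p): o for p, o in offsets.items()}

# Manually seek to synced offset
consumer.assign(list(positions))
for tp, offset in positions.items():
    consumer.seek(tp, offset)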
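
The `get_synced_offsets` lookup used above is not defined in this lesson. One way to think about it is as a reduction over checkpoint records: keep the most recent translated offset per partition for a given group and topic. The sketch below works on already-decoded, in-memory records; the `Checkpoint` tuple, the function name, and the sample values are assumptions for illustration, and decoding the real checkpoint topic is not shown.

```python
from typing import Dict, Iterable, NamedTuple


class Checkpoint(NamedTuple):
    group: str
    topic: str              # topic name on the target cluster, e.g. "dc1.orders"
    partition: int
    upstream_offset: int    # committed offset on the source cluster
    downstream_offset: int  # equivalent offset on the target cluster


def latest_translated_offsets(
    checkpoints: Iterable[Checkpoint], group: str, topic: str
) -> Dict[int, int]:
    """Keep the last checkpoint seen per partition for one group and topic."""
    result: Dict[int, int] = {}
    for cp in checkpoints:  # assumed to arrive in log order, oldest first
        if cp.group == group and cp.topic == topic:
            result[cp.partition] = cp.downstream_offset
    return result


# Made-up checkpoint records
records = [
    Checkpoint("order-processor", "dc1.orders", 0, 152000, 151640),
    Checkpoint("order-processor", "dc1.orders", 1, 98000, 97910),
    Checkpoint("payment-processor", "dc1.payments", 0, 500, 500),
    Checkpoint("order-processor", "dc1.orders", 0, 152345, 151980),
]
print(latest_translated_offsets(records, "order-processor", "dc1.orders"))
# {0: 151980, 1: 97910}
```

Java clients can avoid this step by calling `RemoteClusterUtils.translateOffsets` from the `connect-mirror-client` library, which reads the checkpoint topic directly.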

Failover Strategies

Active-Passive Failover

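from kafka import KafkaProducer

class KafkaFailoverManager:
    def __init__(self, primary_cluster, secondary_cluster):
        # Bootstrap server lists for each cluster
        self.primary = primary_cluster
        self.secondary = secondary_cluster
        self.active = 'primary'

    def get_producer(self):
        # Producers always target whichever cluster is currently active
        servers = self.primary if self.active == 'primary' else self.secondary
        return KafkaProducer(bootstrap_servers=servers)

    def failover(self):
        self.active = 'secondary'
        # Reset consumer offsets to synced position
        self.reset_consumer_offsets()

    def failback(self):
        # Ensure all data is replicated back
        self.wait_for_replication_complete()
        self.active = 'primary'

    def reset_consumer_offsets(self):
        # Placeholder: seek consumers to the offsets translated by MirrorMaker 2
        raise NotImplementedError

    def wait_for_replication_complete(self):
        # Placeholder: block until replication back to the primary has caught up
        raise NotImplementedError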

Health Check Configuration

import time
from kafka import KafkaConsumer

def check_cluster_health(bootstrap_servers):
    """Return True if the cluster answers a metadata request."""
    consumer = None
    try:
        consumer = KafkaConsumer(
            bootstrap_servers=bootstrap_servers,
            request_timeout_ms=5000
        )
        # Listing topics forces a metadata round trip to the brokers
        consumer.topics()
        return True
    except Exception as e:
        print(f"Cluster health check failed: {e}")
        return False
    finally:
        # Release the connection whether or not the check succeeded
        if consumer is not None:
            consumer.close()

# Monitor and failover
def monitor_and_failover(primary, failover_manager):
    """Poll the primary every 10 seconds and fail over on the first failed check."""
    while True:
        if not check_cluster_health(primary):
            print("Primary cluster failed, initiating failover")
            failover_manager.failover()
            break
        time.sleep(10)
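
Failing over on a single failed check can turn a brief network blip into a full datacenter switch. A common mitigation is to require several consecutive failures first. The sketch below shows that variation; `wait_for_outage`, its parameters, and the injectable `sleep` argument are assumptions made for the example, and the health check is passed in as a callable so the logic can be exercised without a broker.

```python
import time
from typing import Callable


def wait_for_outage(
    check: Callable[[], bool],
    max_failures: int = 3,
    interval_s: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Return once `check` has failed `max_failures` times in a row."""
    failures = 0
    while failures < max_failures:
        if check():
            failures = 0   # any success resets the streak
        else:
            failures += 1
        if failures < max_failures:
            sleep(interval_s)


# Simulated check results: one blip, a recovery, then a sustained outage
results = iter([False, True, False, False, False])
wait_for_outage(lambda: next(results), max_failures=3, sleep=lambda s: None)
print("Sustained outage detected, failover can start")
```

In practice the callable would wrap `check_cluster_health(primary)`, and the failover manager would be invoked once the function returns.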

Automatic Failover with ZooKeeper

from kazoo.client import KazooClient

class ZooKeeperFailover:
    def __init__(self, zk_hosts):
        self.zk = KazooClient(hosts=zk_hosts)
        self.zk.start()
    
    def watch_cluster(self, cluster_path, callback):
        @self.zk.DataWatch(cluster_path)
        def watch_node(data, stat):
            if data is None:
                callback('cluster_down')
            else:
                callback('cluster_up')

Active-Active Architecture

Bi-directional Replication

# Both directions enabled
dc1->dc2.enabled = true
dc1->dc2.topics = orders.*, payments.*

dc2->dc1.enabled = true
dc2->dc1.topics = orders.*, payments.*

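# Prevent infinite loops: the default policy prefixes replicated topics
# with the source alias (dc1.orders, dc2.orders), which lets MirrorMaker 2
# recognize and skip topics that would cycle back to their origin.
# IdentityReplicationPolicy keeps topic names unchanged and must not be
# used for bi-directional flows.
replication.policy.class = org.apache.kafka.connect.mirror.DefaultReplicationPolicy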
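
Loop prevention in a bi-directional setup depends on replicated topic names carrying the alias of the cluster they came from. The sketch below illustrates that idea under the prefix naming convention (`dc1.orders`); it is not MirrorMaker 2's implementation, and `source_chain` and `would_cycle` are hypothetical names.

```python
def source_chain(topic: str, aliases: set, separator: str = ".") -> list:
    """Cluster aliases a topic has already passed through, most recent hop first."""
    chain = []
    while True:
        prefix, sep, rest = topic.partition(separator)
        if not sep or prefix not in aliases:
            return chain
        chain.append(prefix)
        topic = rest


def would_cycle(topic: str, target_alias: str, aliases: set) -> bool:
    """True when copying `topic` to `target_alias` would send data back to a cluster it came from."""
    return target_alias in source_chain(topic, aliases)


aliases = {"dc1", "dc2"}
print(would_cycle("orders", "dc2", aliases))          # False: local topic, safe to replicate
print(would_cycle("dc1.orders", "dc1", aliases))      # True: would return to its origin
print(would_cycle("dc2.dc1.orders", "dc1", aliases))  # True: dc1 appears earlier in the chain
```

When topic names are left unchanged on the target, the chain is always empty, so a check like this cannot tell local records from mirrored ones and records would bounce between clusters indefinitely.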

Conflict Resolution

import json

# Use unique keys to prevent conflicts
def produce_with_unique_key(producer, topic, message, datacenter_id):
    # Add datacenter identifier (e.g. 'dc1') to key so that records written
    # in different datacenters never share a key
    key = f"{message['id']}-{datacenter_id}"
    producer.send(topic, key=key.encode(), value=json.dumps(message).encode())

# Merge strategy for conflicts
class ConflictResolver:
    def resolve(self, dc1_message, dc2_message):
        # Last-write-wins based on timestamp
        if dc1_message['timestamp'] > dc2_message['timestamp']:
            return dc1_message
        return dc2_message
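
Applied to whole streams, last-write-wins means keeping one record per `id`, chosen by timestamp. The sketch below also breaks timestamp ties deterministically so that both datacenters converge on the same winner. The `dc` field and the function name are assumptions added for the example, and the sample messages are made up.

```python
def merge_last_write_wins(*streams):
    """Merge message streams, keeping the newest message per id.

    Ties on timestamp are broken by the 'dc' field so that every
    datacenter picks the same winner regardless of arrival order.
    """
    latest = {}
    for stream in streams:
        for msg in stream:
            current = latest.get(msg["id"])
            if current is None or (msg["timestamp"], msg["dc"]) > (
                current["timestamp"],
                current["dc"],
            ):
                latest[msg["id"]] = msg
    return latest


dc1_stream = [
    {"id": "o-1", "timestamp": 100, "dc": "dc1", "status": "created"},
    {"id": "o-2", "timestamp": 120, "dc": "dc1", "status": "created"},
]
dc2_stream = [
    {"id": "o-1", "timestamp": 130, "dc": "dc2", "status": "paid"},
    {"id": "o-2", "timestamp": 120, "dc": "dc2", "status": "cancelled"},
]
merged = merge_last_write_wins(dc1_stream, dc2_stream)
print(merged["o-1"]["status"])  # paid (newer timestamp)
print(merged["o-2"]["status"])  # cancelled (tie broken by dc: "dc2" > "dc1")
```

Last-write-wins relies on clocks being reasonably synchronized across datacenters; where that cannot be assumed, a version counter carried in the message is a common alternative.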

Monitoring Multi-DC Replication

Key Metrics

# Prometheus metrics to monitor
# Metric names depend on how MirrorMaker 2's JMX metrics are exported;
# the names below are examples.
metrics:
  - name: mirrormaker_replication_latency
    description: "Replication latency between DCs"
    threshold: "> 1000ms"

  - name: mirrormaker_replication_lag
    description: "Messages pending replication"
    threshold: "> 10000"

  - name: mirrormaker_replication_rate
    description: "Messages replicated per second"
    threshold: "< expected_rate * 0.5"
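
Replication lag in messages can be derived from two numbers per partition: the end offset of the source log and the next source offset the mirror has yet to copy. The sketch below shows that arithmetic and checks it against the 10000-message threshold listed above; the function names and the sample offsets are assumptions for illustration.

```python
from typing import Dict

LAG_THRESHOLD = 10_000  # matches the "> 10000" threshold listed above


def replication_lag(
    source_end: Dict[int, int], next_to_replicate: Dict[int, int]
) -> Dict[int, int]:
    """Records per partition that exist on the source but are not yet mirrored."""
    return {
        partition: max(end - next_to_replicate.get(partition, 0), 0)
        for partition, end in source_end.items()
    }


def lag_alert(lag: Dict[int, int], threshold: int = LAG_THRESHOLD) -> bool:
    """True when the total lag across partitions exceeds the threshold."""
    return sum(lag.values()) > threshold


# Made-up offsets for a three-partition topic
source_end = {0: 152_400, 1: 98_000, 2: 120_500}
next_to_replicate = {0: 152_345, 1: 90_000, 2: 118_000}

lag = replication_lag(source_end, next_to_replicate)
print(lag)             # {0: 55, 1: 8000, 2: 2500}
print(lag_alert(lag))  # True (10555 > 10000)
```

Summing across partitions mirrors a topic-level alert; alerting per partition instead would surface a single stuck partition sooner.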

Monitoring Dashboard

{
  "panels": [
    {
      "title": "DC1 to DC2 Replication Lag",
      "type": "timeseries",
      "targets": [
        {
          "expr": "sum by (topic) (mirrormaker_replication_lag{source='dc1', target='dc2'})",
          "legendFormat": "{{topic}}"
        }
      ]
    },
    {
      "title": "Replication Latency",
      "type": "timeseries",
      "targets": [
        {
          "expr": "histogram_quantile(0.99, sum by (le) (rate(mirrormaker_replication_latency_bucket[5m])))",
          "legendFormat": "p99 latency"
        }
      ]
    }
  ]
}

Disaster Recovery Testing

Regular DR Drills

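#!/bin/bash
# dr_drill.sh
set -euo pipefail

echo "Starting DR drill..."

# 1. Verify replication status: the mirrored topic must exist in DC2
kafka-topics.sh --bootstrap-server kafka-dc2:9092 \
  --describe --topic dc1.orders

# 2. Simulate primary failure
kubectl scale statefulset kafka-dc1 --replicas=0

# 3. Verify failover: consumer groups in DC2 should be active with bounded lag
sleep 30
kafka-consumer-groups.sh --bootstrap-server kafka-dc2:9092 \
  --describe --all-groups

# 4. Failback
kubectl scale statefulset kafka-dc1 --replicas=3
kubectl rollout status statefulset/kafka-dc1

# 5. Verify replication restored: a fresh heartbeat from DC1 should arrive in DC2
kafka-console-consumer.sh --bootstrap-server kafka-dc2:9092 \
  --topic dc1.heartbeats --max-messages 1 --timeout-ms 60000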

Summary

Multi-datacenter Kafka replication with MirrorMaker 2 provides robust disaster recovery and geo-redundancy. Implement proper offset synchronization, failover strategies, and active-active configurations to ensure high availability across datacenters.

