Multi-Datacenter Kafka
Difficulty: Senior Level | Companies: LinkedIn, Uber, Netflix, Spotify, Confluent
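Multi-datacenter Kafka replicates data between clusters in different regions to provide disaster recovery, geo-redundancy, and low-latency local access. It matters most for global applications that must keep serving traffic when an entire region fails.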
Architecture Patterns
Architecture Diagram
Pattern 1: Active-Passive
┌─────────────────────────────────────────────────┐
│ Active DC (US-East)                             │
│ ├── Kafka Cluster (Primary)                     │
│ └── Producers & Consumers                       │
└─────────────────────────────────────────────────┘
                         │ MirrorMaker 2
                         ▼
┌─────────────────────────────────────────────────┐
│ Passive DC (US-West)                            │
│ ├── Kafka Cluster (Backup)                      │
│ └── No Producers (Standby)                      │
└─────────────────────────────────────────────────┘
Pattern 2: Active-Active
┌─────────────────────────────────────────────────┐
│   DC 1 (US-East)              DC 2 (EU-West)    │
│  ┌─────────────┐             ┌─────────────┐    │
│  │    Kafka    │◄───────────►│    Kafka    │    │
│  │   Cluster   │ MirrorMaker │   Cluster   │    │
│  └─────────────┘             └─────────────┘    │
│         ▲                           ▲           │
│         │                           │           │
│     Producers                   Producers       │
│     Consumers                   Consumers       │
└─────────────────────────────────────────────────┘
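In an active-active setup both clusters mirror the same topic names to each other, so mirrored records have to be distinguishable from local ones. MirrorMaker 2's default replication policy does this by prefixing a mirrored topic with the source cluster alias (for example, `us-east.orders` on the EU cluster). The sketch below models that naming rule in plain Python. The function names are illustrative and not part of any Kafka API, and the cycle check is simplified to the immediate source only.

```python
from typing import List, Optional, Set

SEPARATOR = "."  # default separator between source alias and topic name


def remote_topic(source_alias: str, topic: str) -> str:
    """Name a topic gets on the target cluster after mirroring."""
    return f"{source_alias}{SEPARATOR}{topic}"


def topic_source(topic: str, known_aliases: Set[str]) -> Optional[str]:
    """Return the alias a mirrored topic came from, or None for a local topic."""
    prefix, sep, _ = topic.partition(SEPARATOR)
    return prefix if sep and prefix in known_aliases else None


def should_replicate(topic: str, target_alias: str, known_aliases: Set[str]) -> bool:
    """Skip topics that originated in the target cluster, to avoid a loop."""
    return topic_source(topic, known_aliases) != target_alias


def subscription(topic: str, local_alias: str, known_aliases: Set[str]) -> List[str]:
    """Topics a consumer reads to see events produced in every DC."""
    remotes = sorted(a for a in known_aliases if a != local_alias)
    return [topic] + [remote_topic(a, topic) for a in remotes]


aliases = {"us-east", "eu-west"}
print(subscription("orders", "eu-west", aliases))  # ['orders', 'us-east.orders']
```

A consumer in EU-West that needs a global view subscribes to both the local `orders` topic and the mirrored `us-east.orders` topic; a consumer that only cares about local traffic subscribes to `orders` alone.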
MirrorMaker 2 Configuration
MM2 Properties
# mm2.properties
clusters = us-east, us-west
# Connection settings
us-east.bootstrap.servers = kafka-us-east1:9092,kafka-us-east2:9092
us-west.bootstrap.servers = kafka-us-west1:9092,kafka-us-west2:9092
# Replication flows
us-east->us-west.enabled = true
us-east->us-west.topics = orders, payments, users
us-west->us-east.enabled = true
us-west->us-east.topics = orders, payments, users
# Replication settings
replication.factor = 3
sync.topic.configs.enabled = true
emit.heartbeats.enabled = true
emit.heartbeats.interval.seconds = 1
emit.checkpoints.enabled = true
sync.group.offsets.enabled = true
offset.lag.max = 1000
# Checkpoint settings
# MM2 writes checkpoints to <source-alias>.checkpoints.internal on the target cluster;
# they are not stored in __consumer_offsets.
checkpoints.topic.replication.factor = 3
MM2 Configuration File
# mm2.properties
clusters = primary, secondary
primary.bootstrap.servers = kafka-primary1:9092,kafka-primary2:9092,kafka-primary3:9092
secondary.bootstrap.servers = kafka-secondary1:9092,kafka-secondary2:9092,kafka-secondary3:9092
primary->secondary.enabled = true
primary->secondary.topics = .*
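# Active-Passive mode: no reverse flow.
# (Comments must sit on their own line; a trailing "# ..." would become part of the value.)
secondary->primary.enabled = false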
replication.factor = 3
sync.topic.configs.enabled = true
emit.heartbeats.enabled = true
emit.heartbeats.interval.seconds = 1
emit.checkpoints.enabled = true
sync.group.offsets.enabled = true
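A quick sanity check for a file like the ones above is to list which replication flows it actually enables. The following is a minimal sketch in plain Python, not an MM2 tool: `parse_properties`, `enabled_flows`, and `mode` are hypothetical helpers, the parser handles only simple `key = value` lines, and it assumes cluster aliases contain no dots.

```python
def parse_properties(text):
    """Parse simple 'key = value' lines, skipping blanks and comment lines."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def enabled_flows(props):
    """Return sorted (source, target) pairs whose '<src>-><dst>.enabled' is true."""
    flows = []
    for key, value in props.items():
        if "->" not in key:
            continue
        src, rest = key.split("->", 1)
        dst, _, setting = rest.partition(".")
        if setting == "enabled" and value.lower() == "true":
            flows.append((src, dst))
    return sorted(flows)


def mode(flows):
    """Flows in both directions between a pair suggest active-active."""
    if not flows:
        return "no replication"
    pairs = set(flows)
    bidirectional = any((dst, src) in pairs for src, dst in pairs)
    return "active-active" if bidirectional else "active-passive"


SAMPLE = """
clusters = primary, secondary
primary->secondary.enabled = true
primary->secondary.topics = .*
secondary->primary.enabled = false
"""

flows = enabled_flows(parse_properties(SAMPLE))
print(flows, mode(flows))  # [('primary', 'secondary')] active-passive
```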
Java Admin Client for Multi-DC
import org.apache.kafka.clients.admin.*;

import java.util.Collections;
import java.util.Properties;

public class MultiDCAdmin implements AutoCloseable {
    private final AdminClient primaryAdmin;
    private final AdminClient secondaryAdmin;

    public MultiDCAdmin() {
        Properties primaryProps = new Properties();
        primaryProps.put("bootstrap.servers", "kafka-primary:9092");
        primaryAdmin = AdminClient.create(primaryProps);

        Properties secondaryProps = new Properties();
        secondaryProps.put("bootstrap.servers", "kafka-secondary:9092");
        secondaryAdmin = AdminClient.create(secondaryProps);
    }

    public void createTopicInBothDCs(String topic, int partitions) throws Exception {
        NewTopic newTopic = new NewTopic(topic, partitions, (short) 3);
        // createTopics() is asynchronous; block on all().get() so failures surface here
        // Create in primary
        primaryAdmin.createTopics(Collections.singleton(newTopic)).all().get();
        // Create in secondary
        secondaryAdmin.createTopics(Collections.singleton(newTopic)).all().get();
        System.out.println("Topic created in both DCs");
    }

    public void verifyReplication(String topic) throws Exception {
        // Note: with MM2's default replication policy, the mirrored copy on the
        // secondary is named "<source-alias>.<topic>", not "<topic>".
        // Get topic details from primary
        DescribeTopicsResult primaryResult = primaryAdmin.describeTopics(
            Collections.singleton(topic)
        );
        // Get topic details from secondary
        DescribeTopicsResult secondaryResult = secondaryAdmin.describeTopics(
            Collections.singleton(topic)
        );
        // Compare (allTopicNames() needs kafka-clients 3.1+; older clients use all())
        TopicDescription primaryDesc = primaryResult.allTopicNames().get().get(topic);
        TopicDescription secondaryDesc = secondaryResult.allTopicNames().get().get(topic);
        System.out.println("Primary partitions: " + primaryDesc.partitions().size());
        System.out.println("Secondary partitions: " + secondaryDesc.partitions().size());
    }

    @Override
    public void close() {
        primaryAdmin.close();
        secondaryAdmin.close();
    }
}
Python Multi-DC Operations
import time

from kafka import KafkaAdminClient, KafkaConsumer, TopicPartition
from kafka.admin import NewTopic


class MultiDCManager:
    def __init__(self, primary_servers, secondary_servers):
        self.primary_servers = primary_servers
        self.secondary_servers = secondary_servers
        self.primary_admin = KafkaAdminClient(
            bootstrap_servers=primary_servers,
            client_id='primary-admin'
        )
        self.secondary_admin = KafkaAdminClient(
            bootstrap_servers=secondary_servers,
            client_id='secondary-admin'
        )

    def create_topic(self, topic, num_partitions, replication_factor=3):
        """Create topic in both DCs"""
        topic_config = NewTopic(
            name=topic,
            num_partitions=num_partitions,
            replication_factor=replication_factor
        )
        # Create in primary
        self.primary_admin.create_topics([topic_config])
        print(f"Created topic {topic} in primary DC")
        # Create in secondary
        self.secondary_admin.create_topics([topic_config])
        print(f"Created topic {topic} in secondary DC")

    def verify_replication(self, topic):
        """Verify topic exists in both DCs"""
        primary_topics = self.primary_admin.list_topics()
        secondary_topics = self.secondary_admin.list_topics()
        if topic in primary_topics and topic in secondary_topics:
            print(f"Topic {topic} exists in both DCs")
            return True
        print(f"Topic {topic} missing in one or both DCs")
        return False

    def monitor_lag(self, topic, interval_seconds=10):
        """Monitor approximate replication lag between DCs.

        Assumes the topic has the same name on both clusters. Offsets are not
        guaranteed to match across clusters, so treat the result as an estimate.
        """
        while True:
            # Get offsets from primary
            primary_offsets = self.get_offsets(self.primary_servers, topic)
            # Get offsets from secondary
            secondary_offsets = self.get_offsets(self.secondary_servers, topic)
            # Calculate lag
            for partition, primary_offset in sorted(primary_offsets.items()):
                secondary_offset = secondary_offsets.get(partition, 0)
                lag = primary_offset - secondary_offset
                print(f"Partition {partition}: Primary={primary_offset}, "
                      f"Secondary={secondary_offset}, Lag={lag}")
            time.sleep(interval_seconds)

    def get_offsets(self, servers, topic):
        """Return {partition: log-end offset} for the topic on one cluster"""
        consumer = KafkaConsumer(bootstrap_servers=servers)
        try:
            partitions = consumer.partitions_for_topic(topic)
            if not partitions:
                return {}
            tps = [TopicPartition(topic, p) for p in partitions]
            end_offsets = consumer.end_offsets(tps)
            return {tp.partition: offset for tp, offset in end_offsets.items()}
        finally:
            consumer.close()
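The per-partition lag arithmetic in `monitor_lag` can be tried without a broker. Below is a minimal sketch that uses made-up offset dictionaries in place of live cluster queries; `compute_lag` and `lagging_partitions` are illustrative helpers, not kafka-python functions, and negative differences are clamped to zero as an assumption about how to report them.

```python
def compute_lag(primary_offsets, secondary_offsets):
    """Return {partition: lag}; a partition missing on the secondary counts as offset 0."""
    return {
        partition: max(primary_offset - secondary_offsets.get(partition, 0), 0)
        for partition, primary_offset in primary_offsets.items()
    }


def lagging_partitions(lag_by_partition, threshold):
    """Partitions whose lag exceeds the alert threshold, in ascending order."""
    return sorted(p for p, lag in lag_by_partition.items() if lag > threshold)


# Hypothetical log-end offsets for a three-partition topic
primary = {0: 1500, 1: 980, 2: 2010}
secondary = {0: 1500, 1: 900}  # partition 2 not yet mirrored

lag = compute_lag(primary, secondary)
print(lag)                                     # {0: 0, 1: 80, 2: 2010}
print(lagging_partitions(lag, threshold=100))  # [2]
```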
Disaster Recovery Procedures
Failover Process
#!/bin/bash
# failover.sh - Manual (planned) failover to secondary DC
# get_lag, verify_secondary_data, update_dns, and start_consumers are
# site-specific helpers; define or source them before running this script.
set -euo pipefail

PRIMARY_DC="us-east"
SECONDARY_DC="us-west"
TOPIC="orders"

echo "Starting failover from $PRIMARY_DC to $SECONDARY_DC"

# 1. Stop producers in primary DC
echo "Stopping producers..."
# Send shutdown signal to producer applications

# 2. Wait for replication to catch up
# (not possible in an unplanned outage: lag cannot drain while the primary is down)
echo "Waiting for replication lag to reach 0..."
while [ "$(get_lag "$TOPIC")" -gt 0 ]; do
    sleep 5
done

# 3. Verify secondary has all data
echo "Verifying secondary DC data..."
verify_secondary_data "$TOPIC"

# 4. Update DNS/load balancers to point to secondary
echo "Updating DNS records..."
update_dns "$SECONDARY_DC"

# 5. Start consumers in secondary DC
echo "Starting consumers in secondary DC..."
start_consumers "$SECONDARY_DC"

echo "Failover complete. Secondary DC is now active."
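The wait loop in step 2 never gives up, which is a problem if lag can no longer reach zero. One option is a bounded wait that reports a timeout so an operator can decide whether to proceed with some data loss. The sketch below is written in Python rather than shell so it can be exercised in isolation; `wait_for_zero_lag` and the injected `get_lag` callable are hypothetical stand-ins for whatever lag check the deployment provides.

```python
import time


def wait_for_zero_lag(get_lag, timeout_seconds=300.0, poll_seconds=5.0,
                      sleep=time.sleep, clock=time.monotonic):
    """Poll get_lag() until it returns 0 or the timeout expires.

    Returns True if lag reached 0, False if the deadline passed first.
    sleep and clock are injectable so the loop can be tested without waiting.
    """
    deadline = clock() + timeout_seconds
    while True:
        if get_lag() <= 0:
            return True
        if clock() >= deadline:
            return False
        sleep(poll_seconds)


# Simulated lag that drains over successive polls
readings = iter([1200, 300, 0])
print(wait_for_zero_lag(lambda: next(readings), sleep=lambda s: None))  # True
```

Returning a boolean instead of raising keeps the decision with the caller: a planned failover can abort on `False`, while an emergency failover can log the remaining lag and continue.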
Failback Process
#!/bin/bash
# failback.sh - Return to primary DC
# enable_reverse_replication, get_reverse_lag, and update_dns are
# site-specific helpers; define or source them before running this script.
set -euo pipefail

echo "Starting failback to primary DC"

# 1. Stop producers in secondary DC
echo "Stopping producers in secondary DC..."
# Send shutdown signal to producer applications

# 2. Enable replication from secondary to primary
echo "Enabling reverse replication..."
enable_reverse_replication

# 3. Wait for primary to catch up
echo "Waiting for primary to catch up..."
while [ "$(get_reverse_lag)" -gt 0 ]; do
    sleep 5
done

# 4. Update DNS to point to primary
echo "Updating DNS records..."
update_dns "us-east"

# 5. Start producers in primary DC
echo "Starting producers in primary DC..."
# Start producer applications against the primary cluster

echo "Failback complete. Primary DC is now active."
Geo-Partitioning Strategy
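// Geo-aware partitioning
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

import java.util.HashMap;
import java.util.Map;

public class GeoPartitioner implements Partitioner {
    private static final int PARTITIONS_PER_REGION = 2;
    private Map<String, Integer> regionIndex;

    @Override
    public void configure(Map<String, ?> configs) {
        regionIndex = new HashMap<>();
        regionIndex.put("us-east", 0);
        regionIndex.put("us-west", 1);
        regionIndex.put("eu-west", 2);
        regionIndex.put("ap-south", 3);
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Extract region from value
        String region = extractRegion(value.toString());
        // Each region owns its own block of partitions:
        // us-east 0-1, us-west 2-3, eu-west 4-5, ap-south 6-7 (topic needs >= 8 partitions)
        int basePartition = regionIndex.getOrDefault(region, 0) * PARTITIONS_PER_REGION;
        // Spread keys within the region; records without a key use the block's first partition
        int hash = keyBytes == null ? 0 : Utils.toPositive(Utils.murmur2(keyBytes));
        return basePartition + (hash % PARTITIONS_PER_REGION);
    }

    // Placeholder (assumption): expects values shaped like "<region>:<payload>"
    private String extractRegion(String value) {
        int sep = value.indexOf(':');
        return sep < 0 ? "" : value.substring(0, sep);
    }

    @Override
    public void close() {
    }
}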
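For region routing to isolate traffic, each region needs its own non-overlapping block of partitions. One way to get that is to multiply the region's index by the number of partitions per region before adding the key hash. The sketch below checks that arithmetic in Python; `zlib.crc32` stands in for Kafka's murmur2 hash purely for illustration, and the helper names are hypothetical.

```python
import zlib

PARTITIONS_PER_REGION = 2
REGION_INDEX = {"us-east": 0, "us-west": 1, "eu-west": 2, "ap-south": 3}


def region_block(region: str) -> range:
    """Partitions owned by a region, e.g. us-west -> range(2, 4)."""
    base = REGION_INDEX[region] * PARTITIONS_PER_REGION
    return range(base, base + PARTITIONS_PER_REGION)


def geo_partition(region: str, key: bytes) -> int:
    """Map (region, key) to a partition inside the region's own block.

    Unknown regions fall back to index 0, mirroring getOrDefault(region, 0).
    """
    base = REGION_INDEX.get(region, 0) * PARTITIONS_PER_REGION
    # crc32 is a stand-in hash; it is always non-negative in Python 3
    return base + zlib.crc32(key) % PARTITIONS_PER_REGION


for region in REGION_INDEX:
    print(region, list(region_block(region)))
```

With four regions and two partitions each, the topic needs at least eight partitions; adding a region or widening a block changes the mapping for existing keys.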
Follow-Up Questions
- What is the difference between MirrorMaker 1 and MirrorMaker 2?
- How does Kafka handle exactly-once semantics in multi-DC replication?
- Explain the trade-offs between active-active and active-passive multi-DC setups.
- How would you handle schema evolution across multiple datacenters?
- What are the network requirements for multi-DC Kafka deployments?