Kafka Partitioning Strategies
Difficulty: Senior Level | Companies: LinkedIn, Uber, Netflix, Spotify, Confluent
Partitioning determines how messages are distributed across partitions in a Kafka topic. Understanding partitioning strategies is critical for designing systems with proper data locality and parallelism.
Why Partitioning Matters
Partitioning Impact:
├── Data Locality → Related messages go to the same partition
├── Ordering Guarantees → Order is maintained only within a partition
├── Parallelism → Each partition is consumed by at most one consumer in a group
└── Load Distribution → An even spread prevents hotspots
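The parallelism point can be illustrated with a minimal sketch. It uses a simplified round-robin assignment, not one of Kafka's actual assignor implementations, and the function name `assign_partitions` and the consumer names are hypothetical.

```python
def assign_partitions(partitions, consumers):
    """Give every partition exactly one owner within the group (round-robin)."""
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(partitions):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


partitions = [0, 1, 2, 3]

# Two consumers share four partitions evenly.
two = assign_partitions(partitions, ["c1", "c2"])
print(two)

# With more consumers than partitions, the extra consumers own nothing.
six = assign_partitions(partitions, ["c1", "c2", "c3", "c4", "c5", "c6"])
idle = [consumer for consumer, owned in six.items() if not owned]
print(idle)
```

In this model the partition count caps the useful size of a consumer group: the fifth and sixth consumers receive no partitions.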
Partition Assignment Diagram
Topic: user-events (4 partitions, replication factor 3)
Producer (key: user_id=42)
          │
          ▼
Partitioner: hash(42) % 4 = 2
          │
          ▼
┌───────────────────────────────────────────────────────┐
│ Partition 0   Partition 1   Partition 2   Partition 3 │
│ ┌─────────┐   ┌─────────┐   ┌─────────┐   ┌─────────┐ │
│ │ B1 (L)  │   │ B2 (L)  │   │ B3 (L)  │   │ B4 (L)  │ │
│ │ B2 (F)  │   │ B3 (F)  │   │ B4 (F)  │   │ B1 (F)  │ │
│ │ B3 (F)  │   │ B4 (F)  │   │ B1 (F)  │   │ B2 (F)  │ │
│ └─────────┘   └─────────┘   └─────────┘   └─────────┘ │
└───────────────────────────────────────────────────────┘
Default Partitioner Behavior
Kafka's default partitioner hashes the key when one is present and otherwise spreads records across partitions itself:
// Default partitioner logic (simplified)
public int partition(String topic, Object key, byte[] keyBytes,
                     Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes != null) {
        // Key-based partitioning: hash the serialized key bytes
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    } else {
        // Null keys: round-robin before Kafka 2.4, sticky partitioning from 2.4 onward.
        // nextPartition(...) is a placeholder for that selection logic.
        return nextPartition(topic, null, numPartitions);
    }
}
Key Insight: The default partitioner hashes the serialized key bytes with murmur2, which is a different algorithm from Java's String.hashCode(). The same key always maps to the same partition as long as the partition count does not change, but the distribution across partitions may not be perfectly uniform.
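To make the key-to-partition mapping concrete, here is a minimal pure-Python sketch of the 32-bit murmur2 variant and the positive-modulo step shown in the simplified Java logic. The function names `murmur2` and `partition_for` are chosen for this illustration; treat it as a sketch to check against your client version rather than a reference implementation.

```python
def murmur2(data: bytes) -> int:
    """32-bit MurmurHash2 with the seed used by Kafka clients (unsigned result)."""
    mask = 0xFFFFFFFF
    m, r = 0x5BD1E995, 24
    length = len(data)
    h = (0x9747B28C ^ length) & mask
    body_end = length - length % 4
    # Mix the input four bytes at a time, read as little-endian integers.
    for i in range(0, body_end, 4):
        k = int.from_bytes(data[i:i + 4], "little")
        k = (k * m) & mask
        k ^= k >> r
        k = (k * m) & mask
        h = (h * m) & mask
        h ^= k
    # Fold in the trailing one to three bytes, if any.
    tail = data[body_end:]
    if tail:
        h ^= int.from_bytes(tail, "little")
        h = (h * m) & mask
    # Final avalanche.
    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h


def partition_for(key: bytes, num_partitions: int) -> int:
    """Clear the sign bit (like Utils.toPositive) and take the modulo."""
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions


key = b"user-42"
print(partition_for(key, 4))  # stable for a fixed partition count
print(partition_for(key, 6))  # may differ once partitions are added
```

Because the result depends on `num_partitions`, adding partitions to a topic can move existing keys to different partitions, which is why the ordering guarantee only holds while the partition count is fixed.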
Key-Based Partitioning
Most production systems use key-based partitioning for ordering guarantees:
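// Java Producer with explicit key-based partitioning
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);

// Messages with same key go to same partition
ProducerRecord<String, String> record = new ProducerRecord<>(
    "user-events",
    "user-42",                  // key
    "{\"action\": \"login\"}"   // value
);
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // The metadata must not be used when the send failed
        System.err.println("Send failed: " + exception.getMessage());
        return;
    }
    System.out.println("Partition: " + metadata.partition());
    System.out.println("Offset: " + metadata.offset());
});
producer.close(); // Flushes buffered records and releases resources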
Python Producer Example
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['kafka1:9092', 'kafka2:9092'],
    # str() allows numeric user IDs to be used as keys
    key_serializer=lambda k: str(k).encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# user_events is assumed to be an iterable of dicts that each contain 'user_id'
# Key-based partitioning
for event in user_events:
    producer.send(
        topic='user-events',
        key=event['user_id'],  # Ensures same user goes to same partition
        value=event
    )
producer.flush()
Custom Partitioner Implementation
When you need control over partition assignment:
// Custom partitioner for geographic routing
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class GeoPartitioner implements Partitioner {
    private Map<String, Integer> geoPartitions;

    @Override
    public void configure(Map<String, ?> configs) {
        geoPartitions = new HashMap<>();
        geoPartitions.put("US", 0);
        geoPartitions.put("EU", 1);
        geoPartitions.put("ASIA", 2);
        // Default partition for unknown regions
        geoPartitions.put("DEFAULT", 3);
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // extractRegion is an application-specific helper (not shown);
        // a null value falls back to the default region
        String region = (value == null) ? "DEFAULT" : extractRegion(value.toString());
        int partition = geoPartitions.getOrDefault(region,
                geoPartitions.get("DEFAULT"));
        // Ensure partition is within range
        int numPartitions = cluster.partitionsForTopic(topic).size();
        return partition % numPartitions;
    }

    @Override
    public void close() {}

    // onNewBatch(String topic, Cluster cluster, int prevPartition) is an optional
    // default method, deprecated since Kafka 3.3; it does not need an override here.
}
Configuration
props.put("partitioner.class", "com.example.GeoPartitioner");
# producer.config
bootstrap.servers=kafka1:9092,kafka2:9092
partitioner.class=com.example.GeoPartitioner
Sticky Partitioning (Kafka 2.4+)
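Sticky partitioning reduces latency for null-key records by filling a batch for one partition before switching to another, which produces fewer, larger batches:
// Sticky partitioning is the default for null-key records in Kafka 2.4+, so no
// configuration is required. Setting the class explicitly is equivalent on
// 2.4 through 3.2 (DefaultPartitioner is deprecated since Kafka 3.3):
props.put("partitioner.class",
    "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
// Behavior:
// 1. The first null-key record goes to a partition picked by the producer
//    (effectively at random, not always partition 0)
// 2. Subsequent null-key records go to that partition until the current batch
//    is completed (full, or sent when linger.ms expires)
// 3. The producer then switches to another partition
// 4. Fewer, larger batches reduce the number of requests and improve throughput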
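The batching effect can be sketched with a small simulation. The model is a deliberate simplification and an assumption of this example: the linger timer is represented as "flush every `window` records", batch size is counted in records rather than bytes, and the names `count_batches`, `RoundRobin`, and `Sticky` are hypothetical.

```python
import random


def count_batches(choose_partition, num_records, batch_size, window):
    """Count the batches a producer would send in this simplified model."""
    open_batches = {}  # partition -> records waiting in its open batch
    sent = 0
    for i in range(num_records):
        partition = choose_partition(open_batches)
        open_batches[partition] = open_batches.get(partition, 0) + 1
        if open_batches[partition] == batch_size:  # batch is full
            sent += 1
            del open_batches[partition]
        if (i + 1) % window == 0:  # linger timer fires: send every open batch
            sent += len(open_batches)
            open_batches.clear()
    return sent + len(open_batches)


class RoundRobin:
    """Rotate through the partitions, one record at a time."""

    def __init__(self, num_partitions):
        self.num_partitions = num_partitions
        self.next = 0

    def __call__(self, open_batches):
        partition = self.next
        self.next = (self.next + 1) % self.num_partitions
        return partition


class Sticky:
    """Stay on one partition until its batch has been sent."""

    def __init__(self, num_partitions, seed=0):
        self.num_partitions = num_partitions
        self.rng = random.Random(seed)
        self.current = None

    def __call__(self, open_batches):
        # Switch only once the batch for the current partition is gone.
        if self.current not in open_batches:
            self.current = self.rng.randrange(self.num_partitions)
        return self.current


round_robin_batches = count_batches(RoundRobin(4), 1000, batch_size=16, window=8)
sticky_batches = count_batches(Sticky(4), 1000, batch_size=16, window=8)
print(round_robin_batches, sticky_batches)  # 500 125
```

With four partitions and a flush every eight records, round-robin sends four two-record batches per window while the sticky strategy sends one eight-record batch, so the same 1000 records need a quarter of the batches.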
When to Use Sticky
Use sticky partitioning when:
├── Messages have null keys (keyed messages are always routed by key hash)
├── Throughput matters more than which partition a given record lands on
├── You want to reduce batching latency
└── Even distribution over time, rather than per record, is sufficient
Round-Robin Partitioning
For uniform distribution when ordering doesn't matter:
// Custom round-robin partitioner
// (Kafka 2.4+ also ships org.apache.kafka.clients.producer.RoundRobinPartitioner)
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class RoundRobinPartitioner implements Partitioner {
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // floorMod keeps the result non-negative after the counter overflows
        return Math.floorMod(counter.getAndIncrement(), numPartitions);
    }

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public void close() {}
}
Custom Partitioner in Python
from kafka import KafkaProducer

REGION_PARTITIONS = {'US': 0, 'EU': 1, 'ASIA': 2}


def extract_region(key_bytes):
    """Extract region from the serialized key"""
    # Your custom logic here
    return 'US'


def geo_partitioner(key_bytes, all_partitions, available_partitions):
    """Custom partition logic for geographic routing.

    kafka-python calls the partitioner with the serialized key and the
    lists of all and currently available partition IDs.
    """
    num_partitions = len(all_partitions)
    if key_bytes is None:
        return all_partitions[0]
    region = extract_region(key_bytes)
    # Unknown regions fall back to the last partition
    index = REGION_PARTITIONS.get(region, num_partitions - 1)
    return all_partitions[index % num_partitions]


# Usage: pass the callable itself, not a class
producer = KafkaProducer(
    bootstrap_servers=['kafka1:9092'],
    partitioner=geo_partitioner
)
Partitioning Best Practices
1. Key Design
// Good: Deterministic, well-distributed keys
String userKey = "user:" + userId;       // Clear prefix, one key per user
String orderKey = "order:" + orderId;    // Unique identifier
// Bad: Keys that cause skew or break ordering
String constantKey = "ALL";              // All messages go to one partition
String timeKey = timestamp.toString();   // Scatters related events; coarse timestamps create time-based hotspots
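The effect of key choice on distribution can be checked with a short sketch. It uses `zlib.crc32` as a stand-in for Kafka's murmur2 hash (an assumption made to keep the example self-contained; any stable hash shows the same effect), and the helper names are hypothetical.

```python
import zlib
from collections import Counter


def partition_for(key, num_partitions):
    # zlib.crc32 is a stand-in for murmur2, used here only for illustration.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def distribution(keys, num_partitions=4):
    """Count how many records land on each partition for the given keys."""
    return Counter(partition_for(key, num_partitions) for key in keys)


# One constant key: every record lands on a single partition.
constant_key = distribution(["ALL"] * 1000)

# One key per user: records spread across all partitions.
per_user_key = distribution([f"user:{i}" for i in range(1000)])

print(constant_key)
print(per_user_key)
```

A constant key leaves three of the four partitions empty, while per-user keys use all of them; the spread is roughly even rather than exact, which matches the caveat about uniformity.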
2. Monitoring Partition Distribution
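# Check partition layout (kafka-consumer-groups.sh --describe reports per-partition offsets and lag)
kafka-topics.sh --describe --topic user-events --bootstrap-server kafka1:9092
# Output shows the leader, replicas, and in-sync replicas (ISR) for each partition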
Topic: user-events TopicId: abc123 PartitionCount: 4 ReplicationFactor: 3
Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Partition: 3 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
3. Avoiding Hot Partitions
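# Monitor partition metrics
from kafka import KafkaConsumer
from prometheus_client import Gauge, start_http_server

# Expose the metrics for Prometheus to scrape (port 8000 is an arbitrary choice)
start_http_server(8000)

consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['kafka1:9092'],
    group_id='monitoring-group'
)
partition_offsets = Gauge('kafka_partition_offset',
                          'Current offset per partition',
                          ['partition'])
# Record the latest consumed offset per partition; a partition whose offset
# grows much faster than the others is a likely hot partition
for message in consumer:
    partition_offsets.labels(
        partition=message.partition
    ).set(message.offset)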
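Once per-partition offsets are available, skew can be quantified with a simple ratio. The sketch below assumes the offsets approximate message counts (offsets start at 0 and the comparison covers the same time span); the sample numbers, the 1.5 threshold, and the function names are illustrative assumptions, not measured data or recommended values.

```python
from statistics import mean


def skew_ratio(end_offsets):
    """Return max / mean of per-partition counts; 1.0 means perfectly even."""
    counts = list(end_offsets.values())
    average = mean(counts)
    return max(counts) / average if average else 1.0


def hot_partitions(end_offsets, threshold=1.5):
    """List partitions whose count exceeds threshold times the mean."""
    average = mean(list(end_offsets.values()))
    return sorted(p for p, count in end_offsets.items()
                  if average and count > threshold * average)


# Hypothetical end offsets keyed by partition number.
sample_offsets = {0: 1000, 1: 1100, 2: 4000, 3: 900}

ratio = skew_ratio(sample_offsets)
hot = hot_partitions(sample_offsets)
print(round(ratio, 2), hot)
```

For the sample values the mean is 1750, so partition 2 carries more than twice the average load and is the only one flagged.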
Follow-Up Questions
- How does the default partitioner handle messages with null keys versus non-null keys?
- What are the trade-offs between key-based partitioning and round-robin partitioning?
- When would you implement a custom partitioner instead of using the default?
- How can you detect and handle partition skew in a production Kafka cluster?
- Explain how sticky partitioning improves performance compared to round-robin.