Kafka Performance: Throughput vs Latency, Batch Sizes
Difficulty: Senior | Asked at: Netflix, Uber, LinkedIn, Stripe
ℹ️Interview Context
Performance tuning is a critical skill for senior engineers. Interviewers expect you to understand the trade-offs between throughput and latency and to recommend configurations based on specific requirements.
The Question
How do you optimize Kafka for maximum throughput vs minimum latency? What are the key configuration parameters and their trade-offs?
Throughput vs Latency Trade-off
The Fundamental Trade-off
# High throughput configuration
high_throughput_config = {
    'batch.size': 131072,  # 128KB batches
    'linger.ms': 50,  # Wait 50ms to fill batch
    'compression.type': 'lz4',  # Compress for efficiency
    'buffer.memory': 268435456,  # 256MB buffer
    'acks': 'all',  # Wait for all ISR
    'max.in.flight.requests.per.connection': 5
}
# Low latency configuration
low_latency_config = {
    'batch.size': 16384,  # 16KB batches (small)
    'linger.ms': 0,  # Send immediately
    # Kafka's documented value for "no compression" is the string 'none';
    # a Python None is not a valid value for dotted-key client configs.
    'compression.type': 'none',  # No compression
    'buffer.memory': 67108864,  # 64MB buffer
    'acks': '1',  # Leader only
    'max.in.flight.requests.per.connection': 1
}
# Balanced configuration
balanced_config = {
    'batch.size': 32768,  # 32KB batches
    'linger.ms': 10,  # Wait 10ms
    'compression.type': 'snappy',  # Balanced compression
    'buffer.memory': 134217728,  # 128MB buffer
    'acks': 'all',
    'max.in.flight.requests.per.connection': 3
}
Throughput Formula
def calculate_throughput(
    batch_size_bytes: int,
    num_partitions: int,
    linger_ms: int,
    compression_ratio: float = 1.0
) -> float:
    """
    Estimate an idealized aggregate producer throughput in MB/s.

    Model: each partition ships one full batch every ``linger_ms``
    milliseconds (floored at 1 ms, so ``linger_ms=0`` means 1000 batches
    per second), and every batch is scaled by ``compression_ratio``.

    Args:
        batch_size_bytes: Size of a single batch in bytes.
        num_partitions: Number of partitions written in parallel.
        linger_ms: Producer linger time in milliseconds.
        compression_ratio: Size multiplier applied to each batch (1.0 = none).

    Returns:
        Aggregate throughput in MiB per second (1 MiB = 1024 * 1024 bytes).
    """
    batches_each_second = 1000 / max(linger_ms, 1)
    bytes_per_partition = batch_size_bytes * batches_each_second * compression_ratio
    aggregate_bytes = bytes_per_partition * num_partitions
    mebibyte = 1024 * 1024
    return aggregate_bytes / mebibyte


# Example comparisons
for label, args in (
    ("High throughput:", (131072, 12, 50, 0.3)),
    ("Balanced:", (32768, 12, 10, 0.5)),
    ("Low latency:", (16384, 12, 0, 1.0)),
):
    print(label, calculate_throughput(*args), "MB/s")
ℹ️Key Insight
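Larger batches trade latency for throughput: the producer waits longer (bounded by linger.ms) to fill a bigger batch, but amortizes per-request overhead across more records. The relationship is not strictly linear. Once the network, broker, or CPU becomes the bottleneck, bigger batches add latency without adding throughput. The sweet spot therefore depends on your specific requirements.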
Producer Tuning
Batch Size Optimization
class BatchSizeOptimizer:
    """
    Find optimal batch size for given requirements.

    A back-of-the-envelope model: it bounds the batch size from above using a
    latency target and from below using a throughput target, then picks the
    midpoint when both can be met. Confirm any result with a benchmark.
    """

    def __init__(self, target_latency_ms, target_throughput_mbps):
        # Latency budget in milliseconds; must be > 0 when optimizing.
        self.target_latency = target_latency_ms
        # Required aggregate throughput in MiB/s (scaled by 1024 * 1024 below).
        self.target_throughput = target_throughput_mbps

    def find_optimal_batch_size(
        self,
        num_partitions: int,
        avg_message_size: int,
        compression_ratio: float
    ) -> dict:
        """
        Find batch size that meets latency and throughput targets.

        Args:
            num_partitions: Partitions the producer writes to in parallel.
            avg_message_size: Average message size in bytes.
            compression_ratio: Size multiplier applied to batches.

        Returns:
            dict with:
              - 'optimal_batch_size', 'max_for_latency', 'min_for_throughput':
                sizes in bytes, truncated to int.
              - 'feasible': False when the throughput target needs batches
                larger than the latency target allows.
              - 'recommended_linger_ms': never more than the latency target.

        Raises:
            ValueError: If the latency target, partition count or message
                size is not positive.
        """
        if self.target_latency <= 0:
            raise ValueError("target_latency_ms must be positive")
        if num_partitions <= 0:
            raise ValueError("num_partitions must be positive")
        if avg_message_size <= 0:
            raise ValueError("avg_message_size must be positive")

        # Upper bound on batch size from the latency budget.
        # NOTE(review): target_latency is already in milliseconds, so the
        # factor 1000 is not a unit conversion; it acts as an assumed fill
        # rate of 1000 messages per millisecond of budget. Confirm intent.
        max_batch_size_for_latency = (
            self.target_latency *
            1000 *
            avg_message_size *
            compression_ratio
        )
        # Lower bound on batch size from the throughput target, assuming each
        # partition sends one batch every target_latency milliseconds.
        min_batch_size_for_throughput = (
            self.target_throughput * 1024 * 1024 /
            (num_partitions * (1000 / self.target_latency))
        )

        if min_batch_size_for_throughput <= max_batch_size_for_latency:
            # Both targets are reachable, so split the difference.
            optimal_batch_size = int(
                (min_batch_size_for_throughput + max_batch_size_for_latency) / 2
            )
            feasible = True
        else:
            # The targets conflict, so honour the latency bound.
            optimal_batch_size = int(max_batch_size_for_latency)
            feasible = False

        # Heuristic linger estimate, capped at the latency target. linger.ms
        # adds directly to send latency, so recommending more would defeat
        # the target.
        # NOTE(review): the divisor avg_message_size * 100 implies ~100
        # messages per ms, unlike the 1000 assumed above. Confirm.
        linger_estimate = int(optimal_batch_size / (avg_message_size * 100))
        recommended_linger_ms = max(0, min(linger_estimate, int(self.target_latency)))

        return {
            'optimal_batch_size': optimal_batch_size,
            'max_for_latency': int(max_batch_size_for_latency),
            'min_for_throughput': int(min_batch_size_for_throughput),
            'feasible': feasible,
            'recommended_linger_ms': recommended_linger_ms
        }


# Example
optimizer = BatchSizeOptimizer(
    target_latency_ms=10,
    target_throughput_mbps=100
)
result = optimizer.find_optimal_batch_size(
    num_partitions=12,
    avg_message_size=500,
    compression_ratio=0.5
)
print(f"Optimal batch size: {result['optimal_batch_size']} bytes")
print(f"Feasible: {result['feasible']}")
Linger MS Optimization
def optimize_linger_ms(
    batch_size_bytes: int,
    message_rate_per_sec: int,
    avg_message_size: int,
    target_latency_ms: int
) -> int:
    """
    Optimize linger_ms for batch size and latency target.

    linger.ms is set to the time needed to fill one batch at the given
    message rate, but never above the latency target and never below 0.

    Args:
        batch_size_bytes: Producer batch size in bytes.
        message_rate_per_sec: Incoming messages per second. 0 means the
            batch never fills, so the latency target alone applies.
        avg_message_size: Average message size in bytes; must be > 0.
        target_latency_ms: Latency budget in milliseconds.

    Returns:
        Recommended linger.ms as a non-negative int.

    Raises:
        ValueError: If avg_message_size is not positive.
    """
    if avg_message_size <= 0:
        raise ValueError("avg_message_size must be positive")
    # With no traffic the time-to-fill is unbounded, so min(inf, target)
    # reduces to the target.
    if message_rate_per_sec == 0:
        return max(0, int(target_latency_ms))
    # Messages per batch
    messages_per_batch = batch_size_bytes / avg_message_size
    # Time to fill batch
    time_to_fill_ms = (messages_per_batch / message_rate_per_sec) * 1000
    # Use smaller of time-to-fill and target latency
    optimal_linger = min(time_to_fill_ms, target_latency_ms)
    return max(0, int(optimal_linger))


# Example
linger = optimize_linger_ms(
    batch_size_bytes=32768,
    message_rate_per_sec=10000,
    avg_message_size=500,
    target_latency_ms=10
)
print(f"Optimal linger_ms: {linger}")
Compression Optimization
def select_compression(
    target_throughput_mbps: float,
    cpu_budget_percent: float,
    avg_message_size: int
) -> dict:
    """
    Select compression algorithm based on requirements.

    ``target_throughput_mbps`` is treated as the bandwidth available for
    compressed bytes. A codec whose ratio (compressed / original size) is r
    therefore delivers ``target_throughput_mbps / r`` MB/s of original
    payload. Among codecs whose CPU cost fits the budget, the one with the
    highest effective payload throughput is selected.

    Args:
        target_throughput_mbps: Bandwidth for compressed data, in MB/s.
        cpu_budget_percent: Maximum acceptable CPU overhead, in percent.
        avg_message_size: Not used by the current model; kept for API
            compatibility.

    Returns:
        dict with 'selected' (codec name), 'details' (its metrics) and
        'all_options' (metrics for every codec within the CPU budget).

    Raises:
        ValueError: If cpu_budget_percent is negative, because then not even
            'none' (0% CPU) fits.
    """
    algorithms = {
        'none': {
            'ratio': 1.0,
            'cpu_cost': 0,
            'latency_addition_ms': 0
        },
        'snappy': {
            'ratio': 0.6,
            'cpu_cost': 0.05,
            'latency_addition_ms': 0.1
        },
        'lz4': {
            'ratio': 0.5,
            'cpu_cost': 0.03,
            'latency_addition_ms': 0.05
        },
        'zstd': {
            'ratio': 0.3,
            'cpu_cost': 0.15,
            'latency_addition_ms': 0.5
        },
        'gzip': {
            'ratio': 0.25,
            'cpu_cost': 0.3,
            'latency_addition_ms': 1.0
        }
    }
    if cpu_budget_percent < 0:
        raise ValueError(
            "cpu_budget_percent must be >= 0; even 'none' requires 0%"
        )

    recommendations = {}
    for name, algo in algorithms.items():
        cpu_impact = algo['cpu_cost'] * 100
        if cpu_impact > cpu_budget_percent:
            continue
        recommendations[name] = {
            # Payload MB/s deliverable when the link carries
            # target_throughput_mbps of compressed bytes. A smaller ratio
            # means more payload per byte on the wire.
            'effective_throughput': target_throughput_mbps / algo['ratio'],
            'cpu_impact_percent': cpu_impact,
            'latency_impact_ms': algo['latency_addition_ms']
        }

    # Select best option
    best_name, best_details = max(
        recommendations.items(), key=lambda item: item[1]['effective_throughput']
    )
    return {
        'selected': best_name,
        'details': best_details,
        'all_options': recommendations
    }
⚠️Compression Trade-off
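Compression reduces network and disk usage but adds CPU overhead. For CPU-bound systems, consider no compression or lz4, which has the lowest CPU cost of the codecs above. For network-bound systems, prefer a high-ratio codec such as zstd, which usually offers the best compression ratio for its CPU cost.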
Consumer Tuning
Consumer Performance Optimization
# Consumer performance parameters
consumer_config = {
    # Poll configuration
    'max.poll.records': 500,  # Records returned per poll()
    'max.poll.interval.ms': 300000,  # Max time between polls before the consumer is considered failed
    # Fetch configuration
    'fetch.min.bytes': 1,  # Minimum fetch size (raise it to batch more per fetch)
    'fetch.max.wait.ms': 500,  # Max broker wait while fetch.min.bytes is unmet
    'max.partition.fetch.bytes': 1048576,  # 1MB per partition
    # Buffer configuration
    'receive.buffer.bytes': 131072,  # 128KB socket buffer
    # Backoff before retrying a failed request. 'fetch.backoff.ms' is not a
    # Kafka consumer setting; retry.backoff.ms is the documented key.
    'retry.backoff.ms': 1000
}
def optimize_consumer_for_throughput(
    num_partitions: int,
    processing_time_per_record_ms: float,
    target_throughput_per_sec: int
) -> dict:
    """
    Optimize consumer configuration for throughput.

    Assumes each consumer processes records one at a time. One consumer
    therefore handles ``1000 / processing_time_per_record_ms`` records per
    second, independent of max.poll.records.

    Args:
        num_partitions: Partitions of the consumed topic.
        processing_time_per_record_ms: Processing time per record, in ms (> 0).
        target_throughput_per_sec: Required records per second.

    Returns:
        dict with:
          - 'consumers_needed': rounded up, at least 1.
          - 'partitions_per_consumer'.
          - 'optimal_max_poll_records': see the NOTE in the body.
          - 'theoretical_throughput': records/s with every consumer busy.
          - 'partition_limited': True when more consumers are needed than
            there are partitions. A consumer group assigns each partition to
            at most one member, so the extra consumers would be idle.

    Raises:
        ValueError: If processing_time_per_record_ms is not positive.
    """
    import math

    if processing_time_per_record_ms <= 0:
        raise ValueError("processing_time_per_record_ms must be positive")

    # Serial processing: records_per_sec = 1000 / processing_time_per_record_ms
    records_per_sec_per_consumer = 1000 / processing_time_per_record_ms
    # Round up, because truncation under-provisions (1.5 consumers -> 1).
    # target * time / 1000 equals target / per-consumer rate, and computing
    # it this way avoids a rounding error pushing an exact count up by one.
    consumers_needed = max(
        1, math.ceil(target_throughput_per_sec * processing_time_per_record_ms / 1000)
    )
    # Partitions per consumer
    partitions_per_consumer = num_partitions / consumers_needed
    # NOTE(review): this quantity equals the un-rounded consumer count
    # (target / per-consumer rate), not a per-poll batch size. It is kept for
    # backward compatibility. Confirm the intended max.poll.records formula.
    optimal_poll_records = int(target_throughput_per_sec * processing_time_per_record_ms / 1000)
    return {
        'consumers_needed': consumers_needed,
        'partitions_per_consumer': partitions_per_consumer,
        'optimal_max_poll_records': optimal_poll_records,
        'theoretical_throughput': records_per_sec_per_consumer * consumers_needed,
        'partition_limited': consumers_needed > num_partitions
    }


# Example
result = optimize_consumer_for_throughput(
    num_partitions=12,
    processing_time_per_record_ms=1.0,
    target_throughput_per_sec=50000
)
print(f"Consumers needed: {result['consumers_needed']}")
print(f"Optimal max.poll.records: {result['optimal_max_poll_records']}")
Consumer Lag Formula
def estimate_consumer_lag_time(
    lag_messages: int,
    consumption_rate_per_sec: int,
    num_consumers: int
) -> dict:
    """
    Estimate how long a consumer group needs to drain its current lag.

    The group's drain rate is the per-consumer rate multiplied by the number
    of consumers. A drain rate of exactly zero yields an infinite estimate.

    Args:
        lag_messages: Messages currently behind.
        consumption_rate_per_sec: Messages per second one consumer handles.
        num_consumers: Consumers in the group.

    Returns:
        dict with the lag, the group rate and the drain time in seconds
        and minutes.
    """
    group_rate = consumption_rate_per_sec * num_consumers
    seconds = float('inf') if group_rate == 0 else lag_messages / group_rate
    return {
        'lag_messages': lag_messages,
        'effective_rate_per_sec': group_rate,
        'lag_time_seconds': seconds,
        'lag_time_minutes': seconds / 60,
    }


# Example
lag_time = estimate_consumer_lag_time(
    lag_messages=100000,
    consumption_rate_per_sec=1000,
    num_consumers=6,
)
print(f"Lag time: {lag_time['lag_time_minutes']:.1f} minutes")
ℹ️Consumer Tuning
Key consumer metrics to monitor:
- records-consumed-rate: Messages consumed per second
- records-lag-max: Maximum lag across partitions
- poll-rate: Poll calls per second
- fetch-latency-avg: Average fetch latency
System-Level Tuning
OS-Level Optimization
# Linux kernel tuning for Kafka
# Run as root. `sysctl -w` changes are lost on reboot; once validated,
# persist them in a file under /etc/sysctl.d/.

# Increase file descriptor limit.
# NOTE: ulimit only applies to the current shell and the processes it starts.
# Set the limit persistently for the broker (e.g. /etc/security/limits.conf
# or LimitNOFILE= in its systemd unit).
ulimit -n 100000
# Increase socket buffer sizes
sysctl -w net.core.rmem_max=16777216
sysctl -w net.core.wmem_max=16777216
sysctl -w net.core.rmem_default=16777216
sysctl -w net.core.wmem_default=16777216
# Enable TCP window scaling
sysctl -w net.ipv4.tcp_window_scaling=1
# Increase connection tracking (this key only exists while the
# nf_conntrack module is loaded)
sysctl -w net.netfilter.nf_conntrack_max=1048576
# Minimize swapping. swappiness=0 does NOT disable swap. It makes the kernel
# avoid swapping until memory is nearly exhausted, which on newer kernels
# raises the risk of OOM kills. Kafka operations guides commonly recommend 1.
# Use `swapoff -a` if you really want swap disabled.
sysctl -w vm.swappiness=1
# Use the deadline (mq-deadline on blk-mq kernels) or none/noop I/O scheduler
# for SSDs. Replace sda with the device that holds the Kafka log directories.
echo deadline > /sys/block/sda/queue/scheduler
JVM Tuning
# Kafka broker JVM options
# Heap: initial (-Xms) and maximum (-Xmx) sizes are both 6 GB.
KAFKA_HEAP_OPTS="-Xmx6g -Xms6g"
# GC, JIT and JMX flags, one flag per line inside the quoted string.
KAFKA_JVM_PERFORMANCE_OPTS="
-server
-XX:+UseG1GC
-XX:MaxGCPauseMillis=20
-XX:InitiatingHeapOccupancyPercent=35
-XX:+ExplicitGCInvokesConcurrent
-XX:MaxInlineLevel=15
-Djava.awt.headless=true
-Dcom.sun.management.jmxremote=true
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
"
# NOTE(review): the last three flags enable JMX with authentication and SSL
# both turned off, so anyone who can reach the JMX port gets unauthenticated
# access. Firewall that port, or enable authentication/SSL, outside trusted
# networks.
# Key JVM parameters:
# -Xmx/-Xms: Heap size (set equal so the heap is never resized at runtime;
#            this does not avoid garbage collection)
# -XX:+UseG1GC: Use G1 garbage collector
# -XX:MaxGCPauseMillis: Target (not a guarantee) for GC pause time
# -XX:InitiatingHeapOccupancyPercent: Heap occupancy (%) at which G1 starts a concurrent marking cycle
# -XX:+ExplicitGCInvokesConcurrent: System.gc() starts a concurrent cycle instead of a stop-the-world full GC
Disk Configuration
# Reference characteristics for broker storage choices, grouped into
# storage media, RAID layouts and file systems. The figures are typical
# ranges for comparison only.
disk_recommendations = dict(
    storage_type=dict(
        SSD=dict(
            throughput='500-3000 MB/s',
            latency='0.1ms',
            best_for='Low latency, high IOPS',
        ),
        HDD=dict(
            throughput='100-200 MB/s',
            latency='5-10ms',
            best_for='Large storage, sequential writes',
        ),
        NVMe=dict(
            throughput='3000-7000 MB/s',
            latency='0.02ms',
            best_for='Maximum performance',
        ),
    ),
    # RAID level names contain spaces, so this mapping stays a literal.
    raid_config={
        'RAID 0': 'Maximum throughput, no redundancy',
        'RAID 1': 'Mirror, good for small logs',
        'RAID 5': 'Balanced, not recommended for Kafka',
        'RAID 10': 'Best for Kafka (throughput + redundancy)',
    },
    file_system=dict(
        ext4='Good default',
        XFS='Better for large files',
        ZFS='Best for data integrity',
    ),
)
def estimate_disk_requirements(
    messages_per_day: int,
    avg_message_size: int,
    retention_days: int,
    replication_factor: int,
    overhead_factor: float = 1.2,
    headroom_factor: float = 1.5
) -> dict:
    """
    Calculate disk requirements for Kafka.

    Raw size is messages/day * message size * retention days * replication
    factor. That total is multiplied by ``overhead_factor`` (default: 20%
    extra). The recommended disk size multiplies the result again by
    ``headroom_factor`` (default: 50% spare capacity). Message sizes are
    used as given; compression is not modelled.

    Args:
        messages_per_day: Messages produced per day.
        avg_message_size: Average message size in bytes.
        retention_days: Retention period in days.
        replication_factor: Number of replicas kept of each message.
        overhead_factor: Multiplier for storage overhead beyond raw bytes.
        headroom_factor: Extra multiplier applied when recommending a size.

    Returns:
        dict with:
          - 'daily_data_gb', 'total_data_gb', 'total_with_overhead_gb':
            floats in GiB (1024**3 bytes).
          - 'recommended_disk_gb': int, truncated.
    """
    gib = 1024 ** 3
    # Daily data
    daily_data_bytes = messages_per_day * avg_message_size
    # Total with replication
    total_data_bytes = daily_data_bytes * retention_days * replication_factor
    total_data_gb = total_data_bytes / gib
    total_with_overhead = total_data_gb * overhead_factor
    return {
        'daily_data_gb': daily_data_bytes / gib,
        'total_data_gb': total_data_gb,
        'total_with_overhead_gb': total_with_overhead,
        'recommended_disk_gb': int(total_with_overhead * headroom_factor)
    }


# Example
disk = estimate_disk_requirements(
    messages_per_day=100000000,
    avg_message_size=500,
    retention_days=7,
    replication_factor=3
)
print(f"Recommended disk: {disk['recommended_disk_gb']} GB")
⚠️Disk Warning
Kafka is I/O intensive. Use dedicated disks for log storage. Don't share disks with other applications. Monitor disk I/O utilization and alert when > 70%.
Performance Testing
Benchmark Framework
class KafkaBenchmark:
    """
    Benchmark Kafka producer and consumer performance.

    Uses the ``kafka`` package clients (KafkaProducer / KafkaConsumer). They
    are imported inside each method, so defining the class does not require
    the package.
    """

    def __init__(self, bootstrap_servers):
        # Passed unchanged to every producer/consumer this benchmark creates.
        self.bootstrap_servers = bootstrap_servers

    def benchmark_producer(
        self,
        topic: str,
        num_messages: int,
        message_size: int,
        batch_size: int,
        linger_ms: int,
        compression: str
    ) -> dict:
        """
        Benchmark producer performance for one configuration.

        Sends 1000 untimed warm-up messages, then times ``num_messages``
        sends of a ``message_size``-byte payload plus a final ``flush()``,
        using ``acks='all'``. The producer is closed afterwards, even on error.

        Args:
            topic: Topic to write to.
            num_messages: Number of timed messages.
            message_size: Payload size in bytes.
            batch_size: Producer ``batch_size`` setting in bytes.
            linger_ms: Producer ``linger_ms`` setting.
            compression: Producer ``compression_type`` (the example passes
                None for no compression).

        Returns:
            dict with 'messages_per_sec', 'mb_per_sec' (MiB/s),
            'elapsed_seconds' and 'total_messages'.
        """
        from kafka import KafkaProducer
        import time

        producer = KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            batch_size=batch_size,
            linger_ms=linger_ms,
            compression_type=compression,
            acks='all'
        )
        try:
            # Generate test message
            message = b'x' * message_size
            # Untimed warm-up, so one-time startup costs are excluded.
            for _ in range(1000):
                producer.send(topic, value=message)
            producer.flush()

            # Benchmark: the flush is inside the timed section, so the
            # measurement covers all buffered sends being completed.
            start_time = time.perf_counter()
            for _ in range(num_messages):
                producer.send(topic, value=message)
            producer.flush()
            elapsed = time.perf_counter() - start_time
        finally:
            # Release the producer even if a send or flush fails.
            producer.close()

        return {
            'messages_per_sec': num_messages / elapsed,
            'mb_per_sec': (num_messages * message_size) / (elapsed * 1024 * 1024),
            'elapsed_seconds': elapsed,
            'total_messages': num_messages
        }

    def benchmark_consumer(
        self,
        topic: str,
        num_messages: int,
        max_poll_records: int,
        max_empty_polls: int = 30
    ) -> dict:
        """
        Benchmark consumer performance.

        Reads from the earliest offset until ``num_messages`` records have
        been consumed. It stops early after ``max_empty_polls`` consecutive
        polls (1 s timeout each) return nothing. Pass ``max_empty_polls <= 0``
        to wait indefinitely. When the run stops early, the idle wait is
        included in the elapsed time, so the rate is a lower bound.

        Args:
            topic: Topic to read from.
            num_messages: Number of records to consume.
            max_poll_records: Consumer ``max_poll_records`` setting.
            max_empty_polls: Consecutive empty polls tolerated before giving up.

        Returns:
            dict with:
              - 'messages_per_sec', 'elapsed_seconds', 'total_messages'.
              - 'completed': whether ``num_messages`` records were actually
                consumed.
        """
        from kafka import KafkaConsumer
        import time

        consumer = KafkaConsumer(
            topic,
            bootstrap_servers=self.bootstrap_servers,
            max_poll_records=max_poll_records,
            auto_offset_reset='earliest'
        )
        try:
            # Wait for topic to have messages
            time.sleep(2)
            # Benchmark
            start_time = time.perf_counter()
            messages_consumed = 0
            empty_polls = 0
            while messages_consumed < num_messages:
                batch = consumer.poll(timeout_ms=1000)
                received = sum(len(partition_records) for partition_records in batch.values())
                if received == 0:
                    empty_polls += 1
                    # Without this guard a topic holding fewer than
                    # num_messages records would make the loop spin forever.
                    if 0 < max_empty_polls <= empty_polls:
                        break
                    continue
                empty_polls = 0
                messages_consumed += received
            elapsed = time.perf_counter() - start_time
        finally:
            # Release the consumer even if polling fails.
            consumer.close()

        return {
            'messages_per_sec': messages_consumed / elapsed,
            'elapsed_seconds': elapsed,
            'total_messages': messages_consumed,
            'completed': messages_consumed >= num_messages
        }
# Example usage
benchmark = KafkaBenchmark('localhost:9092')
# Producer settings to compare, ordered from latency-oriented to
# throughput-oriented.
configs = [
    dict(batch_size=16384, linger_ms=0, compression=None),
    dict(batch_size=32768, linger_ms=10, compression='snappy'),
    dict(batch_size=65536, linger_ms=20, compression='lz4'),
    dict(batch_size=131072, linger_ms=50, compression='lz4'),
]
for config in configs:
    result = benchmark.benchmark_producer(
        topic='benchmark-topic', num_messages=100000, message_size=500, **config
    )
    mb_rate = result['mb_per_sec']
    msg_rate = result['messages_per_sec']
    print(f"Config: {config}")
    print(f" Throughput: {mb_rate:.1f} MB/s")
    print(f" Message rate: {msg_rate:.0f} msgs/sec")
ℹ️Benchmarking Tip
Always benchmark with YOUR specific workload. Generic benchmarks may not reflect your real-world performance. Test with realistic message sizes, patterns, and configurations.
Performance Monitoring
# Key performance metrics, grouped by component: metric name -> meaning.
# NOTE(review): the Producer entry 'throughput' reads as a descriptive label
# rather than a client metric name. The Java producer's closest metric
# appears to be 'outgoing-byte-rate'; confirm against your client's metrics.
performance_metrics = {
    component: dict(entries)
    for component, entries in (
        ('Producer', (
            ('record-send-rate', 'Messages sent per second'),
            ('throughput', 'Bytes sent per second'),
            ('batch-size-avg', 'Average batch size'),
            ('request-latency-avg', 'Average request latency'),
        )),
        ('Consumer', (
            ('records-consumed-rate', 'Messages consumed per second'),
            ('bytes-consumed-rate', 'Bytes consumed per second'),
            ('records-lag-max', 'Maximum lag across partitions'),
            ('fetch-latency-avg', 'Average fetch latency'),
        )),
        ('Broker', (
            ('request-rate', 'Request rate per second'),
            ('request-latency-avg', 'Average request latency'),
            ('bytes-in-rate', 'Bytes received per second'),
            ('bytes-out-rate', 'Bytes sent per second'),
        )),
    )
}
⚠️Key Insight
Performance tuning is iterative. Start with baseline measurements, make one change at a time, measure the impact, and keep improving. There's no one-size-fits-all configuration.