Producer Tuning: Batching, Compression, Acks, Idempotency
Difficulty: Senior | Asked at: Netflix, Stripe, Airbnb, Uber
ℹ️ Interview Context
Producer tuning is a critical skill for senior engineers. Interviewers expect you to explain trade-offs between throughput, latency, and durability, and to recommend configurations based on specific use cases.
The Question
How do you optimize a Kafka producer for maximum throughput while maintaining durability guarantees? Explain the interaction between batching, compression, acks, and idempotency.
Producer Architecture
```text
Kafka Producer
  +--------+     +---------------------+     +-----------+
  | send() | --> | Buffer              | --> | Batch     |
  |        |     | (RecordAccumulator) |     | Collector |
  +--------+     +---------------------+     +-----------+
                            |                      |
                 +---------------------+     +-----------+
                 | Memory Pool         |     | Sender    |
                 | (32MB default)      |     | Thread    |
                 +---------------------+     +-----------+
                                                   |
                                             +-----------+
                                             | Network   |
                                             | I/O       |
                                             +-----------+
```
Batching Strategy
The RecordAccumulator
Messages accumulate in a per-partition buffer before being sent in batches:
```python
from kafka import KafkaProducer

# Example batching configuration (throughput-leaning)
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    # Batching parameters
    batch_size=32768,        # 32KB batch size (default: 16KB)
    linger_ms=10,            # Wait up to 10ms for a batch to fill
    buffer_memory=67108864,  # 64MB total buffer (default: 32MB)
    # Backpressure and network
    max_block_ms=5000,         # Max time send() blocks when the buffer is full or metadata is unavailable
    send_buffer_bytes=131072,  # 128KB socket send buffer
)
```
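To make the accumulator's behavior concrete, here is a toy, single-threaded model. It is not kafka-python's or the Java client's implementation, and `ToyAccumulator`/`ToyBatch` are hypothetical names. It loosely follows the documented rule: records are appended to the open batch of their partition, and a batch becomes ready to send once it is full (`batch_size` bytes) or its oldest record has waited `linger_ms`.

```python
from collections import defaultdict, deque
from dataclasses import dataclass, field


@dataclass
class ToyBatch:
    created_ms: float
    records: list = field(default_factory=list)
    size_bytes: int = 0


class ToyAccumulator:
    """Hypothetical, single-threaded model of per-partition batching."""

    def __init__(self, batch_size: int, linger_ms: float):
        self.batch_size = batch_size
        self.linger_ms = linger_ms
        # One queue of batches per partition; only the newest batch accepts appends
        self.queues = defaultdict(deque)

    def append(self, partition: int, record: bytes, now_ms: float) -> None:
        queue = self.queues[partition]
        # Start a new batch if there is none or the record would overflow the open one
        if not queue or queue[-1].size_bytes + len(record) > self.batch_size:
            queue.append(ToyBatch(created_ms=now_ms))
        batch = queue[-1]
        batch.records.append(record)
        batch.size_bytes += len(record)

    def drain(self, now_ms: float) -> list:
        """Pop (partition, batch) pairs that are full or have waited linger_ms."""
        ready = []
        for partition, queue in self.queues.items():
            while queue:
                oldest = queue[0]
                # A batch counts as full once a newer batch exists behind it
                full = len(queue) > 1 or oldest.size_bytes >= self.batch_size
                lingered = now_ms - oldest.created_ms >= self.linger_ms
                if not (full or lingered):
                    break
                ready.append((partition, queue.popleft()))
        return ready


acc = ToyAccumulator(batch_size=1000, linger_ms=10)
for t in range(5):
    acc.append(partition=0, record=b"x" * 300, now_ms=t)  # 300-byte records, 1 per ms
first = acc.drain(now_ms=5)   # batch 1 (3 records) is full; batch 2 has waited only 2 ms
later = acc.drain(now_ms=13)  # batch 2 (2 records) has now waited linger_ms
print(len(first[0][1].records), len(later[0][1].records))  # 3 2
```

Real producers also send batches early when the buffer is exhausted or `flush()` is called; the sketch leaves those cases out.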
Batch Formation Formula
The effective batch size depends on message arrival rate:
```python
# Calculate a linger_ms that lets batches reach a target size
def calculate_optimal_linger(
    target_batch_bytes: int,
    message_rate_per_ms: float,
    avg_message_bytes: int
) -> int:
    """
    Calculate the linger_ms needed to reach a target batch size.

    Args:
        target_batch_bytes: Desired batch size in bytes
        message_rate_per_ms: Messages arriving per millisecond for one
            partition (batches are built per partition)
        avg_message_bytes: Average size of each message

    Returns:
        Suggested linger_ms value (at least 1)
    """
    messages_per_batch = target_batch_bytes / avg_message_bytes
    linger_needed = messages_per_batch / message_rate_per_ms
    # Add 20% headroom for variance in arrival rate
    return max(1, int(linger_needed * 1.2))


# Example: 10,000 msgs/sec into one partition, 500 bytes avg, target 32KB batch
optimal_linger = calculate_optimal_linger(
    target_batch_bytes=32768,
    message_rate_per_ms=10,  # 10,000 / 1,000
    avg_message_bytes=500
)
print(f"Optimal linger_ms: {optimal_linger}")  # 7 (about 6.55ms to fill, +20%, truncated)
```
⚠️ Latency vs Throughput Trade-off
Higher linger_ms increases throughput by allowing larger batches, but each record can wait up to linger_ms before it is sent. For sub-10ms latency requirements, use linger_ms=0 or linger_ms=1. For maximum throughput, use linger_ms=50 or higher.
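A back-of-the-envelope model of this trade-off, assuming a steady arrival rate into one partition and fixed-size records: a batch ships when it is full or when linger_ms expires, whichever comes first. It deliberately ignores the extra batching that happens naturally while earlier requests are in flight, so the linger_ms=0 row is a worst case. `batching_tradeoff` is a hypothetical helper, not a client API.

```python
import math


def batching_tradeoff(rate_per_ms: float, msg_bytes: int,
                      batch_size: int, linger_ms: float) -> dict:
    """Estimate records per batch, requests/sec, and worst-case added latency.

    Assumes a steady arrival rate into a single partition and fixed-size records.
    """
    records_per_full_batch = max(1, batch_size // msg_bytes)
    fill_time_ms = records_per_full_batch / rate_per_ms
    if fill_time_ms <= linger_ms:
        # The batch fills before linger expires, so it ships full
        records_per_batch = records_per_full_batch
        added_latency_ms = fill_time_ms
    else:
        # linger_ms expires first: ship whatever has arrived (at least one record)
        records_per_batch = max(1, math.floor(rate_per_ms * linger_ms))
        added_latency_ms = linger_ms
    return {
        "records_per_batch": records_per_batch,
        "requests_per_sec": rate_per_ms * 1000 / records_per_batch,
        "max_added_latency_ms": added_latency_ms,
    }


# 10 msgs/ms (10,000 msgs/sec) of 500-byte records, 32KB batches
for linger in (0, 5, 10, 50):
    print(linger, batching_tradeoff(10, 500, 32768, linger))
```

Past the point where batches fill on their own (about 6.5 ms in this example), raising linger_ms changes nothing at this rate; it only matters when traffic is too light to fill batches.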
Compression Algorithms
Compression Comparison
| Algorithm | Compression Ratio | CPU Usage | Latency | Best For |
|---|---|---|---|---|
| none | 1.0x | None | Lowest | Small messages, low latency |
| gzip | 3-5x | High | High | Storage-constrained |
| snappy | 2-3x | Low | Low | Balanced performance |
| lz4 | 2-3x | Very Low | Very Low | Real-time applications |
| zstd | 3-6x | Medium | Medium | Best compression ratio |
Compression Implementation
```python
import gzip
import json
import time

import lz4.frame
import snappy
import zstandard as zstd
from kafka import KafkaProducer


class CompressionBenchmark:
    def __init__(self):
        self.algorithms = {
            'none': lambda x: x,
            'snappy': snappy.compress,
            'lz4': lz4.frame.compress,
            'gzip': gzip.compress,
            'zstd': zstd.ZstdCompressor().compress
        }

    def benchmark(self, data: bytes, iterations: int = 1000):
        results = {}
        for name, compress_fn in self.algorithms.items():
            start = time.perf_counter()
            for _ in range(iterations):
                compressed = compress_fn(data)
            elapsed = time.perf_counter() - start
            results[name] = {
                # Uncompressed size / compressed size (higher is better)
                'compression_ratio': len(data) / len(compressed),
                # Compression speed in MB/s of input data
                'throughput_mb_per_s': (len(data) * iterations) / (elapsed * 1024 * 1024),
                'compressed_size': len(compressed)
            }
        return results


def generate_event() -> bytes:
    # Placeholder event generator; replace with your own data source
    return json.dumps({'type': 'page_view', 'ts': time.time()}).encode()


# Producer with compression (snappy, lz4 and zstd need their Python libraries installed)
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    compression_type='snappy',  # or 'lz4', 'zstd', 'gzip'
    batch_size=65536,           # 64KB for better compression
    linger_ms=20,
    buffer_memory=134217728     # 128MB buffer
)

# Compression happens at the batch level, so larger batches compress better
for i in range(10000):
    data = generate_event()
    producer.send('events', value=data)
producer.flush()
```
Compression Ratio Formula
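The ratio used in the benchmark is uncompressed bytes divided by compressed bytes. The sketch below uses only the standard library's gzip module (gzip is one of Kafka's codecs) on synthetic, hypothetical JSON events to show why batch-level compression favors larger batches: codec overhead is amortized and field names repeated across records compress away. Concatenating records only approximates Kafka's real batch format, and actual ratios depend entirely on your data.

```python
import gzip
import json


def compression_ratio(raw: bytes, compressed: bytes) -> float:
    # ratio = uncompressed size / compressed size (higher is better)
    return len(raw) / len(compressed)


def make_events(n: int) -> list:
    # Hypothetical event shape; real payloads will compress differently
    return [
        json.dumps({"event_id": i, "user_id": f"user-{i % 50}",
                    "type": "page_view", "path": "/checkout"}).encode()
        for i in range(n)
    ]


def batch_ratio(records: list) -> float:
    # Kafka compresses a whole batch, so compress the records together
    raw = b"".join(records)
    return compression_ratio(raw, gzip.compress(raw))


single = batch_ratio(make_events(1))
large = batch_ratio(make_events(1000))
print(f"1-record batch: {single:.2f}x, 1000-record batch: {large:.2f}x")
```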
ℹ️ Kafka Compression Details
Compression happens at the batch level, not per message. When the topic's compression.type is left at its default (producer), the broker stores the compressed batches as-is and serves them compressed to consumers, which decompress them; if the topic specifies a different codec, the broker recompresses. This means compression saves network bandwidth AND disk space.
Acknowledgment Levels
Acks Configuration
```python
from kafka import KafkaProducer

# Three acknowledgment modes
configs = {
    'acks=0': {
        'description': 'No acknowledgment',
        'latency': 'Lowest',
        'durability': 'None',
        'use_case': 'Metrics, logs where loss is acceptable'
    },
    'acks=1': {
        'description': 'Leader acknowledges',
        'latency': 'Low',
        'durability': 'Leader-only',
        'use_case': 'When leader failure is rare'
    },
    'acks=all': {
        'description': 'All ISR acknowledge',
        'latency': 'Higher',
        'durability': 'Strong',
        'use_case': 'Financial, critical data'
    }
}

# Production configuration
# min.insync.replicas=2 is a broker/topic setting, not a producer option;
# kafka-python rejects it as an unrecognized config.
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    acks='all',
    retries=2147483647,  # Effectively infinite retries
    max_in_flight_requests_per_connection=5  # Retries can reorder records unless idempotence is on
)
```
Acks and ISR Interaction
```text
acks=all, min.insync.replicas=2, ISR=[B1, B2, B3]

Producer sends message M:
  B1 (leader): checks ISR count (3) >= min.insync.replicas (2) ✓
  B1 (leader): appends M to its log ✓
  B2 (follower): fetches M, appends it to its log ✓
  B3 (follower): fetches M, appends it to its log ✓
  All ISR members have M → leader sends ACK to producer

If B3 fails during replication:
  ISR shrinks to [B1, B2] (after replica.lag.time.max.ms)
  ISR count (2) >= min.insync.replicas (2) ✓
  Writes continue; ACK is sent once B1 and B2 have M
```
⚠️ Critical Configuration
min.insync.replicas is a broker-side setting: a default in server.properties that can be overridden per topic. If acks=all with min.insync.replicas=2 and only 1 replica is in sync, the producer will get NotEnoughReplicasException. This protects against data loss but can cause availability issues.
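The leader-side rule can be modeled in a few lines. This is a simplified sketch (`produce_outcome` is a hypothetical function, not broker code) that treats acks=all as "every current ISR member has the record" and applies the min.insync.replicas check only for acks=all, matching Kafka's behavior that acks=0 and acks=1 writes are not rejected for a small ISR.

```python
def produce_outcome(acks: str, isr: list, min_insync_replicas: int,
                    replicas_with_record: set) -> str:
    """Simplified leader-side handling of a produce request.

    acks is "0", "1", or "all"; the record is assumed to be appended on the
    leader already, and replicas_with_record lists brokers that have it.
    """
    if acks == "0":
        return "no response"  # producer never waits for the broker
    if acks == "1":
        return "ack"  # leader acknowledges after its own append
    # acks=all: min.insync.replicas is only enforced for acks=all
    if len(isr) < min_insync_replicas:
        return "NotEnoughReplicas"
    # Acknowledge once every current ISR member has the record
    return "ack" if set(isr) <= replicas_with_record else "waiting"


# The two scenarios from the trace, plus the failure case from the warning
print(produce_outcome("all", ["B1", "B2", "B3"], 2, {"B1", "B2", "B3"}))  # ack
print(produce_outcome("all", ["B1", "B2"], 2, {"B1", "B2"}))              # ack
print(produce_outcome("all", ["B1"], 2, {"B1"}))                          # NotEnoughReplicas
```

The "waiting" state is where a slow follower hurts latency: the leader holds the acknowledgment until the follower catches up or drops out of the ISR.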
Idempotent Producer
How Idempotency Works
The idempotent producer uses sequence numbers to detect and prevent duplicates:
```python
from kafka import KafkaProducer
from kafka.errors import KafkaError

# Idempotent producer configuration
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    enable_idempotence=True,  # No duplicates from internal retries (per partition, per session)
    acks='all',               # Required for idempotence
    retries=2147483647,       # Must be > 0 for idempotence
    max_in_flight_requests_per_connection=5,  # Must be <= 5 with idempotence
    # transactional_id='my-app-1'  # Only with the transactions API (init/begin/commit)
)

# Each message gets a sequence number
# Broker deduplicates based on ProducerID + Partition + SeqNum
for i in range(100):
    try:
        future = producer.send('events', value=f'message-{i}'.encode())
        metadata = future.get(timeout=10)
        print(f"Sent to {metadata.partition}:{metadata.offset}")
    except KafkaError as e:
        # Duplicates from retries are absorbed by the broker and reported as
        # success, so anything caught here is a real delivery failure
        print(f"Delivery failed: {e}")
```
Sequence Number Protocol
```text
Producer (PID=123):
  SeqNum=0: send("msg-1") → Broker stores {PID:123, Seq:0, Data:"msg-1"}
  SeqNum=1: send("msg-2") → Broker stores {PID:123, Seq:1, Data:"msg-2"}

Network failure (the ack for Seq 1 is lost), producer retries:
  SeqNum=1: send("msg-2") → Broker detects duplicate (PID:123, Seq:1 exists)
                          → Broker returns success without re-appending

Consumer sees: msg-1, msg-2 (exactly once)
```
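A toy model of the broker-side check, assuming one partition and one record per request. Real brokers track sequences per producer ID and epoch for each partition and remember the last five batches so retried requests can be matched; `ToyPartitionLog` is a hypothetical name. The usage also shows the flip side: an application-level resend gets a fresh sequence number, so the broker cannot recognize it as a duplicate.

```python
class ToyPartitionLog:
    """One partition's log with idempotent-producer deduplication (simplified)."""

    def __init__(self):
        self.records = []
        self.last_seq = {}  # producer_id -> last sequence number appended

    def append(self, producer_id: int, seq: int, value: str) -> str:
        last = self.last_seq.get(producer_id, -1)
        if seq <= last:
            # Already appended: acknowledge again without writing a second copy
            return "duplicate-acked"
        if seq != last + 1:
            # Gap in sequence numbers: an earlier request was lost
            return "OutOfOrderSequenceNumber"
        self.records.append(value)
        self.last_seq[producer_id] = seq
        return "appended"


log = ToyPartitionLog()
log.append(123, 0, "msg-1")
log.append(123, 1, "msg-2")
log.append(123, 1, "msg-2")  # internal retry reuses Seq 1 -> "duplicate-acked"
log.append(123, 2, "msg-2")  # application-level resend gets a new Seq -> appended again
print(log.records)  # ['msg-1', 'msg-2', 'msg-2']
```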
Idempotence Constraints
```python
from kafka import KafkaProducer

# Idempotence has limitations:
# 1. Ordering is preserved per partition
# 2. Ordering across partitions is NOT guaranteed
# 3. At most 5 in-flight requests per connection (with idempotence)
# 4. Combine with transactions for cross-partition exactly-once


class IdempotentProducer:
    DELIVERY_TIMEOUT_MS = 120000

    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers='localhost:9092',
            enable_idempotence=True,
            acks='all',
            retries=2147483647,
            max_in_flight_requests_per_connection=5,
            delivery_timeout_ms=self.DELIVERY_TIMEOUT_MS,
            request_timeout_ms=30000,
            linger_ms=5,
            batch_size=32768,
            compression_type='lz4'
        )

    def send_sync(self, topic, key, value, headers=None):
        """Send one record and block until it is acknowledged or fails.

        Retries happen inside the producer and reuse the same sequence
        number, so the broker can deduplicate them. Re-sending from
        application code would create a new record with a new sequence
        number and could write a duplicate, so failures are raised here
        (route them to a dead-letter store or alert) instead of retried.
        """
        future = self.producer.send(topic=topic, key=key, value=value, headers=headers)
        # Wait at least as long as the producer keeps retrying internally;
        # raises a KafkaError subclass once delivery has definitively failed
        return future.get(timeout=self.DELIVERY_TIMEOUT_MS / 1000 + 5)
```
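ℹ️ Idempotence vs Transactions
The idempotent producer prevents duplicates caused by its own retries within a single producer session; a restarted producer gets a new producer ID, so the guarantee does not carry over. Transactions extend this across multiple partitions and topics, and a stable transactional_id carries the producer's identity across restarts while fencing off stale (zombie) instances. For exactly-once across producer restarts, use transactions with a stable transactional_id.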
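To see why a stable transactional_id matters, here is a simplified model of zombie fencing (`ToyTransactionCoordinator` is hypothetical, not the coordinator's real code): each initialization with a given transactional_id bumps an epoch, and writes carrying an older epoch are rejected. A restarted instance therefore fences out the old one, which a plain idempotent producer, whose producer ID changes on every restart, cannot do.

```python
class ToyTransactionCoordinator:
    """Simplified model of epoch-based fencing keyed by transactional_id."""

    def __init__(self):
        self.epochs = {}  # transactional_id -> current epoch

    def init_transactions(self, transactional_id: str) -> int:
        # Each (re)initialization bumps the epoch for this transactional_id
        epoch = self.epochs.get(transactional_id, -1) + 1
        self.epochs[transactional_id] = epoch
        return epoch

    def write(self, transactional_id: str, epoch: int) -> str:
        # Writers holding an older epoch are zombies and get fenced
        if epoch < self.epochs[transactional_id]:
            return "ProducerFenced"
        return "ok"


coord = ToyTransactionCoordinator()
old = coord.init_transactions("my-app-1")  # original instance
new = coord.init_transactions("my-app-1")  # restarted instance, same id
print(coord.write("my-app-1", old))  # ProducerFenced
print(coord.write("my-app-1", new))  # ok
```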
Complete Producer Configuration
High Throughput Configuration
```python
from kafka import KafkaProducer

# Maximum throughput producer
producer_throughput = KafkaProducer(
    bootstrap_servers='kafka1:9092,kafka2:9092,kafka3:9092',
    # Batching - maximize batch size
    batch_size=131072,        # 128KB batches
    linger_ms=50,             # Wait up to 50ms
    buffer_memory=268435456,  # 256MB buffer
    # Compression - balance CPU and ratio
    compression_type='lz4',
    # Durability (also set min.insync.replicas=2 on the topic or broker;
    # it is not a producer option)
    acks='all',
    # Reliability
    retries=2147483647,
    delivery_timeout_ms=180000,
    request_timeout_ms=30000,
    # Idempotence
    enable_idempotence=True,
    max_in_flight_requests_per_connection=5,
    # Network
    send_buffer_bytes=262144,     # 256KB socket send buffer
    receive_buffer_bytes=131072,  # 128KB socket receive buffer
    # Connection
    connections_max_idle_ms=600000,  # 10 minutes
    metadata_max_age_ms=300000       # 5 minutes
)
```
Low Latency Configuration
```python
from kafka import KafkaProducer

# Low latency producer (< 10ms target)
producer_latency = KafkaProducer(
    bootstrap_servers='kafka1:9092,kafka2:9092,kafka3:9092',
    # Batching - minimal
    batch_size=16384,        # 16KB (small batches)
    linger_ms=0,             # Send immediately
    buffer_memory=67108864,  # 64MB
    # No compression (avoids CPU overhead)
    compression_type=None,
    # Durability
    acks=1,                  # Leader only (faster); pass the integer 1, not '1'
    # Reliability
    retries=3,
    delivery_timeout_ms=30000,
    request_timeout_ms=10000,
    # Idempotence requires acks='all', so it stays off with acks=1
    enable_idempotence=False,
    # Network - tuned for low latency
    send_buffer_bytes=65536,  # 64KB
    receive_buffer_bytes=65536,
    # Connections
    connections_max_idle_ms=600000,
    metadata_max_age_ms=60000  # Refresh more often
)
```
Throughput Calculation
```python
def estimate_throughput(
    batch_size_bytes: int,
    num_partitions: int,
    linger_ms: int,
    network_bandwidth_mbps: float
) -> float:
    """
    Rough upper-bound estimate of producer throughput in MB/s.

    Assumes every partition ships one full batch every linger_ms, i.e. that
    messages arrive fast enough to fill each batch.
    """
    # Batches per second per partition
    batches_per_sec = 1000 / max(linger_ms, 1)
    # Theoretical throughput per partition (bytes/s)
    per_partition = batch_size_bytes * batches_per_sec
    # Total across partitions
    total_bytes_per_sec = per_partition * num_partitions
    # Convert to MB/s
    total_mb_per_sec = total_bytes_per_sec / (1024 * 1024)
    # Cap at network bandwidth (megabits/s -> MB/s)
    network_mb_per_sec = network_bandwidth_mbps * 1_000_000 / 8 / (1024 * 1024)
    return min(total_mb_per_sec, network_mb_per_sec)


# Example calculation
throughput = estimate_throughput(
    batch_size_bytes=131072,  # 128KB
    num_partitions=12,
    linger_ms=10,
    network_bandwidth_mbps=1000  # 1 Gbps
)
print(f"Estimated throughput: {throughput:.1f} MB/s")  # 119.2
# 12.5 MB/s per partition x 12 = 150 MB/s, capped by the 1 Gbps link (~119.2 MB/s)
```
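⚠️ Common Pitfall
Setting linger_ms=0 with high message rates produces many small batches: more requests, more per-request overhead, and substantially lower throughput. (Some batching still happens while earlier requests are in flight, but far less than a deliberate linger allows.) Always tune linger_ms based on your latency requirements and message volume.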
Producer Metrics to Monitor
```python
# Critical producer metrics (names as exposed by the Java client's
# producer-metrics group; other clients may differ)
producer_metrics = {
    # Throughput
    "record-send-rate": "Records sent per second",
    "record-send-total": "Total records sent",
    "outgoing-byte-rate": "Bytes sent per second",
    # Batching efficiency
    "batch-size-avg": "Average batch size (bytes)",
    "batch-size-max": "Maximum batch size (bytes)",
    "compression-rate-avg": "Average compressed/uncompressed size ratio of batches",
    # Latency
    "request-latency-avg": "Average request latency",
    "request-latency-max": "Maximum request latency",
    "record-queue-time-avg": "Average time in queue",
    "record-queue-time-max": "Maximum time in queue",
    # Errors
    "record-error-rate": "Failed records per second",
    "record-error-total": "Total failed records",
    "record-retry-rate": "Record retries per second",
    # Buffer (used bytes = buffer-total-bytes - buffer-available-bytes)
    "buffer-available-bytes": "Unused buffer memory",
    "buffer-total-bytes": "Total buffer memory (buffer.memory)",
    "bufferpool-wait-ratio": "Fraction of time send() waits for buffer space",
    # Network
    "network-io-rate": "Network I/O operations per second",
    "request-rate": "Requests sent per second",
    "response-rate": "Responses received per second"
}
```
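One way to turn these metrics into alerts is a small rule set over a metrics snapshot. The sketch assumes you have already collected values into a dict keyed by Java-client metric names such as `batch-size-avg` and `record-retry-rate` (for example from JMX); `producer_health` is a hypothetical helper and its thresholds are illustrative assumptions, not recommendations.

```python
def producer_health(snapshot: dict, configured_batch_size: int) -> list:
    """Return warnings for a snapshot of producer metrics (illustrative thresholds)."""
    warnings = []
    avg_batch = snapshot.get("batch-size-avg")
    # Batches far below batch.size under load suggest linger_ms is too low
    if avg_batch is not None and avg_batch < 0.25 * configured_batch_size:
        warnings.append("small batches: consider raising linger_ms")
    # Sustained errors or retries point at broker, network, or config problems
    if snapshot.get("record-error-rate", 0) > 0:
        warnings.append("records are failing: check record-error-total and broker logs")
    if snapshot.get("record-retry-rate", 0) > 0:
        warnings.append("records are being retried: check broker and network health")
    # Little free buffer means send() may soon block for up to max_block_ms
    total = snapshot.get("buffer-total-bytes")
    available = snapshot.get("buffer-available-bytes")
    if total and available is not None and available < 0.1 * total:
        warnings.append("buffer nearly full: send() may block")
    return warnings


snapshot = {
    "batch-size-avg": 2048, "record-error-rate": 0.0, "record-retry-rate": 3.5,
    "buffer-total-bytes": 67108864, "buffer-available-bytes": 50000000,
}
for warning in producer_health(snapshot, configured_batch_size=32768):
    print(warning)
```

At low traffic, small batches are expected, so the batch-size rule is only meaningful when the producer is under load.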
Common Producer Configurations by Use Case
| Use Case | batch_size | linger_ms | acks | compression | idempotent |
|---|---|---|---|---|---|
| Real-time analytics | 16KB | 0-1 | 1 | none | No |
| Log aggregation | 128KB | 50-100 | all | lz4 | Yes |
| Event sourcing | 32KB | 5-10 | all | snappy | Yes |
| Metrics collection | 64KB | 20-50 | 0 | lz4 | No |
| Financial transactions | 32KB | 5 | all | none | Yes |
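If you keep these profiles in code, a plain dict of kafka-python keyword arguments can mirror the table. Where the table gives a range for linger_ms, the value below is an arbitrary pick from that range; `PRODUCER_PROFILES` and `producer_kwargs` are hypothetical names, and `compression_type=None` means no compression.

```python
KB = 1024

# Hypothetical profiles mirroring the use-case table
PRODUCER_PROFILES = {
    "real_time_analytics": dict(batch_size=16 * KB, linger_ms=1, acks=1,
                                compression_type=None, enable_idempotence=False),
    "log_aggregation": dict(batch_size=128 * KB, linger_ms=50, acks="all",
                            compression_type="lz4", enable_idempotence=True),
    "event_sourcing": dict(batch_size=32 * KB, linger_ms=5, acks="all",
                           compression_type="snappy", enable_idempotence=True),
    "metrics_collection": dict(batch_size=64 * KB, linger_ms=20, acks=0,
                               compression_type="lz4", enable_idempotence=False),
    "financial_transactions": dict(batch_size=32 * KB, linger_ms=5, acks="all",
                                   compression_type=None, enable_idempotence=True),
}


def producer_kwargs(profile: str, bootstrap_servers: str) -> dict:
    # Copy so callers can tweak the result without mutating the shared profile
    kwargs = dict(PRODUCER_PROFILES[profile])
    kwargs["bootstrap_servers"] = bootstrap_servers
    return kwargs
```

Usage would then be `KafkaProducer(**producer_kwargs("log_aggregation", "kafka1:9092"))`. Keeping profiles as data makes it easy to check invariants such as "every idempotent profile uses acks='all'" in a unit test.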
ℹ️ Production Tip
Always monitor record-error-rate and record-retry-rate. A sudden spike usually points to broker issues, network problems, or configuration errors. Use delivery.timeout.ms to cap how long a record can be retried, so a very high retries value never means retrying forever.
Advanced: Custom Partitioner
```python
import hashlib
import random

from kafka import KafkaProducer


class GeoPartitioner:
    """
    Route messages based on geographic region.
    Keys look like "us-east:user123"; every key with the same region prefix
    goes to the same partition (as long as the partition count is unchanged).
    """

    def __call__(self, key_bytes, all_partitions, available_partitions):
        # kafka-python calls the partitioner as
        # partitioner(key_bytes, all_partitions, available_partitions)
        if key_bytes is None:
            # No key: spread load over partitions that currently have a leader
            return random.choice(available_partitions or all_partitions)

        # Extract region from key, e.g. "us-east:user123" -> "us-east"
        region = key_bytes.decode('utf-8').split(':', 1)[0]

        # Stable hash of the region (Python's built-in hash() is salted per
        # process); plain modulo hashing, so adding partitions remaps regions
        region_hash = int(hashlib.md5(region.encode('utf-8')).hexdigest(), 16)
        return all_partitions[region_hash % len(all_partitions)]


# Use custom partitioner (pass an instance: kafka-python expects a callable)
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    partitioner=GeoPartitioner(),
    acks='all',
    enable_idempotence=True
)
```
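⚠️ Key Insight
In the Java client, enable.idempotence=true requires acks=all, retries greater than 0, and max.in.flight.requests.per.connection of at most 5. If you leave acks and retries unset, the client picks compatible values (acks=all, retries=Integer.MAX_VALUE), and the in-flight default is already 5. Setting them explicitly to compatible values is harmless, but an incompatible value such as acks=1 makes the producer fail at startup with a ConfigException.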
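These rules are easy to check before a producer ever starts. The sketch below validates a dict of kafka-python-style keyword arguments against the Java client's documented constraints (idempotence needs acks=all, retries > 0, and at most 5 in-flight requests; delivery.timeout.ms must be at least linger.ms + request.timeout.ms). `validate_producer_config` is a hypothetical helper, and treating missing keys as compatible is an assumption.

```python
def validate_producer_config(cfg: dict) -> list:
    """Return a list of conflicts found in kafka-python-style producer kwargs.

    Only keys that are present are checked; missing keys are assumed to fall
    back to compatible client defaults.
    """
    problems = []
    if cfg.get("enable_idempotence"):
        if "acks" in cfg and cfg["acks"] not in ("all", -1):
            problems.append("enable_idempotence requires acks='all'")
        if cfg.get("retries", 1) <= 0:
            problems.append("enable_idempotence requires retries > 0")
        if cfg.get("max_in_flight_requests_per_connection", 5) > 5:
            problems.append("enable_idempotence allows at most 5 in-flight requests per connection")
    if "delivery_timeout_ms" in cfg:
        minimum = cfg.get("linger_ms", 0) + cfg.get("request_timeout_ms", 0)
        if cfg["delivery_timeout_ms"] < minimum:
            problems.append("delivery_timeout_ms must be >= linger_ms + request_timeout_ms")
    return problems


print(validate_producer_config({"enable_idempotence": True, "acks": 1}))
print(validate_producer_config({
    "enable_idempotence": True, "acks": "all", "retries": 2147483647,
    "max_in_flight_requests_per_connection": 5, "linger_ms": 5,
    "request_timeout_ms": 30000, "delivery_timeout_ms": 120000,
}))
```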