πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Kafka Performance Tuning

Apache KafkaPerformance⭐ Premium

Advertisement

Kafka Performance Tuning

Difficulty: Senior Level | Companies: LinkedIn, Uber, Netflix, Spotify, Confluent

Content

Performance tuning is critical for achieving high throughput and low latency in Kafka. Understanding the trade-offs between these two goals is essential.

Performance Dimensions

Architecture Diagram
Performance Dimensions:
β”œβ”€β”€ Throughput (messages/sec or bytes/sec)
β”œβ”€β”€ Latency (time from produce to consume)
β”œβ”€β”€ Durability (data safety guarantees)
└── Availability (uptime requirements)

Trade-offs:
β”œβ”€β”€ Higher throughput ↔ Higher latency
β”œβ”€β”€ Stronger durability ↔ Lower throughput
└── Higher availability ↔ More resources

Producer Tuning

Batching Configuration

# Producer batching settings
batch.size=16384              # 16KB default batch size
linger.ms=5                   # Wait 5ms to fill batch
buffer.memory=33554432        # 32MB buffer
max.in.flight.requests.per.connection=5

# Compression
compression.type=lz4          # or snappy, gzip, zstd

# Batch size calculation:
# Throughput = batch.size * (1000 / linger.ms) * num_partitions
# Example: 16384 * (1000/5) * 10 = 32,768,000 bytes/sec β‰ˆ 31 MB/s

Java Producer Configuration

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Throughput optimization
props.put("batch.size", 65536);           # 64KB batches
props.put("linger.ms", 10);               # Wait 10ms to fill batch
props.put("buffer.memory", 67108864);     # 64MB buffer
props.put("compression.type", "lz4");
props.put("max.in.flight.requests.per.connection", 5);

// Reliability (can trade for throughput)
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
props.put("enable.idempotence", true);

Producer<String, String> producer = new KafkaProducer<>(props);

// Benchmark producer
long startTime = System.currentTimeMillis();
int messageCount = 1000000;

for (int i = 0; i < messageCount; i++) {
    producer.send(new ProducerRecord<>("benchmark", "key-" + i, "value-" + i));
}

producer.flush();
long endTime = System.currentTimeMillis();

double throughput = (double) messageCount / ((endTime - startTime) / 1000.0);
System.out.printf("Throughput: %.2f messages/sec%n", throughput);

Python Producer Optimization

from kafka import KafkaProducer
import time

producer = KafkaProducer(
    bootstrap_servers=['kafka1:9092', 'kafka2:9092'],
    batch_size=65536,           # 64KB batches
    linger_ms=10,               # Wait 10ms
    buffer_memory=67108864,     # 64MB buffer
    compression_type='lz4',
    acks='all',
    retries=2147483647,
    max_in_flight_requests_per_connection=5
)

# Benchmark
start_time = time.time()
message_count = 1000000

for i in range(message_count):
    producer.send(
        topic='benchmark',
        key=f'key-{i}'.encode('utf-8'),
        value=f'value-{i}'.encode('utf-8')
    )

producer.flush()
end_time = time.time()

throughput = message_count / (end_time - start_time)
print(f"Throughput: {throughput:.2f} messages/sec")

Consumer Tuning

Fetch Configuration

# Consumer fetch settings
fetch.min.bytes=1                    # Wait for at least 1 byte
fetch.max.wait.ms=500                # Max wait time
max.partition.fetch.bytes=1048576    # 1MB per partition
fetch.max.bytes=52428800             # 50MB total fetch

# Session settings
session.timeout.ms=30000
heartbeat.interval.ms=10000
max.poll.interval.ms=300000
max.poll.records=500                 # Records per poll

Java Consumer Optimization

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("group.id", "performance-consumer");
props.put("enable.auto.commit", false);

// Fetch optimization
props.put("fetch.min.bytes", 1);
props.put("fetch.max.wait.ms", 500);
props.put("max.partition.fetch.bytes", 1048576);  // 1MB
props.put("max.poll.records", 500);

// Session optimization
props.put("session.timeout.ms", 30000);
props.put("heartbeat.interval.ms", 10000);

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("benchmark"));

// Benchmark consumer
long startTime = System.currentTimeMillis();
long totalMessages = 0;

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    if (records.count() == 0) continue;
    
    totalMessages += records.count();
    
    // Process records
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record);
    }
    
    // Commit offsets
    consumer.commitSync();
    
    // Check if benchmark complete
    if (totalMessages >= 1000000) break;
}

long endTime = System.currentTimeMillis();
double throughput = (double) totalMessages / ((endTime - startTime) / 1000.0);
System.out.printf("Consumer throughput: %.2f messages/sec%n", throughput);

Broker Tuning

Server Configuration

# Broker performance settings
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# Log configuration
log.retention.hours=168
log.segment.bytes=1073741824        # 1GB segments
log.retention.check.interval.ms=300000
log.cleaner.enable=true

# Replication
replica.fetch.wait.max.ms=500
replica.fetch.min.bytes=1
replica.fetch.max.bytes=10485760
num.replica.fetchers=4

JVM Tuning

# Kafka JVM settings
export KAFKA_HEAP_OPTS="-Xmx4g -Xms4g"
export KAFKA_JVM_PERFORMANCE_OPTS="
  -server
  -XX:+UseG1GC
  -XX:MaxGCPauseMillis=20
  -XX:InitiatingHeapOccupancyPercent=35
  -XX:+ExplicitGCInvokesConcurrent
  -Djava.awt.headless=true
"

Performance Benchmarks

Throughput Test

import time
from kafka import KafkaProducer, KafkaConsumer
from concurrent.futures import ThreadPoolExecutor

class KafkaBenchmark:
    def __init__(self, bootstrap_servers):
        self.bootstrap_servers = bootstrap_servers
    
    def producer_throughput(self, topic, message_count, batch_size=16384):
        producer = KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            batch_size=batch_size,
            linger_ms=5,
            compression_type='lz4'
        )
        
        start = time.time()
        
        for i in range(message_count):
            producer.send(topic, value=f'message-{i}'.encode('utf-8'))
        
        producer.flush()
        end = time.time()
        
        throughput = message_count / (end - start)
        print(f"Producer throughput: {throughput:.2f} msg/sec")
        return throughput
    
    def consumer_throughput(self, topic, consumer_group, duration=60):
        consumer = KafkaConsumer(
            topic,
            bootstrap_servers=self.bootstrap_servers,
            group_id=consumer_group,
            auto_offset_reset='earliest'
        )
        
        start = time.time()
        total_messages = 0
        
        while time.time() - start < duration:
            records = consumer.poll(timeout_ms=1000)
            total_messages += sum(len(records[tp]) for tp in records)
        
        consumer.close()
        
        throughput = total_messages / duration
        print(f"Consumer throughput: {throughput:.2f} msg/sec")
        return throughput

# Run benchmark
benchmark = KafkaBenchmark(['kafka1:9092'])
benchmark.producer_throughput('benchmark', 1000000)
benchmark.consumer_throughput('benchmark', 'benchmark-group')

Performance Monitoring

from prometheus_client import Gauge, Histogram, Counter

# Performance metrics
producer_throughput = Gauge(
    'kafka_producer_throughput',
    'Producer throughput in messages/sec'
)

consumer_throughput = Gauge(
    'kafka_consumer_throughput',
    'Consumer throughput in messages/sec'
)

message_latency = Histogram(
    'kafka_message_latency_seconds',
    'Message latency from produce to consume',
    buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0]
)

batch_size = Gauge(
    'kafka_batch_size',
    'Average batch size in bytes'
)

def measure_latency(producer, topic, key, value, consumer):
    """Measure end-to-end latency"""
    start = time.time()
    
    # Produce
    producer.send(topic, key=key, value=value)
    producer.flush()
    
    # Consume
    while True:
        records = consumer.poll(timeout_ms=100)
        for tp, messages in records.items():
            for message in messages:
                if message.key == key:
                    latency = time.time() - start
                    message_latency.observe(latency)
                    return latency

Performance Optimization Checklist

Architecture Diagram
Producer Optimization:
☐ Increase batch.size (16KB β†’ 64KB or higher)
☐ Set linger.ms (5-10ms for throughput)
☐ Enable compression (lz4 for speed, zstd for ratio)
☐ Increase buffer.memory (32MB β†’ 64MB)
☐ Use idempotent producers (enable.idempotence=true)

Consumer Optimization:
☐ Increase max.poll.records (100 β†’ 500)
☐ Set fetch.min.bytes (1 for low latency, higher for throughput)
☐ Optimize max.partition.fetch.bytes (1MB default)
☐ Use cooperative rebalancing (CooperativeStickyAssignor)

Broker Optimization:
☐ Increase num.network.threads (4-8)
☐ Increase num.io.threads (8-16)
☐ Optimize socket buffers (128KB+)
☐ Use G1GC with appropriate heap size
☐ Enable log compaction for appropriate topics

ℹ️

Key Insight: Always benchmark with realistic data and workloads. Theoretical maximums may not match real-world performance due to network, disk, and application overhead.

Follow-Up Questions

  1. What is the trade-off between batch.size and linger.ms?
  2. How does compression affect CPU usage and throughput?
  3. Explain how fetch.min.bytes impacts consumer latency.
  4. What JVM settings would you use for a Kafka broker with 64GB RAM?
  5. How would you diagnose and fix a sudden drop in producer throughput?

Advertisement