🎉 75% of content is free forever — Unlock Premium from $10/mo →
CW
Search courses…
💼 Servicesℹ️ About✉️ ContactView Pricing Plansfrom $10

Kafka Performance: Throughput vs Latency, Batch Sizes

Apache KafkaPerformance⭐ Premium

Advertisement

Kafka Performance: Throughput vs Latency, Batch Sizes

Difficulty: Senior | Asked at: Netflix, Uber, LinkedIn, Stripe

ℹ️Interview Context

Performance tuning is a critical skill for senior engineers. Interviewers expect you to understand the trade-offs between throughput and latency and to recommend configurations based on specific requirements.

The Question

How do you optimize Kafka for maximum throughput vs minimum latency? What are the key configuration parameters and their trade-offs?

Throughput vs Latency Trade-off

The Fundamental Trade-off

Throughput    Latency\text{Throughput} \uparrow \implies \text{Latency} \uparrow
Latency    Throughput\text{Latency} \downarrow \implies \text{Throughput} \downarrow
# High throughput configuration
high_throughput_config = {
    'batch.size': 131072,        # 128KB batches
    'linger.ms': 50,             # Wait 50ms to fill batch
    'compression.type': 'lz4',   # Compress for efficiency
    'buffer.memory': 268435456,  # 256MB buffer
    'acks': 'all',               # Wait for all ISR
    'max.in.flight.requests.per.connection': 5
}

# Low latency configuration
low_latency_config = {
    'batch.size': 16384,         # 16KB batches (small)
    'linger.ms': 0,              # Send immediately
    'compression.type': None,    # No compression
    'buffer.memory': 67108864,   # 64MB buffer
    'acks': '1',                 # Leader only
    'max.in.flight.requests.per.connection': 1
}

# Balanced configuration
balanced_config = {
    'batch.size': 32768,         # 32KB batches
    'linger.ms': 10,             # Wait 10ms
    'compression.type': 'snappy', # Balanced compression
    'buffer.memory': 134217728,  # 128MB buffer
    'acks': 'all',
    'max.in.flight.requests.per.connection': 3
}

Throughput Formula

Throughput=batch_size×partitions×batches_per_second1024×1024 MB/s\text{Throughput} = \frac{\text{batch\_size} \times \text{partitions} \times \text{batches\_per\_second}}{1024 \times 1024} \text{ MB/s}
def calculate_throughput(
    batch_size_bytes: int,
    num_partitions: int,
    linger_ms: int,
    compression_ratio: float = 1.0
) -> float:
    """
    Calculate theoretical throughput in MB/s.
    """
    # Batches per second per partition
    batches_per_sec = 1000 / max(linger_ms, 1)
    
    # Throughput per partition
    per_partition = (batch_size_bytes * batches_per_sec * compression_ratio)
    
    # Total throughput
    total_bytes_per_sec = per_partition * num_partitions
    
    return total_bytes_per_sec / (1024 * 1024)

# Example comparisons
print("High throughput:", calculate_throughput(131072, 12, 50, 0.3), "MB/s")
print("Balanced:", calculate_throughput(32768, 12, 10, 0.5), "MB/s")
print("Low latency:", calculate_throughput(16384, 12, 0, 1.0), "MB/s")

ℹ️Key Insight

The relationship between batch size and latency is linear. Doubling batch size roughly doubles latency but also roughly doubles throughput. The sweet spot depends on your specific requirements.

Producer Tuning

Batch Size Optimization

class BatchSizeOptimizer:
    """
    Find optimal batch size for given requirements.
    """
    
    def __init__(self, target_latency_ms, target_throughput_mbps):
        self.target_latency = target_latency_ms
        self.target_throughput = target_throughput_mbps
    
    def find_optimal_batch_size(
        self,
        num_partitions: int,
        avg_message_size: int,
        compression_ratio: float
    ) -> dict:
        """
        Find batch size that meets latency and throughput targets.
        """
        # Calculate maximum batch size for latency target
        # Assume linger_ms = batch_size / (message_rate * avg_message_size)
        max_batch_size_for_latency = (
            self.target_latency * 
            1000 *  # Convert to ms
            avg_message_size * 
            compression_ratio
        )
        
        # Calculate minimum batch size for throughput target
        min_batch_size_for_throughput = (
            self.target_throughput * 1024 * 1024 /
            (num_partitions * (1000 / self.target_latency))
        )
        
        # Optimal batch size
        if min_batch_size_for_throughput <= max_batch_size_for_latency:
            optimal_batch_size = int(
                (min_batch_size_for_throughput + max_batch_size_for_latency) / 2
            )
            feasible = True
        else:
            optimal_batch_size = int(max_batch_size_for_latency)
            feasible = False
        
        return {
            'optimal_batch_size': optimal_batch_size,
            'max_for_latency': int(max_batch_size_for_latency),
            'min_for_throughput': int(min_batch_size_for_throughput),
            'feasible': feasible,
            'recommended_linger_ms': int(
                optimal_batch_size / (avg_message_size * 100)
            )
        }

# Example
optimizer = BatchSizeOptimizer(
    target_latency_ms=10,
    target_throughput_mbps=100
)

result = optimizer.find_optimal_batch_size(
    num_partitions=12,
    avg_message_size=500,
    compression_ratio=0.5
)
print(f"Optimal batch size: {result['optimal_batch_size']} bytes")
print(f"Feasible: {result['feasible']}")

Linger MS Optimization

def optimize_linger_ms(
    batch_size_bytes: int,
    message_rate_per_sec: int,
    avg_message_size: int,
    target_latency_ms: int
) -> int:
    """
    Optimize linger_ms for batch size and latency target.
    """
    # Messages per batch
    messages_per_batch = batch_size_bytes / avg_message_size
    
    # Time to fill batch
    time_to_fill_ms = (messages_per_batch / message_rate_per_sec) * 1000
    
    # Use smaller of time-to-fill and target latency
    optimal_linger = min(time_to_fill_ms, target_latency_ms)
    
    return max(0, int(optimal_linger))

# Example
linger = optimize_linger_ms(
    batch_size_bytes=32768,
    message_rate_per_sec=10000,
    avg_message_size=500,
    target_latency_ms=10
)
print(f"Optimal linger_ms: {linger}")

Compression Optimization

def select_compression(
    target_throughput_mbps: float,
    cpu_budget_percent: float,
    avg_message_size: int
) -> dict:
    """
    Select compression algorithm based on requirements.
    """
    algorithms = {
        'none': {
            'ratio': 1.0,
            'cpu_cost': 0,
            'latency_addition_ms': 0
        },
        'snappy': {
            'ratio': 0.6,
            'cpu_cost': 0.05,
            'latency_addition_ms': 0.1
        },
        'lz4': {
            'ratio': 0.5,
            'cpu_cost': 0.03,
            'latency_addition_ms': 0.05
        },
        'zstd': {
            'ratio': 0.3,
            'cpu_cost': 0.15,
            'latency_addition_ms': 0.5
        },
        'gzip': {
            'ratio': 0.25,
            'cpu_cost': 0.3,
            'latency_addition_ms': 1.0
        }
    }
    
    recommendations = {}
    for name, algo in algorithms.items():
        effective_throughput = target_throughput_mbps * algo['ratio']
        cpu_impact = algo['cpu_cost'] * 100
        
        if cpu_impact <= cpu_budget_percent:
            recommendations[name] = {
                'effective_throughput': effective_throughput,
                'cpu_impact_percent': cpu_impact,
                'latency_impact_ms': algo['latency_addition_ms']
            }
    
    # Select best option
    best = max(recommendations.items(), key=lambda x: x[1]['effective_throughput'])
    
    return {
        'selected': best[0],
        'details': best[1],
        'all_options': recommendations
    }

⚠️Compression Trade-off

Compression reduces network and disk usage but adds CPU overhead. For CPU-bound systems, consider no compression or lz4 (lowest CPU cost). For network-bound systems, use zstd (best ratio).

Consumer Tuning

Consumer Performance Optimization

# Consumer performance parameters
consumer_config = {
    # Poll configuration
    'max.poll.records': 500,           # Records per poll
    'max.poll.interval.ms': 300000,    # Max processing time
    
    # Fetch configuration
    'fetch.min.bytes': 1,              # Minimum fetch size
    'fetch.max.wait.ms': 500,          # Max wait for fetch
    'max.partition.fetch.bytes': 1048576,  # 1MB per partition
    
    # Buffer configuration
    'receive.buffer.bytes': 131072,    # 128KB socket buffer
    'fetch.backoff.ms': 1000           # Backoff on error
}

def optimize_consumer_for_throughput(
    num_partitions: int,
    processing_time_per_record_ms: float,
    target_throughput_per_sec: int
) -> dict:
    """
    Optimize consumer configuration for throughput.
    """
    # Calculate max.poll.records
    # records_per_sec = max.poll.records / (processing_time_per_record_ms * max.poll.records / 1000)
    # Simplified: records_per_sec = 1000 / processing_time_per_record_ms
    
    records_per_sec_per_consumer = 1000 / processing_time_per_record_ms
    
    # Consumers needed
    consumers_needed = max(1, int(target_throughput_per_sec / records_per_sec_per_consumer))
    
    # Partitions per consumer
    partitions_per_consumer = num_partitions / consumers_needed
    
    # Optimal max.poll.records
    optimal_poll_records = int(target_throughput_per_sec * processing_time_per_record_ms / 1000)
    
    return {
        'consumers_needed': consumers_needed,
        'partitions_per_consumer': partitions_per_consumer,
        'optimal_max_poll_records': optimal_poll_records,
        'theoretical_throughput': records_per_sec_per_consumer * consumers_needed
    }

# Example
result = optimize_consumer_for_throughput(
    num_partitions=12,
    processing_time_per_record_ms=1.0,
    target_throughput_per_sec=50000
)
print(f"Consumers needed: {result['consumers_needed']}")
print(f"Optimal max.poll.records: {result['optimal_max_poll_records']}")

Consumer Lag Formula

Lag=Latest OffsetCommitted Offset\text{Lag} = \text{Latest Offset} - \text{Committed Offset}
Lag Time=Lag MessagesConsumption Rate\text{Lag Time} = \frac{\text{Lag Messages}}{\text{Consumption Rate}}
def estimate_consumer_lag_time(
    lag_messages: int,
    consumption_rate_per_sec: int,
    num_consumers: int
) -> dict:
    """
    Estimate time to consume lag.
    """
    effective_rate = consumption_rate_per_sec * num_consumers
    
    if effective_rate == 0:
        lag_time_seconds = float('inf')
    else:
        lag_time_seconds = lag_messages / effective_rate
    
    return {
        'lag_messages': lag_messages,
        'effective_rate_per_sec': effective_rate,
        'lag_time_seconds': lag_time_seconds,
        'lag_time_minutes': lag_time_seconds / 60
    }

# Example
lag_time = estimate_consumer_lag_time(
    lag_messages=100000,
    consumption_rate_per_sec=1000,
    num_consumers=6
)
print(f"Lag time: {lag_time['lag_time_minutes']:.1f} minutes")

ℹ️Consumer Tuning

Key consumer metrics to monitor:

  • records-consumed-rate: Messages consumed per second
  • records-lag-max: Maximum lag across partitions
  • poll-rate: Poll calls per second
  • fetch-latency-avg: Average fetch latency

System-Level Tuning

OS-Level Optimization

# Linux kernel tuning for Kafka

# Increase file descriptor limit
ulimit -n 100000

# Increase socket buffer sizes
sysctl -w net.core.rmem_max=16777216
sysctl -w net.core.wmem_max=16777216
sysctl -w net.core.rmem_default=16777216
sysctl -w net.core.wmem_default=16777216

# Enable TCP window scaling
sysctl -w net.ipv4.tcp_window_scaling=1

# Increase connection tracking
sysctl -w net.netfilter.nf_conntrack_max=1048576

# Disable swap
sysctl -w vm.swappiness=0

# Use deadline or noop I/O scheduler for SSD
echo deadline > /sys/block/sda/queue/scheduler

JVM Tuning

# Kafka broker JVM options
KAFKA_HEAP_OPTS="-Xmx6g -Xms6g"
KAFKA_JVM_PERFORMANCE_OPTS="
  -server
  -XX:+UseG1GC
  -XX:MaxGCPauseMillis=20
  -XX:InitiatingHeapOccupancyPercent=35
  -XX:+ExplicitGCInvokesConcurrent
  -XX:MaxInlineLevel=15
  -Djava.awt.headless=true
  -Dcom.sun.management.jmxremote=true
  -Dcom.sun.management.jmxremote.authenticate=false
  -Dcom.sun.management.jmxremote.ssl=false
"

# Key JVM parameters:
# -Xmx/-Xms: Heap size (set equal to avoid GC)
# -XX:+UseG1GC: Use G1 garbage collector
# -XX:MaxGCPauseMillis: Target GC pause time
# -XX:InitiatingHeapOccupancyPercent: When to start concurrent GC

Disk Configuration

disk_recommendations = {
    'storage_type': {
        'SSD': {
            'throughput': '500-3000 MB/s',
            'latency': '0.1ms',
            'best_for': 'Low latency, high IOPS'
        },
        'HDD': {
            'throughput': '100-200 MB/s',
            'latency': '5-10ms',
            'best_for': 'Large storage, sequential writes'
        },
        'NVMe': {
            'throughput': '3000-7000 MB/s',
            'latency': '0.02ms',
            'best_for': 'Maximum performance'
        }
    },
    'raid_config': {
        'RAID 0': 'Maximum throughput, no redundancy',
        'RAID 1': 'Mirror, good for small logs',
        'RAID 5': 'Balanced, not recommended for Kafka',
        'RAID 10': 'Best for Kafka (throughput + redundancy)'
    },
    'file_system': {
        'ext4': 'Good default',
        'XFS': 'Better for large files',
        'ZFS': 'Best for data integrity'
    }
}

def estimate_disk_requirements(
    messages_per_day: int,
    avg_message_size: int,
    retention_days: int,
    replication_factor: int
) -> dict:
    """
    Calculate disk requirements for Kafka.
    """
    # Daily data
    daily_data_bytes = messages_per_day * avg_message_size
    
    # Total with replication
    total_data_bytes = daily_data_bytes * retention_days * replication_factor
    
    # Convert to GB
    total_data_gb = total_data_bytes / (1024**3)
    
    # Add 20% overhead
    total_with_overhead = total_data_gb * 1.2
    
    return {
        'daily_data_gb': daily_data_bytes / (1024**3),
        'total_data_gb': total_data_gb,
        'total_with_overhead_gb': total_with_overhead,
        'recommended_disk_gb': int(total_with_overhead * 1.5)
    }

# Example
disk = estimate_disk_requirements(
    messages_per_day=100000000,
    avg_message_size=500,
    retention_days=7,
    replication_factor=3
)
print(f"Recommended disk: {disk['recommended_disk_gb']} GB")

⚠️Disk Warning

Kafka is I/O intensive. Use dedicated disks for log storage. Don't share disks with other applications. Monitor disk I/O utilization and alert when > 70%.

Performance Testing

Benchmark Framework

class KafkaBenchmark:
    """
    Benchmark Kafka producer and consumer performance.
    """
    
    def __init__(self, bootstrap_servers):
        self.bootstrap_servers = bootstrap_servers
    
    def benchmark_producer(
        self,
        topic: str,
        num_messages: int,
        message_size: int,
        batch_size: int,
        linger_ms: int,
        compression: str
    ) -> dict:
        """
        Benchmark producer performance.
        """
        from kafka import KafkaProducer
        import time
        
        producer = KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            batch_size=batch_size,
            linger_ms=linger_ms,
            compression_type=compression,
            acks='all'
        )
        
        # Generate test message
        message = b'x' * message_size
        
        # Warm up
        for _ in range(1000):
            producer.send(topic, value=message)
        producer.flush()
        
        # Benchmark
        start_time = time.perf_counter()
        
        for i in range(num_messages):
            producer.send(topic, value=message)
        
        producer.flush()
        end_time = time.perf_counter()
        
        elapsed = end_time - start_time
        
        return {
            'messages_per_sec': num_messages / elapsed,
            'mb_per_sec': (num_messages * message_size) / (elapsed * 1024 * 1024),
            'elapsed_seconds': elapsed,
            'total_messages': num_messages
        }
    
    def benchmark_consumer(
        self,
        topic: str,
        num_messages: int,
        max_poll_records: int
    ) -> dict:
        """
        Benchmark consumer performance.
        """
        from kafka import KafkaConsumer
        import time
        
        consumer = KafkaConsumer(
            topic,
            bootstrap_servers=self.bootstrap_servers,
            max_poll_records=max_poll_records,
            auto_offset_reset='earliest'
        )
        
        # Wait for topic to have messages
        time.sleep(2)
        
        # Benchmark
        start_time = time.perf_counter()
        messages_consumed = 0
        
        while messages_consumed < num_messages:
            records = consumer.poll(timeout_ms=1000)
            for tp, records in records.items():
                messages_consumed += len(records)
        
        end_time = time.perf_counter()
        elapsed = end_time - start_time
        
        consumer.close()
        
        return {
            'messages_per_sec': messages_consumed / elapsed,
            'elapsed_seconds': elapsed,
            'total_messages': messages_consumed
        }

# Example usage
benchmark = KafkaBenchmark('localhost:9092')

# Test different configurations
configs = [
    {'batch_size': 16384, 'linger_ms': 0, 'compression': None},
    {'batch_size': 32768, 'linger_ms': 10, 'compression': 'snappy'},
    {'batch_size': 65536, 'linger_ms': 20, 'compression': 'lz4'},
    {'batch_size': 131072, 'linger_ms': 50, 'compression': 'lz4'},
]

for config in configs:
    result = benchmark.benchmark_producer(
        topic='benchmark-topic',
        num_messages=100000,
        message_size=500,
        **config
    )
    print(f"Config: {config}")
    print(f"  Throughput: {result['mb_per_sec']:.1f} MB/s")
    print(f"  Message rate: {result['messages_per_sec']:.0f} msgs/sec")

ℹ️Benchmarking Tip

Always benchmark with YOUR specific workload. Generic benchmarks may not reflect your real-world performance. Test with realistic message sizes, patterns, and configurations.

Performance Monitoring

# Key performance metrics
performance_metrics = {
    'Producer': {
        'record-send-rate': 'Messages sent per second',
        'throughput': 'Bytes sent per second',
        'batch-size-avg': 'Average batch size',
        'request-latency-avg': 'Average request latency'
    },
    'Consumer': {
        'records-consumed-rate': 'Messages consumed per second',
        'bytes-consumed-rate': 'Bytes consumed per second',
        'records-lag-max': 'Maximum lag across partitions',
        'fetch-latency-avg': 'Average fetch latency'
    },
    'Broker': {
        'request-rate': 'Request rate per second',
        'request-latency-avg': 'Average request latency',
        'bytes-in-rate': 'Bytes received per second',
        'bytes-out-rate': 'Bytes sent per second'
    }
}

⚠️Key Insight

Performance tuning is iterative. Start with baseline measurements, make one change at a time, measure the impact, and keep improving. There's no one-size-fits-all configuration.

Advertisement