Apache Kafka Architecture: A Deep Dive into Distributed Event Streaming
Architecture Diagram: Partition Distribution & Replication
Architecture Diagram: Consumer Group Coordination
Architecture Diagram: Log Segment Internals
Formal Definitions
Definition: Partition
A partition is an ordered, immutable sequence of records that is continually appended to. Each record in a partition is assigned a sequential identifier called an offset. Partitions are the unit of parallelism in Kafka: the number of partitions determines the maximum consumer parallelism within a consumer group. Formally, a topic T is decomposed as T = {P_0, P_1, ..., P_{n-1}}, where n is the partition count and each P_i is a totally ordered log.
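To make the definition concrete, here is a minimal in-memory sketch of a single partition as an append-only list. It is a toy model, not Kafka's implementation: the class name `PartitionLog` is hypothetical, records live in memory, and retention is ignored, so offsets always start at 0.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class PartitionLog:
    """Toy model of one partition: an append-only list of (key, value) records.

    A record's offset is its position in the list, so offsets are assigned
    sequentially and never change once written.
    """
    records: List[Tuple[str, str]] = field(default_factory=list)

    def append(self, key: str, value: str) -> int:
        """Append a record and return the offset it was assigned."""
        self.records.append((key, value))
        return len(self.records) - 1

    def read(self, offset: int, max_records: int = 10) -> List[Tuple[int, str, str]]:
        """Return up to max_records (offset, key, value) tuples starting at offset, in order."""
        end = min(offset + max_records, len(self.records))
        return [(o, *self.records[o]) for o in range(offset, end)]

    @property
    def log_end_offset(self) -> int:
        """Offset the next appended record will receive."""
        return len(self.records)


# A topic T with n partitions is simply n independent logs.
topic = [PartitionLog() for _ in range(3)]
```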
Definition: Replica
A replica is a fault-tolerant copy of a partition stored on a different broker. Each partition with replication factor R maintains R replicas, each on a distinct broker. One replica is elected as the leader and, by default, handles all reads and writes, while the remaining R-1 are followers that replicate from the leader. Replicas provide durability: if the leader's broker fails, an in-sync follower can be promoted to leader without losing any acknowledged write, provided producers use acks=all and unclean leader election is disabled.
Definition: In-Sync Replicas (ISR)
The In-Sync Replica set (ISR) is the subset of replicas that are fully caught up with the leader. A follower stays in the ISR as long as it has caught up to the leader's log end offset within the last replica.lag.time.max.ms. The ISR is dynamic: replicas that fall behind are removed, and replicas that catch up are re-added. With acks=all, a write is acknowledged to the producer only after every current ISR member has replicated it, and only if the ISR has at least min.insync.replicas members; otherwise the write is rejected.
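A simplified sketch of these rules, assuming each follower's "last caught up" timestamp is already known (in the broker this bookkeeping happens on every follower fetch). The function names are hypothetical; the 30-second default mirrors replica.lag.time.max.ms in recent Kafka versions. The high watermark, the offset up to which every ISR member has the data, is what determines when an acks=all write can be acknowledged.

```python
from typing import Dict, Set


def compute_isr(leader: int, last_caught_up_ms: Dict[int, int], now_ms: int,
                replica_lag_time_max_ms: int = 30_000) -> Set[int]:
    """Return the ids of replicas considered in sync.

    A follower stays in the ISR if it was last fully caught up with the
    leader no more than replica_lag_time_max_ms ago. The leader is always
    a member of its own ISR.
    """
    isr = {leader}
    for replica, ts in last_caught_up_ms.items():
        if now_ms - ts <= replica_lag_time_max_ms:
            isr.add(replica)
    return isr


def high_watermark(log_end_offsets: Dict[int, int], isr: Set[int]) -> int:
    """Exclusive upper bound of offsets replicated to every ISR member."""
    return min(log_end_offsets[r] for r in isr)


def acks_all_complete(record_offset: int, hw: int) -> bool:
    """An acks=all write at record_offset can be acknowledged once the HW passes it."""
    return hw > record_offset
```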
Definition: Consumer Group
A consumer group is a set of consumers that jointly consume messages from one or more topics. Each partition is consumed by exactly one consumer within the group, enabling load-balanced parallel consumption. The Group Coordinator broker manages membership and triggers rebalancing when consumers join or leave. Formally, for a topic with partitions P = {P_0, ..., P_{n-1}} and a group with consumers C = {C_0, ..., C_{m-1}}, the assignment function f: P → C maps each partition to exactly one consumer; every consumer receives at least one partition only when m ≤ n.
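One concrete choice of f is the range strategy. The sketch below is modeled on Kafka's RangeAssignor for a single topic (sort members, give each a contiguous block, hand the remainder to the first members); the function name is hypothetical and it assumes at least one consumer.

```python
from typing import Dict, List


def range_assign(partitions: List[int], consumers: List[str]) -> Dict[str, List[int]]:
    """Map each partition to exactly one consumer using contiguous ranges.

    With n partitions and m consumers, every consumer gets n // m partitions
    and the first n % m consumers (in sorted order) get one extra. When
    m > n, the surplus consumers receive an empty list, i.e. they sit idle.
    """
    members = sorted(consumers)
    per_consumer, extra = divmod(len(partitions), len(members))
    assignment: Dict[str, List[int]] = {}
    start = 0
    for i, member in enumerate(members):
        count = per_consumer + (1 if i < extra else 0)
        assignment[member] = partitions[start:start + count]
        start += count
    return assignment
```

For example, 7 partitions over consumers `c0`, `c1`, `c2` yields blocks of 3, 2 and 2, which is why range assignment can skew slightly toward the first members.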
Definition: Broker
A broker is a single Kafka server node that stores data and serves client requests. Each broker has a unique broker.id and listens on a configured port (default: 9092). Brokers in a cluster communicate for replication (follower pulls from leader) and metadata coordination (via ZooKeeper or KRaft controller). A single broker can handle hundreds of thousands of reads/writes per second depending on hardware and configuration.
Definition: Log Segment
A log segment is the physical file on disk representing a contiguous portion of a partition's log. Each segment consists of three main files: .log (message data), .index (offset-to-position mapping), and .timeindex (timestamp-to-offset mapping). Only the active segment accepts new writes. Segments are rolled when segment.bytes (default 1GB) is reached or segment.ms expires. Older segments are eligible for deletion based on retention policy.
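The lookup path implied by these files can be sketched with binary search: pick the segment whose base offset is the largest one not exceeding the target, then use the sparse .index to find a byte position from which to scan forward. The helper names and the in-memory (relative_offset, position) index representation are assumptions for illustration; the 20-digit zero-padded file naming follows Kafka's convention.

```python
import bisect
from typing import List, Tuple


def segment_file_name(base_offset: int, suffix: str = ".log") -> str:
    """Segment files are named after their base offset, zero-padded to 20 digits."""
    return f"{base_offset:020d}{suffix}"


def find_segment(base_offsets: List[int], target: int) -> int:
    """Return the base offset of the segment containing `target`.

    base_offsets must be sorted ascending; the containing segment is the
    one with the largest base offset <= target.
    """
    i = bisect.bisect_right(base_offsets, target) - 1
    if i < 0:
        raise ValueError(f"offset {target} is below the log start offset")
    return base_offsets[i]


def index_lookup(index: List[Tuple[int, int]], relative_target: int) -> int:
    """Sparse .index lookup over (relative_offset, file_position) pairs.

    Returns the byte position of the closest indexed entry at or before
    the target; the reader then scans the .log file forward from there.
    """
    offsets = [rel for rel, _ in index]
    i = bisect.bisect_right(offsets, relative_target) - 1
    return 0 if i < 0 else index[i][1]
```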
Detailed Explanation
Apache Kafka is a distributed event streaming platform capable of handling trillions of events per day with low latency. At its core, Kafka operates as a distributed commit log, where data is persisted in an append-only fashion across a cluster of brokers. The fundamental architectural principle is the decoupling of producers and consumers through a publish-subscribe model that enables horizontal scalability while maintaining strict ordering guarantees within partitions.
Broker Layer Deep Dive
The Broker Layer consists of a cluster of servers (brokers) that each store partition data and serve client requests. Each broker has a unique identifier (broker.id, or node.id in KRaft mode) and listens on a configured port (default: 9092). Brokers communicate with each other for replication and with ZooKeeper (or the KRaft controller quorum) for cluster metadata management. When a producer sends a message, the partition leader responds according to the producer's acks setting: with acks=0 no response is sent, with acks=1 the leader responds after appending to its own log, and with acks=all it responds once all in-sync replicas have the record. The broker stores the message in its local commit log, a sequence of files on disk optimized for sequential I/O. Each partition is a separate directory containing multiple log segment files, with only the active segment accepting writes.
The broker's network layer uses Java NIO with a pool of network threads (num.network.threads, default 3) that read requests from and write responses to sockets, and a separate pool of request-handler threads (num.io.threads, default 8) that process requests, including disk I/O. This two-tier threading model lets Kafka handle many concurrent connections while keeping disk utilization efficient. Offset and time index files are memory-mapped, so recently accessed index entries stay in memory and speed up offset lookups.
Topic Partitioning and Distribution
Topics serve as logical categories or feed names for messages. Each topic is divided into one or more partitions, which are the unit of parallelism in Kafka. The number of partitions determines the maximum number of consumer instances that can read from the topic in parallel within a consumer group. When a topic is created, its partition replicas are spread across brokers round-robin, starting from a randomly chosen broker, so that leadership and load are distributed evenly; Kafka does not use consistent hashing for placement. Each partition maintains its own commit log, and messages within a partition are assigned monotonically increasing offsets. This design lets Kafka achieve very high throughput by parallelizing both writes (across partitions) and reads (via consumer groups).
When brokers are configured with broker.rack, replica placement becomes rack-aware, spreading each partition's replicas across racks or availability zones for fault tolerance. The controller maintains the partition assignment map and propagates changes via ZooKeeper or the KRaft metadata log. When a new broker joins the cluster, existing partitions are not moved automatically: only newly created partitions can land on it until an operator reassigns partitions (for example with kafka-reassign-partitions.sh or Cruise Control).
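The round-robin placement described above can be sketched as follows, modeled on Kafka's rack-unaware assignment at topic creation. In the broker, `start_index` and `shift` are chosen at random; here they are parameters defaulting to 0 so the output is deterministic. Function and parameter names are hypothetical, and rack awareness is left out.

```python
from typing import List


def assign_replicas(brokers: List[int], num_partitions: int, replication_factor: int,
                    start_index: int = 0, shift: int = 0) -> List[List[int]]:
    """Return the replica list for each partition; the first entry is the preferred leader."""
    n = len(brokers)
    if replication_factor > n:
        raise ValueError("replication factor cannot exceed broker count")
    assignment = []
    for p in range(num_partitions):
        # After each full pass over the brokers, shift follower placement so
        # replica sets do not repeat the same pattern
        if p > 0 and p % n == 0:
            shift += 1
        first = (p + start_index) % n
        replicas = [brokers[first]]
        for j in range(replication_factor - 1):
            # Offset in 1..n-1, so a follower never lands on the leader's broker
            offset = 1 + (shift + j) % (n - 1)
            replicas.append(brokers[(first + offset) % n])
        assignment.append(replicas)
    return assignment
```

Leaders rotate across brokers partition by partition, which is what spreads leadership load evenly.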
Replication and Fault Tolerance
Replication provides fault tolerance and high availability. Each partition has a configurable replication factor (typically 3 in production), meaning data is replicated across multiple brokers. One replica is elected as the partition leader and handles all reads and writes for that partition. Follower replicas continuously pull data from the leader to stay synchronized. The set of in-sync replicas (ISR) includes all replicas that are fully caught up with the leader. If a follower has not caught up with the leader within replica.lag.time.max.ms, it is removed from the ISR. When the leader fails, a new leader is elected from the ISR, so no write acknowledged with acks=all is lost. The ISR mechanism balances durability with availability: the leader keeps accepting acks=all writes as long as the ISR has at least min.insync.replicas members.
The replication protocol uses a pull model in which followers fetch from leaders via the Fetch API, the same API consumers use. The leader tracks each follower's fetch position and adjusts the ISR accordingly. Replication latency depends on network conditions between brokers and on fetch settings such as replica.fetch.wait.max.ms (how long the leader may hold a fetch request when no new data is available) and replica.fetch.max.bytes (the maximum bytes fetched per partition per request).
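The election rule can be sketched as follows, assuming the controller already knows each partition's assigned replica list, its ISR, and the set of live brokers. A clean election picks the first live ISR member in assignment order; the unclean fallback, governed by unclean.leader.election.enable, is what can lose data. The function name is hypothetical.

```python
from typing import List, Optional, Set


def elect_leader(assigned_replicas: List[int], isr: Set[int], live_brokers: Set[int],
                 unclean_leader_election_enable: bool = False) -> Optional[int]:
    """Pick a new partition leader after the previous one fails.

    Returns None when no eligible replica exists, i.e. the partition
    stays offline until an ISR member comes back.
    """
    # Clean election: first replica (in assignment order) that is alive and in sync
    for r in assigned_replicas:
        if r in live_brokers and r in isr:
            return r
    # Unclean election: any live replica, even one missing acknowledged records
    if unclean_leader_election_enable:
        for r in assigned_replicas:
            if r in live_brokers:
                return r
    return None
```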
Consumer Group Coordination
Consumer Groups enable load-balanced consumption across multiple consumers. Each consumer in a group is assigned a subset of partitions, ensuring that each partition is consumed by exactly one consumer within the group. The Group Coordinator (a broker) manages group membership and triggers rebalancing when consumers join or leave. During a rebalance, partitions are distributed by a pluggable assignment strategy such as RangeAssignor, RoundRobinAssignor, or CooperativeStickyAssignor. Consumer offsets are stored in the internal __consumer_offsets topic, allowing consumers to resume from their last committed position. This design enables horizontal scaling of consumption while maintaining ordering guarantees within partitions.
The heartbeat protocol (heartbeat.interval.ms, default 3s) ensures liveness detection. If a consumer fails to send a heartbeat within session.timeout.ms (default 45s), it is considered dead and its partitions are reassigned. The consumer poll loop must complete within max.poll.interval.ms (default 300s) to avoid being evicted from the group.
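The two liveness checks can be sketched as a pure function over timestamps. This is a simplification with a hypothetical name: in the Java client a background thread sends heartbeats, the coordinator expires members that miss session.timeout.ms, and the client itself leaves the group when poll() is not called within max.poll.interval.ms.

```python
def member_status(now_ms: int, last_heartbeat_ms: int, last_poll_ms: int,
                  session_timeout_ms: int = 45_000,
                  max_poll_interval_ms: int = 300_000) -> str:
    """Classify a group member using the documented default timeouts."""
    # Coordinator side: no heartbeat for too long means the process is presumed dead
    if now_ms - last_heartbeat_ms > session_timeout_ms:
        return "session_expired"
    # Client side: heartbeats alone are not enough; the poll loop must also progress
    if now_ms - last_poll_ms > max_poll_interval_ms:
        return "poll_timeout"
    return "alive"
```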
Storage Layer Internals
The Storage Layer uses a highly optimized append-only log structure. Each log segment consists of three main files: .log (message data), .index (offset index), and .timeindex (timestamp index). The index files enable efficient random access to specific offsets without scanning the entire log. Kafka relies heavily on the operating system's page cache rather than an in-process cache for read performance. Log compaction allows Kafka to retain only the latest value for each key, enabling topics to serve as a materialized view of the latest state. Cleanup policies can be time-based or size-based deletion, compaction, or both, providing flexibility in data lifecycle management.
Kafka's storage design enables zero-copy transfer from disk to network: when serving fetches, brokers call FileChannel.transferTo(), which uses sendfile() on Linux to move data from the page cache to the socket without copying it through user space. Zero-copy does not apply when TLS is enabled, because data must be encrypted in user space. Producers write messages in batches that are appended sequentially to the active segment, and the OS page cache handles read-ahead for sequential access patterns. The .index and .timeindex files are memory-mapped for efficient random access.
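Compaction semantics can be illustrated with a small sketch that keeps only the newest record per key while preserving original offsets, so gaps appear in the offset sequence. It assumes the whole log fits in memory and treats a `None` value as a tombstone; real Kafka never compacts the active segment and keeps tombstones for delete.retention.ms before removing them. The function name is hypothetical.

```python
from typing import Dict, List, Optional, Tuple

# (offset, key, value); a value of None represents a tombstone (delete marker)
Record = Tuple[int, str, Optional[str]]


def compact(log: List[Record], drop_tombstones: bool = True) -> List[Record]:
    """Keep only the latest record per key, in original offset order."""
    latest: Dict[str, int] = {}
    for offset, key, _ in log:
        latest[key] = offset  # later offsets overwrite earlier ones
    kept = [rec for rec in log if latest[rec[1]] == rec[0]]
    if drop_tombstones:
        kept = [rec for rec in kept if rec[2] is not None]
    return kept
```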
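Python exposes the same primitive: `socket.socket.sendfile()` uses `os.sendfile()` where the platform supports it and falls back to an ordinary read/send loop otherwise. The sketch below streams a file (standing in for a segment) to a socket; the function name is hypothetical and it only illustrates the system call, not Kafka's actual fetch path.

```python
import socket


def serve_segment_bytes(path: str, sock: socket.socket) -> int:
    """Send a whole file to a connected stream socket and return the bytes sent.

    When os.sendfile() is available, the kernel copies data from the page
    cache to the socket without passing it through this process's memory.
    """
    with open(path, "rb") as f:
        return sock.sendfile(f)
```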
Key Formulas
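$$T_{\text{total}} = N_{\text{partitions}} \times T_{\text{partition}}$$
Here,
- $T_{\text{total}}$ = Total cluster throughput (messages/sec)
- $N_{\text{partitions}}$ = Number of partitions across the topic
- $T_{\text{partition}}$ = Throughput per partition (messages/sec)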
Replication Lag Model
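$$L(t) = W_{\text{leader}}(t) - F_{\text{follower}}(t)$$
Here,
- $L(t)$ = Replication lag at time t (bytes)
- $W_{\text{leader}}(t)$ = Data written to leader by time t
- $F_{\text{follower}}(t)$ = Data fetched by follower by time t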
Consumer Lag
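$$\text{Lag} = O_{\text{latest}} - O_{\text{committed}}$$
Here,
- $O_{\text{latest}}$ = Latest offset in the partition log (log end offset)
- $O_{\text{committed}}$ = Last committed consumer offset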
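A quick sketch of per-partition consumer lag with made-up offsets. It assumes Kafka's convention that the committed offset is the next offset to read, so a caught-up partition shows zero lag, and it treats a partition without a commit as lagging by its whole log (assuming the log starts at offset 0). The function name is hypothetical.

```python
from typing import Dict


def consumer_lag(log_end_offsets: Dict[int, int], committed: Dict[int, int]) -> Dict[int, int]:
    """Per-partition lag = latest offset - committed offset."""
    return {p: leo - committed.get(p, 0) for p, leo in log_end_offsets.items()}


lag = consumer_lag({0: 1500, 1: 900, 2: 400}, {0: 1500, 1: 850})
total_lag = sum(lag.values())  # group-level lag is usually reported as this sum
```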
Partition Count Planning
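$$N_{\text{partitions}} \geq \frac{T_{\text{target}}}{T_{\text{broker}}} \times R$$
Here,
- $N_{\text{partitions}}$ = Required number of partitions
- $T_{\text{target}}$ = Target cluster throughput (msg/sec)
- $T_{\text{broker}}$ = Throughput per broker (msg/sec)
- $R$ = Replication factor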
Partition count directly determines maximum parallelism. For a topic with N partitions, at most N consumers in a group can process in parallel; any additional consumers remain idle. Plan partition count based on target throughput: N_partitions ≥ (T_target / T_broker) × R.
For topics with high write throughput, increase the partition count to distribute load across more brokers. Each partition adds overhead (index files, open file descriptors, and controller metadata), so avoid over-partitioning; commonly cited limits are roughly 200K partitions per cluster in ZooKeeper mode and around 2M in KRaft mode.
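A worked example of the planning formulas, with hypothetical throughput figures chosen only to show the arithmetic; measure your own hardware before relying on any numbers. The function names are illustrative.

```python
import math


def plan_partitions(target_msgs_per_sec: int, broker_msgs_per_sec: int,
                    replication_factor: int) -> int:
    """N_partitions >= (T_target / T_broker) * R, rounded up to a whole partition."""
    return math.ceil(target_msgs_per_sec * replication_factor / broker_msgs_per_sec)


def total_throughput(num_partitions: int, per_partition_msgs_per_sec: int) -> int:
    """T_total = N_partitions * T_partition."""
    return num_partitions * per_partition_msgs_per_sec


def max_active_consumers(num_partitions: int, num_consumers: int) -> int:
    """Consumers beyond the partition count receive no partitions and sit idle."""
    return min(num_partitions, num_consumers)


# Hypothetical: 300K msg/sec target, 100K msg/sec per broker, R = 3
partitions = plan_partitions(300_000, 100_000, 3)
```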
CAP Theorem Applied to Kafka
Kafka's partition-level design offers a configurable trade-off between consistency and availability:
- With acks=all, min.insync.replicas=2, and unclean.leader.election.enable=false, Kafka prioritizes consistency over availability. If the ISR shrinks below min.insync.replicas, acks=all writes are rejected (CP behavior).
- With acks=1 or unclean.leader.election.enable=true, Kafka prioritizes availability over consistency. A partition can keep serving even if that means losing data (AP behavior).
In practice, many production deployments choose CP (consistency-first) settings to prevent data loss.
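The write-acceptance side of this trade-off can be sketched as a simple predicate. It is a simplification with a hypothetical name: in the broker, an acks=all produce request to a partition whose ISR is smaller than min.insync.replicas fails with a NotEnoughReplicas error, while acks=0 and acks=1 writes are not subject to that check.

```python
from typing import Union


def produce_allowed(acks: Union[int, str], isr_size: int, min_insync_replicas: int) -> bool:
    """Whether the partition leader accepts a write under the given acks setting."""
    if acks in ("all", -1):
        # CP side: refuse rather than accept a write that too few replicas hold
        return isr_size >= min_insync_replicas
    # acks=0 / acks=1: leader accepts regardless of ISR size (AP side)
    return True
```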
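Ordering Guarantee Theorem
Kafka guarantees total ordering within a partition but no ordering across partitions. If a producer sends messages m_1, m_2, m_3 to the same partition, they are appended, and therefore consumed, in that order, provided retries cannot reorder them (idempotence enabled, or max.in.flight.requests.per.connection=1). Messages in different partitions have no relative ordering guarantee. To achieve ordering on a key k, all messages with key k must be routed to the same partition via hash(k) mod N_partitions (the Java client uses murmur2 as the hash). Changing N_partitions changes this mapping, so per-key ordering only holds while the partition count is fixed.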
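For keyed records, the Java client hashes the serialized key with murmur2, masks the result to a non-negative value, and takes it modulo the partition count. The sketch below re-implements that in Python for illustration; it is not an official API, and other clients may hash differently by default (librdkafka, for example, uses a CRC32-based partitioner).

```python
def murmur2(data: bytes) -> int:
    """32-bit murmur2 as used by Kafka's Java client (seed 0x9747b28c), returned unsigned."""
    mask = 0xFFFFFFFF
    m, r = 0x5BD1E995, 24
    length = len(data)
    h = (0x9747B28C ^ length) & mask
    tail = length - length % 4
    # Mix 4 bytes at a time, little-endian
    for i in range(0, tail, 4):
        k = data[i] | (data[i + 1] << 8) | (data[i + 2] << 16) | (data[i + 3] << 24)
        k = (k * m) & mask
        k ^= k >> r
        k = (k * m) & mask
        h = (h * m) & mask
        h ^= k
    # Mix the remaining 1-3 bytes (mirrors the Java switch fall-through)
    rem = length % 4
    if rem >= 3:
        h ^= data[tail + 2] << 16
    if rem >= 2:
        h ^= data[tail + 1] << 8
    if rem >= 1:
        h ^= data[tail]
        h = (h * m) & mask
    # Final avalanche
    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h


def partition_for_key(key: bytes, num_partitions: int) -> int:
    """toPositive(murmur2(key)) % numPartitions, mirroring the Java default partitioner."""
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions
```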
Key Concepts Table
| Component | Description | Default Value | Production Recommendation |
|---|---|---|---|
| Broker | Individual Kafka server node | Port 9092 | 3+ nodes per cluster |
| Topic | Logical message category | N/A | Use kebab-case naming |
| Partition | Unit of parallelism within topic | 1 | 6-12 per topic (based on throughput) |
| Replica | Copy of partition for fault tolerance | 1 | 3 (min.insync.replicas=2) |
| ISR | In-Sync Replicas | All assigned replicas | Alert when ISR size < replication factor |
| Consumer Group | Set of consumers sharing workload | N/A | One per microservice |
| Offset | Sequential message position | 0 | Auto-committed or manual |
| ZooKeeper | Cluster coordination service (removed in Kafka 4.0 in favor of KRaft) | Port 2181 | 3 or 5 node ensemble |
| Controller | Broker managing cluster metadata | Dynamic election | Dedicated KRaft controller nodes (3 or 5) |
| Segment | Physical log file | 1GB | Default (tunable) |
| Log Compaction | Retain latest value per key | cleanup.policy=delete | Use for state topics |
Code Examples
Creating a Topic with Advanced Configuration
# Create topic with specific configurations
# --bootstrap-server: comma-separated list of broker addresses for initial connection
# --topic: name of the topic (kebab-case recommended)
# --partitions: number of partitions (determines max parallelism)
# --replication-factor: number of replicas per partition (3 for production)
# --config: topic-level overrides for server properties
kafka-topics.sh --create \
--bootstrap-server kafka-broker-0:9092,kafka-broker-1:9092,kafka-broker-2:9092 \
--topic order-events \
--partitions 12 \
--replication-factor 3 \
--config min.insync.replicas=2 \
--config max.message.bytes=10485760 \
--config retention.ms=604800000 \
--config cleanup.policy=delete \
--config compression.type=lz4 \
--config segment.bytes=1073741824 \
--config flush.messages=10000 \
--config index.interval.bytes=4096 \
--config unclean.leader.election.enable=false
# min.insync.replicas=2: with acks=all, minimum in-sync replicas that must have a write before it is acknowledged to the producer
# max.message.bytes=10485760: largest record batch (after compression) the broker accepts, 10MB
# retention.ms=604800000: retain data for 7 days (604800000ms)
# cleanup.policy=delete: delete segments after retention expires (alternative: compact)
# compression.type=lz4: compression algorithm for batch encoding (none, gzip, snappy, lz4, zstd)
# segment.bytes=1073741824: roll log segment after 1GB (1073741824 bytes)
# flush.messages=10000: force an fsync every 10000 messages; by default Kafka leaves flushing to the OS and relies on replication for durability (durability vs performance)
# index.interval.bytes=4096: add index entry every 4096 bytes for offset lookup
# unclean.leader.election.enable=false: prevent out-of-sync replica from becoming leader
Cluster Monitoring Script
#!/bin/bash
# Monitor Kafka cluster health and partition distribution
# Uses kafka CLI tools for comprehensive cluster visibility
BOOTSTRAP_SERVER="kafka-broker-0:9092"
echo "=== KAFKA CLUSTER STATUS ==="
echo "Timestamp: $(date)"
echo ""
# List all brokers and their API versions
# Shows available APIs and supported versions for client compatibility
echo "--- Active Brokers ---"
kafka-broker-api-versions.sh --bootstrap-server $BOOTSTRAP_SERVER | head -20
echo ""
echo "--- Topic Partition Distribution ---"
# Describe shows partition leaders, ISRs, and replica placement
# Use --topic to filter to specific topic
kafka-topics.sh --describe --bootstrap-server $BOOTSTRAP_SERVER --topic order-events
echo ""
echo "--- Consumer Group Lag ---"
# Lag = log_end_offset - committed_offset per partition
# High lag indicates consumer cannot keep up with production rate
kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP_SERVER \
--group order-processing-service \
--describe
echo ""
echo "--- Cluster Config ---"
# Show cluster-wide dynamic broker defaults (use --entity-name <broker-id> --all to include static configs)
kafka-configs.sh --bootstrap-server $BOOTSTRAP_SERVER \
--entity-type brokers \
--entity-default \
--describe
echo ""
echo "--- Under-replicated Partitions ---"
# Under-replicated = ISR size < replication factor
# Indicates broker failure or network partition
kafka-topics.sh --describe --bootstrap-server $BOOTSTRAP_SERVER \
--under-replicated-partitions
echo ""
echo "--- Active Controller ---"
# Exactly one controller should be active
# KRaft mode (Kafka 3.3+): the LeaderId in the quorum status is the active controller
kafka-metadata-quorum.sh --bootstrap-server $BOOTSTRAP_SERVER describe --status 2>/dev/null || \
echo "Check controller via JMX: kafka.controller:type=KafkaController,name=ActiveControllerCount"
Java Producer Configuration with Detailed Comments
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.Future;
public class KafkaProducerExample {
public static void main(String[] args) {
Properties props = new Properties();
// Core configurations
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"kafka-broker-0:9092,kafka-broker-1:9092,kafka-broker-2:9092");
// Comma-separated broker list for initial metadata fetch;
// client discovers all brokers from metadata response
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
// Serializer for message keys; determines partition routing
// Options: StringSerializer, LongSerializer, ByteArraySerializer
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
// Serializer for message values; independent of key serializer
// Reliability configurations
props.put(ProducerConfig.ACKS_CONFIG, "all");
// acks=0: no acknowledgment (fastest, no durability)
// acks=1: leader acknowledgment only (moderate durability)
// acks=all: all ISR replicas acknowledge (highest durability)
props.put(ProducerConfig.RETRIES_CONFIG, 10);
// Number of retries on transient failures (e.g., LEADER_NOT_AVAILABLE)
// Default is Integer.MAX_VALUE; delivery.timeout.ms normally bounds retrying instead
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
// Delay between retries to avoid overwhelming broker
// Recent clients (3.7+) grow the backoff exponentially up to retry.backoff.max.ms
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
// Max unacknowledged requests per broker connection
// >1 enables pipelining but may cause reordering without idempotence
// With idempotence, ordering is preserved even with value > 1
// Idempotent producer: no duplicates from retries, per partition and producer session
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Prevents duplicate messages during retries
// Assigns Producer ID (PID) and per-partition sequence numbers
// Requires acks=all, retries > 0 and max.in.flight <= 5 (enabled by default since Kafka 3.0)
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-producer-1");
// Unique identifier for transactional producer
// Enables atomic writes across multiple partitions/topics
// Must be unique per producer instance (fencing via epoch)
// Performance configurations
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);
// Maximum batch size in bytes (64KB)
// Batch is sent when full OR linger.ms expires, whichever first
// Larger batches improve throughput, increase latency
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
// Time to wait before sending batch even if not full
// 0 = send immediately (lowest latency, lowest throughput)
// 10-50ms = good balance for most workloads
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
// Total memory for buffering records (32MB)
// Records waiting to be sent to broker
// If buffer is full, send() blocks until space is available
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
// Compression algorithm applied to batches
// none: no compression (fastest, largest payload)
// gzip: high compression ratio, high CPU usage
// snappy: moderate compression, low CPU
// lz4: good compression, fast decompression (recommended)
// zstd: best compression ratio, moderate CPU (Kafka 2.1+)
// Monitoring
props.put(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, 10);
// Number of samples to keep for metric calculations
props.put(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, 30000);
// Time window for metric aggregation (30 seconds)
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Initialize transactions (required if transactional.id is set)
// Acquires Producer ID and epoch from Transaction Coordinator
producer.initTransactions();
try {
// Begin transaction for atomic multi-partition writes
// All sends until commitTransaction() are atomic
producer.beginTransaction();
for (int i = 0; i < 1000; i++) {
String key = "order-" + (i % 100);
// Key determines partition: hash(key) % numPartitions
// Same key always routes to same partition (ordering guarantee)
String value = "{\"orderId\": \"" + i + "\", \"amount\": " +
(Math.random() * 1000) + "}";
ProducerRecord<String, String> record =
new ProducerRecord<>("order-events", key, value);
// Topic, key, value; partition determined by key hash
// Optional: pass an explicit partition as the 2nd argument:
// new ProducerRecord<>(topic, partition, key, value)
record.headers().add("correlation-id",
java.util.UUID.randomUUID().toString()
.getBytes(java.nio.charset.StandardCharsets.UTF_8));
// Headers carry metadata without affecting partition routing
// Useful for tracing, audit, and cross-system correlation
record.headers().add("source-system",
"order-service".getBytes(java.nio.charset.StandardCharsets.UTF_8));
// Identify the originating service for observability
Future<RecordMetadata> future = producer.send(record,
(metadata, exception) -> {
if (exception != null) {
System.err.println("Failed to send record: " +
exception.getMessage());
} else {
System.out.printf("Sent record to partition %d, " +
"offset %d, timestamp %d%n",
metadata.partition(), metadata.offset(),
metadata.timestamp());
}
});
// Async send with callback; callback runs on sender thread
// For sync send, call future.get() (blocks, lower throughput)
}
// Commit transaction atomically
// Writes TxnMarker to all affected partitions
// Marks transaction as committed in __transaction_state
producer.commitTransaction();
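} catch (org.apache.kafka.common.errors.ProducerFencedException
| org.apache.kafka.common.errors.OutOfOrderSequenceException
| org.apache.kafka.common.errors.AuthorizationException e) {
// Fatal errors: the producer cannot recover or abort, so rethrow
// and let the finally block close it
throw e;
} catch (org.apache.kafka.common.KafkaException e) {
// Abort rolls back all writes in this transaction
// Writes become invisible to read_committed consumers
producer.abortTransaction();
throw e;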
} finally {
// Close releases resources, flushes pending sends
producer.close();
}
}
}
Java Consumer with Advanced Configurations
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
public class KafkaConsumerExample {
private static final AtomicBoolean running = new AtomicBoolean(true);
public static void main(String[] args) {
Properties props = new Properties();
// Core configurations
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"kafka-broker-0:9092,kafka-broker-1:9092,kafka-broker-2:9092");
// Initial broker list; consumer discovers full cluster from metadata
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processing-service");
// Consumer group identifier
// Partitions are assigned among consumers with same group.id
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
// Deserializer for message keys; must match producer's key serializer
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
// Deserializer for message values; must match producer's value serializer
// Consumer behavior
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// false = manual offset commit (at-least-once when committing after processing;
// exactly-once additionally needs transactional offset commits via a producer)
// true = auto-commit every auto.commit.interval.ms (default 5000ms)
// Auto-commit may lose messages if processing fails after commit
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// earliest: read from beginning if no committed offset exists
// latest: read from end (new messages only)
// none: throw exception if no committed offset
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
// Maximum records returned per poll() call
// Lower values reduce poll loop latency
// Higher values improve throughput
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
// Maximum time between poll() calls before consumer is evicted
// If processing takes longer, consumer is considered dead
// Increase for slow processing; decrease for fast detection
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000);
// Maximum time without heartbeat before consumer is considered dead
// Detected by group coordinator; triggers rebalance
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 15000);
// Heartbeat sent to coordinator periodically
// Should be <= session.timeout.ms / 3 for reliable detection
// Isolation level for transactional reads
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
// read_committed: only see records from committed transactions
// read_uncommitted: see all records including uncommitted
// Required for exactly-once semantics end-to-end
// Partition assignment strategy
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
// CooperativeStickyAssignor: incremental rebalancing (recommended)
// Only revokes partitions that need to move
// RangeAssignor: assigns contiguous partition ranges
// (default list since Kafka 3.0: RangeAssignor, CooperativeStickyAssignor)
// RoundRobinAssignor: distributes partitions evenly
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Subscribe to topics with rebalance listener
consumer.subscribe(Arrays.asList("order-events", "payment-events"),
new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// Called before partitions are revoked (rebalance start)
// Commit pending offsets to avoid reprocessing
System.out.println("Partitions revoked: " + partitions);
consumer.commitSync(Duration.ofSeconds(30));
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Called after new partitions are assigned
// Seek to specific offsets if needed
System.out.println("Partitions assigned: " + partitions);
}
});
try {
while (running.get()) {
// poll() fetches records and maintains group membership
// Timeout controls how long to wait if no records available
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
}
// Commit offsets after successful processing
// commitSync: blocks until successful or timeout
// commitAsync: non-blocking; failures surface only through an optional callback
if (!records.isEmpty()) {
consumer.commitSync(Duration.ofSeconds(30));
}
}
} finally {
// Close releases consumer from group, triggers rebalance
// May want to delay close for graceful shutdown
consumer.close();
}
}
private static void processRecord(ConsumerRecord<String, String> record) {
System.out.printf("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s%n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
// Access record.timestamp() for event-time processing
// Access record.headers() for message metadata
}
}
Python Producer Example
from kafka import KafkaProducer
import json
import random
import time
# Initialize producer with configuration
producer = KafkaProducer(
    bootstrap_servers=['kafka-broker-0:9092', 'kafka-broker-1:9092'],
    # List of broker addresses used for initial metadata discovery
    key_serializer=lambda k: k.encode('utf-8'),
    # Serialize key to bytes; the key bytes determine the partition via hash
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    # Serialize value to JSON bytes
    acks='all',
    # Wait for all ISR replicas to acknowledge
    # Accepted values: 0 (no ack), 1 (leader only), 'all' (all ISR replicas)
    retries=5,
    # Retry transient failures up to 5 times
    retry_backoff_ms=100,
    # 100ms delay between retries
    batch_size=65536,
    # 64KB batch size for network efficiency
    linger_ms=10,
    # Wait up to 10ms to fill batch
    compression_type='lz4',
    # Compress batches with lz4 (requires the lz4 Python package)
    max_in_flight_requests_per_connection=5,
    # Allow 5 concurrent requests per broker (pipelining)
    enable_idempotence=True,
    # Prevent duplicate messages during retries
    # (recognized only by recent kafka-python releases; older ones reject it)
    buffer_memory=33554432,
    # 32MB buffer for pending records
    request_timeout_ms=30000,
    # 30s timeout for broker responses
)
# Send messages with callbacks
for i in range(1000):
    key = f"order-{i % 100}"
    value = {
        "orderId": str(i),
        "amount": round(random.random() * 1000, 2),
        "timestamp": int(time.time() * 1000)
    }
    future = producer.send(
        topic='order-events',
        key=key,
        value=value,
        headers=[
            ("correlation-id", str(i).encode('utf-8')),
            ("source", b"python-producer")
        ]
    )
    # Callbacks run when the broker responds; in kafka-python the
    # RecordMetadata fields are plain attributes, not methods
    future.add_callback(lambda metadata: print(
        f"Sent to partition {metadata.partition}, "
        f"offset {metadata.offset}, "
        f"timestamp {metadata.timestamp}"
    ))
    future.add_errback(lambda exc: print(f"Error: {exc}"))
# Flush ensures all pending records are sent
producer.flush()
producer.close()
Python Consumer Example
from kafka import KafkaConsumer, ConsumerRebalanceListener
import json
# Initialize consumer with configuration (topics are subscribed below)
consumer = KafkaConsumer(
    bootstrap_servers=['kafka-broker-0:9092', 'kafka-broker-1:9092'],
    # Initial broker list
    group_id='order-processing-service',
    # Consumer group for partition assignment
    key_deserializer=lambda k: k.decode('utf-8') if k else None,
    # Deserialize key from bytes
    value_deserializer=lambda v: json.loads(v.decode('utf-8')) if v else None,
    # Deserialize value from JSON bytes
    auto_offset_reset='earliest',
    # 'earliest': read from beginning if no offset
    # 'latest': read from end if no offset
    # 'none': raise exception if no offset
    enable_auto_commit=False,
    # Disable auto-commit for manual offset management
    isolation_level='read_committed',
    # Only read committed transactional records
    # (recognized only by recent kafka-python releases)
    max_poll_records=500,
    # Max records per poll() call
    max_poll_interval_ms=300000,
    # Max time between polls before eviction (300s)
    session_timeout_ms=45000,
    # Consumer failure detection timeout
    heartbeat_interval_ms=15000,
    # Heartbeat frequency to coordinator
    fetch_min_bytes=1,
    # Minimum bytes to fetch (batch optimization)
    fetch_max_wait_ms=500,
    # Max wait if fetch_min_bytes not reached
)
# kafka-python expects a ConsumerRebalanceListener subclass, not plain callbacks
class CommitOnRevoke(ConsumerRebalanceListener):
    def on_partitions_revoked(self, revoked):
        print(f"Partitions revoked: {revoked}")
        # Commit processed positions before the partitions move elsewhere
        consumer.commit()
    def on_partitions_assigned(self, assigned):
        print(f"Partitions assigned: {assigned}")
consumer.subscribe(topics=['order-events', 'payment-events'],
                   listener=CommitOnRevoke())
# Process records
try:
    while True:
        batches = consumer.poll(timeout_ms=100)
        for tp, records_list in batches.items():
            for record in records_list:
                print(f"Topic: {record.topic}, "
                      f"Partition: {record.partition}, "
                      f"Offset: {record.offset}, "
                      f"Key: {record.key}, "
                      f"Value: {record.value}")
                # Process record here
        # After processing the whole batch, commit the consumed positions
        if batches:
            consumer.commit()
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
Performance Metrics
| Metric | Single Broker | 3-Broker Cluster | 6-Broker Cluster |
|---|---|---|---|
| Throughput (writes/sec) | 100K | 300K | 600K |
| Throughput (reads/sec) | 200K | 600K | 1.2M |
| Latency (p99) | 5ms | 10ms | 15ms |
| Latency (p999) | 15ms | 25ms | 40ms |
| Message Size (avg) | 1KB | 1KB | 1KB |
| Replication Lag | 0ms | <100ms | <200ms |
| Recovery Time | 0 | 30s | 60s |
| Disk Usage | 1TB | 3TB | 6TB |
| Memory (JVM Heap) | 6GB | 6GB | 6GB |
| Network Bandwidth | 1Gbps | 3Gbps | 6Gbps |
| Broker Configuration | Default | Recommended | Impact |
|---|---|---|---|
| num.network.threads | 3 | 8 | Network I/O throughput |
| num.io.threads | 8 | 16 | Disk I/O throughput |
| socket.send.buffer.bytes | 100KB | 1MB | Network send throughput |
| socket.receive.buffer.bytes | 100KB | 1MB | Network receive throughput |
| log.flush.interval.messages | Long.MAX_VALUE | 10000 | Durability vs performance |
| log.retention.hours | 168 (7 days) | Varies | Storage cost |
Best Practices
- Partition Count Planning: Start with partitions = (target_throughput / broker_throughput) * replication_factor. Monitor consumer lag to decide whether more partitions or consumers are needed. Avoid over-partitioning (roughly 200K partitions per cluster for ZooKeeper mode, 2M for KRaft).
- Replication Factor: Always use replication-factor ≥ 3 for production topics. Set min.insync.replicas=2 so that, with acks=all, at least two replicas have each write before it is acknowledged to the producer. This prevents data loss during a single broker failure.
- Consumer Group Sizing: Keep the number of consumers ≤ the number of partitions; extra consumers sit idle. Use CooperativeStickyAssignor for incremental rebalancing to minimize pause time during scaling events.
- Offset Management: Disable auto-commit (enable.auto.commit=false) and commit offsets explicitly after successful processing, so a crash leads to reprocessing (at-least-once) rather than data loss; make processing idempotent or use transactions if duplicates matter. Use commitSync for critical offsets and commitAsync for high-frequency commits.
- Broker Configuration: Use dedicated disks for Kafka data (not shared with the OS). Size num.io.threads and num.network.threads based on CPU cores. Set log.flush.interval.messages and log.flush.interval.ms carefully to balance durability with performance.
- Topic Configuration: Use cleanup.policy=compact for topics that need to retain the latest state. Set retention.ms and retention.bytes based on data volume and replay requirements. Use compression.type=lz4 for a good throughput-to-latency balance.
- Monitoring: Monitor consumer lag, ISR shrink/expand events, under-replicated partitions, and disk usage. Use tools like Burrow, Cruise Control, or Confluent Control Center. Set up alerts for ISR shrinks and under-replicated partitions.
- Security: Enable SSL/TLS for inter-broker and client-broker communication. Use SASL for authentication (SCRAM-SHA-256/512, or SASL/PLAIN only over TLS, since PLAIN sends passwords in cleartext). Implement ACLs for authorization. Use separate listeners for internal and external traffic.
- ZooKeeper/KRaft: For clusters > 100K partitions, consider migrating to KRaft mode to eliminate the ZooKeeper dependency (ZooKeeper mode is removed entirely in Kafka 4.0). If you still run ZooKeeper, use dedicated nodes (3 or 5) with sufficient I/O capacity. KRaft provides faster controller failover and higher metadata throughput.
- Data Retention: Use tiered storage for long-term retention where available. Use log compaction for topics that serve as materialized views. Archive to object storage (S3, GCS) for compliance requirements. Set retention.bytes (applied per partition) to cap storage usage.
Key Takeaways:
- Kafka partitions are the unit of parallelism; throughput scales roughly linearly with partition count up to broker limits
- The ISR mechanism balances durability with availability; with acks=all, min.insync.replicas rejects writes when too few replicas are in sync, trading availability for protection against data loss
- Consumer groups assign each partition to exactly one consumer (a consumer may own several); with N partitions, consumers beyond N are idle
- Ordering is guaranteed only within a partition; key-based routing sends all records with the same key to one partition, giving per-key ordering
- Followers must catch up with the leader within replica.lag.time.max.ms to stay in the ISR, so monitor replication lag
- Log segments use append-only files with index files for efficient offset lookup; page cache provides read acceleration
See also: Data Engineering Pipeline patterns (data-engineering/019) | PySpark Structured Streaming (pyspark/11-structured-streaming) | Kafka Producer API (kafka/02) | Exactly-Once Semantics (kafka/04)
See Also
- Producer Consumer Patterns: Producer and consumer implementation patterns
- Kafka Streams and Connect: Stream processing and connectors
- Exactly-Once Semantics: Exactly-once delivery guarantees
- Schema Registry: Schema evolution and registry patterns
- Kafka Intro: Kafka fundamentals for data engineering