
Apache Kafka Architecture: A Deep Dive into Distributed Event Streaming



[Figure: Kafka Cluster Topology. A three-node ZooKeeper ensemble (ZK Node 1 as leader, ZK Nodes 2 and 3 as followers, port 2181) and three brokers (Broker 0 as controller, Brokers 1 and 2 as standby, port 9092) over a storage layer at /var/kafka/data/. In KRaft mode no ZooKeeper is needed. Brokers store partitions; ZooKeeper manages metadata and leader election.]

Architecture Diagram: Partition Distribution & Replication

Architecture Diagram: Consumer Group Coordination

Architecture Diagram: Log Segment Internals

Formal Definitions

Definition: Partition

A partition is an ordered, immutable sequence of records that is continually appended to. Each record in a partition is assigned a sequential identifier called an offset. Partitions are the unit of parallelism in Kafka: the number of partitions caps consumer parallelism within a consumer group. Formally, a topic T is decomposed as T = {P_0, P_1, ..., P_{n-1}}, where n is the partition count and each P_i is a totally ordered log.

Definition: Replica

A replica is a copy of a partition stored on a different broker for fault tolerance. Each partition with replication factor R maintains R replicas. One replica is elected as the leader and by default handles all reads and writes, while the remaining R-1 are followers that replicate from the leader. Replicas provide durability: if a broker fails, another in-sync replica can be promoted to leader without losing any record that was acknowledged with acks=all.

Definition: In-Sync Replicas (ISR)

The In-Sync Replica set (ISR) is the subset of replicas, including the leader, that are fully caught up with the leader. A follower stays in the ISR as long as it has caught up to the leader's log end within the last replica.lag.time.max.ms. The ISR is dynamic: replicas that fall behind are removed, and replicas that catch up are re-added. With acks=all, a write is acknowledged to the producer only after every replica currently in the ISR has appended the record, and it is rejected if the ISR is smaller than min.insync.replicas.
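The ISR membership rule can be illustrated with a minimal sketch. The function name `in_sync_replicas`, the dictionary layout, and the 30-second threshold are assumptions made for illustration; a real broker tracks this state internally.

```python
def in_sync_replicas(last_caught_up_ms, now_ms, leader_id, lag_time_max_ms=30_000):
    """Return the sorted ISR: the leader plus every follower that was fully
    caught up within the last lag_time_max_ms milliseconds.

    last_caught_up_ms maps follower id -> time it last matched the leader's log end.
    lag_time_max_ms stands in for replica.lag.time.max.ms (assumed value).
    """
    isr = {leader_id}
    for replica_id, caught_up_ms in last_caught_up_ms.items():
        if now_ms - caught_up_ms <= lag_time_max_ms:
            isr.add(replica_id)
    return sorted(isr)


# Hypothetical cluster: broker 0 leads, broker 1 caught up 2 s ago,
# broker 2 last caught up 45 s ago and therefore drops out of the ISR.
followers = {1: 98_000, 2: 55_000}
print(in_sync_replicas(followers, now_ms=100_000, leader_id=0))  # [0, 1]
```

Once broker 2 catches up again, its timestamp advances and the same check re-admits it.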

Definition: Consumer Group

A consumer group is a set of consumers that jointly consume messages from a topic. Each partition is consumed by exactly one consumer within the group, enabling load-balanced parallel consumption. The Group Coordinator broker manages membership and triggers rebalancing when consumers join or leave. Formally, for a topic with partitions P = {P_0, ..., P_{n-1}} and a group with consumers C = {C_0, ..., C_{m-1}}, the assignment function f: P -> C maps each partition to exactly one consumer, where m ≤ n for full utilization.
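The assignment function f: P -> C can be sketched as a simple round-robin mapping. This is a simplified illustration, not the code of Kafka's built-in assignors; `assign_round_robin` and the consumer names are hypothetical.

```python
def assign_round_robin(num_partitions, consumers):
    """Map every partition index to exactly one consumer, round-robin.

    Returns {consumer: [partition, ...]}; consumers beyond the partition
    count end up with an empty list, i.e. they sit idle.
    """
    members = sorted(consumers)
    assignment = {member: [] for member in members}
    for partition in range(num_partitions):
        owner = members[partition % len(members)]
        assignment[owner].append(partition)
    return assignment


# Six partitions over two consumers: each consumer owns three partitions.
print(assign_round_robin(6, ["c0", "c1"]))

# Two partitions over three consumers (m > n): one consumer stays idle.
print(assign_round_robin(2, ["c0", "c1", "c2"]))
```

The second call shows why m ≤ n matters: with more consumers than partitions, the extra consumer receives no work.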

Definition: Broker

A broker is a single Kafka server node that stores data and serves client requests. Each broker has a unique broker.id and listens on a configured port (default: 9092). Brokers in a cluster communicate for replication (follower pulls from leader) and metadata coordination (via ZooKeeper or KRaft controller). A single broker can handle hundreds of thousands of reads/writes per second depending on hardware and configuration.

Definition: Log Segment

A log segment is the physical file on disk representing a contiguous portion of a partition's log. Each segment consists of three files: .log (message data), .index (offset-to-position mapping), and .timeindex (timestamp-to-offset mapping). Only the active segment accepts new writes. Segments are rolled when segment.bytes (default 1GB) is reached or segment.ms expires. Older segments are eligible for deletion based on retention policy.
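How the .index file speeds up lookups can be sketched with a binary search over a sparse offset index. The entry values and the helper name `locate` are hypothetical; the sketch only assumes that the index maps some offsets to byte positions in the .log file.

```python
import bisect


def locate(index_entries, target_offset):
    """Return the byte position in the .log file from which to start scanning.

    index_entries is a sorted list of (offset, file_position) pairs. The index
    is sparse, so the lookup finds the greatest indexed offset that is less
    than or equal to the target; the reader then scans forward from there.
    """
    offsets = [offset for offset, _ in index_entries]
    slot = bisect.bisect_right(offsets, target_offset) - 1
    if slot < 0:
        return 0  # target precedes the first entry: scan from the segment start
    return index_entries[slot][1]


# Hypothetical index with roughly one entry per 4096 bytes of log data.
sparse_index = [(0, 0), (35, 4096), (71, 8192), (109, 12288)]
print(locate(sparse_index, 80))  # 8192: scan forward from the entry for offset 71
```

The binary search keeps the lookup logarithmic in the number of index entries, and the forward scan is bounded by the spacing between entries.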

Detailed Explanation

Apache Kafka is a distributed event streaming platform that large deployments use to handle trillions of events per day, with latencies typically measured in milliseconds. At its core, Kafka operates as a distributed commit log, where data is persisted in an append-only fashion across a cluster of brokers. The fundamental architectural principle is the decoupling of producers and consumers through a publish-subscribe model, which enables horizontal scalability while maintaining strict ordering guarantees within partitions.

Broker Layer Deep Dive

The Broker Layer consists of a cluster of servers (brokers) that each maintain partition data and serve client requests. Each broker has a unique identifier (broker.id) and listens on a configured port (default: 9092). The brokers communicate with each other for replication and with ZooKeeper (or KRaft controller) for cluster metadata management. When a producer sends a message, the broker acknowledges receipt based on the configured acknowledgment policy (acks=0, 1, or all). The broker stores the message in its local commit log, which is a sequence of files on disk optimized for sequential I/O operations. Each partition is a separate directory containing multiple log segment files, with only the active segment accepting writes.

The broker's network layer uses Java NIO with a configurable pool of network threads (num.network.threads, default 3) for socket I/O and a separate pool of request handler threads (num.io.threads, default 8) that process requests, including any disk I/O. This two-tier threading model allows Kafka to handle many concurrent connections while keeping disk utilization efficient. Offset and time index files are memory-mapped, so recently used index entries stay in memory and accelerate random offset lookups.

Topic Partitioning and Distribution

Topics serve as logical categories or feed names for messages. Each topic is divided into multiple partitions, which are the unit of parallelism in Kafka. The number of partitions determines the maximum number of consumer instances that can read from the topic in parallel within a consumer group. When a topic is created, partition replicas are assigned to brokers in round-robin order, which spreads load roughly evenly. Each partition maintains its own commit log, and messages within a partition are assigned monotonically increasing offsets. This design lets Kafka achieve high throughput by parallelizing both writes (across partitions) and reads (via consumer groups).

Replica placement is rack-aware when brokers set broker.rack: the replicas of a partition are spread across racks or availability zones for fault tolerance. The controller maintains the partition assignment map in memory and propagates changes via ZooKeeper or the KRaft metadata log. When a new broker joins the cluster, existing partitions are not moved automatically; only newly created partitions can land on it until an operator reassigns partitions (for example with kafka-reassign-partitions.sh).

Replication and Fault Tolerance

Replication provides fault tolerance and high availability. Each partition has a configurable replication factor (typically 3 for production), meaning data is replicated across multiple brokers. One broker is elected as the partition leader and by default handles all reads and writes for that partition. Follower replicas continuously pull data from the leader to stay synchronized. The set of in-sync replicas (ISR) includes all replicas that are fully caught up with the leader. If a follower stays behind for longer than replica.lag.time.max.ms, it is removed from the ISR. When the leader fails, a new leader is elected from the ISR, ensuring data consistency. The ISR mechanism balances durability with availability: the leader keeps accepting acks=all writes as long as the ISR contains at least min.insync.replicas members.

The replication protocol uses a pull model in which followers fetch from leaders via the Fetch API. The leader tracks each follower's fetch position and adjusts the ISR accordingly. Replication latency depends on the follower fetch settings (replica.fetch.wait.max.ms for how long a fetch may wait, replica.fetch.max.bytes for how much each fetch may return) and on network conditions between brokers.

Consumer Group Coordination

Consumer Groups enable load-balanced consumption across multiple consumers. Each consumer in a group is assigned a subset of partitions, ensuring that each partition is consumed by exactly one consumer within the group. The Group Coordinator (a broker) manages the group membership and triggers rebalancing when consumers join or leave. The rebalancing process uses protocols like RangeAssignor, RoundRobinAssignor, or CooperativeStickyAssignor to distribute partitions. Consumer offsets are tracked in the __consumer_offsets topic, allowing consumers to resume from their last committed position. This design enables horizontal scaling of consumption while maintaining ordering guarantees within partitions.

The heartbeat protocol (heartbeat.interval.ms, default 3s) ensures liveness detection. If a consumer fails to send a heartbeat within session.timeout.ms (default 45s), it is considered dead and its partitions are reassigned. The consumer poll loop must complete within max.poll.interval.ms (default 300s) to avoid being evicted from the group.
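The two liveness checks described above can be sketched as a single function. This is a simplified model that glosses over which component performs each check; `consumer_status` and its return strings are hypothetical, and the defaults mirror the values quoted in the text.

```python
def consumer_status(now_ms, last_heartbeat_ms, last_poll_ms,
                    session_timeout_ms=45_000, max_poll_interval_ms=300_000):
    """Classify a group member from its last heartbeat and last poll() time."""
    if now_ms - last_heartbeat_ms > session_timeout_ms:
        # No heartbeat within session.timeout.ms: process or network is gone.
        return "dead: session timeout"
    if now_ms - last_poll_ms > max_poll_interval_ms:
        # Heartbeats still arrive, but the poll loop is stuck in processing.
        return "evicted: poll interval exceeded"
    return "alive"


# Heartbeats stopped 50 s ago.
print(consumer_status(now_ms=100_000, last_heartbeat_ms=50_000, last_poll_ms=99_000))
# Heartbeats are fine, but the last poll() was 350 s ago.
print(consumer_status(now_ms=400_000, last_heartbeat_ms=399_000, last_poll_ms=50_000))
```

The split matters in practice: a slow record handler trips the poll-interval check even though the consumer process is healthy.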

Storage Layer Internals

The Storage Layer uses a highly optimized append-only log structure. Each log segment consists of three files: .log (message data), .index (offset index), and .timeindex (timestamp index). The index files enable efficient random access to specific offsets without scanning the entire log. Kafka uses a page cache aggressively, leveraging the operating system's memory management for read performance. The log compaction feature allows Kafka to retain only the latest value for each key, enabling topics to serve as a materialized view of the latest state. Retention policies can be time-based, size-based, or compacted, providing flexibility in data lifecycle management.
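Log compaction can be illustrated with a short sketch that keeps only the latest record per key. It is a simplification: here a record with a `None` value (a tombstone) removes the key immediately, whereas a real broker retains tombstones for a configurable period first. The function name `compact` and the sample records are hypothetical.

```python
def compact(records):
    """Keep the latest value for each key, in offset order.

    records is a list of (offset, key, value) tuples in log order.
    A value of None is treated as a tombstone that deletes the key.
    """
    latest = {}
    for offset, key, value in records:
        latest[key] = (offset, value)  # later records overwrite earlier ones
    survivors = [(offset, key, value)
                 for key, (offset, value) in latest.items()
                 if value is not None]
    return sorted(survivors)


log = [
    (0, "a", "1"),
    (1, "b", "2"),
    (2, "a", "3"),   # supersedes offset 0
    (3, "b", None),  # tombstone: deletes key "b"
    (4, "c", "5"),
]
print(compact(log))  # [(2, 'a', '3'), (4, 'c', '5')]
```

Surviving records keep their original offsets, which is why offsets in a compacted partition can have gaps.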

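Kafka's storage design achieves zero-copy transfer from disk to network for plaintext connections: when serving fetches, the broker calls FileChannel.transferTo(), which maps to sendfile() on Linux and moves bytes from the page cache to the socket without copying them through the JVM. Messages are appended to the log in batches, and the OS page cache handles read-ahead for sequential access patterns. The index files, not the log data files, are memory-mapped for efficient random access.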

Key Formulas

Kafka Cluster Throughput

$$T_{\text{cluster}} = N_{\text{partitions}} \times T_{\text{partition}}$$

Here,

  • $T_{\text{cluster}}$ = Total cluster throughput (messages/sec)
  • $N_{\text{partitions}}$ = Number of partitions across the topic
  • $T_{\text{partition}}$ = Throughput per partition (messages/sec)

Replication Lag Model

$$L_{\text{replica}}(t) = L_{\text{replica}}(t-1) + R_{\text{in}}(t) - R_{\text{out}}(t)$$

Here,

  • $L_{\text{replica}}(t)$ = Replication lag at time t (bytes)
  • $R_{\text{in}}(t)$ = Data written to leader at time t
  • $R_{\text{out}}(t)$ = Data fetched by follower at time t
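The replication lag recurrence can be stepped through numerically. The byte counts below are made up for illustration, and `replication_lag` is a hypothetical helper that applies the formula once per time step.

```python
def replication_lag(initial_lag, written, fetched):
    """Apply L(t) = L(t-1) + R_in(t) - R_out(t) for each time step.

    written[t] is the number of bytes appended to the leader in step t,
    fetched[t] the number of bytes the follower fetched in the same step.
    Returns the lag after every step.
    """
    lags = []
    lag = initial_lag
    for r_in, r_out in zip(written, fetched):
        lag = lag + r_in - r_out
        lags.append(lag)
    return lags


# A burst of writes builds up lag; the follower drains it once writes slow down.
print(replication_lag(0, written=[500, 800, 200, 0], fetched=[300, 600, 400, 200]))
# [200, 400, 200, 0]
```

Lag only shrinks in steps where the follower fetches more than the leader receives, which is why sustained write bursts can push a follower out of the ISR.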

Consumer Lag

$$\text{Lag}_{\text{consumer}} = O_{\text{log\_end}} - O_{\text{committed}}$$

Here,

  • $O_{\text{log\_end}}$ = Latest offset in the partition log
  • $O_{\text{committed}}$ = Last committed consumer offset
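Consumer lag is usually computed per partition and then summed for the group. The sketch below does this with plain dictionaries; the offsets are invented, and `consumer_lag` is a hypothetical helper rather than part of any Kafka client.

```python
def consumer_lag(log_end_offsets, committed_offsets):
    """Return {partition: log_end_offset - committed_offset}.

    Both arguments map partition number -> offset and must cover the same
    partitions.
    """
    return {
        partition: log_end_offsets[partition] - committed_offsets[partition]
        for partition in log_end_offsets
    }


log_end = {0: 1500, 1: 1200, 2: 900}
committed = {0: 1480, 1: 1200, 2: 650}

lag = consumer_lag(log_end, committed)
print(lag)                # {0: 20, 1: 0, 2: 250}
print(sum(lag.values()))  # 270 messages behind across the group
```

A single lagging partition, such as partition 2 here, often points at a hot key or a slow consumer instance rather than a group-wide problem.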

Partition Count Planning

$$N_{\text{partitions}} \geq \left\lceil \frac{T_{\text{target}}}{T_{\text{broker}}} \right\rceil \times R_{\text{factor}}$$

Here,

  • $N_{\text{partitions}}$ = Required number of partitions
  • $T_{\text{target}}$ = Target cluster throughput (msg/sec)
  • $T_{\text{broker}}$ = Throughput per broker (msg/sec)
  • $R_{\text{factor}}$ = Replication factor

Partition count directly determines maximum parallelism. For a topic with N partitions, at most N consumers in a group can process in parallel. Beyond N consumers, additional consumers remain idle. Plan partition count from target throughput using the planning formula above: N_partitions ≥ ⌈T_target / T_broker⌉ × R_factor.

For topics with high write throughput, increase partition count to distribute load across more brokers. Each partition adds overhead for index files and open file descriptors, so avoid over-partitioning (max ~200K partitions per cluster for ZooKeeper mode, ~2M for KRaft mode).
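As a worked example, the Partition Count Planning formula from this section can be evaluated directly. The throughput figures are hypothetical, and `required_partitions` simply applies the formula as written; actual sizing should be validated with load tests.

```python
import math


def required_partitions(target_msgs_per_sec, broker_msgs_per_sec, replication_factor):
    """Evaluate ceil(T_target / T_broker) * R_factor."""
    return math.ceil(target_msgs_per_sec / broker_msgs_per_sec) * replication_factor


# Hypothetical workload: 1,000,000 msg/s target, 150,000 msg/s per broker, RF = 3.
# ceil(1,000,000 / 150,000) = ceil(6.67) = 7, and 7 * 3 = 21.
print(required_partitions(1_000_000, 150_000, 3))  # 21
```

Rounding up before multiplying keeps the estimate conservative, in line with the advice to plan capacity ahead while still avoiding over-partitioning.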

Theorem: CAP Theorem Applied to Kafka

Kafka's partition-level design offers a configurable trade-off between consistency and availability:

  • With acks=all, min.insync.replicas=2, unclean.leader.election.enable=false: Kafka prioritizes consistency over availability. If fewer than min.insync.replicas replicas are in sync, writes are rejected (CP behavior).
  • With acks=1 or unclean.leader.election.enable=true: Kafka prioritizes availability over consistency. A partition can continue serving even if it means potential data loss (AP behavior).

In practice, production deployments that cannot tolerate data loss choose the CP (consistency-first) configuration.
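The write-side half of this trade-off reduces to one check, sketched below. The function `write_outcome` is a hypothetical model of the rule, not broker code; it only encodes that min.insync.replicas is enforced for acks=all producers.

```python
def write_outcome(acks, isr_size, min_insync_replicas):
    """Decide whether a produce request is accepted.

    acks is the producer setting as a string: "0", "1", or "all".
    isr_size is the current number of in-sync replicas, leader included.
    """
    if acks == "all" and isr_size < min_insync_replicas:
        return "rejected"  # consistency first: refuse rather than under-replicate
    return "accepted"      # availability first, or enough replicas are in sync


# Replication factor 3, min.insync.replicas=2, two brokers down (ISR = leader only).
print(write_outcome("all", isr_size=1, min_insync_replicas=2))  # rejected
print(write_outcome("1", isr_size=1, min_insync_replicas=2))    # accepted
```

With the same broker outage, the acks=all producer sees errors while the acks=1 producer keeps writing to a single copy, which is exactly the CP versus AP distinction.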

Theorem: Ordering Guarantee

Kafka guarantees total ordering within a partition but only partial ordering across partitions. If a producer sends messages m_1, m_2, m_3 to the same partition, a consumer will always observe them in that order. However, for messages across different partitions, no ordering guarantee exists. To achieve ordering on a key k, all messages with key k must be routed to the same partition via hash(k) mod N_partitions.
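Key-based routing can be sketched in a few lines. Kafka's default partitioner hashes the serialized key with murmur2; the sketch below substitutes CRC32 from the Python standard library purely as a stand-in, so the partition numbers it produces will not match a real cluster. `partition_for` is a hypothetical helper.

```python
import zlib


def partition_for(key, num_partitions):
    """Stand-in partitioner: hash of the key bytes modulo the partition count."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# The same key always maps to the same partition, so its messages stay ordered.
first = partition_for("order-42", 12)
second = partition_for("order-42", 12)
print(first == second)  # True

# Different keys may share a partition; ordering then holds within that
# partition but says nothing about keys on other partitions.
print({key: partition_for(key, 12) for key in ("order-1", "order-2", "order-3")})
```

Because the mapping depends on the partition count, adding partitions to an existing topic can move a key to a different partition and break per-key ordering across the change.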

Key Concepts Table

| Component | Description | Default Value | Production Recommendation |
|---|---|---|---|
| Broker | Individual Kafka server node | Port 9092 | 3+ nodes per cluster |
| Topic | Logical message category | N/A | Use kebab-case naming |
| Partition | Unit of parallelism within topic | 1 | 6-12 per topic (based on throughput) |
| Replica | Copy of partition for fault tolerance | 1 | 3 (min.insync.replicas=2) |
| ISR | In-Sync Replicas | All replicas | All replicas minus lagging ones |
| Consumer Group | Set of consumers sharing workload | N/A | One per microservice |
| Offset | Sequential message position | 0 | Auto-committed or manual |
| ZooKeeper | Cluster coordination service | Port 2181 | 3 or 5 node ensemble |
| Controller | Broker managing cluster metadata | Dynamic election | Dedicated controller broker |
| Segment | Physical log file | 1GB | Default (tunable) |
| Log Compaction | Retain latest value per key | cleanup.policy=delete | Use for state topics |

Code Examples

Creating a Topic with Advanced Configuration

# Create topic with specific configurations
# --bootstrap-server: comma-separated list of broker addresses for initial connection
# --topic: name of the topic (kebab-case recommended)
# --partitions: number of partitions (determines max parallelism)
# --replication-factor: number of replicas per partition (3 for production)
# --config: topic-level overrides for server properties

kafka-topics.sh --create \
  --bootstrap-server kafka-broker-0:9092,kafka-broker-1:9092,kafka-broker-2:9092 \
  --topic order-events \
  --partitions 12 \
  --replication-factor 3 \
  --config min.insync.replicas=2 \
  --config max.message.bytes=10485760 \
  --config retention.ms=604800000 \
  --config cleanup.policy=delete \
  --config compression.type=lz4 \
  --config segment.bytes=1073741824 \
  --config flush.messages=10000 \
  --config index.interval.bytes=4096 \
  --config unclean.leader.election.enable=false

# min.insync.replicas=2: with acks=all, at least 2 in-sync replicas must have the write before it is acknowledged to the producer
# max.message.bytes=10485760: largest record batch (after compression) the topic accepts, here 10MB
# retention.ms=604800000: retain data for 7 days (604800000ms)
# cleanup.policy=delete: delete segments after retention expires (alternative: compact)
# compression.type=lz4: compression codec for stored batches (uncompressed, gzip, snappy, lz4, zstd, or producer to keep the producer's codec)
# segment.bytes=1073741824: roll log segment after 1GB (1073741824 bytes)
# flush.messages=10000: force a flush to disk every 10000 messages; Kafka's documentation recommends leaving this unset and relying on replication for durability
# index.interval.bytes=4096: add an index entry roughly every 4096 bytes for offset lookup
# unclean.leader.election.enable=false: prevent an out-of-sync replica from becoming leader

Cluster Monitoring Script

#!/bin/bash
# Monitor Kafka cluster health and partition distribution
# Uses kafka CLI tools for comprehensive cluster visibility

BOOTSTRAP_SERVER="kafka-broker-0:9092"

echo "=== KAFKA CLUSTER STATUS ==="
echo "Timestamp: $(date)"
echo ""

# List all brokers and their API versions
# Shows available APIs and supported versions for client compatibility
echo "--- Active Brokers ---"
kafka-broker-api-versions.sh --bootstrap-server $BOOTSTRAP_SERVER | head -20

echo ""
echo "--- Topic Partition Distribution ---"
# Describe shows partition leaders, ISRs, and replica placement
# Use --topic to filter to specific topic
kafka-topics.sh --describe --bootstrap-server $BOOTSTRAP_SERVER --topic order-events

echo ""
echo "--- Consumer Group Lag ---"
# Lag = log_end_offset - committed_offset per partition
# High lag indicates consumer cannot keep up with production rate
kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP_SERVER \
  --group order-processing-service \
  --describe

echo ""
echo "--- Cluster Config ---"
# List the cluster-wide dynamic default overrides for brokers
# (static settings from server.properties are not shown here)
kafka-configs.sh --bootstrap-server $BOOTSTRAP_SERVER \
  --entity-type brokers \
  --entity-default \
  --describe

echo ""
echo "--- Under-replicated Partitions ---"
# Under-replicated = ISR size < replication factor
# Indicates broker failure, a slow follower, or a network partition
# Empty output means every partition is fully replicated
kafka-topics.sh --describe --bootstrap-server "$BOOTSTRAP_SERVER" \
  --under-replicated-partitions

echo ""
echo "--- Active Controller ---"
# Exactly one controller should be active at any time
# KRaft clusters only: the quorum status reports the current leader (active controller)
kafka-metadata-quorum.sh --bootstrap-server "$BOOTSTRAP_SERVER" describe --status 2>/dev/null || \
  echo "Check controller via JMX: kafka.controller:type=KafkaController,name=ActiveControllerCount"

Java Producer Configuration with Detailed Comments

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.Future;

public class KafkaProducerExample {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        
        // Core configurations
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
            "kafka-broker-0:9092,kafka-broker-1:9092,kafka-broker-2:9092");
        // Comma-separated broker list for initial metadata fetch;
        // client discovers all brokers from metadata response
        
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
            StringSerializer.class.getName());
        // Serializer for message keys; determines partition routing
        // Options: StringSerializer, LongSerializer, ByteArraySerializer
        
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
            StringSerializer.class.getName());
        // Serializer for message values; independent of key serializer
        
        // Reliability configurations
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // acks=0: no acknowledgment (fastest, no durability)
        // acks=1: leader acknowledgment only (moderate durability)
        // acks=all: all ISR replicas acknowledge (highest durability)
        
        props.put(ProducerConfig.RETRIES_CONFIG, 10);
        // Number of retries on transient failures (e.g., LEADER_NOT_AVAILABLE)
        // With idempotence enabled, retries are unlimited by default
        
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
        // Base delay between retries to avoid overwhelming the broker
        // Older clients use a fixed delay; newer clients back off exponentially
        
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        // Max unacknowledged requests per broker connection
        // >1 enables pipelining but may cause reordering without idempotence
        // With idempotence, ordering is preserved even with value > 1
        
        // Idempotent producer (building block for exactly-once semantics)
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // Prevents duplicate messages during retries
        // Assigns Producer ID (PID) and per-partition sequence numbers
        // Requires acks=all, retries > 0, and max.in.flight.requests.per.connection <= 5
        
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-producer-1");
        // Unique identifier for transactional producer
        // Enables atomic writes across multiple partitions/topics
        // Must be unique per producer instance (fencing via epoch)
        
        // Performance configurations
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);
        // Maximum batch size in bytes (64KB)
        // Batch is sent when full OR linger.ms expires, whichever first
        // Larger batches improve throughput, increase latency
        
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        // Time to wait before sending batch even if not full
        // 0 = send immediately (lowest latency, lowest throughput)
        // 10-50ms = good balance for most workloads
        
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // Total memory for buffering records (32MB)
        // Records waiting to be sent to broker
        // If buffer is full, send() blocks until space is available
        
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        // Compression algorithm applied to batches
        // none: no compression (fastest, largest payload)
        // gzip: best compression ratio, high CPU usage
        // snappy: moderate compression, low CPU
        // lz4: good compression, fast decompression (recommended)
        // zstd: best compression ratio, moderate CPU (Kafka 2.1+)
        
        // Monitoring
        props.put(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, 10);
        // Number of samples to keep for metric calculations
        
        props.put(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, 30000);
        // Time window for metric aggregation (30 seconds)
        
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        
        // Initialize transactions (required if transactional.id is set)
        // Acquires Producer ID and epoch from Transaction Coordinator
        producer.initTransactions();
        
        try {
            // Begin transaction for atomic multi-partition writes
            // All sends until commitTransaction() are atomic
            producer.beginTransaction();
            
            for (int i = 0; i < 1000; i++) {
                String key = "order-" + (i % 100);
                // Key determines partition: hash(key) % numPartitions
                // Same key always routes to same partition (ordering guarantee)
                
                String value = "{\"orderId\": \"" + i + "\", \"amount\": " + 
                    (Math.random() * 1000) + "}";
                
                ProducerRecord<String, String> record = 
                    new ProducerRecord<>("order-events", key, value);
                // Topic, key, value; partition determined by key hash
                // Optional: use ProducerRecord(topic, partition, key, value)
                // to pin the partition explicitly (2nd argument)
                
                record.headers().add("correlation-id", 
                    java.util.UUID.randomUUID().toString()
                        .getBytes(java.nio.charset.StandardCharsets.UTF_8));
                // Headers carry metadata without affecting partition routing
                // Useful for tracing, audit, and cross-system correlation
                
                record.headers().add("source-system", 
                    "order-service".getBytes(java.nio.charset.StandardCharsets.UTF_8));
                // Identify the originating service for observability
                // Explicit UTF-8 avoids depending on the platform default charset
                
                Future<RecordMetadata> future = producer.send(record, 
                    (metadata, exception) -> {
                        if (exception != null) {
                            System.err.println("Failed to send record: " + 
                                exception.getMessage());
                        } else {
                            System.out.printf("Sent record to partition %d, " +
                                "offset %d, timestamp %d%n",
                                metadata.partition(), metadata.offset(),
                                metadata.timestamp());
                        }
                    });
                // Async send with callback; callback runs on sender thread
                // For sync send, call future.get() (blocks, lower throughput)
            }
            
            // Commit transaction atomically
            // Writes TxnMarker to all affected partitions
            // Marks transaction as committed in __transaction_state
            producer.commitTransaction();
            
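        } catch (org.apache.kafka.common.errors.ProducerFencedException
                | org.apache.kafka.common.errors.OutOfOrderSequenceException
                | org.apache.kafka.common.errors.AuthorizationException e) {
            // Fatal for this producer: it cannot abort, so only close it (in finally)
            throw e;
        } catch (org.apache.kafka.common.KafkaException e) {
            // Abort rolls back all writes in this transaction
            // Writes become invisible to read_committed consumers
            producer.abortTransaction();
            throw e;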
        } finally {
            // Close releases resources, flushes pending sends
            producer.close();
        }
    }
}

Java Consumer with Advanced Configurations

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;

public class KafkaConsumerExample {
    
    private static final AtomicBoolean running = new AtomicBoolean(true);
    
    public static void main(String[] args) {
        Properties props = new Properties();
        
        // Core configurations
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
            "kafka-broker-0:9092,kafka-broker-1:9092,kafka-broker-2:9092");
        // Initial broker list; consumer discovers full cluster from metadata
        
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processing-service");
        // Consumer group identifier
        // Partitions are assigned among consumers with same group.id
        
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
            StringDeserializer.class.getName());
        // Deserializer for message keys; must match producer's key serializer
        
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
            StringDeserializer.class.getName());
        // Deserializer for message values; must match producer's value serializer
        
        // Consumer behavior
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // false = manual offset commit after processing (at-least-once delivery)
        // true = auto-commit every auto.commit.interval.ms (default 5000ms)
        // Auto-commit may lose messages if processing fails after the commit
        
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // earliest: read from beginning if no committed offset exists
        // latest: read from end (new messages only)
        // none: throw exception if no committed offset
        
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        // Maximum records returned per poll() call
        // Lower values reduce poll loop latency
        // Higher values improve throughput
        
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
        // Maximum time between poll() calls before consumer is evicted
        // If processing takes longer, consumer is considered dead
        // Increase for slow processing; decrease for fast detection
        
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000);
        // Maximum time without heartbeat before consumer is considered dead
        // Detected by group coordinator; triggers rebalance
        
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 15000);
        // Heartbeat sent to coordinator periodically
        // Should be <= session.timeout.ms / 3 for reliable detection
        
        // Isolation level for transactional reads
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        // read_committed: only see records from committed transactions
        // read_uncommitted: see all records including uncommitted
        // Required for exactly-once semantics end-to-end
        
        // Partition assignment strategy
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
            "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        // CooperativeStickyAssignor: incremental rebalancing (recommended)
        //   Only revokes partitions that need to move
        // RangeAssignor: assigns contiguous partition ranges per topic (first in the default list)
        // RoundRobinAssignor: distributes partitions evenly
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        
        // Subscribe to topics with rebalance listener
        consumer.subscribe(Arrays.asList("order-events", "payment-events"), 
            new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // Called before partitions are revoked (rebalance start)
                    // Commit pending offsets to avoid reprocessing
                    System.out.println("Partitions revoked: " + partitions);
                    consumer.commitSync(Duration.ofSeconds(30));
                }
                
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // Called after new partitions are assigned
                    // Seek to specific offsets if needed
                    System.out.println("Partitions assigned: " + partitions);
                }
            });
        
        try {
            while (running.get()) {
                // poll() fetches records and maintains group membership
                // Timeout controls how long to wait if no records available
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                
                for (ConsumerRecord<String, String> record : records) {
                    processRecord(record);
                }
                
                // Commit offsets after successful processing
                // commitSync: blocks until successful or timeout
                // commitAsync: non-blocking, may fail silently
                if (!records.isEmpty()) {
                    consumer.commitSync(Duration.ofSeconds(30));
                }
            }
        } finally {
            // Close releases consumer from group, triggers rebalance
            // May want to delay close for graceful shutdown
            consumer.close();
        }
    }
    
    private static void processRecord(ConsumerRecord<String, String> record) {
        System.out.printf("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s%n",
            record.topic(), record.partition(), record.offset(), 
            record.key(), record.value());
        // Access record.timestamp() for event-time processing
        // Access record.headers() for message metadata
    }
}

Python Producer Example

from kafka import KafkaProducer
import json
import time

# Initialize producer with configuration
producer = KafkaProducer(
    bootstrap_servers=['kafka-broker-0:9092', 'kafka-broker-1:9092'],
    # List of broker addresses used for initial metadata discovery
    
    key_serializer=lambda k: k.encode('utf-8'),
    # Serialize key to bytes; determines partition via hash
    
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    # Serialize value to JSON bytes
    
    acks='all',
    # Wait for all ISR replicas to acknowledge
    # Accepted values: 0 (no ack), 1 (leader only), 'all' (all ISR)
    
    retries=5,
    # Retry transient failures up to 5 times
    
    retry_backoff_ms=100,
    # 100ms delay between retries
    
    batch_size=65536,
    # 64KB batch size for network efficiency
    
    linger_ms=10,
    # Wait up to 10ms to fill batch
    
    compression_type='lz4',
    # Compress batches with lz4 (good balance of speed/ratio)
    
    max_in_flight_requests_per_connection=5,
    # Allow 5 concurrent requests per broker (pipelining)
    
    enable_idempotence=True,
    # Prevent duplicate messages during retries
    # Only recent kafka-python releases accept this argument; older ones reject it
    
    buffer_memory=33554432,
    # 32MB buffer for pending records
    
    request_timeout_ms=30000,
    # 30s timeout for broker responses
)

# Send messages with callback
for i in range(1000):
    key = f"order-{i % 100}"
    value = {
        "orderId": str(i),
        "amount": round(1000.0, 2),
        "timestamp": int(time.time() * 1000)
    }
    
    future = producer.send(
        topic='order-events',
        key=key,
        value=value,
        headers=[
            ("correlation-id", str(i).encode('utf-8')),
            ("source", b"python-producer")
        ]
    )
    
    # Add callbacks for async result handling.
    # RecordMetadata fields are plain attributes, not methods.
    future.add_callback(lambda metadata: print(
        f"Sent to partition {metadata.partition}, "
        f"offset {metadata.offset}, "
        f"timestamp {metadata.timestamp}"
    ))
    
    future.add_errback(lambda exc: print(f"Error: {exc}"))

# Flush ensures all pending records are sent
producer.flush()
producer.close()
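
The producer above relies on the key to choose a partition, which is why 1000 sends over 100 distinct keys keep each order's events together. The sketch below illustrates that idea only. It uses CRC32 as a stand-in hash (an assumption for illustration; Kafka's default partitioner uses murmur2), and `pick_partition` is a hypothetical helper, not a client API.

```python
import zlib


def pick_partition(key: str, num_partitions: int) -> int:
    """Map a key to a partition with a deterministic hash (CRC32 stand-in)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Same key pattern as the producer loop: 1000 sends over 100 distinct keys.
keys = [f"order-{i % 100}" for i in range(1000)]

assignments = {}
for key in keys:
    assignments.setdefault(key, set()).add(pick_partition(key, 6))

# Each key always lands on one partition, so per-key ordering is preserved.
all_keys_stable = all(len(parts) == 1 for parts in assignments.values())
print(all_keys_stable)
```

Because the mapping depends on the partition count, changing the number of partitions on a live topic generally moves keys to different partitions.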

Python Consumer Example

from kafka import ConsumerRebalanceListener, KafkaConsumer, OffsetAndMetadata
import json

# Initialize consumer with configuration
consumer = KafkaConsumer(
    'order-events',
    # Topic(s) to subscribe to; can be list for multiple topics
    
    bootstrap_servers=['kafka-broker-0:9092', 'kafka-broker-1:9092'],
    # Initial broker list
    
    group_id='order-processing-service',
    # Consumer group for partition assignment
    
    key_deserializer=lambda k: k.decode('utf-8') if k else None,
    # Deserialize key from bytes
    
    value_deserializer=lambda v: json.loads(v.decode('utf-8')) if v else None,
    # Deserialize value from JSON bytes
    
    auto_offset_reset='earliest',
    # 'earliest': read from beginning if no offset
    # 'latest': read from end if no offset
    # 'none': raise exception if no offset
    
    enable_auto_commit=False,
    # Disable auto-commit for manual offset management
    
    isolation_level='read_committed',
    # Only read committed transactional records
    
    max_poll_records=500,
    # Max records per poll() call
    
    max_poll_interval_ms=300000,
    # Max time between polls before eviction (300s)
    
    session_timeout_ms=45000,
    # Consumer failure detection timeout
    
    heartbeat_interval_ms=15000,
    # Heartbeat frequency to coordinator
    
    fetch_min_bytes=1,
    # Minimum bytes to fetch (batch optimization)
    
    fetch_max_wait_ms=500,
    # Max wait if fetch_min_bytes not reached
)

# Subscribe with a rebalance listener.
# kafka-python expects a ConsumerRebalanceListener subclass (from the kafka package).
class LoggingRebalanceListener(ConsumerRebalanceListener):
    def on_partitions_revoked(self, revoked):
        print(f"Partitions revoked: {revoked}")
        # Commit current positions before losing ownership
        consumer.commit()

    def on_partitions_assigned(self, assigned):
        print(f"Partitions assigned: {assigned}")

consumer.subscribe(
    topics=['order-events', 'payment-events'],
    listener=LoggingRebalanceListener()
)

# Process records
try:
    while True:
        records = consumer.poll(timeout_ms=100)
        
        for tp, records_list in records.items():
            for record in records_list:
                print(f"Topic: {record.topic}, "
                      f"Partition: {record.partition}, "
                      f"Offset: {record.offset}, "
                      f"Key: {record.key}, "
                      f"Value: {record.value}")
                
                # Process record here
                # After processing, commit offset
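                # kafka-python's synchronous commit is commit(), not commitSync().
                # Newer releases take a third leader_epoch field (-1 = unknown);
                # older ones accept only (offset, metadata).
                consumer.commit({
                    tp: OffsetAndMetadata(record.offset + 1, "processed", -1)
                })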
                
except KeyboardInterrupt:
    pass
finally:
    # Always leave the group cleanly, not only on Ctrl+C
    consumer.close()
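
The loop above commits after every record, which costs one broker round trip per message. A common alternative is to commit once per polled batch, using the highest processed offset plus one for each partition. The sketch below shows only that bookkeeping. `Record` and `TopicPartition` are hypothetical stand-ins for the client's types, and `offsets_to_commit` is an illustrative helper, not a library function.

```python
from collections import namedtuple

# Hypothetical stand-ins for the client's record and partition types.
Record = namedtuple("Record", ["topic", "partition", "offset"])
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def offsets_to_commit(batch):
    """Return {partition: next offset to read} for one polled batch."""
    result = {}
    for tp, records in batch.items():
        if records:
            # The committed offset is the position of the NEXT record to consume.
            result[tp] = max(r.offset for r in records) + 1
    return result


tp0 = TopicPartition("order-events", 0)
tp1 = TopicPartition("order-events", 1)
batch = {
    tp0: [Record("order-events", 0, o) for o in (41, 42, 43)],
    tp1: [Record("order-events", 1, 7)],
}

commits = offsets_to_commit(batch)
print(commits[tp0], commits[tp1])  # 44 8
```

The trade-off is a larger replay window: if the consumer crashes mid-batch, every record since the last commit is delivered again.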

Performance Metrics

| Metric | Single Broker | 3-Broker Cluster | 6-Broker Cluster |
|---|---|---|---|
| Throughput (writes/sec) | 100K | 300K | 600K |
| Throughput (reads/sec) | 200K | 600K | 1.2M |
| Latency (p99) | 5ms | 10ms | 15ms |
| Latency (p999) | 15ms | 25ms | 40ms |
| Message Size (avg) | 1KB | 1KB | 1KB |
| Replication Lag | 0ms | <100ms | <200ms |
| Recovery Time | 0 | 30s | 60s |
| Disk Usage | 1TB | 3TB | 6TB |
| Memory (JVM Heap) | 6GB | 6GB | 6GB |
| Network Bandwidth | 1Gbps | 3Gbps | 6Gbps |

| Broker Configuration | Default | Recommended | Impact |
|---|---|---|---|
| num.network.threads | 3 | 8 | Network I/O throughput |
| num.io.threads | 8 | 16 | Disk I/O throughput |
| socket.send.buffer.bytes | 100KB | 1MB | Network send throughput |
| socket.receive.buffer.bytes | 100KB | 1MB | Network receive throughput |
| log.flush.interval.messages | Long.MAX_VALUE | 10000 | Durability vs performance |
| log.retention.hours | 168 (7 days) | Varies | Storage cost |

Best Practices

  1. Partition Count Planning: Start with partitions = (target_throughput / broker_throughput) * replication_factor. Monitor consumer lag to determine if rebalancing is needed. Avoid over-partitioning (max 200K partitions per cluster for ZooKeeper, 2M for KRaft).

  2. Replication Factor: Always use replication-factor ≥ 3 for production topics. Set min.insync.replicas=2 to ensure at least two replicas acknowledge writes before acknowledging to the producer. This prevents data loss during single broker failure.

  3. Consumer Group Sizing: Ensure number of consumers ≤ number of partitions. More consumers than partitions results in idle consumers. Use CooperativeStickyAssignor for incremental rebalancing to minimize pause time during scaling events.

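  4. Offset Management: Disable auto-commit (enable.auto.commit=false) and commit offsets explicitly after successful processing, so that a crash cannot skip unprocessed records. This gives at-least-once delivery: a restart may replay records since the last commit, so processing should be idempotent. Use commitSync where the commit must be confirmed (for example before shutdown or in a rebalance callback) and commitAsync for routine commits in the poll loop.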

  5. Broker Configuration: Use dedicated disks for Kafka data (avoid shared with OS). Configure num.io.threads and num.network.threads based on CPU cores. Set log.flush.interval.messages and log.flush.interval.ms carefully to balance durability with performance.

  6. Topic Configuration: Use cleanup.policy=compact for topics that need to retain latest state. Set retention.ms and retention.bytes appropriately based on data volume and replay requirements. Use compression.type=lz4 for optimal throughput-to-latency ratio.

  7. Monitoring: Monitor consumer lag, ISR shrink/expand events, under-replicated partitions, and disk usage. Use tools like Burrow, Cruise Control, or Confluent Control Center. Set up alerts for ISR shrinks and under-replicated partitions.

  8. Security: Enable SSL/TLS for inter-broker and client-broker communication. Use SASL for authentication (SCRAM-SHA-256 or SASL/PLAIN). Implement ACLs for authorization. Use separate listeners for internal and external traffic.

  9. ZooKeeper/KRaft: For clusters > 100K partitions, consider migrating to KRaft mode to eliminate ZooKeeper dependency. Use dedicated ZooKeeper nodes (3 or 5) with sufficient I/O capacity. KRaft provides faster controller failover and better metadata throughput.

  10. Data Retention: Implement tiered storage for long-term retention. Use log compaction for topics that serve as materialized views. Archive to object storage (S3, GCS) for compliance requirements. Set retention.bytes per partition to cap storage usage.
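
Items 1 and 2 above involve simple arithmetic that is easy to get wrong under pressure. The following sketch applies the sizing formula from item 1 as written and derives how many replicas can be lost under the settings in item 2. Both function names are hypothetical, and the throughput figures are illustrative inputs in messages per second rather than measurements.

```python
import math


def estimate_partitions(target_throughput, broker_throughput, replication_factor):
    """Starting partition count per the sizing formula in item 1, rounded up."""
    if broker_throughput <= 0:
        raise ValueError("broker_throughput must be positive")
    return math.ceil(target_throughput / broker_throughput * replication_factor)


def tolerated_replica_losses(replication_factor, min_insync_replicas):
    """Replicas that may drop out while acks=all writes still succeed."""
    if min_insync_replicas > replication_factor:
        raise ValueError("min.insync.replicas cannot exceed the replication factor")
    return replication_factor - min_insync_replicas


print(estimate_partitions(600_000, 100_000, 3))  # 18
print(tolerated_replica_losses(3, 2))            # 1
```

Treat the partition estimate as a starting point to validate with load tests, since consumer-side throughput can be the tighter constraint.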

Key Takeaways:

  • Kafka partitions are the unit of parallelism; throughput scales linearly with partition count up to broker limits
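  • The ISR mechanism balances durability and availability: with acks=all, min.insync.replicas sets the smallest ISR for which writes are still accepted, so it protects acknowledged data at the cost of rejecting writes when too many replicas are down
  • Within a consumer group each partition is assigned to exactly one consumer, while a consumer may own several partitions; consumers beyond the partition count sit idle
  • Ordering is guaranteed only within a partition; records that must stay in order relative to each other need the same key so they are routed to the same partition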
  • Replication lag should be monitored and kept under replica.lag.time.max.ms to avoid ISR shrinkage
  • Log segments use append-only files with index files for efficient offset lookup; page cache provides read acceleration
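
The consumer-group takeaway can be checked with a toy assignment. The sketch below uses a simplified round-robin distribution, which is an assumption for illustration and not the behaviour of any specific Kafka assignor; `assign_partitions` is a hypothetical helper.

```python
def assign_partitions(num_partitions, consumers):
    """Distribute partition ids over consumers round-robin (simplified)."""
    if not consumers:
        raise ValueError("at least one consumer is required")
    assignment = {consumer: [] for consumer in consumers}
    for partition in range(num_partitions):
        owner = consumers[partition % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# Six consumers competing for four partitions: two of them receive nothing.
assignment = assign_partitions(4, ["c0", "c1", "c2", "c3", "c4", "c5"])
idle = [consumer for consumer, parts in assignment.items() if not parts]
print(idle)  # ['c4', 'c5']
```

With fewer consumers than partitions the same function gives each consumer several partitions, which is the normal operating mode.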

See also: Data Engineering Pipeline patterns (data-engineering/019) | PySpark Structured Streaming (pyspark/11-structured-streaming) | Kafka Producer API (kafka/02) | Exactly-Once Semantics (kafka/04)
