πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Kafka Internals: Partitions, Replicas, ISR, Controller

Apache KafkaArchitecture⭐ Premium

Advertisement

Kafka Internals: Partitions, Replicas, ISR, Controller

Difficulty: Expert | Asked at: Google, LinkedIn, Confluent, Uber

ℹ️Interview Context

This question tests your understanding of Kafka's core architecture. Senior and Staff-level candidates are expected to explain partition leadership, the ISR mechanism, and controller failover with confidence.

The Question

Explain the internal architecture of Apache Kafka. How do partitions, replicas, the ISR (In-Sync Replicas) set, and the controller interact? Describe what happens when a broker fails and how the system maintains data consistency.

Core Architecture Overview

Kafka's architecture is built on a distributed log abstraction. At the highest level:

Architecture Diagram
Producer β†’ Broker Cluster β†’ Consumer Group
              ↓
    Controller (one broker)
    ↓
    Partition Leaders + Follows

Partition Model

Each topic is divided into P partitions. Partition assignment follows:

partition=hash(key)mod  P\text{partition} = \text{hash(key)} \mod P

For a topic with replication factor RR, each partition has RR replicas spread across different brokers. One replica is the leader and Rβˆ’1R-1 are followers.

Architecture Diagram
Topic: orders (P=3, R=3)
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  Partition 0    Partition 1    Partition 2      β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”‚
β”‚  β”‚ B1 (L)  β”‚   β”‚ B2 (L)  β”‚   β”‚ B3 (L)  β”‚      β”‚
β”‚  β”‚ B2 (F)  β”‚   β”‚ B3 (F)  β”‚   β”‚ B1 (F)  β”‚      β”‚
β”‚  β”‚ B3 (F)  β”‚   β”‚ B1 (F)  β”‚   β”‚ B2 (F)  β”‚      β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

⚠️Key Insight

All reads and writes go through the partition leader. Followers exist purely for fault tolerance and read scaling (when configured).

In-Sync Replicas (ISR)

The ISR is the set of replicas that are fully caught up with the leader. Formally:

ISR={r∈Replicas:lag(r)≀replica.lag.time.max.ms}\text{ISR} = \{r \in \text{Replicas} : \text{lag}(r) \leq \text{replica.lag.time.max.ms}\}

Where lag is measured as the time difference between the leader's last write and the follower's replication offset.

Key properties of ISR:

  • Every replica starts in the ISR
  • A replica is removed if it falls behind by replica.lag.time.max.ms (default 30s)
  • A replica is re-added once it catches up
  • The ISR must be acknowledged for acks=all

ISR Shrinking and Expansion Events

# Pseudocode for ISR management
class ISRManager:
    def __init__(self, leader, replicas, lag_threshold_ms=30000):
        self.leader = leader
        self.replicas = replicas
        self.isr = set(replicas)
        self.lag_threshold = lag_threshold_ms
    
    def check_isr(self, current_time_ms):
        """Called periodically by the leader"""
        new_isr = set()
        for replica in self.replicas:
            last_caught_up = replica.last_fetch_time_ms
            if current_time_ms - last_caught_up <= self.lag_threshold:
                new_isr.add(replica)
        
        removed = self.isr - new_isr
        added = new_isr - self.isr
        
        if removed:
            log.warn(f"ISR shrinking: removed {removed}")
        if added:
            log.info(f"ISR expanding: added {added}")
        
        self.isr = new_isr
        return self.isr

Controller Election

The controller is a single broker responsible for:

  1. Partition leader election
  2. Topic creation/deletion
  3. Broker registration/deregistration
  4. ISR management coordination

Controller Election Protocol

Architecture Diagram
1. All brokers watch /controller in ZooKeeper
2. When controller dies:
   a. ZooKeeper ephemeral node /controller deleted
   b. All brokers receive watch notification
   c. Each broker attempts sequential write to /controller
   d. First successful writer becomes controller
3. New controller:
   a. Reads all partition metadata
   b. For each partition where leader was dead:
      - Selects new leader from ISR
      - Updates /brokers/topics/<topic>/partitions/<p>/state
   c. Broadcasts leadership changes

⚠️ZooKeeper vs KRaft

In KRaft mode (Kafka 3.4+), the controller quorum replaces ZooKeeper entirely. The metadata log is replicated across controller nodes using Raft consensus, eliminating the single-point-of-failure concern of ZK.

Leader Election Process

When a partition leader fails, the controller initiates election:

class LeaderElection:
    def __init__(self, partition, isr_list, preferred_replicas):
        self.partition = partition
        self.isr = isr_list
        self.preferred = preferred_replicas
    
    def elect_leader(self):
        """Offline leader election"""
        # Priority: preferred replica order within ISR
        for replica in self.preferred:
            if replica in self.isr and replica.is_alive():
                return replica
        
        # Fallback: any live ISR member
        for replica in self.isr:
            if replica.is_alive():
                return replica
        
        # No available replica
        raise NoAvailableLeader(self.partition)
    
    def elect_with_unclean_leader(self):
        """Unclean leader election (dangerous)"""
        # Allows non-ISR replica to become leader
        # Risk: data loss
        for replica in self.partition.replicas:
            if replica.is_alive():
                return replica
        raise NoAvailableLeader(self.partition)

Leader Election Timeline

Architecture Diagram
T+0ms:    Leader broker (B1) crashes
T+0-50ms: ZooKeeper session timeout / heartbeat failure
T+50ms:   Controller notified via Watcher
T+50-100ms: Controller reads partition state from ZK
T+100-150ms: Controller selects new leader from ISR
T+150-200ms: Controller writes new state to ZK
T+200-250ms: New leader begins serving requests
T+250ms:   Producers/consumers updated via metadata refresh

Replication Protocol

Followers pull data from the leader using a fetch-based protocol:

FetchSize=min⁑(available,max.partition.fetch.bytes)\text{FetchSize} = \min(\text{available}, \text{max.partition.fetch.bytes})
class ReplicationFetcher:
    def __init__(self, leader, follower, max_fetch_bytes=1048576):
        self.leader = leader
        self.follower = follower
        self.max_fetch_bytes = max_fetch_bytes
        self.fetch_offset = follower.log_end_offset
    
    def fetch_loop(self):
        while self.follower.is_running():
            # Request batch from leader
            fetch_request = FetchRequest(
                topic=self.partition.topic,
                partition=self.partition.id,
                offset=self.fetch_offset,
                max_bytes=self.max_fetch_bytes
            )
            
            response = self.leader.fetch(fetch_request)
            
            if response.has_data():
                # Append to local log
                bytes_written = self.follower.append(
                    response.records,
                    response.offset
                )
                
                # Update high watermark
                if response.high_watermark > self.follower.high_watermark:
                    self.follower.high_watermark = response.high_watermark
                
                self.fetch_offset = response.high_watermark
            
            time.sleep(0.1)  # Backoff if no data

High Watermark and Data Consistency

The high watermark (HW) is the offset of the last committed message that all ISR members have replicated:

HW=min⁑(log.end.offset(r))βˆ€r∈ISR\text{HW} = \min(\text{log.end.offset}(r)) \quad \forall r \in \text{ISR}
Architecture Diagram
Leader Log:    [0] [1] [2] [3] [4] [5]
HW = 4:        ─────────────────────┐
                                     β”‚
Follower A:    [0] [1] [2] [3] [4] [5]  (caught up)
Follower B:    [0] [1] [2] [3]          (behind)
HW = 4 because Follower B only has up to offset 3
Wait: HW = min(5, 4) = 4? No, HW = min(end offsets of ISR)
Follower B end = 3, so HW = 3

ℹ️Read Visibility

Consumers with isolation.level=read_committed only see messages up to the high watermark. This prevents reading uncommitted or partially replicated data.

Complete Example: Producer-Acks-Broker Flow

from kafka import KafkaProducer
from kafka.errors import NotEnoughReplicas, NotEnoughReplicasAfterAppend
import time

def produce_with_acks():
    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        acks='all',              # Wait for all ISR
        retries=5,
        max_in_flight_requests_per_connection=1,  # Ordering
        enable_idempotence=True,  # Exactly-once
        linger_ms=5,              # Batch for 5ms
        batch_size=16384,         # 16KB batches
        compression_type='snappy',
        request_timeout_ms=30000
    )
    
    for i in range(1000):
        message = f"event-{i}"
        future = producer.send(
            topic='user-events',
            key=f"user-{i % 100}".encode(),
            value=message.encode(),
            headers=[('source', b'web-app')]
        )
        
        try:
            record_metadata = future.get(timeout=30)
            print(f"Sent to partition {record_metadata.partition} "
                  f"offset {record_metadata.offset}")
        except NotEnoughReplicas:
            print("Not enough replicas available")
        except NotEnoughReplicasAfterAppend:
            print("Not enough replicas after append")
        except Exception as e:
            print(f"Produce failed: {e}")
    
    producer.flush()
    producer.close()

Broker Failure Scenarios Matrix

ScenarioImpactRecovery TimeData Loss Risk
Follower failureISR shrinks, writes continueMinutes (auto-rejoin)None
Leader failurePartition unavailableSeconds (controller election)None (if unclean=false)
Controller failureNew controller electedSeconds (ZK election)None
Multiple ISR failuresMay block writes if acks=allUntil replicas rejoinNone (or data loss if unclean=true)
Rack failureMultiple brokers downManual intervention possibleDepends on replica placement

Performance Formula

The effective write throughput is bounded by:

Throughputwrite=min⁑(Producerrate,Brokerdisk_bandwidthR,Networkbandwidth)\text{Throughput}_{write} = \min\left(\text{Producer}_{rate}, \frac{\text{Broker}_{disk\_bandwidth}}{R}, \text{Network}_{bandwidth}\right)

Where RR is the replication factor because each write is replicated RR times.

KRaft Mode Architecture

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚           Controller Quorum (3 nodes)       β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”                β”‚
β”‚  β”‚ C1  β”‚  β”‚ C2  β”‚  β”‚ C3  β”‚  ← Raft Log   β”‚
β”‚  β”‚(L)  β”‚  β”‚(F)  β”‚  β”‚(F)  β”‚                β”‚
β”‚  β””β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”˜                β”‚
β”‚       ↓ Metadata Changes ↓                  β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚           Broker Quorum (N nodes)           β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”      β”‚
β”‚  β”‚ B1  β”‚  β”‚ B2  β”‚  β”‚ B3  β”‚  β”‚ B4  β”‚      β”‚
β”‚  β””β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”˜      β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

ℹ️Production Tip

In KRaft mode, metadata changes are committed via Raft consensus in ~10-50ms, compared to ZooKeeper's 100-200ms. This significantly reduces partition leader election times and improves cluster scalability.

Key Metrics to Monitor

# Critical Kafka metrics for architecture health
metrics_to_watch = {
    # ISR metrics
    "kafka.server:type=ReplicaManager,name=IsrShrinksPerSec": "ISR shrink rate",
    "kafka.server:type=ReplicaManager,name=IsrExpandsPerSec": "ISR expand rate",
    "kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions": "Under-replicated count",
    
    # Controller metrics
    "kafka.controller:type=KafkaController,name=ActiveControllerCount": "Controller active",
    "kafka.controller:type=KafkaController,name=OfflinePartitionsCount": "Offline partitions",
    "kafka.controller:type=ControllerChannelManager,name=QueueSize": "Controller queue",
    
    # Partition metrics
    "kafka.server:type=ReplicaManager,name=PartitionCount": "Total partitions",
    "kafka.log:type=Log,name=Size": "Log size per partition"
}

Common Follow-Up Questions

  1. Why does Kafka use pull-based replication instead of push?

    • Pull allows followers to control fetch rate
    • Avoids overwhelming slow followers
    • Enables natural backpressure
    • Simplifies leader logic (stateless replication)
  2. What happens during a controlled shutdown?

    • Broker moves leadership of its partitions to other ISR members first
    • Then shuts down cleanly
    • No data loss, minimal unavailability window
  3. How does rack awareness work?

    • Brokers configured with broker.rack property
    • Controller ensures replicas span different racks
    • Protects against rack-level failures

⚠️Critical Concept

The ISR mechanism is what makes Kafka's acks=all both safe and performant. If ISR always contained all replicas, it would be equivalent to synchronous replication. The ISR allows Kafka to be flexibleβ€”sometimes synchronous (all replicas in sync), sometimes asynchronous (some replicas lagging).

Advertisement