Kafka: Partitions, Consumer Groups, Exactly-Once


LinkedIn & Uber Interview

Deep dive into Apache Kafka internals and best practices

Interview Question

"You have a Kafka topic 'orders' with 12 partitions. A consumer group 'order-processor' has 8 consumers. Explain: (1) How are messages distributed? (2) What happens if you add 4 more consumers? (3) What happens if a consumer crashes? (4) How do you achieve exactly-once processing?"

Difficulty: Hard | Frequently asked at LinkedIn, Uber, Netflix, Stripe


Theoretical Foundation

Kafka Architecture

Kafka Cluster, Topic: orders (3 partitions, replication factor 3)

Broker 1: P0 (Leader), P1 (Follower), P2 (Follower)
Broker 2: P0 (Follower), P1 (Leader), P2 (Follower)
Broker 3: P0 (Follower), P1 (Follower), P2 (Leader)

Partitions

A partition is an ordered, immutable sequence of records. Each record has a sequential id called an offset.

Topic: orders (Partition 0): eight records at offsets 0 through 7, each new record appended after the last.

Key properties:

  • Ordering: Guaranteed within a partition (not across partitions)
  • Immutability: Records cannot be modified after writing
  • Append-only: New records are added to the end
  • Retention: Records are retained for a configurable time

Partition assignment:

$$\text{partition} = \text{hash}(\text{key}) \bmod N_{\text{partitions}}$$

For the orders topic with 12 partitions:

  • Message with key "user_123" → partition hash("user_123") % 12
  • Same key always goes to same partition (ordering guarantee)
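The key-to-partition mapping can be sketched in a few lines. This is an illustration only: `partition_for` is a hypothetical helper, and CRC32 stands in for the hash. Kafka's default partitioner applies murmur2 to the serialized key bytes, so real partition numbers will differ from what this prints.

```python
import zlib

NUM_PARTITIONS = 12  # matches the 'orders' topic in this article


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a key to a partition using hash(key) mod N.

    CRC32 is a stand-in hash chosen because it is deterministic across runs;
    Kafka's default partitioner uses murmur2 instead.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


if __name__ == "__main__":
    # The same key always lands on the same partition.
    for key in ["user_123", "user_456", "user_123"]:
        print(key, "->", partition_for(key))
```

Because the result depends on the partition count, adding partitions to an existing topic can send a key to a different partition than before, which breaks per-key ordering for records written across the change.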

Consumer Groups

A consumer group is a set of consumers that jointly consume a topic. Each partition is assigned to exactly one consumer in the group.

Consumer Group: order-processor (8 consumers, 12 partitions)

Consumer 1: P0, P1
Consumer 2: P2, P3
Consumer 3: P4, P5
Consumer 4: P6, P7
Consumer 5: P8
Consumer 6: P9
Consumer 7: P10
Consumer 8: P11

Assignment algorithm (RangeAssignor, applied per topic):

  1. Sort partitions numerically
  2. Sort consumers lexicographically by member ID
  3. Give each consumer a contiguous range of floor(12 / 8) = 1 partition, then give one extra partition to each of the first 12 mod 8 = 4 consumers

For 12 partitions, 8 consumers:

  • Consumers 1-4: 2 partitions each (P0-P7)
  • Consumers 5-8: 1 partition each (P8-P11)

Every consumer is active, but the load is uneven: four consumers handle twice as many partitions as the other four. Adding 4 more consumers brings the group to 12, so each consumer owns exactly one partition.
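The split for 12 partitions and 8 consumers (four consumers with two partitions, four with one) can be reproduced with a range-style assignment. The sketch below is a simplified single-topic model, not the client's actual assignor code; `range_assign` and the `consumer-N` member names are hypothetical.

```python
from typing import Dict, List


def range_assign(num_partitions: int, consumers: List[str]) -> Dict[str, List[int]]:
    """Assign contiguous partition ranges to consumers sorted by member ID."""
    members = sorted(consumers)
    base, extra = divmod(num_partitions, len(members))
    assignment: Dict[str, List[int]] = {}
    start = 0
    for index, member in enumerate(members):
        # The first `extra` members each take one additional partition.
        count = base + (1 if index < extra else 0)
        assignment[member] = list(range(start, start + count))
        start += count
    return assignment


if __name__ == "__main__":
    group = [f"consumer-{n}" for n in range(1, 9)]
    for member, partitions in range_assign(12, group).items():
        print(member, partitions)
```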

Consumer Group Coordination

Kafka Broker (Group Coordinator)
Group ID: order-processor | State: Stable | Members: 8 | Generation: 5

Heartbeat protocol: each consumer sends a heartbeat to the broker every 3s. If no heartbeat arrives for 45s, the consumer is removed from the group.
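The heartbeat rule (3s interval, 45s timeout) can be modeled with a small in-memory class. This is a toy sketch, not the broker's implementation: `GroupCoordinatorSketch` and its methods are hypothetical, and time is passed in explicitly so the behavior is deterministic.

```python
from typing import Dict, List

HEARTBEAT_INTERVAL_S = 3   # how often a consumer is expected to call heartbeat()
SESSION_TIMEOUT_S = 45     # silence longer than this removes the member


class GroupCoordinatorSketch:
    """Tracks the last heartbeat per member and evicts silent members."""

    def __init__(self, session_timeout_s: float = SESSION_TIMEOUT_S) -> None:
        self.session_timeout_s = session_timeout_s
        self.last_heartbeat: Dict[str, float] = {}
        self.generation = 0  # bumped on every membership change (rebalance)

    def heartbeat(self, member_id: str, now: float) -> None:
        if member_id not in self.last_heartbeat:
            self.generation += 1  # a new member joining triggers a rebalance
        self.last_heartbeat[member_id] = now

    def expire_members(self, now: float) -> List[str]:
        expired = [
            member for member, seen in self.last_heartbeat.items()
            if now - seen > self.session_timeout_s
        ]
        for member in expired:
            del self.last_heartbeat[member]
        if expired:
            self.generation += 1  # removing members triggers a rebalance
        return expired
```

For example, if members "a" and "b" both heartbeat at t=0 and only "a" heartbeats again at t=40, calling `expire_members(46)` removes "b" and bumps the generation.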

Exactly-Once Semantics (EOS)

Exactly-once processing ensures each message is processed exactly once, despite failures.

Three components:

  1. Producer Idempotency:
# Producer config
enable.idempotence=true
acks=all
# Integer.MAX_VALUE, written out because a properties file takes a literal
retries=2147483647
max.in.flight.requests.per.connection=5
  2. Transactional Producer:
// Call once, before the first transaction
producer.initTransactions();
producer.beginTransaction();
try {
    producer.send(record1);
    producer.send(record2);
    producer.commitTransaction();
} catch (Exception e) {
    // Fatal errors such as ProducerFencedException require closing
    // the producer instead of aborting
    producer.abortTransaction();
}
  3. Transactional Consumer:
# Consumer config
isolation.level=read_committed

Transaction Flow:

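Exactly-Once Transaction Flow

  1. initTransactions: the producer registers its transactional.id with the transaction coordinator, which fences any older producer using the same id
  2. beginTransaction: marks the start of a transaction on the client (no broker request)
  3. send: records are written to their target topic partitions, while the coordinator tracks transaction state in the internal __transaction_state topic
  4. sendOffsetsToTransaction + commitTransaction: consumed offsets and produced records are committed atomically

On failure: the transaction is aborted, and a read_committed consumer sees either all of its messages or none.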
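The "all messages or none" guarantee can be illustrated with a toy log in which data records carry a transaction id and commit/abort markers close each transaction. This is a simplified model under stated assumptions: the `LogEntry` tuple layout and `read_committed` function are hypothetical, and a real consumer reads only up to the last stable offset rather than scanning the whole log.

```python
from typing import List, Set, Tuple

# (kind, transaction id, payload); kind is "data", "commit", or "abort"
LogEntry = Tuple[str, str, str]


def read_committed(log: List[LogEntry]) -> List[str]:
    """Return payloads only from transactions that have a commit marker."""
    committed: Set[str] = {txn for kind, txn, _ in log if kind == "commit"}
    return [
        payload for kind, txn, payload in log
        if kind == "data" and txn in committed
    ]


if __name__ == "__main__":
    log = [
        ("data", "t1", "enriched-order-1"),
        ("data", "t2", "enriched-order-2"),
        ("data", "t1", "audit-1"),
        ("commit", "t1", ""),
        ("abort", "t2", ""),
    ]
    print(read_committed(log))  # t1's records only; t2 was aborted
```

Records from a transaction with no marker yet (still open) are also hidden, which is why long-running transactions delay read_committed consumers.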

Offset Management

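Consumer offsets track each group's progress per partition. The committed offset is the position of the next record to read, that is, the offset of the last processed record plus one.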

Consumer Group: order-processor (Offset Management)

Partition | Consumer | Offset | Status
0         | Cons-1   | 1234   | Committed
1         | Cons-1   | 1235   | Committed
2         | Cons-2   | 1236   | Committed
...       | ...      | ...    | ...
11        | Cons-8   | 1245   | Committed

Offset commit strategies:

  1. Auto-commit: Kafka commits offsets automatically
  2. Manual commit: Application commits after processing
  3. Exactly-once commit: Transactional commit with processing
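Whether a manual commit happens before or after processing decides what a crash costs. The simulation below is a sketch with hypothetical names (`consume`, `run_with_crash`): it crashes once between the two steps, restarts from the committed offset, and returns everything that was processed.

```python
from typing import List, Optional, Tuple


def consume(messages: List[str], start: int, crash_at: Optional[int],
            commit_first: bool) -> Tuple[List[str], int]:
    """Consume from `start`; optionally crash between the two steps at `crash_at`.

    Returns (records processed in this run, committed offset when the run ended).
    """
    processed: List[str] = []
    committed = start
    for offset in range(start, len(messages)):
        if commit_first:
            committed = offset + 1             # commit, then process
            if offset == crash_at:
                return processed, committed    # crashed before processing
            processed.append(messages[offset])
        else:
            processed.append(messages[offset])  # process, then commit
            if offset == crash_at:
                return processed, committed    # crashed before committing
            committed = offset + 1
    return processed, committed


def run_with_crash(commit_first: bool) -> List[str]:
    """Crash once at offset 1, restart from the committed offset, and finish."""
    messages = ["m0", "m1", "m2"]
    first, committed = consume(messages, 0, crash_at=1, commit_first=commit_first)
    second, _ = consume(messages, committed, crash_at=None, commit_first=commit_first)
    return first + second


if __name__ == "__main__":
    print("process then commit:", run_with_crash(commit_first=False))
    print("commit then process:", run_with_crash(commit_first=True))
```

Processing before committing repeats `m1` after the restart (at-least-once), while committing first skips `m1` entirely (at-most-once). Auto-commit can behave either way depending on timing, which is why the transactional commit exists.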

Rebalancing

When consumers join or leave, partitions are reassigned.

Before rebalance (4 consumers, 12 partitions):
Consumer 0: P0, P1, P2
Consumer 1: P3, P4, P5
Consumer 2: P6, P7, P8
Consumer 3: P9, P10, P11

After Consumers 4 and 5 join (6 consumers, 12 partitions):
Consumer 0: P0, P1
Consumer 1: P2, P3
Consumer 2: P4, P5
Consumer 3: P6, P7
Consumer 4: P8, P9
Consumer 5: P10, P11

Rebalance triggers:

  • Consumer joins group
  • Consumer leaves group (graceful or crash)
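  • Consumer session timeout: no heartbeat within session.timeout.ms (45 seconds here)
  • Consumer exceeds max.poll.interval.ms between poll() calls
  • Topic partitions change (admin operation)
  • A new topic matches the group's subscription (for example, a regex pattern)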

Rebalance protocols:

  1. Eager: Stop-the-world rebalance (all consumers stop)
  2. Cooperative (Incremental): Only affected partitions are reassigned
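  3. Static group membership: Not a protocol of its own. With group.instance.id set, a consumer that restarts within the session timeout keeps its partitions without triggering a rebalance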

⚠️

Common Interview Trap: Many candidates don't understand that rebalancing causes processing pauses. During rebalance, all consumers stop processing. For low-latency applications, use cooperative rebalancing or static group membership.

Partition Assignment Strategies

RangeAssignor (default):

Partitions: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
Consumers: [C0, C1, C2, C3]

C0: [0, 1, 2]  (3 partitions)
C1: [3, 4, 5]  (3 partitions)
C2: [6, 7, 8]  (3 partitions)
C3: [9, 10, 11] (3 partitions)

RoundRobinAssignor:

Partitions: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
Consumers: [C0, C1, C2, C3]

C0: [0, 4, 8]   (3 partitions)
C1: [1, 5, 9]   (3 partitions)
C2: [2, 6, 10]  (3 partitions)
C3: [3, 7, 11]  (3 partitions)
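With a single topic the two strategies give every consumer the same number of partitions, so the difference only shows up across several topics: range assignment is applied per topic, while round-robin deals out all topic partitions together. The sketch below models both under that assumption; the function names and the `payments` topic are hypothetical.

```python
from typing import Dict, List, Tuple

TopicPartition = Tuple[str, int]


def range_assign(topics: Dict[str, int],
                 consumers: List[str]) -> Dict[str, List[TopicPartition]]:
    """Per topic: contiguous ranges, extras go to the first consumers."""
    members = sorted(consumers)
    assignment: Dict[str, List[TopicPartition]] = {m: [] for m in members}
    for topic in sorted(topics):
        base, extra = divmod(topics[topic], len(members))
        start = 0
        for index, member in enumerate(members):
            count = base + (1 if index < extra else 0)
            assignment[member].extend((topic, p) for p in range(start, start + count))
            start += count
    return assignment


def round_robin_assign(topics: Dict[str, int],
                       consumers: List[str]) -> Dict[str, List[TopicPartition]]:
    """Across all topics: deal partitions out one at a time."""
    members = sorted(consumers)
    assignment: Dict[str, List[TopicPartition]] = {m: [] for m in members}
    all_partitions = [(t, p) for t in sorted(topics) for p in range(topics[t])]
    for index, tp in enumerate(all_partitions):
        assignment[members[index % len(members)]].append(tp)
    return assignment


if __name__ == "__main__":
    topics = {"orders": 3, "payments": 3}
    group = ["C0", "C1"]
    print({m: len(tps) for m, tps in range_assign(topics, group).items()})
    print({m: len(tps) for m, tps in round_robin_assign(topics, group).items()})
```

With two 3-partition topics and two consumers, the range sketch gives C0 four partitions and C1 two, because C0 receives the extra partition of every topic. Round-robin yields three each.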

StickyAssignor:

  • Minimizes partition movement during rebalance
  • Preserves existing assignments when possible

CooperativeStickyAssignor:

  • Incremental rebalance
  • Only moves partitions that need to move
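The idea behind sticky assignment can be sketched as: let each surviving consumer keep what it owns up to a balanced quota, then hand out only the leftover partitions. This is a simplified illustration, not Kafka's StickyAssignor algorithm; `sticky_rebalance` and `moved_partitions` are hypothetical helpers, and the input is assumed to contain valid, non-duplicated partition numbers.

```python
from typing import Dict, List


def sticky_rebalance(current: Dict[str, List[int]], members: List[str],
                     num_partitions: int) -> Dict[str, List[int]]:
    """Rebalance while keeping as many existing assignments as possible."""
    base, extra = divmod(num_partitions, len(members))
    # Members that already own the most partitions get the larger quotas,
    # so fewer partitions have to move.
    by_load = sorted(members, key=lambda m: (-len(current.get(m, [])), m))
    quota = {m: base + (1 if i < extra else 0) for i, m in enumerate(by_load)}
    new = {m: sorted(current.get(m, []))[:quota[m]] for m in sorted(members)}
    owned = {p for parts in new.values() for p in parts}
    for partition in range(num_partitions):
        if partition not in owned:
            # Give each leftover partition to the first member below its quota.
            target = next(m for m in sorted(members) if len(new[m]) < quota[m])
            new[target].append(partition)
    return new


def moved_partitions(old: Dict[str, List[int]], new: Dict[str, List[int]]) -> int:
    """Count partitions whose owner changed between two assignments."""
    old_owner = {p: m for m, parts in old.items() for p in parts}
    return sum(1 for m, parts in new.items() for p in parts if old_owner.get(p) != m)


if __name__ == "__main__":
    before = {"C0": [0, 1, 2], "C1": [3, 4, 5], "C2": [6, 7, 8], "C3": [9, 10, 11]}
    after = sticky_rebalance(before, ["C0", "C1", "C2", "C3", "C4", "C5"], 12)
    print(after, "moved:", moved_partitions(before, after))
```

When two consumers join a group of four that owns 12 partitions, this sketch moves only the four partitions the newcomers need; recomputing contiguous ranges from scratch would move more.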

Code Implementation

Kafka Producer Configuration

from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError
import json
import time

# Create topic
admin_client = KafkaAdminClient(bootstrap_servers='kafka:9092')

topics = [
    NewTopic(
        name='orders',
        num_partitions=12,
        replication_factor=3,
        # kafka-python names this argument topic_configs
        topic_configs={
            'retention.ms': str(7 * 24 * 60 * 60 * 1000),  # 7 days
            'segment.ms': str(24 * 60 * 60 * 1000),  # 1 day
            'cleanup.policy': 'delete'
        }
    )
]

try:
    admin_client.create_topics(new_topics=topics)
except TopicAlreadyExistsError:
    pass  # Topic exists from an earlier run; keep its current settings
finally:
    admin_client.close()

# Producer with idempotency
# Note: enable_idempotence needs a kafka-python release with idempotent
# producer support; older releases reject the option as unrecognized.
# compression_type='snappy' needs the python-snappy package installed.
producer = KafkaProducer(
    bootstrap_servers='kafka:9092',
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',
    enable_idempotence=True,
    retries=2147483647,
    max_in_flight_requests_per_connection=5,
    linger_ms=10,
    batch_size=32768,
    compression_type='snappy'
)

# Send messages
for i in range(1000000):
    order = {
        'order_id': f'order_{i}',
        'user_id': f'user_{i % 1000}',
        'amount': float(i * 10),
        'timestamp': time.time()
    }

    # Partition by user_id so each user's orders stay in order
    producer.send('orders', key=order['user_id'], value=order)

    # Flush periodically so buffered records do not pile up
    if (i + 1) % 10000 == 0:
        producer.flush()
        print(f"Sent {i + 1} messages")

# Deliver anything still buffered, then release resources
producer.flush()
producer.close()

Kafka Consumer Configuration

from kafka import KafkaConsumer
import json

# At-least-once consumer: offsets are committed manually after processing.
# read_committed hides records from aborted or still-open transactions
# (needs a kafka-python release that supports isolation_level).
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers='kafka:9092',
    group_id='order-processor',
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    isolation_level='read_committed',
    max_poll_records=500,
    max_poll_interval_ms=300000,
    session_timeout_ms=30000,
    heartbeat_interval_ms=10000
)

def process_order(order):
    """Process a single order"""
    # Business logic here
    print(f"Processing order: {order['order_id']}")

# Process messages with manual offset commit
def process_messages(consumer):
    try:
        while True:
            # Poll for messages; returns {TopicPartition: [records]}
            messages = consumer.poll(timeout_ms=1000)
            if not messages:
                continue

            for tp, msgs in messages.items():
                for msg in msgs:
                    # Process message
                    order = json.loads(msg.value.decode('utf-8'))
                    process_order(order)

            # commit() with no arguments commits the position of every
            # partition in the polled batch, so call it only after all
            # partitions have been processed.
            consumer.commit()

    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()

Transactional Producer

from kafka import KafkaProducer
from kafka.errors import KafkaError
from kafka.structs import OffsetAndMetadata
import json
import time

# Transactional producer
# Assumes a kafka-python release with transaction support (KIP-98);
# check the signatures below against your installed version.
producer = KafkaProducer(
    bootstrap_servers='kafka:9092',
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    transactional_id='order-processor-tx-1'  # Unique per producer instance
)

# Initialize transactions (once, before the first transaction)
producer.init_transactions()

def process_order_with_transaction(order, consumer):
    """Process order with transactional guarantees"""

    try:
        # Begin transaction
        producer.begin_transaction()

        # Process order (business logic)
        enriched_order = enrich_order(order)

        # Send to output topic
        producer.send('enriched-orders', key=order['user_id'], value=enriched_order)

        # Send to audit topic
        producer.send('order-audit', key=order['order_id'], value={
            'order_id': order['order_id'],
            'action': 'processed',
            'timestamp': time.time()
        })

        # Commit consumer offsets in the same transaction.
        # position(tp) is the next offset to fetch, so this assumes every
        # record polled so far has been handled inside this transaction.
        # OffsetAndMetadata fields assumed: (offset, metadata, leader_epoch).
        offsets = {
            tp: OffsetAndMetadata(consumer.position(tp), '', -1)
            for tp in consumer.assignment()
        }
        producer.send_offsets_to_transaction(offsets, 'order-processor')

        # Commit transaction
        producer.commit_transaction()

    except KafkaError:
        # Abort transaction on error, then re-raise with the original traceback
        producer.abort_transaction()
        raise

def enrich_order(order):
    """Enrich order with additional data"""
    # Add user info, product details, etc.
    return {**order, 'enriched': True}

Consumer Group with Cooperative Rebalancing

from confluent_kafka import Consumer

# Consumer with cooperative rebalancing.
# This example uses confluent-kafka because kafka-python has no
# CooperativeStickyAssignor to import; here the strategy is chosen by name.
consumer = Consumer({
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'order-processor',
    'partition.assignment.strategy': 'cooperative-sticky',
    'session.timeout.ms': 45000,
    'heartbeat.interval.ms': 15000,
    'max.poll.interval.ms': 300000
})

# Track partition assignments
def on_partitions_assigned(consumer, partitions):
    # With cooperative rebalancing, only the newly added partitions arrive here
    print(f"Partitions assigned: {partitions}")

def on_partitions_revoked(consumer, partitions):
    # Only the partitions being taken away arrive here
    print(f"Partitions revoked: {partitions}")

# Callbacks are registered through subscribe(), not set as attributes
consumer.subscribe(
    ['orders'],
    on_assign=on_partitions_assigned,
    on_revoke=on_partitions_revoked
)

Monitoring and Metrics

from kafka import KafkaConsumer
from prometheus_client import start_http_server, Gauge
import time

# Prometheus metrics
CONSUMER_LAG = Gauge('kafka_consumer_lag', 'Consumer lag', ['topic', 'partition'])
CONSUMER_RATE = Gauge('kafka_consumer_rate', 'Messages per second', ['topic'])
CONSUMER_ERRORS = Gauge('kafka_consumer_errors', 'Processing errors', ['topic'])

# Start Prometheus metrics server
start_http_server(8000)

def monitor_consumer(consumer):
    """Monitor consumer health and lag

    KafkaConsumer is not thread-safe, so call this from the thread
    that owns the consumer.
    """

    while True:
        # Get partition assignments
        assignments = consumer.assignment()
        total_lag = 0

        # Get end offsets (latest offset per partition) in one request
        end_offsets = consumer.end_offsets(list(assignments)) if assignments else {}

        for tp in assignments:
            # Get current position (offset of the next record to fetch)
            position = consumer.position(tp)

            # Calculate lag
            lag = end_offsets[tp] - position
            total_lag += lag

            # Update Prometheus metrics
            CONSUMER_LAG.labels(topic=tp.topic, partition=tp.partition).set(lag)

        # Log lag if too high (summed here; Gauge objects cannot be added up)
        if total_lag > 10000:
            print(f"WARNING: High consumer lag: {total_lag}")

        time.sleep(10)

Performance Tuning

# ============================================================
# PERFORMANCE TUNING
# ============================================================

# Producer tuning
producer_config = {
    # Batching
    'linger.ms': 100,  # Wait up to 100ms to batch messages
    'batch.size': 65536,  # 64KB batch size
    'buffer.memory': 33554432,  # 32MB buffer
    
    # Compression
    'compression.type': 'snappy',  # or lz4, zstd
    
    # Acknowledgments
    'acks': 'all',  # Wait for all replicas
    'retries': 2147483647,
    
    # Idempotency
    'enable.idempotence': True,
    'max.in.flight.requests.per.connection': 5,
    
    # Network
    'send.buffer.bytes': 131072,  # 128KB
    'receive.buffer.bytes': 131072,  # 128KB
}

# Consumer tuning
consumer_config = {
    # Polling
    'max.poll.records': 1000,
    'max.poll.interval.ms': 300000,
    
    # Fetching
    'fetch.min.bytes': 1,
    'fetch.max.wait.ms': 500,
    'max.partition.fetch.bytes': 1048576,  # 1MB
    
    # Session
    'session.timeout.ms': 30000,
    'heartbeat.interval.ms': 10000,
    
    # Offsets
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
}

💡

Production Tip: Always monitor these Kafka metrics:

  1. Consumer lag: How far behind consumers are
  2. Under-replicated partitions: Replication health
  3. Request latency: Producer/consumer latency
  4. Disk usage: Broker storage health
  5. Network throughput: Bandwidth utilization

Common Follow-Up Questions

Q1: What happens when you have more consumers than partitions?

Extra consumers are idle. For 12 partitions and 16 consumers:

  • 12 consumers get 1 partition each
  • 4 consumers are idle (no partitions assigned)

This is wasteful. Solution: Increase partitions or reduce consumers.
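The arithmetic generalizes to any group size: at most one consumer per partition can be active. A minimal sketch, with `consumer_utilization` as a hypothetical helper:

```python
from typing import Tuple


def consumer_utilization(num_partitions: int, num_consumers: int) -> Tuple[int, int]:
    """Return (active consumers, idle consumers) for one consumer group."""
    active = min(num_partitions, num_consumers)
    idle = num_consumers - active
    return active, idle


if __name__ == "__main__":
    for consumers in (8, 12, 16):
        active, idle = consumer_utilization(12, consumers)
        print(f"{consumers} consumers: {active} active, {idle} idle")
```

For the 12-partition `orders` topic, 8 and 12 consumers leave nobody idle, while 16 consumers leave 4 idle.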

Q2: How do you handle ordering across partitions?

Kafka only guarantees ordering within a partition. For cross-partition ordering:

  1. Use a single partition (loses parallelism)
  2. Use a global ordering service (complex)
  3. Accept eventual consistency (most common)

Q3: What's the difference between at-least-once, at-most-once, and exactly-once?

Semantics     | Duplicates | Data Loss | Complexity
At-most-once  | No         | Yes       | Low
At-least-once | Yes        | No        | Medium
Exactly-once  | No         | No        | High

Q4: How do you handle schema evolution in Kafka?

# Using Avro with Schema Registry
# (AvroProducer is confluent-kafka's legacy Avro API; newer releases also
# provide AvroSerializer in confluent_kafka.schema_registry.avro)
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

# Define schema
schema_str = """
{
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "user_id", "type": "string"}
    ]
}
"""

# Producer with schema: parse the schema string and use it for record values
value_schema = avro.loads(schema_str)
avro_producer = AvroProducer({
    'bootstrap.servers': 'kafka:9092',
    'schema.registry.url': 'http://schema-registry:8081'
}, default_value_schema=value_schema)

# Schema evolution: add a new field with a default, so a reader on v2
# can still decode records written with v1
schema_v2 = """
{
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "user_id", "type": "string"},
        {"name": "currency", "type": "string", "default": "USD"}
    ]
}
"""
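Why the default matters can be shown without a Schema Registry. The sketch below imitates one rule of Avro schema resolution, namely that a field missing from the written record is filled from the reader schema's default. `read_with_schema` is a hypothetical helper operating on plain dicts, not part of any Avro library.

```python
import json
from typing import Any, Dict

SCHEMA_V2 = json.loads("""
{
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "user_id", "type": "string"},
        {"name": "currency", "type": "string", "default": "USD"}
    ]
}
""")


def read_with_schema(record: Dict[str, Any], reader_schema: Dict[str, Any]) -> Dict[str, Any]:
    """Project a decoded record onto the reader schema, applying defaults."""
    result: Dict[str, Any] = {}
    for field in reader_schema["fields"]:
        name = field["name"]
        if name in record:
            result[name] = record[name]
        elif "default" in field:
            result[name] = field["default"]  # field added after the record was written
        else:
            raise ValueError(f"missing field without default: {name}")
    return result


if __name__ == "__main__":
    v1_record = {"order_id": "order_1", "amount": 10.0, "user_id": "user_1"}
    print(read_with_schema(v1_record, SCHEMA_V2))
```

Had `currency` been added without a default, the same call would raise, which is the situation a registry's compatibility check is meant to reject before the schema is ever used.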

⚠️

Critical Consideration: Kafka's exactly-once semantics only works within the Kafka ecosystem (Kafka Streams, Transactional Producers). For external systems, you need idempotent operations and transactional outbox patterns.
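For an external system, one common way to make a write idempotent is to key it on a unique business identifier so a redelivered message becomes a no-op. The sketch below uses an in-memory SQLite table as a stand-in for the external store; the table name and the `open_store` and `apply_order_once` helpers are assumptions for illustration.

```python
import sqlite3
from typing import Any, Dict


def open_store() -> sqlite3.Connection:
    """Create an in-memory store whose primary key rejects duplicate orders."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE processed_orders (order_id TEXT PRIMARY KEY, amount REAL)"
    )
    return conn


def apply_order_once(conn: sqlite3.Connection, order: Dict[str, Any]) -> bool:
    """Insert the order; return False if it was already applied."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO processed_orders (order_id, amount) VALUES (?, ?)",
                (order["order_id"], order["amount"]),
            )
        return True
    except sqlite3.IntegrityError:
        return False  # duplicate delivery: the earlier write already counted


if __name__ == "__main__":
    store = open_store()
    order = {"order_id": "order_42", "amount": 99.5}
    print(apply_order_once(store, order))  # first delivery is applied
    print(apply_order_once(store, order))  # redelivery is ignored
```

Combined with an at-least-once consumer, this gives an effectively-once result in the external store without relying on Kafka transactions.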


Company-Specific Tips

LinkedIn Interview Tips

  • Discuss Kafka's origin at LinkedIn
  • Explain consumer group coordination
  • Mention schema evolution with Schema Registry
  • Talk about Kafka Connect for data integration

Uber Interview Tips

  • Focus on real-time ride matching with Kafka
  • Discuss exactly-once for payment processing
  • Mention Kafka Streams for real-time aggregations
  • Talk about dead letter queues for error handling

Netflix Interview Tips

  • Discuss content delivery with Kafka
  • Explain A/B testing event streaming
  • Mention viewing history real-time processing
  • Talk about multi-region Kafka deployment

ℹ️

Final Takeaway: Kafka is more than just a message queue; it's a distributed streaming platform. Master partitions, consumer groups, and exactly-once semantics to design robust real-time systems. Always consider: (1) partition count, (2) consumer count, (3) rebalancing strategy, and (4) offset management.
