Consumer Groups: Rebalancing, Static Groups, Cooperative
Difficulty: Senior | Asked at: LinkedIn, Uber, Airbnb, Spotify
ℹ️ Interview Context
Consumer group rebalancing is one of the most misunderstood Kafka concepts. Interviewers want to see you understand the different rebalance protocols and when to use each one.
The Question
Explain how Kafka consumer group rebalancing works. What are the different rebalance protocols? How do static group membership and cooperative rebalancing improve system stability?
Consumer Group Architecture
              ┌─────────────────────────┐
              │     Consumer Group      │
              │   group.id: "orders"    │
              └─────────────────────────┘
                           │
         ┌─────────────────┼─────────────────┐
         │                 │                 │
  ┌─────────────┐   ┌─────────────┐   ┌─────────────┐
  │ Consumer 1  │   │ Consumer 2  │   │ Consumer 3  │
  │             │   │             │   │             │
  │   P0, P1    │   │   P2, P3    │   │   P4, P5    │
  └─────────────┘   └─────────────┘   └─────────────┘
Topic: orders (6 partitions)
Partition assignment: RangeAssignor or RoundRobinAssignor
Rebalance Trigger Events
import logging
import time

log = logging.getLogger(__name__)

# Rebalance is triggered by:
triggers = {
    "consumer_join": "New consumer joins the group",
    "consumer_leave": "Consumer crashes or sends LeaveGroup",
    "heartbeat_timeout": "Consumer fails to heartbeat within session.timeout.ms",
    "subscribe_change": "Consumer changes topic subscription",
    "partition_change": "Topic partitions added (or a subscribed topic deleted)",
    "group_coordinator_change": "Coordinator broker changes",
}

# Heartbeat mechanism (simplified model of the client's heartbeat loop)
class ConsumerHeartbeat:
    def __init__(self, session_timeout_ms=30000, heartbeat_interval_ms=3000):
        self.session_timeout = session_timeout_ms
        self.heartbeat_interval = heartbeat_interval_ms
        self.last_heartbeat = time.time()
        self.running = True  # set to False to stop the loop

    def send_heartbeat(self):
        """Placeholder for the heartbeat request sent to the group coordinator."""

    def start_heartbeat_loop(self):
        while self.running:
            try:
                self.send_heartbeat()
                self.last_heartbeat = time.time()
            except Exception as e:
                log.error(f"Heartbeat failed: {e}")
            time.sleep(self.heartbeat_interval / 1000)
⚠️ Rebalance Cost
Under the default eager protocol, ALL consumers in the group stop processing during a rebalance. However many consumers and partitions the group has, the entire group halts until the rebalance completes. This is why choosing the right protocol is critical.
Rebalance Protocols
1. Eager Rebalance (Default)
from kafka import KafkaConsumer
from kafka.coordinator.assignors.range import RangePartitionAssignor

# Eager rebalance - default behavior
consumer = KafkaConsumer(
    'orders',
    group_id='my-group',
    bootstrap_servers='localhost:9092',
    # Eager rebalance settings
    session_timeout_ms=30000,
    heartbeat_interval_ms=10000,  # keep at or below 1/3 of session_timeout_ms
    max_poll_interval_ms=300000,
    # Partition assignment strategy (kafka-python takes assignor classes)
    partition_assignment_strategy=[RangePartitionAssignor],
)
How Eager Rebalance Works:
Step 1: Trigger event (e.g., new consumer joins)
Step 2: ALL consumers stop processing
Step 3: ALL consumers revoke their partitions
Step 4: Coordinator collects all consumer subscriptions (JoinGroup)
Step 5: Group leader (one of the consumers) computes the new partition assignment
Step 6: Coordinator distributes the new assignments to all consumers (SyncGroup)
Step 7: All consumers resume processing
Timeline (illustrative, in seconds):
T+0: Event triggers rebalance
T+0-5: All consumers stop, revoke partitions
T+5-10: Group leader computes new assignment
T+10-15: New assignments sent to consumers
T+15: Consumers resume processing
Total downtime: 10-30 seconds typically
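The stop-the-world behavior above can be modeled in a few lines. This is a minimal sketch rather than client code: `range_split` and `eager_rebalance` are hypothetical helpers that only count how many partitions are given up when a fourth consumer joins.

```python
def range_split(partitions, members):
    """Split partitions into contiguous chunks, one per member (range-style)."""
    base, extra = divmod(len(partitions), len(members))
    assignment, start = {}, 0
    for i, member in enumerate(sorted(members)):
        count = base + (1 if i < extra else 0)
        assignment[member] = partitions[start:start + count]
        start += count
    return assignment


def eager_rebalance(current, members):
    """Model one eager rebalance: revoke everything, then reassign from scratch."""
    partitions = sorted(p for owned in current.values() for p in owned)
    revoked = {m: list(owned) for m, owned in current.items()}  # every partition
    return revoked, range_split(partitions, members)


current = {"c1": [0, 1], "c2": [2, 3], "c3": [4, 5]}
revoked, new = eager_rebalance(current, ["c1", "c2", "c3", "c4"])
print(sum(len(v) for v in revoked.values()), "partitions revoked")
print(new)
```

All six partitions are revoked even though only one of them ends up on a different consumer, which is the cost the eager protocol pays for its simplicity.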
2. Cooperative Rebalance (Incremental)
# Sketch using confluent-kafka (librdkafka), where the cooperative
# protocol is selected with the 'cooperative-sticky' strategy name
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    # Cooperative rebalance
    'partition.assignment.strategy': 'cooperative-sticky',  # Must be set
    # Session management
    'session.timeout.ms': 30000,
    'heartbeat.interval.ms': 3000,
})
consumer.subscribe(['orders'])
How Cooperative Rebalance Works:
Step 1: Trigger event (e.g., new consumer joins)
Step 2: All consumers rejoin the group but keep processing the partitions they own
Step 3: Group leader computes the new assignment and the delta against the current one
Step 4: Only partitions being MOVED are revoked from their current owners
Step 5: A second, short rebalance round assigns the revoked partitions to their new owners
Step 6: New owners start processing the moved partitions
Timeline (illustrative, in seconds):
T+0: Event triggers rebalance
T+0-2: Group leader computes delta; affected consumers revoke moved partitions
T+2-5: Second rebalance round assigns the revoked partitions
T+5-7: New owners receive the moved partitions
T+7: New owners resume processing the moved partitions
Total downtime: 2-5 seconds, for the moved partitions only
Other partitions: CONTINUE PROCESSING throughout
ℹ️ Cooperative Advantage
With cooperative rebalancing, if Consumer 3 joins and only needs partitions from Consumer 2, Consumer 1 continues processing uninterrupted. Consumer 2 stops only on the partitions being transferred and keeps processing the rest.
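The revoke-only-what-moves idea can be made concrete with a small set computation. This is a sketch under stated assumptions: `cooperative_rebalance` is a hypothetical helper, the target assignment is given rather than computed, and the five-partition example is chosen so that only one existing consumer is affected.

```python
def cooperative_rebalance(current, target):
    """Return (revoked, retained, newly_assigned) per member.

    Round 1: each member gives up only partitions it owns now but does
             not own in the target assignment.
    Round 2: the freed partitions are handed to their new owners.
    """
    revoked, retained, newly_assigned = {}, {}, {}
    for member, wanted_list in target.items():
        owned = set(current.get(member, []))
        wanted = set(wanted_list)
        revoked[member] = sorted(owned - wanted)
        retained[member] = sorted(owned & wanted)  # processing never stops here
        newly_assigned[member] = sorted(wanted - owned)
    return revoked, retained, newly_assigned


current = {"c1": [0, 1], "c2": [2, 3, 4]}
target = {"c1": [0, 1], "c2": [2, 3], "c3": [4]}  # c3 has just joined
revoked, retained, newly_assigned = cooperative_rebalance(current, target)
print("revoked:", revoked)
print("retained:", retained)
print("newly assigned:", newly_assigned)
```

Only partition 4 is ever without an owner; everything in `retained` keeps flowing through both rebalance rounds.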
3. Static Group Membership
# Static membership - consumer keeps assignment across restarts
# (sketch using confluent-kafka, which exposes group.instance.id)
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    # Static membership
    'group.instance.id': 'consumer-1-fixed',  # Unique per instance
    # Longer session timeout for stability
    'session.timeout.ms': 300000,  # 5 minutes
    # Cooperative rebalance
    'partition.assignment.strategy': 'cooperative-sticky',
})
consumer.subscribe(['orders'])
Static Membership Benefits:
Scenario: Rolling deployment of 10 consumers
Without static membership:
1. Deploy consumer-1 with new version
2. Consumer-1 shuts down gracefully and sends LeaveGroup
3. Rebalance triggered: all remaining consumers paused
4. Consumer-1 rejoins with new version (as a new member)
5. Another rebalance triggered: all 10 consumers paused again
6. Repeat for each consumer...
Total rebalances: up to 20 (two per consumer)
Total pause time: up to 20 × rebalance_time
With static membership:
1. Deploy consumer-1 with new version
2. Consumer-1 shuts down without sending LeaveGroup
3. Coordinator keeps consumer-1's membership and assignment until session.timeout.ms expires
4. No rebalance triggered (session timeout not exceeded)
5. Consumer-1 rejoins with same group.instance.id
6. Coordinator hands the same partitions back to consumer-1
Total rebalances: 0 (if within session timeout)
Total pause time: 0 for the rest of the group (consumer-1's own partitions wait while it restarts)
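The rejoin-within-the-timeout behavior can be modeled with a toy coordinator. This is a sketch, not broker logic: `StaticMembershipCoordinator` is a hypothetical class, time is passed in explicitly as seconds, and every membership change simply increments a rebalance counter.

```python
class StaticMembershipCoordinator:
    """Toy model of how a coordinator treats static members."""

    def __init__(self, session_timeout_s):
        self.session_timeout_s = session_timeout_s
        self.assignments = {}  # group_instance_id -> partitions
        self.last_seen = {}    # group_instance_id -> last contact time (s)
        self.rebalances = 0

    def join(self, instance_id, now, partitions=None):
        if instance_id in self.assignments:
            # Known static member rejoining: same assignment, no rebalance.
            self.last_seen[instance_id] = now
            return self.assignments[instance_id]
        self.assignments[instance_id] = list(partitions or [])
        self.last_seen[instance_id] = now
        self.rebalances += 1
        return self.assignments[instance_id]

    def expire(self, now):
        """Evict members whose session timed out; each eviction rebalances."""
        for instance_id, seen in list(self.last_seen.items()):
            if now - seen > self.session_timeout_s:
                del self.assignments[instance_id]
                del self.last_seen[instance_id]
                self.rebalances += 1


coord = StaticMembershipCoordinator(session_timeout_s=300)
coord.join("consumer-1", now=0, partitions=[0, 1])
coord.expire(now=60)                     # restart took 60 s: inside the timeout
back = coord.join("consumer-1", now=60)  # same instance id comes back
print(back, coord.rebalances)
```

The counter only moves on the first join. A restart that outlasted the 300-second timeout would be evicted by `expire` and cost a rebalance, which is why the timeout has to cover the slowest expected restart.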
Session Timeout Calculation
def calculate_session_timeout(
    processing_time_ms: int,
    num_partitions: int,
    safety_margin: float = 2.0
) -> int:
    """
    Calculate appropriate session timeout.

    Args:
        processing_time_ms: Max time to process one poll() batch
        num_partitions: Partitions assigned to this consumer
        safety_margin: Multiplier for safety

    Returns:
        Recommended session timeout in ms
    """
    # Base processing time
    base_time = processing_time_ms
    # Scale with partitions (more partitions = more work)
    scaled_time = base_time * (1 + num_partitions * 0.1)
    # Apply safety margin
    timeout = int(scaled_time * safety_margin)
    # Minimum 30 seconds
    return max(timeout, 30000)

# Example
timeout = calculate_session_timeout(
    processing_time_ms=5000,  # 5 seconds per poll
    num_partitions=10,
    safety_margin=2.0
)
print(f"Recommended session.timeout.ms: {timeout}")
# Computed value is 20000 ms (20 seconds), so the 30000 ms minimum applies
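⚠️ Common Mistake
Setting session.timeout.ms too low causes frequent rebalances whenever heartbeats are delayed (GC pauses, network blips, or slow processing in clients that heartbeat from the poll loop). Setting it too high delays failure detection. Rule of thumb: 3x your max processing time per poll() batch. In clients that heartbeat from a background thread, slow processing is bounded by max.poll.interval.ms instead, so size that setting with the same margin.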
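A quick pre-deployment check can catch the most common timeout misconfigurations. The sketch below is illustrative: `check_timeouts` is a hypothetical helper, the one-third ratio follows the usual guidance for heartbeat.interval.ms, and the broker bounds stand in for group.min.session.timeout.ms and group.max.session.timeout.ms with assumed values that should be replaced by your cluster's actual settings.

```python
def check_timeouts(session_timeout_ms, heartbeat_interval_ms,
                   broker_min_ms=6000, broker_max_ms=1800000):
    """Return a list of human-readable problems; empty means the values look sane."""
    problems = []
    # Several heartbeats should fit inside one session timeout.
    if heartbeat_interval_ms * 3 > session_timeout_ms:
        problems.append(
            "heartbeat.interval.ms should be at most 1/3 of session.timeout.ms")
    # The broker rejects session timeouts outside its configured range.
    if not broker_min_ms <= session_timeout_ms <= broker_max_ms:
        problems.append(
            "session.timeout.ms is outside the broker's allowed range")
    return problems


print(check_timeouts(30000, 3000))
print(check_timeouts(30000, 15000))
print(check_timeouts(3000, 1000))
```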
Partition Assignment Strategies
Range Assignor
# RangeAssignor: Assigns contiguous partitions to each consumer
# Good for: Topics with similar partition counts
# Example: 6 partitions, 3 consumers
# Consumer 1: P0, P1 (range 0-1)
# Consumer 2: P2, P3 (range 2-3)
# Consumer 3: P4, P5 (range 4-5)
class RangeAssignor:
    def assign(self, partitions, num_consumers):
        """
        Assign contiguous ranges of partitions.
        P0-P1 → C1, P2-P3 → C2, P4-P5 → C3
        """
        assignment = {}
        partitions_per_consumer = len(partitions) // num_consumers
        remainder = len(partitions) % num_consumers
        start = 0
        for consumer_id in range(num_consumers):
            # Extra partition for first 'remainder' consumers
            count = partitions_per_consumer + (1 if consumer_id < remainder else 0)
            end = start + count
            assignment[consumer_id] = partitions[start:end]
            start = end
        return assignment
RoundRobin Assignor
# RoundRobinAssignor: Distributes partitions evenly
# Good for: Spreading partitions of several topics evenly when consumers share the same subscriptions
# Example: 2 topics × 3 partitions, 2 consumers
# Topic A: P0, P1, P2
# Topic B: P0, P1, P2
# Consumer 1 subscribes to both
# Consumer 2 subscribes to both
# RoundRobin:
# Consumer 1: A-P0, A-P2, B-P1
# Consumer 2: A-P1, B-P0, B-P2
class RoundRobinAssignor:
    def assign(self, topic_partitions, num_consumers):
        """
        Round-robin assignment across all partitions.
        """
        all_partitions = []
        for topic, partitions in topic_partitions.items():
            for p in partitions:
                all_partitions.append((topic, p))
        # Sort for determinism
        all_partitions.sort()
        assignment = {i: [] for i in range(num_consumers)}
        for idx, partition in enumerate(all_partitions):
            consumer_id = idx % num_consumers
            assignment[consumer_id].append(partition)
        return assignment
Sticky Assignor
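# StickyAssignor: Minimizes partition movement
# Good for: Reducing rebalance impact
class StickyAssignor:
    def assign(self, current_assignment, partitions, num_consumers):
        """
        Keep partitions on same consumer when possible.
        Only move partitions when absolutely necessary.
        Consumers are identified by index 0..num_consumers-1.
        """
        # Every consumer gets an entry, including ones that just joined
        new_assignment = {c: [] for c in range(num_consumers)}
        # Step 1: Try to keep existing assignments
        for consumer_id, assigned in current_assignment.items():
            if consumer_id in new_assignment:
                new_assignment[consumer_id] = [
                    p for p in assigned if p in partitions
                ]
        # Step 2: Find unassigned partitions
        all_assigned = set()
        for assigned in new_assignment.values():
            all_assigned.update(assigned)
        unassigned = [p for p in partitions if p not in all_assigned]
        # Step 3: Distribute unassigned partitions
        # to least loaded consumers
        for partition in unassigned:
            min_consumer = min(
                new_assignment.keys(),
                key=lambda c: len(new_assignment[c])
            )
            new_assignment[min_consumer].append(partition)
        # Step 4: Move partitions off overloaded consumers until
        # loads differ by at most one
        while True:
            most = max(new_assignment, key=lambda c: len(new_assignment[c]))
            least = min(new_assignment, key=lambda c: len(new_assignment[c]))
            if len(new_assignment[most]) - len(new_assignment[least]) <= 1:
                break
            new_assignment[least].append(new_assignment[most].pop())
        return new_assignment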
ℹ️ Assignor Selection
- RangeAssignor: Best for single-topic subscriptions with similar partition counts
- RoundRobinAssignor: Best for multi-topic subscriptions
- StickyAssignor: Best for minimizing rebalance impact
- CooperativeStickyAssignor: Best for incremental rebalancing (recommended)
CooperativeStickyAssignor
# CooperativeStickyAssignor: Combines cooperative rebalancing with sticky assignment
# This is the recommended strategy for production use
# (sketch using confluent-kafka, where it is selected as 'cooperative-sticky')
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    # Use cooperative sticky assignment; this also selects the cooperative protocol
    'partition.assignment.strategy': 'cooperative-sticky',
    # Session management
    'session.timeout.ms': 45000,
    'heartbeat.interval.ms': 3000,
    # Static membership for rolling deployments
    'group.instance.id': 'consumer-instance-1',
})
consumer.subscribe(['topic-a', 'topic-b', 'topic-c'])

# The cooperative sticky assignor will:
# 1. Compute current assignment
# 2. Compute desired assignment
# 3. Minimize partition movements
# 4. Only revoke partitions that MUST move
# 5. Keep other partitions assigned to their current consumer
Assignment Algorithm Comparison
Scenario: 6 partitions, 3 consumers, Consumer 4 joins
Eager Rebalance:
Step 1: All consumers revoke ALL partitions
Step 2: Compute new assignment
Step 3: Assign new partitions
Result: ALL consumers pause for 10-30 seconds
Cooperative Sticky:
Step 1: Compute delta (which partitions need to move)
Step 2: Only move P5 from C3 to C4 (loads become 2, 2, 1, 1)
Step 3: C3 revokes only P5
Result: Only P5 pauses for 2-5 seconds; C1, C2 and C3 keep processing their other partitions
Sticky (non-cooperative):
Step 1: All consumers revoke ALL partitions
Step 2: Compute assignment minimizing movement
Step 3: Assign new partitions
Result: ALL consumers pause, but assignment is sticky
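⚠️ Migration Warning
When migrating a live group from eager to cooperative rebalancing, do not switch assignors in a single step. With the Java client the documented path is two rolling restarts: first list both the cooperative assignor and the existing eager assignor on every consumer, then remove the eager one in a second pass. Members must always share at least one common assignment strategy; a consumer whose strategies do not overlap with the rest of the group is rejected when it tries to join.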
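One way to see why strategies cannot be mixed freely: the coordinator has to settle on a single assignor that every member supports. The sketch below is a simplified model of that selection, assuming the coordinator first narrows to strategies common to all members and then lets each member vote for its most preferred one; `select_protocol` and the tie-break by name are illustrative choices.

```python
from collections import Counter


def select_protocol(member_protocols):
    """member_protocols: member id -> assignor names in order of preference."""
    lists = list(member_protocols.values())
    common = set(lists[0]).intersection(*lists[1:])
    if not common:
        raise ValueError("members share no assignment strategy")
    # Each member votes for the first strategy in its list that all support.
    votes = Counter(
        next(name for name in prefs if name in common)
        for prefs in lists
    )
    # Ties are broken by name here so the result is deterministic.
    return max(sorted(votes), key=lambda name: votes[name])


mid_restart = {
    "c1": ["cooperative-sticky", "range"],  # already restarted
    "c2": ["range"],                        # still on the old config
    "c3": ["range"],
}
after_first_pass = {m: ["cooperative-sticky", "range"] for m in mid_restart}

print(select_protocol(mid_restart))
print(select_protocol(after_first_pass))
```

While old members are still present the group stays on `range`; once every member lists both strategies, the cooperative assignor wins the vote and the eager one can be dropped safely.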
Consumer Offset Management
Offset Commit Strategies
from kafka import KafkaConsumer
import atexit

consumer = KafkaConsumer(
    'orders',
    group_id='my-group',
    bootstrap_servers='localhost:9092',
    # Auto commit settings
    enable_auto_commit=False,  # Manual commit for control
    auto_commit_interval_ms=5000,  # only used when auto commit is enabled
    # Isolation level
    isolation_level='read_committed'  # For exactly-once with transactions
)

def process_record(record):
    """Placeholder for application-specific processing."""
    print(record.topic, record.partition, record.offset)

# Manual offset commit
def process_messages():
    while True:
        messages = consumer.poll(timeout_ms=1000)
        for topic_partition, records in messages.items():
            for record in records:
                # Process message
                process_record(record)
        # Commit after processing batch
        if messages:
            consumer.commit()
        # Or commit specific offset
        # consumer.commit({
        #     topic_partition: OffsetAndMetadata(
        #         offset=last_offset + 1,
        #         metadata='optional'
        #     )
        # })

# Commit on shutdown
def shutdown_hook():
    consumer.commit()
    consumer.close()

atexit.register(shutdown_hook)
Offset Storage
Consumer offsets stored in __consumer_offsets topic:
Key: {group_id, topic, partition}
Value: {offset, metadata, timestamp}
Example:
Key: {"group": "orders", "topic": "events", "partition": 0}
Value: {"offset": 12345, "metadata": "", "timestamp": 1690000000000}
Consumer reads its last committed offset on startup
to know where to resume processing.
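Because the offsets topic is keyed by group, topic and partition, only the most recent commit per key matters on restart. A minimal in-memory sketch of that idea follows; `OffsetStore` is a hypothetical stand-in for the real topic, not a Kafka API.

```python
class OffsetStore:
    """Toy stand-in for __consumer_offsets: the latest value per key wins."""

    def __init__(self):
        self._latest = {}

    def commit(self, group, topic, partition, offset, metadata="", timestamp=0):
        key = (group, topic, partition)
        self._latest[key] = {
            "offset": offset,
            "metadata": metadata,
            "timestamp": timestamp,
        }

    def fetch(self, group, topic, partition):
        """Return the last committed offset, or None if nothing was committed."""
        record = self._latest.get((group, topic, partition))
        return record["offset"] if record else None


store = OffsetStore()
store.commit("orders", "events", 0, offset=12000)
store.commit("orders", "events", 0, offset=12345, timestamp=1690000000000)
print(store.fetch("orders", "events", 0))
print(store.fetch("orders", "events", 1))
```

A `None` result corresponds to a partition with no committed offset, where the consumer falls back to its auto.offset.reset policy.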
Offset Lag Monitoring
from kafka import TopicPartition

def calculate_offset_lag(consumer, topic):
    """
    Calculate consumer lag for monitoring.
    """
    lag_info = {}
    partitions = consumer.partitions_for_topic(topic) or set()
    tps = [TopicPartition(topic, p) for p in sorted(partitions)]
    # Get end offsets (latest) without moving the consumer's position
    end_offsets = consumer.end_offsets(tps)
    for tp in tps:
        # Get committed offset
        committed = consumer.committed(tp)
        end_offset = end_offsets[tp]
        # Calculate lag
        lag = end_offset - (committed or 0)
        lag_info[tp.partition] = {
            'committed': committed,
            'end_offset': end_offset,
            'lag': lag,
            'lag_percentage': (lag / end_offset * 100) if end_offset > 0 else 0
        }
    return lag_info

# Monitoring example
lag = calculate_offset_lag(consumer, 'orders')
for partition, info in lag.items():
    print(f"Partition {partition}: lag={info['lag']} ({info['lag_percentage']:.1f}%)")
ℹ️ Lag Threshold
Set alerts when lag exceeds 10,000 messages or 1% of total messages. High lag indicates the consumer is falling behind, potentially causing stale data for downstream systems.
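The two thresholds combine into a simple predicate. In this sketch `lag_alert` is a hypothetical helper and the partition's end offset is used as a proxy for "total messages", which is an assumption that ignores retention and compaction.

```python
def lag_alert(lag, end_offset, max_lag=10_000, max_lag_fraction=0.01):
    """Return True when lag breaches either the absolute or the relative limit."""
    if lag > max_lag:
        return True
    return end_offset > 0 and lag / end_offset > max_lag_fraction


print(lag_alert(lag=500, end_offset=1_000_000))
print(lag_alert(lag=20_000, end_offset=10_000_000))
print(lag_alert(lag=200, end_offset=10_000))
```

The relative limit matters for low-volume partitions, where a backlog of a few hundred messages can already represent hours of delay.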
Consumer Performance Formula
def estimate_consumer_throughput(
    max_poll_records: int,
    processing_time_per_record_ms: float,
    num_consumers: int,
    num_partitions: int
) -> dict:
    """
    Estimate consumer throughput capacity.
    """
    # Time per poll batch
    batch_time = processing_time_per_record_ms * max_poll_records
    # Records per second per consumer
    records_per_sec = (max_poll_records * 1000) / max(batch_time, 1)
    # Bottleneck: partitions. Consumers beyond the partition count sit idle
    active_consumers = min(num_consumers, num_partitions)
    # Total capacity
    total_records_per_sec = records_per_sec * active_consumers
    partitions_per_consumer = num_partitions / active_consumers
    return {
        'records_per_sec_per_consumer': records_per_sec,
        'total_records_per_sec': total_records_per_sec,
        'partitions_per_consumer': partitions_per_consumer,
        'recommended_consumers': active_consumers
    }

# Example
result = estimate_consumer_throughput(
    max_poll_records=500,
    processing_time_per_record_ms=1.0,
    num_consumers=6,
    num_partitions=12
)
print(f"Per consumer: {result['records_per_sec_per_consumer']:.0f} records/sec")
print(f"Total: {result['total_records_per_sec']:.0f} records/sec")
Common Rebalance Scenarios
Scenario 1: Consumer crash
Trigger: Heartbeat timeout (session.timeout.ms exceeded)
Behavior: Eager - all consumers rebalance
Behavior: Cooperative - only affected partitions move
Scenario 2: Rolling deployment
Trigger: Consumer sends LeaveGroup on shutdown
Behavior: Without static membership - rebalance per consumer
Behavior: With static membership - no rebalance (within session timeout)
Scenario 3: Partition expansion
Trigger: New partitions added to topic
Behavior: Rebalance to distribute new partitions
Behavior: Only new partitions assigned (cooperative)
Scenario 4: Consumer processing too slow
Trigger: max.poll.interval.ms exceeded
Behavior: Consumer kicked from group
Behavior: Rebalance triggered
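Scenario 4 is usually avoided by sizing the poll batch so that it always finishes inside max.poll.interval.ms. The helper below is a rough sketch: `max_safe_poll_records` and the 50% headroom factor are assumptions for illustration, not client defaults.

```python
def max_safe_poll_records(max_poll_interval_ms, worst_case_record_ms, headroom=0.5):
    """Largest batch that still finishes within a fraction of the poll interval."""
    budget_ms = max_poll_interval_ms * headroom
    return max(1, int(budget_ms // worst_case_record_ms))


# 5-minute poll interval, records that can take up to 200 ms each
print(max_safe_poll_records(300_000, 200))
```

If the result is below the configured max.poll.records, either lower that setting or raise max.poll.interval.ms before the consumer gets evicted under load.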
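⚠️ Production Recommendation
Use CooperativeStickyAssignor with static group membership for production workloads. Set session.timeout.ms to 5-10 minutes (it must stay within the broker's group.max.session.timeout.ms) and heartbeat.interval.ms to 3 seconds. This minimizes rebalances during restarts; the trade-off is that a crashed static member's partitions stay unassigned until the session timeout expires, so do not raise it further than your restarts require.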
Consumer Group State Machine
┌─────────────────────────────────────────────────────────┐
│                  Consumer Group States                  │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  EMPTY → PREPARING_REBALANCE → COMPLETING_REBALANCE     │
│    ↑              ↑                     │               │
│    │              │                     ↓               │
│    │              └────────────────── STABLE            │
│    │                                    │               │
│    └────────────────────────────────────┘               │
│                                                         │
│  States:                                                │
│  - EMPTY: No members                                    │
│  - PREPARING_REBALANCE: Rebalance in progress           │
│  - COMPLETING_REBALANCE: Assignment computed            │
│  - STABLE: All members processing                       │
│                                                         │
│  Transitions:                                           │
│  - Consumer joins: PREPARING → COMPLETING → STABLE      │
│  - Consumer leaves: STABLE → PREPARING → COMPLETING     │
│  - All consumers leave: → EMPTY                         │
└─────────────────────────────────────────────────────────┘
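The same states and transitions can be written as a lookup table, which is handy for reasoning about what a given event does to the group. This is a sketch of the diagram only: the event names are illustrative, and the model omits details of the real coordinator.

```python
from enum import Enum


class GroupState(Enum):
    EMPTY = "EMPTY"
    PREPARING_REBALANCE = "PREPARING_REBALANCE"
    COMPLETING_REBALANCE = "COMPLETING_REBALANCE"
    STABLE = "STABLE"


# (state, event) -> next state; event names are illustrative
TRANSITIONS = {
    (GroupState.EMPTY, "member_joined"): GroupState.PREPARING_REBALANCE,
    (GroupState.PREPARING_REBALANCE, "all_members_joined"): GroupState.COMPLETING_REBALANCE,
    (GroupState.PREPARING_REBALANCE, "all_members_left"): GroupState.EMPTY,
    (GroupState.COMPLETING_REBALANCE, "assignment_synced"): GroupState.STABLE,
    (GroupState.STABLE, "member_joined"): GroupState.PREPARING_REBALANCE,
    (GroupState.STABLE, "member_left"): GroupState.PREPARING_REBALANCE,
}


def next_state(state, event):
    """Apply one event, rejecting transitions the diagram does not allow."""
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise ValueError(f"{event!r} is not valid in state {state.name}") from None


state = GroupState.EMPTY
for event in ["member_joined", "all_members_joined", "assignment_synced", "member_left"]:
    state = next_state(state, event)
print(state.name)
```

The walk ends back in PREPARING_REBALANCE: any membership change in a stable group starts a new rebalance cycle.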
ℹ️ Monitoring Tip
Track the rebalance-rate-per-hour metric. If it is consistently high, consider static membership or cooperative rebalancing. A healthy system should have fewer than 1 rebalance per hour under normal operation.
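If your client does not expose this metric directly, the rate can be derived from rebalance event timestamps (for example, collected in a rebalance listener). A minimal sketch, where `rebalances_per_hour` and its timestamp-list input are assumptions:

```python
def rebalances_per_hour(rebalance_timestamps_s, window_start_s, window_end_s):
    """Average number of rebalances per hour inside [window_start_s, window_end_s)."""
    hours = (window_end_s - window_start_s) / 3600
    in_window = [
        t for t in rebalance_timestamps_s
        if window_start_s <= t < window_end_s
    ]
    return len(in_window) / hours


# Three rebalances observed during a two-hour window
rate = rebalances_per_hour([100, 4000, 7000], window_start_s=0, window_end_s=7200)
print(f"{rate:.1f} rebalances/hour")
```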