Kafka Metrics: Lag, Throughput, Consumer Group Health
Difficulty: Senior | Asked at: Netflix, Uber, LinkedIn, Airbnb
ℹ️ Interview Context
Monitoring is critical for production Kafka operations. Interviewers expect you to know key metrics, alerting thresholds, and how to diagnose common issues using metrics.
The Question
What are the critical Kafka metrics to monitor? How do you calculate consumer lag? What metrics indicate consumer group health issues?
Critical Metrics Overview
Kafka Metrics Categories:
1. Broker Metrics - Cluster health
2. Producer Metrics - Write performance
3. Consumer Metrics - Read performance
4. Consumer Group Metrics - Group health
5. Request Metrics - Protocol performance
6. Log Metrics - Storage health
Consumer Lag Metrics
Lag Calculation
from kafka import KafkaConsumer, TopicPartition

def calculate_consumer_lag(consumer, topic):
    """
    Calculate consumer lag for all partitions of a topic.
    Lag = Latest Offset - Committed Offset
    A positive lag means the consumer group is behind.
    """
    lag_report = {}
    # partitions_for_topic() returns a set of partition ids, or None for an unknown topic
    partitions = consumer.partitions_for_topic(topic) or set()
    tps = [TopicPartition(topic, p) for p in sorted(partitions)]
    # Get latest offsets (end of each partition) without moving the consumer's position
    end_offsets = consumer.end_offsets(tps)
    for tp in tps:
        latest_offset = end_offsets[tp]
        # Get committed offset for the group (None if nothing has been committed yet)
        committed = consumer.committed(tp) or 0
        # Calculate lag
        lag = latest_offset - committed
        lag_report[tp.partition] = {
            'latest_offset': latest_offset,
            'committed_offset': committed,
            'lag': lag,
            'lag_percentage': (lag / latest_offset * 100) if latest_offset > 0 else 0
        }
    return lag_report

# Example usage
# The monitor does not subscribe to the topic, so it reads the group's offsets
# without taking partitions away from the real consumers.
consumer = KafkaConsumer(
    group_id='my-group',
    bootstrap_servers='localhost:9092',
    enable_auto_commit=False
)
lag = calculate_consumer_lag(consumer, 'orders')
for partition, info in lag.items():
    print(f"Partition {partition}: lag={info['lag']} ({info['lag_percentage']:.1f}%)")
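The lag arithmetic itself can be exercised without a broker. The sketch below works on plain dictionaries of offsets, as they might be collected from any client. The helper name `compute_lag` and its parameters are illustrative, not part of a Kafka library, and it assumes that a partition with no committed offset should be measured from the earliest retained offset rather than from 0.

```python
from typing import Dict, Optional


def compute_lag(
    end_offsets: Dict[int, int],
    committed_offsets: Dict[int, Optional[int]],
    beginning_offsets: Dict[int, int],
) -> Dict[int, int]:
    """Return lag per partition from offset snapshots (hypothetical helper)."""
    lag = {}
    for partition, end in end_offsets.items():
        committed = committed_offsets.get(partition)
        if committed is None:
            # Assumption: with no commit yet, everything still retained
            # in the partition counts as lag.
            committed = beginning_offsets.get(partition, 0)
        # Clamp at zero: the snapshots are taken at slightly different times,
        # so a commit can briefly appear ahead of the end offset.
        lag[partition] = max(end - committed, 0)
    return lag


snapshot = compute_lag(
    end_offsets={0: 1500, 1: 900, 2: 400},
    committed_offsets={0: 1450, 1: None, 2: 405},
    beginning_offsets={0: 1000, 1: 300, 2: 0},
)
print(snapshot)  # {0: 50, 1: 600, 2: 0}
```

Measuring from the earliest retained offset avoids overstating lag on topics where retention has already deleted old segments.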
Lag Metrics
# Key lag metrics
lag_metrics = {
    # Absolute lag (reported by the consumer client)
    'records-lag-max': 'Maximum lag across all partitions',
    'records-lag': 'Lag per partition',
    # Rate-based lag (derived in the monitoring system, not emitted by Kafka)
    'lag-rate': 'Rate of lag increase/decrease',
    'consumption-rate': 'Messages consumed per second',
    'production-rate': 'Messages produced per second',
    # Time-based lag (also derived)
    'estimated-lag-time': 'Estimated time to catch up',
    'lag-time': 'Time since last consumed message'
}

def estimate_lag_time(lag_messages, consumption_rate_per_sec):
    """
    Estimate time to consume the current lag.
    Returns estimated seconds to catch up, assuming no new messages arrive.
    """
    if consumption_rate_per_sec <= 0:
        return float('inf')
    return lag_messages / consumption_rate_per_sec

# Example
lag_seconds = estimate_lag_time(
    lag_messages=10000,
    consumption_rate_per_sec=1000
)
print(f"Estimated time to catch up: {lag_seconds:.1f} seconds")
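While producers keep writing, the backlog only shrinks at the difference between the consumption and production rates. A minimal sketch of that adjustment follows; the function name `estimate_catch_up_seconds` and the example rates are assumptions made for illustration.

```python
def estimate_catch_up_seconds(
    lag_messages: float,
    consumption_rate_per_sec: float,
    production_rate_per_sec: float,
) -> float:
    """Estimate catch-up time while new messages keep arriving."""
    if lag_messages <= 0:
        return 0.0
    # Only the surplus of consumption over production drains the backlog.
    net_rate = consumption_rate_per_sec - production_rate_per_sec
    if net_rate <= 0:
        # The consumer is not gaining on the producer, so it never catches up.
        return float('inf')
    return lag_messages / net_rate


print(estimate_catch_up_seconds(10000, 1000, 800))  # 50.0
```

With 1,000 msgs/sec consumed and 800 msgs/sec produced, a 10,000-message backlog drains at 200 msgs/sec, so the estimate is 50 seconds rather than 10.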
⚠️ Lag Alert Thresholds
- Warning: lag > 10,000 messages OR lag > 1% of total messages
- Critical: lag > 100,000 messages OR lag > 10% of total messages
- Emergency: lag increasing for > 5 minutes continuously
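These thresholds can be expressed as two small checks: one for the absolute and relative limits, one for the "increasing for more than 5 minutes" rule. The sketch below is one possible reading of the rules above; the function names, the sample format `(unix_timestamp, lag)`, and the strict "every sample higher than the previous one" interpretation of "continuously" are assumptions.

```python
from typing import List, Tuple


def classify_lag(lag: int, total_messages: int,
                 warn_abs: int = 10_000, crit_abs: int = 100_000,
                 warn_pct: float = 1.0, crit_pct: float = 10.0) -> str:
    """Map a lag reading to 'ok', 'warning' or 'critical'."""
    pct = (lag / total_messages * 100) if total_messages > 0 else 0.0
    if lag > crit_abs or pct > crit_pct:
        return 'critical'
    if lag > warn_abs or pct > warn_pct:
        return 'warning'
    return 'ok'


def lag_growing_continuously(samples: List[Tuple[float, int]],
                             min_duration_seconds: float = 300) -> bool:
    """True if lag rose at every step for longer than min_duration_seconds.

    samples holds (unix_timestamp, lag) pairs, oldest first.
    """
    if len(samples) < 2:
        return False
    run_start = samples[-1][0]
    # Walk backwards from the newest sample while lag keeps rising.
    pairs = zip(reversed(samples[:-1]), reversed(samples[1:]))
    for (t_prev, lag_prev), (_, lag_cur) in pairs:
        if lag_cur <= lag_prev:
            break
        run_start = t_prev
    return samples[-1][0] - run_start > min_duration_seconds


rising = [(t * 60.0, 100 * (t + 1)) for t in range(7)]  # 6 minutes of growth
print(classify_lag(15_000, 10_000_000))   # warning
print(lag_growing_continuously(rising))   # True
```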
Throughput Metrics
Producer Throughput
# Producer throughput metrics
producer_throughput = {
    'record-send-rate': 'Messages sent per second',
    'record-send-total': 'Total messages sent',
    'outgoing-byte-rate': 'Bytes sent per second',
    'batch-size-avg': 'Average batch size',
    'batch-size-max': 'Maximum batch size',
    # Request metrics
    'request-rate': 'Requests per second',
    'request-latency-avg': 'Average request latency',
    'request-latency-max': 'Maximum request latency',
    # Error metrics
    'record-error-rate': 'Error rate',
    'record-error-total': 'Total errors',
    'record-retry-rate': 'Retry rate'
}
def calculate_producer_throughput(
    messages_sent: int,
    time_window_seconds: int,
    avg_message_size: int
) -> dict:
    """
    Calculate producer throughput metrics.
    """
    messages_per_sec = messages_sent / time_window_seconds
    bytes_per_sec = messages_per_sec * avg_message_size
    mb_per_sec = bytes_per_sec / (1024 * 1024)
    return {
        'messages_per_sec': messages_per_sec,
        'bytes_per_sec': bytes_per_sec,
        'mb_per_sec': mb_per_sec,
        'messages_per_min': messages_per_sec * 60
    }

# Example
throughput = calculate_producer_throughput(
    messages_sent=100000,
    time_window_seconds=60,
    avg_message_size=500
)
print(f"Throughput: {throughput['mb_per_sec']:.2f} MB/s")
print(f"Message rate: {throughput['messages_per_sec']:.0f} msgs/sec")
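In practice the message count for a window usually comes from sampling a cumulative counter such as `record-send-total` twice and taking the difference. The sketch below shows that step; the helper name `rate_from_counters` is hypothetical, and treating a decreasing counter as a client restart is an assumption.

```python
def rate_from_counters(prev_total: float, curr_total: float,
                       prev_ts: float, curr_ts: float) -> float:
    """Per-second rate between two samples of a cumulative counter."""
    elapsed = curr_ts - prev_ts
    if elapsed <= 0:
        raise ValueError("samples must be taken at increasing timestamps")
    if curr_total < prev_total:
        # Assumption: the counter restarted from zero (e.g. the client was
        # restarted), so only the new total is known for this interval.
        delta = curr_total
    else:
        delta = curr_total - prev_total
    return delta / elapsed


rate = rate_from_counters(1_000_000, 1_100_000, prev_ts=0.0, curr_ts=60.0)
print(f"{rate:.0f} msgs/sec")  # 1667 msgs/sec
```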
Consumer Throughput
# Consumer throughput metrics
consumer_throughput = {
    'records-consumed-rate': 'Messages consumed per second',
    'records-consumed-total': 'Total messages consumed',
    'bytes-consumed-rate': 'Bytes consumed per second',
    # Poll metrics (Kafka Streams thread-level; a plain consumer exposes
    # time-between-poll-avg and last-poll-seconds-ago instead)
    'poll-rate': 'Poll calls per second',
    'poll-total': 'Total poll calls',
    # Processing metrics (Kafka Streams thread-level)
    'process-rate': 'Messages processed per second',
    'process-total': 'Total messages processed',
    # Fetch metrics
    'fetch-rate': 'Fetch requests per second',
    'fetch-latency-avg': 'Average fetch latency',
    'fetch-size-avg': 'Average fetch size'
}
def calculate_consumer_throughput(
    messages_consumed: int,
    time_window_seconds: int,
    avg_message_size: int
) -> dict:
    """
    Calculate consumer throughput metrics.
    """
    messages_per_sec = messages_consumed / time_window_seconds
    bytes_per_sec = messages_per_sec * avg_message_size
    mb_per_sec = bytes_per_sec / (1024 * 1024)
    return {
        'messages_per_sec': messages_per_sec,
        'bytes_per_sec': bytes_per_sec,
        'mb_per_sec': mb_per_sec,
        'messages_per_min': messages_per_sec * 60
    }

# Example
throughput = calculate_consumer_throughput(
    messages_consumed=95000,
    time_window_seconds=60,
    avg_message_size=500
)
print(f"Consumer throughput: {throughput['mb_per_sec']:.2f} MB/s")
print(f"Message rate: {throughput['messages_per_sec']:.0f} msgs/sec")
Consumer Group Health
Group State Metrics
# Consumer group health metrics
group_health = {
    # State
    'group-state': 'Consumer group state (Stable, PreparingRebalance, CompletingRebalance, Empty, Dead)',
    'active-members': 'Number of active consumers',
    'assigned-partitions': 'Partitions assigned per consumer',
    # Rebalance metrics
    'rebalance-rate-per-hour': 'Rebalances per hour',
    'rebalance-total': 'Total rebalances',
    'rebalance-latency-avg': 'Average rebalance time (ms)',
    # Offset metrics
    'commit-rate': 'Offset commits per second',
    'commit-latency-avg': 'Average commit latency (ms)',
    'offset-lag': 'Consumer offset lag'
}
def assess_group_health(consumer_group_metrics):
    """
    Assess consumer group health.
    Returns health score and recommendations.
    """
    score = 100
    issues = []
    # Check rebalance rate
    if consumer_group_metrics.get('rebalance-rate-per-hour', 0) > 1:
        score -= 20
        issues.append("High rebalance rate detected")
    # Check active members
    if consumer_group_metrics.get('active-members', 0) == 0:
        score -= 50
        issues.append("No active consumers in group")
    # Check lag
    max_lag = consumer_group_metrics.get('records-lag-max', 0)
    if max_lag > 100000:
        score -= 30
        issues.append(f"High consumer lag: {max_lag}")
    # Check rebalance latency (milliseconds)
    if consumer_group_metrics.get('rebalance-latency-avg', 0) > 30000:
        score -= 15
        issues.append("Slow rebalancing detected")
    # The penalties can add up to more than 100, so clamp the score at zero
    score = max(score, 0)
    health_status = 'healthy' if score > 80 else 'degraded' if score > 50 else 'critical'
    return {
        'health_score': score,
        'status': health_status,
        'issues': issues,
        'recommendations': generate_recommendations(issues)
    }

def generate_recommendations(issues):
    """Generate recommendations based on issues"""
    recommendations = []
    for issue in issues:
        text = issue.lower()
        # Match 'rebalanc' so both "rebalance" and "rebalancing" are covered
        if 'rebalanc' in text:
            advice = "Consider static group membership or cooperative rebalancing"
        elif 'lag' in text:
            advice = "Add more consumers or optimize processing logic"
        elif 'no active' in text:
            advice = "Check consumer health and restart if needed"
        else:
            continue
        # Two issues can map to the same advice; list it once
        if advice not in recommendations:
            recommendations.append(advice)
    return recommendations
Rebalance Detection
import time
from collections import defaultdict

class RebalanceMonitor:
    """
    Monitor and alert on consumer group rebalances.
    """
    def __init__(self):
        # Timestamps of observed rebalances, per group
        self.rebalance_times = defaultdict(list)

    def record_rebalance(self, group_id, timestamp):
        """Record a rebalance event"""
        self.rebalance_times[group_id].append(timestamp)

    def get_rebalance_rate(self, group_id, window_seconds=3600):
        """
        Count rebalances in the trailing window (per hour with the default window).
        """
        events = self.rebalance_times.get(group_id)
        if not events:
            return 0
        window_start = time.time() - window_seconds
        # Count rebalances in window
        # In production, use a time-series database
        return sum(1 for ts in events if ts >= window_start)

    def check_health(self, group_id):
        """
        Check if rebalance rate is healthy.
        """
        rate = self.get_rebalance_rate(group_id)
        if rate > 10:
            return {'status': 'critical', 'rate': rate}
        elif rate > 5:
            return {'status': 'warning', 'rate': rate}
        else:
            return {'status': 'healthy', 'rate': rate}
ℹ️ Rebalance Health
Healthy systems should have < 1 rebalance per hour under normal operation. Frequent rebalances indicate:
- Consumers processing too slowly (max.poll.interval.ms exceeded)
- Session timeouts too short
- Network connectivity issues
- Consumer crashes
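The first two causes can often be caught before deployment by sanity-checking the consumer's timeout settings against expected processing time. The sketch below uses kafka-python style setting names; the helper `check_timeout_settings`, the "heartbeat at most one third of the session timeout" rule of thumb, and the per-record processing time are assumptions for illustration.

```python
from typing import Dict, List


def check_timeout_settings(config: Dict[str, int],
                           avg_processing_ms_per_record: float) -> List[str]:
    """Flag timeout settings that commonly lead to avoidable rebalances."""
    warnings = []
    # Rule of thumb: keep the heartbeat interval at or below one third of the
    # session timeout, so a few missed heartbeats do not expire the session.
    if config['heartbeat_interval_ms'] * 3 > config['session_timeout_ms']:
        warnings.append('heartbeat_interval_ms exceeds one third of session_timeout_ms')
    # A full batch has to be processed before the next poll() is due.
    worst_case_batch_ms = config['max_poll_records'] * avg_processing_ms_per_record
    if worst_case_batch_ms >= config['max_poll_interval_ms']:
        warnings.append(
            f'a full batch may take {worst_case_batch_ms:.0f} ms, '
            f"which is not below max_poll_interval_ms ({config['max_poll_interval_ms']})"
        )
    return warnings


consumer_config = {
    'session_timeout_ms': 10_000,
    'heartbeat_interval_ms': 3_000,
    'max_poll_interval_ms': 300_000,
    'max_poll_records': 500,
}
for warning in check_timeout_settings(consumer_config, avg_processing_ms_per_record=800):
    print(warning)
```

Here 500 records at 800 ms each need 400 seconds, which exceeds the 300-second poll interval, so the consumer would be removed from the group mid-batch. Lowering `max_poll_records` or raising `max_poll_interval_ms` are the usual remedies.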
Broker Metrics
Cluster Health Metrics
# Broker health metrics
broker_health = {
    # Basic health
    'active-controller': 'Is this broker the controller?',
    'offline-partitions': 'Number of offline partitions',
    'under-replicated-partitions': 'Partitions below replication factor',
    # ISR metrics
    'isr-shrink-rate': 'ISR shrink rate per second',
    'isr-expand-rate': 'ISR expand rate per second',
    'isr-size': 'Current ISR size',
    # Request metrics
    'request-rate': 'Request rate per second',
    'request-latency-avg': 'Average request latency',
    'request-latency-max': 'Maximum request latency',
    # Log metrics
    'log-size': 'Total log size',
    'log-size-rate': 'Log growth rate',
    'log-flush-rate': 'Log flush rate',
    # Network metrics
    'network-io-rate': 'Network I/O rate',
    'bytes-in-rate': 'Bytes received per second',
    'bytes-out-rate': 'Bytes sent per second'
}
def assess_broker_health(broker_metrics):
    """
    Assess the health of a single broker.
    """
    issues = []
    score = 100
    # Check controller status: a broker reports 1 if it is the active
    # controller and 0 otherwise, so any other value is suspicious
    controller_value = broker_metrics.get('active-controller', 0)
    if controller_value not in (0, 1):
        issues.append(f"Unexpected active-controller value: {controller_value}")
        score -= 30
    # Check offline partitions
    if broker_metrics.get('offline-partitions', 0) > 0:
        issues.append(f"Offline partitions: {broker_metrics['offline-partitions']}")
        score -= 40
    # Check under-replicated partitions
    if broker_metrics.get('under-replicated-partitions', 0) > 0:
        issues.append(f"Under-replicated partitions: {broker_metrics['under-replicated-partitions']}")
        score -= 20
    # Check ISR shrink rate
    if broker_metrics.get('isr-shrink-rate', 0) > 0.1:
        issues.append("High ISR shrink rate")
        score -= 15
    # Check request latency (milliseconds)
    if broker_metrics.get('request-latency-avg', 0) > 100:
        issues.append("High request latency")
        score -= 10
    # The penalties can add up to more than 100, so clamp the score at zero
    score = max(score, 0)
    health_status = 'healthy' if score > 80 else 'degraded' if score > 50 else 'critical'
    return {
        'health_score': score,
        'status': health_status,
        'issues': issues
    }
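Because each broker reports its own controller flag, "exactly one active controller" can only be verified by summing the flag across the cluster. A minimal sketch of that cluster-level check follows; the function name and the broker ids in the example are made up for illustration.

```python
from typing import Dict


def check_controller_count(active_controller_by_broker: Dict[str, int]) -> str:
    """Validate the cluster-wide sum of per-broker controller flags."""
    total = sum(active_controller_by_broker.values())
    if total == 1:
        return 'ok'
    if total == 0:
        return 'no active controller'
    return f'{total} brokers claim to be the active controller'


print(check_controller_count({'broker-1': 0, 'broker-2': 1, 'broker-3': 0}))  # ok
print(check_controller_count({'broker-1': 1, 'broker-2': 1, 'broker-3': 0}))
```

A sum of 0 or more than 1 that persists beyond a brief controller failover is worth a critical alert.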
JMX Metrics Collection
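class KafkaMetricsCollector:
    """
    Collect Kafka metrics via JMX.
    """
    def __init__(self, jmx_url, fetch_metric):
        # fetch_metric(jmx_url, mbean, name) is supplied by the caller, for
        # example a wrapper around an HTTP-to-JMX bridge. Python has no
        # built-in JMX client, so the transport is left pluggable here.
        self.jmx_url = jmx_url
        self.fetch_metric = fetch_metric

    def _collect_metrics(self, metrics):
        """Read every listed metric and return {mbean: {name: value}}"""
        return {
            mbean: {name: self.fetch_metric(self.jmx_url, mbean, name) for name in names}
            for mbean, names in metrics.items()
        }

    def collect_broker_metrics(self):
        """Collect broker metrics via JMX"""
        # For broker MBeans each entry is the value of the MBean's name= key
        metrics = {
            'kafka.server:type=BrokerTopicMetrics': [
                'MessagesInPerSec',
                'BytesInPerSec',
                'BytesOutPerSec',
                'FailedProduceRequestsPerSec',
                'FailedFetchRequestsPerSec'
            ],
            'kafka.server:type=ReplicaManager': [
                'UnderReplicatedPartitions',
                'IsrShrinksPerSec',
                'IsrExpandsPerSec',
                'PartitionCount',
                'LeaderCount'
            ],
            'kafka.controller:type=KafkaController': [
                'ActiveControllerCount',
                'OfflinePartitionsCount',
                'PreferredReplicaImbalanceCount'
            ],
            'kafka.network:type=RequestMetrics': [
                'RequestsPerSec',
                'TotalTimeMs'
            ],
            'kafka.network:type=RequestChannel': [
                'RequestQueueSize'
            ]
        }
        return self._collect_metrics(metrics)

    def collect_consumer_metrics(self):
        """Collect consumer metrics via JMX"""
        # Client MBeans also carry a client-id key; entries here are attributes
        metrics = {
            'kafka.consumer:type=consumer-fetch-manager-metrics': [
                'records-lag-max',
                'records-consumed-rate',
                'bytes-consumed-rate',
                'fetch-rate',
                'fetch-latency-avg'
            ],
            'kafka.consumer:type=consumer-coordinator-metrics': [
                'rebalance-rate-per-hour',
                'last-rebalance-seconds-ago'
            ]
        }
        return self._collect_metrics(metrics)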
Alerting Thresholds
Alert Configuration
# Alert thresholds
alert_thresholds = {
    # Consumer lag
    'consumer-lag-warning': {
        'metric': 'records-lag-max',
        'threshold': 10000,
        'window': '5m',
        'severity': 'warning'
    },
    'consumer-lag-critical': {
        'metric': 'records-lag-max',
        'threshold': 100000,
        'window': '5m',
        'severity': 'critical'
    },
    # Throughput drop
    'throughput-drop': {
        'metric': 'records-consumed-rate',
        'condition': 'decrease > 50%',
        'window': '10m',
        'severity': 'warning'
    },
    # Error rate
    'error-rate': {
        'metric': 'record-error-rate',
        'threshold': 0.01,  # 1%
        'window': '5m',
        'severity': 'warning'
    },
    # Rebalance rate
    'rebalance-rate': {
        'metric': 'rebalance-rate-per-hour',
        'threshold': 5,
        'window': '1h',
        'severity': 'warning'
    },
    # Broker offline
    'broker-offline': {
        'metric': 'offline-partitions',
        'threshold': 0,
        'window': '1m',
        'severity': 'critical'
    },
    # ISR shrink
    'isr-shrink': {
        'metric': 'isr-shrink-rate',
        'threshold': 0.1,
        'window': '5m',
        'severity': 'warning'
    }
}
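A configuration like this is only useful once something evaluates it. The sketch below checks the latest metric values against threshold-style rules; it is a simplification that ignores the `window` field and skips `condition`-style rules, both of which need metric history. The function name `evaluate_alerts` and the sample values are assumptions.

```python
from typing import Dict, List, Tuple


def evaluate_alerts(rules: Dict[str, dict],
                    current_values: Dict[str, float]) -> List[Tuple[str, str, float]]:
    """Return (rule_name, severity, value) for every exceeded threshold rule."""
    fired = []
    for name, rule in rules.items():
        if 'threshold' not in rule:
            # Rules such as 'decrease > 50%' need history, not a single value.
            continue
        value = current_values.get(rule['metric'])
        if value is not None and value > rule['threshold']:
            fired.append((name, rule['severity'], value))
    return fired


example_rules = {
    'consumer-lag-warning': {'metric': 'records-lag-max', 'threshold': 10000, 'severity': 'warning'},
    'consumer-lag-critical': {'metric': 'records-lag-max', 'threshold': 100000, 'severity': 'critical'},
    'broker-offline': {'metric': 'offline-partitions', 'threshold': 0, 'severity': 'critical'},
    'throughput-drop': {'metric': 'records-consumed-rate', 'condition': 'decrease > 50%', 'severity': 'warning'},
}
fired = evaluate_alerts(example_rules, {'records-lag-max': 25000, 'offline-partitions': 0})
print(fired)  # [('consumer-lag-warning', 'warning', 25000)]
```

In a real deployment the alerting system would normally own this logic, including the requirement that a condition holds for the whole window before firing.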
Dashboard Metrics
# Grafana dashboard metrics
dashboard_metrics = {
    'Overview': [
        'cluster-healthy',
        'active-controller',
        'offline-partitions',
        'under-replicated-partitions'
    ],
    'Throughput': [
        'messages-in-rate',
        'bytes-in-rate',
        'bytes-out-rate',
        'request-rate'
    ],
    'Consumer Health': [
        'records-lag-max',
        'consumption-rate',
        'rebalance-rate',
        'active-members'
    ],
    'Latency': [
        'request-latency-avg',
        'request-latency-p99',
        'fetch-latency-avg',
        'commit-latency-avg'
    ],
    'Storage': [
        'log-size',
        'log-size-rate',
        'log-flush-rate'
    ]
}
ℹ️ Monitoring Stack
Recommended monitoring stack:
- Metrics Collection: JMX Exporter or Kafka Exporter
- Time Series DB: Prometheus or InfluxDB
- Visualization: Grafana
- Alerting: Alertmanager or PagerDuty
- Logging: ELK Stack or Loki
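If lag is computed by a custom script rather than an off-the-shelf exporter, it still has to reach the time-series database. One option is to render it in the Prometheus text exposition format and serve or push that text. The sketch below only builds the text; the metric name `kafka_consumer_lag_messages` and the helper name are hypothetical, and it assumes group and topic names contain no quotes or backslashes, since label values are not escaped.

```python
from typing import Dict


def render_lag_metrics(group: str, topic: str, lag_by_partition: Dict[int, int]) -> str:
    """Render per-partition lag as Prometheus text exposition lines."""
    lines = [
        '# HELP kafka_consumer_lag_messages Consumer lag in messages.',
        '# TYPE kafka_consumer_lag_messages gauge',
    ]
    for partition in sorted(lag_by_partition):
        labels = f'group="{group}",topic="{topic}",partition="{partition}"'
        # Doubled braces emit the literal { } around the label set.
        lines.append(f'kafka_consumer_lag_messages{{{labels}}} {lag_by_partition[partition]}')
    return '\n'.join(lines) + '\n'


exposition = render_lag_metrics('my-group', 'orders', {1: 600, 0: 50})
print(exposition, end='')
```

A gauge is used because lag can go down as well as up.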
Lag Monitoring Formula
from kafka import TopicPartition

def comprehensive_lag_analysis(consumer, topic, consumption_rate=1000):
    """
    Comprehensive consumer lag analysis.
    consumption_rate is the assumed consumption rate in msgs/sec (default 1000).
    """
    analysis = {
        'total_lag': 0,
        'total_messages': 0,
        'partitions': {},
        'lag_percentage': 0,
        'estimated_catch_up_time': 0,
        'health_score': 100
    }
    partitions = consumer.partitions_for_topic(topic) or set()
    tps = [TopicPartition(topic, p) for p in sorted(partitions)]
    # end_offsets() reads the latest offsets without moving the consumer's position
    end_offsets = consumer.end_offsets(tps)
    for tp in tps:
        latest = end_offsets[tp]
        committed = consumer.committed(tp) or 0
        lag = latest - committed
        analysis['total_lag'] += lag
        analysis['total_messages'] += latest
        analysis['partitions'][tp.partition] = {
            'latest': latest,
            'committed': committed,
            'lag': lag,
            'lag_percentage': (lag / latest * 100) if latest > 0 else 0
        }
    # Calculate overall metrics
    if analysis['total_messages'] > 0:
        analysis['lag_percentage'] = (
            analysis['total_lag'] / analysis['total_messages'] * 100
        )
    # Estimate catch-up time from the assumed consumption rate
    if consumption_rate > 0:
        analysis['estimated_catch_up_time'] = analysis['total_lag'] / consumption_rate
    else:
        analysis['estimated_catch_up_time'] = float('inf')
    # Calculate health score
    if analysis['lag_percentage'] > 10:
        analysis['health_score'] = 0
    elif analysis['lag_percentage'] > 1:
        analysis['health_score'] = 50
    else:
        analysis['health_score'] = 100
    return analysis
⚠️ Key Insight
Consumer lag is usually the first metric to watch, because it shows directly whether consumers are keeping up with producers. Alert on both absolute lag (for example, 10,000 messages) and relative lag (for example, 1% of total messages).