🎉 75% of content is free forever — Unlock Premium from $10/mo →
CW
Search courses…
💼 Servicesℹ️ About✉️ ContactView Pricing Plansfrom $10

Kafka Metrics: Lag, Throughput, Consumer Group Health

Apache KafkaMonitoring⭐ Premium

Advertisement

Kafka Metrics: Lag, Throughput, Consumer Group Health

Difficulty: Senior | Asked at: Netflix, Uber, LinkedIn, Airbnb

ℹ️Interview Context

Monitoring is critical for production Kafka operations. Interviewers expect you to know key metrics, alerting thresholds, and how to diagnose common issues using metrics.

The Question

What are the critical Kafka metrics to monitor? How do you calculate consumer lag? What metrics indicate consumer group health issues?

Critical Metrics Overview

Architecture Diagram
Kafka Metrics Categories:
1. Broker Metrics - Cluster health
2. Producer Metrics - Write performance
3. Consumer Metrics - Read performance
4. Consumer Group Metrics - Group health
5. Request Metrics - Protocol performance
6. Log Metrics - Storage health

Consumer Lag Metrics

Lag Calculation

from kafka import KafkaConsumer, TopicPartition

def calculate_consumer_lag(consumer, topic):
    """
    Calculate consumer lag for all partitions.
    
    Lag = Latest Offset - Committed Offset
    
    A positive lag means consumer is behind.
    """
    lag_report = {}
    
    partitions = consumer.partitions(topic)
    
    for partition in partitions:
        tp = TopicPartition(topic, partition)
        
        # Get latest offset (end of partition)
        consumer.seek_to_end(tp)
        latest_offset = consumer.position(tp)
        
        # Get committed offset
        committed = consumer.committed(tp)
        
        # Calculate lag
        lag = latest_offset - (committed or 0)
        
        lag_report[partition] = {
            'latest_offset': latest_offset,
            'committed_offset': committed or 0,
            'lag': lag,
            'lag_percentage': (lag / latest_offset * 100) if latest_offset > 0 else 0
        }
    
    return lag_report

# Example usage
consumer = KafkaConsumer(
    'orders',
    group_id='my-group',
    bootstrap_servers='localhost:9092'
)

lag = calculate_consumer_lag(consumer, 'orders')
for partition, info in lag.items():
    print(f"Partition {partition}: lag={info['lag']} ({info['lag_percentage']:.1f}%)")

Lag Metrics

# Key lag metrics
lag_metrics = {
    # Absolute lag
    'records-lag-max': 'Maximum lag across all partitions',
    'records-lag': 'Lag per partition',
    
    # Rate-based lag
    'lag-rate': 'Rate of lag increase/decrease',
    'consumption-rate': 'Messages consumed per second',
    'production-rate': 'Messages produced per second',
    
    # Time-based lag
    'estimated-lag-time': 'Estimated time to catch up',
    'lag-time': 'Time since last consumed message'
}

def estimate_lag_time(lag_messages, consumption_rate_per_sec):
    """
    Estimate time to consume lag.
    
    Returns estimated seconds to catch up.
    """
    if consumption_rate_per_sec == 0:
        return float('inf')
    
    return lag_messages / consumption_rate_per_sec

# Example
lag_seconds = estimate_lag_time(
    lag_messages=10000,
    consumption_rate_per_sec=1000
)
print(f"Estimated time to catch up: {lag_seconds:.1f} seconds")

⚠️Lag Alert Thresholds

  • Warning: lag > 10,000 messages OR lag > 1% of total messages
  • Critical: lag > 100,000 messages OR lag > 10% of total messages
  • Emergency: lag increasing for > 5 minutes continuously

Throughput Metrics

Producer Throughput

# Producer throughput metrics
producer_throughput = {
    'record-send-rate': 'Messages sent per second',
    'record-send-total': 'Total messages sent',
    'throughput': 'Bytes sent per second',
    'batch-size-avg': 'Average batch size',
    'batch-size-max': 'Maximum batch size',
    
    # Request metrics
    'request-rate': 'Requests per second',
    'request-latency-avg': 'Average request latency',
    'request-latency-max': 'Maximum request latency',
    
    # Error metrics
    'record-error-rate': 'Error rate',
    'record-error-total': 'Total errors',
    'retry-rate': 'Retry rate'
}

def calculate_producer_throughput(
    messages_sent: int,
    time_window_seconds: int,
    avg_message_size: int
) -> dict:
    """
    Calculate producer throughput metrics.
    """
    messages_per_sec = messages_sent / time_window_seconds
    bytes_per_sec = messages_per_sec * avg_message_size
    mb_per_sec = bytes_per_sec / (1024 * 1024)
    
    return {
        'messages_per_sec': messages_per_sec,
        'bytes_per_sec': bytes_per_sec,
        'mb_per_sec': mb_per_sec,
        'messages_per_min': messages_per_sec * 60
    }

# Example
throughput = calculate_producer_throughput(
    messages_sent=100000,
    time_window_seconds=60,
    avg_message_size=500
)
print(f"Throughput: {throughput['mb_per_sec']:.2f} MB/s")
print(f"Message rate: {throughput['messages_per_sec']:.0f} msgs/sec")

Consumer Throughput

# Consumer throughput metrics
consumer_throughput = {
    'records-consumed-rate': 'Messages consumed per second',
    'records-consumed-total': 'Total messages consumed',
    'bytes-consumed-rate': 'Bytes consumed per second',
    
    # Poll metrics
    'poll-rate': 'Poll calls per second',
    'poll-total': 'Total poll calls',
    
    # Processing metrics
    'process-rate': 'Messages processed per second',
    'process-total': 'Total messages processed',
    
    # Fetch metrics
    'fetch-rate': 'Fetch requests per second',
    'fetch-latency-avg': 'Average fetch latency',
    'fetch-size-avg': 'Average fetch size'
}

def calculate_consumer_throughput(
    messages_consumed: int,
    time_window_seconds: int,
    avg_message_size: int
) -> dict:
    """
    Calculate consumer throughput metrics.
    """
    messages_per_sec = messages_consumed / time_window_seconds
    bytes_per_sec = messages_per_sec * avg_message_size
    mb_per_sec = bytes_per_sec / (1024 * 1024)
    
    return {
        'messages_per_sec': messages_per_sec,
        'bytes_per_sec': bytes_per_sec,
        'mb_per_sec': mb_per_sec,
        'messages_per_min': messages_per_sec * 60
    }

# Example
throughput = calculate_consumer_throughput(
    messages_consumed=95000,
    time_window_seconds=60,
    avg_message_size=500
)
print(f"Consumer throughput: {throughput['mb_per_sec']:.2f} MB/s")
print(f"Message rate: {throughput['messages_per_sec']:.0f} msgs/sec")

Consumer Group Health

Group State Metrics

# Consumer group health metrics
group_health = {
    # State
    'group-state': 'Consumer group state (Stable, Rebalancing, Empty)',
    'active-members': 'Number of active consumers',
    'assigned-partitions': 'Partitions assigned per consumer',
    
    # Rebalance metrics
    'rebalance-rate-per-hour': 'Rebalances per hour',
    'rebalance-total': 'Total rebalances',
    'rebalance-latency-avg': 'Average rebalance time',
    
    # Offset metrics
    'offset-commit-rate': 'Offset commits per second',
    'offset-commit-latency': 'Average commit latency',
    'offset-lag': 'Consumer offset lag'
}

def assess_group_health(consumer_group_metrics):
    """
    Assess consumer group health.
    
    Returns health score and recommendations.
    """
    score = 100
    issues = []
    
    # Check rebalance rate
    if consumer_group_metrics.get('rebalance-rate-per-hour', 0) > 1:
        score -= 20
        issues.append("High rebalance rate detected")
    
    # Check active members
    if consumer_group_metrics.get('active-members', 0) == 0:
        score -= 50
        issues.append("No active consumers in group")
    
    # Check lag
    max_lag = consumer_group_metrics.get('records-lag-max', 0)
    if max_lag > 100000:
        score -= 30
        issues.append(f"High consumer lag: {max_lag}")
    
    # Check rebalance latency
    if consumer_group_metrics.get('rebalance-latency-avg', 0) > 30000:
        score -= 15
        issues.append("Slow rebalancing detected")
    
    health_status = 'healthy' if score > 80 else 'degraded' if score > 50 else 'critical'
    
    return {
        'health_score': score,
        'status': health_status,
        'issues': issues,
        'recommendations': generate_recommendations(issues)
    }

def generate_recommendations(issues):
    """Generate recommendations based on issues"""
    recommendations = []
    
    for issue in issues:
        if 'rebalance' in issue.lower():
            recommendations.append("Consider static group membership or cooperative rebalancing")
        elif 'lag' in issue.lower():
            recommendations.append("Add more consumers or optimize processing logic")
        elif 'no active' in issue.lower():
            recommendations.append("Check consumer health and restart if needed")
    
    return recommendations

Rebalance Detection

class RebalanceMonitor:
    """
    Monitor and alert on consumer group rebalances.
    """
    
    def __init__(self):
        self.rebalance_counts = {}
        self.last_rebalance_time = {}
    
    def record_rebalance(self, group_id, timestamp):
        """Record a rebalance event"""
        if group_id not in self.rebalance_counts:
            self.rebalance_counts[group_id] = 0
        
        self.rebalance_counts[group_id] += 1
        self.last_rebalance_time[group_id] = timestamp
    
    def get_rebalance_rate(self, group_id, window_seconds=3600):
        """
        Calculate rebalance rate per hour.
        """
        if group_id not in self.last_rebalance_time:
            return 0
        
        current_time = time.time()
        window_start = current_time - window_seconds
        
        # Count rebalances in window
        # In production, use a time-series database
        return self.rebalance_counts.get(group_id, 0)
    
    def check_health(self, group_id):
        """
        Check if rebalance rate is healthy.
        """
        rate = self.get_rebalance_rate(group_id)
        
        if rate > 10:
            return {'status': 'critical', 'rate': rate}
        elif rate > 5:
            return {'status': 'warning', 'rate': rate}
        else:
            return {'status': 'healthy', 'rate': rate}

ℹ️Rebalance Health

Healthy systems should have < 1 rebalance per hour under normal operation. Frequent rebalances indicate:

  1. Consumers processing too slowly (max.poll.interval.ms exceeded)
  2. Session timeouts too short
  3. Network connectivity issues
  4. Consumer crashes

Broker Metrics

Cluster Health Metrics

# Broker health metrics
broker_health = {
    # Basic health
    'active-controller': 'Is this broker the controller?',
    'offline-partitions': 'Number of offline partitions',
    'under-replicated-partitions': 'Partitions below replication factor',
    
    # ISR metrics
    'isr-shrink-rate': 'ISR shrink rate per second',
    'isr-expand-rate': 'ISR expand rate per second',
    'isr-size': 'Current ISR size',
    
    # Request metrics
    'request-rate': 'Request rate per second',
    'request-latency-avg': 'Average request latency',
    'request-latency-max': 'Maximum request latency',
    
    # Log metrics
    'log-size': 'Total log size',
    'log-size-rate': 'Log growth rate',
    'log-flush-rate': 'Log flush rate',
    
    # Network metrics
    'network-io-rate': 'Network I/O rate',
    'bytes-in-rate': 'Bytes received per second',
    'bytes-out-rate': 'Bytes sent per second'
}

def assess_broker_health(broker_metrics):
    """
    Assess broker health.
    """
    issues = []
    score = 100
    
    # Check controller status
    if broker_metrics.get('active-controller', 0) == 0:
        # Not a controller, skip controller-specific checks
        pass
    elif broker_metrics.get('active-controller', 0) != 1:
        issues.append("Multiple controllers detected")
        score -= 30
    
    # Check offline partitions
    if broker_metrics.get('offline-partitions', 0) > 0:
        issues.append(f"Offline partitions: {broker_metrics['offline-partitions']}")
        score -= 40
    
    # Check under-replicated partitions
    if broker_metrics.get('under-replicated-partitions', 0) > 0:
        issues.append(f"Under-replicated partitions: {broker_metrics['under-replicated-partitions']}")
        score -= 20
    
    # Check ISR shrink rate
    if broker_metrics.get('isr-shrink-rate', 0) > 0.1:
        issues.append("High ISR shrink rate")
        score -= 15
    
    # Check request latency
    if broker_metrics.get('request-latency-avg', 0) > 100:
        issues.append("High request latency")
        score -= 10
    
    health_status = 'healthy' if score > 80 else 'degraded' if score > 50 else 'critical'
    
    return {
        'health_score': score,
        'status': health_status,
        'issues': issues
    }

JMX Metrics Collection

import jmx_fetch

class KafkaMetricsCollector:
    """
    Collect Kafka metrics via JMX.
    """
    
    def __init__(self, jmx_url):
        self.jmx_url = jmx_url
    
    def collect_broker_metrics(self):
        """Collect broker metrics via JMX"""
        metrics = {
            'kafka.server:type=BrokerTopicMetrics': [
                'MessagesInPerSec',
                'BytesInPerSec',
                'BytesOutPerSec',
                'FailedProduceRequestsPerSec',
                'FailedFetchRequestsPerSec'
            ],
            'kafka.server:type=ReplicaManager': [
                'UnderReplicatedPartitions',
                'IsrShrinksPerSec',
                'IsrExpandsPerSec',
                'PartitionCount',
                'LeaderCount'
            ],
            'kafka.controller:type=KafkaController': [
                'ActiveControllerCount',
                'OfflinePartitionsCount',
                'PreferredReplicaImbalancesPerSec'
            ],
            'kafka.network:type=RequestMetrics': [
                'RequestsPerSec',
                'TotalTimeMs',
                'RequestQueueSize'
            ]
        }
        
        return self._collect_metrics(metrics)
    
    def collect_consumer_metrics(self):
        """Collect consumer metrics via JMX"""
        metrics = {
            'kafka.consumer:type=consumer-fetch-manager-metrics': [
                'records-lag-max',
                'records-consumed-rate',
                'bytes-consumed-rate',
                'fetch-rate',
                'fetch-latency-avg'
            ],
            'kafka.consumer:type=consumer-coordinator-metrics': [
                'rebalance-rate-per-hour',
                'last-rebalance-seconds-ago'
            ]
        }
        
        return self._collect_metrics(metrics)

Alerting Thresholds

Alert Configuration

# Alert thresholds
alert_thresholds = {
    # Consumer lag
    'consumer-lag-warning': {
        'metric': 'records-lag-max',
        'threshold': 10000,
        'window': '5m',
        'severity': 'warning'
    },
    'consumer-lag-critical': {
        'metric': 'records-lag-max',
        'threshold': 100000,
        'window': '5m',
        'severity': 'critical'
    },
    
    # Throughput drop
    'throughput-drop': {
        'metric': 'records-consumed-rate',
        'condition': 'decrease > 50%',
        'window': '10m',
        'severity': 'warning'
    },
    
    # Error rate
    'error-rate': {
        'metric': 'record-error-rate',
        'threshold': 0.01,  # 1%
        'window': '5m',
        'severity': 'warning'
    },
    
    # Rebalance rate
    'rebalance-rate': {
        'metric': 'rebalance-rate-per-hour',
        'threshold': 5,
        'window': '1h',
        'severity': 'warning'
    },
    
    # Broker offline
    'broker-offline': {
        'metric': 'offline-partitions',
        'threshold': 0,
        'window': '1m',
        'severity': 'critical'
    },
    
    # ISR shrink
    'isr-shrink': {
        'metric': 'isr-shrink-rate',
        'threshold': 0.1,
        'window': '5m',
        'severity': 'warning'
    }
}

Dashboard Metrics

# Grafana dashboard metrics
dashboard_metrics = {
    'Overview': [
        'cluster-healthy',
        'active-controller',
        'offline-partitions',
        'under-replicated-partitions'
    ],
    'Throughput': [
        'messages-in-rate',
        'bytes-in-rate',
        'bytes-out-rate',
        'request-rate'
    ],
    'Consumer Health': [
        'records-lag-max',
        'consumption-rate',
        'rebalance-rate',
        'active-members'
    ],
    'Latency': [
        'request-latency-avg',
        'request-latency-p99',
        'fetch-latency-avg',
        'commit-latency-avg'
    ],
    'Storage': [
        'log-size',
        'log-size-rate',
        'log-flush-rate'
    ]
}

ℹ️Monitoring Stack

Recommended monitoring stack:

  1. Metrics Collection: JMX Exporter or Kafka Exporter
  2. Time Series DB: Prometheus or InfluxDB
  3. Visualization: Grafana
  4. Alerting: Alertmanager or PagerDuty
  5. Logging: ELK Stack or Loki

Lag Monitoring Formula

Lag=Latest OffsetCommitted Offset\text{Lag} = \text{Latest Offset} - \text{Committed Offset}
Lag Time=Lag MessagesConsumption Rate\text{Lag Time} = \frac{\text{Lag Messages}}{\text{Consumption Rate}}
Lag Percentage=Lag MessagesLatest Offset×100%\text{Lag Percentage} = \frac{\text{Lag Messages}}{\text{Latest Offset}} \times 100\%
def comprehensive_lag_analysis(consumer, topic):
    """
    Comprehensive consumer lag analysis.
    """
    analysis = {
        'total_lag': 0,
        'total_messages': 0,
        'partitions': {},
        'estimated_catch_up_time': 0,
        'health_score': 100
    }
    
    for partition in consumer.partitions(topic):
        tp = TopicPartition(topic, partition)
        
        consumer.seek_to_end(tp)
        latest = consumer.position(tp)
        committed = consumer.committed(tp) or 0
        
        lag = latest - committed
        analysis['total_lag'] += lag
        analysis['total_messages'] += latest
        
        analysis['partitions'][partition] = {
            'latest': latest,
            'committed': committed,
            'lag': lag,
            'lag_percentage': (lag / latest * 100) if latest > 0 else 0
        }
    
    # Calculate overall metrics
    if analysis['total_messages'] > 0:
        analysis['lag_percentage'] = (
            analysis['total_lag'] / analysis['total_messages'] * 100
        )
    
    # Estimate catch-up time
    # Assume 1000 msgs/sec consumption rate
    consumption_rate = 1000
    analysis['estimated_catch_up_time'] = analysis['total_lag'] / consumption_rate
    
    # Calculate health score
    if analysis['lag_percentage'] > 10:
        analysis['health_score'] = 0
    elif analysis['lag_percentage'] > 1:
        analysis['health_score'] = 50
    else:
        analysis['health_score'] = 100
    
    return analysis

⚠️Key Insight

Consumer lag is the most important metric. It directly indicates if your consumers are keeping up with production. Set up alerts for both absolute lag (10k messages) and relative lag (1% of total).

Advertisement