πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Apache Kafka Producer & Consumer: Advanced Configurations and Patterns

🟒 Free Lesson

Advertisement

Apache Kafka Producer & Consumer: Advanced Configurations and Patterns

Producer Ack Modes (acks=0/1/all)ProducerSend Recordacks=0No ACKacks=1Leader ACKacks=allISR ACKLeaderBrokerISRFollowersacks=0: Fire & ForgetFastest, no durability guaranteeacks=1: Leader OnlyModerate durabilityacks=all: Full ISRHighest durabilityacks controls acknowledgment: 0=no ack, 1=leader, all=all ISR replicas
Consumer Offset Commit StrategiesAuto-CommitSync CommitAsync CommitManual CommitAt-Least-OnceAuto-commit: periodic background | Sync: blocks until ack | Async: non-blockingManual: commitSync/commitAsync after processing | At-least-once: commit after processing

Architecture Diagram: Producer Message Flow

Architecture Diagram: Consumer Group Rebalancing Process

Architecture Diagram: Offset Management Flow

Architecture Diagram: Producer Internal Threading Model

Formal Definitions

DfProducer Batching

Producer batching is the accumulation of multiple records into a single network request before sending to the broker. The RecordAccumulator buffers records per partition, and the Sender thread drains these buffers when either batch.size is reached or linger.ms expires. Batching amortizes network overhead and enables compression across multiple records, significantly improving throughput at the cost of slight latency increase.

DfConsumer Lag

Consumer lag is the difference between the latest offset in a partition log and the consumer's last committed offset. It measures how far behind a consumer is from the producer's write position. Consumer lag = log_end_offset - committed_offset. Persistent lag indicates the consumer cannot keep up with the production rate, requiring scaling or optimization.

DfIdempotent Producer

An idempotent producer prevents duplicate messages during retries by assigning each producer a Producer ID (PID) and tracking per-partition sequence numbers. The broker rejects any message whose sequence number has already been seen, ensuring exactly-once delivery per partition even across retries and producer restarts.

DfRebalance

A rebalance is the process of redistributing partition assignments among consumers in a group when membership changes (consumer join/leave/failure). During rebalance, all consumers in the group stop processing, rejoin, receive new assignments, and resume. The rebalance protocol has two phases: JoinGroup (consumers subscribe) and SyncGroup (leader assigns partitions).

DfDead Letter Queue (DLQ)

A dead letter queue is a separate Kafka topic where records are sent after failing processing after maximum retry attempts. DLQs prevent poison messages from blocking the consumer processing loop. Each record in the DLQ retains the original topic, partition, offset, and error metadata for later analysis and reprocessing.

Key Formulas

Producer Throughput
Tproducer=BbatchΓ—Nrecordsmax⁑(Bbatch,Llinger)+LnetworkT_{\text{producer}} = \frac{B_{\text{batch}} \times N_{\text{records}}}{\max(B_{\text{batch}}, L_{\text{linger}}) + L_{\text{network}}}

Here,

  • BbatchB_{\text{batch}}=Batch fill time (seconds)
  • NrecordsN_{\text{records}}=Records per batch
  • LlingerL_{\text{linger}}=linger.ms delay (seconds)
  • LnetworkL_{\text{network}}=Network round-trip latency (seconds)

Consumer Processing Rate

Rconsumer=min⁑(NpollTprocess,BfetchTfetch)R_{\text{consumer}} = \min\left(\frac{N_{\text{poll}}}{T_{\text{process}}}, \frac{B_{\text{fetch}}}{T_{\text{fetch}}}\right)

Here,

  • NpollN_{\text{poll}}=Records per poll (max.poll.records)
  • TprocessT_{\text{process}}=Time to process one poll batch
  • BfetchB_{\text{fetch}}=Fetch buffer size (fetch.max.bytes)
  • TfetchT_{\text{fetch}}=Fetch round-trip time

Consumer Lag Growth Rate

d Lagdt=Rproduceβˆ’Rconsume\frac{d\,\text{Lag}}{dt} = R_{\text{produce}} - R_{\text{consume}}

Here,

  • RproduceR_{\text{produce}}=Message production rate (msg/sec)
  • RconsumeR_{\text{consume}}=Message consumption rate (msg/sec)

Batch Efficiency

Ξ·batch=NrecordsΓ—SavgBbatch\eta_{\text{batch}} = \frac{N_{\text{records}} \times S_{\text{avg}}}{B_{\text{batch}}}

Here,

  • NrecordsN_{\text{records}}=Number of records in batch
  • SavgS_{\text{avg}}=Average record size (bytes)
  • BbatchB_{\text{batch}}=Total batch size (bytes)

When consumer lag grows continuously (dLag/dt > 0), the consumer is falling behind. Solutions: (1) increase partition count and consumer instances, (2) optimize processing logic, (3) increase max.poll.records, (4) reduce per-record processing time.

The acks setting controls the durability-latency tradeoff: acks=0 (fire-and-forget, lowest latency, no durability), acks=1 (leader acknowledgment, moderate), acks=all (all ISR acknowledge, highest durability, highest latency). For exactly-once, use acks=all with idempotence enabled.

Use commitSync() for the last offset in a batch (guaranteed delivery), and commitAsync() for intermediate offsets (higher throughput). If commitAsync() fails, the next commitSync() will update the offset correctly.

ThAt-Least-Once Delivery

With manual offset commits and enable.auto.commit=false, Kafka provides at-least-once delivery by default: commit after processing ensures no data loss, but processing failures between poll() and commitSync() may cause duplicates. Deduplication at the sink (e.g., database upsert by message ID) is required for exactly-once at the application level.

ThRebalance Impact Theorem

During a rebalance, all consumers in the group pause processing. The rebalance duration is bounded by session.timeout.ms for detection + max.poll.interval.ms for consumer response + coordination overhead. With CooperativeStickyAssignor, only partitions that need to move are revoked, reducing pause time by ~50% compared to eager rebalancing.

Detailed Explanation

The Kafka Producer API is a sophisticated client library designed for high-throughput, low-latency message publishing. At its core, the producer operates on an asynchronous, batching model where records are accumulated in a memory buffer before being sent in batches to the appropriate brokers.

Producer Internals

The producer maintains a RecordAccumulator that buffers records per partition, and a background Sender thread that drains these buffers and sends batched requests to the brokers. This architecture minimizes network overhead and enables efficient compression, as multiple records can be compressed together in a single batch.

The Partitioning Strategy is critical for both ordering and load distribution. By default, Kafka uses a murmur2 hash of the key modulo the number of partitions, ensuring that records with the same key always go to the same partition (and thus maintain ordering). Custom partitioners can be implemented for more sophisticated routing, such as sticky partitioning (which batches records to the same partition before switching) or region-aware partitioning.

Producer Configurations heavily influence behavior. The acks setting determines durability guarantees: acks=0 provides no acknowledgment (fire-and-forget), acks=1 acknowledges when the leader writes to its local log, and acks=all acknowledges when all in-sync replicas write the record. The max.in.flight.requests.per.connection setting controls how many unacknowledged requests can be outstanding per broker connection, with values > 1 potentially causing reordering (unless idempotence is enabled).

Consumer Internals

Consumer Group Coordination is managed by a Group Coordinator broker that maintains the group's membership state and partition assignments. When a consumer joins or leaves, a rebalance is triggered using a two-phase protocol: first, consumers send JoinGroup requests with their subscriptions, then the coordinator elects a leader consumer (typically the first to join) who computes the partition assignment, and finally all consumers send SyncGroup requests to receive their assignments.

The CooperativeStickyAssignor (recommended) performs incremental rebalancing, only revoking partitions that need to move, minimizing the impact on processing continuity. This is a significant improvement over the eager rebalancing strategy which revokes all partitions.

Offset Management is crucial for at-least-once and exactly-once processing semantics. Offsets can be committed automatically (enable.auto.commit=true) or manually via commitSync() or commitAsync(). Manual commits provide more control, allowing consumers to commit only after successful processing.

Consumer Polling and Processing follows a single-threaded model where poll() fetches batches of records, and the application processes them sequentially. The max.poll.records setting controls how many records are returned per poll, while max.poll.interval.ms sets the maximum time between polls before the consumer is considered failed and removed from the group.

Key Concepts Table

ConceptDescriptionConfigurationImpact
Record AccumulatorIn-memory buffer for batching recordsbuffer.memory, batch.sizeThroughput vs. Memory
Sender ThreadBackground thread sending batcheslinger.ms, request.timeout.msLatency vs. Throughput
PartitionerDetermines target partition for recordpartitioner.classOrdering, Load Distribution
IdempotencePrevents duplicate records on retryenable.idempotenceExactly-once per partition
TransactionalAtomic multi-partition writestransactional.idExactly-once across partitions
Group CoordinatorBroker managing consumer groupsN/ARebalance coordination
HeartbeatConsumer liveness signalheartbeat.interval.msFailure detection speed
Session TimeoutConsumer failure thresholdsession.timeout.msFalse positive vs. Detection
Rebalance ListenerCallback for partition changespartition.assignment.strategyRebalance handling
Isolation LevelTransactional read semanticsisolation.levelConsistent reads
Max Poll RecordsRecords per poll() batchmax.poll.recordsThroughput vs. latency
Fetch Min BytesMinimum fetch batch sizefetch.min.bytesFetch efficiency

Code Examples

Advanced Producer with Custom Partitioner

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.*;

public class CustomPartitionerExample {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        
        // Use custom partitioner
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, 
            RegionAwarePartitioner.class.getName());
        
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        
        // Send with custom headers for routing
        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(
                "user-events",
                "user-" + (i % 10),
                "{\"userId\": " + i + ", \"action\": \"click\"}"
            );
            
            // Add headers for partitioning logic
            record.headers().add("region", ("us-east-" + (i % 3)).getBytes());
            record.headers().add("priority", (i % 5 == 0 ? "high" : "normal").getBytes());
            
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("Error: " + exception.getMessage());
                } else {
                    System.out.printf("Partition: %d, Offset: %d%n", 
                        metadata.partition(), metadata.offset());
                }
            });
        }
        
        producer.flush();
        producer.close();
    }
}

// Custom partitioner implementation
class RegionAwarePartitioner implements Partitioner {
    
    @Override
    public void configure(Map<String, ?> configs) {}
    
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, 
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        
        // Default: hash-based partitioning
        if (keyBytes == null) {
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        
        // Region-aware: distribute by region, then by key hash
        String region = extractRegionFromKey(key.toString());
        int regionHash = region.hashCode() % numPartitions;
        
        // Within region, use key hash
        int keyHash = Utils.murmur2(keyBytes) % numPartitions;
        
        // Combine for final partition
        return (regionHash + keyHash) % numPartitions;
    }
    
    private String extractRegionFromKey(String key) {
        // Extract region from key pattern
        return key.contains("us") ? "us" : key.contains("eu") ? "eu" : "other";
    }
    
    @Override
    public void close() {}
}

Consumer with Manual Offset Management and Retry

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

public class ManualOffsetConsumerExample {
    
    private static final int MAX_RETRIES = 3;
    private static final Map<TopicPartition, Long> failedOffsets = new ConcurrentHashMap<>();
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "retry-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        props.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, false);
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("order-events"), new RebalanceHandler());
        
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                
                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
                
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        processRecordWithRetry(record);
                        
                        // Track successful offset
                        // OffsetAndMetadata + 1 = next offset to read
                        offsetsToCommit.put(
                            new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1, "processed")
                        );
                        
                    } catch (Exception e) {
                        System.err.printf("Failed to process record at offset %d: %s%n",
                            record.offset(), e.getMessage());
                        
                        // Track failed offset for later retry
                        failedOffsets.put(
                            new TopicPartition(record.topic(), record.partition()),
                            record.offset()
                        );
                        
                        // Don't commit this offset
                        break;
                    }
                }
                
                // Commit only successful offsets
                if (!offsetsToCommit.isEmpty()) {
                    consumer.commitSync(offsetsToCommit, Duration.ofSeconds(30));
                }
                
                // Process any failed records
                retryFailedRecords(consumer);
            }
        } finally {
            consumer.close();
        }
    }
    
    private static void processRecordWithRetry(ConsumerRecord<String, String> record) 
            throws Exception {
        int retries = 0;
        while (retries < MAX_RETRIES) {
            try {
                // Simulate processing
                if (record.offset() % 10 == 0) {
                    throw new RuntimeException("Simulated processing error");
                }
                System.out.printf("Processed: %s%n", record.value());
                return;
            } catch (Exception e) {
                retries++;
                if (retries >= MAX_RETRIES) {
                    throw e;
                }
                Thread.sleep(1000 * retries); // Exponential backoff
            }
        }
    }
    
    private static void retryFailedRecords(KafkaConsumer<String, String> consumer) {
        if (failedOffsets.isEmpty()) return;
        
        System.out.printf("Retrying %d failed records%n", failedOffsets.size());
        
        failedOffsets.forEach((partition, offset) -> {
            // Seek to failed offset for retry
            consumer.seek(partition, offset);
        });
        
        failedOffsets.clear();
    }
    
    static class RebalanceHandler implements ConsumerRebalanceListener {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            System.out.println("Partitions revoked: " + partitions);
        }
        
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            System.out.println("Partitions assigned: " + partitions);
        }
    }
}

Transactional Producer-Consumer Example

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.*;

public class ExactlyOnceProducerConsumerExample {
    
    public static void main(String[] args) {
        // Transactional producer that reads from one topic and writes to another
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "eos-transformer-1");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "eos-transformer-group");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        
        producer.initTransactions();
        consumer.subscribe(Arrays.asList("input-events"));
        
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                if (records.isEmpty()) continue;
                
                producer.beginTransaction();
                try {
                    for (ConsumerRecord<String, String> record : records) {
                        // Transform the record
                        String transformedValue = transform(record.value());
                        
                        // Write to output topic
                        ProducerRecord<String, String> outputRecord = new ProducerRecord<>(
                            "output-events",
                            record.key(),
                            transformedValue
                        );
                        
                        // Add transformation metadata
                        outputRecord.headers().add("source-topic", record.topic().getBytes());
                        outputRecord.headers().add("source-offset", 
                            String.valueOf(record.offset()).getBytes());
                        outputRecord.headers().add("transformed-at", 
                            String.valueOf(System.currentTimeMillis()).getBytes());
                        
                        producer.send(outputRecord);
                    }
                    
                    // Commit offsets atomically with the transaction
                    // This ensures exactly-once: process + commit in one transaction
                    consumer.commitSync(); // Commit to consumer's __consumer_offsets
                    producer.sendOffsetsToTransaction(
                        consumer.position(new TopicPartition("input-events", 0)),
                        "eos-transformer-group"
                    );
                    
                    producer.commitTransaction();
                    System.out.printf("Committed transaction for %d records%n", records.count());
                    
                } catch (Exception e) {
                    producer.abortTransaction();
                    System.err.println("Transaction aborted: " + e.getMessage());
                }
            }
        } finally {
            producer.close();
            consumer.close();
        }
    }
    
    private static String transform(String value) {
        // Example transformation: add processed timestamp
        return value.replace("}", ", \"processedAt\": " + System.currentTimeMillis() + "}");
    }
}

Python Producer with Idempotence and Transactions

from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import time

# Producer with idempotence and transaction support
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=lambda k: k.encode('utf-8') if k else None,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',
    retries=5,
    retry_backoff_ms=100,
    batch_size=65536,
    linger_ms=10,
    compression_type='lz4',
    enable_idempotence=True,
    transactional_id='my-transactional-producer-1',
)

# Initialize transactions
producer.init_transactions()

# Transactional send
try:
    producer.begin_transaction()
    
    for i in range(100):
        producer.send(
            topic='order-events',
            key=f'order-{i}',
            value={'orderId': i, 'amount': 100.0, 'status': 'pending'}
        )
    
    # Commit transaction atomically
    producer.commit_transaction()
    print("Transaction committed successfully")
    
except KafkaError as e:
    # Abort on any failure
    producer.abort_transaction()
    print(f"Transaction aborted: {e}")

# Callback-based send with error handling
def on_send_success(record_metadata):
    print(f"Sent to {record_metadata.topic} partition {record_metadata.partition} "
          f"offset {record_metadata.offset}")

def on_send_error(ex):
    print(f"Error: {ex}")

# Non-transactional send with callbacks
producer.send('user-events', key='user-123', value={'action': 'login'}) \
    .add_callback(on_send_success) \
    .add_errback(on_send_error)

producer.flush()
producer.close()

Python Consumer with Manual Offset Management

from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
import json

consumer = KafkaConsumer(
    'order-events',
    bootstrap_servers=['localhost:9092'],
    group_id='order-processor',
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    isolation_level='read_committed',
    max_poll_records=500,
    max_poll_interval_ms=300000,
    session_timeout_ms=45000,
)

def process_record(record):
    """Process a single record. Raise exception to trigger retry."""
    print(f"Processing: topic={record.topic}, partition={record.partition}, "
          f"offset={record.offset}, key={record.key}")
    # Business logic here
    return True

try:
    while True:
        records = consumer.poll(timeout_ms=100)
        
        offsets_to_commit = {}
        
        for tp, records_list in records.items():
            for record in records_list:
                try:
                    process_record(record)
                    
                    # Track successful offset
                    offsets_to_commit[tp] = OffsetAndMetadata(
                        record.offset + 1,  # Next offset to read
                        metadata='processed'
                    )
                    
                except Exception as e:
                    print(f"Failed to process offset {record.offset}: {e}")
                    # Don't commit this offset; will be retried on next poll
                    break
        
        # Commit only successful offsets
        if offsets_to_commit:
            consumer.commitSync(offsets_to_commit)
            print(f"Committed offsets: {offsets_to_commit}")

except KeyboardInterrupt:
    consumer.close()

Bash Consumer Monitoring Script

#!/bin/bash
# Monitor consumer group lag and health

BOOTSTRAP_SERVER="localhost:9092"
GROUP_ID="order-processing-service"

echo "=== Consumer Group Monitor ==="
echo "Timestamp: $(date)"
echo ""

# List all consumer groups
echo "--- Active Consumer Groups ---"
kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP_SERVER --list

echo ""
echo "--- Group Details: $GROUP_ID ---"
# Describe shows partition assignments, current offset, log-end offset, lag
kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP_SERVER \
  --group $GROUP_ID \
  --describe

echo ""
echo "--- Group State ---"
# Shows group state (Stable, PreparingRebalance, Dead) and member details
kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP_SERVER \
  --group $GROUP_ID \
  --describe --state

echo ""
echo "--- Reset Offsets (Dry Run) ---"
# Reset offsets to earliest for specific partition (dry run first)
kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP_SERVER \
  --group $GROUP_ID \
  --topic order-events:0 \
  --reset-offsets \
  --to-earliest \
  --dry-run

echo ""
echo "--- Reset Offsets (Execute) ---"
# Actually reset offsets (consumer group must be inactive)
# kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP_SERVER \
#   --group $GROUP_ID \
#   --topic order-events:0 \
#   --reset-offsets \
#   --to-earliest \
#   --execute

Performance Metrics

ScenarioThroughput (msg/sec)Latency (p99)Memory UsageCPU Usage
Async Batch (16KB)500,0005ms64MB30%
Sync (acks=all)50,00050ms32MB15%
Idempotent450,0008ms64MB35%
Transactional300,00015ms128MB40%
Compression (gzip)200,00010ms16MB50%
Compression (lz4)400,0007ms32MB25%
Consumer (single)100,0002ms32MB20%
Consumer (group=4)400,0005ms128MB80%
Consumer (group=8)700,0008ms256MB150%
ConfigurationDefaultImpact
batch.size16384 (16KB)Throughput: larger = higher throughput
linger.ms0Latency: 0 = lowest latency
buffer.memory33554432 (32MB)Backpressure when full
max.in.flight5Pipelining: higher = better throughput
compression.typenoneCPU vs. network tradeoff
max.poll.records2147483647Processing loop granularity
fetch.min.bytes1Fetch batching efficiency
fetch.max.wait.ms500Latency for small fetches

Best Practices

  1. Batch Size Optimization: Increase batch.size to 64KB-128KB for higher throughput, and use linger.ms=10-50 to allow batching. This trades slight latency for significantly higher throughput. Monitor batch-size-avg metric to tune.

  2. Idempotent Producers: Always enable enable.idempotence=true in production. This provides exactly-once semantics per partition without performance overhead. Combine with acks=all and max.in.flight.requests.per.connection=5.

  3. Transaction Management: Use transactional producers when operations span multiple partitions/topics. Keep transaction scope small and avoid long-running transactions that block others. Set transaction.timeout.ms appropriately.

  4. Consumer Group Sizing: Match consumer count to partition count. More consumers than partitions wastes resources. Use CooperativeStickyAssignor for minimal disruption during rebalances.

  5. Offset Commit Strategy: Disable auto-commit and commit manually after processing. Use commitSync() for critical offsets and commitAsync() for bulk commits. Never commit offsets for unprocessed records.

  6. Error Handling: Implement retry logic with exponential backoff for transient failures. Use a dead-letter queue (DLQ) for records that fail after max retries. Monitor consumer lag to detect processing bottlenecks.

  7. Memory Management: Configure max.partition.fetch.bytes to control memory usage per partition. Set fetch.min.bytes and fetch.max.wait.ms to batch fetch requests efficiently.

  8. Monitoring: Track producer metrics: record-send-rate, batch-size-avg, compression-rate-avg, request-latency-avg. Track consumer metrics: records-lag-max, records-consumed-rate, poll-rate.

  9. Security: Use SASL/SCRAM for authentication and SSL for encryption. Implement ACLs to restrict topic access. Use separate producers for different security zones.

  10. Testing: Use Testcontainers or embedded Kafka for integration tests. Test with realistic volumes and verify exactly-once semantics with idempotent consumer writes. Use chaos testing to verify retry behavior.

Key Takeaways:

  • Producer batching (batch.size + linger.ms) trades latency for throughput; typical optimal batch size is 64KB-128KB
  • Consumer lag = log_end_offset - committed_offset; sustained growth indicates consumer cannot keep up
  • Idempotent producers (enable.idempotence=true) prevent duplicates per partition without transactions
  • Partition assignment via CooperativeStickyAssignor minimizes rebalance disruption
  • Manual offset commits after processing prevent data loss; auto-commit risks duplicates on failure
  • Consumer throughput is bounded by min(poll_rate x records_per_poll, fetch_rate)

See also: Kafka Architecture (kafka/01) | Kafka Streams & Connect (kafka/03) | Exactly-Once Semantics (kafka/04) | Schema Registry (kafka/05)

See Also

⭐

Premium Content

Apache Kafka Producer & Consumer: Advanced Configurations and Patterns

Unlock this lesson and 900+ advanced tutorials with a Premium plan.

🎯End-to-end Projects
πŸ’ΌInterview Prep
πŸ“œCertificates
🀝Community Access

Already a member? Log in

Need Expert Kafka Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement