Apache Kafka Producer & Consumer: Advanced Configurations and Patterns
Architecture Diagram: Producer Message Flow
Architecture Diagram: Consumer Group Rebalancing Process
Architecture Diagram: Offset Management Flow
Architecture Diagram: Producer Internal Threading Model
Formal Definitions
Definition: Producer Batching
Producer batching is the accumulation of multiple records into a single network request before sending to the broker. The RecordAccumulator buffers records per partition, and the Sender thread drains these buffers when either batch.size is reached or linger.ms expires. Batching amortizes network overhead and enables compression across multiple records, significantly improving throughput at the cost of slight latency increase.
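The two drain conditions can be illustrated with a toy model. The sketch below is a simplification, not the real RecordAccumulator: the class `PartitionBatcher` and its method names are hypothetical, time is passed in explicitly, and a batch is treated as full once the buffered bytes reach the configured size.

```python
class PartitionBatcher:
    """Toy model of one partition's buffer; not the real RecordAccumulator."""

    def __init__(self, batch_size, linger_ms):
        self.batch_size = batch_size    # bytes, stands in for batch.size
        self.linger_ms = linger_ms      # milliseconds, stands in for linger.ms
        self.records = []
        self.buffered_bytes = 0
        self.opened_at_ms = None        # time the first record was buffered

    def append(self, record, now_ms):
        if not self.records:
            self.opened_at_ms = now_ms
        self.records.append(record)
        self.buffered_bytes += len(record)

    def ready(self, now_ms):
        """True when the sender should drain: batch full OR linger expired."""
        if not self.records:
            return False
        full = self.buffered_bytes >= self.batch_size
        lingered = now_ms - self.opened_at_ms >= self.linger_ms
        return full or lingered

    def drain(self):
        batch = self.records
        self.records, self.buffered_bytes, self.opened_at_ms = [], 0, None
        return batch


batcher = PartitionBatcher(batch_size=30, linger_ms=10)
batcher.append(b"x" * 10, now_ms=0)
print(batcher.ready(now_ms=5))    # False: 10 of 30 bytes, 5 of 10 ms elapsed
batcher.append(b"x" * 25, now_ms=6)
print(batcher.ready(now_ms=6))    # True: 35 bytes >= batch_size
size_triggered = batcher.drain()
batcher.append(b"y" * 5, now_ms=20)
print(batcher.ready(now_ms=30))   # True: the 10 ms linger has expired
```

A larger `linger_ms` gives small records more time to share a request, which is where the throughput gain and the added latency both come from.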
Definition: Consumer Lag
Consumer lag is the difference between the latest offset in a partition log and the consumer's last committed offset. It measures how far behind a consumer is from the producer's write position. Consumer lag = log_end_offset - committed_offset. Persistent lag indicates the consumer cannot keep up with the production rate, requiring scaling or optimization.
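As a worked example of the lag formula, the sketch below computes per-partition and total lag from two offset maps. The function names are hypothetical, and treating a partition with no committed offset as committed at 0 is an assumption made only to keep the example short (a real consumer would fall back to auto.offset.reset).

```python
def partition_lag(log_end_offset, committed_offset):
    """Lag for one partition; clamped at 0 in case the inputs are slightly stale."""
    return max(log_end_offset - committed_offset, 0)


def group_lag(end_offsets, committed_offsets):
    """Per-partition lag; a missing committed offset is treated as 0 (assumption)."""
    return {
        partition: partition_lag(end, committed_offsets.get(partition, 0))
        for partition, end in end_offsets.items()
    }


end_offsets = {0: 1500, 1: 900, 2: 40}   # log end offset per partition
committed = {0: 1200, 1: 900}            # partition 2 has no commit yet
lag = group_lag(end_offsets, committed)
total_lag = sum(lag.values())
print(lag, total_lag)
```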
Definition: Idempotent Producer
An idempotent producer prevents duplicate messages during retries. The broker assigns each producer a Producer ID (PID), and the producer attaches a per-partition sequence number to every batch. The broker rejects any batch whose sequence number it has already seen, so a retried batch is written exactly once per partition within a producer session. A restarted producer receives a new PID, so deduplication across restarts additionally requires a transactional.id.
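The sequence-number check can be sketched as follows. This is a deliberately simplified model with hypothetical names (`PartitionLog`, `append`): it tracks one sequence number per record, whereas a real broker tracks sequences per batch.

```python
class PartitionLog:
    """Toy model of broker-side duplicate detection for one partition."""

    def __init__(self):
        self.records = []
        self.last_sequence = {}   # producer_id -> last accepted sequence number

    def append(self, producer_id, sequence, value):
        last = self.last_sequence.get(producer_id, -1)
        if sequence <= last:
            return "duplicate"        # already written: acknowledge, do not append
        if sequence != last + 1:
            return "out_of_order"     # a gap means an earlier record is missing
        self.records.append(value)
        self.last_sequence[producer_id] = sequence
        return "appended"


log = PartitionLog()
results = [
    log.append(producer_id=7, sequence=0, value="a"),
    log.append(producer_id=7, sequence=1, value="b"),
    log.append(producer_id=7, sequence=1, value="b"),   # retry of the same record
    log.append(producer_id=7, sequence=3, value="d"),   # sequence 2 never arrived
]
print(results, log.records)
```

The retry is acknowledged without being written a second time, which is why enabling retries does not introduce duplicates in the partition log.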
Definition: Rebalance
A rebalance is the process of redistributing partition assignments among the consumers in a group when membership changes (a consumer joins, leaves, or fails). Under the eager protocol, all consumers in the group stop processing, rejoin, receive new assignments, and resume. The rebalance protocol has two phases: JoinGroup (consumers send their subscriptions to the coordinator) and SyncGroup (the assignment computed by the leader consumer is distributed to the members).
Definition: Dead Letter Queue (DLQ)
A dead letter queue is a separate Kafka topic that receives records whose processing still fails after the maximum number of retry attempts. DLQs prevent poison messages from blocking the consumer processing loop. Each DLQ record should carry the original topic, partition, offset, and error metadata (typically as headers) for later analysis and reprocessing.
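The retry-then-DLQ flow can be sketched without a broker. In the example below, records are plain dictionaries, `process_with_dlq` and `to_upper` are hypothetical helpers, and the dead letters are collected in a list where a real consumer would produce them to the DLQ topic.

```python
def process_with_dlq(records, handler, max_retries=3):
    """Try each record up to max_retries times; route persistent failures to a DLQ list."""
    processed, dead_letters = [], []
    for record in records:
        last_error = None
        for _attempt in range(max_retries):
            try:
                processed.append(handler(record["value"]))
                break
            except Exception as exc:   # broad catch is acceptable for a sketch
                last_error = exc
        else:
            # The loop finished without a break: every attempt failed.
            dead_letters.append({
                "original_topic": record["topic"],
                "original_partition": record["partition"],
                "original_offset": record["offset"],
                "value": record["value"],
                "error": str(last_error),
            })
    return processed, dead_letters


def to_upper(value):
    if value == "bad":
        raise ValueError("cannot parse")
    return value.upper()


records = [
    {"topic": "order-events", "partition": 0, "offset": 0, "value": "a"},
    {"topic": "order-events", "partition": 0, "offset": 1, "value": "bad"},
    {"topic": "order-events", "partition": 0, "offset": 2, "value": "c"},
]
processed, dead_letters = process_with_dlq(records, to_upper)
print(processed, dead_letters)
```

The poison record at offset 1 ends up in the dead letter list, and offset 2 is still processed.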
Key Formulas
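Producer Batch Throughput
$$R_{\text{producer}} \approx \frac{N_{\text{batch}}}{\min(T_{\text{fill}},\, T_{\text{linger}}) + T_{\text{net}}}$$
Here,
- $T_{\text{fill}}$ = Batch fill time (seconds)
- $N_{\text{batch}}$ = Records per batch
- $T_{\text{linger}}$ = linger.ms delay (seconds)
- $T_{\text{net}}$ = Network round-trip latency (seconds)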
Consumer Processing Rate
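$$R_{\text{consumer}} \le \min\left(\frac{N_{\text{poll}}}{T_{\text{process}}},\ \frac{B_{\text{fetch}}}{T_{\text{fetch}}}\right)$$
Here,
- $N_{\text{poll}}$ = Records per poll (max.poll.records)
- $T_{\text{process}}$ = Time to process one poll batch
- $B_{\text{fetch}}$ = Fetch buffer size (fetch.max.bytes)
- $T_{\text{fetch}}$ = Fetch round-trip time
The second term is in bytes per second; divide it by the average record size to compare it with the first term in records per second.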
Consumer Lag Growth Rate
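$$\frac{d\,\text{Lag}}{dt} = R_{\text{produce}} - R_{\text{consume}}$$
Here,
- $R_{\text{produce}}$ = Message production rate (msg/sec)
- $R_{\text{consume}}$ = Message consumption rate (msg/sec)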
Batch Efficiency
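$$\eta_{\text{batch}} = \frac{N \cdot S_{\text{avg}}}{B_{\text{total}}}$$
Here,
- $N$ = Number of records in batch
- $S_{\text{avg}}$ = Average record size (bytes)
- $B_{\text{total}}$ = Total batch size (bytes)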
When consumer lag grows continuously (dLag/dt > 0), the consumer is falling behind. Solutions: (1) increase the partition count and add consumer instances, (2) optimize the processing logic to reduce per-record processing time, (3) tune max.poll.records and the fetch settings so that each poll returns a full batch.
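A small capacity calculation makes the lag growth rate concrete. The helper names and the example rates below are illustrative assumptions, and the model assumes consumption scales linearly with the number of consumers (which holds only up to the partition count).

```python
import math


def lag_growth_rate(produce_rate, consume_rate):
    """dLag/dt in messages per second; positive means the consumer is falling behind."""
    return produce_rate - consume_rate


def consumers_needed(produce_rate, per_consumer_rate):
    """Smallest consumer count whose combined rate matches the production rate."""
    return math.ceil(produce_rate / per_consumer_rate)


def seconds_to_drain(backlog, produce_rate, consume_rate):
    """Time to clear an existing backlog; infinite if consumption is not faster."""
    surplus = consume_rate - produce_rate
    if surplus <= 0:
        return math.inf
    return backlog / surplus


print(lag_growth_rate(5000, 4000))           # lag grows by 1000 msg/sec
print(consumers_needed(5000, 1200))          # consumers required to keep up
print(seconds_to_drain(60000, 5000, 6000))   # seconds to clear a 60,000-message backlog
```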
The acks setting controls the durability-latency tradeoff: acks=0 (fire-and-forget, lowest latency, no durability), acks=1 (leader acknowledgment, moderate), acks=all (all ISR acknowledge, highest durability, highest latency). For exactly-once, use acks=all with idempotence enabled.
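Use commitSync() for the last offset in a batch and before shutdown or partition revocation: it blocks and retries until the commit succeeds or fails unrecoverably. Use commitAsync() for intermediate offsets (higher throughput, no blocking). commitAsync() does not retry, but a failed asynchronous commit is superseded by the next successful commit of a higher offset.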
Theorem: At-Least-Once Delivery
With enable.auto.commit=false and offsets committed only after processing, a consumer gets at-least-once delivery: no record is skipped, but a crash or rebalance after a record has been processed and before commitSync() completes causes that record to be redelivered. Exactly-once behavior at the application level therefore requires deduplication at the sink (e.g., a database upsert by message ID) or Kafka transactions.
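Sink-side deduplication can be sketched with an in-memory SQLite table standing in for the target database. The table name, column names, and the `message_id` values are hypothetical; the point is only that writing the same message twice leaves a single row.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (message_id TEXT PRIMARY KEY, amount REAL)")


def upsert_order(conn, message_id, amount):
    """Idempotent write: a redelivered message overwrites its own earlier row."""
    conn.execute(
        "INSERT OR REPLACE INTO orders (message_id, amount) VALUES (?, ?)",
        (message_id, amount),
    )


# "m1" is delivered twice, as can happen after a crash before the offset commit.
deliveries = [("m1", 10.0), ("m2", 20.0), ("m1", 10.0)]
for message_id, amount in deliveries:
    upsert_order(conn, message_id, amount)
conn.commit()

row_count, total = conn.execute("SELECT COUNT(*), SUM(amount) FROM orders").fetchone()
print(row_count, total)
```

Because the primary key is the message ID, the duplicate delivery changes neither the row count nor the total.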
Theorem: Rebalance Impact
During an eager rebalance, all consumers in the group pause processing. The pause is bounded by the failure detection time (up to session.timeout.ms), the time members take to rejoin (up to max.poll.interval.ms), and coordination overhead. With CooperativeStickyAssignor, only the partitions that need to move are revoked, so consumers keep processing the partitions they retain; the reduction in pause time depends on how many partitions change owner.
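The difference between the two protocols can be shown by counting revoked partitions for one membership change. The functions and the example assignments below are hypothetical; they model only which partitions are taken away, not the timing of the protocol.

```python
def revoked_eager(old_assignment):
    """Eager protocol: every member gives up everything it owns."""
    return {member: set(parts) for member, parts in old_assignment.items()}


def revoked_cooperative(old_assignment, new_assignment):
    """Cooperative protocol: a member gives up only partitions it no longer owns."""
    return {
        member: set(parts) - set(new_assignment.get(member, ()))
        for member, parts in old_assignment.items()
    }


# Consumer c3 joins a group of two consumers that share six partitions.
old = {"c1": {0, 1, 2}, "c2": {3, 4, 5}}
new = {"c1": {0, 1}, "c2": {3, 4}, "c3": {2, 5}}

eager = revoked_eager(old)
cooperative = revoked_cooperative(old, new)
eager_count = sum(len(p) for p in eager.values())
cooperative_count = sum(len(p) for p in cooperative.values())
print(eager_count, cooperative_count)
```

In this example the eager protocol interrupts all six partitions, while the cooperative protocol interrupts only the two that move to the new consumer.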
Detailed Explanation
The Kafka Producer API is a sophisticated client library designed for high-throughput, low-latency message publishing. At its core, the producer operates on an asynchronous, batching model where records are accumulated in a memory buffer before being sent in batches to the appropriate brokers.
Producer Internals
The producer maintains a RecordAccumulator that buffers records per partition, and a background Sender thread that drains these buffers and sends batched requests to the brokers. This architecture minimizes network overhead and enables efficient compression, as multiple records can be compressed together in a single batch.
The Partitioning Strategy is critical for both ordering and load distribution. For records with a key, Kafka by default uses a murmur2 hash of the key modulo the number of partitions, ensuring that records with the same key always go to the same partition (and thus maintain ordering) as long as the partition count does not change. Records without a key are spread using sticky partitioning, which fills a batch for one partition before switching to another. Custom partitioners can be implemented for more sophisticated routing, such as region-aware partitioning.
Producer Configurations heavily influence behavior. The acks setting determines durability guarantees: acks=0 provides no acknowledgment (fire-and-forget), acks=1 acknowledges when the leader writes to its local log, and acks=all acknowledges when all in-sync replicas have written the record. The max.in.flight.requests.per.connection setting controls how many unacknowledged requests can be outstanding per broker connection. Values > 1 can cause reordering when a failed batch is retried after a later batch has succeeded, unless idempotence is enabled, which preserves ordering for up to 5 in-flight requests.
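Hash-based partitioning can be sketched in a few lines. Note the loud assumption: `zlib.crc32` is used here as a stand-in hash so the example runs with the standard library only; Kafka itself uses murmur2, so the partition numbers below will not match what a real producer computes.

```python
import zlib


def partition_for(key: bytes, num_partitions: int) -> int:
    """Stand-in for hash(key) mod N; crc32 replaces murmur2 for illustration only."""
    return zlib.crc32(key) % num_partitions


keys = [f"user-{i}".encode() for i in range(5)]

# The same key always maps to the same partition for a fixed partition count.
before = {key: partition_for(key, 6) for key in keys}
again = {key: partition_for(key, 6) for key in keys}

# Changing the partition count can remap keys, which breaks per-key ordering
# across the change.
after = {key: partition_for(key, 8) for key in keys}
moved = [key for key in keys if before[key] != after[key]]
print(before, moved)
```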
Consumer Internals
Consumer Group Coordination is managed by a Group Coordinator broker that maintains the group's membership state and partition assignments. When a consumer joins or leaves, a rebalance is triggered using a two-phase protocol: first, consumers send JoinGroup requests with their subscriptions, then the coordinator elects a leader consumer (typically the first to join) who computes the partition assignment, and finally all consumers send SyncGroup requests to receive their assignments.
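The JoinGroup/SyncGroup exchange can be sketched as a single function. Everything here is a simplification under stated assumptions: `run_rebalance` is a hypothetical name, all members subscribe to one topic, the leader is simply the first member to join, and the leader uses a round-robin assignment rather than any specific built-in assignor.

```python
def run_rebalance(join_order, partitions):
    """Simulate one rebalance: pick a leader, compute and return the assignment."""
    # Phase 1 (JoinGroup): the coordinator collects members and picks a leader.
    leader = join_order[0]
    members = sorted(join_order)

    # The leader computes the assignment (round-robin over sorted members).
    assignment = {member: [] for member in members}
    for index, partition in enumerate(sorted(partitions)):
        assignment[members[index % len(members)]].append(partition)

    # Phase 2 (SyncGroup): the coordinator hands each member its own share.
    return leader, assignment


leader, assignment = run_rebalance(["c2", "c1"], range(5))
print(leader, assignment)
```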
The CooperativeStickyAssignor (recommended) performs incremental rebalancing, only revoking partitions that need to move, minimizing the impact on processing continuity. This is a significant improvement over the eager rebalancing strategy which revokes all partitions.
Offset Management is crucial for at-least-once and exactly-once processing semantics. Offsets can be committed automatically (enable.auto.commit=true) or manually via commitSync() or commitAsync(). Manual commits provide more control, allowing consumers to commit only after successful processing.
Consumer Polling and Processing follows a single-threaded model where poll() fetches batches of records, and the application processes them sequentially. The max.poll.records setting controls how many records are returned per poll, while max.poll.interval.ms sets the maximum time between polls before the consumer is considered failed and removed from the group.
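The interaction between max.poll.records and max.poll.interval.ms comes down to simple arithmetic. The helpers below are hypothetical, and the 50% safety factor is an arbitrary assumption chosen to leave headroom for slow records.

```python
def batch_exceeds_interval(records_per_poll, per_record_ms, max_poll_interval_ms):
    """True if processing one poll batch takes longer than the allowed poll gap."""
    return records_per_poll * per_record_ms > max_poll_interval_ms


def max_safe_poll_records(max_poll_interval_ms, per_record_ms, safety_factor=0.5):
    """Largest batch that fits in the interval while leaving headroom (assumed 50%)."""
    return int(max_poll_interval_ms * safety_factor // per_record_ms)


# 500 records at 700 ms each take 350 s, longer than a 300 s interval,
# so the consumer would be removed from the group mid-batch.
too_slow = batch_exceeds_interval(500, 700, 300_000)
safe_batch = max_safe_poll_records(300_000, 20)
print(too_slow, safe_batch)
```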
Key Concepts Table
| Concept | Description | Configuration | Impact |
|---|---|---|---|
| Record Accumulator | In-memory buffer for batching records | buffer.memory, batch.size | Throughput vs. Memory |
| Sender Thread | Background thread sending batches | linger.ms, request.timeout.ms | Latency vs. Throughput |
| Partitioner | Determines target partition for record | partitioner.class | Ordering, Load Distribution |
| Idempotence | Prevents duplicate records on retry | enable.idempotence | Exactly-once per partition |
| Transactional | Atomic multi-partition writes | transactional.id | Exactly-once across partitions |
| Group Coordinator | Broker managing consumer groups | N/A | Rebalance coordination |
| Heartbeat | Consumer liveness signal | heartbeat.interval.ms | Failure detection speed |
| Session Timeout | Consumer failure threshold | session.timeout.ms | False positive vs. Detection |
| Rebalance Listener | Callback for partition changes | N/A (ConsumerRebalanceListener passed to subscribe()) | Rebalance handling |
| Isolation Level | Transactional read semantics | isolation.level | Consistent reads |
| Max Poll Records | Records per poll() batch | max.poll.records | Throughput vs. latency |
| Fetch Min Bytes | Minimum fetch batch size | fetch.min.bytes | Fetch efficiency |
Code Examples
Advanced Producer with Custom Partitioner
```java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;

public class CustomPartitionerExample {
    private static final String[] REGIONS = {"us", "eu", "other"};

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // Use custom partitioner
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
                RegionAwarePartitioner.class.getName());

        // try-with-resources flushes pending records and closes the producer
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                String region = REGIONS[i % 3];
                // The key carries the region prefix that the partitioner inspects
                String key = region + ":user-" + (i % 10);
                ProducerRecord<String, String> record = new ProducerRecord<>(
                        "user-events", key,
                        "{\"userId\": " + i + ", \"action\": \"click\"}");
                // Headers are metadata for consumers; Partitioner.partition()
                // does not receive them, so routing depends on the key only
                record.headers().add("region", region.getBytes(StandardCharsets.UTF_8));
                record.headers().add("priority",
                        (i % 5 == 0 ? "high" : "normal").getBytes(StandardCharsets.UTF_8));
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("Error: " + exception.getMessage());
                    } else {
                        System.out.printf("Partition: %d, Offset: %d%n",
                                metadata.partition(), metadata.offset());
                    }
                });
            }
            producer.flush();
        }
    }

    // Custom partitioner implementation. It must be public with a public
    // no-argument constructor because Kafka instantiates it by reflection.
    public static class RegionAwarePartitioner implements Partitioner {
        @Override
        public void configure(Map<String, ?> configs) {}

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            int numPartitions = cluster.partitionsForTopic(topic).size();
            // No key: pick a random partition
            if (keyBytes == null) {
                return ThreadLocalRandom.current().nextInt(numPartitions);
            }
            // Region-aware: offset by region, then by key hash.
            // toPositive() guards against negative hash values.
            String region = extractRegionFromKey(key.toString());
            int regionHash = Utils.toPositive(region.hashCode()) % numPartitions;
            int keyHash = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            // Combine for final partition
            return (regionHash + keyHash) % numPartitions;
        }

        private String extractRegionFromKey(String key) {
            // Keys are expected to look like "<region>:<id>"
            return key.startsWith("us:") ? "us" : key.startsWith("eu:") ? "eu" : "other";
        }

        @Override
        public void close() {}
    }
}
```
Consumer with Manual Offset Management and Retry
```java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;

public class ManualOffsetConsumerExample {
    private static final int MAX_RETRIES = 3;
    // Offsets to rewind to; only the polling thread touches this map
    private static final Map<TopicPartition, Long> failedOffsets = new HashMap<>();

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "retry-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("order-events"), new RebalanceHandler());
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
                // Handle each partition separately so that a failure in one
                // partition does not skip records fetched from the others
                for (TopicPartition tp : records.partitions()) {
                    for (ConsumerRecord<String, String> record : records.records(tp)) {
                        try {
                            processRecordWithRetry(record);
                            // Commit offset + 1: the next offset to read
                            offsetsToCommit.put(tp,
                                    new OffsetAndMetadata(record.offset() + 1, "processed"));
                        } catch (Exception e) {
                            System.err.printf("Failed to process record at offset %d: %s%n",
                                    record.offset(), e.getMessage());
                            // Remember where to rewind; do not commit this offset
                            failedOffsets.put(tp, record.offset());
                            break; // skip the rest of this partition only
                        }
                    }
                }
                // Commit only successful offsets
                if (!offsetsToCommit.isEmpty()) {
                    consumer.commitSync(offsetsToCommit, Duration.ofSeconds(30));
                }
                // Rewind failed partitions so the next poll() redelivers them
                retryFailedRecords(consumer);
            }
        } finally {
            consumer.close();
        }
    }

    private static void processRecordWithRetry(ConsumerRecord<String, String> record)
            throws Exception {
        int retries = 0;
        while (retries < MAX_RETRIES) {
            try {
                // Simulated failure. A record that always fails is redelivered
                // forever; production code should route it to a DLQ instead.
                if (record.offset() % 10 == 0) {
                    throw new RuntimeException("Simulated processing error");
                }
                System.out.printf("Processed: %s%n", record.value());
                return;
            } catch (Exception e) {
                retries++;
                if (retries >= MAX_RETRIES) {
                    throw e;
                }
                Thread.sleep(1000L * retries); // Linear backoff: 1s, 2s, ...
            }
        }
    }

    private static void retryFailedRecords(KafkaConsumer<String, String> consumer) {
        if (failedOffsets.isEmpty()) return;
        System.out.printf("Rewinding %d partition(s) with failed records%n", failedOffsets.size());
        // Seek to the failed offset so it is fetched again
        failedOffsets.forEach(consumer::seek);
        failedOffsets.clear();
    }

    static class RebalanceHandler implements ConsumerRebalanceListener {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            System.out.println("Partitions revoked: " + partitions);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            System.out.println("Partitions assigned: " + partitions);
        }
    }
}
```
Transactional Producer-Consumer Example
```java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;

public class ExactlyOnceProducerConsumerExample {
    public static void main(String[] args) {
        // Transactional producer that reads from one topic and writes to another
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "eos-transformer-1");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "eos-transformer-group");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        producer.initTransactions();
        consumer.subscribe(Collections.singletonList("input-events"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                if (records.isEmpty()) continue;
                producer.beginTransaction();
                try {
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (TopicPartition tp : records.partitions()) {
                        List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
                        for (ConsumerRecord<String, String> record : partitionRecords) {
                            // Transform the record and write it to the output topic
                            ProducerRecord<String, String> outputRecord = new ProducerRecord<>(
                                    "output-events", record.key(), transform(record.value()));
                            // Add transformation metadata
                            outputRecord.headers().add("source-topic",
                                    record.topic().getBytes(StandardCharsets.UTF_8));
                            outputRecord.headers().add("source-offset",
                                    String.valueOf(record.offset()).getBytes(StandardCharsets.UTF_8));
                            outputRecord.headers().add("transformed-at",
                                    String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
                            producer.send(outputRecord);
                        }
                        // Next offset to read for this partition
                        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                        offsets.put(tp, new OffsetAndMetadata(lastOffset + 1));
                    }
                    // Commit the consumed offsets as part of the transaction.
                    // Do NOT also call consumer.commitSync(): a commit outside
                    // the transaction would break exactly-once.
                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                    producer.commitTransaction();
                    System.out.printf("Committed transaction for %d records%n", records.count());
                } catch (ProducerFencedException | OutOfOrderSequenceException
                         | AuthorizationException e) {
                    // Fatal: the producer cannot continue and must be closed
                    throw e;
                } catch (KafkaException e) {
                    producer.abortTransaction();
                    System.err.println("Transaction aborted: " + e.getMessage());
                    // Rewind so the aborted batch is consumed again
                    for (TopicPartition tp : records.partitions()) {
                        consumer.seek(tp, records.records(tp).get(0).offset());
                    }
                }
            }
        } finally {
            producer.close();
            consumer.close();
        }
    }

    private static String transform(String value) {
        // Example transformation: add processed timestamp
        return value.replace("}", ", \"processedAt\": " + System.currentTimeMillis() + "}");
    }
}
```
Python Producer with Idempotence and Transactions
```python
from kafka import KafkaProducer
from kafka.errors import KafkaError

import json

# Producer with idempotence and transaction support.
# Note: enable_idempotence, transactional_id and the *_transaction methods
# need a recent kafka-python release with transactional producer support;
# older releases such as 2.0.x reject these arguments.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=lambda k: k.encode('utf-8') if k else None,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',
    retries=5,
    retry_backoff_ms=100,
    batch_size=65536,
    linger_ms=10,
    compression_type='lz4',  # requires the lz4 package
    enable_idempotence=True,
    transactional_id='my-transactional-producer-1',
)

# Initialize transactions (registers the transactional.id with the broker)
producer.init_transactions()

# Transactional send
try:
    producer.begin_transaction()
    for i in range(100):
        producer.send(
            topic='order-events',
            key=f'order-{i}',
            value={'orderId': i, 'amount': 100.0, 'status': 'pending'},
        )
    # Commit transaction atomically
    producer.commit_transaction()
    print("Transaction committed successfully")
except KafkaError as e:
    # Abort on any failure
    producer.abort_transaction()
    print(f"Transaction aborted: {e}")


# Callback-based send with error handling
def on_send_success(record_metadata):
    print(f"Sent to {record_metadata.topic} partition {record_metadata.partition} "
          f"offset {record_metadata.offset}")


def on_send_error(ex):
    print(f"Error: {ex}")


# A transactional producer may only send inside a transaction, so the
# callback example is wrapped in its own transaction.
producer.begin_transaction()
producer.send('user-events', key='user-123', value={'action': 'login'}) \
    .add_callback(on_send_success) \
    .add_errback(on_send_error)
producer.commit_transaction()

producer.flush()
producer.close()
```
Python Consumer with Manual Offset Management
```python
from kafka import KafkaConsumer
from kafka.structs import OffsetAndMetadata

consumer = KafkaConsumer(
    'order-events',
    bootstrap_servers=['localhost:9092'],
    group_id='order-processor',
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    isolation_level='read_committed',  # needs a recent kafka-python release
    max_poll_records=500,
    max_poll_interval_ms=300000,
    session_timeout_ms=45000,
)


def process_record(record):
    """Process a single record. Raise exception to trigger retry."""
    print(f"Processing: topic={record.topic}, partition={record.partition}, "
          f"offset={record.offset}, key={record.key}")
    # Business logic here
    return True


try:
    while True:
        records = consumer.poll(timeout_ms=100)
        offsets_to_commit = {}
        for tp, records_list in records.items():
            for record in records_list:
                try:
                    process_record(record)
                    # Track successful offset (offset + 1 = next offset to read).
                    # The third field (leader_epoch) exists in kafka-python 2.1+;
                    # older releases take only (offset, metadata).
                    offsets_to_commit[tp] = OffsetAndMetadata(
                        record.offset + 1, 'processed', -1
                    )
                except Exception as e:
                    print(f"Failed to process offset {record.offset}: {e}")
                    # Rewind so the failed record is fetched again on the next
                    # poll; without seek() the consumer would skip past it.
                    consumer.seek(tp, record.offset)
                    break
        # Commit only successful offsets (kafka-python uses commit(), which is
        # synchronous; there is no commitSync() method)
        if offsets_to_commit:
            consumer.commit(offsets=offsets_to_commit)
            print(f"Committed offsets: {offsets_to_commit}")
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
```
Bash Consumer Monitoring Script
```bash
#!/bin/bash
# Monitor consumer group lag and health
BOOTSTRAP_SERVER="localhost:9092"
GROUP_ID="order-processing-service"

echo "=== Consumer Group Monitor ==="
echo "Timestamp: $(date)"
echo ""

# List all consumer groups
echo "--- Active Consumer Groups ---"
kafka-consumer-groups.sh --bootstrap-server "$BOOTSTRAP_SERVER" --list
echo ""

echo "--- Group Details: $GROUP_ID ---"
# Describe shows partition assignments, current offset, log-end offset, lag
kafka-consumer-groups.sh --bootstrap-server "$BOOTSTRAP_SERVER" \
  --group "$GROUP_ID" \
  --describe
echo ""

echo "--- Group State ---"
# Shows the coordinator, assignment strategy, group state
# (e.g. Stable, PreparingRebalance, Dead) and member count
kafka-consumer-groups.sh --bootstrap-server "$BOOTSTRAP_SERVER" \
  --group "$GROUP_ID" \
  --describe --state
echo ""

echo "--- Reset Offsets (Dry Run) ---"
# Reset offsets to earliest for specific partition (dry run first)
kafka-consumer-groups.sh --bootstrap-server "$BOOTSTRAP_SERVER" \
  --group "$GROUP_ID" \
  --topic order-events:0 \
  --reset-offsets \
  --to-earliest \
  --dry-run
echo ""

echo "--- Reset Offsets (Execute) ---"
# Actually reset offsets (consumer group must be inactive)
# kafka-consumer-groups.sh --bootstrap-server "$BOOTSTRAP_SERVER" \
#   --group "$GROUP_ID" \
#   --topic order-events:0 \
#   --reset-offsets \
#   --to-earliest \
#   --execute
```
Performance Metrics
| Scenario | Throughput (msg/sec) | Latency (p99) | Memory Usage | CPU Usage |
|---|---|---|---|---|
| Async Batch (16KB) | 500,000 | 5ms | 64MB | 30% |
| Sync (acks=all) | 50,000 | 50ms | 32MB | 15% |
| Idempotent | 450,000 | 8ms | 64MB | 35% |
| Transactional | 300,000 | 15ms | 128MB | 40% |
| Compression (gzip) | 200,000 | 10ms | 16MB | 50% |
| Compression (lz4) | 400,000 | 7ms | 32MB | 25% |
| Consumer (single) | 100,000 | 2ms | 32MB | 20% |
| Consumer (group=4) | 400,000 | 5ms | 128MB | 80% |
| Consumer (group=8) | 700,000 | 8ms | 256MB | 150% |
| Configuration | Default | Impact |
|---|---|---|
| batch.size | 16384 (16KB) | Throughput: larger = higher throughput |
| linger.ms | 0 (5 since Kafka 4.0) | Latency: 0 = lowest latency |
| buffer.memory | 33554432 (32MB) | Backpressure when full |
| max.in.flight.requests.per.connection | 5 | Pipelining: higher = better throughput |
| compression.type | none | CPU vs. network tradeoff |
| max.poll.records | 500 | Processing loop granularity |
| fetch.min.bytes | 1 | Fetch batching efficiency |
| fetch.max.wait.ms | 500 | Latency for small fetches |
Best Practices
- Batch Size Optimization: Increase batch.size to 64KB-128KB for higher throughput, and use linger.ms=10-50 to allow batches to fill. This trades slight latency for significantly higher throughput. Monitor the batch-size-avg metric to tune.
- Idempotent Producers: Always enable enable.idempotence=true in production. This provides exactly-once semantics per partition with negligible performance overhead. It requires acks=all and max.in.flight.requests.per.connection of at most 5.
- Transaction Management: Use transactional producers when operations span multiple partitions/topics. Keep transaction scope small and avoid long-running transactions, which delay read_committed consumers. Set transaction.timeout.ms appropriately.
- Consumer Group Sizing: Match consumer count to partition count. Consumers beyond the partition count sit idle and waste resources. Use CooperativeStickyAssignor for minimal disruption during rebalances.
- Offset Commit Strategy: Disable auto-commit and commit manually after processing. Use commitSync() for critical offsets and commitAsync() for bulk commits. Never commit offsets for unprocessed records.
- Error Handling: Implement retry logic with exponential backoff for transient failures. Use a dead-letter queue (DLQ) for records that fail after max retries. Monitor consumer lag to detect processing bottlenecks.
- Memory Management: Configure max.partition.fetch.bytes to control memory usage per partition. Set fetch.min.bytes and fetch.max.wait.ms to batch fetch requests efficiently.
- Monitoring: Track producer metrics: record-send-rate, batch-size-avg, compression-rate-avg, request-latency-avg. Track consumer metrics: records-lag-max, records-consumed-rate, poll-rate.
- Security: Use SASL/SCRAM for authentication and SSL for encryption. Implement ACLs to restrict topic access. Use separate producers for different security zones.
- Testing: Use Testcontainers or embedded Kafka for integration tests. Test with realistic volumes and verify exactly-once semantics with idempotent consumer writes. Use chaos testing to verify retry behavior.
Key Takeaways:
- Producer batching (batch.size + linger.ms) trades latency for throughput; typical optimal batch size is 64KB-128KB
- Consumer lag = log_end_offset - committed_offset; sustained growth indicates consumer cannot keep up
- Idempotent producers (enable.idempotence=true) prevent duplicates per partition without transactions
- Partition assignment via CooperativeStickyAssignor minimizes rebalance disruption
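- Manual offset commits after processing prevent data loss at the cost of possible duplicates; auto-commit can commit offsets for records that have not finished processing, risking data loss on failure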
- Consumer throughput is bounded by min(poll_rate x records_per_poll, fetch_rate)
See also: Kafka Architecture (kafka/01) | Kafka Streams & Connect (kafka/03) | Exactly-Once Semantics (kafka/04) | Schema Registry (kafka/05)
See Also
- Kafka Architecture: Core architecture and topic design
- Kafka Streams and Connect: Stream processing and connector patterns
- Exactly-Once Semantics: Exactly-once delivery guarantees
- Streaming Pipeline: End-to-end streaming pipeline design