Apache Kafka Exactly-Once Semantics: Transactions, Idempotency, and Consistency
Architecture Diagram: Exactly-Once Semantics Components
Architecture Diagram: Transaction Lifecycle State Machine
Architecture Diagram: Idempotent Producer Deduplication
Architecture Diagram: Two-Phase Commit Protocol
Architecture Diagram: Consumer Isolation Levels
Formal Definitions
Exactly-Once Semantics (EOS)
Exactly-once semantics guarantees that each message is delivered to the consumer exactly once: no duplicates, no data loss. In Kafka, EOS is achieved through three coordinated mechanisms: (1) idempotent producers that prevent duplicate writes per partition, (2) transactional producers that enable atomic multi-partition writes, and (3) consumer isolation levels that hide records from open or aborted transactions. End-to-end EOS requires all three components working together.
Idempotent Producer
An idempotent producer ensures that a message is written to a partition exactly once, even if the producer retries. Each producer instance receives a Producer ID (PID) and an epoch. Each partition tracks the last sequence number per PID. A retry of a message that was already written is recognized as a duplicate: the broker acknowledges it again but does not append it a second time. Formally: for producer P with PID=p, the broker appends message (p, seq) only if seq = lastAcceptedSeq(p) + 1.
Transactional Producer
A transactional producer enables atomic writes across multiple partitions and topics. A transaction groups multiple produce requests into a single atomic unit: either all messages in the transaction are committed (visible to read_committed consumers) or none are (aborted and invisible). The Transaction Coordinator manages the two-phase commit protocol using the __transaction_state topic.
Producer Fencing
Producer fencing is the mechanism that prevents a stale producer instance (for example, one that was restarted, or was cut off by a network partition and replaced) from writing. When a new instance with the same transactional.id calls initTransactions(), the coordinator increments the epoch for that PID. Brokers reject any write with an epoch older than the current known epoch for that PID, effectively "fencing" the old producer instance.
Last Stable Offset (LSO)
The Last Stable Offset (LSO) is the offset of the first message in a partition that belongs to a still-open (neither committed nor aborted) transaction, or the high watermark if no transaction is open. Consumers with isolation.level=read_committed only read messages below the LSO, and additionally skip messages from aborted transactions. The LSO advances when the oldest open transaction is committed or aborted.
Key Formulas
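Exactly-Once Delivery Condition
$$\forall m:\quad \sum_{i} \mathbb{1}_{\text{delivered}}(m, i) = 1$$
Here,
- $m$ = Message
- $\mathbb{1}_{\text{delivered}}(m, i)$ = Indicator: 1 if delivery attempt $i$ of $m$ reached the consumer, 0 otherwise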
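The exactly-once condition (every message delivered once, no more and no less) can be checked after the fact by counting deliveries per message. The sketch below assumes a hypothetical delivery log that records the ID of every message handed to the consumer; the IDs and the function name are illustrative, not part of any Kafka API.

```python
from collections import Counter


def check_exactly_once(produced_ids, delivered_ids):
    """Check the exactly-once condition against a delivery log.

    produced_ids:  IDs of the messages that were produced (hypothetical IDs).
    delivered_ids: IDs in the order the consumer saw them, repeats included.
    Returns (missing, duplicated): messages delivered 0 times and more than once.
    """
    counts = Counter(delivered_ids)  # sum of the delivery indicator per message
    missing = sorted(m for m in produced_ids if counts[m] == 0)
    duplicated = sorted(m for m in produced_ids if counts[m] > 1)
    return missing, duplicated


# An at-least-once run in which "b" was redelivered after a retry.
print(check_exactly_once(["a", "b", "c"], ["a", "b", "b", "c"]))  # ([], ['b'])
```

An empty pair of lists means the run satisfied the condition; a non-empty `duplicated` list is the signature of at-least-once delivery.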
Idempotent Deduplication Condition
$$\text{accept}(p, s) \iff s = s_{\text{last}}(p) + 1$$
Here,
- $p$ = Producer ID
- $s$ = Sequence number of the incoming message
- $s_{\text{last}}(p)$ = Last accepted sequence number for this PID on this partition
Deduplication Window
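$$W(p) = \{\, s_{\text{last}}(p) - k + 1,\ \ldots,\ s_{\text{last}}(p) \,\}, \qquad k \le 5$$
Here,
- $k$ = max.in.flight.requests.per.connection (default 5, and at most 5 with idempotence)
- $s_{\text{last}}(p)$ = Last committed sequence number for PID $p$ on this partition
A retry whose sequence number falls inside $W(p)$ is recognized as a duplicate and acknowledged without being appended again.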
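A worked instance of the window, assuming for illustration a window size $k = 5$ and a last appended sequence number $s_{\text{last}}(p) = 41$ for producer $p$, where $W(p)$ is the set of the last $k$ appended sequence numbers and $s$ is an incoming sequence number:

$$
\begin{aligned}
k = 5,\ s_{\text{last}}(p) = 41 &\;\Rightarrow\; W(p) = \{37, 38, 39, 40, 41\} \\
s = 39 \in W(p) &\;\Rightarrow\; \text{duplicate: acknowledged, not appended again} \\
s = 42 = s_{\text{last}}(p) + 1 &\;\Rightarrow\; \text{appended} \\
s = 44 > s_{\text{last}}(p) + 1 &\;\Rightarrow\; \text{rejected as out of sequence}
\end{aligned}
$$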
Transaction Timeout Risk
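$$T_{\text{txn}} > T_{\text{timeout}} \;\Rightarrow\; \text{the coordinator aborts the transaction}$$
Here,
- $T_{\text{timeout}}$ = transaction.timeout.ms (default 60s)
- $T_{\text{txn}}$ = Time from beginTransaction() to commitTransaction() (the coordinator's timer actually starts when the first partition is added to the transaction)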
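For a transactional producer, the Producer ID (PID) is assigned by the Transaction Coordinator and mapped to the producer's transactional.id in __transaction_state. When a producer with the same transactional.id restarts and calls initTransactions(), it receives the same PID with an incremented epoch, which fences (invalidates) the old producer instance. This prevents split-brain scenarios where two instances with the same transactional.id write to the same partitions. An idempotent producer without a transactional.id simply receives a fresh PID on restart, so its deduplication guarantee does not span restarts.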
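Transaction timeout (transaction.timeout.ms, default 60s) determines how long a transaction can remain open. If the producer crashes during a transaction, the coordinator aborts it after the timeout; until then, the open transaction holds back the LSO of every partition it wrote to, so read_committed consumers on those partitions cannot make progress. The value may not exceed the broker's transaction.max.timeout.ms (default 15 minutes). Keep transactions short to minimize resource holding and reduce timeout risk.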
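The timeout rules above can be summarized as a small decision function. This is a toy model, not coordinator code: `transaction_outcome` is a hypothetical name, the defaults mirror transaction.timeout.ms (60 s) and the broker's transaction.max.timeout.ms (15 min), and real timing also depends on when the coordinator's timer starts.

```python
TXN_TIMEOUT_MS_DEFAULT = 60_000               # producer: transaction.timeout.ms
BROKER_MAX_TXN_TIMEOUT_MS_DEFAULT = 900_000   # broker: transaction.max.timeout.ms


def transaction_outcome(duration_ms,
                        timeout_ms=TXN_TIMEOUT_MS_DEFAULT,
                        broker_max_ms=BROKER_MAX_TXN_TIMEOUT_MS_DEFAULT):
    """Toy model of what happens to a transaction that stays open duration_ms."""
    if timeout_ms > broker_max_ms:
        # The coordinator refuses the producer's timeout when it registers.
        return "rejected: timeout exceeds transaction.max.timeout.ms"
    if duration_ms > timeout_ms:
        # The coordinator aborts on its own; a later commit attempt fails.
        return "aborted by coordinator"
    return "commit allowed"


for duration in (5_000, 75_000):
    print(duration, transaction_outcome(duration))
```

Raising transaction.timeout.ms makes a slow transaction commit, but it also lets a crashed producer block read_committed consumers for longer, which is why short transactions are the safer fix.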
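With idempotence enabled (enable.idempotence=true, the default since Kafka 3.0), the producer requires acks=all, retries > 0, and max.in.flight.requests.per.connection ≤ 5. The defaults (acks=all, retries=Integer.MAX_VALUE, max.in.flight.requests.per.connection=5) satisfy these constraints, and setting a conflicting value together with an explicit enable.idempotence=true causes a ConfigException. This ensures durability and prevents duplicate messages from retries under transient network failures.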
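The constraints above can be expressed as a validation function. This is a simplified model of the checks, not the Java client's code; `idempotence_config_errors` and its error strings are hypothetical, and the sketch only checks configurations that set enable.idempotence=true explicitly.

```python
def idempotence_config_errors(cfg):
    """Simplified model of the producer constraints that apply when
    enable.idempotence is explicitly set to true (not the client's code)."""
    if cfg.get("enable.idempotence") is not True:
        return []  # this sketch only checks an explicit enable.idempotence=true
    errors = []
    if str(cfg.get("acks", "all")) not in ("all", "-1"):   # "-1" is the same as "all"
        errors.append("acks must be all")
    if int(cfg.get("retries", 2**31 - 1)) <= 0:            # default is Integer.MAX_VALUE
        errors.append("retries must be > 0")
    if int(cfg.get("max.in.flight.requests.per.connection", 5)) > 5:
        errors.append("max.in.flight.requests.per.connection must be <= 5")
    return errors


print(idempotence_config_errors({"enable.idempotence": True, "acks": "1"}))
```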
Ordering Guarantee with Idempotence
With idempotence enabled, Kafka guarantees that each message from a producer session is written exactly once and in order within a partition, even with retries and max.in.flight.requests.per.connection > 1 (up to 5). The broker does not buffer or reorder out-of-order batches: it appends a batch only if its sequence number is the next expected one and rejects any batch that arrives after a gap, and the producer retries the rejected batches in their original order. Formally: for a producer with PID=p sending messages s_1, s_2, ..., s_k to partition P, a consumer on P will observe them in order s_1, s_2, ..., s_k, each exactly once.
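The ordering argument can be simulated with a toy broker and a toy retry loop. Assumptions, labeled loudly: one record per batch (so a batch's sequence number is its record's sequence number), `broker_append` and `send_with_retries` are hypothetical names, and the real client's retry logic is more involved. The point is that rejecting gaps and retrying in order is enough to keep the log ordered without any broker-side buffering.

```python
def broker_append(log, state, seq, fail_seqs):
    """Toy partition leader: returns 'ok', 'duplicate', 'out_of_order' or 'error'."""
    if seq in fail_seqs:
        fail_seqs.discard(seq)     # simulated transient failure, happens once
        return "error"
    expected = state["last_seq"] + 1
    if seq == expected:
        log.append(seq)
        state["last_seq"] = seq
        return "ok"
    if seq < expected:
        return "duplicate"         # already in the log: acknowledge, do not append
    return "out_of_order"          # gap: an earlier batch has not been appended yet


def send_with_retries(seqs, fail_seqs):
    """Pipeline all pending batches (max.in.flight > 1), then retry the
    rejected ones in their original order until everything is appended."""
    log, state = [], {"last_seq": -1}
    pending = list(seqs)
    while pending:
        results = [(s, broker_append(log, state, s, fail_seqs)) for s in pending]
        pending = [s for s, r in results if r in ("error", "out_of_order")]
    return log


# Batch 1 fails once; batches 2-4 are rejected as out of order and retried.
print(send_with_retries(range(5), {1}))  # [0, 1, 2, 3, 4]
```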
Exactly-Once Semantics Proof Sketch
Claim: Kafka achieves end-to-end exactly-once semantics when: (1) producer uses idempotence + transactions, with consumed offsets committed inside the same transaction (sendOffsetsToTransaction()), (2) consumer uses isolation.level=read_committed, (3) sink operations are idempotent.
Proof sketch:
- Source deduplication: Idempotent producer ensures each message is written exactly once per partition (by sequence number tracking).
- Atomic multi-partition write: Transaction coordinator ensures all messages in a transaction, together with the consumer offsets committed in it, are either all committed or all aborted (two-phase commit via __transaction_state).
- Consumer filtering: read_committed consumers only see messages from committed transactions: fetches stop at the LSO (Last Stable Offset), and records from aborted transactions are dropped.
- Sink idempotency: If the sink operation (database write, HTTP call) is idempotent, reprocessing the same committed message produces the same result.
- Combined: The composition of these four properties yields exactly-once end-to-end delivery.
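The composition step relies on the output records and the consumer offset being committed together. The toy model below illustrates that under stated assumptions: `ToyTransactionalStore` and `run` are hypothetical names, one input record is processed per transaction, "processing" is just upper-casing, and a crash is modeled as an abort (in Kafka the coordinator would abort the open transaction after its timeout). After a crash and restart, the committed output contains each result exactly once.

```python
class ToyTransactionalStore:
    """Hypothetical model: output records and the consumer offset are staged
    together and become visible only on commit."""

    def __init__(self):
        self.committed_output = []   # what a read_committed reader would see
        self.committed_offset = 0    # next input offset to process
        self._staged_output, self._staged_offset = [], None

    def begin(self):
        self._staged_output, self._staged_offset = [], None

    def send(self, record):
        self._staged_output.append(record)

    def send_offsets(self, next_offset):
        # Plays the role of sendOffsetsToTransaction(): the offset is part of the txn.
        self._staged_offset = next_offset

    def commit(self):
        self.committed_output.extend(self._staged_output)
        if self._staged_offset is not None:
            self.committed_offset = self._staged_offset
        self.begin()

    def abort(self):
        self.begin()                 # discard staged output and offset together


def run(store, inputs, crash_at=None):
    """Read-process-write loop, resuming from the committed offset."""
    offset = store.committed_offset
    while offset < len(inputs):
        store.begin()
        store.send(inputs[offset].upper())
        store.send_offsets(offset + 1)
        if offset == crash_at:
            store.abort()            # crash before commit
            return
        store.commit()
        offset += 1


store = ToyTransactionalStore()
run(store, ["a", "b", "c"], crash_at=1)   # crashes while processing "b"
run(store, ["a", "b", "c"])               # restart resumes at offset 1
print(store.committed_output, store.committed_offset)
```

Because "b" was reprocessed only after its first output was aborted, the committed output is `['A', 'B', 'C']` with no duplicate.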
Detailed Explanation
Exactly-once semantics (EOS) in Kafka ensures that each message is delivered exactly once to the consumer, with no duplicates and no data loss. This is achieved through three coordinated mechanisms: idempotent producers, transactional producers, and consumer isolation levels.
Idempotent Producers
Idempotent Producers prevent duplicate messages from being written during retries. When enabled (enable.idempotence=true), each producer is assigned a Producer ID (PID) and an epoch number, and numbers the messages it sends to each partition with consecutive sequence numbers. The leader of each partition tracks the last sequence number it has appended for each PID. On every produce request the broker compares the incoming sequence number with that value: the next expected number is appended; a number it has already appended (a retry whose acknowledgement was lost) is acknowledged again without being written twice; a number that skips ahead is rejected with an out-of-order sequence error, and the producer retries it after resending the earlier batches. The broker keeps metadata for the last 5 batches of each producer, which is why max.in.flight.requests.per.connection (default 5) must not exceed 5 with idempotence.
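The per-producer state a partition leader keeps can be sketched as follows. This is a toy model, not broker code: `PartitionProducerState` is a hypothetical name, each batch is assumed to hold one record, and a sequence number older than the cached batches is simply reported as out of order here (a simplification). It shows how a retried batch whose acknowledgement was lost is answered with its original offset instead of being appended again.

```python
from collections import deque


class PartitionProducerState:
    """Toy per-(PID, partition) deduplication state on the partition leader."""
    CACHE_SIZE = 5  # Kafka retains metadata for the last 5 batches per producer

    def __init__(self):
        self.last_seq = -1
        self.recent = deque(maxlen=self.CACHE_SIZE)  # (seq, offset) of recent batches

    def append(self, log, seq, value):
        if seq == self.last_seq + 1:
            log.append(value)
            offset = len(log) - 1
            self.last_seq = seq
            self.recent.append((seq, offset))
            return ("appended", offset)
        for cached_seq, cached_offset in self.recent:
            if cached_seq == seq:
                # Retry of a batch already in the log: ack with the original offset.
                return ("duplicate", cached_offset)
        return ("out_of_order", None)  # gap, or older than the cache (simplified)


log = []
state = PartitionProducerState()
print(state.append(log, 0, "order-1"))  # ('appended', 0): first send
print(state.append(log, 0, "order-1"))  # ('duplicate', 0): retry after a lost ack
print(log)                              # ['order-1']
```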
Producer ID and Epoch Mechanism
The Producer ID and Epoch mechanism handles producer restarts. When a producer with a given transactional.id calls initTransactions(), the coordinator returns the same PID with an incremented epoch and aborts any transaction the previous instance left open. Brokers track the latest epoch per PID and reject requests that carry an older one, so the previous ("zombie") instance fails with a fencing error such as ProducerFencedException; sequence numbers restart from 0 for the new epoch. This ensures that only one instance per transactional.id can write at a time.
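Epoch-based fencing can be modeled with two small classes. Both are hypothetical toys: `ToyCoordinator` stands in for the transaction coordinator's transactional.id registry and `ToyPartitionLeader` for a broker's epoch check. In Kafka, the new epoch can also reach a partition through the abort markers the coordinator writes for the old instance's open transaction; here it arrives with the new instance's first write.

```python
class ToyCoordinator:
    """Hypothetical model of PID/epoch assignment per transactional.id."""

    def __init__(self):
        self._next_pid = 1000
        self.registry = {}  # transactional.id -> (pid, epoch)

    def init_transactions(self, transactional_id):
        if transactional_id in self.registry:
            pid, epoch = self.registry[transactional_id]
            self.registry[transactional_id] = (pid, epoch + 1)  # same PID, higher epoch
        else:
            self.registry[transactional_id] = (self._next_pid, 0)
            self._next_pid += 1
        return self.registry[transactional_id]


class ToyPartitionLeader:
    """Rejects writes whose epoch is older than the newest one seen for the PID."""

    def __init__(self):
        self.latest_epoch = {}  # pid -> highest epoch seen on this partition

    def write(self, pid, epoch):
        if epoch < self.latest_epoch.get(pid, -1):
            return "fenced"
        self.latest_epoch[pid] = epoch
        return "ok"


coordinator = ToyCoordinator()
leader = ToyPartitionLeader()
old = coordinator.init_transactions("order-producer-1")  # (1000, 0)
print(leader.write(*old))                                # ok
new = coordinator.init_transactions("order-producer-1")  # restart: (1000, 1)
print(leader.write(*new))                                # ok
print(leader.write(*old))                                # fenced: zombie instance
```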
Transactional Producers
Transactional Producers enable atomic writes across multiple partitions. A transactional producer begins a transaction, sends messages to multiple partitions, and then commits or aborts the transaction atomically. The Transaction Coordinator (a component on every broker; each transactional.id is handled by the broker that leads its partition of __transaction_state) manages the transaction state and ensures that the records written to all partitions in the transaction either all become visible to read_committed consumers (commit) or none do (abort).
Transaction Coordinator Protocol
The coordinator uses a two-phase commit protocol:
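- Phase 1 (Prepare): When the producer calls commitTransaction(), the coordinator writes a PrepareCommit record to __transaction_state. Once this record is durable the decision is final: the transaction will commit.
- Phase 2 (Complete): The coordinator writes COMMIT markers to every partition in the transaction, including the __consumer_offsets partitions that hold offsets added with sendOffsetsToTransaction(), and then writes CompleteCommit to __transaction_state.

Aborts follow the same path with PrepareAbort, ABORT markers, and CompleteAbort.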
If the coordinator fails between phases, a new coordinator will complete the transaction by reading the transaction state from __transaction_state and completing the two-phase commit. This ensures that transactions are never left in an indeterminate state.
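The prepare/complete split and coordinator recovery can be sketched as a toy state machine. `ToyTwoPhaseCoordinator` is a hypothetical name, a Python list stands in for __transaction_state, and each partition is a list that only receives markers; the state names PrepareCommit and CompleteCommit follow the protocol above. A coordinator that crashes after PrepareCommit leaves no markers, and a new coordinator finishes the job by replaying the log.

```python
class ToyTwoPhaseCoordinator:
    """Hypothetical 2PC model over a shared transaction log and partition logs."""

    def __init__(self, txn_log, partitions):
        self.txn_log = txn_log        # stand-in for __transaction_state
        self.partitions = partitions  # partition name -> list of markers

    def commit(self, txn_id, involved, crash_after_prepare=False):
        # Phase 1: the durable PrepareCommit record is the commit decision.
        self.txn_log.append((txn_id, "PrepareCommit", tuple(involved)))
        if crash_after_prepare:
            return  # coordinator dies before writing any marker
        self._complete(txn_id, involved)

    def _complete(self, txn_id, involved):
        # Phase 2: a COMMIT marker in every partition, then CompleteCommit.
        for name in involved:
            self.partitions[name].append(("COMMIT", txn_id))
        self.txn_log.append((txn_id, "CompleteCommit", tuple(involved)))

    def recover(self):
        # A new coordinator replays the log and finishes prepared transactions.
        last = {}
        for txn_id, state, involved in self.txn_log:
            last[txn_id] = (state, involved)
        for txn_id, (state, involved) in last.items():
            if state == "PrepareCommit":
                self._complete(txn_id, involved)


txn_log = []
partitions = {"order-events-0": [], "payments-2": [], "__consumer_offsets-7": []}
ToyTwoPhaseCoordinator(txn_log, partitions).commit(
    "txn-1", list(partitions), crash_after_prepare=True)
ToyTwoPhaseCoordinator(txn_log, partitions).recover()  # failover coordinator
print([state for _, state, _ in txn_log])  # ['PrepareCommit', 'CompleteCommit']
```

Running `recover()` again is harmless: the last state is now CompleteCommit, so no second set of markers is written.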
Consumer Isolation Levels
Consumer Isolation Levels determine how consumers read transactional data. The read_committed isolation level ensures that consumers only read records from committed transactions, while read_uncommitted (the default) reads all records regardless of transaction status. With read_committed, consumers automatically filter out records from open and aborted transactions, providing a consistent view of the data. The broker caps read_committed fetches at the Last Stable Offset (LSO) and returns the list of aborted transactions in the fetched range, which the consumer client uses to drop aborted records.
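The LSO and read_committed filtering can be reproduced on a toy partition log. Assumptions: every entry is transactional and is either `("data", pid, value)` or `("marker", pid, "COMMIT" | "ABORT")`, the whole log is below the high watermark, and both functions are hypothetical. In Kafka the work is split between broker (LSO, aborted-transaction list) and client (dropping aborted records); here one function does both.

```python
def last_stable_offset(log):
    """First offset of the earliest still-open transaction, else the log end."""
    open_first = {}  # pid -> first offset of its open transaction
    for offset, (kind, pid, _) in enumerate(log):
        if kind == "data" and pid not in open_first:
            open_first[pid] = offset
        elif kind == "marker":
            open_first.pop(pid, None)
    return min(open_first.values(), default=len(log))


def read_committed(log):
    """Values a read_committed consumer sees, in offset order."""
    lso = last_stable_offset(log)
    txn_of, outcome, txn_no = {}, {}, {}
    for offset, (kind, pid, payload) in enumerate(log):
        n = txn_no.get(pid, 0)           # which transaction of this producer
        if kind == "data":
            txn_of[offset] = (pid, n)
        else:
            outcome[(pid, n)] = payload  # COMMIT or ABORT
            txn_no[pid] = n + 1
    return [log[o][2] for o in range(lso)
            if log[o][0] == "data" and outcome.get(txn_of[o]) == "COMMIT"]


log = [("data", 1, "a1"), ("data", 2, "b1"), ("marker", 2, "ABORT"),
       ("data", 3, "c1"), ("marker", 1, "COMMIT"), ("data", 1, "a2")]
print(last_stable_offset(log), read_committed(log))  # 3 ['a1']
```

Producer 3's open transaction at offset 3 pins the LSO, so "a2" stays invisible even though it comes from a different producer; committing producers 3 and 1 moves the LSO to the end of the log.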
End-to-End Exactly-Once
Achieving exactly-once semantics end-to-end requires:
- Idempotent producer (prevents duplicates at source)
- Transactional producer (ensures atomicity of multi-partition writes, and of consumed offsets when they are sent with sendOffsetsToTransaction())
- Consumer with read_committed (ensures only committed records are processed)
- Idempotent sink operations (prevents duplicates at destination)
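Kafka Streams provides this out of the box with processing.guarantee=exactly_once_v2, which automatically configures these components for Kafka-to-Kafka processing; writes from a Streams application to external systems still need idempotent sinks.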
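An idempotent sink can be built even for a non-idempotent operation, such as adding to a running total, by recording which Kafka positions have already been applied. The sketch assumes a hypothetical sink class keyed by (topic, partition, offset); in a real database, the total update and the applied-position record must be written in one database transaction for this to hold.

```python
class IdempotentTotalsSink:
    """Hypothetical sink: replays of an already-applied record are ignored."""

    def __init__(self):
        self.totals = {}      # customer -> running total (stand-in for a table)
        self.applied = set()  # (topic, partition, offset) already applied

    def apply(self, topic, partition, offset, customer, amount):
        position = (topic, partition, offset)
        if position in self.applied:
            return False      # replay after a consumer restart: no effect
        # In a real database these two writes must share one transaction.
        self.totals[customer] = self.totals.get(customer, 0) + amount
        self.applied.add(position)
        return True


sink = IdempotentTotalsSink()
sink.apply("order-events", 0, 41, "customer-1", 100)
sink.apply("order-events", 0, 41, "customer-1", 100)  # same record replayed
print(sink.totals)  # {'customer-1': 100}
```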
Key Concepts Table
| Component | Description | Key Configurations | Failure Mode |
|---|---|---|---|
| Producer ID (PID) | Identifier the broker assigns to a producer | transactional.id (ties the PID to a stable ID) | Idempotent-only: new PID on restart; transactional: same PID, new epoch |
| Epoch | Fencing mechanism for producer instances | Incremented on initTransactions() | Fence stale producers |
| Sequence Number | Per-PID, per-partition message ordering | Auto-incremented | Duplicate rejection |
| Transaction Coordinator | Broker managing transaction state | Leader of the transactional.id's __transaction_state partition | Transaction recovery |
| Transaction Markers | COMMIT/ABORT control records in partition logs | Automatic | Hide aborted records |
| LSO (Last Stable Offset) | First offset of the earliest open transaction (high watermark if none) | Automatic | Consumer filtering |
| Deduplication Table | Per-partition PID/epoch/seq tracking (last 5 batches per producer) | max.in.flight.requests.per.connection ≤ 5 | Duplicate rejection |
| Fencing | Preventing stale producer writes | Automatic | Split-brain prevention |
| Idempotent Producer | Prevents duplicate writes | enable.idempotence | Retry safety |
| Transactional Producer | Atomic multi-partition writes | transactional.id | Atomicity guarantee |
| __transaction_state | Transaction coordinator log | Automatic | Transaction recovery |
Code Examples
Idempotent Producer with Detailed Configuration
```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.UUID;

public class IdempotentProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "kafka-broker-0:9092,kafka-broker-1:9092,kafka-broker-2:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Idempotence (default since Kafka 3.0; set explicitly for clarity)
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // Settings idempotence requires; conflicting values raise a ConfigException
        props.put(ProducerConfig.ACKS_CONFIG, "all");                        // all in-sync replicas acknowledge
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);         // safe: the broker deduplicates
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);  // at most 5; order still preserved

        // Must stay the same across restarts of this logical producer (so a restarted
        // instance fences its predecessor) and be unique among concurrent instances
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-producer-1");

        // Performance tuning
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        // 16 KB batches
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);            // wait up to 10 ms to fill a batch
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");  // less network overhead
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);  // 32 MB for pending records

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Required when transactional.id is set: obtains the PID, bumps the epoch
        // (fencing older instances) and aborts their unfinished transaction
        producer.initTransactions();
        try {
            producer.beginTransaction();
            for (int i = 0; i < 1000; i++) {
                String key = "order-" + (i % 100);
                String value = "{\"orderId\": \"" + i + "\", \"amount\": " + (Math.random() * 1000) + "}";
                ProducerRecord<String, String> record = new ProducerRecord<>("order-events", key, value);
                record.headers().add("correlation-id",
                        UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
                // Callback for monitoring only: if any send fails,
                // commitTransaction() throws and the transaction is aborted below
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("Failed: " + exception.getMessage());
                    } else {
                        System.out.printf("Sent to partition %d, offset %d, timestamp %d%n",
                                metadata.partition(), metadata.offset(), metadata.timestamp());
                    }
                });
            }
            // Flushes pending sends; the coordinator then writes COMMIT markers
            // to every partition touched by the transaction
            producer.commitTransaction();
            System.out.println("Transaction committed successfully");
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal: this producer cannot continue (e.g. a newer instance fenced it)
            System.err.println("Fatal producer error: " + e.getMessage());
        } catch (KafkaException e) {
            // Abortable: the transaction's records stay invisible to read_committed consumers
            producer.abortTransaction();
            System.err.println("Transaction aborted: " + e.getMessage());
        } finally {
            producer.close();
        }
    }
}
```
Consumer with Transactional Read Isolation
```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;

public class TransactionalConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "kafka-broker-0:9092,kafka-broker-1:9092,kafka-broker-2:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Critical: only records from committed transactions are returned.
        // Fetches stop at the LSO and records from aborted transactions are dropped.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        // Disable auto-commit for manual offset management
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Fetch configuration
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("order-events"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("Partitions revoked: " + partitions);
                // Invoked inside poll(), after the previous batch was processed:
                // commit its positions before the partitions move elsewhere
                consumer.commitSync(Duration.ofSeconds(30));
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println("Partitions assigned: " + partitions);
            }
        });

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                if (records.isEmpty()) continue;

                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
                for (ConsumerRecord<String, String> record : records) {
                    processRecord(record);
                    // Commit the offset of the next record to read
                    offsetsToCommit.put(
                            new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1, "processed"));
                }
                // Process-then-commit is at-least-once: a crash before this call
                // replays the batch, so processRecord() must be idempotent
                consumer.commitSync(offsetsToCommit, Duration.ofSeconds(30));
            }
        } finally {
            consumer.close();
        }
    }

    private static void processRecord(ConsumerRecord<String, String> record) {
        System.out.printf("Processing committed record: topic=%s, partition=%d, offset=%d, key=%s%n",
                record.topic(), record.partition(), record.offset(), record.key());
        // Business logic here. read_committed guarantees the record is not from an
        // open or aborted transaction; it does not prevent reprocessing after a crash.
    }
}
```
Kafka Streams with Exactly-Once Semantics
```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.WindowStore;
import java.time.Duration;
import java.util.Properties;

public class ExactlyOnceStreamExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "exactly-once-stream-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

        // Exactly-once v2 (Kafka Streams 3.0+, brokers 2.5+). Automatically configures:
        // - idempotent, transactional producers (one per stream thread; v1 used one per task)
        // - read_committed consumers
        // - input offsets committed in the same transaction as the output
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

        // State store configuration
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
        // 100 ms is already the default under EOS; shorter = lower latency, more transactions
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> sourceStream = builder.stream("input-events",
                Consumed.with(Serdes.String(), Serdes.String()));

        // Stateful windowed count: changelog writes, output and input offsets
        // are committed in one transaction
        KTable<Windowed<String>, Long> windowedCounts = sourceStream
                .groupByKey()
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
                .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("windowed-count-store")
                        .withValueSerde(Serdes.Long()));

        // Content-based routing with split()/branch(); every write joins the same transaction
        sourceStream
                .filter((key, value) -> value != null)
                .split()
                .branch((key, value) -> value.contains("HIGH"),
                        Branched.withConsumer(stream -> stream.to("high-priority-events",
                                Produced.with(Serdes.String(), Serdes.String()))))
                .branch((key, value) -> value.contains("NORMAL"),
                        Branched.withConsumer(stream -> stream.to("normal-priority-events",
                                Produced.with(Serdes.String(), Serdes.String()))))
                .noDefaultBranch();

        // Stream-table join (input topics must be co-partitioned)
        KTable<String, String> referenceData = builder.table("reference-data",
                Consumed.with(Serdes.String(), Serdes.String()));
        KStream<String, String> enrichedStream = sourceStream.join(referenceData,
                (event, ref) -> event + ":" + ref,
                Joined.with(Serdes.String(), Serdes.String(), Serdes.String()));
        enrichedStream.to("enriched-events", Produced.with(Serdes.String(), Serdes.String()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Graceful shutdown: register before start(); close() commits in-progress work.
        // cleanUp() is deliberately not called: it deletes local state and forces a
        // full restore from the changelog topics on the next start.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close(Duration.ofSeconds(30))));
        streams.start();
    }
}
```
Transaction Coordinator Monitoring Script
```bash
#!/bin/bash
# Monitor Kafka transactions and transaction state.
# kafka-transactions.sh (subcommands list, describe, describe-producers,
# find-hanging) ships with Kafka 3.0 and later.
BOOTSTRAP_SERVER="kafka-broker-0:9092"
echo "=== KAFKA TRANSACTION MONITORING ==="
echo "Timestamp: $(date)"
echo ""

echo "--- Transactional IDs ---"
# One row per transactional ID: coordinator, producer ID, transaction state
kafka-transactions.sh --bootstrap-server "$BOOTSTRAP_SERVER" list

echo ""
echo "--- Transaction Coordinator Status ---"
# Coordinator, producer ID and epoch, state, timeout and partitions of one ID
kafka-transactions.sh --bootstrap-server "$BOOTSTRAP_SERVER" \
  describe --transactional-id order-producer-1

echo ""
echo "--- Producer State on a Partition ---"
# Producer IDs, epochs and last sequence numbers tracked by the partition
# leader (deduplication and fencing state), plus open transaction start offsets
kafka-transactions.sh --bootstrap-server "$BOOTSTRAP_SERVER" \
  describe-producers --topic order-events --partition 0

echo ""
echo "--- Hanging Transactions ---"
# Open transactions that hold back the LSO on broker 0's partitions
kafka-transactions.sh --bootstrap-server "$BOOTSTRAP_SERVER" \
  find-hanging --broker-id 0

echo ""
echo "--- Raw __transaction_state Entries ---"
# Formatter class from older releases; it moved in later versions, so check
# the class name shipped with your Kafka release
kafka-console-consumer.sh \
  --bootstrap-server "$BOOTSTRAP_SERVER" \
  --topic __transaction_state \
  --from-beginning \
  --max-messages 100 \
  --formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter"

echo ""
echo "--- Transaction Metrics via JMX ---"
# Producer client metrics (kafka.producer:type=producer-metrics):
# - txn-init-time-ns-total, txn-begin-time-ns-total
# - txn-send-offsets-time-ns-total
# - txn-commit-time-ns-total, txn-abort-time-ns-total
# Consumer lag (consumer-fetch-manager-metrics): records-lag-max, measured
# against the LSO when isolation.level=read_committed
```
Python: Idempotent and Transactional Producer
```python
import json
import time

from kafka import KafkaProducer
from kafka.errors import KafkaError

# enable_idempotence, transactional_id and the *_transaction() methods exist
# only in recent kafka-python releases; check the version you have installed.
producer = KafkaProducer(
    bootstrap_servers=['kafka-broker-0:9092'],
    key_serializer=lambda k: k.encode('utf-8') if k else None,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',
    retries=5,
    retry_backoff_ms=100,
    batch_size=65536,
    linger_ms=10,
    compression_type='lz4',          # needs the lz4 package installed
    enable_idempotence=True,
    transactional_id='order-producer-1',
)

# Initialize transactions (acquires PID and epoch, fencing older instances)
producer.init_transactions()

# Transactional send
try:
    producer.begin_transaction()
    for i in range(100):
        producer.send(
            topic='order-events',
            key=f'order-{i}',
            value={
                'orderId': str(i),
                'amount': round(1000.0, 2),
                'status': 'pending',
                'timestamp': int(time.time() * 1000),
            },
        )
    producer.flush()
    # Commit transaction atomically
    producer.commit_transaction()
    print("Transaction committed successfully")
except KafkaError as e:
    # Abort on failure; the records stay invisible to read_committed consumers
    producer.abort_transaction()
    print(f"Transaction aborted: {e}")


# Callback-based send with error handling
def on_send_success(record_metadata):
    print(f"Sent to {record_metadata.topic} partition {record_metadata.partition} "
          f"offset {record_metadata.offset}")


def on_send_error(ex):
    print(f"Error: {ex}")


# A transactional producer cannot send outside a transaction,
# so this single send gets its own transaction.
producer.begin_transaction()
producer.send('user-events', key='user-123', value={'action': 'login'}) \
    .add_callback(on_send_success) \
    .add_errback(on_send_error)
producer.flush()
producer.commit_transaction()
producer.close()
```
Python: Consumer with read_committed
```python
from kafka import KafkaConsumer

# Consumer with transactional read isolation (isolation_level is only
# supported by recent kafka-python releases; check your installed version)
consumer = KafkaConsumer(
    'order-events',
    bootstrap_servers=['kafka-broker-0:9092'],
    group_id='order-consumer-group',
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    isolation_level='read_committed',  # only records from committed transactions
    max_poll_records=500,
    session_timeout_ms=45000,
    heartbeat_interval_ms=15000,
)

try:
    while True:
        records = consumer.poll(timeout_ms=100)
        for records_list in records.values():
            for record in records_list:
                # Record is not from an open or aborted transaction
                print(f"Processing: topic={record.topic}, partition={record.partition}, "
                      f"offset={record.offset}, key={record.key}")
                # Process record... (must be idempotent: a crash before the
                # commit below replays the whole batch)
        if records:
            # Commit the positions returned by this poll, after processing
            consumer.commit()
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
```
Performance Metrics
| Configuration | Throughput (msg/sec) | Latency (p99) | Duplicate Rate | Memory Usage |
|---|---|---|---|---|
| No Idempotence | 500,000 | 5ms | 0.1% | 32MB |
| Idempotent (acks=all) | 450,000 | 8ms | 0% | 64MB |
| Transactional (single partition) | 400,000 | 12ms | 0% | 128MB |
| Transactional (multi-partition) | 300,000 | 20ms | 0% | 128MB |
| Kafka Streams (EOS v1) | 200,000 | 25ms | 0% | 256MB |
| Kafka Streams (EOS v2) | 250,000 | 18ms | 0% | 256MB |
| Consumer (read_committed) | 100,000 | 2ms | 0% | 32MB |
| Consumer (read_uncommitted) | 150,000 | 1ms | 0% | 32MB |
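Overhead summary, comparing the "No Idempotence" row above with the "Transactional (single partition)" row:

| Metric | Without EOS | With EOS | Overhead |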
|---|---|---|---|
| Throughput | 500K msg/sec | 400K msg/sec | 20% reduction |
| Latency (p99) | 5ms | 12ms | 7ms increase |
| Memory | 32MB | 128MB | 96MB increase |
| Disk I/O | Baseline | +30% | More writes for TxnMarkers |
| Network | Baseline | +15% | Transaction metadata |
Best Practices
- Idempotence First: Enable enable.idempotence=true (the default since Kafka 3.0) before considering transactions. Idempotence prevents retry duplicates per partition with minimal overhead.
- Transactional Scope: Keep transaction scope as small as possible. Long-running transactions increase end-to-end latency (an open transaction holds back the LSO, so read_committed consumers cannot read past it), increase memory usage, and risk hitting transaction.timeout.ms. Aim for transactions that complete within seconds.
- Fencing Awareness: Give each concurrently running producer instance its own transactional.id, and keep that ID stable across restarts of the same logical instance so that a restarted instance fences its predecessor. Never share a transactional.id between instances that run at the same time; they will keep fencing each other.
- Consumer Isolation: Always use isolation.level=read_committed for consumers reading transactional data (the default is read_uncommitted). Use read_uncommitted only when you need to read uncommitted data, for example for debugging.
- Error Handling: Call abortTransaction() on abortable KafkaExceptions, but close the producer on fatal errors (ProducerFencedException, OutOfOrderSequenceException, AuthorizationException), since it cannot be used again. Monitor transaction coordinator health and handle coordinator failover gracefully.
- Performance Tuning: Enable compression (compression.type=lz4) to reduce network overhead. Tune batch.size and linger.ms to balance latency vs. throughput. max.in.flight.requests.per.connection=5 is the highest value compatible with idempotence and gives the most pipelining.
- Monitoring: Track producer transaction metrics such as txn-commit-time-ns-total and txn-abort-time-ns-total (producer-metrics), and consumer lag via records-lag-max, which is measured against the LSO in read_committed mode. Use kafka-transactions.sh find-hanging to detect open transactions that hold back the LSO.
- Testing: Test exactly-once semantics by writing a consumer that deduplicates by message ID. Verify that reprocessing the same data produces identical results. Use chaos testing to simulate producer restarts and network partitions.
- Migration: Idempotence is a per-producer setting, so producers can be migrated one at a time. Before producers start writing transactions, switch their consumers to read_committed; otherwise those consumers also receive records from aborted transactions. Roll out gradually and monitor for fencing events during migration.
- Kafka Streams EOS: Use processing.guarantee=exactly_once_v2 for new applications (requires brokers 2.5 or newer). Compared with the deprecated exactly_once (v1), it uses one transactional producer per stream thread instead of one per task, which reduces the number of producers and transactions the cluster must manage. Ensure state.dir is on fast storage (SSD) for optimal performance.
Key Takeaways:
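- EOS = idempotent producers (per-partition dedup) + transactional producers (atomic multi-partition writes, including consumer offsets) + read_committed consumers
- Idempotent deduplication: broker appends a message only if seq = lastSeq(PID) + 1; retries of any of the last 5 batches are recognized as duplicates, so max.in.flight.requests.per.connection must be ≤ 5
- Transaction coordinator uses two-phase commit: write PrepareCommit to __transaction_state, then write COMMIT markers to every partition in the transaction (including __consumer_offsets) and record CompleteCommit
- Producer fencing: a restarted producer with the same transactional.id keeps its PID but gets a higher epoch, so requests from the old instance are rejected
- End-to-end EOS requires idempotent sink operations at the destination
- EOS v2 (processing.guarantee=exactly_once_v2) uses one producer per stream thread instead of one per task, reducing overhead vs v1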
See also: Kafka Architecture (kafka/01) | Producer & Consumer (kafka/02) | Kafka Streams & Connect (kafka/03) | Schema Registry (kafka/05)
See Also
- Kafka Architecture — Architecture supporting EOS
- Producer Consumer Patterns — Producer ack settings for EOS
- Kafka Streams and Connect — Streams EOS implementation
- Error Handling and Retries — Error handling in pipelines