Apache Kafka Exactly-Once Semantics: Transactions, Idempotency, and Consistency

[Figure: Transaction Lifecycle State Machine. States: Empty (no transaction) → Ongoing (writing data) → Prepare (committing) → Complete (committed), or Abort (rolled back) on timeout/abort. Components shown: Idempotent Producer (PID + sequence numbers), Transactional Producer (atomic multi-partition), Consumer Isolation (read_committed).]
[Figure: Idempotent Deduplication with Sequence Numbers. Producer PID (Seq: 0, 1, 2) → Broker Dedup → Log Append, no duplicates. Each producer gets a unique PID, and brokers track sequence numbers per partition. Retries with the same PID and sequence number are deduplicated, ensuring idempotent writes.]

Architecture Diagram: Exactly-Once Semantics Components

Architecture Diagram: Transaction Lifecycle State Machine

Architecture Diagram: Idempotent Producer Deduplication

Architecture Diagram: Two-Phase Commit Protocol

Architecture Diagram: Consumer Isolation Levels

Formal Definitions

Definition: Exactly-Once Semantics (EOS)

Exactly-once semantics guarantees that each message is delivered to the consumer exactly once, with no duplicates and no data loss. In Kafka, EOS is achieved through three coordinated mechanisms: (1) idempotent producers that prevent duplicate writes per partition, (2) transactional producers that enable atomic multi-partition writes, and (3) consumer isolation levels that filter uncommitted records. End-to-end EOS requires all three components working together.

Definition: Idempotent Producer

An idempotent producer ensures that a message is written to a partition exactly once, even if the producer retries. Each producer instance receives a unique Producer ID (PID) and epoch, and each partition leader tracks the last sequence number it has appended per PID. A retry that carries an already-appended sequence number is recognized as a duplicate: the broker acknowledges it but does not append it again. A sequence number that skips ahead is rejected as out of order. Formally: for producer P with PID=p, the broker appends message (p, seq) only if seq = lastAcceptedSeq(p) + 1.

Definition: Transactional Producer

A transactional producer enables atomic writes across multiple partitions and topics. A transaction groups multiple produce requests into a single atomic unit: either all messages in the transaction are committed (visible to read_committed consumers) or none are (aborted and invisible). The Transaction Coordinator manages the two-phase commit protocol using the __transaction_state topic.

Definition: Producer Fencing

Producer fencing is the mechanism that prevents a stale producer instance (for example, a "zombie" that is still running after its replacement has started) from writing to a partition. When a producer with the same transactional.id restarts and calls initTransactions(), it is assigned a higher epoch. The broker rejects any write with an epoch older than the current known epoch for that PID, effectively "fencing" the old producer instance.
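
The epoch check can be illustrated with a small in-memory model. This is only a sketch: `EpochRegistry`, `ProducerFenced`, and the PID value 42 are hypothetical names chosen for illustration and are not Kafka APIs.

```python
class ProducerFenced(Exception):
    """Raised in this sketch when a write carries a stale epoch."""


class EpochRegistry:
    """Toy broker-side view: the highest epoch seen for each PID."""

    def __init__(self):
        self.current_epoch = {}  # PID -> current epoch

    def init_producer(self, pid: int) -> int:
        """Bump and return the epoch, as happens when an instance (re)starts."""
        epoch = self.current_epoch.get(pid, -1) + 1
        self.current_epoch[pid] = epoch
        return epoch

    def write(self, pid: int, epoch: int, value: str) -> str:
        """Accept the write only if its epoch is not older than the current one."""
        if epoch < self.current_epoch[pid]:
            raise ProducerFenced(
                f"epoch {epoch} is older than current epoch {self.current_epoch[pid]}"
            )
        return value


registry = EpochRegistry()
old_epoch = registry.init_producer(42)  # original instance
new_epoch = registry.init_producer(42)  # restarted instance fences the original

registry.write(42, new_epoch, "from new instance")  # accepted
try:
    registry.write(42, old_epoch, "from stale instance")
    stale_write_rejected = False
except ProducerFenced:
    stale_write_rejected = True
```

The stale instance is never told directly that it has been replaced; it finds out only when its next write is rejected.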

Definition: Last Stable Offset (LSO)

The Last Stable Offset (LSO) is the offset of the first message in a partition that is part of an ongoing (uncommitted) transaction; if no transaction is open, it equals the high watermark. Consumers with isolation.level=read_committed only see messages below the LSO, and records from aborted transactions are filtered out. The LSO advances when the oldest open transaction is committed or aborted.
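
How the LSO moves can be shown with a few lines of arithmetic. The sketch below is a simplified model, not broker code: the function name and the sample offsets are hypothetical, and it assumes that the LSO falls back to the high watermark when no transaction is open.

```python
def last_stable_offset(high_watermark, open_txn_first_offsets):
    """Return the first offset of the oldest open transaction.

    Assumption in this sketch: with no open transaction, the LSO is taken
    to be the high watermark (the end of the readable log).
    """
    return min(open_txn_first_offsets, default=high_watermark)


high_watermark = 10   # offsets 0..9 are readable
open_txns = [4, 7]    # first offsets of two open transactions

lso_before = last_stable_offset(high_watermark, open_txns)  # oldest open txn starts at 4
open_txns.remove(4)   # the oldest transaction commits or aborts
lso_after = last_stable_offset(high_watermark, open_txns)   # now bounded by the txn at 7
open_txns.remove(7)
lso_idle = last_stable_offset(high_watermark, open_txns)    # no open txn left
```

A single long-running transaction therefore holds back every read_committed consumer of that partition, even for records written by other producers.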

Key Formulas

Exactly-Once Delivery Guarantee
$$
P(\text{deliver}(m) = 1) = \begin{cases} 1 & \text{if } m \text{ is produced and committed} \\ 0 & \text{otherwise} \end{cases}
$$

Here,

  • $m$ = Message
  • $\text{deliver}(m)$ = Indicator: 1 if delivered to consumer, 0 otherwise

Idempotent Deduplication Condition

$$
\text{Accept}(\text{PID}, \text{seq}) = \begin{cases} \text{ACCEPT} & \text{if } \text{seq} = \text{lastSeq}(\text{PID}) + 1 \\ \text{REJECT} & \text{otherwise} \end{cases}
$$

Here,

  • $\text{PID}$ = Producer ID
  • $\text{seq}$ = Sequence number of the incoming message
  • $\text{lastSeq}(\text{PID})$ = Last accepted sequence number for this PID on this partition
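
The acceptance rule can be exercised with a toy partition log. This is a minimal sketch under stated assumptions: `PartitionLog` and its return labels are hypothetical, and the single REJECT case of the formula is split into `DUPLICATE` (sequence already seen) and `OUT_OF_ORDER` (a gap) purely to make the two situations visible.

```python
from dataclasses import dataclass, field


@dataclass
class PartitionLog:
    """Toy model of one partition's log plus its per-producer sequence state."""

    records: list = field(default_factory=list)
    last_seq: dict = field(default_factory=dict)  # PID -> last accepted sequence

    def append(self, pid: int, seq: int, value: str) -> str:
        last = self.last_seq.get(pid, -1)
        if seq == last + 1:
            # Next expected sequence number: append and advance
            self.records.append(value)
            self.last_seq[pid] = seq
            return "ACCEPT"
        if seq <= last:
            # Already written once; appending again would create a duplicate
            return "DUPLICATE"
        # A gap means an earlier message has not arrived yet
        return "OUT_OF_ORDER"


log = PartitionLog()
results = [
    log.append(7, 0, "a"),
    log.append(7, 1, "b"),
    log.append(7, 1, "b"),  # retry of an already-written message
    log.append(7, 3, "d"),  # sequence 2 is missing
]
```

After these four calls the log holds only "a" and "b": the retry did not produce a second copy, and the message that skipped ahead was not written.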

Deduplication Window

$$
W_{\text{dedup}} = [\text{lastSeq} - N_{\text{in\_flight}} + 1,\; \text{lastSeq} + N_{\text{in\_flight}}]
$$

Here,

  • $N_{\text{in\_flight}}$ = max.in.flight.requests.per.connection (default 5)
  • $\text{lastSeq}$ = Last accepted sequence number for this PID on this partition
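
As a worked example, the window formula can be evaluated directly. The helper names below are hypothetical, and the code only evaluates the lesson's formula; it is not a model of broker internals.

```python
def dedup_window(last_seq: int, n_in_flight: int = 5) -> tuple:
    """Evaluate [lastSeq - N + 1, lastSeq + N] from the formula above."""
    return (last_seq - n_in_flight + 1, last_seq + n_in_flight)


def in_window(seq: int, last_seq: int, n_in_flight: int = 5) -> bool:
    """True if seq falls inside the window around last_seq (bounds inclusive)."""
    low, high = dedup_window(last_seq, n_in_flight)
    return low <= seq <= high


window = dedup_window(last_seq=10)          # default N = 5
inside = in_window(seq=8, last_seq=10)      # a recent retry
outside = in_window(seq=2, last_seq=10)     # far older than the window
```

With lastSeq = 10 and the default of 5 in-flight requests, the window spans sequence numbers 6 through 15.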

Transaction Timeout Risk

$$
P_{\text{timeout}} = P(\text{processing\_time} > T_{\text{txn\_timeout}})
$$

Here,

  • $T_{\text{txn\_timeout}}$ = transaction.timeout.ms (default 60s)
  • $\text{processing\_time}$ = Time from beginTransaction() to commitTransaction()
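
In practice this probability is usually estimated from measured transaction durations. The following sketch computes the empirical fraction; the function name and the sample durations are made up for illustration.

```python
def timeout_probability(processing_times_ms, txn_timeout_ms=60_000):
    """Empirical estimate of P(processing_time > T_txn_timeout)."""
    if not processing_times_ms:
        raise ValueError("need at least one measured duration")
    exceeded = sum(1 for t in processing_times_ms if t > txn_timeout_ms)
    return exceeded / len(processing_times_ms)


# Hypothetical durations (ms) from beginTransaction() to commitTransaction()
samples = [1_200, 3_500, 800, 61_000, 15_000, 75_000, 2_000, 900, 4_100, 30_000]

p_default = timeout_probability(samples)                         # 60s timeout
p_longer = timeout_probability(samples, txn_timeout_ms=90_000)   # 90s timeout
```

Two of the ten sample durations exceed 60 seconds, so the estimate is 0.2 at the default timeout and 0.0 at 90 seconds. Shortening the transactions is generally preferable to raising the timeout.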

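For a transactional producer, the Producer ID (PID) is assigned by the Transaction Coordinator and persisted in __transaction_state, keyed by transactional.id. When a producer with the same transactional.id restarts, it keeps its PID and receives an incremented epoch, which fences (invalidates) the old producer instance. An idempotent producer without a transactional.id simply receives a fresh PID on each restart. Fencing prevents split-brain scenarios where two producer instances write to the same partition.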

Transaction timeout (transaction.timeout.ms, default 60s) determines how long a transaction can remain open. If the producer crashes during a transaction, the coordinator will abort it after the timeout. Keep transactions short to minimize resource holding and reduce timeout risk.

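With idempotence enabled (enable.idempotence=true), the producer requires acks=all, retries greater than 0, and max.in.flight.requests.per.connection of at most 5. These are the defaults in recent client versions (retries defaults to Integer.MAX_VALUE); explicitly setting a conflicting value causes a configuration error. Together these settings ensure durability and prevent duplicate messages even under transient network failures.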

Theorem: Ordering Guarantee with Idempotence

With idempotence enabled, Kafka guarantees exactly-once delivery and ordering within a partition even with retries and max.in.flight.requests.per.connection > 1 (up to 5). The broker does not reorder batches: it rejects a batch whose sequence number arrives ahead of a missing predecessor, and the producer re-sends the affected batches in sequence order. Formally: for a producer with PID=p sending messages s_1, s_2, ..., s_k to partition P, a consumer on P will observe them in order s_1, s_2, ..., s_k, each exactly once.

Theorem: Exactly-Once Semantics Proof Sketch

Claim: Kafka achieves end-to-end exactly-once semantics when: (1) producer uses idempotence + transactions, (2) consumer uses isolation.level=read_committed, (3) sink operations are idempotent.

Proof sketch:

  1. Source deduplication: Idempotent producer ensures each message is written exactly once per partition (by sequence number tracking).
  2. Atomic multi-partition write: Transaction coordinator ensures all messages in a transaction are either all committed or all aborted (two-phase commit via __transaction_state).
  3. Consumer filtering: read_committed consumers only see messages from committed transactions (filtered by LSO — Last Stable Offset).
  4. Sink idempotency: If the sink operation (database write, HTTP call) is idempotent, reprocessing the same committed message produces the same result.
  5. Combined: The composition of these four properties yields exactly-once end-to-end delivery.
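
Step 4 is the part that application code has to provide, so it is worth a concrete illustration. The sketch below contrasts a keyed upsert with a blind append; both "sinks" are plain Python containers, and the message IDs and amounts are hypothetical.

```python
def upsert(table, message_id, payload):
    """Idempotent write: applying it twice leaves the same final state."""
    table[message_id] = payload


def append(ledger, message_id, payload):
    """Non-idempotent write: every application adds another row."""
    ledger.append((message_id, payload))


committed = [("order-1", 100), ("order-2", 250)]
table, ledger = {}, []

for _ in range(2):  # the second pass simulates reprocessing after a crash
    for message_id, payload in committed:
        upsert(table, message_id, payload)
        append(ledger, message_id, payload)
```

After reprocessing, the keyed table still holds two rows while the ledger holds four, which is why the claim requires idempotent sink operations rather than relying on Kafka alone.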

Detailed Explanation

Exactly-once semantics (EOS) in Kafka ensures that each message is delivered exactly once to the consumer, with no duplicates and no data loss. This is achieved through three coordinated mechanisms: idempotent producers, transactional producers, and consumer isolation levels.

Idempotent Producers

Idempotent Producers prevent duplicate messages from being written during retries. When enabled (enable.idempotence=true), each producer is assigned a Producer ID (PID) and an epoch number. Each partition tracks the sequence number of the last successfully written message from each PID. When a producer sends a message, it includes the PID, epoch, and sequence number. The broker checks whether the sequence number is the next expected value. If it is lower, the message is a duplicate of one already written and is not appended again; if it is higher, the message is rejected as out of sequence and the producer retries. The deduplication window is bounded by max.in.flight.requests.per.connection (default 5, and at most 5 with idempotence): the broker keeps metadata for the most recent batches from each producer so that a retry of any in-flight batch can be recognized.

Producer ID and Epoch Mechanism

The Producer ID and Epoch mechanism handles producer restarts and zombie instances. When a transactional producer restarts and calls initTransactions() with the same transactional.id, the coordinator increments its epoch. Brokers then reject writes that carry the older epoch, which prevents conflicts between old and new producer instances and ensures that only one producer instance per transactional.id can write at a time.

Transactional Producers

Transactional Producers enable atomic writes across multiple partitions. A transactional producer begins a transaction, sends messages to multiple partitions, and then commits or aborts the transaction atomically. The Transaction Coordinator (a broker) manages the transaction state and ensures that all partitions involved in the transaction either have all records written (commit) or none (abort).

Transaction Coordinator Protocol

The coordinator uses a two-phase commit protocol:

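  • Phase 1 (Prepare): The coordinator records its decision by writing a PrepareCommit (or PrepareAbort) entry for the transaction to the __transaction_state topic.
  • Phase 2 (Complete): The coordinator writes commit (or abort) markers to every partition involved in the transaction, including __consumer_offsets when offsets were sent as part of the transaction, and then records the transaction as complete in __transaction_state.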

If the coordinator fails between phases, a new coordinator will complete the transaction by reading the transaction state from __transaction_state and completing the two-phase commit. This ensures that transactions are never left in an indeterminate state.
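
The recovery behaviour can be sketched as a small state machine over a durable log. This is an illustrative model only: the `Coordinator` class, the state names, and the list standing in for __transaction_state are assumptions, and the work done inside each phase is deliberately left out.

```python
class Coordinator:
    """Toy coordinator whose only durable state is an append-only log."""

    def __init__(self, txn_log):
        self.txn_log = txn_log  # shared list standing in for __transaction_state

    def state_of(self, txn_id):
        states = [state for t, state in self.txn_log if t == txn_id]
        return states[-1] if states else "Empty"

    def begin(self, txn_id):
        self.txn_log.append((txn_id, "Ongoing"))

    def prepare_commit(self, txn_id):
        if self.state_of(txn_id) != "Ongoing":
            raise ValueError(f"{txn_id} is not ongoing")
        self.txn_log.append((txn_id, "PrepareCommit"))  # decision is now durable

    def complete(self, txn_id):
        state = self.state_of(txn_id)
        if state == "PrepareCommit":
            self.txn_log.append((txn_id, "CompleteCommit"))
        elif state == "PrepareAbort":
            self.txn_log.append((txn_id, "CompleteAbort"))

    def recover(self):
        """Finish every transaction whose outcome was decided but not completed."""
        for txn_id in {t for t, _ in self.txn_log}:
            self.complete(txn_id)


durable_log = []
first = Coordinator(durable_log)
first.begin("txn-A")
first.prepare_commit("txn-A")
# The first coordinator is assumed to crash here, before completing.

second = Coordinator(durable_log)  # a new coordinator reads the same log
second.recover()
final_state = second.state_of("txn-A")
```

Because the decision is written before the second phase starts, the replacement coordinator can always roll the transaction forward to the recorded outcome instead of guessing.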

Consumer Isolation Levels

Consumer Isolation Levels determine how consumers read transactional data. The read_committed isolation level ensures that consumers only read records from committed transactions, while read_uncommitted reads all records regardless of transaction status. With read_committed, consumers automatically filter out records from uncommitted transactions, providing a consistent view of the data. This isolation is implemented jointly: the broker tracks the Last Stable Offset (LSO) and returns data only up to it, and the consumer skips records that belong to aborted transactions.
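
The difference between the two isolation levels can be sketched with a toy log. Everything here is hypothetical (the tuple layout, the `status` map, the `read` helper), and the length of the log stands in for the high watermark.

```python
def read(log, status, isolation_level):
    """Return the values a consumer would see under the given isolation level."""
    if isolation_level == "read_uncommitted":
        return [value for _, _, value in log]
    # read_committed: stop at the first offset of the oldest open transaction
    open_offsets = [o for o, txn, _ in log if txn is not None and status[txn] == "open"]
    lso = min(open_offsets, default=len(log))
    return [
        value
        for offset, txn, value in log
        if offset < lso and (txn is None or status[txn] == "committed")
    ]


# (offset, transaction id or None, value)
log = [
    (0, None, "plain"),
    (1, "t1", "a"),
    (2, "t2", "b"),
    (3, "t3", "c"),
    (4, None, "late"),
]
status = {"t1": "committed", "t2": "aborted", "t3": "open"}

uncommitted_view = read(log, status, "read_uncommitted")
committed_view = read(log, status, "read_committed")

# Once t3 commits, the records it was holding back become visible
after_commit = read(log, {**status, "t3": "committed"}, "read_committed")
```

Note that the non-transactional record at offset 4 is also hidden while t3 is open, because it sits beyond the LSO; the aborted record "b" never becomes visible.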

End-to-End Exactly-Once

Achieving exactly-once semantics end-to-end requires:

  1. Idempotent producer (prevents duplicates at source)
  2. Transactional producer (ensures atomicity of multi-partition writes)
  3. Consumer with read_committed (ensures only committed records are processed)
  4. Idempotent sink operations (prevents duplicates at destination)

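Kafka Streams provides this out of the box for Kafka-to-Kafka pipelines with processing.guarantee=exactly_once_v2, which automatically configures the producer and consumer components. Writes to external systems still need an idempotent sink.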

Key Concepts Table

| Component | Description | Key Configurations | Failure Mode |
| --- | --- | --- | --- |
| Producer ID (PID) | Unique identifier for producer instance | transactional.id | New PID on restart (idempotent-only); epoch bump when transactional.id is set |
| Epoch | Fencing mechanism for producer instances | Auto-incremented | Fence stale producers |
| Sequence Number | Per-partition message ordering | Auto-incremented | Duplicate rejection |
| Transaction Coordinator | Broker managing transaction state | Automatic election | Transaction recovery |
| Transaction Markers | Control records in partition log | Automatic | Abort uncommitted |
| LSO (Last Stable Offset) | First offset of the oldest open transaction | Automatic | Consumer filtering |
| Deduplication Table | Per-partition PID/epoch/seq tracking | max.in.flight.requests.per.connection | Duplicate rejection |
| Fencing | Preventing stale producer writes | Automatic | Split-brain prevention |
| Idempotent Producer | Prevents duplicate writes | enable.idempotence | Retry safety |
| Transactional Producer | Atomic multi-partition writes | transactional.id | Atomicity guarantee |
| __transaction_state | Transaction coordinator log | Automatic | Transaction recovery |

Code Examples

Idempotent and Transactional Producer with Detailed Configuration

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.Future;

public class IdempotentProducerExample {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        
        // Bootstrap servers
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
            "kafka-broker-0:9092,kafka-broker-1:9092,kafka-broker-2:9092");
        
        // Serializers
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
            StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
            StringSerializer.class.getName());
        
        // Enable idempotence (critical for exactly-once)
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        
        // Required for idempotence (auto-set by enable.idempotence=true)
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // All ISR replicas must acknowledge write
        
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        // Unlimited retries with idempotence (safe due to dedup)
        
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        // 5 concurrent requests per broker (pipelining)
        // Ordering preserved with idempotence even with value > 1
        
        // Transactional configuration for multi-partition atomicity
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-producer-1");
        // Unique per producer instance; enables fencing
        
        // Performance tuning
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // 16KB batch size
        
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        // 10ms linger for batching
        
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        // Compression reduces network overhead
        
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // 32MB buffer for pending records
        
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        
        // Initialize transactions (required if transactional.id is set)
        // Acquires PID and epoch from Transaction Coordinator
        producer.initTransactions();
        
        try {
            // Begin transaction for atomic writes
            producer.beginTransaction();
            
            for (int i = 0; i < 1000; i++) {
                String key = "order-" + (i % 100);
                String value = "{\"orderId\": \"" + i + "\", \"amount\": " + 
                    (Math.random() * 1000) + "}";
                
                ProducerRecord<String, String> record = 
                    new ProducerRecord<>("order-events", key, value);
                
                // Add metadata headers
                record.headers().add("correlation-id", 
                    java.util.UUID.randomUUID().toString().getBytes());
                record.headers().add("idempotent", "true".getBytes());
                
                // Send with callback for monitoring
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("Failed: " + exception.getMessage());
                    } else {
                        System.out.printf("Sent to partition %d, offset %d, " +
                            "timestamp %d%n",
                            metadata.partition(), metadata.offset(),
                            metadata.timestamp());
                    }
                });
            }
            
            // Commit transaction atomically
            // Writes TxnMarker to all affected partitions
            producer.commitTransaction();
            System.out.println("Transaction committed successfully");
            
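        } catch (org.apache.kafka.common.errors.ProducerFencedException
                | org.apache.kafka.common.errors.OutOfOrderSequenceException
                | org.apache.kafka.common.errors.AuthorizationException e) {
            // Fatal errors: this producer instance cannot continue, and
            // abortTransaction() must not be called; close() in finally is enough
            System.err.println("Fatal producer error: " + e.getMessage());
            throw e;
        } catch (org.apache.kafka.common.KafkaException e) {
            // Abort transaction on any other Kafka failure
            // All writes become invisible to read_committed consumers
            producer.abortTransaction();
            System.err.println("Transaction aborted: " + e.getMessage());
            throw e;
        } finally {
            producer.close();
        }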
    }
}

Consumer with Transactional Read Isolation

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;

public class TransactionalConsumerExample {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
            "kafka-broker-0:9092,kafka-broker-1:9092,kafka-broker-2:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
            StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
            StringDeserializer.class.getName());
        
        // Critical: Read committed isolation for exactly-once
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        // Only see records from committed transactions
        // Filtered by LSO (Last Stable Offset)
        
        // Disable auto-commit for manual offset management
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        // Fetch configuration
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        
        // Subscribe with rebalance listener
        consumer.subscribe(Arrays.asList("order-events"), 
            new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    System.out.println("Partitions revoked: " + partitions);
                    // Commit any pending offsets
                    consumer.commitSync(Duration.ofSeconds(30));
                }
                
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    System.out.println("Partitions assigned: " + partitions);
                }
            });
        
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                
                if (records.isEmpty()) continue;
                
                // Process records
                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
                
                for (ConsumerRecord<String, String> record : records) {
                    // Process only committed records (exactly-once guarantee)
                    processRecord(record);
                    
                    // Track offset for manual commit
                    offsetsToCommit.put(
                        new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1, "processed")
                    );
                }
                
                // Commit offsets after successful processing
                if (!offsetsToCommit.isEmpty()) {
                    consumer.commitSync(offsetsToCommit, Duration.ofSeconds(30));
                }
            }
        } finally {
            consumer.close();
        }
    }
    
    private static void processRecord(ConsumerRecord<String, String> record) {
        System.out.printf("Processing committed record: topic=%s, partition=%d, " +
            "offset=%d, key=%s%n",
            record.topic(), record.partition(), record.offset(), record.key());
        
        // Business logic here
        // This record is guaranteed to be from a committed transaction
    }
}

Kafka Streams with Exactly-Once Semantics

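import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.WindowStore;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;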

public class ExactlyOnceStreamExample {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "exactly-once-stream-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        
        // Exactly-once semantics (v2 for improved performance)
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
            StreamsConfig.EXACTLY_ONCE_V2);
        // Automatically configures:
        // - Idempotent producers
        // - Transactional writes
        // - read_committed consumers
        // - Atomic offset commits
        // EOS v2 reduces latency by ~30% vs v1
        
        // State store configuration
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
        // More frequent commits = lower latency, more overhead
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Source stream
        KStream<String, String> sourceStream = builder.stream("input-events",
            Consumed.with(Serdes.String(), Serdes.String()));
        
        // Stateful transformation with exactly-once guarantee
        KTable<Windowed<String>, Long> windowedCounts = sourceStream
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(
                    "windowed-count-store")
                .withValueSerde(Serdes.Long()));
        // Windowed count is atomic: read + process + write + commit in one transaction
        
        // Branch operations (all atomic within transaction)
        // split() starts the branching; noDefaultBranch() drops unmatched records
        Map<String, KStream<String, String>> branches = sourceStream
            .filter((key, value) -> value != null)
            .split()
            .branch((key, value) -> value.contains("HIGH"),
                Branched.withConsumer(stream -> 
                    stream.to("high-priority-events", 
                        Produced.with(Serdes.String(), Serdes.String()))))
            .branch((key, value) -> value.contains("NORMAL"),
                Branched.withConsumer(stream -> 
                    stream.to("normal-priority-events", 
                        Produced.with(Serdes.String(), Serdes.String()))))
            .noDefaultBranch();
        
        // Join operation (atomic)
        KTable<String, String> referenceData = builder.table("reference-data",
            Consumed.with(Serdes.String(), Serdes.String()));
        
        KStream<String, String> enrichedStream = sourceStream
            .join(referenceData,
                (event, ref) -> event + ":" + ref,
                Joined.with(Serdes.String(), Serdes.String(), Serdes.String()));
        
        enrichedStream.to("enriched-events", 
            Produced.with(Serdes.String(), Serdes.String()));
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        
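        // Graceful shutdown
        // cleanUp() is intentionally not called here: it deletes the local
        // state directory and would force a full restore on the next start
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close(Duration.ofSeconds(30));
        }));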
    }
}

Transaction Coordinator Monitoring Script

#!/bin/bash
# Monitor Kafka transaction coordinators and transaction state

BOOTSTRAP_SERVER="kafka-broker-0:9092"

echo "=== KAFKA TRANSACTION MONITORING ==="
echo "Timestamp: $(date)"
echo ""

# List transactional IDs
echo "--- Active Transactional IDs ---"
kafka-console-consumer.sh \
  --bootstrap-server $BOOTSTRAP_SERVER \
  --topic __transaction_state \
  --from-beginning \
  --max-messages 100 \
  --formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter"
# Shows transaction state entries: PID, state, partitions, timeout
# The formatter class name depends on the Kafka version; newer releases
# ship it as org.apache.kafka.tools.consumer.TransactionLogMessageFormatter

echo ""
echo "--- Transaction Coordinator Status ---"
# Check transaction coordinator for specific transactional ID
# (kafka-transactions.sh takes the action as a subcommand, not a flag)
kafka-transactions.sh \
  --bootstrap-server $BOOTSTRAP_SERVER \
  describe \
  --transactional-id order-producer-1

echo ""
echo "--- Transaction State ---"
# List known transactional IDs and their current states
kafka-transactions.sh \
  --bootstrap-server $BOOTSTRAP_SERVER \
  list

echo ""
echo "--- Producer Fence Status ---"
# Check for fenced producers (stale epochs)
kafka-console-consumer.sh \
  --bootstrap-server $BOOTSTRAP_SERVER \
  --topic __transaction_state \
  --from-beginning \
  --max-messages 50 \
  --property print.key=true \
  --property print.value=true

echo ""
echo "--- Transaction Metrics via JMX ---"
# Transaction coordinator metrics:
# kafka.coordinator.transaction:type=TransactionCoordinator,name=*
#   - txn-commit-rate (committed transactions/sec)
#   - txn-abort-rate (aborted transactions/sec)
#   - txn-timeout-rate (timed-out transactions/sec)
#   - txn-cpu-rate (CPU usage for transaction processing)

Python: Idempotent and Transactional Producer

from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import time

# Initialize producer with idempotence and transactions
producer = KafkaProducer(
    bootstrap_servers=['kafka-broker-0:9092'],
    key_serializer=lambda k: k.encode('utf-8') if k else None,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',
    retries=5,
    retry_backoff_ms=100,
    batch_size=65536,
    linger_ms=10,
    compression_type='lz4',
    enable_idempotence=True,
    transactional_id='order-producer-1',
)

# Initialize transactions (acquires PID and epoch)
producer.init_transactions()

# Transactional send
try:
    producer.begin_transaction()
    
    for i in range(100):
        producer.send(
            topic='order-events',
            key=f'order-{i}',
            value={
                'orderId': str(i),
                'amount': round(1000.0, 2),
                'status': 'pending',
                'timestamp': int(time.time() * 1000)
            }
        )
    
    # Commit transaction atomically
    producer.commit_transaction()
    print("Transaction committed successfully")
    
except KafkaError as e:
    # Abort on any failure
    producer.abort_transaction()
    print(f"Transaction aborted: {e}")

# Callback-based send with error handling
def on_send_success(record_metadata):
    print(f"Sent to {record_metadata.topic} partition {record_metadata.partition} "
          f"offset {record_metadata.offset}")

def on_send_error(ex):
    print(f"Error: {ex}")

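# A producer configured with transactional_id may only send inside a
# transaction, so this callback-based send is wrapped in its own transaction
producer.begin_transaction()
producer.send('user-events', key='user-123', value={'action': 'login'}) \
    .add_callback(on_send_success) \
    .add_errback(on_send_error)
producer.commit_transaction()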

producer.flush()
producer.close()

Python: Consumer with read_committed

from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
import json

# Consumer with transactional read isolation
consumer = KafkaConsumer(
    'order-events',
    bootstrap_servers=['kafka-broker-0:9092'],
    group_id='order-consumer-group',
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    isolation_level='read_committed',  # Only read committed transactions
    max_poll_records=500,
    session_timeout_ms=45000,
    heartbeat_interval_ms=15000,
)

try:
    while True:
        records = consumer.poll(timeout_ms=100)
        
        offsets_to_commit = {}
        
        for tp, records_list in records.items():
            for record in records_list:
                # Record is guaranteed to be from a committed transaction
                print(f"Processing: topic={record.topic}, partition={record.partition}, "
                      f"offset={record.offset}, key={record.key}")
                
                # Process record...
                
                # Track offset for commit
                offsets_to_commit[tp] = OffsetAndMetadata(
                    record.offset + 1,
                    metadata='processed'
                )
        
        # Commit offsets after successful processing
        # (kafka-python exposes this as commit(), not commitSync())
        if offsets_to_commit:
            consumer.commit(offsets_to_commit)

except KeyboardInterrupt:
    pass
finally:
    consumer.close()

Performance Metrics

| Configuration | Throughput (msg/sec) | Latency (p99) | Duplicate Rate | Memory Usage |
| --- | --- | --- | --- | --- |
| No Idempotence | 500,000 | 5ms | 0.1% | 32MB |
| Idempotent (acks=all) | 450,000 | 8ms | 0% | 64MB |
| Transactional (single partition) | 400,000 | 12ms | 0% | 128MB |
| Transactional (multi-partition) | 300,000 | 20ms | 0% | 128MB |
| Kafka Streams (EOS v1) | 200,000 | 25ms | 0% | 256MB |
| Kafka Streams (EOS v2) | 250,000 | 18ms | 0% | 256MB |
| Consumer (read_committed) | 100,000 | 2ms | 0% | 32MB |
| Consumer (read_uncommitted) | 150,000 | 1ms | 0% | 32MB |

| Metric | Without EOS | With EOS | Overhead |
| --- | --- | --- | --- |
| Throughput | 500K msg/sec | 400K msg/sec | 20% reduction |
| Latency (p99) | 5ms | 12ms | 7ms increase |
| Memory | 32MB | 128MB | 96MB increase |
| Disk I/O | Baseline | +30% | More writes for TxnMarkers |
| Network | Baseline | +15% | Transaction metadata |
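
The overhead figures in the summary follow from simple arithmetic on the "Without EOS" and "With EOS" values. The short sketch below reproduces them; the helper name is hypothetical and the inputs are the lesson's own numbers, not new measurements.

```python
def relative_change(baseline, value):
    """Fractional change of value relative to baseline (negative = reduction)."""
    return (value - baseline) / baseline


throughput_change = relative_change(500_000, 400_000)  # msg/sec
latency_increase_ms = 12 - 5                           # p99 latency
memory_increase_mb = 128 - 32

summary = (
    f"throughput {throughput_change:+.0%}, "
    f"latency +{latency_increase_ms}ms, "
    f"memory +{memory_increase_mb}MB"
)
```

The same helper can be pointed at figures measured in your own environment, which are likely to differ from the ones quoted here.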

Best Practices

  1. Idempotence First: Always enable enable.idempotence=true before considering transactions. Idempotence provides duplicate prevention per partition with minimal overhead.

  2. Transactional Scope: Keep transaction scope as small as possible. Long-running transactions increase latency, memory usage, and the risk of timeout. Aim for transactions that complete within seconds.

  3. Fencing Awareness: Use unique transactional.id per producer instance. Never share transactional IDs across different application instances. Understand that fencing prevents split-brain but requires proper instance management.

  4. Consumer Isolation: Always use isolation.level=read_committed for consumers reading transactional data. Use read_uncommitted only when you need to read uncommitted data for debugging.

  5. Error Handling: Implement proper error handling for transaction failures. Use abortTransaction() on any exception to ensure clean state. Monitor transaction coordinator health and handle coordinator failover gracefully.

  6. Performance Tuning: Enable compression (compression.type=lz4) to reduce network overhead. Tune batch.size and linger.ms to balance latency vs. throughput. Use max.in.flight.requests.per.connection=5 for optimal pipelining with idempotence.

  7. Monitoring: Track producer metrics: transactional-id, epoch, transaction-send-rate, transaction-commit-rate. Track consumer metrics: records-lag-max, last-stable-offset. Monitor coordinator metrics: txn-commit-rate, txn-abort-rate.

  8. Testing: Test exactly-once semantics by writing a consumer that deduplicates by message ID. Verify that reprocessing the same data produces identical results. Use chaos testing to simulate producer restarts and network partitions.

  9. Migration: When enabling idempotence on existing topics, ensure all producers are updated simultaneously. Test with a subset of partitions before full rollout. Monitor for fencing events during migration.

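  10. Kafka Streams EOS: Use processing.guarantee=exactly_once_v2 for new applications. EOS still adds overhead relative to at-least-once processing, but v2 uses one transactional producer per stream thread instead of one per task, which this lesson's figures put at roughly 30% lower latency than v1. Ensure state.dir is on fast storage (SSD) for optimal performance.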
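
For the testing practice in item 8, a verification consumer can collect the message IDs it sees and compare them with what was produced. The helpers below are a minimal sketch of that check; the function names and sample IDs are hypothetical, and collecting the IDs from Kafka is left out.

```python
from collections import Counter


def find_duplicates(seen_ids):
    """Message IDs that were observed more than once."""
    counts = Counter(seen_ids)
    return sorted(mid for mid, n in counts.items() if n > 1)


def find_missing(expected_ids, seen_ids):
    """Message IDs that were produced but never observed."""
    return sorted(set(expected_ids) - set(seen_ids))


expected = ["m1", "m2", "m3", "m4"]
seen_ok = ["m1", "m2", "m3", "m4"]
seen_bad = ["m1", "m2", "m2", "m4"]   # m2 delivered twice, m3 lost

clean_run = (find_duplicates(seen_ok), find_missing(expected, seen_ok))
faulty_run = (find_duplicates(seen_bad), find_missing(expected, seen_bad))
```

An exactly-once pipeline should produce two empty lists even when producers are restarted or the network is partitioned during the test run.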

Key Takeaways:

  • EOS = idempotent producers (per-partition dedup) + transactional producers (atomic multi-partition writes) + read_committed consumers
  • Idempotent deduplication: broker appends a message only if seq = lastSeq(PID) + 1; the dedup window is bounded by max.in.flight.requests.per.connection
  • Transaction coordinator uses two-phase commit: record the decision in __transaction_state, then write TxnMarkers to all involved partitions (including __consumer_offsets)
  • Producer fencing via epoch increment prevents split-brain; a restart with the same transactional.id bumps the epoch and invalidates the old producer
  • End-to-end EOS requires idempotent sink operations at the destination
  • EOS v2 (processing.guarantee=exactly_once_v2) reduces overhead by ~30% vs v1

See also: Kafka Architecture (kafka/01) | Producer & Consumer (kafka/02) | Kafka Streams & Connect (kafka/03) | Schema Registry (kafka/05)
