
Exactly-Once Semantics


Difficulty: Senior Level | Companies: LinkedIn, Uber, Netflix, Spotify, Confluent

Content

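Exactly-once semantics (EOS) ensures that the effect of processing each message is applied exactly once, even when producers retry or processes crash and restart. In Kafka, the guarantee covers read-process-write flows that stay inside Kafka, where the output records and the consumer offsets are committed in one transaction; side effects in external systems still need their own idempotence. This is critical for financial transactions and data integrity.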

Delivery Semantics Comparison

At-Most-Once:
- Message may be lost
- No retries
- Fast but unreliable

At-Least-Once:
- Message may be duplicated
- Retries on failure
- Requires idempotent consumers

Exactly-Once:
- Each message processed once
- Combines idempotent producers + transactions
- Requires careful configuration
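
The difference between the first two modes, and why at-least-once needs an idempotent consumer, can be reproduced with a toy in-memory model. This is only a sketch, not Kafka code: `FlakyChannel`, `send_at_most_once`, `send_at_least_once`, and `dedupe` are hypothetical names, and the failures are scripted rather than random.

```python
class FlakyChannel:
    """Toy network link whose behavior per attempt is scripted, not random."""

    def __init__(self, outcomes):
        self.outcomes = list(outcomes)   # e.g. ["ack_lost", "ok"]
        self.delivered = []              # what the receiver actually got

    def send(self, msg):
        """Return True if the sender sees an acknowledgement."""
        outcome = self.outcomes.pop(0) if self.outcomes else "ok"
        if outcome == "lost":            # message never arrives
            return False
        self.delivered.append(msg)
        return outcome != "ack_lost"     # arrived, but the ack may be lost


def send_at_most_once(channel, msg):
    channel.send(msg)                    # fire and forget, never retry


def send_at_least_once(channel, msg, max_attempts=5):
    for _ in range(max_attempts):
        if channel.send(msg):
            return True
    return False


def dedupe(messages):
    """Idempotent consumer: apply each message ID only once, in arrival order."""
    seen, applied = set(), []
    for msg in messages:
        if msg not in seen:
            seen.add(msg)
            applied.append(msg)
    return applied


lossy = FlakyChannel(["lost"])
send_at_most_once(lossy, "m1")
print(lossy.delivered)                   # [] -> message lost

dup = FlakyChannel(["ack_lost", "ok"])
send_at_least_once(dup, "m1")
print(dup.delivered)                     # ['m1', 'm1'] -> duplicate
print(dedupe(dup.delivered))             # ['m1'] -> effect applied once
```

The duplicate appears because the first attempt was delivered but its acknowledgement was lost, which is the case idempotent producers are designed to handle on the broker side.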

Idempotent Producers

Prevent duplicate messages from producer retries:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

// Enable idempotent producer
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Settings the idempotent producer relies on
props.put("enable.idempotence", true);
props.put("acks", "all");                                // Required when idempotence is on
props.put("retries", Integer.MAX_VALUE);
props.put("max.in.flight.requests.per.connection", 5);  // Must be 5 or lower with idempotence

Producer<String, String> producer = new KafkaProducer<>(props);

// The broker assigns the producer a PID; records carry a per-partition sequence number
// Broker deduplicates based on PID and sequence

Idempotence Internals

Producer sends message:
1. Producer has unique PID (Producer ID), assigned by the broker when the producer starts
2. Each message has a sequence number per partition (0, 1, 2, ...)
3. Broker tracks: (PID, Topic, Partition) → Last Sequence

Duplicate Detection:
- Sequence N arrives
- If Last Sequence == N-1: accept, update Last Sequence to N
- If Last Sequence >= N: duplicate, acknowledged without being appended again
- If Last Sequence < N-1: reject (out of order)
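
The duplicate-detection rules above can be sketched as a small state machine. This is a toy model under simplifying assumptions (one sequence number per record, no producer epochs, no batching), and `PartitionSequenceTracker` is a hypothetical name rather than anything in the Kafka codebase.

```python
class PartitionSequenceTracker:
    """Toy model of a broker's per-partition sequence check (not Kafka code)."""

    def __init__(self):
        self.last_sequence = {}          # (pid, topic, partition) -> last accepted sequence
        self.log = []                    # accepted records, in order

    def append(self, pid, topic, partition, sequence, value):
        key = (pid, topic, partition)
        last = self.last_sequence.get(key, -1)   # -1 means nothing accepted yet
        if sequence == last + 1:
            self.last_sequence[key] = sequence
            self.log.append(value)
            return "accepted"
        if sequence <= last:
            return "duplicate"           # already written, do not append again
        return "out_of_order"            # gap: an earlier record is missing


broker = PartitionSequenceTracker()
results = [
    broker.append(7, "orders", 0, 0, "a"),
    broker.append(7, "orders", 0, 1, "b"),
    broker.append(7, "orders", 0, 1, "b"),   # retry of sequence 1
    broker.append(7, "orders", 0, 3, "d"),   # sequence 2 never arrived
]
print(results)      # ['accepted', 'accepted', 'duplicate', 'out_of_order']
print(broker.log)   # ['a', 'b']
```

Because the state is keyed by PID, a producer that restarts without a `transactional.id` gets a new PID and starts again at sequence 0, so this check alone cannot catch duplicates across restarts.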

⚠️

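Important: Idempotent producers only prevent duplicates caused by producer retries, within a single producer session and per partition. They don't cover duplicates from a restarted producer, consumer reprocessing, or application-level failures. For full exactly-once, use transactions.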

Transactional Producers

Enable atomic writes across multiple partitions:

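import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

// Transactional producer setup
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("transactional.id", "order-processor-tx-1");  // Unique per instance, stable across restarts
props.put("enable.idempotence", true);                   // Already implied by transactional.id
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();  // Registers the transactional.id and fences older instances

try {
    producer.beginTransaction();
    
    // Write to multiple partitions atomically
    // (orderData, inventoryUpdate and paymentRecord are String payloads built elsewhere)
    producer.send(new ProducerRecord<>("orders", "order-123", orderData));
    producer.send(new ProducerRecord<>("inventory", "item-456", inventoryUpdate));
    producer.send(new ProducerRecord<>("payments", "pay-789", paymentRecord));
    
    // Commit consumer offsets as part of transaction
    // (offsets: Map<TopicPartition, OffsetAndMetadata> with the next offset to read;
    //  consumer: the KafkaConsumer that read the input records)
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal for this producer instance: it cannot abort, so close it
    producer.close();
    throw e;
} catch (KafkaException e) {
    // Recoverable: abort so none of the writes become visible to read_committed consumers
    producer.abortTransaction();
    throw e;
}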

Consumer Configuration

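// Consumer must read committed messages
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("group.id", "order-processor");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", false);  // Offsets are committed through the producer's transaction

// Read only committed messages
props.put("isolation.level", "read_committed");

// Alternative: read uncommitted (default). Left commented out, because setting it
// after the line above would override read_committed.
// props.put("isolation.level", "read_uncommitted");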

Isolation Levels

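read_uncommitted (default):
- Sees all messages, including those from open or aborted transactions
- Lower latency
- May process data that is later aborted

read_committed:
- Sees only committed transactional messages, plus non-transactional messages
- Higher latency (reads stop at the first open transaction until it commits or aborts)
- Never returns messages from aborted transactions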
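
To make the two isolation levels concrete, here is a toy model of a single partition log. It is a simplified sketch, not how the consumer is implemented: the `read` function and the tuple layout are hypothetical, and each transaction ID is assumed to appear only once in the log.

```python
def read(log, isolation_level):
    """Return the values a consumer would see in a toy partition log.

    Each entry is (kind, txn, value): kind is "data", "commit", or "abort";
    txn is None for non-transactional records.
    """
    data = [(i, txn, value) for i, (kind, txn, value) in enumerate(log) if kind == "data"]
    if isolation_level == "read_uncommitted":
        return [value for _, _, value in data]

    committed = {txn for kind, txn, _ in log if kind == "commit"}
    aborted = {txn for kind, txn, _ in log if kind == "abort"}
    open_txns = {txn for _, txn, _ in data if txn is not None} - committed - aborted
    # Last stable offset: position of the first record of any still-open transaction.
    lso = min((i for i, txn, _ in data if txn in open_txns), default=len(log))
    return [value for i, txn, value in data
            if i < lso and (txn is None or txn in committed)]


log = [
    ("data", "t1", "a"),     # offset 0
    ("data", None, "b"),     # offset 1, non-transactional
    ("commit", "t1", None),  # offset 2
    ("data", "t2", "c"),     # offset 3
    ("abort", "t2", None),   # offset 4
    ("data", "t3", "d"),     # offset 5, t3 is still open
    ("data", None, "e"),     # offset 6, behind the open transaction
]

print(read(log, "read_uncommitted"))   # ['a', 'b', 'c', 'd', 'e']
print(read(log, "read_committed"))     # ['a', 'b']
```

Record `e` is not transactional, yet a `read_committed` reader does not see it until `t3` finishes. That is the latency cost of this isolation level.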

End-to-End Exactly-Once Pattern

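// Complete exactly-once processing pattern
// (createConsumer, createTransactionalProducer, transform and OutputMessage are defined elsewhere)
public class ExactlyOnceProcessor {
    
    public void process() {
        KafkaConsumer<String, String> consumer = createConsumer();
        KafkaProducer<String, String> producer = createTransactionalProducer();
        
        producer.initTransactions();
        
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            
            if (records.isEmpty()) continue;
            
            try {
                producer.beginTransaction();
                
                // Process and produce
                for (ConsumerRecord<String, String> record : records) {
                    // Transform message
                    OutputMessage output = transform(record.value());
                    
                    // Produce to output topic
                    producer.send(new ProducerRecord<>(
                        "output-topic",
                        record.key(),
                        output.toString()
                    ));
                }
                
                // Commit consumer offsets within transaction
                Map<TopicPartition, OffsetAndMetadata> offsets = getOffsets(records);
                producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                
                producer.commitTransaction();
                
            } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                // Fatal for this producer instance: close it and stop
                producer.close();
                throw e;
            } catch (Exception e) {
                producer.abortTransaction();
                // Rewind so the aborted batch is polled again instead of being skipped
                resetToLastCommitted(consumer);
            }
        }
    }
    
    // Next offset to read for each partition in the batch (last processed offset + 1)
    private Map<TopicPartition, OffsetAndMetadata> getOffsets(ConsumerRecords<String, String> records) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (TopicPartition tp : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            offsets.put(tp, new OffsetAndMetadata(lastOffset + 1));
        }
        return offsets;
    }
    
    // Move each assigned partition back to its last committed offset
    private void resetToLastCommitted(KafkaConsumer<String, String> consumer) {
        Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
        for (TopicPartition tp : consumer.assignment()) {
            OffsetAndMetadata offset = committed.get(tp);
            if (offset != null) consumer.seek(tp, offset.offset());
            else consumer.seekToBeginning(Collections.singleton(tp));
        }
    }
}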
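
Why the offsets have to be committed inside the transaction is easiest to see with a crash between the two steps. The following is a toy simulation, not Kafka code: `ToyStore` and `run` are hypothetical names, the "output topic" is a list, and a crash is modeled as an early return.

```python
class ToyStore:
    """In-memory stand-in for an output topic plus a committed consumer offset."""

    def __init__(self):
        self.output = []
        self.committed_offset = 0


def run(store, inputs, transactional, crash_after_produce=False):
    """Process inputs from the committed offset; optionally crash before the offset commit."""
    start = store.committed_offset
    produced = [value.upper() for value in inputs[start:]]
    if transactional:
        if crash_after_produce:
            return                        # staged writes are aborted, nothing becomes visible
        store.output.extend(produced)     # outputs and offset become visible together
        store.committed_offset = len(inputs)
    else:
        store.output.extend(produced)     # outputs are visible immediately
        if crash_after_produce:
            return                        # offset commit never happens
        store.committed_offset = len(inputs)


inputs = ["a", "b"]

plain = ToyStore()
run(plain, inputs, transactional=False, crash_after_produce=True)
run(plain, inputs, transactional=False)   # restart reprocesses from offset 0
print(plain.output)                       # ['A', 'B', 'A', 'B']

atomic = ToyStore()
run(atomic, inputs, transactional=True, crash_after_produce=True)
run(atomic, inputs, transactional=True)
print(atomic.output)                      # ['A', 'B']
```

In both runs the input is processed twice. What changes is that the transactional variant never exposes the first attempt's output, so downstream `read_committed` consumers see each result once.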

Python Exactly-Once Pattern

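from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
from kafka.structs import OffsetAndMetadata
import json

# Assumes a kafka-python release with transaction support (2.1 or later);
# check the transactional method signatures against the version you install.
GROUP_ID = 'order-processor'

class ExactlyOnceProcessor:
    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers=['kafka1:9092'],
            transactional_id='order-processor-tx-1',
            # Keys may be None, so only encode when a key is present
            key_serializer=lambda k: k.encode('utf-8') if k is not None else None,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        
        self.consumer = KafkaConsumer(
            'input-topic',
            bootstrap_servers=['kafka1:9092'],
            group_id=GROUP_ID,
            enable_auto_commit=False,
            isolation_level='read_committed',
            auto_offset_reset='earliest',
            # Decode bytes to str so keys can be re-encoded and values embedded in JSON
            key_deserializer=lambda k: k.decode('utf-8') if k is not None else None,
            value_deserializer=lambda v: v.decode('utf-8') if v is not None else None
        )
        
        self.producer.init_transactions()
    
    def process(self):
        while True:
            records = self.consumer.poll(timeout_ms=1000)
            
            if not records:
                continue
            
            try:
                self.producer.begin_transaction()
                
                for tp, messages in records.items():
                    for message in messages:
                        output = self.transform(message.value)
                        
                        self.producer.send(
                            'output-topic',
                            key=message.key,
                            value=output
                        )
                
                # Commit offsets within transaction
                # (position() is the next offset to read for each partition)
                offsets = {
                    tp: OffsetAndMetadata(self.consumer.position(tp), '', -1)
                    for tp in records.keys()
                }
                self.producer.send_offsets_to_transaction(offsets, GROUP_ID)
                
                self.producer.commit_transaction()
                
            except KafkaError as e:
                self.producer.abort_transaction()
                # The consumer position has already moved past this batch; seek back to
                # the last committed offsets here if the batch must be retried.
                print(f"Transaction failed: {e}")
    
    def transform(self, message):
        # Your transformation logic
        return {'processed': True, 'data': message}

# Run processor
if __name__ == '__main__':
    processor = ExactlyOnceProcessor()
    processor.process()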

Common Pitfalls

1. Transaction Timeouts

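// Default transaction timeout: 60 seconds
// (cannot exceed the broker's transaction.max.timeout.ms, 15 minutes by default)
props.put("transaction.timeout.ms", "60000");

// If a transaction stays open longer than the timeout:
// - The transaction coordinator aborts it and fences the producer
// - The next send or commit on that producer fails
// - The batch is reprocessed, because its offsets were never committed
// - Keep batches small enough to finish well inside the timeout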
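
A rough way to check a configuration against the timeout is to compare the worst-case time for one polled batch with a fraction of `transaction.timeout.ms`. The helper below is only a back-of-the-envelope sketch: `fits_in_transaction` and `safety_factor` are hypothetical names, and the per-record cost is something you would have to measure yourself.

```python
def fits_in_transaction(max_poll_records, worst_case_ms_per_record,
                        transaction_timeout_ms=60_000, safety_factor=0.5):
    """Return True if a full batch should finish well inside the transaction timeout."""
    worst_case_batch_ms = max_poll_records * worst_case_ms_per_record
    return worst_case_batch_ms <= transaction_timeout_ms * safety_factor


# 500 records per poll is the consumer's default max.poll.records.
print(fits_in_transaction(500, 20))    # True: 10,000 ms of work, 30,000 ms budget
print(fits_in_transaction(500, 100))   # False: 50,000 ms of work, 30,000 ms budget
```

When the check fails, the usual options are lowering `max.poll.records` so each transaction covers less work, or raising `transaction.timeout.ms` within the broker's limit.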

2. Consumer Rebalances During Transaction

Problem:
1. Consumer starts transaction
2. Rebalance occurs
3. Transaction aborted
4. Consumer resumes, reprocesses

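Solution:
- Use static group membership (group.instance.id) so short restarts do not trigger a rebalance
- Commit offsets only through sendOffsetsToTransaction, passing consumer.groupMetadata()
- Handle revoked partitions in ConsumerRebalanceListener (for example, abort the open transaction)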
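
Static membership is a matter of configuration. The sketch below builds a per-instance consumer config using the standard Kafka property names `group.instance.id` and `session.timeout.ms`; the helper name `static_member_config`, the instance naming scheme, and the 60-second session timeout are assumptions chosen for illustration.

```python
def static_member_config(base_config, instance_name, session_timeout_ms=60_000):
    """Return a copy of base_config with static-membership settings added."""
    config = dict(base_config)
    # Must be unique per consumer instance and stay the same across restarts.
    config["group.instance.id"] = instance_name
    # A static member that comes back within this window keeps its partitions.
    config["session.timeout.ms"] = session_timeout_ms
    return config


base = {"bootstrap.servers": "kafka1:9092", "group.id": "order-processor"}
print(static_member_config(base, "order-processor-0"))
```

A natural source for the instance name is something already stable in the deployment, such as a StatefulSet pod name or a host name.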

3. Idempotent Producer + Transaction Interaction

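// Transactional producer automatically enables idempotence
// Setting enable.idempotence=true as well is redundant but harmless;
// setting it to false together with transactional.id fails with a ConfigException
props.put("transactional.id", "my-tx-id");  // Implies idempotence
// props.put("enable.idempotence", true);  // Not needed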

Follow-Up Questions

  1. What is the difference between idempotent producers and transactional producers?
  2. How does isolation.level=read_committed affect consumer behavior?
  3. Explain what happens when a transaction times out during processing.
  4. How would you handle consumer rebalances within a transactional consumer?
  5. What are the limitations of exactly-once semantics in Kafka?
