Transactional Producers

Difficulty: Senior Level | Companies: LinkedIn, Uber, Netflix, Spotify, Confluent

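Transactional producers let a single producer write to multiple partitions and topics atomically: either every record in the transaction (including any consumer offsets committed through it) becomes visible to read_committed consumers, or none of them does. Transactions are the foundation of Kafka's exactly-once semantics for read-process-write pipelines.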

Transaction Architecture

Transactional Producer:
┌──────────────────────────────────────────────────┐
│  Producer                                        │
│  ├── transactional.id (configured, stable)       │
│  ├── Producer ID (PID), assigned by coordinator  │
│  └── Epoch (bumped on each initTransactions())   │
└──────────────────────────────────────────────────┘
                    │
                    ▼
┌──────────────────────────────────────────────────┐
│  Transaction Coordinator                         │
│  ├── Manages transaction state                   │
│  ├── Writes to __transaction_state topic         │
│  └── Writes commit/abort markers to partitions   │
└──────────────────────────────────────────────────┘
                    │
                    ▼
┌──────────────────────────────────────────────────┐
│  Partition leaders (brokers)                     │
│  ├── Receive produce requests from the producer  │
│  ├── Track PID, epoch, and sequence numbers      │
│  └── Append records to the log immediately;      │
│      markers control read_committed visibility   │
└──────────────────────────────────────────────────┘
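To make the three identifiers in the producer box concrete, here is a toy, in-memory model of how a coordinator could hand them out: each transactional.id maps to one producer ID, and calling initTransactions() again for the same id keeps that PID but bumps the epoch. `ToyCoordinator`, `ProducerIdentity`, and `init_producer_id` are hypothetical names; a real coordinator persists this mapping in `__transaction_state` and handles cases (PID blocks, epoch exhaustion) that are omitted here.

```python
from dataclasses import dataclass
from itertools import count
from typing import Dict


@dataclass(frozen=True)
class ProducerIdentity:
    producer_id: int  # PID, allocated by the coordinator
    epoch: int        # bumped each time the same transactional.id re-initializes


class ToyCoordinator:
    """Hypothetical in-memory model of InitProducerId handling.

    Real coordinators persist this mapping in __transaction_state, allocate
    PIDs in blocks, and move to a new PID when the epoch is exhausted.
    """

    def __init__(self) -> None:
        self._pids = count(1000)  # arbitrary first PID for the demo
        self._by_txn_id: Dict[str, ProducerIdentity] = {}

    def init_producer_id(self, transactional_id: str) -> ProducerIdentity:
        current = self._by_txn_id.get(transactional_id)
        if current is None:
            # First initTransactions() for this id: new PID at epoch 0
            ident = ProducerIdentity(next(self._pids), 0)
        else:
            # Same id again (e.g. after a restart): same PID, higher epoch
            ident = ProducerIdentity(current.producer_id, current.epoch + 1)
        self._by_txn_id[transactional_id] = ident
        return ident


coordinator = ToyCoordinator()
print(coordinator.init_producer_id("order-processor-tx-1"))  # ProducerIdentity(producer_id=1000, epoch=0)
print(coordinator.init_producer_id("order-processor-tx-1"))  # ProducerIdentity(producer_id=1000, epoch=1)
```

The PID identifies the producer session to brokers, while the epoch is what lets the coordinator tell a restarted instance apart from its predecessor.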

Java Transactional Producer

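import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class TransactionalProducerExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Transactional configuration. Setting transactional.id implies idempotence;
        // the explicit settings below are the values idempotence requires.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-processor-tx-1");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // must be <= 5

        // The coordinator aborts transactions left open longer than this (ms).
        // Must not exceed the broker's transaction.max.timeout.ms.
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Registers the transactional.id, obtains the PID and a bumped epoch
        // (fencing older instances), and finishes or aborts any transaction
        // left behind by a previous instance
        producer.initTransactions();

        try {
            producer.beginTransaction();

            // Produce to multiple topics atomically
            producer.send(new ProducerRecord<>("orders", "order-123",
                "{\"amount\": 99.99}"));
            producer.send(new ProducerRecord<>("inventory", "item-456",
                "{\"stock\": -1}"));
            producer.send(new ProducerRecord<>("payments", "pay-789",
                "{\"status\": \"pending\"}"));

            // Commit consumer offsets as part of the transaction so that
            // input consumption and output production succeed or fail together.
            // In a real consume loop, pass consumer.groupMetadata() instead.
            producer.sendOffsetsToTransaction(
                getConsumerOffsets(),
                new ConsumerGroupMetadata("order-consumer-group")
            );

            producer.commitTransaction();

            System.out.println("Transaction committed successfully");

        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal: this producer instance cannot continue; do not call abortTransaction()
            System.err.println("Fatal producer error: " + e.getMessage());
        } catch (KafkaException e) {
            // Abortable: roll back; the work can be retried in a new transaction
            System.err.println("Transaction failed: " + e.getMessage());
            producer.abortTransaction();
        } finally {
            producer.close();
        }
    }

    private static Map<TopicPartition, OffsetAndMetadata> getConsumerOffsets() {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

        // The committed offset is the NEXT offset to consume (last processed + 1)
        offsets.put(
            new TopicPartition("orders", 0),
            new OffsetAndMetadata(100)
        );

        return offsets;
    }
}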

Python Transactional Producer

from kafka import KafkaProducer
from kafka.errors import KafkaError
import json

class TransactionalProducer:
    def __init__(self, bootstrap_servers, transactional_id):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            transactional_id=transactional_id,
            key_serializer=lambda k: k.encode('utf-8'),
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            acks='all',
            enable_idempotence=True,
            retries=2147483647,
            max_in_flight_requests_per_connection=5
        )
        
        self.producer.init_transactions()
    
    def send_transaction(self, records):
        """
        Send multiple records atomically
        records: list of (topic, key, value) tuples
        """
        try:
            self.producer.begin_transaction()
            
            for topic, key, value in records:
                self.producer.send(topic, key=key, value=value)
            
            self.producer.commit_transaction()
            return True
            
        except KafkaError as e:
            print(f"Transaction failed: {e}")
            self.producer.abort_transaction()
            return False
    
    def send_with_offsets(self, records, consumer_group, offsets):
        """
        Send records and commit consumer offsets atomically
        """
        try:
            self.producer.begin_transaction()
            
            # Send records
            for topic, key, value in records:
                self.producer.send(topic, key=key, value=value)
            
            # Commit consumer offsets
            self.producer.send_offsets_to_transaction(offsets, consumer_group)
            
            self.producer.commit_transaction()
            return True
            
        except KafkaError as e:
            print(f"Transaction failed: {e}")
            self.producer.abort_transaction()
            return False
    
    def close(self):
        self.producer.close()

# Usage
producer = TransactionalProducer(
    bootstrap_servers='kafka1:9092',
    transactional_id='order-processor-tx-1'
)

records = [
    ('orders', 'order-123', {'amount': 99.99}),
    ('inventory', 'item-456', {'stock': -1}),
    ('payments', 'pay-789', {'status': 'pending'})
]

success = producer.send_transaction(records)
print(f"Transaction {'committed' if success else 'aborted'}")

Transaction Coordinator Internals

Transaction State Machine (coordinator side):

  Empty ──► Ongoing ──► PrepareCommit ──► CompleteCommit
             │  ▲                               │
             │  └───────────────────────────────┤  next transaction
             ▼                                  │  adds a partition
        PrepareAbort ──► CompleteAbort ─────────┘

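State Transitions:
1. Empty → Ongoing: the first partition (or consumer-offset commit) is added after beginTransaction(); beginTransaction() itself only changes client-side state
2. Ongoing → PrepareCommit: commitTransaction()
3. Ongoing → PrepareAbort: abortTransaction(), or the coordinator aborts after transaction.timeout.ms
4. PrepareCommit → CompleteCommit: commit markers written to every partition in the transaction
5. PrepareAbort → CompleteAbort: abort markers written to every partition in the transaction
6. CompleteCommit/CompleteAbort → Ongoing when the next transaction adds a partition, or → Empty when initTransactions() is called again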
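The coordinator-side transitions can be sketched as a small lookup table that rejects illegal moves. This is a simplified model assuming only the six states shown; the Python names are hypothetical, and Kafka's own `Dead` and `PrepareEpochFence` states are left out.

```python
from enum import Enum, auto


class TxnState(Enum):
    EMPTY = auto()
    ONGOING = auto()
    PREPARE_COMMIT = auto()
    PREPARE_ABORT = auto()
    COMPLETE_COMMIT = auto()
    COMPLETE_ABORT = auto()


# Allowed next states. Self-loops: EMPTY -> EMPTY is a repeated
# initTransactions(); ONGOING -> ONGOING is adding more partitions.
TRANSITIONS = {
    TxnState.EMPTY: {TxnState.EMPTY, TxnState.ONGOING},
    TxnState.ONGOING: {TxnState.ONGOING, TxnState.PREPARE_COMMIT, TxnState.PREPARE_ABORT},
    TxnState.PREPARE_COMMIT: {TxnState.COMPLETE_COMMIT},
    TxnState.PREPARE_ABORT: {TxnState.COMPLETE_ABORT},
    TxnState.COMPLETE_COMMIT: {TxnState.ONGOING, TxnState.EMPTY},
    TxnState.COMPLETE_ABORT: {TxnState.ONGOING, TxnState.EMPTY},
}


class TxnStateMachine:
    """Hypothetical coordinator-side tracker for one transactional.id."""

    def __init__(self) -> None:
        self.state = TxnState.EMPTY

    def transition(self, target: TxnState) -> None:
        if target not in TRANSITIONS[self.state]:
            raise ValueError(f"illegal transition {self.state.name} -> {target.name}")
        self.state = target
```

There is deliberately no edge from PrepareCommit to PrepareAbort: once the commit decision is written to `__transaction_state`, the coordinator has to finish it, even across a coordinator failover.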

Transaction Coordinator Failure

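Scenario: Transaction coordinator fails during a transaction

1. Producer has begun a transaction and sent records; the coordinator has recorded the partitions involved
2. The broker acting as coordinator fails
3. Leadership of the relevant __transaction_state partition moves to another broker, which becomes the new coordinator and reloads transaction state from that partition
4. Producer gets NOT_COORDINATOR / COORDINATOR_NOT_AVAILABLE errors, looks up the new coordinator, and retries
5. A transaction already in PrepareCommit or PrepareAbort is completed by the new coordinator; an Ongoing transaction whose producer never returns is aborted after transaction.timeout.ms
6. Consumers with isolation.level=read_committed never see records from an aborted transaction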
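Which broker acts as coordinator is not arbitrary: as far as we understand Kafka's `TransactionStateManager`, it is the leader of partition `abs(transactionalId.hashCode()) % transaction.state.log.num.partitions` of `__transaction_state`, so a coordinator failover is a leadership change for that one partition. The sketch below reproduces that lookup in Python, including Java's 32-bit `String.hashCode()`; the function names are hypothetical.

```python
def java_string_hash(s: str) -> int:
    """Java's String.hashCode(): 31-based polynomial over UTF-16 code units,
    wrapped to a signed 32-bit int."""
    data = s.encode("utf-16-be")
    h = 0
    for i in range(0, len(data), 2):
        unit = (data[i] << 8) | data[i + 1]
        h = (31 * h + unit) & 0xFFFFFFFF  # emulate 32-bit overflow
    return h - (1 << 32) if h >= (1 << 31) else h  # reinterpret as signed


def kafka_abs(n: int) -> int:
    """Like Kafka's Utils.abs: Integer.MIN_VALUE maps to 0, not a negative number."""
    return 0 if n == -(1 << 31) else abs(n)


def coordinator_partition(transactional_id: str, num_partitions: int = 50) -> int:
    """__transaction_state partition whose leader acts as coordinator.

    50 is the default transaction.state.log.num.partitions.
    """
    return kafka_abs(java_string_hash(transactional_id)) % num_partitions


print(coordinator_partition("ab"))  # 5
```

Because the mapping depends only on the transactional.id and the partition count, every client and broker agrees on which partition (and therefore which broker) is responsible, without any extra coordination.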

Exactly-Once Consumer

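import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

// Consumer must read committed messages
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

// Read only committed messages
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList("orders"));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<String, String> record : records) {
            processRecord(record);  // application-specific handler
        }

        // Commit offsets manually, after processing. On its own this is
        // at-least-once for side effects outside Kafka; exactly-once needs the
        // offsets committed inside a producer transaction (read-process-write)
        consumer.commitSync();
    }
}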

Isolation Levels

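read_uncommitted (default):
- Returns all records, including those from open and aborted transactions
- No extra latency: reads up to the high watermark
- Commit/abort markers themselves are never returned to the application

read_committed:
- Returns only records from committed transactions, plus non-transactional records
- Reads only up to the last stable offset (LSO), so records behind an open transaction wait until it commits or aborts
- Records from aborted transactions are filtered out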
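The difference between the two levels can be modeled on a single partition log that holds data records and commit/abort markers. In this toy version (the `Entry` and `read` names are hypothetical), `read_committed` stops at the last stable offset, the start of the earliest still-open transaction, and drops records whose transaction aborted; real brokers track the LSO and an aborted-transaction index instead of rescanning the log.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Entry:
    offset: int
    pid: Optional[int]            # None = non-transactional record
    value: Optional[str] = None   # payload for data records
    marker: Optional[str] = None  # "COMMIT" or "ABORT" for control records


def read(log: List[Entry], isolation_level: str) -> List[Optional[str]]:
    """Values a consumer would receive from one partition (toy model)."""
    if isolation_level == "read_uncommitted":
        # Everything except control markers, including open and aborted data
        return [e.value for e in log if e.marker is None]

    # Group transactional records into transactions, in log order
    txn_of: Dict[int, int] = {}        # offset -> transaction index
    open_txn: Dict[int, int] = {}      # pid -> index of its open transaction
    starts: List[int] = []             # transaction index -> first offset
    outcome: List[Optional[str]] = []  # transaction index -> marker (None = open)
    for e in log:
        if e.pid is None:
            continue
        if e.marker is None:
            if e.pid not in open_txn:
                open_txn[e.pid] = len(starts)
                starts.append(e.offset)
                outcome.append(None)
            txn_of[e.offset] = open_txn[e.pid]
        elif e.pid in open_txn:
            outcome[open_txn.pop(e.pid)] = e.marker

    # Last stable offset: first offset of the earliest still-open transaction
    end = log[-1].offset + 1 if log else 0
    lso = min((s for s, o in zip(starts, outcome) if o is None), default=end)

    visible = []
    for e in log:
        if e.offset >= lso:
            break
        if e.marker is None and (e.pid is None or outcome[txn_of[e.offset]] == "COMMIT"):
            visible.append(e.value)
    return visible
```

With producer 1 committed, producer 2 aborted, and producer 3 still open at offset 5, `read_uncommitted` returns every data record while `read_committed` returns only offsets 0 and 2. A non-transactional record written after offset 5 stays hidden from `read_committed` until producer 3 finishes, because the LSO blocks everything behind the open transaction.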

Transaction Patterns

1. Read-Process-Write Pattern

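public void readProcessWrite() {
    // createConsumer() is assumed to subscribe to the input topic with
    // enable.auto.commit=false and isolation.level=read_committed
    KafkaConsumer<String, String> consumer = createConsumer();
    KafkaProducer<String, String> producer = createTransactionalProducer();
    
    producer.initTransactions();
    
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        
        if (records.isEmpty()) continue;
        
        try {
            producer.beginTransaction();
            
            for (ConsumerRecord<String, String> record : records) {
                // Process
                OutputMessage output = process(record.value());
                
                // Write
                producer.send(new ProducerRecord<>(
                    "output-topic",
                    record.key(),
                    output.toString()
                ));
            }
            
            // Commit consumer offsets: the NEXT offset to read in each partition
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (TopicPartition tp : records.partitions()) {
                List<ConsumerRecord<String, String>> batch = records.records(tp);
                long lastOffset = batch.get(batch.size() - 1).offset();
                offsets.put(tp, new OffsetAndMetadata(lastOffset + 1));
            }
            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
            
            producer.commitTransaction();
            
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal: e.g. a newer instance with the same transactional.id took over
            producer.close();
            consumer.close();
            return;
        } catch (KafkaException e) {
            producer.abortTransaction();
            // The consumer position has already moved past this batch; rewind to the
            // last committed offsets so the aborted records are processed again
            Map<TopicPartition, OffsetAndMetadata> committed =
                consumer.committed(consumer.assignment());
            for (TopicPartition tp : consumer.assignment()) {
                OffsetAndMetadata om = committed.get(tp);
                if (om != null) {
                    consumer.seek(tp, om.offset());
                } else {
                    consumer.seekToBeginning(Collections.singleton(tp));
                }
            }
        }
    }
}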
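Why committing offsets inside the transaction matters can be shown with a toy simulation that crashes halfway through a batch. Both functions below are hypothetical stand-ins rather than Kafka APIs: `transactional_pass` stages the outputs and the new offset and applies them together (which is what a `read_committed` consumer observes), while `at_least_once_pass` makes each output visible immediately and commits the offset last.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass
class ToyCluster:
    """Hypothetical stand-in for Kafka: one input partition, one output
    topic (as seen by read_committed consumers), and the group's offset."""
    input_log: List[str]
    output_log: List[str] = field(default_factory=list)
    committed_offset: int = 0


class SimulatedCrash(Exception):
    pass


def transactional_pass(cluster: ToyCluster, process: Callable[[str], str],
                       batch_size: int, crash_at: Optional[int] = None) -> None:
    """Stage outputs and the new offset, then apply both at once
    (like sendOffsetsToTransaction + commitTransaction)."""
    start = cluster.committed_offset
    batch = cluster.input_log[start:start + batch_size]
    staged: List[str] = []
    for i, value in enumerate(batch):
        if i == crash_at:
            raise SimulatedCrash  # never committed, so effectively aborted
        staged.append(process(value))
    cluster.output_log.extend(staged)              # outputs become visible...
    cluster.committed_offset = start + len(batch)  # ...together with the offset


def at_least_once_pass(cluster: ToyCluster, process: Callable[[str], str],
                       batch_size: int, crash_at: Optional[int] = None) -> None:
    """Without transactions: outputs visible immediately, offset committed last."""
    start = cluster.committed_offset
    batch = cluster.input_log[start:start + batch_size]
    for i, value in enumerate(batch):
        if i == crash_at:
            raise SimulatedCrash  # outputs written so far stay visible
        cluster.output_log.append(process(value))
    cluster.committed_offset = start + len(batch)
```

Crashing after the first record of a two-record batch and then rerunning to completion leaves the transactional cluster with `["A", "B", "C"]` for input `["a", "b", "c"]` (with `str.upper` as the processing step), while the at-least-once cluster ends with `["A", "A", "B", "C"]`: the record written before the crash is reprocessed and duplicated.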

2. Multi-Topic Write Pattern

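public void multiTopicWrite() {
    // `producer` is a transactional producer on which initTransactions() was already called
    producer.beginTransaction();
    try {
        // Write to multiple topics atomically: read_committed consumers see all or none
        producer.send(new ProducerRecord<>("topic-a", "key", "value"));
        producer.send(new ProducerRecord<>("topic-b", "key", "value"));
        producer.send(new ProducerRecord<>("topic-c", "key", "value"));

        producer.commitTransaction();
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        producer.close();  // fatal: this producer cannot be used again
        throw e;
    } catch (KafkaException e) {
        producer.abortTransaction();  // abortable: safe to retry in a new transaction
        throw e;
    }
}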

3. Transactional Idempotent Writes

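public void transactionalIdempotent() {
    producer.beginTransaction();
    
    for (Record record : records) {  // Record: application-specific type with key/value fields
        // Each batch carries the producer ID, epoch, and a per-partition sequence number.
        // If a send is retried, the partition leader recognizes the duplicate
        // sequence number and does not append the batch a second time.
        producer.send(new ProducerRecord<>("topic", record.key, record.value));
    }
    
    producer.commitTransaction();
}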
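The deduplication described in the comments can be sketched as a partition leader that remembers the last sequence number it accepted for each (producer ID, epoch) pair. This is a simplified, hypothetical model (`ToyPartition` is not a Kafka class): a retried send is acknowledged without being appended, and a gap in sequence numbers is rejected.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


class OutOfOrderSequence(Exception):
    """Stand-in for Kafka's OutOfOrderSequenceException."""


@dataclass
class ToyPartition:
    """Hypothetical partition leader that deduplicates by (PID, epoch, sequence).

    Simplified: Kafka checks sequence ranges per record batch and caches
    metadata for the last five batches per producer; this tracks single records.
    """
    log: List[str] = field(default_factory=list)
    last_seq: Dict[Tuple[int, int], int] = field(default_factory=dict)

    def append(self, pid: int, epoch: int, seq: int, value: str) -> bool:
        last = self.last_seq.get((pid, epoch), -1)
        if seq <= last:
            return False  # retry of something already written: ack, don't append
        if seq != last + 1:
            raise OutOfOrderSequence(f"expected sequence {last + 1}, got {seq}")
        self.log.append(value)
        self.last_seq[(pid, epoch)] = seq
        return True
```

Keying the state by epoch as well as PID means a producer that was re-initialized (higher epoch) starts again at sequence 0 without colliding with its earlier session.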

Monitoring Transactions

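import time

from kafka.errors import KafkaError
from prometheus_client import Counter, Histogram

# Transaction metrics
transaction_commits = Counter(
    'kafka_transaction_commits_total',
    'Total transaction commits',
    ['transactional_id']
)

transaction_aborts = Counter(
    'kafka_transaction_aborts_total',
    'Total transaction aborts',
    ['transactional_id']
)

# A Histogram (rather than a Gauge) keeps the latency distribution,
# so percentiles can be queried
transaction_latency = Histogram(
    'kafka_transaction_latency_seconds',
    'Transaction commit latency',
    ['transactional_id']
)

def timed_commit(producer, transactional_id):
    """Commit the open transaction on `producer` and record the outcome."""
    start = time.monotonic()
    try:
        producer.commit_transaction()
        transaction_commits.labels(transactional_id=transactional_id).inc()
    except KafkaError:
        producer.abort_transaction()
        transaction_aborts.labels(transactional_id=transactional_id).inc()
        raise
    finally:
        transaction_latency.labels(transactional_id=transactional_id).observe(
            time.monotonic() - start)

# Monitor transaction state
def monitor_transactions():
    while True:
        # Poll coordinator-side state here, e.g. via JMX or the
        # Java Admin client's listTransactions()/describeTransactions()
        time.sleep(10)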

Common Pitfalls

1. Transaction Timeout

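// Problem: a transaction stays open longer than transaction.timeout.ms,
// so the coordinator aborts it and the producer's next commit fails
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10000);  // 10s

// Solution: raise the timeout (or put fewer records in each transaction).
// It must not exceed the broker's transaction.max.timeout.ms (default 15 min),
// and a hung transaction can hold back read_committed consumers for this long
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 300000);  // 5min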
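A quick sanity check for the timeout can be expressed as arithmetic over the two limits. The defaults in the sketch (60 s for `transaction.timeout.ms`, 15 min for the broker's `transaction.max.timeout.ms`) are Kafka's documented defaults; the `headroom` factor of 2 and the function name are hypothetical choices for illustration, not Kafka settings.

```python
from typing import List

DEFAULT_TRANSACTION_TIMEOUT_MS = 60_000   # producer transaction.timeout.ms
DEFAULT_BROKER_MAX_TIMEOUT_MS = 900_000   # broker transaction.max.timeout.ms


def check_transaction_timeout(timeout_ms: int, worst_case_txn_ms: int,
                              broker_max_ms: int = DEFAULT_BROKER_MAX_TIMEOUT_MS,
                              headroom: float = 2.0) -> List[str]:
    """Return a list of problems with a proposed transaction.timeout.ms."""
    problems = []
    if timeout_ms > broker_max_ms:
        # The broker rejects the producer's timeout during initTransactions()
        problems.append("transaction.timeout.ms exceeds the broker's transaction.max.timeout.ms")
    if worst_case_txn_ms * headroom > timeout_ms:
        # A slow batch could be aborted by the coordinator mid-transaction
        problems.append("worst-case transaction time leaves too little headroom")
    return problems
```

With a worst-case batch of 8 s, the 10 s timeout from the problem case is flagged for insufficient headroom, the 5 min timeout passes, and anything above 15 min is flagged against the broker default.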

2. Multiple Instances with Same Transactional ID

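Problem: Two producers with same transactional.id
- Only one can be active at a time: each initTransactions() bumps the epoch
- The older instance gets ProducerFencedException on its next transactional call

Solution: Give each logical instance its own transactional.id, and keep it stable
across restarts so that a restarted instance fences its own zombie predecessor
// instanceId: stable per instance (e.g. a pod ordinal), not a random UUID
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
          "processor-" + instanceId);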

3. Transaction State Topic Cleanup

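Problem: Generating a new transactional.id on every restart (e.g. a random UUID) leaves coordinator and producer-ID state behind for every abandoned id

Solution:
- Reuse stable transactional.ids instead of generating new ones
- __transaction_state is compacted by default; entries for idle ids are expired after transactional.id.expiration.ms (default 7 days)
- Monitor topic size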
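Because `__transaction_state` is compacted, only the latest record per transactional.id is retained, and a tombstone written when an id expires removes it entirely. A minimal sketch of those compaction semantics, assuming simplified string values in place of the real transaction metadata (`compact` is a hypothetical helper, not a Kafka API):

```python
from typing import Dict, Iterable, Optional, Tuple


def compact(records: Iterable[Tuple[str, Optional[str]]]) -> Dict[str, str]:
    """Latest value per key survives; a None value (tombstone) removes the key.

    Simplified: real compaction keeps tombstones for delete.retention.ms and
    never cleans the active segment.
    """
    latest: Dict[str, str] = {}
    for key, value in records:
        if value is None:
            latest.pop(key, None)
        else:
            latest[key] = value
    return latest


state_log = [
    ("tx-1", "Ongoing"),
    ("tx-2", "Ongoing"),
    ("tx-1", "CompleteCommit"),
    ("tx-2", None),  # tx-2 expired: tombstone
]
print(compact(state_log))  # {'tx-1': 'CompleteCommit'}
```

The topic therefore stays proportional to the number of live transactional.ids, which is why churning through fresh ids is the pattern to avoid.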

Follow-Up Questions

  1. What is the difference between transactional.id and producer.id?
  2. How does Kafka handle transaction coordinator failures?
  3. Explain the purpose of the epoch in transactional producers.
  4. How would you implement idempotent consumer processing?
  5. What are the limitations of Kafka transactions?
