Exactly-Once with Kafka Streams
Difficulty: Senior Level | Companies: LinkedIn, Uber, Netflix, Spotify, Confluent
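Exactly-once semantics (EOS) in Kafka Streams ensures that the results of processing each input record (output records, state store updates, and consumer offsets) take effect exactly once, even across failures. A record may be processed more than once internally, but the writes of a failed attempt are aborted and never become visible to read_committed consumers. This is achieved by writing output and committing consumer offsets inside a single producer transaction. The guarantee covers reads and writes within Kafka only; side effects in external systems are not covered.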
Exactly-Once Architecture
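Processing Guarantee Levels (processing.guarantee; Kafka Streams has no at-most-once mode):
├── at_least_once (default)
│   └── May duplicate records on failure
├── exactly_once (original EOS, deprecated since 3.0)
│   └── One transactional producer per task
└── exactly_once_v2 (Kafka Streams 3.0+, called exactly_once_beta in 2.6 through 2.8; requires brokers 2.5+)
    └── Each record's results committed exactly once, with one producer per stream thread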
How it works:
1. Consumer reads records
2. Producer writes results to output topics (and state changes to changelog topics)
3. Consumer offsets are committed through the producer
4. Steps 2 and 3 happen in a single atomic transaction
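To see why steps 2 and 3 must be atomic, here is a toy in-memory model in plain Python (no Kafka client; every name is hypothetical). It copies input records to an output list and simulates one crash. In the non-atomic mode, output is published before the offset is committed, as under at-least-once; in the atomic mode, both are staged and published together, which is the effect a Kafka transaction provides.

```python
class SimulatedCrash(Exception):
    """Raised to simulate the process dying mid-record."""


def run_task(inputs, atomic, crash_on=None):
    """Copy inputs to an output log, crashing once while handling `crash_on`.

    Returns the output log as a downstream reader would see it."""
    output = []      # the "output topic"
    committed = 0    # committed consumer offset = index of next record to read
    crashed = False
    while committed < len(inputs):
        record = inputs[committed]
        try:
            if atomic:
                staged = record.upper()           # written, but not yet visible
                if record == crash_on and not crashed:
                    crashed = True
                    raise SimulatedCrash          # staged write is discarded
                output.append(staged)             # output and offset become
                committed += 1                    # visible together
            else:
                output.append(record.upper())     # output visible immediately
                if record == crash_on and not crashed:
                    crashed = True
                    raise SimulatedCrash          # offset commit never happens
                committed += 1
        except SimulatedCrash:
            pass  # restart: resume from the last committed offset
    return output


print(run_task(["a", "b", "c"], atomic=False, crash_on="b"))  # ['A', 'B', 'B', 'C']
print(run_task(["a", "b", "c"], atomic=True, crash_on="b"))   # ['A', 'B', 'C']
```

In real Kafka the "publish together" step is the commit marker written by the transaction coordinator; the single-threaded append here only stands in for it.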
Transaction Flow
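1. Stream thread begins a transaction
2. Consumer polls records
3. Processors transform records (and update state stores)
4. Producer sends results to output topics and state changes to changelog topics
5. Consumer offsets are added to the transaction (sendOffsetsToTransaction)
6. Transaction commits atomically, by default every 100 ms under EOS (commit.interval.ms)
7. All-or-nothing: on failure the transaction aborts and none of its writes or offsets take effect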
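The lifecycle above can be modeled as a small state machine. The class below is a hypothetical in-memory sketch, not the Kafka producer API (whose real methods are beginTransaction, send, sendOffsetsToTransaction, commitTransaction and abortTransaction); it shows that records and offsets only take effect on commit, and that operations outside an open transaction are rejected.

```python
from enum import Enum, auto


class TxnState(Enum):
    READY = auto()
    IN_TRANSACTION = auto()


class ToyTransactionalProducer:
    """Hypothetical model of the transaction flow; not the Kafka client API."""

    def __init__(self):
        self.state = TxnState.READY
        self.pending_records = []     # sent during the open transaction
        self.pending_offsets = {}     # offsets added to the open transaction
        self.visible_records = []     # what read_committed readers can see
        self.committed_offsets = {}   # where the consumer group resumes

    def _require(self, expected):
        if self.state is not expected:
            raise RuntimeError(f"expected {expected.name}, in {self.state.name}")

    def begin(self):
        self._require(TxnState.READY)
        self.state = TxnState.IN_TRANSACTION

    def send(self, record):
        self._require(TxnState.IN_TRANSACTION)
        self.pending_records.append(record)

    def send_offsets(self, offsets):
        self._require(TxnState.IN_TRANSACTION)
        self.pending_offsets.update(offsets)

    def commit(self):
        self._require(TxnState.IN_TRANSACTION)
        # All-or-nothing: records and offsets take effect together
        self.visible_records.extend(self.pending_records)
        self.committed_offsets.update(self.pending_offsets)
        self._finish()

    def abort(self):
        self._require(TxnState.IN_TRANSACTION)
        self._finish()  # discard records and offsets alike

    def _finish(self):
        self.pending_records, self.pending_offsets = [], {}
        self.state = TxnState.READY


p = ToyTransactionalProducer()
p.begin()
p.send("order-1")
p.send_offsets({("input-orders", 0): 1})
p.abort()
print(p.visible_records, p.committed_offsets)  # [] {}
```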
Java Kafka Streams Exactly-Once
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

public class ExactlyOnceStreamProcessor {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "exactly-once-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

        // Critical: enable exactly-once processing (requires brokers 2.5+)
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

        // Producer transaction timeout; must not exceed the broker's
        // transaction.max.timeout.ms (default 15 minutes)
        props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 60000);

        StreamsBuilder builder = new StreamsBuilder();

        // Source stream
        KStream<String, String> orders = builder.stream("input-orders");

        // Stateless processing; results are written inside the same transaction
        KStream<String, String> processed = orders
            .filter((key, value) -> value != null)
            .mapValues(value -> enrichOrder(value))
            // Side effects such as this println are not covered by EOS and may repeat on retry
            .peek((key, value) -> System.out.println("Processed: " + key));

        // Sink to output topic
        processed.to("output-orders");

        // Optional: stateful count; its changelog writes join the same transaction
        KTable<String, Long> orderCounts = orders
            .groupByKey()
            .count();

        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Graceful shutdown: register the hook before starting
        Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close(Duration.ofSeconds(30))));
        streams.start();
    }

    private static String enrichOrder(String order) {
        // Transformation logic
        return order.toUpperCase();
    }
}
Python Equivalent (Faust)
import faust

# Faust app with exactly-once processing
app = faust.App(
    'exactly-once-processor',
    broker='kafka://kafka1:9092,kafka2:9092',
    processing_guarantee='exactly_once',
    broker_max_poll_interval=300,
    topic_partitions=4,
)

# Topics
input_topic = app.topic('input-orders', value_type=str)
output_topic = app.topic('output-orders', value_type=str)

# State store (table), backed by a changelog topic
order_counts = app.Table(
    'order-counts',
    default=int,
    partitions=4,
)

@app.agent(input_topic)
async def process_orders(stream):
    """Process orders with exactly-once semantics."""
    async for order in stream:
        try:
            # Transform and send to the output topic
            processed = transform_order(order)
            await output_topic.send(value=processed)
            # Update state store. Faust tables are sharded by key, so a real app
            # should repartition the stream by user ID (stream.group_by) first.
            user_id = extract_user_id(order)
            order_counts[user_id] += 1
            # Output, table changelog, and offset are committed together
        except Exception as e:
            # Catching here means the record still counts as processed and its
            # offset is committed; it is NOT retried. Re-raise to fail instead.
            print(f"Error processing order: {e}")

def transform_order(order):
    """Transform order data."""
    return f"processed:{order}"

def extract_user_id(order):
    """Extract user ID from order (placeholder: replace with real parsing)."""
    return "user-123"

if __name__ == '__main__':
    app.main()
Transaction Management
Producer Transactions
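// Understanding transaction internals with a plain (non-Streams) producer
import java.util.*;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.*;
import org.apache.kafka.common.serialization.StringSerializer;
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("key.serializer", StringSerializer.class);
props.put("value.serializer", StringSerializer.class);
props.put("transactional.id", "streams-processor-1"); // Stable across restarts, unique among running instances
props.put("enable.idempotence", true);
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
Producer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions(); // Fences any older producer with the same transactional.id
try {
    producer.beginTransaction();
    // Send records
    producer.send(new ProducerRecord<>("output", "key", "value"));
    // Commit consumer offsets within the transaction; an offset names the NEXT
    // record to read, so 100 means offsets 0-99 of input-0 are done
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    offsets.put(new TopicPartition("input", 0), new OffsetAndMetadata(100));
    // In a real read-process-write loop, pass consumer.groupMetadata() instead
    producer.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata("consumer-group"));
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal: the producer cannot continue (e.g., fenced by a newer instance); close it
    producer.close();
} catch (KafkaException e) {
    // Recoverable: abort so none of the writes or offsets take effect, then retry
    producer.abortTransaction();
}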
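Why must transactional.id be stable across restarts? When a new producer calls initTransactions(), the coordinator bumps a per-id epoch, aborts any transaction the previous instance left open, and rejects later requests carrying the old epoch. The sketch below is a toy model of that bookkeeping under simplifying assumptions (a single coordinator, no timeouts); ProducerFenced only mirrors the name of the client's real ProducerFencedException.

```python
class ProducerFenced(Exception):
    """Toy stand-in for the client's ProducerFencedException."""


class ToyCoordinator:
    """Hypothetical per-transactional.id epoch bookkeeping (no timeouts, one coordinator)."""

    def __init__(self):
        self.epochs = {}      # transactional.id -> current epoch
        self.open_txns = {}   # transactional.id -> records in the open transaction
        self.aborted = []     # records discarded when a new instance took over

    def init_transactions(self, transactional_id):
        # Abort whatever the previous instance left open, then bump the epoch
        self.aborted.extend(self.open_txns.pop(transactional_id, []))
        self.epochs[transactional_id] = self.epochs.get(transactional_id, -1) + 1
        return self.epochs[transactional_id]

    def send(self, transactional_id, epoch, record):
        if epoch != self.epochs.get(transactional_id):
            raise ProducerFenced(f"{transactional_id}: epoch {epoch} is stale")
        self.open_txns.setdefault(transactional_id, []).append(record)


coordinator = ToyCoordinator()
zombie = coordinator.init_transactions("streams-processor-1")
coordinator.send("streams-processor-1", zombie, "r1")      # then the instance hangs
restarted = coordinator.init_transactions("streams-processor-1")
print(coordinator.aborted)                                  # ['r1']
try:
    coordinator.send("streams-processor-1", zombie, "r2")   # the zombie wakes up
except ProducerFenced as e:
    print("rejected:", e)
```

If each restart used a fresh random transactional.id, the old instance would never be fenced this way, which is why the ID has to be stable.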
Consumer Offset Management
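// Consumer side of a read-process-write loop
import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("key.deserializer", StringDeserializer.class);
props.put("value.deserializer", StringDeserializer.class);
props.put("group.id", "exactly-once-consumer");
props.put("enable.auto.commit", false);          // Offsets are committed via the producer's transaction
props.put("isolation.level", "read_committed");  // Skip records from aborted or still-open transactions
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("input-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record); // Application-defined processing method
    }
    // Don't call consumer.commitSync() here: pass the offsets and
    // consumer.groupMetadata() to producer.sendOffsetsToTransaction(...)
}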
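isolation.level=read_committed changes what poll() returns, not how offsets are committed. A rough model, under the assumptions stated in the docstring: the consumer delivers records in offset order, drops records from aborted transactions, and stops at the first record of a still-open transaction (the last stable offset). The log format is hypothetical; real partitions store commit and abort markers as control records.

```python
def poll_all(log, isolation_level):
    """Return the values a consumer would eventually receive from `log`.

    Hypothetical log entries, in offset order:
      ("data", txn_id, value)  with txn_id None for non-transactional writes
      ("commit", txn_id) / ("abort", txn_id)  transaction markers
    """
    if isolation_level == "read_uncommitted":
        return [entry[2] for entry in log if entry[0] == "data"]
    outcome = {entry[1]: entry[0] for entry in log if entry[0] in ("commit", "abort")}
    visible = []
    for entry in log:
        if entry[0] != "data":
            continue                      # markers are never returned to the app
        txn_id, value = entry[1], entry[2]
        if txn_id is None:
            visible.append(value)
        elif txn_id not in outcome:
            break                         # open transaction: last stable offset reached
        elif outcome[txn_id] == "commit":
            visible.append(value)         # aborted data is silently skipped
    return visible


log = [
    ("data", "t1", "a"), ("data", "t2", "b"),
    ("commit", "t1"), ("abort", "t2"),
    ("data", None, "c"), ("data", "t3", "d"), ("data", None, "e"),
]
print(poll_all(log, "read_uncommitted"))  # ['a', 'b', 'c', 'd', 'e']
print(poll_all(log, "read_committed"))    # ['a', 'c']
```

Note that "e" is held back only because it sits after the open transaction t3; once t3 commits, both "d" and "e" are delivered.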
State Store Exactly-Once
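import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;
// State store updates are atomic with output
// (OrderSummary and orderSummarySerde are application-defined placeholders)
KTable<String, OrderSummary> summary = orders
    .groupByKey()
    .aggregate(
        OrderSummary::new,
        (key, order, agg) -> agg.addOrder(order), // Must return the updated summary
        Materialized.<String, OrderSummary, KeyValueStore<Bytes, byte[]>>as("order-summary-store")
            .withValueSerde(orderSummarySerde)    // The default String serde cannot handle OrderSummary
    );
// The state store is backed by a changelog topic
// Changelog writes are part of the same transaction as output and offsets
// On failure, the store is restored from the changelog (committed records only)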
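Restoring a store from its changelog amounts to replaying the topic and keeping the latest value per key, with a null value (tombstone) deleting the key. A minimal sketch, assuming each changelog entry carries a flag saying whether its transaction committed (in Kafka this is decided by commit/abort markers, and the restore consumer reads committed data only):

```python
def restore_store(changelog):
    """Rebuild a key-value store from hypothetical changelog entries.

    Each entry is (key, value, committed). Uncommitted entries are skipped,
    later entries overwrite earlier ones, and value None deletes the key."""
    store = {}
    for key, value, committed in changelog:
        if not committed:
            continue                  # write from an aborted transaction
        if value is None:
            store.pop(key, None)      # tombstone
        else:
            store[key] = value        # last write wins
    return store


changelog = [
    ("user-1", 1, True), ("user-2", 1, True), ("user-1", 2, True),
    ("user-1", 3, False),             # aborted update
    ("user-2", None, True),           # user-2 deleted
]
print(restore_store(changelog))  # {'user-1': 2}
```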
State Store Recovery
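Failure Scenario:
1. Processor reads 100 records within one transaction
2. Applies 50 of them to the state store (their changelog writes belong to the open transaction)
3. Crashes before commit
Recovery:
1. The open transaction is aborted (at the latest when transaction.timeout.ms expires)
2. Consumer restarts from the last committed offset
3. State store is restored from the changelog topic; the 50 uncommitted updates are not applied
4. All 100 records are re-processed against the restored state
5. Transaction commits: each record affects the store exactly once, without idempotent update logic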
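The scenario above can be replayed in a toy model (plain Python, hypothetical names). It counts records per key inside one "transaction", crashes once after 50 of 100 records, and restarts from the last commit. With roll_back=True the local store is reset to the committed state, as an EOS restore from the changelog would do; with roll_back=False the half-applied updates survive, which is what a non-transactional store could end up with.

```python
from collections import Counter


def count_with_crash(records, crash_at, roll_back=True):
    """Count records per key in one transaction, crashing once at index `crash_at`."""
    committed = Counter()   # state recoverable from the committed changelog
    working = Counter()     # local store, including uncommitted updates
    offset = 0              # committed consumer offset
    crashed = False
    while offset < len(records):
        if roll_back:
            working = Counter(committed)        # restore from changelog
        for i in range(offset, len(records)):
            if i == crash_at and not crashed:
                crashed = True
                break                           # crash before commit
            working[records[i]] += 1
        else:
            committed = Counter(working)        # commit state and offset together
            offset = len(records)
    return dict(committed)


records = ["user-1", "user-2"] * 50             # 100 records
print(count_with_crash(records, crash_at=50))                   # {'user-1': 50, 'user-2': 50}
print(count_with_crash(records, crash_at=50, roll_back=False))  # {'user-1': 75, 'user-2': 75}
```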
Exactly-Once Configuration
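# Application configuration
application.id=exactly-once-app
bootstrap.servers=kafka1:9092,kafka2:9092
# Processing guarantee (requires brokers 2.5+)
processing.guarantee=exactly_once_v2
# commit.interval.ms defaults to 100 under EOS (30000 otherwise)
# Producer settings (configured automatically by Kafka Streams under EOS; shown for reference)
acks=all
enable.idempotence=true
retries=2147483647
# Transaction timeout (must not exceed the broker's transaction.max.timeout.ms)
transaction.timeout.ms=60000
# Consumer settings (also configured automatically under EOS)
isolation.level=read_committed
enable.auto.commit=false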
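A quick sanity check for a properties file like the one above could look like this. It is a hypothetical helper, not a Kafka tool, and it encodes only two rules: processing.guarantee should be exactly_once_v2, and transaction.timeout.ms must not exceed the broker's transaction.max.timeout.ms (assumed here to be at its 15-minute default).

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # or ! comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_eos_config(props, broker_max_timeout_ms=900_000):
    """Return a list of human-readable problems (empty if none were found)."""
    problems = []
    if props.get("processing.guarantee") != "exactly_once_v2":
        problems.append("processing.guarantee should be exactly_once_v2")
    timeout = props.get("transaction.timeout.ms")
    if timeout is not None and int(timeout) > broker_max_timeout_ms:
        problems.append(
            f"transaction.timeout.ms={timeout} exceeds broker limit {broker_max_timeout_ms}")
    return problems


config = """
# Processing guarantee
processing.guarantee=exactly_once_v2
transaction.timeout.ms=60000
"""
print(check_eos_config(parse_properties(config)))  # []
```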
Monitoring Exactly-Once
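import time
from prometheus_client import Counter, Gauge, Histogram

# Metrics
processed_records = Counter(
    'kafka_streams_processed_records_total',
    'Total processed records'
)
transaction_commits = Counter(
    'kafka_streams_transaction_commits_total',
    'Total transaction commits'
)
transaction_aborts = Counter(
    'kafka_streams_transaction_aborts_total',
    'Total transaction aborts'
)
processing_lag = Gauge(
    'kafka_streams_processing_lag',
    'Processing lag in records'
)
# Record per-record timings with processing_latency.observe(seconds)
processing_latency = Histogram(
    'kafka_streams_processing_latency_seconds',
    'Processing latency',
    buckets=[0.001, 0.01, 0.1, 1.0]
)

def monitor_streams_application(app, interval_s=10):
    """Export Kafka Streams statistics to Prometheus.

    `app.state` and its fields are hypothetical placeholders; a Java Kafka
    Streams app exposes equivalent metrics via KafkaStreams#metrics() and JMX.
    """
    last_totals = {'processed': 0, 'commits': 0, 'aborts': 0}
    while True:
        state = app.state
        # Counters only go up: add the change since the last poll, not the
        # running total (max() guards against totals resetting on restart)
        for name, counter, total in (
            ('processed', processed_records, state.processed_count),
            ('commits', transaction_commits, state.commit_count),
            ('aborts', transaction_aborts, state.abort_count),
        ):
            counter.inc(max(0, total - last_totals[name]))
            last_totals[name] = total
        # Lag = log end offset minus committed offset
        processing_lag.set(state.end_offset - state.committed_offset)
        time.sleep(interval_s)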
Performance Considerations
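Exactly-Once Overhead:
├── Producer transactions add latency: read_committed consumers see output only after each commit
├── More partitions per transaction mean more commit markers to write
├── State store updates require changelog writes
└── Each commit adds round trips to the transaction coordinator
Optimization Strategies:
├── Batch more records per transaction by raising commit.interval.ms (higher throughput, higher latency)
├── Tune the state store (RocksDB)
├── Keep transaction.timeout.ms no larger than needed: a hung transaction blocks read_committed consumers until it times out
├── Use cooperative rebalancing (the Kafka Streams default since 2.4)
└── Monitor commit latency and abort rate, and tune commit.interval.ms accordingly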
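A back-of-envelope model helps when tuning commit.interval.ms. Assume (hypothetically) that the stream thread pauses for a fixed commit_cost_ms at every commit and otherwise processes throughput_rps records per second; then each transaction covers throughput × interval records and the fraction of time spent committing is cost / (interval + cost). Real commit cost grows with the number of partitions in the transaction, so measure it rather than trusting these sample numbers.

```python
def commit_overhead(throughput_rps, commit_interval_ms, commit_cost_ms):
    """Return (records per transaction, fraction of time spent committing)."""
    records_per_txn = throughput_rps * commit_interval_ms / 1000
    overhead = commit_cost_ms / (commit_interval_ms + commit_cost_ms)
    return records_per_txn, overhead


for interval in (100, 1000):
    records, overhead = commit_overhead(10_000, interval, commit_cost_ms=20)
    print(f"interval={interval} ms: {records:.0f} records/txn, {overhead:.1%} committing")
# interval=100 ms: 1000 records/txn, 16.7% committing
# interval=1000 ms: 10000 records/txn, 2.0% committing
```

The trade-off is that a longer interval also delays when read_committed consumers see each result.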
Common Pitfalls
1. Transaction Timeout Too Short
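// Problem: work between beginTransaction() and commit outlasts the timeout, so the
// coordinator aborts the transaction and the producer's next transactional call fails
props.put("transaction.timeout.ms", "10000"); // 10 seconds
// Solution: increase the timeout (in Kafka Streams, typically via StreamsConfig.producerPrefix);
// it must not exceed the broker's transaction.max.timeout.ms (default 15 minutes)
props.put("transaction.timeout.ms", "300000"); // 5 minutes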
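One way to pick the value is sketched below: a hypothetical rule of thumb that multiplies the worst-case time a transaction stays open by a safety factor and refuses values above the broker's transaction.max.timeout.ms (15 minutes by default). The worst-case duration is an input you would measure, not something Kafka reports under this name.

```python
def choose_transaction_timeout_ms(worst_case_open_ms, safety_factor=2.0,
                                  broker_max_ms=900_000):
    """Suggest transaction.timeout.ms for a measured worst-case transaction duration."""
    suggested = int(worst_case_open_ms * safety_factor)
    if suggested > broker_max_ms:
        raise ValueError(
            f"{suggested} ms exceeds transaction.max.timeout.ms ({broker_max_ms} ms); "
            "shorten transactions or raise the broker limit")
    return suggested


print(choose_transaction_timeout_ms(150_000))  # 300000
```

Erring high is not free: a larger timeout lets a hung transaction block read_committed consumers for longer.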
2. Consumer Rebalances During Transaction
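Problem:
1. Transaction starts
2. Rebalance occurs (e.g., an instance restarts, or exceeds max.poll.interval.ms and is removed from the group)
3. The removed instance's transaction is aborted
4. Its records are reprocessed by the new partition owner (output stays exactly-once, but work is repeated)
Solution:
- Use static group membership (group.instance.id) so restarts within session.timeout.ms do not trigger a rebalance
- Keep processing per poll well under max.poll.interval.ms
- In plain consumer/producer apps, finish (commit or abort) the open transaction in a ConsumerRebalanceListener before partitions are revoked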
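For the second point, a rough sizing rule (an assumption, not an official formula): if each record takes up to per_record_ms to process, cap max.poll.records so that one poll's worth of work, with a safety factor, fits inside max.poll.interval.ms (300000 ms by default for the consumer). This models a plain consumer loop; the Kafka Streams loop is more complex, so treat the result only as a starting point.

```python
import math


def max_safe_poll_records(max_poll_interval_ms, per_record_ms, safety_factor=2.0):
    """Largest max.poll.records whose worst-case batch time, with headroom,
    stays below max.poll.interval.ms (never less than 1)."""
    budget_ms = max_poll_interval_ms / safety_factor
    return max(1, math.floor(budget_ms / per_record_ms))


print(max_safe_poll_records(300_000, per_record_ms=1_000))  # 150
```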
3. State Store Corruption
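Problem: State store inconsistent after crash
Solution:
- Keep changelog topics enabled (the default; avoid withLoggingDisabled() on stores that must survive failures)
- Make updates to external systems idempotent, since EOS only covers Kafka reads and writes
- Rely on Kafka Streams' restore from the changelog, and use standby replicas (num.standby.replicas) to shorten recovery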
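Idempotent updates matter for anything outside Kafka: if a transaction aborts and its records are reprocessed, a database write or HTTP call made during the first attempt is not rolled back. A toy comparison (hypothetical sink, plain Python) of a blind increment versus an upsert keyed by order ID when one event is replayed:

```python
def sink_total(events, idempotent):
    """Total order amount in a toy external store after applying `events`.

    events are (order_id, amount); a replay after failure repeats an event."""
    if idempotent:
        amounts = {}
        for order_id, amount in events:
            amounts[order_id] = amount   # upsert keyed by order ID: replay is harmless
        return sum(amounts.values())
    total = 0
    for _order_id, amount in events:
        total += amount                  # blind increment: replay double-counts
    return total


events = [("order-1", 10), ("order-2", 5), ("order-2", 5)]  # order-2 replayed
print(sink_total(events, idempotent=False))  # 20
print(sink_total(events, idempotent=True))   # 15
```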
Follow-Up Questions
- What is the difference between exactly_once and exactly_once_v2?
- How does Kafka Streams handle transaction timeouts?
- Explain the role of changelog topics in exactly-once processing.
- What happens when a rebalance occurs during an active transaction?
- How would you troubleshoot exactly-once processing failures?