Kafka Streams: Windowing, Joins, State Stores, EOS
Difficulty: Staff | Asked at: Confluent, LinkedIn, Uber, Netflix
ℹ️ Interview Context
Kafka Streams is a client library for building streaming applications. This question tests your understanding of windowing, joins, state management, and exactly-once semantics in stream processing.
The Question
Explain how Kafka Streams handles windowing, joins, and state management. How does it achieve exactly-once semantics? What are the trade-offs between different state store backends?
Kafka Streams Architecture
┌─────────────────────────────────────────────────────────┐
│                Kafka Streams Application                │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  ┌───────────────────────────────────────────────────┐  │
│  │ Processor Topology                                │  │
│  │                                                   │  │
│  │ Source → Filter → Map → Window → Join → Sink      │  │
│  │   ↓                       ↓       ↓      ↓        │  │
│  │ Topics                  State    State  Topics    │  │
│  │                         Store    Store            │  │
│  └───────────────────────────────────────────────────┘  │
│                                                         │
│  ┌───────────────────────────────────────────────────┐  │
│  │ State Store Backends                              │  │
│  │ - RocksDB (default, local)                        │  │
│  │ - InMemory (small state, testing)                 │  │
│  │ - Custom (Cassandra, Redis)                       │  │
│  └───────────────────────────────────────────────────┘  │
│                                                         │
│  ┌───────────────────────────────────────────────────┐  │
│  │ Changelog Topics                                  │  │
│  │ - Back up state to Kafka                          │  │
│  │ - Enable state recovery                           │  │
│  │ - Compacted topics                                │  │
│  └───────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────┘
Windowing Strategies
Tumbling Windows
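# Python-style pseudocode mirroring the Java Kafka Streams DSL (Kafka Streams has no
# official Python API); Java equivalents are noted in comments where they differ.
from kafka.streams import StreamsBuilder
from kafka.streams.kstream import TimeWindows
# Tumbling windows: fixed-size, non-overlapping
builder = StreamsBuilder()
stream = builder.stream('events')
# Tumbling window of 1 minute
windowed_stream = (
    stream
    .group_by_key()
    # Java: .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
    .windowed_by(TimeWindows.of_size_with_no_grace(60000))  # 60 seconds, no grace period
    .count()
)
# Keys are now windowed (Windowed<K>); configure a windowed key serde when writing them out
windowed_stream.to_stream().to('windowed-counts')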
Tumbling Window Example (60-second windows):
Time: 0 60 120 180 240
|------|------|------|------|
| W1 | W2 | W3 | W4 |
Events:
T=10: event-1 → Window 1
T=30: event-2 → Window 1
T=70: event-3 → Window 2
T=115: event-4 → Window 2
T=130: event-5 → Window 3
Window Counts:
W1: 2 events
W2: 2 events
W3: 1 event
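The window assignment above can be checked with a small pure-Python sketch that has no Kafka dependency. `tumbling_window_start` and `count_per_window` are hypothetical helper names, and windows are assumed to be aligned to timestamp 0, as Kafka Streams aligns tumbling windows to the epoch.

```python
from collections import Counter


def tumbling_window_start(ts: int, size: int) -> int:
    """Start of the tumbling window [start, start + size) that contains ts."""
    return (ts // size) * size


def count_per_window(timestamps, size):
    """Count events per tumbling window, keyed by window start, sorted by start."""
    counts = Counter(tumbling_window_start(t, size) for t in timestamps)
    return dict(sorted(counts.items()))


# Event times (seconds) from the example above, 60-second windows
counts = count_per_window([10, 30, 70, 115, 130], 60)
print(counts)  # {0: 2, 60: 2, 120: 1}
```

Window 1 starts at 0, Window 2 at 60 and Window 3 at 120, matching the counts listed above.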
Hopping Windows
# Hopping windows: fixed-size, overlapping
# Advance interval < window size
# Hopping window: 60-second window, advancing every 10 seconds
hopping_windowed = (
    stream
    .group_by_key()
    .windowed_by(
        # Java: TimeWindows.ofSizeAndGrace(Duration.ofSeconds(60), Duration.ofSeconds(10))
        #           .advanceBy(Duration.ofSeconds(10))
        TimeWindows.of_size_and_grace(
            size_ms=60000,   # 60-second window
            grace_ms=10000   # 10-second grace period
        ).advance_by(10000)  # advance every 10 seconds
    )
    .count()
)
Hopping Window Example (60s window, 10s advance):
Windows start every 10 s and each lasts 60 s; each window is labeled by its start time:
W0 = [0, 60), W10 = [10, 70), W20 = [20, 80), ..., W80 = [80, 140)
Events (away from timestamp 0, where fewer windows exist):
T=65: event-1 → W10, W20, W30, W40, W50, W60
T=75: event-2 → W20, W30, W40, W50, W60, W70
T=85: event-3 → W30, W40, W50, W60, W70, W80
Each event appears in 6 windows (60 / 10 = 6)
Sliding Windows
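from kafka.streams.kstream import SlidingWindows
# Sliding windows: defined by the maximum time difference between records
# Records whose timestamps are within that difference share a window
# Sliding window: 30-second difference
sliding_windowed = (
    stream
    .group_by_key()
    .windowed_by(
        # Java: SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofSeconds(30), Duration.ofSeconds(10))
        SlidingWindows.of_time_difference_and_grace(
            difference_ms=30000,  # 30-second window
            grace_ms=10000        # 10-second grace
        )
    )
    .count()
)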
Sliding Window Example (30-second difference):
Events:
T=0: event-1
T=10: event-2
T=20: event-3
T=40: event-4
Windows:
Window 1: [T=0, T=30] → event-1, event-2, event-3
Window 2: [T=10, T=40] → event-2, event-3, event-4
Window 3: [T=20, T=50] → event-3, event-4
Each event appears in multiple windows based on timestamp proximity
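A simplified way to see the effect of a 30-second difference is to look back from each event. This sketch (hypothetical helper `trailing_window_counts`) counts the events in the inclusive window [t - 30, t] ending at each event. It is not Kafka Streams' exact output: Kafka Streams also creates windows that start just after a record and emits each distinct set of records only once.

```python
def trailing_window_counts(timestamps, difference):
    """For each event at time t, count events in the inclusive window [t - difference, t]."""
    ts = sorted(timestamps)
    result = []
    for t in ts:
        members = [u for u in ts if t - difference <= u <= t]
        result.append(((t - difference, t), len(members)))
    return result


windows = trailing_window_counts([0, 10, 20, 40], 30)
for (start, end), n in windows:
    print(f"[{start}, {end}] -> {n} event(s)")
# [-30, 0] -> 1 event(s)
# [-20, 10] -> 2 event(s)
# [-10, 20] -> 3 event(s)
# [10, 40] -> 3 event(s)
```

The last window, [10, 40], holds the same events as Window 2 in the example above.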
Session Windows
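from kafka.streams.kstream import SessionWindows
# Session windows: group events by activity gap
# Events within the inactivity gap of each other belong to the same session
# Session window: 10-minute inactivity gap
session_windowed = (
    stream
    .group_by_key()
    .windowed_by(
        # Java: SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(10))
        SessionWindows.of_inactivity_gap_with_no_grace(
            gap_ms=600000  # 10-minute gap
        )
    )
    .count()
)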
Session Window Example (10-minute gap; T in minutes):
Events:
T=0: event-1 → starts session 1
T=5: event-2 → same session (5 min gap)
T=15: event-3 → same session (10 min gap; a gap equal to the inactivity gap still merges)
T=20: event-4 → same session (5 min gap)
T=40: event-5 → new session (20 min gap)
Sessions (bounded by their first and last event):
Session 1: [T=0, T=20] → event-1, event-2, event-3, event-4
Session 2: [T=40, T=40] → event-5 (single event)
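The merging rule can be sketched in a few lines of Python, assuming records arrive in timestamp order. Kafka Streams also merges two existing sessions when an out-of-order record bridges them, which this sketch omits. `sessionize` is a hypothetical helper: an event joins the current session when it is at most `gap` after that session's last event.

```python
def sessionize(timestamps, gap):
    """Group in-order event times into sessions, returned as (start, end, count)."""
    sessions = []  # each session is [start, end, count]
    for t in sorted(timestamps):
        if sessions and t - sessions[-1][1] <= gap:
            # Within the inactivity gap: extend the current session
            sessions[-1][1] = t
            sessions[-1][2] += 1
        else:
            # Gap exceeded (or first event): open a new session
            sessions.append([t, t, 1])
    return [tuple(s) for s in sessions]


print(sessionize([0, 5, 15, 20, 40], gap=10))  # [(0, 20, 4), (40, 40, 1)]
```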
ℹ️ Windowing Selection
- Tumbling: Fixed periods, non-overlapping (e.g., hourly reports)
- Hopping: Fixed periods, overlapping (e.g., moving averages)
- Sliding: Event-based, continuous (e.g., correlation)
- Session: Activity-based gaps (e.g., user sessions)
Window Implementation Formula
def calculate_tumbling_window(event_timestamp_ms, window_size_ms, epoch_ms=0):
    """
    Calculate the tumbling window [start, end) containing an event.
    """
    window_start = ((event_timestamp_ms - epoch_ms) // window_size_ms) * window_size_ms + epoch_ms
    window_end = window_start + window_size_ms
    return window_start, window_end

def calculate_hopping_window(event_timestamp_ms, window_size_ms, advance_ms):
    """
    Calculate all hopping windows [start, end) containing an event.
    Window starts are multiples of advance_ms; negative starts are skipped,
    mirroring how Kafka Streams' TimeWindows.windowsFor() enumerates windows.
    """
    windows = []
    # Earliest window start that still contains the event (not the tumbling-window start)
    window_start = (max(0, event_timestamp_ms - window_size_ms + advance_ms) // advance_ms) * advance_ms
    while window_start <= event_timestamp_ms:
        windows.append((window_start, window_start + window_size_ms))
        window_start += advance_ms
    return windows

# Example
event_ts = 1690000050000  # 30 seconds past a minute boundary (1690000020000)
start, end = calculate_tumbling_window(event_ts, 60000)
print(f"Window: {start} - {end}")
# Window: 1690000020000 - 1690000080000
print(len(calculate_hopping_window(event_ts, 60000, 10000)))
# 6
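⚠️ Late Events
Use a grace period to handle late-arriving events. Stream time is the largest record timestamp seen so far; once it reaches a window's end plus the grace period, the window is closed and records that still belong to it are dropped (and counted in the dropped-records metrics). Set the grace period based on how out-of-order your data is.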
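The grace-period rule can be sketched for tumbling windows on a single partition. The helper name `grace_decisions` is hypothetical; the rule it applies is that stream time only moves forward, and a record is accepted only while its window's end plus the grace period is still ahead of stream time.

```python
def grace_decisions(timestamps, size, grace):
    """Return (timestamp, accepted) for each record, in arrival order,
    for tumbling windows of `size` with the given grace period."""
    stream_time = float("-inf")
    decisions = []
    for ts in timestamps:
        stream_time = max(stream_time, ts)  # stream time never goes backwards
        window_end = (ts // size) * size + size
        # Window closes once stream time reaches window_end + grace
        decisions.append((ts, window_end + grace > stream_time))
    return decisions


# 60 s windows, 10 s grace: T=50 is late but within grace; T=55 arrives after
# stream time reached 75, past its window's close time of 70, so it is dropped
decisions = grace_decisions([10, 65, 50, 75, 55], size=60, grace=10)
print(decisions)  # [(10, True), (65, True), (50, True), (75, True), (55, False)]
```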
Joins
Join Types
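from kafka.streams import Serdes
from kafka.streams.kstream import Joined, JoinWindows, StreamJoined
# JsonSerde is assumed to be a JSON serde provided by the application
# 1. Stream-Stream Join (windowed)
# Both inputs are streams; records match when their timestamps are close enough
stream_joined = (
    stream_a.join(
        stream_b,
        value_mapper=lambda v_a, v_b: {'a': v_a, 'b': v_b},
        # Java: JoinWindows.ofTimeDifferenceAndGrace(Duration.ofSeconds(60), Duration.ofSeconds(10))
        window=JoinWindows.of_time_difference_and_grace(
            60000,  # records match if their timestamps differ by at most 60 seconds
            10000   # 10-second grace
        ),
        # Stream-stream joins take StreamJoined, not Joined
        # (Java: StreamJoined.with(keySerde, valueSerde, otherValueSerde))
        stream_joined=StreamJoined.with_serdes(Serdes.String(), JsonSerde(), JsonSerde())
    )
)
# 2. Stream-Table Join
# One input is a stream, one is a table (lookup of the table's current value per key)
enriched_stream = (
    event_stream.join(
        user_table,  # KTable with user data
        value_mapper=lambda event, user: {**event, 'user': user},
        # Java: Joined.keySerde(Serdes.String())
        joined=Joined.with_key_serde(Serdes.String())
    )
)
# 3. Table-Table Join
# Both inputs are tables (KTable-KTable join); no window is needed
combined_table = (
    table_a.join(
        table_b,
        value_mapper=lambda v_a, v_b: {'a': v_a, 'b': v_b}
    )
)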
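To make the windowed stream-stream matching concrete, here is a minimal in-memory sketch, not Kafka Streams' implementation. `WindowedStreamJoin` is a hypothetical class: both sides are buffered per key, and whichever record arrives second triggers the match if the timestamps differ by at most `difference`. Real joins expire buffered records after the window plus grace period, which this sketch leaves out.

```python
from collections import defaultdict


class WindowedStreamJoin:
    """Inner join of two keyed streams: records match when keys are equal and
    |ts_left - ts_right| <= difference. Buffers never expire in this sketch."""

    def __init__(self, difference):
        self.difference = difference
        self.buffers = {"left": defaultdict(list), "right": defaultdict(list)}

    def on_record(self, side, key, ts, value):
        """Buffer a record from `side` ('left' or 'right') and return new join results."""
        other = "right" if side == "left" else "left"
        self.buffers[side][key].append((ts, value))
        results = []
        for other_ts, other_value in self.buffers[other][key]:
            if abs(ts - other_ts) <= self.difference:
                # Always emit (left_value, right_value), whichever side arrived last
                pair = (value, other_value) if side == "left" else (other_value, value)
                results.append((key, pair))
        return results


join = WindowedStreamJoin(difference=60)
out = []
out += join.on_record("left", "o1", 0, "order-1")
out += join.on_record("right", "o1", 30, "pay-1")    # 30 s apart: match
out += join.on_record("right", "o2", 10, "pay-2")    # no order for o2 yet
out += join.on_record("left", "o2", 100, "order-2")  # 90 s apart: no match
print(out)  # [('o1', ('order-1', 'pay-1'))]
```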
Join Implementation
from kafka.streams import StreamsBuilder, Serdes
from kafka.streams.kstream import Joined, JoinWindows, StreamJoined
# JsonSerde is assumed to be a JSON serde provided by the application

def build_stream_join():
    """
    Build a stream-stream join application.
    Join orders with payments within a 5-minute window.
    Both topics must be keyed by order_id and co-partitioned.
    """
    builder = StreamsBuilder()
    # Input streams (Java: builder.stream("orders", Consumed.with(keySerde, valueSerde)))
    orders = builder.stream('orders', key_serde=Serdes.String(), value_serde=JsonSerde())
    payments = builder.stream('payments', key_serde=Serdes.String(), value_serde=JsonSerde())
    # Join orders with payments
    # Key: order_id
    # Window: 5 minutes
    joined = (
        orders.join(
            payments,
            value_mapper=lambda order, payment: {
                'order_id': order['id'],
                'payment_id': payment['id'],
                'amount': order['amount'],
                'payment_method': payment['method'],
                'status': 'matched'
            },
            window=JoinWindows.of_time_difference_and_grace(
                300000,  # 5-minute join window
                60000    # 1-minute grace period
            ),
            # Java: StreamJoined.with(Serdes.String(), orderSerde, paymentSerde)
            stream_joined=StreamJoined.with_serdes(Serdes.String(), JsonSerde(), JsonSerde())
        )
    )
    # Output joined results (Java: .to("matched-orders", Produced.with(keySerde, valueSerde)))
    joined.to('matched-orders', key_serde=Serdes.String(), value_serde=JsonSerde())
    return builder
def build_stream_table_join():
    """
    Build a stream-table join application.
    Enrich events with user data from a table.
    The events topic must be keyed by user ID and co-partitioned with 'users'.
    """
    builder = StreamsBuilder()
    # Input stream
    events = builder.stream('events', key_serde=Serdes.String(), value_serde=JsonSerde())
    # User table (backed by the 'users' topic)
    users = builder.table('users', key_serde=Serdes.String(), value_serde=JsonSerde())
    # Join events with user data
    enriched = (
        events.join(
            users,
            value_mapper=lambda event, user: {
                **event,
                'user_name': user.get('name'),
                'user_email': user.get('email'),
                'user_tier': user.get('tier')
            },
            # Java: Joined.with(Serdes.String(), eventSerde, userSerde)
            joined=Joined.with_serdes(Serdes.String(), JsonSerde(), JsonSerde())
        )
    )
    # Output enriched events
    enriched.to('enriched-events', key_serde=Serdes.String(), value_serde=JsonSerde())
    return builder
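ℹ️ Join Semantics
- Stream-Stream: Records from the two streams match only if their timestamps fall within the join window of each other. Records arriving after the window plus grace period are dropped and never match.
- Stream-Table: Each stream record is joined with the table's latest value for its key at the time the record is processed. Table updates do not retroactively update already-joined stream records, and table updates alone produce no output.
- Table-Table: Both sides are tables; the join result is updated whenever either table changes.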
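The stream-table semantics above can be sketched with a dictionary standing in for the KTable. `StreamTableJoin` is a hypothetical class: table updates only change state, and each stream record is joined with whatever value the table holds for its key when the record is processed.

```python
class StreamTableJoin:
    """Inner stream-table join against a dict that stands in for a KTable."""

    def __init__(self):
        self.table = {}

    def on_table_update(self, key, value):
        # Table updates change state but emit no join output
        if value is None:
            self.table.pop(key, None)  # tombstone deletes the row
        else:
            self.table[key] = value

    def on_stream_record(self, key, event):
        user = self.table.get(key)
        if user is None:
            return None  # inner join: no matching row, no output
        return {**event, "user": user}


j = StreamTableJoin()
j.on_table_update("u1", {"tier": "free"})
first = j.on_stream_record("u1", {"page": "/home"})
j.on_table_update("u1", {"tier": "gold"})  # does not change `first`
second = j.on_stream_record("u1", {"page": "/pricing"})
missing = j.on_stream_record("u2", {"page": "/home"})
print(first["user"]["tier"], second["user"]["tier"], missing)  # free gold None
```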
Join Performance Formula
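def estimate_join_performance(
    stream_a_rate: int,
    stream_b_rate: int,
    window_size_ms: int,
    avg_record_size: int
) -> dict:
    """
    Estimate stream-stream join state size and lookup work.
    Worst case: assumes every record shares the join key, so each arriving record
    is compared with every buffered record from the other side.
    Kafka Streams retains records for (before + after + grace), so pass that total
    as window_size_ms.
    """
    # Records buffered in the join window for stream A and stream B
    records_in_window_a = stream_a_rate * window_size_ms / 1000
    records_in_window_b = stream_b_rate * window_size_ms / 1000
    # Join state (window stores; RocksDB keeps it on disk and caches part of it in memory)
    state_bytes = (records_in_window_a + records_in_window_b) * avg_record_size
    # The join is symmetric: A records probe B's buffer and B records probe A's
    join_operations_per_sec = (stream_a_rate * records_in_window_b
                               + stream_b_rate * records_in_window_a)
    return {
        'records_in_window_a': records_in_window_a,
        'records_in_window_b': records_in_window_b,
        'memory_required_mb': state_bytes / (1024 * 1024),
        'join_operations_per_sec': join_operations_per_sec,
        # Rule of thumb used here: cap join windows at 5 minutes
        'recommended_window_size_ms': min(300000, window_size_ms)
    }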
# Example
perf = estimate_join_performance(
stream_a_rate=1000, # 1000 orders/sec
stream_b_rate=500, # 500 payments/sec
window_size_ms=300000, # 5-minute window
avg_record_size=1000 # 1KB per record
)
print(f"Memory required: {perf['memory_required_mb']:.1f} MB")
print(f"Join operations/sec: {perf['join_operations_per_sec']}")
State Stores
State Store Architecture
┌─────────────────────────────────────────────────────────┐
│                State Store Architecture                 │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  Processor                                              │
│      ↓                                                  │
│  ┌───────────────────────────────────────────────────┐  │
│  │ State Store                                       │  │
│  │  ┌─────────────┐          ┌─────────────┐         │  │
│  │  │   RocksDB   │ ───────► │  Changelog  │         │  │
│  │  │   (Local)   │          │    Topic    │         │  │
│  │  └─────────────┘          └─────────────┘         │  │
│  │         ↕                        ↕                │  │
│  │    Read/Write             Backup/Recovery         │  │
│  └───────────────────────────────────────────────────┘  │
│                                                         │
│  Recovery:                                              │
│  1. Read changelog topic                                │
│  2. Replay events to rebuild local state                │
│  3. Resume processing                                   │
└─────────────────────────────────────────────────────────┘
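The recovery steps amount to replaying the changelog into an empty store. This pure-Python sketch (hypothetical `restore_from_changelog` and `compact` helpers) applies records in offset order, with a `None` value acting as a tombstone, and shows that a compacted changelog rebuilds the same state from fewer records. Real compaction also removes tombstones after `delete.retention.ms`, which is not modeled here.

```python
def restore_from_changelog(changelog):
    """Rebuild a key-value store by replaying (key, value) records in offset order.
    The last write per key wins; a None value (tombstone) deletes the key."""
    store = {}
    for key, value in changelog:
        if value is None:
            store.pop(key, None)
        else:
            store[key] = value
    return store


def compact(changelog):
    """Keep only the latest record per key, in offset order (what log compaction converges to)."""
    latest_offset = {key: offset for offset, (key, _) in enumerate(changelog)}
    return [rec for offset, rec in enumerate(changelog) if latest_offset[rec[0]] == offset]


changelog = [("a", 1), ("b", 2), ("a", 3), ("b", None), ("c", 4)]
print(restore_from_changelog(changelog))  # {'a': 3, 'c': 4}
print(compact(changelog))                 # [('a', 3), ('b', None), ('c', 4)]
```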
RocksDB State Store
from kafka.streams import StreamsBuilder, Serdes
from kafka.streams.kstream import Materialized
from kafka.streams.state import Stores
# RocksDB-backed key-value store (Java: Stores.persistentKeyValueStore("my-state-store"))
store_supplier = Stores.persistent_key_value_store('my-state-store')
# Java: Materialized.as(storeSupplier).withKeySerde(...).withValueSerde(...)
materialized = (
    Materialized.as_(store_supplier)
    .with_key_serde(Serdes.String())
    .with_value_serde(JsonSerde())
    .with_logging_enabled({})  # changelog topic (on by default); dict = topic config overrides
    .with_caching_enabled()    # record cache in front of the store (on by default)
)
# Store metrics are registered automatically; there is no per-store "enable metrics" switch
# Use in an aggregation
builder = StreamsBuilder()
aggregated = (
    builder.stream('events')
    .group_by_key()
    .aggregate(
        initializer=lambda: {'count': 0, 'sum': 0},
        aggregator=lambda key, value, aggregate: {
            'count': aggregate['count'] + 1,
            'sum': aggregate['sum'] + value['amount']
        },
        materialized=materialized
    )
)
State Store Configuration
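# RocksDB options. Kafka Streams does not accept these as StreamsConfig keys: they are
# applied in a RocksDBConfigSetter class registered via 'rocksdb.config.setter'.
# (Keys below name the RocksDB options; the values are examples.)
rocksdb_config = {
    # Block cache (one per RocksDB instance unless a shared cache is configured)
    'block.cache.size': 16 * 1024 * 1024,  # 16MB
    # Write buffer (memtable)
    'write.buffer.size': 4 * 1024 * 1024,  # 4MB
    'max.write.buffer.number': 3,
    # Compaction
    'level0.file.num.compaction.trigger': 4,
    'level0.stop.writes.trigger': 20,
    'max.bytes.for.level.base': 256 * 1024 * 1024,  # 256MB
    # Compression
    'compression.type': 'lz4',
    # Bloom filter
    'bloom.filter.bits.per.key': 10,
    # Block size
    'block.size': 4096  # 4KB
}
# Changelog topic configuration
# - Changelog topics are created automatically as <application.id>-<store-name>-changelog
# - Their partition count always equals the input partition count (one per task),
#   so it is not configurable
# - Per-store topic overrides are passed to with_logging_enabled(...)
changelog_config = {
    'replication.factor': 3,     # StreamsConfig key; applies to all internal topics
    'cleanup.policy': 'compact'  # default for key-value store changelogs
    # Window-store changelogs use 'compact,delete', with retention derived from the
    # window retention plus windowstore.changelog.additional.retention.ms
}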
State Store Metrics
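# Key state store metrics (names as reported by Kafka Streams; the per-store
# rate/latency metrics and record-cache metrics require metrics.recording.level=DEBUG)
state_metrics = {
    # Size (RocksDB property-based metrics)
    'estimate-num-keys': 'Estimated number of keys',
    'total-sst-files-size': 'Total size of SST files on disk',
    # Performance
    'put-rate': 'Write operations per second',
    'get-rate': 'Read operations per second',
    'delete-rate': 'Delete operations per second',
    'flush-rate': 'Flush operations per second',
    # Latency
    'put-latency-avg': 'Average write latency',
    'put-latency-max': 'Maximum write latency',
    'get-latency-avg': 'Average read latency',
    'get-latency-max': 'Maximum read latency',
    # Compaction (RocksDB statistics-based metrics)
    'compaction-time-avg': 'Average time spent per compaction',
    'bytes-written-compaction-rate': 'Bytes written by compaction per second',
    'write-stall-duration-avg': 'Average write stall duration',
    # Cache
    'block-cache-data-hit-ratio': 'RocksDB block cache hit ratio for data blocks',
    'hit-ratio-avg': 'Kafka Streams record cache hit ratio'
}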
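⚠️ RocksDB Tuning
RocksDB performance depends heavily on memory allocation. By default every RocksDB instance (one per store per partition, and one per segment for window and session stores) gets its own block cache and memtables, so total memory grows with the number of instances; use a RocksDBConfigSetter to share and bound a single cache. Giving that cache 25-50% of available memory is a reasonable starting point for state-heavy applications. Monitor compaction and write-stall metrics to avoid write stalls.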
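Because memory multiplies with the number of RocksDB instances, a quick estimate helps before deployment. This is a rough sketch under stated assumptions: the function name is hypothetical, each instance is assumed to have its own block cache and full set of memtables, and index/filter blocks and other RocksDB overheads (which add to the total) are ignored.

```python
def estimate_rocksdb_memory_mb(num_stores, partitions, segments_per_store,
                               block_cache_mb, write_buffer_mb, max_write_buffers):
    """Rough RocksDB memory estimate across all application instances,
    assuming no shared block cache or write buffer manager."""
    # One instance per store per partition; window/session stores have one per segment
    instances = num_stores * partitions * segments_per_store
    per_instance_mb = block_cache_mb + write_buffer_mb * max_write_buffers
    return instances, instances * per_instance_mb


# Hypothetical app: 2 key-value stores (1 segment each) over 12 input partitions,
# using the example option values above (16 MB cache, 4 MB write buffer, 3 buffers)
instances, total_mb = estimate_rocksdb_memory_mb(2, 12, 1, 16, 4, 3)
print(f"{instances} instances, about {total_mb} MB")  # 24 instances, about 672 MB
```

Sharing one cache through a RocksDBConfigSetter replaces the per-instance block cache term with a single fixed budget.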
InMemory State Store
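from kafka.streams import StreamsBuilder, Serdes
from kafka.streams.kstream import Materialized, TimeWindows
from kafka.streams.state import Stores
# In-memory window store for a windowed aggregation
# Java: Stores.inMemoryWindowStore(name, retentionPeriod, windowSize, retainDuplicates)
in_memory_supplier = Stores.in_memory_window_store(
    'in-memory-store',
    retention_ms=600000,   # must be >= window size + grace period
    window_size_ms=60000,
    retain_duplicates=False
)
in_memory_materialized = (
    Materialized.as_(in_memory_supplier)
    .with_key_serde(Serdes.String())
    .with_value_serde(JsonSerde())
    .with_logging_disabled()  # no changelog: state is lost on restart or failover
)
# Use case: temporary aggregation
builder = StreamsBuilder()
temp_aggregate = (
    builder.stream('events')
    .group_by_key()
    .windowed_by(TimeWindows.of_size_with_no_grace(60000))
    .aggregate(
        initializer=lambda: 0,
        aggregator=lambda key, value, agg: agg + value['amount'],
        materialized=in_memory_materialized
    )
)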
State Store Backend Comparison
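| Backend | Performance | Local persistence | Recovery | Use Case |
|---|---|---|---|---|
| RocksDB | High | Yes (local disk) | Local checkpoint + replay of the changelog tail | Production default, large state |
| InMemory | Highest | No (heap only) | Full changelog replay; none if logging is disabled | Testing, small state |
| Custom | Varies | Varies | Varies | Special needs |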
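ℹ️ State Store Selection
- RocksDB: Default for production. Handles state larger than memory, with local disk persistence that avoids full restores on restart.
- InMemory: For testing, or when state fits comfortably in memory. With logging enabled (the default) it is rebuilt from the changelog after a restart, which takes longer for large state; with logging disabled the state is lost.
- Custom: For integrating with external stores (Redis, Cassandra). Requires implementing the StateStore interface (plus a StoreBuilder to register it).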
Exactly-Once Semantics in Kafka Streams
How EOS Works
from kafka.streams import KafkaStreams, StreamsBuilder, StreamsConfig
# Enable exactly-once semantics
config = StreamsConfig({
    'application.id': 'my-streams-app',
    'bootstrap.servers': 'localhost:9092',
    # Exactly-once configuration ('exactly_once_v2' requires brokers 2.5+;
    # the older 'exactly_once' value is deprecated)
    'processing.guarantee': 'exactly_once_v2',
    # No client overrides are needed: under EOS, Kafka Streams itself configures
    # - producer: idempotence enabled (implying acks=all) and a transactional.id it manages
    # - consumer: isolation.level=read_committed
    # A user-supplied producer transactional.id is ignored.
})

def transform(value):
    # Hypothetical per-record transformation
    return value

# Build topology
builder = StreamsBuilder()
stream = builder.stream('input-topic')
stream.map_values(transform).to('output-topic')
# Start streams with EOS
streams = KafkaStreams(builder.build(), config)
streams.start()
EOS Processing Guarantee Levels
# Three levels of processing guarantee
# 1. at_least_once (default)
config['processing.guarantee'] = 'at_least_once'
# - Consumer offsets committed after processing (every commit.interval.ms, default 30 s)
# - A crash between processing and commit causes reprocessing, so outputs and
#   state updates can be duplicated
# 2. exactly_once (legacy, deprecated since Kafka 3.0)
config['processing.guarantee'] = 'exactly_once'
# - Uses transactions: outputs, changelog writes and consumer offsets commit atomically
# - One transactional producer per task, so the producer count grows with input partitions
# 3. exactly_once_v2 (recommended; requires brokers 2.5+)
config['processing.guarantee'] = 'exactly_once_v2'
# - Same guarantees as exactly_once
# - One transactional producer per stream thread (KIP-447), so it scales to many partitions
# - Better performance than exactly_once
# - Recommended for new applications
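The practical difference between the two EOS modes is how many producers each application instance creates. A minimal sketch, with a hypothetical `producers_per_instance` helper and example task and thread counts:

```python
def producers_per_instance(guarantee, num_tasks, num_threads):
    """Number of producers one Kafka Streams instance creates for a processing guarantee."""
    if guarantee == "exactly_once":     # legacy EOS: one transactional producer per task
        return num_tasks
    if guarantee == "exactly_once_v2":  # one transactional producer per stream thread
        return num_threads
    if guarantee == "at_least_once":    # one non-transactional producer per stream thread
        return num_threads
    raise ValueError(f"unknown processing.guarantee: {guarantee}")


# Hypothetical instance: 4 stream threads running 48 tasks
for g in ("at_least_once", "exactly_once", "exactly_once_v2"):
    print(g, producers_per_instance(g, num_tasks=48, num_threads=4))
# at_least_once 4
# exactly_once 48
# exactly_once_v2 4
```

Fewer transactional producers means fewer transactional IDs, coordinator requests and connections for the brokers to handle.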
EOS Implementation Details
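import logging

log = logging.getLogger(__name__)

class ExactlyOnceProcessor:
    """
    Conceptual model of a processor running with processing.guarantee=exactly_once_v2.
    Guarantees (for reads and writes that stay inside Kafka):
    1. Each input record's effects are committed exactly once
    2. Output records become visible to read_committed consumers exactly once
    3. State updates are committed exactly once (through their changelog topics)
    4. Consumer offsets are committed in the same transaction as the outputs
    External side effects (HTTP calls, database writes) are not covered.
    """
    def __init__(self, config):
        self.config = config
        self.state_store = None

    def process(self, record):
        """
        Process one record. Kafka Streams, not this method, manages the transaction:
        1. The task's producer has an open transaction
        2. The record is processed
        3. Output records are written within the transaction
        4. State store writes are mirrored to the changelog within the transaction
        5. Every commit.interval.ms (100 ms by default under EOS), the consumed offsets
           are added and the transaction is committed, covering many records at once
        If a failure occurs before the commit:
        1. The transaction is aborted
        2. Its output and changelog records never become visible to read_committed consumers
        3. Offsets are not committed
        4. Local state may already contain the aborted updates, so Kafka Streams
           discards it and restores the store from the changelog
        5. The records are reprocessed from the last committed offset
        """
        try:
            # Process record
            result = self._transform(record.value())
            # Write to output topic (joins the open transaction)
            self._send_to_output(record.key(), result)
            # Update state store (the changelog write joins the transaction too)
            self._update_state(record.key(), result)
            # Consumer offsets are committed automatically with the transaction
        except Exception as e:
            # Kafka Streams aborts the transaction; the record will be reprocessed
            log.error("Processing failed: %s", e)
            raise

    # Hypothetical placeholders for user logic and the runtime's output/state calls
    def _transform(self, value):
        return value

    def _send_to_output(self, key, value):
        pass

    def _update_state(self, key, value):
        pass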
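The effect of a transaction on visible output can be shown with a toy read-process-write loop. `TransactionalPipeline` is hypothetical and ignores state stores: with `transactional=True`, outputs and the input offset take effect together on commit and are discarded on abort; with `transactional=False`, outputs are visible immediately, as under at-least-once, so a crash before the offset commit produces duplicates on reprocessing.

```python
class TransactionalPipeline:
    """Toy read-process-write loop over an in-memory input log."""

    def __init__(self, inputs, transactional=True):
        self.inputs = inputs
        self.transactional = transactional
        self.committed_offset = 0  # next input offset to read after a restart
        self.visible_output = []   # what a read_committed consumer would see

    def run_batch(self, batch_size, fail_at=None):
        """Process up to batch_size records from the committed offset.
        `fail_at` simulates a crash when reaching that input offset."""
        start = self.committed_offset
        end = min(start + batch_size, len(self.inputs))
        pending = []  # outputs inside the open transaction
        try:
            for offset in range(start, end):
                if offset == fail_at:
                    raise RuntimeError(f"crash at offset {offset}")
                result = self.inputs[offset] * 10  # hypothetical transformation
                if self.transactional:
                    pending.append(result)
                else:
                    self.visible_output.append(result)  # at-least-once: visible at once
        except RuntimeError:
            return False  # abort: pending outputs dropped, offset not committed
        # Commit: outputs and the new offset take effect together
        self.visible_output.extend(pending)
        self.committed_offset = end
        return True


def run(transactional):
    p = TransactionalPipeline([1, 2, 3, 4], transactional)
    p.run_batch(2)             # offsets 0-1 committed
    p.run_batch(2, fail_at=3)  # processes offset 2, crashes at 3
    p.run_batch(2)             # restarts from offset 2
    return p.visible_output


print(run(True))   # [10, 20, 30, 40]
print(run(False))  # [10, 20, 30, 30, 40]
```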
EOS Performance Formula
import math

def calculate_eos_overhead(
    records_per_sec: int,
    batch_size: int,
    transaction_latency_ms: float,
    coordinator_rtt_ms: float,
    target_overhead: float = 0.10
) -> dict:
    """
    Back-of-envelope EOS overhead for Kafka Streams.
    Assumes each transaction commit blocks the stream thread for
    transaction_latency_ms + coordinator_rtt_ms. In practice the number of records
    per transaction is set by commit.interval.ms (100 ms by default under EOS).
    """
    # Transactions per second
    transactions_per_sec = records_per_sec / batch_size
    # Seconds of commit overhead incurred per second of processing
    transaction_overhead_per_sec = transactions_per_sec * transaction_latency_ms / 1000
    coordinator_overhead_per_sec = transactions_per_sec * coordinator_rtt_ms / 1000
    total_overhead_per_sec = transaction_overhead_per_sec + coordinator_overhead_per_sec
    # 1 s of processing now takes (1 + overhead) s, so throughput drops by overhead / (1 + overhead)
    throughput_reduction = total_overhead_per_sec / (1 + total_overhead_per_sec)
    # Smallest batch that keeps the overhead at or below target_overhead
    per_commit_ms = transaction_latency_ms + coordinator_rtt_ms
    min_batch_for_target = math.ceil(records_per_sec * per_commit_ms / (1000 * target_overhead))
    return {
        'transactions_per_sec': transactions_per_sec,
        'transaction_overhead_per_sec': transaction_overhead_per_sec,
        'coordinator_overhead_per_sec': coordinator_overhead_per_sec,
        'total_overhead_per_sec': total_overhead_per_sec,
        'throughput_reduction_percent': throughput_reduction * 100,
        'recommended_batch_size': max(100, min_batch_for_target)
    }
# Example
overhead = calculate_eos_overhead(
records_per_sec=100000,
batch_size=1000,
transaction_latency_ms=10,
coordinator_rtt_ms=5
)
print(f"Transactions/sec: {overhead['transactions_per_sec']:.0f}")
print(f"Throughput reduction: {overhead['throughput_reduction_percent']:.1f}%")
print(f"Recommended batch size: {overhead['recommended_batch_size']}")
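⚠️ EOS Trade-off
EOS adds throughput and latency overhead from transaction coordination, and read_committed consumers only see results after each commit. The overhead is usually modest when each transaction covers many records (a commit interval of around 100 ms or more), but it grows quickly as commits become more frequent. Use EOS when duplicate processing is expensive. For many use cases, at-least-once with idempotent processing is sufficient.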
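At-least-once with idempotent processing usually means deduplicating by a unique record ID on the consuming side. A minimal sketch with a hypothetical `IdempotentSink`; in a real system the set of seen IDs must be stored durably, updated atomically with the effect (for example, in the same database transaction), and bounded in size.

```python
class IdempotentSink:
    """Applies each record's effect at most once by remembering processed IDs."""

    def __init__(self):
        self.seen_ids = set()
        self.balance = 0

    def apply(self, record_id, amount):
        if record_id in self.seen_ids:
            return False  # duplicate delivery: skip
        self.seen_ids.add(record_id)
        self.balance += amount
        return True


sink = IdempotentSink()
# 'p2' is delivered twice, as can happen under at-least-once
for record_id, amount in [("p1", 50), ("p2", 25), ("p2", 25), ("p3", 10)]:
    sink.apply(record_id, amount)
print(sink.balance)  # 85
```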
Complete Kafka Streams Application
import signal
import sys
from kafka.streams import KafkaStreams, StreamsBuilder, StreamsConfig, Serdes
from kafka.streams.kstream import TimeWindows, JoinWindows, Joined, StreamJoined, WindowedSerdes
# JsonSerde is assumed to be a JSON serde provided by the application

def build_exactly_once_application():
    """
    Complete Kafka Streams application with EOS.
    Features:
    - Exactly-once semantics
    - Windowed aggregation
    - Stream-stream and stream-table joins
    - State stores with RocksDB
    All input topics are assumed to be keyed by user ID and co-partitioned.
    """
    config = StreamsConfig({
        'application.id': 'user-analytics',
        'bootstrap.servers': 'kafka1:9092,kafka2:9092,kafka3:9092',
        # EOS configuration (Kafka Streams also enables producer idempotence and
        # sets consumer isolation.level=read_committed internally)
        'processing.guarantee': 'exactly_once_v2',
        # State store configuration
        'state.dir': '/var/kafka/streams-state',
        # RocksDB tuning is applied through a RocksDBConfigSetter class
        # registered under 'rocksdb.config.setter' (not shown)
        # Consumer configuration
        'auto.offset.reset': 'earliest',
        # Performance tuning
        'num.stream.threads': 4,
        'replication.factor': 3,     # for internal changelog/repartition topics
        'commit.interval.ms': 100,   # already the default under EOS
        # Record cache size across all threads
        # ('statestore.cache.max.bytes' in Kafka 3.4+, replacing this older name)
        'cache.max.bytes.buffering': 10 * 1024 * 1024
    })
    builder = StreamsBuilder()
    # Input streams
    page_views = builder.stream('page-views', key_serde=Serdes.String(), value_serde=JsonSerde())
    user_actions = builder.stream('user-actions', key_serde=Serdes.String(), value_serde=JsonSerde())
    # User table for enrichment
    users = builder.table('users', key_serde=Serdes.String(), value_serde=JsonSerde())
    # Windowed page view counts
    windowed_counts = (
        page_views
        .group_by_key()
        .windowed_by(TimeWindows.of_size_and_grace(300000, 60000))  # 5-min window, 1-min grace
        .count()
    )
    # Join page views with user actions that occur within 60 seconds
    enriched_views = (
        page_views.join(
            user_actions,
            value_mapper=lambda pv, ua: {
                'page': pv['page'],
                'action': ua['action'],
                'timestamp': pv['timestamp']
            },
            window=JoinWindows.of_time_difference_and_grace(60000, 10000),
            stream_joined=StreamJoined.with_serdes(Serdes.String(), JsonSerde(), JsonSerde())
        )
    )
    # Enrich with user data
    final_stream = (
        enriched_views.join(
            users,
            value_mapper=lambda view, user: {
                **view,
                'user_name': user.get('name'),
                'user_tier': user.get('tier')
            },
            joined=Joined.with_serdes(Serdes.String(), JsonSerde(), JsonSerde())
        )
    )
    # Output: windowed keys need a windowed serde, and counts are longs
    # (Java: WindowedSerdes.timeWindowedSerdeFrom(String.class, 300000L), Serdes.Long())
    windowed_counts.to_stream().to(
        'windowed-counts',
        key_serde=WindowedSerdes.time_windowed_serde_from(str, 300000),
        value_serde=Serdes.Long()
    )
    final_stream.to('enriched-events', key_serde=Serdes.String(), value_serde=JsonSerde())
    return KafkaStreams(builder.build(), config)

# Run application
streams = build_exactly_once_application()

# Graceful shutdown
def shutdown_handler(sig, frame):
    # close() commits pending work and releases resources. Do not call clean_up() here:
    # it deletes the local state directory and forces a full restore on the next start.
    streams.close()
    sys.exit(0)

signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)
streams.start()
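ℹ️ Production Tip
Monitor commit-latency-avg and process-rate (stream-thread metrics) for EOS applications. High commit latency indicates transaction coordination overhead. Tune commit.interval.ms first, since it sets how many records each transaction covers, then producer batching settings such as batch.size (set with the producer. prefix).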