
Kafka Streams: Windowing, Joins, State Stores, EOS

Difficulty: Staff | Asked at: Confluent, LinkedIn, Uber, Netflix

ℹ️Interview Context

Kafka Streams is a client library for building streaming applications. This question tests your understanding of windowing, joins, state management, and exactly-once semantics in stream processing.

The Question

Explain how Kafka Streams handles windowing, joins, and state management. How does it achieve exactly-once semantics? What are the trade-offs between different state store backends?

Kafka Streams Architecture

┌───────────────────────────────────────────────────────────┐
│                 Kafka Streams Application                 │
├───────────────────────────────────────────────────────────┤
│                                                           │
│  ┌─────────────────────────────────────────────────────┐  │
│  │                 Processor Topology                  │  │
│  │                                                     │  │
│  │  Source → Filter → Map → Window → Join → Sink       │  │
│  │    ↓                       ↓       ↓      ↓         │  │
│  │  Topics                  State    State Topics      │  │
│  │                          Store    Store             │  │
│  └─────────────────────────────────────────────────────┘  │
│                                                           │
│  ┌─────────────────────────────────────────────────────┐  │
│  │                State Store Backends                 │  │
│  │  - RocksDB (default, local)                         │  │
│  │  - InMemory (testing)                               │  │
│  │  - Custom (Cassandra, Redis)                        │  │
│  └─────────────────────────────────────────────────────┘  │
│                                                           │
│  ┌─────────────────────────────────────────────────────┐  │
│  │                  Changelog Topics                   │  │
│  │  - Backup state to Kafka                            │  │
│  │  - Enable state recovery                            │  │
│  │  - Compacted topics                                 │  │
│  └─────────────────────────────────────────────────────┘  │
└───────────────────────────────────────────────────────────┘

Windowing Strategies

Tumbling Windows

# NOTE: Kafka Streams is a Java library. The snippets in this article are
# Python-style pseudocode modeled on the Java DSL (for example,
# of_size_with_no_grace stands in for TimeWindows.ofSizeWithNoGrace).
from kafka.streams import KafkaStreams, StreamsBuilder
from kafka.streams.kstream import TimeWindows

# Tumbling windows: fixed-size, non-overlapping
builder = StreamsBuilder()

stream = builder.stream('events')

# Tumbling window of 1 minute
windowed_stream = (
    stream
    .group_by_key()
    .windowed_by(TimeWindows.of_size_with_no_grace(60000))  # 60 seconds
    .count()
)

windowed_stream.to_stream().to('windowed-counts')

Tumbling Window Example (60-second windows):

Time: 0     60    120    180    240
      |------|------|------|------|
      | W1   | W2   | W3   | W4   |
      
Events:
  T=10:  event-1  → Window 1
  T=30:  event-2  → Window 1
  T=70:  event-3  → Window 2
  T=115: event-4  → Window 2
  T=130: event-5  → Window 3

Window Counts:
  W1: 2 events
  W2: 2 events
  W3: 1 event
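
The counts above can be reproduced with a few lines of plain Python. This is only a sketch of the bucketing arithmetic, not Kafka Streams code: the function name `tumbling_counts` is made up for illustration, timestamps are in seconds to match the diagram, and each window is keyed by its start time.

```python
from collections import Counter

def tumbling_counts(timestamps_s, window_size_s):
    """Count events per tumbling window, keyed by window start time."""
    # Integer division maps each timestamp to the start of its window
    counts = Counter((ts // window_size_s) * window_size_s for ts in timestamps_s)
    return dict(sorted(counts.items()))

print(tumbling_counts([10, 30, 70, 115, 130], window_size_s=60))
# {0: 2, 60: 2, 120: 1}
```

Because windows never overlap, every event lands in exactly one bucket, which is why the counts sum to the number of events.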

Hopping Windows

# Hopping windows: fixed-size, overlapping
# The advance interval is smaller than the window size

# Hopping window: 60-second window, advancing every 10 seconds
hopping_windowed = (
    stream
    .group_by_key()
    .windowed_by(
        TimeWindows.of_size_and_grace(
            size_ms=60000,      # 60-second window
            grace_ms=10000      # 10-second grace period
        ).advance_by(10000)     # Advance every 10 seconds
    )
    .count()
)

Hopping Window Example (60s window, 10s advance):

Time: 0     10    20    30    40    50    60    70    80
      |-----|-----|-----|-----|-----|-----|-----|-----|
      [ W0: 0-60                          )
            [ W10: 10-70                        )
                  [ W20: 20-80                        )

Events (windows are aligned to time 0; none starts before it):
  T=5:   event-1  → W0
  T=15:  event-2  → W0, W10
  T=25:  event-3  → W0, W10, W20

Once the stream is past the first 60 seconds, every event falls in 6 windows (60/10 = 6).

Sliding Windows

# Sliding windows: defined by the maximum time difference between records
# Records whose timestamps are within that difference share a window

# Sliding window: 30-second difference
sliding_windowed = (
    stream
    .group_by_key()
    .windowed_by(
        SlidingWindows.of_time_difference_and_grace(
            difference_ms=30000,  # 30-second window
            grace_ms=10000        # 10-second grace
        )
    )
    .count()
)

Sliding Window Example (30-second difference):

Events:
  T=0:    event-1
  T=10:   event-2
  T=20:   event-3
  T=40:   event-4

Windows:
  Window 1: [T=0, T=30] → event-1, event-2, event-3
  Window 2: [T=10, T=40] → event-2, event-3, event-4
  Window 3: [T=20, T=50] → event-3, event-4

Each event appears in multiple windows based on timestamp proximity

Session Windows

# Session windows: group events by activity gap
# Events closer together than the gap belong to the same session

# Session window: 10-minute inactivity gap
session_windowed = (
    stream
    .group_by_key()
    .windowed_by(
        SessionWindows.of_inactivity_gap_with_no_grace(
            gap_ms=600000  # 10-minute gap
        )
    )
    .count()
)

Session Window Example (10-minute gap, times in minutes):

Events:
  T=0:    event-1
  T=5:    event-2       ← Same session (5 min gap)
  T=16:   event-3       ← New session (11 min gap)
  T=21:   event-4       ← Same session (5 min gap)
  T=40:   event-5       ← New session (19 min gap)

Sessions (each spans its first to its last event):
  Session 1: [T=0, T=5]   → event-1, event-2
  Session 2: [T=16, T=21] → event-3, event-4
  Session 3: [T=40, T=40] → event-5 (single event)
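
Session assignment can be sketched in plain Python as a single pass over sorted timestamps. This is an illustration, not the Kafka Streams implementation: `sessionize` is a hypothetical helper, the timestamps are illustrative values in minutes, and treating a gap exactly equal to the threshold as still inside the session is an assumption of this sketch.

```python
from typing import List, Tuple

def sessionize(timestamps_min: List[int], gap_min: int) -> List[Tuple[int, int, int]]:
    """Group timestamps into sessions; returns (start, end, event_count) tuples."""
    sessions: List[Tuple[int, int, int]] = []
    for ts in sorted(timestamps_min):
        # Extend the current session if this event is within the gap of its last event
        if sessions and ts - sessions[-1][1] <= gap_min:
            start, _, count = sessions[-1]
            sessions[-1] = (start, ts, count + 1)
        else:
            # Otherwise the event opens a new session of its own
            sessions.append((ts, ts, 1))
    return sessions

print(sessionize([0, 5, 16, 21, 40], gap_min=10))
# [(0, 5, 2), (16, 21, 2), (40, 40, 1)]
```

A session's bounds are the timestamps of its first and last events, so a single-event session has equal start and end.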

ℹ️Windowing Selection

  • Tumbling: Fixed periods, non-overlapping (e.g., hourly reports)
  • Hopping: Fixed periods, overlapping (e.g., moving averages)
  • Sliding: Event-based, continuous (e.g., correlation)
  • Session: Activity-based gaps (e.g., user sessions)

Window Implementation Formula

$$\text{Window Start} = \left\lfloor \frac{\text{event\_timestamp} - \text{epoch}}{\text{window\_size}} \right\rfloor \times \text{window\_size} + \text{epoch}$$
$$\text{Window End} = \text{Window Start} + \text{window\_size}$$
def calculate_tumbling_window(event_timestamp_ms, window_size_ms, epoch_ms=0):
    """
    Calculate tumbling window for event.
    """
    window_start = ((event_timestamp_ms - epoch_ms) // window_size_ms) * window_size_ms + epoch_ms
    window_end = window_start + window_size_ms
    return window_start, window_end

def calculate_hopping_window(event_timestamp_ms, window_size_ms, advance_ms):
    """
    Calculate all hopping windows containing event.

    Windows start at multiples of advance_ms (aligned to time 0).
    """
    windows = []
    # Earliest window start that can still contain the event, clamped at time 0
    earliest_start = max(0, event_timestamp_ms - window_size_ms + advance_ms)
    window_start = (earliest_start // advance_ms) * advance_ms
    
    # Every window from there up to the event timestamp contains the event
    while window_start <= event_timestamp_ms:
        windows.append((window_start, window_start + window_size_ms))
        window_start += advance_ms
    
    return windows

# Example
event_ts = 1690000050000  # 30 seconds into a minute
start, end = calculate_tumbling_window(event_ts, 60000)
print(f"Window: {start} - {end}")
# Window: 1690000020000 - 1690000080000

⚠️ Late Events

Use a grace period to accept late-arriving (out-of-order) events. A record is dropped once stream time has passed its window end plus the grace period. Set the grace period from how far out of order your data typically arrives.
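
The drop rule can be written as a one-line comparison. The sketch below assumes tumbling windows aligned to time 0 and takes stream time as an argument (in Kafka Streams, stream time is the highest record timestamp seen so far); `accepts_record` is a hypothetical name, not part of any API.

```python
def accepts_record(record_ts_ms, stream_time_ms, window_size_ms, grace_ms):
    """Return True if the record's tumbling window is still open."""
    window_start = (record_ts_ms // window_size_ms) * window_size_ms
    window_end = window_start + window_size_ms
    # The window stays open until stream time reaches window end + grace
    return stream_time_ms < window_end + grace_ms


# A record with timestamp 55 s belongs to window [0 s, 60 s); grace is 10 s
print(accepts_record(55_000, stream_time_ms=65_000, window_size_ms=60_000, grace_ms=10_000))  # True
print(accepts_record(55_000, stream_time_ms=70_000, window_size_ms=60_000, grace_ms=10_000))  # False
```

A longer grace period accepts more stragglers but delays the point at which a window's result can be considered final.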

Stream-Table Joins

Join Types

from kafka.streams import Serdes
from kafka.streams.kstream import Joined, JoinWindows, StreamJoined

# 1. Stream-Stream Join
# Both inputs are streams
stream_joined = (
    stream_a
    .join(
        stream_b,
        value_mapper=lambda v_a, v_b: {'a': v_a, 'b': v_b},
        window=JoinWindows.of_time_difference_and_grace(
            60000,  # 60-second join window
            10000   # 10-second grace
        ),
        joined=StreamJoined.with_key_serde(Serdes.String())
    )
)

# 2. Stream-Table Join
# One input is a stream, one is a table
enriched_stream = (
    event_stream.join(
        user_table,  # Table with user data
        value_mapper=lambda event, user: {**event, 'user': user},
        joined=Joined.with_key_serde(Serdes.String())
    )
)

# 3. Table-Table Join
# Both inputs are tables (KTable-KTable join)
combined_table = (
    table_a.join(
        table_b,
        value_mapper=lambda v_a, v_b: {'a': v_a, 'b': v_b}
    )
)

Join Implementation

from kafka.streams import StreamsBuilder, Serdes
from kafka.streams.kstream import Joined, JoinWindows, StreamJoined
# JsonSerde is assumed to be a user-provided JSON serde

def build_stream_join():
    """
    Build a stream-stream join application.
    
    Join orders with payments within 5-minute window.
    """
    builder = StreamsBuilder()
    
    # Input streams
    orders = builder.stream(
        'orders',
        key_serde=Serdes.String(),
        value_serde=JsonSerde()
    )
    
    payments = builder.stream(
        'payments',
        key_serde=Serdes.String(),
        value_serde=JsonSerde()
    )
    
    # Join orders with payments
    # Key: order_id
    # Window: 5 minutes
    joined = (
        orders.join(
            payments,
            value_mapper=lambda order, payment: {
                'order_id': order['id'],
                'payment_id': payment['id'],
                'amount': order['amount'],
                'payment_method': payment['method'],
                'status': 'matched'
            },
            window=JoinWindows.of_time_difference_and_grace(
                300000,  # 5-minute join window
                60000    # 1-minute grace period
            ),
            joined=StreamJoined.with_key_serde(Serdes.String())
                               .with_value_serde(JsonSerde())
        )
    )
    
    # Output joined results
    joined.to(
        'matched-orders',
        key_serde=Serdes.String(),
        value_serde=JsonSerde()
    )
    
    return builder

def build_stream_table_join():
    """
    Build a stream-table join application.
    
    Enrich events with user data from table.
    """
    builder = StreamsBuilder()
    
    # Input stream
    events = builder.stream(
        'events',
        key_serde=Serdes.String(),
        value_serde=JsonSerde()
    )
    
    # User table (changelog topic backing)
    users = builder.table(
        'users',
        key_serde=Serdes.String(),
        value_serde=JsonSerde()
    )
    
    # Join events with user data
    enriched = (
        events.join(
            users,
            value_mapper=lambda event, user: {
                **event,
                'user_name': user.get('name'),
                'user_email': user.get('email'),
                'user_tier': user.get('tier')
            },
            joined=Joined.with_key_serde(Serdes.String())
        )
    )
    
    # Output enriched events
    enriched.to(
        'enriched-events',
        key_serde=Serdes.String(),
        value_serde=JsonSerde()
    )
    
    return builder

ℹ️Join Semantics

  • Stream-Stream: Both records must arrive within join window. Late arrivals cause non-matches.
  • Stream-Table: Table state is current at time of stream event. Table updates don't retroactively update joined stream.
  • Table-Table: Both tables update. Join result updates when either table changes.
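
The first two semantics can be imitated in plain Python. This is a toy model rather than Kafka Streams code: `windowed_inner_join` and `stream_table_join` are hypothetical helpers, records are simple tuples, and the stream-table version assumes its input is already ordered by time.

```python
from collections import defaultdict

def windowed_inner_join(left, right, window_ms):
    """Join (key, ts_ms, value) records with equal keys and timestamps within window_ms."""
    right_by_key = defaultdict(list)
    for key, ts, value in right:
        right_by_key[key].append((ts, value))
    results = []
    for key, ts_left, v_left in left:
        for ts_right, v_right in right_by_key.get(key, []):
            if abs(ts_left - ts_right) <= window_ms:
                results.append((key, v_left, v_right))
    return results

def stream_table_join(timeline):
    """timeline: time-ordered ('table' | 'stream', key, value) entries."""
    table, results = {}, []
    for kind, key, value in timeline:
        if kind == "table":
            table[key] = value                 # table update: latest value wins
        elif key in table:                     # inner join: skip keys with no table row
            results.append((key, value, table[key]))
    return results

orders = [("o1", 1_000, "order-1"), ("o2", 2_000, "order-2")]
payments = [("o1", 30_000, "pay-1"), ("o2", 400_000, "pay-2")]
# o1 matches (29 s apart); o2 does not (398 s apart, outside the 5-minute window)
print(windowed_inner_join(orders, payments, window_ms=300_000))
# [('o1', 'order-1', 'pay-1')]

timeline = [("table", "u1", "free"), ("stream", "u1", "click"),
            ("table", "u1", "gold"), ("stream", "u1", "view")]
# The 'click' result keeps tier 'free' even though the table later changes to 'gold'
print(stream_table_join(timeline))
# [('u1', 'click', 'free'), ('u1', 'view', 'gold')]
```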

Join Performance Formula

$$\text{Join Cost} = O(N \times M) \text{ for stream-stream joins}$$
$$\text{Optimization} = \text{Hash join with window boundaries}$$
def estimate_join_performance(
    stream_a_rate: int,
    stream_b_rate: int,
    window_size_ms: int,
    avg_record_size: int
) -> dict:
    """
    Estimate join performance and memory requirements.
    """
    # Records in window for stream A
    records_in_window_a = stream_a_rate * window_size_ms / 1000
    
    # Records in window for stream B
    records_in_window_b = stream_b_rate * window_size_ms / 1000
    
    # Memory for join state
    memory_for_state = (records_in_window_a + records_in_window_b) * avg_record_size
    
    # Join cost per second
    join_operations_per_sec = stream_a_rate * records_in_window_b
    
    return {
        'records_in_window_a': records_in_window_a,
        'records_in_window_b': records_in_window_b,
        'memory_required_mb': memory_for_state / (1024 * 1024),
        'join_operations_per_sec': join_operations_per_sec,
        'recommended_window_size_ms': min(300000, window_size_ms)
    }

# Example
perf = estimate_join_performance(
    stream_a_rate=1000,  # 1000 orders/sec
    stream_b_rate=500,   # 500 payments/sec
    window_size_ms=300000,  # 5-minute window
    avg_record_size=1000  # 1KB per record
)
print(f"Memory required: {perf['memory_required_mb']:.1f} MB")
print(f"Join operations/sec: {perf['join_operations_per_sec']}")

State Stores

State Store Architecture

┌───────────────────────────────────────────────────────────┐
│                 State Store Architecture                  │
├───────────────────────────────────────────────────────────┤
│                                                           │
│  Processor                                                │
│     ↓                                                     │
│  ┌─────────────────────────────────────────────────────┐  │
│  │                     State Store                     │  │
│  │  ┌─────────────┐      ┌─────────────┐               │  │
│  │  │ RocksDB     │      │ Changelog   │               │  │
│  │  │ (Local)     │      │ Topic       │               │  │
│  │  └─────────────┘      └─────────────┘               │  │
│  │         ↑                    ↑                      │  │
│  │    Read/Write         Backup/Recovery               │  │
│  └─────────────────────────────────────────────────────┘  │
│                                                           │
│  Recovery:                                                │
│  1. Read changelog topic                                  │
│  2. Replay events to rebuild local state                  │
│  3. Resume processing                                     │
└───────────────────────────────────────────────────────────┘

RocksDB State Store

from kafka.streams import Serdes
from kafka.streams.kstream import Materialized
from kafka.streams.state import RocksDBStore, StoreBuilder

# RocksDB state store configuration
state_store_builder = (
    StoreBuilder
    .from_class(RocksDBStore)
    .with_name('my-state-store')
    .with_key_serde(Serdes.String())
    .with_value_serde(JsonSerde())
    .with_logging_enabled()  # Enable changelog
    .with_caching_enabled()  # Enable caching
    .with_metrics_enabled()  # Enable metrics
)

# Build state store
state_store = state_store_builder.build()

# Use in processor
processor = (
    builder.stream('events')
    .group_by_key()
    .aggregate(
        initializer=lambda: {'count': 0, 'sum': 0},
        aggregator=lambda key, value, aggregate: {
            'count': aggregate['count'] + 1,
            'sum': aggregate['sum'] + value['amount']
        },
        materialized=Materialized.from_state_store(state_store)
    )
)

State Store Configuration

# RocksDB configuration
rocksdb_config = {
    # Block cache
    'block.cache.size': 16 * 1024 * 1024,  # 16MB
    
    # Write buffer
    'write.buffer.size': 4 * 1024 * 1024,  # 4MB
    'max.write.buffer.number': 3,
    
    # Compaction
    'level0.file.num.compaction.trigger': 4,
    'level0.stop.writes.trigger': 20,
    'max.bytes.for.level.base': 256 * 1024 * 1024,  # 256MB
    
    # Compression
    'compression.type': 'lz4',
    
    # Bloom filter
    'bloom.filter.bits.per.key': 10,
    
    # Block size
    'block.size': 4096  # 4KB
}

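# Changelog topic configuration (illustrative keys)
changelog_config = {
    'changelog.replication.factor': 3,
    # The partition count is not configurable: a changelog topic gets
    # one partition per task that owns a shard of the store
    'changelog.cleanup.policy': 'compact',  # keep the latest value per key
    'changelog.retention.ms': 604800000     # 7 days; only applies where 'delete' cleanup is also enabled
}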

State Store Metrics

# Key state store metrics
state_metrics = {
    # Size
    'bytes': 'Total bytes in state store',
    'entries': 'Number of entries',
    
    # Performance
    'put-rate': 'Write operations per second',
    'get-rate': 'Read operations per second',
    'delete-rate': 'Delete operations per second',
    'flush-rate': 'Flush operations per second',
    
    # Latency
    'put-latency-avg': 'Average write latency',
    'put-latency-max': 'Maximum write latency',
    'get-latency-avg': 'Average read latency',
    'get-latency-max': 'Maximum read latency',
    
    # Compaction
    'compaction-time': 'Time spent compacting',
    'compaction-bytes': 'Bytes compacted',
    
    # Cache
    'cache-hits': 'Cache hit count',
    'cache-misses': 'Cache miss count',
    'cache-hit-rate': 'Cache hit rate'
}

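⚠️ RocksDB Tuning

RocksDB performance depends heavily on memory allocation. Each store instance (one per task) has its own block cache and write buffers, so budget memory across all instances rather than per store: for state-heavy applications, a total block cache of roughly 25-50% of available memory is a common starting point. Monitor compaction metrics to avoid write stalls.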
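
A rough way to sanity-check a memory budget is to multiply the per-instance settings by the number of RocksDB instances. The sketch below assumes every store instance has its own block cache and write buffers, and it ignores index and filter blocks and the OS page cache, so treat the result as a lower bound. The function name and the instance count of 8 are made up for illustration.

```python
def estimate_rocksdb_memory_mb(num_instances, block_cache_bytes,
                               write_buffer_bytes, max_write_buffers):
    """Estimate off-heap memory (MB) used by block caches and memtables."""
    per_instance = block_cache_bytes + write_buffer_bytes * max_write_buffers
    return num_instances * per_instance / (1024 * 1024)


# 8 store instances, each with a 16 MB block cache and up to three 4 MB write buffers
print(estimate_rocksdb_memory_mb(
    num_instances=8,
    block_cache_bytes=16 * 1024 * 1024,
    write_buffer_bytes=4 * 1024 * 1024,
    max_write_buffers=3,
))
# 224.0
```

The multiplication is the point: settings that look small for one store become significant once they are repeated for every task on the host.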

InMemory State Store

from kafka.streams.state import InMemoryStore, StoreBuilder

# In-memory state store (for testing or small state)
in_memory_builder = (
    StoreBuilder
    .from_class(InMemoryStore)
    .with_name('in-memory-store')
    .with_key_serde(Serdes.String())
    .with_value_serde(JsonSerde())
    .with_logging_disabled()  # No changelog
)

# Use case: temporary aggregation
temp_aggregate = (
    builder.stream('events')
    .group_by_key()
    .windowed_by(TimeWindows.of_size_with_no_grace(60000))
    .aggregate(
        initializer=lambda: 0,
        aggregator=lambda key, value, agg: agg + value['amount'],
        materialized=Materialized.from_state_store(in_memory_builder)
    )
)

State Store Backend Comparison

Backend    Performance  Persistence  Recovery                        Use Case
RocksDB    High         Yes          Changelog                       Production default
InMemory   Highest      No           Changelog (if logging enabled)  Testing, small state
Custom     Varies       Varies       Varies                          Special needs

ℹ️State Store Selection

  • RocksDB: Default for production. Handles large state with disk persistence.
  • InMemory: For testing or when state fits in memory and loss is acceptable.
  • Custom: For integrating with external stores (Redis, Cassandra). Requires implementing StateStore interface.
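
Changelog-based recovery amounts to replaying a compacted log of key/value updates into an empty store. The sketch below uses a plain dict as the store and represents a delete (tombstone) as a `None` value; `restore_from_changelog` is a hypothetical helper, not a Kafka Streams API.

```python
def restore_from_changelog(changelog):
    """Rebuild a key-value store by replaying (key, value) records in order."""
    store = {}
    for key, value in changelog:
        if value is None:
            store.pop(key, None)   # tombstone: the key was deleted
        else:
            store[key] = value     # later records overwrite earlier ones
    return store


changelog = [("a", 1), ("b", 2), ("a", 3), ("b", None)]
print(restore_from_changelog(changelog))
# {'a': 3}
```

Only the last record per key matters for the final state, which is why log compaction can discard older records without affecting recovery.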

Exactly-Once Semantics in Kafka Streams

How EOS Works

from kafka.streams import KafkaStreams, StreamsConfig

# Enable exactly-once semantics
config = StreamsConfig({
    'application.id': 'my-streams-app',
    'bootstrap.servers': 'localhost:9092',
    
    # Exactly-once configuration
    'processing.guarantee': 'exactly_once_v2',  # requires brokers 2.5 or newer
    
    # No further producer or consumer settings are needed: with EOS enabled,
    # Kafka Streams turns on idempotence, assigns transactional IDs, and
    # reads with isolation.level=read_committed on its own.
    # Do not set transactional.id manually.
})

# Build topology
builder = StreamsBuilder()
stream = builder.stream('input-topic')
stream.map_values(transform).to('output-topic')

# Start streams with EOS
streams = KafkaStreams(builder.build(), config)
streams.start()

EOS Processing Guarantee Levels

# Three levels of processing guarantee

# 1. at_least_once (default)
config['processing.guarantee'] = 'at_least_once'
# - Consumer offsets committed after processing
# - If a crash occurs between processing and commit, records are reprocessed,
#   so outputs and state updates may be duplicated

# 2. exactly_once (legacy, deprecated since Kafka 3.0)
config['processing.guarantee'] = 'exactly_once'
# - Uses transactions
# - Output records, changelog writes, and input offsets committed atomically
# - One transactional producer per task, which scales poorly with many partitions

# 3. exactly_once_v2 (recommended)
config['processing.guarantee'] = 'exactly_once_v2'
# - Same guarantee as exactly_once
# - One transactional producer per stream thread
# - Better performance than exactly_once
# - Requires brokers 2.5 or newer; recommended for new applications

EOS Implementation Details

class ExactlyOnceProcessor:
    """
    Processor with exactly-once semantics.
    
    Guarantees:
    1. Each input record processed exactly once
    2. Output records written exactly once
    3. State updates applied exactly once
    4. Consumer offsets committed exactly once
    """
    
    def __init__(self, config):
        self.config = config
        self.state_store = None
    
    def process(self, record):
        """
        Process record with exactly-once semantics.
        
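        Under the hood (once per commit interval, covering every record
        processed in that interval, not once per record):
        1. Begin transaction
        2. Process records
        3. Write to output topics
        4. Update state store (and its changelog topic)
        5. Add consumer offsets to the transaction
        6. Commit transaction
        
        If any step fails:
        1. Abort transaction
        2. No output becomes visible to read_committed consumers
        3. State changes are discarded
        4. Offsets not committed
        5. Records reprocessed on retry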
        """
        try:
            # Begin transaction (automatic with EOS)
            # Process record
            result = self._transform(record.value())
            
            # Write to output topic
            self._send_to_output(record.key(), result)
            
            # Update state store
            self._update_state(record.key(), result)
            
            # Consumer offsets committed automatically
            # with transaction
            
        except Exception as e:
            # Transaction aborted automatically
            # Record will be reprocessed
            log.error(f"Processing failed: {e}")
            raise
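
The all-or-nothing behaviour described in the docstring can be imitated without Kafka by staging every effect and publishing them together. This is a toy model under stated assumptions: `process_batch` and `flaky_double` are hypothetical, the "transaction" is just local staging, and a real application gets this behaviour from the broker's transaction protocol rather than from code like this.

```python
def process_batch(records, transform, output_log, state, offsets):
    """Apply a batch of (offset, key, value) records atomically.

    Effects are staged first and published only if every record succeeds,
    mimicking how a transaction exposes outputs, state and offsets together.
    """
    staged_output, staged_state, next_offset = [], {}, offsets["committed"]
    for offset, key, value in records:
        result = transform(value)        # may raise: nothing is published yet
        staged_output.append((key, result))
        staged_state[key] = result
        next_offset = offset + 1
    # "Commit": all three effects become visible together
    output_log.extend(staged_output)
    state.update(staged_state)
    offsets["committed"] = next_offset


calls = {"n": 0}

def flaky_double(value):
    """Hypothetical transform that crashes on its second call only."""
    calls["n"] += 1
    if calls["n"] == 2:
        raise RuntimeError("simulated crash")
    return value * 2


output_log, state, offsets = [], {}, {"committed": 0}
batch = [(0, "k1", 1), (1, "k2", 2), (2, "k3", 3)]

try:
    process_batch(batch, flaky_double, output_log, state, offsets)
except RuntimeError:
    pass  # "abort": staged effects are simply discarded

print(output_log, offsets)   # [] {'committed': 0}

process_batch(batch, flaky_double, output_log, state, offsets)  # retry
print(output_log, offsets)   # [('k1', 2), ('k2', 4), ('k3', 6)] {'committed': 3}
```

The first record is processed twice, but its effects are published once, which is the property that matters.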

EOS Performance Formula

$$\text{EOS Overhead} = \text{Transaction Latency} + \text{Coordinator RTT}$$
$$\text{Throughput Impact} = \frac{1}{\text{Transaction Latency (ms)}} \times 1000 \text{ txns/sec}$$
def calculate_eos_overhead(
    records_per_sec: int,
    batch_size: int,
    transaction_latency_ms: float,
    coordinator_rtt_ms: float
) -> dict:
    """
    Calculate EOS overhead for Kafka Streams.
    """
    # Transactions per second
    transactions_per_sec = records_per_sec / batch_size
    
    # Transaction overhead
    transaction_overhead_per_sec = transactions_per_sec * transaction_latency_ms / 1000
    
    # Coordinator RTT overhead
    coordinator_overhead_per_sec = transactions_per_sec * coordinator_rtt_ms / 1000
    
    # Total overhead
    total_overhead_per_sec = transaction_overhead_per_sec + coordinator_overhead_per_sec
    
    # Throughput reduction: fraction of each second spent waiting on
    # transactions, assuming commits block processing; capped at 100%
    throughput_reduction = min(1.0, total_overhead_per_sec)
    
    return {
        'transactions_per_sec': transactions_per_sec,
        'transaction_overhead_per_sec': transaction_overhead_per_sec,
        'coordinator_overhead_per_sec': coordinator_overhead_per_sec,
        'total_overhead_per_sec': total_overhead_per_sec,
        'throughput_reduction_percent': throughput_reduction * 100,
        'recommended_batch_size': max(100, int(records_per_sec * transaction_latency_ms / 1000))
    }

# Example
overhead = calculate_eos_overhead(
    records_per_sec=100000,
    batch_size=10000,  # records per transaction: one commit every 100 ms at this rate
    transaction_latency_ms=10,
    coordinator_rtt_ms=5
)
print(f"Transactions/sec: {overhead['transactions_per_sec']:.0f}")
print(f"Throughput reduction: {overhead['throughput_reduction_percent']:.1f}%")
print(f"Recommended batch size: {overhead['recommended_batch_size']}")

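⚠️ EOS Trade-off

EOS costs throughput because every commit goes through transaction coordination; figures of 5-15% are commonly quoted, but the real cost depends on commit.interval.ms and the workload. Use EOS when duplicate processing is expensive. For many use cases, at-least-once with idempotent processing is sufficient.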
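
Idempotent processing under at-least-once usually means making a redelivered record a no-op. One common approach, sketched here, is to remember the IDs of records already applied. The class name and the record IDs are hypothetical, and the ID set grows without bound in this sketch, whereas a real system would expire old IDs or rely on a naturally idempotent write such as an upsert.

```python
class IdempotentTotal:
    """Running total that ignores records it has already applied."""

    def __init__(self):
        self.seen_ids = set()   # unbounded here; a real system would expire old IDs
        self.total = 0

    def apply(self, record_id, amount):
        """Apply the record once; return False if it was a duplicate."""
        if record_id in self.seen_ids:
            return False
        self.seen_ids.add(record_id)
        self.total += amount
        return True


totals = IdempotentTotal()
# 'r1' is delivered twice, as can happen after a crash under at-least-once
deliveries = [("r1", 10), ("r2", 5), ("r1", 10)]
applied = [totals.apply(record_id, amount) for record_id, amount in deliveries]
print(applied, totals.total)
# [True, True, False] 15
```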

Complete Kafka Streams Application

from kafka.streams import KafkaStreams, StreamsBuilder, StreamsConfig, Serdes
from kafka.streams.kstream import TimeWindows, JoinWindows, Joined, StreamJoined
# JsonSerde is assumed to be a user-provided JSON serde

def build_exactly_once_application():
    """
    Complete Kafka Streams application with EOS.
    
    Features:
    - Exactly-once semantics
    - Windowed aggregation
    - Stream-table join
    - State store with RocksDB
    """
    config = StreamsConfig({
        'application.id': 'user-analytics',
        'bootstrap.servers': 'kafka1:9092,kafka2:9092,kafka3:9092',
        
        # EOS configuration (once this is set, Streams manages idempotence,
        # transactional IDs, and read_committed isolation itself)
        'processing.guarantee': 'exactly_once_v2',
        
        # State store configuration
        'state.dir': '/var/kafka/streams-state',
        # RocksDB itself is tuned through a 'rocksdb.config.setter' class,
        # not through a plain block-cache key
        
        # Consumer configuration
        'auto.offset.reset': 'earliest',
        
        # Performance tuning
        'num.stream.threads': 4,
        'replication.factor': 3,
        'commit.interval.ms': 100,  # also the default under EOS
        'cache.max.bytes.buffering': 10 * 1024 * 1024
    })
    
    builder = StreamsBuilder()
    
    # Input streams
    page_views = builder.stream(
        'page-views',
        key_serde=Serdes.String(),
        value_serde=JsonSerde()
    )
    
    user_actions = builder.stream(
        'user-actions',
        key_serde=Serdes.String(),
        value_serde=JsonSerde()
    )
    
    # User table for enrichment
    users = builder.table(
        'users',
        key_serde=Serdes.String(),
        value_serde=JsonSerde()
    )
    
    # Windowed page view counts
    windowed_counts = (
        page_views
        .group_by_key()
        .windowed_by(TimeWindows.of_size_and_grace(300000, 60000))  # 5-min window, 1-min grace
        .count()
    )
    
    # Join page views with user actions
    enriched_views = (
        page_views.join(
            user_actions,
            value_mapper=lambda pv, ua: {
                'page': pv['page'],
                'action': ua['action'],
                'timestamp': pv['timestamp']
            },
            window=JoinWindows.of_time_difference_and_grace(60000, 10000),
            joined=StreamJoined.with_key_serde(Serdes.String())
        )
    )
    
    # Enrich with user data
    final_stream = (
        enriched_views.join(
            users,
            value_mapper=lambda view, user: {
                **view,
                'user_name': user.get('name'),
                'user_tier': user.get('tier')
            },
            joined=Joined.with_key_serde(Serdes.String())
        )
    )
    
    # Output
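    # Keys here are windowed (original key plus window bounds); in the Java
    # API this needs a windowed key serde. Values are counts, not JSON.
    windowed_counts.to_stream().to(
        'windowed-counts',
        key_serde=Serdes.String(),
        value_serde=Serdes.Long()
    )
    
    final_stream.to(
        'enriched-events',
        key_serde=Serdes.String(),
        value_serde=JsonSerde()
    )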
    
    return KafkaStreams(builder.build(), config)

# Run application
streams = build_exactly_once_application()
streams.start()

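# Graceful shutdown
import signal
def shutdown_handler(sig, frame):
    # close() stops the threads and commits cleanly. Do not call clean_up()
    # here: it deletes local state and forces a full restore from the changelog.
    streams.close()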

signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)

ℹ️Production Tip

Monitor commit-latency-avg and records-processed-rate for EOS applications. High commit latency indicates transaction coordination overhead. Adjust commit.interval.ms and batch.size to optimize.
