Kafka Streams Architecture

Difficulty: Senior Level | Companies: LinkedIn, Uber, Netflix, Spotify, Confluent

Content

Kafka Streams is a client library for building stream processing applications on top of Kafka. It runs inside your application process rather than on a separate processing cluster, and it provides stateful processing, windowing, and exactly-once semantics.

Architecture Overview

Architecture Diagram
Kafka Streams Application:
┌─────────────────────────────────────────────────┐
│  Stream Thread 1                                │
│  ┌─────────────┐   ┌─────────────┐              │
│  │ Source      │   │ Processor   │              │
│  │ (Consumer)  │──▶│ (Transform) │              │
│  └─────────────┘   └─────────────┘              │
│         │                 │                     │
│         ▼                 ▼                     │
│  ┌─────────────┐   ┌─────────────┐              │
│  │ State Store │   │ Sink        │              │
│  │ (RocksDB)   │   │ (Producer)  │              │
│  └─────────────┘   └─────────────┘              │
└─────────────────────────────────────────────────┘

Processing Topology

Architecture Diagram
KStream (unbounded stream)
    │
    ├── Filter
    ├── Map
    ├── FlatMap
    └── WindowedAggregate
            │
            ▼
    KTable (changelog-backed state)
            │
            ├── Join
            └── Aggregate
                    │
                    ▼
            Output KStream
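
The step from KStream to KTable in the diagram can be illustrated without Kafka at all. The following is a minimal sketch in plain Java, not the Kafka Streams API: the class name `StreamTableDuality` and the method `countByKey` are hypothetical, and the "changelog" is just an in-memory list. It shows the idea that a table holds the latest value per key while every update to it forms a stream of change records.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; not part of Kafka Streams.
final class StreamTableDuality {

    /**
     * Counts records per key. The returned map plays the role of the table,
     * and each update is appended to the changelog list as "key=count".
     */
    static Map<String, Long> countByKey(List<String> keys, List<String> changelog) {
        Map<String, Long> table = new LinkedHashMap<>();
        for (String key : keys) {
            long updated = table.merge(key, 1L, Long::sum);
            changelog.add(key + "=" + updated);
        }
        return table;
    }

    public static void main(String[] args) {
        List<String> changelog = new ArrayList<>();
        Map<String, Long> table = countByKey(List.of("alice", "bob", "alice"), changelog);
        System.out.println("table     = " + table);      // {alice=2, bob=1}
        System.out.println("changelog = " + changelog);  // [alice=1, bob=1, alice=2]
    }
}
```

Replaying the changelog from the start rebuilds the same table, which is the intuition behind changelog-backed state.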

Basic Kafka Streams Application

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

import java.util.Properties;

public class StreamProcessor {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
                  Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
                  Serdes.StringSerde.class);
        
        // Exactly-once semantics
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
                  StreamsConfig.EXACTLY_ONCE_V2);
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Source stream
        KStream<String, String> orders = builder.stream("orders");
        
        // Transform stream
        KStream<String, String> processed = orders
            .filter((key, value) -> value != null)
            .mapValues(value -> value.toUpperCase())
            .peek((key, value) -> System.out.println("Processing: " + key));
        
        // Sink
        processed.to("processed-orders");
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        
        // Graceful shutdown: register the hook before starting so close() runs on exit
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        
        streams.start();
    }
}

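Python Equivalent (Faust)

import faust

# Faust is a separate Python library modeled on Kafka Streams, not a port of it
app = faust.App(
    'stream-processor',
    # Multiple brokers are separated by semicolons
    broker='kafka://kafka1:9092;kafka://kafka2:9092',
    processing_guarantee='exactly_once',
)

# Record schema (field names are illustrative)
class Order(faust.Record):
    user_id: str
    item: str

# Define input and output topics
orders_topic = app.topic('orders', value_type=Order)
processed_orders = app.topic('processed-orders', value_type=Order)

# Define table (state store backed by a changelog topic)
order_counts = app.Table('order-counts', default=int, partitions=4)

def transform(order: Order) -> Order:
    # Placeholder transformation: upper-case the item name
    return Order(user_id=order.user_id, item=order.item.upper())

@app.agent(orders_topic)
async def process_orders(stream):
    # Assumes the orders topic is keyed by user_id
    async for order in stream:
        # Update state
        order_counts[order.user_id] += 1

        # Send to output topic
        await processed_orders.send(key=order.user_id, value=transform(order))

@app.task
async def on_start():
    print("Stream processor started")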

State Stores

RocksDB State Store

// Aggregate per key into a named, RocksDB-backed key-value store
KTable<String, OrderSummary> orderSummary = orders
    .groupByKey()
    .aggregate(
        OrderSummary::new,
        // addOrder must return the updated OrderSummary
        (key, order, summary) -> summary.addOrder(order),
        Materialized.<String, OrderSummary, KeyValueStore<Bytes, byte[]>>as(
            "order-summary-store"
        )
        .withKeySerde(Serdes.String())
        .withValueSerde(new OrderSummarySerde())
        // withRetention() applies only to window and session stores, so it is omitted here
    );

Custom State Store

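// Custom in-memory state store (sketch using the StateStoreContext-based init)
public class CustomStateStore implements StateStore {
    private final String name;
    private final Map<String, byte[]> store = new ConcurrentHashMap<>();
    private volatile boolean open = false;
    
    public CustomStateStore(String name) {
        this.name = name;
    }
    
    @Override
    public String name() {
        return name;
    }
    
    // On Kafka 3.x the deprecated init(ProcessorContext, StateStore) overload
    // may also need to be implemented.
    @Override
    public void init(StateStoreContext context, StateStore root) {
        // Register the store with a callback that rebuilds it from the changelog
        context.register(root, (key, value) -> {
            String k = new String(key, StandardCharsets.UTF_8);
            if (value == null) {
                store.remove(k);  // Tombstone: the key was deleted
            } else {
                store.put(k, value);
            }
        });
        open = true;
    }
    
    @Override
    public void flush() {
        // Nothing to flush: the data lives only in memory
    }
    
    @Override
    public void close() {
        // Cleanup resources
        store.clear();
        open = false;
    }
    
    @Override
    public boolean persistent() {
        return false;
    }
    
    @Override
    public boolean isOpen() {
        return open;
    }
    
    public byte[] get(String key) {
        return store.get(key);
    }
    
    public void put(String key, byte[] value) {
        store.put(key, value);
    }
}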

Windowing Operations

Tumbling Windows

// Fixed-size, non-overlapping windows
KTable<Windowed<String>, Long> tumblingCounts = orders
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count();

// Window boundaries:
// [00:00, 00:05), [00:05, 00:10), [00:10, 00:15), ...
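
The boundaries listed above follow from aligning windows to the epoch. As a rough sketch in plain Java (the class `TumblingWindowMath` and its method are hypothetical names, not Kafka Streams API, and timestamps are assumed to be non-negative epoch milliseconds), the window that a record falls into can be computed like this:

```java
// Hypothetical helper; illustrates epoch-aligned tumbling windows only.
final class TumblingWindowMath {

    /** Returns the inclusive start of the tumbling window containing timestampMs. */
    static long windowStart(long timestampMs, long sizeMs) {
        // Integer division rounds down to the nearest multiple of the window size
        return (timestampMs / sizeMs) * sizeMs;
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;
        long sevenMinutes = 7 * 60 * 1000L;
        long start = windowStart(sevenMinutes, fiveMinutes);
        // A record at 00:07 lands in [00:05, 00:10)
        System.out.println("[" + start + ", " + (start + fiveMinutes) + ")");
    }
}
```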

Hopping Windows

// Fixed-size, overlapping windows
KTable<Windowed<String>, Long> hoppingCounts = orders
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10))
        .advanceBy(Duration.ofMinutes(5)))
    .count();

// Window boundaries:
// [00:00, 00:10), [00:05, 00:15), [00:10, 00:20), ...
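
Because hopping windows overlap, a single record can belong to several of them. The sketch below, in plain Java with hypothetical names (`HoppingWindowMath`, `windowStarts`), lists the start of every window that contains a given timestamp. It assumes epoch-aligned windows and non-negative millisecond timestamps, and it is an illustration of the arithmetic rather than the library's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; illustrates which hopping windows contain a timestamp.
final class HoppingWindowMath {

    /** Returns the start of every window [start, start + sizeMs) that contains timestampMs. */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest epoch-aligned window that can still contain the timestamp
        long first = (Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        for (long start = first; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60 * 1000L;
        // A record at 00:12 with 10-minute windows advancing by 5 minutes
        System.out.println(windowStarts(12 * minute, 10 * minute, 5 * minute));
    }
}
```

With the values in `main`, the record at 00:12 falls into the windows starting at 00:05 and 00:10, matching the boundaries listed above.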

Sliding Windows

// Windows defined by record timestamps: each window covers records within 5 minutes of each other
KTable<Windowed<String>, Long> slidingCounts = orders
    .groupByKey()
    .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)))
    .count();

Session Windows

// Dynamic windows based on activity
KTable<Windowed<String>, Long> sessionCounts = orders
    .groupByKey()
    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(10)))
    .count();

// Records merge into one session when the gap between them is within the inactivity gap
// Good for user activity tracking
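
To make the merging rule concrete, here is a small sketch in plain Java that groups the timestamps of one key into sessions. The names `SessionMerge` and `sessions` are hypothetical, the input is assumed to be sorted in ascending order, and a gap exactly equal to the limit is treated as still inside the session, which is an assumption of this sketch. Real session windows also handle out-of-order records, which this does not attempt.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; illustrates session merging for a single key.
final class SessionMerge {

    /** Groups ascending timestamps into sessions, each returned as {start, end}. */
    static List<long[]> sessions(long[] sortedTimestamps, long gap) {
        List<long[]> result = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            if (!result.isEmpty() && ts - result.get(result.size() - 1)[1] <= gap) {
                result.get(result.size() - 1)[1] = ts;  // Extend the current session
            } else {
                result.add(new long[] {ts, ts});        // Start a new session
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Timestamps in minutes for readability; inactivity gap of 10 minutes
        for (long[] s : sessions(new long[] {0, 4, 9, 25, 30}, 10)) {
            System.out.println("[" + s[0] + ", " + s[1] + "]");
        }
    }
}
```

With this input the activity at minutes 0, 4, and 9 forms one session, and the 16-minute pause before minute 25 starts a second one.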

Join Operations

KStream-KStream Join

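// Join two streams on key; records match only if their timestamps fall within the join window
KStream<String, EnrichedOrder> enrichedOrders = orders
    .join(
        userProfiles,
        (order, profile) -> new EnrichedOrder(order, profile),
        // 5-minute window chosen for illustration
        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
        StreamJoined.with(Serdes.String(), new OrderSerde(), new ProfileSerde())
    );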

KStream-KTable Join

// Enrich stream with table lookup
KStream<String, EnrichedOrder> enrichedOrders = orders
    .join(
        orderSummaryTable,
        (order, summary) -> new EnrichedOrder(order, summary),
        Joined.with(Serdes.String(), new OrderSerde(), new SummarySerde())
    );

Left Join

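// Include orders even if no profile arrives within the join window (profile is then null)
KStream<String, EnrichedOrder> enrichedOrders = orders
    .leftJoin(
        userProfiles,
        (order, profile) -> new EnrichedOrder(order, profile),
        // 5-minute window chosen for illustration
        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
        StreamJoined.with(Serdes.String(), new OrderSerde(), new ProfileSerde())
    );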

Key Insight: Join windows are required for KStream-KStream joins. KStream-KTable joins are lookup-based and don't require windows.
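
The difference can be sketched in plain Java without the Kafka Streams API. Everything here is hypothetical (`JoinSketch`, `Event`, and both method names), and the nested loop is only for clarity; the library uses windowed state stores instead. The point is that the stream-stream variant needs a time bound to decide which records may pair up, while the stream-table variant simply reads the current value for the key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the two join styles; not Kafka Streams code.
final class JoinSketch {

    static final class Event {
        final String key;
        final String value;
        final long timestampMs;

        Event(String key, String value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    /** Stream-stream: pair events with equal keys whose timestamps differ by at most windowMs. */
    static List<String> streamStreamJoin(List<Event> left, List<Event> right, long windowMs) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean inWindow = Math.abs(l.timestampMs - r.timestampMs) <= windowMs;
                if (sameKey && inWindow) {
                    out.add(l.value + "+" + r.value);
                }
            }
        }
        return out;
    }

    /** Stream-table: look up the current table value for each event key; no window involved. */
    static List<String> streamTableJoin(List<Event> stream, Map<String, String> table) {
        List<String> out = new ArrayList<>();
        for (Event e : stream) {
            String match = table.get(e.key);
            if (match != null) {
                out.add(e.value + "+" + match);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> orders = List.of(
            new Event("u1", "order-1", 1_000L),
            new Event("u1", "order-2", 700_000L));
        List<Event> clicks = List.of(new Event("u1", "click", 2_000L));

        // Only order-1 is within 5 minutes of the click
        System.out.println(streamStreamJoin(orders, clicks, 300_000L));
        // Both orders find the table entry regardless of time
        System.out.println(streamTableJoin(orders, Map.of("u1", "gold")));
    }
}
```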

Exactly-Once in Streams

// Enable exactly-once processing
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
          StreamsConfig.EXACTLY_ONCE_V2);

// What happens:
// 1. Producer transactions enabled automatically
// 2. Consumer offsets committed within transactions
// 3. State store updates atomic with output
// 4. Changelog topics for fault tolerance

Error Handling

// Handle deserialization errors with the custom handler below
// (LogAndContinueExceptionHandler and LogAndFailExceptionHandler are the built-in options)
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
          CustomExceptionHandler.class);

// Custom exception handler
public class CustomExceptionHandler implements DeserializationExceptionHandler {
    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed
    }

    @Override
    public DeserializationHandlerResponse handle(
            ProcessorContext context,
            ConsumerRecord<byte[], byte[]> record,
            Exception exception) {
        // Log the error; the raw record could also be forwarded to a dead letter topic
        System.err.println("Deserialization failed for topic " + record.topic()
            + " at offset " + record.offset() + ": " + exception.getMessage());
        
        // Option 1: Skip the record and continue processing
        return DeserializationHandlerResponse.CONTINUE;
        
        // Option 2: Stop processing
        // return DeserializationHandlerResponse.FAIL;
    }
}

Follow-Up Questions

  1. What is the difference between KStream and KTable?
  2. How does Kafka Streams handle state store fault tolerance?
  3. Explain the difference between tumbling, hopping, and session windows.
  4. What happens during a rebalance in a Kafka Streams application?
  5. How does exactly-once semantics work in Kafka Streams?
