Kafka Streams: Real-Time Stream Processing with the DSL
Kafka Streams is a client library for building real-time stream processing applications that consume from and produce to Kafka topics.
Why Kafka Streams?
Key Advantages:
- No separate cluster required
- Runs as an embedded library within your application
- Leverages Kafka's own partitioning for parallelism and fault tolerance
Capabilities:
- Exactly-once semantics: reliable processing
- Stateful aggregations: maintain state across events
- Windowed operations: time-based grouping
- Interactive queries: query state stores directly
Key Insight: For organizations already invested in Kafka, Streams eliminates the operational overhead of maintaining Flink or Spark clusters.
The Kafka Streams domain-specific language (DSL) is a high-level API for defining stream processing topologies. It provides operations such as map, filter, flatMap, groupBy, windowedBy, aggregate, join, and merge. The DSL compiles into a processor topology: a directed graph of processors connected by streams and tables, which the Kafka Streams runtime executes.
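To make the "DSL compiles into a topology" idea concrete, here is a toy, hypothetical mini-DSL in plain Python (`MiniTopology` is invented for illustration and is not part of any Kafka library). Each DSL call only appends a named processor node; nothing runs until records are pushed through. Unlike a real Kafka Streams topology, it supports only a linear chain, with no branching, state stores, or repartition topics.

```python
from typing import Callable, Iterable, List, Tuple

Record = Tuple[str, str]  # (key, value)
Step = Callable[[Record], List[Record]]


class MiniTopology:
    """Hypothetical toy DSL: each call appends a processor node; nothing runs until run()."""

    def __init__(self, source_topic: str):
        self.names: List[str] = [f"SOURCE({source_topic})"]
        self.steps: List[Step] = []

    def filter(self, pred: Callable[[str, str], bool]) -> "MiniTopology":
        self.names.append("FILTER")
        self.steps.append(lambda r: [r] if pred(r[0], r[1]) else [])
        return self

    def map_values(self, fn: Callable[[str], str]) -> "MiniTopology":
        self.names.append("MAPVALUES")
        self.steps.append(lambda r: [(r[0], fn(r[1]))])
        return self

    def flat_map_values(self, fn: Callable[[str], Iterable[str]]) -> "MiniTopology":
        self.names.append("FLATMAPVALUES")
        self.steps.append(lambda r: [(r[0], v) for v in fn(r[1])])
        return self

    def to(self, topic: str) -> "MiniTopology":
        self.names.append(f"SINK({topic})")
        return self

    def describe(self) -> str:
        return " -> ".join(self.names)

    def run(self, records: Iterable[Record]) -> List[Record]:
        """Push one input record at a time through the whole processor chain."""
        out: List[Record] = []
        for record in records:
            batch = [record]
            for step in self.steps:
                batch = [produced for r in batch for produced in step(r)]
            out.extend(batch)
        return out


topo = (
    MiniTopology("clicks")
    .filter(lambda k, v: v != "")
    .flat_map_values(lambda v: v.split(","))
    .map_values(str.upper)
    .to("clicks-upper")
)
print(topo.describe())  # SOURCE(clicks) -> FILTER -> FLATMAPVALUES -> MAPVALUES -> SINK(clicks-upper)
print(topo.run([("u1", "a,b"), ("u2", ""), ("u3", "c")]))  # [('u1', 'A'), ('u1', 'B'), ('u3', 'C')]
```

Separating "describe the graph" from "execute the graph" is what lets the real runtime split a topology into tasks and distribute them across instances.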
A stream represents an unbounded sequence of events (inserts). A table represents the latest value for each key (upserts). The stream-table duality states that: (1) a stream can be reduced to a table by applying updates per key, and (2) a table can be represented as a changelog stream of updates. This duality is fundamental to Kafka Streams: KStream is the stream abstraction, KTable is the table abstraction, and GlobalKTable is a fully-replicated table available to all application instances.
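The duality can be sketched in a few lines of Python, assuming a `None` value plays the role of a tombstone (Kafka Streams treats a null value in a KTable's input as a delete). The helper names are hypothetical.

```python
from typing import Dict, Iterable, List, Optional, Tuple

Event = Tuple[str, Optional[str]]  # (key, value); value None marks a delete (tombstone)


def stream_to_table(stream: Iterable[Event]) -> Dict[str, str]:
    """Reduce a stream to a table: keep the latest value per key; None deletes the key."""
    table: Dict[str, str] = {}
    for key, value in stream:
        if value is None:
            table.pop(key, None)
        else:
            table[key] = value
    return table


def table_to_changelog(old: Dict[str, str], new: Dict[str, str]) -> List[Event]:
    """Express the difference between two table snapshots as a changelog of upserts and deletes."""
    changes: List[Event] = [(k, v) for k, v in new.items() if old.get(k) != v]
    changes += [(k, None) for k in old if k not in new]
    return changes


clicks = [("alice", "page1"), ("bob", "page2"), ("alice", "page3"), ("bob", None)]
after = stream_to_table(clicks)
print(after)  # {'alice': 'page3'}

before = {"alice": "page1", "bob": "page2"}
changelog = table_to_changelog(before, after)
print(changelog)  # [('alice', 'page3'), ('bob', None)]

# Replaying the changelog on top of the old snapshot rebuilds the new table
assert stream_to_table(list(before.items()) + changelog) == after
```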
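A windowed aggregation groups events by time window before applying an aggregate function. Given a stream of timestamped events, a window specification defines the grouping: an event at time t belongs to every window that contains t. Common window types: (1) Tumbling: fixed-size, non-overlapping windows of size w aligned to the epoch, so each event falls in exactly one window [k*w, (k+1)*w) with k = floor(t/w); (2) Sliding: fixed-size, overlapping windows, so one event can fall in several windows (Kafka Streams builds fixed-advance overlapping windows, called hopping windows, with TimeWindows plus advanceBy(), and also offers SlidingWindows, whose windows are defined by a maximum time difference between records and include both bounds); (3) Session: dynamic windows that stay open as long as new events arrive within an inactivity gap.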
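A minimal sketch of how each window type assigns timestamps (in milliseconds), assuming epoch-aligned windows as in Kafka Streams' TimeWindows. The hopping helper covers fixed-advance overlapping windows; Kafka Streams' SlidingWindows use a different construction and are not modeled here. The session helper sorts its input first, whereas Kafka Streams merges sessions incrementally as records arrive, including out-of-order ones. All function names are hypothetical.

```python
from typing import List, Tuple

Window = Tuple[int, int]  # (start, end) in milliseconds


def tumbling_window(t: int, size: int) -> Window:
    """Epoch-aligned tumbling window [start, start + size): each timestamp falls in exactly one."""
    start = t - (t % size)
    return (start, start + size)


def hopping_windows(t: int, size: int, advance: int) -> List[Window]:
    """Fixed-size windows starting every `advance` ms; t belongs to every one with start <= t < end."""
    windows: List[Window] = []
    start = t - (t % advance)  # latest window start that is <= t
    while start > t - size:
        if start >= 0:  # no windows with negative start times
            windows.append((start, start + size))
        start -= advance
    return sorted(windows)


def session_windows(timestamps: List[int], gap: int) -> List[Window]:
    """Group event times into sessions (first event, last event); a gap larger than `gap` starts a new one."""
    sessions: List[Window] = []
    for t in sorted(timestamps):
        if sessions and t - sessions[-1][1] <= gap:
            sessions[-1] = (sessions[-1][0], t)  # extend the current session
        else:
            sessions.append((t, t))  # start a new session
    return sessions


print(tumbling_window(7_000, 5_000))          # (5000, 10000)
print(hopping_windows(7_000, 10_000, 5_000))  # [(0, 10000), (5000, 15000)]
print(session_windows([1_000, 2_000, 10_000, 11_000, 40_000], 5_000))
# [(1000, 2000), (10000, 11000), (40000, 40000)]
```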
Exactly-once semantics guarantee that the effects of each event (output records and state updates) are applied exactly once in a stream processing pipeline, even if the event is physically reprocessed after a failure. Kafka Streams achieves EOS using: (1) idempotent producers to prevent duplicate writes, (2) transactional producers to atomically write output records and commit input offsets, and (3) the consumer isolation level read_committed to ensure only committed records are read. EOS is enabled via processing.guarantee=exactly_once_v2.
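The interplay between transactions and read_committed can be illustrated with a toy in-memory model. `ToyTransactionalLog` is hypothetical and ignores Kafka's real protocol (transaction markers, producer epochs, the transaction coordinator). It shows the two properties the mechanism relies on: output records and input offsets commit or abort together, and a read_committed reader never sees aborted data and does not read past a still-open transaction.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set, Tuple


@dataclass
class ToyTransactionalLog:
    """Hypothetical, simplified model of one output partition plus transaction state."""

    entries: List[Tuple[Optional[str], str]] = field(default_factory=list)  # (txn_id, value)
    committed: Set[str] = field(default_factory=set)
    aborted: Set[str] = field(default_factory=set)
    pending_offsets: Dict[str, int] = field(default_factory=dict)
    consumer_offset: int = 0  # committed position on the input topic

    def write(self, value: str, txn_id: Optional[str] = None) -> None:
        self.entries.append((txn_id, value))

    def send_offsets(self, txn_id: str, offset: int) -> None:
        # Input offsets ride inside the same transaction as the output records
        self.pending_offsets[txn_id] = offset

    def commit(self, txn_id: str) -> None:
        self.committed.add(txn_id)
        self.consumer_offset = self.pending_offsets.pop(txn_id, self.consumer_offset)

    def abort(self, txn_id: str) -> None:
        self.aborted.add(txn_id)
        self.pending_offsets.pop(txn_id, None)  # offsets of an aborted transaction are discarded

    def read(self, isolation_level: str = "read_uncommitted") -> List[str]:
        if isolation_level == "read_uncommitted":
            return [value for _, value in self.entries]
        visible: List[str] = []
        for txn_id, value in self.entries:
            if txn_id is None or txn_id in self.committed:
                visible.append(value)
            elif txn_id not in self.aborted:
                break  # open transaction: read_committed stops at the last stable offset
        return visible


log = ToyTransactionalLog()
# Attempt 1 processes input offset 0 but fails before committing, so it aborts
log.write("out-A", txn_id="t1")
log.send_offsets("t1", 1)
log.abort("t1")
# The retry reprocesses the same input record and commits output and offset atomically
log.write("out-A", txn_id="t2")
log.send_offsets("t2", 1)
log.commit("t2")
# A later transaction is still open
log.write("out-B", txn_id="t3")

print(log.read("read_uncommitted"))  # ['out-A', 'out-A', 'out-B']
print(log.read("read_committed"))    # ['out-A']
print(log.consumer_offset)           # 1
```

A read_uncommitted reader sees the duplicate from the aborted attempt; a read_committed reader sees the output exactly once.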
Windowed Aggregation Throughput
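For a stream with input rate R (events/second), window size W (seconds), and an aggregation function f with O(1) cost per event, the pipeline keeps pace with its input as long as that per-event cost fits within the available processing capacity, so Throughput = R events/second. If the aggregate retains every raw event (for example, an aggregator that collects events into a list), the state for a tumbling window is approximately State_size = R * W * avg_event_size bytes. For sliding windows with average overlap factor k (the number of windows each event belongs to): State_size = R * W * k * avg_event_size. For O(1) aggregates such as count or sum, state instead scales with the number of distinct keys per window times the number of retained windows.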
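Plugging illustrative numbers into these estimates helps size disks before deployment. The rates, sizes, and per-entry overhead below are hypothetical, and the second helper is an assumed rough model for O(1) aggregates such as count, where state depends on distinct keys rather than on raw events.

```python
def raw_event_state_bytes(rate_eps: int, window_s: int, avg_event_bytes: int, overlap_k: int = 1) -> int:
    """State_size = R * W * k * avg_event_size; k = 1 for tumbling windows."""
    return rate_eps * window_s * overlap_k * avg_event_bytes


def per_key_state_bytes(distinct_keys: int, retained_windows: int, bytes_per_entry: int) -> int:
    """Rough size when the aggregate is O(1) per key and window (e.g. count or sum)."""
    return distinct_keys * retained_windows * bytes_per_entry


# Example: 10,000 events/s, 5-minute windows, 200-byte events
tumbling = raw_event_state_bytes(10_000, 300, 200)
sliding = raw_event_state_bytes(10_000, 300, 200, overlap_k=3)
# Hypothetical: 50,000 users, 2 retained windows, ~64 bytes per key/window entry
counts = per_key_state_bytes(50_000, 2, 64)
print(tumbling / 1e6, sliding / 1e6, counts / 1e6)  # 600.0 1800.0 6.4  (megabytes)
```

The gap between 600 MB and 6.4 MB is why a count or sum is far cheaper to keep than a list of raw events.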
State Store Changelog Recovery Time
Recovery time after a failure is Recovery_time = Changelog_lag / R_replay, where Changelog_lag is the number of changelog entries to replay and R_replay is the changelog replay rate (typically 50K-200K records/second per partition on SSD). To minimize recovery time, enable standby replicas: num.standby.replicas=1 keeps one warm copy of each state store on another application instance, so failover only has to replay the standby's small remaining lag.
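A quick calculation with the replay rates quoted above shows why standbys matter. The 30-million-record changelog and the 10,000-record standby lag are hypothetical figures chosen for illustration.

```python
def recovery_time_s(changelog_lag_records: int, replay_rate_rps: float) -> float:
    """Recovery_time = Changelog_lag / replay rate, in seconds."""
    return changelog_lag_records / replay_rate_rps


# Cold restore of a store whose changelog holds 30 million records (hypothetical size)
for rate in (50_000, 200_000):
    print(f"cold restore at {rate:,} rec/s: {recovery_time_s(30_000_000, rate):.0f} s")
# A warm standby only replays what it has not yet applied (hypothetical 10,000-record lag)
print(f"standby catch-up: {recovery_time_s(10_000, 50_000):.1f} s")
```

This prints 600 s and 150 s for the cold restores versus 0.2 s for the standby.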
Kafka Streams provides exactly-once semantics for the complete input-process-output cycle when processing.guarantee=exactly_once_v2 is configured. The guarantee holds if: (1) the producers writing to the input topic use acks=all and the topic has min.insync.replicas>=2, (2) the output topic has replication factor >= 2, (3) the state store changelog topics have replication factor >= 2, and (4) the application performs no non-deterministic side effects outside Kafka. Formally: for each input record r, the output O(r) is written exactly once to the output topic, and the state store reflects r exactly once.
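Conditions (1) to (3) are configuration checks, so they can be linted before deployment. The function below is a hypothetical helper: it takes plain dictionaries describing topic settings (collecting them, for example with an admin client, is left out), treats a missing setting as Kafka's default of 1, and cannot check condition (4), which depends on application code. The changelog name follows the `<application.id>-<store name>-changelog` convention with a hypothetical application id `app`.

```python
from typing import Dict, List

TopicConfig = Dict[str, int]  # e.g. {"replication.factor": 3, "min.insync.replicas": 2}


def eos_violations(
    processing_guarantee: str,
    upstream_producer_acks: str,
    input_topics: Dict[str, TopicConfig],
    output_topics: Dict[str, TopicConfig],
    changelog_topics: Dict[str, TopicConfig],
) -> List[str]:
    """Return human-readable violations of the config-level EOS conditions (1)-(3)."""
    problems: List[str] = []
    if processing_guarantee != "exactly_once_v2":
        problems.append(f"processing.guarantee is {processing_guarantee!r}, expected 'exactly_once_v2'")
    if upstream_producer_acks != "all":
        problems.append(f"upstream producers use acks={upstream_producer_acks}, expected acks=all")
    for name, cfg in input_topics.items():
        if cfg.get("min.insync.replicas", 1) < 2:
            problems.append(f"input topic {name}: min.insync.replicas < 2")
    for kind, topics in (("output", output_topics), ("changelog", changelog_topics)):
        for name, cfg in topics.items():
            if cfg.get("replication.factor", 1) < 2:
                problems.append(f"{kind} topic {name}: replication.factor < 2")
    return problems


issues = eos_violations(
    "exactly_once_v2",
    "all",
    input_topics={"click-events": {"replication.factor": 3, "min.insync.replicas": 2}},
    output_topics={"session-click-aggregations": {"replication.factor": 1}},
    changelog_topics={"app-session-click-counts-store-changelog": {"replication.factor": 3}},
)
print(issues)  # ['output topic session-click-aggregations: replication.factor < 2']
```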
Key Concepts
| Concept | Description | API |
|---|---|---|
| KStream | Unbounded stream of key-value records (inserts) | StreamsBuilder.stream("topic") |
| KTable | Changelog stream of updates (upserts per key) | StreamsBuilder.table("topic") |
| GlobalKTable | Fully-replicated table available to all instances | StreamsBuilder.globalTable("topic") |
| Grouped | Grouping specification for aggregations | Grouped.with(keySerde, valSerde) |
| TimeWindows | Tumbling window specification (hopping when combined with advanceBy()) | TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)) |
| SessionWindows | Session-based windowing with inactivity gap | SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5)) |
| Materialized | State store materialization configuration | Materialized.as("store-name") |
| Topology | Directed graph of stream processors | builder.build() |
| State Store | Local key-value store for stateful operations | RocksDB (default), in-memory |
| Changelog Topic | Replicated state store backup in Kafka | Auto-created per state store |
| Processor API | Low-level API for custom processors | ProcessorSupplier, FixedKeyProcessor |
| Interactive Queries | Query state stores from outside the app | KafkaStreams.store() |
| Cogroup | Aggregate multiple grouped streams together | KGroupedStream.cogroup(aggregator) |
| Branch | Split a stream based on predicates | KStream.split().branch((k, v) -> condition) |
| Peek | Side effect without modifying the stream | KStream.peek((k, v) -> ...) |
| Repartition | Force a repartition through an internal topic (e.g., after re-keying) for co-partitioned joins | KStream.repartition() |
| Suppress | Buffer updates until the window closes | KTable.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded())) |
| Caching | Local caching of state store updates | statestore.cache.max.bytes (formerly cache.max.bytes.buffering) |
- Define the topology: Create a `StreamsBuilder` and source a `KStream` from the input topic.
- Rekey if needed: If the join or aggregation key differs from the source key, use `selectKey()` and `repartition()` to ensure co-partitioning.
- Group by key: Use `groupBy()` or `groupByKey()` to create a `KGroupedStream`.
- Define the window: Specify the window type (tumbling, sliding, session) and size using `TimeWindows`, `SlidingWindows`, or `SessionWindows`.
- Materialize the state store: Configure `Materialized.as("store-name")` with key/value Serdes and optional retention.
- Apply aggregation: Use `aggregate()`, `reduce()`, or `count()` to compute the windowed result.
- Configure EOS: Set `processing.guarantee=exactly_once_v2`; Kafka Streams then uses idempotent, transactional producers, which require `acks=all`.
- Handle the grace period: Define a grace period with `TimeWindows.ofSizeAndGrace()` (or the equivalent factory on the other window types) to tolerate late-arriving events within a bounded window.
- Test with TopologyTestDriver: Validate the topology with unit tests before deploying.
- Monitor state store size: Use JMX metrics, such as the RocksDB metrics `total-sst-files-size` and `estimate-num-keys`, to detect state growth anomalies.
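The grouping, windowing, grace-period, and final-result steps can be simulated in plain Python to see how late records are handled. This is a sketch under simplifying assumptions: one global stream time instead of one per partition, tumbling windows only, and a count aggregate. It mirrors the rule Kafka Streams applies: a window closes once stream time reaches window end plus grace, a record for an already-closed window is dropped, and `suppress(untilWindowCloses(...))` emits each window's count once at close. `windowed_count_final` is a hypothetical name.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

Record = Tuple[str, int]  # (key, event timestamp in ms)


def windowed_count_final(
    records: Iterable[Record], size_ms: int, grace_ms: int
) -> Tuple[List[Tuple[str, int, int]], List[Record]]:
    """Count records per key in tumbling windows and emit each count once, when its window closes.

    Returns (final_results, dropped), where final_results holds (key, window_start, count).
    """
    counts: Dict[Tuple[str, int], int] = defaultdict(int)
    final_results: List[Tuple[str, int, int]] = []
    dropped: List[Record] = []
    stream_time = -1  # highest timestamp seen so far; never moves backwards

    for key, ts in records:
        stream_time = max(stream_time, ts)
        window_start = ts - ts % size_ms
        if stream_time >= window_start + size_ms + grace_ms:
            dropped.append((key, ts))  # window already closed: too late even with grace
            continue
        counts[(key, window_start)] += 1  # late-but-within-grace records still count
        # Emit (and evict) every window whose end + grace has been reached
        closed = sorted(wk for wk in counts if stream_time >= wk[1] + size_ms + grace_ms)
        for wk in closed:
            final_results.append((wk[0], wk[1], counts.pop(wk)))
    return final_results, dropped


# 1-minute windows with a 10-second grace period
events = [
    ("alice", 5_000), ("bob", 20_000), ("alice", 65_000),
    ("alice", 55_000),   # out of order but within grace: counted
    ("bob", 75_000),     # stream time passes 70_000, closing the first windows
    ("alice", 30_000),   # window [0, 60_000) is closed: dropped
    ("alice", 130_000),
]
final_results, dropped = windowed_count_final(events, size_ms=60_000, grace_ms=10_000)
print(final_results)  # [('alice', 0, 2), ('bob', 0, 1), ('alice', 60000, 1), ('bob', 60000, 1)]
print(dropped)        # [('alice', 30000)]
```

The window starting at 120000 is never emitted because stream time never passes its close time; suppressed results likewise wait until newer records advance stream time.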
Production Code
Windowed Session Aggregation
import json
import logging

from py4j.java_gateway import CallbackServerParameters, JavaGateway

logger = logging.getLogger(__name__)

# This example drives the Java Kafka Streams DSL through a Py4J bridge.
# It assumes a Py4J GatewayServer is already running in a JVM that has the
# kafka-streams jars on its classpath. Every selectKey() call crosses the
# Python/JVM boundary once per record, so treat this as a way to show the
# topology, not as a production deployment.
# For production Python stream processing, use faust or bytewax.


class UserIdKeyMapper:
    """Py4J callback implementing the Java KeyValueMapper interface."""

    def apply(self, key, value):
        # Re-key each click event by the user_id field of its JSON payload
        return json.loads(value)["user_id"]

    class Java:
        implements = ["org.apache.kafka.streams.kstream.KeyValueMapper"]


def build_session_aggregation_topology():
    """Build a Kafka Streams topology for session-based click aggregation."""
    # The callback server lets the JVM invoke UserIdKeyMapper.apply()
    gateway = JavaGateway(callback_server_parameters=CallbackServerParameters())
    jvm = gateway.jvm
    kstream = jvm.org.apache.kafka.streams.kstream
    serdes = jvm.org.apache.kafka.common.serialization.Serdes
    duration = jvm.java.time.Duration

    # `with` and `as` are Python keywords, so fetch those Java static methods by name
    consumed_with = getattr(kstream.Consumed, "with")
    grouped_with = getattr(kstream.Grouped, "with")
    produced_with = getattr(kstream.Produced, "with")
    materialized_as = getattr(kstream.Materialized, "as")

    streams_builder = jvm.org.apache.kafka.streams.StreamsBuilder()

    # Source: click events from Kafka topic
    click_stream = streams_builder.stream(
        "click-events", consumed_with(serdes.String(), serdes.String())
    )

    # Rekey by user_id; grouping after selectKey() makes Kafka Streams insert a
    # repartition topic so all of a user's events reach the same task
    keyed_stream = click_stream.selectKey(UserIdKeyMapper())

    # Group by user_id
    grouped = keyed_stream.groupByKey(grouped_with(serdes.String(), serdes.String()))

    # Session window: a session ends after 30 minutes of inactivity
    # (Kafka Streams session windows have no maximum duration)
    session_window = kstream.SessionWindows.ofInactivityGapWithNoGrace(
        duration.ofMinutes(30)
    )

    # Count clicks per session in a materialized session store
    session_counts = grouped.windowedBy(session_window).count(
        materialized_as("session-click-counts-store")
        .withKeySerde(serdes.String())
        .withValueSerde(serdes.Long())
    )

    # Keys are now Windowed<String>, so the sink needs a session-windowed key serde
    windowed_key_serde = kstream.WindowedSerdes.sessionWindowedSerdeFrom(
        jvm.java.lang.Class.forName("java.lang.String")
    )
    session_counts.toStream().to(
        "session-click-aggregations",
        produced_with(windowed_key_serde, serdes.Long()),
    )

    topology = streams_builder.build()
    logger.info("Topology:\n%s", topology.describe())
    return topology
# Python-native approach using bytewax (a Kafka Streams-style alternative).
# Assumes the bytewax 0.18 operator API; module and operator names differ
# between bytewax releases, so check the docs for your installed version.
import json
from datetime import timedelta

import bytewax.operators as op
import bytewax.operators.window as win
from bytewax.connectors.kafka import KafkaSink, KafkaSinkMessage, KafkaSource
from bytewax.dataflow import Dataflow
from bytewax.operators.window import SessionWindow, SystemClockConfig

BROKERS = ["kafka-broker-1:9092"]


def build_bytewax_session_aggregation():
    """Build a bytewax Dataflow for session-based click aggregation (Python-native)."""
    flow = Dataflow("click-session-aggregation")

    # Kafka source: consume click events
    messages = op.input("kafka-in", flow, KafkaSource(BROKERS, ["click-events"]))

    # Parse the JSON payload of each message
    events = op.map("parse", messages, lambda msg: json.loads(msg.value))

    # Session window with a 30-minute inactivity gap, using system (processing) time
    clock = SystemClockConfig()
    windower = SessionWindow(gap=timedelta(minutes=30))

    # Count events per user_id per session
    counts = win.count_window(
        "count-per-session", events, clock, windower, lambda event: event["user_id"]
    )

    # Assumes results arrive as (user_id, (window_metadata, count)) pairs
    def to_message(item):
        user_id, (_metadata, count) = item
        payload = {"user_id": user_id, "clicks": count}
        return KafkaSinkMessage(key=user_id.encode(), value=json.dumps(payload).encode())

    # Kafka sink: produce aggregated results
    op.output(
        "kafka-out",
        op.map("serialize", counts, to_message),
        KafkaSink(BROKERS, "session-click-aggregations"),
    )
    return flow
Stream-Table Join with Stateful Enrichment
import json
import logging
from collections import Counter
from dataclasses import asdict, dataclass
from typing import Dict, List

logger = logging.getLogger(__name__)


@dataclass
class OrderEvent:
    order_id: str
    customer_id: str
    product_id: str
    amount: float
    timestamp: str


@dataclass
class CustomerProfile:
    customer_id: str
    name: str
    tier: str
    credit_limit: float


class StreamTableJoinProcessor:
    """
    Simulates a Kafka Streams stream-table left join for order enrichment.
    In Kafka Streams, this would be implemented roughly as:
        KStream<String, OrderEvent> orders = ...;         // keyed by order_id
        KTable<String, CustomerProfile> customers = ...;  // keyed by customer_id
        KStream<String, EnrichedOrder> enriched = orders
            // A stream-table join matches on the record key, so re-key by customer_id first
            .selectKey((orderId, order) -> order.getCustomerId())
            .leftJoin(
                customers,
                (order, profile) -> enrich(order, profile),  // profile is null when unmatched
                Joined.with(customerIdSerde, orderSerde, profileSerde)
            );
    Parameters:
        None (state is managed internally)
    State:
        customer_table (Dict): Materialized KTable of customer profiles
        order_stream (List): Processed order events (for demonstration)
    """

    def __init__(self):
        self.customer_table: Dict[str, CustomerProfile] = {}
        self.order_stream: List[dict] = []

    def process_customer_update(self, record: dict) -> None:
        """
        Process customer profile updates into the materialized table.
        In Kafka Streams, this happens automatically when consuming from
        a customer-profiles topic with KTable.
        Parameters:
            record (dict): Customer profile update event
        """
        profile = CustomerProfile(**record)
        self.customer_table[profile.customer_id] = profile  # upsert: latest value wins
        logger.debug("Updated customer table: %s", profile.customer_id)

    def process_order_event(self, event: dict) -> dict:
        """
        Join an order event with the customer table for enrichment (left-join semantics).
        Parameters:
            event (dict): Order event from Kafka stream
        Returns:
            dict: Enriched order; customer fields are placeholders if no profile exists
        """
        order = OrderEvent(**event)
        customer = self.customer_table.get(order.customer_id)
        if customer is None:
            logger.warning(
                "No customer profile for %s, order %s will be processed without enrichment",
                order.customer_id,
                order.order_id,
            )
            enriched = {
                **asdict(order),
                "customer_name": "UNKNOWN",
                "tier": "unknown",
                "credit_limit": 0.0,
                "within_credit_limit": False,
            }
        else:
            enriched = {
                **asdict(order),
                "customer_name": customer.name,
                "tier": customer.tier,
                "credit_limit": customer.credit_limit,
                "within_credit_limit": order.amount <= customer.credit_limit,
            }
        self.order_stream.append(enriched)
        return enriched

    def get_table_stats(self) -> dict:
        """
        Return statistics about the materialized customer table.
        Returns:
            dict: Total customer count and tier distribution
        """
        return {
            "total_customers": len(self.customer_table),
            "tiers": dict(Counter(p.tier for p in self.customer_table.values())),
        }


# Example usage simulating stream-table join
processor = StreamTableJoinProcessor()

# Simulate customer profile updates (table side)
customer_updates = [
    {"customer_id": "C001", "name": "Alice", "tier": "gold", "credit_limit": 10000.0},
    {"customer_id": "C002", "name": "Bob", "tier": "silver", "credit_limit": 5000.0},
    {"customer_id": "C003", "name": "Charlie", "tier": "bronze", "credit_limit": 1000.0},
]
for update in customer_updates:
    processor.process_customer_update(update)

# Simulate order events (stream side)
order_events = [
    {"order_id": "ORD-1", "customer_id": "C001", "product_id": "P100", "amount": 299.99, "timestamp": "2024-01-15T10:00:00Z"},
    {"order_id": "ORD-2", "customer_id": "C002", "product_id": "P200", "amount": 750.00, "timestamp": "2024-01-15T10:01:00Z"},
    {"order_id": "ORD-3", "customer_id": "C004", "product_id": "P300", "amount": 50.00, "timestamp": "2024-01-15T10:02:00Z"},  # Unknown customer
]
for event in order_events:
    result = processor.process_order_event(event)
    print(json.dumps(result, indent=2))

print(f"\nTable stats: {processor.get_table_stats()}")
Kafka Streams vs Apache Flink: Kafka Streams is a client library (runs inside your app); Flink is a distributed processing cluster. Choose Kafka Streams when: (1) your data is already in Kafka, (2) you need simple aggregations and joins, (3) you want minimal operational overhead. Choose Flink when: (1) you need complex event processing (CEP), (2) you need exactly-once across non-Kafka sources/sinks, (3) you need event-time processing with out-of-order handling beyond Kafka Streams' capabilities.
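State Store Backups: Kafka Streams automatically backs up state stores to changelog topics. For faster recovery, configure num.standby.replicas=1 to maintain warm standby copies on other application instances (not on brokers). Monitor restoration progress, for example the restore consumer's records-lag-max metric, to detect recovery delays. Kafka Streams already uses incremental cooperative rebalancing; for very large state stores, tune acceptable.recovery.lag and max.warmup.replicas so that active tasks move to a new instance only after its warm-up replica has caught up.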
- Kafka Streams is an embedded library, not a separate cluster. It runs inside your application process.
- Stream-table duality: KStream = inserts, KTable = upserts. Use KTable for entity state, KStream for events.
- Session windows group events by activity gaps; tumbling windows use fixed intervals; sliding windows use overlapping intervals.
- Exactly-once is achieved via idempotent producers + transactional writes + read_committed consumers.
- State stores are backed by changelog topics for fault tolerance. Standby replicas speed up recovery.
- Monitor consumer lag, state store size, and changelog lag for production health.
- Use TopologyTestDriver for unit testing Kafka Streams topologies without a running cluster.
Best Practices
- Set `processing.guarantee=exactly_once_v2` for production workloads that require exactly-once semantics.
- Configure `num.standby.replicas=1` to maintain warm standby state stores and reduce recovery time after failures.
- Monitor state store size via JMX metrics. Alert on unexpected growth, which indicates memory or disk pressure.
- Use TopologyTestDriver to unit test stream processing topologies without requiring a running Kafka cluster.
- Set grace periods on windowed operations to handle late-arriving events within bounded timeframes.
- Co-partition input topics for joins: both sides of a KStream-KStream or KStream-KTable join must have the same number of partitions and be keyed and partitioned the same way. GlobalKTable joins do not require co-partitioning.
- Use `suppress(Suppressed.untilWindowCloses(...))` to buffer window updates and emit only the final result.
- Monitor changelog restoration lag to detect state store recovery delays after instance failures or rebalances.
- Set `max.task.idle.ms` when a task reads several input partitions (for example, in joins): the task waits up to that long for data on empty partitions instead of processing records out of timestamp order.
- Set `upgrade.from` during rolling upgrades between Kafka Streams versions that require it, as described in the Kafka Streams upgrade notes.
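Why both sides of a join need the same partition count follows from how keys are assigned: partition = hash(key) mod partition count. The sketch below uses CRC32 from the standard library as a stand-in hash (Kafka's default partitioner uses murmur2), and the helper names are hypothetical.

```python
import zlib
from typing import Iterable


def partition_for(key: str, num_partitions: int) -> int:
    """Stand-in partitioner: hash the key, then take it modulo the partition count."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def mismatched_keys(keys: Iterable[str], left_partitions: int, right_partitions: int) -> int:
    """Count keys that would land on different partition numbers on the two sides of a join."""
    return sum(
        partition_for(k, left_partitions) != partition_for(k, right_partitions) for k in keys
    )


customer_ids = [f"customer-{i}" for i in range(1_000)]
print(mismatched_keys(customer_ids, 6, 6))   # 0: same count and partitioner, so co-partitioned
print(mismatched_keys(customer_ids, 6, 12))  # many keys land on different partition numbers
```

With mismatched counts, the task owning partition p on one side would not hold the matching records from the other side, so the join would silently miss them.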
Kafka Streams vs Alternatives
| Feature | Kafka Streams | Apache Flink | Spark Structured Streaming | Bytewax |
|---|---|---|---|---|
| Deployment | Embedded library | Cluster | Cluster | Embedded library |
| Operational Overhead | Minimal | High | High | Minimal |
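| Exactly-Once | Yes (EOS v2) | Yes (checkpoints + 2PC sinks) | Yes (checkpointing + idempotent sinks) | At-least-once (recovery) |
| State Backend | RocksDB (default) or in-memory | Heap (default) or RocksDB | HDFS-backed (default) or RocksDB | In-memory, SQLite recovery snapshots |
| Window Types | Tumbling, Hopping, Sliding, Session | Tumbling, Sliding, Session, Custom | Tumbling, Sliding, Session | Tumbling, Sliding, Session |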
| Interactive Queries | Yes | Limited | No | No |
| Language Support | Java, Scala | Java, Scala, Python | Python, Scala, Java | Python |
| Best For | Kafka-native apps | Complex event processing | Unified batch+stream | Python-native streams |
See Also
- 019 - Apache Kafka: Topics, Producers, and Consumers - Kafka fundamentals and topic design
- 022 - Spark Structured Streaming - Alternative streaming engine
- 023 - Batch vs Streaming: Lambda, Kappa, and Architecture Trade-offs - Architecture pattern comparison
- 020 - Kafka Streams - This lesson
- 030 - Capstone Project: Real-Time Streaming Pipeline - Applying Kafka Streams in production