Kafka Streams & Connect API: Stream Processing and Data Integration
Architecture Diagram: Kafka Streams Topology
Architecture Diagram: Kafka Connect Framework
Architecture Diagram: Stream Processing Patterns
Architecture Diagram: Kafka Streams Thread Model
Architecture Diagram: Connect Task Lifecycle
Formal Definitions
Stream Processing Topology
A stream processing topology is a directed acyclic graph (DAG) of processors where nodes represent operations (source, transform, sink) and edges represent data flow. The topology defines how data moves from source topics through transformations to sink topics. Kafka Streams divides the topology into sub-topologies (connected through repartition topics), creates one task per input partition of each sub-topology, and distributes these tasks across the stream threads of all running instances, so different instances process different input partitions.
Windowing
Windowing is the grouping of records by time into finite buckets for aggregation. Kafka Streams supports tumbling windows (fixed-size, non-overlapping), hopping windows (fixed-size, overlapping with a configurable advance), and session windows (activity-based, variable-size, closed by an inactivity gap), as well as sliding windows (SlidingWindows), which are not covered further here. Windowing enables time-bounded aggregations on unbounded streams.
KTable
A KTable is an abstraction of a changelog stream where each key has at most one value. It represents the latest state for each key, enabling point lookups and joins. Unlike KStream (append-only), KTable updates are upserts: a new value for a key replaces the previous one, and a record with a null value (a tombstone) deletes the key. A materialized KTable is backed by a local state store (RocksDB by default) with a changelog topic for fault tolerance.
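The upsert and tombstone rules can be illustrated with a plain-Java model that replays one sequence of keyed updates twice: once as a KStream-style append log and once as a KTable-style map. It has no Kafka dependency; the class and record names are hypothetical and are not the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class StreamTableDuality {
    /** One keyed update; a null value models a tombstone. */
    record Update(String key, String value) {}

    /** KStream view: every record is an independent event, so all of them are kept. */
    static List<Update> asStream(List<Update> updates) {
        return new ArrayList<>(updates);
    }

    /** KTable view: each key keeps only its latest value; a tombstone deletes the key. */
    static Map<String, String> asTable(List<Update> updates) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Update u : updates) {
            if (u.value() == null) {
                table.remove(u.key());
            } else {
                table.put(u.key(), u.value());
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Update> log = List.of(
            new Update("alice", "silver"),
            new Update("bob", "gold"),
            new Update("alice", "gold"),
            new Update("bob", null));
        System.out.println(asStream(log).size()); // 4
        System.out.println(asTable(log));         // {alice=gold}
    }
}
```

The stream view keeps all four events, while the table view ends with only alice=gold because the tombstone for bob removed his entry.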
GlobalKTable
A GlobalKTable is a KTable that is fully replicated on every Streams instance. Unlike regular (partitioned) KTables, GlobalKTables are available for joins without repartitioning, and a KStream-GlobalKTable join can look up a key derived from the stream record's value. They are ideal for reference data (dimension tables) small enough to store in full on every instance (in a local RocksDB store by default). Because every instance reads all partitions of the source topic, GlobalKTables do not scale out with partition count.
Single Message Transform (SMT)
A Single Message Transform is a lightweight, stateless transformation applied to individual records as they flow through Kafka Connect. SMTs are chained and applied in the order listed. In a source connector they run after the connector produces a record and before the converter serializes it; in a sink connector they run after the converter deserializes a record and before it is handed to the connector. Common SMTs include InsertField, ReplaceField, MaskField, TimestampRouter, RegexRouter, and Flatten. SMTs modify records in-flight without requiring custom code.
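A minimal sketch of SMT chaining, assuming a simplified record type in place of Connect's ConnectRecord and plain functions in place of the Transformation interface. The rename and insertStatic helpers are hypothetical stand-ins that roughly mimic ReplaceField and InsertField; the point is only the ordering, since each transform receives the previous transform's output.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

final class SmtChainSketch {
    /** Simplified record: destination topic plus a field map standing in for a Struct value. */
    record Rec(String topic, Map<String, Object> value) {}

    /** Rename one field, roughly what ReplaceField's rename option does. */
    static UnaryOperator<Rec> rename(String from, String to) {
        return r -> {
            Map<String, Object> v = new LinkedHashMap<>();
            r.value().forEach((k, val) -> v.put(k.equals(from) ? to : k, val));
            return new Rec(r.topic(), v);
        };
    }

    /** Add a constant field, roughly what InsertField's static field option does. */
    static UnaryOperator<Rec> insertStatic(String field, Object constant) {
        return r -> {
            Map<String, Object> v = new LinkedHashMap<>(r.value());
            v.put(field, constant);
            return new Rec(r.topic(), v);
        };
    }

    /** Apply transforms in list order, like the order named in the "transforms" property. */
    static Rec applyChain(Rec input, List<UnaryOperator<Rec>> chain) {
        Rec current = input;
        for (UnaryOperator<Rec> t : chain) {
            current = t.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        Rec in = new Rec("orders", Map.of("id", 7));
        Rec out = applyChain(in, List.of(rename("id", "order_id"), insertStatic("source", "mysql")));
        System.out.println(out.value()); // {order_id=7, source=mysql}
    }
}
```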
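Dead Letter Queue (DLQ)
A dead letter queue is a Kafka topic to which a sink connector routes records that fail in the converter or SMT stages, instead of failing the task; connectors that use the errant record reporter can also send records that fail while being written. DLQs are supported for sink connectors only and are configured via errors.tolerance=all and errors.deadletterqueue.topic.name. The DLQ record keeps the original key and value; setting errors.deadletterqueue.context.headers.enable=true also adds headers with the original topic, partition, offset, and exception details for later inspection and reprocessing.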
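The tolerance behavior can be sketched as a try/catch around the per-record conversion step of a sink task. This is a plain-Java model, not Connect's implementation: the DlqSketch class and the header keys it writes are illustrative and differ from the header names Connect actually uses.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class DlqSketch {
    record Source(String topic, int partition, long offset, String rawValue) {}
    record DeadLetter(String rawValue, Map<String, String> headers) {}

    final List<Integer> written = new ArrayList<>();
    final List<DeadLetter> dlq = new ArrayList<>();

    /** Convert each record; on failure keep the raw value and route it to the DLQ instead of failing. */
    void process(List<Source> batch, Function<String, Integer> convert) {
        for (Source s : batch) {
            try {
                written.add(convert.apply(s.rawValue()));
            } catch (RuntimeException e) {
                Map<String, String> headers = new LinkedHashMap<>();
                // Illustrative context keys (Connect's real header names differ)
                headers.put("origin.topic", s.topic());
                headers.put("origin.partition", Integer.toString(s.partition()));
                headers.put("origin.offset", Long.toString(s.offset()));
                headers.put("error.class", e.getClass().getName());
                dlq.add(new DeadLetter(s.rawValue(), headers));
            }
        }
    }

    public static void main(String[] args) {
        DlqSketch task = new DlqSketch();
        task.process(List.of(
            new Source("orders", 0, 10, "42"),
            new Source("orders", 0, 11, "not-a-number"),
            new Source("orders", 0, 12, "7")), Integer::valueOf);
        System.out.println(task.written); // [42, 7]
        System.out.println(task.dlq.get(0).headers());
    }
}
```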
Key Formulas
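End-to-End Latency
$$L_{\text{e2e}} = L_{\text{produce}} + L_{\text{poll}} + L_{\text{process}} + L_{\text{commit}}$$
Here,
- $L_{\text{produce}}$ = Producer send latency (includes acks)
- $L_{\text{poll}}$ = Consumer fetch/poll round-trip
- $L_{\text{process}}$ = Record processing time
- $L_{\text{commit}}$ = Offset commit latency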
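A worked example of the end-to-end sum (produce + poll + process + commit) in plain Java. The component values are assumptions chosen for illustration, not measurements.

```java
final class EndToEndLatency {
    /** L_e2e = L_produce + L_poll + L_process + L_commit, all in milliseconds. */
    static double endToEndMs(double produceMs, double pollMs, double processMs, double commitMs) {
        return produceMs + pollMs + processMs + commitMs;
    }

    public static void main(String[] args) {
        // Assumed example values, not benchmarks: acks=all produce, one poll round-trip,
        // lightweight processing, and one commit.
        double total = endToEndMs(5.0, 2.0, 0.5, 3.0);
        System.out.println(total + " ms"); // 10.5 ms
    }
}
```

With at-least-once processing, output records are visible to consumers before the offset commit, so the commit term matters most under exactly_once_v2, where read_committed consumers see results only after the transaction commits.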
Window Aggregation (Tumbling)
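$$A(W_i) = \bigoplus_{e \,:\, t_e \in W_i} f(e), \qquad W_i = [t_i,\ t_i + s), \qquad t_i = i \cdot s$$
Here,
- $W_i$ = Tumbling window $[t_i, t_i + s)$, aligned to the epoch so that $t_i = i \cdot s$
- $s$ = Window size (e.g., 5 minutes)
- $f(e)$ = Aggregation function applied to each event $e$ with timestamp $t_e$
- $\oplus$ = Aggregation operator (count, sum, reduce, etc.)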
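A minimal sketch of the tumbling formula with f(e) = 1 and the operator being addition (a count), assuming non-negative millisecond timestamps and epoch-aligned windows, which is how Kafka Streams aligns TimeWindows. The class name is hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

final class TumblingWindowSketch {
    /** Epoch-aligned tumbling window start: t_i = i * s with t_i <= t < t_i + s (t >= 0). */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Count events per window start, i.e. f(e) = 1 combined with +. */
    static Map<Long, Long> countPerWindow(long[] timestampsMs, long sizeMs) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long t : timestampsMs) {
            counts.merge(windowStart(t, sizeMs), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60_000L;
        long[] events = {0L, 120_000L, 299_999L, 300_000L, 610_000L};
        System.out.println(countPerWindow(events, fiveMinutes));
        // {0=3, 300000=1, 600000=1}
    }
}
```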
Window Aggregation (Hopping)
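$$A(W_i) = \bigoplus_{e \,:\, t_e \in W_i} f(e), \qquad W_i = [t_i,\ t_i + s), \qquad t_i = t_0 + i \cdot a$$
Here,
- $s$ = Window size
- $a$ = Advance interval ($a < s$ for overlap)
- $t_i$ = Window start time, $t_i = t_0 + i \cdot a$

When $a$ divides $s$, every event (away from $t_0$) falls into exactly $s/a$ windows.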
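The window enumeration for a single event can be sketched as follows, assuming t_0 = 0 and millisecond timestamps. It follows the same idea Kafka Streams uses to find every hopping window containing a timestamp, but the class is a hypothetical stand-in.

```java
import java.util.ArrayList;
import java.util.List;

final class HoppingWindowSketch {
    /**
     * Start times of every hopping window [t_i, t_i + s) containing the timestamp,
     * with windows aligned to t_0 = 0 and t_i = i * a.
     */
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still contains the timestamp (start > t - s)
        long start = (Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        for (; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // 5-minute windows advancing every minute, event at 7.5 minutes
        System.out.println(windowStartsFor(450_000L, 5 * minute, minute));
        // [180000, 240000, 300000, 360000, 420000]
    }
}
```

With s = 5 minutes and a = 1 minute, the event lands in s/a = 5 windows, which is why hopping windows need more state than tumbling windows of the same size.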
Session Window Merging
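$$w_e = [t_e,\ t_e], \qquad S_j = \Big[\min_{e \in E_j} t_e,\ \max_{e \in E_j} t_e\Big], \qquad t_{(k+1)} - t_{(k)} \le g \ \text{for consecutive events of } E_j$$
Here,
- $S_j$ = Session window $j$ after merging
- $w_e$ = Individual event window
- $E_j$ = Events merged into session $j$, with sorted timestamps $t_{(1)} \le t_{(2)} \le \dots$
- $g$ = Inactivity gap threshold

Consecutive sessions are separated by a gap larger than $g$; an event that lands within $g$ of two sessions merges them into one.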
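A sketch of session merging over a complete batch of timestamps, assuming the whole batch is available and can be sorted first. Kafka Streams merges sessions incrementally as records arrive (an out-of-order record can bridge two existing sessions), so treat this hypothetical class as a model of the result, not of the library's algorithm.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SessionWindowSketch {
    /** A merged session S_j = [start, end] plus the number of events it contains. */
    record Session(long start, long end, int count) {}

    /** Merge event windows w_e = [t_e, t_e] whose sorted neighbors are at most gapMs apart. */
    static List<Session> sessionize(long[] timestampsMs, long gapMs) {
        long[] ts = timestampsMs.clone();
        Arrays.sort(ts);
        List<Session> sessions = new ArrayList<>();
        if (ts.length == 0) {
            return sessions;
        }
        long start = ts[0], end = ts[0];
        int count = 1;
        for (int k = 1; k < ts.length; k++) {
            if (ts[k] - end <= gapMs) {
                // Within the inactivity gap: extend the current session
                end = ts[k];
                count++;
            } else {
                // Gap exceeded: close the current session and start a new one
                sessions.add(new Session(start, end, count));
                start = ts[k];
                end = ts[k];
                count = 1;
            }
        }
        sessions.add(new Session(start, end, count));
        return sessions;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long[] events = {0, 10 * minute, 35 * minute, 90 * minute};
        System.out.println(sessionize(events, 30 * minute));
    }
}
```

With a 30-minute gap, events at 0, 10, and 35 minutes form one session, and the event at 90 minutes starts a new one.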
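Kafka Streams windows records by the timestamp returned by the configured TimestampExtractor. The default extractor (FailOnInvalidTimestamp) uses the timestamp embedded in each Kafka record, which is event time or ingestion time depending on how the producer and topic set it; WallclockTimestampExtractor switches to processing time, and a custom TimestampExtractor implementation can read a timestamp from the payload. Instead of watermarks, Kafka Streams tracks stream time (the largest timestamp observed so far); a record that arrives after stream time has passed its window end plus the grace period is dropped. Configure the grace period with TimeWindows.ofSizeWithNoGrace() or TimeWindows.ofSizeAndGrace().
For exactly-once stream processing, set processing.guarantee=exactly_once_v2. This automatically configures idempotent, transactional producers and read_committed consumers within the Streams library. Compared with the original exactly_once mode, v2 uses one transactional producer per stream thread instead of one per task, which reduces the number of producers and improves scalability; it requires brokers on version 2.5 or newer.
Kafka Streams maintains local state in RocksDB with changelog topics for fault tolerance. If an instance fails, its tasks move to other instances, which rebuild the state by replaying the changelog topics. The cache.max.bytes.buffering setting (renamed statestore.cache.max.bytes in Kafka 3.4) controls the memory available for write-back caching across an instance's threads: a larger cache coalesces more updates per key, so fewer records reach the changelog and downstream processors, at the cost of emitting updates later (on commit or cache eviction).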
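The effect of write-back caching on changelog traffic can be modeled in a few lines. This hypothetical class ignores the cache size limit (a full cache evicts and flushes entries early) and treats a commit as the only flush trigger.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class WriteBackCacheSketch {
    private final Map<String, Long> dirty = new LinkedHashMap<>();
    private final boolean cachingEnabled;
    final List<String> changelog = new ArrayList<>();

    WriteBackCacheSketch(boolean cachingEnabled) {
        this.cachingEnabled = cachingEnabled;
    }

    /** Update a key; with caching on, repeated updates to the same key are coalesced. */
    void put(String key, long value) {
        if (cachingEnabled) {
            dirty.put(key, value);
        } else {
            changelog.add(key + "=" + value);
        }
    }

    /** On commit, each dirty key produces exactly one changelog (and downstream) record. */
    void commit() {
        dirty.forEach((k, v) -> changelog.add(k + "=" + v));
        dirty.clear();
    }

    public static void main(String[] args) {
        for (boolean caching : new boolean[] {false, true}) {
            WriteBackCacheSketch store = new WriteBackCacheSketch(caching);
            for (long count = 1; count <= 3; count++) {
                store.put("user-1", count); // three count updates for the same key
            }
            store.commit();
            System.out.println("caching=" + caching + " -> " + store.changelog);
        }
    }
}
```

Without caching, three count updates to user-1 produce three changelog records; with caching, the commit emits only the latest value, user-1=3.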
Streams Parallelism Theorem
The parallelism of a Kafka Streams application is determined by:
- Number of stream threads per instance (num.stream.threads, default 1)
- Number of running instances sharing the same application.id
- Number of tasks: each sub-topology gets one task per input partition (the maximum partition count among its input topics), and tasks are the unit of work distributed across threads

Effective parallelism = min(num.stream.threads × number of instances, total number of tasks); threads beyond the task count stay idle.
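A small calculator for the parallelism bound, assuming a single sub-topology. The round-robin assign method is a deliberate simplification; Kafka Streams' real task assignor also weighs stickiness and where state already lives. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class StreamsParallelismSketch {
    /** Tasks for one sub-topology = max partition count among its input topics. */
    static int tasks(int... inputTopicPartitions) {
        int max = 0;
        for (int p : inputTopicPartitions) {
            max = Math.max(max, p);
        }
        return max;
    }

    /** Threads beyond the task count have nothing to do. */
    static int effectiveParallelism(int instances, int threadsPerInstance, int totalTasks) {
        return Math.min(instances * threadsPerInstance, totalTasks);
    }

    /** Round-robin tasks over all threads (a simplification of the real assignor). */
    static List<List<Integer>> assign(int totalTasks, int totalThreads) {
        List<List<Integer>> perThread = new ArrayList<>();
        for (int t = 0; t < totalThreads; t++) {
            perThread.add(new ArrayList<>());
        }
        for (int task = 0; task < totalTasks; task++) {
            perThread.get(task % totalThreads).add(task);
        }
        return perThread;
    }

    public static void main(String[] args) {
        int totalTasks = tasks(12, 12); // e.g. two co-partitioned topics with 12 partitions each
        System.out.println(effectiveParallelism(2, 4, totalTasks)); // 8
        System.out.println(effectiveParallelism(4, 4, totalTasks)); // 12 (4 threads idle)
        System.out.println(assign(totalTasks, 8));
    }
}
```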
Windowed Aggregation Correctness Theorem
With event-time windowing and a grace period, the aggregation result for window W_i is correct if:
- All events with timestamp t ∈ W_i arrive while stream time is still below W_i.end + grace
- Late events beyond the grace period are discarded (the grace period itself is configurable)
- For hopping windows with advance a < size s, each event contributes to every window that contains it (s/a windows when a divides s)
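The drop rule can be modeled for tumbling windows within one partition. The class is hypothetical: it tracks stream time as the maximum timestamp seen and rejects a record once stream time has reached that record's window end plus the grace period.

```java
final class GracePeriodSketch {
    private long streamTime = Long.MIN_VALUE; // largest timestamp observed so far
    private final long sizeMs;
    private final long graceMs;
    private long dropped = 0;

    GracePeriodSketch(long sizeMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    /** Returns true if the record is accepted into its tumbling window, false if dropped as late. */
    boolean offer(long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        long windowEnd = timestampMs - (timestampMs % sizeMs) + sizeMs;
        // The window is closed once stream time has reached windowEnd + grace
        if (windowEnd + graceMs <= streamTime) {
            dropped++;
            return false;
        }
        return true;
    }

    long dropped() {
        return dropped;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        GracePeriodSketch w = new GracePeriodSketch(5 * minute, 2 * minute);
        System.out.println(w.offer(1 * minute)); // true: window [0, 5m)
        System.out.println(w.offer(6 * minute)); // true: stream time advances to 6m
        System.out.println(w.offer(2 * minute)); // true: 6m < 5m + 2m, still within grace
        System.out.println(w.offer(8 * minute)); // true: stream time advances to 8m
        System.out.println(w.offer(3 * minute)); // false: 8m >= 5m + 2m, window closed
    }
}
```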
Detailed Explanation
Kafka Streams Architecture
Kafka Streams is a client library for building stream processing applications on top of Kafka. Unlike traditional stream processing frameworks (Apache Flink, Apache Spark Streaming), Kafka Streams runs within the application process, requiring no separate cluster management. This embedded architecture simplifies deployment and scaling, as each instance of the application is a stream processor that participates in the consumer group for the input topics.
Topology and Processor DAG: A Kafka Streams application defines a processing topology as a directed acyclic graph (DAG) of processors. Each processor is either a source (reads from topics), a transformation (map, filter, aggregate), or a sink (writes to topics). The topology is automatically distributed across instances by assigning different partitions to different instances. The StreamsBuilder DSL automatically constructs this topology, while the Processor API allows building custom processor nodes with fine-grained control over state management and scheduling.
Stateful Operations: Kafka Streams supports several stateful operations that require local state storage. These include aggregations (count, reduce, aggregate), joins (KStream-KTable, KStream-KStream, KTable-KTable), and windowing (tumbling, hopping, session windows). State is stored in a local RocksDB instance, backed by a changelog topic in Kafka for fault tolerance. The changelog topic ensures that state can be rebuilt from Kafka if a node fails.
Windowing Strategies: Event-time windowing groups records by their event timestamp rather than processing time. Kafka Streams supports tumbling windows (fixed-size, non-overlapping), hopping windows (fixed-size, overlapping), and session windows (activity-based, variable-size). Rather than watermarks, Kafka Streams tracks stream time, the largest record timestamp seen so far, and combines it with each window's grace period to decide whether a late-arriving record is still accepted.
Kafka Connect Architecture
Kafka Connect provides a framework for moving data between Kafka and external systems. Connectors are plugins that implement either SourceConnector (reading from external systems and writing to Kafka) or SinkConnector (reading from Kafka and writing to external systems) interfaces. The Connect framework handles offset management, fault tolerance, and parallelism.
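Connector Configuration and Transforms: In distributed mode, connectors are created and updated by submitting JSON configurations to the Connect REST API; in standalone mode, they are read from .properties files passed to the worker. The framework supports Single Message Transforms (SMTs) for in-flight record transformation, converters for serialization format handling, and dead letter queues (for sink connectors) for error handling.
Exactly-Once Semantics in Connect: Since Kafka 3.3 (KIP-618), source connectors can run with exactly-once delivery in distributed mode: the workers set exactly.once.source.support=enabled, and a connector can declare exactly.once.support=required (or requested). Sink connectors have no equivalent framework switch; they achieve exactly-once results by writing idempotently (for example, upserts keyed by the record key) or by storing Kafka offsets in the target system in the same transaction as the data and resuming from them.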
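To see why idempotent writes matter for sinks, the following plain-Java model (hypothetical names, not the SinkTask API) replays a batch as at-least-once delivery would after a crash before the offset commit. An append-only target ends up with duplicates, while an upsert keyed by record key, like the write.method=upsert setting in the Elasticsearch example, converges to the same state.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class IdempotentSinkSketch {
    record SinkRecord(String key, String value, long offset) {}

    /** Append-only target: a redelivered record becomes a duplicate row. */
    static List<String> appendAll(List<SinkRecord> delivered) {
        List<String> rows = new ArrayList<>();
        for (SinkRecord r : delivered) {
            rows.add(r.key() + ":" + r.value());
        }
        return rows;
    }

    /** Upsert target keyed by record key: replaying a record rewrites the same row. */
    static Map<String, String> upsertAll(List<SinkRecord> delivered) {
        Map<String, String> table = new LinkedHashMap<>();
        for (SinkRecord r : delivered) {
            table.put(r.key(), r.value());
        }
        return table;
    }

    public static void main(String[] args) {
        SinkRecord a = new SinkRecord("order-1", "PAID", 100);
        SinkRecord b = new SinkRecord("order-2", "NEW", 101);
        // The task wrote a and b, crashed before committing offset 102, and was
        // restarted from offset 100, so both records are delivered twice.
        List<SinkRecord> delivered = List.of(a, b, a, b);
        System.out.println(appendAll(delivered)); // 4 rows, 2 of them duplicates
        System.out.println(upsertAll(delivered)); // {order-1=PAID, order-2=NEW}
    }
}
```

The upsert result is only stable if replayed records arrive in their original order, which holds within a single partition.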
Key Concepts Table
| Component | Description | Key Configurations | Use Cases |
|---|---|---|---|
| KStream | Unbounded stream of records | processing.guarantee | Event processing, real-time analytics |
| KTable | Changelog stream (latest value per key) | state.dir, cache.max.bytes.buffering | Materialized views, join tables |
| GlobalKTable | Full copy on every instance | global.consumer.* overrides, state.dir | Reference data, dimension tables |
| Topology | DAG of processors | application.id, client.id | Processing pipeline definition |
| State Store | Local RocksDB persistence | rocksdb.config.setter, state.dir | Stateful operations, windowing |
| Changelog Topic | Fault-tolerant state backup | replication.factor, Materialized.withLoggingEnabled() | State recovery, fault tolerance |
| Source Connector | Read from external systems | tasks.max, connector.class | CDC, file ingestion |
| Sink Connector | Write to external systems | topics, tasks.max | Data export, search indexing |
| Converter | Serialization format handling | key.converter, value.converter | JSON, Avro, Protobuf |
| SMT | Single Message Transform | transforms, transforms.* | Record modification, routing |
| DLQ | Dead Letter Queue | errors.deadletterqueue.topic.name | Error handling, reprocessing |
Code Examples
Kafka Streams DSL Application
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.WindowStore;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
public class OrderProcessingStream {
public static void main(String[] args) {
Properties props = new Properties();
// Application identity; used as consumer group ID and internal topic prefix
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processing-app");
// Bootstrap servers for initial metadata discovery
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Default serializers/deserializers for keys and values
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
// Exactly-once semantics (v2: one transactional producer per stream thread)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
// Automatically configures idempotent, transactional producers and
// read_committed consumers; requires brokers 2.5 or newer
// State store configuration
props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
// Directory for RocksDB state stores; use SSD for best performance
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
// How often offsets (and, under EOS, transactions) are committed
// More frequent = lower end-to-end latency, more overhead
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
// 10MB write-back cache shared by all stream threads of this instance
// Larger cache = fewer changelog/downstream writes, but updates are emitted later
// Thread and task configuration
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
// Number of stream threads per instance
// Each thread processes a subset of tasks
StreamsBuilder builder = new StreamsBuilder();
// Serde for the hypothetical Order class below (JSON text as UTF-8 bytes)
Serde<Order> orderSerde = Serdes.serdeFrom(
(topic, order) -> order == null ? null : order.toJson().getBytes(StandardCharsets.UTF_8),
(topic, bytes) -> bytes == null ? null : parseOrder(new String(bytes, StandardCharsets.UTF_8)));
// Source: Raw order events
KStream<String, String> orders = builder.stream("raw-orders",
Consumed.with(Serdes.String(), Serdes.String())
.withTimestampExtractor(new WallclockTimestampExtractor()));
// WallclockTimestampExtractor uses processing time (system clock)
// For event time, omit it: the default extractor uses the record's embedded
// timestamp; implement TimestampExtractor to read a timestamp from the payload
// Source: User data (KTable for point lookups)
KTable<String, String> users = builder.table("user-data",
Consumed.with(Serdes.String(), Serdes.String()));
// KTable enables point lookups by key for joins
// Transform and enrich
KStream<String, Order> enrichedOrders = orders
.filter((key, value) -> value != null && !value.isEmpty())
// Filter out null/empty records
.mapValues(value -> parseOrder(value))
// mapValues preserves the key, so no repartitioning is triggered
.filter((key, order) -> order.getAmount() > 0)
// Filter invalid orders
.join(users,
(order, user) -> order.withUser(user),
// Join function: combine order with the user's latest value
Joined.with(Serdes.String(), orderSerde, Serdes.String()));
// Joined: key serde, stream value serde (Order), table value serde (String)
// The stream must be keyed by user ID and co-partitioned with user-data
// Branch by order type (split() returns a BranchedKStream)
Map<String, KStream<String, Order>> branches = enrichedOrders
.split(Named.as("type-"))
.branch((key, order) -> "PREMIUM".equals(order.getType()),
Branched.withFunction(stream -> {
stream.mapValues(o -> o.applyDiscount(0.10))
.to("premium-orders", Produced.with(Serdes.String(), orderSerde));
return stream;
}, "premium"))
.branch((key, order) -> "STANDARD".equals(order.getType()),
Branched.withFunction(stream -> {
stream.mapValues(o -> o.applyDiscount(0.0))
.to("standard-orders", Produced.with(Serdes.String(), orderSerde));
return stream;
}, "standard"))
.noDefaultBranch();
// Branch names ("type-premium", "type-standard") become the map keys and aid monitoring
// Aggregation: Count orders per user in 1-hour tumbling windows
KTable<Windowed<String>, Long> orderCounts = enrichedOrders
.groupByKey(Grouped.with(Serdes.String(), orderSerde))
// Group by key (user ID); explicit serdes because Order is not the default value serde
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
// 1-hour tumbling window, no grace period
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(
"order-count-store")
.withValueSerde(Serdes.Long()));
// Materialized creates a named state store
// Aggregate: Total revenue per user in 5-minute hopping windows
KTable<Windowed<String>, Double> revenueByUser = enrichedOrders
.mapValues(order -> order.getAmount())
.groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))
.advanceBy(Duration.ofMinutes(1)))
// Hopping: 5-minute windows, advance every 1 minute (overlapping)
.reduce(Double::sum, Materialized.<String, Double, WindowStore<Bytes, byte[]>>as(
"revenue-store")
.withValueSerde(Serdes.Double()));
// Session windowing for activity tracking (re-keying by user ID triggers a repartition)
KTable<Windowed<String>, Long> sessionCounts = enrichedOrders
.map((key, order) -> KeyValue.pair(order.getUserId(), order))
.groupByKey(Grouped.with(Serdes.String(), orderSerde))
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
// Session window: merges events separated by at most a 30-minute inactivity gap
.count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as(
"session-store")
.withValueSerde(Serdes.Long()));
// Sink: Write enriched orders
enrichedOrders
.mapValues(order -> order.toJson())
.to("enriched-orders", Produced.with(Serdes.String(), Serdes.String()));
Topology topology = builder.build();
System.out.println(topology.describe());
// Print topology for debugging
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
// Graceful shutdown
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
streams.close(Duration.ofSeconds(30));
// close() waits up to 30s for stream threads to finish and commit
// streams.cleanUp() here would delete local state stores and force a full
// restore from the changelog topics on the next start
}));
}
private static Order parseOrder(String json) {
// JSON parsing implementation (placeholder); use a JSON library in practice
return new Order("unknown-user", "STANDARD", 0.0, null);
}
// Hypothetical minimal Order type so the example compiles; replace with your domain class
static final class Order {
private final String userId;
private final String type;
private final double amount;
private final String user;
Order(String userId, String type, double amount, String user) {
this.userId = userId;
this.type = type;
this.amount = amount;
this.user = user;
}
String getUserId() { return userId; }
String getType() { return type; }
double getAmount() { return amount; }
Order withUser(String user) { return new Order(userId, type, amount, user); }
Order applyDiscount(double rate) { return new Order(userId, type, amount * (1 - rate), user); }
String toJson() {
return "{\"userId\":\"" + userId + "\",\"type\":\"" + type + "\",\"amount\":" + amount + "}";
}
}
}
Kafka Streams with Processor API (Custom Processor)
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
// OrderEvent, Alert and UserTransactionSummary are application classes (not shown)
public class FraudDetectionProcessor implements Processor<String, OrderEvent, String, Alert> {
private ProcessorContext<String, Alert> context;
private KeyValueStore<String, UserTransactionSummary> stateStore;
private WindowStore<String, OrderEvent> recentOrders;
@Override
public void init(ProcessorContext<String, Alert> context) {
this.context = context;
// Get references to state stores (each must be added to the topology
// and connected to this processor)
this.stateStore = context.getStateStore("user-transaction-summary");
this.recentOrders = context.getStateStore("recent-orders-window");
// Schedule periodic punctuation for cleanup
context.schedule(Duration.ofMinutes(5),
PunctuationType.WALL_CLOCK_TIME,
this::cleanupOldOrders);
// Punctuation runs periodically for housekeeping tasks
context.schedule(Duration.ofMinutes(1),
PunctuationType.WALL_CLOCK_TIME,
this::checkVelocity);
}
@Override
public void process(Record<String, OrderEvent> record) {
OrderEvent event = record.value();
String userId = event.getUserId();
// Store in window store keyed by user ID (window start = event time)
recentOrders.put(userId, event, event.getTimestamp());
// Window store enables time-range queries per key
// Get or initialize user summary
UserTransactionSummary summary = stateStore.get(userId);
if (summary == null) {
summary = new UserTransactionSummary(userId);
}
// Fraud detection logic (if several rules match, the last one wins)
Alert alert = null;
// Rule 1: Amount threshold
if (event.getAmount() > summary.getAverageAmount() * 3) {
alert = new Alert(userId, "HIGH_AMOUNT",
"Transaction amount 3x above average", event);
}
// Rule 2: Velocity check (too many transactions), measured in event time
long recentCount = countRecentTransactions(userId, event.getTimestamp(),
Duration.ofMinutes(5));
if (recentCount > summary.getTransactionsPerMinute() * 5) {
alert = new Alert(userId, "HIGH_VELOCITY",
"Transaction velocity 5x above normal", event);
}
// Rule 3: Geographic anomaly
if (summary.getLastLocation() != null &&
!summary.getLastLocation().equals(event.getLocation())) {
alert = new Alert(userId, "GEO_ANOMALY",
"Location changed from " + summary.getLastLocation() +
" to " + event.getLocation(), event);
}
// Update summary
summary.update(event);
stateStore.put(userId, summary);
// Forward alert if fraud detected
if (alert != null) {
context.forward(new Record<>(userId, alert, record.timestamp()));
}
}
private long countRecentTransactions(String userId, long eventTimeMs, Duration window) {
Instant to = Instant.ofEpochMilli(eventTimeMs);
Instant from = to.minus(window);
long count = 0;
// fetch(key, from, to) returns a WindowStoreIterator keyed by window start time
try (WindowStoreIterator<OrderEvent> iterator = recentOrders.fetch(userId, from, to)) {
while (iterator.hasNext()) {
iterator.next();
count++;
}
}
return count;
}
private void cleanupOldOrders(long timestamp) {
// Punctuators receive the current time in epoch milliseconds.
// Window stores also expire data automatically after their retention period.
Instant cutoff = Instant.ofEpochMilli(timestamp).minus(Duration.ofHours(1));
List<Windowed<String>> expired = new ArrayList<>();
try (KeyValueIterator<Windowed<String>, OrderEvent> iterator = recentOrders.all()) {
while (iterator.hasNext()) {
KeyValue<Windowed<String>, OrderEvent> entry = iterator.next();
if (entry.key.window().endTime().isBefore(cutoff)) {
expired.add(entry.key);
}
}
}
// Delete after iterating; putting a null value removes the window entry
for (Windowed<String> key : expired) {
recentOrders.put(key.key(), null, key.window().start());
}
}
private void checkVelocity(long timestamp) {
// Periodic velocity check logic
}
@Override
public void close() {}
}
Kafka Connect Source Connector Configuration (MySQL CDC)
{
"name": "mysql-cdc-source",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql-primary.example.com",
"database.port": "3306",
"database.user": "debezium",
"database.password": "${file:/opt/kafka/secrets/mysql.properties:password}",
"database.server.id": "184054",
"database.include.list": "ecommerce",
"table.include.list": "ecommerce.orders,ecommerce.users,ecommerce.products",
"topic.prefix": "cdc",
"topic.creation.default.replication.factor": 3,
"topic.creation.default.partitions": 12,
"topic.creation.default.cleanup.policy": "delete",
"topic.creation.default.retention.ms": 604800000,
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"transforms": "route,unwrap,addTimestamp",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "cdc\\.ecommerce\\.(.*)",
"transforms.route.replacement": "cdc-$1",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields": "op,ts_ms,source.ts_ms",
"transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addTimestamp.timestamp.field": "processed_at",
"heartbeat.interval.ms": "30000",
"poll.interval.ms": "1000",
"max.batch.size": "8192",
"snapshot.mode": "initial",
"snapshot.locking.mode": "none",
"errors.tolerance": "all",
"errors.log.enable": true,
"errors.log.include.messages": true,
"tasks.max": "1"
}
}
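Because RegexRouter applies its regex as a whole-topic-name match, the pattern must account for the full topic name Debezium produces, which with topic.prefix follows <topic.prefix>.<database>.<table>. The plain-Java sketch below mirrors that matching rule with java.util.regex; the class name is hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class RegexRouterSketch {
    /** Rename the topic only when the regex matches the whole name; otherwise leave it unchanged. */
    static String route(String topic, String regex, String replacement) {
        Matcher m = Pattern.compile(regex).matcher(topic);
        return m.matches() ? m.replaceFirst(replacement) : topic;
    }

    public static void main(String[] args) {
        String topic = "cdc.ecommerce.orders"; // <topic.prefix>.<database>.<table>
        System.out.println(route(topic, "ecommerce\\.(.*)", "cdc-$1"));      // cdc.ecommerce.orders (no full match)
        System.out.println(route(topic, "cdc\\.ecommerce\\.(.*)", "cdc-$1")); // cdc-orders
    }
}
```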
Kafka Connect Sink Connector Configuration (Elasticsearch)
{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://elasticsearch:9200",
"connection.username": "${file:/opt/kafka/secrets/es.properties:username}",
"connection.password": "${file:/opt/kafka/secrets/es.properties:password}",
"topics": "enriched-orders,enriched-users,enriched-products",
"key.ignore": false,
"schema.ignore": false,
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
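"transforms": "createKey,extractKey,indexByDate,typeRoute",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "order_id",
"transforms.createKey.predicate": "isOrders",
"transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field": "order_id",
"transforms.extractKey.predicate": "isOrders",
"predicates": "isOrders",
"predicates.isOrders.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isOrders.pattern": "enriched-orders",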
"transforms.indexByDate.type": "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.indexByDate.timestamp.format": "yyyy-MM-dd",
"transforms.indexByDate.topic.format": "${topic}-${timestamp}",
"transforms.typeRoute.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.typeRoute.regex": "enriched-(.*)",
"transforms.typeRoute.replacement": "kafka-$1",
"batch.size": 200,
"max.buffered.records": 500,
"linger.ms": 1,
"flush.timeout.ms": 10000,
"max.retries": 5,
"retry.backoff.ms": 100,
"behavior.on.null.values": "delete",
"behavior.on.malformed.documents": "warn",
"write.method": "upsert",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq-elasticsearch",
"errors.deadletterqueue.topic.replication.factor": 3,
"tasks.max": "3"
}
}
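A plain-Java sketch of how the date-suffixed index name is built, assuming (as in Kafka's TimestampRouter) java.text.SimpleDateFormat patterns evaluated in UTC. It also shows why the pattern letter matters: in SimpleDateFormat, yyyy is the calendar year but YYYY is the week-based year, which differs near New Year. The class name is hypothetical.

```java
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

final class IndexNameSketch {
    /** Build "${topic}-${timestamp}" the way a TimestampRouter-style SMT would, formatting in UTC. */
    static String indexName(String topic, long timestampMs, String datePattern) {
        SimpleDateFormat fmt = new SimpleDateFormat(datePattern, Locale.US);
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        return "${topic}-${timestamp}"
            .replace("${topic}", topic)
            .replace("${timestamp}", fmt.format(new Date(timestampMs)));
    }

    public static void main(String[] args) {
        long newYearsEve = LocalDate.of(2024, 12, 31).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
        System.out.println(indexName("enriched-orders", newYearsEve, "yyyy-MM-dd")); // enriched-orders-2024-12-31
        System.out.println(indexName("enriched-orders", newYearsEve, "YYYY-MM-dd")); // enriched-orders-2025-12-31
    }
}
```

For 2024-12-31 under US week rules, the YYYY pattern yields a 2025 index name, so yyyy-MM-dd is the safer choice for daily indices.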
Kafka Streams with Schema Registry (Avro Serde)
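import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.WindowStore;
import java.time.Duration;
import java.util.Properties;
import java.util.HashMap;
import java.util.Map;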
public class StreamsSchemaRegistryExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "schema-registry-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Schema Registry configuration
props.put("schema.registry.url", "http://localhost:8081");
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", "admin:secret");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
StreamsBuilder builder = new StreamsBuilder();
// Configure Avro serde
Map<String, String> serdeConfig = new HashMap<>();
serdeConfig.put("schema.registry.url", "http://localhost:8081");
serdeConfig.put("basic.auth.credentials.source", "USER_INFO");
serdeConfig.put("basic.auth.user.info", "admin:secret");
GenericAvroSerde avroSerde = new GenericAvroSerde();
avroSerde.configure(serdeConfig, false);
// false = not a key serde (value serde)
// Source stream with Avro deserialization
KStream<String, GenericRecord> orders = builder.stream("order-events",
Consumed.with(Serdes.String(), avroSerde));
// Transform and filter
KStream<String, GenericRecord> highValueOrders = orders
.filter((key, order) -> {
double amount = (double) order.get("amount");
return amount > 1000;
})
.mapValues(order -> {
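// Build an output schema derived from the input record's field schemas
// (rebuilt per record for brevity; build and reuse it once in production)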
Schema newSchema = SchemaBuilder.builder("com.example")
.record("HighValueOrder")
.fields()
.name("orderId")
.type(order.getSchema().getField("orderId").schema())
.noDefault()
.name("userId")
.type(order.getSchema().getField("userId").schema())
.noDefault()
.name("amount")
.type(order.getSchema().getField("amount").schema())
.noDefault()
.name("verified")
.type()
.booleanType()
.booleanDefault(false)
.endRecord();
GenericRecord highValueOrder = new GenericData.Record(newSchema);
highValueOrder.put("orderId", order.get("orderId"));
highValueOrder.put("userId", order.get("userId"));
highValueOrder.put("amount", order.get("amount"));
highValueOrder.put("verified", false);
return highValueOrder;
});
// Sink with Avro serialization
highValueOrders.to("high-value-orders",
Produced.with(Serdes.String(), avroSerde));
// Aggregation with windowing
KTable<Windowed<String>, Long> orderCounts = orders
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(
"order-count-store")
.withValueSerde(Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
Bash: Kafka Connect REST API Management
#!/bin/bash
# Kafka Connect REST API management
CONNECT_URL="http://localhost:8083"
echo "=== Kafka Connect Management ==="
# List all connectors
echo "--- Active Connectors ---"
curl -s "$CONNECT_URL/connectors" | jq .
# Get connector status
echo ""
echo "--- MySQL CDC Source Status ---"
curl -s "$CONNECT_URL/connectors/mysql-cdc-source/status" | jq .
# Create a new connector
echo ""
echo "--- Creating S3 Sink Connector ---"
curl -s -X POST "$CONNECT_URL/connectors" \
-H "Content-Type: application/json" \
-d '{
"name": "s3-sink",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"s3.bucket.name": "my-kafka-data",
"s3.region": "us-east-1",
"topics": "enriched-orders",
"tasks.max": "3",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "YYYY/MM/dd",
"partition.duration.ms": "3600000",
"locale": "en-US",
"timezone": "UTC",
"flush.size": "1000",
"rotate.interval.ms": "60000",
"schema.compatibility": "BACKWARD"
}
}' | jq .
# Restart a connector
echo ""
echo "--- Restarting MySQL CDC Source ---"
curl -s -X POST "$CONNECT_URL/connectors/mysql-cdc-source/restart" | jq .
# Delete a connector
echo ""
echo "--- Deleting S3 Sink Connector ---"
curl -s -X DELETE "$CONNECT_URL/connectors/s3-sink"
# Get connector config
echo ""
echo "--- Connector Configuration ---"
curl -s "$CONNECT_URL/connectors/mysql-cdc-source/config" | jq .
# Get connector tasks
echo ""
echo "--- Connector Tasks ---"
curl -s "$CONNECT_URL/connectors/mysql-cdc-source/tasks" | jq .
# Pause/Resume connector
echo ""
echo "--- Pausing Connector ---"
curl -s -X PUT "$CONNECT_URL/connectors/mysql-cdc-source/pause"
echo ""
echo "--- Resuming Connector ---"
curl -s -X PUT "$CONNECT_URL/connectors/mysql-cdc-source/resume"
Performance Metrics
| Metric | Kafka Streams (single instance) | Kafka Streams (4 instances) | Kafka Connect (distributed) |
|---|---|---|---|
| Throughput (msg/sec) | 100K | 400K | 300K |
| Latency (p99) | 10ms | 20ms | 50ms |
| State Store Size | 1GB | 1GB per instance | N/A |
| Memory (JVM Heap) | 4GB | 4GB per instance | 8GB per worker |
| CPU Usage | 40% | 30% per instance | 50% per worker |
| Changelog Topic Throughput | 50K msg/sec | 200K msg/sec | N/A |
| Rebalance Time | 30s | 60s | 120s |
| Startup Time | 10s | 15s | 30s |
| Window Type | Throughput | Latency | State Size | Use Case |
|---|---|---|---|---|
| Tumbling | High | Low | Small | Fixed-interval aggregation |
| Hopping | Medium | Medium | Large | Overlapping analysis |
| Session | Medium | Medium | Variable | Activity tracking |
Best Practices
- Application ID and Instance Management: Use a unique application.id per stream processing application and give each instance a unique client.id for monitoring. Scale out by starting more instances with the same application.id; they join the same consumer group and share the tasks.
- State Store Optimization: Configure cache.max.bytes.buffering to balance memory usage against write amplification. Tune RocksDB (block cache size, write buffer size, bloom filters) by registering a custom RocksDBConfigSetter through rocksdb.config.setter; Kafka Streams does not expose these as top-level properties.
- Windowing Configuration: Choose the window type by use case: tumbling for fixed intervals, hopping for overlapping analysis, session for activity tracking. Configure grace periods to handle late events, and use TimeWindows.ofSizeWithNoGrace() when late events should be dropped as soon as the window closes.
- Exactly-Once Semantics: Enable processing.guarantee=exactly_once_v2 for critical applications. EOS adds transaction overhead, which increases latency and reduces throughput. Use idempotent sink operations and transactional producers.
- Connect Task Scaling: For sink connectors, set tasks.max no higher than the total number of input partitions, since extra tasks stay idle. For source connectors, useful parallelism depends on the source system; the Debezium MySQL connector, for example, always runs a single task. Monitor connector health via the REST API, and use dead letter queues (sink connectors) for error handling instead of failing tasks.
- Converter Selection: Use Avro with Schema Registry for schema evolution and compatibility. JSON converters are simpler but lack schema enforcement. Protobuf is efficient but requires additional tooling.
- SMT Ordering: SMTs run in the order listed in transforms, and each one sees the previous one's output (for example, a RegexRouter listed after a TimestampRouter must match the date-suffixed topic name). Limit the number of SMTs to avoid performance overhead, and write custom SMTs only when built-in options are insufficient.
- Monitoring: Track commit-latency-avg, poll-latency-avg, and process-latency-avg for Streams, and connector-failed-task-count and connector-total-task-count for Connect. Use Confluent Control Center or JMX metrics.
- Error Handling: Use errors.tolerance=all with dead letter queues for non-critical failures. Implement circuit breakers for external system dependencies. Monitor DLQ size to detect systemic issues.
- Testing: Use TopologyTestDriver for unit testing Kafka Streams topologies. Use Testcontainers for integration testing with real Kafka and external systems. Test with realistic data volumes and edge cases.
Key Takeaways:
- Kafka Streams runs embedded in the application, so no separate cluster is needed; scale by adding instances to the same consumer group
- End-to-end latency = produce + poll + process + commit; optimize each component for real-time requirements
- Tumbling windows partition time into non-overlapping buckets; hopping windows allow overlap; session windows are activity-based
- KTable provides latest-value-per-key semantics for joins and materialized views
- Exactly-once via processing.guarantee=exactly_once_v2 handles idempotent production + transactional offsets automatically
- Kafka Connect handles offset management and parallelism; set tasks.max based on source partitions or sink capacity
See also: Kafka Architecture (kafka/01) | Producer & Consumer (kafka/02) | Exactly-Once Semantics (kafka/04) | Schema Registry (kafka/05)
See Also
- Kafka Architecture: Core Kafka architecture
- Producer Consumer Patterns: Producer and consumer patterns
- Exactly-Once Semantics: Exactly-once processing guarantees
- Kafka Streams Intro: Kafka Streams fundamentals