
Kafka Streams & Connect API: Stream Processing and Data Integration


[Figure: Kafka Streams topology. Source (raw-events) → KStream map/filter (isValid()) → KStream-KTable join → windowedBy aggregate → Sink (enriched). Window types shown: tumbling (fixed, non-overlapping), hopping (fixed, overlapping), session (activity-based gaps), sliding (continuous, time-based). Caption: Stream processing: Source → Process → Sink with windowed aggregations.]

Architecture Diagram: Kafka Streams Topology

Architecture Diagram: Kafka Connect Framework

Architecture Diagram: Stream Processing Patterns

Architecture Diagram: Kafka Streams Thread Model

Architecture Diagram: Connect Task Lifecycle

Formal Definitions

Definition: Stream Processing Topology

A stream processing topology is a directed acyclic graph (DAG) of processors in which nodes represent operations (source, transform, sink) and edges represent data flow. The topology defines how data moves from source topics through transformations to sink topics. Kafka Streams splits the topology into sub-topologies (connected by repartition topics) and creates one task per input partition for each sub-topology. These tasks, not the topology itself, are distributed across instances and their stream threads.
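The source → transform → sink flow can be illustrated without a Kafka dependency. The sketch below is a plain-Java model that assumes a linear chain rather than a general DAG. `MiniTopology` and its methods are hypothetical names chosen to echo the DSL; they are not the Kafka Streams API. A step that returns `null` models a record dropped by `filter()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical plain-Java model of a linear source -> filter -> mapValues -> sink chain.
// MiniTopology is an illustrative name, not part of the Kafka Streams API.
final class MiniTopology {
    // Each step maps a value to a new value, or to null meaning "drop the record".
    private final List<Function<String, String>> steps = new ArrayList<>();

    MiniTopology filter(Predicate<String> keep) {
        steps.add(v -> keep.test(v) ? v : null);
        return this;
    }

    MiniTopology mapValues(Function<String, String> mapper) {
        steps.add(mapper);
        return this;
    }

    // Pushes each source record through the steps in order, collecting sink output.
    List<String> run(List<String> source) {
        List<String> sink = new ArrayList<>();
        for (String record : source) {
            String value = record;
            for (Function<String, String> step : steps) {
                value = step.apply(value);
                if (value == null) break; // dropped: downstream steps never see it
            }
            if (value != null) sink.add(value);
        }
        return sink;
    }

    public static void main(String[] args) {
        List<String> out = new MiniTopology()
            .filter(v -> !v.isEmpty())
            .mapValues(String::toUpperCase)
            .run(List.of("a", "", "b"));
        System.out.println(out); // [A, B]
    }
}
```

In Kafka Streams itself, `StreamsBuilder` builds the graph and `Topology#describe()` prints it. The model only shows that a topology is a declarative description that is later executed record by record.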

Definition: Windowing

Windowing is the grouping of records by time into finite buckets for aggregation. Kafka Streams supports four window types:

- tumbling windows (fixed-size, non-overlapping)
- hopping windows (fixed-size, overlapping with a configurable advance)
- sliding windows (fixed-size windows defined relative to record timestamps, available since Kafka 2.7)
- session windows (activity-based, variable-size, closed by an inactivity gap)

Windowing enables time-bounded aggregations on unbounded streams.

Definition: KTable

A KTable is an abstraction of a changelog stream in which each key has at most one current value. It represents the latest state for each key, enabling point lookups and joins. In a KStream, the stream is append-only and every record is an independent event. In a KTable, updates are upserts: a new value for a key replaces the previous one, and a record with a null value (a tombstone) deletes the key. When materialized, KTable state is kept in a local state store (RocksDB by default) backed by a changelog topic for fault tolerance.
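A plain-Java sketch of the difference follows, assuming an in-memory list stands in for the changelog topic. Read as a KStream, the same records keep every event. Replayed as a KTable, they keep only the latest value per key, and a `null` value acts as a delete. `ChangelogDemo` and `Change` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of reading one changelog as a KStream versus as a KTable.
final class ChangelogDemo {
    record Change(String key, String value) {} // value == null models a tombstone

    // KStream view: every record is an independent event, so all are kept.
    static List<Change> asStream(List<Change> log) {
        return new ArrayList<>(log);
    }

    // KTable view: replay the log as upserts; a null value deletes the key.
    static Map<String, String> asTable(List<Change> log) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Change c : log) {
            if (c.value() == null) {
                table.remove(c.key());
            } else {
                table.put(c.key(), c.value());
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Change> log = List.of(
            new Change("u1", "Alice"), new Change("u2", "Bob"),
            new Change("u1", "Alicia"), new Change("u2", null));
        System.out.println(asStream(log).size()); // 4
        System.out.println(asTable(log));         // {u1=Alicia}
    }
}
```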

Definition: GlobalKTable

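A GlobalKTable is a KTable that is fully replicated on every Streams instance: each instance reads all partitions of the source topic. A regular KTable is partitioned, but a GlobalKTable can be joined without repartitioning. This works because a KStream-GlobalKTable join derives the lookup key from each stream record, so the stream does not need to be co-partitioned with the table. GlobalKTables are ideal for reference data (dimension tables) small enough to store on every instance. Because every instance holds the whole table, storage and restore cost do not shrink as you add instances or partitions.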
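The reason no repartitioning is needed can be modeled in plain Java. Each instance holds the whole table, so a stream record can be joined by extracting the lookup key from its value, regardless of how the stream is keyed or partitioned. This mirrors the `KeyValueMapper` argument of a KStream-GlobalKTable join. The classes below (`GlobalLookupJoin`, `Order`) are hypothetical and use a `Map` in place of the replicated store.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical model of a stream joined against a fully replicated lookup table.
final class GlobalLookupJoin {
    record Order(String orderId, String productId) {}

    // Every instance holds the full table, so any order can be joined locally by
    // extracting the lookup key from its value. Unmatched orders are dropped (inner join).
    static <R> List<R> innerJoin(List<Order> orders,
                                 Map<String, String> fullProductTable,
                                 Function<Order, String> lookupKey,
                                 BiFunction<Order, String, R> joiner) {
        List<R> out = new ArrayList<>();
        for (Order o : orders) {
            String product = fullProductTable.get(lookupKey.apply(o));
            if (product != null) out.add(joiner.apply(o, product));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> products = Map.of("p1", "Keyboard", "p2", "Mouse");
        List<Order> orders = List.of(new Order("o1", "p2"), new Order("o2", "p9"));
        List<String> joined = innerJoin(orders, products, Order::productId,
            (o, name) -> o.orderId() + ":" + name);
        System.out.println(joined); // [o1:Mouse]
    }
}
```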

Definition: Single Message Transform (SMT)

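A Single Message Transform is a lightweight, stateless transformation applied to individual records as they flow through Kafka Connect. SMTs are chained and applied in the order listed in the transforms property. In a source connector, they run after the connector produces a record and before the converter serializes it. In a sink connector, they run after the converter deserializes the record and before it is handed to the connector. Common SMTs include InsertField, ReplaceField, MaskField, TimestampRouter, RegexRouter, and Flatten. SMTs modify records in flight without requiring custom code.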
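The ordering rule can be sketched with plain Java functions. This is a model, not Connect's `Transformation` interface. `regexRouter` and `insertField` are hypothetical stand-ins for RegexRouter and InsertField$Value, and records are simplified to a topic name plus a map. The model mirrors one detail worth noting: RegexRouter renames a topic only when the regex matches the whole topic name.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical model of an SMT chain; not the Connect Transformation API.
final class SmtChain {
    record Rec(String topic, Map<String, Object> value) {}

    // Models RegexRouter: rename the topic only if the WHOLE name matches.
    static UnaryOperator<Rec> regexRouter(String regex, String replacement) {
        Pattern pattern = Pattern.compile(regex);
        return r -> {
            Matcher m = pattern.matcher(r.topic());
            return m.matches() ? new Rec(m.replaceFirst(replacement), r.value()) : r;
        };
    }

    // Models InsertField$Value with a static field value.
    static UnaryOperator<Rec> insertField(String field, Object fieldValue) {
        return r -> {
            Map<String, Object> copy = new HashMap<>(r.value());
            copy.put(field, fieldValue);
            return new Rec(r.topic(), copy);
        };
    }

    // Applies transforms strictly in list order; each sees the previous one's output.
    static Rec apply(List<UnaryOperator<Rec>> chain, Rec record) {
        Rec current = record;
        for (UnaryOperator<Rec> transform : chain) current = transform.apply(current);
        return current;
    }

    public static void main(String[] args) {
        Rec in = new Rec("ecommerce.orders", Map.of("id", 1));
        Rec out = apply(List.of(
            regexRouter("ecommerce\\.(.*)", "cdc-$1"),
            insertField("source", "mysql")), in);
        System.out.println(out.topic()); // cdc-orders
    }
}
```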

Definition: Dead Letter Queue (DLQ)

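A dead letter queue is a Kafka topic to which a sink connector routes records that fail conversion or transformation, instead of failing the task. DLQs are enabled with errors.tolerance=all and errors.deadletterqueue.topic.name, and they are available only for sink connectors. The DLQ record keeps the original key, value, and headers. Setting errors.deadletterqueue.context.headers.enable=true also adds headers with the original topic, partition, offset, and exception details, which support later inspection and reprocessing.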
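Below is a minimal model of what `errors.tolerance=all` with a DLQ does for a sink task. It assumes a converter that throws on bad input. The class and record names are hypothetical. The header names are modeled on Connect's `__connect.errors.*` context headers, which Connect adds only when context headers are enabled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of a sink task running with errors.tolerance=all and a DLQ.
// Header names are modeled on Connect's __connect.errors.* context headers.
final class DlqModel {
    record Rec(String topic, int partition, long offset, String payload, Map<String, String> headers) {}

    final List<Object> delivered = new ArrayList<>(); // records handed to the external system
    final List<Rec> deadLetters = new ArrayList<>();  // records written to the DLQ topic

    // Converts each record; a failure sends the ORIGINAL record to the DLQ with
    // context headers instead of failing the whole task.
    void process(List<Rec> batch, Function<String, Object> converter) {
        for (Rec r : batch) {
            try {
                delivered.add(converter.apply(r.payload()));
            } catch (RuntimeException e) {
                Map<String, String> h = new HashMap<>(r.headers());
                h.put("__connect.errors.topic", r.topic());
                h.put("__connect.errors.partition", String.valueOf(r.partition()));
                h.put("__connect.errors.offset", String.valueOf(r.offset()));
                h.put("__connect.errors.exception.class.name", e.getClass().getName());
                deadLetters.add(new Rec(r.topic(), r.partition(), r.offset(), r.payload(), h));
            }
        }
    }

    public static void main(String[] args) {
        DlqModel task = new DlqModel();
        task.process(List.of(
                new Rec("orders", 0, 41, "42", Map.of()),
                new Rec("orders", 0, 42, "oops", Map.of())),
            s -> Integer.parseInt(s)); // stands in for a converter that rejects bad payloads
        System.out.println(task.delivered);   // [42]
        System.out.println(task.deadLetters.get(0).headers().get("__connect.errors.offset")); // 42
    }
}
```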

Key Formulas

End-to-End Latency Bound
$$T_{\text{e2e}} \leq T_{\text{produce}} + T_{\text{poll}} + T_{\text{process}} + T_{\text{commit}}$$

Here,

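  • $T_{\text{produce}}$ = Producer send latency (includes acks)
  • $T_{\text{poll}}$ = Consumer fetch/poll round-trip
  • $T_{\text{process}}$ = Record processing time
  • $T_{\text{commit}}$ = Offset (or transaction) commit latency; with exactly-once and read_committed consumers, output becomes visible only when the transaction commits, so this term can approach commit.interval.ms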
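As a worked illustration, take hypothetical values of 5 ms produce, 10 ms poll, and 2 ms processing. Assume the downstream consumer uses read_committed isolation under exactly_once_v2, so output is visible only after the transaction commits. With the default `commit.interval.ms` of 100 ms under exactly-once, the commit term can dominate:

```latex
T_{\text{e2e}} \leq \underbrace{5}_{T_{\text{produce}}} + \underbrace{10}_{T_{\text{poll}}} + \underbrace{2}_{T_{\text{process}}} + \underbrace{100}_{T_{\text{commit}}} = 117\ \text{ms}
```

Lowering `commit.interval.ms` reduces this term at the cost of more frequent transaction commits.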

Window Aggregation (Tumbling)

$$A_{\text{window}} = \bigoplus_{e \in W_i} f(e)$$

Here,

  • $W_i$ = Tumbling window $[t_i, t_i + s)$, with $t_i = i \cdot s$ (Kafka Streams aligns windows to the Unix epoch)
  • $s$ = Window size (e.g., 5 minutes)
  • $f(e)$ = Aggregation function applied to each event $e$
  • $\bigoplus$ = Aggregation operator (count, sum, reduce, etc.)
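Here is a plain-Java sketch of the tumbling case, assuming non-negative millisecond timestamps and epoch-aligned windows ($t_i = i \cdot s$). `windowStart` maps each event to its single window, and `count` instantiates $f(e) = 1$ with $\bigoplus = +$. `TumblingCount` is a hypothetical name; Kafka Streams performs this assignment internally.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of a tumbling-window count with epoch-aligned windows.
final class TumblingCount {
    // Start of the single window [start, start + sizeMs) containing ts (ts >= 0).
    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    // f(e) = 1 and the aggregation operator is +, i.e. a count per window.
    static Map<Long, Long> count(long[] timestamps, long sizeMs) {
        Map<Long, Long> counts = new TreeMap<>(); // keyed and ordered by window start
        for (long ts : timestamps) {
            counts.merge(windowStart(ts, sizeMs), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60_000L;
        long[] ts = {0, 60_000, 299_999, 300_000, 700_000};
        System.out.println(count(ts, fiveMinutes)); // {0=3, 300000=1, 600000=1}
    }
}
```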

Window Aggregation (Hopping)

$$A_i = \bigoplus_{e \in W_i} f(e), \quad W_i = [t_i, t_i + s)$$

Here,

  • $s$ = Window size
  • $a$ = Advance interval ($a < s$ for overlap; $a = s$ reduces to a tumbling window)
  • $t_i$ = Window start time, $t_i = t_0 + i \cdot a$
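Which hopping windows contain a given event follows from $t_i = t_0 + i \cdot a$ and $W_i = [t_i, t_i + s)$. The sketch below assumes $t_0 = 0$ (epoch alignment) and non-negative timestamps; `HoppingWindows` and `Window` are hypothetical names. It starts from the earliest aligned window whose end still lies after the event and steps forward by the advance.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of hopping-window assignment with epoch-aligned windows (t_0 = 0).
final class HoppingWindows {
    record Window(long start, long end) {} // covers [start, end)

    // Every window [i*a, i*a + s) that contains timestamp t (t >= 0).
    static List<Window> windowsFor(long t, long sizeMs, long advanceMs) {
        List<Window> windows = new ArrayList<>();
        // Earliest aligned start whose window still ends after t (never negative).
        long start = (Math.max(0, t - sizeMs + advanceMs) / advanceMs) * advanceMs;
        for (; start <= t; start += advanceMs) {
            windows.add(new Window(start, start + sizeMs));
        }
        return windows;
    }

    public static void main(String[] args) {
        // 5-minute windows advancing every minute: an event at minute 7 lands in 5 windows.
        for (Window w : windowsFor(420_000, 300_000, 60_000)) {
            System.out.println("[" + w.start() / 60_000 + ", " + w.end() / 60_000 + ") min");
        }
    }
}
```

With $a = s$ the same code returns exactly one window, which is why a tumbling window is a hopping window whose advance equals its size.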

Session Window Merging

$$S_j = \bigcup_{\substack{i:\; t_{\text{end},j} - \text{gap} \,\leq\, t_{\text{start},i} \,\leq\, t_{\text{end},j} + \text{gap}}} W_i$$

Here,

  • $S_j$ = Session window $j$ after merging
  • $W_i$ = Individual event window
  • $\text{gap}$ = Inactivity gap threshold
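Session merging can be modeled for a single key as follows. This is a hypothetical sketch, not Kafka Streams' session store. Each event starts as a zero-length session $[t, t]$, and any existing session whose range lies within `gap` of $t$ is absorbed, so a late event can bridge two previously separate sessions. The inclusive boundary (`<=` gap) is an assumption of the model.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of session-window merging for a single key.
final class SessionMerge {
    record Session(long start, long end, long count) {}

    // Adds an event at time t: every existing session within `gap` of t merges with it.
    static List<Session> add(List<Session> sessions, long t, long gap) {
        long start = t, end = t, count = 1;
        List<Session> result = new ArrayList<>();
        for (Session s : sessions) {
            boolean withinGap = s.end() >= t - gap && s.start() <= t + gap;
            if (withinGap) { // absorb it; one event can bridge two sessions
                start = Math.min(start, s.start());
                end = Math.max(end, s.end());
                count += s.count();
            } else {
                result.add(s);
            }
        }
        result.add(new Session(start, end, count));
        result.sort(Comparator.comparingLong(Session::start));
        return result;
    }

    public static void main(String[] args) {
        long gap = 30; // minutes
        List<Session> sessions = new ArrayList<>();
        for (long t : new long[] {0, 10, 60, 35}) { // 35 arrives out of order
            sessions = add(sessions, t, gap);
            System.out.println(sessions);
        }
    }
}
```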

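Kafka Streams windows on the timestamp returned by the configured TimestampExtractor. The default, FailOnInvalidTimestamp, uses the timestamp embedded in each record: event time, or log-append time if the topic is configured that way. WallclockTimestampExtractor switches to processing time, and you can implement TimestampExtractor yourself to read a timestamp from the payload. Instead of watermarks, Kafka Streams tracks stream time, the largest timestamp a task has observed so far. A record whose window has closed (window end + grace period ≤ stream time) is dropped. Configure the grace period with TimeWindows.ofSizeWithNoGrace() or TimeWindows.ofSizeAndGrace().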

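For exactly-once stream processing, set processing.guarantee=exactly_once_v2. This automatically configures idempotent producers, transactional writes, and read_committed consumers within the Streams library. Compared with the original exactly_once mode, v2 (KIP-447) uses one transactional producer per stream thread instead of one per task. This lowers resource usage and improves scalability. It requires brokers on version 2.5 or newer.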

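Kafka Streams maintains local state in RocksDB with changelog topics for fault tolerance. If an instance fails, its state is rebuilt on another instance by replaying the changelog topic; standby replicas (num.standby.replicas) shorten this. The cache.max.bytes.buffering setting (statestore.cache.max.bytes from Kafka 3.4) bounds the memory used for write-back caching, shared by all stream threads of an instance. Consecutive updates to the same key are collapsed before they are flushed on commit or eviction. Larger caches therefore mean fewer changelog and downstream records, at the cost of more memory and less frequent intermediate updates.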
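The write-back trade-off can be illustrated with a small model. It assumes the cache limit is counted in entries rather than bytes, eviction is least-recently-used, and `flush()` stands for a commit. `WriteBackCache` is a hypothetical class, not Kafka Streams' internal cache.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a write-back cache in front of a changelogged state store.
// Entry count stands in for the byte limit used by Kafka Streams.
final class WriteBackCache {
    // accessOrder=true: iteration order is least-recently-used first.
    private final Map<String, Long> dirty = new LinkedHashMap<>(16, 0.75f, true);
    private final int maxEntries;
    final List<String> changelog = new ArrayList<>(); // records sent to the changelog topic

    WriteBackCache(int maxEntries) {
        this.maxEntries = maxEntries;
    }

    void put(String key, long value) {
        dirty.put(key, value); // overwrites: only the latest value per key is kept
        if (dirty.size() > maxEntries) { // full: evict and write out the LRU entry
            Map.Entry<String, Long> eldest = dirty.entrySet().iterator().next();
            changelog.add(eldest.getKey() + "=" + eldest.getValue());
            dirty.remove(eldest.getKey());
        }
    }

    // Called on commit: one changelog record per dirty key, however many updates it saw.
    void flush() {
        dirty.forEach((k, v) -> changelog.add(k + "=" + v));
        dirty.clear();
    }

    public static void main(String[] args) {
        WriteBackCache cache = new WriteBackCache(10);
        for (long i = 1; i <= 1000; i++) cache.put("user-1", i); // 1000 updates, one key
        cache.flush();
        System.out.println(cache.changelog); // [user-1=1000]
    }
}
```

With a generous cache, 1,000 updates to one key produce a single changelog record at commit. With a one-entry cache, alternating keys force an eviction, and therefore a changelog write, on almost every update.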

Theorem: Streams Parallelism

The parallelism of a Kafka Streams application is determined by:

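  1. Number of stream threads per instance (num.stream.threads, default 1)
  2. Number of tasks assigned to each thread (tasks are spread across all threads of all instances sharing the application.id)
  3. Number of tasks: each sub-topology has one task per input partition (the maximum partition count among its input topics), and the application's task count is the sum over sub-topologies

Total parallelism = num.stream.threads × number of instances, bounded by the total number of tasks; threads beyond that sit idle.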
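The rule above can be written as a small calculation. The sketch assumes you know the partition counts of the input topics for each sub-topology; `Topology#describe()` lists the sub-topologies. `StreamsParallelism` is a hypothetical helper, not a Kafka API.

```java
import java.util.List;

// Hypothetical helper: task count and effective parallelism of a Streams application.
final class StreamsParallelism {
    // Each sub-topology gets one task per partition of its input topics (the maximum
    // partition count among them); the total is summed over sub-topologies.
    static int totalTasks(List<int[]> inputPartitionsPerSubTopology) {
        int tasks = 0;
        for (int[] partitionCounts : inputPartitionsPerSubTopology) {
            int max = 0;
            for (int p : partitionCounts) max = Math.max(max, p);
            tasks += max;
        }
        return tasks;
    }

    // Tasks are the unit of work, so threads beyond the task count sit idle.
    static int effectiveParallelism(int instances, int threadsPerInstance, int tasks) {
        return Math.min(instances * threadsPerInstance, tasks);
    }

    public static void main(String[] args) {
        // Sub-topology 0 joins two 12-partition topics; sub-topology 1 reads a 12-partition repartition topic.
        int tasks = totalTasks(List.of(new int[] {12, 12}, new int[] {12}));
        System.out.println(tasks);                              // 24
        System.out.println(effectiveParallelism(4, 4, tasks));  // 16
        System.out.println(effectiveParallelism(4, 8, tasks));  // 24
    }
}
```

For example, 4 instances × 4 threads (16 threads) all have work here. With 4 instances × 8 threads, 8 of the 32 threads would be idle.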

Theorem: Windowed Aggregation Correctness

With event-time windowing and a grace period, the aggregation result for window W_i is complete if:

  1. All events with timestamp t ∈ W_i arrive before stream time reaches W_i.end + grace
  2. Late events beyond the grace period are dropped (the grace period is configurable per window definition)
  3. For hopping windows with advance a < size s, each event contributes to up to ⌈s/a⌉ windows, so it appears in multiple results
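Kafka Streams does not use Flink-style watermarks. Instead it tracks "stream time", the largest record timestamp a task has seen so far. It drops a record once the end of that record's window, plus the grace period, is no longer ahead of stream time. Below is a minimal model for epoch-aligned tumbling windows, assuming a single task and non-negative millisecond timestamps; `GraceModel` is a hypothetical name.

```java
// Hypothetical model of stream time and grace-period drops for one task,
// using epoch-aligned tumbling windows and non-negative millisecond timestamps.
final class GraceModel {
    private final long sizeMs;
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE; // largest timestamp seen so far

    GraceModel(long sizeMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    // Returns true if the record is aggregated, false if it is dropped as too late.
    boolean accept(long ts) {
        streamTime = Math.max(streamTime, ts);        // stream time never moves backwards
        long windowEnd = ts - (ts % sizeMs) + sizeMs; // end of the window holding ts
        return windowEnd + graceMs > streamTime;      // window still open?
    }

    public static void main(String[] args) {
        GraceModel model = new GraceModel(60_000, 10_000); // 1-minute windows, 10 s grace
        for (long ts : new long[] {0, 65_000, 50_000, 75_000, 55_000}) {
            System.out.println(ts + " -> " + (model.accept(ts) ? "aggregated" : "dropped"));
        }
    }
}
```

In this run, the record at 50 000 ms is late but still inside the grace period. The record at 55 000 ms arrives after stream time has reached 75 000 ms, past its window's close at 70 000 ms, so it is dropped.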

Detailed Explanation

Kafka Streams Architecture

Kafka Streams is a client library for building stream processing applications on top of Kafka. Unlike traditional stream processing frameworks (Apache Flink, Apache Spark Streaming), Kafka Streams runs within the application process, requiring no separate cluster management. This embedded architecture simplifies deployment and scaling, as each instance of the application is a stream processor that participates in the consumer group for the input topics.

Topology and Processor DAG: A Kafka Streams application defines a processing topology as a directed acyclic graph (DAG) of processors. Each processor is either a source (reads from topics), a transformation (map, filter, aggregate), or a sink (writes to topics). The topology is automatically distributed across instances by assigning different partitions to different instances. The StreamsBuilder DSL automatically constructs this topology, while the Processor API allows building custom processor nodes with fine-grained control over state management and scheduling.

Stateful Operations: Kafka Streams supports several stateful operations that require local state storage. These include aggregations (count, reduce, aggregate), joins (KStream-KTable, KStream-KStream, KTable-KTable, KStream-GlobalKTable), and windowing (tumbling, hopping, sliding, and session windows). State is stored in a local state store (RocksDB by default; in-memory stores are also available), backed by a changelog topic in Kafka for fault tolerance. The changelog topic ensures that state can be rebuilt from Kafka if a node fails.

Windowing Strategies: Event-time windowing groups records by their event timestamp rather than processing time. Kafka Streams supports tumbling windows (fixed-size, non-overlapping), hopping windows (fixed-size, overlapping), sliding windows (fixed-size, anchored on record timestamps), and session windows (activity-based, variable-size). Rather than watermarks, Kafka Streams tracks stream time (the largest timestamp seen so far). It accepts out-of-order records until a window's grace period has elapsed.

Kafka Connect Architecture

Kafka Connect provides a framework for moving data between Kafka and external systems. Connectors are plugins that extend either SourceConnector (reading from external systems and writing to Kafka) or SinkConnector (reading from Kafka and writing to external systems). A connector splits its work into at most tasks.max task configurations, and the framework runs those tasks across the workers. The framework also handles offset management, fault tolerance, and parallelism.
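How a connector turns `tasks.max` into task configurations is connector-specific. A common pattern, sketched here, splits the work units into at most `tasks.max` contiguous groups; it assumes the units (for example, tables or files) are independent. Connect ships a helper, `ConnectorUtils.groupPartitions`, for this kind of split. The code below is an independent, hypothetical sketch and not that helper.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of splitting work units (e.g., tables) across at most maxTasks tasks.
final class TaskSplitter {
    static <T> List<List<T>> split(List<T> units, int maxTasks) {
        int groups = Math.min(units.size(), maxTasks); // never create empty tasks
        List<List<T>> result = new ArrayList<>();
        if (groups <= 0) return result;
        int base = units.size() / groups;
        int extra = units.size() % groups; // the first `extra` groups get one more unit
        int index = 0;
        for (int g = 0; g < groups; g++) {
            int n = base + (g < extra ? 1 : 0);
            result.add(new ArrayList<>(units.subList(index, index + n)));
            index += n;
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> tables = List.of("orders", "users", "products", "payments", "refunds");
        System.out.println(split(tables, 3)); // [[orders, users], [products, payments], [refunds]]
    }
}
```

Each inner list would become one task's configuration. Asking for more tasks than there are units simply yields fewer tasks.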

Connector Configuration and Transforms: In distributed mode, connectors are configured by submitting JSON to the REST API; in standalone mode, they are configured with properties files. The framework supports Single Message Transforms (SMTs) for in-flight record transformation, converters for serialization format handling, and dead letter queues (sink connectors only) for error handling.

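Exactly-Once Semantics in Connect: Since Kafka 3.3 (KIP-618), source connectors can provide exactly-once delivery. To use it, set exactly.once.source.support=enabled on every worker of a distributed cluster, then set exactly.once.support=required (or requested) on the connector. Sink connectors get at-least-once delivery from the framework. Exactly-once delivery into the target system depends on the connector, typically through idempotent writes (upserts keyed by a record ID) or by storing Kafka offsets in the target system alongside the data.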

Key Concepts Table

| Component | Description | Key Configurations | Use Cases |
|---|---|---|---|
| KStream | Unbounded stream of records | processing.guarantee | Event processing, real-time analytics |
| KTable | Changelog stream (latest value per key) | state.dir, cache.max.bytes.buffering (statestore.cache.max.bytes in 3.4+) | Materialized views, join tables |
| GlobalKTable | Full copy on every instance | global.consumer.* (global consumer overrides) | Reference data, dimension tables |
| Topology | DAG of processors | application.id, client.id | Processing pipeline definition |
| State Store | Local persistence (RocksDB by default) | state.dir, rocksdb.config.setter | Stateful operations, windowing |
| Changelog Topic | Fault-tolerant state backup | replication.factor, topic.* (internal topic overrides) | State recovery, fault tolerance |
| Source Connector | Read from external systems | tasks.max, connector.class | CDC, file ingestion |
| Sink Connector | Write to external systems | topics, tasks.max | Data export, search indexing |
| Converter | Serialization format handling | key.converter, value.converter | JSON, Avro, Protobuf |
| SMT | Single Message Transform | transforms, transforms.* | Record modification, routing |
| DLQ | Dead Letter Queue (sink connectors) | errors.tolerance, errors.deadletterqueue.topic.name | Error handling, reprocessing |

Code Examples

Kafka Streams DSL Application

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.WindowStore;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;

public class OrderProcessingStream {
    
    // Order is an application class (not shown). OrderSerializer and OrderDeserializer
    // are hypothetical placeholders for your JSON- or Avro-based serde implementation.
    private static final Serde<Order> orderSerde =
        Serdes.serdeFrom(new OrderSerializer(), new OrderDeserializer());
    
    public static void main(String[] args) {
        Properties props = new Properties();
        
        // Application identity; used as the consumer group ID and internal topic prefix
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processing-app");
        
        // Bootstrap servers for initial metadata discovery
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        
        // Default serializers/deserializers for keys and values
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        
        // Exactly-once semantics v2: one transactional producer per thread (brokers 2.5+)
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
            StreamsConfig.EXACTLY_ONCE_V2);
        // Automatically configures idempotent producers, transactional writes,
        // and read_committed consumers
        
        // State store configuration
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
        // Directory for RocksDB state stores; use a persistent, SSD-backed path in production
        
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        // How often offsets (and, under EOS, transactions) are committed and caches flushed
        // More frequent = lower end-to-end latency, more overhead (EOS default: 100 ms)
        
        props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10 * 1024 * 1024L);
        // 10 MB write-back cache shared by all threads (CACHE_MAX_BYTES_BUFFERING_CONFIG before 3.4)
        // Larger cache = fewer changelog/downstream writes, more memory
        
        // Thread and task configuration
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
        // Number of stream threads per instance
        // Each thread processes a subset of tasks
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Source: raw order events, assumed keyed by user ID (required for the join below)
        KStream<String, String> orders = builder.stream("raw-orders",
            Consumed.with(Serdes.String(), Serdes.String()));
        // No custom extractor: the default (FailOnInvalidTimestamp) uses each record's
        // embedded timestamp, so the windows below are event-time windows.
        // WallclockTimestampExtractor would switch them to processing time.
        
        // Source: user data (KTable for point lookups), keyed by user ID
        KTable<String, String> users = builder.table("user-data",
            Consumed.with(Serdes.String(), Serdes.String()));
        // A KStream-KTable join requires both topics to be co-partitioned
        
        // Transform and enrich
        KStream<String, Order> enrichedOrders = orders
            .filter((key, value) -> value != null && !value.isEmpty())
            // Filter out null/empty records
            .mapValues(value -> parseOrder(value))
            // mapValues preserves the key (no repartition) and transforms the value
            .filter((key, order) -> order.getAmount() > 0)
            // Filter invalid orders
            .join(users,
                (order, user) -> order.withUser(user),
                // Join function: combine order with user data
                Joined.with(Serdes.String(), orderSerde, Serdes.String()));
            // Joined specifies the key, stream-value, and table-value serdes
        
        // Branch by order type (split() replaces the deprecated KStream#branch)
        Map<String, KStream<String, Order>> branches = enrichedOrders
            .split(Named.as("orders-"))
            .branch((key, order) -> order.getType().equals("PREMIUM"),
                Branched.withFunction(stream -> {
                    stream.mapValues(o -> o.applyDiscount(0.10))
                          .to("premium-orders", Produced.with(Serdes.String(), orderSerde));
                    return stream;
                }, "premium"))
            .branch((key, order) -> order.getType().equals("STANDARD"),
                Branched.withFunction(stream -> {
                    stream.mapValues(o -> o.applyDiscount(0.0))
                          .to("standard-orders", Produced.with(Serdes.String(), orderSerde));
                    return stream;
                }, "standard"))
            .noDefaultBranch();
        // Branch names become map keys ("orders-premium", "orders-standard")
        // and processor names, which helps monitoring
        
        // Aggregation: count orders per user in 1-hour tumbling windows
        KTable<Windowed<String>, Long> orderCounts = enrichedOrders
            .groupByKey(Grouped.with(Serdes.String(), orderSerde))
            // Group by key (user ID); Grouped supplies serdes for the Order values
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
            // 1-hour tumbling window, no grace period
            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(
                    "order-count-store")
                .withValueSerde(Serdes.Long()));
            // Materialized creates a named, queryable state store
        
        // Aggregate: total revenue per user in 5-minute hopping windows
        KTable<Windowed<String>, Double> revenueByUser = enrichedOrders
            .mapValues(order -> order.getAmount())
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))
                .advanceBy(Duration.ofMinutes(1)))
            // Hopping: 5-minute windows, advancing every 1 minute (overlapping)
            .reduce(Double::sum, Materialized.<String, Double, WindowStore<Bytes, byte[]>>as(
                    "revenue-store")
                .withValueSerde(Serdes.Double()));
        
        // Session windowing for activity tracking, re-keyed by user ID
        KTable<Windowed<String>, Long> sessionCounts = enrichedOrders
            .groupBy((key, order) -> order.getUserId(),
                Grouped.with(Serdes.String(), orderSerde))
            // groupBy changes the key, so Streams inserts a repartition topic
            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
            // Session window: merges events separated by at most a 30-minute inactivity gap
            .count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as(
                    "session-store")
                .withValueSerde(Serdes.Long()));
        
        // Sink: write enriched orders as JSON strings
        enrichedOrders
            .mapValues(order -> order.toJson())
            .to("enriched-orders", Produced.with(Serdes.String(), Serdes.String()));
        
        Topology topology = builder.build();
        System.out.println(topology.describe());
        // Print sub-topologies, processors, and stores for debugging
        
        KafkaStreams streams = new KafkaStreams(topology, props);
        
        // Graceful shutdown, registered before start(): close() waits up to 30 s
        // for threads to finish in-flight work. Avoid calling streams.cleanUp() here:
        // it wipes local state stores and forces a full restore from changelogs on restart.
        Runtime.getRuntime().addShutdownHook(new Thread(() ->
            streams.close(Duration.ofSeconds(30))));
        
        streams.start();
    }
    
    private static Order parseOrder(String json) {
        // JSON parsing implementation (application-specific)
        return new Order(); // Placeholder
    }
}

Kafka Streams with Processor API (Custom Processor)

import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import java.time.Duration;
import java.time.Instant;

// OrderEvent, Alert, and UserTransactionSummary are application classes (not shown)
public class FraudDetectionProcessor implements Processor<String, OrderEvent, String, Alert> {
    
    private ProcessorContext<String, Alert> context;
    private KeyValueStore<String, UserTransactionSummary> stateStore;
    private WindowStore<String, OrderEvent> recentOrders;
    
    @Override
    public void init(ProcessorContext<String, Alert> context) {
        this.context = context;
        
        // Get references to state stores; both must be added to the topology and
        // connected to this processor (Topology#addStateStore(storeBuilder, processorName)).
        // Build "recent-orders-window" with Stores.persistentWindowStore(name, retention,
        // windowSize, true): the retention period expires old entries automatically
        // (WindowStore has no delete), and retainDuplicates=true keeps several events
        // with the same key and timestamp.
        this.stateStore = context.getStateStore("user-transaction-summary");
        this.recentOrders = context.getStateStore("recent-orders-window");
        
        // Schedule a periodic punctuation for housekeeping tasks
        context.schedule(Duration.ofMinutes(1), 
            PunctuationType.WALL_CLOCK_TIME, 
            this::checkVelocity);
    }
    
    @Override
    public void process(Record<String, OrderEvent> record) {
        OrderEvent event = record.value();
        String userId = event.getUserId();
        
        // Store in the window store, keyed by user ID, for velocity checks;
        // put() takes the window start timestamp in epoch milliseconds
        recentOrders.put(userId, event, event.getTimestamp());
        // Window store enables range queries by key and time
        
        // Get or initialize user summary
        UserTransactionSummary summary = stateStore.get(userId);
        if (summary == null) {
            summary = new UserTransactionSummary(userId);
        }
        
        // Fraud detection logic; a later matching rule replaces an earlier alert,
        // so collect alerts in a list if every triggered reason is needed
        Alert alert = null;
        
        // Rule 1: Amount threshold
        if (event.getAmount() > summary.getAverageAmount() * 3) {
            alert = new Alert(userId, "HIGH_AMOUNT", 
                "Transaction amount 3x above average", event);
        }
        
        // Rule 2: Velocity check (transactions in the last minute vs. per-minute average)
        long recentCount = countRecentTransactions(userId,
            Instant.ofEpochMilli(event.getTimestamp()), Duration.ofMinutes(1));
        if (recentCount > summary.getTransactionsPerMinute() * 5) {
            alert = new Alert(userId, "HIGH_VELOCITY", 
                "Transaction velocity 5x above normal", event);
        }
        
        // Rule 3: Geographic anomaly
        if (summary.getLastLocation() != null && 
            !summary.getLastLocation().equals(event.getLocation())) {
            alert = new Alert(userId, "GEO_ANOMALY", 
                "Location changed from " + summary.getLastLocation() + 
                " to " + event.getLocation(), event);
        }
        
        // Update summary
        summary.update(event);
        stateStore.put(userId, summary);
        
        // Forward alert if fraud detected
        if (alert != null) {
            context.forward(new Record<>(userId, alert, record.timestamp()));
        }
    }
    
    // Counts the user's events in [eventTime - window, eventTime]; uses event time
    // rather than Instant.now() so results do not depend on processing lag
    private long countRecentTransactions(String userId, Instant eventTime, Duration window) {
        long count = 0;
        try (WindowStoreIterator<OrderEvent> iterator = 
                recentOrders.fetch(userId, eventTime.minus(window), eventTime)) {
            while (iterator.hasNext()) {
                iterator.next();
                count++;
            }
        }
        return count;
    }
    
    // Punctuator callback: receives the current wall-clock time in epoch milliseconds
    private void checkVelocity(long timestamp) {
        // Periodic velocity check logic
    }
    
    @Override
    public void close() {}
}

Kafka Connect Source Connector Configuration (MySQL CDC)

{
  "name": "mysql-cdc-source",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql-primary.example.com",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "${file:/opt/kafka/secrets/mysql.properties:password}",
    "database.server.id": "184054",
    "database.include.list": "ecommerce",
    "table.include.list": "ecommerce.orders,ecommerce.users,ecommerce.products",
    
    "topic.prefix": "cdc",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-history.ecommerce",
    "topic.creation.default.replication.factor": 3,
    "topic.creation.default.partitions": 12,
    "topic.creation.default.cleanup.policy": "delete",
    "topic.creation.default.retention.ms": 604800000,
    
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    
    "transforms": "route,unwrap,addTimestamp",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "cdc\\.ecommerce\\.(.*)",
    "transforms.route.replacement": "cdc-$1",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.fields": "op,ts_ms,source.ts_ms",
    "transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.addTimestamp.timestamp.field": "processed_at",
    
    "heartbeat.interval.ms": "30000",
    "poll.interval.ms": "1000",
    "max.batch.size": "8192",
    "max.queue.size": "16384",
    "snapshot.mode": "initial",
    "snapshot.locking.mode": "none",
    
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    
    "tasks.max": "1"
  }
}

Kafka Connect Sink Connector Configuration (Elasticsearch)

{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "connection.username": "${file:/opt/kafka/secrets/es.properties:username}",
    "connection.password": "${file:/opt/kafka/secrets/es.properties:password}",
    
    "topics": "enriched-orders,enriched-users,enriched-products",
    "key.ignore": false,
    "schema.ignore": false,
    
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    
    "predicates": "isOrders",
    "predicates.isOrders.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    "predicates.isOrders.pattern": "enriched-orders",
    
    "transforms": "createKey,extractKey,indexByDate,typeRoute",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "order_id",
    "transforms.createKey.predicate": "isOrders",
    "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKey.field": "order_id",
    "transforms.extractKey.predicate": "isOrders",
    "transforms.indexByDate.type": "org.apache.kafka.connect.transforms.TimestampRouter",
    "transforms.indexByDate.timestamp.format": "yyyy-MM-dd",
    "transforms.indexByDate.topic.format": "${topic}-${timestamp}",
    "transforms.typeRoute.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.typeRoute.regex": "enriched-(.*)",
    "transforms.typeRoute.replacement": "kafka-$1",
    
    "batch.size": 200,
    "max.buffered.records": 500,
    "linger.ms": 1,
    "flush.timeout.ms": 10000,
    "max.retries": 5,
    "retry.backoff.ms": 100,
    
    "behavior.on.null.values": "delete",
    "behavior.on.malformed.documents": "warn",
    
    "write.method": "upsert",
    "document.id.strategy": "record_key",
    
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq-elasticsearch",
    "errors.deadletterqueue.topic.replication.factor": 3,
    "errors.deadletterqueue.context.headers.enable": true,
    
    "tasks.max": "3"
  }
}

Kafka Streams with Schema Registry (Avro Serde)

import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.WindowStore;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class StreamsSchemaRegistryExample {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "schema-registry-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        
        // Schema Registry configuration; Streams passes these to the default serdes.
        // Credentials are inline for brevity; load them from a secret store in production.
        props.put("schema.registry.url", "http://localhost:8081");
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", "admin:secret");
        
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Serdes created explicitly are not configured by Streams, so configure this one here
        Map<String, String> serdeConfig = new HashMap<>();
        serdeConfig.put("schema.registry.url", "http://localhost:8081");
        serdeConfig.put("basic.auth.credentials.source", "USER_INFO");
        serdeConfig.put("basic.auth.user.info", "admin:secret");
        
        GenericAvroSerde avroSerde = new GenericAvroSerde();
        avroSerde.configure(serdeConfig, false);
        // false = configure as a value serde (true = key serde)
        
        // Source stream with Avro deserialization
        KStream<String, GenericRecord> orders = builder.stream("order-events",
            Consumed.with(Serdes.String(), avroSerde));
        
        // Transform and filter
        KStream<String, GenericRecord> highValueOrders = orders
            .filter((key, order) -> {
                // Assumes "amount" is an Avro double; null or other types are filtered out
                Object amount = order.get("amount");
                return amount instanceof Double && (Double) amount > 1000;
            })
            .mapValues(order -> {
                // Create a new Avro record with an evolved schema (adds "verified").
                // The schema is rebuilt per record for brevity; cache it in real code.
                Schema newSchema = SchemaBuilder.builder("com.example")
                    .record("HighValueOrder")
                    .fields()
                        .name("orderId")
                        .type(order.getSchema().getField("orderId").schema())
                        .noDefault()
                        .name("userId")
                        .type(order.getSchema().getField("userId").schema())
                        .noDefault()
                        .name("amount")
                        .type(order.getSchema().getField("amount").schema())
                        .noDefault()
                        .name("verified")
                        .type()
                        .booleanType()
                        .booleanDefault(false)
                    .endRecord();
                
                GenericRecord highValueOrder = new GenericData.Record(newSchema);
                highValueOrder.put("orderId", order.get("orderId"));
                highValueOrder.put("userId", order.get("userId"));
                highValueOrder.put("amount", order.get("amount"));
                highValueOrder.put("verified", false);
                
                return highValueOrder;
            });
        
        // Sink with Avro serialization (the new schema is registered on first write)
        highValueOrders.to("high-value-orders", 
            Produced.with(Serdes.String(), avroSerde));
        
        // Aggregation with windowing (uses the default GenericAvroSerde configured from props)
        KTable<Windowed<String>, Long> orderCounts = orders
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(
                    "order-count-store")
                .withValueSerde(Serdes.Long()));
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        
        // Register the shutdown hook before starting
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}

Bash: Kafka Connect REST API Management

#!/bin/bash
# Kafka Connect REST API management

CONNECT_URL="http://localhost:8083"

echo "=== Kafka Connect Management ==="

# List all connectors
echo "--- Active Connectors ---"
curl -s "$CONNECT_URL/connectors" | jq .

# Get connector status
echo ""
echo "--- MySQL CDC Source Status ---"
curl -s "$CONNECT_URL/connectors/mysql-cdc-source/status" | jq .

# Create a new connector
echo ""
echo "--- Creating S3 Sink Connector ---"
curl -s -X POST "$CONNECT_URL/connectors" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "s3-sink",
    "config": {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "s3.bucket.name": "my-kafka-data",
      "s3.region": "us-east-1",
      "topics": "enriched-orders",
      "tasks.max": "3",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
      "path.format": "YYYY/MM/dd",
      "partition.duration.ms": "3600000",
      "locale": "en-US",
      "timezone": "UTC",
      "flush.size": "1000",
      "rotate.interval.ms": "60000",
      "schema.compatibility": "BACKWARD"
    }
  }' | jq .

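# Restart a connector together with its failed tasks (Kafka 3.0+; without the
# query parameters, only the connector instance restarts, not its tasks)
echo ""
echo "--- Restarting MySQL CDC Source ---"
curl -s -X POST "$CONNECT_URL/connectors/mysql-cdc-source/restart?includeTasks=true&onlyFailed=true" | jq .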

# Delete a connector
echo ""
echo "--- Deleting S3 Sink Connector ---"
curl -s -X DELETE "$CONNECT_URL/connectors/s3-sink"

# Get connector config
echo ""
echo "--- Connector Configuration ---"
curl -s "$CONNECT_URL/connectors/mysql-cdc-source/config" | jq .

# Get connector tasks
echo ""
echo "--- Connector Tasks ---"
curl -s "$CONNECT_URL/connectors/mysql-cdc-source/tasks" | jq .

# Pause/Resume connector
echo ""
echo "--- Pausing Connector ---"
curl -s -X PUT "$CONNECT_URL/connectors/mysql-cdc-source/pause"

echo ""
echo "--- Resuming Connector ---"
curl -s -X PUT "$CONNECT_URL/connectors/mysql-cdc-source/resume"

Performance Metrics

| Metric | Kafka Streams (single instance) | Kafka Streams (4 instances) | Kafka Connect (distributed) |
|---|---|---|---|
| Throughput (msg/sec) | 100K | 400K | 300K |
| Latency (p99) | 10 ms | 20 ms | 50 ms |
| State Store Size | 1 GB | 1 GB per instance | N/A |
| Memory (JVM Heap) | 4 GB | 4 GB per instance | 8 GB per worker |
| CPU Usage | 40% | 30% per instance | 50% per worker |
| Changelog Topic Throughput | 50K msg/sec | 200K msg/sec | N/A |
| Rebalance Time | 30 s | 60 s | 120 s |
| Startup Time | 10 s | 15 s | 30 s |

| Window Type | Throughput | Latency | State Size | Use Case |
|---|---|---|---|---|
| Tumbling | High | Low | Small | Fixed-interval aggregation |
| Hopping | Medium | Medium | Large | Overlapping analysis |
| Session | Medium | Medium | Variable | Activity tracking |

Best Practices

  1. Application ID and Instance Management: Use a unique application.id per stream processing application; it names the consumer group and prefixes internal topics. Give each instance a distinct client.id so its metrics and logs are identifiable. Scale by starting more instances with the same application.id.

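  2. State Store Optimization: Configure cache.max.bytes.buffering (statestore.cache.max.bytes in 3.4+) to balance memory usage against write amplification. Kafka Streams' default RocksDB configuration already enables bloom filters. To tune block cache size, write buffer size, and similar options, implement a RocksDBConfigSetter and register it with rocksdb.config.setter.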

  3. Windowing Configuration: Choose window type based on use case: tumbling for fixed intervals, hopping for overlapping analysis, session for activity tracking. Configure grace periods to handle late events. Use TimeWindows.ofSizeWithNoGrace() when you want to reject late events.

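  4. Exactly-Once Semantics: Enable processing.guarantee=exactly_once_v2 for critical applications. Expect somewhat higher latency and lower throughput than at_least_once, because output is committed in transactions. EOS covers reads, state updates, and writes within Kafka only. Make side effects on external systems, such as calls made from inside a processor, idempotent.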

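  5. Connect Task Scaling: Set tasks.max based on how the connector can divide its work and on external system capacity. For sink connectors, tasks beyond the total number of input partitions sit idle. For source connectors, parallelism depends on the source; Debezium's MySQL connector, for example, always runs a single task. Monitor connector health via the REST API. Use dead letter queues (sink connectors) for error handling instead of failing tasks.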

  6. Converter Selection: Use Avro with Schema Registry for schema evolution and compatibility. JSON converters are simpler but lack schema enforcement. Protobuf is efficient but requires additional tooling.

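  7. SMT Ordering: SMTs run in the order listed in transforms, and each sees the output of the previous one. For example, a RegexRouter must match the whole topic name as left by any earlier routing SMT. Limit the number of SMTs to avoid per-record overhead. Use custom SMTs only when built-in transforms and predicates are insufficient.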

  8. Monitoring: Track commit-latency-avg, poll-latency-avg, process-latency-avg for Streams. Track connector-failed-task-count, connector-total-task-count for Connect. Use Confluent Control Center or JMX metrics.

  9. Error Handling: Use errors.tolerance=all with dead letter queues for non-critical failures. Implement circuit breakers for external system dependencies. Monitor DLQ size to detect systemic issues.

  10. Testing: Use TopologyTestDriver for unit testing Kafka Streams topologies. Use Testcontainers for integration testing with real Kafka and external systems. Test with realistic data volumes and edge cases.

Key Takeaways:

  • Kafka Streams runs embedded in the application, so no separate cluster is needed; scale by adding instances with the same application.id (the same consumer group)
  • End-to-end latency = produce + poll + process + commit; optimize each component for real-time requirements
  • Tumbling windows partition time into non-overlapping buckets; hopping windows allow overlap; session windows are activity-based
  • KTable provides latest-value-per-key semantics for joins and materialized views
  • Exactly-once via processing.guarantee=exactly_once_v2 handles idempotent production + transactional offsets automatically
  • Kafka Connect handles offset management and parallelism; set tasks.max based on how the source divides its work, or on input partitions and target capacity for sinks

See also: Kafka Architecture (kafka/01) | Producer & Consumer (kafka/02) | Exactly-Once Semantics (kafka/04) | Schema Registry (kafka/05)
