Apache Kafka: The Distributed Event Streaming Platform
Apache Kafka is a distributed event streaming platform capable of handling trillions of events per day. Originally developed at LinkedIn and open-sourced in 2011, Kafka has evolved from a message queue into a comprehensive streaming platform that serves as the central nervous system for modern data architectures.
Why Kafka Dominates Event Streaming
Core Abstraction:
- Distributed commit log: append-only, partitioned, replicated
- Durable, ordered, replayable event storage
Architectural Advantages:
- Disk-sequential I/O: high write throughput
- Zero-copy transfer: efficient consumer reads
- Partition-level parallelism: horizontal scalability
Kafka vs Traditional Message Brokers:
| Feature | Kafka | RabbitMQ | ActiveMQ |
|---|---|---|---|
| Message Retention | Configurable (days-weeks) | Until consumed | Until consumed |
| Replay | Yes (from any offset) | No | Limited |
| Throughput | Millions msg/s | Thousands msg/s | Thousands msg/s |
| Ordering | Per-partition | Per-queue | Per-queue |
| Consumer Model | Pull (consumer polls broker) | Push (broker pushes) | Push |
| Backpressure | Consumer controls pace | Broker-managed | Broker-managed |
| Partitioning | Native | Via plugins | Limited |
| Schema Evolution | Schema Registry | No native support | No native support |
Key Insight: Unlike traditional message brokers, Kafka retains messages for a configurable retention period whether or not they have been consumed, so consumers can replay events from any offset that is still retained. This enables event sourcing, CQRS, and exactly-once processing patterns.
Architecture Diagram
A distributed commit log is an append-only, partitioned, replicated data structure where each partition maintains a strictly ordered sequence of messages (events). Each message is assigned a monotonically increasing offset within its partition. The log is distributed across multiple brokers, with configurable replication factors for fault tolerance. Consumers maintain their own read position (offset) independent of the log's write position, enabling independent consumption rates and replay.
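The log abstraction described above can be illustrated with a small in-memory model. This is only a sketch of a single partition, assuming no replication or persistence; the `PartitionLog` class and the reader names are hypothetical and are not part of any Kafka client.

```python
class PartitionLog:
    """In-memory stand-in for one partition: an append-only list of records."""

    def __init__(self):
        self._records = []

    def append(self, value):
        """Append a record and return its offset (its index in the log)."""
        self._records.append(value)
        return len(self._records) - 1

    def read(self, offset, max_records=10):
        """Return up to max_records records starting at the given offset."""
        return self._records[offset:offset + max_records]

    @property
    def end_offset(self):
        """Offset that the next appended record will receive."""
        return len(self._records)


log = PartitionLog()
offsets = [log.append(f"event-{i}") for i in range(5)]

# Each reader tracks its own position; reading never mutates the log.
positions = {"fast_reader": 0, "slow_reader": 0}
positions["fast_reader"] += len(log.read(positions["fast_reader"], max_records=5))
positions["slow_reader"] += len(log.read(positions["slow_reader"], max_records=2))

# Replay: a reader rewinds simply by reading from an earlier offset again.
replayed = log.read(0, max_records=2)
```

Because a read only needs an offset, a slow reader and a fast reader never interfere with each other, which is the property that makes independent consumption rates and replay possible.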
A consumer group is a set of consumers that jointly consume messages from one or more topics. Kafka assigns each partition to exactly one consumer within a group, so every message is delivered to only one member of that group. This enables parallel consumption: if a topic has P partitions and a consumer group has C consumers, each consumer processes approximately P/C partitions. When C > P, the extra C - P consumers sit idle. When C < P, some consumers process multiple partitions.
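The P/C relationship can be sketched with a simple round-robin assignment. This is an illustration only: the `assign_partitions` helper is hypothetical, and Kafka's real assignors (range, round-robin, sticky, cooperative sticky) apply their own rules.

```python
def assign_partitions(num_partitions, consumers):
    """Spread partition ids over consumers round-robin.

    Returns a dict mapping each consumer to the list of partitions it owns.
    """
    assignment = {consumer: [] for consumer in consumers}
    for partition in range(num_partitions):
        owner = consumers[partition % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# C < P: every consumer owns P/C = 2 partitions.
balanced = assign_partitions(6, ["c1", "c2", "c3"])

# C > P: one consumer receives nothing and sits idle.
oversized = assign_partitions(2, ["c1", "c2", "c3"])
idle = [consumer for consumer, parts in oversized.items() if not parts]
```

Adding consumers beyond the partition count therefore adds standby capacity but no extra throughput.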
A partitioning strategy determines which partition a message is assigned to. Strategies include: (1) Round-robin: distributes messages evenly across partitions, (2) Key-based: hashes the message key to assign partitions, ensuring all messages with the same key go to the same partition, (3) Custom: user-defined partitioner. Key-based partitioning guarantees per-key ordering but can cause data skew if key distribution is non-uniform.
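A minimal sketch of key-based partitioning and the skew it can produce follows. It assumes CRC32 as a stand-in hash; Kafka's default partitioner uses a murmur2 hash, so the partition numbers here will not match a real cluster. The `partition_for_key` helper and the key names are hypothetical.

```python
import zlib
from collections import Counter


def partition_for_key(key: str, num_partitions: int) -> int:
    """Map a key to a partition with a deterministic hash (CRC32 stand-in)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


NUM_PARTITIONS = 6

# The same key always maps to the same partition, which gives per-key ordering.
first = partition_for_key("user-42", NUM_PARTITIONS)
second = partition_for_key("user-42", NUM_PARTITIONS)

# Skew: one hot key produces 90% of the traffic, so one partition carries it all.
keys = ["hot-user"] * 90 + [f"user-{i}" for i in range(10)]
load = Counter(partition_for_key(key, NUM_PARTITIONS) for key in keys)
hot_partition = partition_for_key("hot-user", NUM_PARTITIONS)
```

Inspecting `load` shows that the partition holding `hot-user` receives at least 90 of the 100 messages, no matter how many partitions the topic has.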
Producer Throughput
Producer throughput in messages per second is bounded by the slowest resource: Throughput = min(network_bandwidth, broker_write_capacity / replication_factor, disk_write_rate / replication_factor) / avg_message_size, with all rates in bytes per second. For a cluster with B brokers, each with disk write rate D and network bandwidth N (both in bytes per second), replication factor R, and average message size S in bytes: Max_Throughput = min(B * D / (R * S), B * N / S).
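A worked example of the cluster-level bound, with both limits expressed in messages per second (the disk term is divided by message size as well as by the replication factor). The function name and every number below are illustrative assumptions, not measurements.

```python
def max_throughput_msgs_per_sec(
    brokers,
    disk_bytes_per_sec,
    network_bytes_per_sec,
    replication_factor,
    avg_message_bytes,
):
    """Upper bound on cluster write throughput in messages per second."""
    # Every message is written replication_factor times across the cluster.
    disk_bound = brokers * disk_bytes_per_sec / (replication_factor * avg_message_bytes)
    network_bound = brokers * network_bytes_per_sec / avg_message_bytes
    return min(disk_bound, network_bound)


estimate = max_throughput_msgs_per_sec(
    brokers=6,
    disk_bytes_per_sec=200_000_000,       # assumed 200 MB/s sequential write
    network_bytes_per_sec=1_250_000_000,  # assumed 10 Gbit/s link
    replication_factor=3,
    avg_message_bytes=1_000,
)
```

With these inputs the disk bound is 400,000 messages per second and the network bound is 7,500,000, so the estimate is disk-limited. Treat the result as a ceiling; real clusters fall short of it because of consumer reads, compaction, and uneven partition placement.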
Consumer Lag
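Consumer lag for partition p is defined as: Lag(p) = Latest_offset(p) - Consumer_offset(p). Total consumer lag for a topic is: Total_Lag = Σ Lag(p) for all partitions p. Lag is measured in messages. When a consumer falls so far behind that its oldest unread messages age past the retention period, those messages are deleted before consumption: this is a data loss scenario. Required: Total_Lag / avg_message_rate < Retention_period, or equivalently Total_Lag < Retention_period * avg_message_rate. With size-based retention (log.retention.bytes, applied per partition), the corresponding bound is Lag(p) * avg_message_size < Retention_bytes.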
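The lag definitions can be sketched as follows. The offsets are hard-coded sample values rather than numbers read from a cluster, and converting lag to "seconds behind" by dividing by the produce rate, then comparing against a time-based retention period with an 80% alert threshold, is an assumption made for this example.

```python
def partition_lag(latest_offsets, committed_offsets):
    """Lag(p) = Latest_offset(p) - Consumer_offset(p) for every partition p."""
    return {p: latest_offsets[p] - committed_offsets[p] for p in latest_offsets}


def seconds_behind(lag_messages, produce_rate_per_sec):
    """Approximate how far behind a consumer is, in seconds of produced data."""
    return lag_messages / produce_rate_per_sec


latest = {0: 1_500, 1: 2_000, 2: 900}      # sample log-end offsets
committed = {0: 1_200, 1: 2_000, 2: 400}   # sample committed offsets

lags = partition_lag(latest, committed)
total_lag = sum(lags.values())

retention_seconds = 7 * 24 * 3600           # assumed 7-day retention
behind = seconds_behind(total_lag, produce_rate_per_sec=40)
at_risk = behind > 0.8 * retention_seconds  # alert before the limit is reached
```

Here the consumer is 800 messages, or about 20 seconds, behind, far from the retention limit. A production check would usually evaluate each partition separately, since a single stalled partition can lose data while the topic-wide total still looks healthy.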
Kafka guarantees message ordering within a single partition only. There is no ordering guarantee across partitions. For a topic with P partitions, the global ordering of messages across partitions is non-deterministic. To achieve per-entity ordering (e.g., per-user, per-order), use the entity ID as the message key, ensuring all messages for that entity are routed to the same partition.
- At-Most-Once: Message is delivered at most once. Messages may be lost if the producer fails before receiving an ack, or if the consumer commits its offset and then crashes before processing. Formal: deliveries(m) <= 1.
- At-Least-Once: Message is delivered at least once. Messages may be duplicated if the producer retries, or if the consumer is rebalanced or restarted after processing but before committing. Formal: deliveries(m) >= 1.
- Exactly-Once: The effect of each message is applied exactly once. Requires idempotent producers and transactions (Kafka Transactions API), with downstream consumers reading at isolation.level=read_committed. Formal: deliveries(m) = 1. Achieved via idempotent writes and offset commits made atomically with the output writes.
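The difference between at-least-once delivery and an exactly-once effect can be simulated without a broker. The sketch below assumes a consumer that crashes after processing a message but before committing its offset, and a handler that deduplicates by event ID. It illustrates consumer-side idempotence only; it is not the Kafka Transactions API, and both function names are hypothetical.

```python
def deliver_at_least_once(events, crash_after_index):
    """Return the delivery sequence seen by a consumer that crashes once.

    The consumer commits after each message, except that it crashes right
    after processing events[crash_after_index] and before committing it.
    """
    delivered = []
    committed = 0
    for index in range(committed, len(events)):
        delivered.append(events[index])
        if index == crash_after_index:
            break  # simulated crash: offset for this message is never committed
        committed = index + 1
    # After restart, consumption resumes from the last committed offset.
    for index in range(committed, len(events)):
        delivered.append(events[index])
    return delivered


def apply_idempotently(deliveries):
    """Apply each event once by skipping event IDs that were already seen."""
    seen = set()
    applied = []
    for event_id in deliveries:
        if event_id in seen:
            continue
        seen.add(event_id)
        applied.append(event_id)
    return applied


deliveries = deliver_at_least_once(["e1", "e2", "e3"], crash_after_index=1)
applied = apply_idempotently(deliveries)
```

The delivery sequence contains `e2` twice, while the applied sequence contains each event once. In a real system the `seen` set would need to live in durable storage that is updated atomically with the side effect.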
Key Concepts
| Concept | Description | Configuration |
|---|---|---|
| Topic | Logical category of messages | kafka-topics --create --topic orders |
| Partition | Parallel unit within a topic | --partitions 3 |
| Replication Factor | Number of copies per partition | --replication-factor 3 |
| Offset | Sequential ID of a message within a partition | Auto-managed per consumer group |
| Producer | Application that writes messages to Kafka | bootstrap.servers, acks |
| Consumer | Application that reads messages from Kafka | group.id, auto.offset.reset |
| Consumer Group | Set of consumers with shared offset tracking | group.id |
| Broker | Kafka server that stores and serves partitions | broker.id, log.dirs |
| ZooKeeper/KRaft | Cluster metadata management | Legacy (ZK) vs KRaft (new) |
| ISR (In-Sync Replicas) | Replicas fully caught up with leader | min.insync.replicas |
| acks | Producer acknowledgment level | 0, 1, all |
| Retention | How long messages are kept | log.retention.hours, log.retention.bytes |
| Schema Registry | Schema evolution management | Confluent Schema Registry |
| Consumer Offset | Last committed position per partition | Stored in __consumer_offsets topic |
| Rebalance | Redistribution of partitions to consumers | Triggered by consumer join/leave |
| Log Compaction | Retain latest value per key | cleanup.policy=compact |
- Determine partition count: Estimate peak throughput. Partition count >= max(peak_throughput / per_partition_throughput, max_consumer_parallelism).
- Choose replication factor: RF=3 for production (paired with min.insync.replicas=2 and acks=all, the topic keeps accepting writes after 1 broker failure). RF=1 for dev/test.
- Select key strategy: If ordering is required per entity, use entity_id as key. If no ordering needed, use null key for round-robin.
- Configure retention: Set log.retention.hours based on replay requirements. Use log.retention.bytes for size-based retention.
- Enable log compaction for topics storing entity state (e.g., user profiles, configuration). Compaction retains the latest value per key.
- Set min.insync.replicas=2 with acks=all to ensure durability. A write succeeds only when at least 2 replicas acknowledge.
- Register schemas with Confluent Schema Registry. Use Avro or Protobuf for schema evolution support.
- Monitor consumer lag to prevent data loss. Alert when lag (in messages) approaches retention_period * avg_message_rate.
- Test failover by killing the leader broker and verifying automatic leader election and consumer reconnection.
- Document topic ownership: each topic should have a designated owner team responsible for schema changes and capacity planning.
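The partition-count rule from the first step of the checklist above can be turned into a small calculation. The helper name and the throughput figures are illustrative assumptions; per-partition throughput in particular has to be measured on the target hardware.

```python
import math


def required_partitions(
    peak_msgs_per_sec,
    per_partition_msgs_per_sec,
    max_consumer_parallelism,
):
    """Partition count >= max(peak / per-partition throughput, consumer parallelism)."""
    for_throughput = math.ceil(peak_msgs_per_sec / per_partition_msgs_per_sec)
    return max(for_throughput, max_consumer_parallelism)


# Throughput-bound topic: 120K msg/s at an assumed 10K msg/s per partition.
throughput_bound = required_partitions(120_000, 10_000, max_consumer_parallelism=8)

# Parallelism-bound topic: modest traffic, but 8 consumers must run in parallel.
parallelism_bound = required_partitions(20_000, 10_000, max_consumer_parallelism=8)
```

The first topic needs 12 partitions and the second needs 8. Since partitions cannot be removed later, it is common to add some headroom on top of the computed minimum.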
Production Code
Kafka Producer with Idempotent Writes
from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import logging
import uuid
from typing import Dict, Optional
from dataclasses import dataclass, asdict

logger = logging.getLogger(__name__)


@dataclass
class KafkaEvent:
    """Base event class with idempotency support."""
    event_id: str
    event_type: str
    payload: Dict
    timestamp: str

    def to_json(self) -> bytes:
        return json.dumps(asdict(self)).encode("utf-8")


class IdempotentProducer:
    """Kafka producer with idempotent writes, retries, and dead letter handling."""

    def __init__(
        self,
        bootstrap_servers: list,
        topic: str,
        max_retries: int = 3,
        retry_backoff_ms: int = 100,
    ):
        self.topic = topic
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            acks="all",  # Wait for all ISR replicas
            # Broker de-duplicates producer retries; needs a kafka-python
            # release with idempotent producer support
            enable_idempotence=True,
            max_in_flight_requests_per_connection=5,  # Safe with idempotence
            retries=max_retries,
            retry_backoff_ms=retry_backoff_ms,  # Config key is retry_backoff_ms
            compression_type="snappy",  # Reduce network I/O (needs python-snappy)
            linger_ms=10,  # Batch for throughput
            batch_size=32768,  # 32KB batches
            buffer_memory=67108864,  # 64MB buffer
        )
        self.dlq_topic = f"{topic}.dlq"

    def send(
        self,
        event: KafkaEvent,
        key: Optional[str] = None,
        callback=None,
    ) -> None:
        """Send an event to Kafka with idempotency and error handling."""
        try:
            future = self.producer.send(
                topic=self.topic,
                key=key.encode("utf-8") if key else None,
                value=event.to_json(),
                headers=[("event_type", event.event_type.encode())],
            )
            if callback:
                future.add_callback(callback)
            # Pass the event by keyword: extra positional arguments are bound
            # before the exception, which would swap exc and event.
            future.add_errback(self._handle_error, event=event)
        except Exception as e:
            logger.error(f"Failed to produce event {event.event_id}: {e}")
            self._send_to_dlq(event, str(e))

    def _handle_error(self, exc: KafkaError, event: KafkaEvent) -> None:
        """Handle producer errors and route to dead letter queue."""
        logger.error(f"Error producing event {event.event_id}: {exc}")
        self._send_to_dlq(event, str(exc))

    def _send_to_dlq(self, event: KafkaEvent, error: str) -> None:
        """Route failed events to dead letter queue."""
        dlq_event = KafkaEvent(
            event_id=str(uuid.uuid4()),
            event_type="dead_letter",
            payload={
                "original_event": asdict(event),
                "error": error,
            },
            timestamp=event.timestamp,
        )
        self.producer.send(
            topic=self.dlq_topic,
            value=dlq_event.to_json(),
        )
        logger.warning(f"Event {event.event_id} routed to DLQ")

    def flush(self) -> None:
        """Ensure all pending messages are sent."""
        self.producer.flush(timeout=30)

    def close(self) -> None:
        """Gracefully shut down the producer."""
        self.producer.flush(timeout=30)
        self.producer.close()


# Usage
producer = IdempotentProducer(
    bootstrap_servers=["kafka-broker-1:9092", "kafka-broker-2:9092"],
    topic="orders.events",
)
event = KafkaEvent(
    event_id=str(uuid.uuid4()),
    event_type="order_created",
    payload={"order_id": "ORD-12345", "amount": 99.99, "currency": "USD"},
    timestamp="2024-01-15T10:30:00Z",
)
producer.send(event, key="ORD-12345", callback=lambda metadata: logger.info(
    f"Sent to {metadata.topic}[{metadata.partition}]@{metadata.offset}"
))
producer.flush()
producer.close()
Kafka Consumer with Offset Management
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json
import logging
import signal
from typing import Callable, Dict

logger = logging.getLogger(__name__)


class ReliableConsumer:
    """Kafka consumer with graceful shutdown, offset management, and error handling."""

    def __init__(
        self,
        bootstrap_servers: list,
        topic: str,
        group_id: str,
        handler: Callable[[Dict], None],
        auto_offset_reset: str = "earliest",
        max_poll_records: int = 500,
    ):
        self.handler = handler
        self.running = True
        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            auto_offset_reset=auto_offset_reset,
            enable_auto_commit=False,  # Manual commit for reliability
            max_poll_records=max_poll_records,
            session_timeout_ms=30000,
            heartbeat_interval_ms=10000,
            max_poll_interval_ms=300000,
            value_deserializer=lambda m: json.loads(m.decode("utf-8")),
            key_deserializer=lambda k: k.decode("utf-8") if k else None,
        )
        # Register signal handlers for graceful shutdown
        signal.signal(signal.SIGINT, self._shutdown)
        signal.signal(signal.SIGTERM, self._shutdown)

    def _shutdown(self, signum, frame):
        """Handle shutdown signals gracefully."""
        logger.info("Shutdown signal received, closing consumer...")
        self.running = False

    def consume(self) -> None:
        """Main consumption loop with manual offset commits."""
        logger.info("Starting consumption loop...")
        while self.running:
            try:
                records = self.consumer.poll(timeout_ms=1000)
                for topic_partition, messages in records.items():
                    for message in messages:
                        try:
                            self.handler(message.value)
                        except Exception as e:
                            logger.error(
                                f"Error processing message at "
                                f"{message.topic}[{message.partition}]@{message.offset}: {e}"
                            )
                            # Rewind to the failed message so the commit below does
                            # not skip it; it is fetched again on the next poll.
                            # A message that always fails is retried indefinitely,
                            # so route non-retryable failures to a DLQ instead.
                            self.consumer.seek(topic_partition, message.offset)
                            break
                # Commit current positions (rewound for any partition that failed)
                if records:
                    self.consumer.commit()
                    logger.debug(f"Committed offsets for {len(records)} partitions")
            except KafkaError as e:
                logger.error(f"Kafka error during polling: {e}")
                continue
        self.consumer.close()
        logger.info("Consumer closed gracefully")

    def resume(self) -> None:
        """Resume consumption after a pause."""
        self.running = True


# Usage
def process_order(event: dict) -> None:
    """Process a single order event."""
    logger.info(f"Processing order: {event.get('order_id')}")
    # Business logic here
    # If processing fails, raise an exception so the offset is not advanced


consumer = ReliableConsumer(
    bootstrap_servers=["kafka-broker-1:9092", "kafka-broker-2:9092"],
    topic="orders.events",
    group_id="order-processing-group",
    handler=process_order,
    auto_offset_reset="earliest",
    max_poll_records=500,
)
consumer.consume()
Consumer Group Rebalancing: When a consumer joins or leaves a group, Kafka triggers a rebalance that redistributes partitions. During an eager rebalance, all consumers in the group stop consuming until the new assignment is complete; cooperative (incremental) rebalancing pauses only the partitions that change owner. Minimize rebalance frequency by setting session.timeout.ms appropriately and using static group membership (group.instance.id) to avoid rebalances on temporary disconnections.
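Partition Count Rules: (1) Never plan on decreasing the partition count: Kafka does not support it, and consumers depend on the existing partition layout. (2) Increasing partitions raises the available parallelism but changes the key-to-partition mapping, so new messages for an existing key may land in a different partition than its earlier messages, breaking per-key ordering across the change. (3) For compacted topics, compaction runs per partition, so after an increase an older value for a remapped key can remain in its original partition alongside the newer value in the new one.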
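The effect of rule (2) can be estimated with a short sketch that counts how many keys change partition when a topic grows from 6 to 8 partitions. It assumes CRC32 as a stand-in for Kafka's murmur2-based default partitioner, so the exact keys that move will differ on a real cluster; the helper and key names are hypothetical.

```python
import zlib


def partition_for_key(key: str, num_partitions: int) -> int:
    """Map a key to a partition with a deterministic hash (CRC32 stand-in)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


OLD_COUNT, NEW_COUNT = 6, 8
keys = [f"order-{i}" for i in range(1000)]

# A key "moves" when its partition under the new count differs from the old one.
moved = [
    key for key in keys
    if partition_for_key(key, OLD_COUNT) != partition_for_key(key, NEW_COUNT)
]
moved_fraction = len(moved) / len(keys)
```

For a uniform hash, a key keeps its partition only when its hash gives the same remainder for both counts, which for 6 and 8 happens for about a quarter of hash values. Most keys therefore move, which is why the partition count for keyed topics is best fixed up front.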
- Kafka's distributed commit log provides durable, ordered, replayable event storage with disk-sequential I/O.
- Partitioning enables horizontal scalability; the partition count determines maximum parallelism.
- Consumer groups distribute partitions across consumers, with each partition consumed by exactly one consumer per group.
- Producer acks=all with min.insync.replicas=2 ensures durability against broker failures.
- Exactly-once semantics require idempotent producers (enable_idempotence=True) and transactions, with consumers reading committed data only.
- Consumer lag monitoring prevents data loss; alert when lag approaches retention_period * avg_message_rate.
- Key-based partitioning guarantees per-key ordering but may cause data skew. Use null key for uniform distribution.
Best Practices
- Set acks=all with min.insync.replicas=2 in production to ensure writes survive broker failures.
- Enable idempotent producers (enable_idempotence=True) to prevent duplicate messages on retries.
- Use manual offset commits (enable_auto_commit=False) to ensure messages are only acknowledged after successful processing.
- Monitor consumer lag continuously. Alert when the time a consumer is behind approaches the message retention period.
- Choose partition count carefully: you can increase but never decrease partitions without data migration.
- Use a schema registry (Confluent or Apicurio) for schema evolution. Prefer Avro or Protobuf over JSON for production topics.
- Implement dead letter queues for messages that fail processing. Never silently drop failed messages.
- Set max.poll.records appropriately to prevent poll loops from exceeding max.poll.interval.ms.
- Use static group membership (group.instance.id) to reduce rebalance frequency during temporary disconnections.
- Document topic ownership: every topic should have a designated team responsible for schema changes and capacity planning.
Kafka Cluster Sizing Reference
| Metric | Small Cluster | Medium Cluster | Large Cluster |
|---|---|---|---|
| Brokers | 3 | 6-12 | 12-50+ |
| Topics | 10-50 | 50-500 | 500-5000+ |
| Partitions/Topic | 3-6 | 6-24 | 24-100+ |
| Throughput | 10K msg/s | 100K msg/s | 1M+ msg/s |
| Storage | 100GB-1TB | 1TB-50TB | 50TB-1PB+ |
| Retention | 7 days | 7-30 days | Configurable |
| Use Case | Dev/Small app | Production workloads | Enterprise platform |
See Also
- 020 - Kafka Streams: DSL, Windowed Aggregation, and Exactly-Once - Stream processing with Kafka
- 022 - Spark Structured Streaming: Triggers, Watermarks, and State Management - Alternative stream processing engine
- 024 - Data Ingestion Patterns: Batch, Streaming, CDC, and APIs - Kafka as ingestion backbone
- 023 - Batch vs Streaming: Lambda, Kappa, and Architecture Trade-offs - Streaming architecture patterns
- 030 - Capstone Project: Real-Time Streaming Pipeline - End-to-end Kafka project