
Apache Kafka: Topics, Producers, and Consumers



Apache Kafka: The Distributed Event Streaming Platform

Apache Kafka is a distributed event streaming platform capable of handling trillions of events per day. Originally developed at LinkedIn and open-sourced in 2011, Kafka has evolved from a messaging queue into a comprehensive streaming platform that serves as the central nervous system for modern data architectures.

[Figure: Kafka Cluster Architecture. Brokers (Kafka Cluster): Broker 1 (Leader), Broker 2 (Follower), Broker 3 (Follower); ZooKeeper / KRaft (Metadata Management); Partitions 0-2; Producers 1-3; Consumer Groups A and B.]

Why Kafka Dominates Event Streaming


Core Abstraction:

  • Distributed commit log: append-only, partitioned, replicated
  • Durable, ordered, replayable event storage

Architectural Advantages:

  1. Disk-sequential I/O: high write throughput
  2. Zero-copy transfer: efficient consumer reads
  3. Partition-level parallelism: horizontal scalability

Kafka vs Traditional Message Brokers:

| Feature | Kafka | RabbitMQ (classic queues) | ActiveMQ |
|---|---|---|---|
| Message Retention | Configurable (days to weeks) | Until consumed | Until consumed |
| Replay | Yes (from any retained offset) | No | Limited |
| Throughput | Millions msg/s (cluster-wide) | Thousands msg/s | Thousands msg/s |
| Ordering | Per-partition | Per-queue | Per-queue |
| Consumer Model | Pull (consumer polls broker) | Push (broker pushes) | Push |
| Backpressure | Consumer controls pace | Broker-managed | Broker-managed |
| Partitioning | Native | Via plugins | Limited |
| Schema Evolution | Schema Registry (external component) | No native support | No native support |

Key Insight: Unlike traditional message brokers, Kafka retains messages for a configurable retention period whether or not they have been consumed, so consumers can replay events from any offset that is still retained. This enables event sourcing and CQRS patterns and, combined with idempotent and transactional producers, exactly-once processing.


A distributed commit log is an append-only, partitioned, replicated data structure where each partition maintains a strictly ordered sequence of messages (events). Each message is assigned a monotonically increasing offset within its partition. The log is distributed across multiple brokers, with configurable replication factors for fault tolerance. Consumers maintain their own read position (offset) independent of the log's write position, enabling independent consumption rates and replay.
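To make the offset model concrete, here is a toy in-memory sketch (the class PartitionedLog is hypothetical; it has no replication, persistence, or networking). It shows that each partition numbers its records independently, that reading does not remove data, and that two readers keep independent positions on the same log.

```python
from typing import Dict, List, Tuple


class PartitionedLog:
    """Toy append-only log split into partitions (illustration only: no replication or disk)."""

    def __init__(self, num_partitions: int) -> None:
        self.partitions: List[List[str]] = [[] for _ in range(num_partitions)]

    def append(self, partition: int, value: str) -> int:
        """Append a record to one partition and return its offset."""
        log = self.partitions[partition]
        log.append(value)
        return len(log) - 1  # offsets count up from 0 independently in each partition

    def end_offset(self, partition: int) -> int:
        """Offset that the next appended record will receive."""
        return len(self.partitions[partition])

    def read(self, partition: int, offset: int, max_records: int = 10) -> List[Tuple[int, str]]:
        """Return (offset, value) pairs starting at `offset`; the log itself is not modified."""
        log = self.partitions[partition]
        end = min(offset + max_records, len(log))
        return [(o, log[o]) for o in range(offset, end)]


log = PartitionedLog(num_partitions=3)
for status in ["created", "paid", "shipped"]:
    log.append(0, status)

# Each reader tracks its own position; the log does not track readers at all.
positions: Dict[str, int] = {"fast-reader": 0, "slow-reader": 0}
batch = log.read(0, positions["fast-reader"])
positions["fast-reader"] = batch[-1][0] + 1  # next offset this reader will read

replay = log.read(0, 0)  # replay from offset 0 still works after the fast reader moved on
print(positions, replay)
# {'fast-reader': 3, 'slow-reader': 0} [(0, 'created'), (1, 'paid'), (2, 'shipped')]
```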

A consumer group is a set of consumers that jointly consume messages from one or more topics. Kafka assigns each partition to exactly one consumer in the group at a time, so each message in that partition is processed by a single group member (redelivery is still possible after failures or rebalances). This enables parallel consumption: if a topic has P partitions and a consumer group has C consumers, each consumer processes approximately P/C partitions. When C > P, C - P consumers are idle. When C < P, some consumers process multiple partitions.
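The P/C arithmetic can be sketched as a simplified range-style assignment for a single topic: contiguous ranges, with the first P mod C consumers (in sorted order) taking one extra partition. This follows the idea behind Kafka's range assignor, but range_assign is a hypothetical helper, not a client API.

```python
from typing import Dict, List


def range_assign(num_partitions: int, consumers: List[str]) -> Dict[str, List[int]]:
    """Assign partitions 0..P-1 of one topic to consumers in contiguous ranges."""
    members = sorted(consumers)  # deterministic order so the result is reproducible
    per_consumer, extra = divmod(num_partitions, len(members))
    assignment: Dict[str, List[int]] = {}
    start = 0
    for i, member in enumerate(members):
        count = per_consumer + (1 if i < extra else 0)  # first `extra` members take one more
        assignment[member] = list(range(start, start + count))
        start += count
    return assignment


print(range_assign(6, ["c1", "c2", "c3"]))        # {'c1': [0, 1], 'c2': [2, 3], 'c3': [4, 5]}
print(range_assign(3, ["c1", "c2", "c3", "c4"]))  # {'c1': [0], 'c2': [1], 'c3': [2], 'c4': []}
```

In the second call C > P, so one consumer (c4) receives no partitions and sits idle.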

A partitioning strategy determines which partition a message is assigned to. Strategies include: (1) Round-robin: spreads messages evenly across partitions when the key is null (depending on the client, this may be strict round-robin, random choice, or sticky per-batch assignment), (2) Key-based: hashes the message key to choose a partition, ensuring all messages with the same key go to the same partition, (3) Custom: a user-defined partitioner. Key-based partitioning guarantees per-key ordering but can cause data skew if key distribution is non-uniform.
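A minimal sketch of key-based partitioning, assuming a stable hash taken modulo the partition count. Kafka's default partitioner hashes keys with murmur2, so the hypothetical partition_for_key below (which uses MD5 purely as a stand-in) will not reproduce real partition numbers; it only demonstrates the same-key-same-partition property and how one hot key skews load.

```python
import hashlib
from collections import Counter


def partition_for_key(key: str, num_partitions: int) -> int:
    """Stable key -> partition mapping (MD5 stand-in for Kafka's murmur2)."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions


# Every message for the same key lands in the same partition, preserving per-key order.
assert partition_for_key("order-7", 4) == partition_for_key("order-7", 4)

# Skew: 800 events for one hot key plus 200 events spread over other keys.
keys = ["user-42"] * 800 + [f"user-{i}" for i in range(1000, 1200)]
load = Counter(partition_for_key(k, 4) for k in keys)
hot_partition = partition_for_key("user-42", 4)
print(load[hot_partition] >= 800)  # True: one partition carries most of the traffic
```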

Producer Throughput

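Producer throughput (messages/second) is bounded by disk and network capacity. Every produced byte is received and written once per replica, so for a cluster with $B$ brokers, per-broker disk write rate $D$ (bytes/s), per-broker network bandwidth $N$ (bytes/s), replication factor $R$, and average message size $\bar{s}$ (bytes):

$$\text{Throughput}_{\max} = \min\left(\frac{B \cdot D}{R \cdot \bar{s}},\ \frac{B \cdot N}{R \cdot \bar{s}}\right)$$

This is an upper bound that ignores compression, consumer read traffic, and uneven partition placement across brokers.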
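A worked example of the throughput bound, assuming each produced byte is received and written once per replica, so both disk and network capacity are divided by the replication factor. The cluster figures are hypothetical.

```python
def max_producer_throughput(
    brokers: int,
    disk_bytes_per_s: float,
    net_bytes_per_s: float,
    replication_factor: int,
    avg_message_bytes: float,
) -> float:
    """Upper bound on produced messages/s (ignores compression and consumer traffic)."""
    bytes_per_message_cluster_wide = replication_factor * avg_message_bytes
    disk_limit = brokers * disk_bytes_per_s / bytes_per_message_cluster_wide
    network_limit = brokers * net_bytes_per_s / bytes_per_message_cluster_wide
    return min(disk_limit, network_limit)


# Hypothetical: 3 brokers, 200 MB/s disk, 1 Gbit/s NIC (125 MB/s), RF=3, 1 KB messages
print(max_producer_throughput(3, 200e6, 125e6, 3, 1000))  # 125000.0 (network-bound)
```

Here the disk term allows 200,000 msg/s but the network term allows only 125,000 msg/s, so the network is the bottleneck.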

Consumer Lag

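Consumer lag for partition $p$ is the gap between the log end offset and the group's committed offset, and total lag sums over all partitions of the topic:

$$\text{Lag}(p) = \text{LogEndOffset}(p) - \text{CommittedOffset}(p), \qquad \text{Lag}_{\text{total}} = \sum_{p} \text{Lag}(p)$$

If the oldest unconsumed message in a partition ages past the retention limit, it is deleted before it is consumed, which is a data loss scenario. With time-based retention $T_{\text{ret}}$ (seconds) and a produce rate of $\lambda(p)$ messages/s into partition $p$, each partition must approximately satisfy:

$$\text{Lag}(p) < T_{\text{ret}} \cdot \lambda(p)$$

For size-based retention, where retention.bytes applies per partition, the analogous condition is $\text{Lag}(p) \cdot \bar{s} < \text{retention.bytes}$, with $\bar{s}$ the average message size in bytes.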
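A sketch of the lag check, assuming offsets arrive as plain dicts keyed by partition number and that a partition's backlog is at risk once it exceeds the messages produced in one retention window (Lag > T_ret * rate). In kafka-python, comparable values come from KafkaConsumer.end_offsets() and KafkaConsumer.committed(), keyed by TopicPartition; the helper names and sample rates below are hypothetical.

```python
from typing import Dict


def partition_lag(end_offsets: Dict[int, int], committed: Dict[int, int]) -> Dict[int, int]:
    """Lag(p) = log end offset - committed offset.

    Assumption: a partition with no committed offset is treated as starting at 0.
    """
    return {p: end - committed.get(p, 0) for p, end in end_offsets.items()}


def retention_risk(lag: Dict[int, int], produce_rate: Dict[int, float],
                   retention_s: float) -> Dict[int, bool]:
    """True where the backlog is older than the retention window (Lag > T_ret * rate)."""
    return {p: lag[p] > retention_s * produce_rate[p] for p in lag}


lag = partition_lag(end_offsets={0: 1_000, 1: 500_000}, committed={0: 900, 1: 100_000})
total_lag = sum(lag.values())
risk = retention_risk(lag, produce_rate={0: 1.0, 1: 4.0}, retention_s=86_400)  # 1-day retention
print(lag, total_lag, risk)  # {0: 100, 1: 400000} 400100 {0: False, 1: True}
```

Partition 1 is flagged because one day at 4 msg/s is 345,600 messages, fewer than its 400,000-message backlog.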

Kafka guarantees message ordering within a single partition only. There is no ordering guarantee across partitions. For a topic with P partitions, the global ordering of messages across partitions is non-deterministic. To achieve per-entity ordering (e.g., per-user, per-order), use the entity ID as the message key, ensuring all messages for that entity are routed to the same partition.

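  • At-Most-Once: Each message is delivered zero or one times. Messages can be lost, for example when a producer does not retry a failed send, or when a consumer commits its offset before processing and then crashes. Formally: deliveries $\in \{0, 1\}$.
  • At-Least-Once: Each message is delivered one or more times. Duplicates can occur when a producer retries a write that actually succeeded, or when a consumer processes a message but crashes (or loses the partition in a rebalance) before committing. Formally: deliveries $\ge 1$.
  • Exactly-Once: Each message's effect is applied exactly once. Within Kafka this requires idempotent, transactional producers (Kafka Transactions API) that commit consumer offsets atomically with their output, plus consumers reading with isolation.level=read_committed. Side effects outside Kafka still need idempotent handling. Formally: deliveries $= 1$.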
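The difference between at-most-once and at-least-once comes down to whether the consumer commits before or after processing. The toy simulation below (hypothetical run_with_crash; one consumer, one commit per message, a single crash and restart) reproduces the loss and duplication cases described above.

```python
from typing import List


def run_with_crash(messages: List[str], commit_first: bool, crash_at: int) -> List[str]:
    """Process messages, crash once at offset `crash_at`, restart from the committed offset.

    commit_first=True models at-most-once (commit, then process).
    commit_first=False models at-least-once (process, then commit).
    """
    processed: List[str] = []
    committed = 0  # next offset to read after a restart
    for offset, msg in enumerate(messages):
        if commit_first:
            committed = offset + 1
            if offset == crash_at:
                break  # crashed after committing but before processing: message lost
            processed.append(msg)
        else:
            processed.append(msg)
            if offset == crash_at:
                break  # crashed after processing but before committing: message redelivered
            committed = offset + 1
    processed.extend(messages[committed:])  # restart resumes at the committed offset
    return processed


print(run_with_crash(["a", "b", "c"], commit_first=True, crash_at=1))   # ['a', 'c']
print(run_with_crash(["a", "b", "c"], commit_first=False, crash_at=1))  # ['a', 'b', 'b', 'c']
```

Exactly-once closes this gap by making the offset commit and the processing output succeed or fail together, which Kafka transactions provide for read-process-write pipelines that write their results back to Kafka.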

Key Concepts

| Concept | Description | Configuration |
|---|---|---|
| Topic | Logical category of messages | kafka-topics --create --topic orders |
| Partition | Parallel unit within a topic | --partitions 3 |
| Replication Factor | Number of copies per partition | --replication-factor 3 |
| Offset | Sequential ID of a message within a partition | Committed per consumer group |
| Producer | Application that writes messages to Kafka | bootstrap.servers, acks |
| Consumer | Application that reads messages from Kafka | group.id, auto.offset.reset |
| Consumer Group | Set of consumers with shared offset tracking | group.id |
| Broker | Kafka server that stores and serves partitions | broker.id, log.dirs |
| ZooKeeper/KRaft | Cluster metadata management | ZooKeeper (legacy) vs KRaft (current) |
| ISR (In-Sync Replicas) | Replicas fully caught up with the leader | min.insync.replicas |
| acks | Producer acknowledgment level | 0, 1, all |
| Retention | How long messages are kept | log.retention.hours, log.retention.bytes (broker defaults); retention.ms, retention.bytes (per topic) |
| Schema Registry | Schema evolution management | Confluent Schema Registry |
| Consumer Offset | Last committed position per partition | Stored in __consumer_offsets topic |
| Rebalance | Redistribution of partitions to consumers | Triggered by consumer join/leave |
| Log Compaction | Retain latest value per key | cleanup.policy=compact |

  1. Determine partition count: Estimate peak throughput. Partition count >= max(peak_throughput / per_partition_throughput, max_consumer_parallelism).
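  2. Choose replication factor: RF=3 for production (with min.insync.replicas=2, acks=all writes keep succeeding through one broker failure). RF=1 for dev/test.
  3. Select key strategy: If ordering is required per entity, use entity_id as key. If no ordering is needed, use a null key and let the client's partitioner spread messages (round-robin, random, or sticky batching, depending on the client).
  4. Configure retention: Set log.retention.hours (broker default) or retention.ms (per topic) based on replay requirements. Use log.retention.bytes or retention.bytes for size-based retention; the size limit applies per partition.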
  5. Enable log compaction for topics storing entity state (e.g., user profiles, configuration). Compaction retains the latest value per key.
  6. Set min.insync.replicas=2 with acks=all to ensure durability. The leader acknowledges a write only after every in-sync replica has it, and rejects the write if fewer than 2 replicas are in sync.
  7. Register schemas with Confluent Schema Registry. Use Avro or Protobuf for schema evolution support.
  8. Monitor consumer lag to prevent data loss. Alert when a partition's lag approaches retention_period * produce_rate (the number of messages written during one retention window).
  9. Test failover by killing the leader broker and verifying automatic leader election and consumer reconnection.
  10. Document topic ownership: each topic should have a designated owner team responsible for schema changes and capacity planning.
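Steps 1 and 4 to 6 can be turned into a small planning helper. This is a sketch: the function names are hypothetical, while cleanup.policy, retention.ms, and min.insync.replicas are real topic-level configs. The resulting dict has the shape that kafka-python's NewTopic(..., topic_configs=...) accepts when creating topics through KafkaAdminClient.create_topics().

```python
import math
from typing import Dict


def partition_count(peak_msgs_per_s: float, per_partition_msgs_per_s: float,
                    max_consumer_parallelism: int) -> int:
    """Step 1: cover peak throughput and the largest consumer group you plan to run."""
    return max(math.ceil(peak_msgs_per_s / per_partition_msgs_per_s), max_consumer_parallelism)


def topic_configs(retention_hours: int, compacted: bool) -> Dict[str, str]:
    """Steps 4-6 expressed as topic-level configs (Kafka config values are strings)."""
    return {
        # Step 5: keep the latest value per key for entity-state topics.
        "cleanup.policy": "compact" if compacted else "delete",
        # Step 4: time-based retention; it only deletes data when the policy includes "delete".
        "retention.ms": str(retention_hours * 3600 * 1000),
        # Step 6: pair with acks=all on producers.
        "min.insync.replicas": "2",
    }


print(partition_count(50_000, 10_000, 3))                   # 5 (throughput-bound)
print(partition_count(25_000, 10_000, 6))                   # 6 (consumer-bound)
print(topic_configs(168, compacted=False)["retention.ms"])  # 604800000 (7 days)
```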

Production Code

Kafka Producer with Idempotent Writes

from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import logging
import uuid
from typing import Dict, Optional
from dataclasses import dataclass, asdict

logger = logging.getLogger(__name__)


@dataclass
class KafkaEvent:
    """Base event class with idempotency support."""
    event_id: str
    event_type: str
    payload: Dict
    timestamp: str

    def to_json(self) -> bytes:
        return json.dumps(asdict(self)).encode("utf-8")


class IdempotentProducer:
    """Kafka producer with idempotent writes, retries, and dead letter handling."""

    def __init__(
        self,
        bootstrap_servers: list,
        topic: str,
        max_retries: int = 3,
        retry_backoff_ms: int = 100,
    ):
        self.topic = topic
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            acks="all",                    # Wait for all ISR replicas
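            # enable_idempotence needs a kafka-python release that supports it (2.0.x rejects it)
            enable_idempotence=True,        # Broker discards duplicates caused by producer retries
            max_in_flight_requests_per_connection=5,  # Safe with idempotence
            retries=max_retries,
            retry_backoff_ms=retry_backoff_ms,  # kafka-python's parameter is retry_backoff_ms
            compression_type="snappy",     # Reduce network I/O (requires python-snappy)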
            linger_ms=10,                  # Batch for throughput
            batch_size=32768,              # 32KB batches
            buffer_memory=67108864,        # 64MB buffer
        )
        self.dlq_topic = f"{topic}.dlq"

    def send(
        self,
        event: KafkaEvent,
        key: Optional[str] = None,
        callback=None,
    ) -> None:
        """Send an event to Kafka with idempotency and error handling."""
        try:
            future = self.producer.send(
                topic=self.topic,
                key=key.encode("utf-8") if key else None,
                value=event.to_json(),
                headers=[("event_type", event.event_type.encode())],
            )
            if callback:
                future.add_callback(callback)
            future.add_errback(self._handle_error, event)
        except Exception as e:
            logger.error(f"Failed to produce event {event.event_id}: {e}")
            self._send_to_dlq(event, str(e))

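    def _handle_error(self, event: KafkaEvent, exc: KafkaError) -> None:
        """Handle producer errors and route to dead letter queue.

        add_errback(self._handle_error, event) binds `event` first; kafka-python
        then appends the exception, so the parameters must be in this order.
        """
        logger.error(f"Error producing event {event.event_id}: {exc}")
        self._send_to_dlq(event, str(exc))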

    def _send_to_dlq(self, event: KafkaEvent, error: str) -> None:
        """Route failed events to dead letter queue."""
        dlq_event = KafkaEvent(
            event_id=str(uuid.uuid4()),
            event_type="dead_letter",
            payload={
                "original_event": asdict(event),
                "error": error,
            },
            timestamp=event.timestamp,
        )
        self.producer.send(
            topic=self.dlq_topic,
            value=dlq_event.to_json(),
        )
        logger.warning(f"Event {event.event_id} routed to DLQ")

    def flush(self) -> None:
        """Ensure all pending messages are sent."""
        self.producer.flush(timeout=30)

    def close(self) -> None:
        """Gracefully shut down the producer."""
        self.producer.flush(timeout=30)
        self.producer.close()


# Usage
producer = IdempotentProducer(
    bootstrap_servers=["kafka-broker-1:9092", "kafka-broker-2:9092"],
    topic="orders.events",
)

event = KafkaEvent(
    event_id=str(uuid.uuid4()),
    event_type="order_created",
    payload={"order_id": "ORD-12345", "amount": 99.99, "currency": "USD"},
    timestamp="2024-01-15T10:30:00Z",
)
producer.send(event, key="ORD-12345", callback=lambda metadata: logger.info(
    f"Sent to {metadata.topic}[{metadata.partition}]@{metadata.offset}"
))
producer.flush()
producer.close()

Kafka Consumer with Offset Management

from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json
import logging
import signal
from typing import Callable, Dict

logger = logging.getLogger(__name__)


class ReliableConsumer:
    """Kafka consumer with graceful shutdown, offset management, and error handling."""

    def __init__(
        self,
        bootstrap_servers: list,
        topic: str,
        group_id: str,
        handler: Callable[[Dict], None],
        auto_offset_reset: str = "earliest",
        max_poll_records: int = 500,
    ):
        self.handler = handler
        self.running = True
        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            auto_offset_reset=auto_offset_reset,
            enable_auto_commit=False,  # Manual commit for reliability
            max_poll_records=max_poll_records,
            session_timeout_ms=30000,
            heartbeat_interval_ms=10000,
            max_poll_interval_ms=300000,
            value_deserializer=lambda m: json.loads(m.decode("utf-8")),
            key_deserializer=lambda k: k.decode("utf-8") if k else None,
        )

        # Register signal handlers for graceful shutdown (signal.signal must run in the main thread)
        signal.signal(signal.SIGINT, self._shutdown)
        signal.signal(signal.SIGTERM, self._shutdown)

    def _shutdown(self, signum, frame):
        """Handle shutdown signals gracefully."""
        logger.info("Shutdown signal received, closing consumer...")
        self.running = False

    def consume(self) -> None:
        """Main consumption loop with manual offset commits."""
        logger.info("Starting consumption loop...")
        while self.running:
            try:
                records = self.consumer.poll(timeout_ms=1000)
                for topic_partition, messages in records.items():
                    for message in messages:
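                        try:
                            self.handler(message.value)
                        except Exception as e:
                            logger.error(
                                f"Error processing message at "
                                f"{message.topic}[{message.partition}]@{message.offset}: {e}"
                            )
                            # Rewind this partition to the failed offset so the commit below
                            # does not mark it consumed; the next poll re-fetches it. A message
                            # that always fails is retried forever, so add a DLQ in real code.
                            self.consumer.seek(topic_partition, message.offset)
                            break  # Skip the rest of this partition's batch to preserve order

                # Commit current positions: processed partitions advance, rewound ones stay put
                if records:
                    self.consumer.commit()
                    logger.debug(f"Committed offsets for {len(records)} partitions")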

            except KafkaError as e:
                logger.error(f"Kafka error during polling: {e}")
                continue

        self.consumer.close()
        logger.info("Consumer closed gracefully")

    def resume(self) -> None:
        """Resume consumption after a pause."""
        self.running = True


# Usage
def process_order(event: dict) -> None:
    """Process a single order event."""
    logger.info(f"Processing order: {event.get('order_id')}")
    # Business logic here
    # If processing fails, an exception will prevent offset commit


consumer = ReliableConsumer(
    bootstrap_servers=["kafka-broker-1:9092", "kafka-broker-2:9092"],
    topic="orders.events",
    group_id="order-processing-group",
    handler=process_order,
    auto_offset_reset="earliest",
    max_poll_records=500,
)
consumer.consume()
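Because this consumer is at-least-once (a message can be redelivered after a failure or rebalance), handlers should be idempotent. A minimal sketch, assuming events carry the event_id field that KafkaEvent defines; make_idempotent_handler is a hypothetical helper, and its in-memory set would need to be durable in a real service (for example, recorded in the same database transaction as the business write).

```python
from typing import Callable, Dict, List, Set


def make_idempotent_handler(handler: Callable[[Dict], None]) -> Callable[[Dict], None]:
    """Wrap `handler` so an event_id that was already handled successfully is skipped."""
    seen: Set[str] = set()  # in-memory only: lost on restart (persist it in real code)

    def wrapped(event: Dict) -> None:
        event_id = event["event_id"]
        if event_id in seen:
            return  # duplicate delivery: already processed
        handler(event)  # if this raises, event_id is not recorded and a retry will run it
        seen.add(event_id)

    return wrapped


calls: List[str] = []
handle = make_idempotent_handler(lambda e: calls.append(e["event_id"]))
for event in [{"event_id": "a"}, {"event_id": "a"}, {"event_id": "b"}]:
    handle(event)
print(calls)  # ['a', 'b']
```

It can wrap the existing handler, e.g. handler=make_idempotent_handler(process_order) when constructing ReliableConsumer.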

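Consumer Group Rebalancing: When a consumer joins or leaves a group, Kafka triggers a rebalance that redistributes partitions. With the classic eager protocol, all consumers in the group stop consuming during the rebalance; the cooperative incremental protocol (for example, the CooperativeStickyAssignor in the Java client) pauses only the partitions that change owner. Minimize rebalance frequency by setting session.timeout.ms high enough to tolerate brief stalls such as GC pauses or network blips, and by using static group membership (group.instance.id) to avoid rebalances on temporary disconnections.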
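Why sticky, cooperative assignment reduces disruption: with Kafka's cooperative rebalancing, only partitions that change owner are paused, so fewer moves means less stalled work. The sketch below counts how many of 12 partitions change owner when a fourth consumer joins, comparing a fresh range-style assignment with a simplified sticky strategy. All helpers are hypothetical simplifications, not Kafka's actual assignor code.

```python
from typing import Dict, List


def targets(num_partitions: int, members: List[str]) -> Dict[str, int]:
    """How many partitions each member should own (first P mod C members get one extra)."""
    base, extra = divmod(num_partitions, len(members))
    return {m: base + (1 if i < extra else 0) for i, m in enumerate(sorted(members))}


def range_assign(num_partitions: int, members: List[str]) -> Dict[str, List[int]]:
    """Fresh contiguous ranges, ignoring who owned what before."""
    result: Dict[str, List[int]] = {}
    start = 0
    for m, count in targets(num_partitions, members).items():
        result[m] = list(range(start, start + count))
        start += count
    return result


def sticky_assign(current: Dict[str, List[int]], num_partitions: int,
                  members: List[str]) -> Dict[str, List[int]]:
    """Keep existing ownership up to each member's target, then hand out freed partitions."""
    goal = targets(num_partitions, members)
    kept = {m: sorted(current.get(m, []))[: goal[m]] for m in goal}
    owned = {p for ps in kept.values() for p in ps}
    free = [p for p in range(num_partitions) if p not in owned]
    for m in goal:
        while len(kept[m]) < goal[m]:
            kept[m].append(free.pop(0))
    return kept


def moved(before: Dict[str, List[int]], after: Dict[str, List[int]]) -> int:
    """Number of partitions whose owner changed."""
    owner = {p: m for m, ps in before.items() for p in ps}
    return sum(1 for m, ps in after.items() for p in ps if owner.get(p) != m)


before = range_assign(12, ["c1", "c2", "c3"])
members = ["c1", "c2", "c3", "c4"]
print(moved(before, range_assign(12, members)))           # 6
print(moved(before, sticky_assign(before, 12, members)))  # 3
```

Three moves is the minimum here, since the new consumer must receive three partitions.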

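Partition Count Rules: (1) Kafka cannot decrease a topic's partition count; reducing it requires creating a new topic and migrating the data. (2) Increasing partitions improves parallelism, but it changes the hash(key) mod partition_count mapping, so new messages for an existing key may land in a different partition than that key's older messages, breaking per-key ordering across the change. (3) For compacted topics, the same remapping means a key's older values can remain in its previous partition indefinitely, because compaction only deduplicates within a partition.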
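To see rule (2) in numbers, the sketch below counts how many keys change partition when a topic grows from 3 to 4 partitions. It assumes a uniform hash modulo the partition count; partition_for_key uses MD5 as a hypothetical stand-in for Kafka's murmur2, so only the rough proportion, not specific partition numbers, carries over.

```python
import hashlib
from typing import List


def partition_for_key(key: str, num_partitions: int) -> int:
    """Stable stand-in for Kafka's murmur2-based key hashing."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions


def remapped_fraction(keys: List[str], old_count: int, new_count: int) -> float:
    """Share of keys whose partition differs after changing the partition count."""
    moved = sum(1 for k in keys
                if partition_for_key(k, old_count) != partition_for_key(k, new_count))
    return moved / len(keys)


keys = [f"order-{i}" for i in range(10_000)]
print(round(remapped_fraction(keys, 3, 4), 2))  # roughly 0.75 with a uniform hash
```

Growing to a multiple of the old count does not avoid this: going from 3 to 6 partitions keeps a key in place only when its hash mod 6 is below 3, so roughly half the keys still move.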

  • Kafka's distributed commit log provides durable, ordered, replayable event storage with disk-sequential I/O.
  • Partitioning enables horizontal scalability; the partition count determines maximum parallelism.
  • Consumer groups distribute partitions across consumers, with each partition consumed by exactly one consumer per group.
  • Producer acks=all with min.insync.replicas=2 ensures durability against broker failures.
  • Exactly-once semantics require idempotent, transactional producers (enable_idempotence=True plus transactions) and consumers that read with isolation.level=read_committed.
  • Consumer lag monitoring prevents data loss; alert when a partition's lag approaches retention_period * produce_rate.
  • Key-based partitioning guarantees per-key ordering but may cause data skew. Use null key for uniform distribution.

Best Practices

  1. Set acks=all with min.insync.replicas=2 in production to ensure writes survive broker failures.
  2. Enable idempotent producers (enable_idempotence=True) to prevent duplicate messages on retries.
  3. Use manual offset commits (enable_auto_commit=False) to ensure messages are only acknowledged after successful processing.
  4. Monitor consumer lag continuously. Alert when the age of the oldest unconsumed message approaches the retention period.
  5. Choose partition count carefully: Kafka can increase a topic's partition count but cannot decrease it; shrinking requires a new topic and data migration.
  6. Use schema registry (Confluent or Apicurio) for schema evolution. Prefer Avro or Protobuf over JSON for production topics.
  7. Implement dead letter queues for messages that fail processing. Never silently drop failed messages.
  8. Set max.poll.records appropriately to prevent poll loops from exceeding max.poll.interval.ms.
  9. Use static group membership (group.instance.id) to reduce rebalance frequency during temporary disconnections.
  10. Document topic ownership: every topic should have a designated team responsible for schema changes and capacity planning.

Kafka Cluster Sizing Reference

| Metric | Small Cluster | Medium Cluster | Large Cluster |
|---|---|---|---|
| Brokers | 3 | 6-12 | 12-50+ |
| Topics | 10-50 | 50-500 | 500-5000+ |
| Partitions/Topic | 3-6 | 6-24 | 24-100+ |
| Throughput | 10K msg/s | 100K msg/s | 1M+ msg/s |
| Storage | 100GB-1TB | 1TB-50TB | 50TB-1PB+ |
| Retention | 7 days | 7-30 days | Configurable |
| Use Case | Dev/Small app | Production workloads | Enterprise platform |
