Kafka Connect Deep Dive

🟒 Free Lesson

Advertisement

Kafka Connect Deep Dive

[Figure: Kafka Connect architecture. Source systems (PostgreSQL, MongoDB, REST APIs, file systems) feed source workers (JDBC connector, MongoDB connector, HTTP source), which write to Kafka cluster topics (Orders, Payments, Users, CDC events). Sink workers (Elasticsearch, S3 sink, Redis sink) deliver to destinations (Elasticsearch, S3/data lake, Redis cache, Snowflake). Single Message Transforms (SMTs) are applied in flight.]

Overview

Kafka Connect is a scalable and resilient framework for streaming data between Kafka and external systems. This deep dive covers connectors, transforms, converters, and advanced deployment patterns.

Key Concepts

  • Connector: Defines the data flow (source or sink)
  • Task: Executes the actual data transfer
  • Worker: Hosts connectors and tasks
  • Converter: Handles serialization/deserialization
  • Transform: Modifies records in-flight

Distributed vs Standalone Mode

Distributed Mode (Production)

# distributed.properties
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
offset.flush.interval.ms=10000

Standalone Mode (Development)

# standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Standalone mode stores source offsets in a local file instead of Kafka topics.
# It uses no group.id and no config/status storage: connector configs come from
# the properties files passed on the command line.
offset.storage.file.filename=offsets.json

Connector Configurations

Source Connector (JDBC)

{
  "name": "postgres-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "3",
    "connection.url": "jdbc:postgresql://postgres:5432/mydb",
    "connection.user": "connect_user",
    "connection.password": "${file:/opt/kafka/secrets/db.properties:password}",
    "topic.prefix": "db-",
    "table.whitelist": "orders,users,payments",
    "mode": "timestamp+incrementing",
    "incrementing.column.name": "id",
    "timestamp.column.name": "updated_at",
    "poll.interval.ms": "1000"
  }
}
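
In distributed mode, a connector config like the one above is submitted to a worker's REST API rather than read from a file. The following is a minimal sketch that builds an idempotent `PUT /connectors/{name}/config` request with Python's standard library. The base URL `http://connect:8083` is borrowed from the monitoring examples later in this lesson, the helper name `build_config_request` is hypothetical, and the config is trimmed for brevity. Sending the request is left commented out because it needs a reachable worker.

```python
import json
import urllib.request


def build_config_request(base_url, name, config):
    """Build a PUT /connectors/{name}/config request (creates or updates)."""
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


# Only the inner "config" object is sent; the connector name goes in the URL.
config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "3",
    "connection.url": "jdbc:postgresql://postgres:5432/mydb",
    "topic.prefix": "db-",
    "mode": "incrementing",
    "incrementing.column.name": "id",
}
request = build_config_request("http://connect:8083", "postgres-source", config)
print(request.get_method(), request.full_url)

# To actually send it (assumes a Connect worker is reachable at base_url):
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```

Using `PUT` on the `/config` endpoint means the same call works for both the first deployment and later updates.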

Sink Connector (Elasticsearch)

{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "5",
    "topics": "orders,payments",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",
    "key.ignore": "false",
    "schema.ignore": "false",
    "behavior.on.null.values": "delete",
    "behavior.on.malformed.documents": "warn",
    "write.method": "upsert"
  }
}

Sink Connector (S3)

{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "3",
    "topics": "orders",
    "s3.bucket.name": "kafka-data-lake",
    "s3.region": "us-east-1",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
    "partition.duration.ms": "86400000",
    "locale": "en-US",
    "timezone": "UTC",
    "flush.size": "1000",
    "rotate.interval.ms": "60000"
  }
}
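
To see what the `path.format` above produces, the sketch below renders the time-based partition path for a given timestamp in UTC. It only mimics the formatting; the function name `encoded_partition` is hypothetical, and which timestamp the real connector uses (wall clock, record timestamp, or a record field) depends on its timestamp extractor setting.

```python
from datetime import datetime, timezone


def encoded_partition(timestamp_ms):
    """Render 'year'=YYYY/'month'=MM/'day'=dd for a millisecond timestamp in UTC."""
    ts = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    return f"year={ts.year:04d}/month={ts.month:02d}/day={ts.day:02d}"


# 1705276800000 ms is 2024-01-15T00:00:00Z
print(encoded_partition(1705276800000))  # year=2024/month=01/day=15
```

Query engines that understand `key=value` directory names can then prune by year, month, and day.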

Single Message Transforms (SMTs)

Built-in Transforms

Transform        Purpose                      Example
InsertField      Add fields                   Add a timestamp to records
ReplaceField     Rename/remove fields         Rename 'id' to 'user_id'
TimestampRouter  Route by record timestamp    Route to topic-YYYY-MM-DD
RegexRouter      Rewrite topic name by regex  Strip a prefix from topic names
MaskField        Mask sensitive data          Redact PII fields
Cast             Type conversion              Cast string to integer

SMT Configuration Examples

{
  "name": "orders-with-timestamp",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "transforms": "addTimestamp,maskEmail,renameField",
    "transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.addTimestamp.timestamp.field": "event_time",
    "transforms.maskEmail.type": "org.apache.kafka.connect.transforms.MaskField$Value",
    "transforms.maskEmail.fields": "email",
    "transforms.maskEmail.replacement": "****@****.com",
    "transforms.renameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.renameField.renames": "id:order_id,user_id:customer_id"
  }
}
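
Transforms run in the order listed in `transforms`, so each one sees the output of the previous one. The sketch below imitates the chain above on a plain Python dict. It is a stand-in for illustration only: real SMTs operate on Connect records inside the worker, and the function names here are hypothetical.

```python
def insert_timestamp(value, field, timestamp_ms):
    """Imitates InsertField$Value with timestamp.field."""
    out = dict(value)
    out[field] = timestamp_ms
    return out


def mask_fields(value, fields, replacement):
    """Imitates MaskField$Value with a fixed replacement."""
    return {k: (replacement if k in fields else v) for k, v in value.items()}


def rename_fields(value, renames):
    """Imitates ReplaceField$Value with renames."""
    return {renames.get(k, k): v for k, v in value.items()}


def apply_chain(value, timestamp_ms):
    # Same order as "transforms": "addTimestamp,maskEmail,renameField"
    value = insert_timestamp(value, "event_time", timestamp_ms)
    value = mask_fields(value, {"email"}, "****@****.com")
    value = rename_fields(value, {"id": "order_id", "user_id": "customer_id"})
    return value


record = {"id": 1, "user_id": 7, "email": "jane@example.com"}
print(apply_chain(record, 1705276800000))
```

Because `renameField` runs last, `maskEmail` still refers to the original field names; reordering the chain would require updating the field names it targets.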

Custom Transform

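// Custom SMTs are loaded by the Connect worker JVM, so they are written in Java
// (or another JVM language) and implement the Transformation interface.
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

public class AddMetadataTransform<R extends ConnectRecord<R>> implements Transformation<R> {
    private static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define("source.system", ConfigDef.Type.STRING, "unknown",
                ConfigDef.Importance.LOW, "Name of the originating system");

    private String sourceSystem;

    @Override
    public void configure(Map<String, ?> configs) {
        Object value = configs.get("source.system");
        sourceSystem = value == null ? "unknown" : value.toString();
    }

    @Override
    public R apply(R record) {
        // Pass tombstones and schemaless values through unchanged
        if (!(record.value() instanceof Struct)) {
            return record;
        }
        Struct value = (Struct) record.value();

        // The original schema has no metadata fields, so extend it first
        SchemaBuilder builder = SchemaBuilder.struct().name(value.schema().name());
        for (Field field : value.schema().fields()) {
            builder.field(field.name(), field.schema());
        }
        Schema newSchema = builder
            .field("source_system", Schema.STRING_SCHEMA)
            .field("processed_at", Schema.INT64_SCHEMA)
            .build();

        // Copy the existing fields, then add the metadata fields
        Struct newValue = new Struct(newSchema);
        for (Field field : value.schema().fields()) {
            newValue.put(field.name(), value.get(field));
        }
        newValue.put("source_system", sourceSystem);
        newValue.put("processed_at", System.currentTimeMillis());

        return record.newRecord(record.topic(), record.kafkaPartition(),
            record.keySchema(), record.key(), newSchema, newValue, record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public void close() {
    }
}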

Converters

Converter Types

Converter           Format    Use Case
JsonConverter       JSON      Simple, human-readable
AvroConverter       Avro      Schema evolution, compact
ProtobufConverter   Protobuf  Schema evolution, fast
StringConverter     String    Plain text messages
ByteArrayConverter  Binary    Raw bytes
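
With `schemas.enable=true`, JsonConverter wraps every message in an envelope with a `schema` and a `payload` part, which is what makes JSON larger on the wire than Avro or Protobuf. The sketch below builds such an envelope for a flat record. The mapping from Python types to Connect type names (for example `int` to `int64`) is an assumption of this sketch, and `with_schema_envelope` is a hypothetical helper, not part of any Kafka library.

```python
import json

# Connect type names as they appear in the JSON schema envelope.
# Mapping every Python int to int64 is a simplification for this sketch.
TYPE_NAMES = {bool: "boolean", int: "int64", float: "double", str: "string"}


def with_schema_envelope(record, name):
    """Wrap a flat dict in a JsonConverter-style schema/payload envelope."""
    fields = [
        {"field": key, "type": TYPE_NAMES[type(val)], "optional": False}
        for key, val in record.items()
    ]
    schema = {"type": "struct", "name": name, "optional": False, "fields": fields}
    return {"schema": schema, "payload": record}


order = {"id": "o-1", "amount": 19.99, "status": "NEW"}
envelope = with_schema_envelope(order, "Order")
print(json.dumps(envelope, indent=2))
```

Setting `schemas.enable=false` on the converter sends only the payload, at the cost of sinks that need a schema no longer being able to read it.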

Avro Converter with Schema Registry

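# Enable ${file:...} lookups in worker and connector configs
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

# Schema Registry configuration
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.basic.auth.user.info=${file:/opt/kafka/secrets/sr.properties:username}:${file:/opt/kafka/secrets/sr.properties:password}

# Schema Registry over TLS: use the https URL in place of the http one above.
# Converter settings only take effect with the value.converter. prefix.
value.converter.schema.registry.url=https://schema-registry:8081
value.converter.schema.registry.ssl.truststore.location=/var/kafka/ssl/truststore.jks
value.converter.schema.registry.ssl.truststore.password=${file:/opt/kafka/secrets/ssl.properties:truststore.password}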

Schema Evolution

from confluent_kafka.avro import AvroProducer
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient

# Producer with schema evolution
producer = AvroProducer({
    'bootstrap.servers': 'kafka:9092',
    'schema.registry.url': 'http://schema-registry:8081'
})

# Schema v1
schema_v1 = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "status", "type": "string"}
    ]
}

# Schema v2 (backward compatible)
schema_v2 = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "status", "type": "string"},
        {"name": "currency", "type": "string", "default": "USD"}  # New field with default
    ]
}
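
The rule behind "new field with default" can be checked mechanically. The sketch below is a deliberately simplified backward-compatibility check for flat record schemas: a consumer using the new schema must be able to read data written with the old one, so every added field needs a default. It is not the Schema Registry's implementation; it ignores Avro type promotion, unions, and nested records, and the function name is hypothetical.

```python
def backward_compatibility_problems(old_schema, new_schema):
    """Return reasons why new_schema cannot read data written with old_schema."""
    old_fields = {f["name"]: f for f in old_schema["fields"]}
    problems = []
    for field in new_schema["fields"]:
        old = old_fields.get(field["name"])
        if old is None:
            # Added field: old data has no value for it, so a default is required
            if "default" not in field:
                problems.append(f"new field '{field['name']}' has no default")
        elif old["type"] != field["type"]:
            # Stricter than Avro, which allows some promotions (e.g. int to long)
            problems.append(f"field '{field['name']}' changed type")
    return problems


v1 = {"type": "record", "name": "Order", "fields": [
    {"name": "id", "type": "string"},
    {"name": "amount", "type": "double"},
]}
v2 = {"type": "record", "name": "Order", "fields": [
    {"name": "id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "currency", "type": "string", "default": "USD"},
]}
v2_broken = {"type": "record", "name": "Order", "fields": [
    {"name": "id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "currency", "type": "string"},
]}

print(backward_compatibility_problems(v1, v2))
print(backward_compatibility_problems(v1, v2_broken))
```

In practice the Schema Registry enforces the configured compatibility level when a new schema version is registered, so a check like this is only useful as an early warning.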

Dead Letter Queues (DLQ)

DLQ Configuration

{
  "name": "orders-sink-with-dlq",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "orders",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "orders-dlq",
    "errors.deadletterqueue.topic.replication.factor": "3",
    "errors.deadletterqueue.context.headers.enable": "true",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true"
  }
}

DLQ Processing

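from kafka import KafkaConsumer, KafkaProducer

# Exception classes treated as transient; this set is an illustrative assumption
RETRYABLE_ERRORS = {'org.apache.kafka.connect.errors.RetriableException'}

def decode_headers(headers):
    # kafka-python exposes headers as a list of (key, bytes) tuples, not a dict
    return {key: value.decode('utf-8') if value is not None else None
            for key, value in (headers or [])}

def process_dlq_messages():
    consumer = KafkaConsumer(
        'orders-dlq',
        bootstrap_servers=['kafka:9092'],
        auto_offset_reset='earliest',
        group_id='dlq-processor'
    )
    producer = KafkaProducer(bootstrap_servers=['kafka:9092'])

    for message in consumer:
        # Connect writes error context into __connect.errors.* headers when
        # errors.deadletterqueue.context.headers.enable=true
        headers = decode_headers(message.headers)
        original_topic = headers.get('__connect.errors.topic')
        error_class = headers.get('__connect.errors.exception.class.name')
        error_message = headers.get('__connect.errors.exception.message')

        # Log and alert
        log_dlq_error(
            topic=original_topic,
            error=error_class,
            detail=error_message,
            message=message.value
        )

        # Optionally retry
        if original_topic and should_retry(error_class):
            retry_message(producer, original_topic, message.key, message.value)

def should_retry(error_class):
    return error_class in RETRYABLE_ERRORS

def retry_message(producer, topic, key, value):
    # Republish the raw bytes to the original topic
    producer.send(topic, key=key, value=value)

def log_dlq_error(topic, error, detail, message):
    print(f"DLQ Error: topic={topic}, error={error}, detail={detail}, message={message}")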
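
For alerting, it often helps to group dead-lettered records by error class rather than inspect them one at a time. The sketch below counts error classes from DLQ headers, assuming the headers arrive as lists of `(key, bytes)` tuples and that context headers are enabled on the connector so that `__connect.errors.exception.class.name` is present. The helper names and the sample data are hypothetical.

```python
from collections import Counter

ERROR_CLASS_HEADER = "__connect.errors.exception.class.name"


def error_class_of(headers):
    """Return the exception class recorded in a record's DLQ headers."""
    for key, value in headers:
        if key == ERROR_CLASS_HEADER and value is not None:
            return value.decode("utf-8")
    return "unknown"


def summarize(dlq_headers):
    """Count dead-lettered records per error class."""
    return Counter(error_class_of(headers) for headers in dlq_headers)


# Hypothetical header lists, one per dead-lettered record
sample = [
    [(ERROR_CLASS_HEADER, b"org.apache.kafka.connect.errors.DataException")],
    [(ERROR_CLASS_HEADER, b"org.apache.kafka.connect.errors.DataException")],
    [("__connect.errors.topic", b"orders")],  # no error class header
]
for error_class, count in summarize(sample).most_common():
    print(error_class, count)
```

A sudden spike in a single error class usually points at one bad producer or one schema change rather than many unrelated failures.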

Performance Tuning

Worker Configuration

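# Parallelism settings
# tasks.max is set per connector, in the connector config (not in the worker file),
# for example "tasks.max": "10"

# Batch settings
# batch.size is connector-specific and also belongs in the connector config,
# for example "batch.size": "3000" on a sink that supports it.
# Consumer overrides for sink tasks take the consumer. prefix in the worker file.
consumer.max.poll.records=500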

# Timeout settings
offset.flush.timeout.ms=5000
offset.flush.interval.ms=10000

Monitoring Connect Clusters

# List connectors
curl -s http://connect:8083/connectors | jq .

# Check connector status
curl -s http://connect:8083/connectors/postgres-source/status | jq .

# Check task status
curl -s http://connect:8083/connectors/postgres-source/tasks/0/status | jq .
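
The status response can also be checked programmatically. The sketch below takes the parsed JSON from `GET /connectors/{name}/status`, lists the tasks in the `FAILED` state, and builds the matching `POST /connectors/{name}/tasks/{id}/restart` URLs. The sample status document is made up for illustration, and the helper names are hypothetical.

```python
def failed_tasks(status):
    """Return the ids of tasks whose state is FAILED."""
    return [t["id"] for t in status.get("tasks", []) if t.get("state") == "FAILED"]


def restart_urls(base_url, status):
    """Build the restart endpoint URL for each failed task."""
    name = status["name"]
    return [
        f"{base_url}/connectors/{name}/tasks/{task_id}/restart"
        for task_id in failed_tasks(status)
    ]


# Made-up status document in the shape returned by the status endpoint
status = {
    "name": "postgres-source",
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "connect:8083"},
        {"id": 1, "state": "FAILED", "worker_id": "connect:8083", "trace": "..."},
    ],
}
print(failed_tasks(status))
print(restart_urls("http://connect:8083", status))
```

Note that a connector can report `RUNNING` while individual tasks have failed, so checking only the connector state is not enough.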

Best Practices

Connector Design

  1. Use distributed mode for production
  2. Set appropriate task counts for parallelism
  3. Implement DLQ for error handling
  4. Use SMTs for data transformation
  5. Monitor lag and throughput

Common Pitfalls

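  • Over-provisioning tasks: Sink tasks beyond the number of topic partitions sit idle
  • Missing error handling: Without a DLQ, a single bad record can fail the task (errors.tolerance defaults to none)
  • Schema mismatches: The converter must match how the data was serialized
  • Resource exhaustion: Insufficient memory/CPU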
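
The over-provisioning pitfall comes down to simple arithmetic: sink tasks share one consumer group, and each partition is assigned to at most one task. The sketch below estimates how many tasks do useful work, assuming the connector starts all `tasks.max` tasks. The partition counts are hypothetical and the function name is made up for this example.

```python
def sink_task_usage(tasks_max, partitions_per_topic):
    """Estimate active and idle sink tasks for the given topic partition counts."""
    total_partitions = sum(partitions_per_topic.values())
    active = min(tasks_max, total_partitions)
    return {"active": active, "idle": tasks_max - active}


# tasks.max=5 as in the Elasticsearch sink above; partition counts are assumed
print(sink_task_usage(5, {"orders": 2, "payments": 2}))
```

If the result shows idle tasks, either lower `tasks.max` or add partitions to the topics.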

Summary

Kafka Connect provides a scalable, resilient, and extensible framework for data integration. Master connectors, transforms, converters, and dead letter queues to build robust streaming data pipelines.

⭐

Premium Content

Kafka Connect Deep Dive

Unlock this lesson and 900+ advanced tutorials with a Premium plan.

🎯End-to-end Projects
πŸ’ΌInterview Prep
πŸ“œCertificates
🀝Community Access

Already a member? Log in

Need Expert Kafka Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement