Kafka Connect Deep Dive
Overview
Kafka Connect is a scalable and resilient framework for streaming data between Kafka and external systems. This deep dive covers connectors, transforms, converters, and advanced deployment patterns.
Key Concepts
- Connector: Defines the data flow (source or sink)
- Task: Executes the actual data transfer
- Worker: Hosts connectors and tasks
- Converter: Handles serialization/deserialization
- Transform: Modifies records in-flight
Distributed vs Standalone Mode
Distributed Mode (Production)
# distributed.properties
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
offset.flush.interval.ms=10000
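In distributed mode, connectors are created by sending their JSON definition to a worker's REST interface rather than by passing files on the command line. The following is a minimal sketch of building such a request. The host `connect:8083` is the one used in the monitoring commands in this guide, and the helper name `build_create_request` is hypothetical. The request is only constructed here, not sent.

```python
import json
import urllib.request

def build_create_request(base_url, name, config):
    """Build (but do not send) a POST /connectors request for a Connect worker."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

request = build_create_request(
    "http://connect:8083",  # assumed worker address
    "postgres-source",
    {"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": "3"},
)
# To submit it against a running worker: urllib.request.urlopen(request)
```

Because every worker in the group shares the config topic, the request can go to any worker in the cluster.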
Standalone Mode (Development)
# standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Standalone mode has no group.id or storage topics; source offsets go to a local file
offset.storage.file.filename=offsets.json
# Connector configs are passed as properties files on the command line; status is kept in memory
Connector Configurations
Source Connector (JDBC)
{
"name": "postgres-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "3",
"connection.url": "jdbc:postgresql://postgres:5432/mydb",
"connection.user": "connect_user",
"connection.password": "${file:/opt/kafka/secrets/db.properties:password}",
"topic.prefix": "db-",
"table.whitelist": "orders,users,payments",
"mode": "timestamp+incrementing",
"incrementing.column.name": "id",
"timestamp.column.name": "updated_at",
"poll.interval.ms": "1000"
}
}
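With `table.whitelist` and `topic.prefix`, the JDBC source connector writes each table to a topic named by prepending the prefix to the table name. A small sketch of that mapping, using the values from the configuration above (the function name `jdbc_source_topics` is hypothetical):

```python
def jdbc_source_topics(config):
    """Return the topic each whitelisted table is written to (prefix + table name)."""
    tables = [t.strip() for t in config["table.whitelist"].split(",") if t.strip()]
    return {table: config["topic.prefix"] + table for table in tables}

topics = jdbc_source_topics({
    "topic.prefix": "db-",
    "table.whitelist": "orders,users,payments",
})
print(topics)
```

Downstream sink connectors would then subscribe to the prefixed names (for example `db-orders`) rather than the bare table names.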
Sink Connector (Elasticsearch)
{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "5",
"topics": "orders,payments",
"connection.url": "http://elasticsearch:9200",
"type.name": "_doc",
"key.ignore": "false",
"schema.ignore": "false",
"behavior.on.null.values": "delete",
"behavior.on.malformed.documents": "warn",
"write.method": "upsert"
}
}
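The interaction of `key.ignore`, `behavior.on.null.values`, and `write.method` can be hard to picture from the settings alone. The sketch below models it in simplified form. It is an illustration under stated assumptions, not the connector's code: the index is assumed to equal the topic name, and the record and action dictionaries are hypothetical shapes.

```python
def to_es_action(record, key_ignore=False, on_null="delete", write_method="upsert"):
    """Map one Kafka record to a simplified Elasticsearch action."""
    if key_ignore:
        # Without a usable key, the document ID is derived from the record coordinates
        doc_id = f"{record['topic']}+{record['partition']}+{record['offset']}"
    else:
        doc_id = str(record["key"])
    if record["value"] is None:
        # Tombstone handling is controlled by behavior.on.null.values
        if on_null == "delete":
            return {"op": "delete", "index": record["topic"], "id": doc_id}
        if on_null == "ignore":
            return None
        raise ValueError("null value not allowed when behavior.on.null.values=fail")
    op = "upsert" if write_method == "upsert" else "index"
    return {"op": op, "index": record["topic"], "id": doc_id, "doc": record["value"]}

order = {"topic": "orders", "partition": 0, "offset": 5, "key": "o-1", "value": {"amount": 10}}
tombstone = {**order, "value": None}
print(to_es_action(order))
print(to_es_action(tombstone))
```

Deleting on null values only makes sense with `key.ignore` set to `false`, since the delete must target the same document ID that earlier writes used.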
Sink Connector (S3)
{
"name": "s3-sink",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "3",
"topics": "orders",
"s3.bucket.name": "kafka-data-lake",
"s3.region": "us-east-1",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": "86400000",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd",
"locale": "en-US",
"timezone": "UTC",
"flush.size": "1000",
"rotate.interval.ms": "60000"
}
}
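To see what the time-based partitioner does with `path.format`, the sketch below renders the directory prefix for a record timestamp in UTC. It hard-codes the `'year'=YYYY/'month'=MM/'day'=dd` pattern from the configuration above rather than parsing arbitrary patterns, and it assumes the connector's default top-level directory `topics`. The function names are hypothetical.

```python
from datetime import datetime, timezone

def encoded_partition(timestamp_ms):
    """Render 'year'=YYYY/'month'=MM/'day'=dd for a UTC timestamp in milliseconds."""
    ts = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    return f"year={ts.year:04d}/month={ts.month:02d}/day={ts.day:02d}"

def object_prefix(topic, timestamp_ms, topics_dir="topics"):
    """Directory prefix under which objects for this topic and time bucket are written."""
    return f"{topics_dir}/{topic}/{encoded_partition(timestamp_ms)}/"

noon_utc = int(datetime(2024, 3, 15, 12, 0, tzinfo=timezone.utc).timestamp() * 1000)
print(object_prefix("orders", noon_utc))
```

Key-value path segments of this form are what Hive-style query engines expect when discovering partitions in a data lake.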
Single Message Transforms (SMTs)
Built-in Transforms
| Transform | Purpose | Example |
|---|---|---|
| InsertField | Add fields | Add timestamp to records |
| ReplaceField | Rename/remove fields | Rename 'id' to 'user_id' |
| TimestampRouter | Route by time | Route to topic-YYYY-MM-DD |
| RegexRouter | Rename topics by pattern | Rewrite the topic name with a regex and replacement |
| MaskField | Mask sensitive data | Redact PII fields |
| Cast | Type conversion | Cast string to integer |
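The two routing transforms in the table both change only the topic name, never the record contents. The sketch below imitates their behavior in Python as an illustration. It is not the Connect implementation: the replacement string uses Python's `\1` group syntax instead of Java's `$1`, and the format arguments are Python-style stand-ins for the transforms' `topic.format` and `timestamp.format` settings.

```python
import re
from datetime import datetime, timezone

def regex_router(topic, regex, replacement):
    """Rename the topic only when the regex matches the whole topic name."""
    if re.fullmatch(regex, topic):
        return re.sub(regex, replacement, topic, count=1)
    return topic

def timestamp_router(topic, timestamp_ms,
                     topic_format="{topic}-{timestamp}", timestamp_format="%Y-%m-%d"):
    """Append a formatted UTC record timestamp to the topic name."""
    ts = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    return topic_format.format(topic=topic, timestamp=ts.strftime(timestamp_format))

record_ts = int(datetime(2024, 3, 15, 8, 30, tzinfo=timezone.utc).timestamp() * 1000)
print(regex_router("db-orders", r"db-(.*)", r"\1-raw"))
print(timestamp_router("orders", record_ts))
```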
SMT Configuration Examples
{
"name": "orders-with-timestamp",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"transforms": "addTimestamp,maskEmail,renameField",
"transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addTimestamp.timestamp.field": "event_time",
"transforms.maskEmail.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskEmail.fields": "email",
"transforms.maskEmail.replacement": "****@****.com",
"transforms.renameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.renameField.renames": "id:order_id,user_id:customer_id"
}
}
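Transforms run in the order listed in `transforms`, each receiving the output of the previous one. The sketch below simulates the chain above on a plain dictionary to show the resulting record shape. The helper functions are hypothetical stand-ins for the built-in SMTs, and the sample record is invented for illustration.

```python
def insert_timestamp(value, record_ts_ms, field="event_time"):
    """Stand-in for InsertField$Value with timestamp.field."""
    return {**value, field: record_ts_ms}

def mask_fields(value, fields, replacement):
    """Stand-in for MaskField$Value with a custom replacement."""
    return {k: (replacement if k in fields else v) for k, v in value.items()}

def rename_fields(value, renames):
    """Stand-in for ReplaceField$Value with renames."""
    return {renames.get(k, k): v for k, v in value.items()}

def apply_chain(value, record_ts_ms):
    # Same order as "transforms": "addTimestamp,maskEmail,renameField"
    value = insert_timestamp(value, record_ts_ms)
    value = mask_fields(value, {"email"}, "****@****.com")
    value = rename_fields(value, {"id": "order_id", "user_id": "customer_id"})
    return value

result = apply_chain({"id": 7, "user_id": 42, "email": "a@example.com"}, 1700000000000)
print(result)
```

Order matters here: if the rename ran first and renamed a field that a later transform refers to, the later transform would need the new name.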
Custom Transform
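// Connect loads transforms as JVM classes, so a custom SMT is written in Java
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

public class AddMetadataTransform<R extends ConnectRecord<R>> implements Transformation<R> {
    private String sourceSystem;

    @Override
    public void configure(Map<String, ?> configs) {
        Object value = configs.get("source.system");
        sourceSystem = value == null ? "unknown" : value.toString();
    }

    @Override
    public R apply(R record) {
        // Pass tombstones and schemaless values through unchanged
        if (!(record.value() instanceof Struct)) {
            return record;
        }
        Struct value = (Struct) record.value();
        // A Struct only accepts fields declared in its schema, so extend the schema first
        SchemaBuilder builder = SchemaBuilder.struct().name(value.schema().name());
        for (Field field : value.schema().fields()) {
            builder.field(field.name(), field.schema());
        }
        Schema newSchema = builder
            .field("source_system", Schema.STRING_SCHEMA)
            .field("processed_at", Schema.INT64_SCHEMA)
            .build();
        // Copy the existing fields, then add the metadata fields
        Struct newValue = new Struct(newSchema);
        for (Field field : value.schema().fields()) {
            newValue.put(field.name(), value.get(field));
        }
        newValue.put("source_system", sourceSystem);
        newValue.put("processed_at", System.currentTimeMillis());
        return record.newRecord(record.topic(), record.kafkaPartition(),
            record.keySchema(), record.key(), newSchema, newValue, record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef().define("source.system", ConfigDef.Type.STRING, "unknown",
            ConfigDef.Importance.LOW, "Name of the originating system");
    }

    @Override
    public void close() {
    }
}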
Converters
Converter Types
| Converter | Format | Use Case |
|---|---|---|
| JsonConverter | JSON | Simple, human-readable |
| AvroConverter | Avro | Schema evolution, compact |
| ProtobufConverter | Protobuf | Schema evolution, fast |
| StringConverter | String | Plain text messages |
| ByteArrayConverter | Binary | Raw bytes |
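The readability of `JsonConverter` has a size cost: with `schemas.enable=true` each message carries its schema next to the payload, while Avro and Protobuf keep the schema in Schema Registry. The sketch below approximates that envelope to make the overhead visible. It is a simplified model of the converter's output, and the `Order` schema is a hypothetical example.

```python
import json

def json_converter_bytes(schema, payload, schemas_enable=True):
    """Serialize roughly like JsonConverter: add a schema/payload envelope when enabled."""
    document = {"schema": schema, "payload": payload} if schemas_enable else payload
    return json.dumps(document, separators=(",", ":")).encode("utf-8")

order_schema = {
    "type": "struct",
    "name": "Order",
    "optional": False,
    "fields": [
        {"field": "id", "type": "string", "optional": False},
        {"field": "amount", "type": "double", "optional": False},
    ],
}
order = {"id": "o-1", "amount": 19.99}

with_schema = json_converter_bytes(order_schema, order)
without_schema = json_converter_bytes(order_schema, order, schemas_enable=False)
print(len(with_schema), len(without_schema))
```

Sink connectors that need a schema (for example one writing Parquet) cannot work from the schemaless form, which is one reason to prefer a registry-backed converter for structured data.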
Avro Converter with Schema Registry
# Schema Registry configuration
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.basic.auth.user.info=${file:/opt/kafka/secrets/sr.properties:username}:${file:/opt/kafka/secrets/sr.properties:password}
# Schema Registry over TLS (converter settings take the value.converter. prefix)
value.converter.schema.registry.url=https://schema-registry:8081
value.converter.schema.registry.ssl.truststore.location=/var/kafka/ssl/truststore.jks
value.converter.schema.registry.ssl.truststore.password=${file:/opt/kafka/secrets/ssl.properties:truststore.password}
Schema Evolution
from confluent_kafka.avro import AvroProducer
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
# Producer with schema evolution
producer = AvroProducer({
'bootstrap.servers': 'kafka:9092',
'schema.registry.url': 'http://schema-registry:8081'
})
# Schema v1
schema_v1 = {
"type": "record",
"name": "Order",
"fields": [
{"name": "id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "status", "type": "string"}
]
}
# Schema v2 (backward compatible)
schema_v2 = {
"type": "record",
"name": "Order",
"fields": [
{"name": "id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "status", "type": "string"},
{"name": "currency", "type": "string", "default": "USD"} # New field with default
]
}
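The reason v2 is backward compatible is that a reader using v2 can still decode data written with v1: the missing `currency` field is filled from its default. The sketch below shows that rule in simplified form. It only looks at added fields and defaults, ignores type changes and removals, and is not a replacement for Schema Registry's compatibility checks. The function names are hypothetical.

```python
def is_backward_compatible(old_schema, new_schema):
    """Simplified rule: every field the new schema adds must declare a default."""
    old_names = {f["name"] for f in old_schema["fields"]}
    added = [f for f in new_schema["fields"] if f["name"] not in old_names]
    return all("default" in f for f in added)

def read_with_schema(record, reader_schema):
    """Project a record onto the reader schema, filling defaults for missing fields."""
    result = {}
    for field in reader_schema["fields"]:
        name = field["name"]
        if name in record:
            result[name] = record[name]
        elif "default" in field:
            result[name] = field["default"]
        else:
            raise ValueError(f"field {name!r} is missing and has no default")
    return result

v1 = {"type": "record", "name": "Order", "fields": [
    {"name": "id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "status", "type": "string"},
]}
v2 = {"type": "record", "name": "Order", "fields": v1["fields"] + [
    {"name": "currency", "type": "string", "default": "USD"},
]}

old_record = {"id": "o-1", "amount": 10.0, "status": "NEW"}
print(is_backward_compatible(v1, v2))
print(read_with_schema(old_record, v2))
```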
Dead Letter Queues (DLQ)
DLQ Configuration
{
"name": "orders-sink-with-dlq",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics": "orders",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "orders-dlq",
"errors.deadletterqueue.topic.replication.factor": "3",
"errors.deadletterqueue.context.headers.enable": "true",
"errors.log.enable": "true",
"errors.log.include.messages": "true"
}
}
DLQ Processing
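from kafka import KafkaConsumer, KafkaProducer

# Headers added by Connect when errors.deadletterqueue.context.headers.enable=true
TOPIC_HEADER = '__connect.errors.topic'
ERROR_CLASS_HEADER = '__connect.errors.exception.class.name'
ERROR_MESSAGE_HEADER = '__connect.errors.exception.message'
# Example retry policy (an assumption; adjust to the errors your pipeline can recover from)
RETRYABLE_ERRORS = {'org.apache.kafka.connect.errors.RetriableException'}

def decode_headers(raw_headers):
    # kafka-python returns headers as a list of (key, bytes) tuples, not a dict
    return {key: value.decode('utf-8') if value is not None else None
            for key, value in raw_headers}

def should_retry(error_class):
    return error_class in RETRYABLE_ERRORS

def log_dlq_error(topic, error, detail, message):
    print(f"DLQ Error: topic={topic}, error={error}, detail={detail}, message={message}")

def process_dlq_messages():
    consumer = KafkaConsumer(
        'orders-dlq',
        bootstrap_servers=['kafka:9092'],
        auto_offset_reset='earliest',
        group_id='dlq-processor'
    )
    producer = KafkaProducer(bootstrap_servers=['kafka:9092'])
    for message in consumer:
        # Extract original topic and error details from the context headers
        headers = decode_headers(message.headers or [])
        original_topic = headers.get(TOPIC_HEADER)
        error_class = headers.get(ERROR_CLASS_HEADER)
        error_message = headers.get(ERROR_MESSAGE_HEADER)
        # Log and alert
        log_dlq_error(original_topic, error_class, error_message, message.value)
        # Optionally re-publish the raw record to its original topic
        if original_topic and should_retry(error_class):
            producer.send(original_topic, key=message.key, value=message.value)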
Performance Tuning
Worker Configuration
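# Parallelism is set per connector in its own config, not in the worker file
# tasks.max=10
# Batch settings
# batch.size is a connector-level setting on sink connectors that support it
# batch.size=3000
# Records fetched per poll by sink task consumers
consumer.max.poll.records=500
# Offset commit settings
offset.flush.timeout.ms=5000
offset.flush.interval.ms=10000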
Monitoring Connect Clusters
# List connectors
curl -s http://connect:8083/connectors | jq .
# Check connector status
curl -s http://connect:8083/connectors/postgres-source/status | jq .
# Check task status
curl -s http://connect:8083/connectors/postgres-source/tasks/0/status | jq .
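The status endpoint returns the connector state plus one entry per task, so a monitoring script mostly needs to pick out the failed tasks. The sketch below does that on an already-parsed response. The sample payload is invented for illustration and trimmed to the fields the code reads, and the function names are hypothetical.

```python
def failed_tasks(status):
    """Return the IDs of tasks reported as FAILED in a connector status document."""
    return [task["id"] for task in status.get("tasks", []) if task.get("state") == "FAILED"]

def restart_path(connector_name, task_id):
    """REST path to POST to in order to restart a single task."""
    return f"/connectors/{connector_name}/tasks/{task_id}/restart"

# Hypothetical response from GET /connectors/postgres-source/status
sample_status = {
    "name": "postgres-source",
    "connector": {"state": "RUNNING", "worker_id": "connect-1:8083"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "connect-1:8083"},
        {"id": 1, "state": "FAILED", "worker_id": "connect-2:8083", "trace": "..."},
    ],
}

for task_id in failed_tasks(sample_status):
    print("restart via POST", restart_path(sample_status["name"], task_id))
```

Note that a connector can report `RUNNING` while individual tasks have failed, which is why the task list is checked separately.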
Best Practices
Connector Design
- Use distributed mode for production
- Set appropriate task counts for parallelism
- Implement DLQ for error handling
- Use SMTs for data transformation
- Monitor lag and throughput
Common Pitfalls
- Over-provisioning tasks: a sink connector cannot use more tasks than it has topic partitions, so the extra tasks sit idle
- Missing error handling: No DLQ configured
- Schema mismatches: Incompatible converters
- Resource exhaustion: Insufficient memory/CPU
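The over-provisioning pitfall follows from how sink tasks consume: they form a consumer group, and each partition is assigned to at most one task. A rough sketch of that arithmetic is shown here. The partition counts are hypothetical, and the function name is an illustration rather than part of any Connect API.

```python
def sink_task_usage(tasks_max, partition_counts):
    """Return (busy, idle) task counts for a sink connector.

    partition_counts maps each subscribed topic to its number of partitions.
    """
    total_partitions = sum(partition_counts.values())
    busy = min(tasks_max, total_partitions)
    return busy, tasks_max - busy

# tasks.max=5 reading two topics with two partitions each (assumed counts)
busy, idle = sink_task_usage(5, {"orders": 2, "payments": 2})
print(f"busy={busy}, idle={idle}")
```

If the idle count is above zero, either lower `tasks.max` or add partitions to the source topics.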
Summary
Kafka Connect provides a scalable, resilient, and extensible framework for data integration. Master connectors, transforms, converters, and dead letter queues to build robust streaming data pipelines.