Kafka Security

Figure: Kafka Security Layers. Authentication (SASL / mTLS): who are you? Authorization (ACLs): what can you do? Encryption (SSL/TLS): data protection. Audit (logging): compliance.

Overview

Kafka secures an event streaming platform along four dimensions: authentication, authorization, encryption, and auditing. This guide covers how to implement each layer.

Security Requirements

  • Confidentiality: Data encrypted in transit and at rest
  • Integrity: Messages cannot be tampered with
  • Authentication: Verify identity of clients and brokers
  • Authorization: Control access to topics and operations
  • Auditability: Track all access and operations

SSL/TLS Encryption

Certificate Generation

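# Generate CA certificate
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 \
  -subj "/CN=Kafka-CA" -passout pass:ca-password

# Generate broker keystore (keytool prompts for the keystore and key passwords)
keytool -keystore kafka.server.keystore.jks \
  -alias kafka-broker -validity 365 -genkey \
  -keyalg RSA -keysize 2048 \
  -dname "CN=kafka-broker,OU=IT,O=Company"

# Create a signing request and sign it with the CA,
# so that peers trusting only the CA accept the broker certificate
keytool -keystore kafka.server.keystore.jks \
  -alias kafka-broker -certreq -file broker-csr
openssl x509 -req -CA ca-cert -CAkey ca-key -in broker-csr \
  -out broker-cert-signed -days 365 -CAcreateserial -passin pass:ca-password

# Import the CA certificate, then the signed broker certificate, into the keystore
keytool -keystore kafka.server.keystore.jks \
  -alias ca-cert -importcert -file ca-cert
keytool -keystore kafka.server.keystore.jks \
  -alias kafka-broker -importcert -file broker-cert-signed

# Generate broker truststore
keytool -keystore kafka.server.truststore.jks \
  -alias ca-cert -importcert -file ca-cert \
  -storepass truststore-password -noprompt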

Broker SSL Configuration

# server.properties
listeners=SSL://0.0.0.0:9093
ssl.keystore.location=/var/kafka/ssl/kafka.server.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password
ssl.truststore.location=/var/kafka/ssl/kafka.server.truststore.jks
ssl.truststore.password=truststore-password
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.3
# The ECDHE suite applies to TLS 1.2 only; TLS 1.3 needs its own suite name
ssl.cipher.suites=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_AES_256_GCM_SHA384

Client SSL Configuration

# client.properties
ssl.truststore.location=/var/kafka/ssl/kafka.client.truststore.jks
ssl.truststore.password=truststore-password
ssl.keystore.location=/var/kafka/ssl/kafka.client.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password
security.protocol=SSL
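
The client.properties above is for Java clients, which read JKS stores. Python clients such as kafka-python read PEM files instead. The following is a minimal sketch of the equivalent mutual-TLS settings, assuming the client certificate and key have already been exported to PEM; the function name and the client-cert.pem / client-key.pem paths are hypothetical.

```python
def ssl_client_config(bootstrap_servers, cafile, certfile, keyfile, key_password=None):
    """Build keyword arguments for kafka-python's KafkaProducer / KafkaConsumer (mutual TLS)."""
    config = {
        "bootstrap_servers": bootstrap_servers,
        "security_protocol": "SSL",
        "ssl_cafile": cafile,      # CA certificate that signed the broker certificate
        "ssl_certfile": certfile,  # client certificate, needed when ssl.client.auth=required
        "ssl_keyfile": keyfile,    # client private key
        "ssl_check_hostname": True,
    }
    if key_password is not None:
        config["ssl_password"] = key_password  # only for an encrypted private key
    return config


# Hypothetical PEM paths, for illustration only
config = ssl_client_config(
    ["kafka-broker:9093"],
    "/var/kafka/ssl/ca-cert",
    "/var/kafka/ssl/client-cert.pem",
    "/var/kafka/ssl/client-key.pem",
)
```

The resulting dictionary can be passed as KafkaProducer(**config) or KafkaConsumer('orders', **config).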

SASL Authentication

SASL Mechanisms Comparison

Mechanism          Description                Use Case
PLAIN              Username/password          Development, simple setups (only over TLS)
SCRAM-SHA-256/512  Salted challenge response  Production without Kerberos; credentials are stored in Kafka
GSSAPI             Kerberos                   Enterprise environments
OAUTHBEARER        OAuth 2.0 tokens           Cloud-native applications
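
For the username/password mechanisms in the table, a Java client selects the mechanism through three properties. The sketch below builds them in Python for illustration; the helper name is hypothetical, and GSSAPI and OAUTHBEARER are left out because they take different JAAS options.

```python
# JAAS login module classes shipped with the Kafka clients library
LOGIN_MODULES = {
    "PLAIN": "org.apache.kafka.common.security.plain.PlainLoginModule",
    "SCRAM-SHA-256": "org.apache.kafka.common.security.scram.ScramLoginModule",
    "SCRAM-SHA-512": "org.apache.kafka.common.security.scram.ScramLoginModule",
}


def client_sasl_properties(mechanism, username, password):
    """Return Java client properties for a username/password SASL mechanism over TLS."""
    module = LOGIN_MODULES[mechanism]  # KeyError for mechanisms not covered here
    # Note: quotes inside the username or password are not escaped in this sketch
    jaas = f'{module} required username="{username}" password="{password}";'
    return {
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": mechanism,
        "sasl.jaas.config": jaas,
    }


props = client_sasl_properties("SCRAM-SHA-256", "app-user", "app-secret")
for name, value in props.items():
    print(f"{name}={value}")
```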

SASL/SCRAM Setup

# Create SCRAM credentials
kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter \
  --add-config 'SCRAM-SHA-256=[iterations=8192,password=admin-secret],SCRAM-SHA-512=[iterations=8192,password=admin-secret]' \
  --entity-type users --entity-name admin

kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter \
  --add-config 'SCRAM-SHA-256=[iterations=8192,password=app-secret]' \
  --entity-type users --entity-name app-user
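
The iterations value in the commands above is the PBKDF2 work factor that SCRAM applies to the password before anything is stored. The sketch below follows the key derivation in RFC 5802 using only the Python standard library. It is an illustration of the scheme, not Kafka's own code, and it skips the SASLprep normalization step, so it assumes an ASCII password.

```python
import hashlib
import hmac
import os


def scram_credential(password, salt, iterations=8192, hash_name="sha256"):
    """Derive the values a SCRAM server stores instead of the password (RFC 5802)."""
    # SaltedPassword = PBKDF2-HMAC(password, salt, iterations)
    salted = hashlib.pbkdf2_hmac(hash_name, password.encode("utf-8"), salt, iterations)
    # ClientKey and ServerKey are HMACs of fixed strings keyed by SaltedPassword
    client_key = hmac.new(salted, b"Client Key", hash_name).digest()
    server_key = hmac.new(salted, b"Server Key", hash_name).digest()
    # Only the hash of ClientKey is stored, so a leaked record cannot be replayed directly
    stored_key = hashlib.new(hash_name, client_key).digest()
    return {
        "salt": salt,
        "iterations": iterations,
        "stored_key": stored_key,
        "server_key": server_key,
    }


credential = scram_credential("app-secret", os.urandom(16))
print(credential["stored_key"].hex())
```

A higher iteration count makes each password guess more expensive for an attacker who obtains the stored values, at the cost of slower logins.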

SASL Configuration

# server.properties
listeners=SASL_SSL://0.0.0.0:9093
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256,SCRAM-SHA-512

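# JAAS configuration (on the broker, the property is prefixed with the listener name and mechanism)
listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="admin" \
  password="admin-secret";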

Python Client with SASL

from kafka import KafkaProducer
from kafka import KafkaConsumer

# Producer with SASL
producer = KafkaProducer(
    bootstrap_servers=['kafka-broker:9093'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-256',
    sasl_plain_username='app-user',
    sasl_plain_password='app-secret',
    ssl_cafile='/var/kafka/ssl/ca-cert'
)

# Consumer with SASL
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['kafka-broker:9093'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-256',
    sasl_plain_username='consumer-user',
    sasl_plain_password='consumer-secret',
    ssl_cafile='/var/kafka/ssl/ca-cert',
    group_id='order-processor'
)

ACL Authorization

ACL Structure

ACLs follow the pattern: Principal + Resource + Operation + Permission

# Grant read access to topic for specific user
kafka-acls.sh --bootstrap-server localhost:9093 \
  --add \
  --allow-principal User:app-user \
  --operation Read \
  --topic orders \
  --command-config admin.properties

# Grant write access to topic
kafka-acls.sh --bootstrap-server localhost:9093 \
  --add \
  --allow-principal User:app-user \
  --operation Write \
  --topic orders \
  --command-config admin.properties

# List ACLs for a topic
kafka-acls.sh --bootstrap-server localhost:9093 \
  --list \
  --topic orders \
  --command-config admin.properties

Common ACL Patterns

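# Producer ACLs (producers need Write, not Read)
kafka-acls.sh --bootstrap-server localhost:9093 \
  --command-config admin.properties \
  --add --allow-principal User:producer \
  --operation Write --operation Describe --topic orders

# Consumer ACLs (Read on both the topic and the consumer group)
kafka-acls.sh --bootstrap-server localhost:9093 \
  --command-config admin.properties \
  --add --allow-principal User:consumer \
  --operation Read --topic orders --group order-processor

# Admin ACLs
kafka-acls.sh --bootstrap-server localhost:9093 \
  --command-config admin.properties \
  --add --allow-principal User:admin \
  --operation All --topic '*' --group '*'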

ACL Authorizer Configuration

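# server.properties
# AclAuthorizer is for ZooKeeper-based clusters; KRaft clusters use
# org.apache.kafka.metadata.authorizer.StandardAuthorizer instead
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:admin
allow.everyone.if.no.acl.found=false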
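
To make the Principal + Resource + Operation + Permission pattern and the settings above concrete, here is a deliberately simplified model of how an authorization decision can be reached. It is a sketch, not Kafka's authorizer: it ignores host restrictions, prefixed resource patterns, and implied operations such as Describe, and every name in it is hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Acl:
    principal: str      # e.g. "User:app-user"
    resource_type: str  # "Topic" or "Group"
    resource_name: str  # literal name, or "*" for every resource of that type
    operation: str      # e.g. "Read", "Write", "All"
    permission: str     # "Allow" or "Deny"


def is_authorized(acls, principal, resource_type, resource_name, operation,
                  super_users=(), allow_if_no_acl=False):
    """Simplified decision: super users pass, Deny beats Allow, no ACL means the default."""
    if principal in super_users:
        return True
    on_resource = [a for a in acls
                   if a.resource_type == resource_type
                   and a.resource_name in (resource_name, "*")]
    if not on_resource:
        # Mirrors allow.everyone.if.no.acl.found
        return allow_if_no_acl
    matching = [a for a in on_resource
                if a.principal in (principal, "User:*")
                and a.operation in (operation, "All")]
    if any(a.permission == "Deny" for a in matching):
        return False
    return any(a.permission == "Allow" for a in matching)


acls = [
    Acl("User:app-user", "Topic", "orders", "Read", "Allow"),
    Acl("User:app-user", "Topic", "orders", "Write", "Deny"),
]
print(is_authorized(acls, "User:app-user", "Topic", "orders", "Read"))
print(is_authorized(acls, "User:app-user", "Topic", "orders", "Write"))
```

With allow_if_no_acl left at False, a topic that has no ACLs at all is closed to everyone except super users, which is why that setting is the safer default.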

Encryption at Rest

Broker Configuration

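# Kafka has no built-in encryption at rest: place the log directories
# on a volume that is encrypted by the operating system or storage backend
log.dirs=/encrypted/kafka-logs
# Compression reduces size only; it does not protect the data
compression.type=lz4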

Client-Side Encryption

from cryptography.fernet import Fernet
from kafka import KafkaProducer
import json

class EncryptedProducer:
    def __init__(self, bootstrap_servers, encryption_key):
        # No value_serializer here: send() already produces encrypted bytes,
        # and passing bytes through json.dumps would raise a TypeError
        self.producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
        self.cipher = Fernet(encryption_key)
    
    def send(self, topic, message, key=None):
        # Serialize to JSON, then encrypt; the broker only ever sees ciphertext
        encrypted_value = self.cipher.encrypt(json.dumps(message).encode('utf-8'))
        return self.producer.send(topic, key=key, value=encrypted_value)
    
    def flush(self):
        self.producer.flush()

# Usage
# In practice, load the key from a secret store so that consumers can use the same key
key = Fernet.generate_key()
producer = EncryptedProducer(['kafka:9092'], key)
producer.send('sensitive-orders', {'amount': 100, 'user': 'john'})
producer.flush()
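
The consuming side has to reverse the two steps performed by the producer: decrypt first, then parse the JSON. A minimal sketch follows; the function name is hypothetical, and cipher is assumed to be a Fernet instance created from the same key the producer used.

```python
import json


def decrypt_message(cipher, payload):
    """Decrypt a token read from Kafka and parse the JSON document inside it."""
    # With a Fernet cipher, decrypt() raises InvalidToken if the payload
    # was tampered with or was encrypted under a different key
    plaintext = cipher.decrypt(payload)
    return json.loads(plaintext.decode("utf-8"))
```

With a kafka-python consumer created without a value_deserializer, each record can be handled as decrypt_message(cipher, record.value).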

Security Best Practices

1. Network Security

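# Bind to a specific interface instead of 0.0.0.0
listeners=SSL://10.0.1.10:9093

# Restrict listener names
# (when named listeners are used, every name in `listeners` must appear in this map)
listener.security.protocol.map=INTERNAL:SSL,EXTERNAL:SASL_SSL
inter.broker.listener.name=INTERNAL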

2. Credential Management

# Use environment variables for credentials
import os
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=os.environ['KAFKA_BOOTSTRAP_SERVERS'],
    # Without these two settings the client connects as PLAINTEXT and ignores the SASL credentials
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-256',
    sasl_plain_username=os.environ['KAFKA_SASL_USERNAME'],
    sasl_plain_password=os.environ['KAFKA_SASL_PASSWORD'],
    ssl_cafile=os.environ['KAFKA_SSL_CAFILE']
)
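
Reading os.environ directly fails with a bare KeyError on the first missing variable. One possible refinement, sketched below, collects all missing settings before the client is created. The helper name and the optional KAFKA_SASL_MECHANISM variable are assumptions for illustration; the other variable names are the ones used above.

```python
import os

REQUIRED_VARS = (
    "KAFKA_BOOTSTRAP_SERVERS",
    "KAFKA_SASL_USERNAME",
    "KAFKA_SASL_PASSWORD",
    "KAFKA_SSL_CAFILE",
)


def kafka_config_from_env(env=None):
    """Build kafka-python keyword arguments from environment variables."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        # Report every missing setting at once, without echoing any secret values
        raise RuntimeError("Missing Kafka settings: " + ", ".join(missing))
    return {
        "bootstrap_servers": env["KAFKA_BOOTSTRAP_SERVERS"].split(","),
        "security_protocol": "SASL_SSL",
        # KAFKA_SASL_MECHANISM is a hypothetical optional override
        "sasl_mechanism": env.get("KAFKA_SASL_MECHANISM", "SCRAM-SHA-256"),
        "sasl_plain_username": env["KAFKA_SASL_USERNAME"],
        "sasl_plain_password": env["KAFKA_SASL_PASSWORD"],
        "ssl_cafile": env["KAFKA_SSL_CAFILE"],
    }
```

The result can be passed to the client as KafkaProducer(**kafka_config_from_env()).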

3. Regular Credential Rotation

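#!/bin/bash
# rotate_scram_credentials.sh
set -euo pipefail
umask 077  # keep the new .env file readable only by its owner

# Hex output avoids characters such as '=' that may confuse the --add-config parser
NEW_PASSWORD=$(openssl rand -hex 32)

kafka-configs.sh --bootstrap-server localhost:9093 \
  --command-config admin.properties \
  --alter \
  --add-config "SCRAM-SHA-256=[password=$NEW_PASSWORD]" \
  --entity-type users --entity-name app-user

# Update application configuration
echo "SCRAM_PASSWORD=$NEW_PASSWORD" > /etc/kafka/.env.new
mv /etc/kafka/.env.new /etc/kafka/.env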

4. Security Checklist

  • SSL/TLS enabled for all listeners
  • SASL authentication configured
  • ACLs implemented for all topics
  • Super users limited to emergency access only
  • Credentials stored in secure vault
  • Audit logging enabled
  • Regular security assessments scheduled
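
Some of the checklist items can be verified mechanically against server.properties. The sketch below covers only four of them and is not a complete audit; the function names are hypothetical, the parser does not handle backslash line continuations, and the SASL check assumes a cluster that does not rely on mTLS alone for authentication.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def audit_broker_config(props):
    """Return a list of findings; an empty list means these checks passed."""
    findings = []
    # Resolve custom listener names (e.g. INTERNAL) to their security protocol
    protocol_map = {}
    for entry in props.get("listener.security.protocol.map", "").split(","):
        if ":" in entry:
            name, protocol = entry.split(":", 1)
            protocol_map[name.strip()] = protocol.strip()
    for listener in props.get("listeners", "").split(","):
        name = listener.split("://", 1)[0].strip()
        if not name:
            continue
        protocol = protocol_map.get(name, name)
        if protocol not in ("SSL", "SASL_SSL"):
            findings.append(f"listener {name} uses {protocol}, which is not TLS-protected")
    if not props.get("sasl.enabled.mechanisms"):
        findings.append("no SASL mechanism enabled")
    if not props.get("authorizer.class.name"):
        findings.append("no authorizer configured, so ACLs are not enforced")
    if props.get("allow.everyone.if.no.acl.found", "false").lower() == "true":
        findings.append("allow.everyone.if.no.acl.found=true opens resources that have no ACLs")
    return findings


sample = """
listeners=INTERNAL://10.0.1.10:9093,EXTERNAL://10.0.1.10:9094
listener.security.protocol.map=INTERNAL:SSL,EXTERNAL:SASL_SSL
sasl.enabled.mechanisms=SCRAM-SHA-256
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=false
"""
for finding in audit_broker_config(parse_properties(sample)):
    print("WARNING:", finding)
```

The sample configuration passes all four checks, so the loop prints nothing; changing a listener to PLAINTEXT or removing the authorizer line produces a warning.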

Summary

Kafka security requires a defense-in-depth approach implementing SSL/TLS encryption, SASL authentication, ACL authorization, and comprehensive auditing. Follow security best practices and regularly audit your configuration to maintain a secure event streaming platform.
