🎉 75% of content is free forever — Unlock Premium from $10/mo →
CW
Search courses…
💼 Services ℹ️ About ✉️ Contact View Pricing Plans from $10

Kafka Testing Strategies

🟒 Free Lesson

Advertisement

Kafka Testing Strategies

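Testing pyramid (from base to top; lower layers have more tests and give faster feedback):

- Unit tests: fast, isolated; many tests (fast ⚡)
- Integration tests: Testcontainers, embedded Kafka; a moderate number of tests (medium speed)
- Contract tests: schema validation; few tests (slow)
- End-to-end (E2E) tests: slow, comprehensive; very few tests (very slow)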

Overview

Effective Kafka testing requires a multi-layered approach covering unit tests, integration tests, contract tests, and end-to-end tests. This guide covers testing strategies using Testcontainers, embedded Kafka, and schema validation.

Testing Goals

  • Isolation: Tests run independently
  • Speed: Fast feedback loops
  • Reliability: Consistent results
  • Coverage: Comprehensive scenarios

Unit Testing

Mocking Kafka Producer

from unittest.mock import Mock, patch, MagicMock
import pytest

class TestOrderService:
    """Unit tests for OrderService publishing, with KafkaProducer mocked out."""

    # NOTE(review): patch() replaces a name where it is *looked up*. Patching
    # 'kafka.KafkaProducer' only takes effect if OrderService resolves the class
    # through the `kafka` module at call time; if its module does
    # `from kafka import KafkaProducer`, patch '<that_module>.KafkaProducer'
    # instead. Confirm against OrderService's imports.
    @patch('kafka.KafkaProducer')
    def test_publish_order(self, mock_producer_class):
        """Expect publish_order to send the JSON-encoded order keyed by id, then flush."""
        # json is not among this snippet's imports but is needed to build the
        # expected payload below (previously a NameError).
        import json

        # Arrange
        mock_producer = MagicMock()
        mock_producer_class.return_value = mock_producer

        service = OrderService()
        order = Order(id='123', amount=100.00)

        # Act
        service.publish_order(order)

        # Assert
        mock_producer.send.assert_called_once_with(
            'orders',
            key=b'123',
            value=json.dumps(order.to_dict()).encode()
        )
        mock_producer.flush.assert_called_once()

Mocking Kafka Consumer

from unittest.mock import Mock, patch, MagicMock
from kafka import ConsumerRecord

class TestOrderProcessor:
    """Unit tests for OrderProcessor, with KafkaConsumer mocked out.

    NOTE(review): kafka-python defines ConsumerRecord in
    kafka.consumer.fetcher; confirm the module-level
    `from kafka import ConsumerRecord` resolves in the installed version.
    """

    # NOTE(review): this only intercepts the consumer if OrderProcessor looks
    # up KafkaConsumer via the `kafka` module; otherwise patch
    # '<processor_module>.KafkaConsumer'.
    @patch('kafka.KafkaConsumer')
    def test_process_order(self, mock_consumer_class):
        """Expect process_messages to handle one record and commit exactly once."""
        # Arrange
        mock_consumer = MagicMock()
        mock_consumer_class.return_value = mock_consumer

        key = b'order-123'
        value = b'{"id": "123", "amount": 100.00}'

        # Sizes are derived from the payloads so they cannot drift out of sync
        # (the previous hard-coded 34 did not match this 31-byte value).
        # -1 for the header size is presumably what kafka-python reports for a
        # record without headers; confirm for your version.
        record_fields = {
            'topic': 'orders',
            'partition': 0,
            'offset': 12345,
            'timestamp': 1640995200000,
            'timestamp_type': 0,
            'key': key,
            'value': value,
            'headers': [],
            'checksum': None,
            'serialized_key_size': len(key),
            'serialized_value_size': len(value),
            'serialized_header_size': -1,
        }
        # ConsumerRecord's field list may differ between kafka-python releases
        # (e.g. serialized_header_size, leader_epoch), and a missing field is a
        # TypeError. Build the record from its own _fields instead; fields not
        # listed above become None and unknown ones are dropped.
        message = ConsumerRecord(
            **{name: record_fields.get(name) for name in ConsumerRecord._fields}
        )

        # Iterating the mocked consumer yields exactly this one record.
        mock_consumer.__iter__.return_value = iter([message])

        # Act
        processor = OrderProcessor()
        processor.process_messages()

        # Assert
        mock_consumer.commit.assert_called_once()

Testing Serialization

import pytest
from kafka_serialization import AvroSerializer, JsonSerializer

class TestSerializers:
    """Checks for the JSON and Avro serializers from kafka_serialization."""

    def test_json_serializer(self):
        """Expect the JSON serializer's output to decode back to the input dict."""
        # json is not among this snippet's imports but is needed to decode the
        # serialized result (previously a NameError).
        import json

        serializer = JsonSerializer()
        data = {'id': '123', 'amount': 100.00}

        result = serializer.serialize(data)

        # json.loads accepts str as well as bytes on Python 3.6+.
        assert json.loads(result) == data

    def test_avro_serializer(self):
        """Expect the Avro serializer to produce a non-empty bytes payload."""
        # Assumes order.avsc is resolvable from the test's working directory.
        serializer = AvroSerializer(schema_path='order.avsc')
        data = {'id': '123', 'amount': 100.00}

        result = serializer.serialize(data)

        assert isinstance(result, bytes)
        assert len(result) > 0

Integration Testing

Testcontainers Setup

import pytest
from testcontainers.kafka import KafkaContainer
from kafka import KafkaProducer, KafkaConsumer

@pytest.fixture(scope='module')
def kafka_container():
    """Run a single Kafka broker container shared by every test in the module.

    The fixture stays inside the ``with`` block until the module's last test
    has finished. Testcontainers tears the container down on context exit.
    """
    image = 'confluentinc/cp-kafka:7.4.0'
    with KafkaContainer(image) as running_kafka:
        yield running_kafka

@pytest.fixture
def producer(kafka_container):
    """KafkaProducer bound to the test container that JSON-encodes values as UTF-8.

    The producer is closed after the requesting test finishes.
    """
    # json is not among this snippet's imports; the value serializer below
    # needs it (previously a NameError on the first send).
    import json

    producer = KafkaProducer(
        bootstrap_servers=kafka_container.get_bootstrap_server(),
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    yield producer
    producer.close()

@pytest.fixture
def consumer(kafka_container):
    """KafkaConsumer subscribed to 'test-topic' on the test container.

    It joins group 'test-group' and starts from the earliest offset when the
    group has no committed position. No value deserializer is set, so record
    values arrive as raw bytes. Closed after the requesting test finishes.
    """
    settings = {
        'bootstrap_servers': kafka_container.get_bootstrap_server(),
        'auto_offset_reset': 'earliest',
        'group_id': 'test-group',
    }
    kafka_consumer = KafkaConsumer('test-topic', **settings)
    yield kafka_consumer
    kafka_consumer.close()

def test_produce_consume(producer, consumer):
    """A JSON message produced to 'test-topic' is read back unchanged.

    Uses poll() with a deadline instead of next(consumer). kafka-python's
    consumer iterator has no timeout by default, so if the message never
    arrived the test would hang rather than fail.
    """
    # json/time are not among this snippet's imports.
    import json
    import time

    # Produce
    message = {'id': '123', 'amount': 100.00}
    producer.send('test-topic', value=message)
    producer.flush()

    # Consume, waiting at most 30 seconds for the first record.
    consumed = None
    deadline = time.monotonic() + 30
    while consumed is None and time.monotonic() < deadline:
        # poll() returns {TopicPartition: [ConsumerRecord, ...]}
        batches = consumer.poll(timeout_ms=1000, max_records=1)
        for records in batches.values():
            if records:
                consumed = records[0]
                break

    assert consumed is not None, "no message received on 'test-topic' within 30s"
    # The consumer fixture sets no value_deserializer, so value is raw bytes;
    # json.loads accepts bytes directly.
    assert json.loads(consumed.value) == message

Embedded Kafka

// Java example with Spring Boot
@SpringBootTest
@EmbeddedKafka(
    partitions = 3,
    topics = {"test-topic"},
    count = 1
)
public class KafkaIntegrationTest {
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;
    
    @Test
    public void testPublishAndConsume() throws Exception {
        // Publish
        kafkaTemplate.send("test-topic", "key", "message").get();
        
        // Consume
        ConsumerRecord<String, String> record = 
            KafkaTestUtils.getSingleRecord(consumer, "test-topic");
        
        assertThat(record.value()).isEqualTo("message");
    }
}

Spring Boot Test Configuration

# application-test.yml
# Test profile for the @EmbeddedKafka integration tests.
spring:
  kafka:
    # @EmbeddedKafka exposes the broker list as spring.embedded.kafka.brokers;
    # the previous ${embedded.kafka.brokers} is not set by it.
    bootstrap-servers: ${spring.embedded.kafka.brokers}
    consumer:
      group-id: test-group
      auto-offset-reset: earliest
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

Contract Testing

Schema Validation

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer, AvroConsumer

class TestSchemaContract:
    """Contract tests for the Order Avro schema and its evolution."""

    def setup_method(self):
        """Before each test: load order.avsc and build an AvroProducer."""
        self.schema = avro.load('order.avsc')
        producer_config = {
            'bootstrap.servers': 'kafka:9092',
            'schema.registry.url': 'http://schema-registry:8081'
        }
        # NOTE(review): no test below uses self.producer. confluent_kafka.avro
        # is also deprecated upstream in favour of SerializingProducer plus
        # AvroSerializer; confirm before relying on it.
        self.producer = AvroProducer(producer_config)

    def test_order_schema_compatibility(self):
        """Schema v2 must stay backward compatible with schema v1."""
        old_schema = avro.load('order_v1.avsc')
        new_schema = avro.load('order_v2.avsc')

        # is_compatible is a helper not defined in this snippet.
        assert is_compatible(old_schema, new_schema)

    def test_producer_schema_compliance(self):
        """A representative order record must conform to order.avsc."""
        sample_order = {
            'id': '123',
            'amount': 100.00,
            'status': 'pending'
        }

        # validate_schema is a helper not defined in this snippet; presumably
        # it raises when the record does not match. Confirm.
        validate_schema(sample_order, self.schema)

Schema Registry Compatibility

# Check schema compatibility against the latest registered version of the subject.
# Schema Registry serves this at POST /compatibility/subjects/{subject}/versions/{version};
# the response body is {"is_compatible": true|false}.
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"}' \
  http://schema-registry:8081/compatibility/subjects/orders-value/versions/latest

# Register new schema version ("..." is a placeholder for the escaped schema JSON)
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "..."}' \
  http://schema-registry:8081/subjects/orders-value/versions

Consumer Group Testing

def test_consumer_group_rebalance():
    """Partitions are spread over a 3-member group and reassigned when one leaves.

    Assumes 'test-topic' exists on kafka:9092 with at least 3 partitions.
    With fewer, some members legitimately get no partitions and the first
    check fails.
    """
    import threading
    import time

    num_members = 3
    timeout_s = 30.0
    # member index -> latest assignment snapshot taken by that member's thread
    assignments = {}
    stop_events = [threading.Event() for _ in range(num_members)]

    def run_member(index):
        # A KafkaConsumer only joins the group and receives partitions while
        # poll() is being called; the previous version slept without polling,
        # so assignment() stayed empty. Consumers are not thread-safe, so each
        # member creates, polls and closes its own consumer on its own thread.
        consumer = KafkaConsumer(
            'test-topic',
            bootstrap_servers=['kafka:9092'],
            group_id='test-group'
        )
        try:
            while not stop_events[index].is_set():
                consumer.poll(timeout_ms=100)
                assignments[index] = frozenset(consumer.assignment())
        finally:
            assignments.pop(index, None)
            consumer.close()

    def wait_until(condition):
        # Poll a condition until it holds or timeout_s elapses.
        deadline = time.monotonic() + timeout_s
        while time.monotonic() < deadline:
            if condition():
                return True
            time.sleep(0.2)
        return False

    def union(members):
        return frozenset().union(
            *(assignments.get(i, frozenset()) for i in members)
        )

    threads = [
        threading.Thread(target=run_member, args=(i,), daemon=True)
        for i in range(num_members)
    ]
    for thread in threads:
        thread.start()
    try:
        # Verify partition assignment: every member owns at least one partition.
        all_members = range(num_members)
        assert wait_until(lambda: all(assignments.get(i) for i in all_members)), \
            'not every group member received a partition'
        all_partitions = union(all_members)

        # Test rebalance on consumer shutdown
        stop_events[0].set()
        threads[0].join(timeout_s)

        # Verify the remaining consumers took over every partition between them.
        survivors = range(1, num_members)
        assert wait_until(
            lambda: all(assignments.get(i) for i in survivors)
            and union(survivors) == all_partitions
        ), 'remaining members did not take over all partitions'
    finally:
        for event in stop_events:
            event.set()
        for thread in threads:
            thread.join(timeout_s)

Load Testing

kafka-producer-perf-test

#!/bin/bash
# load_test.sh
# Producer-then-consumer throughput test using Kafka's bundled perf tools.

# Fail fast. pipefail matters because each tool is piped into tee; without it
# the pipeline's status is tee's and a failing perf tool would go unnoticed.
set -euo pipefail

TOPIC="orders"
NUM_MESSAGES=1000000
MESSAGE_SIZE=1024

echo "Starting load test..."

# Run producer performance test (--throughput -1 disables throttling)
kafka-producer-perf-test.sh \
  --topic "$TOPIC" \
  --num-records "$NUM_MESSAGES" \
  --record-size "$MESSAGE_SIZE" \
  --throughput -1 \
  --producer-props \
    bootstrap.servers=kafka:9092 \
    batch.size=32768 \
    linger.ms=5 \
    compression.type=lz4 \
    acks=1 \
  2>&1 | tee producer_results.txt

# Run consumer performance test
kafka-consumer-perf-test.sh \
  --topic "$TOPIC" \
  --messages "$NUM_MESSAGES" \
  --group perf-test \
  --bootstrap-server kafka:9092 \
  2>&1 | tee consumer_results.txt

echo "Load test complete. Results saved."

Python Load Testing

import asyncio
import time
from kafka import KafkaProducer, KafkaConsumer
import statistics

class KafkaLoadTest:
    """Producer/consumer throughput and latency probe for a single topic.

    The benchmark methods are declared ``async`` but use the blocking
    kafka-python client, so they do not yield to the event loop while running.
    """

    def __init__(self, bootstrap_servers, topic):
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.results = []

    @staticmethod
    def _latency_recorder(latencies, send_start):
        """Return a send-success callback that appends the time since *send_start*.

        Binding ``send_start`` per send fixes a late-binding bug. The previous
        inline lambda read the loop variable only when the callback fired
        (typically during flush), so every latency was measured from the
        *last* send instead of its own.
        """
        def record(_metadata):
            # May run on the producer's background I/O thread; list.append is
            # atomic under CPython's GIL.
            latencies.append(time.perf_counter() - send_start)
        return record

    @staticmethod
    def _latency_summary(latencies):
        """Return ``(p50, p99)`` of *latencies*, or ``(None, None)`` if empty.

        An empty list (no message sent or none acknowledged) previously raised
        StatisticsError / IndexError.
        """
        if not latencies:
            return None, None
        ordered = sorted(latencies)
        return statistics.median(ordered), ordered[int(len(ordered) * 0.99)]

    async def producer_load_test(self, num_messages, message_size):
        """Send *num_messages* payloads of *message_size* bytes and time them.

        Returns a dict with:
          - 'throughput': messages/second over the whole run, including the
            final flush;
          - 'latency_p50' / 'latency_p99': seconds from send() to the success
            callback, or None when no send was acknowledged.
        """
        producer = KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            batch_size=32768,
            linger_ms=5,
            compression_type='lz4'
        )
        # The payload is identical for every message; build it once.
        message = b'x' * message_size
        latencies = []
        try:
            start_time = time.perf_counter()
            for _ in range(num_messages):
                send_start = time.perf_counter()
                future = producer.send(self.topic, value=message)
                future.add_callback(self._latency_recorder(latencies, send_start))
            # flush() waits for buffered records to be acknowledged; kafka-python
            # appears to run success callbacks before it returns (confirm for
            # your version).
            producer.flush()
            end_time = time.perf_counter()
        finally:
            # Previously the producer was never closed.
            producer.close()

        latency_p50, latency_p99 = self._latency_summary(latencies)
        return {
            'throughput': num_messages / (end_time - start_time),
            'latency_p50': latency_p50,
            'latency_p99': latency_p99
        }

    async def consumer_load_test(self, duration_seconds):
        """Consume from the topic for *duration_seconds* and report the rate.

        Returns ``{'messages_per_second': consumed / duration_seconds}``.
        """
        consumer = KafkaConsumer(
            self.topic,
            bootstrap_servers=self.bootstrap_servers,
            group_id='load-test-group',
            auto_offset_reset='earliest'
        )

        messages_consumed = 0
        try:
            start_time = time.perf_counter()
            while time.perf_counter() - start_time < duration_seconds:
                # poll() returns {TopicPartition: [records]}
                batches = consumer.poll(timeout_ms=100)
                messages_consumed += sum(len(batch) for batch in batches.values())
        finally:
            # Close even if polling raises so the group member is released.
            consumer.close()

        return {
            'messages_per_second': messages_consumed / duration_seconds
        }

# Run load test
async def main():
    """Run the producer benchmark, then a 60-second consumer benchmark, and print both."""
    tester = KafkaLoadTest('kafka:9092', 'orders')

    # Producer first so the consumer run has data to read.
    produce_stats = await tester.producer_load_test(num_messages=100000, message_size=1024)
    consume_stats = await tester.consumer_load_test(duration_seconds=60)

    print(f"Producer: {produce_stats}")
    print(f"Consumer: {consume_stats}")

asyncio.run(main())

Best Practices

Test Data Management

@pytest.fixture
def test_data():
    """Sample orders, one per status value ('pending', 'completed', 'cancelled').

    Function-scoped (pytest's default), so each test gets freshly built dicts
    and may mutate them freely.
    """
    sample_orders = [
        {'id': '1', 'amount': 100.00, 'status': 'pending'},
        {'id': '2', 'amount': 250.00, 'status': 'completed'},
        {'id': '3', 'amount': 75.00, 'status': 'cancelled'},
    ]
    return {'orders': sample_orders}

def test_order_processing(test_data):
    """Every sample order can be processed and still carries a recognised status."""
    valid_statuses = ['pending', 'completed', 'cancelled']
    for order in test_data['orders']:
        process_order(order)
        # NOTE(review): this inspects the input dict, not a result of
        # process_order, so it only catches process_order writing an unknown
        # status back onto the order. Consider asserting on its real outcome.
        assert order['status'] in valid_statuses

Test Isolation

@pytest.fixture(autouse=True)
def setup_teardown():
    """Automatically wrap every test with a 'test-' topic cleanup on both sides.

    Cleaning before the test means it does not see topics left behind by an
    earlier run. Cleaning after means it leaves none behind.
    """
    clean_kafka_topics()   # before the test body
    yield                  # the test runs here
    clean_kafka_topics()   # after the test body

def clean_kafka_topics():
    """Delete every topic on kafka:9092 whose name starts with 'test-'.

    Broker-side deletion may complete asynchronously, so a topic can briefly
    remain "marked for deletion" after this returns.
    """
    admin_client = KafkaAdminClient(bootstrap_servers='kafka:9092')
    try:
        test_topics = [
            topic for topic in admin_client.list_topics()
            if topic.startswith('test-')
        ]
        # One request for all matching topics instead of one per topic; skip
        # the call entirely when there is nothing to delete.
        if test_topics:
            admin_client.delete_topics(test_topics)
    finally:
        # Previously the admin client was never closed, leaking a connection
        # on every call (i.e. twice per test via the autouse fixture).
        admin_client.close()

Summary

Effective Kafka testing requires a comprehensive strategy covering unit tests with mocks, integration tests with Testcontainers, contract tests for schema validation, and load tests for performance validation. Use the testing pyramid to balance speed and coverage.

⭐

Premium Content

Kafka Testing Strategies

Unlock this lesson and 900+ advanced tutorials with a Premium plan.

🎯 End-to-end Projects
💼 Interview Prep
📜 Certificates
🤝 Community Access

Already a member? Log in

Need Expert Kafka Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement