Kafka Testing Strategies
Overview
Effective Kafka testing requires a multi-layered approach covering unit tests, integration tests, contract tests, and end-to-end tests. This guide covers testing strategies using mocks, Testcontainers, embedded Kafka, and schema validation, and also shows how to test consumer-group rebalancing and how to run load tests.
Testing Goals
- Isolation: Tests run independently
- Speed: Fast feedback loops
- Reliability: Consistent results
- Coverage: Comprehensive scenarios
Unit Testing
Mocking Kafka Producer
from unittest.mock import Mock, patch, MagicMock
import pytest
class TestOrderService:
    """Unit tests for ``OrderService`` with the Kafka producer mocked out."""

    # NOTE(review): patch() replaces the attribute on the ``kafka`` module.
    # If OrderService does ``from kafka import KafkaProducer``, the name must
    # be patched in the service's own module instead -- confirm against the
    # OrderService source.
    @patch('kafka.KafkaProducer')
    def test_publish_order(self, mock_producer_class):
        """Publishing an order sends one keyed JSON message and flushes."""
        # The snippet's import block does not import json, so bring it into
        # scope here; otherwise the expected payload below raises NameError.
        import json

        # Arrange
        mock_producer = MagicMock()
        mock_producer_class.return_value = mock_producer
        service = OrderService()
        order = Order(id='123', amount=100.00)

        # Act
        service.publish_order(order)

        # Assert
        expected_payload = json.dumps(order.to_dict()).encode()
        mock_producer.send.assert_called_once_with(
            'orders',
            key=b'123',
            value=expected_payload,
        )
        mock_producer.flush.assert_called_once()
Mocking Kafka Consumer
from unittest.mock import Mock, patch, MagicMock
from kafka import ConsumerRecord
class TestOrderProcessor:
    """Unit tests for ``OrderProcessor`` with the Kafka consumer mocked out."""

    # NOTE(review): kafka-python defines ConsumerRecord in
    # ``kafka.consumer.fetcher``; confirm that the ``from kafka import
    # ConsumerRecord`` import used by this snippet resolves in the installed
    # version.
    # NOTE(review): patching 'kafka.KafkaConsumer' only takes effect if
    # OrderProcessor looks the class up on the ``kafka`` module at call time;
    # otherwise patch the name in the processor's own module.
    @patch('kafka.KafkaConsumer')
    def test_process_order(self, mock_consumer_class):
        """Processing a single record commits exactly once."""
        # Arrange
        mock_consumer = MagicMock()
        mock_consumer_class.return_value = mock_consumer

        # Create mock message. The size fields are derived from the payload
        # so they cannot drift from the actual bytes.
        key = b'order-123'
        value = b'{"id": "123", "amount": 100.00}'
        message = ConsumerRecord(
            topic='orders',
            partition=0,
            offset=12345,
            timestamp=1640995200000,
            timestamp_type=0,
            key=key,
            value=value,
            headers=[],
            checksum=None,
            serialized_key_size=len(key),
            serialized_value_size=len(value),
            # ConsumerRecord is a namedtuple, so every field must be given;
            # -1 is used here for "no headers".
            # NOTE(review): the field set is version dependent -- check
            # ConsumerRecord._fields for the installed kafka-python release.
            serialized_header_size=-1,
        )
        mock_consumer.__iter__ = Mock(return_value=iter([message]))

        # Act
        processor = OrderProcessor()
        processor.process_messages()

        # Assert
        mock_consumer.commit.assert_called_once()
Testing Serialization
import pytest
from kafka_serialization import AvroSerializer, JsonSerializer
class TestSerializers:
    """Round-trip and sanity checks for the project's Kafka serializers."""

    def test_json_serializer(self):
        """JSON output must decode back to the original dictionary."""
        # The snippet's import block does not import json; import it here so
        # the decode step below does not raise NameError.
        import json

        serializer = JsonSerializer()
        data = {'id': '123', 'amount': 100.00}

        result = serializer.serialize(data)

        assert json.loads(result) == data

    def test_avro_serializer(self):
        """Avro output must be a non-empty ``bytes`` object."""
        serializer = AvroSerializer(schema_path='order.avsc')
        data = {'id': '123', 'amount': 100.00}

        result = serializer.serialize(data)

        assert isinstance(result, bytes)
        assert len(result) > 0
Integration Testing
Testcontainers Setup
import pytest
from testcontainers.kafka import KafkaContainer
from kafka import KafkaProducer, KafkaConsumer
@pytest.fixture(scope='module')
def kafka_container():
    """Provide one Kafka Testcontainer shared by all tests in the module.

    The container object is used as a context manager, so the ``with`` block
    is only left once the module-scoped fixture is finalised.
    """
    container = KafkaContainer('confluentinc/cp-kafka:7.4.0')
    with container as kafka:
        yield kafka
@pytest.fixture
def producer(kafka_container):
    """Yield a JSON-serialising ``KafkaProducer`` and close it after the test.

    Args:
        kafka_container: Running container fixture; its
            ``get_bootstrap_server()`` supplies the broker address.
    """
    # The snippet's import block does not import json; the serializer lambda
    # below resolves the name from this enclosing scope.
    import json

    json_producer = KafkaProducer(
        bootstrap_servers=kafka_container.get_bootstrap_server(),
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    )
    yield json_producer
    json_producer.close()
@pytest.fixture
def consumer(kafka_container):
    """Yield a ``KafkaConsumer`` subscribed to ``test-topic`` and close it afterwards.

    Args:
        kafka_container: Running container fixture; its
            ``get_bootstrap_server()`` supplies the broker address.
    """
    topic_consumer = KafkaConsumer(
        'test-topic',
        bootstrap_servers=kafka_container.get_bootstrap_server(),
        auto_offset_reset='earliest',
        group_id='test-group',
        # Without a timeout, iterating the consumer blocks forever when no
        # record arrives, which hangs the test run instead of failing it.
        consumer_timeout_ms=10000,
    )
    yield topic_consumer
    topic_consumer.close()
def test_produce_consume(producer, consumer):
    """A message sent to ``test-topic`` is read back unchanged."""
    # The snippet's import block does not import json; import it here so the
    # decode step below does not raise NameError.
    import json

    # Produce
    message = {'id': '123', 'amount': 100.00}
    producer.send('test-topic', value=message)
    producer.flush()

    # Consume. The default turns "iterator ended without a record" into a
    # readable assertion failure instead of a bare StopIteration error.
    consumed = next(consumer, None)
    assert consumed is not None, 'no message received from test-topic'
    assert json.loads(consumed.value) == message
Embedded Kafka
// Java example with Spring Boot
// (import statements are omitted in this snippet)
@SpringBootTest
@EmbeddedKafka(
    partitions = 3,
    topics = {"test-topic"},
    count = 1
)
public class KafkaIntegrationTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    /**
     * Publishes one record to the embedded broker and verifies that a test
     * consumer reads the same value back.
     */
    @Test
    public void testPublishAndConsume() throws Exception {
        // Build a consumer that points at the embedded broker.
        Map<String, Object> consumerProps =
            KafkaTestUtils.consumerProps("test-group", "true", embeddedKafkaBroker);
        ConsumerFactory<String, String> consumerFactory =
            new DefaultKafkaConsumerFactory<>(
                consumerProps, new StringDeserializer(), new StringDeserializer());

        // try-with-resources closes the consumer even if an assertion fails.
        try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {
            // Subscribe before publishing so the record cannot be missed.
            embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, "test-topic");

            // Publish
            kafkaTemplate.send("test-topic", "key", "message").get();

            // Consume
            ConsumerRecord<String, String> record =
                KafkaTestUtils.getSingleRecord(consumer, "test-topic");

            assertThat(record.value()).isEqualTo("message");
        }
    }
}
Spring Boot Test Configuration
# application-test.yml
spring:
  kafka:
    # @EmbeddedKafka exposes the broker address list through the
    # `spring.embedded.kafka.brokers` property.
    bootstrap-servers: ${spring.embedded.kafka.brokers}
    consumer:
      group-id: test-group
      # Read from the beginning so records published before the consumer
      # subscribed are still delivered.
      auto-offset-reset: earliest
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
Contract Testing
Schema Validation
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer, AvroConsumer
class TestSchemaContract:
    """Contract tests that check order messages against their Avro schemas.

    ``is_compatible`` and ``validate_schema`` are not defined in this
    snippet; they are expected to be supplied by the surrounding project.
    """

    def setup_method(self):
        """Load the order schema and create an Avro producer before each test."""
        producer_config = {
            'bootstrap.servers': 'kafka:9092',
            'schema.registry.url': 'http://schema-registry:8081'
        }
        self.schema = avro.load('order.avsc')
        self.producer = AvroProducer(producer_config)

    def test_order_schema_compatibility(self):
        """Version 2 of the order schema must stay compatible with version 1."""
        # Test schema evolution compatibility
        previous_schema, next_schema = (
            avro.load(path) for path in ('order_v1.avsc', 'order_v2.avsc')
        )

        # Verify backward compatibility
        assert is_compatible(previous_schema, next_schema)

    def test_producer_schema_compliance(self):
        """A representative order payload must validate against the schema."""
        # Test that producer uses correct schema
        sample_order = {
            'id': '123',
            'amount': 100.00,
            'status': 'pending'
        }

        # Validate against schema
        validate_schema(sample_order, self.schema)
Schema Registry Compatibility
# Check schema compatibility against the latest registered version.
# The compatibility resource lives under /compatibility/subjects/<subject>/versions/<version>.
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"}' \
  http://schema-registry:8081/compatibility/subjects/orders-value/versions/latest

# Register new schema version
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "..."}' \
  http://schema-registry:8081/subjects/orders-value/versions
Consumer Group Testing
def test_consumer_group_rebalance():
    """Verify that a three-member group rebalances when one member leaves.

    Each consumer runs in its own thread and keeps polling, because a
    consumer only joins the group and receives partitions while it is being
    polled. The test assumes ``test-topic`` has at least three partitions;
    with fewer, some member necessarily stays without an assignment.
    """
    # Imported locally so the snippet is self-contained.
    import threading
    import time

    member_count = 3
    survivors = range(1, member_count)
    # Latest assignment snapshot per member, written by the member's thread.
    assignments = {}
    stop_events = [threading.Event() for _ in range(member_count)]

    def run_member(index):
        """Create, poll and finally close one group member."""
        member = KafkaConsumer(
            'test-topic',
            bootstrap_servers=['kafka:9092'],
            group_id='test-group',
        )
        try:
            while not stop_events[index].is_set():
                member.poll(timeout_ms=200)
                assignments[index] = member.assignment()
        finally:
            # Closing leaves the group, which triggers the rebalance.
            member.close()

    def wait_until(condition, timeout_s=30.0):
        """Poll ``condition`` until it is true or the timeout elapses."""
        deadline = time.monotonic() + timeout_s
        while time.monotonic() < deadline:
            if condition():
                return True
            time.sleep(0.2)
        return condition()

    def owned_by(indices):
        """Return (all partitions owned by the members, total count)."""
        snapshots = [set(assignments.get(i, ())) for i in indices]
        return set().union(*snapshots), sum(len(s) for s in snapshots)

    def settled(indices):
        """True when every member owns something and no partition is shared."""
        union, total = owned_by(indices)
        return all(assignments.get(i) for i in indices) and len(union) == total

    threads = [
        threading.Thread(target=run_member, args=(i,), daemon=True)
        for i in range(member_count)
    ]
    for thread in threads:
        thread.start()

    try:
        # Verify partition assignment
        assert wait_until(lambda: settled(range(member_count))), (
            'not every consumer received a partition'
        )

        # Test rebalance on consumer shutdown
        released = set(assignments[0])
        stop_events[0].set()
        threads[0].join(timeout=30)

        # Verify remaining consumers took over the released partitions
        assert wait_until(
            lambda: settled(survivors) and released <= owned_by(survivors)[0]
        ), 'remaining consumers did not take over the released partitions'
    finally:
        # Always stop and close every member, even when an assertion fails.
        for event in stop_events:
            event.set()
        for thread in threads:
            thread.join(timeout=30)
Load Testing
kafka-producer-perf-test
#!/bin/bash
# load_test.sh
#
# Runs the Kafka producer and consumer perf tools against one topic and
# stores their output in producer_results.txt and consumer_results.txt.
# Every setting below can be overridden through the environment, e.g.
#   TOPIC=payments NUM_MESSAGES=5000 ./load_test.sh

# Stop on the first failure. pipefail matters here: without it the exit
# status of each pipeline is tee's, which hides a failing perf tool.
set -euo pipefail

TOPIC="${TOPIC:-orders}"
NUM_MESSAGES="${NUM_MESSAGES:-1000000}"
MESSAGE_SIZE="${MESSAGE_SIZE:-1024}"
BOOTSTRAP_SERVERS="${BOOTSTRAP_SERVERS:-kafka:9092}"

echo "Starting load test..."

# Run producer performance test
kafka-producer-perf-test.sh \
  --topic "$TOPIC" \
  --num-records "$NUM_MESSAGES" \
  --record-size "$MESSAGE_SIZE" \
  --throughput -1 \
  --producer-props \
    bootstrap.servers="$BOOTSTRAP_SERVERS" \
    batch.size=32768 \
    linger.ms=5 \
    compression.type=lz4 \
    acks=1 \
  2>&1 | tee producer_results.txt

# Run consumer performance test
kafka-consumer-perf-test.sh \
  --topic "$TOPIC" \
  --messages "$NUM_MESSAGES" \
  --group perf-test \
  --bootstrap-server "$BOOTSTRAP_SERVERS" \
  2>&1 | tee consumer_results.txt

echo "Load test complete. Results saved."
Python Load Testing
import asyncio
import time
from kafka import KafkaProducer, KafkaConsumer
import statistics
class KafkaLoadTest:
    """Simple producer/consumer load-test driver for a single Kafka topic.

    Both test methods are declared ``async`` but contain no ``await``; the
    Kafka client calls inside them block until they finish.
    """

    def __init__(self, bootstrap_servers, topic):
        """Store the connection settings.

        Args:
            bootstrap_servers: Broker address(es) passed to the Kafka clients.
            topic: Topic that is produced to and consumed from.
        """
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.results = []

    @staticmethod
    def _summarize_producer_run(latencies, num_messages, elapsed_seconds):
        """Turn raw measurements into the producer result dictionary.

        Args:
            latencies: Per-message latencies in seconds, in any order.
            num_messages: Number of messages that were sent.
            elapsed_seconds: Wall time of the whole run in seconds.

        Returns:
            dict with ``throughput`` (messages per second), ``latency_p50``
            and ``latency_p99``. The latency entries are ``None`` when no
            latency was recorded, e.g. because no send was acknowledged.
        """
        throughput = num_messages / elapsed_seconds if elapsed_seconds > 0 else 0.0
        ordered = sorted(latencies)
        if not ordered:
            return {'throughput': throughput, 'latency_p50': None, 'latency_p99': None}
        return {
            'throughput': throughput,
            'latency_p50': statistics.median(ordered),
            # int(n * 0.99) is always < n for n >= 1, so the index is valid.
            'latency_p99': ordered[int(len(ordered) * 0.99)],
        }

    async def producer_load_test(self, num_messages, message_size):
        """Send ``num_messages`` payloads of ``message_size`` bytes and time them.

        Args:
            num_messages: Number of messages to send.
            message_size: Size of each message payload in bytes.

        Returns:
            dict with ``throughput``, ``latency_p50`` and ``latency_p99``;
            see ``_summarize_producer_run``.
        """
        producer = KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            batch_size=32768,
            linger_ms=5,
            compression_type='lz4',
        )
        latencies = []
        # The payload never changes, so build it once outside the loop.
        payload = b'x' * message_size

        start_time = time.perf_counter()
        try:
            for _ in range(num_messages):
                send_start = time.perf_counter()
                future = producer.send(self.topic, value=payload)
                # Bind send_start as a default argument: a plain closure
                # would read the variable when the callback fires, i.e. the
                # start time of a later iteration.
                future.add_callback(
                    lambda _metadata, started=send_start: latencies.append(
                        time.perf_counter() - started
                    )
                )
            producer.flush()
            elapsed = time.perf_counter() - start_time
        finally:
            # Release the producer even if sending fails part-way.
            producer.close()

        return self._summarize_producer_run(latencies, num_messages, elapsed)

    async def consumer_load_test(self, duration_seconds):
        """Consume from the topic for ``duration_seconds`` and report the rate.

        Args:
            duration_seconds: How long to keep polling, in seconds.

        Returns:
            dict with ``messages_per_second``, computed as consumed messages
            divided by ``duration_seconds`` (``0.0`` for a non-positive
            duration).
        """
        consumer = KafkaConsumer(
            self.topic,
            bootstrap_servers=self.bootstrap_servers,
            group_id='load-test-group',
            auto_offset_reset='earliest',
        )
        messages_consumed = 0
        try:
            deadline = time.monotonic() + duration_seconds
            while time.monotonic() < deadline:
                batches = consumer.poll(timeout_ms=100)
                messages_consumed += sum(len(batch) for batch in batches.values())
        finally:
            # Release the consumer even if polling raises.
            consumer.close()

        rate = messages_consumed / duration_seconds if duration_seconds > 0 else 0.0
        return {
            'messages_per_second': rate
        }
# Run load test
async def main():
    """Run the producer test, then the consumer test, and print both results."""
    load_test = KafkaLoadTest('kafka:9092', 'orders')

    producer_results = await load_test.producer_load_test(
        num_messages=100000,
        message_size=1024,
    )
    consumer_results = await load_test.consumer_load_test(
        duration_seconds=60,
    )

    print(f"Producer: {producer_results}")
    print(f"Consumer: {consumer_results}")


# Guard the entry point so importing this module does not start a load test.
if __name__ == '__main__':
    asyncio.run(main())
Best Practices
Test Data Management
@pytest.fixture
def test_data():
    """Return three sample orders, one per status, under the ``orders`` key."""
    rows = (
        ('1', 100.00, 'pending'),
        ('2', 250.00, 'completed'),
        ('3', 75.00, 'cancelled'),
    )
    return {
        'orders': [
            {'id': order_id, 'amount': amount, 'status': status}
            for order_id, amount, status in rows
        ]
    }
def test_order_processing(test_data):
    """Process every sample order and check that its status is a known one."""
    known_statuses = ['pending', 'completed', 'cancelled']
    sample_orders = test_data['orders']
    for sample in sample_orders:
        process_order(sample)
        assert sample['status'] in known_statuses
Test Isolation
@pytest.fixture(autouse=True)
def setup_teardown():
    """Clean the Kafka test topics before and after every test (autouse)."""
    # Setup: run the cleanup before the test body.
    clean_kafka_topics()

    yield

    # Teardown: run the same cleanup again once the test has finished.
    clean_kafka_topics()
def clean_kafka_topics():
    """Delete every topic whose name starts with ``test-``.

    Connects to ``kafka:9092``, lists the topics, and removes the matching
    ones in a single request. The admin client is always closed afterwards.
    """
    admin_client = KafkaAdminClient(bootstrap_servers='kafka:9092')
    try:
        test_topics = [
            topic for topic in admin_client.list_topics()
            if topic.startswith('test-')
        ]
        # Skip the request entirely when there is nothing to delete.
        if test_topics:
            admin_client.delete_topics(test_topics)
    finally:
        # This runs around every test, so an unclosed client would leak
        # a broker connection each time.
        admin_client.close()
Summary
Effective Kafka testing requires a comprehensive strategy covering unit tests with mocks, integration tests with Testcontainers, contract tests for schema validation, and load tests for performance validation. Use the testing pyramid to balance speed and coverage.