Event-Driven Architecture
Difficulty: Senior Level | Companies: AWS, Google, Microsoft, Netflix, Uber
Event-Driven Fundamentals
Event-driven architecture (EDA) decouples components through events: immutable records of something that happened. Publishers emit events without knowing who consumes them.
ℹ️
EDA enables independent scaling, loose coupling, and natural audit trails, which is why it underpins many modern distributed systems.
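To make the decoupling concrete, here is a minimal in-process sketch. `InMemoryEventBus` and this `Event` dataclass are illustrative names rather than part of any AWS SDK, and a real broker would add persistence, retries, and network delivery.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Callable, DefaultDict, Dict, List
from uuid import uuid4


@dataclass(frozen=True)  # frozen: attributes cannot be reassigned after creation
class Event:
    event_type: str
    data: Dict[str, Any]
    event_id: str = field(default_factory=lambda: str(uuid4()))
    occurred_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )


Handler = Callable[[Event], None]


class InMemoryEventBus:
    """Toy stand-in for a real broker such as EventBridge or SNS."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self._handlers[event_type].append(handler)

    def publish(self, event: Event) -> int:
        """Deliver the event to every subscriber; return how many received it."""
        handlers = self._handlers.get(event.event_type, [])
        for handler in handlers:
            handler(event)
        return len(handlers)


bus = InMemoryEventBus()
reserved: List[str] = []
notified: List[str] = []

# Two independent consumers; the publisher never references either of them.
bus.subscribe("OrderCreated", lambda e: reserved.append(e.data["orderId"]))
bus.subscribe("OrderCreated", lambda e: notified.append(e.data["orderId"]))

delivered = bus.publish(Event("OrderCreated", {"orderId": "o-1"}))
print(delivered, reserved, notified)
```

Adding a third consumer only requires another `subscribe` call; the publishing code does not change, which is the property the definition above describes.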
Event Flow Architecture
 Producers                  Event Bus                 Consumers
┌──────────┐           ┌─────────────────┐         ┌──────────────┐
│  Order   │──────────▶│                 │────────▶│  Inventory   │
│ Service  │           │   EventBridge   │         │   Service    │
└──────────┘           │                 │         └──────────────┘
                       │  ┌───────────┐  │         ┌──────────────┐
┌──────────┐           │  │   Rules   │  │────────▶│ Notification │
│ Payment  │──────────▶│  │  Engine   │  │         │   Service    │
│ Service  │           │  └───────────┘  │         └──────────────┘
└──────────┘           │                 │         ┌──────────────┐
                       │                 │────────▶│  Analytics   │
┌──────────┐           │                 │         │   Service    │
│   User   │──────────▶│                 │         └──────────────┘
│ Service  │           └─────────────────┘
└──────────┘
Pattern 1: EventBridge with Content-Based Filtering
Route events to specific handlers based on event content.
// EventBridge rule configuration (AWS CDK v2).
// Assumes this code runs inside a Stack or Construct, so `this` is the scope, and
// that fraudDetectionLambda, customsLambda, and analyticsLambda are
// lambda.IFunction instances defined elsewhere in the same scope.
import * as events from 'aws-cdk-lib/aws-events';
import * as targets from 'aws-cdk-lib/aws-events-targets';
import * as lambda from 'aws-cdk-lib/aws-lambda';

// Rule 1: High-value orders go to fraud detection
const highValueOrderRule = new events.Rule(this, 'HighValueOrderRule', {
  eventPattern: {
    source: ['myapp.orders'],
    detailType: ['OrderCreated'],
    detail: {
      // Numeric matching: total >= 1000
      total: [{ numeric: ['>=', 1000] }],
    },
  },
});
highValueOrderRule.addTarget(new targets.LambdaFunction(fraudDetectionLambda));

// Rule 2: International orders need customs processing
const internationalOrderRule = new events.Rule(this, 'InternationalOrderRule', {
  eventPattern: {
    source: ['myapp.orders'],
    detailType: ['OrderCreated'],
    detail: {
      shippingAddress: {
        // Keys under `detail` are passed through as-is, so the operator must use
        // EventBridge's own spelling, 'anything-but', not camelCase.
        country: [{ 'anything-but': ['US'] }],
      },
    },
  },
});
internationalOrderRule.addTarget(new targets.LambdaFunction(customsLambda));

// Rule 3: All order lifecycle events update analytics
const analyticsRule = new events.Rule(this, 'AnalyticsRule', {
  eventPattern: {
    source: ['myapp.orders'],
    detailType: ['OrderCreated', 'OrderCancelled', 'OrderShipped'],
  },
});
analyticsRule.addTarget(new targets.LambdaFunction(analyticsLambda));
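To see how content-based filtering decides whether a rule fires, the sketch below evaluates patterns against an event in plain Python. It is a simplified approximation and not EventBridge's actual matching engine: it covers only exact values, `numeric`, and `anything-but`, and it uses the raw event field names (`detail-type` rather than CDK's `detailType`). The function names are hypothetical.

```python
import operator
from typing import Any, Dict, List

_COMPARE = {
    "<": operator.lt,
    "<=": operator.le,
    "=": operator.eq,
    ">": operator.gt,
    ">=": operator.ge,
}


def _is_number(value: Any) -> bool:
    # bool is a subclass of int in Python, so exclude it explicitly.
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def _matches_value(value: Any, allowed: List[Any]) -> bool:
    """True if `value` satisfies at least one entry in the `allowed` list."""
    for rule in allowed:
        if isinstance(rule, dict):
            if "numeric" in rule:
                bounds = rule["numeric"]  # e.g. [">=", 1000] or [">", 0, "<=", 5]
                pairs = zip(bounds[0::2], bounds[1::2])
                if _is_number(value) and all(
                    _COMPARE[op](value, bound) for op, bound in pairs
                ):
                    return True
            elif "anything-but" in rule:
                if value not in rule["anything-but"]:
                    return True
        elif value == rule:
            return True
    return False


def matches(pattern: Dict[str, Any], event: Dict[str, Any]) -> bool:
    """Every key in the pattern must be present in the event and match."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            # Nested object in the pattern: recurse into the event.
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif not _matches_value(actual, expected):
            return False
    return True


high_value_pattern = {
    "source": ["myapp.orders"],
    "detail-type": ["OrderCreated"],
    "detail": {"total": [{"numeric": [">=", 1000]}]},
}
international_pattern = {
    "source": ["myapp.orders"],
    "detail-type": ["OrderCreated"],
    "detail": {"shippingAddress": {"country": [{"anything-but": ["US"]}]}},
}
event = {
    "source": "myapp.orders",
    "detail-type": "OrderCreated",
    "detail": {"total": 1500, "shippingAddress": {"country": "US"}},
}

print(matches(high_value_pattern, event))     # True: 1500 >= 1000
print(matches(international_pattern, event))  # False: country is US
```

A single event can match several rules at once, which is how one `OrderCreated` event can reach fraud detection, customs, and analytics without the producer choosing a destination.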
Pattern 2: SQS Dead Letter Queue with Retry
Handle failures gracefully with exponential backoff.
# SQS consumer with DLQ and retry logic.
# SQS can also move messages to a DLQ natively through a redrive policy
# (maxReceiveCount); this class shows a manual approach with per-message backoff.
import json
from typing import Any, Dict

import boto3

sqs = boto3.client('sqs')

MAX_DELAY_SECONDS = 900  # SQS caps DelaySeconds at 15 minutes


class ResilientConsumer:
    def __init__(self, queue_url: str, dlq_url: str, max_retries: int = 3):
        self.queue_url = queue_url
        self.dlq_url = dlq_url
        self.max_retries = max_retries

    def process_messages(self) -> None:
        while True:
            response = sqs.receive_message(
                QueueUrl=self.queue_url,
                MaxNumberOfMessages=10,
                WaitTimeSeconds=20,  # Long polling
                VisibilityTimeout=30,
            )
            for message in response.get('Messages', []):
                try:
                    self._process_message(message)
                    sqs.delete_message(
                        QueueUrl=self.queue_url,
                        ReceiptHandle=message['ReceiptHandle'],
                    )
                except Exception as e:
                    self._handle_failure(message, e)

    def _process_message(self, message: Dict[str, Any]) -> None:
        body = json.loads(message['Body'])
        # Simulate processing with potential failure
        if body.get('force_error'):
            raise ValueError("Simulated processing error")
        print(f"Processed: {body}")

    def _handle_failure(self, message: Dict[str, Any], error: Exception) -> None:
        try:
            body = json.loads(message['Body'])
            if not isinstance(body, dict):
                raise ValueError("Message body is not a JSON object")
        except ValueError:  # json.JSONDecodeError is a subclass of ValueError
            # A malformed payload can never succeed, so skip the retries
            body = {'rawBody': message['Body'], 'retryCount': self.max_retries}
        retry_count = body.get('retryCount', 0)

        if retry_count >= self.max_retries:
            # Retries exhausted: park the message in the DLQ with failure context
            sqs.send_message(
                QueueUrl=self.dlq_url,
                MessageBody=json.dumps({
                    **body,
                    'error': str(error),
                    'finalFailure': True,
                }),
            )
        else:
            # Requeue a copy with an incremented retry count.
            # Exponential backoff: 10s, 20s, 40s, ... capped at the SQS maximum.
            # Per-message DelaySeconds works on standard queues only, not FIFO.
            sqs.send_message(
                QueueUrl=self.queue_url,
                MessageBody=json.dumps({
                    **body,
                    'retryCount': retry_count + 1,
                }),
                DelaySeconds=min(2 ** retry_count * 10, MAX_DELAY_SECONDS),
            )

        # Delete the original in both cases. Resetting its visibility timeout to 0
        # instead would redeliver it immediately alongside the delayed copy.
        sqs.delete_message(
            QueueUrl=self.queue_url,
            ReceiptHandle=message['ReceiptHandle'],
        )
⚠️
Always set a maximum retry count and a DLQ. Without them, a poison message is retried forever, consuming consumer capacity, and on a FIFO queue it can block every message behind it in the same message group.
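The backoff schedule itself is easy to get subtly wrong, so here is a small standalone sketch of it. `backoff_delay` is a hypothetical helper; the 10-second base mirrors the consumer above, the 900-second cap is the SQS limit on `DelaySeconds`, and the optional "full jitter" (a random delay between 0 and the computed ceiling) is an addition that spreads out retries when many messages fail at once.

```python
import random
from typing import Optional

SQS_MAX_DELAY_SECONDS = 900  # SQS rejects DelaySeconds above 15 minutes


def backoff_delay(
    retry_count: int,
    base_seconds: int = 10,
    cap_seconds: int = SQS_MAX_DELAY_SECONDS,
    jitter: bool = True,
    rng: Optional[random.Random] = None,
) -> int:
    """Return the delay in whole seconds before retry number `retry_count`."""
    if retry_count < 0:
        raise ValueError("retry_count must be non-negative")
    # Exponential growth, clamped so the value stays valid for SQS.
    ceiling = min(cap_seconds, base_seconds * 2 ** retry_count)
    if not jitter:
        return ceiling
    # Full jitter: pick uniformly in [0, ceiling] to avoid synchronized retries.
    source = rng if rng is not None else random
    return source.randint(0, ceiling)


# Deterministic schedule without jitter: doubles each attempt, then hits the cap.
schedule = [backoff_delay(n, jitter=False) for n in range(8)]
print(schedule)  # [10, 20, 40, 80, 160, 320, 640, 900]
```

With jitter enabled, the value passed as `DelaySeconds` would be any integer from 0 up to the corresponding entry in that schedule.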
Pattern 3: Event Sourcing for Audit Trail
Store all state changes as events for complete auditability.
// Event store implementation (AWS SDK for JavaScript v2)
import { DynamoDB } from 'aws-sdk';
import { v4 as uuid } from 'uuid';

const dynamodb = new DynamoDB.DocumentClient();

// Assumed helpers, not defined in this article: they return the caller's user ID
// and the current request's correlation ID.
declare function getCurrentUserId(): string;
declare function getCorrelationId(): string;

// Assumed shape of an order line item
interface OrderItem {
  price: number;
  quantity: number;
}

interface Event {
  eventId: string;
  aggregateId: string;
  aggregateType: string;
  eventType: string;
  version: number;
  data: any;
  metadata: {
    userId: string;
    timestamp: string;
    correlationId: string;
  };
}

export class EventStore {
  async appendEvent(event: Event): Promise<void> {
    await dynamodb.put({
      TableName: 'events',
      Item: {
        ...event,
        // Top-level copy of the timestamp: DynamoDB key attributes cannot be
        // nested paths, so eventType-index is assumed to use this as its sort key.
        timestamp: event.metadata.timestamp,
      },
      // Optimistic concurrency: with (aggregateId, version) as the primary key,
      // the write fails if this version already exists for the aggregate.
      // ConditionExpression is a request parameter, not an item attribute.
      ConditionExpression: 'attribute_not_exists(aggregateId)',
    }).promise();
  }

  async getEvents(aggregateId: string, fromVersion: number = 0): Promise<Event[]> {
    // A single query returns at most 1 MB; follow LastEvaluatedKey for long streams.
    const result = await dynamodb.query({
      TableName: 'events',
      KeyConditionExpression: 'aggregateId = :id AND version > :from',
      ExpressionAttributeValues: {
        ':id': aggregateId,
        ':from': fromVersion,
      },
      ScanIndexForward: true, // Ascending order
    }).promise();
    return (result.Items || []) as Event[];
  }

  async getEventsByType(eventType: string, startDate: string): Promise<Event[]> {
    const result = await dynamodb.query({
      TableName: 'events',
      IndexName: 'eventType-index',
      // `timestamp` is a DynamoDB reserved word, so it needs an alias
      KeyConditionExpression: 'eventType = :type AND #ts > :start',
      ExpressionAttributeNames: { '#ts': 'timestamp' },
      ExpressionAttributeValues: {
        ':type': eventType,
        ':start': startDate,
      },
    }).promise();
    return (result.Items || []) as Event[];
  }
}

// Aggregate that rebuilds state from events
export class OrderAggregate {
  private events: Event[] = [];

  constructor(public readonly id: string) {}

  // Load the event history for this aggregate
  async loadFromEvents(eventStore: EventStore): Promise<void> {
    this.events = await eventStore.getEvents(this.id);
  }

  // Record a new event; the caller persists it through EventStore.appendEvent
  createOrder(customerId: string, items: OrderItem[]): Event {
    const event: Event = {
      eventId: uuid(),
      aggregateId: this.id,
      aggregateType: 'Order',
      eventType: 'OrderCreated',
      version: this.events.length + 1,
      data: { customerId, items, status: 'pending' },
      metadata: {
        userId: getCurrentUserId(),
        timestamp: new Date().toISOString(),
        correlationId: getCorrelationId(),
      },
    };
    this.events.push(event);
    return event;
  }
}
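The aggregate above loads its history but does not show how current state is derived from it. The sketch below illustrates that step, together with the version check behind optimistic concurrency, using an in-memory store in place of DynamoDB. All names here (`StoredEvent`, `InMemoryEventStore`, `rebuild`) are hypothetical, and the `OrderShipped` and `OrderCancelled` transitions are assumed for illustration.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass(frozen=True)
class StoredEvent:
    aggregate_id: str
    version: int
    event_type: str
    data: Dict[str, Any]


class ConcurrencyError(Exception):
    """Raised when another writer has already appended the expected version."""


class InMemoryEventStore:
    def __init__(self) -> None:
        self._streams: Dict[str, List[StoredEvent]] = {}

    def append(self, event: StoredEvent) -> None:
        stream = self._streams.setdefault(event.aggregate_id, [])
        expected = len(stream) + 1
        # Same idea as a conditional put on (aggregateId, version): only the
        # next version in sequence may be written.
        if event.version != expected:
            raise ConcurrencyError(
                f"expected version {expected}, got {event.version}"
            )
        stream.append(event)

    def load(self, aggregate_id: str) -> List[StoredEvent]:
        return list(self._streams.get(aggregate_id, []))


def apply(state: Optional[Dict[str, Any]], event: StoredEvent) -> Dict[str, Any]:
    """Pure transition function: (current state, event) -> next state."""
    if event.event_type == "OrderCreated":
        return dict(event.data)
    if state is None:
        raise ValueError(f"{event.event_type} arrived before OrderCreated")
    if event.event_type == "OrderShipped":
        return {**state, "status": "shipped"}
    if event.event_type == "OrderCancelled":
        return {**state, "status": "cancelled"}
    return state  # Unknown event types are ignored


def rebuild(events: List[StoredEvent]) -> Optional[Dict[str, Any]]:
    """Fold the full history, oldest first, into the current state."""
    state: Optional[Dict[str, Any]] = None
    for event in events:
        state = apply(state, event)
    return state


store = InMemoryEventStore()
store.append(StoredEvent("order-1", 1, "OrderCreated",
                         {"customerId": "c-9", "status": "pending"}))
store.append(StoredEvent("order-1", 2, "OrderShipped", {}))

print(rebuild(store.load("order-1")))
# {'customerId': 'c-9', 'status': 'shipped'}
```

Because `apply` never mutates its inputs, replaying the same history always yields the same state, which is what makes the event log usable as an audit trail.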
Pattern 4: CQRS with Separate Read/Write Stores
Optimize read and write models independently.
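// Supporting types assumed by this example (not defined in the article).
// Event, EventStore, and OrderAggregate come from Pattern 3; OrderItem is
// assumed to have numeric `price` and `quantity` fields.
interface IdGenerator { generate(): string; }
interface CreateOrderCommand { customerId: string; items: OrderItem[]; }
interface GetOrderQuery { orderId: string; }
interface OrderView {
  id: string;
  customerId: string;
  items: OrderItem[];
  total: number;
  status: string;
  createdAt: string;
  shippedAt?: string;
}
interface ReadDatabase {
  query(sql: string, params: unknown[]): Promise<OrderView | null>;
  upsert(view: OrderView): Promise<void>;
  update(id: string, changes: Partial<OrderView>): Promise<void>;
}

// Write side - command handler
export class CreateOrderCommandHandler {
  constructor(
    private eventStore: EventStore,
    private idGenerator: IdGenerator,
  ) {}

  async handle(command: CreateOrderCommand): Promise<string> {
    const orderId = this.idGenerator.generate();
    const aggregate = new OrderAggregate(orderId);
    const event = aggregate.createOrder(command.customerId, command.items);
    await this.eventStore.appendEvent(event);
    return orderId;
  }
}

// Read side - query handler (separate database)
export class GetOrderQueryHandler {
  constructor(private readDb: ReadDatabase) {}

  async handle(query: GetOrderQuery): Promise<OrderView | null> {
    // Query optimized read model (e.g., PostgreSQL, Elasticsearch).
    // Columns mirror the fields written by the projection below.
    return this.readDb.query(`
      SELECT id, customer_id, items, total, status, created_at
      FROM order_views
      WHERE id = $1
    `, [query.orderId]);
  }
}

// Projection updater - listens to events, updates read model
export class OrderProjectionUpdater {
  constructor(private readDb: ReadDatabase) {}

  async handleEvent(event: Event): Promise<void> {
    switch (event.eventType) {
      case 'OrderCreated':
        await this.readDb.upsert({
          id: event.aggregateId,
          customerId: event.data.customerId,
          items: event.data.items,
          total: event.data.items.reduce(
            (sum: number, i: OrderItem) => sum + i.price * i.quantity,
            0,
          ),
          status: event.data.status,
          createdAt: event.metadata.timestamp,
        });
        break;
      case 'OrderShipped':
        await this.readDb.update(event.aggregateId, {
          status: 'shipped',
          shippedAt: event.metadata.timestamp,
        });
        break;
      // Other event types do not affect this view
    }
  }
}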
ℹ️
CQRS adds complexity, but it lets the read side scale independently of the write side. Use it when read and write patterns differ significantly (e.g., e-commerce product catalog vs. order processing).
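Part of that added complexity is that projections usually receive events at least once, so the read model has to tolerate redelivery. The sketch below shows one way to handle it, by tracking the last applied version per order and skipping anything at or below it. `OrderReadModel` is a hypothetical in-memory stand-in for a real read database, and it assumes events for a single order arrive in version order.

```python
from typing import Any, Dict, Optional


class OrderReadModel:
    """In-memory projection keyed by order ID."""

    def __init__(self) -> None:
        self._views: Dict[str, Dict[str, Any]] = {}

    def project(self, event: Dict[str, Any]) -> bool:
        """Apply one event. Return False if it was a duplicate or not applicable."""
        order_id = event["aggregateId"]
        view = self._views.get(order_id)
        last_version = view["version"] if view else 0
        if event["version"] <= last_version:
            return False  # Already applied: redelivery is a no-op

        if event["eventType"] == "OrderCreated":
            items = event["data"]["items"]
            view = {
                "id": order_id,
                "customerId": event["data"]["customerId"],
                # Precompute the total so reads never have to aggregate
                "total": sum(i["price"] * i["quantity"] for i in items),
                "status": event["data"]["status"],
            }
        elif event["eventType"] == "OrderShipped" and view is not None:
            view = {**view, "status": "shipped"}
        else:
            return False  # Unknown type, or an update for an unseen order

        view["version"] = event["version"]
        self._views[order_id] = view
        return True

    def get(self, order_id: str) -> Optional[Dict[str, Any]]:
        view = self._views.get(order_id)
        return dict(view) if view else None


created = {
    "aggregateId": "order-1",
    "version": 1,
    "eventType": "OrderCreated",
    "data": {
        "customerId": "c-9",
        "status": "pending",
        "items": [{"price": 20, "quantity": 2}, {"price": 5, "quantity": 1}],
    },
}
shipped = {"aggregateId": "order-1", "version": 2,
           "eventType": "OrderShipped", "data": {}}

read_model = OrderReadModel()
results = [read_model.project(e) for e in (created, shipped, shipped)]
print(results)                        # [True, True, False]
print(read_model.get("order-1"))
# {'id': 'order-1', 'customerId': 'c-9', 'total': 45, 'status': 'shipped', 'version': 2}
```

The read model is eventually consistent with the write side: a query issued between the command and the projection update returns the older view.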
Event-Driven Comparison
| Tool | Throughput (indicative) | Latency (typical) | Best For |
|---|---|---|---|
| SNS + SQS | ~300K messages/s | ~100 ms | Fan-out notifications |
| EventBridge | ~10K events/s | <1 s | Content-based routing |
| Kafka | 1M+ messages/s | <10 ms | High-throughput streaming |
| Kinesis | 1 MB/s per shard (writes) | ~200 ms | Real-time data ingestion |
Follow-Up Questions
- How do you handle event ordering guarantees in a distributed system with multiple consumers?
- What strategies would you use to evolve event schemas without breaking existing consumers?
- How would you implement exactly-once processing semantics in an event-driven architecture?