๐ŸŽ‰ 75% of content is free forever โ€” Unlock Premium from $10/mo โ†’
CW
Search coursesโ€ฆ
๐Ÿ’ผ Servicesโ„น๏ธ Aboutโœ‰๏ธ ContactView Pricing Plansfrom $10

Event-Driven Architecture

Cloud ArchitectureIntegration Patternsโญ Premium

Advertisement

Event-Driven Architecture

Difficulty: Senior Level | Companies: AWS, Google, Microsoft, Netflix, Uber

Event-Driven Fundamentals

Event-driven architecture (EDA) decouples components through events โ€” immutable records of something that happened. Publishers emit events without knowing who consumes them.

โ„น๏ธ

EDA enables independent scaling, loose coupling, and natural audit trails. It's the backbone of modern distributed systems.

Event Flow Architecture

Architecture Diagram
    Producers                    Event Bus                 Consumers
    โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”          โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”         โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
    โ”‚ Order    โ”‚โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ถโ”‚                 โ”‚โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ถโ”‚ Inventory    โ”‚
    โ”‚ Service  โ”‚          โ”‚   EventBridge   โ”‚         โ”‚ Service      โ”‚
    โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜          โ”‚                 โ”‚         โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
                          โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”‚         โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
    โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”          โ”‚  โ”‚ Rules     โ”‚  โ”‚โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ถโ”‚ Notification โ”‚
    โ”‚ Payment  โ”‚โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ถโ”‚  โ”‚ Engine    โ”‚  โ”‚         โ”‚ Service      โ”‚
    โ”‚ Service  โ”‚          โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ”‚         โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
    โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜          โ”‚                 โ”‚         โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
                          โ”‚                 โ”‚โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ถโ”‚ Analytics    โ”‚
    โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”          โ”‚                 โ”‚         โ”‚ Service      โ”‚
    โ”‚ User     โ”‚โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ถโ”‚                 โ”‚         โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
    โ”‚ Service  โ”‚          โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
    โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Pattern 1: EventBridge with Content-Based Filtering

Route events to specific handlers based on event content.

// EventBridge rule configuration
import * as events from 'aws-cdk-lib/aws-events';
import * as targets from 'aws-cdk-lib/aws-events-targets';
import * as lambda from 'aws-cdk-lib/aws-lambda';

// Rule 1: High-value orders go to fraud detection
const highValueOrderRule = new events.Rule(this, 'HighValueOrderRule', {
  eventPattern: {
    source: ['myapp.orders'],
    detailType: ['OrderCreated'],
    detail: {
      total: [{ numeric: ['>=', 1000] }],
    },
  },
});

highValueOrderRule.addTarget(new targets.LambdaFunction(fraudDetectionLambda));

// Rule 2: International orders need customs processing
const internationalOrderRule = new events.Rule(this, 'InternationalOrderRule', {
  eventPattern: {
    source: ['myapp.orders'],
    detailType: ['OrderCreated'],
    detail: {
      shippingAddress: {
        country: [{ anythingBut: ['US'] }],
      },
    },
  },
});

internationalOrderRule.addTarget(new targets.LambdaFunction(customsLambda));

// Rule 3: All orders update analytics
const analyticsRule = new events.Rule(this, 'AnalyticsRule', {
  eventPattern: {
    source: ['myapp.orders'],
    detailType: ['OrderCreated', 'OrderCancelled', 'OrderShipped'],
  },
});

analyticsRule.addTarget(new targets.LambdaFunction(analyticsLambda));

Pattern 2: SQS Dead Letter Queue with Retry

Handle failures gracefully with exponential backoff.

# SQS consumer with DLQ and retry logic
import boto3
import json
import time
from typing import Dict, Any

sqs = boto3.client('sqs')

class ResilientConsumer:
    def __init__(self, queue_url: str, dlq_url: str):
        self.queue_url = queue_url
        self.dlq_url = dlq_url
        self.max_retries = 3
    
    def process_messages(self):
        while True:
            response = sqs.receive_message(
                QueueUrl=self.queue_url,
                MaxNumberOfMessages=10,
                WaitTimeSeconds=20,
                VisibilityTimeout=30,
            )
            
            for message in response.get('Messages', []):
                try:
                    self._process_message(message)
                    sqs.delete_message(
                        QueueUrl=self.queue_url,
                        ReceiptHandle=message['ReceiptHandle'],
                    )
                except Exception as e:
                    self._handle_failure(message, e)
    
    def _process_message(self, message: Dict[str, Any]):
        body = json.loads(message['Body'])
        
        # Simulate processing with potential failure
        if body.get('force_error'):
            raise ValueError("Simulated processing error")
        
        print(f"Processed: {body}")
    
    def _handle_failure(self, message: Dict, error: Exception):
        body = json.loads(message['Body'])
        retry_count = body.get('retryCount', 0)
        
        if retry_count >= self.max_retries:
            # Send to DLQ
            sqs.send_message(
                QueueUrl=self.dlq_url,
                MessageBody=json.dumps({
                    **body,
                    'error': str(error),
                    'finalFailure': True,
                }),
            )
            sqs.delete_message(
                QueueUrl=self.queue_url,
                ReceiptHandle=message['ReceiptHandle'],
            )
        else:
            # Requeue with incremented retry count
            sqs.send_message(
                QueueUrl=self.queue_url,
                MessageBody=json.dumps({
                    **body,
                    'retryCount': retry_count + 1,
                }),
                DelaySeconds=2 ** retry_count * 10,  # Exponential backoff
            )
            sqs.change_message_visibility(
                QueueUrl=self.queue_url,
                ReceiptHandle=message['ReceiptHandle'],
                VisibilityTimeout=0,
            )

โš ๏ธ

Always set a maximum retry count and DLQ. Infinite retries can cause poison messages to block your queue indefinitely.

Pattern 3: Event Sourcing for Audit Trail

Store all state changes as events for complete auditability.

// Event store implementation
import { DynamoDB } from 'aws-sdk';

const dynamodb = new DynamoDB.DocumentClient();

interface Event {
  eventId: string;
  aggregateId: string;
  aggregateType: string;
  eventType: string;
  version: number;
  data: any;
  metadata: {
    userId: string;
    timestamp: string;
    correlationId: string;
  };
}

export class EventStore {
  async appendEvent(event: Event): Promise<void> {
    await dynamodb.put({
      TableName: 'events',
      Item: {
        aggregateId: event.aggregateId,
        version: event.version,
        ...event,
        // Optimistic concurrency via condition expression
        ConditionExpression: 'attribute_not_exists(aggregateId)',
      },
    }).promise();
  }

  async getEvents(aggregateId: string, fromVersion: number = 0): Promise<Event[]> {
    const result = await dynamodb.query({
      TableName: 'events',
      KeyConditionExpression: 'aggregateId = :id AND version > :from',
      ExpressionAttributeValues: {
        ':id': aggregateId,
        ':from': fromVersion,
      },
      ScanIndexForward: true, // Ascending order
    }).promise();

    return (result.Items || []) as Event[];
  }

  async getEventsByType(eventType: string, startDate: string): Promise<Event[]> {
    const result = await dynamodb.query({
      TableName: 'events',
      IndexName: 'eventType-index',
      KeyConditionExpression: 'eventType = :type AND metadata.timestamp > :start',
      ExpressionAttributeValues: {
        ':type': eventType,
        ':start': startDate,
      },
    }).promise();

    return (result.Items || []) as Event[];
  }
}

// Aggregate that rebuilds state from events
export class OrderAggregate {
  private events: Event[] = [];
  
  constructor(public id: string) {}

  // Rebuild state from event history
  async loadFromEvents(eventStore: EventStore): Promise<void> {
    this.events = await eventStore.getEvents(this.id);
  }

  // Apply new event
  createOrder(customerId: string, items: OrderItem[]): Event {
    const event: Event = {
      eventId: uuid(),
      aggregateId: this.id,
      aggregateType: 'Order',
      eventType: 'OrderCreated',
      version: this.events.length + 1,
      data: { customerId, items, status: 'pending' },
      metadata: {
        userId: getCurrentUserId(),
        timestamp: new Date().toISOString(),
        correlationId: getCorrelationId(),
      },
    };
    this.events.push(event);
    return event;
  }
}

Pattern 4: CQRS with Separate Read/Write Stores

Optimize read and write models independently.

// Write side - command handler
export class CreateOrderCommandHandler {
  constructor(
    private eventStore: EventStore,
    private idGenerator: IdGenerator,
  ) {}

  async handle(command: CreateOrderCommand): Promise<string> {
    const orderId = this.idGenerator.generate();
    const aggregate = new OrderAggregate(orderId);
    
    const event = aggregate.createOrder(command.customerId, command.items);
    await this.eventStore.appendEvent(event);
    
    return orderId;
  }
}

// Read side - query handler (separate database)
export class GetOrderQueryHandler {
  constructor(private readDb: ReadDatabase) {}

  async handle(query: GetOrderQuery): Promise<OrderView | null> {
    // Query optimized read model (e.g., PostgreSQL, Elasticsearch)
    return this.readDb.query(`
      SELECT id, customer_name, items, total, status, created_at
      FROM order_views
      WHERE id = $1
    `, [query.orderId]);
  }
}

// Projection updater - listens to events, updates read model
export class OrderProjectionUpdater {
  async handleEvent(event: Event): Promise<void> {
    switch (event.eventType) {
      case 'OrderCreated':
        await this.readDb.upsert({
          id: event.aggregateId,
          customerId: event.data.customerId,
          items: event.data.items,
          total: event.data.items.reduce((sum, i) => sum + i.price * i.quantity, 0),
          status: event.data.status,
          createdAt: event.metadata.timestamp,
        });
        break;
      
      case 'OrderShipped':
        await this.readDb.update(event.aggregateId, {
          status: 'shipped',
          shippedAt: event.metadata.timestamp,
        });
        break;
    }
  }
}

โ„น๏ธ

CQRS adds complexity but provides massive read scalability. Use it when read and write patterns differ significantly (e.g., e-commerce product catalog vs. order processing).

Event-Driven Comparison

ToolThroughputLatencyBest For
SNS + SQS300K/s100msFan-out notifications
EventBridge10K/s<1sContent-based routing
Kafka1M+/s<10msHigh-throughput streaming
Kinesis1MB/s/shard200msReal-time data ingestion

Follow-Up Questions

  1. How do you handle event ordering guarantees in a distributed system with multiple consumers?
  2. What strategies would you use to evolve event schemas without breaking existing consumers?
  3. How would you implement exactly-once processing semantics in an event-driven architecture?

Advertisement