๐ŸŽ‰ 75% of content is free forever โ€” Unlock Premium from $10/mo โ†’
CW
Search coursesโ€ฆ
๐Ÿ’ผ Servicesโ„น๏ธ Aboutโœ‰๏ธ ContactView Pricing Plansfrom $10

Event-Driven Architecture: SNS, SQS, EventBridge, Kafka

Cloud ArchitectureEvent-Driven Architectureโญ Premium

Advertisement

Event-Driven Architecture: SNS, SQS, EventBridge, Kafka

Difficulty: Senior Level | Companies: Netflix, Uber, LinkedIn, AWS, Confluent

Interview Question

"Design an event-driven architecture for an e-commerce platform that handles 1 million events per second with guaranteed delivery and exactly-once processing."

โ„น๏ธKey Concepts

This question tests your understanding of event-driven patterns, message queuing, and distributed systems for high-throughput scenarios.

Complete Event-Driven Architecture

Architecture Overview

Architecture Diagram
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                    EVENT-DRIVEN ARCHITECTURE                             โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                                          โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ EVENT PRODUCERS โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”                โ”‚
โ”‚  โ”‚                                                       โ”‚              โ”‚
โ”‚  โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”          โ”‚              โ”‚
โ”‚  โ”‚  โ”‚  Web App โ”‚  โ”‚ Mobile   โ”‚  โ”‚ IoT      โ”‚          โ”‚              โ”‚
โ”‚  โ”‚  โ”‚  (React) โ”‚  โ”‚ App      โ”‚  โ”‚ Devices  โ”‚          โ”‚              โ”‚
โ”‚  โ”‚  โ””โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”˜          โ”‚              โ”‚
โ”‚  โ”‚       โ”‚             โ”‚             โ”‚                   โ”‚              โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜              โ”‚
โ”‚          โ”‚             โ”‚             โ”‚                                   โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”              โ”‚
โ”‚  โ”‚                    EVENT BUS LAYER                      โ”‚              โ”‚
โ”‚  โ”‚                                                       โ”‚              โ”‚
โ”‚  โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚              โ”‚
โ”‚  โ”‚  โ”‚              Apache Kafka Cluster                โ”‚ โ”‚              โ”‚
โ”‚  โ”‚  โ”‚                                                   โ”‚ โ”‚              โ”‚
โ”‚  โ”‚  โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”           โ”‚ โ”‚              โ”‚
โ”‚  โ”‚  โ”‚  โ”‚ Topic 1 โ”‚ โ”‚ Topic 2 โ”‚ โ”‚ Topic 3 โ”‚           โ”‚ โ”‚              โ”‚
โ”‚  โ”‚  โ”‚  โ”‚(Orders) โ”‚ โ”‚(Payments)โ”‚ โ”‚(Inventory)โ”‚          โ”‚ โ”‚              โ”‚
โ”‚  โ”‚  โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜           โ”‚ โ”‚              โ”‚
โ”‚  โ”‚  โ”‚                                                   โ”‚ โ”‚              โ”‚
โ”‚  โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚              โ”‚
โ”‚  โ”‚                                                       โ”‚              โ”‚
โ”‚  โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚              โ”‚
โ”‚  โ”‚  โ”‚              AWS SNS/SQS                          โ”‚ โ”‚              โ”‚
โ”‚  โ”‚  โ”‚                                                   โ”‚ โ”‚              โ”‚
โ”‚  โ”‚  โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”           โ”‚ โ”‚              โ”‚
โ”‚  โ”‚  โ”‚  โ”‚  SNS    โ”‚ โ”‚  SQS    โ”‚ โ”‚EventBrdgโ”‚           โ”‚ โ”‚              โ”‚
โ”‚  โ”‚  โ”‚  โ”‚ (Fanout)โ”‚ โ”‚ (Queue) โ”‚ โ”‚ (Router)โ”‚           โ”‚ โ”‚              โ”‚
โ”‚  โ”‚  โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜           โ”‚ โ”‚              โ”‚
โ”‚  โ”‚  โ”‚                                                   โ”‚ โ”‚              โ”‚
โ”‚  โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚              โ”‚
โ”‚  โ”‚                                                       โ”‚              โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜              โ”‚
โ”‚                          โ”‚                                               โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ EVENT CONSUMERS โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”                โ”‚
โ”‚  โ”‚                                                       โ”‚              โ”‚
โ”‚  โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”          โ”‚              โ”‚
โ”‚  โ”‚  โ”‚ Order    โ”‚  โ”‚ Payment  โ”‚  โ”‚ Inventoryโ”‚          โ”‚              โ”‚
โ”‚  โ”‚  โ”‚ Service  โ”‚  โ”‚ Service  โ”‚  โ”‚ Service  โ”‚          โ”‚              โ”‚
โ”‚  โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜          โ”‚              โ”‚
โ”‚  โ”‚                                                       โ”‚              โ”‚
โ”‚  โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”          โ”‚              โ”‚
โ”‚  โ”‚  โ”‚ Shipping โ”‚  โ”‚ Analyticsโ”‚  โ”‚ Notification        โ”‚              โ”‚
โ”‚  โ”‚  โ”‚ Service  โ”‚  โ”‚ Service  โ”‚  โ”‚ Service  โ”‚          โ”‚              โ”‚
โ”‚  โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜          โ”‚              โ”‚
โ”‚  โ”‚                                                       โ”‚              โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜              โ”‚
โ”‚                                                                          โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Mathematical Foundation: Event Processing

Throughput Calculation:

  • Total events per second: E = 1,000,000
  • Average event size: S = 1KB
  • Total throughput: T = E ร— S = 1GB/s
  • Kafka partitions needed: P = T / (throughput_per_partition)
  • With 10MB/s per partition: P = 1000 / 10 = 100 partitions

Latency Budget:

  • End-to-end latency: L_total = 100ms
  • Producer to broker: L_pb = 10ms
  • Broker processing: L_bp = 20ms
  • Consumer processing: L_cp = 30ms
  • Network overhead: L_net = 20ms
  • Buffer: L_buffer = 20ms

Exactly-Once Semantics:

  • Idempotency key: UUID per event
  • Transaction ID: Unique per batch
  • Deduplication window: W = 5 minutes
  • Storage needed: S = events ร— idempotency_key_size ร— W

Apache Kafka Implementation

# Kafka producer and consumer
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import json
from typing import Dict, Any, List, Callable
from dataclasses import dataclass
from datetime import datetime
import uuid
import hashlib

@dataclass
class KafkaConfig:
    bootstrap_servers: List[str]
    topic: str
    group_id: str
    acks: str = 'all'
    retries: int = 3
    max_in_flight_requests_per_connection: int = 5

class EventProducer:
    """Kafka event producer with exactly-once semantics"""

    def __init__(self, config: KafkaConfig):
        self.producer = KafkaProducer(
            bootstrap_servers=config.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            acks=config.acks,
            retries=config.retries,
            max_in_flight_requests_per_connection=config.max_in_flight_requests_per_connection,
            enable_idempotence=True,  # Enable exactly-once
            transactional_id=str(uuid.uuid4())
        )
        self.config = config

    def produce_event(self, event_type: str, data: Dict[str, Any], key: str = None):
        """Produce event with idempotency"""
        event = {
            'event_id': str(uuid.uuid4()),
            'event_type': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'data': data,
            'metadata': {
                'producer_id': str(uuid.uuid4()),
                'version': '1.0'
            }
        }

        # Use event_id as key for ordering
        if key is None:
            key = event['event_id']

        try:
            future = self.producer.send(
                topic=self.config.topic,
                key=key,
                value=event
            )
            record_metadata = future.get(timeout=10)
            return {
                'event_id': event['event_id'],
                'topic': record_metadata.topic,
                'partition': record_metadata.partition,
                'offset': record_metadata.offset
            }
        except KafkaError as e:
            print(f"Error producing event: {e}")
            raise

    def produce_batch(self, events: List[Dict[str, Any]]):
        """Produce batch of events"""
        futures = []
        for event in events:
            future = self.producer.send(
                topic=self.config.topic,
                key=event.get('event_id'),
                value=event
            )
            futures.append(future)

        # Wait for all to complete
        for future in futures:
            future.get(timeout=10)

    def flush(self):
        self.producer.flush()

    def close(self):
        self.producer.close()

class EventConsumer:
    """Kafka event consumer with exactly-once processing"""

    def __init__(self, config: KafkaConfig):
        self.consumer = KafkaConsumer(
            config.topic,
            bootstrap_servers=config.bootstrap_servers,
            group_id=config.group_id,
            auto_offset_reset='earliest',
            enable_auto_commit=False,  # Manual commit for exactly-once
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            key_deserializer=lambda m: m.decode('utf-8') if m else None,
            max_poll_records=500,
            session_timeout_ms=30000,
            heartbeat_interval_ms=10000
        )
        self.config = config
        self.processed_events: set = set()

    def consume_events(self, handler: Callable):
        """Consume events with exactly-once processing"""
        try:
            for message in self.consumer:
                event = message.value
                event_id = event.get('event_id')

                # Check if already processed (deduplication)
                if event_id in self.processed_events:
                    print(f"Skipping duplicate event: {event_id}")
                    continue

                try:
                    # Process event
                    handler(event)

                    # Mark as processed
                    self.processed_events.add(event_id)

                    # Commit offset
                    self.consumer.commit()

                except Exception as e:
                    print(f"Error processing event {event_id}: {e}")
                    # Don't commit - will retry

        except KeyboardInterrupt:
            print("Stopping consumer...")
        finally:
            self.consumer.close()

    def cleanup_old_events(self, max_age_hours: int = 24):
        """Cleanup old processed event IDs"""
        # In production, use Redis or database for this
        pass

# Example usage
def process_order_event(event: Dict[str, Any]):
    """Process order event"""
    print(f"Processing order: {event['data']['order_id']}")
    # Process order...

# Initialize
config = KafkaConfig(
    bootstrap_servers=['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
    topic='orders',
    group_id='order-processor'
)

consumer = EventConsumer(config)
consumer.consume_events(process_order_event)

โš ๏ธKafka Configuration

Enable idempotence for exactly-once semantics. Use multiple partitions for parallel processing. Monitor consumer lag to detect processing bottlenecks.

AWS SNS/SQS Integration

# SNS/SQS event-driven architecture
import boto3
import json
from typing import Dict, Any, List
from dataclasses import dataclass

@dataclass
class SNSConfig:
    topic_arn: str
    region: str = 'us-east-1'

class SNSEventPublisher:
    """SNS event publisher"""

    def __init__(self, config: SNSConfig):
        self.sns = boto3.client('sns', region_name=config.region)
        self.topic_arn = config.topic_arn

    def publish_event(self, event_type: str, data: Dict[str, Any], 
                     message_attributes: Dict[str, str] = None):
        """Publish event to SNS"""
        message = {
            'event_type': event_type,
            'data': data,
            'timestamp': datetime.utcnow().isoformat()
        }

        attributes = {}
        if message_attributes:
            for key, value in message_attributes.items():
                attributes[key] = {
                    'DataType': 'String',
                    'StringValue': value
                }

        response = self.sns.publish(
            TopicArn=self.topic_arn,
            Message=json.dumps(message),
            MessageAttributes=attributes
        )
        return response

    def publish_batch(self, events: List[Dict[str, Any]]):
        """Publish batch of events"""
        # SNS doesn't support batch publish directly
        # Use SQS batch publish instead
        pass

class SQSConsumer:
    """SQS event consumer with visibility timeout"""

    def __init__(self, queue_url: str, region: str = 'us-east-1'):
        self.sqs = boto3.client('sqs', region_name=region)
        self.queue_url = queue_url

    def receive_messages(self, max_messages: int = 10, 
                        wait_time_seconds: int = 20) -> List[Dict[str, Any]]:
        """Receive messages from SQS"""
        response = self.sqs.receive_message(
            QueueUrl=self.queue_url,
            MaxNumberOfMessages=max_messages,
            WaitTimeSeconds=wait_time_seconds,
            MessageAttributeNames=['All'],
            VisibilityTimeout=30
        )

        return response.get('Messages', [])

    def process_message(self, message: Dict[str, Any], handler):
        """Process single message"""
        try:
            body = json.loads(message['Body'])
            handler(body)

            # Delete message after successful processing
            self.sqs.delete_message(
                QueueUrl=self.queue_url,
                ReceiptHandle=message['ReceiptHandle']
            )
            return True

        except Exception as e:
            print(f"Error processing message: {e}")
            return False

    def change_visibility(self, message: Dict[str, Any], timeout: int):
        """Change message visibility timeout"""
        self.sqs.change_message_visibility(
            QueueUrl=self.queue_url,
            ReceiptHandle=message['ReceiptHandle'],
            VisibilityTimeout=timeout
        )

class DLQHandler:
    """Dead Letter Queue handler"""

    def __init__(self, dlq_url: str, region: str = 'us-east-1'):
        self.sqs = boto3.client('sqs', region_name=region)
        self.dlq_url = dlq_url

    def send_to_dlq(self, message: Dict[str, Any], error: str):
        """Send failed message to DLQ"""
        dlq_message = {
            'original_message': message,
            'error': error,
            'timestamp': datetime.utcnow().isoformat(),
            'retry_count': message.get('Attributes', {}).get('ApproximateReceiveCount', 0)
        }

        self.sqs.send_message(
            QueueUrl=self.dlq_url,
            MessageBody=json.dumps(dlq_message)
        )

    def process_dlq(self, handler):
        """Process messages from DLQ"""
        while True:
            messages = self.receive_messages()
            if not messages:
                break

            for message in messages:
                try:
                    handler(message)
                    self.delete_message(message)
                except Exception as e:
                    print(f"Error processing DLQ message: {e}")

# Example: Order processing with SNS/SQS
"""
Order Service -> SNS Topic -> SQS Queues -> Multiple Consumers

1. Order Service publishes OrderCreated event to SNS
2. SNS fans out to multiple SQS queues:
   - Payment Queue
   - Inventory Queue
   - Notification Queue
3. Each consumer processes independently
4. Failed messages go to DLQ
"""

EventBridge Integration

# EventBridge event routing
import boto3
import json
from typing import Dict, Any, List
from datetime import datetime

class EventBridgePublisher:
    """EventBridge event publisher"""

    def __init__(self, event_bus_name: str = 'default'):
        self.eventbridge = boto3.client('events')
        self.event_bus_name = event_bus_name

    def publish_event(self, source: str, detail_type: str, 
                     detail: Dict[str, Any], resources: List[str] = None):
        """Publish event to EventBridge"""
        entries = [{
            'Source': source,
            'DetailType': detail_type,
            'Detail': json.dumps(detail),
            'EventBusName': self.event_bus_name,
            'Time': datetime.utcnow()
        }]

        if resources:
            entries[0]['Resources'] = resources

        response = self.eventbridge.put_events(Entries=entries)
        return response

    def publish_order_event(self, order_data: Dict[str, Any]):
        """Publish order event"""
        self.publish_event(
            source='com.ecommerce.orders',
            detail_type='OrderCreated',
            detail=order_data,
            resources=[f"arn:aws:ecommerce:orders:{order_data['order_id']}"]
        )

    def publish_payment_event(self, payment_data: Dict[str, Any]):
        """Publish payment event"""
        self.publish_event(
            source='com.ecommerce.payments',
            detail_type='PaymentProcessed',
            detail=payment_data
        )

class EventBridgeRuleManager:
    """EventBridge rule management"""

    def __init__(self):
        self.eventbridge = boto3.client('events')

    def create_routing_rule(self, name: str, source_pattern: str,
                           detail_type_pattern: str, targets: List[Dict[str, str]]):
        """Create event routing rule"""
        # Create rule
        self.eventbridge.put_rule(
            Name=name,
            EventPattern=jsonencode({
                'source': [source_pattern],
                'detail-type': [detail_type_pattern]
            }),
            State='ENABLED',
            Description=f'Route {detail_type_pattern} events from {source_pattern}'
        )

        # Add targets
        for target in targets:
            self.eventbridge.put_targets(
                Rule=name,
                Targets=[{
                    'Id': target['id'],
                    'Arn': target['arn'],
                    'Input': target.get('input', ''),
                    'InputPath': target.get('input_path', ''),
                    'InputTransformer': target.get('input_transformer', {})
                }]
            )

    def create_content_based_routing(self, name: str, rules: List[Dict[str, Any]]):
        """Create content-based routing rules"""
        for rule in rules:
            self.eventbridge.put_rule(
                Name=f"{name}-{rule['name']}",
                EventPattern=json.dumps(rule['pattern']),
                State='ENABLED'
            )

            self.eventbridge.put_targets(
                Rule=f"{name}-{rule['name']}",
                Targets=[{
                    'Id': rule['target_id'],
                    'Arn': rule['target_arn']
                }]
            )

# Example: Content-based routing
"""
{
  "source": ["com.ecommerce.orders"],
  "detail-type": ["OrderCreated"],
  "detail": {
    "status": ["confirmed"]
  }
}

This rule routes OrderCreated events with status=confirmed to a specific target.
"""

โ„น๏ธEventBridge Benefits

EventBridge provides serverless event routing with content-based filtering. Use it for decoupled architectures where producers and consumers don't need to know about each other.

Event Schema Management

# Event schema registry
import json
from typing import Dict, Any, List
from dataclasses import dataclass
from datetime import datetime
import hashlib

@dataclass
class EventSchema:
    schema_id: str
    event_type: str
    version: str
    schema: Dict[str, Any]
    created_at: datetime
    description: str

class SchemaRegistry:
    """Event schema registry"""

    def __init__(self):
        self.schemas: Dict[str, EventSchema] = {}

    def register_schema(self, event_type: str, version: str, 
                       schema: Dict[str, Any], description: str = "") -> str:
        """Register new event schema"""
        schema_id = hashlib.md5(
            f"{event_type}:{version}:{json.dumps(schema, sort_keys=True)}".encode()
        ).hexdigest()

        self.schemas[schema_id] = EventSchema(
            schema_id=schema_id,
            event_type=event_type,
            version=version,
            schema=schema,
            created_at=datetime.utcnow(),
            description=description
        )

        return schema_id

    def get_schema(self, schema_id: str) -> EventSchema:
        """Get schema by ID"""
        return self.schemas.get(schema_id)

    def get_schema_by_type(self, event_type: str, version: str = None) -> EventSchema:
        """Get schema by event type"""
        for schema in self.schemas.values():
            if schema.event_type == event_type:
                if version is None or schema.version == version:
                    return schema
        return None

    def validate_event(self, schema_id: str, event_data: Dict[str, Any]) -> bool:
        """Validate event against schema"""
        schema = self.get_schema(schema_id)
        if not schema:
            return False

        # Simple validation - in production use jsonschema
        return self._validate_against_schema(event_data, schema.schema)

    def _validate_against_schema(self, data: Dict[str, Any], 
                                schema: Dict[str, Any]) -> bool:
        """Validate data against JSON schema"""
        # Simplified validation
        if 'required' in schema:
            for field in schema['required']:
                if field not in data:
                    return False
        return True

# Example schema
ORDER_CREATED_SCHEMA = {
    "type": "object",
    "properties": {
        "order_id": {"type": "string"},
        "user_id": {"type": "string"},
        "items": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "product_id": {"type": "string"},
                    "quantity": {"type": "integer"},
                    "price": {"type": "number"}
                }
            }
        },
        "total": {"type": "number"},
        "currency": {"type": "string"}
    },
    "required": ["order_id", "user_id", "items", "total"]
}

# Register schema
registry = SchemaRegistry()
schema_id = registry.register_schema(
    event_type='OrderCreated',
    version='1.0',
    schema=ORDER_CREATED_SCHEMA,
    description='Schema for order created events'
)

Summary

ComponentPurposeUse Case
KafkaHigh-throughput streamingReal-time event processing
SNSFan-out messagingBroadcast to multiple consumers
SQSMessage queuingReliable message delivery
EventBridgeEvent routingContent-based routing
Schema RegistryEvent validationSchema evolution management

Advertisement