๐ŸŽ‰ 75% of content is free forever โ€” Unlock Premium from $10/mo โ†’
CW
Search coursesโ€ฆ
๐Ÿ’ผ Servicesโ„น๏ธ Aboutโœ‰๏ธ ContactView Pricing Plansfrom $10

Message Queue Patterns: Dead Letter, Fan-Out, Saga

Cloud ArchitectureMessage Queue Patternsโญ Premium

Advertisement

Message Queue Patterns: Dead Letter, Fan-Out, Saga

Difficulty: Senior Level | Companies: Netflix, Uber, Amazon, Microsoft, RabbitMQ

Interview Question

"Design a message queue system that handles 1 million messages per second with guaranteed delivery, dead letter handling, and exactly-once processing."

โ„น๏ธKey Concepts

This question tests your understanding of message queue patterns, delivery guarantees, and distributed systems.

Complete Message Queue Architecture

Architecture Overview

Architecture Diagram
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                    MESSAGE QUEUE ARCHITECTURE                            โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                                          โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ PRODUCERS โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”                  โ”‚
โ”‚  โ”‚  Web Apps โ”‚ Microservices โ”‚ IoT Devices           โ”‚                  โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜                 โ”‚
โ”‚                         โ”‚                                               โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ MESSAGE BROKER โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”                โ”‚
โ”‚  โ”‚                                                       โ”‚              โ”‚
โ”‚  โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”    โ”‚              โ”‚
โ”‚  โ”‚  โ”‚           Kafka / RabbitMQ / SQS             โ”‚    โ”‚              โ”‚
โ”‚  โ”‚  โ”‚                                               โ”‚    โ”‚              โ”‚
โ”‚  โ”‚  โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”‚    โ”‚              โ”‚
โ”‚  โ”‚  โ”‚  โ”‚ Topic 1  โ”‚  โ”‚ Topic 2  โ”‚  โ”‚ Topic 3  โ”‚  โ”‚    โ”‚              โ”‚
โ”‚  โ”‚  โ”‚  โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚  โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚  โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚  โ”‚    โ”‚              โ”‚
โ”‚  โ”‚  โ”‚  โ”‚ โ”‚Queue โ”‚ โ”‚  โ”‚ โ”‚Queue โ”‚ โ”‚  โ”‚ โ”‚Queue โ”‚ โ”‚  โ”‚    โ”‚              โ”‚
โ”‚  โ”‚  โ”‚  โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚  โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚  โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚  โ”‚    โ”‚              โ”‚
โ”‚  โ”‚  โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ”‚    โ”‚              โ”‚
โ”‚  โ”‚  โ”‚                                               โ”‚    โ”‚              โ”‚
โ”‚  โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜    โ”‚              โ”‚
โ”‚  โ”‚                                                       โ”‚              โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜              โ”‚
โ”‚                         โ”‚                                               โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ CONSUMERS โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”                โ”‚
โ”‚  โ”‚                                                       โ”‚              โ”‚
โ”‚  โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”          โ”‚              โ”‚
โ”‚  โ”‚  โ”‚Consumer 1โ”‚  โ”‚Consumer 2โ”‚  โ”‚Consumer 3โ”‚          โ”‚              โ”‚
โ”‚  โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜          โ”‚              โ”‚
โ”‚  โ”‚                                                       โ”‚              โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜              โ”‚
โ”‚                         โ”‚                                               โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ DEAD LETTER โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”                 โ”‚
โ”‚  โ”‚  DLQ โ”‚ Retry Queue โ”‚ Alerting                     โ”‚                 โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜              โ”‚
โ”‚                                                                          โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Mathematical Foundation: Queue Metrics

Throughput Calculation:

  • Messages per second: M = 1,000,000
  • Average message size: S = 1KB
  • Total throughput: T = M ร— S = 1GB/s
  • Queue capacity needed: C = T ร— retention_period
  • With 24-hour retention: C = 1GB/s ร— 86400s = 86.4TB

Latency Budget:

  • End-to-end latency: L_total = 100ms
  • Producer to broker: L_pb = 10ms
  • Broker processing: L_bp = 20ms
  • Consumer processing: L_cp = 50ms
  • Network overhead: L_net = 20ms

Dead Letter Rate:

  • Total messages: M = 1,000,000
  • Failure rate: F = 1%
  • DLQ messages: D = M ร— F = 10,000 messages/second
  • DLQ storage needed: S_dlq = D ร— message_size ร— retention

Exactly-Once Semantics:

  • Idempotency key: UUID per message
  • Deduplication window: W = 5 minutes
  • Storage for deduplication: S = messages ร— idempotency_key_size ร— W

Dead Letter Queue Implementation

# Dead letter queue with retry logic
import boto3
import json
from typing import Dict, Any, Callable, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
import time

@dataclass
class QueueConfig:
    queue_name: str
    dead_letter_queue_name: str
    max_retries: int = 3
    visibility_timeout: int = 30
    message_retention: int = 1209600  # 14 days

class DeadLetterQueueManager:
    """Dead letter queue manager with retry logic"""

    def __init__(self, config: QueueConfig):
        self.config = config
        self.sqs = boto3.client('sqs')

        # Create queues
        self.queue_url = self._create_queue(config.queue_name)
        self.dlq_url = self._create_queue(config.dead_letter_queue_name)

    def _create_queue(self, queue_name: str) -> str:
        """Create SQS queue"""
        response = self.sqs.create_queue(
            QueueName=queue_name,
            Attributes={
                'VisibilityTimeout': str(self.config.visibility_timeout),
                'MessageRetentionPeriod': str(self.config.message_retention),
                'ReceiveMessageWaitTimeSeconds': '20'
            }
        )
        return response['QueueUrl']

    def send_message(self, message: Dict[str, Any], 
                    delay_seconds: int = 0) -> str:
        """Send message to queue"""
        response = self.sqs.send_message(
            QueueUrl=self.queue_url,
            MessageBody=json.dumps(message, default=str),
            DelaySeconds=delay_seconds
        )
        return response['MessageId']

    def receive_messages(self, max_messages: int = 10) -> list:
        """Receive messages from queue"""
        response = self.sqs.receive_message(
            QueueUrl=self.queue_url,
            MaxNumberOfMessages=max_messages,
            WaitTimeSeconds=20,
            MessageAttributeNames=['All'],
            VisibilityTimeout=self.config.visibility_timeout
        )
        return response.get('Messages', [])

    def process_message(self, message: Dict[str, Any], 
                       handler: Callable) -> bool:
        """Process message with retry logic"""
        try:
            # Process message
            handler(message)

            # Delete message on success
            self.sqs.delete_message(
                QueueUrl=self.queue_url,
                ReceiptHandle=message['ReceiptHandle']
            )
            return True

        except Exception as e:
            # Get retry count
            retry_count = int(
                message.get('MessageAttributes', {})
                .get('RetryCount', {})
                .get('StringValue', '0')
            )

            if retry_count >= self.config.max_retries:
                # Move to DLQ
                self._move_to_dlq(message, str(e))
                return False
            else:
                # Increment retry count and requeue
                self._requeue_with_retry(message, retry_count + 1)
                return False

    def _move_to_dlq(self, message: Dict[str, Any], error: str):
        """Move message to dead letter queue"""
        dlq_message = {
            'original_message': message['Body'],
            'error': error,
            'timestamp': datetime.utcnow().isoformat(),
            'retry_count': self.config.max_retries,
            'queue_name': self.config.queue_name
        }

        self.sqs.send_message(
            QueueUrl=self.dlq_url,
            MessageBody=json.dumps(dlq_message, default=str)
        )

        # Delete from main queue
        self.sqs.delete_message(
            QueueUrl=self.queue_url,
            ReceiptHandle=message['ReceiptHandle']
        )

    def _requeue_with_retry(self, message: Dict[str, Any], retry_count: int):
        """Requeue message with incremented retry count"""
        # Update retry count attribute
        attributes = message.get('MessageAttributes', {})
        attributes['RetryCount'] = {
            'StringValue': str(retry_count),
            'DataType': 'Number'
        }

        # Requeue with delay
        delay = min(retry_count * 10, 900)  # Max 15 minutes
        self.sqs.send_message(
            QueueUrl=self.queue_url,
            MessageBody=message['Body'],
            DelaySeconds=delay,
            MessageAttributes=attributes
        )

        # Delete original message
        self.sqs.delete_message(
            QueueUrl=self.queue_url,
            ReceiptHandle=message['ReceiptHandle']
        )

    def get_dlq_messages(self, max_messages: int = 10) -> list:
        """Get messages from dead letter queue"""
        response = self.sqs.receive_message(
            QueueUrl=self.dlq_url,
            MaxNumberOfMessages=max_messages,
            WaitTimeSeconds=5
        )
        return response.get('Messages', [])

    def retry_dlq_messages(self, handler: Callable, max_retries: int = 10):
        """Retry messages from DLQ"""
        messages = self.get_dlq_messages(max_retries)

        for message in messages:
            try:
                # Parse DLQ message
                dlq_message = json.loads(message['Body'])
                original_message = json.loads(dlq_message['original_message'])

                # Retry processing
                handler(original_message)

                # Delete from DLQ on success
                self.sqs.delete_message(
                    QueueUrl=self.dlq_url,
                    ReceiptHandle=message['ReceiptHandle']
                )

            except Exception as e:
                print(f"DLQ retry failed: {e}")

Fan-Out Pattern

# Fan-out message pattern
import boto3
import json
from typing import Dict, Any, List
from dataclasses import dataclass

@dataclass
class FanOutConfig:
    topic_arn: str
    subscription_arns: List[str]

class FanOutManager:
    """Fan-out message distribution"""

    def __init__(self):
        self.sns = boto3.client('sns')
        self.sqs = boto3.client('sqs')

    def create_topic(self, topic_name: str) -> str:
        """Create SNS topic"""
        response = self.sns.create_topic(Name=topic_name)
        return response['TopicArn']

    def subscribe_queue(self, topic_arn: str, queue_url: str) -> str:
        """Subscribe SQS queue to SNS topic"""
        response = self.sns.subscribe(
            TopicArn=topic_arn,
            Protocol='sqs',
            Endpoint=queue_url
        )
        return response['SubscriptionArn']

    def publish_to_fanout(self, topic_arn: str, message: Dict[str, Any],
                         message_attributes: Dict[str, str] = None):
        """Publish message to fan-out topic"""
        attributes = {}
        if message_attributes:
            for key, value in message_attributes.items():
                attributes[key] = {
                    'DataType': 'String',
                    'StringValue': value
                }

        self.sns.publish(
            TopicArn=topic_arn,
            Message=json.dumps(message, default=str),
            MessageAttributes=attributes
        )

    def setup_fanout(self, topic_name: str, 
                    queue_names: List[str]) -> FanOutConfig:
        """Setup complete fan-out pattern"""
        # Create topic
        topic_arn = self.create_topic(topic_name)

        # Create and subscribe queues
        subscription_arns = []
        for queue_name in queue_names:
            # Create queue
            queue_url = self.sqs.create_queue(
                QueueName=queue_name
            )['QueueUrl']

            # Subscribe to topic
            subscription_arn = self.subscribe_queue(topic_arn, queue_url)
            subscription_arns.append(subscription_arn)

        return FanOutConfig(
            topic_arn=topic_arn,
            subscription_arns=subscription_arns
        )

class ContentBasedRouter:
    """Content-based message routing"""

    def __init__(self):
        self.sns = boto3.client('sns')
        self.routes: Dict[str, str] = {}

    def add_route(self, pattern: str, topic_arn: str):
        """Add routing rule"""
        self.routes[pattern] = topic_arn

    def route_message(self, message: Dict[str, Any]) -> bool:
        """Route message based on content"""
        message_type = message.get('type', '')

        for pattern, topic_arn in self.routes.items():
            if pattern in message_type:
                self.sns.publish(
                    TopicArn=topic_arn,
                    Message=json.dumps(message, default=str)
                )
                return True

        return False

Saga Pattern

# Saga pattern for distributed transactions
from enum import Enum
from typing import Callable, List, Dict, Any
from dataclasses import dataclass
import asyncio

class SagaStatus(Enum):
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    COMPENSATING = "COMPENSATING"
    COMPENSATED = "COMPENSATED"

@dataclass
class SagaStep:
    name: str
    execute: Callable
    compensate: Callable

class SagaOrchestrator:
    """Saga orchestrator for distributed transactions"""

    def __init__(self):
        self.steps: List[SagaStep] = []
        self.states: Dict[str, SagaStatus] = {}

    def add_step(self, step: SagaStep):
        """Add saga step"""
        self.steps.append(step)

    async def execute(self, saga_id: str, initial_data: Dict[str, Any]) -> Dict[str, Any]:
        """Execute saga"""
        self.states[saga_id] = SagaStatus.RUNNING

        try:
            data = initial_data

            for i, step in enumerate(self.steps):
                try:
                    data = await step.execute(data)
                except Exception as e:
                    # Compensate in reverse order
                    self.states[saga_id] = SagaStatus.COMPENSATING

                    for j in range(i - 1, -1, -1):
                        try:
                            await self.steps[j].compensate(data)
                        except Exception as comp_error:
                            print(f"Compensation failed for {self.steps[j].name}: {comp_error}")

                    self.states[saga_id] = SagaStatus.FAILED
                    return {'success': False, 'error': str(e)}

            self.states[saga_id] = SagaStatus.COMPLETED
            return {'success': True, 'data': data}

        except Exception as e:
            self.states[saga_id] = SagaStatus.FAILED
            return {'success': False, 'error': str(e)}

class SagaState:
    """Saga state management"""

    def __init__(self):
        self.states: Dict[str, Dict[str, Any]] = {}

    def save_state(self, saga_id: str, state: Dict[str, Any]):
        """Save saga state"""
        self.states[saga_id] = state

    def get_state(self, saga_id: str) -> Dict[str, Any]:
        """Get saga state"""
        return self.states.get(saga_id, {})

    def delete_state(self, saga_id: str):
        """Delete saga state"""
        if saga_id in self.states:
            del self.states[saga_id]

# Example saga implementation
async def create_order(data: Dict[str, Any]) -> Dict[str, Any]:
    """Create order step"""
    order_id = f"order-{len(data.get('orders', [])) + 1}"
    data.setdefault('orders', []).append(order_id)
    return data

async def reserve_inventory(data: Dict[str, Any]) -> Dict[str, Any]:
    """Reserve inventory step"""
    data['inventory_reserved'] = True
    return data

async def process_payment(data: Dict[str, Any]) -> Dict[str, Any]:
    """Process payment step"""
    data['payment_processed'] = True
    return data

async def compensate_order(data: Dict[str, Any]) -> Dict[str, Any]:
    """Compensate order step"""
    data.setdefault('orders', []).pop()
    return data

async def compensate_inventory(data: Dict[str, Any]) -> Dict[str, Any]:
    """Compensate inventory step"""
    data['inventory_reserved'] = False
    return data

async def compensate_payment(data: Dict[str, Any]) -> Dict[str, Any]:
    """Compensate payment step"""
    data['payment_processed'] = False
    return data

# Create saga
saga = SagaOrchestrator()
saga.add_step(SagaStep("CreateOrder", create_order, compensate_order))
saga.add_step(SagaStep("ReserveInventory", reserve_inventory, compensate_inventory))
saga.add_step(SagaStep("ProcessPayment", process_payment, compensate_payment))

โš ๏ธSaga Pattern

The saga pattern manages distributed transactions by breaking them into local transactions with compensating actions. Ensure all steps are idempotent.

Message Queue Monitoring

# Queue monitoring and alerting
import boto3
from typing import Dict, Any
from datetime import datetime, timedelta

class QueueMonitor:
    """Message queue monitoring"""

    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')
        self.sqs = boto3.client('sqs')

    def get_queue_metrics(self, queue_url: str) -> Dict[str, Any]:
        """Get queue metrics"""
        queue_name = queue_url.split('/')[-1]

        # Get approximate number of messages
        response = self.sqs.get_queue_attributes(
            QueueUrl=queue_url,
            AttributeNames=[
                'ApproximateNumberOfMessages',
                'ApproximateNumberOfMessagesNotVisible',
                'ApproximateAgeOfOldestMessage'
            ]
        )

        attributes = response['Attributes']

        return {
            'visible_messages': int(attributes.get('ApproximateNumberOfMessages', 0)),
            'in_flight_messages': int(attributes.get('ApproximateNumberOfMessagesNotVisible', 0)),
            'oldest_message_age': int(attributes.get('ApproximateAgeOfOldestMessage', 0))
        }

    def create_queue_alarm(self, queue_name: str, metric_name: str,
                          threshold: float, alarm_name: str):
        """Create CloudWatch alarm for queue"""
        self.cloudwatch.put_metric_alarm(
            AlarmName=alarm_name,
            AlarmDescription=f'Alarm for {metric_name} on {queue_name}',
            MetricName=metric_name,
            Namespace='AWS/SQS',
            Statistic='Average',
            Period=300,
            EvaluationPeriods=3,
            Threshold=threshold,
            ComparisonOperator='GreaterThanThreshold',
            Dimensions=[
                {
                    'Name': 'QueueName',
                    'Value': queue_name
                }
            ],
            AlarmActions=[],
            OKActions=[]
        )

    def monitor_dlq(self, dlq_url: str, threshold: int = 100):
        """Monitor dead letter queue"""
        metrics = self.get_queue_metrics(dlq_url)

        if metrics['visible_messages'] > threshold:
            self._send_dlq_alert(dlq_url, metrics)

    def _send_dlq_alert(self, dlq_url: str, metrics: Dict[str, Any]):
        """Send DLQ alert"""
        sns = boto3.client('sns')
        sns.publish(
            TopicArn='arn:aws:sns:us-east-1:123456789012:alerts',
            Message=f"DLQ threshold exceeded: {metrics['visible_messages']} messages",
            Subject='DLQ Alert'
        )

โœ…Message Queue Benefits

Message queues provide decoupling, reliability, and scalability. Use dead letter queues for error handling and saga pattern for distributed transactions.

Summary

PatternPurposeUse Case
Point-to-PointSingle consumerTask processing
Publish-SubscribeMultiple consumersEvent broadcasting
Dead LetterError handlingFailed message processing
Fan-OutMessage distributionMultiple downstream services
SagaDistributed transactionsMulti-service workflows

Advertisement