Message Queue Patterns: Dead Letter, Fan-Out, Saga
Difficulty: Senior Level | Companies: Netflix, Uber, Amazon, Microsoft, RabbitMQ
Interview Question
"Design a message queue system that handles 1 million messages per second with guaranteed delivery, dead letter handling, and exactly-once processing."
Key Concepts
This question tests your understanding of message queue patterns, delivery guarantees, and distributed systems.
Complete Message Queue Architecture
Architecture Overview
MESSAGE QUEUE ARCHITECTURE

PRODUCERS: Web Apps | Microservices | IoT Devices
    |
    v
MESSAGE BROKER (Kafka / RabbitMQ / SQS)
    Topic 1 -> Queue
    Topic 2 -> Queue
    Topic 3 -> Queue
    |
    v
CONSUMERS: Consumer 1 | Consumer 2 | Consumer 3
    |
    v
DEAD LETTER: DLQ | Retry Queue | Alerting
Mathematical Foundation: Queue Metrics
Throughput Calculation:
- Messages per second: M = 1,000,000
- Average message size: S = 1KB
- Total throughput: T = M × S = 1GB/s
- Queue capacity needed: C = T × retention_period
- With 24-hour retention: C = 1GB/s × 86400s = 86.4TB
Latency Budget:
- End-to-end latency: L_total = L_pb + L_bp + L_cp + L_net = 100ms
- Producer to broker: L_pb = 10ms
- Broker processing: L_bp = 20ms
- Consumer processing: L_cp = 50ms
- Network overhead: L_net = 20ms
Dead Letter Rate:
- Messages per second: M = 1,000,000
- Failure rate: F = 1%
- DLQ messages: D = M × F = 10,000 messages/second
- DLQ storage needed: S_dlq = D × message_size × retention
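The throughput, storage, latency and DLQ figures above are plain products and sums, so a small calculator makes it easy to re-run them for other inputs. This sketch assumes decimal units (1KB = 1,000 bytes), which is what makes 1,000,000 × 1KB come out to exactly 1GB/s; `QueueSizing` and `latency_total_ms` are illustrative names, not part of any library.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class QueueSizing:
    messages_per_second: int = 1_000_000   # M
    message_size_bytes: int = 1_000        # S = 1KB (decimal units assumed)
    retention_seconds: int = 86_400        # 24-hour retention
    failure_rate: float = 0.01             # F = 1%

    def throughput_bytes_per_second(self) -> int:
        # T = M × S
        return self.messages_per_second * self.message_size_bytes

    def capacity_bytes(self) -> int:
        # C = T × retention_period
        return self.throughput_bytes_per_second() * self.retention_seconds

    def dlq_messages_per_second(self) -> float:
        # D = M × F
        return self.messages_per_second * self.failure_rate

    def dlq_storage_bytes(self, dlq_retention_seconds: int) -> int:
        # S_dlq = D × message_size × retention
        return round(self.dlq_messages_per_second() * self.message_size_bytes
                     * dlq_retention_seconds)


def latency_total_ms(budget_ms: Dict[str, int]) -> int:
    # L_total = L_pb + L_bp + L_cp + L_net
    return sum(budget_ms.values())


sizing = QueueSizing()
print(sizing.throughput_bytes_per_second() / 1e9, "GB/s")   # 1.0 GB/s
print(sizing.capacity_bytes() / 1e12, "TB")                 # 86.4 TB
print(sizing.dlq_messages_per_second(), "DLQ messages/s")   # 10000.0 DLQ messages/s
print(latency_total_ms({"L_pb": 10, "L_bp": 20, "L_cp": 50, "L_net": 20}), "ms")  # 100 ms
```

With the 14-day retention used by `QueueConfig` below (1,209,600 s), `dlq_storage_bytes` gives 12,096,000,000,000 bytes, roughly 12.1TB of DLQ storage at a sustained 1% failure rate.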
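Exactly-Once Semantics:
- Delivery is at-least-once; exactly-once processing comes from deduplicating on an idempotency key
- Idempotency key: UUID per message (K = 16 bytes in binary form)
- Deduplication window: W = 5 minutes = 300s
- Storage for deduplication: S_dedup = M × K × W = 1,000,000 × 16B × 300s = 4.8GB (keys only)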
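A minimal single-process sketch of consumer-side deduplication over a sliding window W. `DedupWindow` and `dedup_storage_bytes` are hypothetical names; the in-memory `OrderedDict` stands in for the shared store a real fleet of consumers would need. A production version should also record the key in the same transaction as the handler's side effect, because a crash between the two would otherwise let a duplicate through. The 5-minute window matches the deduplication interval of SQS FIFO queues.

```python
import time
from collections import OrderedDict
from typing import Any, Callable


class DedupWindow:
    """Hypothetical helper: remembers idempotency keys for `window_seconds`."""

    def __init__(self, window_seconds: float = 300.0,
                 clock: Callable[[], float] = time.monotonic):
        self.window = window_seconds
        self.clock = clock
        self._seen: OrderedDict = OrderedDict()  # key -> time first processed

    def _evict_expired(self, now: float) -> None:
        # Keys are appended in time order, so the oldest ones sit at the front
        while self._seen:
            oldest_key = next(iter(self._seen))
            if now - self._seen[oldest_key] < self.window:
                break
            self._seen.popitem(last=False)

    def process_once(self, key: str, handler: Callable[[], Any]) -> bool:
        """Run handler unless key was processed inside the window; True if it ran."""
        now = self.clock()
        self._evict_expired(now)
        if key in self._seen:
            return False  # duplicate delivery: skip the side effect
        handler()  # if this raises, the key is not recorded and a redelivery can retry
        self._seen[key] = now
        return True


def dedup_storage_bytes(messages_per_second: int, key_bytes: int,
                        window_seconds: int) -> int:
    # S_dedup = M × K × W (keys only, ignoring per-entry overhead)
    return messages_per_second * key_bytes * window_seconds


print(dedup_storage_bytes(1_000_000, 16, 300) / 1e9, "GB")  # 4.8 GB
```

Injecting `clock` keeps the window testable: a fake clock can jump past W to show that an expired key is processed again.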
Dead Letter Queue Implementation
# Dead letter queue with retry logic
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Callable, Dict, List

import boto3


@dataclass
class QueueConfig:
    queue_name: str
    dead_letter_queue_name: str
    max_retries: int = 3
    visibility_timeout: int = 30
    message_retention: int = 1209600  # 14 days (the SQS maximum)


class DeadLetterQueueManager:
    """Dead letter queue manager with application-level retry logic"""

    def __init__(self, config: QueueConfig):
        self.config = config
        self.sqs = boto3.client('sqs')
        # Create (or look up) both queues
        self.queue_url = self._create_queue(config.queue_name)
        self.dlq_url = self._create_queue(config.dead_letter_queue_name)

    def _create_queue(self, queue_name: str) -> str:
        """Create an SQS queue with long polling enabled"""
        # SQS can also dead-letter natively via the RedrivePolicy attribute
        # (deadLetterTargetArn, maxReceiveCount); this class does it by hand
        # so it can attach the error text to each dead-lettered message.
        response = self.sqs.create_queue(
            QueueName=queue_name,
            Attributes={
                'VisibilityTimeout': str(self.config.visibility_timeout),
                'MessageRetentionPeriod': str(self.config.message_retention),
                'ReceiveMessageWaitTimeSeconds': '20'
            }
        )
        return response['QueueUrl']

    def send_message(self, message: Dict[str, Any],
                     delay_seconds: int = 0) -> str:
        """Send message to queue"""
        response = self.sqs.send_message(
            QueueUrl=self.queue_url,
            MessageBody=json.dumps(message, default=str),
            DelaySeconds=delay_seconds
        )
        return response['MessageId']

    def receive_messages(self, max_messages: int = 10) -> List[Dict[str, Any]]:
        """Receive up to 10 messages (the SQS per-call maximum) with long polling"""
        response = self.sqs.receive_message(
            QueueUrl=self.queue_url,
            MaxNumberOfMessages=min(max_messages, 10),
            WaitTimeSeconds=20,
            MessageAttributeNames=['All'],
            VisibilityTimeout=self.config.visibility_timeout
        )
        return response.get('Messages', [])

    def process_message(self, message: Dict[str, Any],
                        handler: Callable[[Dict[str, Any]], None]) -> bool:
        """Process one received SQS message; the handler gets the decoded body"""
        try:
            handler(json.loads(message['Body']))
            # Delete only after the handler succeeds (at-least-once delivery)
            self.sqs.delete_message(
                QueueUrl=self.queue_url,
                ReceiptHandle=message['ReceiptHandle']
            )
            return True
        except Exception as e:
            # The retry count travels with the message as a custom attribute
            retry_count = int(
                message.get('MessageAttributes', {})
                .get('RetryCount', {})
                .get('StringValue', '0')
            )
            if retry_count >= self.config.max_retries:
                self._move_to_dlq(message, str(e))
            else:
                self._requeue_with_retry(message, retry_count + 1)
            return False

    def _move_to_dlq(self, message: Dict[str, Any], error: str):
        """Move message to the dead letter queue with failure metadata"""
        dlq_message = {
            'original_message': message['Body'],
            'error': error,
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'retry_count': self.config.max_retries,
            'queue_name': self.config.queue_name
        }
        self.sqs.send_message(
            QueueUrl=self.dlq_url,
            MessageBody=json.dumps(dlq_message, default=str)
        )
        # Delete from the main queue only after the DLQ write succeeded
        self.sqs.delete_message(
            QueueUrl=self.queue_url,
            ReceiptHandle=message['ReceiptHandle']
        )

    def _requeue_with_retry(self, message: Dict[str, Any], retry_count: int):
        """Requeue message with an incremented retry count and a delay"""
        # Copy only the fields send_message needs; received attributes may
        # carry extra keys that should not be echoed back
        attributes = {
            name: {k: v for k, v in attr.items()
                   if k in ('DataType', 'StringValue', 'BinaryValue')}
            for name, attr in message.get('MessageAttributes', {}).items()
        }
        attributes['RetryCount'] = {
            'StringValue': str(retry_count),
            'DataType': 'Number'
        }
        # Linear backoff, capped at 900 seconds (the SQS DelaySeconds maximum)
        delay = min(retry_count * 10, 900)
        self.sqs.send_message(
            QueueUrl=self.queue_url,
            MessageBody=message['Body'],
            DelaySeconds=delay,
            MessageAttributes=attributes
        )
        # Delete the original; if this call fails the message is redelivered,
        # so handlers must tolerate duplicates
        self.sqs.delete_message(
            QueueUrl=self.queue_url,
            ReceiptHandle=message['ReceiptHandle']
        )

    def get_dlq_messages(self, max_messages: int = 10) -> List[Dict[str, Any]]:
        """Get messages from the dead letter queue"""
        response = self.sqs.receive_message(
            QueueUrl=self.dlq_url,
            MaxNumberOfMessages=min(max_messages, 10),
            WaitTimeSeconds=5
        )
        return response.get('Messages', [])

    def retry_dlq_messages(self, handler: Callable[[Dict[str, Any]], None],
                           max_messages: int = 10):
        """Replay messages from the DLQ (e.g. after a bug fix is deployed)"""
        for message in self.get_dlq_messages(max_messages):
            try:
                # Unwrap the DLQ envelope to recover the original body
                dlq_message = json.loads(message['Body'])
                original_message = json.loads(dlq_message['original_message'])
                handler(original_message)
                # Delete from the DLQ on success
                self.sqs.delete_message(
                    QueueUrl=self.dlq_url,
                    ReceiptHandle=message['ReceiptHandle']
                )
            except Exception as e:
                # The message stays in the DLQ and becomes visible again later
                print(f"DLQ retry failed: {e}")
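Exercising `DeadLetterQueueManager` needs live SQS queues. The sketch below replays the same retry accounting in memory, so the decision logic can be unit-tested without AWS; `simulate_delivery` and `RetryOutcome` are hypothetical helpers. With `max_retries=3`, a message that always fails is attempted four times, requeued with 10s, 20s and 30s delays, and then dead-lettered.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class RetryOutcome:
    delivered: bool = False
    dead_lettered: bool = False
    attempts: int = 0
    delays: List[int] = field(default_factory=list)  # delay before each requeued attempt
    last_error: str = ""


def backoff_delay(retry_count: int) -> int:
    # Same schedule as _requeue_with_retry: 10s per retry, capped at 900s
    return min(retry_count * 10, 900)


def simulate_delivery(body: Dict[str, Any],
                      handler: Callable[[Dict[str, Any]], None],
                      max_retries: int = 3) -> RetryOutcome:
    """Replay process_message's retry/DLQ decisions without a broker."""
    outcome = RetryOutcome()
    retry_count = 0  # a fresh message has no RetryCount attribute
    while True:
        outcome.attempts += 1
        try:
            handler(body)
            outcome.delivered = True
            return outcome
        except Exception as exc:
            outcome.last_error = str(exc)
            if retry_count >= max_retries:
                outcome.dead_lettered = True  # where _move_to_dlq would run
                return outcome
            retry_count += 1
            outcome.delays.append(backoff_delay(retry_count))
```

The linear schedule mirrors the code above; exponential backoff (for example, doubling the delay on each retry) is a common alternative when failures come from an overloaded downstream dependency.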
Fan-Out Pattern
# Fan-out message pattern
import json
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import boto3


@dataclass
class FanOutConfig:
    topic_arn: str
    subscription_arns: List[str]


class FanOutManager:
    """Fan-out message distribution (one SNS topic -> many SQS queues)"""

    def __init__(self):
        self.sns = boto3.client('sns')
        self.sqs = boto3.client('sqs')

    def create_topic(self, topic_name: str) -> str:
        """Create SNS topic"""
        response = self.sns.create_topic(Name=topic_name)
        return response['TopicArn']

    def subscribe_queue(self, topic_arn: str, queue_url: str) -> str:
        """Subscribe SQS queue to SNS topic"""
        # SNS needs the queue ARN as the endpoint, not the queue URL
        queue_arn = self.sqs.get_queue_attributes(
            QueueUrl=queue_url,
            AttributeNames=['QueueArn']
        )['Attributes']['QueueArn']
        # Without this queue policy SNS is not allowed to deliver into the queue
        policy = {
            'Version': '2012-10-17',
            'Statement': [{
                'Effect': 'Allow',
                'Principal': {'Service': 'sns.amazonaws.com'},
                'Action': 'sqs:SendMessage',
                'Resource': queue_arn,
                'Condition': {'ArnEquals': {'aws:SourceArn': topic_arn}}
            }]
        }
        self.sqs.set_queue_attributes(
            QueueUrl=queue_url,
            Attributes={'Policy': json.dumps(policy)}
        )
        response = self.sns.subscribe(
            TopicArn=topic_arn,
            Protocol='sqs',
            Endpoint=queue_arn
        )
        return response['SubscriptionArn']

    def publish_to_fanout(self, topic_arn: str, message: Dict[str, Any],
                          message_attributes: Optional[Dict[str, str]] = None):
        """Publish once; SNS delivers a copy to every subscribed queue"""
        attributes = {}
        if message_attributes:
            for key, value in message_attributes.items():
                attributes[key] = {
                    'DataType': 'String',
                    'StringValue': value
                }
        self.sns.publish(
            TopicArn=topic_arn,
            Message=json.dumps(message, default=str),
            MessageAttributes=attributes
        )

    def setup_fanout(self, topic_name: str,
                     queue_names: List[str]) -> FanOutConfig:
        """Set up the complete fan-out pattern"""
        topic_arn = self.create_topic(topic_name)
        # One queue per downstream consumer, each subscribed to the topic
        subscription_arns = []
        for queue_name in queue_names:
            queue_url = self.sqs.create_queue(
                QueueName=queue_name
            )['QueueUrl']
            subscription_arns.append(self.subscribe_queue(topic_arn, queue_url))
        return FanOutConfig(
            topic_arn=topic_arn,
            subscription_arns=subscription_arns
        )


class ContentBasedRouter:
    """Content-based message routing"""

    def __init__(self):
        self.sns = boto3.client('sns')
        self.routes: Dict[str, str] = {}

    def add_route(self, pattern: str, topic_arn: str):
        """Add routing rule (rules are checked in insertion order)"""
        self.routes[pattern] = topic_arn

    def route_message(self, message: Dict[str, Any]) -> bool:
        """Route message to the first topic whose pattern occurs in its 'type'"""
        message_type = message.get('type', '')
        for pattern, topic_arn in self.routes.items():
            # Substring match: 'order' also matches 'order_cancelled'
            if pattern in message_type:
                self.sns.publish(
                    TopicArn=topic_arn,
                    Message=json.dumps(message, default=str)
                )
                return True
        # No route matched; callers may want to dead-letter this message
        return False
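The SNS/SQS code above needs AWS credentials to try out. This broker-free sketch (a hypothetical `InMemoryTopic` class, not an AWS API) shows the delivery semantics fan-out relies on. Every subscriber queue gets its own independent copy of each message. An optional per-subscriber predicate plays the role that SNS subscription filter policies play in AWS.

```python
import copy
from collections import deque
from typing import Any, Callable, Dict, Optional

Message = Dict[str, Any]


class InMemoryTopic:
    """Stand-in for an SNS topic: each matching subscriber queue gets a copy."""

    def __init__(self) -> None:
        self.queues: Dict[str, deque] = {}
        self.filters: Dict[str, Optional[Callable[[Message], bool]]] = {}

    def subscribe(self, queue_name: str,
                  accept: Optional[Callable[[Message], bool]] = None) -> deque:
        self.queues[queue_name] = deque()
        self.filters[queue_name] = accept
        return self.queues[queue_name]

    def publish(self, message: Message) -> int:
        """Deliver to every subscriber whose filter accepts; return the count."""
        delivered = 0
        for name, queue in self.queues.items():
            accept = self.filters[name]
            if accept is None or accept(message):
                # Deep copy so one consumer mutating its message cannot affect another
                queue.append(copy.deepcopy(message))
                delivered += 1
        return delivered


topic = InMemoryTopic()
billing = topic.subscribe("billing")  # receives everything
shipping = topic.subscribe("shipping", accept=lambda m: m.get("type") == "order_created")
topic.publish({"type": "order_created", "id": 1})   # reaches billing and shipping
topic.publish({"type": "refund_issued", "id": 2})   # reaches billing only
```

Unlike `ContentBasedRouter`, which stops at the first matching route, fan-out delivers to every matching subscriber, so adding a new consumer never changes what existing consumers receive.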
Saga Pattern
# Saga pattern for distributed transactions
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Any, Awaitable, Callable, Dict, List

# Each step and compensation is an async function: data dict in, data dict out
StepFn = Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]


class SagaStatus(Enum):
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    COMPENSATING = "COMPENSATING"
    COMPENSATED = "COMPENSATED"


@dataclass
class SagaStep:
    name: str
    execute: StepFn
    compensate: StepFn


class SagaOrchestrator:
    """Saga orchestrator for distributed transactions"""

    def __init__(self):
        self.steps: List[SagaStep] = []
        self.states: Dict[str, SagaStatus] = {}

    def add_step(self, step: SagaStep):
        """Add saga step"""
        self.steps.append(step)

    async def execute(self, saga_id: str, initial_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run steps in order; on failure, compensate completed steps in reverse"""
        self.states[saga_id] = SagaStatus.RUNNING
        data = initial_data
        for i, step in enumerate(self.steps):
            try:
                data = await step.execute(data)
            except Exception as e:
                self.states[saga_id] = SagaStatus.COMPENSATING
                compensation_failed = False
                # Step i never completed, so only steps 0..i-1 are undone
                for j in range(i - 1, -1, -1):
                    try:
                        await self.steps[j].compensate(data)
                    except Exception as comp_error:
                        compensation_failed = True
                        print(f"Compensation failed for {self.steps[j].name}: {comp_error}")
                # COMPENSATED: rolled back cleanly; FAILED: needs manual repair
                self.states[saga_id] = (SagaStatus.FAILED if compensation_failed
                                        else SagaStatus.COMPENSATED)
                return {'success': False, 'error': str(e), 'failed_step': step.name}
        self.states[saga_id] = SagaStatus.COMPLETED
        return {'success': True, 'data': data}


class SagaState:
    """Saga state management (in memory only; production needs a durable store)"""

    def __init__(self):
        self.states: Dict[str, Dict[str, Any]] = {}

    def save_state(self, saga_id: str, state: Dict[str, Any]):
        """Save saga state"""
        self.states[saga_id] = state

    def get_state(self, saga_id: str) -> Dict[str, Any]:
        """Get saga state"""
        return self.states.get(saga_id, {})

    def delete_state(self, saga_id: str):
        """Delete saga state"""
        self.states.pop(saga_id, None)


# Example saga implementation
async def create_order(data: Dict[str, Any]) -> Dict[str, Any]:
    """Create order step"""
    order_id = f"order-{len(data.get('orders', [])) + 1}"
    data.setdefault('orders', []).append(order_id)
    return data


async def reserve_inventory(data: Dict[str, Any]) -> Dict[str, Any]:
    """Reserve inventory step"""
    data['inventory_reserved'] = True
    return data


async def process_payment(data: Dict[str, Any]) -> Dict[str, Any]:
    """Process payment step"""
    data['payment_processed'] = True
    return data


async def compensate_order(data: Dict[str, Any]) -> Dict[str, Any]:
    """Compensate order step (safe to call when no order exists)"""
    orders = data.get('orders', [])
    if orders:
        orders.pop()
    return data


async def compensate_inventory(data: Dict[str, Any]) -> Dict[str, Any]:
    """Compensate inventory step"""
    data['inventory_reserved'] = False
    return data


async def compensate_payment(data: Dict[str, Any]) -> Dict[str, Any]:
    """Compensate payment step (only runs if a step is added after ProcessPayment)"""
    data['payment_processed'] = False
    return data


# Create saga
saga = SagaOrchestrator()
saga.add_step(SagaStep("CreateOrder", create_order, compensate_order))
saga.add_step(SagaStep("ReserveInventory", reserve_inventory, compensate_inventory))
saga.add_step(SagaStep("ProcessPayment", process_payment, compensate_payment))

# Run it
result = asyncio.run(saga.execute("saga-1", {}))
print(result)
# {'success': True, 'data': {'orders': ['order-1'], 'inventory_reserved': True, 'payment_processed': True}}
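`SagaState` keeps progress in a Python dict, so if the orchestrator crashes mid-saga it forgets which steps completed. Below is a minimal sketch of a durable saga log. It assumes SQLite as a stand-in for whatever durable store you use and synchronous steps for brevity; `DurableSagaLog`, `run_saga` and `compensate` are hypothetical names. Each completed step is committed before the next one begins, so a restarted process can list unfinished sagas and compensate them in reverse order.

```python
import sqlite3
from typing import Any, Callable, Dict, List, Tuple

# (name, execute, compensate); synchronous callables keep the sketch short
Step = Tuple[str, Callable[[Dict[str, Any]], None], Callable[[Dict[str, Any]], None]]


class DurableSagaLog:
    """Hypothetical saga log stored in SQLite so progress survives a restart."""

    def __init__(self, path: str = ":memory:"):
        self.db = sqlite3.connect(path)
        with self.db:
            self.db.execute(
                "CREATE TABLE IF NOT EXISTS saga_log ("
                "saga_id TEXT, seq INTEGER, step TEXT, PRIMARY KEY (saga_id, seq))"
            )

    def record(self, saga_id: str, seq: int, step: str) -> None:
        # Commit right away so a completed step is durable before the next one starts
        with self.db:
            self.db.execute("INSERT OR IGNORE INTO saga_log VALUES (?, ?, ?)",
                            (saga_id, seq, step))

    def completed(self, saga_id: str) -> List[str]:
        rows = self.db.execute(
            "SELECT step FROM saga_log WHERE saga_id = ? ORDER BY seq", (saga_id,))
        return [row[0] for row in rows]

    def pending_sagas(self) -> List[str]:
        # Any saga that still has log rows did not finish cleanly
        return [row[0] for row in self.db.execute("SELECT DISTINCT saga_id FROM saga_log")]

    def clear(self, saga_id: str) -> None:
        with self.db:
            self.db.execute("DELETE FROM saga_log WHERE saga_id = ?", (saga_id,))


def compensate(log: DurableSagaLog, saga_id: str, steps: List[Step],
               data: Dict[str, Any]) -> List[str]:
    """Undo every logged step in reverse order; also usable for crash recovery."""
    undo = {name: comp for name, _, comp in steps}
    undone = []
    for name in reversed(log.completed(saga_id)):
        undo[name](data)  # may re-run after a crash, so compensations must be idempotent
        undone.append(name)
    log.clear(saga_id)
    return undone


def run_saga(log: DurableSagaLog, saga_id: str, steps: List[Step],
             data: Dict[str, Any]) -> bool:
    for seq, (name, execute, _) in enumerate(steps):
        try:
            execute(data)
        except Exception:
            compensate(log, saga_id, steps, data)
            return False
        log.record(saga_id, seq, name)
    log.clear(saga_id)  # finished: nothing left to recover
    return True
```

On startup, a recovering process would call `compensate` for each ID in `pending_sagas()`. A step that succeeds but crashes before its log row is written is not covered by this sketch; closing that gap needs steps that can be safely retried.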
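Saga Pattern
The saga pattern manages a distributed transaction as a sequence of local transactions, each paired with a compensating action that undoes it if a later step fails. Make every step and every compensation idempotent: messages are delivered at least once, so any command may arrive twice.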
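One common way to make a step idempotent is to key its side effect by the saga ID, so a redelivered command or compensation becomes a no-op. Here is a sketch using a hypothetical in-memory `InventoryService`:

```python
from typing import Dict, Tuple


class InventoryService:
    """Hypothetical inventory service with an idempotent step and compensation."""

    def __init__(self, stock: Dict[str, int]):
        self.stock = dict(stock)
        self.reservations: Dict[str, Tuple[str, int]] = {}  # saga_id -> (sku, qty)

    def reserve(self, saga_id: str, sku: str, qty: int) -> None:
        if saga_id in self.reservations:
            return  # redelivered command: already reserved for this saga
        if self.stock.get(sku, 0) < qty:
            raise ValueError(f"insufficient stock for {sku}")
        self.stock[sku] -= qty
        self.reservations[saga_id] = (sku, qty)

    def release(self, saga_id: str) -> None:
        reservation = self.reservations.pop(saga_id, None)
        if reservation is None:
            return  # never reserved, or already released
        sku, qty = reservation
        self.stock[sku] += qty
```

Because `release` checks the reservation record rather than blindly adding stock back, a compensation that arrives twice, or arrives for a step that never ran, leaves the inventory unchanged.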
Message Queue Monitoring
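# Queue monitoring and alerting
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

import boto3


class QueueMonitor:
    """Message queue monitoring"""

    def __init__(self, alert_topic_arn: str = 'arn:aws:sns:us-east-1:123456789012:alerts'):
        # Placeholder ARN with an example account ID; pass your real alerts topic
        self.cloudwatch = boto3.client('cloudwatch')
        self.sqs = boto3.client('sqs')
        self.sns = boto3.client('sns')
        self.alert_topic_arn = alert_topic_arn

    def get_queue_metrics(self, queue_url: str) -> Dict[str, Any]:
        """Get queue depth from SQS and oldest-message age from CloudWatch"""
        queue_name = queue_url.split('/')[-1]
        response = self.sqs.get_queue_attributes(
            QueueUrl=queue_url,
            AttributeNames=[
                'ApproximateNumberOfMessages',
                'ApproximateNumberOfMessagesNotVisible'
            ]
        )
        attributes = response['Attributes']
        return {
            'visible_messages': int(attributes.get('ApproximateNumberOfMessages', 0)),
            'in_flight_messages': int(attributes.get('ApproximateNumberOfMessagesNotVisible', 0)),
            'oldest_message_age': self._get_oldest_message_age(queue_name)
        }

    def _get_oldest_message_age(self, queue_name: str) -> int:
        """ApproximateAgeOfOldestMessage is a CloudWatch metric, not a queue attribute"""
        now = datetime.now(timezone.utc)
        stats = self.cloudwatch.get_metric_statistics(
            Namespace='AWS/SQS',
            MetricName='ApproximateAgeOfOldestMessage',
            Dimensions=[{'Name': 'QueueName', 'Value': queue_name}],
            StartTime=now - timedelta(minutes=10),
            EndTime=now,
            Period=300,
            Statistics=['Maximum']
        )
        points = stats.get('Datapoints', [])
        return int(max((p['Maximum'] for p in points), default=0))

    def create_queue_alarm(self, queue_name: str, metric_name: str,
                           threshold: float, alarm_name: str,
                           alarm_actions: Optional[List[str]] = None):
        """Create CloudWatch alarm for queue"""
        # CloudWatch metric names differ from queue attribute names,
        # e.g. ApproximateNumberOfMessagesVisible
        self.cloudwatch.put_metric_alarm(
            AlarmName=alarm_name,
            AlarmDescription=f'Alarm for {metric_name} on {queue_name}',
            MetricName=metric_name,
            Namespace='AWS/SQS',
            Statistic='Average',
            Period=300,
            EvaluationPeriods=3,
            Threshold=threshold,
            ComparisonOperator='GreaterThanThreshold',
            Dimensions=[{'Name': 'QueueName', 'Value': queue_name}],
            # Notify the alerts topic by default; an empty list creates a silent alarm
            AlarmActions=alarm_actions or [self.alert_topic_arn],
            OKActions=[]
        )

    def monitor_dlq(self, dlq_url: str, threshold: int = 100):
        """Alert when the DLQ backlog exceeds the threshold"""
        metrics = self.get_queue_metrics(dlq_url)
        if metrics['visible_messages'] > threshold:
            self._send_dlq_alert(dlq_url, metrics)

    def _send_dlq_alert(self, dlq_url: str, metrics: Dict[str, Any]):
        """Send DLQ alert"""
        self.sns.publish(
            TopicArn=self.alert_topic_arn,
            Message=f"DLQ threshold exceeded on {dlq_url}: {metrics['visible_messages']} messages",
            Subject='DLQ Alert'
        )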
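A threshold on a single number can mislead. A DLQ holding 50 messages that have sat for an hour still deserves attention, and a growing backlog only drains if consumers outpace producers. The following pure-Python checks run over the dict returned by `get_queue_metrics`; the function names and default thresholds are assumptions, not AWS defaults.

```python
import math
from typing import Any, Dict, List


def evaluate_queue_health(metrics: Dict[str, Any], max_visible: int = 100,
                          max_oldest_age_s: int = 300) -> List[str]:
    """Return alert reasons for a dict shaped like QueueMonitor.get_queue_metrics()."""
    reasons = []
    visible = metrics.get("visible_messages", 0)
    oldest = metrics.get("oldest_message_age", 0)
    if visible > max_visible:
        reasons.append(f"backlog {visible} > {max_visible} messages")
    if oldest > max_oldest_age_s:
        # Old messages mean consumers are stuck or too slow, even with a small backlog
        reasons.append(f"oldest message {oldest}s > {max_oldest_age_s}s")
    return reasons


def seconds_to_drain(backlog: int, consume_rate: float, produce_rate: float) -> float:
    """Time to empty the backlog; infinite when consumers cannot outpace producers."""
    net_rate = consume_rate - produce_rate
    if net_rate <= 0:
        return math.inf
    return backlog / net_rate
```

For example, a backlog of 10,000 messages with consumers at 1,500 msg/s and producers at 1,000 msg/s drains in 20 seconds; at equal rates it never drains, which is the condition that should page someone.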
Message Queue Benefits
Message queues decouple producers from consumers, absorb load spikes, and let each side scale independently. Use dead letter queues to isolate messages that keep failing, and the saga pattern to coordinate transactions that span multiple services.
Summary
| Pattern | Purpose | Use Case |
|---|---|---|
| Point-to-Point | Single consumer | Task processing |
| Publish-Subscribe | Multiple consumers | Event broadcasting |
| Dead Letter | Error handling | Failed message processing |
| Fan-Out | Message distribution | Multiple downstream services |
| Saga | Distributed transactions | Multi-service workflows |