Microservices Architecture: Service Mesh, gRPC, Event-Driven
Difficulty: Staff/Principal Level | Companies: Netflix, Uber, Amazon, Google, Meta
Interview Question
"Design a microservices architecture for an e-commerce platform with 50+ services. How do you handle service-to-service communication, distributed transactions, and observability?"
ℹ️ Key Concepts
This question tests your understanding of distributed systems, service mesh patterns, and event-driven architecture for large-scale microservices deployments.
Complete Microservices Architecture
Service Decomposition
┌──────────────────────────────────────────────────────────────┐
│            E-COMMERCE MICROSERVICES ARCHITECTURE             │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌──────────────────── GATEWAY LAYER ─────────────────────┐  │
│  │    API Gateway │ Auth Gateway │ Rate Limiter │ CDN     │  │
│  └───────────────────────────┬────────────────────────────┘  │
│                              ▼                               │
│  ┌───────────────────── SERVICE MESH ─────────────────────┐  │
│  │      Istio/Envoy Sidecar │ mTLS │ Load Balancing       │  │
│  └───────────────────────────┬────────────────────────────┘  │
│                              ▼                               │
│  ┌──────────────────── CORE SERVICES ─────────────────────┐  │
│  │                                                        │  │
│  │      ┌──────────┐    ┌──────────┐    ┌──────────┐      │  │
│  │      │   User   │    │ Product  │    │  Order   │      │  │
│  │      │ Service  │    │ Service  │    │ Service  │      │  │
│  │      └────┬─────┘    └────┬─────┘    └────┬─────┘      │  │
│  │           │               │               │            │  │
│  │      ┌────▼─────┐    ┌────▼─────┐    ┌────▼─────┐      │  │
│  │      │   Auth   │    │Inventory │    │ Payment  │      │  │
│  │      │ Service  │    │ Service  │    │ Service  │      │  │
│  │      └──────────┘    └──────────┘    └──────────┘      │  │
│  │                                                        │  │
│  └───────────────────────────┬────────────────────────────┘  │
│                              ▼                               │
│  ┌────────────────────── EVENT BUS ───────────────────────┐  │
│  │     Kafka │ EventBridge │ SNS/SQS │ Redis Streams      │  │
│  └───────────────────────────┬────────────────────────────┘  │
│                              ▼                               │
│  ┌────────────────────── DATA LAYER ──────────────────────┐  │
│  │      PostgreSQL │ MongoDB │ Redis │ DynamoDB │ S3      │  │
│  └────────────────────────────────────────────────────────┘  │
│                                                              │
└──────────────────────────────────────────────────────────────┘
Mathematical Foundation: Service Communication
Request Distribution Model:
- Total services: N = 50
- Average requests per service: R_avg = 1000 RPS
- Service-to-service calls: S = 3 (average fan-out)
- Total internal calls: C_internal = N × R_avg × S = 150,000 RPS
Latency Budget:
- Total latency budget: T_budget = 200ms
- Gateway overhead: T_gateway = 10ms
- Network latency per hop: T_network = 2ms
- Service processing time: T_service = 30ms
- Maximum hops: H_max = (T_budget - T_gateway) / (T_network + T_service)
- H_max = (200 - 10) / (2 + 30) = 190 / 32 ≈ 5.94, rounded down to 5 hops
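The request-distribution and hop-budget arithmetic above can be checked with a few lines of Python. The constants are the section's own example figures, not measurements.

```python
import math

# Example figures from the request distribution model above
N = 50          # total services
R_AVG = 1_000   # average requests per service (RPS)
S = 3           # average fan-out: downstream calls per request

c_internal = N * R_AVG * S
print(f"C_internal = {c_internal:,} RPS")  # C_internal = 150,000 RPS

# Latency budget, all values in milliseconds
T_BUDGET, T_GATEWAY, T_NETWORK, T_SERVICE = 200, 10, 2, 30

h_exact = (T_BUDGET - T_GATEWAY) / (T_NETWORK + T_SERVICE)
h_max = math.floor(h_exact)  # a partial hop cannot be used, so round down
print(f"H_max = {h_exact:.2f} -> {h_max} hops")  # H_max = 5.94 -> 5 hops
```

The formula treats hops as sequential. Calls that a service fans out in parallel overlap in time, so together they consume roughly one hop of the budget rather than one hop each.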
Consistency Model:
- Eventual consistency window: W = 5s
- Read-after-write consistency required: 99% of requests
- Consistency probability (chance that a read issued t seconds after a write observes it): P(t) = 1 - e^(-t/W)
- For t = 2s: P = 1 - e^(-2/5) = 1 - e^(-0.4) ≈ 0.330 = 33.0%
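Inverting the exponential model shows how far a 2 s wait is from the 99% read-after-write requirement. This sketch takes the model P(t) = 1 - e^(-t/W) as given; a real system's replication-lag distribution would have to be measured.

```python
import math

W = 5.0         # eventual consistency window (time constant), seconds
TARGET = 0.99   # required read-after-write probability

def p_consistent(t: float, w: float = W) -> float:
    """P(t) = 1 - e^(-t/W): chance a read at time t after the write sees it."""
    return 1 - math.exp(-t / w)

def time_to_reach(p: float, w: float = W) -> float:
    """Solve p = 1 - e^(-t/W) for t, giving t = -W * ln(1 - p)."""
    return -w * math.log(1 - p)

print(f"P(2s) = {p_consistent(2.0):.3f}")               # P(2s) = 0.330
print(f"t for 99% = {time_to_reach(TARGET):.1f} s")     # t for 99% = 23.0 s
```

Under this model a read would have to wait about 23 s to see the write with 99% probability, far beyond the 200 ms latency budget. Waiting for replication therefore cannot satisfy the requirement; such reads need another path, for example being served by the primary that accepted the write.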
Service Mesh Implementation
Istio Configuration
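# Istio VirtualService for traffic management
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: order-service
  namespace: ecommerce
spec:
  hosts:
  - order-service
  http:
  # Routes are evaluated in order and the first match wins.
  # 1) Requests with header "x-canary: true" always go to the canary subset
  - match:
    - headers:
        x-canary:
          exact: "true"
    route:
    - destination:
        host: order-service
        subset: canary
        port:
          number: 8080
    timeout: 10s
    retries:
      attempts: 3          # retries after the first try: 4 tries x 2s = 8s, inside the 10s timeout
      perTryTimeout: 2s
      retryOn: gateway-error,connect-failure,refused-stream
  # 2) All other traffic: a 90/10 split between destinations of ONE route
  #    (weights across a route's destinations should sum to 100; a separate
  #    route placed after this catch-all would never be reached)
  - route:
    - destination:
        host: order-service
        subset: stable
        port:
          number: 8080
      weight: 90
    - destination:
        host: order-service
        subset: canary
        port:
          number: 8080
      weight: 10
    timeout: 10s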
---
# DestinationRule for load balancing and circuit breaking
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: order-service
  namespace: ecommerce
spec:
  host: order-service
  trafficPolicy:
    connectionPool:
      tcp:
        maxConnections: 100
        connectTimeout: 5s
      http:
        h2UpgradePolicy: DEFAULT
        http1MaxPendingRequests: 100
        http2MaxRequests: 1000
        maxRequestsPerConnection: 10
        maxRetries: 3
    loadBalancer:
      simple: LEAST_REQUEST
    outlierDetection:
      consecutive5xxErrors: 5
      interval: 30s
      baseEjectionTime: 30s
      maxEjectionPercent: 50
  subsets:
  - name: stable
    labels:
      version: v1
  - name: canary
    labels:
      version: v2
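A simplified model of how the routing intent above is evaluated: HTTP routes are checked top to bottom, the first matching route handles the request, and a route's destination weights split the traffic it receives. This is an illustration in Python, not Istio or Envoy code; the route table mirrors the header match and the 90/10 split.

```python
import random
from typing import Callable, List, Optional, Tuple

# Each route is (match predicate, or None for "matches everything", [(subset, weight)])
Route = Tuple[Optional[Callable[[dict], bool]], List[Tuple[str, int]]]

ROUTES: List[Route] = [
    # Route 1: header x-canary: "true" -> canary only
    (lambda headers: headers.get("x-canary") == "true", [("canary", 100)]),
    # Route 2: no match clause -> 90/10 weighted split
    (None, [("stable", 90), ("canary", 10)]),
]

def pick_subset(headers: dict, rng: random.Random) -> str:
    """Evaluate routes top to bottom; the first matching route handles the request."""
    for match, destinations in ROUTES:
        if match is None or match(headers):
            subsets = [name for name, _ in destinations]
            weights = [weight for _, weight in destinations]
            return rng.choices(subsets, weights=weights, k=1)[0]
    raise LookupError("no route matched")

rng = random.Random(7)
assert pick_subset({"x-canary": "true"}, rng) == "canary"

picks = [pick_subset({}, rng) for _ in range(10_000)]
print(f"canary share without the header: {picks.count('canary') / len(picks):.1%}")
```

Because the first match wins, a route with no match clause catches every request that reaches it, so any route listed after it is never evaluated.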
mTLS Configuration
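# PeerAuthentication for mTLS: a policy without a selector in the root
# namespace (istio-system by default) enforces STRICT mTLS mesh-wide
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
  name: default
  namespace: istio-system
spec:
  mtls:
    mode: STRICT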
---
# AuthorizationPolicy for service-to-service auth
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: order-service-policy
  namespace: ecommerce
spec:
  selector:
    matchLabels:
      app: order-service
  action: ALLOW
  rules:
  - from:
    - source:
        principals:
        - "cluster.local/ns/ecommerce/sa/api-gateway"
        - "cluster.local/ns/ecommerce/sa/payment-service"
    to:
    - operation:
        methods: ["GET", "POST"]
        paths: ["/api/v1/orders/*"]    # prefix match
  - from:
    - source:
        principals:
        - "cluster.local/ns/ecommerce/sa/inventory-service"
    to:
    - operation:
        methods: ["PUT"]
        # "*" is only a prefix/suffix wildcard; a mid-path segment needs the
        # {*} path-template syntax supported by newer Istio releases
        paths: ["/api/v1/orders/{*}/inventory"]
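To make the ALLOW semantics concrete: once an ALLOW policy selects a workload, a request is accepted only if at least one rule matches its principal, method, and path, and everything else is denied. The sketch below models just those three fields (Istio supports many more). Istio's basic path forms are exact, prefix (`/x/*`), suffix (`*/x`), and presence (`*`); the sketch writes the inventory rule's mid-path wildcard as a `{*}` single-segment template, an assumption that matches the path-template syntax of newer Istio releases.

```python
from dataclasses import dataclass
from typing import List, Tuple

@dataclass(frozen=True)
class Rule:
    principals: Tuple[str, ...]
    methods: Tuple[str, ...]
    paths: Tuple[str, ...]

def path_matches(pattern: str, path: str) -> bool:
    if "{*}" in pattern:  # path template: {*} matches exactly one non-empty segment
        pat_parts, path_parts = pattern.split("/"), path.split("/")
        return len(pat_parts) == len(path_parts) and all(
            (p == "{*}" and s != "") or p == s for p, s in zip(pat_parts, path_parts)
        )
    if pattern == "*":
        return path != ""                     # presence match
    if pattern.endswith("*"):
        return path.startswith(pattern[:-1])  # prefix match
    if pattern.startswith("*"):
        return path.endswith(pattern[1:])     # suffix match
    return path == pattern                    # exact match

def is_allowed(rules: List[Rule], principal: str, method: str, path: str) -> bool:
    """With an ALLOW policy present, a request passes only if some rule matches."""
    return any(
        principal in rule.principals
        and method in rule.methods
        and any(path_matches(p, path) for p in rule.paths)
        for rule in rules
    )

SA = "cluster.local/ns/ecommerce/sa/"
RULES = [
    Rule((SA + "api-gateway", SA + "payment-service"), ("GET", "POST"), ("/api/v1/orders/*",)),
    Rule((SA + "inventory-service",), ("PUT",), ("/api/v1/orders/{*}/inventory",)),
]

print(is_allowed(RULES, SA + "inventory-service", "PUT", "/api/v1/orders/42/inventory"))  # True
print(is_allowed(RULES, SA + "inventory-service", "GET", "/api/v1/orders/42"))            # False
```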
gRPC Service Definition
// order_service.proto
syntax = "proto3";
package ecommerce.orders;
import "google/protobuf/timestamp.proto";
service OrderService {
  rpc CreateOrder(CreateOrderRequest) returns (Order);
  rpc GetOrder(GetOrderRequest) returns (Order);
  rpc UpdateOrder(UpdateOrderRequest) returns (Order);
  rpc ListOrders(ListOrdersRequest) returns (stream Order);
  rpc StreamOrderUpdates(StreamOrderRequest) returns (stream OrderUpdate);
}
message Order {
  string id = 1;
  string user_id = 2;
  repeated OrderItem items = 3;
  OrderStatus status = 4;
  google.protobuf.Timestamp created_at = 5;
  google.protobuf.Timestamp updated_at = 6;
  Money total = 7;
}
message OrderItem {
  string product_id = 1;
  int32 quantity = 2;
  Money price = 3;
}
message Money {
  int64 amount = 1;
  string currency = 2;
}
enum OrderStatus {
  ORDER_STATUS_UNSPECIFIED = 0;
  ORDER_STATUS_PENDING = 1;
  ORDER_STATUS_CONFIRMED = 2;
  ORDER_STATUS_PROCESSING = 3;
  ORDER_STATUS_SHIPPED = 4;
  ORDER_STATUS_DELIVERED = 5;
  ORDER_STATUS_CANCELLED = 6;
}
message CreateOrderRequest {
  string user_id = 1;
  repeated OrderItem items = 2;
  string idempotency_key = 3;
}
message GetOrderRequest {
  string order_id = 1;
}
message UpdateOrderRequest {
  string order_id = 1;
  OrderStatus status = 2;
  map<string, string> metadata = 3;
}
message ListOrdersRequest {
  string user_id = 1;
  int32 page_size = 2;
  string page_token = 3;
  OrderStatus status_filter = 4;
}
message StreamOrderRequest {
  string order_id = 1;
}
message OrderUpdate {
  string order_id = 1;
  OrderStatus status = 2;
  google.protobuf.Timestamp timestamp = 3;
  map<string, string> details = 4;
}
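# gRPC server implementation
# Stubs are generated from order_service.proto (requires grpcio-tools):
#   python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. order_service.proto
import threading
import time
import uuid
from concurrent import futures
from typing import Iterator
import grpc
import order_service_pb2 as pb2
import order_service_pb2_grpc as pb2_grpc
TERMINAL_STATUSES = {pb2.ORDER_STATUS_DELIVERED, pb2.ORDER_STATUS_CANCELLED}
class OrderServiceServicer(pb2_grpc.OrderServiceServicer):
    """In-memory gRPC service implementation (demo only; state is lost on restart).
    UpdateOrder and ListOrders fall back to the generated base class, which returns UNIMPLEMENTED."""
    def __init__(self):
        self.orders: dict[str, pb2.Order] = {}
        self.idempotency_keys: dict[str, str] = {}  # idempotency_key -> order_id
        self.lock = threading.Lock()  # handlers run concurrently on the thread pool
    def CreateOrder(self, request, context):
        """Create a new order; a retry with the same idempotency key returns the original order"""
        with self.lock:
            existing_order = self._check_idempotency(request.idempotency_key)
            if existing_order is not None:
                return existing_order
            order = pb2.Order(id=str(uuid.uuid4()), user_id=request.user_id,
                              status=pb2.ORDER_STATUS_PENDING)
            order.items.extend(request.items)
            order.created_at.GetCurrentTime()
            order.updated_at.CopyFrom(order.created_at)
            self.orders[order.id] = order
            if request.idempotency_key:
                self.idempotency_keys[request.idempotency_key] = order.id
            return order
    def GetOrder(self, request, context):
        """Get order by ID"""
        order = self.orders.get(request.order_id)
        if order is None:
            # abort() raises and ends the RPC with NOT_FOUND; returning None would fail serialization
            context.abort(grpc.StatusCode.NOT_FOUND, "Order not found")
        return order
    def StreamOrderUpdates(self, request, context) -> Iterator[pb2.OrderUpdate]:
        """Stream status changes until a terminal status is reached or the client disconnects"""
        last_status = None
        while context.is_active():
            order = self.orders.get(request.order_id)
            if order is None:
                context.abort(grpc.StatusCode.NOT_FOUND, "Order not found")
            if order.status != last_status:  # only send actual changes
                last_status = order.status
                update = pb2.OrderUpdate(order_id=order.id, status=order.status)
                update.timestamp.GetCurrentTime()
                yield update
            if order.status in TERMINAL_STATUSES:
                break
            time.sleep(1)  # simple polling for the demo; a real service would consume order events
    def _check_idempotency(self, idempotency_key: str):
        """Return the order already created with this key, or None"""
        order_id = self.idempotency_keys.get(idempotency_key) if idempotency_key else None
        return self.orders.get(order_id) if order_id else None
def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    pb2_grpc.add_OrderServiceServicer_to_server(OrderServiceServicer(), server)
    # Plaintext port: assumes the Istio sidecar provides mTLS (see PeerAuthentication above)
    server.add_insecure_port("[::]:50051")
    server.start()
    server.wait_for_termination()
if __name__ == "__main__":
    serve()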
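⚠️ gRPC Best Practices
Always implement idempotency for gRPC methods that create or modify resources. Clients, and mesh retry policies such as the one in the VirtualService above, may resend a request whose response was lost; an idempotency key lets the server recognize the retry and return the original result instead of processing the request twice.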
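A minimal sketch of the idempotency-key pattern, independent of gRPC: the first request carrying a key runs and its result is stored, and any retry with the same key gets the stored result back. `IdempotencyStore`, its TTL, and `create_order` are hypothetical names for illustration; if the operation raises, nothing is stored, so a retry runs it again.

```python
import threading
import time
from typing import Any, Callable, Dict, Tuple

class IdempotencyStore:
    """Hypothetical in-memory idempotency cache. A multi-replica service would keep
    these entries in a shared store (for example a database table) with a TTL."""

    def __init__(self, ttl_seconds: float = 24 * 3600):
        self.ttl = ttl_seconds
        self._results: Dict[str, Tuple[float, Any]] = {}  # key -> (stored_at, result)
        self._lock = threading.Lock()

    def run(self, key: str, operation: Callable[[], Any]) -> Any:
        """Run operation at most once per key within the TTL; repeats get the stored result."""
        with self._lock:  # one global lock keeps the sketch simple; real code would lock per key
            now = time.monotonic()
            cached = self._results.get(key)
            if cached is not None and now - cached[0] < self.ttl:
                return cached[1]
            result = operation()
            self._results[key] = (now, result)
            return result

# Usage: a retried create call does not create a second order
created = []

def create_order() -> str:
    created.append(f"order-{len(created) + 1}")
    return created[-1]

store = IdempotencyStore()
first = store.run("idem-key-123", create_order)
retry = store.run("idem-key-123", create_order)  # e.g. a client or mesh retry
print(first, retry, len(created))  # order-1 order-1 1
```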
Event-Driven Architecture
Event Sourcing Pattern
# Event sourcing implementation
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Dict, List
import uuid
@dataclass
class Event:
    event_id: str
    event_type: str
    aggregate_id: str
    data: dict
    timestamp: datetime
    metadata: dict = field(default_factory=dict)
class EventStore:
    """In-memory, append-only event store for event sourcing"""
    def __init__(self):
        self.events: Dict[str, List[Event]] = {}
    def append(self, event: Event):
        self.events.setdefault(event.aggregate_id, []).append(event)
    def get_events(self, aggregate_id: str) -> List[Event]:
        # Return a copy so callers cannot mutate the stored stream
        return list(self.events.get(aggregate_id, []))
    def get_events_by_type(self, event_type: str) -> List[Event]:
        return [e for events in self.events.values() for e in events if e.event_type == event_type]
class Aggregate:
    """Base aggregate for event sourcing: state changes only by applying events"""
    def __init__(self, aggregate_id: str):
        self.aggregate_id = aggregate_id
        self.version = 0  # number of events applied so far
        self.uncommitted_events: List[Event] = []
    def apply_event(self, event: Event):
        self._apply(event)
        self.version += 1
    def _apply(self, event: Event):
        raise NotImplementedError
    def _record(self, event_type: str, data: dict):
        """Apply a new event to this aggregate and queue it for persistence"""
        event = Event(
            event_id=str(uuid.uuid4()),
            event_type=event_type,
            aggregate_id=self.aggregate_id,
            data=data,
            timestamp=datetime.now(timezone.utc),
        )
        self.apply_event(event)
        self.uncommitted_events.append(event)
    def get_uncommitted_events(self) -> List[Event]:
        return self.uncommitted_events
    def clear_uncommitted_events(self):
        self.uncommitted_events = []
class OrderAggregate(Aggregate):
    """Order aggregate with event sourcing"""
    def __init__(self, aggregate_id: str):
        super().__init__(aggregate_id)
        self.user_id = None
        self.items = []
        self.status = None  # no status until OrderCreated is applied
        self.total = 0
    def create_order(self, user_id: str, items: list):
        if self.version > 0:
            raise ValueError("Order already created")
        self._record("OrderCreated", {"user_id": user_id, "items": items, "status": "PENDING"})
    def confirm_order(self):
        if self.status != "PENDING":
            raise ValueError("Cannot confirm order in current status")
        self._record("OrderConfirmed", {"status": "CONFIRMED"})
    def _apply(self, event: Event):
        if event.event_type == "OrderCreated":
            self.user_id = event.data["user_id"]
            self.items = event.data["items"]
            self.status = event.data["status"]
            # Each item is expected to be a dict with "price" and "quantity" keys
            self.total = sum(item["price"] * item["quantity"] for item in self.items)
        elif event.event_type == "OrderConfirmed":
            self.status = event.data["status"]
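The `EventStore` above appends unconditionally and the code does not show how an aggregate's state is loaded back. The self-contained sketch below shows both pieces under stated assumptions: state is recovered by replaying a stream in order, and each append carries an `expected_version` (a hypothetical parameter, modeled on optimistic concurrency control) so that two writers cannot both extend the same version. It redefines a smaller `Event` type so it runs on its own.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

@dataclass
class Event:
    event_type: str
    aggregate_id: str
    data: dict

class ConcurrencyError(Exception):
    """Raised when a writer appends based on a stale version of the stream."""

@dataclass
class VersionedEventStore:
    """Hypothetical store: an append succeeds only if the stream is at expected_version."""
    streams: Dict[str, List[Event]] = field(default_factory=dict)

    def append(self, aggregate_id: str, events: List[Event], expected_version: int) -> None:
        stream = self.streams.setdefault(aggregate_id, [])
        if len(stream) != expected_version:
            raise ConcurrencyError(f"expected version {expected_version}, found {len(stream)}")
        stream.extend(events)

    def load(self, aggregate_id: str) -> List[Event]:
        return list(self.streams.get(aggregate_id, []))

def rehydrate(events: List[Event]) -> Tuple[dict, int]:
    """Rebuild order state by replaying events in order; version = events applied."""
    state: dict = {}
    for event in events:
        if event.event_type == "OrderCreated":
            state = dict(event.data)
        elif event.event_type == "OrderConfirmed":
            state["status"] = event.data["status"]
    return state, len(events)

store = VersionedEventStore()
store.append("order-1", [Event("OrderCreated", "order-1",
                               {"user_id": "u-1", "items": [], "status": "PENDING"})], expected_version=0)

state, version = rehydrate(store.load("order-1"))  # version == 1
store.append("order-1", [Event("OrderConfirmed", "order-1", {"status": "CONFIRMED"})],
             expected_version=version)

# A second writer that also loaded version 1 is now stale and is rejected
try:
    store.append("order-1", [Event("OrderConfirmed", "order-1", {"status": "CONFIRMED"})],
                 expected_version=1)
    stale_rejected = False
except ConcurrencyError as err:
    stale_rejected = True
    print("rejected:", err)  # rejected: expected version 1, found 2
```

The rejected writer would typically reload the stream, re-run its business check against the fresh state, and retry.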
Saga Pattern for Distributed Transactions
# Saga pattern implementation
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Awaitable, Callable, Dict, List, Optional
StepFn = Callable[[dict], Awaitable[dict]]
class SagaStep:
    """Individual saga step: a local transaction plus its compensating action"""
    def __init__(self, name: str, execute: StepFn, compensate: StepFn):
        self.name = name
        self.execute = execute
        self.compensate = compensate
class SagaStatus(Enum):
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    COMPENSATING = "COMPENSATING"
    COMPENSATED = "COMPENSATED"
@dataclass
class SagaState:
    saga_id: str
    status: SagaStatus
    current_step: int
    data: dict
    error: Optional[str] = None
class SagaOrchestrator:
    """Saga orchestrator for distributed transactions"""
    def __init__(self):
        self.steps: List[SagaStep] = []
        self.states: Dict[str, SagaState] = {}
    def add_step(self, step: SagaStep):
        self.steps.append(step)
    async def execute(self, saga_id: str, initial_data: dict) -> SagaState:
        state = SagaState(saga_id=saga_id, status=SagaStatus.RUNNING, current_step=0, data=initial_data)
        self.states[saga_id] = state
        try:
            for i, step in enumerate(self.steps):
                state.current_step = i
                state.data = await step.execute(state.data)
            state.status = SagaStatus.COMPLETED
            return state
        except Exception as e:
            state.error = f"{self.steps[state.current_step].name} failed: {e}"
            state.status = SagaStatus.COMPENSATING
            # Undo only the steps that completed, in reverse order. This assumes the
            # failed step rolled back its own local transaction.
            compensation_failed = False
            for i in range(state.current_step - 1, -1, -1):
                try:
                    await self.steps[i].compensate(state.data)
                except Exception as comp_error:
                    # A real system needs a retry queue or manual intervention here
                    compensation_failed = True
                    state.error += f"; compensation {self.steps[i].name} failed: {comp_error}"
            state.status = SagaStatus.FAILED if compensation_failed else SagaStatus.COMPENSATED
            return state
# Example: Order creation saga (step bodies are placeholders)
async def create_order(data: dict) -> dict:
    # Create order in database
    return {**data, "order_id": "order-123"}
async def reserve_inventory(data: dict) -> dict:
    # Reserve inventory
    return {**data, "inventory_reserved": True}
async def process_payment(data: dict) -> dict:
    # Process payment
    return {**data, "payment_id": "pay-456"}
async def compensate_order(data: dict) -> dict:
    # Cancel order
    return data
async def compensate_inventory(data: dict) -> dict:
    # Release inventory
    return data
async def compensate_payment(data: dict) -> dict:
    # Refund payment
    return data
# Create saga
saga = SagaOrchestrator()
saga.add_step(SagaStep("CreateOrder", create_order, compensate_order))
saga.add_step(SagaStep("ReserveInventory", reserve_inventory, compensate_inventory))
saga.add_step(SagaStep("ProcessPayment", process_payment, compensate_payment))
if __name__ == "__main__":
    result = asyncio.run(saga.execute("saga-1", {"user_id": "user-1"}))
    print(result.status, result.data)
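ℹ️ Saga Pattern
The saga pattern manages a distributed transaction as a sequence of local transactions, one per service. Each step has a compensating transaction: if a step fails, the compensations for the steps that already completed run in reverse order. This is a semantic rollback rather than an atomic one, so other requests can observe the intermediate states.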
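The orchestrator above coordinates every step from one place. The summary table also lists the other common style, choreography, in which no coordinator exists and each service reacts to the previous service's events. A minimal sketch, assuming a hypothetical in-memory `EventBus` with synchronous delivery and illustrative event names:

```python
from collections import defaultdict
from typing import Callable, Dict, List

class EventBus:
    """Hypothetical in-memory bus with synchronous delivery (illustration only)."""

    def __init__(self):
        self.handlers: Dict[str, List[Callable[[dict], None]]] = defaultdict(list)
        self.log: List[str] = []

    def subscribe(self, event_type: str, handler: Callable[[dict], None]) -> None:
        self.handlers[event_type].append(handler)

    def publish(self, event_type: str, order: dict) -> None:
        self.log.append(event_type)
        for handler in self.handlers[event_type]:
            handler(order)

bus = EventBus()
stock = {"sku-1": 5}

def reserve_inventory(order: dict) -> None:   # inventory service
    if stock[order["sku"]] >= order["qty"]:
        stock[order["sku"]] -= order["qty"]
        bus.publish("InventoryReserved", order)
    else:
        bus.publish("InventoryRejected", order)

def charge_payment(order: dict) -> None:      # payment service
    event = "PaymentCompleted" if order["amount"] <= order["credit"] else "PaymentFailed"
    bus.publish(event, order)

def release_inventory(order: dict) -> None:   # inventory service compensates
    stock[order["sku"]] += order["qty"]
    bus.publish("InventoryReleased", order)

def confirm_order(order: dict) -> None:       # order service
    order["status"] = "CONFIRMED"

def cancel_order(order: dict) -> None:        # order service compensates
    order["status"] = "CANCELLED"
    bus.publish("OrderCancelled", order)

bus.subscribe("OrderCreated", reserve_inventory)
bus.subscribe("InventoryReserved", charge_payment)
bus.subscribe("PaymentCompleted", confirm_order)
bus.subscribe("PaymentFailed", release_inventory)
bus.subscribe("InventoryReleased", cancel_order)
bus.subscribe("InventoryRejected", cancel_order)

# Payment fails (amount exceeds credit), so inventory is released and the order cancelled
order = {"sku": "sku-1", "qty": 2, "amount": 100, "credit": 50, "status": "PENDING"}
bus.publish("OrderCreated", order)
print(bus.log)
# ['OrderCreated', 'InventoryReserved', 'PaymentFailed', 'InventoryReleased', 'OrderCancelled']
print(order["status"], stock["sku-1"])  # CANCELLED 5
```

A real broker delivers asynchronously and usually at least once, so each handler must tolerate duplicate events. Choreography removes the central coordinator, but the overall flow is spread across services, which makes it harder to trace than the orchestrated version.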
Service Discovery
# Service discovery with Consul (python-consul's asyncio client)
import asyncio
import random
from typing import List
import consul
import consul.aio
class ServiceDiscovery:
    """Service discovery using Consul"""
    def __init__(self, consul_host: str = "localhost", consul_port: int = 8500):
        self.consul = consul.aio.Consul(host=consul_host, port=consul_port)
    async def register(self, service_name: str, service_id: str, address: str, port: int):
        """Register service with an HTTP health check that Consul polls every 10s"""
        await self.consul.agent.service.register(
            name=service_name,
            service_id=service_id,
            address=address,
            port=port,
            check=consul.Check.http(f"http://{address}:{port}/health", interval="10s"),
        )
    async def deregister(self, service_id: str):
        """Deregister service from Consul"""
        await self.consul.agent.service.deregister(service_id)
    async def get_all_services(self, service_name: str) -> List[str]:
        """Get URLs of all instances currently passing their health checks"""
        _, services = await self.consul.health.service(service_name, passing=True)
        return [
            f"http://{s['Service']['Address']}:{s['Service']['Port']}"
            for s in services
        ]
    async def get_service(self, service_name: str) -> str:
        """Get one healthy instance, chosen at random (client-side load balancing)"""
        instances = await self.get_all_services(service_name)
        if not instances:
            raise LookupError(f"No healthy instances of {service_name}")
        return random.choice(instances)
# Usage
async def main():
    discovery = ServiceDiscovery()
    await discovery.register("order-service", "order-1", "10.0.1.10", 8080)
    # A newly registered instance is returned only after its health check passes
    url = await discovery.get_service("order-service")
    print(f"Connecting to: {url}")
if __name__ == "__main__":
    asyncio.run(main())
Circuit Breaker Pattern
# Circuit breaker implementation
import asyncio
import time
from dataclasses import dataclass
from datetime import timedelta
from enum import Enum
from typing import Awaitable, Callable, Optional
class CircuitState(Enum):
    CLOSED = "CLOSED"
    OPEN = "OPEN"
    HALF_OPEN = "HALF_OPEN"
class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open"""
@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5                           # consecutive failures before opening
    recovery_timeout: timedelta = timedelta(seconds=30)  # time spent OPEN before probing
    success_threshold: int = 3                           # HALF_OPEN successes needed to close
    timeout: timedelta = timedelta(seconds=5)            # per-call timeout
class CircuitBreaker:
    """Circuit breaker for fault tolerance"""
    def __init__(self, name: str, config: Optional[CircuitBreakerConfig] = None):
        self.name = name
        self.config = config or CircuitBreakerConfig()
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time: Optional[float] = None  # time.monotonic() timestamp
    async def call(self, func: Callable[..., Awaitable], *args, **kwargs):
        """Execute an async function with circuit breaker protection"""
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0
            else:
                raise CircuitOpenError(f"Circuit {self.name} is OPEN")
        try:
            result = await asyncio.wait_for(
                func(*args, **kwargs),
                timeout=self.config.timeout.total_seconds(),
            )
        except Exception:  # includes asyncio.TimeoutError raised by wait_for
            self._on_failure()
            raise
        self._on_success()
        return result
    def _on_success(self):
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.config.success_threshold:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
                self.success_count = 0
        else:
            self.failure_count = 0  # CLOSED: only consecutive failures count
    def _on_failure(self):
        self.last_failure_time = time.monotonic()
        if self.state == CircuitState.HALF_OPEN:
            # Any failure while probing re-opens the circuit and restarts the probe count
            self.state = CircuitState.OPEN
            self.success_count = 0
            return
        self.failure_count += 1
        if self.failure_count >= self.config.failure_threshold:
            self.state = CircuitState.OPEN
    def _should_attempt_reset(self) -> bool:
        if self.last_failure_time is None:
            return True
        elapsed = time.monotonic() - self.last_failure_time
        return elapsed > self.config.recovery_timeout.total_seconds()
# Usage (make_http_request is a hypothetical async HTTP helper)
async def make_http_request(service: str) -> str:
    await asyncio.sleep(0.01)  # placeholder for a real HTTP call
    return f"response from {service}"
circuit = CircuitBreaker("order-service")
async def call_order_service():
    return await circuit.call(make_http_request, "order-service")
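The breaker above opens only after `failure_threshold` consecutive failures, because any success resets the count. Resilience4j, listed in the summary, instead evaluates the failure rate over a sliding window of recent calls. A minimal count-based version of that idea follows; the parameter names are illustrative, not Resilience4j's configuration API.

```python
from collections import deque

class FailureRateWindow:
    """Count-based sliding window over the last `size` calls.

    The circuit should open when at least `minimum_calls` outcomes are recorded
    and the failure rate in the window reaches `threshold`.
    """

    def __init__(self, size: int = 10, threshold: float = 0.5, minimum_calls: int = 5):
        self.outcomes: deque = deque(maxlen=size)  # True = success, False = failure
        self.threshold = threshold
        self.minimum_calls = minimum_calls

    def record(self, success: bool) -> None:
        self.outcomes.append(success)

    def failure_rate(self) -> float:
        if not self.outcomes:
            return 0.0
        return self.outcomes.count(False) / len(self.outcomes)

    def should_open(self) -> bool:
        return len(self.outcomes) >= self.minimum_calls and self.failure_rate() >= self.threshold

# This sequence never has more than 2 consecutive failures, so a consecutive
# counter with threshold 5 stays CLOSED, but the 60% failure rate trips the window.
window = FailureRateWindow()
for ok in [True, False, True, False, False]:
    window.record(ok)
print(window.failure_rate(), window.should_open())  # 0.6 True
```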
Load Balancing Strategies
# Load balancing algorithms
import bisect
import hashlib
import random
from dataclasses import dataclass
from typing import List
def _stable_hash(value: str) -> int:
    """Process-independent hash; Python's built-in hash() of str is randomized per process"""
    return int.from_bytes(hashlib.blake2b(value.encode(), digest_size=8).digest(), "big")
@dataclass
class ServiceInstance:
    host: str
    port: int
    weight: int = 1
    healthy: bool = True
    current_connections: int = 0  # must be updated by the caller on connect/disconnect
class LoadBalancer:
    """Load balancer with multiple strategies"""
    def __init__(self, instances: List[ServiceInstance]):
        self.instances = instances
        self._rr_index = 0
    def _healthy(self) -> List[ServiceInstance]:
        healthy = [i for i in self.instances if i.healthy]
        if not healthy:
            raise RuntimeError("No healthy instances")
        return healthy
    def round_robin(self) -> ServiceInstance:
        """Round-robin over healthy instances only"""
        healthy = self._healthy()
        instance = healthy[self._rr_index % len(healthy)]
        self._rr_index += 1
        return instance
    def weighted_random(self) -> ServiceInstance:
        """Weighted random selection: each instance is picked with probability weight / total_weight"""
        healthy = self._healthy()
        total_weight = sum(i.weight for i in healthy)
        r = random.randint(0, total_weight - 1)
        current = 0
        for instance in healthy:
            current += instance.weight
            if r < current:
                return instance
        return healthy[-1]
    def least_connections(self) -> ServiceInstance:
        """Least connections selection"""
        return min(self._healthy(), key=lambda i: i.current_connections)
    def ip_hash(self, client_ip: str) -> ServiceInstance:
        """IP hash for session affinity (stable across processes and restarts)"""
        healthy = self._healthy()
        return healthy[_stable_hash(client_ip) % len(healthy)]
    def consistent_hash(self, key: str, virtual_nodes: int = 150) -> ServiceInstance:
        """Consistent hashing: adding or removing an instance remaps only the keys on its virtual nodes"""
        # The ring is rebuilt on every call for simplicity; cache it in production
        ring = sorted(
            ((_stable_hash(f"{inst.host}:{inst.port}:{i}"), inst)
             for inst in self._healthy() for i in range(virtual_nodes)),
            key=lambda entry: entry[0],
        )
        hashes = [h for h, _ in ring]
        # First virtual node at or after the key's hash, wrapping around to the start
        index = bisect.bisect_left(hashes, _stable_hash(key)) % len(ring)
        return ring[index][1]
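The `consistent_hash` method above rebuilds its ring on every call. A sketch of a ring that is built once and updated incrementally also makes the key property testable: adding an instance moves only the keys that now land on the new instance's virtual nodes, and every other key keeps its owner. The node addresses are example values, and BLAKE2b from `hashlib` serves only as a stable spreading function.

```python
import bisect
import hashlib
from typing import List

def stable_hash(value: str) -> int:
    """64-bit hash that is identical across processes (unlike built-in hash() of str)."""
    return int.from_bytes(hashlib.blake2b(value.encode(), digest_size=8).digest(), "big")

class HashRing:
    """Consistent-hash ring built once and updated incrementally (a sketch)."""

    def __init__(self, nodes: List[str], virtual_nodes: int = 150):
        self.virtual_nodes = virtual_nodes
        self._hashes: List[int] = []  # sorted virtual-node positions
        self._nodes: List[str] = []   # owner of each position, kept in the same order
        for node in nodes:
            self.add(node)

    def add(self, node: str) -> None:
        for i in range(self.virtual_nodes):
            h = stable_hash(f"{node}#{i}")
            idx = bisect.bisect_left(self._hashes, h)
            self._hashes.insert(idx, h)
            self._nodes.insert(idx, node)

    def lookup(self, key: str) -> str:
        # First virtual node at or after the key's position, wrapping to the start
        idx = bisect.bisect_left(self._hashes, stable_hash(key)) % len(self._hashes)
        return self._nodes[idx]

ring = HashRing(["10.0.1.10:8080", "10.0.1.11:8080", "10.0.1.12:8080"])
keys = [f"user-{i}" for i in range(1_000)]
before = {k: ring.lookup(k) for k in keys}

ring.add("10.0.1.13:8080")
after = {k: ring.lookup(k) for k in keys}
moved = [k for k in keys if before[k] != after[k]]

# Every key that changed owner moved to the new node; all others stayed put
assert all(after[k] == "10.0.1.13:8080" for k in moved)
print(f"{len(moved)} of {len(keys)} keys moved to the new node")
```

With plain modulo hashing, as in `ip_hash`, changing the instance count would instead remap most keys.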
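✅ Service Mesh Benefits
A service mesh provides service-to-service communication (mTLS, retries, timeouts), load balancing, circuit breaking (outlier detection), and observability without application code changes. Distributed tracing is the main exception: services must still propagate trace headers for spans to be joined into one trace.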
Summary
| Pattern | Purpose | Implementation |
|---|---|---|
| Service Mesh | Transparent communication | Istio/Envoy |
| gRPC | High-performance RPC | Protocol Buffers |
| Event Sourcing | State management | Event store |
| Saga | Distributed transactions | Orchestrator/Choreography |
| Circuit Breaker | Fault tolerance | Istio outlier detection, Resilience4j (Hystrix is in maintenance mode) |
| Load Balancing | Traffic distribution | Round-robin, weighted, least connections, consistent hashing |