Python System Design
Difficulty: Hard | Companies: Google, Meta, Amazon, Netflix, Stripe
Scalable Architecture Patterns
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime
import asyncio
from collections import defaultdict
# Load Balancer Implementation
class LoadBalancer:
    """Round-robin dispatcher that skips servers marked unhealthy.

    ``servers`` is stored as given (not copied) and every server starts out
    healthy.
    """

    def __init__(self, servers: List[str]):
        self.servers = servers
        self.current_index = 0  # position of the next server to inspect
        self.health_status = dict.fromkeys(servers, True)

    def get_next_server(self) -> Optional[str]:
        """Return the next healthy server in rotation, or None if none is healthy.

        At most one full lap of the pool is made; the cursor advances past every
        server inspected, whether or not it was healthy.
        """
        for _ in range(len(self.servers)):
            candidate = self.servers[self.current_index]
            self.current_index = (self.current_index + 1) % len(self.servers)
            if self.health_status[candidate]:
                return candidate
        return None

    def mark_unhealthy(self, server: str):
        """Take *server* out of rotation by flagging it unhealthy."""
        self.health_status[server] = False

    def mark_healthy(self, server: str):
        """Put *server* back into rotation by flagging it healthy."""
        self.health_status[server] = True
# Circuit Breaker Pattern
class CircuitBreaker:
    """Three-state circuit breaker: "closed", "open" and "half-open".

    Failures are counted since the last success (any success resets the
    count). Once the count reaches ``failure_threshold`` the circuit opens.
    After ``recovery_timeout`` seconds an open circuit moves to "half-open"
    and lets calls through again.
    """

    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout  # seconds
        self.failure_count = 0
        self.last_failure_time = None  # POSIX timestamp of the latest failure
        self.state = "closed"  # one of: closed, open, half-open

    def can_execute(self) -> bool:
        """Report whether a call may proceed; an expired open circuit turns half-open."""
        if self.state != "open":
            # Both "closed" and "half-open" admit calls.
            return True
        waited = datetime.now().timestamp() - self.last_failure_time
        if waited > self.recovery_timeout:
            self.state = "half-open"
            return True
        return False

    def record_success(self):
        """Close the circuit and clear the failure count."""
        self.state = "closed"
        self.failure_count = 0

    def record_failure(self):
        """Count a failure and open the circuit once the threshold is reached.

        The count is not reset on the open -> half-open transition, so a single
        failure while half-open reopens the circuit.
        """
        self.last_failure_time = datetime.now().timestamp()
        self.failure_count += 1
        if self.failure_count >= self.failure_threshold:
            self.state = "open"
# Cache Strategy
class CacheStrategy(ABC):
    """Interface for key/value caches.

    ``get`` returns ``None`` for a missing (or expired) key, so a cached
    ``None`` value cannot be told apart from a miss.
    """

    @abstractmethod
    def get(self, key: str) -> Optional[Any]:
        """Return the value cached under *key*, or ``None`` if absent."""

    @abstractmethod
    def set(self, key: str, value: Any, ttl: Optional[float] = None):
        """Cache *value* under *key*; if *ttl* is given, expire it after *ttl* seconds."""

    @abstractmethod
    def delete(self, key: str):
        """Remove *key* from the cache if present."""


class LRUCache(CacheStrategy):
    """Bounded cache that evicts the least recently used entry.

    Entries may carry an optional time-to-live. Expired entries are dropped
    lazily when next read; until then they still count toward ``maxsize``.
    """

    def __init__(self, maxsize: int = 1000):
        from collections import OrderedDict
        import time

        self.cache = OrderedDict()  # key -> value, least recently used first
        self.maxsize = maxsize
        # Deadlines live beside the values so ``self.cache`` keeps holding the
        # raw values exactly as before; only keys set with a ttl appear here.
        self._expires_at = {}
        # Monotonic clock: TTLs must not be affected by wall-clock adjustments.
        self._clock = time.monotonic

    def _is_expired(self, key: str) -> bool:
        """Return True if *key* was set with a ttl whose deadline has passed."""
        deadline = self._expires_at.get(key)
        return deadline is not None and self._clock() >= deadline

    def get(self, key: str) -> Optional[Any]:
        """Return the live value for *key* (marking it most recent), else None."""
        if key not in self.cache:
            return None
        if self._is_expired(key):
            self.delete(key)
            return None
        self.cache.move_to_end(key)
        return self.cache[key]

    def set(self, key: str, value: Any, ttl: Optional[float] = None):
        """Store *value* as the most recent entry, evicting the LRU one if full.

        *ttl* is a lifetime in seconds; ``None`` means the entry never expires.
        Re-setting a key without a ttl clears any previous deadline.
        """
        if key in self.cache:
            self.cache.move_to_end(key)
        self.cache[key] = value
        if ttl is None:
            self._expires_at.pop(key, None)
        else:
            self._expires_at[key] = self._clock() + ttl
        if len(self.cache) > self.maxsize:
            evicted, _ = self.cache.popitem(last=False)
            self._expires_at.pop(evicted, None)

    def delete(self, key: str):
        """Remove *key* and its deadline; missing keys are ignored."""
        self.cache.pop(key, None)
        self._expires_at.pop(key, None)
ℹ️
In system design interviews, focus on scalability, reliability, and availability. Choose patterns that fit the requirements of your use case rather than applying them by default.
Distributed Systems Patterns
import hashlib
from bisect import bisect_right
from typing import List, Dict
import time
from threading import Lock
class ConsistentHashing:
    """Consistent-hash ring mapping string keys to node names.

    Each node is placed on the ring ``virtual_nodes`` times (as
    ``"<node>:v<i>"``) to spread keys more evenly. A key belongs to the first
    ring point strictly after the key's hash, wrapping around at the end.
    """

    def __init__(self, nodes: List[str], virtual_nodes: int = 150):
        self.ring = {}  # ring point (hash) -> node name
        self.sorted_keys = []  # ring points in ascending order, for bisect
        self.virtual_nodes = virtual_nodes
        for node in nodes:
            self.add_node(node)

    def _hash(self, key: str) -> int:
        """Map *key* to an integer ring position (MD5 used for spread, not security)."""
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def _virtual_hashes(self, node: str):
        """Yield the ring position of each virtual replica of *node*."""
        for i in range(self.virtual_nodes):
            yield self._hash(f"{node}:v{i}")

    def add_node(self, node: str):
        """Place *node*'s virtual replicas on the ring.

        Idempotent: re-adding a node never puts duplicate points in
        ``sorted_keys`` (duplicates would dangle after ``remove_node``).
        """
        new_points = []
        for hash_val in self._virtual_hashes(node):
            if hash_val not in self.ring:
                new_points.append(hash_val)
            self.ring[hash_val] = node
        if new_points:
            self.sorted_keys.extend(new_points)
            self.sorted_keys.sort()

    def remove_node(self, node: str):
        """Remove *node*'s virtual replicas from the ring.

        Ring points that now belong to another node (hash collision) are left
        alone.

        Raises:
            KeyError: if *node* owns no point on the ring.
        """
        removed = set()
        for hash_val in self._virtual_hashes(node):
            if self.ring.get(hash_val) == node:
                del self.ring[hash_val]
                removed.add(hash_val)
        if not removed:
            raise KeyError(node)
        # One O(n) rebuild instead of an O(n) list.remove per replica; slice
        # assignment keeps the same list object for any outside references.
        self.sorted_keys[:] = [k for k in self.sorted_keys if k not in removed]

    def get_node(self, key: str) -> Optional[str]:
        """Return the node responsible for *key*, or None if the ring is empty."""
        if not self.ring:
            return None
        hash_val = self._hash(key)
        idx = bisect_right(self.sorted_keys, hash_val) % len(self.sorted_keys)
        return self.ring[self.sorted_keys[idx]]
# Rate Limiter
class TokenBucket:
    """Thread-safe token-bucket rate limiter.

    The bucket holds at most ``capacity`` tokens and regains ``refill_rate``
    tokens per second. A request consumes tokens; a request that would
    overdraw the bucket is rejected.
    """

    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate  # tokens per second
        self.tokens = capacity  # start full
        # Monotonic clock: elapsed time must not jump with wall-clock changes.
        self.last_refill = time.monotonic()
        self.lock = Lock()

    def consume(self, tokens: int = 1) -> bool:
        """Take *tokens* from the bucket if enough are available.

        Returns True if the request is allowed, False if it should be throttled.
        """
        with self.lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def _refill(self):
        """Credit tokens for the time since the last refill (caller holds the lock)."""
        now = time.monotonic()
        # Never credit a negative interval; otherwise a clock that appears to
        # run backwards would drain the bucket, even below zero.
        elapsed = max(0.0, now - self.last_refill)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now
# Distributed Lock
class DistributedLock:
    """Lease-based lock on named resources, owned by ``lock_id``.

    Lock state lives in a dict. By default each instance gets a private dict,
    so only that instance ever competes for its resources. To make several
    clients contend for the same resources, pass the same ``store`` and
    ``guard`` to several instances, each with its own ``lock_id``. A held
    lock expires ``timeout`` seconds after it was acquired and can then be
    taken by another owner.
    """

    def __init__(self, lock_id: str, timeout: float = 10.0,
                 store: Optional[dict] = None, guard=None):
        self.lock_id = lock_id
        self.timeout = timeout  # lease length in seconds
        # resource -> {'lock_id': owner id, 'acquired_at': time.time() value}
        self.locks = {} if store is None else store
        # Mutex serializing access to self.locks; share it along with the store.
        self.lock = Lock() if guard is None else guard

    def acquire(self, resource: str) -> bool:
        """Try to take *resource*; return False if another unexpired lease holds it.

        Not reentrant: the current holder re-acquiring before expiry also gets False.
        """
        with self.lock:
            if resource in self.locks:
                lock_info = self.locks[resource]
                if time.time() - lock_info['acquired_at'] < self.timeout:
                    return False
            self.locks[resource] = {
                'lock_id': self.lock_id,
                'acquired_at': time.time()
            }
            return True

    def release(self, resource: str) -> bool:
        """Release *resource* if this owner holds it; return whether it was released."""
        with self.lock:
            if resource in self.locks:
                if self.locks[resource]['lock_id'] == self.lock_id:
                    del self.locks[resource]
                    return True
            return False
Message Queue Pattern
from collections import deque
from typing import Callable, Any
import threading
import time
class MessageQueue:
    """In-memory, thread-safe publish/subscribe queue.

    Messages are buffered per named queue. ``consume`` pops them one at a time
    and hands each to every callback subscribed to that queue at that moment.
    A message popped while there are no subscribers is dropped.
    """

    def __init__(self):
        self.queues = {}  # queue name -> deque of {'message', 'timestamp'}
        self.consumers = {}  # queue name -> list of subscriber callbacks
        self.lock = threading.Lock()

    def _ensure_queue(self, queue_name: str):
        """Create *queue_name* if missing. Caller must hold ``self.lock``."""
        if queue_name not in self.queues:
            self.queues[queue_name] = deque()
            # setdefault: keep subscribers registered before the queue existed.
            self.consumers.setdefault(queue_name, [])

    def create_queue(self, queue_name: str):
        """Create a new queue (no-op if it already exists)."""
        with self.lock:
            self._ensure_queue(queue_name)

    def publish(self, queue_name: str, message: Any):
        """Append *message* to *queue_name*, creating the queue if needed."""
        with self.lock:
            # Must not call create_queue here: it would re-acquire the
            # non-reentrant lock we already hold and deadlock.
            self._ensure_queue(queue_name)
            self.queues[queue_name].append({
                'message': message,
                'timestamp': time.time()
            })

    def subscribe(self, queue_name: str, callback: Callable):
        """Register *callback* to receive messages from *queue_name*."""
        with self.lock:
            self.consumers.setdefault(queue_name, []).append(callback)

    def consume(self, queue_name: str,
                stop_event: Optional[threading.Event] = None,
                poll_interval: float = 0.1):
        """Deliver messages from *queue_name* to its subscribers.

        Runs forever unless *stop_event* is given, in which case it returns
        once the event is set. Waits *poll_interval* seconds only when the queue
        is empty. Callback exceptions are printed and do not stop delivery.
        """
        while stop_event is None or not stop_event.is_set():
            with self.lock:
                pending = self.queues.get(queue_name)
                msg = pending.popleft() if pending else None
                callbacks = list(self.consumers.get(queue_name, []))
            if msg is None:
                if stop_event is None:
                    time.sleep(poll_interval)
                else:
                    stop_event.wait(poll_interval)
                continue
            # Callbacks run outside the lock so they may publish/subscribe
            # without deadlocking and do not block other producers.
            for callback in callbacks:
                try:
                    callback(msg['message'])
                except Exception as e:
                    print(f"Error processing message: {e}")
# Event Sourcing
class EventStore:
    """Append-only in-memory event log with simple read-model projections."""

    # event type -> (projection key prefix, id field inside event['data'])
    _PROJECTIONS = {
        'user_created': ('user', 'user_id'),
        'order_placed': ('order', 'order_id'),
    }

    def __init__(self):
        self.events = []  # stored event records, in append order
        self.projections = {}  # "<prefix>:<id>" -> latest data payload

    def append(self, event: Dict) -> Dict:
        """Store a copy of *event* stamped with ``timestamp`` and ``version``.

        The caller's dict is left unmodified. Returns the stored record so
        callers can read the assigned version.
        """
        record = dict(event)
        record['timestamp'] = time.time()
        record['version'] = len(self.events) + 1
        self.events.append(record)
        self._update_projections(record)
        return record

    def get_events(self, aggregate_id: Optional[str] = None) -> List[Dict]:
        """Return stored events, optionally only those for *aggregate_id*.

        Always returns a new list, so callers cannot reorder or drop stored
        events through it.
        """
        if aggregate_id is None:
            return list(self.events)
        return [e for e in self.events if e.get('aggregate_id') == aggregate_id]

    def _update_projections(self, event: Dict):
        """Refresh the read model for *event*; unknown event types are ignored."""
        mapping = self._PROJECTIONS.get(event.get('type'))
        if mapping is None:
            return
        prefix, id_field = mapping
        data = event['data']
        self.projections[f'{prefix}:{data[id_field]}'] = data
# Usage: record a user sign-up, then an order placed by that same user.
event_store = EventStore()
event_store.append(dict(
    type="user_created",
    aggregate_id="user123",
    data=dict(user_id="user123", name="John"),
))
event_store.append(dict(
    type="order_placed",
    aggregate_id="order456",
    data=dict(order_id="order456", user_id="user123", amount=100),
))
⚠️
Event sourcing provides a complete audit trail but adds complexity. Use it when you need to track every change or rebuild state from history.
Microservices Patterns
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
import json
@dataclass()
class ServiceConfig:
    """Static description of a service, as registered with a ServiceRegistry.

    Fields are positional in the order: name, version, port, dependencies.
    """

    name: str  # service name; the registry keys entries by it
    version: str  # version label
    port: int  # port number
    dependencies: List[str]  # presumably names of services this one calls — confirm
class ServiceRegistry:
    """In-memory registry of services and their health status."""

    def __init__(self):
        # service name -> {'config': ..., 'health': str, 'registered_at': epoch seconds}
        self.services = {}

    def register(self, config: 'ServiceConfig'):
        """Register (or re-register) *config* under ``config.name`` as healthy."""
        self.services[config.name] = {
            'config': config,
            'health': 'healthy',
            'registered_at': time.time()
        }

    def deregister(self, service_name: str):
        """Remove *service_name*; unknown names are ignored."""
        self.services.pop(service_name, None)

    def set_health(self, service_name: str, health: str) -> bool:
        """Set the health status of a registered service.

        Any value other than ``'healthy'`` makes routers that check health
        treat the service as unavailable. Returns False if *service_name* is
        not registered.
        """
        entry = self.services.get(service_name)
        if entry is None:
            return False
        entry['health'] = health
        return True

    def discover(self, service_name: str) -> Optional[Dict]:
        """Return the registration record for *service_name*, or None."""
        return self.services.get(service_name)

    def get_all_services(self) -> Dict:
        """Return a shallow copy of all registrations (name -> record)."""
        return dict(self.services)
class APIGateway:
    """Exact-path request router backed by a service registry."""

    def __init__(self, registry: ServiceRegistry):
        self.registry = registry
        self.routes = {}  # request path -> service name

    def add_route(self, path: str, service_name: str):
        """Send requests for *path* to *service_name*, replacing any prior mapping."""
        self.routes[path] = service_name

    def route_request(self, path: str, method: str = 'GET', data: Any = None):
        """Resolve *path* to a healthy service and return ``(body, status_code)``.

        Returns 404 for an unknown path and 503 when the target is unregistered
        or not healthy. Forwarding is simulated: on success the body echoes the
        request.
        """
        try:
            target = self.routes[path]
        except KeyError:
            return {'error': 'Route not found'}, 404
        entry = self.registry.discover(target)
        available = bool(entry) and entry['health'] == 'healthy'
        if not available:
            return {'error': 'Service unavailable'}, 503
        payload = {
            'service': target,
            'method': method,
            'data': data,
            'status': 'routed'
        }
        return payload, 200
# Circuit Breaker with Fallback
class ResilientService:
    """Wrap a callable with circuit-breaker protection and an optional fallback."""

    def __init__(self, primary_service, fallback_service=None, circuit_breaker=None):
        self.primary = primary_service
        self.fallback = fallback_service
        # Accept a pre-configured breaker (custom thresholds, shared state, or
        # a test double); the default matches the previous behaviour.
        self.circuit_breaker = (
            circuit_breaker if circuit_breaker is not None else CircuitBreaker()
        )

    def execute(self, *args, **kwargs):
        """Call the primary service, falling back when the circuit is open or it fails.

        The fallback receives the same arguments. If there is no fallback, an
        open circuit raises ``Exception("Service unavailable")`` and a primary
        failure re-raises the original exception.
        """
        if not self.circuit_breaker.can_execute():
            if self.fallback is not None:
                return self.fallback(*args, **kwargs)
            raise Exception("Service unavailable")
        try:
            result = self.primary(*args, **kwargs)
        except Exception:
            self.circuit_breaker.record_failure()
            if self.fallback is not None:
                return self.fallback(*args, **kwargs)
            raise
        else:
            self.circuit_breaker.record_success()
            return result
Follow-Up Questions
- Explain the CAP theorem and its implications for distributed systems.
- How would you design a URL shortener like bit.ly?
- What are the trade-offs between different caching strategies?
- How do you handle data consistency in microservices?
- Explain the saga pattern for distributed transactions.