πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Python System Design

Python InterviewSystem Design⭐ Premium

Advertisement

Python System Design

Difficulty: Hard | Companies: Google, Meta, Amazon, Netflix, Stripe

Scalable Architecture Patterns

from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime
import asyncio
from collections import defaultdict

# Load Balancer Implementation
class LoadBalancer:
    """Simple load balancer with round-robin and health checks."""
    
    def __init__(self, servers: List[str]):
        self.servers = servers
        self.current_index = 0
        self.health_status = {server: True for server in servers}
    
    def get_next_server(self) -> Optional[str]:
        """Get next available server using round-robin."""
        attempts = 0
        
        while attempts < len(self.servers):
            server = self.servers[self.current_index]
            self.current_index = (self.current_index + 1) % len(self.servers)
            
            if self.health_status[server]:
                return server
            
            attempts += 1
        
        return None
    
    def mark_unhealthy(self, server: str):
        """Mark server as unhealthy."""
        self.health_status[server] = False
    
    def mark_healthy(self, server: str):
        """Mark server as healthy."""
        self.health_status[server] = True

# Circuit Breaker Pattern
class CircuitBreaker:
    """Circuit breaker for fault tolerance."""
    
    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half-open
    
    def can_execute(self) -> bool:
        """Check if request can be executed."""
        if self.state == "closed":
            return True
        
        if self.state == "open":
            if datetime.now().timestamp() - self.last_failure_time > self.recovery_timeout:
                self.state = "half-open"
                return True
            return False
        
        # half-open state
        return True
    
    def record_success(self):
        """Record successful execution."""
        self.failure_count = 0
        self.state = "closed"
    
    def record_failure(self):
        """Record failed execution."""
        self.failure_count += 1
        self.last_failure_time = datetime.now().timestamp()
        
        if self.failure_count >= self.failure_threshold:
            self.state = "open"

# Cache Strategy
class CacheStrategy(ABC):
    """Abstract cache strategy."""
    
    @abstractmethod
    def get(self, key: str) -> Optional[Any]:
        pass
    
    @abstractmethod
    def set(self, key: str, value: Any, ttl: Optional[float] = None):
        pass
    
    @abstractmethod
    def delete(self, key: str):
        pass

class LRUCache(CacheStrategy):
    """Least Recently Used cache."""
    
    def __init__(self, maxsize: int = 1000):
        from collections import OrderedDict
        self.cache = OrderedDict()
        self.maxsize = maxsize
    
    def get(self, key: str) -> Optional[Any]:
        if key in self.cache:
            self.cache.move_to_end(key)
            return self.cache[key]
        return None
    
    def set(self, key: str, value: Any, ttl: Optional[float] = None):
        if key in self.cache:
            self.cache.move_to_end(key)
        
        self.cache[key] = value
        
        if len(self.cache) > self.maxsize:
            self.cache.popitem(last=False)
    
    def delete(self, key: str):
        if key in self.cache:
            del self.cache[key]

ℹ️

In system design interviews, focus on scalability, reliability, and availability. Use appropriate patterns for your use case.

Distributed Systems Patterns

import hashlib
from bisect import bisect_right
from typing import List, Dict
import time
from threading import Lock

class ConsistentHashing:
    """Consistent hashing for distributed systems."""
    
    def __init__(self, nodes: List[str], virtual_nodes: int = 150):
        self.ring = {}
        self.sorted_keys = []
        self.virtual_nodes = virtual_nodes
        
        for node in nodes:
            self.add_node(node)
    
    def _hash(self, key: str) -> int:
        """Generate hash for key."""
        return int(hashlib.md5(key.encode()).hexdigest(), 16)
    
    def add_node(self, node: str):
        """Add node with virtual nodes."""
        for i in range(self.virtual_nodes):
            virtual_key = f"{node}:v{i}"
            hash_val = self._hash(virtual_key)
            self.ring[hash_val] = node
            self.sorted_keys.append(hash_val)
        
        self.sorted_keys.sort()
    
    def remove_node(self, node: str):
        """Remove node and its virtual nodes."""
        for i in range(self.virtual_nodes):
            virtual_key = f"{node}:v{i}"
            hash_val = self._hash(virtual_key)
            del self.ring[hash_val]
            self.sorted_keys.remove(hash_val)
    
    def get_node(self, key: str) -> Optional[str]:
        """Get node responsible for key."""
        if not self.ring:
            return None
        
        hash_val = self._hash(key)
        idx = bisect_right(self.sorted_keys, hash_val) % len(self.sorted_keys)
        return self.ring[self.sorted_keys[idx]]

# Rate Limiter
class TokenBucket:
    """Token bucket rate limiter."""
    
    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = capacity
        self.last_refill = time.time()
        self.lock = Lock()
    
    def consume(self, tokens: int = 1) -> bool:
        """Try to consume tokens."""
        with self.lock:
            self._refill()
            
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            
            return False
    
    def _refill(self):
        """Refill tokens based on elapsed time."""
        now = time.time()
        elapsed = now - self.last_refill
        tokens_to_add = elapsed * self.refill_rate
        
        self.tokens = min(self.capacity, self.tokens + tokens_to_add)
        self.last_refill = now

# Distributed Lock
class DistributedLock:
    """Simple distributed lock implementation."""
    
    def __init__(self, lock_id: str, timeout: float = 10.0):
        self.lock_id = lock_id
        self.timeout = timeout
        self.locks = {}
        self.lock = Lock()
    
    def acquire(self, resource: str) -> bool:
        """Try to acquire lock on resource."""
        with self.lock:
            if resource in self.locks:
                lock_info = self.locks[resource]
                if time.time() - lock_info['acquired_at'] < self.timeout:
                    return False
            
            self.locks[resource] = {
                'lock_id': self.lock_id,
                'acquired_at': time.time()
            }
            return True
    
    def release(self, resource: str) -> bool:
        """Release lock on resource."""
        with self.lock:
            if resource in self.locks:
                if self.locks[resource]['lock_id'] == self.lock_id:
                    del self.locks[resource]
                    return True
            return False

Message Queue Pattern

from collections import deque
from typing import Callable, Any
import threading
import time

class MessageQueue:
    """Simple message queue implementation."""
    
    def __init__(self):
        self.queues = {}
        self.consumers = {}
        self.lock = threading.Lock()
    
    def create_queue(self, queue_name: str):
        """Create a new queue."""
        with self.lock:
            if queue_name not in self.queues:
                self.queues[queue_name] = deque()
                self.consumers[queue_name] = []
    
    def publish(self, queue_name: str, message: Any):
        """Publish message to queue."""
        with self.lock:
            if queue_name not in self.queues:
                self.create_queue(queue_name)
            
            self.queues[queue_name].append({
                'message': message,
                'timestamp': time.time()
            })
    
    def subscribe(self, queue_name: str, callback: Callable):
        """Subscribe to queue with callback."""
        with self.lock:
            if queue_name not in self.consumers:
                self.consumers[queue_name] = []
            
            self.consumers[queue_name].append(callback)
    
    def consume(self, queue_name: str):
        """Consume messages from queue."""
        while True:
            with self.lock:
                if queue_name in self.queues and self.queues[queue_name]:
                    msg = self.queues[queue_name].popleft()
                    
                    for callback in self.consumers.get(queue_name, []):
                        try:
                            callback(msg['message'])
                        except Exception as e:
                            print(f"Error processing message: {e}")
            
            time.sleep(0.1)

# Event Sourcing
class EventStore:
    """Event store for event sourcing pattern."""
    
    def __init__(self):
        self.events = []
        self.projections = {}
    
    def append(self, event: Dict):
        """Append event to store."""
        event['timestamp'] = time.time()
        event['version'] = len(self.events) + 1
        self.events.append(event)
        
        # Update projections
        self._update_projections(event)
    
    def get_events(self, aggregate_id: Optional[str] = None) -> List[Dict]:
        """Get events, optionally filtered by aggregate."""
        if aggregate_id:
            return [e for e in self.events if e.get('aggregate_id') == aggregate_id]
        return self.events
    
    def _update_projections(self, event: Dict):
        """Update read models based on event."""
        event_type = event.get('type')
        
        if event_type == 'user_created':
            user_id = event['data']['user_id']
            self.projections[f'user:{user_id}'] = event['data']
        
        elif event_type == 'order_placed':
            order_id = event['data']['order_id']
            self.projections[f'order:{order_id}'] = event['data']

# Usage
event_store = EventStore()

event_store.append({
    'type': 'user_created',
    'aggregate_id': 'user123',
    'data': {'user_id': 'user123', 'name': 'John'}
})

event_store.append({
    'type': 'order_placed',
    'aggregate_id': 'order456',
    'data': {'order_id': 'order456', 'user_id': 'user123', 'amount': 100}
})

⚠️

Event sourcing provides complete audit trail but adds complexity. Use it when you need to track all changes or rebuild state.

Microservices Patterns

from typing import Dict, List, Any, Optional
from dataclasses import dataclass
import json

@dataclass
class ServiceConfig:
    """Service configuration."""
    name: str
    version: str
    port: int
    dependencies: List[str]

class ServiceRegistry:
    """Service registry for microservices."""
    
    def __init__(self):
        self.services = {}
    
    def register(self, config: ServiceConfig):
        """Register a service."""
        self.services[config.name] = {
            'config': config,
            'health': 'healthy',
            'registered_at': time.time()
        }
    
    def deregister(self, service_name: str):
        """Deregister a service."""
        if service_name in self.services:
            del self.services[service_name]
    
    def discover(self, service_name: str) -> Optional[Dict]:
        """Discover a service."""
        return self.services.get(service_name)
    
    def get_all_services(self) -> Dict:
        """Get all registered services."""
        return self.services

class APIGateway:
    """Simple API gateway."""
    
    def __init__(self, registry: ServiceRegistry):
        self.registry = registry
        self.routes = {}
    
    def add_route(self, path: str, service_name: str):
        """Add route to service."""
        self.routes[path] = service_name
    
    def route_request(self, path: str, method: str = 'GET', data: Any = None):
        """Route request to appropriate service."""
        if path not in self.routes:
            return {'error': 'Route not found'}, 404
        
        service_name = self.routes[path]
        service = self.registry.discover(service_name)
        
        if not service or service['health'] != 'healthy':
            return {'error': 'Service unavailable'}, 503
        
        # Forward request to service
        return {
            'service': service_name,
            'method': method,
            'data': data,
            'status': 'routed'
        }, 200

# Circuit Breaker with Fallback
class ResilientService:
    """Service with circuit breaker and fallback."""
    
    def __init__(self, primary_service, fallback_service=None):
        self.primary = primary_service
        self.fallback = fallback_service
        self.circuit_breaker = CircuitBreaker()
    
    def execute(self, *args, **kwargs):
        """Execute with circuit breaker protection."""
        if not self.circuit_breaker.can_execute():
            if self.fallback:
                return self.fallback(*args, **kwargs)
            raise Exception("Service unavailable")
        
        try:
            result = self.primary(*args, **kwargs)
            self.circuit_breaker.record_success()
            return result
        except Exception as e:
            self.circuit_breaker.record_failure()
            
            if self.fallback:
                return self.fallback(*args, **kwargs)
            raise

Follow-Up Questions

  1. Explain the CAP theorem and its implications for distributed systems.

  2. How would you design a URL shortener like bit.ly?

  3. What are the trade-offs between different caching strategies?

  4. How do you handle data consistency in microservices?

  5. Explain the saga pattern for distributed transactions.

Advertisement