Caching: functools.lru_cache, Redis, TTL, Cache Invalidation
Caching patterns for high-performance applications
Interview Question
"Explain caching strategies in Python. How does functools.lru_cache work? When would you use Redis vs in-memory caching? How do you handle cache invalidation?"
Difficulty: Medium | Frequently asked at Google, Meta, Amazon
Theoretical Foundation
Why Cache?
# Without caching:
# 1. Repeated expensive computations
# 2. Multiple database queries
# 3. API calls for same data
# 4. High latency, low throughput
# With caching:
# 1. Fast access to computed results
# 2. Reduced database load
# 3. Lower API costs
# 4. Better user experience
ℹ️
Key Concept: Caching trades memory for speed. Store frequently accessed data for faster retrieval.
functools.lru_cache
Basic Usage
import functools
import time
# Simple LRU cache
@functools.lru_cache(maxsize=128)
def fibonacci(n: int) -> int:
    """Return the n-th Fibonacci number, memoised through an LRU cache.

    Inputs below 2 are returned unchanged; larger inputs are the sum of the
    two preceding Fibonacci numbers, each obtained through the cached
    function itself.
    """
    is_base_case = n < 2
    if not is_base_case:
        # n-1 is evaluated before n-2, matching left-to-right evaluation
        # of the sum.
        previous = fibonacci(n - 1)
        before_previous = fibonacci(n - 2)
        return previous + before_previous
    return n
# Usage
# perf_counter() is a monotonic, high-resolution clock intended for measuring
# short durations; time.time() is wall-clock time and can jump if the system
# clock is adjusted.
start = time.perf_counter()
result = fibonacci(100)
elapsed = time.perf_counter() - start
print(f"Fibonacci(100) = {result}")
print(f"Time: {elapsed:.4f}s")
print(f"Cache info: {fibonacci.cache_info()}")
Output:
Fibonacci(100) = 354224848179261915075
Time: 0.0002s
Cache info: CacheInfo(hits=98, misses=101, maxsize=128, currsize=101)
Advanced LRU Cache
import functools
import time
from typing import Any, Callable
# Cache with custom key function
def lru_cache_with_key(key_func: Callable, maxsize: int = 128):
    """Build an LRU-caching decorator whose cache key comes from ``key_func``.

    Args:
        key_func: Called with the wrapped function's positional and keyword
            arguments; its return value is used as the cache key, so it must
            be hashable.
        maxsize: Maximum number of cached results. When the cache is full the
            least recently used entry is evicted. Values below 1 disable
            storing results.

    Returns:
        A decorator. The wrapped function gains ``cache_info()`` (a dict with
        ``hits``, ``misses``, ``size`` and ``maxsize``) and ``cache_clear()``.
    """
    def decorator(func: Callable) -> Callable:
        # A plain dict preserves insertion order, so the first key is always
        # the least recently used entry and the last key the most recent.
        cache = {}
        hits = 0
        misses = 0

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            nonlocal hits, misses
            key = key_func(*args, **kwargs)
            if key in cache:
                hits += 1
                # Re-insert to mark the entry as most recently used.
                value = cache.pop(key)
                cache[key] = value
                return value

            misses += 1
            result = func(*args, **kwargs)
            if maxsize > 0:
                # func may have re-entered wrapper and stored this key
                # already; only evict when a new slot is really needed.
                if key not in cache and len(cache) >= maxsize:
                    del cache[next(iter(cache))]
                cache[key] = result
            return result

        def cache_info():
            """Return hit/miss counters and the current and maximum size."""
            return {
                'hits': hits,
                'misses': misses,
                'size': len(cache),
                'maxsize': maxsize
            }

        def cache_clear():
            """Drop all cached results and reset the counters."""
            nonlocal hits, misses
            cache.clear()
            hits = 0
            misses = 0

        wrapper.cache_info = cache_info
        wrapper.cache_clear = cache_clear
        return wrapper
    return decorator
# Usage
@lru_cache_with_key(key_func=lambda x, y: (x, y))
def add(x, y):
    """Return x + y; the decorator keys cached results on the (x, y) pair."""
    total = x + y
    return total


print(add(1, 2))
print(add(1, 2))  # Same arguments again, so the stored result is returned
print(f"Cache info: {add.cache_info()}")
TTL Cache
import time
import functools
from typing import Any, Dict, Optional
# Sentinel that lets ttl_cache tell "no entry" apart from a cached None.
_TTL_MISS = object()


class TTLCache:
    """In-memory cache whose entries expire ``ttl`` seconds after being set.

    Values live in ``cache`` and their write times (``time.time()``) in
    ``timestamps``; both dicts always hold the same keys. Expired entries are
    removed lazily, when they are next looked up.
    """

    def __init__(self, maxsize: int = 128, ttl: float = 60.0):
        """Create a cache holding at most ``maxsize`` entries for ``ttl`` seconds each."""
        self.maxsize = maxsize
        self.ttl = ttl
        self.cache: Dict[str, Any] = {}
        self.timestamps: Dict[str, float] = {}

    def get(self, key: str, default: Any = None) -> Optional[Any]:
        """Return the live value for ``key``, or ``default`` if absent or expired.

        Pass a private sentinel as ``default`` to distinguish a miss from a
        stored ``None``.
        """
        if key not in self.cache:
            return default
        if time.time() - self.timestamps[key] < self.ttl:
            return self.cache[key]
        # Expired: drop it so it no longer counts against maxsize.
        self.delete(key)
        return default

    def set(self, key: str, value: Any):
        """Store ``value`` under ``key`` and restart its TTL.

        When the cache is full and ``key`` is new, the entry with the oldest
        timestamp is evicted. Overwriting an existing key never evicts. With
        ``maxsize`` below 1 nothing is stored.
        """
        if self.maxsize <= 0:
            return
        if key not in self.cache and len(self.cache) >= self.maxsize:
            # All entries share one TTL, so the oldest is also the one
            # closest to (or already past) expiry.
            oldest_key = min(self.timestamps, key=self.timestamps.get)
            self.delete(oldest_key)
        self.cache[key] = value
        self.timestamps[key] = time.time()

    def delete(self, key: str):
        """Remove ``key`` if present; missing keys are ignored."""
        self.cache.pop(key, None)
        self.timestamps.pop(key, None)

    def clear(self):
        """Remove every entry."""
        self.cache.clear()
        self.timestamps.clear()


def ttl_cache(ttl: float = 60.0, maxsize: int = 128):
    """Return a decorator that caches a function's results in a TTLCache.

    Args:
        ttl: Seconds a cached result stays valid.
        maxsize: Maximum number of cached results per decorated function.

    The wrapped function gains ``cache_clear()``. Results are keyed on the
    ``repr`` of the call arguments, so arguments need a stable ``repr``.
    """
    def decorator(func):
        # One cache per decorated function: reusing a single decorator object
        # on several functions must not let them read each other's results.
        cache = TTLCache(maxsize=maxsize, ttl=ttl)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Sorting makes f(a=1, b=2) and f(b=2, a=1) share one entry.
            key = repr((args, sorted(kwargs.items())))
            result = cache.get(key, _TTL_MISS)
            if result is not _TTL_MISS:
                return result
            result = func(*args, **kwargs)
            cache.set(key, result)
            return result

        wrapper.cache_clear = cache.clear
        return wrapper
    return decorator
# Usage
@ttl_cache(ttl=30.0, maxsize=100)
def expensive_operation(x: int) -> int:
    """Return twice x after a short artificial delay."""
    time.sleep(0.1)  # Stand-in for real work
    doubled = x * 2
    return doubled


# Initial call: the function body runs
result1 = expensive_operation(5)
print(f"First: {result1}")
# Repeat call with the same argument: answered from the cache
result2 = expensive_operation(5)
print(f"Second: {result2}")
💡
Interview Tip: LRU cache is great for pure functions with deterministic outputs. TTL cache is better for data that changes over time.
Redis Caching
Basic Redis Operations
import redis
import json
import pickle
from typing import Any, Optional
class RedisCache:
    """Thin JSON-serialising wrapper around a ``redis.Redis`` client.

    Values are stored as JSON text, so anything passed to ``set`` or
    ``set_many`` must be JSON-serialisable.
    """

    def __init__(self, host: str = 'localhost', port: int = 6379, db: int = 0):
        """Connect to Redis; responses are decoded to ``str``."""
        self.client = redis.Redis(host=host, port=port, db=db, decode_responses=True)

    def get(self, key: str) -> Optional[Any]:
        """Return the decoded value for ``key``, or ``None`` when missing or empty."""
        value = self.client.get(key)
        if not value:
            return None
        return json.loads(value)

    def set(self, key: str, value: Any, ttl: int = 3600):
        """Store ``value`` as JSON under ``key``, expiring after ``ttl`` seconds."""
        self.client.setex(key, ttl, json.dumps(value))

    def delete(self, key: str):
        """Delete ``key`` from Redis."""
        self.client.delete(key)

    def exists(self, key: str) -> bool:
        """Return ``True`` if ``key`` exists.

        The client reports how many of the given keys exist; that count is
        converted so the result matches the declared ``bool`` return type.
        """
        return bool(self.client.exists(key))

    def incr(self, key: str) -> int:
        """Increment the integer counter at ``key`` and return its new value."""
        return self.client.incr(key)

    def expire(self, key: str, ttl: int):
        """Set ``key`` to expire after ``ttl`` seconds."""
        self.client.expire(key, ttl)

    def get_many(self, keys: list) -> list:
        """Return decoded values for ``keys`` in order; missing keys give ``None``."""
        values = self.client.mget(keys)
        return [json.loads(v) if v else None for v in values]

    def set_many(self, mapping: dict, ttl: int = 3600):
        """Store every key/value pair of ``mapping`` as JSON with the same ``ttl``.

        The writes are queued on a pipeline and sent together by ``execute()``.
        """
        pipe = self.client.pipeline()
        for key, value in mapping.items():
            pipe.setex(key, ttl, json.dumps(value))
        pipe.execute()
# Usage
cache = RedisCache()
# Store one JSON-serialisable value for 300 seconds, then read it back
cache.set("user:1", {"name": "Alice", "age": 30}, ttl=300)
user = cache.get("user:1")
print(f"User: {user}")
# Counter: increment twice, then read the stored value
cache.incr("page_views")
cache.incr("page_views")
print(f"Page views: {cache.get('page_views')}")
# Several keys written in one call, all with the same TTL
cache.set_many(
    {"user:1:name": "Alice", "user:1:email": "alice@example.com"},
    ttl=300,
)
Redis with Decorators
import redis
import json
import functools
from typing import Any, Callable
class RedisDecorator:
    """Callable object that caches a function's JSON-serialisable results in Redis.

    Apply an instance as a decorator; each call is keyed on the wrapped
    function's ``__name__`` together with its positional and keyword arguments.
    """

    def __init__(self, redis_client: redis.Redis, ttl: int = 3600):
        """Keep the client to use and the expiry, in seconds, for stored results."""
        self.redis = redis_client
        self.ttl = ttl

    def __call__(self, func: Callable) -> Callable:
        """Wrap ``func`` so its results are read from and written to Redis."""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            cache_key = f"{func.__name__}:{args}:{kwargs}"
            payload = self.redis.get(cache_key)
            if not payload:
                # Nothing stored: run the function and save its result as JSON.
                fresh = func(*args, **kwargs)
                self.redis.setex(cache_key, self.ttl, json.dumps(fresh))
                return fresh
            return json.loads(payload)
        return wrapper
# Usage
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
cache_decorator = RedisDecorator(redis_client, ttl=300)


@cache_decorator
def get_user(user_id: int) -> dict:
    """Return a user record; the body stands in for a database query."""
    record = {"id": user_id, "name": f"User_{user_id}"}
    return record


# First call - the function body runs and its result is stored in Redis
user1 = get_user(1)
print(f"User 1: {user1}")
# Second call - same arguments, result is read back from Redis
user2 = get_user(1)
print(f"User 2: {user2}")
Redis Patterns
import redis
import json
from typing import List, Dict
class RedisPatterns:
    """Common Redis patterns built on top of a redis-py style client."""

    def __init__(self, client: "redis.Redis"):
        """Keep the client that every pattern below issues its commands on."""
        self.client = client

    # 1. Rate Limiting
    def is_rate_limited(self, key: str, limit: int, window: int) -> bool:
        """Count one request against ``key`` and report whether ``limit`` is exceeded.

        The counter's expiry is set to ``window`` seconds when the count is 1,
        so the window starts at the first request.
        """
        current = self.client.incr(key)
        if current == 1:
            # NOTE(review): INCR and EXPIRE are two separate commands; if the
            # process dies between them the counter is left without an expiry.
            self.client.expire(key, window)
        return current > limit

    # 2. Distributed Lock
    def acquire_lock(self, lock_name: str, timeout: int = 10) -> bool:
        """Try to take the lock; return ``True`` on success, ``False`` if held.

        Uses a single ``SET`` with ``nx`` and ``ex`` so the key and its expiry
        are created together. Separate SETNX and EXPIRE calls could leave a
        lock that never expires if the caller crashed in between.
        """
        return bool(
            self.client.set(f"lock:{lock_name}", 1, nx=True, ex=timeout)
        )

    def release_lock(self, lock_name: str):
        """Delete the lock key.

        NOTE(review): the key is deleted unconditionally, so a caller whose
        lock already timed out can remove a lock now held by someone else.
        """
        self.client.delete(f"lock:{lock_name}")

    # 3. Session Storage
    def set_session(self, session_id: str, data: dict, ttl: int = 1800):
        """Store ``data`` as JSON under the session key for ``ttl`` seconds."""
        self.client.setex(f"session:{session_id}", ttl, json.dumps(data))

    def get_session(self, session_id: str) -> dict:
        """Return the decoded session data, or ``None`` when nothing is stored."""
        data = self.client.get(f"session:{session_id}")
        return json.loads(data) if data else None

    # 4. Leaderboard
    def add_score(self, leaderboard: str, user_id: str, score: float):
        """Set ``user_id``'s score in the leaderboard's sorted set."""
        self.client.zadd(f"leaderboard:{leaderboard}", {user_id: score})

    def get_top_scores(self, leaderboard: str, count: int = 10) -> List[Dict]:
        """Return the ``count`` highest entries as ``user_id``/``score`` dicts."""
        results = self.client.zrevrange(
            f"leaderboard:{leaderboard}", 0, count-1, withscores=True
        )
        # Members are bytes by default but str when the client was created
        # with decode_responses=True; accept both.
        return [
            {
                "user_id": uid.decode() if isinstance(uid, bytes) else uid,
                "score": score,
            }
            for uid, score in results
        ]
# Usage
patterns = RedisPatterns(redis.Redis())
# Ten requests against a limit of five per 60-second window
for i in range(10):
    limited = patterns.is_rate_limited("api:rate", limit=5, window=60)
    status = 'Limited' if limited else 'Allowed'
    print(f"Request {i+1}: {status}")
ℹ️
Redis Use Case: Redis is excellent for distributed caching, rate limiting, and session storage.
Cache Invalidation Strategies
Time-Based Invalidation
import time
from typing import Any, Dict, Optional
class TimeBasedCache:
    """Dictionary-backed cache where each entry carries its own expiry time."""

    def __init__(self):
        """Start empty; ``cache`` maps key -> (value, absolute expiry time)."""
        self.cache: Dict[str, tuple] = {}

    def set(self, key: str, value: Any, ttl: float = 60.0):
        """Store ``value`` under ``key`` so it expires ``ttl`` seconds from now."""
        expires_at = time.time() + ttl
        self.cache[key] = (value, expires_at)

    def get(self, key: str) -> Optional[Any]:
        """Return the stored value while unexpired; otherwise ``None``.

        An expired entry is removed as part of the lookup.
        """
        entry = self.cache.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.time() < expires_at:
            return value
        del self.cache[key]
        return None
# Usage
cache = TimeBasedCache()
cache.set("data", "value", ttl=2.0)
print(cache.get("data"))  # Still inside the 2-second TTL: prints "value"
time.sleep(3)  # Wait until the TTL has passed
print(cache.get("data"))  # Entry has expired: prints None
Event-Based Invalidation
from typing import Callable, Dict, List, Set
from collections import defaultdict
class EventBasedCache:
    """Cache whose entries are dropped when a named dependency is invalidated.

    ``dependencies`` maps a dependency name to the cache keys registered
    against it; ``invalidators`` maps a dependency name to callbacks that run
    whenever that dependency is invalidated.
    """

    def __init__(self):
        """Start with no values, dependency links or callbacks."""
        self.cache: Dict[str, Any] = {}
        self.dependencies: Dict[str, Set[str]] = defaultdict(set)
        self.invalidators: Dict[str, List[Callable]] = defaultdict(list)

    def set(self, key: str, value: Any, depends_on: Optional[List[str]] = None):
        """Store ``value`` and link ``key`` to each name in ``depends_on``."""
        self.cache[key] = value
        if depends_on:
            for dep in depends_on:
                self.dependencies[dep].add(key)

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value, or ``None`` if ``key`` is not cached."""
        return self.cache.get(key)

    def invalidate(self, dependency: str):
        """Drop every key linked to ``dependency``, then run its callbacks.

        Only keys registered directly against ``dependency`` are removed; the
        entry stored under the dependency's own name is left alone. Each
        callback registered via ``on_invalidate`` is called with the
        dependency name.
        """
        for key in self.dependencies.get(dependency, []):
            self.cache.pop(key, None)
            print(f"Invalidated: {key}")
        # .get() avoids creating empty defaultdict entries for unknown names.
        for callback in self.invalidators.get(dependency, []):
            callback(dependency)

    def on_invalidate(self, dependency: str, callback: Callable):
        """Register ``callback`` to run as ``callback(dependency)`` on invalidation."""
        self.invalidators[dependency].append(callback)
# Usage
cache = EventBasedCache()
# Register three entries, each linked to the name it depends on
cache.set("user:1", {"name": "Alice"}, depends_on=["users"])
cache.set("user:1:posts", ["post1", "post2"], depends_on=["user:1"])
cache.set("user:1:followers", ["user2", "user3"], depends_on=["user:1"])
# Drop everything registered against "user:1" (posts and followers);
# the "user:1" entry itself is registered against "users" and is kept.
cache.invalidate("user:1")
print(f"User after invalidation: {cache.get('user:1')}")
print(f"Posts after invalidation: {cache.get('user:1:posts')}")
Write-Through Cache
from typing import Any, Callable
import time
class WriteThroughCache:
    """Write-through cache: every write goes to the backend and the cache.

    ``backend`` is any object exposing ``get(key)``, ``set(key, value)`` and
    ``delete(key)``.
    """

    def __init__(self, backend: Any):
        """Start with an empty in-memory cache in front of ``backend``."""
        self.cache = {}
        self.backend = backend

    def get(self, key: str) -> Any:
        """Return the cached value, falling back to the backend on a miss.

        A non-``None`` backend value is cached before being returned.
        """
        if key in self.cache:
            return self.cache[key]
        # Fetch from backend
        value = self.backend.get(key)
        if value is not None:
            self.cache[key] = value
        return value

    def set(self, key: str, value: Any):
        """Write ``value`` to the backend, then to the cache.

        The backend is written first: if that write raises, the cache is left
        untouched and never serves a value that was not persisted.
        """
        self.backend.set(key, value)
        self.cache[key] = value

    def delete(self, key: str):
        """Delete ``key`` from both the cache and the backend."""
        # Dropping the cache entry first is safe: if the backend delete
        # fails, the next get() simply reloads the value.
        self.cache.pop(key, None)
        self.backend.delete(key)
# Usage
class Database:
    """Minimal in-memory key/value store used as a stand-in backend."""

    def __init__(self):
        """Start with an empty ``data`` dict."""
        self.data = {}

    def get(self, key):
        """Return the value stored under ``key``, or ``None`` if absent."""
        try:
            return self.data[key]
        except KeyError:
            return None

    def set(self, key, value):
        """Store ``value`` under ``key``, replacing any previous value."""
        self.data.update({key: value})

    def delete(self, key):
        """Remove ``key`` if it is present; do nothing otherwise."""
        if key in self.data:
            del self.data[key]
db = Database()
cache = WriteThroughCache(db)
# One set() call stores the value in the cache and in the database
cache.set("user:1", {"name": "Alice"})
print(f"Cache: {cache.get('user:1')}")
print(f"Database: {db.get('user:1')}")
Cache-Aside Pattern
from typing import Any, Callable
import time
class CacheAside:
    """Cache-aside reader: check the cache first, load from the backend on a miss.

    ``cache`` is a caller-supplied dict and ``backend`` any object exposing
    ``get(key)``. Entries are considered fresh for ``ttl`` seconds after this
    object stored them.
    """

    def __init__(self, cache: dict, backend: Any, ttl: float = 60.0):
        """Wrap ``cache`` and ``backend``; no timestamps are known at start."""
        self.cache = cache
        self.backend = backend
        self.ttl = ttl
        self.timestamps = {}

    def get(self, key: str) -> Any:
        """Return the value for ``key``, loading and caching it on a miss.

        A cached entry is used only while younger than ``ttl``. Entries with
        no recorded timestamp (for example ones already in the supplied dict)
        have unknown age and are treated as expired. ``None`` results from the
        backend are returned but not cached.
        """
        # Check cache
        if key in self.cache:
            stored_at = self.timestamps.get(key)
            if stored_at is not None and time.time() - stored_at < self.ttl:
                return self.cache[key]
            self.invalidate(key)
        # Cache miss - fetch from backend
        value = self.backend.get(key)
        if value is not None:
            self.cache[key] = value
            self.timestamps[key] = time.time()
        return value

    def invalidate(self, key: str):
        """Remove ``key`` and its timestamp; missing keys are ignored."""
        self.cache.pop(key, None)
        self.timestamps.pop(key, None)
# Usage
cache = {}
backend = Database()
# Seed the backend: with an empty Database every lookup returns None,
# nothing is cached, and the second access below would not be a hit.
backend.set("user:1", {"name": "Alice"})
cacheAside = CacheAside(cache, backend, ttl=30)
# First access - cache miss, value is loaded from the backend
user = cacheAside.get("user:1")
print(f"First: {user}")
# Second access - cache hit
user = cacheAside.get("user:1")
print(f"Second: {user}")
💡
Interview Tip: Cache invalidation is one of the hardest problems in computer science. Choose the right strategy for your use case.
Advanced Patterns
Multi-Level Cache
from typing import Any, Optional
import time
class MultiLevelCache:
    """Two-level cache: an in-process dict (L1) in front of an optional L2 store.

    ``l2_cache`` is expected to offer ``get(key)`` and
    ``setex(key, ttl, value)`` (for example a Redis client); values are
    exchanged with it as JSON text. When ``l2_cache`` is ``None`` the class
    behaves as an L1-only cache.
    """

    def __init__(self, l1_ttl: float = 60.0, l2_ttl: int = 3600, l2_cache: Any = None):
        """Configure both levels; TTLs are in seconds."""
        self.l1_cache = {}  # In-memory
        self.l1_timestamps = {}
        self.l1_ttl = l1_ttl
        self.l2_cache = l2_cache
        self.l2_ttl = l2_ttl

    def get(self, key: str) -> Optional[Any]:
        """Return the value from L1, else from L2 (promoting it), else ``None``."""
        # L1 check
        if key in self.l1_cache:
            if time.time() - self.l1_timestamps[key] < self.l1_ttl:
                return self.l1_cache[key]
            del self.l1_cache[key]
            del self.l1_timestamps[key]
        # L2 check
        if self.l2_cache is None:
            return None
        raw = self.l2_cache.get(key)
        if raw is None:
            return None
        value = json.loads(raw)
        if value is not None:
            # Promote to L1 so the next read skips the L2 round trip.
            self.l1_cache[key] = value
            self.l1_timestamps[key] = time.time()
        return value

    def set(self, key: str, value: Any):
        """Store ``value`` in L1 and, when configured, in L2 with ``l2_ttl``."""
        # L1
        self.l1_cache[key] = value
        self.l1_timestamps[key] = time.time()
        # L2
        if self.l2_cache is not None:
            self.l2_cache.setex(key, self.l2_ttl, json.dumps(value))
# Usage
cache = MultiLevelCache()
# Write once, then read the value back through the cache
cache.set("user:1", {"name": "Alice"})
print(cache.get("user:1"))
Cache Warming
from typing import List, Any
import time
class CacheWarmer:
    """Pre-load a cache with values read from a backend.

    ``cache`` may be an object with a ``set(key, value)`` method or a plain
    mapping such as a dict; ``backend`` is any object exposing ``get(key)``.
    """

    def __init__(self, cache, backend):
        """Keep the cache to fill and the backend to read from."""
        self.cache = cache
        self.backend = backend

    def _store(self, key: str, value: Any):
        """Write one entry via ``cache.set`` when available, else by item assignment."""
        setter = getattr(self.cache, "set", None)
        if callable(setter):
            setter(key, value)
        else:
            self.cache[key] = value

    def warm_keys(self, keys: List[str]):
        """Copy each key's backend value into the cache, skipping ``None`` values."""
        for key in keys:
            value = self.backend.get(key)
            if value is not None:
                self._store(key, value)
                print(f"Warmed: {key}")

    def warm_by_pattern(self, pattern: str, count: int = 100):
        """Warm the keys ``"{pattern}:0"`` to ``"{pattern}:{count - 1}"``."""
        # In production, scan Redis for matching keys
        keys = [f"{pattern}:{i}" for i in range(count)]
        self.warm_keys(keys)
# Usage
cache = {}
backend = Database()
warmer = CacheWarmer(cache, backend)
# A new Database starts with no data, so each of these lookups returns None
# and no entry is written to the cache.
warmer.warm_keys(["user:1", "user:2", "user:3"])
Interview Tips
Common Follow-up Questions
-
"When would you use Redis vs in-memory cache?"
- In-memory: Single process, fast, limited size
- Redis: Distributed, optionally persistent, shared across processes and hosts
-
"How do you handle cache stampede?"
- Locking during cache miss
- Probabilistic early expiration
- Background refresh
-
"What's cache invalidation?"
- Removing stale data from cache
- Strategies: time-based, event-based, write-through
Code Review Tips
# BAD: No cache invalidation
cache.set("user:1", user_data)
# User data changes but cache not updated!

# GOOD: Invalidate on write
def update_user(user_id, data):
    """Persist the change, then drop the cached copy so readers refetch it."""
    user = db.update(user_id, data)
    cache.invalidate(f"user:{user_id}")
    return user

# BAD: Cache everything forever
@functools.lru_cache(maxsize=1000000)  # Entries never expire: stale data and heavy memory use
def get_data(key):
    return db.get(key)

# GOOD: TTL-based caching
@ttl_cache(ttl=300)  # Expires after 5 minutes
def get_data(key):
    return db.get(key)

# BAD: No cache monitoring
# GOOD: Track cache metrics
lookups = hits + misses
# Guard the ratio: before the first lookup hits + misses is 0.
hit_rate = hits / lookups * 100 if lookups else 0.0
print(f"Cache hit rate: {hit_rate:.1f}%")
⚠️
Common Mistake: Not monitoring cache hit rates leads to suboptimal caching strategies.
Summary
| Strategy | Use Case | Complexity |
|---|---|---|
| LRU Cache | Pure functions | Low |
| TTL Cache | Time-sensitive data | Low |
| Redis | Distributed systems | Medium |
| Write-Through | Strong consistency | Medium |
| Cache-Aside | Read-heavy workloads | Low |
| Multi-Level | Performance optimization | High |
Best Practices
- Use LRU cache for pure functions
- Use TTL cache for time-sensitive data
- Use Redis for distributed systems
- Implement cache invalidation
- Monitor cache hit rates
- Consider cache stampede
ℹ️
Key Takeaway: Caching is essential for performance, but requires careful consideration of invalidation strategies.
Practice Problems
- LRU Cache: Implement an LRU cache from scratch
- TTL Cache: Build a TTL cache with automatic expiration
- Redis Cache: Implement Redis caching with decorators
- Cache Invalidation: Design a cache invalidation system
- Multi-Level Cache: Build a multi-level cache system
Further Reading
- functools.lru_cache: Python documentation
- Redis: https://redis.io/docs/
- Caching Patterns: Martin Kleppmann's "Designing Data-Intensive Applications"
- Books: "High Performance Python" by Micha Gorelick and Ian Ozsvald
Remember: Caching is a trade-off between memory, speed, and consistency. Choose wisely.