Python Caching — Speed Up Your Applications


Caching stores the results of expensive operations so they do not have to be recomputed. A cache hit is typically orders of magnitude faster than redoing the work (in the example below, about 1 ms versus 100 ms). This tutorial covers functools.lru_cache, cachetools TTLCache, Redis integration, and cache invalidation strategies.

Learning Objectives

  • Use functools.lru_cache for memoization
  • Implement in-memory caching with TTL
  • Integrate Redis for distributed caching
  • Apply cache invalidation strategies
  • Build a complete caching layer for APIs

Why Cache?

Architecture Diagram
Without Cache:
Request -> Database Query (100ms) -> Response
Request -> Database Query (100ms) -> Response
Request -> Database Query (100ms) -> Response
Total: 300ms

With Cache:
Request -> Cache Miss -> Database (100ms) -> Cache Store -> Response
Request -> Cache Hit (1ms) -> Response
Request -> Cache Hit (1ms) -> Response
Total: ~102ms (about 3x faster)
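
The arithmetic behind the diagram generalizes to any hit ratio. The following is a minimal sketch, assuming the illustrative 1 ms cache lookup and 100 ms database query from the diagram; the function name average_latency_ms is hypothetical and the cost of storing a value in the cache is ignored.

```python
def average_latency_ms(hit_ratio: float, cache_ms: float = 1.0, db_ms: float = 100.0) -> float:
    """Expected latency per request for a cache-aside read path.

    A hit costs one cache lookup. A miss costs the failed lookup plus the
    database query (the time to store the result is ignored here).
    """
    if not 0.0 <= hit_ratio <= 1.0:
        raise ValueError("hit_ratio must be between 0 and 1")
    return hit_ratio * cache_ms + (1.0 - hit_ratio) * (cache_ms + db_ms)


for ratio in (0.0, 0.5, 0.8, 0.99):
    print(f"hit ratio {ratio:.0%}: {average_latency_ms(ratio):.1f} ms per request")
```

Under these assumptions the benefit grows quickly with the hit ratio, which is why the hit ratio is worth measuring.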

Caching Strategies Comparison

Strategy      | Description                              | Use Case
Cache-aside   | App checks cache first, falls back to DB | Most common
Write-through | Write to cache and DB simultaneously     | Consistency critical
Write-behind  | Write to cache, async write to DB        | High write throughput
Read-through  | Cache fetches from DB on miss            | Transparent to app
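
To make the difference between two of these strategies concrete, here is a small sketch that uses plain dictionaries as stand-ins for both the cache and the database. The class names WriteThroughStore and CacheAsideStore are hypothetical and exist only for illustration.

```python
class WriteThroughStore:
    """Write-through: every write updates the backing store and the cache together."""

    def __init__(self):
        self.db = {}     # stand-in for a real database
        self.cache = {}  # stand-in for a real cache

    def write(self, key, value):
        self.db[key] = value     # persist first
        self.cache[key] = value  # then keep the cache in step

    def read(self, key):
        # Anything that has been written is already in the cache.
        if key in self.cache:
            return self.cache[key]
        return self.db.get(key)


class CacheAsideStore:
    """Cache-aside: the application fills the cache on read and invalidates on write."""

    def __init__(self):
        self.db = {}
        self.cache = {}
        self.db_reads = 0  # counts how often the "database" is consulted

    def write(self, key, value):
        self.db[key] = value
        self.cache.pop(key, None)  # drop the stale entry instead of updating it

    def read(self, key):
        if key in self.cache:
            return self.cache[key]
        self.db_reads += 1
        value = self.db.get(key)
        if value is not None:
            self.cache[key] = value
        return value


store = CacheAsideStore()
store.write("user:1", {"name": "Alice"})
store.read("user:1")   # miss: reads the database and fills the cache
store.read("user:1")   # hit: served from the cache
print(store.db_reads)  # 1
```

Write-through keeps reads warm at the cost of extra work on every write, while cache-aside only pays for data that is actually read.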

functools.lru_cache

The easiest way to add caching — just one decorator.

from functools import lru_cache

@lru_cache(maxsize=128)  # Cache up to 128 results
def fibonacci(n):
    if n < 2:
        return n
    return fibonacci(n-1) + fibonacci(n-2)

# First call: each value from 0 to 100 is computed once and cached,
# so even the first call finishes quickly
print(fibonacci(100))  # 354224848179261915075

# Inspect cache statistics
print(fibonacci.cache_info())
# CacheInfo(hits=98, misses=101, maxsize=128, currsize=101)

# Clear cache
fibonacci.cache_clear()
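
The counters returned by cache_info() are enough to compute a hit ratio. A minimal sketch follows; the helper hit_ratio and the example function square are hypothetical names used only for this illustration.

```python
from functools import lru_cache


@lru_cache(maxsize=128)
def square(n: int) -> int:
    return n * n


def hit_ratio(cached_func) -> float:
    """Fraction of calls answered from the cache (0.0 if never called)."""
    info = cached_func.cache_info()
    total = info.hits + info.misses
    return info.hits / total if total else 0.0


# Three distinct arguments (misses) and three repeats (hits)
for n in [1, 2, 1, 1, 3, 2]:
    square(n)

print(square.cache_info())
print(f"hit ratio: {hit_ratio(square):.0%}")
```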

lru_cache with Parameters

from functools import lru_cache

@lru_cache(maxsize=256)
def get_user(user_id: int) -> dict:
    """Expensive database query."""
    print(f"Fetching user {user_id} from database...")
    return {"id": user_id, "name": f"User {user_id}"}

# Cache miss: first call for this id (prints "Fetching...")
user = get_user(1)

# Cache miss (prints "Fetching...")
user = get_user(2)

# Cache hit (no print)
user = get_user(1)

# Cache stats
print(get_user.cache_info())
# CacheInfo(hits=1, misses=2, maxsize=256, currsize=2)
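
Two details are easy to miss when caching functions that return dictionaries: lru_cache hands back the same object on every hit, and its arguments must be hashable. The sketch below illustrates both; load_settings and load_settings_copy are hypothetical names, and copying on read is only one possible way to protect the cached value.

```python
import copy
from functools import lru_cache


@lru_cache(maxsize=256)
def load_settings(name: str) -> dict:
    # Stand-in for an expensive lookup.
    return {"name": name, "tags": []}


first = load_settings("app")
first["tags"].append("edited")       # mutates the object stored in the cache
print(load_settings("app")["tags"])  # ['edited']: the cached value changed too


def load_settings_copy(name: str) -> dict:
    """Return a private copy so callers cannot alter the cached value."""
    return copy.deepcopy(load_settings(name))


safe = load_settings_copy("app")
safe["tags"].append("local only")
print(load_settings("app")["tags"])  # still ['edited']

try:
    load_settings(["not", "hashable"])  # list arguments cannot be cache keys
except TypeError as exc:
    print(f"TypeError: {exc}")
```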

cachetools TTLCache

cachetools provides TTL (time-to-live) support that lru_cache lacks.

# pip install cachetools
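from cachetools import TTLCache, LRUCache, LFUCache

# TTL cache: entries expire 300 seconds after they are stored
cache = TTLCache(maxsize=100, ttl=300)

def get_user(user_id):
    user = cache.get(user_id)
    if user is not None:
        return user

    # Expensive database query. `db` is a placeholder for your database
    # client. Pass values as query parameters instead of formatting them
    # into the SQL string, which would allow SQL injection. The "%s"
    # placeholder style depends on the driver you use.
    user = db.query("SELECT * FROM users WHERE id = %s", (user_id,))
    cache[user_id] = user
    return user

# LRU cache: evicts least recently used
# (named lru so it does not shadow functools.lru_cache)
lru = LRUCache(maxsize=100)

# LFU cache: evicts least frequently used
lfu = LFUCache(maxsize=100)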

In-Memory Cache with TTL

import time
from typing import Any, Optional
from threading import Lock

class SimpleCache:
    def __init__(self, default_ttl: int = 300):
        self.cache = {}  # key -> (value, expires_at)
        self.default_ttl = default_ttl
        self._lock = Lock()

    def get(self, key: str) -> Optional[Any]:
        with self._lock:
            if key in self.cache:
                value, expires_at = self.cache[key]
                if time.monotonic() < expires_at:
                    return value
                del self.cache[key]  # Expired
        return None

    def set(self, key: str, value: Any, ttl: Optional[int] = None):
        # Honor a per-entry TTL; fall back to the default when none is given
        if ttl is None:
            ttl = self.default_ttl
        with self._lock:
            # monotonic() is unaffected by system clock adjustments
            self.cache[key] = (value, time.monotonic() + ttl)

    def delete(self, key: str):
        with self._lock:
            self.cache.pop(key, None)

    def clear(self):
        with self._lock:
            self.cache.clear()

    def cleanup(self):
        """Remove expired entries."""
        with self._lock:
            now = time.monotonic()
            expired = [
                key for key, (_, expires_at) in self.cache.items()
                if now >= expires_at
            ]
            for key in expired:
                del self.cache[key]

    def size(self) -> int:
        with self._lock:
            return len(self.cache)

# Usage
cache = SimpleCache(default_ttl=60)
cache.set("user:123", {"name": "Alice"})
user = cache.get("user:123")  # Returns dict
cache.delete("user:123")

Redis Caching

Basic Redis Cache

import redis
import json
from typing import Any, Optional

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def cache_get(key: str) -> Optional[Any]:
    """Get value from Redis cache."""
    cached = r.get(key)
    if cached:
        return json.loads(cached)
    return None

def cache_set(key: str, value: Any, ttl: int = 300):
    """Set value in Redis cache with TTL."""
    r.setex(key, ttl, json.dumps(value))

def cache_delete(key: str):
    """Delete value from Redis cache."""
    r.delete(key)

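def cache_pattern(pattern: str) -> list:
    """Get all keys matching pattern."""
    # SCAN iterates incrementally; KEYS blocks the server while it walks
    # the entire keyspace, which is risky on large production instances
    return list(r.scan_iter(match=pattern))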

Cache Decorator with Redis

import redis
import json
import functools
from typing import Any, Callable

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def redis_cache(ttl: int = 300, prefix: str = "cache"):
    """Decorator to cache function results in Redis."""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Create cache key from function name and arguments
            key_parts = [prefix, func.__name__] + [str(a) for a in args]
            key_parts += [f"{k}={v}" for k, v in sorted(kwargs.items())]
            cache_key = ":".join(key_parts)

            # Try cache
            cached = r.get(cache_key)
            if cached:
                return json.loads(cached)

            # Call function
            result = func(*args, **kwargs)

            # Store in cache
            r.setex(cache_key, ttl, json.dumps(result))

            return result
        return wrapper
    return decorator

# Usage
@redis_cache(ttl=600, prefix="user")
def get_user(user_id: int) -> dict:
    # Expensive database query
    return {"id": user_id, "name": f"User {user_id}"}
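
Joining str() of each argument with ":" keeps keys readable, but different calls can end up with the same key. One possible alternative, sketched below, serializes the arguments and hashes them. The helper make_cache_key is a hypothetical name, and the repr() fallback assumes the arguments have stable repr output.

```python
import hashlib
import json


def make_cache_key(prefix: str, func_name: str, args: tuple, kwargs: dict) -> str:
    """Build a deterministic key from the function name and all arguments."""
    payload = json.dumps(
        {"args": args, "kwargs": kwargs},
        sort_keys=True,  # keyword order must not change the key
        default=repr,    # fall back to repr() for non-JSON types
    )
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]
    return f"{prefix}:{func_name}:{digest}"


# A plain ":".join of str() values would map both calls to "cache:find:1:2"
print(make_cache_key("cache", "find", ("1:2",), {}))
print(make_cache_key("cache", "find", ("1", "2"), {}))

# The integer 1 and the string "1" also get distinct keys
print(make_cache_key("cache", "find", (1,), {}))
print(make_cache_key("cache", "find", ("1",), {}))
```

The trade-off is that hashed keys are harder to read when inspecting the cache by hand, and prefix-based pattern invalidation only works on the readable part of the key.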

Cache-Aside Pattern

import redis
import json
from typing import Any, Callable, Optional

class CacheAside:
    def __init__(self, redis_client, default_ttl: int = 300):
        self.redis = redis_client
        self.default_ttl = default_ttl

    def get_or_set(
        self,
        key: str,
        fetch_fn: Callable[[], Any],
        ttl: Optional[int] = None
    ) -> Any:
        """Get from cache or fetch and cache."""
        # Try cache first
        cached = self.redis.get(key)
        if cached is not None:
            return json.loads(cached)

        # Cache miss: fetch from source
        value = fetch_fn()

        # Store in cache
        self.redis.setex(
            key,
            ttl or self.default_ttl,
            json.dumps(value)
        )

        return value

    def invalidate(self, pattern: str):
        """Invalidate all keys matching pattern."""
        # scan_iter avoids the blocking KEYS command
        keys = list(self.redis.scan_iter(match=pattern))
        if keys:
            self.redis.delete(*keys)

# Usage (r is the Redis client created earlier; db is a placeholder
# for your data-access layer)
cache = CacheAside(r, default_ttl=600)

# Get user (cache-aside)
user = cache.get_or_set(
    "user:123",
    lambda: db.get_user(123),
    ttl=300
)

# Invalidate user cache on update
db.update_user(123, {"name": "New Name"})
cache.invalidate("user:123")

Cache Invalidation Strategies

Time-Based (TTL)

# Simplest strategy: data expires after N seconds
r.setex("user:123", 300, json.dumps(user_data))  # 5 minutes

# Good for:
# - Data that changes infrequently
# - When staleness is acceptable
# - Reducing database load

Event-Based

def update_user(user_id, data):
    db.update(user_id, data)
    r.delete(f"user:{user_id}")  # Invalidate cache

def delete_user(user_id):
    db.delete(user_id)
    r.delete(f"user:{user_id}")
    r.delete(f"user:{user_id}:posts")  # Related caches too

# Good for:
# - Data that changes frequently
# - When consistency is critical
# - Real-time applications

Pattern-Based

def invalidate_pattern(pattern: str):
    # scan_iter walks the keyspace incrementally instead of blocking like KEYS
    keys = list(r.scan_iter(match=pattern))
    if keys:
        r.delete(*keys)

invalidate_pattern("user:*")        # Clear all user caches
invalidate_pattern("user:123:*")    # Clear all caches for user 123
invalidate_pattern("product:*")     # Clear all product caches

# Good for:
# - Bulk invalidation
# - Related data cleanup
# - Clearing a namespace before re-warming it

Version-Based

import json
from typing import Any

class VersionedCache:
    def __init__(self, redis_client):
        self.redis = redis_client

    def set_versioned(self, key: str, value: Any, version: int):
        versioned_key = f"{key}:v{version}"
        self.redis.setex(versioned_key, 3600, json.dumps(value))
        self.redis.set(f"{key}:version", version)

    def get_versioned(self, key: str):
        version = self.redis.get(f"{key}:version")
        if not version:
            return None
        versioned_key = f"{key}:v{version}"
        cached = self.redis.get(versioned_key)
        return json.loads(cached) if cached else None

# Usage
cache = VersionedCache(r)
cache.set_versioned("config", {"debug": True}, version=1)
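
The effect of publishing a new version can be seen without a Redis server. The sketch below mirrors the key layout used above with an in-memory stand-in; DictStore, publish and read_current are hypothetical names, and the stand-in ignores TTLs.

```python
import json


class DictStore:
    """Tiny in-memory stand-in for the Redis calls used above (no expiry)."""

    def __init__(self):
        self.data = {}

    def get(self, key):
        return self.data.get(key)

    def set(self, key, value):
        self.data[key] = str(value)

    def setex(self, key, ttl, value):
        self.data[key] = value  # the TTL is ignored in this stand-in


store = DictStore()


def publish(key: str, value, version: int) -> None:
    store.setex(f"{key}:v{version}", 3600, json.dumps(value))
    store.set(f"{key}:version", version)  # readers switch over from here on


def read_current(key: str):
    version = store.get(f"{key}:version")
    if version is None:
        return None
    cached = store.get(f"{key}:v{version}")
    return json.loads(cached) if cached is not None else None


publish("config", {"debug": True}, version=1)
publish("config", {"debug": False}, version=2)
print(read_current("config"))  # {'debug': False}
print(sorted(store.data))      # "config:v1" is still stored, just no longer read
```

Nothing is deleted when the version changes. Old entries are simply no longer referenced, and in Redis they would disappear once their TTL runs out.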

Cache Warming

import redis
import json
from concurrent.futures import ThreadPoolExecutor

class CacheWarmer:
    def __init__(self, redis_client):
        self.redis = redis_client

    def warm_user_cache(self, user_ids: list):
        """Pre-load user data into cache."""
        def fetch_and_cache(user_id):
            user = db.get_user(user_id)
            self.redis.setex(
                f"user:{user_id}",
                300,
                json.dumps(user)
            )

        with ThreadPoolExecutor(max_workers=10) as executor:
            # Consume the iterator so exceptions raised in workers surface here
            list(executor.map(fetch_and_cache, user_ids))

    def warm_product_cache(self):
        """Pre-load popular products."""
        products = db.get_popular_products(limit=100)
        pipeline = self.redis.pipeline()
        for product in products:
            pipeline.setex(
                f"product:{product['id']}",
                600,
                json.dumps(product)
            )
        pipeline.execute()

# Usage
warmer = CacheWarmer(r)
warmer.warm_user_cache([1, 2, 3, 4, 5])
warmer.warm_product_cache()

Common Mistakes

Mistake             | Problem                  | Solution
Caching everything  | Memory/cache thrashing   | Cache only expensive operations
No TTL              | Stale data forever       | Always set expiration
Not invalidating    | Stale data after updates | Invalidate on writes
Cache stampede      | Multiple requests hit DB | Use locks or singleflight
Wrong cache key     | Wrong data returned      | Include all parameters in key
No cache monitoring | Unknown hit ratio        | Track hits/misses
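
The "use locks" remedy for a cache stampede can be sketched with one lock per key, so that concurrent requests for the same missing key trigger a single load. This is an in-process illustration only; slow_load and get_once are hypothetical names, and a multi-server deployment would need a distributed lock instead.

```python
import threading
import time
from collections import defaultdict

_cache = {}
_key_locks = defaultdict(threading.Lock)  # one lock per cache key
_key_locks_guard = threading.Lock()       # protects the lock dictionary itself
load_count = 0


def slow_load(key: str) -> str:
    """Stand-in for an expensive query; counts how often it really runs."""
    global load_count
    load_count += 1
    time.sleep(0.05)
    return f"value for {key}"


def get_once(key: str) -> str:
    if key in _cache:  # fast path, no locking needed for a hit
        return _cache[key]
    with _key_locks_guard:
        lock = _key_locks[key]
    with lock:
        # Re-check: another thread may have filled the cache while we waited
        if key not in _cache:
            _cache[key] = slow_load(key)
        return _cache[key]


threads = [threading.Thread(target=get_once, args=("report",)) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(load_count)  # 1: eight concurrent requests caused a single load
```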

Best Practices

  1. Use lru_cache for the easiest caching
  2. Use TTL (time-to-live) for cache expiration
  3. Redis enables distributed caching across servers
  4. Cache invalidation is hard — choose strategies carefully
  5. Cache expensive computations, not simple lookups
  6. Monitor cache hit ratio — aim for > 80%
  7. Use @lru_cache for recursive functions
  8. Consider cache warming for critical data
  9. Use cache-aside pattern for most use cases
  10. Always handle cache failures gracefully — fall back to database
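
The last point can be sketched as a read path that treats every cache error as a miss. BrokenCache and get_with_fallback are hypothetical names; the stand-in client only mimics the get and setex calls used earlier, and with redis-py you would likely catch redis.RedisError rather than Exception.

```python
import json


class BrokenCache:
    """Stand-in client whose calls always fail, like an unreachable cache server."""

    def get(self, key):
        raise ConnectionError("cache unavailable")

    def setex(self, key, ttl, value):
        raise ConnectionError("cache unavailable")


def get_with_fallback(client, key, fetch_fn, ttl=300):
    """Read through the cache, but never let a cache error fail the request."""
    try:
        cached = client.get(key)
        if cached is not None:
            return json.loads(cached)
    except Exception as exc:  # assumption: narrow this to your client's error type
        print(f"cache read failed, using source: {exc}")

    value = fetch_fn()  # fall back to the source of truth

    try:
        client.setex(key, ttl, json.dumps(value))
    except Exception as exc:
        print(f"cache write failed, continuing: {exc}")
    return value


user = get_with_fallback(BrokenCache(), "user:123", lambda: {"id": 123})
print(user)  # {'id': 123}
```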
