
Real-Time Recommendation System (Embeddings + ANN + Redis)




Embeddings + ANN + Redis | Sub-Millisecond Serving

Advanced | 14+ Hours | Production-Ready

Project Overview

Problem Statement

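E-commerce and content platforms need to deliver personalized recommendations in real time. Traditional memory-based collaborative filtering, which computes similarities against the whole catalog when a request arrives, is too slow for online serving. This system instead pre-computes item embeddings, derives user embeddings from recent interactions, and uses approximate nearest neighbor (ANN) search over an HNSW index in Redis, targeting under 1 ms for the ANN lookup and under 10 ms end to end.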

Objectives

  • Build user and item embedding models from interaction data
  • Implement ANN search using HNSW indexes in Redis
  • Create a real-time feature store for user context
  • Deploy with A/B testing infrastructure
  • Achieve sub-millisecond ANN search latency at 10K QPS per API instance

| Component | Technology |
| --- | --- |
| Embedding Model | Sentence Transformers + Custom |
| Vector Store | Redis + HNSW |
| Feature Store | Redis + Feature Store |
| API Framework | FastAPI + gRPC |
| A/B Testing | Optimizely / LaunchDarkly |
| Monitoring | Prometheus + Grafana |
| Experiment Tracking | MLflow |

Architecture Diagram

+-------------------------------------------------------------------+
|              Real-Time Recommendation Architecture                |
+-------------------------------------------------------------------+
|  +--------------+    +--------------+    +------------------+     |
|  | User Events  |--->| Stream Proc  |--->| Embedding Gen    |     |
|  |  (Kafka)     |    |  (Flink)     |    |  (GPU Workers)   |     |
|  +--------------+    +--------------+    +--------+---------+     |
|                                                  |               |
|                                                  v               |
|  +--------------+    +--------------+    +------------------+     |
|  | Real-time    |<---| Redis HNSW   |<---| Embedding Store  |     |
|  | Serving API  |    | Index        |    | (Pre-computed)   |     |
|  +--------------+    +--------------+    +------------------+     |
|        |                                                   |     |
|        v                                                   v     |
|  +--------------+    +--------------+    +------------------+     |
|  |  Response    |    |  A/B Testing |    |  Feature Store   |     |
|  |  Ranking     |    |  Layer       |    |  (User Context)  |     |
|  +--------------+    +--------------+    +------------------+     |
+-------------------------------------------------------------------+

Step-by-Step Implementation

Step 1: Environment Setup

mkdir rec-system && cd rec-system
python -m venv venv && source venv/bin/activate
pip install fastapi uvicorn "redis>=4.2" sentence-transformers
pip install numpy faiss-cpu torch pydantic
pip install mlflow kafka-python asyncpg
pip install prometheus-client grpcio

Step 2: Embedding Model Training

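Fine-tune a Sentence Transformers model on (user profile, item description) pairs built from interaction data. Each pair is labeled with an engagement score, and CosineSimilarityLoss trains the model so that the cosine similarity between the two embeddings approximates that score.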
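A simplified NumPy sketch of what that objective computes (not the library's implementation): for each pair, take the cosine similarity of the two embeddings, then average the squared error against the label. This mirrors CosineSimilarityLoss with its default MSE loss. The function name cosine_similarity_loss is illustrative.

```python
import numpy as np


def cosine_similarity_loss(u: np.ndarray, v: np.ndarray, labels: np.ndarray) -> float:
    """Mean squared error between row-wise cos(u, v) and the target labels.

    u, v: (batch, dim) embeddings of the user-profile and item-description texts.
    labels: (batch,) engagement scores, expected to lie in [0, 1].
    """
    cos = np.sum(u * v, axis=1) / (np.linalg.norm(u, axis=1) * np.linalg.norm(v, axis=1))
    return float(np.mean((cos - labels) ** 2))


u = np.array([[1.0, 0.0], [1.0, 0.0]])
v = np.array([[1.0, 0.0], [0.0, 1.0]])  # pair 1 identical, pair 2 orthogonal
labels = np.array([1.0, 0.0])            # engaged vs. not engaged
print(cosine_similarity_loss(u, v, labels))  # 0.0: similarities already match the labels
```

Because the loss pulls cosine similarity toward the label, engagement scores should be rescaled into [0, 1] before training rather than passed in as raw counts or dwell times.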

# src/embeddings/train_embeddings.py
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from sentence_transformers import SentenceTransformer, InputExample, losses
from sentence_transformers.evaluation import EmbeddingSimilarityEvaluator
import numpy as np
from typing import List, Tuple, Dict
import mlflow


class InteractionDataset(Dataset):
    def __init__(self, interactions: List[Dict]):
        self.pairs = []
        for inter in interactions:
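            # engagement_score is the target cosine similarity (scale it to [0, 1]).
            # Negatives (items the user did not engage with) must already be in
            # `interactions` with a score near 0; none are sampled here.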
            self.pairs.append(InputExample(
                texts=[inter["user_profile"], inter["item_description"]],
                label=float(inter["engagement_score"])
            ))

    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, idx):
        return self.pairs[idx]


class EmbeddingTrainer:
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        # model_name may be a hub ID or a local path such as "./trained_embeddings"
        self.model_name = model_name
        self.model = SentenceTransformer(model_name)

    def train(
        self,
        train_data: List[Dict],
        eval_data: List[Dict],
        epochs: int = 10,
        batch_size: int = 64,
        learning_rate: float = 2e-5,
    ):
        train_dataset = InteractionDataset(train_data)
        train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)

        # Cosine similarity loss: MSE between cos(user, item) and engagement_score
        train_loss = losses.CosineSimilarityLoss(self.model)

        # Evaluator: correlation between predicted similarity and held-out scores
        eval_examples = [InputExample(
            texts=[e["user_profile"], e["item_description"]],
            label=float(e["engagement_score"])
        ) for e in eval_data]
        evaluator = EmbeddingSimilarityEvaluator.from_input_examples(
            eval_examples, name="eval"
        )

        mlflow.set_experiment("embedding-training")
        with mlflow.start_run():
            mlflow.log_params({
                "model_name": self.model_name,  # log the model actually being trained
                "epochs": epochs,
                "batch_size": batch_size,
                "learning_rate": learning_rate,
            })

            self.model.fit(
                train_objectives=[(train_dataloader, train_loss)],
                evaluator=evaluator,
                epochs=epochs,
                warmup_steps=100,
                optimizer_params={"lr": learning_rate},  # apply the logged learning rate
                output_path="./trained_embeddings",
                show_progress_bar=True,
            )

    def encode(self, texts: List[str], batch_size: int = 256) -> np.ndarray:
        return self.model.encode(texts, batch_size=batch_size, show_progress_bar=True)
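InteractionDataset only wraps the rows it receives, so negative pairs have to be present in the training data. One way to add them, sketched under stated assumptions: each positive row also carries user_id and item_id keys, negatives are drawn uniformly from the catalog, and sampled negatives get engagement_score 0.0. The helper add_random_negatives and its parameters are hypothetical.

```python
import random
from typing import Dict, List


def add_random_negatives(
    positives: List[Dict],
    item_descriptions: Dict[str, str],
    negatives_per_positive: int = 1,
    seed: int = 0,
) -> List[Dict]:
    """Append uniformly sampled negative pairs (engagement_score=0.0) to the positives."""
    rng = random.Random(seed)
    seen = {(p["user_id"], p["item_id"]) for p in positives}
    all_items = list(item_descriptions)
    rows = list(positives)
    for p in positives:
        added, attempts = 0, 0
        # Bounded retries in case the user has touched most of the catalog
        while added < negatives_per_positive and attempts < 100:
            attempts += 1
            item_id = rng.choice(all_items)
            if (p["user_id"], item_id) in seen:
                continue  # skip items this user interacted with (or already sampled)
            rows.append({
                "user_id": p["user_id"],
                "item_id": item_id,
                "user_profile": p["user_profile"],
                "item_description": item_descriptions[item_id],
                "engagement_score": 0.0,
            })
            seen.add((p["user_id"], item_id))
            added += 1
    return rows


positives = [{"user_id": "u1", "item_id": "i1", "user_profile": "likes hiking",
              "item_description": "trail running shoes", "engagement_score": 1.0}]
catalog = {"i1": "trail running shoes", "i2": "espresso machine", "i3": "two-person tent"}
rows = add_random_negatives(positives, catalog, negatives_per_positive=2)
print(len(rows))  # 3: one positive plus two negatives (i2 and i3)
```

Uniform sampling is the simplest choice; popularity-weighted or hard negatives usually make the task harder and the embeddings sharper, at the cost of more careful sampling code.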

Step 3: Redis ANN Index Setup

Build and manage HNSW indexes in Redis for fast approximate nearest neighbor search.
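In FT.CREATE, the number after the algorithm name (HNSW) is the count of attribute tokens that follow it, names and values together: TYPE, DIM, DISTANCE_METRIC, EF_CONSTRUCTION and M give 10 tokens. Building that list programmatically keeps the count from drifting out of sync with the attributes. A minimal sketch; hnsw_schema_args is a hypothetical helper, and its attribute names are the ones used in the create_index method of the module that follows.

```python
from typing import List


def hnsw_schema_args(field: str, dim: int, metric: str = "COSINE",
                     ef_construction: int = 200, m: int = 16) -> List[str]:
    """Return the SCHEMA tokens for one HNSW vector field of an FT.CREATE command."""
    attrs = {
        "TYPE": "FLOAT32",
        "DIM": str(dim),
        "DISTANCE_METRIC": metric,
        "EF_CONSTRUCTION": str(ef_construction),
        "M": str(m),
    }
    flat = [token for pair in attrs.items() for token in pair]
    # The count must equal the number of attribute tokens (names + values)
    return [field, "VECTOR", "HNSW", str(len(flat)), *flat]


print(hnsw_schema_args("vector", 384))
# ['vector', 'VECTOR', 'HNSW', '10', 'TYPE', 'FLOAT32', 'DIM', '384', 'DISTANCE_METRIC', 'COSINE', 'EF_CONSTRUCTION', '200', 'M', '16']
```

The returned tokens can be spliced into the execute_command call right after "SCHEMA".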

# src/storage/redis_index.py
import redis.asyncio as redis
import numpy as np
from typing import List, Tuple, Optional
import json
import logging

logger = logging.getLogger(__name__)


class RedisVectorIndex:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url, decode_responses=False)

    async def create_index(
        self, index_name: str, dimension: int = 384,
        ef_construction: int = 200, m: int = 16
    ):
        try:
            await self.redis.execute_command(
                "FT.CREATE", index_name,
                "ON", "HASH",
                "PREFIX", "1", f"{index_name}:",
                "SCHEMA",
                # The count after HNSW is the number of attribute tokens that follow:
                # 5 name/value pairs = 10 tokens
                "vector", "VECTOR", "HNSW", "10",
                "TYPE", "FLOAT32",
                "DIM", str(dimension),
                "DISTANCE_METRIC", "COSINE",
                "EF_CONSTRUCTION", str(ef_construction),
                "M", str(m),
            )
            logger.info(f"Created index {index_name} with dim={dimension}")
        except redis.ResponseError as e:
            if "Index already exists" not in str(e):
                raise

    async def upsert_embeddings(
        self, index_name: str,
        ids: List[str], embeddings: np.ndarray,
        metadata: Optional[List[dict]] = None
    ):
        pipe = self.redis.pipeline()
        for i, (item_id, embedding) in enumerate(zip(ids, embeddings)):
            key = f"{index_name}:{item_id}"
            data = {"vector": embedding.astype(np.float32).tobytes()}
            if metadata and i < len(metadata):
                data["meta"] = json.dumps(metadata[i])
            pipe.hset(key, mapping=data)
        await pipe.execute()
        logger.info(f"Upserted {len(ids)} embeddings to {index_name}")

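    async def search(
        self, index_name: str, query_vector: np.ndarray,
        top_k: int = 10,
    ) -> List[Tuple[str, float]]:
        # Returns (item_id, cosine_distance) pairs; lower distance = more similar
        query = f"*=>[KNN {top_k} @vector $query_vec AS score]"
        result = await self.redis.execute_command(
            "FT.SEARCH", index_name, query,
            "PARAMS", "2", "query_vec", query_vector.astype(np.float32).tobytes(),
            "SORTBY", "score", "ASC",
            "RETURN", "1", "score",    # do not send the raw vector back
            "LIMIT", "0", str(top_k),  # FT.SEARCH returns only 10 results by default
            "DIALECT", "2",
        )
        # RESP2 reply: [total, key1, [field, value, ...], key2, [...], ...]
        prefix_len = len(index_name) + 1
        matches = []
        for i in range(1, len(result), 2):
            key = result[i].decode()[prefix_len:]  # keeps any ':' inside item IDs
            fields = result[i + 1]
            score = float(fields[fields.index(b"score") + 1])
            matches.append((key, score))
        return matches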
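With RESP2 (redis-py's default protocol), FT.SEARCH replies with a flat list: the total hit count, then alternating document keys and field/value lists. With DISTANCE_METRIC COSINE, the score is a distance equal to 1 - cosine similarity, which is why results are sorted ascending. Since the reply shape is easy to get wrong, here is a standalone sketch that parses it and converts distances back to similarities, so it can be unit-tested without a Redis server; parse_knn_reply is a hypothetical helper.

```python
from typing import List, Tuple


def parse_knn_reply(reply: list, prefix: str) -> List[Tuple[str, float, float]]:
    """Turn a RESP2 FT.SEARCH reply into (item_id, cosine_distance, cosine_similarity)."""
    matches = []
    # reply[0] is the total hit count; keys and field lists alternate after it
    for i in range(1, len(reply), 2):
        key = reply[i].decode()
        fields = reply[i + 1]
        distance = float(fields[fields.index(b"score") + 1].decode())
        item_id = key[len(prefix) + 1:]  # strip "<prefix>:" but keep ':' inside IDs
        matches.append((item_id, distance, 1.0 - distance))
    return matches


reply = [2, b"items:42", [b"score", b"0.25"], b"items:sku:7", [b"score", b"0.5"]]
print(parse_knn_reply(reply, "items"))
# [('42', 0.25, 0.75), ('sku:7', 0.5, 0.5)]
```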

Step 4: Real-Time Serving API

Build a high-performance FastAPI service for real-time recommendation serving.

# src/api/main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from typing import List, Optional
from contextlib import asynccontextmanager
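import os
import numpy as np
import time

from src.storage.redis_index import RedisVectorIndex
from src.embeddings.train_embeddings import EmbeddingTrainer
from src.features.user_features import UserFeatureStore

# Connection settings come from the environment (set in docker-compose.yml)
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379")
MODEL_PATH = os.getenv("MODEL_PATH", "./trained_embeddings")


@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.redis = RedisVectorIndex(REDIS_URL)
    app.state.embedder = EmbeddingTrainer(MODEL_PATH)
    app.state.features = UserFeatureStore(REDIS_URL)
    await app.state.redis.create_index("items", dimension=384)
    yield
    await app.state.redis.redis.close()
    await app.state.features.redis.close()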


app = FastAPI(title="Rec System API", version="1.0.0", lifespan=lifespan)


class RecRequest(BaseModel):
    user_id: str
    context: Optional[dict] = None
    num_results: int = Field(10, ge=1, le=100)
    exclude_items: Optional[List[str]] = None


class RecItem(BaseModel):
    item_id: str
    score: float
    metadata: Optional[dict] = None


class RecResponse(BaseModel):
    recommendations: List[RecItem]
    latency_ms: float
    model_version: str


@app.post("/api/v1/recommend", response_model=RecResponse)
async def recommend(request: RecRequest):
    start = time.perf_counter()  # monotonic clock for latency measurement

    # Get user embedding from feature store
    user_embedding = await app.state.features.get_user_embedding(
        request.user_id, request.context
    )

    # ANN search in Redis; over-fetch 2x so exclusions still leave enough items
    results = await app.state.redis.search(
        "items", user_embedding, top_k=request.num_results * 2
    )

    # Filter excluded items (set lookup instead of scanning a list per result)
    if request.exclude_items:
        excluded = set(request.exclude_items)
        results = [r for r in results if r[0] not in excluded]

    # Take top_k results
    results = results[:request.num_results]

    latency = (time.perf_counter() - start) * 1000

    return RecResponse(
        recommendations=[RecItem(item_id=r[0], score=r[1]) for r in results],
        latency_ms=latency,
        model_version="v1.2.0",
    )
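The endpoint asks Redis for num_results * 2 candidates so that excluded items can be dropped without returning a short list. That factor of 2 is a heuristic: if more than half of the candidates are excluded, the response still comes back short. A minimal sketch of the filter step that also reports the shortfall, so a caller could retry with a larger top_k; filter_candidates is a hypothetical helper.

```python
from typing import Iterable, List, Tuple


def filter_candidates(
    candidates: List[Tuple[str, float]],
    exclude: Iterable[str],
    num_results: int,
) -> Tuple[List[Tuple[str, float]], int]:
    """Drop excluded items, keep the best num_results, and report how many are missing."""
    excluded = set(exclude)  # O(1) membership checks
    kept = [c for c in candidates if c[0] not in excluded][:num_results]
    return kept, num_results - len(kept)


candidates = [("a", 0.1), ("b", 0.2), ("c", 0.3), ("d", 0.4)]  # over-fetched 2x for num_results=2
print(filter_candidates(candidates, ["a", "c", "d"], num_results=2))
# ([('b', 0.2)], 1)
```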

Step 5: User Feature Store

Build a real-time feature store for computing and caching user embeddings from recent interactions.

# src/features/user_features.py
import redis.asyncio as redis
import numpy as np
from typing import Optional, Dict, List
import json
from datetime import datetime, timedelta


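class UserFeatureStore:
    def __init__(
        self, redis_url: str = "redis://localhost:6379",
        item_prefix: str = "items", dimension: int = 384,
    ):
        # Responses stay as raw bytes: embeddings are stored as float32 buffers
        self.redis = redis.from_url(redis_url, decode_responses=False)
        self.item_prefix = item_prefix  # key prefix of the item vector index
        self.dimension = dimension
        self.TTL = timedelta(days=30)

    async def get_user_embedding(
        self, user_id: str, context: Optional[Dict] = None
    ) -> np.ndarray:
        # Try cache first
        cached = await self.redis.get(f"user_emb:{user_id}")
        if cached:
            return np.frombuffer(cached, dtype=np.float32)

        # Compute from recent interactions
        interactions = await self.get_recent_interactions(user_id)
        if not interactions:
            # Cold start placeholder: a random query vector returns arbitrary items
            return np.random.randn(self.dimension).astype(np.float32)

        # Fetch each interacted item's embedding from the item index hashes
        pipe = self.redis.pipeline()
        for inter in interactions:
            pipe.hget(f"{self.item_prefix}:{inter['item_id']}", "vector")
        vectors = await pipe.execute()
        pairs = [
            (np.frombuffer(vec, dtype=np.float32), inter["weight"])
            for vec, inter in zip(vectors, interactions)
            if vec is not None  # item may have been removed from the index
        ]
        if not pairs:
            return np.random.randn(self.dimension).astype(np.float32)

        # Weighted average of item embeddings (np.average normalizes the weights)
        embeddings = np.stack([emb for emb, _ in pairs])
        weights = np.array([w for _, w in pairs], dtype=np.float32)
        user_emb = np.average(embeddings, axis=0, weights=weights).astype(np.float32)

        # Cache result
        await self.redis.setex(
            f"user_emb:{user_id}",
            int(self.TTL.total_seconds()),
            user_emb.tobytes(),
        )
        return user_emb

    async def get_recent_interactions(
        self, user_id: str, limit: int = 50
    ) -> List[Dict]:
        # Entries are JSON documents written by record_interaction
        raw = await self.redis.lrange(f"user_interactions:{user_id}", 0, limit - 1)
        return [json.loads(entry) for entry in raw]

    async def record_interaction(
        self, user_id: str, item_id: str,
        interaction_type: str, weight: float
    ):
        entry = json.dumps({"item_id": item_id, "type": interaction_type, "weight": weight})
        await self.redis.lpush(f"user_interactions:{user_id}", entry)
        await self.redis.ltrim(f"user_interactions:{user_id}", 0, 999)
        # Invalidate cached embedding
        await self.redis.delete(f"user_emb:{user_id}")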
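record_interaction takes a free-form weight, and the user embedding is the weight-normalized average of the embeddings of recently touched items. A NumPy sketch of that computation with a hypothetical mapping from interaction type to weight; the INTERACTION_WEIGHTS values are illustrative, not taken from the project.

```python
import numpy as np

# Hypothetical interaction weights; tune them to your own engagement signals
INTERACTION_WEIGHTS = {"view": 1.0, "add_to_cart": 3.0, "purchase": 5.0}


def user_embedding(item_embeddings: np.ndarray, interaction_types: list) -> np.ndarray:
    """Weighted average of item embeddings, one row per interaction."""
    weights = np.array([INTERACTION_WEIGHTS[t] for t in interaction_types])
    return np.average(item_embeddings, axis=0, weights=weights).astype(np.float32)


items = np.array([[1.0, 0.0], [0.0, 1.0]], dtype=np.float32)
emb = user_embedding(items, ["view", "purchase"])
print(emb)  # pulled toward the purchased item: roughly [1/6, 5/6]
```

Because the index uses cosine distance, the overall scale of this vector does not change the ranking, so no extra normalization step is needed before the ANN query.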

Step 6: Docker Deployment

# docker-compose.yml
version: "3.8"
services:
  rec-api:
    build: .
    expose:
      - "8000"  # internal only: 3 replicas cannot share one host port; nginx fronts them
    environment:
      - REDIS_URL=redis://redis:6379
      - MODEL_PATH=/models/embeddings
    depends_on:
      - redis
    deploy:
      replicas: 3

  redis:
    image: redis/redis-stack-server:latest
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf

volumes:
  redis-data:

ℹ️

Pre-compute item embeddings in batch and upsert them to Redis. Running the encoder at request time would add model-inference latency to every request. Use a background job to refresh embeddings periodically.

πŸ’‘

For cold-start users, combine content-based features (demographics, device info) with popularity-based fallbacks until enough interaction data is collected.
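A minimal sketch of the popularity fallback, assuming the interaction log is available as (user_id, item_id) pairs; both helpers and the blending constant k are hypothetical. The blend function shows one simple way to shift from popularity to personalized results as history accumulates: a user with n interactions gets a share n / (n + k) of personalized slots.

```python
from collections import Counter
from typing import List, Tuple


def popular_items(interactions: List[Tuple[str, str]], top_n: int) -> List[str]:
    """Most frequently interacted-with items across all users."""
    counts = Counter(item_id for _, item_id in interactions)
    return [item_id for item_id, _ in counts.most_common(top_n)]


def blend(personalized: List[str], popular: List[str], n_interactions: int,
          num_results: int, k: int = 10) -> List[str]:
    """Fill num_results slots: a share n/(n+k) from personalized, the rest from popular."""
    n_personal = round(num_results * n_interactions / (n_interactions + k))
    result = personalized[:n_personal]
    for item_id in popular + personalized[n_personal:]:
        if len(result) >= num_results:
            break
        if item_id not in result:  # avoid duplicates across the two sources
            result.append(item_id)
    return result


log = [("u1", "a"), ("u2", "a"), ("u3", "b"), ("u1", "c"), ("u2", "b"), ("u3", "a")]
popular = popular_items(log, 2)
print(popular)                                                   # ['a', 'b']
print(blend(["x", "y", "z"], popular, n_interactions=0, num_results=3))   # ['a', 'b', 'x']
print(blend(["x", "y", "z"], popular, n_interactions=10, num_results=4))  # ['x', 'y', 'a', 'b']
```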

Performance Metrics

| Metric | Target | Description |
| --- | --- | --- |
| ANN Latency | < 1 ms | Redis HNSW search |
| End-to-End Latency | < 10 ms | Full pipeline |
| Throughput | > 10K QPS | Per API instance |
| Recall@10 | > 0.95 | vs. exact NN search |
| NDCG@10 | > 0.80 | Ranking quality |
| Cache Hit Rate | > 90% | User embedding cache |
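Recall@10 in this table compares the ANN result with exact search: the fraction of the true 10 nearest neighbors (by brute-force cosine similarity) that the HNSW index also returns. NDCG@10 instead scores ranking quality against graded relevance labels. A NumPy sketch of both; the function names are hypothetical, and this NDCG variant uses linear gains rel / log2(rank + 1) (another common choice is 2^rel - 1).

```python
from typing import Dict, List

import numpy as np


def exact_top_k(query: np.ndarray, items: np.ndarray, k: int) -> np.ndarray:
    """Brute-force cosine top-k: indices of the k rows of `items` most similar to `query`."""
    sims = items @ query / (np.linalg.norm(items, axis=1) * np.linalg.norm(query))
    return np.argsort(-sims)[:k]


def recall_at_k(ann_ids: List, exact_ids: List, k: int) -> float:
    """Fraction of the exact top-k neighbors that the ANN result also contains."""
    return len(set(ann_ids[:k]) & set(exact_ids[:k])) / k


def ndcg_at_k(ranked_ids: List, relevance: Dict, k: int) -> float:
    """NDCG with linear gains: DCG = sum(rel_i / log2(i + 1)) over ranks i = 1..k."""
    gains = [relevance.get(item, 0.0) for item in ranked_ids[:k]]
    dcg = sum(g / np.log2(i + 2) for i, g in enumerate(gains))
    ideal = sorted(relevance.values(), reverse=True)[:k]
    idcg = sum(g / np.log2(i + 2) for i, g in enumerate(ideal))
    return float(dcg / idcg) if idcg > 0 else 0.0


items = np.array([[1.0, 0.0], [0.9, 0.1], [0.0, 1.0], [-1.0, 0.0]])
exact = list(exact_top_k(np.array([1.0, 0.0]), items, k=2))  # rows 0 and 1
print(recall_at_k([0, 2], exact, k=2))  # 0.5: the ANN result missed row 1
```

In practice the exact neighbors are computed offline for a sample of real user query vectors, and Recall@10 is averaged over that sample.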

Interview Talking Points

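  1. ANN vs Exact NN: HNSW search cost grows sub-linearly with catalog size, so on large catalogs it can be orders of magnitude (e.g., 1000x) faster than brute-force search with minimal recall loss (typically 95%+ recall).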
  2. Two-Tower Architecture: Separate user and item towers enable pre-computing item embeddings and real-time user embedding computation.
  3. Cold Start Strategy: Content-based features plus popularity fallback for new users, transitioning to collaborative filtering as data accumulates.
  4. Feature Store Design: Redis provides sub-millisecond reads for real-time features while maintaining high throughput.
  5. A/B Testing: Infrastructure for comparing model versions and ranking strategies with statistical significance.
  6. Scaling: Horizontal scaling through Redis clustering and API replicas behind a load balancer.
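For the A/B testing point, the core mechanism is deterministic assignment: hashing the user ID together with an experiment name puts each user in a stable bucket, so the same user sees the same variant on every request, and salting with the experiment name keeps assignments independent across experiments. Hosted tools such as Optimizely or LaunchDarkly provide their own assignment; assign_variant below is a hypothetical standalone sketch using SHA-256.

```python
import hashlib
from typing import Dict


def assign_variant(user_id: str, experiment: str, split: Dict[str, float]) -> str:
    """Deterministically map a user to a variant; split maps variant -> traffic share."""
    if not split:
        raise ValueError("split must contain at least one variant")
    # Salt with the experiment name so different experiments bucket independently
    digest = hashlib.sha256(f"{experiment}:{user_id}".encode()).digest()
    point = int.from_bytes(digest[:8], "big") / 2**64  # roughly uniform point in [0, 1]
    cumulative = 0.0
    for variant, share in split.items():
        cumulative += share
        if point < cumulative:
            return variant
    return variant  # point at the top edge or shares summing below 1.0: use the last variant


split = {"control": 0.5, "treatment": 0.5}
print(assign_variant("user-123", "ranker-v2", split))  # same answer on every call
```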

⚠️

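Monitor ANN index quality over time. As the index grows, HNSW parameters (M, EF_CONSTRUCTION, and the query-time EF_RUNTIME) may need retuning to keep Recall@10 on target; measure recall periodically against exact search on a sample of queries.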

ℹ️

This project demonstrates a complete real-time recommendation pipeline. For production deployment, add model retraining automation and drift detection.
