Building a Vector Search Engine
FAISS + HNSW + FastAPI | Semantic Search at Scale
Project Overview
Problem Statement
Traditional keyword search fails for semantic queries. Users searching for "affordable running shoes for flat feet" need results that understand meaning, not just keyword matching. Vector search engines encode semantic meaning into dense vectors and find similar items using approximate nearest neighbor algorithms.
Objectives
- Build a vector search engine supporting text, image, and multimodal queries
- Implement FAISS with HNSW indexing for low-latency (single-digit millisecond) search
- Support hybrid search (semantic + keyword)
- Scale to 100M+ vectors with horizontal sharding
- Create a production API with filtering and ranking
| Component | Technology |
|---|---|
| Vector Index | FAISS + HNSW |
| Embeddings | OpenAI / Sentence Transformers |
| API Framework | FastAPI |
| Storage | Redis + PostgreSQL |
| Sharding | FAISS Shards + Load Balancer |
| Caching | Redis |
| Monitoring | Prometheus + Grafana |
Architecture Diagram
+-------------------------------------------------------------------+
| Vector Search Engine Architecture |
+-------------------------------------------------------------------+
| +--------------+ +--------------+ +------------------+ |
| | Query Input |--->| Embedding |--->| FAISS Index | |
| | (Text/Image) | | Service | | (HNSW) | |
| +--------------+ +--------------+ +--------+---------+ |
| | |
| v |
| +--------------+ +--------------+ +------------------+ |
| | Results |<---| Re-ranking |<---| Candidate | |
| | Response | | (Cross-Enc) | | Filtering | |
| +--------------+ +--------------+ +------------------+ |
| | | |
| v v |
| +--------------+ +--------------+ +------------------+ |
| | Cache | | Hybrid | | Metadata | |
| | (Redis) | | Search | | Store (PG) | |
| +--------------+ +--------------+ +------------------+ |
+-------------------------------------------------------------------+
Step-by-Step Implementation
Step 1: Environment Setup
# Create the project directory and install every package the code below imports.
mkdir vector-search && cd vector-search
pip install faiss-cpu # or faiss-gpu for GPU support
pip install sentence-transformers openai
pip install fastapi uvicorn redis
pip install numpy psycopg2-binary
pip install prometheus-client
pip install rank-bm25 jieba # imported by src/search/hybrid_search.py
Step 2: FAISS Index Builder
Build and manage FAISS indexes with HNSW for efficient approximate nearest neighbor search.
# src/index/faiss_index.py
import faiss
import numpy as np
import pickle
import os
from typing import List, Tuple, Optional, Dict
from dataclasses import dataclass
import logging
logger = logging.getLogger(__name__)
@dataclass
class IndexConfig:
    """Settings that select and tune the FAISS index to build."""

    # Vector dimensionality.
    dimension: int = 384
    # Which index family to construct: "flat", "ivf" or "hnsw".
    index_type: str = "hnsw"
    # HNSW only: the M argument handed to the HNSW index constructor.
    hnsw_m: int = 32
    # HNSW only: value assigned to the index's efConstruction setting.
    hnsw_ef_construction: int = 200
    # HNSW only: value assigned to the index's efSearch setting.
    hnsw_ef_search: int = 100
    # IVF only: value assigned to the index's nprobe setting.
    nprobe: int = 10
class FAISSIndexBuilder:
    """Build, query, extend and persist a FAISS index keyed by external IDs.

    Every vector is L2-normalised before it is added or used as a query, and
    every index type is created with the inner-product metric, so returned
    scores are cosine similarities (higher is better).

    FAISS addresses vectors by insertion position; ``id_map`` and
    ``reverse_map`` translate between those positions and the caller-supplied
    string IDs.
    """

    def __init__(self, config: IndexConfig = None):
        """Create an empty builder.

        Args:
            config: Index settings; a default ``IndexConfig`` is used when omitted.
        """
        self.config = config or IndexConfig()
        self.index = None
        self.id_map = {}  # internal_id -> external_id
        self.reverse_map = {}  # external_id -> internal_id
        self.next_id = 0

    @staticmethod
    def _prepare(vectors: np.ndarray) -> np.ndarray:
        """Return an L2-normalised, C-contiguous float32 2-D copy of ``vectors``.

        ``faiss.normalize_L2`` works in place, so it must be applied to the
        very array that is later handed to the index. Normalising the
        temporary returned by ``astype`` has no effect on what gets indexed.
        The caller's array is never modified.
        """
        prepared = np.array(vectors, dtype=np.float32, order="C", copy=True)
        if prepared.ndim == 1:
            prepared = prepared.reshape(1, -1)
        if prepared.ndim != 2:
            raise ValueError(
                f"Expected a 2-D array of vectors, got shape {prepared.shape}"
            )
        faiss.normalize_L2(prepared)
        return prepared

    def _require_index(self):
        """Return the current index or raise if none has been built or loaded."""
        if self.index is None:
            raise RuntimeError(
                "Index has not been built or loaded; call build_index() or load() first"
            )
        return self.index

    def build_index(self, embeddings: np.ndarray, ids: List[str]):
        """Build index from scratch with pre-computed embeddings.

        Any previously built index and ID mappings are replaced.

        Args:
            embeddings: Array of shape (n_vectors, dim).
            ids: External IDs, one per row of ``embeddings``.

        Raises:
            ValueError: If the number of IDs differs from the number of
                vectors, or ``config.index_type`` is not supported.
        """
        vectors = self._prepare(embeddings)
        n_vectors, dim = vectors.shape
        if n_vectors != len(ids):
            raise ValueError(
                f"Got {n_vectors} embeddings but {len(ids)} ids; they must match"
            )

        index_type = self.config.index_type
        if index_type == "flat":
            index = faiss.IndexFlatIP(dim)  # Inner product for cosine similarity
        elif index_type == "hnsw":
            # The metric must be passed explicitly; HNSW defaults to L2 distance.
            index = faiss.IndexHNSWFlat(
                dim, self.config.hnsw_m, faiss.METRIC_INNER_PRODUCT
            )
            index.hnsw.efConstruction = self.config.hnsw_ef_construction
            index.hnsw.efSearch = self.config.hnsw_ef_search
        elif index_type == "ivf":
            quantizer = faiss.IndexFlatIP(dim)
            # At least one cell is required, even for fewer than 10 vectors.
            nlist = max(1, n_vectors // 10)
            index = faiss.IndexIVFFlat(
                quantizer, dim, nlist, faiss.METRIC_INNER_PRODUCT
            )
            index.nprobe = min(self.config.nprobe, nlist)
            # Train on the normalised vectors, i.e. the same data that is indexed.
            index.train(vectors)
        else:
            raise ValueError(
                f"Unsupported index_type {index_type!r}; expected 'flat', 'ivf' or 'hnsw'"
            )

        index.add(vectors)

        # Commit state only after the index was built successfully.
        self.index = index
        self.id_map = dict(enumerate(ids))
        self.reverse_map = {ext_id: pos for pos, ext_id in enumerate(ids)}
        self.next_id = len(ids)
        logger.info(f"Built {self.config.index_type} index with {n_vectors} vectors, dim={dim}")

    def search(
        self, query: np.ndarray, top_k: int = 10,
        filter_ids: Optional[List[str]] = None
    ) -> List[Tuple[str, float]]:
        """Search for nearest neighbors.

        Args:
            query: A single query vector; it is reshaped to one row.
            top_k: Maximum number of hits to return.
            filter_ids: When non-empty, only these external IDs may be returned.
                Filtering is applied after the ANN lookup, which over-fetches
                3x, so fewer than ``top_k`` hits can come back.

        Returns:
            ``(external_id, score)`` pairs in the order FAISS returned them.

        Raises:
            RuntimeError: If no index has been built or loaded.
        """
        index = self._require_index()
        if top_k <= 0 or index.ntotal == 0:
            return []

        query_vec = self._prepare(query.reshape(1, -1))

        # A set gives O(1) membership tests while scanning candidates.
        allowed = set(filter_ids) if filter_ids else None
        # Search more if filtering
        search_k = top_k * 3 if allowed else top_k
        scores, indices = index.search(query_vec, min(search_k, index.ntotal))

        results = []
        for score, idx in zip(scores[0], indices[0]):
            if idx == -1:  # FAISS pads missing neighbours with -1
                continue
            ext_id = self.id_map.get(int(idx))
            if ext_id is None:
                continue
            if allowed is not None and ext_id not in allowed:
                continue
            results.append((ext_id, float(score)))
            if len(results) >= top_k:
                break
        return results

    def add_vectors(self, embeddings: np.ndarray, ids: List[str]):
        """Add new vectors to the index.

        Args:
            embeddings: Array of shape (n_vectors, dim).
            ids: External IDs, one per row of ``embeddings``.

        Raises:
            RuntimeError: If no index has been built or loaded.
            ValueError: If the number of IDs differs from the number of vectors.
        """
        index = self._require_index()
        vectors = self._prepare(embeddings)
        if len(vectors) != len(ids):
            raise ValueError(
                f"Got {len(vectors)} embeddings but {len(ids)} ids; they must match"
            )
        index.add(vectors)
        for ext_id in ids:
            self.id_map[self.next_id] = ext_id
            self.reverse_map[ext_id] = self.next_id
            self.next_id += 1

    def save(self, path: str):
        """Save index and mappings to disk.

        Writes ``index.faiss`` and ``mappings.pkl`` into the directory ``path``,
        creating the directory if needed.
        """
        os.makedirs(path, exist_ok=True)
        faiss.write_index(self._require_index(), os.path.join(path, "index.faiss"))
        with open(os.path.join(path, "mappings.pkl"), "wb") as f:
            pickle.dump({
                "id_map": self.id_map,
                "reverse_map": self.reverse_map,
                "next_id": self.next_id,
                "config": self.config,
            }, f)
        logger.info(f"Saved index to {path}")

    def load(self, path: str):
        """Load index and mappings from disk.

        Expects the ``index.faiss`` and ``mappings.pkl`` files written by
        ``save``.
        """
        self.index = faiss.read_index(os.path.join(path, "index.faiss"))
        with open(os.path.join(path, "mappings.pkl"), "rb") as f:
            # SECURITY: unpickling can execute arbitrary code. Only load
            # mapping files this service wrote itself, never untrusted uploads.
            data = pickle.load(f)
        self.id_map = data["id_map"]
        self.reverse_map = data["reverse_map"]
        self.next_id = data["next_id"]
        self.config = data["config"]
        logger.info(f"Loaded index from {path} with {self.index.ntotal} vectors")
Step 3: Embedding Service
Generate embeddings for documents and queries using various embedding models.
# src/embeddings/embedding_service.py
import numpy as np
from typing import List, Union
from sentence_transformers import SentenceTransformer
import openai
import hashlib
import json
class EmbeddingService:
    """Turn text into dense vectors with a local or hosted embedding model.

    Two backends are supported: ``"sentence-transformer"`` (a local
    ``SentenceTransformer`` model) and ``"openai"`` (the OpenAI embeddings API).
    """

    # Output sizes of the OpenAI embedding models this service knows about.
    _OPENAI_DIMENSIONS = {
        "text-embedding-3-small": 1536,
        "text-embedding-3-large": 3072,
        "text-embedding-ada-002": 1536,
    }
    _OPENAI_DEFAULT_MODEL = "text-embedding-3-small"
    # Number of inputs sent per OpenAI embeddings request.
    _OPENAI_BATCH_SIZE = 2048
    _SUPPORTED_TYPES = ("sentence-transformer", "openai")

    def __init__(self, model_type: str = "sentence-transformer", model_name: str = None):
        """Select the backend and, for local models, load the weights.

        Args:
            model_type: ``"sentence-transformer"`` or ``"openai"``.
            model_name: Model to use; defaults to ``"all-MiniLM-L6-v2"`` for
                sentence-transformers and ``"text-embedding-3-small"`` for OpenAI.

        Raises:
            ValueError: If ``model_type`` is not a supported backend.
        """
        self.model_type = model_type
        if model_type == "sentence-transformer":
            self.model = SentenceTransformer(model_name or "all-MiniLM-L6-v2")
            self.dimension = self.model.get_sentence_embedding_dimension()
        elif model_type == "openai":
            self.model = None
            self.openai_model = model_name or self._OPENAI_DEFAULT_MODEL
            # Falls back to 1536 for model names not listed above.
            self.dimension = self._OPENAI_DIMENSIONS.get(self.openai_model, 1536)
        else:
            # Fail fast instead of leaving `model` and `dimension` undefined.
            raise ValueError(
                f"Unsupported model_type {model_type!r}; "
                f"expected one of {self._SUPPORTED_TYPES}"
            )

    def embed_texts(
        self, texts: List[str], batch_size: int = 256,
        show_progress_bar: bool = True
    ) -> np.ndarray:
        """Embed a list of texts.

        Args:
            texts: Strings to embed.
            batch_size: Batch size for the sentence-transformers encoder
                (unused by the OpenAI backend, which sends 2048 inputs per request).
            show_progress_bar: Whether the sentence-transformers encoder
                prints a progress bar.

        Returns:
            One embedding row per input text. An empty input yields an
            array of shape ``(0, dimension)``.

        Raises:
            ValueError: If the configured ``model_type`` is not supported.
        """
        if self.model_type not in self._SUPPORTED_TYPES:
            raise ValueError(f"Unsupported model_type {self.model_type!r}")
        if not texts:
            # Keep a 2-D shape so callers can always index or stack the result.
            return np.empty((0, self.dimension), dtype=np.float32)

        if self.model_type == "sentence-transformer":
            return self.model.encode(
                texts, batch_size=batch_size, show_progress_bar=show_progress_bar
            )

        vectors = []
        step = self._OPENAI_BATCH_SIZE
        for start in range(0, len(texts), step):
            response = openai.embeddings.create(
                input=texts[start:start + step], model=self.openai_model
            )
            vectors.extend(item.embedding for item in response.data)
        # float32 matches the other backend and is what FAISS consumes.
        return np.asarray(vectors, dtype=np.float32)

    def embed_query(self, query: str) -> np.ndarray:
        """Embed a single query string and return its 1-D vector."""
        # No progress bar: this runs once per request on the serving path.
        return self.embed_texts([query], show_progress_bar=False)[0]

    def embed_image(self, image_path: str) -> np.ndarray:
        """Embed an image (not implemented).

        Raises:
            NotImplementedError: Always.
        """
        # Placeholder for CLIP-based image embedding
        # In production, use open_clip or CLIP model
        raise NotImplementedError("Image embedding not yet implemented")
Step 4: Hybrid Search Engine
Combine semantic vector search with keyword-based BM25 for optimal retrieval quality.
# src/search/hybrid_search.py
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass
import numpy as np
from rank_bm25 import BM25Okapi
import jieba
@dataclass
class SearchResult:
    """One ranked hit produced by the search engine."""

    # Identifier of the matched document.
    id: str
    # Ranking score assigned to the hit.
    score: float
    # Text content of the matched document.
    content: str
    # Metadata dictionary attached to the document.
    metadata: Dict
    # Retrieval mode that produced the hit: semantic, keyword, hybrid
    search_type: str
class HybridSearchEngine:
    """Fuse semantic (vector) and keyword (BM25) rankings with weighted RRF.

    ``vector_index`` must be assigned by the caller to an object exposing
    ``search(query_embedding, top_k=...) -> [(doc_id, score), ...]``. While it
    is ``None`` the engine falls back to keyword-only ranking instead of
    failing.
    """

    # Reciprocal Rank Fusion constant.
    RRF_K = 60

    def __init__(self, alpha: float = 0.7):
        """alpha: weight for semantic search (1-alpha for keyword)."""
        self.alpha = alpha
        self.vector_index = None
        self.bm25 = None
        self.documents = {}
        self.corpus_tokens = []
        # Document IDs in BM25 row order (row i of the corpus is doc_ids[i]).
        self.doc_ids = []

    @staticmethod
    def _tokenize(text: str) -> List[str]:
        """Split ``text`` into lower-cased tokens, dropping whitespace-only ones.

        jieba emits the separators between words as tokens too. Left in, a
        " " token would match every document and give all of them a
        non-zero BM25 score.
        """
        return [token.lower() for token in jieba.cut(text) if token.strip()]

    def index_documents(self, documents: List[Dict]):
        """Index documents for keyword search and store them for result lookup.

        Each document needs an ``"id"`` and a ``"content"`` key. When an ID
        occurs more than once, the last document wins. This does not touch
        ``vector_index``.
        """
        self.documents = {doc["id"]: doc for doc in documents}
        # Build the corpus from the de-duplicated mapping so BM25 row numbers
        # always line up with doc_ids.
        self.doc_ids = list(self.documents)
        self.corpus_tokens = [
            self._tokenize(self.documents[doc_id]["content"])
            for doc_id in self.doc_ids
        ]
        # BM25Okapi cannot be built from a corpus with no tokens at all.
        self.bm25 = BM25Okapi(self.corpus_tokens) if any(self.corpus_tokens) else None

    def _eligible(self, ranked, filter_fn) -> List[str]:
        """Return IDs from ``ranked`` that are known and pass ``filter_fn``, in order."""
        kept = []
        for doc_id, _score in ranked:
            doc = self.documents.get(doc_id)
            if doc is None:
                # The vector index may hold IDs this engine has no document for.
                continue
            if filter_fn is not None and not filter_fn(doc):
                continue
            kept.append(doc_id)
        return kept

    def search(
        self, query: str, query_embedding: np.ndarray,
        top_k: int = 10, filter_fn=None
    ) -> List[SearchResult]:
        """Hybrid search combining semantic and keyword results.

        Args:
            query: Raw query text, used for BM25.
            query_embedding: Query vector, used for the vector index.
            top_k: Maximum number of results to return.
            filter_fn: Optional predicate called with a document dict;
                documents for which it returns a falsy value are excluded
                before ranks are fused.

        Returns:
            Up to ``top_k`` results ordered by descending fused score.
        """
        if top_k <= 0:
            return []
        pool = top_k * 2  # candidates taken from each retriever

        # Semantic search
        semantic_results = []
        if self.vector_index is not None:
            semantic_results = self.vector_index.search(query_embedding, top_k=pool)

        # Keyword search
        keyword_results = []
        query_tokens = self._tokenize(query)
        if self.bm25 is not None and query_tokens:
            bm25_scores = self.bm25.get_scores(query_tokens)
            for idx in np.argsort(bm25_scores)[::-1][:pool]:
                if bm25_scores[idx] <= 0:
                    # Sorted descending: nothing from here on matched any
                    # query term, so it must not earn rank credit.
                    break
                keyword_results.append((self.doc_ids[idx], float(bm25_scores[idx])))

        # Merge results with Reciprocal Rank Fusion
        merged = {}
        weighted_lists = (
            (self.alpha, semantic_results),
            (1 - self.alpha, keyword_results),
        )
        for weight, ranked in weighted_lists:
            for rank, doc_id in enumerate(self._eligible(ranked, filter_fn), start=1):
                merged[doc_id] = merged.get(doc_id, 0) + weight / (self.RRF_K + rank)

        # Sort and return top_k
        sorted_results = sorted(merged.items(), key=lambda x: x[1], reverse=True)[:top_k]
        return [
            SearchResult(
                id=doc_id, score=score,
                content=self.documents[doc_id]["content"],
                metadata=self.documents[doc_id].get("metadata", {}),
                search_type="hybrid",
            )
            for doc_id, score in sorted_results
        ]
Step 5: FastAPI Serving Layer
# src/api/main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from typing import List, Optional, Dict
from contextlib import asynccontextmanager
import numpy as np
import time
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the shared embedder and search engine before the app serves requests.

    Both objects are stored on ``app.state``. Nothing is done after the
    ``yield``, so there is no shutdown work.
    """
    # Imported here, so these modules are loaded when the lifespan context is
    # entered rather than when this module is imported.
    from src.index.faiss_index import FAISSIndexBuilder
    from src.embeddings.embedding_service import EmbeddingService
    from src.search.hybrid_search import HybridSearchEngine

    state = app.state
    state.embedder = EmbeddingService("sentence-transformer")
    state.search_engine = HybridSearchEngine(alpha=0.7)
    # Load pre-built index...
    # NOTE(review): FAISSIndexBuilder is imported but never used, and nothing
    # here assigns search_engine.vector_index - confirm where the index is
    # meant to be loaded and attached.
    yield
# Application object; startup work is delegated to the `lifespan` context manager.
app = FastAPI(title="Vector Search API", version="1.0.0", lifespan=lifespan)
class SearchRequest(BaseModel):
    """Request body for ``POST /api/v1/search``."""

    # Query text, 1 to 1000 characters.
    query: str = Field(..., min_length=1, max_length=1000)
    # Number of results requested, 1 to 100.
    top_k: int = Field(10, ge=1, le=100)
    # Optional filter dictionary supplied by the client.
    filters: Optional[Dict] = None
    # One of "semantic", "keyword" or "hybrid".
    # Pydantic v2 (what an unpinned `pip install fastapi` pulls in) removed
    # Field(regex=...); the keyword is now `pattern`.
    search_mode: str = Field("hybrid", pattern="^(semantic|keyword|hybrid)$")
class SearchResponse(BaseModel):
    """Response body returned by ``POST /api/v1/search``."""

    # One dictionary per hit.
    results: List[Dict]
    # Number of entries in `results`.
    total_results: int
    # Time spent handling the request, in milliseconds.
    latency_ms: float
@app.post("/api/v1/search", response_model=SearchResponse)
async def search(request: SearchRequest):
    """Embed the query, run the search engine and return the ranked hits.

    Each hit carries its id, its score and the first 200 characters of its
    content. ``latency_ms`` covers embedding plus search.

    Raises:
        HTTPException: 503 when the engine has neither a vector index nor a
            keyword index to search.
    """
    engine = app.state.search_engine
    if engine.vector_index is None and engine.bm25 is None:
        # Nothing has been indexed or loaded yet; this is not a server fault.
        raise HTTPException(status_code=503, detail="Search index is not ready")

    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # wall-clock adjustments.
    start = time.perf_counter()
    # Generate query embedding
    # NOTE(review): embedding and search are synchronous calls made inside an
    # async handler, so they run on the event loop - confirm this is acceptable.
    query_emb = app.state.embedder.embed_query(request.query)
    # Search
    # NOTE(review): request.search_mode and request.filters are validated but
    # not passed to the engine.
    results = engine.search(
        query=request.query,
        query_embedding=query_emb,
        top_k=request.top_k,
    )
    latency = (time.perf_counter() - start) * 1000
    return SearchResponse(
        results=[{"id": r.id, "score": r.score, "content": r.content[:200]} for r in results],
        total_results=len(results),
        latency_ms=latency,
    )
@app.post("/api/v1/index")
async def index_documents(documents: List[Dict]):
    """Hand the posted documents to the search engine for indexing.

    Every document must contain an ``"id"`` and a ``"content"`` key.

    Returns:
        ``{"indexed": <number of documents received>}``.

    Raises:
        HTTPException: 422 when a document lacks a required field.
    """
    required = ("id", "content")
    for position, doc in enumerate(documents):
        missing = [key for key in required if key not in doc]
        if missing:
            # Reject bad input as a client error instead of letting the
            # engine fail with a KeyError.
            raise HTTPException(
                status_code=422,
                detail=(
                    f"Document at position {position} is missing required "
                    f"field(s): {', '.join(missing)}"
                ),
            )
    app.state.search_engine.index_documents(documents)
    return {"indexed": len(documents)}
ℹ️ Note:
Use IVF-PQ (Inverted File Product Quantization) for very large indexes (100M+ vectors). It provides 10-50x compression with minimal recall loss.
💡 Tip:
Cache frequently queried embeddings in Redis. Embedding computation is expensive, and many queries repeat across users.
Performance Metrics
| Metric | Target | Description |
|---|---|---|
| Search Latency | < 5ms | P99 latency |
| Recall@10 | > 0.95 | vs exact NN |
| Index Build Time | < 30min | 10M vectors |
| Memory Usage | < 2GB | Per 10M vectors (requires PQ compression; raw 384-dim float32 vectors alone take ~15GB) |
| Throughput | > 1K QPS | Per instance |
Interview Talking Points
- HNSW vs IVF: HNSW provides better recall with lower latency but uses more memory. IVF-PQ provides compression for large-scale deployment.
- Cosine vs L2: Use cosine similarity for text embeddings (magnitude-invariant). Use L2 for image embeddings where magnitude matters.
- Hybrid Search RRF: Reciprocal Rank Fusion combines results from different retrieval methods without requiring score normalization.
- Scaling: Horizontal sharding with consistent hashing distributes vectors across multiple FAISS indexes.
- Caching: Multi-level caching (query -> embedding -> results) significantly reduces latency for repeated queries.
- Evaluation: Use standard IR metrics (NDCG, MAP, Recall@K) with human-annotated relevance judgments.
⚠️ Warning:
HNSW indexes consume significant memory. Monitor memory usage and implement index compression (PQ) or sharding for large-scale deployments.
ℹ️ Note:
For multimodal search (text + images), consider using CLIP embeddings that project both modalities into the same vector space.