What is RAGops?
RAGops (Retrieval-Augmented Generation Operations) is the practice of building, deploying, and maintaining production RAG systems. It encompasses the entire lifecycle from document ingestion to retrieval quality monitoring.
Document Ingestion
Source Connectors
from dataclasses import dataclass
from typing import Optional
import hashlib
@dataclass
class Document:
    """A unit of text moving through the ingestion pipeline.

    ``content_hash`` is derived from ``content`` once, at construction time,
    unless the caller supplies a non-empty value. It is not recomputed if
    ``content`` is reassigned afterwards.
    """

    id: str  # identifier supplied by whoever builds the document
    content: str  # text payload
    metadata: dict  # free-form attributes describing the document
    source: str  # label for where the document came from (e.g. "pdf")
    content_hash: str = ""  # hex MD5 of ``content``; filled in when left empty

    def __post_init__(self):
        """Fill in ``content_hash`` when the caller did not provide one."""
        if not self.content_hash:
            # MD5 serves purely as a content fingerprint here. Declaring it
            # non-security lets the call succeed on FIPS-restricted Python
            # builds, where a plain hashlib.md5() raises ValueError. The
            # resulting digest is identical.
            digest = hashlib.md5(
                self.content.encode("utf-8"), usedforsecurity=False
            )
            self.content_hash = digest.hexdigest()
class DocumentIngestor:
    """Load documents through registered processors and clean their text."""

    def __init__(self, min_content_length: int = 50):
        """Create an ingestor with no processors registered.

        Args:
            min_content_length: A cleaned document is kept only when its
                content is strictly longer than this many characters.
        """
        self.processors = {}
        self.min_content_length = min_content_length

    def register_processor(self, source_type: str, processor):
        """Register ``processor`` as the loader for ``source_type``.

        The processor must expose ``load(source_path)``. Registering the same
        ``source_type`` again replaces the previous processor.
        """
        self.processors[source_type] = processor

    def ingest(self, source_path: str, source_type: str) -> "list[Document]":
        """Load ``source_path`` with the matching processor and clean the result.

        Args:
            source_path: Location handed to the processor's ``load`` method.
            source_type: Key under which the processor was registered.

        Returns:
            The cleaned documents that passed the length filter.

        Raises:
            ValueError: If no processor is registered for ``source_type``.
        """
        try:
            processor = self.processors[source_type]
        except KeyError:
            raise ValueError(f"No processor for {source_type}") from None
        return self._clean_and_normalize(processor.load(source_path))

    def _clean_and_normalize(self, docs: "list[Document]") -> "list[Document]":
        """Clean each document's text and drop documents that end up too short.

        Documents are modified in place; the returned list holds the same
        objects that were passed in.
        """
        # NOTE(review): ``content`` is rewritten here, so a ``content_hash``
        # computed when the document was constructed no longer matches the
        # cleaned text. Confirm whether the hash is meant to track the raw or
        # the cleaned content.
        cleaned = []
        for doc in docs:
            doc.content = self._clean_text(doc.content)
            if len(doc.content) > self.min_content_length:
                cleaned.append(doc)
        return cleaned

    def _clean_text(self, text: str) -> str:
        """Remove disallowed characters and normalize whitespace.

        Only word characters, whitespace and basic punctuation
        (``.,;:!?()-'"``) are kept. Whitespace runs are collapsed to a single
        space and the result is stripped.
        """
        import re

        # Strip disallowed characters first: deleting a symbol that sat
        # between two spaces would otherwise leave a double space behind.
        text = re.sub(r'[^\w\s.,;:!?()\-\'"]+', '', text)
        text = re.sub(r'\s+', ' ', text)
        return text.strip()
PDF Processing
import fitz # PyMuPDF
from pathlib import Path
class PDFProcessor:
    """Turn a PDF file into one ``Document`` per non-empty page."""

    def load(self, file_path: str) -> list[Document]:
        """Extract the text of every page that contains any.

        Args:
            file_path: Path of the PDF to open.

        Returns:
            One ``Document`` per page whose extracted text is not blank, in
            page order. Each document's id is ``<file stem>_p<index>`` using
            the zero-based page index, while ``metadata["page"]`` is the
            one-based page number.
        """
        pdf = fitz.open(file_path)
        try:
            # Both values are constant for the whole file; compute them once.
            stem = Path(file_path).stem
            total_pages = len(pdf)

            documents = []
            for page_num, page in enumerate(pdf):
                text = page.get_text("text")
                if not text.strip():
                    continue  # nothing extractable on this page
                documents.append(Document(
                    id=f"{stem}_p{page_num}",
                    content=text,
                    metadata={
                        "source": file_path,
                        "page": page_num + 1,
                        "total_pages": total_pages,
                        "type": "pdf",
                    },
                    source="pdf",
                ))
            return documents
        finally:
            # Release the file handle even when extraction fails part-way.
            pdf.close()
Chunking Strategies
Chunking determines how documents are split into retrieval units. The choice of strategy significantly impacts retrieval quality.
| Strategy | Chunk Size | Overlap | Best For |
|---|---|---|---|
| Fixed-size | 512 tokens | 50 tokens | General purpose |
| Sentence-based | Variable | None | Semantic coherence |
| Recursive | Variable | None | Structured documents |
| Semantic | Variable | None | Topic segmentation |
| Document-level | Full doc | N/A | Short documents |
Fixed-Size Chunking
from transformers import AutoTokenizer
class FixedSizeChunker:
    """Split a document into overlapping windows of a fixed token count."""

    def __init__(
        self,
        chunk_size: int = 512,
        overlap: int = 50,
        tokenizer_name: str = "bert-base-uncased",
    ):
        """Configure the window geometry and load the tokenizer.

        Args:
            chunk_size: Maximum number of tokens per chunk; must be positive.
            overlap: Number of tokens shared by consecutive chunks; must be
                non-negative and smaller than ``chunk_size``.
            tokenizer_name: Name passed to ``AutoTokenizer.from_pretrained``.

        Raises:
            ValueError: If the window geometry is invalid.
        """
        # Validate before the (expensive) tokenizer load. A stride of zero
        # would make range() raise later; a negative stride would silently
        # yield no chunks; a negative overlap would skip tokens entirely.
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        if not 0 <= overlap < chunk_size:
            raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
        self.chunk_size = chunk_size
        self.overlap = overlap
        self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)

    def chunk(self, document: "Document") -> "list[Document]":
        """Return the token windows of ``document`` as new documents.

        Each chunk's id is ``<document id>_c<token offset>``. Its metadata
        extends the parent's with ``chunk_index`` (position in the returned
        list), ``chunk_size`` (tokens in this window) and ``parent_id``.
        Chunk text is the tokenizer's decoding of the window, so it may not
        match the source text character for character.
        """
        tokens = self.tokenizer.encode(document.content, add_special_tokens=False)
        chunks = []
        for index, start in enumerate(self._window_starts(len(tokens))):
            window = tokens[start:start + self.chunk_size]
            chunks.append(Document(
                id=f"{document.id}_c{start}",
                content=self.tokenizer.decode(window),
                metadata={
                    **document.metadata,
                    "chunk_index": index,
                    "chunk_size": len(window),
                    "parent_id": document.id,
                },
                source=document.source,
            ))
        return chunks

    def _window_starts(self, token_count: int) -> list[int]:
        """Return the start offset of every window needed to cover the tokens."""
        stride = self.chunk_size - self.overlap
        starts = []
        for start in range(0, token_count, stride):
            starts.append(start)
            # Once a window reaches the end, any further window would only
            # repeat tokens already contained in this one.
            if start + self.chunk_size >= token_count:
                break
        return starts
Recursive Character Splitting
class RecursiveChunker:
    """Split a document on progressively finer separators.

    Text is first split on the coarsest separator. Any piece that is still
    longer than ``chunk_size`` is split again on the next separator, and so
    on; a piece that no separator can reduce is cut into fixed-width windows.
    """

    def __init__(self, chunk_size: int = 1000, overlap: int = 200):
        """Configure the chunker.

        Args:
            chunk_size: Maximum chunk length in characters; must be positive.
            overlap: Stored on the instance, but not applied by the current
                splitting logic (chunks do not overlap).

        Raises:
            ValueError: If ``chunk_size`` is not positive.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        self.chunk_size = chunk_size
        self.overlap = overlap
        # Ordered from coarsest (paragraph) to finest (word) boundary.
        self.separators = ["\n\n", "\n", ". ", " "]

    def chunk(self, document: "Document") -> "list[Document]":
        """Split ``document`` using the instance's separator list."""
        return self._recursive_split(document, self.separators)

    def _recursive_split(
        self, doc: "Document", separators: "list[str]"
    ) -> "list[Document]":
        """Split ``doc`` into chunk documents no longer than ``chunk_size``.

        A document that already fits is returned unchanged as the only
        element. Otherwise each chunk gets the id ``<doc id>_c<index>`` and
        the parent's metadata plus ``chunk_index``.
        """
        if len(doc.content) <= self.chunk_size:
            return [doc]
        # An empty separator list historically meant "split on spaces".
        pieces = self._split_text(doc.content, list(separators) or [" "])
        return [
            Document(
                id=f"{doc.id}_c{index}",
                content=piece,
                metadata={**doc.metadata, "chunk_index": index},
                source=doc.source,
            )
            for index, piece in enumerate(pieces)
        ]

    def _split_text(self, text: str, separators: "list[str]") -> "list[str]":
        """Return non-empty, stripped pieces of ``text`` that fit ``chunk_size``."""
        text = text.strip()
        if len(text) <= self.chunk_size:
            return [text] if text else []
        if not separators:
            # Nothing left to split on: fall back to fixed-width windows.
            windows = (
                text[start:start + self.chunk_size].strip()
                for start in range(0, len(text), self.chunk_size)
            )
            return [window for window in windows if window]

        separator, finer = separators[0], separators[1:]
        pieces = []
        current = ""
        for part in text.split(separator):
            if len(part) > self.chunk_size:
                # This part can never fit in a chunk as-is; flush what has
                # been accumulated and break the part down further.
                if current.strip():
                    pieces.append(current.strip())
                current = ""
                pieces.extend(self._split_text(part, finer))
                continue
            # Join with the separator *between* parts so no separator is
            # fabricated after the final part of a chunk.
            candidate = f"{current}{separator}{part}" if current else part
            if len(candidate) <= self.chunk_size:
                current = candidate
            else:
                if current.strip():
                    pieces.append(current.strip())
                current = part
        if current.strip():
            pieces.append(current.strip())
        return pieces
Embedding Pipeline
from sentence_transformers import SentenceTransformer
import numpy as np
class EmbeddingPipeline:
    """Encode text with a SentenceTransformer model."""

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        """Load ``model_name`` and record its embedding dimension."""
        self.model = SentenceTransformer(model_name)
        self.dimension = self.model.get_sentence_embedding_dimension()

    def embed(
        self,
        texts: list[str],
        batch_size: int = 64,
        *,
        show_progress_bar: bool = True,
    ) -> np.ndarray:
        """Encode ``texts`` with normalized embeddings.

        Args:
            texts: Strings to encode.
            batch_size: Batch size forwarded to the model's ``encode``.
            show_progress_bar: Forwarded to ``encode``; pass ``False`` to
                keep logs quiet in non-interactive runs.

        Returns:
            Whatever the model's ``encode`` returns for ``texts``.
        """
        return self.model.encode(
            texts,
            batch_size=batch_size,
            show_progress_bar=show_progress_bar,
            normalize_embeddings=True,  # For cosine similarity
        )

    def embed_documents(self, documents: "list[Document]") -> list[dict]:
        """Embed each document's content and pair it with the document's fields.

        Returns:
            One dict per document with keys ``id``, ``embedding`` (a plain
            list), ``metadata`` (the document's own dict, not a copy) and
            ``content``. An empty input yields an empty list without
            invoking the model.
        """
        if not documents:
            return []
        vectors = self.embed([doc.content for doc in documents])
        return [
            {
                "id": doc.id,
                "embedding": vector.tolist(),
                "metadata": doc.metadata,
                "content": doc.content,
            }
            for doc, vector in zip(documents, vectors)
        ]
Vector Store Indexing
class VectorIndexer:
    """Push embedded items into a vector store in fixed-size batches."""

    def __init__(self, vector_store):
        """Keep a reference to the store; it must expose ``upsert(vectors=...)``."""
        self.store = vector_store

    def index_batch(self, items: list[dict], batch_size: int = 100):
        """Upsert ``items`` into the store, ``batch_size`` records per call.

        Each item must carry the keys ``id``, ``embedding``, ``metadata`` and
        ``content``. The content is folded into a copy of the metadata under
        the ``content`` key; the item's own metadata dict is left untouched.
        """
        for offset in range(0, len(items), batch_size):
            payload = []
            for record in items[offset:offset + batch_size]:
                vector_id = record["id"]
                values = record["embedding"]
                # Copy before adding the content so the caller's dict is not mutated.
                merged = {**record["metadata"]}
                merged["content"] = record["content"]
                payload.append(
                    {"id": vector_id, "values": values, "metadata": merged}
                )
            self.store.upsert(vectors=payload)
Retrieval Quality Metrics
| Metric | Formula | Interpretation |
|---|---|---|
| Recall@K | Relevant retrieved / Total relevant | Coverage of relevant docs |
| Precision@K | Relevant retrieved / K | Precision of top results |
| MRR | Mean over queries of 1 / rank of first relevant result | Ranking quality |
| NDCG | DCG@K / IDCG@K (Normalized Discounted Cumulative Gain) | Position-aware ranking |
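$$\text{MRR} = \frac{1}{|Q|} \sum_{i=1}^{|Q|} \frac{1}{\text{rank}_i}$$

where $Q$ is the set of evaluation queries and $\text{rank}_i$ is the position of the first relevant result for the $i$-th query.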
Effective RAGops requires careful attention to each pipeline stage, as retrieval quality directly impacts downstream generation quality.