🎉 75% of content is free forever — Unlock Premium from $10/mo →
CW
Search courses…
💼 Services · ℹ️ About · ✉️ Contact · View Pricing Plans from $10

RAGops Fundamentals

RAGops · RAG Pipeline · 🟢 Free Lesson

Advertisement

What is RAGops?

RAGops (Retrieval-Augmented Generation Operations) is the practice of building, deploying, and maintaining production RAG systems. It encompasses the entire lifecycle from document ingestion to retrieval quality monitoring.

Document Ingestion

Source Connectors

from dataclasses import dataclass
from typing import Optional
import hashlib

@dataclass
class Document:
    """A piece of text moving through the pipeline, with its bookkeeping data.

    When ``content_hash`` is left at its empty default, it is filled in
    right after construction with the MD5 hex digest of ``content``
    (encoded via ``str.encode()``). A non-empty value passed by the caller
    is kept untouched.
    """

    id: str  # identifier supplied by whoever builds the document
    content: str  # text payload
    metadata: dict  # free-form data attached by the producer
    source: str  # origin label supplied by the producer
    content_hash: str = ""  # derived from ``content`` when left empty

    def __post_init__(self):
        """Derive ``content_hash`` from ``content`` unless one was given."""
        if self.content_hash:
            return
        encoded = self.content.encode()
        self.content_hash = hashlib.md5(encoded).hexdigest()

class DocumentIngestor:
    """Load documents through registered processors and clean their text.

    Processors are looked up by source type. Each processor must expose a
    ``load(source_path)`` method returning documents that have a writable
    ``content`` attribute.

    Args:
        min_content_length: Cleaned documents are kept only when their
            content is strictly longer than this many characters.
    """

    def __init__(self, min_content_length: int = 50):
        self.processors = {}
        self.min_content_length = min_content_length

    def register_processor(self, source_type: str, processor):
        """Register ``processor`` as the loader for ``source_type``.

        Registering the same ``source_type`` again replaces the earlier
        processor.
        """
        self.processors[source_type] = processor

    def ingest(self, source_path: str, source_type: str) -> list[Document]:
        """Load ``source_path`` with the matching processor and clean the result.

        Args:
            source_path: Passed unchanged to the processor's ``load``.
            source_type: Key under which the processor was registered.

        Returns:
            The loaded documents that survive cleaning and length filtering.

        Raises:
            ValueError: If no processor is registered for ``source_type``.
        """
        try:
            processor = self.processors[source_type]
        except KeyError:
            raise ValueError(f"No processor for {source_type}") from None
        return self._clean_and_normalize(processor.load(source_path))

    def _clean_and_normalize(self, docs: list[Document]) -> list[Document]:
        """Clean each document's text in place and drop the very short ones.

        NOTE(review): only ``content`` is rewritten here; any hash that was
        derived from the raw text is not refreshed - confirm that is
        intended.
        """
        cleaned = []
        for doc in docs:
            doc.content = self._clean_text(doc.content)
            if len(doc.content) > self.min_content_length:  # Skip very short docs
                cleaned.append(doc)
        return cleaned

    def _clean_text(self, text: str) -> str:
        """Strip unsupported symbols and normalize whitespace in ``text``.

        Everything except word characters, whitespace and the punctuation
        ``. , ; : ! ? ( ) - ' "`` is removed; whitespace runs become a
        single space and the ends are trimmed.
        """
        import re
        # Remove symbols first: deleting a symbol that sits between two
        # spaces would otherwise leave a double space behind.
        text = re.sub(r'[^\w\s.,;:!?()\-\'"]+', '', text)
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

PDF Processing

import fitz  # PyMuPDF
from pathlib import Path

class PDFProcessor:
    """Processor that turns each non-empty page of a PDF into a Document."""

    def load(self, file_path: str) -> list[Document]:
        """Extract the text of every page of the PDF at ``file_path``.

        Pages whose extracted text is empty or whitespace-only are skipped.
        Document ids are ``<file stem>_p<zero-based page index>``, while
        ``metadata["page"]`` holds the one-based page number.

        Args:
            file_path: Path of the PDF to open.

        Returns:
            One Document per page that yielded text, in page order.
        """
        documents = []
        # The context manager closes the PDF even when extraction raises,
        # so the file handle is not leaked on a failing page.
        with fitz.open(file_path) as pdf:
            stem = Path(file_path).stem
            total_pages = len(pdf)
            for page_num, page in enumerate(pdf):
                text = page.get_text("text")
                if not text.strip():
                    continue
                documents.append(Document(
                    id=f"{stem}_p{page_num}",
                    content=text,
                    metadata={
                        "source": file_path,
                        "page": page_num + 1,
                        "total_pages": total_pages,
                        "type": "pdf"
                    },
                    source="pdf"
                ))
        return documents

Chunking Strategies

Chunking determines how documents are split into retrieval units. The choice of strategy significantly impacts retrieval quality.

| Strategy | Chunk Size | Overlap | Best For |
| --- | --- | --- | --- |
| Fixed-size | 512 tokens | 50 tokens | General purpose |
| Sentence-based | Variable | None | Semantic coherence |
| Recursive | Variable | None | Structured documents |
| Semantic | Variable | None | Topic segmentation |
| Document-level | Full doc | N/A | Short documents |

Fixed-Size Chunking

from transformers import AutoTokenizer

class FixedSizeChunker:
    """Split a document into overlapping windows of a fixed token count.

    Args:
        chunk_size: Maximum number of tokens per chunk.
        overlap: Number of tokens shared between consecutive chunks.
        tokenizer_name: Name passed to ``AutoTokenizer.from_pretrained``.

    Raises:
        ValueError: If ``chunk_size`` is not positive or ``overlap`` is not
            in ``[0, chunk_size)``.
    """

    def __init__(
        self,
        chunk_size: int = 512,
        overlap: int = 50,
        tokenizer_name: str = "bert-base-uncased",
    ):
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
        # overlap >= chunk_size would make the window step zero or negative,
        # which either crashes range() or silently yields no chunks.
        if not 0 <= overlap < chunk_size:
            raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
        self.chunk_size = chunk_size
        self.overlap = overlap
        self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)

    def chunk(self, document: Document) -> list[Document]:
        """Return the token windows of ``document`` as new Documents.

        Chunk text is produced by decoding the token ids, so it is whatever
        the tokenizer's ``decode`` returns for that window. Each chunk id is
        ``<document.id>_c<starting token offset>``; ``chunk_index`` in the
        metadata is the chunk's sequential position.

        Returns:
            The chunks in order; an empty list when the content encodes to
            no tokens.
        """
        tokens = self.tokenizer.encode(document.content, add_special_tokens=False)
        step = self.chunk_size - self.overlap
        chunks = []

        for start in range(0, len(tokens), step):
            chunk_tokens = tokens[start:start + self.chunk_size]
            chunks.append(Document(
                id=f"{document.id}_c{start}",
                content=self.tokenizer.decode(chunk_tokens),
                metadata={
                    **document.metadata,
                    "chunk_index": len(chunks),
                    "chunk_size": len(chunk_tokens),
                    "parent_id": document.id
                },
                source=document.source
            ))
            # This window already reaches the last token; a further window
            # would only repeat the overlap region of this one.
            if start + self.chunk_size >= len(tokens):
                break
        return chunks

Recursive Character Splitting

class RecursiveChunker:
    """Split documents on progressively finer separators.

    Text is first split on the coarsest separator and the pieces are packed
    greedily into chunks of at most ``chunk_size`` characters. A piece that
    is still too long is split again with the next separator; once the
    separators are exhausted it is cut into fixed-width slices.

    Args:
        chunk_size: Maximum number of characters per chunk.
        overlap: Stored on the instance for interface compatibility; the
            splitting logic does not apply any overlap.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """

    def __init__(self, chunk_size: int = 1000, overlap: int = 200):
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
        self.chunk_size = chunk_size
        self.overlap = overlap
        self.separators = ["\n\n", "\n", ". ", " "]

    def chunk(self, document: Document) -> list[Document]:
        """Split ``document`` using the instance's separator list."""
        return self._recursive_split(document, self.separators)

    def _recursive_split(self, doc: Document, separators: list[str]) -> list[Document]:
        """Split ``doc`` into chunk Documents using ``separators`` in order.

        A document that already fits in ``chunk_size`` is returned as-is
        (the same object, in a one-element list). Otherwise each chunk gets
        the id ``<doc.id>_c<index>`` and a ``chunk_index`` metadata entry.
        An empty ``separators`` falls back to splitting on a single space.
        """
        if len(doc.content) <= self.chunk_size:
            return [doc]

        ordered = list(separators) if separators else [" "]
        pieces = self._split_text(doc.content, ordered)
        return [
            Document(
                id=f"{doc.id}_c{index}",
                content=piece,
                metadata={**doc.metadata, "chunk_index": index},
                source=doc.source
            )
            for index, piece in enumerate(pieces)
        ]

    def _split_text(self, text: str, separators: list[str]) -> list[str]:
        """Return stripped, non-empty pieces of ``text`` within ``chunk_size``."""
        limit = self.chunk_size
        if len(text) <= limit:
            stripped = text.strip()
            return [stripped] if stripped else []

        if not separators:
            # Nothing left to split on: fall back to fixed-width slices.
            windows = (
                text[start:start + limit].strip()
                for start in range(0, len(text), limit)
            )
            return [window for window in windows if window]

        separator, finer = separators[0], separators[1:]
        parts = text.split(separator)
        # Re-attach the separator to every part except the last, so that
        # concatenating the segments reproduces ``text`` exactly and no
        # separator is invented after the final part.
        segments = [part + separator for part in parts[:-1]]
        segments.append(parts[-1])

        pieces = []
        current = ""
        for segment in segments:
            if len(current) + len(segment) <= limit:
                current += segment
                continue
            flushed = current.strip()
            if flushed:  # never emit whitespace-only chunks
                pieces.append(flushed)
            if len(segment) <= limit:
                current = segment
            else:
                # Too long even on its own: retry with the finer separators.
                pieces.extend(self._split_text(segment, finer))
                current = ""

        flushed = current.strip()
        if flushed:
            pieces.append(flushed)
        return pieces

Embedding Pipeline

from sentence_transformers import SentenceTransformer
import numpy as np

class EmbeddingPipeline:
    """Wrap a SentenceTransformer model for embedding texts and documents."""

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        """Load the model named ``model_name`` and record its embedding size."""
        self.model = SentenceTransformer(model_name)
        self.dimension = self.model.get_sentence_embedding_dimension()

    def embed(self, texts: list[str], batch_size: int = 64) -> np.ndarray:
        """Encode ``texts`` with the model and return what it produces.

        The model is asked to show a progress bar and to normalize the
        embeddings (for cosine similarity).
        """
        encode_options = {
            "batch_size": batch_size,
            "show_progress_bar": True,
            "normalize_embeddings": True,  # For cosine similarity
        }
        return self.model.encode(texts, **encode_options)

    def embed_documents(self, documents: list[Document]) -> list[dict]:
        """Embed every document's content and pair it with the document data.

        Returns:
            One dict per document with the keys ``id``, ``embedding`` (the
            vector converted with ``tolist()``), ``metadata`` and
            ``content``.
        """
        vectors = self.embed([document.content for document in documents])
        records = []
        for document, vector in zip(documents, vectors):
            records.append({
                "id": document.id,
                "embedding": vector.tolist(),
                "metadata": document.metadata,
                "content": document.content,
            })
        return records

Vector Store Indexing

class VectorIndexer:
    """Write embedded items to a vector store in fixed-size batches."""

    def __init__(self, vector_store):
        """Keep a reference to ``vector_store``; it must provide ``upsert(vectors=...)``."""
        self.store = vector_store

    def index_batch(self, items: list[dict], batch_size: int = 100):
        """Upsert ``items`` into the store, ``batch_size`` items per call.

        Each item is read through its ``id``, ``embedding``, ``metadata``
        and ``content`` keys. The content is copied into a new metadata
        dict under ``"content"``; the item's own metadata is not modified.
        """
        for start in range(0, len(items), batch_size):
            payload = []
            for record in items[start:start + batch_size]:
                entry = {"id": record["id"], "values": record["embedding"]}
                entry["metadata"] = {**record["metadata"], "content": record["content"]}
                payload.append(entry)
            self.store.upsert(vectors=payload)

Retrieval Quality Metrics

| Metric | Formula | Interpretation |
| --- | --- | --- |
| Recall@K | Relevant retrieved / Total relevant | Coverage of relevant docs |
| Precision@K | Relevant retrieved / K | Precision of top results |
| MRR | 1 / rank of first relevant | Ranking quality |
| NDCG | Normalized discounted cumulative gain | Position-aware ranking |

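$$\text{MRR} = \frac{1}{|Q|} \sum_{i=1}^{|Q|} \frac{1}{\text{rank}_i}$$

Here $|Q|$ is the number of evaluation queries and $\text{rank}_i$ is the rank of the first relevant document returned for query $i$.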

Effective RAGops requires careful attention to each pipeline stage, as retrieval quality directly impacts downstream generation quality.

⭐

Premium Content

RAGops Fundamentals

Unlock this lesson and 900+ advanced tutorials with a Premium plan.

🎯 End-to-end Projects
💼 Interview Prep
📜 Certificates
🤝 Community Access

Already a member? Log in

Need Expert AI Ops & LLM Ops Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement