πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

LLM Data Pipelines: Data Collection, Cleaning, and Preparation

LLMOps in ProductionLLM Data Pipelines🟒 Free Lesson

Advertisement

LLM Data Pipelines: Data Collection, Cleaning, and Preparation

Data pipelines for LLMs differ fundamentally from traditional ML pipelines due to the scale of text data, the need for diverse high-quality sources, and complex preprocessing requirements including tokenization and context windowing.

Data Pipeline Architecture

Data Collection

1. Multi-Source Ingestion

import hashlib
import json
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Generator
from datetime import datetime
from pathlib import Path

@dataclass
class RawDocument:
    doc_id: str
    source: str
    content: str
    metadata: Dict[str, str] = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.utcnow)

    @property
    def content_hash(self) -> str:
        return hashlib.sha256(self.content.encode()).hexdigest()[:16]

class DataSource:
    def __init__(self, name: str, source_type: str):
        self.name = name
        self.source_type = source_type

class DataIngestionPipeline:
    def __init__(self):
        self.sources: List[DataSource] = []
        self.documents: List[RawDocument] = []
        self.seen_hashes: set = set()

    def add_source(self, source: DataSource):
        self.sources.append(source)

    def ingest_file(self, filepath: str, source_name: str) -> int:
        path = Path(filepath)
        if not path.exists():
            return 0
        content = path.read_text(encoding="utf-8", errors="ignore")
        doc = RawDocument(
            doc_id=hashlib.md5(content.encode()).hexdigest(),
            source=source_name,
            content=content,
            metadata={"filename": path.name, "size": str(len(content))}
        )
        if doc.content_hash not in self.seen_hashes:
            self.documents.append(doc)
            self.seen_hashes.add(doc.content_hash)
            return 1
        return 0

    def ingest_text(self, text: str, source_name: str,
                    metadata: Optional[Dict] = None) -> RawDocument:
        doc = RawDocument(
            doc_id=hashlib.md5(text.encode()).hexdigest(),
            source=source_name,
            content=text,
            metadata=metadata or {}
        )
        if doc.content_hash not in self.seen_hashes:
            self.documents.append(doc)
            self.seen_hashes.add(doc.content_hash)
        return doc

    def get_stats(self) -> Dict:
        sources = {}
        for doc in self.documents:
            sources[doc.source] = sources.get(doc.source, 0) + 1
        return {"total": len(self.documents), "by_source": sources}

Data Cleaning

2. Text Quality Filter

import re
from typing import List, Tuple

class TextQualityFilter:
    def __init__(self, min_length: int = 100, max_length: int = 1000000):
        self.min_length = min_length
        self.max_length = max_length
        self.stop_words = set(["the", "is", "at", "which", "on"])

    def check_length(self, text: str) -> bool:
        return self.min_length <= len(text) <= self.max_length

    def check_language(self, text: str, target_lang: str = "en") -> bool:
        ascii_ratio = sum(1 for c in text if ord(c) < 128) / max(len(text), 1)
        return ascii_ratio > 0.8 if target_lang == "en" else True

    def check_quality_score(self, text: str) -> float:
        sentences = text.split(".")
        avg_sentence_len = sum(len(s.split()) for s in sentences) / max(len(sentences), 1)
        unique_words = set(text.lower().split())
        total_words = len(text.split())
        vocabulary_richness = len(unique_words) / max(total_words, 1)
        has_proper_structure = bool(re.search(r"[.!?]", text))
        score = 0.0
        if 10 < avg_sentence_len < 40:
            score += 0.3
        if vocabulary_richness > 0.3:
            score += 0.3
        if has_proper_structure:
            score += 0.2
        if self.check_length(text):
            score += 0.2
        return min(score, 1.0)

    def filter_document(self, text: str, min_quality: float = 0.5) -> Tuple[bool, float, str]:
        if not self.check_length(text):
            return False, 0.0, "length_out_of_range"
        if not self.check_language(text):
            return False, 0.0, "language_mismatch"
        quality = self.check_quality_score(text)
        if quality < min_quality:
            return False, quality, "low_quality"
        return True, quality, "passed"

3. Deduplication Engine

import hashlib
from typing import List, Dict, Set
from collections import defaultdict

class DeduplicationEngine:
    def __init__(self):
        self.exact_hashes: Set[str] = set()
        self.ngram_index: Dict[str, List[str]] = defaultdict(list)

    def exact_dedup(self, text: str) -> bool:
        text_hash = hashlib.sha256(text.encode()).hexdigest()
        if text_hash in self.exact_hashes:
            return False
        self.exact_hashes.add(text_hash)
        return True

    def _get_ngrams(self, text: str, n: int = 3) -> Set[str]:
        words = text.lower().split()
        return {" ".join(words[i:i+n]) for i in range(len(words) - n + 1)}

    def fuzzy_dedup(self, text: str, threshold: float = 0.8) -> bool:
        ngrams = self._get_ngrams(text)
        text_key = sorted(ngrams)[:10]
        text_sig = "|".join(text_key)
        if text_sig in self.ngram_index:
            return False
        self.ngram_index[text_sig].append(text[:100])
        return True

Key Formulas

TF-IDF Score

TF-IDF(t,d)=TF(t,d)Γ—log⁑NDF(t)\text{TF-IDF}(t, d) = \text{TF}(t, d) \times \log\frac{N}{\text{DF}(t)}

Here,

  • TF(t,d)\text{TF}(t, d)=Term frequency of term t in document d
  • NN=Total number of documents
  • DF(t)\text{DF}(t)=Number of documents containing term t

MinHash Similarity

Jaccard(A,B)β‰ˆβˆ£MinHash(A)∩MinHash(B)∣∣MinHash(A)βˆͺMinHash(B)∣\text{Jaccard}(A, B) \approx \frac{|\text{MinHash}(A) \cap \text{MinHash}(B)|}{|\text{MinHash}(A) \cup \text{MinHash}(B)|}

Here,

  • A,BA, B=Sets of n-grams from two documents
  • MinHash\text{MinHash}=MinHash signature of the set

Pipeline Metrics

StageInputOutputFilter RateLatency
IngestionRaw filesDocuments0%10ms/doc
Length FilterDocumentsSized docs20-30%0.1ms/doc
Quality FilterSized docsQuality docs40-60%5ms/doc
Exact DedupQuality docsUnique docs10-20%1ms/doc
Fuzzy DedupUnique docsClean docs5-15%50ms/doc
TokenizationClean docsTokens0%2ms/doc

Best Practices

  1. Source diversity: Mix web crawls, books, code, and domain-specific data
  2. Aggressive deduplication: Use both exact and fuzzy methods to reduce redundancy
  3. Quality scoring: Filter low-quality text early to save compute downstream
  4. Version control datasets: Track all changes with DVC or similar tools
  5. Monitor data drift: Track source distributions over time
⭐

Premium Content

LLM Data Pipelines: Data Collection, Cleaning, and Preparation

Unlock this lesson and 900+ advanced tutorials with a Premium plan.

🎯End-to-end Projects
πŸ’ΌInterview Prep
πŸ“œCertificates
🀝Community Access

Already a member? Log in

Need Expert AI Ops & LLM Ops Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement