LLM Data Pipelines: Data Collection, Cleaning, and Preparation
Data pipelines for LLMs differ fundamentally from traditional ML pipelines due to the scale of text data, the need for diverse high-quality sources, and complex preprocessing requirements including tokenization and context windowing.
Data Pipeline Architecture
Data Collection
1. Multi-Source Ingestion
import hashlib
import json
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Generator
from datetime import datetime
from pathlib import Path
@dataclass
class RawDocument:
    """One ingested document plus the bookkeeping fields attached to it.

    Fields, in constructor order: ``doc_id``, ``source``, ``content``,
    ``metadata`` (defaults to a fresh empty dict) and ``timestamp``
    (defaults to the result of ``datetime.utcnow()`` at construction).
    """

    doc_id: str
    source: str
    content: str
    metadata: Dict[str, str] = field(default_factory=dict)
    # NOTE(review): datetime.utcnow() produces a naive datetime and is
    # deprecated since Python 3.12; kept as-is to preserve behavior.
    timestamp: datetime = field(default_factory=datetime.utcnow)

    @property
    def content_hash(self) -> str:
        """Return the first 16 hex characters of the SHA-256 of ``content``."""
        digest = hashlib.sha256(self.content.encode())
        full_hex = digest.hexdigest()
        return full_hex[:16]
class DataSource:
    """Descriptor for an origin of documents: a name and a type label."""

    def __init__(self, name: str, source_type: str):
        # Both values are stored verbatim; no validation is performed.
        self.name, self.source_type = name, source_type
class DataIngestionPipeline:
    """Collect documents from files and raw strings, skipping repeated content.

    Documents are kept in ``documents`` in arrival order. A document is stored
    only if its ``content_hash`` has not been seen before by this pipeline.
    """

    def __init__(self):
        self.sources: List[DataSource] = []
        self.documents: List[RawDocument] = []
        # content_hash values of every document stored so far.
        self.seen_hashes: set = set()

    def add_source(self, source: "DataSource"):
        """Register ``source`` with the pipeline (appended to ``sources``)."""
        self.sources.append(source)

    def _register(self, doc: "RawDocument") -> bool:
        """Store ``doc`` unless its content hash was already seen.

        Returns:
            True if the document was stored, False if it was a duplicate.
        """
        digest = doc.content_hash
        if digest in self.seen_hashes:
            return False
        self.documents.append(doc)
        self.seen_hashes.add(digest)
        return True

    def ingest_file(self, filepath: str, source_name: str) -> int:
        """Read one UTF-8 text file and store it as a document.

        Args:
            filepath: Path of the file to read.
            source_name: Label recorded as the document's ``source``.

        Returns:
            1 if a new document was stored; 0 if the path is not an existing
            regular file or the content duplicates an earlier document.
        """
        path = Path(filepath)
        # is_file() rather than exists(): a directory path exists but cannot
        # be read as text, and should be skipped like a missing file.
        if not path.is_file():
            return 0
        # Undecodable bytes are dropped silently (errors="ignore").
        content = path.read_text(encoding="utf-8", errors="ignore")
        doc = RawDocument(
            doc_id=hashlib.md5(content.encode()).hexdigest(),
            source=source_name,
            content=content,
            # "size" is the decoded length in characters, stored as a string.
            metadata={"filename": path.name, "size": str(len(content))},
        )
        return 1 if self._register(doc) else 0

    def ingest_text(self, text: str, source_name: str,
                    metadata: Optional[Dict] = None) -> "RawDocument":
        """Wrap ``text`` in a document and store it unless it is a duplicate.

        Args:
            text: Document content.
            source_name: Label recorded as the document's ``source``.
            metadata: Optional metadata mapping; an empty dict is used when
                omitted or empty.

        Returns:
            The newly built document. It is returned even when it was not
            stored because its content duplicates an earlier document.
        """
        doc = RawDocument(
            doc_id=hashlib.md5(text.encode()).hexdigest(),
            source=source_name,
            content=text,
            metadata=metadata or {},
        )
        self._register(doc)
        return doc

    def get_stats(self) -> Dict:
        """Return the stored-document count, in total and per source name."""
        by_source: Dict[str, int] = {}
        for doc in self.documents:
            by_source[doc.source] = by_source.get(doc.source, 0) + 1
        return {"total": len(self.documents), "by_source": by_source}
Data Cleaning
2. Text Quality Filter
import re
from typing import List, Tuple
class TextQualityFilter:
    """Heuristic gate that accepts or rejects a text by length, script and a
    simple quality score.

    Args:
        min_length: Smallest accepted text length, in characters (inclusive).
        max_length: Largest accepted text length, in characters (inclusive).
    """

    def __init__(self, min_length: int = 100, max_length: int = 1000000):
        self.min_length = min_length
        self.max_length = max_length
        # Not consulted by any check in this class; kept as a public attribute.
        self.stop_words = {"the", "is", "at", "which", "on"}

    def check_length(self, text: str) -> bool:
        """Return True when ``len(text)`` lies within the configured bounds."""
        return self.min_length <= len(text) <= self.max_length

    def check_language(self, text: str, target_lang: str = "en") -> bool:
        """Approximate language check based on the share of ASCII characters.

        Only ``target_lang == "en"`` is actually checked (more than 80% of
        the characters must be ASCII); every other target language passes.
        """
        if target_lang != "en":
            return True
        ascii_chars = sum(1 for c in text if ord(c) < 128)
        # max(..., 1) avoids dividing by zero on empty text.
        return ascii_chars / max(len(text), 1) > 0.8

    def check_quality_score(self, text: str) -> float:
        """Score ``text`` between 0.0 and 1.0 from four additive heuristics.

        Contributions: +0.3 when the average sentence length is strictly
        between 10 and 40 words, +0.3 when more than 30% of the words are
        unique (case-insensitive), +0.2 when the text contains ``.``, ``!``
        or ``?``, and +0.2 when the length check passes.
        """
        # Split on every sentence terminator and drop empty fragments.
        # Splitting on "." alone counted the empty tail after a final period
        # as a sentence, which dragged the average down.
        sentences = [s for s in re.split(r"[.!?]+", text) if s.strip()]
        avg_sentence_len = (
            sum(len(s.split()) for s in sentences) / max(len(sentences), 1)
        )

        unique_words = set(text.lower().split())
        total_words = len(text.split())
        vocabulary_richness = len(unique_words) / max(total_words, 1)

        has_proper_structure = bool(re.search(r"[.!?]", text))

        score = 0.0
        if 10 < avg_sentence_len < 40:
            score += 0.3
        if vocabulary_richness > 0.3:
            score += 0.3
        if has_proper_structure:
            score += 0.2
        if self.check_length(text):
            score += 0.2
        return min(score, 1.0)

    def filter_document(self, text: str, min_quality: float = 0.5) -> Tuple[bool, float, str]:
        """Run the length, language and quality checks in that order.

        Returns:
            ``(accepted, quality, reason)``. ``reason`` is one of
            ``"length_out_of_range"``, ``"language_mismatch"``,
            ``"low_quality"`` or ``"passed"``. ``quality`` is 0.0 when the
            text is rejected before scoring.
        """
        if not self.check_length(text):
            return False, 0.0, "length_out_of_range"
        if not self.check_language(text):
            return False, 0.0, "language_mismatch"
        quality = self.check_quality_score(text)
        if quality < min_quality:
            return False, quality, "low_quality"
        return True, quality, "passed"
3. Deduplication Engine
import hashlib
from typing import List, Dict, Set
from collections import defaultdict
class DeduplicationEngine:
    """Track texts already seen and report whether a new text is a repeat.

    Both checks are stateful: a text that is reported as new is recorded, so
    the same text is reported as a duplicate on the next call.
    """

    def __init__(self):
        # SHA-256 hex digests of every text accepted by exact_dedup.
        self.exact_hashes: Set[str] = set()
        # Signature -> first 100 characters of each text accepted with it.
        self.ngram_index: Dict[str, List[str]] = defaultdict(list)

    def exact_dedup(self, text: str) -> bool:
        """Return True if ``text`` is new, False if an identical text was seen."""
        text_hash = hashlib.sha256(text.encode()).hexdigest()
        if text_hash in self.exact_hashes:
            return False
        self.exact_hashes.add(text_hash)
        return True

    def _get_ngrams(self, text: str, n: int = 3) -> Set[str]:
        """Return the set of lower-cased word ``n``-grams of ``text``.

        The set is empty when the text has fewer than ``n`` words.
        """
        words = text.lower().split()
        return {" ".join(words[i:i + n]) for i in range(len(words) - n + 1)}

    def fuzzy_dedup(self, text: str, threshold: float = 0.8) -> bool:
        """Return True if ``text`` is new under a coarse n-gram signature.

        The signature is the 10 lexicographically smallest word 3-grams,
        joined with ``|``. Two texts count as duplicates when their
        signatures are equal.

        Args:
            text: Text to check and, if new, record.
            threshold: Accepted for interface compatibility; the signature
                comparison is an exact match and does not consult it.

        Returns:
            True if no earlier text had the same signature, else False.
        """
        ngrams = self._get_ngrams(text)
        if not ngrams:
            # A text with fewer than three words has no 3-grams. Without a
            # fallback every such text gets the signature "" and all short
            # texts collide. Use the whole normalised text as its only gram.
            ngrams = {" ".join(text.lower().split())}
        text_sig = "|".join(sorted(ngrams)[:10])
        if text_sig in self.ngram_index:
            return False
        self.ngram_index[text_sig].append(text[:100])
        return True
Key Formulas
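TF-IDF Score
$$\text{TF-IDF}(t, d) = \text{tf}(t, d) \times \log\frac{N}{\text{df}(t)}$$
Here,
- $\text{tf}(t, d)$ = Term frequency of term t in document d
- $N$ = Total number of documents
- $\text{df}(t)$ = Number of documents containing term t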
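MinHash Similarity
$$J(A, B) = \frac{|A \cap B|}{|A \cup B|} \approx \Pr\left[h_{\min}(A) = h_{\min}(B)\right]$$
Here,
- $A, B$ = Sets of n-grams from two documents
- $h_{\min}(\cdot)$ = MinHash signature of the set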
Pipeline Metrics
| Stage | Input | Output | Filter Rate | Latency |
|---|---|---|---|---|
| Ingestion | Raw files | Documents | 0% | 10ms/doc |
| Length Filter | Documents | Sized docs | 20-30% | 0.1ms/doc |
| Quality Filter | Sized docs | Quality docs | 40-60% | 5ms/doc |
| Exact Dedup | Quality docs | Unique docs | 10-20% | 1ms/doc |
| Fuzzy Dedup | Unique docs | Clean docs | 5-15% | 50ms/doc |
| Tokenization | Clean docs | Tokens | 0% | 2ms/doc |
Best Practices
- Source diversity: Mix web crawls, books, code, and domain-specific data
- Aggressive deduplication: Use both exact and fuzzy methods to reduce redundancy
- Quality scoring: Filter low-quality text early to save compute downstream
- Version control datasets: Track all changes with DVC or similar tools
- Monitor data drift: Track source distributions over time