Data Quality Validation
Bad data silently corrupts models and erodes trust. Learn to systematically validate data quality using modern frameworks and establish data contracts that prevent issues before they occur.
The Data Quality Spectrum
Data quality issues range from simple nulls to subtle distribution shifts that only surface in production metrics.
Great Expectations
Great Expectations lets you define, validate, and document expectations about your data.
import great_expectations as gx
import pandas as pd  # used by the later sections of this guide

# Initialize a DataContext
context = gx.get_context()

# Create a validator over the CSV file.
# NOTE(review): `context.sources.pandas_default` is the fluent-datasource API of
# Great Expectations 0.16-0.18; GX 1.x reorganized data sources — adjust the
# call to the version you have installed.
validator = context.sources.pandas_default.read_csv("customers.csv")

# Define expectations
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_be_unique("customer_id")
validator.expect_column_values_to_be_between("age", min_value=0, max_value=120)
# The local part may contain dots and "+" tags (e.g. jane.doe+news@example.com).
validator.expect_column_values_to_match_regex("email", r"^[\w.+-]+@[\w.-]+\.\w+$")
validator.expect_column_values_to_be_in_set(
    "status", ["active", "inactive", "pending", "churned"]
)
# Activity cannot precede account opening: column A (last activity) must be on
# or after column B (account opening); same-day activity is valid.
# NOTE(review): read_csv leaves these columns as strings, so values compare
# lexically — correct only for ISO-8601 (YYYY-MM-DD) dates; confirm the format.
validator.expect_column_pair_values_A_to_be_greater_than_B(
    "last_activity_date", "account_open_date", or_equal=True
)

# Statistical expectations
validator.expect_column_mean_to_be_between("purchase_amount", min_value=10, max_value=500)
validator.expect_column_stdev_to_be_between("response_time_ms", min_value=0, max_value=5000)
# NOTE(review): assumes the file also carries user_id and transaction_date columns — confirm.
validator.expect_compound_columns_to_be_unique(["user_id", "transaction_date"])

# Run every expectation registered above against the batch and show the report.
results = validator.validate()
print(results)
Pandera for Typed Validation
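Pandera integrates with pandas and provides schema-based validation. It checks column dtypes and value constraints at runtime, and its `@pa.check_types` decorator enforces schemas declared in function type hints.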
import pandera as pa
from pandera import Column, DataFrameSchema, Check
from pandera.typing import DataFrame, Series

# Email pattern shared by the schema below; the local part may contain dots
# and "+" tags (e.g. jane.doe+news@example.com).
EMAIL_REGEX = r"^[\w.+-]+@[\w.-]+\.\w+$"

# Define a strict schema: each listed column is validated for dtype,
# nullability and the attached checks.
schema = DataFrameSchema({
    "customer_id": Column(int, Check.greater_than(0), unique=True, nullable=False),
    "name": Column(str, Check.str_length(min_value=1, max_value=200), nullable=False),
    "email": Column(str, Check.str_matches(EMAIL_REGEX), nullable=False),
    # pandas reads an integer column that contains blanks as float64, so a plain
    # `int` dtype can never hold the nulls this column allows. The nullable
    # "Int64" extension dtype with coerce=True accepts both cases.
    "age": Column("Int64", Check.in_range(0, 120), nullable=True, coerce=True),
    "signup_date": Column(pa.DateTime, nullable=False),
    "lifetime_value": Column(float, Check.greater_than_or_equal_to(0), nullable=False),
    "segment": Column(str, Check.isin(["enterprise", "mid_market", "smb", "consumer"])),
})

# Validate (stops at the first failure by default; pass lazy=True to collect
# every failure into one report).
df = pd.read_csv("customers.csv", parse_dates=["signup_date"])
validated_df = schema.validate(df)

# Custom checks: dataframe-wide checks receive the whole DataFrame. `columns`
# must be a real mapping of name -> Column (a literal `{...}` would be a set
# containing Ellipsis, which is not a valid columns mapping).
schema_with_custom = DataFrameSchema(
    columns={
        "customer_id": Column(int),
        "start_date": Column(pa.DateTime),
        "end_date": Column(pa.DateTime),
    },
    checks=[
        # Row-wise: every record must end after it starts.
        Check(lambda df: df["end_date"] > df["start_date"], element_wise=False),
        # Aggregate: no customer may have more than 1000 records.
        Check(lambda df: df.groupby("customer_id").size().max() <= 1000, element_wise=False),
    ],
)


# Class-based schema referenced by the type-checking decorator below. Extra
# columns are allowed (models are non-strict by default), so a full customer
# frame passes as long as these columns conform.
class Schema(pa.DataFrameModel):
    customer_id: Series[int] = pa.Field(gt=0, unique=True)
    status: Series[str] = pa.Field(isin=["active", "inactive", "pending", "churned"])


# Decorator for validation in pipelines: the `df` argument is validated
# against Schema on every call before the body runs.
@pa.check_types
def process_customers(df: DataFrame[Schema]) -> pd.DataFrame:
    """Return only the customers whose status is "active"."""
    return df[df["status"] == "active"]
Data Contracts
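Data contracts formalize the agreement between data producers and consumers. A contract states which columns must exist, whether they may be null, which values are allowed, and which quality rules the data must pass.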
from dataclasses import dataclass, field
from typing import List, Optional, Dict
from enum import Enum
import json
class QualityLevel(Enum):
    """Severity label attached to a data-contract quality rule.

    The handling noted beside each member is the intended policy; this enum
    only names the levels and does not enforce anything itself.
    """

    # Intended handling: the pipeline fails.
    CRITICAL = "critical"
    # Intended handling: raise an alert but let the pipeline continue.
    WARNING = "warning"
    # Intended handling: log only.
    INFO = "info"
@dataclass
class DataContract:
    """Machine-checkable agreement on the shape and quality of a DataFrame.

    Attributes:
        schema_name: name of the dataset the contract covers.
        owner: team or person accountable for the data.
        sla_minutes: presumably the freshness SLA in minutes. It is stored
            only — ``validate`` does not read it, so express freshness as a
            quality rule.
        columns: column name -> spec dict. ``validate`` enforces the keys
            ``required`` (default True), ``nullable`` (default False),
            ``allowed_values``, ``min`` and ``max``; ``type`` is not checked.
        quality_rules: dicts with a ``name`` and a ``check`` callable that
            takes the DataFrame and returns a truthy value when the rule holds.
            Other keys (e.g. ``level``) are carried along but not interpreted
            by ``validate``.
    """

    schema_name: str
    owner: str
    sla_minutes: int
    columns: Dict[str, dict]
    quality_rules: List[dict]

    def validate(self, df):
        """Check *df* against the contract and return every violation found.

        Args:
            df: a pandas DataFrame.

        Returns:
            A list of human-readable violation messages; empty when *df*
            satisfies the contract. A rule callable that raises is reported as
            a violation instead of aborting validation. ``min``/``max`` bounds
            assume values comparable with the bound (e.g. numeric columns).
        """
        violations = []

        for col_name, col_spec in self.columns.items():
            if col_name not in df.columns:
                # Optional columns may be absent; required ones may not.
                if col_spec.get("required", True):
                    violations.append(f"Missing required column: {col_name}")
                continue

            series = df[col_name]
            if not col_spec.get("nullable", False):
                null_count = series.isna().sum()
                if null_count > 0:
                    violations.append(f"Null values in non-nullable column {col_name}: {null_count}")

            # Membership and range checks apply to present values only; nulls
            # are governed solely by the `nullable` flag above.
            present = series.dropna()
            if "allowed_values" in col_spec:
                disallowed = (~present.isin(col_spec["allowed_values"])).sum()
                if disallowed > 0:
                    violations.append(f"Disallowed values in column {col_name}: {disallowed}")
            if "min" in col_spec:
                below = (present < col_spec["min"]).sum()
                if below > 0:
                    violations.append(
                        f"Values below minimum {col_spec['min']} in column {col_name}: {below}"
                    )
            if "max" in col_spec:
                above = (present > col_spec["max"]).sum()
                if above > 0:
                    violations.append(
                        f"Values above maximum {col_spec['max']} in column {col_name}: {above}"
                    )

        for rule in self.quality_rules:
            # Rules are arbitrary callables (they may index a column that is
            # absent, for instance); a broken rule must surface as a violation
            # rather than hide every other finding behind an exception.
            try:
                passed = rule["check"](df)
            except Exception as exc:
                violations.append(f"Quality rule errored: {rule['name']} ({exc!r})")
                continue
            if not passed:
                violations.append(f"Quality rule failed: {rule['name']}")

        return violations
# Define a contract
# Single source of truth for the freshness SLA: used both as the declared
# sla_minutes and inside the event_freshness rule.
EVENTS_SLA_MINUTES = 15

contract = DataContract(
    schema_name="customer_events",
    owner="data-platform-team",
    sla_minutes=EVENTS_SLA_MINUTES,
    columns={
        "event_id": {"type": "string", "required": True, "nullable": False},
        "user_id": {"type": "string", "required": True, "nullable": False},
        "event_type": {"type": "string", "required": True, "nullable": False,
                       "allowed_values": ["click", "view", "purchase", "signup"]},
        "timestamp": {"type": "datetime", "required": True, "nullable": False},
        "amount": {"type": "float", "required": False, "nullable": True,
                   "min": 0, "max": 1000000}
    },
    # Each rule guards its own column access, so a missing column shows up as a
    # failed rule (plus the column-level violation) instead of a KeyError.
    quality_rules=[
        {
            "name": "event_freshness",
            # The newest event must be younger than the SLA.
            # NOTE(review): pd.Timestamp.now() is naive local time; if "timestamp"
            # is stored in UTC or is tz-aware, this is skewed or raises — confirm.
            "check": lambda df: "timestamp" in df.columns and (
                (pd.Timestamp.now() - df["timestamp"].max()).total_seconds()
                < EVENTS_SLA_MINUTES * 60
            ),
            "level": QualityLevel.CRITICAL
        },
        {
            "name": "unique_events",
            "check": lambda df: "event_id" in df.columns and df["event_id"].is_unique,
            "level": QualityLevel.CRITICAL
        },
        {
            "name": "reasonable_amounts",
            # "amount" is optional (required=False), so its absence is not a failure.
            "check": lambda df: "amount" not in df.columns or bool((df["amount"].dropna() >= 0).all()),
            "level": QualityLevel.WARNING
        }
    ]
)
Monitoring and Alerting
import time
from datetime import datetime
from dataclasses import dataclass
from typing import Callable
import logging
@dataclass
class QualityCheck:
    """A named metric together with the threshold it is judged against.

    Fields are positional in this order: name, check_fn, threshold,
    window_minutes.
    """

    # Label used in results and alert messages.
    name: str
    # Called with the data source; should return a value comparable with threshold.
    check_fn: Callable
    # Boundary value for the metric; DataQualityMonitor.run_checks treats
    # metric >= threshold as a pass.
    threshold: float
    # Presumably a look-back window in minutes; the monitor as written never reads it.
    window_minutes: int = 60
class DataQualityMonitor:
    """Evaluate registered quality checks and alert on the ones that fail.

    A check passes when ``check.check_fn(data_source) >= check.threshold``;
    otherwise it fails, and an alert is appended to ``alert_history`` and
    logged as a warning.
    """

    def __init__(self):
        self.checks = []  # registered checks, run in registration order
        self.alert_history = []  # one dict per alert raised by _send_alert
        self.logger = logging.getLogger("data_quality")

    def add_check(self, check: "QualityCheck") -> None:
        """Register *check* so it runs on every later ``run_checks`` call."""
        self.checks.append(check)

    def run_checks(self, data_source):
        """Run every registered check against *data_source*.

        Returns:
            One result dict per check, in registration order. ``status`` is
            "PASS", "FAIL" or "ERROR"; PASS/FAIL results carry ``metric`` and
            ``threshold``, ERROR results carry ``error``.
        """
        results = []
        for check in self.checks:
            # Only the user-supplied evaluation is guarded: a check that raises,
            # or returns something not comparable with its threshold, becomes an
            # ERROR result instead of aborting the remaining checks.
            try:
                metric = check.check_fn(data_source)
                status = "PASS" if metric >= check.threshold else "FAIL"
            except Exception as e:
                self.logger.exception("Check %s failed with error: %s", check.name, e)
                results.append({
                    "check": check.name,
                    "status": "ERROR",
                    "error": str(e),
                    "timestamp": datetime.now(),
                })
                continue

            results.append({
                "check": check.name,
                "metric": metric,
                "threshold": check.threshold,
                "status": status,
                "timestamp": datetime.now(),
            })
            if status == "FAIL":
                # Alerting is a side channel: if it breaks, log the failure but
                # keep the single FAIL result instead of appending a second,
                # contradictory ERROR entry for the same check.
                try:
                    self._send_alert(check, metric)
                except Exception:
                    self.logger.exception("Failed to send alert for check %s", check.name)
        return results

    def _send_alert(self, check, metric):
        """Record an alert for the failed *check* and emit it as a log warning."""
        alert = {
            "check": check.name,
            "metric": metric,
            "threshold": check.threshold,
            "timestamp": datetime.now(),
            "severity": "critical"
        }
        self.alert_history.append(alert)
        # Integrate with PagerDuty, Slack, etc.
        self.logger.warning("ALERT: %s failed (%s < %s)", check.name, metric, check.threshold)
# Usage
# DataQualityMonitor marks a check as PASS when metric >= threshold, so every
# metric below is phrased so that higher is better.
MAX_STALENESS_MINUTES = 30

monitor = DataQualityMonitor()
# Share of non-null cells across the frame; at least 98% must be populated.
monitor.add_check(QualityCheck(
    name="non_null_rate",
    check_fn=lambda df: 1 - df.isnull().mean().mean(),
    threshold=0.98
))
# Freshness as remaining headroom: positive while the newest record is younger
# than MAX_STALENESS_MINUTES, negative once it is stale. (A raw "minutes since
# last record" metric with threshold=30 would PASS stale data and FAIL fresh
# data under the >= rule.)
# NOTE(review): pd.Timestamp.now() is naive local time; if "timestamp" is UTC
# or tz-aware, adjust (e.g. pd.Timestamp.now(tz="UTC")).
monitor.add_check(QualityCheck(
    name="freshness_headroom_minutes",
    check_fn=lambda df: MAX_STALENESS_MINUTES
    - (pd.Timestamp.now() - df["timestamp"].max()).total_seconds() / 60,
    threshold=0
))
# NOTE(review): `df` needs a "timestamp" column (e.g. an event feed); the
# customers.csv frame loaded earlier presumably has none, in which case the
# freshness check reports ERROR.
results = monitor.run_checks(df)
Key Takeaways
- Start with completeness and uniqueness checks, then add statistical validations
- Data contracts guard against "garbage in, garbage out" by encoding producer–consumer expectations as executable code
- Automate quality checks in CI/CD pipelines to catch issues before deployment
- Track quality metrics over time to detect gradual degradation