Data Quality: Ensuring Trustworthy Data at Scale
Data quality is the degree to which data satisfies its intended purpose, measured by accuracy, completeness, consistency, timeliness, and validity.
The Data Quality Dimensions
Core Dimensions:
- Completeness: are all expected records present?
- Accuracy: do values reflect reality?
- Consistency: do related datasets agree?
- Timeliness: is data available within SLA?
- Validity: do values conform to business rules?
- Uniqueness: are there duplicate records?
- Freshness: how recent is the data?
- Schema compliance: does the data match expected types and constraints?
Priority by Use Case:
| Use Case | Completeness | Accuracy | Consistency | Timeliness | Uniqueness |
|---|---|---|---|---|---|
| Financial Reporting | Critical | Critical | Critical | High | Critical |
| Real-Time Dashboards | High | High | Medium | Critical | High |
| ML Training | High | High | High | Medium | Medium |
| Analytics | High | Medium | High | Medium | High |
| Compliance | Critical | Critical | Critical | High | Critical |
Key Insight: Poor data quality costs organizations an average of $12.9 million annually (Gartner). Data quality is not a one-time check but a continuous engineering discipline embedded throughout the data lifecycle.
Data Quality Framework
Data quality is the fitness of data for its intended use in operations, decision-making, and planning. Formally: DQ(Data) = Σ_i (w_i * score_i), where score_i is the quality score for dimension i (completeness, accuracy, consistency, timeliness, validity, uniqueness) and w_i is the weight assigned to each dimension based on business importance. The weights are usually normalized to sum to 1 so that DQ stays on the same scale as the individual scores.
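The weighted score can be sketched in a few lines. This is a minimal illustration, not a reference implementation: the function name `weighted_quality_score`, the dimension scores, and the weights are all hypothetical, and the weights are normalized inside the function so they need not sum to 1.

```python
from typing import Dict


def weighted_quality_score(scores: Dict[str, float], weights: Dict[str, float]) -> float:
    """Return sum_i(w_i * score_i) with the weights normalized to sum to 1."""
    missing = set(weights) - set(scores)
    if missing:
        raise ValueError(f"missing scores for dimensions: {sorted(missing)}")
    total_weight = sum(weights.values())
    if total_weight <= 0:
        raise ValueError("weights must sum to a positive number")
    return sum(weights[d] * scores[d] for d in weights) / total_weight


# Hypothetical per-dimension scores in [0, 1] and business weights.
scores = {"completeness": 0.999, "accuracy": 0.97, "timeliness": 0.90}
weights = {"completeness": 0.5, "accuracy": 0.3, "timeliness": 0.2}

dq = weighted_quality_score(scores, weights)
print(f"DQ = {dq:.4f}")
```

A single aggregate hides which dimension is failing, so it is usually reported alongside the per-dimension scores rather than instead of them.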
A data contract is a formal agreement between data producers and consumers that specifies the schema, semantics, quality guarantees, and SLAs of a dataset. Data contracts encode: (1) schema (field names, types, constraints), (2) semantics (business meaning, units, allowed values), (3) quality (null rates, uniqueness, freshness), and (4) SLAs (availability, latency, completeness).
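To make the four parts of a contract concrete, here is a small sketch that encodes schema, allowed values, and quality/SLA targets as plain dataclasses. The class names (`FieldSpec`, `DataContract`), the `orders` dataset, and every threshold are assumptions chosen for illustration; real projects often express the same information in YAML.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class FieldSpec:
    name: str
    dtype: type
    nullable: bool = False
    allowed_values: Optional[frozenset] = None  # semantics: permitted values


@dataclass(frozen=True)
class DataContract:
    dataset: str
    fields: Tuple[FieldSpec, ...]           # schema
    unique_keys: Tuple[str, ...] = ()       # quality: uniqueness
    max_null_rate: float = 0.01             # quality: null rate
    freshness_sla_minutes: int = 60         # SLA: freshness

    def violations(self, record: dict) -> List[str]:
        """Return the schema/semantic violations for one record."""
        problems = []
        for spec in self.fields:
            value = record.get(spec.name)
            if value is None:
                if not spec.nullable:
                    problems.append(f"{spec.name}: null not allowed")
                continue
            if not isinstance(value, spec.dtype):
                problems.append(f"{spec.name}: expected {spec.dtype.__name__}")
            elif spec.allowed_values is not None and value not in spec.allowed_values:
                problems.append(f"{spec.name}: value {value!r} not allowed")
        return problems


# Hypothetical contract for an orders dataset.
orders_contract = DataContract(
    dataset="orders",
    fields=(
        FieldSpec("order_id", str),
        FieldSpec("amount", float),
        FieldSpec("status", str, allowed_values=frozenset({"pending", "completed"})),
    ),
    unique_keys=("order_id",),
)

print(orders_contract.violations({"order_id": None, "amount": "10", "status": "unknown"}))
```

Record-level checks cover schema and semantics; dataset-level guarantees such as null rate, uniqueness, and freshness need to be evaluated over a whole batch.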
Anomaly detection in data quality identifies data points, patterns, or distributions that deviate significantly from expected behavior. Methods include: (1) Statistical: Z-score, IQR (Interquartile Range), (2) Time-series: seasonal decomposition, ARIMA residuals, (3) ML-based: isolation forests, autoencoders, (4) Rule-based: thresholds, business rules.
Z-Score Anomaly Detection
For a data point x in a distribution with mean μ and standard deviation σ, the Z-score is: Z = (x - μ) / σ. A data point is considered anomalous if |Z| > threshold (typically 2-3). For multivariate data, Mahalanobis distance is used: D = sqrt((x - μ)^T * Σ^{-1} * (x - μ)), where μ is the mean vector and Σ is the covariance matrix. Points with D > threshold are anomalous.
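The univariate Z-score rule can be sketched with the standard library alone. The function name `z_score_outliers` and the sample row counts are hypothetical, and the sketch uses the population standard deviation; the multivariate (Mahalanobis) case is not covered here.

```python
import statistics
from typing import List, Sequence, Tuple


def z_score_outliers(values: Sequence[float], threshold: float = 3.0) -> List[Tuple[int, float]]:
    """Return (index, z) pairs for points whose |Z| exceeds the threshold."""
    mean = statistics.fmean(values)
    std = statistics.pstdev(values)  # population standard deviation
    if std == 0:
        return []  # all values identical: nothing deviates
    z_scores = [(v - mean) / std for v in values]
    return [(i, z) for i, z in enumerate(z_scores) if abs(z) > threshold]


# Hypothetical daily row counts; the last day spikes.
daily_row_counts = [1000, 1010, 990, 1005, 995, 1002, 998, 1001, 999, 1500]

outliers = z_score_outliers(daily_row_counts, threshold=2.5)
print(outliers)
```

With small samples the outlier itself inflates σ: using the population standard deviation on n points, |Z| can never exceed sqrt(n - 1), which is exactly 3 for the ten points here. This is one reason to compute the baseline from history that excludes the point being tested.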
IQR Method
The Interquartile Range (IQR) method identifies outliers as points below Q1 - k*IQR or above Q3 + k*IQR, where IQR = Q3 - Q1 and k is typically 1.5 (mild outliers) or 3.0 (extreme outliers). For normally distributed data, IQR ≈ 1.349σ, so with k=1.5 the fences sit at about ±2.698σ from the mean. For a dataset of size N, the expected number of false positives is therefore: FP = N * P(|Z| > 2.698) ≈ N * 0.007 for k=1.5.
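A short sketch of the IQR fences, again using only the standard library. The function name `iqr_outliers` and the sample amounts are hypothetical; quartiles are computed with `statistics.quantiles(..., method="inclusive")`, and other quartile conventions give slightly different fences.

```python
import statistics
from typing import List, Sequence, Tuple


def iqr_outliers(values: Sequence[float], k: float = 1.5) -> Tuple[List[float], Tuple[float, float]]:
    """Return the outliers and the (low, high) fences for the IQR rule."""
    q1, _, q3 = statistics.quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    low, high = q1 - k * iqr, q3 + k * iqr
    return [v for v in values if v < low or v > high], (low, high)


# Hypothetical order amounts with one extreme value.
amounts = [10, 12, 11, 13, 12, 11, 10, 14, 12, 95]

flagged, fences = iqr_outliers(amounts)
print(flagged, fences)
```

Because quartiles are rank-based, a single extreme value barely moves the fences, which makes this rule more robust than the Z-score on small or skewed samples.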
Completeness Score
Completeness = (Total_expected_records - Missing_records) / Total_expected_records * 100%. For a pipeline with N source records and M loaded records: Completeness = M / N * 100%. Target: Completeness > 99.9% for critical pipelines. Acceptable loss: Completeness_loss < 0.01% per pipeline stage.
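As a worked example of these formulas, the sketch below computes end-to-end completeness and per-stage loss from record counts. The function name `completeness_pct` and the stage counts are made up for illustration.

```python
def completeness_pct(expected_records: int, loaded_records: int) -> float:
    """Completeness = M / N * 100, for N expected and M loaded records."""
    if expected_records <= 0:
        raise ValueError("expected_records must be positive")
    # A result above 100% usually points to duplicates rather than extra data.
    return loaded_records / expected_records * 100.0


# Hypothetical record counts after source, staging, and warehouse stages.
stage_counts = [1_000_000, 999_950, 999_900]

end_to_end = completeness_pct(stage_counts[0], stage_counts[-1])
stage_losses = [
    100.0 - completeness_pct(before, after)
    for before, after in zip(stage_counts, stage_counts[1:])
]

print(f"end-to-end completeness: {end_to_end:.3f}%")
print("per-stage loss (%):", [round(loss, 4) for loss in stage_losses])
```

Here each stage loses about 0.005% of its input, inside the 0.01% per-stage budget, and the pipeline as a whole lands at 99.99%.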
Every data pipeline must have quality gates at stage boundaries. A quality gate is a set of checks that must pass before data flows to the next stage. If a gate fails, the pipeline must halt, alert, and provide diagnostic information. Formally: Pipeline_output = Validated(Pipeline_input) if all gates pass; Pipeline_output = ∅ (halt) if any gate fails. This prevents downstream systems from consuming bad data.
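One way to express a gate in code is a function that either returns the validated rows or raises, so a failing stage cannot hand data to the next one. This is a sketch under assumptions: `Check`, `QualityGateError`, `run_gate`, and the three example checks are hypothetical names, not part of any framework.

```python
from dataclasses import dataclass
from typing import Callable, List

Rows = List[dict]


class QualityGateError(RuntimeError):
    """Raised to halt the pipeline when a gate fails."""


@dataclass(frozen=True)
class Check:
    name: str
    passes: Callable[[Rows], bool]


def run_gate(stage: str, rows: Rows, checks: List[Check]) -> Rows:
    """Return rows unchanged if every check passes; otherwise halt with diagnostics."""
    failed = [c.name for c in checks if not c.passes(rows)]
    if failed:
        raise QualityGateError(f"{stage}: failed checks {failed} on {len(rows)} rows")
    return rows


checks = [
    Check("non_empty", lambda rows: len(rows) > 0),
    Check("order_id_not_null", lambda rows: all(r.get("order_id") is not None for r in rows)),
    Check("order_id_unique", lambda rows: len({r.get("order_id") for r in rows}) == len(rows)),
]

good = [{"order_id": "A"}, {"order_id": "B"}]
bad = [{"order_id": "A"}, {"order_id": "A"}]

validated = run_gate("load", good, checks)  # passes through unchanged
try:
    run_gate("load", bad, checks)
except QualityGateError as exc:
    print(f"pipeline halted: {exc}")
```

Raising an exception maps naturally onto orchestrators, where a failed task stops its downstream tasks; alerting would hook into the same failure path.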
Data contracts are enforced at three points: (1) at ingestion β reject data that violates schema contracts, (2) at transformation β validate output against quality contracts, (3) at serving β verify freshness and completeness before serving to consumers. If any enforcement point detects a violation, the contract is breached and the pipeline must take corrective action.
Key Concepts
| Concept | Description | Tool/Framework |
|---|---|---|
| Schema Validation | Verify field names, types, and constraints | Great Expectations, Pandera |
| Null Rate Check | Ensure null percentage is below threshold | Great Expectations expect_column_values_to_not_be_null |
| Uniqueness Check | Verify no duplicate records | expect_column_values_to_be_unique |
| Range Check | Values fall within expected bounds | expect_column_values_to_be_between |
| Freshness Check | Data arrives within SLA | Custom checks, Airflow sensors |
| Referential Integrity | Foreign keys reference valid records | expect_column_values_to_be_in_set |
| Distribution Check | Data distribution matches expected | KS test, Chi-squared test |
| Statistical Outlier | Values deviate significantly from normal | Z-score, IQR method |
| Data Contract | Formal schema and quality agreement | DataContract specification |
| Great Expectations | Python data validation framework | expect_* methods |
| Soda Core | Lightweight data quality framework | YAML-based checks |
| Pandera | DataFrame schema validation | Column(dtype, nullable=False) |
| dbt Tests | SQL-based data tests | unique, not_null, accepted_values |
| Anomaly Detection | Identify unusual patterns in data | Isolation Forest, Z-score, IQR |
| Quality Dashboard | Visualize quality metrics over time | Grafana, Looker, custom UI |
| SLA Monitoring | Track data availability and freshness | Airflow, custom monitors |
| Data Profiling | Statistical summary of dataset | ydata-profiling (formerly pandas-profiling) |
| Lineage Tracking | Track data flow from source to target | OpenLineage, Apache Atlas |
- Define quality dimensions: Identify which dimensions matter most for your use case (completeness, accuracy, freshness, etc.).
- Create data contracts: Document schema, semantics, quality guarantees, and SLAs for each dataset.
- Implement pre-load checks: Validate source data before loading. Reject or quarantine records that violate contracts.
- Implement post-load checks: Validate loaded data against quality rules. Compare record counts, null rates, distributions.
- Set up anomaly detection: Monitor metrics over time and alert on statistical deviations.
- Build quality dashboards: Visualize quality metrics, trends, and anomalies for stakeholders.
- Implement alerting: Send alerts (Slack, PagerDuty, email) when quality gates fail or anomalies are detected.
- Track data lineage: Use OpenLineage to track data flow and identify upstream sources of quality issues.
- Automate remediation: For common issues (nulls, duplicates), implement automated fixes. For complex issues, create tickets.
- Review and iterate: Regularly review quality metrics, update thresholds, and add new checks based on data patterns.
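The pre-load step in the list above (reject or quarantine records that violate the contract) can be sketched as a simple split. The function name `split_valid_and_quarantined`, the required fields, and the sample records are hypothetical.

```python
from typing import Iterable, List, Sequence, Tuple


def split_valid_and_quarantined(
    records: Iterable[dict], required_fields: Sequence[str]
) -> Tuple[List[dict], List[dict]]:
    """Separate loadable records from ones missing required fields."""
    valid, quarantined = [], []
    for record in records:
        missing = [f for f in required_fields if record.get(f) is None]
        if missing:
            # Keep the original record plus a reason so it can be replayed later.
            quarantined.append({"record": record, "reason": f"missing fields: {missing}"})
        else:
            valid.append(record)
    return valid, quarantined


source = [
    {"order_id": "ORD-1", "customer_id": "C-9", "amount": 12.5},
    {"order_id": "ORD-2", "customer_id": None, "amount": 40.0},
    {"order_id": None, "customer_id": "C-3", "amount": None},
]

valid, quarantined = split_valid_and_quarantined(source, ["order_id", "customer_id", "amount"])
print(len(valid), "valid;", len(quarantined), "quarantined")
```

Quarantining instead of dropping keeps the completeness accounting honest: loaded plus quarantined records should equal the source count.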
Production Code
Great Expectations Suite
import logging
from datetime import datetime, timedelta, timezone

import great_expectations as gx
import pandas as pd

logger = logging.getLogger(__name__)


class DataQualityValidator:
    """Production data quality validation using Great Expectations.

    Assumes the GX Core 1.x API (context.suites, context.data_sources,
    batch.validate); method names differ in 0.x releases.
    """

    def __init__(self, ge_context_root: str = "great_expectations"):
        # ge_context_root is stored for reference only; get_context() locates
        # or creates a context on its own.
        self.context_root = ge_context_root
        self.context = gx.get_context()
        self.suite_name = "production_quality_suite"

    def create_orders_suite(self) -> gx.ExpectationSuite:
        """Create a comprehensive quality suite for orders data."""
        suite = self.context.suites.add_or_update(
            gx.ExpectationSuite(name=self.suite_name)
        )
        exp = gx.expectations

        # Schema expectations: exact column set plus a type check on amount.
        suite.add_expectation(
            exp.ExpectTableColumnsToMatchSet(
                column_set=["order_id", "customer_id", "amount", "status", "created_at"],
                exact_match=True,
            )
        )
        # "float64" is the pandas dtype name; other backends use their own names.
        suite.add_expectation(
            exp.ExpectColumnValuesToBeOfType(column="amount", type_="float64")
        )

        # Completeness expectations
        for column in ("order_id", "customer_id", "amount"):
            suite.add_expectation(exp.ExpectColumnValuesToNotBeNull(column=column))

        # Uniqueness expectation
        suite.add_expectation(exp.ExpectColumnValuesToBeUnique(column="order_id"))

        # Range expectation
        suite.add_expectation(
            exp.ExpectColumnValuesToBeBetween(
                column="amount",
                min_value=0.01,
                max_value=1000000.0,
            )
        )

        # Set membership expectation
        suite.add_expectation(
            exp.ExpectColumnValuesToBeInSet(
                column="status",
                value_set=["pending", "processing", "completed", "cancelled", "refunded"],
            )
        )

        # Regex pattern expectation
        suite.add_expectation(
            exp.ExpectColumnValuesToMatchRegex(
                column="order_id",
                regex=r"^ORD-\d{8}-\d{6}$",
            )
        )

        # Table-level expectation
        suite.add_expectation(
            exp.ExpectTableRowCountToBeBetween(min_value=1000, max_value=10000000)
        )

        # Freshness expectation: the newest record must be less than 24 hours old.
        # The window is computed at suite-creation time instead of being
        # hard-coded, and assumes created_at holds naive UTC timestamps.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        suite.add_expectation(
            exp.ExpectColumnMaxToBeBetween(
                column="created_at",
                min_value=now - timedelta(hours=24),
                max_value=now,
            )
        )
        return suite

    def validate_batch(self, df: pd.DataFrame) -> dict:
        """
        Validate a batch of data against the quality suite.

        Parameters:
            df (pd.DataFrame): Data to validate

        Returns:
            dict: Validation results with success status, check counts, and failure details

        Example output:
            {
                "success": False,
                "total_checks": 8,
                "passed": 6,
                "failed": 2,
                "failed_expectations": [
                    {"expectation": "expect_column_values_to_not_be_null", "column": "amount", ...},
                    {"expectation": "expect_column_values_to_be_between", "column": "amount", ...}
                ]
            }
        """
        suite = self.context.suites.get(self.suite_name)

        # Register the in-memory DataFrame as a single whole-table batch.
        data_source = self.context.data_sources.add_or_update_pandas(name="pandas_datasource")
        asset = data_source.add_dataframe_asset(name="orders")
        batch_definition = asset.add_batch_definition_whole_dataframe("orders_batch")
        batch = batch_definition.get_batch(batch_parameters={"dataframe": df})

        result = batch.validate(suite)
        success = result.success

        # Extract detailed results
        failed_results = [r for r in result.results if not r.success]
        results = {
            "success": success,
            "total_checks": len(result.results),
            "passed": len(result.results) - len(failed_results),
            "failed": len(failed_results),
            "failed_expectations": [
                {
                    "expectation": str(r.expectation_config.type),
                    "column": r.expectation_config.kwargs.get("column"),
                    "result": r.result,
                }
                for r in failed_results
            ],
        }

        if not success:
            logger.warning(
                "Data quality validation FAILED: %s/%s checks failed",
                results["failed"],
                results["total_checks"],
            )
            for failed in results["failed_expectations"]:
                logger.warning("  FAILED: %s on %s", failed["expectation"], failed["column"])
        return results
Anomaly Detection Pipeline
import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd
from scipy import stats

logger = logging.getLogger(__name__)


@dataclass
class AnomalyResult:
    """Result of an anomaly detection check."""
    metric: str
    value: float
    expected_range: Tuple[float, float]
    z_score: float
    is_anomaly: bool
    severity: str  # low, medium, high, critical


class DataQualityAnomalyDetector:
    """Detect anomalies in data quality metrics using statistical methods."""

    def __init__(self, lookback_days: int = 30, z_threshold: float = 2.5):
        self.lookback_days = lookback_days
        self.z_threshold = z_threshold

    def detect_metric_anomalies(
        self,
        current_metrics: Dict[str, float],
        historical_metrics: pd.DataFrame,
    ) -> List[AnomalyResult]:
        """Detect anomalies by comparing current metrics against historical baseline."""
        results = []
        for metric_name, current_value in current_metrics.items():
            if metric_name not in historical_metrics.columns:
                continue
            historical = historical_metrics[metric_name].dropna()
            if len(historical) < 10:
                logger.warning(f"Insufficient history for {metric_name}")
                continue

            mean = float(historical.mean())
            std = float(historical.std())
            diff = current_value - mean
            if std == 0:
                # Constant history: any change at all is an anomaly.
                z_score = 0.0 if diff == 0 else float(np.sign(diff) * np.inf)
            else:
                z_score = diff / std

            # Calculate expected range
            expected_low = mean - self.z_threshold * std
            expected_high = mean + self.z_threshold * std
            is_anomaly = abs(z_score) > self.z_threshold

            # Determine severity
            if abs(z_score) > 4:
                severity = "critical"
            elif abs(z_score) > 3:
                severity = "high"
            elif abs(z_score) > self.z_threshold:
                severity = "medium"
            else:
                severity = "low"

            results.append(AnomalyResult(
                metric=metric_name,
                value=current_value,
                expected_range=(expected_low, expected_high),
                z_score=z_score,
                is_anomaly=is_anomaly,
                severity=severity,
            ))
        return results

    def detect_distribution_shift(
        self,
        current_data: pd.Series,
        historical_data: pd.Series,
        metric_name: str,
    ) -> AnomalyResult:
        """Detect distribution shift using the two-sample Kolmogorov-Smirnov test."""
        # Drop nulls first so missing values do not distort the test.
        ks_statistic, p_value = stats.ks_2samp(
            historical_data.dropna(), current_data.dropna()
        )
        is_anomaly = bool(p_value < 0.05)  # Significant distribution shift
        return AnomalyResult(
            metric=f"{metric_name}_distribution",
            value=float(ks_statistic),
            expected_range=(0, 0.1),
            # Heuristic scaling for display next to real Z-scores; not a true Z-score.
            z_score=float(ks_statistic) * 10,
            is_anomaly=is_anomaly,
            severity="high" if is_anomaly else "low",
        )

    def detect_trend_anomaly(
        self,
        time_series: pd.Series,
        metric_name: str,
        window: int = 7,
    ) -> List[AnomalyResult]:
        """Detect anomalies in time-series data using rolling statistics."""
        results = []
        # shift(1) keeps the current point out of its own baseline; otherwise
        # a spike inflates the rolling mean and std it is compared against.
        rolling_mean = time_series.rolling(window=window).mean().shift(1)
        rolling_std = time_series.rolling(window=window).std().shift(1)

        for i in range(window, len(time_series)):
            mean_i = rolling_mean.iloc[i]
            std_i = rolling_std.iloc[i]
            if pd.isna(std_i) or std_i == 0:
                continue
            z_score = (time_series.iloc[i] - mean_i) / std_i
            if abs(z_score) > self.z_threshold:
                results.append(AnomalyResult(
                    metric=f"{metric_name}_trend",
                    value=float(time_series.iloc[i]),
                    expected_range=(
                        float(mean_i - self.z_threshold * std_i),
                        float(mean_i + self.z_threshold * std_i),
                    ),
                    z_score=float(z_score),
                    is_anomaly=True,
                    severity="high" if abs(z_score) > 3 else "medium",
                ))
        return results

    def generate_quality_report(
        self,
        df: pd.DataFrame,
        expected_schema: Dict[str, str],
    ) -> Dict:
        """Generate a comprehensive data quality report."""
        total_rows = len(df)
        report = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "total_rows": total_rows,
            "total_columns": len(df.columns),
            "completeness": {},
            "uniqueness": {},
            "validity": {},
            "anomalies": [],
        }

        # Completeness analysis
        for col in df.columns:
            null_count = int(df[col].isnull().sum())
            null_rate = null_count / total_rows if total_rows else 0.0
            report["completeness"][col] = {
                "null_rate": float(null_rate),
                "null_count": null_count,
                "status": "pass" if null_rate < 0.05 else "fail",
            }

        # Uniqueness analysis on identifier columns ("id" or "*_id"); a plain
        # substring match would also catch names such as "paid" or "valid".
        for col in df.columns:
            name = col.lower()
            if name == "id" or name.endswith("_id"):
                duplicate_count = total_rows - int(df[col].nunique(dropna=False))
                duplicate_rate = duplicate_count / total_rows if total_rows else 0.0
                report["uniqueness"][col] = {
                    "duplicate_rate": float(duplicate_rate),
                    "duplicate_count": duplicate_count,
                    "status": "pass" if duplicate_rate < 0.01 else "fail",
                }

        # Type validation against pandas dtype names (e.g. "int64", "object")
        for col, expected_type in expected_schema.items():
            if col in df.columns:
                actual_type = str(df[col].dtype)
                report["validity"][col] = {
                    "expected_type": expected_type,
                    "actual_type": actual_type,
                    "status": "pass" if actual_type == expected_type else "fail",
                }
        return report
Great Expectations vs Soda Core vs Pandera: Great Expectations is the most feature-rich framework with data docs, profiling, and integration with Airflow/dbt. Soda Core is lightweight and YAML-based, ideal for simple checks. Pandera is best for DataFrame schema validation in pandas/polars workflows. For production, Great Expectations provides the most comprehensive quality engineering capabilities.
Data Quality SLAs: Define quality SLAs for each pipeline: (1) Completeness > 99.9%, (2) Freshness < 1 hour, (3) Null rate < 1% for critical fields, (4) Uniqueness = 100% for primary keys, (5) Anomaly score < 2.5 sigma. SLA violations trigger alerts and may halt downstream processing.
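The five SLAs can be encoded as data and evaluated in one pass. The sketch below is illustrative only: the metric names in `SLA_RULES` and the sample measurements are assumptions, and a missing metric is treated as a violation.

```python
import operator
from typing import Dict, List

# (comparison, threshold) per SLA; the metric names are hypothetical.
SLA_RULES = {
    "completeness_pct": (operator.gt, 99.9),
    "freshness_minutes": (operator.lt, 60),
    "critical_null_rate_pct": (operator.lt, 1.0),
    "pk_uniqueness_pct": (operator.eq, 100.0),
    "anomaly_sigma": (operator.lt, 2.5),
}


def sla_violations(metrics: Dict[str, float]) -> List[str]:
    """Return the names of SLAs that are violated or have no measurement."""
    violated = []
    for name, (compare, threshold) in SLA_RULES.items():
        value = metrics.get(name)
        if value is None or not compare(value, threshold):
            violated.append(name)
    return violated


# Hypothetical measurements for one pipeline run.
run_metrics = {
    "completeness_pct": 99.95,
    "freshness_minutes": 75,
    "critical_null_rate_pct": 0.2,
    "pk_uniqueness_pct": 100.0,
    "anomaly_sigma": 1.1,
}

violations = sla_violations(run_metrics)
print("SLA violations:", violations)
```

Keeping the rules in a table like this makes thresholds easy to review and change without touching the evaluation logic.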
- Data quality is multidimensional: completeness, accuracy, consistency, timeliness, validity, uniqueness, freshness.
- Quality gates at pipeline stage boundaries prevent bad data from propagating downstream.
- Great Expectations provides comprehensive data validation with expect_* methods and data docs.
- Anomaly detection uses statistical methods (Z-score, IQR, KS test) to identify deviations from expected patterns.
- Data contracts formalize schema, semantics, quality guarantees, and SLAs between producers and consumers.
- Automated quality checks, anomaly detection, and alerting are essential for production data systems.
- Quality metrics should be visualized on dashboards and tracked over time for trend analysis.
Best Practices
- Implement quality gates at every pipeline stage boundary. Halt the pipeline if quality checks fail.
- Define data contracts with source system owners specifying schema, quality guarantees, and SLAs.
- Use Great Expectations or Soda Core for declarative quality rule definitions and automated validation.
- Monitor anomaly scores over time. Alert when metrics deviate beyond 2.5 standard deviations.
- Track completeness, freshness, and uniqueness as primary quality KPIs for every pipeline.
- Implement pre-load validation to reject malformed source data before it enters the pipeline.
- Build quality dashboards to visualize metrics, trends, and anomalies for stakeholders.
- Automate remediation for common issues (nulls, duplicates). Create tickets for complex issues.
- Use statistical methods (Z-score, KS test) for anomaly detection rather than static thresholds.
- Review quality metrics in weekly data reviews. Update thresholds and add new checks as data patterns evolve.
Data Quality Tool Comparison
| Feature | Great Expectations | Soda Core | Pandera | dbt Tests |
|---|---|---|---|---|
| Language | Python | YAML | Python | SQL |
| Schema Validation | Yes | Yes | Yes | Limited |
| Statistical Checks | Yes | Yes | Yes | No |
| Anomaly Detection | Yes | Limited | Yes | No |
| Data Docs | Yes | No | No | No |
| Airflow Integration | Yes | Yes | Yes | Yes |
| dbt Integration | Yes | Yes | Yes | Native |
| Learning Curve | Medium | Low | Low | Low |
| Best For | Enterprise | Quick checks | DataFrame workflows | Analytics engineering |
Quality Dimension Thresholds
| Dimension | Critical | Warning | Target | Measurement |
|---|---|---|---|---|
| Completeness | < 95% | < 99% | > 99.9% | Null rate per column |
| Uniqueness | < 99% | < 99.9% | 100% | Duplicate rate on PKs |
| Validity | < 95% | < 99% | > 99.9% | Rule violation rate |
| Freshness | > 24h old | > 4h old | < 1h old | Time since last update |
| Consistency | < 90% match | < 99% match | > 99.9% match | Cross-dataset agreement |
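The thresholds in the table can be turned into a small classifier. This is a sketch, not a prescribed scheme: the `THRESHOLDS` layout, the `classify` function, and the `below_target` label for values between the warning and target levels are assumptions, and values exactly on the target boundary are counted as on target.

```python
# (critical, warning, target, higher_is_better), taken from the table above.
THRESHOLDS = {
    "completeness": (95.0, 99.0, 99.9, True),
    "uniqueness": (99.0, 99.9, 100.0, True),
    "validity": (95.0, 99.0, 99.9, True),
    "freshness_hours": (24.0, 4.0, 1.0, False),
    "consistency": (90.0, 99.0, 99.9, True),
}


def classify(dimension: str, value: float) -> str:
    """Map a measured value to critical / warning / below_target / on_target."""
    critical, warning, target, higher_is_better = THRESHOLDS[dimension]
    if not higher_is_better:
        # Negate so the same "higher is better" comparisons apply to freshness.
        value, critical, warning, target = -value, -critical, -warning, -target
    if value < critical:
        return "critical"
    if value < warning:
        return "warning"
    if value < target:
        return "below_target"
    return "on_target"


for dimension, measured in [("completeness", 99.5), ("freshness_hours", 5.0), ("uniqueness", 100.0)]:
    print(dimension, measured, classify(dimension, measured))
```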
See Also
- 026 - Data Pipeline Testing - Testing quality validation logic
- 024 - Data Ingestion Patterns - Quality at ingestion time
- 027 - Pipeline Monitoring and Observability - Monitoring quality metrics
- 016 - ETL vs ELT - Quality in transformation paradigms
- 025 - Data Quality: Validation Frameworks - This lesson