Data Quality: Validation Frameworks and Anomaly Detection

Data Quality: Ensuring Trustworthy Data at Scale

Data quality is the degree to which data satisfies its intended purpose, measured by accuracy, completeness, consistency, timeliness, and validity.

The Data Quality Dimensions


Core Dimensions:

  1. Completeness: are all expected records present?
  2. Accuracy: do values reflect reality?
  3. Consistency: do related datasets agree?
  4. Timeliness: is data available within SLA?
  5. Validity: do values conform to business rules?
  6. Uniqueness: are there duplicate records?
  7. Freshness: how recent is the data?
  8. Schema compliance: does the data match expected types and constraints?

Priority by Use Case:

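| Use Case | Completeness | Accuracy | Consistency | Timeliness | Uniqueness |
| --- | --- | --- | --- | --- | --- |
| Financial Reporting | Critical | Critical | Critical | High | Critical |
| Real-Time Dashboards | High | High | Medium | Critical | High |
| ML Training | High | High | High | Medium | Medium |
| Analytics | High | Medium | High | Medium | High |
| Compliance | Critical | Critical | Critical | High | Critical |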

Key Insight: Poor data quality costs organizations an average of $12.9 million annually (Gartner). Data quality is not a one-time check but a continuous engineering discipline embedded throughout the data lifecycle.

Data Quality Framework

Diagram: Quality Rules (Schema Rules, Statistical, Business Rules); Validation Checks (Pre-Load Checks, Post-Load Checks, Continuous Monitor); Outputs (Pass/Fail Reports, Anomaly Scores, Alert Actions). Great Expectations, Soda Core, and dbt Tests are shown as tooling.

Quality Dimensions

Figure: Six Data Quality Dimensions. Completeness (all expected records present), Accuracy (values reflect reality), Consistency (related datasets agree), Timeliness (available within SLA), Validity (values conform to rules), Uniqueness (no duplicate records).

$\text{DQ Score} = w_1 \cdot \text{Completeness} + w_2 \cdot \text{Accuracy} + w_3 \cdot \text{Consistency} + w_4 \cdot \text{Timeliness} + w_5 \cdot \text{Validity} + w_6 \cdot \text{Uniqueness}$

Data quality is the fitness of data for its intended use in operations, decision-making, and planning. Formally: $DQ(\text{Data}) = \sum_i w_i \cdot \text{score}_i$, where $\text{score}_i$ is the quality score for dimension $i$ (completeness, accuracy, consistency, timeliness, validity, uniqueness) and $w_i$ is the weight assigned to that dimension based on business importance.
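
The weighted score can be sketched in a few lines of Python. This is a minimal illustration, not a standard implementation: the function name `dq_score` and the example scores and weights are hypothetical, and dividing by the weight total is an added assumption so that the result stays in [0, 1] even when the weights do not sum to 1.

```python
from typing import Dict


def dq_score(scores: Dict[str, float], weights: Dict[str, float]) -> float:
    """Weighted average of per-dimension scores, each expected in [0, 1]."""
    if set(scores) != set(weights):
        raise ValueError("scores and weights must cover the same dimensions")
    total_weight = sum(weights.values())
    if total_weight <= 0:
        raise ValueError("weights must sum to a positive number")
    # Assumption: normalize by the weight total so the score stays in [0, 1].
    return sum(weights[d] * scores[d] for d in scores) / total_weight


# Hypothetical example: completeness counts twice as much as the others.
scores = {"completeness": 1.0, "accuracy": 0.5, "uniqueness": 1.0}
weights = {"completeness": 2.0, "accuracy": 1.0, "uniqueness": 1.0}
overall = dq_score(scores, weights)  # (2*1.0 + 1*0.5 + 1*1.0) / 4
```

Keeping the weights in one dictionary per use case makes it easy to apply different priorities to, say, financial reporting and ML training.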

A data contract is a formal agreement between data producers and consumers that specifies the schema, semantics, quality guarantees, and SLAs of a dataset. Data contracts encode: (1) schema (field names, types, constraints), (2) semantics (business meaning, units, allowed values), (3) quality (null rates, uniqueness, freshness), and (4) SLAs (availability, latency, completeness).
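
One way to make a contract concrete is to hold it as a small object that code can check records against. The sketch below is an assumption-laden illustration: the `DataContract` class, its field names, and the `schema_violations` helper are hypothetical and do not follow any particular contract specification.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Set


@dataclass
class DataContract:
    """Hypothetical in-code representation of a data contract."""
    dataset: str
    schema: Dict[str, type]                       # schema: field name -> expected type
    allowed_values: Dict[str, Set[Any]] = field(default_factory=dict)  # semantics
    max_null_rate: float = 0.0                    # quality guarantee
    max_staleness_hours: float = 24.0             # SLA


def schema_violations(contract: DataContract, record: Dict[str, Any]) -> List[str]:
    """Return human-readable schema and semantic violations for one record."""
    problems = []
    for name, expected_type in contract.schema.items():
        if name not in record:
            problems.append(f"missing field: {name}")
        elif record[name] is not None and not isinstance(record[name], expected_type):
            problems.append(f"wrong type for {name}")
    for name, allowed in contract.allowed_values.items():
        if record.get(name) is not None and record[name] not in allowed:
            problems.append(f"value not allowed for {name}")
    return problems


orders_contract = DataContract(
    dataset="orders",
    schema={"order_id": str, "amount": float, "status": str},
    allowed_values={"status": {"pending", "completed", "cancelled"}},
)
good = {"order_id": "ORD-1", "amount": 19.99, "status": "pending"}
bad = {"order_id": "ORD-2", "amount": "19.99", "status": "shipped"}
good_problems = schema_violations(orders_contract, good)
bad_problems = schema_violations(orders_contract, bad)
```

The null-rate and staleness fields are carried on the contract but not checked here, because they apply to a whole batch rather than to a single record.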

Anomaly detection in data quality identifies data points, patterns, or distributions that deviate significantly from expected behavior. Methods include: (1) Statistical: Z-score, IQR (Interquartile Range); (2) Time-series: seasonal decomposition, ARIMA residuals; (3) ML-based: isolation forests, autoencoders; (4) Rule-based: thresholds, business rules.

Z-Score Anomaly Detection

For a data point $x$ in a distribution with mean $\mu$ and standard deviation $\sigma$, the Z-score is $Z = (x - \mu) / \sigma$. A data point is considered anomalous if $|Z|$ exceeds a threshold, typically between 2 and 3. For multivariate data, the Mahalanobis distance is used instead: $D = \sqrt{(x - \mu)^T \Sigma^{-1} (x - \mu)}$, where $\Sigma$ is the covariance matrix. Points with $D$ above a threshold are anomalous.
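
The univariate Z-score rule can be sketched with the standard library alone. The function name `z_score_outliers` and the sample values are hypothetical, and the sketch assumes the baseline mean and standard deviation come from a separate history so that the candidate value does not influence its own baseline. The multivariate Mahalanobis case is not covered.

```python
import statistics
from typing import List


def z_score_outliers(
    history: List[float], candidates: List[float], threshold: float = 3.0
) -> List[float]:
    """Return the candidates whose |Z| against the history exceeds the threshold."""
    mean = statistics.fmean(history)
    std = statistics.stdev(history)  # sample standard deviation
    if std == 0:
        # Constant history: anything different from the mean is unusual.
        return [x for x in candidates if x != mean]
    return [x for x in candidates if abs((x - mean) / std) > threshold]


# Hypothetical daily row counts (in thousands): mean 100, sample std 2.
history = [100.0, 102.0, 98.0, 101.0, 99.0, 100.0, 103.0, 97.0]
flagged = z_score_outliers(history, [101.0, 130.0])
```

Here 101.0 has Z = 0.5 and passes, while 130.0 has Z = 15 and is flagged.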

IQR Method

The Interquartile Range (IQR) method flags as outliers any points below $Q_1 - k \cdot IQR$ or above $Q_3 + k \cdot IQR$, where $IQR = Q_3 - Q_1$ and $k$ is typically 1.5 (mild outliers) or 3.0 (extreme outliers). For normally distributed data, $IQR \approx 1.349\sigma$, so the $k = 1.5$ fences sit at about $\pm 2.7\sigma$, and the expected number of false positives in a dataset of size $N$ is $FP = N \cdot P(|Z| > 2.7) \approx 0.007 N$.
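
A minimal standard-library sketch of the IQR rule follows. The helper names and the sample data are hypothetical, and the quartiles use the `inclusive` method of `statistics.quantiles`; other quartile conventions give slightly different fences. The last few lines check the false-positive rate for normally distributed data, where the k = 1.5 fences sit near ±2.7 standard deviations.

```python
import statistics
from typing import List, Tuple


def iqr_fences(values: List[float], k: float = 1.5) -> Tuple[float, float]:
    """Return (lower, upper) outlier fences: Q1 - k*IQR and Q3 + k*IQR."""
    q1, _, q3 = statistics.quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr


def iqr_outliers(values: List[float], k: float = 1.5) -> List[float]:
    """Return the values that fall outside the fences."""
    low, high = iqr_fences(values, k)
    return [v for v in values if v < low or v > high]


data = [10, 12, 11, 13, 12, 11, 100]
outliers = iqr_outliers(data)

# False-positive rate for a standard normal distribution with k = 1.5:
# Q3 is at about 0.674 sigma, IQR is twice that, so the fence is at 4 * Q3.
normal = statistics.NormalDist()
q3_z = normal.inv_cdf(0.75)
upper_fence_z = q3_z + 1.5 * (2 * q3_z)
false_positive_rate = 2 * (1 - normal.cdf(upper_fence_z))  # close to 0.007
```

Unlike the Z-score, the fences are built from quartiles, so a single extreme value such as 100 barely moves them.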

Completeness Score

Completeness = (Total_expected_records - Missing_records) / Total_expected_records * 100%. For a pipeline with N source records and M loaded records: Completeness = M / N * 100%. Target: Completeness > 99.9% for critical pipelines. Acceptable loss: Completeness_loss < 0.01% per pipeline stage.
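
As a small worked example, the completeness formula and the 99.9% target can be written as follows. The function names and record counts are hypothetical.

```python
def completeness_pct(source_count: int, loaded_count: int) -> float:
    """Completeness = M / N * 100 for N source records and M loaded records."""
    if source_count <= 0:
        raise ValueError("source_count must be positive")
    return loaded_count / source_count * 100.0


def meets_target(source_count: int, loaded_count: int, target_pct: float = 99.9) -> bool:
    """True when completeness is strictly above the target percentage."""
    return completeness_pct(source_count, loaded_count) > target_pct


# 500 records lost out of 1,000,000 is 99.95% complete and passes.
passes = meets_target(1_000_000, 999_500)
# 10 records lost out of 1,000 is 99.0% complete and fails.
fails = meets_target(1_000, 990)
```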

Every data pipeline must have quality gates at stage boundaries. A quality gate is a set of checks that must pass before data flows to the next stage. If a gate fails, the pipeline must halt, alert, and provide diagnostic information. Formally: Pipeline_output = Validated(Pipeline_input) if all gates pass; Pipeline_output = ∅ (halt) if any gate fails. This prevents downstream systems from consuming bad data.
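
A quality gate can be sketched as a function that either returns the data unchanged or raises with the names of the failed checks. The names `QualityGateError` and `run_gate` and the two example checks are hypothetical; a real pipeline would also send an alert where the exception is caught.

```python
from typing import Callable, Dict, List

Rows = List[dict]
Check = Callable[[Rows], bool]


class QualityGateError(Exception):
    """Raised when at least one check in a gate fails."""

    def __init__(self, failures: List[str]):
        super().__init__(f"quality gate failed: {', '.join(failures)}")
        self.failures = failures  # diagnostic information for alerts


def run_gate(rows: Rows, checks: Dict[str, Check]) -> Rows:
    """Return rows unchanged if every check passes; otherwise halt by raising."""
    failures = [name for name, check in checks.items() if not check(rows)]
    if failures:
        raise QualityGateError(failures)
    return rows


checks: Dict[str, Check] = {
    "non_empty": lambda rows: len(rows) > 0,
    "amount_present": lambda rows: all(r.get("amount") is not None for r in rows),
}
good_rows = [{"order_id": "ORD-1", "amount": 10.0}]
bad_rows = [{"order_id": "ORD-2", "amount": None}]
validated = run_gate(good_rows, checks)  # passes through unchanged
```

Raising an exception rather than returning a flag means a caller cannot accidentally ignore a failed gate and pass bad data downstream.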

Data contracts are enforced at three points: (1) at ingestion, reject data that violates schema contracts; (2) at transformation, validate output against quality contracts; (3) at serving, verify freshness and completeness before serving to consumers. If any enforcement point detects a violation, the contract is breached and the pipeline must take corrective action.

Key Concepts

| Concept | Description | Tool/Framework |
| --- | --- | --- |
| Schema Validation | Verify field names, types, and constraints | Great Expectations, Pandera |
| Null Rate Check | Ensure null percentage is below threshold | Great Expectations: expect_column_values_to_not_be_null |
| Uniqueness Check | Verify no duplicate records | expect_column_values_to_be_unique |
| Range Check | Values fall within expected bounds | expect_column_values_to_be_between |
| Freshness Check | Data arrives within SLA | Custom checks, Airflow sensors |
| Referential Integrity | Foreign keys reference valid records | expect_column_values_to_be_in_set |
| Distribution Check | Data distribution matches expected | KS test, Chi-squared test |
| Statistical Outlier | Values deviate significantly from normal | Z-score, IQR method |
| Data Contract | Formal schema and quality agreement | DataContract specification |
| Great Expectations | Python data validation framework | expect_* methods |
| Soda Core | Lightweight data quality framework | YAML-based checks |
| Pandera | DataFrame schema validation | Column(dtype, nullable=False) |
| dbt Tests | SQL-based data tests | unique, not_null, accepted_values |
| Anomaly Detection | Identify unusual patterns in data | Isolation Forest, Z-score, IQR |
| Quality Dashboard | Visualize quality metrics over time | Grafana, Looker, custom UI |
| SLA Monitoring | Track data availability and freshness | Airflow, custom monitors |
| Data Profiling | Statistical summary of dataset | ydata-profiling (formerly pandas-profiling) |
| Lineage Tracking | Track data flow from source to target | OpenLineage, Apache Atlas |

  1. Define quality dimensions: Identify which dimensions matter most for your use case (completeness, accuracy, freshness, etc.).
  2. Create data contracts: Document schema, semantics, quality guarantees, and SLAs for each dataset.
  3. Implement pre-load checks: Validate source data before loading. Reject or quarantine records that violate contracts.
  4. Implement post-load checks: Validate loaded data against quality rules. Compare record counts, null rates, distributions.
  5. Set up anomaly detection: Monitor metrics over time and alert on statistical deviations.
  6. Build quality dashboards: Visualize quality metrics, trends, and anomalies for stakeholders.
  7. Implement alerting: Send alerts (Slack, PagerDuty, email) when quality gates fail or anomalies are detected.
  8. Track data lineage: Use OpenLineage to track data flow and identify upstream sources of quality issues.
  9. Automate remediation: For common issues (nulls, duplicates), implement automated fixes. For complex issues, create tickets.
  10. Review and iterate: Regularly review quality metrics, update thresholds, and add new checks based on data patterns.
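
Step 3 above (reject or quarantine records that violate contracts) can be sketched as a simple split. The function name, the rule names, and the sample records are hypothetical; the quarantined output keeps the violated rule names so that later remediation or ticketing has something to work with.

```python
from typing import Callable, Dict, List, Tuple

Record = Dict[str, object]
Rule = Callable[[Record], bool]


def split_valid_and_quarantine(
    records: List[Record], rules: Dict[str, Rule]
) -> Tuple[List[Record], List[dict]]:
    """Route each record to the valid list or to quarantine with its violations."""
    valid, quarantined = [], []
    for record in records:
        broken = [name for name, rule in rules.items() if not rule(record)]
        if broken:
            quarantined.append({"record": record, "violations": broken})
        else:
            valid.append(record)
    return valid, quarantined


rules: Dict[str, Rule] = {
    "order_id_present": lambda r: bool(r.get("order_id")),
    "amount_positive": lambda r: isinstance(r.get("amount"), (int, float))
    and r["amount"] > 0,
}
records = [
    {"order_id": "ORD-1", "amount": 25.0},
    {"order_id": "", "amount": 10.0},
    {"order_id": "ORD-3", "amount": -5.0},
]
valid, quarantined = split_valid_and_quarantine(records, rules)
```

Only the first record is loaded; the other two land in quarantine tagged with `order_id_present` and `amount_positive` respectively.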

Production Code

Great Expectations Suite

import great_expectations as gx
from great_expectations.core import ExpectationSuite
import pandas as pd
import logging

logger = logging.getLogger(__name__)


class DataQualityValidator:
    """Production data quality validation using Great Expectations."""

    def __init__(self, ge_context_root: str = "great_expectations"):
        self.context = gx.get_context()
        self.suite_name = "production_quality_suite"

    def create_orders_suite(self) -> ExpectationSuite:
        """Create a comprehensive quality suite for orders data."""
        # Great Expectations 1.x registers suites through context.suites
        suite = self.context.suites.add_or_update(
            ExpectationSuite(name=self.suite_name)
        )

        # Schema expectations: the table must contain exactly these columns
        suite.add_expectation(
            gx.expectations.ExpectTableColumnsToMatchSet(
                column_set=[
                    "order_id",
                    "customer_id",
                    "amount",
                    "status",
                    "created_at",
                ],
                exact_match=True,
            )
        )
        # Type names are backend-specific; "float64" assumes a pandas DataFrame
        suite.add_expectation(
            gx.expectations.ExpectColumnValuesToBeOfType(
                column="amount", type_="float64"
            )
        )

        # Completeness expectations
        suite.add_expectation(
            gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
        )
        suite.add_expectation(
            gx.expectations.ExpectColumnValuesToNotBeNull(column="customer_id")
        )
        suite.add_expectation(
            gx.expectations.ExpectColumnValuesToNotBeNull(column="amount")
        )

        # Uniqueness expectation
        suite.add_expectation(
            gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
        )

        # Range expectations
        suite.add_expectation(
            gx.expectations.ExpectColumnValuesToBeBetween(
                column="amount",
                min_value=0.01,
                max_value=1000000.0,
            )
        )

        # Set membership expectation
        suite.add_expectation(
            gx.expectations.ExpectColumnValuesToBeInSet(
                column="status",
                value_set=["pending", "processing", "completed", "cancelled", "refunded"],
            )
        )

        # Regex pattern expectation
        suite.add_expectation(
            gx.expectations.ExpectColumnValuesToMatchRegex(
                column="order_id",
                regex=r"^ORD-\d{8}-\d{6}$",
            )
        )

        # Table-level expectations
        suite.add_expectation(
            gx.expectations.ExpectTableRowCountToBeBetween(
                min_value=1000,
                max_value=10000000,
            )
        )

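        # Freshness expectation: the newest created_at must fall within the last 24 hours.
        # The bounds are computed when the suite is built, so rebuild the suite on each run.
        # Assumes created_at is a timezone-naive datetime column in local time.
        now = pd.Timestamp.now()
        suite.add_expectation(
            gx.expectations.ExpectColumnMaxToBeBetween(
                column="created_at",
                min_value=now - pd.Timedelta(hours=24),
                max_value=now,
            )
        )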

        return suite

    def validate_batch(self, df: pd.DataFrame) -> dict:
        """
        Validate a batch of data against the quality suite.
        
        Parameters:
            df (pd.DataFrame): Data to validate
            
        Returns:
            dict: Validation results with success status, check counts, and failure details
            
        Example output:
            {
                "success": False,
                "total_checks": 8,
                "passed": 6,
                "failed": 2,
                "failed_expectations": [
                    {"expectation": "expect_column_values_to_not_be_null", "column": "amount", ...},
                    {"expectation": "expect_column_values_to_be_between", "column": "amount", ...}
                ]
            }
        """
        # Register the DataFrame as an in-memory batch (Great Expectations 1.x API).
        # The asset and batch definition names are illustrative.
        data_source = self.context.data_sources.add_or_update_pandas(
            name="pandas_datasource"
        )
        data_asset = data_source.add_dataframe_asset(name="orders_dataframe")
        batch_definition = data_asset.add_batch_definition_whole_dataframe(
            "orders_batch"
        )
        batch = batch_definition.get_batch(batch_parameters={"dataframe": df})

        # Validate the batch directly against the stored suite
        suite = self.context.suites.get(self.suite_name)
        result = batch.validate(suite)
        success = result.success

        # Extract detailed results
        results = {
            "success": success,
            "total_checks": len(result.results),
            "passed": sum(1 for r in result.results if r.success),
            "failed": sum(1 for r in result.results if not r.success),
            "failed_expectations": [
                {
                    "expectation": str(r.expectation_config.type),
                    "column": r.expectation_config.kwargs.get("column"),
                    "result": r.result,
                }
                for r in result.results
                if not r.success
            ],
        }

        if not success:
            logger.warning(
                f"Data quality validation FAILED: "
                f"{results['failed']}/{results['total_checks']} checks failed"
            )
            for failed in results["failed_expectations"]:
                logger.warning(f"  FAILED: {failed['expectation']} on {failed['column']}")

        return results

Anomaly Detection Pipeline

import numpy as np
import pandas as pd
from scipy import stats
from typing import Dict, List, Tuple
from dataclasses import dataclass
from datetime import datetime, timedelta
import logging

logger = logging.getLogger(__name__)


@dataclass
class AnomalyResult:
    """Result of an anomaly detection check."""
    metric: str
    value: float
    expected_range: Tuple[float, float]
    z_score: float
    is_anomaly: bool
    severity: str  # low, medium, high, critical


class DataQualityAnomalyDetector:
    """Detect anomalies in data quality metrics using statistical methods."""

    def __init__(self, lookback_days: int = 30, z_threshold: float = 2.5):
        self.lookback_days = lookback_days
        self.z_threshold = z_threshold

    def detect_metric_anomalies(
        self,
        current_metrics: Dict[str, float],
        historical_metrics: pd.DataFrame,
    ) -> List[AnomalyResult]:
        """Detect anomalies by comparing current metrics against historical baseline."""
        results = []

        for metric_name, current_value in current_metrics.items():
            if metric_name not in historical_metrics.columns:
                continue

            historical = historical_metrics[metric_name].dropna()
            if len(historical) < 10:
                logger.warning(f"Insufficient history for {metric_name}")
                continue

            mean = historical.mean()
            std = historical.std()

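            if std == 0:
                # Constant history: any deviation at all is anomalous
                if current_value == mean:
                    z_score = 0.0
                else:
                    z_score = float("inf") if current_value > mean else float("-inf")
            else:
                z_score = (current_value - mean) / std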

            # Calculate expected range
            expected_low = mean - self.z_threshold * std
            expected_high = mean + self.z_threshold * std

            is_anomaly = abs(z_score) > self.z_threshold

            # Determine severity
            if abs(z_score) > 4:
                severity = "critical"
            elif abs(z_score) > 3:
                severity = "high"
            elif abs(z_score) > self.z_threshold:
                severity = "medium"
            else:
                severity = "low"

            results.append(AnomalyResult(
                metric=metric_name,
                value=current_value,
                expected_range=(expected_low, expected_high),
                z_score=z_score,
                is_anomaly=is_anomaly,
                severity=severity,
            ))

        return results

    def detect_distribution_shift(
        self,
        current_data: pd.Series,
        historical_data: pd.Series,
        metric_name: str,
    ) -> AnomalyResult:
        """Detect distribution shift using Kolmogorov-Smirnov test."""
        # Two-sample KS test; drop missing values so NaNs do not propagate into the result
        ks_statistic, p_value = stats.ks_2samp(
            historical_data.dropna(), current_data.dropna()
        )

        is_anomaly = p_value < 0.05  # Significant distribution shift

        return AnomalyResult(
            metric=f"{metric_name}_distribution",
            value=ks_statistic,
            expected_range=(0, 0.1),
            z_score=ks_statistic * 10,  # Scale for consistency
            is_anomaly=is_anomaly,
            severity="high" if is_anomaly else "low",
        )

    def detect_trend_anomaly(
        self,
        time_series: pd.Series,
        metric_name: str,
        window: int = 7,
    ) -> List[AnomalyResult]:
        """Detect anomalies in time-series data using rolling statistics."""
        results = []

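        # Baseline statistics come from the `window` points before each observation.
        # shift(1) keeps the point under test out of its own mean and std; otherwise
        # |z| is capped at (window - 1) / sqrt(window), about 2.27 for window=7.
        rolling_mean = time_series.rolling(window=window).mean().shift(1)
        rolling_std = time_series.rolling(window=window).std().shift(1)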

        for i in range(window, len(time_series)):
            if pd.isna(rolling_std.iloc[i]) or rolling_std.iloc[i] == 0:
                continue

            z_score = (time_series.iloc[i] - rolling_mean.iloc[i]) / rolling_std.iloc[i]
            is_anomaly = abs(z_score) > self.z_threshold

            if is_anomaly:
                results.append(AnomalyResult(
                    metric=f"{metric_name}_trend",
                    value=float(time_series.iloc[i]),
                    expected_range=(
                        float(rolling_mean.iloc[i] - self.z_threshold * rolling_std.iloc[i]),
                        float(rolling_mean.iloc[i] + self.z_threshold * rolling_std.iloc[i]),
                    ),
                    z_score=float(z_score),
                    is_anomaly=True,
                    severity="high" if abs(z_score) > 3 else "medium",
                ))

        return results

    def generate_quality_report(
        self,
        df: pd.DataFrame,
        expected_schema: Dict[str, str],
    ) -> Dict:
        """Generate a comprehensive data quality report."""
        report = {
            # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp
            "timestamp": pd.Timestamp.now(tz="UTC").isoformat(),
            "total_rows": len(df),
            "total_columns": len(df.columns),
            "completeness": {},
            "uniqueness": {},
            "validity": {},
            "anomalies": [],
        }

        # Completeness analysis
        for col in df.columns:
            null_rate = df[col].isnull().mean()
            report["completeness"][col] = {
                "null_rate": float(null_rate),
                "null_count": int(df[col].isnull().sum()),
                "status": "pass" if null_rate < 0.05 else "fail",
            }

        # Uniqueness analysis
        for col in df.columns:
            if "id" in col.lower():
                duplicate_rate = 1 - (len(df[col].unique()) / len(df))
                report["uniqueness"][col] = {
                    "duplicate_rate": float(duplicate_rate),
                    "duplicate_count": int(len(df) - len(df[col].unique())),
                    "status": "pass" if duplicate_rate < 0.01 else "fail",
                }

        # Type validation
        for col, expected_type in expected_schema.items():
            if col in df.columns:
                actual_type = str(df[col].dtype)
                report["validity"][col] = {
                    "expected_type": expected_type,
                    "actual_type": actual_type,
                    "status": "pass" if actual_type == expected_type else "fail",
                }

        return report

Great Expectations vs Soda Core vs Pandera: Great Expectations is the most feature-rich framework with data docs, profiling, and integration with Airflow/dbt. Soda Core is lightweight and YAML-based, ideal for simple checks. Pandera is best for DataFrame schema validation in pandas/polars workflows. For production, Great Expectations provides the most comprehensive quality engineering capabilities.

Data Quality SLAs: Define quality SLAs for each pipeline: (1) Completeness > 99.9%, (2) Freshness < 1 hour, (3) Null rate < 1% for critical fields, (4) Uniqueness = 100% for primary keys, (5) Anomaly score < 2.5 sigma. SLA violations trigger alerts and may halt downstream processing.
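
The five SLAs above can be encoded as a small rule table and evaluated against a pipeline's current metrics. This is a sketch under assumptions: the metric names and the `sla_violations` helper are hypothetical, and a metric that was not reported is treated as a violation, on the reasoning that an unmeasured SLA cannot be considered met.

```python
from typing import Callable, Dict, List

# Thresholds follow the SLA list above; the metric names are illustrative.
SLA_RULES: Dict[str, Callable[[float], bool]] = {
    "completeness_pct": lambda v: v > 99.9,
    "freshness_hours": lambda v: v < 1.0,
    "critical_null_rate_pct": lambda v: v < 1.0,
    "pk_uniqueness_pct": lambda v: v == 100.0,
    "anomaly_sigma": lambda v: abs(v) < 2.5,
}


def sla_violations(metrics: Dict[str, float]) -> List[str]:
    """Return the names of SLAs that are violated or have no reported metric."""
    return [
        name
        for name, is_met in SLA_RULES.items()
        if name not in metrics or not is_met(metrics[name])
    ]


healthy = {
    "completeness_pct": 99.97,
    "freshness_hours": 0.5,
    "critical_null_rate_pct": 0.2,
    "pk_uniqueness_pct": 100.0,
    "anomaly_sigma": 1.1,
}
# Same pipeline, but the data is three hours old and one metric is 3.1 sigma out.
degraded = dict(healthy, freshness_hours=3.0, anomaly_sigma=-3.1)

healthy_result = sla_violations(healthy)
degraded_result = sla_violations(degraded)
```

A non-empty result is the signal to alert and, for critical pipelines, to halt downstream processing.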

  • Data quality is multidimensional: completeness, accuracy, consistency, timeliness, validity, uniqueness, freshness.
  • Quality gates at pipeline stage boundaries prevent bad data from propagating downstream.
  • Great Expectations provides comprehensive data validation with expect_* methods and data docs.
  • Anomaly detection uses statistical methods (Z-score, IQR, KS test) to identify deviations from expected patterns.
  • Data contracts formalize schema, semantics, quality guarantees, and SLAs between producers and consumers.
  • Automated quality checks, anomaly detection, and alerting are essential for production data systems.
  • Quality metrics should be visualized on dashboards and tracked over time for trend analysis.

Best Practices

  1. Implement quality gates at every pipeline stage boundary. Halt the pipeline if quality checks fail.
  2. Define data contracts with source system owners specifying schema, quality guarantees, and SLAs.
  3. Use Great Expectations or Soda Core for declarative quality rule definitions and automated validation.
  4. Monitor anomaly scores over time. Alert when metrics deviate beyond 2.5 standard deviations.
  5. Track completeness, freshness, and uniqueness as primary quality KPIs for every pipeline.
  6. Implement pre-load validation to reject malformed source data before it enters the pipeline.
  7. Build quality dashboards to visualize metrics, trends, and anomalies for stakeholders.
  8. Automate remediation for common issues (nulls, duplicates). Create tickets for complex issues.
  9. Use statistical methods (Z-score, KS test) for anomaly detection rather than static thresholds.
  10. Review quality metrics in weekly data reviews. Update thresholds and add new checks as data patterns evolve.

Data Quality Tool Comparison

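| Feature | Great Expectations | Soda Core | Pandera | dbt Tests |
| --- | --- | --- | --- | --- |
| Language | Python | YAML | Python | SQL |
| Schema Validation | Yes | Yes | Yes | Limited |
| Statistical Checks | Yes | Yes | Yes | No |
| Anomaly Detection | Yes | Limited | Yes | No |
| Data Docs | Yes | No | No | No |
| Airflow Integration | Yes | Yes | Yes | Yes |
| dbt Integration | Yes | Yes | Yes | Native |
| Learning Curve | Medium | Low | Low | Low |
| Best For | Enterprise | Quick checks | DataFrame workflows | Analytics engineering |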

Quality Dimension Thresholds

| Dimension | Critical | Warning | Target | Measurement |
| --- | --- | --- | --- | --- |
| Completeness | < 95% | < 99% | > 99.9% | Null rate per column |
| Uniqueness | < 99% | < 99.9% | 100% | Duplicate rate on PKs |
| Validity | < 95% | < 99% | > 99.9% | Rule violation rate |
| Freshness | > 24h old | > 4h old | < 1h old | Time since last update |
| Consistency | < 90% match | < 99% match | > 99.9% | Cross-dataset agreement |
