
Data Pipeline Testing: Unit, Integration, Contract, and Regression


Data Pipeline Testing: Building Confidence in Your Data

Data pipeline testing is the systematic verification that pipeline logic, data quality, and system behavior meet specifications.

Why Data Testing Is Hard


Unique Challenges:

  1. Non-deterministic inputs: data arrives with unpredictable values and volumes
  2. Stateful transformations: outputs depend on historical state
  3. Distributed execution: failures occur across nodes
  4. Schema evolution: source schemas change without notice
  5. Volume: testing with production-scale data is expensive

Testing Strategy Must:

  • Address all these challenges
  • Remain fast enough for CI/CD pipelines

Key Insight: Unlike traditional software testing, data testing must verify not only code correctness but also data correctness.

Testing Pyramid

Figure: Testing pyramid. From base to top: unit tests (about 80% of tests; fastest, cheapest), integration tests (about 15%; medium speed), contract tests (schema validation), and E2E/acceptance tests (about 5%; slowest).

Test Coverage Across Pipeline

Figure: Test coverage across pipeline stages. Extract: unit + contract tests. Validate: unit + integration tests. Transform: unit + regression tests. Load: integration + E2E tests. Targets: 95% unit, 80% branch, 100% critical path.

Test Types

A unit test in data pipeline testing verifies a single transformation function or operator in isolation, using controlled input data and asserting expected output. Unit tests do not require external dependencies (databases, APIs, file systems). They are the fastest and cheapest tests, forming the foundation of the testing pyramid. Each transformation function should have unit tests covering: normal cases, edge cases, empty inputs, null values, and error conditions.

An integration test verifies that multiple pipeline stages work correctly together, including interactions with external systems (databases, message queues, file systems). Integration tests use test doubles (mocks, stubs, fakes) for external dependencies or spin up lightweight test instances (Testcontainers). They verify data flow across stage boundaries and end-to-end pipeline behavior.
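To make the test-double vocabulary concrete, here is a minimal sketch using the standard library's `unittest.mock`. The pipeline function `extract_and_load`, the `fetch_rows`/`write` interfaces, and the `InMemorySink` class are hypothetical names invented for illustration. A `Mock` stands in for the external source (stub-style canned return value, then a mock-style interaction check), while `InMemorySink` is a fake: a working but simplified implementation of the sink.

```python
from unittest.mock import Mock


def extract_and_load(source, sink) -> int:
    """Hypothetical two-stage pipeline: fetch rows, drop invalid ones, write the rest."""
    rows = source.fetch_rows()
    valid = [r for r in rows if r.get("amount") is not None and r["amount"] > 0]
    sink.write(valid)
    return len(valid)


class InMemorySink:
    """Fake: behaves like a table but just keeps rows in a Python list."""

    def __init__(self):
        self.rows = []

    def write(self, rows):
        self.rows.extend(rows)


# Stub the external source with canned rows instead of calling a real system
source = Mock()
source.fetch_rows.return_value = [
    {"order_id": "ORD-001", "amount": 99.99},
    {"order_id": "ORD-002", "amount": None},  # dropped: null amount
    {"order_id": "ORD-003", "amount": 29.99},
]
sink = InMemorySink()

written = extract_and_load(source, sink)
# Mock-style check: verify the interaction with the dependency happened once
source.fetch_rows.assert_called_once_with()
```

The same test can later be promoted to a Testcontainers-backed integration test by swapping `InMemorySink` for a class that writes to a real database.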

A contract test verifies that the schema, data types, and quality guarantees of a dataset match the agreed-upon contract between producers and consumers. Contract tests prevent schema evolution from breaking downstream consumers. They verify: (1) field names and types, (2) nullability constraints, (3) value ranges, (4) uniqueness constraints, (5) referential integrity.
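Referential integrity (5) is the check most easily forgotten. A minimal pandas sketch, assuming the child dataset (orders) and the parent dataset (customers) are both available as DataFrames; `orphaned_keys` is a hypothetical helper, not part of pandas or any testing library:

```python
import pandas as pd


def orphaned_keys(child: pd.DataFrame, parent: pd.DataFrame, fk: str, pk: str) -> list:
    """Return distinct non-null foreign-key values in `child` with no match in `parent`."""
    keys = child[fk].dropna()  # nulls are a nullability concern, checked separately
    orphans = keys[~keys.isin(parent[pk])]
    return sorted(orphans.unique().tolist())


orders = pd.DataFrame({
    "order_id": ["ORD-001", "ORD-002", "ORD-003"],
    "customer_id": ["C001", "C002", "C404"],
})
customers = pd.DataFrame({"customer_id": ["C001", "C002"]})

violations = orphaned_keys(orders, customers, fk="customer_id", pk="customer_id")
```

In a contract test this becomes `assert orphaned_keys(...) == []`; returning the offending keys rather than a boolean makes the failure message point directly at the bad records.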

A regression test verifies that pipeline output has not changed unexpectedly after code modifications. Regression tests compare current output against a known-good baseline (golden dataset). They detect unintended changes in transformation logic, configuration changes, or data drift. Regression tests are the safety net for deployment confidence.

Test Coverage Metric

Pipeline test coverage = (Number of tested transformations / Total transformations) * 100%. For a pipeline with N transformation functions, each with T test cases: Total_test_cases = N * T_avg. Target coverage: > 90% line coverage, > 80% branch coverage, 100% coverage for critical path transformations.
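The arithmetic above fits in a few lines of Python. The function names are hypothetical, and `meets_targets` simply hard-codes the thresholds stated in the text. Note that pipeline test coverage (share of transformations with tests) and line/branch coverage (as reported by a tool such as coverage.py) are different measurements, so they are kept separate here.

```python
def pipeline_test_coverage(tested: int, total: int) -> float:
    """Percentage of transformation functions that have at least one test."""
    if total <= 0:
        raise ValueError("pipeline has no transformations")
    return 100 * tested / total


def total_test_cases(cases_per_transformation: list) -> int:
    """Total test cases across N transformations, i.e. N * T_avg."""
    return sum(cases_per_transformation)


def meets_targets(line_pct: float, branch_pct: float, critical_path_pct: float) -> bool:
    """Targets from the text: > 90% line, > 80% branch, 100% critical path."""
    return line_pct > 90 and branch_pct > 80 and critical_path_pct == 100
```

For example, a pipeline with 18 tested transformations out of 20 has a pipeline test coverage of exactly 90.0%.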

Test Execution Time Budget

For a CI/CD pipeline with total budget B minutes and N test categories: B >= T_unit + T_integration + T_contract + T_regression. Typical allocation: T_unit < 2 min (50% of tests, fast execution), T_integration < 5 min (30% of tests), T_contract < 1 min (10% of tests), T_regression < 2 min (10% of tests). Total: B ≈ 10 min.
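A sketch of a budget check, assuming per-category durations in minutes are already collected from CI job timings (the collection step is not shown, and `check_time_budget` is a hypothetical helper). It applies the strict per-category ceilings from the typical allocation and the total constraint B >= sum of category times.

```python
BUDGET_MIN = 10.0
# Per-category ceilings from the typical allocation (minutes)
CATEGORY_LIMITS_MIN = {"unit": 2.0, "integration": 5.0, "contract": 1.0, "regression": 2.0}


def check_time_budget(measured: dict, budget: float = BUDGET_MIN) -> list:
    """Return a list of problems; an empty list means the suite fits the budget."""
    problems = [
        f"{cat}: {t:.1f} min exceeds {CATEGORY_LIMITS_MIN[cat]:.1f} min"
        for cat, t in measured.items()
        if t >= CATEGORY_LIMITS_MIN[cat]  # the text uses strict "<" per category
    ]
    total = sum(measured.values())
    if total > budget:  # B >= T_unit + T_integration + T_contract + T_regression
        problems.append(f"total {total:.1f} min exceeds budget {budget:.1f} min")
    return problems
```

Returning messages instead of raising lets a CI step print every overrun at once before failing the job.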

The testing pyramid invariant states: Count(unit_tests) > Count(integration_tests) > Count(e2e_tests). Formally: N_unit / N_total > 0.5, N_integration / N_total ∈ [0.2, 0.4], N_e2e / N_total < 0.1. Violating this invariant leads to slow, brittle test suites that are expensive to maintain and slow to execute.
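The invariant can be checked mechanically, for example from test counts per marker. This is a minimal sketch; `check_pyramid` is a hypothetical helper, and `n_other` is an assumption that lets contract and regression tests count toward N_total without belonging to any pyramid layer.

```python
def check_pyramid(n_unit: int, n_integration: int, n_e2e: int, n_other: int = 0) -> dict:
    """Evaluate each pyramid condition separately so a report can show which one failed."""
    n_total = n_unit + n_integration + n_e2e + n_other
    if n_total == 0:
        raise ValueError("empty test suite")
    return {
        "ordering": n_unit > n_integration > n_e2e,
        "unit_share": n_unit / n_total > 0.5,
        "integration_share": 0.2 <= n_integration / n_total <= 0.4,
        "e2e_share": n_e2e / n_total < 0.1,
    }
```

A suite with 140 unit, 50 integration, and 10 E2E tests (70%, 25%, 5%) passes every condition; an inverted suite of 30, 50, and 20 fails both the ordering and the unit-share checks.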

For regression testing, maintain a golden dataset: a known-good output that serves as the baseline for comparison. The regression test asserts: Current_output == Golden_dataset. When transformation logic changes, update the golden dataset only after verifying that the new output is correct. Formally: Regression_pass ⇔ |Current_output - Golden_dataset| == 0 (for deterministic transformations).
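One way to implement the golden-dataset comparison is `pandas.testing.assert_frame_equal` with `check_exact=True`, which matches the "== 0" condition for deterministic output. This sketch assumes the golden dataset is stored as a CSV file and that a list of key columns `sort_by` defines a canonical row order; `assert_matches_golden` and its `update` flag are hypothetical. Both frames go through the same CSV round trip so serialization does not introduce dtype differences.

```python
import io
from pathlib import Path

import pandas as pd
from pandas.testing import assert_frame_equal


def _canonical(df: pd.DataFrame, sort_by: list) -> pd.DataFrame:
    """Sort rows by key columns and reset the index so row order cannot cause diffs."""
    return df.sort_values(sort_by).reset_index(drop=True)


def assert_matches_golden(current: pd.DataFrame, golden_path, sort_by: list, update: bool = False):
    path = Path(golden_path)
    if update:
        # Only after a human has verified that the new output is correct
        path.write_text(current.to_csv(index=False))
        return
    # Round-trip current output through CSV so both sides share identical dtypes
    current_rt = pd.read_csv(io.StringIO(current.to_csv(index=False)))
    golden = pd.read_csv(path)
    assert_frame_equal(_canonical(current_rt, sort_by), _canonical(golden, sort_by), check_exact=True)
```

Keeping the golden file as CSV makes baseline changes reviewable in a pull-request diff; a binary format would preserve dtypes more faithfully but hide the change from reviewers.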

Key Concepts

| Concept | Description | Tool/Framework |
|---|---|---|
| Unit Test | Test individual transformation functions | pytest, unittest |
| Integration Test | Test pipeline stages together | pytest, Testcontainers |
| Contract Test | Verify schema and data contracts | Great Expectations, Pandera |
| Regression Test | Compare output against golden dataset | Custom assertions, pytest |
| Mock | Test double that replaces an external dependency | unittest.mock, pytest-mock |
| Stub | Test double that returns predetermined values | unittest.mock.patch |
| Fake | Test double with a simplified real implementation | In-memory database |
| Testcontainer | Lightweight Docker containers for integration tests | testcontainers-python |
| Golden Dataset | Known-good output for regression testing | Stored fixtures |
| Data Fixture | Predefined test data for consistent testing | pytest fixtures |
| Parametrize | Run a test with multiple input combinations | @pytest.mark.parametrize |
| Snapshot Testing | Compare output against stored snapshots | snapshottest, syrupy |
| Mutation Testing | Introduce faults to verify test effectiveness | mutmut |
| Coverage | Percentage of code covered by tests | coverage.py, pytest-cov |
| CI/CD Integration | Run tests automatically on code changes | GitHub Actions, GitLab CI |
| Test Data Management | Create and manage test data | Faker, factory_boy |
| Slow Test Marker | Mark tests that take too long for CI | @pytest.mark.slow |
| Flaky Test | Test that intermittently passes or fails | Mark and quarantine |

  1. Identify testable units: Decompose pipeline into individual functions (parse, clean, transform, validate, load). Each function is a testable unit.
  2. Write unit tests first: For each function, write tests covering normal cases, edge cases, null inputs, empty inputs, and error conditions. Use parametrize for multiple inputs.
  3. Create test fixtures: Define reusable test data (DataFrames, dicts, files) as pytest fixtures. Use Faker for realistic mock data.
  4. Write integration tests: Test pipeline stages together using Testcontainers for databases or in-memory stores. Verify data flows correctly across boundaries.
  5. Define contracts: Create schema contracts using Great Expectations or Pandera. Test that input/output data satisfies contracts.
  6. Establish golden datasets: For each pipeline, create a known-good output. Write regression tests comparing current output against golden datasets.
  7. Set up CI/CD: Configure GitHub Actions or GitLab CI to run unit tests on every commit, integration tests on PRs, and full regression on merge to main.
  8. Monitor coverage: Track test coverage and set minimum thresholds. Block PRs that decrease coverage below threshold.
  9. Quarantine flaky tests: Mark flaky tests with a dedicated quarantine marker (for example, a custom @pytest.mark.quarantine) and fix them separately. Never ignore flaky tests.
  10. Review test quality: In code reviews, verify that tests are meaningful, not just checking boxes. Tests should catch real bugs.
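Steps 7 and 9 both rely on pytest markers. A minimal `conftest.py` sketch following the custom-marker pattern from the pytest documentation: it registers `slow`, `integration`, `regression`, and `quarantine` markers (the last three names are assumptions) and skips quarantined tests unless the hypothetical `--run-quarantined` option is given, so flaky tests stay visible in the report without blocking merges.

```python
# conftest.py
import pytest


def pytest_addoption(parser):
    parser.addoption(
        "--run-quarantined", action="store_true", default=False,
        help="also run tests marked as quarantine",
    )


def pytest_configure(config):
    # Registering markers avoids "unknown marker" warnings (and errors under --strict-markers)
    config.addinivalue_line("markers", "slow: too slow for the per-commit CI stage")
    config.addinivalue_line("markers", "integration: needs containers or external services")
    config.addinivalue_line("markers", "regression: compares output against golden datasets")
    config.addinivalue_line("markers", "quarantine: known-flaky test, skipped by default")


def pytest_collection_modifyitems(config, items):
    if config.getoption("--run-quarantined"):
        return
    skip_quarantined = pytest.mark.skip(reason="quarantined flaky test; use --run-quarantined")
    for item in items:
        if "quarantine" in item.keywords:
            item.add_marker(skip_quarantined)
```

Skipped tests still appear in the pytest summary, which keeps the quarantine list visible as a backlog rather than silently ignored.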

Production Code

Unit Test Suite for Transformation Functions

import pytest
import pandas as pd
from datetime import datetime
from typing import Dict


# ------------------------------------------------------
# PRODUCTION CODE (tested below)
# ------------------------------------------------------
def parse_event(raw_event: Dict) -> Dict:
    """Parse raw event dict into structured format."""
    return {
        "event_id": raw_event["event_id"],
        "user_id": raw_event["user_id"],
        "amount": float(raw_event["amount"]),
        "event_time": datetime.fromisoformat(raw_event["timestamp"]),
    }


def clean_event(event: Dict) -> Dict:
    """Clean event: remove nulls, validate types."""
    if event["amount"] is None or event["amount"] <= 0:
        raise ValueError(f"Invalid amount: {event['amount']}")
    if event["user_id"] is None or event["user_id"] == "":
        raise ValueError(f"Invalid user_id: {event['user_id']}")
    return event


def enrich_event(event: Dict, customer_db: Dict) -> Dict:
    """Enrich event with customer tier from lookup.

    Returns a new dict so the caller's input event is not mutated.
    """
    customer = customer_db.get(event["user_id"])
    if customer is None:
        # Unknown customers get safe defaults instead of failing the pipeline
        return {**event, "tier": "unknown", "credit_limit": 0.0}
    return {**event, "tier": customer["tier"], "credit_limit": customer["credit_limit"]}


def aggregate_daily(events: list) -> pd.DataFrame:
    """Aggregate events by user and date."""
    df = pd.DataFrame(events)
    if df.empty:
        return pd.DataFrame(columns=["user_id", "date", "total_amount", "event_count"])
    df["date"] = df["event_time"].dt.date
    return (
        df.groupby(["user_id", "date"])
        .agg(
            total_amount=("amount", "sum"),
            event_count=("event_id", "count"),
        )
        .reset_index()
    )


# ------------------------------------------------------
# UNIT TESTS
# ------------------------------------------------------
class TestParseEvent:
    """Unit tests for parse_event function."""

    def test_parse_valid_event(self):
        raw = {
            "event_id": "EVT-001",
            "user_id": "USR-001",
            "amount": "99.99",
            "timestamp": "2024-01-15T10:30:00",
        }
        result = parse_event(raw)
        assert result["event_id"] == "EVT-001"
        assert result["amount"] == 99.99
        assert isinstance(result["event_time"], datetime)

    def test_parse_zero_amount(self):
        raw = {
            "event_id": "EVT-002",
            "user_id": "USR-001",
            "amount": "0.00",
            "timestamp": "2024-01-15T10:30:00",
        }
        result = parse_event(raw)
        assert result["amount"] == 0.0

    @pytest.mark.parametrize("amount", ["100.50", "0.01", "999999.99"])
    def test_parse_various_amounts(self, amount):
        raw = {
            "event_id": "EVT-003",
            "user_id": "USR-001",
            "amount": amount,
            "timestamp": "2024-01-15T10:30:00",
        }
        result = parse_event(raw)
        assert result["amount"] == float(amount)

    def test_parse_invalid_timestamp(self):
        raw = {
            "event_id": "EVT-004",
            "user_id": "USR-001",
            "amount": "50.00",
            "timestamp": "invalid-date",
        }
        with pytest.raises(ValueError):
            parse_event(raw)


class TestCleanEvent:
    """Unit tests for clean_event function."""

    def test_clean_valid_event(self):
        event = {"event_id": "EVT-001", "user_id": "USR-001", "amount": 99.99}
        result = clean_event(event)
        assert result == event

    def test_clean_zero_amount_raises(self):
        event = {"event_id": "EVT-001", "user_id": "USR-001", "amount": 0}
        with pytest.raises(ValueError, match="Invalid amount"):
            clean_event(event)

    def test_clean_negative_amount_raises(self):
        event = {"event_id": "EVT-001", "user_id": "USR-001", "amount": -10.0}
        with pytest.raises(ValueError, match="Invalid amount"):
            clean_event(event)

    def test_clean_null_amount_raises(self):
        event = {"event_id": "EVT-001", "user_id": "USR-001", "amount": None}
        with pytest.raises(ValueError, match="Invalid amount"):
            clean_event(event)

    def test_clean_empty_user_id_raises(self):
        event = {"event_id": "EVT-001", "user_id": "", "amount": 50.0}
        with pytest.raises(ValueError, match="Invalid user_id"):
            clean_event(event)

    def test_clean_null_user_id_raises(self):
        event = {"event_id": "EVT-001", "user_id": None, "amount": 50.0}
        with pytest.raises(ValueError, match="Invalid user_id"):
            clean_event(event)


class TestEnrichEvent:
    """Unit tests for enrich_event function."""

    def test_enrich_known_customer(self):
        event = {"user_id": "USR-001", "amount": 50.0}
        customer_db = {"USR-001": {"tier": "gold", "credit_limit": 10000.0}}
        result = enrich_event(event, customer_db)
        assert result["tier"] == "gold"
        assert result["credit_limit"] == 10000.0

    def test_enrich_unknown_customer(self):
        event = {"user_id": "USR-999", "amount": 50.0}
        customer_db = {"USR-001": {"tier": "gold", "credit_limit": 10000.0}}
        result = enrich_event(event, customer_db)
        assert result["tier"] == "unknown"
        assert result["credit_limit"] == 0.0

    def test_enrich_empty_database(self):
        event = {"user_id": "USR-001", "amount": 50.0}
        result = enrich_event(event, {})
        assert result["tier"] == "unknown"


class TestAggregateDaily:
    """Unit tests for aggregate_daily function."""

    def test_aggregate_single_event(self):
        events = [
            {"event_id": "EVT-001", "user_id": "USR-001", "amount": 50.0,
             "event_time": datetime(2024, 1, 15, 10, 0, 0)},
        ]
        result = aggregate_daily(events)
        assert len(result) == 1
        assert result.iloc[0]["total_amount"] == 50.0
        assert result.iloc[0]["event_count"] == 1

    def test_aggregate_multiple_events_same_day(self):
        events = [
            {"event_id": "EVT-001", "user_id": "USR-001", "amount": 50.0,
             "event_time": datetime(2024, 1, 15, 10, 0, 0)},
            {"event_id": "EVT-002", "user_id": "USR-001", "amount": 30.0,
             "event_time": datetime(2024, 1, 15, 14, 0, 0)},
        ]
        result = aggregate_daily(events)
        assert len(result) == 1
        assert result.iloc[0]["total_amount"] == 80.0
        assert result.iloc[0]["event_count"] == 2

    def test_aggregate_empty_events(self):
        result = aggregate_daily([])
        assert len(result) == 0
        assert "user_id" in result.columns
        assert "total_amount" in result.columns

Integration Test with Testcontainers

import pytest
from testcontainers.postgres import PostgresContainer
import pandas as pd
import sqlalchemy
from sqlalchemy import text
from datetime import datetime


# DDL with a primary key: the ON CONFLICT (order_id) upsert below requires it
ORDERS_DDL = """
    CREATE TABLE orders (
        order_id VARCHAR PRIMARY KEY,
        customer_id VARCHAR NOT NULL,
        amount DECIMAL(10,2) NOT NULL,
        status VARCHAR NOT NULL,
        created_at TIMESTAMP NOT NULL
    )
"""


@pytest.fixture(scope="module")
def postgres():
    """Spin up a PostgreSQL container for integration tests."""
    with PostgresContainer("postgres:15") as pg:
        yield pg


@pytest.fixture(scope="module")
def engine(postgres):
    """Create SQLAlchemy engine connected to test database."""
    engine = sqlalchemy.create_engine(postgres.get_connection_url())
    yield engine
    engine.dispose()


@pytest.fixture
def orders_table(engine):
    """Recreate an empty orders table before each test so tests stay independent."""
    with engine.begin() as conn:
        conn.execute(text("DROP TABLE IF EXISTS orders"))
        conn.execute(text(ORDERS_DDL))
    return "orders"


@pytest.fixture(scope="module")
def sample_data():
    """Generate sample data for integration tests."""
    return pd.DataFrame([
        {"order_id": "ORD-001", "customer_id": "C001", "amount": 99.99,
         "status": "completed", "created_at": datetime(2024, 1, 15)},
        {"order_id": "ORD-002", "customer_id": "C002", "amount": 149.50,
         "status": "completed", "created_at": datetime(2024, 1, 15)},
        {"order_id": "ORD-003", "customer_id": "C001", "amount": 29.99,
         "status": "pending", "created_at": datetime(2024, 1, 16)},
    ])


class TestOrderPipelineIntegration:
    """Integration tests for the order processing pipeline."""

    def test_table_creation(self, engine, orders_table):
        """Test that the expected table exists."""
        with engine.connect() as conn:
            result = conn.execute(
                text(
                    "SELECT table_name FROM information_schema.tables "
                    "WHERE table_name = :name"
                ),
                {"name": orders_table},
            ).fetchone()
        assert result is not None

    def test_data_load(self, engine, orders_table, sample_data):
        """Test that pipeline loads data correctly."""
        # Append (not replace) so the table keeps its primary key and column types
        sample_data.to_sql(orders_table, engine, if_exists="append", index=False)

        loaded = pd.read_sql("SELECT * FROM orders", engine)
        assert len(loaded) == 3
        assert set(loaded.columns) == {"order_id", "customer_id", "amount", "status", "created_at"}

    def test_aggregation_query(self, engine, orders_table, sample_data):
        """Test aggregation logic against real database."""
        sample_data.to_sql(orders_table, engine, if_exists="append", index=False)

        result = pd.read_sql("""
            SELECT customer_id, SUM(amount) AS total_spent, COUNT(*) AS order_count
            FROM orders
            GROUP BY customer_id
            ORDER BY total_spent DESC
        """, engine)

        assert len(result) == 2
        # C002 (149.50) ranks above C001 (99.99 + 29.99 = 129.98)
        assert list(result["customer_id"]) == ["C002", "C001"]
        # SUM over DECIMAL may arrive as Decimal; cast before approximate comparison
        totals = dict(zip(result["customer_id"], result["total_spent"].astype(float)))
        assert totals["C001"] == pytest.approx(129.98)
        assert totals["C002"] == pytest.approx(149.50)

    def test_upsert_idempotency(self, engine, orders_table, sample_data):
        """Test that re-running the upsert does not create duplicates."""
        sample_data.to_sql(orders_table, engine, if_exists="append", index=False)

        upsert = text("""
            INSERT INTO orders (order_id, customer_id, amount, status, created_at)
            VALUES (:order_id, :customer_id, :amount, :status, :created_at)
            ON CONFLICT (order_id)
            DO UPDATE SET amount = EXCLUDED.amount
        """)
        # Run the upsert again with the same rows (executemany over row dicts)
        with engine.begin() as conn:
            conn.execute(upsert, sample_data.to_dict(orient="records"))

        loaded = pd.read_sql("SELECT COUNT(*) AS cnt FROM orders", engine)
        assert loaded.iloc[0]["cnt"] == 3  # No duplicates

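Contract Test with a Declarative Schema Contract

import pandas as pd
import pytest


def create_orders_contract():
    """Define schema contract for orders data."""
    return {
        "columns": {
            "order_id": {"type": "string", "nullable": False, "unique": True},
            "customer_id": {"type": "string", "nullable": False},
            "amount": {"type": "float64", "nullable": False, "min": 0.01, "max": 1000000},
            "status": {"type": "string", "nullable": False,
                       "allowed_values": ["pending", "processing", "completed", "cancelled"]},
            "created_at": {"type": "datetime64", "nullable": False},
        },
        "row_count": {"min": 1, "max": 10000000},
    }


# Maps contract type names to pandas dtype predicates
TYPE_CHECKS = {
    "string": lambda s: pd.api.types.is_string_dtype(s) or s.dtype == object,
    "float64": pd.api.types.is_float_dtype,
    "datetime64": pd.api.types.is_datetime64_any_dtype,
}


def validate_against_contract(df: pd.DataFrame, contract: dict) -> list:
    """Return human-readable contract violations (an empty list means compliant)."""
    violations = []
    rows = contract["row_count"]
    if not rows["min"] <= len(df) <= rows["max"]:
        violations.append(f"row count {len(df)} outside [{rows['min']}, {rows['max']}]")
    for col, spec in contract["columns"].items():
        if col not in df.columns:
            violations.append(f"missing column: {col}")
            continue
        series = df[col]
        if not TYPE_CHECKS[spec["type"]](series):
            violations.append(f"{col}: expected {spec['type']}, got {series.dtype}")
        if not spec.get("nullable", True) and series.isnull().any():
            violations.append(f"{col}: null values in non-nullable column")
        if spec.get("unique") and not series.is_unique:
            violations.append(f"{col}: duplicate values")
        if "min" in spec and (series < spec["min"]).any():
            violations.append(f"{col}: values below {spec['min']}")
        if "max" in spec and (series > spec["max"]).any():
            violations.append(f"{col}: values above {spec['max']}")
        if "allowed_values" in spec and not series.isin(spec["allowed_values"]).all():
            violations.append(f"{col}: values outside allowed set")
    return violations


class TestOrdersContract:
    """Contract tests for orders dataset."""

    @pytest.fixture
    def valid_orders(self):
        return pd.DataFrame([
            {"order_id": "ORD-001", "customer_id": "C001", "amount": 99.99,
             "status": "completed", "created_at": pd.Timestamp("2024-01-15")},
        ])

    @pytest.fixture
    def invalid_orders(self):
        return pd.DataFrame([
            {"order_id": "ORD-001", "customer_id": "C001", "amount": -10.0,
             "status": "invalid_status", "created_at": pd.Timestamp("2024-01-15")},
        ])

    def test_valid_orders_satisfy_contract(self, valid_orders):
        assert validate_against_contract(valid_orders, create_orders_contract()) == []

    def test_missing_column_detected(self, valid_orders):
        df = valid_orders.drop(columns=["status"])
        assert "missing column: status" in validate_against_contract(df, create_orders_contract())

    def test_duplicate_order_id_detected(self, valid_orders):
        df = pd.concat([valid_orders, valid_orders], ignore_index=True)
        assert "order_id: duplicate values" in validate_against_contract(df, create_orders_contract())

    def test_invalid_status_rejected(self, invalid_orders):
        violations = validate_against_contract(invalid_orders, create_orders_contract())
        assert "status: values outside allowed set" in violations

    def test_negative_amount_rejected(self, invalid_orders):
        violations = validate_against_contract(invalid_orders, create_orders_contract())
        assert "amount: values below 0.01" in violations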

Test Data Management: Use pytest fixtures with scope="session" for expensive setup (database containers, large datasets). Use scope="function" for fast, isolated tests. Use the Faker library for realistic mock data. Use factory_boy for complex object factories. Avoid depending on production data for tests; always use synthetic data.
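A dependency-free sketch of a synthetic data factory exposed through two fixture scopes. It uses Python's seeded `random.Random` in place of Faker, so the output is reproducible without extra packages; `make_orders`, `large_orders`, and `small_orders` are hypothetical names, and the columns mirror the orders examples in this lesson.

```python
import random
from datetime import datetime, timedelta

import pytest

STATUSES = ["pending", "processing", "completed", "cancelled"]


def make_orders(n: int, seed: int = 42) -> list:
    """Hypothetical factory: n synthetic orders, identical for a given seed."""
    rng = random.Random(seed)  # private RNG: does not disturb global random state
    start = datetime(2024, 1, 1)
    return [
        {
            "order_id": f"ORD-{i:06d}",
            "customer_id": f"C{rng.randint(1, 50):03d}",
            "amount": round(rng.uniform(0.01, 500.0), 2),
            "status": rng.choice(STATUSES),
            "created_at": start + timedelta(minutes=rng.randint(0, 60 * 24 * 30)),
        }
        for i in range(n)
    ]


@pytest.fixture(scope="session")
def large_orders():
    """Built once per session and shared across tests; treat it as read-only."""
    return make_orders(100_000)


@pytest.fixture  # default scope="function": a fresh small dataset per test
def small_orders():
    return make_orders(10)
```

Seeding the generator matters: a failing test can be replayed with exactly the same data, which is much harder with unseeded random or Faker output.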

CI/CD Pipeline Configuration: Run unit tests on every commit (fast feedback, < 2 min). Run integration tests on pull requests (medium feedback, < 5 min). Run full regression tests on merge to main (comprehensive, < 10 min). Block merges if any test fails. Track coverage trends and fail if coverage drops below threshold.
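The staged policy can be expressed as pytest marker expressions that each CI job passes to pytest. This sketch assumes tests are tagged with custom `integration`, `regression`, and `quarantine` markers, that the code under test lives in `src/`, and that pytest-cov is installed (it provides `--cov` and `--cov-fail-under`). `STAGE_MARKERS`, `pytest_args_for`, and `run_stage` are hypothetical names.

```python
import pytest

# Hypothetical mapping from CI trigger to the tests it should run
STAGE_MARKERS = {
    "commit": "not integration and not regression and not quarantine",
    "pull_request": "not regression and not quarantine",
    "merge_to_main": "not quarantine",
}


def pytest_args_for(stage: str, min_coverage: int = 90) -> list:
    """Build pytest CLI arguments for one CI stage."""
    return [
        "-m", STAGE_MARKERS[stage],
        "--cov=src",
        f"--cov-fail-under={min_coverage}",  # non-zero exit if coverage drops below threshold
    ]


def run_stage(stage: str) -> int:
    """Run the selected tests and return pytest's exit code for the CI job."""
    return pytest.main(pytest_args_for(stage))
```

A GitHub Actions or GitLab CI step would call `sys.exit(run_stage("pull_request"))`, so any failing test or a coverage drop below the threshold produces a non-zero exit code that blocks the merge.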

  • Data testing follows a pyramid: many unit tests (fast), fewer integration tests (medium), fewest E2E tests (slow).
  • Unit tests verify individual transformation functions in isolation using controlled inputs.
  • Integration tests verify pipeline stages work together using Testcontainers for real dependencies.
  • Contract tests verify schema, types, and quality guarantees match agreed-upon specifications.
  • Regression tests compare current output against golden datasets to detect unintended changes.
  • Generate synthetic test data with fixtures, Faker, and factory_boy instead of relying on production data.
  • CI/CD should run unit tests on every commit, integration on PRs, and regression on merge.
  • Target > 90% line coverage overall and 100% coverage for critical-path transformations.

Best Practices

  1. Follow the testing pyramid: many unit tests (fast), fewer integration tests (medium), fewest E2E tests (slow).
  2. Test every transformation function with normal cases, edge cases, null inputs, and error conditions.
  3. Use pytest fixtures with appropriate scope (session, module, function) for test data and resource management.
  4. Implement contract tests at every pipeline boundary to catch schema evolution before it breaks consumers.
  5. Maintain golden datasets for regression testing. Update only after verifying new output is correct.
  6. Use Testcontainers for integration tests that require real databases, message queues, or file systems.
  7. Run tests in CI/CD: unit on every commit, integration on PRs, full regression on merge to main.
  8. Track test coverage and fail PRs that decrease coverage below threshold (target > 90% line coverage).
  9. Quarantine flaky tests with a dedicated quarantine marker (for example, a custom @pytest.mark.quarantine) and fix them separately. Never ignore flaky tests.
  10. Review test quality in code reviews. Tests should catch real bugs, not just check boxes.

Testing Tool Comparison

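| Feature | pytest | Great Expectations | Pandera | dbt Tests | Custom |
|---|---|---|---|---|---|
| Test Type | Unit/Integration | Contract/Quality | Contract | Contract | Any |
| Speed | Fast | Medium | Fast | Medium | Varies |
| Fixtures | Yes | Limited | Yes | Limited | Yes |
| Mocking | Yes | Limited | Limited | No | Yes |
| CI/CD | Native | Yes | Yes | Yes | Custom |
| Coverage | Yes | No | No | No | Custom |
| Best For | Python code | Data quality | DataFrame schemas | SQL models | Domain-specific |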

Test Coverage Targets

| Code Category | Target Coverage | Rationale |
|---|---|---|
| Transformation Functions | > 95% | Core business logic |
| Data Validation | > 90% | Critical quality gate |
| Error Handling | > 85% | Resilience testing |
| Integration Points | > 80% | External dependency testing |
| Utility Functions | > 75% | Supporting code |
| Overall | > 90% | Production readiness |
