Data Pipeline Testing: Building Confidence in Your Data
Data pipeline testing is the systematic verification that pipeline logic, data quality, and system behavior meet specifications.
Why Data Testing Is Hard
Unique Challenges:
- Non-deterministic inputs: data arrives with unpredictable values and volumes
- Stateful transformations: outputs depend on historical state
- Distributed execution: failures occur across nodes
- Schema evolution: source schemas change without notice
- Volume: testing with production-scale data is expensive
Testing Strategy Must:
- Address all these challenges
- Remain fast enough for CI/CD pipelines
Key Insight: Unlike traditional software testing, data testing must verify not only code correctness but also data correctness.
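A small illustration of the difference between code correctness and data correctness, using a hypothetical `daily_revenue` function. Its logic is correct and passes a unit test, yet a duplicated event in the input silently inflates the result. Only a data check (here, a uniqueness check on the hypothetical `event_id` field) catches it.

```python
def daily_revenue(events):
    """Sum event amounts; correct code, assuming each event appears exactly once."""
    return sum(e["amount"] for e in events)


def duplicate_event_ids(events):
    """Data check: return the event_ids that appear more than once."""
    seen, dupes = set(), set()
    for e in events:
        if e["event_id"] in seen:
            dupes.add(e["event_id"])
        seen.add(e["event_id"])
    return sorted(dupes)


# Code correctness: the unit test passes on clean, controlled input.
assert daily_revenue([{"event_id": "E1", "amount": 10.0},
                      {"event_id": "E2", "amount": 5.0}]) == 15.0

# Data correctness: the same code on production-like input with a replayed event.
batch = [
    {"event_id": "E1", "amount": 10.0},
    {"event_id": "E2", "amount": 5.0},
    {"event_id": "E2", "amount": 5.0},  # duplicate from an at-least-once source
]
print(daily_revenue(batch))        # 20.0 (inflated)
print(duplicate_event_ids(batch))  # ['E2']
```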
Testing Pyramid
A unit test in data pipeline testing verifies a single transformation function or operator in isolation, using controlled input data and asserting expected output. Unit tests do not require external dependencies (databases, APIs, file systems). They are the fastest and cheapest tests, forming the foundation of the testing pyramid. Each transformation function should have unit tests covering: normal cases, edge cases, empty inputs, null values, and error conditions.
An integration test verifies that multiple pipeline stages work correctly together, including interactions with external systems (databases, message queues, file systems). Integration tests use test doubles (mocks, stubs, fakes) for external dependencies or spin up lightweight test instances (Testcontainers). They verify data flow across stage boundaries and end-to-end pipeline behavior.
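To make the test-double idea concrete, here is a minimal sketch in which the warehouse is replaced by a fake (an in-memory SQLite database from the standard library) and a hypothetical exchange-rate API client is replaced by a `unittest.mock.Mock` configured as a stub. The stage functions `convert_to_usd`, `load_orders`, and `run_stages` are invented for illustration; the test exercises them together across the transform/load boundary.

```python
import sqlite3
from unittest.mock import Mock


def convert_to_usd(orders, rate_client):
    """Hypothetical transform stage: convert amounts via an external rate service."""
    converted = []
    for order in orders:
        rate = rate_client.get_rate(order["currency"], "USD")
        converted.append({**order, "amount_usd": round(order["amount"] * rate, 2)})
    return converted


def load_orders(conn, orders):
    """Hypothetical load stage: upsert converted orders into the warehouse table."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS orders (order_id TEXT PRIMARY KEY, amount_usd REAL)"
    )
    conn.executemany(
        "INSERT OR REPLACE INTO orders (order_id, amount_usd) VALUES (:order_id, :amount_usd)",
        orders,
    )
    conn.commit()


def run_stages(orders, rate_client, conn):
    """Chain transform -> load, as the integration test exercises them."""
    load_orders(conn, convert_to_usd(orders, rate_client))


# Stub: the external API returns a predetermined rate instead of calling the network.
rate_client = Mock()
rate_client.get_rate.return_value = 1.10

# Fake: an in-memory SQLite database stands in for the production warehouse.
conn = sqlite3.connect(":memory:")

orders = [
    {"order_id": "ORD-001", "amount": 100.0, "currency": "EUR"},
    {"order_id": "ORD-002", "amount": 50.0, "currency": "EUR"},
]
run_stages(orders, rate_client, conn)

# rows holds the loaded (order_id, amount_usd) pairs
rows = conn.execute("SELECT order_id, amount_usd FROM orders ORDER BY order_id").fetchall()
```

Because the fake is a real (if simplified) database, the test also checks SQL and key handling that a pure mock would skip; Testcontainers extends the same idea to the production database engine.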
A contract test verifies that the schema, data types, and quality guarantees of a dataset match the agreed-upon contract between producers and consumers. Contract tests prevent schema evolution from breaking downstream consumers. They verify: (1) field names and types, (2) nullability constraints, (3) value ranges, (4) uniqueness constraints, (5) referential integrity.
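A framework-free sketch of the five checks, assuming records arrive as a list of dicts and the contract is a plain dict. The contract layout, the `references` key, and the name `check_contract` are illustrative assumptions, not the format used by Great Expectations or Pandera.

```python
from typing import Any, Dict, List, Set

# Hypothetical contract: expected Python type plus optional constraints per field.
CONTRACT = {
    "order_id": {"type": str, "nullable": False, "unique": True},
    "customer_id": {"type": str, "nullable": False, "references": "customers"},
    "amount": {"type": float, "nullable": False, "min": 0.01, "max": 1_000_000},
}


def check_contract(rows: List[Dict[str, Any]], contract: Dict[str, Dict],
                   reference_keys: Dict[str, Set[str]]) -> List[str]:
    """Return contract violations; an empty list means the batch conforms."""
    violations = []
    for field, spec in contract.items():
        # (1) field names: every record must carry the field
        if any(field not in row for row in rows):
            violations.append(f"{field}: missing from some records")
        values = [row.get(field) for row in rows]
        non_null = [v for v in values if v is not None]
        # (2) nullability
        if not spec.get("nullable", True) and len(non_null) < len(values):
            violations.append(f"{field}: nulls in non-nullable field")
        # (1) field types
        typed = [v for v in non_null if isinstance(v, spec["type"])]
        if len(typed) < len(non_null):
            violations.append(
                f"{field}: {len(non_null) - len(typed)} value(s) not {spec['type'].__name__}"
            )
        # (3) value ranges, checked only on correctly typed values
        if "min" in spec and any(v < spec["min"] for v in typed):
            violations.append(f"{field}: values below {spec['min']}")
        if "max" in spec and any(v > spec["max"] for v in typed):
            violations.append(f"{field}: values above {spec['max']}")
        # (4) uniqueness
        if spec.get("unique") and len(set(non_null)) < len(non_null):
            violations.append(f"{field}: duplicate values")
        # (5) referential integrity against a set of known parent keys
        if "references" in spec:
            unknown = set(non_null) - reference_keys[spec["references"]]
            if unknown:
                violations.append(f"{field}: unknown keys {sorted(unknown)}")
    return violations
```

Returning a list of violations rather than raising on the first one lets a producer see every broken guarantee in a single run.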
A regression test verifies that pipeline output has not changed unexpectedly after code modifications. Regression tests compare current output against a known-good baseline (golden dataset). They detect unintended changes in transformation logic, configuration changes, or data drift. Regression tests are the safety net for deployment confidence.
Test Coverage Metric
Pipeline test coverage = (Number of tested transformations / Total transformations) * 100%. For a pipeline with N transformation functions averaging T_avg test cases each: Total_test_cases = N * T_avg. Target coverage: > 90% line coverage, > 80% branch coverage, and 100% coverage for critical-path transformations.
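The same arithmetic as small helpers. The counts in the example are made-up illustration values, and `meets_targets` is a hypothetical gate that encodes the targets above; it is not part of coverage.py, which reports the line and branch percentages themselves.

```python
def transformation_coverage(tested: int, total: int) -> float:
    """Pipeline test coverage: percentage of transformations that have tests."""
    if total == 0:
        raise ValueError("pipeline has no transformations")
    return 100.0 * tested / total


def total_test_cases(n_transformations: int, avg_tests_per_transformation: float) -> float:
    """Total_test_cases = N * T_avg."""
    return n_transformations * avg_tests_per_transformation


def meets_targets(line_pct: float, branch_pct: float, critical_path_pct: float) -> bool:
    """Targets from the text: > 90% line, > 80% branch, 100% on the critical path."""
    return line_pct > 90.0 and branch_pct > 80.0 and critical_path_pct == 100.0


# Illustrative numbers: 18 of 20 transformations tested, about 6 tests each.
print(transformation_coverage(18, 20))   # 90.0
print(total_test_cases(20, 6))           # 120
print(meets_targets(92.5, 84.0, 100.0))  # True
```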
Test Execution Time Budget
For a CI/CD pipeline with a total budget of B minutes split across the four test categories: B >= T_unit + T_integration + T_contract + T_regression. Typical allocation: T_unit < 2 min (50% of tests, fast execution), T_integration < 5 min (30% of tests), T_contract < 1 min (10% of tests), T_regression < 2 min (10% of tests). Total: B ≈ 10 min (2 + 5 + 1 + 2).
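A sketch that encodes the typical allocation and checks measured stage times against it. The measured timings are invented for the example; in practice they would come from CI job durations.

```python
# Typical allocation from the text: per-category ceiling in minutes.
ALLOCATION = {"unit": 2, "integration": 5, "contract": 1, "regression": 2}


def worst_case_minutes(allocation: dict) -> int:
    """Sum of the per-category ceilings: the budget B the allocation implies."""
    return sum(allocation.values())


def fits_budget(measured: dict, allocation: dict, budget_minutes: float) -> bool:
    """True if each category stays under its ceiling and the total stays within B."""
    within_ceilings = all(
        measured[name] <= ceiling for name, ceiling in allocation.items()
    )
    return within_ceilings and sum(measured.values()) <= budget_minutes


# Hypothetical measured CI timings in minutes.
measured = {"unit": 1.4, "integration": 4.2, "contract": 0.5, "regression": 1.8}
print(worst_case_minutes(ALLOCATION))         # 10
print(fits_budget(measured, ALLOCATION, 10))  # True
```

Checking each category's ceiling, not only the total, catches a slow integration suite even when fast unit tests leave the overall sum under B.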
The testing pyramid invariant states: Count(unit_tests) > Count(integration_tests) > Count(e2e_tests). As proportions: N_unit / N_total > 0.5, N_integration / N_total ∈ [0.2, 0.4], N_e2e / N_total < 0.1. Violating this invariant produces slow, brittle test suites that are expensive to maintain.
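The invariant can be checked mechanically from test counts. This sketch assumes N_total is the sum of the three counts (contract and regression tests are left out, as in the formula); `pyramid_violations` is a hypothetical helper.

```python
def pyramid_violations(n_unit: int, n_integration: int, n_e2e: int) -> list:
    """Return the pyramid rules a test suite breaks (empty list = healthy)."""
    n_total = n_unit + n_integration + n_e2e
    if n_total == 0:
        raise ValueError("empty test suite")
    violations = []
    if not (n_unit > n_integration > n_e2e):
        violations.append("ordering: need unit > integration > e2e")
    if not n_unit / n_total > 0.5:
        violations.append(f"unit share {n_unit / n_total:.2f} is not > 0.5")
    if not 0.2 <= n_integration / n_total <= 0.4:
        violations.append(
            f"integration share {n_integration / n_total:.2f} is outside [0.2, 0.4]"
        )
    if not n_e2e / n_total < 0.1:
        violations.append(f"e2e share {n_e2e / n_total:.2f} is not < 0.1")
    return violations


print(pyramid_violations(600, 300, 50))   # []
print(pyramid_violations(100, 200, 300))  # inverted pyramid: three violations
```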
For regression testing, maintain a golden dataset: a known-good output that serves as the baseline for comparison. The regression test asserts Current_output == Golden_dataset. When transformation logic changes, update the golden dataset only after verifying that the new output is correct. Formally, for deterministic transformations: Regression_pass <-> |Current_output - Golden_dataset| == 0.
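A minimal pandas sketch of the golden-dataset comparison. `assert_matches_golden` is a hypothetical helper: it sorts rows by key columns and aligns column order before calling `pandas.testing.assert_frame_equal`, because row order from grouped or distributed computations is usually not guaranteed. For float columns the exact `== 0` condition is relaxed to a small relative tolerance (`rtol`). In a real suite the golden frame would be loaded from a stored fixture file rather than built inline.

```python
import pandas as pd
from pandas.testing import assert_frame_equal


def assert_matches_golden(current: pd.DataFrame, golden: pd.DataFrame, keys: list) -> None:
    """Raise AssertionError if current output differs from the golden dataset."""
    def normalize(df: pd.DataFrame) -> pd.DataFrame:
        # Row order is not part of the contract: sort by keys, then fix column order.
        return df.sort_values(keys).reset_index(drop=True)[sorted(df.columns)]

    assert_frame_equal(normalize(current), normalize(golden), check_exact=False, rtol=1e-9)


golden = pd.DataFrame({"user_id": ["U1", "U2"], "total_amount": [80.0, 15.5]})
current = pd.DataFrame({"total_amount": [15.5, 80.0], "user_id": ["U2", "U1"]})
assert_matches_golden(current, golden, keys=["user_id"])  # same rows, different order
```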
Key Concepts
| Concept | Description | Tool/Framework |
|---|---|---|
| Unit Test | Test individual transformation functions | pytest, unittest |
| Integration Test | Test pipeline stages together | pytest, Testcontainers |
| Contract Test | Verify schema and data contracts | Great Expectations, Pandera |
| Regression Test | Compare output against golden dataset | Custom assertions, pytest |
| Mock | Test double that replaces external dependency | unittest.mock, pytest-mock |
| Stub | Test double that returns predetermined values | unittest.mock.patch |
| Fake | Test double with simplified real implementation | In-memory database |
| Testcontainer | Lightweight Docker containers for integration tests | testcontainers-python |
| Golden Dataset | Known-good output for regression testing | Stored fixtures |
| Data Fixture | Predefined test data for consistent testing | pytest fixtures |
| Parametrize | Run test with multiple input combinations | @pytest.mark.parametrize |
| Snapshot Testing | Compare output against stored snapshots | snapshottest, syrupy |
| Mutation Testing | Introduce faults to verify test effectiveness | mutmut |
| Coverage | Percentage of code covered by tests | coverage.py, pytest-cov |
| CI/CD Integration | Run tests automatically on code changes | GitHub Actions, GitLab CI |
| Test Data Management | Create and manage test data | Faker, factory_boy |
| Slow Test Marker | Mark tests that take too long for CI | @pytest.mark.slow |
| Flaky Test | Test that intermittently passes/fails | Mark and quarantine |
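Mutation testing is easiest to grasp by hand: change one operator in the code under test and check that at least one test fails. The sketch below mimics what a tool such as mutmut automates, using a hypothetical `is_valid_amount` check modeled on the "amount must be positive" rule; it does not use mutmut's API.

```python
import operator


def make_is_valid_amount(compare):
    """Build the amount check; `compare` is the operator a mutation would swap."""
    def is_valid_amount(amount):
        return amount is not None and compare(amount, 0)
    return is_valid_amount


original = make_is_valid_amount(operator.gt)  # amount > 0
mutant = make_is_valid_amount(operator.ge)    # mutated: amount >= 0


def run_suite(is_valid_amount, cases):
    """Return True if every (input, expected) case passes."""
    return all(is_valid_amount(amount) == expected for amount, expected in cases)


WEAK_CASES = [(10.0, True), (-5.0, False), (None, False)]
STRONG_CASES = WEAK_CASES + [(0.0, False)]  # adds the boundary case

# A mutant "survives" when the suite still passes against it.
print("weak suite kills mutant:", not run_suite(mutant, WEAK_CASES))      # False
print("strong suite kills mutant:", not run_suite(mutant, STRONG_CASES))  # True
```

A surviving mutant points at a missing test, here the zero-amount boundary.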
- Identify testable units: Decompose pipeline into individual functions (parse, clean, transform, validate, load). Each function is a testable unit.
- Write unit tests first: For each function, write tests covering normal cases, edge cases, null inputs, empty inputs, and error conditions. Use parametrize for multiple inputs.
- Create test fixtures: Define reusable test data (DataFrames, dicts, files) as pytest fixtures. Use Faker for realistic mock data.
- Write integration tests: Test pipeline stages together using Testcontainers for databases or in-memory stores. Verify data flows correctly across boundaries.
- Define contracts: Create schema contracts using Great Expectations or Pandera. Test that input/output data satisfies contracts.
- Establish golden datasets: For each pipeline, create a known-good output. Write regression tests comparing current output against golden datasets.
- Set up CI/CD: Configure GitHub Actions or GitLab CI to run unit tests on every commit, integration tests on PRs, and full regression on merge to main.
- Monitor coverage: Track test coverage and set minimum thresholds. Block PRs that decrease coverage below threshold.
- Quarantine flaky tests: Mark flaky tests with a dedicated marker (for example, a custom @pytest.mark.quarantine), exclude them from blocking CI runs, and fix them separately. Never ignore flaky tests.
- Review test quality: In code reviews, verify that tests are meaningful, not just checking boxes. Tests should catch real bugs.
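Markers such as `slow` are custom markers, so pytest should be told about them. A minimal `conftest.py` sketch, assuming the marker names `slow`, `integration`, and `quarantine` (illustrative choices, not pytest built-ins); `config.addinivalue_line` is the pytest API for registering markers from the `pytest_configure` hook.

```python
# conftest.py
MARKERS = {
    "slow": "long-running tests excluded from the per-commit run",
    "integration": "tests that need Docker / Testcontainers",
    "quarantine": "known-flaky tests excluded from blocking CI until fixed",
}


def pytest_configure(config):
    """pytest hook: register custom markers so --strict-markers accepts them."""
    for name, description in MARKERS.items():
        config.addinivalue_line("markers", f"{name}: {description}")
```

With the markers registered, a per-commit job might run `pytest -m "not slow and not integration and not quarantine"`, while a separate, non-blocking job runs `pytest -m quarantine` to track flaky tests until they are fixed.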
Production Code
Unit Test Suite for Transformation Functions
import pytest
import pandas as pd
from datetime import datetime
from typing import Dict, List

# In a real project the functions below would live in src/ and be imported;
# they are inlined here so the example is self-contained.

# ------------------------------------------------------
# PRODUCTION CODE (tested below)
# ------------------------------------------------------
def parse_event(raw_event: Dict) -> Dict:
    """Parse raw event dict into structured format."""
    return {
        "event_id": raw_event["event_id"],
        "user_id": raw_event["user_id"],
        "amount": float(raw_event["amount"]),
        # Raises ValueError for malformed timestamps
        "event_time": datetime.fromisoformat(raw_event["timestamp"]),
    }


def clean_event(event: Dict) -> Dict:
    """Validate event: reject null or non-positive amounts and missing user IDs."""
    if event["amount"] is None or event["amount"] <= 0:
        raise ValueError(f"Invalid amount: {event['amount']}")
    if event["user_id"] is None or event["user_id"] == "":
        raise ValueError(f"Invalid user_id: {event['user_id']}")
    return event


def enrich_event(event: Dict, customer_db: Dict) -> Dict:
    """Enrich event with customer tier from lookup; returns a new dict."""
    enriched = dict(event)  # copy so the caller's event is not mutated
    customer = customer_db.get(event["user_id"])
    if customer is None:
        enriched["tier"] = "unknown"
        enriched["credit_limit"] = 0.0
    else:
        enriched["tier"] = customer["tier"]
        enriched["credit_limit"] = customer["credit_limit"]
    return enriched


def aggregate_daily(events: List[Dict]) -> pd.DataFrame:
    """Aggregate events by user and date."""
    df = pd.DataFrame(events)
    if df.empty:
        return pd.DataFrame(columns=["user_id", "date", "total_amount", "event_count"])
    df["date"] = df["event_time"].dt.date
    return (
        df.groupby(["user_id", "date"])
        .agg(
            total_amount=("amount", "sum"),
            event_count=("event_id", "count"),
        )
        .reset_index()
    )
# ------------------------------------------------------
# UNIT TESTS
# ------------------------------------------------------
class TestParseEvent:
    """Unit tests for parse_event function."""

    def test_parse_valid_event(self):
        raw = {
            "event_id": "EVT-001",
            "user_id": "USR-001",
            "amount": "99.99",
            "timestamp": "2024-01-15T10:30:00",
        }
        result = parse_event(raw)
        assert result["event_id"] == "EVT-001"
        assert result["amount"] == 99.99
        assert isinstance(result["event_time"], datetime)

    def test_parse_zero_amount(self):
        raw = {
            "event_id": "EVT-002",
            "user_id": "USR-001",
            "amount": "0.00",
            "timestamp": "2024-01-15T10:30:00",
        }
        result = parse_event(raw)
        assert result["amount"] == 0.0

    @pytest.mark.parametrize("amount", ["100.50", "0.01", "999999.99"])
    def test_parse_various_amounts(self, amount):
        raw = {
            "event_id": "EVT-003",
            "user_id": "USR-001",
            "amount": amount,
            "timestamp": "2024-01-15T10:30:00",
        }
        result = parse_event(raw)
        assert result["amount"] == float(amount)

    def test_parse_invalid_timestamp(self):
        raw = {
            "event_id": "EVT-004",
            "user_id": "USR-001",
            "amount": "50.00",
            "timestamp": "invalid-date",
        }
        with pytest.raises(ValueError):
            parse_event(raw)
class TestCleanEvent:
    """Unit tests for clean_event function."""

    def test_clean_valid_event(self):
        event = {"event_id": "EVT-001", "user_id": "USR-001", "amount": 99.99}
        result = clean_event(event)
        assert result == event

    def test_clean_zero_amount_raises(self):
        event = {"event_id": "EVT-001", "user_id": "USR-001", "amount": 0}
        with pytest.raises(ValueError, match="Invalid amount"):
            clean_event(event)

    def test_clean_negative_amount_raises(self):
        event = {"event_id": "EVT-001", "user_id": "USR-001", "amount": -10.0}
        with pytest.raises(ValueError, match="Invalid amount"):
            clean_event(event)

    def test_clean_null_amount_raises(self):
        event = {"event_id": "EVT-001", "user_id": "USR-001", "amount": None}
        with pytest.raises(ValueError, match="Invalid amount"):
            clean_event(event)

    def test_clean_empty_user_id_raises(self):
        event = {"event_id": "EVT-001", "user_id": "", "amount": 50.0}
        with pytest.raises(ValueError, match="Invalid user_id"):
            clean_event(event)

    def test_clean_null_user_id_raises(self):
        event = {"event_id": "EVT-001", "user_id": None, "amount": 50.0}
        with pytest.raises(ValueError, match="Invalid user_id"):
            clean_event(event)
class TestEnrichEvent:
    """Unit tests for enrich_event function."""

    def test_enrich_known_customer(self):
        event = {"user_id": "USR-001", "amount": 50.0}
        customer_db = {"USR-001": {"tier": "gold", "credit_limit": 10000.0}}
        result = enrich_event(event, customer_db)
        assert result["tier"] == "gold"
        assert result["credit_limit"] == 10000.0

    def test_enrich_unknown_customer(self):
        event = {"user_id": "USR-999", "amount": 50.0}
        customer_db = {"USR-001": {"tier": "gold", "credit_limit": 10000.0}}
        result = enrich_event(event, customer_db)
        assert result["tier"] == "unknown"
        assert result["credit_limit"] == 0.0

    def test_enrich_empty_database(self):
        event = {"user_id": "USR-001", "amount": 50.0}
        result = enrich_event(event, {})
        assert result["tier"] == "unknown"
class TestAggregateDaily:
    """Unit tests for aggregate_daily function."""

    def test_aggregate_single_event(self):
        events = [
            {"event_id": "EVT-001", "user_id": "USR-001", "amount": 50.0,
             "event_time": datetime(2024, 1, 15, 10, 0, 0)},
        ]
        result = aggregate_daily(events)
        assert len(result) == 1
        assert result.iloc[0]["total_amount"] == 50.0
        assert result.iloc[0]["event_count"] == 1

    def test_aggregate_multiple_events_same_day(self):
        events = [
            {"event_id": "EVT-001", "user_id": "USR-001", "amount": 50.0,
             "event_time": datetime(2024, 1, 15, 10, 0, 0)},
            {"event_id": "EVT-002", "user_id": "USR-001", "amount": 30.0,
             "event_time": datetime(2024, 1, 15, 14, 0, 0)},
        ]
        result = aggregate_daily(events)
        assert len(result) == 1
        assert result.iloc[0]["total_amount"] == 80.0
        assert result.iloc[0]["event_count"] == 2

    def test_aggregate_empty_events(self):
        result = aggregate_daily([])
        assert len(result) == 0
        assert "user_id" in result.columns
        assert "total_amount" in result.columns
Integration Test with Testcontainers
import pytest
from testcontainers.postgres import PostgresContainer
import pandas as pd
import sqlalchemy
from datetime import datetime


@pytest.fixture(scope="module")
def postgres():
    """Spin up a PostgreSQL container for integration tests."""
    with PostgresContainer("postgres:15") as pg:
        yield pg


@pytest.fixture(scope="module")
def engine(postgres):
    """Create SQLAlchemy engine connected to test database."""
    engine = sqlalchemy.create_engine(postgres.get_connection_url())
    yield engine
    engine.dispose()


@pytest.fixture(scope="module")
def sample_data():
    """Generate sample data for integration tests."""
    return pd.DataFrame([
        {"order_id": "ORD-001", "customer_id": "C001", "amount": 99.99,
         "status": "completed", "created_at": datetime(2024, 1, 15)},
        {"order_id": "ORD-002", "customer_id": "C002", "amount": 149.50,
         "status": "completed", "created_at": datetime(2024, 1, 15)},
        {"order_id": "ORD-003", "customer_id": "C001", "amount": 29.99,
         "status": "pending", "created_at": datetime(2024, 1, 16)},
    ])
class TestOrderPipelineIntegration:
    """Integration tests for the order processing pipeline.

    Uses SQLAlchemy 2.x style: raw SQL is wrapped in sqlalchemy.text() and run
    on a connection, because Engine.execute() was removed in SQLAlchemy 2.0.
    """

    ORDERS_DDL = """
        CREATE TABLE IF NOT EXISTS orders (
            order_id VARCHAR PRIMARY KEY,
            customer_id VARCHAR NOT NULL,
            amount DECIMAL(10,2) NOT NULL,
            status VARCHAR NOT NULL,
            created_at TIMESTAMP NOT NULL
        )
    """

    def test_table_creation(self, engine):
        """Test that pipeline creates expected tables."""
        with engine.begin() as conn:
            conn.execute(sqlalchemy.text(self.ORDERS_DDL))
            result = conn.execute(sqlalchemy.text(
                "SELECT table_name FROM information_schema.tables "
                "WHERE table_name = 'orders'"
            )).fetchone()
        assert result is not None

    def test_data_load(self, engine, sample_data):
        """Test that pipeline loads data correctly."""
        sample_data.to_sql("orders", engine, if_exists="replace", index=False)
        loaded = pd.read_sql("SELECT * FROM orders", engine)
        assert len(loaded) == 3
        assert set(loaded.columns) == {"order_id", "customer_id", "amount", "status", "created_at"}

    def test_aggregation_query(self, engine, sample_data):
        """Test aggregation logic against real database."""
        sample_data.to_sql("orders", engine, if_exists="replace", index=False)
        result = pd.read_sql("""
            SELECT customer_id, SUM(amount) AS total_spent, COUNT(*) AS order_count
            FROM orders
            GROUP BY customer_id
            ORDER BY total_spent DESC
        """, engine)
        assert len(result) == 2
        # C002 (149.50) ranks above C001 (99.99 + 29.99 = 129.98)
        assert result.iloc[0]["customer_id"] == "C002"
        c001 = result.set_index("customer_id").loc["C001"]
        assert c001["total_spent"] == pytest.approx(129.98)
        assert c001["order_count"] == 2

    def test_upsert_idempotency(self, engine, sample_data):
        """Test that upsert produces same result on repeated runs."""
        upsert = sqlalchemy.text("""
            INSERT INTO orders (order_id, customer_id, amount, status, created_at)
            VALUES (:order_id, :customer_id, :amount, :status, :created_at)
            ON CONFLICT (order_id)
            DO UPDATE SET amount = EXCLUDED.amount
        """)
        rows = sample_data.to_dict(orient="records")
        with engine.begin() as conn:
            # ON CONFLICT requires a unique constraint, which to_sql() does not
            # create, so rebuild the table with its primary key first.
            conn.execute(sqlalchemy.text("DROP TABLE IF EXISTS orders"))
            conn.execute(sqlalchemy.text(self.ORDERS_DDL))
            for _ in range(2):  # same batch twice: the second run must not duplicate
                conn.execute(upsert, rows)
        loaded = pd.read_sql("SELECT COUNT(*) AS cnt FROM orders", engine)
        assert loaded.iloc[0]["cnt"] == 3  # No duplicates
Contract Test with a Schema Contract
import pandas as pd
import pytest

# Contract checks are written as plain pandas assertions for clarity;
# Great Expectations or Pandera can express the same rules declaratively.


def create_orders_contract():
    """Define schema contract for orders data."""
    return {
        "columns": {
            "order_id": {"type": "string", "nullable": False, "unique": True},
            "customer_id": {"type": "string", "nullable": False},
            "amount": {"type": "float64", "nullable": False, "min": 0.01, "max": 1000000},
            "status": {"type": "string", "nullable": False,
                       "allowed_values": ["pending", "processing", "completed", "cancelled"]},
            "created_at": {"type": "datetime64", "nullable": False},
        },
        "row_count": {"min": 1, "max": 10000000},
    }


# Maps contract type names to pandas dtype predicates.
TYPE_CHECKS = {
    "string": lambda s: s.dtype == object or pd.api.types.is_string_dtype(s),
    "float64": pd.api.types.is_float_dtype,
    "datetime64": pd.api.types.is_datetime64_any_dtype,
}


class TestOrdersContract:
    """Contract tests for orders dataset."""

    @pytest.fixture
    def valid_orders(self):
        return pd.DataFrame([
            {"order_id": "ORD-001", "customer_id": "C001", "amount": 99.99,
             "status": "completed", "created_at": pd.Timestamp("2024-01-15")},
        ])

    @pytest.fixture
    def invalid_orders(self):
        return pd.DataFrame([
            {"order_id": "ORD-001", "customer_id": "C001", "amount": -10.0,
             "status": "invalid_status", "created_at": pd.Timestamp("2024-01-15")},
        ])

    def test_schema_compliance(self, valid_orders):
        contract = create_orders_contract()
        for col, spec in contract["columns"].items():
            assert col in valid_orders.columns, f"Missing column: {col}"
            assert TYPE_CHECKS[spec["type"]](valid_orders[col]), f"Wrong type for column: {col}"

    def test_not_null_constraints(self, valid_orders):
        contract = create_orders_contract()
        for col, spec in contract["columns"].items():
            if not spec.get("nullable", True):
                assert valid_orders[col].isnull().sum() == 0, f"Null values in non-nullable column: {col}"

    def test_unique_constraints(self, valid_orders):
        assert valid_orders["order_id"].is_unique, "order_id has duplicates"

    def test_value_constraints(self, valid_orders):
        columns = create_orders_contract()["columns"]
        amount = columns["amount"]
        assert valid_orders["amount"].between(amount["min"], amount["max"]).all()
        assert valid_orders["status"].isin(columns["status"]["allowed_values"]).all()

    def test_row_count(self, valid_orders):
        bounds = create_orders_contract()["row_count"]
        assert bounds["min"] <= len(valid_orders) <= bounds["max"]

    def test_invalid_status_rejected(self, invalid_orders):
        contract = create_orders_contract()
        allowed = contract["columns"]["status"]["allowed_values"]
        invalid = invalid_orders[~invalid_orders["status"].isin(allowed)]
        assert len(invalid) > 0, "Expected invalid status to be detected"

    def test_negative_amount_rejected(self, invalid_orders):
        min_amount = create_orders_contract()["columns"]["amount"]["min"]
        assert (invalid_orders["amount"] < min_amount).any(), "Expected out-of-range amount to be detected"
Test Data Management: Use pytest fixtures with scope="session" for expensive setup (database containers, large datasets) and scope="function" for fast, isolated tests. Use the Faker library for realistic mock data and factory_boy for complex object factories. Avoid depending on production data for tests; always use synthetic data.
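A sketch of a seeded synthetic-data factory using only the standard library's `random` module in place of Faker or factory_boy, which would generate more realistic values. `make_orders` is a hypothetical helper whose output is shaped to satisfy the orders contract used in this article. Seeding a local RNG makes every run produce identical data, so failures stay reproducible.

```python
import random
from datetime import datetime, timedelta

STATUSES = ["pending", "processing", "completed", "cancelled"]


def make_orders(n: int, seed: int = 42) -> list:
    """Generate n synthetic orders that satisfy the orders contract."""
    rng = random.Random(seed)  # local RNG: does not disturb global random state
    start = datetime(2024, 1, 1)
    return [
        {
            "order_id": f"ORD-{i:06d}",
            "customer_id": f"C{rng.randint(1, 50):03d}",
            "amount": round(rng.uniform(0.01, 1000.0), 2),
            "status": rng.choice(STATUSES),
            "created_at": start + timedelta(minutes=rng.randint(0, 60 * 24 * 30)),
        }
        for i in range(n)
    ]
```

Wrapped in a `@pytest.fixture(scope="session")`, the factory runs once per test session, matching the scope advice above for larger datasets.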
CI/CD Pipeline Configuration: Run unit tests on every commit (fast feedback, < 2 min). Run integration tests on pull requests (medium feedback, < 5 min). Run full regression tests on merge to main (comprehensive, < 10 min). Block merges if any test fails. Track coverage trends and fail if coverage drops below threshold.
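One way to wire these tiers into CI is a small helper that maps the triggering event to the pytest arguments for that run. The event names, the `integration` and `regression` markers, and the `src` coverage path are assumptions for illustration; `--cov` and `--cov-fail-under` are pytest-cov options. GitHub Actions or GitLab CI would pass the event in through their own variables.

```python
# Marker expression per CI trigger; each tier runs more than the one before it.
TIERS = {
    "push": "not integration and not regression",  # unit (and other fast tests)
    "pull_request": "not regression",              # adds integration tests
    "merge_main": "",                              # everything, incl. regression
}


def ci_pytest_args(event: str, min_coverage: int = 90) -> list:
    """Build pytest arguments for a CI event; unknown events fail loudly."""
    if event not in TIERS:
        raise ValueError(f"unknown CI event: {event}")
    args = ["--cov=src", f"--cov-fail-under={min_coverage}"]
    marker_expr = TIERS[event]
    if marker_expr:
        args += ["-m", marker_expr]
    return args
```

A CI step could then call `subprocess.run(["pytest", *ci_pytest_args(event)], check=True)`, so a failing test or a coverage drop below the threshold fails the job and blocks the merge.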
Summary
- Data testing follows a pyramid: many unit tests (fast), fewer integration tests (medium), and the fewest E2E tests (slow).
- Unit tests verify individual transformation functions in isolation using controlled inputs.
- Integration tests verify pipeline stages work together using Testcontainers for real dependencies.
- Contract tests verify schema, types, and quality guarantees match agreed-upon specifications.
- Regression tests compare current output against golden datasets to detect unintended changes.
- Maintain test data independently using fixtures, Faker, and factory_boy.
- CI/CD should run unit tests on every commit, integration on PRs, and regression on merge.
- Target > 90% line coverage overall and 100% coverage for critical-path transformations.
Best Practices
- Follow the testing pyramid: many unit tests (fast), fewer integration tests (medium), and the fewest E2E tests (slow).
- Test every transformation function with normal cases, edge cases, null inputs, and error conditions.
- Use pytest fixtures with appropriate scope (session, module, function) for test data and resource management.
- Implement contract tests at every pipeline boundary to catch schema evolution before it breaks consumers.
- Maintain golden datasets for regression testing. Update only after verifying new output is correct.
- Use Testcontainers for integration tests that require real databases, message queues, or file systems.
- Run tests in CI/CD: unit on every commit, integration on PRs, full regression on merge to main.
- Track test coverage and fail PRs that decrease coverage below threshold (target > 90% line coverage).
- Quarantine flaky tests with a dedicated marker (for example, a custom @pytest.mark.quarantine) and fix them separately. Never ignore flaky tests.
- Review test quality in code reviews. Tests should catch real bugs, not just check boxes.
Testing Tool Comparison
| Feature | pytest | Great Expectations | Pandera | dbt Tests | Custom |
|---|---|---|---|---|---|
| Test Type | Unit/Integration | Contract/Quality | Contract | Contract | Any |
| Speed | Fast | Medium | Fast | Medium | Varies |
| Fixtures | Yes | Limited | Yes | Limited | Yes |
| Mocking | Yes | Limited | Limited | No | Yes |
| CI/CD | Native | Yes | Yes | Yes | Custom |
| Coverage | Yes | No | No | No | Custom |
| Best For | Python code | Data quality | DataFrame schemas | SQL models | Domain-specific |
Test Coverage Targets
| Code Category | Target Coverage | Rationale |
|---|---|---|
| Transformation Functions | > 95% | Core business logic |
| Data Validation | > 90% | Quality gate critical |
| Error Handling | > 85% | Resilience testing |
| Integration Points | > 80% | External dependency testing |
| Utility Functions | > 75% | Supporting code |
| Overall | > 90% | Production readiness |
See Also
- 025 - Data Quality: Validation Frameworks - Quality validation frameworks
- 028 - Error Handling, Retries, and Dead Letter Queues - Testing error scenarios
- 027 - Pipeline Monitoring and Observability - Monitoring test results
- 021 - Apache Spark: RDDs, DataFrames, and the Catalyst Optimizer - Testing Spark jobs
- 029 - Prefect: Modern Workflow Orchestration - Testing Prefect flows