Data Contracts: Formalizing Data Agreements
Data contracts are formal, machine-readable agreements between data producers and consumers that define the schema, quality rules, SLAs, and semantics of a dataset.
Why Data Contracts Matter
Problems Without Contracts:
- Breaking changes silently break downstream pipelines
- No formal agreement on schema or quality
- Difficult to track who depends on what
How Contracts Help:
- Catch incompatibilities before they reach consumers
- Enable safe evolution of data systems
- Formalize agreements between producers and consumers
Key Insight: Data contracts catch incompatibilities before they reach consumers, enabling safe evolution of data systems.
Architecture Overview
Contract Specification
A data contract is a versioned, machine-readable specification that defines the schema, quality requirements, SLA, and semantics of a dataset. It serves as a formal agreement between the data producer and all consumers.
```yaml
# data-contracts/orders-v2.yaml
apiVersion: v1
kind: DataContract
metadata:
  name: orders
  version: "2.1.0"
  previousVersion: "2.0.0"
  domain: sales
  owner: sales-engineering@company.com
  contact: slack:#sales-data
  description: |
    Customer orders from the e-commerce platform.
    Updated in near-real-time via Kafka CDC.
schema:
  type: struct
  fields:
    - name: order_id
      type: string
      required: true
      unique: true
      description: "Unique order identifier"
      examples: ["ORD-2025-001"]
    - name: customer_id
      type: string
      required: true
      description: "Foreign key to customers"
      references:
        table: customers
        column: customer_id
    - name: order_date
      type: timestamp
      required: true
      description: "When the order was placed"
    - name: order_status
      type: enum
      values: [pending, processing, shipped, delivered, cancelled, returned]
      required: true
      description: "Current status of the order"
    - name: line_items
      type: array
      items:
        type: struct
        fields:
          - name: product_id
            type: string
            required: true
          - name: quantity
            type: integer
            required: true
            minimum: 1
          - name: unit_price
            type: decimal(12,2)
            required: true
            minimum: 0.01
    - name: total_amount
      type: decimal(14,2)
      required: true
      minimum: 0.01
      description: "Total order amount including tax"
    - name: currency
      type: string
      required: true
      pattern: "^[A-Z]{3}$"
      description: "ISO 4217 currency code"
quality:
  level: gold
  rules:
    - type: uniqueness
      column: order_id
      severity: critical
    - type: not_null
      columns: [order_id, customer_id, order_date, order_status]
      severity: critical
    - type: freshness
      max_delay: 4h
      column: order_date
      severity: warning
    - type: volume
      min_rows_per_day: 1000
      max_rows_per_day: 1000000
      severity: warning
    - type: referential_integrity
      source_column: customer_id
      target_table: customers
      target_column: customer_id
      severity: critical
    - type: range
      column: total_amount
      min: 0.01
      max: 1000000
      severity: warning
sla:
  availability: 99.9%
  freshness: 4h
  latency_p99: 30s
  recovery_time: 1h
  notification:
    channel: slack:#sales-data-alerts
    escalation: data-platform-oncall@company.com
semantic:
  grain: "One row per order"
  measures:
    - name: total_revenue
      expression: "SUM(total_amount)"
      description: "Sum of all order amounts"
    - name: order_count
      expression: "COUNT(*)"
      description: "Total number of orders"
    - name: avg_order_value
      expression: "AVG(total_amount)"
      description: "Average order amount"
  dimensions:
    - name: order_date
      description: "Date the order was placed"
    - name: customer_id
      description: "Customer who placed the order"
    - name: order_status
      description: "Current status of the order"
tags:
  - finance
  - production
  - pii-free
  - daily-critical
```
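The `schema` section also declares per-field constraints that go beyond column presence: `values` for the `order_status` enum, a `pattern` for `currency`, and `minimum` for amounts. Below is a minimal sketch of checking those constraints row by row. It assumes the contract has already been parsed into plain dicts, as `yaml.safe_load` would produce, and that records arrive as Python dicts. `check_record` and `FIELDS` are hypothetical names, and nested fields such as `line_items` are not covered.

```python
import re
from decimal import Decimal
from typing import Any, Dict, List

# Excerpt of the orders contract's `schema.fields`, as yaml.safe_load would return it
FIELDS: List[Dict[str, Any]] = [
    {"name": "order_id", "type": "string", "required": True},
    {"name": "order_status", "type": "enum", "required": True,
     "values": ["pending", "processing", "shipped", "delivered", "cancelled", "returned"]},
    {"name": "total_amount", "type": "decimal(14,2)", "required": True, "minimum": 0.01},
    {"name": "currency", "type": "string", "required": True, "pattern": "^[A-Z]{3}$"},
]


def check_record(record: Dict[str, Any], fields: List[Dict[str, Any]]) -> List[str]:
    """Return messages for every per-field constraint the record violates."""
    problems: List[str] = []
    for spec in fields:
        name = spec["name"]
        value = record.get(name)
        if value is None:
            if spec.get("required", False):
                problems.append(f"{name}: required but missing")
            continue  # no further checks on an absent value
        if "values" in spec and value not in spec["values"]:
            problems.append(f"{name}: {value!r} not in allowed enum values")
        # fullmatch: the pattern must cover the entire value
        if "pattern" in spec and re.fullmatch(spec["pattern"], str(value)) is None:
            problems.append(f"{name}: {value!r} does not match {spec['pattern']}")
        # Compare via Decimal so string amounts like "0.00" are handled exactly
        if "minimum" in spec and Decimal(str(value)) < Decimal(str(spec["minimum"])):
            problems.append(f"{name}: {value} below minimum {spec['minimum']}")
    return problems


good = {"order_id": "ORD-2025-001", "order_status": "shipped",
        "total_amount": "59.90", "currency": "USD"}
bad = {"order_id": "ORD-2025-002", "order_status": "lost",
       "total_amount": "0.00", "currency": "usd"}
print(check_record(good, FIELDS))  # → []
print(check_record(bad, FIELDS))   # three violations: enum, minimum, pattern
```

Returning plain messages keeps the sketch small. In a fuller enforcer, each message would become a `ContractViolation` with a severity.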
Contract Enforcement
```python
# Data Contract Enforcement Engine
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Optional

import pandas as pd
import yaml


class Severity(Enum):
    CRITICAL = "critical"
    WARNING = "warning"
    INFO = "info"


@dataclass
class ContractViolation:
    rule_type: str
    severity: Severity
    message: str
    column: Optional[str] = None
    actual_value: Optional[Any] = None
    expected_value: Optional[Any] = None


@dataclass
class ValidationResult:
    contract_name: str
    contract_version: str
    passed: bool  # False if any CRITICAL violation was found
    violations: List[ContractViolation]
    checked_at: datetime
    data_sample_size: int


class ContractEnforcer:
    """Enforce a YAML data contract against a pandas DataFrame."""

    def __init__(self, contract_path: str):
        with open(contract_path) as f:
            self.contract = yaml.safe_load(f)

    def validate(self, df: pd.DataFrame) -> ValidationResult:
        """Run all contract validations against a DataFrame."""
        # Validate schema first so missing columns are reported explicitly
        violations = self._check_schema(df)

        # Validate quality rules
        for rule in self.contract.get("quality", {}).get("rules", []):
            violations.extend(self._check_rule(df, rule))

        # Only critical violations fail the contract; warnings are reported
        critical_violations = [v for v in violations if v.severity == Severity.CRITICAL]
        return ValidationResult(
            contract_name=self.contract["metadata"]["name"],
            contract_version=self.contract["metadata"]["version"],
            passed=len(critical_violations) == 0,
            violations=violations,
            checked_at=datetime.now(timezone.utc),
            data_sample_size=len(df),
        )

    def _check_rule(self, df: pd.DataFrame, rule: Dict) -> List[ContractViolation]:
        """Check a single quality rule."""
        violations = []
        rule_type = rule["type"]
        severity = Severity(rule.get("severity", "warning"))

        # A rule whose columns are absent cannot be evaluated: report it
        # instead of letting df[col] raise KeyError
        needed = rule.get("columns", [rule["column"]] if "column" in rule else [])
        missing = [c for c in needed if c not in df.columns]
        if missing:
            return [ContractViolation(
                rule_type=rule_type,
                severity=severity,
                message=f"Cannot evaluate {rule_type}: missing column(s) {missing}",
            )]

        if rule_type == "not_null":
            for col in rule["columns"]:
                null_count = int(df[col].isnull().sum())
                if null_count > 0:
                    violations.append(ContractViolation(
                        rule_type=rule_type,
                        severity=severity,
                        message=f"Null values found in {col}: {null_count}",
                        column=col,
                        actual_value=null_count,
                        expected_value=0,
                    ))

        elif rule_type == "uniqueness":
            col = rule["column"]
            dup_count = int(df[col].duplicated().sum())
            if dup_count > 0:
                violations.append(ContractViolation(
                    rule_type=rule_type,
                    severity=severity,
                    message=f"Duplicate values in {col}: {dup_count}",
                    column=col,
                    actual_value=dup_count,
                    expected_value=0,
                ))

        elif rule_type == "range":
            col = rule["column"]
            min_val = rule.get("min")
            max_val = rule.get("max")
            actual_min = df[col].min()
            actual_max = df[col].max()
            if min_val is not None and actual_min < min_val:
                violations.append(ContractViolation(
                    rule_type=rule_type,
                    severity=severity,
                    message=f"{col} below minimum: {actual_min} < {min_val}",
                    column=col,
                    actual_value=actual_min,
                    expected_value=f">= {min_val}",
                ))
            if max_val is not None and actual_max > max_val:
                violations.append(ContractViolation(
                    rule_type=rule_type,
                    severity=severity,
                    message=f"{col} above maximum: {actual_max} > {max_val}",
                    column=col,
                    actual_value=actual_max,
                    expected_value=f"<= {max_val}",
                ))

        elif rule_type == "volume":
            # Assumes df holds a single day's batch; per-day counts over a
            # longer window would require grouping by the date column
            count = len(df)
            min_rows = rule.get("min_rows_per_day", 0)
            max_rows = rule.get("max_rows_per_day", float("inf"))
            if count < min_rows:
                violations.append(ContractViolation(
                    rule_type=rule_type,
                    severity=severity,
                    message=f"Below minimum row count: {count} < {min_rows}",
                    actual_value=count,
                    expected_value=f">= {min_rows}",
                ))
            if count > max_rows:
                violations.append(ContractViolation(
                    rule_type=rule_type,
                    severity=severity,
                    message=f"Above maximum row count: {count} > {max_rows}",
                    actual_value=count,
                    expected_value=f"<= {max_rows}",
                ))

        else:
            # e.g. freshness, referential_integrity: surface rules this
            # enforcer does not implement instead of skipping them silently
            violations.append(ContractViolation(
                rule_type=rule_type,
                severity=Severity.INFO,
                message=f"Rule type not implemented: {rule_type}",
            ))

        return violations

    def _check_schema(self, df: pd.DataFrame) -> List[ContractViolation]:
        """Validate DataFrame columns against the contract's required fields."""
        violations = []
        schema = self.contract.get("schema", {})
        required_fields = [
            f["name"] for f in schema.get("fields", [])
            if f.get("required", False)
        ]
        for field_name in required_fields:
            if field_name not in df.columns:
                violations.append(ContractViolation(
                    rule_type="schema",
                    severity=Severity.CRITICAL,
                    message=f"Missing required field: {field_name}",
                    column=field_name,
                ))
        return violations


# Usage: orders_df is a pandas DataFrame loaded by your pipeline
enforcer = ContractEnforcer("data-contracts/orders-v2.yaml")
result = enforcer.validate(orders_df)

print(f"Contract: {result.contract_name} v{result.contract_version}")
print(f"Passed: {result.passed}")
print(f"Violations: {len(result.violations)}")
for violation in result.violations:
    print(f"  [{violation.severity.value}] {violation.message}")
```
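`_check_rule` evaluates `not_null`, `uniqueness`, `range`, and `volume`, but the contract also declares `freshness` and `referential_integrity` rules. The sketch below shows one way to implement both in plain Python, so it runs without pandas. Two assumptions are built in. First, freshness is taken to mean the age of the newest `order_date` relative to now. Second, durations in the contract use a `<number><unit>` format such as `30s`, `4h`, or `1d`. Referential integrity is a set-membership test against the keys of the target table. All helper names are hypothetical.

```python
import re
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, Optional, Set

# Assumed duration format, matching values like "30s", "4h", "1d" in the contract
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}


def parse_duration(text: str) -> timedelta:
    """Parse a duration such as '4h' into a timedelta."""
    match = re.fullmatch(r"(\d+)([smhd])", text.strip())
    if match is None:
        raise ValueError(f"Unsupported duration: {text!r}")
    amount, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(amount)})


def check_freshness(timestamps: Iterable[datetime], max_delay: str,
                    now: Optional[datetime] = None) -> Optional[str]:
    """Return a message if the newest timestamp is older than max_delay, else None."""
    if now is None:
        now = datetime.now(timezone.utc)  # timestamps must be timezone-aware too
    latest = max(timestamps, default=None)
    if latest is None:
        return "freshness: no rows to evaluate"
    lag = now - latest
    if lag > parse_duration(max_delay):
        return f"freshness: latest row is {lag} old (max {max_delay})"
    return None


def check_referential_integrity(values: Iterable[Optional[str]],
                                target_keys: Set[str]) -> List[str]:
    """Return distinct non-null foreign keys with no match among target_keys."""
    return sorted({v for v in values if v is not None and v not in target_keys})


now = datetime(2025, 3, 1, 12, 0, tzinfo=timezone.utc)
order_dates = [now - timedelta(hours=6), now - timedelta(hours=5)]
print(check_freshness(order_dates, "4h", now=now))
# → freshness: latest row is 5:00:00 old (max 4h)
customer_ids = ["C1", "C2", "C9", None]
print(check_referential_integrity(customer_ids, {"C1", "C2"}))  # → ['C9']
```

Null keys are skipped because the `not_null` rule already covers them. To integrate these checks into the enforcer, you would add two more branches to `_check_rule` and load the target keys from the `customers` table.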
Contract Versioning
Data contracts follow semantic versioning: MAJOR for breaking changes, MINOR for backward-compatible additions, and PATCH for fixes. Breaking changes require consumers to migrate before the new version is deployed.
| Version Change | Type | Consumer Impact | Deployment |
|---|---|---|---|
| MAJOR (2.0.0 -> 3.0.0) | Breaking | Must migrate | Coordinated rollout |
| MINOR (2.0.0 -> 2.1.0) | Compatible | Optional adoption | Independent |
| PATCH (2.0.0 -> 2.0.1) | Fix | None | Automatic |
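The version arithmetic in the table is mechanical. The sketch below computes it, assuming plain `MAJOR.MINOR.PATCH` strings with no pre-release or build suffixes. `bump_version` is a hypothetical helper.

```python
def bump_version(version: str, change: str) -> str:
    """Return the next MAJOR.MINOR.PATCH version for a 'major', 'minor', or 'patch' change."""
    major, minor, patch = (int(part) for part in version.split("."))
    if change == "major":
        return f"{major + 1}.0.0"  # breaking: reset MINOR and PATCH
    if change == "minor":
        return f"{major}.{minor + 1}.0"  # compatible addition: reset PATCH
    if change == "patch":
        return f"{major}.{minor}.{patch + 1}"
    raise ValueError(f"Unknown change type: {change!r}")


print(bump_version("2.0.0", "major"))  # → 3.0.0
print(bump_version("2.0.0", "minor"))  # → 2.1.0
print(bump_version("2.0.0", "patch"))  # → 2.0.1
```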
```yaml
# Breaking changes (MAJOR version bump)
breaking_changes:
  - type: field_removed
    field: legacy_status
    migration: "Map to order_status enum"
    deadline: "2025-06-01"
  - type: field_retyped
    field: order_id
    from: integer
    to: string
    migration: "CAST to string"
    deadline: "2025-06-01"
  - type: field_renamed
    old_field: customer_id
    new_field: buyer_id
    migration: "Use buyer_id"
    deadline: "2025-06-01"

# Backward-compatible changes (MINOR version bump)
compatible_changes:
  - type: field_added
    field: shipping_address
    field_type: struct  # data type of the new field, separate from the change `type`
    required: false
  - type: enum_value_added
    field: order_status
    new_value: "on_hold"  # compatible only if consumers tolerate unknown enum values
```
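Most of these change types can be detected by diffing the `schema.fields` of two contract versions, which is the kind of check a CI job could run on every pull request. The sketch below does this under several assumptions. Fields are given as a name-to-spec mapping. A rename is indistinguishable from a removal plus an addition, so it still forces a MAJOR bump. Adding a *required* field and removing an enum value are both treated as breaking, which the changelog above does not cover. `diff_schemas` is a hypothetical name.

```python
from typing import Any, Dict, List, Tuple

Fields = Dict[str, Dict[str, Any]]  # field name -> field spec from schema.fields


def diff_schemas(old: Fields, new: Fields) -> Tuple[str, List[Dict[str, Any]]]:
    """Classify changes between two schemas as 'major', 'minor', or 'patch'."""
    breaking: List[Dict[str, Any]] = []
    compatible: List[Dict[str, Any]] = []

    for name, spec in old.items():
        if name not in new:
            # A rename also lands here: removal of the old name plus an addition
            breaking.append({"type": "field_removed", "field": name})
            continue
        new_spec = new[name]
        if new_spec.get("type") != spec.get("type"):
            breaking.append({"type": "field_retyped", "field": name,
                             "from": spec.get("type"), "to": new_spec.get("type")})
        old_values = set(spec.get("values", []))
        new_values = set(new_spec.get("values", []))
        for value in sorted(old_values - new_values):
            breaking.append({"type": "enum_value_removed", "field": name, "value": value})
        for value in sorted(new_values - old_values):
            compatible.append({"type": "enum_value_added", "field": name, "new_value": value})

    for name, spec in new.items():
        if name not in old:
            entry = {"type": "field_added", "field": name}
            # Assumption: a new required field breaks producers and existing rows
            if spec.get("required", False):
                breaking.append(entry)
            else:
                compatible.append(entry)

    if breaking:
        return "major", breaking + compatible
    if compatible:
        return "minor", compatible
    return "patch", []


v2 = {
    "order_id": {"type": "string", "required": True},
    "customer_id": {"type": "string", "required": True},
    "order_status": {"type": "enum", "values": ["pending", "shipped"]},
}
v3 = {
    "order_id": {"type": "string", "required": True},
    "buyer_id": {"type": "string", "required": True},
    "order_status": {"type": "enum", "values": ["pending", "shipped", "on_hold"]},
    "shipping_address": {"type": "struct", "required": False},
}
level, changes = diff_schemas(v2, v3)
print(level)  # → major
for change in changes:
    print(change)
```

The diff only sees structure. Quality rules, SLAs, and semantic definitions would each need their own comparison before a version bump could be decided automatically.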
Key Concepts Summary
| Component | Description | Purpose | Enforcement |
|---|---|---|---|
| Schema Contract | Field names, types, constraints | Structural compatibility | Pre-commit, CI |
| Quality Contract | Freshness, completeness, accuracy | Data quality guarantees | Runtime monitoring |
| SLA Contract | Availability, latency, recovery | Reliability guarantees | Monitoring, alerting |
| Semantic Contract | Grain, measures, dimensions | Business meaning | Documentation |
| Version Contract | Breaking vs compatible changes | Safe evolution | CI/CD checks |
| Ownership Contract | Team responsibility | Accountability | Governance |
| Lineage Contract | Source and target dependencies | Impact analysis | Catalog integration |
| Cost Contract | Compute and storage budgets | Cost management | FinOps monitoring |
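The table lists the Semantic Contract as documentation. However, the orders contract writes its measures as SQL expressions, so they can also be compiled into queries. That way, consumers reuse the producer's definitions instead of re-deriving them. The sketch below uses the standard-library `sqlite3` module for the demo. `build_query` is a hypothetical helper, and it assumes each measure expression is valid SQL in the target dialect.

```python
import sqlite3
from typing import Any, Dict, List

# Excerpt of the orders contract's `semantic` section as plain dicts
SEMANTIC: Dict[str, Any] = {
    "measures": [
        {"name": "total_revenue", "expression": "SUM(total_amount)"},
        {"name": "order_count", "expression": "COUNT(*)"},
        {"name": "avg_order_value", "expression": "AVG(total_amount)"},
    ],
    "dimensions": [{"name": "order_date"}, {"name": "customer_id"}, {"name": "order_status"}],
}


def build_query(semantic: Dict[str, Any], table: str,
                measures: List[str], dimensions: List[str]) -> str:
    """Compile contract-defined measures and dimensions into a GROUP BY query."""
    expressions = {m["name"]: m["expression"] for m in semantic["measures"]}
    allowed_dims = {d["name"] for d in semantic["dimensions"]}
    unknown = [m for m in measures if m not in expressions]
    unknown += [d for d in dimensions if d not in allowed_dims]
    if unknown:
        raise ValueError(f"Not defined in the semantic contract: {unknown}")
    columns = dimensions + [f"{expressions[m]} AS {m}" for m in measures]
    sql = f"SELECT {', '.join(columns)} FROM {table}"
    if dimensions:
        sql += f" GROUP BY {', '.join(dimensions)}"
    return sql


# Demo against an in-memory SQLite table with a few orders
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id TEXT, customer_id TEXT, "
             "order_status TEXT, total_amount REAL)")
conn.executemany("INSERT INTO orders VALUES (?, ?, ?, ?)", [
    ("ORD-1", "C1", "shipped", 100.0),
    ("ORD-2", "C1", "shipped", 50.0),
    ("ORD-3", "C2", "pending", 30.0),
])
sql = build_query(SEMANTIC, "orders", ["total_revenue", "order_count"], ["order_status"])
rows = conn.execute(sql + " ORDER BY order_status").fetchall()
print(sql)
print(rows)  # → [('pending', 30.0, 1), ('shipped', 150.0, 2)]
```

Measure and dimension names are checked against the contract before they are interpolated into SQL. The `table` argument is trusted as-is, so it should come from configuration rather than user input.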
Performance Metrics
| Metric | Without Contracts | With Contracts | Target |
|---|---|---|---|
| Breaking Changes to Prod | Monthly | Never | 0 |
| Pipeline Failures (Schema) | 30% | < 2% | 0% |
| Data Quality Incidents | Weekly | Monthly | < 1/month |
| Consumer Trust Score | 60% | 95% | > 90% |
| Schema Change Lead Time | None (surprise) | 2 weeks (planned) | 2+ weeks |
| Time to Detect Breaking Change | Hours-Days | Minutes (CI) | < 5 minutes |
| Cross-Team Communication | Ad-hoc | Automated | Automated |
| Documentation Currency | Outdated | Always current | Current |
10 Best Practices
- Version all contracts using semantic versioning: enforce breaking-change reviews
- Store contracts in Git alongside code: track changes with PRs and reviews
- Automate contract validation in CI/CD: block deployments that violate contracts
- Define quality rules with severity: critical rules block, warnings alert
- Include SLAs in contracts: define availability, freshness, and latency requirements
- Document semantic meaning: grain, measures, and dimensions prevent misinterpretation
- Require consumer sign-off for breaking changes: coordinate migration timelines
- Implement runtime validation: catch contract violations in production pipelines
- Integrate with data catalog: contracts should be discoverable and searchable
- Use contract testing: verify contracts against real data before publishing
Key Takeaways
- Data contracts formalize agreements between producers and consumers
- Schema, quality, and SLA contracts catch issues before they reach consumers
- Semantic versioning enables safe evolution of data interfaces
- Automated enforcement in CI/CD prevents breaking changes from reaching production
- Contracts build trust by making data expectations explicit and machine-readable
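Automated CI/CD enforcement and severity levels ("critical rules block, warnings alert") combine into a small gate. The gate reports every violation but fails the build only on critical ones. The sketch below assumes violations arrive as dicts with `severity` and `message` keys. With the enforcer above, you could build these from `result.violations`. `gate` and the sample findings are hypothetical.

```python
import sys
from typing import Dict, List


def gate(violations: List[Dict[str, str]]) -> int:
    """Report all violations; return exit code 1 if any is critical, else 0."""
    has_critical = False
    for v in violations:
        is_critical = v["severity"] == "critical"
        has_critical = has_critical or is_critical
        # Critical findings go to stderr; warnings and info go to stdout
        print(f"[{v['severity']}] {v['message']}",
              file=sys.stderr if is_critical else sys.stdout)
    return 1 if has_critical else 0


# Hypothetical findings; in a pipeline these would come from the enforcer
findings = [
    {"severity": "warning", "message": "Below minimum row count: 850 < 1000"},
    {"severity": "critical", "message": "Missing required field: order_id"},
]
exit_code = gate(findings)
```

In a CI step, ending the validation script with `sys.exit(gate(findings))` makes the job fail on critical violations while warnings still appear in the log.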
See Also
- Data Governance & Catalog: catalog integration for contract discovery
- Data Mesh Architecture: contracts in domain-oriented architectures
- CI/CD for Data Pipelines: contract validation in CI
- dbt Advanced: testing and schema validation
- Data Security & Compliance: security requirements in contracts
- Portfolio Projects: blog post topic on data contracts