
Data Contracts: Formal Agreements for Data Quality


Data Contracts: Formalizing Data Agreements

Data contracts are formal, machine-readable agreements between data producers and consumers that define schema, quality, SLAs, and semantics of datasets.

Why Data Contracts Matter


Problems Without Contracts:

  • Breaking changes silently break downstream pipelines
  • No formal agreement on schema or quality
  • Difficult to track who depends on what

How Contracts Help:

  1. Catch incompatibilities before they reach consumers
  2. Enable safe evolution of data systems
  3. Formalize agreements between producers and consumers

Key Insight: Data contracts catch incompatibilities before they reach consumers, enabling safe evolution of data systems.


Architecture Overview

Figure: Data Contracts Flow. Producer (domain team, data pipeline, schema owner, SLA commitment) → Contract Spec (YAML / JSON with schema definition, quality rules, and SLA requirements; Git versioned) → CI Validation (schema check, quality tests, SLA monitoring, breaking-change detection; block on failure) → Enforcement (runtime validation, alerting, monitoring, auto-rollback) → Consumer (analytics team, ML pipelines, dashboards, APIs).


Contract Specification

Figure: Contract Specification Structure. Panels: Metadata (name, version, domain, owner, contact, description), Schema (fields with types, constraints, references, and descriptions), Quality Rules (uniqueness, not-null, freshness, volume, and range checks, each with a critical or warning severity), SLA Contract (availability, freshness, p99 latency, recovery time, alert channels), and Semantic (grain, measures, dimensions, tags).

A data contract is a versioned, machine-readable specification that defines the schema, quality requirements, SLA, and semantics of a dataset. It serves as a formal agreement between the data producer and all consumers.

# data-contracts/orders-v2.yaml
apiVersion: v1
kind: DataContract
metadata:
  name: orders
  version: "2.1.0"
  previousVersion: "2.0.0"
  domain: sales
  owner: sales-engineering@company.com
  contact: slack:#sales-data
  description: |
    Customer orders from the e-commerce platform.
    Updated in near-real-time via Kafka CDC.

schema:
  type: struct
  fields:
    - name: order_id
      type: string
      required: true
      unique: true
      description: "Unique order identifier"
      examples: ["ORD-2025-001"]

    - name: customer_id
      type: string
      required: true
      description: "Foreign key to customers"
      references:
        table: customers
        column: customer_id

    - name: order_date
      type: timestamp
      required: true
      description: "When the order was placed"

    - name: order_status
      type: enum
      values: [pending, processing, shipped, delivered, cancelled, returned]
      required: true
      description: "Current status of the order"

    - name: line_items
      type: array
      items:
        type: struct
        fields:
          - name: product_id
            type: string
            required: true
          - name: quantity
            type: integer
            required: true
            minimum: 1
          - name: unit_price
            type: decimal(12,2)
            required: true
            minimum: 0.01

    - name: total_amount
      type: decimal(14,2)
      required: true
      minimum: 0.01
      description: "Total order amount including tax"

    - name: currency
      type: string
      required: true
      pattern: "^[A-Z]{3}$"
      description: "ISO 4217 currency code"

quality:
  level: gold
  rules:
    - type: uniqueness
      column: order_id
      severity: critical

    - type: not_null
      columns: [order_id, customer_id, order_date, order_status]
      severity: critical

    - type: freshness
      max_delay: 4h
      column: order_date
      severity: warning

    - type: volume
      min_rows_per_day: 1000
      max_rows_per_day: 1000000
      severity: warning

    - type: referential_integrity
      source_column: customer_id
      target_table: customers
      target_column: customer_id
      severity: critical

    - type: range
      column: total_amount
      min: 0.01
      max: 1000000
      severity: warning

sla:
  availability: 99.9%
  freshness: 4h
  latency_p99: 30s
  recovery_time: 1h
  notification:
    channel: slack:#sales-data-alerts
    escalation: data-platform-oncall@company.com

semantic:
  grain: "One row per order"
  measures:
    - name: total_revenue
      expression: "SUM(total_amount)"
      description: "Sum of all order amounts"

    - name: order_count
      expression: "COUNT(*)"
      description: "Total number of orders"

    - name: avg_order_value
      expression: "AVG(total_amount)"
      description: "Average order amount"

  dimensions:
    - name: order_date
      description: "Date the order was placed"
    - name: customer_id
      description: "Customer who placed the order"
    - name: order_status
      description: "Current status of the order"

tags:
  - finance
  - production
  - pii-free
  - daily-critical
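
To make the field constraints above concrete, here is a minimal sketch of how a validator might interpret `required`, `values`, `pattern`, and `minimum` for a single record. The `FIELDS` list copies a few fields from the contract as plain Python dicts so the example runs without a YAML parser, and `validate_record` is a hypothetical helper written for illustration, not part of any existing contract tool.

```python
import re
from decimal import Decimal
from typing import Any, Dict, List

# A few fields copied from the contract above, written as plain dicts.
FIELDS: List[Dict[str, Any]] = [
    {"name": "order_id", "type": "string", "required": True},
    {"name": "order_status", "type": "enum", "required": True,
     "values": ["pending", "processing", "shipped",
                "delivered", "cancelled", "returned"]},
    {"name": "total_amount", "type": "decimal(14,2)", "required": True,
     "minimum": 0.01},
    {"name": "currency", "type": "string", "required": True,
     "pattern": "^[A-Z]{3}$"},
]


def validate_record(record: Dict[str, Any],
                    fields: List[Dict[str, Any]]) -> List[str]:
    """Return constraint errors for one record (hypothetical helper)."""
    errors = []
    for spec in fields:
        name = spec["name"]
        value = record.get(name)
        if value is None:
            if spec.get("required", False):
                errors.append(f"{name}: required field is missing")
            continue  # nothing else to check when the value is absent
        if "values" in spec and value not in spec["values"]:
            errors.append(f"{name}: {value!r} is not an allowed value")
        if "pattern" in spec and not re.fullmatch(spec["pattern"], str(value)):
            errors.append(f"{name}: {value!r} does not match {spec['pattern']}")
        # Compare as Decimal to avoid float rounding on monetary amounts
        if "minimum" in spec and Decimal(str(value)) < Decimal(str(spec["minimum"])):
            errors.append(f"{name}: {value} is below minimum {spec['minimum']}")
    return errors


good = {"order_id": "ORD-2025-001", "order_status": "shipped",
        "total_amount": "19.99", "currency": "USD"}
bad = {"order_id": "ORD-2025-002", "order_status": "lost",
       "total_amount": "0.00", "currency": "usd"}

print(validate_record(good, FIELDS))
for error in validate_record(bad, FIELDS):
    print(error)
```

The first record produces no errors; the second fails the enum, minimum, and pattern constraints.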

Contract Enforcement

# Data Contract Enforcement Engine
import yaml
from dataclasses import dataclass
from typing import Dict, List, Optional, Any
from datetime import datetime
from enum import Enum

class Severity(Enum):
    CRITICAL = "critical"
    WARNING = "warning"
    INFO = "info"

@dataclass
class ContractViolation:
    rule_type: str
    severity: Severity
    message: str
    column: Optional[str] = None
    actual_value: Optional[Any] = None
    expected_value: Optional[Any] = None

@dataclass
class ValidationResult:
    contract_name: str
    contract_version: str
    passed: bool
    violations: List[ContractViolation]
    checked_at: datetime
    data_sample_size: int

class ContractEnforcer:
    """Enforce data contracts against actual data."""

    def __init__(self, contract_path: str):
        with open(contract_path) as f:
            self.contract = yaml.safe_load(f)

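    def validate(self, df) -> ValidationResult:
        """Run all contract validations against a DataFrame."""
        # Validate the schema first so that missing columns are reported
        # as violations rather than raising KeyError in the rule checks
        violations = self._check_schema(df)

        # Validate quality rules
        for rule in self.contract.get("quality", {}).get("rules", []):
            # Collect the columns this rule reads ("columns" or "column")
            rule_columns = rule.get("columns") or [rule.get("column")]
            missing = [c for c in rule_columns if c is not None and c not in df.columns]
            if missing:
                violations.append(ContractViolation(
                    rule_type=rule["type"],
                    severity=Severity(rule.get("severity", "warning")),
                    message=f"Rule references missing columns: {missing}"
                ))
                continue
            violations.extend(self._check_rule(df, rule))

        # Check if any critical violations
        critical_violations = [v for v in violations if v.severity == Severity.CRITICAL]

        return ValidationResult(
            contract_name=self.contract["metadata"]["name"],
            contract_version=self.contract["metadata"]["version"],
            passed=not critical_violations,
            violations=violations,
            checked_at=datetime.now(),
            data_sample_size=len(df)
        )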

    def _check_rule(self, df, rule: Dict) -> List[ContractViolation]:
        """Check a single quality rule."""
        violations = []
        rule_type = rule["type"]
        severity = Severity(rule.get("severity", "warning"))

        if rule_type == "not_null":
            for col in rule["columns"]:
                null_count = df[col].isnull().sum()
                if null_count > 0:
                    violations.append(ContractViolation(
                        rule_type=rule_type,
                        severity=severity,
                        message=f"Null values found in {col}: {null_count}",
                        column=col,
                        actual_value=null_count,
                        expected_value=0
                    ))

        elif rule_type == "uniqueness":
            col = rule["column"]
            dup_count = df[col].duplicated().sum()
            if dup_count > 0:
                violations.append(ContractViolation(
                    rule_type=rule_type,
                    severity=severity,
                    message=f"Duplicate values in {col}: {dup_count}",
                    column=col,
                    actual_value=dup_count,
                    expected_value=0
                ))

        elif rule_type == "range":
            col = rule["column"]
            min_val = rule.get("min")
            max_val = rule.get("max")
            actual_min = df[col].min()
            actual_max = df[col].max()

            if min_val is not None and actual_min < min_val:
                violations.append(ContractViolation(
                    rule_type=rule_type,
                    severity=severity,
                    message=f"{col} below minimum: {actual_min} < {min_val}",
                    column=col,
                    actual_value=actual_min,
                    expected_value=f">= {min_val}"
                ))

            if max_val is not None and actual_max > max_val:
                violations.append(ContractViolation(
                    rule_type=rule_type,
                    severity=severity,
                    message=f"{col} above maximum: {actual_max} > {max_val}",
                    column=col,
                    actual_value=actual_max,
                    expected_value=f"<= {max_val}"
                ))

        elif rule_type == "volume":
            count = len(df)
            min_rows = rule.get("min_rows_per_day", 0)
            max_rows = rule.get("max_rows_per_day", float('inf'))

            if count < min_rows:
                violations.append(ContractViolation(
                    rule_type=rule_type,
                    severity=severity,
                    message=f"Below minimum row count: {count} < {min_rows}",
                    actual_value=count,
                    expected_value=f">= {min_rows}"
                ))

            if count > max_rows:
                violations.append(ContractViolation(
                    rule_type=rule_type,
                    severity=severity,
                    message=f"Above maximum row count: {count} > {max_rows}",
                    actual_value=count,
                    expected_value=f"<= {max_rows}"
                ))

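        # Rule types without a branch above (for example freshness and
        # referential_integrity in the sample contract) are not checked here
        return violations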

    def _check_schema(self, df) -> List[ContractViolation]:
        """Validate DataFrame schema against contract."""
        violations = []
        schema = self.contract.get("schema", {})
        required_fields = [
            f["name"] for f in schema.get("fields", [])
            if f.get("required", False)
        ]

        for field_name in required_fields:
            if field_name not in df.columns:
                violations.append(ContractViolation(
                    rule_type="schema",
                    severity=Severity.CRITICAL,
                    message=f"Missing required field: {field_name}",
                    column=field_name
                ))

        return violations

# Usage (assumes orders_df is a pandas DataFrame of orders loaded elsewhere)
enforcer = ContractEnforcer("data-contracts/orders-v2.yaml")
result = enforcer.validate(orders_df)

print(f"Contract: {result.contract_name} v{result.contract_version}")
print(f"Passed: {result.passed}")
print(f"Violations: {len(result.violations)}")

for violation in result.violations:
    print(f"  [{violation.severity.value}] {violation.message}")
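
The enforcer above has branches for `not_null`, `uniqueness`, `range`, and `volume`, while the sample contract also declares a `freshness` rule with `max_delay: 4h`. The following is a minimal sketch of how such a rule might be evaluated using only the standard library. `parse_duration` and `check_freshness` are hypothetical helpers, and the duration syntax (an integer followed by `s`, `m`, `h`, or `d`) is an assumption based on the values `4h`, `30s`, and `1h` that appear in the contract.

```python
import re
from datetime import datetime, timedelta, timezone
from typing import Iterable, Optional

# Maps the assumed unit suffixes to timedelta keyword arguments.
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}


def parse_duration(text: str) -> timedelta:
    """Parse strings such as '4h' or '30s' (assumed syntax: <integer><s|m|h|d>)."""
    match = re.fullmatch(r"(\d+)([smhd])", text.strip())
    if match is None:
        raise ValueError(f"Unsupported duration: {text!r}")
    amount, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(amount)})


def check_freshness(timestamps: Iterable[datetime], max_delay: str,
                    now: Optional[datetime] = None) -> Optional[str]:
    """Return a violation message if the newest timestamp is older than max_delay."""
    now = now or datetime.now(timezone.utc)
    newest = max(timestamps, default=None)
    if newest is None:
        return "No rows available to assess freshness"
    delay = now - newest
    if delay > parse_duration(max_delay):
        return f"Data is stale: newest record is {delay} old, limit is {max_delay}"
    return None


# A fixed "now" keeps the example deterministic.
now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
fresh = [now - timedelta(hours=1), now - timedelta(hours=6)]
stale = [now - timedelta(hours=5), now - timedelta(hours=9)]

print(check_freshness(fresh, "4h", now=now))
print(check_freshness(stale, "4h", now=now))
```

In the enforcer, a `freshness` branch could pass the values of the rule's `column` to a function like this and wrap a non-empty message in a `ContractViolation`.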

Contract Versioning

Data contracts follow semantic versioning: a MAJOR bump signals breaking changes, a MINOR bump signals backward-compatible additions, and a PATCH bump signals fixes. Consumers must migrate before a breaking change is deployed.

| Version Change | Type | Consumer Impact | Deployment |
|---|---|---|---|
| MAJOR (2.0 -> 3.0) | Breaking | Must migrate | Coordinated rollout |
| MINOR (2.0 -> 2.1) | Compatible | Optional adoption | Independent |
| PATCH (2.0.0 -> 2.0.1) | Fix | None | Automatic |

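
The version changes listed above amount to a simple rule on version numbers. As a sketch, the hypothetical `classify_bump` helper below compares two `MAJOR.MINOR.PATCH` strings and reports which kind of change applies. It assumes plain three-part numeric versions with no pre-release or build suffixes.

```python
from typing import Tuple


def parse_version(version: str) -> Tuple[int, int, int]:
    """Split 'MAJOR.MINOR.PATCH' into integers so 2.10.0 sorts after 2.9.0."""
    major, minor, patch = (int(part) for part in version.split("."))
    return major, minor, patch


def classify_bump(old: str, new: str) -> str:
    """Return 'MAJOR', 'MINOR', or 'PATCH' for a change from old to new."""
    old_v, new_v = parse_version(old), parse_version(new)
    if new_v <= old_v:
        raise ValueError(f"{new} is not newer than {old}")
    if new_v[0] != old_v[0]:
        return "MAJOR"
    if new_v[1] != old_v[1]:
        return "MINOR"
    return "PATCH"


def requires_consumer_migration(old: str, new: str) -> bool:
    """Only a MAJOR bump forces consumers to migrate before deployment."""
    return classify_bump(old, new) == "MAJOR"


for old, new in [("2.0.0", "3.0.0"), ("2.0.0", "2.1.0"), ("2.0.0", "2.0.1")]:
    print(old, "->", new, classify_bump(old, new),
          "migration required:", requires_consumer_migration(old, new))
```
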
# Breaking changes (MAJOR version bump)
breaking_changes:
  - type: field_removed
    field: legacy_status
    migration: "Map to order_status enum"
    deadline: "2025-06-01"

  - type: field_retyped
    field: order_id
    from: integer
    to: string
    migration: "CAST to string"
    deadline: "2025-06-01"

  - type: field_renamed
    old_field: customer_id
    new_field: buyer_id
    migration: "Use buyer_id"
    deadline: "2025-06-01"

# Backward-compatible changes (MINOR version bump)
compatible_changes:
  - type: field_added
    field: shipping_address
    field_type: struct  # renamed from a second "type" key, which would be a duplicate mapping key
    required: false

  - type: enum_value_added
    field: order_status
    new_value: "on_hold"
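
Change lists like the ones above could in principle be derived by diffing two versions of a contract's schema. The sketch below shows one hypothetical way to do that: `diff_fields` compares two `{field name: type}` mappings and sorts the differences into breaking and compatible changes, reusing the change type names from the YAML. Renames are not detected (a rename shows up as one removal plus one addition), and `field_type` is an illustrative key name.

```python
from typing import Dict, List, Tuple

Change = Dict[str, str]


def diff_fields(old: Dict[str, str],
                new: Dict[str, str]) -> Tuple[List[Change], List[Change]]:
    """Split differences between two {field: type} mappings into
    (breaking, compatible) change lists."""
    breaking: List[Change] = []
    compatible: List[Change] = []
    for name, old_type in old.items():
        if name not in new:
            breaking.append({"type": "field_removed", "field": name})
        elif new[name] != old_type:
            breaking.append({"type": "field_retyped", "field": name,
                             "from": old_type, "to": new[name]})
    for name, new_type in new.items():
        if name not in old:
            compatible.append({"type": "field_added", "field": name,
                               "field_type": new_type})
    return breaking, compatible


def required_bump(breaking: List[Change], compatible: List[Change]) -> str:
    """Any breaking change needs MAJOR; additions alone need MINOR."""
    if breaking:
        return "MAJOR"
    if compatible:
        return "MINOR"
    return "PATCH"


v1 = {"order_id": "integer", "legacy_status": "string", "order_date": "timestamp"}
v2 = {"order_id": "string", "order_date": "timestamp", "shipping_address": "struct"}

breaking, compatible = diff_fields(v1, v2)
print(required_bump(breaking, compatible))
for change in breaking + compatible:
    print(change)
```

A CI job could run a diff like this against the previous contract version and reject a pull request whose declared version bump is smaller than the one the diff requires.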

Key Concepts Summary

| Component | Description | Purpose | Enforcement |
|---|---|---|---|
| Schema Contract | Field names, types, constraints | Structural compatibility | Pre-commit, CI |
| Quality Contract | Freshness, completeness, accuracy | Data quality guarantees | Runtime monitoring |
| SLA Contract | Availability, latency, recovery | Reliability guarantees | Monitoring, alerting |
| Semantic Contract | Grain, measures, dimensions | Business meaning | Documentation |
| Version Contract | Breaking vs compatible changes | Safe evolution | CI/CD checks |
| Ownership Contract | Team responsibility | Accountability | Governance |
| Lineage Contract | Source and target dependencies | Impact analysis | Catalog integration |
| Cost Contract | Compute and storage budgets | Cost management | FinOps monitoring |

Performance Metrics

| Metric | Without Contracts | With Contracts | Target |
|---|---|---|---|
| Breaking Changes to Prod | Monthly | Never | 0 |
| Pipeline Failures (Schema) | 30% | < 2% | 0% |
| Data Quality Incidents | Weekly | Monthly | < 1/month |
| Consumer Trust Score | 60% | 95% | > 90% |
| Schema Change Lead Time | None (surprise) | 2 weeks (planned) | 2+ weeks |
| Time to Detect Breaking Change | Hours-Days | Minutes (CI) | < 5 minutes |
| Cross-Team Communication | Ad-hoc | Automated | Automated |
| Documentation Currency | Outdated | Always current | Current |

10 Best Practices

  1. Version all contracts using semantic versioning: enforce breaking change reviews
  2. Store contracts in Git alongside code: track changes with PRs and reviews
  3. Automate contract validation in CI/CD: block deployments that violate contracts
  4. Define quality rules with severity: critical rules block, warnings alert
  5. Include SLAs in contracts: define availability, freshness, and latency requirements
  6. Document semantic meaning: grain, measures, and dimensions prevent misinterpretation
  7. Require consumer sign-off for breaking changes: coordinate migration timelines
  8. Implement runtime validation: catch contract violations in production pipelines
  9. Integrate with data catalog: contracts should be discoverable and searchable
  10. Use contract testing: verify contracts against real data before publishing
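
Practices 3 and 4 combine naturally into a CI gate: critical violations fail the build, while warnings are reported without blocking. The following is a minimal sketch of that rule. `Violation` and `ci_exit_code` are hypothetical names, and the sample violations are made up for illustration; in practice they would come from a contract validation step.

```python
import sys
from dataclasses import dataclass
from typing import List


@dataclass
class Violation:
    severity: str  # "critical" or "warning", as in the contract's quality rules
    message: str


def ci_exit_code(violations: List[Violation]) -> int:
    """Report every violation and return 1 if any is critical, else 0."""
    for v in violations:
        # Critical findings go to stderr so they stand out in CI logs
        stream = sys.stderr if v.severity == "critical" else sys.stdout
        print(f"[{v.severity}] {v.message}", file=stream)
    return 1 if any(v.severity == "critical" for v in violations) else 0


warnings_only = [Violation("warning", "Below minimum row count: 800 < 1000")]
with_critical = warnings_only + [
    Violation("critical", "Duplicate values in order_id: 3"),
]

print("exit code:", ci_exit_code(warnings_only))
print("exit code:", ci_exit_code(with_critical))
```

A CI script would typically pass the returned value to `sys.exit` so that a non-zero code blocks the deployment.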

  • Data contracts formalize agreements between producers and consumers
  • Schema, quality, and SLA contracts catch issues before they reach consumers
  • Semantic versioning enables safe evolution of data interfaces
  • Automated enforcement in CI/CD prevents breaking changes from reaching production
  • Contracts build trust by making data expectations explicit and machine-readable
