Data Contracts: Schema Management & Governance
Difficulty: Senior Level | Companies: LinkedIn, Uber, Airbnb, Netflix, Spotify
1. What Are Data Contracts?
Data contracts are formal agreements between data producers and consumers that define:
- Schema (structure, types, nullability)
- Semantics (meaning, business rules)
- SLAs (freshness, completeness, availability)
ℹ️
Key Insight: Data contracts shift data quality left — problems are caught at the producer level, not after downstream dashboards break.
2. Contract Schema Definition
Avro-Based Contract
{
  "type": "record",
  "name": "UserEvents",
  "namespace": "com.company.events",
  "doc": "User events contract; ownership and SLA metadata live in the 'contract' attribute.",
  "fields": [
    {"name": "event_id", "type": "string", "doc": "Unique event identifier"},
    {"name": "user_id", "type": "long", "doc": "Internal user ID"},
    {"name": "event_type", "type": {"type": "enum", "name": "EventType", "symbols": ["CLICK", "VIEW", "PURCHASE", "SIGNUP"]}},
    {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}, "doc": "Event time as timestamp-millis; the logicalType must be nested inside the type object to take effect"},
    {"name": "properties", "type": {"type": "map", "values": "string"}, "default": {}},
    {"name": "page_url", "type": ["null", "string"], "default": null, "doc": "Optional; null when absent"}
  ],
  "contract": {
    "version": "2.1.0",
    "owner": "platform-team",
    "sla": {
      "freshness_minutes": 15,
      "completeness_threshold": 0.995,
      "nullability_policy": "page_url may be null, all others required"
    }
  }
}
Protobuf Contract
syntax = "proto3";
package company.events;

// Required for google.protobuf.Timestamp below; protoc rejects the file without it.
import "google/protobuf/timestamp.proto";

// Protobuf counterpart of the Avro UserEvents contract (same fields and enum symbols).
// Field numbers are part of the wire format: never renumber or reuse them.
message UserEvent {
  string event_id = 1;                        // Unique event identifier
  int64 user_id = 2;                          // Internal user ID
  EventType event_type = 3;
  google.protobuf.Timestamp timestamp = 4;
  map<string, string> properties = 5;
  // `optional` gives explicit presence, so an unset page_url is distinguishable from "".
  optional string page_url = 6;
}

enum EventType {
  // proto3 requires the first enum value to be zero; it serves as the "unset" value.
  EVENT_TYPE_UNSPECIFIED = 0;
  // NOTE: the protobuf style guide recommends an EVENT_TYPE_ prefix; these names
  // are kept unprefixed to match the Avro EventType symbols.
  CLICK = 1;
  VIEW = 2;
  PURCHASE = 3;
  SIGNUP = 4;
}
3. Contract Evolution Strategies
Backward vs Forward Compatibility
Architecture Diagram
Compatibility Modes:
├── Backward Compatible: New schema reads old data
├── Forward Compatible: Old schema reads new data
├── Full Compatible: Both directions work
└── Breaking: Neither direction works
Rule of thumb:
✅ ADD new fields (with defaults)
✅ Make fields optional
❌ REMOVE fields
❌ CHANGE field types
❌ RENAME fields
Schema Evolution Example
from dataclasses import dataclass
from typing import List, Optional
from enum import Enum
class CompatibilityMode(Enum):
    """Compatibility level a schema change must satisfy.

    BACKWARD -- readers using the new schema can read data written with the old one.
    FORWARD  -- readers using the old schema can read data written with the new one.
    FULL     -- both BACKWARD and FORWARD.
    NONE     -- no compatibility guarantee; every change is accepted.
    """

    BACKWARD = "backward"
    FORWARD = "forward"
    FULL = "full"
    NONE = "none"


@dataclass
class SchemaEvolutionRule:
    """Verdict for a single field-level change between two schema versions."""

    field_name: str
    old_type: str
    new_type: str
    is_compatible: bool
    reason: str  # human-readable explanation of the verdict


class ContractEvolution:
    """Validate a proposed schema change against a CompatibilityMode.

    Schemas are Avro-style dicts with a ``fields`` list. Each entry carries
    ``name`` and ``type`` and, optionally, ``default`` or a non-Avro
    ``required: False`` flag. A field with either of those is treated as
    optional, i.e. a reader can fill it in when the data does not contain it.
    """

    def __init__(self, mode: CompatibilityMode = CompatibilityMode.BACKWARD):
        self.mode = mode
        self.rules: List[SchemaEvolutionRule] = []  # not populated by validate_evolution

    @staticmethod
    def _is_optional(spec: dict) -> bool:
        """Return True if a reader can supply *spec*'s value when it is absent."""
        return 'default' in spec or not spec.get('required', True)

    def validate_evolution(self, old_schema: dict, new_schema: dict) -> List[str]:
        """Return the compatibility violations of ``old_schema -> new_schema``.

        An empty list means the change is allowed under ``self.mode``.
        Violations are reported in this order: removed fields, type changes,
        added fields.
        """
        if self.mode is CompatibilityMode.NONE:
            return []
        check_backward = self.mode in (CompatibilityMode.BACKWARD, CompatibilityMode.FULL)
        check_forward = self.mode in (CompatibilityMode.FORWARD, CompatibilityMode.FULL)

        violations: List[str] = []
        old_fields = {f['name']: f for f in old_schema.get('fields', [])}
        new_fields = {f['name']: f for f in new_schema.get('fields', [])}

        # Removed fields: readers on the new schema simply ignore the extra data
        # (fine for BACKWARD). Readers on the old schema (FORWARD) still expect
        # the field and can only cope if their schema gave it a default.
        if check_forward:
            for name, old_field in old_fields.items():
                if name not in new_fields and not self._is_optional(old_field):
                    violations.append(
                        f"REMOVED field '{name}' — violates {self.mode.value} compatibility"
                    )

        # Type changes are treated as breaking in every checked mode
        # (Avro type promotions such as int -> long are not modelled here).
        for name, old_field in old_fields.items():
            if name in new_fields:
                old_type = old_field['type']
                new_type = new_fields[name]['type']
                if old_type != new_type:
                    violations.append(f"CHANGED type of '{name}' from {old_type} to {new_type}")

        # Added fields without a default: readers on the new schema (BACKWARD)
        # cannot fill them in from old data. Old readers (FORWARD) just ignore them.
        if check_backward:
            for name, new_field in new_fields.items():
                if name not in old_fields and not self._is_optional(new_field):
                    violations.append(
                        f"ADDED required field '{name}' without default — "
                        f"new readers cannot decode old data"
                    )

        return violations
# Usage: the example schemas are defined inline so the snippet runs on its own.
old_schema = {
    "fields": [
        {"name": "event_id", "type": "string"},
        {"name": "user_id", "type": "long"},
    ]
}
new_schema = {
    "fields": [
        {"name": "event_id", "type": "string"},
        {"name": "user_id", "type": "long"},
        # Optional field with a default: a backward-compatible addition.
        {"name": "session_id", "type": ["null", "string"], "default": None},
    ]
}
evolver = ContractEvolution(CompatibilityMode.BACKWARD)
violations = evolver.validate_evolution(old_schema, new_schema)
if violations:
    print(f"Contract evolution violated: {violations}")
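⚠️
Common Interview Trap: Adding a required field without a default value breaks backward compatibility, because a consumer on the new schema has nothing to fill that field with when reading records written under the old schema. Always provide defaults for new fields.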
4. Contract Testing Framework
Producer-Side Validation
import pandera as pa
from pandera import Column, Check, DataFrameSchema
# Define contract as code: a pandera schema mirroring the UserEvents contract.
user_events_contract = DataFrameSchema(
    {
        "event_id": Column(str, unique=True, nullable=False),
        "user_id": Column(int, checks=Check.gt(0), nullable=False),
        # pandera's Column has no `isin=` keyword; membership is expressed as a Check.
        # The allowed values mirror the Avro/Protobuf EventType symbols.
        "event_type": Column(
            str,
            checks=Check.isin(["CLICK", "VIEW", "PURCHASE", "SIGNUP"]),
            nullable=False,
        ),
        "timestamp": Column("datetime64[ns]", nullable=False),
        "page_url": Column(str, nullable=True),
        # NOTE(review): confirm the installed pandera version accepts the builtin `dict` as a dtype.
        "properties": Column(dict, nullable=True),
    },
    coerce=True,  # cast columns to the declared dtypes before running checks
)
# Validate at write time
def validate_producer_output(df, contract):
    """Validate *df* against a pandera *contract* and summarise the result.

    Returns ``{"status": "passed", "rows": <row count>}`` when validation
    succeeds. Returns ``{"status": "failed", "errors": <error text>}`` when
    pandera raises ``SchemaError``.
    """
    try:
        contract.validate(df)
    except pa.errors.SchemaError as exc:
        return {"status": "failed", "errors": str(exc)}
    else:
        return {"status": "passed", "rows": len(df)}
# Schema-level contract tests
def test_schema_compliance(spark_df):
    """Contract gate to run before publishing to Kafka.

    Takes at most 1000 rows of the Spark DataFrame, converts them to pandas,
    and fails if they do not satisfy ``user_events_contract``.
    """
    sample = spark_df.limit(1000)
    outcome = validate_producer_output(sample.toPandas(), user_events_contract)
    assert outcome["status"] == "passed", f"Contract violation: {outcome['errors']}"
Consumer-Side Validation
class ContractValidator:
    """Consumer-side validation of an incoming batch against a contract.

    ``contract_schema`` keys read here (all optional):

    - ``required_columns``: column names that must be present.
    - ``column_types``: mapping of column name to the expected ``str(dtype)``.
    - ``timestamp_column``: column whose maximum drives the freshness check.
    - ``freshness_sla_minutes``: allowed data age in minutes (default 60).
    """

    def __init__(self, contract_schema: dict):
        self.schema = contract_schema

    def validate_batch(self, df, contract_version: str) -> dict:
        """Check *df* and return ``{"errors": [...], "warnings": [...], "valid": bool}``.

        Missing columns and dtype mismatches are errors and make the batch
        invalid; a freshness SLA breach is only a warning. *df* is expected to
        be pandas-like (``df.columns``, ``df[col].dtype``, ``df[col].max()``).
        ``contract_version`` is accepted but not used by these checks.
        """
        # Imported locally so this snippet runs on its own.
        from datetime import datetime

        errors = []
        warnings = []

        # Check required columns exist
        for col in self.schema.get('required_columns', []):
            if col not in df.columns:
                errors.append(f"Missing required column: {col}")

        # Check data types, compared as dtype strings
        for col, expected_type in self.schema.get('column_types', {}).items():
            if col in df.columns:
                actual_type = str(df[col].dtype)
                if actual_type != expected_type:
                    errors.append(f"Type mismatch: {col} expected {expected_type}, got {actual_type}")

        # Check freshness
        if 'timestamp_column' in self.schema:
            ts_col = self.schema['timestamp_column']
            if ts_col in df.columns:
                max_ts = df[ts_col].max()
                # Take "now" in the data's timezone: subtracting a tz-aware
                # timestamp from a naive datetime raises TypeError.
                now = datetime.now(getattr(max_ts, 'tzinfo', None))
                freshness_minutes = (now - max_ts).total_seconds() / 60
                sla_minutes = self.schema.get('freshness_sla_minutes', 60)
                if freshness_minutes > sla_minutes:
                    warnings.append(f"Data is {freshness_minutes:.0f}min old, SLA is {sla_minutes}min")

        return {"errors": errors, "warnings": warnings, "valid": len(errors) == 0}
5. Contract Registry & Discovery
Central Contract Registry
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass, field
@dataclass
class DataContract:
    """A versioned data contract as stored in ContractRegistry."""

    contract_id: str
    name: str
    version: str  # dotted version string, e.g. "2.1.0"
    owner: str
    schema: dict
    sla: dict
    created_at: datetime = field(default_factory=datetime.now)  # naive local time of creation
    status: str = "active"  # set to "deprecated" by ContractRegistry.deprecate
    consumers: List[str] = field(default_factory=list)  # downstream consumers for impact analysis
    tags: List[str] = field(default_factory=list)


class ContractRegistry:
    """In-memory contract registry keyed by ``"<name>:<version>"``, with an audit changelog."""

    def __init__(self):
        self.contracts: Dict[str, DataContract] = {}
        self.changelog: List[dict] = []

    def _log(self, action: str, key: str) -> None:
        """Append an audit entry recording *action* on contract *key*."""
        self.changelog.append({
            "action": action,
            "contract": key,
            "timestamp": datetime.now().isoformat(),
        })

    @staticmethod
    def _version_key(version: str) -> tuple:
        """Sort key for dotted versions; numeric parts compare as integers.

        Ensures "1.10.0" > "1.9.0". Non-numeric parts (e.g. "0-rc1") sort
        below numeric ones.
        """
        return tuple(
            (1, int(part), "") if part.isdigit() else (0, 0, part)
            for part in version.split(".")
        )

    def _active_versions(self, name: str) -> List[DataContract]:
        """Return all active contracts named *name*, in registration order."""
        return [c for c in self.contracts.values() if c.name == name and c.status == "active"]

    def register(self, contract: DataContract) -> str:
        """Store *contract* and return its key.

        Raises:
            ValueError: if the same name and version are already registered.
        """
        key = f"{contract.name}:{contract.version}"
        if key in self.contracts:
            raise ValueError(f"Contract {key} already registered")
        self.contracts[key] = contract
        self._log("REGISTER", key)
        return key

    def deprecate(self, name: str, version: str):
        """Mark the given contract version as deprecated.

        Raises:
            KeyError: if that name and version were never registered.
        """
        key = f"{name}:{version}"
        try:
            contract = self.contracts[key]
        except KeyError:
            raise KeyError(f"Contract {key} is not registered") from None
        contract.status = "deprecated"
        self._log("DEPRECATE", key)

    def get_active_contract(self, name: str) -> Optional[DataContract]:
        """Return the highest-versioned active contract named *name*, or None.

        Ties on version fall back to the most recent registration.
        """
        active = self._active_versions(name)
        if not active:
            return None
        ranked = max(
            enumerate(active),
            key=lambda pair: (self._version_key(pair[1].version), pair[0]),
        )
        return ranked[1]

    def impact_analysis(self, contract_name: str) -> List[str]:
        """Return the consumers of every active version of *contract_name*.

        The result is deduplicated, keeps first-seen order, and is a new list,
        so callers cannot mutate a contract's ``consumers`` through it.
        """
        seen: Dict[str, None] = {}
        for contract in self._active_versions(contract_name):
            for consumer in contract.consumers:
                seen.setdefault(consumer, None)
        return list(seen)
6. SLA Monitoring & Alerting
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable
@dataclass
class SLAConfig:
    """Thresholds used by SLAMonitor."""

    freshness_minutes: int = 60  # max allowed age of the newest event_timestamp
    completeness_threshold: float = 0.99  # min fraction of rows with no null values
    # NOTE(review): the two thresholds below are not yet read by SLAMonitor.check.
    uniqueness_threshold: float = 0.999
    null_ratio_threshold: float = 0.05


class SLAMonitor:
    """Run SLA checks on a pandas DataFrame and call ``alert_fn`` for each failed check."""

    def __init__(self, config: SLAConfig, alert_fn: Callable):
        self.config = config
        self.alert_fn = alert_fn

    def check(self, df, contract_name: str) -> dict:
        """Evaluate freshness and completeness for *df*.

        Returns a dict keyed by check name. ``'freshness'`` is present only
        when *df* has an ``event_timestamp`` column; ``'completeness'`` is
        always present. Each result carries a ``'passed'`` flag. Every failed
        check triggers ``alert_fn(contract_name, check_name, result)``.

        *df* must be pandas-like: ``df[col].max()`` must return a scalar and
        ``len(df)`` the row count.
        """
        results = {}

        # Freshness check
        if 'event_timestamp' in df.columns:
            max_ts = df['event_timestamp'].max()
            # Take "now" in the data's timezone: naive minus tz-aware raises TypeError.
            now = datetime.now(getattr(max_ts, 'tzinfo', None))
            age_minutes = (now - max_ts).total_seconds() / 60
            results['freshness'] = {
                'age_minutes': age_minutes,
                'sla_minutes': self.config.freshness_minutes,
                'passed': age_minutes <= self.config.freshness_minutes,
            }

        # Completeness check: fraction of rows without any null (0 for an empty frame).
        # len() is the row count; pandas' df.count() is a per-column Series and
        # would break both the ratio and the `> 0` test.
        total_rows = len(df)
        non_null_rows = len(df.dropna())
        completeness = non_null_rows / total_rows if total_rows > 0 else 0
        results['completeness'] = {
            'ratio': completeness,
            'threshold': self.config.completeness_threshold,
            'passed': completeness >= self.config.completeness_threshold,
        }

        # Alert on failures
        for check_name, result in results.items():
            if not result['passed']:
                self.alert_fn(contract_name, check_name, result)
        return results
7. Contract Governance Framework
ℹ️
Best Practice: Treat data contracts like API contracts: version them, enforce them in CI/CD, and deprecate old versions with migration periods.
Follow-Up Questions
- How would you implement data contracts for streaming pipelines?
- How do you handle contract violations in production?
- Design a contract registry that supports 10,000+ contracts.
- How do data contracts relate to data mesh architecture?
- How would you migrate from contract-less to contracted data?