Model Versioning
Model versioning is the practice of tracking and managing different versions of ML models, ensuring reproducibility, accountability, and easy rollback capabilities.
Why Versioning Matters
- Reproducibility: Recreate any previous model state
- Collaboration: Multiple teams can work on models simultaneously
- Audit Trail: Complete history of model changes
- Rollback: Quickly revert to previous versions
- Comparison: Analyze performance across versions
Versioning Strategies
1. Semantic Versioning
As described in the previous module, semantic versioning uses major.minor.patch format.
2. Git-based Versioning
import git
import hashlib
import json
from datetime import datetime
class GitModelVersioner:
def __init__(self, repo_path):
self.repo = git.Repo(repo_path)
def create_version(self, model_path, metadata):
"""Create a new versioned model"""
# Calculate model hash
model_hash = self._calculate_hash(model_path)
# Create version tag
version_tag = self._create_tag(model_hash, metadata)
# Commit to repository
self.repo.index.add([model_path])
commit = self.repo.index.commit(f"Model version: {version_tag}")
return {
"version": version_tag,
"commit": commit.hexsha,
"hash": model_hash,
"timestamp": datetime.now().isoformat()
}
def _calculate_hash(self, file_path):
"""Calculate SHA-256 hash of model file"""
sha256_hash = hashlib.sha256()
with open(file_path, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
sha256_hash.update(byte_block)
return sha256_hash.hexdigest()
def _create_tag(self, model_hash, metadata):
"""Create version tag with metadata"""
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
short_hash = model_hash[:8]
return f"v{timestamp}_{short_hash}"
3. Content-addressable Storage
class ContentAddressableStore:
def __init__(self, storage_path):
self.storage_path = storage_path
self.index = {}
def store(self, content, metadata=None):
"""Store content with address based on hash"""
content_hash = self._hash_content(content)
storage_path = self._get_storage_path(content_hash)
# Store content
with open(storage_path, 'wb') as f:
f.write(content)
# Update index
self.index[content_hash] = {
"path": storage_path,
"metadata": metadata or {},
"stored_at": datetime.now().isoformat()
}
return content_hash
def retrieve(self, content_hash):
"""Retrieve content by hash"""
if content_hash not in self.index:
raise KeyError(f"Content not found: {content_hash}")
path = self.index[content_hash]["path"]
with open(path, 'rb') as f:
return f.read()
def _hash_content(self, content):
"""Calculate content hash"""
return hashlib.sha256(content).hexdigest()
def _get_storage_path(self, content_hash):
"""Generate storage path from hash"""
import os
return os.path.join(self.storage_path, content_hash[:2], content_hash[2:])
Model Registry
Registry Schema
class ModelRegistry:
def __init__(self, db_connection):
self.db = db_connection
self._initialize_tables()
def _initialize_tables(self):
"""Create registry tables"""
self.db.execute("""
CREATE TABLE IF NOT EXISTS models (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
version TEXT NOT NULL,
state TEXT NOT NULL,
created_at TIMESTAMP,
updated_at TIMESTAMP,
metadata JSONB
)
""")
self.db.execute("""
CREATE TABLE IF NOT EXISTS model_versions (
id TEXT PRIMARY KEY,
model_id TEXT REFERENCES models(id),
version TEXT NOT NULL,
artifact_path TEXT,
metrics JSONB,
parameters JSONB,
created_at TIMESTAMP,
UNIQUE(model_id, version)
)
""")
def register_model(self, model_name, version, artifact_path, metrics=None, parameters=None):
"""Register a new model version"""
model_id = f"{model_name}_{version}"
self.db.execute("""
INSERT INTO models (id, name, version, state, created_at, updated_at, metadata)
VALUES (%s, %s, %s, 'development', NOW(), NOW(), %s)
ON CONFLICT (id) DO UPDATE SET updated_at = NOW()
""", (model_id, model_name, version, json.dumps({"artifact": artifact_path})))
self.db.execute("""
INSERT INTO model_versions (id, model_id, version, artifact_path, metrics, parameters, created_at)
VALUES (%s, %s, %s, %s, %s, %s, NOW())
""", (f"{model_id}_v", model_id, version, artifact_path,
json.dumps(metrics or {}), json.dumps(parameters or {})))
return model_id
def get_model(self, model_name, version=None):
"""Retrieve model from registry"""
if version:
query = "SELECT * FROM model_versions WHERE model_id = %s AND version = %s"
params = (model_name, version)
else:
query = "SELECT * FROM model_versions WHERE model_id = %s ORDER BY created_at DESC LIMIT 1"
params = (model_name,)
return self.db.execute(query, params).fetchone()
Registry Operations
| Operation | Description | Example |
|---|---|---|
| Register | Add new model version | registry.register(model) |
| Retrieve | Get model by name/version | registry.get("model", "v1.0") |
| Promote | Change model state | registry.promote(model_id, "production") |
| Compare | Compare versions | registry.compare("v1", "v2") |
| Archive | Mark as archived | registry.archive(model_id) |
Lineage Tracking
Lineage Graph Implementation
from collections import defaultdict
import networkx as nx
class ModelLineageTracker:
def __init__(self):
self.graph = nx.DiGraph()
def add_data_node(self, data_id, metadata):
"""Add data source node"""
self.graph.add_node(data_id, type="data", metadata=metadata)
def add_model_node(self, model_id, metadata):
"""Add model node"""
self.graph.add_node(model_id, type="model", metadata=metadata)
def add_code_node(self, code_id, metadata):
"""Add code/script node"""
self.graph.add_node(code_id, type="code", metadata=metadata)
def add_dependency(self, from_id, to_id, dependency_type):
"""Add dependency edge"""
self.graph.add_edge(from_id, to_id, dependency_type=dependency_type)
def get_upstream(self, node_id, depth=None):
"""Get upstream dependencies"""
return list(nx.ancestors(self.graph, node_id))
def get_downstream(self, node_id, depth=None):
"""Get downstream dependents"""
return list(nx.descendants(self.graph, node_id))
def get_lineage(self, node_id):
"""Get complete lineage"""
return {
"upstream": self.get_upstream(node_id),
"downstream": self.get_downstream(node_id),
"subgraph": self._get_subgraph(node_id)
}
def _get_subgraph(self, node_id):
"""Get subgraph centered on node"""
ancestors = nx.ancestors(self.graph, node_id)
descendants = nx.descendants(self.graph, node_id)
nodes = ancestors | descendants | {node_id}
return self.graph.subgraph(nodes)
Mathematical Foundation
Version Similarity
To compare model versions, we can use various similarity metrics:
Model Version Similarity
Where ( M_1 ) and ( M_2 ) are sets of model characteristics.
Performance Delta
The performance change between versions:
Performance Delta
Version Importance Score
def calculate_version_importance(version_metrics, baseline_metrics):
"""Calculate importance score for version"""
improvement = version_metrics["accuracy"] - baseline_metrics["accuracy"]
latency_change = version_metrics["latency"] - baseline_metrics["latency"]
# Weighted importance score
importance = (
0.7 * improvement - # Accuracy improvement
0.3 * (latency_change / 1000) # Latency penalty (ms to seconds)
)
return importance
Comparison Framework
Model Version Comparison
class ModelVersionComparator:
def __init__(self, registry):
self.registry = registry
def compare(self, version_a, version_b):
"""Compare two model versions"""
model_a = self.registry.get_version(version_a)
model_b = self.registry.get_version(version_b)
comparison = {
"metrics": self._compare_metrics(model_a.metrics, model_b.metrics),
"parameters": self._compare_parameters(model_a.parameters, model_b.parameters),
"artifacts": self._compare_artifacts(model_a.artifacts, model_b.artifacts),
"recommendation": self._generate_recommendation(model_a, model_b)
}
return comparison
def _compare_metrics(self, metrics_a, metrics_b):
"""Compare performance metrics"""
comparison = {}
for metric in metrics_a:
if metric in metrics_b:
diff = metrics_b[metric] - metrics_a[metric]
improvement = diff > 0
comparison[metric] = {
"a": metrics_a[metric],
"b": metrics_b[metric],
"diff": diff,
"improved": improvement
}
return comparison
def _generate_recommendation(self, model_a, model_b):
"""Generate recommendation based on comparison"""
score_a = self._calculate_score(model_a)
score_b = self._calculate_score(model_b)
if score_b > score_a * 1.05: # 5% improvement threshold
return "RECOMMEND_B"
elif score_a > score_b * 1.05:
return "RECOMMEND_A"
else:
return "SIMILAR_PERFORMANCE"
Best Practices
1. Immutable Artifacts
- Never modify stored artifacts
- Use content-addressable storage
- Maintain checksums for integrity
2. Comprehensive Metadata
- Store training parameters
- Record dataset versions
- Track environmental conditions
3. Automated Versioning
- Integrate with CI/CD pipelines
- Auto-version on successful training
- Tag versions with meaningful labels
4. Access Control
- Implement role-based access
- Log all registry operations
- Maintain audit trails
Common Patterns
Blue-Green Deployment
class BlueGreenDeployment:
def __init__(self, registry):
self.registry = registry
def deploy(self, model_id, environment):
"""Deploy model using blue-green strategy"""
# Get current production model
current = self.registry.get_production_model()
# Deploy new version to staging
staging = self.registry.deploy_to_staging(model_id)
# Validate staging deployment
if self.validate_deployment(staging):
# Switch traffic to new version
self.switch_traffic(staging)
# Keep old version for rollback
self.keep_rollback(current)
return staging
else:
raise ValueError("Deployment validation failed")
Canary Deployment
class CanaryDeployment:
def __init__(self, registry, traffic_manager):
self.registry = registry
self.traffic_manager = traffic_manager
def deploy(self, model_id, canary_percentage=10):
"""Deploy model using canary strategy"""
# Deploy canary version
canary = self.registry.deploy_canary(model_id)
# Route small percentage of traffic
self.traffic_manager.route_percentage(canary, canary_percentage)
# Monitor canary performance
metrics = self.monitor_canary(canary)
# Gradually increase traffic if healthy
if self.is_healthy(metrics):
self.increase_traffic(canary, increment=10)
return canary
Summary
Model versioning is fundamental to reliable ML operations. By implementing proper version control, registry management, and lineage tracking, organizations can ensure reproducibility, accountability, and efficient model management throughout the ML lifecycle.