Experiment Tracking
Experiment tracking is the systematic recording of all parameters, code versions, metrics, and artifacts associated with machine learning experiments to ensure reproducibility and facilitate comparison.
Key Components
- Parameters: Hyperparameters and configuration settings
- Metrics: Performance measurements (accuracy, loss, etc.)
- Artifacts: Model files, datasets, visualizations
- Code: Specific code versions used in experiments
- Environment: System and library versions
Experiment Tracking Architecture
MLflow Implementation
Basic MLflow Usage
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score
import numpy as np
def train_experiment(data, labels, params):
    """Train a random forest classifier and record the run in MLflow.

    Args:
        data: Mapping with "train" and "test" entries holding the features.
        labels: Mapping with "train" and "test" entries holding the targets.
        params: Keyword arguments for ``RandomForestClassifier``; they are
            also logged to MLflow unchanged.

    Returns:
        dict with "accuracy", "f1_score" (weighted average) and "model_uri".
    """
    import os

    mlflow.set_experiment("classification_experiment")
    with mlflow.start_run():
        # Log parameters
        mlflow.log_params(params)

        # Train model
        model = RandomForestClassifier(**params)
        model.fit(data["train"], labels["train"])

        # Make predictions
        predictions = model.predict(data["test"])

        # Calculate metrics
        accuracy = accuracy_score(labels["test"], predictions)
        f1 = f1_score(labels["test"], predictions, average="weighted")

        # Log metrics
        mlflow.log_metric("accuracy", accuracy)
        mlflow.log_metric("f1_score", f1)

        # Log model
        mlflow.sklearn.log_model(model, "model")

        # Nothing in this function produces the plot, so only attach it when
        # the caller has already written it; a missing file must not abort
        # a run whose model and metrics are already logged.
        if os.path.exists("confusion_matrix.png"):
            mlflow.log_artifact("confusion_matrix.png")

        # Resolve the URI while the run is still active so it points at
        # this run's artifacts.
        return {
            "accuracy": accuracy,
            "f1_score": f1,
            "model_uri": mlflow.get_artifact_uri("model"),
        }
# Example usage: these hyperparameters are passed to train_experiment, which
# forwards them to the classifier and logs them.
# NOTE(review): train_data / train_labels are not defined in this snippet;
# train_experiment indexes both with "train" and "test" keys.
params = {"n_estimators": 100, "max_depth": 10, "random_state": 42}
results = train_experiment(train_data, train_labels, params)
MLflow Advanced Features
import mlflow
from mlflow.tracking import MlflowClient
from mlflow.entities import ViewType
class MLflowManager:
    """Convenience wrapper around one MLflow experiment.

    Holds the experiment name and an ``MlflowClient`` and makes the
    experiment the active one on construction.
    """

    def __init__(self, experiment_name):
        """Remember the experiment name, create a client, and activate it."""
        self.experiment_name = experiment_name
        self.client = MlflowClient()
        mlflow.set_experiment(experiment_name)

    def search_best_runs(self, metric="accuracy", n=5):
        """Return up to ``n`` runs of this experiment, highest ``metric`` first."""
        ordering = [f"metrics.{metric} DESC"]
        return mlflow.search_runs(
            experiment_names=[self.experiment_name],
            order_by=ordering,
            max_results=n,
        )

    def compare_runs(self, run_ids):
        """Return one summary dict (params, metrics, start time) per run ID."""

        def summarize(run_id):
            # One client round-trip per run, in the order the IDs were given.
            fetched = self.client.get_run(run_id)
            return {
                "run_id": run_id,
                "params": fetched.data.params,
                "metrics": fetched.data.metrics,
                "start_time": fetched.info.start_time,
            }

        return [summarize(run_id) for run_id in run_ids]

    def promote_model(self, run_id, model_name, stage):
        """Register the run's "model" artifact and move that version to ``stage``.

        Returns the object produced by ``mlflow.register_model``.
        """
        registered = mlflow.register_model(f"runs:/{run_id}/model", model_name)
        self.client.transition_model_version_stage(
            name=model_name,
            version=registered.version,
            stage=stage,
        )
        return registered

    def get_experiment_history(self):
        """Return the experiment's runs as a list, most recently started first."""
        experiment = self.client.get_experiment_by_name(self.experiment_name)
        history = list(self.client.search_runs([experiment.experiment_id]))
        history.sort(key=lambda entry: entry.info.start_time, reverse=True)
        return history
Weights & Biases Integration
Basic W&B Usage
import wandb
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
def train_with_wandb(config, train_loader, val_loader):
    """Train a two-layer classifier and track the run in Weights & Biases.

    Args:
        config: Mapping providing "input_dim", "hidden_dim", "output_dim",
            "lr" and "epochs"; it is also stored as the run config.
        train_loader: Batches passed to ``train_epoch``.
        val_loader: Batches passed to ``evaluate``.
            NOTE(review): ``evaluate`` is not defined in this snippet; it is
            expected to return ``(val_loss, val_accuracy)``.

    Returns:
        The trained ``nn.Sequential`` model.
    """
    # Using the run as a context manager closes it even when training
    # raises, instead of leaving a dangling run behind.
    with wandb.init(project="classification_project", config=config):
        # Create model
        model = nn.Sequential(
            nn.Linear(config["input_dim"], config["hidden_dim"]),
            nn.ReLU(),
            nn.Linear(config["hidden_dim"], config["output_dim"]),
        )

        # Training loop
        optimizer = torch.optim.Adam(model.parameters(), lr=config["lr"])
        criterion = nn.CrossEntropyLoss()
        for epoch in range(config["epochs"]):
            # Training
            train_loss = train_epoch(model, train_loader, optimizer, criterion)
            # Validation
            val_loss, val_accuracy = evaluate(model, val_loader, criterion)
            # Log metrics
            wandb.log({
                "train_loss": train_loss,
                "val_loss": val_loss,
                "val_accuracy": val_accuracy,
                "epoch": epoch,
            })

        # Save model: the checkpoint has to exist on disk before it can be
        # uploaded; previously nothing ever wrote "model.pth".
        torch.save(model.state_dict(), "model.pth")
        wandb.save("model.pth")

    return model
def train_epoch(model, loader, optimizer, criterion):
    """Run one training pass over ``loader`` and return the mean batch loss.

    Args:
        model: Module to train; switched to training mode first.
        loader: Iterable of batches, each a mapping with "data" and "labels".
            It does not need to support ``len()``.
        optimizer: Optimizer stepping the model's parameters.
        criterion: Loss callable applied as ``criterion(outputs, labels)``.

    Returns:
        Average of the per-batch losses as a float, or ``0.0`` when the
        loader yields no batches.
    """
    model.train()
    total_loss = 0.0
    # Count batches while iterating rather than calling len(loader), so
    # generators and other length-less iterables work too.
    num_batches = 0
    for batch in loader:
        optimizer.zero_grad()
        outputs = model(batch["data"])
        loss = criterion(outputs, batch["labels"])
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        num_batches += 1

    if num_batches == 0:
        # Nothing was trained on; avoid dividing by zero.
        return 0.0
    return total_loss / num_batches
W&B Sweeps
import wandb
# Search-space definition handed to wandb.sweep below.
sweep_config = {
    # Search strategy
    "method": "bayes",
    # Objective the sweep optimises
    "metric": {"name": "val_accuracy", "goal": "maximize"},
    # Hyperparameters: discrete choices use "values", ranges use "min"/"max"
    "parameters": {
        "hidden_dim": {"values": [64, 128, 256, 512]},
        "lr": {"min": 1e-4, "max": 1e-2},
        "batch_size": {"values": [32, 64, 128]},
    },
}
def train_sweep():
    """Run a single sweep trial: start a run, train, and report accuracy.

    NOTE(review): ``train_model`` is not defined in this snippet; its result
    must expose a ``val_accuracy`` attribute.
    """
    wandb.init()
    # Train with the configuration of the run that was just started.
    trained = train_model(wandb.config)
    # Report the metric under the name the sweep is configured to optimise.
    wandb.log({"val_accuracy": trained.val_accuracy})
# Run sweep: register the search space under the project, then start an agent
# in this process that calls train_sweep for each trial (count=50 limits how
# many runs this agent executes).
sweep_id = wandb.sweep(sweep_config, project="classification_project")
wandb.agent(sweep_id, function=train_sweep, count=50)
Custom Tracking Solution
Implementation
import json
import hashlib
from datetime import datetime
from pathlib import Path
import pickle
class ExperimentTracker:
    """Minimal file-backed experiment tracker.

    Every experiment gets a directory under ``base_path`` containing an
    ``experiment.json`` snapshot and one sub-directory per run for copied
    artifacts. State is kept in ``self.experiments`` and written to disk
    after each change; existing directories are not loaded back on start-up.
    """

    def __init__(self, base_path):
        """Create ``base_path`` if needed and start with no experiments."""
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)
        # Maps experiment ID -> experiment record (name, description, runs).
        self.experiments = {}

    def create_experiment(self, name, description=""):
        """Create a new experiment, persist it, and return its ID."""
        experiment_id = self._generate_id(name)
        experiment_path = self.base_path / experiment_id
        experiment_path.mkdir(exist_ok=True)
        self.experiments[experiment_id] = {
            "name": name,
            "description": description,
            "created_at": datetime.now().isoformat(),
            "runs": [],
        }
        # Persist immediately so an experiment without any logged data
        # still leaves its metadata on disk.
        self._save_experiment(experiment_id)
        return experiment_id

    def start_run(self, experiment_id, run_name=None):
        """Start a new run in an experiment, persist it, and return its ID.

        Raises:
            KeyError: If ``experiment_id`` is unknown to this tracker.
        """
        runs = self.experiments[experiment_id]["runs"]
        # The fallback name only seeds the ID; "name" stores run_name as given.
        run_id = self._generate_id(run_name or f"run_{len(runs)}")
        run_path = self.base_path / experiment_id / run_id
        run_path.mkdir(parents=True, exist_ok=True)
        run_info = {
            "id": run_id,
            "name": run_name,
            "started_at": datetime.now().isoformat(),
            "params": {},
            "metrics": [],
            "artifacts": [],
        }
        runs.append(run_info)
        self._save_experiment(experiment_id)
        return run_id

    def log_params(self, experiment_id, run_id, params):
        """Merge ``params`` into the run's parameters and persist.

        Raises:
            ValueError: If the run does not exist.
            TypeError: If ``params`` cannot be serialised to JSON; the run is
                left unchanged in that case.
        """
        run = self._get_run(experiment_id, run_id)
        new_params = dict(params)
        # Validate before mutating so a bad value cannot poison the in-memory
        # state and make every later save fail.
        json.dumps(new_params)
        run["params"].update(new_params)
        self._save_experiment(experiment_id)

    def log_metric(self, experiment_id, run_id, metric_name, value, step=None):
        """Append one metric observation to the run and persist.

        Raises:
            ValueError: If the run does not exist.
            TypeError: If the entry cannot be serialised to JSON; the run is
                left unchanged in that case.
        """
        run = self._get_run(experiment_id, run_id)
        metric_entry = {
            "name": metric_name,
            "value": value,
            "timestamp": datetime.now().isoformat(),
            "step": step,
        }
        # Validate before mutating (same reasoning as in log_params).
        json.dumps(metric_entry)
        run["metrics"].append(metric_entry)
        self._save_experiment(experiment_id)

    def log_artifact(self, experiment_id, run_id, artifact_path, artifact_name):
        """Copy a file into the run directory and record it.

        Args:
            artifact_path: Existing file to copy.
            artifact_name: File name to store it under inside the run directory.

        Raises:
            ValueError: If the run does not exist.
        """
        run = self._get_run(experiment_id, run_id)
        # Copy artifact to run directory
        import shutil

        dest_path = self.base_path / experiment_id / run_id / artifact_name
        shutil.copy2(artifact_path, dest_path)
        run["artifacts"].append({
            # str() keeps the record JSON-serialisable when a Path is passed.
            "name": str(artifact_name),
            "path": str(dest_path),
            "timestamp": datetime.now().isoformat(),
        })
        self._save_experiment(experiment_id)

    def _get_run(self, experiment_id, run_id):
        """Return the run record with ``run_id``; raise ValueError if absent."""
        for run in self.experiments[experiment_id]["runs"]:
            if run["id"] == run_id:
                return run
        raise ValueError(f"Run {run_id} not found")

    def _generate_id(self, name):
        """Return a 12-character hexadecimal ID derived from ``name``."""
        import uuid

        timestamp = datetime.now().isoformat()
        # Name plus timestamp alone can repeat when the same name is used
        # twice within one clock tick; the random part keeps IDs distinct.
        seed = f"{name}_{timestamp}_{uuid.uuid4().hex}"
        return hashlib.md5(seed.encode()).hexdigest()[:12]

    def _save_experiment(self, experiment_id):
        """Write the experiment record to ``experiment.json``.

        The record is serialised before any file is touched and then swapped
        in via a temporary file, so a serialisation error or an interrupted
        write cannot leave a truncated ``experiment.json`` behind.
        """
        experiment_path = self.base_path / experiment_id / "experiment.json"
        payload = json.dumps(self.experiments[experiment_id], indent=2)
        tmp_path = experiment_path.with_name(experiment_path.name + ".tmp")
        tmp_path.write_text(payload, encoding="utf-8")
        tmp_path.replace(experiment_path)
Mathematical Foundation
Hyperparameter Optimization
Bayesian Optimization
The acquisition function for Bayesian optimization:
Expected Improvement
$$\mathrm{EI}(x) = \mathbb{E}\left[\max\left(f(x) - f(x^+),\ 0\right)\right]$$
Where:
- \( f(x) \) is the predicted objective value
- \( f(x^+) \) is the best observed value
- \( x^+ \) is the best observed point
Grid Search vs Random Search
The probability of finding the optimal configuration with independent random trials:
Search Efficiency
$$P(\text{success}) = 1 - (1 - p)^n$$
Where:
- \( p \) is the probability of sampling optimal region per trial
- \( n \) is the number of trials
Early Stopping
The optimal stopping point can be determined by:
Early Stopping Criterion
$$t^* = \arg\min_{t} \left[ L(t) + \lambda \cdot \text{complexity}(t) \right]$$
Where:
- \( L(t) \) is the validation loss at epoch \( t \)
- \( \lambda \) is the regularization parameter
- \( \text{complexity}(t) \) is the model complexity term
Comparison Table
| Feature | MLflow | W&B | Custom |
|---|---|---|---|
| Setup | Easy | Easy | Complex |
| Visualization | Basic | Advanced | Custom |
| Collaboration | Good | Excellent | Limited |
| Scalability | Good | Excellent | Variable |
| Cost | Free | Freemium | Free |
| Integration | Many | Many | Custom |
| Hosting | Self-hosted | Cloud | Self-hosted |
Best Practices
1. Consistent Naming
# Good practice: the experiment name states the task and version, and the run
# name embeds the model and the hyperparameters that distinguish the run.
# NOTE(review): learning_rate and batch_size must already be defined.
EXPERIMENT_NAME = "image_classification_v2"
RUN_NAME = f"resnet50_lr{learning_rate}_bs{batch_size}"
# Bad practice: generic names that say nothing about what was run.
EXPERIMENT_NAME = "experiment1"
RUN_NAME = "run"
2. Comprehensive Logging
def log_experiment_metadata():
    """Log interpreter, platform, and project metadata as MLflow parameters.

    NOTE(review): ``get_framework_versions``, ``get_git_commit`` and
    ``get_environment_info`` are not defined in this snippet.
    """
    import platform
    import sys

    # Collected in one mapping so everything is sent in a single call.
    mlflow.log_params({
        "python_version": sys.version,
        "platform": platform.platform(),
        "framework_versions": get_framework_versions(),
        "git_commit": get_git_commit(),
        "environment": get_environment_info(),
    })
3. Artifact Management
def manage_artifacts(run_id):
    """Log the standard artifact files that exist on disk to an MLflow run.

    Args:
        run_id: ID of the run the artifacts are attached to. ``None`` logs to
            the currently active run instead.
    """
    # Imported locally because the snippet uses os without importing it.
    import os

    # Log only necessary artifacts
    artifacts_to_log = [
        "model.pth",
        "confusion_matrix.png",
        "training_curves.png",
        "feature_importance.png",
    ]

    # Previously run_id was ignored and everything went to the active run;
    # go through the client so the files land on the requested run.
    client = None if run_id is None else mlflow.tracking.MlflowClient()

    for artifact in artifacts_to_log:
        # Skip files this experiment did not produce.
        if not os.path.exists(artifact):
            continue
        if client is None:
            mlflow.log_artifact(artifact)
        else:
            client.log_artifact(run_id, artifact)
Summary
Experiment tracking is essential for reproducible machine learning. By implementing proper tracking with tools like MLflow or W&B, teams can effectively compare experiments, share results, and build upon previous work.