Interview Question (Hard). Asked at: Google, Microsoft, Amazon, Netflix, Uber
"Design an MLOps maturity roadmap for an organization moving from manual ML processes to full automation. What are the key milestones, challenges, and cultural changes required?"
MLOps Maturity Overview
The MLOps maturity model provides a framework for organizations to assess and improve their ML operations capabilities.
Maturity Levels Overview
MLOps Maturity Levels

Level 0: Manual Process
  • Manual model training and deployment
  • No version control for data/models
  • Ad-hoc processes
  • Data scientists working in silos
        ▼
Level 1: ML Pipeline Automation
  • Automated training pipelines
  • Basic experiment tracking
  • Model versioning
  • Basic monitoring
        ▼
Level 2: CI/CD for ML
  • Continuous integration for ML code
  • Automated testing (data, model, integration)
  • Automated deployment pipelines
  • Feature stores
        ▼
Level 3: Full Automation
  • Automated retraining on drift
  • A/B testing and canary deployments
  • Full monitoring with auto-remediation
  • ML governance and compliance
Level 0: Manual Process
Characteristics
At Level 0, ML processes are manual and ad-hoc:
LEVEL_0_CHARACTERISTICS = {
"training": {
"process": "Manual script execution",
"frequency": "On-demand",
"reproducibility": "Low - manual environment setup",
"tracking": "Manual notebooks, no versioning"
},
"deployment": {
"process": "Manual model copy to production",
"rollback": "Manual file restoration",
"testing": "Ad-hoc validation",
"monitoring": "None or basic logging"
},
"infrastructure": {
"compute": "Individual workstations",
"storage": "Local files, shared drives",
"version_control": "Limited Git usage",
"environments": "Inconsistent across team"
},
"team": {
"structure": "Data scientists work in silos",
"collaboration": "Email, meetings",
"knowledge_sharing": "Minimal"
}
}
Pain Points at Level 0
LEVEL_0_PAIN_POINTS = [
"Model training takes days/weeks due to manual processes",
"No reproducibility - results vary between runs",
"Deployment is risky and error-prone",
"Model performance degrades silently in production",
"Team collaboration is inefficient",
"No audit trail for compliance",
"Cannot scale to multiple models",
"Technical debt accumulates rapidly"
]
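Most teams tackle the "no reproducibility" pain point first. A minimal sketch, assuming a run is fully determined by its code version, its data files, and its config: hash all three into one fingerprint. Two runs with the same fingerprint should be comparable, and a changed fingerprint tells you which input moved. The helpers `file_sha256` and `run_fingerprint` are hypothetical names; tools such as DVC (data content hashes) and MLflow (git commit and parameters) capture the same ingredients.

```python
import hashlib
import json
from pathlib import Path
from typing import Iterable


def file_sha256(path: Path) -> str:
    """Hash a file's bytes in 1 MiB chunks so large datasets need not fit in memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            digest.update(chunk)
    return digest.hexdigest()


def run_fingerprint(code_version: str, data_files: Iterable[Path], config: dict) -> str:
    """Combine code version, data content hashes, and config into one ID.

    Hypothetical helper: files are sorted by path and the config is serialized
    with sorted keys, so the same inputs always give the same fingerprint.
    The config must be JSON-serializable.
    """
    payload = {
        "code_version": code_version,
        "data": {str(p): file_sha256(p) for p in sorted(data_files)},
        "config": config,
    }
    encoded = json.dumps(payload, sort_keys=True).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()
```

Recording the fingerprint with every training run (for example as an experiment-tracking tag) turns "results vary between runs" into a question you can answer by diffing inputs.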
Transition to Level 1
class Level0ToLevel1Transition:
    """Transition from Level 0 to Level 1."""

    def __init__(self):
        self.milestones = [
            "Implement version control for code and data",
            "Set up experiment tracking",
            "Create automated training pipeline",
            "Establish basic model registry",
            "Implement basic monitoring"
        ]

    def create_implementation_plan(self) -> dict:
        """Create implementation plan."""
        plan = {
            "phase_1_foundation": {
                "duration": "4-6 weeks",
                "tasks": [
                    "Set up Git repositories with proper branching strategy",
                    "Implement DVC for data versioning",
                    "Deploy MLflow for experiment tracking",
                    "Create standard project structure"
                ],
                "deliverables": [
                    "Git repository with CI",
                    "DVC configuration",
                    "MLflow server",
                    "Project template"
                ]
            },
            "phase_2_automation": {
                "duration": "6-8 weeks",
                "tasks": [
                    "Build automated training pipeline",
                    "Implement model versioning",
                    "Create basic monitoring",
                    "Set up model registry"
                ],
                "deliverables": [
                    "Airflow/Kubeflow pipeline",
                    "Model registry",
                    "Basic Prometheus metrics",
                    "Grafana dashboard"
                ]
            },
            "phase_3_optimization": {
                "duration": "4-6 weeks",
                "tasks": [
                    "Optimize pipeline performance",
                    "Implement data validation",
                    "Create deployment automation",
                    "Establish team workflows"
                ],
                "deliverables": [
                    "Optimized pipelines",
                    "Data validation framework",
                    "Deployment scripts",
                    "Team documentation"
                ]
            }
        }
        return plan
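The three phases add up to the overall Level 0 to Level 1 timeline. A small sketch, assuming durations keep the `"min-max weeks"` string format used in `create_implementation_plan` and that the phases run one after another: 4-6 + 6-8 + 4-6 weeks gives 14-20 weeks, roughly three to five months. The helper names are hypothetical.

```python
import re
from typing import Dict, Tuple


def parse_weeks(duration: str) -> Tuple[int, int]:
    """Parse a duration like "4-6 weeks" into (min_weeks, max_weeks)."""
    match = re.fullmatch(r"\s*(\d+)\s*-\s*(\d+)\s*weeks?\s*", duration)
    if match is None:
        raise ValueError(f"Unrecognized duration: {duration!r}")
    return int(match.group(1)), int(match.group(2))


def total_duration(plan: Dict[str, dict]) -> Tuple[int, int]:
    """Sum phase durations, assuming the phases run sequentially."""
    low = high = 0
    for phase in plan.values():
        lo, hi = parse_weeks(phase["duration"])
        low += lo
        high += hi
    return low, high


plan = {
    "phase_1_foundation": {"duration": "4-6 weeks"},
    "phase_2_automation": {"duration": "6-8 weeks"},
    "phase_3_optimization": {"duration": "4-6 weeks"},
}
print(total_duration(plan))  # (14, 20)
```

If some phases overlap (for example, data validation work starting during phase 2), the sum is an upper bound rather than an estimate.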
⚠️ Level 0 is unsustainable for production ML. Organizations should prioritize moving to Level 1 to enable scalable, reproducible ML processes.
Level 1: ML Pipeline Automation
Characteristics
At Level 1, basic ML pipeline automation is in place:
LEVEL_1_CHARACTERISTICS = {
"training": {
"process": "Automated pipelines with orchestration",
"frequency": "Scheduled or triggered",
"reproducibility": "Medium - pipeline defined in code",
"tracking": "MLflow/W&B for experiments"
},
"deployment": {
"process": "Scripted deployment with basic validation",
"rollback": "Version-based rollback",
"testing": "Model performance validation",
"monitoring": "Basic metrics (latency, errors)"
},
"infrastructure": {
"compute": "Shared compute cluster",
"storage": "Centralized data lake",
"version_control": "Git for code and models",
"environments": "Containerized training"
},
"team": {
"structure": "Cross-functional ML teams",
"collaboration": "Shared dashboards, regular syncs",
"knowledge_sharing": "Model cards, documentation"
}
}
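"Version-based rollback" at Level 1 means production points at a registered model version, so rolling back is just repointing to an earlier one. A minimal in-memory sketch of that idea; the `ModelRegistry` class, its methods, and the S3 URIs are hypothetical, and a real setup would use a registry service such as the MLflow Model Registry.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class ModelRegistry:
    """Hypothetical registry: versions are immutable; 'production' is a pointer."""
    versions: Dict[int, str] = field(default_factory=dict)  # version -> artifact URI
    production_history: List[int] = field(default_factory=list)

    def register(self, artifact_uri: str) -> int:
        version = len(self.versions) + 1
        self.versions[version] = artifact_uri
        return version

    def promote(self, version: int) -> None:
        if version not in self.versions:
            raise KeyError(f"Unknown model version {version}")
        self.production_history.append(version)

    @property
    def production(self) -> Optional[int]:
        return self.production_history[-1] if self.production_history else None

    def rollback(self) -> int:
        """Repoint production to the previously promoted version."""
        if len(self.production_history) < 2:
            raise RuntimeError("No earlier production version to roll back to")
        self.production_history.pop()
        return self.production_history[-1]


registry = ModelRegistry()
v1 = registry.register("s3://models/fraud/v1")
v2 = registry.register("s3://models/fraud/v2")
registry.promote(v1)
registry.promote(v2)
print(registry.production)  # 2
print(registry.rollback())  # 1
```

Because artifacts are never overwritten, rollback needs no retraining or file restoration, which is the key difference from Level 0's "manual file restoration".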
Level 1 Implementation
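# level1_pipeline.py
# Assumes Airflow 2.4+ (for the `schedule` argument), MLflow, XGBoost,
# scikit-learn, and s3fs (so pandas can read s3:// paths).
from datetime import datetime, timedelta

import mlflow
import mlflow.xgboost
import pandas as pd
import xgboost as xgb
from airflow import DAG
from airflow.operators.python import PythonOperator
from sklearn.metrics import precision_score, recall_score, roc_auc_score
from sklearn.model_selection import train_test_split

default_args = {
    'owner': 'ml-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}


class Level1MLPipeline:
    """Level 1 ML pipeline.

    Airflow tasks run in separate processes, so tasks exchange only small,
    XCom-serializable values (a data path, an MLflow run ID), never
    DataFrames or model objects.
    """

    def __init__(self, config: dict):
        self.config = config

    def _split(self, df: pd.DataFrame):
        """Deterministic split so training and evaluation see the same rows."""
        X = df.drop(columns=['label'])
        y = df['label']
        return train_test_split(X, y, test_size=0.2, random_state=42)

    def prepare_data(self) -> str:
        """Load and validate training data; return the path of the validated copy."""
        df = pd.read_parquet(self.config['data_path'])

        # Basic validation: fail the task early on obviously bad input
        if len(df) == 0:
            raise ValueError("Empty dataset")
        if 'label' not in df.columns:
            raise ValueError("Missing label column")

        # 'prepared_path' is an assumed config key for a staging location
        df.to_parquet(self.config['prepared_path'])
        return self.config['prepared_path']  # pushed to XCom

    def train_model(self, ti) -> str:
        """Train with experiment tracking; return the MLflow run ID."""
        # Airflow 2.x passes `ti` (the TaskInstance) because it is in the signature
        df = pd.read_parquet(ti.xcom_pull(task_ids='prepare_data'))
        X_train, X_val, y_train, y_val = self._split(df)

        # Set the experiment at run time, not at DAG-parse time
        mlflow.set_experiment(self.config['experiment_name'])
        with mlflow.start_run() as run:
            mlflow.log_param("dataset_rows", len(df))
            mlflow.log_param("dataset_columns", len(df.columns))

            params = self.config.get('model_params', {})
            dtrain = xgb.DMatrix(X_train, label=y_train)
            dval = xgb.DMatrix(X_val, label=y_val)
            model = xgb.train(
                params,
                dtrain,
                num_boost_round=100,
                evals=[(dval, 'val')],
                early_stopping_rounds=10
            )
            # xgb.train returns the last iteration; keep only the trees up to
            # the best early-stopping round so the logged model is the best one
            model = model[: model.best_iteration + 1]

            auc = roc_auc_score(y_val, model.predict(dval))
            mlflow.log_params(params)
            mlflow.log_metric("auc_roc", auc)
            mlflow.xgboost.log_model(model, "model")
            return run.info.run_id  # pushed to XCom

    def evaluate_model(self, ti) -> dict:
        """Re-score the logged model and enforce quality gates."""
        run_id = ti.xcom_pull(task_ids='train_model')
        model = mlflow.xgboost.load_model(f"runs:/{run_id}/model")
        df = pd.read_parquet(ti.xcom_pull(task_ids='prepare_data'))
        _, X_val, _, y_val = self._split(df)

        # An XGBoost Booster expects a DMatrix and returns probabilities here
        y_prob = model.predict(xgb.DMatrix(X_val))
        y_label = (y_prob > 0.5).astype(int)
        metrics = {
            'auc_roc': float(roc_auc_score(y_val, y_prob)),
            'precision': float(precision_score(y_val, y_label)),
            'recall': float(recall_score(y_val, y_label))
        }
        with mlflow.start_run(run_id=run_id):
            mlflow.log_metrics(metrics)

        # Quality gates: raising fails the Airflow task and stops the DAG
        thresholds = self.config.get('thresholds', {})
        for metric, threshold in thresholds.items():
            if metric in metrics and metrics[metric] < threshold:
                raise ValueError(
                    f"Quality gate failed: {metric}={metrics[metric]:.4f} < {threshold}"
                )
        return metrics


def create_level1_dag() -> DAG:
    """Create the Level 1 Airflow DAG."""
    config = {
        'experiment_name': 'level1_ml_experiment',
        'data_path': 's3://ml-data/training/',
        'prepared_path': 's3://ml-data/prepared/training.parquet',  # assumed location
        'model_params': {
            'objective': 'binary:logistic',
            'max_depth': 6,
            'learning_rate': 0.1
        },
        'thresholds': {
            'auc_roc': 0.85
        }
    }
    pipeline = Level1MLPipeline(config)

    with DAG(
        'level1_ml_pipeline',
        default_args=default_args,
        schedule='@daily',  # replaces `schedule_interval`, deprecated since Airflow 2.4
        start_date=datetime(2024, 1, 1),
        catchup=False
    ) as dag:
        prepare = PythonOperator(
            task_id='prepare_data',
            python_callable=pipeline.prepare_data
        )
        train = PythonOperator(
            task_id='train_model',
            python_callable=pipeline.train_model
        )
        evaluate = PythonOperator(
            task_id='evaluate_model',
            python_callable=pipeline.evaluate_model
        )
        prepare >> train >> evaluate

    return dag


# Airflow discovers DAG objects defined at module level
dag = create_level1_dag()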
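The quality-gate check in `evaluate_model` is worth keeping as a pure function so it can be unit-tested without Airflow or MLflow. A sketch under that assumption (the name `failed_gates` is hypothetical). Unlike the loop in the pipeline, which skips a gated metric that was never computed, it treats a missing metric as a failure:

```python
from typing import Dict, List


def failed_gates(metrics: Dict[str, float], thresholds: Dict[str, float]) -> List[str]:
    """Describe every failing gate; an empty list means all gates pass.

    Each threshold is a minimum (metric must be >= threshold). A gated
    metric absent from `metrics` counts as a failure, not a silent pass.
    """
    failures = []
    for name, minimum in sorted(thresholds.items()):
        value = metrics.get(name)
        if value is None:
            failures.append(f"{name}: not computed")
        elif value < minimum:
            failures.append(f"{name}: {value:.4f} < {minimum}")
    return failures


print(failed_gates({"auc_roc": 0.83, "recall": 0.7}, {"auc_roc": 0.85}))
# ['auc_roc: 0.8300 < 0.85']
```

The Airflow task then only needs to raise when the returned list is non-empty, and the list doubles as a readable failure message.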
ℹ️ Level 1 provides the foundation for reproducible ML. Focus on establishing automated pipelines, experiment tracking, and basic monitoring before advancing to Level 2.
Level 2: CI/CD for ML
Characteristics
At Level 2, CI/CD practices are applied to ML:
LEVEL_2_CHARACTERISTICS = {
"training": {
"process": "Automated with data/model validation",
"frequency": "Event-driven or scheduled",
"reproducibility": "High - full pipeline versioning",
"tracking": "Comprehensive experiment tracking"
},
"deployment": {
"process": "CI/CD with automated testing",
"rollback": "Automated rollback on failure",
"testing": "Data, model, and integration tests",
"monitoring": "Comprehensive with drift detection"
},
"infrastructure": {
"compute": "Kubernetes with auto-scaling",
"storage": "Feature store, model registry",
"version_control": "GitOps for all components",
"environments": "Consistent dev/staging/prod"
},
"team": {
"structure": "Platform team + product teams",
"collaboration": "Shared tools and practices",
"knowledge_sharing": "Automated documentation"
}
}
Level 2 CI/CD Pipeline
# .github/workflows/ml-cicd.yaml
name: ML CI/CD Pipeline

on:
  push:
    branches: [main]
    paths:
      - 'src/**'
      - 'models/**'
      - 'config/**'
  pull_request:
    branches: [main]

# Each job runs on a fresh runner, so every job that runs project code checks
# out the repo and installs dependencies itself. Jobs that read S3 or talk to
# the cluster also need credentials (e.g. from repository secrets), not shown.
jobs:
  # Code quality checks
  code-quality:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install black flake8 mypy
      - name: Format check
        run: black --check src/
      - name: Lint
        run: flake8 src/
      - name: Type check
        run: mypy src/

  # Data validation
  data-validation:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - run: pip install -r requirements.txt
      - name: Validate training data
        run: |
          python -m src.data.validate \
            --input s3://ml-data/training/latest/ \
            --config config/data_schema.yaml

  # Model training
  training:
    needs: [code-quality, data-validation]
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - run: pip install -r requirements.txt
      - name: Train model
        run: |
          python -m src.models.train \
            --config config/train_config.yaml \
            --experiment ${{ github.sha }}
      - name: Upload model artifact
        uses: actions/upload-artifact@v4
        with:
          name: model
          path: models/

  # Model evaluation
  evaluation:
    needs: training
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - run: pip install -r requirements.txt
      - name: Download model
        uses: actions/download-artifact@v4
        with:
          name: model
          path: models/  # restore into models/, where the next steps expect it
      - name: Evaluate model
        run: |
          python -m src.models.evaluate \
            --model models/model.pkl \
            --test-data s3://ml-data/test/ \
            --thresholds config/thresholds.yaml
      - name: Generate model card
        run: |
          python -m src.documentation.generate_card \
            --model models/model.pkl \
            --output model_card.md

  # Model deployment (staging)
  # `kubectl apply` has no --set flag; this assumes k8s/staging/ and
  # k8s/production/ are Helm charts and that an image tagged with the commit
  # SHA was built and pushed earlier (not shown). "ml-model" is a placeholder
  # release name.
  deploy-staging:
    needs: evaluation
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - run: pip install -r requirements.txt
      - name: Deploy to staging
        run: |
          helm upgrade --install ml-model k8s/staging/ \
            --set image.tag=${{ github.sha }}
      - name: Run integration tests
        run: |
          python -m src.tests.integration \
            --endpoint http://staging.ml.example.com

  # Model deployment (production); the `production` environment can be
  # configured to require manual approval before this job runs
  deploy-production:
    needs: deploy-staging
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    environment: production
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - run: pip install -r requirements.txt
      - name: Deploy to production
        run: |
          helm upgrade --install ml-model k8s/production/ \
            --set image.tag=${{ github.sha }}
      - name: Verify deployment
        run: |
          python -m src.monitoring.verify \
            --endpoint http://ml.example.com

  # Monitoring setup
  monitoring:
    needs: deploy-production
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - run: pip install -r requirements.txt
      - name: Update monitoring dashboards
        run: |
          python -m src.monitoring.update_dashboards \
            --model-name fraud_detection \
            --version ${{ github.sha }}
      - name: Configure alerts
        run: |
          python -m src.monitoring.configure_alerts \
            --model-name fraud_detection \
            --config config/alerts.yaml
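The workflow calls a project module, `src.data.validate`, with a schema file, but neither is shown. A minimal sketch of the kind of checks such a step might run, assuming a hypothetical schema format that maps each column to an expected NumPy dtype kind and an optional maximum missing rate (the function name and schema keys are illustrative, not from any library):

```python
from typing import Dict, List

import pandas as pd


def validate_frame(df: pd.DataFrame, schema: Dict[str, dict]) -> List[str]:
    """Check columns, dtype kinds, and missing rates against a schema.

    Hypothetical schema format, e.g. {"amount": {"kind": "f", "max_missing": 0.1}},
    where "kind" is a NumPy dtype kind code ("i" int, "f" float, "O" object,
    "b" bool). Columns default to max_missing = 0.0 (no nulls allowed).
    """
    errors = []
    missing = set(schema) - set(df.columns)
    unexpected = set(df.columns) - set(schema)
    if missing:
        errors.append(f"missing columns: {sorted(missing)}")
    if unexpected:
        errors.append(f"unexpected columns: {sorted(unexpected)}")

    for col, rules in schema.items():
        if col not in df.columns:
            continue
        kind = df[col].dtype.kind
        if "kind" in rules and kind != rules["kind"]:
            errors.append(f"{col}: dtype kind {kind!r}, expected {rules['kind']!r}")
        rate = df[col].isna().mean()
        if rate > rules.get("max_missing", 0.0):
            errors.append(f"{col}: missing rate {rate:.2%}")
    return errors
```

In the CI step, the module would print the errors and exit with a non-zero status (for example `sys.exit(1)`) when the list is non-empty, which fails the `data-validation` job and blocks training.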
Level 2 Testing Framework
# tests/test_ml.py (pytest collects test_*.py files by default)
import joblib
import numpy as np
import pandas as pd
import pytest
import requests
import yaml
from scipy.stats import skew
from sklearn.metrics import roc_auc_score

TRAIN_PATH = 's3://ml-data/training/latest/'
TEST_PATH = 's3://ml-data/test/'


class TestDataValidation:
    """Data validation tests."""

    @pytest.fixture(scope='class')
    def training_data(self):
        """Load the training snapshot once for the whole class."""
        return pd.read_parquet(TRAIN_PATH)

    def test_schema_validation(self, training_data):
        """Test data schema matches expected."""
        with open('config/data_schema.yaml') as f:
            expected_schema = yaml.safe_load(f)
        expected_columns = set(expected_schema['columns'])
        actual_columns = set(training_data.columns)
        assert expected_columns == actual_columns, (
            f"missing={expected_columns - actual_columns}, "
            f"unexpected={actual_columns - expected_columns}"
        )

    def test_data_quality(self, training_data):
        """Test missing-value rates and minimum row count."""
        missing_rates = training_data.isna().mean()
        assert (missing_rates < 0.1).all(), missing_rates[missing_rates >= 0.1].to_dict()
        assert len(training_data) > 1000

    def test_feature_distributions(self, training_data):
        """Flag numeric features with extreme skewness."""
        for col in training_data.select_dtypes(include='number').columns:
            skewness = abs(skew(training_data[col].dropna()))
            assert skewness < 10, f"Feature {col} has extreme skewness ({skewness:.1f})"


class TestModelPerformance:
    """Model performance tests.

    Assumes models/model.pkl holds a scikit-learn-compatible binary
    classifier that exposes predict_proba.
    """

    @pytest.fixture(scope='class')
    def trained_model(self):
        """Load trained model."""
        return joblib.load('models/model.pkl')

    @pytest.fixture(scope='class')
    def test_data(self):
        """Load test data."""
        return pd.read_parquet(TEST_PATH)

    @staticmethod
    def _scores(model, X):
        # AUC needs scores, not hard labels: use the positive-class probability
        return model.predict_proba(X)[:, 1]

    def test_model_accuracy(self, trained_model, test_data):
        """Test model meets the minimum AUC threshold."""
        X = test_data.drop(columns=['label'])
        y = test_data['label']
        auc = roc_auc_score(y, self._scores(trained_model, X))
        assert auc >= 0.85, f"Model AUC {auc:.4f} below threshold"

    def test_prediction_distribution(self, trained_model, test_data):
        """Test prediction distribution is reasonable."""
        X = test_data.drop(columns=['label'])
        predictions = self._scores(trained_model, X)
        # Probabilities must lie in [0, 1]
        assert ((predictions >= 0) & (predictions <= 1)).all()
        # Near-constant predictions usually indicate a degenerate model
        assert np.std(predictions) > 0.01

    def test_model_fairness(self, trained_model, test_data):
        """Test that every group meets a minimum AUC."""
        if 'gender' not in test_data.columns:
            pytest.skip("no 'gender' column in test data")
        X = test_data.drop(columns=['label'])
        y = test_data['label']
        for group in test_data['gender'].unique():
            mask = test_data['gender'] == group
            if y[mask].nunique() < 2:
                continue  # AUC is undefined when a group contains only one class
            group_auc = roc_auc_score(y[mask], self._scores(trained_model, X[mask]))
            assert group_auc >= 0.80, f"Group {group} AUC {group_auc:.4f} too low"


class TestIntegration:
    """Integration tests for ML pipeline."""

    def test_pipeline_end_to_end(self):
        """Test complete pipeline runs successfully."""
        from src.pipeline import MLPipeline  # project-specific module
        pipeline = MLPipeline(config_path='config/pipeline.yaml')
        results = pipeline.run()
        assert results['status'] == 'success'
        assert results['metrics']['auc_roc'] >= 0.85

    def test_model_serving(self):
        """Test model serving endpoint."""
        response = requests.post(
            'http://localhost:8080/predict',
            json={
                'features': {
                    'feature_1': 1.0,
                    'feature_2': 2.0,
                    'feature_3': 3.0
                }
            },
            timeout=10
        )
        assert response.status_code == 200
        result = response.json()
        assert 'prediction' in result
        assert 0 <= result['prediction'] <= 1

    def test_monitoring_endpoints(self):
        """Test monitoring endpoints are available."""
        # Health check
        response = requests.get('http://localhost:8080/health', timeout=5)
        assert response.status_code == 200
        # Metrics
        response = requests.get('http://localhost:9090/metrics', timeout=5)
        assert response.status_code == 200
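The fairness test above enforces only an absolute AUC floor per group. A complementary check, sketched here with scikit-learn's `roc_auc_score`, reports the gap between the best- and worst-served groups. The function name is hypothetical, and any tolerance you compare the gap against (say 0.05) is a policy choice, not a standard:

```python
from typing import Dict, Tuple

import numpy as np
from sklearn.metrics import roc_auc_score


def group_auc_gap(y_true, scores, groups) -> Tuple[Dict[object, float], float]:
    """Per-group AUC and the max-min gap across groups.

    Groups containing only one class are skipped because AUC is undefined there.
    """
    y_true, scores, groups = map(np.asarray, (y_true, scores, groups))
    per_group = {}
    for g in np.unique(groups):
        mask = groups == g
        if np.unique(y_true[mask]).size < 2:
            continue
        # .item() converts the NumPy scalar key to a plain Python value
        per_group[g.item()] = float(roc_auc_score(y_true[mask], scores[mask]))
    if not per_group:
        raise ValueError("No group contains both classes")
    return per_group, max(per_group.values()) - min(per_group.values())
```

In a pytest test this becomes `per_group, gap = group_auc_gap(y, scores, test_data['gender'])` followed by an assertion on `gap`, which catches a model that is good on average but noticeably worse for one group.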
ℹ️ Level 2 enables rapid, reliable ML deployments with automated testing and monitoring. Focus on building confidence in automated pipelines before advancing to Level 3.
Level 3: Full Automation
Characteristics
At Level 3, ML processes are fully automated with self-healing capabilities:
LEVEL_3_CHARACTERISTICS = {
"training": {
"process": "Automated retraining on drift/performance",
"frequency": "Continuous or event-driven",
"reproducibility": "Complete lineage tracking",
"tracking": "Full audit trail with governance"
},
"deployment": {
"process": "Canary/A-B with automated rollback",
"rollback": "Instant automated rollback",
"testing": "Comprehensive test suite",
"monitoring": "Predictive monitoring with auto-remediation"
},
"infrastructure": {
"compute": "Multi-cloud with cost optimization",
"storage": "Distributed feature store",
"version_control": "GitOps with policy enforcement",
"environments": "Ephemeral environments"
},
"team": {
"structure": "ML platform + embedded ML engineers",
"collaboration": "Self-service tools and documentation",
"knowledge_sharing": "Automated knowledge base"
}
}
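"Canary/A-B with automated rollback" comes down to a decision rule evaluated while the canary receives a small share of traffic. A minimal sketch, assuming error rate is the only guardrail and using illustrative thresholds (a minimum request count before deciding, and a maximum tolerated increase over the stable version). Real systems typically add latency and business metrics and a statistical test; the names here are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class VersionStats:
    requests: int
    errors: int

    @property
    def error_rate(self) -> float:
        return self.errors / self.requests if self.requests else 0.0


def canary_decision(stable: VersionStats, canary: VersionStats,
                    min_requests: int = 1000,
                    max_increase: float = 0.01) -> str:
    """Return 'wait', 'rollback', or 'promote' (thresholds are illustrative)."""
    if canary.requests < min_requests:
        return "wait"  # too little canary traffic to judge yet
    if canary.error_rate > stable.error_rate + max_increase:
        return "rollback"  # canary is measurably worse than stable
    return "promote"
```

Running this rule on a timer, and wiring "rollback" to the deployment system, is what makes the rollback automated rather than a human decision.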
Level 3 Self-Healing System
import time
from datetime import datetime, timedelta
from typing import Dict, Optional

import numpy as np
import pandas as pd
from scipy.stats import ks_2samp


class SelfHealingMLSystem:
    """Level 3 self-healing ML system."""

    def __init__(self, config: Dict):
        self.config = config
        self.monitoring = MonitoringSystem(config)
        self.retraining = AutomatedRetraining(config)
        self.deployment = CanaryDeployment(config)

    def _max_replicas(self) -> int:
        return self.config.get('max_replicas', 20)

    def run_continuous_optimization(self):
        """Run the monitor/heal/retrain/optimize loop (never returns)."""
        while True:
            # Monitor system health
            health_status = self.monitoring.check_health()
            if health_status['status'] == 'degraded':
                # Trigger self-healing
                self._handle_degradation(health_status)

            # Check for drift
            drift_status = self.monitoring.check_drift()
            if drift_status['drift_detected']:
                # check_drift returns per-feature scores; report the largest
                self.retraining.trigger_retraining(
                    reason='data_drift',
                    drift_score=max(drift_status['drift_scores'].values())
                )

            # Optimize resources
            self._optimize_resources()

            # Wait before next check (seconds)
            time.sleep(self.config.get('check_interval', 300))

    def _handle_degradation(self, health_status: Dict):
        """Dispatch each detected issue to its handler."""
        for issue in health_status.get('issues', []):
            if issue['type'] == 'latency_spike':
                self._handle_latency_spike(issue)
            elif issue['type'] == 'accuracy_drop':
                self._handle_accuracy_drop(issue)
            elif issue['type'] == 'error_rate':
                self._handle_error_rate(issue)

    def _handle_latency_spike(self, issue: Dict):
        """Scale out according to severity, capped at max_replicas."""
        current_replicas = self.deployment.get_current_replicas()
        if issue['severity'] == 'high':
            # Scale up immediately
            new_replicas = min(current_replicas * 2, self._max_replicas())
            self.deployment.scale_deployment(new_replicas)
        elif issue['severity'] == 'medium':
            # Scale up gradually
            new_replicas = min(current_replicas + 2, self._max_replicas())
            self.deployment.scale_deployment(new_replicas)

    def _handle_accuracy_drop(self, issue: Dict):
        """Retrain now for large drops; otherwise schedule retraining."""
        if issue['accuracy_drop'] > 0.1:
            # Trigger emergency retraining
            self.retraining.trigger_retraining(
                reason='accuracy_emergency',
                priority='high'
            )
        else:
            # Schedule retraining
            self.retraining.schedule_retraining(
                reason='accuracy_decay',
                priority='medium'
            )

    def _handle_error_rate(self, issue: Dict):
        """Handle a high error rate (check_health reports it under 'value')."""
        error_rate = issue['value']
        if error_rate > 0.1:
            # Roll back to the previous stable version
            self.deployment.rollback_to_stable()
        elif error_rate > 0.05:
            # Scale up (capped) and keep monitoring
            current_replicas = self.deployment.get_current_replicas()
            self.deployment.scale_deployment(
                min(current_replicas + 2, self._max_replicas())
            )

    def _optimize_resources(self):
        """Adjust GPU allocation and alert on memory pressure."""
        utilization = self.monitoring.get_resource_utilization()

        # Optimize GPU allocation
        if utilization['gpu'] < 0.3:
            self.deployment.optimize_gpu_allocation(reduce=True)
        elif utilization['gpu'] > 0.8:
            self.deployment.optimize_gpu_allocation(reduce=False)

        # Alert on high memory
        if utilization['memory'] > 0.9:
            self.monitoring.send_alert(
                'memory_high',
                f"Memory utilization at {utilization['memory'] * 100:.1f}%"
            )


class MonitoringSystem:
    """Monitoring system.

    The get_resource_utilization and _get_* methods return placeholder
    values; in practice they would query a metrics backend.
    """

    def __init__(self, config: Dict):
        self.config = config

    def check_health(self) -> Dict:
        """Check latency, error rate, and accuracy against configured limits."""
        issues = []

        # Check latency (seconds)
        latency_stats = self._get_latency_stats()
        if latency_stats['p99'] > self.config.get('latency_threshold', 1.0):
            issues.append({
                'type': 'latency_spike',
                'severity': 'high',
                'value': latency_stats['p99']
            })

        # Check error rate
        error_rate = self._get_error_rate()
        if error_rate > self.config.get('error_threshold', 0.05):
            issues.append({
                'type': 'error_rate',
                'severity': 'high',
                'value': error_rate
            })

        # Check accuracy against the baseline
        accuracy = self._get_current_accuracy()
        baseline_accuracy = self.config.get('baseline_accuracy', 0.9)
        if baseline_accuracy - accuracy > 0.05:
            issues.append({
                'type': 'accuracy_drop',
                'severity': 'medium',
                'accuracy_drop': baseline_accuracy - accuracy
            })

        return {
            'status': 'healthy' if not issues else 'degraded',
            'issues': issues,
            'timestamp': datetime.now().isoformat()
        }

    def check_drift(self) -> Dict:
        """Check numeric features for data drift with a two-sample KS test."""
        reference_data = self._get_reference_data()
        current_data = self._get_current_data()

        drift_detected = False
        drift_scores = {}
        for col in reference_data.columns:
            if col not in current_data.columns:
                continue
            if not pd.api.types.is_numeric_dtype(reference_data[col]):
                continue
            stat, p_value = ks_2samp(
                reference_data[col].dropna(),
                current_data[col].dropna()
            )
            drift_scores[col] = float(stat)
            # One test per feature at 0.05: with many features, expect some
            # false alarms unless the threshold is corrected for multiple tests
            if p_value < 0.05:
                drift_detected = True

        return {
            'drift_detected': drift_detected,
            'drift_scores': drift_scores,
            'timestamp': datetime.now().isoformat()
        }

    def get_resource_utilization(self) -> Dict:
        """Get resource utilization metrics (placeholder values)."""
        return {
            'cpu': 0.65,
            'memory': 0.75,
            'gpu': 0.45,
            'timestamp': datetime.now().isoformat()
        }

    def send_alert(self, alert_type: str, message: str):
        """Send alert through configured channels (placeholder: prints)."""
        print(f"ALERT [{alert_type}]: {message}")

    def _get_latency_stats(self) -> Dict:
        return {'p99': 0.5, 'p95': 0.3, 'mean': 0.1}

    def _get_error_rate(self) -> float:
        return 0.02

    def _get_current_accuracy(self) -> float:
        return 0.92

    def _get_reference_data(self) -> pd.DataFrame:
        # Synthetic placeholder: the training-time feature snapshot
        return pd.DataFrame({'feature': np.random.randn(1000)})

    def _get_current_data(self) -> pd.DataFrame:
        # Synthetic placeholder: recent production features, slightly shifted
        return pd.DataFrame({'feature': np.random.randn(1000) + 0.1})


class AutomatedRetraining:
    """Automated retraining system."""

    def __init__(self, config: Dict):
        self.config = config
        self.retraining_queue = []

    def trigger_retraining(self, reason: str,
                           priority: str = 'medium',
                           drift_score: Optional[float] = None) -> Dict:
        """Queue a retraining job; run it immediately if priority is high."""
        job = {
            'job_id': f"retrain_{datetime.now():%Y%m%d%H%M%S}",
            'reason': reason,
            'priority': priority,
            'drift_score': drift_score,
            'triggered_at': datetime.now().isoformat(),
            'status': 'queued'
        }
        self.retraining_queue.append(job)
        if priority == 'high':
            self._execute_retraining(job)
        return job

    def schedule_retraining(self, reason: str,
                            priority: str = 'low',
                            schedule_time: Optional[datetime] = None) -> Dict:
        """Schedule retraining for later (default: one hour from now)."""
        if schedule_time is None:
            schedule_time = datetime.now() + timedelta(hours=1)
        job = {
            'job_id': f"retrain_{datetime.now():%Y%m%d%H%M%S}",
            'reason': reason,
            'priority': priority,
            'scheduled_time': schedule_time.isoformat(),
            'status': 'scheduled'
        }
        self.retraining_queue.append(job)
        return job

    def _execute_retraining(self, job: Dict):
        """Execute retraining job (placeholder for the real training pipeline)."""
        print(f"Executing retraining job: {job['job_id']}")
        job['status'] = 'completed'
        job['completed_at'] = datetime.now().isoformat()


class CanaryDeployment:
    """Canary deployment system (placeholders for orchestrator API calls)."""

    def __init__(self, config: Dict):
        self.config = config

    def get_current_replicas(self) -> int:
        return 3

    def scale_deployment(self, replicas: int):
        print(f"Scaling deployment to {replicas} replicas")

    def rollback_to_stable(self):
        print("Rolling back to stable version")

    def optimize_gpu_allocation(self, reduce: bool = True):
        print("Reducing GPU allocation" if reduce else "Increasing GPU allocation")

    def canary_deploy(self, new_version: str, traffic_percentage: int = 10):
        print(f"Deploying canary {new_version} with {traffic_percentage}% traffic")
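`check_drift` runs one KS test per feature at p < 0.05, so with many features some tests will fire by chance even when nothing has drifted. A sketch of the same per-feature test with a Bonferroni correction (the significance level divided by the number of features tested), using `scipy.stats.ks_2samp`. The function name and the choice of Bonferroni over other corrections are assumptions:

```python
from typing import Dict

import numpy as np
import pandas as pd
from scipy.stats import ks_2samp


def detect_drift(reference: pd.DataFrame, current: pd.DataFrame,
                 alpha: float = 0.05) -> Dict[str, object]:
    """Per-feature two-sample KS tests with a Bonferroni-adjusted threshold."""
    features = [
        c for c in reference.columns
        if c in current.columns and pd.api.types.is_numeric_dtype(reference[c])
    ]
    threshold = alpha / max(len(features), 1)
    scores, drifted = {}, []
    for col in features:
        result = ks_2samp(reference[col].dropna(), current[col].dropna())
        scores[col] = float(result.statistic)
        if result.pvalue < threshold:
            drifted.append(col)
    return {"drift_detected": bool(drifted), "drifted_features": drifted,
            "drift_scores": scores, "threshold": threshold}


rng = np.random.default_rng(0)
ref = pd.DataFrame({"a": rng.normal(size=2000), "b": rng.normal(size=2000)})
# Feature "a" is unchanged; feature "b" shifts by one standard deviation
cur = pd.DataFrame({"a": ref["a"].to_numpy(), "b": rng.normal(loc=1.0, size=2000)})
report = detect_drift(ref, cur)
print(report["drifted_features"])  # ['b']
```

Returning the list of drifted features, not just a boolean, also gives the retraining job a concrete reason to log.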
Maturity Assessment
Assessment Framework
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List


@dataclass
class MaturityDimension:
    name: str
    current_level: int
    target_level: int
    gaps: List[str]
    recommendations: List[str]


class MLOpsMaturityAssessment:
    """Assess MLOps maturity across dimensions."""

    def __init__(self):
        self.dimensions = self._define_dimensions()

    def _define_dimensions(self) -> Dict[str, Dict[str, List[str]]]:
        """Define maturity dimensions and the practices that indicate each level."""
        return {
            "version_control": {
                "level_0": ["No version control", "Manual file management"],
                "level_1": ["Git for code", "Basic branching"],
                "level_2": ["Git for code and data", "DVC/LFS", "Branch protection"],
                "level_3": ["GitOps", "Policy enforcement", "Automated compliance"]
            },
            "experiment_tracking": {
                "level_0": ["Notebooks", "Manual logging"],
                "level_1": ["MLflow/W&B", "Basic parameter tracking"],
                "level_2": ["Comprehensive tracking", "Model registry"],
                "level_3": ["Full lineage", "Automated documentation"]
            },
            "pipeline_automation": {
                "level_0": ["Manual scripts", "Ad-hoc execution"],
                "level_1": ["Airflow/Kubeflow", "Scheduled runs"],
                "level_2": ["Event-driven", "CI/CD integration"],
                "level_3": ["Self-healing", "Auto-optimization"]
            },
            "testing": {
                "level_0": ["Manual validation", "Ad-hoc testing"],
                "level_1": ["Basic unit tests", "Model validation"],
                "level_2": ["Data validation", "Integration tests"],
                "level_3": ["Comprehensive test suite", "Chaos engineering"]
            },
            "monitoring": {
                "level_0": ["No monitoring", "Manual checks"],
                "level_1": ["Basic metrics", "Grafana dashboards"],
                "level_2": ["Drift detection", "Alerting"],
                "level_3": ["Predictive monitoring", "Auto-remediation"]
            },
            "deployment": {
                "level_0": ["Manual deployment", "File copy"],
                "level_1": ["Scripted deployment", "Basic validation"],
                "level_2": ["CI/CD", "Canary deployments"],
                "level_3": ["Automated A/B testing", "Self-healing"]
            }
        }

    def assess_dimension(self, dimension: str,
                         current_state: List[str]) -> MaturityDimension:
        """Assess maturity for a single dimension.

        The current level is the highest level with at least one practice
        listed in `current_state` (exact string match against the criteria).
        """
        criteria = self.dimensions[dimension]

        # Determine current level: highest level with any matching criterion
        current_level = 0
        for level in range(3, 0, -1):
            if any(c in current_state for c in criteria[f'level_{level}']):
                current_level = level
                break

        # Gaps: every criterion above the current level
        gaps = []
        for level in range(current_level + 1, 4):
            gaps.extend(criteria[f'level_{level}'])

        recommendations = self._generate_recommendations(
            dimension, current_level, gaps
        )

        return MaturityDimension(
            name=dimension,
            current_level=current_level,
            target_level=min(current_level + 1, 3),
            gaps=gaps,
            recommendations=recommendations
        )

    def _generate_recommendations(self, dimension: str,
                                  current_level: int,
                                  gaps: List[str]) -> List[str]:
        """Generate recommendations for moving up one level."""
        recommendations = {
            "version_control": {
                0: ["Implement Git for all code", "Create branching strategy"],
                1: ["Add DVC for data versioning", "Implement branch protection"],
                2: ["Adopt GitOps practices", "Implement policy enforcement"]
            },
            "experiment_tracking": {
                0: ["Deploy MLflow/W&B", "Establish tracking standards"],
                1: ["Implement model registry", "Add comprehensive metrics"],
                2: ["Enable full lineage tracking", "Automate documentation"]
            },
            "pipeline_automation": {
                0: ["Deploy Airflow/Kubeflow", "Create standard pipelines"],
                1: ["Implement event-driven triggers", "Add CI/CD integration"],
                2: ["Implement self-healing", "Add auto-optimization"]
            }
        }
        curated = recommendations.get(dimension, {}).get(current_level)
        if curated:
            return curated
        # No curated advice (e.g. testing, monitoring, deployment): fall back
        # to the next level's criteria, which are the most immediate gaps
        if current_level >= 3:
            return []
        next_level = self.dimensions[dimension][f'level_{current_level + 1}']
        return [f"Adopt: {practice}" for practice in next_level]

    def generate_assessment_report(self,
                                   assessments: List[MaturityDimension]) -> Dict:
        """Generate comprehensive assessment report."""
        if not assessments:
            raise ValueError("At least one dimension assessment is required")
        overall_score = sum(a.current_level for a in assessments) / len(assessments)

        return {
            'timestamp': datetime.now().isoformat(),
            'overall_maturity_score': overall_score,
            'overall_level': int(overall_score),  # truncates the mean
            'dimensions': [
                {
                    'name': a.name,
                    'current_level': a.current_level,
                    'target_level': a.target_level,
                    'gaps': a.gaps,
                    'recommendations': a.recommendations
                }
                for a in assessments
            ],
            'priority_areas': [
                a.name for a in assessments
                if a.current_level < 2
            ],
            'roadmap': self._generate_roadmap(assessments)
        }

    def _generate_roadmap(self,
                          assessments: List[MaturityDimension]) -> Dict:
        """Bucket the top two recommendations per dimension by time horizon."""
        roadmap = {
            'short_term': [],   # 0-3 months
            'medium_term': [],  # 3-6 months
            'long_term': []     # 6-12 months
        }
        for assessment in assessments:
            if assessment.current_level == 0:
                roadmap['short_term'].extend(assessment.recommendations[:2])
            elif assessment.current_level == 1:
                roadmap['medium_term'].extend(assessment.recommendations[:2])
            elif assessment.current_level == 2:
                roadmap['long_term'].extend(assessment.recommendations[:2])
        return roadmap
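`generate_assessment_report` averages the per-dimension levels and truncates with `int()`, so the reported overall level is the floor of the mean. A worked example with six hypothetical dimension levels shows the effect, and why it helps to read the weakest dimensions alongside the average:

```python
levels = {
    "version_control": 1,
    "experiment_tracking": 1,
    "pipeline_automation": 1,
    "testing": 0,
    "monitoring": 0,
    "deployment": 2,
}

overall_score = sum(levels.values()) / len(levels)  # 5 / 6 = 0.833...
overall_level = int(overall_score)                  # int() truncates -> 0
lowest = min(levels.values())
weakest = sorted(name for name, lvl in levels.items() if lvl == lowest)

print(round(overall_score, 3), overall_level, weakest)
# 0.833 0 ['monitoring', 'testing']
```

Here one dimension already sits at Level 2, yet the organization reports as Level 0 overall. The two Level 0 dimensions are the concrete reason, and they are where the short-term roadmap items should come from.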
ℹ️ Use the maturity assessment to identify gaps and prioritize improvements. Focus on quick wins first, then build towards higher maturity levels over time.
Summary
The MLOps maturity model provides a roadmap for organizational transformation:
- Level 0: Manual processes - focus on establishing basic practices
- Level 1: Pipeline automation - implement automated training and tracking
- Level 2: CI/CD for ML - add automated testing and deployment
- Level 3: Full automation - implement self-healing and optimization
Assess your current maturity, identify gaps, and create a roadmap for improvement. Maturity is a journey, not a destination.