πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

MLOps Maturity: Level 0-3, Manual to Full Automation

MLOpsMLOps Maturity Model⭐ Premium

Advertisement

Interview Question (Hard) β€” Asked at: Google, Microsoft, Amazon, Netflix, Uber

"Design an MLOps maturity roadmap for an organization moving from manual ML processes to full automation. What are the key milestones, challenges, and cultural changes required?"

MLOps Maturity Overview

The MLOps maturity model provides a framework for organizations to assess and improve their ML operations capabilities.

Maturity Levels Overview

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                MLOps Maturity Levels                             β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                 β”‚
β”‚  Level 0: Manual Process                                       β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚ β€’ Manual model training and deployment                  β”‚   β”‚
β”‚  β”‚ β€’ No version control for data/models                    β”‚   β”‚
β”‚  β”‚ β€’ Ad-hoc processes                                     β”‚   β”‚
β”‚  β”‚ β€’ Single data scientist working in silos                β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                              β”‚                                  β”‚
β”‚                              β–Ό                                  β”‚
β”‚  Level 1: ML Pipeline Automation                               β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚ β€’ Automated training pipelines                          β”‚   β”‚
β”‚  β”‚ β€’ Basic experiment tracking                             β”‚   β”‚
β”‚  β”‚ β€’ Model versioning                                     β”‚   β”‚
β”‚  β”‚ β€’ Basic monitoring                                     β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                              β”‚                                  β”‚
β”‚                              β–Ό                                  β”‚
β”‚  Level 2: CI/CD for ML                                        β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚ β€’ Continuous integration for ML code                    β”‚   β”‚
β”‚  β”‚ β€’ Automated testing (data, model, integration)          β”‚   β”‚
β”‚  β”‚ β€’ Automated deployment pipelines                        β”‚   β”‚
β”‚  β”‚ β€’ Feature stores                                       β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                              β”‚                                  β”‚
β”‚                              β–Ό                                  β”‚
β”‚  Level 3: Full Automation                                      β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚ β€’ Automated retraining on drift                         β”‚   β”‚
β”‚  β”‚ β€’ A/B testing and canary deployments                    β”‚   β”‚
β”‚  β”‚ β€’ Full monitoring with auto-remediation                 β”‚   β”‚
β”‚  β”‚ β€’ ML governance and compliance                          β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Level 0: Manual Process

Characteristics

At Level 0, ML processes are manual and ad-hoc:

LEVEL_0_CHARACTERISTICS = {
    "training": {
        "process": "Manual script execution",
        "frequency": "On-demand",
        "reproducibility": "Low - manual environment setup",
        "tracking": "Manual notebooks, no versioning"
    },
    "deployment": {
        "process": "Manual model copy to production",
        "rollback": "Manual file restoration",
        "testing": "Ad-hoc validation",
        "monitoring": "None or basic logging"
    },
    "infrastructure": {
        "compute": "Individual workstations",
        "storage": "Local files, shared drives",
        "version_control": "Limited Git usage",
        "environments": "Inconsistent across team"
    },
    "team": {
        "structure": "Data scientists work in silos",
        "collaboration": "Email, meetings",
        "knowledge_sharing": "Minimal"
    }
}

Pain Points at Level 0

LEVEL_0_PAIN_POINTS = [
    "Model training takes days/weeks due to manual processes",
    "No reproducibility - results vary between runs",
    "Deployment is risky and error-prone",
    "Model performance degrades silently in production",
    "Team collaboration is inefficient",
    "No audit trail for compliance",
    "Cannot scale to multiple models",
    "Technical debt accumulates rapidly"
]

Transition to Level 1

class Level0ToLevel1Transition:
    """Transition from Level 0 to Level 1."""
    
    def __init__(self):
        self.milestones = [
            "Implement version control for code and data",
            "Set up experiment tracking",
            "Create automated training pipeline",
            "Establish basic model registry",
            "Implement basic monitoring"
        ]
    
    def create_implementation_plan(self) -> dict:
        """Create implementation plan."""
        
        plan = {
            "phase_1_foundation": {
                "duration": "4-6 weeks",
                "tasks": [
                    "Set up Git repositories with proper branching strategy",
                    "Implement DVC for data versioning",
                    "Deploy MLflow for experiment tracking",
                    "Create standard project structure"
                ],
                "deliverables": [
                    "Git repository with CI",
                    "DVC configuration",
                    "MLflow server",
                    "Project template"
                ]
            },
            "phase_2_automation": {
                "duration": "6-8 weeks",
                "tasks": [
                    "Build automated training pipeline",
                    "Implement model versioning",
                    "Create basic monitoring",
                    "Set up model registry"
                ],
                "deliverables": [
                    "Airflow/Kubeflow pipeline",
                    "Model registry",
                    "Basic Prometheus metrics",
                    "Grafana dashboard"
                ]
            },
            "phase_3_optimization": {
                "duration": "4-6 weeks",
                "tasks": [
                    "Optimize pipeline performance",
                    "Implement data validation",
                    "Create deployment automation",
                    "Establish team workflows"
                ],
                "deliverables": [
                    "Optimized pipelines",
                    "Data validation framework",
                    "Deployment scripts",
                    "Team documentation"
                ]
            }
        }
        
        return plan

⚠️

Level 0 is unsustainable for production ML. Organizations should prioritize moving to Level 1 to enable scalable, reproducible ML processes.

Level 1: ML Pipeline Automation

Characteristics

At Level 1, basic ML pipeline automation is in place:

LEVEL_1_CHARACTERISTICS = {
    "training": {
        "process": "Automated pipelines with orchestration",
        "frequency": "Scheduled or triggered",
        "reproducibility": "Medium - pipeline defined in code",
        "tracking": "MLflow/W&B for experiments"
    },
    "deployment": {
        "process": "Scripted deployment with basic validation",
        "rollback": "Version-based rollback",
        "testing": "Model performance validation",
        "monitoring": "Basic metrics (latency, errors)"
    },
    "infrastructure": {
        "compute": "Shared compute cluster",
        "storage": "Centralized data lake",
        "version_control": "Git for code and models",
        "environments": "Containerized training"
    },
    "team": {
        "structure": "Cross-functional ML teams",
        "collaboration": "Shared dashboards, regular syncs",
        "knowledge_sharing": "Model cards, documentation"
    }
}

Level 1 Implementation

# level1_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import mlflow
import json

default_args = {
    'owner': 'ml-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

class Level1MLPipeline:
    """Level 1 ML Pipeline Implementation."""
    
    def __init__(self, config: dict):
        self.config = config
        mlflow.set_experiment(config['experiment_name'])
    
    def prepare_data(self):
        """Prepare training data."""
        
        # Load data
        import pandas as pd
        df = pd.read_parquet(self.config['data_path'])
        
        # Basic validation
        assert len(df) > 0, "Empty dataset"
        assert 'label' in df.columns, "Missing label column"
        
        # Log dataset info
        mlflow.log_param("dataset_rows", len(df))
        mlflow.log_param("dataset_columns", len(df.columns))
        
        return df
    
    def train_model(self, df):
        """Train model with experiment tracking."""
        
        from sklearn.model_selection import train_test_split
        import xgboost as xgb
        
        with mlflow.start_run():
            # Split data
            X = df.drop(columns=['label'])
            y = df['label']
            X_train, X_val, y_train, y_val = train_test_split(
                X, y, test_size=0.2, random_state=42
            )
            
            # Train model
            params = self.config.get('model_params', {})
            
            dtrain = xgb.DMatrix(X_train, label=y_train)
            dval = xgb.DMatrix(X_val, label=y_val)
            
            model = xgb.train(
                params,
                dtrain,
                num_boost_round=100,
                evals=[(dval, 'val')],
                early_stopping_rounds=10
            )
            
            # Evaluate
            from sklearn.metrics import roc_auc_score
            val_pred = model.predict(dval)
            auc = roc_auc_score(y_val, val_pred)
            
            # Log metrics and params
            mlflow.log_params(params)
            mlflow.log_metric("auc_roc", auc)
            
            # Save model
            mlflow.xgboost.log_model(model, "model")
            
            return {'model': model, 'auc': auc}
    
    def evaluate_model(self, model, X_val, y_val):
        """Evaluate model and check quality gates."""
        
        from sklearn.metrics import (
            roc_auc_score, precision_score, recall_score
        )
        
        y_pred = model.predict(X_val)
        
        metrics = {
            'auc_roc': roc_auc_score(y_val, y_pred),
            'precision': precision_score(y_val, (y_pred > 0.5).astype(int)),
            'recall': recall_score(y_val, (y_pred > 0.5).astype(int))
        }
        
        # Check quality gates
        thresholds = self.config.get('thresholds', {})
        
        for metric, threshold in thresholds.items():
            if metric in metrics and metrics[metric] < threshold:
                raise ValueError(
                    f"Quality gate failed: {metric}={metrics[metric]} < {threshold}"
                )
        
        return metrics

def create_level1_dag():
    """Create Level 1 Airflow DAG."""
    
    config = {
        'experiment_name': 'level1_ml_experiment',
        'data_path': 's3://ml-data/training/',
        'model_params': {
            'objective': 'binary:logistic',
            'max_depth': 6,
            'learning_rate': 0.1
        },
        'thresholds': {
            'auc_roc': 0.85
        }
    }
    
    pipeline = Level1MLPipeline(config)
    
    with DAG(
        'level1_ml_pipeline',
        default_args=default_args,
        schedule_interval='@daily',
        start_date=datetime(2024, 1, 1),
        catchup=False
    ) as dag:
        
        prepare = PythonOperator(
            task_id='prepare_data',
            python_callable=pipeline.prepare_data
        )
        
        train = PythonOperator(
            task_id='train_model',
            python_callable=pipeline.train_model
        )
        
        evaluate = PythonOperator(
            task_id='evaluate_model',
            python_callable=pipeline.evaluate_model
        )
        
        prepare >> train >> evaluate
    
    return dag

ℹ️

Level 1 provides the foundation for reproducible ML. Focus on establishing automated pipelines, experiment tracking, and basic monitoring before advancing to Level 2.

Level 2: CI/CD for ML

Characteristics

At Level 2, CI/CD practices are applied to ML:

LEVEL_2_CHARACTERISTICS = {
    "training": {
        "process": "Automated with data/model validation",
        "frequency": "Event-driven or scheduled",
        "reproducibility": "High - full pipeline versioning",
        "tracking": "Comprehensive experiment tracking"
    },
    "deployment": {
        "process": "CI/CD with automated testing",
        "rollback": "Automated rollback on failure",
        "testing": "Data, model, and integration tests",
        "monitoring": "Comprehensive with drift detection"
    },
    "infrastructure": {
        "compute": "Kubernetes with auto-scaling",
        "storage": "Feature store, model registry",
        "version_control": "GitOps for all components",
        "environments": "Consistent dev/staging/prod"
    },
    "team": {
        "structure": "Platform team + product teams",
        "collaboration": "Shared tools and practices",
        "knowledge_sharing": "Automated documentation"
    }
}

Level 2 CI/CD Pipeline

# .github/workflows/ml-cicd.yaml
name: ML CI/CD Pipeline

on:
  push:
    branches: [main]
    paths:
      - 'src/**'
      - 'models/**'
      - 'config/**'
  pull_request:
    branches: [main]

jobs:
  # Code quality checks
  code-quality:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v3
    
    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.9'
    
    - name: Install dependencies
      run: |
        pip install -r requirements.txt
        pip install black flake8 mypy
    
    - name: Format check
      run: black --check src/
    
    - name: Lint
      run: flake8 src/
    
    - name: Type check
      run: mypy src/

  # Data validation
  data-validation:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v3
    
    - name: Validate training data
      run: |
        python -m src.data.validate \
          --input s3://ml-data/training/latest/ \
          --config config/data_schema.yaml

  # Model training
  training:
    needs: [code-quality, data-validation]
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v3
    
    - name: Train model
      run: |
        python -m src.models.train \
          --config config/train_config.yaml \
          --experiment ${{ github.sha }}
    
    - name: Upload model artifact
      uses: actions/upload-artifact@v3
      with:
        name: model
        path: models/

  # Model evaluation
  evaluation:
    needs: training
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v3
    
    - name: Download model
      uses: actions/download-artifact@v3
      with:
        name: model
    
    - name: Evaluate model
      run: |
        python -m src.models.evaluate \
          --model models/model.pkl \
          --test-data s3://ml-data/test/ \
          --thresholds config/thresholds.yaml
    
    - name: Generate model card
      run: |
        python -m src documentation.generate_card \
          --model models/model.pkl \
          --output model_card.md

  # Model deployment (staging)
  deploy-staging:
    needs: evaluation
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
    - uses: actions/checkout@v3
    
    - name: Deploy to staging
      run: |
        kubectl apply -f k8s/staging/ \
          --set image.tag=${{ github.sha }}
    
    - name: Run integration tests
      run: |
        python -m src.tests.integration \
          --endpoint http://staging.ml.example.com

  # Model deployment (production)
  deploy-production:
    needs: deploy-staging
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    environment: production
    steps:
    - uses: actions/checkout@v3
    
    - name: Deploy to production
      run: |
        kubectl apply -f k8s/production/ \
          --set image.tag=${{ github.sha }}
    
    - name: Verify deployment
      run: |
        python -m src.monitoring.verify \
          --endpoint http://ml.example.com

  # Monitoring setup
  monitoring:
    needs: deploy-production
    runs-on: ubuntu-latest
    steps:
    - name: Update monitoring dashboards
      run: |
        python -m src.monitoring.update_dashboards \
          --model-name fraud_detection \
          --version ${{ github.sha }}
    
    - name: Configure alerts
      run: |
        python -m src.monitoring.configure_alerts \
          --model-name fraud_detection \
          --config config/alerts.yaml

Level 2 Testing Framework

# tests/ml_tests.py
import pytest
import numpy as np
import pandas as pd
from typing import Dict

class TestDataValidation:
    """Data validation tests."""
    
    def test_schema_validation(self):
        """Test data schema matches expected."""
        import yaml
        
        with open('config/data_schema.yaml') as f:
            expected_schema = yaml.safe_load(f)
        
        df = pd.read_parquet('s3://ml-data/training/latest/')
        
        # Check columns
        expected_columns = set(expected_schema['columns'])
        actual_columns = set(df.columns)
        
        assert expected_columns == actual_columns
    
    def test_data_quality(self):
        """Test data quality checks."""
        df = pd.read_parquet('s3://ml-data/training/latest/')
        
        # Check missing values
        missing_rates = df.isna().mean()
        assert (missing_rates < 0.1).all()
        
        # Check row count
        assert len(df) > 1000
    
    def test_feature_distributions(self):
        """Test feature distributions are within expected ranges."""
        df = pd.read_parquet('s3://ml-data/training/latest/')
        
        for col in df.select_dtypes(include=['int64', 'float64']).columns:
            # Check for extreme skewness
            from scipy.stats import skew
            skewness = abs(skew(df[col].dropna()))
            assert skewness < 10, f"Feature {col} has extreme skewness"

class TestModelPerformance:
    """Model performance tests."""
    
    @pytest.fixture
    def trained_model(self):
        """Load trained model."""
        import joblib
        return joblib.load('models/model.pkl')
    
    @pytest.fixture
    def test_data(self):
        """Load test data."""
        return pd.read_parquet('s3://ml-data/test/')
    
    def test_model_accuracy(self, trained_model, test_data):
        """Test model meets minimum accuracy threshold."""
        from sklearn.metrics import roc_auc_score
        
        X = test_data.drop(columns=['label'])
        y = test_data['label']
        
        y_pred = trained_model.predict(X)
        auc = roc_auc_score(y, y_pred)
        
        assert auc >= 0.85, f"Model AUC {auc} below threshold"
    
    def test_prediction_distribution(self, trained_model, test_data):
        """Test prediction distribution is reasonable."""
        X = test_data.drop(columns=['label'])
        
        predictions = trained_model.predict(X)
        
        # Check predictions are in valid range
        assert (predictions >= 0).all() and (predictions <= 1).all()
        
        # Check prediction variance
        assert np.std(predictions) > 0.01
    
    def test_model_fairness(self, trained_model, test_data):
        """Test model fairness across groups."""
        X = test_data.drop(columns=['label'])
        y = test_data['label']
        
        # Assuming 'gender' column exists
        if 'gender' in test_data.columns:
            for group in test_data['gender'].unique():
                mask = test_data['gender'] == group
                group_auc = roc_auc_score(y[mask], trained_model.predict(X[mask]))
                
                # Check group AUC is within acceptable range
                assert group_auc >= 0.80, f"Group {group} AUC {group_auc} too low"

class TestIntegration:
    """Integration tests for ML pipeline."""
    
    def test_pipeline_end_to_end(self):
        """Test complete pipeline runs successfully."""
        from src.pipeline import MLPipeline
        
        pipeline = MLPipeline(config_path='config/pipeline.yaml')
        
        # Run pipeline
        results = pipeline.run()
        
        assert results['status'] == 'success'
        assert results['metrics']['auc_roc'] >= 0.85
    
    def test_model_serving(self):
        """Test model serving endpoint."""
        import requests
        
        response = requests.post(
            'http://localhost:8080/predict',
            json={
                'features': {
                    'feature_1': 1.0,
                    'feature_2': 2.0,
                    'feature_3': 3.0
                }
            }
        )
        
        assert response.status_code == 200
        
        result = response.json()
        assert 'prediction' in result
        assert 0 <= result['prediction'] <= 1
    
    def test_monitoring_endpoints(self):
        """Test monitoring endpoints are available."""
        import requests
        
        # Health check
        response = requests.get('http://localhost:8080/health')
        assert response.status_code == 200
        
        # Metrics
        response = requests.get('http://localhost:9090/metrics')
        assert response.status_code == 200

ℹ️

Level 2 enables rapid, reliable ML deployments with automated testing and monitoring. Focus on building confidence in automated pipelines before advancing to Level 3.

Level 3: Full Automation

Characteristics

At Level 3, ML processes are fully automated with self-healing capabilities:

LEVEL_3_CHARACTERISTICS = {
    "training": {
        "process": "Automated retraining on drift/performance",
        "frequency": "Continuous or event-driven",
        "reproducibility": "Complete lineage tracking",
        "tracking": "Full audit trail with governance"
    },
    "deployment": {
        "process": "Canary/A-B with automated rollback",
        "rollback": "Instant automated rollback",
        "testing": "Comprehensive test suite",
        "monitoring": "Predictive monitoring with auto-remediation"
    },
    "infrastructure": {
        "compute": "Multi-cloud with cost optimization",
        "storage": "Distributed feature store",
        "version_control": "GitOps with policy enforcement",
        "environments": "Ephemeral environments"
    },
    "team": {
        "structure": "ML platform + embedded ML engineers",
        "collaboration": "Self-service tools and documentation",
        "knowledge_sharing": "Automated knowledge base"
    }
}

Level 3 Self-Healing System

import numpy as np
from typing import Dict, List, Optional
from datetime import datetime, timedelta
import json

class SelfHealingMLSystem:
    """Level 3 self-healing ML system."""
    
    def __init__(self, config: Dict):
        self.config = config
        self.monitoring = MonitoringSystem(config)
        self.retraining = AutomatedRetraining(config)
        self.deployment = CanaryDeployment(config)
    
    def run_continuous_optimization(self):
        """Run continuous optimization loop."""
        
        while True:
            # Monitor system health
            health_status = self.monitoring.check_health()
            
            if health_status['status'] == 'degraded':
                # Trigger self-healing
                self._handle_degradation(health_status)
            
            # Check for drift
            drift_status = self.monitoring.check_drift()
            
            if drift_status['drift_detected']:
                # Trigger retraining
                self.retraining.trigger_retraining(
                    reason='data_drift',
                    drift_score=drift_status['drift_score']
                )
            
            # Optimize resources
            self._optimize_resources()
            
            # Wait before next check
            time.sleep(self.config.get('check_interval', 300))
    
    def _handle_degradation(self, health_status: Dict):
        """Handle system degradation."""
        
        issues = health_status.get('issues', [])
        
        for issue in issues:
            if issue['type'] == 'latency_spike':
                self._handle_latency_spike(issue)
            elif issue['type'] == 'accuracy_drop':
                self._handle_accuracy_drop(issue)
            elif issue['type'] == 'error_rate':
                self._handle_error_rate(issue)
    
    def _handle_latency_spike(self, issue: Dict):
        """Handle latency spike."""
        
        # Check if we need to scale up
        current_replicas = self.deployment.get_current_replicas()
        
        if issue['severity'] == 'high':
            # Scale up immediately
            new_replicas = min(
                current_replicas * 2,
                self.config.get('max_replicas', 20)
            )
            self.deployment.scale_deployment(new_replicas)
        
        elif issue['severity'] == 'medium':
            # Scale up gradually
            new_replicas = min(
                current_replicas + 2,
                self.config.get('max_replicas', 20)
            )
            self.deployment.scale_deployment(new_replicas)
    
    def _handle_accuracy_drop(self, issue: Dict):
        """Handle accuracy drop."""
        
        # Check if retraining is needed
        if issue['accuracy_drop'] > 0.1:
            # Trigger emergency retraining
            self.retraining.trigger_retraining(
                reason='accuracy_emergency',
                priority='high'
            )
        else:
            # Schedule retraining
            self.retraining.schedule_retraining(
                reason='accuracy_decay',
                priority='medium'
            )
    
    def _handle_error_rate(self, issue: Dict):
        """Handle high error rate."""
        
        if issue['error_rate'] > 0.1:
            # Rollback to previous stable version
            self.deployment.rollback_to_stable()
        
        elif issue['error_rate'] > 0.05:
            # Scale up and monitor
            current_replicas = self.deployment.get_current_replicas()
            self.deployment.scale_deployment(current_replicas + 2)
    
    def _optimize_resources(self):
        """Optimize resource utilization."""
        
        # Get current utilization
        utilization = self.monitoring.get_resource_utilization()
        
        # Optimize GPU allocation
        if utilization['gpu'] < 0.3:
            # Scale down GPU resources
            self.deployment.optimize_gpu_allocation(reduce=True)
        elif utilization['gpu'] > 0.8:
            # Scale up GPU resources
            self.deployment.optimize_gpu_allocation(reduce=False)
        
        # Optimize memory
        if utilization['memory'] > 0.9:
            # Alert or scale up
            self.monitoring.send_alert(
                'memory_high',
                f"Memory utilization at {utilization['memory']*100:.1f}%"
            )

class MonitoringSystem:
    """Comprehensive monitoring system."""
    
    def __init__(self, config: Dict):
        self.config = config
    
    def check_health(self) -> Dict:
        """Check overall system health."""
        
        issues = []
        
        # Check latency
        latency_stats = self._get_latency_stats()
        if latency_stats['p99'] > self.config.get('latency_threshold', 1.0):
            issues.append({
                'type': 'latency_spike',
                'severity': 'high',
                'value': latency_stats['p99']
            })
        
        # Check error rate
        error_rate = self._get_error_rate()
        if error_rate > self.config.get('error_threshold', 0.05):
            issues.append({
                'type': 'error_rate',
                'severity': 'high',
                'value': error_rate
            })
        
        # Check accuracy
        accuracy = self._get_current_accuracy()
        baseline_accuracy = self.config.get('baseline_accuracy', 0.9)
        if baseline_accuracy - accuracy > 0.05:
            issues.append({
                'type': 'accuracy_drop',
                'severity': 'medium',
                'accuracy_drop': baseline_accuracy - accuracy
            })
        
        status = 'healthy' if not issues else 'degraded'
        
        return {
            'status': status,
            'issues': issues,
            'timestamp': datetime.now().isoformat()
        }
    
    def check_drift(self) -> Dict:
        """Check for data drift."""
        
        from scipy.stats import ks_2samp
        
        # Get reference and current data
        reference_data = self._get_reference_data()
        current_data = self._get_current_data()
        
        drift_detected = False
        drift_scores = {}
        
        for col in reference_data.columns:
            if reference_data[col].dtype in ['int64', 'float64']:
                stat, p_value = ks_2samp(
                    reference_data[col].dropna(),
                    current_data[col].dropna()
                )
                
                drift_scores[col] = stat
                
                if p_value < 0.05:
                    drift_detected = True
        
        return {
            'drift_detected': drift_detected,
            'drift_scores': drift_scores,
            'timestamp': datetime.now().isoformat()
        }
    
    def get_resource_utilization(self) -> Dict:
        """Get resource utilization metrics."""
        
        return {
            'cpu': 0.65,
            'memory': 0.75,
            'gpu': 0.45,
            'timestamp': datetime.now().isoformat()
        }
    
    def send_alert(self, alert_type: str, message: str):
        """Send alert through configured channels."""
        
        print(f"ALERT [{alert_type}]: {message}")
    
    def _get_latency_stats(self) -> Dict:
        """Get latency statistics."""
        return {'p99': 0.5, 'p95': 0.3, 'mean': 0.1}
    
    def _get_error_rate(self) -> float:
        """Get current error rate."""
        return 0.02
    
    def _get_current_accuracy(self) -> float:
        """Get current model accuracy."""
        return 0.92
    
    def _get_reference_data(self):
        """Get reference data for drift detection."""
        return pd.DataFrame({'feature': np.random.randn(1000)})
    
    def _get_current_data(self):
        """Get current data for drift detection."""
        return pd.DataFrame({'feature': np.random.randn(1000) + 0.1})

class AutomatedRetraining:
    """Automated retraining system."""
    
    def __init__(self, config: Dict):
        self.config = config
        self.retraining_queue = []
    
    def trigger_retraining(self, reason: str, 
                          priority: str = 'medium',
                          drift_score: float = None):
        """Trigger model retraining."""
        
        job = {
            'job_id': f"retrain_{datetime.now():%Y%m%d%H%M%S}",
            'reason': reason,
            'priority': priority,
            'drift_score': drift_score,
            'triggered_at': datetime.now().isoformat(),
            'status': 'queued'
        }
        
        self.retraining_queue.append(job)
        
        # Execute if high priority
        if priority == 'high':
            self._execute_retraining(job)
        
        return job
    
    def schedule_retraining(self, reason: str,
                           priority: str = 'low',
                           schedule_time: datetime = None):
        """Schedule retraining for later."""
        
        if schedule_time is None:
            schedule_time = datetime.now() + timedelta(hours=1)
        
        job = {
            'job_id': f"retrain_{datetime.now():%Y%m%d%H%M%S}",
            'reason': reason,
            'priority': priority,
            'scheduled_time': schedule_time.isoformat(),
            'status': 'scheduled'
        }
        
        self.retraining_queue.append(job)
        
        return job
    
    def _execute_retraining(self, job: Dict):
        """Execute retraining job."""
        
        print(f"Executing retraining job: {job['job_id']}")
        
        # Run training pipeline
        # This would call your actual training pipeline
        job['status'] = 'completed'
        job['completed_at'] = datetime.now().isoformat()

class CanaryDeployment:
    """Canary deployment system."""
    
    def __init__(self, config: Dict):
        self.config = config
    
    def get_current_replicas(self) -> int:
        """Get current replica count."""
        return 3
    
    def scale_deployment(self, replicas: int):
        """Scale deployment to specified replicas."""
        
        print(f"Scaling deployment to {replicas} replicas")
    
    def rollback_to_stable(self):
        """Rollback to stable version."""
        
        print("Rolling back to stable version")
    
    def optimize_gpu_allocation(self, reduce: bool = True):
        """Optimize GPU allocation."""
        
        if reduce:
            print("Reducing GPU allocation")
        else:
            print("Increasing GPU allocation")
    
    def canary_deploy(self, new_version: str,
                     traffic_percentage: int = 10):
        """Deploy canary version."""
        
        print(f"Deploying canary {new_version} with {traffic_percentage}% traffic")

Maturity Assessment

Assessment Framework

from typing import Dict, List
from dataclasses import dataclass

@dataclass
class MaturityDimension:
    name: str
    current_level: int
    target_level: int
    gaps: List[str]
    recommendations: List[str]

class MLOpsMaturityAssessment:
    """Assess MLOps maturity across dimensions."""
    
    def __init__(self):
        self.dimensions = self._define_dimensions()
    
    def _define_dimensions(self) -> Dict[str, Dict]:
        """Define maturity dimensions and criteria."""
        
        return {
            "version_control": {
                "level_0": ["No version control", "Manual file management"],
                "level_1": ["Git for code", "Basic branching"],
                "level_2": ["Git for code and data", "DVC/LFS", "Branch protection"],
                "level_3": ["GitOps", "Policy enforcement", "Automated compliance"]
            },
            "experiment_tracking": {
                "level_0": ["Notebooks", "Manual logging"],
                "level_1": ["MLflow/W&B", "Basic parameter tracking"],
                "level_2": ["Comprehensive tracking", "Model registry"],
                "level_3": ["Full lineage", "Automated documentation"]
            },
            "pipeline_automation": {
                "level_0": ["Manual scripts", "Ad-hoc execution"],
                "level_1": ["Airflow/Kubeflow", "Scheduled runs"],
                "level_2": ["Event-driven", "CI/CD integration"],
                "level_3": ["Self-healing", "Auto-optimization"]
            },
            "testing": {
                "level_0": ["Manual validation", "Ad-hoc testing"],
                "level_1": ["Basic unit tests", "Model validation"],
                "level_2": ["Data validation", "Integration tests"],
                "level_3": ["Comprehensive test suite", "Chaos engineering"]
            },
            "monitoring": {
                "level_0": ["No monitoring", "Manual checks"],
                "level_1": ["Basic metrics", "Grafana dashboards"],
                "level_2": ["Drift detection", "Alerting"],
                "level_3": ["Predictive monitoring", "Auto-remediation"]
            },
            "deployment": {
                "level_0": ["Manual deployment", "File copy"],
                "level_1": ["Scripted deployment", "Basic validation"],
                "level_2": ["CI/CD", "Canary deployments"],
                "level_3": ["Automated A/B testing", "Self-healing"]
            }
        }
    
    def assess_dimension(self, dimension: str,
                        current_state: List[str]) -> MaturityDimension:
        """Assess maturity for a single dimension."""
        
        criteria = self.dimensions[dimension]
        
        # Determine current level
        current_level = 0
        for level in range(3, 0, -1):
            level_criteria = criteria[f'level_{level}']
            if any(criterion in current_state for criterion in level_criteria):
                current_level = level
                break
        
        # Determine gaps
        gaps = []
        for level in range(current_level + 1, 4):
            level_criteria = criteria[f'level_{level}']
            gaps.extend(level_criteria)
        
        # Generate recommendations
        recommendations = self._generate_recommendations(
            dimension, current_level, gaps
        )
        
        return MaturityDimension(
            name=dimension,
            current_level=current_level,
            target_level=min(current_level + 1, 3),
            gaps=gaps,
            recommendations=recommendations
        )
    
    def _generate_recommendations(self, dimension: str,
                                 current_level: int,
                                 gaps: List[str]) -> List[str]:
        """Generate recommendations for improvement."""
        
        recommendations = {
            "version_control": {
                0: ["Implement Git for all code", "Create branching strategy"],
                1: ["Add DVC for data versioning", "Implement branch protection"],
                2: ["Adopt GitOps practices", "Implement policy enforcement"]
            },
            "experiment_tracking": {
                0: ["Deploy MLflow/W&B", "Establish tracking standards"],
                1: ["Implement model registry", "Add comprehensive metrics"],
                2: ["Enable full lineage tracking", "Automate documentation"]
            },
            "pipeline_automation": {
                0: ["Deploy Airflow/Kubeflow", "Create standard pipelines"],
                1: ["Implement event-driven triggers", "Add CI/CD integration"],
                2: ["Implement self-healing", "Add auto-optimization"]
            }
        }
        
        return recommendations.get(dimension, {}).get(current_level, [])
    
    def generate_assessment_report(self, 
                                  assessments: List[MaturityDimension]) -> Dict:
        """Generate comprehensive assessment report."""
        
        overall_score = sum(a.current_level for a in assessments) / len(assessments)
        
        return {
            'timestamp': datetime.now().isoformat(),
            'overall_maturity_score': overall_score,
            'overall_level': int(overall_score),
            'dimensions': [
                {
                    'name': a.name,
                    'current_level': a.current_level,
                    'target_level': a.target_level,
                    'gaps': a.gaps,
                    'recommendations': a.recommendations
                }
                for a in assessments
            ],
            'priority_areas': [
                a.name for a in assessments
                if a.current_level < 2
            ],
            'roadmap': self._generate_roadmap(assessments)
        }
    
    def _generate_roadmap(self, 
                         assessments: List[MaturityDimension]) -> Dict:
        """Generate implementation roadmap."""
        
        roadmap = {
            'short_term': [],  # 0-3 months
            'medium_term': [],  # 3-6 months
            'long_term': []  # 6-12 months
        }
        
        for assessment in assessments:
            if assessment.current_level == 0:
                roadmap['short_term'].extend(assessment.recommendations[:2])
            elif assessment.current_level == 1:
                roadmap['medium_term'].extend(assessment.recommendations[:2])
            elif assessment.current_level == 2:
                roadmap['long_term'].extend(assessment.recommendations[:2])
        
        return roadmap

ℹ️

Use the maturity assessment to identify gaps and prioritize improvements. Focus on quick wins first, then build towards higher maturity levels over time.

Summary

The MLOps maturity model provides a roadmap for organizational transformation:

  1. Level 0: Manual processes - focus on establishing basic practices
  2. Level 1: Pipeline automation - implement automated training and tracking
  3. Level 2: CI/CD for ML - add automated testing and deployment
  4. Level 3: Full automation - implement self-healing and optimization

Assess your current maturity, identify gaps, and create a roadmap for improvement. Maturity is a journey, not a destination.

Advertisement