MLOps Lifecycle: Data Versioning, Training, Evaluation, Deployment

Interview Question (Hard), asked at: Google, Netflix, Uber, Meta, Airbnb

"Walk us through the end-to-end MLOps lifecycle at scale. How do you manage data versioning, ensure reproducible training, handle model evaluation gates, and orchestrate deployments across multiple environments?"

The Complete MLOps Lifecycle Architecture

The MLOps lifecycle is a systematic framework that governs the journey of machine learning models from conception to production and beyond. At scale, this involves coordinating data pipelines, feature engineering, model training, evaluation, deployment, monitoring, and retraining in a continuous feedback loop.

MLOps Maturity Levels

Organizations typically evolve through maturity levels:

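| Level | Name                | Characteristics                     |
|-------|---------------------|-------------------------------------|
| 0     | Manual              | Manual process, no pipeline         |
| 1     | Pipeline Automation | ML pipeline automation              |
| 2     | CI/CD for ML        | CI/CD with pipeline automation      |
| 3     | Full Automation     | Full MLOps with continuous training |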

Note: At the scale of companies like Google, the MLOps lifecycle can span a very large number of models serving billions of predictions daily. The infrastructure must support versioning, rollback, A/B testing, and canary deployments at the same time.

Data Versioning Strategies

Data versioning is the foundation of reproducible ML. Unlike code, data volumes can reach petabytes, making traditional version control impractical.
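
One way to see why data versioning tools keep large files out of Git is a content-addressed pointer: the data stays in bulk storage and only a small hash record is versioned alongside the code. The sketch below is a simplified illustration of that idea, not the file format of DVC or any other tool; `file_digest`, `make_pointer`, and the pointer fields are hypothetical names.

```python
import hashlib
import json
import tempfile
from pathlib import Path


def file_digest(path: Path, chunk_size: int = 1 << 20) -> str:
    """Stream the file in chunks so large datasets never sit fully in memory."""
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def make_pointer(path: Path) -> dict:
    """Build the small record that would be committed instead of the data."""
    return {
        "path": path.name,
        "sha256": file_digest(path),
        "size": path.stat().st_size,
    }


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        data = Path(tmp) / "dataset.csv"
        data.write_text("id,amount\n1,10.5\n2,99.0\n")
        before = make_pointer(data)
        # Appending a row changes the content, and therefore the hash.
        data.write_text("id,amount\n1,10.5\n2,99.0\n3,7.25\n")
        after = make_pointer(data)
        print(json.dumps(before, indent=2))
        print("changed:", before["sha256"] != after["sha256"])
```

Committing only the pointer keeps the repository small while still tying each code revision to an exact dataset state.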

Data Versioning with DVC (Data Version Control)

# dvc.yaml - DVC pipeline definition
stages:
  prepare:
    cmd: python src/data/prepare.py
    deps:
      - src/data/prepare.py
      - data/raw/dataset.csv
    params:
      - prepare.yaml:
          - test_size
          - random_state
    outs:
      - data/prepared/train.csv
      - data/prepared/test.csv
    metrics:
      - metrics/prepare.json:
          cache: false

  featurize:
    cmd: python src/features/build_features.py
    deps:
      - src/features/build_features.py
      - data/prepared/train.csv
      - data/prepared/test.csv
    outs:
      - data/features/train_features.csv
      - data/features/test_features.csv

  train:
    cmd: python src/models/train.py
    deps:
      - src/models/train.py
      - data/features/train_features.csv
    params:
      - train.yaml:
          - learning_rate
          - n_estimators
          - max_depth
    outs:
      - models/model.pkl
    metrics:
      - metrics/train.json:
          cache: false
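
The `prepare` stage above declares `test_size` and `random_state` as tracked parameters, so changing either one makes the stage stale. As a rough sketch of what a script like `src/data/prepare.py` might do with them (its real contents are not shown in the source, and `split_rows` is a hypothetical helper), a seeded split using only the standard library could look like this:

```python
import random


def split_rows(rows, test_size: float, random_state: int):
    """Shuffle with a private seeded RNG, then cut off the test fraction."""
    if not 0.0 < test_size < 1.0:
        raise ValueError("test_size must be strictly between 0 and 1")
    shuffled = list(rows)
    # A dedicated Random instance avoids touching the global RNG state.
    random.Random(random_state).shuffle(shuffled)
    n_test = int(round(len(shuffled) * test_size))
    return shuffled[n_test:], shuffled[:n_test]


if __name__ == "__main__":
    # Values that would normally be read from prepare.yaml (assumed here).
    params = {"test_size": 0.2, "random_state": 42}
    rows = [{"id": i} for i in range(10)]
    train, test = split_rows(rows, **params)
    print(len(train), "train rows,", len(test), "test rows")
```

Because the shuffle depends only on the input rows and the seed, rerunning the stage with unchanged parameters reproduces the same split.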

Delta Lake for ACID Transactions on Data Lakes

from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MLDataVersioning") \
    .config("spark.sql.extensions", 
            "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

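# Version 0 - initial load from a raw source.
# The table must be written before it can be read back as Delta;
# the Parquet path below is a placeholder for wherever the raw data lives.
df = spark.read.parquet("/data/initial_records")
df.write.format("delta").mode("overwrite").save("/data/training_set")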

# Version 1 - updated with new data
new_df = spark.read.format("delta").load("/data/training_set")
updated_df = new_df.union(spark.read.parquet("/data/new_records"))
updated_df.write.format("delta").mode("overwrite").save("/data/training_set")

# Time travel to any version
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load("/data/training_set")
df_v1 = spark.read.format("delta").option("versionAsOf", 1).load("/data/training_set")

# Compare versions
print(f"Version 0 rows: {df_v0.count()}")
print(f"Version 1 rows: {df_v1.count()}")

# Get history
delta_table = DeltaTable.forPath(spark, "/data/training_set")
history = delta_table.history()
history.show()

LakeFS for Git-like Data Operations

# Create a lakeFS repository. lakectl addresses repositories and branches
# with lakefs:// URIs; the storage namespace is the backing object store.
lakectl repo create lakefs://mlops-data s3://my-lakefs-storage \
    --default-branch main

# Create a branch for experimentation
lakectl branch create lakefs://mlops-data/experiment-v2 \
    --source lakefs://mlops-data/main

# Commit data changes on the experiment branch
lakectl commit lakefs://mlops-data/experiment-v2 \
    --message "Added 100K new training samples" \
    --meta author=ml-engineer \
    --meta pipeline_id=12345

# Diff between branches
lakectl diff lakefs://mlops-data/main \
    lakefs://mlops-data/experiment-v2

# Merge experimental branch back into main
lakectl merge lakefs://mlops-data/experiment-v2 \
    lakefs://mlops-data/main

Warning: Data versioning at scale requires careful consideration of storage costs. Delta Lake and LakeFS provide efficient storage through copy-on-write mechanisms, but petabyte-scale datasets need tiered storage strategies.
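
A tiered storage strategy can be as simple as moving dataset versions to cheaper storage once they stop being read. The sketch below is one possible policy, not a feature of Delta Lake or LakeFS; the tier names, the 30-day and 180-day cutoffs, and `storage_tier` itself are illustrative assumptions.

```python
from datetime import datetime, timedelta, timezone


def storage_tier(last_accessed: datetime, now: datetime,
                 warm_after_days: int = 30, cold_after_days: int = 180) -> str:
    """Pick a tier from how long ago a dataset version was last read."""
    age = now - last_accessed
    if age >= timedelta(days=cold_after_days):
        return "cold"
    if age >= timedelta(days=warm_after_days):
        return "warm"
    return "hot"


if __name__ == "__main__":
    now = datetime(2024, 6, 1, tzinfo=timezone.utc)
    # Made-up access times for three dataset versions.
    versions = {
        "v2.1": now - timedelta(days=3),
        "v2.0": now - timedelta(days=45),
        "v1.0": now - timedelta(days=400),
    }
    for name, accessed in versions.items():
        print(name, "->", storage_tier(accessed, now))
```

In practice the same rule is usually expressed as an object-store lifecycle policy rather than application code; the function only makes the decision logic explicit.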

Training Pipeline Design

Reproducible Training with Configuration Management

# config/train_config.yaml
experiment:
  name: "fraud_detection_v3"
  seed: 42
  framework: "xgboost"

data:
  version: "v2.1"
  path: "s3://ml-data/training/fraud_v2.1/"
  features:
    - "transaction_amount"
    - "time_since_last_transaction"
    - "merchant_category"
    - "user_transaction_count_24h"
  target: "is_fraud"
  split:
    train: 0.7
    validation: 0.15
    test: 0.15

model:
  type: "XGBClassifier"
  params:
    n_estimators: 500
    max_depth: 8
    learning_rate: 0.01
    subsample: 0.8
    colsample_bytree: 0.8
    min_child_weight: 3
    gamma: 0.1
    reg_alpha: 0.1
    reg_lambda: 1.0

training:
  early_stopping_rounds: 50
  eval_metric: "auc"
  verbose: 100

evaluation:
  metrics:
    - "auc_roc"
    - "precision"
    - "recall"
    - "f1_score"
  thresholds:
    auc_roc: 0.95
    precision: 0.90
    recall: 0.85
  fairness:
    protected_attributes:
      - "age_group"
      - "gender"
    max_disparity: 0.1

deployment:
  strategy: "canary"
  environments:
    - "staging"
    - "production"
  rollback:
    enabled: true
    trigger: "accuracy_drop > 5%"
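
A configuration file like this only helps reproducibility if malformed values are caught before training starts. The following sketch shows one way to check a few invariants of the config above once it has been loaded into a dictionary; `validate_config` and the set of allowed deployment strategies are assumptions made for illustration.

```python
import math

# Assumed set of supported strategies; adjust to what the platform offers.
ALLOWED_STRATEGIES = {"canary", "blue-green", "rolling"}


def validate_config(config: dict) -> list:
    """Return a list of human-readable problems; an empty list means valid."""
    problems = []

    split = config["data"]["split"]
    total = sum(split.values())
    if not math.isclose(total, 1.0, abs_tol=1e-9):
        problems.append(f"data.split sums to {total:.3f}, expected 1.0")

    for name, value in config["evaluation"]["thresholds"].items():
        if not 0.0 <= value <= 1.0:
            problems.append(f"evaluation.thresholds.{name}={value} is outside [0, 1]")

    strategy = config["deployment"]["strategy"]
    if strategy not in ALLOWED_STRATEGIES:
        problems.append(f"unknown deployment strategy: {strategy}")

    return problems


if __name__ == "__main__":
    # Mirrors the relevant parts of config/train_config.yaml.
    config = {
        "data": {"split": {"train": 0.7, "validation": 0.15, "test": 0.15}},
        "evaluation": {"thresholds": {"auc_roc": 0.95, "precision": 0.90, "recall": 0.85}},
        "deployment": {"strategy": "canary"},
    }
    print(validate_config(config) or "config looks valid")
```

Returning all problems at once, rather than raising on the first, makes a failed CI check easier to fix in a single pass.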

Python Training Pipeline with MLflow

import mlflow
import mlflow.xgboost
import xgboost as xgb
import pandas as pd
import numpy as np
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import (
    roc_auc_score, precision_score, recall_score, 
    f1_score, confusion_matrix
)
import yaml
import hashlib
from datetime import datetime
from pathlib import Path

class MLTrainingPipeline:
    def __init__(self, config_path: str):
        with open(config_path) as f:
            self.config = yaml.safe_load(f)
        
        mlflow.set_experiment(self.config['experiment']['name'])
        
    def _compute_data_hash(self, df: pd.DataFrame) -> str:
        """Compute deterministic hash of dataset for versioning."""
        return hashlib.md5(
            pd.util.hash_pandas_object(df).values.tobytes()
        ).hexdigest()
    
    def prepare_data(self):
        """Load and prepare training data."""
        data_path = self.config['data']['path']
        df = pd.read_parquet(data_path)
        
        features = self.config['data']['features']
        target = self.config['data']['target']
        
        X = df[features]
        y = df[target]
        
        data_hash = self._compute_data_hash(df)
        
        return X, y, data_hash
    
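    def train_with_cv(self, X, y):
        """Train model with stratified k-fold cross-validation."""
        # Copy the params so the loaded config is not mutated. n_estimators is a
        # scikit-learn wrapper argument; the native API takes num_boost_round.
        params = dict(self.config['model']['params'])
        num_boost_round = params.pop('n_estimators', 1000)
        # xgb.train defaults to squared-error regression, so request
        # probabilities for binary classification explicitly.
        params.setdefault('objective', 'binary:logistic')
        params['eval_metric'] = self.config['training']['eval_metric']
        params['seed'] = self.config['experiment']['seed']
        train_cfg = self.config['training']
        n_folds = 5
        
        skf = StratifiedKFold(
            n_splits=n_folds, 
            shuffle=True, 
            random_state=self.config['experiment']['seed']
        )
        
        oof_predictions = np.zeros(len(X))
        models = []
        cv_scores = []
        
        for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
            X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
            y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]
            
            dtrain = xgb.DMatrix(X_train, label=y_train)
            dval = xgb.DMatrix(X_val, label=y_val)
            
            # Use the early-stopping and logging settings from the config
            # instead of hard-coded values.
            model = xgb.train(
                params,
                dtrain,
                num_boost_round=num_boost_round,
                evals=[(dval, 'val')],
                early_stopping_rounds=train_cfg['early_stopping_rounds'],
                verbose_eval=train_cfg['verbose']
            )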
            
            val_pred = model.predict(dval)
            oof_predictions[val_idx] = val_pred
            
            auc = roc_auc_score(y_val, val_pred)
            cv_scores.append(auc)
            
            models.append(model)
            
            mlflow.log_metric(f"fold_{fold}_auc", auc)
        
        return models, oof_predictions, cv_scores
    
    def evaluate(self, y_true, y_pred):
        """Comprehensive model evaluation."""
        metrics = {
            'auc_roc': roc_auc_score(y_true, y_pred),
            'precision': precision_score(y_true, (y_pred > 0.5).astype(int)),
            'recall': recall_score(y_true, (y_pred > 0.5).astype(int)),
            'f1_score': f1_score(y_true, (y_pred > 0.5).astype(int)),
        }
        
        thresholds = self.config['evaluation']['thresholds']
        for metric, threshold in thresholds.items():
            if metric in metrics:
                passed = metrics[metric] >= threshold
                metrics[f'{metric}_passed'] = passed
                if not passed:
                    raise ValueError(
                        f"Metric {metric}={metrics[metric]:.4f} "
                        f"below threshold {threshold}"
                    )
        
        return metrics
    
    def run(self):
        """Execute full training pipeline."""
        with mlflow.start_run(run_name=f"run_{datetime.now():%Y%m%d_%H%M%S}"):
            # Log configuration
            mlflow.log_params(self.config['model']['params'])
            mlflow.log_param("data_version", self.config['data']['version'])
            mlflow.log_param("seed", self.config['experiment']['seed'])
            
            # Prepare data
            X, y, data_hash = self.prepare_data()
            mlflow.log_param("data_hash", data_hash)
            mlflow.log_param("dataset_size", len(X))
            
            # Train
            models, oof_preds, cv_scores = self.train_with_cv(X, y)
            
            # Evaluate
            metrics = self.evaluate(y, oof_preds)
            mlflow.log_metrics(metrics)
            mlflow.log_metric("cv_mean_auc", np.mean(cv_scores))
            mlflow.log_metric("cv_std_auc", np.std(cv_scores))
            
            # Log best model
            best_model = models[np.argmax(cv_scores)]
            mlflow.xgboost.log_model(
                best_model, 
                "model",
                registered_model_name=self.config['experiment']['name']
            )
            
            return metrics

if __name__ == "__main__":
    pipeline = MLTrainingPipeline("config/train_config.yaml")
    metrics = pipeline.run()
    print(f"Training completed. Metrics: {metrics}")
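
The pipeline above logs the data hash, seed, and hyperparameters as separate values. A possible complement, sketched here under the assumption that a code version string (for example a Git commit SHA) is available, is to fold them into a single fingerprint so that two runs can be compared at a glance. `run_fingerprint` is a hypothetical helper, not part of MLflow.

```python
import hashlib
import json


def run_fingerprint(config: dict, data_hash: str, code_version: str) -> str:
    """Hash everything that determines a training run's output."""
    payload = json.dumps(
        {"config": config, "data_hash": data_hash, "code_version": code_version},
        sort_keys=True,          # key order must not change the fingerprint
        separators=(",", ":"),   # fixed separators keep the encoding stable
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]


if __name__ == "__main__":
    # Placeholder inputs; real values would come from the config file,
    # the dataset hash, and the repository state.
    config = {"seed": 42, "learning_rate": 0.01, "max_depth": 8}
    fingerprint = run_fingerprint(config, data_hash="abc123", code_version="deadbeef")
    print("run fingerprint:", fingerprint)
```

Logging this value as a run parameter would let a later run with the same fingerprint be flagged as a likely duplicate, while any difference points to a change in config, data, or code.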

Model Evaluation Gates

Multi-Dimensional Evaluation Framework

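import mlflow
import numpy as np
from sklearn.metrics import roc_auc_score


class ModelEvaluationGates:
    """Enforce quality gates before model deployment."""
    
    def __init__(self, config: dict, y_true=None):
        self.config = config
        # Ground-truth labels, used by fairness_gate to compute per-group AUC
        self.y_true = y_true
        self.gates_passed = []
        self.gates_failed = []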
    
    def performance_gate(self, metrics: dict) -> bool:
        """Gate 1: Model performance meets minimum thresholds."""
        thresholds = self.config['evaluation']['thresholds']
        
        for metric, threshold in thresholds.items():
            if metric not in metrics:
                self.gates_failed.append(f"Missing metric: {metric}")
                return False
            
            if metrics[metric] < threshold:
                self.gates_failed.append(
                    f"Performance: {metric}={metrics[metric]:.4f} "
                    f"< {threshold}"
                )
                return False
        
        self.gates_passed.append("Performance gate PASSED")
        return True
    
    def fairness_gate(self, predictions, protected_attrs) -> bool:
        """Gate 2: Model meets fairness criteria."""
        max_disparity = self.config['evaluation']['fairness']['max_disparity']
        
        for attr in protected_attrs.columns.unique():
            groups = protected_attrs[attr].unique()
            
            group_aucs = {}
            for group in groups:
                mask = protected_attrs[attr] == group
                if mask.sum() > 100:  # Minimum sample size
                    group_auc = roc_auc_score(
                        self.y_true[mask], 
                        predictions[mask]
                    )
                    group_aucs[group] = group_auc
            
            if len(group_aucs) > 1:
                disparities = []
                for i, g1 in enumerate(groups):
                    for g2 in groups[i+1:]:
                        if g1 in group_aucs and g2 in group_aucs:
                            disp = abs(group_aucs[g1] - group_aucs[g2])
                            disparities.append(disp)
                
                max_disp = max(disparities) if disparities else 0
                if max_disp > max_disparity:
                    self.gates_failed.append(
                        f"Fairness: {attr} disparity {max_disp:.4f} "
                        f"> {max_disparity}"
                    )
                    return False
        
        self.gates_passed.append("Fairness gate PASSED")
        return True
    
    def robustness_gate(self, model, X_test, y_test) -> bool:
        """Gate 3: Model robustness under perturbation."""
        baseline_auc = roc_auc_score(y_test, model.predict(X_test))
        
        perturbations = [
            ('noise_001', lambda x: x + np.random.normal(0, 0.001, x.shape)),
            ('noise_01', lambda x: x + np.random.normal(0, 0.01, x.shape)),
            ('noise_1', lambda x: x + np.random.normal(0, 0.1, x.shape)),
        ]
        
        for name, perturb_fn in perturbations:
            X_perturbed = perturb_fn(X_test.copy())
            perturbed_auc = roc_auc_score(y_test, model.predict(X_perturbed))
            degradation = baseline_auc - perturbed_auc
            
            mlflow.log_metric(f"robustness_{name}_degradation", degradation)
            
            if degradation > 0.05:  # 5% max degradation threshold
                self.gates_failed.append(
                    f"Robustness: {name} degradation {degradation:.4f} > 0.05"
                )
                return False
        
        self.gates_passed.append("Robustness gate PASSED")
        return True
    
    def data_quality_gate(self, data_stats: dict) -> bool:
        """Gate 4: Training data quality checks."""
        checks = {
            'null_ratio': lambda x: x < 0.05,
            'feature_variance': lambda x: x > 1e-6,
            'class_balance': lambda x: 0.2 < x < 0.8,
        }
        
        for metric, check_fn in checks.items():
            if metric in data_stats:
                if not check_fn(data_stats[metric]):
                    self.gates_failed.append(
                        f"Data Quality: {metric}={data_stats[metric]}"
                    )
                    return False
        
        self.gates_passed.append("Data quality gate PASSED")
        return True
    
    def evaluate_all(self, metrics, predictions, protected_attrs, 
                     model, X_test, y_test, data_stats) -> bool:
        """Run all evaluation gates."""
        gates = [
            ("Performance", lambda: self.performance_gate(metrics)),
            ("Fairness", lambda: self.fairness_gate(predictions, protected_attrs)),
            ("Robustness", lambda: self.robustness_gate(model, X_test, y_test)),
            ("Data Quality", lambda: self.data_quality_gate(data_stats)),
        ]
        
        all_passed = True
        for name, gate_fn in gates:
            try:
                passed = gate_fn()
                if not passed:
                    all_passed = False
            except Exception as e:
                self.gates_failed.append(f"{name}: Exception - {str(e)}")
                all_passed = False
        
        return all_passed

Caution: Never skip evaluation gates for "quick" deployments. At scale, a single bad model can affect millions of users and cost millions in revenue. Always enforce automated quality gates.
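
One way to make gates hard to skip is to run them as a single CI step whose result decides whether promotion proceeds. The sketch below is a dependency-free simplification of the gate class shown earlier, not a replacement for it; `GateResult`, `threshold_gate`, `disparity_gate`, `run_gates`, and `can_deploy` are hypothetical names, and the numbers are illustrative.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Tuple


@dataclass
class GateResult:
    name: str
    passed: bool
    detail: str


def threshold_gate(metrics: Dict[str, float],
                   thresholds: Dict[str, float]) -> Tuple[bool, str]:
    """Pass only if every required metric is present and meets its minimum."""
    for name, minimum in thresholds.items():
        if name not in metrics:
            return False, f"missing metric: {name}"
        if metrics[name] < minimum:
            return False, f"{name}={metrics[name]:.4f} < {minimum}"
    return True, "all thresholds met"


def disparity_gate(group_scores: Dict[str, float],
                   max_disparity: float) -> Tuple[bool, str]:
    """The largest pairwise gap between groups equals max minus min."""
    gap = max(group_scores.values()) - min(group_scores.values())
    return gap <= max_disparity, f"max disparity {gap:.4f} (limit {max_disparity})"


def run_gates(gates: List[Tuple[str, Callable[[], Tuple[bool, str]]]]) -> List[GateResult]:
    """Run every gate so the report lists all failures, not just the first."""
    results = []
    for name, check in gates:
        try:
            passed, detail = check()
        except Exception as exc:  # a crashing gate is treated as a failed gate
            passed, detail = False, f"exception: {exc}"
        results.append(GateResult(name, passed, detail))
    return results


def can_deploy(results: List[GateResult]) -> bool:
    """Deployment is allowed only when at least one gate ran and all passed."""
    return bool(results) and all(r.passed for r in results)


if __name__ == "__main__":
    metrics = {"auc_roc": 0.96, "precision": 0.91, "recall": 0.80}
    thresholds = {"auc_roc": 0.95, "precision": 0.90, "recall": 0.85}
    group_auc = {"18-30": 0.95, "31-50": 0.96, "51+": 0.93}
    results = run_gates([
        ("Performance", lambda: threshold_gate(metrics, thresholds)),
        ("Fairness", lambda: disparity_gate(group_auc, 0.1)),
    ])
    for r in results:
        print(f"{r.name}: {'PASS' if r.passed else 'FAIL'} ({r.detail})")
    print("deploy allowed:", can_deploy(results))
```

Treating an empty result list as a failure is a deliberate choice here: a pipeline that ran no gates should not be able to promote a model.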

Deployment Strategies

Canary Deployment with Traffic Shifting

# kubernetes/canary-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: model-canary
  labels:
    app: ml-model
    version: v2
spec:
  replicas: 2
  selector:
    matchLabels:
      app: ml-model
      version: v2
  template:
    metadata:
      labels:
        app: ml-model
        version: v2
    spec:
      containers:
      - name: model-server
        image: registry.example.com/ml-model:v2
        ports:
        - containerPort: 8080
        resources:
          requests:
            memory: "2Gi"
            cpu: "1000m"
            nvidia.com/gpu: "1"
          limits:
            memory: "4Gi"
            cpu: "2000m"
            nvidia.com/gpu: "1"
        readinessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 60
          periodSeconds: 30
---
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: ml-model-vs
spec:
  hosts:
  - ml-model
  http:
  - route:
    - destination:
        host: ml-model
        subset: stable
      weight: 90
    - destination:
        host: ml-model
        subset: canary
      weight: 10
---
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: ml-model-dr
spec:
  host: ml-model
  subsets:
  - name: stable
    labels:
      version: v1
  - name: canary
    labels:
      version: v2

Automated Rollback on Metric Degradation

from kubernetes import client, config
import time
import requests
from prometheus_api_client import PrometheusConnect

class CanaryDeploymentManager:
    def __init__(self, prometheus_url: str, kube_config_path: str):
        config.load_kube_config(config_file=kube_config_path)
        self.apps_v1 = client.AppsV1Api()
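        # Istio VirtualServices are custom resources; patch_namespaced_custom_object
        # lives on CustomObjectsApi, not NetworkingV1Api.
        self.networking_v1 = client.CustomObjectsApi()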
        self.prom = PrometheusConnect(url=prometheus_url)
    
    def get_model_metrics(self, model_name: str, window: str = "5m") -> dict:
        """Fetch current model metrics from Prometheus."""
        queries = {
            'latency_p99': f'histogram_quantile(0.99, rate(model_latency_seconds_bucket{{model="{model_name}"}}[{window}]))',
            'error_rate': f'sum(rate(model_errors_total{{model="{model_name}"}}[{window}])) / sum(rate(model_requests_total{{model="{model_name}"}}[{window}]))',
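            # avg() takes an instant vector, so average over the window with avg_over_time first
            'accuracy': f'avg(avg_over_time(model_accuracy{{model="{model_name}"}}[{window}]))',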
        }
        
        metrics = {}
        for name, query in queries.items():
            result = self.prom.custom_query(query=query)
            if result:
                metrics[name] = float(result[0]['value'][1])
        
        return metrics
    
    def shift_traffic(self, canary_weight: int):
        """Shift traffic between stable and canary."""
        vs = {
            "apiVersion": "networking.istio.io/v1beta1",
            "kind": "VirtualService",
            "metadata": {"name": "ml-model-vs"},
            "spec": {
                "hosts": ["ml-model"],
                "http": [{
                    "route": [
                        {
                            "destination": {"host": "ml-model", "subset": "stable"},
                            "weight": 100 - canary_weight
                        },
                        {
                            "destination": {"host": "ml-model", "subset": "canary"},
                            "weight": canary_weight
                        }
                    ]
                }]
            }
        }
        
        self.networking_v1.patch_namespaced_custom_object(
            group="networking.istio.io",
            version="v1beta1",
            namespace="ml-production",
            plural="virtualservices",
            name="ml-model-vs",
            body=vs
        )
    
    def rollback(self, model_name: str):
        """Emergency rollback to stable version."""
        print(f"ROLLBACK: Reverting {model_name} to stable version")
        self.shift_traffic(canary_weight=0)
        
        self.apps_v1.patch_namespaced_deployment(
            name="model-canary",
            namespace="ml-production",
            body={"spec": {"replicas": 0}}
        )
        
        self.apps_v1.patch_namespaced_deployment(
            name="model-stable",
            namespace="ml-production",
            body={"spec": {"replicas": 3}}
        )
    
    def canary_deploy(self, model_name: str, 
                      canary_weight: int = 10,
                      increment: int = 10,
                      monitor_interval: int = 300,
                      max_failures: int = 3) -> bool:
        """Execute canary deployment with automated rollback."""
        
        # Define thresholds
        thresholds = {
            'latency_p99': 0.5,  # 500ms
            'error_rate': 0.01,  # 1%
            'accuracy': 0.85,    # 85%
        }
        
        failures = 0
        current_weight = canary_weight
        
        while current_weight <= 100:
            print(f"Canary: shifting {current_weight}% traffic to canary")
            self.shift_traffic(current_weight)
            
            time.sleep(monitor_interval)
            
            metrics = self.get_model_metrics(model_name)
            print(f"Current metrics: {metrics}")
            
            # Check thresholds
            violated = False
            for metric, threshold in thresholds.items():
                if metric in metrics:
                    if metric == 'error_rate':
                        if metrics[metric] > threshold:
                            violated = True
                    elif metric == 'accuracy':
                        if metrics[metric] < threshold:
                            violated = True
                    elif metrics[metric] > threshold:
                        violated = True
            
            if violated:
                failures += 1
                print(f"Threshold violation! Failure {failures}/{max_failures}")
                
                if failures >= max_failures:
                    self.rollback(model_name)
                    return False
                # Hold the current weight and re-check; never promote a failing canary
                continue
            else:
                failures = 0
            
            if current_weight == 100:
                print("Canary deployment complete!")
                return True
            
            current_weight = min(current_weight + increment, 100)
        
        return True
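
The traffic steps taken by `canary_deploy` follow directly from its `canary_weight` and `increment` arguments. As a small standalone sketch (the helper names `canary_schedule` and `minimum_rollout_seconds` are hypothetical), the schedule and a lower bound on rollout time can be computed ahead of a deployment:

```python
from typing import List


def canary_schedule(start: int = 10, increment: int = 10) -> List[int]:
    """Return the canary traffic percentages visited until full rollout."""
    if not 0 < start <= 100:
        raise ValueError("start must be in (0, 100]")
    if increment <= 0:
        raise ValueError("increment must be positive")
    weights = [start]
    while weights[-1] < 100:
        # Cap at 100 so the last step never overshoots full traffic.
        weights.append(min(weights[-1] + increment, 100))
    return weights


def minimum_rollout_seconds(weights: List[int], monitor_interval: int = 300) -> int:
    """Lower bound on rollout time: one monitoring window per step, no retries."""
    return len(weights) * monitor_interval


if __name__ == "__main__":
    steps = canary_schedule(start=10, increment=10)
    print("weights:", steps)
    print("minimum duration:", minimum_rollout_seconds(steps) / 60, "minutes")
```

With the defaults used above (10% steps, 300-second monitoring windows), a clean rollout takes ten windows, which is worth checking against any deployment time budget before choosing the increment.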

Continuous Training Pipeline

ML Pipeline Architecture Diagram

┌───────────────────────────────────────────────────────────────┐
│                   MLOps Continuous Training                   │
├───────────────────────────────────────────────────────────────┤
│                                                               │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐ │
│  │  Data    │───▶│ Feature  │───▶│ Training │───▶│ Evaluate │ │
│  │  Ingest  │    │ Store    │    │ Pipeline │    │ & Gate   │ │
│  └──────────┘    └──────────┘    └──────────┘    └──────────┘ │
│       │               │               │               │       │
│       ▼               ▼               ▼               ▼       │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐ │
│  │  Data    │    │ Feature  │    │  Model   │    │ Model    │ │
│  │  Lake    │    │ Registry │    │ Registry │    │ Registry │ │
│  └──────────┘    └──────────┘    └──────────┘    └──────────┘ │
│                                                       │       │
│                                                       ▼       │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐ │
│  │ Monitor  │◀───│ Deploy   │◀───│ Canary   │◀───│ Promote  │ │
│  │ & Alert  │    │ Service  │    │ Testing  │    │ to Prod  │ │
│  └──────────┘    └──────────┘    └──────────┘    └──────────┘ │
│       │                                                       │
│       │  ┌─────────────────────────────────────────────────┐  │
│       └─▶│  Feedback Loop: Retraining Trigger              │  │
│          └─────────────────────────────────────────────────┘  │
└───────────────────────────────────────────────────────────────┘

Airflow DAG for End-to-End ML Pipeline

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

default_args = {
    'owner': 'ml-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'mlops_continuous_training',
    default_args=default_args,
    description='End-to-end ML pipeline',
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=days_ago(1),
    catchup=False,
    tags=['mlops', 'production'],
) as dag:

    ingest = DockerOperator(
        task_id='data_ingestion',
        image='registry.example.com/ml-pipeline/ingest:latest',
        command='python ingest.py --date {{ ds }}',
        environment={
            'DATA_SOURCE': 's3://raw-data-bucket/',
            'OUTPUT_PATH': '/data/raw/{{ ds }}/'
        },
        mount_tmp_folder=False,
        auto_remove=True,
    )

    validate = DockerOperator(
        task_id='data_validation',
        image='registry.example.com/ml-pipeline/validate:latest',
        command='python validate.py --input /data/raw/{{ ds }}/',
    )

    feature = DockerOperator(
        task_id='feature_engineering',
        image='registry.example.com/ml-pipeline/features:latest',
        command='python build_features.py --input /data/raw/{{ ds }}/',
    )

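    # DockerOperator has no gpu_request argument; GPUs are requested through
    # device_requests, which takes docker-py DeviceRequest objects.
    from docker.types import DeviceRequest

    train = DockerOperator(
        task_id='model_training',
        image='registry.example.com/ml-pipeline/train:latest',
        command='python train.py',
        device_requests=[DeviceRequest(count=1, capabilities=[['gpu']])],
    )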

    evaluate = DockerOperator(
        task_id='model_evaluation',
        image='registry.example.com/ml-pipeline/evaluate:latest',
        command='python evaluate.py',
    )

    deploy = DockerOperator(
        task_id='model_deployment',
        image='registry.example.com/ml-pipeline/deploy:latest',
        command='python deploy.py --env staging',
    )

    ingest >> validate >> feature >> train >> evaluate >> deploy

Note: The entire pipeline should complete within a defined SLA. For daily retraining, aim for < 2 hours total. For real-time systems, consider incremental training with smaller windows.
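
Checking a run against the SLA is simple arithmetic over per-stage durations. The sketch below assumes the durations are already collected (for example from the scheduler's task metadata); `check_sla` is a hypothetical helper and the stage timings are made-up numbers.

```python
from typing import Dict


def check_sla(stage_seconds: Dict[str, float], sla_seconds: float = 2 * 3600) -> dict:
    """Summarize total runtime against the SLA and name the slowest stage."""
    total = sum(stage_seconds.values())
    slowest = max(stage_seconds, key=stage_seconds.get)
    return {
        "total_seconds": total,
        "within_sla": total < sla_seconds,
        "headroom_seconds": sla_seconds - total,
        "slowest_stage": slowest,
    }


if __name__ == "__main__":
    # Illustrative timings for the six DAG tasks, in seconds.
    durations = {
        "data_ingestion": 600,
        "data_validation": 300,
        "feature_engineering": 1200,
        "model_training": 3600,
        "model_evaluation": 600,
        "model_deployment": 300,
    }
    report = check_sla(durations)
    print(report)
```

Reporting the slowest stage alongside the pass/fail result points directly at where optimization effort would pay off if the headroom shrinks.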

Key Metrics for MLOps Monitoring

Track these metrics across your lifecycle:

MLOPS_METRICS = {
    'data': {
        'pipeline_duration_seconds': 'gauge',
        'data_freshness_hours': 'gauge',
        'schema_violations_total': 'counter',
        'feature_drift_score': 'gauge',
    },
    'training': {
        'training_duration_seconds': 'gauge',
        'training_loss': 'gauge',
        'validation_loss': 'gauge',
        'model_size_bytes': 'gauge',
    },
    'serving': {
        'prediction_latency_seconds': 'histogram',
        'prediction_throughput_rps': 'gauge',
        'model_accuracy': 'gauge',
        'error_rate': 'gauge',
    },
    'infrastructure': {
        'gpu_utilization': 'gauge',
        'memory_usage_bytes': 'gauge',
        'cpu_utilization': 'gauge',
        'network_io_bytes': 'counter',
    }
}
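
The source lists `feature_drift_score` without saying how it is computed. One common choice, used here purely as an assumption, is the Population Stability Index (PSI), which compares the binned distribution of a feature in training data against its distribution in recent serving data. The function name `psi`, the bin count, and the `eps` floor are illustrative.

```python
import math
from typing import Sequence


def psi(expected: Sequence[float], actual: Sequence[float],
        bins: int = 10, eps: float = 1e-6) -> float:
    """Population Stability Index between a reference and a current sample."""
    lo, hi = min(expected), max(expected)
    if hi == lo:
        raise ValueError("expected sample has no spread; cannot build bins")
    width = (hi - lo) / bins

    def fractions(values: Sequence[float]) -> list:
        counts = [0] * bins
        for v in values:
            idx = int((v - lo) / width)
            # Clamp values outside the reference range into the edge bins.
            idx = min(max(idx, 0), bins - 1)
            counts[idx] += 1
        # Floor at eps so empty bins do not produce log(0) or division by zero.
        return [max(c / len(values), eps) for c in counts]

    e, a = fractions(expected), fractions(actual)
    return sum((ai - ei) * math.log(ai / ei) for ei, ai in zip(e, a))


if __name__ == "__main__":
    reference = [float(i) for i in range(100)]
    shifted = [v + 50.0 for v in reference]
    print("no drift:", psi(reference, reference))
    print("shifted :", round(psi(reference, shifted), 3))
```

A frequently quoted rule of thumb treats PSI below 0.1 as stable and above 0.25 as a significant shift, though suitable alert levels depend on the feature and should be tuned per use case.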

Summary

The MLOps lifecycle requires careful orchestration of:

  1. Data Versioning: Use DVC, Delta Lake, or LakeFS for reproducibility
  2. Training Pipelines: Automated, configurable, with experiment tracking
  3. Evaluation Gates: Multi-dimensional quality checks before deployment
  4. Deployment Strategies: Canary, blue-green, with automated rollback
  5. Continuous Training: Feedback loops for model freshness

Master this lifecycle to build production ML systems at scale.
