MLOps: Bridging Data Engineering and Machine Learning
MLOps applies DevOps practices to machine learning, automating model training, deployment, monitoring, and governance.
Why MLOps Matters
The ML Production Gap:
- 87% of ML models never make it to production (Gartner)
- Notebook prototypes are not production systems
- Closing the gap requires infrastructure for feature engineering, model versioning, A/B testing, and monitoring
Data Engineer's Role:
- Build infrastructure that makes ML models production-ready
- Implement feature stores, model registries, and serving infrastructure
- Ensure monitoring and governance
Key Insight: Data engineers build the infrastructure that makes ML models production-ready, bridging the gap between notebook prototypes and production systems.
Architecture Overview
Feature Store
A feature store is a centralized repository for storing, serving, and managing ML features. It provides consistent feature computation for training and serving, eliminating train-serve skew.
Feature Store Architecture
- Offline Store: Historical features for training (Parquet, Delta Lake)
- Online Store: Low-latency features for serving (Redis, DynamoDB)
- Feature Registry: Metadata catalog of all features (schema, owner, freshness)
- Point-in-Time Correctness: Features are computed using only data available at prediction time
- Feature Freshness: Online features are only as fresh as the slowest upstream pipeline, i.e. Online_Freshness = max(latency of each upstream pipeline)
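Point-in-time correctness can be illustrated without any feature store library. The sketch below is a minimal example under stated assumptions: `point_in_time_lookup` and the sample history are hypothetical, and the lookup simply picks the latest feature row at or before a given timestamp, discarding rows older than a TTL. It illustrates the rule itself, not any particular library's implementation.

```python
from bisect import bisect_right
from datetime import datetime, timedelta


def point_in_time_lookup(feature_rows, as_of, ttl):
    """Return the latest feature value observed at or before `as_of`.

    feature_rows: list of (event_timestamp, value), sorted by timestamp.
    Returns None when no row falls inside [as_of - ttl, as_of].
    """
    timestamps = [ts for ts, _ in feature_rows]
    idx = bisect_right(timestamps, as_of) - 1  # last row with ts <= as_of
    if idx < 0:
        return None  # no feature value existed yet
    ts, value = feature_rows[idx]
    if as_of - ts > ttl:
        return None  # the newest usable value is too stale
    return value


# Hypothetical history of total_orders_30d for one customer
history = [
    (datetime(2024, 1, 1), 3),
    (datetime(2024, 1, 10), 5),
    (datetime(2024, 1, 20), 8),
]

# A training label observed on Jan 15 must see the Jan 10 value, never Jan 20
train_value = point_in_time_lookup(
    history, datetime(2024, 1, 15), timedelta(days=30)
)
```

Joining on the label's timestamp rather than on "latest" is what keeps future information out of the training set.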
# Feature Store with Feast
# Written against the newer Field/schema-based Feast API; argument names differ
# in older releases, so check them against the installed version.
from datetime import datetime, timedelta

from feast import Entity, FeatureService, FeatureStore, FeatureView, Field, FileSource
from feast.types import Float32, Int64, String

# Define entities
customer_entity = Entity(
    name="customer_id",
    join_keys=["customer_id"],
    description="Customer identifier",
)

# Define features
customer_features = [
    Field(name="total_orders_30d", dtype=Int64),
    Field(name="total_spend_30d", dtype=Float32),
    Field(name="avg_order_value", dtype=Float32),
    Field(name="days_since_last_order", dtype=Int64),
    Field(name="customer_segment", dtype=String),
    Field(name="lifetime_value", dtype=Float32),
]

# Define feature view
# (the batch engine, e.g. Spark, is configured in feature_store.yaml rather than
# passed to FeatureView)
customer_feature_view = FeatureView(
    name="customer_features",
    entities=[customer_entity],
    ttl=timedelta(days=1),
    schema=customer_features,
    online=True,
    source=FileSource(
        path="s3://features/customer_features/",
        timestamp_field="event_timestamp",
    ),
)

# Define feature service for versioning
customer_feature_service = FeatureService(
    name="customer_features_v1",
    features=[customer_feature_view],
    tags={"team": "ml-platform", "version": "v1"},
)
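# Register the definitions (equivalent to running `feast apply`), then
# materialize the last 30 days of features into the online store
store = FeatureStore(repo_path=".")
store.apply([customer_entity, customer_feature_view, customer_feature_service])
end_date = datetime.now()
store.materialize(start_date=end_date - timedelta(days=30), end_date=end_date)

# Retrieve features for training
# entity_df lists the entity keys and the timestamp each row is joined as of
# (the rows below are example values)
import pandas as pd

entity_df = pd.DataFrame({
    "customer_id": [12345, 67890],
    "event_timestamp": [datetime(2024, 1, 15), datetime(2024, 1, 20)],
})
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "customer_features:total_orders_30d",
        "customer_features:total_spend_30d",
        "customer_features:customer_segment",
    ],
).to_df()

# Retrieve features for online serving
online_features = store.get_online_features(
    features=[
        "customer_features:total_orders_30d",
        "customer_features:total_spend_30d",
    ],
    entity_rows=[{"customer_id": 12345}],
).to_dict()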
Experiment Tracking and Model Registry
Experiment tracking records all parameters, metrics, and artifacts from ML training runs. It enables reproducibility, comparison, and governance of ML models.
# MLflow Experiment Tracking
import mlflow
import mlflow.sklearn
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split

# Load the training set exported from the feature store
df = pd.read_parquet("s3://features/customer_churn/")
X = df.drop("churned", axis=1)
y = df["churned"]
# Fix the seed and stratify so the split is reproducible across runs
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# Start MLflow experiment
mlflow.set_experiment("customer_churn_prediction")
with mlflow.start_run(run_name="rf_baseline_v1"):
    # Log parameters
    params = {
        "n_estimators": 100,
        "max_depth": 10,
        "min_samples_split": 5,
        "random_state": 42,
    }
    mlflow.log_params(params)

    # Train model
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)

    # Evaluate
    y_pred = model.predict(X_test)
    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "precision": precision_score(y_test, y_pred),
        "recall": recall_score(y_test, y_pred),
        "f1": f1_score(y_test, y_pred),
    }

    # Log metrics
    mlflow.log_metrics(metrics)

    # Log model and register it under a name in the model registry
    mlflow.sklearn.log_model(
        model,
        artifact_path="model",
        registered_model_name="customer_churn_model",
    )

    # Log feature importance
    importance_df = pd.DataFrame({
        "feature": X.columns,
        "importance": model.feature_importances_,
    }).sort_values("importance", ascending=False)
    importance_df.to_csv("feature_importance.csv", index=False)
    mlflow.log_artifact("feature_importance.csv")
    print(f"Metrics: {metrics}")

# Move the registered version to staging
# (version=1 assumes this is the first registered version; model stages are
# deprecated in recent MLflow releases in favor of aliases)
client = mlflow.tracking.MlflowClient()
client.transition_model_version_stage(
    name="customer_churn_model",
    version=1,
    stage="Staging",
)

# Promote to production
client.transition_model_version_stage(
    name="customer_churn_model",
    version=1,
    stage="Production",
)
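A stage transition is usually gated on a comparison against the model currently in production. The following is a minimal sketch of such a gate in plain Python. `should_promote`, the metric names, and the `min_gain` and `max_regression` margins are illustrative assumptions, not part of MLflow.

```python
def should_promote(candidate, production, primary="f1", min_gain=0.01,
                   guardrails=("precision", "recall"), max_regression=0.02):
    """Decide whether a candidate's metrics justify promotion.

    candidate / production: dicts of metric name -> value, for example the
    metrics logged for each run. `production` is None when nothing is live.
    """
    if production is None:
        return True  # nothing to compare against yet
    # The primary metric must improve by at least `min_gain`
    if candidate[primary] < production[primary] + min_gain:
        return False
    # No guardrail metric may drop by more than `max_regression`
    return all(
        candidate[m] >= production[m] - max_regression for m in guardrails
    )


# Hypothetical metrics for two model versions
candidate_metrics = {"f1": 0.84, "precision": 0.86, "recall": 0.82}
production_metrics = {"f1": 0.81, "precision": 0.85, "recall": 0.78}
promote = should_promote(candidate_metrics, production_metrics)
```

Keeping the rule in code makes each promotion decision reviewable and repeatable rather than ad hoc.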
Model Serving
Model serving deploys trained models as APIs for real-time or batch predictions. It handles request routing, load balancing, versioning, and monitoring.
# FastAPI Model Serving
import json
from typing import List, Optional

import mlflow.sklearn
import pandas as pd
import redis
from fastapi import FastAPI, HTTPException
from feast import FeatureStore
from pydantic import BaseModel

app = FastAPI(title="ML Model Serving API")

# Load the native scikit-learn model from the registry; the generic pyfunc
# wrapper does not expose predict_proba
model = mlflow.sklearn.load_model("models:/customer_churn_model/Production")

# Redis for feature caching
redis_client = redis.Redis(host="localhost", port=6379, db=0)

# Create the feature store client once rather than on every request
store = FeatureStore(repo_path=".")


class PredictionRequest(BaseModel):
    customer_id: int
    features: Optional[dict] = None


class PredictionResponse(BaseModel):
    customer_id: int
    prediction: float
    probability: float
    model_version: str


# Plain `def` endpoints run in FastAPI's thread pool, so the blocking Redis and
# feature store calls do not stall the event loop
@app.post("/predict", response_model=PredictionResponse)
def predict(request: PredictionRequest):
    # Get features from cache, falling back to the feature store
    cache_key = f"features:{request.customer_id}"
    cached = redis_client.get(cache_key)
    if cached:
        features = json.loads(cached)
    else:
        features = fetch_features_from_store(request.customer_id)
        if any(value is None for value in features.values()):
            raise HTTPException(status_code=404, detail="Features not found")
        redis_client.setex(cache_key, 300, json.dumps(features))

    # Prepare input (assumes the model was trained on exactly these features)
    input_df = pd.DataFrame([features])

    # Predict
    prediction = model.predict(input_df)[0]
    probability = model.predict_proba(input_df)[0][1]
    return PredictionResponse(
        customer_id=request.customer_id,
        prediction=float(prediction),
        probability=float(probability),
        model_version="1.0.0",  # hard-coded here; read from the registry in practice
    )


@app.post("/predict/batch")
def predict_batch(requests: List[PredictionRequest]):
    return [predict(req) for req in requests]


def fetch_features_from_store(customer_id: int) -> dict:
    """Fetch features from the feature store."""
    response = store.get_online_features(
        features=[
            "customer_features:total_orders_30d",
            "customer_features:total_spend_30d",
            "customer_features:avg_order_value",
            "customer_features:customer_segment",
        ],
        entity_rows=[{"customer_id": customer_id}],
    ).to_dict()
    # The response also contains the entity key, which is not a model input
    response.pop("customer_id", None)
    return {k: v[0] for k, v in response.items()}
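The description above lists request routing and versioning among the serving layer's duties. One common way to combine them when rolling out a new model version is deterministic hash-based bucketing, so that a given customer always reaches the same version. The sketch below assumes a hypothetical `choose_model_version` helper, two version labels, and a 10% candidate share. None of these come from the serving code above.

```python
import hashlib


def choose_model_version(customer_id, candidate_share=0.10, salt="churn-ab-test-1"):
    """Deterministically assign a customer to 'candidate' or 'production'.

    The salt names the experiment; changing it reshuffles the assignment.
    """
    digest = hashlib.sha256(f"{salt}:{customer_id}".encode()).hexdigest()
    # Map the first 32 bits of the hash to a bucket in [0, 1)
    bucket = int(digest[:8], 16) / 0x100000000
    return "candidate" if bucket < candidate_share else "production"


# Roughly 10% of customers should land on the candidate model
assignments = [choose_model_version(cid) for cid in range(10_000)]
candidate_fraction = assignments.count("candidate") / len(assignments)
```

Hashing on the customer ID rather than sampling per request keeps each customer's experience consistent and makes the split reproducible when the results are analyzed later.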
Model Monitoring
Model monitoring tracks model performance, data drift, and concept drift in production. It detects when models degrade and need retraining.
Drift Detection
- Data Drift: Distribution shift in input features
  - PSI (Population Stability Index) = Σ (P_new - P_old) × ln(P_new / P_old)
  - PSI < 0.1: No drift; PSI 0.1-0.25: Moderate drift; PSI > 0.25: Significant drift
- Concept Drift: Relationship between features and target changes
  - Monitor prediction distribution and error rates over time
- Performance Drift: Model metrics degrade below threshold
  - Alert when F1 < threshold for N consecutive periods
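The PSI formula in the list above can be worked through on binned data. The following is a minimal sketch in plain Python, assuming both samples are binned on the same edges and converted to proportions before the sum is taken. The function names, the 1e-6 floor, and the sample bin counts are illustrative choices.

```python
import math


def psi_from_counts(expected_counts, actual_counts, floor=1e-6):
    """PSI over pre-binned counts that share the same bin edges."""
    if len(expected_counts) != len(actual_counts):
        raise ValueError("Both samples must use the same bins")
    expected_total = sum(expected_counts)
    actual_total = sum(actual_counts)
    psi = 0.0
    for expected, actual in zip(expected_counts, actual_counts):
        # Proportions per bin; the floor avoids log(0) for empty bins
        p_old = max(expected / expected_total, floor)
        p_new = max(actual / actual_total, floor)
        psi += (p_new - p_old) * math.log(p_new / p_old)
    return psi


def classify_psi(psi):
    """Map a PSI value onto the thresholds listed above."""
    if psi < 0.1:
        return "no drift"
    if psi <= 0.25:
        return "moderate drift"
    return "significant drift"


# Hypothetical bin counts for one feature: training data vs. recent traffic
reference_counts = [100, 200, 400, 200, 100]
shifted_counts = [50, 100, 300, 300, 250]

psi = psi_from_counts(reference_counts, shifted_counts)  # about 0.31
label = classify_psi(psi)  # "significant drift"
```

Each term is non-negative, so a shift in either direction adds to the score, and identical distributions give a PSI of exactly 0.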
# Model Monitoring Service
from dataclasses import dataclass
from datetime import datetime
from typing import List

import numpy as np
import pandas as pd
from sklearn.metrics import f1_score


@dataclass
class DriftAlert:
    feature: str
    drift_type: str  # data_drift, concept_drift, performance_drift
    severity: str  # low, medium, high
    psi_score: float  # PSI for data drift; windowed F1 for performance drift
    detected_at: datetime
    message: str


class ModelMonitor:
    """Monitor model performance and detect drift."""

    def __init__(self, reference_data, model_version: str):
        self.reference_data = reference_data
        self.model_version = model_version
        self.alerts: List[DriftAlert] = []  # every alert raised so far

    def calculate_psi(self, expected, actual, bins=10):
        """Calculate Population Stability Index."""
        # Bin both samples on the same edges, derived from the reference data
        edges = np.histogram_bin_edges(expected, bins=bins)
        expected_counts, _ = np.histogram(expected, bins=edges)
        # Clip new values into the reference range so outliers land in the end bins
        actual_counts, _ = np.histogram(
            np.clip(actual, edges[0], edges[-1]), bins=edges
        )
        # Convert counts to proportions; the floor avoids log(0) and division by zero
        expected_pct = np.clip(expected_counts / len(expected), 1e-6, None)
        actual_pct = np.clip(actual_counts / len(actual), 1e-6, None)
        return float(np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct)))

    def detect_data_drift(self, new_data, threshold=0.1):
        """Detect data drift using PSI; returns the alerts raised by this call."""
        new_alerts = []
        for column in self.reference_data.columns:
            if pd.api.types.is_numeric_dtype(self.reference_data[column]):
                psi = self.calculate_psi(
                    self.reference_data[column].values,
                    new_data[column].values,
                )
                if psi > threshold:
                    # Above 0.25 counts as significant drift, otherwise moderate
                    significant = psi > 0.25
                    new_alerts.append(DriftAlert(
                        feature=column,
                        drift_type="data_drift",
                        severity="high" if significant else "medium",
                        psi_score=psi,
                        detected_at=datetime.now(),
                        message=(
                            f"{'Significant' if significant else 'Moderate'} "
                            f"drift detected in {column}: PSI={psi:.4f}"
                        ),
                    ))
        self.alerts.extend(new_alerts)
        return new_alerts

    def detect_performance_drift(self, predictions, actuals,
                                 f1_threshold=0.7, window_size=100):
        """Detect performance drift using F1 over the most recent window."""
        if len(predictions) < window_size:
            return []
        recent_f1 = f1_score(
            actuals[-window_size:],
            predictions[-window_size:],
        )
        new_alerts = []
        if recent_f1 < f1_threshold:
            new_alerts.append(DriftAlert(
                feature="model_performance",
                drift_type="performance_drift",
                severity="high",
                psi_score=recent_f1,
                detected_at=datetime.now(),
                message=f"Model F1 dropped to {recent_f1:.4f} (threshold: {f1_threshold})",
            ))
        self.alerts.extend(new_alerts)
        return new_alerts
# Usage
import mlflow.sklearn
import pandas as pd

reference_data = pd.read_parquet("s3://training-data/reference/")
monitor = ModelMonitor(reference_data, "v1.0.0")

# Check for drift
new_data = pd.read_parquet("s3://production-data/latest/")
drift_alerts = monitor.detect_data_drift(new_data)

# Check performance (the model is loaded from the registry, as in the serving example)
model = mlflow.sklearn.load_model("models:/customer_churn_model/Production")
predictions = model.predict(new_data.drop("target", axis=1))
performance_alerts = monitor.detect_performance_drift(
    predictions, new_data["target"].values
)

# monitor.alerts holds every alert raised so far, each exactly once
for alert in monitor.alerts:
    print(f"[{alert.severity.upper()}] {alert.message}")
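The monitoring code above checks a single window, whereas the drift list suggests alerting only when F1 stays below the threshold for N consecutive periods, which avoids reacting to one noisy evaluation. A minimal sketch of that rule follows. `ConsecutiveBreachDetector`, its parameters, and the weekly scores are hypothetical.

```python
class ConsecutiveBreachDetector:
    """Fire only after a metric stays below threshold for N periods in a row."""

    def __init__(self, threshold, periods):
        self.threshold = threshold
        self.periods = periods
        self._streak = 0  # consecutive periods below threshold so far

    def observe(self, f1):
        """Record one period's F1; return True once the streak reaches N."""
        if f1 < self.threshold:
            self._streak += 1
        else:
            self._streak = 0  # any healthy period resets the count
        return self._streak >= self.periods


# Hypothetical weekly F1 scores: one isolated dip, then a sustained decline
detector = ConsecutiveBreachDetector(threshold=0.7, periods=3)
weekly_f1 = [0.82, 0.68, 0.74, 0.69, 0.66, 0.65, 0.71]
fired = [detector.observe(f1) for f1 in weekly_f1]
```

With these scores the isolated dip in week 2 is ignored, and the alert fires only in week 6, the third consecutive week below 0.7.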
Key Concepts Summary
| Component | Description | Tool/Technology | Metric |
|---|---|---|---|
| Feature Store | Centralized feature repository | Feast, Tecton, Hopsworks | Feature freshness |
| Experiment Tracking | Record training runs | MLflow, Weights & Biases | Reproducibility rate |
| Model Registry | Version and stage models | MLflow Registry | Model versions |
| Model Serving | Deploy models as APIs | FastAPI, Seldon, BentoML | Latency, throughput |
| Model Monitoring | Detect drift and degradation | Evidently, WhyLabs | Drift score |
| Pipeline Orchestration | Automate ML workflows | Airflow, Kubeflow, Prefect | Pipeline reliability |
| Data Validation | Validate training data | Great Expectations, TFX | Data quality score |
| A/B Testing | Compare model versions | Custom, LaunchDarkly | Lift over baseline |
Performance Metrics
| Metric | Target | Measurement | Impact |
|---|---|---|---|
| Feature Freshness | < 1 hour | Max age of features | Model accuracy |
| Prediction Latency | < 100ms | p99 latency | User experience |
| Model Accuracy | > 0.85 F1 | Weekly evaluation | Business impact |
| Drift Detection | < 24 hours | Time to detect | Proactive retraining |
| Training Pipeline | < 2 hours | End-to-end runtime | Iteration speed |
| Model Deployment | < 5 minutes | Time to production | Business velocity |
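The targets in the table can be checked mechanically. As one example, the sketch below computes p99 latency with the nearest-rank method and compares it to the 100 ms target. The function name and the synthetic latencies are made up for illustration, and production systems typically read percentiles from a metrics backend instead.

```python
import math


def percentile_nearest_rank(values, pct):
    """Nearest-rank percentile: the smallest value with at least pct% of
    the samples at or below it. `pct` is an integer from 1 to 100."""
    if not values:
        raise ValueError("values must not be empty")
    ordered = sorted(values)
    rank = math.ceil(pct * len(ordered) / 100)  # 1-based rank
    return ordered[max(rank, 1) - 1]


# Synthetic sample: 98% of requests are fast, 2% hit a slow path
latencies_ms = [20 + (i % 50) for i in range(980)] + [150] * 20

p99 = percentile_nearest_rank(latencies_ms, 99)
mean_ms = sum(latencies_ms) / len(latencies_ms)
meets_target = p99 < 100  # target from the table: p99 under 100 ms
```

Here the mean stays well under 100 ms while p99 is 150 ms, which is why the table measures tail latency rather than the average.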
10 Best Practices
- Use a feature store to eliminate train-serve skew and enable feature reuse
- Track all experiments with MLflow: log parameters, metrics, and artifacts
- Version control models in a registry with staging/production environments
- Implement model monitoring for data drift, concept drift, and performance degradation
- Use A/B testing for model deployment; never replace production models without comparison
- Automate retraining pipelines triggered by drift detection or scheduled cadence
- Validate training data with Great Expectations before model training
- Deploy models as microservices with health checks and graceful degradation
- Monitor prediction distributions; sudden changes indicate model issues
- Maintain model lineage β track which data, features, and code produced each model version
Key Takeaways
- Feature stores provide consistent features for training and serving (eliminate skew)
- Experiment tracking enables reproducibility and comparison of ML runs
- Model registries version and stage models through deployment lifecycle
- Model monitoring detects drift and performance degradation proactively
- Data engineers build the infrastructure that makes ML models production-ready
See Also
- Real-Time Analytics: Streaming infrastructure for real-time feature serving
- Data Lakehouse: Delta Lake for feature store offline storage
- Delta Lake & Iceberg: ACID transactions for ML data pipelines
- dbt Advanced: Feature engineering with dbt
- Performance Optimization: Model serving latency optimization
- Portfolio Projects: MLOps portfolio project specifications