
MLOps for Data Engineering: ML Pipeline Infrastructure


MLOps: Bridging Data Engineering and Machine Learning

MLOps applies DevOps practices to machine learning, automating model training, deployment, monitoring, and governance.

Why MLOps Matters


The ML Production Gap:

  • By a widely cited industry estimate, about 87% of ML models never make it to production (figure attributed to Gartner)
  • Notebook prototypes ≠ production systems
  • Closing the gap requires infrastructure for feature engineering, model versioning, A/B testing, and monitoring

Data Engineer's Role:

  • Build infrastructure that makes ML models production-ready
  • Implement feature stores, model registries, and serving infrastructure
  • Ensure monitoring and governance

Key Insight: Data engineers build the infrastructure that makes ML models production-ready, bridging the gap between notebook prototypes and production systems.


Architecture Overview

Figure: MLOps architecture pipeline. Data Sources (raw data, events, logs, DB CDC; lake / S3) → Ingestion (Kafka, Airflow, CDC, streaming; Kafka topics) → Feature Store (Feast / Tecton; offline store, online store, feature registry; Redis + Delta) → Training (MLflow, Kubeflow; experiments, hyperparameters; GPU cluster) → Registry (model versions, staging/prod, artifacts, lineage; MLflow Registry) → Serving (FastAPI, Seldon; batch inference, A/B testing; Docker + K8s) → Monitor (drift, alert, retrain trigger).

Feature Store

A feature store is a centralized repository for storing, serving, and managing ML features. Because training and serving read features produced by the same definitions, it reduces train-serve skew, where a feature is computed one way for training and another way at inference time.

Feature Store Architecture

  • Offline Store: Historical features for training (Parquet, Delta Lake)
  • Online Store: Low-latency features for serving (Redis, DynamoDB)
  • Feature Registry: Metadata catalog of all features (schema, owner, freshness)
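  • Point-in-Time Correctness: Each training row uses only feature values that were already available at that row's event timestamp, which prevents future data from leaking into training
  • Feature Freshness: The maximum age of a value in the online store; it is bounded below by the latency of the slowest upstream pipeline stage that feeds the feature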
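
The point-in-time rule in the list above can be illustrated without any feature-store library. The sketch below is a minimal stand-in, not how Feast implements it: the feature history, the label timestamps, and the helper name `point_in_time_lookup` are all made up for illustration. For each label timestamp it picks the latest feature value recorded at or before that time.

```python
from bisect import bisect_right
from datetime import datetime

# Hypothetical feature history for one customer, sorted by timestamp:
# (event_timestamp, total_orders_30d)
feature_history = [
    (datetime(2024, 1, 1), 3),
    (datetime(2024, 1, 10), 5),
    (datetime(2024, 1, 20), 8),
]

def point_in_time_lookup(history, as_of):
    """Return the latest feature value recorded at or before `as_of`, else None."""
    timestamps = [ts for ts, _ in history]
    idx = bisect_right(timestamps, as_of)  # number of records with ts <= as_of
    return history[idx - 1][1] if idx > 0 else None

# Timestamps at which training labels were observed (illustrative)
label_times = [datetime(2023, 12, 31), datetime(2024, 1, 15), datetime(2024, 1, 20)]
training_values = [point_in_time_lookup(feature_history, t) for t in label_times]
print(training_values)  # [None, 5, 8]
```

The first label predates every feature record, so it gets no value rather than a value from the future; that is the leakage a point-in-time join is meant to prevent.
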
# Feature Store with Feast
from datetime import datetime, timedelta

from feast import Entity, FeatureStore, FeatureView, Field, FileSource
from feast.types import Float32, Int64, String

# NOTE: Feast's definition API has changed across releases. This version uses
# the Field/schema style (Feast 0.20 and later); older releases used
# Feature(..., value_type=ValueType.X) and features=[...] instead.

# Define entities
customer_entity = Entity(
    name="customer_id",
    join_keys=["customer_id"],
    description="Customer identifier"
)

# Define features
customer_features = [
    Field(name="total_orders_30d", dtype=Int64),
    Field(name="total_spend_30d", dtype=Float32),
    Field(name="avg_order_value", dtype=Float32),
    Field(name="days_since_last_order", dtype=Int64),
    Field(name="customer_segment", dtype=String),
    Field(name="lifetime_value", dtype=Float32),
]

# Define the offline source (files that carry an event timestamp column)
customer_source = FileSource(
    path="s3://features/customer_features/",
    timestamp_field="event_timestamp"
)

# Define feature view
# (the batch/materialization engine is configured in feature_store.yaml,
# not on the feature view)
customer_feature_view = FeatureView(
    name="customer_features",
    entities=[customer_entity],
    ttl=timedelta(days=1),
    schema=customer_features,
    online=True,
    source=customer_source
)

# Define feature service for versioning
from feast import FeatureService

customer_feature_service = FeatureService(
    name="customer_features_v1",
    features=[customer_feature_view],
    tags={"team": "ml-platform", "version": "v1"}
)

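# Materialize features into the online store
# (register the definitions above first, e.g. by running `feast apply` in the repo)
store = FeatureStore(repo_path=".")
store.materialize(start_date=datetime.now() - timedelta(days=30), end_date=datetime.now())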

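# Retrieve features for training
import pandas as pd

# Entity DataFrame: one row per training example, holding the entity key and
# the timestamp at which features should be looked up (illustrative values)
entity_df = pd.DataFrame({
    "customer_id": [12345, 67890],
    "event_timestamp": [datetime(2024, 1, 15), datetime(2024, 1, 20)],
})

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "customer_features:total_orders_30d",
        "customer_features:total_spend_30d",
        "customer_features:customer_segment"
    ]
).to_df()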

# Retrieve features for online serving
online_features = store.get_online_features(
    features=[
        "customer_features:total_orders_30d",
        "customer_features:total_spend_30d",
    ],
    entity_rows=[{"customer_id": 12345}]
).to_dict()

Experiment Tracking and Model Registry

Figure: Model lifecycle. Experiment (MLflow runs; draft) → Validation (evaluate metrics; staging) → Registered (version v1.0.0; production) → Serving (API endpoint; active) → Monitoring (drift detection; retrain?). Retraining is triggered by drift or on a schedule, and each stage has approval gates and quality checks.

Experiment tracking records all parameters, metrics, and artifacts from ML training runs. It enables reproducibility, comparison, and governance of ML models.

# MLflow Experiment Tracking
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np

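# Load training data (assumed here to be a feature-store export containing
# numeric model inputs plus the "churned" label)
df = pd.read_parquet("s3://features/customer_churn/")
X = df.drop("churned", axis=1)
y = df["churned"]
# Fix the seed and stratify so the split is reproducible and keeps the class ratio
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)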

# Start MLflow experiment
mlflow.set_experiment("customer_churn_prediction")

with mlflow.start_run(run_name="rf_baseline_v1"):
    # Log parameters
    params = {
        "n_estimators": 100,
        "max_depth": 10,
        "min_samples_split": 5,
        "random_state": 42
    }
    mlflow.log_params(params)

    # Train model
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)

    # Evaluate
    y_pred = model.predict(X_test)
    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "precision": precision_score(y_test, y_pred),
        "recall": recall_score(y_test, y_pred),
        "f1": f1_score(y_test, y_pred)
    }

    # Log metrics
    mlflow.log_metrics(metrics)

    # Log model
    mlflow.sklearn.log_model(
        model,
        artifact_path="model",
        registered_model_name="customer_churn_model"
    )

    # Log feature importance
    importance_df = pd.DataFrame({
        "feature": X.columns,
        "importance": model.feature_importances_
    }).sort_values("importance", ascending=False)
    importance_df.to_csv("feature_importance.csv", index=False)
    mlflow.log_artifact("feature_importance.csv")

    print(f"Metrics: {metrics}")

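# Move the registered version to Staging
# (log_model above already registered it; version=1 assumes a first registration)
# Note: model stages are deprecated as of MLflow 2.9 in favor of aliases and
# tags; these calls apply to registries that still use stages.
client = mlflow.tracking.MlflowClient()
client.transition_model_version_stage(
    name="customer_churn_model",
    version=1,
    stage="Staging"
)

# Promote to Production
client.transition_model_version_stage(
    name="customer_churn_model",
    version=1,
    stage="Production"
)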
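
The lifecycle figure mentions approval gates between stages. One way such a gate might look is sketched below; it is an assumption-laden illustration, not part of MLflow. The function name `passes_promotion_gate`, the metric dictionaries, and the `max_regression` tolerance are hypothetical; the 0.85 F1 floor mirrors the target listed later in this lesson.

```python
def passes_promotion_gate(candidate, production, min_f1=0.85, max_regression=0.01):
    """Decide whether a candidate model may replace the production model.

    Returns (ok, reasons): `ok` is True only when the candidate clears the F1
    floor and no metric falls more than `max_regression` below production.
    """
    reasons = []
    if candidate["f1"] < min_f1:
        reasons.append(f"f1 {candidate['f1']:.3f} is below the floor {min_f1}")
    for name, prod_value in production.items():
        cand_value = candidate.get(name)
        if cand_value is None:
            reasons.append(f"missing metric: {name}")
        elif cand_value < prod_value - max_regression:
            reasons.append(f"{name} regressed: {cand_value:.3f} < {prod_value:.3f}")
    return (not reasons, reasons)

# Illustrative metric values, not real results
production_metrics = {"f1": 0.86, "precision": 0.89, "recall": 0.85}
good_candidate = {"f1": 0.88, "precision": 0.90, "recall": 0.87}
weak_candidate = {"f1": 0.80, "precision": 0.90, "recall": 0.87}

ok, reasons = passes_promotion_gate(good_candidate, production_metrics)
blocked_ok, blocked_reasons = passes_promotion_gate(weak_candidate, production_metrics)
print(ok, reasons)
print(blocked_ok, blocked_reasons)
```

A check like this would typically run before the stage transition call, so that a failing candidate never reaches Production.
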

Model Serving

Model serving deploys trained models as APIs for real-time or batch predictions. It handles request routing, load balancing, versioning, and monitoring.

# FastAPI Model Serving
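from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import mlflow.sklearn
import pandas as pd
import redis
import json
from typing import List, Optional

app = FastAPI(title="ML Model Serving API")

# Load model from registry. The sklearn flavor is used (rather than
# mlflow.pyfunc) because the endpoint below calls predict_proba, which the
# generic pyfunc wrapper does not expose.
model = mlflow.sklearn.load_model("models:/customer_churn_model/Production")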

# Redis for feature caching
redis_client = redis.Redis(host='localhost', port=6379, db=0)

class PredictionRequest(BaseModel):
    customer_id: int
    features: Optional[dict] = None

class PredictionResponse(BaseModel):
    customer_id: int
    prediction: float
    probability: float
    model_version: str

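@app.post("/predict", response_model=PredictionResponse)
def predict(request: PredictionRequest):
    # Declared with plain `def` so FastAPI runs it in a worker thread; the
    # Redis, feature store, and model calls below are all blocking.
    if request.features is not None:
        # Caller supplied the features directly
        features = request.features
    else:
        # Get features from cache, falling back to the feature store
        cache_key = f"features:{request.customer_id}"
        cached = redis_client.get(cache_key)

        if cached:
            features = json.loads(cached)
        else:
            features = fetch_features_from_store(request.customer_id)
            if any(value is None for value in features.values()):
                raise HTTPException(status_code=404,
                                    detail="Features not found for customer")
            # Cache for 5 minutes
            redis_client.setex(cache_key, 300, json.dumps(features))

    # Prepare input
    input_df = pd.DataFrame([features])

    # Predict
    prediction = model.predict(input_df)[0]
    probability = model.predict_proba(input_df)[0][1]

    return PredictionResponse(
        customer_id=request.customer_id,
        prediction=float(prediction),
        probability=float(probability),
        model_version="1.0.0"  # hardcoded here; ideally read from the registry
    )

@app.post("/predict/batch", response_model=List[PredictionResponse])
def predict_batch(requests: List[PredictionRequest]):
    # Scores one request at a time; for large batches, build a single DataFrame
    return [predict(req) for req in requests]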

def fetch_features_from_store(customer_id: int) -> dict:
    """Fetch features from the feature store."""
    from feast import FeatureStore
    store = FeatureStore(repo_path=".")

    features = store.get_online_features(
        features=[
            "customer_features:total_orders_30d",
            "customer_features:total_spend_30d",
            "customer_features:avg_order_value",
            "customer_features:customer_segment",
        ],
        entity_rows=[{"customer_id": customer_id}]
    ).to_dict()

    # to_dict() also returns the entity key; drop it so only model inputs remain
    return {k: v[0] for k, v in features.items() if k != "customer_id"}
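
Serving is also where A/B testing between model versions usually happens. A minimal sketch of deterministic traffic splitting follows, assuming customers are routed by a hash of their ID so that each customer always sees the same variant. The function name `assign_variant`, the salt string, and the 10% share are illustrative assumptions, not part of FastAPI or MLflow.

```python
import hashlib

def assign_variant(customer_id: int, treatment_share: float = 0.1,
                   salt: str = "churn-ab-1") -> str:
    """Deterministically route a customer to the candidate or production model.

    The salted hash maps each customer to a stable bucket in [0, 1); changing
    the salt reshuffles assignments for a new experiment.
    """
    digest = hashlib.sha256(f"{salt}:{customer_id}".encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") / 2**64
    return "candidate" if bucket < treatment_share else "production"

# Roughly 10% of customers should land on the candidate model
assignments = [assign_variant(cid) for cid in range(10_000)]
candidate_share = assignments.count("candidate") / len(assignments)
print(f"candidate share: {candidate_share:.3f}")
```

Hashing rather than random sampling keeps assignment stable across requests and service restarts, which matters when outcomes such as churn are measured days after the prediction.
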

Model Monitoring

Model monitoring tracks model performance, data drift, and concept drift in production. It detects when models degrade and need retraining.

Drift Detection

  • Data Drift: Distribution shift in input features
    • PSI (Population Stability Index) = Σ (P_new − P_old) × ln(P_new / P_old), summed over bins, where P_old and P_new are the proportions of reference and new data in each bin
    • PSI < 0.1: No drift, PSI 0.1-0.25: Moderate drift, PSI > 0.25: Significant drift
  • Concept Drift: Relationship between features and target changes
    • Monitor prediction distribution and error rates over time
  • Performance Drift: Model metrics degrade below threshold
    • Alert when F1 < threshold for N consecutive periods
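
To make the PSI formula and its thresholds concrete, here is a small worked example in plain Python. The bin proportions are invented for illustration, and the helper names `psi_from_proportions` and `classify_psi` are hypothetical.

```python
import math

def psi_from_proportions(p_old, p_new, eps=1e-6):
    """PSI = sum over bins of (P_new - P_old) * ln(P_new / P_old)."""
    total = 0.0
    for old, new in zip(p_old, p_new):
        # Floor the proportions so empty bins do not cause log(0) or division by zero
        old, new = max(old, eps), max(new, eps)
        total += (new - old) * math.log(new / old)
    return total

def classify_psi(psi):
    """Map a PSI value onto the bands listed above."""
    if psi < 0.1:
        return "no drift"
    if psi <= 0.25:
        return "moderate drift"
    return "significant drift"

reference = [0.25, 0.25, 0.25, 0.25]  # training-time bin proportions
shifted = [0.10, 0.20, 0.30, 0.40]    # production bin proportions

psi = psi_from_proportions(reference, shifted)
print(f"PSI = {psi:.3f} -> {classify_psi(psi)}")  # about 0.228, moderate drift
```

Every term in the sum is non-negative, because the difference and the log ratio always share a sign, so PSI is zero only when the two distributions match bin for bin.
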
# Model Monitoring Service
import numpy as np
import pandas as pd
from scipy.stats import ks_2samp, chi2_contingency
from dataclasses import dataclass
from typing import Dict, List
from datetime import datetime

@dataclass
class DriftAlert:
    feature: str
    drift_type: str  # data_drift, concept_drift, performance_drift
    severity: str    # low, medium, high
    psi_score: float
    detected_at: datetime
    message: str

class ModelMonitor:
    """Monitor model performance and detect drift."""

    def __init__(self, reference_data, model_version: str):
        self.reference_data = reference_data
        self.model_version = model_version
        self.alerts: List[DriftAlert] = []

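    def calculate_psi(self, expected, actual, bins=10):
        """Calculate Population Stability Index."""
        # Use one set of bin edges, derived from the reference data, for both
        # samples so the per-bin proportions are comparable
        edges = np.histogram_bin_edges(expected, bins=bins)
        expected_counts, _ = np.histogram(expected, bins=edges)
        # Clip new values into the reference range so out-of-range points
        # land in the edge bins instead of being dropped
        actual_counts, _ = np.histogram(
            np.clip(actual, edges[0], edges[-1]), bins=edges
        )

        # Convert counts to proportions
        expected_pct = expected_counts / expected_counts.sum()
        actual_pct = actual_counts / actual_counts.sum()

        # Avoid division by zero and log(0) for empty bins
        expected_pct = np.clip(expected_pct, 1e-6, None)
        actual_pct = np.clip(actual_pct, 1e-6, None)

        psi = np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct))
        return float(psi)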

    def detect_data_drift(self, new_data, threshold=0.1):
        """Detect data drift using PSI (alerts at moderate drift and above)."""
        for column in self.reference_data.columns:
            # PSI binning applies to numeric columns only (int, uint, float)
            is_numeric = self.reference_data[column].dtype.kind in "iuf"
            if is_numeric and column in new_data.columns:
                psi = self.calculate_psi(
                    self.reference_data[column].values,
                    new_data[column].values
                )

                if psi > threshold:
                    significant = psi > 0.25
                    level = "Significant" if significant else "Moderate"
                    self.alerts.append(DriftAlert(
                        feature=column,
                        drift_type="data_drift",
                        severity="high" if significant else "medium",
                        psi_score=psi,
                        detected_at=datetime.now(),
                        message=f"{level} drift detected in {column}: PSI={psi:.4f}"
                    ))

        return self.alerts

    def detect_performance_drift(self, predictions, actuals,
                                  f1_threshold=0.7, window_size=100):
        """Detect performance drift using sliding window F1."""
        from sklearn.metrics import f1_score

        if len(predictions) < window_size:
            return []

        recent_f1 = f1_score(
            actuals[-window_size:],
            predictions[-window_size:]
        )

        if recent_f1 < f1_threshold:
            self.alerts.append(DriftAlert(
                feature="model_performance",
                drift_type="performance_drift",
                severity="high",
                psi_score=recent_f1,  # field reused to carry the F1 value, not a PSI
                detected_at=datetime.now(),
                message=f"Model F1 dropped to {recent_f1:.4f} (threshold: {f1_threshold})"
            ))

        return self.alerts

# Usage
reference_data = pd.read_parquet("s3://training-data/reference/")
monitor = ModelMonitor(reference_data, "v1.0.0")

# Check for drift
new_data = pd.read_parquet("s3://production-data/latest/")
drift_alerts = monitor.detect_data_drift(new_data)

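# Check performance (assumes `model` is the trained model loaded earlier and
# that the production data includes a ground-truth "target" column)
predictions = model.predict(new_data.drop("target", axis=1))
performance_alerts = monitor.detect_performance_drift(
    predictions, new_data["target"].values
)

# Both detect_* methods return the monitor's shared alert list, so read it
# once; concatenating the two return values would duplicate every alert
all_alerts = monitor.alerts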
for alert in all_alerts:
    print(f"[{alert.severity.upper()}] {alert.message}")
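
The monitor above alerts on a single window, whereas the drift list calls for alerting only after N consecutive periods below threshold. A minimal sketch of that rule is shown here; the function name `should_alert` and the sample F1 histories are hypothetical.

```python
def should_alert(f1_by_period, threshold=0.7, n_consecutive=3):
    """Return True when the most recent `n_consecutive` F1 values are all below threshold."""
    if len(f1_by_period) < n_consecutive:
        return False  # not enough history to decide
    return all(f1 < threshold for f1 in f1_by_period[-n_consecutive:])

# Illustrative per-period F1 histories, oldest first
sustained_drop = [0.80, 0.69, 0.68, 0.66]
single_dip = [0.69, 0.68, 0.75, 0.66]

print(should_alert(sustained_drop))  # True: last three periods are below 0.7
print(should_alert(single_dip))      # False: the 0.75 period breaks the streak
```

Requiring a streak trades detection speed for fewer false alarms, since one noisy evaluation period no longer triggers retraining.
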

Key Concepts Summary

Component | Description | Tool/Technology | Metric
Feature Store | Centralized feature repository | Feast, Tecton, Hopsworks | Feature freshness
Experiment Tracking | Record training runs | MLflow, Weights & Biases | Reproducibility rate
Model Registry | Version and stage models | MLflow Registry | Model versions
Model Serving | Deploy models as APIs | FastAPI, Seldon, BentoML | Latency, throughput
Model Monitoring | Detect drift and degradation | Evidently, WhyLabs | Drift score
Pipeline Orchestration | Automate ML workflows | Airflow, Kubeflow, Prefect | Pipeline reliability
Data Validation | Validate training data | Great Expectations, TFX | Data quality score
A/B Testing | Compare model versions | Custom, LaunchDarkly | Lift over baseline

Performance Metrics

Metric | Target | Measurement | Impact
Feature Freshness | < 1 hour | Max age of features | Model accuracy
Prediction Latency | < 100 ms | p99 latency | User experience
Model Accuracy | > 0.85 F1 | Weekly evaluation | Business impact
Drift Detection | < 24 hours | Time to detect | Proactive retraining
Training Pipeline | < 2 hours | End-to-end runtime | Iteration speed
Model Deployment | < 5 minutes | Time to production | Business velocity
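
The latency target is stated as a p99, which is easy to misread as an average. The sketch below computes a nearest-rank percentile over request latencies and compares it with the 100 ms target; the helper name `percentile_nearest_rank` and the sample latencies are made up for illustration.

```python
import math

def percentile_nearest_rank(samples, pct):
    """Nearest-rank percentile: the smallest sample with at least pct% of samples at or below it."""
    if not samples:
        raise ValueError("samples must not be empty")
    ordered = sorted(samples)
    rank = max(math.ceil(pct * len(ordered) / 100), 1)
    return ordered[rank - 1]

# Illustrative latencies in milliseconds: 1, 2, ..., 100
steady_latencies = list(range(1, 101))
p99 = percentile_nearest_rank(steady_latencies, 99)
print(p99, p99 < 100)  # 99 True

# One slow request dominates the tail of a small sample
spiky_latencies = [12, 15, 11, 240, 14]
print(percentile_nearest_rank(spiky_latencies, 99))  # 240
```

The second sample has a mean near 58 ms yet a p99 of 240 ms, which is why tail percentiles rather than averages are used for serving targets.
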

10 Best Practices

  1. Use a feature store to eliminate train-serve skew and enable feature reuse
  2. Track all experiments with MLflow: log parameters, metrics, and artifacts
  3. Version control models in a registry with staging/production environments
  4. Implement model monitoring for data drift, concept drift, and performance degradation
  5. Use A/B testing for model deployment; never replace production models without comparison
  6. Automate retraining pipelines triggered by drift detection or scheduled cadence
  7. Validate training data with Great Expectations before model training
  8. Deploy models as microservices with health checks and graceful degradation
  9. Monitor prediction distributions; sudden changes often indicate model or data issues
  10. Maintain model lineage β€” track which data, features, and code produced each model version
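
The lineage practice in item 10 can be sketched as a small record that ties a model version to its inputs. This is one possible shape, not a standard format: the function name `build_lineage_record`, the field names, and the example commit ID are assumptions, while the data path and feature service name reuse values from earlier in this lesson.

```python
import hashlib
import json

def build_lineage_record(data_snapshot, feature_service, code_version, params):
    """Bundle the inputs that produced a model and derive a stable fingerprint."""
    record = {
        "data_snapshot": data_snapshot,
        "feature_service": feature_service,
        "code_version": code_version,
        "params": params,
    }
    # sort_keys makes the fingerprint independent of dictionary ordering
    payload = json.dumps(record, sort_keys=True).encode("utf-8")
    record["fingerprint"] = hashlib.sha256(payload).hexdigest()[:16]
    return record

lineage = build_lineage_record(
    data_snapshot="s3://features/customer_churn/",
    feature_service="customer_features_v1",
    code_version="abc1234",  # hypothetical git commit
    params={"n_estimators": 100, "max_depth": 10},
)
print(lineage["fingerprint"])
```

A record like this could be attached to the training run as tags or as a JSON artifact, so that any model version can be traced back to the data, features, and code that produced it.
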

  • Feature stores provide consistent features for training and serving (eliminate skew)
  • Experiment tracking enables reproducibility and comparison of ML runs
  • Model registries version and stage models through deployment lifecycle
  • Model monitoring detects drift and performance degradation proactively
  • Data engineers build the infrastructure that makes ML models production-ready
