Interview Question (Hard) · Asked at: Google, Meta, Uber, Netflix, Airbnb
"Design an ML pipeline orchestration system that handles data validation, feature engineering, model training, evaluation, and deployment. How do you ensure idempotency, handle failures, and manage dependencies?"
ML Pipeline Architecture Overview
ML pipelines coordinate the end-to-end workflow of building, deploying, and maintaining machine learning models. A well-designed pipeline ensures reproducibility, scalability, and maintainability.
Pipeline Design Principles
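- Idempotency: Running a step more than once (after a retry, a duplicate trigger, or a backfill) has the same effect as running it once, with no duplicated or half-written outputs
- Reproducibility: Re-running with the same code, data, and configuration gives the same results (pinned package versions, fixed random seeds, versioned data)
- Modularity: Components are independent and composable
- Observability: Logs, metrics, and lineage are recorded for every run and step
- Fault Tolerance: Failed steps are retried, and a failed run can resume without redoing completed steps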
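The idempotency and fault-tolerance principles are easiest to see at the level of a single step's output. The sketch below assumes file-based artifacts and uses only the Python standard library: the output location is derived from a hash of the step's inputs, an existing output means the work is already done, and new output is written to a temporary file and atomically renamed into place, so a retried or duplicated run never leaves a half-written artifact behind. `run_idempotent_step`, `input_key`, and the `<root>/<step>/<hash>.bin` layout are hypothetical names chosen for illustration.

```python
import hashlib
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Callable, Dict


def input_key(params: Dict[str, Any]) -> str:
    """Deterministic key for a step's inputs: SHA-256 of sorted-key JSON."""
    payload = json.dumps(params, sort_keys=True, default=str).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()[:16]


def run_idempotent_step(
    name: str,
    params: Dict[str, Any],
    compute: Callable[[Dict[str, Any]], bytes],
    root: Path,
) -> Path:
    """Produce the output for (name, params) at most once and publish it atomically."""
    out = root / name / f"{input_key(params)}.bin"
    if out.exists():
        # A previous (possibly retried or duplicated) run already finished this work
        return out
    out.parent.mkdir(parents=True, exist_ok=True)
    data = compute(params)
    # Write to a temp file in the same directory, then rename over the target.
    # os.replace is atomic on POSIX when both paths are on one filesystem, so
    # readers see either no file or the complete file, never a partial one.
    fd, tmp = tempfile.mkstemp(dir=out.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
        os.replace(tmp, out)
    except BaseException:
        if os.path.exists(tmp):
            os.unlink(tmp)
        raise
    return out
```

Hashing sorted-key JSON means the same parameters in a different order map to the same output, so a retry cannot produce a second copy.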
Pipeline Architecture Diagram
```text
┌─────────────────────────────────────────────────────────────────┐
│                    ML Pipeline Orchestration                    │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐   │
│  │   Data   │───▶│ Feature  │───▶│ Training │───▶│ Evaluate │   │
│  │  Ingest  │    │ Compute  │    │ Pipeline │    │ Pipeline │   │
│  └──────────┘    └──────────┘    └──────────┘    └──────────┘   │
│       │               │               │               │         │
│       ▼               ▼               ▼               ▼         │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐   │
│  │ Schedule │    │ Trigger  │    │ Monitor  │    │  Deploy  │   │
│  │  (Cron)  │    │ (Event)  │    │ (Alerts) │    │ (Canary) │   │
│  └──────────┘    └──────────┘    └──────────┘    └──────────┘   │
│                                                                 │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                   Pipeline State Store                    │  │
│  │                (PostgreSQL / Redis / etcd)                │  │
│  └───────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘
```
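The state store is what lets an orchestrator respect dependencies, retry failures, and resume a run. As a rough sketch, assuming steps are plain Python callables and an in-memory SQLite database stands in for PostgreSQL/Redis/etcd: `graphlib.TopologicalSorter` orders the steps, each step is retried with exponential backoff, and its final status is recorded per `run_id`, so re-running the same run skips steps that already succeeded. `run_pipeline`, `init_state_store`, and the `step_state` schema are hypothetical, not the API of any orchestrator discussed here.

```python
import sqlite3
import time
from graphlib import TopologicalSorter
from typing import Callable, Dict, Iterable


def init_state_store(conn: sqlite3.Connection) -> None:
    conn.execute(
        "CREATE TABLE IF NOT EXISTS step_state ("
        " run_id TEXT, step TEXT, status TEXT, attempts INTEGER,"
        " PRIMARY KEY (run_id, step))"
    )


def run_pipeline(
    conn: sqlite3.Connection,
    run_id: str,
    steps: Dict[str, Callable[[], None]],
    deps: Dict[str, Iterable[str]],
    max_retries: int = 2,
    base_delay: float = 0.01,
) -> None:
    """Execute steps in dependency order, retrying failures and skipping finished steps."""
    for step in TopologicalSorter(deps).static_order():
        row = conn.execute(
            "SELECT status FROM step_state WHERE run_id = ? AND step = ?",
            (run_id, step),
        ).fetchone()
        if row and row[0] == "SUCCEEDED":
            continue  # finished in an earlier attempt of this run: resume past it
        for attempt in range(1, max_retries + 2):
            try:
                steps[step]()
                status = "SUCCEEDED"
            except Exception:
                status = "FAILED"
                if attempt <= max_retries:
                    time.sleep(base_delay * 2 ** (attempt - 1))  # exponential backoff
                    continue
            # Record the final status for this step (SQLite upsert syntax)
            conn.execute(
                "INSERT OR REPLACE INTO step_state VALUES (?, ?, ?, ?)",
                (run_id, step, status, attempt),
            )
            conn.commit()
            if status == "FAILED":
                raise RuntimeError(f"step {step!r} failed after {attempt} attempts")
            break
```

`deps` must list every step, with an empty list for steps that have no upstream dependencies, because the sorter only sees nodes that appear in the mapping.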
Kubeflow Pipelines
KFP Pipeline Definition
```python
from kfp import compiler, dsl
from kfp.dsl import Dataset, Input, Metrics, Model, Output, component


@component(
    base_image="python:3.9",
    packages_to_install=[
        "pandas==2.0.0",
        "great-expectations==0.17.0",
        "pyarrow==12.0.0",
    ],
)
def data_validation(
    input_data: Input[Dataset],
    validated_data: Output[Dataset],
    report: Output[Dataset],
):
    """Validate incoming data quality and fail the step if any expectation fails."""
    import json

    import great_expectations as ge
    import pandas as pd
    from great_expectations.core import ExpectationConfiguration, ExpectationSuite

    # Load data
    df = pd.read_parquet(input_data.path)

    # Build the expectation suite (legacy great_expectations < 1.0 API);
    # add_expectation takes an ExpectationConfiguration, not a plain dict
    suite = ExpectationSuite(expectation_suite_name="data_quality")
    expectations = [
        {"expectation_type": "expect_column_values_to_not_be_null",
         "kwargs": {"column": "user_id"}},
        {"expectation_type": "expect_column_values_to_be_unique",
         "kwargs": {"column": "user_id"}},
        {"expectation_type": "expect_column_values_to_be_between",
         "kwargs": {"column": "amount", "min_value": 0, "max_value": 100000}},
        {"expectation_type": "expect_table_row_count_to_be_between",
         "kwargs": {"min_value": 1000, "max_value": 10000000}},
    ]
    for exp in expectations:
        suite.add_expectation(ExpectationConfiguration(**exp))

    # Validate
    results = ge.from_pandas(df).validate(expectation_suite=suite)

    # Save validation report
    report_data = {
        "passed": results.success,
        "statistics": results.statistics,
        "results": [
            {
                "expectation": r.expectation_config.expectation_type,
                "success": r.success,
                "result": r.result,
            }
            for r in results.results
        ],
    }
    with open(report.path, "w") as f:
        json.dump(report_data, f, indent=2, default=str)

    # Stop here instead of passing bad data downstream
    if not results.success:
        raise ValueError(f"Data validation failed: {results.statistics}")

    # Save validated data
    df.to_parquet(validated_data.path, index=False)


@component(
    base_image="python:3.9",
    packages_to_install=[
        "pandas==2.0.0",
        "pyarrow==12.0.0",
        "scikit-learn==1.3.0",
    ],
)
def feature_engineering(
    input_data: Input[Dataset],
    train_features: Output[Dataset],
    test_features: Output[Dataset],
    preprocessor: Output[Model],
    feature_config: dict,
):
    """Split raw data, then fit transformations on the training split only."""
    import joblib
    import pandas as pd
    from sklearn.compose import ColumnTransformer
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import OneHotEncoder, StandardScaler

    df = pd.read_parquet(input_data.path)
    target = feature_config["target"]
    numeric_features = feature_config.get("numeric_features", [])
    categorical_features = feature_config.get("categorical_features", [])

    # Hold out a test set before fitting anything, to avoid leakage
    train_df, test_df = train_test_split(
        df, test_size=0.2, random_state=42, stratify=df[target]
    )

    # LabelEncoder only accepts a 1-D target, so encode features with OneHotEncoder
    transformer = ColumnTransformer(
        transformers=[
            ("num", StandardScaler(), numeric_features),
            ("cat", OneHotEncoder(handle_unknown="ignore", sparse_output=False),
             categorical_features),
        ]
    )
    transformer.fit(train_df.drop(columns=[target]))
    feature_names = list(transformer.get_feature_names_out())

    # Transform both splits and save them with the target appended
    for split_df, artifact in ((train_df, train_features), (test_df, test_features)):
        out = pd.DataFrame(
            transformer.transform(split_df.drop(columns=[target])),
            columns=feature_names,
        )
        out[target] = split_df[target].to_numpy()
        out.to_parquet(artifact.path, index=False)

    # Persist the fitted transformer as its own artifact so serving reuses it
    joblib.dump(transformer, preprocessor.path)


@component(
    base_image="python:3.9",
    packages_to_install=[
        "pandas==2.0.0",
        "pyarrow==12.0.0",
        "xgboost==1.7.0",
        "scikit-learn==1.3.0",
        "mlflow==2.5.0",
    ],
)
def model_training(
    train_data: Input[Dataset],
    model: Output[Model],
    metrics: Output[Metrics],
    hyperparams: dict,
):
    """Train an XGBoost model with MLflow tracking."""
    import mlflow
    import pandas as pd
    import xgboost as xgb
    from sklearn.metrics import precision_score, recall_score, roc_auc_score
    from sklearn.model_selection import train_test_split

    # Without MLFLOW_TRACKING_URI set, runs are logged to a local ./mlruns directory
    mlflow.set_experiment("kfp_training")

    df = pd.read_parquet(train_data.path)
    target_col = hyperparams.get("target", "label")
    X = df.drop(columns=[target_col])
    y = df[target_col]

    # Validation split used for early stopping only
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    with mlflow.start_run():
        # Log hyperparameters
        mlflow.log_params(hyperparams)

        # Train model
        dtrain = xgb.DMatrix(X_train, label=y_train)
        dval = xgb.DMatrix(X_val, label=y_val)
        params = {
            "objective": "binary:logistic",
            "eval_metric": "auc",
            "max_depth": hyperparams.get("max_depth", 6),
            "learning_rate": hyperparams.get("learning_rate", 0.1),
            "subsample": hyperparams.get("subsample", 0.8),
            "colsample_bytree": hyperparams.get("colsample_bytree", 0.8),
            "seed": 42,
        }
        # xgb.train ignores "n_estimators"; the round count is num_boost_round
        model_xgb = xgb.train(
            params,
            dtrain,
            num_boost_round=hyperparams.get("n_estimators", 1000),
            evals=[(dval, "val")],
            early_stopping_rounds=50,
            verbose_eval=100,
        )

        # Evaluate
        val_pred = model_xgb.predict(dval)
        val_label = (val_pred > 0.5).astype(int)
        eval_metrics = {
            "auc_roc": float(roc_auc_score(y_val, val_pred)),
            "precision": float(precision_score(y_val, val_label)),
            "recall": float(recall_score(y_val, val_label)),
        }
        mlflow.log_metrics(eval_metrics)

    # Save model
    model_xgb.save_model(model.path)

    # KFP's Metrics artifact logs one scalar at a time
    for name, value in eval_metrics.items():
        metrics.log_metric(name, value)


@component(
    base_image="python:3.9",
    packages_to_install=[
        "pandas==2.0.0",
        "pyarrow==12.0.0",
        "xgboost==1.7.0",
        "scikit-learn==1.3.0",
    ],
)
def model_evaluation(
    model: Input[Model],
    test_data: Input[Dataset],
    metrics: Output[Metrics],
    threshold_config: dict,
) -> str:
    """Score the held-out test split against quality gates.

    Returns "passed" or "failed" so the pipeline can branch on the result.
    """
    import pandas as pd
    import xgboost as xgb
    from sklearn.metrics import f1_score, precision_score, recall_score, roc_auc_score

    # Load test data (already transformed by feature_engineering)
    test_df = pd.read_parquet(test_data.path)
    target_col = threshold_config.get("target", "label")
    X_test = test_df.drop(columns=[target_col])
    y_test = test_df[target_col]

    # Load and run model
    model_xgb = xgb.Booster()
    model_xgb.load_model(model.path)
    predictions = model_xgb.predict(xgb.DMatrix(X_test))
    pred_label = (predictions > 0.5).astype(int)

    # Calculate metrics
    eval_metrics = {
        "auc_roc": float(roc_auc_score(y_test, predictions)),
        "precision": float(precision_score(y_test, pred_label)),
        "recall": float(recall_score(y_test, pred_label)),
        "f1_score": float(f1_score(y_test, pred_label)),
    }
    for name, value in eval_metrics.items():
        metrics.log_metric(name, value)

    # Check thresholds: every configured gate must pass
    passed = True
    for metric_name, threshold in threshold_config.get("thresholds", {}).items():
        if metric_name in eval_metrics and eval_metrics[metric_name] < threshold:
            print(f"Quality gate failed: {metric_name}="
                  f"{eval_metrics[metric_name]:.4f} < {threshold}")
            passed = False
    return "passed" if passed else "failed"


@component(base_image="python:3.9")
def deploy_model(model: Input[Model], endpoint_name: str):
    """Placeholder deploy step: hand the approved model to the serving layer."""
    # Hypothetical: replace with your serving system's rollout call
    # (for example an InferenceService update with a canary traffic split).
    print(f"Deploying {model.uri} to endpoint '{endpoint_name}'")


@dsl.pipeline(
    name="ml-training-pipeline",
    description="End-to-end ML training pipeline with validation",
    pipeline_root="gs://my-bucket/pipeline-root",
)
def ml_pipeline(
    data_path: str,
    feature_config: dict,
    hyperparams: dict,
    threshold_config: dict,
    endpoint_name: str = "fraud-model",  # example endpoint name
):
    """Main ML pipeline definition."""
    # Step 0: Register the raw Parquet file as a Dataset artifact
    # (a plain string cannot be passed to an Input[Dataset])
    raw_data = dsl.importer(
        artifact_uri=data_path, artifact_class=Dataset, reimport=False
    )

    # Step 1: Data validation
    validation_op = data_validation(input_data=raw_data.output)

    # Step 2: Feature engineering (train/test split + fitted preprocessor)
    feature_op = feature_engineering(
        input_data=validation_op.outputs["validated_data"],
        feature_config=feature_config,
    )

    # Step 3: Model training
    training_op = model_training(
        train_data=feature_op.outputs["train_features"],
        hyperparams=hyperparams,
    )

    # Step 4: Model evaluation on the held-out split
    evaluation_op = model_evaluation(
        model=training_op.outputs["model"],
        test_data=feature_op.outputs["test_features"],
        threshold_config=threshold_config,
    )

    # Step 5: Conditional deployment. Conditions compare parameter outputs, not
    # artifacts; the return value is named "Output" (newer KFP 2.x also has dsl.If).
    with dsl.Condition(evaluation_op.outputs["Output"] == "passed"):
        deploy_model(
            model=training_op.outputs["model"],
            endpoint_name=endpoint_name,
        )


# Compile pipeline
compiler.Compiler().compile(
    pipeline_func=ml_pipeline,
    package_path="ml_pipeline.yaml",
)
```
ℹ️
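Kubeflow Pipelines runs on Kubernetes: each step executes as a pod, so GPUs are requested through ordinary Kubernetes resource requests, runs are grouped into experiments for comparison, and the wider Kubeflow platform adds distributed training operators. Use it for teams already invested in the Kubernetes ecosystem.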
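The diagram's `Deploy (Canary)` stage shifts a small share of traffic to a newly approved model before full rollout. Serving platforms usually implement the split for you (KServe, for example, exposes a canary traffic percentage), but the core idea is deterministic bucketing: hash a stable key such as a user ID so the same user always reaches the same model version. A minimal standard-library sketch; `route_model`, the `"candidate"`/`"production"` labels, and the 5% default are illustrative assumptions.

```python
import hashlib


def canary_bucket(key: str, buckets: int = 10_000) -> int:
    """Map a stable key to a bucket in [0, buckets) using SHA-256."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % buckets


def route_model(user_id: str, canary_fraction: float = 0.05) -> str:
    """Send a deterministic `canary_fraction` of users to the candidate model."""
    if not 0.0 <= canary_fraction <= 1.0:
        raise ValueError("canary_fraction must be in [0, 1]")
    threshold = int(canary_fraction * 10_000)
    return "candidate" if canary_bucket(user_id) < threshold else "production"
```

Because assignment depends only on the key, users do not flip between versions mid-rollout, and raising `canary_fraction` only moves additional users onto the candidate.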
Apache Airflow
Airflow DAG for ML Pipeline
```python
import json
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.providers.common.sql.operators.sql import (
    SQLCheckOperator,
    SQLExecuteQueryOperator,
)
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.utils.task_group import TaskGroup
from docker.types import DeviceRequest, Mount

# Named Docker volume shared by all containers so each step sees the previous
# step's files under /data (assumed to be mounted on the Airflow workers too).
DATA_MOUNT = Mount(source='ml-data', target='/data', type='volume')

default_args = {
    'owner': 'ml-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=2),
}


def validate_data_quality(**context):
    """Check data quality before training."""
    import great_expectations as ge

    ti = context['ti']
    # Tasks inside a TaskGroup are addressed as "<group_id>.<task_id>".
    # DockerOperator pushes the container's last stdout line, assumed to be the file path.
    data_path = ti.xcom_pull(task_ids='data_preparation.data_ingestion')
    df = ge.read_csv(data_path)  # legacy PandasDataset API (great_expectations < 1.0)

    # Each expect_* call validates immediately and returns a result object
    results = [
        df.expect_column_values_to_not_be_null("user_id"),
        df.expect_column_values_to_be_unique("user_id"),
        df.expect_column_values_to_be_between("amount", min_value=0, max_value=100000),
    ]
    if not all(r.success for r in results):
        raise ValueError("Data quality validation failed")
    return {"status": "passed", "rows": len(df)}


def check_model_performance(**context):
    """Return the task_id of the branch to follow, based on evaluation metrics."""
    ti = context['ti']
    # Assumes evaluate.py prints a JSON object of metrics as its last stdout line
    metrics = json.loads(ti.xcom_pull(task_ids='model_training.evaluate_model'))
    thresholds = {
        'auc_roc': 0.90,
        'precision': 0.85,
        'recall': 0.80,
    }
    for metric, threshold in thresholds.items():
        if metrics.get(metric, 0) < threshold:
            return 'notify_failure'
    return 'deploy_model'


with DAG(
    'ml_training_pipeline',
    default_args=default_args,
    description='Production ML training pipeline',
    schedule='@daily',  # replaces the deprecated schedule_interval (Airflow 2.4+)
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['ml', 'production'],
    max_active_runs=1,
) as dag:

    with TaskGroup('data_preparation') as data_prep:
        data_ingestion = DockerOperator(
            task_id='data_ingestion',
            image='registry.example.com/ml/data-ingest:latest',
            command='python ingest.py',
            environment={
                'DATA_SOURCE': '{{ var.value.data_source }}',
                'OUTPUT_PATH': '/data/raw/',
            },
            docker_url='unix://var/run/docker.sock',
            mounts=[DATA_MOUNT],
            mount_tmp_dir=False,
            auto_remove='success',
        )
        data_validation = PythonOperator(
            task_id='data_validation',
            python_callable=validate_data_quality,
        )
        data_ingestion >> data_validation

    with TaskGroup('feature_engineering') as feature_eng:
        feature_computation = DockerOperator(
            task_id='feature_computation',
            image='registry.example.com/ml/feature-eng:latest',
            command='python compute_features.py',
            environment={
                'INPUT_PATH': '/data/raw/',
                'OUTPUT_PATH': '/data/features/',
            },
            mounts=[DATA_MOUNT],
        )
        # SQLCheckOperator fails the task if any value in the first row is falsy:
        # zero rows, zero users, or a NULL/zero average all fail the check.
        feature_validation = SQLCheckOperator(
            task_id='feature_validation',
            sql="""
                SELECT
                    COUNT(*) AS total_rows,
                    COUNT(DISTINCT user_id) AS unique_users,
                    AVG(amount) AS avg_amount
                FROM features_table
                WHERE date = '{{ ds }}'
            """,
            conn_id='ml_database',
        )
        feature_computation >> feature_validation

    with TaskGroup('model_training') as training:
        train_model = DockerOperator(
            task_id='train_model',
            image='registry.example.com/ml/training:latest',
            command='python train.py',
            environment={
                'FEATURES_PATH': '/data/features/',
                'MODEL_OUTPUT': '/data/models/',
                'EXPERIMENT_NAME': 'airflow_pipeline_{{ ds }}',
            },
            mounts=[DATA_MOUNT],
            # Expose all host GPUs (requires the NVIDIA container runtime)
            device_requests=[DeviceRequest(count=-1, capabilities=[['gpu']])],
        )
        evaluate_model = DockerOperator(
            task_id='evaluate_model',
            image='registry.example.com/ml/evaluation:latest',
            command='python evaluate.py',
            mounts=[DATA_MOUNT],
        )
        train_model >> evaluate_model

    deploy_decision = BranchPythonOperator(
        task_id='deploy_decision',
        python_callable=check_model_performance,
    )

    deploy_model = DockerOperator(
        task_id='deploy_model',
        image='registry.example.com/ml/deployment:latest',
        command='python deploy.py --env production',
        mounts=[DATA_MOUNT],
    )

    notify_failure = SQLExecuteQueryOperator(
        task_id='notify_failure',
        sql="""
            INSERT INTO pipeline_notifications
                (pipeline_run, status, message, created_at)
            VALUES (
                '{{ run_id }}',
                'FAILED',
                'Model performance below threshold',
                NOW()
            )
        """,
        conn_id='ml_database',
    )

    # Define task dependencies
    data_prep >> feature_eng >> training >> deploy_decision
    deploy_decision >> [deploy_model, notify_failure]
```
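Because backfills, manual re-runs, and task retries all execute a task again for the same logical date (`{{ ds }}`), a task that writes to `features_table` should replace that date's partition rather than append to it. A minimal sketch of the delete-then-insert pattern inside a single transaction, using the standard-library `sqlite3` module as a stand-in for the `ml_database` Postgres connection; `write_partition` and the three-column table layout are hypothetical.

```python
import sqlite3
from typing import Iterable, Tuple


def write_partition(
    conn: sqlite3.Connection, ds: str, rows: Iterable[Tuple[int, float]]
) -> None:
    """Replace all rows for logical date `ds`; safe to call any number of times."""
    with conn:  # one transaction: commits on success, rolls back on any error
        conn.execute("DELETE FROM features_table WHERE date = ?", (ds,))
        conn.executemany(
            "INSERT INTO features_table (date, user_id, amount) VALUES (?, ?, ?)",
            [(ds, user_id, amount) for user_id, amount in rows],
        )
```

If the insert fails, the rollback also undoes the delete, so the previous version of the partition stays intact.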
Prefect
Prefect ML Flow
```python
import json
import shutil
from datetime import datetime, timedelta
from pathlib import Path

import numpy as np
import pandas as pd
from prefect import flow, get_run_logger, task
from prefect.tasks import task_input_hash


@task(
    retries=3,
    retry_delay_seconds=60,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
    log_prints=True,
)
def load_data(data_path: str) -> pd.DataFrame:
    """Load and validate input data."""
    logger = get_run_logger()
    df = pd.read_parquet(data_path)
    logger.info(f"Loaded {len(df)} rows, {len(df.columns)} columns")

    # Basic validation (explicit raises, since assert is skipped under python -O)
    if df.empty:
        raise ValueError("DataFrame is empty")
    if "user_id" not in df.columns:
        raise ValueError("Missing user_id column")
    return df


@task(retries=2, log_prints=True)
def compute_features(df: pd.DataFrame, config: dict) -> pd.DataFrame:
    """Compute features from raw data."""
    logger = get_run_logger()
    features = df.copy()
    numeric_cols = config.get("numeric_features", [])

    # Numeric features (clip at 0 so log/sqrt stay defined)
    for col in numeric_cols:
        features[f"{col}_log"] = np.log1p(features[col].clip(lower=0))
        features[f"{col}_sqrt"] = np.sqrt(features[col].clip(lower=0))

    # Aggregation features: per-group statistics broadcast back to each row
    if "groupby" in config:
        group_col = config["groupby"]["column"]
        for func_name, func in config["groupby"]["aggregations"].items():
            for col in numeric_cols:
                features[f"{col}_{group_col}_{func_name}"] = (
                    features.groupby(group_col)[col].transform(func)
                )

    # Time features; the raw timestamp is dropped because XGBoost needs numeric input
    if "timestamp_column" in config:
        ts = pd.to_datetime(features.pop(config["timestamp_column"]))
        features["hour"] = ts.dt.hour
        features["dayofweek"] = ts.dt.dayofweek
        features["month"] = ts.dt.month

    logger.info(f"Computed {len(features.columns)} features")
    return features


@task(retries=1, log_prints=True)
def train_model(features: pd.DataFrame, config: dict) -> dict:
    """Train model and return metrics."""
    import xgboost as xgb
    from sklearn.metrics import precision_score, recall_score, roc_auc_score
    from sklearn.model_selection import train_test_split

    logger = get_run_logger()
    target = config["target"]
    # Keep numeric feature columns only and exclude the identifier
    X = features.drop(columns=[target, "user_id"]).select_dtypes(include="number")
    y = features[target]
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    # Default to binary classification; values from config override these
    params = {
        "objective": "binary:logistic",
        "eval_metric": "auc",
        **config.get("model_params", {}),
    }
    dtrain = xgb.DMatrix(X_train, label=y_train)
    dval = xgb.DMatrix(X_val, label=y_val)
    model = xgb.train(
        params,
        dtrain,
        num_boost_round=1000,
        evals=[(dval, "val")],
        early_stopping_rounds=50,
        verbose_eval=100,
    )

    # Evaluate
    val_pred = model.predict(dval)
    val_label = (val_pred > 0.5).astype(int)
    metrics = {
        "auc_roc": float(roc_auc_score(y_val, val_pred)),
        "precision": float(precision_score(y_val, val_label)),
        "recall": float(recall_score(y_val, val_label)),
    }
    logger.info(f"Model metrics: {metrics}")

    # Save model (create the output directory if needed)
    model_dir = Path(config["model_output"])
    model_dir.mkdir(parents=True, exist_ok=True)
    model_path = model_dir / "model.json"
    model.save_model(str(model_path))

    return {
        "metrics": metrics,
        "model_path": str(model_path),
        "feature_importance": dict(model.get_score()),
    }


@task(retries=1, log_prints=True)
def evaluate_model(metrics: dict, config: dict) -> bool:
    """Evaluate model against quality gates."""
    logger = get_run_logger()
    thresholds = config.get("thresholds", {})
    for metric_name, threshold in thresholds.items():
        if metric_name in metrics and metrics[metric_name] < threshold:
            logger.warning(
                f"Quality gate failed: {metric_name}={metrics[metric_name]:.4f} "
                f"< {threshold}"
            )
            return False
    logger.info("All quality gates passed")
    return True


@task(retries=2, log_prints=True)
def deploy_model(model_path: str, config: dict):
    """Deploy model to production."""
    logger = get_run_logger()

    # Copy model to deployment location
    deploy_path = Path(config["deploy_path"])
    deploy_path.mkdir(parents=True, exist_ok=True)
    shutil.copy(model_path, deploy_path / "model.json")

    # Create deployment metadata
    metadata = {
        "deployed_at": datetime.now().isoformat(),
        "model_path": model_path,
        "config": config,
    }
    with open(deploy_path / "metadata.json", "w") as f:
        json.dump(metadata, f, indent=2, default=str)
    logger.info(f"Model deployed to {deploy_path}")


@flow(
    name="ml-training-pipeline",
    description="End-to-end ML training pipeline",
    log_prints=True,
    timeout_seconds=3600,
    retries=1,  # retries the entire flow run
    retry_delay_seconds=300,
)
def ml_pipeline(
    data_path: str,
    config_path: str,
    deploy_path: str = "/models/production",
):
    """Main ML pipeline flow."""
    logger = get_run_logger()

    # Load configuration
    with open(config_path) as f:
        config = json.load(f)
    config["deploy_path"] = deploy_path

    # Execute pipeline
    df = load_data(data_path)
    features = compute_features(df, config)
    training_results = train_model(features, config)

    # Evaluate
    passed = evaluate_model(training_results["metrics"], config)
    if passed:
        deploy_model(training_results["model_path"], config)
        logger.info("Pipeline completed successfully")
    else:
        logger.error("Pipeline failed quality gates")
        raise ValueError("Model did not meet quality thresholds")
    return training_results


# Serve as a scheduled deployment (daily at 02:00) from this process.
# For work-pool based infrastructure, use ml_pipeline.deploy(...) instead.
if __name__ == "__main__":
    ml_pipeline.serve(
        name="daily-training",
        cron="0 2 * * *",
        parameters={
            "data_path": "s3://bucket/data/latest/",
            "config_path": "config/training_config.json",
        },
    )
```
ℹ️
Prefect (2.x and later) provides Python-native orchestration: flows are ordinary Python functions, so control flow can be dynamic, and tasks support input-based caching and configurable retries. It suits teams that want flexible, code-first pipeline definitions.
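Task caching, as configured on `load_data` with `cache_key_fn=task_input_hash` and `cache_expiration=timedelta(hours=1)`, means a call with the same inputs inside the expiration window returns the stored result instead of re-running. The decorator below sketches that behaviour with the standard library only; it is not Prefect's implementation (which also keys on the task's identity and can persist results across processes), and `cached_task` plus its injectable `clock` are hypothetical names.

```python
import hashlib
import json
import time
from functools import wraps
from typing import Any, Callable, Dict, Tuple


def cached_task(expiration_s: float, clock: Callable[[], float] = time.monotonic):
    """Memoize a function on a hash of its arguments; entries expire after expiration_s."""
    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        cache: Dict[str, Tuple[float, Any]] = {}

        @wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            # Key = function name + JSON-serialized arguments (must be JSON-friendly)
            raw = json.dumps([fn.__name__, args, kwargs], sort_keys=True, default=str)
            key = hashlib.sha256(raw.encode("utf-8")).hexdigest()
            hit = cache.get(key)
            if hit is not None and clock() - hit[0] < expiration_s:
                return hit[1]  # fresh cache hit: skip the work
            result = fn(*args, **kwargs)
            cache[key] = (clock(), result)
            return result

        return wrapper

    return decorator
```

Injecting the clock keeps expiry testable without sleeping.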
Dagster
Dagster ML Assets
```python
from datetime import datetime

import numpy as np
import pandas as pd
from dagster import (
    AssetExecutionContext,
    AssetSelection,
    Config,
    DefaultScheduleStatus,
    Definitions,
    MetadataValue,
    RetryPolicy,
    ScheduleDefinition,
    asset,
    define_asset_job,
)
from dagster_aws.s3 import S3PickleIOManager, S3Resource


class TrainingConfig(Config):
    # Defaults mirror the daily job's settings, so scheduled runs need no run config
    data_path: str = "s3://bucket/data/latest/"
    model_name: str = "fraud_detection"
    target_column: str = "is_fraud"
    max_depth: int = 6
    learning_rate: float = 0.1
    auc_roc_threshold: float = 0.90


@asset(
    group_name="data_ingestion",
    retry_policy=RetryPolicy(max_retries=3, delay=300),  # delay is in seconds
    io_manager_key="s3_io_manager",
)
def raw_data(context: AssetExecutionContext, config: TrainingConfig) -> pd.DataFrame:
    """Load raw training data from S3."""
    context.log.info(f"Loading data from {config.data_path}")
    df = pd.read_parquet(config.data_path)

    # Attach row/column counts to this materialization (shown in the Dagster UI)
    context.add_output_metadata({
        "rows": MetadataValue.int(len(df)),
        "columns": MetadataValue.int(len(df.columns)),
    })
    return df


# Upstream assets are inferred from parameter names, so no separate `deps` are needed
@asset(group_name="data_validation", io_manager_key="s3_io_manager")
def validated_data(
    context: AssetExecutionContext, raw_data: pd.DataFrame
) -> pd.DataFrame:
    """Validate data quality."""
    df = raw_data.copy()

    # Validation checks
    validations = [
        ("No null user_ids", df["user_id"].notna().all()),
        ("Unique user_ids", df["user_id"].is_unique),
        ("Amount in valid range", df["amount"].between(0, 100000).all()),
        ("Minimum rows", len(df) >= 1000),
    ]
    for check_name, passed in validations:
        if not passed:
            raise ValueError(f"Validation failed: {check_name}")
        context.log.info(f"✓ {check_name}")
    return df


@asset(group_name="feature_engineering", io_manager_key="s3_io_manager")
def features(
    context: AssetExecutionContext, validated_data: pd.DataFrame
) -> pd.DataFrame:
    """Compute features from validated data."""
    df = validated_data.copy()

    # Feature transformations
    df["log_amount"] = np.log1p(df["amount"])
    df["amount_squared"] = df["amount"] ** 2

    # Rolling features over the last 7 rows per user (equals 7 days only with
    # one row per user per day; assumes rows are already in time order)
    df["rolling_mean_7d"] = df.groupby("user_id")["amount"].transform(
        lambda x: x.rolling(7).mean()
    )
    df["rolling_std_7d"] = df.groupby("user_id")["amount"].transform(
        lambda x: x.rolling(7).std()
    )

    context.log.info(f"Computed {len(df.columns)} features")
    return df


@asset(
    group_name="model_training",
    io_manager_key="s3_io_manager",
    retry_policy=RetryPolicy(max_retries=1),
)
def trained_model(
    context: AssetExecutionContext, features: pd.DataFrame, config: TrainingConfig
) -> dict:
    """Train an XGBoost model and enforce the AUC-ROC quality gate."""
    import xgboost as xgb
    from sklearn.metrics import roc_auc_score
    from sklearn.model_selection import train_test_split

    # Numeric columns only; the identifier is not a feature
    X = features.drop(columns=[config.target_column, "user_id"]).select_dtypes(
        include="number"
    )
    y = features[config.target_column]
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    params = {
        "objective": "binary:logistic",
        "eval_metric": "auc",
        "max_depth": config.max_depth,
        "learning_rate": config.learning_rate,
    }
    dtrain = xgb.DMatrix(X_train, label=y_train)
    dval = xgb.DMatrix(X_val, label=y_val)
    model = xgb.train(
        params,
        dtrain,
        num_boost_round=1000,
        evals=[(dval, "val")],
        early_stopping_rounds=50,
        verbose_eval=False,
    )

    # Evaluate and check threshold
    auc_roc = float(roc_auc_score(y_val, model.predict(dval)))
    context.log.info(f"Model AUC-ROC: {auc_roc:.4f}")
    if auc_roc < config.auc_roc_threshold:
        raise ValueError(
            f"AUC-ROC {auc_roc:.4f} below threshold {config.auc_roc_threshold}"
        )

    return {
        "model": model,  # Boosters are picklable, so the S3 pickle IO manager can store them
        "metrics": {"auc_roc": auc_roc},
        "feature_names": list(X.columns),
        "hyperparameters": params,
    }


@asset(group_name="deployment", io_manager_key="s3_io_manager")
def model_registry_entry(
    context: AssetExecutionContext, trained_model: dict, config: TrainingConfig
) -> dict:
    """Build a model registry entry (persisted to S3 by the IO manager)."""
    now = datetime.now()
    entry = {
        "model_name": config.model_name,
        "version": now.strftime("%Y%m%d%H%M%S"),
        "metrics": trained_model["metrics"],
        "feature_names": trained_model["feature_names"],
        "hyperparameters": trained_model["hyperparameters"],
        "created_at": now.isoformat(),
    }
    context.log.info(f"Registered model: {entry['version']}")
    return entry


# Define the job before the schedule that references it
ml_job = define_asset_job(name="daily_ml_pipeline", selection=AssetSelection.all())

# Define schedules
daily_schedule = ScheduleDefinition(
    job=ml_job,
    cron_schedule="0 2 * * *",  # Daily at 2 AM
    default_status=DefaultScheduleStatus.RUNNING,
)

# Register assets, job, schedule, and the IO manager the assets refer to
defs = Definitions(
    assets=[raw_data, validated_data, features, trained_model, model_registry_entry],
    jobs=[ml_job],
    schedules=[daily_schedule],
    resources={
        # Example bucket name; pickles each asset's return value to S3
        "s3_io_manager": S3PickleIOManager(
            s3_resource=S3Resource(), s3_bucket="my-bucket"
        ),
    },
)
```
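Asset-based orchestration gets its dependency graph from function signatures: in the Dagster code, `features` takes a `validated_data` parameter, so it depends on the `validated_data` asset. The toy below reproduces only that wiring idea with `inspect` and `graphlib`, to show why asset definitions need no separate DAG declaration; it is not how Dagster works internally, and `materialize_all` is a hypothetical name.

```python
import inspect
from graphlib import TopologicalSorter
from typing import Any, Callable, Dict, List


def materialize_all(assets: List[Callable[..., Any]]) -> Dict[str, Any]:
    """Materialize assets in dependency order, wiring inputs by parameter name."""
    by_name = {fn.__name__: fn for fn in assets}
    # Each parameter name is treated as the key of an upstream asset
    graph = {
        name: set(inspect.signature(fn).parameters) for name, fn in by_name.items()
    }
    values: Dict[str, Any] = {}
    for name in TopologicalSorter(graph).static_order():
        if name not in by_name:
            raise KeyError(f"no asset named {name!r}")
        kwargs = {dep: values[dep] for dep in graph[name]}
        values[name] = by_name[name](**kwargs)
    return values
```

The assets can be passed in any order; the sorter derives execution order from the parameter names alone.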
Pipeline Comparison Matrix
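| Feature | Kubeflow | Airflow | Prefect | Dagster |
|---|---|---|---|---|
| Execution model | Kubernetes-native (one pod per step) | Scheduler + workers (e.g. Celery or Kubernetes executors) | Hybrid (API server + your workers/work pools) | Run launchers and executors (local process, Docker, Kubernetes) |
| Scheduling | Cron or interval recurring runs | Cron, timetables, sensors | Cron, interval, RRule | Cron schedules, sensors |
| UI | Built-in | Built-in | Built-in (self-hosted server or Prefect Cloud) | Built-in |
| Caching | Component-level | None built-in (XCom only passes small values between tasks) | Task-level (input-hash cache keys) | Asset-level (code/data versioning) |
| GPU Support | Native (Kubernetes resource requests) | Via Docker/Kubernetes operators | Via Docker/Kubernetes work pools | Via Docker/Kubernetes executors |
| Scalability | Excellent | Good | Good | Good |
| Learning Curve | Steep | Moderate | Easy | Moderate |
| Best For | K8s shops | Complex workflows | Python teams | Data-centric |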
⚠️
Choose your orchestrator based on your team's expertise and infrastructure. Kubeflow excels for Kubernetes-native ML, Airflow for complex dependencies, Prefect for Python-first teams, and Dagster for data-centric workflows.
Summary
ML pipeline orchestration is critical for production ML:
- Kubeflow: Best for Kubernetes-native ML with GPU support
- Airflow: Ideal for complex multi-step workflows with dependencies
- Prefect: Python-native with dynamic workflows and caching
- Dagster: Data-centric with asset-based orchestration
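Whichever orchestrator you choose, make each step idempotent (deterministic output locations, atomic or partition-replacing writes), gate deployment on data validation and model evaluation, and combine retries with persisted run state so a failed run resumes instead of starting over.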