Scikit-Learn Pipelines
Pipelines chain preprocessing steps and model training into a single object. They prevent data leakage, ensure reproducibility, and simplify deployment. Every serious ML project should use them.
Pipeline Architecture
Why Pipelines Matter
Without pipelines, it's easy to fit your scaler (or imputer, or encoder) on the entire dataset, including the test data — introducing subtle but devastating data leakage. When the whole pipeline is the object you fit and cross-validate, every transformation is fit only on the training data (or training fold) and applied consistently to validation and test sets.
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import (
StandardScaler, OneHotEncoder, OrdinalEncoder,
MinMaxScaler, RobustScaler, PowerTransformer
)
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.feature_selection import SelectKBest, mutual_info_classif
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score, StratifiedKFold, GridSearchCV
from sklearn.base import BaseEstimator, TransformerMixin
import warnings
# Silence only FutureWarning noise (announcements of upcoming API changes).
# A blanket "ignore everything" filter would also hide the warnings that flag
# real problems in a pipeline, e.g. failed model convergence.
warnings.filterwarnings('ignore', category=FutureWarning)
Generate a Complex Dataset
# Reproducible synthetic credit-risk dataset: numeric, ordinal and nominal
# features, a binary target, and ~5% missing values in three numeric columns.
np.random.seed(42)
n = 2000
data = pd.DataFrame({
    'age': np.random.normal(40, 12, n).clip(18, 80),
    'income': np.random.lognormal(10.5, 0.7, n),
    'credit_score': np.random.normal(650, 80, n).clip(300, 850),
    'debt_ratio': np.random.beta(2, 5, n),
    'months_employed': np.random.exponential(36, n).clip(0, 360),
    'num_accounts': np.random.poisson(3, n) + 1,
    'education': np.random.choice(['high_school', 'bachelors', 'masters', 'phd'], n, p=[0.3, 0.4, 0.2, 0.1]),
    'employment_type': np.random.choice(['full_time', 'part_time', 'self_employed', 'unemployed'], n, p=[0.5, 0.2, 0.2, 0.1]),
    'homeownership': np.random.choice(['own', 'mortgage', 'rent'], n, p=[0.3, 0.4, 0.3]),
    'region': np.random.choice(['northeast', 'southeast', 'midwest', 'west'], n),
})

# The target must depend on the features. A target drawn independently of
# them makes every model score AUC ~0.5, so the pipeline comparisons and the
# grid search below would measure nothing but noise.
# Risk rises with debt ratio and unemployment and falls with credit score and
# income; the -1.1 intercept keeps the positive rate near 30%.
signal = pd.DataFrame({
    'credit_score': data['credit_score'],
    'debt_ratio': data['debt_ratio'],
    'log_income': np.log(data['income']),
})
z = (signal - signal.mean()) / signal.std()
logit = (
    -1.1
    - 0.8 * z['credit_score']
    + 0.9 * z['debt_ratio']
    - 0.4 * z['log_income']
    + 0.6 * (data['employment_type'] == 'unemployed').astype(float)
)
data['target'] = np.random.binomial(1, 1 / (1 + np.exp(-logit)))

# Introduce missing values (after the target is built from complete data).
for col in ['credit_score', 'income', 'months_employed']:
    mask = np.random.random(n) < 0.05
    data.loc[mask, col] = np.nan

print(f"Dataset shape: {data.shape}")
missing = data.isnull().sum()
print(f"Missing values:\n{missing[missing > 0]}")
print(f"Target distribution:\n{data['target'].value_counts(normalize=True)}")
Basic Pipeline Construction
Simple Pipeline
# A basic pipeline chains preprocessing and modeling. Imputation lives INSIDE
# the pipeline, so the medians are learned from each training fold only.
# Filling NaNs with full-dataset medians before cross-validation would leak
# test-fold statistics into training, which is exactly what pipelines prevent.
simple_pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000)),
])

X = data.drop('target', axis=1)
y = data['target']
numeric_cols = X.select_dtypes(include=[np.number]).columns.tolist()
# Raw numeric columns, NaNs included; the pipeline's imputer handles them.
X_numeric = X[numeric_cols]

cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(simple_pipeline, X_numeric, y, cv=cv, scoring='roc_auc')
print(f"Basic pipeline AUC: {scores.mean():.4f} ± {scores.std():.4f}")
Accessing Pipeline Steps
# Fit the pipeline, then look inside its fitted steps by name.
simple_pipeline.fit(X_numeric, y)
fitted_steps = simple_pipeline.named_steps
print("Pipeline steps:", list(fitted_steps))
print("Scaler mean:", fitted_steps['scaler'].mean_[:3])
print("Coefficients:", fitted_steps['classifier'].coef_[0][:3])
ColumnTransformer for Mixed Types
Handling numeric and categorical features differently is essential for real-world data.
# Column groups by type: each group gets its own preprocessing branch.
numeric_features = ['age', 'income', 'credit_score', 'debt_ratio', 'months_employed', 'num_accounts']
ordinal_features = ['education']
nominal_features = ['employment_type', 'homeownership', 'region']

# Numeric branch: median imputation, then standardization.
numeric_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
])

# Ordinal branch: education levels are ordered, so they are encoded as 0..3
# in the order below; a level not seen during fit maps to -1 instead of raising.
education_order = ['high_school', 'bachelors', 'masters', 'phd']
ordinal_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OrdinalEncoder(categories=[education_order], handle_unknown='use_encoded_value', unknown_value=-1)),
])

# Nominal branch: one-hot encode unordered categories.
# No drop='first': with handle_unknown='ignore' an unseen category encodes as
# all zeros, which is exactly the encoding of a dropped reference level, so
# the two would become indistinguishable. The tree ensembles used with this
# preprocessor do not need a reference level dropped.
# NOTE: `sparse_output` requires scikit-learn >= 1.2 (older releases: `sparse`).
nominal_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(sparse_output=False, handle_unknown='ignore')),
])

# Route each column group to its branch; any column not listed is dropped.
preprocessor = ColumnTransformer([
    ('num', numeric_transformer, numeric_features),
    ('ord', ordinal_transformer, ordinal_features),
    ('nom', nominal_transformer, nominal_features),
], remainder='drop')
# Full pipeline: preprocessing + model, cross-validated as one estimator so
# every transformation is refit on each training fold.
full_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42)),
])

scores = cross_val_score(full_pipeline, X, y, cv=cv, scoring='roc_auc')
print(f"ColumnTransformer pipeline AUC: {scores.mean():.4f} ± {scores.std():.4f}")
Custom Transformers
Creating reusable, sklearn-compatible transformers for domain-specific logic.
class OutlierClipper(BaseEstimator, TransformerMixin):
    """Clip each column to IQR-based bounds learned during ``fit``.

    Bounds are ``Q1 - factor * IQR`` and ``Q3 + factor * IQR`` per column.
    The quartiles are computed with ``np.nanpercentile``, so missing values
    are ignored when learning the bounds and pass through ``transform`` as NaN.

    Parameters
    ----------
    factor : float, default=1.5
        Multiplier on the interquartile range; larger values clip less.

    Attributes
    ----------
    lower_, upper_ : ndarray
        Per-column clipping bounds (shape ``(n_features,)`` for 2-D input).
    """

    def __init__(self, factor=1.5):
        self.factor = factor

    def fit(self, X, y=None):
        # np.percentile returns NaN for any column containing a NaN, and
        # clipping against NaN bounds turns that entire column into NaN.
        # The nan-aware variant learns the bounds from observed values only.
        q1, q3 = np.nanpercentile(X, [25, 75], axis=0)
        iqr = q3 - q1
        self.lower_ = q1 - self.factor * iqr
        self.upper_ = q3 + self.factor * iqr
        return self

    def transform(self, X):
        return np.clip(X, self.lower_, self.upper_)
class DateFeatureExtractor(BaseEstimator, TransformerMixin):
    """Replace one datetime column with calendar features.

    ``transform`` parses ``date_col`` with ``pd.to_datetime`` and returns a
    copy of ``X`` in which that column is replaced by ``<col>_year``,
    ``<col>_month``, ``<col>_dayofweek`` and ``<col>_is_weekend`` (1 when the
    day-of-week value is 5 or 6, else 0). The input frame is not modified.
    Stateless: ``fit`` learns nothing.
    """

    def __init__(self, date_col):
        self.date_col = date_col

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        col = self.date_col
        parsed = pd.to_datetime(X[col])
        weekday = parsed.dt.dayofweek
        out = X.copy()
        out[f'{col}_year'] = parsed.dt.year
        out[f'{col}_month'] = parsed.dt.month
        out[f'{col}_dayofweek'] = weekday
        out[f'{col}_is_weekend'] = weekday.isin([5, 6]).astype(int)
        return out.drop(columns=[col])
class FeatureInteraction(BaseEstimator, TransformerMixin):
    """Append product and ratio features for pairs of columns.

    For every ``(a, b)`` in ``interaction_pairs``, ``transform`` adds
    ``a_x_b = X[a] * X[b]`` and ``a_div_b = X[a] / (X[b] + 1e-8)`` to a copy
    of ``X``. The 1e-8 offset keeps an exact zero in ``b`` from dividing by
    zero. Stateless: ``fit`` learns nothing.
    """

    def __init__(self, interaction_pairs):
        self.interaction_pairs = interaction_pairs

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        result = X.copy()
        for left, right in self.interaction_pairs:
            numerator, denominator = result[left], result[right]
            result[f'{left}_x_{right}'] = numerator * denominator
            result[f'{left}_div_{right}'] = numerator / (denominator + 1e-8)
        return result
class LogTransformer(BaseEstimator, TransformerMixin):
    """Apply ``log1p`` to selected DataFrame columns.

    Negative values are clipped to 0 first (so they map to 0), NaNs stay NaN,
    and names in ``cols`` that are absent from ``X`` are skipped silently.
    Returns a modified copy; the input is untouched. Stateless.
    """

    def __init__(self, cols):
        self.cols = cols

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        out = X.copy()
        present = [c for c in self.cols if c in out.columns]
        for c in present:
            out[c] = np.log1p(out[c].clip(lower=0))
        return out
Advanced Pipeline with Custom Transformers
# Advanced pipeline. The DataFrame-based custom transformers run first,
# while column names still exist. Two structural points:
#   * OutlierClipper works on numeric arrays (np.percentile / np.clip), so it
#     runs inside the numeric branch after imputation. Applied to the raw
#     frame, the string columns and NaNs would break it.
#   * The interaction columns must be routed explicitly. With
#     remainder='drop', a preprocessor that only lists the original numeric
#     columns would silently discard them.
from functools import partial

interaction_pairs = [('age', 'income'), ('credit_score', 'debt_ratio')]
# Column names FeatureInteraction creates for each pair.
interaction_features = [
    f'{a}_{op}_{b}' for a, b in interaction_pairs for op in ('x', 'div')
]

advanced_preprocessor = ColumnTransformer([
    ('num', Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('clip_outliers', OutlierClipper(factor=2.0)),
        ('scaler', StandardScaler()),
    ]), numeric_features + interaction_features),
    ('ord', ordinal_transformer, ordinal_features),
    ('nom', nominal_transformer, nominal_features),
], remainder='drop')

advanced_pipeline = Pipeline([
    ('log_transform', LogTransformer(cols=['income'])),
    ('interactions', FeatureInteraction(interaction_pairs=interaction_pairs)),
    ('preprocessor', advanced_preprocessor),
    # mutual_info_classif is randomized; pin its seed so the selected
    # features (and therefore the score) are reproducible.
    ('feature_selection', SelectKBest(partial(mutual_info_classif, random_state=42), k=15)),
    ('classifier', GradientBoostingClassifier(n_estimators=100, random_state=42)),
])

scores = cross_val_score(advanced_pipeline, X, y, cv=cv, scoring='roc_auc')
print(f"Advanced pipeline AUC: {scores.mean():.4f} ± {scores.std():.4f}")
Hyperparameter Tuning with Pipelines
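Grid search can tune parameters at any step in the pipeline. Parameters are addressed as `<step>__<parameter>`, nesting through composite steps (e.g. `preprocessor__num__imputer__strategy`). Passing estimator objects as the candidate values swaps out an entire step.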
# Candidate values for the preprocessing branch (including swapping the
# scaler step itself) and for the random forest.
preprocessing_grid = {
    'preprocessor__num__imputer__strategy': ['mean', 'median'],
    'preprocessor__num__scaler': [StandardScaler(), RobustScaler()],
}
model_grid = {
    'classifier__n_estimators': [100, 200],
    'classifier__max_depth': [5, 10, None],
    'classifier__min_samples_split': [2, 5],
}
param_grid = {**preprocessing_grid, **model_grid}

grid_search = GridSearchCV(
    estimator=full_pipeline,
    param_grid=param_grid,
    scoring='roc_auc',
    cv=3,
    refit=True,
    n_jobs=-1,
    verbose=0,
)
grid_search.fit(X, y)

# NOTE: best_score_ is the mean CV score of the configuration selected on
# these same folds, so it is optimistically biased. Use nested CV or a
# held-out set for an unbiased estimate.
print(f"Best AUC: {grid_search.best_score_:.4f}")
print(f"Best params: {grid_search.best_params_}")
Multi-Output Pipelines
Handling multiple target variables.
from sklearn.multioutput import MultiOutputClassifier

# Add a second binary target, drawn independently of the features. X was
# built before this column existed, so target2 does not leak into the inputs.
data['target2'] = np.random.binomial(1, 0.4, n)
y_multi = data[['target', 'target2']]

# The wrapper gives each target column its own fitted random forest.
multi_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', MultiOutputClassifier(RandomForestClassifier(n_estimators=50, random_state=42))),
])

# Train on the first 1500 rows and predict both targets for the remainder.
n_train = 1500
multi_pipeline.fit(X.iloc[:n_train], y_multi.iloc[:n_train])
predictions = multi_pipeline.predict(X.iloc[n_train:])
print(f"Multi-output predictions shape: {predictions.shape}")
Memory-Efficient Pipelines
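For large datasets or expensive transformers, caching prevents redundant computation. With `memory` set, a pipeline stores its fitted transformers on disk and reuses them when the same step is refit with identical parameters and data. The final estimator is never cached.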
from joblib import Memory
import shutil
import tempfile

# On-disk cache for fitted transformers. Pipeline(memory=...) caches the
# fitted intermediate steps only; the final classifier is refit every time.
cache_dir = tempfile.mkdtemp()
memory = Memory(location=cache_dir, verbose=0)

cached_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42)),
], memory=memory)

try:
    # First run fits the preprocessor on each fold and stores the result.
    scores1 = cross_val_score(cached_pipeline, X, y, cv=3, scoring='roc_auc')
    # Second run: cv=3 yields the same unshuffled folds, so the fitted
    # preprocessors are loaded from the cache. Only the random forests are
    # refit, so the time saved is limited to the preprocessing cost.
    scores2 = cross_val_score(cached_pipeline, X, y, cv=3, scoring='roc_auc')
finally:
    # mkdtemp() directories are never deleted automatically.
    shutil.rmtree(cache_dir, ignore_errors=True)

print(f"Scores: {scores1.mean():.4f} / {scores2.mean():.4f}")
Pipeline Serialization
import joblib

# Persist the fitted pipeline (preprocessing + model) as a single artifact so
# inference reuses exactly the transformations learned in training.
model_path = 'ml_pipeline.joblib'
full_pipeline.fit(X, y)
joblib.dump(full_pipeline, model_path)

# joblib.load unpickles the file, which can execute arbitrary code:
# only load artifacts that come from a trusted source.
loaded_pipeline = joblib.load(model_path)
predictions = loaded_pipeline.predict(X.head())
print(f"Loaded pipeline predictions: {predictions}")
Putting It All Together
def build_production_pipeline(X, task='classification'):
    """Build an unfitted preprocessing + random-forest pipeline for ``X``.

    Numeric columns are median-imputed, clipped to IQR-based bounds
    (``OutlierClipper(factor=2.0)``) and standardized. Object/category
    columns are mode-imputed and one-hot encoded. Columns of any other dtype
    (e.g. bool, datetime) are dropped by ``remainder='drop'``.

    Parameters
    ----------
    X : pandas.DataFrame
        Data whose dtypes decide which columns are numeric and which are
        categorical. It is only inspected, not fitted.
    task : {'classification', 'regression'}, default='classification'
        Selects ``RandomForestClassifier`` or ``RandomForestRegressor``.

    Returns
    -------
    sklearn.pipeline.Pipeline
        Steps ``'preprocessor'`` (ColumnTransformer) and ``'model'``.

    Raises
    ------
    ValueError
        If ``task`` is not one of the supported values. Without this check, a
        misspelled task would silently produce a regressor.
    """
    if task not in ('classification', 'regression'):
        raise ValueError(
            f"task must be 'classification' or 'regression', got {task!r}"
        )

    numeric_features = X.select_dtypes(include=[np.number]).columns.tolist()
    categorical_features = X.select_dtypes(include=['object', 'category']).columns.tolist()

    numeric_transformer = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        # Clip after imputing so the IQR bounds are learned from complete data.
        ('clipper', OutlierClipper(factor=2.0)),
        ('scaler', StandardScaler()),
    ])
    categorical_transformer = Pipeline([
        ('imputer', SimpleImputer(strategy='most_frequent')),
        # No drop='first': with handle_unknown='ignore' an unseen category
        # encodes as all zeros, identical to a dropped reference level.
        ('encoder', OneHotEncoder(sparse_output=False, handle_unknown='ignore')),
    ])

    transformers = [('num', numeric_transformer, numeric_features)]
    if categorical_features:
        transformers.append(('cat', categorical_transformer, categorical_features))
    preprocessor = ColumnTransformer(transformers, remainder='drop')

    if task == 'classification':
        model = RandomForestClassifier(n_estimators=200, random_state=42)
    else:
        from sklearn.ensemble import RandomForestRegressor
        model = RandomForestRegressor(n_estimators=200, random_state=42)

    return Pipeline([
        ('preprocessor', preprocessor),
        ('model', model),
    ])
# Build (without fitting) the default classification pipeline for X.
pipeline = build_production_pipeline(X, task='classification')
print("Production pipeline built successfully")
Best Practices
- Always use pipelines — they prevent data leakage and ensure reproducibility
- Custom transformers should be sklearn-compatible — implement fit/transform, store constructor arguments unchanged, and keep learned state in trailing-underscore attributes
- Use ColumnTransformer for mixed-type data — never manually encode then concatenate
- Cache expensive steps — use joblib.Memory for large datasets
- Version your pipelines — save them with joblib alongside model metadata, and only load artifacts from trusted sources (joblib uses pickle)
- Test on small data first — pipeline errors are easier to debug on subsets
Summary
Pipelines are the backbone of production ML. They enforce correct methodology, simplify deployment, and make hyperparameter tuning systematic. Master ColumnTransformer, custom transformers, and pipeline composition — and your ML workflows will be robust, reproducible, and deployable.