🤖 ML Pipeline Integration in PySpark
Pipeline Architecture
Detailed Explanation
Core Design Pattern: Transformers and Estimators
PySpark MLlib distinguishes between two fundamental abstractions:
| Abstraction | Input | Output | Example |
|---|---|---|---|
| Transformer | DataFrame | DataFrame | VectorAssembler.transform() |
| Estimator | DataFrame | Model (Transformer) | LogisticRegression.fit() |
Key Takeaway: The Pipeline class chains stages so that the output of one stage feeds into the input of the next.
Feature Engineering Stages
| Stage | Type | Purpose | Output |
|---|---|---|---|
| StringIndexer | Estimator | Convert categorical strings to indices | Numerical indices (most frequent label = 0 by default) |
| OneHotEncoder | Estimator | Transform indices to binary vectors | Sparse binary vectors |
| VectorAssembler | Transformer | Combine columns into a single vector | Feature vector |
| StandardScaler | Estimator | Standardize features | Unit variance; zero mean only if withMean=True |
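The following toy sketch shows what a fitted StringIndexer and OneHotEncoder do to a single column. It is pure Python, not the PySpark API. It assumes three settings: the documented default frequency-descending label order with alphabetical tie-breaking, handleInvalid="keep" (unseen labels get one extra index), and dropLast=True. The contract values are made up for illustration.

```python
from collections import Counter


def fit_string_indexer(values):
    """Toy StringIndexer.fit(): most frequent label -> index 0, ties broken alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}


def index_label(mapping, label):
    """Toy transform with handleInvalid="keep": unseen labels get index len(mapping)."""
    return mapping.get(label, len(mapping))


def one_hot(index, num_categories, drop_last=True):
    """Toy OneHotEncoder: with drop_last=True the last category becomes all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0
    return vector


contracts = ["Month-to-month", "Two year", "Month-to-month",
             "One year", "Month-to-month", "One year"]
mapping = fit_string_indexer(contracts)
num_categories = len(mapping) + 1  # +1 for the "keep" bucket of unseen labels

for label in ["Month-to-month", "One year", "Two year", "Weekly"]:
    index = index_label(mapping, label)
    print(f"{label:15} index={index} vector={one_hot(index, num_categories)}")
```

Because dropLast=True removes the final slot, which is the "unseen label" bucket here, an unseen label like "Weekly" encodes to an all-zero vector.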
Estimator Pattern: Fit-Transform Paradigm
- An Estimator accepts a training DataFrame via fit()
- fit() produces a Model (which is itself a Transformer)
- The Model applies to new data via its transform() method
- The Model persists the learned parameters (coefficients, intercepts)
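The fit/transform contract can be sketched without Spark. The classes below are hypothetical single-column stand-ins for StandardScaler and StandardScalerModel. fit() learns the mean and the corrected sample standard deviation, then returns a separate model object. That model carries those parameters and applies them in transform().

```python
import math


class ToyStandardScalerModel:
    """Fitted 'Model': a Transformer that stores the learned mean and std."""

    def __init__(self, mean, std, with_mean):
        self.mean = mean
        self.std = std
        self.with_mean = with_mean

    def transform(self, values):
        scaled = []
        for x in values:
            centred = x - self.mean if self.with_mean else x
            # Spark's StandardScaler outputs 0.0 for a zero-variance feature; mirrored here
            scaled.append(centred / self.std if self.std > 0 else 0.0)
        return scaled


class ToyStandardScaler:
    """'Estimator': fit() learns parameters from training data and returns a Model."""

    def __init__(self, with_mean=False):
        self.with_mean = with_mean

    def fit(self, values):
        n = len(values)
        mean = sum(values) / n
        # Corrected (n - 1) sample standard deviation
        std = math.sqrt(sum((x - mean) ** 2 for x in values) / (n - 1))
        return ToyStandardScalerModel(mean, std, self.with_mean)


model = ToyStandardScaler(with_mean=True).fit([2.0, 4.0, 6.0])  # learns mean=4.0, std=2.0
print(model.transform([4.0, 8.0]))  # [0.0, 2.0]
```

The Estimator itself stays unchanged after fit(). All learned state lives in the returned Model, which is why the Model is what you persist and reuse.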
Pipeline Composition
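When fit() is called on a Pipeline, it:
- Executes each stage sequentially, in the order given in stages
- Calls fit() on each Estimator, using the output of the preceding stages, to produce a Model
- Calls transform() on each Transformer (and each newly fitted Model) to produce the input for the next stage
- Returns a PipelineModel that contains only Transformers; both the Pipeline and the fitted PipelineModel are serializable for persistence
Best Practice: Persist the fitted PipelineModel to disk so that inference can run without retraining.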
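A minimal pure-Python sketch of the fitting loop follows. It uses list-of-floats "DataFrames" and hypothetical stage classes. As in Spark's Pipeline.fit, each Estimator is fitted on the output of the preceding stages, and the data only needs to be transformed up to the last Estimator.

```python
class AddConstant:
    """Transformer: stateless, exposes only transform()."""

    def __init__(self, constant):
        self.constant = constant

    def transform(self, rows):
        return [x + self.constant for x in rows]


class MeanCentererModel:
    """Model produced by MeanCenterer.fit(); it is itself a Transformer."""

    def __init__(self, mean):
        self.mean = mean

    def transform(self, rows):
        return [x - self.mean for x in rows]


class MeanCenterer:
    """Estimator: fit() learns the mean and returns a MeanCentererModel."""

    def fit(self, rows):
        return MeanCentererModel(sum(rows) / len(rows))


class ToyPipelineModel:
    def __init__(self, stages):
        self.stages = stages  # Transformers only

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        estimator_positions = [i for i, s in enumerate(self.stages) if hasattr(s, "fit")]
        last_estimator = estimator_positions[-1] if estimator_positions else -1
        fitted = []
        for i, stage in enumerate(self.stages):
            # Estimators are fitted on the current data; Transformers are kept as-is
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            fitted.append(model)
            if i < last_estimator:
                rows = model.transform(rows)  # output feeds the next stage
        return ToyPipelineModel(fitted)


train = [1.0, 2.0, 3.0]
pipeline_model = ToyPipeline([AddConstant(10.0), MeanCenterer()]).fit(train)
print(pipeline_model.stages[1].mean)              # 12.0, learned on the shifted training data
print(pipeline_model.transform([12.0, 20.0]))     # [10.0, 18.0]
```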
Hyperparameter Tuning
| Component | Purpose |
|---|---|
| CrossValidator | K-fold cross-validation |
| TrainValidationSplit | Single train/validation split; evaluates each parameter combination once, so it is cheaper than k-fold |
| ParamGridBuilder | Defines hyperparameter search space |
| Evaluator | Measures model quality |
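ParamGridBuilder expands the grid as a Cartesian product, so the number of candidate models multiplies quickly. The sketch below is a pure-Python stand-in for ParamGridBuilder().addGrid(...).build() and uses parameter names as plain strings. It counts the combinations for the grid used in the classification example later in this section.

```python
from itertools import product


def build_param_grid(grid):
    """Toy ParamGridBuilder.build(): one dict per combination (Cartesian product)."""
    names = list(grid)
    return [dict(zip(names, values)) for values in product(*(grid[name] for name in names))]


grid = {
    "regParam": [0.01, 0.1, 1.0],
    "elasticNetParam": [0.0, 0.5, 1.0],
    "maxIter": [50, 100, 200],
}
param_maps = build_param_grid(grid)
num_folds = 5

print(len(param_maps))              # candidate parameter combinations
print(len(param_maps) * num_folds)  # fits during k-fold evaluation
# CrossValidator then refits the best combination once on the full training data.
```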
Evaluation Metrics by Task
| Task | Evaluator | Metrics |
|---|---|---|
| Binary Classification | BinaryClassificationEvaluator | AUC-ROC, AUC-PR |
| Multiclass Classification | MulticlassClassificationEvaluator | Accuracy, Precision, Recall, F1, Log-loss |
| Regression | RegressionEvaluator | MSE, RMSE, MAE, R-squared |
Important: Match the metric to the business objective. On imbalanced datasets, precision, recall, or AUC-PR are more informative than accuracy.
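Here is a quick illustration of why accuracy misleads on imbalanced data. It uses hypothetical labels (5% positive) and a trivial classifier that always predicts the majority class. Precision is treated as 0 when there are no positive predictions.

```python
def binary_metrics(labels, predictions):
    """Accuracy, precision and recall for 0/1 labels and predictions."""
    pairs = list(zip(labels, predictions))
    tp = sum(1 for y, p in pairs if y == 1 and p == 1)
    fp = sum(1 for y, p in pairs if y == 0 and p == 1)
    fn = sum(1 for y, p in pairs if y == 1 and p == 0)
    tn = sum(1 for y, p in pairs if y == 0 and p == 0)
    accuracy = (tp + tn) / len(pairs)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return accuracy, precision, recall


labels = [0] * 95 + [1] * 5          # hypothetical: 5% positive class
always_negative = [0] * len(labels)  # predicts the majority class every time
accuracy, precision, recall = binary_metrics(labels, always_negative)
print(f"accuracy={accuracy:.2f} precision={precision:.2f} recall={recall:.2f}")
```

The model scores 95% accuracy while finding none of the positives, which recall exposes immediately.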
Mathematical Foundations
Definition: ML Pipeline
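An ML pipeline is a directed acyclic graph $G = (V, E)$ of stages, where the vertices $V$ are transformation stages and the edges $E$ define data dependencies. Each stage $i$ implements a function $f_i : \mathcal{D} \to \mathcal{D}$ on DataFrames (for an Estimator, $f_i$ is the transform of the fitted Model). A linear pipeline of $m$ stages therefore computes $f_m \circ \cdots \circ f_1$.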
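Spark's Pipeline takes its stages as a list, and for non-linear pipelines the Spark documentation requires that list to be in topological order. The sketch below uses Python's standard graphlib (3.9+) and hypothetical stage names. It derives the dependency edges from input/output column names and produces one valid order.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Hypothetical stages: name -> (input columns, output columns), deliberately unordered
stages = {
    "scaler": (["features_raw"], ["features"]),
    "assembler": (["tenure_months_imputed", "contract_type_encoded"], ["features_raw"]),
    "encoder": (["contract_type_index"], ["contract_type_encoded"]),
    "indexer": (["contract_type"], ["contract_type_index"]),
    "imputer": (["tenure_months"], ["tenure_months_imputed"]),
}

# Which stage produces each column
producer = {col: name for name, (_, outputs) in stages.items() for col in outputs}

# A stage depends on every stage that produces one of its input columns;
# raw source columns (no producer) add no edge.
dependencies = {
    name: {producer[col] for col in inputs if col in producer}
    for name, (inputs, _) in stages.items()
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Any order it prints places each producer before its consumers. That is the property a Pipeline(stages=[...]) list must satisfy.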
Cross-Validation Error
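For $k$-fold cross-validation with dataset $D$ split into $k$ equal partitions $D_1, \dots, D_k$:
$$\mathrm{CV}_{\text{error}} = \frac{1}{k} \sum_{i=1}^{k} L(\hat{f}_i, D_i)$$
where $L$ is the loss function (averaged over the records of $D_i$) and $\hat{f}_i$ is the model trained on all folds except $D_i$.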
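The formula can be checked by hand with a toy model. This sketch makes three assumptions. Folds are contiguous (Spark's CrossValidator assigns folds randomly). The loss is squared loss averaged within each fold. The "model" is hypothetical and always predicts the training-set mean.

```python
def k_fold_cv_error(xs, ys, k, fit, loss):
    """CV_error = (1/k) * sum_i L(f_i, D_i), with L averaged over the records of fold D_i."""
    fold_size = len(xs) // k  # assumes len(xs) is divisible by k (equal partitions)
    fold_losses = []
    for i in range(k):
        lo, hi = i * fold_size, (i + 1) * fold_size
        # f_i is trained on every fold except D_i = records[lo:hi]
        model = fit(xs[:lo] + xs[hi:], ys[:lo] + ys[hi:])
        held_out = zip(xs[lo:hi], ys[lo:hi])
        fold_losses.append(sum(loss(model(x), y) for x, y in held_out) / fold_size)
    return sum(fold_losses) / k


def fit_mean(train_xs, train_ys):
    """Hypothetical model: always predicts the mean training label."""
    mean = sum(train_ys) / len(train_ys)
    return lambda x: mean


def squared_loss(prediction, label):
    return (prediction - label) ** 2


xs = [0, 1, 2, 3]
ys = [1.0, 3.0, 5.0, 7.0]
print(k_fold_cv_error(xs, ys, k=2, fit=fit_mean, loss=squared_loss))
```

With k = 2, fold 1 is predicted by the mean 6.0 of the other fold, giving losses 25 and 9. Fold 2 is predicted by the mean 2.0, giving losses 9 and 25. Each fold therefore averages 17, so the CV error is 17.0.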
Bias-Variance Decomposition
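Under squared loss, with $y = f(x) + \varepsilon$ and noise variance $\operatorname{Var}(\varepsilon) = \sigma^2$, the expected prediction error of a model $\hat{f}$ at a point $x$ decomposes as:
$$\mathbb{E}\big[(y - \hat{f}(x))^2\big] = \big(\mathbb{E}[\hat{f}(x)] - f(x)\big)^2 + \operatorname{Var}\big[\hat{f}(x)\big] + \sigma^2$$
that is, squared bias plus variance plus irreducible noise.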
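Here is a Monte Carlo sketch of the decomposition at a single point $x$, under assumed values. The true function value is $f(x) = 2$, the noise is Gaussian with $\sigma = 1$, and each training set has 4 noisy observations. The estimator is deliberately biased: it shrinks the sample mean by half. Theory gives bias² = 1, variance = 0.25/4 = 0.0625, and noise = 1, for a total of 2.0625, and the simulated error should land close to that.

```python
import random
import statistics


def bias_variance_check(trials=20_000, n=4, f_x=2.0, sigma=1.0, shrink=0.5, seed=0):
    """Monte Carlo estimate of E[(y - f_hat(x))^2] and of bias^2 + variance + sigma^2."""
    rng = random.Random(seed)
    predictions, squared_errors = [], []
    for _ in range(trials):
        # Fresh training set of n noisy observations of f(x)
        train = [f_x + rng.gauss(0.0, sigma) for _ in range(n)]
        f_hat = shrink * sum(train) / n  # deliberately biased (shrunken) estimator
        y = f_x + rng.gauss(0.0, sigma)  # independent test observation at x
        predictions.append(f_hat)
        squared_errors.append((y - f_hat) ** 2)
    bias_sq = (statistics.fmean(predictions) - f_x) ** 2
    variance = statistics.pvariance(predictions)
    return statistics.fmean(squared_errors), bias_sq + variance + sigma ** 2


theory = (0.5 * 2.0 - 2.0) ** 2 + 0.5 ** 2 * 1.0 / 4 + 1.0  # 1 + 0.0625 + 1
error, decomposition = bias_variance_check()
print(f"theory={theory:.4f} simulated={error:.4f} decomposition={decomposition:.4f}")
```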
Regularization Strength
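For L2-regularized (ridge) regression with parameter $\lambda \ge 0$:
$$\hat{\beta}_\lambda = \arg\min_{\beta} \; \lVert y - X\beta \rVert_2^2 + \lambda \lVert \beta \rVert_2^2$$
The effective degrees of freedom reduce as $\lambda$ increases:
$$\mathrm{df}(\lambda) = \sum_{j=1}^{p} \frac{d_j^2}{d_j^2 + \lambda}$$
where $d_1, \dots, d_p$ are the singular values of $X$. If $X$ has full column rank, $\mathrm{df}(0) = p$, and $\mathrm{df}(\lambda) \to 0$ as $\lambda \to \infty$. In Spark MLlib, $\lambda$ corresponds to regParam with elasticNetParam=0, up to the library's scaling of the loss.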
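Here is a small numeric illustration of the degrees-of-freedom formula. It takes hypothetical singular values $d = (3, 2, 1)$ as given rather than computing them from a real $X$.

```python
def ridge_effective_df(singular_values, lam):
    """df(lambda) = sum_j d_j^2 / (d_j^2 + lambda)."""
    return sum(d * d / (d * d + lam) for d in singular_values)


singular_values = [3.0, 2.0, 1.0]  # hypothetical singular values of X (p = 3)
for lam in [0.0, 1.0, 10.0, 100.0]:
    print(f"lambda={lam:>6}: df={ridge_effective_df(singular_values, lam):.3f}")
```

At $\lambda = 0$ the model uses all $p = 3$ degrees of freedom. At $\lambda = 1$ the sum is $0.9 + 0.8 + 0.5 = 2.2$, and it keeps shrinking toward 0 as $\lambda$ grows.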
Pipeline Complexity
Total pipeline computation cost with $m$ stages, each stage processing $n$ records:
$$C_{\text{total}} = \sum_{i=1}^{m} n \, c_i \;\le\; m \, n \max_i c_i$$
where $c_i$ is the per-record cost of stage $i$.
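Here is a worked example of the cost formula with hypothetical per-record costs in arbitrary units. Doubling the number of records $n$ doubles the total, which is the linear scaling in data volume.

```python
def pipeline_cost(n_records, per_record_costs):
    """C_total = sum_i n * c_i for m = len(per_record_costs) stages."""
    return sum(n_records * c for c in per_record_costs)


costs = [1.0, 0.5, 2.5]  # hypothetical c_i for m = 3 stages (arbitrary units)
print(pipeline_cost(1_000_000, costs))  # 4000000.0
print(pipeline_cost(2_000_000, costs))  # 8000000.0: doubling n doubles the cost
```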
Key Insight
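Spark parallelizes the work of each pipeline stage across partitions. However, an Estimator's fit() typically triggers one or more Spark jobs over its input, and wide operations such as aggregations and shuffles act as synchronization barriers. Transformer stages are lazy, so the query planner fuses consecutive row-wise transformations into a single Spark execution stage. Efficient pipeline design therefore minimizes shuffles and caches the input DataFrame when several Estimators must scan it.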
Summary
ML pipelines formalize ML workflows as DAGs of transformations. Cross-validation estimates generalization error, bias-variance decomposition guides model selection, and regularization trades off fit complexity against generalization. Pipeline cost scales linearly with stages and data volume.
Key Concepts Table
| Concept | Description | Usage Pattern |
|---|---|---|
| Transformer | Applies transformation to DataFrame | transformer.transform(df) |
| Estimator | Learns from DataFrame, produces Transformer | estimator.fit(df) → Model |
| Pipeline | Chains stages (Transformers + Estimators) | Pipeline(stages=[...]).fit(df) |
| Parameter | Hyperparameter for algorithm configuration | LogisticRegression(regParam=0.1) |
| Evaluator | Measures model quality on test data | BinaryClassificationEvaluator() |
| CrossValidator | K-fold cross-validation for tuning | CrossValidator(estimator=pipeline, estimatorParamMaps=grid, evaluator=evaluator) |
| Feature Vector | Combined feature column (required input for MLlib estimators) | VectorAssembler(inputCols=[...], outputCol="features").transform(df) |
| Model Persistence | Save/load fitted pipelines | model.save(path) / PipelineModel.load(path) |
| StringIndexer | Converts strings to category indices | StringIndexer(inputCol="cat", outputCol="cat_idx") |
| OneHotEncoder | Converts indices to binary vectors | OneHotEncoder(inputCol="cat_idx", outputCol="cat_vec") |
Code Examples
Complete Classification Pipeline
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    StringIndexer, OneHotEncoder, VectorAssembler,
    StandardScaler, Imputer
)
from pyspark.ml.classification import (
    LogisticRegression, RandomForestClassifier,
    GBTClassifier, LinearSVC
)
from pyspark.ml.evaluation import (
    BinaryClassificationEvaluator,
    MulticlassClassificationEvaluator
)
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# MLlib is bundled with PySpark, so no spark.jars.packages entry is needed
spark = SparkSession.builder \
    .appName("MLPipeline") \
    .getOrCreate()

# Load and prepare data
df = spark.read.parquet("/data/customer_churn")

# Define feature columns
categorical_cols = ["gender", "contract_type", "payment_method"]
numerical_cols = ["tenure_months", "monthly_charges", "total_charges"]
label_col = "churn"

# MLlib classifiers need a numeric label; this cast assumes churn is stored as 0/1
df = df.withColumn(label_col, F.col(label_col).cast("double"))

# Hold out a test set so that every stage is fitted on training data only
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# Handle missing values
imputer = Imputer(
    inputCols=numerical_cols,
    outputCols=[f"{col}_imputed" for col in numerical_cols],
    strategy="median"
)

# Index categorical columns ("keep" maps unseen categories to an extra index)
indexers = [
    StringIndexer(
        inputCol=col,
        outputCol=f"{col}_index",
        handleInvalid="keep"
    )
    for col in categorical_cols
]

# One-hot encode indexed columns
encoders = [
    OneHotEncoder(
        inputCol=f"{col}_index",
        outputCol=f"{col}_encoded",
        dropLast=True
    )
    for col in categorical_cols
]

# Assemble all features into a single vector
assembler = VectorAssembler(
    inputCols=[f"{col}_imputed" for col in numerical_cols] +
              [f"{col}_encoded" for col in categorical_cols],
    outputCol="features_raw"
)

# Scale features (withMean=True turns sparse one-hot vectors into dense vectors)
scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withStd=True,
    withMean=True
)

# Define classifiers (rf or gbt can replace lr as the final pipeline stage,
# together with a matching param grid)
lr = LogisticRegression(
    featuresCol="features",
    labelCol=label_col,
    maxIter=100,
    regParam=0.01,
    elasticNetParam=0.5
)
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol=label_col,
    numTrees=100,
    maxDepth=10,
    featureSubsetStrategy="sqrt"
)
gbt = GBTClassifier(
    featuresCol="features",
    labelCol=label_col,
    maxIter=100,
    maxDepth=5,
    stepSize=0.1
)

# Build pipeline
pipeline = Pipeline(stages=[
    imputer,
    *indexers,
    *encoders,
    assembler,
    scaler,
    lr
])

# Hyperparameter tuning: 3 x 3 x 3 = 27 combinations, each fitted once per fold
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .addGrid(lr.maxIter, [50, 100, 200]) \
    .build()

evaluator = BinaryClassificationEvaluator(
    labelCol=label_col,
    metricName="areaUnderROC"
)

crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    parallelism=4,
    seed=42
)

# Train with cross-validation on the training split only
cv_model = crossval.fit(train_df)

# Evaluate the best model (refitted on all of train_df) on the held-out test set
predictions = cv_model.transform(test_df)
auc = evaluator.evaluate(predictions)
print(f"AUC-ROC: {auc:.4f}")

# Detailed metrics ("f1" here is the label-weighted F1 score)
accuracy_evaluator = MulticlassClassificationEvaluator(
    labelCol=label_col, metricName="accuracy"
)
f1_evaluator = MulticlassClassificationEvaluator(
    labelCol=label_col, metricName="f1"
)
print(f"Accuracy: {accuracy_evaluator.evaluate(predictions):.4f}")
print(f"F1 Score: {f1_evaluator.evaluate(predictions):.4f}")
```
Feature Engineering Pipeline
```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    Bucketizer, QuantileDiscretizer, SQLTransformer,
    Interaction, ChiSqSelector, PCA
)

# Reuses df, label_col, imputer, indexers, encoders, assembler and scaler
# from the classification example above.

# Bucket continuous features (assumes an "age" column; this stage is not part
# of feature_pipeline below). Open-ended outer splits prevent errors for
# values outside the finite range.
bucketizer = Bucketizer(
    splits=[float("-inf"), 18, 30, 45, 60, float("inf")],
    inputCol="age",
    outputCol="age_bucket"
)

# SQL-based feature engineering
sql_transformer = SQLTransformer(
    statement="""
        SELECT
            *,
            total_charges / NULLIF(tenure_months, 0) AS avg_monthly_spend,
            CASE
                WHEN total_charges > 1000 THEN 'high_value'
                WHEN total_charges > 500 THEN 'medium_value'
                ELSE 'low_value'
            END AS value_segment
        FROM __THIS__
    """
)

# Create interaction features (add the output column to a VectorAssembler's
# inputCols to include it in the model features)
interaction = Interaction(
    inputCols=["gender_encoded", "contract_type_encoded"],
    outputCol="gender_contract_interaction"
)

# Feature selection using Chi-Squared. The test treats every distinct value as a
# category, so it is only meaningful for categorical features; for continuous
# features consider UnivariateFeatureSelector (Spark 3.1+).
selector = ChiSqSelector(
    featuresCol="features",
    labelCol=label_col,
    numTopFeatures=20,
    outputCol="selected_features"
)

# Dimensionality reduction with PCA
pca = PCA(
    k=10,
    inputCol="features",
    outputCol="pca_features"
)

# Complete feature pipeline
feature_pipeline = Pipeline(stages=[
    sql_transformer,
    imputer,
    *indexers,
    *encoders,
    interaction,
    assembler,
    scaler,
    selector,
    pca
])
feature_model = feature_pipeline.fit(df)
feature_df = feature_model.transform(df)
feature_df.select("features", "pca_features", "selected_features").show(5, truncate=False)
```
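Bucketizer's boundary handling is easy to get wrong. According to the Spark documentation, each bucket covers [x, y) except the last, which also includes its upper bound. Values outside the outermost splits raise an error under the default handleInvalid="error", which is why open-ended outer splits such as float("-inf") and float("inf") are a common choice. The toy function below is pure Python, not the PySpark API. It mimics those rules for the finite splits [0, 18, 30, 45, 60, 100].

```python
from bisect import bisect_right


def bucketize(value, splits):
    """Toy Bucketizer: bucket i covers [splits[i], splits[i+1]); the last bucket also includes splits[-1]."""
    if value < splits[0] or value > splits[-1]:
        # Mirrors the default handleInvalid="error" for out-of-range values
        raise ValueError(f"{value} is outside [{splits[0]}, {splits[-1]}]")
    if value == splits[-1]:
        return float(len(splits) - 2)
    return float(bisect_right(splits, value) - 1)


splits = [0, 18, 30, 45, 60, 100]
print([bucketize(age, splits) for age in [0, 17.9, 18, 44, 100]])  # [0.0, 0.0, 1.0, 2.0, 4.0]
```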
Performance Metrics
| Metric | LogisticRegression | RandomForest | GBTClassifier | LinearSVC |
|---|---|---|---|---|
| Training Time (1M rows, 50 features) | 2-5 seconds | 10-30 seconds | 30-90 seconds | 3-8 seconds |
| Inference Time (100K rows) | 50-100 ms | 100-300 ms | 150-400 ms | 40-90 ms |
| AUC-ROC (typical) | 0.85-0.92 | 0.88-0.95 | 0.90-0.97 | 0.84-0.91 |
| Memory Usage | Low | High | Very High | Low |
| Interpretability | High | Medium | Low | High |
| Overfitting Risk | Low | Low-Medium | High | Low |
| Feature Importance | Coefficient magnitude | Built-in | Built-in | Coefficient magnitude |
| Parallelism | Data parallel | Data + Tree parallel | Data + Tree parallel | Data parallel |
| Hyperparameter Sensitivity | Medium | Low | High | Medium |
| Handles Imbalanced Data | With weightCol | With weightCol | With weightCol | With weightCol |
Best Practices
- Always use Pipeline to ensure reproducibility and to prevent data leakage from the test set into training transformations
- Fit all preprocessors on training data only, then apply the same fitted PipelineModel to test and production data
- Use handleInvalid="keep" in StringIndexer to gracefully handle unseen categories at inference time
- Set a random seed in all stochastic algorithms and in cross-validation to ensure reproducible results
- Use CrossValidator with parallelism > 1 to evaluate parameter combinations concurrently and speed up tuning
- Scale features before training models sensitive to feature magnitudes (logistic regression, SVM, neural networks)
- Monitor overfitting by comparing training and validation metrics; as a rule of thumb, if the gap exceeds about 5%, reduce model complexity
- Use featureSubsetStrategy="sqrt" in RandomForest to decorrelate trees and reduce overfitting (for classification forests, the "auto" default already resolves to "sqrt")
- Cache (persist) intermediate DataFrames that several stages or tuning runs reuse, to avoid recomputation
- Log all experiments with MLflow or a similar tracking system, including data versions, hyperparameters, and metrics
- Place VectorAssembler before StandardScaler: StandardScaler operates on a single vector column, so features must be assembled first
- For imbalanced datasets, use stratified sampling with df.sampleBy() (for example, downsampling the majority class) to improve class balance in training
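Stratified sampling with sampleBy() needs a per-class fraction map. The minimal sketch below downsamples every class to the size of the smallest one. It assumes you have already collected class counts, for example from df.groupBy("churn").count(), and the labels are hypothetical. Because sampleBy samples each row independently, the resulting class sizes are only approximately equal.

```python
from collections import Counter


def downsampling_fractions(labels):
    """Per-class sampling fractions that shrink every class to the smallest class size."""
    counts = Counter(labels)
    smallest = min(counts.values())
    return {label: smallest / count for label, count in counts.items()}


labels = [0.0] * 900 + [1.0] * 100  # hypothetical churn labels (10% positive)
fractions = downsampling_fractions(labels)
print(fractions)
# In Spark the map could then be used as, e.g.:
# balanced_df = train_df.sampleBy("churn", fractions=fractions, seed=42)
```

The fraction keys must match the label column's values and type, here doubles such as 0.0 and 1.0.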
Mathematical Foundation Summary
MLlib Pipelines use the Transformer-Estimator-Parameter abstraction, where Pipeline = [Transformer₁, Estimator₁, Transformer₂, ...] enables composable ML workflows. Cross-validation error follows CV_error = (1/k) × Σᵢ L(f̂ᵢ, Dᵢ) with k-fold partitioning. Feature hashing (HashingTF, FeatureHasher) maps features into a fixed-size space of numFeatures dimensions via index = h(x) mod numFeatures, with Spark using MurmurHash3 as h. This trades collision probability for bounded memory regardless of vocabulary size. Random Forest reduces variance through bagging: Var(forest) = ρσ² + (1-ρ)σ²/n, where ρ is the pairwise tree correlation, σ² is the variance of a single tree, and n is the ensemble size.
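Plugging numbers into the Random Forest variance formula shows why adding trees helps only up to a point. With an assumed tree correlation ρ = 0.3 and single-tree variance σ² = 1, the ensemble variance falls toward the floor ρσ² = 0.3 but never below it.

```python
def forest_variance(rho, sigma_sq, n_trees):
    """Var(forest) = rho * sigma^2 + (1 - rho) * sigma^2 / n."""
    return rho * sigma_sq + (1 - rho) * sigma_sq / n_trees


for n_trees in [1, 10, 100, 1000]:
    print(n_trees, round(forest_variance(rho=0.3, sigma_sq=1.0, n_trees=n_trees), 4))
```

Lowering ρ, which is what featureSubsetStrategy does by decorrelating trees, lowers the floor itself. Adding more trees only shrinks the second term.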
See Also
- ML Feature Engineering (32): Feature engineering with Spark ML
- Model Deployment (33): Model deployment patterns
- Production Hardening (40)
- Structured Streaming: Real-time ML scoring
- Data Quality: Data quality validation