Spark ML Pipelines
Difficulty: Senior Level | Companies: Databricks, Netflix, Uber, Apple, Airbnb
ML Pipeline Fundamentals
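Spark ML uses a Pipeline API in which data flows through an ordered sequence of stages. Each stage is either a Transformer, which maps one DataFrame to another with transform() (for example, VectorAssembler), or an Estimator, which learns from data with fit() and returns a fitted Model that is itself a Transformer (for example, StringIndexer or RandomForestClassifier). A Pipeline is itself an Estimator: fitting it produces a PipelineModel that replays every fitted stage in order, so training and inference apply exactly the same preprocessing.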
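To make the Estimator/Transformer contract concrete, here is a minimal plain-Python sketch of how fitting a pipeline behaves. It assumes rows are Python dicts rather than DataFrames, and the class names (IncomePerMonth, MeanCenterer, SimplePipeline) are hypothetical stand-ins, not Spark classes. Each Estimator is fit on the output of the stages before it, and the resulting model chain keeps everything it learned.

```python
# Plain-Python sketch of the Estimator/Transformer/Pipeline contract.
# All class names are hypothetical; this is not Spark's implementation.


class IncomePerMonth:
    """Transformer: a stateless row-wise function (like SQLTransformer)."""

    def transform(self, rows):
        return [{**r, "income_per_month": r["income"] / r["tenure_months"]} for r in rows]


class MeanCenterer:
    """Estimator: learns a column mean from training rows (like StandardScaler)."""

    def __init__(self, col):
        self.col = col

    def fit(self, rows):
        mean = sum(r[self.col] for r in rows) / len(rows)
        return MeanCenterModel(self.col, mean)


class MeanCenterModel:
    """Transformer returned by fit(): always subtracts the stored training mean."""

    def __init__(self, col, mean):
        self.col = col
        self.mean = mean

    def transform(self, rows):
        return [{**r, self.col + "_centered": r[self.col] - self.mean} for r in rows]


class SimplePipeline:
    """Estimator: fits each stage on the output of the stages before it."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for i, stage in enumerate(self.stages):
            # Estimators are fit; plain Transformers are used as-is
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            fitted.append(model)
            if i < len(self.stages) - 1:  # later stages train on this stage's output
                rows = model.transform(rows)
        return SimplePipelineModel(fitted)


class SimplePipelineModel:
    """Transformer: replays every fitted stage in order."""

    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


train = [{"income": 1200.0, "tenure_months": 12}, {"income": 3000.0, "tenure_months": 10}]
pipeline_model = SimplePipeline([IncomePerMonth(), MeanCenterer("income_per_month")]).fit(train)
# income_per_month_centered = 250.0 - 200.0 (the training mean) = 50.0
print(pipeline_model.transform([{"income": 2500.0, "tenure_months": 10}]))
```

Calling transform on new rows reuses the mean learned from the training rows instead of recomputing it, which is the property that keeps training and inference preprocessing identical.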
Building a Basic Pipeline
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
spark = SparkSession.builder \
    .appName("MLPipelines") \
    .getOrCreate()
# Load data (assumes "churned" is a numeric 0/1 label column)
df = spark.read.parquet("hdfs://data/customer-churn")
# Split before fitting anything so no statistics leak from the test set
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
# Define preprocessing stages; handleInvalid="keep" sends unseen or null labels to an extra index
indexer_gender = StringIndexer(
    inputCol="gender",
    outputCol="gender_index",
    handleInvalid="keep"
)
indexer_region = StringIndexer(
    inputCol="region",
    outputCol="region_index",
    handleInvalid="keep"
)
# Assemble features into a single vector column
assembler = VectorAssembler(
    inputCols=["age", "income", "tenure_months", "gender_index", "region_index"],
    outputCol="raw_features"
)
# Scale features (tree models are insensitive to feature scale; kept as a typical stage for linear models)
scaler = StandardScaler(
    inputCol="raw_features",
    outputCol="features",
    withStd=True,
    withMean=True
)
# Define classifier
classifier = RandomForestClassifier(
    featuresCol="features",
    labelCol="churned",
    numTrees=100,
    maxDepth=10,
    seed=42
)
# Create pipeline: each Estimator is fit on the output of the stages before it
pipeline = Pipeline(stages=[
    indexer_gender,
    indexer_region,
    assembler,
    scaler,
    classifier
])
# Fit pipeline (returns a PipelineModel)
model = pipeline.fit(train_df)
# Transform held-out data
predictions = model.transform(test_df)
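StringIndexer is an Estimator: fit() learns a label-to-index mapping from the training data. The sketch below approximates its default behavior in plain Python, assuming Spark 3.x semantics: stringOrderType="frequencyDesc" (the most frequent label gets index 0, ties sorted alphabetically) and handleInvalid="keep" (unseen or null labels go to an extra index equal to the number of labels). The function names are hypothetical.

```python
from collections import Counter


def fit_string_indexer(values):
    """Learn label -> index: most frequent label first, ties broken alphabetically."""
    counts = Counter(v for v in values if v is not None)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}


def transform_keep(mapping, values):
    """handleInvalid="keep": unseen labels and None map to index len(mapping)."""
    unknown_index = len(mapping)
    return [float(mapping.get(v, unknown_index)) for v in values]


regions = ["west", "east", "west", "north", "east", "west"]
mapping = fit_string_indexer(regions)
print(mapping)                                           # {'west': 0, 'east': 1, 'north': 2}
print(transform_keep(mapping, ["east", "south", None]))  # [1.0, 3.0, 3.0]
```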
Interview Insight: Pipelines ensure the same preprocessing is applied during training and inference. This prevents training-serving skew, a common production ML issue.
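Training-serving skew is easiest to see with a scaler. The plain-Python sketch below assumes StandardScaler semantics with withMean=True and withStd=True (Spark uses the corrected sample standard deviation and outputs 0.0 for a zero-variance feature); the helper names are hypothetical. Scaling a serving batch with the stored training statistics gives different values than naively refitting the scaler on that batch.

```python
import statistics


def fit_scaler(values):
    """Learn mean and corrected (n - 1) sample standard deviation from training data."""
    return statistics.mean(values), statistics.stdev(values)


def transform_scaler(stats, values):
    """Center and scale with stored statistics; a zero std yields 0.0, as in Spark."""
    mean, std = stats
    return [(v - mean) / std if std > 0 else 0.0 for v in values]


train_stats = fit_scaler([40.0, 50.0, 60.0])          # mean 50.0, std 10.0
serving = [70.0, 80.0, 90.0]
print(transform_scaler(train_stats, serving))          # [2.0, 3.0, 4.0]: consistent with training
print(transform_scaler(fit_scaler(serving), serving))  # [-1.0, 0.0, 1.0]: refit on serving data (skew)
```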
Feature Engineering at Scale
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    HashingTF, IDF, Tokenizer,
    StringIndexer, OneHotEncoder, Bucketizer,
    VectorAssembler, SQLTransformer
)
# Text feature extraction: tokens -> hashed term counts -> TF-IDF weights
tokenizer = Tokenizer(inputCol="description", outputCol="words")
hashing_tf = HashingTF(inputCol="words", outputCol="text_tf", numFeatures=10000)
idf = IDF(inputCol="text_tf", outputCol="text_features")
text_pipeline = Pipeline(stages=[tokenizer, hashing_tf, idf])
# Categorical encoding (in Spark 3.x, OneHotEncoder is an Estimator)
indexer = StringIndexer(inputCol="category", outputCol="category_index")
encoder = OneHotEncoder(inputCol="category_index", outputCol="category_onehot")
# Bucketization: n + 1 splits define n buckets; infinite end points catch out-of-range ages
bucketizer = Bucketizer(
    splits=[-float("inf"), 18, 35, 50, 65, float("inf")],
    inputCol="age",
    outputCol="age_bucket"
)
# Custom transformations with SQL; zero tenure yields NULL, which VectorAssembler
# rejects by default, so filter or impute such rows before assembling
sql_transformer = SQLTransformer(
    statement="SELECT *, income / NULLIF(tenure_months, 0) AS income_per_month FROM __THIS__"
)
# Combine all features
final_assembler = VectorAssembler(
    inputCols=["text_features", "category_onehot", "age_bucket", "income_per_month"],
    outputCol="final_features"
)
# Complete feature engineering pipeline
feature_pipeline = Pipeline(stages=[
    tokenizer, hashing_tf, idf,
    indexer, encoder, bucketizer,
    sql_transformer, final_assembler
])
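The text stages compute TF-IDF. The plain-Python sketch below mirrors the arithmetic under stated assumptions: it hashes terms with zlib.crc32 rather than Spark's MurmurHash3 (so bucket indices will not match Spark's), represents sparse vectors as dicts, and uses Spark's documented smoothed IDF, idf(t) = ln((m + 1) / (df(t) + 1)) for m documents with minDocFreq = 0. The function names are hypothetical.

```python
import math
import zlib
from collections import Counter


def tokenize(text):
    """Roughly like Tokenizer: lowercase, then split on whitespace."""
    return text.lower().split()


def hashing_tf(tokens, num_features):
    """Like HashingTF: hash each term to a bucket and count; colliding terms share a bucket.

    Uses zlib.crc32 instead of Spark's MurmurHash3, so indices differ from Spark's.
    """
    return dict(Counter(zlib.crc32(t.encode("utf-8")) % num_features for t in tokens))


def fit_idf(tf_vectors):
    """Like IDF.fit with minDocFreq=0: idf = ln((m + 1) / (df + 1)) per bucket."""
    m = len(tf_vectors)
    doc_freq = Counter()
    for vec in tf_vectors:
        doc_freq.update(vec.keys())  # each document counts once per bucket
    return m, {idx: math.log((m + 1) / (df + 1)) for idx, df in doc_freq.items()}


def transform_idf(idf_model, tf_vector):
    """Like IDFModel.transform: scale each term frequency by its bucket's idf."""
    m, weights = idf_model
    unseen = math.log(m + 1)  # df = 0 for buckets never seen during fit
    return {idx: tf * weights.get(idx, unseen) for idx, tf in tf_vector.items()}


docs = ["Great delivery", "great price", "great support team"]
tfs = [hashing_tf(tokenize(d), 1024) for d in docs]
idf_model = fit_idf(tfs)
print([transform_idf(idf_model, tf) for tf in tfs])
```

A term that appears in every document gets weight ln(1) = 0, and distinct terms that hash to the same bucket share one count, which is the trade-off numFeatures controls.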
Cross-Validation and Hyperparameter Tuning
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Define parameter grid over the pipeline's classifier stage (3 x 3 x 3 = 27 combinations)
param_grid = ParamGridBuilder() \
    .addGrid(classifier.numTrees, [50, 100, 200]) \
    .addGrid(classifier.maxDepth, [5, 10, 15]) \
    .addGrid(classifier.featureSubsetStrategy, ["auto", "sqrt", "log2"]) \
    .build()
# Define evaluator
evaluator = BinaryClassificationEvaluator(
    labelCol="churned",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)
# Create cross-validator; parallelism = number of threads fitting param maps concurrently
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
    parallelism=4,
    seed=42
)
# Fit with cross-validation; the best param map is then refit on all of train_df
cv_model = cv.fit(train_df)
# Get best model; avgMetrics holds one cross-validated AUC per param map
best_model = cv_model.bestModel
print(f"Best AUC: {max(cv_model.avgMetrics):.4f}")
# Access best parameters from the fitted random forest (last pipeline stage)
rf_model = best_model.stages[-1]
print(f"Num Trees: {rf_model.getOrDefault('numTrees')}")
print(f"Max Depth: {rf_model.getOrDefault('maxDepth')}")
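CrossValidator's selection logic can be sketched in plain Python: for each param map, train on k - 1 folds, score the held-out fold, average the k scores (these averages are what avgMetrics holds), pick the best average, and refit that param map on all the data. Assumptions: folds are assigned round-robin here, whereas Spark assigns them randomly using the seed, and the fit/evaluate callables and the toy threshold model are hypothetical placeholders rather than Spark APIs.

```python
import itertools
import statistics


def k_folds(rows, k):
    """Yield (train, validation) pairs; round-robin here, whereas Spark assigns folds randomly."""
    folds = [rows[i::k] for i in range(k)]
    for i in range(k):
        train = [row for j, fold in enumerate(folds) if j != i for row in fold]
        yield train, folds[i]


def cross_validate(fit, evaluate, rows, grid, k, larger_is_better=True):
    """Average the metric over k folds per param map, pick the best, refit on all rows."""
    param_maps = [dict(zip(grid, values)) for values in itertools.product(*grid.values())]
    avg_metrics = []
    for params in param_maps:
        scores = [evaluate(fit(train, params), valid) for train, valid in k_folds(rows, k)]
        avg_metrics.append(statistics.mean(scores))  # analogous to cv_model.avgMetrics
    pick = max if larger_is_better else min
    best_index = avg_metrics.index(pick(avg_metrics))
    best_model = fit(rows, param_maps[best_index])  # like CrossValidator's final refit
    return best_model, param_maps[best_index], avg_metrics


# Toy usage: a threshold "model" on one feature, scored by accuracy (both hypothetical)
rows = [(x, int(x >= 5)) for x in range(10)]


def fit_threshold(train, params):
    return params["threshold"]  # ignores train; a real estimator would learn from it


def accuracy(threshold, valid):
    return sum(int(x >= threshold) == y for x, y in valid) / len(valid)


best_model, best_params, avg_metrics = cross_validate(
    fit_threshold, accuracy, rows, {"threshold": [3, 5, 7]}, k=5
)
print(best_params, avg_metrics)  # {'threshold': 5} [0.8, 1.0, 0.8]
```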
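Warning: Cross-validation multiplies training cost by the number of folds, because every parameter combination is trained once per fold. On large datasets, consider TrainValidationSplit, which evaluates each combination on a single train/validation split, or tune on a sample of the data first.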
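The cost difference is simple arithmetic. This sketch counts pipeline fits for the 27-combination grid above, assuming both tuners refit the best param map once on the full training data (as PySpark's CrossValidator and TrainValidationSplit do); the function name is hypothetical.

```python
import math


def tuning_fits(grid_sizes, num_folds=None):
    """Pipeline fits for a grid search, including the final refit of the best params.

    num_folds=None models TrainValidationSplit (one split per param map);
    an integer models CrossValidator with that many folds.
    """
    num_param_maps = math.prod(grid_sizes)
    fits_per_map = 1 if num_folds is None else num_folds
    return num_param_maps * fits_per_map + 1


grid = [3, 3, 3]  # numTrees x maxDepth x featureSubsetStrategy values
print(tuning_fits(grid, num_folds=5))  # 136 fits with CrossValidator
print(tuning_fits(grid))               # 28 fits with TrainValidationSplit
```

Each fit trains the whole pipeline, including forests of up to 200 trees, so the roughly fivefold difference dominates tuning time; parallelism only changes how many of these fits run at once.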
Model Evaluation
from pyspark.ml.evaluation import (
    BinaryClassificationEvaluator,
    MulticlassClassificationEvaluator,
    RegressionEvaluator
)
# Binary classification metrics (threshold-independent, computed from rawPrediction)
binary_evaluator = BinaryClassificationEvaluator(
    labelCol="churned",
    rawPredictionCol="rawPrediction"
)
auc = binary_evaluator.evaluate(predictions, {binary_evaluator.metricName: "areaUnderROC"})
pr = binary_evaluator.evaluate(predictions, {binary_evaluator.metricName: "areaUnderPR"})
print(f"AUC-ROC: {auc:.4f}")
print(f"AUC-PR: {pr:.4f}")
# Multiclass metrics (also valid for a binary label; computed from hard predictions)
multi_evaluator = MulticlassClassificationEvaluator(
    labelCol="churned",
    predictionCol="prediction"
)
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
f1 = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})  # weighted F-measure
precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
print(f"Accuracy: {accuracy:.4f}")
print(f"F1: {f1:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
# Confusion matrix
predictions.groupBy("churned", "prediction").count().orderBy("churned", "prediction").show()
Model Persistence and Serving
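from pyspark.ml import PipelineModel
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
# Save the fitted pipeline model (all stages, not just the classifier)
model.write().overwrite().save("hdfs://models/churn-prediction")
# Load pipeline model
loaded_model = PipelineModel.load("hdfs://models/churn-prediction")
# Use for batch inference
new_data = spark.read.parquet("hdfs://data/new-customers")
predictions = loaded_model.transform(new_data)
# Save predictions
predictions.write.mode("overwrite").parquet("hdfs://predictions/churn")
# Near-real-time serving with Spark Structured Streaming (micro-batches)
# Assumed event schema: a customer_id plus the raw input columns the pipeline expects
schema = StructType([
    StructField("customer_id", StringType()),
    StructField("gender", StringType()),
    StructField("region", StringType()),
    StructField("age", DoubleType()),
    StructField("income", DoubleType()),
    StructField("tenure_months", DoubleType()),
])
stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "customer-events") \
    .load()
parsed = stream.select(F.from_json(F.col("value").cast("string"), schema).alias("data"))
parsed = parsed.select("data.*")
# Apply model to stream
predictions_stream = loaded_model.transform(parsed)
# The Kafka sink needs a string or binary "value" column (and optionally "key")
query = predictions_stream \
    .select(
        F.col("customer_id").alias("key"),
        F.to_json(F.struct("customer_id", "prediction")).alias("value")
    ) \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("topic", "predictions") \
    .option("checkpointLocation", "hdfs://checkpoints/predictions") \
    .start()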
Pro Tip: Always save the entire fitted PipelineModel, not just the final classifier. This ensures the fitted preprocessing stages (StringIndexer mappings, scaler statistics) are applied identically during inference.
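Why the fitted preprocessing must travel with the model: a StringIndexer refit on serving data can assign different indices to the same categories, so the classifier silently sees wrong feature values. A plain-Python sketch, assuming frequency-descending ordering with alphabetical tie-breaks and using json as a stand-in for the saved pipeline; the region values and helper name are made up for illustration.

```python
import json
from collections import Counter


def fit_indexer(values):
    """Frequency-ordered label -> index mapping, ties broken alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}


train_regions = ["west", "west", "west", "east", "east", "north"]
serving_regions = ["east", "east", "east", "west", "north"]

# "Save" the fitted stage alongside the model (JSON stands in for the saved pipeline)
artifact = json.dumps({"region_indexer": fit_indexer(train_regions)})

# Serving with the saved pipeline reuses the training mapping...
saved_mapping = json.loads(artifact)["region_indexer"]
consistent = [saved_mapping[v] for v in serving_regions]

# ...while refitting the indexer on serving data silently permutes the indices
refit_mapping = fit_indexer(serving_regions)
skewed = [refit_mapping[v] for v in serving_regions]

print(consistent)  # [1, 1, 1, 0, 2]
print(skewed)      # [0, 0, 0, 2, 1]
```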
Distributed Training
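# Spark ML's built-in estimators (linear models, tree ensembles, clustering, ALS)
# train in a distributed fashion across executors
# For deep learning, use Horovod, TensorFlow on Spark, or TorchDistributor (Spark 3.4+)
# Example: Distributed random forest
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="churned",
    numTrees=100,
    maxDepth=10,
    subsamplingRate=0.8,  # Fraction of training rows sampled for each tree
    featureSubsetStrategy="auto",  # "auto" resolves to "sqrt" for classification forests
    seed=42
)
# Reuse the basic pipeline's preprocessing stages (indexers, assembler, scaler)
prep_stages = pipeline.getStages()[:-1]
# Training is distributed across executors
rf_pipeline_model = Pipeline(stages=prep_stages + [rf]).fit(train_df)
rf_model = rf_pipeline_model.stages[-1]
# Check feature importance; matching importances to input columns by position is
# valid only because every assembler input is a single numeric column
importance = rf_model.featureImportances.toArray()
feature_names = prep_stages[2].getInputCols()
for name, imp in sorted(zip(feature_names, importance), key=lambda x: -x[1]):
    print(f"{name}: {imp:.4f}")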
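featureImportances follows the recipe Spark documents for tree ensembles: within each tree, sum the impurity gain of every split on a feature, scaled by the number of instances reaching that node; normalize each tree's vector to sum to 1; then average across trees and normalize again. The plain-Python sketch below applies that recipe to hypothetical split records (Spark does not expose trees in this form).

```python
def forest_importances(trees, num_features):
    """Aggregate split gains into forest-level feature importances.

    trees: one list per tree of hypothetical (feature_index, impurity_gain, num_instances) splits.
    """
    totals = [0.0] * num_features
    for splits in trees:
        per_tree = [0.0] * num_features
        for feature, gain, count in splits:
            per_tree[feature] += gain * count  # gain scaled by instances reaching the node
        tree_total = sum(per_tree)
        if tree_total > 0:
            for j in range(num_features):
                totals[j] += per_tree[j] / tree_total  # each tree's vector sums to 1
    grand_total = sum(totals)
    # Averaging over trees and renormalizing equals dividing by the grand total
    return [t / grand_total for t in totals] if grand_total > 0 else totals


trees = [
    [(0, 0.5, 100), (1, 0.2, 50)],  # tree 1 splits on features 0 and 1
    [(1, 0.4, 100)],                # tree 2 splits only on feature 1
]
print(forest_importances(trees, num_features=3))  # roughly [5/12, 7/12, 0.0]
```

A feature that is never used for a split gets importance 0.0, and because gains are scaled by node size, splits near the root (which see more rows) dominate.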
Key Takeaway: Spark ML Pipelines provide a unified API for building, tuning, and deploying ML models at scale. Use pipelines to ensure consistency between training and inference, and leverage distributed training for large datasets.
Follow-Up Questions
- How does Spark handle imbalanced datasets in ML pipelines?
- Explain the difference between transform() and fit() in the Pipeline API.
- How would you implement custom Transformers for feature engineering?
- Describe strategies for model versioning and A/B testing in production.
- How does Spark ML handle missing values during preprocessing?