Real-Time ML Pipelines: Streaming Feature Engineering
Difficulty: Senior Level | Companies: Uber, Netflix, Stripe, DoorDash, Instacart
1. Batch vs Real-Time ML Pipelines
Architecture Diagram
Batch ML Pipeline (traditional):
Source → Batch ETL → Feature Store → Model Training → Batch Prediction
(Hourly/Daily)
Real-Time ML Pipeline:
Source → Stream Processing → Feature Store → Model Serving → Online Prediction
(Milliseconds; typically < 100ms end to end)
Latency Requirements
| Use Case | Latency | Example |
|---|---|---|
| Recommendation | < 100ms | Product recommendations |
| Fraud Detection | < 50ms | Transaction scoring |
| Ad Bidding | < 10ms | Real-time CTR prediction |
| Search Ranking | < 50ms | Query result ranking |
| Content Moderation | < 5s | User-generated content |
Key Insight: Real-time ML isn't just about streaming data; it's about streaming FEATURES. The feature computation is often the bottleneck, not the model inference.
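To make "streaming features" concrete, here is a minimal pure-Python sketch of a per-user trailing-window feature that is updated incrementally as each event arrives, so the serving path reads a precomputed value instead of scanning history. The class `RollingUserFeatures` and its event fields are hypothetical; in production this state lives inside the stream processor (Spark or Flink), not in a single process.

```python
from collections import defaultdict, deque
from dataclasses import dataclass, field
from typing import Deque, Dict, Tuple


@dataclass
class _UserState:
    # (event_time_seconds, amount) pairs still inside the window, oldest first
    events: Deque[Tuple[float, float]] = field(default_factory=deque)
    spend: float = 0.0


class RollingUserFeatures:
    """Hypothetical per-user state: event count and spend over a trailing window."""

    def __init__(self, window_seconds: float = 3600.0):
        self.window_seconds = window_seconds
        self.state: Dict[int, _UserState] = defaultdict(_UserState)

    def _evict(self, s: _UserState, now: float) -> None:
        # Drop events at or before now - window; each event is evicted once (amortized O(1))
        while s.events and s.events[0][0] <= now - self.window_seconds:
            _, amount = s.events.popleft()
            s.spend -= amount

    def update(self, user_id: int, event_time: float, amount: float = 0.0) -> None:
        # Assumes a user's events arrive in event-time order (no late-data handling)
        s = self.state[user_id]
        s.events.append((event_time, amount))
        s.spend += amount
        self._evict(s, event_time)

    def read(self, user_id: int, now: float) -> Dict[str, float]:
        # Serving-time read: evict expired events, then return the running aggregates
        s = self.state[user_id]
        self._evict(s, now)
        return {"event_count": float(len(s.events)), "spend": s.spend}
```

The design choice is that the cost is paid once per event (append plus amortized eviction) rather than once per prediction request, which is exactly what the windowed aggregations in the Spark and Flink jobs do at scale with fault-tolerant state.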
2. Streaming Feature Engineering
Kafka → Spark Structured Streaming → Feature Store
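```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (DoubleType, LongType, StringType, StructField,
                               StructType, TimestampType)
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("RealTimeFeatures") \
    .getOrCreate()

# Assumed JSON payload of each Kafka message (mirrors the Flink table below)
event_schema = StructType([
    StructField("event_id", StringType()),
    StructField("user_id", LongType()),
    StructField("session_id", StringType()),
    StructField("event_type", StringType()),
    StructField("amount", DoubleType()),
    StructField("event_timestamp", TimestampType()),
])

# Read from Kafka
raw_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "user_events") \
    .option("startingOffsets", "latest") \
    .load()

# Parse JSON events
events = raw_stream \
    .select(F.from_json(F.col("value").cast("string"), event_schema).alias("data")) \
    .select("data.*")

# Compute features over 1-hour windows sliding every 5 minutes
windowed_features = events \
    .withWatermark("event_timestamp", "10 minutes") \
    .groupBy(
        F.col("user_id"),
        F.window("event_timestamp", "1 hour", "5 minutes")
    ).agg(
        F.count("*").alias("events_last_hour"),
        F.sum(F.when(F.col("event_type") == "PURCHASE", F.col("amount")).otherwise(0)).alias("spend_last_hour"),
        # Exact countDistinct is not supported on streaming DataFrames
        F.approx_count_distinct("session_id").alias("sessions_last_hour"),
    )

# Write to feature store (online). Spark has no built-in Redis sink, so write each
# micro-batch with redis-py via foreachBatch (host and key layout are assumptions).
def write_to_redis(batch_df, batch_id):
    # Keep each user's most recent window so the Redis hash holds current values
    latest = batch_df \
        .withColumn("rn", F.row_number().over(
            Window.partitionBy("user_id").orderBy(F.col("window.end").desc()))) \
        .filter(F.col("rn") == 1)
    def write_partition(rows):
        import redis  # imported on the executors
        pipe = redis.Redis(host="redis", port=6379).pipeline()
        for row in rows:
            pipe.hset(f"user_features:{row.user_id}", mapping={
                "events_last_hour": row.events_last_hour,
                "spend_last_hour": row.spend_last_hour,
                "sessions_last_hour": row.sessions_last_hour,
            })
        pipe.execute()
    latest.foreachPartition(write_partition)

query = windowed_features.writeStream \
    .foreachBatch(write_to_redis) \
    .option("checkpointLocation", "s3://checkpoints/user-features") \
    .outputMode("update") \
    .start()

# Write to feature store (offline); append mode emits each window once the watermark passes its end
offline_query = windowed_features.writeStream \
    .format("delta") \
    .option("checkpointLocation", "s3://checkpoints/user-features-offline") \
    .trigger(processingTime="5 minutes") \
    .start("s3://feature-store/user_features_realtime")
spark.streams.awaitAnyTermination()
```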
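The `F.window("event_timestamp", "1 hour", "5 minutes")` call defines a sliding (hopping) window, so each event is counted in several overlapping windows. This minimal sketch computes which windows an event falls into, assuming integer-second timestamps and windows aligned to 0 (Spark's default start offset); `hopping_windows` is an illustrative helper, not a Spark API.

```python
from typing import List, Tuple


def hopping_windows(ts: int, size: int, slide: int) -> List[Tuple[int, int]]:
    """Return every [start, end) window of length `size`, advancing by `slide`, that contains ts.

    Times are integer seconds and windows are aligned to 0, mirroring Spark's
    default window start offset.
    """
    start = ts - (ts % slide)  # latest aligned window start at or before ts
    windows = []
    # Walk back one slide at a time while the window still covers ts
    while start > ts - size:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)
```

With `size=3600` and `slide=300`, every event lands in 3600 / 300 = 12 windows, so the sliding aggregation keeps roughly 12 times as many open windows per active user as a 1-hour tumbling window (where `size == slide`) would. Choose the slide as coarse as your freshness requirement allows.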
Flink Feature Engineering
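```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

# Requires the Flink Kafka SQL connector JAR on the job's classpath
env = StreamExecutionEnvironment.get_execution_environment()
tenv = StreamTableEnvironment.create(env)

# Define source table from Kafka
tenv.execute_sql("""
    CREATE TABLE user_events (
        event_id STRING,
        user_id BIGINT,
        event_type STRING,
        amount DOUBLE,
        event_timestamp TIMESTAMP(3),
        WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_events',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json',
        'scan.startup.mode' = 'latest-offset'
    )
""")

# Declare the sink explicitly: a CREATE TABLE ... AS SELECT without a WITH clause
# leaves the target table with no connector (output topic name is an assumption)
tenv.execute_sql("""
    CREATE TABLE user_features_realtime (
        user_id BIGINT,
        window_start TIMESTAMP(3),
        events_last_hour BIGINT,
        spend_last_hour DOUBLE,
        unique_events BIGINT
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_features_realtime',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json'
    )
""")

# Hourly tumbling windows: one non-overlapping row per user per hour (unlike the
# sliding Spark window). execute_sql submits the job; wait() blocks until it ends.
tenv.execute_sql("""
    INSERT INTO user_features_realtime
    SELECT
        user_id,
        TUMBLE_START(event_timestamp, INTERVAL '1' HOUR) AS window_start,
        COUNT(*) AS events_last_hour,
        SUM(CASE WHEN event_type = 'PURCHASE' THEN amount ELSE 0 END) AS spend_last_hour,
        COUNT(DISTINCT event_id) AS unique_events
    FROM user_events
    GROUP BY user_id, TUMBLE(event_timestamp, INTERVAL '1' HOUR)
""").wait()
```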
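The `WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '5' SECOND` clause tells Flink to tolerate up to 5 seconds of out-of-order data. This simplified sketch (plain Python, not Flink's implementation) tracks a bounded-out-of-orderness watermark for a single partition and flags events whose 1-hour tumbling window the watermark has already passed. `WatermarkTracker` is a hypothetical name, and the "fires when watermark reaches window end" rule is a simplification.

```python
from typing import Optional, Tuple


class WatermarkTracker:
    """Simplified bounded-out-of-orderness watermark for one partition (times in seconds)."""

    def __init__(self, max_delay: int, window_size: int):
        self.max_delay = max_delay      # e.g. 5, like INTERVAL '5' SECOND
        self.window_size = window_size  # e.g. 3600 for a 1-hour tumbling window
        self.max_seen: Optional[int] = None

    @property
    def watermark(self) -> Optional[int]:
        # The watermark trails the largest event time seen so far by max_delay
        return None if self.max_seen is None else self.max_seen - self.max_delay

    def observe(self, ts: int) -> Tuple[int, bool]:
        """Return (tumbling window start, is_late) for an event with event time ts."""
        window_start = ts - (ts % self.window_size)
        window_end = window_start + self.window_size
        wm = self.watermark
        # Late if the watermark has already reached the end of the event's window
        is_late = wm is not None and wm >= window_end
        if self.max_seen is None or ts > self.max_seen:
            self.max_seen = ts
        return window_start, is_late
```

For example, after events at 3590 and 3610 the watermark is 3605, so an event stamped 3598 arriving next is 12 seconds out of order, beyond the 5-second allowance, and its window [0, 3600) is already closed; Flink SQL window aggregations drop such late rows by default. A larger interval trades result latency for completeness.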
3. Online Model Serving
Real-Time Inference Architecture
FastAPI Model Server
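```python
from typing import Any, Dict, List

import joblib
import numpy as np
import redis
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()
redis_client = redis.Redis(host="redis", port=6379)
# Assumes a scikit-learn-style binary classifier exposing predict_proba
model = joblib.load("model.pkl")

# Feature order must match the order used at training time
FEATURE_NAMES: List[str] = ["avg_purchase_30d", "purchase_count_7d", "days_since_last"]


class PredictionRequest(BaseModel):
    user_id: int
    context: Dict[str, Any]


class PredictionResponse(BaseModel):
    user_id: int
    prediction: float
    features_used: Dict[str, float]


# A plain `def` endpoint runs in FastAPI's threadpool; the redis and model calls
# below are blocking and would stall the event loop inside an `async def`
@app.post("/predict", response_model=PredictionResponse)
def predict(request: PredictionRequest):
    # 1. Fetch features from online store (target: < 5ms)
    features = fetch_features(request.user_id)
    # 2. Create feature vector
    feature_vector = prepare_feature_vector(features, request.context)
    # 3. Run model inference (target: < 10ms)
    prediction = model.predict_proba(feature_vector.reshape(1, -1))[0][1]
    return PredictionResponse(
        user_id=request.user_id,
        prediction=float(prediction),
        features_used=features,
    )


def fetch_features(user_id: int) -> Dict[str, float]:
    """Fetch from the online feature store in one round trip (HMGET)."""
    values = redis_client.hmget(f"user_features:{user_id}", FEATURE_NAMES)
    # Missing fields come back as None; default them to 0.0
    return {name: float(v) if v is not None else 0.0
            for name, v in zip(FEATURE_NAMES, values)}


def prepare_feature_vector(features: Dict[str, float], context: Dict[str, Any]) -> np.ndarray:
    """Order features as the model expects.

    Hypothetical helper: `context` is accepted but unused here; any request-time
    features derived from it must be appended in the same order used in training.
    """
    return np.array([features[name] for name in FEATURE_NAMES], dtype=np.float64)
```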
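The `< 5ms` and `< 10ms` comments are budgets, not guarantees. One common way to enforce them is to bound the feature fetch with a timeout and fall back to default values, so a slow online store degrades accuracy instead of breaking the latency SLO. This is a minimal asyncio sketch; `fast_fetch`, `slow_fetch`, the 5 ms budget, and the default values are hypothetical stand-ins for an async store call.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Tuple

# Hypothetical fallback values served when the store misses its budget
DEFAULT_FEATURES: Dict[str, float] = {
    "avg_purchase_30d": 0.0,
    "purchase_count_7d": 0.0,
    "days_since_last": 0.0,
}


async def fetch_with_budget(
    fetch: Callable[[int], Awaitable[Dict[str, float]]],
    user_id: int,
    budget_s: float = 0.005,
) -> Tuple[Dict[str, float], bool]:
    """Return (features, used_fallback); the fetch is cancelled if it exceeds budget_s."""
    try:
        features = await asyncio.wait_for(fetch(user_id), timeout=budget_s)
        return features, False
    except asyncio.TimeoutError:
        # Degrade gracefully: serve defaults and let the caller record a metric
        return dict(DEFAULT_FEATURES), True


async def fast_fetch(user_id: int) -> Dict[str, float]:
    # Hypothetical store call that answers immediately
    return {"avg_purchase_30d": 42.0, "purchase_count_7d": 3.0, "days_since_last": 1.0}


async def slow_fetch(user_id: int) -> Dict[str, float]:
    # Hypothetical store call that takes 50 ms, well past the 5 ms budget
    await asyncio.sleep(0.05)
    return {"avg_purchase_30d": 42.0, "purchase_count_7d": 3.0, "days_since_last": 1.0}
```

With the synchronous `redis.Redis` client used above, this pattern would need the call moved off the event loop (for example with `asyncio.to_thread`) or redis-py's asyncio client (`redis.asyncio`). Counting fallbacks per request is also a cheap signal that the online store is the bottleneck.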
4. Feature Freshness Monitoring
```python
from datetime import datetime, timedelta, timezone
from typing import List


class FeatureFreshnessMonitor:
    """Collects freshness and coverage alerts for feature groups.

    feature_store is assumed to expose get_latest_timestamp(feature_group), returning
    a timezone-aware UTC datetime, and has_features(entity_id, feature_group).
    """

    def __init__(self, feature_store):
        self.feature_store = feature_store
        self.alerts = []

    def check_freshness(self, feature_group: str, max_age_minutes: int) -> bool:
        """Return True if the newest data in feature_group is at most max_age_minutes old."""
        latest_timestamp = self.feature_store.get_latest_timestamp(feature_group)
        # Aware UTC "now" to match the store's (assumed UTC-aware) timestamps
        age = datetime.now(timezone.utc) - latest_timestamp
        if age > timedelta(minutes=max_age_minutes):
            self.alerts.append({
                "feature_group": feature_group,
                "age_minutes": age.total_seconds() / 60,
                "max_age_minutes": max_age_minutes,
                "severity": "CRITICAL" if age > timedelta(hours=1) else "WARNING",
            })
            return False
        return True

    def check_feature_coverage(self, feature_group: str, entity_ids: List[int],
                               min_coverage: float = 0.95) -> float:
        """Return the fraction of entity_ids that have features in feature_group."""
        total = len(entity_ids)
        if total == 0:
            return 1.0  # nothing to check; avoids division by zero
        present = sum(1 for eid in entity_ids
                      if self.feature_store.has_features(eid, feature_group))
        coverage = present / total
        if coverage < min_coverage:
            self.alerts.append({
                "feature_group": feature_group,
                "coverage": coverage,
                "missing_count": total - present,
                "severity": "HIGH",
            })
        return coverage
```
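Monitoring catches stale feature groups after the fact; the serving path can also protect itself per request. A minimal sketch, assuming each feature value is stored together with its own `updated_at` timestamp (a hypothetical storage layout, not something the code above writes): values older than a per-feature maximum age are treated as missing and replaced with a default, and the caller learns which features were stale.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Tuple

# Hypothetical layout: feature name -> (value, updated_at as timezone-aware UTC datetime)
StoredFeatures = Dict[str, Tuple[float, datetime]]


def read_fresh_features(
    stored: StoredFeatures,
    max_age: Dict[str, timedelta],
    defaults: Dict[str, float],
    now: datetime,
) -> Tuple[Dict[str, float], List[str]]:
    """Return (feature values, names that were stale or missing and got defaults)."""
    values: Dict[str, float] = {}
    stale: List[str] = []
    for name, default in defaults.items():
        entry = stored.get(name)
        if entry is None or now - entry[1] > max_age[name]:
            # Missing or older than its allowed age: fall back and record it
            values[name] = default
            stale.append(name)
        else:
            values[name] = entry[0]
    return values, stale
```

Per-feature limits matter because a 1-hour spend feature a few minutes old is fine while the same age is irrelevant for a 30-day average; the returned `stale` list can feed the same alerting as `FeatureFreshnessMonitor`.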
5. Lambda vs Kappa Architecture for ML
Architecture Diagram
Lambda Architecture (pragmatic):
```
┌──────────────────────────────────────────────┐
│ Batch Layer                                  │
│ (Historical features, model training)        │
├──────────────────────────────────────────────┤
│ Speed Layer                                  │
│ (Real-time features, online prediction)      │
├──────────────────────────────────────────────┤
│ Serving Layer                                │
│ (Merged batch + real-time features)          │
└──────────────────────────────────────────────┘
```
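In the Lambda layout the serving layer answers each read by combining both views. A minimal sketch of that merge, assuming the batch view holds totals up to the last batch cutoff and the speed layer holds only contributions from events after that cutoff (it is reset when a new batch view lands). The feature names and the additive-versus-overwrite split are illustrative.

```python
from typing import Dict, Set


def merge_lambda_views(
    batch_view: Dict[str, float],
    speed_view: Dict[str, float],
    additive: Set[str],
) -> Dict[str, float]:
    """Merge a batch view with speed-layer updates for events after the batch cutoff."""
    merged = dict(batch_view)
    for name, value in speed_view.items():
        if name in additive:
            # Counts and sums: batch total plus the increment since the cutoff
            merged[name] = merged.get(name, 0.0) + value
        else:
            # Point-in-time values (e.g. last purchase amount): the newer layer wins
            merged[name] = value
    return merged
```

Additive merging only works for aggregates that decompose over time ranges (lifetime counts and sums); trailing-window features such as a 30-day count also need events to expire, which is one reason the serving layer is the hardest part of Lambda to keep correct.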
Kappa Architecture (simpler):
```
┌──────────────────────────────────────────────┐
│ Stream Processing                            │
│ (All computation through event stream)       │
├──────────────────────────────────────────────┤
│ Serving Layer                                │
│ (Features + Predictions)                     │
└──────────────────────────────────────────────┘
```
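Kappa's simplification is a single code path: backfilling or recomputing features after a logic change means replaying the retained log through the same function that handles live traffic. A minimal sketch, assuming the log is an ordered list of event dicts and the feature is a per-user running purchase total (both hypothetical):

```python
from typing import Dict, Iterable, Mapping


def apply_event(state: Dict[int, float], event: Mapping) -> None:
    """The single feature-update function used for both live traffic and replay."""
    if event["event_type"] == "PURCHASE":
        state[event["user_id"]] = state.get(event["user_id"], 0.0) + event["amount"]


def replay(log: Iterable[Mapping], from_offset: int = 0) -> Dict[int, float]:
    """Rebuild feature state by re-running apply_event over the log from an offset."""
    state: Dict[int, float] = {}
    for offset, event in enumerate(log):
        if offset >= from_offset:
            apply_event(state, event)
    return state
```

Because live processing and replay share `apply_event`, the rebuilt state matches what the live path would have produced. The practical limit is log retention: if the stream keeps, say, 7 days of events, a 90-day feature cannot be rebuilt by replay alone.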
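Best Practice: Prefer Kappa for new ML systems, since a single streaming code path is simpler to operate. Use Lambda when features must be recomputed over more history than the stream retains, or when retraining needs large batch backfills that a batch engine handles more cheaply. The feature store is the bridge between both: it serves low-latency online reads and keeps the offline history used for training.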
Follow-Up Questions
- How do you handle feature staleness in real-time predictions?
- Design a real-time fraud detection pipeline with < 50ms latency.
- How do you A/B test between batch and real-time ML models?
- How would you handle model retraining with streaming features?
- Design a real-time recommendation system for 100M+ users.