Feature Stores: Design, Implementation & Best Practices
Difficulty: Senior Level | Companies: Uber, Airbnb, Netflix, Stripe, Spotify
1. What is a Feature Store?
A feature store is a centralized repository for storing, serving, and managing ML features:
Feature Store Architecture
+---------------------------------------------------+
|                   Feature Store                   |
+-------------------------+-------------------------+
| Offline Store           | Online Store            |
| (Batch features)        | (Low-latency serving)   |
| - S3/GCS/Redshift       | - Redis/DynamoDB        |
| - Feature Registry      | - Feature Registry      |
| - Point-in-time         |                         |
|   correctness           |                         |
+-------------------------+-------------------------+
| Feature Pipeline                                  |
| - Feature definitions (Python)                    |
| - Feature transformations                         |
| - Feature validation & monitoring                 |
+---------------------------------------------------+
Why Feature Stores Matter
| Problem | Solution |
|---|---|
| Feature computation duplicated across teams | Centralized feature definitions |
| Training/serving skew | Same features for training and inference |
| Feature reuse is hard | Discoverable feature catalog |
| Feature freshness varies | Automated refresh pipelines |
| No versioning | Feature versioning and lineage |
ℹ️
Key Insight: Feature stores solve the training-serving skew problem — the #1 cause of ML model performance degradation in production.
2. Feature Store Design
Core Schema
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional
from enum import Enum
class FeatureType(Enum):
    """Closed set of feature kinds; each member's value is its lowercase name."""

    NUMERIC = "numeric"
    CATEGORICAL = "categorical"
    EMBEDDING = "embedding"
    TIMESTAMP = "timestamp"
@dataclass
class FeatureDefinition:
    """Metadata record describing a single feature.

    Only ``freshness_sla_minutes`` and ``tags`` have defaults; every other
    field must be supplied by the caller.
    """

    name: str
    entity: str  # key the feature is attached to, e.g. "user_id"
    feature_type: FeatureType
    description: str
    owner: str
    # Defaults to 60; presumably the maximum tolerated staleness — confirm
    # with whatever enforces the SLA.
    freshness_sla_minutes: int = 60
    # A factory is used so every instance gets its own list.
    tags: List[str] = field(default_factory=list)
@dataclass
class FeatureValue:
    """One recorded value of one feature for one entity.

    ``version`` defaults to 1; all other fields are required.
    """

    entity_id: str
    feature_name: str
    value: Any  # payload type is unconstrained
    timestamp: datetime
    version: int = 1
@dataclass
class FeatureGroup:
    """A named collection of feature definitions sharing an entity and a source.

    Fields, in constructor order: ``name``, ``entity``, ``features``,
    ``source_table`` and ``refresh_schedule``.
    """

    name: str
    entity: str
    features: List[FeatureDefinition]
    source_table: str
    refresh_schedule: str  # cron expression

    def to_dict(self):
        """Return a plain-dict snapshot of this group.

        Keys are ``name``, ``entity``, ``features``, ``source`` and
        ``schedule``. ``features`` holds one dict of attributes per feature.

        Each per-feature dict is a deep copy. Returning the live
        ``__dict__`` (as this method used to) meant that editing the
        snapshot, or a list inside it, silently modified the feature
        definitions themselves.
        """
        # Local import: the surrounding snippet does not import ``copy``.
        from copy import deepcopy

        return {
            "name": self.name,
            "entity": self.entity,
            "features": [deepcopy(vars(feature)) for feature in self.features],
            "source": self.source_table,
            "schedule": self.refresh_schedule,
        }
3. Offline vs Online Store
Offline Store (Batch Features)
import pandas as pd
from datetime import datetime, timedelta
class OfflineFeatureStore:
    """Batch-side feature computation and point-in-time joins on pandas frames.

    Args:
        storage_path: Location of the offline store. It is only recorded on
            the instance; no method of this class reads or writes it.
        feature_columns: Optional explicit list of feature columns that
            ``point_in_time_join`` copies onto the entity rows. When omitted,
            every column of the feature frame other than ``user_id`` and
            ``computed_at`` is used.
    """

    # Name of the temporary column that remembers each entity row's position.
    _ROW_KEY = "__pit_row__"

    def __init__(self, storage_path: str, feature_columns=None):
        self.storage_path = storage_path
        # point_in_time_join reads this attribute; it used to be left unset,
        # which made the join fail with AttributeError.
        self.feature_columns = (
            list(feature_columns) if feature_columns is not None else None
        )

    def compute_features(self, entity_df: pd.DataFrame,
                         source_df: pd.DataFrame,
                         as_of: "datetime | None" = None) -> pd.DataFrame:
        """Compute per-user purchase features for training.

        ``entity_df`` is left-joined to ``source_df`` on ``user_id``, so
        users without source rows are kept. ``source_df`` must provide an
        ``amount`` column and a datetime ``timestamp`` column.

        Columns added to the joined frame:
            avg_purchase_value: mean ``amount`` over all rows of the user.
            purchase_count_30d: number of the user's non-null amounts in the
                30 days up to and including the row's ``timestamp``; 0 for
                rows that have no timestamp.
            days_since_last_purchase: whole days between ``as_of`` and the
                user's latest ``timestamp`` (missing when the user has none).

        Args:
            entity_df: Frame with a ``user_id`` column.
            source_df: Purchase rows keyed by ``user_id``.
            as_of: Reference time for ``days_since_last_purchase``. Defaults
                to ``datetime.now()`` at call time; pass an explicit value to
                make the output reproducible.

        Returns:
            The joined frame with the three feature columns added.
        """
        features = entity_df.merge(source_df, on="user_id", how="left")

        features["avg_purchase_value"] = (
            features.groupby("user_id")["amount"].transform("mean")
        )
        features["purchase_count_30d"] = self._rolling_purchase_count(features)

        reference = pd.Timestamp(datetime.now() if as_of is None else as_of)
        latest = features.groupby("user_id")["timestamp"].transform("max")
        features["days_since_last_purchase"] = (reference - latest).dt.days
        return features

    @staticmethod
    def _rolling_purchase_count(features: pd.DataFrame,
                                window: str = "30D") -> pd.Series:
        """Count each user's purchases in the trailing time window per row."""
        counts = pd.Series(0.0, index=features.index)
        usable = features["timestamp"].notna() & features["user_id"].notna()
        purchases = features.loc[usable, ["user_id", "timestamp", "amount"]]
        if purchases.empty:
            return counts

        # A time-based window needs a monotonic datetime index inside every
        # group; sorting by user first also makes the grouped result come
        # back in exactly this row order.
        purchases = purchases.sort_values(["user_id", "timestamp"])
        rolled = (
            purchases.set_index("timestamp")
            .groupby("user_id")["amount"]
            .rolling(window)
            .count()
        )
        counts.loc[purchases.index] = rolled.to_numpy()
        return counts

    def point_in_time_join(self, entities: pd.DataFrame,
                           features: pd.DataFrame) -> pd.DataFrame:
        """Join features at the correct point in time (no data leakage).

        For every entity row, the feature row with the same ``user_id`` and
        the greatest ``computed_at`` that is less than or equal to the row's
        ``event_timestamp`` is selected and its feature columns are copied
        onto the row.

        Args:
            entities: Frame with ``user_id`` and ``event_timestamp`` columns.
            features: Frame with ``user_id``, ``computed_at`` and the feature
                columns. Both timestamp columns must share a dtype and the
                ``user_id`` columns must share a dtype (``pd.merge_asof``
                requirements).

        Returns:
            A copy of ``entities`` (same rows, order and index) with one
            column per feature. Rows with no eligible feature row, or with a
            missing ``event_timestamp``, get NaN.
        """
        columns = getattr(self, "feature_columns", None)
        if columns is None:
            columns = [
                name for name in features.columns
                if name not in ("user_id", "computed_at")
            ]

        result = entities.copy()
        row_key = self._ROW_KEY

        # Work on positional row numbers so the caller's index (even a
        # non-unique one) never takes part in the alignment.
        left = entities[["user_id", "event_timestamp"]].reset_index(drop=True)
        left[row_key] = left.index
        left = left[left["event_timestamp"].notna()]
        right = features.loc[
            features["computed_at"].notna(),
            ["user_id", "computed_at", *columns],
        ]

        if left.empty or right.empty:
            for column in columns:
                result[column] = float("nan")
            return result

        # merge_asof needs both sides sorted on their time keys. "backward"
        # with exact matches allowed reproduces the `computed_at <= event`
        # rule.
        matched = pd.merge_asof(
            left.sort_values("event_timestamp", kind="stable"),
            right.sort_values("computed_at", kind="stable"),
            left_on="event_timestamp",
            right_on="computed_at",
            by="user_id",
            direction="backward",
            allow_exact_matches=True,
        ).set_index(row_key)

        positions = range(len(entities))
        for column in columns:
            result[column] = matched[column].reindex(positions).to_numpy()
        return result
Online Store (Low-Latency Serving)
import redis
import json
from typing import Dict, List
class OnlineFeatureStore:
    """Redis-backed feature lookup for online inference.

    Every entity's features live in a single Redis hash named
    ``features:{entity_id}``, one hash field per feature, with values stored
    as strings.
    """

    def __init__(self, redis_host: str, redis_port: int):
        self.client = redis.Redis(host=redis_host, port=redis_port)

    @staticmethod
    def _key(entity_id: str) -> str:
        """Return the Redis hash key that holds ``entity_id``'s features."""
        return f"features:{entity_id}"

    @staticmethod
    def _as_float(raw) -> float:
        """Decode one stored value; missing or empty values become 0.0."""
        return float(raw) if raw else 0.0

    def get_features(self, entity_id: str, feature_names: List[str]) -> Dict[str, float]:
        """Get features for online inference.

        Args:
            entity_id: Entity whose hash is read.
            feature_names: Hash fields to fetch.

        Returns:
            ``{feature_name: value}``. A feature that is absent from the hash
            (or stored as an empty string) is reported as ``0.0``, so callers
            cannot tell "missing" from a real zero.
        """
        names = list(feature_names)
        if not names:
            return {}
        # One HMGET fetches every field in a single command.
        raw_values = self.client.hmget(self._key(entity_id), names)
        return {
            name: self._as_float(raw)
            for name, raw in zip(names, raw_values)
        }

    def set_features(self, entity_id: str, features: Dict[str, float],
                     ttl_seconds: int = 86400):
        """Write computed features to the online store.

        Args:
            entity_id: Entity whose hash is updated.
            features: ``{feature_name: value}``; values are stored via
                ``str()``. An empty mapping is a no-op (redis-py rejects an
                HSET with no fields).
            ttl_seconds: Expiry applied to the whole hash after the write.
                Defaults to 86400 (24 hours).
        """
        if not features:
            return
        key = self._key(entity_id)
        # Send the write and the expiry together so a failure in between
        # cannot leave a hash without a TTL.
        pipe = self.client.pipeline(transaction=True)
        pipe.hset(key, mapping={name: str(value) for name, value in features.items()})
        pipe.expire(key, ttl_seconds)
        pipe.execute()

    def get_feature_vector(self, entity_ids: List[str],
                           feature_names: List[str]) -> List[Dict[str, float]]:
        """Batch get for model serving.

        Args:
            entity_ids: Entities to read, in the order results are wanted.
            feature_names: Hash fields to fetch for every entity.

        Returns:
            One ``{feature_name: value}`` dict per entity, in ``entity_ids``
            order, with the same ``0.0`` default as ``get_features``.
        """
        ids = list(entity_ids)
        names = list(feature_names)
        if not ids:
            return []
        if not names:
            return [{} for _ in ids]

        # One HMGET per entity, all sent in a single pipeline round trip.
        pipe = self.client.pipeline()
        for entity_id in ids:
            pipe.hmget(self._key(entity_id), names)
        rows = pipe.execute()

        return [
            {name: self._as_float(raw) for name, raw in zip(names, row)}
            for row in rows
        ]
4. Feature Engineering Patterns
Windowed Aggregations
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Session shared by the feature-engineering snippets below; getOrCreate()
# hands back an already-running session when there is one.
spark = (
    SparkSession.builder
    .appName("FeatureEngineering")
    .getOrCreate()
)
def compute_user_features(events_df):
    """Compute user-level features with multiple time windows.

    For every event row and every look-back window (1, 7, 30 and 90 days),
    aggregates are taken over the same user's events whose
    ``event_timestamp`` lies between ``days`` days before the row's
    timestamp and the row's timestamp itself (inclusive).

    Columns added per window ``w``:
        purchase_count_w: number of ``PURCHASE`` events in the window.
        total_spend_w: sum of ``amount`` over those purchases (0 if none).
        avg_order_value_w: mean ``amount`` over those purchases (null if
            none).
        days_since_last_w: days from the most recent ``PURCHASE`` in the
            window to this event (null if none).

    ``days_since_last_w`` used to be ``datediff(current_date(),
    max(event_timestamp))``. Because the frame ends at the current row, that
    maximum was always the row's own timestamp, so the value was identical
    for every window and changed with the day the job ran. It is now
    measured against the event's own timestamp and only considers purchases.

    Args:
        events_df: Spark DataFrame with ``user_id``, ``event_timestamp``,
            ``event_type`` and ``amount`` columns.

    Returns:
        ``events_df`` with sixteen additional feature columns.
    """
    seconds_per_day = 86400
    lookbacks = {
        "1d": 1,
        "7d": 7,
        "30d": 30,
        "90d": 90,
    }

    is_purchase = F.col("event_type") == "PURCHASE"
    # Range frames need a numeric ordering; the timestamp is cast to whole
    # seconds so the frame bounds can be expressed in seconds as well.
    event_seconds = F.col("event_timestamp").cast("long")
    # No otherwise(): non-purchase rows are null, so avg/max ignore them.
    purchase_amount = F.when(is_purchase, F.col("amount"))
    purchase_time = F.when(is_purchase, F.col("event_timestamp"))

    features = events_df
    for label, days in lookbacks.items():
        window_spec = (
            Window.partitionBy("user_id")
            .orderBy(event_seconds)
            .rangeBetween(-days * seconds_per_day, 0)
        )
        features = (
            features
            .withColumn(
                f"purchase_count_{label}",
                F.sum(F.when(is_purchase, 1).otherwise(0)).over(window_spec),
            )
            .withColumn(
                f"total_spend_{label}",
                F.sum(F.when(is_purchase, F.col("amount")).otherwise(0)).over(window_spec),
            )
            .withColumn(
                f"avg_order_value_{label}",
                F.avg(purchase_amount).over(window_spec),
            )
            .withColumn(
                f"days_since_last_{label}",
                F.datediff(
                    F.col("event_timestamp"),
                    F.max(purchase_time).over(window_spec),
                ),
            )
        )
    return features
# Compute the windowed features from the raw events, then persist them to the
# feature store location in Delta format.
user_features = compute_user_features(
    spark.read.parquet("s3://events/"),
)
(
    user_features.write
    .format("delta")
    .save("s3://feature-store/user_features")
)
Cross-Entity Features
def compute_cross_entity_features(users_df, purchases_df, sessions_df):
    """Attach per-user purchase and session aggregates to the user rows.

    Both aggregate tables are left-joined on ``user_id``, so every row of
    ``users_df`` is kept; users with no purchases or no sessions get nulls in
    the corresponding columns.
    """
    # User × Purchase features
    purchase_aggregates = [
        F.count("*").alias("total_purchases"),
        F.sum("amount").alias("lifetime_value"),
        F.avg("amount").alias("avg_order_value"),
        F.stddev("amount").alias("order_value_std"),
        F.countDistinct("product_id").alias("unique_products_bought"),
    ]
    per_user_purchases = purchases_df.groupBy("user_id").agg(*purchase_aggregates)

    # User × Session features
    session_aggregates = [
        F.count("*").alias("total_sessions"),
        F.avg("session_duration_seconds").alias("avg_session_duration"),
        F.avg("pages_viewed").alias("avg_pages_per_session"),
    ]
    per_user_sessions = sessions_df.groupBy("user_id").agg(*session_aggregates)

    # Combine: purchases first, then sessions, both keyed on user_id.
    with_purchases = users_df.join(per_user_purchases, "user_id", "left")
    return with_purchases.join(per_user_sessions, "user_id", "left")
5. Point-in-Time Correctness
⚠️
Critical Concept: Without point-in-time correctness, your model will have data leakage — it will see future feature values during training, leading to optimistic evaluation metrics and poor production performance.
def point_in_time_correct_features(entities, feature_df, feature_name):
    """Join features at the correct point in time.

    For each row of ``entities``, picks the ``feature_name`` value from the
    ``feature_df`` row of the same ``user_id`` with the latest
    ``computed_at`` that is not after the row's ``event_timestamp``.

    Args:
        entities: Spark DataFrame with ``user_id`` and ``event_timestamp``.
        feature_df: Spark DataFrame with ``user_id``, ``computed_at`` and the
            column named by ``feature_name``.
        feature_name: Name of the feature column to attach.

    Returns:
        All columns of ``entities`` plus ``feature_name``; the feature is
        null for rows with no eligible feature row. Exactly one output row is
        produced per input entity row.
    """
    # Tag every entity row with its own id. Ranking by
    # (user_id, event_timestamp) alone would merge entity rows that share
    # both values into a single output row.
    row_id = "__pit_row_id"
    labelled = entities.withColumn(row_id, F.monotonically_increasing_id()).alias("e")
    candidates = feature_df.alias("f")

    # Refer to columns through the aliases so the condition stays
    # unambiguous even when both frames derive from the same DataFrame.
    joined = labelled.join(
        candidates,
        on=(F.col("e.user_id") == F.col("f.user_id"))
        & (F.col("f.computed_at") <= F.col("e.event_timestamp")),
        how="left",
    )

    # Rank each entity row's candidates newest-first and keep the top one.
    newest_first = (
        Window.partitionBy(F.col(f"e.{row_id}"))
        .orderBy(F.col("f.computed_at").desc())
    )
    return (
        joined
        .withColumn("rn", F.row_number().over(newest_first))
        .filter(F.col("rn") == 1)
        .select("e.*", f"f.{feature_name}")
        .drop(row_id)
    )
6. Feast Feature Store Example
from feast import FeatureStore, Entity, Feature, ValueType, FeatureView
from feast.data_source import PushSource, RequestSource
from datetime import timedelta
# Extra names this example needs: typed schema fields and a clock for the
# incremental materialization call.
from datetime import datetime, timezone
from feast import Field
from feast.types import Float32, Int64

# Define entity
user = Entity(name="user_id", value_type=ValueType.INT64, description="User ID")

# Define feature view
user_features = FeatureView(
    name="user_features",
    entities=[user],
    ttl=timedelta(days=1),
    # Schema entries are Field objects typed with feast.types rather than the
    # legacy Feature/ValueType pair.
    schema=[
        Field(name="avg_purchase_value_30d", dtype=Float32),
        Field(name="purchase_count_30d", dtype=Int64),
        Field(name="days_since_last_purchase", dtype=Int64),
    ],
    online=True,
    # `...` is a placeholder: supply a real batch source before running this.
    source=PushSource(name="user_features_source", batch_source=...),
)

# Materialize features
# NOTE(review): presumably the entity and feature view above have already been
# registered (e.g. with `feast apply`) — confirm before materializing.
store = FeatureStore(repo_path=".")
# materialize() takes explicit start/end dates and has no `incremental` flag;
# incremental loads go through materialize_incremental().
store.materialize_incremental(end_date=datetime.now(timezone.utc))

# Get features for serving
feature_vector = store.get_online_features(
    features=[
        "user_features:avg_purchase_value_30d",
        "user_features:purchase_count_30d",
    ],
    entity_rows=[{"user_id": 123}],
).to_dict()
ℹ️
Best Practice: Always validate that features used in training can be computed at serving time. If a feature requires a JOIN that's only possible in batch, it can't be used for real-time inference.
Follow-Up Questions
- How would you handle feature drift in a feature store?
- Design a feature store that supports both batch and streaming features.
- How do you version features and handle backward compatibility?
- Compare feature stores (Feast, Tecton, Hopsworks) for different use cases.
- How would you implement feature monitoring and alerting?