πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Feature Stores: Design, Implementation & Best Practices

Data EngineeringML Infrastructure⭐ Premium

Advertisement

Feature Stores: Design, Implementation & Best Practices

Difficulty: Senior Level | Companies: Uber, Airbnb, Netflix, Stripe, Spotify

1. What is a Feature Store?

A feature store is a centralized repository for storing, serving, and managing ML features:

Architecture Diagram
Feature Store Architecture
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                  Feature Store                   β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚   Offline Store       β”‚    Online Store          β”‚
β”‚   (Batch features)    β”‚    (Low-latency serving) β”‚
β”‚   - S3/GCS/Redshift   β”‚    - Redis/DynamoDB       β”‚
β”‚   - Feature Registry  β”‚    - Feature Registry     β”‚
β”‚   - Point-in-time      β”‚    - Feature Registry     β”‚
β”‚     correctness       β”‚                           β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚              Feature Pipeline                     β”‚
β”‚   - Feature definitions (Python)                  β”‚
β”‚   - Feature transformations                       β”‚
β”‚   - Feature validation & monitoring               β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Why Feature Stores Matter

ProblemSolution
Feature computation duplicated across teamsCentralized feature definitions
Training/serving skewSame features for training and inference
Feature reuse is hardDiscoverable feature catalog
Feature freshness variesAutomated refresh pipelines
No versioningFeature versioning and lineage

ℹ️

Key Insight: Feature stores solve the training-serving skew problem β€” the #1 cause of ML model performance degradation in production.

2. Feature Store Design

Core Schema

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional
from enum import Enum

class FeatureType(Enum):
    NUMERIC = "numeric"
    CATEGORICAL = "categorical"
    EMBEDDING = "embedding"
    TIMESTAMP = "timestamp"

@dataclass
class FeatureDefinition:
    name: str
    entity: str  # e.g., "user_id"
    feature_type: FeatureType
    description: str
    owner: str
    freshness_sla_minutes: int = 60
    tags: List[str] = field(default_factory=list)

@dataclass
class FeatureValue:
    entity_id: str
    feature_name: str
    value: Any
    timestamp: datetime
    version: int = 1

@dataclass
class FeatureGroup:
    name: str
    entity: str
    features: List[FeatureDefinition]
    source_table: str
    refresh_schedule: str  # cron expression
    
    def to_dict(self):
        return {
            "name": self.name,
            "entity": self.entity,
            "features": [f.__dict__ for f in self.features],
            "source": self.source_table,
            "schedule": self.refresh_schedule
        }

3. Offline vs Online Store

Offline Store (Batch Features)

import pandas as pd
from datetime import datetime, timedelta

class OfflineFeatureStore:
    def __init__(self, storage_path: str):
        self.storage_path = storage_path
    
    def compute_features(self, entity_df: pd.DataFrame, 
                         source_df: pd.DataFrame) -> pd.DataFrame:
        """Compute features for training"""
        features = entity_df.merge(
            source_df,
            on="user_id",
            how="left"
        )
        
        # Feature engineering
        features["avg_purchase_value"] = features.groupby("user_id")["amount"].transform("mean")
        features["purchase_count_30d"] = features.groupby("user_id")["amount"].transform(
            lambda x: x.rolling("30D").count()
        )
        features["days_since_last_purchase"] = (
            datetime.now() - features.groupby("user_id")["timestamp"].transform("max")
        ).dt.days
        
        return features
    
    def point_in_time_join(self, entities: pd.DataFrame, 
                           features: pd.DataFrame) -> pd.DataFrame:
        """Join features at the correct point in time (no data leakage)"""
        result = entities.copy()
        
        for _, entity_row in entities.iterrows():
            entity_ts = entity_row["event_timestamp"]
            entity_id = entity_row["user_id"]
            
            # Only use features computed BEFORE the event
            valid_features = features[
                (features["user_id"] == entity_id) & 
                (features["computed_at"] <= entity_ts)
            ]
            
            # Get the latest feature value before the event
            if not valid_features.empty:
                latest = valid_features.sort_values("computed_at").iloc[-1]
                for col in self.feature_columns:
                    result.loc[_, col] = latest[col]
        
        return result

Online Store (Low-Latency Serving)

import redis
import json
from typing import Dict, List

class OnlineFeatureStore:
    def __init__(self, redis_host: str, redis_port: int):
        self.client = redis.Redis(host=redis_host, port=redis_port)
    
    def get_features(self, entity_id: str, feature_names: List[str]) -> Dict[str, float]:
        """Get features for online inference (sub-millisecond)"""
        pipe = self.client.pipeline()
        for name in feature_names:
            pipe.hget(f"features:{entity_id}", name)
        values = pipe.execute()
        
        return {
            name: float(val) if val else 0.0
            for name, val in zip(feature_names, values)
        }
    
    def set_features(self, entity_id: str, features: Dict[str, float]):
        """Write computed features to online store"""
        self.client.hset(
            f"features:{entity_id}",
            mapping={k: str(v) for k, v in features.items()}
        )
        self.client.expire(f"features:{entity_id}", 86400)  # 24h TTL
    
    def get_feature_vector(self, entity_ids: List[str], 
                           feature_names: List[str]) -> List[Dict[str, float]]:
        """Batch get for model serving"""
        pipe = self.client.pipeline()
        for eid in entity_ids:
            for name in feature_names:
                pipe.hget(f"features:{eid}", name)
        results = pipe.execute()
        
        vectors = []
        idx = 0
        for eid in entity_ids:
            vector = {}
            for name in feature_names:
                val = results[idx]
                vector[name] = float(val) if val else 0.0
                idx += 1
            vectors.append(vector)
        
        return vectors

4. Feature Engineering Patterns

Windowed Aggregations

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("FeatureEngineering").getOrCreate()

def compute_user_features(events_df):
    """Compute user-level features with multiple time windows"""
    
    # Define time windows
    windows = {
        "1d": 1,
        "7d": 7,
        "30d": 30,
        "90d": 90,
    }
    
    features = events_df
    
    for window_name, days in windows.items():
        window_spec = Window \
            .partitionBy("user_id") \
            .orderBy(F.col("event_timestamp").cast("long")) \
            .rangeBetween(-days * 86400, 0)
        
        features = features \
            .withColumn(f"purchase_count_{window_name}", 
                F.sum(F.when(F.col("event_type") == "PURCHASE", 1).otherwise(0))
                .over(window_spec)) \
            .withColumn(f"total_spend_{window_name}",
                F.sum(F.when(F.col("event_type") == "PURCHASE", F.col("amount")).otherwise(0))
                .over(window_spec)) \
            .withColumn(f"avg_order_value_{window_name}",
                F.avg(F.when(F.col("event_type") == "PURCHASE", F.col("amount")))
                .over(window_spec)) \
            .withColumn(f"days_since_last_{window_name}",
                F.datediff(F.current_date(), F.max("event_timestamp").over(window_spec)))
    
    return features

# Write to feature store
user_features = compute_user_features(spark.read.parquet("s3://events/"))
user_features.write.format("delta").save("s3://feature-store/user_features")

Cross-Entity Features

def compute_cross_entity_features(users_df, purchases_df, sessions_df):
    """Features that combine multiple entities"""
    
    # User Γ— Purchase features
    user_purchase_features = purchases_df \
        .groupBy("user_id") \
        .agg(
            F.count("*").alias("total_purchases"),
            F.sum("amount").alias("lifetime_value"),
            F.avg("amount").alias("avg_order_value"),
            F.stddev("amount").alias("order_value_std"),
            F.countDistinct("product_id").alias("unique_products_bought"),
        )
    
    # User Γ— Session features
    user_session_features = sessions_df \
        .groupBy("user_id") \
        .agg(
            F.count("*").alias("total_sessions"),
            F.avg("session_duration_seconds").alias("avg_session_duration"),
            F.avg("pages_viewed").alias("avg_pages_per_session"),
        )
    
    # Combine
    return users_df \
        .join(user_purchase_features, "user_id", "left") \
        .join(user_session_features, "user_id", "left")

5. Point-in-Time Correctness

⚠️

Critical Concept: Without point-in-time correctness, your model will have data leakage β€” it will see future features during training, leading to optimistic evaluation metrics and poor production performance.

def point_in_time_correct_features(entities, feature_df, feature_name):
    """Join features at the correct point in time"""
    
    # For each entity, get the feature value computed BEFORE the entity's timestamp
    result = entities.alias("e").join(
        feature_df.alias("f"),
        on=(entities["user_id"] == feature_df["user_id"]) &
           (feature_df["computed_at"] <= entities["event_timestamp"]),
        how="left"
    )
    
    # Window to get the latest value
    window_spec = Window \
        .partitionBy("e.user_id", "e.event_timestamp") \
        .orderBy(F.col("f.computed_at").desc())
    
    result = result \
        .withColumn("rn", F.row_number().over(window_spec)) \
        .filter(F.col("rn") == 1) \
        .select("e.*", f"f.{feature_name}")
    
    return result

6. Feast Feature Store Example

from feast import FeatureStore, Entity, Feature, ValueType, FeatureView
from feast.data_source import PushSource, RequestSource
from datetime import timedelta

# Define entity
user = Entity(name="user_id", value_type=ValueType.INT64, description="User ID")

# Define feature view
user_features = FeatureView(
    name="user_features",
    entities=[user],
    ttl=timedelta(days=1),
    schema=[
        Feature(name="avg_purchase_value_30d", dtype=ValueType.FLOAT),
        Feature(name="purchase_count_30d", dtype=ValueType.INT64),
        Feature(name="days_since_last_purchase", dtype=ValueType.INT64),
    ],
    online=True,
    source=PushSource(name="user_features_source", batch_source=...),
)

# Materialize features
store = FeatureStore(repo_path=".")
store.materialize(incremental=True)

# Get features for serving
feature_vector = store.get_online_features(
    features=[
        "user_features:avg_purchase_value_30d",
        "user_features:purchase_count_30d",
    ],
    entity_rows=[{"user_id": 123}]
).to_dict()

ℹ️

Best Practice: Always validate that features used in training can be computed at serving time. If a feature requires a JOIN that's only possible in batch, it can't be used for real-time inference.

Follow-Up Questions

  1. How would you handle feature drift in a feature store?
  2. Design a feature store that supports both batch and streaming features.
  3. How do you version features and handle backward compatibility?
  4. Compare feature stores (Feast, Tecton, Hopsworks) for different use cases.
  5. How would you implement feature monitoring and alerting?

Advertisement