Feature Stores
A feature store is a centralized repository for storing, managing, and serving machine learning features. It provides consistent feature engineering across training and serving, enabling feature reuse and reducing duplication.
Key Benefits
- Consistency: Same features for training and serving
- Reuse: Share features across teams and models
- Scalability: Handle large-scale feature computation
- Low Latency: Real-time feature serving
- Feature Discovery: Find and understand available features
Feature Store Architecture
Feast Implementation
Basic Feast Setup
from feast import FeatureStore, Entity, Feature, ValueType
from feast.data_source import SparkSource, FileSource
from feast.feature_view import FeatureView
from feast.entity import Entity
from datetime import timedelta
# Entity that the feature rows below are keyed by.
driver = Entity(
    name="driver_id",
    description="Driver ID",
    value_type=ValueType.INT64,
)

# Feature view: three float features per driver, read from a Parquet file
# and also materialised to the online store (online=True).
# NOTE(review): `schema=` combined with `Feature`/`ValueType` and
# `event_timestamp_column=` looks like a mix of Feast API generations --
# confirm against the Feast version actually installed.
driver_features = FeatureView(
    name="driver_features",
    entities=["driver_id"],
    ttl=timedelta(days=1),
    online=True,
    batch_source=FileSource(
        event_timestamp_column="event_timestamp",
        path="data/driver_features.parquet",
    ),
    schema=[
        Feature(name=column, dtype=ValueType.FLOAT)
        for column in (
            "average_daily_rides",
            "total_distance",
            "acceptance_rate",
        )
    ],
)

# Open the feature repository rooted at the current directory and register
# the entity and feature view defined above.
store = FeatureStore(repo_path=".")
store.apply([driver, driver_features])
Feature Retrieval
import pandas as pd
from datetime import datetime
class FeatureRetriever:
    """Convenience wrapper around a Feast ``FeatureStore``.

    Exposes the store's online lookup (for inference) and historical
    lookup (for training) and converts the results to plain pandas /
    dict objects.
    """

    def __init__(self, feature_store_path):
        """Open the feature repository located at ``feature_store_path``."""
        self.store = FeatureStore(repo_path=feature_store_path)

    def get_online_features(self, entity_rows, feature_refs):
        """Get real-time features for inference.

        Args:
            entity_rows: List of dicts mapping join-key name to value,
                one dict per entity to look up.
            feature_refs: Feature references to fetch.

        Returns:
            A ``pandas.DataFrame`` built from the store response's
            ``to_dict()`` output.
        """
        response = self.store.get_online_features(
            features=feature_refs,
            entity_rows=entity_rows,
        )
        return pd.DataFrame(response.to_dict())

    def get_historical_features(self, entity_df, feature_refs):
        """Get historical features for training.

        Args:
            entity_df: Entity dataframe passed through to the store's
                ``get_historical_features``.
            feature_refs: Feature references to fetch.

        Returns:
            The retrieval job's result as returned by ``job.to_df()``.
        """
        job = self.store.get_historical_features(
            features=feature_refs,
            entity_df=entity_df,
        )
        return job.to_df()

    def get_feature_vector(self, entity_id, feature_refs, entity_key="driver_id"):
        """Get the online feature vector for a single entity.

        Args:
            entity_id: Value of the join key identifying the entity.
            feature_refs: Feature references to fetch.
            entity_key: Name of the join key. Defaults to ``"driver_id"``
                (the previously hard-coded key) so existing callers are
                unaffected; pass another name to look up other entities.

        Returns:
            The store response's ``to_dict()`` output.
        """
        response = self.store.get_online_features(
            features=feature_refs,
            entity_rows=[{entity_key: entity_id}],
        )
        return response.to_dict()
# Usage
retriever = FeatureRetriever(".")
feature_refs = [
    "driver_features:average_daily_rides",
    "driver_features:total_distance",
]

# For training: one row per driver, all stamped with the same "now".
snapshot_time = datetime.now()
entity_df = pd.DataFrame(
    {
        "driver_id": [1, 2, 3],
        "event_timestamp": [snapshot_time, snapshot_time, snapshot_time],
    }
)
training_features = retriever.get_historical_features(
    entity_df=entity_df,
    feature_refs=feature_refs,
)

# For inference: online lookup for a single driver.
inference_features = retriever.get_online_features(
    entity_rows=[{"driver_id": 1}],
    feature_refs=feature_refs,
)
Tecton Implementation
Tecton Feature Definitions
from tecton import tecton_context, FeatureSource, Feature, Entity
from tecton.framework.batch_feature_view import BatchFeatureView
from tecton.framework.stream_feature_view import StreamFeatureView
from tecton.framework.operator import Operator
# Define entities
driver = Entity(join_keys=["driver_id"], name="driver_id")

# Batch feature view: three 7-day features per driver, sourced from the
# "ride_events_table" table and served online with a one-day TTL.
# NOTE(review): constructor names/arguments here should be checked against
# the Tecton SDK version in use -- TODO confirm.
driver_stats = BatchFeatureView(
    name="driver_stats",
    description="7-day rolling driver statistics",
    owner="team@company.com",
    entities=[driver],
    online=True,
    ttl="1d",
    sources=[
        FeatureSource(
            name="ride_events",
            table="ride_events_table",
            timestamp_field="event_timestamp",
        ),
    ],
    features=[
        Feature(name=column, dtype=kind)
        for column, kind in (
            ("ride_count_7d", "int64"),
            ("avg_distance_7d", "float64"),
            ("total_earnings_7d", "float64"),
        )
    ],
)
# Stream feature view: latest ride status and location per driver, sourced
# from the "ride_events" Kafka topic and served online with a five-minute TTL.
driver_stream = StreamFeatureView(
    name="driver_stream",
    description="Real-time driver features",
    owner="team@company.com",
    entities=[driver],
    online=True,
    ttl="5m",
    sources=[
        FeatureSource(
            name="ride_events_stream",
            kafka_topic="ride_events",
            timestamp_field="event_timestamp",
        ),
    ],
    features=[
        Feature(name=column, dtype=kind)
        for column, kind in (
            ("current_ride_status", "string"),
            ("last_location_lat", "float64"),
            ("last_location_lon", "float64"),
        )
    ],
)
Offline vs Online Stores
| Aspect | Offline Store | Online Store |
|---|---|---|
| Latency | Seconds to minutes | Milliseconds |
| Use Case | Training, batch inference | Real-time serving |
| Storage | Data lake, warehouse | Redis, DynamoDB |
| Consistency | Strong consistency | Eventual consistency |
| Cost | Lower cost per GB | Higher cost per GB |
| Access Pattern | Batch queries | Point lookups |
Offline Store Implementation
import pandas as pd
from typing import Dict, List
class OfflineFeatureStore:
    """Date-partitioned Parquet feature store on the local filesystem.

    Files are laid out as
    ``<storage_path>/<feature_group_name>/<date>/features.parquet`` where
    ``<date>`` is the ISO date of the rows' ``event_timestamp``.
    """

    def __init__(self, storage_path):
        """Remember the root directory under which feature groups live."""
        self.storage_path = storage_path

    def write_features(self, feature_df, feature_group_name):
        """Write features to the offline store, one Parquet file per date.

        Args:
            feature_df: DataFrame with an ``event_timestamp`` column. It is
                not modified; the derived ``date`` column is added to a copy
                (and is included in the written files).
            feature_group_name: Sub-directory name for this feature group.
        """
        import os

        # assign() returns a new frame, so the caller's DataFrame does not
        # silently gain a "date" column.
        dated = feature_df.assign(
            date=pd.to_datetime(feature_df["event_timestamp"]).dt.date
        )
        for date, group in dated.groupby("date"):
            file_path = f"{self.storage_path}/{feature_group_name}/{date}/features.parquet"
            # to_parquet does not create missing parent directories.
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            group.to_parquet(file_path, index=False)

    def read_features(self, feature_group_name, start_date, end_date):
        """Read features whose partition date is within an inclusive range.

        Args:
            feature_group_name: Sub-directory name of the feature group.
            start_date: First partition date to include (compared against a
                ``datetime.date``).
            end_date: Last partition date to include.

        Returns:
            The matching partitions concatenated in path order with a fresh
            index, or an empty DataFrame when no partition matches.
        """
        import glob
        import os

        pattern = f"{self.storage_path}/{feature_group_name}/*/features.parquet"

        # Filter by date range; sorted() makes the row order deterministic.
        filtered_files = []
        for file in sorted(glob.glob(pattern)):
            # The partition date is the name of the file's parent directory;
            # os.path handles the platform's separator.
            date_str = os.path.basename(os.path.dirname(file))
            file_date = pd.to_datetime(date_str).date()
            if start_date <= file_date <= end_date:
                filtered_files.append(file)

        # pd.concat raises ValueError on an empty sequence.
        if not filtered_files:
            return pd.DataFrame()

        # Read and concatenate
        return pd.concat(
            (pd.read_parquet(f) for f in filtered_files), ignore_index=True
        )
Online Store Implementation
import redis
import json
from typing import Dict, List, Optional
class OnlineFeatureStore:
    """Online feature store backed by Redis hashes.

    Each entity's features live in one hash stored under the key
    ``<feature_group>:<entity_id>``, with one hash field per feature.
    """

    def __init__(self, redis_host, redis_port):
        """Connect to Redis; responses are decoded to ``str``."""
        self.redis = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)

    @staticmethod
    def _key(feature_group: str, entity_id: str) -> str:
        """Build the Redis key for one entity of a feature group."""
        return f"{feature_group}:{entity_id}"

    def write_features(self, entity_id: str, features: Dict, feature_group: str):
        """Write features to the online store.

        Args:
            entity_id: Entity the features belong to.
            features: Mapping of feature name to value, stored as hash fields.
            feature_group: Feature group the entity belongs to.
        """
        self.redis.hset(self._key(feature_group, entity_id), mapping=features)

    def read_features(self, entity_id: str, feature_group: str, feature_names: List[str]) -> Optional[Dict]:
        """Read the named features of one entity from the online store.

        Args:
            entity_id: Entity to look up.
            feature_group: Feature group the entity belongs to.
            feature_names: Hash fields to fetch.

        Returns:
            A dict mapping each requested name to its stored value (``None``
            for a field that is absent), or ``None`` when none of the
            requested fields exist.
        """
        # HMGET fetches several fields in one call; HGET accepts only a
        # single field name.
        values = self.redis.hmget(self._key(feature_group, entity_id), feature_names)
        # HMGET returns a list with None for each missing field, never None
        # itself, so "nothing stored" shows up as an all-None list.
        if all(value is None for value in values):
            return None
        return dict(zip(feature_names, values))

    def read_features_batch(self, entity_ids: List[str], feature_group: str, feature_names: List[str]) -> List[Optional[Dict]]:
        """Read features for multiple entities.

        Returns:
            One ``read_features`` result per entity id, in input order.
        """
        return [
            self.read_features(entity_id, feature_group, feature_names)
            for entity_id in entity_ids
        ]

    def delete_features(self, entity_id: str, feature_group: str):
        """Delete all stored features for an entity."""
        self.redis.delete(self._key(feature_group, entity_id))
Mathematical Foundation
Feature Importance
Feature importance can be measured using various metrics:
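Feature Importance (Permutation)
\[ I_j = \frac{1}{K} \sum_{k=1}^{K} \left[ L\left(f\left(x_{\pi_j}^{(k)}\right), y\right) - L\left(f(x), y\right) \right] \]
Where:
- \( L \) is the loss function
- \( f \) is the model
- \( x_{\pi_j} \) is the input with feature \( j \) permuted (\( x_{\pi_j}^{(k)} \) is the \( k \)-th permutation) and \( y \) is the true target
- \( K \) is the number of permutations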
Feature Correlation
The correlation matrix for feature selection:
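Feature Correlation
\[ \rho_{ij} = \frac{\operatorname{Cov}(X_i, X_j)}{\sigma_{X_i} \, \sigma_{X_j}} \]
where \( X_i \) and \( X_j \) are two features and \( \sigma \) denotes the standard deviation; entry \( (i, j) \) of the correlation matrix is \( \rho_{ij} \).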
Feature Normalization
Min-max normalization for feature scaling:
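Min-Max Normalization
\[ x' = \frac{x - x_{\min}}{x_{\max} - x_{\min}} \]
where \( x_{\min} \) and \( x_{\max} \) are the minimum and maximum observed values of the feature, so \( x' \in [0, 1] \).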
Feature Engineering Patterns
Real-time Feature Computation
class RealTimeFeatureEngine:
    """Computes trailing-window count/sum/average features per entity."""

    def __init__(self, window_sizes=None):
        """Configure the trailing windows.

        Args:
            window_sizes: Iterable of window lengths in minutes. Defaults to
                ``[5, 15, 60]``.
        """
        if window_sizes is None:
            window_sizes = [5, 15, 60]
        self.window_sizes = list(window_sizes)  # minutes

    def compute_rolling_features(self, events, entity_id, now=None):
        """Compute rolling features for one entity.

        Args:
            events: DataFrame with ``entity_id``, ``timestamp`` and ``value``
                columns.
            entity_id: Entity whose events are aggregated.
            now: Reference time the windows end at. Defaults to
                ``datetime.now()``, evaluated once per call.

        Returns:
            Dict with ``count_<w>m``, ``sum_<w>m`` and ``avg_<w>m`` entries
            for every configured window ``w``. The average of an empty
            window is NaN (pandas' mean of an empty selection).
        """
        from datetime import datetime, timedelta

        # Take "now" once so every window is measured from the same instant.
        if now is None:
            now = datetime.now()

        entity_events = events[events["entity_id"] == entity_id]
        features = {}
        for window in self.window_sizes:
            cutoff = now - timedelta(minutes=window)
            # Strictly-after comparison: an event exactly at the cutoff is
            # excluded.
            window_events = entity_events[entity_events["timestamp"] > cutoff]
            values = window_events["value"]
            features[f"count_{window}m"] = len(window_events)
            features[f"sum_{window}m"] = values.sum()
            features[f"avg_{window}m"] = values.mean()
        return features
Feature Transformation Pipeline
from sklearn.preprocessing import StandardScaler, LabelEncoder
import numpy as np
class FeatureTransformer:
    """Fits per-column scalers/encoders and re-applies them later."""

    def __init__(self):
        # Fitted transformers, keyed by column name.
        self.scalers = {}
        self.encoders = {}

    def fit_transform(self, df, feature_config):
        """Fit a transformer per configured column and return the result.

        ``feature_config`` maps column name to a dict whose ``"type"`` is
        ``"numerical"`` (standard-scaled) or ``"categorical"``
        (label-encoded); columns of any other type are left untouched.
        The input frame is not modified.
        """
        result = df.copy()
        for name, spec in feature_config.items():
            kind = spec["type"]
            if kind == "numerical":
                fitted = StandardScaler()
                result[name] = fitted.fit_transform(df[[name]])
                self.scalers[name] = fitted
            elif kind == "categorical":
                fitted = LabelEncoder()
                result[name] = fitted.fit_transform(df[name])
                self.encoders[name] = fitted
        return result

    def transform(self, df):
        """Apply the previously fitted transformers to a copy of ``df``."""
        result = df.copy()
        # Scalers receive a one-column frame, encoders a plain column.
        for name, fitted in self.scalers.items():
            result[name] = fitted.transform(df[[name]])
        for name, fitted in self.encoders.items():
            result[name] = fitted.transform(df[name])
        return result
Best Practices
1. Feature Naming Conventions
# Good naming convention: each name says what is measured and over which
# time window; the value is the human-readable description.
feature_names = dict(
    driver_avg_distance_7d="Average distance driven in last 7 days",
    ride_count_24h="Number of rides in last 24 hours",
    earnings_total_mtd="Total earnings month-to-date",
)

# Bad naming: no indication of entity, aggregation or time window.
bad_names = ["feature1", "distance", "count"]
2. Feature Documentation
# Example metadata record documenting a single feature.
feature_metadata = dict(
    name="driver_avg_distance_7d",
    description="Average distance driven by driver in last 7 days",
    owner="team@company.com",
    data_type="float64",
    source="ride_events_table",
    update_frequency="daily",
    version="1.2.0",
)
3. Feature Validation
def validate_feature(df, feature_name, constraints):
    """Validate the values of one feature column.

    Args:
        df: DataFrame containing the feature column.
        feature_name: Name of the column to validate.
        constraints: Dict with any of the optional keys ``"min"`` (lower
            bound), ``"max"`` (upper bound) and ``"null_check"`` (truthy to
            reject nulls).

    Returns:
        List of violation messages; empty when every check passes.

    Raises:
        KeyError: If ``feature_name`` is not a column of ``df``.
    """
    column = df[feature_name]
    violations = []
    if "min" in constraints and column.min() < constraints["min"]:
        violations.append(f"{feature_name} below minimum")
    if "max" in constraints and column.max() > constraints["max"]:
        violations.append(f"{feature_name} above maximum")
    # Test the flag's value, not just its presence, so that
    # {"null_check": False} really disables the check.
    if constraints.get("null_check") and column.isnull().any():
        violations.append(f"{feature_name} contains nulls")
    return violations
Summary
Feature stores are essential for managing machine learning features at scale. By providing consistency between training and serving, enabling feature reuse, and supporting both batch and real-time feature serving, feature stores accelerate ML development and improve model performance.