πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Feature Stores

AIOps FoundationsFeature Engineering🟒 Free Lesson

Advertisement

Feature Stores

A feature store is a centralized repository for storing, managing, and serving machine learning features. It provides consistent feature engineering across training and serving, enabling feature reuse and reducing duplication.

Key Benefits

  • Consistency: Same features for training and serving
  • Reuse: Share features across teams and models
  • Scalability: Handle large-scale feature computation
  • Low Latency: Real-time feature serving
  • Feature Discovery: Find and understand available features

Feature Store Architecture

Feast Implementation

Basic Feast Setup

from feast import FeatureStore, Entity, Feature, ValueType
from feast.data_source import SparkSource, FileSource
from feast.feature_view import FeatureView
from feast.entity import Entity
from datetime import timedelta

# Define entity
driver = Entity(
    name="driver_id",
    value_type=ValueType.INT64,
    description="Driver ID"
)

# Define feature view
driver_features = FeatureView(
    name="driver_features",
    entities=["driver_id"],
    ttl=timedelta(days=1),
    schema=[
        Feature(name="average_daily_rides", dtype=ValueType.FLOAT),
        Feature(name="total_distance", dtype=ValueType.FLOAT),
        Feature(name="acceptance_rate", dtype=ValueType.FLOAT),
    ],
    online=True,
    batch_source=FileSource(
        path="data/driver_features.parquet",
        event_timestamp_column="event_timestamp"
    )
)

# Initialize feature store
store = FeatureStore(repo_path=".")

# Apply feature definitions
store.apply([driver, driver_features])

Feature Retrieval

import pandas as pd
from datetime import datetime

class FeatureRetriever:
    def __init__(self, feature_store_path):
        self.store = FeatureStore(repo_path=feature_store_path)
    
    def get_online_features(self, entity_rows, feature_refs):
        """Get real-time features for inference"""
        response = self.store.get_online_features(
            features=feature_refs,
            entity_rows=entity_rows
        )
        return pd.DataFrame(response.to_dict())
    
    def get_historical_features(self, entity_df, feature_refs):
        """Get historical features for training"""
        job = self.store.get_historical_features(
            features=feature_refs,
            entity_df=entity_df
        )
        return job.to_df()
    
    def get_feature_vector(self, entity_id, feature_refs):
        """Get feature vector for single entity"""
        response = self.store.get_online_features(
            features=feature_refs,
            entity_rows=[{"driver_id": entity_id}]
        )
        return response.to_dict()

# Usage
retriever = FeatureRetriever(".")

# For training
training_features = retriever.get_historical_features(
    entity_df=pd.DataFrame({"driver_id": [1, 2, 3], "event_timestamp": [datetime.now()] * 3}),
    feature_refs=["driver_features:average_daily_rides", "driver_features:total_distance"]
)

# For inference
inference_features = retriever.get_online_features(
    entity_rows=[{"driver_id": 1}],
    feature_refs=["driver_features:average_daily_rides", "driver_features:total_distance"]
)

Tecton Implementation

Tecton Feature Definitions

from tecton import tecton_context, FeatureSource, Feature, Entity
from tecton.framework.batch_feature_view import BatchFeatureView
from tecton.framework.stream_feature_view import StreamFeatureView
from tecton.framework.operator import Operator

# Define entities
driver = Entity(name="driver_id", join_keys=["driver_id"])

# Batch feature view
driver_stats = BatchFeatureView(
    name="driver_stats",
    entities=[driver],
    ttl="1d",
    online=True,
    sources=[
        FeatureSource(
            name="ride_events",
            table="ride_events_table",
            timestamp_field="event_timestamp"
        )
    ],
    features=[
        Feature(name="ride_count_7d", dtype="int64"),
        Feature(name="avg_distance_7d", dtype="float64"),
        Feature(name="total_earnings_7d", dtype="float64"),
    ],
    owner="team@company.com",
    description="7-day rolling driver statistics"
)

# Stream feature view
driver_stream = StreamFeatureView(
    name="driver_stream",
    entities=[driver],
    ttl="5m",
    online=True,
    sources=[
        FeatureSource(
            name="ride_events_stream",
            kafka_topic="ride_events",
            timestamp_field="event_timestamp"
        )
    ],
    features=[
        Feature(name="current_ride_status", dtype="string"),
        Feature(name="last_location_lat", dtype="float64"),
        Feature(name="last_location_lon", dtype="float64"),
    ],
    owner="team@company.com",
    description="Real-time driver features"
)

Offline vs Online Stores

AspectOffline StoreOnline Store
LatencySeconds to minutesMilliseconds
Use CaseTraining, batch inferenceReal-time serving
StorageData lake, warehouseRedis, DynamoDB
ConsistencyStrong consistencyEventual consistency
CostLower cost per GBHigher cost per GB
Access PatternBatch queriesPoint lookups

Offline Store Implementation

import pandas as pd
from typing import Dict, List

class OfflineFeatureStore:
    def __init__(self, storage_path):
        self.storage_path = storage_path
    
    def write_features(self, feature_df, feature_group_name):
        """Write features to offline store"""
        # Partition by date
        feature_df["date"] = pd.to_datetime(feature_df["event_timestamp"]).dt.date
        
        for date, group in feature_df.groupby("date"):
            file_path = f"{self.storage_path}/{feature_group_name}/{date}/features.parquet"
            group.to_parquet(file_path, index=False)
    
    def read_features(self, feature_group_name, start_date, end_date):
        """Read features from offline store"""
        import glob
        
        pattern = f"{self.storage_path}/{feature_group_name}/*/features.parquet"
        files = glob.glob(pattern)
        
        # Filter by date range
        filtered_files = []
        for file in files:
            date_str = file.split("/")[-2]
            file_date = pd.to_datetime(date_str).date()
            if start_date <= file_date <= end_date:
                filtered_files.append(file)
        
        # Read and concatenate
        dfs = [pd.read_parquet(f) for f in filtered_files]
        return pd.concat(dfs, ignore_index=True)

Online Store Implementation

import redis
import json
from typing import Dict, List, Optional

class OnlineFeatureStore:
    def __init__(self, redis_host, redis_port):
        self.redis = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
    
    def write_features(self, entity_id: str, features: Dict, feature_group: str):
        """Write features to online store"""
        key = f"{feature_group}:{entity_id}"
        self.redis.hset(key, mapping=features)
    
    def read_features(self, entity_id: str, feature_group: str, feature_names: List[str]) -> Optional[Dict]:
        """Read features from online store"""
        key = f"{feature_group}:{entity_id}"
        values = self.redis.hget(key, feature_names)
        
        if values is None:
            return None
        
        return dict(zip(feature_names, values))
    
    def read_features_batch(self, entity_ids: List[str], feature_group: str, feature_names: List[str]) -> List[Dict]:
        """Read features for multiple entities"""
        results = []
        for entity_id in entity_ids:
            features = self.read_features(entity_id, feature_group, feature_names)
            results.append(features)
        return results
    
    def delete_features(self, entity_id: str, feature_group: str):
        """Delete features for entity"""
        key = f"{feature_group}:{entity_id}"
        self.redis.delete(key)

Mathematical Foundation

Feature Importance

Feature importance can be measured using various metrics:

Feature Importance (Permutation)

FIj=1Kβˆ‘k=1K(L(y,f(x))βˆ’L(y,f(xΟ€j)))FI_j = \frac{1}{K}\sum_{k=1}^{K}\left(L(y, f(x)) - L(y, f(x_{\pi_j}))\right)

Where:

  • ( L ) is the loss function
  • ( f ) is the model
  • ( x_{\pi_j} ) is the feature ( j ) permuted
  • ( K ) is the number of permutations

Feature Correlation

The correlation matrix for feature selection:

Feature Correlation

Rij=Cov(Xi,Xj)σiσjR_{ij} = \frac{\text{Cov}(X_i, X_j)}{\sigma_i \sigma_j}

Feature Normalization

Min-max normalization for feature scaling:

Min-Max Normalization

Xnorm=Xβˆ’XminXmaxβˆ’XminX_{norm} = \frac{X - X_{min}}{X_{max} - X_{min}}

Feature Engineering Patterns

Real-time Feature Computation

class RealTimeFeatureEngine:
    def __init__(self):
        self.window_sizes = [5, 15, 60]  # minutes
    
    def compute_rolling_features(self, events, entity_id):
        """Compute rolling features for real-time"""
        entity_events = events[events["entity_id"] == entity_id]
        
        features = {}
        for window in self.window_sizes:
            window_events = entity_events[
                entity_events["timestamp"] > datetime.now() - timedelta(minutes=window)
            ]
            
            features[f"count_{window}m"] = len(window_events)
            features[f"sum_{window}m"] = window_events["value"].sum()
            features[f"avg_{window}m"] = window_events["value"].mean()
        
        return features

Feature Transformation Pipeline

from sklearn.preprocessing import StandardScaler, LabelEncoder
import numpy as np

class FeatureTransformer:
    def __init__(self):
        self.scalers = {}
        self.encoders = {}
    
    def fit_transform(self, df, feature_config):
        """Fit and transform features"""
        transformed_df = df.copy()
        
        for feature, config in feature_config.items():
            if config["type"] == "numerical":
                scaler = StandardScaler()
                transformed_df[feature] = scaler.fit_transform(df[[feature]])
                self.scalers[feature] = scaler
            
            elif config["type"] == "categorical":
                encoder = LabelEncoder()
                transformed_df[feature] = encoder.fit_transform(df[feature])
                self.encoders[feature] = encoder
        
        return transformed_df
    
    def transform(self, df):
        """Transform features using fitted transformers"""
        transformed_df = df.copy()
        
        for feature, scaler in self.scalers.items():
            transformed_df[feature] = scaler.transform(df[[feature]])
        
        for feature, encoder in self.encoders.items():
            transformed_df[feature] = encoder.transform(df[feature])
        
        return transformed_df

Best Practices

1. Feature Naming Conventions

# Good naming convention
feature_names = {
    "driver_avg_distance_7d": "Average distance driven in last 7 days",
    "ride_count_24h": "Number of rides in last 24 hours",
    "earnings_total_mtd": "Total earnings month-to-date"
}

# Bad naming
bad_names = ["feature1", "distance", "count"]

2. Feature Documentation

feature_metadata = {
    "name": "driver_avg_distance_7d",
    "description": "Average distance driven by driver in last 7 days",
    "owner": "team@company.com",
    "data_type": "float64",
    "source": "ride_events_table",
    "update_frequency": "daily",
    "version": "1.2.0"
}

3. Feature Validation

def validate_feature(df, feature_name, constraints):
    """Validate feature values"""
    violations = []
    
    if "min" in constraints and df[feature_name].min() < constraints["min"]:
        violations.append(f"{feature_name} below minimum")
    
    if "max" in constraints and df[feature_name].max() > constraints["max"]:
        violations.append(f"{feature_name} above maximum")
    
    if "null_check" in constraints and df[feature_name].isnull().any():
        violations.append(f"{feature_name} contains nulls")
    
    return violations

Summary

Feature stores are essential for managing machine learning features at scale. By providing consistency between training and serving, enabling feature reuse, and supporting both batch and real-time feature serving, feature stores accelerate ML development and improve model performance.

⭐

Premium Content

Feature Stores

Unlock this lesson and 900+ advanced tutorials with a Premium plan.

🎯End-to-end Projects
πŸ’ΌInterview Prep
πŸ“œCertificates
🀝Community Access

Already a member? Log in

Need Expert AI Ops & LLM Ops Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement