πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Feature Store Design: Online vs Offline, Point-in-Time Correctness

MLOpsFeature Engineering⭐ Premium

Advertisement

Interview Question (Hard) β€” Asked at: Uber, Airbnb, Spotify, LinkedIn, Stripe

"Design a feature store that supports both batch training and real-time serving. How do you handle point-in-time correctness, feature sharing across teams, and data freshness guarantees?"

Feature Store Architecture Overview

A feature store is a centralized platform for storing, accessing, and serving machine learning features. It bridges the gap between data engineering and ML engineering, providing a unified interface for feature creation, storage, and retrieval.

Core Components

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    Feature Store Architecture                    β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚                   Feature Definition                    β”‚   β”‚
β”‚  β”‚         (Feast, Tecton, Hopsworks Feature Server)       β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                              β”‚                                  β”‚
β”‚         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”            β”‚
β”‚         β–Ό                    β–Ό                    β–Ό            β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”        β”‚
β”‚  β”‚  Offline β”‚      β”‚   Feature    β”‚      β”‚  Online  β”‚        β”‚
β”‚  β”‚  Store   β”‚      β”‚   Registry   β”‚      β”‚  Store   β”‚        β”‚
β”‚  β”‚(S3/GCS/  β”‚      β”‚(Metadata DB) β”‚      β”‚(Redis/   β”‚        β”‚
β”‚  β”‚ Delta)   β”‚      β”‚              β”‚      β”‚ DynamoDB)β”‚        β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜        β”‚
β”‚       β”‚                  β”‚                   β”‚                β”‚
β”‚       β”‚   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚                β”‚
β”‚       β”‚   β”‚        Feature Transform      β”‚  β”‚                β”‚
β”‚       β”‚   β”‚  (Spark/Flink/Beam Streaming) β”‚  β”‚                β”‚
β”‚       β”‚   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚                β”‚
β”‚       β”‚                                      β”‚                β”‚
β”‚       β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                β”‚
β”‚                      β–Ό                                        β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚  β”‚              Training Data Pipeline                      β”‚  β”‚
β”‚  β”‚    (Point-in-time joins, Feature snapshots)              β”‚  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Online vs Offline Store Design

Offline Store (Batch Features)

The offline store is optimized for large-scale batch reads, training data generation, and historical feature computation.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

class OfflineFeatureStore:
    def __init__(self, spark: SparkSession, storage_path: str):
        self.spark = spark
        self.storage_path = storage_path
    
    def create_feature_table(self, source_df, entity_columns, 
                              timestamp_column, feature_columns):
        """Create an offline feature table with proper partitioning."""
        
        # Add event timestamp for point-in-time queries
        df = source_df.select(
            *entity_columns,
            timestamp_column,
            *feature_columns
        )
        
        # Partition by date for efficient queries
        df = df.withColumn(
            "event_date", 
            F.to_date(F.col(timestamp_column))
        )
        
        # Write to Delta Lake with partitioning
        df.write \
            .format("delta") \
            .partitionBy("event_date") \
            .mode("overwrite") \
            .save(f"{self.storage_path}/features/{feature_table_name}")
        
        return df
    
    def point_in_time_join(self, entities_df, feature_table_name, 
                           feature_columns, timestamp_column):
        """Perform point-in-time correct join for training data."""
        
        feature_df = self.spark.read.format("delta").load(
            f"{self.storage_path}/features/{feature_table_name}"
        )
        
        # Window for point-in-time correctness
        window = Window.partitionBy("entity_id").orderBy("event_timestamp")
        
        # For each entity at each timestamp, get the latest feature value
        # that was computed BEFORE the entity timestamp
        joined_df = entities_df.alias("e").join(
            feature_df.alias("f"),
            (F.col("e.entity_id") == F.col("f.entity_id")) &
            (F.col("f.event_timestamp") <= F.col(f"e.{timestamp_column}")),
            "left"
        ).withColumn(
            "rn",
            F.row_number().over(
                Window.partitionBy("e.entity_id", f"e.{timestamp_column}")
                .orderBy(F.col("f.event_timestamp").desc())
            )
        ).filter(F.col("rn") == 1).drop("rn")
        
        return joined_df.select(
            "e.entity_id",
            f"e.{timestamp_column}",
            *[f"f.{col}" for col in feature_columns]
        )
    
    def compute_training_dataset(self, label_df, feature_tables: dict):
        """Compute complete training dataset with point-in-time joins."""
        
        result = label_df
        
        for table_name, config in feature_tables.items():
            result = self.point_in_time_join(
                result,
                table_name,
                config['features'],
                config['timestamp']
            )
        
        return result

Online Store (Real-time Features)

The online store is optimized for low-latency, high-throughput feature serving.

import redis
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import numpy as np

class OnlineFeatureStore:
    def __init__(self, redis_host: str, redis_port: int, 
                 redis_db: int = 0):
        self.redis = redis.Redis(
            host=redis_host, 
            port=redis_port, 
            db=redis_db,
            decode_responses=True
        )
        self.ttl_seconds = 86400 * 7  # 7 days default TTL
    
    def _build_key(self, entity_id: str, feature_group: str) -> str:
        """Build Redis key for feature lookup."""
        return f"feature:{feature_group}:{entity_id}"
    
    def set_features(self, entity_id: str, feature_group: str,
                     features: Dict[str, float], 
                     event_timestamp: datetime = None):
        """Store features for an entity."""
        key = self._build_key(entity_id, feature_group)
        
        if event_timestamp is None:
            event_timestamp = datetime.utcnow()
        
        data = {
            'features': features,
            'event_timestamp': event_timestamp.isoformat(),
            'computed_at': datetime.utcnow().isoformat()
        }
        
        # Use pipeline for atomic operations
        pipe = self.redis.pipeline()
        pipe.setex(key, self.ttl_seconds, json.dumps(data))
        pipe.setex(
            f"meta:{key}", 
            self.ttl_seconds,
            json.dumps({
                'feature_group': feature_group,
                'entity_id': entity_id,
                'feature_names': list(features.keys())
            })
        )
        pipe.execute()
    
    def get_features(self, entity_id: str, feature_group: str,
                     feature_names: List[str] = None) -> Optional[Dict]:
        """Retrieve features for an entity."""
        key = self._build_key(entity_id, feature_group)
        
        data = self.redis.get(key)
        if data is None:
            return None
        
        data = json.loads(data)
        features = data['features']
        
        if feature_names:
            features = {k: v for k, v in features.items() 
                       if k in feature_names}
        
        return {
            'features': features,
            'event_timestamp': data['event_timestamp'],
            'computed_at': data['computed_at']
        }
    
    def get_online_features(self, entities: List[Dict]) -> List[Dict]:
        """Batch feature retrieval for multiple entities."""
        results = []
        
        pipe = self.redis.pipeline()
        keys = []
        
        for entity in entities:
            key = self._build_key(
                entity['entity_id'], 
                entity['feature_group']
            )
            keys.append(key)
            pipe.get(key)
        
        raw_results = pipe.execute()
        
        for entity, raw_data in zip(entities, raw_results):
            if raw_data:
                data = json.loads(raw_data)
                features = data['features']
                
                # Apply default values for missing features
                defaults = entity.get('defaults', {})
                for feat_name, default_val in defaults.items():
                    if feat_name not in features:
                        features[feat_name] = default_val
                
                results.append({
                    'entity_id': entity['entity_id'],
                    'features': features,
                    'event_timestamp': data['event_timestamp']
                })
            else:
                results.append({
                    'entity_id': entity['entity_id'],
                    'features': entity.get('defaults', {}),
                    'event_timestamp': None
                })
        
        return results
    
    def set_features_with_versioning(self, entity_id: str, 
                                      feature_group: str,
                                      features: Dict[str, float],
                                      feature_version: str):
        """Store features with version tracking."""
        key = f"feature:{feature_group}:{entity_id}:v{feature_version}"
        
        data = {
            'features': features,
            'feature_version': feature_version,
            'event_timestamp': datetime.utcnow().isoformat()
        }
        
        self.redis.setex(key, self.ttl_seconds, json.dumps(data))
        
        # Update version pointer
        pointer_key = f"feature:{feature_group}:{entity_id}:current"
        self.redis.set(pointer_key, feature_version)

ℹ️

Online stores typically target p99 latency < 10ms. Use Redis Cluster for horizontal scaling, or DynamoDB for serverless auto-scaling. Consider feature caching at the application layer for extremely high QPS.

Point-in-Time Correctness

Point-in-time correctness ensures that during training, features are joined with labels using only data that was available at the time of prediction.

The Problem: Data Leakage

# WRONG - Data Leakage Example
# Using future information to predict past events
def data_leakage_example():
    """
    BAD: User's transaction history includes transactions AFTER the label
    """
    training_data = """
    | user_id | label_date | label | avg_transaction_amount |
    |---------|------------|-------|------------------------|
    | 123     | 2024-01-15 | 1     | 150.00                 |  # WRONG!
    """
    # The avg_transaction_amount on Jan 15 includes transactions from Jan 16+
    # This creates data leakage and artificially high accuracy

# CORRECT - Point-in-time Join
def point_in_time_correct():
    """
    GOOD: Features are computed using only data available BEFORE the label date
    """
    training_data = """
    | user_id | label_date | label | avg_transaction_amount |
    |---------|------------|-------|------------------------|
    | 123     | 2024-01-15 | 1     | 125.00                 |  # CORRECT
    """
    # avg_transaction_amount only includes transactions BEFORE Jan 15

Feast Point-in-Time Join Implementation

from feast import FeatureStore, Entity, Feature, ValueType
from feast import FileSource, FeatureView
from feast.data_source import RequestSource
from feast.protos.feast.types.Value_pb2 import Float
import pandas as pd

# Define entity
user = Entity(
    name="user_id",
    value_type=ValueType.INT64,
    description="User identifier"
)

# Define feature view with offline source
user_features = FeatureView(
    name="user_features",
    entities=["user_id"],
    ttl=timedelta(days=365),
    features=[
        Feature(name="avg_transaction_amount_30d", dtype=Float),
        Feature(name="transaction_count_30d", dtype=Float),
        Feature(name="days_since_last_transaction", dtype=Float),
    ],
    online=True,
    source=FileSource(
        path="s3://feature-store/user_features/",
        event_timestamp_column="event_timestamp"
    )
)

# Point-in-time correct retrieval
store = FeatureStore(repo_path="feature_repo/")

# For training - get historical features
entity_df = pd.DataFrame({
    "user_id": [123, 456, 789],
    "event_timestamp": pd.to_datetime([
        "2024-01-15", "2024-01-16", "2024-01-17"
    ])
})

# This performs point-in-time correct join automatically
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_features:avg_transaction_amount_30d",
        "user_features:transaction_count_30d",
        "user_features:days_since_last_transaction"
    ]
).to_df()

print(training_df)

Streaming Feature Computation

from apache_beam import Pipeline
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterWatermark, AccumulationMode
import apache_beam as beam

class ComputeTransactionFeatures(beam.DoFn):
    def process(self, element, window=beam.DoFn.WindowParam):
        user_id, transactions = element
        
        # Window-based aggregation
        features = {
            'transaction_count': len(transactions),
            'total_amount': sum(t['amount'] for t in transactions),
            'avg_amount': np.mean([t['amount'] for t in transactions]),
            'max_amount': max(t['amount'] for t in transactions),
            'min_amount': min(t['amount'] for t in transactions),
            'std_amount': np.std([t['amount'] for t in transactions]),
        }
        
        yield {
            'user_id': user_id,
            'features': features,
            'window_start': window.start.to_utc_datetime().isoformat(),
            'window_end': window.end.to_utc_datetime().isoformat()
        }

def run_streaming_pipeline():
    options = PipelineOptions()
    
    with Pipeline(options=options) as p:
        (
            p
            | "ReadTransactions" >> beam.io.ReadFromKafka(
                consumer_config={'bootstrap.servers': 'kafka:9092'},
                topics=['transactions']
            )
            | "ParseJSON" >> beam.Map(lambda x: json.loads(x.value))
            | "KeyByUser" >> beam.Map(lambda x: (x['user_id'], x))
            | "Window" >> beam.WindowInto(
                FixedWindows(300),  # 5-minute windows
                trigger=AfterWatermark(
                    early=AfterProcessingTime(60)
                ),
                accumulation_mode=AccumulationMode.DISCARDING
            )
            | "GroupByUser" >> beam.GroupByKey()
            | "ComputeFeatures" >> beam.ParDo(ComputeTransactionFeatures())
            | "WriteToRedis" >> beam.ParDo(WriteToRedisFn())
        )

⚠️

Streaming features require careful handling of late data. Use watermarks and triggers to balance between completeness and latency. Consider using accumulation modes (DISCARDING vs ACCUMULATING) based on your use case.

Feature Registry and Discovery

Feature Metadata Schema

from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel

class FeatureMetadata(BaseModel):
    name: str
    description: str
    data_type: str
    entity: str
    owner: str
    team: str
    tags: List[str]
    created_at: datetime
    updated_at: datetime
    version: str
    online_serving: bool
    offline_serving: bool
    freshness_sla_hours: float
    backward_compatible: bool
    
class FeatureGroupMetadata(BaseModel):
    name: str
    description: str
    entity: str
    features: List[FeatureMetadata]
    source: str
    schedule: Optional[str]
    dependencies: List[str]
    data_quality_checks: List[str]
    
class FeatureRegistry:
    def __init__(self, db_connection):
        self.db = db_connection
    
    def register_feature(self, feature: FeatureMetadata):
        """Register a new feature in the metadata store."""
        query = """
        INSERT INTO features 
        (name, description, data_type, entity, owner, team, tags, 
         created_at, updated_at, version, online_serving, offline_serving,
         freshness_sla_hours, backward_compatible)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
        
        self.db.execute(query, (
            feature.name,
            feature.description,
            feature.data_type,
            feature.entity,
            feature.owner,
            feature.team,
            json.dumps(feature.tags),
            feature.created_at,
            feature.updated_at,
            feature.version,
            feature.online_serving,
            feature.offline_serving,
            feature.freshness_sla_hours,
            feature.backward_compatible
        ))
    
    def search_features(self, query: str = None, 
                        tags: List[str] = None,
                        entity: str = None,
                        team: str = None) -> List[FeatureMetadata]:
        """Search for features by various criteria."""
        conditions = []
        params = []
        
        if query:
            conditions.append("(name LIKE ? OR description LIKE ?)")
            params.extend([f"%{query}%", f"%{query}%"])
        
        if tags:
            conditions.append("tags LIKE ?")
            params.extend([f"%{tag}%" for tag in tags])
        
        if entity:
            conditions.append("entity = ?")
            params.append(entity)
        
        if team:
            conditions.append("team = ?")
            params.append(team)
        
        where_clause = " AND ".join(conditions) if conditions else "1=1"
        
        sql = f"""
        SELECT * FROM features 
        WHERE {where_clause}
        ORDER BY updated_at DESC
        """
        
        results = self.db.execute(sql, params)
        return [FeatureMetadata(**row) for row in results]
    
    def get_feature_lineage(self, feature_name: str) -> dict:
        """Get upstream and downstream lineage for a feature."""
        query = """
        WITH RECURSIVE upstream AS (
            SELECT feature_name, upstream_feature, 1 as depth
            FROM feature_lineage
            WHERE feature_name = ?
            UNION ALL
            SELECT fl.feature_name, fl.upstream_feature, u.depth + 1
            FROM feature_lineage fl
            JOIN upstream u ON fl.feature_name = u.upstream_feature
        ),
        downstream AS (
            SELECT feature_name, downstream_feature, 1 as depth
            FROM feature_lineage
            WHERE upstream_feature = ?
            UNION ALL
            SELECT fl.feature_name, fl.downstream_feature, d.depth + 1
            FROM feature_lineage fl
            JOIN downstream d ON fl.feature_name = d.downstream_feature
        )
        SELECT * FROM upstream
        UNION ALL
        SELECT * FROM downstream
        """
        
        results = self.db.execute(query, (feature_name, feature_name))
        return {
            'upstream': [r for r in results if r['feature_name'] == feature_name],
            'downstream': [r for r in results if r['upstream_feature'] == feature_name]
        }

Feature Transform Patterns

Batch Transform with Spark

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

class BatchFeatureTransform:
    def __init__(self, spark: SparkSession):
        self.spark = spark
    
    def compute_user_features(self, transactions_df, 
                               window_days: int = 30):
        """Compute user-level features from transaction data."""
        
        # Window aggregation
        window_spec = F.window(
            "transaction_time", 
            f"{window_days} days"
        )
        
        user_features = transactions_df \
            .groupBy("user_id", window_spec) \
            .agg(
                F.count("*").alias("transaction_count"),
                F.sum("amount").alias("total_amount"),
                F.avg("amount").alias("avg_amount"),
                F.stddev("amount").alias("std_amount"),
                F.max("amount").alias("max_amount"),
                F.min("amount").alias("min_amount"),
                F.countDistinct("merchant_id").alias("unique_merchants"),
                F.collect_set("category").alias("categories"),
                F.expr("percentile_approx(amount, 0.5)").alias("median_amount"),
            ) \
            .withColumn("amount_range", 
                       F.col("max_amount") - F.col("min_amount")) \
            .withColumn("avg_amount_per_merchant",
                       F.col("total_amount") / F.col("unique_merchants"))
        
        # Time since last transaction
        user_features = user_features \
            .withColumn("hours_since_last",
                       F.unix_timestamp("window.end") - 
                       F.unix_timestamp("last_transaction_time"))
        
        return user_features
    
    def compute_session_features(self, events_df, 
                                  session_timeout_minutes: int = 30):
        """Compute session-level features from event data."""
        
        # Sessionize events
        session_window = F.session_window(
            "event_time", 
            f"{session_timeout_minutes} minutes"
        )
        
        session_features = events_df \
            .groupBy("user_id", session_window) \
            .agg(
                F.count("*").alias("session_length"),
                F.countDistinct("page").alias("unique_pages"),
                F.sum(F.when(F.col("event_type") == "click", 1).otherwise(0))
                    .alias("click_count"),
                F.sum(F.when(F.col("event_type") == "purchase", 1).otherwise(0))
                    .alias("purchase_count"),
                F.first("page").alias("landing_page"),
                F.last("page").alias("exit_page"),
                F.avg("time_on_page").alias("avg_time_on_page"),
                F.min("event_time").alias("session_start"),
                F.max("event_time").alias("session_end"),
            ) \
            .withColumn("session_duration_seconds",
                       F.unix_timestamp("session_end") - 
                       F.unix_timestamp("session_start")) \
            .withColumn("click_through_rate",
                       F.col("click_count") / F.col("session_length")) \
            .withColumn("conversion_rate",
                       F.col("purchase_count") / F.col("session_length"))
        
        return session_features

Real-time Feature Transform with Flink

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
import json

class RealTimeFeatureTransform:
    def __init__(self):
        self.env = StreamExecutionEnvironment.get_execution_environment()
        self.t_env = StreamTableEnvironment.create(self.env)
    
    def define_feature_streams(self):
        """Define streaming feature computation."""
        
        # Source: Kafka transactions
        self.t_env.execute_sql("""
            CREATE TABLE transactions (
                user_id BIGINT,
                amount DOUBLE,
                merchant_id STRING,
                category STRING,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
            ) WITH (
                'connector' = 'kafka',
                'topic' = 'transactions',
                'properties.bootstrap.servers' = 'kafka:9092',
                'format' = 'json'
            )
        """)
        
        # Sink: Redis for online serving
        self.t_env.execute_sql("""
            CREATE TABLE user_features_sink (
                user_id BIGINT,
                feature_window_start TIMESTAMP(3),
                feature_window_end TIMESTAMP(3),
                transaction_count BIGINT,
                total_amount DOUBLE,
                avg_amount DOUBLE,
                max_amount DOUBLE,
                min_amount DOUBLE,
                std_amount DOUBLE,
                PRIMARY KEY (user_id, feature_window_start) NOT ENFORCED
            ) WITH (
                'connector' = 'redis',
                'host' = 'redis',
                'port' = '6379',
                'command' = 'HSET'
            )
        """)
        
        # Feature computation query
        self.t_env.execute_sql("""
            INSERT INTO user_features_sink
            SELECT
                user_id,
                TUMBLE_START(event_time, INTERVAL '5' MINUTE) as feature_window_start,
                TUMBLE_END(event_time, INTERVAL '5' MINUTE) as feature_window_end,
                COUNT(*) as transaction_count,
                SUM(amount) as total_amount,
                AVG(amount) as avg_amount,
                MAX(amount) as max_amount,
                MIN(amount) as min_amount,
                STDDEV(amount) as std_amount
            FROM transactions
            GROUP BY 
                user_id,
                TUMBLE(event_time, INTERVAL '5' MINUTE)
        """)

ℹ️

For sub-10ms feature serving, pre-compute features in batch and store in Redis. For features requiring freshness < 1 minute, use streaming computation with Flink or Kafka Streams.

Feature Store Comparison

FeatureFeastTectonHopsworksAWS SageMaker
Offline StoreParquet/Delta/S3Databricks/S3HopsworksS3
Online StoreRedis/DynamoDBDynamoDB/RedisHopsworksDynamoDB
Point-in-timeβœ…βœ…βœ…βœ…
StreamingVia Spark/FlinkNativeVia KafkaNative
Open Sourceβœ…βŒPartial❌
ManagedβŒβœ…βœ…βœ…

Summary

Feature stores are critical infrastructure for production ML:

  1. Offline Store: Batch features for training with Delta Lake/Parquet
  2. Online Store: Low-latency serving with Redis/DynamoDB
  3. Point-in-time Correctness: Prevent data leakage in training
  4. Feature Registry: Centralized metadata and discovery
  5. Streaming Features: Real-time computation with Flink/Kafka Streams

Master these patterns to build reliable feature infrastructure at scale.

Advertisement