Interview Question (Hard) — Asked at: Uber, Airbnb, Spotify, LinkedIn, Stripe
"Design a feature store that supports both batch training and real-time serving. How do you handle point-in-time correctness, feature sharing across teams, and data freshness guarantees?"
Feature Store Architecture Overview
A feature store is a centralized platform for storing, accessing, and serving machine learning features. It bridges the gap between data engineering and ML engineering, providing a unified interface for feature creation, storage, and retrieval.
Core Components
┌─────────────────────────────────────────────────────────────────┐
│                   Feature Store Architecture                    │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │                   Feature Definition                    │   │
│   │        (Feast, Tecton, Hopsworks Feature Server)        │   │
│   └─────────────────────────────────────────────────────────┘   │
│                                │                                │
│          ┌─────────────────────┼─────────────────────┐          │
│          ▼                     ▼                     ▼          │
│    ┌───────────┐       ┌───────────────┐       ┌───────────┐    │
│    │  Offline  │       │    Feature    │       │  Online   │    │
│    │   Store   │       │   Registry    │       │   Store   │    │
│    │ (S3/GCS/  │       │ (Metadata DB) │       │ (Redis/   │    │
│    │  Delta)   │       │               │       │ DynamoDB) │    │
│    └───────────┘       └───────────────┘       └───────────┘    │
│          │                     │                     │          │
│          │     ┌───────────────┴───────────────┐     │          │
│          │     │       Feature Transform       │     │          │
│          │     │ (Spark/Flink/Beam Streaming)  │     │          │
│          │     └───────────────────────────────┘     │          │
│          │                                           │          │
│          └─────────────────────┬─────────────────────┘          │
│                                ▼                                │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │                 Training Data Pipeline                  │   │
│   │        (Point-in-time joins, Feature snapshots)         │   │
│   └─────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘
Online vs Offline Store Design
Offline Store (Batch Features)
The offline store is optimized for large-scale batch reads, training data generation, and historical feature computation.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
class OfflineFeatureStore:
    """Offline (batch) feature store backed by Delta tables.

    Tables are written under ``{storage_path}/features/{feature_table_name}``
    and read back from the same location for point-in-time joins.
    """

    def __init__(self, spark: SparkSession, storage_path: str):
        self.spark = spark
        self.storage_path = storage_path

    def _table_path(self, feature_table_name):
        """Return the storage location of a feature table."""
        return f"{self.storage_path}/features/{feature_table_name}"

    def create_feature_table(self, source_df, entity_columns,
                             timestamp_column, feature_columns,
                             feature_table_name=None):
        """Create an offline feature table partitioned by event date.

        Args:
            source_df: DataFrame holding entity, timestamp and feature columns.
            entity_columns: Names of the entity key columns to keep.
            timestamp_column: Name of the event-timestamp column.
            feature_columns: Names of the feature columns to keep.
            feature_table_name: Name of the table to write. The original
                code referenced this name without defining it, so it is now
                an explicit (keyword-compatible) parameter.

        Returns:
            The projected DataFrame, including the derived ``event_date``
            partition column.

        Raises:
            ValueError: If ``feature_table_name`` is not provided.
        """
        if not feature_table_name:
            raise ValueError(
                "feature_table_name is required to write a feature table"
            )

        projected = source_df.select(
            *entity_columns,
            timestamp_column,
            *feature_columns,
        )
        # Partition by date so date-bounded reads can prune partitions.
        projected = projected.withColumn(
            "event_date",
            F.to_date(F.col(timestamp_column)),
        )
        # NOTE: mode("overwrite") replaces the existing table contents.
        (
            projected.write
            .format("delta")
            .partitionBy("event_date")
            .mode("overwrite")
            .save(self._table_path(feature_table_name))
        )
        return projected

    def point_in_time_join(self, entities_df, feature_table_name,
                           feature_columns, timestamp_column):
        """Attach the latest feature row at or before each entity timestamp.

        The join assumes ``entities_df`` has an ``entity_id`` column and that
        the stored feature table has ``entity_id`` and ``event_timestamp``
        columns (these names are hard-coded in the join condition).

        Args:
            entities_df: DataFrame of entity rows (e.g. labels).
            feature_table_name: Name of the stored feature table.
            feature_columns: Feature columns to bring over.
            timestamp_column: Timestamp column in ``entities_df``.

        Returns:
            Every column of ``entities_df`` plus the requested feature
            columns. Entity rows with no eligible feature row get nulls
            (left join). Entity-side columns are preserved so that chained
            joins do not lose labels or previously joined features.
        """
        feature_df = self.spark.read.format("delta").load(
            self._table_path(feature_table_name)
        )

        entity_ts = F.col(f"e.{timestamp_column}")
        feature_ts = F.col("f.event_timestamp")

        # Keep only feature rows observed at or before the entity timestamp;
        # anything later would leak future information into training data.
        candidates = entities_df.alias("e").join(
            feature_df.alias("f"),
            (F.col("e.entity_id") == F.col("f.entity_id"))
            & (feature_ts <= entity_ts),
            "left",
        )

        # Rank candidates newest-first per (entity, timestamp) and keep one.
        newest_first = (
            Window.partitionBy(F.col("e.entity_id"), entity_ts)
            .orderBy(feature_ts.desc())
        )
        latest = (
            candidates
            .withColumn("rn", F.row_number().over(newest_first))
            .filter(F.col("rn") == 1)
        )

        return latest.select(
            "e.*",
            *[F.col(f"f.{name}") for name in feature_columns],
        )

    def compute_training_dataset(self, label_df, feature_tables: dict):
        """Join every configured feature table onto the label DataFrame.

        Args:
            label_df: DataFrame of labels keyed by entity and timestamp.
            feature_tables: Maps table name to a config dict with keys
                ``'features'`` (column names) and ``'timestamp'`` (the
                timestamp column in the label data).

        Returns:
            ``label_df`` extended with the features of every table.
        """
        result = label_df
        for table_name, config in feature_tables.items():
            result = self.point_in_time_join(
                result,
                table_name,
                config['features'],
                config['timestamp'],
            )
        return result
Online Store (Real-time Features)
The online store is optimized for low-latency, high-throughput feature serving.
import redis
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import numpy as np
class OnlineFeatureStore:
    """Redis-backed online feature store.

    Each (feature_group, entity_id) pair is stored as one JSON document
    under ``feature:{feature_group}:{entity_id}`` with an expiry of
    ``ttl_seconds``.
    """

    def __init__(self, redis_host: str, redis_port: int,
                 redis_db: int = 0, ttl_seconds: int = 86400 * 7):
        """Connect to Redis.

        Args:
            redis_host: Redis host name.
            redis_port: Redis port.
            redis_db: Redis logical database index.
            ttl_seconds: Expiry applied to every key written by this store
                (defaults to 7 days).
        """
        self.redis = redis.Redis(
            host=redis_host,
            port=redis_port,
            db=redis_db,
            decode_responses=True
        )
        self.ttl_seconds = ttl_seconds

    def _build_key(self, entity_id: str, feature_group: str) -> str:
        """Build Redis key for feature lookup."""
        return f"feature:{feature_group}:{entity_id}"

    def set_features(self, entity_id: str, feature_group: str,
                     features: Dict[str, float],
                     event_timestamp: Optional[datetime] = None):
        """Store features for an entity.

        Writes the feature payload and a ``meta:``-prefixed companion key in
        a single pipeline, both with the store's TTL.

        Args:
            entity_id: Entity identifier.
            feature_group: Feature group the values belong to.
            features: Mapping of feature name to value.
            event_timestamp: Time the features describe; defaults to now.
        """
        key = self._build_key(entity_id, feature_group)
        now = datetime.utcnow()
        if event_timestamp is None:
            event_timestamp = now

        payload = json.dumps({
            'features': features,
            'event_timestamp': event_timestamp.isoformat(),
            'computed_at': now.isoformat()
        })
        metadata = json.dumps({
            'feature_group': feature_group,
            'entity_id': entity_id,
            'feature_names': list(features.keys())
        })

        # Queue both writes on one pipeline so they are sent together.
        pipe = self.redis.pipeline()
        pipe.setex(key, self.ttl_seconds, payload)
        pipe.setex(f"meta:{key}", self.ttl_seconds, metadata)
        pipe.execute()

    def get_features(self, entity_id: str, feature_group: str,
                     feature_names: Optional[List[str]] = None) -> Optional[Dict]:
        """Retrieve features for an entity.

        Args:
            entity_id: Entity identifier.
            feature_group: Feature group to read.
            feature_names: If given and non-empty, only these features are
                returned.

        Returns:
            A dict with ``features``, ``event_timestamp`` and
            ``computed_at``, or ``None`` when the key does not exist.
        """
        raw = self.redis.get(self._build_key(entity_id, feature_group))
        if raw is None:
            return None

        data = json.loads(raw)
        features = data['features']
        if feature_names:
            wanted = set(feature_names)
            features = {name: value for name, value in features.items()
                        if name in wanted}

        return {
            'features': features,
            'event_timestamp': data['event_timestamp'],
            'computed_at': data['computed_at']
        }

    def get_online_features(self, entities: List[Dict]) -> List[Dict]:
        """Batch feature retrieval for multiple entities.

        Args:
            entities: Dicts with ``entity_id`` and ``feature_group`` keys and
                an optional ``defaults`` mapping of fallback feature values.

        Returns:
            One result per input entity, in input order. Missing features
            are filled from ``defaults``; entities with no stored document
            get a copy of ``defaults`` and ``event_timestamp`` of ``None``.
        """
        pipe = self.redis.pipeline()
        for entity in entities:
            pipe.get(self._build_key(entity['entity_id'],
                                     entity['feature_group']))
        raw_results = pipe.execute()

        results = []
        for entity, raw_data in zip(entities, raw_results):
            defaults = entity.get('defaults', {})
            if raw_data:
                data = json.loads(raw_data)
                features = data['features']
                # Apply default values for missing features
                for feat_name, default_val in defaults.items():
                    features.setdefault(feat_name, default_val)
                event_timestamp = data['event_timestamp']
            else:
                # Copy so callers mutating the result cannot alter the
                # defaults dict they passed in.
                features = dict(defaults)
                event_timestamp = None

            results.append({
                'entity_id': entity['entity_id'],
                'features': features,
                'event_timestamp': event_timestamp
            })
        return results

    def set_features_with_versioning(self, entity_id: str,
                                     feature_group: str,
                                     features: Dict[str, float],
                                     feature_version: str):
        """Store features with version tracking.

        Writes the payload under ``...:v{feature_version}`` and updates the
        ``...:current`` pointer. Both writes go through one pipeline and
        both carry the store's TTL, so the pointer cannot outlive the data
        it points to.

        Args:
            entity_id: Entity identifier.
            feature_group: Feature group the values belong to.
            features: Mapping of feature name to value.
            feature_version: Version label to store and mark as current.
        """
        base_key = self._build_key(entity_id, feature_group)
        payload = json.dumps({
            'features': features,
            'feature_version': feature_version,
            'event_timestamp': datetime.utcnow().isoformat()
        })

        pipe = self.redis.pipeline()
        pipe.setex(f"{base_key}:v{feature_version}", self.ttl_seconds, payload)
        # Update version pointer
        pipe.setex(f"{base_key}:current", self.ttl_seconds, feature_version)
        pipe.execute()
ℹ️
Online stores typically target p99 latency < 10ms. Use Redis Cluster for horizontal scaling, or DynamoDB for serverless auto-scaling. Consider feature caching at the application layer for extremely high QPS.
Point-in-Time Correctness
Point-in-time correctness ensures that during training, features are joined with labels using only data that was available at the time of prediction.
The Problem: Data Leakage
# WRONG - Data Leakage Example
# Using future information to predict past events
def data_leakage_example():
    """Show a leaky training row (anti-pattern, illustration only).

    BAD: the user's transaction history includes transactions made AFTER
    the label date, so the feature value carries future information.
    """
    leaky_rows = """
    | user_id | label_date | label | avg_transaction_amount |
    |---------|------------|-------|------------------------|
    | 123 | 2024-01-15 | 1 | 150.00 | # WRONG!
    """
    # Here avg_transaction_amount for Jan 15 was computed with transactions
    # from Jan 16 onwards, which leaks the future into training and makes
    # offline accuracy look artificially high.
# CORRECT - Point-in-time Join
def point_in_time_correct():
    """Show a point-in-time correct training row (illustration only).

    GOOD: the feature is computed using only data that was available
    BEFORE the label date.
    """
    correct_rows = """
    | user_id | label_date | label | avg_transaction_amount |
    |---------|------------|-------|------------------------|
    | 123 | 2024-01-15 | 1 | 125.00 | # CORRECT
    """
    # Here avg_transaction_amount covers only transactions BEFORE Jan 15.
Feast Point-in-Time Join Implementation
from feast import FeatureStore, Entity, Feature, ValueType
from feast import FileSource, FeatureView
from feast.data_source import RequestSource
from feast.protos.feast.types.Value_pb2 import Float
import pandas as pd
# `timedelta` is used for the feature view TTL below; imported here so this
# example does not depend on imports from another snippet.
from datetime import timedelta

# Define entity
user = Entity(
    name="user_id",
    value_type=ValueType.INT64,
    description="User identifier"
)

# Define feature view with offline source.
# Feature dtypes use the already-imported ValueType enum: the legacy
# `Feature(name=..., dtype=...)` API used throughout this example takes a
# ValueType member, not a name from the protobuf Value module.
user_features = FeatureView(
    name="user_features",
    entities=["user_id"],
    ttl=timedelta(days=365),
    features=[
        Feature(name="avg_transaction_amount_30d", dtype=ValueType.FLOAT),
        Feature(name="transaction_count_30d", dtype=ValueType.FLOAT),
        Feature(name="days_since_last_transaction", dtype=ValueType.FLOAT),
    ],
    online=True,
    source=FileSource(
        path="s3://feature-store/user_features/",
        event_timestamp_column="event_timestamp"
    )
)

# Point-in-time correct retrieval
store = FeatureStore(repo_path="feature_repo/")

# For training - get historical features.
# Each row pairs an entity key with the timestamp at which the prediction
# would have been made.
entity_df = pd.DataFrame({
    "user_id": [123, 456, 789],
    "event_timestamp": pd.to_datetime([
        "2024-01-15", "2024-01-16", "2024-01-17"
    ])
})

# This performs point-in-time correct join automatically
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_features:avg_transaction_amount_30d",
        "user_features:transaction_count_30d",
        "user_features:days_since_last_transaction"
    ]
).to_df()
print(training_df)
Streaming Feature Computation
from apache_beam import Pipeline
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterWatermark, AccumulationMode
import apache_beam as beam
class ComputeTransactionFeatures(beam.DoFn):
    """Aggregate one user's windowed transactions into summary features."""

    def process(self, element, window=beam.DoFn.WindowParam):
        """Compute per-window transaction statistics for one user.

        Args:
            element: ``(user_id, transactions)`` pair, where ``transactions``
                is an iterable of dicts with an ``'amount'`` key.
            window: The window the element belongs to (injected by Beam).

        Yields:
            A dict with ``user_id``, the computed ``features`` and the
            window bounds as ISO-8601 UTC strings. Nothing is yielded for
            an empty group.
        """
        user_id, transactions = element

        # Materialize the amounts once: the grouped values are only
        # guaranteed to be iterable, so calling len() on them or iterating
        # them repeatedly is not safe.
        amounts = [t['amount'] for t in transactions]
        if not amounts:
            return

        # Window-based aggregation
        features = {
            'transaction_count': len(amounts),
            'total_amount': sum(amounts),
            'avg_amount': np.mean(amounts),
            'max_amount': max(amounts),
            'min_amount': min(amounts),
            'std_amount': np.std(amounts),
        }
        yield {
            'user_id': user_id,
            'features': features,
            'window_start': window.start.to_utc_datetime().isoformat(),
            'window_end': window.end.to_utc_datetime().isoformat()
        }
def run_streaming_pipeline():
    """Build and run the streaming transaction-feature pipeline.

    Reads transactions from Kafka, groups them per user in 5-minute fixed
    windows (with early firings every 60 seconds of processing time),
    computes features and hands them to ``WriteToRedisFn``.

    NOTE(review): ``WriteToRedisFn`` is not defined in the visible part of
    this file; it must be provided elsewhere for the pipeline to run.
    """
    # Imported locally: the original referenced AfterProcessingTime without
    # importing it, and ReadFromKafka lives in apache_beam.io.kafka.
    from apache_beam.io.kafka import ReadFromKafka
    from apache_beam.transforms.trigger import AfterProcessingTime

    # Kafka is an unbounded source, so run the pipeline in streaming mode.
    options = PipelineOptions(streaming=True)

    with Pipeline(options=options) as p:
        (
            p
            | "ReadTransactions" >> ReadFromKafka(
                consumer_config={'bootstrap.servers': 'kafka:9092'},
                topics=['transactions']
            )
            # Records arrive as (key, value) byte tuples by default.
            | "ParseJSON" >> beam.Map(lambda record: json.loads(record[1]))
            | "KeyByUser" >> beam.Map(lambda x: (x['user_id'], x))
            | "Window" >> beam.WindowInto(
                FixedWindows(300),  # 5-minute windows
                trigger=AfterWatermark(
                    early=AfterProcessingTime(60)
                ),
                accumulation_mode=AccumulationMode.DISCARDING
            )
            | "GroupByUser" >> beam.GroupByKey()
            | "ComputeFeatures" >> beam.ParDo(ComputeTransactionFeatures())
            | "WriteToRedis" >> beam.ParDo(WriteToRedisFn())
        )
⚠️
Streaming features require careful handling of late data. Use watermarks and triggers to balance between completeness and latency. Consider using accumulation modes (DISCARDING vs ACCUMULATING) based on your use case.
Feature Registry and Discovery
Feature Metadata Schema
from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel
class FeatureMetadata(BaseModel):
    """Registry record describing a single feature."""

    # Identity and description
    name: str
    description: str
    data_type: str
    entity: str

    # Ownership and discovery
    owner: str
    team: str
    tags: List[str]

    # Lifecycle
    created_at: datetime
    updated_at: datetime
    version: str

    # Serving configuration
    online_serving: bool
    offline_serving: bool
    freshness_sla_hours: float
    backward_compatible: bool
class FeatureGroupMetadata(BaseModel):
    """Registry record describing a group of features that share an entity."""

    name: str
    description: str
    entity: str
    features: List[FeatureMetadata]
    source: str
    # Explicit default so the field is optional under both pydantic v1 and
    # v2 (v2 treats an Optional annotation without a default as required).
    schedule: Optional[str] = None
    dependencies: List[str]
    data_quality_checks: List[str]
class FeatureRegistry:
    """Metadata store for registering, searching and tracing features.

    ``db_connection`` must expose ``execute(sql, params)`` with ``?``
    placeholders and return an iterable of rows addressable by column name
    (for example a ``sqlite3`` connection using ``sqlite3.Row``).
    """

    def __init__(self, db_connection):
        self.db = db_connection

    def register_feature(self, feature: "FeatureMetadata"):
        """Register a new feature in the metadata store.

        ``tags`` is stored as a JSON-encoded list.
        """
        query = """
            INSERT INTO features
            (name, description, data_type, entity, owner, team, tags,
            created_at, updated_at, version, online_serving, offline_serving,
            freshness_sla_hours, backward_compatible)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
        self.db.execute(query, (
            feature.name,
            feature.description,
            feature.data_type,
            feature.entity,
            feature.owner,
            feature.team,
            json.dumps(feature.tags),
            feature.created_at,
            feature.updated_at,
            feature.version,
            feature.online_serving,
            feature.offline_serving,
            feature.freshness_sla_hours,
            feature.backward_compatible
        ))

    @staticmethod
    def _decode_row(row) -> dict:
        """Turn a DB row into keyword arguments for ``FeatureMetadata``."""
        record = {key: row[key] for key in row.keys()}
        raw_tags = record.get("tags")
        # register_feature stores tags as JSON text; decode it back to a list.
        if isinstance(raw_tags, str):
            record["tags"] = json.loads(raw_tags)
        return record

    def search_features(self, query: Optional[str] = None,
                        tags: Optional[List[str]] = None,
                        entity: Optional[str] = None,
                        team: Optional[str] = None) -> "List[FeatureMetadata]":
        """Search for features by various criteria.

        All supplied criteria are combined with AND.

        Args:
            query: Substring matched against name or description.
            tags: Every tag must match; each is a substring match against
                the JSON-encoded ``tags`` column.
            entity: Exact entity match.
            team: Exact team match.

        Returns:
            Matching features, most recently updated first.
        """
        conditions = []
        params = []

        if query:
            conditions.append("(name LIKE ? OR description LIKE ?)")
            params.extend([f"%{query}%", f"%{query}%"])

        # One placeholder per tag, so placeholders and params stay in step.
        for tag in tags or []:
            conditions.append("tags LIKE ?")
            params.append(f"%{tag}%")

        if entity:
            conditions.append("entity = ?")
            params.append(entity)

        if team:
            conditions.append("team = ?")
            params.append(team)

        where_clause = " AND ".join(conditions) if conditions else "1=1"
        sql = f"""
            SELECT * FROM features
            WHERE {where_clause}
            ORDER BY updated_at DESC
        """
        results = self.db.execute(sql, params)
        return [FeatureMetadata(**self._decode_row(row)) for row in results]

    def get_feature_lineage(self, feature_name: str,
                            max_depth: int = 32) -> dict:
        """Get upstream and downstream lineage for a feature.

        Walks the ``feature_lineage(feature_name, upstream_feature)`` edge
        table in both directions.

        Args:
            feature_name: Feature whose lineage is requested.
            max_depth: Maximum number of hops followed in each direction;
                bounds the recursion if the lineage graph contains a cycle.

        Returns:
            ``{'upstream': [...], 'downstream': [...]}`` where every entry
            is a dict with ``feature_name``, ``upstream_feature`` and
            ``depth`` (1 = direct edge), ordered by depth.
        """
        query = """
            WITH RECURSIVE upstream(feature_name, upstream_feature, depth) AS (
                SELECT feature_name, upstream_feature, 1
                FROM feature_lineage
                WHERE feature_name = ?
                UNION ALL
                SELECT fl.feature_name, fl.upstream_feature, u.depth + 1
                FROM feature_lineage fl
                JOIN upstream u ON fl.feature_name = u.upstream_feature
                WHERE u.depth < ?
            ),
            downstream(feature_name, upstream_feature, depth) AS (
                SELECT feature_name, upstream_feature, 1
                FROM feature_lineage
                WHERE upstream_feature = ?
                UNION ALL
                SELECT fl.feature_name, fl.upstream_feature, d.depth + 1
                FROM feature_lineage fl
                JOIN downstream d ON fl.upstream_feature = d.feature_name
                WHERE d.depth < ?
            )
            SELECT 'upstream' AS direction, feature_name, upstream_feature, depth
            FROM upstream
            UNION ALL
            SELECT 'downstream' AS direction, feature_name, upstream_feature, depth
            FROM downstream
        """
        rows = self.db.execute(
            query, (feature_name, max_depth, feature_name, max_depth)
        )

        # Bucket by the direction column computed in SQL; a single pass also
        # works when the driver returns a one-shot cursor.
        lineage = {'upstream': [], 'downstream': []}
        for row in rows:
            lineage[row['direction']].append({
                'feature_name': row['feature_name'],
                'upstream_feature': row['upstream_feature'],
                'depth': row['depth'],
            })
        for edges in lineage.values():
            edges.sort(key=lambda edge: edge['depth'])
        return lineage
Feature Transform Patterns
Batch Transform with Spark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
class BatchFeatureTransform:
    """Batch feature computations over Spark DataFrames."""

    def __init__(self, spark: SparkSession):
        self.spark = spark

    def compute_user_features(self, transactions_df,
                              window_days: int = 30):
        """Compute user-level features from transaction data.

        Aggregates per user over fixed (tumbling) windows of ``window_days``
        days on ``transaction_time``. The input is expected to provide the
        ``user_id``, ``transaction_time``, ``amount``, ``merchant_id`` and
        ``category`` columns referenced below.

        Returns:
            One row per (user_id, window) with count/sum/avg/stddev/min/max
            and median of ``amount``, distinct merchants, the set of
            categories, ``amount_range``, ``avg_amount_per_merchant`` and
            ``hours_since_last`` (hours between the last transaction in the
            window and the window end).
        """
        # Window aggregation
        window_spec = F.window(
            "transaction_time",
            f"{window_days} days"
        )
        user_features = transactions_df \
            .groupBy("user_id", window_spec) \
            .agg(
                F.count("*").alias("transaction_count"),
                F.sum("amount").alias("total_amount"),
                F.avg("amount").alias("avg_amount"),
                F.stddev("amount").alias("std_amount"),
                F.max("amount").alias("max_amount"),
                F.min("amount").alias("min_amount"),
                F.countDistinct("merchant_id").alias("unique_merchants"),
                F.collect_set("category").alias("categories"),
                F.expr("percentile_approx(amount, 0.5)").alias("median_amount"),
                # Needed by hours_since_last below; the original referenced
                # this column without ever producing it.
                F.max("transaction_time").alias("last_transaction_time"),
            ) \
            .withColumn("amount_range",
                        F.col("max_amount") - F.col("min_amount")) \
            .withColumn("avg_amount_per_merchant",
                        F.col("total_amount") / F.col("unique_merchants"))

        # Time since last transaction; unix_timestamp differences are in
        # seconds, so convert to hours to match the column name.
        seconds_since_last = (
            F.unix_timestamp(F.col("window.end"))
            - F.unix_timestamp(F.col("last_transaction_time"))
        )
        user_features = user_features \
            .withColumn("hours_since_last", seconds_since_last / 3600)
        return user_features

    def compute_session_features(self, events_df,
                                 session_timeout_minutes: int = 30):
        """Compute session-level features from event data.

        Sessions are formed per user with a session window on ``event_time``
        that closes after ``session_timeout_minutes`` of inactivity. The
        input is expected to provide the ``user_id``, ``event_time``,
        ``page``, ``event_type`` and ``time_on_page`` columns.

        Returns:
            One row per (user_id, session) with event counts, landing/exit
            page, session bounds and duration, click-through rate and
            conversion rate (both relative to the number of events).
        """
        # Sessionize events
        session_window = F.session_window(
            "event_time",
            f"{session_timeout_minutes} minutes"
        )
        session_features = events_df \
            .groupBy("user_id", session_window) \
            .agg(
                F.count("*").alias("session_length"),
                F.countDistinct("page").alias("unique_pages"),
                F.sum(F.when(F.col("event_type") == "click", 1).otherwise(0))
                .alias("click_count"),
                F.sum(F.when(F.col("event_type") == "purchase", 1).otherwise(0))
                .alias("purchase_count"),
                # first()/last() have no defined order after a groupBy, so
                # pick the page at the earliest/latest event time explicitly.
                F.expr("min_by(page, event_time)").alias("landing_page"),
                F.expr("max_by(page, event_time)").alias("exit_page"),
                F.avg("time_on_page").alias("avg_time_on_page"),
                F.min("event_time").alias("session_start"),
                F.max("event_time").alias("session_end"),
            ) \
            .withColumn("session_duration_seconds",
                        F.unix_timestamp("session_end") -
                        F.unix_timestamp("session_start")) \
            .withColumn("click_through_rate",
                        F.col("click_count") / F.col("session_length")) \
            .withColumn("conversion_rate",
                        F.col("purchase_count") / F.col("session_length"))
        return session_features
Real-time Feature Transform with Flink
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
import json
class RealTimeFeatureTransform:
    """Streaming feature computation defined with Flink SQL."""

    def __init__(self):
        stream_env = StreamExecutionEnvironment.get_execution_environment()
        self.env = stream_env
        self.t_env = StreamTableEnvironment.create(stream_env)

    def define_feature_streams(self):
        """Register the source and sink tables, then submit the feature query.

        Executes three SQL statements in order: the Kafka-backed
        ``transactions`` source table, the Redis-backed
        ``user_features_sink`` table, and an INSERT that aggregates
        transactions per user over 5-minute tumbling windows.
        """
        # Source: Kafka transactions
        transactions_source_ddl = """
            CREATE TABLE transactions (
            user_id BIGINT,
            amount DOUBLE,
            merchant_id STRING,
            category STRING,
            event_time TIMESTAMP(3),
            WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
            ) WITH (
            'connector' = 'kafka',
            'topic' = 'transactions',
            'properties.bootstrap.servers' = 'kafka:9092',
            'format' = 'json'
            )
        """

        # Sink: Redis for online serving
        features_sink_ddl = """
            CREATE TABLE user_features_sink (
            user_id BIGINT,
            feature_window_start TIMESTAMP(3),
            feature_window_end TIMESTAMP(3),
            transaction_count BIGINT,
            total_amount DOUBLE,
            avg_amount DOUBLE,
            max_amount DOUBLE,
            min_amount DOUBLE,
            std_amount DOUBLE,
            PRIMARY KEY (user_id, feature_window_start) NOT ENFORCED
            ) WITH (
            'connector' = 'redis',
            'host' = 'redis',
            'port' = '6379',
            'command' = 'HSET'
            )
        """

        # Feature computation query
        feature_insert_sql = """
            INSERT INTO user_features_sink
            SELECT
            user_id,
            TUMBLE_START(event_time, INTERVAL '5' MINUTE) as feature_window_start,
            TUMBLE_END(event_time, INTERVAL '5' MINUTE) as feature_window_end,
            COUNT(*) as transaction_count,
            SUM(amount) as total_amount,
            AVG(amount) as avg_amount,
            MAX(amount) as max_amount,
            MIN(amount) as min_amount,
            STDDEV(amount) as std_amount
            FROM transactions
            GROUP BY
            user_id,
            TUMBLE(event_time, INTERVAL '5' MINUTE)
        """

        # Tables must exist before the INSERT that references them.
        for statement in (transactions_source_ddl,
                          features_sink_ddl,
                          feature_insert_sql):
            self.t_env.execute_sql(statement)
ℹ️
For sub-10ms feature serving, pre-compute features in batch and store in Redis. For features requiring freshness < 1 minute, use streaming computation with Flink or Kafka Streams.
Feature Store Comparison
| Feature | Feast | Tecton | Hopsworks | AWS SageMaker |
|---|---|---|---|---|
| Offline Store | Parquet/Delta/S3 | Databricks/S3 | Hopsworks | S3 |
| Online Store | Redis/DynamoDB | DynamoDB/Redis | Hopsworks | DynamoDB |
| Point-in-time | ✓ | ✓ | ✓ | ✓ |
| Streaming | Via Spark/Flink | Native | Via Kafka | Native |
| Open Source | ✓ | ✗ | Partial | ✗ |
| Managed | ✗ | ✓ | ✓ | ✓ |
Summary
Feature stores are critical infrastructure for production ML:
- Offline Store: Batch features for training with Delta Lake/Parquet
- Online Store: Low-latency serving with Redis/DynamoDB
- Point-in-time Correctness: Prevent data leakage in training
- Feature Registry: Centralized metadata and discovery
- Streaming Features: Real-time computation with Flink/Kafka Streams
Master these patterns to build reliable feature infrastructure at scale.