The Interview Question
ℹ️
Question: You're building a recommendation engine for a streaming platform:
- Dataset: 100M ratings from 1M users on 10K movies
- Requirements: Real-time recommendations, handle cold start, explainable recommendations
Walk through your recommendation system design:
- How do you implement collaborative filtering at scale?
- How do you handle the cold start problem?
- How do you evaluate recommendation quality?
- How do you ensure diversity and serendipity in recommendations?
Detailed Answer
1. Collaborative Filtering Implementation
import warnings
from collections import defaultdict

import implicit
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy.sparse import csr_matrix
from scipy.sparse.linalg import svds
from sklearn.decomposition import TruncatedSVD
from sklearn.metrics.pairwise import cosine_similarity

warnings.filterwarnings('ignore')
class CollaborativeFiltering:
    """Collaborative-filtering recommenders over a sparse user-item matrix.

    ``ratings_df`` must provide ``user_id``, ``movie_id`` and ``rating``
    columns. Call :meth:`create_user_item_matrix` before any of the
    recommendation methods; they all read ``self.user_item_matrix`` and the
    id/index mappings built there.
    """

    def __init__(self, ratings_df):
        self.ratings = ratings_df
        self.user_item_matrix = None
        # Cache of the user-user cosine similarity, filled by user_based_cf.
        self.similarity_matrix = None

    def create_user_item_matrix(self):
        """Build the CSR user-item matrix and the id <-> row/column mappings.

        Rows follow the order in which user ids first appear in the ratings
        frame, columns the order in which movie ids first appear.

        Returns:
            The ``scipy.sparse.csr_matrix`` also stored on
            ``self.user_item_matrix``.
        """
        ratings = self.ratings

        # Create mappings
        self.user_ids = ratings['user_id'].unique()
        self.item_ids = ratings['movie_id'].unique()
        self.user_to_idx = {uid: idx for idx, uid in enumerate(self.user_ids)}
        self.item_to_idx = {iid: idx for idx, iid in enumerate(self.item_ids)}
        self.idx_to_user = {idx: uid for uid, idx in self.user_to_idx.items()}
        self.idx_to_item = {idx: iid for iid, idx in self.item_to_idx.items()}

        # Create sparse matrix
        rows = ratings['user_id'].map(self.user_to_idx)
        cols = ratings['movie_id'].map(self.item_to_idx)
        values = ratings['rating']
        self.user_item_matrix = csr_matrix(
            (values, (rows, cols)),
            shape=(len(self.user_ids), len(self.item_ids)),
        )
        # A similarity matrix computed from a previous matrix would be stale.
        self.similarity_matrix = None

        n_users, n_items = self.user_item_matrix.shape
        n_cells = n_users * n_items
        # Guard the empty-ratings case instead of dividing by zero.
        sparsity = 1 - self.user_item_matrix.nnz / n_cells if n_cells else 0.0
        print(f"User-Item Matrix: {self.user_item_matrix.shape}")
        print(f"Sparsity: {sparsity:.4f}")
        return self.user_item_matrix

    def _top_unrated(self, user_ratings, numerators, denominators, n_recommendations):
        """Turn per-item weighted sums into the top-N (item_id, score) list."""
        # Only items the user has not rated and that have at least one
        # contributing neighbour with non-zero similarity get a prediction.
        candidates = np.flatnonzero((user_ratings <= 0) & (denominators > 0))
        scored = [
            (self.idx_to_item[int(idx)], numerators[idx] / denominators[idx])
            for idx in candidates
        ]
        # Stable sort: ties keep ascending column order.
        scored.sort(key=lambda rec: rec[1], reverse=True)
        return scored[:n_recommendations]

    def user_based_cf(self, user_id, n_recommendations=10):
        """Recommend items via similarity-weighted ratings of other users.

        The predicted rating of an unrated item is the sum of
        ``similarity * rating`` over all *other* users with a positive rating
        for it, divided by the sum of ``|similarity|`` over the same users.

        The full user-user cosine similarity is computed once and cached on
        ``self.similarity_matrix``. NOTE(review): that matrix is dense and
        quadratic in the number of users, so this path is only practical for
        small samples, not for the full 1M-user dataset.

        Args:
            user_id: A user id present in the ratings frame.
            n_recommendations: Maximum number of items to return.

        Returns:
            A list of ``(movie_id, predicted_rating)`` sorted by descending
            predicted rating.

        Raises:
            KeyError: If ``user_id`` was not in the ratings frame.
        """
        if self.similarity_matrix is None:
            # Calculate user similarity using cosine similarity
            self.similarity_matrix = cosine_similarity(self.user_item_matrix)

        user_idx = self.user_to_idx[user_id]
        matrix = self.user_item_matrix

        # Copy the row so zeroing the self-similarity does not touch the cache;
        # a zero weight removes the target user from both sums.
        weights = np.array(self.similarity_matrix[user_idx], dtype=float).ravel()
        weights[user_idx] = 0.0

        # Only positive ratings count as "rated", matching the per-cell test
        # of the loop-based formulation.
        positive = matrix > 0
        numerators = matrix.multiply(positive).T @ weights
        denominators = positive.astype(float).T @ np.abs(weights)

        user_ratings = matrix[user_idx].toarray().ravel()
        return self._top_unrated(
            user_ratings, numerators, denominators, n_recommendations
        )

    def item_based_cf(self, user_id, n_recommendations=10):
        """Recommend items similar to the ones the user already rated.

        The predicted rating of an unrated item is the similarity-weighted
        average of the user's own ratings, using item-item cosine similarity.
        Only the similarity columns for the user's rated items are computed,
        rather than the full item-item matrix.

        Args:
            user_id: A user id present in the ratings frame.
            n_recommendations: Maximum number of items to return.

        Returns:
            A list of ``(movie_id, predicted_rating)`` sorted by descending
            predicted rating; empty if the user has no positive ratings.

        Raises:
            KeyError: If ``user_id`` was not in the ratings frame.
        """
        user_idx = self.user_to_idx[user_id]
        user_ratings = self.user_item_matrix[user_idx].toarray().ravel()

        # Get items rated by user
        rated_items = np.flatnonzero(user_ratings > 0)
        if rated_items.size == 0:
            return []

        # Rows are items, columns are users.
        item_vectors = self.user_item_matrix.T.tocsr()
        similarity = cosine_similarity(item_vectors, item_vectors[rated_items])

        numerators = similarity @ user_ratings[rated_items]
        denominators = np.abs(similarity).sum(axis=1)
        return self._top_unrated(
            user_ratings, numerators, denominators, n_recommendations
        )

    def matrix_factorization_svd(self, n_factors=100, n_recommendations=10):
        """Factorize the user-item matrix with truncated SVD.

        ``n_recommendations`` is accepted for signature symmetry with the
        other methods but is not used here.

        Returns:
            ``(user_factors, item_factors, reconstructed)`` where
            ``user_factors`` is the output of ``fit_transform``,
            ``item_factors`` is ``svd.components_`` and ``reconstructed`` is
            their dense product. NOTE(review): the product is a dense
            users-by-items array; at full dataset size it will not fit in
            memory, so prefer scoring individual users from the factors.
        """
        # Perform SVD
        svd = TruncatedSVD(n_components=n_factors, random_state=42)
        user_factors = svd.fit_transform(self.user_item_matrix)
        item_factors = svd.components_

        # Reconstruct the rating matrix
        reconstructed = np.dot(user_factors, item_factors)
        return user_factors, item_factors, reconstructed

    def als_implicit(self, user_id, n_recommendations=10, factors=100,
                     regularization=0.01, iterations=20):
        """Recommend with Alternating Least Squares from ``implicit``.

        A fresh model is trained on every call.

        Args:
            user_id: A user id present in the ratings frame.
            n_recommendations: Number of items to request from the model.
            factors: Latent dimensionality passed to the model.
            regularization: Regularization strength passed to the model.
            iterations: Number of ALS iterations passed to the model.

        Returns:
            A list of ``(movie_id, score)`` pairs.

        Raises:
            KeyError: If ``user_id`` was not in the ratings frame.
        """
        model = implicit.als.AlternatingLeastSquares(
            factors=factors,
            regularization=regularization,
            iterations=iterations,
            random_state=42,
        )

        # The recommend() call below unpacks an (ids, scores) pair of arrays,
        # i.e. the implicit>=0.5 API. In that API fit() takes the
        # users-by-items matrix; fitting the transpose would swap the roles
        # of users and items.
        model.fit(self.user_item_matrix)

        # Get recommendations
        user_idx = self.user_to_idx[user_id]
        item_indices, scores = model.recommend(
            user_idx,
            self.user_item_matrix[user_idx],
            N=n_recommendations,
        )

        # Convert back to item IDs
        return [
            (self.idx_to_item[int(idx)], score)
            for idx, score in zip(item_indices, scores)
        ]
# Example usage
# cf = CollaborativeFiltering(ratings_df)
# cf.create_user_item_matrix()
# recommendations = cf.user_based_cf(user_id=123, n_recommendations=10)
2. Content-Based Filtering
class ContentBasedRecommender:
    """Content-based recommender driven by item metadata.

    ``items_df`` is expected to be indexed by item id (assumed unique); the
    index is used to translate between item ids and rows of the feature and
    similarity matrices.
    """

    def __init__(self, items_df):
        self.items = items_df
        self.item_similarity = None
        self.feature_matrix = None

    def create_feature_matrix(self, text_columns, categorical_columns):
        """Build the item feature matrix and the item-item similarity.

        Every column in ``text_columns`` gets its own TF-IDF vectorizer
        (previously only the first text column was used and the rest were
        silently ignored); ``categorical_columns`` are one-hot encoded.

        Args:
            text_columns: Names of free-text columns in ``self.items``.
            categorical_columns: Names of categorical columns.

        Returns:
            The feature matrix, also stored on ``self.feature_matrix``.

        Raises:
            ValueError: If no column of either kind is given.
        """
        from sklearn.feature_extraction.text import TfidfVectorizer
        from sklearn.preprocessing import OneHotEncoder
        from sklearn.compose import ColumnTransformer

        text_columns = list(text_columns or [])
        categorical_columns = list(categorical_columns or [])
        if not text_columns and not categorical_columns:
            raise ValueError("At least one text or categorical column is required")

        # Text features using TF-IDF. The column is passed as a scalar so the
        # vectorizer receives a 1-D sequence of documents.
        transformers = [
            (
                f'text_{column}',
                TfidfVectorizer(max_features=1000, stop_words='english'),
                column,
            )
            for column in text_columns
        ]
        # Categorical features
        if categorical_columns:
            transformers.append(
                (
                    'categorical',
                    OneHotEncoder(handle_unknown='ignore'),
                    categorical_columns,
                )
            )

        # Combine features, fit and transform
        preprocessor = ColumnTransformer(transformers)
        self.feature_matrix = preprocessor.fit_transform(self.items)

        # Calculate item similarity
        self.item_similarity = cosine_similarity(self.feature_matrix)
        print(f"Feature matrix shape: {self.feature_matrix.shape}")
        return self.feature_matrix

    def recommend_from_item(self, item_id, n_recommendations=10):
        """Return the items most similar to ``item_id``.

        The query item is excluded by position rather than by assuming it
        sorts first, so items that tie with it at the top similarity are not
        dropped in its place.

        Returns:
            A list of ``(item_id, similarity)`` in descending similarity.

        Raises:
            KeyError: If ``item_id`` is not in the items index.
        """
        item_idx = self.items.index.get_loc(item_id)

        # Get similarity scores
        scores = np.asarray(self.item_similarity[item_idx], dtype=float).ravel()
        # Stable descending order: ties keep ascending row order.
        ranked = np.argsort(-scores, kind='stable')

        recommendations = []
        for idx in ranked:
            if len(recommendations) >= n_recommendations:
                break
            if idx == item_idx:
                continue
            recommendations.append((self.items.index[idx], scores[idx]))
        return recommendations

    def recommend_for_user(self, user_history, n_recommendations=10):
        """Recommend unseen items closest to the user's average item profile.

        History entries that are not in the items index are ignored when
        building the profile (they used to raise ``KeyError``), but are still
        excluded from the output.

        Args:
            user_history: Iterable of item ids the user interacted with.
            n_recommendations: Maximum number of items to return.

        Returns:
            A list of ``(item_id, similarity)`` in descending similarity;
            empty if no history item is known.
        """
        user_history = list(user_history)
        known_items = [item for item in user_history if item in self.items.index]
        if not known_items:
            return []

        # Calculate user profile as the mean of the item feature rows.
        user_indices = [self.items.index.get_loc(item) for item in known_items]
        user_profile = self.feature_matrix[user_indices].mean(axis=0)
        # A sparse mean comes back as np.matrix; convert to a plain 2-D array
        # before handing it to cosine_similarity.
        user_profile = np.asarray(user_profile, dtype=float).reshape(1, -1)

        # Calculate similarity between user profile and all items
        similarities = cosine_similarity(user_profile, self.feature_matrix)[0]

        # Get top N recommendations (excluding already seen items)
        seen = set(user_history)
        recommendations = []
        for idx in np.argsort(-similarities, kind='stable'):
            if len(recommendations) >= n_recommendations:
                break
            candidate = self.items.index[idx]
            if candidate not in seen:
                recommendations.append((candidate, similarities[idx]))
        return recommendations

    def get_feature_importance(self, item_id, top_n=5):
        """Return the ``top_n`` highest-valued features of an item.

        Works whether the feature matrix is sparse or a dense array.

        Returns:
            A list of ``(feature_name, value)`` in descending value.

        Raises:
            KeyError: If ``item_id`` is not in the items index.
        """
        item_idx = self.items.index.get_loc(item_id)

        # Get item features
        row = self.feature_matrix[item_idx]
        if hasattr(row, 'toarray'):
            row = row.toarray()
        item_features = np.asarray(row).ravel()

        # Pair with names and sort by importance
        feature_importance = list(zip(self.get_feature_names(), item_features))
        feature_importance.sort(key=lambda pair: pair[1], reverse=True)
        return feature_importance[:top_n]

    def get_feature_names(self):
        """Return placeholder names ``feature_0 ... feature_{n-1}``.

        Simplified version: one generic name per feature-matrix column; the
        fitted preprocessor is not kept, so real names are not available.
        """
        return [f'feature_{i}' for i in range(self.feature_matrix.shape[1])]
# Example usage
# content_rec = ContentBasedRecommender(movies_df)
# content_rec.create_feature_matrix(['description'], ['genre', 'director'])
# recommendations = content_rec.recommend_from_item(item_id=123)
3. Hybrid Recommendation System
class HybridRecommender:
    """Blend collaborative-filtering and content-based recommendations.

    ``cf_recommender`` must expose ``user_based_cf(user_id, n)`` and
    ``content_recommender`` must expose ``recommend_for_user(history, n)``;
    both are expected to return ``(item, score)`` pairs.
    """

    def __init__(self, cf_recommender, content_recommender):
        self.cf = cf_recommender
        self.content = content_recommender
        self.weights = {'cf': 0.6, 'content': 0.4}

    @staticmethod
    def _min_max_normalize(scores):
        """Rescale a ``{item: score}`` dict to the ``[0, 1]`` range.

        If all scores are equal (including the single-item case) every item
        maps to ``1.0``.
        """
        if not scores:
            return {}
        lowest = min(scores.values())
        span = max(scores.values()) - lowest
        if span == 0:
            return {item: 1.0 for item in scores}
        return {item: (score - lowest) / span for item, score in scores.items()}

    def recommend_hybrid(self, user_id, user_history, n_recommendations=10):
        """Generate hybrid recommendations.

        Each source is asked for ``2 * n_recommendations`` candidates. The
        two score sets are min-max normalized before the weighted sum: CF
        scores are predicted ratings while content scores are similarities,
        so mixing the raw values would let the larger scale dominate no
        matter what the weights say. An item missing from one source
        contributes ``0`` for that source.

        Returns:
            A list of ``(item, hybrid_score)`` in descending score; ties keep
            the order in which items were first seen (CF first).
        """
        # Get collaborative filtering recommendations
        cf_recs = self.cf.user_based_cf(user_id, n_recommendations * 2)
        cf_scores = self._min_max_normalize(dict(cf_recs))

        # Get content-based recommendations
        content_recs = self.content.recommend_for_user(
            user_history, n_recommendations * 2
        )
        content_scores = self._min_max_normalize(dict(content_recs))

        # Combine scores. dict.fromkeys keeps first-seen order, which makes
        # tie-breaking deterministic (a plain set would not).
        cf_weight = self.weights['cf']
        content_weight = self.weights['content']
        hybrid_scores = {
            item: (cf_weight * cf_scores.get(item, 0.0)
                   + content_weight * content_scores.get(item, 0.0))
            for item in dict.fromkeys([*cf_scores, *content_scores])
        }

        # Sort and get top N
        ranked = sorted(hybrid_scores.items(), key=lambda rec: rec[1], reverse=True)
        return ranked[:n_recommendations]

    def update_weights(self, validation_data):
        """Pick the CF/content weights that maximize validation NDCG.

        NDCG only changes when the ranking changes, so it is piecewise
        constant in the weight and a gradient-based optimizer sees a zero
        gradient and stops at its starting point. A grid search over
        ``[0, 1]`` is used instead; the current CF weight is the baseline and
        is kept unless a candidate is strictly better. With empty validation
        data the weights are left unchanged.

        Args:
            validation_data: Mapping of user id to that user's relevant items.
        """
        def ndcg_for(cf_weight):
            self.weights['cf'] = cf_weight
            self.weights['content'] = 1 - cf_weight
            return self.evaluate_on_validation(validation_data)

        if validation_data:
            best_weight = float(self.weights['cf'])
            best_ndcg = ndcg_for(best_weight)
            for candidate in np.linspace(0.0, 1.0, 21):
                candidate = float(candidate)
                ndcg = ndcg_for(candidate)
                if ndcg > best_ndcg:
                    best_weight, best_ndcg = candidate, ndcg
            self.weights['cf'] = best_weight
            self.weights['content'] = 1 - best_weight

        print(f"Optimized weights: CF={self.weights['cf']:.3f}, Content={self.weights['content']:.3f}")

    def evaluate_on_validation(self, validation_data):
        """Return the mean NDCG@10 of hybrid recommendations.

        Args:
            validation_data: Mapping of user id to that user's relevant items.

        Returns:
            The mean NDCG over all users, or ``0.0`` when there are none.
        """
        ndcg_scores = []
        for user_id, true_items in validation_data.items():
            # Get recommendations
            user_history = self.get_user_history(user_id)
            recommendations = self.recommend_hybrid(
                user_id, user_history, n_recommendations=10
            )
            rec_items = [item for item, _score in recommendations]
            # Calculate NDCG
            ndcg_scores.append(self.calculate_ndcg(true_items, rec_items))
        # np.mean([]) would yield NaN with a warning.
        return np.mean(ndcg_scores) if ndcg_scores else 0.0

    def calculate_ndcg(self, true_items, predicted_items, k=10):
        """Calculate binary-relevance NDCG@k.

        Args:
            true_items: Collection of relevant items.
            predicted_items: Ranked list of recommended items.
            k: Cut-off rank.

        Returns:
            DCG of the top ``k`` predictions divided by the ideal DCG, or
            ``0`` when the ideal DCG is zero (no relevant items or ``k == 0``).
        """
        relevant = set(true_items)

        # DCG
        dcg = sum(
            1 / np.log2(rank + 2)
            for rank, item in enumerate(predicted_items[:k])
            if item in relevant
        )
        # Ideal DCG
        ideal_dcg = sum(
            1 / np.log2(rank + 2) for rank in range(min(len(true_items), k))
        )
        if ideal_dcg == 0:
            return 0
        return dcg / ideal_dcg

    def get_user_history(self, user_id):
        """Get user's interaction history.

        Simplified placeholder: always returns an empty list. A real
        implementation would typically read this from a database.
        """
        return []
# Example usage
# hybrid = HybridRecommender(cf, content_rec)
# recommendations = hybrid.recommend_hybrid(user_id=123, user_history=[1, 2, 3])
4. Cold Start Solutions
class ColdStartHandler:
    """Fallback strategies for users or items without interaction data.

    ``items_df`` must provide ``genre``, ``director`` and ``popularity``
    columns for the metadata-based strategies; ``popular_items`` is any
    sliceable sequence of pre-ranked popular items.
    """

    def __init__(self, items_df, popular_items):
        self.items = items_df
        self.popular_items = popular_items

    def new_user_recommendations(self, n_recommendations=10):
        """Recommendations for new users (no history): the most popular items."""
        return self.popular_items[:n_recommendations]

    def new_user_with_demographics(self, demographics, n_recommendations=10):
        """Recommendations using demographic information.

        Simplified rule set: users under 25 or over 40 get the most popular
        items from a fixed genre list; everyone else, including users whose
        age is unknown, gets the global popular items. (A missing age used to
        default to 0 and was therefore treated as "under 25".)

        Args:
            demographics: Mapping that may contain an ``'age'`` entry.
            n_recommendations: Maximum number of items to return.

        Returns:
            For the age-specific branches, a slice of ``self.items`` ordered
            by ``popularity``; otherwise a slice of ``self.popular_items``.
            NOTE(review): the two branches return different types; callers
            must handle both.
        """
        age = demographics.get('age')
        if age is None:
            return self.popular_items[:n_recommendations]

        if age < 25:
            # Young users tend to like certain genres
            genres = ['Action', 'Comedy', 'Sci-Fi']
        elif age > 40:
            # Older users tend to like different genres
            genres = ['Drama', 'Documentary', 'Classic']
        else:
            return self.popular_items[:n_recommendations]

        in_genre = self.items[self.items['genre'].isin(genres)]
        return in_genre.nlargest(n_recommendations, 'popularity')

    def new_item_recommendations(self, item_features, n_recommendations=10):
        """Find existing items most similar to a new item (no interactions).

        Every row of ``self.items`` is scored with
        :meth:`calculate_item_similarity`.

        Returns:
            A list of ``(item_index, similarity)`` in descending similarity;
            ties keep the row order of ``self.items``.
        """
        similarities = [
            (existing_idx, self.calculate_item_similarity(item_features, existing_item))
            for existing_idx, existing_item in self.items.iterrows()
        ]
        # Get most similar items
        similarities.sort(key=lambda pair: pair[1], reverse=True)
        return similarities[:n_recommendations]

    def calculate_item_similarity(self, item1_features, item2_features):
        """Calculate similarity between two items.

        Simplified placeholder: always returns ``0.5``. A real implementation
        would compare the two feature sets.
        """
        return 0.5

    def handle_new_item_with_metadata(self, item_metadata, n_recommendations=10):
        """Find existing items sharing the new item's genre or director.

        Args:
            item_metadata: Mapping that may contain ``'genre'`` and
                ``'director'``; missing entries default to ``'Unknown'``.
            n_recommendations: Maximum number of rows to return.

        Returns:
            The first ``n_recommendations`` matching rows of ``self.items``.
        """
        # Extract features from metadata
        genre = item_metadata.get('genre', 'Unknown')
        director = item_metadata.get('director', 'Unknown')

        # Find similar items
        matches = (self.items['genre'] == genre) | (self.items['director'] == director)
        return self.items[matches].head(n_recommendations)
# Example usage
# cold_start = ColdStartHandler(movies_df, popular_movies)
# new_user_recs = cold_start.new_user_recommendations()
# new_item_recs = cold_start.new_item_recommendations(new_movie_features)
5. Evaluation Metrics
class RecommenderEvaluator:
    """Offline ranking metrics for a set of recommendations.

    Args:
        test_data: Mapping of user id to that user's relevant items.
        recommendations: Mapping of user id to a ranked list of
            ``(item, score)`` pairs.

    Users that appear in ``test_data`` but not in ``recommendations`` are
    skipped by every metric.
    """

    def __init__(self, test_data, recommendations):
        self.test_data = test_data
        self.recommendations = recommendations

    @staticmethod
    def _check_k(k):
        """Reject cut-offs for which the @k metrics are undefined."""
        if k <= 0:
            raise ValueError(f"k must be a positive integer, got {k!r}")

    def _ranked_users(self, k=None, require_truth=False):
        """Yield ``(relevant_set, top_k_items)`` for every evaluable user.

        ``k=None`` keeps the full ranked list. With ``require_truth`` users
        whose relevant-item collection is empty are skipped.
        """
        for user_id, true_items in self.test_data.items():
            if user_id not in self.recommendations:
                continue
            if require_truth and not true_items:
                continue
            top_items = [item for item, _score in self.recommendations[user_id][:k]]
            # A set makes the per-item relevance test O(1).
            yield set(true_items), top_items

    def precision_at_k(self, k=10):
        """Precision@K: relevant items in the top ``k`` divided by ``k``."""
        self._check_k(k)
        precisions = [
            len(relevant & set(top_items)) / k
            for relevant, top_items in self._ranked_users(k)
        ]
        return np.mean(precisions) if precisions else 0

    def recall_at_k(self, k=10):
        """Recall@K: relevant items in the top ``k`` over all relevant items.

        Users without relevant items are skipped.
        """
        self._check_k(k)
        recalls = [
            len(relevant & set(top_items)) / len(relevant)
            for relevant, top_items in self._ranked_users(k, require_truth=True)
        ]
        return np.mean(recalls) if recalls else 0

    def ndcg_at_k(self, k=10):
        """Normalized Discounted Cumulative Gain@K with binary relevance.

        Users without relevant items are skipped.
        """
        self._check_k(k)
        ndcg_scores = []
        for relevant, top_items in self._ranked_users(k, require_truth=True):
            # DCG
            dcg = sum(
                1 / np.log2(rank + 2)
                for rank, item in enumerate(top_items)
                if item in relevant
            )
            # Ideal DCG
            ideal_dcg = sum(
                1 / np.log2(rank + 2) for rank in range(min(len(relevant), k))
            )
            ndcg_scores.append(dcg / ideal_dcg)
        return np.mean(ndcg_scores) if ndcg_scores else 0

    def map_at_k(self, k=10):
        """Mean Average Precision@K.

        Average precision is normalized by ``min(#relevant, k)``. Users
        without relevant items are skipped.
        """
        self._check_k(k)
        ap_scores = []
        for relevant, top_items in self._ranked_users(k, require_truth=True):
            precision_sum = 0
            hits = 0
            for rank, item in enumerate(top_items, start=1):
                if item in relevant:
                    hits += 1
                    precision_sum += hits / rank
            ap_scores.append(precision_sum / min(len(relevant), k))
        return np.mean(ap_scores) if ap_scores else 0

    def mrr(self):
        """Mean Reciprocal Rank over the full recommendation lists.

        A user whose list contains no relevant item contributes ``0``;
        leaving such users out of the mean would overstate the metric.
        Users without relevant items are skipped.
        """
        rr_scores = [
            next(
                (1 / rank
                 for rank, item in enumerate(ranked, start=1)
                 if item in relevant),
                0.0,
            )
            for relevant, ranked in self._ranked_users(require_truth=True)
        ]
        return np.mean(rr_scores) if rr_scores else 0

    def coverage(self, all_items):
        """Catalog coverage: share of ``all_items`` recommended to anyone.

        Recommended items that are not part of ``all_items`` are ignored so
        the result stays within ``[0, 1]``; an empty catalog yields ``0.0``.
        """
        catalog = set(all_items)
        if not catalog:
            return 0.0
        recommended_items = {
            item
            for recs in self.recommendations.values()
            for item, _score in recs
        }
        return len(recommended_items & catalog) / len(catalog)

    def diversity(self, recommendations):
        """Recommendation diversity of one ranked list.

        Simplified placeholder: returns ``0`` for fewer than two items and a
        constant ``0.5`` otherwise. A real implementation would average the
        pairwise dissimilarity of the recommended items.
        """
        if len(recommendations) < 2:
            return 0
        return 0.5

    def evaluate_all(self, all_items):
        """Run all evaluation metrics at ``k=10`` and return them as a dict."""
        return {
            'precision_at_10': self.precision_at_k(10),
            'recall_at_10': self.recall_at_k(10),
            'ndcg_at_10': self.ndcg_at_k(10),
            'map_at_10': self.map_at_k(10),
            'mrr': self.mrr(),
            'coverage': self.coverage(all_items),
        }
# Example usage
# evaluator = RecommenderEvaluator(test_data, recommendations)
# results = evaluator.evaluate_all(all_items)
💡
Pro Tip: Balance between accuracy and diversity. Highly accurate recommendations can create filter bubbles. Consider adding diversity metrics and exploration strategies.
6. Common Follow-Up Questions
Follow-up 1: How do you handle scalability for millions of users?
class ScalableRecommender:
    """Serving helpers: ANN retrieval with faiss and batched fan-out.

    ``self.index`` is ``None`` until :meth:`build_ann_index` has been called.
    Batch methods rely on :meth:`recommend_user`, which subclasses must
    provide.
    """

    def __init__(self):
        self.index = None

    def build_ann_index(self, item_factors):
        """Build an Approximate Nearest Neighbor (IVF) index over item vectors.

        The vectors are copied to a C-contiguous float32 array before being
        L2-normalized, so the caller's array is left untouched and inputs of
        another dtype (e.g. float64) are accepted.

        Args:
            item_factors: 2-D array-like with one row per item.

        Returns:
            The trained faiss index, also stored on ``self.index``.

        Raises:
            ValueError: If ``item_factors`` is not 2-D or has no rows.
        """
        import faiss

        # np.array always copies; normalize_L2 below works in place.
        vectors = np.array(item_factors, dtype='float32', order='C')
        if vectors.ndim != 2 or vectors.shape[0] == 0:
            raise ValueError("item_factors must be a non-empty 2-D array")

        # Normalize vectors
        faiss.normalize_L2(vectors)

        # Build index
        n_items, dimension = vectors.shape
        # Use IVF index for large datasets; keep at least one cluster so tiny
        # catalogs do not produce nlist == 0.
        nlist = max(1, int(np.sqrt(n_items)))  # Number of clusters
        quantizer = faiss.IndexFlatL2(dimension)
        self.index = faiss.IndexIVFFlat(quantizer, dimension, nlist)

        # Train and add vectors
        self.index.train(vectors)
        self.index.add(vectors)

        # Set search parameters
        self.index.nprobe = 10  # Number of clusters to search
        return self.index

    def recommend_ann(self, user_factor, k=10):
        """Fast recommendations using ANN.

        Args:
            user_factor: 1-D vector with the same dimensionality as the
                indexed item vectors.
            k: Number of neighbours to retrieve.

        Returns:
            ``(indices, distances)`` for the single query, as returned by the
            index search.

        Raises:
            RuntimeError: If no index has been built yet.
        """
        if self.index is None:
            raise RuntimeError("ANN index not built; call build_ann_index() first")

        import faiss

        # Copy into a float32 row vector; normalization happens in place.
        query = np.array(user_factor, dtype='float32', order='C').reshape(1, -1)
        faiss.normalize_L2(query)

        # Search
        distances, indices = self.index.search(query, k)
        return indices[0], distances[0]

    def distributed_recommendations(self, user_ids, n_workers=4):
        """Distribute recommendation computation over worker processes.

        The users are split into at most ``n_workers`` non-empty chunks and
        each chunk is handled by :meth:`recommend_batch` in a process pool.
        NOTE(review): ``Pool.map`` has to pickle the bound method and hence
        this instance, including ``self.index`` -- confirm that the index is
        picklable or detach it before calling.

        Args:
            user_ids: Sequence of user ids.
            n_workers: Maximum number of worker processes.

        Returns:
            A dict mapping each user id to its recommendations; empty if
            ``user_ids`` is empty.

        Raises:
            ValueError: If ``n_workers`` is smaller than 1.
        """
        from multiprocessing import Pool

        if n_workers < 1:
            raise ValueError("n_workers must be at least 1")
        user_ids = list(user_ids)
        if not user_ids:
            return {}

        # Split users across workers; never create more chunks than users.
        n_chunks = min(n_workers, len(user_ids))
        user_chunks = np.array_split(user_ids, n_chunks)
        with Pool(n_chunks) as pool:
            results = pool.map(self.recommend_batch, user_chunks)

        # Combine results
        all_recommendations = {}
        for result in results:
            all_recommendations.update(result)
        return all_recommendations

    def recommend_batch(self, user_ids):
        """Recommend for a batch of users.

        Returns:
            A dict mapping each user id to ``self.recommend_user(user_id)``.
        """
        return {user_id: self.recommend_user(user_id) for user_id in user_ids}

    def recommend_user(self, user_id):
        """Return recommendations for one user; must be overridden.

        The batch methods call this hook. This class holds no user vectors,
        so it cannot provide an implementation itself.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError(
            "recommend_user() must be implemented by a subclass"
        )
# Example usage
# scalable = ScalableRecommender()
# scalable.build_ann_index(item_factors)
# fast_recs = scalable.recommend_ann(user_factor, k=10)
Company-Specific Tips
ℹ️
Netflix Tips:
- Netflix uses a combination of collaborative filtering, content-based, and deep learning
- Focus on metrics like NDCG and watch time
- Understand how to handle the cold start for new content
- Be familiar with A/B testing for recommendation algorithms
Amazon Tips:
- Amazon values real-time recommendations
- Know how to implement "Customers who bought this also bought"
- Understand item-to-item collaborative filtering
- Be comfortable with large-scale matrix factorization
Quiz Section
Related Topics
- Collaborative Filtering — User similarity for clustering
- Matrix Factorization — Dimensionality reduction techniques
- Deep Learning for RecSys — Neural network approaches
- A/B Testing RecSys — Evaluating recommendation algorithms