The Interview Question
ℹ️
Question: You're designing an experiment to test whether a new recommendation algorithm increases user engagement on a streaming platform:
- Current average watch time: 45 minutes/day
- Minimum detectable effect: 5% increase (2.25 minutes)
- Standard deviation: 12 minutes
- Significance level: 0.05
- Desired power: 0.80
Walk through your complete experiment design:
- Calculate the required sample size
- Design the randomization scheme
- Define metrics and guardrail metrics
- How do you handle network effects and interference?
Detailed Answer
1. Sample Size Calculation
Sample size determination is critical for detecting meaningful effects while avoiding wasted resources.
import numpy as np
from scipy import stats
from typing import Tuple, Dict
import matplotlib.pyplot as plt
class SampleSizeCalculator:
    """Closed-form sample-size and power calculations for two-arm experiments.

    Every formula uses the normal approximation with a two-sided test at
    significance level ``alpha`` and a target power of ``power``.
    """

    def __init__(self, alpha: float = 0.05, power: float = 0.80):
        self.alpha = alpha
        self.power = power

    def _z_sum_squared(self) -> float:
        """Return ``(z_{1-alpha/2} + z_{power}) ** 2`` used by both formulas."""
        z_alpha = stats.norm.ppf(1 - self.alpha / 2)  # two-tailed critical value
        z_beta = stats.norm.ppf(self.power)
        return float((z_alpha + z_beta) ** 2)

    @staticmethod
    def _check_ratio(ratio: float) -> None:
        """Reject allocation ratios that would divide by zero or go negative."""
        if ratio <= 0:
            raise ValueError("ratio must be positive")

    def two_sample_t_test(self, effect_size: float,
                          std_dev: float,
                          ratio: float = 1.0,
                          baseline: float = 45.0) -> Dict:
        """
        Calculate sample size for a two-sample comparison of means.

        Parameters:
        -----------
        effect_size : float
            Minimum detectable effect (absolute, same units as the metric)
        std_dev : float
            Standard deviation of the metric (assumed equal in both arms)
        ratio : float
            Ratio of treatment to control group sizes
        baseline : float
            Baseline metric value used only to express the effect as a
            percentage; defaults to 45 to match the worked example

        Returns:
        --------
        Dict with per-arm sizes (rounded up), their sum, and the inputs.

        Raises:
        -------
        ValueError
            If ``effect_size`` is zero or ``ratio`` is not positive.
        """
        if effect_size == 0:
            raise ValueError("effect_size must be non-zero")
        self._check_ratio(ratio)

        # Var(diff of means) = sigma^2 / n_c * (1 + 1 / ratio)
        n_control = (self._z_sum_squared() * std_dev ** 2 * (1 + 1 / ratio)
                     / effect_size ** 2)
        n_treatment = n_control * ratio

        control_users = int(np.ceil(n_control))
        treatment_users = int(np.ceil(n_treatment))
        # A zero baseline has no meaningful percentage change.
        relative = effect_size / baseline * 100 if baseline else float('nan')
        return {
            'n_control': control_users,
            'n_treatment': treatment_users,
            # Sum the rounded arms so the total always matches what is enrolled.
            'total': control_users + treatment_users,
            'effect_size': effect_size,
            'relative_effect': relative,
            'power': self.power,
            'alpha': self.alpha
        }

    def two_sample_proportion(self, p_control: float,
                              mde: float,
                              ratio: float = 1.0) -> Dict:
        """
        Calculate sample size for comparing two proportions.

        Parameters:
        -----------
        p_control : float
            Expected proportion in control group (strictly between 0 and 1)
        mde : float
            Minimum detectable effect (relative change, e.g. 0.1 for +10%)
        ratio : float
            Ratio of treatment to control group sizes

        Raises:
        -------
        ValueError
            If the proportions fall outside [0, 1], ``mde`` is zero, or
            ``ratio`` is not positive.
        """
        if not 0 < p_control < 1:
            raise ValueError("p_control must be strictly between 0 and 1")
        if mde == 0:
            raise ValueError("mde must be non-zero")
        self._check_ratio(ratio)

        p_treatment = p_control * (1 + mde)
        if not 0 <= p_treatment <= 1:
            raise ValueError("p_control * (1 + mde) must lie in [0, 1]")

        # Unpooled variance: each arm contributes its own Bernoulli variance,
        # with the treatment arm scaled by the allocation ratio.
        variance = (p_control * (1 - p_control)
                    + p_treatment * (1 - p_treatment) / ratio)
        n_control = (self._z_sum_squared() * variance
                     / (p_treatment - p_control) ** 2)
        n_treatment = n_control * ratio

        control_users = int(np.ceil(n_control))
        treatment_users = int(np.ceil(n_treatment))
        return {
            'n_control': control_users,
            'n_treatment': treatment_users,
            'total': control_users + treatment_users,
            'p_control': p_control,
            'p_treatment': p_treatment,
            'absolute_effect': p_treatment - p_control,
            'relative_effect': mde * 100
        }

    def plot_power_curve(self, effect_sizes: np.ndarray,
                         std_dev: float,
                         sample_sizes: np.ndarray):
        """Visualize power as function of effect size and sample size.

        Draws one power-vs-effect curve per entry of ``sample_sizes`` (each
        treated as a per-group size with equal allocation) and, in a second
        panel, the total sample size required for each effect size. The
        figure is saved to ``power_analysis.png`` and then shown.
        """
        effects = np.asarray(effect_sizes, dtype=float)
        z_alpha = stats.norm.ppf(1 - self.alpha / 2)  # loop-invariant

        fig, axes = plt.subplots(1, 2, figsize=(14, 5))

        # Plot 1: Power vs Effect Size for the requested sample sizes
        for n in sample_sizes:
            powers = stats.norm.cdf(effects * np.sqrt(n / 2) / std_dev - z_alpha)
            axes[0].plot(effects, powers, label=f'n={n}', linewidth=2)
        axes[0].axhline(y=self.power, color='gray', linestyle='--',
                        label=f'{self.power*100}% power')
        axes[0].set_xlabel('Effect Size (minutes)')
        axes[0].set_ylabel('Power')
        axes[0].set_title('Power vs Effect Size')
        axes[0].legend()
        axes[0].grid(True, alpha=0.3)

        # Plot 2: Required Sample Size vs Effect Size
        required_n = [self.two_sample_t_test(effect, std_dev)['total']
                      for effect in effects]
        axes[1].plot(effects, required_n, linewidth=2)
        axes[1].set_xlabel('Effect Size (minutes)')
        axes[1].set_ylabel('Required Total Sample Size')
        axes[1].set_title('Sample Size Requirements')
        axes[1].grid(True, alpha=0.3)

        plt.tight_layout()
        plt.savefig('power_analysis.png', dpi=150, bbox_inches='tight')
        plt.show()
# Worked example: size the streaming-platform experiment from the question.
calculator = SampleSizeCalculator(alpha=0.05, power=0.80)

# The MDE is 5% of the 45-minute baseline, i.e. 2.25 minutes.
effect_size = 2.25
std_dev = 12
result = calculator.two_sample_t_test(effect_size, std_dev)

# Build the report first, then emit it line by line.
report_lines = [
    "Sample Size Calculation",
    "=" * 60,
    f"Effect size: {effect_size} minutes ({result['relative_effect']:.1f}%)",
    f"Standard deviation: {std_dev} minutes",
    f"Significance level: {result['alpha']}",
    f"Power: {result['power']}",
    "\nRequired sample sizes:",
    f" Control group: {result['n_control']:,}",
    f" Treatment group: {result['n_treatment']:,}",
    f" Total: {result['total']:,}",
]
for report_line in report_lines:
    print(report_line)
2. Randomization and Experimental Design
import pandas as pd
import numpy as np
from typing import List, Dict, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
import hashlib
@dataclass
class ExperimentalUnit:
    """A single randomization unit (a user or other entity) in an experiment."""

    # Identifier of the unit.
    user_id: str
    # Free-form attributes describing the unit.
    characteristics: Dict
    # Arm label; stays None until one is set.
    assignment: Optional[str] = None
class ExperimentalDesign:
    """Randomization schemes, balance checks and duration planning.

    Each randomization method returns a ``{user_id: 'treatment' | 'control'}``
    mapping and also stores it on ``self.assignments``.
    """

    def __init__(self, experiment_name: str, n_users: int):
        self.experiment_name = experiment_name
        self.n_users = n_users
        self.assignments = {}
        self.strata = {}

    def simple_randomization(self, treatment_prob: float = 0.5) -> Dict[str, str]:
        """Assign ``n_users`` synthetic ids (``user_000000`` ...) independently.

        Each user lands in treatment with probability ``treatment_prob``.
        """
        # One vectorized draw instead of one RNG call per user.
        draws = np.random.random(self.n_users)
        assignments = {
            f"user_{i:06d}": 'treatment' if draw < treatment_prob else 'control'
            for i, draw in enumerate(draws)
        }
        self.assignments = assignments
        return assignments

    def stratified_randomization(self, strata: Dict[str, List[str]],
                                 treatment_prob: float = 0.5) -> Dict[str, str]:
        """Randomize within each stratum so every stratum is split evenly.

        Within a stratum exactly ``int(len(users) * treatment_prob)`` users
        are drawn without replacement for treatment; the rest are control.
        """
        assignments = {}
        for stratum_name, user_ids in strata.items():
            self.strata[stratum_name] = user_ids
            n_treatment = int(len(user_ids) * treatment_prob)
            if n_treatment > 0:
                # Sample positions, not ids: keeps ids in their original type
                # and gives an O(1) membership test below.
                picked = np.random.choice(len(user_ids), size=n_treatment,
                                          replace=False)
                treated = {user_ids[i] for i in picked}
            else:
                treated = set()
            for user_id in user_ids:
                assignments[user_id] = (
                    'treatment' if user_id in treated else 'control'
                )
        self.assignments = assignments
        return assignments

    def cluster_randomization(self, clusters: Dict[str, List[str]],
                              treatment_clusters: List[str]) -> Dict[str, str]:
        """Give every user the arm of their cluster (e.g. geographic region).

        Clusters listed in ``treatment_clusters`` are treated; all others
        are control.
        """
        treated_clusters = set(treatment_clusters)
        assignments = {}
        for cluster_id, user_ids in clusters.items():
            arm = 'treatment' if cluster_id in treated_clusters else 'control'
            for user_id in user_ids:
                assignments[user_id] = arm
        self.assignments = assignments
        return assignments

    def covariate_balanced_randomization(self, covariates: pd.DataFrame,
                                         treatment_prob: float = 0.5) -> Dict[str, str]:
        """Assign arms by ranking users on a fitted score.

        Fits a logistic regression of randomly drawn binary labels on the
        covariates, ranks users by predicted probability, and puts the
        ``int(n * treatment_prob)`` lowest-scoring users in treatment.
        """
        from sklearn.linear_model import LogisticRegression

        # NOTE(review): the labels are random draws, so the fitted scores are
        # presumably close to arbitrary -- confirm this is the intended scheme.
        X = covariates.values
        y = np.random.binomial(1, treatment_prob, len(covariates))
        model = LogisticRegression()
        model.fit(X, y)
        propensity_scores = model.predict_proba(X)[:, 1]

        n_treatment = int(len(covariates) * treatment_prob)
        # Set of row positions: O(1) lookup instead of scanning an array.
        treated_rows = set(np.argsort(propensity_scores)[:n_treatment].tolist())

        assignments = {
            user_id: 'treatment' if row in treated_rows else 'control'
            for row, user_id in enumerate(covariates.index)
        }
        self.assignments = assignments
        return assignments

    def validate_balance(self, covariates: pd.DataFrame) -> Dict:
        """Report the standardized mean difference (SMD) for each covariate.

        ``covariates`` must be indexed by user id. A covariate counts as
        balanced when its SMD is below 0.1; the ``'overall'`` entry
        summarizes the mean and max SMD across covariates.

        Raises:
            ValueError: if ``covariates`` has no columns.
        """
        if len(covariates.columns) == 0:
            raise ValueError("covariates must contain at least one column")

        assignments_df = pd.DataFrame({
            'user_id': list(self.assignments.keys()),
            'assignment': list(self.assignments.values())
        })
        df = covariates.merge(assignments_df, left_index=True, right_on='user_id')
        treated = df[df['assignment'] == 'treatment']
        control = df[df['assignment'] == 'control']

        balance_results = {}
        for covariate in covariates.columns:
            treatment_mean = treated[covariate].mean()
            control_mean = control[covariate].mean()
            pooled_std = np.sqrt(
                (treated[covariate].std() ** 2 + control[covariate].std() ** 2) / 2
            )
            gap = abs(treatment_mean - control_mean)
            if np.isnan(pooled_std) or np.isnan(gap):
                smd = float('nan')
            elif pooled_std > 0:
                smd = gap / pooled_std
            else:
                # Constant covariate: balanced if the means agree, otherwise
                # the difference is infinitely large relative to spread.
                smd = 0.0 if gap == 0 else float('inf')
            balance_results[covariate] = {
                'treatment_mean': treatment_mean,
                'control_mean': control_mean,
                'smd': smd,
                'balanced': smd < 0.1  # Common threshold
            }

        all_smd = [entry['smd'] for entry in balance_results.values()]
        balance_results['overall'] = {
            'mean_smd': np.mean(all_smd),
            'max_smd': np.max(all_smd),
            'balanced': np.mean(all_smd) < 0.1
        }
        return balance_results

    def calculate_required_duration(self, daily_users: int,
                                    seasonality: bool = True) -> Dict:
        """Estimate how many days the experiment must run.

        The sample is the number of recorded assignments, or ``n_users`` when
        no randomization has been run yet. With ``seasonality`` the run is at
        least 14 days, and the result is always rounded up to whole weeks.

        Raises:
            ValueError: if ``daily_users`` is not positive.
        """
        if daily_users <= 0:
            raise ValueError("daily_users must be positive")

        # Fall back to the planned size so an un-randomized design is not
        # reported as needing zero users.
        total_sample = len(self.assignments) or self.n_users
        basic_days = total_sample / daily_users

        # Add buffer for seasonality (2 full weeks recommended)
        duration = max(basic_days, 14) if seasonality else basic_days

        # Whole weeks so every weekday is represented equally.
        weeks = int(np.ceil(duration / 7))
        total_days = weeks * 7

        start = datetime.now()  # single clock read keeps both dates consistent
        return {
            'daily_users': daily_users,
            'total_sample_needed': total_sample,
            'basic_duration_days': basic_days,
            'recommended_duration_days': total_days,
            'weeks': weeks,
            'start_date': start.strftime('%Y-%m-%d'),
            'end_date': (start + timedelta(days=total_days)).strftime('%Y-%m-%d')
        }
# Demo: stratify 100k synthetic users, check balance, then plan the run.
design = ExperimentalDesign('recommendation_algorithm_v2', n_users=100000)

# Segment boundaries expressed as (name, first index, one-past-last index).
segment_bounds = [
    ('new_users', 0, 20000),
    ('power_users', 20000, 50000),
    ('casual_users', 50000, 100000),
]
strata = {
    segment: [f'user_{i:06d}' for i in range(first, stop)]
    for segment, first, stop in segment_bounds
}
assignments = design.stratified_randomization(strata, treatment_prob=0.5)

# Synthetic covariates, indexed by the same user ids, for the balance check.
covariates = pd.DataFrame(
    {
        'age': np.random.normal(35, 10, 100000),
        'tenure_days': np.random.exponential(365, 100000),
        'previous_purchases': np.random.poisson(10, 100000),
    },
    index=[f'user_{i:06d}' for i in range(100000)],
)
balance = design.validate_balance(covariates)
overall_balance = balance['overall']
print("Randomization Balance Check:")
print(f"Overall mean SMD: {overall_balance['mean_smd']:.4f}")
print(f"Balanced: {overall_balance['balanced']}")

# How long the experiment needs to run at 50k users per day.
duration = design.calculate_required_duration(daily_users=50000)
print("\nExperiment Duration:")
print(f"Required days: {duration['recommended_duration_days']}")
print(f"End date: {duration['end_date']}")
3. Metrics and Guardrail Metrics
@dataclass
class ExperimentMetrics:
    """The metric names an experiment tracks, grouped by role."""

    # Name of the single primary metric.
    primary_metric: str
    # Names of the secondary metrics.
    secondary_metrics: List[str]
    # Names of the guardrail metrics.
    guardrail_metrics: List[str]
class MetricsFramework:
    """Define experiment metrics, compute per-arm means, and check guardrails."""

    def __init__(self):
        self.metrics = {}
        self.results = {}

    def define_metrics(self, experiment_type: str) -> "Optional[ExperimentMetrics]":
        """Return the metric set for ``experiment_type``.

        Known types are ``'recommendation'``, ``'pricing'`` and
        ``'ui_change'``; any other value yields ``None``.
        """
        metric_definitions = {
            'recommendation': ExperimentMetrics(
                primary_metric='watch_time_per_session',
                secondary_metrics=[
                    'session_duration',
                    'content_completion_rate',
                    'user_satisfaction_score',
                    'return_rate_7d'
                ],
                guardrail_metrics=[
                    'error_rate',
                    'latency_p99',
                    'crash_rate',
                    'negative_feedback_rate'
                ]
            ),
            'pricing': ExperimentMetrics(
                primary_metric='revenue_per_user',
                secondary_metrics=[
                    'conversion_rate',
                    'average_order_value',
                    'cart_abandonment_rate',
                    'customer_lifetime_value'
                ],
                guardrail_metrics=[
                    'support_ticket_rate',
                    'refund_rate',
                    'negative_reviews'
                ]
            ),
            'ui_change': ExperimentMetrics(
                primary_metric='task_completion_rate',
                secondary_metrics=[
                    'time_on_task',
                    'click_through_rate',
                    'user_satisfaction_score'
                ],
                guardrail_metrics=[
                    'error_rate',
                    'bounce_rate',
                    'help_page_visits'
                ]
            )
        }
        return metric_definitions.get(experiment_type)

    @staticmethod
    def _relative_change_pct(treatment_mean, control_mean) -> float:
        """Percent change of treatment over control; NaN if control is zero."""
        if control_mean == 0:
            return float('nan')
        return (treatment_mean - control_mean) / control_mean * 100

    def calculate_metrics(self, data: pd.DataFrame,
                          metrics: "ExperimentMetrics",
                          guardrail_threshold: float = 0.10) -> Dict:
        """Compute per-arm means for every metric present in ``data``.

        Parameters:
            data: one row per observation with an ``'assignment'`` column
                holding ``'treatment'`` / ``'control'`` and one column per
                metric. Metrics without a column are skipped.
            metrics: object exposing ``primary_metric``,
                ``secondary_metrics`` and ``guardrail_metrics``.
            guardrail_threshold: a guardrail is flagged ``degraded`` when the
                treatment mean exceeds the control mean by more than this
                fraction (default 10%).

        Returns:
            Dict with ``'primary'`` (only if its column exists),
            ``'secondary'`` and ``'guardrails'`` entries.
        """
        # Split once instead of re-filtering the frame for every metric.
        treated = data[data['assignment'] == 'treatment']
        control = data[data['assignment'] == 'control']

        def arm_means(metric):
            return treated[metric].mean(), control[metric].mean()

        results = {}

        # Primary metric
        if metrics.primary_metric in data.columns:
            treatment_val, control_val = arm_means(metrics.primary_metric)
            results['primary'] = {
                'metric': metrics.primary_metric,
                'treatment_mean': treatment_val,
                'control_mean': control_val,
                'lift': self._relative_change_pct(treatment_val, control_val),
            }

        # Secondary metrics
        results['secondary'] = {}
        for metric in metrics.secondary_metrics:
            if metric not in data.columns:
                continue
            treatment_val, control_val = arm_means(metric)
            results['secondary'][metric] = {
                'treatment_mean': treatment_val,
                'control_mean': control_val,
                'lift': self._relative_change_pct(treatment_val, control_val)
            }

        # Guardrail metrics: the check treats a higher treatment mean as worse.
        results['guardrails'] = {}
        for metric in metrics.guardrail_metrics:
            if metric not in data.columns:
                continue
            treatment_val, control_val = arm_means(metric)
            results['guardrails'][metric] = {
                'treatment_mean': treatment_val,
                'control_mean': control_val,
                'degraded': treatment_val > control_val * (1 + guardrail_threshold)
            }
        return results

    def check_guardrails(self, results: Dict) -> Dict:
        """List guardrails flagged ``degraded`` and recommend STOP or CONTINUE.

        ``results`` is the dict produced by ``calculate_metrics``; a missing
        ``'guardrails'`` entry is treated as no guardrails.
        """
        guardrail_violations = [
            {
                'metric': metric,
                'treatment_mean': values['treatment_mean'],
                'control_mean': values['control_mean'],
                'degradation_pct': self._relative_change_pct(
                    values['treatment_mean'], values['control_mean']
                )
            }
            for metric, values in results.get('guardrails', {}).items()
            if values.get('degraded', False)
        ]
        return {
            'violations': guardrail_violations,
            'all_passed': len(guardrail_violations) == 0,
            'recommendation': 'STOP' if guardrail_violations else 'CONTINUE'
        }
4. Handling Network Effects and Interference
class NetworkEffectHandler:
    """Helpers for spotting and limiting interference between experiment arms.

    ``network_graph`` must expose ``nodes()`` and ``neighbors(node)``;
    ``assignments`` maps user id to arm label.
    """

    def __init__(self, network_graph, assignments):
        self.network = network_graph
        self.assignments = assignments

    def identify_exposed_pairs(self) -> List[Tuple]:
        """Return connected user pairs whose arm assignments differ.

        Each unordered pair is reported once, in first-seen order, even when
        the graph lists the edge from both endpoints.
        """
        exposed_pairs = []
        seen = set()
        for user in self.network.nodes():
            for neighbor in self.network.neighbors(user):
                if self.assignments.get(user) == self.assignments.get(neighbor):
                    continue
                edge = frozenset((user, neighbor))
                if edge in seen:
                    continue  # already recorded from the other endpoint
                seen.add(edge)
                exposed_pairs.append((user, neighbor))
        return exposed_pairs

    def geographic_cluster_design(self, user_locations: pd.DataFrame,
                                  n_clusters: int = 50) -> Dict:
        """Cluster users by location and randomize whole clusters.

        Users are grouped with KMeans on their ``latitude`` / ``longitude``
        columns; half of the resulting clusters (rounded down) are drawn at
        random for treatment. Returns ``{user_id: arm}`` keyed by the index
        of ``user_locations``.
        """
        from sklearn.cluster import KMeans

        coords = user_locations[['latitude', 'longitude']].values
        labels = KMeans(n_clusters=n_clusters, random_state=42).fit_predict(coords)

        unique_clusters = np.unique(labels)
        drawn = np.random.choice(
            unique_clusters,
            size=len(unique_clusters) // 2,
            replace=False
        )
        treated_clusters = set(drawn.tolist())  # O(1) membership per user

        return {
            user_id: 'treatment' if label in treated_clusters else 'control'
            for user_id, label in zip(user_locations.index, labels.tolist())
        }

    def interference_detection(self, metric_name: str,
                               data: pd.DataFrame,
                               threshold: float = 0.2) -> Dict:
        """Flag spatial autocorrelation of a metric using a simplified Moran's I.

        Weights are inverse distance, ``1 / (d + 1)``, with a zero diagonal.
        Interference is reported when ``abs(I)`` exceeds ``threshold``. If
        ``data`` has no ``latitude`` / ``longitude`` columns, only
        ``{'interference_detected': False}`` is returned.

        Note: builds a full n-by-n distance matrix, so memory grows
        quadratically with the number of rows.
        """
        from scipy.spatial.distance import pdist, squareform

        if 'latitude' not in data.columns or 'longitude' not in data.columns:
            return {'interference_detected': False}

        coords = data[['latitude', 'longitude']].values
        distances = squareform(pdist(coords))
        metric_values = data[metric_name].values
        n = len(metric_values)

        # Create weight matrix (inverse distance)
        W = 1 / (distances + 1)
        np.fill_diagonal(W, 0)

        z = metric_values - metric_values.mean()
        denominator = np.sum(W) * np.sum(z ** 2)
        if denominator == 0:
            # Constant metric or a single point: the statistic is undefined.
            morans_i = float('nan')
        else:
            # z @ W @ z equals sum(W * outer(z, z)) without the n-by-n temporary.
            morans_i = float(n * (z @ W @ z) / denominator)

        detected = bool(abs(morans_i) > threshold)
        return {
            'morans_i': morans_i,
            'interference_detected': detected,
            'interpretation': ('Significant spatial autocorrelation'
                               if detected else 'No significant interference')
        }
⚠️
Critical Warning: Network effects can completely invalidate experiment results. If users in the treatment group influence users in the control group, you're not measuring the true effect.
5. Real-World Application: Streaming Platform Experiment
def design_streaming_experiment(daily_active_users: int = 2_000_000,
                                exposure_fraction: float = 0.3):
    """Assemble the full experiment design for the streaming platform.

    Parameters:
        daily_active_users: platform DAU (default 2M).
        exposure_fraction: share of DAU exposed to the experiment
            (default 30%).

    Returns:
        Dict combining the experiment parameters with the sample-size
        result, the duration plan, the metric names, and the randomization
        and analysis labels.
    """
    # 1. Define experiment parameters
    experiment = {
        'name': 'recommendation_algorithm_v2',
        'objective': 'Increase user engagement through better recommendations',
        'hypothesis': 'New algorithm will increase watch time by at least 5%',
        'primary_metric': 'daily_watch_time',
        'minimum_detectable_effect': 0.05,
        'significance_level': 0.05,
        'power': 0.80,
        'baseline_value': 45,  # minutes
        'std_dev': 12  # minutes
    }

    # 2. Calculate sample size from the absolute effect (relative MDE x baseline)
    calculator = SampleSizeCalculator(
        alpha=experiment['significance_level'],
        power=experiment['power']
    )
    effect_size = experiment['baseline_value'] * experiment['minimum_detectable_effect']
    sample_result = calculator.two_sample_t_test(effect_size, experiment['std_dev'])

    # 3. Design randomization
    design = ExperimentalDesign(experiment['name'], sample_result['total'])

    # 4. Define metrics
    metrics_framework = MetricsFramework()
    metrics = metrics_framework.define_metrics('recommendation')

    # 5. Calculate duration
    # NOTE(review): no assignments are generated in this function; confirm the
    # `total_sample_needed` reported by `calculate_required_duration`.
    duration = design.calculate_required_duration(
        daily_users=int(daily_active_users * exposure_fraction),
        seasonality=True
    )

    # 6. Compile final design
    final_design = {
        **experiment,
        'sample_size': sample_result,
        'duration': duration,
        'metrics': {
            # Report the metric the sample size was computed for, so it
            # agrees with the top-level 'primary_metric' entry.
            'primary': experiment['primary_metric'],
            'secondary': metrics.secondary_metrics,
            'guardrails': metrics.guardrail_metrics
        },
        'randomization': 'Stratified by user segment',
        'analysis_method': 'Two-sample t-test with Bonferroni correction'
    }
    return final_design
# Build the design and dump it, expanding nested dicts one level.
design = design_streaming_experiment()
print("Complete Experiment Design")
print("=" * 60)
for key, value in design.items():
    if not isinstance(value, dict):
        # Scalar entries print on a single line.
        print(f"{key}: {value}")
        continue
    print(f"\n{key}:")
    for k, v in value.items():
        print(f" {k}: {v}")
6. Common Follow-Up Questions
Follow-up 1: How do you handle multiple comparisons?
from statsmodels.stats.multitest import multipletests
def multiple_comparison_correction(p_values: List[float],
                                   method: str = 'bonferroni',
                                   alpha: float = 0.05) -> Dict:
    """Apply multiple comparison correction to a list of p-values.

    Bonferroni and Benjamini-Hochberg (FDR) results are always returned
    under ``'bonferroni'`` and ``'fdr'``. The result for the requested
    ``method`` (any name accepted by ``multipletests``) is returned under
    ``'selected'``.

    Parameters:
        p_values: raw p-values, one per comparison.
        method: correction to report under ``'selected'``.
        alpha: family-wise error rate / FDR level.
    """
    def run(method_name: str) -> Dict:
        rejected, corrected, _, _ = multipletests(
            p_values, alpha=alpha, method=method_name
        )
        return {
            'corrected_p_values': corrected.tolist(),
            'significant': rejected.tolist()
        }

    bonferroni = run('bonferroni')
    fdr = run('fdr_bh')

    # Reuse the results already computed when the caller asked for one of them.
    if method == 'bonferroni':
        selected = bonferroni
    elif method == 'fdr_bh':
        selected = fdr
    else:
        selected = run(method)

    return {
        'original_p_values': p_values,
        'bonferroni': bonferroni,
        'fdr': fdr,
        'selected': {'method': method, **selected}
    }
# Demo: correct five raw p-values and show which survive each method.
p_values = [0.02, 0.04, 0.08, 0.12, 0.03]
results = multiple_comparison_correction(p_values)

bonferroni_flags = results['bonferroni']['significant']
fdr_flags = results['fdr']['significant']

print("Multiple Comparison Correction")
print("-" * 60)
print(f"Original p-values: {p_values}")
print(f"Bonferroni significant: {bonferroni_flags}")
print(f"FDR significant: {fdr_flags}")
Follow-up 2: How do you analyze results with non-normal data?
from scipy.stats import mannwhitneyu, bootstrap
def analyze_non_normal_results(treatment_data, control_data,
                               n_resamples: int = 10000,
                               confidence_level: float = 0.95,
                               random_state=None):
    """Analyze experiment results with non-normal data.

    Runs a two-sided Mann-Whitney U test, bootstraps a confidence interval
    for the difference in means, and computes the Hodges-Lehmann shift
    estimate.

    Parameters:
        treatment_data, control_data: array-likes of observations.
        n_resamples: number of bootstrap resamples.
        confidence_level: confidence level of the bootstrap interval.
        random_state: seed or generator passed to ``scipy.stats.bootstrap``
            for reproducible intervals.

    Raises:
        ValueError: if either sample is empty.
    """
    treatment = np.asarray(treatment_data, dtype=float)
    control = np.asarray(control_data, dtype=float)
    if treatment.size == 0 or control.size == 0:
        raise ValueError("treatment_data and control_data must be non-empty")

    # Mann-Whitney U test (non-parametric)
    stat_u, p_value_u = mannwhitneyu(
        treatment, control, alternative='two-sided'
    )

    # Bootstrap confidence interval for the difference in means
    def mean_diff(x, y):
        return np.mean(x) - np.mean(y)

    boot_result = bootstrap(
        (treatment, control),
        statistic=mean_diff,
        n_resamples=n_resamples,
        confidence_level=confidence_level,
        random_state=random_state
    )

    # Hodges-Lehmann estimator: median of all pairwise differences.
    # The outer difference is n_treatment x n_control, so memory is quadratic.
    hodges_lehmann = float(np.median(np.subtract.outer(treatment, control)))

    return {
        'mann_whitney_u': {
            'statistic': stat_u,
            'p_value': p_value_u
        },
        'bootstrap_ci': {
            'lower': boot_result.confidence_interval.low,
            'upper': boot_result.confidence_interval.high
        },
        'hodges_lehmann_estimate': hodges_lehmann,
        'recommendation': 'Use non-parametric methods if p < 0.05'
    }
Company-Specific Tips
ℹ️
Netflix Tips:
- Netflix heavily tests on streaming-specific metrics (completion rate, engagement)
- Understand how to handle time-series data in experiments
- Know how to test recommendation algorithms without user disruption
- Be familiar with Thompson sampling for bandits
Uber Tips:
- Uber tests on marketplace dynamics (supply/demand)
- Understand two-sided marketplace experiments
- Know how to handle geographic experiments
- Be comfortable with causal inference methods
Quiz Section
Related Topics
- Hypothesis Testing — Statistical foundations for experiments
- Bayesian A/B Testing — Alternative experimental approach
- Causal Inference — Methods beyond A/B testing
- Power Analysis — Detailed power calculations