
Experiment Design: A/B Tests, Sample Size, Power Analysis


NETFLIX & UBER INTERVIEW QUESTION

Experimental Methodology & Causal Inference

The Interview Question

Question: You're designing an experiment to test whether a new recommendation algorithm increases user engagement on a streaming platform:

  • Current average watch time: 45 minutes/day
  • Minimum detectable effect: 5% increase (2.25 minutes)
  • Standard deviation: 12 minutes
  • Significance level: 0.05
  • Desired power: 0.80

Walk through your complete experiment design:

  1. Calculate the required sample size
  2. Design the randomization scheme
  3. Define metrics and guardrail metrics
  4. How do you handle network effects and interference?

Detailed Answer

1. Sample Size Calculation

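Sample size drives both the cost and the credibility of the test: with too few users a real 5% lift will often go undetected, and with too many, traffic is spent for no extra insight. The required size depends on four inputs: the minimum detectable effect, the metric's standard deviation, the significance level, and the desired power.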
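As a quick sanity check on any calculator output, the standard two-sided normal approximation for comparing two means with equal group sizes can be worked by hand. The symbols below (n for users per group, \sigma for the standard deviation, \delta for the absolute minimum detectable effect) are notation introduced here for illustration; the numbers are the ones given in the question.

```latex
n_{\text{per group}}
  = \frac{2\,\left(z_{1-\alpha/2} + z_{1-\beta}\right)^{2}\,\sigma^{2}}{\delta^{2}}
  = \frac{2\,(1.960 + 0.842)^{2} \cdot 12^{2}}{2.25^{2}}
  \approx 447
  \quad\Rightarrow\quad 894 \text{ users in total}
```

Because the formula uses z-scores rather than t-quantiles, it should be read as a close approximation; at this sample size the difference is about one user per group.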

import numpy as np
from scipy import stats
from typing import Tuple, Dict
import matplotlib.pyplot as plt

class SampleSizeCalculator:
    """Calculate sample size for various experimental designs"""
    
    def __init__(self, alpha: float = 0.05, power: float = 0.80):
        self.alpha = alpha
        self.power = power
    
    def two_sample_t_test(self, effect_size: float, 
                         std_dev: float,
                         ratio: float = 1.0) -> Dict:
        """
        Calculate sample size for two-sample t-test
        
        Parameters:
        -----------
        effect_size : float
            Minimum detectable effect (absolute)
        std_dev : float
            Standard deviation of the metric
        ratio : float
            Ratio of treatment to control group sizes
        """
        # Z-scores for alpha and power
        z_alpha = stats.norm.ppf(1 - self.alpha / 2)  # Two-tailed
        z_beta = stats.norm.ppf(self.power)
        
        # Sample size formula
        n_control = ((z_alpha + z_beta) ** 2 * std_dev ** 2 * (1 + 1/ratio)) / (effect_size ** 2)
        n_treatment = n_control * ratio
        
        return {
            'n_control': int(np.ceil(n_control)),
            'n_treatment': int(np.ceil(n_treatment)),
            'total': int(np.ceil(n_control)) + int(np.ceil(n_treatment)),  # Sum of the rounded group sizes
            'effect_size': effect_size,
            'relative_effect': effect_size / 45 * 100,  # Assuming baseline of 45
            'power': self.power,
            'alpha': self.alpha
        }
    
    def two_sample_proportion(self, p_control: float, 
                             mde: float,
                             ratio: float = 1.0) -> Dict:
        """
        Calculate sample size for comparing two proportions
        
        Parameters:
        -----------
        p_control : float
            Expected proportion in control group
        mde : float
            Minimum detectable effect (relative change)
        """
        p_treatment = p_control * (1 + mde)
        
        # Z-scores
        z_alpha = stats.norm.ppf(1 - self.alpha / 2)
        z_beta = stats.norm.ppf(self.power)
        
        # Sample size formula for proportions (unpooled variances; with
        # ratio = n_treatment / n_control the treatment variance is scaled by 1/ratio)
        n_control = ((z_alpha + z_beta) ** 2 * 
                    (p_control * (1 - p_control) + p_treatment * (1 - p_treatment) / ratio)) / \
                   ((p_treatment - p_control) ** 2)
        
        n_treatment = n_control * ratio
        
        return {
            'n_control': int(np.ceil(n_control)),
            'n_treatment': int(np.ceil(n_treatment)),
            'total': int(np.ceil(n_control)) + int(np.ceil(n_treatment)),  # Sum of the rounded group sizes
            'p_control': p_control,
            'p_treatment': p_treatment,
            'absolute_effect': p_treatment - p_control,
            'relative_effect': mde * 100
        }
    
    def plot_power_curve(self, effect_sizes: np.ndarray, 
                        std_dev: float,
                        sample_sizes: np.ndarray):
        """Visualize power as function of effect size and sample size"""
        fig, axes = plt.subplots(1, 2, figsize=(14, 5))
        
        # Plot 1: Power vs Effect Size, one curve per per-group sample size
        for n in sample_sizes:
            powers = []
            for effect in effect_sizes:
                z_alpha = stats.norm.ppf(1 - self.alpha / 2)
                z_beta_calc = effect * np.sqrt(n / 2) / std_dev - z_alpha
                power = stats.norm.cdf(z_beta_calc)
                powers.append(power)
            
            axes[0].plot(effect_sizes, powers, label=f'n={n}', linewidth=2)
        
        axes[0].axhline(y=self.power, color='gray', linestyle='--', label=f'{self.power*100}% power')
        axes[0].set_xlabel('Effect Size (minutes)')
        axes[0].set_ylabel('Power')
        axes[0].set_title('Power vs Effect Size')
        axes[0].legend()
        axes[0].grid(True, alpha=0.3)
        
        # Plot 2: Required Sample Size vs Effect Size
        required_n = []
        for effect in effect_sizes:
            result = self.two_sample_t_test(effect, std_dev)
            required_n.append(result['total'])
        
        axes[1].plot(effect_sizes, required_n, linewidth=2)
        axes[1].set_xlabel('Effect Size (minutes)')
        axes[1].set_ylabel('Required Total Sample Size')
        axes[1].set_title('Sample Size Requirements')
        axes[1].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.savefig('power_analysis.png', dpi=150, bbox_inches='tight')
        plt.show()

# Calculate sample size for the given problem
calculator = SampleSizeCalculator(alpha=0.05, power=0.80)

# Effect size = 5% of 45 minutes = 2.25 minutes
effect_size = 2.25
std_dev = 12

result = calculator.two_sample_t_test(effect_size, std_dev)

print("Sample Size Calculation")
print("=" * 60)
print(f"Effect size: {effect_size} minutes ({result['relative_effect']:.1f}%)")
print(f"Standard deviation: {std_dev} minutes")
print(f"Significance level: {result['alpha']}")
print(f"Power: {result['power']}")
print(f"\nRequired sample sizes:")
print(f"  Control group: {result['n_control']:,}")
print(f"  Treatment group: {result['n_treatment']:,}")
print(f"  Total: {result['total']:,}")
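A closed-form sample size is easy to get wrong, so it can help to confirm it by simulation. The sketch below is an illustrative check, not part of the original answer: it assumes normally distributed watch time (real watch time is usually skewed), uses only the standard library, and the function name `simulated_power` is hypothetical. It repeatedly simulates an experiment with 447 users per group and counts how often a two-sided z-test rejects.

```python
import random
from statistics import NormalDist


def simulated_power(n_per_group: int, baseline: float, effect: float,
                    std_dev: float, alpha: float = 0.05,
                    n_sims: int = 1000, seed: int = 0) -> float:
    """Share of simulated experiments in which a two-sided z-test rejects H0."""
    rng = random.Random(seed)
    z_crit = NormalDist().inv_cdf(1 - alpha / 2)
    rejections = 0
    for _ in range(n_sims):
        # Assumption: daily watch time is normal in both groups
        control = [rng.gauss(baseline, std_dev) for _ in range(n_per_group)]
        treatment = [rng.gauss(baseline + effect, std_dev) for _ in range(n_per_group)]
        mean_c = sum(control) / n_per_group
        mean_t = sum(treatment) / n_per_group
        var_c = sum((x - mean_c) ** 2 for x in control) / (n_per_group - 1)
        var_t = sum((x - mean_t) ** 2 for x in treatment) / (n_per_group - 1)
        # Standard error of the difference in means
        se = ((var_c + var_t) / n_per_group) ** 0.5
        if abs(mean_t - mean_c) / se > z_crit:
            rejections += 1
    return rejections / n_sims


power = simulated_power(n_per_group=447, baseline=45, effect=2.25, std_dev=12)
print(f"Simulated power at n=447 per group: {power:.3f}")
```

The simulated value should land close to the 0.80 target, with some Monte Carlo noise from the limited number of simulations.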

2. Randomization and Experimental Design

import pandas as pd
import numpy as np
from typing import List, Dict, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
import hashlib

@dataclass
class ExperimentalUnit:
    """Represents a user or entity in the experiment"""
    user_id: str
    characteristics: Dict
    assignment: Optional[str] = None
    
class ExperimentalDesign:
    """Comprehensive experimental design framework"""
    
    def __init__(self, experiment_name: str, n_users: int):
        self.experiment_name = experiment_name
        self.n_users = n_users
        self.assignments = {}
        self.strata = {}
    
    def simple_randomization(self, treatment_prob: float = 0.5) -> Dict[str, str]:
        """Simple random assignment"""
        assignments = {}
        for i in range(self.n_users):
            user_id = f"user_{i:06d}"
            assignments[user_id] = 'treatment' if np.random.random() < treatment_prob else 'control'
        
        self.assignments = assignments
        return assignments
    
    def stratified_randomization(self, strata: Dict[str, List[str]], 
                                treatment_prob: float = 0.5) -> Dict[str, str]:
        """Stratified randomization to ensure balance"""
        assignments = {}
        
        for stratum_name, user_ids in strata.items():
            self.strata[stratum_name] = user_ids
            
            # Randomize within each stratum
            n_treatment = int(len(user_ids) * treatment_prob)
            # Use a set so the membership test below is O(1) rather than a scan of the array
            treatment_users = set(np.random.choice(user_ids, size=n_treatment, replace=False))
            
            for user_id in user_ids:
                assignments[user_id] = 'treatment' if user_id in treatment_users else 'control'
        
        self.assignments = assignments
        return assignments
    
    def cluster_randomization(self, clusters: Dict[str, List[str]], 
                             treatment_clusters: List[str]) -> Dict[str, str]:
        """Cluster randomization (e.g., by geographic region)"""
        assignments = {}
        
        for cluster_id, user_ids in clusters.items():
            assignment = 'treatment' if cluster_id in treatment_clusters else 'control'
            for user_id in user_ids:
                assignments[user_id] = assignment
        
        self.assignments = assignments
        return assignments
    
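    def covariate_balanced_randomization(self, covariates: pd.DataFrame, 
                                        treatment_prob: float = 0.5,
                                        max_smd: float = 0.1,
                                        max_draws: int = 100) -> Dict[str, str]:
        """Randomization with covariate balancing via rerandomization.
        
        Draws complete random assignments and keeps the first one whose largest
        standardized mean difference (SMD) is below max_smd; if none qualifies,
        the best-balanced draw is kept. Assignment stays random, unlike ranking
        users by a fitted score.
        """
        X = covariates.to_numpy(dtype=float)
        n = len(covariates)
        n_treatment = int(n * treatment_prob)
        
        best_mask, best_smd = None, np.inf
        for _ in range(max_draws):
            # Candidate assignment: choose n_treatment users uniformly at random
            mask = np.zeros(n, dtype=bool)
            mask[np.random.choice(n, size=n_treatment, replace=False)] = True
            
            # Largest SMD across covariates under this candidate
            diff = X[mask].mean(axis=0) - X[~mask].mean(axis=0)
            pooled_std = np.sqrt((X[mask].var(axis=0, ddof=1) + X[~mask].var(axis=0, ddof=1)) / 2)
            smd = np.max(np.abs(diff) / pooled_std)
            
            if smd < best_smd:
                best_mask, best_smd = mask, smd
            if smd < max_smd:
                break
        
        assignments = {
            user_id: 'treatment' if is_treated else 'control'
            for user_id, is_treated in zip(covariates.index, best_mask)
        }
        
        self.assignments = assignments
        return assignments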
    
    def validate_balance(self, covariates: pd.DataFrame) -> Dict:
        """Validate randomization balance"""
        assignments_df = pd.DataFrame({
            'user_id': list(self.assignments.keys()),
            'assignment': list(self.assignments.values())
        })
        
        # Merge with covariates
        df = covariates.merge(assignments_df, left_index=True, right_on='user_id')
        
        balance_results = {}
        
        for covariate in covariates.columns:
            treatment_values = df[df['assignment'] == 'treatment'][covariate]
            control_values = df[df['assignment'] == 'control'][covariate]
            
            # Calculate standardized mean difference
            pooled_std = np.sqrt((treatment_values.std()**2 + control_values.std()**2) / 2)
            smd = abs(treatment_values.mean() - control_values.mean()) / pooled_std
            
            balance_results[covariate] = {
                'treatment_mean': treatment_values.mean(),
                'control_mean': control_values.mean(),
                'smd': smd,
                'balanced': smd < 0.1  # Common threshold
            }
        
        # Overall balance
        all_smd = [r['smd'] for r in balance_results.values()]
        balance_results['overall'] = {
            'mean_smd': np.mean(all_smd),
            'max_smd': np.max(all_smd),
            'balanced': np.max(all_smd) < 0.1  # Every covariate must pass, not just the average
        }
        
        return balance_results
    
    def calculate_required_duration(self, daily_users: int, 
                                   seasonality: bool = True) -> Dict:
        """Calculate required experiment duration"""
        # Fall back to the planned size when no assignment has been generated yet
        total_sample = len(self.assignments) or self.n_users
        
        # Basic duration
        basic_days = total_sample / daily_users
        
        # Add buffer for seasonality (2 full weeks recommended)
        if seasonality:
            duration = max(basic_days, 14)  # Minimum 2 weeks
        else:
            duration = basic_days
        
        # Weekend effect
        weeks = int(np.ceil(duration / 7))
        total_days = weeks * 7
        
        return {
            'daily_users': daily_users,
            'total_sample_needed': total_sample,
            'basic_duration_days': basic_days,
            'recommended_duration_days': total_days,
            'weeks': weeks,
            'start_date': datetime.now().strftime('%Y-%m-%d'),
            'end_date': (datetime.now() + timedelta(days=total_days)).strftime('%Y-%m-%d')
        }

# Example usage
design = ExperimentalDesign('recommendation_algorithm_v2', n_users=100000)

# Stratified randomization
strata = {
    'new_users': [f'user_{i:06d}' for i in range(0, 20000)],
    'power_users': [f'user_{i:06d}' for i in range(20000, 50000)],
    'casual_users': [f'user_{i:06d}' for i in range(50000, 100000)]
}

assignments = design.stratified_randomization(strata, treatment_prob=0.5)

# Check balance
covariates = pd.DataFrame({
    'age': np.random.normal(35, 10, 100000),
    'tenure_days': np.random.exponential(365, 100000),
    'previous_purchases': np.random.poisson(10, 100000)
}, index=[f'user_{i:06d}' for i in range(100000)])

balance = design.validate_balance(covariates)
print("Randomization Balance Check:")
print(f"Overall mean SMD: {balance['overall']['mean_smd']:.4f}")
print(f"Balanced: {balance['overall']['balanced']}")

# Calculate duration
duration = design.calculate_required_duration(daily_users=50000)
print(f"\nExperiment Duration:")
print(f"Required days: {duration['recommended_duration_days']}")
print(f"End date: {duration['end_date']}")
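The listing above imports `hashlib` but never uses it. A common way to put it to work is deterministic, hash-based assignment: the same user always lands in the same arm for a given experiment, with no assignment table to store, and different experiments are bucketed independently. The sketch below is one possible version; the function name `hash_assign` and the choice of SHA-256 with an `experiment:user` key are illustrative assumptions, not something the original specifies.

```python
import hashlib


def hash_assign(user_id: str, experiment_name: str,
                treatment_prob: float = 0.5) -> str:
    """Deterministically map a user to 'treatment' or 'control'."""
    # Salting with the experiment name decorrelates buckets across experiments
    key = f"{experiment_name}:{user_id}".encode("utf-8")
    digest = hashlib.sha256(key).hexdigest()
    # First 32 bits of the digest, scaled to a number in [0, 1)
    bucket = int(digest[:8], 16) / 0x100000000
    return "treatment" if bucket < treatment_prob else "control"


user_ids = [f"user_{i:06d}" for i in range(10000)]
arms = [hash_assign(uid, "recommendation_algorithm_v2") for uid in user_ids]
treatment_share = arms.count("treatment") / len(arms)
print(f"Treatment share: {treatment_share:.3f}")
```

Calling `hash_assign` twice with the same arguments returns the same arm, which is what makes the scheme safe to evaluate at request time on any server.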

3. Metrics and Guardrail Metrics

@dataclass
class ExperimentMetrics:
    """Define and track experiment metrics"""
    
    primary_metric: str
    secondary_metrics: List[str]
    guardrail_metrics: List[str]
    
class MetricsFramework:
    """Comprehensive metrics framework for experiments"""
    
    def __init__(self):
        self.metrics = {}
        self.results = {}
    
    def define_metrics(self, experiment_type: str) -> ExperimentMetrics:
        """Define appropriate metrics based on experiment type"""
        
        metric_definitions = {
            'recommendation': ExperimentMetrics(
                primary_metric='daily_watch_time',  # Matches the question: minutes watched per user per day
                secondary_metrics=[
                    'session_duration',
                    'content_completion_rate',
                    'user_satisfaction_score',
                    'return_rate_7d'
                ],
                guardrail_metrics=[
                    'error_rate',
                    'latency_p99',
                    'crash_rate',
                    'negative_feedback_rate'
                ]
            ),
            'pricing': ExperimentMetrics(
                primary_metric='revenue_per_user',
                secondary_metrics=[
                    'conversion_rate',
                    'average_order_value',
                    'cart_abandonment_rate',
                    'customer_lifetime_value'
                ],
                guardrail_metrics=[
                    'support_ticket_rate',
                    'refund_rate',
                    'negative_reviews'
                ]
            ),
            'ui_change': ExperimentMetrics(
                primary_metric='task_completion_rate',
                secondary_metrics=[
                    'time_on_task',
                    'click_through_rate',
                    'user_satisfaction_score'
                ],
                guardrail_metrics=[
                    'error_rate',
                    'bounce_rate',
                    'help_page_visits'
                ]
            )
        }
        
        return metric_definitions.get(experiment_type)
    
    def calculate_metrics(self, data: pd.DataFrame, 
                         metrics: ExperimentMetrics) -> Dict:
        """Calculate all experiment metrics"""
        results = {}
        
        # Primary metric
        if metrics.primary_metric in data.columns:
            results['primary'] = {
                'metric': metrics.primary_metric,
                'treatment_mean': data[data['assignment'] == 'treatment'][metrics.primary_metric].mean(),
                'control_mean': data[data['assignment'] == 'control'][metrics.primary_metric].mean(),
            }
            results['primary']['lift'] = (
                (results['primary']['treatment_mean'] - results['primary']['control_mean']) / 
                results['primary']['control_mean'] * 100
            )
        
        # Secondary metrics
        results['secondary'] = {}
        for metric in metrics.secondary_metrics:
            if metric in data.columns:
                treatment_val = data[data['assignment'] == 'treatment'][metric].mean()
                control_val = data[data['assignment'] == 'control'][metric].mean()
                results['secondary'][metric] = {
                    'treatment_mean': treatment_val,
                    'control_mean': control_val,
                    'lift': (treatment_val - control_val) / control_val * 100
                }
        
        # Guardrail metrics
        results['guardrails'] = {}
        for metric in metrics.guardrail_metrics:
            if metric in data.columns:
                treatment_val = data[data['assignment'] == 'treatment'][metric].mean()
                control_val = data[data['assignment'] == 'control'][metric].mean()
                results['guardrails'][metric] = {
                    'treatment_mean': treatment_val,
                    'control_mean': control_val,
                    # Assumes higher is worse; a raw >10% increase is flagged without a significance test
                    'degraded': treatment_val > control_val * 1.1
                }
        
        return results
    
    def check_guardrails(self, results: Dict) -> Dict:
        """Check if any guardrails are breached"""
        guardrail_violations = []
        
        for metric, values in results.get('guardrails', {}).items():
            if values.get('degraded', False):
                guardrail_violations.append({
                    'metric': metric,
                    'treatment_mean': values['treatment_mean'],
                    'control_mean': values['control_mean'],
                    'degradation_pct': (values['treatment_mean'] - values['control_mean']) / values['control_mean'] * 100
                })
        
        return {
            'violations': guardrail_violations,
            'all_passed': len(guardrail_violations) == 0,
            'recommendation': 'STOP' if guardrail_violations else 'CONTINUE'
        }
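The guardrail check above compares raw means against a fixed 10% threshold, so noise alone can trip it on small samples. One possible refinement, sketched here under stated assumptions, is a one-sided z-test of whether the treatment mean exceeds the control mean by more than the allowed margin. The function name `guardrail_breached`, the normal approximation, and the synthetic latency numbers are all illustrative and not from the original.

```python
import random
from statistics import NormalDist, fmean, variance
from typing import Sequence


def guardrail_breached(treatment: Sequence[float], control: Sequence[float],
                       max_rel_increase: float = 0.10,
                       alpha: float = 0.05) -> bool:
    """One-sided z-test of H0: mean_t <= (1 + margin) * mean_c.

    Returns True only when the data give evidence that the treatment mean
    exceeds the control mean by more than the allowed relative margin.
    Assumes higher values of the metric are worse.
    """
    scale = 1 + max_rel_increase
    excess = fmean(treatment) - scale * fmean(control)
    se = (variance(treatment) / len(treatment)
          + scale ** 2 * variance(control) / len(control)) ** 0.5
    return excess / se > NormalDist().inv_cdf(1 - alpha)


# Synthetic p99 latency samples in milliseconds (hypothetical values)
rng = random.Random(7)
control_latency = [rng.gauss(200, 20) for _ in range(500)]
slightly_slower = [rng.gauss(205, 20) for _ in range(500)]  # about +2.5%
much_slower = [rng.gauss(240, 20) for _ in range(500)]      # about +20%

print(guardrail_breached(slightly_slower, control_latency))
print(guardrail_breached(much_slower, control_latency))
```

A stricter alternative is a non-inferiority framing, where the launch is blocked unless the data show the degradation is within the margin; which framing fits depends on how costly a missed regression is.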

4. Handling Network Effects and Interference

class NetworkEffectHandler:
    """Handle network effects and interference in experiments"""
    
    def __init__(self, network_graph, assignments):
        self.network = network_graph
        self.assignments = assignments
    
    def identify_exposed_pairs(self) -> List[Tuple]:
        """Identify pairs of users who might interfere"""
        exposed_pairs = []
        
        # Iterate over edges so each connected pair is reported once
        for user, neighbor in self.network.edges():
            if self.assignments.get(user) != self.assignments.get(neighbor):
                exposed_pairs.append((user, neighbor))
        
        return exposed_pairs
    
    def geographic_cluster_design(self, user_locations: pd.DataFrame, 
                                 n_clusters: int = 50) -> Dict:
        """Design geographic clusters to minimize interference"""
        from sklearn.cluster import KMeans
        
        # Cluster users by location
        coords = user_locations[['latitude', 'longitude']].values
        kmeans = KMeans(n_clusters=n_clusters, random_state=42)
        clusters = kmeans.fit_predict(coords)
        
        # Assign clusters to treatment/control
        cluster_assignments = {}
        unique_clusters = np.unique(clusters)
        n_treatment_clusters = len(unique_clusters) // 2
        
        treatment_clusters = np.random.choice(
            unique_clusters, 
            size=n_treatment_clusters, 
            replace=False
        )
        
        for cluster_id in unique_clusters:
            cluster_assignments[cluster_id] = (
                'treatment' if cluster_id in treatment_clusters else 'control'
            )
        
        # Map users to assignments
        user_assignments = {}
        for i, user_id in enumerate(user_locations.index):
            cluster = clusters[i]
            user_assignments[user_id] = cluster_assignments[cluster]
        
        return user_assignments
    
    def interference_detection(self, metric_name: str, 
                              data: pd.DataFrame) -> Dict:
        """Detect if interference is affecting results"""
        from scipy.spatial.distance import pdist, squareform
        
        # Calculate spatial autocorrelation
        if 'latitude' in data.columns and 'longitude' in data.columns:
            coords = data[['latitude', 'longitude']].values
            distances = squareform(pdist(coords))
            
            # Calculate Moran's I (simplified)
            metric_values = data[metric_name].values
            n = len(metric_values)
            
            # Create weight matrix (inverse distance)
            W = 1 / (distances + 1)
            np.fill_diagonal(W, 0)
            
            # Calculate Moran's I
            z = metric_values - metric_values.mean()
            numerator = n * np.sum(W * np.outer(z, z))
            denominator = np.sum(W) * np.sum(z**2)
            morans_i = numerator / denominator
            
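            # The 0.2 cutoff is a rough heuristic, not a significance test. Spatial
            # autocorrelation can hint at interference but can also reflect regional
            # differences that have nothing to do with the experiment.
            flagged = abs(morans_i) > 0.2
            return {
                'morans_i': morans_i,
                'interference_detected': flagged,
                'interpretation': ('Strong spatial autocorrelation (possible interference)'
                                   if flagged else 'No strong spatial autocorrelation')
            }
        
        # No coordinates available, so the check could not be run
        return {'interference_detected': None, 'interpretation': 'Not evaluated (no coordinates)'}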
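Counting mixed-assignment pairs says that contamination is possible, but not how exposed each control user is. A simple per-user measure is the share of a control user's neighbors who are in treatment. The sketch below assumes the network is available as a plain adjacency dictionary rather than a graph object; the function name `treated_neighbor_share` and the toy graph are hypothetical.

```python
from typing import Dict, List


def treated_neighbor_share(adjacency: Dict[str, List[str]],
                           assignments: Dict[str, str]) -> Dict[str, float]:
    """For each control user, the fraction of their neighbors in treatment."""
    shares = {}
    for user, neighbors in adjacency.items():
        # Only control users with at least one neighbor are of interest
        if assignments.get(user) != 'control' or not neighbors:
            continue
        treated = sum(assignments.get(n) == 'treatment' for n in neighbors)
        shares[user] = treated / len(neighbors)
    return shares


adjacency = {
    'a': ['b', 'c'],
    'b': ['a'],
    'c': ['a', 'd'],
    'd': ['c'],
}
assignments = {'a': 'treatment', 'b': 'control', 'c': 'control', 'd': 'control'}

shares = treated_neighbor_share(adjacency, assignments)
print(shares)  # {'b': 1.0, 'c': 0.5, 'd': 0.0}
```

Comparing outcomes of control users with high versus low shares is one rough way to look for spillover; a clear gradient suggests the control group is not a clean baseline.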

Critical Warning: Network effects can invalidate experiment results. If users in the treatment group influence users in the control group, the control group no longer reflects the untreated baseline, and the measured difference is a biased estimate of the true effect.
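Cluster randomization reduces interference, but it costs statistical power because users in the same cluster tend to behave alike. The standard design effect, 1 + (m - 1) * ICC, gives a rough inflation factor for the sample size. In the sketch below the average cluster size of 100 and the intra-cluster correlation (ICC) of 0.02 are hypothetical values chosen for illustration; 894 is the individual-level total implied by the question's inputs.

```python
import math


def design_effect(avg_cluster_size: float, icc: float) -> float:
    """Variance inflation from randomizing clusters instead of individuals."""
    return 1 + (avg_cluster_size - 1) * icc


def cluster_adjusted_sample(n_individual: int, avg_cluster_size: float,
                            icc: float) -> int:
    """Total users needed once the design effect is applied."""
    return math.ceil(n_individual * design_effect(avg_cluster_size, icc))


# Hypothetical inputs: 100 users per cluster, ICC of 0.02
n_needed = cluster_adjusted_sample(n_individual=894, avg_cluster_size=100, icc=0.02)
print(f"Design effect: {design_effect(100, 0.02):.2f}")
print(f"Users needed under cluster randomization: {n_needed:,}")  # 2,665
```

Even a small ICC roughly triples the requirement here, which is why the number of clusters, not the number of users, usually limits a clustered experiment.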

5. Real-World Application: Streaming Platform Experiment

def design_streaming_experiment():
    """Complete experiment design for streaming platform"""
    
    # 1. Define experiment parameters
    experiment = {
        'name': 'recommendation_algorithm_v2',
        'objective': 'Increase user engagement through better recommendations',
        'hypothesis': 'New algorithm will increase watch time by at least 5%',
        'primary_metric': 'daily_watch_time',
        'minimum_detectable_effect': 0.05,
        'significance_level': 0.05,
        'power': 0.80,
        'baseline_value': 45,  # minutes
        'std_dev': 12  # minutes
    }
    
    # 2. Calculate sample size
    calculator = SampleSizeCalculator(
        alpha=experiment['significance_level'],
        power=experiment['power']
    )
    
    effect_size = experiment['baseline_value'] * experiment['minimum_detectable_effect']
    sample_result = calculator.two_sample_t_test(effect_size, experiment['std_dev'])
    
    # 3. Design randomization
    design = ExperimentalDesign(experiment['name'], sample_result['total'])
    
    # 4. Define metrics
    metrics_framework = MetricsFramework()
    metrics = metrics_framework.define_metrics('recommendation')
    
    # 5. Calculate duration
    daily_active_users = 2000000  # 2M DAU
    duration = design.calculate_required_duration(
        daily_users=int(daily_active_users * 0.3),  # 30% exposed to experiment
        seasonality=True
    )
    
    # 6. Compile final design
    final_design = {
        **experiment,
        'sample_size': sample_result,
        'duration': duration,
        'metrics': {
            'primary': metrics.primary_metric,
            'secondary': metrics.secondary_metrics,
            'guardrails': metrics.guardrail_metrics
        },
        'randomization': 'Stratified by user segment',
        'analysis_method': 'Two-sample t-test with Bonferroni correction'
    }
    
    return final_design

# Generate complete experiment design
design = design_streaming_experiment()

print("Complete Experiment Design")
print("=" * 60)
for key, value in design.items():
    if isinstance(value, dict):
        print(f"\n{key}:")
        for k, v in value.items():
            print(f"  {k}: {v}")
    else:
        print(f"{key}: {value}")
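The design names a two-sample test as the analysis method but stops before the analysis itself. As a rough sketch of that last step, the function below computes the difference in means, the relative lift, a normal-approximation confidence interval, and a two-sided p-value. The name `analyze_primary_metric` and the simulated data are illustrative assumptions; at small sample sizes a t-based interval would be more appropriate.

```python
import random
from statistics import NormalDist, fmean, variance
from typing import Dict, Sequence


def analyze_primary_metric(treatment: Sequence[float], control: Sequence[float],
                           alpha: float = 0.05) -> Dict[str, float]:
    """Difference in means with a normal-approximation CI and p-value."""
    diff = fmean(treatment) - fmean(control)
    # Unequal-variance (Welch-style) standard error
    se = (variance(treatment) / len(treatment)
          + variance(control) / len(control)) ** 0.5
    z = diff / se
    z_crit = NormalDist().inv_cdf(1 - alpha / 2)
    return {
        'diff': diff,
        'relative_lift_pct': diff / fmean(control) * 100,
        'ci_low': diff - z_crit * se,
        'ci_high': diff + z_crit * se,
        'p_value': 2 * (1 - NormalDist().cdf(abs(z))),
    }


# Simulated data: 45-minute baseline, true lift of 2.25 minutes, sd of 12
rng = random.Random(1)
control = [rng.gauss(45, 12) for _ in range(2000)]
treatment = [rng.gauss(47.25, 12) for _ in range(2000)]

result = analyze_primary_metric(treatment, control)
for name, value in result.items():
    print(f"{name}: {value:.4f}")
```

Reporting the interval alongside the p-value makes it clear whether the plausible range of lifts includes the 5% target, not just whether the lift differs from zero.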

6. Common Follow-Up Questions

Follow-up 1: How do you handle multiple comparisons?

from statsmodels.stats.multitest import multipletests

def multiple_comparison_correction(p_values: List[float], 
                                  alpha: float = 0.05) -> Dict:
    """Apply Bonferroni and Benjamini-Hochberg corrections side by side"""
    
    # Bonferroni correction
    rejected_bonf, pvals_corrected_bonf, _, _ = multipletests(
        p_values, alpha=alpha, method='bonferroni'
    )
    
    # False Discovery Rate (Benjamini-Hochberg)
    rejected_fdr, pvals_corrected_fdr, _, _ = multipletests(
        p_values, alpha=alpha, method='fdr_bh'
    )
    
    return {
        'original_p_values': p_values,
        'bonferroni': {
            'corrected_p_values': pvals_corrected_bonf.tolist(),
            'significant': rejected_bonf.tolist()
        },
        'fdr': {
            'corrected_p_values': pvals_corrected_fdr.tolist(),
            'significant': rejected_fdr.tolist()
        }
    }

# Example
p_values = [0.02, 0.04, 0.08, 0.12, 0.03]
results = multiple_comparison_correction(p_values)

print("Multiple Comparison Correction")
print("-" * 60)
print(f"Original p-values: {p_values}")
print(f"Bonferroni significant: {results['bonferroni']['significant']}")
print(f"FDR significant: {results['fdr']['significant']}")
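To see what the two corrections actually do, it can help to write them out by hand. The sketch below is a minimal standard-library version for illustration, not a replacement for the library call: Bonferroni compares every p-value with alpha / m, while Benjamini-Hochberg sorts the p-values and rejects everything up to the largest rank k whose p-value is at most (k / m) * alpha. The function names and the second set of p-values are hypothetical.

```python
from typing import List


def bonferroni_reject(p_values: List[float], alpha: float = 0.05) -> List[bool]:
    """Reject when p <= alpha / m (controls the family-wise error rate)."""
    m = len(p_values)
    return [p <= alpha / m for p in p_values]


def benjamini_hochberg_reject(p_values: List[float], alpha: float = 0.05) -> List[bool]:
    """Step-up procedure that controls the false discovery rate."""
    m = len(p_values)
    order = sorted(range(m), key=lambda i: p_values[i])
    # Largest 1-based rank k whose p-value is at or below (k / m) * alpha
    max_k = 0
    for rank, idx in enumerate(order, start=1):
        if p_values[idx] <= rank / m * alpha:
            max_k = rank
    reject = [False] * m
    for idx in order[:max_k]:
        reject[idx] = True
    return reject


example = [0.001, 0.012, 0.025, 0.038, 0.6]
print(bonferroni_reject(example))          # [True, False, False, False, False]
print(benjamini_hochberg_reject(example))  # [True, True, True, True, False]
```

On this example Bonferroni keeps one finding and Benjamini-Hochberg keeps four, which illustrates the trade: FDR control is less conservative but tolerates a small share of false positives among the rejections.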

Follow-up 2: How do you analyze results with non-normal data?

from scipy.stats import mannwhitneyu, bootstrap

def analyze_non_normal_results(treatment_data, control_data):
    """Analyze experiment results with non-normal data"""
    
    # Mann-Whitney U test (non-parametric)
    stat_u, p_value_u = mannwhitneyu(
        treatment_data, control_data, alternative='two-sided'
    )
    
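    # Bootstrap confidence interval for the difference in means
    # (vectorized statistic so scipy can evaluate resamples in bulk)
    def mean_diff(x, y, axis=-1):
        return np.mean(x, axis=axis) - np.mean(y, axis=axis)
    
    boot_result = bootstrap(
        (treatment_data, control_data),
        statistic=mean_diff,
        n_resamples=10000,
        confidence_level=0.95,
        vectorized=True
    )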
    
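    # Hodges-Lehmann estimator: median of all pairwise treatment-control differences
    # (builds an n_treatment x n_control array, so subsample first for large groups)
    all_diffs = np.subtract.outer(treatment_data, control_data).flatten()
    hodges_lehmann = np.median(all_diffs)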
    
    return {
        'mann_whitney_u': {
            'statistic': stat_u,
            'p_value': p_value_u
        },
        'bootstrap_ci': {
            'lower': boot_result.confidence_interval.low,
            'upper': boot_result.confidence_interval.high
        },
        'hodges_lehmann_estimate': hodges_lehmann,
        'recommendation': 'Choose the test before looking at results; Mann-Whitney compares distributions, while the bootstrap CI targets the mean difference'
    }
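A permutation test is another option for skewed metrics when the quantity of interest is still the mean: it makes no normality assumption and builds the null distribution by reshuffling the arm labels. The sketch below is a minimal standard-library version; the function name `permutation_test_mean_diff` is hypothetical and the exponential samples are synthetic, with an exaggerated effect so the example is stable.

```python
import random
from statistics import fmean
from typing import Sequence


def permutation_test_mean_diff(treatment: Sequence[float], control: Sequence[float],
                               n_permutations: int = 1000, seed: int = 0) -> float:
    """Two-sided permutation p-value for the difference in means."""
    rng = random.Random(seed)
    observed = abs(fmean(treatment) - fmean(control))
    pooled = list(treatment) + list(control)
    n_t = len(treatment)
    extreme = 0
    for _ in range(n_permutations):
        # Shuffling the pooled values is equivalent to reassigning arm labels
        rng.shuffle(pooled)
        diff = abs(fmean(pooled[:n_t]) - fmean(pooled[n_t:]))
        if diff >= observed:
            extreme += 1
    # Add-one correction keeps the p-value strictly positive
    return (extreme + 1) / (n_permutations + 1)


# Synthetic right-skewed watch times (exponential), means of 45 and 70 minutes
rng = random.Random(3)
control = [rng.expovariate(1 / 45) for _ in range(400)]
treatment = [rng.expovariate(1 / 70) for _ in range(400)]

p_value = permutation_test_mean_diff(treatment, control)
print(f"Permutation p-value: {p_value:.4f}")
```

The smallest achievable p-value is 1 / (n_permutations + 1), so the number of permutations should be chosen with the significance threshold in mind.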

Company-Specific Tips

Netflix Tips:

  • Netflix heavily tests on streaming-specific metrics (completion rate, engagement)
  • Understand how to handle time-series data in experiments
  • Know how to test recommendation algorithms without user disruption
  • Be familiar with Thompson sampling for bandits
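For readers who have not seen Thompson sampling, here is a minimal Beta-Bernoulli sketch. Each arm keeps a Beta posterior over its success rate; every round, one rate is sampled per arm and the arm with the highest sample is played. The function name `thompson_sampling`, the uniform Beta(1, 1) prior, and the two success rates are illustrative assumptions, not a description of any company's system.

```python
import random
from typing import List


def thompson_sampling(true_rates: List[float], n_rounds: int = 5000,
                      seed: int = 42) -> List[int]:
    """Beta-Bernoulli Thompson sampling; returns how often each arm was played."""
    rng = random.Random(seed)
    successes = [0] * len(true_rates)
    failures = [0] * len(true_rates)
    pulls = [0] * len(true_rates)
    for _ in range(n_rounds):
        # Draw a plausible success rate per arm from its Beta(1 + s, 1 + f) posterior
        samples = [rng.betavariate(1 + s, 1 + f)
                   for s, f in zip(successes, failures)]
        arm = samples.index(max(samples))
        # Simulated binary reward (for example, whether the user pressed play)
        reward = rng.random() < true_rates[arm]
        if reward:
            successes[arm] += 1
        else:
            failures[arm] += 1
        pulls[arm] += 1
    return pulls


pulls = thompson_sampling([0.05, 0.15])
print(f"Pulls per arm: {pulls}")
```

Unlike a fixed 50/50 split, the allocation shifts toward the better arm as evidence accumulates, which lowers the cost of exploration but makes a clean effect estimate harder to obtain.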

Uber Tips:

  • Uber tests on marketplace dynamics (supply/demand)
  • Understand two-sided marketplace experiments
  • Know how to handle geographic experiments
  • Be comfortable with causal inference methods
