πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

A/B Testing for ML Models

MLOpsExperimentation⭐ Premium

Advertisement

A/B Testing for ML Models

Difficulty: Senior Level | Companies: Google, Meta, Netflix, Uber, Stripe

A/B Testing Framework

A/B testing ML models requires careful experimental design, statistical rigor, and automated decision-making.

ℹ️

Google runs thousands of ML A/B tests simultaneously, with automated systems handling 95% of test decisions.

Experiment Framework

# ab_testing.py
import numpy as np
from scipy import stats
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import json
import hashlib

class ExperimentStatus(Enum):
    DRAFT = "draft"
    RUNNING = "running"
    PAUSED = "paused"
    COMPLETED = "completed"
    TERMINATED = "terminated"

@dataclass
class Variant:
    name: str
    model_version: str
    traffic_percentage: float
    is_control: bool = False

@dataclass
class ExperimentConfig:
    experiment_id: str
    name: str
    description: str
    variants: List[Variant]
    primary_metric: str
    secondary_metrics: List[str]
    min_sample_size: int
    significance_level: float = 0.05
    power: float = 0.8
    max_duration_days: int = 14
    auto_promote: bool = True
    promotion_threshold: float = 0.05

@dataclass
class ExperimentResult:
    experiment_id: str
    status: ExperimentStatus
    winner: Optional[str]
    p_value: float
    confidence_interval: tuple
    effect_size: float
    sample_size_per_variant: int
    duration_days: int
    metrics_summary: Dict[str, Dict]

class ABTestManager:
    def __init__(self):
        self.experiments: Dict[str, ExperimentConfig] = {}
        self.results: Dict[str, ExperimentResult] = {}
        self.experiment_data: Dict[str, List[Dict]] = {}

    def create_experiment(self, config: ExperimentConfig) -> str:
        self.experiments[config.experiment_id] = config
        self.experiment_data[config.experiment_id] = []
        return config.experiment_id

    def assign_variant(self, experiment_id: str, user_id: str) -> Optional[Variant]:
        config = self.experiments[experiment_id]
        hash_value = int(hashlib.md5(f"{experiment_id}:{user_id}".encode()).hexdigest(), 16)
        assignment = (hash_value % 100) / 100

        cumulative = 0
        for variant in config.variants:
            cumulative += variant.traffic_percentage
            if assignment < cumulative:
                return variant
        return None

    def log_metric(self, experiment_id: str, user_id: str, variant_name: str, metric_name: str, value: float):
        self.experiment_data[experiment_id].append({
            "user_id": user_id,
            "variant": variant_name,
            "metric": metric_name,
            "value": value,
            "timestamp": datetime.now().isoformat()
        })

    def analyze_experiment(self, experiment_id: str) -> ExperimentResult:
        config = self.experiments[experiment_id]
        data = self.experiment_data[experiment_id]

        variant_data = {}
        for variant in config.variants:
            variant_metrics = [d["value"] for d in data if d["variant"] == variant.name and d["metric"] == config.primary_metric]
            variant_data[variant.name] = variant_metrics

        control_name = next(v.name for v in config.variants if v.is_control)
        treatment_names = [v.name for v in config.variants if not v.is_control]

        control_values = np.array(variant_data[control_name])

        best_treatment = None
        best_p_value = 1.0

        for treatment_name in treatment_names:
            treatment_values = np.array(variant_data[treatment_name])

            t_stat, p_value = stats.ttest_ind(control_values, treatment_values)

            if p_value < best_p_value:
                best_p_value = p_value
                best_treatment = treatment_name

        if best_treatment:
            treatment_values = np.array(variant_data[best_treatment])
            effect_size = (treatment_values.mean() - control_values.mean()) / control_values.std()

            ci_lower = effect_size - 1.96 * np.sqrt(1/len(control_values) + 1/len(treatment_values))
            ci_upper = effect_size + 1.96 * np.sqrt(1/len(control_values) + 1/len(treatment_values))

            winner = best_treatment if best_p_value < config.significance_level else None
        else:
            effect_size = 0
            ci_lower, ci_upper = 0, 0
            winner = None

        result = ExperimentResult(
            experiment_id=experiment_id,
            status=ExperimentStatus.COMPLETED,
            winner=winner,
            p_value=best_p_value,
            confidence_interval=(ci_lower, ci_upper),
            effect_size=effect_size,
            sample_size_per_variant=len(control_values),
            duration_days=1,
            metrics_summary={v: {"mean": np.mean(variant_data[v]), "std": np.std(variant_data[v])} for v in variant_data}
        )

        self.results[experiment_id] = result
        return result

    def should_promote(self, experiment_id: str) -> bool:
        config = self.experiments[experiment_id]
        result = self.results.get(experiment_id)

        if result is None or not config.auto_promote:
            return False

        if result.p_value < config.significance_level and result.effect_size > config.promotion_threshold:
            return True
        return False

    def get_traffic_split(self, experiment_id: str) -> Dict[str, float]:
        config = self.experiments[experiment_id]
        return {v.name: v.traffic_percentage for v in config.variants}


# Usage
manager = ABTestManager()

config = ExperimentConfig(
    experiment_id="exp-001",
    name="New Recommendation Model",
    description="Testing improved recommendation algorithm",
    variants=[
        Variant(name="control", model_version="v1.0", traffic_percentage=0.5, is_control=True),
        Variant(name="treatment", model_version="v2.0", traffic_percentage=0.5)
    ],
    primary_metric="click_through_rate",
    secondary_metrics=["conversion_rate", "revenue_per_user"],
    min_sample_size=10000,
    significance_level=0.05
)

manager.create_experiment(config)

for i in range(5000):
    variant = manager.assign_variant("exp-001", f"user_{i}")
    if variant.name == "control":
        ctr = np.random.beta(2, 10)
    else:
        ctr = np.random.beta(2.2, 10)
    manager.log_metric("exp-001", f"user_{i}", variant.name, "click_through_rate", ctr)

result = manager.analyze_experiment("exp-001")
print(f"Winner: {result.winner}, p-value: {result.p_value:.4f}")

Statistical Testing

# statistical_tests.py
import numpy as np
from scipy import stats
from typing import Dict, Tuple
from dataclasses import dataclass

@dataclass
class TestResult:
    test_name: str
    statistic: float
    p_value: float
    significant: bool
    effect_size: float
    confidence_interval: Tuple[float, float]

class StatisticalTester:
    def __init__(self, significance_level: float = 0.05):
        self.significance_level = significance_level

    def t_test(self, control: np.ndarray, treatment: np.ndarray) -> TestResult:
        t_stat, p_value = stats.ttest_ind(control, treatment)
        effect_size = (treatment.mean() - control.mean()) / control.std()
        ci = stats.t.interval(
            1 - self.significance_level,
            len(control) + len(treatment) - 2,
            loc=(treatment.mean() - control.mean()),
            scale=stats.sem(np.concatenate([control, treatment]))
        )
        return TestResult(
            test_name="t_test",
            statistic=t_stat,
            p_value=p_value,
            significant=p_value < self.significance_level,
            effect_size=effect_size,
            confidence_interval=ci
        )

    def mann_whitney_u(self, control: np.ndarray, treatment: np.ndarray) -> TestResult:
        u_stat, p_value = stats.mannwhitneyu(control, treatment, alternative='two-sided')
        effect_size = 1 - (2 * u_stat) / (len(control) * len(treatment))
        return TestResult(
            test_name="mann_whitney_u",
            statistic=u_stat,
            p_value=p_value,
            significant=p_value < self.significance_level,
            effect_size=effect_size,
            confidence_interval=(0, 0)
        )

    def bootstrap_test(self, control: np.ndarray, treatment: np.ndarray, n_bootstrap: int = 10000) -> TestResult:
        observed_diff = treatment.mean() - control.mean()
        combined = np.concatenate([control, treatment])
        bootstrap_diffs = []

        for _ in range(n_bootstrap):
            np.random.shuffle(combined)
            boot_control = combined[:len(control)]
            boot_treatment = combined[len(control):]
            bootstrap_diffs.append(boot_treatment.mean() - boot_control.mean())

        bootstrap_diffs = np.array(bootstrap_diffs)
        p_value = np.mean(np.abs(bootstrap_diffs) >= np.abs(observed_diff))
        ci = np.percentile(bootstrap_diffs, [2.5, 97.5])

        return TestResult(
            test_name="bootstrap",
            statistic=observed_diff,
            p_value=p_value,
            significant=p_value < self.significance_level,
            effect_size=observed_diff / control.std(),
            confidence_interval=tuple(ci)
        )

    def calculate_sample_size(self, baseline_rate: float, minimum_detectable_effect: float, power: float = 0.8) -> int:
        alpha = self.significance_level
        p1 = baseline_rate
        p2 = baseline_rate + minimum_detectable_effect
        pooled_p = (p1 + p2) / 2

        z_alpha = stats.norm.ppf(1 - alpha / 2)
        z_beta = stats.norm.ppf(power)

        n = ((z_alpha * np.sqrt(2 * pooled_p * (1 - pooled_p)) +
              z_beta * np.sqrt(p1 * (1 - p1) + p2 * (1 - p2))) ** 2) / (minimum_detectable_effect ** 2)
        return int(np.ceil(n))


tester = StatisticalTester(significance_level=0.05)
control = np.random.beta(2, 10, 1000)
treatment = np.random.beta(2.2, 10, 1000)

t_result = tester.t_test(control, treatment)
print(f"t-test: p={t_result.p_value:.4f}, significant={t_result.significant}")

sample_size = tester.calculate_sample_size(baseline_rate=0.2, minimum_detectable_effect=0.02)
print(f"Required sample size: {sample_size}")

Follow-Up Questions

  1. How do you handle network effects in A/B testing?
  2. What metrics should be tracked during ML A/B tests?
  3. How do you implement multi-armed bandit testing for model selection?
  4. What are the ethical considerations in ML A/B testing?

Advertisement