🎉 75% of content is free forever — Unlock Premium from $10/mo →
CW
Search courses…
💼 Servicesℹ️ About✉️ ContactView Pricing Plansfrom $10

Bayesian Statistics

⭐ Premium

Advertisement

Bayesian Statistics

Bayesian methods update beliefs with evidence. Learn to build probabilistic models that quantify uncertainty and make principled decisions.

Bayesian Inference Fundamentals

Bayesian Posterior UpdatePriorP(θ)What we believedbefore dataLikelihoodP(D|θ)How probable isthe data given θPosteriorP(θ|D)Updated beliefafter seeing dataEvidenceP(D)Normalizingconstantà—=P(θ|D) = P(D|θ) à— P(θ) / P(D)
P(θD)=P(Dθ)P(θ)P(D)P(\theta|D) = \frac{P(D|\theta)P(\theta)}{P(D)}

Bayesian Inference with PyMC

import pymc as pm
import numpy as np
import arviz as az

# Simple Bayesian linear regression
np.random.seed(42)
n = 100
X = np.random.uniform(0, 10, n)
true_slope = 2.5
true_intercept = 1.0
y = true_slope * X + true_intercept + np.random.normal(0, 1, n)

with pm.Model() as linear_model:
    # Priors
    slope = pm.Normal("slope", mu=0, sigma=10)
    intercept = pm.Normal("intercept", mu=0, sigma=10)
    sigma = pm.HalfNormal("sigma", sigma=5)
    
    # Likelihood
    mu = slope * X + intercept
    y_obs = pm.Normal("y_obs", mu=mu, sigma=sigma, observed=y)
    
    # Inference
    trace = pm.sample(2000, tune=1000, cores=4, return_inferencedata=True)

# Analyze results
print(az.summary(trace, var_names=["slope", "intercept", "sigma"]))

# Posterior predictive checks
with linear_model:
    ppc = pm.sample_posterior_predictive(trace)

az.plot_ppc(az.from_pymc(posterior_predictive=ppc, observed={"y_obs": y}))

Bayesian A/B Testing

import pymc as pm
import numpy as np
import arviz as az

def bayesian_ab_test(control_conversions, control_total,
                     treatment_conversions, treatment_total):
    with pm.Model() as ab_model:
        # Priors
        p_control = pm.Beta("p_control", alpha=1, beta=1)
        p_treatment = pm.Beta("p_treatment", alpha=1, beta=1)
        
        # Likelihood
        obs_control = pm.Binomial("obs_control", n=control_total, p=p_control,
                                   observed=control_conversions)
        obs_treatment = pm.Binomial("obs_treatment", n=treatment_total, p=p_treatment,
                                     observed=treatment_conversions)
        
        # Derived quantities
        lift = pm.Deterministic("lift", (p_treatment - p_control) / p_control)
        is_better = pm.Deterministic("is_better", pm.math.gt(p_treatment, p_control))
        
        # Sample
        trace = pm.sample(5000, tune=2000, cores=4, return_inferencedata=True)
    
    # Results
    summary = az.summary(trace, var_names=["p_control", "p_treatment", "lift", "is_better"])
    
    prob_better = trace.posterior["is_better"].mean().item()
    
    return {
        "control_rate": control_conversions / control_total,
        "treatment_rate": treatment_conversions / treatment_total,
        "lift_mean": trace.posterior["lift"].mean().item(),
        "lift_hdi": az.hdi(trace, var_names=["lift"])["lift"].values,
        "prob_treatment_better": prob_better,
        "summary": summary
    }

# Example
results = bayesian_ab_test(
    control_conversions=120, control_total=1000,
    treatment_conversions=150, treatment_total=1000
)
print(f"Control rate: {results['control_rate']:.3f}")
print(f"Treatment rate: {results['treatment_rate']:.3f}")
print(f"Prob treatment better: {results['prob_treatment_better']:.3f}")

Hierarchical Models

import pymc as pm
import numpy as np

def hierarchical_model():
    # Simulate grouped data (e.g., sales across stores)
    n_groups = 20
    n_per_group = 50
    
    group_effects = np.random.normal(0, 2, n_groups)
    group_ids = np.repeat(range(n_groups), n_per_group)
    X = np.random.normal(0, 1, n_groups * n_per_group)
    y = 3 * X + group_effects[group_ids] + np.random.normal(0, 1, n_groups * n_per_group)
    
    with pm.Model() as hierarchical:
        # Hyperpriors (global parameters)
        mu_global = pm.Normal("mu_global", mu=0, sigma=5)
        sigma_global = pm.HalfNormal("sigma_global", sigma=5)
        
        # Group-level parameters (partially pooled)
        group_offset = pm.Normal("group_offset", mu=0, sigma=1, shape=n_groups)
        group_intercept = pm.Deterministic("group_intercept", 
                                           mu_global + sigma_global * group_offset)
        
        # Shared slope
        slope = pm.Normal("slope", mu=0, sigma=10)
        
        # Observation noise
        sigma = pm.HalfNormal("sigma", sigma=5)
        
        # Expected value
        mu = slope * X + group_intercept[group_ids]
        
        # Likelihood
        y_obs = pm.Normal("y_obs", mu=mu, sigma=sigma, observed=y)
        
        trace = pm.sample(2000, tune=1000, cores=4)
    
    return trace

trace = hierarchical_model()

# Compare group-level intercepts
group_means = trace.posterior["group_intercept"].mean(dim=["chain", "draw"]).values
print("Group intercepts:", group_means)

MCMC Diagnostics

import arviz as az
import matplotlib.pyplot as plt

def diagnose_mcmc(trace):
    """Comprehensive MCMC diagnostics"""
    
    # 1. Trace plots
    az.plot_trace(trace, var_names=["slope", "intercept"])
    plt.tight_layout()
    
    # 2. Autocorrelation
    az.plot_autocorr(trace, var_names=["slope", "intercept"])
    plt.tight_layout()
    
    # 3. Rank plots (better than trace plots)
    az.plot_rank(trace, var_names=["slope"])
    
    # 4. R-hat (should be close to 1.0)
    rhat = az.rhat(trace)
    print("R-hat values:")
    print(rhat)
    
    # 5. Effective sample size
    ess = az.ess(trace)
    print("\nEffective Sample Size:")
    print(ess)
    
    # 6. Divergences
    if hasattr(trace, 'sample_stats'):
        divergences = trace.sample_stats.diverging.sum().item()
        print(f"\nDivergences: {divergences}")
    
    # 7. WAIC/LOO for model comparison
    # waic = az.waic(trace)
    # loo = az.loo(trace)
    
    return {
        'rhat': rhat,
        'ess': ess,
        'converged': all(r.values < 1.05 for r in rhat.values())
    }

diagnostics = diagnose_mcmc(trace)
print(f"\nConverged: {diagnostics['converged']}")

Bayesian Time Series

import pymc as pm
import numpy as np

def bayesian_structural_time_series(y):
    """Bayesian structural time series model"""
    n = len(y)
    t = np.arange(n)
    
    with pm.Model() as sts_model:
        # Local linear trend
        level_sigma = pm.HalfNormal("level_sigma", sigma=0.1)
        slope_sigma = pm.HalfNormal("slope_sigma", sigma=0.01)
        
        # Random walk priors
        level_innovations = pm.Normal("level_innovations", mu=0, sigma=1, shape=n)
        slope_innovations = pm.Normal("slope_innovations", mu=0, sigma=1, shape=n)
        
        level = pm.math.cumsum(level_sigma * level_innovations)
        slope = pm.math.cumsum(slope_sigma * slope_innovations)
        
        # Observation noise
        obs_sigma = pm.HalfNormal("obs_sigma", sigma=1)
        
        # Expected value
        mu = level + slope * t
        
        # Likelihood
        y_obs = pm.Normal("y_obs", mu=mu, sigma=obs_sigma, observed=y)
        
        trace = pm.sample(2000, tune=1000, cores=4)
    
    return trace

# Example
t = np.linspace(0, 10, 200)
y = 2 * t + 5 * np.sin(t) + np.random.normal(0, 0.5, 200)

trace = bayesian_structural_time_series(y)

Model Comparison

import pymc as pm
import arviz as az

def compare_models(data):
    """Compare multiple models using WAIC"""
    
    with pm.Model() as linear:
        slope = pm.Normal("slope", mu=0, sigma=10)
        intercept = pm.Normal("intercept", mu=0, sigma=10)
        sigma = pm.HalfNormal("sigma", sigma=5)
        mu = slope * data['x'] + intercept
        y_obs = pm.Normal("y_obs", mu=mu, sigma=sigma, observed=data['y'])
        trace_linear = pm.sample(1000, return_inferencedata=True)
    
    with pm.Model() as quadratic:
        b1 = pm.Normal("b1", mu=0, sigma=10)
        b2 = pm.Normal("b2", mu=0, sigma=10)
        intercept = pm.Normal("intercept", mu=0, sigma=10)
        sigma = pm.HalfNormal("sigma", sigma=5)
        mu = b1 * data['x'] + b2 * data['x']**2 + intercept
        y_obs = pm.Normal("y_obs", mu=mu, sigma=sigma, observed=data['y'])
        trace_quad = pm.sample(1000, return_inferencedata=True)
    
    # Compare
    comp = az.compare({
        "linear": trace_linear,
        "quadratic": trace_quad
    }, ic="waic")
    
    print(comp)
    az.plot_compare(comp)
    
    return comp

Best Practices

  1. Start with informative priors when domain knowledge exists
  2. Check MCMC diagnostics – R-hat, ESS, divergences
  3. Use hierarchical models for grouped data (partial pooling)
  4. Compare models with WAIC/LOO, not just posterior means
  5. Report full posterior distributions, not just point estimates

Advertisement