🎉 75% of content is free forever — Unlock Premium from $10/mo →
CW
Search courses…
💼 Services · ℹ️ About · ✉️ Contact · View Pricing Plans from $10

AutoML & HPO: Bayesian, TPE, Hyperband, NAS, Population

MLOpsAutoML & Hyperparameter Optimization⭐ Premium

Advertisement

Interview Question (Hard) — Asked at: Google, Microsoft, Amazon, Netflix, Meta

"Design an AutoML system that automatically selects models, tunes hyperparameters, and performs feature engineering. How do you balance exploration vs exploitation and manage computational budget?"

AutoML Architecture Overview

AutoML automates the machine learning pipeline, from feature engineering to model selection and hyperparameter optimization.

AutoML Pipeline Diagram

Architecture Diagram
┌────────────────────────────────────────────────────────────────────────────────┐
│                                AutoML Pipeline                                 │
├────────────────────────────────────────────────────────────────────────────────┤
│                                                                                │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐    ┌──────────────┐  │
│  │     Data     │───▶│   Feature    │───▶│    Model     │───▶│Hyperparameter│  │
│  │   Analysis   │    │ Engineering  │    │  Selection   │    │ Optimization │  │
│  └──────────────┘    └──────────────┘    └──────────────┘    └──────────────┘  │
│         │                   │                   │                   │          │
│         ▼                   ▼                   ▼                   ▼          │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐    ┌──────────────┐  │
│  │   Dataset    │    │   Feature    │    │    Model     │    │     Best     │  │
│  │  Profiling   │    │    Store     │    │   Registry   │    │    Config    │  │
│  └──────────────┘    └──────────────┘    └──────────────┘    └──────────────┘  │
└────────────────────────────────────────────────────────────────────────────────┘

Hyperparameter Optimization

Bayesian Optimization

import numpy as np
from typing import Dict, List, Tuple, Callable
from dataclasses import dataclass
from scipy.stats import norm
from scipy.optimize import minimize
import warnings

@dataclass
class Hyperparameter:
    """Description of a single dimension of the hyperparameter search space."""
    name: str  # key under which a sampled value is stored in a config dict
    type: str  # 'int', 'float', 'categorical'
    bounds: Tuple = None  # (low, high) pair read for 'int' and 'float' parameters
    choices: List = None  # candidate values read for 'categorical' parameters
    log_scale: bool = False  # when True, 'float' values are sampled/encoded in log space

class GaussianProcess:
    """Zero-mean Gaussian Process regressor with a unit-variance RBF kernel.

    Used as the surrogate model for Bayesian optimization: ``fit`` caches the
    inverse of the noisy kernel matrix and ``predict`` returns the posterior
    mean and standard deviation at new points.
    """

    def __init__(self, length_scale: float = 1.0,
                 noise_level: float = 0.1):
        """
        Args:
            length_scale: RBF kernel length scale.
            noise_level: Value added to the diagonal of the kernel matrix
                (observation noise / regularization).
        """
        self.length_scale = length_scale
        self.noise_level = noise_level
        self.X_train = None
        self.y_train = None
        self.K_inv = None

    def fit(self, X: np.ndarray, y: np.ndarray):
        """Fit the GP to observed data.

        Args:
            X: 2-D array of shape (n_samples, n_features).
            y: 1-D array of n_samples observed objective values.
        """
        X = np.asarray(X, dtype=float)
        y = np.asarray(y, dtype=float)
        self.X_train = X
        self.y_train = y

        # Noisy kernel matrix K + noise * I
        K = self._compute_kernel(X, X)
        K[np.diag_indices_from(K)] += self.noise_level

        self.K_inv = np.linalg.inv(K)

    def predict(self, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Predict the posterior mean and standard deviation at ``X``.

        Before ``fit`` has been called the prior is returned: zero mean and
        unit standard deviation for every row of ``X``.

        Returns:
            Tuple ``(mu, sigma)`` of 1-D arrays with one entry per row of X.
        """
        X = np.asarray(X, dtype=float)

        if self.X_train is None:
            return np.zeros(len(X)), np.ones(len(X))

        K_star = self._compute_kernel(X, self.X_train)

        mu = K_star @ self.K_inv @ self.y_train

        # Only the diagonal of the posterior covariance is needed. The RBF
        # prior variance k(x, x) is exactly 1, and the diagonal of
        # K_star @ K_inv @ K_star.T is a row-wise sum.
        explained = np.sum((K_star @ self.K_inv) * K_star, axis=1)
        variance = 1.0 - explained
        # Round-off can push the variance slightly below zero, which would
        # turn sigma into NaN; clamp before taking the square root.
        sigma = np.sqrt(np.clip(variance, 0.0, None))

        return mu, sigma

    def _compute_kernel(self, X1: np.ndarray, X2: np.ndarray) -> np.ndarray:
        """Compute the RBF kernel matrix between the rows of X1 and X2.

        Returns:
            Array of shape (len(X1), len(X2)).
        """
        # ||a - b||^2 = ||a||^2 + ||b||^2 - 2 a.b, broadcast as column + row.
        sqdist = np.sum(X1**2, axis=1).reshape(-1, 1) + \
                np.sum(X2**2, axis=1).reshape(1, -1) - \
                2 * X1 @ X2.T
        # Cancellation can produce tiny negative distances for identical rows.
        sqdist = np.clip(sqdist, 0.0, None)

        return np.exp(-0.5 * sqdist / self.length_scale**2)

class BayesianOptimizer:
    """Bayesian optimization for hyperparameter tuning.

    The objective is *maximized*: a Gaussian Process surrogate is fitted to
    the evaluated configurations and the next configuration is the one with
    the highest acquisition value among randomly sampled candidates.
    """

    def __init__(self, objective: Callable,
                 hyperparameters: List[Hyperparameter],
                 n_initial_points: int = 10,
                 acquisition_function: str = 'ei'):
        """
        Args:
            objective: Function to maximize (takes dict, returns float)
            hyperparameters: List of hyperparameters to optimize
            n_initial_points: Number of random initial points
            acquisition_function: 'ei' (Expected Improvement); any other
                value selects the Upper Confidence Bound ('ucb')
        """
        self.objective = objective
        self.hyperparameters = hyperparameters
        self.n_initial_points = n_initial_points
        self.acquisition_function = acquisition_function

        self.gp = GaussianProcess()
        self.observations = []
        self.results = []

    def _random_sample(self) -> Dict:
        """Draw one random configuration from the search space."""

        sample = {}

        for hp in self.hyperparameters:
            if hp.type == 'float':
                low, high = hp.bounds[0], hp.bounds[1]
                if hp.log_scale:
                    # Uniform in log space, then mapped back.
                    sample[hp.name] = np.exp(
                        np.random.uniform(np.log(low), np.log(high))
                    )
                else:
                    sample[hp.name] = np.random.uniform(low, high)

            elif hp.type == 'int':
                # randint excludes the upper end, so +1 keeps it inclusive.
                sample[hp.name] = np.random.randint(
                    hp.bounds[0], hp.bounds[1] + 1
                )

            elif hp.type == 'categorical':
                # Pick by index so the original choice object is returned
                # instead of a NumPy-coerced copy of it.
                sample[hp.name] = hp.choices[
                    np.random.randint(len(hp.choices))
                ]

        return sample

    def _expected_improvement(self, X: np.ndarray,
                             best_y: float,
                             xi: float = 0.01) -> np.ndarray:
        """Calculate Expected Improvement over ``best_y`` (maximization).

        Args:
            X: Encoded candidates, one per row.
            best_y: Best objective value observed so far.
            xi: Exploration margin subtracted from the improvement.
        """

        mu, sigma = self.gp.predict(X)

        with warnings.catch_warnings():
            warnings.simplefilter("ignore")

            improvement = mu - best_y - xi
            z = improvement / (sigma + 1e-9)
            ei = improvement * norm.cdf(z) + sigma * norm.pdf(z)
            # No uncertainty means no expected improvement.
            ei[sigma == 0.0] = 0.0

        return ei

    def _upper_confidence_bound(self, X: np.ndarray,
                               beta: float = 2.0) -> np.ndarray:
        """Calculate Upper Confidence Bound: ``mu + beta * sigma``."""

        mu, sigma = self.gp.predict(X)

        return mu + beta * sigma

    def _encode_config(self, config: Dict) -> np.ndarray:
        """Encode a configuration dict as a flat numeric vector.

        Floats are passed through (log-transformed when ``log_scale`` is
        set), ints are cast to float and categoricals are one-hot encoded.
        """

        encoded = []

        for hp in self.hyperparameters:
            value = config[hp.name]

            if hp.type == 'float':
                encoded.append(np.log(value) if hp.log_scale else value)

            elif hp.type == 'int':
                encoded.append(float(value))

            elif hp.type == 'categorical':
                one_hot = [0.0] * len(hp.choices)
                one_hot[hp.choices.index(value)] = 1.0
                encoded.extend(one_hot)

        return np.array(encoded)

    def optimize(self, n_iterations: int = 50,
                 verbose: bool = True) -> Dict:
        """Run Bayesian optimization.

        Args:
            n_iterations: Total evaluation budget, including the
                ``n_initial_points`` random evaluations.
            verbose: Print the value of every evaluation.

        Returns:
            Dict with 'best_config', 'best_value', 'all_configs' and
            'all_values'.
        """

        # Initial random exploration
        for i in range(self.n_initial_points):
            config = self._random_sample()
            value = self.objective(config)

            self.observations.append(config)
            self.results.append(value)

            if verbose:
                print(f"Iteration {i+1}: {value:.4f}")

        # Bayesian optimization loop. The budget is the ``n_iterations``
        # argument; there is no attribute of that name on the instance.
        for i in range(n_iterations - self.n_initial_points):
            if self.observations:
                X = np.array([
                    self._encode_config(obs) for obs in self.observations
                ])
                y = np.array(self.results, dtype=float)

                self.gp.fit(X, y)

                # The acquisition compares against the incumbent (best value
                # so far), not against the whole history.
                best_config = self._optimize_acquisition(float(np.max(y)))
            else:
                # Nothing to fit the surrogate on yet.
                best_config = self._random_sample()

            value = self.objective(best_config)

            self.observations.append(best_config)
            self.results.append(value)

            if verbose:
                print(f"Iteration {self.n_initial_points + i + 1}: "
                     f"{value:.4f} (best: {max(self.results):.4f})")

        # Return best configuration
        best_idx = np.argmax(self.results)

        return {
            'best_config': self.observations[best_idx],
            'best_value': self.results[best_idx],
            'all_configs': self.observations,
            'all_values': self.results
        }

    def _optimize_acquisition(self, best_y: float) -> Dict:
        """Return the random candidate with the highest acquisition value.

        Args:
            best_y: Best objective value observed so far. An array of
                observed values is also accepted and reduced to its maximum.
        """

        best_y = float(np.max(best_y))

        best_acquisition = -np.inf
        best_config = None

        # Random search over acquisition function
        for _ in range(1000):
            config = self._random_sample()
            X = self._encode_config(config).reshape(1, -1)

            if self.acquisition_function == 'ei':
                acquisition_value = self._expected_improvement(
                    X, best_y
                )[0]
            else:
                acquisition_value = self._upper_confidence_bound(X)[0]

            if acquisition_value > best_acquisition:
                best_acquisition = acquisition_value
                best_config = config

        return best_config

ℹ️

Bayesian optimization balances exploration and exploitation using Gaussian Processes. It's sample-efficient but scales poorly to high-dimensional spaces. Use it for expensive-to-evaluate models.

Tree-Structured Parzen Estimator (TPE)

import numpy as np
from typing import Dict, List, Tuple, Callable
from collections import defaultdict

class TreeStructuredParzenEstimator:
    """TPE for hyperparameter optimization (minimization).

    Past trials are split into a "good" and a "bad" group by objective
    value. A density is fitted per hyperparameter for each group, and the
    next configuration is the prior sample with the highest good/bad
    density ratio.
    """

    def __init__(self, objective: Callable,
                 hyperparameters: List[Dict],
                 gamma: float = 0.25,
                 n_startup_trials: int = 10,
                 n_candidates: int = 1000):
        """
        Args:
            objective: Function to minimize
            hyperparameters: List of hyperparameter definitions (dicts with
                'name', 'type' and either 'low'/'high' or 'choices')
            gamma: Split ratio for good/bad trials
            n_startup_trials: Number of random trials before TPE
            n_candidates: Number of prior samples scored per suggestion
        """
        self.objective = objective
        self.hyperparameters = hyperparameters
        self.gamma = gamma
        self.n_startup_trials = n_startup_trials
        self.n_candidates = n_candidates

        self.trials = []
        self.results = []

    def _sample_from_prior(self, hp: Dict) -> float:
        """Sample one value for ``hp`` from its prior distribution."""

        if hp['type'] == 'float':
            if hp.get('log_scale', False):
                return np.exp(np.random.uniform(
                    np.log(hp['low']),
                    np.log(hp['high'])
                ))
            return np.random.uniform(hp['low'], hp['high'])

        elif hp['type'] == 'int':
            # randint excludes the upper end, so +1 keeps it inclusive.
            return np.random.randint(hp['low'], hp['high'] + 1)

        elif hp['type'] == 'categorical':
            choices = hp['choices']
            # Pick by index so the original choice object is returned.
            return choices[np.random.randint(len(choices))]

    def _fit_kde(self, values: List[float], hp: Dict):
        """Fit a density estimate to ``values`` of hyperparameter ``hp``.

        Returns:
            Callable mapping a candidate value to its estimated density:
            relative frequency for categoricals, Gaussian KDE for 'int' and
            'float' (evaluated in log space when ``log_scale`` is set).
        """

        if hp['type'] == 'categorical':
            counts = defaultdict(int)
            for v in values:
                counts[v] += 1

            total = len(values)

            def categorical_density(x):
                return counts.get(x, 0) / total if total else 0.0

            return categorical_density

        # Numeric parameters ('float' and 'int') share one Gaussian KDE.
        use_log = hp['type'] == 'float' and hp.get('log_scale', False)
        points = np.asarray(values, dtype=float)
        low, high = float(hp['low']), float(hp['high'])
        if use_log:
            points = np.log(points)
            low, high = np.log(low), np.log(high)

        if points.size == 0:
            return lambda x: 0.0

        # A floor keeps the bandwidth positive when all points coincide.
        bandwidth = max(np.std(points) * 0.5, 1e-3 * abs(high - low), 1e-12)
        norm_const = bandwidth * np.sqrt(2 * np.pi)

        def kde(x):
            # Candidates must be compared in the space the KDE was fitted in.
            x = np.log(x) if use_log else float(x)
            return float(np.mean(
                np.exp(-0.5 * ((x - points) / bandwidth) ** 2) / norm_const
            ))

        return kde

    def _suggest_next(self) -> Dict:
        """Suggest the next hyperparameter configuration to evaluate."""

        # Both groups need at least one trial, hence the minimum of 2.
        if len(self.trials) < max(self.n_startup_trials, 2):
            # Random exploration
            return {
                hp['name']: self._sample_from_prior(hp)
                for hp in self.hyperparameters
            }

        # Split trials into good/bad (lower objective is better)
        sorted_indices = np.argsort(self.results)
        n_trials = len(sorted_indices)
        n_good = min(max(1, int(n_trials * self.gamma)), n_trials - 1)

        good_indices = sorted_indices[:n_good]
        bad_indices = sorted_indices[n_good:]

        # Fit one pair of densities per hyperparameter
        densities = []

        for hp in self.hyperparameters:
            hp_name = hp['name']

            good_values = [self.trials[i][hp_name] for i in good_indices]
            bad_values = [self.trials[i][hp_name] for i in bad_indices]

            densities.append((
                hp,
                self._fit_kde(good_values, hp),
                self._fit_kde(bad_values, hp),
            ))

        # Sample candidates from the prior and keep the best-scoring one
        best_config = None
        best_score = -np.inf

        for _ in range(self.n_candidates):
            config = {}
            log_ratio = 0.0

            for hp, good_kde, bad_kde in densities:
                value = self._sample_from_prior(hp)
                config[hp['name']] = value

                # The epsilon keeps the logs finite when a density is zero.
                log_ratio += np.log(good_kde(value) + 1e-10) - \
                           np.log(bad_kde(value) + 1e-10)

            if log_ratio > best_score:
                best_score = log_ratio
                best_config = config

        return best_config

    def optimize(self, n_trials: int = 100,
                 verbose: bool = True) -> Dict:
        """Run TPE optimization for ``n_trials`` objective evaluations.

        Returns:
            Dict with 'best_config', 'best_value', 'all_configs' and
            'all_values'.
        """

        for i in range(n_trials):
            config = self._suggest_next()

            value = self.objective(config)

            self.trials.append(config)
            self.results.append(value)

            if verbose:
                print(f"Trial {i+1}: {value:.4f} "
                     f"(best: {min(self.results):.4f})")

        best_idx = np.argmin(self.results)

        return {
            'best_config': self.trials[best_idx],
            'best_value': self.results[best_idx],
            'all_configs': self.trials,
            'all_values': self.results
        }

Hyperband

import numpy as np
from typing import Dict, List, Callable
from math import log, ceil

class Hyperband:
    """Hyperband for early-stopping based HPO (minimization).

    Runs one successive-halving bracket for every trade-off between "many
    configurations, little resource" and "few configurations, full
    resource".
    """

    def __init__(self, objective: Callable,
                 max_resource: int = 81,
                 eta: int = 3):
        """
        Args:
            objective: Function to minimize (takes config and resource)
            max_resource: Maximum resource (e.g., epochs)
            eta: Elimination rate
        """
        self.objective = objective
        self.max_resource = max_resource
        self.eta = eta

    def _generate_random_config(self) -> Dict:
        """Generate random hyperparameter configuration."""

        return {
            'learning_rate': 10 ** np.random.uniform(-5, -1),
            'batch_size': np.random.choice([16, 32, 64, 128]),
            'dropout': np.random.uniform(0.1, 0.5),
            'hidden_size': np.random.choice([64, 128, 256, 512])
        }

    def optimize(self, n_iterations: int = 1,
                 verbose: bool = True) -> Dict:
        """Run Hyperband optimization.

        Args:
            n_iterations: Number of times the full set of brackets is run.
            verbose: Print progress for every bracket and round.

        Returns:
            Dict with 'best_config', 'best_value', 'all_configs' and
            'all_values' (one entry per objective evaluation).

        Raises:
            ValueError: If ``eta < 2``, ``max_resource < 1`` or
                ``n_iterations < 1``.
        """

        if self.eta < 2:
            raise ValueError("eta must be at least 2")
        if self.max_resource < 1:
            raise ValueError("max_resource must be at least 1")
        if n_iterations < 1:
            raise ValueError("n_iterations must be at least 1")

        # Largest s with eta**s <= max_resource. Integer arithmetic avoids
        # the off-by-one that int(log(R) / log(eta)) suffers from round-off.
        s_max = 0
        while self.eta ** (s_max + 1) <= self.max_resource:
            s_max += 1

        all_configs = []
        all_values = []

        for _ in range(n_iterations):
            for s in range(s_max, -1, -1):
                # Initial number of configs and per-config resource of the
                # bracket: n = ceil((s_max + 1) * eta^s / (s + 1)).
                n = int(ceil((s_max + 1) * self.eta ** s / (s + 1)))
                r = self.max_resource / self.eta ** s

                if verbose:
                    print(f"\n--- Bracket {s}: n={n}, r={r} ---")

                configs = [self._generate_random_config() for _ in range(n)]

                for j in range(s + 1):
                    # Survivors get eta times more resource each round and
                    # reach exactly max_resource in the last round.
                    resource = max(
                        1,
                        int(self.max_resource * self.eta ** j
                            // self.eta ** s)
                    )

                    values = [
                        self.objective(config, resource)
                        for config in configs
                    ]

                    all_configs.extend(configs)
                    all_values.extend(values)

                    if verbose:
                        print(f"  Round {j}: evaluated {len(configs)} configs, "
                             f"best: {min(values):.4f}")

                    # Keep top 1/eta configs
                    n_keep = max(1, int(len(configs) / self.eta))
                    survivors = np.argsort(values)[:n_keep]
                    configs = [configs[k] for k in survivors]

        best_idx = np.argmin(all_values)

        return {
            'best_config': all_configs[best_idx],
            'best_value': all_values[best_idx],
            'all_configs': all_configs,
            'all_values': all_values
        }

class SuccessiveHalving:
    """Successive Halving for resource-efficient optimization (minimization).

    All configurations are evaluated with a small resource; only the best
    ``1 / reduction_factor`` fraction survives to the next round, where the
    resource is multiplied by ``reduction_factor``.
    """

    def __init__(self, objective: Callable,
                 min_resource: int = 1,
                 max_resource: int = 64,
                 reduction_factor: int = 3):
        """
        Args:
            objective: Function to minimize (takes config and resource)
            min_resource: Resource given to every config in the first round
            max_resource: Upper limit for the per-config resource
            reduction_factor: Elimination rate between rounds
        """
        self.objective = objective
        self.min_resource = min_resource
        self.max_resource = max_resource
        self.reduction_factor = reduction_factor

    def optimize(self, configs: List[Dict] = None,
                 n_configs: int = 27,
                 verbose: bool = True) -> Dict:
        """Run Successive Halving.

        Args:
            configs: Configurations to compare; when None, ``n_configs``
                random ones are generated.
            n_configs: Number of random configurations to generate.
            verbose: Print progress for every round.

        Returns:
            Dict with 'best_config', 'best_value', 'all_configs' and
            'all_values' (one entry per objective evaluation).

        Raises:
            ValueError: If there are no configurations or
                ``reduction_factor < 2``.
        """

        if configs is None:
            configs = [self._generate_random_config() for _ in range(n_configs)]

        configs = list(configs)
        if not configs:
            raise ValueError("at least one configuration is required")
        if self.reduction_factor < 2:
            raise ValueError("reduction_factor must be at least 2")

        # floor(log_rf(n)) via integer division (immune to log round-off),
        # with at least one round so a lone config is still evaluated.
        n_iterations = 0
        remaining = len(configs)
        while remaining >= self.reduction_factor:
            remaining //= self.reduction_factor
            n_iterations += 1
        n_iterations = max(1, n_iterations)

        all_configs = []
        all_values = []

        for i in range(n_iterations):
            # Grow the budget geometrically but never beyond max_resource.
            n_resources = min(
                int(self.max_resource),
                int(self.min_resource * self.reduction_factor ** i)
            )
            n_configs = len(configs)

            if verbose:
                print(f"\nIteration {i+1}: {n_configs} configs, "
                     f"{n_resources} resources")

            # Evaluate all configs
            values = [self.objective(config, n_resources) for config in configs]
            all_configs.extend(configs)
            all_values.extend(values)

            # Keep top 1/reduction_factor configs
            n_keep = max(1, int(n_configs / self.reduction_factor))
            survivors = np.argsort(values)[:n_keep]
            configs = [configs[k] for k in survivors]

        best_idx = np.argmin(all_values)

        return {
            'best_config': all_configs[best_idx],
            'best_value': all_values[best_idx],
            'all_configs': all_configs,
            'all_values': all_values
        }

    def _generate_random_config(self) -> Dict:
        """Generate random hyperparameter configuration."""
        return {
            'learning_rate': 10 ** np.random.uniform(-5, -1),
            'batch_size': np.random.choice([16, 32, 64]),
            'n_layers': np.random.randint(1, 5)
        }

⚠️

Hyperband is excellent for models where training can be early-stopped (e.g., neural networks). For models without this capability, use Bayesian optimization or TPE.

Neural Architecture Search (NAS)

DARTS-Based NAS

import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import List, Dict, Tuple

class MixedOperation(nn.Module):
    """Mixed operation for differentiable NAS.

    Applies every candidate operation to the input and returns their sum,
    weighted by a softmax over the learnable ``betas``.
    """

    class _StridedZero(nn.Module):
        """Zero operation whose output shape matches the strided candidates."""

        def __init__(self, stride: int):
            super().__init__()
            self.stride = stride

        def forward(self, x: torch.Tensor) -> torch.Tensor:
            if self.stride == 1:
                return x * 0.0
            # Subsample spatially so the result can be summed with the
            # outputs of the strided convolutions and poolings.
            return x[:, :, ::self.stride, ::self.stride] * 0.0

    def __init__(self, C: int, stride: int):
        """
        Args:
            C: Number of input (and output) channels of every candidate.
            stride: Stride applied by every candidate operation.
        """
        super().__init__()

        self.ops = nn.ModuleList([
            # "identity": a plain pass-through cannot change resolution, so
            # average pooling stands in when stride != 1.
            nn.Identity() if stride == 1 else nn.AvgPool2d(3, stride=stride, padding=1),
            # "zero": outputs zeros; ZeroPad2d(0) would return the input
            # unchanged and ignore the stride.
            MixedOperation._StridedZero(stride),
            nn.Conv2d(C, C, 3, stride=stride, padding=1, bias=False),
            nn.Conv2d(C, C, 5, stride=stride, padding=2, bias=False),
            nn.Conv2d(C, C, 7, stride=stride, padding=3, bias=False),
            nn.MaxPool2d(3, stride=stride, padding=1),
            nn.AvgPool2d(3, stride=stride, padding=1),
        ])

        # One architecture logit per candidate operation.
        self.betas = nn.Parameter(1e-3 * torch.randn(len(self.ops)))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return the softmax-weighted sum of all candidate outputs."""
        weights = F.softmax(self.betas, dim=0)

        out = sum(w * op(x) for w, op in zip(weights, self.ops))

        return out

class DARTSArchitecture:
    """Differentiable Architecture Search (simplified).

    Holds one row of architecture logits per node; a softmax over a row
    gives the mixing weights of the candidate operations for that node.
    """

    # Candidate operations, in the column order of the architecture logits.
    OP_NAMES = (
        'identity', 'zero', 'conv3x3', 'conv5x5',
        'conv7x7', 'maxpool', 'avgpool'
    )

    def __init__(self, C: int = 16, n_layers: int = 8,
                 n_nodes: int = 4):
        """
        Args:
            C: Number of channels (stored, not used by the simplified
                forward pass).
            n_layers: Number of layers the forward pass iterates over.
            n_nodes: Number of rows of architecture logits.
        """
        self.C = C
        self.n_layers = n_layers
        self.n_nodes = n_nodes

        # Architecture parameters
        self.arch_params = nn.ParameterDict({
            'alphas': nn.Parameter(
                1e-3 * torch.randn(n_nodes, len(self.OP_NAMES))
            )
        })

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Forward pass with architecture parameters."""

        s0 = s1 = x

        for layer in range(self.n_layers):
            s0, s1 = s1, self._mixed_forward(s0, s1, layer)

        return s1

    def _mixed_forward(self, s0: torch.Tensor,
                      s1: torch.Tensor,
                      layer: int) -> torch.Tensor:
        """Forward through mixed operations.

        Simplified: the softmax weights of the selected row scale ``s0``
        directly instead of the outputs of real operations; ``s1`` is
        accepted but not used.
        """

        weights = F.softmax(self.arch_params['alphas'], dim=-1)

        # There are only n_nodes rows of logits but n_layers layers, so the
        # rows are reused cyclically instead of indexing past the end.
        row = weights[layer % self.n_nodes]

        s2 = sum(w * s0 for w in row)

        return s2

    def get_architecture(self) -> Dict:
        """Extract best architecture from learned parameters.

        Returns:
            Dict with 'operations' (name of the highest-weighted operation
            per node) and 'weights' (softmax weights as nested lists).
        """

        weights = F.softmax(self.arch_params['alphas'], dim=-1).detach()

        best_ops = weights.argmax(dim=-1).tolist()

        architecture = {
            'operations': [self.OP_NAMES[op] for op in best_ops],
            # .cpu() keeps this working when the parameters live on a GPU.
            'weights': weights.cpu().tolist()
        }

        return architecture

class NASBench201:
    """Search space in the style of NAS-Bench-201.

    A cell has four nodes (0-3); each of the six forward edges carries one
    operation picked from ``self.ops``.
    """

    def __init__(self):
        # Operation name -> factory taking the channel count C.
        self.ops = dict(
            zero=lambda C: Zero(),
            identity=lambda C: nn.Identity(),
            conv1x1=lambda C: nn.Conv2d(C, C, 1, padding=0),
            conv3x3=lambda C: nn.Conv2d(C, C, 3, padding=1),
            maxpool3x3=lambda C: nn.MaxPool2d(3, stride=1, padding=1),
            avgpool3x3=lambda C: nn.AvgPool2d(3, stride=1, padding=1),
        )

    def sample_architecture(self) -> Dict:
        """Draw one operation name uniformly at random for every edge."""

        candidates = list(self.ops)
        # Edge order fixes both the key order and the order of RNG draws.
        edges = (
            'edge_0_1', 'edge_0_2', 'edge_1_2',
            'edge_0_3', 'edge_1_3', 'edge_2_3',
        )

        return {edge: np.random.choice(candidates) for edge in edges}

class Zero(nn.Module):
    """Operation that replaces its input with zeros."""

    def forward(self, x):
        """Return ``x`` multiplied by zero."""
        zeroed = x * 0
        return zeroed

Population-Based Training

import numpy as np
from typing import Dict, List, Callable
from dataclasses import dataclass
import copy

@dataclass
class Individual:
    """One member of the population: a configuration and its score."""
    config: Dict  # hyperparameter name -> value
    fitness: float = None  # objective value; None until the individual is evaluated
    generation: int = 0  # generation counter, 0 for the initial population

class PopulationBasedTraining:
    """Population-based hyperparameter optimization (minimization).

    Evolves a population of configurations with elitism, tournament
    selection, uniform crossover and mutation. Every new configuration is
    scored by one call to ``objective``.
    """

    def __init__(self, objective: Callable,
                 hyperparameters: List[Dict],
                 population_size: int = 20,
                 generations: int = 10,
                 mutation_rate: float = 0.3,
                 crossover_rate: float = 0.7):
        """
        Args:
            objective: Function to minimize (takes a config dict)
            hyperparameters: List of hyperparameter definitions (dicts with
                'name', 'type' and either 'low'/'high' or 'choices')
            population_size: Number of individuals per generation
            generations: Number of generations to evolve
            mutation_rate: Per-hyperparameter mutation probability
            crossover_rate: Probability that a child is produced by
                crossover instead of copying its first parent
        """
        self.objective = objective
        self.hyperparameters = hyperparameters
        self.population_size = population_size
        self.generations = generations
        self.mutation_rate = mutation_rate
        self.crossover_rate = crossover_rate

        self.population = []
        self.hall_of_fame = []

    def _initialize_population(self) -> List[Individual]:
        """Initialize random population."""

        population = []

        for _ in range(self.population_size):
            config = {}

            for hp in self.hyperparameters:
                if hp['type'] == 'float':
                    config[hp['name']] = np.random.uniform(
                        hp['low'], hp['high']
                    )
                elif hp['type'] == 'int':
                    # randint excludes the upper end, so +1 keeps it inclusive.
                    config[hp['name']] = np.random.randint(
                        hp['low'], hp['high'] + 1
                    )
                elif hp['type'] == 'categorical':
                    config[hp['name']] = np.random.choice(hp['choices'])

            population.append(Individual(config=config))

        return population

    def _evaluate_population(self, population: List[Individual]):
        """Evaluate every individual that has no fitness yet."""

        for individual in population:
            if individual.fitness is None:
                individual.fitness = self.objective(individual.config)

    def _select_parents(self, population: List[Individual],
                       n_parents: int = 2) -> List[Individual]:
        """Tournament selection.

        Each parent is the fittest of up to three distinct individuals
        drawn at random.

        Raises:
            ValueError: If ``population`` is empty.
        """

        if not population:
            raise ValueError("cannot select parents from an empty population")

        # A tournament cannot be larger than the population it draws from.
        tournament_size = min(3, len(population))
        parents = []

        for _ in range(n_parents):
            # Draw indices rather than objects so NumPy never has to turn
            # the individuals into an array.
            picks = np.random.choice(
                len(population), size=tournament_size, replace=False
            )

            best = min((population[k] for k in picks),
                       key=lambda ind: ind.fitness)
            parents.append(best)

        return parents

    def _crossover(self, parent1: Individual,
                  parent2: Individual) -> Individual:
        """Uniform crossover: each value comes from either parent with p=0.5."""

        child_config = {}

        for hp in self.hyperparameters:
            hp_name = hp['name']
            donor = parent1 if np.random.random() < 0.5 else parent2
            child_config[hp_name] = donor.config[hp_name]

        return Individual(
            config=child_config,
            generation=max(parent1.generation, parent2.generation) + 1
        )

    def _mutate(self, individual: Individual) -> Individual:
        """Return a mutated, unevaluated copy of ``individual``.

        Each hyperparameter is mutated with probability ``mutation_rate``:
        floats get Gaussian noise (std = 10% of the range), ints move by
        +/-1 and categoricals are resampled. Numeric values are clipped to
        their bounds.
        """

        mutated_config = copy.deepcopy(individual.config)

        for hp in self.hyperparameters:
            if np.random.random() < self.mutation_rate:
                hp_name = hp['name']

                if hp['type'] == 'float':
                    # Gaussian perturbation
                    current = mutated_config[hp_name]
                    std = (hp['high'] - hp['low']) * 0.1
                    new_value = current + np.random.normal(0, std)
                    mutated_config[hp_name] = float(
                        np.clip(new_value, hp['low'], hp['high'])
                    )

                elif hp['type'] == 'int':
                    current = mutated_config[hp_name]
                    new_value = current + np.random.choice([-1, 1])
                    mutated_config[hp_name] = int(
                        np.clip(new_value, hp['low'], hp['high'])
                    )

                elif hp['type'] == 'categorical':
                    mutated_config[hp_name] = np.random.choice(hp['choices'])

        return Individual(
            config=mutated_config,
            fitness=None,
            generation=individual.generation + 1
        )

    def _update_hall_of_fame(self):
        """Merge the five best of the (sorted) population into the hall of fame."""

        # Elites survive unchanged across generations; track identities so
        # the same individual is never listed twice.
        known = {id(ind) for ind in self.hall_of_fame}

        for ind in self.population[:5]:
            if id(ind) not in known:
                self.hall_of_fame.append(ind)
                known.add(id(ind))

        self.hall_of_fame.sort(key=lambda x: x.fitness)
        self.hall_of_fame = self.hall_of_fame[:10]

    def optimize(self, verbose: bool = True) -> Dict:
        """Run the evolutionary search.

        Returns:
            Dict with 'best_config', 'best_fitness' and 'hall_of_fame'
            (up to ten best individuals seen, as config/fitness dicts).
        """

        # Initialize
        self.population = self._initialize_population()
        self._evaluate_population(self.population)

        for gen in range(self.generations):
            # Sort by fitness (lower is better)
            self.population.sort(key=lambda x: x.fitness)

            self._update_hall_of_fame()

            if verbose:
                best = self.population[0]
                print(f"Generation {gen+1}: best fitness = {best.fitness:.4f}")

            # Create new population
            new_population = []

            # Elitism: keep top 10%
            elite_count = max(1, int(self.population_size * 0.1))
            new_population.extend(self.population[:elite_count])

            # Fill rest with crossover and mutation
            while len(new_population) < self.population_size:
                parents = self._select_parents(self.population)

                if np.random.random() < self.crossover_rate:
                    child = self._crossover(parents[0], parents[1])
                else:
                    child = Individual(
                        config=copy.deepcopy(parents[0].config),
                        generation=parents[0].generation + 1
                    )

                child = self._mutate(child)
                # Crossover/copy and mutation each bump the counter; record
                # the generation the child is actually born into.
                child.generation = gen + 1

                new_population.append(child)

            self.population = new_population
            self._evaluate_population(self.population)

        # Return best; the last generation also competes for the hall of fame
        self.population.sort(key=lambda x: x.fitness)
        self._update_hall_of_fame()

        return {
            'best_config': self.population[0].config,
            'best_fitness': self.population[0].fitness,
            'hall_of_fame': [
                {'config': ind.config, 'fitness': ind.fitness}
                for ind in self.hall_of_fame
            ]
        }

ℹ️

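Population-Based Training (PBT) combines evolutionary search with hyperparameter schedules: a population is trained in parallel, weaker members copy the weights and hyperparameters of stronger ones (exploit) and then perturb those hyperparameters (explore). It's particularly effective for deep learning, where the optimal hyperparameters change during training. The simplified implementation above keeps only the evolutionary part: every new configuration is scored by a fresh call to the objective.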

AutoML Framework Integration

Optuna Integration

import optuna
from typing import Dict, List, Callable
import numpy as np

class OptunaAutoML:
    """Hyperparameter optimization driven by an Optuna study.

    The user-supplied ``objective`` is called with a plain ``dict`` of sampled
    hyperparameters and must return the scalar value to minimize.

    Attributes:
        objective: Callable invoked as ``objective(config)``.
        n_trials: Maximum number of trials passed to ``study.optimize``.
        timeout: Time budget passed to ``study.optimize``.
        study: The Optuna study; ``None`` until :meth:`optimize` has run.
    """

    def __init__(self, objective: Callable,
                 n_trials: int = 100,
                 timeout: int = 3600):
        self.objective = objective
        self.n_trials = n_trials
        self.timeout = timeout

        self.study = None

    def define_search_space(self, trial: "optuna.Trial") -> Dict:
        """Sample one hyperparameter configuration from ``trial``.

        Args:
            trial: The Optuna trial used to draw each parameter.

        Returns:
            A dict mapping parameter name to the sampled value. The learning
            rate, both regularization terms and ``gamma`` are sampled on a
            log scale; the remaining parameters are sampled uniformly.
        """
        # NOTE(review): the parameter names look like gradient-boosted-tree
        # hyperparameters — confirm against the objective that consumes them.
        return {
            'learning_rate': trial.suggest_float(
                'learning_rate', 1e-5, 1e-1, log=True
            ),
            'n_estimators': trial.suggest_int(
                'n_estimators', 50, 1000
            ),
            'max_depth': trial.suggest_int(
                'max_depth', 3, 15
            ),
            'min_child_weight': trial.suggest_int(
                'min_child_weight', 1, 10
            ),
            'subsample': trial.suggest_float(
                'subsample', 0.5, 1.0
            ),
            'colsample_bytree': trial.suggest_float(
                'colsample_bytree', 0.5, 1.0
            ),
            'reg_alpha': trial.suggest_float(
                'reg_alpha', 1e-8, 10.0, log=True
            ),
            'reg_lambda': trial.suggest_float(
                'reg_lambda', 1e-8, 10.0, log=True
            ),
            'gamma': trial.suggest_float(
                'gamma', 1e-8, 5.0, log=True
            )
        }

    def optimize(self, verbose: bool = True) -> Dict:
        """Create a minimizing study and run the optimization.

        Args:
            verbose: Forwarded as ``show_progress_bar`` to ``study.optimize``.

        Returns:
            A dict with ``best_config`` (the best trial's parameters),
            ``best_value``, ``n_trials`` (number of trials recorded in the
            study) and the ``study`` object itself.
        """

        def optuna_objective(trial):
            config = self.define_search_space(trial)
            return self.objective(config)

        # The sampler is seeded so repeated runs draw the same suggestions.
        # NOTE: the objective only receives the sampled config, not the trial,
        # so it cannot report intermediate values for the pruner to act on.
        self.study = optuna.create_study(
            direction='minimize',
            sampler=optuna.samplers.TPESampler(seed=42),
            pruner=optuna.pruners.MedianPruner()
        )

        self.study.optimize(
            optuna_objective,
            n_trials=self.n_trials,
            timeout=self.timeout,
            show_progress_bar=verbose
        )

        return {
            'best_config': self.study.best_params,
            'best_value': self.study.best_value,
            'n_trials': len(self.study.trials),
            'study': self.study
        }

    def get_feature_importance(self) -> Dict:
        """Return hyperparameter importances for the current study.

        Returns:
            The result of ``optuna.importance.get_param_importances`` for the
            study, or an empty dict when :meth:`optimize` has not run yet.
        """
        if self.study is None:
            return {}

        return optuna.importance.get_param_importances(self.study)

    def get_optimization_history(self) -> Dict:
        """Summarize every trial and the running best objective value.

        Returns:
            An empty dict when :meth:`optimize` has not run yet. Otherwise a
            dict with:

            * ``trials``: one entry per trial with its ``number``, ``value``,
              ``params`` and ``state`` name.
            * ``best_values``: for each trial index, the smallest value seen
              up to and including that trial. Entries are ``None`` until the
              first trial that has a value, so leading pruned or failed
              trials no longer raise ``ValueError``.
        """
        if self.study is None:
            return {}

        trials = self.study.trials

        # Single pass running minimum; trials without a value are skipped but
        # still get an entry so the list stays aligned with ``trials``.
        best_values = []
        best_so_far = None
        for trial in trials:
            value = trial.value
            if value is not None and (best_so_far is None or value < best_so_far):
                best_so_far = value
            best_values.append(best_so_far)

        return {
            'trials': [
                {
                    'number': trial.number,
                    'value': trial.value,
                    'params': trial.params,
                    'state': trial.state.name
                }
                for trial in trials
            ],
            'best_values': best_values
        }

Ray Tune Integration

import ray
from ray import tune
from ray.tune.schedulers import ASHAScheduler
from ray.tune.search.optuna import OptunaSearch
from typing import Dict, List, Callable

class RayTuneAutoML:
    """Hyperparameter optimization distributed with Ray Tune.

    The user-supplied ``objective`` is called with a config dict and must
    return a dict containing at least ``'loss'``; ``'accuracy'`` and a nested
    ``'metrics'`` dict are optional and are reported alongside the loss.

    Attributes:
        objective: Callable invoked as ``objective(config)``.
        n_trials: Number of configurations to sample.
        n_cpus: Total CPUs handed to Ray and shared between trials.
        n_gpus: Total GPUs handed to Ray and shared between trials.
    """

    def __init__(self, objective: Callable,
                 n_trials: int = 100,
                 n_cpus: int = 8,
                 n_gpus: float = 1.0):
        self.objective = objective
        self.n_trials = n_trials
        self.n_cpus = n_cpus
        self.n_gpus = n_gpus

        # Only start Ray when no runtime is active in this process.
        if not ray.is_initialized():
            ray.init(num_cpus=n_cpus, num_gpus=n_gpus)

    def _resources_per_trial(self) -> Dict:
        """Split the CPU/GPU budget across the trials that run concurrently.

        The budget is divided by the number of trials that can run at once
        (at most one per CPU, never more than ``n_trials``) rather than by
        the total trial count. Dividing by the total count gave 0 CPUs per
        trial whenever ``n_trials > n_cpus`` and raised ``ZeroDivisionError``
        for ``n_trials == 0``.

        Returns:
            A dict with integer ``'cpu'`` (at least 1) and possibly fractional
            ``'gpu'`` shares for a single trial.
        """
        concurrent_trials = max(1, min(self.n_trials, self.n_cpus))
        return {
            'cpu': max(1, self.n_cpus // concurrent_trials),
            'gpu': self.n_gpus / concurrent_trials,
        }

    def train_func(self, config: Dict):
        """Evaluate one sampled configuration and report it to Ray Tune.

        Args:
            config: Hyperparameters sampled by Ray Tune for this trial. It is
                not modified; the objective receives a copy with an added
                ``'device'`` entry (``'cuda'`` when ``n_gpus > 0``, else
                ``'cpu'``).
        """
        trial_config = {
            **config,
            'device': 'cuda' if self.n_gpus > 0 else 'cpu',
        }

        result = self.objective(trial_config)

        # Merge into one dict so a 'loss'/'accuracy' key inside the optional
        # 'metrics' dict cannot collide with the explicit keyword arguments;
        # the top-level values take precedence.
        reported = dict(result.get('metrics', {}))
        reported['loss'] = result['loss']
        reported['accuracy'] = result.get('accuracy', 0)

        # NOTE(review): keyword-style tune.report is the legacy function API;
        # confirm it is still supported by the Ray version in use.
        tune.report(**reported)

    def optimize(self, verbose: bool = True) -> Dict:
        """Run the search and return the trial with the lowest final loss.

        Args:
            verbose: Forwarded to ``tune.run`` as its ``verbose`` argument.

        Returns:
            A dict with ``best_config``, ``best_value`` (the best trial's last
            reported ``'loss'``) and the ``analysis`` object from ``tune.run``.
        """
        search_space = {
            'learning_rate': tune.loguniform(1e-5, 1e-1),
            'batch_size': tune.choice([16, 32, 64, 128]),
            'hidden_size': tune.choice([64, 128, 256, 512]),
            'n_layers': tune.randint(1, 6),
            'dropout': tune.uniform(0.1, 0.5),
        }

        # Early-stopping scheduler.
        scheduler = ASHAScheduler(
            max_t=100,
            grace_period=10,
            reduction_factor=2
        )

        # Optuna-backed search algorithm proposes the configurations.
        search = OptunaSearch()

        analysis = tune.run(
            self.train_func,
            config=search_space,
            num_samples=self.n_trials,
            scheduler=scheduler,
            search_alg=search,
            metric='loss',
            mode='min',
            resources_per_trial=self._resources_per_trial(),
            verbose=verbose
        )

        # Compare trials by the last loss each one reported.
        best_trial = analysis.get_best_trial('loss', 'min', 'last')

        return {
            'best_config': best_trial.config,
            'best_value': best_trial.last_result['loss'],
            'analysis': analysis
        }

    def cleanup(self):
        """Shut down the Ray runtime if one is active."""
        if ray.is_initialized():
            ray.shutdown()

ℹ️

Ray Tune provides distributed hyperparameter optimization with early stopping and per-trial resource allocation. Use it for large-scale AutoML experiments that span many CPUs or multiple GPUs.

Summary

AutoML and HPO automate ML pipeline optimization:

  1. Bayesian Optimization: Sample-efficient, uses GP surrogate
  2. TPE: Tree-structured Parzen Estimator, a density-based alternative to GP surrogates
  3. Hyperband: Early-stopping based resource allocation
  4. NAS: Neural Architecture Search for model design
  5. Population-Based: Evolutionary search that adapts hyperparameters during training

Choose the method based on your computational budget and model type.

Advertisement