
ML Data Pipelines: Validation, Transformation, Augmentation



Interview Question (Hard). Asked at: Google, Netflix, Uber, Airbnb, Spotify

"Design an ML data pipeline that handles data validation, transformation, and augmentation at scale. How do you ensure data quality, handle schema evolution, and manage data versioning?"

ML Data Pipeline Architecture

ML data pipelines transform raw data into features suitable for model training and serving. They must handle validation, transformation, and quality monitoring at scale.

Data Pipeline Diagram

┌────────────────────────────────────────────────────────────────────┐
│                   ML Data Pipeline Architecture                    │
├────────────────────────────────────────────────────────────────────┤
│                                                                    │
│  ┌───────────┐    ┌───────────┐    ┌───────────┐    ┌───────────┐  │
│  │   Data    │───▶│  Schema   │───▶│   Data    │───▶│  Feature  │  │
│  │ Ingestion │    │Validation │    │ Transform │    │Engineering│  │
│  └───────────┘    └───────────┘    └───────────┘    └───────────┘  │
│        │                │                │                │        │
│        ▼                ▼                ▼                ▼        │
│  ┌───────────┐    ┌───────────┐    ┌───────────┐    ┌───────────┐  │
│  │   Raw     │    │  Clean    │    │ Processed │    │  Feature  │  │
│  │   Data    │    │   Data    │    │   Data    │    │   Store   │  │
│  └───────────┘    └───────────┘    └───────────┘    └───────────┘  │
│                                                                    │
│  ┌──────────────────────────────────────────────────────────────┐  │
│  │                   Data Quality Monitoring                    │  │
│  │      Validation | Profiling | Drift Detection | Alerts       │  │
│  └──────────────────────────────────────────────────────────────┘  │
└────────────────────────────────────────────────────────────────────┘
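The interview question also asks about schema evolution, and the Schema Validation stage in the diagram is where it shows up first. The sketch below compares an incoming pandas DataFrame with an expected column-to-dtype mapping and sorts the differences into missing, added, and retyped columns. The names `check_schema` and `SchemaReport` are hypothetical, and the compatibility policy (new columns are tolerated, dropped or retyped columns are breaking) is an assumption for illustration, not a standard.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

import pandas as pd


@dataclass
class SchemaReport:
    """Differences between an incoming batch and the expected schema."""
    missing: List[str] = field(default_factory=list)   # expected but absent
    added: List[str] = field(default_factory=list)     # present but not expected
    # column -> (expected dtype, actual dtype)
    type_changed: Dict[str, Tuple[str, str]] = field(default_factory=dict)

    @property
    def compatible(self) -> bool:
        # Assumed policy: additive columns are backward-compatible;
        # dropped or retyped columns are breaking changes.
        return not self.missing and not self.type_changed


def check_schema(df: pd.DataFrame, expected: Dict[str, str]) -> SchemaReport:
    """Compare df's columns and dtypes against an expected {column: dtype} mapping."""
    actual = {col: str(dtype) for col, dtype in df.dtypes.items()}
    report = SchemaReport()
    for col, dtype in expected.items():
        if col not in actual:
            report.missing.append(col)
        elif actual[col] != dtype:
            report.type_changed[col] = (dtype, actual[col])
    report.added = [col for col in actual if col not in expected]
    return report
```

For example, a batch whose `amount` column arrives as strings is reported under `type_changed` and fails `compatible`, while an extra `country` column only appears in `added`.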

Data Validation

Great Expectations Pipeline

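# Assumes the legacy 0.x Great Expectations API (ge.from_pandas / PandasDataset);
# GX 1.x replaced these with a different, context-based API.
import great_expectations as ge
import numpy as np
import pandas as pd
from typing import Dict, List, Optional
from datetime import datetime
import json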

class MLDataValidator:
    """Comprehensive data validation for ML pipelines."""
    
    def __init__(self, reference_data: pd.DataFrame = None):
        self.reference_data = reference_data
        self.validation_results = []
    
    def create_expectation_suite(self, df: pd.DataFrame,
                                 suite_name: str = "ml_data_suite") -> ge.core.ExpectationSuite:
        """Create expectation suite from data profile."""
        
        suite = ge.core.ExpectationSuite(expectation_suite_name=suite_name)
        
        # Add expectations for each column
        for column in df.columns:
            # Not null expectation
            suite.add_expectation(
                ge.core.ExpectationConfiguration(
                    expectation_type="expect_column_values_to_not_be_null",
                    kwargs={"column": column}
                )
            )
            
            # Data type expectations
            if df[column].dtype in ['int64', 'float64']:
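                # Numeric range expectation with 50% slack on each side.
                # Widen by abs() so the bounds also loosen for negative values
                # (min_val * 0.5 would tighten a negative minimum).
                min_val = float(df[column].min())
                max_val = float(df[column].max())
                
                suite.add_expectation(
                    ge.core.ExpectationConfiguration(
                        expectation_type="expect_column_values_to_be_between",
                        kwargs={
                            "column": column,
                            "min_value": min_val - 0.5 * abs(min_val),
                            "max_value": max_val + 0.5 * abs(max_val)
                        }
                    )
                )
            
            elif df[column].dtype == 'object':
                # Allowed-values expectation (nulls are handled by the check above)
                unique_values = df[column].dropna().unique().tolist()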
                if len(unique_values) <= 100:  # Only for low cardinality
                    suite.add_expectation(
                        ge.core.ExpectationConfiguration(
                            expectation_type="expect_column_values_to_be_in_set",
                            kwargs={
                                "column": column,
                                "value_set": unique_values
                            }
                        )
                    )
        
        # Table-level expectations
        suite.add_expectation(
            ge.core.ExpectationConfiguration(
                expectation_type="expect_table_row_count_to_be_between",
                kwargs={
                    "min_value": len(df) * 0.8,
                    "max_value": len(df) * 1.2
                }
            )
        )
        
        return suite
    
    def validate_dataset(self, df: pd.DataFrame,
                        suite: ge.core.ExpectationSuite) -> Dict:
        """Validate dataset against expectation suite."""
        
        ge_df = ge.from_pandas(df)
        
        results = ge_df.validate(suite)
        
        validation_result = {
            'timestamp': datetime.now().isoformat(),
            'success': results.success,
            'statistics': results.statistics,
            'results': []
        }
        
        for result in results.results:
            validation_result['results'].append({
                'expectation': result.expectation_config.expectation_type,
                'success': result.success,
                'result': result.result,
                'kwargs': result.expectation_config.kwargs
            })
        
        self.validation_results.append(validation_result)
        
        return validation_result
    
    def validate_ml_specific(self, df: pd.DataFrame,
                            target_column: str,
                            feature_columns: List[str]) -> Dict:
        """ML-specific validation checks."""
        
        checks = []
        
        # Check target variable
        if target_column in df.columns:
            # Class balance check
            class_counts = df[target_column].value_counts()
            class_ratio = class_counts.min() / class_counts.max()
            
            checks.append({
                'check': 'class_balance',
                'success': class_ratio > 0.1,
                'value': float(class_ratio),
                'threshold': 0.1,
                'message': f"Class ratio: {class_ratio:.4f}"
            })
            
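            # Check for label leakage (correlation requires a numeric target)
            target_is_numeric = df[target_column].dtype in ['int64', 'float64']
            for col in feature_columns:
                if (target_is_numeric and col in df.columns
                        and df[col].dtype in ['int64', 'float64']):
                    correlation = abs(df[col].corr(df[target_column]))
                    if pd.isna(correlation):  # constant column: undefined correlation
                        continue
                    
                    checks.append({
                        'check': 'label_leakage',
                        'feature': col,
                        'success': correlation < 0.95,
                        'value': float(correlation),
                        'threshold': 0.95,
                        'message': f"Correlation with target: {correlation:.4f}"
                    })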
        
        # Check feature correlations
        if len(feature_columns) > 1:
            numeric_cols = [c for c in feature_columns 
                          if c in df.columns and df[c].dtype in ['int64', 'float64']]
            
            if len(numeric_cols) > 1:
                corr_matrix = df[numeric_cols].corr().abs()
                upper_tri = corr_matrix.where(
                    np.triu(np.ones(corr_matrix.shape), k=1).astype(bool)
                )
                
                high_corr_pairs = [
                    (col, row, corr_matrix.loc[row, col])
                    for col in upper_tri.columns
                    for row in upper_tri.index
                    if upper_tri.loc[row, col] > 0.95
                ]
                
                checks.append({
                    'check': 'feature_correlation',
                    'success': len(high_corr_pairs) == 0,
                    'value': len(high_corr_pairs),
                    'threshold': 0,
                    'message': f"Highly correlated pairs: {len(high_corr_pairs)}"
                })
        
        return {
            'timestamp': datetime.now().isoformat(),
            'checks': checks,
            'success': all(check['success'] for check in checks)
        }
    
    def generate_report(self) -> str:
        """Generate HTML validation report."""
        
        html = """
        <html>
        <head>
            <title>Data Validation Report</title>
            <style>
                body { font-family: Arial, sans-serif; margin: 20px; }
                .passed { color: green; }
                .failed { color: red; }
                table { border-collapse: collapse; width: 100%; }
                th, td { border: 1px solid #ddd; padding: 8px; }
                th { background-color: #f2f2f2; }
            </style>
        </head>
        <body>
            <h1>Data Validation Report</h1>
            <p>Generated: """ + datetime.now().isoformat() + """</p>
        """
        
        for result in self.validation_results:
            status_class = "passed" if result['success'] else "failed"
            html += f"""
            <h2 class="{status_class}">
                Validation: {'PASSED' if result['success'] else 'FAILED'}
            </h2>
            <table>
                <tr><th>Expectation</th><th>Success</th><th>Details</th></tr>
            """
            
            for check in result.get('results', []):
                html += f"""
                <tr>
                    <td>{check['expectation']}</td>
                    <td class="{'passed' if check['success'] else 'failed'}">
                        {'✓' if check['success'] else '✗'}
                    </td>
                    <td>{check.get('result', {})}</td>
                </tr>
                """
            
            html += "</table>"
        
        html += "</body></html>"
        
        return html

Note: Make data validation the first stage of every ML pipeline. Combine schema validation, statistical tests, and ML-specific checks (class balance, label leakage, redundant features) so bad data is rejected before it reaches training or serving.
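For the statistical side of validation (the Drift Detection box in the diagram), a simple starting point is to compare a feature's current distribution against a reference sample from training time. The sketch below computes the population stability index (PSI) with NumPy, binning by reference quantiles. The function name, the 10-bin default, and the epsilon floor for empty bins are assumptions. A widely used rule of thumb reads PSI below 0.1 as stable and above 0.25 as a significant shift, but thresholds should be tuned per feature.

```python
import numpy as np


def population_stability_index(reference, current, n_bins: int = 10,
                               eps: float = 1e-6) -> float:
    """PSI between a reference sample and a current sample of one numeric feature."""
    ref = np.asarray(reference, dtype=float)
    cur = np.asarray(current, dtype=float)

    # Interior cut points at reference quantiles; np.unique drops duplicate
    # cuts caused by ties so every bin is well defined.
    cuts = np.unique(np.quantile(ref, np.linspace(0, 1, n_bins + 1)[1:-1]))
    n = len(cuts) + 1

    # np.digitize maps each value to a bin index in [0, len(cuts)]
    ref_frac = np.bincount(np.digitize(ref, cuts), minlength=n) / len(ref)
    cur_frac = np.bincount(np.digitize(cur, cuts), minlength=n) / len(cur)

    # Floor empty bins so the log term stays finite
    ref_frac = np.clip(ref_frac, eps, None)
    cur_frac = np.clip(cur_frac, eps, None)
    return float(np.sum((cur_frac - ref_frac) * np.log(cur_frac / ref_frac)))
```

In a pipeline this would run per feature against a stored training-time sample, raising an alert when the score crosses the chosen threshold.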

Data Transformation

PySpark Transformation Pipeline

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from typing import Dict, List
import numpy as np

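class MLDataTransformer:
    """Data transformation pipeline using PySpark.

    Note: scaling and encoding statistics are computed from whichever
    DataFrame is passed to transform(), so persist training-time statistics
    if the same mapping must be applied to validation or serving data.
    """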
    
    def __init__(self, spark: SparkSession):
        self.spark = spark
        self.transformers = []
    
    def add_numeric_transformer(self, columns: List[str],
                               method: str = 'standard'):
        """Add numeric transformation."""
        
        self.transformers.append({
            'type': 'numeric',
            'columns': columns,
            'method': method
        })
        
        return self
    
    def add_categorical_transformer(self, columns: List[str],
                                   method: str = 'onehot'):
        """Add categorical transformation."""
        
        self.transformers.append({
            'type': 'categorical',
            'columns': columns,
            'method': method
        })
        
        return self
    
    def add_time_transformer(self, timestamp_column: str,
                            features: List[str] = None):
        """Add time-based feature extraction."""
        
        self.transformers.append({
            'type': 'time',
            'column': timestamp_column,
            'features': features or ['hour', 'dayofweek', 'month', 'year']
        })
        
        return self
    
    def add_aggregation_transformer(self, group_columns: List[str],
                                   agg_columns: List[str],
                                   agg_functions: List[str] = None):
        """Add aggregation transformer."""
        
        self.transformers.append({
            'type': 'aggregation',
            'group_columns': group_columns,
            'agg_columns': agg_columns,
            'agg_functions': agg_functions or ['mean', 'std', 'min', 'max']
        })
        
        return self
    
    def transform(self, df):
        """Apply all transformations."""
        
        for transformer in self.transformers:
            if transformer['type'] == 'numeric':
                df = self._transform_numeric(df, transformer)
            elif transformer['type'] == 'categorical':
                df = self._transform_categorical(df, transformer)
            elif transformer['type'] == 'time':
                df = self._transform_time(df, transformer)
            elif transformer['type'] == 'aggregation':
                df = self._transform_aggregation(df, transformer)
        
        return df
    
    def _transform_numeric(self, df, transformer: Dict):
        """Apply numeric transformations."""
        
        columns = transformer['columns']
        method = transformer['method']
        
        if method == 'standard':
            for col in columns:
                # Calculate mean and std
                stats = df.select(
                    F.mean(col).alias('mean'),
                    F.stddev(col).alias('std')
                ).collect()[0]
                
                mean_val = stats['mean']
                std_val = stats['std']
                
                # Apply transformation
                df = df.withColumn(
                    f"{col}_transformed",
                    (F.col(col) - mean_val) / (std_val + 1e-8)
                )
        
        elif method == 'minmax':
            for col in columns:
                stats = df.select(
                    F.min(col).alias('min'),
                    F.max(col).alias('max')
                ).collect()[0]
                
                min_val = stats['min']
                max_val = stats['max']
                
                df = df.withColumn(
                    f"{col}_transformed",
                    (F.col(col) - min_val) / (max_val - min_val + 1e-8)
                )
        
        elif method == 'log':
            for col in columns:
                df = df.withColumn(
                    f"{col}_transformed",
                    F.log1p(F.abs(F.col(col)))
                )
        
        return df
    
    def _transform_categorical(self, df, transformer: Dict):
        """Apply categorical transformations."""
        
        columns = transformer['columns']
        method = transformer['method']
        
        if method == 'onehot':
            for col in columns:
                # Get unique values
                unique_values = df.select(col).distinct().collect()
                unique_values = [row[col] for row in unique_values]
                
                # Create one-hot columns
                for value in unique_values:
                    df = df.withColumn(
                        f"{col}_{value}",
                        F.when(F.col(col) == value, 1).otherwise(0)
                    )
        
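        elif method == 'label':
            for col in columns:
                # Sort distinct non-null values so the encoding is deterministic
                values = sorted(
                    row[col] for row in df.select(col).distinct().collect()
                    if row[col] is not None
                )
                
                # Build one chained CASE WHEN expression instead of referencing
                # f"{col}_encoded" before it exists; unmatched values become null
                encoded = F.lit(None).cast('int')  # used if the column is all null
                for idx, value in enumerate(values):
                    cond = F.col(col) == value
                    encoded = (F.when(cond, idx) if idx == 0
                               else encoded.when(cond, idx))
                
                df = df.withColumn(f"{col}_encoded", encoded)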
        
        return df
    
    def _transform_time(self, df, transformer: Dict):
        """Extract time-based features."""
        
        col = transformer['column']
        features = transformer['features']
        
        # Convert to timestamp
        df = df.withColumn(
            f"{col}_timestamp",
            F.to_timestamp(F.col(col))
        )
        
        if 'hour' in features:
            df = df.withColumn(
                f"{col}_hour",
                F.hour(F.col(f"{col}_timestamp"))
            )
        
        if 'dayofweek' in features:
            df = df.withColumn(
                f"{col}_dayofweek",
                F.dayofweek(F.col(f"{col}_timestamp"))
            )
        
        if 'month' in features:
            df = df.withColumn(
                f"{col}_month",
                F.month(F.col(f"{col}_timestamp"))
            )
        
        if 'year' in features:
            df = df.withColumn(
                f"{col}_year",
                F.year(F.col(f"{col}_timestamp"))
            )
        
        if 'is_weekend' in features:
            df = df.withColumn(
                f"{col}_is_weekend",
                F.when(
                    F.dayofweek(F.col(f"{col}_timestamp")).isin([1, 7]),
                    1
                ).otherwise(0)
            )
        
        return df
    
    def _transform_aggregation(self, df, transformer: Dict):
        """Apply aggregation transformations."""
        
        group_cols = transformer['group_columns']
        agg_cols = transformer['agg_columns']
        agg_funcs = transformer['agg_functions']
        
        # Define aggregation
        agg_exprs = []
        for col in agg_cols:
            for func in agg_funcs:
                if func == 'mean':
                    agg_exprs.append(F.mean(col).alias(f"{col}_mean"))
                elif func == 'std':
                    agg_exprs.append(F.stddev(col).alias(f"{col}_std"))
                elif func == 'min':
                    agg_exprs.append(F.min(col).alias(f"{col}_min"))
                elif func == 'max':
                    agg_exprs.append(F.max(col).alias(f"{col}_max"))
                elif func == 'count':
                    agg_exprs.append(F.count(col).alias(f"{col}_count"))
        
        # Perform aggregation
        agg_df = df.groupBy(group_cols).agg(*agg_exprs)
        
        # Join with original dataframe
        df = df.join(agg_df, on=group_cols, how='left')
        
        return df
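`_transform_aggregation` computes per-group statistics with `groupBy().agg()` and left-joins them back, so every row carries its group's statistics. A pandas equivalent, handy for checking the Spark output on a small sample, is `groupby(...).transform(...)`, which returns results aligned to the original rows without an explicit join. This is a sketch assuming a pandas DataFrame; `add_group_stats` and the column names in the example are hypothetical.

```python
import pandas as pd


def add_group_stats(df: pd.DataFrame, group_cols, agg_cols,
                    funcs=("mean", "std", "min", "max")) -> pd.DataFrame:
    """Attach per-group statistics to every row, mirroring groupBy + left join."""
    out = df.copy()
    grouped = df.groupby(group_cols)
    for col in agg_cols:
        for func in funcs:
            # transform() broadcasts each group's statistic back to its rows;
            # pandas "std" uses ddof=1, matching Spark's stddev (sample std).
            out[f"{col}_{func}"] = grouped[col].transform(func)
    return out
```

Because the row count never changes, this avoids the row duplication that a join on non-unique keys on both sides could cause.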

Feature Engineering Pipeline

import pandas as pd
import numpy as np
from typing import Dict, List, Optional, Tuple
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.decomposition import PCA

class FeatureEngineeringPipeline:
    """End-to-end feature engineering pipeline."""
    
    def __init__(self):
        self.steps = []
        self.fitted = False
        self.scalers = {}
        self.encoders = {}
        self.feature_names = []
    
    def add_step(self, step_type: str, **kwargs):
        """Add a transformation step."""
        
        self.steps.append({
            'type': step_type,
            'params': kwargs
        })
        
        return self
    
    def fit_transform(self, X: pd.DataFrame, 
                     y: pd.Series = None) -> pd.DataFrame:
        """Fit and transform the data."""
        
        result = X.copy()
        
        for step in self.steps:
            result = self._apply_step(result, step, fit=True, y=y)
        
        self.fitted = True
        self.feature_names = list(result.columns)
        
        return result
    
    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Transform data using fitted pipeline."""
        
        if not self.fitted:
            raise ValueError("Pipeline not fitted. Call fit_transform first.")
        
        result = X.copy()
        
        for step in self.steps:
            result = self._apply_step(result, step, fit=False)
        
        return result
    
    def _apply_step(self, df: pd.DataFrame, step: Dict,
                   fit: bool = True, y: pd.Series = None) -> pd.DataFrame:
        """Apply a single transformation step."""
        
        step_type = step['type']
        params = step['params']
        
        if step_type == 'missing_value':
            return self._handle_missing(df, params, fit)
        
        elif step_type == 'numeric_scaling':
            return self._scale_numeric(df, params, fit)
        
        elif step_type == 'categorical_encoding':
            return self._encode_categorical(df, params, fit)
        
        elif step_type == 'feature_creation':
            return self._create_features(df, params)
        
        elif step_type == 'feature_selection':
            return self._select_features(df, params, fit, y)
        
        elif step_type == 'dimensionality_reduction':
            return self._reduce_dimensions(df, params, fit)
        
        elif step_type == 'outlier_handling':
            return self._handle_outliers(df, params, fit)
        
        else:
            raise ValueError(f"Unknown step type: {step_type}")
    
    def _handle_missing(self, df: pd.DataFrame, params: Dict,
                       fit: bool = True) -> pd.DataFrame:
        """Handle missing values."""
        
        method = params.get('method', 'median')
        columns = params.get('columns', df.columns.tolist())
        
        for col in columns:
            if col not in df.columns:
                continue
            
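            if fit:
                is_numeric = pd.api.types.is_numeric_dtype(df[col])
                if method == 'median' and is_numeric:
                    fill_value = df[col].median()
                elif method == 'mean' and is_numeric:
                    fill_value = df[col].mean()
                elif method in ('median', 'mean', 'mode'):
                    # Mode, also the fallback for non-numeric columns,
                    # where median/mean are undefined
                    modes = df[col].mode()
                    fill_value = modes.iloc[0] if not modes.empty else 0
                else:
                    fill_value = 0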
                
                self.scalers[f'missing_{col}'] = fill_value
            
            fill_value = self.scalers.get(f'missing_{col}', 0)
            df[col] = df[col].fillna(fill_value)
        
        return df
    
    def _scale_numeric(self, df: pd.DataFrame, params: Dict,
                      fit: bool = True) -> pd.DataFrame:
        """Scale numeric features."""
        
        method = params.get('method', 'standard')
        columns = params.get('columns', 
            df.select_dtypes(include=['int64', 'float64']).columns.tolist()
        )
        
        for col in columns:
            if col not in df.columns:
                continue
            
            if fit:
                if method == 'standard':
                    mean_val = df[col].mean()
                    std_val = df[col].std()
                    self.scalers[f'scale_{col}'] = (mean_val, std_val)
                elif method == 'minmax':
                    min_val = df[col].min()
                    max_val = df[col].max()
                    self.scalers[f'scale_{col}'] = (min_val, max_val)
            
            if f'scale_{col}' in self.scalers:
                if method == 'standard':
                    mean_val, std_val = self.scalers[f'scale_{col}']
                    df[col] = (df[col] - mean_val) / (std_val + 1e-8)
                elif method == 'minmax':
                    min_val, max_val = self.scalers[f'scale_{col}']
                    df[col] = (df[col] - min_val) / (max_val - min_val + 1e-8)
        
        return df
    
    def _encode_categorical(self, df: pd.DataFrame, params: Dict,
                           fit: bool = True) -> pd.DataFrame:
        """Encode categorical features."""
        
        method = params.get('method', 'onehot')
        columns = params.get('columns', 
            df.select_dtypes(include=['object', 'category']).columns.tolist()
        )
        
        for col in columns:
            if col not in df.columns:
                continue
            
            if fit:
                if method == 'label':
                    le = LabelEncoder()
                    le.fit(df[col].astype(str))
                    self.encoders[f'encode_{col}'] = le
                elif method == 'onehot':
                    unique_values = df[col].unique().tolist()
                    self.encoders[f'encode_{col}'] = unique_values
            
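            if method == 'label':
                le = self.encoders.get(f'encode_{col}')
                if le is not None:
                    # Map via the fitted classes; categories unseen during fit
                    # become -1 (LabelEncoder.transform would raise on them)
                    mapping = {cls: idx for idx, cls in enumerate(le.classes_)}
                    df[col] = df[col].astype(str).map(mapping).fillna(-1).astype(int)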
            
            elif method == 'onehot':
                unique_values = self.encoders.get(f'encode_{col}', [])
                for value in unique_values:
                    df[f'{col}_{value}'] = (df[col] == value).astype(int)
                df = df.drop(columns=[col])
        
        return df
    
    def _create_features(self, df: pd.DataFrame, params: Dict) -> pd.DataFrame:
        """Create new features."""
        
        feature_type = params.get('type', 'interaction')
        
        if feature_type == 'interaction':
            columns = params.get('columns', df.columns.tolist()[:5])
            
            for i, col1 in enumerate(columns):
                for col2 in columns[i+1:]:
                    if col1 in df.columns and col2 in df.columns:
                        if df[col1].dtype in ['int64', 'float64'] and \
                           df[col2].dtype in ['int64', 'float64']:
                            df[f'{col1}_x_{col2}'] = df[col1] * df[col2]
                            df[f'{col1}_div_{col2}'] = df[col1] / (df[col2] + 1e-8)
        
        elif feature_type == 'polynomial':
            columns = params.get('columns', df.columns.tolist()[:5])
            degree = params.get('degree', 2)
            
            for col in columns:
                if col in df.columns and df[col].dtype in ['int64', 'float64']:
                    for d in range(2, degree + 1):
                        df[f'{col}_pow{d}'] = df[col] ** d
        
        elif feature_type == 'log':
            columns = params.get('columns', df.columns.tolist()[:5])
            
            for col in columns:
                if col in df.columns and df[col].dtype in ['int64', 'float64']:
                    df[f'{col}_log'] = np.log1p(df[col].clip(lower=0))
        
        return df
    
    def _select_features(self, df: pd.DataFrame, params: Dict,
                        fit: bool = True, y: pd.Series = None) -> pd.DataFrame:
        """Select the top-k numeric features (ANOVA F-test, classification targets)."""
        
        method = params.get('method', 'kbest')  # only 'kbest' is implemented
        k = params.get('k', 10)
        
        numeric_cols = df.select_dtypes(include=['int64', 'float64']).columns.tolist()
        non_numeric = [c for c in df.columns if c not in numeric_cols]
        
        if fit:
            # Selection needs a target and more candidates than k
            if y is None or len(numeric_cols) <= k:
                self.scalers.pop('selected_features', None)
                return df
            selector = SelectKBest(f_classif, k=k)
            # Align y with df in case an earlier step (e.g. outlier removal) dropped rows
            selector.fit(df[numeric_cols], y.loc[df.index])
            self.scalers['selected_features'] = [
                col for col, selected
                in zip(numeric_cols, selector.get_support())
                if selected
            ]
        
        # At transform time y is unavailable: reuse the features chosen during fit
        selected_features = self.scalers.get('selected_features')
        if selected_features is None:
            return df
        
        return df[selected_features + non_numeric]
    
    def _reduce_dimensions(self, df: pd.DataFrame, params: Dict,
                          fit: bool = True) -> pd.DataFrame:
        """Reduce dimensionality using PCA."""
        
        n_components = params.get('n_components', 10)
        
        numeric_cols = df.select_dtypes(include=['int64', 'float64']).columns.tolist()
        
        if len(numeric_cols) <= n_components:
            return df
        
        if fit:
            pca = PCA(n_components=n_components)
            pca.fit(df[numeric_cols])
            self.scalers['pca'] = pca
        
        pca = self.scalers.get('pca')
        if pca:
            pca_result = pca.transform(df[numeric_cols])
            pca_df = pd.DataFrame(
                pca_result,
                columns=[f'pca_{i}' for i in range(n_components)],
                index=df.index
            )
            
            # Keep non-numeric columns
            non_numeric = [c for c in df.columns if c not in numeric_cols]
            
            df = pd.concat([df[non_numeric], pca_df], axis=1)
        
        return df
    
    def _handle_outliers(self, df: pd.DataFrame, params: Dict,
                        fit: bool = True) -> pd.DataFrame:
        """Handle outliers."""
        
        method = params.get('method', 'clip')
        columns = params.get('columns', 
            df.select_dtypes(include=['int64', 'float64']).columns.tolist()
        )
        
        for col in columns:
            if col not in df.columns:
                continue
            
            if fit:
                q1 = df[col].quantile(0.25)
                q3 = df[col].quantile(0.75)
                iqr = q3 - q1
                lower_bound = q1 - 1.5 * iqr
                upper_bound = q3 + 1.5 * iqr
                self.scalers[f'outlier_{col}'] = (lower_bound, upper_bound)
            
            lower_bound, upper_bound = self.scalers.get(
                f'outlier_{col}', 
                (df[col].min(), df[col].max())
            )
            
            if method == 'clip':
                df[col] = df[col].clip(lower=lower_bound, upper=upper_bound)
            elif method == 'remove':
                # Also drops rows in transform(); prefer 'clip' for data you must score
                df = df[(df[col] >= lower_bound) & (df[col] <= upper_bound)].copy()
        
        return df
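To make the IQR rule in `_handle_outliers` concrete: the bounds are Q1 - 1.5 * IQR and Q3 + 1.5 * IQR (Tukey's fences), computed once on training data. The short example below uses pandas' default linear quantile interpolation on a hypothetical five-value column and shows the single extreme value being clipped to the upper bound.

```python
import pandas as pd

train = pd.Series([1.0, 2.0, 3.0, 4.0, 100.0])

q1, q3 = train.quantile(0.25), train.quantile(0.75)  # 2.0 and 4.0
iqr = q3 - q1                                         # 2.0
lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr         # -1.0 and 7.0

clipped = train.clip(lower=lower, upper=upper)
print(clipped.tolist())  # [1.0, 2.0, 3.0, 4.0, 7.0]
```

The 1.5 multiplier is a convention; heavy-tailed features may warrant a wider fence or a quantile-based cap instead.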

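Warning: Fit transformers on training data only, then apply the fitted parameters to the training, validation, and test data. Computing statistics on the full dataset leaks information from the evaluation split into training, which can make offline metrics optimistic; reusing fitted parameters also keeps transformations consistent between training and serving.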
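A minimal illustration of this rule with plain pandas and a hypothetical `amount` column: the scaling statistics come from the training split alone, and the test split is transformed with those same numbers rather than its own.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
df = pd.DataFrame({"amount": rng.normal(50, 10, 1000)})

# Hold out the last 20% as test data before computing any statistics
split = int(len(df) * 0.8)
train, test = df.iloc[:split].copy(), df.iloc[split:].copy()

# Fit: statistics from the training split only
mean, std = train["amount"].mean(), train["amount"].std()

# Transform: reuse the training statistics for both splits
train["amount_scaled"] = (train["amount"] - mean) / std
test["amount_scaled"] = (test["amount"] - mean) / std
```

With the `FeatureEngineeringPipeline` above, the same discipline means calling `fit_transform` on the training split and `transform` on the test split.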

Data Augmentation

Image Data Augmentation

import numpy as np
from typing import List, Tuple
import random

class ImageAugmenter:
    """Data augmentation for image data."""
    
    def __init__(self, augmentations: List[str] = None):
        self.augmentations = augmentations or [
            'horizontal_flip', 'vertical_flip', 'rotation',
            'brightness', 'contrast', 'noise', 'crop'
        ]
    
    def augment(self, image: np.ndarray, 
                n_augmentations: int = 1) -> List[np.ndarray]:
        """Return the original image followed by n_augmentations augmented copies."""
        
        augmented_images = [image.copy()]
        
        for _ in range(n_augmentations):
            aug_image = image.copy()
            
            # Apply random subset of augmentations
            n_transforms = random.randint(1, len(self.augmentations))
            selected_augs = random.sample(self.augmentations, n_transforms)
            
            for aug in selected_augs:
                if aug == 'horizontal_flip':
                    aug_image = self._horizontal_flip(aug_image)
                elif aug == 'vertical_flip':
                    aug_image = self._vertical_flip(aug_image)
                elif aug == 'rotation':
                    aug_image = self._rotate(aug_image)
                elif aug == 'brightness':
                    aug_image = self._adjust_brightness(aug_image)
                elif aug == 'contrast':
                    aug_image = self._adjust_contrast(aug_image)
                elif aug == 'noise':
                    aug_image = self._add_noise(aug_image)
                elif aug == 'crop':
                    aug_image = self._random_crop(aug_image)
            
            augmented_images.append(aug_image)
        
        return augmented_images
    
    def _horizontal_flip(self, image: np.ndarray) -> np.ndarray:
        """Random horizontal (left-right) flip."""
        if random.random() > 0.5:
            return np.fliplr(image).copy()
        return image
    
    def _vertical_flip(self, image: np.ndarray) -> np.ndarray:
        """Random vertical (upside-down) flip."""
        if random.random() > 0.5:
            return np.flipud(image).copy()
        return image
    
    def _rotate(self, image: np.ndarray, 
                max_angle: int = 30) -> np.ndarray:
        """Random rotation."""
        from scipy.ndimage import rotate as rotate_image
        
        angle = random.uniform(-max_angle, max_angle)
        return rotate_image(image, angle, reshape=False)
    
    def _adjust_brightness(self, image: np.ndarray,
                          factor_range: Tuple[float, float] = (0.7, 1.3)) -> np.ndarray:
        """Adjust brightness."""
        factor = random.uniform(*factor_range)
        return np.clip(image * factor, 0, 255).astype(np.uint8)
    
    def _adjust_contrast(self, image: np.ndarray,
                        factor_range: Tuple[float, float] = (0.7, 1.3)) -> np.ndarray:
        """Adjust contrast."""
        factor = random.uniform(*factor_range)
        mean = np.mean(image)
        return np.clip((image - mean) * factor + mean, 0, 255).astype(np.uint8)
    
    def _add_noise(self, image: np.ndarray,
                  std_range: Tuple[float, float] = (0, 25)) -> np.ndarray:
        """Add random noise."""
        std = random.uniform(*std_range)
        noise = np.random.normal(0, std, image.shape)
        return np.clip(image + noise, 0, 255).astype(np.uint8)
    
    def _random_crop(self, image: np.ndarray,
                    crop_ratio: float = 0.8) -> np.ndarray:
        """Random crop and resize."""
        h, w = image.shape[:2]
        new_h, new_w = int(h * crop_ratio), int(w * crop_ratio)
        
        top = random.randint(0, h - new_h)
        left = random.randint(0, w - new_w)
        
        cropped = image[top:top+new_h, left:left+new_w]
        
        # Resize back to original size
        from PIL import Image
        pil_image = Image.fromarray(cropped)
        pil_image = pil_image.resize((w, h))
        
        return np.array(pil_image)

class TextAugmenter:
    """Data augmentation for text data."""
    
    def __init__(self, augmentations: List[str] = None):
        self.augmentations = augmentations or [
            'synonym_replacement', 'random_insertion',
            'random_swap', 'random_deletion'
        ]
    
    def augment(self, text: str, 
                n_augmentations: int = 1) -> List[str]:
        """Return the original text followed by n_augmentations augmented variants."""
        
        augmented_texts = [text]
        
        for _ in range(n_augmentations):
            aug_text = text
            
            # Apply random augmentation
            aug_method = random.choice(self.augmentations)
            
            if aug_method == 'synonym_replacement':
                aug_text = self._synonym_replacement(aug_text)
            elif aug_method == 'random_insertion':
                aug_text = self._random_insertion(aug_text)
            elif aug_method == 'random_swap':
                aug_text = self._random_swap(aug_text)
            elif aug_method == 'random_deletion':
                aug_text = self._random_deletion(aug_text)
            
            augmented_texts.append(aug_text)
        
        return augmented_texts
    
    def _synonym_replacement(self, text: str, 
                            n_replacements: int = 1) -> str:
        """Replace words with synonyms."""
        # Simplified implementation
        words = text.split()
        
        for _ in range(n_replacements):
            if words:
                idx = random.randint(0, len(words) - 1)
                # In production, use WordNet or similar
                words[idx] = f"SYN_{words[idx]}"
        
        return ' '.join(words)
    
    def _random_insertion(self, text: str, 
                         n_insertions: int = 1) -> str:
        """Randomly insert words."""
        words = text.split()
        
        for _ in range(n_insertions):
            # Placeholder token; in practice, insert a synonym of a random word in the text
            new_word = "INSERTED"
            idx = random.randint(0, len(words))
            words.insert(idx, new_word)
        
        return ' '.join(words)
    
    def _random_swap(self, text: str, 
                    n_swaps: int = 1) -> str:
        """Randomly swap words."""
        words = text.split()
        
        for _ in range(n_swaps):
            if len(words) >= 2:
                idx1, idx2 = random.sample(range(len(words)), 2)
                words[idx1], words[idx2] = words[idx2], words[idx1]
        
        return ' '.join(words)
    
    def _random_deletion(self, text: str,
                        p: float = 0.1) -> str:
        """Randomly delete words."""
        words = text.split()
        
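        # Nothing sensible to delete from empty or single-word text
        # (random.choice([]) below would raise on empty input)
        if len(words) <= 1:
            return text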
        
        new_words = [w for w in words if random.random() > p]
        
        if not new_words:
            return random.choice(words)
        
        return ' '.join(new_words)

class TabularAugmenter:
    """Data augmentation for tabular data."""
    
    def __init__(self):
        pass
    
    def augment(self, df: 'pd.DataFrame',
               n_augmentations: int = 1,
               method: str = 'smote') -> 'pd.DataFrame':
        """Augment tabular data."""
        
        augmented_dfs = [df]
        
        for _ in range(n_augmentations):
            if method == 'smote':
                aug_df = self._smote_augment(df)
            elif method == 'gaussian':
                aug_df = self._gaussian_augment(df)
            elif method == 'mixup':
                aug_df = self._mixup_augment(df)
            else:
                aug_df = df.sample(frac=0.1, replace=True)
            
            augmented_dfs.append(aug_df)
        
        import pandas as pd
        return pd.concat(augmented_dfs, ignore_index=True)
    
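    def _smote_augment(self, df: 'pd.DataFrame') -> 'pd.DataFrame':
        """SMOTE-like augmentation.

        Simplified: interpolates between random pairs of minority-class rows,
        whereas full SMOTE pairs each row with one of its k nearest neighbours.
        """
        # Interpolate numeric features only; the label is copied, not mixed
        numeric_cols = [c for c in df.select_dtypes(include=['int64', 'float64']).columns
                        if c != 'label']
        
        # Find minority class
        if 'label' in df.columns:
            class_counts = df['label'].value_counts()
            minority_class = class_counts.idxmin()
            minority_df = df[df['label'] == minority_class]
        else:
            minority_df = df.sample(n=max(1, len(df) // 10))
        
        # Generate one synthetic sample per minority row from random pairs
        n = len(minority_df)
        first = minority_df.iloc[np.random.randint(0, n, size=n)].reset_index(drop=True)
        second = minority_df.iloc[np.random.randint(0, n, size=n)].reset_index(drop=True)
        alpha = np.random.rand(n)  # one interpolation weight per synthetic row
        
        synthetic = first.copy()
        for col in numeric_cols:
            # .to_numpy() combines rows by position; DataFrame arithmetic would
            # align on index labels and produce NaN for mismatched rows
            synthetic[col] = (alpha * first[col].to_numpy(dtype=float)
                              + (1 - alpha) * second[col].to_numpy(dtype=float))
        
        return synthetic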
    
    def _gaussian_augment(self, df: 'pd.DataFrame') -> 'pd.DataFrame':
        """Add Gaussian noise."""
        # Perturb numeric features only; never add noise to the target
        numeric_cols = [c for c in df.select_dtypes(include=['int64', 'float64']).columns
                        if c != 'label']
        
        aug_df = df.copy()
        
        for col in numeric_cols:
            std = df[col].std() * 0.1
            aug_df[col] = df[col] + np.random.normal(0, std, len(df))
        
        return aug_df
    
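    def _mixup_augment(self, df: 'pd.DataFrame') -> 'pd.DataFrame':
        """Mixup-style augmentation on numeric columns.

        Non-numeric columns are dropped; a numeric 'label' column is mixed
        too, giving soft labels. Standard mixup samples the weight from
        Beta(a, a); here one weight in [0.25, 0.75] is used per call.
        """
        numeric_df = df.select_dtypes(include=['int64', 'float64'])
        
        # Randomly pair samples and interpolate
        indices = np.random.permutation(len(numeric_df))
        half = len(numeric_df) // 2
        
        # reset_index so the two halves combine row by row instead of
        # aligning on index labels (which would produce all-NaN rows)
        sample1 = numeric_df.iloc[indices[:half]].reset_index(drop=True)
        sample2 = numeric_df.iloc[indices[half:2*half]].reset_index(drop=True)
        
        alpha = random.random() * 0.5 + 0.25
        
        mixed = sample1 * alpha + sample2 * (1 - alpha)
        
        return mixed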
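The `_smote_augment` method above pairs random minority rows. Standard SMOTE instead interpolates each chosen row toward one of its k nearest minority-class neighbours, so synthetic points stay in locally dense regions. The following is a minimal NumPy sketch of that technique. The `smote` function is illustrative rather than a library API, and it computes all pairwise distances, so it only suits small minority sets. For production use, the imbalanced-learn library provides `imblearn.over_sampling.SMOTE` with `fit_resample(X, y)`.

```python
from typing import Optional

import numpy as np


def smote(X_minority: np.ndarray, n_samples: int, k: int = 5,
          rng: Optional[np.random.Generator] = None) -> np.ndarray:
    """Generate n_samples synthetic rows by interpolating each randomly chosen
    minority row toward one of its k nearest minority-class neighbours."""
    if rng is None:
        rng = np.random.default_rng()
    n = len(X_minority)
    if n < 2:
        raise ValueError("SMOTE needs at least two minority samples")
    k = min(k, n - 1)

    # Pairwise Euclidean distances within the minority class (O(n^2) memory)
    diff = X_minority[:, None, :] - X_minority[None, :, :]
    dist = np.sqrt((diff ** 2).sum(axis=-1))
    np.fill_diagonal(dist, np.inf)                # a row is not its own neighbour
    neighbours = np.argsort(dist, axis=1)[:, :k]  # indices of k nearest per row

    base = rng.integers(0, n, size=n_samples)                    # seed rows
    pick = neighbours[base, rng.integers(0, k, size=n_samples)]  # one neighbour each
    gap = rng.random((n_samples, 1))                             # position on segment
    return X_minority[base] + gap * (X_minority[pick] - X_minority[base])


X_min = np.array([[0.0, 0.0], [1.0, 0.0], [0.0, 1.0]])
X_syn = smote(X_min, n_samples=10, k=2, rng=np.random.default_rng(0))
print(X_syn.shape)  # (10, 2)
```

Every synthetic row lies on a segment between two minority rows, so in this example all points fall inside the triangle spanned by `X_min`.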

ℹ️

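Data augmentation increases training-data diversity without collecting new data. Choose augmentations that preserve the label for your task (a 180° rotation is harmless for satellite imagery but turns a handwritten "6" into a "9"), apply them to the training split only, and validate that they don't introduce label noise.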
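One way to check for label noise is to measure how often a trusted reference model keeps its prediction after augmentation. The following is a minimal sketch. `augmentation_agreement` and the toy `top_brighter` classifier are hypothetical. In practice, `predict` would be an existing trained model and `augment` a single transform from the augmenters above. The toy task depends on vertical position, so a left-right flip preserves the label while an up-down flip inverts it.

```python
from typing import Callable, Sequence, TypeVar

import numpy as np

T = TypeVar('T')


def augmentation_agreement(predict: Callable[[T], int],
                           samples: Sequence[T],
                           augment: Callable[[T], T],
                           n_trials: int = 1) -> float:
    """Fraction of augmented copies on which a trusted reference model keeps
    the label it predicts for the unaugmented sample (1.0 = fully stable)."""
    agree = total = 0
    for x in samples:
        reference = predict(x)
        for _ in range(n_trials):  # >1 makes sense for randomized augmentations
            agree += int(predict(augment(x)) == reference)
            total += 1
    return agree / total if total else 0.0


def top_brighter(img: np.ndarray) -> int:
    """Hypothetical toy classifier: 1 if the top half is brighter than the bottom."""
    h = img.shape[0] // 2
    return int(img[:h].mean() > img[h:].mean())


# Toy 8x8 images whose top and bottom halves have different brightness
images = [np.vstack([np.full((4, 8), top), np.full((4, 8), bottom)])
          for top, bottom in [(200, 50), (30, 180), (120, 90), (10, 250)]]

lr = augmentation_agreement(top_brighter, images, np.fliplr)  # left-right flip
ud = augmentation_agreement(top_brighter, images, np.flipud)  # up-down flip
print(lr, ud)  # 1.0 0.0
```

An augmentation whose agreement falls well below 1.0 is probably changing class-relevant content and should be dropped or restricted for that task.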

Data Quality Monitoring

Production Data Quality System

import pandas as pd
import numpy as np
from typing import Dict, List, Optional
from datetime import datetime
from dataclasses import dataclass

@dataclass
class QualityCheck:
    name: str
    passed: bool
    value: float
    threshold: float
    message: str

class DataQualityMonitor:
    """Monitor data quality in production."""
    
    def __init__(self, config: Dict):
        self.config = config
        self.quality_history = []
    
    def check_quality(self, df: pd.DataFrame,
                     reference_df: Optional[pd.DataFrame] = None) -> Dict:
        """Run comprehensive quality checks."""
        
        checks = []
        
        # Completeness checks
        checks.extend(self._check_completeness(df))
        
        # Consistency checks
        checks.extend(self._check_consistency(df))
        
        # Validity checks
        checks.extend(self._check_validity(df))
        
        # Uniqueness checks
        checks.extend(self._check_uniqueness(df))
        
        # Freshness checks
        checks.extend(self._check_freshness(df))
        
        # Distribution checks (if reference provided)
        if reference_df is not None:
            checks.extend(self._check_distribution(df, reference_df))
        
        # Calculate overall score
        passed_checks = sum(1 for c in checks if c.passed)
        total_checks = len(checks)
        
        quality_score = passed_checks / total_checks if total_checks > 0 else 0
        
        result = {
            'timestamp': datetime.now().isoformat(),
            'quality_score': quality_score,
            'passed_checks': passed_checks,
            'total_checks': total_checks,
            'checks': [
                {
                    'name': c.name,
                    'passed': c.passed,
                    'value': c.value,
                    'threshold': c.threshold,
                    'message': c.message
                }
                for c in checks
            ],
            'status': 'passed' if quality_score >= 0.9 else 'failed'
        }
        
        self.quality_history.append(result)
        
        return result
    
    def _check_completeness(self, df: pd.DataFrame) -> List[QualityCheck]:
        """Check data completeness."""
        
        checks = []
        
        # Check missing values
        missing_rates = df.isna().mean()
        
        for col, rate in missing_rates.items():
            threshold = self.config.get('missing_threshold', 0.1)
            
            checks.append(QualityCheck(
                name=f'completeness_{col}',
                passed=rate <= threshold,
                value=float(rate),
                threshold=threshold,
                message=f'Missing rate for {col}: {rate:.4f}'
            ))
        
        # Check overall completeness
        overall_missing = df.isna().mean().mean()
        threshold = self.config.get('overall_missing_threshold', 0.05)
        
        checks.append(QualityCheck(
            name='overall_completeness',
            passed=overall_missing <= threshold,
            value=float(overall_missing),
            threshold=threshold,
            message=f'Overall missing rate: {overall_missing:.4f}'
        ))
        
        return checks
    
    def _check_consistency(self, df: pd.DataFrame) -> List[QualityCheck]:
        """Check data consistency."""
        
        checks = []
        
        # Check data types
        for col in df.columns:
            if df[col].dtype == 'object':
                # Count distinct Python types; 2 tolerates missing values
                # (NaN is a float, None is NoneType) next to one real type
                unique_types = df[col].apply(type).nunique()
                
                checks.append(QualityCheck(
                    name=f'consistency_{col}',
                    passed=unique_types <= 2,
                    value=float(unique_types),
                    threshold=2,
                    message=f'Type consistency for {col}: {unique_types} types'
                ))
        
        return checks
    
    def _check_validity(self, df: pd.DataFrame) -> List[QualityCheck]:
        """Check data validity."""
        
        checks = []
        
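        # Check numeric ranges
        for col in df.select_dtypes(include=['int64', 'float64']).columns:
            # Check for infinities
            inf_count = np.isinf(df[col]).sum()
            
            checks.append(QualityCheck(
                name=f'validity_inf_{col}',
                passed=inf_count == 0,
                value=float(inf_count),
                threshold=0,
                message=f'Infinities in {col}: {inf_count}'
            ))
            
            # Check for extreme values outside wide (3 x IQR) fences. Percentiles
            # of the same batch can't serve as bounds: about 2% of rows always lie
            # outside the 1st-99th percentile range, so that check always passes.
            values = df[col].replace([np.inf, -np.inf], np.nan)
            q1, q3 = values.quantile(0.25), values.quantile(0.75)
            iqr = q3 - q1
            extreme_count = ((values < q1 - 3 * iqr) | (values > q3 + 3 * iqr)).sum()
            extreme_rate = extreme_count / max(len(df), 1)
            threshold = self.config.get('extreme_rate_threshold', 0.05)
            
            checks.append(QualityCheck(
                name=f'validity_extreme_{col}',
                passed=extreme_rate < threshold,
                value=float(extreme_rate),
                threshold=threshold,
                message=f'Extreme values in {col}: {extreme_rate:.4f}'
            ))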
        
        return checks
    
    def _check_uniqueness(self, df: pd.DataFrame) -> List[QualityCheck]:
        """Check data uniqueness."""
        
        checks = []
        
        # Check for duplicate rows
        duplicate_rate = df.duplicated().mean()
        
        checks.append(QualityCheck(
            name='uniqueness_rows',
            passed=duplicate_rate < 0.1,
            value=float(duplicate_rate),
            threshold=0.1,
            message=f'Duplicate row rate: {duplicate_rate:.4f}'
        ))
        
        return checks
    
    def _check_freshness(self, df: pd.DataFrame) -> List[QualityCheck]:
        """Check data freshness."""
        
        checks = []
        
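        # Find timestamp columns by name. Heuristic: any name containing
        # 'time' or 'date' matches, including false hits such as 'update_count'
        timestamp_cols = [col for col in df.columns 
                         if 'time' in str(col).lower() or 'date' in str(col).lower()]
        
        for col in timestamp_cols:
            try:
                # Parse into a local Series so the caller's DataFrame is not mutated
                timestamps = pd.to_datetime(df[col])
                max_timestamp = timestamps.max()
                
                hours_old = (datetime.now() - max_timestamp).total_seconds() / 3600
                threshold = self.config.get('freshness_threshold_hours', 24)
                
                checks.append(QualityCheck(
                    name=f'freshness_{col}',
                    passed=hours_old <= threshold,
                    value=float(hours_old),
                    threshold=threshold,
                    message=f'Data age for {col}: {hours_old:.2f} hours'
                ))
            except (ValueError, TypeError):
                # Skip columns that fail to parse, and tz-aware timestamps,
                # which cannot be subtracted from the naive datetime.now()
                pass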
        
        return checks
    
    def _check_distribution(self, df: pd.DataFrame,
                           reference_df: pd.DataFrame) -> List[QualityCheck]:
        """Check data distribution against reference."""
        
        from scipy.stats import ks_2samp
        
        checks = []
        
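        ref_cols = set(reference_df.columns)
        # Keep df's column order so results are reproducible across runs
        common_cols = [col for col in df.columns if col in ref_cols]
        
        for col in common_cols:
            if df[col].dtype in ['int64', 'float64'] and \
               reference_df[col].dtype in ['int64', 'float64']:
                
                current = df[col].dropna()
                reference = reference_df[col].dropna()
                if current.empty or reference.empty:
                    continue  # ks_2samp raises on empty input
                
                # Two-sample KS test. With large samples even small, harmless
                # shifts give tiny p-values; the statistic may suit thresholds better
                stat, p_value = ks_2samp(current, reference)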
                
                threshold = self.config.get('distribution_pvalue_threshold', 0.05)
                
                checks.append(QualityCheck(
                    name=f'distribution_{col}',
                    passed=p_value > threshold,
                    value=float(p_value),
                    threshold=threshold,
                    message=f'Distribution shift for {col}: p={p_value:.4f}'
                ))
        
        return checks

ℹ️

Data quality monitoring should be automated and integrated into your ML pipeline. Set up alerts for quality degradation and maintain quality dashboards for visibility.
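The following is a minimal sketch of wiring the report returned by `check_quality` into a pipeline step. Failed checks are logged, as a stand-in for a real alerting channel, and a failed status raises, so downstream training or serving steps never receive the batch. `enforce_quality_gate` and `DataQualityError` are hypothetical names. The dictionary keys match the report built in `check_quality` above, and the `report` literal is illustrative input only.

```python
import logging
from typing import Dict, List

logger = logging.getLogger("data_quality")


class DataQualityError(RuntimeError):
    """Raised to halt the pipeline when a batch fails its quality checks."""


def enforce_quality_gate(result: Dict, max_logged: int = 10) -> None:
    """Alert on failed checks and block downstream steps on a failed report.

    `result` is a report dict with the keys produced by check_quality.
    """
    failed: List[Dict] = [c for c in result['checks'] if not c['passed']]
    for check in failed[:max_logged]:
        # Stand-in for a real alerting channel (pager, chat webhook, ...)
        logger.warning("Quality check %s failed: %s", check['name'], check['message'])
    if result['status'] != 'passed':
        raise DataQualityError(
            f"Quality score {result['quality_score']:.2f} below threshold; "
            f"{len(failed)} of {result['total_checks']} checks failed"
        )


report = {
    'quality_score': 0.5, 'total_checks': 2, 'status': 'failed',
    'checks': [
        {'name': 'overall_completeness', 'passed': True,
         'message': 'Overall missing rate: 0.0100'},
        {'name': 'uniqueness_rows', 'passed': False,
         'message': 'Duplicate row rate: 0.2500'},
    ],
}
try:
    enforce_quality_gate(report)
except DataQualityError as err:
    print(err)  # Quality score 0.50 below threshold; 1 of 2 checks failed
```

Raising rather than returning a flag makes a failed batch impossible to ignore by accident. Orchestrators typically mark the task as failed and stop dependent tasks.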

Summary

ML data pipelines require comprehensive validation and transformation:

  1. Data Validation: Schema, statistical, and ML-specific checks
  2. Data Transformation: Scaling, encoding, feature engineering
  3. Data Augmentation: Task-appropriate data synthesis
  4. Quality Monitoring: Automated quality checks and alerting

Implement robust data pipelines to ensure reliable ML model performance.
