πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Data Modeling for ML: Feature Engineering & Training Data

Data EngineeringML Data Pipelines⭐ Premium

Advertisement

Data Modeling for ML: Feature Engineering & Training Data

Difficulty: Senior Level | Companies: Netflix, Uber, Stripe, Airbnb, Spotify

1. ML Data Pipeline Architecture

ML Data PipelineRaw DataData Lake→Feature EngineeringFeature Store→Training DataData Warehouse→Model ServingModel Registry

2. Training Data Preparation

Point-in-Time Correct Dataset

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("MLDataPipeline").getOrCreate()

def create_training_dataset(events_df, labels_df, feature_store):
    """
    Create a training dataset with point-in-time correctness.
    
    For each event (label time), join features computed BEFORE that time.
    """
    
    # Get all label events
    labels = labels_df.select(
        "user_id",
        "event_timestamp",  # When the label was generated
        "label"            # The target variable
    )
    
    # For each label, get the features computed before the label time
    training_data = labels.alias("l").join(
        feature_store.alias("f"),
        on=(labels["user_id"] == feature_store["user_id"]) &
           (feature_store["computed_at"] <= labels["event_timestamp"]),
        how="left"
    )
    
    # Window to get the most recent features before each label
    window_spec = Window \
        .partitionBy("l.user_id", "l.event_timestamp") \
        .orderBy(F.col("f.computed_at").desc())
    
    training_data = training_data \
        .withColumn("rn", F.row_number().over(window_spec)) \
        .filter(F.col("rn") == 1) \
        .drop("rn", "f.user_id", "f.computed_at")
    
    return training_data

⚠️

Critical: Without point-in-time correctness, your model sees future information during training. This creates overly optimistic metrics that don't translate to production.

3. Feature Engineering Patterns

User-Level Features

def compute_user_features(interactions_df):
    """Compute user-level features from interaction data"""
    
    user_features = interactions_df \
        .groupBy("user_id") \
        .agg(
            # Engagement features
            F.count("*").alias("total_interactions"),
            F.countDistinct("session_id").alias("total_sessions"),
            F.avg("session_duration_seconds").alias("avg_session_duration"),
            
            # Recency features
            F.max("event_timestamp").alias("last_active_timestamp"),
            F.min("event_timestamp").alias("first_active_timestamp"),
            
            # Behavioral features
            F.avg("clicks_per_session").alias("avg_clicks_per_session"),
            F.sum("purchases").alias("total_purchases"),
            F.sum("revenue").alias("lifetime_revenue"),
            
            # Diversity features
            F.countDistinct("category").alias("unique_categories"),
            F.countDistinct("device").alias("unique_devices"),
        ) \
        .withColumn(
            "days_since_first_active",
            F.datediff(F.current_date(), F.col("first_active_timestamp"))
        ) \
        .withColumn(
            "days_since_last_active",
            F.datediff(F.current_date(), F.col("last_active_timestamp"))
        ) \
        .withColumn(
            "engagement_rate",
            F.col("total_sessions") / F.col("days_since_first_active")
        )
    
    return user_features

Windowed Features

def compute_windowed_features(events_df):
    """Compute features over different time windows"""
    
    windows = {
        "1d": 1,
        "7d": 7,
        "30d": 30,
        "90d": 90,
    }
    
    features = events_df
    
    for window_name, days in windows.items():
        window_spec = Window \
            .partitionBy("user_id") \
            .orderBy(F.col("event_timestamp").cast("long")) \
            .rangeBetween(-days * 86400, 0)
        
        features = features \
            .withColumn(
                f"spend_{window_name}",
                F.sum("amount").over(window_spec)
            ) \
            .withColumn(
                f"purchase_count_{window_name}",
                F.sum(F.when(F.col("event_type") == "PURCHASE", 1).otherwise(0))
                .over(window_spec)
            ) \
            .withColumn(
                f"avg_order_value_{window_name}",
                F.avg(F.when(F.col("event_type") == "PURCHASE", F.col("amount")))
                .over(window_spec)
            )
    
    return features

4. Data Leakage Detection

Common Leakage Patterns

class LeakageDetector:
    """Detect data leakage in training datasets"""
    
    def __init__(self):
        self.leakage_patterns = []
    
    def check_future_features(self, df, label_timestamp_col, feature_timestamp_cols):
        """Check if any features are computed after the label timestamp"""
        for feat_col in feature_timestamp_cols:
            if feat_col in df.columns:
                leakage = df.filter(F.col(feat_col) > F.col(label_timestamp_col)).count()
                if leakage > 0:
                    self.leakage_patterns.append({
                        "type": "future_feature",
                        "column": feat_col,
                        "affected_rows": leakage
                    })
    
    def check_target_leakage(self, df, target_col, feature_cols):
        """Check for features that are too correlated with the target"""
        from pyspark.ml.stat import Correlation
        
        for feat_col in feature_cols:
            corr = df.stat.corr(target_col, feat_col)
            if abs(corr) > 0.9:
                self.leakage_patterns.append({
                    "type": "target_leakage",
                    "column": feat_col,
                    "correlation": corr
                })
    
    def check_temporal_leakage(self, df, date_col, target_col):
        """Check if data from the future is used to predict the past"""
        # Group by date and check if predictions improve over time
        # (they shouldn't if there's no leakage)
        pass

⚠️

Common Leakage Patterns:

  1. Using future data to compute features (e.g., "next month's revenue" to predict churn)
  2. Including the target variable as a feature (e.g., using purchase_amount to predict purchase)
  3. Including identifiers that encode the target (e.g., user_id when users are sampled by label)

5. Label Engineering

class LabelEngineer:
    """Create labels for different ML tasks"""
    
    @staticmethod
    def create_churn_label(events_df, churn_window_days=30, observation_date=None):
        """Create binary churn label"""
        if observation_date is None:
            observation_date = events_df.agg(F.max("event_timestamp")).collect()[0][0]
        
        # Users who were active before observation date
        active_users = events_df.filter(
            F.col("event_timestamp") <= observation_date
        ).select("user_id").distinct()
        
        # Users who were active in the churn window
        retained_users = events_df.filter(
            (F.col("event_timestamp") <= observation_date) &
            (F.col("event_timestamp") > F.date_sub(observation_date, churn_window_days))
        ).select("user_id").distinct()
        
        # Churn = active before but not in churn window
        churn_labels = active_users.join(
            retained_users, "user_id", "left_anti"
        ).withColumn("churned", F.lit(1))
        
        return churn_labels
    
    @staticmethod
    def create_regression_label(events_df, target_col, window_days=30):
        """Create regression label (e.g., predicted spend in next N days)"""
        # This requires careful handling to avoid leakage
        pass

6. ML Data Pipeline Orchestration

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime

with DAG(
    'ml_data_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
) as dag:
    
    # Step 1: Compute features
    compute_features = SparkSubmitOperator(
        task_id='compute_features',
        application='s3://apps/compute_features.py',
        conf={'spark.sql.shuffle.partitions': '200'},
    )
    
    # Step 2: Validate features
    validate_features = SparkSubmitOperator(
        task_id='validate_features',
        application='s3://apps/validate_features.py',
    )
    
    # Step 3: Create training dataset
    create_training = SparkSubmitOperator(
        task_id='create_training_dataset',
        application='s3://apps/create_training.py',
    )
    
    # Step 4: Train model
    train_model = SparkSubmitOperator(
        task_id='train_model',
        application='s3://apps/train_model.py',
    )
    
    # Step 5: Evaluate and register
    evaluate = SparkSubmitOperator(
        task_id='evaluate_model',
        application='s3://apps/evaluate.py',
    )
    
    compute_features >> validate_features >> create_training >> train_model >> evaluate

ℹ️

Best Practice: Always separate feature computation from training data creation. Features should be reusable across models, while training data is specific to each model.

Follow-Up Questions

  1. How would you handle class imbalance in training data creation?
  2. Design a feature store that supports both batch and streaming features.
  3. How do you version training datasets for reproducibility?
  4. Design a data pipeline for a real-time recommendation system.
  5. How would you detect and prevent data leakage in a complex ML pipeline?

Advertisement