The Interview Question
ℹ️
Question: You're building a demand forecasting model for a ride-sharing platform:
- Dataset: 3 years of hourly ride requests across 50 cities
- Requirements: Forecast demand 7 days ahead, handle holidays, detect anomalies
- Challenges: Seasonality, trends, external factors (weather, events)
Walk through your time series analysis approach:
- How do you decompose the time series to understand components?
- Which forecasting models would you consider?
- How do you handle multiple seasonalities?
- How do you evaluate forecast accuracy?
Detailed Answer
1. Time Series Decomposition
import pandas as pd
import numpy as np
from statsmodels.tsa.seasonal import seasonal_decompose, STL
from statsmodels.tsa.stattools import adfuller, kpss
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
import warnings
# NOTE: silences every warning category process-wide from this point on,
# including deprecation and convergence warnings raised by the libraries
# imported above.
warnings.filterwarnings('ignore')
class TimeSeriesDecomposer:
    """Decompose a time series into trend, seasonal and residual components.

    The most recent decomposition result is kept in ``self.decomposition``
    so it can be plotted with :meth:`visualize_decomposition`.

    Args:
        series: pandas Series holding the observations.
        freq: Optional seasonal period (in samples). When falsy, the period
            is estimated with :meth:`infer_period` whenever a decomposition
            is requested without an explicit ``period``.
    """

    # Period used when none can be estimated (24 samples, e.g. one day of
    # hourly data).
    DEFAULT_PERIOD = 24

    def __init__(self, series, freq=None):
        self.series = series
        self.freq = freq
        self.decomposition = None

    def classical_decomposition(self, model='additive', period=None):
        """Run ``seasonal_decompose`` and store/return the result.

        Args:
            model: ``'additive'`` or ``'multiplicative'``.
            period: Seasonal period in samples; defaults to ``self.freq`` or
                the inferred period.
        """
        if period is None:
            period = self.freq or self.infer_period()
        self.decomposition = seasonal_decompose(
            self.series,
            model=model,
            period=period,
        )
        return self.decomposition

    def stl_decomposition(self, period=None, seasonal=7):
        """Run an STL decomposition and store/return the fitted result.

        Args:
            period: Seasonal period in samples; defaults to ``self.freq`` or
                the inferred period.
            seasonal: Length of the seasonal smoother passed to ``STL``.
        """
        if period is None:
            period = self.freq or self.infer_period()
        stl = STL(self.series, period=period, seasonal=seasonal)
        self.decomposition = stl.fit()
        return self.decomposition

    def infer_period(self):
        """Estimate the dominant seasonal period from the periodogram.

        The series is detrended by subtracting a 24-sample rolling mean, then
        the non-zero frequency with the highest spectral power is converted
        to a period. The period is *rounded* to the nearest integer, because
        the periodogram only evaluates frequencies ``k / n`` and truncating
        e.g. 23.75 to 23 would miss the true period of 24.

        Returns:
            The inferred period in samples, or ``DEFAULT_PERIOD`` when the
            series is too short or no usable period (>= 2) is found.
        """
        from scipy.signal import periodogram

        # Remove trend
        detrended = (self.series - self.series.rolling(window=24).mean()).dropna()

        period = self.DEFAULT_PERIOD
        # At least two points are needed to have a non-zero frequency bin.
        if len(detrended) >= 2:
            freqs, power = periodogram(detrended)
            # Skip bin 0 (the zero frequency / mean level).
            dominant_freq = freqs[np.argmax(power[1:]) + 1]
            if dominant_freq > 0:
                candidate = int(round(1 / dominant_freq))
                if candidate >= 2:
                    period = candidate

        print(f"Inferred period: {period}")
        return period

    def test_stationarity(self):
        """Run the ADF and KPSS tests at the 5% level and print a summary.

        ADF's null hypothesis is a unit root (non-stationary); KPSS's null
        hypothesis is stationarity. The series is reported as stationary
        overall only when both tests agree.

        Returns:
            dict with ``'adf'`` and ``'kpss'`` sub-dicts (statistic, p_value,
            critical_values, stationary) and the combined ``'is_stationary'``.
        """
        clean = self.series.dropna()
        results = {}

        # ADF test (H0: unit root exists, non-stationary)
        adf_result = adfuller(clean)
        results['adf'] = {
            'statistic': adf_result[0],
            'p_value': adf_result[1],
            'critical_values': adf_result[4],
            'stationary': adf_result[1] < 0.05,
        }

        # KPSS test (H0: series is stationary)
        kpss_result = kpss(clean, regression='c', nlags='auto')
        results['kpss'] = {
            'statistic': kpss_result[0],
            'p_value': kpss_result[1],
            'critical_values': kpss_result[3],
            'stationary': kpss_result[1] > 0.05,
        }

        # Overall assessment
        results['is_stationary'] = results['adf']['stationary'] and results['kpss']['stationary']

        print("Stationarity Tests:")
        print(f" ADF p-value: {results['adf']['p_value']:.4f} ({'Stationary' if results['adf']['stationary'] else 'Non-stationary'})")
        print(f" KPSS p-value: {results['kpss']['p_value']:.4f} ({'Stationary' if results['kpss']['stationary'] else 'Non-stationary'})")
        print(f" Overall: {'Stationary' if results['is_stationary'] else 'Non-stationary'}")
        return results

    def visualize_decomposition(self):
        """Plot original, trend, seasonal and residual series in four panels.

        Saves the figure to ``decomposition.png`` in the working directory and
        shows it. Prints a hint and returns if no decomposition has been run.
        """
        if self.decomposition is None:
            print("Run decomposition first")
            return

        fig, axes = plt.subplots(4, 1, figsize=(12, 10))
        panels = [
            (self.series, 'Original Time Series', 'Value', 'blue'),
            (self.decomposition.trend, 'Trend Component', 'Trend', 'red'),
            (self.decomposition.seasonal, 'Seasonal Component', 'Seasonal', 'green'),
            (self.decomposition.resid, 'Residual Component', 'Residual', 'purple'),
        ]
        for ax, (component, title, ylabel, color) in zip(axes, panels):
            ax.plot(component.index, component.values, color=color)
            ax.set_title(title)
            ax.set_ylabel(ylabel)

        plt.tight_layout()
        plt.savefig('decomposition.png', dpi=150, bbox_inches='tight')
        plt.show()
# Example usage
# decomposer = TimeSeriesDecomposer(ts_series)
# decomposition = decomposer.stl_decomposition(period=24)
# stationarity = decomposer.test_stationarity()
# decomposer.visualize_decomposition()
2. Forecasting Models
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from prophet import Prophet
import xgboost as xgb
from sklearn.metrics import mean_squared_error, mean_absolute_error, mean_absolute_percentage_error
class TimeSeriesForecaster:
    """Fit several forecasting models on one series and compare them.

    Every ``*_forecast`` method fits a model on ``train``, stores the fitted
    object in ``self.models``, the forecast in ``self.predictions`` and, when
    ``test`` is available, accuracy metrics in ``self.metrics`` (all keyed by
    model name).

    Args:
        train_data: pandas Series used for fitting.
        test_data: Optional pandas Series that directly follows
            ``train_data``; used for evaluation.
    """

    def __init__(self, train_data, test_data=None):
        self.train = train_data
        self.test = test_data
        self.models = {}
        self.predictions = {}
        self.metrics = {}

    def _resolve_steps(self, steps, default=24):
        """Return the horizon: explicit ``steps``, else len(test), else ``default``."""
        if steps is not None:
            return steps
        if self.test is not None:
            return len(self.test)
        return default

    def _record(self, name, fitted, forecast):
        """Store a fitted model with its forecast and, if possible, its metrics."""
        self.predictions[name] = forecast
        if self.test is not None:
            self.metrics[name] = self.calculate_metrics(forecast[:len(self.test)])
        self.models[name] = fitted

    def arima_forecast(self, order=(1, 1, 1), steps=None):
        """Fit an ARIMA model and forecast ``steps`` periods ahead.

        ``steps`` defaults to the test length, or 24 when there is no test
        set (passing ``None`` through to statsmodels would fail).
        """
        steps = self._resolve_steps(steps)
        fitted = ARIMA(self.train, order=order).fit()
        forecast = fitted.forecast(steps=steps)
        self._record('ARIMA', fitted, forecast)
        return forecast

    def sarima_forecast(self, order=(1, 1, 1), seasonal_order=(1, 1, 1, 24), steps=None):
        """Fit a seasonal ARIMA (SARIMAX) model and forecast ``steps`` ahead.

        Stationarity and invertibility are not enforced during fitting.
        ``steps`` defaults to the test length, or 24 without a test set.
        """
        steps = self._resolve_steps(steps)
        model = SARIMAX(
            self.train,
            order=order,
            seasonal_order=seasonal_order,
            enforce_stationarity=False,
            enforce_invertibility=False,
        )
        fitted = model.fit(disp=False)
        forecast = fitted.forecast(steps=steps)
        self._record('SARIMA', fitted, forecast)
        return forecast

    def holt_winters_forecast(self, seasonal_periods=24, trend='add', seasonal='add', steps=None):
        """Fit Holt-Winters exponential smoothing and forecast ``steps`` ahead.

        ``steps`` is accepted (as in the other forecast methods) so callers
        can request an explicit horizon; it defaults to the test length, or
        24 without a test set.
        """
        steps = self._resolve_steps(steps)
        model = ExponentialSmoothing(
            self.train,
            trend=trend,
            seasonal=seasonal,
            seasonal_periods=seasonal_periods,
        )
        fitted = model.fit()
        forecast = fitted.forecast(steps=steps)
        self._record('Holt-Winters', fitted, forecast)
        return forecast

    def prophet_forecast(self, steps=None, freq='h'):
        """Fit Prophet with daily/weekly/yearly seasonality and forecast.

        Args:
            steps: Horizon; defaults to the test length, or 30 without a
                test set.
            freq: Frequency of the future timestamps (pandas alias). Hourly
                by default; the lowercase alias is used because uppercase
                ``'H'`` is deprecated in recent pandas releases.

        Returns:
            The full Prophet forecast DataFrame (history + future rows).
            ``self.predictions['Prophet']`` holds only the future ``yhat``.
        """
        train_df = pd.DataFrame({
            'ds': self.train.index,
            'y': self.train.values,
        })
        model = Prophet(
            daily_seasonality=True,
            weekly_seasonality=True,
            yearly_seasonality=True,
        )
        model.fit(train_df)

        steps = self._resolve_steps(steps, default=30)
        future = model.make_future_dataframe(periods=steps, freq=freq)
        forecast = model.predict(future)

        # make_future_dataframe also contains the training timestamps, so
        # only the trailing `steps` rows are out-of-sample predictions.
        yhat = forecast.set_index('ds')['yhat']
        self.predictions['Prophet'] = yhat.iloc[len(yhat) - steps:]
        self.models['Prophet'] = model
        if self.test is not None:
            self.metrics['Prophet'] = self.calculate_metrics(
                self.predictions['Prophet'][:len(self.test)]
            )
        return forecast

    def xgboost_forecast(self, features_df=None, steps=None):
        """Fit an XGBoost regressor on calendar, lag and rolling features.

        Features are built over train and test jointly so test rows get
        their lags from the end of the training data. Rolling statistics are
        computed on values shifted by one step so no row sees its own
        target. Early stopping uses the chronologically last 20% of the
        training rows rather than the test set.

        NOTE: test-row lag features come from observed values, so the stored
        metrics describe one-step-ahead accuracy, not a multi-step forecast.

        Args:
            features_df: Currently unused; kept for interface compatibility.
            steps: Currently unused; kept for interface compatibility.

        Returns:
            The fitted ``XGBRegressor``.

        Raises:
            ValueError: If fewer than two training rows remain once the
                longest lag (168 samples) has been dropped.
        """
        lags = (1, 2, 3, 24, 168)  # 1h, 2h, 3h, 1 day, 1 week
        windows = (24, 168)  # 1 day, 1 week

        def create_features(df):
            df = df.copy()
            df['hour'] = df.index.hour
            df['dayofweek'] = df.index.dayofweek
            df['quarter'] = df.index.quarter
            df['month'] = df.index.month
            df['year'] = df.index.year
            df['dayofyear'] = df.index.dayofyear
            for lag in lags:
                df[f'lag_{lag}'] = df['y'].shift(lag)
            # Shift before rolling so the window ends at t-1 (no leakage).
            past = df['y'].shift(1)
            for window in windows:
                df[f'rolling_mean_{window}'] = past.rolling(window=window).mean()
                df[f'rolling_std_{window}'] = past.rolling(window=window).std()
            return df

        n_train = len(self.train)
        full = self.train if self.test is None else pd.concat([self.train, self.test])
        features = create_features(pd.DataFrame({'y': full}))

        train_df = features.iloc[:n_train].dropna()
        if len(train_df) < 2:
            raise ValueError(
                "Not enough training data to build lag features "
                f"(need more than {max(lags)} observations)"
            )

        # Chronological hold-out for early stopping.
        n_val = max(1, int(len(train_df) * 0.2))
        fit_df, val_df = train_df.iloc[:-n_val], train_df.iloc[-n_val:]

        model = xgb.XGBRegressor(
            n_estimators=1000,
            max_depth=5,
            learning_rate=0.01,
            early_stopping_rounds=50,
            random_state=42,
        )
        model.fit(
            fit_df.drop('y', axis=1),
            fit_df['y'],
            eval_set=[(val_df.drop('y', axis=1), val_df['y'])],
            verbose=False,
        )
        self.models['XGBoost'] = model

        if self.test is not None:
            test_df = features.iloc[n_train:].dropna()
            forecast = model.predict(test_df.drop('y', axis=1))
            self.predictions['XGBoost'] = pd.Series(forecast, index=test_df.index)
            self.metrics['XGBoost'] = self.calculate_metrics(forecast, test_df['y'].values)
        return model

    def calculate_metrics(self, predictions, actuals=None):
        """Compute RMSE, MAE, MAPE (in percent) and R2.

        Inputs are converted to plain arrays and compared positionally, so
        differing pandas indexes cannot introduce NaNs through alignment.

        Args:
            predictions: Forecast values.
            actuals: Observed values; defaults to ``self.test``.

        Raises:
            ValueError: If ``actuals`` is omitted and no test data exists.
        """
        if actuals is None:
            if self.test is None:
                raise ValueError("No test data available to compute metrics")
            actuals = self.test

        predictions = np.asarray(predictions, dtype=float)
        actuals = np.asarray(actuals, dtype=float)

        # Ensure same length
        min_len = min(len(predictions), len(actuals))
        predictions = predictions[:min_len]
        actuals = actuals[:min_len]

        ss_res = np.sum((actuals - predictions) ** 2)
        ss_tot = np.sum((actuals - np.mean(actuals)) ** 2)
        return {
            'RMSE': np.sqrt(mean_squared_error(actuals, predictions)),
            'MAE': mean_absolute_error(actuals, predictions),
            'MAPE': mean_absolute_percentage_error(actuals, predictions) * 100,
            # R2 is undefined when the actuals are constant.
            'R2': 1 - ss_res / ss_tot if ss_tot > 0 else float('nan'),
        }

    def compare_models(self):
        """Print and return a DataFrame of metrics, one row per model.

        Returns ``None`` (after printing a hint) when no metrics exist yet.
        """
        if not self.metrics:
            print("No metrics available. Run forecasts first.")
            return
        comparison = pd.DataFrame(self.metrics).T
        print("\nModel Comparison:")
        print("=" * 60)
        print(comparison)
        return comparison
# Example usage
# forecaster = TimeSeriesForecaster(train_series, test_series)
# forecaster.arima_forecast(order=(1, 1, 1))
# forecaster.sarima_forecast(order=(1, 1, 1), seasonal_order=(1, 1, 1, 24))
# forecaster.holt_winters_forecast(seasonal_periods=24)
# comparison = forecaster.compare_models()
3. Handling Multiple Seasonalities
class MultiSeasonalForecaster:
    """Detect and model several seasonal periods in one series.

    Args:
        series: pandas Series of observations.

    ``self.seasonalities`` maps period (in samples) to spectral power and is
    filled by :meth:`detect_seasonalities`.
    """

    def __init__(self, series):
        self.series = series
        self.seasonalities = {}

    def detect_seasonalities(self):
        """Find up to five seasonal periods from periodogram peaks.

        A peak is a bin whose power exceeds both neighbours. Its frequency is
        converted to a period by rounding ``1 / freq`` (truncation would turn
        e.g. 23.99 into 23). Periods must be shorter than half the series.
        When several peaks map to the same period, the strongest is kept.

        Returns:
            dict ``{period: power}`` ordered by decreasing power.
        """
        from scipy.signal import periodogram

        freqs, power = periodogram(self.series.values)
        max_period = len(self.series) // 2

        strongest = {}
        for i in range(1, len(power) - 1):
            is_peak = power[i] > power[i - 1] and power[i] > power[i + 1]
            if not is_peak or freqs[i] <= 0:
                continue
            period = int(round(1 / freqs[i]))
            if period < max_period and power[i] > strongest.get(period, float('-inf')):
                strongest[period] = power[i]

        # Sort by power, keep the top 5 seasonalities
        ranked = sorted(strongest.items(), key=lambda item: item[1], reverse=True)
        self.seasonalities = dict(ranked[:5])

        print("Detected seasonalities:")
        for period, peak_power in self.seasonalities.items():
            print(f" Period: {period} samples, Power: {peak_power:.2f}")
        return self.seasonalities

    def tbats_forecast(self, steps=None):
        """Fit a TBATS model on the detected periods and forecast.

        Runs :meth:`detect_seasonalities` first if no periods are known yet,
        so the model is not fitted silently without any seasonality.

        Args:
            steps: Horizon in samples; defaults to ``24 * 7`` (one week of
                hourly data).

        Returns:
            Tuple ``(forecast, fitted_model)``.
        """
        from tbats import TBATS

        if not self.seasonalities:
            self.detect_seasonalities()

        model = TBATS(
            seasonal_periods=list(self.seasonalities.keys()),
            use_arma_errors=True,
            use_box_cox=True,
        )
        fitted = model.fit(self.series)
        if steps is None:
            steps = 24 * 7  # Default 1 week
        forecast = fitted.forecast(steps=steps)
        return forecast, fitted

    def multiple_seasonal_decomposition(self):
        """Run one robust STL decomposition per detected period.

        Returns:
            dict ``{period: fitted STL result}`` for every detected period
            shorter than half the series length.
        """
        from statsmodels.tsa.seasonal import STL

        max_period = len(self.series) // 2
        return {
            period: STL(self.series, period=period, robust=True).fit()
            for period in self.seasonalities
            if period < max_period
        }
# Example usage
# multi_forecaster = MultiSeasonalForecaster(ts_series)
# seasonalities = multi_forecaster.detect_seasonalities()
# forecast, model = multi_forecaster.tbats_forecast(steps=168)
4. Anomaly Detection in Time Series
class TimeSeriesAnomalyDetector:
    """Flag anomalous points in a pandas Series with several detectors.

    Each ``detect_*`` method stores its boolean mask in ``self.anomalies``
    (overwriting any previous result) and returns it.
    """

    def __init__(self, series):
        self.series = series
        self.anomalies = None

    def detect_statistical_anomalies(self, window=24, threshold=3):
        """Flag points whose centered rolling z-score exceeds ``threshold``."""
        roller = self.series.rolling(window=window, center=True)
        deviation = self.series - roller.mean()
        z_scores = deviation / roller.std()

        self.anomalies = z_scores.abs() > threshold
        print(f"Detected {self.anomalies.sum()} anomalies using statistical method")
        return self.anomalies

    def detect_iqr_anomalies(self, window=24, multiplier=1.5):
        """Flag points outside centered rolling ``Q1/Q3 -/+ multiplier * IQR``."""
        roller = self.series.rolling(window=window, center=True)
        q1 = roller.quantile(0.25)
        q3 = roller.quantile(0.75)
        spread = q3 - q1

        too_low = self.series < (q1 - multiplier * spread)
        too_high = self.series > (q3 + multiplier * spread)

        self.anomalies = too_low | too_high
        print(f"Detected {self.anomalies.sum()} anomalies using IQR method")
        return self.anomalies

    def detect_prophet_anomalies(self):
        """Flag points outside Prophet's 99% in-sample uncertainty interval.

        Returns a tuple ``(mask, forecast)`` where ``forecast`` is the
        DataFrame produced by ``model.predict``.
        """
        observed = self.series.values
        frame = pd.DataFrame({
            'ds': self.series.index,
            'y': observed,
        })

        model = Prophet(interval_width=0.99)
        model.fit(frame)
        forecast = model.predict(frame)

        # A point is anomalous when it leaves the predicted band.
        below_band = observed < forecast['yhat_lower'].values
        above_band = observed > forecast['yhat_upper'].values
        self.anomalies = below_band | above_band
        print(f"Detected {self.anomalies.sum()} anomalies using Prophet")
        return self.anomalies, forecast

    def detect_isolation_forest_anomalies(self, contamination=0.01):
        """Flag points that an Isolation Forest labels as outliers (-1)."""
        from sklearn.ensemble import IsolationForest

        # sklearn expects a 2-D feature matrix
        column = self.series.values.reshape(-1, 1)
        forest = IsolationForest(contamination=contamination, random_state=42)
        labels = forest.fit_predict(column)

        self.anomalies = labels == -1
        print(f"Detected {self.anomalies.sum()} anomalies using Isolation Forest")
        return self.anomalies

    def visualize_anomalies(self, anomalies=None, title="Time Series with Anomalies"):
        """Plot the series with anomalies in red; save to ``anomalies.png``.

        Uses the ``anomalies`` mask when given, otherwise the most recently
        stored one. Prints a hint and returns when no mask is available.
        """
        mask = self.anomalies if anomalies is None else anomalies
        if mask is None:
            print("Run anomaly detection first")
            return

        flagged = self.series[mask]

        plt.figure(figsize=(12, 6))
        plt.plot(self.series.index, self.series.values, color='blue', alpha=0.7, label='Time Series')
        plt.scatter(flagged.index, flagged.values, color='red', s=50, label='Anomalies', zorder=5)
        plt.title(title)
        plt.xlabel('Time')
        plt.ylabel('Value')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.savefig('anomalies.png', dpi=150, bbox_inches='tight')
        plt.show()
# Example usage
# detector = TimeSeriesAnomalyDetector(ts_series)
# anomalies_stat = detector.detect_statistical_anomalies()
# anomalies_iqr = detector.detect_iqr_anomalies()
# detector.visualize_anomalies()
5. Real-World Application: Ride Demand Forecasting
class RideDemandForecaster:
    """End-to-end ride demand forecasting: preprocessing, per-city models,
    forecasts, anomaly detection and simple recommendations.

    ``self.models`` maps city name to a dict with the city's
    ``'forecaster'``, the name of its ``'best_model'`` and that model's
    ``'metrics'``; it is filled by :meth:`train_city_models`.
    """

    # Model names whose fitted object exposes ``forecast(steps=...)``.
    _FORECASTABLE = ('ARIMA', 'SARIMA', 'Holt-Winters')

    def __init__(self):
        self.models = {}
        self.preprocessor = None

    def preprocess_data(self, df):
        """Aggregate raw ride requests to hourly counts plus calendar features.

        The input frame is not modified.

        Args:
            df: DataFrame with a ``'timestamp'`` column, one row per request.

        Returns:
            Tuple ``(hourly_demand, features)``: a Series of request counts
            per hour and a DataFrame on the same index with ``hour``,
            ``dayofweek``, ``is_weekend`` and ``is_holiday`` columns.
        """
        # assign() returns a new frame, so the caller's column stays as-is.
        indexed = df.assign(timestamp=pd.to_datetime(df['timestamp'])).set_index('timestamp')

        # An offset object avoids the deprecated 'H' string alias.
        hourly_demand = indexed.resample(pd.offsets.Hour()).size()
        hourly_demand = hourly_demand.fillna(0)

        features = pd.DataFrame(index=hourly_demand.index)
        features['hour'] = features.index.hour
        features['dayofweek'] = features.index.dayofweek
        features['is_weekend'] = features['dayofweek'].isin([5, 6]).astype(int)
        features['is_holiday'] = self._add_holidays(features.index)
        return hourly_demand, features

    def _add_holidays(self, dates):
        """Return a 0/1 array marking timestamps that fall on a US federal holiday.

        Timestamps are normalized to midnight before the lookup because the
        holiday calendar contains midnight timestamps only; without this,
        just the 00:00 sample of each holiday would be flagged.
        """
        from pandas.tseries.holiday import USFederalHolidayCalendar

        if len(dates) == 0:
            return np.zeros(0, dtype=int)

        days = dates.normalize()
        cal = USFederalHolidayCalendar()
        holidays = cal.holidays(start=days.min(), end=days.max())
        return days.isin(holidays).astype(int)

    def train_city_models(self, city_data, cities=None):
        """Fit ARIMA, SARIMA and Holt-Winters per city and keep the best by RMSE.

        Args:
            city_data: Mapping of city name to its demand Series.
            cities: Optional subset of city names; defaults to all keys.

        Returns:
            dict stored in ``self.models`` (see class docstring).
        """
        if cities is None:
            cities = list(city_data.keys())

        city_forecasters = {}
        for city in cities:
            print(f"\nTraining model for {city}...")
            series = city_data[city]

            # Chronological 80/20 split
            train_size = int(len(series) * 0.8)
            train = series.iloc[:train_size]
            test = series.iloc[train_size:]

            forecaster = TimeSeriesForecaster(train, test)
            forecaster.arima_forecast(order=(1, 1, 1))
            forecaster.sarima_forecast(order=(1, 1, 1), seasonal_order=(1, 1, 1, 24))
            forecaster.holt_winters_forecast(seasonal_periods=24)

            comparison = forecaster.compare_models()
            best_model = comparison['RMSE'].idxmin()
            city_forecasters[city] = {
                'forecaster': forecaster,
                'best_model': best_model,
                'metrics': comparison.loc[best_model],
            }
            print(f"Best model for {city}: {best_model}")

        self.models = city_forecasters
        return city_forecasters

    def forecast_demand(self, city, steps=168):
        """Forecast ``steps`` periods ahead with the city's best model.

        The model fitted during training is reused instead of being fitted
        again, so the forecaster's stored predictions and metrics are left
        untouched.

        Returns:
            The forecast, or ``None`` (after printing a message) when no
            model exists for ``city``.

        Raises:
            ValueError: If the city's best model is not one of the
                supported models or has no fitted object.
        """
        if city not in self.models:
            print(f"No model for {city}")
            return None

        model_info = self.models[city]
        best_model = model_info['best_model']
        fitted = model_info['forecaster'].models.get(best_model)
        if best_model not in self._FORECASTABLE or fitted is None:
            raise ValueError(f"Cannot forecast with model {best_model!r} for {city}")
        return fitted.forecast(steps=steps)

    def detect_demand_anomalies(self, city):
        """Run the rolling z-score detector on the city's training series.

        Returns ``None`` when no model exists for ``city``.
        """
        if city not in self.models:
            return None
        series = self.models[city]['forecaster'].train
        detector = TimeSeriesAnomalyDetector(series)
        return detector.detect_statistical_anomalies()

    def generate_insights(self, city):
        """Summarise a one-week forecast and the detected anomalies.

        Returns:
            dict with ``city``, ``forecast_summary``, ``anomalies`` and
            ``recommendations``; ``None`` when no model exists for ``city``.
        """
        if city not in self.models:
            return None

        forecast = self.forecast_demand(city, steps=168)
        anomalies = self.detect_demand_anomalies(city)
        has_anomalies = anomalies is not None

        return {
            'city': city,
            'forecast_summary': {
                'mean_demand': forecast.mean(),
                'peak_demand': forecast.max(),
                'low_demand': forecast.min(),
                'forecast_period': f"{forecast.index[0]} to {forecast.index[-1]}",
            },
            'anomalies': {
                'count': anomalies.sum() if has_anomalies else 0,
                'percentage': anomalies.mean() * 100 if has_anomalies else 0,
            },
            'recommendations': self._generate_recommendations(forecast, anomalies),
        }

    def _generate_recommendations(self, forecast, anomalies):
        """Build recommendation dicts from the forecast and anomaly mask.

        Always recommends more supply for the hour of day with the highest
        mean forecast; adds a monitoring item when anomalies were found.
        """
        recommendations = []

        # Peak demand recommendations
        peak_hours = forecast.groupby(forecast.index.hour).mean()
        peak_hour = peak_hours.idxmax()
        recommendations.append({
            'type': 'capacity',
            'message': f"Increase driver supply during hour {peak_hour}:00",
        })

        # Anomaly recommendations
        if anomalies is not None and anomalies.sum() > 0:
            recommendations.append({
                'type': 'monitoring',
                'message': f"Investigate {anomalies.sum()} demand anomalies",
            })
        return recommendations
# Example usage
# demand_forecaster = RideDemandForecaster()
# hourly_demand, features = demand_forecaster.preprocess_data(ride_data)
# city_models = demand_forecaster.train_city_models(city_data)
# forecast = demand_forecaster.forecast_demand('New York', steps=168)
# insights = demand_forecaster.generate_insights('New York')
💡
Pro Tip: Always visualize your forecasts and compare with actual values. Time series plots can reveal issues that metrics alone might miss, like systematic bias or missed seasonal patterns.
6. Common Follow-Up Questions
Follow-up 1: How do you handle missing values in time series?
def handle_missing_values(series, method='interpolate', period=24):
    """Fill missing values in a time series.

    Args:
        series: pandas Series that may contain NaNs.
        method: One of
            ``'interpolate'`` (linear), ``'time'`` (time-weighted
            interpolation), ``'seasonal'`` (second-order spline),
            ``'forward_fill'``, ``'backward_fill'`` or
            ``'seasonal_decompose'`` (fill gaps with trend + seasonal
            component of an additive decomposition).
        period: Seasonal period used by ``'seasonal_decompose'``.

    Returns:
        A new Series with the gaps filled.

    Raises:
        ValueError: If ``method`` is not one of the supported names.
    """
    if method == 'interpolate':
        # Linear interpolation
        return series.interpolate(method='linear')
    if method == 'time':
        # Time-based interpolation
        return series.interpolate(method='time')
    if method == 'seasonal':
        # Spline interpolation
        return series.interpolate(method='spline', order=2)
    if method == 'forward_fill':
        return series.ffill()
    if method == 'backward_fill':
        return series.bfill()
    if method == 'seasonal_decompose':
        from statsmodels.tsa.seasonal import seasonal_decompose

        if not series.isna().any():
            return series.copy()

        # The decomposition needs a gap-free input; bfill covers leading
        # NaNs that ffill cannot reach.
        seed = series.ffill().bfill()
        decomposition = seasonal_decompose(
            seed,
            model='additive',
            period=period,
            extrapolate_trend='freq',  # avoid NaN trend at the edges
        )
        # The seasonal component alone oscillates around zero, so the trend
        # must be added back to obtain values on the level of the series.
        return series.fillna(decomposition.trend + decomposition.seasonal)

    raise ValueError(f"Unknown method: {method!r}")
Follow-up 2: How do you combine multiple forecasts?
def combine_forecasts(forecasts, weights=None, method='average'):
    """Combine several equal-length forecasts into one.

    Args:
        forecasts: Sequence of forecasts (one per model), combined
            position by position.
        weights: Optional per-model weights for ``method='average'``;
            equal weights when omitted. Ignored by the other methods.
        method: ``'average'``, ``'median'`` or ``'trimmed_mean'`` (10% cut
            from each tail).

    Returns:
        numpy array with the combined forecast.

    Raises:
        ValueError: If ``forecasts`` is empty or ``method`` is unknown.
    """
    stacked = np.asarray(forecasts, dtype=float)
    if stacked.ndim == 0 or len(stacked) == 0:
        raise ValueError("forecasts must contain at least one forecast")

    if method == 'average':
        # np.average falls back to the plain mean when weights is None.
        return np.average(stacked, weights=weights, axis=0)
    if method == 'median':
        return np.median(stacked, axis=0)
    if method == 'trimmed_mean':
        from scipy.stats import trim_mean
        return trim_mean(stacked, 0.1, axis=0)

    raise ValueError(f"Unknown method: {method!r}")
# Example
# forecasts = [arima_forecast, sarima_forecast, prophet_forecast]
# weights = [0.4, 0.3, 0.3] # Based on validation performance
# combined_forecast = combine_forecasts(forecasts, weights)
Company-Specific Tips
ℹ️
Uber Tips:
- Uber heavily tests on time series for demand forecasting
- Know how to handle multiple seasonalities (hourly, daily, weekly)
- Understand how to incorporate external factors (weather, events)
- Be comfortable with Prophet and ARIMA models
Amazon Tips:
- Amazon values scalable time series solutions
- Know how to forecast for inventory management
- Understand how to handle cold start for new products
- Be familiar with time series anomaly detection
Quiz Section
Related Topics
- Seasonal Analysis — Statistical tests for seasonality
- Anomaly Detection — Identifying outliers in data
- Feature Engineering — Creating time-based features
- Forecasting Evaluation — Comparing forecast models