The Interview Question
"Write a Python function that takes a DataFrame of user transactions and identifies anomalous spending patterns using statistical methods."
Python coding interviews test whether you can write clean, efficient, and correct code for real data science tasks.
Why Companies Ask This
ℹ️
Google and Apple want data scientists who can implement solutions, not just call library functions. They need to verify you understand the algorithms, can write production-quality code, and can debug when things go wrong.
Interviewers evaluate:
- Python Proficiency — Do you write idiomatic, clean Python?
- Library Knowledge — Do you know pandas, numpy, sklearn well?
- Algorithm Understanding — Can you implement from scratch?
- Code Quality — Is it readable, documented, and tested?
- Debugging Skills — Can you fix issues quickly?
Core Python Skills for Data Scientists
1. Pandas Mastery
import pandas as pd
import numpy as np
# Essential pandas operations for interviews
def analyze_user_spending(transactions_df):
    """
    Comprehensive spending analysis using pandas.

    Builds a per-user summary and, to illustrate common pandas patterns,
    derives transaction-level features (hour, weekend flag, 7-day rolling
    mean, cohort month) on a private sorted copy. The caller's DataFrame is
    never modified.

    Args:
        transactions_df: DataFrame with 'user_id', 'amount' and a
            datetime64 'timestamp' column.

    Returns:
        DataFrame with one row per user and the columns user_id,
        total_spent, transaction_count, avg_transaction, std_transaction,
        first_transaction, last_transaction and spending_percentile.
    """
    # sort_values returns a new frame, so every column added below stays
    # local. Sorting also gives each user monotonically increasing
    # timestamps, which time-based rolling windows require.
    df = transactions_df.sort_values(['user_id', 'timestamp'])

    # 1. Basic aggregation
    user_summary = df.groupby('user_id').agg(
        total_spent=('amount', 'sum'),
        transaction_count=('amount', 'count'),
        avg_transaction=('amount', 'mean'),
        std_transaction=('amount', 'std'),
        first_transaction=('timestamp', 'min'),
        last_transaction=('timestamp', 'max'),
    ).reset_index()

    # 2. Time-based features
    df['hour'] = df['timestamp'].dt.hour
    df['day_of_week'] = df['timestamp'].dt.dayofweek
    df['is_weekend'] = df['day_of_week'].isin([5, 6])  # 5 = Saturday, 6 = Sunday

    # 3. Rolling statistics (user-level)
    # An offset window such as '7D' needs a datetime-like index, so the
    # timestamp is moved into the index for this computation only.
    rolling_mean = (
        df.set_index('timestamp')
        .groupby('user_id')['amount']
        .rolling('7D', min_periods=1)
        .mean()
    )
    # The result is ordered by user, then by the original row order inside
    # each user, which matches the sorted frame. groupby drops rows whose
    # user_id is missing, so only rows with a user_id receive a value.
    has_user = df['user_id'].notna()
    df.loc[has_user, 'rolling_avg_7d'] = rolling_mean.to_numpy()

    # 4. Percentile-based analysis
    user_summary['spending_percentile'] = user_summary['total_spent'].rank(pct=True)

    # 5. Cohort analysis: month of each user's first transaction
    df['cohort_month'] = (
        df.groupby('user_id')['timestamp']
        .transform('min')
        .dt.to_period('M')
    )

    return user_summary
2. NumPy for Performance
import numpy as np
# Vectorized operations (10-100x faster than loops)
def fast_anomaly_detection(values, window=30, threshold=3):
    """
    Fast anomaly detection using numpy vectorization.

    Every point from index ``window - 1`` onward is compared with the mean
    and population standard deviation of the ``window`` most recent values
    (the point itself included). Both statistics come from cumulative sums,
    so no Python-level loop is needed.

    Args:
        values: 1-D sequence of numbers (list, tuple or array).
        window: Number of trailing samples per window; must be >= 1.
        threshold: Number of standard deviations that counts as anomalous.

    Returns:
        Boolean array of length ``max(len(values) - window + 1, 0)``.
        Element ``i`` describes ``values[i + window - 1]``.

    Raises:
        ValueError: If ``window`` is smaller than 1.
    """
    if window < 1:
        raise ValueError("window must be at least 1")

    # Accept plain sequences and avoid integer overflow when squaring.
    values = np.asarray(values, dtype=float)

    # Moving average (vectorized): difference of cumulative sums
    cumsum = np.cumsum(np.insert(values, 0, 0))
    moving_avg = (cumsum[window:] - cumsum[:-window]) / window

    # Rolling variance via E[x^2] - E[x]^2
    moving_sq_cumsum = np.cumsum(np.insert(values**2, 0, 0))
    moving_var = (
        (moving_sq_cumsum[window:] - moving_sq_cumsum[:-window]) / window
        - moving_avg**2
    )
    # Floating-point cancellation can leave a tiny negative variance, which
    # would turn into NaN under sqrt; clamp it to zero.
    moving_std = np.sqrt(np.maximum(moving_var, 0.0))

    # Detect anomalies
    anomalies = np.abs(values[window-1:] - moving_avg) > threshold * moving_std
    return anomalies
# Example
np.random.seed(42)
data = np.random.normal(100, 15, 1000)
data[500] = 200  # Inject anomaly
data[750] = 10  # Inject anomaly
window = 30
anomalies = fast_anomaly_detection(data, window=window)
# Result element i describes data[i + window - 1]; shift the positions so the
# printed indices refer to `data` itself rather than to the trimmed result.
anomaly_indices = np.where(anomalies)[0] + window - 1
print(f"Anomalies detected at indices: {anomaly_indices}")
3. Scikit-learn Implementation
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_val_score
import numpy as np
class SpendingAnomalyDetector:
    """
    Production-ready anomaly detection for spending patterns.

    Features are standardized and then scored by an IsolationForest. Both
    steps live in a single sklearn Pipeline so the scaling learned in
    ``fit`` is reused by ``predict`` and ``score``.

    Attributes:
        contamination: Value passed to IsolationForest's ``contamination``.
        random_state: Seed passed to IsolationForest.
        pipeline: The fitted Pipeline, or None until ``fit`` has run.
        threshold: Percentile of the training decision scores at
            ``contamination * 100``; stored for inspection only, since
            ``predict`` delegates to the pipeline's own decision rule.
    """

    def __init__(self, contamination=0.05, random_state=42):
        self.contamination = contamination
        self.random_state = random_state
        self.pipeline = None
        self.threshold = None

    def _require_fitted(self):
        """Return the fitted pipeline, or raise NotFittedError if fit() has not run."""
        if self.pipeline is None:
            # Imported lazily: only needed on the error path. NotFittedError
            # derives from both ValueError and AttributeError, so callers
            # that caught the previous AttributeError keep working.
            from sklearn.exceptions import NotFittedError

            raise NotFittedError(
                "SpendingAnomalyDetector is not fitted yet; call fit() first."
            )
        return self.pipeline

    def fit(self, X, y=None):
        """
        Fit the anomaly detection model.

        Args:
            X: Feature matrix (user spending features)
            y: Not used (unsupervised)

        Returns:
            The detector itself, so calls can be chained.
        """
        self.pipeline = Pipeline([
            ('scaler', StandardScaler()),
            ('detector', IsolationForest(
                contamination=self.contamination,
                random_state=self.random_state,
                n_estimators=100,
            )),
        ])
        self.pipeline.fit(X)

        # Set threshold based on training data scores
        scores = self.pipeline.decision_function(X)
        self.threshold = np.percentile(scores, self.contamination * 100)
        return self

    def predict(self, X):
        """
        Predict anomalies.

        Returns:
            -1 for anomalies, 1 for normal points

        Raises:
            sklearn.exceptions.NotFittedError: If called before ``fit``.
        """
        return self._require_fitted().predict(X)

    def score(self, X):
        """
        Get anomaly scores (lower = more anomalous).

        Raises:
            sklearn.exceptions.NotFittedError: If called before ``fit``.
        """
        return self._require_fitted().decision_function(X)

    def get_feature_importance(self, feature_names):
        """
        Get feature importance for interpretability.

        Not implemented: the detector is unsupervised, so an importance
        measure needs a use-case-specific scoring target. Raising makes the
        gap explicit instead of silently returning None.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError(
            "Feature importance depends on the specific use case and is not implemented."
        )
# Usage
# fit() hands back the detector, so construction and training can be chained.
detector = SpendingAnomalyDetector(contamination=0.05).fit(user_features)
predictions = detector.predict(new_user_features)
anomaly_scores = detector.score(new_user_features)
# Identify anomalies: predict() labels them with -1
is_anomalous = predictions == -1
anomalous_users = new_user_features[is_anomalous]
print(f"Detected {len(anomalous_users)} anomalous users")
Complete Interview Example: Anomaly Detection Function
import pandas as pd
import numpy as np
from typing import Tuple, List, Dict
from dataclasses import dataclass
from scipy import stats
@dataclass
class AnomalyResult:
    """Container for anomaly detection results.

    Plain record describing the outcome for one user. Nothing in the visible
    part of this file constructs it; detect_spending_anomalies returns a
    DataFrame instead.
    """
    # Identifier of the user the result belongs to.
    user_id: int
    # Numeric anomaly score; scale and direction are not fixed here — TODO confirm with the producer.
    anomaly_score: float
    # Whether the user was classified as anomalous.
    is_anomaly: bool
    # Human-readable reasons behind the classification.
    contributing_factors: List[str]
    # Confidence in the classification; presumably in [0, 1] — TODO confirm.
    confidence: float
def _abs_zscore(values: pd.Series, ddof: int) -> pd.Series:
    """Return absolute z-scores of *values*; all zeros when the spread is zero or undefined."""
    spread = values.std(ddof=ddof)
    if not np.isfinite(spread) or spread == 0:
        return pd.Series(0.0, index=values.index)
    return ((values - values.mean()) / spread).abs()


def _normality_p_value(amounts: np.ndarray) -> float:
    """Return the normaltest p-value for *amounts*, or 1.0 when the test cannot be applied."""
    # Fewer than 8 observations or constant data give the test nothing to
    # work with; treat those as "no evidence against normality".
    if len(amounts) < 8 or np.ptp(amounts) == 0:
        return 1.0
    _, p_value = stats.normaltest(amounts)
    return 1.0 if np.isnan(p_value) else float(p_value)


def _contributing_factors(
    amount_zscore: float,
    iqr_anomaly: bool,
    anomaly_score: float,
    std_hour: float,
    z_threshold: float,
) -> List[str]:
    """List the human-readable reasons that apply to one user."""
    factors = []
    if amount_zscore > z_threshold:
        factors.append(f"Unusual total spending (z={amount_zscore:.2f})")
    if iqr_anomaly:
        factors.append("Outside IQR bounds")
    if anomaly_score > 1.0:
        factors.append("Statistical pattern anomaly")
    # NaN (single transaction hour sample) compares False and adds nothing.
    if std_hour < 1.0:
        factors.append("Unusually consistent transaction times")
    return factors


def detect_spending_anomalies(
    transactions: pd.DataFrame,
    user_id_col: str = 'user_id',
    amount_col: str = 'amount',
    timestamp_col: str = 'timestamp',
    z_threshold: float = 3.0,
    iqr_multiplier: float = 1.5,
    min_transactions: int = 5,
) -> pd.DataFrame:
    """
    Detect anomalous spending patterns using multiple statistical methods.

    Three signals are computed per user and blended into one score:
    the z-score of the user's total spend, the non-normality of the user's
    amounts (1 - normaltest p-value) and the z-score of the user's mean
    amount. An IQR outlier flag on total spend is reported alongside.

    Args:
        transactions: DataFrame with user transactions (not modified)
        user_id_col: Column name for user IDs
        amount_col: Column name for transaction amounts
        timestamp_col: Column name for timestamps (anything pd.to_datetime accepts)
        z_threshold: Z-score threshold for anomaly detection
        iqr_multiplier: IQR multiplier for outlier detection
        min_transactions: Minimum transactions per user for analysis

    Returns:
        DataFrame with one row per analysed user. Columns: the user id,
        total_amount, mean_amount, std_amount, transaction_count,
        median_amount, max_amount, min_amount, avg_hour, std_hour,
        weekend_ratio, unique_days, amount_zscore, iqr_anomaly,
        user_anomaly_score, is_anomaly and contributing_factors.

    Raises:
        ValueError: If any of the three required columns is missing.
    """
    # Validate input and report only the columns that are actually missing
    required_cols = [user_id_col, amount_col, timestamp_col]
    missing_cols = [col for col in required_cols if col not in transactions.columns]
    if missing_cols:
        raise ValueError(f"Missing required columns: {missing_cols}")

    # Work on a copy and parse timestamps once
    df = transactions.copy()
    df[timestamp_col] = pd.to_datetime(df[timestamp_col])

    # Calculate user-level features
    user_features = df.groupby(user_id_col).agg(
        total_amount=(amount_col, 'sum'),
        mean_amount=(amount_col, 'mean'),
        std_amount=(amount_col, 'std'),
        transaction_count=(amount_col, 'count'),
        median_amount=(amount_col, 'median'),
        max_amount=(amount_col, 'max'),
        min_amount=(amount_col, 'min'),
    ).reset_index()

    # Filter users with too few transactions
    user_features = user_features[
        user_features['transaction_count'] >= min_transactions
    ]

    # Calculate time-based features on a scratch frame so that helper column
    # names can never collide with the caller's column names
    timestamps = df[timestamp_col]
    time_frame = pd.DataFrame({
        'hour': timestamps.dt.hour,
        'is_weekend': timestamps.dt.dayofweek.isin([5, 6]),  # Sat / Sun
        'day': timestamps.dt.normalize(),
    })
    time_features = time_frame.groupby(df[user_id_col]).agg(
        avg_hour=('hour', 'mean'),
        std_hour=('hour', 'std'),
        weekend_ratio=('is_weekend', 'mean'),
        unique_days=('day', 'nunique'),
    ).reset_index()

    # Merge features
    features = user_features.merge(time_features, on=user_id_col)

    # Method 1: Z-Score based detection (population std, as scipy's zscore)
    features['amount_zscore'] = _abs_zscore(features['total_amount'], ddof=0)

    # Method 2: IQR based detection
    q1 = features['total_amount'].quantile(0.25)
    q3 = features['total_amount'].quantile(0.75)
    iqr = q3 - q1
    lower_bound = q1 - iqr_multiplier * iqr
    upper_bound = q3 + iqr_multiplier * iqr
    features['iqr_anomaly'] = (
        (features['total_amount'] < lower_bound) |
        (features['total_amount'] > upper_bound)
    )

    # Method 3: Statistical tests per user
    # Normality is tested only for users that survived the filter above.
    eligible = df[df[user_id_col].isin(features[user_id_col])]
    p_values = {
        user_id: _normality_p_value(amounts.dropna().to_numpy())
        for user_id, amounts in eligible.groupby(user_id_col)[amount_col]
    }
    normality_p = features[user_id_col].map(p_values).astype(float).fillna(1.0)

    # Z-score of each user's mean against the population of user means
    mean_zscore = _abs_zscore(features['mean_amount'], ddof=1)

    # Combine signals; each z-based term uses the user's own z-score and is
    # capped at 2.0 so a single extreme value cannot dominate the blend
    features['user_anomaly_score'] = (
        0.4 * (features['amount_zscore'] / z_threshold).clip(upper=2.0)
        + 0.3 * (1 - normality_p)
        + 0.3 * (mean_zscore / z_threshold).clip(upper=2.0)
    )
    features['is_anomaly'] = features['user_anomaly_score'] > 1.0

    # Identify contributing factors
    factor_lists = [
        _contributing_factors(zscore, iqr_flag, score, std_hour, z_threshold)
        for zscore, iqr_flag, score, std_hour in zip(
            features['amount_zscore'],
            features['iqr_anomaly'],
            features['user_anomaly_score'],
            features['std_hour'],
        )
    ]
    # An explicit object Series keeps each list as a single cell value.
    features['contributing_factors'] = pd.Series(
        factor_lists, index=features.index, dtype=object
    )
    return features
# Example usage
if __name__ == "__main__":
    # Generate sample data
    np.random.seed(42)
    n_users = 1000
    n_transactions = 50000
    sample_data = pd.DataFrame({
        'user_id': np.random.randint(1, n_users + 1, n_transactions),
        'amount': np.random.lognormal(3, 1, n_transactions),
        # One transaction per minute. 'min' is the supported alias for
        # minutes; the single-letter 'T' alias is deprecated in recent pandas.
        'timestamp': pd.date_range(
            '2026-01-01',
            periods=n_transactions,
            freq='min'
        ),
    })

    # Inject anomalies: inflate every amount of three users tenfold
    anomaly_users = [42, 123, 456]
    mask = sample_data['user_id'].isin(anomaly_users)
    sample_data.loc[mask, 'amount'] *= 10

    # Detect anomalies
    results = detect_spending_anomalies(sample_data)

    # Show anomalies
    anomalies = results[results['is_anomaly']]
    print(f"Detected {len(anomalies)} anomalous users:")
    print(anomalies[['user_id', 'total_amount', 'user_anomaly_score', 'contributing_factors']])
Debugging Strategies
Common Bugs and Fixes
# Bug 1: SettingWithCopyWarning
# BAD: chained indexing. df[mask] produces an intermediate object, so the
# assignment may land on that temporary and never reach df.
df[df['col'] > 0]['new_col'] = 1
# GOOD: a single .loc call selects the rows and the column on df itself.
df.loc[df['col'] > 0, 'new_col'] = 1
# Bug 2: Ignoring NaN in aggregations
# BAD
df.groupby('user_id')['amount'].mean()  # NaNs are excluded
# GOOD: explicit handling. Series.mean accepts skipna on every pandas
# version, whereas GroupBy.mean only accepts that keyword in recent
# releases and raises TypeError on older ones.
df.groupby('user_id')['amount'].agg(lambda s: s.mean(skipna=False))
# Bug 3: Merging on different dtypes
# BAD: the key is an integer in one frame and a string in the other
df1['id'] = df1['id'].astype(int)
df2['id'] = df2['id'].astype(str)
merged = df1.merge(df2, on='id') # Keys never match: recent pandas raises ValueError, older versions returned an empty result
# GOOD: cast both keys to the same dtype before merging
df1['id'] = df1['id'].astype(str)
df2['id'] = df2['id'].astype(str)
merged = df1.merge(df2, on='id')
# Bug 4: Not handling duplicate column names after merge
# BAD: non-key columns that exist in both frames get pandas' default suffixes
merged = df1.merge(df2, on='user_id')
# Now there are 'value_x' and 'value_y' columns
# GOOD: name the suffixes so the origin of each column stays obvious
merged = df1.merge(df2, on='user_id', suffixes=('_left', '_right'))
Debugging Checklist
def debug_dataframe(df, name="DataFrame"):
    """
    Quick debugging checklist for DataFrames.

    Prints shape, dtypes, missing values, duplicate rows, a numeric summary
    and the top values of up to five object/category columns. A check that
    pandas cannot run on the given data (for example columns whose cells are
    lists, which are unhashable, or a frame without columns) is reported as
    skipped instead of aborting the whole report.

    Args:
        df: DataFrame to inspect.
        name: Label printed in the report header.

    Returns:
        None. The report is written to stdout.
    """
    separator = '=' * 50
    print(f"\n{separator}")
    print(f"Debugging: {name}")
    print(separator)

    # Shape
    print(f"\nShape: {df.shape}")

    # Data types
    print(f"\nData types:\n{df.dtypes}")

    # Missing values
    missing = df.isnull().sum()
    if missing.any():
        print(f"\nMissing values:\n{missing[missing > 0]}")

    # Duplicates
    try:
        n_duplicates = df.duplicated().sum()
    except TypeError:
        # Duplicate detection hashes every cell; list/dict cells cannot be hashed.
        print("\nDuplicate rows: skipped (unhashable values)")
    else:
        if n_duplicates > 0:
            print(f"\nDuplicate rows: {n_duplicates}")

    # Numeric summaries
    if df.shape[1] == 0:
        # describe() refuses a frame without columns.
        print("\nNumeric summary: skipped (no columns)")
    else:
        try:
            summary = df.describe()
        except TypeError:
            print("\nNumeric summary: skipped (unhashable values)")
        else:
            print(f"\nNumeric summary:\n{summary}")

    # Categorical summaries
    cat_cols = df.select_dtypes(include=['object', 'category']).columns
    for col in cat_cols[:5]:  # First 5 categorical columns
        try:
            top_values = df[col].value_counts().head()
        except TypeError:
            print(f"\n{col} value counts: skipped (unhashable values)")
        else:
            print(f"\n{col} value counts:\n{top_values}")

    print(f"{separator}\n")
Company-Specific Tips
Google Coding Interviews
💡
Google values algorithmic thinking and optimization. They may ask you to implement ML algorithms from scratch, not just use libraries.
- Be ready to implement gradient descent, logistic regression, or decision trees from scratch
- Know Big O notation and be able to analyze time/space complexity
- Practice on LeetCode medium/hard problems
- Focus on clean, readable code
Apple Coding Interviews
💡
Apple values practical implementation and attention to detail. They often ask about data processing pipelines and real-world scenarios.
- Be ready to handle messy, real-world data
- Know how to optimize for memory and speed
- Understand iOS/macOS data patterns
- Focus on production-quality code with error handling