Design Real-Time Fraud Detection System
Building real-time fraud detection for millions of transactions with sub-100ms latency
Interview Question
"Design a real-time fraud detection system that can analyze millions of transactions per day, detect fraudulent activities with high precision, and provide explainable decisions for compliance requirements, all within 100ms latency."
Difficulty: Hard | Frequently asked at Stripe, PayPal, Square, JPMorgan, Goldman Sachs
1. Requirements Gathering
Functional Requirements
- Real-time Transaction Scoring: Evaluate each transaction for fraud risk in real-time
- Rule Engine: Configurable business rules for fraud prevention
- Alert Generation: Generate alerts for suspicious activities
- Case Management: Workflow for fraud analysts to investigate and resolve cases
- Explainable Decisions: Provide reasons for fraud decisions for compliance
- Feedback Loop: Incorporate analyst decisions to improve models
- Batch Analysis: Historical analysis for model training and pattern discovery
Non-Functional Requirements
- Latency: < 100ms for real-time scoring (critical path)
- Throughput: 10,000+ transactions per second at peak; 500M+ transactions/day (about 5,800 TPS on average)
- Availability: 99.999% uptime (financial system requirement)
- Accuracy: < 0.1% false positive rate, > 95% fraud recall
- Explainability: All decisions must be auditable with reasons
- Scalability: Handle 10x growth in transaction volume
- Compliance: PCI DSS, GDPR, SOC2 compliance
Scale Perspective: Stripe processes over $1 trillion in payments annually. PayPal handles 25+ billion transactions per year. Real-time fraud detection must evaluate each transaction in under 100ms while maintaining extremely low false positive rates to avoid blocking legitimate transactions.
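The throughput and availability targets above can be sanity-checked with a few lines of arithmetic. This is a minimal sketch; the function names are illustrative and the only inputs are the figures already stated in the requirements.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def average_tps(transactions_per_day: int) -> float:
    """Average transactions per second implied by a daily volume."""
    return transactions_per_day / SECONDS_PER_DAY


def downtime_budget_minutes_per_year(availability: float) -> float:
    """Allowed downtime per 365-day year for a given availability target."""
    minutes_per_year = 365 * 24 * 60
    return (1.0 - availability) * minutes_per_year


avg_tps = average_tps(500_000_000)            # 500M transactions/day
peak_to_average = 10_000 / avg_tps            # 10,000 TPS target vs. the average
downtime = downtime_budget_minutes_per_year(0.99999)

print(f"average TPS: {avg_tps:.0f}")
print(f"peak-to-average ratio: {peak_to_average:.2f}")
print(f"downtime budget: {downtime:.2f} minutes/year")
```

In an interview, stating these derived numbers early (roughly 5,800 TPS on average and about five minutes of downtime per year) helps justify later sizing decisions.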
2. High-Level Architecture Overview
The fraud detection system follows a layered architecture: Data Ingestion → Feature Computation → Model Scoring → Decision Engine → Case Management.
+-------------------------------------------------------------------------------+
|                                 DATA SOURCES                                  |
| Payment Gateway | Bank APIs | Merchant Systems | User Apps | Device Telemetry |
+-------------------------------------------------------------------------------+
                                        |
                                        v
+-------------------------------------------------------------------------------+
|                             DATA INGESTION LAYER                              |
| Apache Kafka | Schema Registry | Event Validation | Deduplication             |
+-------------------------------------------------------------------------------+
                                        |
            +---------------------------+---------------------------+
            v                           v                           v
+-----------------------+   +-----------------------+   +-----------------------+
| REAL-TIME PROCESSING  |   | FEATURE STORE         |   | BATCH PROCESSING      |
| Flink/Spark Streaming |   | Redis/DynamoDB        |   | Spark/Airflow         |
| < 20ms                |   | < 5ms                 |   | Daily/Weekly          |
+-----------------------+   +-----------------------+   +-----------------------+
                                        |
                                        v
+-------------------------------------------------------------------------------+
|                            FRAUD DETECTION ENGINE                             |
|   +--------------+   +--------------+   +--------------+   +--------------+   |
|   | Rule Engine  |   | ML Models    |   | Graph        |   | Ensemble     |   |
|   | Drools       |   | GBDT+NN      |   | Analysis     |   | Scoring      |   |
|   | < 5ms        |   | < 30ms       |   | < 20ms       |   | < 10ms       |   |
|   +--------------+   +--------------+   +--------------+   +--------------+   |
+-------------------------------------------------------------------------------+
                                        |
                                        v
+-------------------------------------------------------------------------------+
|                                DECISION ENGINE                                |
| Risk Scoring | Action Decision | Explanation Generation | Logging             |
+-------------------------------------------------------------------------------+
                                        |
            +---------------------------+---------------------------+
            v                           v                           v
+-----------------------+   +-----------------------+   +-----------------------+
| TRANSACTION           |   | FRAUD                 |   | ANALYST               |
| PROCESSING            |   | ANALYTICS             |   | DASHBOARD             |
| Approve/Decline       |   | Real-time             |   | Case Management       |
+-----------------------+   +-----------------------+   +-----------------------+
Key Insight: The fraud detection system must balance three competing objectives: catching fraud (high recall), avoiding false positives (high precision), and making fast decisions (low latency). Different components optimize for different aspects of this trade-off.
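One way to make the latency side of this trade-off concrete is to add up the per-stage budgets shown in the architecture diagram and compare them with the 100ms target. The sketch below uses only those diagram figures; the stage names and the choice of which stages run concurrently are assumptions made for illustration.

```python
LATENCY_SLA_MS = 100

# Per-stage budgets in milliseconds, taken from the architecture diagram.
STAGE_BUDGETS_MS = {
    "real_time_processing": 20,
    "feature_store_lookup": 5,
    "rule_engine": 5,
    "ml_models": 30,
    "graph_analysis": 20,
    "ensemble_scoring": 10,
}


def sequential_budget_ms(budgets: dict) -> int:
    """Worst case if every stage runs one after another."""
    return sum(budgets.values())


def parallel_budget_ms(budgets: dict, parallel_group: set) -> int:
    """Worst case if the stages in parallel_group run concurrently.

    The concurrent group costs as much as its slowest member.
    """
    serial = sum(ms for name, ms in budgets.items() if name not in parallel_group)
    return serial + max(budgets[name] for name in parallel_group)


sequential = sequential_budget_ms(STAGE_BUDGETS_MS)
# Assumption: rules, ML models and graph analysis can be evaluated side by side.
parallel = parallel_budget_ms(
    STAGE_BUDGETS_MS, {"rule_engine", "ml_models", "graph_analysis"}
)

print(f"sequential: {sequential}ms, headroom: {LATENCY_SLA_MS - sequential}ms")
print(f"parallel detectors: {parallel}ms, headroom: {LATENCY_SLA_MS - parallel}ms")
```

Run sequentially, the stages leave only a small margin for network hops and serialization, which is one argument for evaluating the detectors concurrently.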
3. Data Pipeline Design
3.1 Transaction Data Model
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
from typing import Dict, List, Optional


@dataclass
class Transaction:
    # Core transaction fields
    transaction_id: str
    user_id: str
    merchant_id: str
    amount: Decimal
    currency: str
    timestamp: datetime
    # Transaction details
    payment_method: str  # card, bank_transfer, wallet
    card_last_four: Optional[str]
    card_brand: Optional[str]
    card_country: Optional[str]
    # Merchant details
    merchant_category: str
    merchant_country: str
    merchant_name: str
    # Device and location
    device_fingerprint: Optional[str]
    ip_address: str
    ip_country: str
    user_agent: str
    # Session context
    session_id: Optional[str]
    referral_source: Optional[str]
    # Risk signals (pre-computed)
    is_3d_secure: bool
    avs_result: Optional[str]
    cvv_result: Optional[str]


@dataclass
class FraudLabel:
    transaction_id: str
    is_fraud: bool
    fraud_type: Optional[str]  # account_takeover, card_testing, friendly_fraud
    confidence: float
    analyst_id: Optional[str]
    investigation_notes: Optional[str]
    resolved_at: Optional[datetime]
3.2 Real-time Feature Computation
class RealTimeFeatureComputer:
    """Compute features in real-time for fraud scoring"""

    def __init__(self, redis_client, feature_store):
        self.redis = redis_client
        self.feature_store = feature_store

    async def compute_features(self, transaction: Transaction) -> Dict:
        """Compute all features for a transaction"""
        features = {}

        # User velocity features (real-time from Redis)
        user_key = f"user:{transaction.user_id}"
        user_features = await self.compute_user_velocity(transaction, user_key)
        features.update(user_features)

        # Card velocity features (card payments only). Last four digits plus brand is
        # not unique across cards; prefer a tokenized card fingerprint if one exists.
        if transaction.card_last_four:
            card_key = f"card:{transaction.card_last_four}:{transaction.card_brand}"
            card_features = await self.compute_card_velocity(transaction, card_key)
            features.update(card_features)

        # Merchant features
        merchant_key = f"merchant:{transaction.merchant_id}"
        merchant_features = await self.compute_merchant_features(transaction, merchant_key)
        features.update(merchant_features)

        # Geographic features
        geo_features = await self.compute_geo_features(transaction)
        features.update(geo_features)

        # Device features
        device_features = await self.compute_device_features(transaction)
        features.update(device_features)

        # Historical features (from feature store)
        historical_features = await self.feature_store.get_features(
            user_id=transaction.user_id,
            feature_names=['avg_transaction_amount', 'avg_transactions_per_day',
                           'preferred_merchants', 'usual_countries']
        )
        features.update(historical_features)
        return features
    async def compute_user_velocity(self, transaction, user_key):
        """Compute user transaction velocity features"""
        pipe = self.redis.pipeline()
        now_ts = transaction.timestamp.timestamp()

        # Time windows for velocity
        windows = [
            ('1h', 3600),
            ('24h', 86400),
            ('7d', 604800),
            ('30d', 2592000)
        ]
        for window_name, window_seconds in windows:
            key = f"{user_key}:transactions:{window_name}"
            # Queue the command; all four counts are fetched in one round trip
            pipe.zcount(key, now_ts - window_seconds, now_ts)
        counts = await pipe.execute()

        # Baseline rates derived from the 30-day count
        avg_daily = counts[3] / 30
        avg_hourly = avg_daily / 24
        return {
            'user_txn_count_1h': counts[0],
            'user_txn_count_24h': counts[1],
            'user_txn_count_7d': counts[2],
            'user_txn_count_30d': counts[3],
            'user_txn_count_avg_daily': avg_daily,
            # Last-hour count relative to the 30-day hourly average (same units on both sides)
            'user_txn_frequency_ratio': counts[0] / max(avg_hourly, 0.001)
        }
    async def compute_geo_features(self, transaction):
        """Compute geographic risk features"""
        import json  # local import keeps this excerpt self-contained

        # Check if transaction country matches user's usual countries
        usual_countries = await self.redis.smembers(
            f"user:{transaction.user_id}:usual_countries"
        )
        is_unusual_country = transaction.card_country not in usual_countries

        # Check distance from last transaction
        raw_last_location = await self.redis.get(
            f"user:{transaction.user_id}:last_location"
        )
        if raw_last_location:
            # Assumption: the value is stored as JSON,
            # e.g. {"country": "US", "timestamp": "<ISO 8601 string>"}
            last_location = json.loads(raw_last_location)
            # calculate_distance is a helper (not shown) returning kilometres
            distance = self.calculate_distance(
                last_location['country'],
                transaction.ip_country
            )
            last_seen = datetime.fromisoformat(last_location['timestamp'])
            time_diff = (transaction.timestamp - last_seen).total_seconds()
            # Impossible travel check
            max_speed_kmh = 1000  # Upper bound on commercial flight speed
            if time_diff > 0:
                impossible_travel = distance / (time_diff / 3600) > max_speed_kmh
            else:
                # No time has elapsed, so any nonzero distance is impossible
                impossible_travel = distance > 0
        else:
            distance = None
            impossible_travel = False

        return {
            'is_unusual_country': is_unusual_country,
            'distance_from_last_txn_km': distance,
            'impossible_travel_detected': impossible_travel,
            'country_mismatch': transaction.card_country != transaction.ip_country
        }
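The geographic features above rely on a distance helper that the guide does not show. A minimal sketch of one possibility follows, assuming latitude/longitude coordinates are available (for example from IP geolocation, which is an assumption rather than something the data model provides). It uses the haversine great-circle formula; `haversine_km` and `is_impossible_travel` are illustrative names.

```python
import math

EARTH_RADIUS_KM = 6371.0


def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance between two points given in decimal degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlambda = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2)
    # Clamp to 1.0 to guard against floating-point overshoot before asin
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(min(1.0, a)))


def is_impossible_travel(distance_km: float, elapsed_seconds: float,
                         max_speed_kmh: float = 1000.0) -> bool:
    """True if covering distance_km in elapsed_seconds exceeds max_speed_kmh."""
    if distance_km <= 0:
        return False
    if elapsed_seconds <= 0:
        return True  # nonzero distance in zero time
    return distance_km / (elapsed_seconds / 3600) > max_speed_kmh


# New York to London, one hour apart vs. eight hours apart
nyc_to_london = haversine_km(40.7128, -74.0060, 51.5074, -0.1278)
print(round(nyc_to_london), is_impossible_travel(nyc_to_london, 3600))
print(is_impossible_travel(nyc_to_london, 8 * 3600))
```

Country-level comparisons are much coarser than coordinates, so a production system would likely combine this check with other signals before declining a transaction.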
3.3 Feature Store Design
from typing import Dict, List

from pyspark.sql import SparkSession, functions as F
from redis.asyncio.cluster import RedisCluster


class FraudFeatureStore:
    """Feature store for fraud detection features"""

    def __init__(self):
        # Online store for real-time serving (asyncio client, so execute() can be awaited)
        self.online_store = RedisCluster(
            host='redis-cluster',
            port=6379,
            decode_responses=True
        )
        # Offline store for training
        self.offline_store = SparkSession.builder \
            .appName("FraudFeatureStore") \
            .getOrCreate()

    async def get_features(self, user_id: str, feature_names: List[str]) -> Dict:
        """Get features for online serving"""
        pipeline = self.online_store.pipeline()
        for feature_name in feature_names:
            key = f"features:{user_id}:{feature_name}"
            pipeline.hgetall(key)
        results = await pipeline.execute()
        # deserialize is a helper (not shown) that converts the stored hash to a value
        return {
            feature_name: self.deserialize(result)
            for feature_name, result in zip(feature_names, results)
        }

    def compute_daily_features(self, date: str):
        """Batch compute daily features"""
        # Read one day of transaction data. Assumption: rows are already joined with
        # FraudLabel on transaction_id, so an is_fraud column is present.
        transactions = self.offline_store.read.parquet(
            f"s3://fraud-data/transactions/date={date}"
        )
        is_fraud_flag = F.when(F.col("is_fraud"), 1).otherwise(0)

        # Compute user-level features
        user_features = transactions.groupBy("user_id").agg(
            F.avg("amount").alias("avg_amount"),
            F.stddev("amount").alias("std_amount"),
            F.count("*").alias("txn_count"),
            F.countDistinct("merchant_id").alias("unique_merchants"),
            # The Transaction model has no plain `country` field; ip_country is used here
            F.countDistinct("ip_country").alias("unique_countries"),
            # Single-day count; a 30-day figure needs a rollup over 30 daily partitions
            F.sum(is_fraud_flag).alias("fraud_count")
        )

        # Compute merchant-level features
        merchant_features = transactions.groupBy("merchant_id").agg(
            F.avg("amount").alias("avg_amount"),
            F.count("*").alias("txn_count"),
            F.sum(is_fraud_flag).alias("fraud_count"),
            F.countDistinct("user_id").alias("unique_users")
        )

        # Store features
        user_features.write.mode("overwrite").parquet(
            f"s3://fraud-features/user-features/date={date}"
        )
        merchant_features.write.mode("overwrite").parquet(
            f"s3://fraud-features/merchant-features/date={date}"
        )
Critical Feature Engineering Considerations:
- Time-based features: Be careful about data leakage - never use future information
- Velocity features: Must be computed in real-time with sliding windows
- Historical features: Balance freshness with computational cost
- Cross-entity features: User-merchant, user-card, merchant-device interactions
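The first two considerations can be illustrated together with a small in-memory sliding-window counter. This is a sketch only: it assumes events for one entity arrive in timestamp order, and the names `SlidingWindowCounter` and `score_then_record` are hypothetical. The key point is the ordering, where the feature is read before the current transaction is recorded, so the feature only ever reflects the past.

```python
from collections import deque


class SlidingWindowCounter:
    """Counts events for one entity over a trailing time window (in seconds)."""

    def __init__(self, window_seconds: float):
        self.window_seconds = window_seconds
        self._events = deque()  # timestamps, oldest first

    def count_before(self, now: float) -> int:
        """Number of recorded events newer than now - window_seconds."""
        cutoff = now - self.window_seconds
        # Evict events that have fallen out of the window
        while self._events and self._events[0] <= cutoff:
            self._events.popleft()
        return len(self._events)

    def record(self, timestamp: float) -> None:
        self._events.append(timestamp)


def score_then_record(counter: SlidingWindowCounter, timestamp: float) -> int:
    """Read the velocity feature first, then record the current event.

    Reading before writing keeps the current transaction (and anything after it)
    out of its own feature value, which is what prevents leakage.
    """
    feature = counter.count_before(timestamp)
    counter.record(timestamp)
    return feature


counter = SlidingWindowCounter(window_seconds=3600)
velocity = [score_then_record(counter, ts) for ts in (0, 10, 20, 3605)]
print(velocity)
```

The same read-then-write ordering applies when the window lives in a Redis sorted set; the offline training pipeline should reproduce it so that training and serving features match.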
4. Model Selection and Training Approach
4.1 Multi-Model Architecture
from typing import Dict

import numpy as np


class FraudDetectionEnsemble:
    """Ensemble of models for different fraud types"""

    def __init__(self):
        # load_model is a helper (not shown) that returns a fitted classifier
        # Model 1: Card fraud detection
        self.card_fraud_model = self.load_model('card_fraud_v2')
        # Model 2: Account takeover detection
        self.account_takeover_model = self.load_model('account_takeover_v1')
        # Model 3: Friendly fraud detection
        self.friendly_fraud_model = self.load_model('friendly_fraud_v1')
        # Model 4: Card testing detection
        self.card_testing_model = self.load_model('card_testing_v1')
        # Meta-model for final scoring
        self.meta_model = self.load_model('meta_scorer_v1')

    def predict(self, features: Dict) -> Dict:
        """Get predictions from all models"""
        predictions = {}

        # Get individual model predictions. Each feature group is a 2D array of shape
        # (1, n_features), so predict_proba returns shape (1, 2); [0][1] selects the
        # fraud probability of the single row.
        predictions['card_fraud'] = self.card_fraud_model.predict_proba(
            features['card_features']
        )[0][1]
        predictions['account_takeover'] = self.account_takeover_model.predict_proba(
            features['account_features']
        )[0][1]
        predictions['friendly_fraud'] = self.friendly_fraud_model.predict_proba(
            features['transaction_features']
        )[0][1]
        predictions['card_testing'] = self.card_testing_model.predict_proba(
            features['velocity_features']
        )[0][1]

        # Meta-model combines predictions
        meta_features = np.array([
            predictions['card_fraud'],
            predictions['account_takeover'],
            predictions['friendly_fraud'],
            predictions['card_testing'],
            features['amount_normalized'],
            features['risk_score']
        ]).reshape(1, -1)
        final_score = self.meta_model.predict_proba(meta_features)[0][1]

        return {
            'fraud_score': final_score,
            'component_scores': predictions,
            'risk_level': self.get_risk_level(final_score)
        }

    def get_risk_level(self, score):
        if score > 0.8:
            return 'HIGH'
        elif score > 0.5:
            return 'MEDIUM'
        elif score > 0.2:
            return 'LOW'
        else:
            return 'MINIMAL'
4.2 Gradient Boosted Decision Trees (Primary Model)
import lightgbm as lgb
import numpy as np
from sklearn.base import clone
from sklearn.metrics import auc, precision_recall_curve
from sklearn.model_selection import StratifiedKFold


class CardFraudModel:
    """Gradient boosted model for card fraud detection"""

    def __init__(self):
        self.model = lgb.LGBMClassifier(
            objective='binary',
            metric='auc',
            num_leaves=63,
            learning_rate=0.05,
            n_estimators=1000,
            min_child_samples=100,
            subsample=0.8,
            colsample_bytree=0.8,
            reg_alpha=0.1,
            reg_lambda=1.0,
            # Handle class imbalance; a common starting point is n_negative / n_positive
            scale_pos_weight=100,
            random_state=42
        )

    def train(self, X_train, y_train, X_val, y_val):
        """Train model with early stopping"""
        self.model.fit(
            X_train, y_train,
            eval_set=[(X_val, y_val)],
            callbacks=[
                lgb.early_stopping(50),
                lgb.log_evaluation(100)
            ]
        )

    def cross_validate(self, X, y, n_folds=5):
        """Perform stratified k-fold cross-validation (X and y are NumPy arrays)"""
        # Shuffled folds mix past and future transactions. For time-ordered data,
        # a chronological split avoids the leakage warned about in section 3.
        skf = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=42)
        scores = []
        for train_idx, val_idx in skf.split(X, y):
            X_train, X_val = X[train_idx], X[val_idx]
            y_train, y_val = y[train_idx], y[val_idx]
            # Fit a fresh copy so cross-validation does not overwrite the trained model
            fold_model = clone(self.model)
            fold_model.fit(
                X_train, y_train,
                eval_set=[(X_val, y_val)],
                callbacks=[lgb.early_stopping(50)]
            )
            # Early stopping and scoring share the same fold, so scores lean optimistic
            y_pred = fold_model.predict_proba(X_val)[:, 1]
            precision, recall, _ = precision_recall_curve(y_val, y_pred)
            pr_auc = auc(recall, precision)
            scores.append(pr_auc)
        return np.mean(scores), np.std(scores)

    def get_feature_importance(self):
        """Get feature importance for explainability"""
        importance = self.model.feature_importances_
        feature_names = self.model.feature_name_
        return sorted(
            zip(feature_names, importance),
            key=lambda x: x[1],
            reverse=True
        )
4.3 Neural Network for Complex Patterns
import tensorflow as tf


class FraudDetectionNeuralNet(tf.keras.Model):
    """Deep neural network for complex fraud patterns"""

    def __init__(self):
        super().__init__()
        # Feature embedding layers. IDs must first be mapped (e.g. hashed) to
        # integers in [0, vocabulary size).
        self.user_embedding = tf.keras.layers.Embedding(1000000, 32)
        self.merchant_embedding = tf.keras.layers.Embedding(500000, 16)
        self.device_embedding = tf.keras.layers.Embedding(1000000, 8)
        # Feature interaction layers. CrossNetwork is assumed to be a DCN-style
        # cross layer stack defined elsewhere.
        self.cross_network = CrossNetwork(num_layers=3)
        # Deep network
        self.deep_network = tf.keras.Sequential([
            tf.keras.layers.Dense(256, activation='relu'),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dropout(0.3),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(64, activation='relu')
        ])
        # Attention mechanism for sequence features (not wired into call() below)
        self.attention = tf.keras.layers.MultiHeadAttention(
            num_heads=4, key_dim=32
        )
        # Output layer
        self.output_layer = tf.keras.layers.Dense(1, activation='sigmoid')

    def call(self, inputs, training=False):
        # Embed categorical features (each ID tensor has shape (batch,))
        user_emb = self.user_embedding(inputs['user_id'])
        merchant_emb = self.merchant_embedding(inputs['merchant_id'])
        device_emb = self.device_embedding(inputs['device_id'])

        # Concatenate all features
        numerical_features = inputs['numerical_features']
        categorical_embeddings = tf.concat([
            user_emb, merchant_emb, device_emb
        ], axis=1)
        all_features = tf.concat([
            numerical_features, categorical_embeddings
        ], axis=1)

        # Apply cross network for feature interactions
        cross_output = self.cross_network(all_features)
        # Apply deep network (dropout and batch norm depend on the training flag)
        deep_output = self.deep_network(all_features, training=training)

        # Combine and predict
        combined = tf.concat([cross_output, deep_output], axis=1)
        return self.output_layer(combined)
4.4 Handling Class Imbalance
import tensorflow as tf


class ImbalancedDataHandler:
    """Techniques for handling extreme class imbalance in fraud detection"""

    def oversampling_smote(self, X, y, sampling_ratio=0.1):
        """SMOTE oversampling for minority class"""
        from imblearn.over_sampling import SMOTE
        smote = SMOTE(sampling_strategy=sampling_ratio, random_state=42)
        X_resampled, y_resampled = smote.fit_resample(X, y)
        return X_resampled, y_resampled

    def focal_loss(self, y_true, y_pred, alpha=0.25, gamma=2.0):
        """Focal loss for handling class imbalance"""
        # Labels often arrive as integers; cast so they can be multiplied with y_pred
        y_true = tf.cast(y_true, y_pred.dtype)
        y_pred = tf.clip_by_value(y_pred, 1e-7, 1 - 1e-7)
        # Binary cross entropy
        bce = -y_true * tf.math.log(y_pred) - (1 - y_true) * tf.math.log(1 - y_pred)
        # Focal modulating factor: down-weights examples the model already gets right
        p_t = y_true * y_pred + (1 - y_true) * (1 - y_pred)
        alpha_t = y_true * alpha + (1 - y_true) * (1 - alpha)
        focal_weight = alpha_t * tf.pow(1 - p_t, gamma)
        return focal_weight * bce

    def compute_class_weights(self, y):
        """Compute class weights for imbalanced data (both classes must be present)"""
        n_samples = len(y)
        n_fraud = sum(y)
        n_legitimate = n_samples - n_fraud
        weight_fraud = n_samples / (2 * n_fraud)
        weight_legitimate = n_samples / (2 * n_legitimate)
        return {0: weight_legitimate, 1: weight_fraud}
Class Imbalance Strategy: For fraud detection with 0.1% fraud rate:
- Use focal loss or class weights instead of oversampling
- Combine multiple techniques: ensemble of models trained with different sampling strategies
- Focus on precision-recall AUC, not accuracy
- Use threshold tuning based on business requirements (cost of false positives vs false negatives)
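Threshold tuning by business cost, the last point above, can be sketched in a few lines. The example assumes each false positive and false negative has a fixed cost; the cost values and the toy scores below are made up for illustration, and `expected_cost` and `best_threshold` are hypothetical helper names.

```python
def expected_cost(scores, labels, threshold, cost_fp, cost_fn):
    """Total cost of flagging every transaction whose score is >= threshold."""
    cost = 0.0
    for score, is_fraud in zip(scores, labels):
        flagged = score >= threshold
        if flagged and not is_fraud:
            cost += cost_fp      # legitimate customer blocked or challenged
        elif not flagged and is_fraud:
            cost += cost_fn      # fraud let through
    return cost


def best_threshold(scores, labels, cost_fp, cost_fn):
    """Pick the candidate threshold with the lowest total cost.

    Candidates are the observed scores plus infinity (flag nothing).
    """
    candidates = sorted(set(scores)) + [float("inf")]
    return min(
        candidates,
        key=lambda t: expected_cost(scores, labels, t, cost_fp, cost_fn),
    )


# Toy validation set: model scores and ground-truth labels (1 = fraud)
scores = [0.05, 0.10, 0.20, 0.40, 0.60, 0.90]
labels = [0, 0, 0, 1, 0, 1]

# Missed fraud is far more expensive than customer friction
fraud_averse = best_threshold(scores, labels, cost_fp=5, cost_fn=100)
# Friction is more expensive than missed fraud
friction_averse = best_threshold(scores, labels, cost_fp=200, cost_fn=100)
print(fraud_averse, friction_averse)
```

The same model yields very different operating points depending on the cost ratio, which is why the threshold is a business decision rather than a purely statistical one.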
5. Serving Architecture
5.1 Real-time Scoring Pipeline
Transaction → Kafka → Feature Computation → Model Scoring → Decision Engine → Response
                           (< 20ms)           (< 30ms)         (< 10ms)
                               |                  |                |
                               v                  v                v
                         +-----------+      +-----------+    +-----------+
                         | Feature   |      | Model     |    | Rule      |
                         | Store     |      | Serving   |    | Engine    |
                         | (Redis)   |      | (TF/ONNX) |    | (Drools)  |
                         +-----------+      +-----------+    +-----------+
5.2 Model Serving Options
import time
from typing import Dict

import numpy as np
import onnxruntime as ort


class ModelServing:
    """Model serving for fraud detection"""

    def __init__(self):
        # Three common options; pick one per deployment rather than running all three.
        # Option 1: TensorFlow Serving (REST endpoint, e.g. tensorflow-serving:8501)
        # Option 2: ONNX Runtime (used below)
        self.onnx_session = ort.InferenceSession(
            "fraud_model.onnx",
            providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
        )
        # Option 3: Triton Inference Server, e.g.
        #   tritonclient.http.InferenceServerClient(url='triton-server:8000')

    async def predict(self, features: Dict) -> Dict:
        """Get prediction from model"""
        # Prepare input tensor (prepare_input is a helper not shown here)
        input_tensor = self.prepare_input(features)

        # Get prediction. The 'input' and 'fraud_probability' tensor names depend on
        # how the model was exported.
        start = time.perf_counter()
        prediction = self.onnx_session.run(
            ['fraud_probability'],
            {'input': input_tensor}
        )[0]
        inference_time_ms = (time.perf_counter() - start) * 1000

        return {
            'fraud_probability': float(np.ravel(prediction)[0]),
            'model_version': 'v2.1.3',
            'inference_time_ms': inference_time_ms
        }
5.3 Decision Engine
class FraudDecisionEngine:
    """Make final fraud decision based on model scores and rules"""

    def __init__(self):
        # RuleEngine and DecisionHistory are assumed to be defined elsewhere, as are the
        # combine_scores, get_risk_level and get_recommended_actions helpers used below.
        self.rule_engine = RuleEngine()
        self.decision_history = DecisionHistory()

    async def make_decision(self, transaction, model_scores, features):
        """Make final fraud decision"""
        # Step 1: Check hard rules (immediate block/allow).
        # rule_decision is assumed to expose .action and .rule_id.
        rule_decision = await self.rule_engine.evaluate(transaction, features)
        if rule_decision.action == 'BLOCK':
            return {
                'decision': 'DECLINE',
                'reason': 'Hard rule violation',
                'rule_id': rule_decision.rule_id,
                'confidence': 1.0
            }

        # Step 2: Get model score
        ml_score = model_scores['fraud_probability']

        # Step 3: Combine with business rules
        final_score = self.combine_scores(ml_score, rule_decision, features)

        # Step 4: Make decision based on threshold
        if final_score > 0.9:
            decision = 'DECLINE'
        elif final_score > 0.7:
            decision = 'CHALLENGE'  # 3DS or additional verification
        elif final_score > 0.5:
            decision = 'REVIEW'  # Manual review queue
        else:
            decision = 'APPROVE'

        # Step 5: Generate explanation
        explanation = self.generate_explanation(
            transaction, model_scores, features, decision
        )
        return {
            'decision': decision,
            'fraud_score': final_score,
            'explanation': explanation,
            'risk_level': self.get_risk_level(final_score),
            'recommended_actions': self.get_recommended_actions(decision, final_score)
        }

    def generate_explanation(self, transaction, model_scores, features, decision):
        """Generate human-readable explanation for decision"""
        reasons = []
        # High-value transaction
        if features.get('amount_normalized', 0) > 3:
            reasons.append("Transaction amount significantly higher than user's average")
        # Unusual location
        if features.get('is_unusual_country', False):
            reasons.append("Transaction from unusual country for this user")
        # Velocity anomaly
        if features.get('user_txn_count_1h', 0) > 10:
            reasons.append("Unusually high transaction frequency in last hour")
        # Device anomaly
        if features.get('new_device', False):
            reasons.append("Transaction from new device")
        # Card mismatch (the geographic features emit this as 'country_mismatch')
        if features.get('country_mismatch', False):
            reasons.append("Card country differs from IP country")
        # Model contribution (per-model scores may be absent from model_scores)
        component_scores = model_scores.get('component_scores', {})
        if component_scores.get('card_fraud', 0) > 0.5:
            reasons.append("Card fraud model indicates high risk")

        return {
            'decision_reasons': reasons,
            'primary_factor': reasons[0] if reasons else "Normal transaction pattern",
            'confidence_explanation': f"Model confidence: {model_scores['fraud_probability']:.2%}"
        }
Decision Engine Discussion: When discussing the decision engine, emphasize:
- Hard rules vs ML-based decisions
- Threshold tuning based on business requirements
- Explainability requirements for compliance
- Feedback loop from analyst decisions
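To make the first point concrete, hard rules can be modeled as small, auditable predicates that short-circuit before any model score is consulted. The sketch below is an assumption about how such a rule engine might look; the rule IDs, thresholds, and the `Rule` and `RuleDecision` types are hypothetical, while the feature names reuse those computed earlier in this guide.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass(frozen=True)
class Rule:
    rule_id: str
    description: str
    predicate: Callable[[dict], bool]  # returns True when the rule fires


@dataclass(frozen=True)
class RuleDecision:
    action: str                      # "BLOCK" or "PASS"
    rule_id: Optional[str] = None
    reason: Optional[str] = None


def evaluate_hard_rules(rules: List[Rule], features: dict) -> RuleDecision:
    """Return BLOCK for the first rule that fires, otherwise PASS."""
    for rule in rules:
        if rule.predicate(features):
            return RuleDecision("BLOCK", rule.rule_id, rule.description)
    return RuleDecision("PASS")


# Hypothetical rules; IDs and thresholds are illustrative only.
HARD_RULES = [
    Rule("R001", "Impossible travel detected",
         lambda f: f.get("impossible_travel_detected", False)),
    Rule("R002", "More than 50 transactions in the last hour",
         lambda f: f.get("user_txn_count_1h", 0) > 50),
]

blocked = evaluate_hard_rules(HARD_RULES, {"user_txn_count_1h": 60})
passed = evaluate_hard_rules(HARD_RULES, {"user_txn_count_1h": 2})
print(blocked.action, blocked.rule_id, passed.action)
```

Because each block carries a rule ID and a human-readable reason, the decision is explainable by construction, which is harder to guarantee for ML-only declines.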
6. Monitoring and Observability
6.1 Key Metrics
class FraudMonitoringMetrics:
    """Comprehensive monitoring for fraud detection system"""

    # Model performance metrics
    MODEL_METRICS = [
        'precision_at_threshold',
        'recall_at_threshold',
        'f1_score',
        'pr_auc',
        'roc_auc',
        'false_positive_rate',
        'false_negative_rate'
    ]

    # Business metrics
    BUSINESS_METRICS = [
        'fraud_rate',
        'fraud_loss_amount',
        'chargeback_rate',
        'false_positive_rate',
        'customer_friction_score'
    ]

    # Operational metrics
    OPERATIONAL_METRICS = [
        'scoring_latency_p50',
        'scoring_latency_p95',
        'scoring_latency_p99',
        'throughput_tps',
        'error_rate',
        'feature_freshness'
    ]

    # Drift metrics
    DRIFT_METRICS = [
        'feature_drift_psi',
        'prediction_drift_ks',
        'model_calibration_error',
        'data_quality_score'
    ]
6.2 Real-time Monitoring Dashboard
class FraudMonitoringDashboard:
    """Real-time monitoring dashboard for fraud detection"""

    def __init__(self, baseline_fraud_rate=0.001):
        # MetricsCollector and AlertManager are assumed to be defined elsewhere
        self.metrics_collector = MetricsCollector()
        self.alert_manager = AlertManager()
        # Expected fraud rate used for spike detection (0.1%, the rate assumed in this guide)
        self.baseline_fraud_rate = baseline_fraud_rate

    async def update_dashboard(self):
        """Update monitoring dashboard"""
        metrics = await self.collect_metrics()
        # Check for anomalies
        anomalies = self.detect_anomalies(metrics)
        # Send alerts if needed
        for anomaly in anomalies:
            await self.alert_manager.send_alert(anomaly)
        # Update dashboard
        await self.update_visualizations(metrics, anomalies)

    async def collect_metrics(self):
        """Collect all monitoring metrics"""
        return {
            'real_time': {
                'transactions_per_second': await self.get_tps(),
                'avg_latency_ms': await self.get_avg_latency(),
                'fraud_rate': await self.get_fraud_rate(),
                'false_positive_rate': await self.get_fpr()
            },
            'hourly': {
                'total_transactions': await self.get_hourly_transactions(),
                'total_fraud': await self.get_hourly_fraud(),
                'total_fraud_amount': await self.get_hourly_fraud_amount(),
                'model_drift_score': await self.get_model_drift()
            },
            'daily': {
                'chargeback_rate': await self.get_daily_chargebacks(),
                'customer_complaints': await self.get_complaints(),
                'model_performance': await self.get_model_performance()
            }
        }

    def detect_anomalies(self, metrics):
        """Detect anomalies in metrics"""
        anomalies = []

        # Check for sudden spike in fraud rate
        current_fraud_rate = metrics['real_time']['fraud_rate']
        if current_fraud_rate > self.baseline_fraud_rate * 2:
            anomalies.append({
                'type': 'FRAUD_SPIKE',
                'severity': 'HIGH',
                'message': f'Fraud rate spike detected: {current_fraud_rate:.2%}',
                'current': current_fraud_rate,
                'baseline': self.baseline_fraud_rate
            })

        # Check for latency degradation. An average above the SLA means most requests
        # already breach it; alerting on p99 would catch degradation earlier.
        current_latency = metrics['real_time']['avg_latency_ms']
        if current_latency > 100:  # SLA threshold
            anomalies.append({
                'type': 'LATENCY_BREACH',
                'severity': 'MEDIUM',
                'message': f'Latency SLA breach: {current_latency:.2f}ms',
                'current': current_latency,
                'threshold': 100
            })
        return anomalies
Critical Monitoring Points:
- Model drift: Monitor for changes in feature distributions and prediction patterns
- Feedback loop: Track how analyst decisions affect future model performance
- Concept drift: Fraud patterns evolve - detect when model performance degrades
- Alert fatigue: Balance sensitivity with false alarm rate
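Feature drift is often tracked with the Population Stability Index (PSI), which appears in the drift metrics above as `feature_drift_psi`. The sketch below is a plain-Python illustration with fixed bin edges; the bin edges, the sample values, and the `psi` function name are assumptions for the example. A commonly cited rule of thumb treats values above roughly 0.25 as a significant shift, but the alert threshold should be tuned per feature.

```python
import math


def psi(expected, actual, bin_edges, eps=1e-6):
    """Population Stability Index between a baseline and a current sample.

    bin_edges are interior cut points; values below the first edge fall in
    bin 0 and values at or above the last edge fall in the final bin.
    """
    def proportions(values):
        counts = [0] * (len(bin_edges) + 1)
        for v in values:
            idx = sum(1 for edge in bin_edges if v >= edge)
            counts[idx] += 1
        total = len(values)
        # eps keeps empty bins from producing log(0) or division by zero
        return [max(c / total, eps) for c in counts]

    e = proportions(expected)
    a = proportions(actual)
    return sum((ai - ei) * math.log(ai / ei) for ei, ai in zip(e, a))


# Toy example: transaction amounts at training time vs. today
baseline_amounts = [10, 20, 30, 40, 50, 60, 70, 80]
shifted_amounts = [v + 40 for v in baseline_amounts]
edges = [25, 50, 75]

no_drift = psi(baseline_amounts, baseline_amounts, edges)
drift = psi(baseline_amounts, shifted_amounts, edges)
print(f"no drift: {no_drift:.3f}, shifted: {drift:.3f}")
```

Computing PSI per feature on a schedule and alerting only on sustained breaches is one way to balance sensitivity against alert fatigue.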
7. Scale Considerations and Trade-offs
7.1 Horizontal Scaling
SCALING ARCHITECTURE

Transaction Volume
`-- Partition by user_id or merchant_id
    |-- Shard 1: Users A-H
    |-- Shard 2: Users I-P
    `-- Shard 3: Users Q-Z

Model Serving
`-- Horizontal scaling with load balancing
    |-- GPU instances for neural network models
    `-- CPU instances for GBDT models

Feature Store
`-- Redis cluster with replication
    |-- Master for writes
    `-- Replicas for reads
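The alphabetical ranges in the diagram are easy to read, but real user IDs are rarely spread evenly across the alphabet, so range shards can end up unbalanced. A common alternative is to hash the partition key. The sketch below shows a stable hash-based shard assignment; `shard_for` is a hypothetical helper, and the shard count of 3 simply mirrors the diagram.

```python
import hashlib
from collections import Counter


def shard_for(user_id: str, num_shards: int) -> int:
    """Map a user ID to a shard using a stable hash.

    hashlib is used instead of the built-in hash(), which is randomized per
    process for strings and so would not give the same shard on every node.
    """
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_shards


# Distribution check over synthetic user IDs
distribution = Counter(shard_for(f"user_{i}", 3) for i in range(3000))
print(sorted(distribution.items()))
```

Keeping all of a user's transactions on one shard preserves the locality that velocity features need. Note that changing `num_shards` remaps most keys with this simple modulo scheme, so consistent hashing is worth mentioning if resharding comes up.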
7.2 Cost vs Performance Trade-offs
| Dimension | Option A (Cost Optimized) | Option B (Performance Optimized) |
|---|---|---|
| Model Complexity | Simple GBDT (fast inference) | Deep neural network (better accuracy) |
| Feature Freshness | Batch features (hourly) | Real-time features (streaming) |
| Model Retraining | Weekly retraining | Daily retraining |
| Explainability | Post-hoc explanations | Inherently interpretable models |
| Storage | Sampled historical data | Full historical data |
7.3 Latency Optimization
import asyncio


class LatencyOptimizer:
    """Optimize latency for fraud detection"""

    def __init__(self):
        # LRUCache (with per-entry TTL) and ModelCache are assumed in-house helpers
        self.feature_cache = LRUCache(maxsize=100000)
        self.model_cache = ModelCache()
        self.current_model_version = None

    async def optimize_scoring(self, transaction):
        """Optimize fraud scoring latency"""
        # Step 1: Check feature cache. Cache only slow-moving features; velocity
        # counters go stale within seconds and should be read fresh.
        cache_key = self.generate_cache_key(transaction)
        cached_features = self.feature_cache.get(cache_key)
        if cached_features is not None:
            features = cached_features
        else:
            # Step 2: Compute features (parallel)
            features = await self.compute_features_parallel(transaction)
            self.feature_cache.set(cache_key, features, ttl=60)

        # Step 3: Check model cache
        model_version = self.model_cache.get_latest_version()
        if model_version != self.current_model_version:
            await self.update_model(model_version)
            self.current_model_version = model_version

        # Step 4: Batch scoring for similar transactions
        if self.can_batch(transaction):
            return await self.batch_score(transaction, features)
        else:
            return await self.individual_score(features)

    async def compute_features_parallel(self, transaction):
        """Compute features in parallel for lower latency"""
        tasks = [
            self.compute_user_features(transaction.user_id),
            self.compute_card_features(transaction.card_last_four),
            self.compute_merchant_features(transaction.merchant_id),
            self.compute_geo_features(transaction.ip_country),
            self.compute_device_features(transaction.device_fingerprint)
        ]
        # Total latency is bounded by the slowest lookup, not the sum of all lookups
        results = await asyncio.gather(*tasks)

        # Merge results
        features = {}
        for result in results:
            features.update(result)
        return features
Latency Optimization Tips:
- Cache frequently accessed features
- Compute features in parallel
- Use efficient data structures (numpy arrays vs pandas)
- Consider model distillation for faster inference
- Use async I/O for database calls
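Parallel, async feature lookups still leave the scoring path exposed to one slow dependency. A common complement is a per-lookup deadline with a safe default, so a slow store degrades the features instead of breaching the latency target. The sketch below simulates this with `asyncio.sleep`; the lookup functions, the timeout value, and the default values are all illustrative assumptions.

```python
import asyncio


async def with_deadline(coro, timeout_s: float, default: dict) -> dict:
    """Await coro, but fall back to default if it exceeds timeout_s."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout_s)
    except asyncio.TimeoutError:
        return default


async def fetch_user_features() -> dict:
    await asyncio.sleep(0)          # simulated fast lookup
    return {"user_txn_count_1h": 3}


async def fetch_device_features() -> dict:
    await asyncio.sleep(1.0)        # simulated slow dependency
    return {"device_risk": 0.9}


async def compute_features(timeout_s: float = 0.1) -> dict:
    # Both lookups run concurrently; each has its own deadline and default
    results = await asyncio.gather(
        with_deadline(fetch_user_features(), timeout_s, {}),
        with_deadline(fetch_device_features(), timeout_s, {"device_risk": None}),
    )
    merged = {}
    for result in results:
        merged.update(result)
    return merged


features = asyncio.run(compute_features())
print(features)
```

Whether a missing feature should default to a neutral value or push the transaction toward review is a risk decision worth raising explicitly.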
8. Advanced Topics
8.1 Graph-based Fraud Detection
import networkx as nx


class GraphFraudDetector:
    """Graph-based fraud detection using transaction networks"""

    def __init__(self):
        self.graph = nx.DiGraph()

    def build_transaction_graph(self, transactions):
        """Build graph from transaction data"""
        for txn in transactions:
            # Add nodes
            self.graph.add_node(txn.user_id, type='user')
            self.graph.add_node(txn.merchant_id, type='merchant')
            # Add edges with features. A DiGraph keeps one edge per (user, merchant)
            # pair, so a repeat transaction overwrites these attributes.
            self.graph.add_edge(
                txn.user_id, txn.merchant_id,
                amount=txn.amount,
                timestamp=txn.timestamp,
                # is_fraud comes from the joined FraudLabel, not from Transaction itself
                is_fraud=getattr(txn, 'is_fraud', False)
            )
            # Link users to devices so a shared device connects otherwise separate users
            if txn.device_fingerprint:
                self.graph.add_node(txn.device_fingerprint, type='device')
                self.graph.add_edge(txn.user_id, txn.device_fingerprint)

    def detect_fraud_communities(self):
        """Detect fraud rings using community detection"""
        # Every edge points away from a user, so the graph has no directed cycles and
        # each strongly connected component would be a single node. Weakly connected
        # components ignore direction and group entities sharing merchants or devices.
        communities = list(nx.weakly_connected_components(self.graph))
        fraud_communities = []
        for community in communities:
            # Check if community has high fraud concentration
            fraud_rate = self.compute_community_fraud_rate(community)
            if fraud_rate > 0.3:  # Threshold
                fraud_communities.append({
                    'nodes': community,
                    'fraud_rate': fraud_rate,
                    'size': len(community)
                })
        return fraud_communities

    def compute_node_features(self, node_id):
        """Compute graph-based features for a node"""
        node = self.graph.nodes[node_id]
        # Degree centrality
        in_degree = self.graph.in_degree(node_id)
        out_degree = self.graph.out_degree(node_id)
        # Personalized PageRank (recomputed over the whole graph on every call)
        pagerank = nx.pagerank(self.graph, personalization={node_id: 1})
        # Triangle count (for clustering). With only user-to-merchant and
        # user-to-device edges the graph is bipartite, so this is always 0.
        triangles = nx.triangles(self.graph.to_undirected(), node_id)
        return {
            'in_degree': in_degree,
            'out_degree': out_degree,
            'pagerank': pagerank[node_id],
            'triangle_count': triangles,
            'node_type': node['type']
        }
8.2 Real-time Anomaly Detection
class RealTimeAnomalyDetector:
    """Detect anomalies in real-time using streaming algorithms"""

    def __init__(self):
        # Count-Min Sketch for frequency estimation
        self.count_min = CountMinSketch(width=1000000, depth=10)
        # HyperLogLog for cardinality estimation
        self.hll = HyperLogLog(error_rate=0.01)
        # Sliding window for recent patterns
        self.window_size = 3600  # 1 hour
        self.window = SlidingWindow(self.window_size)

    def update(self, transaction):
        """Update anomaly detection with new transaction"""
        # Update frequency sketch
        self.count_min.update(transaction.user_id)
        self.count_min.update(transaction.merchant_id)
        self.count_min.update(transaction.card_last_four)
        # Update cardinality estimator
        self.hll.add(transaction.user_id)
        # Add to sliding window
        self.window.add(transaction)
        # Check for anomalies
        anomalies = self.detect_anomalies(transaction)
        return anomalies

    def detect_anomalies(self, transaction):
        """Detect anomalies based on current state"""
        anomalies = []

        # Check frequency anomaly
        user_freq = self.count_min.estimate(transaction.user_id)
        if user_freq > self.get_frequency_threshold():
            anomalies.append({
                'type': 'HIGH_FREQUENCY',
                'entity': transaction.user_id,
                'frequency': user_freq,
                'threshold': self.get_frequency_threshold()
            })

        # Check velocity anomaly
        recent_count = self.window.count_entity(
            transaction.user_id,
            window_seconds=300  # 5 minutes
        )
        if recent_count > self.get_velocity_threshold():
            anomalies.append({
                'type': 'VELOCITY_ANOMALY',
                'entity': transaction.user_id,
                'count': recent_count,
                'window': '5 minutes'
            })
        return anomalies
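The detector above refers to a `CountMinSketch` class without defining it. A minimal sketch of one possible implementation follows, assuming string keys and using salted BLAKE2b hashes for the rows. It is meant to show the data structure's behavior, not to stand in for a tuned library; the small `width` and `depth` values are chosen only to keep the example light.

```python
import hashlib


class CountMinSketch:
    """Approximate frequency counter that may overestimate but never underestimates."""

    def __init__(self, width: int, depth: int):
        self.width = width
        self.depth = depth
        self.table = [[0] * width for _ in range(depth)]

    def _indexes(self, item: str):
        # One independent hash per row, derived by salting with the row number
        for row in range(self.depth):
            digest = hashlib.blake2b(
                item.encode("utf-8"),
                digest_size=8,
                salt=row.to_bytes(8, "big"),
            ).digest()
            yield row, int.from_bytes(digest, "big") % self.width

    def update(self, item: str, count: int = 1) -> None:
        for row, col in self._indexes(item):
            self.table[row][col] += count

    def estimate(self, item: str) -> int:
        # Collisions only ever add to a cell, so the minimum across rows is
        # the tightest upper bound on the true count
        return min(self.table[row][col] for row, col in self._indexes(item))


sketch = CountMinSketch(width=1024, depth=4)
for _ in range(5):
    sketch.update("user_1")
sketch.update("user_2")
print(sketch.estimate("user_1"), sketch.estimate("user_2"))
```

Because counts only grow, a long-running sketch reports lifetime frequency. Pairing it with periodic resets or per-window sketches is one way to keep the estimates meaningful for velocity checks.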
8.3 Explainable AI for Compliance
import lime.lime_tabular
import shap


class FraudExplainer:
    """Generate explanations for fraud decisions"""

    def __init__(self, model, training_data, feature_names):
        self.model = model
        self.training_data = training_data
        self.feature_names = feature_names
        self.shap_explainer = shap.TreeExplainer(self.model)
        self.lime_explainer = lime.lime_tabular.LimeTabularExplainer(
            training_data=self.training_data,
            feature_names=self.feature_names,
            class_names=['Legitimate', 'Fraud']
        )

    def explain_prediction(self, transaction_features):
        """Generate explanation for a prediction (one-row pandas DataFrame)"""
        # SHAP explanation
        shap_values = self.shap_explainer.shap_values(transaction_features)
        # Some SHAP versions return one array per class for binary classifiers;
        # in that case keep the fraud (positive) class.
        if isinstance(shap_values, list):
            shap_values = shap_values[1]

        # Get top contributing features
        feature_importance = list(zip(
            self.feature_names,
            shap_values[0]
        ))
        feature_importance.sort(key=lambda x: abs(x[1]), reverse=True)

        # Generate human-readable explanation
        explanation = self.generate_narrative(
            transaction_features.iloc[0],
            feature_importance[:5]  # Top 5 features
        )
        return {
            'shap_values': shap_values,
            'top_features': feature_importance[:5],
            'narrative': explanation,
            # compute_confidence is a helper not shown here
            'confidence': self.compute_confidence(shap_values)
        }

    def generate_narrative(self, features, top_features):
        """Generate human-readable explanation"""
        narrative_parts = []
        for feature_name, importance in top_features:
            if feature_name == 'is_unusual_country' and features[feature_name]:
                narrative_parts.append(
                    "Transaction from unusual country for this user"
                )
            elif feature_name == 'amount_normalized' and features[feature_name] > 3:
                narrative_parts.append(
                    f"Transaction amount ({features['amount']:.2f}) is "
                    f"{features['amount_normalized']:.1f}x higher than average"
                )
            elif feature_name == 'user_txn_count_1h' and features[feature_name] > 10:
                narrative_parts.append(
                    f"User has made {features['user_txn_count_1h']} transactions "
                    f"in the last hour (unusual frequency)"
                )
        if not narrative_parts:
            return "No dominant risk factors identified"
        return "Primary risk factors: " + "; ".join(narrative_parts)
Explainability Requirements: For financial fraud detection:
- Provide both global and local explanations
- Use multiple explanation methods (SHAP, LIME, rule-based)
- Ensure explanations are auditable and logged
- Allow analysts to provide feedback on explanation quality
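The requirement that explanations be auditable and logged can be sketched as an immutable decision record with a checksum. The field names below are assumptions for illustration, and `build_audit_record` and `verify_audit_record` are hypothetical helpers. A checksum by itself only detects accidental or naive modification; stronger guarantees would need append-only storage or checksums kept in a separate system.

```python
import hashlib
import json
from datetime import datetime, timezone


def _canonical(body: dict) -> bytes:
    """Stable JSON encoding so the same record always hashes the same way."""
    return json.dumps(body, sort_keys=True, separators=(",", ":")).encode("utf-8")


def build_audit_record(transaction_id, decision, fraud_score, reasons,
                       model_version, decided_at=None):
    """Assemble a decision record and attach a SHA-256 checksum of its contents."""
    decided_at = decided_at or datetime.now(timezone.utc)
    record = {
        "transaction_id": transaction_id,
        "decision": decision,
        "fraud_score": fraud_score,
        "reasons": list(reasons),
        "model_version": model_version,
        "decided_at": decided_at.isoformat(),
    }
    record["checksum"] = hashlib.sha256(_canonical(record)).hexdigest()
    return record


def verify_audit_record(record: dict) -> bool:
    """Recompute the checksum over everything except the checksum field."""
    body = {k: v for k, v in record.items() if k != "checksum"}
    return hashlib.sha256(_canonical(body)).hexdigest() == record["checksum"]


record = build_audit_record(
    "txn_123", "DECLINE", 0.93,
    ["Impossible travel detected"], "v2.1.3",
    decided_at=datetime(2024, 1, 1, tzinfo=timezone.utc),
)
tampered = dict(record, decision="APPROVE")
print(verify_audit_record(record), verify_audit_record(tampered))
```

Recording the model version alongside the reasons makes it possible to reproduce a decision later. Card numbers and other sensitive fields should stay out of such logs to remain within PCI DSS scope limits.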
9. Implementation Roadmap
Phase 1: Rule-based System (Weeks 1-2)
- Implement basic rule engine
- Set up transaction ingestion pipeline
- Create basic monitoring dashboard
- Establish baseline metrics
Phase 2: ML Models (Weeks 3-6)
- Feature engineering pipeline
- Train initial GBDT model
- Implement model serving
- Set up A/B testing framework
Phase 3: Advanced Detection (Weeks 7-10)
- Graph-based fraud detection
- Real-time anomaly detection
- Multi-model ensemble
- Advanced explainability
Phase 4: Optimization (Weeks 11-14)
- Latency optimization
- Cost optimization
- Advanced monitoring
- Feedback loop implementation
10. Summary and Key Takeaways
Architecture Recap
- Multi-layered detection: Rules + ML + Graph analysis
- Real-time feature computation: Streaming features with low latency
- Ensemble approach: Multiple models for different fraud types
- Explainable decisions: All decisions must be auditable
Key Metrics
- Model Performance: Precision, Recall, F1, PR-AUC
- Business Impact: Fraud rate, chargeback rate, false positive rate
- Operational: Latency, throughput, error rate
Common Interview Mistakes
- Ignoring class imbalance (99.9% legitimate transactions)
- Not discussing explainability requirements
- Forgetting about feedback loops from analyst decisions
- Not considering latency requirements for real-time decisions
- Ignoring concept drift and model degradation
Final Interview Tip: Emphasize the business impact of fraud detection decisions. Discuss the trade-off between catching fraud and customer friction. Show understanding of both ML techniques and production requirements for financial systems.