Interview Question (Hard) — Asked at: Google, Microsoft, Amazon, Apple, Meta
"Design an ML governance framework that ensures model fairness, explainability, and compliance with regulations like GDPR and AI Act. How do you implement model cards and audit trails?"
ML Governance Architecture
ML governance ensures responsible AI development and deployment through policies, procedures, and technical controls.
Governance Architecture Diagram
┌───────────────────────────────────────────────────────────────────┐
│                      ML Governance Framework                      │
├───────────────────────────────────────────────────────────────────┤
│                                                                   │
│ ┌────────────┐   ┌────────────┐   ┌────────────┐   ┌────────────┐ │
│ │ Policies   │──▶│ Controls   │──▶│ Audit      │──▶│ Reporting  │ │
│ │ & Standards│   │ & Checks   │   │ Trails     │   │& Dashboards│ │
│ └────────────┘   └────────────┘   └────────────┘   └────────────┘ │
│       │                │                │                │        │
│       ▼                ▼                ▼                ▼        │
│ ┌────────────┐   ┌────────────┐   ┌────────────┐   ┌────────────┐ │
│ │ Model      │   │ Fairness   │   │ Explain-   │   │ Regulatory │ │
│ │ Registry   │   │ & Bias     │   │ ability    │   │ Compliance │ │
│ │            │   │ Detection  │   │ Engine     │   │            │ │
│ └────────────┘   └────────────┘   └────────────┘   └────────────┘ │
└───────────────────────────────────────────────────────────────────┘
Model Cards
Automated Model Card Generation
import json
from datetime import datetime
from typing import Dict, List, Optional
import pandas as pd
import numpy as np
class ModelCardGenerator:
    """Generate model cards (structured model documentation) for governance.

    A card is a plain ``dict`` assembled by :meth:`generate_model_card` and
    rendered to JSON or markdown by :meth:`export_model_card`.  Most narrative
    sections come from fixed template text returned by the ``_get_*`` helpers;
    only the training-data statistics, metrics and fairness sections are
    derived from the arguments.
    """

    def __init__(self, model_name: str, model_version: str,
                 model_owner: str, team: str):
        """Store the identifying metadata copied into every generated card."""
        self.model_name = model_name
        self.model_version = model_version
        self.model_owner = model_owner
        self.team = team

    def generate_model_card(self,
                            model_description: str,
                            training_data: pd.DataFrame,
                            evaluation_results: Dict,
                            fairness_results: Optional[Dict] = None,
                            limitations: Optional[List[str]] = None,
                            ethical_considerations: Optional[List[str]] = None,
                            intended_uses: Optional[List[str]] = None,
                            out_of_scope_uses: Optional[List[str]] = None) -> Dict:
        """Assemble the complete model card.

        Args:
            model_description: Free-text description of the model.
            training_data: Training frame; used for size, per-feature
                statistics and automatic data-limitation checks.
            evaluation_results: Metric name -> value mapping.
            fairness_results: Optional fairness summary.  Either
                ``{'disparities': {attr: value}}`` or
                ``{attr: {'disparity': value}}`` is understood.
            limitations: Overrides the default limitation list when truthy.
            ethical_considerations: Overrides the default list when truthy.
            intended_uses: Overrides the default primary use cases when truthy.
            out_of_scope_uses: Overrides the default out-of-scope list when
                truthy.

        Returns:
            The model card as a nested dict.
        """
        # One timestamp for both fields so a freshly created card never shows
        # "last_updated" later than "created_at".
        generated_at = datetime.now().isoformat()

        model_card = {
            'model_details': {
                'name': self.model_name,
                'version': self.model_version,
                'type': self._get_model_type(),
                'owner': self.model_owner,
                'team': self.team,
                'description': model_description,
                'created_at': generated_at,
                'last_updated': generated_at,
                'license': 'Proprietary',
                'contact': f"{self.team}@company.com"
            },
            'intended_use': {
                'primary_use_cases': intended_uses or [
                    'Production inference for real-time predictions',
                    'Batch processing for offline analysis'
                ],
                'out_of_scope_uses': out_of_scope_uses or [
                    'Life-critical decisions without human oversight',
                    'Surveillance or profiling of individuals',
                    'Any use violating applicable laws or regulations'
                ],
                'users': self._get_intended_users(),
                'deployment_context': self._get_deployment_context()
            },
            'training_data': {
                'description': self._get_data_description(training_data),
                'size': len(training_data),
                'features': self._get_feature_description(training_data),
                'preprocessing_steps': self._get_preprocessing_steps(),
                'data_collection_process': self._get_data_collection_process(),
                'known_limitations': self._get_data_limitations(training_data)
            },
            'evaluation_data': {
                'description': self._get_eval_data_description(),
                'metrics': evaluation_results,
                'evaluation_methodology': self._get_eval_methodology()
            },
            'performance_metrics': evaluation_results,
            'fairness_analysis': fairness_results or {},
            'limitations': limitations or self._get_default_limitations(),
            'ethical_considerations': (
                ethical_considerations or self._get_ethical_considerations()
            ),
            'recommendations': self._generate_recommendations(
                evaluation_results, fairness_results
            ),
            'governance': {
                'review_status': 'pending',
                'last_review_date': None,
                'next_review_date': None,
                'approver': None,
                'audit_trail': []
            }
        }
        return model_card

    def _get_model_type(self) -> str:
        """Return the model type label (fixed template value)."""
        return "Classification Model"

    def _get_intended_users(self) -> List[str]:
        """Return the template list of intended users."""
        return [
            "ML Engineers for model deployment",
            "Data Scientists for model analysis",
            "Product Managers for feature integration",
            "Compliance Officers for audit"
        ]

    def _get_deployment_context(self) -> str:
        """Return the template deployment-context paragraph."""
        return (
            "\n"
            "Model deployed in production environment for real-time inference.\n"
            "Serves predictions via REST API with <100ms latency requirement.\n"
            "Used for automated decision-making with human oversight.\n"
        )

    def _get_data_description(self, data: pd.DataFrame) -> str:
        """Describe the training data; only the row/column counts are computed."""
        return (
            "\n"
            f"Training dataset with {len(data)} samples and {len(data.columns)} features.\n"
            "Data collected from production systems over the last 90 days.\n"
            "Features include numerical and categorical variables.\n"
            "All data anonymized and compliant with privacy regulations.\n"
        )

    def _get_feature_description(self, data: pd.DataFrame) -> List[Dict]:
        """Return one summary dict per column of ``data``.

        Every column gets name, dtype, missing rate and distinct-value count;
        numeric (non-boolean) columns additionally get mean/std/min/max.
        """
        features = []
        for col in data.columns:
            series = data[col]
            feature_info = {
                'name': col,
                'type': str(series.dtype),
                'description': f"Feature {col}",
                'missing_rate': float(series.isna().mean()),
                'unique_values': int(series.nunique())
            }
            # Any numeric width qualifies (int32, float32, nullable Int64, ...),
            # not just int64/float64.  Booleans are left out, as before.
            is_numeric = (
                pd.api.types.is_numeric_dtype(series)
                and not pd.api.types.is_bool_dtype(series)
            )
            if is_numeric:
                feature_info['statistics'] = {
                    'mean': float(series.mean()),
                    'std': float(series.std()),
                    'min': float(series.min()),
                    'max': float(series.max())
                }
            features.append(feature_info)
        return features

    def _get_preprocessing_steps(self) -> List[str]:
        """Return the template list of preprocessing steps."""
        return [
            "Missing value imputation using median for numerical, mode for categorical",
            "Categorical encoding using one-hot encoding",
            "Feature scaling using standard scaler",
            "Outlier handling using IQR method"
        ]

    def _get_data_collection_process(self) -> str:
        """Return the template data-collection paragraph."""
        return (
            "\n"
            "Data collected from production user interactions with explicit consent.\n"
            "All data anonymized and de-identified before use.\n"
            "No personally identifiable information (PII) included in training data.\n"
            "Data retention policy: 90 days for raw data, indefinite for aggregated features.\n"
        )

    def _get_data_limitations(self, data: pd.DataFrame) -> List[str]:
        """Detect class imbalance (via a ``label`` column) and sparse columns."""
        limitations = []

        if 'label' in data.columns:
            class_counts = data['label'].value_counts()
            if len(class_counts) < 2:
                # min/max would be 1.0 here and hide the most extreme imbalance.
                limitations.append(
                    "Label column contains fewer than two classes"
                )
            else:
                imbalance_ratio = class_counts.min() / class_counts.max()
                if imbalance_ratio < 0.5:
                    limitations.append(
                        f"Class imbalance detected: ratio {imbalance_ratio:.2f}"
                    )

        # Columns with more than 10% missing values.
        missing_rates = data.isna().mean()
        high_missing = missing_rates[missing_rates > 0.1]
        if not high_missing.empty:
            limitations.append(
                f"High missing values in: {list(high_missing.index)}"
            )
        return limitations

    def _get_eval_data_description(self) -> str:
        """Return the template evaluation-data paragraph."""
        return (
            "\n"
            "Evaluation performed on held-out test set representing\n"
            "recent production data distribution. Test set contains\n"
            "diverse samples across all demographic groups.\n"
        )

    def _get_eval_methodology(self) -> str:
        """Return the template evaluation-methodology paragraph."""
        return (
            "\n"
            "Evaluation metrics computed on stratified test set.\n"
            "Cross-validation with 5 folds used for robust estimation.\n"
            "Statistical significance testing performed for metric comparisons.\n"
            "Fairness metrics computed across protected attributes.\n"
        )

    def _get_default_limitations(self) -> List[str]:
        """Return the limitation list used when the caller supplies none."""
        return [
            "Model trained on historical data, may not generalize to new patterns",
            "Performance may degrade on underrepresented groups",
            "Not suitable for extreme edge cases outside training distribution",
            "Requires periodic retraining to maintain performance"
        ]

    def _get_ethical_considerations(self) -> List[str]:
        """Return the ethical considerations used when the caller supplies none."""
        return [
            "Model reviewed for fairness across protected attributes",
            "Regular bias audits scheduled quarterly",
            "Human oversight required for high-stakes decisions",
            "Transparency provided through explainability tools",
            "Right to explanation available for affected individuals"
        ]

    @staticmethod
    def _collect_disparities(fairness_results: Optional[Dict]) -> Dict:
        """Flatten both accepted fairness shapes into ``{attr: disparity}``."""
        collected = {}
        for key, value in (fairness_results or {}).items():
            if not isinstance(value, dict):
                continue
            if key == 'disparities':
                collected.update(value)
            elif 'disparity' in value:
                collected[key] = value['disparity']
        return collected

    def _generate_recommendations(self, metrics: Dict,
                                  fairness_results: Dict) -> List[str]:
        """Build the recommendation list.

        Adds a tuning hint when ``auc_roc`` is missing or below 0.9, one entry
        per attribute whose disparity exceeds 0.1, then four standing
        recommendations.
        """
        recommendations = []
        if metrics.get('auc_roc', 0) < 0.9:
            recommendations.append(
                "Consider feature engineering or model tuning to improve performance"
            )

        for attr, disparity in self._collect_disparities(fairness_results).items():
            is_number = isinstance(disparity, (int, float, np.number))
            if is_number and disparity > 0.1:
                recommendations.append(
                    f"Address fairness disparity in {attr}: {disparity:.4f}"
                )

        recommendations.extend([
            "Monitor model performance in production",
            "Schedule regular retraining (quarterly recommended)",
            "Conduct periodic bias audits",
            "Maintain audit trail for regulatory compliance"
        ])
        return recommendations

    def export_model_card(self, model_card: Dict, format: str = 'json') -> str:
        """Serialise a card as ``'json'`` or ``'markdown'``.

        Raises:
            ValueError: If ``format`` is neither of the supported values.
        """
        if format == 'json':
            # default=str stringifies values json cannot encode natively.
            return json.dumps(model_card, indent=2, default=str)
        if format == 'markdown':
            return self._to_markdown(model_card)
        raise ValueError(f"Unsupported format: {format}")

    def _to_markdown(self, card: Dict) -> str:
        """Render the card as a markdown document."""
        details = card['model_details']
        intended = card['intended_use']

        def bullets(items) -> str:
            return "\n".join(f"- {item}" for item in items)

        lines = [
            "",
            f"# Model Card: {details['name']}",
            f"**Version:** {details['version']}",
            f"**Owner:** {details['owner']}",
            f"**Team:** {details['team']}",
            f"**Description:** {details['description']}",
            "## Intended Use",
            "### Primary Use Cases",
            bullets(intended['primary_use_cases']),
            "### Out-of-Scope Uses",
            bullets(intended['out_of_scope_uses']),
            "## Training Data",
            f"{card['training_data']['description']}",
            f"**Size:** {card['training_data']['size']} samples",
            "## Performance Metrics",
            self._format_metrics(card['performance_metrics']),
            "## Fairness Analysis",
            self._format_fairness(card['fairness_analysis']),
            "## Limitations",
            bullets(card['limitations']),
            "## Ethical Considerations",
            bullets(card['ethical_considerations']),
            "## Recommendations",
            bullets(card['recommendations']),
            "",
        ]
        return "\n".join(lines)

    def _format_metrics(self, metrics: Dict) -> str:
        """Render metrics as markdown bullets; floats get four decimals."""
        lines = []
        for metric, value in metrics.items():
            if isinstance(value, (float, np.floating)):
                lines.append(f"- **{metric}:** {value:.4f}")
            else:
                lines.append(f"- **{metric}:** {value}")
        return "\n".join(lines)

    def _format_fairness(self, fairness: Dict) -> str:
        """Render fairness results as markdown bullets.

        Understands the same two shapes as ``_generate_recommendations`` so the
        markdown and the recommendations always describe the same attributes.
        """
        if not fairness:
            return "No fairness analysis performed."

        lines = []
        for attr, result in fairness.items():
            if not isinstance(result, dict):
                continue
            if attr == 'disparities':
                lines.extend(
                    f"- **{name}:** disparity={value}"
                    for name, value in result.items()
                )
            else:
                lines.append(
                    f"- **{attr}:** disparity={result.get('disparity', 'N/A')}"
                )
        return "\n".join(lines) if lines else "No disparities detected."
ℹ️
Model cards provide transparency about model capabilities, limitations, and intended use. Automate model card generation to ensure consistency and completeness across all models.
Explainability
SHAP-Based Explainability
import shap
import numpy as np
import pandas as pd
from typing import Dict, List, Optional
import json
class ModelExplainer:
    """Explain model predictions with SHAP values.

    The explainer is chosen once at construction time and stored on
    ``self.explainer``.  For classifiers, per-class SHAP output is reduced to
    the class at index 1 (the positive class in binary classification).
    """

    def __init__(self, model, X_background: pd.DataFrame):
        """Keep the model and background data and build the SHAP explainer.

        Args:
            model: Fitted estimator exposing ``predict`` (and optionally
                ``predict_proba``).
            X_background: Reference data; sampled for ``KernelExplainer`` and
                used by :meth:`get_global_feature_importance`.
        """
        self.model = model
        self.X_background = X_background
        self.explainer = self._build_explainer(model, X_background)

    @staticmethod
    def _build_explainer(model, X_background: pd.DataFrame):
        """Pick a SHAP explainer for ``model``.

        Having ``predict_proba`` does not make a model tree-based, so the tree
        explainer is attempted and the model-agnostic kernel explainer is used
        when SHAP rejects the model.
        """
        if not hasattr(model, 'predict_proba'):
            return shap.KernelExplainer(
                model.predict,
                shap.sample(X_background, 100)
            )
        try:
            return shap.TreeExplainer(model)
        except Exception:
            # NOTE(review): the exception type SHAP raises for unsupported
            # models appears to differ between releases, hence the broad
            # catch; narrow it once the minimum SHAP version is pinned.
            return shap.KernelExplainer(
                model.predict_proba,
                shap.sample(X_background, 100)
            )

    @staticmethod
    def _positive_class(values, single_output_ndim: int):
        """Reduce per-class SHAP output to the class at index 1.

        Args:
            values: Either a list with one array per class, or an ndarray that
                may carry a trailing class axis.
            single_output_ndim: Rank of a single-output result (2 for SHAP
                values, 3 for interaction values).  An array with exactly one
                more axis is treated as having a trailing class axis.
        """
        if isinstance(values, list):
            return np.asarray(values[1])
        values = np.asarray(values)
        if values.ndim == single_output_ndim + 1:
            class_index = 1 if values.shape[-1] > 1 else 0
            return values[..., class_index]
        return values

    @staticmethod
    def _rank_by_magnitude(columns, contributions) -> List:
        """Return ``(feature, contribution)`` pairs, largest magnitude first."""
        return sorted(
            zip(columns, contributions),
            key=lambda pair: abs(pair[1]),
            reverse=True
        )

    def _base_value(self) -> float:
        """Return the explainer's expected value as a float.

        Scalars, lists and arrays are all accepted; when several values are
        present (one per class) the one at index 1 is used.
        """
        expected = np.atleast_1d(
            np.asarray(self.explainer.expected_value, dtype=float)
        )
        return float(expected[1] if expected.size > 1 else expected[0])

    def explain_prediction(self, X_instance: pd.DataFrame) -> Dict:
        """Explain the first row of ``X_instance``.

        Returns:
            Dict with the model prediction, the SHAP base value, the ten
            largest-magnitude feature contributions and the sum of all
            contributions.
        """
        shap_values = self._positive_class(
            self.explainer.shap_values(X_instance), 2
        )
        row = shap_values[0]
        ranked = self._rank_by_magnitude(X_instance.columns, row)
        prediction = self.model.predict(X_instance)[0]

        return {
            'prediction': float(prediction),
            'base_value': self._base_value(),
            'feature_contributions': [
                {
                    'feature': feat,
                    'contribution': float(contrib),
                    # float() means feature values must be numeric here.
                    'value': float(X_instance[feat].values[0]),
                    'direction': 'positive' if contrib > 0 else 'negative'
                }
                for feat, contrib in ranked[:10]
            ],
            'total_contribution': float(sum(row))
        }

    def explain_batch(self, X_batch: pd.DataFrame) -> List[Dict]:
        """Return the five largest-magnitude contributions for every row."""
        shap_values = self._positive_class(
            self.explainer.shap_values(X_batch), 2
        )
        explanations = []
        for index, row in enumerate(shap_values):
            ranked = self._rank_by_magnitude(X_batch.columns, row)
            explanations.append({
                'instance_index': index,
                'feature_contributions': [
                    {'feature': feat, 'contribution': float(contrib)}
                    for feat, contrib in ranked[:5]
                ]
            })
        return explanations

    def get_global_feature_importance(self) -> Dict:
        """Rank features by mean absolute SHAP value over the background data."""
        shap_values = self._positive_class(
            self.explainer.shap_values(self.X_background), 2
        )
        mean_shap = np.abs(shap_values).mean(axis=0)
        ranked = sorted(
            zip(self.X_background.columns, mean_shap),
            key=lambda pair: pair[1],
            reverse=True
        )
        return {
            'global_importance': [
                {'feature': feat, 'importance': float(imp)}
                for feat, imp in ranked
            ],
            'total_importance': float(mean_shap.sum())
        }

    def detect_feature_interactions(self, X_instance: pd.DataFrame,
                                    threshold: float = 0.01) -> Dict:
        """Report pairwise SHAP interactions for the first row of ``X_instance``.

        Args:
            X_instance: Frame whose first row is analysed.
            threshold: Minimum absolute interaction value to report.

        Returns:
            Dict with the ten strongest interactions and the total number of
            pairs above ``threshold``.

        NOTE(review): this relies on the explainer exposing
        ``shap_interaction_values``; confirm which explainers do before
        calling it on a kernel-explained model.
        """
        shap_interaction = self._positive_class(
            self.explainer.shap_interaction_values(X_instance), 3
        )
        columns = X_instance.columns
        n_features = len(columns)

        interactions = []
        # Only the upper triangle: (i, j) and (j, i) describe the same pair.
        for i in range(n_features):
            for j in range(i + 1, n_features):
                interaction_value = shap_interaction[0, i, j]
                if abs(interaction_value) > threshold:
                    interactions.append({
                        'feature_1': columns[i],
                        'feature_2': columns[j],
                        'interaction_strength': float(abs(interaction_value)),
                        'interaction_value': float(interaction_value)
                    })

        interactions.sort(key=lambda x: x['interaction_strength'], reverse=True)
        return {
            'interactions': interactions[:10],
            'total_interactions': len(interactions)
        }
class LIMEExplainer:
    """Explain individual classifier predictions with LIME."""

    def __init__(self, model, X_train: pd.DataFrame):
        """Build a tabular LIME explainer over the training data.

        Args:
            model: Classifier exposing ``predict`` and ``predict_proba``.
            X_train: Training frame; its values and column names seed the
                LIME explainer.
        """
        # Imported here so the module loads without lime installed.
        from lime.lime_tabular import LimeTabularExplainer

        self.model = model
        column_names = list(X_train.columns)
        self.explainer = LimeTabularExplainer(
            X_train.values,
            feature_names=column_names,
            class_names=['negative', 'positive'],
            mode='classification'
        )

    def explain_prediction(self, X_instance: pd.DataFrame) -> Dict:
        """Explain the first row of ``X_instance``.

        Returns:
            Dict with the predicted label, the probability at class index 1,
            the LIME feature weights (up to ten), and the local surrogate's
            intercept and prediction, both read at key/index 1.
        """
        first_row = X_instance.values[0]
        lime_result = self.explainer.explain_instance(
            first_row,
            self.model.predict_proba,
            num_features=10
        )

        predicted_label = int(self.model.predict(X_instance)[0])
        positive_probability = float(self.model.predict_proba(X_instance)[0][1])

        weights = []
        for feat, weight in lime_result.as_list():
            weights.append({'feature': feat, 'weight': float(weight)})

        surrogate_intercept = float(lime_result.intercept[1])
        surrogate_prediction = float(lime_result.local_pred[1])

        return {
            'prediction': predicted_label,
            'probability': positive_probability,
            'feature_importance': weights,
            'intercept': surrogate_intercept,
            'local_pred': surrogate_prediction
        }
Fairness and Bias Detection
Fairness Metrics Implementation
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple
from scipy import stats
class FairnessAuditor:
    """Audit binary-classifier fairness across protected attributes.

    Labels and predictions are expected to be encoded as 0/1: the confusion
    counts compare against those two literals.
    """

    def __init__(self, protected_attributes: List[str],
                 privileged_groups: Dict[str, object] = None,
                 min_group_size: int = 50):
        """
        Args:
            protected_attributes: List of protected attribute column names
            privileged_groups: Mapping of attribute to privileged value
            min_group_size: Groups with fewer rows than this are left out of
                the audit.
        """
        self.protected_attributes = protected_attributes
        self.privileged_groups = privileged_groups or {}
        self.min_group_size = min_group_size

    def compute_fairness_metrics(self, y_true: np.ndarray,
                                 y_pred: np.ndarray,
                                 sensitive_features: pd.DataFrame) -> Dict:
        """Compute per-group and cross-group fairness metrics.

        Args:
            y_true: Ground-truth labels, positionally aligned with
                ``sensitive_features``.
            y_pred: Predicted labels, same alignment.
            sensitive_features: Frame holding the protected-attribute columns.

        Returns:
            ``{attr: {'group_metrics', 'fairness_metrics', 'privileged_group'}}``
            for every configured attribute present in ``sensitive_features``
            that has at least one non-missing value.
        """
        # Positional arrays: a pandas Series with its own index would otherwise
        # be aligned by label rather than by row position.
        y_true = np.asarray(y_true)
        y_pred = np.asarray(y_pred)

        results = {}
        for attr in self.protected_attributes:
            if attr not in sensitive_features.columns:
                continue
            column = sensitive_features[attr]
            # Missing values never equal themselves, so they cannot form a group.
            groups = column.dropna().unique()
            if len(groups) == 0:
                continue

            # Without an explicit mapping, the first observed value is used.
            privileged_value = self.privileged_groups.get(attr, groups[0])

            group_metrics = {}
            for group in groups:
                mask = (column == group).to_numpy()
                if mask.sum() < self.min_group_size:
                    continue
                group_metrics[group] = self._compute_group_metrics(
                    y_true[mask], y_pred[mask]
                )

            results[attr] = {
                'group_metrics': group_metrics,
                'fairness_metrics': self._compute_fairness_metrics(
                    group_metrics, privileged_value
                ),
                'privileged_group': privileged_value
            }
        return results

    def _compute_group_metrics(self, y_true: np.ndarray,
                               y_pred: np.ndarray) -> Dict:
        """Compute metrics for a single group.

        Size, base rate, prediction rate and accuracy are always present.  The
        confusion-matrix metrics are only added when both classes occur in
        ``y_true``.  ``auc_roc`` is derived from hard labels, so it equals
        ``(TPR + TNR) / 2``, and is only added when ``y_pred`` also contains
        both classes.
        """
        y_true = np.asarray(y_true)
        y_pred = np.asarray(y_pred)

        metrics = {
            'size': len(y_true),
            'positive_rate': float(np.mean(y_true)),
            'prediction_rate': float(np.mean(y_pred)),
            'accuracy': float(np.mean(y_true == y_pred)),
        }
        if len(np.unique(y_true)) != 2:
            return metrics

        tp = int(np.sum((y_true == 1) & (y_pred == 1)))
        fp = int(np.sum((y_true == 0) & (y_pred == 1)))
        fn = int(np.sum((y_true == 1) & (y_pred == 0)))
        tn = int(np.sum((y_true == 0) & (y_pred == 0)))

        def ratio(numerator: int, denominator: int) -> float:
            # Undefined ratios are reported as 0.0.
            return numerator / denominator if denominator > 0 else 0.0

        tpr = ratio(tp, tp + fn)
        fpr = ratio(fp, fp + tn)
        tnr = ratio(tn, tn + fp)
        fnr = ratio(fn, fn + tp)

        metrics['precision'] = ratio(tp, tp + fp)
        metrics['recall'] = tpr
        metrics['f1_score'] = ratio(2 * tp, 2 * tp + fp + fn)
        if len(np.unique(y_pred)) == 2:
            metrics['auc_roc'] = (tpr + tnr) / 2
        metrics['true_positive_rate'] = tpr
        metrics['false_positive_rate'] = fpr
        metrics['true_negative_rate'] = tnr
        metrics['false_negative_rate'] = fnr
        return metrics

    @staticmethod
    def _reported(group_metrics: Dict, key: str) -> List[float]:
        """Return ``key`` for every group that actually reports it."""
        return [m[key] for m in group_metrics.values() if key in m]

    def _compute_fairness_metrics(self, group_metrics: Dict,
                                  privileged_group: object) -> Dict:
        """Compute cross-group gaps from per-group metrics.

        Each gap only uses groups that report the underlying metric.  A group
        without, say, a true-positive rate is excluded instead of being counted
        as 0, which would fabricate a maximal gap.  A gap is emitted only when
        at least two groups contribute.
        """
        fairness = {}

        # Demographic parity: spread of positive-prediction rates.
        rates = self._reported(group_metrics, 'prediction_rate')
        if len(rates) > 1:
            max_rate, min_rate = max(rates), min(rates)
            fairness['demographic_parity_difference'] = max_rate - min_rate
            fairness['demographic_parity_ratio'] = (
                min_rate / max_rate if max_rate > 0 else 0
            )

        # Equal opportunity: spread of true-positive rates.
        tprs = self._reported(group_metrics, 'true_positive_rate')
        if len(tprs) > 1:
            fairness['equal_opportunity_difference'] = max(tprs) - min(tprs)

        # Equalized odds: mean of the TPR and FPR spreads.
        fprs = self._reported(group_metrics, 'false_positive_rate')
        if len(fprs) > 1 and len(tprs) > 1:
            tpr_diff = max(tprs) - min(tprs)
            fpr_diff = max(fprs) - min(fprs)
            fairness['equalized_odds_difference'] = (tpr_diff + fpr_diff) / 2

        # Predictive parity: spread of precision.
        precisions = self._reported(group_metrics, 'precision')
        if len(precisions) > 1:
            fairness['predictive_parity_difference'] = (
                max(precisions) - min(precisions)
            )

        # Disparate impact: lowest other-group rate over the privileged rate.
        privileged_rate = group_metrics.get(
            privileged_group, {}
        ).get('prediction_rate', 0)
        other_rates = [
            metrics['prediction_rate']
            for group, metrics in group_metrics.items()
            if group != privileged_group
        ]
        if other_rates and privileged_rate > 0:
            fairness['disparate_impact_ratio'] = min(other_rates) / privileged_rate

        return fairness

    def statistical_parity_test(self, y_pred: np.ndarray,
                                sensitive_features: pd.DataFrame,
                                attr: str,
                                significance_level: float = 0.05) -> Dict:
        """Test for statistical parity with a chi-squared independence test.

        Builds a groups x (positive, negative) contingency table of predictions.
        When every prediction falls in the same class the groups cannot differ,
        so the result is reported as chi2=0, p=1 rather than letting SciPy
        reject the table.

        Returns:
            Dict of JSON-serialisable values describing the test outcome.
        """
        from scipy.stats import chi2_contingency

        y_pred = np.asarray(y_pred)
        column = sensitive_features[attr]

        rows = []
        for group in column.dropna().unique():
            group_preds = y_pred[(column == group).to_numpy()]
            rows.append([
                int(np.sum(group_preds == 1)),
                int(np.sum(group_preds == 0)),
            ])
        contingency = np.array(rows)

        constant_predictions = (
            contingency.ndim == 2
            and contingency.size > 0
            and bool((contingency.sum(axis=1) > 0).all())
            and bool((contingency.sum(axis=0) == 0).any())
        )
        if constant_predictions:
            chi2, p_value, dof = 0.0, 1.0, 0
        else:
            chi2, p_value, dof, _expected = chi2_contingency(contingency)

        return {
            'test': 'chi_squared_parity',
            'attribute': attr,
            'chi2_statistic': float(chi2),
            'p_value': float(p_value),
            'degrees_of_freedom': int(dof),
            # Plain bool: NumPy booleans are not JSON-serialisable.
            'statistically_significant': bool(p_value < significance_level),
            'contingency_table': contingency.tolist()
        }
class BiasMitigator:
    """Mitigate bias in ML models via pre-processing sample weights.

    Only ``'reweighing'`` is implemented.  ``'disparate_impact_remover'`` is
    recognised but raises ``NotImplementedError`` so that a caller never
    trains on unmitigated data while believing mitigation was applied.
    """

    def __init__(self, method: str = 'reweighing'):
        """Record the mitigation method; weights stay ``None`` until ``fit``."""
        self.method = method
        self.weights = None

    def fit(self, X: pd.DataFrame, y: np.ndarray,
            sensitive_features: pd.DataFrame,
            privileged_group: str = None):
        """Fit the configured bias mitigation.

        Args:
            X: Feature frame (not used by reweighing).
            y: Labels, positionally aligned with ``sensitive_features``.
            sensitive_features: One column per protected attribute.
            privileged_group: Accepted for interface compatibility; the
                reweighing computation does not use it.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            ValueError: If ``self.method`` is not a recognised method.
            NotImplementedError: For ``'disparate_impact_remover'``.
        """
        if self.method == 'reweighing':
            self.weights = self._compute_reweighing_weights(
                X, y, sensitive_features, privileged_group
            )
        elif self.method == 'disparate_impact_remover':
            self._fit_disparate_impact_remover(X, sensitive_features)
        else:
            raise ValueError(f"Unsupported method: {self.method}")
        return self

    def _compute_reweighing_weights(self, X, y, sensitive_features,
                                    privileged_group) -> np.ndarray:
        """Compute reweighing weights.

        For each attribute, every (group, label) cell gets the weight
        ``P(group) * P(label) / P(group, label)``, i.e. expected over observed
        frequency under independence.  Cells with no observations are skipped.
        With several attributes the per-attribute weights are multiplied, so
        every attribute contributes instead of the last one overwriting the
        rest; with a single attribute this is the plain reweighing weight.
        """
        labels = np.asarray(y)
        weights = np.ones(len(labels), dtype=float)
        classes = np.unique(labels)

        for attr in sensitive_features.columns:
            column = sensitive_features[attr]
            attr_weights = np.ones(len(labels), dtype=float)

            for group in column.dropna().unique():
                # Positional boolean array, independent of the frame's index.
                in_group = (column == group).to_numpy()
                for label in classes:
                    has_label = labels == label
                    cell = in_group & has_label
                    observed = cell.mean()
                    # The divisor is `observed`, so that is what must be > 0.
                    if observed > 0:
                        expected = in_group.mean() * has_label.mean()
                        attr_weights[cell] = expected / observed

            weights *= attr_weights
        return weights

    def _fit_disparate_impact_remover(self, X, sensitive_features):
        """Placeholder for the disparate impact remover.

        Raises:
            NotImplementedError: Always; the method has no implementation yet.
        """
        raise NotImplementedError(
            "disparate_impact_remover is not implemented"
        )

    def get_weights(self) -> np.ndarray:
        """Return the fitted sample weights, or ``None`` before ``fit``."""
        return self.weights
⚠️
Bias mitigation requires careful consideration of trade-offs. Mitigating one type of bias may increase another. Always validate fairness improvements against multiple metrics.
Regulatory Compliance
GDPR Compliance Implementation
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import json
class GDPRComplianceManager:
    """Manage GDPR compliance for ML systems.

    Keeps an in-memory record of processing activities and data-subject
    requests so that :meth:`generate_compliance_report` can report them.  The
    storage hooks (``_get_subject_data``, ``_erase_subject_data``) are
    placeholders: they do not touch any data store and are meant to be
    overridden.
    """

    def __init__(self, data_retention_days: int = 90):
        """Set the default retention period and start empty activity logs."""
        self.data_retention_days = data_retention_days
        self.processing_log: List[Dict] = []
        self.request_log: List[Dict] = []

    def log_data_processing(self, processing_type: str,
                            data_subjects: List[str],
                            purpose: str,
                            legal_basis: str,
                            retention_period: int = None) -> Dict:
        """Record one data-processing activity and return its log entry.

        Args:
            processing_type: Kind of processing performed.
            data_subjects: Identifiers of the affected data subjects.
            purpose: Purpose of the processing.
            legal_basis: Legal basis relied on.
            retention_period: Retention in days.  ``None`` uses the manager's
                default; an explicit ``0`` is honoured.

        Returns:
            The log entry, including the computed ``expiry_date``.
        """
        # One clock reading keeps id, timestamp and expiry mutually consistent.
        now = datetime.now()
        retention_days = (
            self.data_retention_days if retention_period is None
            else retention_period
        )
        log_entry = {
            # NOTE: second resolution, so two entries logged within the same
            # second share an id.
            'processing_id': f"proc_{now:%Y%m%d%H%M%S}",
            'timestamp': now.isoformat(),
            'processing_type': processing_type,
            'data_subjects': data_subjects,
            'purpose': purpose,
            'legal_basis': legal_basis,
            'retention_period': retention_days,
            'expiry_date': (now + timedelta(days=retention_days)).isoformat()
        }
        # Store a copy so later edits to the returned dict do not rewrite the log.
        self.processing_log.append(dict(log_entry))
        return log_entry

    def handle_data_subject_request(self, request_type: str,
                                    subject_id: str) -> Dict:
        """Handle a data-subject request (access, erasure, portability).

        Other request types are returned, and recorded, with status
        ``'processing'``; nothing further is done with them here.
        """
        response = {
            'request_type': request_type,
            'subject_id': subject_id,
            'timestamp': datetime.now().isoformat(),
            'status': 'processing'
        }
        if request_type == 'access':
            response['data'] = self._get_subject_data(subject_id)
            response['status'] = 'completed'
        elif request_type == 'erasure':
            # NOTE: the default hook is a no-op; override it before relying on
            # the 'completed' status.
            self._erase_subject_data(subject_id)
            response['status'] = 'completed'
        elif request_type == 'portability':
            response['data'] = self._export_subject_data(subject_id)
            response['status'] = 'completed'

        # Record the request without the subject's data payload.
        self.request_log.append({
            key: response[key]
            for key in ('request_type', 'subject_id', 'timestamp', 'status')
        })
        return response

    def _get_subject_data(self, subject_id: str) -> Dict:
        """Return all data held for a subject (placeholder: always empty)."""
        return {'subject_id': subject_id, 'data': {}}

    def _erase_subject_data(self, subject_id: str):
        """Erase all data for a subject (placeholder: does nothing)."""
        pass

    def _export_subject_data(self, subject_id: str) -> Dict:
        """Export a subject's data for portability; same payload as access."""
        return self._get_subject_data(subject_id)

    def check_data_retention(self) -> List[Dict]:
        """Return logged processing activities whose expiry date has passed.

        Only the in-memory processing log is consulted; no data store is
        inspected.
        """
        now = datetime.now()
        return [
            dict(entry)
            for entry in self.processing_log
            if datetime.fromisoformat(entry['expiry_date']) <= now
        ]

    def generate_compliance_report(self) -> Dict:
        """Generate a GDPR compliance report from the recorded activity.

        ``compliance_status`` is ``'compliant'`` unless
        :meth:`check_data_retention` reports expired entries, in which case it
        is ``'retention_review_required'``.
        """
        overdue = self.check_data_retention()
        return {
            'report_date': datetime.now().isoformat(),
            'data_retention_policy': f"{self.data_retention_days} days",
            'processing_activities': [dict(e) for e in self.processing_log],
            'data_subject_requests': [dict(r) for r in self.request_log],
            'compliance_status': (
                'retention_review_required' if overdue else 'compliant'
            )
        }
class AIEactCompliance:
    """Manage EU AI Act compliance.

    Classifies a use case into a risk tier by keyword and lists the
    obligations attached to each tier.  The keyword lists and requirement
    texts are a coarse summary, not a legal assessment.
    """

    # Keywords whose presence in a use-case name marks it as high risk.
    HIGH_RISK_USE_CASES = (
        'credit_scoring',
        'employment_decisions',
        'law_enforcement',
        'education_access',
        'essential_services_access',
    )
    # Keywords whose presence marks a use case as limited risk.
    LIMITED_RISK_USE_CASES = (
        'chatbots',
        'emotion_recognition',
        'biometric_categorization',
    )

    def __init__(self):
        """Create the (initially empty) per-tier registry."""
        self.risk_categories = {
            'unacceptable': [],
            'high': [],
            'limited': [],
            'minimal': []
        }

    @staticmethod
    def _normalize_use_case(use_case) -> str:
        """Lower-case a use-case name and join its words with underscores."""
        text = str(use_case or '').lower().replace('-', ' ')
        return '_'.join(text.split())

    def assess_risk_category(self, use_case: str,
                             model_type: str,
                             deployment_context: str) -> str:
        """Assess the AI system's risk category from its use-case name.

        A use case matches a tier when one of that tier's keywords occurs in
        the normalised name, so ``'credit_scoring'`` and
        ``'Retail Credit-Scoring'`` are both high risk.  An empty name matches
        nothing.  ``model_type`` and ``deployment_context`` are accepted but
        not used in the decision.

        Returns:
            ``'high'``, ``'limited'`` or ``'minimal'``.
        """
        normalized = self._normalize_use_case(use_case)
        if not normalized:
            return 'minimal'

        # The keyword must occur in the use case.  The reverse test would make
        # any fragment of a keyword (even a single letter) count as a match.
        if any(keyword in normalized for keyword in self.HIGH_RISK_USE_CASES):
            return 'high'
        if any(keyword in normalized for keyword in self.LIMITED_RISK_USE_CASES):
            return 'limited'
        return 'minimal'

    def get_compliance_requirements(self, risk_category: str) -> Dict:
        """Get compliance requirements for a risk category.

        Unknown categories fall back to the ``'minimal'`` entry.
        """
        # NOTE(review): the 'timeline' strings for 'high' and 'limited' are
        # kept as found; verify them against the Act's application dates
        # (Article 113) before relying on them for planning.
        requirements = {
            'unacceptable': {
                'requirements': [
                    'Prohibited practice: system must not be placed on the '
                    'market, put into service, or used'
                ],
                'timeline': 'Immediate: deployment is prohibited',
                'penalties': 'Up to 35 million EUR or 7% of global turnover'
            },
            'high': {
                'requirements': [
                    'Risk management system',
                    'Data governance',
                    'Technical documentation',
                    'Record-keeping',
                    'Transparency',
                    'Human oversight',
                    'Accuracy, robustness, cybersecurity',
                    'Conformity assessment'
                ],
                'timeline': '12 months from enforcement',
                # The 35 million EUR / 7% tier applies to prohibited
                # practices; breaches of provider obligations fall in the
                # 15 million EUR / 3% tier (Article 99).
                'penalties': 'Up to 15 million EUR or 3% of global turnover'
            },
            'limited': {
                'requirements': [
                    'Transparency obligations',
                    'User notification'
                ],
                'timeline': '6 months from enforcement',
                'penalties': 'Up to 15 million EUR or 3% of global turnover'
            },
            'minimal': {
                'requirements': [
                    'Voluntary codes of conduct'
                ],
                'timeline': 'No specific requirements',
                'penalties': 'N/A'
            }
        }
        return requirements.get(risk_category, requirements['minimal'])

    def generate_compliance_checklist(self, risk_category: str) -> List[Dict]:
        """Generate one pending checklist item per requirement of the tier."""
        requirements = self.get_compliance_requirements(risk_category)
        priority = (
            'high' if risk_category in ('high', 'unacceptable') else 'medium'
        )
        return [
            {
                'requirement': req,
                'status': 'pending',
                'priority': priority,
                'due_date': requirements.get('timeline', 'N/A')
            }
            for req in requirements['requirements']
        ]


# Correctly spelled alias; the original class name is kept for existing callers.
AIActCompliance = AIEactCompliance
ℹ️
GDPR and EU AI Act require specific compliance measures. Implement data processing logs, data subject rights handling, and risk assessments from the start of ML system development.
Audit Trail
ML Audit Trail Implementation
from datetime import datetime
from typing import Dict, List, Optional
import json
import hashlib
class MLAuditTrail:
    """Maintain an in-memory, hash-chained audit trail for ML systems.

    Every event carries a SHA-256 checksum of its own content and the checksum
    of the event logged before it.  Editing an event breaks its checksum;
    removing or reordering events breaks the chain.

    NOTE: ``storage_path`` is stored but nothing in this class writes to or
    reads from it; events live only in ``self.audit_log``.
    """

    def __init__(self, storage_path: str):
        """Remember the storage path and start with an empty log."""
        self.storage_path = storage_path
        self.audit_log = []

    def log_event(self, event_type: str,
                  actor: str,
                  resource: str,
                  action: str,
                  details: Dict = None,
                  metadata: Dict = None) -> Dict:
        """Append an audit event and return it.

        Args:
            event_type: Category of the event (e.g. ``'model_training'``).
            actor: Who performed the action.
            resource: Resource path the action applied to.
            action: Verb describing the action.
            details: Optional event payload.
            metadata: Optional extra context.
        """
        previous_checksum = (
            self.audit_log[-1].get('checksum') if self.audit_log else None
        )
        event = {
            'event_id': self._generate_event_id(),
            'timestamp': datetime.now().isoformat(),
            'event_type': event_type,
            'actor': actor,
            'resource': resource,
            'action': action,
            'details': details or {},
            'metadata': metadata or {},
            # Links this event to its predecessor; covered by the checksum.
            'previous_checksum': previous_checksum,
            'checksum': None
        }
        event['checksum'] = self._calculate_checksum(event)
        self.audit_log.append(event)
        return event

    def log_model_training(self, model_name: str,
                           model_version: str,
                           training_config: Dict,
                           metrics: Dict,
                           actor: str):
        """Log a model training event with its config and metrics."""
        return self.log_event(
            event_type='model_training',
            actor=actor,
            resource=f"model/{model_name}/{model_version}",
            action='train',
            details={
                'training_config': training_config,
                'metrics': metrics
            }
        )

    def log_model_deployment(self, model_name: str,
                             model_version: str,
                             environment: str,
                             actor: str):
        """Log a model deployment event for the given environment."""
        return self.log_event(
            event_type='model_deployment',
            actor=actor,
            resource=f"model/{model_name}/{model_version}",
            action='deploy',
            details={
                'environment': environment
            }
        )

    def log_data_access(self, dataset_name: str,
                        access_type: str,
                        actor: str,
                        purpose: str):
        """Log a dataset access; ``access_type`` becomes the event action."""
        return self.log_event(
            event_type='data_access',
            actor=actor,
            resource=f"dataset/{dataset_name}",
            action=access_type,
            details={
                'purpose': purpose
            }
        )

    def log_fairness_audit(self, model_name: str,
                           model_version: str,
                           fairness_results: Dict,
                           actor: str):
        """Log a fairness audit event with its results."""
        return self.log_event(
            event_type='fairness_audit',
            actor=actor,
            resource=f"model/{model_name}/{model_version}",
            action='audit_fairness',
            details={
                'fairness_results': fairness_results
            }
        )

    def get_audit_history(self, resource: str = None,
                          event_type: str = None,
                          start_date: datetime = None,
                          end_date: datetime = None) -> List[Dict]:
        """Return the events matching every supplied filter.

        Args:
            resource: Substring that must occur in the event's resource.
            event_type: Exact event type.
            start_date: Inclusive lower bound on the event timestamp.
            end_date: Inclusive upper bound on the event timestamp.

        Returns:
            A new list, so callers cannot reorder or truncate the trail by
            editing the result.
        """
        def matches(event: Dict) -> bool:
            if resource and resource not in event['resource']:
                return False
            if event_type and event['event_type'] != event_type:
                return False
            if start_date or end_date:
                stamp = datetime.fromisoformat(event['timestamp'])
                if start_date and stamp < start_date:
                    return False
                if end_date and stamp > end_date:
                    return False
            return True

        return [event for event in self.audit_log if matches(event)]

    def verify_audit_integrity(self) -> Dict:
        """Verify every event's checksum and its link to the previous event.

        An event is valid when its stored checksum matches its content and,
        if it carries ``previous_checksum``, that value equals the stored
        checksum of the event before it in the log (``None`` for the first).
        Events without a ``previous_checksum`` key are checked on their
        checksum alone.
        """
        verification_results = {
            'total_events': len(self.audit_log),
            'valid_events': 0,
            'invalid_events': 0,
            'verification_timestamp': datetime.now().isoformat()
        }

        expected_previous = None
        for event in self.audit_log:
            stored_checksum = event.get('checksum')
            content_ok = stored_checksum == self._calculate_checksum(event)
            chain_ok = (
                'previous_checksum' not in event
                or event['previous_checksum'] == expected_previous
            )
            if content_ok and chain_ok:
                verification_results['valid_events'] += 1
            else:
                verification_results['invalid_events'] += 1
            expected_previous = stored_checksum

        verification_results['integrity_check'] = (
            'passed' if verification_results['invalid_events'] == 0 else 'failed'
        )
        return verification_results

    def _generate_event_id(self) -> str:
        """Generate a unique event ID: ``event_<iso timestamp>_<8 hex chars>``."""
        # Local import keeps this block self-contained.
        import uuid

        timestamp = datetime.now().isoformat()
        # A random suffix; hashing the timestamp would add no uniqueness to it.
        random_part = uuid.uuid4().hex[:8]
        return f"event_{timestamp}_{random_part}"

    def _calculate_checksum(self, event: Dict) -> str:
        """Return the SHA-256 hex digest of the event without its checksum."""
        event_copy = event.copy()
        event_copy.pop('checksum', None)
        # sort_keys gives a stable serialisation; default=str covers values
        # json cannot encode natively.
        event_string = json.dumps(event_copy, sort_keys=True, default=str)
        return hashlib.sha256(event_string.encode()).hexdigest()
Summary
ML governance is essential for responsible AI:
- Model Cards: Document model capabilities, limitations, and intended use
- Explainability: Use SHAP/LIME for transparent predictions
- Fairness: Detect and mitigate bias across protected attributes
- Compliance: Implement GDPR and EU AI Act requirements
- Audit Trails: Maintain complete records for accountability
Implement comprehensive governance to ensure responsible AI deployment.