The Interview Question
ℹ️
Question: You receive a dataset with 30% missing values across multiple columns, inconsistent date formats, duplicate records, and values outside expected ranges. Walk through your complete data cleaning pipeline:
- How do you identify and categorize different types of data quality issues?
- What imputation strategies would you use for different types of missing data?
- How do you handle duplicate records?
- How do you validate your cleaning process?
Detailed Answer
1. Data Quality Assessment Framework
Before cleaning, you must understand the nature and extent of data quality issues.
import pandas as pd
import numpy as np
from datetime import datetime
import re
from typing import Dict, List, Tuple
import warnings
warnings.filterwarnings('ignore')
class DataQualityAssessor:
"""Comprehensive data quality assessment"""
def __init__(self, df):
self.df = df
self.issues = []
self.quality_score = 0
def assess_completeness(self) -> Dict:
"""Assess missing value patterns"""
missing = self.df.isnull().sum()
missing_pct = (missing / len(self.df) * 100).round(2)
completeness_issues = {
'total_cells': self.df.size,
'missing_cells': missing.sum(),
'completeness_rate': 1 - missing.sum() / self.df.size,
'columns_with_missing': (missing > 0).sum(),
'critical_columns': missing_pct[missing_pct > 50].to_dict(),
'high_missing': missing_pct[(missing_pct > 20) & (missing_pct <= 50)].to_dict(),
'medium_missing': missing_pct[(missing_pct > 5) & (missing_pct <= 20)].to_dict(),
'low_missing': missing_pct[(missing_pct > 0) & (missing_pct <= 5)].to_dict()
}
# MCAR test (simplified)
missing_matrix = self.df.isnull().astype(int)
mcar_correlations = missing_matrix.corrwith(missing_matrix.sum(axis=1))
completeness_issues['mcar_likelihood'] = 'High' if mcar_correlations.abs().mean() < 0.1 else 'Low'
return completeness_issues
def assess_consistency(self) -> Dict:
"""Check for consistency issues"""
consistency_issues = {}
# Check for inconsistent formats in string columns
string_cols = self.df.select_dtypes(include=['object']).columns
for col in string_cols:
# Check for mixed case
unique_vals = self.df[col].dropna().unique()
if len(unique_vals) > 0:
has_upper = any(str(v)[0].isupper() for v in unique_vals[:100])
has_lower = any(str(v)[0].islower() for v in unique_vals[:100])
if has_upper and has_lower:
consistency_issues[f'{col}_case'] = 'Mixed case detected'
# Check for date format inconsistencies
date_cols = self.df.select_dtypes(include=['datetime64']).columns
for col in date_cols:
# Check for reasonable date ranges
min_date = self.df[col].min()
max_date = self.df[col].max()
if min_date.year < 1900 or max_date.year > 2100:
consistency_issues[f'{col}_range'] = f'Unusual date range: {min_date} to {max_date}'
# Check for inconsistent categories
for col in string_cols:
unique_count = self.df[col].nunique()
if unique_count < 20: # Likely categorical
# Check for similar values (fuzzy matching)
unique_vals = self.df[col].dropna().unique()
for i, v1 in enumerate(unique_vals[:50]):
for v2 in unique_vals[i+1:50]:
if self._fuzzy_match(str(v1), str(v2)) > 0.8:
consistency_issues[f'{col}_similar'] = f'Similar values: {v1} vs {v2}'
return consistency_issues
def _fuzzy_match(self, s1: str, s2: str) -> float:
"""Simple fuzzy string matching"""
from difflib import SequenceMatcher
return SequenceMatcher(None, s1.lower(), s2.lower()).ratio()
def assess_accuracy(self) -> Dict:
"""Check for accuracy issues (values outside expected ranges)"""
accuracy_issues = {}
numeric_cols = self.df.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
data = self.df[col].dropna()
if len(data) == 0:
continue
# Check for outliers using IQR
Q1, Q3 = data.quantile(0.25), data.quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 3 * IQR
upper_bound = Q3 + 3 * IQR
outliers = data[(data < lower_bound) | (data > upper_bound)]
if len(outliers) > 0:
accuracy_issues[col] = {
'n_outliers': len(outliers),
'pct_outliers': len(outliers) / len(data) * 100,
'range': f'{data.min():.2f} to {data.max():.2f}',
'expected_range': f'{lower_bound:.2f} to {upper_bound:.2f}'
}
# Check for negative values where not expected
for col in ['age', 'quantity', 'price', 'amount']:
if col in self.df.columns:
negatives = (self.df[col] < 0).sum()
if negatives > 0:
accuracy_issues[f'{col}_negative'] = f'{negatives} negative values found'
return accuracy_issues
def assess_uniqueness(self) -> Dict:
"""Check for duplicate records"""
uniqueness_issues = {}
# Exact duplicates
exact_dupes = self.df.duplicated().sum()
uniqueness_issues['exact_duplicates'] = exact_dupes
uniqueness_issues['exact_duplicate_pct'] = exact_dupes / len(self.df) * 100
# Potential key columns (high uniqueness)
potential_keys = []
for col in self.df.columns:
if self.df[col].nunique() / len(self.df) > 0.95:
potential_keys.append(col)
uniqueness_issues['potential_key_columns'] = potential_keys
# Near duplicates (similar but not exact)
if len(self.df) <= 10000: # Only for smaller datasets
numeric_cols = self.df.select_dtypes(include=[np.number]).columns[:5]
if len(numeric_cols) > 1:
from sklearn.metrics.pairwise import cosine_similarity
sample = self.df[numeric_cols].dropna().head(1000)
if len(sample) > 0:
sim_matrix = cosine_similarity(sample)
np.fill_diagonal(sim_matrix, 0)
high_sim_pairs = np.where(sim_matrix > 0.95)
uniqueness_issues['near_duplicate_pairs'] = len(high_sim_pairs[0])
return uniqueness_issues
def generate_quality_report(self) -> Dict:
"""Generate comprehensive quality report"""
report = {
'completeness': self.assess_completeness(),
'consistency': self.assess_consistency(),
'accuracy': self.assess_accuracy(),
'uniqueness': self.assess_uniqueness()
}
# Calculate overall quality score
completeness_score = report['completeness']['completeness_rate'] * 100
consistency_score = max(0, 100 - len(report['consistency']) * 10)
accuracy_score = max(0, 100 - sum(v['pct_outliers'] for v in report['accuracy'].values() if isinstance(v, dict)))
uniqueness_score = max(0, 100 - report['uniqueness']['exact_duplicate_pct'])
report['overall_score'] = {
'completeness': completeness_score,
'consistency': consistency_score,
'accuracy': accuracy_score,
'uniqueness': uniqueness_score,
'overall': (completeness_score + consistency_score + accuracy_score + uniqueness_score) / 4
}
return report
# Example usage
# quality_assessor = DataQualityAssessor(df)
# quality_report = quality_assessor.generate_quality_report()
2. Missing Value Imputation Strategies
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.ensemble import RandomForestRegressor
import warnings
warnings.filterwarnings('ignore')
class MissingValueHandler:
"""Comprehensive missing value handling"""
def __init__(self, df):
self.df = df
self.imputation_log = {}
def analyze_missing_patterns(self) -> Dict:
"""Analyze patterns in missing data"""
missing = self.df.isnull()
patterns = {
'total_missing': missing.sum().sum(),
'complete_rows': (missing.sum(axis=1) == 0).sum(),
'rows_with_missing': (missing.sum(axis=1) > 0).sum(),
'columns_with_missing': missing.any().sum(),
'missing_by_column': missing.sum().to_dict(),
'missing_percentage': (missing.sum() / len(self.df) * 100).to_dict()
}
# Check for patterns (MCAR, MAR, MNAR)
if patterns['total_missing'] > 0:
# Little's test approximation
missing_corr = missing.astype(int).corr()
patterns['missing_correlation'] = missing_corr.mean().mean()
if patterns['missing_correlation'] < 0.1:
patterns['likely_type'] = 'MCAR (Missing Completely at Random)'
elif patterns['missing_correlation'] < 0.3:
patterns['likely_type'] = 'MAR (Missing at Random)'
else:
patterns['likely_type'] = 'MNAR (Missing Not at Random)'
return patterns
def impute_numeric(self, columns: List[str], strategy: str = 'auto') -> pd.DataFrame:
"""Impute numeric columns"""
df = self.df.copy()
for col in columns:
if col not in df.columns:
continue
missing_count = df[col].isnull().sum()
if missing_count == 0:
continue
if strategy == 'auto':
# Choose strategy based on data distribution
skewness = df[col].dropna().skew()
if abs(skewness) < 0.5:
# Normal distribution - use mean
imputer = SimpleImputer(strategy='mean')
df[col] = imputer.fit_transform(df[[col]])
self.imputation_log[col] = f'Mean imputation (skewness: {skewness:.2f})'
else:
# Skewed distribution - use median
imputer = SimpleImputer(strategy='median')
df[col] = imputer.fit_transform(df[[col]])
self.imputation_log[col] = f'Median imputation (skewness: {skewness:.2f})'
elif strategy == 'mean':
imputer = SimpleImputer(strategy='mean')
df[col] = imputer.fit_transform(df[[col]])
self.imputation_log[col] = 'Mean imputation'
elif strategy == 'median':
imputer = SimpleImputer(strategy='median')
df[col] = imputer.fit_transform(df[[col]])
self.imputation_log[col] = 'Median imputation'
elif strategy == 'knn':
# KNN imputation
numeric_cols = df.select_dtypes(include=[np.number]).columns
imputer = KNNImputer(n_neighbors=5)
df[numeric_cols] = imputer.fit_transform(df[numeric_cols])
self.imputation_log[col] = 'KNN imputation'
elif strategy == 'iterative':
# MICE/Iterative imputation
numeric_cols = df.select_dtypes(include=[np.number]).columns
imputer = IterativeImputer(
estimator=RandomForestRegressor(n_estimators=100, random_state=42),
max_iter=10,
random_state=42
)
df[numeric_cols] = imputer.fit_transform(df[numeric_cols])
self.imputation_log[col] = 'Iterative (MICE) imputation'
self.df = df
return self
def impute_categorical(self, columns: List[str], strategy: str = 'mode') -> pd.DataFrame:
"""Impute categorical columns"""
df = self.df.copy()
for col in columns:
if col not in df.columns:
continue
missing_count = df[col].isnull().sum()
if missing_count == 0:
continue
if strategy == 'mode':
mode_value = df[col].mode()[0] if not df[col].mode().empty else 'Unknown'
df[col] = df[col].fillna(mode_value)
self.imputation_log[col] = f'Mode imputation ({mode_value})'
elif strategy == 'constant':
df[col] = df[col].fillna('Missing')
self.imputation_log[col] = 'Constant imputation (Missing)'
elif strategy == 'category':
df[col] = df[col].cat.add_categories('Missing').fillna('Missing')
self.imputation_log[col] = 'New category imputation (Missing)'
self.df = df
return self
def create_missing_indicators(self, columns: List[str]) -> pd.DataFrame:
"""Create indicator variables for missing values"""
df = self.df.copy()
for col in columns:
if col in df.columns:
missing_indicator = f'{col}_is_missing'
df[missing_indicator] = df[col].isnull().astype(int)
self.imputation_log[f'{missing_indicator}'] = 'Missing indicator created'
self.df = df
return self
def impute_with_model(self, target_col: str, feature_cols: List[str]) -> pd.DataFrame:
"""Use model-based imputation"""
df = self.df.copy()
# Split into known and unknown
known_mask = df[target_col].notnull()
unknown_mask = df[target_col].isnull()
if unknown_mask.sum() == 0:
return self
# Train model on known values
X_known = df.loc[known_mask, feature_cols]
y_known = df.loc[known_mask, target_col]
# Use Random Forest
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X_known, y_known)
# Predict missing values
X_unknown = df.loc[unknown_mask, feature_cols]
predictions = model.predict(X_unknown)
# Fill in missing values
df.loc[unknown_mask, target_col] = predictions
self.imputation_log[target_col] = f'Model-based imputation (R²: {model.score(X_known, y_known):.3f})'
self.df = df
return self
3. Duplicate Detection and Removal
class DuplicateHandler:
"""Handle duplicate records"""
def __init__(self, df):
self.df = df
self.duplicate_log = {}
def detect_exact_duplicates(self) -> pd.DataFrame:
"""Find exact duplicate rows"""
duplicate_mask = self.df.duplicated(keep='first')
self.duplicate_log['exact_duplicates'] = {
'count': duplicate_mask.sum(),
'percentage': duplicate_mask.sum() / len(self.df) * 100,
'indices': self.df[duplicate_mask].index.tolist()
}
return self.df[duplicate_mask]
def detect_partial_duplicates(self, key_columns: List[str]) -> pd.DataFrame:
"""Find duplicates based on key columns"""
duplicate_mask = self.df.duplicated(subset=key_columns, keep='first')
self.duplicate_log['partial_duplicates'] = {
'key_columns': key_columns,
'count': duplicate_mask.sum(),
'percentage': duplicate_mask.sum() / len(self.df) * 100
}
return self.df[duplicate_mask]
def detect_near_duplicates(self, columns: List[str], threshold: float = 0.95) -> List[Tuple]:
"""Find near-duplicate records using similarity"""
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
# Combine columns into single string
combined = self.df[columns].apply(lambda x: ' '.join(x.astype(str)), axis=1)
# Vectorize
vectorizer = TfidfVectorizer()
tfidf_matrix = vectorizer.fit_transform(combined)
# Calculate similarity (sample for large datasets)
sample_size = min(1000, len(self.df))
sample_idx = np.random.choice(len(self.df), sample_size, replace=False)
sample_matrix = tfidf_matrix[sample_idx]
similarity = cosine_similarity(sample_matrix)
# Find high similarity pairs
near_duplicates = []
for i in range(sample_size):
for j in range(i + 1, sample_size):
if similarity[i, j] > threshold:
near_duplicates.append((
self.df.index[sample_idx[i]],
self.df.index[sample_idx[j]],
similarity[i, j]
))
self.duplicate_log['near_duplicates'] = {
'count': len(near_duplicates),
'threshold': threshold,
'pairs': near_duplicates[:10] # Sample
}
return near_duplicates
def remove_duplicates(self, strategy: str = 'keep_first',
key_columns: List[str] = None) -> pd.DataFrame:
"""Remove duplicates based on strategy"""
df = self.df.copy()
if strategy == 'keep_first':
df = df.drop_duplicates(keep='first')
elif strategy == 'keep_last':
df = df.drop_duplicates(keep='last')
elif strategy == 'keep_most_complete':
# Keep row with fewest missing values
df['missing_count'] = df.isnull().sum(axis=1)
df = df.sort_values('missing_count').drop_duplicates(
subset=key_columns if key_columns else df.columns.tolist(),
keep='first'
)
df = df.drop('missing_count', axis=1)
elif strategy == 'aggregate':
# Aggregate duplicates
if key_columns:
agg_dict = {}
for col in df.columns:
if col not in key_columns:
if df[col].dtype in ['int64', 'float64']:
agg_dict[col] = 'mean'
else:
agg_dict[col] = 'first'
df = df.groupby(key_columns).agg(agg_dict).reset_index()
self.duplicate_log['removal'] = {
'strategy': strategy,
'rows_before': len(self.df),
'rows_after': len(df),
'rows_removed': len(self.df) - len(df)
}
self.df = df
return self
def merge_duplicates(self, key_columns: List[str],
value_columns: Dict[str, str]) -> pd.DataFrame:
"""Merge duplicate rows with custom aggregation"""
df = self.df.copy()
# Define aggregation
agg_dict = {}
for col, method in value_columns.items():
if method == 'first':
agg_dict[col] = 'first'
elif method == 'last':
agg_dict[col] = 'last'
elif method == 'mean':
agg_dict[col] = 'mean'
elif method == 'sum':
agg_dict[col] = 'sum'
elif method == 'max':
agg_dict[col] = 'max'
elif method == 'min':
agg_dict[col] = 'min'
# Group and aggregate
df_merged = df.groupby(key_columns).agg(agg_dict).reset_index()
self.duplicate_log['merge'] = {
'key_columns': key_columns,
'value_columns': value_columns,
'rows_before': len(df),
'rows_after': len(df_merged)
}
self.df = df_merged
return self
4. Data Type Conversion and Validation
class DataTypeHandler:
"""Handle data type conversions and validation"""
def __init__(self, df):
self.df = df
self.conversion_log = {}
def convert_dtypes(self, type_mapping: Dict[str, str]) -> pd.DataFrame:
"""Convert columns to specified types"""
df = self.df.copy()
for col, dtype in type_mapping.items():
if col not in df.columns:
continue
try:
if dtype == 'datetime':
# Try multiple date formats
date_formats = [
'%Y-%m-%d', '%m/%d/%Y', '%d/%m/%Y',
'%Y-%m-%d %H:%M:%S', '%m/%d/%Y %H:%M:%S'
]
for fmt in date_formats:
try:
df[col] = pd.to_datetime(df[col], format=fmt)
self.conversion_log[col] = f'Datetime (format: {fmt})'
break
except ValueError:
continue
elif dtype == 'numeric':
df[col] = pd.to_numeric(df[col], errors='coerce')
self.conversion_log[col] = 'Numeric'
elif dtype == 'category':
df[col] = df[col].astype('category')
self.conversion_log[col] = 'Category'
elif dtype == 'string':
df[col] = df[col].astype(str)
self.conversion_log[col] = 'String'
elif dtype == 'bool':
# Handle various boolean representations
bool_map = {
'true': True, 'false': False,
'yes': True, 'no': False,
'1': True, '0': False,
't': True, 'f': False
}
df[col] = df[col].str.lower().map(bool_map)
self.conversion_log[col] = 'Boolean'
except Exception as e:
self.conversion_log[col] = f'Conversion failed: {str(e)}'
self.df = df
return self
def validate_ranges(self, validation_rules: Dict[str, Dict]) -> pd.DataFrame:
"""Validate values are within expected ranges"""
df = self.df.copy()
for col, rules in validation_rules.items():
if col not in df.columns:
continue
if 'min' in rules:
mask = df[col] < rules['min']
if mask.any():
df.loc[mask, col] = np.nan
self.conversion_log[f'{col}_min_violation'] = f'{mask.sum()} values below minimum'
if 'max' in rules:
mask = df[col] > rules['max']
if mask.any():
df.loc[mask, col] = np.nan
self.conversion_log[f'{col}_max_violation'] = f'{mask.sum()} values above maximum'
if 'allowed_values' in rules:
mask = ~df[col].isin(rules['allowed_values'])
if mask.any():
df.loc[mask, col] = np.nan
self.conversion_log[f'{col}_invalid'] = f'{mask.sum()} invalid values'
self.df = df
return self
def standardize_text(self, columns: List[str],
operations: List[str] = None) -> pd.DataFrame:
"""Standardize text columns"""
df = self.df.copy()
if operations is None:
operations = ['lowercase', 'strip', 'remove_special']
for col in columns:
if col not in df.columns:
continue
for op in operations:
if op == 'lowercase':
df[col] = df[col].str.lower()
elif op == 'uppercase':
df[col] = df[col].str.upper()
elif op == 'title':
df[col] = df[col].str.title()
elif op == 'strip':
df[col] = df[col].str.strip()
elif op == 'remove_special':
df[col] = df[col].str.replace(r'[^a-zA-Z0-9\s]', '', regex=True)
elif op == 'normalize_whitespace':
df[col] = df[col].str.replace(r'\s+', ' ', regex=True)
elif op == 'fix_encoding':
# Fix common encoding issues
encoding_fixes = {
'’': "'", '“': '"', 'â€': '"',
'é': 'é', 'è': 'è', 'à ': 'à'
}
for wrong, right in encoding_fixes.items():
df[col] = df[col].str.replace(wrong, right)
self.conversion_log[col] = f'Text standardized ({", ".join(operations)})'
self.df = df
return self
5. Complete Data Cleaning Pipeline
class DataCleaningPipeline:
"""Complete data cleaning pipeline"""
def __init__(self, df):
self.df = df
self.cleaning_log = []
self.quality_before = None
self.quality_after = None
def run_pipeline(self) -> pd.DataFrame:
"""Execute complete cleaning pipeline"""
print("Starting Data Cleaning Pipeline")
print("=" * 60)
# Step 1: Initial assessment
print("\nStep 1: Initial Quality Assessment")
assessor = DataQualityAssessor(self.df)
self.quality_before = assessor.generate_quality_report()
print(f"Quality Score: {self.quality_before['overall_score']['overall']:.1f}/100")
# Step 2: Remove exact duplicates
print("\nStep 2: Removing Duplicates")
dup_handler = DuplicateHandler(self.df)
exact_dupes = dup_handler.detect_exact_duplicates()
print(f"Found {len(exact_dupes)} exact duplicates")
self.df = dup_handler.remove_duplicates(strategy='keep_most_complete')
self.cleaning_log.append(f"Removed {len(exact_dupes)} exact duplicates")
# Step 3: Standardize text
print("\nStep 3: Standardizing Text")
dtype_handler = DataTypeHandler(self.df)
string_cols = self.df.select_dtypes(include=['object']).columns
self.df = dtype_handler.standardize_text(string_cols)
self.cleaning_log.append(f"Standardized {len(string_cols)} text columns")
# Step 4: Convert data types
print("\nStep 4: Converting Data Types")
type_mapping = {
'age': 'numeric',
'signup_date': 'datetime',
'is_active': 'bool'
}
self.df = dtype_handler.convert_dtypes(type_mapping)
self.cleaning_log.append(f"Converted {len(type_mapping)} columns")
# Step 5: Handle missing values
print("\nStep 5: Handling Missing Values")
missing_handler = MissingValueHandler(self.df)
# Create missing indicators
missing_cols = self.df.columns[self.df.isnull().any()].tolist()
self.df = missing_handler.create_missing_indicators(missing_cols)
# Impute numeric columns
numeric_cols = self.df.select_dtypes(include=[np.number]).columns
self.df = missing_handler.impute_numeric(numeric_cols, strategy='auto')
# Impute categorical columns
categorical_cols = self.df.select_dtypes(include=['object', 'category']).columns
self.df = missing_handler.impute_categorical(categorical_cols, strategy='mode')
self.cleaning_log.append(f"Imputed {len(missing_cols)} columns with missing values")
# Step 6: Validate ranges
print("\nStep 6: Validating Ranges")
validation_rules = {
'age': {'min': 0, 'max': 120},
'total_purchases': {'min': 0},
'total_spend': {'min': 0}
}
self.df = dtype_handler.validate_ranges(validation_rules)
self.cleaning_log.append(f"Validated {len(validation_rules)} columns")
# Step 7: Final assessment
print("\nStep 7: Final Quality Assessment")
assessor_final = DataQualityAssessor(self.df)
self.quality_after = assessor_final.generate_quality_report()
print(f"Quality Score: {self.quality_after['overall_score']['overall']:.1f}/100")
# Summary
print("\n" + "=" * 60)
print("CLEANING SUMMARY")
print("=" * 60)
print(f"Rows: {len(self.df)}")
print(f"Columns: {len(self.df.columns)}")
print(f"Quality improvement: {self.quality_before['overall_score']['overall']:.1f} → {self.quality_after['overall_score']['overall']:.1f}")
return self.df
def generate_report(self) -> Dict:
"""Generate cleaning report"""
return {
'cleaning_steps': self.cleaning_log,
'quality_before': self.quality_before,
'quality_after': self.quality_after,
'rows_processed': len(self.df),
'columns_processed': len(self.df.columns)
}
💡
Pro Tip: Always document your cleaning decisions. Future you (and your team) will thank you when you need to understand why certain transformations were applied.
6. Common Follow-Up Questions
Follow-up 1: How do you handle missing data in time series?
def clean_time_series_missing(df, date_column, value_column):
"""Handle missing values in time series data"""
df = df.sort_values(date_column)
# Method 1: Forward fill (for sequential data)
df[f'{value_column}_ffill'] = df[value_column].ffill()
# Method 2: Backward fill
df[f'{value_column}_bfill'] = df[value_column].bfill()
# Method 3: Interpolation
df[f'{value_column}_interpolate'] = df[value_column].interpolate(method='time')
# Method 4: Seasonal decomposition and fill
from statsmodels.tsa.seasonal import seasonal_decompose
# Fill with seasonal pattern
if len(df) > 24: # Need enough data for decomposition
decomposition = seasonal_decompose(df[value_column].fillna(method='ffill'), model='additive', period=24)
seasonal = decomposition.seasonal
df[f'{value_column}_seasonal_fill'] = df[value_column].fillna(seasonal)
# Method 5: Rolling statistics
df[f'{value_column}_rolling_mean'] = df[value_column].rolling(window=24, min_periods=1).mean()
return df
Follow-up 2: How do you handle data cleaning at scale?
# Distributed data cleaning with Spark
def clean_with_spark(spark_df):
"""Example of data cleaning with PySpark"""
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Remove duplicates
spark_df = spark_df.dropDuplicates()
# Handle missing values
numeric_cols = [c for c, t in spark_df.dtypes if t in ['int', 'double']]
for col in numeric_cols:
spark_df = spark_df.withColumn(
col,
F.when(F.col(col).isNull(), F.mean(col).over(Window.partitionBy())).otherwise(F.col(col))
)
# Standardize text
string_cols = [c for c, t in spark_df.dtypes if t == 'string']
for col in string_cols:
spark_df = spark_df.withColumn(col, F.lower(F.trim(F.col(col))))
return spark_df
Company-Specific Tips
ℹ️
Google Tips:
- Google values reproducible cleaning pipelines
- Be prepared to discuss distributed data cleaning
- Know how to handle data cleaning in BigQuery
- Understand privacy-preserving cleaning techniques
Microsoft Tips:
- Microsoft focuses on enterprise data quality
- Know how to use Azure Data Factory for cleaning
- Be comfortable with SQL-based cleaning
- Understand data governance and lineage
Quiz Section
Related Topics
- Data Profiling — Assessing data quality before cleaning
- Feature Engineering — Building features from clean data
- Statistical Analysis — Formal analysis after cleaning
- Data Pipelines — Automating cleaning processes