Building a Data Quality Framework from Scratch
Ensuring data reliability and trustworthiness at scale
Interview Question
"Design a data quality framework that: (1) validates data at ingestion, (2) monitors data freshness, (3) detects anomalies, (4) quarantines bad data, (5) alerts stakeholders. How do you measure data quality, and what metrics do you track?"
Difficulty: Hard | Frequently asked at Google, Airbnb, Uber, Netflix
Theoretical Foundation
Data Quality Dimensions
Data quality is measured across multiple dimensions:
1. Completeness: Are all expected records present?
   - Metric: % of non-null values
   - Formula: (non-null count / total count) × 100
2. Accuracy: Does data reflect reality?
   - Metric: % of values matching source of truth
   - Formula: (correct count / total count) × 100
3. Consistency: Is data consistent across systems?
   - Metric: % of values matching across sources
   - Formula: (matching values / total values) × 100
4. Timeliness: Is data available when needed?
   - Metric: Time from event to availability
   - Formula: availability_time - event_time
5. Uniqueness: Are records unique?
   - Metric: % of unique records
   - Formula: (unique count / total count) × 100
6. Validity: Does data conform to rules?
   - Metric: % of values passing validation rules
   - Formula: (valid count / total count) × 100
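The percentage formulas above can be sketched in plain Python. This is a minimal illustration, not part of the framework: the sample records, the helper names, and the choice to compute validity over non-null values only (so that nulls are not penalised twice) are all assumptions.

```python
import re

# Hypothetical sample records; None marks a missing value
records = [
    {"user_id": "u1", "email": "a@example.com"},
    {"user_id": "u2", "email": None},
    {"user_id": "u2", "email": "not-an-email"},
    {"user_id": "u3", "email": "c@example.com"},
]

EMAIL_RE = re.compile(r"^[\w\.-]+@[\w\.-]+\.\w+$")


def completeness(rows, column):
    """Percentage of rows where `column` is not null."""
    non_null = sum(1 for r in rows if r[column] is not None)
    return non_null / len(rows) * 100


def uniqueness(rows, column):
    """Distinct values of `column` as a percentage of all rows."""
    return len({r[column] for r in rows}) / len(rows) * 100


def validity(rows, column, pattern):
    """Percentage of non-null values of `column` that match `pattern`."""
    values = [r[column] for r in rows if r[column] is not None]
    valid = sum(1 for v in values if pattern.match(v))
    return valid / len(values) * 100


print(completeness(records, "email"))        # 3 of 4 emails are present
print(uniqueness(records, "user_id"))        # 3 distinct ids in 4 rows
print(validity(records, "email", EMAIL_RE))  # 2 of 3 present emails are well formed
```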
Data Quality Score
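The overall score is a weighted sum of the six dimension scores:
$$\text{DQ Score} = w_1 S_{\text{comp}} + w_2 S_{\text{acc}} + w_3 S_{\text{cons}} + w_4 S_{\text{time}} + w_5 S_{\text{uniq}} + w_6 S_{\text{val}}$$
Where:
- $S_{\text{comp}}$ = Completeness score
- $S_{\text{acc}}$ = Accuracy score
- $S_{\text{cons}}$ = Consistency score
- $S_{\text{time}}$ = Timeliness score
- $S_{\text{uniq}}$ = Uniqueness score
- $S_{\text{val}}$ = Validity score
- $w_1, \dots, w_6$ = Weights (sum to 1)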
Example weights for different use cases:
| Use Case | Completeness | Accuracy | Consistency | Timeliness | Uniqueness | Validity |
|---|---|---|---|---|---|---|
| Financial reporting | 0.3 | 0.3 | 0.2 | 0.1 | 0.05 | 0.05 |
| Real-time analytics | 0.1 | 0.1 | 0.1 | 0.4 | 0.1 | 0.2 |
| ML training | 0.2 | 0.2 | 0.1 | 0.1 | 0.2 | 0.2 |
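To see how the weights change the outcome, here is a small sketch that applies the three weight profiles from the table to one set of dimension scores. The scores in `example_scores` are made up for illustration, and each is assumed to be already normalised to the range 0 to 1 (timeliness included).

```python
# Weight profiles copied from the table above
WEIGHTS = {
    "financial_reporting": {"completeness": 0.3, "accuracy": 0.3, "consistency": 0.2,
                            "timeliness": 0.1, "uniqueness": 0.05, "validity": 0.05},
    "real_time_analytics": {"completeness": 0.1, "accuracy": 0.1, "consistency": 0.1,
                            "timeliness": 0.4, "uniqueness": 0.1, "validity": 0.2},
    "ml_training": {"completeness": 0.2, "accuracy": 0.2, "consistency": 0.1,
                    "timeliness": 0.1, "uniqueness": 0.2, "validity": 0.2},
}

# Hypothetical dimension scores, each already scaled to [0, 1]
example_scores = {"completeness": 0.99, "accuracy": 0.95, "consistency": 0.90,
                  "timeliness": 0.80, "uniqueness": 1.00, "validity": 0.97}


def dq_score(scores, weights):
    """Weighted sum of dimension scores; weights must sum to 1."""
    if abs(sum(weights.values()) - 1.0) > 1e-9:
        raise ValueError("weights must sum to 1")
    return sum(weights[dim] * scores[dim] for dim in weights)


for use_case, weights in WEIGHTS.items():
    print(f"{use_case}: {dq_score(example_scores, weights):.4f}")
```

The same data scores lower for real-time analytics than for financial reporting because the weak timeliness score carries four times the weight there.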
Data Quality Pipeline
Validation Rules
Schema Validation
schema_rules = {
    "columns": {
        "user_id": {"type": "string", "nullable": False},
        "email": {"type": "string", "nullable": False, "pattern": r"^[\w\.-]+@[\w\.-]+\.\w+$"},
        "age": {"type": "integer", "nullable": True, "min": 0, "max": 150},
        "signup_date": {"type": "date", "nullable": False},
    }
}
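One way such a rule dictionary could be applied to a single record is sketched below. The `validate_record` helper and the `PYTHON_TYPES` mapping are hypothetical names introduced for illustration; a Spark implementation would express the same checks as column expressions.

```python
import re
from datetime import date

schema_rules = {
    "columns": {
        "user_id": {"type": "string", "nullable": False},
        "email": {"type": "string", "nullable": False, "pattern": r"^[\w\.-]+@[\w\.-]+\.\w+$"},
        "age": {"type": "integer", "nullable": True, "min": 0, "max": 150},
        "signup_date": {"type": "date", "nullable": False},
    }
}

# Assumed mapping from rule type names to Python types
PYTHON_TYPES = {"string": str, "integer": int, "date": date}


def validate_record(record, rules):
    """Return a list of violations for one record (empty list means valid)."""
    errors = []
    for name, rule in rules["columns"].items():
        value = record.get(name)
        if value is None:
            if not rule["nullable"]:
                errors.append(f"{name}: null not allowed")
            continue
        if not isinstance(value, PYTHON_TYPES[rule["type"]]):
            errors.append(f"{name}: expected {rule['type']}")
            continue
        if "pattern" in rule and not re.match(rule["pattern"], value):
            errors.append(f"{name}: does not match pattern")
        if "min" in rule and value < rule["min"]:
            errors.append(f"{name}: below min {rule['min']}")
        if "max" in rule and value > rule["max"]:
            errors.append(f"{name}: above max {rule['max']}")
    return errors


good = {"user_id": "u1", "email": "a@example.com", "age": 30, "signup_date": date(2024, 1, 1)}
bad = {"user_id": None, "email": "not-an-email", "age": 200, "signup_date": date(2024, 1, 1)}
print(validate_record(good, schema_rules))
print(validate_record(bad, schema_rules))
```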
Statistical Validation
statistical_rules = {
    "row_count": {
        "min": 1000,
        "max": 10000000,
        "expected": 50000,
        "tolerance": 0.2  # ±20%
    },
    "null_rate": {
        "user_id": {"max": 0.0},
        "email": {"max": 0.01},
        "age": {"max": 0.05},
    }
}
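The rules above do not say how `expected` and `tolerance` interact with the hard `min` and `max`. The sketch below assumes the accepted range is the tolerance band around `expected`, clamped to the hard bounds; both helper names are hypothetical.

```python
statistical_rules = {
    "row_count": {"min": 1000, "max": 10000000, "expected": 50000, "tolerance": 0.2},
    "null_rate": {"user_id": {"max": 0.0}, "email": {"max": 0.01}, "age": {"max": 0.05}},
}


def row_count_ok(count, rule):
    """True if count lies in expected ± tolerance, clamped to [min, max]."""
    low = max(rule["min"], rule["expected"] * (1 - rule["tolerance"]))
    high = min(rule["max"], rule["expected"] * (1 + rule["tolerance"]))
    return low <= count <= high


def null_rate_violations(rows, rule):
    """Map each column whose null rate exceeds its limit to the observed rate."""
    total = len(rows)
    violations = {}
    for column, limits in rule.items():
        nulls = sum(1 for r in rows if r.get(column) is None)
        rate = nulls / total if total else 0.0
        if rate > limits["max"]:
            violations[column] = rate
    return violations


# Hypothetical batch: 100 rows, 2 missing emails, 3 missing ages
rows = [{"user_id": f"u{i}", "email": f"u{i}@example.com", "age": 30} for i in range(100)]
rows[0]["email"] = rows[1]["email"] = None
rows[2]["age"] = rows[3]["age"] = rows[4]["age"] = None

print(row_count_ok(45000, statistical_rules["row_count"]))
print(null_rate_violations(rows, statistical_rules["null_rate"]))
```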
Business Rules
business_rules = [
    # Foreign key relationships
    {"type": "referential", "table": "orders", "column": "user_id",
     "references": {"table": "users", "column": "id"}},
    # Cross-column validation
    {"type": "expression", "condition": "start_date <= end_date"},
    # Business logic
    {"type": "custom", "function": "validate_email_domain"},
]
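The referential and expression rules can be illustrated on in-memory rows. This is a sketch with made-up tables and hypothetical helper names; in Spark the referential check would typically be a join with `how="left_anti"` and the expression check a `filter`.

```python
from datetime import date

# Hypothetical parent and child tables
users = [{"id": "u1"}, {"id": "u2"}]
orders = [
    {"order_id": 1, "user_id": "u1", "start_date": date(2024, 1, 1), "end_date": date(2024, 1, 5)},
    {"order_id": 2, "user_id": "u9", "start_date": date(2024, 1, 2), "end_date": date(2024, 1, 3)},
    {"order_id": 3, "user_id": "u2", "start_date": date(2024, 2, 1), "end_date": date(2024, 1, 1)},
]


def referential_violations(child_rows, child_col, parent_rows, parent_col):
    """Child rows whose key has no match in the parent table."""
    parent_keys = {r[parent_col] for r in parent_rows}
    return [r for r in child_rows if r[child_col] not in parent_keys]


def expression_violations(rows, predicate):
    """Rows for which the cross-column condition does not hold."""
    return [r for r in rows if not predicate(r)]


orphans = referential_violations(orders, "user_id", users, "id")
bad_dates = expression_violations(orders, lambda r: r["start_date"] <= r["end_date"])
print([r["order_id"] for r in orphans])    # orders pointing at unknown users
print([r["order_id"] for r in bad_dates])  # orders that end before they start
```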
Anomaly Detection Methods
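1. Statistical Process Control (SPC)
$$UCL = \mu + 3\sigma, \qquad LCL = \mu - 3\sigma$$
Here $\mu$ and $\sigma$ are the mean and standard deviation of the metric over a baseline period. Points outside the control limits are anomalies.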
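A minimal sketch of control limits, assuming the conventional three-sigma band and a made-up baseline of daily row counts (in thousands). The helper names are hypothetical.

```python
from statistics import mean, stdev


def control_limits(baseline, k=3.0):
    """Return (LCL, UCL) as mean ± k sample standard deviations of the baseline."""
    mu = mean(baseline)
    sigma = stdev(baseline)
    return mu - k * sigma, mu + k * sigma


def out_of_control(values, lcl, ucl):
    """Values that fall outside the control limits."""
    return [v for v in values if v < lcl or v > ucl]


# Hypothetical baseline: mean 100, sample standard deviation 2
baseline = [100, 102, 98, 101, 99, 100, 103, 97]
lcl, ucl = control_limits(baseline)
print(lcl, ucl)
print(out_of_control([101, 110, 93, 99], lcl, ucl))
```

The limits are computed once from the baseline and then applied to new observations, so a single bad day does not widen the band it is judged against.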
2. Z-Score
$$z = \frac{x - \mu}{\sigma}$$
If $|z|$ exceeds a chosen threshold (commonly 3), the value is anomalous.
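The z-score rule in plain Python, as a sketch on made-up values. The guard for a zero standard deviation is an addition for safety: a constant column has no spread, so nothing is flagged.

```python
from statistics import mean, stdev


def z_scores(values):
    """Z-score of every value; all zeros when the values have no spread."""
    mu, sigma = mean(values), stdev(values)
    if sigma == 0:
        return [0.0] * len(values)
    return [(v - mu) / sigma for v in values]


def z_anomalies(values, threshold=3.0):
    """Values whose absolute z-score exceeds the threshold."""
    return [v for v, z in zip(values, z_scores(values)) if abs(z) > threshold]


# Hypothetical metric: twenty ordinary readings and one spike
readings = [10] * 10 + [12] * 10 + [100]
print(z_anomalies(readings))
```

Because the outlier itself inflates the mean and standard deviation, very small samples can hide it; the rule works best with a reasonably long history.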
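3. IQR Method
$$IQR = Q_3 - Q_1, \qquad \text{lower} = Q_1 - 1.5 \times IQR, \qquad \text{upper} = Q_3 + 1.5 \times IQR$$
Values below the lower bound or above the upper bound are anomalies.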
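A short sketch of the IQR fences using the standard library. Quartile conventions differ between tools (this uses the default method of `statistics.quantiles`), so the exact bounds may not match those produced by Spark's approximate quantiles. The data is made up.

```python
from statistics import quantiles


def iqr_bounds(values, k=1.5):
    """Return (lower, upper) fences at Q1 - k*IQR and Q3 + k*IQR."""
    q1, _, q3 = quantiles(values, n=4)
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr


def iqr_anomalies(values, k=1.5):
    """Values outside the IQR fences."""
    lower, upper = iqr_bounds(values, k)
    return [v for v in values if v < lower or v > upper]


# Hypothetical values with one extreme reading
values = [10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 100]
print(iqr_bounds(values))
print(iqr_anomalies(values))
```

Unlike the z-score, the fences depend on quartiles rather than the mean, so a single extreme value barely moves them.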
4. Time Series Decomposition
$$Y_t = T_t + S_t + R_t$$
Where:
- $T_t$ = Trend component
- $S_t$ = Seasonal component
- $R_t$ = Residual (anomaly if large)
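A minimal sketch of an additive decomposition, assuming a centered moving average for the trend (so the period must be odd) and per-phase averages for the seasonal part. This is one simple estimator among several, and the series and function names are made up for illustration.

```python
def decompose_additive(y, period):
    """Split y into (trend, seasonal, residual) lists under an additive model.

    The first and last period // 2 points have no trend estimate, so their
    trend and residual entries are None.
    """
    if period % 2 == 0 or len(y) < 2 * period:
        raise ValueError("need an odd period and at least two full periods")
    half, n = period // 2, len(y)
    trend = [None] * n
    for t in range(half, n - half):
        trend[t] = sum(y[t - half:t + half + 1]) / period
    # Average the detrended values for each position within the period
    buckets = [[] for _ in range(period)]
    for t in range(n):
        if trend[t] is not None:
            buckets[t % period].append(y[t] - trend[t])
    raw = [sum(b) / len(b) for b in buckets]
    offset = sum(raw) / period  # centre the seasonal pattern on zero
    seasonal = [raw[t % period] - offset for t in range(n)]
    residual = [None if trend[t] is None else y[t] - trend[t] - seasonal[t]
                for t in range(n)]
    return trend, seasonal, residual


def residual_anomalies(residual, threshold):
    """Indices whose residual magnitude exceeds the threshold."""
    return [t for t, r in enumerate(residual) if r is not None and abs(r) > threshold]


# Hypothetical daily metric: linear trend plus a weekly pattern, spike on day 14
weekly = [0, 5, -5, 10, -10, 3, -3]
clean = [100 + t + weekly[t % 7] for t in range(28)]
spiked = list(clean)
spiked[14] += 50

_, _, residual = decompose_additive(spiked, period=7)
print(residual_anomalies(residual, threshold=20.0))
```

The spike leaks into the trend and seasonal estimates of its neighbours, which is why the threshold here sits well above the residuals that leakage produces.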
Data Freshness
1. Latency: Time from event to availability
   - Formula: available_time - event_time
   - Target: < 1 hour (batch), < 1 minute (streaming)
2. Freshness: Time since last update
   - Formula: now - last_update_time
   - Target: < schedule_interval + buffer
3. Staleness: How outdated the data is
   - Formula: now - max(timestamp_column)
   - Target: < business_requirement
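The three formulas map directly onto `datetime` arithmetic. The sketch below uses fixed, made-up UTC timestamps and an assumed hourly schedule with a 15-minute buffer; the function names are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def latency(event_time, available_time):
    """Time from the event happening to the data being queryable."""
    return available_time - event_time


def freshness(last_update_time, now):
    """Time since the table was last updated."""
    return now - last_update_time


def staleness(row_timestamps, now):
    """Age of the newest record in the table."""
    return now - max(row_timestamps)


def within_target(age, schedule_interval, buffer):
    """Freshness target: age must stay under schedule_interval + buffer."""
    return age < schedule_interval + buffer


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
last_update = datetime(2024, 1, 1, 11, 10, tzinfo=timezone.utc)
row_times = [datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc),
             datetime(2024, 1, 1, 10, 30, tzinfo=timezone.utc)]

print(freshness(last_update, now))   # 50 minutes since the last load
print(staleness(row_times, now))     # newest row is 90 minutes old
print(within_target(freshness(last_update, now),
                    schedule_interval=timedelta(hours=1),
                    buffer=timedelta(minutes=15)))
```

Freshness and staleness can disagree, as they do here: the job ran recently but the newest record it loaded is older, which usually points at a delay upstream.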
Code Implementation
Data Quality Framework Core
from dataclasses import dataclass
from typing import List, Dict, Callable, Optional
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
from enum import Enum


class QualityCheckResult(Enum):
    PASS = "pass"
    FAIL = "fail"
    WARN = "warn"


@dataclass
class QualityCheck:
    name: str
    description: str
    check_func: Callable
    severity: str  # critical, warning, info
    dimensions: List[str]  # completeness, accuracy, etc.


@dataclass
class QualityReport:
    timestamp: datetime
    table_name: str
    checks: List[Dict]
    overall_score: float
    passed: bool
    details: Dict


class DataQualityFramework:
    def __init__(self, spark_session=None):
        self.spark = spark_session
        self.checks = {}
        self.reports = []

    def register_check(self, check: QualityCheck):
        """Register a quality check"""
        self.checks[check.name] = check

    def run_checks(self, df, table_name: str) -> QualityReport:
        """Run all registered checks on a DataFrame"""
        results = []
        scores = {}
        for name, check in self.checks.items():
            entry = {
                'name': name,
                'description': check.description,
                'severity': check.severity,
                'dimensions': check.dimensions,
            }
            try:
                entry['result'] = check.check_func(df)
            except Exception as e:
                # A check that raises counts as a failure and keeps the error text
                entry['result'] = QualityCheckResult.FAIL
                entry['error'] = str(e)
            results.append(entry)
            # Calculate dimension scores (anything other than PASS scores 0)
            for dim in check.dimensions:
                scores.setdefault(dim, []).append(
                    1.0 if entry['result'] == QualityCheckResult.PASS else 0.0
                )
        # Calculate overall score as the mean of the per-dimension means
        all_scores = [np.mean(dim_scores) for dim_scores in scores.values()]
        overall_score = float(np.mean(all_scores)) if all_scores else 0.0
        # Determine if passed (no critical failures)
        critical_failures = [r for r in results
                             if r['result'] == QualityCheckResult.FAIL
                             and r['severity'] == 'critical']
        report = QualityReport(
            timestamp=datetime.now(),
            table_name=table_name,
            checks=results,
            overall_score=overall_score,
            passed=len(critical_failures) == 0,
            details=scores,
        )
        self.reports.append(report)
        return report
Built-in Quality Checks
class BuiltinChecks:
    """Built-in quality checks"""

    @staticmethod
    def completeness_check(columns: List[str], max_null_rate: float = 0.0):
        """Check completeness of specified columns"""
        def check(df):
            total = df.count()
            if total == 0:
                # Avoid dividing by zero; an empty table fails completeness
                return QualityCheckResult.FAIL
            for col in columns:
                null_rate = df.filter(df[col].isNull()).count() / total
                if null_rate > max_null_rate:
                    return QualityCheckResult.FAIL
            return QualityCheckResult.PASS
        return QualityCheck(
            name=f"completeness_{'_'.join(columns)}",
            description=f"Check completeness of {columns}",
            check_func=check,
            severity='critical',
            dimensions=['completeness'],
        )

    @staticmethod
    def uniqueness_check(columns: List[str]):
        """Check uniqueness of specified columns"""
        def check(df):
            total_count = df.count()
            distinct_count = df.select(columns).distinct().count()
            if total_count != distinct_count:
                return QualityCheckResult.FAIL
            return QualityCheckResult.PASS
        return QualityCheck(
            name=f"uniqueness_{'_'.join(columns)}",
            description=f"Check uniqueness of {columns}",
            check_func=check,
            severity='critical',
            dimensions=['uniqueness'],
        )

    @staticmethod
    def range_check(column: str, min_val=None, max_val=None):
        """Check if values are within range"""
        def check(df):
            if min_val is not None:
                below_min = df.filter(df[column] < min_val).count()
                if below_min > 0:
                    return QualityCheckResult.FAIL
            if max_val is not None:
                above_max = df.filter(df[column] > max_val).count()
                if above_max > 0:
                    return QualityCheckResult.FAIL
            return QualityCheckResult.PASS
        return QualityCheck(
            name=f"range_{column}",
            description=f"Check range of {column}",
            check_func=check,
            severity='warning',
            dimensions=['validity'],
        )

    @staticmethod
    def pattern_check(column: str, pattern: str):
        """Check if values match pattern"""
        def check(df):
            # Null values are skipped here; completeness_check covers them
            non_matching = df.filter(~df[column].rlike(pattern)).count()
            if non_matching > 0:
                return QualityCheckResult.FAIL
            return QualityCheckResult.PASS
        return QualityCheck(
            name=f"pattern_{column}",
            description=f"Check pattern of {column}",
            check_func=check,
            severity='warning',
            dimensions=['validity'],
        )

    @staticmethod
    def freshness_check(timestamp_column: str, max_age_hours: int = 24):
        """Check data freshness"""
        def check(df):
            from pyspark.sql import functions as F
            max_timestamp = df.agg(F.max(timestamp_column)).collect()[0][0]
            if max_timestamp is None:
                return QualityCheckResult.FAIL
            # Assumes the column and datetime.now() share the same time zone
            age = datetime.now() - max_timestamp
            if age > timedelta(hours=max_age_hours):
                return QualityCheckResult.FAIL
            return QualityCheckResult.PASS
        return QualityCheck(
            name=f"freshness_{timestamp_column}",
            description=f"Check freshness of {timestamp_column}",
            check_func=check,
            severity='critical',
            dimensions=['timeliness'],
        )

    @staticmethod
    def row_count_check(min_rows: int, max_rows: Optional[int] = None):
        """Check row count is within bounds"""
        def check(df):
            count = df.count()
            if count < min_rows:
                return QualityCheckResult.FAIL
            if max_rows is not None and count > max_rows:
                return QualityCheckResult.FAIL
            return QualityCheckResult.PASS
        return QualityCheck(
            name="row_count",
            description=f"Check row count between {min_rows} and {max_rows}",
            check_func=check,
            severity='critical',
            dimensions=['completeness'],
        )
Anomaly Detection
class AnomalyDetector:
    """Detect anomalies in data"""

    def __init__(self, spark_session):
        self.spark = spark_session

    def detect_statistical_anomalies(self, df, column, threshold=3.0):
        """Detect anomalies using Z-score"""
        from pyspark.sql import functions as F
        # Calculate mean and std
        stats = df.agg(
            F.mean(column).alias('mean'),
            F.stddev(column).alias('stddev')
        ).collect()[0]
        mean = stats['mean']
        stddev = stats['stddev']
        if not stddev:
            # None (fewer than two rows) or 0.0 (constant column): nothing to flag
            return df.withColumn('z_score', F.lit(None).cast('double')).limit(0)
        # Calculate Z-score
        anomalies = df.withColumn(
            'z_score',
            (F.col(column) - mean) / stddev
        ).filter(F.abs(F.col('z_score')) > threshold)
        return anomalies

    def detect_iqr_anomalies(self, df, column):
        """Detect anomalies using IQR method"""
        from pyspark.sql import functions as F
        # Calculate quartiles (approximate, with 1% relative error)
        quantiles = df.approxQuantile(column, [0.25, 0.75], 0.01)
        q1, q3 = quantiles
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        # Find anomalies
        anomalies = df.filter(
            (F.col(column) < lower_bound) |
            (F.col(column) > upper_bound)
        )
        return anomalies

    def detect_time_series_anomalies(self, df, timestamp_col, value_col,
                                     window_size=7, threshold=2.0):
        """Detect anomalies in time series data"""
        from pyspark.sql import functions as F
        from pyspark.sql.window import Window
        # Calculate rolling average and std over the previous window_size rows.
        # A window without partitionBy pulls all rows into one partition, so
        # this suits small aggregated series (for example, one row per day).
        window = Window.orderBy(timestamp_col).rowsBetween(-window_size, -1)
        df_with_stats = df.withColumn(
            'rolling_avg', F.avg(value_col).over(window)
        ).withColumn(
            'rolling_std', F.stddev(value_col).over(window)
        )
        # Detect anomalies
        anomalies = df_with_stats.withColumn(
            'z_score',
            (F.col(value_col) - F.col('rolling_avg')) / F.col('rolling_std')
        ).filter(F.abs(F.col('z_score')) > threshold)
        return anomalies
Data Quarantine
class DataQuarantine:
    """Quarantine bad data for investigation"""

    def __init__(self, spark_session, quarantine_path):
        self.spark = spark_session
        self.quarantine_path = quarantine_path

    def quarantine_records(self, bad_records, reason, table_name):
        """Quarantine bad records"""
        from pyspark.sql import functions as F
        # Add metadata; the date column exists only to give coarse partitions
        quarantined = bad_records \
            .withColumn('_quarantine_reason', F.lit(reason)) \
            .withColumn('_quarantine_timestamp', F.current_timestamp()) \
            .withColumn('_quarantine_date', F.current_date()) \
            .withColumn('_source_table', F.lit(table_name))
        # Write to quarantine storage, partitioned by day. Partitioning by the
        # raw timestamp would create a new partition for every write.
        quarantined.write \
            .format("delta") \
            .mode("append") \
            .partitionBy("_quarantine_date") \
            .save(f"{self.quarantine_path}/{table_name}")
        return quarantined.count()

    def get_quarantine_stats(self, table_name):
        """Get quarantine statistics"""
        from pyspark.sql import functions as F
        quarantine_df = self.spark.read \
            .format("delta") \
            .load(f"{self.quarantine_path}/{table_name}")
        stats = quarantine_df.groupBy('_quarantine_reason') \
            .agg(
                F.count("*").alias("count"),
                F.min("_quarantine_timestamp").alias("first_seen"),
                F.max("_quarantine_timestamp").alias("last_seen")
            )
        return stats
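The quarantine idea can also be shown without Spark: route each record to a clean list or a quarantine list, tagging quarantined rows with the same metadata fields used above. The rule names, sample records, and helper functions below are hypothetical.

```python
from datetime import datetime, timezone


def first_failure(record, rules):
    """Name of the first rule the record breaks, or None if it passes all."""
    for reason, predicate in rules:
        if not predicate(record):
            return reason
    return None


def split_records(records, rules, table_name, now=None):
    """Return (clean, quarantined); quarantined rows carry audit metadata."""
    now = now or datetime.now(timezone.utc)
    clean, quarantined = [], []
    for record in records:
        reason = first_failure(record, rules)
        if reason is None:
            clean.append(record)
        else:
            quarantined.append({
                **record,
                "_quarantine_reason": reason,
                "_quarantine_timestamp": now,
                "_source_table": table_name,
            })
    return clean, quarantined


# Hypothetical rules: each is (reason recorded on failure, predicate that must hold)
rules = [
    ("null_user_id", lambda r: r.get("user_id") is not None),
    ("non_positive_amount", lambda r: (r.get("amount") or 0) > 0),
]
records = [
    {"user_id": "u1", "amount": 20.0},
    {"user_id": None, "amount": 5.0},
    {"user_id": "u3", "amount": -1.0},
]

clean, quarantined = split_records(records, rules, "payments")
print(len(clean), [q["_quarantine_reason"] for q in quarantined])
```

Recording the reason per record, rather than per batch, is what makes the later statistics by `_quarantine_reason` meaningful.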
Monitoring and Alerting
import requests


class DataQualityMonitor:
    """Monitor data quality and send alerts"""

    def __init__(self, alert_config):
        self.alert_config = alert_config

    def send_alert(self, report: QualityReport):
        """Send alert based on quality report"""
        if not report.passed:
            self._send_critical_alert(report)
        elif report.overall_score < 0.9:
            self._send_warning_alert(report)

    def _send_critical_alert(self, report):
        """Send critical alert"""
        critical_failures = [c for c in report.checks
                             if c['result'] == QualityCheckResult.FAIL
                             and c['severity'] == 'critical']
        failed_lines = "\n".join(
            f"- {c['name']}: {c.get('error', 'Failed')}" for c in critical_failures
        )
        message = (
            "🚨 CRITICAL DATA QUALITY FAILURE\n"
            f"Table: {report.table_name}\n"
            f"Time: {report.timestamp}\n"
            f"Score: {report.overall_score:.2%}\n"
            "Failed Checks:\n"
            f"{failed_lines}\n"
            "Please investigate immediately."
        )
        # Post to the Slack webhook; the timeout keeps a slow endpoint
        # from blocking the pipeline
        requests.post(
            self.alert_config['slack_webhook'],
            json={'text': message},
            timeout=10,
        )

    def _send_warning_alert(self, report):
        """Send warning alert"""
        message = (
            "⚠️ Data Quality Warning\n"
            f"Table: {report.table_name}\n"
            f"Time: {report.timestamp}\n"
            f"Score: {report.overall_score:.2%}\n"
            "Some checks are below threshold."
        )
        requests.post(
            self.alert_config['slack_webhook'],
            json={'text': message},
            timeout=10,
        )
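The routing decision inside `send_alert` can be separated from the network call, which makes it easy to unit test. The sketch below takes plain values instead of a `QualityReport` so that it runs on its own; the function names are hypothetical, and the 0.9 threshold mirrors the one used above.

```python
def alert_level(passed, overall_score, warn_threshold=0.9):
    """Return 'critical', 'warning', or None when no alert is needed."""
    if not passed:
        return "critical"
    if overall_score < warn_threshold:
        return "warning"
    return None


def format_alert(level, table_name, overall_score, failed_checks=()):
    """Build the alert text; failed_checks is a list of check names."""
    lines = [
        f"Data quality {level}",
        f"Table: {table_name}",
        f"Score: {overall_score:.2%}",
    ]
    lines.extend(f"- {name}" for name in failed_checks)
    return "\n".join(lines)


level = alert_level(passed=True, overall_score=0.85)
if level is not None:
    print(format_alert(level, "users", 0.85))
```

Keeping the decision pure means the webhook call is the only part that needs mocking in tests.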
Complete Example
# ============================================================
# COMPLETE DATA QUALITY FRAMEWORK EXAMPLE
# ============================================================
from pyspark.sql import SparkSession
# Assumption: this runs inside an Airflow task, where Variable stores the webhook URL
from airflow.models import Variable

spark = SparkSession.builder \
    .appName("DataQualityFramework") \
    .getOrCreate()

# Initialize framework
dq_framework = DataQualityFramework(spark)

# Register checks
dq_framework.register_check(
    BuiltinChecks.completeness_check(['user_id', 'email'], max_null_rate=0.01)
)
dq_framework.register_check(
    BuiltinChecks.uniqueness_check(['user_id'])
)
dq_framework.register_check(
    BuiltinChecks.range_check('age', min_val=0, max_val=150)
)
dq_framework.register_check(
    BuiltinChecks.freshness_check('updated_at', max_age_hours=24)
)
dq_framework.register_check(
    BuiltinChecks.row_count_check(min_rows=1000)
)

# Read data
df = spark.read.parquet("s3://data-lake/users/")

# Run quality checks
report = dq_framework.run_checks(df, "users")

# Process report
if not report.passed:
    # Quarantine bad data
    quarantine = DataQuarantine(spark, "s3://quarantine/")
    # Find failing records (example)
    bad_records = df.filter(df.user_id.isNull())
    quarantine.quarantine_records(bad_records, "null_user_id", "users")

# Send alerts (send_alert decides between critical, warning, or no alert)
monitor = DataQualityMonitor({
    'slack_webhook': Variable.get('slack_webhook_url')
})
monitor.send_alert(report)

# Generate quality report
print(f"Overall Score: {report.overall_score:.2%}")
print(f"Passed: {report.passed}")
for check in report.checks:
    print(f"  {check['name']}: {check['result'].value}")
Great Expectations Integration
import great_expectations as ge
# ============================================================
# GREAT EXPECTATIONS INTEGRATION
# ============================================================
# Convert to Great Expectations DataFrame.
# Note: ge.from_pandas belongs to the legacy Dataset API, so this snippet
# assumes a Great Expectations release that still ships it. toPandas() also
# collects the whole table onto the driver, so sample large tables first.
ge_df = ge.from_pandas(df.toPandas())
# Define expectations
ge_df.expect_column_values_to_not_be_null('user_id')
ge_df.expect_column_values_to_be_unique('user_id')
ge_df.expect_column_values_to_be_between('age', min_value=0, max_value=150)
ge_df.expect_column_values_to_match_regex('email', r'^[\w\.-]+@[\w\.-]+\.\w+$')
# Validate
results = ge_df.validate()
# Check results
if not results.success:
    print("Validation failed!")
    for result in results.results:
        if not result.success:
            print(f"  {result.expectation_config.expectation_type}: {result.result}")
💡 Production Tip: Start with critical checks (completeness, freshness) and gradually add more sophisticated checks. Don't try to implement everything at once; iterate based on the data issues you actually encounter.
Common Follow-Up Questions
Q1: How do you measure data quality ROI?
Track metrics:
- Incidents prevented: Number of bad data incidents caught before impact
- Time saved: Hours spent on data debugging reduced
- Revenue protected: Business impact of data quality improvements
- Customer satisfaction: Reduction in data-related complaints
Q2: How do you handle data quality in real-time?
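# Real-time quality checks with Spark Structured Streaming
from pyspark.sql import functions as F
# Broker, topic, payload schema and checkpoint paths below are placeholders
raw_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "events") \
    .load()
# Kafka delivers the payload as bytes, so parse it before checking columns
streaming_df = raw_stream \
    .select(F.from_json(F.col("value").cast("string"),
                        "user_id STRING, amount DOUBLE").alias("e")) \
    .select("e.*")
# Apply quality checks. coalesce turns a null result (for example, a null
# amount) into False; otherwise both filters below would drop the row.
quality_checked = streaming_df \
    .withColumn("is_valid", F.coalesce(
        F.col("user_id").isNotNull() & (F.col("amount") > 0),
        F.lit(False)))
# Split into good and bad streams
good_stream = quality_checked.filter(F.col("is_valid"))
bad_stream = quality_checked.filter(~F.col("is_valid"))
# Write to separate sinks (each query needs its own checkpoint location)
good_stream.writeStream.format("delta") \
    .option("checkpointLocation", "s3://checkpoints/good/") \
    .start("s3://good-data/")
bad_stream.writeStream.format("delta") \
    .option("checkpointLocation", "s3://checkpoints/quarantine/") \
    .start("s3://quarantine/")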
Q3: How do you handle data quality across teams?
- Data contracts: Define quality expectations between teams
- SLAs: Agree on freshness and quality targets
- Shared metrics: Common quality dashboards
- Escalation process: Clear ownership for quality issues
Q4: How do you prioritize quality checks?
Prioritize based on:
- Business impact: What's the cost of bad data?
- Frequency: How often does this issue occur?
- Detection difficulty: How easy is it to detect?
- Remediation cost: How hard is it to fix?
⚠️ Critical Consideration: Data quality is not just a technical problem; it's a business problem. Work with stakeholders to define quality requirements, and focus on the checks that provide the most business value.
Company-Specific Tips
Google Interview Tips
- Discuss data contracts and schema evolution
- Explain anomaly detection at scale
- Mention data freshness monitoring
- Talk about automated remediation
Airbnb Interview Tips
- Focus on data trust and user-facing metrics
- Discuss A/B testing data quality
- Mention ML data quality for recommendations
- Talk about cost optimization for quality checks
Uber Interview Tips
- Discuss real-time data quality for ride matching
- Explain geospatial data validation
- Mention payment data accuracy
- Talk about multi-region quality monitoring
ℹ️ Final Takeaway: Data quality is a continuous process, not a one-time project. Start with critical checks, monitor constantly, and iterate based on actual issues. The goal is not perfect data, but reliable data that stakeholders can trust.