Data Quality at Scale: Frameworks & Automation

Difficulty: Senior Level | Companies: Netflix, Uber, Airbnb, Stripe, Spotify

1. Data Quality Dimensions

Dimension    | Definition                        | Example check
Completeness | Required fields are populated     | null_ratio < 0.01
Accuracy     | Values match reality              | amount >= 0
Consistency  | Related fields do not contradict  | order_date <= ship_date
Timeliness   | Data is fresh                     | now() - last_updated < 1 hour
Uniqueness   | No duplicate records              | count(*) = count(DISTINCT id)
Validity     | Values follow format rules        | email LIKE '%@%.%'
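To make the six dimensions concrete, here is a minimal plain-Python sketch that scores each one over a small list of order records. The record fields (`id`, `amount`, `email`, `order_date`, `ship_date`, `last_updated`), the function name `check_dimensions`, and the sample data are hypothetical; at scale the same logic would run in SQL, Spark, or a framework such as Great Expectations rather than over Python lists.

```python
import re
from datetime import datetime, timedelta

# Loose email pattern, roughly equivalent to LIKE '%@%.%'
EMAIL_PATTERN = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")


def check_dimensions(rows, now, max_null_ratio=0.01, max_age=timedelta(hours=1)):
    """Return {dimension: passed} for a list of order dicts (hypothetical schema).

    Every row is assumed to carry all six keys, with None for missing values.
    """
    total = len(rows)
    if total == 0:
        raise ValueError("no rows to check")

    amounts = [r["amount"] for r in rows if r["amount"] is not None]
    dated = [r for r in rows if r["order_date"] is not None and r["ship_date"] is not None]
    emails = [r["email"] for r in rows if r["email"] is not None]
    stamps = [r["last_updated"] for r in rows if r["last_updated"] is not None]

    return {
        # Completeness: share of rows missing `amount` stays under the threshold
        "completeness": (total - len(amounts)) / total < max_null_ratio,
        # Accuracy (plausibility proxy): no negative amounts
        "accuracy": all(a >= 0 for a in amounts),
        # Consistency: an order cannot ship before it was placed
        "consistency": all(r["order_date"] <= r["ship_date"] for r in dated),
        # Timeliness: the newest update is younger than max_age
        "timeliness": bool(stamps) and now - max(stamps) < max_age,
        # Uniqueness: count(*) == count(DISTINCT id)
        "uniqueness": len({r["id"] for r in rows}) == total,
        # Validity: every non-null email matches the pattern
        "validity": all(EMAIL_PATTERN.match(e) for e in emails),
    }


now = datetime(2024, 1, 1, 12, 0)
rows = [
    {"id": 1, "amount": 25.0, "email": "a@example.com",
     "order_date": datetime(2024, 1, 1, 9), "ship_date": datetime(2024, 1, 1, 10),
     "last_updated": datetime(2024, 1, 1, 11, 30)},
    {"id": 1, "amount": -5.0, "email": "not-an-email",
     "order_date": datetime(2024, 1, 1, 9), "ship_date": datetime(2024, 1, 1, 8),
     "last_updated": datetime(2024, 1, 1, 11, 45)},
]
report = check_dimensions(rows, now)
print(report)  # completeness and timeliness pass; the other four fail
```

The second record deliberately breaks accuracy, consistency, uniqueness, and validity at once, which is typical of real incidents: one upstream bug often trips several dimensions together.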

2. Great Expectations Framework

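import great_expectations as gx

# Fluent API from Great Expectations 0.17/0.18; GX 1.x changed these calls substantially
context = gx.get_context()

# Load the CSV as a batch; pandas_default.read_csv returns a Validator for it
validator = context.sources.pandas_default.read_csv("data/orders.csv")

# Define expectations
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_unique("order_id")
validator.expect_column_values_to_be_between("amount", min_value=0, max_value=100000)
validator.expect_column_values_to_match_regex("email", r"^[\w\.-]+@[\w\.-]+\.\w+$")
validator.expect_table_row_count_to_be_between(min_value=1000, max_value=10000000)

# Run validation interactively against the loaded batch
results = validator.validate()
print(f"Interactive validation passed: {results.success}")

# Persist the suite (keeping expectations that failed on this sample),
# then wrap the validator in a reusable checkpoint
validator.save_expectation_suite(discard_failed_expectations=False)
checkpoint = context.add_or_update_checkpoint(
    name="orders_checkpoint",
    validator=validator,
)
checkpoint_result = checkpoint.run()

# Gate the pipeline on the outcome
if not checkpoint_result.success:
    raise RuntimeError("orders_checkpoint failed; halting the pipeline")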
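The core pattern behind the Great Expectations workflow is a named suite of declarative expectations, evaluated against a batch and rolled up into one success flag that a checkpoint can act on. The plain-Python sketch below illustrates that pattern only; it is not the Great Expectations API, and `Expectation`, `run_suite`, `checkpoint`, and `ValidationError` are hypothetical names.

```python
import re
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class Expectation:
    name: str
    check: Callable[[List[dict]], bool]  # True when the whole batch passes


def not_null(col):
    return Expectation(f"{col}_not_null", lambda rows: all(r.get(col) is not None for r in rows))


def unique(col):
    return Expectation(f"{col}_unique", lambda rows: len({r.get(col) for r in rows}) == len(rows))


def between(col, lo, hi):
    # Nulls are ignored here; not_null covers them
    return Expectation(f"{col}_between_{lo}_{hi}",
                       lambda rows: all(lo <= r[col] <= hi for r in rows if r.get(col) is not None))


def matches(col, pattern):
    regex = re.compile(pattern)
    return Expectation(f"{col}_matches",
                       lambda rows: all(regex.search(r[col]) for r in rows if r.get(col) is not None))


def row_count_between(lo, hi):
    return Expectation(f"row_count_between_{lo}_{hi}", lambda rows: lo <= len(rows) <= hi)


class ValidationError(Exception):
    pass


def run_suite(rows, suite):
    """Evaluate every expectation and aggregate into one success flag."""
    results = {e.name: bool(e.check(rows)) for e in suite}
    return {"success": all(results.values()), "results": results}


def checkpoint(rows, suite):
    """Run the suite and stop the pipeline if anything failed."""
    outcome = run_suite(rows, suite)
    if not outcome["success"]:
        failed = [name for name, ok in outcome["results"].items() if not ok]
        raise ValidationError(f"failed expectations: {failed}")
    return outcome


orders_suite = [
    not_null("order_id"),
    unique("order_id"),
    between("amount", 0, 100_000),
    matches("email", r"^[\w\.-]+@[\w\.-]+\.\w+$"),
    row_count_between(1, 10_000_000),  # lower bound relaxed for this tiny sample
]
batch = [
    {"order_id": 1, "amount": 10, "email": "a@b.com"},
    {"order_id": 2, "amount": 0, "email": "c.d@e.org"},
]
print(checkpoint(batch, orders_suite)["success"])  # True
```

Keeping expectations as data (a list) rather than inline `if` statements is what lets a framework store suites, re-run them on new batches, and report every failure instead of stopping at the first one.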

3. Automated Quality Pipeline

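from pyspark.sql import functions as F


class DataQualityPipeline:
    """Runs PySpark data quality checks and collects their pass/fail results."""

    def __init__(self, spark):
        self.spark = spark
        self.results = []

    def check_completeness(self, df, columns, threshold=0.99):
        # Single pass over the data: F.count(col) counts only non-null values
        row = df.agg(
            F.count(F.lit(1)).alias("__total"),
            *[F.count(F.col(c)).alias(c) for c in columns],
        ).first()
        total = row["__total"]
        for col in columns:
            # An empty DataFrame fails instead of raising ZeroDivisionError
            ratio = row[col] / total if total else 0.0
            self.results.append({
                "check": f"completeness_{col}",
                "passed": ratio >= threshold,
                "value": ratio,
                "threshold": threshold,
            })

    def check_uniqueness(self, df, key_columns, threshold=0.999):
        # Use threshold=1.0 to require strict uniqueness (e.g. for primary keys)
        total = df.count()
        distinct = df.select(*key_columns).distinct().count()
        ratio = distinct / total if total else 1.0
        self.results.append({
            "check": f"uniqueness_{'_'.join(key_columns)}",
            "passed": ratio >= threshold,
            "value": ratio,
            "threshold": threshold,
        })

    def check_freshness(self, table_name, timestamp_col, max_age_minutes=60):
        # Compute the age inside Spark to avoid driver/session time-zone mismatches.
        # Identifiers are interpolated into SQL, so pass trusted names only.
        row = self.spark.sql(
            f"SELECT (unix_timestamp(current_timestamp()) - unix_timestamp(MAX({timestamp_col}))) / 60 "
            f"AS age_minutes FROM {table_name}"
        ).first()
        age_minutes = row["age_minutes"]  # None when the table is empty
        self.results.append({
            "check": f"freshness_{table_name}",
            "passed": age_minutes is not None and age_minutes <= max_age_minutes,
            "value": age_minutes,
            "threshold": max_age_minutes,
        })

    def check_volume_anomaly(self, table_name, current_count, historical_avg, historical_std):
        if historical_std > 0:
            z_score = (current_count - historical_avg) / historical_std
        else:
            # Flat history: any change in volume is anomalous
            z_score = 0.0 if current_count == historical_avg else float("inf")
        self.results.append({
            "check": f"volume_{table_name}",
            "passed": abs(z_score) <= 3,
            "value": z_score,
            "threshold": 3,
        })

    def get_summary(self):
        passed = sum(1 for r in self.results if r["passed"])
        return {"total": len(self.results), "passed": passed, "failed": len(self.results) - passed}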

Best Practice: Fail fast. Run quality checks at ingestion, before transformation, so that a bad record is rejected or quarantined before it can propagate into downstream tables.
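One common way to apply this without aborting an entire load is to validate each record as it arrives and divert failures to a quarantine (dead-letter) area for inspection, so only clean records continue downstream. The sketch assumes record-level rules written as plain Python functions; `require`, `non_negative`, and `ingest` are hypothetical names, and in a real pipeline the quarantine list would be a table or topic.

```python
from typing import Callable, Iterable, List, Optional, Tuple

# A rule returns an error message, or None when the record passes
Rule = Callable[[dict], Optional[str]]


def require(field: str) -> Rule:
    def rule(record: dict) -> Optional[str]:
        return None if record.get(field) is not None else f"{field} is missing"
    return rule


def non_negative(field: str) -> Rule:
    def rule(record: dict) -> Optional[str]:
        value = record.get(field)
        # Nulls pass here; pair with require() when the field is mandatory
        return None if value is None or value >= 0 else f"{field} is negative"
    return rule


def ingest(records: Iterable[dict], rules: List[Rule]) -> Tuple[List[dict], List[dict]]:
    """Split records into (accepted, quarantined) before any transformation runs."""
    accepted, quarantined = [], []
    for record in records:
        errors = [msg for msg in (rule(record) for rule in rules) if msg is not None]
        if errors:
            # Keep the original record plus every reason it failed
            quarantined.append({"record": record, "errors": errors})
        else:
            accepted.append(record)
    return accepted, quarantined


rules = [require("order_id"), non_negative("amount")]
records = [{"order_id": 1, "amount": 10}, {"order_id": None, "amount": -3}]
accepted, quarantined = ingest(records, rules)
print(len(accepted), quarantined[0]["errors"])
# 1 ['order_id is missing', 'amount is negative']
```

Collecting every failing rule per record, rather than stopping at the first, makes quarantined rows much easier to triage and replay once the upstream issue is fixed.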

Follow-Up Questions

  1. How would you implement data quality monitoring for 10,000+ tables?
  2. Design a data quality scoring system for executive dashboards.
  3. How do you handle quality checks for streaming data?
  4. Design an automated root cause analysis system for quality failures.
  5. How would you measure the business impact of data quality issues?
