15. Data Quality in PySpark

🟒 Free Lesson

Advertisement

15. Data Quality in PySpark

Data Quality

Data quality refers to the fitness of data for its intended uses in operations, decision-making, and planning. It encompasses dimensions like accuracy, completeness, consistency, timeliness, validity, and uniqueness.

Data Validation

Data validation is the process of ensuring that data meets defined quality constraints before it is used in downstream processing. Validation can be performed at ingestion, during transformation, or before loading.

Data Quality Score Formula
$$DQS = \frac{\sum_{i=1}^{n} w_i \cdot score_i}{\sum_{i=1}^{n} w_i}$$

Here,

  • $DQS$ = Overall Data Quality Score (0-100%)
  • $w_i$ = Weight of quality dimension $i$
  • $score_i$ = Score for quality dimension $i$ (0-100%)
  • $n$ = Number of quality dimensions evaluated
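
As a worked example of the formula, here is a plain-Python sketch (no Spark needed). The three dimensions, their weights, and their scores are illustrative assumptions, not recommended values.

```python
def data_quality_score(weights: dict[str, float], scores: dict[str, float]) -> float:
    """Weighted average of per-dimension scores (0-100), as in the DQS formula."""
    if set(weights) != set(scores):
        raise ValueError("weights and scores must cover the same dimensions")
    total_weight = sum(weights.values())
    if total_weight <= 0:
        raise ValueError("weights must sum to a positive number")
    # Numerator: sum of w_i * score_i; denominator: sum of w_i
    weighted_sum = sum(weights[dim] * scores[dim] for dim in weights)
    return weighted_sum / total_weight


# Illustrative weights and scores (assumed values, not recommendations)
weights = {"completeness": 3, "validity": 2, "uniqueness": 1}
scores = {"completeness": 98.0, "validity": 90.0, "uniqueness": 100.0}

dqs = data_quality_score(weights, scores)
print(f"DQS = {dqs:.2f}%")
```

With these inputs the score is (3·98 + 2·90 + 1·100) / 6 ≈ 95.67%. Weighting lets the dimensions that matter most for a dataset dominate the overall score. A dimension with weight 0 drops out of both sums.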

Anomaly Detection Z-Score Threshold

$$z = \frac{x - \mu}{\sigma}, \qquad x \text{ is flagged as an anomaly if } |z| > threshold$$

Here,

  • $z$ = Z-score of the data point (number of standard deviations from the mean)
  • $x$ = Data point value
  • $\mu$ = Mean of the distribution
  • $\sigma$ = Standard deviation of the distribution
  • $threshold$ = Detection threshold (typically 2.5-3.0)
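
Below is a minimal plain-Python sketch of the threshold rule. It takes the absolute value of z, so both unusually high and unusually low values are flagged. It uses the sample standard deviation, which is also what Spark's `stddev` computes. The `amounts` data is made up for illustration.

```python
import statistics


def zscore_outliers(values: list[float], threshold: float = 3.0) -> list[float]:
    """Return the values whose absolute Z-score exceeds the threshold."""
    mu = statistics.mean(values)
    sigma = statistics.stdev(values)  # sample standard deviation (n - 1)
    if sigma == 0:
        return []  # constant data: no point deviates from the mean
    return [x for x in values if abs((x - mu) / sigma) > threshold]


# 19 "normal" daily amounts plus one spike (illustrative data)
amounts = [10, 12, 11, 13, 12, 11, 10, 12, 13, 11,
           12, 11, 10, 12, 13, 11, 12, 11, 12, 60]
print(zscore_outliers(amounts))
```

Only the spike (60) is flagged, with z ≈ 4.2. Because an outlier inflates both μ and σ, very small samples can hide it. With the sample standard deviation, no point among n values can exceed |z| = (n - 1)/√n, which is below 3 for n ≤ 10.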

Data quality is not a one-time activity but a continuous process. Automated validation, monitoring, and alerting should be embedded in every stage of the data pipeline to catch issues early.

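Use Delta Lake's ACID transactions and time travel as safety nets. Transactions ensure no partially written data is visible, and time travel restores an earlier table version after a bad load. Combine them with Great Expectations or Deequ for declarative validation rules.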

Data Quality Dimensions

A complete data quality assessment typically evaluates six dimensions:

  (1) accuracy: data correctly represents real-world entities;
  (2) completeness: all required data is present;
  (3) consistency: data is uniform across systems;
  (4) timeliness: data is available when needed;
  (5) validity: data conforms to defined formats and rules;
  (6) uniqueness: no unwanted duplicates exist.

  • Data quality encompasses six dimensions: accuracy, completeness, consistency, timeliness, validity, uniqueness
  • Quality scores combine weighted dimension scores for overall assessment
  • Z-score anomaly detection identifies statistical outliers in data distributions
  • Automated validation should be embedded at every pipeline stage
  • Delta Lake + Great Expectations/Deequ provides declarative quality guarantees

Figure: Six Quality Dimensions Wheel (Accuracy, Completeness, Consistency, Timeliness, Validity, Uniqueness) and Validation Pipeline Flow (Ingest → Schema Check → Constraint Validate → Anomaly Detect → Quality Score → Alert/Block), with DQS = Σ(w_i × score_i) / Σ(w_i).

🏗️ Data Quality Architecture

📚 Detailed Explanation

Why Data Quality Matters

  • Poor data quality → incorrect business decisions, failed processes, loss of trust
  • Requires validation rules, anomaly detection, and continuous monitoring
  • Must be embedded at every stage of the data pipeline

Six Quality Dimensions

| Dimension | What It Checks | Example |
|---|---|---|
| Accuracy | Data correctly represents real-world entities | Correct prices, addresses |
| Completeness | All required data is present | No missing values |
| Consistency | Data is uniform across systems | Same format everywhere |
| Timeliness | Data is available when needed | Fresh, up-to-date data |
| Validity | Data conforms to defined formats/rules | Valid email formats |
| Uniqueness | No unwanted duplicates exist | One record per entity |
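
Some of these dimensions can be measured mechanically from the data itself. Below is a minimal plain-Python sketch, assuming records arrive as dictionaries. For one column, it scores:

  • completeness: share of non-null values
  • validity: share of non-null values matching a regex
  • uniqueness: here taken as the share of distinct non-null values

Accuracy, consistency, and timeliness need reference data or timestamps and are left out. The `users` records are hypothetical.

```python
import re

EMAIL_RE = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")


def dimension_scores(records: list[dict], column: str, pattern: re.Pattern) -> dict[str, float]:
    """Completeness, validity, and uniqueness of one column, each as a percentage."""
    values = [record.get(column) for record in records]
    if not values:
        raise ValueError("no records to score")
    present = [v for v in values if v is not None]
    completeness = 100.0 * len(present) / len(values)  # share of non-null values
    if not present:
        return {"completeness": completeness, "validity": 0.0, "uniqueness": 0.0}
    valid = sum(1 for v in present if isinstance(v, str) and pattern.fullmatch(v))
    return {
        "completeness": completeness,
        "validity": 100.0 * valid / len(present),  # non-null values matching the pattern
        "uniqueness": 100.0 * len(set(present)) / len(present),  # distinct non-null values
    }


# Hypothetical user records
users = [
    {"user_id": 1, "email": "a@example.com"},
    {"user_id": 2, "email": "a@example.com"},  # duplicate email
    {"user_id": 3, "email": "not-an-email"},   # invalid format
    {"user_id": 4, "email": None},             # missing value
]
print(dimension_scores(users, "email", EMAIL_RE))
```

The resulting percentages can serve as the $score_i$ inputs of the Data Quality Score formula.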

Key Takeaway: A comprehensive quality framework evaluates all six dimensions and provides a holistic quality score.


Validation Rules

| Rule Type | Complexity | Example |
|---|---|---|
| Null checks | Simple | Column cannot be null |
| Range validation | Simple | Value between 0 and 1M |
| Format validation | Moderate | Regex for email/phone |
| Foreign key relationships | Complex | Referential integrity |

Implementation options:

  • DataFrame operations
  • SQL constraints
  • External libraries (Great Expectations, Deequ)
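
To make the four rule types concrete, here is a plain-Python sketch (not a Spark or library API). Each rule is a predicate, and every record is checked against all of them. The column names, the 0 to 1,000,000 range, and the `KNOWN_CUSTOMER_IDS` reference set are hypothetical.

```python
import re

EMAIL_RE = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")
KNOWN_CUSTOMER_IDS = {101, 102, 103}  # hypothetical parent table for the foreign-key rule


def user_id_not_null(r: dict) -> bool:  # null check
    return r.get("user_id") is not None


def amount_in_range(r: dict) -> bool:  # range validation (inclusive bounds)
    amount = r.get("amount")
    return amount is not None and 0 <= amount <= 1_000_000


def email_format(r: dict) -> bool:  # format validation
    email = r.get("email")
    return isinstance(email, str) and EMAIL_RE.fullmatch(email) is not None


def customer_exists(r: dict) -> bool:  # foreign key: value must exist in the parent table
    return r.get("customer_id") in KNOWN_CUSTOMER_IDS


RULES = [user_id_not_null, amount_in_range, email_format, customer_exists]


def failed_rules(record: dict) -> list[str]:
    """Names of the rules the record violates; an empty list means the record is valid."""
    return [rule.__name__ for rule in RULES if not rule(record)]


rows = [
    {"user_id": 1, "amount": 250.0, "email": "a@example.com", "customer_id": 101},
    {"user_id": None, "amount": -5.0, "email": "bad", "customer_id": 999},
]
for row in rows:
    print(row, "->", failed_rules(row))
```

In PySpark, the first three predicates map to Column expressions such as `isNotNull()`, `between()`, and `rlike()`. A foreign-key check is usually a `left_anti` join against the parent table, which returns the orphaned rows.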

Anomaly Detection

  • Identifies data points deviating significantly from expected patterns
  • Statistical methods:
    • Z-score analysis
    • Distribution profiling
  • Detects:
    • Sudden spikes in null values
    • Dramatic shifts in value distributions
    • Upstream data problems
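
As one example of these detections, a sudden spike in null values can be caught by comparing today's null rate with the rates from earlier runs. This plain-Python sketch is one-sided: only increases are flagged. The following are assumptions:

  • the history values
  • the 3.0 threshold
  • the `min_sigma` floor, which avoids dividing by a near-zero standard deviation

A real pipeline would load the history from its own metrics store.

```python
import statistics


def null_rate(values: list) -> float:
    """Fraction of values that are None (0.0 for an empty list)."""
    return sum(v is None for v in values) / len(values) if values else 0.0


def is_null_spike(history: list[float], current: float,
                  threshold: float = 3.0, min_sigma: float = 0.001) -> bool:
    """True when `current` is more than `threshold` std devs above the historical mean."""
    mu = statistics.mean(history)
    sigma = max(statistics.stdev(history), min_sigma)  # floor avoids dividing by ~0
    return (current - mu) / sigma > threshold


# Null rates of the email column on the previous 7 runs (illustrative numbers)
history = [0.010, 0.012, 0.009, 0.011, 0.010, 0.013, 0.011]
today = null_rate(["a@example.com"] * 80 + [None] * 20)  # 20% nulls today
print(f"today={today:.2%}, spike={is_null_spike(history, today)}")
```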

Delta Lake Quality Features

| Feature | Quality Benefit |
|---|---|
| ACID transactions | Atomic commits: readers never see partially written data |
| Schema enforcement | Rejects writes whose schema does not match the table schema |
| Time travel | Query or restore an earlier table version after a bad write |

Continuous Monitoring

  1. Track quality metrics over time
  2. Set up alerts for quality degradation
  3. Establish audit trails for compliance
  4. Automate checks at every pipeline stage (ingestion → serving)
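
Steps 1 and 2 can be sketched as a check over a time-ordered history of overall quality scores. In this plain-Python sketch, the `QualityRun` record, the 95% floor, and the 5-point maximum drop are hypothetical choices. The printed messages stand in for whatever notification channel a pipeline uses.

```python
from dataclasses import dataclass


@dataclass
class QualityRun:
    run_id: str
    dqs: float  # overall data quality score in percent


def quality_alerts(runs: list[QualityRun], floor: float = 95.0,
                   max_drop: float = 5.0) -> list[str]:
    """Alert messages for the latest run; `runs` is ordered oldest to newest."""
    if not runs:
        return []
    latest = runs[-1]
    alerts = []
    # Absolute threshold: the score is simply too low
    if latest.dqs < floor:
        alerts.append(f"{latest.run_id}: DQS {latest.dqs:.1f}% is below the {floor:.1f}% floor")
    # Relative threshold: the score fell sharply since the previous run
    if len(runs) >= 2:
        drop = runs[-2].dqs - latest.dqs
        if drop > max_drop:
            alerts.append(f"{latest.run_id}: DQS dropped {drop:.1f} points since {runs[-2].run_id}")
    return alerts


# Illustrative history of three pipeline runs
runs = [QualityRun("run-1", 98.2), QualityRun("run-2", 97.9), QualityRun("run-3", 91.4)]
for message in quality_alerts(runs):
    print(message)
```

Checking both an absolute floor and a run-to-run drop catches sudden breaks as well as gradual decay. Persisting the `runs` history (for example, as a table) also provides the audit trail from step 3.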

Data Quality Tools

| Tool | Approach | Best For |
|---|---|---|
| Great Expectations | Declarative, built-in expectations | Standard validation |
| Deequ | Built on Spark, scalable | Large-scale analysis |
| DataFrame ops | Maximum flexibility | Custom validation logic |

Testing Pyramid

  1. Unit tests: individual validation rules
  2. Integration tests: pipeline validation
  3. End-to-end tests: complete workflows
  4. Production monitoring: continuous quality assurance
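
At the base of the pyramid, each validation rule gets its own unit tests. Below is a minimal sketch using Python's built-in `unittest`. `is_valid_phone` is a hypothetical rule that applies the E.164-style pattern `^\+?[1-9]\d{1,14}$` used for phone numbers in this lesson.

```python
import re
import unittest

PHONE_RE = re.compile(r"^\+?[1-9]\d{1,14}$")  # optional +, first digit 1-9, at most 15 digits


def is_valid_phone(value) -> bool:
    """Hypothetical validation rule: True for E.164-style phone numbers."""
    return isinstance(value, str) and PHONE_RE.fullmatch(value) is not None


class PhoneRuleTest(unittest.TestCase):
    def test_accepts_international_number(self):
        self.assertTrue(is_valid_phone("+14155550123"))

    def test_rejects_leading_zero(self):
        self.assertFalse(is_valid_phone("0123456"))

    def test_rejects_null_and_non_string(self):
        self.assertFalse(is_valid_phone(None))
        self.assertFalse(is_valid_phone(14155550123))

    def test_rejects_too_long(self):
        self.assertFalse(is_valid_phone("+1" + "2" * 15))  # 16 digits
```

If saved in a module such as `test_rules.py` (a hypothetical file name), the tests run with `python -m unittest test_rules`. Testing the rule as a plain function keeps the tests fast. The same pattern can then be passed to `rlike()` in the Spark job.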

Key Takeaway: Each testing level provides different insights and catches different types of issues.


Advanced Techniques

  • Data profiling: automatically discovers patterns and constraints
  • Data observability: monitors health across the entire data ecosystem
  • Move from reactive fixes to proactive quality management
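
Here is a toy version of data profiling. It computes simple per-column statistics from sample records and turns them into candidate constraints for a human to review. This plain-Python sketch and its `sample` orders are hypothetical. At Spark scale, Deequ, for example, offers automatic constraint suggestion.

```python
def profile(records: list[dict]) -> dict[str, dict]:
    """Per-column null count, distinct count, and min/max of the non-null values."""
    columns = sorted({key for record in records for key in record})
    stats = {}
    for column in columns:
        present = [r[column] for r in records if r.get(column) is not None]
        stats[column] = {
            "null_count": len(records) - len(present),
            "distinct": len(set(present)),
            "min": min(present) if present else None,
            "max": max(present) if present else None,
        }
    return stats


def suggest_constraints(stats: dict[str, dict], n_rows: int) -> list[str]:
    """Turn profile statistics into candidate rules (to be reviewed, not enforced blindly)."""
    suggestions = []
    for column, s in stats.items():
        if s["null_count"] == 0:
            suggestions.append(f"{column} IS NOT NULL")
        if s["distinct"] == n_rows:
            suggestions.append(f"{column} IS UNIQUE")
        if isinstance(s["min"], (int, float)):
            suggestions.append(f"{column} BETWEEN {s['min']} AND {s['max']}")
    return suggestions


# Hypothetical sample of an orders table
sample = [
    {"order_id": 1, "amount": 20.0, "coupon": None},
    {"order_id": 2, "amount": 35.5, "coupon": "SPRING"},
    {"order_id": 3, "amount": 12.0, "coupon": None},
]
for rule in suggest_constraints(profile(sample), len(sample)):
    print(rule)
```

With only three rows, `amount` happens to be unique and gets an `IS UNIQUE` suggestion. Suggested constraints should therefore be reviewed, and profiled on larger samples, before being enforced.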

📊 Key Concepts Table

| Concept | Description | Use Case |
|---|---|---|
| Data Validation | Checking data against defined rules | Ingestion, transformation, loading |
| Anomaly Detection | Identifying statistical outliers | Monitoring, alerting |
| Data Profiling | Analyzing data characteristics | Discovery, documentation |
| Quality Scoring | Quantifying overall data fitness | Reporting, SLAs |
| Schema Enforcement | Ensuring data structure compliance | Write operations |
| Time Travel | Accessing historical data versions | Rollback, auditing |
| Data Lineage | Tracking data transformations | Compliance, debugging |
| Alerting | Notifying stakeholders of quality issues | Operations, monitoring |

💻 Code Examples

Basic Data Validation

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType, DateType

spark = SparkSession.builder \
    .appName("DataValidation") \
    .getOrCreate()

# Read data
df = spark.read \
    .format("json") \
    .load("/path/to/data")

# Define validation rules as boolean Column expressions
# Rule 1: No null values in critical columns
# isNotNull() is true when the column value is not null
not_null_rule = (
    col("user_id").isNotNull() &
    col("email").isNotNull() &
    col("event_date").isNotNull()
)

# Rule 2: Data type validation
# cast() returns null for unparseable values only when ANSI mode is off;
# with spark.sql.ansi.enabled=true (the default from Spark 4.0) it raises
# an error instead, so use the try_cast SQL function there
type_rule = (
    col("amount").cast(DoubleType()).isNotNull() &
    col("event_date").cast(DateType()).isNotNull()
)

# Rule 3: Value range validation
# between() is inclusive on both bounds
range_rule = col("amount").between(0, 1000000)

# Rule 4: Format validation using regex
# rlike() is true when the string matches the (Java) regex pattern
format_rule = (
    col("email").rlike(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$") &
    col("phone").rlike(r"^\+?[1-9]\d{1,14}$")  # E.164 format
)

# Combine all rules in a single filter. This avoids joining on an "id"
# column that may not exist or may not be unique. Rows where any rule
# evaluates to null (e.g. a null phone) are dropped as well.
df_valid = df.filter(not_null_rule & type_rule & range_rule & format_rule)

# Each count() triggers a Spark job, so compute the counts once
total_count = df.count()
valid_count = df_valid.count()

print(f"Original records: {total_count}")
print(f"Valid records: {valid_count}")
if total_count > 0:
    print(f"Validation rate: {valid_count / total_count * 100:.2f}%")

Statistical Anomaly Detection

from pyspark.sql import SparkSession
# Import the functions module under an alias so Spark's min/max/abs
# do not shadow Python's built-ins (as a star import would)
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("AnomalyDetection") \
    .getOrCreate()

# Read data
df = spark.read \
    .format("delta") \
    .load("/path/to/data")

# Calculate statistics for anomaly detection
# mean() and stddev() (sample standard deviation) are aggregate functions
stats = df.select(
    F.mean("amount").alias("mean_amount"),
    F.stddev("amount").alias("stddev_amount"),
    F.count("amount").alias("count_amount"),
    F.min("amount").alias("min_amount"),
    F.max("amount").alias("max_amount")
).collect()[0]

mean_amount = stats["mean_amount"]
stddev_amount = stats["stddev_amount"]

# stddev is null for fewer than two non-null values and 0 for constant data;
# Z-scores are undefined in both cases
if stddev_amount is None or stddev_amount == 0:
    raise ValueError("Cannot compute Z-scores: stddev of 'amount' is null or zero")

Z_THRESHOLD = 3.0  # 3-sigma rule

# Apply Z-score anomaly detection
# when().otherwise() builds a conditional column
df_with_anomalies = df.withColumn(
    "z_score",
    (F.col("amount") - F.lit(mean_amount)) / F.lit(stddev_amount)
).withColumn(
    "is_anomaly",
    F.when(F.abs(F.col("z_score")) > Z_THRESHOLD, True).otherwise(False)
).withColumn(
    "anomaly_type",
    F.when(F.col("z_score") > Z_THRESHOLD, "HIGH_VALUE")
    .when(F.col("z_score") < -Z_THRESHOLD, "LOW_VALUE")
    .otherwise("NORMAL")
)

# Calculate anomaly statistics
anomaly_stats = df_with_anomalies.groupBy("is_anomaly").agg(
    F.count("*").alias("record_count"),
    F.avg("amount").alias("avg_amount"),
    F.stddev("amount").alias("stddev_amount")
)

print("Anomaly Statistics:")
anomaly_stats.show()

# Detect distribution anomalies using quartiles and 1.5 * IQR fences
# percentile_approx() is Spark SQL's approximate percentile aggregate
quantiles = df.select(
    F.expr("percentile_approx(amount, 0.25)").alias("q1"),
    F.expr("percentile_approx(amount, 0.50)").alias("median"),
    F.expr("percentile_approx(amount, 0.75)").alias("q3")
).collect()[0]

# float() avoids Decimal * float errors when amount is a DecimalType column
q1, q3 = float(quantiles["q1"]), float(quantiles["q3"])
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr

# Mark IQR-based anomalies
df_iqr_anomalies = df.withColumn(
    "iqr_anomaly",
    F.when(
        (F.col("amount") < lower_bound) | (F.col("amount") > upper_bound),
        True
    ).otherwise(False)
)

print(f"IQR bounds: [{lower_bound:.2f}, {upper_bound:.2f}]")
print(f"IQR anomalies: {df_iqr_anomalies.filter(F.col('iqr_anomaly')).count()}")

Great Expectations Integration

from pyspark.sql import SparkSession
# SparkDFDataset is part of the legacy Great Expectations "Dataset" API,
# available only in older great_expectations releases; GX 1.x uses a
# different, context-based API
from great_expectations.dataset import SparkDFDataset

spark = SparkSession.builder \
    .appName("GreatExpectations") \
    .getOrCreate()

# Read data into a Spark DataFrame
df = spark.read \
    .format("delta") \
    .load("/path/to/data")

# Wrap the Spark DataFrame with Great Expectations
ge_df = SparkDFDataset(df)

# Define expectations
# expect_column_values_to_not_be_null: null check
ge_df.expect_column_values_to_not_be_null("user_id")

# expect_column_values_to_be_unique: uniqueness check
ge_df.expect_column_values_to_be_unique("email")

# expect_column_values_to_be_in_set: value domain check
ge_df.expect_column_values_to_be_in_set(
    "status",
    ["active", "inactive", "pending", "suspended"]
)

# expect_column_values_to_match_regex: format validation
ge_df.expect_column_values_to_match_regex(
    "email",
    r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
)

# expect_column_values_to_be_between: range validation
ge_df.expect_column_values_to_be_between(
    "amount",
    min_value=0,
    max_value=1000000
)

# expect_table_row_count_to_be_between: volume check
ge_df.expect_table_row_count_to_be_between(
    min_value=1000,
    max_value=1000000
)

# Run all expectations collected above
results = ge_df.validate()

print(f"Validation success: {results.success}")
print(f"Successful expectations: {results.statistics['successful_expectations']}")
print(f"Unsuccessful expectations: {results.statistics['unsuccessful_expectations']}")

Deequ Validation Framework

import os

# Assumption: recent PyDeequ releases read SPARK_VERSION to choose the matching
# Deequ jar; set it to your cluster's Spark version before importing pydeequ
os.environ.setdefault("SPARK_VERSION", "3.3")

import pydeequ
from pyspark.sql import SparkSession
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult

# Deequ itself is a Scala library; PyDeequ needs its jar on the Spark classpath
spark = SparkSession.builder \
    .appName("DeequValidation") \
    .config("spark.jars.packages", pydeequ.deequ_maven_coord) \
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord) \
    .getOrCreate()

# Read data
df = spark.read \
    .format("delta") \
    .load("/path/to/data")

# Define checks; CheckLevel.Error is the severity reported when a check fails
check = (
    Check(spark, CheckLevel.Error, "Data Quality Check")
    # Null (completeness) checks
    .isComplete("user_id")
    .isComplete("email")
    .isComplete("event_date")

    # Uniqueness check
    .isUnique("email")

    # Range check: 0 <= amount <= 1,000,000
    .hasMin("amount", lambda x: x >= 0)
    .hasMax("amount", lambda x: x <= 1000000)

    # Value set check
    .isContainedIn("status", ["active", "inactive", "pending"])

    # Compliance check (SQL predicate); the assertion requires all rows to pass
    .satisfies(
        "event_date <= current_date()",
        "event_date_not_future",
        lambda x: x == 1.0
    )

    # Pattern check; the assertion requires every value to match
    .hasPattern(
        "email",
        r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$",
        lambda x: x == 1.0
    )
)

# Run verification
result = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(check) \
    .run()

# One row per constraint with its status and message
check_results = VerificationResult.checkResultsAsDataFrame(spark, result)
failed = check_results.filter(check_results.constraint_status != "Success")

if failed.count() == 0:
    print("All checks passed!")
else:
    print("Some checks failed:")
    failed.select("constraint", "constraint_status", "constraint_message") \
        .show(truncate=False)

# Metrics Deequ computed while evaluating the constraints
print("\nMetrics:")
VerificationResult.successMetricsAsDataFrame(spark, result).show(truncate=False)

# Python lambdas run through a Py4J callback server; shut it down when done
spark.sparkContext._gateway.shutdown_callback_server()
spark.stop()

📈 Performance Metrics

| Metric | Optimal | Typical | Poor | Optimization |
|---|---|---|---|---|
| Validation Speed | > 1M rows/sec | 100K-1M rows/sec | < 100K rows/sec | Partitioning, caching |
| Anomaly Detection | < 1% false positives | 1-5% false positives | > 5% false positives | Threshold tuning |
| Quality Score | > 95% | 80-95% | < 80% | Rule refinement |
| Coverage | 100% critical fields | 80-100% | < 80% | Comprehensive rules |
| Alert Accuracy | > 90% actionable | 70-90% | < 70% | Alert tuning |

🏆 Best Practices

  1. Define clear quality dimensions: establish what quality means for your data
  2. Implement validation at every stage: ingestion, processing, and serving
  3. Use statistical methods: leverage Z-scores, distributions, and profiling
  4. Automate quality checks: embed validation in CI/CD pipelines
  5. Monitor continuously: track quality metrics over time
  6. Set up alerting: notify stakeholders of quality degradation
  7. Maintain audit trails: keep records of quality checks and results
  8. Use appropriate tools: Great Expectations, Deequ, or custom solutions
  9. Test with realistic data: validate with production-like volumes
  10. Iterate and improve: refine rules based on feedback and issues

🔗 Related Topics

  • 06-joins-optimization.mdx: Join quality and data consistency
  • 07-partitioning-strategies.mdx: Partitioning for quality validation
  • 14-merge-upsert.mdx: Quality during merge operations
  • 16-schema-evolution.mdx: Schema quality and evolution

See Also

  • 10-serialization-kryo.mdx: Serialization quality and consistency
  • 11-structured-streaming.mdx: Streaming data quality validation
  • Kafka Streams (kafka/03): Stream processing quality guarantees
  • Data Engineering Streaming (data-engineering/022): Real-time data quality patterns