15. Data Quality in PySpark
Definition: Data Quality
Data quality refers to the fitness of data for its intended uses in operations, decision-making, and planning. It encompasses dimensions like accuracy, completeness, consistency, timeliness, validity, and uniqueness.
Definition: Data Validation
Data validation is the process of ensuring that data meets defined quality constraints before it is used in downstream processing. Validation can be performed at ingestion, during transformation, or before loading.
$$Q = \frac{\sum_{i=1}^{n} w_i \, s_i}{\sum_{i=1}^{n} w_i}$$
Here,
- $Q$ = Overall data quality score (0-100%)
- $w_i$ = Weight of quality dimension $i$
- $s_i$ = Score for quality dimension $i$ (0-100%)
- $n$ = Number of quality dimensions evaluated
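As a worked illustration of the weighted score, here is a minimal plain-Python sketch. It assumes the overall score is the weight-normalized average of the per-dimension scores; the function name `overall_quality_score`, the sample scores, and the sample weights are hypothetical and not taken from any library.

```python
def overall_quality_score(scores, weights):
    """Weight-normalized average of per-dimension scores (each 0-100)."""
    if scores.keys() != weights.keys():
        raise ValueError("scores and weights must cover the same dimensions")
    total_weight = sum(weights.values())
    if total_weight <= 0:
        raise ValueError("weights must sum to a positive value")
    return sum(weights[d] * scores[d] for d in scores) / total_weight


# Hypothetical scores (percent) and weights for the six dimensions.
scores = {
    "accuracy": 98.0, "completeness": 92.0, "consistency": 95.0,
    "timeliness": 80.0, "validity": 99.0, "uniqueness": 100.0,
}
weights = {
    "accuracy": 3, "completeness": 2, "consistency": 1,
    "timeliness": 1, "validity": 2, "uniqueness": 1,
}

quality = overall_quality_score(scores, weights)
print(f"Overall quality score: {quality:.2f}%")
```

Dividing by the total weight means the weights do not have to sum to 1, so they can be expressed as simple relative priorities.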
Anomaly Detection Z-Score Threshold
$$z = \frac{x - \mu}{\sigma}, \qquad \text{anomaly if } |z| > \tau$$
Here,
- $z$ = Z-score of the data point (standard deviations from the mean)
- $x$ = Data point value
- $\mu$ = Mean of the distribution
- $\sigma$ = Standard deviation of the distribution
- $\tau$ = Detection threshold (typically 2.5-3.0)
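The threshold rule can be tried on a small in-memory sample before applying it to a Spark column. The sketch below is plain Python and uses the sample standard deviation (`statistics.stdev`), on the assumption that it should mirror Spark's `stddev`; the helper name `zscore_outliers` and the sample amounts are hypothetical.

```python
from statistics import mean, stdev


def zscore_outliers(values, threshold=3.0):
    """Return (value, z_score) pairs whose absolute z-score exceeds the threshold."""
    if len(values) < 2:
        return []  # stdev needs at least two data points
    mu = mean(values)
    sigma = stdev(values)  # sample standard deviation
    if sigma == 0:
        return []  # all values identical, so nothing can be an outlier
    scored = ((x, (x - mu) / sigma) for x in values)
    return [(x, z) for x, z in scored if abs(z) > threshold]


# Hypothetical transaction amounts: nineteen values near 100 and one spike.
amounts = [100, 102, 98, 101, 99, 100, 103, 97, 100, 101,
           99, 100, 102, 98, 100, 101, 99, 100, 100, 500]

outliers = zscore_outliers(amounts)
for value, z in outliers:
    print(f"{value} is an outlier (z = {z:.2f})")
```

Note that the outlier itself inflates both the mean and the standard deviation, so with very small samples no point can reach a threshold of 3; the rule is more useful on larger batches.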
Data quality is not a one-time activity but a continuous process. Automated validation, monitoring, and alerting should be embedded in every stage of the data pipeline to catch issues early.
Use Delta Lake's ACID transactions and time travel features to implement data quality guarantees. Combine with Great Expectations or Deequ for declarative validation rules.
Theorem: Data Quality Dimensions
Complete data quality assessment requires evaluating six dimensions: (1) Accuracy: data correctly represents real-world entities; (2) Completeness: all required data is present; (3) Consistency: data is uniform across systems; (4) Timeliness: data is available when needed; (5) Validity: data conforms to defined formats/rules; (6) Uniqueness: no unwanted duplicates exist.
- Data quality encompasses six dimensions: accuracy, completeness, consistency, timeliness, validity, uniqueness
- Quality scores combine weighted dimension scores for overall assessment
- Z-score anomaly detection identifies statistical outliers in data distributions
- Automated validation should be embedded at every pipeline stage
- Delta Lake + Great Expectations/Deequ provides declarative quality guarantees
Figure: Six Quality Dimensions Wheel
Data Quality Architecture
Detailed Explanation
Why Data Quality Matters
- Poor data quality → incorrect business decisions, failed processes, loss of trust
- Requires validation rules, anomaly detection, and continuous monitoring
- Must be embedded at every stage of the data pipeline
Six Quality Dimensions
| Dimension | What It Checks | Example |
|---|---|---|
| Accuracy | Data correctly represents real-world entities | Correct prices, addresses |
| Completeness | All required data is present | No missing values |
| Consistency | Data is uniform across systems | Same format everywhere |
| Timeliness | Data is available when needed | Fresh, up-to-date data |
| Validity | Data conforms to defined formats/rules | Valid email formats |
| Uniqueness | No unwanted duplicates exist | One record per entity |
Key Takeaway: A comprehensive quality framework evaluates all six dimensions and provides a holistic quality score.
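To make the per-dimension scores concrete, the following plain-Python sketch computes three of the six dimensions (completeness, validity, uniqueness) for one field of a small record set. The helper names and the sample records are hypothetical; validity and uniqueness are measured over populated values only, which is one possible convention rather than a fixed definition.

```python
import re

EMAIL_RE = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")


def _present(records, field):
    """Values of `field` that are neither None nor empty strings."""
    return [r[field] for r in records if r.get(field) not in (None, "")]


# The helpers assume a non-empty record list with at least one populated value.
def completeness(records, field):
    """Percentage of records where `field` is populated."""
    return 100.0 * len(_present(records, field)) / len(records)


def validity(records, field, pattern):
    """Percentage of populated values that fully match `pattern`."""
    values = _present(records, field)
    return 100.0 * sum(1 for v in values if pattern.fullmatch(v)) / len(values)


def uniqueness(records, field):
    """Percentage of populated values that are distinct."""
    values = _present(records, field)
    return 100.0 * len(set(values)) / len(values)


records = [
    {"user_id": 1, "email": "a@example.com"},
    {"user_id": 2, "email": "b@example.com"},
    {"user_id": 3, "email": None},             # missing value
    {"user_id": 4, "email": "not-an-email"},   # invalid format
    {"user_id": 5, "email": "a@example.com"},  # duplicate
]

email_completeness = completeness(records, "email")
email_validity = validity(records, "email", EMAIL_RE)
email_uniqueness = uniqueness(records, "email")
print(email_completeness, email_validity, email_uniqueness)
```

Scores like these are the per-dimension inputs that a weighted overall score would combine.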
Validation Rules
| Rule Type | Complexity | Example |
|---|---|---|
| Null checks | Simple | Column cannot be null |
| Range validation | Simple | Value between 0 and 1M |
| Format validation | Moderate | Regex for email/phone |
| Foreign key relationships | Complex | Referential integrity |
Implementation options:
- DataFrame operations
- SQL constraints
- External libraries (Great Expectations, Deequ)
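The four rule types in the table can be expressed as small predicates and evaluated per record. This is a plain-Python sketch of the idea, not the API of any of the libraries listed; all function names, column names, and sample rows are hypothetical.

```python
import re

PHONE_RE = re.compile(r"^\+?[1-9]\d{1,14}$")  # E.164-style phone number


def check_not_null(row, column):
    """Null check: the column must hold a value."""
    return row.get(column) is not None


def check_range(row, column, low, high):
    """Range validation: the value must lie in [low, high]."""
    value = row.get(column)
    return value is not None and low <= value <= high


def check_format(row, column, pattern):
    """Format validation: the value must be a string matching the regex."""
    value = row.get(column)
    return isinstance(value, str) and pattern.fullmatch(value) is not None


def check_foreign_key(row, column, valid_keys):
    """Referential integrity: the value must exist in the parent key set."""
    return row.get(column) in valid_keys


def failed_rules(row, rules):
    """Names of all rules the row violates."""
    return [name for name, rule in rules if not rule(row)]


customer_ids = {101, 102}  # keys of the hypothetical parent table
rules = [
    ("order_id_not_null", lambda r: check_not_null(r, "order_id")),
    ("amount_in_range", lambda r: check_range(r, "amount", 0, 1_000_000)),
    ("phone_format", lambda r: check_format(r, "phone", PHONE_RE)),
    ("customer_exists", lambda r: check_foreign_key(r, "customer_id", customer_ids)),
]

good = {"order_id": 1, "amount": 250.0, "phone": "+14155550123", "customer_id": 101}
bad = {"order_id": None, "amount": -5, "phone": "555-0123", "customer_id": 999}

print(failed_rules(good, rules))
print(failed_rules(bad, rules))
```

Keeping each rule as a named predicate makes it easy to report which rule failed, and not only that a record was rejected.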
Anomaly Detection
- Identifies data points deviating significantly from expected patterns
- Statistical methods:
- Z-score analysis
- Distribution profiling
- Detects:
- Sudden spikes in null values
- Dramatic shifts in value distributions
- Upstream data problems
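A sudden spike in null values can be caught by comparing the current batch's null rate with its recent history. The sketch below applies a z-score test to daily null rates in plain Python; the function name `null_rate_spike`, the threshold, and the sample history are hypothetical.

```python
from statistics import mean, stdev


def null_rate_spike(history, today, threshold=3.0):
    """True when today's null rate sits more than `threshold` sample
    standard deviations above the historical mean."""
    if len(history) < 2:
        return False  # not enough history to estimate variation
    mu = mean(history)
    sigma = stdev(history)
    if sigma == 0:
        return today > mu  # perfectly flat history: any increase is a change
    return (today - mu) / sigma > threshold


# Hypothetical null rates of one column over the previous seven daily loads.
history = [0.010, 0.012, 0.011, 0.009, 0.010, 0.013, 0.011]

print(null_rate_spike(history, today=0.012))  # within normal variation
print(null_rate_spike(history, today=0.080))  # roughly 8x the usual rate
```

Only increases are flagged here; a sudden drop in nulls may also signal an upstream change and could be checked with the absolute z-score instead.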
Delta Lake Quality Features
| Feature | Quality Benefit |
|---|---|
| ACID transactions | Atomic writes prevent partial data |
| Schema enforcement | Prevents invalid data from being written |
| Time travel | Enables data rollback on quality issues |
Continuous Monitoring
- Track quality metrics over time
- Set up alerts for quality degradation
- Establish audit trails for compliance
- Automate checks at every pipeline stage (ingestion → serving)
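The monitoring steps above can be sketched as a small in-memory monitor that records every check in an append-only audit log and returns alerts when quality degrades. The class name `QualityMonitor`, its thresholds, and the stage names are hypothetical; a real pipeline would persist the log and route alerts to a notification system.

```python
import json
from datetime import datetime, timezone


class QualityMonitor:
    """Tracks quality scores per pipeline stage and flags degradation."""

    def __init__(self, min_score=95.0, max_drop=5.0):
        self.min_score = min_score  # alert when a score falls below this
        self.max_drop = max_drop    # alert when a score drops by more than this
        self.audit_log = []         # append-only list of JSON lines

    def _last_score(self, stage):
        """Most recent recorded score for the stage, or None."""
        for line in reversed(self.audit_log):
            entry = json.loads(line)
            if entry["stage"] == stage:
                return entry["score"]
        return None

    def record(self, stage, score, checked_at=None):
        """Log one check result and return any alerts it triggers."""
        checked_at = checked_at or datetime.now(timezone.utc)
        previous = self._last_score(stage)
        alerts = []
        if score < self.min_score:
            alerts.append(f"{stage}: score {score:.1f} is below {self.min_score:.1f}")
        if previous is not None and previous - score > self.max_drop:
            alerts.append(f"{stage}: score dropped {previous - score:.1f} points")
        self.audit_log.append(json.dumps({
            "stage": stage,
            "score": score,
            "checked_at": checked_at.isoformat(),
            "alerts": alerts,
        }))
        return alerts


monitor = QualityMonitor(min_score=95.0, max_drop=5.0)
first = monitor.record("ingestion", 98.0)
second = monitor.record("ingestion", 91.0)
third = monitor.record("serving", 97.0)
print(second)
```

Because every result is logged whether or not it raised an alert, the same log serves as the audit trail and as the history for trend analysis.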
Data Quality Tools
| Tool | Approach | Best For |
|---|---|---|
| Great Expectations | Declarative, built-in expectations | Standard validation |
| Deequ | Built on Spark, scalable | Large-scale analysis |
| DataFrame ops | Maximum flexibility | Custom validation logic |
Testing Pyramid
- Unit tests: individual validation rules
- Integration tests: pipeline validation
- End-to-end tests: complete workflows
- Production monitoring: continuous quality assurance
Key Takeaway: Each testing level provides different insights and catches different types of issues.
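At the base of the pyramid, a unit test exercises a single validation rule in isolation. The sketch below uses Python's built-in `unittest`; the rule `is_valid_email` and the test cases are hypothetical, and the suite is run programmatically so the snippet also works outside a test runner.

```python
import re
import unittest

EMAIL_RE = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")


def is_valid_email(value):
    """Validation rule under test: value must be a well-formed email string."""
    return isinstance(value, str) and EMAIL_RE.fullmatch(value) is not None


class EmailRuleTest(unittest.TestCase):
    def test_accepts_well_formed_address(self):
        self.assertTrue(is_valid_email("user@example.com"))

    def test_rejects_missing_domain(self):
        self.assertFalse(is_valid_email("user@"))

    def test_rejects_null(self):
        self.assertFalse(is_valid_email(None))


# Load and run the test case explicitly instead of calling unittest.main().
suite = unittest.defaultTestLoader.loadTestsFromTestCase(EmailRuleTest)
result = unittest.TextTestRunner(verbosity=0).run(suite)
print(f"Tests run: {result.testsRun}, failures: {len(result.failures)}")
```

Integration and end-to-end tests would then apply the same rules to whole DataFrames and complete pipeline runs.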
Advanced Techniques
- Data profiling: automatically discovers patterns and constraints
- Data observability: monitors health across the entire data ecosystem
- Move from reactive fixes to proactive quality management
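Data profiling can be illustrated with a small sketch that summarizes a column and proposes candidate constraints from what it observes. This is plain Python with hypothetical helper names (`profile_column`, `suggest_constraints`); the suggestions are starting points for review, not rules to adopt automatically.

```python
def profile_column(values):
    """Summarize a column: size, null rate, distinct count, and value range."""
    non_null = [v for v in values if v is not None]
    return {
        "count": len(values),
        "non_null": len(non_null),
        "null_rate": (len(values) - len(non_null)) / len(values) if values else 0.0,
        "distinct": len(set(non_null)),
        "min": min(non_null) if non_null else None,
        "max": max(non_null) if non_null else None,
    }


def suggest_constraints(profile):
    """Propose candidate validation rules based on the observed profile."""
    suggestions = []
    if profile["count"] and profile["null_rate"] == 0:
        suggestions.append("not null")
    if profile["non_null"] and profile["distinct"] == profile["non_null"]:
        suggestions.append("unique")
    if profile["min"] is not None:
        suggestions.append(f"between {profile['min']} and {profile['max']}")
    return suggestions


# Hypothetical sample columns.
user_ids = [1, 2, 3, 4, 5]
amounts = [10, 25, 40, 25, None]

id_profile = profile_column(user_ids)
amount_profile = profile_column(amounts)
print(suggest_constraints(id_profile))
print(suggest_constraints(amount_profile))
```

Constraints inferred from a sample describe what the data looked like, so they should be confirmed against business rules before being enforced.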
Key Concepts Table
| Concept | Description | Use Case |
|---|---|---|
| Data Validation | Checking data against defined rules | Ingestion, transformation, loading |
| Anomaly Detection | Identifying statistical outliers | Monitoring, alerting |
| Data Profiling | Analyzing data characteristics | Discovery, documentation |
| Quality Scoring | Quantifying overall data fitness | Reporting, SLAs |
| Schema Enforcement | Ensuring data structure compliance | Write operations |
| Time Travel | Accessing historical data versions | Rollback, auditing |
| Data Lineage | Tracking data transformations | Compliance, debugging |
| Alerting | Notifying stakeholders of quality issues | Operations, monitoring |
Code Examples
Basic Data Validation
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DateType, DoubleType

spark = SparkSession.builder \
    .appName("DataValidation") \
    .getOrCreate()

# Read data
df = spark.read \
    .format("json") \
    .load("/path/to/data")

# Define validation rules as Column expressions so they can be combined in one pass

# Rule 1: No null values in critical columns
# isNotNull(): true when the column value is not null
not_null_rule = (
    col("user_id").isNotNull() &
    col("email").isNotNull() &
    col("event_date").isNotNull()
)

# Rule 2: Data type validation
# cast(): yields null when conversion fails (raises an error instead if ANSI mode is enabled)
type_rule = (
    col("amount").cast(DoubleType()).isNotNull() &
    col("event_date").cast(DateType()).isNotNull()
)

# Rule 3: Value range validation
# between(0, 1000000) is equivalent to (amount >= 0) & (amount <= 1000000)
range_rule = col("amount").between(0, 1000000)

# Rule 4: Format validation using regex
# rlike(): true when the string matches the regex pattern
format_rule = (
    col("email").rlike(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$") &
    col("phone").rlike(r"^\+?[1-9]\d{1,14}$")  # E.164 format
)

# Combine all validations in a single filter (no joins or unique row key required)
df_valid = df.filter(not_null_rule & type_rule & range_rule & format_rule)

# Count once and reuse the results; guard against an empty input
total_count = df.count()
valid_count = df_valid.count()
validation_rate = valid_count / total_count * 100 if total_count else 0.0
print(f"Original records: {total_count}")
print(f"Valid records: {valid_count}")
print(f"Validation rate: {validation_rate:.2f}%")
Statistical Anomaly Detection
from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # namespaced to avoid shadowing Python's min, max, abs

spark = SparkSession.builder \
    .appName("AnomalyDetection") \
    .getOrCreate()

# Read data
df = spark.read \
    .format("delta") \
    .load("/path/to/data")

# Calculate statistics for anomaly detection
# F.mean(), F.stddev(): aggregate functions (stddev is the sample standard deviation)
stats = df.select(
    F.mean("amount").alias("mean_amount"),
    F.stddev("amount").alias("stddev_amount"),
    F.count("amount").alias("count_amount"),
    F.min("amount").alias("min_amount"),
    F.max("amount").alias("max_amount")
).collect()[0]

# Z-scores are undefined when the standard deviation is null or zero
if not stats["stddev_amount"]:
    raise ValueError("Cannot compute z-scores: stddev of 'amount' is null or zero")

Z_THRESHOLD = 3.0  # 3-sigma rule

# Apply Z-score anomaly detection
# F.when().otherwise(): conditional column creation
df_with_anomalies = df.withColumn(
    "z_score",
    (F.col("amount") - F.lit(stats["mean_amount"])) / F.lit(stats["stddev_amount"])
).withColumn(
    "is_anomaly",
    F.when(F.abs(F.col("z_score")) > Z_THRESHOLD, True).otherwise(False)
).withColumn(
    "anomaly_type",
    F.when(F.col("z_score") > Z_THRESHOLD, "HIGH_VALUE")
     .when(F.col("z_score") < -Z_THRESHOLD, "LOW_VALUE")
     .otherwise("NORMAL")
)

# Calculate anomaly statistics
anomaly_stats = df_with_anomalies.groupBy("is_anomaly").agg(
    F.count("*").alias("record_count"),
    F.avg("amount").alias("avg_amount"),
    F.stddev("amount").alias("stddev_amount")
)
print("Anomaly Statistics:")
anomaly_stats.show()

# Detect distribution anomalies using quantiles
# percentile_approx(): approximate percentile, evaluated here as a SQL expression
quantiles = df.select(
    F.expr("percentile_approx(amount, 0.25)").alias("q1"),
    F.expr("percentile_approx(amount, 0.50)").alias("median"),
    F.expr("percentile_approx(amount, 0.75)").alias("q3")
).collect()[0]
iqr = quantiles["q3"] - quantiles["q1"]
lower_bound = quantiles["q1"] - 1.5 * iqr
upper_bound = quantiles["q3"] + 1.5 * iqr

# Mark IQR-based anomalies (values outside 1.5 * IQR from the quartiles)
df_iqr_anomalies = df.withColumn(
    "iqr_anomaly",
    F.when(
        (F.col("amount") < lower_bound) | (F.col("amount") > upper_bound),
        True
    ).otherwise(False)
)
print(f"IQR bounds: [{lower_bound:.2f}, {upper_bound:.2f}]")
print(f"IQR anomalies: {df_iqr_anomalies.filter(F.col('iqr_anomaly')).count()}")
Great Expectations Integration
from pyspark.sql import SparkSession
# Note: SparkDFDataset belongs to the legacy Great Expectations dataset API.
# Recent releases have removed it, so this example requires an older version.
from great_expectations.dataset import SparkDFDataset

spark = SparkSession.builder \
    .appName("GreatExpectations") \
    .getOrCreate()

# Read data
df = spark.read \
    .format("delta") \
    .load("/path/to/data")

# Wrap the Spark DataFrame with Great Expectations
ge_df = SparkDFDataset(df)

# Define expectations
# expect_column_values_to_not_be_null: null check
ge_df.expect_column_values_to_not_be_null("user_id")

# expect_column_values_to_be_unique: uniqueness check
ge_df.expect_column_values_to_be_unique("email")

# expect_column_values_to_be_in_set: value domain check
ge_df.expect_column_values_to_be_in_set(
    "status",
    ["active", "inactive", "pending", "suspended"]
)

# expect_column_values_to_match_regex: format validation
ge_df.expect_column_values_to_match_regex(
    "email",
    r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
)

# expect_column_values_to_be_between: range validation
ge_df.expect_column_values_to_be_between(
    "amount",
    min_value=0,
    max_value=1000000
)

# expect_table_row_count_to_be_between: volume check
ge_df.expect_table_row_count_to_be_between(
    min_value=1000,
    max_value=1000000
)

# Run validation against every expectation registered above
results = ge_df.validate()
print(f"Validation success: {results.success}")
print(f"Successful expectations: {results.statistics['successful_expectations']}")
print(f"Unsuccessful expectations: {results.statistics['unsuccessful_expectations']}")
Deequ Validation Framework
# Deequ is a Scala library; from Python it is used through the PyDeequ wrapper.
# PyDeequ may require the SPARK_VERSION environment variable to be set before import.
import pydeequ
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationResult, VerificationSuite
from pyspark.sql import SparkSession

# The Deequ JAR must be on the Spark classpath
spark = SparkSession.builder \
    .appName("DeequValidation") \
    .config("spark.jars.packages", pydeequ.deequ_maven_coord) \
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord) \
    .getOrCreate()

# Read data
df = spark.read \
    .format("delta") \
    .load("/path/to/data")

# Define the check
# CheckLevel.Error: severity level for check failures
# Each assertion receives the fraction of rows that satisfy the constraint
check = (
    Check(spark, CheckLevel.Error, "Data Quality Check")
    # Null checks
    .isComplete("user_id")
    .isComplete("email")
    .isComplete("event_date")
    # Uniqueness check
    .isUnique("email")
    # Range check (expressed as a SQL condition)
    .satisfies(
        "amount BETWEEN 0 AND 1000000",
        "amount_in_range",
        lambda fraction: fraction == 1.0
    )
    # Value set check
    .isContainedIn("status", ["active", "inactive", "pending"])
    # Compliance check (custom SQL)
    .satisfies(
        "event_date <= current_date()",
        "event_date_not_future",
        lambda fraction: fraction == 1.0,
        "Event date should not be in the future"
    )
    # Pattern check
    .hasPattern(
        "email",
        r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$",
        assertion=lambda fraction: fraction == 1.0,
        name="email_format"
    )
)

# Run verification
result = VerificationSuite(spark).onData(df).addCheck(check).run()

# Get results; result.status is "Success", "Warning", or "Error"
if result.status == "Success":
    print("All checks passed!")
else:
    print("Some checks failed:")
    VerificationResult.checkResultsAsDataFrame(spark, result) \
        .filter("constraint_status != 'Success'") \
        .select("constraint", "constraint_status", "constraint_message") \
        .show(truncate=False)

# Get metrics
print("\nMetrics:")
VerificationResult.successMetricsAsDataFrame(spark, result).show(truncate=False)
Performance Metrics
| Metric | Optimal | Typical | Poor | Optimization |
|---|---|---|---|---|
| Validation Speed | > 1M rows/sec | 100K-1M rows/sec | < 100K rows/sec | Partitioning, caching |
| Anomaly Detection | < 1% false positives | 1-5% false positives | > 5% false positives | Threshold tuning |
| Quality Score | > 95% | 80-95% | < 80% | Rule refinement |
| Coverage | 100% critical fields | 80-100% | < 80% | Comprehensive rules |
| Alert Accuracy | > 90% actionable | 70-90% | < 70% | Alert tuning |
Best Practices
- Define clear quality dimensions: Establish what quality means for your data
- Implement validation at every stage: Ingestion, processing, and serving
- Use statistical methods: Leverage Z-scores, distributions, and profiling
- Automate quality checks: Embed validation in CI/CD pipelines
- Monitor continuously: Track quality metrics over time
- Set up alerting: Notify stakeholders of quality degradation
- Maintain audit trails: Keep records of quality checks and results
- Use appropriate tools: Great Expectations, Deequ, or custom solutions
- Test with realistic data: Validate with production-like volumes
- Iterate and improve: Refine rules based on feedback and issues
Related Topics
- 06-joins-optimization.mdx: Join quality and data consistency
- 07-partitioning-strategies.mdx: Partitioning for quality validation
- 14-merge-upsert.mdx: Quality during merge operations
- 16-schema-evolution.mdx: Schema quality and evolution
See Also
- 10-serialization-kryo.mdx: Serialization quality and consistency
- 11-structured-streaming.mdx: Streaming data quality validation
- Kafka Streams (kafka/03): Stream processing quality guarantees
- Data Engineering Streaming (data-engineering/022): Real-time data quality patterns