Slowly Changing Dimensions (SCD) in PySpark
SCD Types Comparison
Architecture Diagram: SCD Type Comparison
Architecture Diagram: SCD Type 2 MERGE Pipeline
Architecture Diagram: SCD Implementation Decision Tree
Detailed Explanation
Slowly Changing Dimensions (SCD) represent one of the most fundamental patterns in dimensional modeling and data warehousing. In any real-world data warehouse, dimension attributes (customer address, product category, employee department) change over time, and the warehouse must decide how to handle these changes to preserve historical accuracy while supporting current-state analysis.
What are the Primary SCD Strategies?
The three primary SCD strategies β Type 1 (overwrite), Type 2 (history), and Type 3 (limited history) β each represent different trade-offs between storage efficiency, query complexity, and historical fidelity.
| SCD Type | Description | Use Case | Trade-offs |
|---|---|---|---|
| Type 1 (Overwrite) | When an attribute changes, the old value is overwritten | Correcting a data entry error | No historical tracking, minimal storage and query complexity |
| Type 2 (History) | Every change generates a new row with a unique surrogate key, temporal bounds (valid_from, valid_to), and an is_current flag | Regulatory compliance, audit trails, point-in-time analysis ("What was Alice's city on March 15?") | Increased storage (each change creates a new row), more complex queries (requiring WHERE is_current = true for current-state analysis, or temporal predicates for historical queries) |
| Type 3 (Limited History) | Adds a "previous value" column. When a change occurs, the current value moves to the previous column and the new value overwrites the current column | Change-of-address tracking, comparing current vs. previous address | Simpler than Type 2 but only tracks one prior state β earlier values are lost if an attribute changes multiple times |
What is SCD Implementation in PySpark?
In PySpark with Delta Lake, SCD implementations leverage the MERGE command for atomic upserts.
Key Points:
- MERGE operation: Enables conditional updates, inserts, and deletes in a single atomic operation
- Essential for SCD Type 2: Must simultaneously expire old records and insert new versions
- ACID guarantees: Ensure that SCD processing is idempotent β running the same MERGE twice produces the same result
What is Surrogate Key Design?
The surrogate key design is critical for SCD Type 2.
Natural Keys vs. Surrogate Keys:
- Natural keys (e.g., customer_id): Remain in the dimension but are not unique across versions
- Surrogate key (typically an auto-incrementing integer or UUID): Uniquely identifies each version, what fact tables reference, enabling joins that are independent of natural key changes
What are Versioning Strategies?
| Strategy | Description | Pros | Cons |
|---|---|---|---|
| Simple incrementing integers (version = 1, 2, 3...) | Human-readable | Easy to understand | Requires lookup for version count |
| Timestamp-based versioning (valid_from = transaction time) | Enables temporal joins | Precise time-based queries | Requires careful handling of time zones and clock skew |
| Hash-based versioning (hash of all tracked attributes) | Enables change detection without explicit comparison | Efficient change detection | Hash collisions possible |
Key Takeaway: SCD implementations in PySpark with Delta Lake leverage the MERGE command for atomic upserts, ensuring idempotent processing. The surrogate key design is critical for SCD Type 2, enabling joins independent of natural key changes.
Mathematical Foundations
Definition: Slowly Changing Dimension
A dimension attribute is slowly changing if its value changes at rate where is the observation period. For SCD Type , the storage model is:
SCD Type 2 Row Explosion
For dimension with rows and change rate per period:
After periods, row count grows exponentially. Storage cost: .
Merge Correctness Theorem
SCD Type 2 merge is correct if and only if:
- No overlapping valid periods:
- Current record marked:
Lookup Join Cost
Fact table joining with SCD Type 2 dimension :
With temporal index on valid period: .
Change Detection
Detecting changed records by comparing checksums:
This avoids full row comparison with hash comparison.
Key Insight
Delta Lake's MERGE INTO simplifies SCD implementations by combining insert, update, and delete in a single atomic operation. The WHEN MATCHED/NOT MATCHED clauses map directly to SCD logic.
Summary
SCD types trade off history retention against storage cost. Type 2 provides full history but causes row explosion. Delta Lake's MERGE INTO enables efficient SCD implementation with ACID guarantees. Temporal indexing on valid periods reduces lookup join cost from linear to logarithmic.
Key Concepts Table
| Concept | SCD Type 1 | SCD Type 2 | SCD Type 3 | SCD Type 4 |
|---|---|---|---|---|
| Storage Overhead | Minimal (1 row/entity) | High (N rows/versions) | Low (1 row + prev cols) | Medium (history + current) |
| Historical Accuracy | None (overwritten) | Full (all versions) | Limited (1 prior) | Partial (key attrs full) |
| Query Complexity | Simple | Complex (temporal) | Moderate | Complex (2 tables) |
| Surrogate Key Required | No | Yes | Optional | Yes |
| Fact Table Join | Natural key | Surrogate key | Natural key | Surrogate key |
| Point-in-Time Query | No | Yes | No | Partial |
| Audit Trail | No | Complete | Partial | Partial |
| Implementation | UPDATE | INSERT + UPDATE | UPDATE with shift | 2 tables + triggers |
| Best For | Error corrections | Compliance, audit | Change detection | Hybrid workloads |
| Storage (relative) | 1x | 5-20x | 1.2x | 2-5x |
Code Examples
Example 1: SCD Type 1 (Overwrite) with PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from delta.tables import DeltaTable
spark = SparkSession.builder \
.appName("SCD-Type1") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.getOrCreate()
# βββ Current Dimension Table (SCD Type 1) βββ
dim_customer_data = [
(101, "Alice Johnson", "New York", "alice@email.com", "2026-01-01"),
(102, "Bob Smith", "Chicago", "bob@email.com", "2026-01-01"),
(103, "Carol White", "Houston", "carol@email.com", "2026-01-01"),
]
dim_customer = spark.createDataFrame(
dim_customer_data,
["customer_id", "name", "city", "email", "effective_date"]
)
# Write initial dimension
(
dim_customer.write
.format("delta")
.mode("overwrite")
.save("/mnt/warehouse/dim_customer_scd1")
)
# βββ Staging: New data with changes βββ
staging_data = [
(101, "Alice Johnson", "Los Angeles", "alice.new@email.com"), # Changed city + email
(102, "Bob Smith", "Chicago", "bob@email.com"), # No change
(104, "David Lee", "Phoenix", "david@email.com"), # New record
]
staging_df = spark.createDataFrame(
staging_data,
["customer_id", "name", "city", "email"]
).withColumn("effective_date", current_date())
# βββ SCD Type 1 MERGE βββ
dim_table = DeltaTable.forPath(spark, "/mnt/warehouse/dim_customer_scd1")
(
dim_table.alias("target")
.merge(
staging_df.alias("source"),
"target.customer_id = source.customer_id"
)
.whenMatchedUpdate(
condition="""
target.name != source.name OR
target.city != source.city OR
target.email != source.email
""",
set={
"name": col("source.name"),
"city": col("source.city"),
"email": col("source.email"),
"effective_date": col("source.effective_date")
}
)
.whenNotMatchedInsertAll()
.execute()
)
# Result: Alice's city overwritten to LA, David added, Bob unchanged
result = spark.read.format("delta").load("/mnt/warehouse/dim_customer_scd1")
result.orderBy("customer_id").show()
# +-----------+--------------+-------+------------------+---------------+
# |customer_id| name| city| email|effective_date|
# +-----------+--------------+-------+------------------+---------------+
# | 101| Alice Johnson| LA |alice.new@email.com| 2026-05-31 |
# | 102| Bob Smith|Chicago| bob@email.com| 2026-01-01 |
# | 103| Carol White|Houston| carol@email.com| 2026-01-01 |
# | 104| David Lee| Phoenix| david@email.com| 2026-05-31 |
# +-----------+--------------+-------+------------------+---------------+
Example 2: SCD Type 2 (Full History) with Delta Lake
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from delta.tables import DeltaTable
from pyspark.sql.window import Window
spark = SparkSession.builder \
.appName("SCD-Type2") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.getOrCreate()
# βββ Current Dimension Table (SCD Type 2) βββ
scd2_schema = StructType([
StructField("sk_customer_id", LongType(), False),
StructField("customer_id", StringType(), False),
StructField("name", StringType()),
StructField("city", StringType()),
StructField("email", StringType()),
StructField("valid_from", DateType()),
StructField("valid_to", DateType()),
StructField("is_current", BooleanType()),
StructField("version", IntegerType()),
])
# Initial load
initial_data = [
(1, "C101", "Alice Johnson", "New York", "alice@email.com",
"2026-01-01", None, True, 1),
(2, "C102", "Bob Smith", "Chicago", "bob@email.com",
"2026-01-01", None, True, 1),
]
dim_customer = spark.createDataFrame(initial_data, schema=scd2_schema)
(
dim_customer.write
.format("delta")
.mode("overwrite")
.save("/mnt/warehouse/dim_customer_scd2")
)
# βββ Staging: New and changed records βββ
staging_data = [
("C101", "Alice Johnson", "Los Angeles", "alice.updated@email.com"), # Changed
("C103", "Carol White", "Houston", "carol@email.com"), # New
]
staging_df = spark.createDataFrame(
staging_data,
["customer_id", "name", "city", "email"]
).withColumn("load_date", current_date())
# βββ SCD Type 2 MERGE with Surrogate Key Generation βββ
dim_table = DeltaTable.forPath(spark, "/mnt/warehouse/dim_customer_scd2")
# Get next surrogate key
max_sk = dim_table.toDF().select(
coalesce(max("sk_customer_id"), lit(0))
).first()[0]
# Add surrogate key to staging data
staging_with_sk = staging_df.withColumn(
"new_sk",
monotonically_increasing_id() + lit(max_sk + 1)
)
# Phase 1: Expire changed records
(
dim_table.alias("target")
.merge(
staging_with_sk.alias("source"),
"target.customer_id = source.customer_id AND target.is_current = true"
)
.whenMatchedUpdate(
condition="""
target.name != source.name OR
target.city != source.city OR
target.email != source.email
""",
set={
"valid_to": date_sub(col("source.load_date"), 1),
"is_current": lit(False)
}
)
.execute()
)
# Phase 2: Insert new versions and new records
# Read current state after Phase 1
current_dim = spark.read.format("delta").load("/mnt/warehouse/dim_customer_scd2")
# Get max version per customer
max_versions = (
current_dim
.groupBy("customer_id")
.agg(max("version").alias("max_version"))
)
# Prepare new records with correct version numbers
new_records = (
staging_with_sk
.join(max_versions, "customer_id", "left")
.withColumn(
"version",
coalesce(col("max_version"), lit(0)) + 1
)
.select(
col("new_sk").alias("sk_customer_id"),
col("customer_id"),
col("name"),
col("city"),
col("email"),
col("load_date").alias("valid_from"),
lit(None).cast(DateType()).alias("valid_to"),
lit(True).alias("is_current"),
col("version")
)
)
# Filter to only new versions (where source changed or is new)
changed_customer_ids = (
staging_with_sk
.join(
current_dim.filter(col("is_current") == True),
"customer_id",
"inner"
)
.filter(
(current_dim["name"] != staging_with_sk["name"]) |
(current_dim["city"] != staging_with_sk["city"]) |
(current_dim["email"] != staging_with_sk["email"])
)
.select("customer_id")
.distinct()
)
# Insert new versions for changed and new records
records_to_insert = new_records.filter(
col("customer_id").isin(
[row.customer_id for row in changed_customer_ids.collect()]
) |
~col("customer_id").isin(
[row.customer_id for row in
current_dim.select("customer_id").distinct().collect()]
)
)
(
records_to_insert.write
.format("delta")
.mode("append")
.save("/mnt/warehouse/dim_customer_scd2")
)
# βββ Query: Current State vs. Historical βββ
full_dim = spark.read.format("delta").load("/mnt/warehouse/dim_customer_scd2")
print("=== CURRENT STATE ===")
full_dim.filter(col("is_current") == True).show()
print("=== FULL HISTORY ===")
full_dim.orderBy("customer_id", "version").show()
print("=== POINT-IN-TIME QUERY (As of 2026-03-01) ===")
full_dim.filter(
(col("valid_from") <= "2026-03-01") &
((col("valid_to").isNull()) | (col("valid_to") >= "2026-03-01"))
).show()
Example 3: SCD Type 3 (Limited History) and Type 4 (Hybrid)
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from delta.tables import DeltaTable
spark = SparkSession.builder \
.appName("SCD-Type3-Type4") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.getOrCreate()
# ---------------------------------------------------------------
# SCD TYPE 3: Limited History (Previous Value)
# ---------------------------------------------------------------
scd3_data = [
(101, "Alice Johnson", "New York", None, "2026-01-01"),
(102, "Bob Smith", "Chicago", None, "2026-01-01"),
]
scd3_df = spark.createDataFrame(
scd3_data,
["customer_id", "name", "city", "prev_city", "change_date"]
)
(
scd3_df.write
.format("delta")
.mode("overwrite")
.save("/mnt/warehouse/dim_customer_scd3")
)
# Staging with changes
staging_scd3 = [
(101, "Alice Johnson", "Los Angeles"), # NYC β LA
]
staging_scd3_df = spark.createDataFrame(
staging_scd3,
["customer_id", "name", "city"]
)
# SCD Type 3 MERGE: shift current to previous
dim_scd3 = DeltaTable.forPath(spark, "/mnt/warehouse/dim_customer_scd3")
(
dim_scd3.alias("target")
.merge(
staging_scd3_df.alias("source"),
"target.customer_id = source.customer_id"
)
.whenMatchedUpdate(
condition="target.city != source.city",
set={
"prev_city": col("target.city"), # Shift current to previous
"city": col("source.city"), # Update current
"change_date": current_date()
}
)
.whenNotMatchedInsertAll()
.execute()
)
# Result: Alice now has prev_city = NYC, city = LA
spark.read.format("delta").load("/mnt/warehouse/dim_customer_scd3").show()
# +-----------+--------------+------+---------+-----------+
# |customer_id| name| city|prev_city|change_date|
# +-----------+--------------+------+---------+-----------+
# | 101| Alice Johnson| LA| New York| 2026-05-31|
# | 102| Bob Smith|Chicago| null| 2026-01-01|
# +-----------+--------------+------+---------+-----------+
# ---------------------------------------------------------------
# SCD TYPE 4: Hybrid (History + Current Tables)
# ---------------------------------------------------------------
# Current table (SCD Type 1 style - always current)
current_dim = [
(101, "Alice Johnson", "Los Angeles", "alice@email.com"),
(102, "Bob Smith", "Chicago", "bob@email.com"),
]
current_df = spark.createDataFrame(
current_dim,
["customer_id", "name", "city", "email"]
)
(
current_df.write
.format("delta")
.mode("overwrite")
.save("/mnt/warehouse/dim_customer_current")
)
# History table (SCD Type 2 style - full audit trail)
history_dim = [
(1, 101, "Alice Johnson", "New York", "alice@email.com",
"2026-01-01", "2026-05-30", False, 1),
(2, 101, "Alice Johnson", "Los Angeles", "alice.updated@email.com",
"2026-05-31", None, True, 2),
(3, 102, "Bob Smith", "Chicago", "bob@email.com",
"2026-01-01", None, True, 1),
]
history_schema = StructType([
StructField("sk_id", LongType()),
StructField("customer_id", IntegerType()),
StructField("name", StringType()),
StructField("city", StringType()),
StructField("email", StringType()),
StructField("valid_from", StringType()),
StructField("valid_to", StringType()),
StructField("is_current", BooleanType()),
StructField("version", IntegerType()),
])
history_df = spark.createDataFrame(history_dim, schema=history_schema)
(
history_df.write
.format("delta")
.mode("overwrite")
.save("/mnt/warehouse/dim_customer_history")
)
# βββ Update both tables atomically βββ
# For Type 4: update current table (overwrite) and append to history
new_change = [
(103, "Carol White", "Houston", "carol@email.com"),
]
new_df = spark.createDataFrame(
new_change,
["customer_id", "name", "city", "email"]
)
# Update current table (simple overwrite)
current_table = DeltaTable.forPath(spark, "/mnt/warehouse/dim_customer_current")
(
current_table.alias("target")
.merge(
new_df.alias("source"),
"target.customer_id = source.customer_id"
)
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
# Append to history table
new_history = new_df.withColumn("sk_id", monotonically_increasing_id() + 100) \
.withColumn("valid_from", current_date()) \
.withColumn("valid_to", lit(None).cast(StringType())) \
.withColumn("is_current", lit(True)) \
.withColumn("version", lit(1))
new_history.write.format("delta").mode("append") \
.save("/mnt/warehouse/dim_customer_history")
Performance Metrics
| Metric | SCD Type 1 | SCD Type 2 | SCD Type 3 | SCD Type 4 |
|---|---|---|---|---|
| MERGE Duration (1M rows) | 12 seconds | 45 seconds | 15 seconds | 18 sec (current) + 12 sec (history) |
| Storage per Entity | 1 row (~200 bytes) | 5-20 rows (~1-4 KB) | 1 row (~300 bytes) | 2 rows (~500 bytes) |
| Query Latency (Current State) | 1.2 seconds | 3.8 seconds | 1.4 seconds | 1.1 seconds |
| Query Latency (Point-in-Time) | N/A | 8.5 seconds | N/A | 9.2 seconds |
| Join Complexity (with Facts) | Simple (= join) | Complex (temporal) | Simple (= join) | Medium (current join) |
| Concurrent Writer Support | High | Medium (row-level) | High | Medium |
| Backup/Restore Size | 200 MB/1M entities | 2 GB/1M entities | 250 MB/1M entities | 500 MB/1M entities |
| Idempotency | Yes | Yes (with care) | Yes | Yes |
| Compliance Audit Support | Poor | Excellent | Fair | Good |
| Implementation Complexity | Low | High | Medium | Medium-High |
Best Practices
-
Choose the right SCD type for each dimension β Not all attributes require the same treatment. Use Type 1 for correcting errors, Type 2 for regulatory compliance and audit trails, Type 3 for change-of-address tracking, and Type 4 for mixed workloads.
-
Always use surrogate keys for SCD Type 2 β Natural keys change across versions; surrogate keys ensure stable references from fact tables. Use monotonically_increasing_id() or a separate key generation step for distributed environments.
-
Implement temporal joins correctly β For SCD Type 2, join fact tables to dimensions using
fact.valid_date BETWEEN dim.valid_from AND COALESCE(dim.valid_to, '9999-12-31')to capture the correct version at the time of each fact record. -
Use Delta Lake MERGE for idempotent SCD processing β MERGE provides atomic upsert semantics that prevent duplicate rows on job retries. Always design SCD pipelines to be idempotent.
-
Partition dimension tables by is_current β This accelerates current-state queries (the most common pattern) by allowing partition pruning. Historical queries scan both partitions but are typically less frequent.
-
Implement slowly changing dimension quality checks β Validate that only one record per natural key has
is_current = true, thatvalid_from < valid_tofor expired records, and that surrogate keys are monotonically increasing. -
Monitor dimension growth β SCD Type 2 tables grow with every change; set up alerts when row count exceeds expected thresholds (e.g., >10x the natural key cardinality indicates potential data quality issues).
-
Use Z-ORDER on frequently filtered columns β Apply
OPTIMIZE ZORDER BY valid_from, customer_idto accelerate temporal queries and point-in-time lookups. -
Separate current and historical queries β Use views or materialized tables for current-state access (
WHERE is_current = true) to avoid scanning full history for operational queries. -
Document SCD policies per dimension β Maintain a data dictionary specifying which attributes use which SCD type, retention policies for historical versions, and business rules for change detection.
Mathematical Foundation Summary
SCD Type 2 maintains temporal history with valid_from β€ current_time β€ valid_to for current records. Row growth follows rows = base_rows Γ (1 + change_rate)^time_periods. Point-in-time query cost is O(log n) with B-tree indexing on valid_from/valid_to. The MERGE operation efficiency depends on join cardinality: matched_ratio = |source β© target| / |source|. SCD Type 1 overwrites with O(1) history but loses audit trail. SCD Type 2 storage overhead: overhead = history_versions / current_versions typically ranging 1.5x-5x.
See also: Data Lakehouse Architecture (34), Change Data Capture (36), Production Hardening (40)
See Also
- Merge Upsert β Merge and upsert operations
- CDC Patterns β Change data capture patterns
- Delta Lake β Delta Lake time travel for SCD
- Data Warehouse Concepts β Dimensional modeling fundamentals