
Slowly Changing Dimensions (SCD) in PySpark



[Figure: SCD Type Comparison. Type 1 (Overwrite): row updated in place, no history preserved, simplest and lowest storage. Type 2 (Add New Row): old row marked inactive, new row with valid_from/valid_to, full history preserved. Type 3 (Add Column): previous value in new column, limited history (1 change), moderate storage overhead. SCD Type 2 Timeline: Jan active, Feb active, Mar inactive plus new active row, Apr active; the valid_from, valid_to, and is_current columns enable point-in-time queries.]

Architecture Diagram: SCD Type Comparison

Architecture Diagram: SCD Type 2 MERGE Pipeline

Architecture Diagram: SCD Implementation Decision Tree

Detailed Explanation

Slowly Changing Dimensions (SCD) represent one of the most fundamental patterns in dimensional modeling and data warehousing. In any real-world data warehouse, dimension attributes (customer address, product category, employee department) change over time, and the warehouse must decide how to handle these changes to preserve historical accuracy while supporting current-state analysis.


What are the Primary SCD Strategies?

The three primary SCD strategies, Type 1 (overwrite), Type 2 (history), and Type 3 (limited history), each represent different trade-offs between storage efficiency, query complexity, and historical fidelity.

| SCD Type | Description | Use Case | Trade-offs |
|---|---|---|---|
| Type 1 (Overwrite) | When an attribute changes, the old value is overwritten | Correcting a data entry error | No historical tracking, minimal storage and query complexity |
| Type 2 (History) | Every change generates a new row with a unique surrogate key, temporal bounds (valid_from, valid_to), and an is_current flag | Regulatory compliance, audit trails, point-in-time analysis ("What was Alice's city on March 15?") | Increased storage (each change creates a new row), more complex queries (requiring WHERE is_current = true for current-state analysis, or temporal predicates for historical queries) |
| Type 3 (Limited History) | Adds a "previous value" column. When a change occurs, the current value moves to the previous column and the new value overwrites the current column | Change-of-address tracking, comparing current vs. previous address | Simpler than Type 2 but only tracks one prior state; earlier values are lost if an attribute changes multiple times |
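
Type 1 and Type 3 both update a single row in place, so they are easy to compare side by side. The following is a minimal plain-Python sketch (no Spark) of what happens to Alice's row under each; the helper names `apply_type1` and `apply_type3` and the row layout are assumptions made for illustration, not part of any library.

```python
def apply_type1(row, new_city):
    """Type 1: overwrite the attribute; the old value is gone."""
    updated = dict(row)
    updated["city"] = new_city
    return updated


def apply_type3(row, new_city):
    """Type 3: shift the current value into prev_city, then overwrite."""
    updated = dict(row)
    updated["prev_city"] = row["city"]
    updated["city"] = new_city
    return updated


alice = {"customer_id": 101, "city": "New York", "prev_city": None}

type1 = apply_type1(alice, "Los Angeles")
type3_once = apply_type3(alice, "Los Angeles")
type3_twice = apply_type3(type3_once, "Houston")  # a second move

print(type1)
print(type3_once)
print(type3_twice)  # "New York" is no longer stored anywhere
```

After the second move, the Type 3 row only remembers "Los Angeles" as the previous city, which is the "one prior state" limitation listed in the table.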

What is SCD Implementation in PySpark?

In PySpark with Delta Lake, SCD implementations leverage the MERGE command for atomic upserts.

Key Points:

  • MERGE operation: Enables conditional updates, inserts, and deletes in a single atomic operation
  • Essential for SCD Type 2: Must simultaneously expire old records and insert new versions
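  • ACID guarantees: Each MERGE commits atomically, so a failed job leaves no partial updates. Combined with change-detection conditions, this makes SCD processing idempotent: running the same MERGE twice produces the same result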
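
The idempotency point can be checked without Spark. The sketch below mimics, on an in-memory list of rows, what a Type 2 merge does: expire the current row when a tracked attribute changed, insert a new version, and skip unchanged rows. The function name `scd2_merge`, the row layout, and tracking only `city` are assumptions for illustration; a real pipeline would use Delta Lake's MERGE.

```python
def scd2_merge(dim, staging, load_date):
    """Apply one staging batch to a Type 2 dimension and return the new rows."""
    result = [dict(row) for row in dim]  # copy so the input is untouched
    next_sk = max((row["sk"] for row in result), default=0) + 1
    for src in staging:
        current = next(
            (row for row in result
             if row["customer_id"] == src["customer_id"] and row["is_current"]),
            None,
        )
        if current is not None and current["city"] == src["city"]:
            continue  # unchanged: skipping is what makes a rerun safe
        if current is not None:  # changed: expire the old version
            current["valid_to"] = load_date
            current["is_current"] = False
        result.append({
            "sk": next_sk,
            "customer_id": src["customer_id"],
            "city": src["city"],
            "valid_from": load_date,
            "valid_to": None,
            "is_current": True,
        })
        next_sk += 1
    return result


dim = [{"sk": 1, "customer_id": "C101", "city": "New York",
        "valid_from": "2026-01-01", "valid_to": None, "is_current": True}]
staging = [{"customer_id": "C101", "city": "Los Angeles"},
           {"customer_id": "C103", "city": "Houston"}]

once = scd2_merge(dim, staging, "2026-05-31")
twice = scd2_merge(once, staging, "2026-05-31")  # simulate a job retry
print(len(once), once == twice)
```

The retry adds nothing because every staging row now matches a current row with identical attributes; without the change-detection check, each rerun would append duplicate versions.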

What is Surrogate Key Design?

The surrogate key design is critical for SCD Type 2.

Natural Keys vs. Surrogate Keys:

  • Natural keys (e.g., customer_id): Remain in the dimension but are not unique across versions
  • Surrogate key (typically an auto-incrementing integer or UUID): Uniquely identifies each version. Fact tables reference the surrogate key, so their joins pick one specific version and do not depend on the natural key being unique

What are Versioning Strategies?

| Strategy | Description | Pros | Cons |
|---|---|---|---|
| Simple incrementing integers (version = 1, 2, 3...) | Human-readable | Easy to understand | Requires lookup for version count |
| Timestamp-based versioning (valid_from = transaction time) | Enables temporal joins | Precise time-based queries | Requires careful handling of time zones and clock skew |
| Hash-based versioning (hash of all tracked attributes) | Enables change detection without explicit comparison | Efficient change detection | Hash collisions possible |

Key Takeaway: SCD implementations in PySpark with Delta Lake leverage the MERGE command for atomic upserts, ensuring idempotent processing. The surrogate key design is critical for SCD Type 2, enabling joins independent of natural key changes.

Mathematical Foundations

Definition: Slowly Changing Dimension

A dimension attribute $A$ is slowly changing if its value changes at rate $\lambda \ll 1/\Delta t$, where $\Delta t$ is the observation period. For SCD Type $k$, the storage model is:

$$\text{Type 1: } A_{\text{current}} \leftarrow A_{\text{new}} \quad \text{(overwrite)}$$
$$\text{Type 2: } R_{\text{new}} = R \cup \{r' : r'.A = A_{\text{new}},\ r'.\text{valid} = [t_{\text{start}}, t_{\text{end}})\}$$
$$\text{Type 3: } A_{\text{prev}} \leftarrow A_{\text{current}},\ A_{\text{current}} \leftarrow A_{\text{new}} \quad \text{(limited history)}$$

SCD Type 2 Row Explosion

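For a dimension with $n$ entities, each changing at rate $\lambda$ per period, only the current row of an entity can spawn a new version, so the expected row count after $T$ periods grows linearly:

$$|D_T| = n \times (1 + \lambda T)$$

The compound form $n \times (1 + \lambda)^T$ would apply only if historical rows also generated new versions; it is an upper bound on the row count. Storage cost: $C_T = |D_T| \times \bar{s}$, where $\bar{s}$ is the average row size.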
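
A quick numeric sketch of the storage formula. It evaluates the compound expression $n(1+\lambda)^T$ next to the linear estimate $n(1+\lambda T)$, which applies when only the current row of each entity can change. The sample values (1,000,000 entities, a 5% change rate, 12 periods, 200 bytes per row) and the function names are assumptions chosen for illustration.

```python
def compound_rows(n, change_rate, periods):
    """Row count if every row, including history, spawned new versions."""
    return n * (1 + change_rate) ** periods


def linear_rows(n, change_rate, periods):
    """Expected row count if only the current row of each entity changes."""
    return n * (1 + change_rate * periods)


def storage_bytes(rows, avg_row_bytes):
    """Storage cost C_T = |D_T| * average row size."""
    return rows * avg_row_bytes


n, change_rate, periods = 1_000_000, 0.05, 12
compound = compound_rows(n, change_rate, periods)
linear = linear_rows(n, change_rate, periods)

print(f"compound estimate: {compound:,.0f} rows")
print(f"linear estimate:   {linear:,.0f} rows")
print(f"storage (linear):  {storage_bytes(linear, 200) / 1e6:,.0f} MB")
```

The two estimates stay close while $\lambda T$ is small and diverge as the number of periods grows, so it matters which one a capacity plan uses.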

Merge Correctness Theorem

An SCD Type 2 merge is correct if and only if, for every natural key:

  1. No overlapping valid periods: $\forall r_1 \neq r_2 \text{ with the same key}: r_1.\text{valid} \cap r_2.\text{valid} = \emptyset$
  2. Exactly one current record: $\exists!\ r: r.\text{end} = \infty$

$$\text{Correct}(\text{Merge}) \iff \text{Condition 1} \land \text{Condition 2}$$
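
Both conditions are easy to turn into a data-quality check. The sketch below is plain Python over a list of row dictionaries; the function name `check_scd2`, the column names, and the use of `date.max` as a stand-in for an open-ended period are assumptions, and periods are treated as half-open intervals `[valid_from, valid_to)`.

```python
from collections import defaultdict
from datetime import date

OPEN_END = date.max  # stands in for an open-ended period (valid_to is None)


def check_scd2(rows):
    """Return violation messages; an empty list means both conditions hold."""
    periods_by_key = defaultdict(list)
    for row in rows:
        end = row["valid_to"] if row["valid_to"] is not None else OPEN_END
        periods_by_key[row["customer_id"]].append((row["valid_from"], end))

    problems = []
    for key, periods in periods_by_key.items():
        periods.sort()  # by start date, so any overlap shows up between neighbours
        for (_, end_a), (start_b, _) in zip(periods, periods[1:]):
            if start_b < end_a:  # half-open [start, end): touching is allowed
                problems.append(f"{key}: overlapping valid periods")
        open_count = sum(1 for _, end in periods if end == OPEN_END)
        if open_count != 1:
            problems.append(f"{key}: {open_count} current records, expected 1")
    return problems


good = [
    {"customer_id": "C101", "valid_from": date(2026, 1, 1), "valid_to": date(2026, 5, 31)},
    {"customer_id": "C101", "valid_from": date(2026, 5, 31), "valid_to": None},
    {"customer_id": "C102", "valid_from": date(2026, 1, 1), "valid_to": None},
]
bad = [
    {"customer_id": "C101", "valid_from": date(2026, 1, 1), "valid_to": None},
    {"customer_id": "C101", "valid_from": date(2026, 5, 31), "valid_to": None},
]

print(check_scd2(good))
print(check_scd2(bad))
```

The `bad` rows model a merge that inserted a new version without expiring the old one, which violates both conditions at once.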

Lookup Join Cost

Fact table $F$ joining with SCD Type 2 dimension $D$:

$$C_{\text{join}} = |F| \times \text{avg\_matches} \times |D|$$

With a temporal index on the valid period: $C_{\text{indexed}} = |F| \times \text{avg\_matches} \times \log|D|$.
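
To illustrate where the logarithmic factor comes from, here is a small plain-Python sketch of a point-in-time lookup using binary search over versions sorted by `valid_from`. It assumes the versions of one natural key are contiguous and the last one is open-ended; the sample data and the name `version_as_of` are hypothetical.

```python
from bisect import bisect_right
from datetime import date

# Versions of one customer, sorted by valid_from: (valid_from, version, city)
versions = [
    (date(2026, 1, 1), 1, "New York"),
    (date(2026, 5, 31), 2, "Los Angeles"),
]
starts = [valid_from for valid_from, _, _ in versions]


def version_as_of(as_of):
    """Find the version whose period contains as_of via binary search."""
    index = bisect_right(starts, as_of) - 1  # last version starting on or before as_of
    return versions[index] if index >= 0 else None


print(version_as_of(date(2026, 3, 15)))
print(version_as_of(date(2026, 5, 31)))
print(version_as_of(date(2025, 12, 31)))
```

Each lookup inspects about $\log_2$ of the version count instead of scanning every version, which is the saving the indexed cost formula describes.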

Change Detection

Detecting changed records by comparing checksums:

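$$\text{Changed}(r_{\text{old}}, r_{\text{new}}) \iff \text{CRC32}(r_{\text{old}}.\text{payload}) \neq \text{CRC32}(r_{\text{new}}.\text{payload})$$

This replaces a column-by-column comparison with a single fixed-size hash comparison, which is $O(1)$ once the checksums are computed. Because CRC32 is only 32 bits wide, two different payloads can occasionally share a checksum, so a changed row may be missed; a wider hash such as SHA-256 makes that risk negligible.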
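
A minimal sketch of checksum-based change detection in plain Python, using `zlib.crc32` from the standard library. The tracked column list, the separator and NULL marker bytes, and the helper names are assumptions for illustration.

```python
import zlib

TRACKED_COLUMNS = ("name", "city", "email")  # assumed set of tracked attributes
SEPARATOR = "\x1f"    # unit separator, so ("ab", "c") and ("a", "bc") differ
NULL_MARKER = "\x00"  # keeps None distinct from an empty string


def payload_checksum(row):
    """CRC32 over the tracked attributes, in a fixed column order."""
    parts = (NULL_MARKER if row[c] is None else str(row[c]) for c in TRACKED_COLUMNS)
    return zlib.crc32(SEPARATOR.join(parts).encode("utf-8"))


def has_changed(old_row, new_row):
    """Treat rows as changed when their payload checksums differ."""
    return payload_checksum(old_row) != payload_checksum(new_row)


old = {"name": "Alice Johnson", "city": "New York", "email": "alice@email.com"}
new = {"name": "Alice Johnson", "city": "Los Angeles", "email": "alice.new@email.com"}

print(has_changed(old, dict(old)))
print(has_changed(old, new))
```

In practice the checksum of the stored row is usually computed once and kept in a column, so each load only hashes the incoming rows.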

Key Insight

Delta Lake's MERGE INTO simplifies SCD implementations by combining insert, update, and delete in a single atomic operation. The WHEN MATCHED/NOT MATCHED clauses map directly to SCD logic.

Summary

SCD types trade off history retention against storage cost. Type 2 provides full history but causes row explosion. Delta Lake's MERGE INTO enables efficient SCD implementation with ACID guarantees. Temporal indexing on valid periods reduces lookup join cost from linear to logarithmic.

Key Concepts Table

| Concept | SCD Type 1 | SCD Type 2 | SCD Type 3 | SCD Type 4 |
|---|---|---|---|---|
| Storage Overhead | Minimal (1 row/entity) | High (N rows/versions) | Low (1 row + prev cols) | Medium (history + current) |
| Historical Accuracy | None (overwritten) | Full (all versions) | Limited (1 prior) | Partial (key attrs full) |
| Query Complexity | Simple | Complex (temporal) | Moderate | Complex (2 tables) |
| Surrogate Key Required | No | Yes | Optional | Yes |
| Fact Table Join | Natural key | Surrogate key | Natural key | Surrogate key |
| Point-in-Time Query | No | Yes | No | Partial |
| Audit Trail | No | Complete | Partial | Partial |
| Implementation | UPDATE | INSERT + UPDATE | UPDATE with shift | 2 tables + triggers |
| Best For | Error corrections | Compliance, audit | Change detection | Hybrid workloads |
| Storage (relative) | 1x | 5-20x | 1.2x | 2-5x |

Code Examples

Example 1: SCD Type 1 (Overwrite) with PySpark

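from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_date
from delta.tables import DeltaTable

# Open-source Spark needs both Delta settings: the SQL extension and the catalog
spark = (
    SparkSession.builder
    .appName("SCD-Type1")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)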

# ─── Current Dimension Table (SCD Type 1) ───
dim_customer_data = [
    (101, "Alice Johnson", "New York", "alice@email.com", "2026-01-01"),
    (102, "Bob Smith", "Chicago", "bob@email.com", "2026-01-01"),
    (103, "Carol White", "Houston", "carol@email.com", "2026-01-01"),
]
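dim_customer = spark.createDataFrame(
    dim_customer_data,
    ["customer_id", "name", "city", "email", "effective_date"]
).withColumn(
    # Store a real DATE so the column type matches staging's current_date()
    "effective_date", col("effective_date").cast("date")
)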

# Write initial dimension
(
    dim_customer.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/warehouse/dim_customer_scd1")
)

# ─── Staging: New data with changes ───
staging_data = [
    (101, "Alice Johnson", "Los Angeles", "alice.new@email.com"),  # Changed city + email
    (102, "Bob Smith", "Chicago", "bob@email.com"),                # No change
    (104, "David Lee", "Phoenix", "david@email.com"),              # New record
]
staging_df = spark.createDataFrame(
    staging_data,
    ["customer_id", "name", "city", "email"]
).withColumn("effective_date", current_date())

# ─── SCD Type 1 MERGE ───
dim_table = DeltaTable.forPath(spark, "/mnt/warehouse/dim_customer_scd1")

(
    dim_table.alias("target")
    .merge(
        staging_df.alias("source"),
        "target.customer_id = source.customer_id"
    )
    .whenMatchedUpdate(
        # <=> is null-safe equality, so a change to or from NULL is also detected
        condition="""
            NOT (target.name <=> source.name) OR
            NOT (target.city <=> source.city) OR
            NOT (target.email <=> source.email)
        """,
        set={
            "name": col("source.name"),
            "city": col("source.city"),
            "email": col("source.email"),
            "effective_date": col("source.effective_date")
        }
    )
    .whenNotMatchedInsertAll()
    .execute()
)

# Result: Alice's city overwritten to Los Angeles, David added, Bob unchanged
result = spark.read.format("delta").load("/mnt/warehouse/dim_customer_scd1")
result.orderBy("customer_id").show()
# Dates for updated and inserted rows are the run date (2026-05-31 in this example)
# +-----------+-------------+-----------+-------------------+--------------+
# |customer_id|         name|       city|              email|effective_date|
# +-----------+-------------+-----------+-------------------+--------------+
# |        101|Alice Johnson|Los Angeles|alice.new@email.com|    2026-05-31|
# |        102|    Bob Smith|    Chicago|      bob@email.com|    2026-01-01|
# |        103|  Carol White|    Houston|    carol@email.com|    2026-01-01|
# |        104|    David Lee|    Phoenix|    david@email.com|    2026-05-31|
# +-----------+-------------+-----------+-------------------+--------------+

Example 2: SCD Type 2 (Full History) with Delta Lake

import datetime

from pyspark.sql import SparkSession
# Note: importing max shadows Python's built-in max in this script
from pyspark.sql.functions import (
    coalesce, col, current_date, date_sub, lit, max, monotonically_increasing_id
)
from pyspark.sql.types import (
    BooleanType, DateType, IntegerType, LongType, StringType, StructField, StructType
)
from delta.tables import DeltaTable

spark = (
    SparkSession.builder
    .appName("SCD-Type2")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# --- Current Dimension Table (SCD Type 2) ---
scd2_schema = StructType([
    StructField("sk_customer_id", LongType(), False),
    StructField("customer_id", StringType(), False),
    StructField("name", StringType()),
    StructField("city", StringType()),
    StructField("email", StringType()),
    StructField("valid_from", DateType()),
    StructField("valid_to", DateType()),
    StructField("is_current", BooleanType()),
    StructField("version", IntegerType()),
])

# Initial load (DateType columns need datetime.date values, not strings)
initial_data = [
    (1, "C101", "Alice Johnson", "New York", "alice@email.com",
     datetime.date(2026, 1, 1), None, True, 1),
    (2, "C102", "Bob Smith", "Chicago", "bob@email.com",
     datetime.date(2026, 1, 1), None, True, 1),
]
dim_customer = spark.createDataFrame(initial_data, schema=scd2_schema)
(
    dim_customer.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/warehouse/dim_customer_scd2")
)

# ─── Staging: New and changed records ───
staging_data = [
    ("C101", "Alice Johnson", "Los Angeles", "alice.updated@email.com"),  # Changed
    ("C103", "Carol White", "Houston", "carol@email.com"),                # New
]
staging_df = spark.createDataFrame(
    staging_data,
    ["customer_id", "name", "city", "email"]
).withColumn("load_date", current_date())

# ─── SCD Type 2 MERGE with Surrogate Key Generation ───
dim_table = DeltaTable.forPath(spark, "/mnt/warehouse/dim_customer_scd2")

# Get next surrogate key
max_sk = dim_table.toDF().select(
    coalesce(max("sk_customer_id"), lit(0))
).first()[0]

# Add surrogate key to staging data.
# monotonically_increasing_id() is unique but not consecutive, so keys may have gaps.
staging_with_sk = staging_df.withColumn(
    "new_sk",
    monotonically_increasing_id() + lit(max_sk + 1)
)

# Phase 1: Expire changed records
(
    dim_table.alias("target")
    .merge(
        staging_with_sk.alias("source"),
        "target.customer_id = source.customer_id AND target.is_current = true"
    )
    .whenMatchedUpdate(
        # <=> is null-safe equality, so a change to or from NULL is also detected
        condition="""
            NOT (target.name <=> source.name) OR
            NOT (target.city <=> source.city) OR
            NOT (target.email <=> source.email)
        """,
        set={
            "valid_to": date_sub(col("source.load_date"), 1),
            "is_current": lit(False)
        }
    )
    .execute()
)

# Phase 2: Insert new versions and new records
# (a separate commit from Phase 1, so the two phases are not one atomic step)
# Read current state after Phase 1
current_dim = spark.read.format("delta").load("/mnt/warehouse/dim_customer_scd2")

# Get max version per customer
max_versions = (
    current_dim
    .groupBy("customer_id")
    .agg(max("version").alias("max_version"))
)

# After Phase 1, a changed customer has no current row (it was just expired)
# and a new customer has no rows at all. Unchanged customers still have a
# current row, so an anti-join against the current rows keeps exactly the
# staging rows that need inserting, without collecting IDs to the driver.
current_keys = current_dim.filter(col("is_current")).select("customer_id")
rows_to_version = staging_with_sk.join(current_keys, "customer_id", "left_anti")

# Prepare new records with correct version numbers
records_to_insert = (
    rows_to_version
    .join(max_versions, "customer_id", "left")
    .withColumn(
        "version",
        coalesce(col("max_version"), lit(0)) + 1
    )
    .select(
        col("new_sk").alias("sk_customer_id"),
        col("customer_id"),
        col("name"),
        col("city"),
        col("email"),
        col("load_date").alias("valid_from"),
        lit(None).cast("date").alias("valid_to"),
        lit(True).alias("is_current"),
        col("version")
    )
)

(
    records_to_insert.write
    .format("delta")
    .mode("append")
    .save("/mnt/warehouse/dim_customer_scd2")
)

# ─── Query: Current State vs. Historical ───
full_dim = spark.read.format("delta").load("/mnt/warehouse/dim_customer_scd2")

print("=== CURRENT STATE ===")
full_dim.filter(col("is_current") == True).show()

print("=== FULL HISTORY ===")
full_dim.orderBy("customer_id", "version").show()

print("=== POINT-IN-TIME QUERY (As of 2026-03-01) ===")
full_dim.filter(
    (col("valid_from") <= "2026-03-01") &
    ((col("valid_to").isNull()) | (col("valid_to") >= "2026-03-01"))
).show()

Example 3: SCD Type 3 (Limited History) and Type 4 (Hybrid)

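from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_date, lit, monotonically_increasing_id
from pyspark.sql.types import (
    BooleanType, IntegerType, LongType, StringType, StructField, StructType
)
from delta.tables import DeltaTable

spark = (
    SparkSession.builder
    .appName("SCD-Type3-Type4")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)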

# ---------------------------------------------------------------
# SCD TYPE 3: Limited History (Previous Value)
# ---------------------------------------------------------------
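scd3_data = [
    (101, "Alice Johnson", "New York", None, "2026-01-01"),
    (102, "Bob Smith", "Chicago", None, "2026-01-01"),
]
# An explicit schema is required: prev_city is None in every row,
# so Spark cannot infer its type from the data.
scd3_df = spark.createDataFrame(
    scd3_data,
    "customer_id INT, name STRING, city STRING, prev_city STRING, change_date STRING"
).withColumn("change_date", col("change_date").cast("date"))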

(
    scd3_df.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/warehouse/dim_customer_scd3")
)

# Staging with changes
staging_scd3 = [
    (101, "Alice Johnson", "Los Angeles"),  # New York -> Los Angeles
]
staging_scd3_df = spark.createDataFrame(
    staging_scd3,
    ["customer_id", "name", "city"]
)

# SCD Type 3 MERGE: shift current to previous
dim_scd3 = DeltaTable.forPath(spark, "/mnt/warehouse/dim_customer_scd3")

(
    dim_scd3.alias("target")
    .merge(
        staging_scd3_df.alias("source"),
        "target.customer_id = source.customer_id"
    )
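    .whenMatchedUpdate(
        # <=> is null-safe equality, so NULL cities compare correctly
        condition="NOT (target.city <=> source.city)",
        set={
            "prev_city": col("target.city"),     # Shift current to previous
            "city": col("source.city"),           # Update current
            "change_date": current_date()
        }
    )
    # Explicit insert values: the source lacks prev_city and change_date,
    # which insertAll expects unless schema evolution is enabled
    .whenNotMatchedInsert(
        values={
            "customer_id": col("source.customer_id"),
            "name": col("source.name"),
            "city": col("source.city"),
            "prev_city": lit(None).cast("string"),
            "change_date": current_date()
        }
    )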
    .execute()
)

# Result: Alice now has prev_city = New York, city = Los Angeles
# (change_date for Alice is the run date, 2026-05-31 in this example)
spark.read.format("delta").load("/mnt/warehouse/dim_customer_scd3").show()
# +-----------+-------------+-----------+---------+-----------+
# |customer_id|         name|       city|prev_city|change_date|
# +-----------+-------------+-----------+---------+-----------+
# |        101|Alice Johnson|Los Angeles| New York| 2026-05-31|
# |        102|    Bob Smith|    Chicago|     null| 2026-01-01|
# +-----------+-------------+-----------+---------+-----------+


# ---------------------------------------------------------------
# SCD TYPE 4: Hybrid (History + Current Tables)
# ---------------------------------------------------------------
# Current table (SCD Type 1 style - always current)
current_dim = [
    (101, "Alice Johnson", "Los Angeles", "alice.updated@email.com"),  # matches the current history row
    (102, "Bob Smith", "Chicago", "bob@email.com"),
]
current_df = spark.createDataFrame(
    current_dim,
    ["customer_id", "name", "city", "email"]
)
(
    current_df.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/warehouse/dim_customer_current")
)

# History table (SCD Type 2 style - full audit trail)
history_dim = [
    (1, 101, "Alice Johnson", "New York", "alice@email.com",
     "2026-01-01", "2026-05-30", False, 1),
    (2, 101, "Alice Johnson", "Los Angeles", "alice.updated@email.com",
     "2026-05-31", None, True, 2),
    (3, 102, "Bob Smith", "Chicago", "bob@email.com",
     "2026-01-01", None, True, 1),
]
history_schema = StructType([
    StructField("sk_id", LongType()),
    StructField("customer_id", IntegerType()),
    StructField("name", StringType()),
    StructField("city", StringType()),
    StructField("email", StringType()),
    StructField("valid_from", StringType()),
    StructField("valid_to", StringType()),
    StructField("is_current", BooleanType()),
    StructField("version", IntegerType()),
])
history_df = spark.createDataFrame(history_dim, schema=history_schema)
(
    history_df.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/warehouse/dim_customer_history")
)

# ─── Update both tables atomically ───
# For Type 4: update current table (overwrite) and append to history
new_change = [
    (103, "Carol White", "Houston", "carol@email.com"),
]
new_df = spark.createDataFrame(
    new_change,
    ["customer_id", "name", "city", "email"]
)

# Update current table (simple overwrite)
current_table = DeltaTable.forPath(spark, "/mnt/warehouse/dim_customer_current")
(
    current_table.alias("target")
    .merge(
        new_df.alias("source"),
        "target.customer_id = source.customer_id"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

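# Append to history table.
# Cast columns to the history schema: new_df infers customer_id as LONG and
# current_date() is a DATE, while the history table stores INT and STRING.
new_history = (
    new_df
    .withColumn("customer_id", col("customer_id").cast("int"))
    .withColumn("sk_id", monotonically_increasing_id() + 100)
    .withColumn("valid_from", current_date().cast("string"))
    .withColumn("valid_to", lit(None).cast("string"))
    .withColumn("is_current", lit(True))
    .withColumn("version", lit(1))
)

new_history.write.format("delta").mode("append") \
    .save("/mnt/warehouse/dim_customer_history")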

Performance Metrics

| Metric | SCD Type 1 | SCD Type 2 | SCD Type 3 | SCD Type 4 |
|---|---|---|---|---|
| MERGE Duration (1M rows) | 12 seconds | 45 seconds | 15 seconds | 18 sec (current) + 12 sec (history) |
| Storage per Entity | 1 row (~200 bytes) | 5-20 rows (~1-4 KB) | 1 row (~300 bytes) | 2 rows (~500 bytes) |
| Query Latency (Current State) | 1.2 seconds | 3.8 seconds | 1.4 seconds | 1.1 seconds |
| Query Latency (Point-in-Time) | N/A | 8.5 seconds | N/A | 9.2 seconds |
| Join Complexity (with Facts) | Simple (= join) | Complex (temporal) | Simple (= join) | Medium (current join) |
| Concurrent Writer Support | High | Medium (row-level) | High | Medium |
| Backup/Restore Size | 200 MB/1M entities | 2 GB/1M entities | 250 MB/1M entities | 500 MB/1M entities |
| Idempotency | Yes | Yes (with care) | Yes | Yes |
| Compliance Audit Support | Poor | Excellent | Fair | Good |
| Implementation Complexity | Low | High | Medium | Medium-High |

Best Practices

  1. Choose the right SCD type for each dimension: Not all attributes require the same treatment. Use Type 1 for correcting errors, Type 2 for regulatory compliance and audit trails, Type 3 for change-of-address tracking, and Type 4 for mixed workloads.

  2. Always use surrogate keys for SCD Type 2: A natural key repeats across the versions of an entity, so it cannot identify a single version; a surrogate key gives fact tables a stable reference to one specific version. Use monotonically_increasing_id() (unique but not consecutive) or a separate key generation step for distributed environments.

  3. Implement temporal joins correctly: For SCD Type 2, join fact tables to dimensions using fact.valid_date BETWEEN dim.valid_from AND COALESCE(dim.valid_to, '9999-12-31') to capture the correct version at the time of each fact record. BETWEEN is inclusive on both ends, which matches a valid_to set to the day before the next version starts.

  4. Use Delta Lake MERGE for idempotent SCD processing: MERGE provides atomic upsert semantics; combined with change-detection conditions, this prevents duplicate rows on job retries. Always design SCD pipelines to be idempotent.

  5. Partition dimension tables by is_current: This accelerates current-state queries (the most common pattern) by allowing partition pruning. Historical queries scan both partitions but are typically less frequent.

  6. Implement slowly changing dimension quality checks: Validate that only one record per natural key has is_current = true, that valid_from <= valid_to for expired records, and that surrogate keys are unique.

  7. Monitor dimension growth: SCD Type 2 tables grow with every change; set up alerts when row count exceeds expected thresholds (e.g., >10x the natural key cardinality indicates potential data quality issues).

  8. Use Z-ORDER on frequently filtered columns: Apply OPTIMIZE ... ZORDER BY (valid_from, customer_id) to accelerate temporal queries and point-in-time lookups.

  9. Separate current and historical queries: Use views or materialized tables for current-state access (WHERE is_current = true) to avoid scanning full history for operational queries.

  10. Document SCD policies per dimension: Maintain a data dictionary specifying which attributes use which SCD type, retention policies for historical versions, and business rules for change detection.

Mathematical Foundation Summary

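SCD Type 2 maintains temporal history with valid_from ≤ current_time ≤ valid_to for current records, where an open-ended valid_to is treated as infinity. Expected row growth is rows = base_rows × (1 + change_rate × time_periods) when only current rows change, with base_rows × (1 + change_rate)^time_periods as an upper bound. Point-in-time query cost is O(log n) with B-tree indexing on valid_from/valid_to; Delta Lake has no B-tree indexes, so there the gain comes from data skipping and Z-ordering instead. The MERGE operation efficiency depends on join cardinality: matched_ratio = |source ∩ target| / |source|. SCD Type 1 keeps a single row per entity, so it stores no history and loses the audit trail. SCD Type 2 storage overhead: overhead = history_versions / current_versions, typically ranging 1.5x-5x.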

See also: Data Lakehouse Architecture (34), Change Data Capture (36), Production Hardening (40)
