
14. Merge & Upsert Operations in PySpark

Definition: Merge (Upsert)

A merge operation combines INSERT, UPDATE, and DELETE operations in a single atomic transaction based on a match condition. It is the foundation of Change Data Capture (CDC) and slowly changing dimension (SCD) patterns in Delta Lake.

Definition: Change Data Capture (CDC)

CDC is a technique that captures and tracks changes (inserts, updates, deletes) made to data in a source system, enabling downstream systems to apply the same changes to maintain synchronized copies.

Merge Performance Formula
$$T_{merge} = T_{scan} + T_{match} + T_{apply} + T_{write}$$

Here,

  • $T_{merge}$ = Total merge operation time
  • $T_{scan}$ = Time to scan the target table for candidate matching rows
  • $T_{match}$ = Time to evaluate the match condition, i.e. to join source rows against target rows
  • $T_{apply}$ = Time to apply the INSERT/UPDATE/DELETE actions
  • $T_{write}$ = Time to write new Parquet files to the Delta table
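Delta Lake records per-commit metrics for every MERGE in the table history (the `operationMetrics` column returned by `DeltaTable.history()`), including `executionTimeMs`, `scanTimeMs`, and `rewriteTimeMs`. These do not map one-to-one onto the four terms above. The sketch below assumes `scanTimeMs` roughly covers $T_{scan} + T_{match}$ and `rewriteTimeMs` roughly covers $T_{apply} + T_{write}$; the helper name `merge_time_breakdown` is hypothetical.

```python
def merge_time_breakdown(metrics: dict) -> dict:
    """Approximate the T_merge terms from one MERGE commit's operationMetrics.

    Delta stores operationMetrics values as strings, so they are converted to int.
    Assumption: scanTimeMs ~ T_scan + T_match and rewriteTimeMs ~ T_apply + T_write.
    """
    total = int(metrics.get("executionTimeMs", 0))
    scan_and_match = int(metrics.get("scanTimeMs", 0))
    apply_and_write = int(metrics.get("rewriteTimeMs", 0))
    return {
        "total_ms": total,
        "scan_and_match_ms": scan_and_match,
        "apply_and_write_ms": apply_and_write,
        # Time not attributed to either phase (planning, commit, ...), never negative
        "other_ms": max(total - scan_and_match - apply_and_write, 0),
    }


# Hand-written metrics map with illustrative values, not a real measurement
example = {"executionTimeMs": "12000", "scanTimeMs": "4000", "rewriteTimeMs": "7000"}
print(merge_time_breakdown(example))

# With a Spark session (sketch; the latest commit must be a MERGE):
#   from delta.tables import DeltaTable
#   row = DeltaTable.forPath(spark, "/path/to/target/table").history(1).first()
#   merge_time_breakdown(row["operationMetrics"])
```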

Merge File Pruning Efficiency

Eprune=1βˆ’Nfiles_scannedNfiles_totalE_{prune} = 1 - \frac{N_{files\_scanned}}{N_{files\_total}}

Here,

  • $E_{prune}$ = Fraction of files skipped via data skipping (target > 0.9)
  • $N_{files\_scanned}$ = Number of Parquet files that need to be read
  • $N_{files\_total}$ = Total number of Parquet files in the target table
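How much a merge can prune depends on how tightly each file's key range is bounded. The toy model below (pure Python, no Spark) treats each file as a min/max pair on the merge key and counts the files that a batch of source keys could touch. `FileStats`, `files_to_scan`, and `prune_efficiency` are illustrative names; real Delta data skipping also depends on which predicates the merge condition exposes on the target.

```python
from dataclasses import dataclass


@dataclass
class FileStats:
    """Per-file min/max of the merge key, like Delta's file-level statistics."""
    path: str
    min_key: int
    max_key: int


def files_to_scan(files, source_keys):
    """Files whose [min_key, max_key] range contains at least one source key."""
    return [f for f in files if any(f.min_key <= k <= f.max_key for k in source_keys)]


def prune_efficiency(files, source_keys) -> float:
    """E_prune = 1 - N_files_scanned / N_files_total."""
    if not files:
        return 0.0
    return 1 - len(files_to_scan(files, source_keys)) / len(files)


# Clustered layout (e.g. after ZORDER BY id): each file covers a narrow key range
clustered = [FileStats(f"part-{i}", i * 100, i * 100 + 99) for i in range(10)]
# Unclustered layout: every file spans almost the whole key range
unclustered = [FileStats(f"part-{i}", i, 900 + i) for i in range(10)]

batch = [105, 150, 199]  # keys arriving in one upsert batch
print(prune_efficiency(clustered, batch))    # 0.9: only part-1 can contain these keys
print(prune_efficiency(unclustered, batch))  # 0.0: every file overlaps the batch
```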

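Delta Lake can skip files during a merge by comparing file-level min/max statistics with the predicates in the merge condition that constrain the target table. Clustering the target on the merge key with ZORDER BY keeps each file's key range narrow, so a batch of source keys overlaps fewer files.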

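For high-frequency upserts, periodically run OPTIMIZE with ZORDER BY on the merge key to compact small files and keep the per-file min/max ranges on that key narrow. Run VACUUM periodically to delete data files that are no longer referenced by the table and are older than the retention period, which reclaims storage.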
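A sketch of this maintenance routine, assuming delta-spark 2.0 or later (where `DeltaTable.optimize()` is available). The Python API calls are `DeltaTable.optimize().executeZOrderBy(...)` and `DeltaTable.vacuum(retentionHours)`; the equivalent SQL statements are built as plain strings so the logic can be checked without a Spark session. The table path and the helper names `maintenance_sql` and `run_maintenance` are hypothetical.

```python
def maintenance_sql(table_path: str, merge_key: str, retain_hours: int = 168) -> list:
    """Build OPTIMIZE ZORDER BY and VACUUM statements for a path-based Delta table."""
    if retain_hours < 168:
        # By default Delta rejects retention below 7 days unless
        # spark.databricks.delta.retentionDurationCheck.enabled is set to false
        raise ValueError("retain_hours below 168 requires disabling the retention check")
    table = f"delta.`{table_path}`"
    return [
        f"OPTIMIZE {table} ZORDER BY ({merge_key})",
        f"VACUUM {table} RETAIN {retain_hours} HOURS",
    ]


def run_maintenance(spark, table_path: str, merge_key: str, retain_hours: int = 168):
    """Same maintenance through the delta-spark Python API (requires a Spark session)."""
    from delta.tables import DeltaTable

    table = DeltaTable.forPath(spark, table_path)
    table.optimize().executeZOrderBy(merge_key)  # rewrite files clustered on the key
    table.vacuum(retain_hours)  # delete unreferenced files older than the retention window


for stmt in maintenance_sql("/path/to/target/table", "id"):
    print(stmt)  # each statement could be passed to spark.sql(stmt)
```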

Theorem: Merge Atomicity

Delta Lake merge is atomic at the table level: it reads the current snapshot, applies the transformations, writes new data files, and then publishes all changes in a single commit to the Delta log. If any step fails before the commit, no log entry is written, so the new files stay invisible to readers and the table remains in its previous consistent state; the orphaned files are later removed by VACUUM.
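The all-or-nothing behavior comes from the commit step: new data files are invisible until a log entry that references them is written, and that entry is created only if no other writer has already claimed the same version number. The in-memory model below is a simplified illustration of this optimistic, put-if-absent commit. `ToyDeltaLog` and its methods are hypothetical; real Delta writes JSON files under `_delta_log/` and relies on the storage layer for atomic creation.

```python
class ToyDeltaLog:
    """In-memory model of a Delta-style transaction log (illustration only).

    This toy log only records added files; real commits also record removed files.
    """

    def __init__(self):
        self.versions = {}  # version number -> data files added in that commit

    def latest_version(self) -> int:
        return max(self.versions, default=-1)

    def visible_files(self) -> list:
        """Readers see only files referenced by committed log entries."""
        return [f for v in sorted(self.versions) for f in self.versions[v]]

    def try_commit(self, version: int, files: list) -> bool:
        """Put-if-absent: succeed only if no one has written this version yet."""
        if version in self.versions:
            return False
        self.versions[version] = list(files)
        return True


log = ToyDeltaLog()
read_version = log.latest_version()      # snapshot the merge started from
staged = ["part-0001.parquet"]           # written to storage, not yet visible
assert log.visible_files() == []
assert log.try_commit(read_version + 1, staged)
# A second writer that read the same snapshot loses the race for that version
assert not log.try_commit(read_version + 1, ["part-0002.parquet"])
print(log.visible_files())
```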

  • Merge = atomic INSERT + UPDATE + DELETE based on match condition
  • CDC captures source changes for downstream synchronization
  • File pruning via min/max statistics is critical for merge performance
  • Use OPTIMIZE ZORDER BY on merge keys; use VACUUM to clean old files
  • Delta Lake supports concurrent writers with optimistic concurrency control

Merge Operation Steps

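Source (new records) flows through four steps:

  1. Scan: read target files, using file statistics to prune irrelevant ones
  2. Match: evaluate the match condition (e.g. ON source.id = target.id)
  3. Apply: INSERT unmatched source rows, UPDATE matched rows, DELETE matched rows flagged for deletion
  4. Commit: record the result atomically in the Delta log

Delta Lake transaction lifecycle: read snapshot → write new files → update log → ACID commit → cleanup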

πŸ—οΈ Merge Operation Architecture

πŸ“š Detailed Explanation

What is Merge (Upsert)?

  • Combines INSERT, UPDATE, and DELETE in a single atomic transaction
  • Based on a match condition between source and target
  • Foundation of CDC and slowly changing dimension (SCD) patterns
  • Implemented in PySpark through Delta Lake's DeltaTable.merge() API or the MERGE INTO SQL statement

Merge Operation Components

| Component | Description |
|---|---|
| Match Condition | SQL expression joining source and target records |
| whenMatchedUpdate | Action for matched records (update fields) |
| whenMatchedDelete | Action for matched records (delete rows) |
| whenNotMatchedInsert | Action for unmatched source records (insert new rows) |
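To make these clause semantics concrete, here is a small pure-Python model of how a merge treats each source row. Simplifications, all assumptions: rows are dicts keyed by `id`, clauses are Python callables rather than SQL strings, and `merge_rows` is a hypothetical helper, not Delta's implementation. It mirrors two documented Delta behaviors: multiple `whenMatched` clauses are evaluated in order and the first one whose condition holds is applied, and a merge fails when several source rows match and modify the same target row.

```python
def merge_rows(target, source, matched_clauses, not_matched_clauses, key="id"):
    """Illustrative model of merge semantics on lists of dicts (not Delta's code).

    matched_clauses: list of (condition(t, s) -> bool, update) where update is
        fn(t, s) -> dict of new column values, or None to delete the target row.
    not_matched_clauses: list of (condition(s) -> bool, fn(s) -> new row dict).
    """
    keys = [s[key] for s in source]
    if len(keys) != len(set(keys)):
        # Delta fails when several source rows match and modify the same target row;
        # this model is stricter and requires unique source keys outright
        raise ValueError("duplicate source keys: deduplicate the source first")

    rows = {t[key]: dict(t) for t in target}
    for s in source:
        t = rows.get(s[key])
        if t is not None:
            # First whenMatched clause whose condition holds wins; none -> unchanged
            for cond, update in matched_clauses:
                if cond(t, s):
                    if update is None:  # whenMatchedDelete
                        del rows[s[key]]
                    else:  # whenMatchedUpdate: overwrite only the listed columns
                        rows[s[key]] = {**t, **update(t, s)}
                    break
        else:
            # First whenNotMatched clause whose condition holds inserts the row
            for cond, make_row in not_matched_clauses:
                if cond(s):
                    rows[s[key]] = make_row(s)
                    break
    return list(rows.values())


target = [{"id": 1, "name": "Ann"}, {"id": 2, "name": "Bob"}]
source = [
    {"id": 1, "name": "Anna", "op": "u"},
    {"id": 2, "name": "Bob", "op": "d"},
    {"id": 3, "name": "Cy", "op": "o"},
]
merged = merge_rows(
    target,
    source,
    matched_clauses=[
        (lambda t, s: s["op"] in ("o", "u"), lambda t, s: {"name": s["name"]}),
        (lambda t, s: s["op"] == "d", None),
    ],
    not_matched_clauses=[
        (lambda s: s["op"] in ("o", "u"), lambda s: {"id": s["id"], "name": s["name"]}),
    ],
)
print(merged)
```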

Key Takeaway: Merge is the most flexible upsert strategy, handling inserts, updates, and deletes in a single atomic operation.


CDC (Change Data Capture) Patterns

  • Captures changes from source systems (typically databases)
  • Applies changes to target systems (usually data lakes)
  • Handles:
    • Late-arriving data
    • Conflict resolution
    • Data consistency
  • Merge operation efficiently manages CDC complexity
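A common first step when applying a CDC feed is to collapse it to the latest change per key, so that a batch with several changes to the same row does not make the merge fail and an out-of-order (late-arriving) event cannot override a newer one. The sketch assumes each change carries a monotonically increasing ordering column, called `seq` here (a hypothetical name; many CDC tools expose a log sequence number or commit timestamp for this). The PySpark version uses `Window` with `row_number()`; the pure-Python version lets the logic be checked without Spark.

```python
def latest_per_key(changes, key="id", order="seq"):
    """Keep only the change with the highest `order` value per key (ties: first seen)."""
    latest = {}
    for c in changes:
        k = c[key]
        if k not in latest or c[order] > latest[k][order]:
            latest[k] = c
    return list(latest.values())


def latest_per_key_df(df, key="id", order="seq"):
    """Same logic on a Spark DataFrame (requires pyspark)."""
    from pyspark.sql import Window
    from pyspark.sql import functions as F

    w = Window.partitionBy(key).orderBy(F.col(order).desc())
    return (
        df.withColumn("_rn", F.row_number().over(w))
        .filter(F.col("_rn") == 1)
        .drop("_rn")
    )


feed = [
    {"id": 1, "seq": 10, "op": "o", "name": "Ann"},
    {"id": 1, "seq": 12, "op": "u", "name": "Anna"},
    {"id": 2, "seq": 11, "op": "d", "name": None},
    {"id": 1, "seq": 11, "op": "u", "name": "Annie"},  # arrives late, older than seq 12
]
print(latest_per_key(feed))
```

This only deduplicates within one batch. Across batches, a guard in the merge condition such as `target.last_updated < source.last_updated` keeps an older change from overwriting a newer row.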

Upsert Strategy Comparison

| Strategy | Flexibility | Write Speed | Read Speed | Overhead |
|---|---|---|---|---|
| Merge | Highest | Moderate | Fast | Join required |
| Insert Overwrite | Limited (no partial updates) | Fast | Fast | Partition replacement |
| Append + Dedup | Low | Fastest | Slow | Storage cost |

Choose based on: data volume, update patterns, latency requirements, and complexity constraints.
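For the Insert Overwrite row, Delta Lake's `replaceWhere` write option atomically replaces only the rows matching a predicate, so whole partitions can be rewritten without a join. A minimal sketch, assuming a table partitioned by `event_date`; the column, path, and helper names are hypothetical. The predicate builder is plain Python so it can be tested without Spark.

```python
def replace_where_predicate(column: str, values) -> str:
    """Build a SQL predicate such as event_date IN ('2024-01-15', '2024-01-16')."""
    if not values:
        raise ValueError("need at least one partition value")
    # Escape single quotes so each value is a valid SQL string literal
    literals = ", ".join("'" + str(v).replace("'", "''") + "'" for v in sorted(set(values)))
    return f"{column} IN ({literals})"


def overwrite_partitions(df, path: str, column: str = "event_date"):
    """Replace only the partitions present in `df` (requires pyspark and delta-spark)."""
    values = [r[column] for r in df.select(column).distinct().collect()]
    (
        df.write.format("delta")
        .mode("overwrite")
        .option("replaceWhere", replace_where_predicate(column, values))
        .save(path)
    )


print(replace_where_predicate("event_date", ["2024-01-16", "2024-01-15", "2024-01-15"]))
```

Building the predicate from the DataFrame's own values keeps every written row inside the replaced range, which Delta checks by default for `replaceWhere` writes.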


Performance Optimization

  1. Partition strategically: partition the target table to reduce join scope
  2. Use efficient join strategies: keep the match condition to equality predicates on key columns so Spark can plan a hash or sort-merge join instead of a nested-loop join
  3. Manage file sizes: compact small files regularly
  4. Include partition keys in merge conditions for partition pruning

Key Takeaway: The target table's partition strategy directly affects merge join performance.


Conflict Resolution

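  • Optimistic concurrency control handles simultaneous merges: each writer works from a snapshot and checks for conflicts only at commit time
  • If concurrent commits did not add or remove files in the data the merge read, Delta commits at the next version; if they did, the merge fails with a concurrent-modification exception (e.g. ConcurrentAppendException) and the application must retry it
  • Configure retry policies and monitoring for merge conflicts in production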
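When Delta Lake cannot reconcile a merge with a concurrent commit, the merge fails with a concurrency exception, so production jobs usually wrap it in a retry loop. Because a failed attempt commits nothing, re-running the whole merge against the new snapshot is safe. A minimal sketch with exponential backoff and jitter; `with_retries`, `run_merge`, and `FakeConflict` are hypothetical names, and the retryable exception types are passed in. With delta-spark, candidates are the concurrency exceptions in `delta.exceptions` (such as `ConcurrentAppendException`) in recent releases.

```python
import random
import time


def with_retries(action, retryable, max_attempts=5, base_delay=1.0, sleep=time.sleep):
    """Run `action()`; on a retryable exception, back off exponentially and try again."""
    for attempt in range(1, max_attempts + 1):
        try:
            return action()
        except retryable:
            if attempt == max_attempts:
                raise  # give up and surface the conflict
            # 1x, 2x, 4x ... the base delay, plus up to 100% random jitter
            delay = base_delay * 2 ** (attempt - 1)
            sleep(delay + random.uniform(0, delay))


# Usage sketch with delta-spark (`run_merge` would rebuild and execute the merge):
#   from delta.exceptions import ConcurrentAppendException
#   with_retries(run_merge, (ConcurrentAppendException,))

class FakeConflict(Exception):
    """Stand-in for a concurrency exception in this self-contained demo."""


calls = {"n": 0}


def flaky_merge():
    calls["n"] += 1
    if calls["n"] < 3:
        raise FakeConflict("concurrent commit")
    return "committed"


print(with_retries(flaky_merge, FakeConflict, sleep=lambda s: None))
```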

Data Quality in Merge Operations

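  • Schema evolution: handle schema changes during merge (for MERGE, Delta uses the spark.databricks.delta.schema.autoMerge.enabled setting rather than the mergeSchema write option)
  • Data type validation: ensure source columns have the expected types
  • Referential integrity: maintain relationships between tables; Delta Lake does not enforce foreign keys, so check them in the pipeline
  • Delta Lake's schema enforcement rejects writes whose columns or types are incompatible with the table schema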
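Null and type checks are cheap to run on the source before the merge, and rows that fail can be diverted to a quarantine location instead of aborting the job. The sketch below uses plain Python dicts and a hypothetical `EXPECTED` schema so it runs without Spark; in PySpark the same split is usually two `filter` calls on a validity expression.

```python
EXPECTED = {"id": int, "email": str, "last_updated": str}  # hypothetical source schema


def row_errors(row: dict) -> list:
    """Return the problems found in one source row (empty list means valid)."""
    errors = []
    for col, typ in EXPECTED.items():
        value = row.get(col)
        if value is None:
            errors.append(f"{col} is null")
        elif not isinstance(value, typ):
            errors.append(f"{col} should be {typ.__name__}")
    return errors


def split_valid(rows):
    """Partition rows into (valid, quarantined) before they reach the merge."""
    valid, quarantined = [], []
    for row in rows:
        errs = row_errors(row)
        if errs:
            quarantined.append({**row, "_errors": errs})
        else:
            valid.append(row)
    return valid, quarantined


good, bad = split_valid([
    {"id": 1, "email": "a@example.com", "last_updated": "2024-01-15"},
    {"id": None, "email": "b@example.com", "last_updated": "2024-01-15"},
    {"id": "3", "email": "c@example.com", "last_updated": "2024-01-15"},
])
print(len(good), [r["_errors"] for r in bad])
```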

Monitoring Merge Health

Track these metrics:

  • Merge duration: total operation time
  • Records processed: throughput measurement
  • File compaction rates: storage efficiency
  • Conflict frequencies: concurrency issues
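Several of these numbers can be read from the `operationMetrics` that Delta Lake records for each MERGE commit (for example `numSourceRows`, `numTargetRowsInserted`, `numTargetRowsUpdated`, `numTargetRowsDeleted`, `numTargetFilesRemoved`, `executionTimeMs`). A sketch that turns one such map into health figures and grades the duration against the thresholds in the Performance Metrics table of this lesson; `merge_health` is a hypothetical helper and the sample values are illustrative, not measured.

```python
def merge_health(metrics: dict) -> dict:
    """Summarize one MERGE commit's operationMetrics (values are strings in Delta)."""
    def num(name):
        return int(metrics.get(name, 0))

    seconds = num("executionTimeMs") / 1000
    # Duration grades follow the lesson's Performance Metrics table
    if seconds < 30:
        grade = "optimized"
    elif seconds <= 300:
        grade = "typical"
    else:
        grade = "poor"
    return {
        "duration_s": seconds,
        "source_rows_per_s": num("numSourceRows") / seconds if seconds else None,
        "rows_changed": (
            num("numTargetRowsInserted")
            + num("numTargetRowsUpdated")
            + num("numTargetRowsDeleted")
        ),
        "files_rewritten": num("numTargetFilesRemoved"),
        "duration_grade": grade,
    }


sample = {  # illustrative values
    "numSourceRows": "200000", "executionTimeMs": "40000",
    "numTargetRowsInserted": "1500", "numTargetRowsUpdated": "8000",
    "numTargetRowsDeleted": "0", "numTargetFilesRemoved": "12",
}
print(merge_health(sample))
# With Spark (sketch; check that the row's "operation" column is "MERGE"):
#   row = DeltaTable.forPath(spark, path).history(1).first()
#   merge_health(row["operationMetrics"])
```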

Advanced Techniques

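  • Conditional merges: apply different actions based on complex conditions (multiple whenMatched clauses are evaluated in order, and the first one whose condition holds is applied)
  • Merge with aggregation: aggregate source records before merging
  • Both patterns still run as a single atomic merge, so they keep Delta Lake's ACID guarantees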

📊 Key Concepts Table

| Concept | Description | Use Case |
|---|---|---|
| Merge Operation | Join-based upsert with conditional actions | CDC, complex updates |
| CDC (Change Data Capture) | Capturing and applying database changes | Data synchronization |
| Upsert | Insert or update based on existence | Real-time data ingestion |
| Partition Pruning | Reducing join scope through partitioning | Performance optimization |
| Optimistic Concurrency | Conflict resolution for concurrent writes | Multi-writer scenarios |
| Schema Evolution | Handling schema changes during merge | Evolving data structures |
| File Compaction | Merging small files for better performance | Storage optimization |
| Time Travel | Accessing historical versions | Auditing, debugging |

💻 Code Examples

Basic Merge Operation

from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp
from delta.tables import DeltaTable

spark = SparkSession.builder \
    .appName("MergeOperation") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Read target Delta table
target_table = DeltaTable.forPath(spark, "/path/to/target/table")

# Read source data (e.g., from Kafka, files, etc.)
# Source ids should be unique: the merge fails if several source rows update one target row
source_df = spark.read \
    .format("json") \
    .load("/path/to/source/data")

# Perform merge operation
# Parameters:
#   alias("target"): alias for the target table
#   alias("source"): alias for the source DataFrame
#   "target.id = source.id": match condition (SQL expression)
target_table.alias("target").merge(
    source_df.alias("source"),
    "target.id = source.id"  # Match condition
).whenMatchedUpdate(
    condition="target.last_updated < source.last_updated",  # Only update if source is newer
    set={
        "name": "source.name",
        "email": "source.email",
        "last_updated": "source.last_updated"
    }
).whenNotMatchedInsert(
    values={
        "id": "source.id",
        "name": "source.name",
        "email": "source.email",
        "last_updated": "source.last_updated",
        "created_at": current_timestamp()
    }
).execute()

print("Merge operation completed successfully")

Advanced Merge with Delete

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, row_number
from delta.tables import DeltaTable

spark = SparkSession.builder \
    .appName("AdvancedMerge") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Read target Delta table
target_table = DeltaTable.forPath(spark, "/path/to/target/table")

# Read source data with operation column
changes_df = spark.read \
    .format("delta") \
    .load("/path/to/cdc/source") \
    .withColumn("operation", col("op"))  # o=insert, u=update, d=delete

# Keep only the latest change per id: the merge fails if several source rows
# update or delete the same target row
latest_first = Window.partitionBy("id").orderBy(col("timestamp").desc())
source_df = changes_df \
    .withColumn("_rn", row_number().over(latest_first)) \
    .filter(col("_rn") == 1) \
    .drop("_rn")

# Perform merge with delete operation
# whenMatchedDelete: delete matched target rows whose latest change is a delete
target_table.alias("target").merge(
    source_df.alias("source"),
    "target.id = source.id"
).whenMatchedUpdate(
    condition="source.operation IN ('o', 'u')",  # Insert or update of an existing row
    set={
        "name": "source.name",
        "email": "source.email",
        "status": "source.status",
        "last_updated": "source.timestamp"
    }
).whenMatchedDelete(
    condition="source.operation = 'd'"  # Delete operation
).whenNotMatchedInsert(
    condition="source.operation IN ('o', 'u')",  # Insert new records, never deletes
    values={
        "id": "source.id",
        "name": "source.name",
        "email": "source.email",
        "status": "source.status",
        "last_updated": "source.timestamp",
        "created_at": "source.timestamp"
    }
).execute()

print("CDC merge operation completed successfully")

Merge with Partition Pruning

from pyspark.sql import SparkSession
from delta.tables import DeltaTable
from pyspark.sql.functions import col, current_timestamp

spark = SparkSession.builder \
    .appName("PartitionPruningMerge") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Read target Delta table (partitioned by date)
target_table = DeltaTable.forPath(spark, "/path/to/partitioned/table")

# Read source data for specific date
source_df = spark.read \
    .format("json") \
    .load("/path/to/source/data") \
    .filter(col("event_date") == "2024-01-15")  # Filter to specific partition

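# Perform merge with partition pruning
# The literal predicate on target.event_date restricts the search to that partition;
# joining on event_date alone may not prune target files before the join
target_table.alias("target").merge(
    source_df.alias("source"),
    "target.id = source.id AND target.event_date = source.event_date "
    "AND target.event_date = '2024-01-15'"  # Partition key, joined and pinned to a literal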
).whenMatchedUpdate(
    set={
        "value": "source.value",
        "updated_at": current_timestamp()
    }
).whenNotMatchedInsert(
    values={
        "id": "source.id",
        "event_date": "source.event_date",
        "value": "source.value",
        "created_at": current_timestamp(),
        "updated_at": current_timestamp()
    }
).execute()

print("Partition-pruned merge completed successfully")

Merge with Aggregation

from pyspark.sql import SparkSession
from pyspark.sql.functions import count, current_timestamp, max as max_, sum as sum_
from delta.tables import DeltaTable

spark = SparkSession.builder \
    .appName("MergeWithAggregation") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Read target Delta table
target_table = DeltaTable.forPath(spark, "/path/to/target/table")

# Read source data and aggregate to one row per (user_id, event_date)
# (aliased imports avoid shadowing Python's built-in sum and max)
source_df = spark.read \
    .format("json") \
    .load("/path/to/source/data") \
    .groupBy("user_id", "event_date") \
    .agg(
        sum_("amount").alias("total_amount"),
        count("*").alias("event_count"),
        max_("timestamp").alias("last_event_time")
    )

# Perform merge with aggregated data
# Adding source totals to target totals is not idempotent: merging the same
# batch twice double counts, so apply each batch exactly once
target_table.alias("target").merge(
    source_df.alias("source"),
    "target.user_id = source.user_id AND target.event_date = source.event_date"
).whenMatchedUpdate(
    set={
        "total_amount": "target.total_amount + source.total_amount",
        "event_count": "target.event_count + source.event_count",
        "last_event_time": "GREATEST(target.last_event_time, source.last_event_time)",
        "updated_at": current_timestamp()
    }
).whenNotMatchedInsert(
    values={
        "user_id": "source.user_id",
        "event_date": "source.event_date",
        "total_amount": "source.total_amount",
        "event_count": "source.event_count",
        "last_event_time": "source.last_event_time",
        "created_at": current_timestamp(),
        "updated_at": current_timestamp()
    }
).execute()

print("Merge with aggregation completed successfully")

📈 Performance Metrics

| Metric | Optimized | Typical | Poor | Optimization |
|---|---|---|---|---|
| Merge Duration | < 30s | 30s-5min | > 5min | Partition pruning, file optimization |
| Records Processed/sec | > 100K | 10K-100K | < 10K | Parallelism, join optimization |
| File Compaction | < 100 files | 100-1000 files | > 1000 files | Regular compaction, OPTIMIZE command |
| Conflict Rate | < 1% | 1-5% | > 5% | Concurrency control, partitioning |
| Storage Efficiency | > 80% | 60-80% | < 60% | Compaction, Z-ordering |

πŸ† Best Practices

  1. Partition strategically β€” Partition by high-cardinality columns used in join conditions
  2. Use partition pruning β€” Always include partition keys in merge conditions
  3. Optimize file sizes β€” Aim for 128MB-1GB files for optimal performance
  4. Regular compaction β€” Run OPTIMIZE command to merge small files
  5. Z-order indexing β€” Create z-order indexes on frequently filtered columns
  6. Monitor conflicts β€” Track and minimize merge conflicts
  7. Handle schema evolution β€” Use mergeSchema option for schema changes
  8. Test with realistic data β€” Validate performance with production-like volumes
  9. Use checkpointing β€” For streaming merges, configure appropriate checkpoint intervals
  10. Backup before major operations β€” Always backup data before large merge operations

πŸ”— Related Topics

  • 11-structured-streaming.mdx: Streaming architecture for real-time merges
  • 12-state-management.mdx: Stateful operations for merge patterns
  • 13-window-operations.mdx: Windowed aggregations before merge
  • 16-schema-evolution.mdx: Schema changes during merge operations

See Also

  • 06-joins-optimization.mdx: Join strategies for merge operations
  • 15-data-quality.mdx: Data quality during merge operations
  • Kafka Streams (kafka/03): Stream-table duality for upsert semantics
  • Data Engineering Streaming (data-engineering/022): CDC merge patterns in streaming pipelines