14. Merge & Upsert Operations in PySpark
Definition: Merge (Upsert)
A merge operation combines INSERT, UPDATE, and DELETE operations in a single atomic transaction based on a match condition. It is the foundation of Change Data Capture (CDC) and slowly changing dimension (SCD) patterns in Delta Lake.
Definition: Change Data Capture (CDC)
CDC is a technique that captures and tracks changes (inserts, updates, deletes) made to data in a source system, enabling downstream systems to apply the same changes to maintain synchronized copies.
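$$T_{\text{merge}} = T_{\text{scan}} + T_{\text{match}} + T_{\text{apply}} + T_{\text{write}}$$
Here,
- $T_{\text{merge}}$ = Total merge operation time
- $T_{\text{scan}}$ = Time to scan target table for matching rows
- $T_{\text{match}}$ = Time to evaluate match condition on source data
- $T_{\text{apply}}$ = Time to apply INSERT/UPDATE/DELETE operations
- $T_{\text{write}}$ = Time to write new Parquet files to Delta table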
Merge File Pruning Efficiency
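$$\eta_{\text{prune}} = 1 - \frac{N_{\text{read}}}{N_{\text{total}}}$$
Here,
- $\eta_{\text{prune}}$ = Fraction of files skipped via data skipping (target > 0.9)
- $N_{\text{read}}$ = Number of Parquet files that need to be read
- $N_{\text{total}}$ = Total number of Parquet files in the target table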
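Delta Lake merge uses data skipping, based on file-level min/max statistics, to prune files that cannot contain matching rows. To maximize pruning, cluster the target table on the merge key with OPTIMIZE ... ZORDER BY so that each file covers a narrow key range.
For high-frequency upserts, rerun OPTIMIZE ... ZORDER BY on the merge key periodically: every merge rewrites files, which gradually degrades clustering and widens the per-file min/max ranges. Run VACUUM periodically to delete data files that the table no longer references and keep storage from growing without bound.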
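To make the effect of clustering on pruning concrete, the following is a minimal pure-Python model of min/max data skipping. It is a sketch, not Delta Lake's implementation: `FileStats`, `files_to_read`, and `pruning_efficiency` are hypothetical names introduced here, and real statistics cover many columns rather than a single integer key.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FileStats:
    """Per-file min/max of the merge key (hypothetical structure for this sketch)."""
    path: str
    min_key: int
    max_key: int


def files_to_read(files, source_keys):
    """Return the files whose [min_key, max_key] range contains at least one source key."""
    keys = set(source_keys)
    return [f for f in files if any(f.min_key <= k <= f.max_key for k in keys)]


def pruning_efficiency(files, source_keys):
    """Fraction of target files skipped: 1 - files_read / files_total."""
    if not files:
        return 0.0
    return 1.0 - len(files_to_read(files, source_keys)) / len(files)


# Clustered layout: each file covers a narrow, disjoint key range.
clustered = [FileStats(f"part-{i}", i * 100, i * 100 + 99) for i in range(10)]
# Unclustered layout: every file spans almost the whole key space.
unclustered = [FileStats(f"part-{i}", i, 900 + i) for i in range(10)]

source_keys = [105, 110, 150]
clustered_eff = pruning_efficiency(clustered, source_keys)      # 1 of 10 files read
unclustered_eff = pruning_efficiency(unclustered, source_keys)  # 10 of 10 files read
print(clustered_eff, unclustered_eff)
```

With the same three source keys, the clustered layout reads one file out of ten, while the unclustered layout reads all of them because every file's range overlaps the keys.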
Theorem: Merge Atomicity
Theorem: Delta Lake merge is atomic at the file level: it reads existing files, applies transformations, writes new files, and atomically updates the Delta log in a single commit. If any step fails before that commit, no log entry is written, so readers never see partial results and the table remains in its previous consistent state.
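The commit behaviour described above can be illustrated with a toy model. This is a deliberately simplified sketch, assuming a log that is just a list of file-set snapshots; `ToyDeltaLog` and `toy_merge` are hypothetical names and do not mirror Delta Lake's internal classes.

```python
class ToyDeltaLog:
    """Toy versioned commit log; not Delta Lake's actual implementation."""

    def __init__(self, initial_files):
        self._commits = [frozenset(initial_files)]  # version 0 snapshot

    @property
    def version(self):
        return len(self._commits) - 1

    def snapshot(self):
        return self._commits[-1]

    def commit(self, read_version, removed, added):
        """Publish a new snapshot only if nobody committed since read_version."""
        if read_version != self.version:
            raise RuntimeError("conflict: table changed since it was read")
        self._commits.append((self.snapshot() - set(removed)) | set(added))


def toy_merge(log, rewrite, fail_before_commit=False):
    """rewrite maps each old file containing matches to its replacement file."""
    read_version = log.version
    new_files = list(rewrite.values())  # step 1: write new data files
    if fail_before_commit:
        raise OSError("simulated failure after writing files")
    log.commit(read_version, rewrite.keys(), new_files)  # step 2: one atomic commit


log = ToyDeltaLog({"a.parquet", "b.parquet"})
try:
    toy_merge(log, {"a.parquet": "a2.parquet"}, fail_before_commit=True)
except OSError:
    pass  # nothing was committed, so the table is unchanged
version_after_failure = log.version
files_after_failure = set(log.snapshot())

toy_merge(log, {"a.parquet": "a2.parquet"})
print(log.version, sorted(log.snapshot()))
```

The failed attempt leaves version 0 untouched even though "files" were written; only the successful call advances the log to version 1 with `a.parquet` replaced by `a2.parquet`.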
- Merge = atomic INSERT + UPDATE + DELETE based on match condition
- CDC captures source changes for downstream synchronization
- File pruning via min/max statistics is critical for merge performance
- Use OPTIMIZE ZORDER BY on merge keys; use VACUUM to clean old files
- Delta Lake supports concurrent writers with optimistic concurrency control
Merge Operation Steps
Merge Operation Architecture
Detailed Explanation
What is Merge (Upsert)?
- Combines INSERT, UPDATE, and DELETE in a single atomic transaction
- Based on a match condition between source and target
- Foundation of CDC and slowly changing dimension (SCD) patterns
- Implemented through Delta Lake's merge functionality
Merge Operation Components
| Component | Description |
|---|---|
| Match Condition | SQL expression joining source and target records |
| whenMatchedUpdate | Action for matched records (update fields) |
| whenMatchedDelete | Action for matched records (delete rows) |
| whenNotMatchedInsert | Action for unmatched records (insert new rows) |
Key Takeaway: Merge is the most flexible upsert strategy; it handles inserts, updates, and deletes in a single atomic operation.
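The way the clauses in the table interact can be sketched with plain Python dictionaries. This is a simplified model under stated assumptions: matched clauses are tried in order and the first one whose condition holds wins, and duplicate source keys are rejected outright. `toy_merge_rows` is a hypothetical helper, not part of the Delta Lake API.

```python
def toy_merge_rows(target, source, key, matched_clauses, not_matched_insert):
    """
    target, source: lists of dict rows.
    matched_clauses: ordered list of (condition(t, s), action), where action is
        the string "delete" or a function (t, s) -> updated row.
    not_matched_insert: (condition(s), function s -> new row).
    """
    source_by_key = {}
    for s in source:
        if s[key] in source_by_key:
            raise ValueError(f"multiple source rows for key {s[key]!r}")
        source_by_key[s[key]] = s

    result, matched_keys = [], set()
    for t in target:
        s = source_by_key.get(t[key])
        if s is None:
            result.append(t)  # target rows without a match are left untouched
            continue
        matched_keys.add(t[key])
        for condition, action in matched_clauses:
            if condition(t, s):
                if action != "delete":
                    result.append(action(t, s))
                break
        else:
            result.append(t)  # matched, but no clause condition held

    insert_condition, make_row = not_matched_insert
    for k, s in source_by_key.items():
        if k not in matched_keys and insert_condition(s):
            result.append(make_row(s))
    return result


target = [{"id": 1, "name": "Ann"}, {"id": 2, "name": "Bob"}, {"id": 3, "name": "Cy"}]
source = [
    {"id": 1, "name": "Anna", "op": "u"},
    {"id": 2, "name": "Bob", "op": "d"},
    {"id": 4, "name": "Dee", "op": "o"},
]
merged = toy_merge_rows(
    target, source, "id",
    matched_clauses=[
        (lambda t, s: s["op"] in ("o", "u"), lambda t, s: {**t, "name": s["name"]}),
        (lambda t, s: s["op"] == "d", "delete"),
    ],
    not_matched_insert=(
        lambda s: s["op"] in ("o", "u"),
        lambda s: {"id": s["id"], "name": s["name"]},
    ),
)
print(merged)
```

Row 1 is updated, row 2 is deleted, row 3 is untouched, and row 4 is inserted, all in one pass over the join of source and target.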
CDC (Change Data Capture) Patterns
- Captures changes from source systems (typically databases)
- Applies changes to target systems (usually data lakes)
- Handles:
- Late-arriving data
- Conflict resolution
- Data consistency
- Merge operation efficiently manages CDC complexity
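One practical consequence of late-arriving data is that a CDC batch can contain several changes for the same key, and a merge generally cannot apply more than one source row to a single target row. A common approach is to keep only the latest change per key before merging. The sketch below shows the idea in plain Python; `latest_change_per_key` and the `id`/`timestamp`/`op` field names are assumptions for illustration. In PySpark the same idea is usually expressed with a window function ordered by the change timestamp.

```python
def latest_change_per_key(events, key="id", order_by="timestamp"):
    """Keep one event per key: the one with the greatest order_by value.

    Ties are resolved in favour of the event that appears later in the input.
    """
    latest = {}
    for event in events:
        current = latest.get(event[key])
        if current is None or event[order_by] >= current[order_by]:
            latest[event[key]] = event
    return list(latest.values())


events = [
    {"id": 1, "timestamp": 1, "op": "o"},
    {"id": 1, "timestamp": 3, "op": "u"},
    {"id": 2, "timestamp": 2, "op": "o"},
    {"id": 1, "timestamp": 2, "op": "u"},  # late-arriving, older than timestamp 3
]
deduplicated = latest_change_per_key(events)
print(deduplicated)
```

The late-arriving event with timestamp 2 is discarded because a newer change for the same key is already present, so the merge source ends up with exactly one row per key.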
Upsert Strategy Comparison
| Strategy | Flexibility | Write Speed | Read Speed | Overhead |
|---|---|---|---|---|
| Merge | Highest | Moderate | Fast | Join required |
| Insert Overwrite | Limited (no partial updates) | Fast | Fast | Partition replacement |
| Append + Dedup | Low | Fastest | Slow | Storage cost |
Choose based on: data volume, update patterns, latency requirements, and complexity constraints.
Performance Optimization
- Partition strategically: partition the target table to reduce join scope
- Use efficient join strategies: optimize merge conditions
- Manage file sizes: compact small files regularly
- Include partition keys in merge conditions for partition pruning
Key Takeaway: The target table's partition strategy directly affects merge join performance.
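One way to apply this is to build the merge condition programmatically, so that it always carries an explicit predicate on the target's partition column. The helper below is a hypothetical sketch (`build_merge_condition` is not a Delta Lake function); it assumes the `target` and `source` aliases, and that partition values are simple strings such as dates.

```python
def build_merge_condition(key_cols, partition_col=None, partition_values=()):
    """Build a SQL match condition for the aliases `target` and `source`.

    If partition values are given, a literal IN predicate on the target's
    partition column is appended so that untouched partitions can be skipped.
    """
    parts = [f"target.{c} = source.{c}" for c in key_cols]
    if partition_col and partition_values:
        # Quote each value as a SQL string literal, escaping single quotes.
        literals = ", ".join(
            "'" + v.replace("'", "''") + "'"
            for v in sorted({str(v) for v in partition_values})
        )
        parts.append(f"target.{partition_col} IN ({literals})")
    return " AND ".join(parts)


condition = build_merge_condition(
    ["id", "event_date"], "event_date", ["2024-01-15", "2024-01-14", "2024-01-15"]
)
print(condition)
```

The partition values would typically be collected from the (small) source batch before the merge; the resulting string can be passed as the match condition.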
Conflict Resolution
- Optimistic concurrency control handles simultaneous merges
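- Conflicts are detected at commit time: commits that do not overlap are reconciled automatically, while a genuinely conflicting merge fails and must be retried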
- Understanding conflict resolution is essential for production
- Configure retry policies and monitoring for merge conflicts
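A retry policy for conflicting merges can be sketched as a small wrapper with exponential backoff and jitter. This is a generic pattern, not a Delta Lake feature: `run_with_retry` is a hypothetical helper, and the set of retryable exception types is left as a parameter. With delta-spark you would likely pass the concurrency exception classes from `delta.exceptions`, which is an assumption to verify against your installed version.

```python
import random
import time


def run_with_retry(operation, retryable=(Exception,), max_attempts=5,
                   base_delay=0.5, sleep=time.sleep):
    """Call operation(); on a retryable error, back off and try again.

    The delay doubles with each attempt and gets random jitter so that
    competing writers do not retry in lockstep. The last error is re-raised.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except retryable:
            if attempt == max_attempts:
                raise
            delay = base_delay * 2 ** (attempt - 1)
            sleep(delay + random.uniform(0, delay))


class FakeConflict(Exception):
    """Stand-in for a concurrent-modification error in this example."""


attempts = []


def flaky_merge():
    attempts.append(1)
    if len(attempts) < 3:
        raise FakeConflict("simulated concurrent write")
    return "committed"


delays = []  # capture sleeps instead of actually waiting
result = run_with_retry(flaky_merge, retryable=(FakeConflict,),
                        base_delay=0.01, sleep=delays.append)
print(result, len(attempts), len(delays))
```

In a real pipeline, `operation` would wrap the whole read-source-and-merge step, so that each retry sees the latest table state.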
Data Quality in Merge Operations
- Schema evolution: handle schema changes during merge
- Data type validation: ensure valid data types
- Referential integrity: maintain relationships
- Delta Lake's schema enforcement helps maintain quality
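A lightweight pre-merge check can compare the source and target schemas and report what would require schema evolution or would fail type validation. The sketch below works on plain name-to-type mappings; `schema_diff` is a hypothetical helper, and in PySpark such a mapping can be obtained with `dict(df.dtypes)`.

```python
def schema_diff(target_schema, source_schema):
    """Compare two {column: type_name} mappings.

    Returns (new_columns, mismatched), where new_columns holds source columns
    missing from the target and mismatched maps a column to
    (target_type, source_type) when the types differ.
    """
    new_columns = {c: t for c, t in source_schema.items() if c not in target_schema}
    mismatched = {
        c: (target_schema[c], t)
        for c, t in source_schema.items()
        if c in target_schema and target_schema[c] != t
    }
    return new_columns, mismatched


target_schema = {"id": "bigint", "name": "string", "last_updated": "timestamp"}
source_schema = {"id": "string", "name": "string", "email": "string"}

new_columns, mismatched = schema_diff(target_schema, source_schema)
print(new_columns)
print(mismatched)
```

Here `email` would need schema evolution, and `id` has a type mismatch that is better fixed with an explicit cast before the merge than discovered at write time.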
Monitoring Merge Health
Track these metrics:
- Merge duration: total operation time
- Records processed: throughput measurement
- File compaction rates: storage efficiency
- Conflict frequencies: concurrency issues
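Several of these metrics can be derived from the operation metrics that Delta Lake records for each commit in the table history. The sketch below summarizes one such metrics map. The metric names used (`numTargetRowsInserted`, `numTargetRowsUpdated`, `numTargetRowsDeleted`, `numTargetFilesAdded`, `numTargetFilesRemoved`, `executionTimeMs`) are assumptions about what your Delta version reports for MERGE, so check them against your own table history; `summarize_merge_metrics` is a hypothetical helper.

```python
def summarize_merge_metrics(metrics):
    """Summarize a {metric_name: value} map; values may be strings or numbers."""
    def as_int(name):
        return int(metrics.get(name, 0))

    rows_changed = (
        as_int("numTargetRowsInserted")
        + as_int("numTargetRowsUpdated")
        + as_int("numTargetRowsDeleted")
    )
    seconds = as_int("executionTimeMs") / 1000
    return {
        "rows_changed": rows_changed,
        "duration_s": seconds,
        "rows_per_s": rows_changed / seconds if seconds > 0 else None,
        # Positive means the merge added more files than it removed.
        "net_files": as_int("numTargetFilesAdded") - as_int("numTargetFilesRemoved"),
    }


sample = {
    "numTargetRowsInserted": "1200",
    "numTargetRowsUpdated": "300",
    "numTargetRowsDeleted": "0",
    "numTargetFilesAdded": "4",
    "numTargetFilesRemoved": "3",
    "executionTimeMs": "3000",
}
summary = summarize_merge_metrics(sample)
print(summary)
```

A steadily positive `net_files` across merges is a hint that small files are accumulating and compaction is due.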
Advanced Techniques
- Conditional merges: different actions based on complex conditions
- Merge with aggregation: aggregate source records before merging
- Maintain ACID transaction benefits while enabling sophisticated patterns
Key Concepts Table
| Concept | Description | Use Case |
|---|---|---|
| Merge Operation | Join-based upsert with conditional actions | CDC, complex updates |
| CDC (Change Data Capture) | Capturing and applying database changes | Data synchronization |
| Upsert | Insert or update based on existence | Real-time data ingestion |
| Partition Pruning | Reducing join scope through partitioning | Performance optimization |
| Optimistic Concurrency | Conflict resolution for concurrent writes | Multi-writer scenarios |
| Schema Evolution | Handling schema changes during merge | Evolving data structures |
| File Compaction | Merging small files for better performance | Storage optimization |
| Time Travel | Accessing historical versions | Auditing, debugging |
Code Examples
Basic Merge Operation
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp
from delta.tables import DeltaTable

spark = SparkSession.builder \
    .appName("MergeOperation") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Read target Delta table
target_table = DeltaTable.forPath(spark, "/path/to/target/table")

# Read source data (e.g., from Kafka, files, etc.)
source_df = spark.read \
    .format("json") \
    .load("/path/to/source/data")

# Perform merge operation
# Parameters:
#   alias("target"): alias for the target table
#   alias("source"): alias for the source DataFrame
#   "target.id = source.id": match condition (SQL expression)
target_table.alias("target").merge(
    source_df.alias("source"),
    "target.id = source.id"  # Match condition
).whenMatchedUpdate(
    condition="target.last_updated < source.last_updated",  # Only update if source is newer
    set={
        "name": "source.name",
        "email": "source.email",
        "last_updated": "source.last_updated"
    }
).whenNotMatchedInsert(
    values={
        "id": "source.id",
        "name": "source.name",
        "email": "source.email",
        "last_updated": "source.last_updated",
        "created_at": current_timestamp()
    }
).execute()

print("Merge operation completed successfully")
Advanced Merge with Delete
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from delta.tables import DeltaTable

spark = SparkSession.builder \
    .appName("AdvancedMerge") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Read target Delta table
target_table = DeltaTable.forPath(spark, "/path/to/target/table")

# Read source data with operation column
source_df = spark.read \
    .format("delta") \
    .load("/path/to/cdc/source") \
    .withColumn("operation", col("op"))  # o=insert, u=update, d=delete

# Perform merge with delete operation
# Matched clauses are evaluated in order; whenMatchedDelete removes matched rows
target_table.alias("target").merge(
    source_df.alias("source"),
    "target.id = source.id"
).whenMatchedUpdate(
    condition="source.operation IN ('o', 'u')",  # Insert or update of an existing row
    set={
        "name": "source.name",
        "email": "source.email",
        "status": "source.status",
        "last_updated": "source.timestamp"
    }
).whenMatchedDelete(
    condition="source.operation = 'd'"  # Delete operation
).whenNotMatchedInsert(
    condition="source.operation IN ('o', 'u')",  # Only insert new records; skip deletes
    values={
        "id": "source.id",
        "name": "source.name",
        "email": "source.email",
        "status": "source.status",
        "last_updated": "source.timestamp",
        "created_at": "source.timestamp"
    }
).execute()

print("CDC merge operation completed successfully")
Merge with Partition Pruning
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp
from delta.tables import DeltaTable

spark = SparkSession.builder \
    .appName("PartitionPruningMerge") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Read target Delta table (partitioned by event_date)
target_table = DeltaTable.forPath(spark, "/path/to/partitioned/table")

# Read source data for a specific date
merge_date = "2024-01-15"
source_df = spark.read \
    .format("json") \
    .load("/path/to/source/data") \
    .filter(col("event_date") == merge_date)  # Filter to specific partition

# Perform merge with partition pruning
# Include the partition key in the merge condition. The explicit literal
# predicate on the target lets Delta restrict the scan to that partition.
target_table.alias("target").merge(
    source_df.alias("source"),
    "target.id = source.id AND target.event_date = source.event_date "
    f"AND target.event_date = '{merge_date}'"
).whenMatchedUpdate(
    set={
        "value": "source.value",
        "updated_at": current_timestamp()
    }
).whenNotMatchedInsert(
    values={
        "id": "source.id",
        "event_date": "source.event_date",
        "value": "source.value",
        "created_at": current_timestamp(),
        "updated_at": current_timestamp()
    }
).execute()

print("Partition-pruned merge completed successfully")
Merge with Aggregation
from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # avoids shadowing built-in sum/max
from delta.tables import DeltaTable

spark = SparkSession.builder \
    .appName("MergeWithAggregation") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Read target Delta table
target_table = DeltaTable.forPath(spark, "/path/to/target/table")

# Read source data and aggregate to one row per (user_id, event_date)
source_df = spark.read \
    .format("json") \
    .load("/path/to/source/data") \
    .groupBy("user_id", "event_date") \
    .agg(
        F.sum("amount").alias("total_amount"),
        F.count("*").alias("event_count"),
        F.max("timestamp").alias("last_event_time")
    )

# Perform merge with aggregated data
target_table.alias("target").merge(
    source_df.alias("source"),
    "target.user_id = source.user_id AND target.event_date = source.event_date"
).whenMatchedUpdate(
    set={
        "total_amount": "target.total_amount + source.total_amount",
        "event_count": "target.event_count + source.event_count",
        "last_event_time": "GREATEST(target.last_event_time, source.last_event_time)",
        "updated_at": F.current_timestamp()
    }
).whenNotMatchedInsert(
    values={
        "user_id": "source.user_id",
        "event_date": "source.event_date",
        "total_amount": "source.total_amount",
        "event_count": "source.event_count",
        "last_event_time": "source.last_event_time",
        "created_at": F.current_timestamp(),
        "updated_at": F.current_timestamp()
    }
).execute()

print("Merge with aggregation completed successfully")
Performance Metrics
| Metric | Optimized | Typical | Poor | Optimization |
|---|---|---|---|---|
| Merge Duration | < 30s | 30s-5min | > 5min | Partition pruning, file optimization |
| Records Processed/sec | > 100K | 10K-100K | < 10K | Parallelism, join optimization |
| File Compaction | < 100 files | 100-1000 files | > 1000 files | Regular compaction, optimize command |
| Conflict Rate | < 1% | 1-5% | > 5% | Concurrency control, partitioning |
| Storage Efficiency | > 80% | 60-80% | < 60% | Compaction, z-ordering |
Best Practices
- Partition strategically: Partition by low-cardinality columns (such as a date) that appear in merge conditions; use Z-ordering rather than partitioning for high-cardinality keys
- Use partition pruning: Include partition keys in merge conditions whenever the source data allows it
- Optimize file sizes: Aim for 128MB-1GB files for optimal performance
- Regular compaction: Run the OPTIMIZE command to merge small files
- Z-ordering: Apply ZORDER BY to merge keys and frequently filtered columns
- Monitor conflicts: Track and minimize merge conflicts
- Handle schema evolution: Enable automatic schema evolution for merge (for example, the spark.databricks.delta.schema.autoMerge.enabled setting); the mergeSchema option applies to DataFrame writes, not to merge
- Test with realistic data: Validate performance with production-like volumes
- Use checkpointing: For streaming merges, configure a checkpoint location and an appropriate trigger interval
- Back up before major operations: Back up data, or record the current table version for time travel, before large merge operations
Related Topics
- 11-structured-streaming.mdx: Streaming architecture for real-time merges
- 12-state-management.mdx: Stateful operations for merge patterns
- 13-window-operations.mdx: Windowed aggregations before merge
- 16-schema-evolution.mdx: Schema changes during merge operations
See Also
- 06-joins-optimization.mdx: Join strategies for merge operations
- 15-data-quality.mdx: Data quality during merge operations
- Kafka Streams (kafka/03): Stream-table duality for upsert semantics
- Data Engineering Streaming (data-engineering/022): CDC merge patterns in streaming pipelines