Delta Lake Integration
Difficulty: Senior Level | Companies: Databricks, Netflix, Uber, Apple, Airbnb
Delta Lake Fundamentals
Delta Lake is an open table format that adds ACID transactions, schema enforcement, and time travel to data lakes queried with Spark. A Delta table is a directory of Parquet data files plus a transaction log (_delta_log) that records every change.
Creating Delta Tables
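from pyspark.sql import SparkSession, functions as F
from delta import configure_spark_with_delta_pip
# Requires the delta-spark package (pip install delta-spark)
builder = SparkSession.builder \
.appName("DeltaLake") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
# Adds the Delta Lake JARs that match the installed delta-spark version
spark = configure_spark_with_delta_pip(builder).getOrCreate()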
# Write DataFrame as Delta table
df = spark.read.parquet("hdfs://data/sales")
df.write \
.format("delta") \
.mode("overwrite") \
.partitionBy("year", "month") \
.save("hdfs://delta/sales")
# Or save as managed table
df.write \
.format("delta") \
.saveAsTable("sales_table")
# Read Delta table
delta_df = spark.read.format("delta").load("hdfs://delta/sales")
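To see what `partitionBy("year", "month")` does to the physical layout, here is a pure-Python model (not Spark code) that groups rows into the Hive-style directories, such as `year=2024/month=6`, that the write creates under the table path. Queries that filter on `year` or `month` can then skip whole partitions. The sample rows are made up.

```python
from collections import defaultdict


def partition_dir(row, partition_cols):
    """Build a Hive-style directory such as 'year=2024/month=6' for one row."""
    return "/".join(f"{col}={row[col]}" for col in partition_cols)


def group_by_partition(rows, partition_cols):
    """Group rows by the partition directory partitionBy(...) would write them to."""
    groups = defaultdict(list)
    for row in rows:
        groups[partition_dir(row, partition_cols)].append(row)
    return dict(groups)


# Hypothetical sample rows
rows = [
    {"sale_id": 1, "year": 2024, "month": 6, "amount": 10.0},
    {"sale_id": 2, "year": 2024, "month": 6, "amount": 25.0},
    {"sale_id": 3, "year": 2024, "month": 7, "amount": 5.0},
]
layout = group_by_partition(rows, ["year", "month"])
for path, part_rows in sorted(layout.items()):
    print(path, [r["sale_id"] for r in part_rows])
# year=2024/month=6 [1, 2]
# year=2024/month=7 [3]
```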
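Interview Insight: Delta Lake stores data as Parquet files plus a transaction log in the table's _delta_log directory. Each commit is an ordered JSON file of actions (files added, files removed, schema changes), and periodic Parquet checkpoints summarize the log. Replaying the log yields the table state at any version, which is what enables ACID transactions and time travel.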
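The log replay can be made concrete with a toy model. This is a simplified sketch, not Delta's implementation: each commit is a newline-delimited JSON file named by its zero-padded version under `_delta_log/`, and a reader rebuilds the set of live data files by applying `add` and `remove` actions in version order. Real actions carry more fields (size, partition values, statistics), and real readers start from the latest checkpoint instead of replaying every commit. The data file names are hypothetical.

```python
import json
import os
import tempfile


def commit_path(log_dir, version):
    # Commit files are named by zero-padded version, e.g. 00000000000000000001.json
    return os.path.join(log_dir, f"{version:020d}.json")


def write_commit(log_dir, version, actions):
    # One JSON action per line
    with open(commit_path(log_dir, version), "w") as f:
        for action in actions:
            f.write(json.dumps(action) + "\n")


def live_files(log_dir, as_of_version=None):
    """Replay commits in order: 'add' puts a file in the table, 'remove' takes it out."""
    files = set()
    versions = sorted(int(name[:-5]) for name in os.listdir(log_dir) if name.endswith(".json"))
    for version in versions:
        if as_of_version is not None and version > as_of_version:
            break
        with open(commit_path(log_dir, version)) as f:
            for line in f:
                action = json.loads(line)
                if "add" in action:
                    files.add(action["add"]["path"])
                elif "remove" in action:
                    files.discard(action["remove"]["path"])
    return files


with tempfile.TemporaryDirectory() as table_dir:
    log_dir = os.path.join(table_dir, "_delta_log")
    os.makedirs(log_dir)
    write_commit(log_dir, 0, [{"add": {"path": "part-000.parquet"}},
                              {"add": {"path": "part-001.parquet"}}])
    # Version 1 rewrites part-001 as part-002 (e.g. an update)
    write_commit(log_dir, 1, [{"remove": {"path": "part-001.parquet"}},
                              {"add": {"path": "part-002.parquet"}}])
    current = live_files(log_dir)
    v0 = live_files(log_dir, as_of_version=0)

print(sorted(current))  # ['part-000.parquet', 'part-002.parquet']
print(sorted(v0))       # ['part-000.parquet', 'part-001.parquet']
```

Reading version 0 still works because the removed file stays on storage until it is vacuumed; only the log says it is no longer part of the current table.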
ACID Transactions
# Delta provides ACID transactions for data lake operations
from delta.tables import DeltaTable
# Load existing Delta table
delta_table = DeltaTable.forPath(spark, "hdfs://delta/sales")
# Upsert operation (MERGE)
new_data = spark.read.parquet("hdfs://data/new-sales")
delta_table.alias("target") \
.merge(
new_data.alias("source"),
"target.sale_id = source.sale_id"
) \
.whenMatchedUpdate(set={
"amount": "source.amount",
"updated_at": "source.timestamp"
}) \
.whenNotMatchedInsert(values={
"sale_id": "source.sale_id",
"amount": "source.amount",
"created_at": "source.timestamp"
}) \
.execute()
# Atomic commits ensure consistency: the new version becomes visible only when its
# log entry is written, so if the job fails midway, readers never see partial results
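How does a commit stay atomic on plain storage? Here is a minimal sketch, assuming a local filesystem. A writer publishes version N by creating `_delta_log/N.json` only if that file does not already exist. Python's exclusive-create mode `"x"` stands in for the atomic rename or conditional put that real log stores rely on. Whoever creates the file first wins; the loser re-reads the latest version and retries. Real Delta also checks whether the winning commit logically conflicts with the loser's changes before retrying, and this sketch skips that step. `ConcurrentCommitError` and the helper functions are hypothetical names.

```python
import json
import os
import tempfile


class ConcurrentCommitError(Exception):
    """Raised when another writer already created the commit file for a version."""


def try_commit(log_dir, version, actions):
    path = os.path.join(log_dir, f"{version:020d}.json")
    try:
        # Mode "x" fails if the file exists: a local stand-in for put-if-absent.
        # Unlike a real log store, a crash mid-write could leave a partial file here.
        with open(path, "x") as f:
            for action in actions:
                f.write(json.dumps(action) + "\n")
    except FileExistsError:
        raise ConcurrentCommitError(f"version {version} already committed") from None


def latest_version(log_dir):
    versions = [int(name[:-5]) for name in os.listdir(log_dir) if name.endswith(".json")]
    return max(versions, default=-1)


def commit_with_retry(log_dir, actions, max_attempts=3):
    """Optimistic concurrency: target latest + 1, retry with a fresh version on conflict."""
    for _ in range(max_attempts):
        version = latest_version(log_dir) + 1
        try:
            try_commit(log_dir, version, actions)
            return version
        except ConcurrentCommitError:
            continue  # a real writer would first check for a logical conflict
    raise ConcurrentCommitError(f"gave up after {max_attempts} attempts")


with tempfile.TemporaryDirectory() as log_dir:
    first = commit_with_retry(log_dir, [{"add": {"path": "a.parquet"}}])
    # Two writers both saw version 0 as the latest and race for version 1
    try_commit(log_dir, 1, [{"add": {"path": "b.parquet"}}])
    try:
        try_commit(log_dir, 1, [{"add": {"path": "c.parquet"}}])
        loser_failed = False
    except ConcurrentCommitError:
        loser_failed = True
    # The losing writer retries and lands on the next free version
    retried = commit_with_retry(log_dir, [{"add": {"path": "c.parquet"}}])

print(first, loser_failed, retried)  # 0 True 2
```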
Time Travel
# Query historical versions of data
# By version number
df_version_5 = spark.read.format("delta") \
.option("versionAsOf", 5) \
.load("hdfs://delta/sales")
# By timestamp
df_yesterday = spark.read.format("delta") \
.option("timestampAsOf", "2024-06-28") \
.load("hdfs://delta/sales")
# View table history
delta_table = DeltaTable.forPath(spark, "hdfs://delta/sales")
history = delta_table.history()
history.show(truncate=False)
# Roll back: restoreToVersion adds a new commit whose data matches version 5
delta_table.restoreToVersion(5)
# Compare versions
current = spark.read.format("delta").load("hdfs://delta/sales")
previous = spark.read.format("delta").option("versionAsOf", 4).load("hdfs://delta/sales")
diff = current.join(previous, "sale_id", "left_anti")
print(f"New rows: {diff.count()}")
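Warning: Time travel only works while the old log entries and data files still exist. Log entries are kept for delta.logRetentionDuration (default 30 days), and files removed from the table stay on storage until VACUUM deletes those older than delta.deletedFileRetentionDuration (default 7 days). Tune both properties to balance storage cost against how far back you need to query.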
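`timestampAsOf` is resolved to a version number before any data is read. Here is a simplified sketch of that lookup, assuming commit timestamps increase with the version. It picks the latest version whose commit time is at or before the requested timestamp, and it fails if the timestamp predates the oldest version still in the log. A date-only string such as `"2024-06-28"` is treated as midnight of that day in the session time zone, so in this made-up history it resolves to the last commit of 2024-06-27. The sketch does not model timestamps later than the newest commit.

```python
import bisect
from datetime import datetime


def version_as_of(history, ts):
    """history: (version, commit_time) pairs sorted by version, with non-decreasing times.
    Return the latest version committed at or before ts."""
    times = [commit_time for _, commit_time in history]
    idx = bisect.bisect_right(times, ts) - 1
    if idx < 0:
        raise ValueError(f"{ts} is earlier than the oldest retained version")
    return history[idx][0]


# Hypothetical history in which versions 0-2 were already removed by log cleanup
history = [
    (3, datetime(2024, 6, 27, 9, 0)),
    (4, datetime(2024, 6, 27, 18, 30)),
    (5, datetime(2024, 6, 28, 8, 15)),
]
print(version_as_of(history, datetime(2024, 6, 28)))         # 4 ("2024-06-28" = midnight)
print(version_as_of(history, datetime(2024, 6, 28, 8, 15)))  # 5 (exact commit time)
print(version_as_of(history, datetime(2024, 6, 27, 12)))     # 3
```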
Schema Evolution
# Schema evolution: mergeSchema adds columns without rewriting existing data files
df = spark.read.parquet("hdfs://data/sales")
# Add new column
df_with_new_col = df.withColumn("tax", F.col("amount") * 0.08)
# Write with mergeSchema option
df_with_new_col.write \
.format("delta") \
.mode("append") \
.option("mergeSchema", "true") \
.save("hdfs://delta/sales")
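# Overwrite with a new schema: replaces both the data and the schema
# (assumes the source data has a region column)
df_new_schema = df_with_new_col.select("sale_id", "amount", "tax", "region", "year", "month")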
df_new_schema.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.save("hdfs://delta/sales")
# Check schema evolution
delta_table = DeltaTable.forPath(spark, "hdfs://delta/sales")
print(delta_table.toDF().schema)
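With `mergeSchema`, existing Parquet files are not rewritten. The new schema is recorded in the transaction log, and rows from older files return null for columns they lack. The merge rule can be sketched in plain Python. This is a simplified model of flat schemas with made-up column types, not Delta's implementation.

```python
def merge_schemas(table_schema, incoming_schema):
    """Toy mergeSchema: existing columns keep their position and type, columns that
    only exist in the incoming data are appended, and any type change is rejected
    (stricter than Delta, which permits a few safe changes such as small upcasts)."""
    merged = dict(table_schema)
    for name, dtype in incoming_schema.items():
        if name not in merged:
            merged[name] = dtype
        elif merged[name] != dtype:
            raise TypeError(f"column {name!r}: cannot change {merged[name]} to {dtype}")
    return merged


# Hypothetical flat schemas as {column: type}
table_schema = {"sale_id": "bigint", "amount": "double", "year": "int", "month": "int"}
incoming = {"sale_id": "bigint", "amount": "double", "tax": "double", "year": "int", "month": "int"}

merged = merge_schemas(table_schema, incoming)
print(list(merged))  # ['sale_id', 'amount', 'year', 'month', 'tax']

# A row from a file written before the change has no tax value; readers see null (None)
old_row = {"sale_id": 1, "amount": 10.0, "year": 2024, "month": 6}
read_back = {col: old_row.get(col) for col in merged}
print(read_back["tax"])  # None
```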
Z-Ordering for Query Optimization
# Z-ordering co-locates related data for faster queries
delta_table = DeltaTable.forPath(spark, "hdfs://delta/sales")
# Z-order by frequently queried columns
delta_table.optimize().executeZOrderBy("customer_id", "product_id")
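# Liquid clustering (Delta Lake 3.1+ / Databricks Runtime 13.3+)
# More flexible than Z-ordering: clustering keys can be changed without rewriting existing data,
# but it cannot be combined with partitioning or ZORDER, so it is shown on a new table
# (sales_clustered and its columns are illustrative)
spark.sql("""
CREATE TABLE sales_clustered (sale_id BIGINT, customer_id BIGINT, product_id BIGINT, amount DOUBLE)
USING DELTA
CLUSTER BY (customer_id, product_id)
""")
# OPTIMIZE clusters the data incrementally
spark.sql("OPTIMIZE sales_clustered")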
# SQL equivalent of executeZOrderBy on the existing table
spark.sql("""
OPTIMIZE delta.`hdfs://delta/sales`
ZORDER BY (customer_id, product_id)
""")
# Check file statistics: DESCRIBE DETAIL reports numFiles and sizeInBytes
spark.sql("DESCRIBE DETAIL delta.`hdfs://delta/sales`") \
.select("numFiles", "sizeInBytes") \
.show()
Pro Tip: Z-order on columns used in WHERE clauses and JOIN conditions. This enables data skipping and reduces I/O by reading only relevant files.
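Z-ordering sorts rows by a key that interleaves the bits of several columns. Rows that are close in every chosen column therefore tend to land in the same file, which makes per-file min/max statistics selective for each of those columns. The toy sketch below uses small non-negative integers as stand-ins for `customer_id` and `product_id`. Delta's implementation first maps each column's values into range buckets, which this skips. It compares data skipping for a plain sort and a Z-order sort:

```python
def z_value(x, y, bits=16):
    """Interleave bits: bit i of x goes to position 2*i, bit i of y to position 2*i + 1."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z


def chunk(points, size):
    """Split an ordered list of points into 'files' of a fixed size."""
    return [points[i:i + size] for i in range(0, len(points), size)]


def files_to_read(files, x_range, y_range):
    """Count files whose min/max stats overlap the filter; the rest are skipped."""
    count = 0
    for f in files:
        xs = [p[0] for p in f]
        ys = [p[1] for p in f]
        if (max(xs) >= x_range[0] and min(xs) <= x_range[1]
                and max(ys) >= y_range[0] and min(ys) <= y_range[1]):
            count += 1
    return count


# Hypothetical 4 x 4 grid of (customer_id, product_id) values, 4 rows per file
points = [(x, y) for x in range(4) for y in range(4)]
linear_files = chunk(sorted(points), 4)                        # sorted by x, then y
z_files = chunk(sorted(points, key=lambda p: z_value(*p)), 4)  # Z-ordered

print(files_to_read(linear_files, (0, 3), (0, 0)))  # filter y = 0 -> 4
print(files_to_read(z_files, (0, 3), (0, 0)))       # filter y = 0 -> 2
print(files_to_read(linear_files, (0, 0), (0, 3)))  # filter x = 0 -> 1
print(files_to_read(z_files, (0, 0), (0, 3)))       # filter x = 0 -> 2
```

The plain sort is ideal for filters on its leading column and does nothing for the second. Z-order gives up a little on the first column so that both become selective. That trade-off grows with every column added, which is one reason to Z-order on only a few columns.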
Data Compaction and Cleanup
# Compact small files for better performance
delta_table = DeltaTable.forPath(spark, "hdfs://delta/sales")
# Optimize: Compacts small files
delta_table.optimize().executeCompaction()
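# Vacuum: delete files no longer referenced by the table and older than the retention period
# Default retention: 7 days
delta_table.vacuum(168) # 168 hours = 7 days
# A retention below 7 days is rejected unless the safety check is disabled.
# Risky: breaks time travel beyond the retention window and can delete files
# that in-flight readers or streaming queries still need
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
delta_table.vacuum(24) # 24 hours (aggressive)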
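# Check file stats (DESCRIBE DETAIL returns one row with numFiles and sizeInBytes)
spark.sql("DESCRIBE DETAIL delta.`hdfs://delta/sales`") \
.select(
F.col("numFiles").alias("file_count"),
(F.col("sizeInBytes") / (1024 * 1024)).alias("total_size_mb")
) \
.show()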
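OPTIMIZE and VACUUM are separate steps. OPTIMIZE rewrites many small files into fewer large ones and records the old files as removed. VACUUM later deletes removed files that are older than the retention period. The following is a simplified pure-Python sketch of both decisions. The 32 MB target and the file names are made up; Delta's target file size is configurable and much larger. The real VACUUM also cleans up files that were never committed to the log, which this sketch ignores.

```python
from datetime import datetime, timedelta


def plan_compaction(file_sizes_mb, target_mb):
    """Greedy bin-packing: group files smaller than target_mb into bins of at most
    target_mb. Bins holding a single file are dropped (rewriting them gains nothing)."""
    small = sorted(size for size in file_sizes_mb if size < target_mb)
    bins, current, current_size = [], [], 0
    for size in small:
        if current and current_size + size > target_mb:
            bins.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        bins.append(current)
    return [b for b in bins if len(b) > 1]


def vacuum_candidates(removed_at, live_files, now, retention=timedelta(hours=168)):
    """removed_at maps a data file to the time a commit removed it from the table.
    A file may be deleted once it is not live and was removed before now - retention."""
    cutoff = now - retention
    return sorted(path for path, ts in removed_at.items()
                  if path not in live_files and ts < cutoff)


# Hypothetical table: eight 8 MB files plus one 200 MB file, 32 MB target
print(plan_compaction([8] * 8 + [200], target_mb=32))  # [[8, 8, 8, 8], [8, 8, 8, 8]]

now = datetime(2024, 7, 1)
removed_at = {"old.parquet": datetime(2024, 6, 20), "recent.parquet": datetime(2024, 6, 29)}
live = {"part-000.parquet"}
print(vacuum_candidates(removed_at, live, now))                       # ['old.parquet']
print(vacuum_candidates(removed_at, live, now, timedelta(hours=24)))  # both files
```

With the 24-hour retention, `recent.parquet` is deleted even though a time-travel query to a version from before June 29 would still need it.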
Delta Lake with Streaming
# Delta supports both batch and streaming reads/writes
# Write streaming data to Delta (the Kafka source needs the spark-sql-kafka-0-10 package)
events_stream = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "events") \
.load()
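from pyspark.sql.types import StructType, StructField, StringType, TimestampType
# Assumed event payload schema; adjust it to match your Kafka messages
schema = StructType([
StructField("event_id", StringType()),
StructField("event_type", StringType()),
StructField("timestamp", TimestampType()),
])
parsed = events_stream \
.select(F.from_json(F.col("value").cast("string"), schema).alias("data")) \
.select("data.*")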
# Write to Delta table
query = parsed \
.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "hdfs://checkpoints/events-delta") \
.start("hdfs://delta/events")
# Read Delta as stream
streaming_df = spark.readStream \
.format("delta") \
.load("hdfs://delta/events")
# Process streaming data; the watermark bounds the state kept for late events
processed = streaming_df \
.withWatermark("timestamp", "10 minutes") \
.groupBy(F.window("timestamp", "5 minutes")) \
.agg(F.count("*").alias("count"))
query = processed \
.writeStream \
.format("console") \
.outputMode("update") \
.start()
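When a Delta table is read as a stream, the transaction log itself acts as the source. Each micro-batch picks up the data files added by commits newer than the last processed version, and the checkpoint stores that position so a restart resumes where it left off. The toy model below uses hypothetical names and models only appends. The real source by default fails on commits that update or delete existing data unless an option such as `skipChangeCommits` is set.

```python
class ToyDeltaStreamSource:
    """Toy model of a streaming read from a Delta table. The log is a list of
    commits (index = version, value = data files the commit added); the source
    remembers the next version to process, like the offset in a checkpoint."""

    def __init__(self, log):
        self.log = log
        self.next_version = 0

    def next_batch(self):
        """Return files added by every commit not yet processed, then advance."""
        files = []
        for version in range(self.next_version, len(self.log)):
            files.extend(self.log[version])
        self.next_version = len(self.log)
        return files


log = [["f0.parquet"], ["f1.parquet", "f2.parquet"]]
source = ToyDeltaStreamSource(log)
b1 = source.next_batch()    # first batch picks up everything committed so far
b2 = source.next_batch()    # nothing new committed
log.append(["f3.parquet"])  # a writer commits version 2
b3 = source.next_batch()
print(b1, b2, b3)  # ['f0.parquet', 'f1.parquet', 'f2.parquet'] [] ['f3.parquet']
```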
Performance Tuning
# Optimize Delta performance
# Table-property defaults for newly created tables use the form
# spark.databricks.delta.properties.defaults.<property name without the "delta." prefix>
spark = SparkSession.builder \
.appName("DeltaPerformance") \
.config("spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite", "true") \
.config("spark.databricks.delta.properties.defaults.autoOptimize.autoCompact", "true") \
.getOrCreate()
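# Auto-optimize on an existing table (Databricks and recent open-source Delta releases):
# optimized writes produce fewer, larger files; auto compaction compacts small files after writes
spark.sql("""
ALTER TABLE delta.`hdfs://delta/events` SET TBLPROPERTIES (
'delta.autoOptimize.optimizeWrite' = 'true',
'delta.autoOptimize.autoCompact' = 'true'
)
""")
# Later appends pick up the properties (new_events_df is a hypothetical batch of events)
new_events_df.write \
.format("delta") \
.mode("append") \
.save("hdfs://delta/events")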
# Manual optimization for specific partitions
# WHERE may only filter on partition columns (assumes the table is partitioned by date)
spark.sql("""
OPTIMIZE delta.`hdfs://delta/events`
WHERE date >= '2024-06-01'
""")
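# Check optimization statistics recorded in the table history
spark.sql("DESCRIBE HISTORY delta.`hdfs://delta/events`") \
.filter("operation = 'OPTIMIZE'") \
.select("version", "timestamp", "operationMetrics") \
.show(truncate=False)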
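Optimized writes cut the file count because, without them, every write task produces its own file in each partition it touches. The rough count below is plain Python and assumes a made-up append of 200 tasks whose rows all span the same 30 date partitions. The real feature bins data toward a target file size rather than always writing exactly one file per partition.

```python
def files_without_optimized_write(task_partitions):
    """Each write task creates one file per table partition it has rows for."""
    return sum(len(parts) for parts in task_partitions)


def files_with_optimized_write(task_partitions):
    """Simplified: after shuffling by partition key, each table partition is
    written by a single task, so it gets one file (assumes it fits in one file)."""
    return len(set().union(*task_partitions))


# Hypothetical append: 200 tasks, each holding rows for the same 30 dates
tasks = [set(range(30)) for _ in range(200)]
print(files_without_optimized_write(tasks))  # 6000
print(files_with_optimized_write(tasks))     # 30
```

The extra shuffle costs some write time, which is the price of fewer small files. Auto compaction then handles any small files that remain.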
Key Takeaway: Delta Lake brings database capabilities to data lakes. Use ACID transactions for data consistency, time travel for auditing, Z-ordering for query performance, and auto-optimize for small file management.
Follow-Up Questions
- How does Delta Lake achieve ACID transactions on object storage?
- Explain the difference between Z-ordering and liquid clustering.
- How does time travel impact storage costs and query performance?
- Describe strategies for migrating from Parquet to Delta Lake.
- How does Delta Lake handle concurrent writes?