Open Table Formats: ACID for Data Lakes
Open table formats like Delta Lake and Apache Iceberg bring data warehouse capabilities to data lakes.
Why Open Table Formats Matter
Problems with Traditional Data Lakes:
- Partial writes corrupting data
- No transaction isolation
- Difficulty updating/deleting rows
- No time travel
How Open Table Formats Solve These Problems:
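- ACID transactions: atomic commits, so readers never see partial writes, plus row-level UPDATE, DELETE, and MERGE
- Time travel: query the table as of an earlier version or timestamp (within the retention window)
- Schema evolution: add or modify columns without rewriting data or breaking existing readers
- Efficient data management: file compaction, statistics-based data skipping, and (in Iceberg) partition evolution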
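Key Insight: Open table formats add a transactional metadata layer on top of ordinary data files (typically Parquet). The metadata records exactly which files make up each table version. That record is what enables ACID transactions, time travel, schema evolution, and efficient data management, while the data itself stays in open file formats.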
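The core commit mechanism is easy to model. The Python sketch below is a toy, not how either library is implemented. It assumes a local directory stands in for the table's log and uses exclusive file creation (`os.O_EXCL`) as the atomic "put-if-absent" step. Data files would be written first, and a commit becomes visible only when its numbered log entry is created. A writer that loses the race for a version number re-reads the log and retries. The helpers `latest_version`, `try_commit`, and `commit_with_retry` are hypothetical names.

```python
import json
import os
import tempfile


def latest_version(log_dir: str) -> int:
    """Highest committed version in the log directory, or -1 if there is none."""
    versions = [int(name.split(".")[0]) for name in os.listdir(log_dir)
                if name.endswith(".json")]
    return max(versions, default=-1)


def try_commit(log_dir: str, version: int, actions: list) -> bool:
    """Claim `version` by exclusively creating its log file; False if already taken."""
    path = os.path.join(log_dir, f"{version:020d}.json")
    try:
        # O_EXCL makes creation fail if the file exists: a local "put-if-absent".
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False
    # Toy simplification: a reader could briefly see this file empty; real
    # implementations make the contents appear atomically as well.
    with os.fdopen(fd, "w") as f:
        json.dump(actions, f)
    return True


def commit_with_retry(log_dir: str, actions: list, max_attempts: int = 5) -> int:
    """Optimistic concurrency: target the next version, re-read and retry on a race."""
    for _ in range(max_attempts):
        version = latest_version(log_dir) + 1
        # A real engine would also check whether the winning commit touched
        # the same data and abort instead of retrying if it did.
        if try_commit(log_dir, version, actions):
            return version
    raise RuntimeError("gave up after repeated commit conflicts")


log_dir = tempfile.mkdtemp()
v0 = commit_with_retry(log_dir, [{"add": "part-000.parquet"}])
v1 = commit_with_retry(log_dir, [{"add": "part-001.parquet"}])
print(v0, v1)                                   # 0 1
print(try_commit(log_dir, 1, [{"add": "x"}]))   # False: version 1 is already taken
```

A failed writer never creates a log entry, so any data files it wrote stay invisible to readers. This is how partial writes stop corrupting the table.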
Architecture Overview
[Figure: Lakehouse table format architecture; Delta vs. Iceberg comparison]
Delta Lake
Delta Lake is an open-source storage layer that brings ACID transactions, schema enforcement, and time travel to Apache Spark and other data engines. It stores data as Parquet files with a transaction log (_delta_log/) that tracks all changes.
Delta Lake Transaction Log
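- Transaction Log: one JSON commit file per table version, e.g. _delta_log/00000000000000000000.json for version 0, each recording actions such as file adds and removes
- Checkpoint: by default every 10 commits, a Parquet checkpoint summarizes the full table state so readers need not replay the whole log
- Snapshot: the state at version V = the latest checkpoint at or before V, plus the JSON commits after it, up to and including V
- Read Version: a query reads the latest committed version at query time, unless an earlier version is requested
- Write Conflict: optimistic concurrency; when two writers race for the same version, one commit wins and the other checks it for conflicts, then retries at the next version or fails if the changes truly conflict
- Storage Overhead: roughly 1-5% of data volume for metadata (varies with commit frequency and checkpointing)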
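A toy replay makes the Snapshot and Read Version rules concrete. In this plain-Python sketch, each commit is a list of `add`/`remove` actions on file paths with an integer commit timestamp. A checkpoint is simply the set of live files at a version. These are simplified stand-ins for Delta's JSON actions and Parquet checkpoints, and `snapshot_at` and `version_for_timestamp` are hypothetical helpers. Timestamp-based reads are modeled as resolving to the latest version committed at or before the requested time.

```python
from bisect import bisect_right

# Simplified log: version -> (commit timestamp, actions).
commits = {
    0: (100, [("add", "a.parquet"), ("add", "b.parquet")]),
    1: (200, [("add", "c.parquet")]),
    2: (300, [("remove", "a.parquet"), ("remove", "b.parquet"),
              ("add", "ab-compacted.parquet")]),  # e.g. an OPTIMIZE commit
    3: (400, [("add", "d.parquet")]),
}
# Toy checkpoint: the full set of live files as of version 1.
checkpoints = {1: {"a.parquet", "b.parquet", "c.parquet"}}


def snapshot_at(version: int) -> set:
    """Live files at `version`: latest checkpoint <= version, then replay later commits."""
    base = max((v for v in checkpoints if v <= version), default=-1)
    files = set(checkpoints.get(base, set()))
    for v in range(base + 1, version + 1):
        for action, path in commits[v][1]:
            if action == "add":
                files.add(path)
            else:
                files.discard(path)
    return files


def version_for_timestamp(ts: int) -> int:
    """Latest version committed at or before `ts` (timestamp-based time travel)."""
    versions = sorted(commits)
    times = [commits[v][0] for v in versions]
    i = bisect_right(times, ts)
    if i == 0:
        raise ValueError("timestamp is before the first commit")
    return versions[i - 1]


print(sorted(snapshot_at(2)))      # ['ab-compacted.parquet', 'c.parquet']
print(version_for_timestamp(350))  # 2
```

Version 2 plays the role of an OPTIMIZE commit. It removes a.parquet and b.parquet from the snapshot without deleting them, so versions 0 and 1 stay readable until VACUUM physically removes those files.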
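# Delta Lake: Create and write table
# Assumes the delta-spark pip package is installed
from delta import configure_spark_with_delta_pip
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
builder = SparkSession.builder \
    .appName("DeltaLakeExamples") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
# Adds the Delta Lake JARs matching the installed delta-spark version
spark = configure_spark_with_delta_pip(builder).getOrCreate()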
# Create Delta table
data = spark.range(0, 1000).withColumn("value", col("id") * 10)
data.write.format("delta").save("/delta/events")
# ACID Transaction: MERGE (Upsert)
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "/delta/events")
# Upsert new data
new_data = spark.range(500, 1500).withColumn("value", col("id") * 20)
delta_table.alias("target").merge(
new_data.alias("source"),
"target.id = source.id"
).whenMatchedUpdate(
set={"value": "source.value"}
).whenNotMatchedInsert(
values={"id": "source.id", "value": "source.value"}
).execute()
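# Time Travel: read an earlier version (version 0 is the initial write; the MERGE created version 1)
# .option("timestampAsOf", "2025-01-15 12:00:00") selects a version by commit time instead
previous_df = spark.read.format("delta").option("versionAsOf", 0).load("/delta/events")
current_df = spark.read.format("delta").load("/delta/events")
# Compare versions: rows present now but not in version 0 (inserted or updated rows)
diff = current_df.exceptAll(previous_df)
print(f"Rows inserted or updated since version 0: {diff.count()}")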
# Schema Evolution: Add new column
spark.sql("""
ALTER TABLE delta.`/delta/events`
ADD COLUMNS (event_type STRING, user_id BIGINT)
""")
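# Schema Evolution: Rename column
# Renames require column mapping mode 'name', which upgrades the table protocol
spark.sql("""
ALTER TABLE delta.`/delta/events` SET TBLPROPERTIES (
'delta.columnMapping.mode' = 'name',
'delta.minReaderVersion' = '2',
'delta.minWriterVersion' = '5'
)
""")
spark.sql("""
ALTER TABLE delta.`/delta/events`
RENAME COLUMN value TO event_value
""")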
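# Optimize: Compact small files
delta_table.optimize().executeCompaction()
# Z-Order: co-locate data on frequently filtered columns to improve data skipping
delta_table.optimize().executeZOrderBy("user_id", "event_type")
# VACUUM: after compaction, delete unreferenced files older than the retention period
delta_table.vacuum(retentionHours=168)  # 7 days (the default); also limits how far back time travel can go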
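Z-ordering sorts rows along a Z-order (Morton) curve. Rows that are close in several columns then land in the same files, which tightens each file's min/max ranges for data skipping. The sketch below shows only the core bit-interleaving idea for two small non-negative integer columns. It is a simplification: Delta's `executeZOrderBy` roughly first maps arbitrary column values to range-based IDs before interleaving. `z_value` is a hypothetical helper.

```python
def z_value(x: int, y: int, bits: int = 16) -> int:
    """Interleave bits: bit i of x goes to bit 2i, bit i of y to bit 2i+1."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z


# Toy rows: (user_id, event_type code), both already small non-negative integers.
rows = [(u, e) for u in range(4) for e in range(4)]

# Sort by Z-value, then cut into "files" of four rows each.
ordered = sorted(rows, key=lambda r: z_value(*r))
files = [ordered[i:i + 4] for i in range(0, len(ordered), 4)]

for f in files:
    users = [u for u, _ in f]
    events = [e for _, e in f]
    # Each file covers a small box in BOTH columns, so min/max stats prune well.
    print((min(users), max(users)), (min(events), max(events)))
# (0, 1) (0, 1)
# (2, 3) (0, 1)
# (0, 1) (2, 3)
# (2, 3) (2, 3)
```

Sorting by user_id alone would instead give each four-row file a single user but all four event codes. A filter on event_type could then not skip any file.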
Apache Iceberg
Apache Iceberg is an open table format designed for large analytic datasets. It uses a metadata layer with manifest files that track data files, enabling efficient operations like partition evolution, hidden partitioning, and concurrent writes.
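Hidden partitioning means the table stores a transform of a source column, such as `days(event_time)`, in its metadata. Queries filter on `event_time` itself, and the engine derives the partition filter. The plain-Python sketch below models the `days` and `identity` transforms, loosely following the Iceberg spec. It shows how files written under different partition specs (before and after a partition evolution) are each pruned with their own spec. The data structures and helper names are hypothetical.

```python
from datetime import date, datetime

EPOCH = date(1970, 1, 1)


def days(ts: datetime) -> int:
    """Day transform modeled on Iceberg's: whole days since 1970-01-01."""
    return (ts.date() - EPOCH).days


def identity(value):
    """Identity transform: the partition value is the source value itself."""
    return value


# Partition specs: field name -> (source column, transform).
specs = {
    0: {"event_day": ("event_time", days)},             # original spec
    1: {"event_day": ("event_time", days),
        "event_type": ("event_type", identity)},        # after ADD PARTITION FIELD event_type
}

# Each data file records the spec it was written with and its partition values.
jan14, jan15 = days(datetime(2025, 1, 14)), days(datetime(2025, 1, 15))
files = [
    {"path": "f0.parquet", "spec": 0, "partition": {"event_day": jan14}},
    {"path": "f1.parquet", "spec": 0, "partition": {"event_day": jan15}},
    {"path": "f2.parquet", "spec": 1, "partition": {"event_day": jan15, "event_type": "click"}},
    {"path": "f3.parquet", "spec": 1, "partition": {"event_day": jan15, "event_type": "view"}},
]


def prune(predicate: dict) -> list:
    """Keep files that may match an equality filter on *source* columns."""
    kept = []
    for f in files:
        spec = specs[f["spec"]]
        matches = all(
            f["partition"][name] == transform(predicate[source])
            for name, (source, transform) in spec.items()
            if source in predicate
        )
        if matches:
            kept.append(f["path"])
    return kept


# The query names event_time, never event_day: the partition filter is derived.
print(prune({"event_time": datetime(2025, 1, 15, 10, 30), "event_type": "click"}))
# ['f1.parquet', 'f2.parquet']
```

File f1 survives the event_type filter because it was written under the original spec, which has no event_type partition. Existing data keeps its old spec, so only files written under the new spec can be pruned by event_type.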
Iceberg Metadata Structure
- Metadata File: Table-level metadata (schema, partition spec, current snapshot ID)
- Snapshot: Point-in-time state of the table
- Manifest List: List of manifest files for a snapshot
- Manifest File: Lists data files with partition values and column-level stats
- Data Files: Parquet/ORC/Avro files containing actual data
- Partition Spec: Defines how data is partitioned (evolvable without rewrite)
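- Hidden Partitioning: partition values are derived from source columns by transforms such as days(event_time), so queries filter on event_time directly and still get partition pruning; no separate partition column is stored in the data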
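This metadata tree lets a reader prune at more than one level. The manifest list keeps partition summaries per manifest, and each manifest keeps per-file column bounds (`lower_bounds`/`upper_bounds` in the spec). The sketch below models only the file-level step for a range filter. The dataclasses are simplified stand-ins for the Avro manifest structures, and names such as `lower`, `upper`, and `plan_scan` are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class DataFile:
    path: str
    lower: dict  # column -> minimum value in the file (hypothetical field name)
    upper: dict  # column -> maximum value in the file


@dataclass
class Manifest:
    path: str
    files: list  # DataFile entries


@dataclass
class Snapshot:
    snapshot_id: int  # toy ID; real snapshot IDs are generated 64-bit numbers
    manifests: list   # stands in for the snapshot's manifest list


def may_contain(f: DataFile, column: str, lo, hi) -> bool:
    """False only when the file's [min, max] cannot overlap [lo, hi]."""
    if column not in f.lower or column not in f.upper:
        return True  # no stats for this column: the file must be read
    return not (f.upper[column] < lo or f.lower[column] > hi)


def plan_scan(snap: Snapshot, column: str, lo, hi) -> list:
    """Walk snapshot -> manifests -> data files, keeping files that may match."""
    return [f.path for m in snap.manifests for f in m.files
            if may_contain(f, column, lo, hi)]


snap = Snapshot(1, [
    Manifest("m0.avro", [DataFile("d0.parquet", {"user_id": 0}, {"user_id": 24}),
                         DataFile("d1.parquet", {"user_id": 25}, {"user_id": 49})]),
    Manifest("m1.avro", [DataFile("d2.parquet", {"user_id": 50}, {"user_id": 74}),
                         DataFile("d3.parquet", {}, {})]),  # written without stats
])

# WHERE user_id BETWEEN 30 AND 55
print(plan_scan(snap, "user_id", 30, 55))  # ['d1.parquet', 'd2.parquet', 'd3.parquet']
```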
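# Iceberg: Create catalog and table
# Assumes the iceberg-spark-runtime JAR matching your Spark version is on the classpath
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("IcebergExamples") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.iceberg.type", "hadoop") \
    .config("spark.sql.catalog.iceberg.warehouse", "s3://iceberg-warehouse/") \
    .getOrCreate()
# The extensions enable ALTER TABLE ... PARTITION FIELD and CALL procedures.
# A Hadoop catalog relies on atomic renames, which S3 lacks; for concurrent
# writers on S3, prefer a Glue, REST, or JDBC catalog.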
# Create Iceberg table with partitioning
spark.sql("""
CREATE TABLE iceberg.analytics.events (
event_id BIGINT,
event_type STRING,
user_id BIGINT,
event_time TIMESTAMP,
properties STRING
)
USING iceberg
PARTITIONED BY (days(event_time))
TBLPROPERTIES (
'format-version' = '2',
'write.parquet.compression-codec' = 'snappy',
'write.target-file-size-bytes' = '134217728',
'commit.manifest-merge.enabled' = 'true'
)
""")
# Insert data
spark.sql("""
INSERT INTO iceberg.analytics.events
SELECT
id AS event_id,
CASE WHEN id % 3 = 0 THEN 'click'
WHEN id % 3 = 1 THEN 'view'
ELSE 'purchase' END AS event_type,
(id % 100) AS user_id,
timestamp '2025-01-15 10:00:00' + make_interval(0, 0, 0, 0, 0, 0, id % 86400) AS event_time,
CAST(id AS STRING) AS properties
FROM range(10000)
""")
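# Time Travel
# TIMESTAMP AS OF uses snapshot commit time (wall clock), not event_time;
# it fails if no snapshot existed at that time
spark.sql("""
SELECT * FROM iceberg.analytics.events
TIMESTAMP AS OF '2025-01-15 12:00:00'
""")
# VERSION AS OF takes a snapshot ID (a large generated number) or a branch/tag name
first_snapshot_id = spark.sql("""
SELECT snapshot_id FROM iceberg.analytics.events.snapshots
ORDER BY committed_at
LIMIT 1
""").first()["snapshot_id"]
spark.sql(f"""
SELECT * FROM iceberg.analytics.events
VERSION AS OF {first_snapshot_id}
""")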
# Partition Evolution (no data rewrite)
spark.sql("""
ALTER TABLE iceberg.analytics.events
ADD PARTITION FIELD event_type
""")
# Existing data remains in old partition spec
# New writes use the new spec
# Schema Evolution
spark.sql("""
ALTER TABLE iceberg.analytics.events
ADD COLUMNS (session_id BIGINT, page_url STRING)
""")
# Row-level delete
spark.sql("""
DELETE FROM iceberg.analytics.events
WHERE event_time < '2025-01-01'
""")
# Expire old snapshots
spark.sql("""
CALL iceberg.system.expire_snapshots(
table => 'analytics.events',
older_than => TIMESTAMP '2025-01-01 00:00:00',
retain_last => 10
)
""")
Delta Lake vs. Iceberg Comparison
| Feature | Delta Lake | Apache Iceberg |
|---|---|---|
| Transaction Log | JSON commit files in _delta_log/ | JSON table metadata + Avro manifest lists and manifests |
| ACID Transactions | Yes (MERGE, UPDATE, DELETE) | Yes (MERGE, UPDATE, DELETE) |
| Time Travel | Version + timestamp (versionAsOf, timestampAsOf) | Snapshot ID + timestamp (plus branches and tags) |
| Schema Evolution | Add/reorder columns; rename/drop with column mapping | Add/drop/rename/reorder columns, safe type promotion |
| Partition Evolution | Not supported (changing partitioning requires a rewrite) | Supported (no rewrite) |
| Hidden Partitioning | Not native (generated columns approximate it) | Supported |
| Concurrent Writes | Optimistic concurrency with conflict detection | Optimistic concurrency with conflict detection |
| File Compaction | OPTIMIZE command | rewrite_data_files procedure |
| Open Standard | Yes (Linux Foundation) | Yes (Apache Software Foundation) |
| Primary Engine | Apache Spark (other engines via connectors) | Spark, Trino, Flink |
| Cloud Storage | S3, GCS, ADLS | S3, GCS, ADLS |
| Metadata Format | JSON log + Parquet checkpoints | JSON + Avro |
When to Choose Delta Lake vs. Iceberg
Choosing between Delta Lake and Iceberg depends on: (1) compute engine ecosystem, (2) need for partition evolution, (3) cloud provider preference, and (4) existing infrastructure investments.
| Decision Factor | Choose Delta Lake | Choose Iceberg |
|---|---|---|
| Primary Engine | Spark-centric workloads | Multi-engine (Spark, Trino, Flink) |
| Partition Evolution | Not needed | Frequently changes partition strategy |
| Cloud Provider | Databricks ecosystem | AWS/GCP multi-service |
| Schema Complexity | Simple, stable schemas | Complex, evolving schemas |
| Time Travel | Version- or timestamp-based queries suffice | Snapshot branches or tags needed |
| Concurrency | Single-writer dominant | Multi-writer patterns |
| Community | Databricks-led (Linux Foundation project) | Apache Software Foundation |
| Maturity | Production-proven | Rapidly maturing |
# Decision matrix implementation (heuristic weights)
class FormatSelector:
    """Select a table format by scoring requirements."""
    def evaluate(self, requirements: dict) -> str:
        """Score each format against requirements; ties favor Delta Lake."""
        scores = {"delta": 0, "iceberg": 0}  # fresh scores on every call
        if requirements.get("multi_engine"):
            scores["iceberg"] += 3
        if requirements.get("partition_evolution"):
            scores["iceberg"] += 2
        if requirements.get("spark_only"):
            scores["delta"] += 2
        if requirements.get("databricks"):
            scores["delta"] += 3
        if requirements.get("snapshot_branching"):
            scores["iceberg"] += 2
        if requirements.get("frequent_schema_changes"):
            scores["iceberg"] += 1
        return "iceberg" if scores["iceberg"] > scores["delta"] else "delta"
selector = FormatSelector()
recommendation = selector.evaluate({
    "multi_engine": True,
    "partition_evolution": True,
    "spark_only": False,
    "databricks": False,
    "snapshot_branching": True,
    "frequent_schema_changes": True
})
print(f"Recommended format: {recommendation}")  # iceberg
Key Concepts Summary
| Concept | Delta Lake | Iceberg | Description |
|---|---|---|---|
| ACID | MERGE INTO | MERGE INTO | Atomic transactions |
| Time Travel | versionAsOf / timestampAsOf | VERSION AS OF / TIMESTAMP AS OF | Historical queries |
| Schema Evolution | ALTER TABLE | ALTER TABLE | Schema changes |
| Partition Evolution | Not supported | ALTER TABLE ADD PARTITION FIELD | Partition changes |
| File Compaction | OPTIMIZE | rewrite_data_files | Small-file fix |
| Snapshot Management | VACUUM | expire_snapshots | Cleanup of old versions |
| Statistics | Per-file min/max stats in the log (data skipping) | Per-file column bounds in manifests | Query optimization |
| Concurrent Writes | Optimistic concurrency | Optimistic concurrency | Multi-writer safety |
| Open Format | Parquet + JSON log | Parquet + JSON/Avro metadata | Vendor neutrality |
| Caching | Disk cache in Databricks Runtime (not part of open-source Delta Lake) | Not native | Performance |
Feature and Performance Comparison (qualitative)
| Property | Raw Parquet | Delta Lake | Iceberg | Hive Tables |
|---|---|---|---|---|
| ACID Support | No | Yes | Yes | No |
| Time Travel | No | Yes | Yes | Limited |
| Write Latency | Fast | Fast | Fast | Medium |
| Read Latency | Fast | Fast | Fast | Fast |
| Metadata Overhead | None | Low | Low | Medium |
| Schema Evolution | Manual | Easy | Easy | Hard |
| Small-File Mitigation | Manual rewrites | OPTIMIZE | rewrite_data_files | Manual rewrites |
| Concurrent Safety | No | Yes | Yes | No |
| Storage Format | Open | Open | Open | Open |
| Ecosystem Support | Universal | Spark-centric | Multi-engine | Legacy |
10 Best Practices
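- Use OPTIMIZE regularly on Delta Lake tables to compact small files (open-source Delta Lake targets about 1 GB per file by default, configurable via spark.databricks.delta.optimize.maxFileSize)
- Run VACUUM on Delta Lake after compaction to delete files no longer referenced by the table (default retention: 7 days; versions whose files were vacuumed can no longer be time-traveled to)
- Use Iceberg for multi-engine environments: it has broad support across Spark, Trino, and Flink
- Apply Z-ordering to high-cardinality columns that are frequently used in filters to improve data skipping
- Partition by date and, if needed, a low-cardinality entity column; partition pruning is often the biggest performance lever, but over-partitioning produces small files
- On Iceberg, tune write.target-file-size-bytes (default 512 MB) to your workload; 128-256 MB is a common choice
- Expire old snapshots regularly to prevent metadata bloat
- Use schema evolution carefully: test backward compatibility with downstream consumers
- Implement idempotent writes: use MERGE on unique keys so that retried jobs do not create duplicates
- Monitor metadata size: large transaction logs and many manifests slow down query planning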
Key Takeaways
- Open table formats bring ACID transactions, time travel, and schema evolution to data lakes
- Delta Lake is Spark-centric; Iceberg supports multiple engines (Spark, Trino, Flink)
- File compaction (OPTIMIZE/rewrite_data_files) is essential to prevent small file problems
- Time travel enables historical analysis and data recovery without separate backups, within the retention window set by VACUUM or snapshot expiration
- Iceberg's partition evolution allows changing a table's partitioning without rewriting existing data
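Compaction (OPTIMIZE in Delta Lake, `rewrite_data_files` in Iceberg, whose default strategy is bin-packing) groups many small files into fewer files near a target size. A greedy first-fit-decreasing sketch of how files might be grouped for rewriting is shown below. It is a simplification, since real planners also respect partitions, minimum input-file counts, and size thresholds. The 128 MB target simply mirrors the `write.target-file-size-bytes` value used in the Iceberg example, and `plan_compaction` is a hypothetical helper.

```python
MB = 1024 * 1024
TARGET_BYTES = 128 * MB  # 134217728, the value used in the Iceberg table properties


def plan_compaction(file_sizes: list, target: int = TARGET_BYTES) -> list:
    """First-fit decreasing: place each file into the first group with room."""
    groups = []  # each group: [total_bytes, [sizes]]
    for size in sorted(file_sizes, reverse=True):
        for group in groups:
            if group[0] + size <= target:
                group[0] += size
                group[1].append(size)
                break
        else:
            groups.append([size, [size]])  # no group has room: start a new output file
    return [sizes for _, sizes in groups]


small_files = [40 * MB, 30 * MB, 30 * MB, 20 * MB, 20 * MB, 10 * MB, 10 * MB, 5 * MB]
plan = plan_compaction(small_files)
print(len(small_files), "->", len(plan))  # 8 -> 2
print([sum(g) // MB for g in plan])       # [125, 40]
```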
See Also
- Data Lake Architecture: Medallion architecture and schema-on-read patterns
- Data Lakehouse: Unified lake + warehouse with Databricks
- Partitioning & Indexing: Z-Ordering and bloom filter optimization
- Redshift Spectrum: Querying S3 data directly from Redshift
- Real-Time Analytics: Streaming ingestion into lakehouse tables
- MLOps for Data Engineering: Feature stores backed by Delta Lake