16. Schema Evolution in PySpark
Definition: Schema Evolution
Schema evolution is the process of modifying the schema of a dataset over time while maintaining compatibility with existing data. Typical changes include adding, removing, renaming, or retyping columns; the goal is to apply them without breaking existing queries or pipelines.
Definition: Schema Inference
Schema inference is the process of automatically determining the structure of data by examining the data itself or its metadata. Spark infers schemas for formats such as JSON and CSV by scanning the data, and reads them from file metadata for self-describing formats such as Parquet.
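Schema Compatibility Percentage
$$C = \frac{N_{\text{compat}}}{N_{\text{total}}} \times 100\%$$
Here,
- $C$ = Schema compatibility percentage
- $N_{\text{compat}}$ = Number of backward-compatible changes
- $N_{\text{total}}$ = Total number of schema changes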
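The sketch below works through the compatibility percentage for a hypothetical change log: backward-compatible changes divided by total changes, times 100. The `compatibility_percentage` helper and the change records are illustrative assumptions, not part of Spark or Delta Lake. Treating an empty change log as 100% compatible is also an assumption.

```python
def compatibility_percentage(changes):
    """Backward-compatible changes / total changes * 100.

    `changes` is a list of (description, is_backward_compatible) pairs.
    """
    if not changes:
        return 100.0  # assumption: no changes means nothing can break
    n_compatible = sum(1 for _, is_compatible in changes if is_compatible)
    return n_compatible / len(changes) * 100


# Hypothetical change log for one table
changes = [
    ("add nullable column phone", True),
    ("widen id from int to long", True),
    ("add nullable column country", True),
    ("retype amount from string to int", False),
]
print(compatibility_percentage(changes))  # 75.0
```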
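Schema Evolution Overhead
$$O_{\text{total}} = O_{\text{read}} + O_{\text{transform}} + O_{\text{write}}$$
Here,
- $O_{\text{total}}$ = Total overhead of a schema evolution operation
- $O_{\text{read}}$ = Overhead from reading with the new schema
- $O_{\text{transform}}$ = Overhead from data transformation
- $O_{\text{write}}$ = Overhead from writing with the new schema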
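The sketch below measures each phase separately and treats total overhead as the sum of the read, transformation, and write overheads listed above. The `timed` and `evolution_overhead` helpers are hypothetical. The phases are stand-ins (sleeps and a small computation), not real Spark work.

```python
import time


def timed(phase):
    """Run one phase (a zero-argument callable) and return seconds elapsed."""
    start = time.perf_counter()
    phase()
    return time.perf_counter() - start


def evolution_overhead(read_phase, transform_phase, write_phase):
    """Total overhead = read + transform + write, each measured separately."""
    overhead = {
        "read": timed(read_phase),
        "transform": timed(transform_phase),
        "write": timed(write_phase),
    }
    overhead["total"] = overhead["read"] + overhead["transform"] + overhead["write"]
    return overhead


# Stand-in phases for illustration; in a real job these would wrap the
# Spark read, the casts or column additions, and the Delta write.
result = evolution_overhead(
    lambda: time.sleep(0.01),
    lambda: sum(range(100_000)),
    lambda: time.sleep(0.02),
)
print({phase: round(seconds, 3) for phase, seconds in result.items()})
```

Spark evaluates transformations lazily, so a read or transform phase only costs something once an action runs. When timing real jobs, end each phase with an action (for example `count()`) or time the write as a whole.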
Schema evolution is particularly important in data lakes where datasets grow over time and requirements change. Delta Lake provides built-in support for schema evolution with ACID guarantees.
Use Delta Lake's mergeSchema option for safe, additive schema evolution. Always test schema changes in a staging environment before applying them to production data.
Schema Compatibility Levels
Theorem: Schema compatibility exists at three levels:
1. Backward compatible, where the new schema can read old data.
2. Forward compatible, where the old schema can read new data.
3. Fully compatible, where both directions work.

Additive changes written with Delta Lake's mergeSchema option are backward compatible, because existing rows read the new columns as null.
- Schema evolution enables safe modification of data structures over time
- Delta Lake provides built-in schema evolution with ACID guarantees
- Three compatibility levels: backward, forward, and full
- Schema inference automates structure discovery from data
- Always test schema changes in staging before production
[Diagrams: Schema Compatibility Levels; Schema Evolution Architecture]
Detailed Explanation
What is Schema Evolution?
Schema evolution lets you modify data structure over time without breaking existing queries or pipelines.
- Why it matters: business requirements change constantly, and your data schema must adapt
- Core challenge: keep old queries working while supporting new data shapes
- PySpark support: Delta Lake provides ACID transactions and a versioned table history for schema changes
Compatibility: The Golden Rule
When a schema changes, decide which kind of compatibility you need:
| Compatibility Type | Meaning | Example |
|---|---|---|
| Backward | New schema reads old data | Widening a column type (int → long) |
| Forward | Old schema reads new data | Adding a required column (old readers ignore it) |
| Full | Both directions work | Adding a nullable column (old readers ignore it; old rows read it as null) |
Delta Lake's mergeSchema option gives you backward compatibility for additive changes: existing rows read the new columns as null.
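The sketch below makes the three levels concrete. It models a schema as a plain dict mapping each column name to a (type, has_default) pair and checks each direction of compatibility. The helpers `can_read` and `compatibility`, the widening set, and the resolution rules are simplifying assumptions loosely modeled on Avro-style schema resolution. They are not the algorithm used by Delta Lake or any schema registry.

```python
# Writer type -> reader type pairs treated as lossless widening.
# This set is an assumption for the sketch, similar in spirit to
# Avro type promotion; real systems define their own rules.
WIDENINGS = {("int", "long"), ("int", "double"), ("long", "double"), ("float", "double")}


def can_read(reader, writer):
    """True if data written with `writer` can be read with `reader`.

    Schemas are dicts mapping column name -> (type name, has_default).
    A nullable column counts as having a null default.
    """
    for name, (rtype, has_default) in reader.items():
        if name in writer:
            wtype = writer[name][0]
            if wtype != rtype and (wtype, rtype) not in WIDENINGS:
                return False  # incompatible type
        elif not has_default:
            return False  # reader needs a value the writer never wrote
    return True  # writer-only columns are simply ignored


def compatibility(old, new):
    """Classify the change from `old` to `new`."""
    backward = can_read(new, old)  # new schema reads old data
    forward = can_read(old, new)   # old schema reads new data
    if backward and forward:
        return "full"
    if backward:
        return "backward"
    if forward:
        return "forward"
    return "none"


v1 = {"id": ("int", False), "email": ("string", True)}
print(compatibility(v1, {**v1, "phone": ("string", True)}))  # full: add nullable column
print(compatibility(v1, {**v1, "id": ("long", False)}))      # backward: widen int -> long
print(compatibility(v1, {**v1, "tier": ("string", False)}))  # forward: add required column
print(compatibility(v1, {**v1, "email": ("int", True)}))     # none: string -> int
```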
Two Evolution Strategies
1. MergeSchema (Safe, Recommended)
- Adds new columns from the write DataFrame to the existing schema
- Fills new columns with null for old records
- Leaves existing queries working, so it is safe for production
- Minimal overhead: only adds columns and doesn't rewrite existing data
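The MergeSchema behavior described above can be modeled with plain Python dicts. The model covers three behaviors: an additive merge, rejection of type conflicts, and old rows reading new columns as null. It is a hypothetical sketch of these semantics, not Delta Lake's implementation. Delta Lake also allows a few safe upcasts, which this sketch ignores.

```python
def merge_schema(table_schema, incoming_schema):
    """Additive merge of two schemas given as dicts (column -> type name).

    Existing columns keep their position and type; new columns are appended.
    A column present in both with a different type is rejected.
    """
    merged = dict(table_schema)
    for name, dtype in incoming_schema.items():
        if name in merged and merged[name] != dtype:
            raise ValueError(f"cannot merge {name}: {merged[name]} vs {dtype}")
        merged.setdefault(name, dtype)
    return merged


def read_rows(rows, schema):
    """Project stored rows onto a schema; missing columns read as None (null)."""
    return [{name: row.get(name) for name in schema} for row in rows]


v1 = {"id": "int", "name": "string", "email": "string"}
v2 = {"id": "int", "name": "string", "email": "string", "phone": "string"}
old_rows = [{"id": 1, "name": "Alice", "email": "alice@example.com"}]

merged = merge_schema(v1, v2)
print(list(merged))  # ['id', 'name', 'email', 'phone']
print(read_rows(old_rows, merged))
# [{'id': 1, 'name': 'Alice', 'email': 'alice@example.com', 'phone': None}]
```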
2. OverwriteSchema (Aggressive, Use with Caution)
- Completely replaces the existing schema
- Can break backward compatibility: queries that reference dropped, renamed, or retyped columns will fail
- Requires write mode "overwrite"
- Use only in development or migration scenarios
Schema Inference
Spark can auto-detect schema from your data:
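- JSON: infers nested structures automatically
- CSV: takes column names from the header row; with the inferSchema option enabled, it scans the data to infer column types (otherwise every column is read as a string)
- Parquet: reads the schema from file metadata

Inference enables dynamic evolution: the schema adapts to incoming data without an explicit definition.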
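The sketch below shows how inference settles on a column type, using a simplified, hypothetical model of CSV inference on a header plus string rows. Each value gets the narrowest type that parses (int, then double, otherwise string). When rows disagree, the column widens along that same order. Spark's own CSV inference, enabled with `spark.read.option("header", "true").option("inferSchema", "true").csv(path)`, also considers types such as long, decimal, boolean, and timestamp.

```python
def infer_type(value):
    """Infer the narrowest type for one CSV field (simplified)."""
    if value == "":
        return None  # empty field: no evidence either way
    try:
        int(value)
        return "int"
    except ValueError:
        pass
    try:
        float(value)
        return "double"
    except ValueError:
        return "string"


# Order used to widen conflicting column types across rows
WIDENING_ORDER = ["int", "double", "string"]


def infer_schema(header, rows):
    """Infer one type per column by widening across all rows."""
    schema = {name: None for name in header}
    for row in rows:
        for name, value in zip(header, row):
            inferred = infer_type(value)
            current = schema[name]
            if inferred is None:
                continue
            if current is None or WIDENING_ORDER.index(inferred) > WIDENING_ORDER.index(current):
                schema[name] = inferred
    # Columns with no non-empty values fall back to string
    return {name: t or "string" for name, t in schema.items()}


header = ["id", "amount", "note"]
rows = [["1", "100.50", "first"], ["2", "200", ""], ["3", "300.00", "third"]]
print(infer_schema(header, rows))
# {'id': 'int', 'amount': 'double', 'note': 'string'}
```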
Type Compatibility Rules
Not all type changes are safe:
| Change | Safe? | Risk |
|---|---|---|
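| string → string | Yes | None (no change) |
| int → long | Yes | Widening, no loss |
| string → int | Risky | Data loss if values are non-numeric |
| int → string | Yes | Lossless, but needs an explicit cast |
| double → int | No | Precision loss |

Delta Lake's schema enforcement rejects writes whose column types don't match the table schema (apart from a few safe upcasts when mergeSchema is enabled), which prevents silent data corruption. Other type changes require an explicit cast and a table rewrite with overwriteSchema.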
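The rules in the table can be encoded as a lookup, so a pipeline can refuse unsafe changes before writing. In this sketch the rule set mirrors the table. Treating any unlisted change as unsafe is a conservative assumption. `unparseable_as_int` is a hypothetical helper that uses Python's `int()` as a stand-in for a string-to-integer cast. The sketch illustrates the table; it is not Delta Lake's enforcement logic.

```python
# The table above as a lookup: (from_type, to_type) -> verdict.
TYPE_CHANGE_RULES = {
    ("int", "long"): "safe",      # widening, no loss
    ("int", "string"): "safe",    # lossless, but needs an explicit cast
    ("string", "int"): "risky",   # non-numeric values are lost
    ("double", "int"): "unsafe",  # fractional part is lost
}


def check_type_change(from_type, to_type):
    """Return 'safe', 'risky', or 'unsafe'; unlisted changes count as unsafe."""
    if from_type == to_type:
        return "safe"
    return TYPE_CHANGE_RULES.get((from_type, to_type), "unsafe")


def unparseable_as_int(values):
    """For a risky string -> int change, list the values that would be lost."""
    bad = []
    for value in values:
        try:
            int(value)
        except (TypeError, ValueError):
            bad.append(value)
    return bad


print(check_type_change("string", "string"))  # safe
print(check_type_change("double", "int"))     # unsafe
print(check_type_change("string", "int"))     # risky
print(unparseable_as_int(["42", "7", "n/a", None]))  # ['n/a', None]
```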
Schema Registry
For enterprise systems, a centralized registry manages schema versions:
- Delta Lake: built-in schema history (each table version records its schema, accessible via time travel)
- Confluent Schema Registry: external service, commonly used with Kafka streaming
- Features: version tracking, compatibility enforcement, rollback
Performance Impact
| Strategy | Overhead | When to Use |
|---|---|---|
| MergeSchema | Low | Production: adds columns only |
| OverwriteSchema | High | Migration: rewrites all data |
| Inference | Medium | Initial load: scans the input data (or a sample of it) |
Testing Best Practices
- Staging first: test schema changes before production
- Validate queries: ensure existing queries still work
- Check performance: measure read/write times after the change
- CI/CD integration: automate schema compatibility testing
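One lightweight way to validate queries in CI/CD is to keep an inventory of the columns each existing query reads. The build then fails when a proposed schema drops any of them. The sketch below is written as pytest-style test functions with plain asserts, so it also runs without pytest. The `QUERIES` inventory and the `broken_references` helper are hypothetical. In practice the inventory might be maintained by hand or derived from query lineage.

```python
def broken_references(query_columns, new_schema):
    """Map each query to the columns it uses that the new schema lacks."""
    broken = {}
    for query, columns in query_columns.items():
        missing = sorted(set(columns) - set(new_schema))
        if missing:
            broken[query] = missing
    return broken


# Hypothetical inventory of existing queries and the columns they read
QUERIES = {
    "daily_signups": ["id", "created_at"],
    "contact_export": ["id", "name", "email"],
}


def test_new_schema_keeps_existing_queries_working():
    new_schema = {"id": "int", "name": "string", "email": "string",
                  "created_at": "date", "phone": "string"}
    assert broken_references(QUERIES, new_schema) == {}


def test_dropping_email_is_caught():
    new_schema = {"id": "int", "name": "string", "created_at": "date"}
    assert broken_references(QUERIES, new_schema) == {"contact_export": ["email"]}


if __name__ == "__main__":
    test_new_schema_keeps_existing_queries_working()
    test_dropping_email_is_caught()
    print("schema checks passed")
```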
Key Concepts Table
| Concept | Description | Use Case |
|---|---|---|
| Schema Evolution | Modifying data structure over time | Adaptive data systems |
| Schema Inference | Automatic structure discovery | Dynamic schemas |
| MergeSchema | Safe additive schema changes | Production systems |
| OverwriteSchema | Complete schema replacement | Migration, development |
| Schema Registry | Centralized schema management | Enterprise systems |
| Type Compatibility | Ensuring type changes are safe | Data integrity |
| Backward Compatibility | New schema reads old data | Query stability |
| Forward Compatibility | Old schema reads new data | Application stability |
Code Examples
Basic Schema Evolution with MergeSchema
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from delta.tables import DeltaTable
spark = SparkSession.builder \
.appName("SchemaEvolution") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# Create initial DataFrame with schema v1
# StructType defines the schema structure
schema_v1 = StructType([
StructField("id", IntegerType(), False),
StructField("name", StringType(), True),
StructField("email", StringType(), True)
])
data_v1 = [
(1, "Alice", "alice@example.com"),
(2, "Bob", "bob@example.com"),
(3, "Charlie", "charlie@example.com")
]
df_v1 = spark.createDataFrame(data_v1, schema_v1)
# Write initial data
df_v1.write \
.format("delta") \
.mode("overwrite") \
.save("/path/to/table")
# Read the table to verify initial schema
table = DeltaTable.forPath(spark, "/path/to/table")
print("Initial Schema:")
table.toDF().printSchema()  # printSchema() prints directly and returns None
# Create DataFrame with new schema (adding phone column)
# mergeSchema (used in the write below) adds new columns to the existing schema
schema_v2 = StructType([
StructField("id", IntegerType(), False),
StructField("name", StringType(), True),
StructField("email", StringType(), True),
StructField("phone", StringType(), True) # New column
])
data_v2 = [
(4, "Diana", "diana@example.com", "+1-555-0101"),
(5, "Eve", "eve@example.com", "+1-555-0102")
]
df_v2 = spark.createDataFrame(data_v2, schema_v2)
# Write with schema evolution
# Parameter: mergeSchema β enables schema evolution
df_v2.write \
.format("delta") \
.mode("append") \
.option("mergeSchema", "true") \
.save("/path/to/table")
# Verify evolved schema
table = DeltaTable.forPath(spark, "/path/to/table")
print("Evolved Schema:")
table.toDF().printSchema()
# Show that old data has null for new column
print("Data with evolved schema:")
table.toDF().show()
Schema Evolution with Type Changes
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from delta.tables import DeltaTable
spark = SparkSession.builder \
.appName("SchemaTypeEvolution") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# Create initial DataFrame
schema_v1 = StructType([
StructField("id", IntegerType(), False),
StructField("amount", StringType(), True), # String type initially
StructField("date", StringType(), True) # String type initially
])
data_v1 = [
(1, "100.50", "2024-01-15"),
(2, "200.75", "2024-01-16"),
(3, "300.00", "2024-01-17")
]
df_v1 = spark.createDataFrame(data_v1, schema_v1)
df_v1.write \
.format("delta") \
.mode("overwrite") \
.save("/path/to/type_evolution_table")
# Changing a column's type is not an additive change, so mergeSchema cannot
# apply it: appending DoubleType/DateType data to a table whose amount and
# date columns are StringType fails with a schema-merge error.
# Instead, rewrite the table: cast the existing rows, add the new rows,
# and overwrite with overwriteSchema enabled.
existing_typed = spark.read.format("delta") \
.load("/path/to/type_evolution_table") \
.withColumn("amount", col("amount").cast(DoubleType())) \
.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))
# New rows: dates arrive as strings and are converted with to_date
schema_v2 = StructType([
StructField("id", IntegerType(), False),
StructField("amount", DoubleType(), True),
StructField("date", StringType(), True)
])
data_v2 = [
(4, 400.25, "2024-01-18"),
(5, 500.50, "2024-01-19")
]
df_v2 = spark.createDataFrame(data_v2, schema_v2) \
.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))
# unionByName aligns columns by name; both sides now have the same types
existing_typed.unionByName(df_v2).write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.save("/path/to/type_evolution_table")
# Verify schema: amount should now be double and date should be date
table = DeltaTable.forPath(spark, "/path/to/type_evolution_table")
print("Schema after type evolution:")
table.toDF().printSchema()
Schema Evolution with Column Renaming
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from delta.tables import DeltaTable
spark = SparkSession.builder \
.appName("SchemaRenameEvolution") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# Create initial DataFrame
schema_v1 = StructType([
StructField("id", IntegerType(), False),
StructField("first_name", StringType(), True),
StructField("last_name", StringType(), True),
StructField("email_address", StringType(), True)
])
data_v1 = [
(1, "Alice", "Smith", "alice@example.com"),
(2, "Bob", "Johnson", "bob@example.com")
]
df_v1 = spark.createDataFrame(data_v1, schema_v1)
df_v1.write \
.format("delta") \
.mode("overwrite") \
.save("/path/to/rename_table")
# Create DataFrame with renamed columns
# New schema: first_name/last_name combined into full_name, email_address renamed to email
schema_v2 = StructType([
StructField("id", IntegerType(), False),
StructField("full_name", StringType(), True), # Renamed from first_name + last_name
StructField("email", StringType(), True) # Renamed from email_address
])
data_v2 = [
(3, "Charlie Brown", "charlie@example.com"),
(4, "Diana Prince", "diana@example.com")
]
df_v2 = spark.createDataFrame(data_v2, schema_v2)
# Write with column renaming
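# Note: mergeSchema cannot rename columns; it only adds them.
# Alternative: add the new columns and keep populating the old ones,
# so existing queries keep working while the old columns are deprecated.
# (Delta Lake 1.2+ can also rename in place once column mapping is enabled
# with delta.columnMapping.mode = 'name', via ALTER TABLE ... RENAME COLUMN.)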
df_v2_with_old_columns = df_v2 \
.withColumn("first_name", split(col("full_name"), " ")[0]) \
.withColumn("last_name", split(col("full_name"), " ")[1]) \
.withColumn("email_address", col("email"))
df_v2_with_old_columns.write \
.format("delta") \
.mode("append") \
.option("mergeSchema", "true") \
.save("/path/to/rename_table")
# Verify schema
table = DeltaTable.forPath(spark, "/path/to/rename_table")
print("Schema after column renaming:")
table.toDF().printSchema()
Schema Evolution with Schema Registry
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from delta.tables import DeltaTable
import json
spark = SparkSession.builder \
.appName("SchemaRegistryEvolution") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# Define schema versions
schema_v1 = StructType([
StructField("id", IntegerType(), False),
StructField("name", StringType(), True),
StructField("email", StringType(), True)
])
schema_v2 = StructType([
StructField("id", IntegerType(), False),
StructField("name", StringType(), True),
StructField("email", StringType(), True),
StructField("phone", StringType(), True)
])
# Simple schema registry implementation
class SchemaRegistry:
    def __init__(self):
        self.schemas = {}
        self.current_version = 0
    def register_schema(self, schema, description=""):
        """Register a new schema version and return its version number"""
        self.current_version += 1
        schema_info = {
            "version": self.current_version,
            "schema": schema.json(),
            "description": description,
            "timestamp": "2024-01-15T10:00:00Z"  # fixed example timestamp
        }
        self.schemas[self.current_version] = schema_info
        return self.current_version
    def get_schema(self, version):
        """Get schema by version (None if the version is unknown)"""
        if version in self.schemas:
            return StructType.fromJson(json.loads(self.schemas[version]["schema"]))
        return None
    def get_latest_schema(self):
        """Get latest schema version"""
        return self.get_schema(self.current_version)
    def validate_compatibility(self, old_schema, new_schema):
        """Simple backward-compatibility check for additive evolution"""
        old_fields = {field.name: field.dataType for field in old_schema.fields}
        new_fields = {field.name: field.dataType for field in new_schema.fields}
        # Every old field must still exist in the new schema
        missing_fields = set(old_fields) - set(new_fields)
        if missing_fields:
            return False, f"Missing fields: {missing_fields}"
        # Fields present in both schemas must keep their data type
        changed_fields = {name for name in old_fields if old_fields[name] != new_fields[name]}
        if changed_fields:
            return False, f"Type changed for fields: {changed_fields}"
        return True, "Compatible"
# Use schema registry
registry = SchemaRegistry()
# Register schemas
registry.register_schema(schema_v1, "Initial schema")
registry.register_schema(schema_v2, "Added phone field")
# Validate compatibility
old_schema = registry.get_schema(1)
new_schema = registry.get_schema(2)
is_compatible, message = registry.validate_compatibility(old_schema, new_schema)
print(f"Schema compatibility: {is_compatible} - {message}")
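# Write data only if the new schema passed the registry's compatibility check
if not is_compatible:
    raise ValueError(f"Incompatible schema change: {message}")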
df = spark.createDataFrame([
(1, "Alice", "alice@example.com", "+1-555-0101")
], new_schema)
df.write \
.format("delta") \
.mode("overwrite") \
.option("mergeSchema", "true") \
.save("/path/to/registry_table")
Performance Metrics
| Metric | Optimal | Typical | Poor | Optimization |
|---|---|---|---|---|
| Evolution Speed | < 10s | 10s-5min | > 5min | Incremental evolution |
| Compatibility | 100% | 90-100% | < 90% | Compatibility testing |
| Storage Overhead | < 5% | 5-20% | > 20% | Schema compression |
| Query Impact | < 1% | 1-10% | > 10% | Schema optimization |
| Rollback Time | < 1min | 1-10min | > 10min | Version management |
Best Practices
- Use additive evolution: only add new columns; avoid removing or renaming them
- Test in staging: always validate schema changes in non-production environments
- Maintain backward compatibility: ensure new schemas can read old data
- Use schema registries: centralize schema management for complex systems
- Document schema changes: keep a clear record of the evolution history
- Monitor schema drift: track unintended schema changes over time
- Validate before writing: check schema compatibility before writing data
- Plan for rollback: keep the ability to revert schema changes
- Automate testing: include schema compatibility checks in CI/CD pipelines
- Communicate changes: notify stakeholders of schema evolution
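A schema diff between the expected table schema and an incoming batch makes drift visible. This supports monitoring schema drift and validating before writing. The sketch assumes each schema has already been converted to a plain dict of column name to type string. In PySpark, one way to build that dict is `{f.name: f.dataType.simpleString() for f in df.schema.fields}`. The `schema_diff` helper is hypothetical.

```python
def schema_diff(old, new):
    """Compare two schemas (dicts of column -> type) and report drift."""
    added = sorted(set(new) - set(old))
    removed = sorted(set(old) - set(new))
    retyped = sorted(
        (name, old[name], new[name])
        for name in set(old) & set(new)
        if old[name] != new[name]
    )
    return {"added": added, "removed": removed, "retyped": retyped}


expected = {"id": "int", "name": "string", "email": "string"}
observed = {"id": "long", "name": "string", "phone": "string"}
drift = schema_diff(expected, observed)
print(drift)
# {'added': ['phone'], 'removed': ['email'], 'retyped': [('id', 'int', 'long')]}
```

A pipeline could, for example, alert on any non-empty `removed` or `retyped` list while letting purely additive drift through.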
Related Topics
- 07-partitioning-strategies.mdx: Partitioning strategies during schema evolution
- 14-merge-upsert.mdx: Schema changes during merge operations
- 15-data-quality.mdx: Schema quality and validation
See Also
- 06-joins-optimization.mdx: Join compatibility with schema evolution
- 08-caching-persistence.mdx: Caching strategies for evolved schemas
- Kafka Streams (kafka/03): Schema evolution in streaming systems
- Data Engineering Streaming (data-engineering/022): Schema management in data pipelines