16. Schema Evolution in PySpark

🟒 Free Lesson

Advertisement

16. Schema Evolution in PySpark

Definition: Schema Evolution

Schema evolution is the process of modifying the schema of a dataset over time while maintaining compatibility with existing data. It allows adding, removing, or renaming columns without breaking existing queries or pipelines.

Definition: Schema Inference

Schema inference is the process of automatically determining the structure of data by examining the data itself or metadata. Spark can infer schemas from JSON, CSV, Parquet, and other formats.

Schema Compatibility Score
$$C_{schema} = \frac{N_{compatible\_changes}}{N_{total\_changes}} \times 100\%$$

Here,

  • $C_{schema}$ = Schema compatibility percentage
  • $N_{compatible\_changes}$ = Number of backward-compatible changes
  • $N_{total\_changes}$ = Total number of schema changes
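
As a worked example of the score above, here is a minimal sketch in plain Python. The function name `schema_compatibility_score` and the sample counts are illustrative assumptions, not part of any Spark or Delta Lake API.

```python
def schema_compatibility_score(compatible_changes: int, total_changes: int) -> float:
    """Return C_schema: the share of backward-compatible changes, as a percentage."""
    if total_changes <= 0:
        raise ValueError("total_changes must be positive")
    if not 0 <= compatible_changes <= total_changes:
        raise ValueError("compatible_changes must be between 0 and total_changes")
    return compatible_changes / total_changes * 100.0


# Hypothetical release: 4 schema changes, 3 of them backward compatible
score = schema_compatibility_score(3, 4)
print(f"Schema compatibility: {score:.1f}%")  # Schema compatibility: 75.0%
```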

Schema Evolution Overhead

$$O_{evolution} = O_{read} + O_{transform} + O_{write}$$

Here,

  • $O_{evolution}$ = Total overhead of schema evolution operation
  • $O_{read}$ = Overhead from reading with new schema
  • $O_{transform}$ = Overhead from data transformation
  • $O_{write}$ = Overhead from writing with new schema
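
One way to put numbers on the three terms is to time each phase separately and add them up. The sketch below does this in plain Python; the `timed` helper is hypothetical, and the in-memory list operations are stand-ins for a real read, transform, and write.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(phase, timings):
    """Record the wall-clock duration of one phase under timings[phase]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[phase] = time.perf_counter() - start


timings = {}

with timed("read", timings):
    # Stand-in for reading existing records
    rows = [{"id": i, "name": f"user{i}"} for i in range(1000)]

with timed("transform", timings):
    # Stand-in for the transformation: add the new column with a null value
    evolved = [{**row, "phone": None} for row in rows]

with timed("write", timings):
    # Stand-in for writing the records back out
    serialized = [sorted(row.items()) for row in evolved]

# O_evolution = O_read + O_transform + O_write
o_evolution = timings["read"] + timings["transform"] + timings["write"]
print(f"Total evolution overhead: {o_evolution:.6f}s")
```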

Schema evolution is particularly important in data lakes where datasets grow over time and requirements change. Delta Lake provides built-in support for schema evolution with ACID guarantees.

Use Delta Lake's mergeSchema option for safe schema evolution. Always test schema changes in staging environments before applying to production data.

Schema Compatibility Levels

Theorem: Schema compatibility exists at three levels: (1) Backward compatible: the new schema can read old data; (2) Forward compatible: the old schema can read new data; (3) Full compatible: both directions work. Adding columns through Delta Lake's mergeSchema option is a backward-compatible change.

  • Schema evolution enables safe modification of data structures over time
  • Delta Lake provides built-in schema evolution with ACID guarantees
  • Three compatibility levels: backward, forward, and full
  • Schema inference automates structure discovery from data
  • Always test schema changes in staging before production

Schema Compatibility Levels

[Figure: Schema Evolution Architecture. Backward Compatible: new schema reads old data; add columns with defaults; Delta mergeSchema default. Forward Compatible: old schema reads new data; ignore unknown columns; useful for rolling upgrades. Full Compatible: both directions work; add optional columns only; safest for production. Evolution Process Flow: 1. Define new schema, 2. Test compatibility, 3. Apply mergeSchema, 4. Validate data, 5. Deploy with confidence.]

📚 Detailed Explanation

What is Schema Evolution?

Schema evolution lets you modify data structure over time without breaking existing queries or pipelines.

  • Why it matters: Business requirements change constantly; your data schema must adapt
  • Core challenge: Keep old queries working while supporting new data shapes
  • PySpark support: Delta Lake provides ACID transactions and version control for schema changes

Compatibility: The Golden Rule

When a schema changes, compatibility can hold in one direction or in both:

| Compatibility Type | Meaning | Example |
| --- | --- | --- |
| Backward | New schema reads old data | Adding a column with default value |
| Forward | Old schema reads new data | Dropping a column (old queries ignore it) |
| Full | Both directions work | Renaming with alias mapping |

Adding columns with Delta Lake's mergeSchema option is a backward-compatible change: rows written under the old schema read back with null in the new columns.
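
The three levels can be checked mechanically. The sketch below is a simplified model in plain Python, loosely following Avro-style reader/writer rules; the `Field` type, the `has_default` flag, and both function names are assumptions made for illustration, and this is not how Delta Lake itself enforces schemas.

```python
from typing import Dict, NamedTuple


class Field(NamedTuple):
    dtype: str
    has_default: bool  # True if a reader can fill in a missing value (e.g. null)


Schema = Dict[str, Field]


def can_read(reader: Schema, writer: Schema) -> bool:
    """True if data written with `writer` can be read using `reader`."""
    for name, field in reader.items():
        if name in writer:
            if writer[name].dtype != field.dtype:
                return False  # same column, different type
        elif not field.has_default:
            return False  # reader needs a column the data does not have
    return True  # columns unknown to the reader are ignored


def compatibility_level(old: Schema, new: Schema) -> str:
    backward = can_read(new, old)  # new schema reads old data
    forward = can_read(old, new)   # old schema reads new data
    if backward and forward:
        return "full"
    if backward:
        return "backward"
    if forward:
        return "forward"
    return "none"


old = {"id": Field("int", False), "name": Field("string", True)}
add_optional = {**old, "phone": Field("string", True)}
add_required = {**old, "phone": Field("string", False)}
drop_required = {"name": Field("string", True)}
retyped = {"id": Field("string", False), "name": Field("string", True)}

for label, new in [("add optional column", add_optional),
                   ("add required column", add_required),
                   ("drop required column", drop_required),
                   ("change id type", retyped)]:
    print(f"{label}: {compatibility_level(old, new)}")
```

Under these rules, adding an optional column is compatible in both directions, which matches the advice to prefer additive changes.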


Two Evolution Strategies

1. MergeSchema (Safe, Recommended)

  • Adds new columns from write DataFrame to existing schema
  • Fills new columns with null for old records
  • Does not break queries that reference existing columns, so it is safe for production
  • Minimal overhead: only adds columns, doesn't rewrite data
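
The behavior in the list above can be modeled without Spark. The following is a plain-Python sketch of the idea only (new columns are appended, old records read back with null); `merge_schema` and `read_with_schema` are hypothetical helpers, not Delta Lake functions.

```python
def merge_schema(table_columns, incoming_columns):
    """Append incoming columns that the table does not have yet, keeping order."""
    merged = list(table_columns)
    for name in incoming_columns:
        if name not in merged:
            merged.append(name)
    return merged


def read_with_schema(rows, columns):
    """Project every row onto the merged schema; missing values become None."""
    return [{name: row.get(name) for name in columns} for row in rows]


old_rows = [{"id": 1, "name": "Alice", "email": "alice@example.com"}]
new_rows = [{"id": 4, "name": "Diana", "email": "diana@example.com",
             "phone": "+1-555-0101"}]

merged = merge_schema(["id", "name", "email"], ["id", "name", "email", "phone"])
table = read_with_schema(old_rows + new_rows, merged)

print(merged)             # ['id', 'name', 'email', 'phone']
print(table[0]["phone"])  # None
print(table[1]["phone"])  # +1-555-0101
```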

2. OverwriteSchema (Aggressive, Use with Caution)

  • Completely replaces the existing schema
  • Can break backward compatibility: old queries fail if they reference columns that were removed or retyped
  • Requires write mode = "overwrite"
  • Use only in development or migration scenarios

Schema Inference

Spark can auto-detect schema from your data:

  • JSON: infers nested structures automatically
  • CSV: reads header row or samples data
  • Parquet: reads schema from file metadata

Inference enables dynamic evolution: the schema adapts to incoming data without explicit definition.
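
To make the idea of inference concrete, here is a small plain-Python sketch that derives a schema from sample JSON lines. It is only loosely modeled on what a JSON reader does; the type names, the fallback to `string` on conflicts, and the function names are assumptions of this sketch rather than Spark's actual implementation.

```python
import json


def infer_type(value):
    """Map one JSON value to a type name (nested objects become dicts)."""
    if value is None:
        return "null"
    if isinstance(value, bool):  # check bool before int: bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, dict):
        return {key: infer_type(item) for key, item in value.items()}
    return "array"


def merge_types(a, b):
    """Combine the types seen for the same field in two records."""
    if a == b:
        return a
    if a == "null":
        return b
    if b == "null":
        return a
    if isinstance(a, dict) and isinstance(b, dict):
        keys = list(a) + [key for key in b if key not in a]
        return {key: merge_types(a.get(key, "null"), b.get(key, "null")) for key in keys}
    if isinstance(a, str) and isinstance(b, str) and {a, b} == {"long", "double"}:
        return "double"  # widen integers to double
    return "string"      # conflicting types fall back to string


def infer_schema(json_lines):
    schema = {}
    for line in json_lines:
        schema = merge_types(schema, infer_type(json.loads(line)))
    return schema


sample = [
    '{"id": 1, "name": "Alice", "address": {"city": "Oslo"}}',
    '{"id": 2, "name": "Bob", "score": 9.5}',
    '{"id": 3.5, "name": null, "address": {"city": "Rome", "zip": "00100"}}',
]
schema = infer_schema(sample)
print(schema)
```

Fields missing from some records (`score`, `address.zip`) still end up in the result, which is why a schema inferred from a sample can grow as more data is scanned.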


Type Compatibility Rules

Not all type changes are safe:

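| Change | Safe? | Risk |
| --- | --- | --- |
| string → string | ✅ Yes | None |
| int → long | ✅ Yes | Widening, no loss |
| string → int | ⚠️ Risky | Data loss if non-numeric |
| int → string | ✅ Yes | Lossless conversion (every int has a string form) |
| double → int | ❌ No | Precision loss |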

Delta Lake's schema enforcement rejects writes whose column types do not match the table schema, which helps prevent silent data corruption.
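
The table can be turned into a small pre-flight check. This plain-Python sketch encodes the rows above plus ordinary numeric widening; the function name, the type-name strings, and the conservative treatment of anything not listed are assumptions of the sketch, not Delta Lake's own rule set.

```python
INTEGRAL = ["byte", "short", "int", "long"]  # ordered narrow to wide
FRACTIONAL = ["float", "double"]             # ordered narrow to wide


def classify_type_change(src: str, dst: str) -> str:
    """Return 'safe', 'risky', or 'unsafe' for changing a column from src to dst."""
    if src == dst:
        return "safe"
    for family in (INTEGRAL, FRACTIONAL):
        if src in family and dst in family:
            # Widening within a family keeps every value; narrowing may not
            return "safe" if family.index(src) < family.index(dst) else "unsafe"
    if dst == "string":
        return "safe"   # every value has a string representation
    if src == "string":
        return "risky"  # non-numeric text cannot be converted
    return "unsafe"     # anything not covered is treated conservatively


for src, dst in [("string", "string"), ("int", "long"), ("string", "int"),
                 ("int", "string"), ("double", "int")]:
    print(f"{src} -> {dst}: {classify_type_change(src, dst)}")
```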


Schema Registry

For enterprise systems, a centralized registry manages schema versions:

  • Delta Lake: built-in schema history (via time travel)
  • Confluent Schema Registry: external, supports Kafka streaming
  • Features: version tracking, compatibility enforcement, rollback

Performance Impact

| Strategy | Overhead | When to Use |
| --- | --- | --- |
| MergeSchema | Low | Production: adds columns only |
| OverwriteSchema | High | Migration: rewrites all data |
| Inference | Medium | Initial load: scans sample data |

Testing Best Practices

  1. Staging first: test schema changes before production
  2. Validate queries: ensure existing queries still work
  3. Check performance: measure read/write times after change
  4. CI/CD integration: automate schema compatibility testing
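
For the "validate queries" step, a cheap first check that fits in a CI job is to compare the columns each query depends on against the proposed schema. The sketch below assumes those dependencies are known up front; the query names and the `broken_queries` helper are hypothetical, and it does not replace running the real queries against staging data.

```python
def broken_queries(schema_columns, query_columns):
    """Return {query name: missing columns} for queries the schema no longer satisfies."""
    available = set(schema_columns)
    report = {}
    for query_name, needed in query_columns.items():
        missing = sorted(set(needed) - available)
        if missing:
            report[query_name] = missing
    return report


# Hypothetical queries and the columns each one reads
queries = {
    "daily_signups": ["id", "email"],
    "contact_export": ["name", "email_address"],
}

# Proposed schema: email_address was renamed to email, phone was added
proposed_columns = ["id", "name", "email", "phone"]

report = broken_queries(proposed_columns, queries)
print(report)  # {'contact_export': ['email_address']}
```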

📊 Key Concepts Table

| Concept | Description | Use Case |
| --- | --- | --- |
| Schema Evolution | Modifying data structure over time | Adaptive data systems |
| Schema Inference | Automatic structure discovery | Dynamic schemas |
| MergeSchema | Safe additive schema changes | Production systems |
| OverwriteSchema | Complete schema replacement | Migration, development |
| Schema Registry | Centralized schema management | Enterprise systems |
| Type Compatibility | Ensuring type changes are safe | Data integrity |
| Backward Compatibility | New schema reads old data | Query stability |
| Forward Compatibility | Old schema reads new data | Application stability |

💻 Code Examples

Basic Schema Evolution with MergeSchema

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from delta.tables import DeltaTable

spark = SparkSession.builder \
    .appName("SchemaEvolution") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Create initial DataFrame with schema v1
# Parameter: StructType defines the schema structure
schema_v1 = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), True),
    StructField("email", StringType(), True)
])

data_v1 = [
    (1, "Alice", "alice@example.com"),
    (2, "Bob", "bob@example.com"),
    (3, "Charlie", "charlie@example.com")
]

df_v1 = spark.createDataFrame(data_v1, schema_v1)

# Write initial data
df_v1.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/path/to/table")

# Read the table to verify initial schema
table = DeltaTable.forPath(spark, "/path/to/table")
print("Initial Schema:")
table.toDF().printSchema()  # printSchema() prints directly and returns None

# Create DataFrame with new schema (adding phone column)
# Parameter: mergeSchema adds new columns to existing schema
schema_v2 = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("phone", StringType(), True)  # New column
])

data_v2 = [
    (4, "Diana", "diana@example.com", "+1-555-0101"),
    (5, "Eve", "eve@example.com", "+1-555-0102")
]

df_v2 = spark.createDataFrame(data_v2, schema_v2)

# Write with schema evolution
# Parameter: mergeSchema enables schema evolution
df_v2.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("/path/to/table")

# Verify evolved schema
table = DeltaTable.forPath(spark, "/path/to/table")
print("Evolved Schema:")
table.toDF().printSchema()  # printSchema() prints directly and returns None

# Show that old data has null for new column
print("Data with evolved schema:")
table.toDF().show()

Schema Evolution with Type Changes

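from pyspark.sql import SparkSession
# Explicit imports avoid shadowing Python builtins such as sum, min, and max
from pyspark.sql.types import (
    StructType, StructField, IntegerType, StringType, DoubleType, DateType
)
from pyspark.sql.functions import col
from delta.tables import DeltaTable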

spark = SparkSession.builder \
    .appName("SchemaTypeEvolution") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Create initial DataFrame
schema_v1 = StructType([
    StructField("id", IntegerType(), False),
    StructField("amount", StringType(), True),  # String type initially
    StructField("date", StringType(), True)  # String type initially
])

data_v1 = [
    (1, "100.50", "2024-01-15"),
    (2, "200.75", "2024-01-16"),
    (3, "300.00", "2024-01-17")
]

df_v1 = spark.createDataFrame(data_v1, schema_v1)
df_v1.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/path/to/type_evolution_table")

# Target schema with proper types
schema_v2 = StructType([
    StructField("id", IntegerType(), False),
    StructField("amount", DoubleType(), True),  # Changed to Double
    StructField("date", DateType(), True)  # Changed to Date
])

# New records still arrive as strings, so they use schema_v1 and are cast below
data_v2 = [
    (4, "400.25", "2024-01-18"),
    (5, "500.50", "2024-01-19")
]

df_v2 = spark.createDataFrame(data_v2, schema_v1)

# mergeSchema only adds columns; it does not turn a string column into double or date.
# Changing a column type means casting the existing rows and rewriting the table.
df_existing = spark.read.format("delta").load("/path/to/type_evolution_table")

# Cast every column to the type declared in schema_v2
df_all_typed = df_existing.unionByName(df_v2)
for field in schema_v2.fields:
    df_all_typed = df_all_typed.withColumn(field.name, col(field.name).cast(field.dataType))

# Parameter: overwriteSchema replaces the table schema; requires mode("overwrite")
df_all_typed.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save("/path/to/type_evolution_table")

# Verify schema
table = DeltaTable.forPath(spark, "/path/to/type_evolution_table")
print("Schema after type evolution:")
table.toDF().printSchema()

Schema Evolution with Column Renaming

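from pyspark.sql import SparkSession
# Explicit imports avoid shadowing Python builtins such as sum, min, and max
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import col, split
from delta.tables import DeltaTable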

spark = SparkSession.builder \
    .appName("SchemaRenameEvolution") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Create initial DataFrame
schema_v1 = StructType([
    StructField("id", IntegerType(), False),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("email_address", StringType(), True)
])

data_v1 = [
    (1, "Alice", "Smith", "alice@example.com"),
    (2, "Bob", "Johnson", "bob@example.com")
]

df_v1 = spark.createDataFrame(data_v1, schema_v1)
df_v1.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/path/to/rename_table")

# Create DataFrame with renamed columns
# New data uses full_name and email instead of first_name, last_name, email_address
schema_v2 = StructType([
    StructField("id", IntegerType(), False),
    StructField("full_name", StringType(), True),  # Renamed from first_name + last_name
    StructField("email", StringType(), True)  # Renamed from email_address
])

data_v2 = [
    (3, "Charlie Brown", "charlie@example.com"),
    (4, "Diana Prince", "diana@example.com")
]

df_v2 = spark.createDataFrame(data_v2, schema_v2)

# Write with column renaming
# Note: Direct column renaming is not supported by mergeSchema
# Alternative: Add new columns and deprecate old ones
df_v2_with_old_columns = df_v2 \
    .withColumn("first_name", split(col("full_name"), " ")[0]) \
    .withColumn("last_name", split(col("full_name"), " ")[1]) \
    .withColumn("email_address", col("email"))

df_v2_with_old_columns.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("/path/to/rename_table")

# Verify schema
table = DeltaTable.forPath(spark, "/path/to/rename_table")
print("Schema after column renaming:")
table.toDF().printSchema()

Schema Evolution with Schema Registry

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from delta.tables import DeltaTable
import json

spark = SparkSession.builder \
    .appName("SchemaRegistryEvolution") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Define schema versions
schema_v1 = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), True),
    StructField("email", StringType(), True)
])

schema_v2 = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("phone", StringType(), True)
])

# Simple schema registry implementation
class SchemaRegistry:
    def __init__(self):
        self.schemas = {}
        self.current_version = 0
    
    def register_schema(self, schema, description=""):
        """Register a new schema version"""
        self.current_version += 1
        schema_info = {
            "version": self.current_version,
            "schema": schema.json(),
            "description": description,
            # Placeholder value; record the actual registration time in practice
            "timestamp": "2024-01-15T10:00:00Z"
        }
        self.schemas[self.current_version] = schema_info
        return self.current_version
    
    def get_schema(self, version):
        """Get schema by version"""
        if version in self.schemas:
            return StructType.fromJson(json.loads(self.schemas[version]["schema"]))
        return None
    
    def get_latest_schema(self):
        """Get latest schema version"""
        return self.get_schema(self.current_version)
    
    def validate_compatibility(self, old_schema, new_schema):
        """Validate schema compatibility"""
        # Simple compatibility check
        old_fields = {field.name for field in old_schema.fields}
        new_fields = {field.name for field in new_schema.fields}
        
        # Check if all old fields exist in new schema
        missing_fields = old_fields - new_fields
        if missing_fields:
            return False, f"Missing fields: {missing_fields}"
        
        return True, "Compatible"

# Use schema registry
registry = SchemaRegistry()

# Register schemas
registry.register_schema(schema_v1, "Initial schema")
registry.register_schema(schema_v2, "Added phone field")

# Validate compatibility
old_schema = registry.get_schema(1)
new_schema = registry.get_schema(2)

is_compatible, message = registry.validate_compatibility(old_schema, new_schema)
print(f"Schema compatibility: {is_compatible} - {message}")

# Write data only if the registry check passed
if is_compatible:
    df = spark.createDataFrame([
        (1, "Alice", "alice@example.com", "+1-555-0101")
    ], new_schema)

    df.write \
        .format("delta") \
        .mode("overwrite") \
        .option("mergeSchema", "true") \
        .save("/path/to/registry_table")
else:
    raise ValueError(f"Incompatible schema change: {message}")

📈 Performance Metrics

| Metric | Optimal | Typical | Poor | Optimization |
| --- | --- | --- | --- | --- |
| Evolution Speed | < 10s | 10s-5min | > 5min | Incremental evolution |
| Compatibility | 100% | 90-100% | < 90% | Compatibility testing |
| Storage Overhead | < 5% | 5-20% | > 20% | Schema compression |
| Query Impact | < 1% | 1-10% | > 10% | Schema optimization |
| Rollback Time | < 1min | 1-10min | > 10min | Version management |

🏆 Best Practices

  1. Use additive evolution: Only add new columns, avoid removing or renaming
  2. Test in staging: Always validate schema changes in non-production environments
  3. Maintain backward compatibility: Ensure new schemas can read old data
  4. Use schema registries: Centralize schema management for complex systems
  5. Document schema changes: Maintain clear documentation of evolution history
  6. Monitor schema drift: Track unintended schema changes over time
  7. Validate before writing: Check schema compatibility before data writes
  8. Plan for rollback: Maintain ability to revert schema changes
  9. Automate testing: Include schema compatibility in CI/CD pipelines
  10. Communicate changes: Notify stakeholders of schema evolution
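
Monitoring schema drift (practice 6) can start as a plain diff between the schema a pipeline expects and the one it observes. The sketch below assumes both are available as column-to-type mappings; `schema_drift` and the sample schemas are illustrative, not part of Spark or Delta Lake.

```python
def schema_drift(expected: dict, observed: dict) -> dict:
    """Compare two {column: type} mappings and report what changed."""
    shared = expected.keys() & observed.keys()
    return {
        "added": sorted(observed.keys() - expected.keys()),
        "removed": sorted(expected.keys() - observed.keys()),
        "retyped": sorted(name for name in shared if expected[name] != observed[name]),
    }


expected = {"id": "int", "name": "string", "email": "string"}
observed = {"id": "long", "name": "string", "phone": "string"}

drift = schema_drift(expected, observed)
print(drift)  # {'added': ['phone'], 'removed': ['email'], 'retyped': ['id']}

# A monitoring job could alert whenever any of the three lists is non-empty
has_drift = any(drift.values())
```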

🔗 Related Topics

  • 07-partitioning-strategies.mdx: Partitioning strategies during schema evolution
  • 14-merge-upsert.mdx: Schema changes during merge operations
  • 15-data-quality.mdx: Schema quality and validation

See Also

  • 06-joins-optimization.mdx: Join compatibility with schema evolution
  • 08-caching-persistence.mdx: Caching strategies for evolved schemas
  • Kafka Streams (kafka/03): Schema evolution in streaming systems
  • Data Engineering Streaming (data-engineering/022): Schema management in data pipelines