
Topic: Schema Design and Complex Types



PySpark Advanced Interview Series

Module 17: Schema Design - Structuring Data for Performance

Google | Apple | Difficulty: Hard

Interview Question

"At Google, we design schemas for petabyte-scale analytical workloads. Walk us through the trade-offs between flat and nested schemas, how structs and arrays affect query performance, and how you would implement schema evolution for a rapidly changing data source." (Google Data Engineer Interview)

"At Apple, we process complex semi-structured data from our services. Explain how Spark handles JSON with nested structures, how to optimize queries on complex types, and when to flatten nested data vs keep it nested." (Apple Senior Data Engineer Interview)


Schema Definition

from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType,
    DoubleType, ArrayType, MapType, BooleanType, TimestampType
)

spark = SparkSession.builder.appName("SchemaInterview").getOrCreate()

# Define complex schema (third StructField argument = nullable)
schema = StructType([
    StructField("user_id", StringType(), False),
    StructField("name", StringType(), False),
    StructField("age", IntegerType(), True),
    StructField("email", StringType(), True),
    StructField("address", StructType([
        StructField("street", StringType(), True),
        StructField("city", StringType(), True),
        StructField("state", StringType(), True),
        StructField("zipcode", StringType(), True)
    ]), True),
    StructField("phone_numbers", ArrayType(StringType()), True),
    StructField("preferences", MapType(StringType(), StringType()), True),
    StructField("is_active", BooleanType(), True),
    StructField("created_at", TimestampType(), True)
])

# Create DataFrame with schema
data = [(
    "user_001",
    "Alice",
    30,
    "alice@email.com",
    ("123 Main St", "San Francisco", "CA", "94102"),  # struct value as a tuple in field order
    ["555-1234", "555-5678"],
    {"theme": "dark", "language": "en"},
    True,
    datetime(2024, 1, 1)  # TimestampType requires a datetime, not a string
)]

df = spark.createDataFrame(data, schema)
df.printSchema()

Complex Types

StructType (Nested Objects)

from pyspark.sql.functions import col, struct

# Access nested fields with dot notation
df.select(
    col("user_id"),
    col("address.city").alias("city"),
    col("address.state").alias("state")
).show()

# Flatten struct to columns: "address.*" expands every field of the struct
df_flat = df.select(
    "user_id",
    col("address.*")
)
df_flat.show()

# Create a struct from flat columns (the reverse operation)
df_nested = df_flat.select(
    "user_id",
    struct(
        col("street"),
        col("city"),
        col("state"),
        col("zipcode")
    ).alias("address")
)
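For structs nested several levels deep, typing out each path gets tedious. A common pattern is to generate the select list from the schema. The sketch below is a hypothetical helper, not a PySpark API: it assumes the schema is described as a plain dict (a nested dict marks a struct, any other value is a leaf type name) and that field names contain no dots needing backtick quoting.

```python
def flatten_select_exprs(schema, prefix=None, sep="_"):
    """Return selectExpr-style strings that lift every nested leaf to the top level.

    Hypothetical helper: `schema` is a plain dict in which a nested dict marks a
    struct and any other value is a leaf type name.
    """
    exprs = []
    for name, dtype in schema.items():
        path = name if prefix is None else f"{prefix}.{name}"
        if isinstance(dtype, dict):
            # Struct field: recurse, extending the dotted path.
            exprs.extend(flatten_select_exprs(dtype, path, sep))
        else:
            # Leaf field: alias the dotted path to an underscore-joined name.
            alias = path.replace(".", sep)
            exprs.append(path if alias == path else f"{path} AS {alias}")
    return exprs


user_schema = {
    "user_id": "string",
    "address": {"street": "string", "city": "string", "state": "string", "zipcode": "string"},
}
print(flatten_select_exprs(user_schema))
```

With a real DataFrame, the resulting list could be passed as `df.selectExpr(*exprs)`; a PySpark version of the helper would walk `df.schema.fields` and test `isinstance(field.dataType, StructType)` instead of checking for dicts.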

ArrayType (Lists)

from pyspark.sql.functions import (
    array, array_contains, array_distinct, col, explode, flatten, lit, size, sort_array
)

# Access array elements
df.select(
    col("user_id"),
    col("phone_numbers")[0].alias("primary_phone"),  # 0-based index
    size("phone_numbers").alias("phone_count"),
    array_contains("phone_numbers", "555-1234").alias("has_primary")
).show()

# Explode arrays (one output row per element; empty or NULL arrays produce no rows)
df.select(
    col("user_id"),
    explode(col("phone_numbers")).alias("phone")
).show()

# flatten() merges an array of arrays into a single array. phone_numbers is
# array<string>, so build a nested array first ("555-0000" is an example value)
df.select(
    col("user_id"),
    flatten(array(col("phone_numbers"), array(lit("555-0000")))).alias("all_phones")
).show()

# Array operations
df.select(
    col("user_id"),
    array_distinct(col("phone_numbers")).alias("unique_phones"),
    sort_array(col("phone_numbers")).alias("sorted_phones")
).show()

MapType (Key-Value Pairs)

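from pyspark.sql.functions import col, create_map, explode, lit, map_keys, map_values, size

# Access map values by key
df.select(
    col("user_id"),
    col("preferences")["theme"].alias("theme"),
    col("preferences")["language"].alias("language"),
    map_keys("preferences").alias("pref_keys"),
    map_values("preferences").alias("pref_values"),
    size("preferences").alias("pref_count")
).show()

# Explode maps (one row per key-value pair, in columns named key and value)
df.select(
    col("user_id"),
    explode(col("preferences")).alias("key", "value")
).show()

# Create a map from columns with create_map (alternating key, value arguments)
df_prefs = df.select(
    col("user_id"),
    col("preferences")["theme"].alias("theme"),
    col("preferences")["language"].alias("language")
)
df_prefs.select(
    col("user_id"),
    create_map(
        lit("theme"), col("theme"),
        lit("language"), col("language")
    ).alias("preferences")
)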

Schema Evolution

Adding Columns

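from pyspark.sql.functions import lit

# Add new column to existing data
df_new = df.withColumn("new_column", lit("value"))

# Append files that carry the extra column. For plain Parquet, mergeSchema is a
# read option; the writer simply writes each file with its own schema.
df_new.write \
    .mode("append") \
    .parquet("s3a://bucket/data/")

# Read with schema merging: the result has the union of all columns, and rows
# from older files get null in new_column
df_evolved = spark.read \
    .option("mergeSchema", "true") \
    .parquet("s3a://bucket/data/")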
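What schema merging produces can be pictured with a simplified model. This sketch is not Spark's implementation: it represents each file's schema as a `{column: type_name}` dict, takes the union of top-level columns, and treats any type difference for the same column as a conflict. Spark's real merge also recurses into nested structs, which this sketch does not.

```python
def merge_schemas(*schemas):
    """Union of top-level columns across file schemas (simplified model)."""
    merged = {}
    for schema in schemas:
        for name, dtype in schema.items():
            if name in merged and merged[name] != dtype:
                # Same column, different type: treated as unmergeable here.
                raise ValueError(f"Conflicting types for {name!r}: {merged[name]} vs {dtype}")
            merged.setdefault(name, dtype)
    return merged


old_files = {"user_id": "string", "age": "int"}
new_files = {"user_id": "string", "age": "int", "new_column": "string"}
print(merge_schemas(old_files, new_files))
```

Rows coming from `old_files` simply have no value for `new_column`, which is why they read back as null after the merge.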

Renaming Columns

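# Renaming is tricky with Parquet: Spark resolves Parquet columns by name, so
# files written before the rename still carry old_name and read as null under new_name

# Plain Parquet: rewrite the data with the new name. Write to a new location,
# since Spark cannot overwrite a path it is reading from in the same job.
df.withColumnRenamed("old_name", "new_name") \
    .write \
    .mode("overwrite") \
    .parquet("s3a://bucket/data_renamed/")  # example target path

# Or use Delta Lake: with column mapping enabled, a rename is a metadata-only change
spark.sql("""
    ALTER TABLE delta.`s3a://bucket/delta/` SET TBLPROPERTIES (
        'delta.columnMapping.mode' = 'name',
        'delta.minReaderVersion' = '2',
        'delta.minWriterVersion' = '5'
    )
""")
spark.sql("ALTER TABLE delta.`s3a://bucket/delta/` RENAME COLUMN old_name TO new_name")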
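Why a rename without a rewrite loses data can be shown with a toy model of a name-based columnar reader. This is an illustration, not Spark code: each "file" is a list of row dicts keyed by column name, and the reader projects the requested names, filling `None` when a file lacks a column.

```python
def read_columns(files, columns):
    """Project the requested column names from every file; missing columns become None."""
    rows = []
    for file_rows in files:
        for row in file_rows:
            # Lookup is by name, not position, as with Spark's Parquet reader.
            rows.append({c: row.get(c) for c in columns})
    return rows


old_file = [{"user_id": "u1", "old_name": "a"}]   # written before the rename
new_file = [{"user_id": "u2", "new_name": "b"}]   # written after the rename
print(read_columns([old_file, new_file], ["user_id", "new_name"]))
```

The value `"a"` is still on disk but is invisible under `new_name`, which is the gap that a full rewrite or Delta column mapping closes.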

Changing Data Types

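from pyspark.sql.functions import col

# Change data type (Parquet files cannot be altered in place, so rewrite the data)
df_typed = df.withColumn("age", col("age").cast("string"))

# Write with the new schema (to a new location; example target path)
df_typed.write \
    .mode("overwrite") \
    .parquet("s3a://bucket/data_typed/")

# For Delta Lake: the data is still rewritten, but overwriteSchema lets the
# table's schema be replaced in the same transaction
df_typed.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save("s3a://bucket/delta/")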

Real-World Scenario: Google Nested Schema Optimization

Problem Statement

Design an optimal schema for a clickstream dataset with nested user sessions, events, and properties. Optimize for both analytics and point lookups.

from pyspark.sql import SparkSession
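from pyspark.sql.functions import (
    approx_count_distinct, avg, col, count, countDistinct, explode
)
from pyspark.sql.types import (
    ArrayType, IntegerType, MapType, StringType, StructField, StructType, TimestampType
)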

spark = SparkSession.builder \
    .appName("GoogleSchemaOptimization") \
    .getOrCreate()

# === DESIGN: Nested Schema ===
clickstream_schema = StructType([
    StructField("session_id", StringType(), False),
    StructField("user_id", StringType(), False),
    StructField("device", StructType([
        StructField("type", StringType(), False),
        StructField("os", StringType(), True),
        StructField("browser", StringType(), True),
        StructField("screen_resolution", StringType(), True)
    ]), False),
    StructField("events", ArrayType(StructType([
        StructField("event_type", StringType(), False),
        StructField("event_time", TimestampType(), False),
        StructField("page_url", StringType(), True),
        StructField("properties", MapType(StringType(), StringType()), True)
    ])), False),
    StructField("session_start", TimestampType(), False),
    StructField("session_end", TimestampType(), True),
    StructField("total_events", IntegerType(), False)
])

# Read raw clickstream data
raw_clickstream = spark.read \
    .schema(clickstream_schema) \
    .json("s3a://google-data/clickstream/")

# === QUERY: Extract insights from nested data ===

# 1. Device distribution
device_stats = raw_clickstream \
    .groupBy("device.type", "device.os") \
    .agg(
        count("*").alias("session_count"),
        avg("total_events").alias("avg_events")
    )

# 2. Event type analysis (explode events array)
event_analysis = raw_clickstream \
    .select(
        "session_id",
        "user_id",
        explode("events").alias("event")
    ) \
    .select(
        "session_id",
        "user_id",
        col("event.event_type"),
        col("event.event_time"),
        col("event.page_url"),
        col("event.properties")
    ) \
    .groupBy("event_type") \
    .agg(
        count("*").alias("event_count"),
        approx_count_distinct("session_id").alias("unique_sessions")
    )

# 3. Page popularity (extract from nested events)
page_popularity = raw_clickstream \
    .select(explode("events").alias("event")) \
    .select(col("event.page_url").alias("page_url")) \
    .groupBy("page_url") \
    .count() \
    .orderBy(col("count").desc())

# 4. Property analysis (extract from nested map)
property_analysis = raw_clickstream \
    .select(
        "session_id",
        explode("events").alias("event")
    ) \
    .select(
        "session_id",
        explode(col("event.properties")).alias("key", "value")
    ) \
    .groupBy("key") \
    .agg(
        count("*").alias("occurrences"),
        countDistinct("value").alias("unique_values")
    )

# Show results
device_stats.show(20, truncate=False)
event_analysis.show(20, truncate=False)
page_popularity.show(20, truncate=False)
property_analysis.show(20, truncate=False)

spark.stop()
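To sanity-check what `event_analysis` computes, it can help to trace the explode-then-group logic on a few in-memory records. The sketch below uses hypothetical sample sessions and plain Python; it counts distinct sessions exactly, whereas the Spark query uses `approx_count_distinct`, so on large data the two can differ slightly.

```python
from collections import defaultdict

# Hypothetical sample of nested clickstream rows.
sessions = [
    {"session_id": "s1", "events": [{"event_type": "view"}, {"event_type": "click"}]},
    {"session_id": "s2", "events": [{"event_type": "view"}]},
    {"session_id": "s3", "events": []},  # contributes no rows, as with explode()
]


def event_stats(sessions):
    """Return {event_type: (event_count, unique_sessions)}."""
    counts = defaultdict(int)
    session_sets = defaultdict(set)
    for s in sessions:
        for e in s["events"]:                      # explode("events")
            t = e["event_type"]
            counts[t] += 1                         # count("*")
            session_sets[t].add(s["session_id"])   # exact distinct session count
    return {t: (counts[t], len(session_sets[t])) for t in counts}


print(event_stats(sessions))
```

Small hand-checked fixtures like this are a convenient basis for unit-testing the Spark job on a local session.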

Schema Design Best Practices

Flat vs Nested

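| Aspect | Flat Schema | Nested Schema |
|---|---|---|
| Query simplicity | Simple | Complex |
| Storage efficiency | Lower (exploded arrays repeat parent fields) | Higher |
| Read performance | Faster for simple column scans | Slower when arrays must be exploded |
| Write performance | Faster | Slower |
| Schema evolution | Easier | Harder |
| Use case | Simple analytics | Complex hierarchical data |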

When to Flatten

Flatten when:
1. Nested fields are accessed frequently
2. Queries are simple analytics
3. Joins use nested fields as keys
4. The dataset is small

Keep nested when:
1. The data is naturally hierarchical
2. Individual nested fields are rarely accessed
3. The dataset is large (flattening arrays duplicates parent fields)
4. The workload is write-heavy

Schema Enforcement

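# Enforce schema on read (expected_schema is a StructType, e.g. the schema
# from the Schema Definition section)
expected_schema = schema
df = spark.read.schema(expected_schema).parquet("s3a://bucket/data/")

# Validate schema (useful when data was read without an explicit schema)
def validate_schema(df, expected):
    actual = {f.name: f.dataType for f in df.schema.fields}
    expected_types = {f.name: f.dataType for f in expected.fields}  # fields is a list, not a method

    for name, dtype in expected_types.items():
        if name not in actual:
            raise ValueError(f"Missing column: {name}")
        if actual[name] != dtype:
            raise ValueError(
                f"Type mismatch: {name} is {actual[name].simpleString()}, "
                f"expected {dtype.simpleString()}"
            )

    return True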
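In a pipeline it is often more useful to report every schema problem at once than to stop at the first one. The variant below is a sketch under an assumption: both schemas are given as `{column: type_name}` dicts. With Spark, such a dict could be built as `{f.name: f.dataType.simpleString() for f in df.schema.fields}`. The function name `schema_problems` is hypothetical.

```python
def schema_problems(actual, expected):
    """List missing, mistyped, and unexpected columns (schemas as {name: type} dicts)."""
    problems = []
    for name, dtype in expected.items():
        if name not in actual:
            problems.append(f"missing column: {name}")
        elif actual[name] != dtype:
            problems.append(f"type mismatch: {name} is {actual[name]}, expected {dtype}")
    for name in actual:
        if name not in expected:
            problems.append(f"unexpected column: {name}")
    return problems


actual = {"user_id": "string", "age": "string", "extra": "int"}
expected = {"user_id": "string", "age": "int", "email": "string"}
for problem in schema_problems(actual, expected):
    print(problem)
```

Whether an unexpected column should count as a failure is a policy choice; during additive schema evolution it is usually safe to log it rather than reject the data.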

Edge Cases

1. NULL in Complex Types

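from pyspark.sql.functions import col

# NULL struct (explicit DDL schema so the struct fields are named city and state)
df = spark.createDataFrame([
    (1, None),
    (2, ("Austin", "TX"))
], "id INT, address STRUCT<city: STRING, state: STRING>")

# Accessing a field of a NULL struct returns NULL instead of raising an error.
# The result column is named city: NULL for id 1, "Austin" for id 2.
df.select(col("address.city")).show()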

2. Empty Arrays/Maps

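from pyspark.sql.functions import size

# An empty array is not NULL: size() returns 0 for it, and explode() drops the row
df = spark.createDataFrame([
    (1, []),
    (2, ["a", "b"])
], "id INT, items ARRAY<STRING>")

df.select(size("items")).show()
# +-----------+
# |size(items)|
# +-----------+
# |          0|
# |          2|
# +-----------+

# For a NULL array, size() returns -1 unless ANSI mode is enabled or
# spark.sql.legacy.sizeOfNull is false, in which case it returns NULL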
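The difference between empty, NULL, and populated arrays matters most when exploding. The sketch below is a pure-Python model of the documented row semantics of Spark's `explode()` and `explode_outer()`, not Spark itself: `explode()` emits one row per element and drops rows whose array is empty or NULL, while `explode_outer()` keeps such rows with a single NULL element. For simplicity the model reuses the array's key for the element, whereas Spark names the output column `col` unless aliased.

```python
def explode_rows(rows, key, outer=False):
    """Model of explode (outer=False) and explode_outer (outer=True) on dict rows."""
    out = []
    for row in rows:
        items = row[key]
        if items:
            # Non-empty array: one output row per element.
            out.extend({**row, key: item} for item in items)
        elif outer:
            # Empty or None array: explode_outer keeps the row with a null element.
            out.append({**row, key: None})
    return out


rows = [{"id": 1, "items": []}, {"id": 2, "items": ["a", "b"]}, {"id": 3, "items": None}]
print(explode_rows(rows, "items"))
print(explode_rows(rows, "items", outer=True))
```

If downstream counts must include entities with no elements (for example, sessions with zero events), `explode_outer()` is the safer choice.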

3. Deeply Nested Data

# Deeply nested data can be slow to query
# Consider flattening frequently accessed paths

# Bad: 5+ levels deep
data = {"level1": {"level2": {"level3": {"level4": {"level5": "value"}}}}}

# Better: keep hot paths at most 2-3 levels deep, e.g. promote the path to one field
data = {"level1_level2_level3_level4_level5": "value"}
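Flattening does not have to be all or nothing. The sketch below uses a hypothetical helper, `promote_paths`, that copies a few frequently accessed deep paths into top-level fields while leaving the rest of the record nested. A missing path yields `None`, mirroring how struct field access on a NULL parent returns NULL. In PySpark the same idea would be a `withColumn("level1_level2_level3_level4_level5", col("level1.level2.level3.level4.level5"))` per hot path.

```python
def promote_paths(record, paths, sep="_"):
    """Copy dotted paths from a nested dict into top-level keys joined by `sep`."""
    out = dict(record)  # shallow copy: the nested part is kept as-is
    for path in paths:
        value = record
        for part in path.split("."):
            # Walk one level; stop with None once the path leaves the dicts.
            value = value.get(part) if isinstance(value, dict) else None
        out[path.replace(".", sep)] = value
    return out


data = {"level1": {"level2": {"level3": {"level4": {"level5": "value"}}}}}
promoted = promote_paths(data, ["level1.level2.level3.level4.level5"])
print(promoted["level1_level2_level3_level4_level5"])
```

Keeping the original nested column alongside the promoted ones trades a little storage for fast access to the hot paths without losing the full structure.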

Performance Considerations

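| Operation | Flat | Nested | Impact |
|---|---|---|---|
| SELECT specific columns | Fast | Slower | Struct fields can be pruned (nested schema pruning is on by default since Spark 3.0); pruning inside arrays and maps is more limited |
| FILTER on nested field | Fast | Slower | Predicate pushdown is limited; filters on array or map contents are not pushed down to the file scan |
| JOIN on nested field | Fast | Slower | Join keys must be extracted first |
| Aggregation | Fast | Slower | Arrays must be exploded or flattened first |
| Storage | More | Less | Nested is more compact |
| Write | Faster | Slower | Serialization overhead |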
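The storage row above comes mostly from denormalization: when an array is exploded into one row per element, every parent field is repeated on each row. The sketch below counts logical scalar values for a hypothetical session in both layouts. It ignores columnar encodings such as dictionary and run-length encoding, which compress repeated values, so the on-disk gap is usually smaller than these counts suggest.

```python
def stored_values(sessions):
    """Return (nested_count, flat_count) of logical scalar values.

    Assumes each session dict has an "events" list plus scalar parent fields.
    """
    nested = flat = 0
    for s in sessions:
        parent_fields = len(s) - 1                      # every key except "events"
        event_fields = sum(len(e) for e in s["events"])
        nested += parent_fields + event_fields          # parents stored once
        flat += event_fields + parent_fields * len(s["events"])  # parents repeated per event
    return nested, flat


sessions = [{
    "session_id": "s1",
    "user_id": "u1",
    "events": [
        {"event_type": "view", "page_url": "/a"},
        {"event_type": "click", "page_url": "/b"},
        {"event_type": "view", "page_url": "/c"},
    ],
}]
print(stored_values(sessions))
```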

💡 Design Principle

Design schemas based on query patterns. If you frequently query nested fields, flatten them. If data is naturally hierarchical and rarely accessed individually, keep it nested. The optimal schema balances storage efficiency with query performance.


Summary

Schema design impacts query performance, storage costs, and maintenance complexity. Nested schemas (structs, arrays, maps) are more compact but harder to query. Flat schemas are simpler but use more storage. At Google and Apple, the choice depends on query patterns and data access frequency.
