PySpark DataFrame Operations
Definition: DataFrame
A DataFrame is a distributed collection of data organized into named columns, conceptually equivalent to a table in a relational database or a data frame in R/Python. It is built on top of RDDs, adds a schema (StructType), and is optimized by the Catalyst optimizer.
Definition: Catalyst Optimizer
The Catalyst Optimizer is Spark's extensible query optimizer that translates logical plans into optimized physical plans through four phases: Analysis, Logical Optimization, Physical Planning, and Code Generation.
Definition: Tungsten
Tungsten is Spark's execution engine that focuses on three areas: memory management (compact binary format, optionally off-heap), code generation (whole-stage codegen), and columnar processing (vectorized operations). Together these reduce GC overhead and eliminate most virtual function calls, which can yield 2-10x speedups on suitable workloads.
Definition: Schema
A Schema defines the structure of a DataFrame, specifying column names and data types. Schemas can be inferred from data or explicitly defined. Explicit schemas are recommended for production to avoid schema inference overhead (double I/O).
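Catalyst Query Pipeline
$$Q \;\xrightarrow{\text{analysis}}\; L_{\text{analyzed}} \;\xrightarrow{\text{optimization}}\; L_{\text{optimized}} \;\xrightarrow{\text{planning}}\; P_{\text{physical}} \;\xrightarrow{\text{codegen}}\; \text{RDD}$$
Here,
- $Q$ = Raw SQL or DataFrame API query
- $L_{\text{analyzed}}$ = Logical plan after analysis (column and table references resolved)
- $L_{\text{optimized}}$ = Plan after rule-based optimizations (predicate pushdown, constant folding)
- $P_{\text{physical}}$ = Chosen execution strategy (e.g., sort-merge vs broadcast join)
- $\text{RDD}$ = Final RDD to execute on the cluster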
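DataFrame Serialization Cost
$$\text{Cost}_{\text{ser}} = N \cdot \left( \sum_{i=1}^{C} s_i + h \right)$$
Here,
- $N$ = Number of rows in the DataFrame
- $C$ = Number of columns
- $s_i$ = Serialized size of column $i$
- $h$ = Per-row header/offset overhead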
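A back-of-the-envelope version of this cost model in plain Python, computing rows × (sum of per-column sizes + per-row overhead). The function name and all byte sizes are hypothetical illustrations, not measurements of Tungsten's actual row layout.

```python
def serialization_cost(num_rows, column_sizes, row_overhead):
    """Estimated bytes = N * (sum of per-column sizes + per-row overhead)."""
    return num_rows * (sum(column_sizes) + row_overhead)


# Hypothetical: 1M rows; an int (4 B), a double (8 B), a ~20-byte string; 8 B header per row
cost = serialization_cost(1_000_000, [4, 8, 20], 8)
print(f"{cost:,} bytes (~{cost / 1024**2:.1f} MiB)")  # 40,000,000 bytes (~38.1 MiB)
```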
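Column Pruning Reduction Factor
$$F_{\text{prune}} = \frac{C_{\text{selected}}}{C_{\text{total}}}$$
Here,
- $F_{\text{prune}}$ = Reduction factor for I/O and memory (0 to 1)
- $C_{\text{selected}}$ = Number of columns selected in query
- $C_{\text{total}}$ = Total number of columns in source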
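The pruning factor can be used to estimate how much of a columnar table a query actually reads. This is a plain-Python sketch with hypothetical numbers; like the formula, it assumes all columns are roughly the same size (for real tables, weight by per-column size instead).

```python
def pruning_factor(selected_columns, total_columns):
    """F_prune = C_selected / C_total, assuming equally sized columns."""
    if not 0 < selected_columns <= total_columns:
        raise ValueError("need 0 < selected_columns <= total_columns")
    return selected_columns / total_columns


# Hypothetical: a query touching 3 of 20 columns in a 100 GB columnar table
f_prune = pruning_factor(3, 20)
scanned_gb = 100 * f_prune
print(f"F_prune = {f_prune:.2f}, scanned ~ {scanned_gb:.1f} GB")
```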
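Predicate Pushdown Reduction Factor
$$F_{\text{pushdown}} = \frac{N_{\text{filtered}}}{N_{\text{total}}}$$
Here,
- $F_{\text{pushdown}}$ = Fraction of rows after filtering (0 to 1)
- $N_{\text{filtered}}$ = Number of rows matching filter
- $N_{\text{total}}$ = Total number of rows in source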
DataFrames store rows in Tungsten's compact binary format (which can be kept off-heap) instead of as serialized JVM objects. This reduces GC pressure and, depending on the data, can cut memory usage by roughly 50% compared to RDD-based operations.
Always define an explicit schema when reading data rather than letting Spark infer it. Schema inference requires an extra pass over the data (once to infer the schema, once for actual processing), which can roughly double I/O time on large datasets.
Calling collect() on a large DataFrame brings all of its data to the driver node and can cause an OutOfMemoryError. For large datasets, use take(n), show(n), or df.limit(n).toPandas(); toPandas() on its own also collects everything to the driver.
Predicate Pushdown Optimization
Theorem: Catalyst's predicate pushdown rule reduces the data passed to subsequent operations to a fraction F = N_filtered / N_total of the source rows. Because filters are applied as early as possible in the plan, downstream shuffle and computation costs shrink roughly in proportion to F.
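A rough plain-Python estimate of what this means for a filter that Catalyst moves in front of a shuffle. The row counts and the 100-byte average row size are hypothetical, and the estimate assumes shuffle volume scales linearly with row count.

```python
def pushdown_factor(matching_rows, total_rows):
    """F_pushdown = N_filtered / N_total."""
    if total_rows <= 0 or not 0 <= matching_rows <= total_rows:
        raise ValueError("need total_rows > 0 and 0 <= matching_rows <= total_rows")
    return matching_rows / total_rows


total_rows = 50_000_000      # hypothetical source size
matching_rows = 2_500_000    # hypothetical rows passing the filter
avg_row_bytes = 100          # hypothetical average serialized row size

f_pushdown = pushdown_factor(matching_rows, total_rows)
shuffle_without_pushdown = total_rows * avg_row_bytes   # filter applied after the shuffle
shuffle_with_pushdown = matching_rows * avg_row_bytes   # filter applied before the shuffle
print(f_pushdown, shuffle_with_pushdown / shuffle_without_pushdown)
```

Under the linear assumption, the shuffled bytes shrink by exactly F; in practice skew and per-record overheads make it an approximation.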
- DataFrames add schema + Catalyst optimization on top of RDDs
- Catalyst pipeline: Analysis → Optimization → Physical Planning → Code Generation
- Tungsten binary format reduces serialization cost by ~50% vs JVM objects
- Always define explicit schemas; avoid collect() on large DataFrames
- Use broadcast joins for tables under the threshold (default 10MB)
- Column pruning reduces I/O by F_prune = C_selected / C_total
- Predicate pushdown reduces data volume by F_pushdown = N_filtered / N_total
Figure: Catalyst Optimization Pipeline (architecture diagram)
Detailed Explanation
1. DataFrame Fundamentals
A DataFrame in PySpark is a distributed collection of data organized into named columns, conceptually equivalent to a table in a relational database.
Key Differences from RDDs:
- Schema awareness: the data type of every column is known
- Catalyst optimizer: query plans are optimized automatically
- Tungsten execution: efficient memory management and code generation
- Language integration: the same optimized plans run in the JVM whether the query is written in Python, Scala, Java, or R
2. Schema Design Patterns
Schema design is critical for performance and data quality.
Explicit Schema Definition:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("id", IntegerType(), False), # Not nullable
StructField("name", StringType(), True), # Nullable
StructField("age", IntegerType(), True) # Nullable
])
Schema Considerations:
- Use appropriate data types (IntegerType vs LongType)
- Nullable fields require null checks; set nullable=False only when the data is guaranteed to contain no nulls
- Nested structures (StructType, ArrayType, MapType) affect query patterns
- Avoid deep nesting (>3 levels) for performance
3. Catalyst Optimizer Deep Dive
The Catalyst optimizer transforms logical query plans through several phases:
Analysis Phase:
- Resolves column references to specific tables
- Validates function names and types
- Handles implicit type casts
Optimization Phase:
- Predicate pushdown: Filters pushed closer to data source
- Column pruning: Only required columns are read
- Constant folding: Compile-time evaluation of constant expressions
- Join reordering: Optimal join order based on statistics
- Subquery elimination: Convert to joins
Physical Planning:
- Generates multiple physical execution strategies
- Cost-based optimization using table and column statistics
- Decides between broadcast joins, sort-merge joins, etc.
4. Tungsten Execution Engine
Tungsten is Spark's execution engine with three main components:
| Component | Description |
|---|---|
| Memory Management | Explicit memory management (optionally off-heap) to reduce GC overhead; binary format for data storage |
| Code Generation | Whole-stage code generation (JVM bytecode); eliminates virtual function calls |
| Columnar Processing | Vectorized operations; SIMD-friendly data layout |
5. DataFrame Operations
Transformations (Lazy):
- select(): Choose columns
- filter()/where(): Apply predicates
- groupBy(): Aggregate by columns
- join(): Combine DataFrames
- withColumn(): Add/modify columns
- drop(): Remove columns
- distinct(): Remove duplicates
- sort()/orderBy(): Sort data
Actions (Trigger execution):
- collect(): Return all rows to the driver
- show(): Display the first n rows
- count(): Count rows
- first()/head(): Get the first row
- take(n): Get the first n rows
- write (e.g., df.write.parquet(path)): Save to storage
6. Performance Optimization Techniques
Predicate Pushdown:
# Catalyst pushes the filter down to the data source
df.filter(df.age > 30).select("name", "age")
# Generates: Scan[name, age] → Filter(age > 30)
# Instead of: Scan[all columns] → Filter → Select
Broadcast Join Hints:
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "id")
Bucketing:
df.write.bucketBy(100, "id").saveAsTable("bucketed_table")
7. Common Pitfalls
| Pitfall | Impact | Solution |
|---|---|---|
| Calling collect() on large datasets | Causes OOM on driver | Use take(n) or show(n) |
| Using Python UDFs | Loses Catalyst optimizations | Use built-in functions |
| Not caching reused DataFrames | Recomputes on each action | Call cache() or persist() |
| Incorrect partitioning | Data skew, or too many/too few tasks | Use repartition() (or coalesce() to reduce the partition count) |
| Ignoring data types | Numbers stored as strings need casts in every query | Define explicit schemas |
Best Practice: Always define explicit schemas to avoid schema inference overhead (double I/O).
Key Concepts Table
| Concept | Description | Example |
|---|---|---|
| DataFrame | Distributed collection of data with schema | df = spark.createDataFrame(data, schema) |
| Schema | Structure definition (column names and types) | StructType([StructField("id", IntegerType())]) |
| Catalyst | Query optimizer for logical/physical plans | Automatic optimization of DataFrame operations |
| Tungsten | Execution engine with code generation | Whole-stage code generation, off-heap memory |
| Lazy Evaluation | Transformations build a plan but are not executed until an action | Build plan → Action triggers execution |
| Predicate Pushdown | Filter pushed to data source | df.filter(col("x") > 5) → Scan with filter |
| Column Pruning | Only required columns read | df.select("a", "b") → Scan only a, b |
| Broadcast Join | Small table broadcast to all executors | join(broadcast(small_df)) |
| Bucketing | Data partitioned by hash into files | write.bucketBy(100, "id") |
| Caching | Store DataFrame in memory/disk | df.cache() or df.persist() |
Code Examples
Example 1: DataFrame Creation and Schema
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
spark = SparkSession.builder.appName("DataFrameOps").getOrCreate()
# Method 1: From list of tuples with explicit schema
# StructType: Defines the schema structure
# StructField: Defines each column (name, type, nullable)
schema = StructType([
StructField("id", IntegerType(), False), # Not nullable
StructField("name", StringType(), True), # Nullable
StructField("age", IntegerType(), True), # Nullable
StructField("salary", DoubleType(), True), # Nullable
StructField("department", StringType(), True) # Nullable
])
data = [
(1, "Alice", 30, 75000.0, "Engineering"),
(2, "Bob", 25, 65000.0, "Marketing"),
(3, "Charlie", 35, 90000.0, "Engineering"),
(4, "Diana", 28, 70000.0, "Marketing"),
(5, "Eve", 32, 85000.0, "Engineering")
]
# createDataFrame: Create DataFrame from data and schema
df = spark.createDataFrame(data, schema)
# Method 2: From pandas DataFrame
# Converts pandas DataFrame to Spark DataFrame
import pandas as pd
pandas_df = pd.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
df_from_pandas = spark.createDataFrame(pandas_df)
# Method 3: From CSV with schema inference
# header: First line is column names
# inferSchema: Automatically detect data types (requires extra pass)
df_from_csv = spark.read.csv("data.csv", header=True, inferSchema=True)
# Method 4: From JSON
# By default each line must be a separate JSON object (JSON Lines); pass multiLine=True for multi-line JSON
df_from_json = spark.read.json("data.json")
# Method 5: From Parquet (columnar format)
# Parquet stores schema with data, so no schema needed
df_from_parquet = spark.read.parquet("data.parquet")
# Display schema
df.printSchema()
# root
# |-- id: integer (nullable = false)
# |-- name: string (nullable = true)
# |-- age: integer (nullable = true)
# |-- salary: double (nullable = true)
# |-- department: string (nullable = true)
# Get schema as a JSON string
schema_str = df.schema.json()
print(f"Schema JSON: {schema_str}")
# Get column names and data types
print(f"Columns: {df.columns}")
print(f"Data types: {df.dtypes}")
Example 2: Transformations and Actions
# Select columns
# Parameters: *cols - column names or Column objects
selected = df.select("name", "age", "salary")
selected.show()
# Filter rows
# Parameters: condition - boolean Column expression
young = df.filter(df.age < 30)
young.show()
# Alternative: where() is an alias for filter()
young_alt = df.where(df.age < 30)
# Add computed columns
# Parameters: colName - new column name, col - Column expression
with_bonus = df.withColumn("bonus", df.salary * 0.1)
with_bonus.show()
# Complex transformations (parentheses allow a comment on every line;
# a comment after a backslash continuation is a SyntaxError)
result = (
    df.filter(col("age") > 25)                                       # Keep employees over 25
    .withColumn("bonus", col("salary") * 0.1)                        # Calculate 10% bonus
    .withColumn("total_compensation", col("salary") + col("bonus"))  # Total comp
    .select("name", "department", "total_compensation")              # Select relevant columns
    .orderBy(desc("total_compensation"))                             # Sort by total comp descending
)
result.show()
# Aggregations
# Parameters: *exprs - aggregation expressions
dept_stats = df.groupBy("department").agg(
count("id").alias("employee_count"), # Count employees per department
avg("salary").alias("avg_salary"), # Average salary
max("salary").alias("max_salary"), # Maximum salary
min("salary").alias("min_salary"), # Minimum salary
sum("salary").alias("total_salary") # Total salary
)
dept_stats.show()
# Actions
print(f"Total rows: {df.count()}")
print(f"First row: {df.first()}")
print(f"Schema: {df.dtypes}")
# limit: DataFrame with at most n rows (a lazy transformation, not an action)
first_5 = df.limit(5)
first_5.show()
# distinct: Remove duplicate rows
distinct_depts = df.select("department").distinct()
distinct_depts.show()
# drop: Remove columns
df_without_age = df.drop("age")
df_without_age.show()
Example 3: Join Operations
# Create second DataFrame
department_info = spark.createDataFrame([
("Engineering", "San Francisco", 50),
("Marketing", "New York", 30),
("Sales", "Chicago", 40)
], ["department", "location", "headcount"])
# Inner join (default)
# Parameters: other - DataFrame to join with
# on - join condition (column name or expression)
# how - join type (default "inner")
inner_joined = df.join(department_info, "department", "inner")
inner_joined.show()
# Left join (keep all rows from left DataFrame)
# Parameters: how="left" or "left_outer"
left_joined = df.join(department_info, "department", "left")
left_joined.show()
# Broadcast join for performance
# broadcast(): Hint to broadcast the small DataFrame
# Avoids shuffle by sending small table to all executors
from pyspark.sql.functions import broadcast
broadcast_joined = df.join(broadcast(department_info), "department")
# Complex join with multiple conditions
# Parameters: on - a single boolean Column; combine multiple conditions with &
complex_join = df.join(
department_info,
(df.department == department_info.department) &
(df.salary > 50000),
"inner"
)
# Verify join strategy using explain()
broadcast_joined.explain()
# Right join (keep all rows from right DataFrame)
right_joined = df.join(department_info, "department", "right")
right_joined.show()
# Full outer join (keep all rows from both DataFrames)
full_joined = df.join(department_info, "department", "outer")
full_joined.show()
# Left semi join (return rows from left that have match in right)
left_semi = df.join(department_info, "department", "leftsemi")
left_semi.show()
# Left anti join (return rows from left that have NO match in right)
left_anti = df.join(department_info, "department", "leftanti")
left_anti.show()
Example 4: Window Functions
from pyspark.sql.window import Window
# Define window specification
# partitionBy: Divide data into partitions (like GROUP BY)
# orderBy: Define ordering within partition
window_spec = Window.partitionBy("department").orderBy(desc("salary"))
# Add row number within department
# row_number(): Assigns unique sequential number to each row
df_with_rank = df.withColumn(
"rank",
row_number().over(window_spec) # .over() applies window function
)
# Add salary statistics per department
# avg().over(): Calculate average within partition
df_with_stats = df.withColumn(
"dept_avg_salary",
avg("salary").over(Window.partitionBy("department"))
).withColumn(
"salary_vs_avg",
col("salary") - col("dept_avg_salary") # Difference from average
)
# Running total
# rowsBetween: Define frame boundaries
# unboundedPreceding to currentRow: Sum from start to current row
running_window = Window.partitionBy("department").orderBy("id").rowsBetween(
Window.unboundedPreceding, # Start from first row
Window.currentRow # End at current row
)
df_running = df.withColumn(
"running_total",
sum("salary").over(running_window)
)
# Dense rank: Like rank but no gaps in ranking
df_dense_rank = df.withColumn(
"dense_rank",
dense_rank().over(window_spec)
)
# Percent rank: relative rank in [0, 1], computed as (rank - 1) / (rows in partition - 1)
df_percent_rank = df.withColumn(
"percent_rank",
percent_rank().over(window_spec)
)
# Cumulative distribution
df_cume_dist = df.withColumn(
"cume_dist",
cume_dist().over(window_spec)
)
# Lag: Get value from previous row
lag_window = Window.partitionBy("department").orderBy("id")
df_with_lag = df.withColumn(
"previous_salary",
lag("salary", 1).over(lag_window) # 1 row before
)
# Lead: Get value from next row
df_with_lead = df.withColumn(
"next_salary",
lead("salary", 1).over(lag_window) # 1 row after
)
df_with_stats.show()
Example 5: Pivot and Aggregation
# Pivot: Rotate rows into columns
# Parameters: pivot_col - column to pivot
# values - optional list of values to pivot (default: all unique values)
pivoted = df.groupBy("name").pivot("department").sum("salary")
pivoted.show()
# Pivot with explicit values (faster: Spark skips the extra job that computes the distinct pivot values)
pivoted_limited = df.groupBy("name").pivot("department", ["Engineering", "Marketing"]).sum("salary")
pivoted_limited.show()
# Unpivot (melt): Rotate columns into rows
# Using stack function
df_melted = df.select(
"name",
expr("stack(2, 'salary', salary, 'bonus', salary * 0.1) as (metric, value)")
)
df_melted.show()
# Cube: Multi-dimensional aggregation
# Parameters: *cols - columns to aggregate over
# Produces subtotals for all combinations
cube_result = df.cube("department", "age").agg(
count("*").alias("count"),
avg("salary").alias("avg_salary")
)
cube_result.show()
# Rollup: Hierarchical aggregation
# Parameters: *cols - columns in hierarchical order
# Produces subtotals for prefixes
rollup_result = df.rollup("department", "age").agg(
count("*").alias("count"),
avg("salary").alias("avg_salary")
)
rollup_result.show()
Performance Metrics
| Operation | DataFrame (ms) | RDD (ms) | Pandas (ms) | Improvement |
|---|---|---|---|---|
| Read 1GB CSV | 1200 | 3500 | 2800 | 2.9x vs RDD |
| Filter 1M rows | 45 | 120 | 35 | 2.7x vs RDD |
| GroupBy + Agg | 85 | 250 | 60 | 2.9x vs RDD |
| Join 10M rows | 320 | 850 | N/A | 2.7x vs RDD |
| Sort 1M rows | 95 | 280 | 45 | 2.9x vs RDD |
| Write Parquet | 180 | 500 | N/A | 2.8x vs RDD |
| Memory Usage (relative) | 1x | 3x | 1.5x | 3x vs RDD |
| GC Pause (ms) | 5 | 45 | N/A | 9x vs RDD |
Best Practices
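1. Use Built-in Column Expressions, Not Python UDFs
from pyspark.sql.functions import col, expr, udf
from pyspark.sql.types import IntegerType
# BAD: Python UDF (rows are shipped to a Python worker; Catalyst cannot optimize inside it)
add_one = udf(lambda age: None if age is None else age + 1, IntegerType())
df.withColumn("new", add_one("age"))
# GOOD: Built-in Column expression (df.age + 1 is equivalent; neither creates a UDF)
df.withColumn("new", col("age") + 1)
# GOOD: Use SQL expressions for complex logic
df.withColumn("new", expr("CASE WHEN age > 30 THEN 'Senior' ELSE 'Junior' END"))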
2. Cache Repeatedly Used DataFrames
# Cache DataFrame used multiple times
df = spark.read.parquet("large_dataset.parquet")
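df.cache()  # Default DataFrame level is memory-and-disk, not MEMORY_ONLY
df.unpersist()  # Release it before switching to a different storage level
# Use appropriate storage level for large datasets
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)  # Memory with disk spill (PySpark's StorageLevel has no *_SER variants)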
# Remember to unpersist when done
df.unpersist()
3. Avoid collect() on Large DataFrames
# BAD: Brings all data to driver (causes OOM)
all_data = df.collect()
# GOOD: Use take for small samples
first_100 = df.take(100)
# GOOD: Use show for display
df.show(20)
# GOOD: toPandas() also collects to the driver, so bound the row count with limit() first
pandas_df = df.limit(1000).toPandas()
4. Use Broadcast Joins for Small Tables
from pyspark.sql.functions import broadcast
# Explicit hint: small_df is broadcast even above the automatic threshold (default 10MB), so keep it small
result = large_df.join(broadcast(small_df), "key")
# Configure broadcast threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "52428800") # 50MB
# Check join strategy
result.explain() # Look for BroadcastHashJoin
5. Optimize Data Types
# Use appropriate types
from pyspark.sql.types import *
# BAD: Using StringType for numeric data (numeric filters need casts, which can block predicate pushdown)
df = spark.createDataFrame([(1, "100")], ["id", "amount"])
# GOOD: Use proper types (filters can be pushed down directly)
schema = StructType([
StructField("id", IntegerType()),
StructField("amount", IntegerType())
])
df = spark.createDataFrame([(1, 100)], schema)
# Use ByteType/ShortType for small numbers (can save space, e.g. in columnar formats)
from pyspark.sql.types import ByteType, ShortType
small_schema = StructType([
StructField("id", IntegerType()),
StructField("flag", ByteType()), # 1 byte vs 4 bytes for IntegerType
StructField("category", ShortType()) # 2 bytes vs 4 bytes
])
6. Partition Strategically
# Repartition for write operations (parallel writes)
df.repartition(100, "department").write.parquet("output/")
# Coalesce to reduce partitions (no shuffle)
df.coalesce(10).write.parquet("output_small/")
# Partition by column for partition pruning on reads (assumes df has year and month columns)
df.write.partitionBy("year", "month").parquet("output_partitioned/")
7. Use SQL for Complex Queries
# Create temp view for SQL access
df.createOrReplaceTempView("employees")
# SQL can be more readable for complex queries
result = spark.sql("""
SELECT department,
COUNT(*) as cnt,
AVG(salary) as avg_sal,
MAX(salary) as max_sal
FROM employees
WHERE age > 25
GROUP BY department
HAVING COUNT(*) > 1
ORDER BY avg_sal DESC
""")
result.show()
Key Takeaways
- DataFrames provide schema + Catalyst optimization on top of RDDs
- Catalyst pipeline: Analysis → Optimization → Physical Planning → Code Generation
- Tungsten binary format reduces serialization cost by ~50% vs JVM objects
- Column pruning: F_prune = C_selected / C_total (reduces I/O)
- Predicate pushdown: F_pushdown = N_filtered / N_total (reduces data volume)
- Always define explicit schemas to avoid double I/O from schema inference
- Use broadcast joins for tables under the threshold (default 10MB)
- Avoid collect() on large DataFrames; use take(), show(), or limit(n).toPandas()
- Use window functions for analytics without self-joins
- Cache DataFrames reused across multiple actions