Cost Optimization for PySpark Workloads
Cost Framework & TCO Comparison
Architecture Diagram: Cost Optimization Framework
Architecture Diagram: Spot Instance Strategy
Detailed Explanation
Cost optimization for PySpark workloads requires a systematic approach that addresses compute, storage, network, and operational costs simultaneously. The largest cost component is typically compute (60-70% of total spend), followed by storage (20-25%), network (5-10%), and operations (5%). Each component offers distinct optimization opportunities with varying effort-to-savings ratios.
What is Compute Optimization?
Compute optimization focuses on right-sizing instances, leveraging spot/preemptible instances, implementing auto-scaling, and optimizing Spark configurations.
| Strategy | Description | Savings |
|---|---|---|
| Spot Instances | Use spot/preemptible instances for burst capacity with minimal risk when properly configured | 60-80% over on-demand pricing |
| Right-Sizing | Match instance types to workload characteristics | 30-50% |
| Auto-Scaling | Dynamically adjust cluster size based on workload demands | 20-40% |
Spot Instance Best Practices:
- Maintain a small base of on-demand instances for guaranteed capacity
- Enable checkpointing every 10-30 seconds to minimize data loss on spot interruptions
- Configure multiple instance types (r5, r4, m5) for diversification across spot pools
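The effect of keeping a small on-demand base while running the rest on spot can be sketched with simple arithmetic. The node counts, the $0.50 node-hour price, and the 70% discount below are illustrative assumptions, not quoted prices.

```python
def blended_hourly_cost(on_demand_nodes, spot_nodes, on_demand_price, spot_discount):
    """Hourly cost of a cluster mixing on-demand and spot nodes of one type.

    spot_discount is the fractional saving versus on-demand (0.7 means 70%).
    """
    if not 0.0 <= spot_discount < 1.0:
        raise ValueError("spot_discount must be in [0, 1)")
    spot_price = on_demand_price * (1.0 - spot_discount)
    return on_demand_nodes * on_demand_price + spot_nodes * spot_price


# Hypothetical 10-node cluster at $0.50 per node-hour (illustrative price).
all_on_demand = blended_hourly_cost(10, 0, 0.50, 0.70)
mixed = blended_hourly_cost(2, 8, 0.50, 0.70)  # 2 on-demand base + 8 spot
savings_fraction = 1.0 - mixed / all_on_demand

print(f"all on-demand: ${all_on_demand:.2f}/h")
print(f"2 on-demand + 8 spot: ${mixed:.2f}/h (saving {savings_fraction:.0%})")
```

Because the on-demand base is billed at full price, the cluster-level saving is lower than the per-instance spot discount.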
Right-Sizing Guidelines:
| Workload Type | Recommended Instance Type | Example |
|---|---|---|
| CPU-bound transformations (joins, aggregations) | Compute-optimized instances | c5, c6g |
| Memory-intensive operations (large shuffles, caching) | Memory-optimized instances | r5, r6g |
| Mixed workloads | General-purpose instances | m5, m6g |
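One way to apply the table is to compare the memory a job needs per core with the memory-to-vCPU ratio of each family. The sketch below assumes the usual ratios of roughly 2, 4, and 8 GiB per vCPU for the compute-optimized, general-purpose, and memory-optimized families; `recommend_family` is a hypothetical helper, and the peak memory figure would come from your own profiling.

```python
# Approximate GiB of memory per vCPU for the families in the table above,
# ordered from least to most memory per core.
FAMILY_GIB_PER_VCPU = [
    ("compute-optimized (c5, c6g)", 2.0),
    ("general-purpose (m5, m6g)", 4.0),
    ("memory-optimized (r5, r6g)", 8.0),
]


def recommend_family(peak_memory_gib, cores_used):
    """Return the leanest family whose memory-per-core ratio covers the need."""
    if cores_used <= 0:
        raise ValueError("cores_used must be positive")
    needed_per_core = peak_memory_gib / cores_used
    for name, ratio in FAMILY_GIB_PER_VCPU:
        if needed_per_core <= ratio:
            return name
    # Demand exceeds every ratio: fall back to the most memory-rich family.
    return FAMILY_GIB_PER_VCPU[-1][0]


print(recommend_family(peak_memory_gib=6, cores_used=4))   # 1.5 GiB per core
print(recommend_family(peak_memory_gib=12, cores_used=4))  # 3 GiB per core
print(recommend_family(peak_memory_gib=40, cores_used=4))  # 10 GiB per core
```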
What is Auto-Scaling?
Auto-scaling dynamically adjusts cluster size based on workload demands.
Configuration Tips:
- Configure minimum and maximum executor counts based on historical patterns
- Use scale-down cooldown periods to prevent thrashing
- For streaming workloads: Use fixed-size clusters or schedule-based scaling
- For batch workloads: Use load-based auto-scaling with aggressive scale-down
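The role of a scale-down cooldown can be illustrated with a small simulation. This is a simplified model rather than any cloud provider's algorithm: `plan_scaling` is a hypothetical function that scales up immediately and scales down only after demand has stayed below the current size for a number of consecutive steps.

```python
def plan_scaling(demand, min_nodes, max_nodes, cooldown_steps):
    """Return the cluster size at each step for a series of node demands."""
    size = min_nodes
    steps_below = 0
    sizes = []
    for needed in demand:
        target = max(min_nodes, min(max_nodes, needed))
        if target > size:
            size = target          # scale up right away
            steps_below = 0
        elif target < size:
            steps_below += 1       # wait out the cooldown before shrinking
            if steps_below >= cooldown_steps:
                size = target
                steps_below = 0
        else:
            steps_below = 0
        sizes.append(size)
    return sizes


demand = [3, 8, 4, 8, 4, 4, 4, 3]
print("cooldown=1:", plan_scaling(demand, 3, 20, cooldown_steps=1))
print("cooldown=3:", plan_scaling(demand, 3, 20, cooldown_steps=3))
```

With a cooldown of one step the cluster shrinks and regrows on every dip (thrashing). With three steps it rides through the short dip and shrinks only once demand has stayed low.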
What is Storage Optimization?
Storage optimization addresses both the cost of storing data and the performance impact of storage choices.
| Strategy | Description | Benefit |
|---|---|---|
| Tiered Storage | Move older data to cheaper storage classes (S3 Standard → S3 Infrequent Access → S3 Glacier) | Cost reduction |
| File Compaction (OPTIMIZE) | Reduce the number of small files | Reduces storage costs (fewer file metadata objects) and improves query performance (fewer files to scan) |
| Columnar Formats + Compression | Use Parquet, ORC with Zstd, Snappy compression | Reduce storage footprint by 60-80% compared to raw text |
What is Network Optimization?
Network optimization focuses on minimizing data movement.
Strategies:
- Data locality: Keep processing close to data (same AZ, same region)
- Columnar formats with predicate pushdown: Reduce the amount of data transferred over the network
- Compression: Reduce network transfer volume
- VPC endpoints for S3/ADLS: Eliminate internet egress charges
- Cross-region replication: Minimize and schedule during off-peak hours
Key Takeaway: Cost optimization requires a systematic approach addressing compute, storage, network, and operational costs simultaneously. The largest cost component is typically compute (60-70% of total spend).
Key Concepts Table
Mathematical Foundations
Definition: Total Cost of Ownership
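TCO for a Spark cluster over a period $T$:

$$\mathrm{TCO}(T) = C_{\text{compute}}(T) + C_{\text{storage}}(T) + C_{\text{network}}(T) + C_{\text{ops}}(T)$$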
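As a rough sketch, TCO can be computed as the sum of the four components this section names: compute, storage, network, and operations. The `MonthlyCosts` class and the dollar figures are hypothetical, chosen only so the shares fall inside the typical ranges quoted earlier.

```python
from dataclasses import dataclass


@dataclass
class MonthlyCosts:
    """Monthly cost components in dollars (illustrative values only)."""
    compute: float
    storage: float
    network: float
    operations: float

    def total(self):
        return self.compute + self.storage + self.network + self.operations

    def shares(self):
        """Fraction of total spend per component."""
        total = self.total()
        return {name: value / total for name, value in vars(self).items()}

    def tco(self, months):
        """Total cost over a period, assuming costs stay flat month to month."""
        return self.total() * months


costs = MonthlyCosts(compute=6500, storage=2200, network=800, operations=500)
print(f"monthly total: ${costs.total():,.0f}")
print({name: f"{share:.0%}" for name, share in costs.shares().items()})
print(f"12-month TCO: ${costs.tco(12):,.0f}")
```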
Spot Instance Savings
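Cost reduction with spot instances for fault-tolerant workloads:

$$C_{\text{spot}} = (1 - s)\, C_{\text{on-demand}}, \qquad s \approx 0.6 \text{ to } 0.8$$

Expected interruption rate: roughly $0.05$ to $0.15$ per hour.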
Right-Sizing Theorem
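Optimal instance count minimizes total cost while meeting the SLO:

$$n^{*} = \arg\min_{n} \; n \cdot p \cdot T(n) \quad \text{subject to} \quad T(n) \le T_{\text{SLO}}$$

where $p$ is the price per instance-hour and $T(n)$ is the job runtime on $n$ instances.
Over-provisioning by a factor $k > 1$ means running $k\,n^{*}$ instances; unless runtime shrinks by the same factor, the extra capacity is paid for without being used.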
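The right-sizing search can be sketched as a brute-force scan over instance counts. The runtime model here is an assumption (a fixed serial part plus a parallel part that divides evenly across nodes); real jobs should be benchmarked rather than modelled. All names and numbers are hypothetical.

```python
def runtime_hours(n, serial_hours, parallel_hours):
    """Assumed runtime model: the serial part does not speed up with more nodes."""
    return serial_hours + parallel_hours / n


def cheapest_count_meeting_slo(price_per_node_hour, slo_hours,
                               serial_hours, parallel_hours, max_nodes=100):
    """Return (nodes, cost, runtime) for the cheapest count that meets the SLO."""
    best = None
    for n in range(1, max_nodes + 1):
        runtime = runtime_hours(n, serial_hours, parallel_hours)
        if runtime > slo_hours:
            continue  # too slow, violates the SLO
        cost = n * price_per_node_hour * runtime
        if best is None or cost < best[1]:
            best = (n, cost, runtime)
    return best


best = cheapest_count_meeting_slo(
    price_per_node_hour=0.50, slo_hours=2.0, serial_hours=0.5, parallel_hours=20.0
)
doubled_nodes = 2 * best[0]
doubled_cost = doubled_nodes * 0.50 * runtime_hours(doubled_nodes, 0.5, 20.0)

print(f"optimal: {best[0]} nodes, ${best[1]:.2f} per run, {best[2]:.2f} h")
print(f"2x over-provisioned: {doubled_nodes} nodes, ${doubled_cost:.2f} per run")
```

Under this model, cost rises with node count because the serial portion is billed on every node, so the cheapest valid choice is the smallest cluster that still meets the SLO.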
Storage Tiering Benefit
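Cost savings from tiered storage depend on access frequency, which determines how much data can sit in each tier:

$$C_{\text{tiered}} = S \sum_{i} f_i \, c_i, \qquad \text{savings} = S \, c_{\text{standard}} - C_{\text{tiered}}$$

where $S$ is the total data volume, $c_i$ is the price per GB-month of tier $i$, and $f_i$ is the fraction of data at tier $i$.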
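A short sketch of the tiering calculation, weighting each tier's price by the fraction of data stored there. The per-GB prices are the approximate figures used in the storage monitoring example later in this section; the 30 TB volume and the tier fractions are hypothetical.

```python
# Approximate $/GB/month, matching the figures used elsewhere in this section.
TIER_PRICE_PER_GB_MONTH = {
    "standard": 0.023,
    "infrequent_access": 0.0125,
    "glacier": 0.004,
}


def tiered_monthly_cost(total_gb, fractions):
    """Monthly storage cost when `fractions` of the data sit in each tier."""
    if abs(sum(fractions.values()) - 1.0) > 1e-9:
        raise ValueError("tier fractions must sum to 1")
    blended_price = sum(
        share * TIER_PRICE_PER_GB_MONTH[tier] for tier, share in fractions.items()
    )
    return total_gb * blended_price


total_gb = 30_000  # hypothetical 30 TB lake
all_standard = tiered_monthly_cost(total_gb, {"standard": 1.0})
tiered = tiered_monthly_cost(
    total_gb, {"standard": 0.2, "infrequent_access": 0.3, "glacier": 0.5}
)

print(f"all standard: ${all_standard:.2f}/month")
print(f"tiered: ${tiered:.2f}/month (saving {1 - tiered / all_standard:.0%})")
```

This counts storage charges only; retrieval fees and minimum storage durations on the colder tiers reduce the saving for data that is read often.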
Autoscaling Efficiency
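Autoscaling reduces cost by an amount that grows with the variance of cluster utilization: a fixed-size cluster has to be provisioned for peak load, while an autoscaled cluster pays closer to the average load.
Higher variance = more savings from autoscaling.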
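The link between load variance and autoscaling savings can be illustrated by comparing a fixed cluster sized for peak with an idealized autoscaled cluster that always matches demand. This sketch ignores scaling lag and minimum cluster size, and both load traces are made up.

```python
from statistics import pvariance


def autoscaling_saving(hourly_nodes_needed):
    """Fraction of node-hours saved versus a fixed cluster sized for peak.

    Assumes ideal autoscaling that matches demand exactly in every hour.
    """
    peak = max(hourly_nodes_needed)
    fixed_node_hours = peak * len(hourly_nodes_needed)
    scaled_node_hours = sum(hourly_nodes_needed)
    return 1.0 - scaled_node_hours / fixed_node_hours


steady = [10, 10, 10, 10, 10, 10]  # no variance
bursty = [2, 2, 2, 20, 20, 2]      # short peak, long idle periods

for name, trace in [("steady", steady), ("bursty", bursty)]:
    print(f"{name}: variance={pvariance(trace)}, "
          f"saving={autoscaling_saving(trace):.0%}")
```

The steady trace gains nothing from autoscaling, while the bursty trace avoids paying for peak capacity during the idle hours.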
Key Insight
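The biggest cost lever is usually instance type selection, not count. A larger memory-optimized instance can cost 2-3x more per hour yet process a memory-bound job 5-10x faster, which lowers the total cost of the job. What matters is cost per completed job, not price per hour. Always benchmark with your specific workload.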
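Comparing instance types by cost per completed job rather than hourly price can be sketched as follows. The prices and runtimes are hypothetical benchmark results, included only to show the arithmetic.

```python
def job_cost(nodes, price_per_node_hour, runtime_hours):
    """Total compute cost of one job run."""
    return nodes * price_per_node_hour * runtime_hours


# Hypothetical benchmark of the same job on two 10-node clusters.
candidates = {
    "general-purpose": {"price": 0.40, "runtime_hours": 5.0},
    "memory-optimized": {"price": 0.50, "runtime_hours": 2.0},
}

costs = {
    name: job_cost(10, spec["price"], spec["runtime_hours"])
    for name, spec in candidates.items()
}
cheapest = min(costs, key=costs.get)

for name, cost in costs.items():
    print(f"{name}: ${cost:.2f} per run")
print(f"cheapest per job: {cheapest}")
```

In this made-up case the instance with the higher hourly price is cheaper per run because the job finishes sooner.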
Summary
Cost optimization involves right-sizing instances, using spot/reserved capacity, tiered storage, and autoscaling. TCO includes compute, storage, network, and operational (labor) costs. Spot instances provide 60-80% savings with acceptable interruption risk for fault-tolerant workloads. Autoscaling benefits scale with workload variance.
Key Concepts Table (cont.)
| Optimization Area | Strategy | Savings | Effort | Risk Level |
|---|---|---|---|---|
| Spot Instances | Use spot for executors | 60-80% | Low | Low (with checkpointing) |
| Right-Sizing | Match instance to workload | 30-50% | Medium | Low |
| Auto-Scaling | Dynamic cluster sizing | 20-40% | Medium | Low |
| Storage Tiering | Move old data to IA/Glacier | 40-60% | Low | Low |
| File Compaction | Merge small files | 30-50% | Medium | Low |
| Format Optimization | Use Parquet + Zstd | 20-40% | Low | Low |
| Shuffle Optimization | Reduce data movement | 10-25% | Medium | Low |
| Data Locality | Process data near storage | 10-25% | Medium | Low |
| Caching | Cache hot data in memory | 15-30% | Low | Low |
| Scheduling | Off-peak execution | 15-30% | Low | Medium |
Code Examples
Example 1: Spot Instance Configuration with Auto-Scaling
from pyspark.sql import SparkSession

# --- EMR Cluster Configuration (Cost-Optimized) ---
# Shaped for boto3's emr.run_job_flow(**emr_config): the market and the
# auto-scaling policy are set per instance group. A real request also needs
# JobFlowRole, ServiceRole, and AutoScalingRole.
emr_config = {
    "Name": "pyspark-cost-optimized",
    "ReleaseLabel": "emr-6.15.0",
    "Applications": [
        {"Name": "Spark"},
        {"Name": "JupyterEnterpriseGateway"},
    ],
    "Instances": {
        "InstanceGroups": [
            {
                "Name": "Primary",
                "InstanceRole": "MASTER",
                "Market": "ON_DEMAND",  # On-Demand (always available)
                "InstanceType": "m5.xlarge",
                "InstanceCount": 1,
            },
            {
                "Name": "Workers",
                "InstanceRole": "CORE",
                "Market": "SPOT",  # Spot (cost-optimized)
                "InstanceType": "r5.2xlarge",
                "InstanceCount": 3,  # Base instances
                # Auto-scaling configuration
                "AutoScalingPolicy": {
                    "Constraints": {"MinCapacity": 3, "MaxCapacity": 20},
                    "Rules": [
                        {
                            "Name": "ScaleOutMemory",
                            "Description": "Scale out when available YARN memory < 30%",
                            "Action": {
                                "SimpleScalingPolicyConfiguration": {
                                    "AdjustmentType": "CHANGE_IN_CAPACITY",
                                    "ScalingAdjustment": 2,
                                    "CoolDown": 300,
                                }
                            },
                            "Trigger": {
                                "CloudWatchAlarmDefinition": {
                                    # EMR publishes YARN metrics, not CPUUtilization
                                    "Namespace": "AWS/ElasticMapReduce",
                                    "MetricName": "YARNMemoryAvailablePercentage",
                                    "Statistic": "AVERAGE",
                                    "Unit": "PERCENT",
                                    "Period": 300,
                                    "EvaluationPeriods": 2,
                                    "Threshold": 30,
                                    "ComparisonOperator": "LESS_THAN",
                                    "Dimensions": [
                                        {"Key": "JobFlowId", "Value": "${emr.clusterId}"}
                                    ],
                                }
                            },
                        },
                        {
                            "Name": "ScaleInMemory",
                            "Description": "Scale in when available YARN memory > 70%",
                            "Action": {
                                "SimpleScalingPolicyConfiguration": {
                                    "AdjustmentType": "CHANGE_IN_CAPACITY",
                                    "ScalingAdjustment": -1,
                                    "CoolDown": 600,  # longer cooldown to avoid thrashing
                                }
                            },
                            "Trigger": {
                                "CloudWatchAlarmDefinition": {
                                    "Namespace": "AWS/ElasticMapReduce",
                                    "MetricName": "YARNMemoryAvailablePercentage",
                                    "Statistic": "AVERAGE",
                                    "Unit": "PERCENT",
                                    "Period": 300,
                                    "EvaluationPeriods": 3,
                                    "Threshold": 70,
                                    "ComparisonOperator": "GREATER_THAN",
                                    "Dimensions": [
                                        {"Key": "JobFlowId", "Value": "${emr.clusterId}"}
                                    ],
                                }
                            },
                        },
                    ],
                },
            },
        ],
        "Ec2KeyName": "my-key-pair",
        "Ec2SubnetId": "subnet-xxx",
        "KeepJobFlowAliveWhenNoSteps": True,
        "TerminationProtected": True,
    },
    "Configurations": [
        {
            "Classification": "spark-defaults",
            "Properties": {
                # Cost-optimized Spark settings
                "spark.dynamicAllocation.enabled": "true",
                "spark.dynamicAllocation.minExecutors": "3",
                "spark.dynamicAllocation.maxExecutors": "20",
                "spark.dynamicAllocation.executorIdleTimeout": "120s",
                "spark.dynamicAllocation.schedulerBacklogTimeout": "60s",
                "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout": "60s",
                # Executor and driver sizing
                "spark.executor.instances": "3",  # initial count under dynamic allocation
                "spark.executor.memory": "8g",
                "spark.executor.cores": "4",
                "spark.driver.memory": "4g",
                "spark.driver.cores": "2",
                # Enable checkpointing for spot resilience
                "spark.sql.streaming.checkpointLocation": "s3://my-bucket/checkpoints",
                # Adaptive query execution
                "spark.sql.adaptive.enabled": "true",
                "spark.sql.adaptive.coalescePartitions.enabled": "true",
                "spark.sql.adaptive.skewJoin.enabled": "true",
                # Serialization
                "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
                "spark.kryoserializer.buffer.max": "512m",
                # Memory management
                "spark.memory.fraction": "0.8",
                "spark.memory.storageFraction": "0.3",
            },
        }
    ],
}

# --- Create Cost-Optimized Spark Session ---
spark = (
    SparkSession.builder
    .appName("Cost-Optimized-PySpark")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "3")
    .config("spark.dynamicAllocation.maxExecutors", "20")
    .config("spark.executor.instances", "3")
    .config("spark.executor.memory", "8g")
    .config("spark.executor.cores", "4")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    .config("spark.sql.shuffle.partitions", "200")
    .config("spark.default.parallelism", "200")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)
Example 2: Storage Optimization with Delta Lake
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

spark = (
    SparkSession.builder
    .appName("Storage-Optimization")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# --- Optimize Table Storage ---
def optimize_table_storage(spark, table_path, z_order_columns=None):
    """Comprehensive storage optimization for Delta tables."""
    delta_table = DeltaTable.forPath(spark, table_path)

    if z_order_columns:
        # Z-ORDER: multi-dimensional clustering; this rewrite also merges
        # small files, so a separate compaction pass is not needed
        print(f"Running Z-ORDER on {z_order_columns}...")
        delta_table.optimize().executeZOrderBy(*z_order_columns)
    else:
        # Compaction: merge small files
        print("Running compaction...")
        delta_table.optimize().executeCompaction()

    # VACUUM: remove unreferenced files older than 7 days. This equals the
    # default retention, so the retention safety check stays enabled.
    print("Running VACUUM...")
    delta_table.vacuum(retentionHours=168)  # 7 days

    # Table statistics
    detail = spark.sql(f"DESCRIBE DETAIL delta.`{table_path}`").first()
    num_versions = spark.sql(f"DESCRIBE HISTORY delta.`{table_path}`").count()
    print("Table Statistics:")
    print(f"  Num Files: {detail.numFiles}")
    print(f"  Size: {detail.sizeInBytes / (1024**3):.2f} GB")
    print(f"  Num Versions: {num_versions}")
    return detail

# Optimize a high-traffic table
optimize_table_storage(
    spark,
    "/mnt/lakehouse/silver/transactions",
    z_order_columns=["customer_id", "transaction_date"],
)

# --- Implement Storage Lifecycle Policies ---
def setup_storage_lifecycle(bucket_name):
    """Configure S3 lifecycle policies for tiered storage."""
    import boto3

    s3 = boto3.client("s3")
    lifecycle_config = {
        "Rules": [
            {
                "ID": "TieredStorage",
                "Status": "Enabled",
                "Filter": {"Prefix": "lakehouse/"},
                "Transitions": [
                    {"Days": 30, "StorageClass": "STANDARD_IA"},
                    {"Days": 90, "StorageClass": "GLACIER"},
                    {"Days": 365, "StorageClass": "DEEP_ARCHIVE"},
                ],
                "Expiration": {"Days": 730},  # Delete after 2 years
            },
            {
                "ID": "CleanupTempFiles",
                "Status": "Enabled",
                "Filter": {"Prefix": "lakehouse/temp/"},
                "Expiration": {"Days": 7},  # Delete temp files after 7 days
            },
        ]
    }
    s3.put_bucket_lifecycle_configuration(
        Bucket=bucket_name,
        LifecycleConfiguration=lifecycle_config,
    )

# --- Monitor Storage Costs ---
def monitor_storage_costs(spark, table_path):
    """Monitor storage metrics for cost optimization."""
    detail = spark.sql(f"DESCRIBE DETAIL delta.`{table_path}`").first()

    # Calculate storage metrics
    total_size_gb = detail.sizeInBytes / (1024**3)
    num_files = detail.numFiles
    avg_file_size_mb = (detail.sizeInBytes / num_files) / (1024**2) if num_files > 0 else 0

    # Estimate costs with approximate prices in $/GB/month; check the current
    # pricing for your region before relying on these figures
    s3_standard_cost = total_size_gb * 0.023
    s3_ia_cost = total_size_gb * 0.0125
    s3_glacier_cost = total_size_gb * 0.004

    print(f"\nStorage Analysis for {table_path}:")
    print(f"  Total Size: {total_size_gb:.2f} GB")
    print(f"  Number of Files: {num_files}")
    print(f"  Average File Size: {avg_file_size_mb:.2f} MB")
    print("\nEstimated Monthly Costs:")
    print(f"  S3 Standard: ${s3_standard_cost:.2f}")
    print(f"  S3 Infrequent Access: ${s3_ia_cost:.2f}")
    print(f"  S3 Glacier: ${s3_glacier_cost:.2f}")

    if avg_file_size_mb < 64:
        print(f"\n  WARNING: Average file size is small ({avg_file_size_mb:.2f} MB)")
        print("  Consider running OPTIMIZE to merge small files")

    return {
        "total_size_gb": total_size_gb,
        "num_files": num_files,
        "avg_file_size_mb": avg_file_size_mb,
        "estimated_cost_standard": s3_standard_cost,
    }

monitor_storage_costs(spark, "/mnt/lakehouse/silver/transactions")
Example 3: Spark Configuration Tuning for Cost Efficiency
from pyspark.sql import SparkSession

# --- Cost-Optimized Spark Configuration Templates ---

# Template 1: Batch Processing (Cost-Optimized)
BATCH_CONFIG = {
    "spark.sql.shuffle.partitions": "200",  # Spark default; AQE coalesces small partitions at runtime
    "spark.sql.adaptive.enabled": "true",  # AQE for dynamic optimization
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
    "spark.sql.files.maxPartitionBytes": "134217728",  # 128 MB per partition
    "spark.sql.files.openCostInBytes": "8388608",  # 8 MB
    "spark.memory.fraction": "0.8",  # 80% for execution/storage
    "spark.memory.storageFraction": "0.3",  # 30% of that protected for storage
    "spark.shuffle.compress": "true",
    "spark.shuffle.spill.compress": "true",
    "spark.rdd.compress": "true",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.kryoserializer.buffer.max": "512m",
    "spark.sql.autoBroadcastJoinThreshold": "10485760",  # 10 MB
    "spark.sql.sources.partitionOverwriteMode": "dynamic",
    # Delta auto-optimize defaults (availability depends on the Delta/Databricks runtime)
    "spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite": "true",
    "spark.databricks.delta.properties.defaults.autoOptimize.autoCompact": "true",
}

# Template 2: Streaming Processing (Low Latency)
STREAMING_CONFIG = {
    "spark.sql.streaming.stateStore.providerClass": "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
    # Compact RocksDB state on commit: smaller state at the price of slower commits
    "spark.sql.streaming.stateStore.rocksdb.compactOnCommit": "true",
    "spark.sql.streaming.noDataMicroBatches.enabled": "true",
    "spark.sql.streaming.schemaInference": "true",
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.shuffle.partitions": "200",
    "spark.memory.fraction": "0.8",
    "spark.memory.storageFraction": "0.2",  # Less storage for streaming
    "spark.streaming.stopGracefullyOnShutdown": "true",
}

# Template 3: Machine Learning (larger executors, Arrow enabled)
ML_CONFIG = {
    "spark.sql.shuffle.partitions": "200",
    "spark.sql.adaptive.enabled": "true",
    "spark.executor.instances": "4",
    "spark.executor.memory": "16g",
    "spark.executor.cores": "4",
    "spark.driver.memory": "8g",
    "spark.memory.fraction": "0.8",
    "spark.memory.storageFraction": "0.2",  # Less storage for ML
    "spark.sql.autoBroadcastJoinThreshold": "52428800",  # 50 MB
    "spark.sql.execution.arrow.pyspark.enabled": "true",
    "spark.sql.execution.arrow.maxRecordsPerBatch": "10000",
}

def create_cost_optimized_session(app_name, config_template="batch"):
    """Create a cost-optimized Spark session."""
    config = {
        "batch": BATCH_CONFIG,
        "streaming": STREAMING_CONFIG,
        "ml": ML_CONFIG,
    }.get(config_template, BATCH_CONFIG)  # unknown names fall back to batch
    builder = SparkSession.builder.appName(app_name)
    for key, value in config.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()

# getOrCreate() returns the already running session if one exists, and
# executor and memory settings are fixed once the application starts, so use
# one template per application.
batch_spark = create_cost_optimized_session("Batch-Processing", "batch")

# In separate applications:
# streaming_spark = create_cost_optimized_session("Streaming-Processing", "streaming")
# ml_spark = create_cost_optimized_session("ML-Training", "ml")
Performance Metrics
| Optimization | Before | After | Savings | Implementation Time |
|---|---|---|---|---|
| Spot Instances (executors) | $1,814/month | $726/month | 60% ($1,088) | 1 day |
| Right-Sizing (r5.2xl → r5.xl) | $1,814/month | $1,270/month | 30% ($544) | 2 days |
| Auto-Scaling (3-20 nodes) | $1,814/month | $1,180/month | 35% ($634) | 3 days |
| File Compaction (OPTIMIZE) | $690/month | $414/month (18TB) | 40% ($276) | 1 day |
| Storage Tiering (IA after 30d) | $690/month | $345/month | 50% ($345) | 0.5 days |
| Format (CSV → Parquet+Zstd) | $690/month | $276/month | 60% ($414) | 2 days |
| Shuffle Optimization | $200/month | $120/month | 40% ($80) | 2 days |
| Caching (hot data) | $1,814/month | $1,542/month | 15% ($272) | 1 day |
| Scheduling (off-peak) | $1,814/month | $1,542/month | 15% ($272) | 1 day |
| TOTAL | $9,506/month | $5,995/month | 37% ($3,511) | 13 days |
Best Practices
- Always use Spot instances for executors: Spot instances offer 60-80% savings with minimal risk when checkpointing is enabled. Configure multiple instance types (r5, r4, m5) for diversification and use the capacity-optimized allocation strategy.
- Enable dynamic allocation: Set spark.dynamicAllocation.enabled=true with appropriate min/max executor counts. This allows the cluster to scale down during quiet periods and scale up during peaks.
- Right-size your executors: Profile your workload to determine the optimal executor size. For most workloads, 4-8 cores per executor with 8-16 GB memory provides the best cost-performance ratio.
- Implement storage tiering: Configure S3/ADLS lifecycle policies to move data to cheaper storage classes after 30 days (IA) and 90 days (Glacier). Objects in archive tiers such as Glacier must be restored before Spark can read them, so keep any files that current table versions or time travel queries still reference in Standard or IA.
- Compact files regularly: Run OPTIMIZE on all active tables to merge small files. Target 128-512 MB file sizes for optimal read performance and storage efficiency. Use auto-compaction for streaming workloads.
- Use columnar formats with compression: Store data in Parquet or ORC format with Zstd or Snappy compression. This reduces storage by 60-80% compared to raw text and improves query performance through predicate pushdown.
- Monitor and alert on costs: Set up cloud cost monitoring dashboards and alerts for unexpected cost spikes. Track cost per query, cost per GB processed, and cost per data product.
- Schedule batch jobs during off-peak hours: Run heavy batch processing during night/weekend hours when spot prices are lower and on-demand capacity is more available.
- Use broadcast joins for small tables: Set spark.sql.autoBroadcastJoinThreshold to 10-50 MB to automatically broadcast small tables, avoiding expensive shuffle joins.
- Profile before optimizing: Use the Spark UI, Databricks query profiles, or AWS/GCP cost explorer to identify the actual bottlenecks before applying optimizations. Focus on the highest-impact optimizations first.
Mathematical Foundation Summary
Cost optimization follows TCO = compute_cost + storage_cost + network_cost + operational_cost. Spot instances reduce compute cost by savings = on_demand_price × 0.6-0.8 with interruption_probability ≈ 0.05-0.15. Auto-scaling optimizes utilization = actual_usage / allocated_resources, targeting 70-85%. Partition pruning reduces I/O cost by reduction = pruned_partitions / total_partitions. Broadcast joins eliminate shuffle cost: shuffle_bytes = 0 for small tables. The cost-performance tradeoff follows Pareto optimality, where no single metric can improve without degrading another.
See also: Adaptive Query Execution (30), Production Hardening (40), Data Mesh Architecture (37)
See Also
- Adaptive Query Execution β AQE for dynamic optimization
- Caching Persistence β Caching for cost reduction
- Partitioning Strategies β Partitioning to reduce shuffle
- Cluster Management β Cluster resource optimization