
Cost Optimization for PySpark Workloads

Cost Framework & TCO Comparison

Cost Optimization Framework: Compute (EC2/VM instances), Storage (S3/ADLS/GCS), Network (data transfer), License (Databricks/EMR), Labor (ops team)
TCO Comparison (Annual, 100TB cluster): AWS EMR $180K, Databricks $220K, Serverless $250K, Self-managed $140K
Savings: Spot instances (60-70%), auto-scaling (30-40%), right-sizing (20-30%)

Architecture Diagram: Cost Optimization Framework

Architecture Diagram: Spot Instance Strategy

Detailed Explanation

Cost optimization for PySpark workloads requires a systematic approach that addresses compute, storage, network, and operational costs simultaneously. The largest cost component is typically compute (60-70% of total spend), followed by storage (20-25%), network (5-10%), and operations (5%). Each component offers distinct optimization opportunities with varying effort-to-savings ratios.


What is Compute Optimization?

Compute optimization focuses on right-sizing instances, leveraging spot/preemptible instances, implementing auto-scaling, and optimizing Spark configurations.

| Strategy | Description | Savings |
|---|---|---|
| Spot Instances | Use spot/preemptible instances for burst capacity with minimal risk when properly configured | 60-80% compared with on-demand pricing |
| Right-Sizing | Match instance types to workload characteristics | 30-50% |
| Auto-Scaling | Dynamically adjust cluster size based on workload demands | 20-40% |

Spot Instance Best Practices:

  • Maintain a small base of on-demand instances for guaranteed capacity
  • Enable checkpointing every 10-30 seconds to minimize data loss on spot interruptions
  • Configure multiple instance types (r5, r4, m5) for diversification across spot pools
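The on-demand base plus spot burst pattern above can be priced with simple arithmetic. The following is a minimal sketch, not a provider API: the function name, the $0.50/hour on-demand price, and the 70% discount are all hypothetical values chosen for illustration.

```python
def blended_hourly_cost(on_demand_nodes, spot_nodes, on_demand_price, spot_discount):
    """Hourly cluster cost with an on-demand base and spot burst capacity."""
    if not 0.0 <= spot_discount < 1.0:
        raise ValueError("spot_discount must be in [0, 1)")
    spot_price = on_demand_price * (1.0 - spot_discount)
    return on_demand_nodes * on_demand_price + spot_nodes * spot_price


# Hypothetical prices: $0.50/hour on-demand, 70% spot discount
all_on_demand = blended_hourly_cost(10, 0, 0.50, 0.70)
mixed = blended_hourly_cost(2, 8, 0.50, 0.70)
savings = 1 - mixed / all_on_demand
print(f"All on-demand: ${all_on_demand:.2f}/h, mixed: ${mixed:.2f}/h, savings: {savings:.0%}")
```

Because the two on-demand nodes are paid at full price, the blended saving is lower than the raw spot discount. This is the trade-off for guaranteed base capacity.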

Right-Sizing Guidelines:

| Workload Type | Recommended Instance Type | Example |
|---|---|---|
| CPU-bound transformations (joins, aggregations) | Compute-optimized instances | c5, c6g |
| Memory-intensive operations (large shuffles, caching) | Memory-optimized instances | r5, r6g |
| Mixed workloads | General-purpose instances | m5, m6g |

What is Auto-Scaling?

Auto-scaling dynamically adjusts cluster size based on workload demands.

Configuration Tips:

  • Configure minimum and maximum executor counts based on historical patterns
  • Use scale-down cooldown periods to prevent thrashing
  • For streaming workloads: Use fixed-size clusters or schedule-based scaling
  • For batch workloads: Use load-based auto-scaling with aggressive scale-down
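The interaction between min/max bounds and a scale-down cooldown can be sketched as a small decision function. This is an illustrative model only, not how any particular cluster manager implements scaling; the function name and all parameters are assumptions.

```python
def scaling_decision(current, pending_tasks, tasks_per_executor,
                     min_executors, max_executors,
                     seconds_since_last_scale, cooldown_seconds):
    """Return the target executor count for a load-based policy with a cooldown."""
    # Executors needed to run all pending tasks at once (ceiling division)
    needed = -(-pending_tasks // tasks_per_executor)
    # Clamp to the configured minimum and maximum
    target = max(min_executors, min(max_executors, needed))
    # Scale out immediately; scale in only after the cooldown to avoid thrashing
    if target < current and seconds_since_last_scale < cooldown_seconds:
        return current
    return target


print(scaling_decision(5, 100, 4, 3, 20, 0, 600))    # backlog: scale out to the cap
print(scaling_decision(10, 8, 4, 3, 20, 120, 600))   # quiet, but still in cooldown
print(scaling_decision(10, 8, 4, 3, 20, 700, 600))   # quiet and cooldown elapsed
```

A shorter `cooldown_seconds` corresponds to the aggressive scale-down suggested for batch workloads. A longer one suits workloads with frequent short lulls.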

What is Storage Optimization?

Storage optimization addresses both the cost of storing data and the performance impact of storage choices.

| Strategy | Description | Benefit |
|---|---|---|
| Tiered Storage | Move older data to cheaper storage classes (S3 Standard → S3 Infrequent Access → S3 Glacier) | Cost reduction |
| File Compaction (OPTIMIZE) | Reduce the number of small files | Reduces storage costs (fewer file metadata objects) and improves query performance (fewer files to scan) |
| Columnar Formats + Compression | Use Parquet or ORC with Zstd or Snappy compression | Reduces storage footprint by 60-80% compared to raw text |

What is Network Optimization?

Network optimization focuses on minimizing data movement.

Strategies:

  • Data locality: Keep processing close to data (same AZ, same region)
  • Columnar formats with predicate pushdown: Reduce the amount of data transferred over the network
  • Compression: Reduce network transfer volume
  • VPC endpoints for S3/ADLS: Eliminate internet egress charges
  • Cross-region replication: Minimize and schedule during off-peak hours

Key Takeaway: Cost optimization requires a systematic approach addressing compute, storage, network, and operational costs simultaneously. The largest cost component is typically compute (60-70% of total spend).

Key Concepts Table

Mathematical Foundations

Definition: Total Cost of Ownership

TCO for a Spark cluster over period $T$:

$$\text{TCO} = \underbrace{C_{\text{compute}}}_{\text{instances}} + \underbrace{C_{\text{storage}}}_{\text{data}} + \underbrace{C_{\text{network}}}_{\text{transfer}} + \underbrace{C_{\text{license}}}_{\text{software}} + \underbrace{C_{\text{ops}}}_{\text{labor}}$$
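The TCO sum can be expressed as a small data structure that also reports each component's share. This is a sketch under assumed inputs: the class name and the dollar figures are hypothetical and only chosen so that compute lands in the 60-70% range mentioned earlier.

```python
from dataclasses import dataclass


@dataclass
class CostBreakdown:
    """Annual cost components of the TCO formula, in dollars."""
    compute: float
    storage: float
    network: float
    license: float
    ops: float

    def total(self):
        return self.compute + self.storage + self.network + self.license + self.ops

    def shares(self):
        """Fraction of the total contributed by each component."""
        total = self.total()
        return {name: value / total for name, value in vars(self).items()}


# Hypothetical annual figures for illustration only
tco = CostBreakdown(compute=117_000, storage=36_000, network=9_000,
                    license=9_000, ops=9_000)
print(f"TCO: ${tco.total():,.0f}")
for name, share in tco.shares().items():
    print(f"  {name}: {share:.0%}")
```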

Spot Instance Savings

Cost reduction with spot instances for fault-tolerant workloads:

$$S_{\text{spot}} = 1 - \frac{C_{\text{spot}}}{C_{\text{ondemand}}} \approx 0.60 \text{ to } 0.90$$

Expected interruption rate: $P(\text{interrupt}) \approx 0.05$ to $0.15$. Treat this as a planning assumption, since observed rates vary by instance type, region, and time window.
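A minimal sketch of the savings formula follows, extended with one assumption that is not in the formula above: interrupted work must be partly redone, which is modeled here as a hypothetical `rework_fraction` that inflates the effective spot cost. The prices are illustrative.

```python
def spot_savings(spot_price, on_demand_price):
    """S_spot = 1 - C_spot / C_ondemand."""
    return 1.0 - spot_price / on_demand_price


def effective_spot_savings(spot_price, on_demand_price, rework_fraction):
    """Savings after inflating spot cost by the share of work redone (assumed model)."""
    effective_spot = spot_price * (1.0 + rework_fraction)
    return 1.0 - effective_spot / on_demand_price


# Hypothetical prices: $0.15/hour spot vs $0.50/hour on-demand, 10% rework
raw = spot_savings(0.15, 0.50)
net = effective_spot_savings(0.15, 0.50, 0.10)
print(f"Raw savings: {raw:.0%}, after rework: {net:.0%}")
```

Frequent checkpointing lowers the rework fraction, which is why it appears alongside spot usage in the best practices.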

Right-Sizing Theorem

Optimal instance count $n^*$ minimizes total cost while meeting SLO:

$$n^* = \arg\min_n C(n) \quad \text{s.t.} \quad P(\text{completion} < \text{SLO}) \geq 0.99$$

Over-provisioning by factor $k$: $C_{\text{excess}} = (k-1) \times C_{\text{compute}}$.
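The constrained minimization can be illustrated with a brute-force search. This sketch swaps the probabilistic SLO constraint for a deterministic one and assumes a simple runtime model (a fixed serial part plus a perfectly divisible parallel part). Both are simplifications introduced here, and all names and numbers are hypothetical.

```python
def runtime_hours(n, serial_hours, parallel_hours):
    """Assumed runtime model: fixed serial part plus evenly divided parallel part."""
    return serial_hours + parallel_hours / n


def optimal_instance_count(max_n, price_per_hour, slo_hours, serial_hours, parallel_hours):
    """Return (n, cost) minimizing cost subject to runtime <= SLO, or None if infeasible."""
    best = None
    for n in range(1, max_n + 1):
        t = runtime_hours(n, serial_hours, parallel_hours)
        if t > slo_hours:
            continue  # violates the SLO
        cost = n * price_per_hour * t
        if best is None or cost < best[1]:
            best = (n, cost)
    return best


def excess_cost(k, compute_cost):
    """C_excess = (k - 1) * C_compute for over-provisioning factor k."""
    return (k - 1) * compute_cost


best = optimal_instance_count(32, 1.0, 3.0, 0.5, 20.0)
print(best)
print(excess_cost(1.5, best[1]))
```

Under this model the serial part is billed on every node, so the cheapest feasible cluster is the smallest one that still meets the SLO.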

Storage Tiering Benefit

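Cost savings from tiered storage, where a fraction $f_i$ of each dataset $D_i$ moves to a cheaper tier:

$$S_{\text{tier}} = \sum_{i=1}^{n} f_i \times (C_{\text{hot}} - C_{\text{tier}_i}) \times |D_i|$$

where $f_i$ is the fraction of data at tier $i$, $C_{\text{hot}}$ and $C_{\text{tier}_i}$ are the per-GB prices of the hot tier and tier $i$, and $|D_i|$ is the data volume.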
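The tiering sum translates directly into code. This sketch reads $f_i$ as the fraction of dataset $D_i$ moved to a cheaper tier, uses the per-GB prices that appear in the storage monitoring example later in this lesson, and ignores retrieval and transition fees. The dataset sizes are hypothetical.

```python
def tiering_savings(datasets, hot_price):
    """Monthly savings: sum of f_i * (C_hot - C_tier_i) * |D_i|.

    datasets: list of (size_gb, fraction_moved, tier_price_per_gb) tuples.
    """
    return sum(fraction * (hot_price - tier_price) * size_gb
               for size_gb, fraction, tier_price in datasets)


HOT = 0.023  # $/GB/month, S3 Standard figure used elsewhere in this lesson
datasets = [
    (10_000, 0.5, 0.0125),  # 10 TB, half moved to Infrequent Access
    (20_000, 0.8, 0.004),   # 20 TB, 80% moved to Glacier
]
monthly_savings = tiering_savings(datasets, HOT)
print(f"Estimated savings: ${monthly_savings:.2f}/month")
```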

Autoscaling Efficiency

Autoscaling reduces cost by $X\%$ when utilization has mean $\mu$ and variance $\sigma^2$:

$$\text{Savings}_{\text{auto}} = \frac{\sigma^2}{\mu^2} \times \text{Savings}_{\text{max}}$$

Higher variance = more savings from autoscaling.
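The variance heuristic can be computed from utilization samples. One assumption is added in this sketch: the ratio is capped at 1 so the estimate never exceeds the maximum savings. The sample values and the 40% maximum are hypothetical.

```python
from statistics import mean, pvariance


def autoscaling_savings(utilization_samples, max_savings):
    """Estimate savings as (sigma^2 / mu^2) * max_savings, capped at max_savings."""
    mu = mean(utilization_samples)
    sigma2 = pvariance(utilization_samples)
    ratio = sigma2 / (mu * mu)
    return min(ratio, 1.0) * max_savings  # the cap is an assumption of this sketch


steady = autoscaling_savings([0.6, 0.6, 0.6, 0.6], max_savings=0.40)
bursty = autoscaling_savings([0.2, 0.2, 0.8, 0.8], max_savings=0.40)
print(f"Steady workload: {steady:.1%}, bursty workload: {bursty:.1%}")
```

A flat utilization profile yields no benefit, while the bursty profile recovers a meaningful share of the maximum.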

Key Insight

The biggest cost lever is usually instance type selection, not count. Memory-optimized instances cost 2-3x more but may process data 5-10x faster, reducing total cost. Always benchmark with your specific workload.
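The insight is easiest to see as cost per job rather than cost per hour. The prices and runtimes below are hypothetical, chosen to match the "2-3x more expensive, 5-10x faster" range. A real comparison needs a benchmark on the actual workload.

```python
def job_cost(price_per_hour, runtime_hours, node_count=1):
    """Total cost of one job run: price x runtime x nodes."""
    return price_per_hour * runtime_hours * node_count


# Hypothetical: general-purpose node at $0.40/h takes 10 h;
# memory-optimized node at $1.00/h (2.5x the price) finishes in 2 h (5x faster)
general = job_cost(0.40, 10)
memory_optimized = job_cost(1.00, 2)
print(f"General-purpose: ${general:.2f}, memory-optimized: ${memory_optimized:.2f}")
```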

Summary

Cost optimization involves right-sizing instances, using spot/reserved capacity, tiered storage, and autoscaling. TCO includes compute, storage, network, license, and labor costs. Spot instances provide 60-90% savings with acceptable interruption risk. Autoscaling benefits scale with workload variance.

Key Concepts Table (cont.)

| Optimization Area | Strategy | Savings | Effort | Risk Level |
|---|---|---|---|---|
| Spot Instances | Use spot for executors | 60-80% | Low | Low (with checkpointing) |
| Right-Sizing | Match instance to workload | 30-50% | Medium | Low |
| Auto-Scaling | Dynamic cluster sizing | 20-40% | Medium | Low |
| Storage Tiering | Move old data to IA/Glacier | 40-60% | Low | Low |
| File Compaction | Merge small files | 30-50% | Medium | Low |
| Format Optimization | Use Parquet + Zstd | 20-40% | Low | Low |
| Shuffle Optimization | Reduce data movement | 10-25% | Medium | Low |
| Data Locality | Process data near storage | 10-25% | Medium | Low |
| Caching | Cache hot data in memory | 15-30% | Low | Low |
| Scheduling | Off-peak execution | 15-30% | Low | Medium |

Code Examples

Example 1: Spot Instance Configuration with Auto-Scaling

from pyspark.sql import SparkSession
import json

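# ─── EMR Cluster Configuration (Cost-Optimized) ───
# Illustrative layout, not a drop-in API payload. With boto3's run_job_flow, the
# purchase market and scaling policy are set per group in Instances["InstanceGroups"]
# (keys "Market" and "AutoScalingPolicy"), and scaling rules typically alarm on EMR
# metrics such as YARNMemoryAvailablePercentage rather than CPUUtilization.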
emr_config = {
    "Name": "pyspark-cost-optimized",
    "ReleaseLabel": "emr-6.15.0",
    "Applications": [
        {"Name": "Spark"},
        {"Name": "JupyterEnterpriseGateway"},
    ],
    "Instances": {
        "MasterInstanceType": "m5.xlarge",      # On-Demand (always available)
        "MasterInstanceMarket": "ON_DEMAND",
        "SlaveInstanceType": "r5.2xlarge",       # Spot (cost-optimized)
        "SlaveInstanceMarket": "SPOT",
        "InstanceCount": 3,                       # Base instances
        "Ec2KeyName": "my-key-pair",
        "Ec2SubnetId": "subnet-xxx",
        "KeepJobFlowAliveWhenNoSteps": True,
        "TerminationProtected": True,
        # Auto-scaling configuration
        "AutoScaling": {
            "Constraints": {
                "MinCapacity": 3,
                "MaxCapacity": 20
            },
            "Rules": [
                {
                    "Name": "ScaleOutCPU",
                    "Description": "Scale out when CPU > 70%",
                    "Action": {
                        "SimpleScalingPolicyConfiguration": {
                            "AdjustmentType": "ChangeInCapacity",
                            "ScalingAdjustment": 2,
                            "CoolDown": 300
                        }
                    },
                    "Trigger": {
                        "CloudWatchAlarmDefinition": {
                            "MetricName": "CPUUtilization",
                            "Statistic": "Average",
                            "Period": 300,
                            "EvaluationPeriods": 2,
                            "Threshold": 70,
                            "ComparisonOperator": "GreaterThanThreshold"
                        }
                    }
                },
                {
                    "Name": "ScaleInCPU",
                    "Description": "Scale in when CPU < 30%",
                    "Action": {
                        "SimpleScalingPolicyConfiguration": {
                            "AdjustmentType": "ChangeInCapacity",
                            "ScalingAdjustment": -1,
                            "CoolDown": 600
                        }
                    },
                    "Trigger": {
                        "CloudWatchAlarmDefinition": {
                            "MetricName": "CPUUtilization",
                            "Statistic": "Average",
                            "Period": 300,
                            "EvaluationPeriods": 3,
                            "Threshold": 30,
                            "ComparisonOperator": "LessThanThreshold"
                        }
                    }
                }
            ]
        }
    },
    "Configurations": [
        {
            "Classification": "spark-defaults",
            "Properties": {
                # Cost-optimized Spark settings
                "spark.dynamicAllocation.enabled": "true",
                "spark.dynamicAllocation.minExecutors": "3",
                "spark.dynamicAllocation.maxExecutors": "20",
                "spark.dynamicAllocation.executorIdleTimeout": "120s",
                "spark.dynamicAllocation.schedulerBacklogTimeout": "60s",
                "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout": "60s",
                
                # Executor and driver sizing (executor.instances is the initial count
                # when dynamic allocation is enabled)
                "spark.executor.instances": "3",
                "spark.executor.memory": "8g",
                "spark.executor.cores": "4",
                "spark.driver.memory": "4g",
                "spark.driver.cores": "2",
                
                # Enable checkpointing for spot resilience
                "spark.sql.streaming.checkpointLocation": "s3://my-bucket/checkpoints",
                
                # Adaptive query execution
                "spark.sql.adaptive.enabled": "true",
                "spark.sql.adaptive.coalescePartitions.enabled": "true",
                "spark.sql.adaptive.skewJoin.enabled": "true",
                
                # Serialization
                "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
                "spark.kryoserializer.buffer.max": "512m",
                
                # Memory management
                "spark.memory.fraction": "0.8",
                "spark.memory.storageFraction": "0.3",
            }
        }
    ]
}

# ─── Create Cost-Optimized Spark Session ───
spark = (
    SparkSession.builder
    .appName("Cost-Optimized-PySpark")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "3")
    .config("spark.dynamicAllocation.maxExecutors", "20")
    .config("spark.executor.instances", "3")
    .config("spark.executor.memory", "8g")
    .config("spark.executor.cores", "4")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    .config("spark.sql.shuffle.partitions", "200")
    .config("spark.default.parallelism", "200")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

Example 2: Storage Optimization with Delta Lake

from pyspark.sql import SparkSession
from delta.tables import DeltaTable

spark = SparkSession.builder \
    .appName("Storage-Optimization") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# ─── Optimize Table Storage ───
def optimize_table_storage(spark, table_path, z_order_columns=None):
    """Comprehensive storage optimization for Delta tables."""
    delta_table = DeltaTable.forPath(spark, table_path)
    
    # 1. Z-ORDER: Multi-dimensional clustering (this rewrite also compacts small files)
    if z_order_columns:
        print(f"Running Z-ORDER on {z_order_columns}...")
        delta_table.optimize().executeZOrderBy(*z_order_columns)
    # 2. Compaction only: merge small files when no clustering columns are given,
    #    which avoids rewriting the same files twice
    else:
        print("Running compaction...")
        delta_table.optimize().executeCompaction()
    
    # 3. VACUUM: Remove unreferenced files older than the retention window.
    #    168 hours matches Delta's default 7-day threshold, so the retention
    #    safety check can stay enabled.
    print("Running VACUUM...")
    delta_table.vacuum(retentionHours=168)  # 7 days
    
    # 4. Get table statistics
    detail = spark.sql(f"DESCRIBE DETAIL delta.`{table_path}`").first()
    latest = spark.sql(f"DESCRIBE HISTORY delta.`{table_path}` LIMIT 1").first()
    
    print("Table Statistics:")
    print(f"  Num Files: {detail.numFiles}")
    print(f"  Size: {detail.sizeInBytes / (1024**3):.2f} GB")
    # DESCRIBE DETAIL has no version count, so report the latest commit version
    print(f"  Latest Version: {latest.version}")
    
    return detail

# Optimize a high-traffic table
optimize_table_storage(
    spark,
    "/mnt/lakehouse/silver/transactions",
    z_order_columns=["customer_id", "transaction_date"]
)

# ─── Implement Storage Lifecycle Policies ───
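def setup_storage_lifecycle(bucket_name):
    """Configure S3 lifecycle policies for tiered storage.

    Objects in GLACIER and DEEP_ARCHIVE must be restored before Spark can read
    them, so scope the prefix to data that is no longer queried.
    """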
    import boto3
    
    s3 = boto3.client('s3')
    lifecycle_config = {
        'Rules': [
            {
                'ID': 'TieredStorage',
                'Status': 'Enabled',
                'Filter': {'Prefix': 'lakehouse/'},
                'Transitions': [
                    {
                        'Days': 30,
                        'StorageClass': 'STANDARD_IA',
                    },
                    {
                        'Days': 90,
                        'StorageClass': 'GLACIER',
                    },
                    {
                        'Days': 365,
                        'StorageClass': 'DEEP_ARCHIVE',
                    },
                ],
                'Expiration': {'Days': 730},  # Delete after 2 years
            },
            {
                'ID': 'CleanupTempFiles',
                'Status': 'Enabled',
                'Filter': {'Prefix': 'lakehouse/temp/'},
                'Expiration': {'Days': 7},  # Delete temp files after 7 days
            },
        ]
    }
    
    s3.put_bucket_lifecycle_configuration(
        Bucket=bucket_name,
        LifecycleConfiguration=lifecycle_config
    )

# ─── Monitor Storage Costs ───
def monitor_storage_costs(spark, table_path):
    """Monitor storage metrics for cost optimization."""
    detail = spark.sql(f"DESCRIBE DETAIL delta.`{table_path}`").first()
    
    # Calculate storage metrics
    total_size_gb = detail.sizeInBytes / (1024**3)
    num_files = detail.numFiles
    avg_file_size_mb = (detail.sizeInBytes / num_files) / (1024**2) if num_files > 0 else 0
    
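    # Estimate costs (approximate list prices in $/GB/month; verify against
    # current pricing for your region before relying on these numbers)
    s3_standard_cost = total_size_gb * 0.023
    s3_ia_cost = total_size_gb * 0.0125
    s3_glacier_cost = total_size_gb * 0.004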
    
    print(f"\nStorage Analysis for {table_path}:")
    print(f"  Total Size: {total_size_gb:.2f} GB")
    print(f"  Number of Files: {num_files}")
    print(f"  Average File Size: {avg_file_size_mb:.2f} MB")
    print(f"\nEstimated Monthly Costs:")
    print(f"  S3 Standard: ${s3_standard_cost:.2f}")
    print(f"  S3 Infrequent Access: ${s3_ia_cost:.2f}")
    print(f"  S3 Glacier: ${s3_glacier_cost:.2f}")
    
    if avg_file_size_mb < 64:
        print(f"\n  WARNING: Average file size is small ({avg_file_size_mb:.2f} MB)")
        print(f"  Consider running OPTIMIZE to merge small files")
    
    return {
        "total_size_gb": total_size_gb,
        "num_files": num_files,
        "avg_file_size_mb": avg_file_size_mb,
        "estimated_cost_standard": s3_standard_cost,
    }

monitor_storage_costs(spark, "/mnt/lakehouse/silver/transactions")

Example 3: Spark Configuration Tuning for Cost Efficiency

from pyspark.sql import SparkSession

# ─── Cost-Optimized Spark Configuration Templates ───

# Template 1: Batch Processing (Cost-Optimized)
BATCH_CONFIG = {
    "spark.sql.shuffle.partitions": "200",           # Spark's default; AQE coalesces small partitions at runtime
    "spark.sql.adaptive.enabled": "true",             # AQE for dynamic optimization
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
    "spark.sql.files.maxPartitionBytes": "134217728", # 128 MB per partition
    "spark.sql.files.openCostInBytes": "8388608",     # 8 MB
    "spark.memory.fraction": "0.8",                   # 80% for execution/storage
    "spark.memory.storageFraction": "0.3",            # 30% for storage
    "spark.shuffle.compress": "true",
    "spark.shuffle.spill.compress": "true",
    "spark.rdd.compress": "true",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.kryoserializer.buffer.max": "512m",
    "spark.sql.autoBroadcastJoinThreshold": "10485760",  # 10 MB
    "spark.sql.sources.partitionOverwriteMode": "dynamic",
    "spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite": "true",
    "spark.databricks.delta.properties.defaults.autoOptimize.autoCompact": "true",
}

# Template 2: Streaming Processing (Low Latency)
STREAMING_CONFIG = {
    # Spark silently ignores unrecognized keys, so only documented settings are listed
    "spark.sql.streaming.stateStore.providerClass": "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
    "spark.sql.streaming.stateStore.rocksdb.compactOnCommit": "true",
    "spark.sql.streaming.noDataMicroBatches.enabled": "true",
    "spark.sql.streaming.schemaInference": "true",
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.shuffle.partitions": "200",
    "spark.memory.fraction": "0.8",
    "spark.memory.storageFraction": "0.2",  # Less storage for streaming
    "spark.streaming.stopGracefullyOnShutdown": "true",  # Applies to DStreams jobs
}

# Template 3: Machine Learning (larger executors and Arrow transfers; no GPU settings included)
ML_CONFIG = {
    "spark.sql.shuffle.partitions": "200",
    "spark.sql.adaptive.enabled": "true",
    "spark.executor.instances": "4",
    "spark.executor.memory": "16g",
    "spark.executor.cores": "4",
    "spark.driver.memory": "8g",
    "spark.memory.fraction": "0.8",
    "spark.memory.storageFraction": "0.2",  # Less storage for ML
    "spark.sql.autoBroadcastJoinThreshold": "52428800",  # 50 MB
    "spark.sql.execution.arrow.pyspark.enabled": "true",
    "spark.sql.execution.arrow.maxRecordsPerBatch": "10000",
}

def create_cost_optimized_session(app_name, config_template="batch"):
    """Create a cost-optimized Spark session."""
    config = {
        "batch": BATCH_CONFIG,
        "streaming": STREAMING_CONFIG,
        "ml": ML_CONFIG,
    }.get(config_template, BATCH_CONFIG)
    
    builder = SparkSession.builder.appName(app_name)
    
    for key, value in config.items():
        builder = builder.config(key, value)
    
    return builder.getOrCreate()

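# Choose one template per application. getOrCreate() returns the already-running
# session if one exists, so a second call in the same process would not apply
# static settings such as executor memory from another template.
batch_spark = create_cost_optimized_session("Batch-Processing", "batch")
# streaming_spark = create_cost_optimized_session("Streaming-Processing", "streaming")
# ml_spark = create_cost_optimized_session("ML-Training", "ml")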

Performance Metrics

| Optimization | Before | After | Savings | Implementation Time |
|---|---|---|---|---|
| Spot Instances (executors) | $1,814/month | $726/month | 60% ($1,088) | 1 day |
| Right-Sizing (r5.2xl → r5.xl) | $1,814/month | $1,270/month | 30% ($544) | 2 days |
| Auto-Scaling (3-20 nodes) | $1,814/month | $1,180/month | 35% ($634) | 3 days |
| File Compaction (OPTIMIZE) | $690/month (30TB) | $414/month (18TB) | 40% ($276) | 1 day |
| Storage Tiering (IA after 30d) | $690/month | $345/month | 50% ($345) | 0.5 days |
| Format (CSV → Parquet+Zstd) | $690/month | $276/month | 60% ($414) | 2 days |
| Shuffle Optimization | $200/month (network) | $120/month | 40% ($80) | 2 days |
| Caching (hot data) | $1,814/month | $1,542/month | 15% ($272) | 1 day |
| Scheduling (off-peak) | $1,814/month | $1,542/month | 15% ($272) | 1 day |
| TOTAL | **$9,506/month** | **$5,995/month** | 37% ($3,511) | 13 days |

Rows are shown against shared baselines (several start from the same $1,814/month compute bill), so the individual savings are not additive and do not sum to the TOTAL row.
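The savings column can be recomputed from the before/after figures as a quick consistency check. The sketch below uses a subset of the rows from the table above. The helper name is an assumption.

```python
def savings(before, after):
    """Return (absolute savings, rounded percentage) for a before/after pair."""
    saved = before - after
    return saved, round(100 * saved / before)


# Monthly dollar figures (before, after) taken from the table above
rows = {
    "Spot Instances": (1814, 726),
    "Right-Sizing": (1814, 1270),
    "Auto-Scaling": (1814, 1180),
    "File Compaction": (690, 414),
    "Storage Tiering": (690, 345),
    "Shuffle Optimization": (200, 120),
}

for name, (before, after) in rows.items():
    saved, pct = savings(before, after)
    print(f"{name}: ${saved}/month ({pct}%)")
```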

Best Practices

  1. Always use Spot instances for executors: Spot instances offer 60-80% savings with minimal risk when checkpointing is enabled. Configure multiple instance types (r5, r4, m5) for diversification and use the capacity-optimized allocation strategy.

  2. Enable dynamic allocation: Set spark.dynamicAllocation.enabled=true with appropriate min/max executor counts. This allows the cluster to scale down during quiet periods and scale up during peaks.

  3. Right-size your executors: Profile your workload to determine the optimal executor size. For most workloads, 4-8 cores per executor with 8-16 GB memory provides the best cost-performance ratio.

  4. Implement storage tiering: Configure S3/ADLS lifecycle policies to move data to cheaper storage classes after 30 days (IA) and 90 days (Glacier). Archive tiers such as Glacier must be restored before they can be read, so Delta Lake queries (including time travel) fail against files that have been archived. Tier only data that is no longer queried.

  5. Compact files regularly: Run OPTIMIZE on all active tables to merge small files. Target 128-512 MB file sizes for optimal read performance and storage efficiency. Use auto-compaction for streaming workloads.

  6. Use columnar formats with compression: Store data in Parquet or ORC format with Zstd or Snappy compression. This reduces storage by 60-80% and improves query performance through predicate pushdown.

  7. Monitor and alert on costs: Set up cloud cost monitoring dashboards and alerts for unexpected cost spikes. Track cost per query, cost per GB processed, and cost per data product.

  8. Schedule batch jobs during off-peak hours: Run heavy batch processing during night/weekend hours, when spot prices are often lower and on-demand capacity is more available.

  9. Use broadcast joins for small tables: Set spark.sql.autoBroadcastJoinThreshold to 10-50 MB to automatically broadcast small tables, avoiding expensive shuffle joins.

  10. Profile before optimizing: Use the Spark UI, Databricks query profiles, or AWS/GCP cost explorer to identify the actual bottlenecks before applying optimizations. Focus on the highest-impact optimizations first.
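The unit-cost metrics and spike alerts from practice 7 can be sketched in a few lines. The function names, the 1.5x threshold, and the sample figures are hypothetical. A production setup would read these values from the cloud provider's billing export.

```python
def unit_costs(total_cost, gb_processed, query_count):
    """Cost per GB processed and cost per query for one billing period."""
    return {
        "cost_per_gb": total_cost / gb_processed,
        "cost_per_query": total_cost / query_count,
    }


def is_cost_spike(today_cost, trailing_costs, threshold=1.5):
    """Flag a day whose cost exceeds threshold x the trailing average."""
    baseline = sum(trailing_costs) / len(trailing_costs)
    return today_cost > threshold * baseline


metrics = unit_costs(total_cost=120.0, gb_processed=6000, query_count=400)
print(metrics)
print(is_cost_spike(200.0, [100.0, 110.0, 90.0]))  # well above the trailing average
print(is_cost_spike(140.0, [100.0, 110.0, 90.0]))  # within the threshold
```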

Mathematical Foundation Summary

Cost optimization follows TCO = compute_cost + storage_cost + network_cost + license_cost + operational_cost. Spot instances reduce compute cost by savings = on_demand_price × 0.6-0.8 with interruption_probability ≈ 0.05-0.15. Auto-scaling optimizes utilization = actual_usage / allocated_resources targeting 70-85%. Partition pruning reduces I/O cost by reduction = pruned_partitions / total_partitions. Broadcast joins eliminate shuffle cost: shuffle_bytes = 0 for small tables. The cost-performance tradeoff follows Pareto optimality where no single metric can improve without degrading another.
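Two of the ratios in this summary, utilization and partition-pruning reduction, are simple enough to compute directly. The sketch below uses hypothetical inputs. The 70-85% band comes from the text above.

```python
def utilization(actual_usage, allocated_resources):
    """utilization = actual_usage / allocated_resources."""
    return actual_usage / allocated_resources


def in_target_band(value, low=0.70, high=0.85):
    """True when utilization falls inside the 70-85% target band."""
    return low <= value <= high


def pruning_reduction(pruned_partitions, total_partitions):
    """Fraction of partitions skipped thanks to partition pruning."""
    return pruned_partitions / total_partitions


u = utilization(actual_usage=32, allocated_resources=40)  # e.g. busy vs allocated cores
print(f"Utilization: {u:.0%}, in target band: {in_target_band(u)}")

# Hypothetical: a query over 30 days of a table with 365 daily partitions
r = pruning_reduction(pruned_partitions=335, total_partitions=365)
print(f"I/O reduction from pruning: {r:.1%}")
```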

See also: Adaptive Query Execution (30), Production Hardening (40), Data Mesh Architecture (37)
