19. Spark Submit in PySpark

🟒 Free Lesson

Advertisement

19. Spark Submit in PySpark

Definition: Spark Submit

spark-submit is the primary command-line tool for submitting PySpark applications to a cluster. It ships the application code and its dependencies to the cluster, requests resources from the cluster manager, and launches the driver and executors that run the application.

Definition: Deployment Mode

Deployment mode determines where the driver program runs: client mode (driver runs on the submission machine) or cluster mode (driver runs on a cluster worker node).

Resource Allocation Formula
$$R_{total} = R_{driver} + N_{executors} \times R_{executor}$$

Here,

  • $R_{total}$ = Total cluster resources required
  • $R_{driver}$ = Resources allocated to the driver
  • $N_{executors}$ = Number of executors
  • $R_{executor}$ = Resources per executor
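To make the formula concrete, here is a minimal Python sketch that applies it separately to memory and to cores. It assumes each container also carries a memory overhead of max(384 MiB, 10% of the heap), which mirrors Spark's default `spark.executor.memoryOverhead` rule; PySpark worker memory and cluster-manager rounding are ignored, so treat the result as an approximation. In client mode the driver runs outside the cluster, so $R_{driver}$ would not count against cluster capacity.

```python
from typing import Tuple


def container_memory_mb(heap_mb: int, include_overhead: bool = True) -> int:
    """Memory requested for one driver or executor container, in MiB."""
    if not include_overhead:
        return heap_mb
    # Assumed overhead rule: max(384 MiB, 10% of the heap)
    return heap_mb + max(384, int(heap_mb * 0.10))


def total_resources(driver_mem_mb: int, driver_cores: int, num_executors: int,
                    executor_mem_mb: int, executor_cores: int,
                    include_overhead: bool = True) -> Tuple[int, int]:
    """R_total = R_driver + N_executors * R_executor, for memory (MiB) and cores."""
    total_mem = (container_memory_mb(driver_mem_mb, include_overhead)
                 + num_executors * container_memory_mb(executor_mem_mb, include_overhead))
    total_cores = driver_cores + num_executors * executor_cores
    return total_mem, total_cores


print(total_resources(4096, 1, 10, 8192, 4))  # (94615, 41)
```

For a 4g, 1-core driver and ten 8g, 4-core executors this gives 94,615 MiB (about 92 GiB) and 41 cores, versus 86,016 MiB for the bare formula without overhead.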

Optimal Parallelism

$$P_{optimal} = N_{executors} \times C_{executor} \times M_{factor}$$

Here,

  • $P_{optimal}$ = Optimal number of parallel tasks
  • $N_{executors}$ = Number of executors
  • $C_{executor}$ = Cores per executor
  • $M_{factor}$ = Multiplier factor (typically 2-3)
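The parallelism formula can be applied directly when choosing partition counts. This sketch (function names are illustrative) computes $P_{optimal}$ and maps it onto `spark.sql.shuffle.partitions` and `spark.default.parallelism`, two real Spark properties. Setting both to the same value is an assumption of the sketch, not a Spark requirement.

```python
from typing import Dict


def optimal_parallelism(num_executors: int, cores_per_executor: int, factor: int = 3) -> int:
    """P_optimal = N_executors * C_executor * M_factor."""
    if min(num_executors, cores_per_executor, factor) < 1:
        raise ValueError("all inputs must be positive")
    return num_executors * cores_per_executor * factor


def parallelism_confs(p: int) -> Dict[str, str]:
    """Apply P_optimal to both RDD and DataFrame/SQL shuffles."""
    return {
        "spark.default.parallelism": str(p),     # RDD operations such as reduceByKey
        "spark.sql.shuffle.partitions": str(p),  # DataFrame/SQL shuffles
    }


# 10 executors x 4 cores = 40 task slots; a factor of 3 gives 120 tasks
p = optimal_parallelism(10, 4)
print(p, parallelism_confs(p))
```

With 40 task slots, 120 tasks run in roughly three waves per stage, which leaves room to even out skewed or slow tasks.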

spark-submit supports multiple cluster managers: YARN (Hadoop), Kubernetes, Standalone, and Mesos (deprecated since Spark 3.2). Each has its own configuration options and deployment considerations.

Use cluster mode for production deployments to ensure the driver runs reliably on the cluster. Use client mode for development and debugging when you need direct access to the driver.

Resource Configuration Balance

Principle: Optimal spark-submit configuration requires balancing four competing objectives: (1) performance (sufficient resources for parallel execution); (2) cost (minimal resource waste); (3) reliability (fault tolerance and stability); (4) scalability (the ability to handle varying workloads).

  • spark-submit is the primary tool for deploying PySpark applications
  • Client mode runs the driver on the submitting machine; cluster mode runs it on a node inside the cluster
  • YARN, Kubernetes, Standalone, and Mesos (deprecated) are supported cluster managers
  • Optimal configuration balances performance, cost, reliability, and scalability
  • Production deployments should use cluster mode with appropriate resource allocation

Client vs Cluster Mode

[Figure: Client Mode vs Cluster Mode. Client mode: driver on the client machine, executors on workers; direct access to driver logs; good for development/debugging; the driver is a single point of failure if the client disconnects. Cluster mode: driver runs on a cluster node alongside the executors; recommended for production; no dependency on the client. Resource allocation flow: spark-submit → request resources → launch containers → register executors → start tasks.]

🏗️ Spark Submit Architecture

📚 Detailed Explanation

What is spark-submit?

  • Primary tool for deploying PySpark applications to a cluster
  • Handles:
    • Shipping application code and dependencies to the cluster nodes
    • Allocating resources
    • Launching driver and executors
    • Monitoring execution
  • Provides one uniform interface to every supported cluster manager, so the application needs no manager-specific code

Deployment Modes

| Mode | Driver Location | Best For |
|---|---|---|
| Client | Submission machine | Development, debugging |
| Cluster | Cluster worker node | Production deployments |

Key Takeaway: Use cluster mode for production: the driver keeps running even if the submission machine goes offline.


Resource Allocation Parameters

| Parameter | Description |
|---|---|
| --driver-memory | Driver memory (JVM heap size) |
| --executor-memory | Executor memory (JVM heap size) |
| --executor-cores | CPU cores per executor |
| --num-executors | Number of executors (YARN and Kubernetes) |

These values must be sized to fit both the workload and the capacity of the cluster's nodes.
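One way to fill in these four values is a widely cited rule of thumb: about five cores per executor, one core and 1 GB per node reserved for the OS and node daemons, and one executor slot left for the YARN ApplicationMaster. The sketch below encodes that heuristic; it is not an official Spark formula, and the 10% overhead fraction is an assumption.

```python
from typing import Dict, Union


def size_executors(nodes: int, cores_per_node: int, mem_gb_per_node: int,
                   cores_per_executor: int = 5, reserved_cores: int = 1,
                   reserved_mem_gb: int = 1, overhead_fraction: float = 0.10
                   ) -> Dict[str, Union[int, str]]:
    """Rule-of-thumb executor sizing for homogeneous worker nodes."""
    # Cores left for executors after reserving some for the OS and node daemons
    usable_cores = cores_per_node - reserved_cores
    executors_per_node = usable_cores // cores_per_executor
    if executors_per_node < 1:
        raise ValueError("nodes are too small for the requested executor size")

    # Split the remaining memory evenly, then leave room for off-heap overhead
    container_mem_gb = (mem_gb_per_node - reserved_mem_gb) / executors_per_node
    heap_gb = int(container_mem_gb / (1 + overhead_fraction))

    # Leave one executor slot free for the YARN ApplicationMaster / driver
    num_executors = nodes * executors_per_node - 1
    return {
        "--num-executors": num_executors,
        "--executor-cores": cores_per_executor,
        "--executor-memory": f"{heap_gb}g",
    }
```

For 6 nodes with 16 cores and 64 GB each, the heuristic suggests 17 executors with 5 cores and a 19g heap each. Treat this as a starting point to refine with real workload measurements.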


Cluster Manager URLs

| Manager | --master Value |
|---|---|
| YARN | yarn |
| Kubernetes | API server URL: k8s://https://HOST:PORT |
| Standalone | Master node URL: spark://HOST:7077 |

Each has unique configuration options and behaviors.
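The URL formats above can be generated with a small helper. This is a hypothetical function, not part of Spark. It follows the documented formats, including the Kubernetes requirement that the API server port always be given explicitly (even 443) and the Standalone default port 7077.

```python
from typing import Optional


def master_url(manager: str, host: Optional[str] = None, port: Optional[int] = None) -> str:
    """Build a --master value for the given cluster manager."""
    manager = manager.lower()
    if manager == "yarn":
        # YARN locates the ResourceManager via HADOOP_CONF_DIR / YARN_CONF_DIR
        return "yarn"
    if manager == "kubernetes":
        if not host or port is None:
            raise ValueError("Kubernetes needs the API server host and an explicit port")
        return f"k8s://https://{host}:{port}"
    if manager == "standalone":
        if not host:
            raise ValueError("Standalone needs the master host")
        return f"spark://{host}:{port or 7077}"
    raise ValueError(f"Unsupported cluster manager: {manager}")
```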


Configuration Parameters

  • Passed using --conf key=value pairs
  • Override values from conf/spark-defaults.conf; properties set directly on SparkConf in the application code still take precedence
  • Common configurations:
    • spark.sql.shuffle.partitions
    • spark.serializer
    • spark.dynamicAllocation.enabled
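When configurations live in a Python dict (for example, loaded from a config file), they can be turned into `--conf` arguments programmatically. The helper below is an illustrative sketch: it rejects keys that do not start with `spark.` (spark-submit would otherwise ignore them with a warning), lowercases booleans, and uses `shlex.join` to print a copy-pasteable command.

```python
import shlex
from typing import Dict, List, Union


def conf_args(confs: Dict[str, Union[str, int, float, bool]]) -> List[str]:
    """Turn {"spark.x": value} into ["--conf", "spark.x=value", ...]."""
    args: List[str] = []
    for key, value in confs.items():
        if not key.startswith("spark."):
            raise ValueError(f"Not a Spark property: {key}")
        if isinstance(value, bool):
            value = str(value).lower()  # Spark expects true/false
        args.extend(["--conf", f"{key}={value}"])
    return args


cmd = ["spark-submit",
       *conf_args({"spark.sql.shuffle.partitions": 200,
                   "spark.dynamicAllocation.enabled": True}),
       "my_pyspark_app.py"]
print(shlex.join(cmd))
# spark-submit --conf spark.sql.shuffle.partitions=200 --conf spark.dynamicAllocation.enabled=true my_pyspark_app.py
```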

Dependency Management

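| Parameter | Purpose |
|---|---|
| --packages | Comma-separated Maven coordinates (groupId:artifactId:version) of JARs to download |
| --repositories | Additional remote repositories to search for --packages |
| --py-files | .py, .zip, or .egg files to place on the PYTHONPATH of the driver and executors |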

Ensures all required dependencies are available on cluster nodes.
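For `--py-files`, a multi-module Python package is usually shipped as a single zip archive, which Spark adds to the PYTHONPATH of the driver and executors. The sketch below builds such an archive with the standard library and assembles the two dependency flags. The function names, and the `etl_utils` package used as an example, are hypothetical.

```python
import os
import shutil
import tempfile
import zipfile
from typing import List, Optional


def build_py_files_zip(package_dir: str, output_dir: Optional[str] = None) -> str:
    """Zip a local Python package so that --py-files can ship it.

    The package folder stays at the root of the archive, so executors can
    `import <package>` once the zip is on their PYTHONPATH.
    """
    package_dir = os.path.abspath(package_dir)
    parent, name = os.path.split(package_dir)
    output_dir = output_dir or tempfile.mkdtemp(prefix="pyfiles-")
    base_name = os.path.join(output_dir, name)
    # root_dir + base_dir: archive member paths start with "<name>/"
    return shutil.make_archive(base_name, "zip", root_dir=parent, base_dir=name)


def list_archive(path: str) -> List[str]:
    """Return the member names of a zip archive (handy for a quick check)."""
    with zipfile.ZipFile(path) as zf:
        return zf.namelist()


def dependency_args(packages: List[str], py_files: List[str]) -> List[str]:
    """Build the --packages / --py-files flags (both take comma-separated lists)."""
    args: List[str] = []
    if packages:
        args += ["--packages", ",".join(packages)]
    if py_files:
        args += ["--py-files", ",".join(py_files)]
    return args
```

For example, `build_py_files_zip("etl_utils")` returns the path of an `etl_utils.zip`, which can be passed as `--py-files etl_utils.zip` so that job code can `import etl_utils` on every node.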


Application Parameters

  1. Main class (for Scala/Java)
  2. Application file (Python or JAR)
  3. Command-line arguments passed to the application
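On the application side, the arguments that follow the application file arrive in `sys.argv`, so a standard `argparse` parser can read them. This sketch assumes the `--input`, `--output`, and `--date` arguments used in the examples on this page; the parser itself is illustrative.

```python
import argparse
from typing import List, Optional


def parse_app_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the arguments that spark-submit forwards after the application file."""
    parser = argparse.ArgumentParser(description="Example PySpark job (hypothetical)")
    parser.add_argument("--input", required=True, help="Input path, e.g. /data/input")
    parser.add_argument("--output", required=True, help="Output path, e.g. /data/output")
    parser.add_argument("--date", default=None, help="Optional processing date (YYYY-MM-DD)")
    # argv=None makes argparse read sys.argv[1:], i.e. the forwarded arguments
    return parser.parse_args(argv)
```

Inside the job, calling `args = parse_app_args()` before creating the SparkSession picks up the values passed on the spark-submit command line.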

Monitoring & Logging

  • Event logging for execution data
  • Spark History Server integration
  • Logging configuration options
  • Essential for production deployments
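When event logging is enabled (`spark.eventLog.enabled=true`, with `spark.eventLog.dir` pointing at the directory the History Server reads through `spark.history.fs.logDirectory`), the History Server also exposes a REST API under `/api/v1`, by default on port 18080. The sketch below lists applications from `/api/v1/applications` and condenses the `id`, `name`, and `attempts[].completed` fields. The helper names and the summary shape are assumptions.

```python
import json
from typing import Any, Dict, List
from urllib.request import urlopen


def fetch_applications(history_server: str, timeout: float = 10.0) -> List[Dict[str, Any]]:
    """GET <history_server>/api/v1/applications, e.g. http://history-server-host:18080."""
    url = f"{history_server.rstrip('/')}/api/v1/applications"
    with urlopen(url, timeout=timeout) as response:
        return json.load(response)


def summarize_applications(apps: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """Condense the JSON into id -> name, number of attempts and completion flag."""
    summary = {}
    for app in apps:
        attempts = app.get("attempts", [])
        summary[app["id"]] = {
            "name": app.get("name"),
            "attempts": len(attempts),
            # "completed" means the attempt finished, not that it succeeded
            "completed": bool(attempts) and all(a.get("completed", False) for a in attempts),
        }
    return summary
```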

Security Considerations

  • Authentication: Kerberos
  • Authorization: ACLs
  • Encryption: SSL/TLS
  • Secure environment variables

Must be configured appropriately for production environments.
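Here is a sketch of assembling security flags without hardcoding credentials. It enables shared-secret authentication, AES-based RPC encryption, and local disk I/O encryption through real Spark properties. It adds spark-submit's `--principal`/`--keytab` options only when both values are provided. The environment variable names `SPARK_KERBEROS_PRINCIPAL` and `SPARK_KERBEROS_KEYTAB` are hypothetical; adapt them to your deployment.

```python
import os
from typing import List, Mapping, Optional


def security_args(env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Build authentication/encryption flags, reading Kerberos credentials from env vars."""
    env = os.environ if env is None else env
    args = [
        "--conf", "spark.authenticate=true",            # shared-secret auth between Spark processes
        "--conf", "spark.network.crypto.enabled=true",  # AES-based RPC encryption
        "--conf", "spark.io.encryption.enabled=true",   # encrypt shuffle/spill files on local disk
    ]
    principal = env.get("SPARK_KERBEROS_PRINCIPAL")  # hypothetical variable name
    keytab = env.get("SPARK_KERBEROS_KEYTAB")        # hypothetical variable name
    if principal or keytab:
        if not (principal and keytab):
            raise ValueError("Set both the Kerberos principal and the keytab path")
        if not os.path.isfile(keytab):
            raise FileNotFoundError(f"Keytab not found: {keytab}")
        args += ["--principal", principal, "--keytab", keytab]
    return args
```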


Advanced Features

  • Dynamic resource allocation: automatic executor scaling
  • Priority-based scheduling
  • Workflow manager integration: Airflow, Luigi
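Dynamic allocation is controlled entirely through `spark.dynamicAllocation.*` properties. The helper below is an illustrative sketch that builds them with a basic sanity check. It requires min <= initial <= max, which is stricter than Spark itself (Spark simply raises the initial count to at least the minimum). Because removing executors must not lose shuffle data, it also enables either the external shuffle service or shuffle tracking.

```python
from typing import Dict, Optional


def dynamic_allocation_confs(min_executors: int, max_executors: int,
                             initial_executors: Optional[int] = None,
                             idle_timeout: str = "60s",
                             shuffle_tracking: bool = False) -> Dict[str, str]:
    """Build spark.dynamicAllocation.* properties with basic sanity checks."""
    initial = min_executors if initial_executors is None else initial_executors
    if min_executors < 0 or max_executors < 1:
        raise ValueError("need minExecutors >= 0 and maxExecutors >= 1")
    if not min_executors <= initial <= max_executors:
        raise ValueError("need min <= initial <= max executors")

    confs = {
        "spark.dynamicAllocation.enabled": "true",
        "spark.dynamicAllocation.minExecutors": str(min_executors),
        "spark.dynamicAllocation.initialExecutors": str(initial),
        "spark.dynamicAllocation.maxExecutors": str(max_executors),
        "spark.dynamicAllocation.executorIdleTimeout": idle_timeout,
    }
    if shuffle_tracking:
        # Keeps executors that hold shuffle data alive instead of using an external service
        confs["spark.dynamicAllocation.shuffleTracking.enabled"] = "true"
    else:
        # Requires the external shuffle service to be running on every node
        confs["spark.shuffle.service.enabled"] = "true"
    return confs
```

Shuffle tracking is the usual choice on Kubernetes, where no external shuffle service is available; on YARN the NodeManager-hosted shuffle service is the common setup.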

Best Practices

  1. Use cluster mode for production
  2. Right-size resources based on workload
  3. Use configuration files for complex settings
  4. Test with representative data
  5. Implement proper error handling and logging

📊 Key Concepts Table

| Concept | Description | Use Case |
|---|---|---|
| spark-submit | Application submission tool | Deployment execution |
| Deployment Mode | Driver location (client/cluster) | Environment selection |
| Cluster Manager | Resource allocation service | Cluster integration |
| Resource Allocation | CPU/memory distribution | Performance optimization |
| Dependency Management | Package distribution | Environment setup |
| Configuration | Application settings | Behavior customization |
| Monitoring | Application observability | Operations management |
| Security | Access control and encryption | Production hardening |

💻 Code Examples

Basic spark-submit Command

```bash
# Basic spark-submit command for a PySpark application
# Parameter: --master (cluster manager URL)
# Parameter: --deploy-mode (client or cluster mode)
# Parameter: --driver-memory (driver heap size)
# Parameter: --executor-memory (executor heap size)
# Parameter: --executor-cores (CPU cores per executor)
# Parameter: --num-executors (number of executors)
# Everything after the application file (my_pyspark_app.py) is passed to the application itself

spark-submit \
    --master yarn \
    --deploy-mode client \
    --name "MyPySparkApp" \
    --driver-memory 4g \
    --executor-memory 8g \
    --executor-cores 4 \
    --num-executors 10 \
    --conf spark.sql.shuffle.partitions=200 \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    my_pyspark_app.py \
    --input /data/input \
    --output /data/output
```

Production Configuration with YARN

```bash
# Production spark-submit with the YARN cluster manager
# Parameter: spark.yarn.historyServer.address (History Server host:port, without http://)
# Parameter: spark.eventLog.enabled (enable event logging)
# Parameter: spark.yarn.maxAppAttempts (maximum application attempts; must not exceed
#            YARN's yarn.resourcemanager.am.max-attempts, which defaults to 2)
# Note: dynamic allocation needs the YARN external shuffle service on the NodeManagers
#       (spark.shuffle.service.enabled=true) or spark.dynamicAllocation.shuffleTracking.enabled=true

spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --name "ProductionETL" \
    --driver-memory 8g \
    --executor-memory 16g \
    --executor-cores 5 \
    --num-executors 20 \
    --conf spark.dynamicAllocation.enabled=true \
    --conf spark.dynamicAllocation.minExecutors=5 \
    --conf spark.dynamicAllocation.maxExecutors=50 \
    --conf spark.yarn.historyServer.address=history-server-host:18080 \
    --conf spark.eventLog.enabled=true \
    --conf spark.eventLog.dir=hdfs:///spark-history \
    --conf spark.yarn.maxAppAttempts=3 \
    --conf spark.task.maxFailures=8 \
    --conf spark.sql.adaptive.enabled=true \
    --conf spark.sql.adaptive.coalescePartitions.enabled=true \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 \
    --py-files utils.py,models.py \
    etl_pipeline.py \
    --date 2024-01-15 \
    --config /etc/spark/config.json
```

Kubernetes Deployment

```bash
# Kubernetes spark-submit
# Parameter: spark.kubernetes.container.image (Docker image)
# Parameter: spark.kubernetes.namespace (Kubernetes namespace)
# Parameter: spark.kubernetes.authenticate.driver.serviceAccountName (service account)
# In cluster mode the driver pod must be able to read the application file: local:// refers
# to a path inside the image (assumed here: the script was copied to /opt/spark/work-dir).
# Alternatively, set spark.kubernetes.file.upload.path so spark-submit uploads a local file.

spark-submit \
    --master k8s://https://kubernetes-master:6443 \
    --deploy-mode cluster \
    --name "K8sSparkJob" \
    --driver-memory 4g \
    --executor-memory 8g \
    --executor-cores 4 \
    --num-executors 10 \
    --conf spark.kubernetes.container.image=spark:3.3.0 \
    --conf spark.kubernetes.namespace=spark-jobs \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
    --conf spark.kubernetes.driver.podTemplateFile=driver-pod.yaml \
    --conf spark.kubernetes.executor.podTemplateFile=executor-pod.yaml \
    --conf spark.kubernetes.driver.request.cores=1 \
    --conf spark.kubernetes.executor.request.cores=2 \
    --conf spark.kubernetes.executor.limit.cores=4 \
    --conf spark.kubernetes.allocation.batch.size=5 \
    local:///opt/spark/work-dir/k8s_job.py \
    --input s3a://bucket/input \
    --output s3a://bucket/output
```

Configuration File Approach

```python
# Create spark-submit configuration file
# File: spark-submit-config.properties

config_content = """
# Cluster Manager
spark.master=yarn
spark.submit.deployMode=cluster

# Application Details
spark.app.name=DataProcessingPipeline

# Resource Allocation
spark.driver.memory=8g
spark.executor.memory=16g
spark.executor.cores=5
# spark.executor.instances is the property behind --num-executors
spark.executor.instances=20

# Dynamic Allocation (requires the YARN external shuffle service on the NodeManagers)
spark.dynamicAllocation.enabled=true
spark.shuffle.service.enabled=true
spark.dynamicAllocation.minExecutors=5
spark.dynamicAllocation.maxExecutors=50
spark.dynamicAllocation.executorIdleTimeout=60s

# Performance Optimization
spark.sql.shuffle.partitions=200
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.sql.adaptive.enabled=true

# Monitoring
spark.eventLog.enabled=true
spark.eventLog.dir=hdfs:///spark-history
spark.yarn.historyServer.address=history-server-host:18080

# Fault Tolerance
spark.task.maxFailures=8
spark.yarn.maxAppAttempts=3

# Security
# On YARN, Spark generates and distributes the spark.authenticate secret per application,
# so no secret is stored in this file
spark.authenticate=true
spark.io.encryption.enabled=true
"""

# Write configuration file
with open("spark-submit-config.properties", "w") as f:
    f.write(config_content)

# Use configuration file with spark-submit
# Command:
# spark-submit --properties-file spark-submit-config.properties my_app.py
# Note: in Spark 3.x, --properties-file is read instead of conf/spark-defaults.conf, not in addition to it
```

Automated spark-submit Script

```python
#!/usr/bin/env python3
"""
Automated spark-submit script with validation and monitoring
"""

import json
import logging
import re
import shlex
import subprocess
import sys

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Spark memory strings: a number plus a unit such as 512m, 4g, 8G or 2048mb
MEMORY_PATTERN = re.compile(r"\d+[kmgt]b?", re.IGNORECASE)
# YARN application IDs have the form application_<clusterTimestamp>_<sequence>
APP_ID_PATTERN = re.compile(r"application_\d+_\d+")


class SparkSubmitManager:
    def __init__(self, config_file):
        """Initialize with configuration file"""
        self.config = self.load_config(config_file)
        self.validate_config()

    def load_config(self, config_file):
        """Load configuration from JSON file"""
        try:
            with open(config_file, 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            logger.error(f"Configuration file not found: {config_file}")
            sys.exit(1)
        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON in {config_file}: {e}")
            sys.exit(1)

    def validate_config(self):
        """Validate configuration parameters"""
        required_fields = ['master', 'deploy_mode', 'app_name', 'driver_memory',
                           'executor_memory', 'executor_cores', 'num_executors']

        for field in required_fields:
            if field not in self.config:
                logger.error(f"Missing required configuration field: {field}")
                sys.exit(1)

        # Validate memory formats (unit suffix is case-insensitive)
        for field in ['driver_memory', 'executor_memory']:
            value = str(self.config[field])
            if not MEMORY_PATTERN.fullmatch(value):
                logger.error(f"Invalid memory format for {field}: {value}")
                sys.exit(1)

        logger.info("Configuration validation passed")

    def build_spark_submit_command(self, app_file, app_args=None):
        """Build the spark-submit command as an argument list (no shell involved)"""
        cmd = [
            "spark-submit",
            "--master", self.config['master'],
            "--deploy-mode", self.config['deploy_mode'],
            "--name", self.config['app_name'],
            "--driver-memory", str(self.config['driver_memory']),
            "--executor-memory", str(self.config['executor_memory']),
            "--executor-cores", str(self.config['executor_cores']),
            "--num-executors", str(self.config['num_executors'])
        ]

        # Add configuration options
        for key, value in self.config.get('configs', {}).items():
            cmd.extend(["--conf", f"{key}={value}"])

        # Add packages if specified
        if 'packages' in self.config:
            cmd.extend(["--packages", self.config['packages']])

        # Add Python files if specified
        if 'py_files' in self.config:
            cmd.extend(["--py-files", self.config['py_files']])

        # Add application file; everything after it is passed to the application
        cmd.append(app_file)
        if app_args:
            cmd.extend(app_args)

        return cmd

    def execute_submission(self, app_file, app_args=None):
        """Execute spark-submit and report the outcome"""
        cmd = self.build_spark_submit_command(app_file, app_args)
        logger.info(f"Executing spark-submit command: {shlex.join(cmd)}")

        try:
            # Blocks until spark-submit exits; in YARN cluster mode that is when the
            # application finishes (spark.yarn.submit.waitAppCompletion defaults to true)
            result = subprocess.run(cmd, capture_output=True, text=True)
        except OSError as e:
            # e.g. spark-submit is not on the PATH
            logger.error(f"Exception during spark submission: {e}")
            return False

        # Spark's log output (including the YARN application ID) goes to stderr by default
        app_id = self.extract_application_id(result.stdout + result.stderr)

        if result.returncode == 0:
            logger.info("Spark application submitted successfully")
            logger.info(f"Application ID: {app_id}")
            return True

        logger.error(f"Spark submission failed with return code: {result.returncode}")
        logger.error(f"Error output: {result.stderr}")
        return False

    def extract_application_id(self, output):
        """Extract the full YARN application ID (e.g. application_1700000000000_0001)"""
        match = APP_ID_PATTERN.search(output)
        return match.group(0) if match else "unknown"


# Example usage
if __name__ == "__main__":
    # Create sample configuration
    sample_config = {
        "master": "yarn",
        "deploy_mode": "cluster",
        "app_name": "AutomatedSparkJob",
        "driver_memory": "4g",
        "executor_memory": "8g",
        "executor_cores": 4,
        "num_executors": 10,
        "configs": {
            "spark.sql.shuffle.partitions": "200",
            "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
            "spark.dynamicAllocation.enabled": "true"
        },
        "packages": "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0",
        "py_files": "utils.py"
    }

    # Write sample configuration
    with open("spark_config.json", "w") as f:
        json.dump(sample_config, f, indent=2)

    # Initialize manager
    manager = SparkSubmitManager("spark_config.json")

    # Execute submission
    success = manager.execute_submission(
        "my_application.py",
        ["--date", "2024-01-15", "--input", "/data/input"]
    )

    if success:
        logger.info("Application submitted successfully")
    else:
        logger.error("Application submission failed")
        sys.exit(1)
```

📈 Performance Metrics

| Metric | Optimal | Typical | Poor | Optimization |
|---|---|---|---|---|
| Startup Time | < 30s | 30s-2min | > 2min | Dependency optimization |
| Resource Utilization | > 80% | 60-80% | < 60% | Right-sizing, dynamic allocation |
| Application Success | > 99% | 95-99% | < 95% | Error handling, retry policies |
| Execution Time | < 1hr | 1-8hr | > 8hr | Parallelism, optimization |
| Cost Efficiency | < $10/hr | $10-50/hr | > $50/hr | Resource optimization |

🏆 Best Practices

  1. Use cluster mode for production: ensures the driver runs reliably on the cluster
  2. Right-size resources: match configuration to workload requirements
  3. Use configuration files: maintain complex settings in properties files
  4. Enable dynamic allocation: automatic scaling based on workload
  5. Implement error handling: proper retry and failure recovery
  6. Monitor applications: use the Spark History Server and metrics
  7. Test with production data: validate performance with realistic datasets
  8. Use an appropriate cluster manager: YARN, Kubernetes, or Standalone based on needs (Mesos is deprecated)
  9. Optimize dependencies: minimize package sizes and use fat JARs when needed
  10. Document configurations: maintain clear records of spark-submit settings

🔗 Related Topics

  • 17-cluster-management.mdx: Cluster resource management
  • 18-gc-tuning.mdx: JVM configuration for Spark
  • 07-partitioning-strategies.mdx: Partitioning configuration

See Also

  • 06-joins-optimization.mdx: Join optimization in production
  • 08-caching-persistence.mdx: Caching strategies for applications
  • Kafka Streams (kafka/03): Streaming application deployment
  • Data Engineering Streaming (data-engineering/022): Production deployment patterns
⭐

Premium Content

19. Spark Submit in PySpark

Unlock this lesson and 900+ advanced tutorials with a Premium plan.

🎯End-to-end Projects
πŸ’ΌInterview Prep
πŸ“œCertificates
🀝Community Access

Already a member? Log in

Need Expert PySpark Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement