19. Spark Submit in PySpark
Definition: Spark Submit
spark-submit is the primary command-line tool for submitting PySpark applications to a cluster. It handles application packaging, dependency distribution, resource allocation, and execution across the cluster.
Definition: Deployment Mode
Deployment mode determines where the driver program runs: client mode (driver runs on the submission machine) or cluster mode (driver runs on a cluster worker node).
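Total Resource Requirement
$$R_{\text{total}} = R_{\text{driver}} + N_{\text{executors}} \times R_{\text{executor}}$$
Here,
- $R_{\text{total}}$ = Total cluster resources required
- $R_{\text{driver}}$ = Resources allocated to driver
- $N_{\text{executors}}$ = Number of executors
- $R_{\text{executor}}$ = Resources per executor
Optimal Parallelism
$$P_{\text{optimal}} = N_{\text{executors}} \times C_{\text{executor}} \times \alpha$$
Here,
- $P_{\text{optimal}}$ = Optimal number of parallel tasks
- $N_{\text{executors}}$ = Number of executors
- $C_{\text{executor}}$ = Cores per executor
- $\alpha$ = Multiplier factor (typically 2-3)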
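As a worked instance of the two relationships above (total resources as driver plus executors, and parallelism as executors times cores times a multiplier), the following sketch computes both for a hypothetical job with 10 executors of 4 cores and 8 GiB each and a 1-core, 4 GiB driver. The `Resources` type and the function names are illustrative assumptions, not part of Spark.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Resources:
    """A (cores, memory) pair; memory is expressed in GiB."""
    cores: int
    memory_gib: int


def total_resources(driver: Resources, executor: Resources, num_executors: int) -> Resources:
    """Total = driver + num_executors * per-executor, applied to cores and memory."""
    return Resources(
        cores=driver.cores + num_executors * executor.cores,
        memory_gib=driver.memory_gib + num_executors * executor.memory_gib,
    )


def optimal_parallelism(num_executors: int, cores_per_executor: int, multiplier: int = 2) -> int:
    """Parallel tasks = executors * cores per executor * multiplier (typically 2-3)."""
    return num_executors * cores_per_executor * multiplier


if __name__ == "__main__":
    total = total_resources(Resources(1, 4), Resources(4, 8), num_executors=10)
    print(total)                       # Resources(cores=41, memory_gib=84)
    print(optimal_parallelism(10, 4))  # 80
```

With a multiplier of 2, the same job would target roughly 80 tasks per stage, which is one way to choose a starting value for a setting such as `spark.sql.shuffle.partitions`.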
spark-submit supports multiple cluster managers: YARN (Hadoop), Kubernetes, Mesos (deprecated since Spark 3.2), and Standalone. Each has its own configuration options and deployment considerations.
Use cluster mode for production deployments to ensure the driver runs reliably on the cluster. Use client mode for development and debugging when you need direct access to the driver.
Theorem: Resource Configuration Balance
Optimal spark-submit configuration requires balancing four competing objectives: (1) Performance: sufficient resources for parallel execution; (2) Cost: minimal resource waste; (3) Reliability: fault tolerance and stability; (4) Scalability: ability to handle varying workloads.
- spark-submit is the primary tool for deploying PySpark applications
- Client mode runs driver locally; cluster mode runs driver on cluster
- YARN, Kubernetes, Mesos, and Standalone are the supported cluster managers
- Optimal configuration balances performance, cost, reliability, and scalability
- Production deployments should use cluster mode with appropriate resource allocation
Client vs Cluster Mode
Spark Submit Architecture
Detailed Explanation
What is spark-submit?
- Primary tool for deploying PySpark applications to a cluster
- Handles:
  - Packaging application code and dependencies
  - Distributing across the cluster
  - Allocating resources
  - Launching driver and executors
  - Monitoring execution
- Abstracts away distributed computing complexity
Deployment Modes
| Mode | Driver Location | Best For |
|---|---|---|
| Client | Submission machine | Development, debugging |
| Cluster | Cluster worker node | Production deployments |
Key Takeaway: Use cluster mode for production, because the driver keeps running even if the submission machine goes offline.
Resource Allocation Parameters
| Parameter | Description |
|---|---|
| `--driver-memory` | Driver heap size |
| `--executor-memory` | Executor heap size |
| `--executor-cores` | CPU cores per executor |
| `--num-executors` | Number of executors |
These values must be configured carefully based on the workload and the available cluster resources.
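The heap sizes above are not the full memory request per executor. On YARN, Spark adds an off-heap overhead to each executor container, which by default is 10% of the executor memory with a 384 MiB floor (`spark.executor.memoryOverhead`). A rough sketch of that arithmetic follows; the helper names are hypothetical, only `g` and `m` suffixes are handled, and other cluster managers or non-default overhead settings would give different numbers.

```python
import re

# MiB per unit for the two suffixes this sketch supports.
_UNITS_MIB = {"m": 1, "g": 1024}


def parse_memory_mib(value: str) -> int:
    """Convert a string such as '8g' or '512m' to MiB."""
    match = re.fullmatch(r"(\d+)([gm])", value.strip().lower())
    if match is None:
        raise ValueError(f"unsupported memory string: {value!r}")
    amount, unit = match.groups()
    return int(amount) * _UNITS_MIB[unit]


def container_memory_mib(executor_memory: str,
                         overhead_factor: float = 0.10,
                         min_overhead_mib: int = 384) -> int:
    """Heap plus overhead, where overhead = max(heap * factor, minimum)."""
    heap = parse_memory_mib(executor_memory)
    overhead = max(int(heap * overhead_factor), min_overhead_mib)
    return heap + overhead


if __name__ == "__main__":
    print(container_memory_mib("8g"))  # 9011 (8192 heap + 819 overhead)
    print(container_memory_mib("2g"))  # 2432 (2048 heap + 384 minimum overhead)
```

Sizing `--executor-memory` so that heap plus overhead fits within the per-container limit of the cluster is one reason an executor can request less heap than the node nominally offers.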
Cluster Manager URLs
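| Manager | `--master` Value |
|---|---|
| YARN | `yarn` |
| Kubernetes | API server URL, in the form `k8s://https://<host>:<port>` |
| Standalone | Master node URL, in the form `spark://<host>:<port>` |
Each cluster manager has its own configuration options and behaviors.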
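To make the URL formats in the table concrete, here is a small sketch that builds the `--master` value for each manager. `master_url` is a hypothetical helper, not a Spark API; it assumes the standalone master listens on the default port 7077 when none is given and requires an explicit port for Kubernetes.

```python
from typing import Optional


def master_url(manager: str, host: Optional[str] = None, port: Optional[int] = None) -> str:
    """Return the --master value for a given cluster manager."""
    manager = manager.lower()
    if manager == "yarn":
        # YARN locates the ResourceManager from the Hadoop configuration, so no host is needed.
        return "yarn"
    if host is None:
        raise ValueError(f"{manager} requires a host")
    if manager == "kubernetes":
        if port is None:
            raise ValueError("kubernetes requires an explicit API server port")
        return f"k8s://https://{host}:{port}"
    if manager == "standalone":
        return f"spark://{host}:{port if port is not None else 7077}"
    raise ValueError(f"unknown cluster manager: {manager}")


if __name__ == "__main__":
    print(master_url("yarn"))                                 # yarn
    print(master_url("kubernetes", "kubernetes-master", 6443))  # k8s://https://kubernetes-master:6443
    print(master_url("standalone", "spark-master"))           # spark://spark-master:7077
```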
Configuration Parameters
- Passed using `--conf key=value` pairs
- Override default Spark settings
- Common configurations:
  - `spark.sql.shuffle.partitions`
  - `spark.serializer`
  - `spark.dynamicAllocation.enabled`
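When the same property is set in more than one place, Spark applies a fixed precedence: values set on `SparkConf` in application code win over `spark-submit` flags, which in turn win over entries in a properties file. The sketch below only models that ordering with a `ChainMap`; `effective_conf` and `conf_flags` are hypothetical helper names, not Spark APIs.

```python
from collections import ChainMap
from typing import Dict, List


def effective_conf(in_code: Dict[str, str],
                   submit_flags: Dict[str, str],
                   properties_file: Dict[str, str]) -> Dict[str, str]:
    """Merge settings so that earlier sources override later ones."""
    return dict(ChainMap(in_code, submit_flags, properties_file))


def conf_flags(settings: Dict[str, str]) -> List[str]:
    """Render settings as repeated `--conf key=value` arguments."""
    flags: List[str] = []
    for key, value in settings.items():
        flags.extend(["--conf", f"{key}={value}"])
    return flags


if __name__ == "__main__":
    merged = effective_conf(
        in_code={"spark.sql.shuffle.partitions": "400"},
        submit_flags={"spark.sql.shuffle.partitions": "200",
                      "spark.dynamicAllocation.enabled": "true"},
        properties_file={"spark.serializer": "org.apache.spark.serializer.KryoSerializer"},
    )
    print(merged["spark.sql.shuffle.partitions"])  # 400 (application code wins)
    print(conf_flags({"spark.dynamicAllocation.enabled": "true"}))
```

A practical consequence is that a value hardcoded in the application cannot be overridden from the command line, so settings that vary per environment are usually better left out of the code.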
Dependency Management
| Parameter | Purpose |
|---|---|
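| `--packages` | Maven coordinates of JARs to include |
| `--repositories` | Additional repositories to search for `--packages` |
| `--py-files` | Python files (`.py`, `.zip`, `.egg`) to distribute |
These options ensure that all required dependencies are available on the cluster nodes.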
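A multi-module Python package is usually shipped through `--py-files` as a single zip archive. The following sketch shows one way to build such an archive with the standard library; `zip_python_package` is a hypothetical helper, and it assumes the package is pure Python (compiled extensions need a different approach).

```python
import zipfile
from pathlib import Path


def zip_python_package(package_dir: str, archive_path: str) -> str:
    """Zip every .py file under package_dir, keeping the package name as the top-level folder."""
    package = Path(package_dir).resolve()
    with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for source in sorted(package.rglob("*.py")):
            # Paths are stored relative to the package's parent so that
            # `import <package>` resolves once the zip is on the Python path.
            arcname = source.relative_to(package.parent).as_posix()
            archive.write(source, arcname)
    return archive_path
```

The resulting archive could then be passed along the lines of `spark-submit --py-files deps.zip my_app.py`, where `deps.zip` and `my_app.py` are placeholder names.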
Application Parameters
- Main class (for Scala/Java)
- Application file (Python or JAR)
- Command-line arguments passed to the application
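Everything that follows the application file on the `spark-submit` command line is handed to the application unchanged, so the PySpark script parses it like any other Python program. A minimal sketch with `argparse` is shown below; the `--input`, `--output`, and `--date` options are illustrative assumptions.

```python
import argparse
from typing import List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse application arguments; argv=None falls back to sys.argv[1:]."""
    parser = argparse.ArgumentParser(description="Example PySpark job")
    parser.add_argument("--input", required=True, help="Input path")
    parser.add_argument("--output", required=True, help="Output path")
    parser.add_argument("--date", default=None, help="Optional processing date (YYYY-MM-DD)")
    return parser.parse_args(argv)


if __name__ == "__main__":
    # An explicit list stands in for the arguments spark-submit would pass through.
    args = parse_args(["--input", "/data/input", "--output", "/data/output"])
    print(args.input, args.output, args.date)  # /data/input /data/output None
```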
Monitoring & Logging
- Event logging for execution data
- Spark History Server integration
- Logging configuration options
- Essential for production deployments
Security Considerations
- Authentication: Kerberos
- Authorization: ACLs
- Encryption: SSL/TLS
- Secure environment variables
These settings must be configured appropriately for production environments.
Advanced Features
- Dynamic resource allocation: automatic executor scaling
- Priority-based scheduling
- Workflow manager integration: Airflow, Luigi
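Dynamic allocation needs more than the `enabled` flag: Spark also has to keep shuffle data available when executors are removed, either through the external shuffle service or through shuffle tracking (Spark 3.0 and later). The sketch below assembles a consistent set of properties; `dynamic_allocation_conf` is a hypothetical helper, and which of the two mechanisms fits depends on the cluster.

```python
from typing import Dict


def dynamic_allocation_conf(min_executors: int,
                            max_executors: int,
                            shuffle_tracking: bool = True) -> Dict[str, str]:
    """Build Spark properties for dynamic allocation with validated bounds."""
    if min_executors < 0 or max_executors < min_executors:
        raise ValueError("expected 0 <= min_executors <= max_executors")
    conf = {
        "spark.dynamicAllocation.enabled": "true",
        "spark.dynamicAllocation.minExecutors": str(min_executors),
        "spark.dynamicAllocation.maxExecutors": str(max_executors),
    }
    if shuffle_tracking:
        # Track shuffle files on executors; no external service required.
        conf["spark.dynamicAllocation.shuffleTracking.enabled"] = "true"
    else:
        # Assumes the external shuffle service is already running on the cluster nodes.
        conf["spark.shuffle.service.enabled"] = "true"
    return conf


if __name__ == "__main__":
    for key, value in dynamic_allocation_conf(5, 50).items():
        print(f"--conf {key}={value}")
```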
Best Practices
- Use cluster mode for production
- Right-size resources based on workload
- Use configuration files for complex settings
- Test with representative data
- Implement proper error handling and logging
Key Concepts Table
| Concept | Description | Use Case |
|---|---|---|
| spark-submit | Application submission tool | Deployment execution |
| Deployment Mode | Driver location (client/cluster) | Environment selection |
| Cluster Manager | Resource allocation service | Cluster integration |
| Resource Allocation | CPU/memory distribution | Performance optimization |
| Dependency Management | Package distribution | Environment setup |
| Configuration | Application settings | Behavior customization |
| Monitoring | Application observability | Operations management |
| Security | Access control and encryption | Production hardening |
Code Examples
Basic spark-submit Command
# Basic spark-submit command for a PySpark application
#   --master           cluster manager URL
#   --deploy-mode      client or cluster
#   --driver-memory    driver heap size
#   --executor-memory  executor heap size
#   --num-executors    number of executors
# Everything after my_pyspark_app.py is passed to the application itself.
spark-submit \
  --master yarn \
  --deploy-mode client \
  --name "MyPySparkApp" \
  --driver-memory 4g \
  --executor-memory 8g \
  --executor-cores 4 \
  --num-executors 10 \
  --conf spark.sql.shuffle.partitions=200 \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  my_pyspark_app.py \
  --input /data/input \
  --output /data/output
Production Configuration with YARN
# Production spark-submit with the YARN cluster manager
#   spark.yarn.historyServer.address  History Server address
#   spark.eventLog.enabled            enable event logging
#   spark.yarn.maxAppAttempts         maximum application attempts
# With dynamic allocation enabled, --num-executors sets the initial executor count.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --name "ProductionETL" \
  --driver-memory 8g \
  --executor-memory 16g \
  --executor-cores 5 \
  --num-executors 20 \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=5 \
  --conf spark.dynamicAllocation.maxExecutors=50 \
  --conf spark.yarn.historyServer.address=history-server-host:18080 \
  --conf spark.eventLog.enabled=true \
  --conf spark.eventLog.dir=hdfs:///spark-history \
  --conf spark.yarn.maxAppAttempts=3 \
  --conf spark.task.maxFailures=8 \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.sql.adaptive.coalescePartitions.enabled=true \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 \
  --py-files utils.py,models.py \
  etl_pipeline.py \
  --date 2024-01-15 \
  --config /etc/spark/config.json
Kubernetes Deployment
# Kubernetes spark-submit
#   spark.kubernetes.container.image                         container image
#   spark.kubernetes.namespace                               Kubernetes namespace
#   spark.kubernetes.authenticate.driver.serviceAccountName  service account
# Note: for PySpark jobs the container image must include Python and the PySpark bindings.
spark-submit \
  --master k8s://https://kubernetes-master:6443 \
  --deploy-mode cluster \
  --name "K8sSparkJob" \
  --driver-memory 4g \
  --executor-memory 8g \
  --executor-cores 4 \
  --num-executors 10 \
  --conf spark.kubernetes.container.image=spark:3.3.0 \
  --conf spark.kubernetes.namespace=spark-jobs \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.driver.podTemplateFile=driver-pod.yaml \
  --conf spark.kubernetes.executor.podTemplateFile=executor-pod.yaml \
  --conf spark.kubernetes.driver.request.cores=1 \
  --conf spark.kubernetes.executor.request.cores=2 \
  --conf spark.kubernetes.executor.limit.cores=4 \
  --conf spark.kubernetes.allocation.batch.size=5 \
  k8s_job.py \
  --input s3a://bucket/input \
  --output s3a://bucket/output
Configuration File Approach
# Create spark-submit configuration file
# File: spark-submit-config.properties
config_content = """
# Cluster Manager
spark.master=yarn
spark.submit.deployMode=cluster
# Application Details
spark.app.name=DataProcessingPipeline
# Resource Allocation
spark.driver.memory=8g
spark.executor.memory=16g
spark.executor.cores=5
# spark.executor.instances is the property behind --num-executors
spark.executor.instances=20
# Dynamic Allocation
# Requires the external shuffle service or
# spark.dynamicAllocation.shuffleTracking.enabled=true
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=5
spark.dynamicAllocation.maxExecutors=50
spark.dynamicAllocation.executorIdleTimeout=60s
# Performance Optimization
spark.sql.shuffle.partitions=200
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.sql.adaptive.enabled=true
# Monitoring
spark.eventLog.enabled=true
spark.eventLog.dir=hdfs:///spark-history
spark.yarn.historyServer.address=history-server-host:18080
# Fault Tolerance
spark.task.maxFailures=8
spark.yarn.maxAppAttempts=3
# Security
spark.authenticate=true
# Placeholder only: never store a real secret in a shared file.
# On YARN, Spark generates and distributes the secret automatically.
spark.authenticate.secret=secret_key
spark.io.encryption.enabled=true
"""
# Write configuration file
with open("spark-submit-config.properties", "w") as f:
    f.write(config_content)
# Use configuration file with spark-submit
# Command:
# spark-submit --properties-file spark-submit-config.properties my_app.py
Automated spark-submit Script
#!/usr/bin/env python3
"""
Automated spark-submit script with validation and monitoring
"""
import json
import logging
import re
import subprocess
import sys

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class SparkSubmitManager:
    def __init__(self, config_file):
        """Initialize with configuration file"""
        self.config = self.load_config(config_file)
        self.validate_config()

    def load_config(self, config_file):
        """Load configuration from JSON file"""
        try:
            with open(config_file, 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            logger.error(f"Configuration file not found: {config_file}")
            sys.exit(1)
        except json.JSONDecodeError as e:
            logger.error(f"Configuration file is not valid JSON: {e}")
            sys.exit(1)

    def validate_config(self):
        """Validate configuration parameters"""
        required_fields = ['master', 'deploy_mode', 'app_name', 'driver_memory',
                           'executor_memory', 'executor_cores', 'num_executors']
        for field in required_fields:
            if field not in self.config:
                logger.error(f"Missing required configuration field: {field}")
                sys.exit(1)
        # Validate memory formats: digits followed by a k/m/g/t suffix, e.g. 512m or 8g
        memory_fields = ['driver_memory', 'executor_memory']
        for field in memory_fields:
            if not re.fullmatch(r'\d+[kmgt]', str(self.config[field]).lower()):
                logger.error(f"Invalid memory format for {field}: {self.config[field]}")
                sys.exit(1)
        logger.info("Configuration validation passed")

    def build_spark_submit_command(self, app_file, app_args=None):
        """Build spark-submit command as an argument list"""
        cmd = [
            "spark-submit",
            "--master", self.config['master'],
            "--deploy-mode", self.config['deploy_mode'],
            "--name", self.config['app_name'],
            "--driver-memory", self.config['driver_memory'],
            "--executor-memory", self.config['executor_memory'],
            "--executor-cores", str(self.config['executor_cores']),
            "--num-executors", str(self.config['num_executors'])
        ]
        # Add configuration options
        if 'configs' in self.config:
            for key, value in self.config['configs'].items():
                cmd.extend(["--conf", f"{key}={value}"])
        # Add packages if specified
        if 'packages' in self.config:
            cmd.extend(["--packages", self.config['packages']])
        # Add Python files if specified
        if 'py_files' in self.config:
            cmd.extend(["--py-files", self.config['py_files']])
        # Add application file and arguments
        cmd.append(app_file)
        if app_args:
            cmd.extend(app_args)
        return cmd

    def execute_submission(self, app_file, app_args=None):
        """Execute spark-submit command and report the outcome"""
        cmd = self.build_spark_submit_command(app_file, app_args)
        logger.info("Executing spark-submit command:")
        logger.info(f"Command: {' '.join(cmd)}")
        try:
            # Run the command and wait for it to finish, capturing both streams
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                logger.info("Spark application submitted successfully")
                # spark-submit writes its log output to stderr by default, so search both streams
                app_id = self.extract_application_id(result.stdout + "\n" + result.stderr)
                logger.info(f"Application ID: {app_id}")
                return True
            else:
                logger.error(f"Spark submission failed with return code: {result.returncode}")
                logger.error(f"Error output: {result.stderr}")
                return False
        except Exception as e:
            logger.error(f"Exception during spark submission: {str(e)}")
            return False

    def extract_application_id(self, output):
        """Extract a YARN application ID (application_<timestamp>_<sequence>) from output"""
        match = re.search(r'application_\d+_\d+', output)
        return match.group(0) if match else "unknown"
# Example usage
if __name__ == "__main__":
    # Create sample configuration
    sample_config = {
        "master": "yarn",
        "deploy_mode": "cluster",
        "app_name": "AutomatedSparkJob",
        "driver_memory": "4g",
        "executor_memory": "8g",
        "executor_cores": 4,
        "num_executors": 10,
        "configs": {
            "spark.sql.shuffle.partitions": "200",
            "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
            "spark.dynamicAllocation.enabled": "true"
        },
        "packages": "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0",
        "py_files": "utils.py"
    }
    # Write sample configuration
    with open("spark_config.json", "w") as f:
        json.dump(sample_config, f, indent=2)
    # Initialize manager
    manager = SparkSubmitManager("spark_config.json")
    # Execute submission
    success = manager.execute_submission(
        "my_application.py",
        ["--date", "2024-01-15", "--input", "/data/input"]
    )
    if success:
        logger.info("Application submitted successfully")
    else:
        logger.error("Application submission failed")
        sys.exit(1)
Performance Metrics
| Metric | Optimal | Typical | Poor | Optimization |
|---|---|---|---|---|
| Startup Time | < 30s | 30s-2min | > 2min | Dependency optimization |
| Resource Utilization | > 80% | 60-80% | < 60% | Right-sizing, dynamic allocation |
| Application Success | > 99% | 95-99% | < 95% | Error handling, retry policies |
| Execution Time | < 1hr | 1-8hr | > 8hr | Parallelism, optimization |
| Cost Efficiency | < $10/hr | $10-50/hr | > $50/hr | Resource optimization |
Best Practices
- Use cluster mode for production: ensures the driver runs reliably on the cluster
- Right-size resources: match configuration to workload requirements
- Use configuration files: maintain complex settings in properties files
- Enable dynamic allocation: automatic scaling based on workload
- Implement error handling: proper retry and failure recovery
- Monitor applications: use Spark History Server and metrics
- Test with production data: validate performance with realistic datasets
- Use an appropriate cluster manager: YARN, Kubernetes, Standalone, or Mesos based on needs
- Optimize dependencies: minimize package sizes and use fat JARs when needed
- Document configurations: maintain clear records of spark-submit settings
Related Topics
- 17-cluster-management.mdx: Cluster resource management
- 18-gc-tuning.mdx: JVM configuration for Spark
- 07-partitioning-strategies.mdx: Partitioning configuration
See Also
- 06-joins-optimization.mdx: Join optimization in production
- 08-caching-persistence.mdx: Caching strategies for applications
- Kafka Streams (kafka/03): Streaming application deployment
- Data Engineering Streaming (data-engineering/022): Production deployment patterns