Running PySpark as a standalone application
You can run Spark applications locally or distributed across a cluster, either by using an interactive shell or by submitting an application.
Submitting Spark Applications
To submit an application consisting of a Python file you can use the spark-submit script.
Simplified spark-submit Syntax
spark-submit --option value python file [application arguments]
where:
Argument | Description |
---|---|
python file | Path to a Python file containing a Spark application. For the client deployment mode, the path must point to a local file. For the cluster deployment mode, the path can be either a local file or a URL globally visible inside your cluster. |
application arguments | Arguments to pass to the main method of your application. |
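For instance, a submission following this syntax might look like the sketch below (my_app.py and its date arguments are placeholders, not part of NXCALS):
spark-submit --master local[2] my_app.py 2018-06-17 2018-06-20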
Selected spark-submit Options
Option | Description |
---|---|
deploy-mode | Deployment mode: cluster and client. In cluster mode, the driver runs on worker hosts. In client mode, the driver runs locally as an external client. Use cluster mode with production jobs; client mode is more appropriate for interactive and debugging uses, where you want to see your application output immediately. Default: client. |
driver-cores | Number of cores used by the driver in cluster mode. Default: 1. |
driver-memory | Maximum heap size (represented as a JVM string; for example 1024m, 2g, and so on) to allocate to the driver. Alternatively, you can use the spark.driver.memory property. |
master | The location to run the application. |
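As the driver-memory entry above notes, the driver heap size can be given either as an option or as a configuration property. A sketch of both forms, again with a placeholder application file:
spark-submit --master yarn --driver-memory 2g my_app.py
spark-submit --master yarn --conf spark.driver.memory=2g my_app.py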
Master Values
Master | Description |
---|---|
local | Run Spark locally with one worker thread (that is, no parallelism). |
local[K] | Run Spark locally with K worker threads. (Ideally, set this to the number of cores on your host.) |
local[*] | Run Spark locally with as many worker threads as logical cores on your host. |
yarn | Run using the YARN cluster manager. |
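For example, the same placeholder application can be launched locally or on YARN simply by changing the master value:
spark-submit --master local[*] my_app.py
spark-submit --master yarn my_app.py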
Note
For more information, refer to the Apache Spark Submitting Applications documentation.
Example
Let's create a test.py file containing code that counts intensity data points within a time window:
from nxcals.api.extraction.data.builders import DevicePropertyDataQuery
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
# Configure the application: run on YARN with 16 GB of driver memory
conf = (SparkConf()
        .setAppName("intensity_example")
        .setMaster('yarn')
        .set('spark.driver.memory', '16G')
        )
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

# Query three days of hotspot intensity data for the PR.BCT device
intensity = DevicePropertyDataQuery.builder(spark).system("CMW") \
    .startTime("2018-06-17 00:00:00.000").endTime("2018-06-20 00:00:00.000") \
    .entity().parameter("PR.BCT/HotspotIntensity").build()

# count() triggers the actual query execution
print("Found %d intensity data points." % intensity.count())
Alternatively, the Spark session can be initialized using a session builder:
spark = SparkSession.builder.master("local").appName('intensity_example').getOrCreate()
...
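As a sketch, the same settings used in test.py above (YARN master, 16 GB of driver memory, the application name) can also be passed through the builder:
spark = (SparkSession.builder
         .appName("intensity_example")
         .master("yarn")
         .config("spark.driver.memory", "16G")
         .getOrCreate())
getOrCreate() returns an already running session if one exists, so no explicit SparkContext needs to be created in this variant.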
To submit a Spark job, spark-submit must be invoked from the Spark bundle for NXCALS directory:
./bin/spark-submit test.py
Certain Spark parameters can be specified as arguments. Let's run our test application on a YARN cluster with 8 cores per executor and 4 GB of executor memory:
./bin/spark-submit --master yarn --executor-memory 4G --executor-cores 8 test.py
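Following the deploy-mode description earlier in this section, the same job could also be submitted in cluster mode; a sketch, assuming the cluster hosts can resolve the NXCALS dependencies:
./bin/spark-submit --master yarn --deploy-mode cluster --executor-memory 4G --executor-cores 8 test.py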