
Spark session creation


You can create a Spark session either with the methods provided by Spark directly or with the utility classes from NXCALS. In either case you will have to provide some details (properties) to configure the session correctly. You will have to choose whether the session runs locally (on your computer, Local Mode) or on YARN (Hadoop cluster, YARN Mode). In the latter case you will have to select the number of executors, the number of cores and the memory sizes. If you choose to use our utilities, some helper classes make the process a little easier. For YARN we provide a so-called Flavor that helps select the desired size of the resources. Those properties can also be overwritten if needed to match your specific case.

Creating Spark Session with NXCALS utilities in Local Mode

Java:

//Defaults to local[*] mode
SparkProperties sparkProperties = SparkProperties.defaults("MY_APP");
SparkSession sparkSession = SparkUtils.createSparkSession(sparkProperties);

Python:

# Defaults to local[*] mode
from nxcals import spark_session_builder
spark_session = spark_session_builder.get_or_create(app_name='MY_APP')

Creating Spark Session with NXCALS utilities in YARN Mode using a predefined Flavor

A predefined application size on YARN can be selected using the Flavor class. Please note that any Spark property can be overwritten here according to your needs.

Java:

//Using SparkSessionFlavor.SMALL, all properties pre-set, running on Yarn
SparkProperties sparkProperties = SparkProperties.remoteSessionProperties("MY_APP", SparkSessionFlavor.SMALL, "pro");
//Can overwrite (or set) any property
sparkProperties.setProperty("spark.executor.instances", "6");
SparkSession sparkSession = SparkUtils.createSparkSession(sparkProperties);

Python:

from nxcals.spark_session_builder import get_or_create, Flavor
# Using Flavor.YARN_SMALL, all properties pre-set but can be overwritten, running on Yarn
spark_session = get_or_create(app_name='MY_APP', flavor=Flavor.YARN_SMALL,
                              conf={'spark.executor.instances': '6'})
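The override behaviour can be pictured as a simple map merge: the flavor supplies defaults and any explicitly set property takes precedence. A minimal sketch in plain Python (illustrative only, not NXCALS code; the default values are taken from the SMALL flavor described below):

```python
# Flavor presets act as a map of defaults (values of the SMALL flavor).
flavor_defaults = {
    "spark.executor.cores": "2",
    "spark.executor.instances": "4",
    "spark.executor.memory": "6g",
}

# Explicitly set properties, as in the example above.
user_overrides = {"spark.executor.instances": "6"}

# Merge: user-provided properties win over the flavor defaults.
effective_conf = {**flavor_defaults, **user_overrides}

print(effective_conf["spark.executor.instances"])  # overridden
print(effective_conf["spark.executor.cores"])      # preset kept
```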

There are three different configurations (Flavors) available (for application resources on the Hadoop cluster):

Configuration type | Spark executor cores | Spark executor instances | Spark executor memory
SMALL              | 2                    | 4                        | 6g
MEDIUM             | 4                    | 8                        | 6g
LARGE              | 8                    | 8                        | 8g
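To get a feel for the aggregate resources each flavor requests, the table can be encoded as data and the total executor cores computed as cores × instances. A short sketch (the dictionary names are ours, not part of the NXCALS API; the numbers come from the table above):

```python
# Flavor table from the documentation, encoded as a dictionary.
FLAVORS = {
    "SMALL":  {"cores": 2, "instances": 4, "memory": "6g"},
    "MEDIUM": {"cores": 4, "instances": 8, "memory": "6g"},
    "LARGE":  {"cores": 8, "instances": 8, "memory": "8g"},
}

def total_cores(flavor: str) -> int:
    """Total cores requested on the cluster: cores per executor * executors."""
    f = FLAVORS[flavor]
    return f["cores"] * f["instances"]

for name in FLAVORS:
    print(name, total_cores(name))
```

So SMALL asks for 8 cores in total, MEDIUM for 32 and LARGE for 64, which can help when picking a flavor for a given workload.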

Additionally, the flavors receive some predefined Spark properties as follows:

spark.sql.parquet.enableNestedColumnVectorizedReader = "true"
spark.task.maxDirectResultSize = "2047m"
spark.driver.maxResultSize = "1000g"
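The size values above use Spark-style byte-size suffixes. A quick sketch of decoding them, assuming Spark's binary interpretation of the suffixes (k = KiB, m = MiB, g = GiB); this helper is ours, for illustration only:

```python
# Binary unit multipliers, as Spark interprets size suffixes.
UNITS = {"k": 1024, "m": 1024**2, "g": 1024**3, "t": 1024**4}

def size_to_bytes(size: str) -> int:
    """Decode a Spark-style size string such as '2047m' into bytes."""
    size = size.strip().lower()
    if size[-1] in UNITS:
        return int(size[:-1]) * UNITS[size[-1]]
    return int(size)  # bare number: already bytes

print(size_to_bytes("2047m"))  # spark.task.maxDirectResultSize in bytes
```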

For more detailed information on tuning Spark sessions, please consult a dedicated page.

Creating Spark Session using Spark API in Local Mode

Java:

SparkSession sparkSession = SparkSession.builder().getOrCreate();

Python:

from pyspark.sql import SparkSession
import os
import sys

# make sure SPARK_HOME is set to our preconfigured bundle
if "SPARK_HOME" not in os.environ:
    os.environ["SPARK_HOME"] = os.path.join(sys.prefix, "nxcals-bundle")

# local session
spark_session = SparkSession.builder.getOrCreate()

Creating Spark Session using Spark API in YARN Mode

Please note that in this case you have to specify all the required properties.

Java:

SparkConf sparkConf = new SparkConf()
        .set("spark.submit.deployMode", "client")
        .set("spark.yarn.appMasterEnv.JAVA_HOME", "/var/nxcals/jdk1.11")
        .set("spark.executorEnv.JAVA_HOME", "/var/nxcals/jdk1.11")
        .set("spark.yarn.jars", "hdfs:////project/nxcals/lib/spark-3.3.1/*.jar,hdfs:////project/nxcals/nxcals_lib/nxcals_pro/*.jar")
        .set("spark.executor.instances", "4")
        .set("spark.executor.cores", "1")
        .set("spark.executor.memory", "1g")
        .set("spark.sql.caseSensitive", "true")
        .set("spark.kerberos.access.hadoopFileSystems", "nxcals");

SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
Python:

from pyspark.sql import SparkSession
import os
import sys

# make sure SPARK_HOME is set to our preconfigured bundle
if "SPARK_HOME" not in os.environ:
    os.environ["SPARK_HOME"] = os.path.join(sys.prefix, "nxcals-bundle")

# must set this property if using Spark APIs directly for Yarn
os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"

# Yarn session with executors on the cluster
spark_session = SparkSession.builder \
    .appName("MY_APP") \
    .master("yarn") \
    .config("spark.submit.deployMode", "client") \
    .config("spark.yarn.appMasterEnv.JAVA_HOME", "/var/nxcals/jdk1.11") \
    .config("spark.executorEnv.JAVA_HOME", "/var/nxcals/jdk1.11") \
    .config("spark.yarn.jars", "hdfs:////project/nxcals/lib/spark-3.3.1/*.jar,hdfs:////project/nxcals/nxcals_lib/nxcals_pro/*.jar") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.memory", "1g") \
    .config("spark.sql.caseSensitive", "true") \
    .config("spark.kerberos.access.hadoopFileSystems", "nxcals") \
    .getOrCreate()


More information about the properties used for YARN can be found in the Apache Spark documentation.

Modes and their names

Possible master values (the location to run the application):

  • local: Run Spark locally with one worker thread (that is, no parallelism).
  • local[K]: Run Spark locally with K worker threads. (Ideally, set this to the number of cores on your host.)
  • local[*]: Run Spark locally with as many worker threads as logical cores on your host.
  • yarn: Run using a YARN cluster manager.
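The list above can be summarized by a small helper that interprets a master string. This is an illustrative sketch of our own (not a Spark API), useful for validating configuration values:

```python
import re

def describe_master(master: str) -> str:
    """Interpret a Spark master string as described in the list above."""
    if master == "local":
        return "local, 1 worker thread"
    if master == "local[*]":
        return "local, one worker thread per logical core"
    m = re.fullmatch(r"local\[(\d+)\]", master)
    if m:
        return f"local, {m.group(1)} worker threads"
    if master == "yarn":
        return "YARN cluster manager"
    raise ValueError(f"unrecognized master: {master}")

print(describe_master("local[4]"))
print(describe_master("yarn"))
```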

Using spark session suppliers

When implementing a long-running service, you have to handle the life cycle of the SparkSession, which might get closed at some point. To handle this problem, services from the cern-extraction package can accept a Supplier&lt;SparkSession&gt;. Such a supplier can be conveniently created with helper methods from the SparkUtils class. Here is an example:

Supplier<SparkSession> supplier = SparkUtils.createSparkSessionSupplier(
        SparkProperties.remoteSessionProperties("NXCALS-Aggregation", SparkSessionFlavor.MEDIUM, "pro"));

AggregationService aggregationService = Services.newInstance(supplier).aggregationService();

A supplier created in this way will take care of recreating the SparkSession when needed. Apart from these services, it can be used with normal extraction calls:

Dataset<Row> ds = DataQuery.builder(supplier.get()).variables().system("CMW")
        .timeWindow("2023-06-11 00:00:00", "2023-06-11 02:00:00")