Spark session creation

Introduction

You can create a Spark session either by using the methods provided by Spark directly or the utility classes from NXCALS. In either case you will have to provide some details (properties) in order to configure the session correctly. You will have to choose whether the session runs locally (on your computer, Local Mode) or on YARN (Hadoop cluster, YARN Mode). In the latter case you will have to select the number of executors, the number of cores, and the memory sizes. If you choose to use our utilities, some helper classes are provided to make the process a little easier. For YARN we provide a so-called Flavor that helps select the desired size of the resources. Those properties can also be overwritten if needed to match your specific case.

Creating Spark Session with NXCALS utilities in Local Mode

// Defaults to local[*] mode
SparkProperties sparkProperties = SparkProperties.defaults("MY_APP");
SparkSession sparkSession = SparkUtils.createSparkSession(sparkProperties);
# Defaults to local[*] mode
from nxcals import spark_session_builder
spark_session = spark_session_builder.get_or_create(app_name='MY_APP')
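Once the session is up, a trivial job can confirm that it really runs locally; a minimal sketch in Python (the df variable name is ours, not part of the NXCALS API):

# Sanity check (illustrative): run a trivial job on the freshly created session
df = spark_session.range(5)               # small local DataFrame, no cluster needed
print(df.count())                         # expected: 5
print(spark_session.sparkContext.master)  # expected: local[*]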

Creating Spark Session with NXCALS utilities in YARN Mode using a predefined Flavor

A predefined application size on YARN can be selected using the Flavor class. Please note that any Spark property can be overwritten here according to your needs.

// Using SparkSessionFlavor.SMALL, all properties pre-set, running on YARN
SparkProperties sparkProperties = SparkProperties.remoteSessionProperties("MY_APP", SparkSessionFlavor.SMALL, "pro");
// Can overwrite (or set) any property
sparkProperties.setProperty("spark.executor.instances", "6");
SparkSession sparkSession = SparkUtils.createSparkSession(sparkProperties);
from nxcals.spark_session_builder import get_or_create, Flavor
# Using Flavor.YARN_SMALL, all properties pre-set but can be overwritten, running on YARN
spark_session = get_or_create(app_name='MY_APP', flavor=Flavor.YARN_SMALL,
                              conf={'spark.executor.instances': '6'})
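To verify that such an override took effect, the session's configuration can be read back through the standard PySpark API; a minimal, illustrative sketch:

# Illustrative check: inspect the effective configuration of the session
conf = spark_session.sparkContext.getConf()
print(conf.get('spark.executor.instances'))  # expected: '6' (our override)
print(conf.get('spark.executor.memory'))     # value pre-set by the chosen flavor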

Creating Spark Session using Spark API in Local Mode

SparkSession sparkSession = SparkSession.builder().getOrCreate();
from pyspark.sql import SparkSession
import os
import sys

# make sure SPARK_HOME is set to our preconfigured bundle
if "SPARK_HOME" not in os.environ:
    os.environ["SPARK_HOME"] = os.path.join(sys.prefix, "nxcals-bundle")

# local session
spark_session = SparkSession.builder.getOrCreate()
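As a quick, illustrative check, the resulting session can be inspected to confirm it is local (when no master is configured elsewhere, Spark typically falls back to local[*]):

# Illustrative: confirm the session runs in Local Mode
print(spark_session.version)              # Spark version of the bundled distribution
print(spark_session.sparkContext.master)  # typically local[*] when no master is set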

Creating Spark Session using Spark API in YARN Mode

Please note that in this case you have to specify all the required properties yourself.

SparkConf sparkConf = new SparkConf()
        .setAppName("MY_APP")
        .setMaster("yarn")
        .set("spark.submit.deployMode", "client")
        .set("spark.yarn.appMasterEnv.JAVA_HOME", "/var/nxcals/jdk1.11")
        .set("spark.executorEnv.JAVA_HOME", "/var/nxcals/jdk1.11")
        .set("spark.yarn.jars", "hdfs:////project/nxcals/lib/spark-3.3.1/*.jar,hdfs:////project/nxcals/nxcals_lib/nxcals_pro/*.jar")
        .set("spark.executor.instances", "4")
        .set("spark.executor.cores", "1")
        .set("spark.executor.memory", "1g")
        .set("spark.sql.caseSensitive", "true")
        .set("spark.kerberos.access.hadoopFileSystems", "nxcals");

SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
from pyspark.sql import SparkSession
import os
import sys

# make sure SPARK_HOME is set to our preconfigured bundle
if "SPARK_HOME" not in os.environ:
    os.environ["SPARK_HOME"] = os.path.join(sys.prefix, "nxcals-bundle")

# must set this property if using Spark APIs directly for Yarn
os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"

# Yarn session with executors on the cluster
spark_session = SparkSession.builder \
    .appName("MY_APP") \
    .master("yarn") \
    .config("spark.submit.deployMode", "client") \
    .config("spark.yarn.appMasterEnv.JAVA_HOME", "/var/nxcals/jdk1.11") \
    .config("spark.executorEnv.JAVA_HOME", "/var/nxcals/jdk1.11") \
    .config("spark.yarn.jars", "hdfs:////project/nxcals/lib/spark-3.3.1/*.jar,hdfs:////project/nxcals/nxcals_lib/nxcals_pro/*.jar\"") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.memory", "1g") \
    .config("sql.caseSensitive", "true") \
    .config("spark.kerberos.access.hadoopFileSystems", "nxcals")\
    .getOrCreate()
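A small distributed job can then confirm that executors actually start on the cluster; a minimal sketch (the partition count and range are arbitrary):

# Illustrative: a trivial job spread over 4 partitions to exercise the YARN executors
rdd = spark_session.sparkContext.parallelize(range(100), 4)
print(rdd.sum())  # expected: 4950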

Note

More information about the properties used for YARN can be found in the Apache Spark documentation pages.

Note

There are three different configurations (Flavors) available for application resources on the Hadoop cluster:

| Configuration type | Spark executor cores | Spark executor instances | Spark executor memory |
|--------------------|----------------------|--------------------------|------------------------|
| SMALL              | 2                    | 4                        | 2g                     |
| MEDIUM             | 4                    | 8                        | 4g                     |
| LARGE              | 4                    | 16                       | 4g                     |

Additionally, the flavors come with some predefined Spark properties, as follows:

spark.sql.parquet.enableNestedColumnVectorizedReader = "true"
spark.task.maxDirectResultSize = "1000g"
spark.driver.maxResultSize = "1000g"

For more detailed information on tuning Spark sessions, please consult the dedicated page.