Using Spark bundle

The NXCALS ecosystem spans many different environments and systems. Configuring a vanilla Spark instance for NXCALS data access requires basic knowledge of the infrastructure that it depends on and interacts with. Besides registration and authentication, the following elements have to be taken into account:

  • The Hadoop cluster that NXCALS runs on (connection configuration)
  • Location of the API services and brokers
  • The NXCALS data access client module and its dependency schema
  • Python execution environment configuration (for PySpark)
  • Effective Spark properties configuration
  • Basic YARN configuration (optional, for YARN mode execution)

The first set of actions (outside the context of Spark) are considered prerequisites for Spark configuration. These actions are common to all use cases, since they apply to any operation that requires data access via a Spark context. This is where the NXCALS bundle comes in handy. The bundle contains a pre-configured Spark instance with a single goal in mind: providing a bootstrapped instance of Spark with all the configuration needed to connect on-the-fly to a target NXCALS environment.

Both PySpark and Scala Spark execution contexts are supported by the bundle.

We recommend reading the Spark documentation and working through the book "Spark: The Definitive Guide" by Bill Chambers and Matei Zaharia.

Bundle download

The latest version of the bundle can be downloaded and unzipped using the following bash command:

# Download the latest version of the bundle for the NXCALS PRO environment
curl -s -k -O http://photons-resources.cern.ch/downloads/nxcals_pro/spark/spark-nxcals.zip &&
unzip spark-nxcals.zip && rm spark-nxcals.zip && cd spark-*-bin-hadoop3

If required, it can also be downloaded manually from the following links:

PRO: spark-nxcals.zip
TESTBED: spark-nxcals.zip

PySpark

Setting up your virtual environment for running PySpark (first time)

As with the pip-based NXCALS installation, we need to use acc-py for our virtual environment. If you have a computer from the CERN accelerator sector (ACC):

source /acc/local/share/python/acc-py/base/pro/setup.sh
acc-py venv ./venv
source ./venv/bin/activate

If you don't have a computer from the CERN accelerator sector (ACC):

python3 -m venv ./venv
source ./venv/bin/activate
python -m pip install git+https://gitlab.cern.ch/acc-co/devops/python/acc-py-pip-config.git

Within the same Spark bundle for NXCALS you will find a bash script named source-me.sh.

This script sets up the virtual environment for NXCALS. It then installs into that virtual Python environment the required NXCALS data access Python package (bundled with the sources), making the API directly available in the PySpark context.

Sourcing the environment preparation script

Sourcing the Python virtual environment preparation script is highly important and must not be omitted when the intention is to run a PySpark session!

That's because the NXCALS sources are installed via the managed pip instance. Skipping this step will leave PySpark initialized in a non-working state.

Running PySpark with required Kerberos authentication:

Running PySpark

# Init Kerberos
$ kinit

# Init venv
$ source ./venv/bin/activate

# Source the source-me.sh script to set up and activate the python3 venv
(nxcals-python3-env)$ source ./source-me.sh

# Running the preparation script above creates a new directory in the spark-bundle root,
# named 'nxcals-python3-env', which is used as the target for loading the virtual env


# check if python path is directly available/served from within the virtual env directory
(nxcals-python3-env)$ which python

# /path/to/spark/nxcals/bundle/spark-3.5.0-bin-hadoop3/nxcals-python3-env/bin/python


# check list of modules installed in virtual env:
(nxcals-python3-env)$ pip list

# numpy (1.14.2)
# nxcals-data-access-python3 (1.4.0)
# pandas (0.24.2)
# pip (9.0.1)
# pyarrow (0.13.0)
# setuptools (28.8.0)


# If the output from both commands is similar to the one presented, we are good to go with pyspark
# Now just run the pyspark executable and you are ready to go with NXCALS config

(nxcals-python3-env)$ ./bin/pyspark

# Or run in YARN mode with executors=5 cores-per-executor=2 memory=3GB driver-memory=10G

(nxcals-python3-env)$ ./bin/pyspark --master yarn --num-executors 5 --executor-cores 2 --executor-memory 3G --conf spark.driver.memory=10G


# Once the context is loaded

# Import NXCALS Builders
>>> from nxcals.api.extraction.data.builders import *

# Build a CMW specific Device/Property query
>>> query = DevicePropertyDataQuery.builder(spark).system("CMW") \
    .startTime("2018-04-02 00:00:00.000").endTime("2018-05-02 00:00:00.000") \
    .entity().parameter("CPS.TGM/FULL-TELEGRAM.STRC").build().select("USER","DEST")


# Or you can query by key-values, which is a more generic approach 
>>> query = DataQuery.builder(spark).byEntities().system("CMW") \
    .startTime("2018-04-02 00:00:00.000").endTime("2018-05-02 00:00:00.000").entity() \
    .keyValue("device","CPS.TGM").keyValue("property", "FULL-TELEGRAM.STRC").build()

# Count the records in the dataframe
>>> query.count()
Count: 644142

# To exit pyspark type
>>> quit()

# To deactivate virtual python3 environment type
(nxcals-python3-env)$ deactivate
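
The environment checks from the session above (which python, pip list) can also be made from inside Python, for instance at the top of a script. The sketch below is only illustrative: the helper names are hypothetical, and the only names taken from this page are the nxcals package and the nxcals-python3-env directory.

```python
import importlib.util
import os
import sys
from pathlib import Path


def in_virtualenv() -> bool:
    """True when the running interpreter belongs to a virtual environment."""
    # Inside a venv, sys.prefix points at the environment while
    # sys.base_prefix still points at the base Python installation.
    return sys.prefix != sys.base_prefix


def served_from(executable: str, env_dir: str) -> bool:
    """True when `executable` is located under `env_dir`."""
    # abspath (not resolve) is used on purpose: bin/python inside a venv is
    # often a symlink to the system interpreter, and following it would
    # hide the fact that the venv copy is the one being used.
    exe = Path(os.path.abspath(executable))
    env = Path(os.path.abspath(env_dir))
    return env in exe.parents


def nxcals_importable() -> bool:
    """True when the top-level `nxcals` package can be located."""
    return importlib.util.find_spec("nxcals") is not None


if __name__ == "__main__":
    print("virtual env active:", in_virtualenv())
    print("served from nxcals-python3-env:",
          served_from(sys.executable, "./nxcals-python3-env"))
    print("nxcals package found:", nxcals_importable())
```

If any of the three checks prints False, the most likely cause is that source-me.sh was not sourced in the current shell.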

How to modify the Spark Bundle for Python to use keytab

A keytab is a convenient way to use Kerberos. It allows Spark to log in as the configured user. To obtain a keytab, please follow the instructions on the authentication page. The principal and keytab properties can be passed to the Spark context by modifying the file nxcals-bundle/conf/spark-defaults.conf in your Python venv:

  • for the NXCALS API services, two additional options need to be added to spark.driver.extraJavaOptions:
    -Dkerberos.principal=<user>@CERN.CH -Dkerberos.keytab=/path/to/keytab/my.keytab
    
  • for YARN-specific Kerberos configuration, the properties below should be set in addition:
    spark.yarn.keytab /path/to/keytab/my.keytab
    spark.yarn.principal <user>@CERN.CH
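
Note that the two -D options are appended to whatever spark.driver.extraJavaOptions already contains, rather than replacing it. As a rough sketch of that edit (the function names and the sample existing value are made up for illustration; only the property names come from the list above):

```python
from typing import List


def with_kerberos_options(existing: str, principal: str, keytab: str) -> str:
    """Append the two Kerberos -D options to an existing extraJavaOptions value."""
    extra = f"-Dkerberos.principal={principal} -Dkerberos.keytab={keytab}"
    # strip() keeps the result tidy when `existing` is empty or padded.
    return f"{existing.strip()} {extra}".strip()


def yarn_kerberos_lines(principal: str, keytab: str) -> List[str]:
    """spark-defaults.conf lines for the YARN-specific Kerberos properties."""
    return [
        f"spark.yarn.keytab {keytab}",
        f"spark.yarn.principal {principal}",
    ]


if __name__ == "__main__":
    # Placeholder values; "-Dsome.existing=option" stands in for whatever
    # the bundle's spark-defaults.conf already sets (an assumption).
    principal, keytab = "<user>@CERN.CH", "/path/to/keytab/my.keytab"
    java_opts = with_kerberos_options("-Dsome.existing=option", principal, keytab)
    print(f"spark.driver.extraJavaOptions {java_opts}")
    print("\n".join(yarn_kerberos_lines(principal, keytab)))
```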
    

Alternatively, the spark.yarn properties above can be passed as command-line options when starting the Spark session:

$ pyspark --principal <user>@CERN.CH --keytab /path/to/keytab/my.keytab
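
When sessions are launched from another program, the same options can be assembled as an argument list. This is a minimal sketch, assuming the launcher is invoked from the bundle root; the helper name and its defaults are hypothetical, while the flags are the ones used on this page.

```python
import shlex
from typing import List, Optional


def pyspark_command(principal: str, keytab: str,
                    master: Optional[str] = None,
                    launcher: str = "./bin/pyspark") -> List[str]:
    """Build the pyspark command line with Kerberos principal and keytab."""
    cmd = [launcher]
    if master is not None:
        cmd += ["--master", master]
    cmd += ["--principal", principal, "--keytab", keytab]
    return cmd


if __name__ == "__main__":
    cmd = pyspark_command("<user>@CERN.CH", "/path/to/keytab/my.keytab",
                          master="yarn")
    # shlex.join adds shell quoting where an argument needs it.
    print(shlex.join(cmd))
```

Passing the list directly to subprocess.run avoids any shell quoting issues with the principal or the keytab path.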

Standalone PySpark script

Instead of running PySpark applications in interactive mode, you can submit them through the spark-submit script to be run locally or on a cluster, as described here.
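
As an illustration, a standalone script could look roughly like the sketch below. It reuses the key-value query from the interactive session above and assumes that a DEVICE/PROPERTY parameter maps onto the device and property key-values in the way that session suggests. The file name count_records.py, the function names and the application name are made up for this example. The script expects to be submitted from a sourced bundle environment; otherwise it only prints a hint and returns.

```python
import sys
from typing import Tuple


def split_parameter(parameter: str) -> Tuple[str, str]:
    """Split "DEVICE/PROPERTY" into its device and property parts."""
    device, sep, prop = parameter.partition("/")
    if not sep or not device or not prop:
        raise ValueError(f"expected DEVICE/PROPERTY, got {parameter!r}")
    return device, prop


def count_records(spark, parameter: str, start: str, end: str) -> int:
    """Count CMW records for one device/property between start and end."""
    # Imported here so the helper above stays usable without NXCALS installed.
    from nxcals.api.extraction.data.builders import DataQuery

    device, prop = split_parameter(parameter)
    df = (DataQuery.builder(spark).byEntities().system("CMW")
          .startTime(start).endTime(end).entity()
          .keyValue("device", device).keyValue("property", prop).build())
    return df.count()


def main() -> int:
    try:
        from pyspark.sql import SparkSession
        import nxcals  # noqa: F401  (only checks that the package is installed)
    except ImportError as exc:
        print(f"PySpark/NXCALS not available, was source-me.sh sourced? ({exc})",
              file=sys.stderr)
        return 1

    # The interactive shell creates `spark` for you; a script does it itself.
    spark = SparkSession.builder.appName("nxcals-count-example").getOrCreate()
    try:
        total = count_records(spark, "CPS.TGM/FULL-TELEGRAM.STRC",
                              "2018-04-02 00:00:00.000",
                              "2018-05-02 00:00:00.000")
        print(f"Records: {total}")
    finally:
        spark.stop()
    return 0


if __name__ == "__main__":
    main()
```

Such a script could then be submitted with ./bin/spark-submit count_records.py, adding --master yarn and the resource options used for the interactive session as needed.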

Scala Spark shell

The NXCALS Spark bundle also allows running the Scala Spark shell. Like PySpark, it relies on Kerberos authentication, which can be provided in the same way.

A sample spark-shell session running Scala code is provided below:

$ ./bin/spark-shell

# Use a specific Spark configuration: executors=5 cores-per-executor=2 memory=3GB driver-memory=10G in YARN mode
$ ./bin/spark-shell --master yarn --num-executors 5 --executor-cores 2 --executor-memory 3G --conf spark.driver.memory=10G

# Or just get some help on available parameters
$ ./bin/spark-shell --help

# OPTIONALLY: Load some examples (from example.scala file, ls the dir to see)
# Once the scala shell is started please load the examples (it will run some queries)
#
# Execution time depends on the number of executors, memory, cores, etc.
# There will be 3 datasets loaded, data, tgmData & tgmFiltered ready to play with.

scala> :load example.scala

# Wait for the script to load, watch the data being retrieved and some operations done on it.
# Once you get the prompt scala> back you can write your commands.


# Some operations on the data (see example.scala file for more or read the Spark manual)
scala> data.printSchema
scala> tgmData.printSchema

scala> data.show
scala> tgmData.show

scala> tgmFiltered.count
scala> tgmFiltered.join(data, "cyclestamp").agg(sum("current")).show

# To exit spark-shell type
scala> :quit

You can look at some Scala code examples for CMW.