Data Access methods
There are several possibilities for users to access NXCALS data.
In this guide we explain, with examples, how to access data using the following methods:
- NXCALS Spark bundle including PySpark tool
- Running PySpark as standalone application
- SWAN Notebook
- Java API
- Python application outside of PySpark shell
- Scala Spark shell (part of NXCALS Spark bundle)
Authorization in the form of a Kerberos token is required for all of these methods. A Kerberos keytab file must be generated and, where required, the client machine must provide a valid CERN grid CA certificate. Moreover, the user must be registered on the NXCALS service to access the data.
Supported Java version
Please note that NXCALS requires a specific version of JDK. More information can be found on a dedicated page.
Deprecated Access Methods
- python builders were moved from
- java builders were moved from
- DevicePropertyQuery has been renamed to
- VariableQuery were unified into 'DataQuery', accessible via
NXCALS Spark bundle
The NXCALS ecosystem contains many different environments and systems. Configuring a vanilla Spark instance for NXCALS data access requires basic knowledge of the infrastructure that it depends on and interacts with. Besides registration and authentication, the following elements have to be taken into account:
- Hadoop cluster that NXCALS is running on (connection configuration)
- Location of the API services and brokers
- NXCALS data access client module and its dependency schema
- Python execution environment configuration (for PySpark)
- Effective Spark properties configuration
- Basic YARN configuration (optionally for yarn mode execution)
The first set of actions (outside of the context of Spark) are considered prerequisites for Spark configuration. They remain common across different use cases, since they have to be applied for any operation that requires data access via a Spark context. This is where the NXCALS Spark bundle comes in handy. The bundle contains a pre-configured Spark instance with a single goal in mind: to provide a downloadable, bootstrapped instance of Spark with all the configuration needed to connect on the fly to a target NXCALS environment.
PySpark and Scala Spark execution contexts are supported by this bundle.
We recommend reading the Spark documentation and going through the book "Spark: The Definitive Guide, 1st Edition" by Bill Chambers and Matei Zaharia.
The latest version of the bundle can be downloaded and unzipped using the following bash command:
# Download the latest version of bundle for the NXCALS PRO environment
$ curl -s -k -O http://photons-resources.cern.ch/downloads/nxcals_pro/spark/spark-nxcals.zip && unzip spark-nxcals.zip && rm spark-nxcals.zip && cd spark-*-bin-hadoop3.2
If required it can be obtained interactively from the following links:
The NXCALS Spark bundle uses relative paths to load its resources. For Spark to pick up all the required resources correctly, you need to execute its binaries from within the extracted Spark directory.
One of the tools included in NXCALS bundle is PySpark.
The NXCALS PySpark shell installation requires Python 3.6.5 or later. Versions 3.7.0 and above have not been tested and are therefore not guaranteed to work.
You can check your Python version using:
$ python -V
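The same check can be scripted. The sketch below is a minimal illustration that only encodes the two version bounds stated above; the function name and the three labels are hypothetical and are not part of the NXCALS tooling.

```python
import sys

# Bounds taken from the text above: 3.6.5 is the minimum,
# 3.7.0 is the first version reported as untested.
MIN_SUPPORTED = (3, 6, 5)
FIRST_UNTESTED = (3, 7, 0)


def classify_python(version=None):
    """Return 'too-old', 'tested' or 'untested' for a (major, minor, micro) tuple.

    Hypothetical helper: defaults to the running interpreter's version.
    """
    if version is None:
        version = tuple(sys.version_info[:3])
    if version < MIN_SUPPORTED:
        return "too-old"
    if version >= FIRST_UNTESTED:
        return "untested"
    return "tested"


if __name__ == "__main__":
    print(sys.version.split()[0], "->", classify_python())
```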
Setting up a Python 3 virtual environment for running PySpark:
Within the same Spark bundle for NXCALS you will find a bash script named source-me.sh.
This script is responsible for setting up and activating a Python 3 virtual environment for NXCALS needs. In that virtual environment it then installs the required NXCALS data access Python package (bundled with the sources), making the API directly available in the PySpark context.
Sourcing the environment preparation script
Sourcing the Python virtual environment preparation script is highly important and should not be omitted when the intention is to run a PySpark session!
That is because the NXCALS sources are installed via the managed pip instance. Skipping this step will leave PySpark initialized in an unusable state.
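One way to confirm that the environment was sourced is to check, from Python, that the interpreter runs inside a virtual environment and that the NXCALS builders module can be located. The sketch below is an assumption-laden illustration: both helper names are hypothetical, and the module path is the one imported in the PySpark session example of this guide.

```python
import importlib.util
import sys


def in_virtualenv():
    """True when the interpreter runs inside a venv or virtualenv."""
    # Older virtualenv releases set sys.real_prefix; venv changes sys.prefix
    # so that it differs from sys.base_prefix.
    return hasattr(sys, "real_prefix") or sys.prefix != getattr(sys, "base_prefix", sys.prefix)


def module_available(name):
    """True when the module `name` can be located on the current sys.path.

    Note: for dotted names, parent packages are imported during the lookup.
    """
    try:
        return importlib.util.find_spec(name) is not None
    except (ImportError, ValueError):
        # A missing parent package raises ModuleNotFoundError (an ImportError).
        return False


if __name__ == "__main__":
    print("virtual env active:", in_virtualenv())
    print("NXCALS builders found:",
          module_available("cern.nxcals.api.extraction.data.builders"))
```

If either check reports False, sourcing source-me.sh again from the bundle directory is the first thing to try.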
Running PySpark with required Kerberos authentication:
Both the NXCALS API services and the dedicated Hadoop cluster (operated by the IT-DB-SAS group) rely on Kerberos as their authentication mechanism. The session must therefore be provided with a Kerberos principal and a keytab location.
Using that information a Kerberos token has to be created in the environment:
# Obtain Kerberos token in the environment
# You can also skip the kinit step and use --principal <user>@CERN.CH --keytab <keytab-file>
# as options, directly to the spark-shell
$ kinit -kt /path/to/keytab/my.keytab <user>@CERN.CH
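For scripted setups, the kinit call can be wrapped in Python. This is only a sketch: the two function names are hypothetical, and it assumes that the kinit binary is on the PATH and accepts the same arguments as in the command above.

```python
import subprocess


def kinit_command(keytab, principal):
    """Build the argument list for `kinit -kt <keytab> <principal>`."""
    return ["kinit", "-kt", keytab, principal]


def obtain_ticket(keytab, principal):
    """Run kinit; raises subprocess.CalledProcessError on a non-zero exit status."""
    subprocess.run(kinit_command(keytab, principal), check=True)
```

A call such as obtain_ticket("/path/to/keytab/my.keytab", "<user>@CERN.CH") would then replace the manual step, with the placeholders filled in by the user.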
Alternatively, the principal and keytab properties can be passed to the spark context. That can be done by modifying bundle_location/conf/spark-defaults.conf file:
- for the NXCALS API services we need to add to 'spark.driver.extraJavaOptions' two additional options:
- for YARN-specific Kerberos configuration one can use the properties below:
spark.yarn.keytab /path/to/keytab/my.keytab
spark.yarn.principal <user>@CERN.CH
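To verify that both YARN properties ended up in the configuration file, the file can be read back with a few lines of Python. The parser below is a simplified sketch: it handles only the whitespace-separated "key value" form shown above plus "#" comments, and both function names are hypothetical.

```python
def parse_spark_defaults(text):
    """Parse whitespace-separated 'key value' lines into a dict.

    Simplification: only the form used in this guide is supported.
    """
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        parts = line.split(None, 1)
        props[parts[0]] = parts[1].strip() if len(parts) > 1 else ""
    return props


def missing_kerberos_properties(props):
    """Return the YARN Kerberos property names that are absent or empty."""
    required = ("spark.yarn.keytab", "spark.yarn.principal")
    return [name for name in required if not props.get(name)]
```

Reading conf/spark-defaults.conf, passing its text to parse_spark_defaults, and checking that missing_kerberos_properties returns an empty list gives a quick sanity check before starting a session.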
The properties can also be passed as command-line options to the Spark session:
$ ./bin/pyspark --principal <user>@CERN.CH --keytab /path/to/keytab/my.keytab
Example of PySpark session:
# source the source-me.sh script to setup and activate python3 venv
source ./source-me.sh

# the execution of the above preparation script, will generate a new directory on the spark-bundle root
# named 'nxcals-python3-env' and is used as target for loading the virtual env

# check if python path is directly available/served from within the virtual env directory
(nxcals-python3-env)$ which python
# /path/to/spark/nxcals/bundle/spark-3.1.1-bin-hadoop3.2/nxcals-python3-env/bin/python

# check list of modules installed in virtual env:
pip list
# numpy (1.14.2)
# nxcals-data-access-python3 (0.4.40)
# pandas (0.24.2)
# pip (9.0.1)
# pyarrow (0.13.0)
# setuptools (28.8.0)

# If the output from both commands is similar to the one presented, we are good to go with pyspark

# Now just run the pyspark executable and you are ready to go with NXCALS config
(nxcals-python3-env)$ ./bin/pyspark

# Or executors=5 cores-per-executor=2 memory=3GB driver-memory=10G on YARN mode
(nxcals-python3-env)$ ./bin/pyspark --master yarn --num-executors 5 --executor-cores 2 --executor-memory 3G --conf spark.driver.memory=10G

# Or just get some help on available parameters
(nxcals-python3-env)$ ./bin/pyspark --help

# Once the context is loaded

# Import NXCALS Builders
>>> from cern.nxcals.api.extraction.data.builders import *

# Build a CMW specific Device/Property query
>>> query = DevicePropertyDataQuery.builder(spark).system("CMW").startTime("2018-04-02 00:00:00.000").endTime("2018-05-02 00:00:00.000").entity().parameter("CPS.TGM/FULL-TELEGRAM.STRC").build().select("USER","DEST")

# Or you can query by key-values, which is a more generic approach
>>> query = DataQuery.builder(spark).byEntities().system("CMW").startTime("2018-04-02 00:00:00.000").endTime("2018-05-02 00:00:00.000").entity().keyValue("device","CPS.TGM").keyValue("property", "FULL-TELEGRAM.STRC").build()

# Count the records in the dataframe
>>> query.count()
Count: 644142

# Select specific fields with filter predicates
>>> query.select("*").where("DEST like '%EAST%'").show()

# To exit pyspark type
>>> quit()

# To deactivate virtual python3 environment type
(nxcals-python3-env)$ deactivate

# To activate again type (from the spark bundle directory)
$ source nxcals-python3-env/bin/activate
# you can run pyspark context again after activation
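The startTime and endTime arguments in the session above are strings of the form "YYYY-MM-DD HH:MM:SS.mmm". The sketch below shows one way to produce such strings from Python datetime objects. The helper names are hypothetical, and the format is inferred from the examples in this guide rather than from the API specification, so other accepted formats may exist.

```python
from datetime import datetime, timedelta


def nxcals_time(moment):
    """Format a datetime as 'YYYY-MM-DD HH:MM:SS.mmm' (millisecond precision)."""
    # %f yields six zero-padded digits (microseconds); drop the last three.
    return moment.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]


def time_window(end, days):
    """Return (start, end) strings for a window of `days` days ending at `end`."""
    return nxcals_time(end - timedelta(days=days)), nxcals_time(end)


if __name__ == "__main__":
    start, end = time_window(datetime(2018, 5, 2), days=30)
    print(start)  # 2018-04-02 00:00:00.000
    print(end)    # 2018-05-02 00:00:00.000
```

The two strings can then be passed to startTime and endTime in place of the literals used in the session.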
Run PySpark session customized by external startup file:
In some use cases a PySpark session has to run a set of start-up commands.
Such commands could be part of an analysis that should run when the session starts. For this purpose, the path to the Python file must be exported in the PYTHONSTARTUP environment variable before invoking the PySpark executable:
(nxcals-python3-env)$ export PYTHONSTARTUP="/path/to/my/startup.py"
(nxcals-python3-env)$ ./bin/pyspark
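A start-up file could look like the sketch below. It assumes that the PySpark shell has already defined the `spark` session in the namespace where the file is executed, and that source-me.sh has made the NXCALS package importable; outside the shell the file just reports that no session was found. The variable names are hypothetical.

```python
# startup.py: hypothetical start-up file referenced by PYTHONSTARTUP
import time

# Assumption: inside the PySpark shell the name `spark` is already defined
# when this file runs; in a plain interpreter it is absent.
session = globals().get("spark")
started_at = time.strftime("%Y-%m-%d %H:%M:%S")

if session is None:
    print("startup.py: no Spark session found, skipping NXCALS setup")
else:
    # Same module as imported in the interactive session example.
    from cern.nxcals.api.extraction.data.builders import DataQuery, DevicePropertyDataQuery
    print("startup.py: NXCALS builders imported at", started_at)
```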
Running PySpark as standalone application
Instead of running PySpark applications in interactive mode, you can submit them through the spark-submit script to be run locally or in a cluster as described here.
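As an illustration, the helper below assembles a spark-submit command line from the same options used for the interactive shell in this guide (--master, --num-executors, --principal, --keytab). The function name and its defaults are hypothetical; the script path is whatever application you want to submit.

```python
def spark_submit_command(script, master="yarn", num_executors=None,
                         principal=None, keytab=None, app_args=()):
    """Build the argument list for ./bin/spark-submit (run from the bundle directory)."""
    cmd = ["./bin/spark-submit", "--master", master]
    if num_executors is not None:
        cmd += ["--num-executors", str(num_executors)]
    if principal and keytab:
        # Kerberos options, as accepted by the pyspark shell above.
        cmd += ["--principal", principal, "--keytab", keytab]
    cmd.append(script)
    cmd.extend(app_args)  # arguments passed through to the application
    return cmd


if __name__ == "__main__":
    print(" ".join(spark_submit_command("my_app.py", num_executors=5)))
```

The resulting list can be passed to subprocess.run from within the extracted Spark directory, since the bundle relies on relative paths.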
SWAN Notebook
Close collaboration with our colleagues from EP-SFT (Enric Saavedra, Danilo Piparo) and IT-DB-SAS has led to the successful integration of SWAN's Jupyter notebooks with the NXCALS Hadoop cluster, for accessing data stored on the NXCALS service and analyzing it using Spark.
- The SWAN platform is accessible at the following link: https://swan.cern.ch/
- More information about SWAN can be found on the dedicated service homepage: https://swan.web.cern.ch/
- A SWAN session can be created by following the instructions available here
- Example notebooks can be downloaded from CERNBox as described here
If you are planning to use a service account instead of your personal computing account, you have to subscribe to the CERNBox service first (https://resources.web.cern.ch/resources/) and log in to https://cernbox.cern.ch/ before using SWAN.
Java API
A Java examples project can be cloned from GitLab by following the steps described here.
To run PySpark from a local Jupyter notebook, first install Jupyter, or simply the Anaconda Python distribution from https://www.anaconda.com/download/#download.
Once done, set Jupyter as the PySpark driver Python and run ./bin/pyspark as usual:
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
./bin/pyspark --master yarn --num-executors 10
This will open a browser with Jupyter notebook. Please wait a bit until Spark is initialized on the cluster.
Make sure to have jupyter in your PATH. Verify using "which jupyter".
At this stage you should be able to open Jupyter notebook from the browser, however before executing any code you still have to add a path to NXCALS packages from NXCALS bundle.
Execute the following in the first cell of your notebook:
import os
import sys
sys.path.append(os.getcwd() + "/nxcals-python3-env/lib/python3.7/site-packages/")
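The path above hard-codes the python3.7 directory name, which depends on the interpreter used to create the virtual environment. A slightly more tolerant variant is sketched below: it searches for a site-packages directory under any python3* folder of the environment. The helper names are hypothetical, and the environment directory name is the one created by source-me.sh.

```python
import glob
import os
import sys


def find_site_packages(bundle_dir, env_name="nxcals-python3-env"):
    """Return the first site-packages directory found in the virtual env, or None."""
    pattern = os.path.join(bundle_dir, env_name, "lib", "python3*", "site-packages")
    matches = sorted(glob.glob(pattern))
    return matches[0] if matches else None


def add_nxcals_packages(bundle_dir=None):
    """Append the env's site-packages to sys.path; returns the path or None."""
    path = find_site_packages(bundle_dir or os.getcwd())
    if path and path not in sys.path:
        sys.path.append(path)
    return path
```

Calling add_nxcals_packages() in the first notebook cell has the same effect as the snippet above when the notebook was started from the bundle directory.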
Python application outside of PySpark shell
This section shows how to run an application that accesses NXCALS data from a standard Python interpreter installation. To set up the Python environment we reuse elements provided by the NXCALS Spark bundle (such as the Spark configuration files and binaries, and the NXCALS jars).
The following steps are required to run a sample application.
Enable a Python virtual environment (please note that Python 3 is required)
Install the PySpark libraries (if not installed previously):
pip install pyspark==3.1.1
At this stage we need the Spark package files, to which we can point by setting the SPARK_HOME variable. As described in the introduction, they can be reused from the bundle.
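SPARK_HOME can be exported in the shell, or set from Python before the Spark classes are used. The sketch below takes the second route and is only an illustration: the function name is hypothetical, and the check for bin/spark-submit is a simple sanity test that the directory looks like an extracted bundle.

```python
import os


def set_spark_home(bundle_dir):
    """Point SPARK_HOME at an extracted bundle directory and return its absolute path."""
    bundle_dir = os.path.abspath(bundle_dir)
    launcher = os.path.join(bundle_dir, "bin", "spark-submit")
    if not os.path.isfile(launcher):
        # Guard against pointing SPARK_HOME at the wrong directory.
        raise FileNotFoundError("no bin/spark-submit under " + bundle_dir)
    os.environ["SPARK_HOME"] = bundle_dir
    return bundle_dir
```

The argument is the directory obtained when unzipping the bundle; its exact name depends on the bundled Spark version.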
We also need to install the NXCALS sources (coming from the NXCALS Spark bundle). Please note that the correct version of the NXCALS release must be provided, such as 1.0.4 in the example below:
cd <location_of_bundle>
pip -v install ./extra-packages/nxcals_extraction_api_python3-1.0.4-py3-none-any.whl --no-index --find-links ./extra-packages
Optionally, one can install the packages from the source Python 3 environment (including numpy), and install pandas and pyarrow for pandas_udf (both actions require an internet connection or a proxy):
pip install -r ref-packages.txt
pip install pyarrow pandas
Place the sample test.py file below in the $SPARK_HOME directory:
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from cern.nxcals.api.extraction.data.builders import *

conf = SparkConf()
# Possible master values (the location to run the application):
# local: Run Spark locally with one worker thread (that is, no parallelism).
# local[K]: Run Spark locally with K worker threads. (Ideally, set this to the number of cores on your host.)
# local[*]: Run Spark locally with as many worker threads as logical cores on your host.
# yarn: Run using a YARN cluster manager.
conf.setMaster('yarn')
conf.setAppName('spark-basic')
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

intensity = DevicePropertyDataQuery.builder(spark).system("CMW").startTime("2018-06-17 00:00:00.000").endTime("2018-06-20 00:00:00.000").entity().parameter("PR.BCT/HotspotIntensity").build()

# count the data points (printed explicitly, since a script does not echo results)
print(intensity.count())
intensity.show()
Run: python test.py
Some additional steps are needed when the application is run from a location other than $SPARK_HOME.
Unfortunately, the Spark configuration file $SPARK_HOME/conf/spark-defaults.conf allows specifying only relative paths in its spark.jars property. With the default NXCALS Spark bundle settings, that corresponds to: nxcals-spark-jars/.
Taking the above into account, in order to run test.py in the directory /mydirectory it is sufficient to:
Create a symbolic link inside /mydirectory:
ln -s $SPARK_HOME/nxcals-jars/ nxcals-jars
Copy Hadoop configuration $SPARK_HOME/jars/nxcals-hadoop-pro-config-0.4.40.jar to /mydirectory or create a symbolic link:
ln -s $SPARK_HOME/jars/nxcals-hadoop-pro-config-0.4.40.jar nxcals-hadoop-pro-config-0.4.40.jar
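The two linking steps can also be scripted. The sketch below is a hypothetical helper that creates one symbolic link per resource in the working directory; the resource names are passed in by the caller, so it makes no assumption about the exact jar directory or file names of a given bundle version.

```python
import os


def link_bundle_resources(spark_home, workdir, names):
    """Create symlinks in `workdir` for bundle resources given relative to `spark_home`.

    Returns the list of links that were created; existing entries are left untouched.
    """
    created = []
    for name in names:
        source = os.path.join(spark_home, name)
        target = os.path.join(workdir, os.path.basename(name.rstrip("/")))
        if not os.path.lexists(target):
            os.symlink(source, target)
            created.append(target)
    return created
```

For the example above, the call would be link_bundle_resources(os.environ["SPARK_HOME"], "/mydirectory", ["nxcals-jars", "jars/nxcals-hadoop-pro-config-0.4.40.jar"]), using the same names as the ln commands.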
Scala Spark shell
The NXCALS Spark bundle also allows running the Scala Spark shell. Like PySpark, it relies on Kerberos authentication, which can be provided in the same way.
Sample session of spark-shell running Scala code is provided below:
$ ./bin/spark-shell

# Or executors=5 cores-per-executor=2 memory=3GB driver-memory=10G with YARN mode
$ ./bin/spark-shell --master yarn --num-executors 5 --executor-cores 2 --executor-memory 3G --conf spark.driver.memory=10G

# Or just get some help on available parameters
$ ./bin/spark-shell --help

# OPTIONALLY: Load some examples (from example.scala file, ls the dir to see)
# Once the scala shell is started please load the examples (it will run some queries)
#
# Execution time depends on the number of executors, memory, cores, etc.
# There will be 3 datasets loaded, data, tgmData & tgmFiltered ready to play with.
scala> :load example.scala

# Wait for the script to load, watch the data being retrieved and some operations done on it.
# Once you get the prompt scala> back you can write your commands.

# Some operations on the data (see example.scala file for more or read the Spark manual)
scala> data.printSchema
scala> tgmData.printSchema
scala> data.show
scala> tgmData.show
scala> tgmFiltered.count
scala> tgmFiltered.join(data, "cyclestamp").agg(sum("current")).show

# To exit spark-shell type
scala> :quit
You can look at some scala code examples for CMW.