Data extraction methods
Introduction
There are several ways for users to access NXCALS data.
In this guide we explain and give examples of how to extract data using the following methods:
- Bundled software
- NXCALS package ("pip install nxcals") - NEW
- NXCALS Spark bundle
- SWAN Notebook
- Java API
- Java API via Python
- Jupyter
- Python application outside of PySpark shell
- Scala Spark shell (part of NXCALS Spark bundle)
Note
All of these methods require authentication in the form of a Kerberos token: a Kerberos keytab file must be generated, and optionally the client machine must provide a valid CERN grid CA certificate. Moreover, the user must be registered with the NXCALS service to access the data.
Supported Java and Python versions
Please note that NXCALS requires specific versions of the Java and Python runtimes.
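If in doubt, the runtimes visible from a given environment can be checked quickly (a minimal sketch, assuming java is on the PATH; the actual required versions are listed in the NXCALS documentation and are not hard-coded here):
# check the Python and Java runtimes visible from the current environment
import subprocess
import sys

print("Python:", sys.version.split()[0])
# 'java -version' prints its output to stderr
java = subprocess.run(["java", "-version"], capture_output=True, text=True)
print(java.stderr.strip() or java.stdout.strip())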
Deprecated Access Methods
- Python builders were moved from cern.nxcals.pyquery.builders to nxcals.api.extraction.data.builders
- Java builders were moved from cern.nxcals.data.access.builders to cern.nxcals.api.extraction.data.builders
- DevicePropertyQuery has been renamed to DevicePropertyDataQuery
- KeyValuesQuery and VariableQuery were unified into DataQuery, accessible via byEntities() and byVariables() respectively
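For example, a key-value query is now written against the unified DataQuery builder (a minimal sketch, assuming an already created spark session):
from nxcals.api.extraction.data.builders import DataQuery

# byEntities() replaces the former KeyValuesQuery; byVariables() replaces VariableQuery
query = DataQuery.builder(spark).byEntities().system("CMW") \
    .startTime("2018-04-02 00:00:00.000").endTime("2018-05-02 00:00:00.000") \
    .entity().keyValue("device", "CPS.TGM").keyValue("property", "FULL-TELEGRAM.STRC") \
    .build()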
Bundled software
The NXCALS ecosystem contains many different environments and systems. Configuring a vanilla Spark instance for NXCALS data access requires a basic knowledge of the infrastructure that it depends on and interacts with. Here are some of the elements that have to be taken into account besides registration and authentication:
- Hadoop cluster that NXCALS is running on (configuration for connecting)
- Location of the API services and brokers
- NXCALS data access client module and its dependencies schema
- Python execution environment configuration (for PySpark)
- Effective Spark properties configuration
- Basic YARN configuration (optionally for yarn mode execution)
The first set of actions (outside of the Spark context) are considered prerequisites for the Spark configuration. Those actions remain common across different use cases, since they have to be applied for any operation that requires data access via a Spark context. This is where the NXCALS bundles come in very handy. Those bundles contain a pre-configured Spark instance with a single goal in mind: providing a bootstrapped instance of Spark with all the configuration needed to connect on-the-fly to a target NXCALS environment.
PySpark and Scala Spark execution contexts are supported by those bundles.
We recommend reading the Spark documentation and going through the book "Spark: The Definitive Guide" by Bill Chambers and Matei Zaharia.
NXCALS package
Known issue with python cern module imports and JPype
The nxcals package contains DataQuery builders that are registered under the Python cern namespace. This prevents end-user applications that use JPype from importing Java classes under the cern package (e.g. cern.lsa.*).
If you experience similar issues after running pip install nxcals, please refer to the documentation section below to see how you can overcome the issue.
Installation
To install the latest version of the NXCALS package, you will need to set up a Python venv
with access to the ACC-PY package repository.
If you have a computer from the CERN accelerator sector (ACC):
source /acc/local/share/python/acc-py/base/pro/setup.sh
acc-py venv ./venv
source ./venv/bin/activate
python -m pip install nxcals
Important
In case you get the following error message during installation of the NXCALS package:
zipfile.BadZipFile: File is not a zip file
please retry the installation with the pip cache disabled:
python -m pip install nxcals --no-cache-dir
Important
When instantiating a SparkSession using SparkSession.builder in YARN mode it is important to set the environment variable PYSPARK_PYTHON="./environment/bin/python", as in the example below:
import os
from pyspark.sql import SparkSession
os.environ['PYSPARK_PYTHON']="./environment/bin/python"
spark = SparkSession.builder.master('yarn').appName('my-name').getOrCreate()
print(spark.sparkContext.parallelize(range(2)).map(lambda x: x).collect())
Please note that when using the nxcals.spark_session_builder.get_or_create() method this setting is not necessary (it is already done in our code). More information can be found in the PySpark documentation on Python packaging.
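For comparison, a minimal sketch of the same session created through the NXCALS session builder, which takes care of PYSPARK_PYTHON internally:
from nxcals.spark_session_builder import get_or_create

# equivalent session via the NXCALS builder; no manual PYSPARK_PYTHON setup required
spark = get_or_create(app_name='my-name', master='yarn')
print(spark.sparkContext.parallelize(range(2)).map(lambda x: x).collect())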
If you don't have a computer from the CERN accelerator sector (ACC):
python3 -m venv ./venv
source ./venv/bin/activate
python -m pip install git+https://gitlab.cern.ch/acc-co/devops/python/acc-py-pip-config.git
python -m pip install nxcals
Known issue with executing script in the YARN mode from a non "ACC" machine
If the machine does not have access to the ACC-PY distribution and a script is submitted to the cluster in YARN mode, an error similar to the following may occur:
Error:
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6) (ithdp1058.cern.ch executor 3):
java.io.IOException: Cannot run program "./environment/bin/python": error=2, No such file or directory
Use NXCALS package with other BE libraries
After successfully running the above pip install nxcals commands, the process will install the following packages in your Python setup:
(venv) [user@host]$ pip list | grep nxcals
nxcals x.y.z
nxcals-spark-session-builder x.y.z
nxcals-extraction-api-python3 x.y.z
nxcals-extraction-api-python3-legacy x.y.z
Note the last package on the list, nxcals-extraction-api-python3-legacy. This package contains the obsolete DataQuery builders under the cern namespace and is included only for compatibility reasons. The legacy package will eventually be phased out!
Legacy package is locking cern python namespace!
Unfortunately, it is the legacy package that locks the cern namespace, and it needs to be removed if your intention is to use NXCALS together with other cern libraries (especially those that expose Java classes via JPype).
Overcome issues with NXCALS and JPype
When using the new nxcals package with another cern library that exposes Java classes directly as Python modules (e.g. PyLSA), one will quickly run into exceptions similar to the following example.
Try loading Java classes exposed as Python modules via JPype in a Python context:
from cern.lsa.domain.settings import *
Will yield the following exception:
Traceback (most recent call last):
File "<string>", line 1, in <module>
ModuleNotFoundError: No module named 'cern.lsa'
In order to avoid this exception, we need to unlock the cern namespace in the Python environment. Only then will JPype be able to expose the requested Java classes as Python proxy modules.
To unlock the cern namespace in the Python env, the nxcals legacy extraction package needs to be removed. This can be achieved with the following steps:
1) Remove the legacy package from the Python environment:
pip uninstall nxcals-extraction-api-python3-legacy
2) Remove the cern package from the nxcals native Python DataQuery imports.
Before:
from cern.nxcals.api.extraction.data.builders import *
After:
from nxcals.api.extraction.data.builders import *
Hint
Proceed with the above actions only if you experience issues with module loading (ModuleNotFoundError). In most use cases this issue is not visible and can be safely ignored.
Info
In scenarios where the application is deployed, it is not easy to perform the "pip uninstall" step described above directly at the deployment location.
One possibility to overcome this issue is to manually edit the 'deployment/app/requirements.txt' file created in your project folder while performing:
acc-py app lock ./
and remove the
nxcals-extraction-api-python3-legacy==....
line.
Then the application can be deployed with that particular package removed from the requirements specification, using:
acc-py app deploy --deploy-base /tmp/your_deployment_location ./
PySpark
One of the tools included in the NXCALS bundle is PySpark. To use it, first activate your Python venv:
source ./venv/bin/activate
pyspark
Warning
Invoking pyspark without any arguments launches a session in Spark local mode (meaning that the driver and executor(s)
run in the same JVM on the local machine). Moreover, default properties are used, including a minimal spark.driver.memory=1500m
suitable only for running simple code snippets. This and other Spark properties should be adjusted to individual requirements
in order to avoid exceptions like: java.lang.OutOfMemoryError: Java heap space
Example of PySpark session when using NXCALS package
# Activate your Python venv
$ source ./venv/bin/activate
# Now just run the pyspark executable and you are ready to go with NXCALS using
# default configuration: running Spark in local mode with default Spark properties
# including minimal amount of memory for Spark driver
(venv) $ pyspark
# Or alternatively you can run Spark on YARN with customized configuration,
# for example: executors=5 cores-per-executor=2 memory=3GB driver-memory=10G
(venv) $ pyspark --master yarn --num-executors 5 --executor-cores 2 --executor-memory 3G --conf spark.driver.memory=10G
# Or just get some help on available parameters
(venv) $ pyspark --help
# Once pyspark has started
# Import NXCALS Builders
>>> from nxcals.api.extraction.data.builders import *
# Build a CMW specific Device/Property query
>>> query = DevicePropertyDataQuery.builder(spark).system("CMW").startTime("2018-04-02 00:00:00.000").endTime("2018-05-02 00:00:00.000").entity().parameter("CPS.TGM/FULL-TELEGRAM.STRC").build().select("USER","DEST")
# Or you can query by key-values, which is a more generic approach
>>> query = DataQuery.builder(spark).byEntities().system("CMW").startTime("2018-04-02 00:00:00.000").endTime("2018-05-02 00:00:00.000").entity().keyValue("device","CPS.TGM").keyValue("property", "FULL-TELEGRAM.STRC").build()
# Count the records in the dataframe
>>> query.count()
Count: 644142
# Select specific fields with filter predicates
>>> query.select("*").where("DEST like '%EAST%'").show()
# To exit pyspark type
>>> quit()
# Deactivate your Python venv
(venv) $ deactivate
Add libraries to your cluster machine
When working in YARN mode, all the machines in the cluster receive your virtual environment (it gets packed and then sent to all the machines in the cluster when launching PySpark in YARN mode). If you install a new library with pip install and need to use it, simply remove the packed venv archive from your venv:
(venv) $ rm $VIRTUAL_ENV/nxcals-python3-env.tar.gz
Alternatively, you can run the build_packed_venv.sh script that is located in the Spark config directory. From the bundle, launch:
(venv) $ ./conf/build_packed_venv.sh
Run a PySpark session customized by an external startup file:
There are use cases where a PySpark session needs to be run together with some interactive start-up commands.
Such interactive commands could be part of an analysis that should be run during start-up of the session. For this purpose, the path to that Python file should be put in the PYTHONSTARTUP environment variable passed to pyspark:
(venv) $ PYTHONSTARTUP="/path/to/my/startup.py" pyspark
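As an illustration, a hypothetical /path/to/my/startup.py could prepare the NXCALS imports and report that the session is ready (the spark session created by pyspark should already be available when the startup file runs):
# hypothetical startup.py executed by pyspark at shell start-up
from nxcals.api.extraction.data.builders import DataQuery, DevicePropertyDataQuery

# 'spark' is the session created by the pyspark shell before this file is executed
print("NXCALS builders imported, Spark version:", spark.version)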
Standalone PySpark script
Instead of running PySpark applications in interactive mode, you can submit them through the spark-submit
script to be run locally or on a cluster, as described here.
Standalone Python application
You can also run standalone Python applications in your Python venv.
Here is a sample Python application:
from nxcals import spark_session_builder
from nxcals.api.extraction.data.builders import DevicePropertyDataQuery
# Possible master values (the location to run the application):
# local: Run Spark locally with one worker thread (that is, no parallelism).
# local[K]: Run Spark locally with K worker threads. (Ideally, set this to the number of cores on your host.)
# local[*]: Run Spark locally with as many worker threads as logical cores on your host.
# yarn: Run using a YARN cluster manager.
spark = spark_session_builder.get_or_create(app_name='spark-basic', master='yarn')
intensity = DevicePropertyDataQuery.builder(spark).system("CMW").startTime("2018-06-17 00:00:00.000").endTime("2018-06-20 00:00:00.000").entity().parameter("PR.BCT/HotspotIntensity").build()
# count the data points and print the result
print(intensity.count())
intensity.show()
You can simply run it in your Python venv:
$ source ./venv/bin/activate
(venv) $ python sample.py
Note
Alternatively, one can build a Spark session on YARN using a so-called flavor, corresponding to a predefined resource sizing, with a choice of YARN_SMALL, YARN_MEDIUM and YARN_LARGE, for example:
from nxcals.spark_session_builder import get_or_create, Flavor
spark = get_or_create(flavor=Flavor.YARN_SMALL)
These can be overwritten by individual Spark properties:
spark = get_or_create(flavor=Flavor.YARN_MEDIUM, conf={'spark.executor.memory': '5g'})
One can also specify the environment - pro (default) or testbed:
spark = get_or_create(flavor=Flavor.YARN_LARGE, hadoop_env="pro")
Scala Spark shell
The NXCALS Spark bundle also allows running the Scala Spark shell. Similarly to PySpark, it relies on Kerberos authentication, which can be provided in the same way.
A sample spark-shell session running Scala code is provided below:
$ ./bin/spark-shell
# Use a specific Spark configuration: executors=5 cores-per-executor=2 memory=3GB driver-memory=10G in YARN mode
$ ./bin/spark-shell --master yarn --num-executors 5 --executor-cores 2 --executor-memory 3G --conf spark.driver.memory=10G
# Or just get some help on available parameters
$ ./bin/spark-shell --help
# OPTIONALLY: Load some examples (from example.scala file, ls the dir to see)
# Once the scala shell is started please load the examples (it will run some queries)
#
# Execution time depends on the number of executors, memory, cores, etc.
# There will be 3 datasets loaded, data, tgmData & tgmFiltered ready to play with.
scala> :load example.scala
# Wait for the script to load, watch the data being retrieved and some operations done on it.
# Once you get the prompt scala> back you can write your commands.
# Some operations on the data (see example.scala file for more or read the Spark manual)
scala> data.printSchema
scala> tgmData.printSchema
scala> data.show
scala> tgmData.show
scala> tgmFiltered.count
scala> tgmFiltered.join(data, "cyclestamp").agg(sum("current")).show
# To exit spark-shell type
scala> :quit
You can look at some scala code examples for CMW.
NXCALS Spark bundle
Bundle download
The latest version of the bundle can be downloaded and unzipped using the following bash command:
# Download the latest version of bundle for the NXCALS PRO environment
$ curl -s -k -O http://photons-resources.cern.ch/downloads/nxcals_pro/spark/spark-nxcals.zip &&
unzip spark-nxcals.zip && rm spark-nxcals.zip && cd spark-*-bin-hadoop3
If required, it can be obtained interactively from the following links:

| Environment | Bundle |
|---|---|
| PRO | spark-nxcals.zip |
| TESTBED | spark-nxcals.zip |
Important
The NXCALS Spark bundle uses relative path configuration in order to load its resources. To allow Spark to correctly pick up all the required resources, you need to execute its binaries from within the extracted Spark directory.
Setting up your virtual environment for running PySpark:
Similarly to using NXCALS through the pip installation, we need to use acc-py for our virtual environment. If you have a computer from the CERN accelerator sector (ACC):
source /acc/local/share/python/acc-py/base/pro/setup.sh
acc-py venv ./venv
source ./venv/bin/activate
If you don't have a computer from the CERN accelerator sector (ACC):
python3 -m venv ./venv
source ./venv/bin/activate
python -m pip install git+https://gitlab.cern.ch/acc-co/devops/python/acc-py-pip-config.git
Within the same Spark bundle for NXCALS you will find a bash script named source-me.sh.
This script is responsible for setting up the virtual environment for NXCALS needs. On that virtual Python environment it then loads the required NXCALS data access Python package (bundled with the sources), to make the API directly available in the PySpark context.
Sourcing the environment preparation script
Sourcing the Python virtual environment preparation script is highly important and should not be omitted when the intention is to run a PySpark session!
That is because the NXCALS sources are installed via the managed pip instance. Doing otherwise will result in a PySpark initialization in a non-working state.
Running PySpark with required Kerberos authentication:
Example of PySpark session when using NXCALS Spark bundle:
# source the source-me.sh script to setup and activate python3 venv
source ./source-me.sh
# executing the above preparation script will generate a new directory in the spark-bundle root
# named 'nxcals-python3-env', which is used as the target for the virtual env
# check if python path is directly available/served from within the virtual env directory
(nxcals-python3-env)$ which python
# /path/to/spark/nxcals/bundle/spark-3.3.1-bin-hadoop3/nxcals-python3-env/bin/python
# check list of modules installed in virtual env:
pip list
# numpy (1.14.2)
# nxcals-data-access-python3 (1.3.26)
# pandas (0.24.2)
# pip (9.0.1)
# pyarrow (0.13.0)
# setuptools (28.8.0)
# If the output from both commands is similar to the one presented, we are good to go with pyspark
# Now just run the pyspark executable and you are ready to go with NXCALS config
(nxcals-python3-env)$ ./bin/pyspark
# Or executors=5 cores-per-executor=2 memory=3GB driver-memory=10G on YARN mode
(nxcals-python3-env)$ ./bin/pyspark --master yarn --num-executors 5 --executor-cores 2 --executor-memory 3G --conf spark.driver.memory=10G
# Once the context is loaded
# Import NXCALS Builders
>>> from nxcals.api.extraction.data.builders import *
# Build a CMW specific Device/Property query
>>> query = DevicePropertyDataQuery.builder(spark).system("CMW").startTime("2018-04-02 00:00:00.000").endTime("2018-05-02 00:00:00.000").entity().parameter("CPS.TGM/FULL-TELEGRAM.STRC").build().select("USER","DEST")
# Or you can query by key-values, which is a more generic approach
>>> query = DataQuery.builder(spark).byEntities().system("CMW").startTime("2018-04-02 00:00:00.000").endTime("2018-05-02 00:00:00.000").entity().keyValue("device","CPS.TGM").keyValue("property", "FULL-TELEGRAM.STRC").build()
# Count the records in the dataframe
>>> query.count()
Count: 644142
# To exit pyspark type
>>> quit()
# To deactivate virtual python3 environment type
(nxcals-python3-env)$ deactivate
SWAN Notebook
Working in close collaboration with our colleagues from EP-SFT (Enric Saavedra, Danilo Piparo) and IT-DB-SAS has led to the successful integration of SWAN's Jupyter notebooks with the NXCALS Hadoop cluster, for accessing data stored on the NXCALS service and analyzing it using Spark.
- The SWAN platform is accessible at the following link: https://swan.cern.ch/
- More information about SWAN can be found on the dedicated service homepage: https://swan.web.cern.ch/
- A SWAN session can be created by following the instructions available here
- Example notebooks can be downloaded from CERNBox as described here
Important
If you are planning to use a service account instead of your personal computing account, you have to subscribe to the CERNBOX service first (https://resources.web.cern.ch/resources/) and log in to https://cernbox.cern.ch/ before using SWAN.
Java API
A Java examples project can be cloned from GitLab by following steps described here.
Java API via Python
Currently NXCALS provides a Python API only for the Data Extraction API, which is accessible through the NXCALS package or the NXCALS Spark bundle. There is also a possibility of accessing other NXCALS Java APIs from Python, such as:
Jupyter
First, install Jupyter into the previously created venv that has NXCALS installed.
python -m pip install jupyter
Important
Make sure to have jupyter in your PATH. Verify using "which jupyter".
Once done, set the PySpark driver Python to be Jupyter and run the pyspark utility from {venv}/nxcals-bundle/bin/pyspark:
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS=notebook
./venv/nxcals-bundle/bin/pyspark --master yarn --num-executors 10 # please update the path to venv if needed
This will open a browser with the Jupyter notebook. Create/open a notebook file and wait for the kernel and SparkSession to start. After that, the already created SparkSession and SparkContext will be available under the spark and sc variables respectively.
Now your notebook is ready for interaction with the NXCALS API.
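For example, a first notebook cell could reuse the Device/Property query shown earlier in this guide (a minimal sketch; spark is the session already created by pyspark):
# first notebook cell: query NXCALS using the pre-created 'spark' session
from nxcals.api.extraction.data.builders import DevicePropertyDataQuery

df = DevicePropertyDataQuery.builder(spark).system("CMW") \
    .startTime("2018-04-02 00:00:00.000").endTime("2018-05-02 00:00:00.000") \
    .entity().parameter("CPS.TGM/FULL-TELEGRAM.STRC").build()
df.select("USER", "DEST").show(5)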