Frequently Asked Questions

What is the preferred syntax to extract an element of the ‘WrappedArray’ inside the samples column of the resulting PySpark DataFrame?

Each array is an object (struct) with two fields: elements & dimensions:

 |-- MISC: struct (nullable = true)
 |    |-- elements: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- dimensions: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)
To select the value at a given index, use either of the following (the example should work for any array):
tgmData.select("MISC").withColumn("val", tgmData["MISC.elements"].getItem(0)).show()
or
tgmData.select("MISC").withColumn("val", tgmData["MISC.elements"][0]).show()
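For multi-dimensional arrays, the values sit in the flat `elements` list and the shape in `dimensions`, so selecting by (row, column) requires a flat index first. The following is a minimal pure-Python sketch, assuming row-major ordering of `elements` (an assumption not confirmed by this FAQ, so verify it against your data). `flat_index` is a hypothetical helper, not part of NXCALS or Spark:

```python
def flat_index(dimensions, position):
    """Map a multi-dimensional position to an index into the flat `elements` list.

    Assumes row-major (C-style) ordering, which is an assumption of this sketch.
    """
    if len(dimensions) != len(position):
        raise ValueError("position must have one coordinate per dimension")
    index = 0
    for size, coord in zip(dimensions, position):
        if not 0 <= coord < size:
            raise IndexError(f"coordinate {coord} out of range for size {size}")
        index = index * size + coord
    return index


# Example values as they might look after collecting one row to the driver.
elements = ["a", "b", "c", "d", "e", "f"]
dimensions = [2, 3]  # 2 rows x 3 columns

# First element of the second row.
value = elements[flat_index(dimensions, (1, 0))]
```

The resulting index can also be passed to getItem() in the Spark expressions above.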

During data analysis, we produce results that we want to store to process further or for future reference. Does NXCALS offer the possibility to store data back?

We currently offer neither a structured approach to this nor a way to ingest data back into NXCALS via Python. The only solution we can propose for now is to save your results to an arbitrary, shared HDFS directory:

dataframe = ...  # load & process using NXCALS + Spark

dataframe.write.parquet("/user/<some analysis dir>")
and load them again later:
dataframe = spark.read.parquet("/user/<some analysis dir>")
The directory structure and the sharing of locations are the user's responsibility.

How do I run a Spark application outside of the NXCALS bundle?

The documentation is provided in our Confluence pages describing the NXCALS infrastructure: NXCALS Data Access User Guide

How do I query information about variable <-> device/property/field mappings?

Current CALS variable configurations can be queried using the CCDE report: CALS variables and processes.

What are the key values for each system that can be used in DataQuery byEntities() builder?

  • WINCCOA : [{"name":"variable_name","type":"string"}]
  • CMW : [{"name":"device","type":"string"},{"name":"property","type":"string"}]

These keys make it possible to create queries for the WINCCOA and CMW systems like the ones presented in the NXCALS Data Extraction API reference document.
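As an illustration of how these key definitions identify an entity, here is a minimal sketch that checks a set of key values against the definitions listed above before they are passed to a query builder. `entity_key_values` and the example device, property and variable names are hypothetical; the builder calls themselves are described in the API reference and are not reproduced here:

```python
import json

# Key definitions copied from the list above.
ENTITY_KEY_DEFINITIONS = {
    "WINCCOA": json.loads('[{"name":"variable_name","type":"string"}]'),
    "CMW": json.loads(
        '[{"name":"device","type":"string"},{"name":"property","type":"string"}]'
    ),
}


def entity_key_values(system, **values):
    """Return a validated key-value dict identifying one entity of `system`."""
    expected = [field["name"] for field in ENTITY_KEY_DEFINITIONS[system]]
    missing = [name for name in expected if name not in values]
    unknown = [name for name in values if name not in expected]
    if missing or unknown:
        raise ValueError(f"{system}: missing keys {missing}, unknown keys {unknown}")
    if not all(isinstance(values[name], str) for name in expected):
        raise TypeError("all entity key values must be strings")
    return {name: values[name] for name in expected}


# Placeholder names, for illustration only.
cmw_keys = entity_key_values("CMW", device="SOME.DEVICE", property="SomeProperty")
winccoa_keys = entity_key_values("WINCCOA", variable_name="SOME_VARIABLE")
```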

I heard that internally NXCALS is using HBase for the recent data. How am I affected by that?

During data extraction from NXCALS, the last 48 hours come from HBase, while older data is read directly from HDFS. Thanks to this architecture, data is available almost immediately after being acquired, whereas HDFS first requires data compaction and deduplication (which takes from a few hours to a day). The origin of the data is invisible to the end user during extraction.
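The 48-hour boundary can be pictured with a small sketch. NXCALS performs this routing internally and transparently, so the code below only illustrates the rule; `split_by_storage` and the exact cutoff handling are assumptions of the sketch, not NXCALS API:

```python
from datetime import datetime, timedelta, timezone

HBASE_WINDOW = timedelta(hours=48)  # "last 48 hours" from the answer above


def split_by_storage(start, end, now):
    """Split [start, end) into the part read from HDFS and the part read from HBase.

    Returns (hdfs_range, hbase_range); each is a (start, end) tuple or None.
    """
    if start >= end:
        raise ValueError("start must be before end")
    cutoff = now - HBASE_WINDOW
    hdfs_range = (start, min(end, cutoff)) if start < cutoff else None
    hbase_range = (max(start, cutoff), end) if end > cutoff else None
    return hdfs_range, hbase_range


# A query over the last 5 days touches both stores.
now = datetime(2024, 1, 10, 12, 0, tzinfo=timezone.utc)
hdfs_part, hbase_part = split_by_storage(now - timedelta(days=5), now, now)
```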

What is subscription validation?

In our new CMW datasources, all invalid subscriptions are removed and put into a “validation state”. They are checked periodically until they produce data again. Because of the validation time, this means some small data losses once the device is working again.

This protects the new datasources from the thousands of exceptions we get in CALS, which make debugging datasources difficult and slow down data processing (exceptions are costly).

What is the meaning of the VALIDATION_OK status for a subscription in the CCDE editor?

It means that the specified configuration is valid and, more importantly, that we actually managed to retrieve data. Other possible statuses are VALIDATION_IN_ERROR (accompanied by the error message) and NEW (waiting to be validated).

Is there any way to query the NXCALS metadata such as device classes for CMW system?

Currently this information can be obtained from the Java API (in the future this functionality will be ported to Python).

You can get it from EntityData or from EntityHistoryData (for historical changes), for CMW entities only:

EntityData.getPartitionData().getKeyValues().get("class")
EntityHistoryData.getPartitionData().getKeyValues().get("class")

Details about all the methods available through the Service Client API can be found in our Java Documentation.

That said, CCDE remains a valid source of information about classes, and CCDA provides a REST API to extract this data.

Why does the application fail with an exception when switched to yarn client mode (while it runs fine in local[*] mode)?

When running Spark in yarn client mode, make sure that your firewall is open, or bind the driver port via spark.driver.port in application.yml to a port that is open for incoming connections. If the ports are blocked, the following error appears:

"SparkContext - Error initializing SparkContext. org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master."
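For applications that build their Spark session in Python rather than through application.yml, the same properties can be set programmatically. The sketch below only assembles the properties. `yarn_client_port_conf` is a hypothetical helper and the port numbers are placeholders; `spark.driver.blockManager.port` is a standard Spark property that is not mentioned in the answer above and is included as an assumption that it may also need to be reachable:

```python
def yarn_client_port_conf(driver_port, block_manager_port=None):
    """Build Spark properties that pin the driver's listening ports."""
    for port in (driver_port, block_manager_port):
        if port is not None and not 1024 <= port <= 65535:
            raise ValueError(f"port {port} is outside the unprivileged range")
    conf = {"spark.driver.port": str(driver_port)}
    if block_manager_port is not None:
        conf["spark.driver.blockManager.port"] = str(block_manager_port)
    return conf


# Placeholder ports; use ports that your firewall opens for incoming connections.
conf = yarn_client_port_conf(5001, 5101)

# Applying the properties (requires pyspark, which is not imported in this sketch):
#   builder = SparkSession.builder.master("yarn")
#   for key, value in conf.items():
#       builder = builder.config(key, value)
```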

Can we also have a list of the variables already migrated?

Variables being migrated can be found using the CCDE report: CALS-to-NXCALS Data Migration.

Information about variables can also be retrieved using the NXCALS Metadata API.

How does performance compare between the CALS and NXCALS systems?

  • CALS:
    • Better on smaller data sets
    • Uses caches for repeated queries
    • Not so good on arrays & Strings
  • NXCALS:
    • Better on bigger queries ( > ~1-10 days, depends on data)
    • Better on arrays ( > 1h, depends on data)
    • Extraction time linearly dependent on storage file sizes (partitions), but also data structure, compression ratios, selected columns, executor number, CPUs, etc.

Any tips for improving NXCALS data extraction performance?

Tip 1: Persist Your Data

  • Working on a subset of devices from a very large file for a predefined time period?
  • Load them (from NXCALS) into a separate Dataset and save it to HDFS (done just once)
    • to your HDFS home dir, HDFS /tmp, etc.
    • ds.write.parquet('/dir')
  • Run multiple analyses on this file with pure Spark
    • ds = spark.read.parquet('/dir')
    • ds.<my-analysis>.toPandas()

Tip 2: Make use of Spark Cache

from nxcals.api.extraction.data.builders import *
import time
start=time.time()
ds=DevicePropertyDataQuery.builder(spark).system("CMW")\
.startTime("2018-12-01 00:00:00.000").endTime("2018-12-01 23:59:00.000").entity()\
.parameter("RPMBB.UA23.RSD2.A12B1/SUB_51").build().sample(False,0.09)

ds.persist()
pd = ds.select('acqStamp','I_MEAS').toPandas()
print(f"Time elapsed {time.time()-start} seconds")

Are there 'CALS practices' which will be deprecated, in favor of 'NXCALS practices'?

  • Don’t use the old API – use Spark!
  • Don’t “extract and analyze” – favour “analyze and extract the result”
  • Data storage size affects extraction performance

All our datasources encapsulate data into a data map. Would it be possible to provide a function/library to convert an org.apache.spark.sql.Dataset to a cern.japc.core.AcquiredParameterValue?

So far, we have a converter to CMW ImmutableData that is symmetric to what we accept at the Ingestion API: the SparkRowToImmutableDataConverter class. A converter to JAPC-related data is possible, but it is not planned for the near future.

How can I detect the type of extracted data?

The Spark Dataset has a schema() method that returns the schema of the dataset with all the type information:

dataset.schema().fields()[dataset.schema().fieldIndex("name")].dataType();
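In PySpark the same lookup is shorter, because `schema` is a property of the DataFrame and supports lookup by field name. A minimal sketch follows; `field_type` and `field_types` are hypothetical helper names, and `df` is assumed to be any PySpark DataFrame, for example one returned by an NXCALS query:

```python
def field_type(df, column_name):
    """Return the Spark DataType object of one column."""
    # StructType supports lookup by field name and returns a StructField.
    return df.schema[column_name].dataType


def field_types(df):
    """Return {column name: type as a short string} for every column."""
    return {field.name: field.dataType.simpleString() for field in df.schema.fields}
```

For a quick look without helpers, `df.printSchema()` prints the same information as a tree, and `df.dtypes` returns it as a list of (name, type) pairs.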

How long does it take for the data to be available for extraction after its acquisition?

When data is acquired from data sources, it is immediately sent to the NXCALS ingestion API. It is temporarily stored in Kafka (acting as a storage buffer with a retention of 2.5 days), from where it is collected by the NXCALS ETL process. It is then sent to HBase, where it stays for the following 48 hours and is made available to users.

Data going through this path is available for extraction after around 30 seconds. For special use cases, this time can be decreased by parametrising our Ingestion API. If this is required, please contact acc-logging-support@cern.ch for more details.

In parallel, the compactor process performs compaction and deduplication of the same data (both time-consuming operations) and stores the results in HDFS, to serve queries for data older than 48 hours.

Late data case:

When late data is acquired, i.e. data with a timestamp in the past (older than 48 hours), it is stored directly in HDFS (bypassing the HBase stage). In this scenario NXCALS requires a certain amount of time for compaction and deduplication before the data is available for extraction.

The required time is not predictable, since it depends on the compactor load. It takes 2 hours from the last update for a given partition/day for this data to be considered for compaction. In other words the client has to stop sending data and 2 hours have to pass before we look at the staging files.
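The 2-hour rule can be expressed as a small check. This is only a sketch of the rule as stated above; the function names are hypothetical, and becoming a compaction candidate does not mean the data is available yet, since the actual delay depends on the compactor load:

```python
from datetime import datetime, timedelta

COMPACTION_QUIET_PERIOD = timedelta(hours=2)  # from the answer above


def earliest_compaction_time(last_update):
    """Earliest moment at which a partition/day can be considered for compaction."""
    return last_update + COMPACTION_QUIET_PERIOD


def is_compaction_candidate(last_update, now):
    """True once no data has been sent to the partition/day for 2 hours."""
    return now >= earliest_compaction_time(last_update)


# Every new write to the same partition/day moves last_update forward,
# and with it the earliest compaction time.
last_update = datetime(2024, 1, 10, 9, 0)
candidate_from = earliest_compaction_time(last_update)
```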

What happens with data sent for ingestion multiple times?

If the data is sent multiple times (having the same timestamp, and not necessarily the same values), within the window of the current day (or even within 2 hours after midnight), it will be compacted together and deduplicated. In such a case there is no guarantee of which value is preserved. For that reason it is not advised to send the same data multiple times, as it can produce unexpected results.

If 'late data' is sent in the following days, duplicates will be created. Currently we don't re-process an old 'day' as a whole after new but late data arrives for it. At the level of HDFS, we just add a new data file to the already existing data files for that day without re-processing them all together (thus leaving duplicates).

Please note that this behaviour might change in the future.
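The difference between the two cases can be modelled in a few lines. This is a toy model of the behaviour described above, not NXCALS code: the record layout, `compact` and `stored_day` are hypothetical, and the sketch keeps the last record seen only because it has to pick one, whereas NXCALS gives no guarantee about which value survives:

```python
from datetime import datetime


def compact(records):
    """Deduplicate records on (entity, timestamp), as compaction of one batch would.

    This sketch keeps the last record seen; NXCALS gives no such guarantee.
    """
    unique = {}
    for record in records:
        unique[(record["entity"], record["timestamp"])] = record
    return list(unique.values())


def stored_day(same_day_batches, late_batches):
    """Model one stored day: same-day data compacted together, late files appended."""
    same_day = [record for batch in same_day_batches for record in batch]
    files = [compact(same_day)]
    # Each late batch becomes its own file; existing files are not re-processed.
    files.extend(compact(batch) for batch in late_batches)
    return [record for data_file in files for record in data_file]


ts = datetime(2024, 1, 10, 8, 0)
first = {"entity": "dev/prop", "timestamp": ts, "value": 1.0}
resend = {"entity": "dev/prop", "timestamp": ts, "value": 2.0}

same_day_result = stored_day([[first], [resend]], [])  # one record survives
late_result = stored_day([[first]], [[resend]])  # both records are kept
```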

java.lang.NoClassDefFoundError: org/apache/log4j/spi/Filter

The NXCALS client extraction libraries do not enforce any logging implementation, yet Spark, and specifically spark-core (https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.12), maintains a dependency on the log4j logging framework.

In a project that uses the NXCALS extraction libraries without any Spark-compatible logging framework, one would get exceptions like:

org/apache/log4j/spi/Filter
java.lang.NoClassDefFoundError: org/apache/log4j/spi/Filter
    at cern.nxcals.api.custom.service.TestSparkSession.<clinit>(TestSparkSession.java:13)
    at cern.nxcals.api.custom.service.aggregation.BaseAggregationFunctionTest.<clinit>(BaseAggregationFunctionTest.java:23)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
    [...]
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassNotFoundException: org.apache.log4j.spi.Filter
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    ... 52 more

Therefore, you need to explicitly specify a logging implementation of your choice that is compatible with log4j. This will most likely be an slf4j implementation. We propose the following options:

  • slf4j-log4j12
  • log4j-over-slf4j

More information on which one should be used in your project can be found here: