Building CMW-specific queries
Warning
Before running the examples below, please make sure that you follow the instructions given in the
"Data Access User Guide" document.
Basic queries constructed using the DevicePropertyDataQuery builder
The CMW system has a very specific way of identifying its data: it uses two mandatory attributes, a device name and a property name.
NXCALS provides a query builder utility that respects this convention. The example below uses the DevicePropertyDataQuery builder in order to fetch intensities from the PR.BCT/HotspotIntensity (device/property) pair:
# import the nxcals query builders and Spark SQL functions
from nxcals.api.extraction.data.builders import DevicePropertyDataQuery
from pyspark.sql.functions import max
# build the query (note: instead of parameter you can specify device/property separately:
# .device('PR.BCT').property('HotspotIntensity'))
intensity = DevicePropertyDataQuery.builder(spark).system('CMW') \
.startTime('2018-06-17 00:00:00.000').endTime('2018-06-20 00:00:00.000') \
.entity().parameter('PR.BCT/HotspotIntensity').build()
# print the schema
intensity.printSchema()
// import the nxcals query builders and Spark SQL classes
import cern.nxcals.api.extraction.data.builders.DevicePropertyDataQuery;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
// build the query (note: instead of parameter you can specify device/property separately:
// .device("PR.BCT").property("HotspotIntensity"))
Dataset<Row> intensity = DevicePropertyDataQuery.builder(spark).system("CMW")
.startTime("2018-06-17 00:00:00.000").endTime("2018-06-20 00:00:00.000")
.entity().parameter("PR.BCT/HotspotIntensity").build();
// print the schema
intensity.printSchema();
// import the nxcals query builders
import cern.nxcals.api.extraction.data.builders._
// build the query (note: instead of parameter you can specify device/property separately:
// .device("PR.BCT").property("HotspotIntensity"))
val intensity = DevicePropertyDataQuery.builder(spark).system("CMW").
startTime("2018-06-17 00:00:00.000").endTime("2018-06-20 00:00:00.000").
entity().parameter("PR.BCT/HotspotIntensity").build()
// print the schema
intensity.printSchema()
Hint
The same result could be achieved by using a different, more generic approach, with the help of the generic DataQuery builder. Please see the "Building generic NXCALS queries" section for more details.
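For illustration, a minimal Python sketch of the equivalent generic query, assuming the byEntities()/keyValue() variant of the DataQuery builder described in that section:
# a sketch of the equivalent generic query (see "Building generic NXCALS queries")
from nxcals.api.extraction.data.builders import DataQuery
intensityGeneric = DataQuery.builder(spark).byEntities().system('CMW') \
    .startTime('2018-06-17 00:00:00.000').endTime('2018-06-20 00:00:00.000') \
    .entity().keyValue('device', 'PR.BCT').keyValue('property', 'HotspotIntensity') \
    .build()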
A trimmed output of the printSchema() method for the intensity values is the following:
root
|-- __record_timestamp__: long (nullable = true)
|-- __record_version__: long (nullable = true)
|-- acqStamp: long (nullable = true)
|-- class: string (nullable = true)
|-- cyclestamp: long (nullable = true)
|-- dcAftEje1: float (nullable = true)
|-- dcAftEje2: float (nullable = true)
|-- dcAftInj1: float (nullable = true)
|-- dcAftInj2: float (nullable = true)
|-- dcAftTra: float (nullable = true)
|-- dcBefEje1: float (nullable = true)
|-- dcBefEje2: float (nullable = true)
|-- dcBefInj1: float (nullable = true)
|-- dcBefInj2: float (nullable = true)
...
The output can be used to create queries targeting specific fields:
# count the records, selecting only the 'acqStamp' and 'dcBefInj1' fields
intensity.select('acqStamp', 'dcBefInj1').count()
// count the records, selecting only the 'acqStamp' and 'dcBefInj1' fields
intensity.select("acqStamp", "dcBefInj1").count();
// count the records, selecting only the 'acqStamp' and 'dcBefInj1' fields
intensity.select("acqStamp", "dcBefInj1").count()
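Note that select() alone does not drop rows with missing values, so the count above includes records where the selected fields are null. If only complete records are of interest, they can be filtered out explicitly (a sketch using standard Spark functions):
# count only the records where both selected fields are non-null
from pyspark.sql.functions import col
intensity.select('acqStamp', 'dcBefInj1') \
    .where(col('acqStamp').isNotNull() & col('dcBefInj1').isNotNull()).count()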
Next, we can get the TGM data which will later be used for filtering the intensities:
tgmData = DevicePropertyDataQuery.builder(spark).system('CMW') \
.startTime('2018-06-17 00:00:00.000').endTime('2018-06-20 00:00:00.000') \
.entity().device('CPS.TGM').property('FULL-TELEGRAM.STRC').build()
Dataset<Row> tgmData = DevicePropertyDataQuery.builder(spark).system("CMW")
.startTime("2018-06-17 00:00:00.000").endTime("2018-06-20 00:00:00.000")
.entity().device("CPS.TGM").property("FULL-TELEGRAM.STRC").build();
val tgmData = DevicePropertyDataQuery.builder(spark).system("CMW").
startTime("2018-06-17 00:00:00.000").endTime("2018-06-20 00:00:00.000").
entity().device("CPS.TGM").property("FULL-TELEGRAM.STRC").build()
Based on the above query, we can count the number of available records per user:
tgmData.groupBy('USER').count().show()
tgmData.groupBy("USER").count().show();
tgmData.groupBy("USER").count().show()
+--------+------+
| USER| count|
+--------+------+
| ION2| 1260|
| MD7| 388|
| EAST1| 12367|
| MD1| 86|
| MD6| 5|
|LHCPROBE| 1092|
| ZERO|104948|
| MD3| 2815|
| MD2| 397|
| EAST2| 8821|
| SFTPRO1| 13043|
| TOF| 497|
|LHCINDIV| 8334|
| SFTPRO2| 25882|
+--------+------+
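If desired, the per-user counts can also be sorted, for example in descending order (a sketch using standard Spark functions):
# sort the per-user record counts in descending order
from pyspark.sql.functions import desc
tgmData.groupBy('USER').count().orderBy(desc('count')).show()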
Alternatively, we can select one of the users and display a list of 'cyclestamps':
userTOF = tgmData.select('cyclestamp').where("USER == 'TOF'")
userTOF.show()
Dataset<Row> userTOF = tgmData.select("cyclestamp").where("USER == 'TOF'");
userTOF.show();
val userTOF = tgmData.select("cyclestamp").where("USER == 'TOF'")
userTOF.show()
+-------------------+
| cyclestamp|
+-------------------+
|1529320564300000000|
|1529321435500000000|
|1529326583500000000|
|1529327058700000000|
|1529317607500000000|
|1529320300300000000|
|1529321118700000000|
|1529322095500000000|
|1529323573900000000|
|1529317871500000000|
|1529327850700000000|
|1529321488300000000|
|1529322042700000000|
|1529322755500000000|
|1529322808300000000|
|1529324682700000000|
|1529326398700000000|
|1529328801100000000|
|1529329804300000000|
|1529322438700000000|
+-------------------+
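The cyclestamps above are expressed in nanoseconds since the epoch. If a human-readable time is more convenient, they can be converted with standard Spark functions (a sketch; the 'cycletime' column name is just an example):
# convert the nanosecond cyclestamps into a human-readable timestamp column
from pyspark.sql.functions import col
userTOF.withColumn('cycletime', (col('cyclestamp') / 1e9).cast('timestamp')).show(5)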
Joining DataFrames
With the prepared DataFrames we can perform a join over common fields. In this example we join the two datasets using cyclestamp:
result = intensity.join(userTOF, intensity.cyclestamp == userTOF.cyclestamp) \
.select(intensity.cyclestamp, intensity.dcAftInj1)
Dataset<Row> result = intensity.join(userTOF, "cyclestamp")
.select(intensity.col("cyclestamp"), intensity.col("dcAftInj1"));
val result = intensity.join(userTOF, "cyclestamp").
select(intensity.col("cyclestamp"), intensity.col("dcAftInj1"))
We can now display the first 4 results as follows:
result.show(4)
result.show(4);
result.show(4)
+-------------------+----------+
| cyclestamp| dcAftInj1|
+-------------------+----------+
|1529317396300000000| 0.0060543|
|1529326715500000000|0.02119005|
|1529318874700000000|0.00544887|
|1529324497900000000|0.03511494|
+-------------------+----------+
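As a quick sanity check, the number of joined rows can be compared with the number of TOF cycles selected earlier; a TOF cyclestamp without a matching intensity record is simply absent from the result (a sketch):
# sanity check: at most one joined row per TOF cyclestamp is expected
print(result.count(), userTOF.count())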
Performing simple aggregations
Spark DataFrames provide support for statistical and mathematical functions. Some examples based on data produced by the DevicePropertyDataQuery builder can be found below:
# Get intensity data
intData = DevicePropertyDataQuery.builder(spark).system('CMW') \
.startTime('2018-09-27 00:00:00.000').endTime('2018-09-28 01:00:00.000') \
.entity().device('PR.BCT').property('HotspotIntensity').build()
// Get intensity data
Dataset<Row> intData = DevicePropertyDataQuery.builder(spark).system("CMW")
.startTime("2018-09-27 00:00:00.000").endTime("2018-09-28 01:00:00.000")
.entity().device("PR.BCT").property("HotspotIntensity").build();
// Get intensity data
val intData = DevicePropertyDataQuery.builder(spark).system("CMW").
startTime("2018-09-27 00:00:00.000").endTime("2018-09-28 01:00:00.000").
entity().device("PR.BCT").property("HotspotIntensity").build()
Showing the maximum value of dcAftInj1 for the CPS.USER.SFTPRO1 selector:
intData.where("selector = 'CPS.USER.SFTPRO1'").select(max(intData.dcAftInj1)).show()
intData.where("selector = 'CPS.USER.SFTPRO1'").select(functions.max(functions.col("dcAftInj1"))).show();
import org.apache.spark.sql.functions._
intData.where("selector = 'CPS.USER.SFTPRO1'").select(max(intData("dcAftInj1"))).show()
+--------------+
|max(dcAftInj1)|
+--------------+
| 1805.1597|
+--------------+
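The same aggregation can also be computed per selector in a single pass with groupBy() (a sketch using standard Spark functions):
# maximum and mean of dcAftInj1 for each selector
from pyspark.sql.functions import avg, max
intData.groupBy('selector').agg(max('dcAftInj1'), avg('dcAftInj1')).show()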
The function describe() returns a DataFrame containing information such as the number of non-null entries (count), mean, standard deviation, and minimum and maximum values for a numerical column.
intData.describe('dcAftInj1').show()
intData.describe("dcAftInj1").show();
intData.describe("dcAftInj1").show()
+-------+-----------------+
|summary| dcAftInj1|
+-------+-----------------+
| count| 52546|
| mean|570.8605592587371|
| stddev|610.8961214222219|
| min| -0.14598517|
| max| 1805.1597|
+-------+-----------------+
Hint
Without specifying field names, the function will calculate summary statistics for all numerical columns present in the DataFrame.
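For instance, a sketch of the call without arguments, which summarizes every numerical column at once:
# summary statistics for all numerical columns present in the DataFrame
intData.describe().show()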
Building query with specific fields
Let's use the same TGM query, but this time to target only specific fields, like batch, destination and user.
tgmDataReduced = DevicePropertyDataQuery.builder(spark).system('CMW') \
.startTime('2018-06-15 00:00:00.000').endTime('2018-06-17 00:00:00.000') \
.entity().parameter('CPS.TGM/FULL-TELEGRAM.STRC').build() \
.select('BATCH','DEST','USER')
Dataset<Row> tgmDataReduced = DevicePropertyDataQuery.builder(spark).system("CMW")
.startTime("2018-06-15 00:00:00.000").endTime("2018-06-17 00:00:00.000")
.entity().parameter("CPS.TGM/FULL-TELEGRAM.STRC").build()
.select("BATCH", "DEST", "USER");
val tgmDataReduced = DevicePropertyDataQuery.builder(spark).system("CMW").
startTime("2018-06-15 00:00:00.000").endTime("2018-06-17 00:00:00.000").
entity().parameter("CPS.TGM/FULL-TELEGRAM.STRC").build().
select("BATCH","DEST","USER")
The above query retrieves the same data records as the one previously mentioned, but this time the result set is reduced to contain only the fields that we have specified:
tgmDataReduced.printSchema()
tgmDataReduced.printSchema();
tgmDataReduced.printSchema()
root
|-- BATCH: integer (nullable = true)
|-- DEST: string (nullable = true)
|-- USER: string (nullable = true)
|-- __record_timestamp__: long (nullable = true)
|-- nxcals_entity_id: long (nullable = true)
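The reduced DataFrame can then be processed like any other, for example to count the records per destination and user combination (a sketch):
# count records per destination/user combination on the reduced DataFrame
tgmDataReduced.groupBy('DEST', 'USER').count().show()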