Data Extraction API
To facilitate NXCALS data retrieval, a Data Access API has been implemented in Java. It should be considered the principal method of accessing logging data, and it serves as the base for the other data retrieval mechanisms offered to users.
Those include a Python 3 library written directly on top of the Java API as a thin set of native Python modules that internally use Py4J, a bridge between Python and Java. Py4J enables Python programs running in a Python interpreter to dynamically access Java objects in a JVM.
Python is considered a first-class citizen in the Spark world because of the multitude of available libraries, including ones for visualizing data. Being more analytics-oriented, it is a great choice for building data science applications. Our Python interface is available directly from Python, via the PySpark shell (through the NXCALS bundle) and via the SWAN web interface.
There is yet another way of accessing the NXCALS API: through Scala. Since Scala runs on the same JVM and provides language interoperability with Java, the API is automatically available in that language as well (note, however, that the NXCALS team does not support Scala as a language).
It is worth underlining that, thanks to this approach (reusing objects from the same shared JVM), we have achieved homogeneous functionality across the Java, Python and Scala APIs.
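As an illustration of this shared-JVM approach, the underlying Java classes remain directly reachable from a PySpark session through the Py4J bridge. A minimal sketch (the class path below follows the Java metadata API used later on this page):
# Java objects accessed from Python through the shared JVM (Py4J)
ServiceClientFactory = spark._jvm.cern.nxcals.api.extraction.metadata.ServiceClientFactory
variableService = ServiceClientFactory.createVariableService()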
The NXCALS Data Access API itself consists of two query builders, DataQuery and ParameterDataQuery (DevicePropertyDataQuery is deprecated), each returning a result dataset as output. A query always expects a time window as input, together with information that identifies the data signal: system, generic key-value pairs, device/property (in the case of the CMW system) or variable name (for backward compatibility). More details about the exact syntax, with some examples, can be found below.
Note
The reference presented below is language independent. Concrete examples are given in Python, Java and Scala for clarification.
DataQuery for key-values
Builder responsible for querying generic data using key/value pairs.
DataQuery.builder(spark).entities()
.system("CMW") # str / String
.keyValuesEq({"key":"value"}) # Dict[str, Any] / Map<String, Object>
.keyValuesIn([{"key":"value"}, {"key":"value2"}]) # List[Dict[str, Any]] / List<Map<String, Object>>
.keyValuesLike({"key":"value_pattern%"}) # Dict[str, Any] / Map<String, Object> pattern works only if is string and should be in Oracle SQL pattern style (% and _ as wildcards)
.idEq(id) # int / Long
.idIn({id}) # Set[int] / Set<Long>
.timeWindow(start_time, end_time) # Time may be in multiple formats (str "YYYY-MM-DD HH24:MI:SS.SSS", nanos, datetime etc.) - check type in code
.atTime(time) # Only one time method invocation is allowed
.fieldAliases({"alias": {"field1", "field2"}}) # Dict[str, Set[str]] / Map<String, Set<String>> optional - allows specifying aliases
.fieldAliases("alias", "field1") # String, ...String - not accessible in Python
.build()
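As noted in the reference above, the time arguments accept several formats. A minimal sketch of the equivalent variants (assuming the forms listed in the reference; the WINCCOA example data is reused from the examples below):
from datetime import datetime
from nxcals.api.extraction.data.builders import DataQuery
start, end = '2018-06-15 00:00:00.000', '2018-06-17 00:00:00.000'   # UTC strings
# start, end = datetime(2018, 6, 15), datetime(2018, 6, 17)         # datetime objects
# start, end = 1529020800000000000, 1529193600000000000             # nanoseconds since epoch
df = DataQuery.builder(spark).entities().system('WINCCOA') \
    .keyValuesEq({'variable_name': 'MB.C16L2:U_HDS_3'}) \
    .timeWindow(start, end) \
    .build()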
DataQuery.builder(spark).byEntities()
.system(systemString) # "SYSTEM"
# Obligatory time range block
.atTime(timeUtcString) # "YYYY-MM-DD HH24:MI:SS.SSS"
.startTime(startTimeUtcString) # "YYYY-MM-DD HH24:MI:SS.SSS"
.duration(duration) # NUMBER
.endTime(endTimeUtcString) # "YYYY-MM-DD HH24:MI:SS.SSS"
# Optional data context block
.fieldAliases(fieldAliasesMap) # {"FIELD1": ["FIELD-ALIAS1"], "FIELD2": ["FIELD-ALIAS2"]}
# Obligatory entity block which can be repeated
.entity()
.keyValue(keyString, valueString) # "KEY", "VALUE"
.keyValueLike(keyString, valueString) # "KEY", "VALUE-WITH-WILDCARDS"
.keyValues(keyValuesMap) # {"KEY1": "VALUE1", "KEY2": "VALUE2"}
.keyValuesLike(keyValuesLikeMap) # {"KEY1": "VALUE1-WITH-WILDCARDS", "KEY2": "VALUE2-WITH-WILDCARDS"}
.entity()
.build()
Examples
from nxcals.api.extraction.data.builders import DataQuery
df1 = DataQuery.builder(spark).entities().system('WINCCOA') \
.keyValuesEq({'variable_name': 'MB.C16L2:U_HDS_3'}) \
.timeWindow('2018-06-15 00:00:00.000', '2018-06-17 00:00:00.000') \
.build()
df2 = DataQuery.builder(spark).entities().system('CMW') \
.keyValuesEq({'device': 'LHC.LUMISERVER', 'property': 'CrossingAngleIP1'}) \
.timeWindow('2019-04-29 00:00:00.000', '2019-04-30 00:00:00.000') \
.build()
df3 = DataQuery.builder(spark).entities().system('CMW') \
.keyValuesIn([{'device': 'LHC.LUMISERVER', 'property': 'CrossingAngleIP1'}]) \
.timeWindow('2019-04-29 00:00:00.000', '2019-04-30 00:00:00.000') \
.build()
df4 = DataQuery.builder(spark).entities().system('CMW') \
.keyValuesLike({'device': 'LHC.LUMISERVER', 'property': 'CrossingAngleIP%'}) \
.timeWindow('2019-04-29 00:00:00.000', '2019-04-30 00:00:00.000') \
.build()
df5 = DataQuery.builder(spark).entities().system('CMW').idEq(57336) \
.timeWindow('2019-04-29 00:00:00.000', '2019-04-30 00:00:00.000') \
.build()
from nxcals.api.extraction.data.builders import DataQuery
df1 = DataQuery.builder(spark).byEntities().system('WINCCOA') \
.startTime('2018-06-15 00:00:00.000').endTime('2018-06-17 00:00:00.000') \
.entity().keyValue('variable_name', 'MB.C16L2:U_HDS_3') \
.build()
df2 = DataQuery.builder(spark).byEntities().system('CMW') \
.startTime('2019-04-29 00:00:00.000').endTime('2019-04-30 00:00:00.000') \
.entity().keyValue('device', 'LHC.LUMISERVER').keyValue('property', 'CrossingAngleIP1') \
.build()
df3 = DataQuery.builder(spark).byEntities().system('CMW') \
.startTime('2019-04-29 00:00:00.000').endTime('2019-04-30 00:00:00.000') \
.entity().keyValues({'device': 'LHC.LUMISERVER', 'property': 'CrossingAngleIP1'}) \
.build()
df4 = DataQuery.builder(spark).byEntities().system('CMW') \
.startTime('2019-04-29 00:00:00.000').endTime('2019-04-30 00:00:00.000') \
.entity().keyValuesLike({'device': 'LHC.LUMISERVER', 'property': 'CrossingAngleIP%'}) \
.build()
import cern.nxcals.api.extraction.data.builder.DataQuery;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.util.HashMap;
import java.util.Map;
Map<String, Object> keyValues = new HashMap<>();
keyValues.put("device", "LHC.LUMISERVER");
keyValues.put("property", "CrossingAngleIP1");
Dataset<Row> df1 = DataQuery.builder(spark).entities()
.system("CMW")
.keyValuesEq(keyValues)
.timeWindow("2018-04-29 00:00:00.000", "2018-04-30 00:00:00.000")
.build();
Map<String, Object> keyValuesLike = new HashMap<>();
keyValuesLike.put("device", "LHC.LUMISERVER");
keyValuesLike.put("property", "CrossingAngleIP%");
Dataset<Row> df2 = DataQuery.builder(spark).entities()
.system("CMW")
.keyValuesLike(keyValuesLike)
.timeWindow("2018-04-29 00:00:00.000", "2018-04-30 00:00:00.000")
.build();
Dataset<Row> df3 = DataQuery.builder(spark).entities()
.system("CMW")
.idEq(57336)
.timeWindow("2018-04-29 00:00:00.000", "2018-04-30 00:00:00.000")
.build();
import cern.nxcals.api.extraction.data.builder.DataQuery;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.util.HashMap;
import java.util.Map;
Dataset<Row> df1 = DataQuery.builder(spark).byEntities()
.system("WINCCOA")
.startTime("2018-06-15 00:00:00.000").endTime("2018-06-17 00:00:00.000")
.entity().keyValue("variable_name", "MB.C16L2:U_HDS_3")
.build();
Dataset<Row> df2 = DataQuery.builder(spark).byEntities()
.system("CMW")
.startTime("2018-04-29 00:00:00.000").endTime("2018-04-30 00:00:00.000")
.entity().keyValue("device", "LHC.LUMISERVER").keyValue("property", "CrossingAngleIP1")
.build();
Map<String, Object> keyValues = new HashMap<>();
keyValues.put("device", "LHC.LUMISERVER");
keyValues.put("property", "CrossingAngleIP1");
Dataset<Row> df3 = DataQuery.builder(spark).byEntities()
.system("CMW")
.startTime("2018-04-29 00:00:00.000").endTime("2018-04-30 00:00:00.000")
.entity()
.keyValues(keyValues)
.build();
Map<String, Object> keyValuesLike = new HashMap<>();
keyValuesLike.put("device", "LHC.LUMISERVER");
keyValuesLike.put("property", "CrossingAngleIP%");
Dataset<Row> df4 = DataQuery.builder(spark).byEntities()
.system("CMW")
.startTime("2018-04-29 00:00:00.000").endTime("2018-04-30 00:00:00.000")
.entity().keyValuesLike(keyValuesLike)
.build();
import cern.nxcals.api.extraction.data.builders._
import scala.collection.JavaConversions._
val df1 = DataQuery.builder(spark).entities().system("CMW").
keyValuesEq(mapAsJavaMap(Map("device" -> "LHC.LUMISERVER", "property" -> "CrossingAngleIP1"))).
timeWindow("2018-04-29 00:00:00.000", "2018-04-30 00:00:00.000").
build()
val df2 = DataQuery.builder(spark).entities().system("CMW").
keyValuesLike(mapAsJavaMap(Map("device" -> "LHC.LUMISERVER", "property" -> "CrossingAngleIP%"))).
timeWindow("2018-04-29 00:00:00.000", "2018-04-30 00:00:00.000").
build()
val df3 = DataQuery.builder(spark).entities().system("CMW").
idEq(57336).
timeWindow("2018-04-29 00:00:00.000", "2018-04-30 00:00:00.000").
build()
import cern.nxcals.api.extraction.data.builders._
import scala.collection.JavaConversions._
val df1 = DataQuery.builder(spark).byEntities().system("WINCCOA").
startTime("2018-06-15 00:00:00.000").endTime("2018-06-17 00:00:00.000").
entity().keyValue("variable_name", "MB.C16L2:U_HDS_3").
build()
val df2 = DataQuery.builder(spark).byEntities().system("CMW").
startTime("2018-04-29 00:00:00.000").endTime("2018-04-30 00:00:00.000").
entity().keyValue("device", "LHC.LUMISERVER").keyValue("property", "CrossingAngleIP1").
build()
val df3 = DataQuery.builder(spark).byEntities().system("CMW").
startTime("2018-04-29 00:00:00.000").endTime("2018-04-30 00:00:00.000").
entity().keyValues(mapAsJavaMap(Map("device" -> "LHC.LUMISERVER", "property" -> "CrossingAngleIP1"))).
build()
val df4 = DataQuery.builder(spark).byEntities().system("CMW").
startTime("2018-04-29 00:00:00.000").endTime("2018-04-30 00:00:00.000").
entity().keyValuesLike(mapAsJavaMap(Map("device" -> "LHC.LUMISERVER", "property" -> "CrossingAngleIP%"))).
build()
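Whatever the language, the query result is a standard Spark DataFrame (Dataset&lt;Row&gt; in Java/Scala), so the usual Spark operations can be applied to it. A minimal Python sketch, assuming df2 from the CMW key-values example above (the selected field names are illustrative; inspect the actual schema first):
# inspect the schema returned for this device/property
df2.printSchema()
# standard Spark DataFrame operations: projection, filtering, ordering
df2.select('acqStamp', 'device', 'property') \
    .where("device = 'LHC.LUMISERVER'") \
    .orderBy('acqStamp') \
    .show(10, truncate=False)
print(df2.count())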
DataQuery for variables
Builder responsible for querying using variable names.
DataQuery.builder(spark).variables()
.system("CMW") # str / String
.nameEq("var") # str / String
.nameIn(["var"]) # List[str] / List<String>
.nameLike("var_name_pattern%") # str / String, pattern should be in Oracle SQL style (% and _ as wildcards)
.idEq(id) # int / Long
.idIn({id}) # Set[int] / Set<Long>
.timeWindow(start_time, end_time) # Time may be in multiple formats (str "YYYY-MM-DD HH24:MI:SS.SSS", nanos, datetime etc.) - check type in code
.atTime(time) # Only one time method invocation is allowed
.fieldAliases({"alias": ["field1", "field2"]}) # Dict[str, Set[str]] / Map<String, Set<String>> optional - allow to specify aliases
.fieldAliases("alias", "field1") # String, ...String - not accessible in Python
.build()
DataQuery.builder(spark).byVariables()
.system(systemString) # "SYSTEM"
# Obligatory time range block
.atTime(timeUtcString) # "YYYY-MM-DD HH24:MI:SS.SSS"
.startTime(startTimeUtcString) # "YYYY-MM-DD HH24:MI:SS.SSS"
.duration(Duration) # NUMBER
.endTime(endTimeUtcString) # "YYYY-MM-DD HH24:MI:SS.SSS"
# Obligatory variable block
.variable(variableNameString) # "VARIABLE-NAME"
.variableLike(variableNameString) # "VARIABLE-NAME-WITH-WILDCARDS"
.build()
Examples
from nxcals.api.extraction.data.builders import DataQuery
df1 = DataQuery.builder(spark).variables() \
.system('CMW') \
.nameEq('LTB.BCT60:INTENSITY') \
.timeWindow('2018-04-29 00:00:00.000', '2018-04-30 00:00:00.000') \
.build()
df2 = DataQuery.builder(spark).variables() \
.system('CMW') \
.nameIn(['LTB.BCT60:INTENSITY', 'LTB.BCT50:INTENSITY']) \
.timeWindow('2018-04-29 00:00:00.000', '2018-04-30 00:00:00.000') \
.build()
df3 = DataQuery.builder(spark).variables() \
.system('CMW') \
.nameLike('LTB.BCT%:INTENSITY') \
.timeWindow('2018-04-29 00:00:00.000', '2018-04-30 00:00:00.000') \
.build()
df4 = DataQuery.builder(spark).variables() \
.system('CMW') \
.nameLike('LTB.BCT50%:INTENSITY') \
.nameEq('LTB.BCT60:INTENSITY') \
.timeWindow('2018-04-29 00:00:00.000', '2018-04-30 00:00:00.000') \
.build()
df5 = DataQuery.builder(spark).variables() \
.system("CMW") \
.idEq(1005050) \
.idIn({1011562}) \
.timeWindow('2018-04-29 00:00:00.000', '2018-04-30 00:00:00.000') \
.build()
from nxcals.api.extraction.data.builders import *
df1 = DataQuery.builder(spark).byVariables() \
.system('CMW') \
.startTime('2018-04-29 00:00:00.000').endTime('2018-04-30 00:00:00.000') \
.variable('LTB.BCT60:INTENSITY') \
.build()
df2 = DataQuery.builder(spark).byVariables() \
.system('CMW') \
.startTime('2018-04-29 00:00:00.000').endTime('2018-04-30 00:00:00.000') \
.variableLike('LTB.BCT%:INTENSITY') \
.build()
df3 = DataQuery.builder(spark).byVariables() \
.system('CMW') \
.startTime('2018-04-29 00:00:00.000').endTime('2018-04-30 00:00:00.000') \
.variableLike('LTB.BCT50%:INTENSITY') \
.variable('LTB.BCT60:INTENSITY') \
.build()
import cern.nxcals.api.extraction.data.builder.DataQuery;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import com.google.common.collect.Sets;
Dataset<Row> df1 = DataQuery.builder(spark).variables()
.system("CMW")
.nameEq("LTB.BCT60:INTENSITY")
.timeWindow("2018-04-29 00:00:00.000", "2018-04-30 00:00:00.000")
.build();
Dataset<Row> df2 = DataQuery.builder(spark).variables()
.system("CMW")
.nameLike("LTB.BCT%:INTENSITY")
.timeWindow("2018-04-29 00:00:00.000", "2018-04-30 00:00:00.000")
.build();
Dataset<Row> df3 = DataQuery.builder(spark).variables()
.system("CMW")
.nameLike("LTB.BCT%:INTENSITY")
.nameEq("LTB.BCT60:INTENSITY")
.timeWindow("2018-04-29 00:00:00.000", "2018-04-30 00:00:00.000")
.build();
Dataset<Row> df4 = DataQuery.builder(spark).variables()
.system("CMW")
.idEq(1005050)
.idIn(Sets.newHashSet(1011562L))
.timeWindow("2018-04-29 00:00:00.000", "2018-04-30 00:00:00.000")
.build();
import cern.nxcals.api.extraction.data.builder.DataQuery;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> df1 = DataQuery.builder(spark).byVariables()
.system("CMW")
.startTime("2018-04-29 00:00:00.000").endTime("2018-04-30 00:00:00.000")
.variable("LTB.BCT60:INTENSITY")
.build();
Dataset<Row> df2 = DataQuery.builder(spark).byVariables()
.system("CMW")
.startTime("2018-04-29 00:00:00.000").endTime("2018-04-30 00:00:00.000")
.variableLike("LTB.BCT%:INTENSITY")
.build();
Dataset<Row> df3 = DataQuery.builder(spark).byVariables()
.system("CMW")
.startTime("2018-04-29 00:00:00.000").endTime("2018-04-30 00:00:00.000")
.variableLike("LTB.BCT%:INTENSITY")
.variable("LTB.BCT60:INTENSITY")
.build();
import cern.nxcals.api.extraction.data.builders._
import scala.collection.JavaConversions._
val df1 = DataQuery.builder(spark).variables().
system("CMW").
nameEq("LTB.BCT60:INTENSITY").
timeWindow("2018-04-29 00:00:00.000", "2018-04-30 00:00:00.000").
build()
val df2 = DataQuery.builder(spark).variables().
system("CMW").
nameLike("LTB.BCT%:INTENSITY").
timeWindow("2018-04-29 00:00:00.000", "2018-04-30 00:00:00.000").
build()
val df3 = DataQuery.builder(spark).variables().
system("CMW").
nameLike("LTB.BCT%:INTENSITY").
nameEq("LTB.BCT60:INTENSITY").
timeWindow("2018-04-29 00:00:00.000", "2018-04-30 00:00:00.000").
build()
val df4 = DataQuery.builder(spark).variables().
system("CMW").
idEq(1005050).
idIn(setAsJavaSet(Set(Long.box(1011562)))).
timeWindow("2018-04-29 00:00:00.000", "2018-04-30 00:00:00.000").
build()
import cern.nxcals.api.extraction.data.builders._
val df1 = DataQuery.builder(spark).byVariables().
system("CMW").
startTime("2018-04-29 00:00:00.000").endTime("2018-04-30 00:00:00.000").
variable("LTB.BCT60:INTENSITY").
build()
val df2 = DataQuery.builder(spark).byVariables().
system("CMW").
startTime("2018-04-29 00:00:00.000").endTime("2018-04-30 00:00:00.000").
variableLike("LTB.BCT%:INTENSITY").
build()
val df3 = DataQuery.builder(spark).byVariables().
system("CMW").
startTime("2018-04-29 00:00:00.000").endTime("2018-04-30 00:00:00.000").
variableLike("LTB.BCT%:INTENSITY").
variable("LTB.BCT60:INTENSITY").
build()
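Since the extraction result is a regular Spark DataFrame, it can also be brought into the Python analysis ecosystem, for example by converting a reasonably small result to pandas. A minimal sketch, assuming df1 from the Python variables example above (column names such as nxcals_timestamp and nxcals_value are assumptions here; check df1.printSchema() for the actual schema):
# collect a small result to the driver as a pandas DataFrame for local analysis or plotting
pdf = df1.toPandas()
print(pdf.head())
# pdf.sort_values('nxcals_timestamp').plot(x='nxcals_timestamp', y='nxcals_value')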
CMW data query builders
Builder responsible for querying using device/property pairs.
ParameterDataQuery.builder(spark)
.system("CMW") # str / String
.deviceEq("dev") # str / String
.propertyEq("property") # str / String
.deviceLike("dev_pattern%") # str / String - pattern should be in Oracle SQL style (% and _ as wildcards)
.propertyLike("property_pattern%") # str / String - pattern
.parameterEq("device/property") # str / String - parameter in form "device/property"
.parameterIn(["device/property"]) # List[str] / List<String> - list of parameters
.parameterLike("devi_e/proper%") # str / String - pattern should be in Oracle SQL style (% and _ as wildcards)
.timeWindow(start_time, end_time) # Time may be in multiple formats (str "YYYY-MM-DD HH24:MI:SS.SSS", nanos, datetime etc.) - check type in code
.atTime(time) # Only one time method invocation is allowed
.fieldAliases({"alias": ["field1", "field2"]}) # Dict[str, Set[str]] / Map<String, Set<String>> optional - allows specifying aliases
.fieldAliases("alias", "field1") # String, ...String - not accessible in Python
.build()
DevicePropertyDataQuery.builder(spark)
.system(systemString) # "SYSTEM"
# Obligatory time range block
.atTime(timeUtcString) # "YYYY-MM-DD HH24:MI:SS.SSS"
.startTime(startTimeUtcString) # "YYYY-MM-DD HH24:MI:SS.SSS"
.duration(duration) # NUMBER
.endTime(endTimeUtcString) # "YYYY-MM-DD HH24:MI:SS.SSS"
# Optional data context block
.fieldAliases(fieldAliasesMap) # {"FIELD1": ["FIELD-ALIAS1"], "FIELD2": ["FIELD-ALIAS2"]}
# Obligatory entity block which can be repeated
.entity()
.device(deviceString) # "DEVICE-NAME"
.deviceLike(deviceString) # "DEVICE-NAME-WITH-WILDCARDS"
.property(propertyString) # "PROPERTY-NAME"
.propertyLike(propertyString) # "PROPERTY-NAME-WITH-WILDCARDS"
# or, alternatively, identify the device/property pair with a single parameter string
.parameter(parameterString) # "DEVICE-NAME/PROPERTY-NAME"
.parameterLike(parameterString) # "DEVICE-NAME/PROPERTY-NAME-WITH-WILDCARDS"
.entity()
.build()
Examples
from nxcals.api.extraction.data.builders import *
df1 = ParameterDataQuery.builder(spark).system('CMW') \
.parameterEq('RADMON.PS-10/ExpertMonitoringAcquisition') \
.timeWindow('2017-08-29 00:00:00.000', '2017-08-29 00:00:10.000') \
.build()
df2 = ParameterDataQuery.builder(spark).system('CMW') \
.deviceEq('RADMON.PS-1').propertyEq('ExpertMonitoringAcquisition') \
.parameterEq('RADMON.PS-10/ExpertMonitoringAcquisition') \
.timeWindow('2018-04-29 00:00:00.000', '2018-04-30 00:00:00.000') \
.fieldAliases({'CURRENT 18V': {'current_18V', 'voltage_18V'}}) \
.build()
df3 = ParameterDataQuery.builder(spark).system('CMW') \
.parameterLike('RADMON.PS-%/ExpertMonitoringAcquisition') \
.timeWindow('2017-08-29 00:00:00.000', '2017-08-29 00:00:10.000') \
.build()
from nxcals.api.extraction.data.builders import *
df1 = DevicePropertyDataQuery.builder(spark) \
.system('CMW').startTime('2017-08-29 00:00:00.000').duration(10000000000) \
.entity().parameter('RADMON.PS-10/ExpertMonitoringAcquisition') \
.build()
df2 = DevicePropertyDataQuery.builder(spark) \
.system('CMW').startTime('2018-04-29 00:00:00.000').endTime('2018-04-30 00:00:00.000') \
.fieldAliases({'CURRENT 18V': {'current_18V', 'voltage_18V'}}) \
.entity().device('RADMON.PS-1').property('ExpertMonitoringAcquisition') \
.entity().parameter('RADMON.PS-10/ExpertMonitoringAcquisition') \
.build()
df3 = DevicePropertyDataQuery.builder(spark) \
.system('CMW').startTime('2017-08-29 00:00:00.000').duration(10000000000) \
.entity().parameterLike('RADMON.PS-%/ExpertMonitoringAcquisition') \
.build()
import cern.nxcals.api.extraction.data.builder.ParameterDataQuery;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
Dataset<Row> df1 = ParameterDataQuery.builder(spark)
.system("CMW")
.parameterEq("RADMON.PS-10/ExpertMonitoringAcquisition")
.timeWindow("2017-08-29 00:00:00.000", "2017-08-29 00:00:10.000")
.build();
Set<String> fieldAliasesSet = new HashSet<>();
fieldAliasesSet.add("current_18V");
fieldAliasesSet.add("voltage_18V");
Map<String, Set<String>> fieldAliases = new HashMap<>();
fieldAliases.put("CURRENT 18V", fieldAliasesSet);
Dataset<Row> df2 = ParameterDataQuery.builder(spark)
.system("CMW")
.parameterIn("RADMON.PS-1/ExpertMonitoringAcquisition", "RADMON.PS-10/ExpertMonitoringAcquisition")
.timeWindow("2018-04-29 00:00:00.000", "2018-04-30 00:00:00.000")
.fieldAliases(fieldAliases)
.build();
df2.printSchema();
Dataset<Row> df3 = ParameterDataQuery.builder(spark)
.system("CMW")
.parameterLike("RADMON.PS-%/ExpertMonitoringAcquisition")
.timeWindow("2018-04-29 00:00:00.000", "2018-04-30 00:00:00.000")
.build();
import cern.nxcals.api.extraction.data.builder.DevicePropertyDataQuery;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Dataset<Row> df1 = DevicePropertyDataQuery.builder(spark)
.system("CMW")
.startTime("2017-08-29 00:00:00.000").duration(10000000000l)
.entity().parameter("RADMON.PS-10/ExpertMonitoringAcquisition")
.build();
List<String> fieldAliasesList = new ArrayList<>();
fieldAliasesList.add("current_18V");
fieldAliasesList.add("voltage_18V");
Map<String, List<String>> fieldAliases = new HashMap<>();
fieldAliases.put("CURRENT 18V", fieldAliasesList);
Dataset<Row> df2 = DevicePropertyDataQuery.builder(spark)
.system("CMW")
.startTime("2018-04-29 00:00:00.000").endTime("2018-04-30 00:00:00.000")
.fieldAliases(fieldAliases)
.entity().device("RADMON.PS-1").property("ExpertMonitoringAcquisition")
.entity().parameter("RADMON.PS-10/ExpertMonitoringAcquisition")
.build();
df2.printSchema();
Dataset<Row> df3 = DevicePropertyDataQuery.builder(spark)
.system("CMW")
.startTime("2017-08-29 00:00:00.000").duration(10000000000l)
.entity().parameterLike("RADMON.PS-%/ExpertMonitoringAcquisition")
.build();
import cern.nxcals.api.extraction.data.builders._
import scala.collection.JavaConversions._
val df1 = ParameterDataQuery.builder(spark).system("CMW")
parameterEq("RADMON.PS-10/ExpertMonitoringAcquisition").
timeWindow("2017-08-29 00:00:00.000", "2017-08-29 00:00:10.000").
build()
val df2 = ParameterDataQuery.builder(spark).system("CMW").
parameterIn("RADMON.PS-1/ExpertMonitoringAcquisition", "RADMON.PS-10/ExpertMonitoringAcquisition").
timeWindow("2018-04-29 00:00:00.000", "2018-04-30 00:00:00.000").
fieldAliases(mapAsJavaMap(Map("CURRENT 18V" -> setAsJavaSet(Set("current_18V", "voltage_18V"))))).
build()
val df3 = ParameterDataQuery.builder(spark).system("CMW").
parameterLike("RADMON.PS-%/ExpertMonitoringAcquisition").
timeWindow("2017-08-29 00:00:00.000", "2017-08-29 00:00:10.000").
build()
import cern.nxcals.api.extraction.data.builders._
import scala.collection.JavaConversions._
val df1 = DevicePropertyDataQuery.builder(spark).
system("CMW").startTime("2017-08-29 00:00:00.000").duration(10000000000l).
entity().parameter("RADMON.PS-10/ExpertMonitoringAcquisition").
build()
val df2 = DevicePropertyDataQuery.builder(spark).
system("CMW").startTime("2018-04-29 00:00:00.000").endTime("2018-04-30 00:00:00.000").
fieldAliases(mapAsJavaMap(Map("CURRENT 18V" -> seqAsJavaList(Seq("current_18V", "voltage_18V"))))).
entity().device("RADMON.PS-1").property("ExpertMonitoringAcquisition").
entity().parameter("RADMON.PS-10/ExpertMonitoringAcquisition").
build()
val df3 = DevicePropertyDataQuery.builder(spark).
system("CMW").startTime("2017-08-29 00:00:00.000").duration(10000000000l).
entity().parameterLike("RADMON.PS-%/ExpertMonitoringAcquisition").
build()
Alternative to builder static methods
There are additional API methods, built on top of the builder, which can be used to query by entities and variables.
Variables
Syntax description for Python:
DataQuery.getForVariables(
spark=spark, # Spark session
system=systemString, # "SYSTEM"
start_time=startTimeUtcString, # "YYYY-MM-DD HH24:MI:SS.SSS"
end_time=endTimeUtcString, # "YYYY-MM-DD HH24:MI:SS.SSS"
variables=variable_names_list, # ["VARIABLE-NAME1", "VARIABLE-NAME2"] - specify either variables, or variables_like, or both
variables_like=variables_like_list, # ["VARIABLE-NAME1-WITH-WILDCARDS", "VARIABLE-NAME2-WITH-WILDCARDS"]
field_aliases=field_aliases_map # {"FIELD1": ["FIELD-ALIAS1"], "FIELD2": ["FIELD-ALIAS2"]}
)
Simple query for variable:
from nxcals.api.extraction.data.builders import *
df1 = DataQuery.builder(spark).variables() \
.system('CMW') \
.nameIn(['LTB.BCT60:INTENSITY', 'LTB.BCT50:INTENSITY']) \
.timeWindow('2018-04-29 00:00:00.000', '2018-04-30 00:00:00.000') \
.build()
# or using static functions
df1static = DataQuery.getForVariables(spark, system='CMW',
start_time='2018-04-29 00:00:00.000',
end_time='2018-04-30 00:00:00.000',
variables=['LTB.BCT60:INTENSITY',
'LTB.BCT50:INTENSITY'])
import cern.nxcals.api.domain.TimeWindow;
import cern.nxcals.api.extraction.data.builder.DataQuery;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
TimeWindow timeWindow1 = TimeWindow.fromStrings(
"2018-04-29 00:00:00.000", "2018-04-30 00:00:00.000");
Dataset<Row> df1 = DataQuery.builder(spark).variables()
.system("CMW")
.nameEq("LTB.BCT60:INTENSITY")
.timeWindow(timeWindow1)
.build();
// or
Dataset<Row> df1static =
DataQuery.getFor(spark, timeWindow1, "CMW", "LTB.BCT60:INTENSITY");
Query for variables by variable name pattern:
from nxcals.api.extraction.data.builders import *
df2 = DataQuery.builder(spark).variables() \
.system('CMW') \
.nameLike('LTB.BCT%:INTENSITY') \
.timeWindow('2018-04-29 00:00:00.000', '2018-04-30 00:00:00.000') \
.build()
# or using static functions
df2static = DataQuery.getForVariables(spark, system='CMW',
start_time='2018-04-29 00:00:00.000',
end_time='2018-04-30 00:00:00.000',
variables_like=['LTB.BCT%:INTENSITY'])
import cern.nxcals.api.domain.TimeWindow;
import cern.nxcals.api.extraction.data.builder.DataQuery;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import com.google.common.collect.ImmutableList;
import java.util.Collections;
import java.util.List;
TimeWindow timeWindow2 = TimeWindow.fromStrings(
"2018-04-29 00:00:00.000", "2018-04-30 00:00:00.000");
Dataset<Row> df2 = DataQuery.builder(spark).variables()
.system("CMW")
.nameLike("LTB.BCT%:INTENSITY")
.timeWindow(timeWindow2)
.build();
// or
List<String> variablePatterns = ImmutableList.of("LTB.BCT%:INTENSITY");
Dataset<Row> df2static = DataQuery.getFor(spark, timeWindow2, "CMW",
Collections.emptyList(), variablePatterns);
You can also query for variables by multiple names and patterns:
from nxcals.api.extraction.data.builders import *
df3 = DataQuery.builder(spark).variables() \
.system('CMW') \
.nameLike('LTB.BCT%:INTENSITY') \
.nameIn(['LTB.BCT60:INTENSITY', 'LTB.BCT50:INTENSITY']) \
.timeWindow('2018-04-29 00:00:00.000', '2018-04-30 00:00:00.000') \
.build()
# or using static functions
df3static = DataQuery.getForVariables(spark, system='CMW',
start_time='2018-04-29 00:00:00.000',
end_time='2018-04-30 00:00:00.000',
variables=['LTB.BCT60:INTENSITY',
'LTB.BCT50:INTENSITY'],
variables_like=['LTB.BCT%:INTENSITY'])
import cern.nxcals.api.domain.TimeWindow;
import cern.nxcals.api.extraction.data.builder.DataQuery;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import com.google.common.collect.ImmutableList;
import java.util.List;
TimeWindow timeWindow3 = TimeWindow.fromStrings(
"2018-04-29 00:00:00.000", "2018-04-30 00:00:00.000");
Dataset<Row> df3 = DataQuery.builder(spark).variables()
.system("CMW")
.nameLike("LTB.BCT%:INTENSITY")
.nameEq("LTB.BCT60:INTENSITY")
.timeWindow(timeWindow3)
.build();
// or
List<String> variables = ImmutableList.of("LTB.BCT60:INTENSITY");
List<String> variablePatterns = ImmutableList.of("LTB.BCT%:INTENSITY");
Dataset<Row> df3static =
DataQuery.getFor(spark, timeWindow3, "CMW", variables, variablePatterns);
Entities
Syntax description for Python:
DataQuery.getForEntities(
spark=spark, # Spark session
system=systemString, # "SYSTEM"
start_time=startTimeUtcString, # "YYYY-MM-DD HH24:MI:SS.SSS"
end_time=endTimeUtcString, # "YYYY-MM-DD HH24:MI:SS.SSS"
entity_queries=[ # list of EntityQuery
EntityQuery( # Specify either key values
key_values=key_values_map, # {"KEY1": "VALUE1", "KEY2": "VALUE2"}
),
EntityQuery( # or key values like
key_values_like=key_values_like_map # {"KEY1": "VALUE1-WITH-WILDCARDS", "KEY2": "VALUE2-WITH-WILDCARDS"}
),
EntityQuery( # or both
key_values=key_values_map, # {"KEY1": "VALUE1", "KEY2": "VALUE2"}
key_values_like=key_values_like_map # {"KEY1": "VALUE1-WITH-WILDCARDS", "KEY2": "VALUE2-WITH-WILDCARDS"}
)
],
field_aliases=field_aliases_map # {"FIELD1": ["FIELD-ALIAS1"], "FIELD2": ["FIELD-ALIAS2"]}
)
As above, the snippets below show the previous examples and queries rewritten against the new API.
Search for entity by multiple key-values using Map:
from nxcals.api.extraction.data.builders import *
key_values = {'device': 'LHC.LUMISERVER',
'property': 'CrossingAngleIP1'}
df3 = DataQuery.builder(spark).entities().system('CMW') \
.keyValuesEq(key_values) \
.timeWindow('2019-04-29 00:00:00.000', '2019-04-30 00:00:00.000') \
.build()
# or using static functions
df3static = DataQuery.getForEntities(spark, system='CMW',
start_time='2019-04-29 00:00:00.000',
end_time='2019-04-30 00:00:00.000',
entity_queries=[EntityQuery(key_values=key_values)])
import cern.nxcals.api.domain.TimeWindow;
import cern.nxcals.api.extraction.data.builder.DataQuery;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
TimeWindow timeWindow3 = TimeWindow.fromStrings(
"2018-04-29 00:00:00.000", "2018-04-30 00:00:00.000");
Map<String, Object> keyValues = ImmutableMap.of(
"device", "LHC.LUMISERVER",
"property", "CrossingAngleIP1"
);
Dataset<Row> df3 = DataQuery.builder(spark).entities()
.system("CMW")
.keyValuesEq(keyValues)
.timeWindow(timeWindow3)
.build();
//or
Dataset<Row> df3static = DataQuery.getFor(spark, timeWindow3, "CMW", keyValues);
Using patterns in search for entities:
from nxcals.api.extraction.data.builders import *
key_values_like = {'device': 'LHC.LUMISERVER',
'property': 'CrossingAngleIP%'}
df4 = DataQuery.builder(spark).entities().system('CMW') \
.keyValuesLike(key_values_like) \
.timeWindow('2019-04-29 00:00:00.000', '2019-04-30 00:00:00.000') \
.build()
# or using static functions
df4static = DataQuery.getForEntities(spark, system='CMW',
start_time='2019-04-29 00:00:00.000',
end_time='2019-04-30 00:00:00.000',
entity_queries=[
EntityQuery(key_values_like=key_values_like)])
import cern.nxcals.api.domain.EntityQuery;
import cern.nxcals.api.domain.TimeWindow;
import cern.nxcals.api.extraction.data.builder.DataQuery;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.Map;
TimeWindow timeWindow4 = TimeWindow.fromStrings(
"2018-04-29 00:00:00.000", "2018-04-30 00:00:00.000");
Map<String, Object> keyValuesLike = ImmutableMap.of(
"device", "LHC.LUMISERVER",
"property", "CrossingAngleIP%"
);
Dataset<Row> df4 = DataQuery.builder(spark).entities()
.system("CMW")
.keyValuesLike(keyValuesLike)
.timeWindow(timeWindow4)
.build();
// or
Dataset<Row> df4static = DataQuery.getFor(spark, timeWindow4, "CMW",
new EntityQuery(Collections.emptyMap(), keyValuesLike));
It is also possible to run multiple queries for entities, using simple key-values, patterns, or both:
from nxcals.api.extraction.data.builders import *
df5 = DataQuery.builder(spark).entities().system('CMW') \
.keyValuesEq(key_values) \
.keyValuesLike(key_values_like) \
.timeWindow('2019-04-29 00:00:00.000', '2019-04-30 00:00:00.000') \
.build()
# or using static functions
df5static = DataQuery.getForEntities(spark, system='CMW',
start_time='2019-04-29 00:00:00.000',
end_time='2019-04-30 00:00:00.000',
entity_queries=[EntityQuery(key_values=key_values),
EntityQuery(key_values_like=key_values_like)])
import cern.nxcals.api.domain.EntityQuery;
import cern.nxcals.api.domain.TimeWindow;
import cern.nxcals.api.extraction.data.builder.DataQuery;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.util.HashMap;
import java.util.Map;
TimeWindow timeWindow5 = TimeWindow.fromStrings(
"2018-04-29 00:00:00.000", "2018-04-30 00:00:00.000");
Dataset<Row> df5 = DataQuery.builder(spark).entities()
.system("CMW")
.keyValuesLike(keyValuesLike)
.keyValuesEq(keyValues)
.timeWindow(timeWindow5)
.build();
// or
Dataset<Row> df5static = DataQuery.getFor(spark, timeWindow5, "CMW",
new EntityQuery(keyValues),
new EntityQuery(new HashMap<>(), keyValuesLike));
Simple search for entity by key-value (deprecated):
from nxcals.api.extraction.data.builders import *
df1 = DataQuery.builder(spark).byEntities().system('WINCCOA') \
.startTime('2018-06-15 00:00:00.000').endTime('2018-06-17 00:00:00.000') \
.entity().keyValue('variable_name', 'MB.C16L2:U_HDS_3') \
.build()
# or using static functions
df1static = DataQuery.getForEntities(spark, system='WINCCOA',
start_time='2018-06-15 00:00:00.000',
end_time='2018-06-17 00:00:00.000',
entity_queries=[
EntityQuery(key_values={
'variable_name': 'MB.C16L2:U_HDS_3'
})])
import cern.nxcals.api.domain.TimeWindow;
import cern.nxcals.api.extraction.data.builder.DataQuery;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import com.google.common.collect.ImmutableMap;
TimeWindow timeWindow1 = TimeWindow.fromStrings(
"2018-06-15 00:00:00.000", "2018-06-17 00:00:00.000");
Dataset<Row> df1 = DataQuery.builder(spark).byEntities()
.system("WINCCOA")
.startTime(timeWindow1.getStartTime()).endTime(timeWindow1.getEndTime())
.entity().keyValue("variable_name", "MB.C16L2:U_HDS_3")
.build();
// or
Dataset<Row> df1static = DataQuery.getFor(spark, timeWindow1, "WINCCOA",
ImmutableMap.of("variable_name", "MB.C16L2:U_HDS_3"));
Search for entity by multiple key-values (deprecated):
from nxcals.api.extraction.data.builders import *
df2 = DataQuery.builder(spark).byEntities().system('CMW') \
.startTime('2019-04-29 00:00:00.000').endTime('2019-04-30 00:00:00.000') \
.entity() \
.keyValue('device', 'LHC.LUMISERVER') \
.keyValue('property', 'CrossingAngleIP1') \
.build()
# or using static functions
df2static = DataQuery.getForEntities(spark, system='CMW',
start_time='2019-04-29 00:00:00.000',
end_time='2019-04-30 00:00:00.000',
entity_queries=[
EntityQuery(key_values={
'device': 'LHC.LUMISERVER',
'property': 'CrossingAngleIP1'
})])
import cern.nxcals.api.domain.TimeWindow;
import cern.nxcals.api.extraction.data.builder.DataQuery;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import com.google.common.collect.ImmutableMap;
TimeWindow timeWindow2 = TimeWindow.fromStrings(
"2018-04-29 00:00:00.000", "2018-04-30 00:00:00.000");
Dataset<Row> df2 = DataQuery.builder(spark).byEntities()
.system("CMW")
.startTime(timeWindow2.getStartTime()).endTime(timeWindow2.getEndTime())
.entity().keyValue("device", "LHC.LUMISERVER")
.keyValue("property", "CrossingAngleIP1").build();
// or
Dataset<Row> df2static = DataQuery.getFor(spark, timeWindow2, "CMW",
ImmutableMap.of("device", "LHC.LUMISERVER",
"property", "CrossingAngleIP1"));
Other helper methods
Pivot for variables
A common problem when extracting variables is that they can have different types; moreover, separating values belonging to different variables requires additional grouping or filtering. In such cases a pivot on variables might be helpful. In general, it returns a Dataset that contains one column with timestamps ("nxcals_timestamps") and further columns, named after the variable names, holding the values of those variables. It does not require the variables to have the same type, nor to have aligned timestamps. In the latter case, if a variable has no value for a given timestamp, the output row contains a null. Since the operation requires a series of full-outer joins, its performance is in general much lower than that of normal extraction, so we strongly recommend using it wisely, on reasonably small datasets.
Example of usage (please note that this feature is marked as experimental and may be subject to change in the future):
from nxcals.api.extraction.data.builders import *
# Access the Java metadata API through the shared JVM (Py4J)
cern_nxcals_api = spark._jvm.cern.nxcals.api
Variables = cern_nxcals_api.extraction.metadata.queries.Variables
variableService = cern_nxcals_api.extraction.metadata.ServiceClientFactory.createVariableService()
# Find all variables matching the given name pattern
variables = variableService.findAll(Variables.suchThat().variableName().like("LTB.BCT%:INTENSITY"))
# Pivot the selected variables over the given time window
dataset = DataQuery.getAsPivot(spark, '2018-04-29 00:00:00.000', '2018-04-30 00:00:00.000', variables)
import cern.nxcals.api.domain.TimeWindow;
import cern.nxcals.api.domain.Variable;
import cern.nxcals.api.extraction.data.builder.DataQuery;
import cern.nxcals.api.extraction.metadata.ServiceClientFactory;
import cern.nxcals.api.extraction.metadata.VariableService;
import cern.nxcals.api.extraction.metadata.queries.Variables;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.util.Set;
VariableService variableService = ServiceClientFactory.createVariableService();
Set<Variable> variables = variableService.findAll(
Variables.suchThat().variableName().like("LTB.BCT%:INTENSITY"));
TimeWindow timeWindow = TimeWindow.fromStrings("2018-04-29 00:00:00.000", "2018-04-30 00:00:00.000");
Dataset<Row> dataset = DataQuery.getAsPivot(spark, timeWindow, variables);
dataset.printSchema();
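The pivoted result is again an ordinary Spark DataFrame, so misaligned timestamps (which show up as nulls) can be handled with standard Spark operations. A minimal Python sketch, assuming dataset from the Python example above (only the "nxcals_timestamps" column name is given by the API; the remaining columns depend on the selected variables):
# order by the common timestamp column and keep only rows where every variable has a value
pivoted = dataset.orderBy('nxcals_timestamps')
complete_rows = pivoted.na.drop()
complete_rows.show(10, truncate=False)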
Pivot by variable names
Apart from the methods above, there are overloaded versions that also accept variable names together with a system name. Examples:
from nxcals.api.extraction.data.builders import *
dataset = DataQuery.getAsPivot(spark, '2018-04-29 00:00:00.000', '2018-04-30 00:00:00.000',
system="CMW", variable_names=["LTB.BCT50:INTENSITY"])
import cern.nxcals.api.domain.TimeWindow;
import cern.nxcals.api.extraction.data.builder.DataQuery;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.util.List;
TimeWindow timeWindow = TimeWindow.fromStrings("2018-04-29 00:00:00.000", "2018-04-30 00:00:00.000");
Dataset<Row> dataset = DataQuery.getAsPivot(spark, timeWindow, "CMW", List.of("LTB.BCT50:INTENSITY"));
dataset.printSchema();