This step-by-step guide walks you through creating an application that demonstrates the NXCALS Data Extraction API.
What you will build
An application that extracts sample data for two CMW device/property parameters over a given time range from the NXCALS PRO environment. It also demonstrates some very basic Spark operations on the retrieved DataFrames, including a join. Finally, it extracts a hierarchy together with its attached variables.
Create application
Create the main class of the application:
package cern.myproject;

import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Import;

@SpringBootApplication
@Import(cern.nxcals.api.config.SparkContext.class)
public class DataAccessExample {
    static {
        System.setProperty("logging.config", "classpath:log4j2.yml");
        // NXCALS PRO
        System.setProperty("service.url",
                "https://cs-ccr-nxcals5.cern.ch:19093,https://cs-ccr-nxcals6.cern.ch:19093,https://cs-ccr-nxcals7.cern.ch:19093,https://cs-ccr-nxcals8.cern.ch:19093,https://cs-ccr-nxcals5.cern.ch:19094,https://cs-ccr-nxcals6.cern.ch:19094,https://cs-ccr-nxcals7.cern.ch:19094,https://cs-ccr-nxcals8.cern.ch:19094");
    }

    private static final Logger log = LoggerFactory.getLogger(DataAccessExample.class);
    private static final String SYSTEM_NAME = "CMW";

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(DataAccessExample.class, args);
        SparkSession spark = context.getBean(SparkSession.class);
        DataAccessExample dataAccessExample = new DataAccessExample();
    }
}
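The service.url value above is a comma-separated list of endpoints: four hosts, each on two ports. As a minimal sketch, the same string could be assembled from the host and port lists instead of being typed out by hand. The class and method names here (ServiceUrlBuilder, buildServiceUrl) are hypothetical and not part of NXCALS; only the resulting string is taken from the example.

```java
import java.util.ArrayList;
import java.util.List;

class ServiceUrlBuilder {

    /** Joins every host/port combination into one comma-separated list, ports in the outer loop. */
    static String buildServiceUrl(List<String> hosts, List<Integer> ports) {
        List<String> urls = new ArrayList<>();
        for (int port : ports) {
            for (String host : hosts) {
                urls.add("https://" + host + ":" + port);
            }
        }
        return String.join(",", urls);
    }

    /** Hosts and ports copied from the service.url literal in the main class. */
    static String proServiceUrl() {
        List<String> hosts = List.of(
                "cs-ccr-nxcals5.cern.ch",
                "cs-ccr-nxcals6.cern.ch",
                "cs-ccr-nxcals7.cern.ch",
                "cs-ccr-nxcals8.cern.ch");
        return buildServiceUrl(hosts, List.of(19093, 19094));
    }

    public static void main(String[] args) {
        // Same effect as the static initializer of DataAccessExample.
        System.setProperty("service.url", proServiceUrl());
        System.out.println(System.getProperty("service.url"));
    }
}
```

The loop order (ports outside, hosts inside) simply reproduces the order of the original literal; whether the order matters to the client is not stated in this guide.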
Add two methods to the class that extract the data. With the ParameterDataQuery and DataQuery variables() builders they look as follows:
List<Long> getCmwData(SparkSession spark) {
    List<Long> datasetSizes = new ArrayList<>();
    String startTime = "2018-08-01 00:00:00.00";
    String endTime = "2018-08-01 01:00:00.00";
    Dataset<Row> exampleDataset = ParameterDataQuery.builder(spark).system(SYSTEM_NAME)
            .parameterEq("SPSBQMSPSv1/Acquisition")
            .timeWindow(startTime, endTime)
            .build();
    datasetSizes.add(exampleDataset.count());
    Dataset<Row> tgmData = ParameterDataQuery.builder(spark).system(SYSTEM_NAME)
            .parameterEq("SPS.TGM/FULL-TELEGRAM.STRC")
            .timeWindow(startTime, endTime).build();
    datasetSizes.add(tgmData.count());
    return datasetSizes;
}
List<Long> getVariableDataForHierarchy(SparkSession spark, String hierarchyPath) {
    HierarchyService service = ServiceClientFactory.createHierarchyService();
    List<Long> datasetSizes = new ArrayList<>();
    Hierarchy node = service.findOne(Hierarchies.suchThat().path().eq(hierarchyPath))
            .orElseThrow(() -> new IllegalArgumentException("No such hierarchy path " + hierarchyPath));
    String startTime = "2018-06-19 00:00:00.000"; //UTC
    String endTime = "2018-06-19 00:10:00.000"; //UTC
    Set<Variable> variables = service.getVariables(node.getId());
    for (Variable variableData : variables) {
        Dataset<Row> dataset = DataQuery.builder(spark).variables().system("CMW")
                .nameEq(variableData.getVariableName()).timeWindow(startTime, endTime).build();
        long datasetSize = dataset.count();
        datasetSizes.add(datasetSize);
    }
    return datasetSizes;
}
The same two methods written with the DevicePropertyDataQuery and DataQuery byVariables() builders:
List<Long> getCmwData(SparkSession spark) {
    List<Long> datasetSizes = new ArrayList<>();
    String startTime = "2018-08-01 00:00:00.00";
    String endTime = "2018-08-01 01:00:00.00";
    Dataset<Row> exampleDataset = DevicePropertyDataQuery.builder(spark).system(SYSTEM_NAME)
            .startTime(startTime).endTime(endTime).entity()
            .parameter("SPSBQMSPSv1/Acquisition").build();
    datasetSizes.add(exampleDataset.count());
    Dataset<Row> tgmData = DevicePropertyDataQuery.builder(spark).system(SYSTEM_NAME).startTime(startTime)
            .endTime(endTime)
            .entity().parameter("SPS.TGM/FULL-TELEGRAM.STRC").build();
    datasetSizes.add(tgmData.count());
    return datasetSizes;
}
List<Long> getVariableDataForHierarchy(SparkSession spark, String hierarchyPath) {
    HierarchyService service = ServiceClientFactory.createHierarchyService();
    List<Long> datasetSizes = new ArrayList<>();
    Hierarchy node = service.findOne(Hierarchies.suchThat().path().eq(hierarchyPath))
            .orElseThrow(() -> new IllegalArgumentException("No such hierarchy path " + hierarchyPath));
    String startTime = "2018-06-19 00:00:00.000"; //UTC
    String endTime = "2018-06-19 00:10:00.000"; //UTC
    Set<Variable> variables = service.getVariables(node.getId());
    for (Variable variableData : variables) {
        Dataset<Row> dataset = DataQuery.builder(spark).byVariables().system("CMW").startTime(startTime)
                .endTime(endTime).variable(variableData.getVariableName()).build();
        long datasetSize = dataset.count();
        datasetSizes.add(datasetSize);
    }
    return datasetSizes;
}
Enhance the created methods with some Spark data manipulations and logging:
List<Long> getCmwData(SparkSession spark) {
    List<Long> datasetSizes = new ArrayList<>();
    String startTime = "2018-08-01 00:00:00.00";
    String endTime = "2018-08-01 01:00:00.00";
    //This is the meta-data query
    Dataset<Row> exampleDataset = ParameterDataQuery.builder(spark).system(SYSTEM_NAME)
            .parameterEq("SPSBQMSPSv1/Acquisition")
            .timeWindow(startTime, endTime).build();
    log.info("What are the fields available?");
    exampleDataset.printSchema();
    log.info("Some timing data here:");
    exampleDataset.select("cyclestamp", "selector", "bunchIntensityMean").show();
    long nrOfRecordsinExampleDataset = exampleDataset.count();
    log.info("Let's see how many records data were submitted during that time: {} ", nrOfRecordsinExampleDataset);
    datasetSizes.add(nrOfRecordsinExampleDataset);
    log.info("Basic statistics about the intensity (bunchIntensityMean), like count, mean, stddev, min, max:");
    exampleDataset.describe("bunchIntensityMean").show();
    exampleDataset.createOrReplaceTempView("myData");
    log.info(
            "Run an SQL statement to calculate average intensity for TOF user: SELECT avg(bunchIntensityMean) FROM myData WHERE selector == 'SPS.USER.LHC1'");
    spark.sql("select avg(bunchIntensityMean) from myData where selector == 'SPS.USER.LHC1' ").show();
    //TGM data to extract destination
    Dataset<Row> tgmData = ParameterDataQuery.builder(spark).system(SYSTEM_NAME)
            .parameterEq("SPS.TGM/FULL-TELEGRAM.STRC")
            .timeWindow(startTime, endTime).build();
    datasetSizes.add(tgmData.count());
    log.info("What are the fields available?");
    tgmData.printSchema();
    log.info("Printing out data");
    tgmData.select("cyclestamp", "USER", "DEST").show();
    log.info("Join data to show only for a destination == LHC");
    exampleDataset.join(tgmData, "cyclestamp").where("DEST = 'LHC' and bunchIntensityMean is not null")
            .select("cyclestamp", "bunchIntensityMean", "DEST").show();
    return datasetSizes;
}
List<Long> getVariableDataForHierarchy(SparkSession spark, String hierarchyPath) {
    HierarchyService service = ServiceClientFactory.createHierarchyService();
    List<Long> datasetSizes = new ArrayList<>();
    log.info("Getting hierarchy for {}", hierarchyPath);
    Hierarchy node = service.findOne(Hierarchies.suchThat().path().eq(hierarchyPath))
            .orElseThrow(() -> new IllegalArgumentException("No such hierarchy path " + hierarchyPath));
    log.info("Found hierarchy: {}", node.getNodePath());
    String startTime = "2018-06-19 00:00:00.000"; //UTC
    String endTime = "2018-06-19 00:10:00.000"; //UTC
    Set<Variable> variables = service.getVariables(node.getId());
    for (Variable variableData : variables) {
        log.info("Querying for {} variable between {} and {}", variableData.getVariableName(), startTime, endTime);
        Dataset<Row> dataset = DataQuery.builder(spark).variables().system("CMW")
                .nameEq(variableData.getVariableName())
                .timeWindow(startTime, endTime).build();
        long datasetSize = dataset.count();
        datasetSizes.add(datasetSize);
        log.info("Got {} rows for {}", datasetSize, variableData.getVariableName());
        dataset.show();
    }
    return datasetSizes;
}
The same enhanced methods written with the DevicePropertyDataQuery and DataQuery byVariables() builders:
List<Long> getCmwData(SparkSession spark) {
    List<Long> datasetSizes = new ArrayList<>();
    String startTime = "2018-08-01 00:00:00.00";
    String endTime = "2018-08-01 01:00:00.00";
    //This is the meta-data query
    Dataset<Row> exampleDataset = DevicePropertyDataQuery.builder(spark).system(SYSTEM_NAME)
            .startTime(startTime).endTime(endTime).entity()
            .parameter("SPSBQMSPSv1/Acquisition").build();
    log.info("What are the fields available?");
    exampleDataset.printSchema();
    log.info("Some timing data here:");
    exampleDataset.select("cyclestamp", "selector", "bunchIntensityMean").show();
    long nrOfRecordsinExampleDataset = exampleDataset.count();
    log.info("Let's see how many records data were submitted during that time: {} ", nrOfRecordsinExampleDataset);
    datasetSizes.add(nrOfRecordsinExampleDataset);
    log.info("Basic statistics about the intensity (bunchIntensityMean), like count, mean, stddev, min, max:");
    exampleDataset.describe("bunchIntensityMean").show();
    exampleDataset.createOrReplaceTempView("myData");
    log.info(
            "Run an SQL statement to calculate average intensity for TOF user: SELECT avg(bunchIntensityMean) FROM myData WHERE selector == 'SPS.USER.LHC1'");
    spark.sql("select avg(bunchIntensityMean) from myData where selector == 'SPS.USER.LHC1' ").show();
    //TGM data to extract destination
    Dataset<Row> tgmData = DevicePropertyDataQuery.builder(spark).system(SYSTEM_NAME).startTime(startTime)
            .endTime(endTime)
            .entity().parameter("SPS.TGM/FULL-TELEGRAM.STRC").build();
    datasetSizes.add(tgmData.count());
    log.info("What are the fields available?");
    tgmData.printSchema();
    log.info("Printing out data");
    tgmData.select("cyclestamp", "USER", "DEST").show();
    log.info("Join data to show only for a destination == LHC");
    exampleDataset.join(tgmData, "cyclestamp").where("DEST = 'LHC' and bunchIntensityMean is not null")
            .select("cyclestamp", "bunchIntensityMean", "DEST").show();
    return datasetSizes;
}
List<Long> getVariableDataForHierarchy(SparkSession spark, String hierarchyPath) {
    HierarchyService service = ServiceClientFactory.createHierarchyService();
    List<Long> datasetSizes = new ArrayList<>();
    log.info("Getting hierarchy for {}", hierarchyPath);
    Hierarchy node = service.findOne(Hierarchies.suchThat().path().eq(hierarchyPath))
            .orElseThrow(() -> new IllegalArgumentException("No such hierarchy path " + hierarchyPath));
    log.info("Found hierarchy: {}", node.getNodePath());
    String startTime = "2018-06-19 00:00:00.000"; //UTC
    String endTime = "2018-06-19 00:10:00.000"; //UTC
    Set<Variable> variables = service.getVariables(node.getId());
    for (Variable variableData : variables) {
        log.info("Querying for {} variable between {} and {}", variableData.getVariableName(), startTime, endTime);
        Dataset<Row> dataset = DataQuery.builder(spark).byVariables().system("CMW").startTime(startTime)
                .endTime(endTime).variable(variableData.getVariableName()).build();
        long datasetSize = dataset.count();
        datasetSizes.add(datasetSize);
        log.info("Got {} rows for {}", datasetSize, variableData.getVariableName());
        dataset.show();
    }
    return datasetSizes;
}
Then call both methods from the main method:
public static void main(String[] args) {
    ConfigurableApplicationContext context = SpringApplication.run(DataAccessExample.class, args);
    SparkSession spark = context.getBean(SparkSession.class);
    DataAccessExample dataAccessExample = new DataAccessExample();
    log.info("############## Lets get some cmw data ############");
    dataAccessExample.getCmwData(spark);
    log.info("############## Lets get some variable data ############");
    dataAccessExample.getVariableDataForHierarchy(spark, "/EXAMPLE");
    log.info("THE END, Happy Exploring Spark API!");
}
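The time windows in these methods are given as UTC strings, while the cyclestamp and nxcals_timestamp columns in the retrieved data hold 19-digit integers. The sketch below assumes those integers are nanoseconds since the Unix epoch in UTC, which is an inference from the sample values in this guide rather than something it states. The helper class NanoTimestamps is hypothetical and uses only the JDK.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;

class NanoTimestamps {

    // Accepts both "2018-08-01 00:00:00.00" and "2018-06-19 00:00:00.000" (0 to 9 fraction digits).
    private static final DateTimeFormatter WINDOW_FORMAT = new DateTimeFormatterBuilder()
            .appendPattern("yyyy-MM-dd HH:mm:ss")
            .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
            .toFormatter();

    /** Converts a UTC time-window string to nanoseconds since the epoch. */
    static long toEpochNanos(String utcText) {
        Instant instant = LocalDateTime.parse(utcText, WINDOW_FORMAT).toInstant(ZoneOffset.UTC);
        return Math.addExact(
                Math.multiplyExact(instant.getEpochSecond(), 1_000_000_000L),
                instant.getNano());
    }

    /** Converts a nanosecond stamp (assumed epoch-based, UTC) back to an Instant. */
    static Instant toInstant(long epochNanos) {
        return Instant.ofEpochSecond(0L, epochNanos);
    }

    public static void main(String[] args) {
        // Start of the CMW time window used in getCmwData.
        System.out.println(toEpochNanos("2018-08-01 00:00:00.00")); // prints 1533081600000000000
        // A cyclestamp value as it appears in the retrieved data.
        System.out.println(toInstant(1533083631735000000L));        // prints 2018-08-01T00:33:51.735Z
    }
}
```

Under that assumption the sample cyclestamp falls inside the requested one-hour window, which is a quick sanity check when comparing query parameters against returned rows.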
Add Spring YAML configuration file
Spark session properties must be configured in an external file: resources/application.yml
To run Spark locally with as many worker threads as there are logical cores on the machine, the following settings can be used:
spark:
  masterType: local[*]
  java:
    home: /var/nxcals/jdk1.11
  appName: nxcals_code-snippets
  properties:
    mapreduce:
      map.env.JAVA_HOME: ${spark.java.home}
      reduce.env.JAVA_HOME: ${spark.java.home}
    spark:
      ui.port: 4050
      sql.caseSensitive: true
      executorEnv.JAVA_HOME: ${spark.java.home}
      executor:
        instances: 4
        cores: 1
        memory: 1g
        extraClassPath: /usr/hdp/hadoop/lib/native
To run Spark on a YARN cluster in client mode, modify the file and extend it with YARN-specific settings:
nxcals.common.dir: nxcals_test
spark:
  masterType: yarn
  # masterType: local[*]
  deployMode: client
  java:
    home: /var/nxcals/jdk1.11
  appName: nxcals_code-snippets
  version: 3.1.1
  properties:
    yarn:
      app.mapreduce.am.env.JAVA_HOME: ${spark.java.home}
    mapreduce:
      map.env.JAVA_HOME: ${spark.java.home}
      reduce.env.JAVA_HOME: ${spark.java.home}
    spark:
      submit.deployMode: client
      ui.port: 4050
      sql.caseSensitive: true
      executorEnv.JAVA_HOME: ${spark.java.home}
      executor:
        instances: 4
        cores: 1
        memory: 1g
        extraClassPath: /usr/hdp/hadoop/lib/native
        extraLibraryPath: /usr/lib/hadoop/lib/native
      yarn:
        appMasterEnv.JAVA_HOME: ${spark.java.home}
        jars: hdfs:///project/nxcals/lib/spark-${spark.version}/*.jar,hdfs:///project/nxcals/nxcals_lib/${nxcals.common.dir}/*.jar
        am.extraClassPath: /usr/hdp/hadoop/lib/native
        am.extraLibraryPath: /usr/lib/hadoop/lib/native
        access.hadoopFileSystems: nxcals
        historyServer.address: ithdp1001.cern.ch:18080
Build application
Prepare a gradle.properties file with the following content:
nxcalsVersion=1.5.24
gsonVersion=2.9.1
springBootVersion=2.7.16
log4jVersion=2.21.0
fasterxmlJacksonVersion=2.13.5
Important
Make sure that you have the latest production version of NXCALS!
Prepare a "minimal" version of the build.gradle file and place it in the main directory of your application.
apply plugin:'application'
mainClassName = "cern.myproject.DataAccessExample"
configurations.all {
    resolutionStrategy {
        force 'com.google.code.gson:gson:' + gsonVersion
        eachDependency { DependencyResolveDetails details ->
            if (details.requested.group.startsWith('com.fasterxml.jackson')) {
                details.useVersion fasterxmlJacksonVersion
            }
        }
    }
    // Depending on your gradle version you might have to do those exclusions (prior to gradle 5.x it did not honor the exclusions from POMs).
    exclude group: "org.springframework.boot", module: "spring-boot-starter-logging"
    exclude group: "org.slf4j", module: "slf4j-log4j12"
    exclude group: "log4j", module: "log4j"
    exclude group: "log4j", module: "apache-log4j-extras"
    exclude group: "ch.qos.logback", module: "logback-classic"
}
dependencies {
    compile group: 'cern.nxcals', name: 'nxcals-extraction-starter', version: nxcalsVersion
    compile group: 'cern.nxcals', name: 'nxcals-hadoop-pro-config', version: nxcalsVersion
    compile group: 'org.springframework.boot', name: 'spring-boot-starter', version: springBootVersion
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-validation', version: springBootVersion
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-log4j2', version: springBootVersion
    compile group: 'org.apache.logging.log4j', name: 'log4j-1.2-api', version: log4jVersion
    compile group: 'org.apache.logging.log4j', name: 'log4j-slf4j-impl', version: log4jVersion
    //Required for Yaml in Log4j2
    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: fasterxmlJacksonVersion
    compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: fasterxmlJacksonVersion
}
Important
Note the dependencies required for data extraction:
compile group: 'cern.nxcals', name: 'nxcals-extraction-starter'
compile group: 'cern.nxcals', name: 'nxcals-hadoop-pro-config'
The Spring Boot dependencies are included because the demo application is built on that stack; they are optional for data extraction itself.
Build and run the application:
../gradlew build run
Expected application output:
> Task :extraction-api-examples:run
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.1.3.RELEASE)
2019-11-21 17:06:15.499 [INFO ] [main] DataAccessExample - Starting DataAccessExample on cs-ccr-dev3.cern.ch with PID 21027 (/opt/psowinsk/nxcals-examples/extraction-api-examples/build/classes/java/main started by psowinsk in /opt/psowinsk/nxcals-examples/extraction-api-examples)
2019-11-21 17:06:15.512 [INFO ] [main] DataAccessExample - No active profile set, falling back to default profiles: default
2019-11-21 17:06:17.090 [WARN ] [main] NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-11-21 17:06:18.787 [INFO ] [main] DataAccessExample - Started DataAccessExample in 3.837 seconds (JVM running for 5.184)
2019-11-21 17:06:18.791 [INFO ] [main] DataAccessExample - ############## Lets get some cmw data ############
2019-11-21 17:06:19.301 [WARN ] [main] URLConfigurationSource - No URLs will be polled as dynamic configuration sources.
2019-11-21 17:06:20.957 [WARN ] [main] DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2019-11-21 17:06:25.511 [INFO ] [main] DataAccessExample - What are the fields available?
root
|-- __record_timestamp__: long (nullable = true)
|-- __record_version__: long (nullable = true)
|-- acqStamp: long (nullable = true)
|-- beamOk: boolean (nullable = true)
|-- bqmOutputEnable: boolean (nullable = true)
|-- bunchIntensities: struct (nullable = true)
| |-- elements: array (nullable = true)
| | |-- element: float (containsNull = true)
| |-- dimensions: array (nullable = true)
| | |-- element: integer (containsNull = true)
|-- bunchIntensityMax: float (nullable = true)
|-- bunchIntensityMaxThr: float (nullable = true)
|-- bunchIntensityMean: float (nullable = true)
|-- bunchIntensityMin: float (nullable = true)
|-- bunchIntensityMinThr: float (nullable = true)
|-- bunchIntensityModIndex: float (nullable = true)
|-- bunchIntensityModIndexOk: boolean (nullable = true)
|-- bunchIntensityModIndexThr: float (nullable = true)
|-- bunchIntensityOk: boolean (nullable = true)
|-- bunchIntensityStd: float (nullable = true)
|-- bunchLengthMax: float (nullable = true)
|-- bunchLengthMaxThr: float (nullable = true)
|-- bunchLengthMean: float (nullable = true)
|-- bunchLengthMin: float (nullable = true)
|-- bunchLengthMinThr: float (nullable = true)
|-- bunchLengthOk: boolean (nullable = true)
|-- bunchLengthStd: float (nullable = true)
|-- bunchLengthStdOk: boolean (nullable = true)
|-- bunchLengthStdThr: float (nullable = true)
|-- bunchLengths: struct (nullable = true)
| |-- elements: array (nullable = true)
| | |-- element: float (containsNull = true)
| |-- dimensions: array (nullable = true)
| | |-- element: integer (containsNull = true)
|-- bunchPeakMax: float (nullable = true)
|-- bunchPeakMaxThr: float (nullable = true)
|-- bunchPeakMean: float (nullable = true)
|-- bunchPeakMin: float (nullable = true)
|-- bunchPeakMinThr: float (nullable = true)
|-- bunchPeakOk: boolean (nullable = true)
|-- bunchPeakStd: float (nullable = true)
|-- bunchPeakStdOk: boolean (nullable = true)
|-- bunchPeakStdThr: float (nullable = true)
|-- bunchPeaks: struct (nullable = true)
| |-- elements: array (nullable = true)
| | |-- element: float (containsNull = true)
| |-- dimensions: array (nullable = true)
| | |-- element: integer (containsNull = true)
|-- bunchPositions: struct (nullable = true)
| |-- elements: array (nullable = true)
| | |-- element: float (containsNull = true)
| |-- dimensions: array (nullable = true)
| | |-- element: integer (containsNull = true)
|-- class: string (nullable = true)
|-- cyclestamp: long (nullable = true)
|-- device: string (nullable = true)
|-- diagnostic: integer (nullable = true)
|-- doubletIntensitySplitOk: boolean (nullable = true)
|-- doublets: boolean (nullable = true)
|-- filledBuckets: struct (nullable = true)
| |-- elements: array (nullable = true)
| | |-- element: integer (containsNull = true)
| |-- dimensions: array (nullable = true)
| | |-- element: integer (containsNull = true)
|-- firstBunchPositionOk: boolean (nullable = true)
|-- isTrailingDoublet: struct (nullable = true)
| |-- elements: array (nullable = true)
| | |-- element: boolean (containsNull = true)
| |-- dimensions: array (nullable = true)
| | |-- element: integer (containsNull = true)
|-- nAllowedLongBunches: integer (nullable = true)
|-- nBunches: integer (nullable = true)
|-- nBunchesMax: integer (nullable = true)
|-- nBunchesMaxOk: boolean (nullable = true)
|-- nBunchesRamp: integer (nullable = true)
|-- nSatellitesIntensity: integer (nullable = true)
|-- nSatellitesMidBucket: integer (nullable = true)
|-- nTooLongBunches: integer (nullable = true)
|-- nWarningMessages: integer (nullable = true)
|-- oscillationPercentDeltaMax: float (nullable = true)
|-- oscillationPercentDeltaMaxThr: float (nullable = true)
|-- oscillationPercentDeltas: struct (nullable = true)
| |-- elements: array (nullable = true)
| | |-- element: float (containsNull = true)
| |-- dimensions: array (nullable = true)
| | |-- element: integer (containsNull = true)
|-- oscillationPercentPeakMax: float (nullable = true)
|-- oscillationPercentPeakMaxThr: float (nullable = true)
|-- patternOk: boolean (nullable = true)
|-- property: string (nullable = true)
|-- satelliteBucketsIntensity: struct (nullable = true)
| |-- elements: array (nullable = true)
| | |-- element: float (containsNull = true)
| |-- dimensions: array (nullable = true)
| | |-- element: integer (containsNull = true)
|-- satelliteBucketsMid: struct (nullable = true)
| |-- elements: array (nullable = true)
| | |-- element: float (containsNull = true)
| |-- dimensions: array (nullable = true)
| | |-- element: integer (containsNull = true)
|-- satellitesIntensityThr: float (nullable = true)
|-- satellitesMidBucketThr: float (nullable = true)
|-- satellitesOk: boolean (nullable = true)
|-- selector: string (nullable = true)
|-- stabilityOk: boolean (nullable = true)
|-- tgmNLSQCTL: boolean (nullable = true)
|-- tgmSCNUM: integer (nullable = true)
|-- verifyPattern: boolean (nullable = true)
|-- warningMessages: struct (nullable = true)
| |-- elements: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- dimensions: array (nullable = true)
| | |-- element: integer (containsNull = true)
|-- wrongBuckets: struct (nullable = true)
| |-- elements: array (nullable = true)
| | |-- element: integer (containsNull = true)
| |-- dimensions: array (nullable = true)
| | |-- element: integer (containsNull = true)
|-- nxcals_entity_id: long (nullable = true)
2019-11-21 17:06:25.518 [INFO ] [main] DataAccessExample - Some timing data here:
2019-11-21 17:06:25.995 [WARN ] [main] Utils - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
+-------------------+-----------------+------------------+
| cyclestamp| selector|bunchIntensityMean|
+-------------------+-----------------+------------------+
|1533083631735000000| SPS.USER.LHC1| null|
|1533084078135000000|SPS.USER.LHCPILOT| 2077.4004|
|1533083185335000000| SPS.USER.LHC1| 2458.5662|
|1533085008135000000|SPS.USER.LHCPILOT| 0.0|
|1533082999335000000| SPS.USER.LHC1| 2480.7834|
|1533083148135000000| SPS.USER.LHC1| null|
|1533084784935000000|SPS.USER.LHCPILOT| 0.0|
|1533084040935000000|SPS.USER.LHCPILOT| null|
|1533084338535000000|SPS.USER.LHCPILOT| 0.0|
|1533083668935000000| SPS.USER.LHC1| 2489.4307|
|1533083966535000000|SPS.USER.LHCPILOT| null|
|1533083408535000000| SPS.USER.LHC1| null|
|1533084115335000000|SPS.USER.LHCPILOT| null|
|1533083445735000000| SPS.USER.LHC1| 2529.9453|
|1533083371335000000| SPS.USER.LHC1| null|
|1533085082535000000|SPS.USER.LHCPILOT| 0.0|
|1533084822135000000|SPS.USER.LHCPILOT| 0.0|
|1533085156935000000| SPS.USER.LHC1| 2570.6726|
|1533083259735000000| SPS.USER.LHC1| 2504.587|
|1533084970935000000|SPS.USER.LHCPILOT| 0.0|
+-------------------+-----------------+------------------+
only showing top 20 rows
2019-11-21 17:06:27.738 [INFO ] [main] DataAccessExample - Let's see how many records data were submitted during that time: 61
2019-11-21 17:06:27.738 [INFO ] [main] DataAccessExample - Basic statistics about the intensity (bunchIntensityMean), like count, mean, stddev, min, max:
+-------+------------------+
|summary|bunchIntensityMean|
+-------+------------------+
| count| 30|
| mean| 1605.936922200521|
| stddev|1162.7277248622167|
| min| 0.0|
| max| 2608.4077|
+-------+------------------+
2019-11-21 17:06:28.240 [INFO ] [main] DataAccessExample - Run an SQL statement to calculate average intensity for TOF user: SELECT avg(bunchIntensityMean) FROM myData WHERE selector == 'SPS.USER.LHC1'
+-----------------------+
|avg(bunchIntensityMean)|
+-----------------------+
| 2338.4426432291666|
+-----------------------+
2019-11-21 17:06:29.156 [INFO ] [main] DataAccessExample - What are the fields available?
root
|-- BEAMID: integer (nullable = true)
|-- BPNCY: integer (nullable = true)
|-- BPNM: integer (nullable = true)
|-- COMLN: struct (nullable = true)
| |-- elements: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- dimensions: array (nullable = true)
| | |-- element: integer (containsNull = true)
|-- CYCLE: integer (nullable = true)
|-- CYTAG: integer (nullable = true)
|-- DDEST: string (nullable = true)
|-- DEST: string (nullable = true)
|-- DURN: integer (nullable = true)
|-- FTOP_ENG: integer (nullable = true)
|-- INJ_ENG: integer (nullable = true)
|-- LHCSEQE: struct (nullable = true)
| |-- elements: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- dimensions: array (nullable = true)
| | |-- element: integer (containsNull = true)
|-- LHCSEQR: struct (nullable = true)
| |-- elements: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- dimensions: array (nullable = true)
| | |-- element: integer (containsNull = true)
|-- MISC: struct (nullable = true)
| |-- elements: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- dimensions: array (nullable = true)
| | |-- element: integer (containsNull = true)
|-- MISC_A: struct (nullable = true)
| |-- elements: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- dimensions: array (nullable = true)
| | |-- element: integer (containsNull = true)
|-- MMODE: string (nullable = true)
|-- NBINJ: integer (nullable = true)
|-- NCYTAG: integer (nullable = true)
|-- NDURN: integer (nullable = true)
|-- NUSER: string (nullable = true)
|-- PARTY: string (nullable = true)
|-- SCNUM: integer (nullable = true)
|-- SCTAG: integer (nullable = true)
|-- SPCON: struct (nullable = true)
| |-- elements: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- dimensions: array (nullable = true)
| | |-- element: integer (containsNull = true)
|-- USER: string (nullable = true)
|-- __LSA_CYCLE__: string (nullable = true)
|-- __record_timestamp__: long (nullable = true)
|-- __record_version__: long (nullable = true)
|-- acqStamp: long (nullable = true)
|-- class: string (nullable = true)
|-- cyclestamp: long (nullable = true)
|-- device: string (nullable = true)
|-- property: string (nullable = true)
|-- selector: string (nullable = true)
|-- nxcals_entity_id: long (nullable = true)
2019-11-21 17:06:29.157 [INFO ] [main] DataAccessExample - Printing out data
+-------------------+--------+--------+
| cyclestamp| USER| DEST|
+-------------------+--------+--------+
|1533082468935000000| MD1|SPS_DUMP|
|1533083874135000000| MD1|SPS_DUMP|
|1533083296935000000| LHC1| LHC|
|1533084882135000000| SFTPRO2| FTARGET|
|1533081925335000000| SFTPRO2| FTARGET|
|1533082054935000000| SFTPRO2| FTARGET|
|1533083702535000000| MD1|SPS_DUMP|
|1533083946135000000| ZERO|SPS_DUMP|
|1533084148935000000| MD1|SPS_DUMP|
|1533084318135000000| ZERO|SPS_DUMP|
|1533084576135000000| ZERO|SPS_DUMP|
|1533084912135000000| ZERO|SPS_DUMP|
|1533081633735000000| MD1|SPS_DUMP|
|1533082040535000000| SFTPRO2| FTARGET|
|1533082699335000000| MD1|SPS_DUMP|
|1533082281735000000| MD1|SPS_DUMP|
|1533082371735000000| SFTPRO2| FTARGET|
|1533082429335000000| SFTPRO2| FTARGET|
|1533082886535000000| MD1|SPS_DUMP|
|1533083854935000000|LHCPILOT| LHC|
+-------------------+--------+--------+
only showing top 20 rows
2019-11-21 17:06:29.450 [INFO ] [main] DataAccessExample - Join data to show only for a destination == LHC
+-------------------+------------------+----+
| cyclestamp|bunchIntensityMean|DEST|
+-------------------+------------------+----+
|1533083296935000000| 2506.4263| LHC|
|1533083854935000000| 2251.466| LHC|
|1533083259735000000| 2504.587| LHC|
|1533085082535000000| 0.0| LHC|
|1533083185335000000| 2458.5662| LHC|
|1533083110935000000| 2608.4077| LHC|
|1533084412935000000| 2210.8672| LHC|
|1533084375735000000| 2328.534| LHC|
|1533085194135000000| 0.0| LHC|
|1533083445735000000| 2529.9453| LHC|
|1533085008135000000| 0.0| LHC|
|1533082999335000000| 2480.7834| LHC|
|1533085156935000000| 2570.6726| LHC|
|1533083817735000000| 2044.6013| LHC|
|1533083222535000000| 2477.3943| LHC|
|1533084822135000000| 0.0| LHC|
|1533083073735000000| 2493.0784| LHC|
|1533085119735000000| 2529.7126| LHC|
|1533084078135000000| 2077.4004| LHC|
|1533084338535000000| 0.0| LHC|
+-------------------+------------------+----+
only showing top 20 rows
2019-11-21 17:06:30.111 [INFO ] [main] DataAccessExample - ############## Lets get some variable data ############
2019-11-21 17:06:30.112 [INFO ] [main] DataAccessExample - Getting hierarchy for /EXAMPLE
2019-11-21 17:06:30.253 [INFO ] [main] DataAccessExample - Found hierarchy: /EXAMPLE
2019-11-21 17:06:30.253 [INFO ] [main] DataAccessExample - Querying for PR.DCAFTINJ_1:INTENSITY variable between 2018-06-19 00:00:00.000 and 2018-06-19 00:10:00.000
2019-11-21 17:06:30.870 [INFO ] [main] DataAccessExample - Got 477 rows for PR.DCAFTINJ_1:INTENSITY
+------------+----------------+-------------------+--------------------+
|nxcals_value|nxcals_entity_id| nxcals_timestamp|nxcals_variable_name|
+------------+----------------+-------------------+--------------------+
| 0.0| 55414|1529366833900000000|PR.DCAFTINJ_1:INT...|
| 0.0| 55414|1529366932300000000|PR.DCAFTINJ_1:INT...|
| -0.07446789| 55414|1529366416300000000|PR.DCAFTINJ_1:INT...|
| 0.0| 55414|1529366583100000000|PR.DCAFTINJ_1:INT...|
| 8.697608| 55414|1529366611900000000|PR.DCAFTINJ_1:INT...|
| 0.0| 55414|1529366615500000000|PR.DCAFTINJ_1:INT...|
| 0.0| 55414|1529366677900000000|PR.DCAFTINJ_1:INT...|
| 0.0| 55414|1529366980300000000|PR.DCAFTINJ_1:INT...|
| 0.0| 55414|1529366437900000000|PR.DCAFTINJ_1:INT...|
| 0.0| 55414|1529366907100000000|PR.DCAFTINJ_1:INT...|
| 0.0| 55414|1529366959900000000|PR.DCAFTINJ_1:INT...|
| 0.0| 55414|1529366878300000000|PR.DCAFTINJ_1:INT...|
| 0.0| 55414|1529366578300000000|PR.DCAFTINJ_1:INT...|
| 0.0| 55414|1529366773900000000|PR.DCAFTINJ_1:INT...|
| 0.0| 55414|1529366526700000000|PR.DCAFTINJ_1:INT...|
| 0.0| 55414|1529366674300000000|PR.DCAFTINJ_1:INT...|
| 0.0| 55414|1529366736700000000|PR.DCAFTINJ_1:INT...|
| 0.0| 55414|1529366859100000000|PR.DCAFTINJ_1:INT...|
| 0.0| 55414|1529366537500000000|PR.DCAFTINJ_1:INT...|
| 0.0| 55414|1529366728300000000|PR.DCAFTINJ_1:INT...|
+------------+----------------+-------------------+--------------------+
only showing top 20 rows
2019-11-21 17:06:31.205 [INFO ] [main] DataAccessExample - THE END, Happy Exploring Spark API!
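The join step in getCmwData combines the two datasets on cyclestamp and keeps only rows with DEST = 'LHC' and a non-null bunchIntensityMean. As a minimal sketch of that selection logic, the same filter can be written in plain Java over two maps. This is an illustration of the semantics only, not how Spark executes the join, and it assumes at most one row per cyclestamp in each dataset. The class CyclestampJoinSketch is hypothetical; rows marked "hypothetical" in the comments are invented to exercise each filter, the others are copied from the output above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class CyclestampJoinSketch {

    /**
     * Inner join on cyclestamp, then keep rows with the wanted DEST and a non-null intensity.
     * Returns cyclestamp -> bunchIntensityMean for the surviving rows.
     */
    static Map<Long, Float> joinForDestination(Map<Long, Float> intensityByCycle,
                                               Map<Long, String> destByCycle,
                                               String wantedDest) {
        Map<Long, Float> joined = new LinkedHashMap<>();
        for (Map.Entry<Long, Float> row : intensityByCycle.entrySet()) {
            String dest = destByCycle.get(row.getKey()); // null: no telegram row for this cycle
            Float mean = row.getValue();                 // null models an SQL NULL
            if (wantedDest.equals(dest) && mean != null) {
                joined.put(row.getKey(), mean);
            }
        }
        return joined;
    }

    static Map<Long, Float> demo() {
        Map<Long, Float> intensity = new LinkedHashMap<>();
        intensity.put(1533083296935000000L, 2506.4263f); // from the join output
        intensity.put(1533083854935000000L, 2251.466f);  // from the join output
        intensity.put(1533083631735000000L, null);       // null intensity, from the first table
        intensity.put(1L, 100.0f);                       // hypothetical row for a non-LHC cycle

        Map<Long, String> dest = new LinkedHashMap<>();
        dest.put(1533083296935000000L, "LHC");      // from the telegram table
        dest.put(1533083854935000000L, "LHC");      // from the telegram table
        dest.put(1533083631735000000L, "LHC");      // hypothetical: DEST for this cycle is not shown
        dest.put(1533082468935000000L, "SPS_DUMP"); // telegram row without an intensity row here
        dest.put(1L, "FTARGET");                    // hypothetical

        return joinForDestination(intensity, dest, "LHC");
    }

    public static void main(String[] args) {
        // Only the two cycles with DEST = LHC and a non-null intensity remain.
        demo().forEach((cycle, mean) -> System.out.println(cycle + " " + mean));
    }
}
```

In the sample, one row is dropped for its null intensity, one for its destination, and the SPS_DUMP telegram row never appears because it has no matching intensity row, which is the inner-join behaviour.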
Summary
You have successfully retrieved data for two CMW device/property parameters and for the variables attached to a hierarchy.
See also
Other pages related to the presented example: