Skip to content

This step-by-step guide walks you through the process of creating an application that demonstrates the usage of the NXCALS Data Extraction API.

What you will build

An application that extracts sample data for two CMW device/property parameters over a given time range from the NXCALS PRO environment. It also demonstrates some very basic Spark operations on the retrieved DataFrames, including a join. Finally, it extracts a variable hierarchy together with its attached variables.

Create application

Create the main class of the application:

/**
 * Entry point of the NXCALS data-extraction example.
 *
 * <p>Before Spring starts, the static initializer sets two system properties: the location of
 * the logging configuration and the NXCALS PRO service URLs. Static initializers run in textual
 * order, so these properties are set before the {@code log} field below is initialized; keep
 * that order.
 */
@SpringBootApplication
@Import(cern.nxcals.api.config.SparkContext.class)
public class DataAccessExample {

    static {
        configureSystemProperties();
    }

    private static final Logger log = LoggerFactory.getLogger(DataAccessExample.class);
    private static final String SYSTEM_NAME = "CMW";

    /** Sets the logging-configuration location and the NXCALS PRO service endpoints. */
    private static void configureSystemProperties() {
        System.setProperty("logging.config", "classpath:log4j2.yml");

        // NXCALS PRO
        System.setProperty("service.url",
                "https://cs-ccr-nxcals5.cern.ch:19093,https://cs-ccr-nxcals6.cern.ch:19093,https://cs-ccr-nxcals7.cern.ch:19093,https://cs-ccr-nxcals8.cern.ch:19093,https://cs-ccr-nxcals5.cern.ch:19094,https://cs-ccr-nxcals6.cern.ch:19094,https://cs-ccr-nxcals7.cern.ch:19094,https://cs-ccr-nxcals8.cern.ch:19094");
    }

    /**
     * Starts the Spring context and obtains the Spark session bean.
     *
     * <p>{@code spark} and {@code dataAccessExample} are not used yet; the following steps of
     * the guide call the extraction methods with them.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(DataAccessExample.class, args);
        SparkSession spark = context.getBean(SparkSession.class);

        DataAccessExample dataAccessExample = new DataAccessExample();
    }
}
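Add to the main class the methods that retrieve the CMW signals (returning the dataset sizes) and the variables attached to a hierarchy. The same two methods are shown twice: first built with ParameterDataQuery / DataQuery.builder(spark).variables(), then with DevicePropertyDataQuery / DataQuery.builder(spark).byVariables(). Use only one variant, since both define methods with identical signatures: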

/**
 * Counts the records of two CMW parameters over one hour (2018-08-01 00:00 to 01:00).
 *
 * @param spark the Spark session used to run the queries
 * @return the record counts of SPSBQMSPSv1/Acquisition and SPS.TGM/FULL-TELEGRAM.STRC,
 *         in that order
 */
List<Long> getCmwData(SparkSession spark) {
    String startTime = "2018-08-01 00:00:00.00";
    String endTime = "2018-08-01 01:00:00.00";

    List<Long> datasetSizes = new ArrayList<>();
    // Each parameter is queried and counted in turn, in the order listed here.
    for (String parameter : List.of("SPSBQMSPSv1/Acquisition", "SPS.TGM/FULL-TELEGRAM.STRC")) {
        Dataset<Row> parameterData = ParameterDataQuery.builder(spark)
                .system(SYSTEM_NAME)
                .parameterEq(parameter)
                .timeWindow(startTime, endTime)
                .build();
        datasetSizes.add(parameterData.count());
    }
    return datasetSizes;
}

/**
 * Counts ten minutes (UTC) of data for every variable attached to a hierarchy node.
 *
 * @param spark         the Spark session used to run the queries
 * @param hierarchyPath path of the hierarchy node to look up
 * @return one record count per attached variable, in the iteration order of the variable set
 * @throws IllegalArgumentException if no hierarchy exists at {@code hierarchyPath}
 */
List<Long> getVariableDataForHierarchy(SparkSession spark, String hierarchyPath) {
    HierarchyService hierarchyService = ServiceClientFactory.createHierarchyService();
    List<Long> datasetSizes = new ArrayList<>();

    Hierarchy hierarchy = hierarchyService.findOne(Hierarchies.suchThat().path().eq(hierarchyPath))
            .orElseThrow(() -> new IllegalArgumentException("No such hierarchy path " + hierarchyPath));

    // Query window, expressed in UTC.
    String startTime = "2018-06-19 00:00:00.000";
    String endTime = "2018-06-19 00:10:00.000";

    for (Variable variable : hierarchyService.getVariables(hierarchy.getId())) {
        Dataset<Row> variableData = DataQuery.builder(spark).variables().system(SYSTEM_NAME)
                .nameEq(variable.getVariableName())
                .timeWindow(startTime, endTime)
                .build();
        datasetSizes.add(variableData.count());
    }

    return datasetSizes;
}
/**
 * Counts the records of two CMW parameters over one hour (2018-08-01 00:00 to 01:00),
 * using the {@code DevicePropertyDataQuery} builder.
 *
 * @param spark the Spark session used to run the queries
 * @return the record counts of SPSBQMSPSv1/Acquisition and SPS.TGM/FULL-TELEGRAM.STRC,
 *         in that order
 */
List<Long> getCmwData(SparkSession spark) {
    String startTime = "2018-08-01 00:00:00.00";
    String endTime = "2018-08-01 01:00:00.00";

    List<Long> datasetSizes = new ArrayList<>();
    // Each parameter is queried and counted in turn, in the order listed here.
    for (String parameter : List.of("SPSBQMSPSv1/Acquisition", "SPS.TGM/FULL-TELEGRAM.STRC")) {
        Dataset<Row> parameterData = DevicePropertyDataQuery.builder(spark)
                .system(SYSTEM_NAME)
                .startTime(startTime)
                .endTime(endTime)
                .entity()
                .parameter(parameter)
                .build();
        datasetSizes.add(parameterData.count());
    }
    return datasetSizes;
}

/**
 * Counts ten minutes (UTC) of data for every variable attached to a hierarchy node,
 * using the {@code byVariables()} query builder.
 *
 * @param spark         the Spark session used to run the queries
 * @param hierarchyPath path of the hierarchy node to look up
 * @return one record count per attached variable, in the iteration order of the variable set
 * @throws IllegalArgumentException if no hierarchy exists at {@code hierarchyPath}
 */
List<Long> getVariableDataForHierarchy(SparkSession spark, String hierarchyPath) {
    HierarchyService hierarchyService = ServiceClientFactory.createHierarchyService();
    List<Long> datasetSizes = new ArrayList<>();

    Hierarchy hierarchy = hierarchyService.findOne(Hierarchies.suchThat().path().eq(hierarchyPath))
            .orElseThrow(() -> new IllegalArgumentException("No such hierarchy path " + hierarchyPath));

    // Query window, expressed in UTC.
    String startTime = "2018-06-19 00:00:00.000";
    String endTime = "2018-06-19 00:10:00.000";

    for (Variable variable : hierarchyService.getVariables(hierarchy.getId())) {
        Dataset<Row> variableData = DataQuery.builder(spark).byVariables().system(SYSTEM_NAME)
                .startTime(startTime)
                .endTime(endTime)
                .variable(variable.getVariableName())
                .build();
        datasetSizes.add(variableData.count());
    }

    return datasetSizes;
}

"Enhance" the created methods with some Spark data manipulations and logging (again shown for both query-builder variants):

/**
 * Extracts one hour of data (2018-08-01 00:00 to 01:00) for two CMW parameters, prints some
 * exploratory information about them and joins them on {@code cyclestamp}.
 *
 * @param spark the Spark session used to run the queries
 * @return the record counts of SPSBQMSPSv1/Acquisition and SPS.TGM/FULL-TELEGRAM.STRC,
 *         in that order
 */
List<Long> getCmwData(SparkSession spark) {

    List<Long> datasetSizes = new ArrayList<>();

    String startTime = "2018-08-01 00:00:00.00";
    String endTime = "2018-08-01 01:00:00.00";

    // Data query for the BQM acquisition parameter
    Dataset<Row> exampleDataset = ParameterDataQuery.builder(spark).system(SYSTEM_NAME)
            .parameterEq("SPSBQMSPSv1/Acquisition")
            .timeWindow(startTime, endTime).build();

    log.info("What are the fields available?");
    exampleDataset.printSchema();

    log.info("Some timing data here:");
    exampleDataset.select("cyclestamp", "selector", "bunchIntensityMean").show();

    long nrOfRecordsInExampleDataset = exampleDataset.count();
    log.info("Let's see how many records data were submitted during that time: {} ", nrOfRecordsInExampleDataset);
    datasetSizes.add(nrOfRecordsInExampleDataset);

    log.info("Basic statistics about the intensity (bunchIntensityMean), like count, mean, stddev, min, max:");
    exampleDataset.describe("bunchIntensityMean").show();

    // Register the dataset under the name "myData" so it can be queried with Spark SQL.
    exampleDataset.createOrReplaceTempView("myData");
    // Log the statement that is actually executed so the message cannot drift from the query.
    String avgIntensityQuery = "select avg(bunchIntensityMean) from myData where selector == 'SPS.USER.LHC1'";
    log.info("Run an SQL statement to calculate average intensity for LHC1 user: {}", avgIntensityQuery);
    spark.sql(avgIntensityQuery).show();

    // TGM data to extract destination
    Dataset<Row> tgmData = ParameterDataQuery.builder(spark).system(SYSTEM_NAME)
            .parameterEq("SPS.TGM/FULL-TELEGRAM.STRC")
            .timeWindow(startTime, endTime).build();
    datasetSizes.add(tgmData.count());

    log.info("What are the fields available?");
    tgmData.printSchema();

    log.info("Printing out data");
    tgmData.select("cyclestamp", "USER", "DEST").show();

    // Inner join on cyclestamp; only the join key and one column from each side are selected.
    log.info("Join data to show only for a destination == LHC");
    exampleDataset.join(tgmData, "cyclestamp").where("DEST = 'LHC' and bunchIntensityMean is not null")
            .select("cyclestamp", "bunchIntensityMean", "DEST").show();

    return datasetSizes;
}

    /**
     * Looks up a hierarchy node by path and queries ten minutes (UTC) of data for every
     * variable attached to it, logging the count and showing each result.
     *
     * @param spark         the Spark session used to run the queries
     * @param hierarchyPath path of the hierarchy node, e.g. "/EXAMPLE"
     * @return one record count per attached variable, in the iteration order of the variable set
     * @throws IllegalArgumentException if no hierarchy exists at {@code hierarchyPath}
     */
    List<Long> getVariableDataForHierarchy(SparkSession spark, String hierarchyPath) {
        HierarchyService hierarchyService = ServiceClientFactory.createHierarchyService();
        List<Long> datasetSizes = new ArrayList<>();

        log.info("Getting hierarchy for {}", hierarchyPath);
        Hierarchy hierarchy = hierarchyService.findOne(Hierarchies.suchThat().path().eq(hierarchyPath))
                .orElseThrow(() -> new IllegalArgumentException("No such hierarchy path " + hierarchyPath));
        log.info("Found hierarchy: {}", hierarchy.getNodePath());

        // Query window, expressed in UTC.
        String startTime = "2018-06-19 00:00:00.000";
        String endTime = "2018-06-19 00:10:00.000";

        for (Variable variable : hierarchyService.getVariables(hierarchy.getId())) {
            String variableName = variable.getVariableName();
            log.info("Querying for {} variable between {} and {}", variableName, startTime, endTime);

            Dataset<Row> variableData = DataQuery.builder(spark).variables().system(SYSTEM_NAME)
                    .nameEq(variableName)
                    .timeWindow(startTime, endTime)
                    .build();

            long rowCount = variableData.count();
            datasetSizes.add(rowCount);
            log.info("Got {} rows for {}", rowCount, variableName);
            variableData.show();
        }

        return datasetSizes;
    }
/**
 * Extracts one hour of data (2018-08-01 00:00 to 01:00) for two CMW parameters using the
 * {@code DevicePropertyDataQuery} builder, prints some exploratory information about them and
 * joins them on {@code cyclestamp}.
 *
 * @param spark the Spark session used to run the queries
 * @return the record counts of SPSBQMSPSv1/Acquisition and SPS.TGM/FULL-TELEGRAM.STRC,
 *         in that order
 */
List<Long> getCmwData(SparkSession spark) {

    List<Long> datasetSizes = new ArrayList<>();

    String startTime = "2018-08-01 00:00:00.00";
    String endTime = "2018-08-01 01:00:00.00";

    // Data query for the BQM acquisition parameter
    Dataset<Row> exampleDataset = DevicePropertyDataQuery.builder(spark).system(SYSTEM_NAME)
            .startTime(startTime).endTime(endTime).entity()
            .parameter("SPSBQMSPSv1/Acquisition").build();

    log.info("What are the fields available?");
    exampleDataset.printSchema();

    log.info("Some timing data here:");
    exampleDataset.select("cyclestamp", "selector", "bunchIntensityMean").show();

    long nrOfRecordsInExampleDataset = exampleDataset.count();
    log.info("Let's see how many records data were submitted during that time: {} ", nrOfRecordsInExampleDataset);
    datasetSizes.add(nrOfRecordsInExampleDataset);

    log.info("Basic statistics about the intensity (bunchIntensityMean), like count, mean, stddev, min, max:");
    exampleDataset.describe("bunchIntensityMean").show();

    // Register the dataset under the name "myData" so it can be queried with Spark SQL.
    exampleDataset.createOrReplaceTempView("myData");
    // Log the statement that is actually executed so the message cannot drift from the query.
    String avgIntensityQuery = "select avg(bunchIntensityMean) from myData where selector == 'SPS.USER.LHC1'";
    log.info("Run an SQL statement to calculate average intensity for LHC1 user: {}", avgIntensityQuery);
    spark.sql(avgIntensityQuery).show();

    // TGM data to extract destination
    Dataset<Row> tgmData = DevicePropertyDataQuery.builder(spark).system(SYSTEM_NAME).startTime(startTime)
            .endTime(endTime)
            .entity().parameter("SPS.TGM/FULL-TELEGRAM.STRC").build();
    datasetSizes.add(tgmData.count());

    log.info("What are the fields available?");
    tgmData.printSchema();

    log.info("Printing out data");
    tgmData.select("cyclestamp", "USER", "DEST").show();

    // Inner join on cyclestamp; only the join key and one column from each side are selected.
    log.info("Join data to show only for a destination == LHC");
    exampleDataset.join(tgmData, "cyclestamp").where("DEST = 'LHC' and bunchIntensityMean is not null")
            .select("cyclestamp", "bunchIntensityMean", "DEST").show();

    return datasetSizes;
}

    /**
     * Looks up a hierarchy node by path and queries ten minutes (UTC) of data for every
     * variable attached to it with the {@code byVariables()} builder, logging the count and
     * showing each result.
     *
     * @param spark         the Spark session used to run the queries
     * @param hierarchyPath path of the hierarchy node, e.g. "/EXAMPLE"
     * @return one record count per attached variable, in the iteration order of the variable set
     * @throws IllegalArgumentException if no hierarchy exists at {@code hierarchyPath}
     */
    List<Long> getVariableDataForHierarchy(SparkSession spark, String hierarchyPath) {
        HierarchyService hierarchyService = ServiceClientFactory.createHierarchyService();
        List<Long> datasetSizes = new ArrayList<>();

        log.info("Getting hierarchy for {}", hierarchyPath);
        Hierarchy hierarchy = hierarchyService.findOne(Hierarchies.suchThat().path().eq(hierarchyPath))
                .orElseThrow(() -> new IllegalArgumentException("No such hierarchy path " + hierarchyPath));
        log.info("Found hierarchy: {}", hierarchy.getNodePath());

        // Query window, expressed in UTC.
        String startTime = "2018-06-19 00:00:00.000";
        String endTime = "2018-06-19 00:10:00.000";

        for (Variable variable : hierarchyService.getVariables(hierarchy.getId())) {
            String variableName = variable.getVariableName();
            log.info("Querying for {} variable between {} and {}", variableName, startTime, endTime);

            Dataset<Row> variableData = DataQuery.builder(spark).byVariables().system(SYSTEM_NAME)
                    .startTime(startTime)
                    .endTime(endTime)
                    .variable(variableName)
                    .build();

            long rowCount = variableData.count();
            datasetSizes.add(rowCount);
            log.info("Got {} rows for {}", rowCount, variableName);
            variableData.show();
        }

        return datasetSizes;
    }

and call them from the main method:

/**
 * Starts the Spring context, fetches the Spark session bean and runs both extraction examples.
 *
 * @param args command-line arguments passed through to Spring Boot
 */
public static void main(String[] args) {
    SparkSession spark = SpringApplication.run(DataAccessExample.class, args)
            .getBean(SparkSession.class);
    DataAccessExample example = new DataAccessExample();

    log.info("############## Lets get some cmw data ############");
    example.getCmwData(spark);

    log.info("############## Lets get some variable data ############");
    example.getVariableDataForHierarchy(spark, "/EXAMPLE");

    log.info("THE END, Happy Exploring Spark API!");
}

Add Spring YAML configuration file

Spark session properties need to be configured in an external file: resources/application.yml

To run Spark locally, with as many worker threads as there are logical cores on the local machine, the following settings can be used:

# Spark running locally: local[*] uses as many worker threads as there are logical cores.
spark:
  masterType: local[*]
  java:
    home: /var/nxcals/jdk1.11
  appName: nxcals_code-snippets
  properties:
    # JAVA_HOME for map/reduce tasks and Spark executors is taken from spark.java.home above.
    mapreduce:
      map.env.JAVA_HOME: ${spark.java.home}
      reduce.env.JAVA_HOME: ${spark.java.home}
    spark:
      ui.port: 4050
      sql.caseSensitive: true
      executorEnv.JAVA_HOME: ${spark.java.home}
      executor:
        instances: 4
        cores: 1
        memory: 1g
        extraClassPath: /usr/hdp/hadoop/lib/native

To run Spark on a YARN cluster in client mode, the file should be slightly modified and extended with YARN-specific settings:

# YARN (client mode) variant of resources/application.yml.
# nxcals.common.dir is substituted into the spark.yarn.jars path below.
nxcals.common.dir: nxcals_test
# 'spark' is a top-level key, at the same level as nxcals.common.dir; indenting it under the
# scalar line above would make the file invalid YAML.
spark:
  masterType: yarn
  # masterType: local[*]
  deployMode: client
  java:
    home: /var/nxcals/jdk1.11
  appName: nxcals_code-snippets
  # Substituted into the spark.yarn.jars path below.
  version: 3.1.1
  properties:
    yarn:
      app.mapreduce.am.env.JAVA_HOME: ${spark.java.home}
    mapreduce:
      map.env.JAVA_HOME: ${spark.java.home}
      reduce.env.JAVA_HOME: ${spark.java.home}
    spark:
      submit.deployMode: client
      ui.port: 4050
      sql.caseSensitive: true
      executorEnv.JAVA_HOME: ${spark.java.home}
      executor:
        instances: 4
        cores: 1
        memory: 1g
        # NOTE(review): extraClassPath and extraLibraryPath point at different directories
        # (/usr/hdp/... vs /usr/lib/...); confirm both are correct for the cluster nodes.
        extraClassPath: /usr/hdp/hadoop/lib/native
        extraLibraryPath: /usr/lib/hadoop/lib/native
      yarn:
        appMasterEnv.JAVA_HOME: ${spark.java.home}
        # Jars for the YARN containers are read from HDFS.
        jars: hdfs:///project/nxcals/lib/spark-${spark.version}/*.jar,hdfs:///project/nxcals/nxcals_lib/${nxcals.common.dir}/*.jar
        am.extraClassPath: /usr/hdp/hadoop/lib/native
        am.extraLibraryPath: /usr/lib/hadoop/lib/native
        access.hadoopFileSystems: nxcals
        historyServer.address: ithdp1001.cern.ch:18080

Build application

Prepare a gradle.properties file with the following content:

# Version of the NXCALS client libraries; set it to the latest production release.
nxcalsVersion=1.5.24

# Versions referenced from build.gradle (gson is forced, Jackson artifacts are aligned to one version).
gsonVersion=2.9.1
springBootVersion=2.7.16
log4jVersion=2.21.0
fasterxmlJacksonVersion=2.13.5

Important

Make sure that nxcalsVersion points to the latest production version of NXCALS!

Prepare a "minimal" build.gradle file and place it in the main directory of your application:

// Minimal build for the NXCALS data-extraction example.
// All versions come from gradle.properties.
// NOTE(review): no repositories {} block is declared here; add the repositories that serve the
// cern.nxcals and third-party artifacts before building.
apply plugin: 'application'

application {
    // Replaces the deprecated 'mainClassName' convention property.
    mainClass = 'cern.myproject.DataAccessExample'
}

configurations.all {
    resolutionStrategy {
        // Pin Gson to a single version across all transitive dependencies.
        force 'com.google.code.gson:gson:' + gsonVersion

        // Align every com.fasterxml.jackson* artifact on the same version.
        eachDependency { DependencyResolveDetails details ->
            if (details.requested.group.startsWith('com.fasterxml.jackson')) {
                details.useVersion fasterxmlJacksonVersion
            }
        }
    }

    // Depending on your gradle version you might have to do those exclusions (prior to gradle 5.x
    // it did not honor the exclusions from POMs). They remove other logging backends/bindings
    // so they do not compete with the log4j2 ones declared below.
    exclude group: "org.springframework.boot", module: "spring-boot-starter-logging"
    exclude group: "org.slf4j", module: "slf4j-log4j12"
    exclude group: "log4j", module: "log4j"
    exclude group: "log4j", module: "apache-log4j-extras"
    exclude group: "ch.qos.logback", module: "logback-classic"
}

dependencies {
    // 'implementation' replaces the 'compile' configuration, which was removed in Gradle 7.0.

    // Required for NXCALS data extraction.
    implementation group: 'cern.nxcals', name: 'nxcals-extraction-starter', version: nxcalsVersion
    implementation group: 'cern.nxcals', name: 'nxcals-hadoop-pro-config', version: nxcalsVersion

    // Spring Boot stack used by the demo application (not needed for extraction itself).
    implementation group: 'org.springframework.boot', name: 'spring-boot-starter', version: springBootVersion
    implementation group: 'org.springframework.boot', name: 'spring-boot-starter-validation', version: springBootVersion
    implementation group: 'org.springframework.boot', name: 'spring-boot-starter-log4j2', version: springBootVersion

    implementation group: 'org.apache.logging.log4j', name: 'log4j-1.2-api', version: log4jVersion
    implementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j-impl', version: log4jVersion

    // Required for Yaml in Log4j2
    implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: fasterxmlJacksonVersion
    implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: fasterxmlJacksonVersion
}

Important

Please note the dependencies required for data extraction:

  • group: 'cern.nxcals', name: 'nxcals-extraction-starter'
  • group: 'cern.nxcals', name: 'nxcals-hadoop-pro-config'

The Spring Boot framework dependencies must be added as well, since Spring Boot was chosen as the stack for this demo application (they are optional for data extraction itself).

Build and run the application:

# Builds and then runs the application through the Gradle wrapper.
# NOTE: '../gradlew' expects the wrapper one directory above the current one; use './gradlew'
# if the wrapper is in the application directory itself.
../gradlew build run
Expected application output:
> Task :extraction-api-examples:run

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.1.3.RELEASE)

2019-11-21 17:06:15.499 [INFO ] [main] DataAccessExample - Starting DataAccessExample on cs-ccr-dev3.cern.ch with PID 21027 (/opt/psowinsk/nxcals-examples/extraction-api-examples/build/classes/java/main started by psowinsk in /opt/psowinsk/nxcals-examples/extraction-api-examples)
2019-11-21 17:06:15.512 [INFO ] [main] DataAccessExample - No active profile set, falling back to default profiles: default
2019-11-21 17:06:17.090 [WARN ] [main] NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-11-21 17:06:18.787 [INFO ] [main] DataAccessExample - Started DataAccessExample in 3.837 seconds (JVM running for 5.184)
2019-11-21 17:06:18.791 [INFO ] [main] DataAccessExample - ############## Lets get some cmw data ############
2019-11-21 17:06:19.301 [WARN ] [main] URLConfigurationSource - No URLs will be polled as dynamic configuration sources.
2019-11-21 17:06:20.957 [WARN ] [main] DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2019-11-21 17:06:25.511 [INFO ] [main] DataAccessExample - What are the fields available?
root
 |-- __record_timestamp__: long (nullable = true)
 |-- __record_version__: long (nullable = true)
 |-- acqStamp: long (nullable = true)
 |-- beamOk: boolean (nullable = true)
 |-- bqmOutputEnable: boolean (nullable = true)
 |-- bunchIntensities: struct (nullable = true)
 |    |-- elements: array (nullable = true)
 |    |    |-- element: float (containsNull = true)
 |    |-- dimensions: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)
 |-- bunchIntensityMax: float (nullable = true)
 |-- bunchIntensityMaxThr: float (nullable = true)
 |-- bunchIntensityMean: float (nullable = true)
 |-- bunchIntensityMin: float (nullable = true)
 |-- bunchIntensityMinThr: float (nullable = true)
 |-- bunchIntensityModIndex: float (nullable = true)
 |-- bunchIntensityModIndexOk: boolean (nullable = true)
 |-- bunchIntensityModIndexThr: float (nullable = true)
 |-- bunchIntensityOk: boolean (nullable = true)
 |-- bunchIntensityStd: float (nullable = true)
 |-- bunchLengthMax: float (nullable = true)
 |-- bunchLengthMaxThr: float (nullable = true)
 |-- bunchLengthMean: float (nullable = true)
 |-- bunchLengthMin: float (nullable = true)
 |-- bunchLengthMinThr: float (nullable = true)
 |-- bunchLengthOk: boolean (nullable = true)
 |-- bunchLengthStd: float (nullable = true)
 |-- bunchLengthStdOk: boolean (nullable = true)
 |-- bunchLengthStdThr: float (nullable = true)
 |-- bunchLengths: struct (nullable = true)
 |    |-- elements: array (nullable = true)
 |    |    |-- element: float (containsNull = true)
 |    |-- dimensions: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)
 |-- bunchPeakMax: float (nullable = true)
 |-- bunchPeakMaxThr: float (nullable = true)
 |-- bunchPeakMean: float (nullable = true)
 |-- bunchPeakMin: float (nullable = true)
 |-- bunchPeakMinThr: float (nullable = true)
 |-- bunchPeakOk: boolean (nullable = true)
 |-- bunchPeakStd: float (nullable = true)
 |-- bunchPeakStdOk: boolean (nullable = true)
 |-- bunchPeakStdThr: float (nullable = true)
 |-- bunchPeaks: struct (nullable = true)
 |    |-- elements: array (nullable = true)
 |    |    |-- element: float (containsNull = true)
 |    |-- dimensions: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)
 |-- bunchPositions: struct (nullable = true)
 |    |-- elements: array (nullable = true)
 |    |    |-- element: float (containsNull = true)
 |    |-- dimensions: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)
 |-- class: string (nullable = true)
 |-- cyclestamp: long (nullable = true)
 |-- device: string (nullable = true)
 |-- diagnostic: integer (nullable = true)
 |-- doubletIntensitySplitOk: boolean (nullable = true)
 |-- doublets: boolean (nullable = true)
 |-- filledBuckets: struct (nullable = true)
 |    |-- elements: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)
 |    |-- dimensions: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)
 |-- firstBunchPositionOk: boolean (nullable = true)
 |-- isTrailingDoublet: struct (nullable = true)
 |    |-- elements: array (nullable = true)
 |    |    |-- element: boolean (containsNull = true)
 |    |-- dimensions: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)
 |-- nAllowedLongBunches: integer (nullable = true)
 |-- nBunches: integer (nullable = true)
 |-- nBunchesMax: integer (nullable = true)
 |-- nBunchesMaxOk: boolean (nullable = true)
 |-- nBunchesRamp: integer (nullable = true)
 |-- nSatellitesIntensity: integer (nullable = true)
 |-- nSatellitesMidBucket: integer (nullable = true)
 |-- nTooLongBunches: integer (nullable = true)
 |-- nWarningMessages: integer (nullable = true)
 |-- oscillationPercentDeltaMax: float (nullable = true)
 |-- oscillationPercentDeltaMaxThr: float (nullable = true)
 |-- oscillationPercentDeltas: struct (nullable = true)
 |    |-- elements: array (nullable = true)
 |    |    |-- element: float (containsNull = true)
 |    |-- dimensions: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)
 |-- oscillationPercentPeakMax: float (nullable = true)
 |-- oscillationPercentPeakMaxThr: float (nullable = true)
 |-- patternOk: boolean (nullable = true)
 |-- property: string (nullable = true)
 |-- satelliteBucketsIntensity: struct (nullable = true)
 |    |-- elements: array (nullable = true)
 |    |    |-- element: float (containsNull = true)
 |    |-- dimensions: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)
 |-- satelliteBucketsMid: struct (nullable = true)
 |    |-- elements: array (nullable = true)
 |    |    |-- element: float (containsNull = true)
 |    |-- dimensions: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)
 |-- satellitesIntensityThr: float (nullable = true)
 |-- satellitesMidBucketThr: float (nullable = true)
 |-- satellitesOk: boolean (nullable = true)
 |-- selector: string (nullable = true)
 |-- stabilityOk: boolean (nullable = true)
 |-- tgmNLSQCTL: boolean (nullable = true)
 |-- tgmSCNUM: integer (nullable = true)
 |-- verifyPattern: boolean (nullable = true)
 |-- warningMessages: struct (nullable = true)
 |    |-- elements: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- dimensions: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)
 |-- wrongBuckets: struct (nullable = true)
 |    |-- elements: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)
 |    |-- dimensions: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)
 |-- nxcals_entity_id: long (nullable = true)

2019-11-21 17:06:25.518 [INFO ] [main] DataAccessExample - Some timing data here:
2019-11-21 17:06:25.995 [WARN ] [main] Utils - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
+-------------------+-----------------+------------------+
|         cyclestamp|         selector|bunchIntensityMean|
+-------------------+-----------------+------------------+
|1533083631735000000|    SPS.USER.LHC1|              null|
|1533084078135000000|SPS.USER.LHCPILOT|         2077.4004|
|1533083185335000000|    SPS.USER.LHC1|         2458.5662|
|1533085008135000000|SPS.USER.LHCPILOT|               0.0|
|1533082999335000000|    SPS.USER.LHC1|         2480.7834|
|1533083148135000000|    SPS.USER.LHC1|              null|
|1533084784935000000|SPS.USER.LHCPILOT|               0.0|
|1533084040935000000|SPS.USER.LHCPILOT|              null|
|1533084338535000000|SPS.USER.LHCPILOT|               0.0|
|1533083668935000000|    SPS.USER.LHC1|         2489.4307|
|1533083966535000000|SPS.USER.LHCPILOT|              null|
|1533083408535000000|    SPS.USER.LHC1|              null|
|1533084115335000000|SPS.USER.LHCPILOT|              null|
|1533083445735000000|    SPS.USER.LHC1|         2529.9453|
|1533083371335000000|    SPS.USER.LHC1|              null|
|1533085082535000000|SPS.USER.LHCPILOT|               0.0|
|1533084822135000000|SPS.USER.LHCPILOT|               0.0|
|1533085156935000000|    SPS.USER.LHC1|         2570.6726|
|1533083259735000000|    SPS.USER.LHC1|          2504.587|
|1533084970935000000|SPS.USER.LHCPILOT|               0.0|
+-------------------+-----------------+------------------+
only showing top 20 rows

2019-11-21 17:06:27.738 [INFO ] [main] DataAccessExample - Let's see how many records data were submitted during that time: 61 
2019-11-21 17:06:27.738 [INFO ] [main] DataAccessExample - Basic statistics about the intensity (bunchIntensityMean), like count, mean, stddev, min, max:
+-------+------------------+
|summary|bunchIntensityMean|
+-------+------------------+
|  count|                30|
|   mean| 1605.936922200521|
| stddev|1162.7277248622167|
|    min|               0.0|
|    max|         2608.4077|
+-------+------------------+

2019-11-21 17:06:28.240 [INFO ] [main] DataAccessExample - Run an SQL statement to calculate average intensity for TOF user: SELECT avg(bunchIntensityMean) FROM myData WHERE selector == 'SPS.USER.LHC1'
+-----------------------+
|avg(bunchIntensityMean)|
+-----------------------+
|     2338.4426432291666|
+-----------------------+

2019-11-21 17:06:29.156 [INFO ] [main] DataAccessExample - What are the fields available?
root
 |-- BEAMID: integer (nullable = true)
 |-- BPNCY: integer (nullable = true)
 |-- BPNM: integer (nullable = true)
 |-- COMLN: struct (nullable = true)
 |    |-- elements: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- dimensions: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)
 |-- CYCLE: integer (nullable = true)
 |-- CYTAG: integer (nullable = true)
 |-- DDEST: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DURN: integer (nullable = true)
 |-- FTOP_ENG: integer (nullable = true)
 |-- INJ_ENG: integer (nullable = true)
 |-- LHCSEQE: struct (nullable = true)
 |    |-- elements: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- dimensions: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)
 |-- LHCSEQR: struct (nullable = true)
 |    |-- elements: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- dimensions: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)
 |-- MISC: struct (nullable = true)
 |    |-- elements: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- dimensions: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)
 |-- MISC_A: struct (nullable = true)
 |    |-- elements: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- dimensions: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)
 |-- MMODE: string (nullable = true)
 |-- NBINJ: integer (nullable = true)
 |-- NCYTAG: integer (nullable = true)
 |-- NDURN: integer (nullable = true)
 |-- NUSER: string (nullable = true)
 |-- PARTY: string (nullable = true)
 |-- SCNUM: integer (nullable = true)
 |-- SCTAG: integer (nullable = true)
 |-- SPCON: struct (nullable = true)
 |    |-- elements: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- dimensions: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)
 |-- USER: string (nullable = true)
 |-- __LSA_CYCLE__: string (nullable = true)
 |-- __record_timestamp__: long (nullable = true)
 |-- __record_version__: long (nullable = true)
 |-- acqStamp: long (nullable = true)
 |-- class: string (nullable = true)
 |-- cyclestamp: long (nullable = true)
 |-- device: string (nullable = true)
 |-- property: string (nullable = true)
 |-- selector: string (nullable = true)
 |-- nxcals_entity_id: long (nullable = true)

2019-11-21 17:06:29.157 [INFO ] [main] DataAccessExample - Printing out data
+-------------------+--------+--------+
|         cyclestamp|    USER|    DEST|
+-------------------+--------+--------+
|1533082468935000000|     MD1|SPS_DUMP|
|1533083874135000000|     MD1|SPS_DUMP|
|1533083296935000000|    LHC1|     LHC|
|1533084882135000000| SFTPRO2| FTARGET|
|1533081925335000000| SFTPRO2| FTARGET|
|1533082054935000000| SFTPRO2| FTARGET|
|1533083702535000000|     MD1|SPS_DUMP|
|1533083946135000000|    ZERO|SPS_DUMP|
|1533084148935000000|     MD1|SPS_DUMP|
|1533084318135000000|    ZERO|SPS_DUMP|
|1533084576135000000|    ZERO|SPS_DUMP|
|1533084912135000000|    ZERO|SPS_DUMP|
|1533081633735000000|     MD1|SPS_DUMP|
|1533082040535000000| SFTPRO2| FTARGET|
|1533082699335000000|     MD1|SPS_DUMP|
|1533082281735000000|     MD1|SPS_DUMP|
|1533082371735000000| SFTPRO2| FTARGET|
|1533082429335000000| SFTPRO2| FTARGET|
|1533082886535000000|     MD1|SPS_DUMP|
|1533083854935000000|LHCPILOT|     LHC|
+-------------------+--------+--------+
only showing top 20 rows

2019-11-21 17:06:29.450 [INFO ] [main] DataAccessExample - Join data to show only for a destination == LHC
+-------------------+------------------+----+
|         cyclestamp|bunchIntensityMean|DEST|
+-------------------+------------------+----+
|1533083296935000000|         2506.4263| LHC|
|1533083854935000000|          2251.466| LHC|
|1533083259735000000|          2504.587| LHC|
|1533085082535000000|               0.0| LHC|
|1533083185335000000|         2458.5662| LHC|
|1533083110935000000|         2608.4077| LHC|
|1533084412935000000|         2210.8672| LHC|
|1533084375735000000|          2328.534| LHC|
|1533085194135000000|               0.0| LHC|
|1533083445735000000|         2529.9453| LHC|
|1533085008135000000|               0.0| LHC|
|1533082999335000000|         2480.7834| LHC|
|1533085156935000000|         2570.6726| LHC|
|1533083817735000000|         2044.6013| LHC|
|1533083222535000000|         2477.3943| LHC|
|1533084822135000000|               0.0| LHC|
|1533083073735000000|         2493.0784| LHC|
|1533085119735000000|         2529.7126| LHC|
|1533084078135000000|         2077.4004| LHC|
|1533084338535000000|               0.0| LHC|
+-------------------+------------------+----+
only showing top 20 rows

2019-11-21 17:06:30.111 [INFO ] [main] DataAccessExample - ############## Lets get some variable data ############
2019-11-21 17:06:30.112 [INFO ] [main] DataAccessExample - Getting hierarchy for /EXAMPLE
2019-11-21 17:06:30.253 [INFO ] [main] DataAccessExample - Found hierarchy: /EXAMPLE
2019-11-21 17:06:30.253 [INFO ] [main] DataAccessExample - Querying for PR.DCAFTINJ_1:INTENSITY variable between 2018-06-19 00:00:00.000 and 2018-06-19 00:10:00.000
2019-11-21 17:06:30.870 [INFO ] [main] DataAccessExample - Got 477 rows for PR.DCAFTINJ_1:INTENSITY
+------------+----------------+-------------------+--------------------+
|nxcals_value|nxcals_entity_id|   nxcals_timestamp|nxcals_variable_name|
+------------+----------------+-------------------+--------------------+
|         0.0|           55414|1529366833900000000|PR.DCAFTINJ_1:INT...|
|         0.0|           55414|1529366932300000000|PR.DCAFTINJ_1:INT...|
| -0.07446789|           55414|1529366416300000000|PR.DCAFTINJ_1:INT...|
|         0.0|           55414|1529366583100000000|PR.DCAFTINJ_1:INT...|
|    8.697608|           55414|1529366611900000000|PR.DCAFTINJ_1:INT...|
|         0.0|           55414|1529366615500000000|PR.DCAFTINJ_1:INT...|
|         0.0|           55414|1529366677900000000|PR.DCAFTINJ_1:INT...|
|         0.0|           55414|1529366980300000000|PR.DCAFTINJ_1:INT...|
|         0.0|           55414|1529366437900000000|PR.DCAFTINJ_1:INT...|
|         0.0|           55414|1529366907100000000|PR.DCAFTINJ_1:INT...|
|         0.0|           55414|1529366959900000000|PR.DCAFTINJ_1:INT...|
|         0.0|           55414|1529366878300000000|PR.DCAFTINJ_1:INT...|
|         0.0|           55414|1529366578300000000|PR.DCAFTINJ_1:INT...|
|         0.0|           55414|1529366773900000000|PR.DCAFTINJ_1:INT...|
|         0.0|           55414|1529366526700000000|PR.DCAFTINJ_1:INT...|
|         0.0|           55414|1529366674300000000|PR.DCAFTINJ_1:INT...|
|         0.0|           55414|1529366736700000000|PR.DCAFTINJ_1:INT...|
|         0.0|           55414|1529366859100000000|PR.DCAFTINJ_1:INT...|
|         0.0|           55414|1529366537500000000|PR.DCAFTINJ_1:INT...|
|         0.0|           55414|1529366728300000000|PR.DCAFTINJ_1:INT...|
+------------+----------------+-------------------+--------------------+
only showing top 20 rows

2019-11-21 17:06:31.205 [INFO ] [main] DataAccessExample - THE END, Happy Exploring Spark API!

Summary

You have successfully retrieved 2 CMW device/property parameters and a hierarchy with attached variables.

See also

Other pages related to the presented example: