Skip to content

This step-by-step-guide aims to walk you through the process of creating application demonstrating usage of NXCALS Data Extraction Thin API.

What you will build

An application which extracts sample data from two CMW device/property parameters for a given time range from NXCALS PRO environment. As well it will demonstrate some very basic Spark operations on the retrieved DataFrames including join operation. It will be followed by extraction of a variable hierarchy with attached variables.

Create application

Create main class of the application:

public class ThinDataAccessExample {

        static {
            System.setProperty("logging.config", "classpath:log4j2.yml");
            System.setProperty("service.url",
                    "https://cs-ccr-nxcals6.cern.ch:19093,https://cs-ccr-nxcals7.cern.ch:19093,https://cs-ccr-nxcals8.cern.ch:19093");

        }

        private static final Logger log = LoggerFactory.getLogger(ThinDataAccessExample.class);
        private static final String SYSTEM_NAME = "CMW";
        //NXCALS PRO, Please use all or subset of Spark Servers addresses
        private static String SPARK_SERVERS_URL = "cs-ccr-nxcals5.cern.ch:15000,cs-ccr-nxcals6.cern.ch:15000,cs-ccr-nxcals7.cern.ch:15000,cs-ccr-nxcals8.cern.ch:15000";


        public static void main(String[] args) {

            ThinDataAccessExample thinDataAccessExample = new ThinDataAccessExample();

        }
    }

Add to the main class methods for logging:

public static void login() {

        //Login with RBAC, must set user.password or give it explicitly here, don't forget to remove before commit!
        login(System.getProperty("user.name"), System.getProperty("user.password"));

    }

    private static void login(String user, String password) {

        try {
            AuthenticationClient authenticationClient = AuthenticationClient.create();
            //Don't commit your password here!!!
            RbaToken token = authenticationClient.loginExplicit(user, password);
            ClientTierTokenHolder.setRbaToken(token);
        } catch (AuthenticationException e) {
            throw new IllegalArgumentException("Cannot login", e);
        }

    }

Add method for getting data for a specific CMW signal, for data retrieval via variables located in a given hierarchy and for retrieving data related to fills:

List<ImmutableData> getCmwData() {

        //Create the service stub
        ExtractionServiceBlockingStub extractionService = ServiceFactory.createExtractionService(SPARK_SERVERS_URL);


        String startTime = "2018-08-01 00:00:00.00";
        String endTime = "2018-08-01 01:00:00.00";

        //This is the meta-data query script (you can write your own, this is just a helper class).
        String script = DevicePropertyDataQuery.builder().system(SYSTEM_NAME)
                .startTime(startTime).endTime(endTime).entity()
                .parameter("SPSBQMSPSv1/Acquisition").build();

        log.info("Generated Script: {}", script);


        script += ".select('acqStamp',\"bunchIntensities\")";
        log.info("You can change the script or create your own, both quote types work (\", '): {}", script);

        //Build a query
        AvroQuery avroQuery = AvroQuery.newBuilder()
                .setScript(script)
                .setCompression(AvroQuery.Codec.BZIP2) //optional bytes compression
                .build();

        //Query for data (return type is just bytes that need to be converted to desired record type, currently Avro or CMW ImmutableData)
        AvroData avroData = extractionService.query(avroQuery);

        log.info("What are the fields available?");
        log.info("Avro Schema: {}",avroData.getAvroSchema());


        long nrOfRecordsinExampleDataset = avroData.getRecordCount();
        log.info("Number of records: {} ", nrOfRecordsinExampleDataset);

        log.info("Converting data bytes into Avro Generic Records");
        List<GenericRecord> records = Avro.records(avroData);

        GenericRecord record = records.get(0);

        log.info("Record content: {}", record);

        log.info("Accessing a record field {}", record.get("bunchLengths") );


        log.info("Converting data into CMW ImmutableData objects");
        List<ImmutableData> cmwRecords = Datax.records(avroData);
        log.info("Immutable record: {}", cmwRecords.get(0));
        return cmwRecords;
    }

    List<Long> getVariableDataForHierarchy( String hierarchyPath) {
        //Create the service stub
        ExtractionServiceBlockingStub extractionService = ServiceFactory.createExtractionService(SPARK_SERVERS_URL);

        //Create the meta-data service stub
        HierarchyService service = ServiceClientFactory.createHierarchyService();
        List<Long> datasetSizes = new ArrayList<>();

        log.info("Getting hierarchy for {}", hierarchyPath);
        Hierarchy node = service.findOne(Hierarchies.suchThat().path().eq(hierarchyPath))
                .orElseThrow(()->new IllegalArgumentException("No such hierarchy path " + hierarchyPath));

        log.info("Found hierarchy: {}", node.getNodePath());

        String startTime = "2018-06-19 00:00:00.000"; //UTC
        String endTime = "2018-06-19 00:10:00.000"; //UTC

        Set<Variable> variables = service.getVariables(node.getId());
        for (Variable variableData: variables) {
            String script = DataQuery.builder().byVariables().system("CMW").startTime(startTime)
                    .endTime(endTime).variable(variableData.getVariableName()).build();

            log.info("Querying for {} variable between {} and {}, script: {}", variableData.getVariableName(), startTime, endTime, script);

            List<GenericRecord> records = Avro
                    .records(extractionService.query(AvroQuery.newBuilder().setScript(script).build()));

            long datasetSize = records.size();
            datasetSizes.add(datasetSize);
            log.info("Got {} rows for {}", datasetSize, variableData.getVariableName());
            records.subList(0,10).forEach(System.out::println);
        }

        return datasetSizes;
    }

    void getFillData() {

        //Create the service stub
        FillServiceGrpc.FillServiceBlockingStub fillService = ServiceFactory.createFillService(SPARK_SERVERS_URL);

        int fillNumber = 7252;

        log.info("Quering for fill nr {}", fillNumber);

        // Query for data
        FillData fillData = fillService.findFill(FillQueryByNumber.newBuilder().setFillNr(fillNumber).build());

        TimeWindow timeWindow = fillData.getValidity();

        StringJoiner beamJoiner = new StringJoiner(", ");
        fillData.getBeamModesList().forEach(b -> beamJoiner.add(b.getBeamModeValue()));

        log.info("Extracted fill {} starting at {} and finishing at {} having the following beam modes: {}",
                fillData.getNumber(),
                TimeUtils.getInstantFromNanos(timeWindow.getStartTime()),
                TimeUtils.getInstantFromNanos(timeWindow.getEndTime()),
                beamJoiner.toString());

        log.info("Quering for fills in time range");

        long start = TimeUtils.getNanosFromString("2018-10-02 00:00:00.378000000");
        long stop = TimeUtils.getNanosFromString("2018-10-10 00:00:00.56000000");

        Iterator<FillData> fillsData = fillService.findFills(FillQueryByWindow.newBuilder()
                .setTimeWindow(TimeWindow.newBuilder().setStartTime(start).setEndTime(stop).build()).build());

        // Optionally convert to list
        List<FillData> fillsDataList = new ArrayList<>();
        fillsData.forEachRemaining(fillsDataList::add);

        StringJoiner fillJoiner = new StringJoiner(",");
        fillsDataList.forEach(f -> fillJoiner.add(String.valueOf(f.getNumber())));

        log.info("Extracted {} fills with the time range: {}", fillsDataList.size(), fillJoiner.toString());


        log.info("Quering for the last completed fill");
        FillData lastCompleted = fillService.getLastCompleted(Empty.newBuilder().build());

        TimeWindow lastCompletedWindow = fillData.getValidity();

        log.info("Last completed fill: {}, started at {} and finished at {}",
                lastCompleted.getNumber(),
                TimeUtils.getInstantFromNanos(lastCompletedWindow.getStartTime()),
                TimeUtils.getInstantFromNanos(lastCompletedWindow.getEndTime())
        );

        // Convert from Thin API Extraction FillData object to standard extraction API Fill object
        Fill fill = FillConverter.toFill(lastCompleted);

        log.info("Fill number originating from Fill object:  {}", fill.getNumber());

        // Cleanup channel created during service initialization
        ((ManagedChannel)fillService.getChannel()).shutdown();
    }

Add newly created methods to the main method:

public static void main(String[] args) {

        login();

        ThinDataAccessExample thinDataAccessExample = new ThinDataAccessExample();

        log.info("############## Lets get some cmw data ############");
        thinDataAccessExample.getCmwData();

        log.info("############## Lets get some variable data ############");
        thinDataAccessExample.getVariableDataForHierarchy("/EXAMPLE");

        log.info("############## Lets get some fill data ############");
        thinDataAccessExample.getFillData();

        log.info("THE END, Happy Exploring Spark Thin API!");
    }

Build application

Prepare gradle.properties file with the following content:

nxcalsVersion=1.3.10

cmwDataxVersion=1.5.2
log4jVersion=2.17.2
fasterxmlJacksonVersion=2.13.4

Important

Make sure that you have the latest production version of NXCALS!

Prepare a "minimal" version of build.gradle file and place it in the main directory of your application.

apply plugin:'application'
mainClassName = "cern.myproject.ThinDataAccessExample"

configurations.all {
    resolutionStrategy {
        force 'com.google.code.gson:gson:' + gsonVersion

        eachDependency { DependencyResolveDetails details ->
            if (details.requested.group.startsWith('com.fasterxml.jackson')) {
                details.useVersion fasterxmlJacksonVersion
            }
        }

   }
// Depending on your gradle version you might have to do those exclusions (prior to gradle 5.x it did not honor the exclusions from POMs).
      exclude group: "org.springframework.boot", module: "spring-boot-starter-logging"
      exclude group: "org.slf4j", module: "slf4j-log4j12"
      exclude group: "log4j", module: "log4j"
      exclude group: "log4j", module: "apache-log4j-extras"
      exclude group: "ch.qos.logback", module: "logback-classic"
}

dependencies {

    compile group: 'cern.nxcals', name: 'nxcals-extraction-api-thin', version: nxcalsVersion
    compile group: 'cern.nxcals', name: 'nxcals-metadata-api', version: nxcalsVersion

    //in case you want to convert from Avro to ImmutableData
    compile group: 'cern.cmw', name: "cmw-datax", version: cmwDataxVersion


    compile group: 'org.apache.logging.log4j', name: 'log4j-1.2-api', version: log4jVersion
    compile group: 'org.apache.logging.log4j', name: 'log4j-slf4j-impl', version: log4jVersion

    //Required for Yaml in Log4j2
    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: fasterxmlJacksonVersion
    compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: fasterxmlJacksonVersion

//    compileOnly group: 'org.projectlombok', name: 'lombok', version: lombokVersion

    testCompile group: 'junit', name: 'junit', version: junitVersion

}

run {
    println "Setting properties"
    println System.getProperty('user.name')
    systemProperties['user.name'] = System.getProperty('user.name')
    systemProperties['user.password'] = System.getProperty('user.password')
    if(System.getProperty('user.password') == null) {
        println "Please set user.name and user.password properties with -Duser.password=<pass> when running those tests with gradle. They require RBAC login to work!!!"
    }
}

Build and run the application:

../gradlew build run -Duser.name="your.username" -Duser.password="your.password"
Click to see expected application output...
> Task :extraction-api-thin-examples:run
2021-08-13 17:39:40.371 [INFO ] [main] ThinDataAccessExample - ############## Lets get some cmw data ############
2021-08-13 17:39:40.841 [INFO ] [main] ThinDataAccessExample - Generated Script: DataQuery.builder(sparkSession)
.byEntities().system("CMW")
        .startTime("2018-08-01 00:00:00.0").endTime("2018-08-01 01:00:00.0")
        .entity().keyValue("property", "Acquisition").keyValue("device", "SPSBQMSPSv1").build()
2021-08-13 17:39:40.841 [INFO ] [main] ThinDataAccessExample - You can change the script or create your own, both quote types work (", '): DataQuery.builder(sparkSession)
.byEntities().system("CMW")
        .startTime("2018-08-01 00:00:00.0").endTime("2018-08-01 01:00:00.0")
        .entity().keyValue("property", "Acquisition").keyValue("device", "SPSBQMSPSv1").build().select('acqStamp',"bunchIntensities")
2021-08-13 17:39:42.728 [INFO ] [main] ThinDataAccessExample - What are the fields available?
2021-08-13 17:39:42.729 [INFO ] [main] ThinDataAccessExample - Avro Schema: {"type":"record","name":"nxcals_record","namespace":"nxcals","fields":[{"name":"acqStamp","type":["long","null"]},{"name":"bunchIntensities","type":[{"type":"record","name":"bunchIntensities","namespace":"nxcals.nxcals_record","fields":[{"name":"elements","type":[{"type":"array","items":["double","null"]},"null"]},{"name":"dimensions","type":[{"type":"array","items":["int","null"]},"null"]}]},"null"]}]}
2021-08-13 17:39:42.729 [INFO ] [main] ThinDataAccessExample - Number of records: 61 
2021-08-13 17:39:42.730 [INFO ] [main] ThinDataAccessExample - Converting data bytes into Avro Generic Records
2021-08-13 17:39:42.929 [INFO ] [main] ThinDataAccessExample - Record content: {"acqStamp": 1533084040935000000, "bunchIntensities": {"elements": [2079.5322], "dimensions": [1]}}
2021-08-13 17:39:42.929 [INFO ] [main] ThinDataAccessExample - Accessing a record field null
2021-08-13 17:39:42.929 [INFO ] [main] ThinDataAccessExample - Converting data into CMW ImmutableData objects
2021-08-13 17:39:42.993 [INFO ] [main] ThinDataAccessExample - Immutable record: Name: acqStamp
Type: Long
Value: 1533084040935000000

Name: bunchIntensities
Type: double[]Dims: 1
Value: 
{2079.5322}
2021-08-13 17:39:42.994 [INFO ] [main] ThinDataAccessExample - ############## Lets get some variable data ############
2021-08-13 17:39:43.071 [WARN ] [main] URLConfigurationSource - No URLs will be polled as dynamic configuration sources.
2021-08-13 17:39:43.291 [INFO ] [main] ThinDataAccessExample - Getting hierarchy for /EXAMPLE
2021-08-13 17:39:44.346 [INFO ] [main] ThinDataAccessExample - Found hierarchy: /EXAMPLE
2021-08-13 17:39:44.810 [INFO ] [main] ThinDataAccessExample - THE END, Happy Exploring Spark Thin API!

Summary

You have successfully retrieved a CMW device/property parameter and data from variables attached to a hierarchy.

See also

Other pages related to the presented example: