
Extraction Thin API

This API allows sending scripts or performing specific queries (currently related to Fills data) to the existing Spark Server; they are executed on the server side and the results are returned to the client.

Important

The purpose of this API is to limit the number of jars (and potential clashes) when using Spark and to simplify usage for small queries. It is not meant to be called from SWAN (where one can use the provided pySpark) or to be used for extensive data analysis acquiring large amounts of data.

When working with scripts, results are returned as Avro bytes that need to be converted to the desired record format. Currently we support converting to:

  • Avro GenericRecord objects
  • CMW ImmutableData objects

For the fill-related queries, results can be converted to native CERN Extraction API domain objects using the provided methods:

  • Fill toFill(FillData fillData)
  • List<Fill> toFillList(List<FillData> fillDataStream)

and back to Thin API domain objects using:

  • FillData toFillData(Fill fill)
  • List<FillData> toFillsData(List<Fill> fills)
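For instance, a FillData obtained from the Thin API fill service (see the fills example below) can be moved between the two representations. A minimal sketch, assuming the conversion methods above are statically imported (the class hosting them is not named on this page):

//Sketch: round-trip between the Thin API and Extraction API fill representations.
//Assumes static imports of the conversion methods listed above.
void convertRoundTrip(FillData fillData) {
    Fill fill = toFill(fillData);     //Thin API -> Extraction API domain object
    FillData back = toFillData(fill); //and back to the Thin API representation
}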

The Thin API is currently only supported from Java; in order to use it from Python, please resort to JPype.

Important

Please make sure you don't mix the jars from the normal Extraction API with the Thin client jars. Such a configuration will not work due to jar clashes between the two APIs. In order to use the Thin API, please import the nxcals-extraction-api-thin product only.

//Example scripts to be executed on the server. Variant using the key-values syntax:
String script = "DataQuery.builder(sparkSession)" +
        ".entities().system('CMW')" +
        ".keyValuesEq({'property': 'Acquisition', 'device': 'SPSBQMSPSv1'})" +
        ".timeWindow('2018-08-01 00:00:00.0', '2018-08-01 01:00:00.0').build()";

//Equivalent variant using the entity-based syntax:
String script = "DataQuery.builder(sparkSession)" +
        ".byEntities().system('CMW')" +
        ".startTime('2018-08-01 00:00:00.0').endTime('2018-08-01 01:00:00.0')" +
        ".entity().keyValue('property', 'Acquisition').keyValue('device', 'SPSBQMSPSv1').build()";

There are query builders provided in the Thin API that generate such typical basic query strings.
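For example, the ParameterDataQuery builder used in the extraction example below generates such a script string (a sketch; the exact text of the generated script may differ):

//Builds a script string along the lines of the first snippet above.
String script = ParameterDataQuery.builder().system("CMW")
        .parameterEq("SPSBQMSPSv1/Acquisition")
        .timeWindow("2018-08-01 00:00:00.0", "2018-08-01 01:00:00.0").build();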

Java 11

This is currently the only NXCALS API that supports Java 11. Please refer to the compatibility page for more information.

Authentication

The API uses RBAC authentication, so no Kerberos token is required. However, a valid RBAC token must be present in the environment, which means some method of RBAC login has to be executed before the Thin API is used.

An example of explicit RBAC login:

try {
    String user = ""; //obtain the user-name from somewhere
    String password = ""; //obtain the password from somewhere
    AuthenticationClient authenticationClient = AuthenticationClient.create();
    RbaToken token = authenticationClient.loginExplicit(user, password);
    ClientTierTokenHolder.setRbaToken(token);
} catch (AuthenticationException e) {
    throw new IllegalArgumentException("Cannot login", e);
}

For other methods of login, please refer to the RBAC documentation on the wikis.
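On trusted machines it may also be possible to log in by location instead of with explicit credentials. A minimal sketch, assuming the RBAC client offers location-based login in your setup:

try {
    AuthenticationClient authenticationClient = AuthenticationClient.create();
    //Assumption: location-based login is enabled for this host.
    RbaToken token = authenticationClient.loginLocation();
    ClientTierTokenHolder.setRbaToken(token);
} catch (AuthenticationException e) {
    throw new IllegalArgumentException("Cannot login", e);
}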

Example of use

Please find below an example of typical Thin API client calls for extracting some CMW data:

final String SPARK_SERVERS_URL = "nxcals-spark-thin-api-lb:14500,cs-ccr-nxcals5.cern.ch:15000,cs-ccr-nxcals6.cern.ch:15000,cs-ccr-nxcals7.cern.ch:15000,cs-ccr-nxcals8.cern.ch:15000";
//Create the service stub
ExtractionServiceGrpc.ExtractionServiceBlockingStub extractionService = ServiceFactory
        .createExtractionService(SPARK_SERVERS_URL);

String startTime = "2018-08-01 00:00:00.00";
String endTime = "2018-08-01 01:00:00.00";

//This is the meta-data query script (you can write your own, this is just a helper class).
String script = ParameterDataQuery.builder().system("CMW")
        .parameterEq("SPSBQMSPSv1/Acquisition")
        .timeWindow(startTime, endTime).build();

//Adding some custom operations to the generated script
script += ".select('acqStamp', 'bunchIntensities')";

//Build a query
AvroQuery avroQuery = AvroQuery.newBuilder()
        .setScript(script)
        .setCompression(AvroQuery.Codec.BZIP2) //optional bytes compression
        .build();

//Query for data (return type is just bytes that need to be converted to desired record type, currently Avro or CMW ImmutableData)
AvroData avroData = extractionService.query(avroQuery);
long nrOfRecordsInExampleDataset = avroData.getRecordCount();
//Convert to Avro GenericRecord
List<GenericRecord> records = Avro.records(avroData);
//Do something with the list of GenericRecord objects...
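//For example (a sketch): print the two fields selected above for each record;
//GenericRecord.get(String) is the standard Avro accessor.
records.forEach(r -> System.out.println(r.get("acqStamp") + " -> " + r.get("bunchIntensities")));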

//Convert to ImmutableData
List<ImmutableData> cmwRecords = Datax.records(avroData);
//Do something with the list of ImmutableData objects...
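//For example (a sketch): dump each record using its string representation.
cmwRecords.forEach(System.out::println);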

// Cleanup channel created during service initialization
((ManagedChannel) extractionService.getChannel()).shutdown();
The same extraction can be driven by a script generated with the DevicePropertyDataQuery builder instead; only the script construction differs from the example above:

//This is the meta-data query script (you can write your own, this is just a helper class).
String script = DevicePropertyDataQuery.builder().system("CMW")
        .startTime(startTime).endTime(endTime).entity()
        .parameter("SPSBQMSPSv1/Acquisition").build();

and an example for extraction of data related to fills:

final String SPARK_SERVERS_URL = "nxcals-spark-thin-api-lb:14500,cs-ccr-nxcals5.cern.ch:15000,cs-ccr-nxcals6.cern.ch:15000,cs-ccr-nxcals7.cern.ch:15000,cs-ccr-nxcals8.cern.ch:15000";

//Create the service stub
FillServiceGrpc.FillServiceBlockingStub fillService = ServiceFactory.createFillService(SPARK_SERVERS_URL);

long start = TimeUtils.getNanosFromString("2018-10-02 00:00:00.000000000");
long stop = TimeUtils.getNanosFromString("2018-10-04 00:00:00.000000000");

Iterator<FillData> fillsData = fillService.findFills(FillQueryByWindow.newBuilder()
        .setTimeWindow(TimeWindow.newBuilder().setStartTime(start).setEndTime(stop).build()).build());

fillsData.forEachRemaining(fillData -> printFillInfo(fillData));

// Cleanup channel created during service initialization
((ManagedChannel) fillService.getChannel()).shutdown();

including a helper method for printing the fill data:
private static void printFillInfo(FillData fillData) {
    StringBuilder fillInfo = new StringBuilder();
    StringJoiner beamModes = new StringJoiner(",");

    List<BeamMode> beamModesList = fillData.getBeamModesList();
    beamModesList.forEach(b -> beamModes.add(b.getBeamModeValue()));

    fillInfo.append("Fill nr: ").append(String.valueOf(fillData.getNumber()))
            .append(" Beam modes: ").append(beamModes);

    System.out.println(fillInfo.toString());
}

Expected application output:
Fill nr: 7245 Beam modes: INJPHYS,INJPROB,INJPHYS,PRERAMP,RAMP,FLATTOP,SQUEEZE,ADJUST,STABLE,BEAMDUMP,BEAMDUMP,RAMPDOWN
Fill nr: 7246 Beam modes: RAMPDOWN,SETUP,INJPROB,INJPHYS
Fill nr: 7247 Beam modes: INJPHYS,SETUP,INJPROB,SETUP,INJPROB,INJPHYS,INJPROB,INJPHYS
Fill nr: 7248 Beam modes: INJPHYS,SETUP,INJPROB,INJPHYS
Fill nr: 7249 Beam modes: INJPHYS,INJPROB,INJPHYS
Fill nr: 7250 Beam modes: INJPHYS,INJPROB,INJPHYS,BEAMDUMP,CYCLING
Fill nr: 7251 Beam modes: CYCLING,SETUP,INJPROB,INJPHYS
Fill nr: 7252 Beam modes: INJPHYS,INJPROB,INJPHYS,PRERAMP,RAMP,FLATTOP,SQUEEZE,ADJUST,STABLE,BEAMDUMP,RAMPDOWN
Fill nr: 7253 Beam modes: RAMPDOWN,SETUP,INJPROB,INJPHYS,PRERAMP,RAMP,FLATTOP,SQUEEZE,ADJUST,STABLE,BEAMDUMP,RAMPDOWN
Fill nr: 7254 Beam modes: RAMPDOWN,SETUP,CYCLING
Fill nr: 7255 Beam modes: CYCLING,SETUP,INJPROB,INJPHYS,INJPROB,INJPHYS

For the Spark Server addresses please use:

  • PRO: "nxcals-spark-thin-api-lb:14500,cs-ccr-nxcals5.cern.ch:15000,cs-ccr-nxcals6.cern.ch:15000,cs-ccr-nxcals7.cern.ch:15000,cs-ccr-nxcals8.cern.ch:15000"
  • TESTBED: "nxcals-spark-thin-api-testbed-lb:14500,cs-ccr-nxcalstbs2.cern.ch:15000,cs-ccr-nxcalstbs3.cern.ch:15000,cs-ccr-nxcalstbs4.cern.ch:15000"
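When switching environments, only the server list changes. A small sketch (the constant names and the environment variable are illustrative only, not part of the API):

//Illustrative only: choose the PRO or TESTBED server list at runtime.
String sparkServersUrl = "TESTBED".equalsIgnoreCase(System.getenv("NXCALS_ENV"))
        ? TESTBED_SERVERS_URL //hypothetical constant holding the TESTBED list above
        : PRO_SERVERS_URL;    //hypothetical constant holding the PRO list above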

A full working example of the Thin API usage can be seen in our Java examples project.

Javadoc can be found here.