Extraction Thin API
This API allows sending scripts, or performing specific queries (currently related to fills data), to an existing Spark Server. They are executed on the server side and the results are returned to the client.
Important
The purpose of this API is to limit the number of jars (and potential clashes) required when using Spark and to simplify usage for small queries. It is not meant to be called from SWAN (where one can use the provided pySpark) or to be used for extensive data analysis acquiring large amounts of data.
When working with scripts, results are returned as Avro bytes that need to be converted to the desired row format. Currently we support converting to:
- GenericRecord from Avro
- ImmutableData from CMW Datax
For the fill-related queries, results can be converted to native CERN Extraction API domain objects using the provided methods:

```java
Fill toFill(FillData fillData)
List<Fill> toFillList(List<FillData> fillDataStream)
```

and back to Thin API domain objects using:

```java
FillData toFillData(Fill fill)
List<FillData> toFillsData(List<Fill> fills)
```
The Thin API is currently supported from Java only; in order to use it from Python, please resort to JPype.
Important
Please make sure you don't mix the jars from the normal Extraction API with the Thin client jars. Such a configuration will not work due to jar clashes between the two APIs. In order to use the Thin API, please import the nxcals-extraction-api-thin product only.
```java
String script = "DataQuery.builder(sparkSession)" +
        ".entities().system('CMW')" +
        ".keyValuesEq({'property': 'Acquisition', 'device': 'SPSBQMSPSv1'})" +
        ".timeWindow('2018-08-01 00:00:00.0', '2018-08-01 01:00:00.0').build()";
```
```java
String script = "DataQuery.builder(sparkSession)" +
        ".byEntities().system('CMW')" +
        ".startTime('2018-08-01 00:00:00.0').endTime('2018-08-01 01:00:00.0')" +
        ".entity().keyValue('property', 'Acquisition').keyValue('device', 'SPSBQMSPSv1').build()";
```
Appropriate query builders are provided in the Thin API that will generate typical basic query strings such as these.
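Because the query script is sent as plain text and evaluated on the server, it can also be assembled by hand with ordinary string concatenation. A minimal sketch (the `buildScript` helper below is our own illustration, not part of the Thin API):

```java
public class ScriptBuildingExample {

    // Assembles a query script equivalent to the first DataQuery example
    // above, using plain string concatenation instead of a builder.
    public static String buildScript(String device, String property,
                                     String start, String end) {
        return "DataQuery.builder(sparkSession)"
                + ".entities().system('CMW')"
                + ".keyValuesEq({'property': '" + property
                + "', 'device': '" + device + "'})"
                + ".timeWindow('" + start + "', '" + end + "').build()";
    }

    public static void main(String[] args) {
        String script = buildScript("SPSBQMSPSv1", "Acquisition",
                "2018-08-01 00:00:00.0", "2018-08-01 01:00:00.0");
        System.out.println(script);
    }
}
```

Single quotes are used inside the script text so that the surrounding Java string can keep its double quotes unescaped.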
Java 11
This is currently the only API that supports Java 11. Please refer to the compatibility page for more information.
Authentication
The API uses RBAC authentication, so no Kerberos token is required. However, a valid RBAC token must be present in the environment, which means some method of RBAC login has to be executed before the Thin API is used.
An example of explicit RBAC login is shown here:
```java
try {
    String user = ""; // obtain the user-name from somewhere
    String password = ""; // obtain the password from somewhere
    AuthenticationClient authenticationClient = AuthenticationClient.create();
    RbaToken token = authenticationClient.loginExplicit(user, password);
    ClientTierTokenHolder.setRbaToken(token);
} catch (AuthenticationException e) {
    throw new IllegalArgumentException("Cannot login", e);
}
```
For other methods of login please refer to the RBAC documentation on the wikis.
Example of use
Please find below an example of the typical Thin API client calls for extracting some CMW data:
```java
final String SPARK_SERVERS_URL = "nxcals-spark-thin-api-lb:14500,cs-ccr-nxcals5.cern.ch:15000,cs-ccr-nxcals6.cern.ch:15000,cs-ccr-nxcals7.cern.ch:15000,cs-ccr-nxcals8.cern.ch:15000";

// Create the service stub
ExtractionServiceGrpc.ExtractionServiceBlockingStub extractionService = ServiceFactory
        .createExtractionService(SPARK_SERVERS_URL);

String startTime = "2018-08-01 00:00:00.00";
String endTime = "2018-08-01 01:00:00.00";

// This is the meta-data query script (you can write your own, this is just a helper class).
String script = ParameterDataQuery.builder().system("CMW")
        .parameterEq("SPSBQMSPSv1/Acquisition")
        .timeWindow(startTime, endTime).build();

// Adding some custom operations to the generated script
script += ".select('acqStamp',\"bunchIntensities\")";

// Build a query
AvroQuery avroQuery = AvroQuery.newBuilder()
        .setScript(script)
        .setCompression(AvroQuery.Codec.BZIP2) // optional bytes compression
        .build();

// Query for data (the return type is just bytes that need to be converted
// to the desired record type, currently Avro GenericRecord or CMW ImmutableData)
AvroData avroData = extractionService.query(avroQuery);
long nrOfRecordsInExampleDataset = avroData.getRecordCount();

// Convert to Avro GenericRecord
List<GenericRecord> records = Avro.records(avroData);
// Do something with the list of GenericRecord objects...

// Convert to ImmutableData
List<ImmutableData> cmwRecords = Datax.records(avroData);
// Do something with the list of ImmutableData objects...

// Cleanup the channel created during service initialization
((ManagedChannel) extractionService.getChannel()).shutdown();
```
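Since the generated script is an ordinary Java `String`, the `.select(...)` step above is simple concatenation. A standalone sketch (the `withSelect` helper is hypothetical, not part of the API):

```java
public class ScriptAppendExample {

    // Appends a Spark select(...) clause to an already generated query
    // script; the columns end up single-quoted inside the script text.
    public static String withSelect(String script, String... columns) {
        return script + ".select('" + String.join("','", columns) + "')";
    }

    public static void main(String[] args) {
        // Stand-in for a script produced by one of the query builders.
        String script = "DataQuery.builder(sparkSession).entities().build()";
        System.out.println(withSelect(script, "acqStamp", "bunchIntensities"));
    }
}
```

Any operation valid on a Spark Dataset in the server-side script (filters, projections, etc.) can be appended the same way.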
(Python version: work in progress, please use JPype.)
The same query can also be expressed with the DevicePropertyDataQuery builder:

```java
final String SPARK_SERVERS_URL = "nxcals-spark-thin-api-lb:14500,cs-ccr-nxcals5.cern.ch:15000,cs-ccr-nxcals6.cern.ch:15000,cs-ccr-nxcals7.cern.ch:15000,cs-ccr-nxcals8.cern.ch:15000";

// Create the service stub
ExtractionServiceGrpc.ExtractionServiceBlockingStub extractionService = ServiceFactory
        .createExtractionService(SPARK_SERVERS_URL);

String startTime = "2018-08-01 00:00:00.00";
String endTime = "2018-08-01 01:00:00.00";

// This is the meta-data query script (you can write your own, this is just a helper class).
String script = DevicePropertyDataQuery.builder().system("CMW")
        .startTime(startTime).endTime(endTime).entity()
        .parameter("SPSBQMSPSv1/Acquisition").build();

// Adding some custom operations to the generated script
script += ".select('acqStamp',\"bunchIntensities\")";

// Build a query
AvroQuery avroQuery = AvroQuery.newBuilder()
        .setScript(script)
        .setCompression(AvroQuery.Codec.BZIP2) // optional bytes compression
        .build();

// Query for data (the return type is just bytes that need to be converted
// to the desired record type, currently Avro GenericRecord or CMW ImmutableData)
AvroData avroData = extractionService.query(avroQuery);
long nrOfRecordsInExampleDataset = avroData.getRecordCount();

// Convert to Avro GenericRecord
List<GenericRecord> records = Avro.records(avroData);
// Do something with the list of GenericRecord objects...

// Convert to ImmutableData
List<ImmutableData> cmwRecords = Datax.records(avroData);
// Do something with the list of ImmutableData objects...

// Cleanup the channel created during service initialization
((ManagedChannel) extractionService.getChannel()).shutdown();
```
And here is an example of extracting data related to fills:
```java
final String SPARK_SERVERS_URL = "nxcals-spark-thin-api-lb:14500,cs-ccr-nxcals5.cern.ch:15000,cs-ccr-nxcals6.cern.ch:15000,cs-ccr-nxcals7.cern.ch:15000,cs-ccr-nxcals8.cern.ch:15000";

// Create the service stub
FillServiceGrpc.FillServiceBlockingStub fillService = ServiceFactory.createFillService(SPARK_SERVERS_URL);

long start = TimeUtils.getNanosFromString("2018-10-02 00:00:00.000000000");
long stop = TimeUtils.getNanosFromString("2018-10-04 00:00:00.00000000");

Iterator<FillData> fillsData = fillService.findFills(FillQueryByWindow.newBuilder()
        .setTimeWindow(TimeWindow.newBuilder().setStartTime(start).setEndTime(stop).build()).build());
fillsData.forEachRemaining(fillData -> printFillInfo(fillData));

// Cleanup the channel created during service initialization
((ManagedChannel) fillService.getChannel()).shutdown();

private static void printFillInfo(FillData fillData) {
    StringBuilder fillInfo = new StringBuilder();
    StringJoiner beamModes = new StringJoiner(",");
    List<BeamMode> beamModesList = fillData.getBeamModesList();
    beamModesList.forEach(b -> beamModes.add(b.getBeamModeValue()));
    fillInfo.append("Fill nr: ").append(String.valueOf(fillData.getNumber()))
            .append(" Beam modes: ").append(beamModes);
    System.out.println(fillInfo.toString());
}
```
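The `TimeUtils.getNanosFromString` helper used above converts a UTC timestamp string to nanoseconds since the epoch. An equivalent computation using only `java.time` is sketched below as an illustration; it is not the actual NXCALS implementation:

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;

public class TimestampNanosExample {

    // Accepts "yyyy-MM-dd HH:mm:ss" followed by an optional fraction of up
    // to nine digits, matching the timestamps in the fills example above.
    private static final DateTimeFormatter UTC_FORMAT = new DateTimeFormatterBuilder()
            .appendPattern("yyyy-MM-dd HH:mm:ss")
            .optionalStart()
            .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
            .optionalEnd()
            .toFormatter();

    // Converts a UTC timestamp string to nanoseconds since the epoch.
    public static long toNanos(String timestamp) {
        LocalDateTime dateTime = LocalDateTime.parse(timestamp, UTC_FORMAT);
        return dateTime.toEpochSecond(ZoneOffset.UTC) * 1_000_000_000L
                + dateTime.getNano();
    }

    public static void main(String[] args) {
        System.out.println(toNanos("2018-10-02 00:00:00.000000000"));
    }
}
```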
(Python version: work in progress, please use JPype.)
Expected application output:
```text
Fill nr: 7245 Beam modes: INJPHYS,INJPROB,INJPHYS,PRERAMP,RAMP,FLATTOP,SQUEEZE,ADJUST,STABLE,BEAMDUMP,BEAMDUMP,RAMPDOWN
Fill nr: 7246 Beam modes: RAMPDOWN,SETUP,INJPROB,INJPHYS
Fill nr: 7247 Beam modes: INJPHYS,SETUP,INJPROB,SETUP,INJPROB,INJPHYS,INJPROB,INJPHYS
Fill nr: 7248 Beam modes: INJPHYS,SETUP,INJPROB,INJPHYS
Fill nr: 7249 Beam modes: INJPHYS,INJPROB,INJPHYS
Fill nr: 7250 Beam modes: INJPHYS,INJPROB,INJPHYS,BEAMDUMP,CYCLING
Fill nr: 7251 Beam modes: CYCLING,SETUP,INJPROB,INJPHYS
Fill nr: 7252 Beam modes: INJPHYS,INJPROB,INJPHYS,PRERAMP,RAMP,FLATTOP,SQUEEZE,ADJUST,STABLE,BEAMDUMP,RAMPDOWN
Fill nr: 7253 Beam modes: RAMPDOWN,SETUP,INJPROB,INJPHYS,PRERAMP,RAMP,FLATTOP,SQUEEZE,ADJUST,STABLE,BEAMDUMP,RAMPDOWN
Fill nr: 7254 Beam modes: RAMPDOWN,SETUP,CYCLING
Fill nr: 7255 Beam modes: CYCLING,SETUP,INJPROB,INJPHYS,INJPROB,INJPHYS
```
For the Spark Server addresses, please use:
- PRO: "nxcals-spark-thin-api-lb:14500,cs-ccr-nxcals5.cern.ch:15000,cs-ccr-nxcals6.cern.ch:15000,cs-ccr-nxcals7.cern.ch:15000,cs-ccr-nxcals8.cern.ch:15000"
- TESTBED: "nxcals-spark-thin-api-testbed-lb:14500,cs-ccr-nxcalstbs2.cern.ch:15000,cs-ccr-nxcalstbs3.cern.ch:15000,cs-ccr-nxcalstbs4.cern.ch:15000"
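To avoid copying these long address lists around, they can be kept in one place and selected per environment. A trivial sketch (the `ThinApiServers` class and its `serversFor` method are our own illustration, not provided by the API; the address constants mirror the lists above):

```java
public class ThinApiServers {

    // PRO and TESTBED Spark Server address lists, as documented above.
    public static final String PRO =
            "nxcals-spark-thin-api-lb:14500,cs-ccr-nxcals5.cern.ch:15000,"
            + "cs-ccr-nxcals6.cern.ch:15000,cs-ccr-nxcals7.cern.ch:15000,"
            + "cs-ccr-nxcals8.cern.ch:15000";
    public static final String TESTBED =
            "nxcals-spark-thin-api-testbed-lb:14500,cs-ccr-nxcalstbs2.cern.ch:15000,"
            + "cs-ccr-nxcalstbs3.cern.ch:15000,cs-ccr-nxcalstbs4.cern.ch:15000";

    // Picks the address list for the given environment name,
    // defaulting to PRO for anything other than "testbed".
    public static String serversFor(String environment) {
        return "TESTBED".equalsIgnoreCase(environment) ? TESTBED : PRO;
    }

    public static void main(String[] args) {
        System.out.println(serversFor("pro"));
    }
}
```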
A full working example of the Thin API usage can be seen in our Java examples project. The Javadoc can be found here.