CERN Extraction API
At the moment, the CERN Extraction API contains the following services:
Fill Service
Service responsible for retrieving data related to LHC fills.
Available methods
Optional<Fill> findFill(int number)
List<Fill> findFills(Instant startTime, Instant endTime)
Examples
Find a fill using its number
The first step is to obtain an instance of FillService using a Spark session:
SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("MY_APP");
SparkSession session = SparkSession.builder().config(sparkConf).getOrCreate();
FillService fillService = Services.newInstance(session).fillService();
Once the session is established, the data can be retrieved:
Fill fill = fillService.findFill(3000)
.orElseThrow(() -> new IllegalArgumentException("No such fill"));
printFillInfo(fill);
The result can then be printed using the following code snippet:
private static void printFillInfo(Fill fill) {
StringBuilder outputMsg = new StringBuilder();
outputMsg.append("Fill: ").append(fill.getNumber());
outputMsg.append(", Validity: ").append(validityToString(fill.getValidity())).append("\n");
List<BeamMode> beamModes = fill.getBeamModes();
beamModes.forEach(b -> outputMsg.append(addBeamModeInfo(b)));
System.out.println(outputMsg);
}
private static String addBeamModeInfo(BeamMode beamMode) {
return "\tBeam mode name: " + beamMode.getBeamModeValue() + ", Validity: " + validityToString(beamMode.getValidity()) + "\n";
}
private static String validityToString(TimeWindow validity) {
return validity.getStartTime() + " " + validity.getEndTime();
}
Expected application output:
Fill: 3000, Validity: 2012-08-24T19:57:16.447Z 2012-08-25T00:06:59.143Z
Beam mode name: INJPHYS, Validity: 2012-08-24T19:57:16.447Z 2012-08-24T19:59:48.239Z
Beam mode name: INJPROB, Validity: 2012-08-24T19:59:48.239Z 2012-08-24T20:11:06.159Z
Beam mode name: INJPHYS, Validity: 2012-08-24T20:11:06.159Z 2012-08-24T20:41:19.970Z
Beam mode name: PRERAMP, Validity: 2012-08-24T20:41:19.970Z 2012-08-24T20:45:43.583Z
Beam mode name: RAMP, Validity: 2012-08-24T20:45:43.583Z 2012-08-24T20:59:00.208Z
Beam mode name: FLATTOP, Validity: 2012-08-24T20:59:00.208Z 2012-08-24T21:09:26.405Z
Beam mode name: SQUEEZE, Validity: 2012-08-24T21:09:26.405Z 2012-08-24T21:27:18.544Z
Beam mode name: ADJUST, Validity: 2012-08-24T21:27:18.544Z 2012-08-24T21:35:51.550Z
Beam mode name: STABLE, Validity: 2012-08-24T21:35:51.550Z 2012-08-25T00:00:44.951Z
Beam mode name: BEAMDUMP, Validity: 2012-08-25T00:00:44.951Z 2012-08-25T00:06:53.084Z
Beam mode name: RAMPDOWN, Validity: 2012-08-25T00:06:53.084Z 2012-08-25T00:06:59.143Z
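The validity windows printed above can be turned into durations with java.time. The following is a minimal standalone sketch: it works on plain Instant values copied from the output for fill 3000 rather than on the Fill and BeamMode types, and the class and method names are hypothetical, not part of the extraction API.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of the extraction API.
class BeamModeDurationSketch {

    // Length of a validity window given its start and end instants.
    static Duration validityLength(Instant start, Instant end) {
        return Duration.between(start, end);
    }

    public static void main(String[] args) {
        // STABLE beam mode of fill 3000, values copied from the output above.
        Instant start = Instant.parse("2012-08-24T21:35:51.550Z");
        Instant end = Instant.parse("2012-08-25T00:00:44.951Z");
        // Prints: STABLE lasted PT2H24M53.401S
        System.out.println("STABLE lasted " + validityLength(start, end));
    }
}
```

With the real types, the same call would presumably take validity.getStartTime() and validity.getEndTime(), as used in validityToString above.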
Find a list of fills within a time range
List<Fill> fills = fillService.findFills(
TimeUtils.getInstantFromString("2018-04-25 00:00:00.000000000"),
TimeUtils.getInstantFromString("2018-04-28 00:00:00.000000000")
);
fills.forEach(FillServiceSnippets::printFillInfo);
Expected application output:
Fill: 6611, Validity: 2018-04-24T16:57:54.689Z 2018-04-25T07:15:18.817Z
Beam mode name: CYCLING, Validity: 2018-04-24T16:57:54.689Z 2018-04-24T17:16:51.466Z
Beam mode name: SETUP, Validity: 2018-04-24T17:16:51.466Z 2018-04-24T18:56:27.642Z
Beam mode name: INJPROB, Validity: 2018-04-24T18:56:27.642Z 2018-04-24T19:13:20.554Z
Beam mode name: INJPHYS, Validity: 2018-04-24T19:13:20.554Z 2018-04-24T19:52:14.632Z
Beam mode name: PRERAMP, Validity: 2018-04-24T19:52:14.632Z 2018-04-24T19:54:30.026Z
Beam mode name: RAMP, Validity: 2018-04-24T19:54:30.026Z 2018-04-24T20:14:44.451Z
Beam mode name: FLATTOP, Validity: 2018-04-24T20:14:44.451Z 2018-04-24T20:17:40.211Z
Beam mode name: SQUEEZE, Validity: 2018-04-24T20:17:40.211Z 2018-04-24T20:28:38.170Z
Beam mode name: ADJUST, Validity: 2018-04-24T20:28:38.170Z 2018-04-24T20:35:27.441Z
Beam mode name: STABLE, Validity: 2018-04-24T20:35:27.441Z 2018-04-25T06:00:49.975Z
Beam mode name: ADJUST, Validity: 2018-04-25T06:00:49.975Z 2018-04-25T07:09:16.574Z
Beam mode name: BEAMDUMP, Validity: 2018-04-25T07:09:16.574Z 2018-04-25T07:10:50.770Z
Beam mode name: RAMPDOWN, Validity: 2018-04-25T07:10:50.770Z 2018-04-25T07:15:18.817Z
Fill: 6612, Validity: 2018-04-25T07:15:18.817Z 2018-04-25T20:58:43.338Z
Beam mode name: RAMPDOWN, Validity: 2018-04-25T07:15:18.817Z 2018-04-25T07:47:43.690Z
Beam mode name: SETUP, Validity: 2018-04-25T07:47:43.690Z 2018-04-25T08:24:19.236Z
Beam mode name: INJPROB, Validity: 2018-04-25T08:24:19.236Z 2018-04-25T08:55:44.730Z
Beam mode name: SETUP, Validity: 2018-04-25T08:55:44.730Z 2018-04-25T09:00:50.865Z
Beam mode name: SETUP, Validity: 2018-04-25T09:00:50.865Z 2018-04-25T09:06:37.199Z
Beam mode name: INJPROB, Validity: 2018-04-25T09:06:37.199Z 2018-04-25T10:17:40.710Z
Beam mode name: INJPHYS, Validity: 2018-04-25T10:17:40.710Z 2018-04-25T11:24:47.827Z
Beam mode name: PRERAMP, Validity: 2018-04-25T11:24:47.827Z 2018-04-25T11:29:46.379Z
Beam mode name: RAMP, Validity: 2018-04-25T11:29:46.379Z 2018-04-25T11:50:12.375Z
Beam mode name: FLATTOP, Validity: 2018-04-25T11:50:12.375Z 2018-04-25T11:57:10.353Z
Beam mode name: SQUEEZE, Validity: 2018-04-25T11:57:10.353Z 2018-04-25T12:08:05.002Z
Beam mode name: ADJUST, Validity: 2018-04-25T12:08:05.002Z 2018-04-25T12:16:59.279Z
Beam mode name: STABLE, Validity: 2018-04-25T12:16:59.279Z 2018-04-25T20:53:16.814Z
Beam mode name: BEAMDUMP, Validity: 2018-04-25T20:53:16.814Z 2018-04-25T20:54:39.637Z
Beam mode name: RAMPDOWN, Validity: 2018-04-25T20:54:39.637Z 2018-04-25T20:58:43.338Z
...
Aggregation Service
Service responsible for retrieving aggregated data.
Available methods
Dataset<Row> getData(Variable variable, WindowAggregationProperties properties);
Dataset<Row> getData(Entity entity, WindowAggregationProperties properties);
Dataset<Row> getData(Variable variable, DatasetAggregationProperties properties);
Dataset<Row> getData(Entity entity, DatasetAggregationProperties properties);
More information can be found in the Javadoc for AggregationService.
Available functions
A full list of available aggregation functions can be found in the Javadoc for aggregation functions.
Getting an instance of the service
We can obtain the service through the cern.nxcals.api.custom.service.Services gateway, configured with the cern.nxcals.api.config.SparkProperties utility, which is designed to simplify the creation of a Spark session:
AggregationService aggregationService = Services.newInstance(SparkProperties.defaults("MY_APP"))
.aggregationService();
Once we have an instance of the service, we can perform data aggregations as in the following examples.
Examples
Get average data for variable scaled on given intervals
Variable myVariable = variableService.findOne(Variables.suchThat().variableName().eq("CPS.TGM:CYCLE"))
.orElseThrow(() -> new IllegalArgumentException("Could not obtain variable from service"));
Instant startTime = TimeUtils.getInstantFromString("2020-04-25 00:00:00.000000000");
Instant endTime = TimeUtils.getInstantFromString("2020-04-27 00:00:00.000000000");
WindowAggregationProperties aggregationProperties = WindowAggregationProperties.builder()
.timeWindow(startTime, endTime)
.interval(8, ChronoUnit.HOURS)
.function((AggregationFunctions.AVG)).build();
Dataset<Row> dataset = aggregationService.getData(myVariable, aggregationProperties);
dataset.show();
As the above example shows, the aggregation process is controlled by the parameters provided to the WindowAggregationProperties builder. In this common case, we specify the time window for the variable data search and the aggregation interval (8 hours). Finally, we specify the function (average) that is applied to the data grouped inside each interval.
Expected application output:
+-------------------+------------------+
|          timestamp|             value|
+-------------------+------------------+
|1587772800000000000|10.502899951667473|
|1587801600000000000|10.497009244154432|
|1587830400000000000|10.501449975833737|
|1587859200000000000|10.498791540785499|
|1587888000000000000|              10.5|
|1587916800000000000|10.501661531025317|
+-------------------+------------------+
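To make the idea of "grouping data inside each interval" concrete, here is a minimal plain-Java sketch of window averaging. It is an illustration of the concept only, not the service's implementation: the class and method names are hypothetical, and treating the window as half-open (end excluded) is an assumption. Each result is keyed by the start of its interval, which matches the timestamps in the output above (8 hours apart, beginning at the window start).

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of fixed-interval averaging; not the NXCALS implementation.
class WindowAverageSketch {

    // Groups (timestamp, value) samples into fixed-length intervals starting at
    // windowStart and returns the mean per interval, keyed by interval start.
    // Assumption: the window is half-open, [windowStart, windowEnd).
    static TreeMap<Long, Double> averagePerInterval(Map<Long, Double> samples,
            long windowStart, long windowEnd, long intervalLength) {
        TreeMap<Long, double[]> sumAndCount = new TreeMap<>();
        for (Map.Entry<Long, Double> sample : samples.entrySet()) {
            long t = sample.getKey();
            if (t < windowStart || t >= windowEnd) {
                continue; // sample lies outside the requested window
            }
            long bucketStart = windowStart + ((t - windowStart) / intervalLength) * intervalLength;
            double[] acc = sumAndCount.computeIfAbsent(bucketStart, k -> new double[2]);
            acc[0] += sample.getValue(); // running sum
            acc[1] += 1;                 // running count
        }
        TreeMap<Long, Double> result = new TreeMap<>();
        sumAndCount.forEach((start, acc) -> result.put(start, acc[0] / acc[1]));
        return result;
    }

    public static void main(String[] args) {
        Map<Long, Double> samples = new TreeMap<>();
        samples.put(1L, 10.0);
        samples.put(5L, 12.0);
        samples.put(9L, 20.0);
        samples.put(15L, 4.0);
        // Window [0, 16) split into intervals of length 8.
        System.out.println(averagePerInterval(samples, 0, 16, 8)); // {0=11.0, 8=12.0}
    }
}
```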
Get max data for entity scaled on given intervals
Window aggregations for entities are very similar to those for variables. However, because an entity query results in a dataframe that contains multiple data columns, we need to specify explicitly which one to use for the aggregation. We therefore provide the target field name via the aggregationField() method of the WindowAggregationProperties builder.
Map<String, Object> keyValues = ImmutableMap
.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC");
Entity myEntity = entityService.findOne(Entities.suchThat().keyValues().eq(cmwSystemSpec, keyValues))
.orElseThrow(() -> new IllegalArgumentException("Could not obtain entity from service"));
Instant startTime = TimeUtils.getInstantFromString("2020-04-10 00:00:00.000000000");
Instant endTime = TimeUtils.getInstantFromString("2020-04-14 00:00:10.000000000");
WindowAggregationProperties aggregationProperties = WindowAggregationProperties.builder()
.timeWindow(startTime, endTime)
.interval(1, ChronoUnit.DAYS)
.function((AggregationFunctions.MAX))
.aggregationField("CYCLE")
.build();
Dataset<Row> dataset = aggregationService.getData(myEntity, aggregationProperties);
dataset.show();
Expected application output:
+-------------------+-----+
| timestamp|value|
+-------------------+-----+
|1586476800000000000| 20|
|1586563200000000000| 20|
|1586649600000000000| 20|
|1586736000000000000| 20|
|1586822400000000000| 7|
+-------------------+-----+
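The timestamp column in these outputs appears to hold nanoseconds since the Unix epoch: the first row above corresponds to the requested start time. A small standalone sketch for converting such values back to a readable Instant (the class and method names are hypothetical):

```java
import java.time.Instant;

// Hypothetical helper for reading the timestamp column; not part of the API.
class NanosTimestampSketch {

    private static final long NANOS_PER_SECOND = 1_000_000_000L;

    // Converts nanoseconds since the Unix epoch to an Instant.
    static Instant fromEpochNanos(long nanos) {
        return Instant.ofEpochSecond(
                Math.floorDiv(nanos, NANOS_PER_SECOND),
                Math.floorMod(nanos, NANOS_PER_SECOND));
    }

    public static void main(String[] args) {
        System.out.println(fromEpochNanos(1586476800000000000L)); // 2020-04-10T00:00:00Z
        System.out.println(fromEpochNanos(1586822400000000000L)); // 2020-04-14T00:00:00Z
    }
}
```

The last row corresponds to 2020-04-14T00:00:00Z. Since the requested window ends ten seconds later, that final interval presumably covers only those ten seconds.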
Get repeated data for variable scaled on given intervals
Variable myVariable = variableService.findOne(Variables.suchThat().variableName().eq("CPS.TGM:CYCLE"))
.orElseThrow(() -> new IllegalArgumentException("Could not obtain variable from service"));
Instant startTime = TimeUtils.getInstantFromString("2020-04-25 00:00:00.000000000");
Instant endTime = TimeUtils.getInstantFromString("2020-04-25 00:05:00.000000000");
WindowAggregationProperties aggregationProperties = WindowAggregationProperties.builder()
.timeWindow(startTime, endTime)
.interval(20, ChronoUnit.SECONDS)
.function((AggregationFunctions.REPEAT)).build();
Dataset<Row> dataset = aggregationService.getData(myVariable, aggregationProperties);
dataset.show();
Expected application output:
+-------------------+-----+
| timestamp|value|
+-------------------+-----+
|1587772800000000000| 8|
|1587772820000000000| 20|
|1587772840000000000| 10|
|1587772860000000000| 2|
|1587772880000000000| 12|
|1587772900000000000| 6|
|1587772920000000000| 16|
|1587772940000000000| 9|
|1587772960000000000| 20|
|1587772980000000000| 11|
|1587773000000000000| 3|
|1587773020000000000| 12|
|1587773040000000000| 7|
|1587773060000000000| 17|
|1587773080000000000| 9|
+-------------------+-----+
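As an illustration of what "repeating" a value onto generated intervals can mean, the sketch below carries the most recent sample forward to each interval start. This is one plausible reading, stated as an assumption and not taken from the service's implementation; the class and method names are hypothetical. It also shows why a sample from before the window start may be needed for the first interval.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of a "repeat last value" strategy; not the NXCALS implementation.
class RepeatSketch {

    // For each interval start in [windowStart, windowEnd), takes the most recent
    // sample at or before that instant. Interval starts with no earlier sample are skipped.
    static TreeMap<Long, Integer> repeat(TreeMap<Long, Integer> samples,
            long windowStart, long windowEnd, long intervalLength) {
        TreeMap<Long, Integer> result = new TreeMap<>();
        for (long t = windowStart; t < windowEnd; t += intervalLength) {
            Map.Entry<Long, Integer> last = samples.floorEntry(t);
            if (last != null) {
                result.put(t, last.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        TreeMap<Long, Integer> samples = new TreeMap<>();
        samples.put(-3L, 8);  // recorded before the window start
        samples.put(12L, 20);
        samples.put(25L, 10);
        // Window [0, 60) with intervals of length 20.
        System.out.println(repeat(samples, 0, 60, 20)); // {0=8, 20=20, 40=10}
    }
}
```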
Get interpolated data for entity scaled on given intervals
The following example demonstrates how we can interpolate data on generated intervals. Interpolation is a special function, as it requires data to be present both before and after the calculated data point. To achieve that, this function is configured to perform a data lookup beyond the specified time window. By default, the lookup extends the time window by up to 1 week on both sides. If the default values are not enough for a given use case, you can extend the range by specifying custom expand parameters via the expandTimeWindowBy method.
Map<String, Object> keyValues = ImmutableMap
.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC");
Entity myEntity = entityService.findOne(Entities.suchThat().keyValues().eq(cmwSystemSpec, keyValues))
.orElseThrow(() -> new IllegalArgumentException("Could not obtain entity from service"));
Instant startTime = TimeUtils.getInstantFromString("2020-02-10 00:00:00.000000000");
Instant endTime = TimeUtils.getInstantFromString("2020-08-10 00:00:00.000000000");
WindowAggregationProperties aggregationProperties = WindowAggregationProperties.builder()
.timeWindow(startTime, endTime)
.interval(2, ChronoUnit.MONTHS)
.function((AggregationFunctions.INTERPOLATE.expandTimeWindowBy(1, 0, ChronoUnit.MONTHS)))
.aggregationField("CYCLE")
.build();
Dataset<Row> dataset = aggregationService.getData(myEntity, aggregationProperties);
dataset.show();
Expected application output:
+-------------------+------------------+
| timestamp| value|
+-------------------+------------------+
|1581292800000000000| 4.356387673842736|
|1586552292000000000| 7.472222222222222|
|1591811784000000000|17.708333333333332|
+-------------------+------------------+
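Note that the timestamps above do not fall on calendar month boundaries. They are consistent with the 2-month interval being treated as a fixed duration, using the estimated length that java.time assigns to ChronoUnit.MONTHS (2629746 seconds). Whether the service computes its intervals exactly this way is an assumption; the sketch below, with hypothetical class and method names, only reproduces the arithmetic.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Hypothetical illustration of fixed-duration intervals; not the NXCALS implementation.
class MonthIntervalSketch {

    // Start of the n-th interval when "amount units" is treated as a fixed
    // Duration based on the unit's estimated length.
    static Instant intervalStart(Instant windowStart, long amount, ChronoUnit unit, int n) {
        Duration step = unit.getDuration().multipliedBy(amount);
        return windowStart.plus(step.multipliedBy(n));
    }

    // Nanoseconds since the Unix epoch for the given instant.
    static long toEpochNanos(Instant instant) {
        return instant.getEpochSecond() * 1_000_000_000L + instant.getNano();
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2020-02-10T00:00:00Z");
        for (int n = 0; n < 3; n++) {
            // Prints 1581292800000000000, 1586552292000000000, 1591811784000000000
            System.out.println(toEpochNanos(intervalStart(start, 2, ChronoUnit.MONTHS, n)));
        }
    }
}
```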
Important
The REPEAT and INTERPOLATE functions perform a default data lookup beyond the range of the provided time window. REPEAT looks up data before the start of the window; INTERPOLATE does the same and additionally looks up data after the end. The default lookup on each side is one (1) week. It is configurable via the expandTimeWindowBy method.
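The need for data on both sides of a calculated point can be seen in a small sketch. It assumes linear interpolation between the nearest neighbouring samples, which this page does not state explicitly, and uses hypothetical class and method names. When either neighbour is missing, no value can be produced, which is why the lookup range may have to be extended.

```java
import java.util.Map;
import java.util.OptionalDouble;
import java.util.TreeMap;

// Hypothetical illustration assuming linear interpolation; not the NXCALS implementation.
class InterpolateSketch {

    // Interpolates linearly between the nearest samples at or before and at or
    // after t. Returns empty when a neighbour on either side is missing.
    static OptionalDouble interpolateAt(TreeMap<Long, Double> samples, long t) {
        Map.Entry<Long, Double> before = samples.floorEntry(t);
        Map.Entry<Long, Double> after = samples.ceilingEntry(t);
        if (before == null || after == null) {
            return OptionalDouble.empty();
        }
        if (before.getKey().equals(after.getKey())) {
            return OptionalDouble.of(before.getValue()); // a sample exists exactly at t
        }
        double fraction = (double) (t - before.getKey()) / (after.getKey() - before.getKey());
        return OptionalDouble.of(before.getValue() + fraction * (after.getValue() - before.getValue()));
    }

    public static void main(String[] args) {
        TreeMap<Long, Double> samples = new TreeMap<>();
        samples.put(0L, 4.0);
        samples.put(10L, 8.0);
        System.out.println(interpolateAt(samples, 5));  // OptionalDouble[6.0]
        System.out.println(interpolateAt(samples, 12)); // OptionalDouble.empty
    }
}
```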
Get data for variable aligned on provided dataset
Variable drivingVariable = variableService.findOne(Variables.suchThat().variableName().eq("HX:FILLN"))
.orElseThrow(() -> new IllegalArgumentException("Could not obtain variable from service"));
Instant startTime = TimeUtils.getInstantFromString("2016-03-01 00:00:00.000000000");
Instant endTime = TimeUtils.getInstantFromString("2016-05-01 00:00:00.000000000");
Dataset<Row> drivingDataset = DataQuery
.getFor(sparkSession, TimeWindow.between(startTime, endTime), drivingVariable);
Variable myVariable = variableService.findOne(Variables.suchThat().variableName().eq("CPS.TGM:CYCLE"))
.orElseThrow(() -> new IllegalArgumentException("Could not obtain variable from service"));
DatasetAggregationProperties aggregationProperties = DatasetAggregationProperties.builder()
.drivingDataset(drivingDataset).build();
Dataset<Row> dataset = aggregationService.getData(myVariable, aggregationProperties);
dataset.show();
Expected application output:
+-------------------+-----+
| timestamp|value|
+-------------------+-----+
|1457613630355000000| 20|
|1458524879726000000| 20|
|1458932691946000000| 21|
|1458984392272000000| 14|
|1458996045140000000| 12|
|1459062900019000000| 16|
|1459114836147000000| 10|
|1459123375668000000| 18|
|1459135498154000000| 14|
|1459171936969000000| 11|
|1459213975163000000| 3|
|1459236680271000000| 20|
|1459286996157000000| 26|
|1459312886723000000| 2|
|1459372973278000000| 2|
|1459384868421000000| 1|
|1459442811801000000| 21|
|1459448882707000000| 14|
|1459450875860000000| 21|
|1459454413582000000| 4|
+-------------------+-----+
only showing top 20 rows
Get data for entity aligned on provided dataset
Aligning entity data on a dataset is very similar to aligning variable data. However, because an entity query results in a dataset that contains multiple data columns, we need to specify explicitly which one to use for the aggregation. We therefore provide the target field name via the aggregationField() method of the DatasetAggregationProperties builder.
Map<String, Object> keyValues = ImmutableMap
.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC");
Entity myEntity = entityService.findOne(Entities.suchThat().keyValues().eq(cmwSystemSpec, keyValues))
.orElseThrow(() -> new IllegalArgumentException("Could not obtain entity from service"));
DatasetAggregationProperties aggregationProperties = DatasetAggregationProperties.builder()
.drivingDataset(drivingDataset)
.aggregationField("CYCLE")
.build();
Dataset<Row> dataset = aggregationService.getData(myEntity, aggregationProperties);
dataset.show();
Expected application output:
+-------------------+-----+
| timestamp|value|
+-------------------+-----+
|1457613630355000000| 20|
|1458524879726000000| 20|
|1458932691946000000| 21|
|1458984392272000000| 14|
|1458996045140000000| 12|
|1459062900019000000| 16|
|1459114836147000000| 10|
|1459123375668000000| 18|
|1459135498154000000| 14|
|1459171936969000000| 11|
|1459213975163000000| 3|
|1459236680271000000| 20|
|1459286996157000000| 26|
|1459312886723000000| 2|
|1459372973278000000| 2|
|1459384868421000000| 1|
|1459442811801000000| 21|
|1459448882707000000| 14|
|1459450875860000000| 21|
|1459454413582000000| 4|
+-------------------+-----+
only showing top 20 rows
Important
Any dataset implementation can be used as the driving dataset for the alignment operation. By default, the aggregation service expects the NXCALS extraction timestamp column (nxcals_timestamp). If the provided dataset has a different or custom timestamp column, you can specify it on the AggregationProperties builder via the timestampField() method.