CERN Extraction API
At the moment, the CERN Extraction API contains the following services:
Warning
Running Python code for this API requires a specific setup as described here.
Fill Service
Service responsible for retrieving data related to LHC fills.
Available methods
Optional<Fill> findFill(int number)
List<Fill> findFills(Instant startTime, Instant endTime)
Examples
Find a fill using its number
The first step consists of obtaining an instance of FillService using a Spark session:
SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("MY_APP");
SparkSession session = SparkSession.builder().config(sparkConf).getOrCreate();
FillService fillService = Services.newInstance(session).fillService();
from nxcals.spark_session_builder import get_or_create # on SWAN, create the Spark session from the UI (star icon)
spark = get_or_create()
Services = spark._jvm.cern.nxcals.api.custom.service.Services
fillService = Services.newInstance(spark._jsparkSession).fillService()
from cern.nxcals.api.custom.service import Services
from org.apache.spark import SparkConf
from org.apache.spark.sql import SparkSession
sparkConf = SparkConf().setMaster("local[*]").setAppName("MY_APP")
session = SparkSession.builder().config(sparkConf).getOrCreate()
fillService = Services.newInstance(session).fillService()
Once the session is established, data can be retrieved:
Fill fill = fillService.findFill(3000)
.orElseThrow(() -> new IllegalArgumentException("No such fill"));
Utilities.printFillInfo(fill);
fill = fillService.findFill(3000)
if fill.isEmpty():
    raise ValueError("No such fill")
Utilities.printFillInfo(fill.get())
fill = fillService.findFill(3000)
if fill.isEmpty():
    raise ValueError("No such fill")
Utilities.printFillInfo(fill.get())
The result can then be printed with the following helper class:
class Utilities {
    // Not private, so that it can be called as Utilities.printFillInfo(...) or Utilities::printFillInfo
    static void printFillInfo(Fill fill) {
        StringBuilder outputMsg = new StringBuilder();
        outputMsg.append("Fill: ").append(fill.getNumber());
        outputMsg.append(", Validity: ").append(validityToString(fill.getValidity())).append("\n");
        // Append one line per beam mode of the fill
        List<BeamMode> beamModes = fill.getBeamModes();
        beamModes.forEach(b -> outputMsg.append(addBeamModeInfo(b)));
        System.out.println(outputMsg);
    }

    private static String addBeamModeInfo(BeamMode beamMode) {
        return "\tBeam mode name: " + beamMode.getBeamModeValue()
                + ", Validity: " + validityToString(beamMode.getValidity()) + "\n";
    }

    private static String validityToString(TimeWindow validity) {
        return validity.getStartTime() + " " + validity.getEndTime();
    }
}
class Utilities:
    @staticmethod
    def printFillInfo(fill):
        outputMsg = "Fill: " + str(fill.getNumber())
        outputMsg += ", Validity: " + Utilities.__validityToString(fill.getValidity()) + "\n"
        # Append one line per beam mode of the fill
        for beamMode in fill.getBeamModes():
            outputMsg += Utilities.__addBeamModeInfo(beamMode)
        print(outputMsg)

    @staticmethod
    def __addBeamModeInfo(beamMode):
        return ("\tBeam mode name: " + str(beamMode.getBeamModeValue())
                + ", Validity: " + Utilities.__validityToString(beamMode.getValidity()) + "\n")

    @staticmethod
    def __validityToString(validity):
        # str() converts the Java Instant (via its toString()) into a Python string
        return str(validity.getStartTime()) + " " + str(validity.getEndTime())
Expected application output:
Fill: 3000, Validity: 2012-08-24T19:57:16.447Z 2012-08-25T00:06:59.143Z
Beam mode name: INJPHYS, Validity: 2012-08-24T19:57:16.447Z 2012-08-24T19:59:48.239Z
Beam mode name: INJPROB, Validity: 2012-08-24T19:59:48.239Z 2012-08-24T20:11:06.159Z
Beam mode name: INJPHYS, Validity: 2012-08-24T20:11:06.159Z 2012-08-24T20:41:19.970Z
Beam mode name: PRERAMP, Validity: 2012-08-24T20:41:19.970Z 2012-08-24T20:45:43.583Z
Beam mode name: RAMP, Validity: 2012-08-24T20:45:43.583Z 2012-08-24T20:59:00.208Z
Beam mode name: FLATTOP, Validity: 2012-08-24T20:59:00.208Z 2012-08-24T21:09:26.405Z
Beam mode name: SQUEEZE, Validity: 2012-08-24T21:09:26.405Z 2012-08-24T21:27:18.544Z
Beam mode name: ADJUST, Validity: 2012-08-24T21:27:18.544Z 2012-08-24T21:35:51.550Z
Beam mode name: STABLE, Validity: 2012-08-24T21:35:51.550Z 2012-08-25T00:00:44.951Z
Beam mode name: BEAMDUMP, Validity: 2012-08-25T00:00:44.951Z 2012-08-25T00:06:53.084Z
Beam mode name: RAMPDOWN, Validity: 2012-08-25T00:06:53.084Z 2012-08-25T00:06:59.143Z
Find a list of fills within a time range
List<Fill> fills = fillService.findFills(
TimeUtils.getInstantFromString("2018-04-25 00:00:00.000000000"),
TimeUtils.getInstantFromString("2018-04-28 00:00:00.000000000")
);
fills.forEach(Utilities::printFillInfo);
TimeUtils = spark._jvm.cern.nxcals.api.utils.TimeUtils
fills = fillService.findFills(TimeUtils.getInstantFromString("2018-04-25 00:00:00.000000000"),
TimeUtils.getInstantFromString("2018-04-28 00:00:00.000000000"))
for fill in fills:
    Utilities.printFillInfo(fill)
from cern.nxcals.api.utils import TimeUtils
fills = fillService.findFills(TimeUtils.getInstantFromString("2018-04-25 00:00:00.000000000"),
TimeUtils.getInstantFromString("2018-04-28 00:00:00.000000000"))
for fill in fills:
    Utilities.printFillInfo(fill)
Expected application output:
Fill: 6611, Validity: 2018-04-24T16:57:54.689Z 2018-04-25T07:15:18.817Z
Beam mode name: CYCLING, Validity: 2018-04-24T16:57:54.689Z 2018-04-24T17:16:51.466Z
Beam mode name: SETUP, Validity: 2018-04-24T17:16:51.466Z 2018-04-24T18:56:27.642Z
Beam mode name: INJPROB, Validity: 2018-04-24T18:56:27.642Z 2018-04-24T19:13:20.554Z
Beam mode name: INJPHYS, Validity: 2018-04-24T19:13:20.554Z 2018-04-24T19:52:14.632Z
Beam mode name: PRERAMP, Validity: 2018-04-24T19:52:14.632Z 2018-04-24T19:54:30.026Z
Beam mode name: RAMP, Validity: 2018-04-24T19:54:30.026Z 2018-04-24T20:14:44.451Z
Beam mode name: FLATTOP, Validity: 2018-04-24T20:14:44.451Z 2018-04-24T20:17:40.211Z
Beam mode name: SQUEEZE, Validity: 2018-04-24T20:17:40.211Z 2018-04-24T20:28:38.170Z
Beam mode name: ADJUST, Validity: 2018-04-24T20:28:38.170Z 2018-04-24T20:35:27.441Z
Beam mode name: STABLE, Validity: 2018-04-24T20:35:27.441Z 2018-04-25T06:00:49.975Z
Beam mode name: ADJUST, Validity: 2018-04-25T06:00:49.975Z 2018-04-25T07:09:16.574Z
Beam mode name: BEAMDUMP, Validity: 2018-04-25T07:09:16.574Z 2018-04-25T07:10:50.770Z
Beam mode name: RAMPDOWN, Validity: 2018-04-25T07:10:50.770Z 2018-04-25T07:15:18.817Z
Fill: 6612, Validity: 2018-04-25T07:15:18.817Z 2018-04-25T20:58:43.338Z
Beam mode name: RAMPDOWN, Validity: 2018-04-25T07:15:18.817Z 2018-04-25T07:47:43.690Z
Beam mode name: SETUP, Validity: 2018-04-25T07:47:43.690Z 2018-04-25T08:24:19.236Z
Beam mode name: INJPROB, Validity: 2018-04-25T08:24:19.236Z 2018-04-25T08:55:44.730Z
Beam mode name: SETUP, Validity: 2018-04-25T08:55:44.730Z 2018-04-25T09:00:50.865Z
Beam mode name: SETUP, Validity: 2018-04-25T09:00:50.865Z 2018-04-25T09:06:37.199Z
Beam mode name: INJPROB, Validity: 2018-04-25T09:06:37.199Z 2018-04-25T10:17:40.710Z
Beam mode name: INJPHYS, Validity: 2018-04-25T10:17:40.710Z 2018-04-25T11:24:47.827Z
Beam mode name: PRERAMP, Validity: 2018-04-25T11:24:47.827Z 2018-04-25T11:29:46.379Z
Beam mode name: RAMP, Validity: 2018-04-25T11:29:46.379Z 2018-04-25T11:50:12.375Z
Beam mode name: FLATTOP, Validity: 2018-04-25T11:50:12.375Z 2018-04-25T11:57:10.353Z
Beam mode name: SQUEEZE, Validity: 2018-04-25T11:57:10.353Z 2018-04-25T12:08:05.002Z
Beam mode name: ADJUST, Validity: 2018-04-25T12:08:05.002Z 2018-04-25T12:16:59.279Z
Beam mode name: STABLE, Validity: 2018-04-25T12:16:59.279Z 2018-04-25T20:53:16.814Z
Beam mode name: BEAMDUMP, Validity: 2018-04-25T20:53:16.814Z 2018-04-25T20:54:39.637Z
Beam mode name: RAMPDOWN, Validity: 2018-04-25T20:54:39.637Z 2018-04-25T20:58:43.338Z
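In the output above, fill 6611 is returned even though it started on 2018-04-24, before the requested start of 2018-04-25. This suggests that findFills returns every fill whose validity overlaps the requested range, not only fills fully contained in it. The plain-Python sketch below illustrates such an overlap test. FillWindow, utc and overlapping_fills are hypothetical helpers, not part of the NXCALS API, and the exact boundary handling of the real service is an assumption.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class FillWindow:
    """Hypothetical stand-in for a fill number and its validity window."""
    number: int
    start: datetime
    end: datetime


def utc(text: str) -> datetime:
    """Parse an ISO-8601 timestamp without zone information as UTC."""
    return datetime.fromisoformat(text).replace(tzinfo=timezone.utc)


def overlapping_fills(fills, range_start, range_end):
    """Keep fills that start before the range ends and end after it starts (assumed semantics)."""
    return [f for f in fills if f.start < range_end and f.end > range_start]


# Validity windows copied from the expected output above
fills = [
    FillWindow(6611, utc("2018-04-24T16:57:54.689"), utc("2018-04-25T07:15:18.817")),
    FillWindow(6612, utc("2018-04-25T07:15:18.817"), utc("2018-04-25T20:58:43.338")),
]

selected = overlapping_fills(fills, utc("2018-04-25T00:00:00"), utc("2018-04-28T00:00:00"))
print([f.number for f in selected])  # [6611, 6612]
```

With this test, a range that ends before a fill starts, or starts after it ends, excludes that fill.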
Aggregation Service
Service responsible for retrieving aggregated data.
Available methods
Dataset<Row> getData(Variable variable, WindowAggregationProperties properties);
Dataset<Row> getData(Entity entity, WindowAggregationProperties properties);
Dataset<Row> getData(Variable variable, DatasetAggregationProperties properties);
Dataset<Row> getData(Entity entity, DatasetAggregationProperties properties);
More information can be found in the Javadoc for AggregationService.
Available functions
A full list of available aggregation functions can be found in the Javadoc for aggregation functions.
Getting an instance of the service
An instance can be obtained through the cern.nxcals.api.custom.service.Services gateway, configured with the cern.nxcals.api.config.SparkProperties utility, which is designed to simplify the creation of a Spark session:
AggregationService aggregationService = Services.newInstance(SparkProperties.defaults("MY_APP"))
.aggregationService();
from nxcals.spark_session_builder import get_or_create # on SWAN, create the Spark session from the UI (star icon)
spark = get_or_create()
Services = spark._jvm.cern.nxcals.api.custom.service.Services
aggregationService = Services.newInstance(spark._jsparkSession).aggregationService()
from cern.nxcals.api.custom.service import Services
from cern.nxcals.api.config import SparkProperties
aggregationService = Services.newInstance(SparkProperties.defaults("MY_APP")) \
.aggregationService()
Once we have an instance of the service, we can perform data aggregations as shown in the following examples.
Examples
Get average data for variable scaled on given intervals
VariableService variableService = ServiceClientFactory.createVariableService();
Variable myVariable = variableService.findOne(Variables.suchThat()
.variableName().eq("CPS.TGM:CYCLE"))
.orElseThrow(() -> new IllegalArgumentException("Could not obtain variable from service"));
Instant startTime = TimeUtils.getInstantFromString("2020-04-25 00:00:00.000000000");
Instant endTime = TimeUtils.getInstantFromString("2020-04-27 00:00:00.000000000");
WindowAggregationProperties properties = WindowAggregationProperties.builder()
.timeWindow(startTime, endTime)
.interval(8, ChronoUnit.HOURS)
.function(AggregationFunctions.AVG).build();
Dataset<Row> dataset = aggregationService.getData(myVariable, properties);
dataset.show();
ServiceClientFactory = spark._jvm.cern.nxcals.api.extraction.metadata.ServiceClientFactory
Variables = spark._jvm.cern.nxcals.api.extraction.metadata.queries.Variables
TimeUtils = spark._jvm.cern.nxcals.api.utils.TimeUtils
ChronoUnit = spark._jvm.java.time.temporal.ChronoUnit
AggregationFunctions = spark._jvm.cern.nxcals.api.custom.service.aggregation.AggregationFunctions
WindowAggregationProperties = spark._jvm.cern.nxcals.api.custom.service.aggregation.WindowAggregationProperties
variableService = ServiceClientFactory.createVariableService()
myVariable = variableService.findOne(Variables.suchThat().variableName().eq("CPS.TGM:CYCLE"))
if myVariable.isEmpty():
    raise ValueError("Could not obtain variable from service")
startTime = TimeUtils.getInstantFromString("2020-04-25 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-27 00:00:00.000000000")
properties = WindowAggregationProperties.builder().timeWindow(startTime, endTime) \
.interval(8, ChronoUnit.HOURS).function(AggregationFunctions.AVG).build()
print(properties)
dataset = aggregationService.getData(myVariable.get(), properties)
dataset.show()
from cern.nxcals.api.extraction.metadata import ServiceClientFactory
from cern.nxcals.api.extraction.metadata.queries import Variables
from cern.nxcals.api.utils import TimeUtils
from java.time.temporal import ChronoUnit
from cern.nxcals.api.custom.service.aggregation import AggregationFunctions
from cern.nxcals.api.custom.service.aggregation import WindowAggregationProperties
variableService = ServiceClientFactory.createVariableService()
myVariable = variableService.findOne(Variables.suchThat().variableName().eq("CPS.TGM:CYCLE"))
if myVariable.isEmpty():
    raise ValueError("Could not obtain variable from service")
startTime = TimeUtils.getInstantFromString("2020-04-25 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-27 00:00:00.000000000")
properties = WindowAggregationProperties.builder().timeWindow(startTime, endTime) \
.interval(8, ChronoUnit.HOURS).function(AggregationFunctions.AVG).build()
print(properties)
dataset = aggregationService.getData(myVariable.get(), properties)
dataset.show()
As shown in the example above, the aggregation process is controlled by the parameters provided to the WindowAggregationProperties builder. In this common case, we specify the time window for the variable data search and the aggregation interval (8 hours). Finally, we specify the function (average) that is applied to the data grouped inside each interval.
Expected application output:
+-------------------+------------------+
| timestamp| value|
+-------------------+------------------+
|1587772800000000000|10.502899951667473|
|1587801600000000000|10.497009244154432|
|1587830400000000000|10.501449975833737|
|1587859200000000000|10.498791540785499|
|1587888000000000000|10.5 |
|1587916800000000000|10.501661531025317|
+-------------------+------------------+
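The grouping described above can be sketched in plain Python. The timestamps in the output are nanoseconds since the epoch: the first row, 1587772800000000000, is the window start (2020-04-25 00:00:00 UTC), and consecutive rows are 28800 s (8 hours) apart, so each interval appears to be labelled with its start time. window_aggregate is a hypothetical helper for illustration only; how the service treats empty intervals and interval boundaries is not covered by this sketch.

```python
from collections import defaultdict
from statistics import mean

NANOS = 1_000_000_000


def window_aggregate(samples, start_ns, end_ns, interval_ns, func):
    """Group (timestamp_ns, value) samples into fixed intervals and aggregate each one.

    Each interval is labelled with its start timestamp. Samples outside
    [start_ns, end_ns) are ignored; this sketch simply skips empty intervals.
    """
    buckets = defaultdict(list)
    for ts, value in samples:
        if start_ns <= ts < end_ns:
            bucket_start = start_ns + (ts - start_ns) // interval_ns * interval_ns
            buckets[bucket_start].append(value)
    return [(b, func(values)) for b, values in sorted(buckets.items())]


start = 1587772800 * NANOS  # 2020-04-25 00:00:00 UTC
eight_hours = 8 * 3600 * NANOS
# Hypothetical samples: one before the window, two in the first interval, one in the second
samples = [(start - 1, 99.0), (start + 1, 10.0), (start + 2, 11.0), (start + eight_hours + 5, 12.0)]
print(window_aggregate(samples, start, start + 6 * eight_hours, eight_hours, mean))
# [(1587772800000000000, 10.5), (1587801600000000000, 12.0)]
```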
Get max data for entity scaled on given intervals
Window aggregations for entities are very similar to those for variables. However, since an entity query results in a dataframe that contains multiple data columns, we need to explicitly specify which column the aggregation should be applied to. This is done by providing the target field name via the aggregationField() method of the WindowAggregationProperties builder.
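EntityService entityService = ServiceClientFactory.createEntityService();
SystemSpec cmwSystemSpec = ServiceClientFactory.createSystemSpecService().findByName("CMW")
.orElseThrow(() -> new IllegalArgumentException("No such system"));
Map<String, Object> keyValues = ImmutableMap
.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC");
Entity myEntity = entityService.findOne(Entities.suchThat()
.keyValues().eq(cmwSystemSpec, keyValues))
.orElseThrow(() -> new IllegalArgumentException("Could not obtain entity from service"));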
Instant startTime = TimeUtils.getInstantFromString("2020-04-10 00:00:00.000000000");
Instant endTime = TimeUtils.getInstantFromString("2020-04-14 00:00:10.000000000");
WindowAggregationProperties properties = WindowAggregationProperties.builder()
.timeWindow(startTime, endTime)
.interval(1, ChronoUnit.DAYS)
.function(AggregationFunctions.MAX)
.aggregationField("CYCLE")
.build();
Dataset<Row> dataset = aggregationService.getData(myEntity, properties);
dataset.show();
ServiceClientFactory = spark._jvm.cern.nxcals.api.extraction.metadata.ServiceClientFactory
Entities = spark._jvm.cern.nxcals.api.extraction.metadata.queries.Entities
TimeUtils = spark._jvm.cern.nxcals.api.utils.TimeUtils
ChronoUnit = spark._jvm.java.time.temporal.ChronoUnit
AggregationFunctions = spark._jvm.cern.nxcals.api.custom.service.aggregation.AggregationFunctions
WindowAggregationProperties = spark._jvm.cern.nxcals.api.custom.service.aggregation.WindowAggregationProperties
DEVICE_KEY_NAME = spark._jvm.cern.nxcals.api.custom.domain.CmwSystemConstants.DEVICE_KEY_NAME
PROPERTY_KEY_NAME = spark._jvm.cern.nxcals.api.custom.domain.CmwSystemConstants.PROPERTY_KEY_NAME
ImmutableMap = spark._jvm.com.google.common.collect.ImmutableMap
systemService = ServiceClientFactory.createSystemSpecService()
entityService = ServiceClientFactory.createEntityService()
cmwSystemSpec = systemService.findByName("CMW")
if cmwSystemSpec.isEmpty():
    raise ValueError("No such system")
keyValues = ImmutableMap.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC")
myEntity = entityService.findOne(Entities.suchThat().keyValues().eq(cmwSystemSpec.get(), keyValues))
if myEntity.isEmpty():
    raise ValueError("Could not obtain entity from service")
startTime = TimeUtils.getInstantFromString("2020-04-10 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-14 00:00:10.000000000")
properties = WindowAggregationProperties.builder().timeWindow(startTime, endTime) \
.interval(1, ChronoUnit.DAYS).function(AggregationFunctions.MAX).aggregationField("CYCLE").build()
dataset = aggregationService.getData(myEntity.get(), properties)
dataset.show()
from cern.nxcals.api.extraction.metadata import ServiceClientFactory
from cern.nxcals.api.extraction.metadata.queries import Entities
from cern.nxcals.api.utils import TimeUtils
from java.time.temporal import ChronoUnit
from cern.nxcals.api.custom.service.aggregation import AggregationFunctions
from cern.nxcals.api.custom.service.aggregation import WindowAggregationProperties
systemService = ServiceClientFactory.createSystemSpecService()
entityService = ServiceClientFactory.createEntityService()
cmwSystemSpec = systemService.findByName("CMW")
if cmwSystemSpec.isEmpty():
    raise ValueError("No such system")
from cern.nxcals.api.custom.domain.CmwSystemConstants import DEVICE_KEY_NAME
from cern.nxcals.api.custom.domain.CmwSystemConstants import PROPERTY_KEY_NAME
from com.google.common.collect import ImmutableMap
keyValues = ImmutableMap.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC")
myEntity = entityService.findOne(Entities.suchThat().keyValues().eq(cmwSystemSpec.get(), keyValues))
if myEntity.isEmpty():
    raise ValueError("Could not obtain entity from service")
startTime = TimeUtils.getInstantFromString("2020-04-10 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-14 00:00:10.000000000")
properties = WindowAggregationProperties.builder().timeWindow(startTime, endTime) \
.interval(1, ChronoUnit.DAYS).function(AggregationFunctions.MAX).aggregationField("CYCLE").build()
dataset = aggregationService.getData(myEntity.get(), properties)
dataset.show()
Expected application output:
+-------------------+-----+
| timestamp|value|
+-------------------+-----+
|1586476800000000000| 20|
|1586563200000000000| 20|
|1586649600000000000| 20|
|1586736000000000000| 20|
|1586822400000000000| 7|
+-------------------+-----+
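To illustrate why aggregationField is needed, the following plain-Python sketch aggregates a single named field out of multi-column rows. It also reflects a detail visible in the output above: because the end time is 2020-04-14 00:00:10, the last row (1586822400000000000, i.e. 2020-04-14 00:00:00 UTC) appears to cover only a partial, 10-second interval. aggregate_field and the row layout, including the USER field, are hypothetical.

```python
NANOS = 1_000_000_000
DAY = 86_400 * NANOS


def aggregate_field(rows, field, start_ns, end_ns, interval_ns, func):
    """Aggregate only `field` of multi-column rows over fixed intervals.

    rows: dicts with a "timestamp" key (ns since epoch) and several data fields.
    Picking a single field plays the role of aggregationField() above.
    The last interval only receives samples up to end_ns, so it may be partial.
    """
    buckets = {}
    for row in rows:
        ts = row["timestamp"]
        if start_ns <= ts < end_ns:
            bucket_start = start_ns + (ts - start_ns) // interval_ns * interval_ns
            buckets.setdefault(bucket_start, []).append(row[field])
    return sorted((b, func(values)) for b, values in buckets.items())


start = 1586476800 * NANOS          # 2020-04-10 00:00:00 UTC
end = start + 4 * DAY + 10 * NANOS  # 2020-04-14 00:00:10 UTC
# Hypothetical rows with two data fields; only CYCLE is aggregated
rows = [
    {"timestamp": start + 1 * NANOS, "CYCLE": 3, "USER": "A"},
    {"timestamp": start + 2 * NANOS, "CYCLE": 18, "USER": "B"},
    {"timestamp": start + 4 * DAY + 5 * NANOS, "CYCLE": 7, "USER": "C"},
    {"timestamp": end + NANOS, "CYCLE": 99, "USER": "D"},  # outside the window
]
print(aggregate_field(rows, "CYCLE", start, end, DAY, max))
# [(1586476800000000000, 18), (1586822400000000000, 7)]
```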
Get repeated data for variable scaled on given intervals
Variable myVariable = variableService.findOne(Variables.suchThat()
.variableName().eq("CPS.TGM:CYCLE"))
.orElseThrow(() -> new IllegalArgumentException("Could not obtain variable from service"));
Instant startTime = TimeUtils.getInstantFromString("2020-04-25 00:00:00.000000000");
Instant endTime = TimeUtils.getInstantFromString("2020-04-25 00:05:00.000000000");
WindowAggregationProperties properties = WindowAggregationProperties.builder()
.timeWindow(startTime, endTime)
.interval(20, ChronoUnit.SECONDS)
.function(AggregationFunctions.REPEAT).build();
Dataset<Row> dataset = aggregationService.getData(myVariable, properties);
dataset.show();
ServiceClientFactory = spark._jvm.cern.nxcals.api.extraction.metadata.ServiceClientFactory
Variables = spark._jvm.cern.nxcals.api.extraction.metadata.queries.Variables
TimeUtils = spark._jvm.cern.nxcals.api.utils.TimeUtils
ChronoUnit = spark._jvm.java.time.temporal.ChronoUnit
AggregationFunctions = spark._jvm.cern.nxcals.api.custom.service.aggregation.AggregationFunctions
WindowAggregationProperties = spark._jvm.cern.nxcals.api.custom.service.aggregation.WindowAggregationProperties
variableService = ServiceClientFactory.createVariableService()
myVariable = variableService.findOne(Variables.suchThat().variableName().eq("CPS.TGM:CYCLE"))
if myVariable.isEmpty():
    raise ValueError("Could not obtain variable from service")
startTime = TimeUtils.getInstantFromString("2020-04-25 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-25 00:05:00.000000000")
properties = WindowAggregationProperties.builder().timeWindow(startTime, endTime) \
.interval(20, ChronoUnit.SECONDS).function(AggregationFunctions.REPEAT).build()
dataset = aggregationService.getData(myVariable.get(), properties)
dataset.show()
from cern.nxcals.api.extraction.metadata import ServiceClientFactory
from cern.nxcals.api.extraction.metadata.queries import Variables
from cern.nxcals.api.utils import TimeUtils
from java.time.temporal import ChronoUnit
from cern.nxcals.api.custom.service.aggregation import AggregationFunctions
from cern.nxcals.api.custom.service.aggregation import WindowAggregationProperties
variableService = ServiceClientFactory.createVariableService()
myVariable = variableService.findOne(Variables.suchThat().variableName().eq("CPS.TGM:CYCLE"))
if myVariable.isEmpty():
    raise ValueError("Could not obtain variable from service")
startTime = TimeUtils.getInstantFromString("2020-04-25 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-25 00:05:00.000000000")
properties = WindowAggregationProperties.builder().timeWindow(startTime, endTime) \
.interval(20, ChronoUnit.SECONDS).function(AggregationFunctions.REPEAT).build()
dataset = aggregationService.getData(myVariable.get(), properties)
dataset.show()
Expected application output:
+-------------------+-----+
| timestamp|value|
+-------------------+-----+
|1587772800000000000| 8|
|1587772820000000000| 20|
|1587772840000000000| 10|
|1587772860000000000| 2|
|1587772880000000000| 12|
|1587772900000000000| 6|
|1587772920000000000| 16|
|1587772940000000000| 9|
|1587772960000000000| 20|
|1587772980000000000| 11|
|1587773000000000000| 3|
|1587773020000000000| 12|
|1587773040000000000| 7|
|1587773060000000000| 17|
|1587773080000000000| 9|
+-------------------+-----+
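One possible reading of REPEAT is sample-and-hold: each interval start takes the most recent value recorded at or before it, which would explain why data before the window start must be looked up. The plain-Python sketch below implements that reading. Both the interpretation and the repeat_on_grid helper are assumptions for illustration, not the documented behaviour of the service.

```python
from bisect import bisect_right

NANOS = 1_000_000_000


def repeat_on_grid(samples, start_ns, end_ns, interval_ns):
    """Repeat the latest value at or before each interval start in [start_ns, end_ns).

    samples: (timestamp_ns, value) pairs sorted by timestamp; they may begin
    before start_ns (the lookup before the window). Interval starts with no
    earlier sample are skipped.
    """
    timestamps = [ts for ts, _ in samples]
    result = []
    t = start_ns
    while t < end_ns:
        idx = bisect_right(timestamps, t)  # number of samples at or before t
        if idx > 0:
            result.append((t, samples[idx - 1][1]))
        t += interval_ns
    return result


start = 1587772800 * NANOS  # 2020-04-25 00:00:00 UTC
# Hypothetical samples; the first one lies before the window start
samples = [(start - 5 * NANOS, 4), (start + 15 * NANOS, 5), (start + 30 * NANOS, 6)]
print(repeat_on_grid(samples, start, start + 60 * NANOS, 20 * NANOS))
```

Without the sample recorded before the window start, the first interval would have no value, which mirrors the need for the backward lookup.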
Get interpolated data for entity scaled on given intervals
The following example demonstrates how to interpolate data on generated intervals.
Interpolation is a special function, as it requires data to be present both before and after each calculated data point.
To achieve that, this function is configured to look up data beyond the specified time window.
By default, the lookup extends the time window range by up to 1 week on both sides.
If the default is not sufficient for a given use case, it can be extended by specifying custom expansion parameters via the expandTimeWindowBy method.
Map<String, Object> keyValues = ImmutableMap
.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC");
Entity myEntity = entityService.findOne(Entities.suchThat()
.keyValues().eq(cmwSystemSpec, keyValues))
.orElseThrow(() -> new IllegalArgumentException("Could not obtain entity from service"));
Instant startTime = TimeUtils.getInstantFromString("2020-02-10 00:00:00.000000000");
Instant endTime = TimeUtils.getInstantFromString("2020-08-10 00:00:00.000000000");
AggregationFunction customInterpolateFunc = AggregationFunctions.INTERPOLATE
.expandTimeWindowBy(1, 0, ChronoUnit.MONTHS);
WindowAggregationProperties properties = WindowAggregationProperties.builder()
.timeWindow(startTime, endTime)
.interval(2, ChronoUnit.MONTHS)
.function(customInterpolateFunc)
.aggregationField("CYCLE")
.build();
Dataset<Row> dataset = aggregationService.getData(myEntity, properties);
dataset.show();
ServiceClientFactory = spark._jvm.cern.nxcals.api.extraction.metadata.ServiceClientFactory
Entities = spark._jvm.cern.nxcals.api.extraction.metadata.queries.Entities
TimeUtils = spark._jvm.cern.nxcals.api.utils.TimeUtils
ChronoUnit = spark._jvm.java.time.temporal.ChronoUnit
AggregationFunctions = spark._jvm.cern.nxcals.api.custom.service.aggregation.AggregationFunctions
WindowAggregationProperties = spark._jvm.cern.nxcals.api.custom.service.aggregation.WindowAggregationProperties
DEVICE_KEY_NAME = spark._jvm.cern.nxcals.api.custom.domain.CmwSystemConstants.DEVICE_KEY_NAME
PROPERTY_KEY_NAME = spark._jvm.cern.nxcals.api.custom.domain.CmwSystemConstants.PROPERTY_KEY_NAME
ImmutableMap = spark._jvm.com.google.common.collect.ImmutableMap
entityService = ServiceClientFactory.createEntityService()
systemService = ServiceClientFactory.createSystemSpecService()
cmwSystemSpec = systemService.findByName("CMW")
if cmwSystemSpec.isEmpty():
    raise ValueError("No such system")
keyValues = ImmutableMap.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC")
myEntity = entityService.findOne(Entities.suchThat().keyValues().eq(cmwSystemSpec.get(), keyValues))
if myEntity.isEmpty():
    raise ValueError("Could not obtain entity from service")
startTime = TimeUtils.getInstantFromString("2020-02-10 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-08-10 00:00:00.000000000")
customInterpolateFunc = AggregationFunctions.INTERPOLATE.expandTimeWindowBy(1, 0, ChronoUnit.MONTHS)
properties = WindowAggregationProperties.builder().timeWindow(startTime, endTime) \
.interval(2, ChronoUnit.MONTHS).function(customInterpolateFunc).aggregationField("CYCLE").build()
dataset = aggregationService.getData(myEntity.get(), properties)
dataset.show()
from cern.nxcals.api.extraction.metadata import ServiceClientFactory
from cern.nxcals.api.extraction.metadata.queries import Entities
from cern.nxcals.api.utils import TimeUtils
from java.time.temporal import ChronoUnit
from cern.nxcals.api.custom.service.aggregation import AggregationFunctions
from cern.nxcals.api.custom.service.aggregation import WindowAggregationProperties
entityService = ServiceClientFactory.createEntityService()
systemService = ServiceClientFactory.createSystemSpecService()
cmwSystemSpec = systemService.findByName("CMW")
if cmwSystemSpec.isEmpty():
    raise ValueError("No such system")
from cern.nxcals.api.custom.domain.CmwSystemConstants import DEVICE_KEY_NAME
from cern.nxcals.api.custom.domain.CmwSystemConstants import PROPERTY_KEY_NAME
from com.google.common.collect import ImmutableMap
keyValues = ImmutableMap.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC")
myEntity = entityService.findOne(Entities.suchThat().keyValues().eq(cmwSystemSpec.get(), keyValues))
if myEntity.isEmpty():
    raise ValueError("Could not obtain entity from service")
startTime = TimeUtils.getInstantFromString("2020-02-10 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-08-10 00:00:00.000000000")
customInterpolateFunc = AggregationFunctions.INTERPOLATE.expandTimeWindowBy(1, 0, ChronoUnit.MONTHS)
properties = WindowAggregationProperties.builder().timeWindow(startTime, endTime) \
.interval(2, ChronoUnit.MONTHS).function(customInterpolateFunc).aggregationField("CYCLE").build()
dataset = aggregationService.getData(myEntity.get(), properties)
dataset.show()
Expected application output:
+-------------------+------------------+
| timestamp| value|
+-------------------+------------------+
|1581292800000000000| 4.356387673842736|
|1586552292000000000| 7.472222222222222|
|1591811784000000000|17.708333333333332|
+-------------------+------------------+
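The timestamps in the output above are 5,259,492 s apart, which is exactly twice the estimated duration of ChronoUnit.MONTHS in java.time (365.2425 days / 12 = 2,629,746 s). This suggests that month-based intervals are stepped by that estimated duration rather than by calendar months. The plain-Python sketch below reproduces the three interval starts and shows linear interpolation between the nearest samples before and after a point. interval_starts and interpolate_at are hypothetical helpers, and linear interpolation is an assumption about what INTERPOLATE computes.

```python
NANOS = 1_000_000_000
# java.time's estimated month: 365.2425 days / 12 = 2,629,746 s
ESTIMATED_MONTH_NS = 31_556_952 // 12 * NANOS


def interval_starts(start_ns, end_ns, interval_ns):
    """Interval start timestamps in [start_ns, end_ns)."""
    return list(range(start_ns, end_ns, interval_ns))


def interpolate_at(samples, t):
    """Linearly interpolate at t from the nearest samples at/before and at/after it.

    samples: (timestamp_ns, value) pairs sorted by timestamp. Returns None when
    there is no data on one side, which is why a lookup beyond the window is needed.
    """
    before = [(ts, v) for ts, v in samples if ts <= t]
    after = [(ts, v) for ts, v in samples if ts >= t]
    if not before or not after:
        return None
    (t0, v0), (t1, v1) = before[-1], after[0]
    if t1 == t0:
        return v0
    return v0 + (v1 - v0) * (t - t0) / (t1 - t0)


start = 1581292800 * NANOS  # 2020-02-10 00:00:00 UTC
end = 1597017600 * NANOS    # 2020-08-10 00:00:00 UTC
print(interval_starts(start, end, 2 * ESTIMATED_MONTH_NS))
# [1581292800000000000, 1586552292000000000, 1591811784000000000]

# Hypothetical samples on both sides of the window start
samples = [(start - 10 * NANOS, 0.0), (start + 10 * NANOS, 10.0)]
print(interpolate_at(samples, start))  # 5.0
```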
Important
By default, the REPEAT and INTERPOLATE functions look up data beyond the range of the provided time window. REPEAT looks up data before the start of the window, while INTERPOLATE does the same and additionally looks up data after its end. The default lookup on each side is one (1) week and can be configured with the expandTimeWindowBy method.
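The lookup ranges described in the note above can be summarised with a small plain-Python sketch. lookup_window is a hypothetical helper that models only the default one-week lookup stated in the note; the assumption that other functions read just the requested window is ours.

```python
from datetime import datetime, timedelta, timezone

DEFAULT_LOOKUP = timedelta(weeks=1)  # default lookup on each side, per the note above


def lookup_window(function_name, start, end, before=DEFAULT_LOOKUP, after=DEFAULT_LOOKUP):
    """Return the (start, end) range read for a given function.

    REPEAT looks back before the start; INTERPOLATE looks back and ahead.
    Other functions are assumed to read only the requested window.
    """
    if function_name == "REPEAT":
        return start - before, end
    if function_name == "INTERPOLATE":
        return start - before, end + after
    return start, end


start = datetime(2020, 4, 25, tzinfo=timezone.utc)
end = datetime(2020, 4, 25, 0, 5, tzinfo=timezone.utc)
print(lookup_window("REPEAT", start, end)[0])  # 2020-04-18 00:00:00+00:00
```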
Get data for variable aligned on provided dataset
SparkSession sparkSession = SparkSession.active();
Variable drivingVariable = variableService.findOne(Variables.suchThat()
.variableName().eq("HX:FILLN"))
.orElseThrow(() -> new IllegalArgumentException("Could not obtain variable from service"));
Instant startTime = TimeUtils.getInstantFromString("2016-03-01 00:00:00.000000000");
Instant endTime = TimeUtils.getInstantFromString("2016-05-01 00:00:00.000000000");
Dataset<Row> drivingDataset = DataQuery.getFor(sparkSession,
TimeWindow.between(startTime, endTime), drivingVariable);
Variable myVariable = variableService.findOne(Variables.suchThat().variableName().eq("CPS.TGM:CYCLE"))
.orElseThrow(() -> new IllegalArgumentException("Could not obtain variable from service"));
DatasetAggregationProperties properties = DatasetAggregationProperties.builder()
.drivingDataset(drivingDataset).build();
Dataset<Row> dataset = aggregationService.getData(myVariable, properties);
dataset.show();
ServiceClientFactory = spark._jvm.cern.nxcals.api.extraction.metadata.ServiceClientFactory
Variables = spark._jvm.cern.nxcals.api.extraction.metadata.queries.Variables
TimeUtils = spark._jvm.cern.nxcals.api.utils.TimeUtils
TimeWindow = spark._jvm.cern.nxcals.api.domain.TimeWindow
DatasetAggregationProperties = spark._jvm.cern.nxcals.api.custom.service.aggregation.DatasetAggregationProperties
DataQuery = spark._jvm.cern.nxcals.api.extraction.data.builders.DataQuery
variableService = ServiceClientFactory.createVariableService()
drivingVariable = variableService.findOne(Variables.suchThat().variableName().eq("HX:FILLN"))
if drivingVariable.isEmpty():
    raise ValueError("Could not obtain variable from service")
startTime = TimeUtils.getInstantFromString("2016-03-01 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2016-05-01 00:00:00.000000000")
drivingDataset = DataQuery.getFor(spark._jsparkSession, TimeWindow.between(startTime, endTime), drivingVariable.get())
myVariable = variableService.findOne(Variables.suchThat().variableName().eq("CPS.TGM:CYCLE"))
if myVariable.isEmpty():
    raise ValueError("Could not obtain variable from service")
properties = DatasetAggregationProperties.builder().drivingDataset(drivingDataset).build()
dataset = aggregationService.getData(myVariable.get(), properties)
dataset.show()
from cern.nxcals.api.extraction.metadata import ServiceClientFactory
from cern.nxcals.api.extraction.metadata.queries import Variables
from cern.nxcals.api.utils import TimeUtils
from cern.nxcals.api.domain import TimeWindow
from cern.nxcals.api.custom.service.aggregation import DatasetAggregationProperties
from cern.nxcals.api.extraction.data.builders import DataQuery
from org.apache.spark.sql import SparkSession
variableService = ServiceClientFactory.createVariableService()
sparkSession = SparkSession.active()
drivingVariable = variableService.findOne(Variables.suchThat().variableName().eq("HX:FILLN"))
if drivingVariable.isEmpty():
    raise ValueError("Could not obtain variable from service")
startTime = TimeUtils.getInstantFromString("2016-03-01 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2016-05-01 00:00:00.000000000")
drivingDataset = DataQuery.getFor(sparkSession, TimeWindow.between(startTime, endTime), drivingVariable.get())
myVariable = variableService.findOne(Variables.suchThat().variableName().eq("CPS.TGM:CYCLE"))
if myVariable.isEmpty():
    raise ValueError("Could not obtain variable from service")
properties = DatasetAggregationProperties.builder().drivingDataset(drivingDataset).build()
dataset = aggregationService.getData(myVariable.get(), properties)
dataset.show()
Expected application output:
+-------------------+-----+
| timestamp|value|
+-------------------+-----+
|1457613630355000000| 20|
|1458524879726000000| 20|
|1458932691946000000| 21|
|1458984392272000000| 14|
|1458996045140000000| 12|
|1459062900019000000| 16|
|1459114836147000000| 10|
|1459123375668000000| 18|
|1459135498154000000| 14|
|1459171936969000000| 11|
|1459213975163000000| 3|
|1459236680271000000| 20|
|1459286996157000000| 26|
|1459312886723000000| 2|
|1459372973278000000| 2|
|1459384868421000000| 1|
|1459442811801000000| 21|
|1459448882707000000| 14|
|1459450875860000000| 21|
|1459454413582000000| 4|
+-------------------+-----+
only showing top 20 rows
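The rows in the output above appear to carry the timestamps of the driving dataset (the HX:FILLN records), each paired with a CPS.TGM:CYCLE value. One way to picture such an alignment is an as-of join: for each driving timestamp, take the latest value recorded at or before it. The plain-Python sketch below implements that join with a single merge-style pass over both sorted sequences. align_on is hypothetical, and as-of semantics are an assumption about the service that this page does not confirm.

```python
def align_on(driving_timestamps, samples):
    """As-of join: pair each driving timestamp with the latest sample at or before it.

    samples: (timestamp, value) pairs sorted by timestamp.
    Driving timestamps with no earlier sample are skipped.
    """
    aligned = []
    i = -1  # index of the latest sample at or before the current driving timestamp
    for t in sorted(driving_timestamps):
        # Advance through samples while they are not later than t
        while i + 1 < len(samples) and samples[i + 1][0] <= t:
            i += 1
        if i >= 0:
            aligned.append((t, samples[i][1]))
    return aligned


# Hypothetical timestamps and values
driving = [100, 250, 50, 400]
samples = [(90, "a"), (200, "b"), (300, "c")]
print(align_on(driving, samples))  # [(100, 'a'), (250, 'b'), (400, 'c')]
```

Because both sequences are walked once in timestamp order, the join is linear in their combined length after sorting the driving timestamps.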
Get data for entity aligned on provided dataset
Aligning entity data on a dataset is very similar to the variable case. However, since an entity query results in a dataset that contains multiple data columns, we need to explicitly specify which column the aggregation should be applied to. This is done by providing the target field name via the aggregationField() method of the DatasetAggregationProperties builder.
Map<String, Object> keyValues = ImmutableMap
.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC");
Entity myEntity = entityService.findOne(Entities.suchThat()
.keyValues().eq(cmwSystemSpec, keyValues))
.orElseThrow(() -> new IllegalArgumentException("Could not obtain entity from service"));
DatasetAggregationProperties properties = DatasetAggregationProperties.builder()
.drivingDataset(drivingDataset)
.aggregationField("CYCLE")
.build();
Dataset<Row> dataset = aggregationService.getData(myEntity, properties);
dataset.show();
DatasetAggregationProperties = spark._jvm.cern.nxcals.api.custom.service.aggregation.DatasetAggregationProperties
Entities = spark._jvm.cern.nxcals.api.extraction.metadata.queries.Entities
DEVICE_KEY_NAME = spark._jvm.cern.nxcals.api.custom.domain.CmwSystemConstants.DEVICE_KEY_NAME
PROPERTY_KEY_NAME = spark._jvm.cern.nxcals.api.custom.domain.CmwSystemConstants.PROPERTY_KEY_NAME
ImmutableMap = spark._jvm.com.google.common.collect.ImmutableMap
keyValues = ImmutableMap.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC")
myEntity = entityService.findOne(Entities.suchThat().keyValues().eq(cmwSystemSpec.get(), keyValues))
if myEntity.isEmpty():
    raise ValueError("Could not obtain entity from service")
properties = DatasetAggregationProperties.builder().drivingDataset(drivingDataset).aggregationField("CYCLE").build()
dataset = aggregationService.getData(myEntity.get(), properties)
dataset.show()
from cern.nxcals.api.custom.service.aggregation import DatasetAggregationProperties
from cern.nxcals.api.extraction.metadata.queries import Entities
from cern.nxcals.api.custom.domain.CmwSystemConstants import DEVICE_KEY_NAME
from cern.nxcals.api.custom.domain.CmwSystemConstants import PROPERTY_KEY_NAME
from com.google.common.collect import ImmutableMap
keyValues = ImmutableMap.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC")
myEntity = entityService.findOne(Entities.suchThat().keyValues().eq(cmwSystemSpec.get(), keyValues))
if myEntity.isEmpty():
    raise ValueError("Could not obtain entity from service")
properties = DatasetAggregationProperties.builder().drivingDataset(drivingDataset).aggregationField("CYCLE").build()
dataset = aggregationService.getData(myEntity.get(), properties)
dataset.show()
Expected application output:
+-------------------+-----+
| timestamp|value|
+-------------------+-----+
|1457613630355000000| 20|
|1458524879726000000| 20|
|1458932691946000000| 21|
|1458984392272000000| 14|
|1458996045140000000| 12|
|1459062900019000000| 16|
|1459114836147000000| 10|
|1459123375668000000| 18|
|1459135498154000000| 14|
|1459171936969000000| 11|
|1459213975163000000| 3|
|1459236680271000000| 20|
|1459286996157000000| 26|
|1459312886723000000| 2|
|1459372973278000000| 2|
|1459384868421000000| 1|
|1459442811801000000| 21|
|1459448882707000000| 14|
|1459450875860000000| 21|
|1459454413582000000| 4|
+-------------------+-----+
only showing top 20 rows
Important
Any dataset can be used as the driving dataset for the alignment operation. By default, the aggregation service expects
the NXCALS extraction timestamp column (nxcals_timestamp). If the provided dataset uses a different (custom) timestamp
column, you can specify it on the AggregationProperties builder via the timestampField() method.
Extraction Service
Service responsible for retrieving data based on a time window and the provided extraction properties. The extraction properties also define the lookup strategy to apply during data retrieval.
Available methods
Dataset<Row> getData(Variable variable, ExtractionProperties properties);
Dataset<Row> getData(Entity entity, ExtractionProperties properties);
More information can be found in the Javadoc for ExtractionService.
Available lookup strategies
A lookup strategy defines whether (and how exactly) the extraction process expands the data search to find records beyond the provided time window. The full list of available lookup strategies can be found in the Javadoc for lookup strategies.
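To make the difference between the two strategies used in the examples below concrete, here is a minimal pure-Python model of them. It is a sketch under assumptions, not the NXCALS implementation: records are (timestamp_ns, value) pairs, the time window is treated as start-inclusive and end-exclusive (the real boundary semantics may differ), and a lookup adds only the single latest record strictly before the start. The Lookup enum and the extract() function are hypothetical names, not part of the NXCALS API.

```python
from enum import Enum
from typing import List, Optional, Tuple

Record = Tuple[int, int]  # (timestamp in ns, value); hypothetical layout


class Lookup(Enum):
    """Hypothetical stand-ins for the two NXCALS lookup strategies."""
    LAST_BEFORE_START = "always"
    LAST_BEFORE_START_IF_EMPTY = "if_empty"


def extract(records: List[Record], start: int, end: int,
            strategy: Optional[Lookup] = None) -> List[Record]:
    """Return records in [start, end), optionally extended by a lookup."""
    in_window = [r for r in records if start <= r[0] < end]
    if strategy is None:
        return in_window
    if strategy is Lookup.LAST_BEFORE_START_IF_EMPTY and in_window:
        return in_window  # data exists in the window, so no lookup is performed
    before = [r for r in records if r[0] < start]
    if not before:
        return in_window  # nothing older exists to add
    last = max(before, key=lambda r: r[0])  # latest record before the start
    return [last] + in_window


records = [(5, 1), (8, 2), (12, 3), (15, 4)]
print(extract(records, 10, 20, Lookup.LAST_BEFORE_START_IF_EMPTY))  # [(12, 3), (15, 4)]
print(extract(records, 10, 20, Lookup.LAST_BEFORE_START))           # [(8, 2), (12, 3), (15, 4)]
print(extract(records, 20, 30, Lookup.LAST_BEFORE_START_IF_EMPTY))  # [(15, 4)]
```

In this model the "if empty" variant only differs from the "always" variant when the window already contains data, which mirrors how the examples below contrast the two strategies.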
Getting an instance of the service
An instance can be obtained through the cern.nxcals.api.custom.service.Services gateway, configured with the
cern.nxcals.api.config.SparkProperties utility, which is designed to simplify the creation of a Spark session:
ExtractionService extractionService = Services.newInstance(SparkProperties.defaults("MY_APP"))
.extractionService();
from nxcals.spark_session_builder import get_or_create # on swan create spark session from UI (star icon)
spark = get_or_create()
Services = spark._jvm.cern.nxcals.api.custom.service.Services
extractionService = Services.newInstance(spark._jsparkSession).extractionService()
from cern.nxcals.api.custom.service import Services
from cern.nxcals.api.config import SparkProperties
extractionService = Services.newInstance(SparkProperties.defaults("MY_APP")).extractionService()
Once we have an instance of the service, we can extract data as shown in the following examples.
Examples
Get data for a variable without performing a lookup if data exists
Variable myVariable = variableService.findOne(Variables.suchThat()
.variableName().eq("CPS.TGM:CYCLE"))
.orElseThrow(() -> new IllegalArgumentException("Could not obtain variable from service"));
Instant startTime = TimeUtils.getInstantFromString("2020-04-25 00:00:00.000000000");
Instant endTime = TimeUtils.getInstantFromString("2020-04-26 00:00:00.000000000");
ExtractionProperties properties = ExtractionProperties.builder()
.timeWindow(startTime, endTime)
.lookupStrategy(LookupStrategy.LAST_BEFORE_START_IF_EMPTY).build();
Dataset<Row> dataset = extractionService.getData(myVariable, properties);
System.out.println("Dataset count: " + dataset.count());
dataset.show();
ServiceClientFactory = spark._jvm.cern.nxcals.api.extraction.metadata.ServiceClientFactory
Variables = spark._jvm.cern.nxcals.api.extraction.metadata.queries.Variables
TimeUtils = spark._jvm.cern.nxcals.api.utils.TimeUtils
ExtractionProperties = spark._jvm.cern.nxcals.api.custom.service.extraction.ExtractionProperties
LookupStrategy = spark._jvm.cern.nxcals.api.custom.service.extraction.LookupStrategy
variableService = ServiceClientFactory.createVariableService()
myVariable = variableService.findOne(Variables.suchThat().variableName().eq("CPS.TGM:CYCLE"))
if myVariable.isEmpty():
    raise ValueError("Could not obtain variable from service")
startTime = TimeUtils.getInstantFromString("2020-04-25 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-26 00:00:00.000000000")
properties = ExtractionProperties.builder().timeWindow(startTime, endTime).lookupStrategy(LookupStrategy.LAST_BEFORE_START_IF_EMPTY).build()
dataset = extractionService.getData(myVariable.get(), properties)
print(dataset.count())
dataset.show()
from cern.nxcals.api.extraction.metadata.queries import Variables
from cern.nxcals.api.utils import TimeUtils
from cern.nxcals.api.custom.service.extraction import ExtractionProperties, LookupStrategy
myVariable = variableService.findOne(Variables.suchThat().variableName().eq("CPS.TGM:CYCLE"))
if myVariable.isEmpty():
    raise ValueError("Could not obtain variable from service")
startTime = TimeUtils.getInstantFromString("2020-04-25 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-26 00:00:00.000000000")
properties = ExtractionProperties.builder().timeWindow(startTime, endTime).lookupStrategy(LookupStrategy.LAST_BEFORE_START_IF_EMPTY).build()
dataset = extractionService.getData(myVariable.get(), properties)
print(dataset.count())
dataset.show()
As the example above shows, the extraction process is controlled by the parameters passed to the ExtractionProperties
builder. In this common case, we specify the time window for the variable data search (1 day) and the lookup strategy
to apply during extraction (LAST_BEFORE_START_IF_EMPTY).
This strategy instructs the process to perform a lookup only if the dataset for the given period is empty.
Since data exists for the given time window in this example, no lookup takes place.
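The nxcals_timestamp values in the output below are nanoseconds since the Unix epoch. As a quick sanity check, the standard-library sketch below converts the first timestamp shown and confirms it falls inside the requested one-day window. It assumes the window strings are interpreted as UTC, which is consistent with the output timestamps but not stated explicitly here; to_epoch_ns() and to_utc() are hypothetical helpers, not NXCALS functions.

```python
from datetime import datetime, timedelta, timezone

NS_PER_SECOND = 1_000_000_000


def to_epoch_ns(text: str) -> int:
    """Parse 'YYYY-MM-DD HH:MM:SS' as UTC (an assumption) into epoch nanoseconds."""
    dt = datetime.strptime(text, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
    return int(dt.timestamp()) * NS_PER_SECOND


def to_utc(ns: int) -> datetime:
    """Convert an nxcals_timestamp (epoch nanoseconds) to a UTC datetime.

    Integer arithmetic avoids float rounding; sub-microsecond digits are dropped.
    """
    seconds, remainder_ns = divmod(ns, NS_PER_SECOND)
    base = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return base + timedelta(microseconds=remainder_ns // 1000)


start = to_epoch_ns("2020-04-25 00:00:00")
end = to_epoch_ns("2020-04-26 00:00:00")
first = 1587772954300000000  # first nxcals_timestamp in the output below
print(to_utc(first), start <= first < end)  # 2020-04-25 00:02:34.300000+00:00 True
```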
Expected application output:
Dataset count: 49655
+------------+----------------+-------------------+--------------------+
|nxcals_value|nxcals_entity_id| nxcals_timestamp|nxcals_variable_name|
+------------+----------------+-------------------+--------------------+
| 16| 46955|1587772954300000000| CPS.TGM:CYCLE|
| 2| 46955|1587773242300000000| CPS.TGM:CYCLE|
| 17| 46955|1587773929900000000| CPS.TGM:CYCLE|
| 3| 46955|1587774148300000000| CPS.TGM:CYCLE|
| 9| 46955|1587774262300000000| CPS.TGM:CYCLE|
| 12| 46955|1587774897100000000| CPS.TGM:CYCLE|
| 4| 46955|1587775993900000000| CPS.TGM:CYCLE|
| 7| 46955|1587776136700000000| CPS.TGM:CYCLE|
| 13| 46955|1587776778700000000| CPS.TGM:CYCLE|
| 9| 46955|1587777046300000000| CPS.TGM:CYCLE|
| 7| 46955|1587777459100000000| CPS.TGM:CYCLE|
| 16| 46955|1587777513100000000| CPS.TGM:CYCLE|
| 1| 46955|1587778528300000000| CPS.TGM:CYCLE|
| 8| 46955|1587778541500000000| CPS.TGM:CYCLE|
| 13| 46955|1587778936300000000| CPS.TGM:CYCLE|
| 8| 46955|1587779237500000000| CPS.TGM:CYCLE|
| 19| 46955|1587779361100000000| CPS.TGM:CYCLE|
| 9| 46955|1587779725900000000| CPS.TGM:CYCLE|
| 9| 46955|1587780352300000000| CPS.TGM:CYCLE|
| 9| 46955|1587780735100000000| CPS.TGM:CYCLE|
+------------+----------------+-------------------+--------------------+
only showing top 20 rows
Get data for a variable and always perform a lookup, regardless of whether data exists
Variable myVariable = variableService.findOne(Variables.suchThat()
.variableName().eq("CPS.TGM:CYCLE"))
.orElseThrow(() -> new IllegalArgumentException("Could not obtain variable from service"));
Instant startTime = TimeUtils.getInstantFromString("2020-04-25 00:00:00.000000000");
Instant endTime = TimeUtils.getInstantFromString("2020-04-26 00:00:00.000000000");
ExtractionProperties properties = ExtractionProperties.builder()
.timeWindow(startTime, endTime)
.lookupStrategy(LookupStrategy.LAST_BEFORE_START).build();
Dataset<Row> dataset = extractionService.getData(myVariable, properties);
System.out.println("Dataset count: " + dataset.count());
dataset.show();
ServiceClientFactory = spark._jvm.cern.nxcals.api.extraction.metadata.ServiceClientFactory
Variables = spark._jvm.cern.nxcals.api.extraction.metadata.queries.Variables
TimeUtils = spark._jvm.cern.nxcals.api.utils.TimeUtils
ExtractionProperties = spark._jvm.cern.nxcals.api.custom.service.extraction.ExtractionProperties
LookupStrategy = spark._jvm.cern.nxcals.api.custom.service.extraction.LookupStrategy
variableService = ServiceClientFactory.createVariableService()
myVariable = variableService.findOne(Variables.suchThat().variableName().eq("CPS.TGM:CYCLE"))
if myVariable.isEmpty():
    raise ValueError("Could not obtain variable from service")
startTime = TimeUtils.getInstantFromString("2020-04-25 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-26 00:00:00.000000000")
properties = ExtractionProperties.builder().timeWindow(startTime, endTime).lookupStrategy(LookupStrategy.LAST_BEFORE_START).build()
dataset = extractionService.getData(myVariable.get(), properties)
print(dataset.count())
dataset.show()
from cern.nxcals.api.extraction.metadata.queries import Variables
from cern.nxcals.api.utils import TimeUtils
from cern.nxcals.api.custom.service.extraction import ExtractionProperties, LookupStrategy
myVariable = variableService.findOne(Variables.suchThat().variableName().eq("CPS.TGM:CYCLE"))
if myVariable.isEmpty():
    raise ValueError("Could not obtain variable from service")
startTime = TimeUtils.getInstantFromString("2020-04-25 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-26 00:00:00.000000000")
properties = ExtractionProperties.builder().timeWindow(startTime, endTime).lookupStrategy(LookupStrategy.LAST_BEFORE_START).build()
dataset = extractionService.getData(myVariable.get(), properties)
print(dataset.count())
dataset.show()
As the example above shows, the extraction process is controlled by the parameters passed to the ExtractionProperties
builder. Again, we specify the time window for the variable data search (1 day) and the lookup strategy to apply during
extraction, this time LAST_BEFORE_START.
This strategy instructs the process to always perform a data lookup.
In this example we therefore expect the data for the given time window plus one extra value, the last one recorded before the start of the window, produced by the lookup.
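The extra value is easy to spot in the output below: among the rows shown, 1587772799500000000 is the only nxcals_timestamp earlier than the window start. The arithmetic below uses only numbers from this example and the previous one, and assumes (consistently with the output) that the window start 2020-04-25 00:00:00 is interpreted as UTC.

```python
NS_PER_SECOND = 1_000_000_000

window_start_ns = 1_587_772_800 * NS_PER_SECOND  # 2020-04-25 00:00:00 UTC in epoch ns
looked_up_ns = 1_587_772_799_500_000_000          # extra row added by the lookup (output below)
first_in_window_ns = 1_587_772_954_300_000_000    # first row shown in the previous example

gap_s = (window_start_ns - looked_up_ns) / NS_PER_SECOND
print(f"Looked-up row is {gap_s} s before the window start")
print(looked_up_ns < window_start_ns <= first_in_window_ns)
```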
Expected application output:
Dataset count: 49656 (+1 value from the above example, as product of the lookup action)
+------------+----------------+-------------------+--------------------+
|nxcals_value|nxcals_entity_id| nxcals_timestamp|nxcals_variable_name|
+------------+----------------+-------------------+--------------------+
| 8| 46955|1587772799500000000| CPS.TGM:CYCLE|
| 16| 46955|1587772954300000000| CPS.TGM:CYCLE|
| 2| 46955|1587773242300000000| CPS.TGM:CYCLE|
| 17| 46955|1587773929900000000| CPS.TGM:CYCLE|
| 3| 46955|1587774148300000000| CPS.TGM:CYCLE|
| 9| 46955|1587774262300000000| CPS.TGM:CYCLE|
| 12| 46955|1587774897100000000| CPS.TGM:CYCLE|
| 4| 46955|1587775993900000000| CPS.TGM:CYCLE|
| 7| 46955|1587776136700000000| CPS.TGM:CYCLE|
| 13| 46955|1587776778700000000| CPS.TGM:CYCLE|
| 9| 46955|1587777046300000000| CPS.TGM:CYCLE|
| 7| 46955|1587777459100000000| CPS.TGM:CYCLE|
| 16| 46955|1587777513100000000| CPS.TGM:CYCLE|
| 1| 46955|1587778528300000000| CPS.TGM:CYCLE|
| 8| 46955|1587778541500000000| CPS.TGM:CYCLE|
| 13| 46955|1587778936300000000| CPS.TGM:CYCLE|
| 8| 46955|1587779237500000000| CPS.TGM:CYCLE|
| 19| 46955|1587779361100000000| CPS.TGM:CYCLE|
| 9| 46955|1587779725900000000| CPS.TGM:CYCLE|
| 9| 46955|1587780352300000000| CPS.TGM:CYCLE|
+------------+----------------+-------------------+--------------------+
only showing top 20 rows
Get data for an entity without performing a lookup if data exists
Map<String, Object> keyValues = ImmutableMap
.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC");
Entity myEntity = entityService.findOne(Entities.suchThat()
.keyValues().eq(cmwSystemSpec, keyValues))
.orElseThrow(() -> new IllegalArgumentException("Could not obtain entity from service"));
Instant startTime = TimeUtils.getInstantFromString("2020-04-10 00:00:00.000000000");
Instant endTime = TimeUtils.getInstantFromString("2020-04-11 00:00:10.000000000");
ExtractionProperties properties = ExtractionProperties.builder()
.timeWindow(startTime, endTime)
.lookupStrategy(LookupStrategy.LAST_BEFORE_START_IF_EMPTY).build();
Dataset<Row> dataset = extractionService.getData(myEntity, properties);
System.out.println("Dataset count: " + dataset.count());
dataset.show(2);
ServiceClientFactory = spark._jvm.cern.nxcals.api.extraction.metadata.ServiceClientFactory
Entities = spark._jvm.cern.nxcals.api.extraction.metadata.queries.Entities
TimeUtils = spark._jvm.cern.nxcals.api.utils.TimeUtils
ExtractionProperties = spark._jvm.cern.nxcals.api.custom.service.extraction.ExtractionProperties
LookupStrategy = spark._jvm.cern.nxcals.api.custom.service.extraction.LookupStrategy
DEVICE_KEY_NAME = spark._jvm.cern.nxcals.api.custom.domain.CmwSystemConstants.DEVICE_KEY_NAME
PROPERTY_KEY_NAME = spark._jvm.cern.nxcals.api.custom.domain.CmwSystemConstants.PROPERTY_KEY_NAME
ImmutableMap = spark._jvm.com.google.common.collect.ImmutableMap
entityService = ServiceClientFactory.createEntityService()
systemService = ServiceClientFactory.createSystemSpecService()
cmwSystemSpec = systemService.findByName("CMW")
if cmwSystemSpec.isEmpty():
    raise ValueError("No such system")
keyValues = ImmutableMap.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC")
myEntity = entityService.findOne(Entities.suchThat().keyValues().eq(cmwSystemSpec.get(), keyValues))
if myEntity.isEmpty():
    raise ValueError("Could not obtain entity from service")
startTime = TimeUtils.getInstantFromString("2020-04-10 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-11 00:00:10.000000000")
properties = ExtractionProperties.builder().timeWindow(startTime, endTime).lookupStrategy(LookupStrategy.LAST_BEFORE_START_IF_EMPTY).build()
dataset = extractionService.getData(myEntity.get(), properties)
print(dataset.count())
dataset.show(2)
from cern.nxcals.api.extraction.metadata.queries import Entities
from cern.nxcals.api.utils import TimeUtils
from cern.nxcals.api.custom.service.extraction import ExtractionProperties, LookupStrategy
from cern.nxcals.api.custom.domain.CmwSystemConstants import DEVICE_KEY_NAME, PROPERTY_KEY_NAME
from com.google.common.collect import ImmutableMap
keyValues = ImmutableMap.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC")
myEntity = entityService.findOne(Entities.suchThat().keyValues().eq(cmwSystemSpec.get(), keyValues))
if myEntity.isEmpty():
    raise ValueError("Could not obtain entity from service")
startTime = TimeUtils.getInstantFromString("2020-04-10 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-11 00:00:10.000000000")
properties = ExtractionProperties.builder().timeWindow(startTime, endTime).lookupStrategy(LookupStrategy.LAST_BEFORE_START_IF_EMPTY).build()
dataset = extractionService.getData(myEntity.get(), properties)
print(dataset.count())
from jpype.types import JInt
dataset.show(JInt(2))
As the example above shows, the extraction process is controlled by the parameters passed to the ExtractionProperties
builder. Here we specify the time window for the entity data search (1 day and 10 seconds) and the lookup strategy to
apply during extraction (LAST_BEFORE_START_IF_EMPTY).
This strategy instructs the process to perform a lookup only if the dataset for the given period is empty.
Since data exists for the given time window in this example, no lookup takes place.
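The entity examples use a window that is slightly longer than one day. A small standard-library check of its length, using the same start and end strings as the code above (naive datetimes, so no time zone is involved in the difference):

```python
from datetime import datetime, timedelta

FMT = "%Y-%m-%d %H:%M:%S"  # the fractional nanoseconds in the examples are all zero

start = datetime.strptime("2020-04-10 00:00:00", FMT)
end = datetime.strptime("2020-04-11 00:00:10", FMT)
window = end - start

print(window)                      # 1 day, 0:00:10
print(window - timedelta(days=1))  # 0:00:10 beyond a full day
```

If the strings are read as UTC, both rows displayed below (1586563201900000000 and 1586563206700000000) fall within those last ten seconds; dataset.show(2) does not sort the rows, so they need not be the earliest ones.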
Expected application output:
Dataset count: 49660
+-----+------+----+-----+-----+-----+-------+-----+----+------+-----+----+------+------+------+-----+-----+-----+------+-----+------+-----+-----+-----+----+-------------+--------------------+------------------+-------------------+-------+-------------------+-------+------------------+--------+----------------+
|BATCH|BEAMID|BPNM|COMLN|CYCLE|CYTAG| DEST|DEST2|DURN|FREE14|FREE4|MISC|MISC_A|NCOMLN|NCYTAG|NDEST|NDURN|NMISC|NPARTY|NUSER| PARTY|SCNUM|SCTAG|SPCON|USER|__LSA_CYCLE__|__record_timestamp__|__record_version__| acqStamp| class| cyclestamp| device| property|selector|nxcals_entity_id|
+-----+------+----+-----+-----+-----+-------+-----+----+------+-----+----+------+------+------+-----+-----+-----+------+-----+------+-----+-----+-----+----+-------------+--------------------+------------------+-------------------+-------+-------------------+-------+------------------+--------+----------------+
| 0| 11658| 25| null| 16| null|PS_DUMP| NONE| 1| null| null|null| null| null| null| null| null| null| null| null|NOBEAM|24192| 2434| null|ZERO| ~~ZERO~~| 1586563201900000000| 0|1586563201900000000|CPS.TGM|1586563201900000000|CPS.TGM|FULL-TELEGRAM.STRC| null| 46955|
| 0| 11658| 29| null| 20| null|PS_DUMP| NONE| 1| null| null|null| null| null| null| null| null| null| null| null|NOBEAM|24192| 2434| null|ZERO| ~~ZERO~~| 1586563206700000000| 0|1586563206700000000|CPS.TGM|1586563206700000000|CPS.TGM|FULL-TELEGRAM.STRC| null| 46955|
+-----+------+----+-----+-----+-----+-------+-----+----+------+-----+----+------+------+------+-----+-----+-----+------+-----+------+-----+-----+-----+----+-------------+--------------------+------------------+-------------------+-------+-------------------+-------+------------------+--------+----------------+
only showing top 2 rows
Get data for an entity and always perform a lookup, regardless of whether data exists
Map<String, Object> keyValues = ImmutableMap
.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC");
Entity myEntity = entityService.findOne(Entities.suchThat()
.keyValues().eq(cmwSystemSpec, keyValues))
.orElseThrow(() -> new IllegalArgumentException("Could not obtain entity from service"));
Instant startTime = TimeUtils.getInstantFromString("2020-04-10 00:00:00.000000000");
Instant endTime = TimeUtils.getInstantFromString("2020-04-11 00:00:10.000000000");
ExtractionProperties properties = ExtractionProperties.builder()
.timeWindow(startTime, endTime)
.lookupStrategy(LookupStrategy.LAST_BEFORE_START).build();
Dataset<Row> dataset = extractionService.getData(myEntity, properties);
System.out.println("Dataset count: " + dataset.count());
dataset.show(2);
ServiceClientFactory = spark._jvm.cern.nxcals.api.extraction.metadata.ServiceClientFactory
Entities = spark._jvm.cern.nxcals.api.extraction.metadata.queries.Entities
TimeUtils = spark._jvm.cern.nxcals.api.utils.TimeUtils
ExtractionProperties = spark._jvm.cern.nxcals.api.custom.service.extraction.ExtractionProperties
LookupStrategy = spark._jvm.cern.nxcals.api.custom.service.extraction.LookupStrategy
DEVICE_KEY_NAME = spark._jvm.cern.nxcals.api.custom.domain.CmwSystemConstants.DEVICE_KEY_NAME
PROPERTY_KEY_NAME = spark._jvm.cern.nxcals.api.custom.domain.CmwSystemConstants.PROPERTY_KEY_NAME
ImmutableMap = spark._jvm.com.google.common.collect.ImmutableMap
entityService = ServiceClientFactory.createEntityService()
systemService = ServiceClientFactory.createSystemSpecService()
cmwSystemSpec = systemService.findByName("CMW")
if cmwSystemSpec.isEmpty():
    raise ValueError("No such system")
keyValues = ImmutableMap.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC")
myEntity = entityService.findOne(Entities.suchThat().keyValues().eq(cmwSystemSpec.get(), keyValues))
if myEntity.isEmpty():
    raise ValueError("Could not obtain entity from service")
startTime = TimeUtils.getInstantFromString("2020-04-10 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-11 00:00:10.000000000")
properties = ExtractionProperties.builder().timeWindow(startTime, endTime).lookupStrategy(LookupStrategy.LAST_BEFORE_START).build()
dataset = extractionService.getData(myEntity.get(), properties)
print(dataset.count())
dataset.show(2)
from cern.nxcals.api.extraction.metadata.queries import Entities
from cern.nxcals.api.utils import TimeUtils
from cern.nxcals.api.custom.service.extraction import ExtractionProperties, LookupStrategy
from cern.nxcals.api.custom.domain.CmwSystemConstants import DEVICE_KEY_NAME, PROPERTY_KEY_NAME
from com.google.common.collect import ImmutableMap
keyValues = ImmutableMap.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC")
myEntity = entityService.findOne(Entities.suchThat().keyValues().eq(cmwSystemSpec.get(), keyValues))
if myEntity.isEmpty():
    raise ValueError("Could not obtain entity from service")
startTime = TimeUtils.getInstantFromString("2020-04-10 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-11 00:00:10.000000000")
properties = ExtractionProperties.builder().timeWindow(startTime, endTime).lookupStrategy(LookupStrategy.LAST_BEFORE_START).build()
dataset = extractionService.getData(myEntity.get(), properties)
print(dataset.count())
from jpype.types import JInt
dataset.show(JInt(2))
As the example above shows, the extraction process is controlled by the parameters passed to the ExtractionProperties
builder. Again, we specify the time window for the entity data search (1 day and 10 seconds) and the lookup strategy to
apply during extraction, this time LAST_BEFORE_START.
This strategy instructs the process to always perform a data lookup.
In this example we therefore expect the data for the given time window plus one extra value, the last one recorded before the start of the window, produced by the lookup.
Expected application output:
Dataset count: 49661 (+1 value from the above example, as product of the lookup action)
+-----+------+----+-----+-----+-----+-------+-----+----+------+-----+----+------+------+------+-----+-----+-----+------+-----+------+-----+-----+-----+----+--------------------+--------------------+------------------+-------------------+-------+-------------------+-------+------------------+--------+----------------+
|BATCH|BEAMID|BPNM|COMLN|CYCLE|CYTAG| DEST|DEST2|DURN|FREE14|FREE4|MISC|MISC_A|NCOMLN|NCYTAG|NDEST|NDURN|NMISC|NPARTY|NUSER| PARTY|SCNUM|SCTAG|SPCON|USER| __LSA_CYCLE__|__record_timestamp__|__record_version__| acqStamp| class| cyclestamp| device| property|selector|nxcals_entity_id|
+-----+------+----+-----+-----+-----+-------+-----+----+------+-----+----+------+------+------+-----+-----+-----+------+-----+------+-----+-----+-----+----+--------------------+--------------------+------------------+-------------------+-------+-------------------+-------+------------------+--------+----------------+
| 0| 16529| 1| null| 1| null|PS_DUMP| NONE| 3| null| null|null| null| null| null| null| null| null| null| null|NOBEAM|21710| 2434| null|LHC4|LHC25#48B_BCMS_PS...| 1586476799500000000| 0|1586476799500000000|CPS.TGM|1586476799500000000|CPS.TGM|FULL-TELEGRAM.STRC| null| 46955|
| 0| 11658| 25| null| 16| null|PS_DUMP| NONE| 1| null| null|null| null| null| null| null| null| null| null| null|NOBEAM|24192| 2434| null|ZERO| ~~ZERO~~| 1586563201900000000| 0|1586563201900000000|CPS.TGM|1586563201900000000|CPS.TGM|FULL-TELEGRAM.STRC| null| 46955|
+-----+------+----+-----+-----+-----+-------+-----+----+------+-----+----+------+------+------+-----+-----+-----+------+-----+------+-----+-----+-----+----+--------------------+--------------------+------------------+-------------------+-------+-------------------+-------+------------------+--------+----------------+
only showing top 2 rows
Get data for an entity and perform a lookup with a custom lookup duration
Map<String, Object> keyValues = ImmutableMap
.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC");
Entity myEntity = entityService.findOne(Entities.suchThat()
.keyValues().eq(cmwSystemSpec, keyValues))
.orElseThrow(() -> new IllegalArgumentException("Could not obtain entity from service"));
Instant startTime = TimeUtils.getInstantFromString("2020-04-10 00:00:00.000000000");
Instant endTime = TimeUtils.getInstantFromString("2020-04-11 00:00:10.000000000");
LookupStrategy customLookupStrategy = LookupStrategy.LAST_BEFORE_START
.withLookupDuration(8, ChronoUnit.HOURS);
ExtractionProperties properties = ExtractionProperties.builder()
.timeWindow(startTime, endTime)
.lookupStrategy(customLookupStrategy).build();
Dataset<Row> dataset = extractionService.getData(myEntity, properties);
System.out.println("Dataset count: " + dataset.count());
dataset.show(2);
ServiceClientFactory = spark._jvm.cern.nxcals.api.extraction.metadata.ServiceClientFactory
Entities = spark._jvm.cern.nxcals.api.extraction.metadata.queries.Entities
TimeUtils = spark._jvm.cern.nxcals.api.utils.TimeUtils
ChronoUnit = spark._jvm.java.time.temporal.ChronoUnit
ExtractionProperties = spark._jvm.cern.nxcals.api.custom.service.extraction.ExtractionProperties
LookupStrategy = spark._jvm.cern.nxcals.api.custom.service.extraction.LookupStrategy
DEVICE_KEY_NAME = spark._jvm.cern.nxcals.api.custom.domain.CmwSystemConstants.DEVICE_KEY_NAME
PROPERTY_KEY_NAME = spark._jvm.cern.nxcals.api.custom.domain.CmwSystemConstants.PROPERTY_KEY_NAME
ImmutableMap = spark._jvm.com.google.common.collect.ImmutableMap
entityService = ServiceClientFactory.createEntityService()
systemService = ServiceClientFactory.createSystemSpecService()
cmwSystemSpec = systemService.findByName("CMW")
if cmwSystemSpec.isEmpty():
    raise ValueError("No such system")
keyValues = ImmutableMap.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC")
myEntity = entityService.findOne(Entities.suchThat().keyValues().eq(cmwSystemSpec.get(), keyValues))
if myEntity.isEmpty():
    raise ValueError("Could not obtain entity from service")
startTime = TimeUtils.getInstantFromString("2020-04-10 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-11 00:00:10.000000000")
customLookupStrategy = LookupStrategy.LAST_BEFORE_START.withLookupDuration(8, ChronoUnit.HOURS)
properties = ExtractionProperties.builder().timeWindow(startTime, endTime).lookupStrategy(customLookupStrategy).build()
dataset = extractionService.getData(myEntity.get(), properties)
print(dataset.count())
dataset.show(2)
from cern.nxcals.api.extraction.metadata.queries import Entities
from cern.nxcals.api.utils import TimeUtils
from java.time.temporal import ChronoUnit
from cern.nxcals.api.custom.service.extraction import ExtractionProperties, LookupStrategy
from cern.nxcals.api.custom.domain.CmwSystemConstants import DEVICE_KEY_NAME, PROPERTY_KEY_NAME
from com.google.common.collect import ImmutableMap
keyValues = ImmutableMap.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC")
myEntity = entityService.findOne(Entities.suchThat().keyValues().eq(cmwSystemSpec.get(), keyValues))
if myEntity.isEmpty():
    raise ValueError("Could not obtain entity from service")
startTime = TimeUtils.getInstantFromString("2020-04-10 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-11 00:00:10.000000000")
customLookupStrategy = LookupStrategy.LAST_BEFORE_START.withLookupDuration(8, ChronoUnit.HOURS)
properties = ExtractionProperties.builder().timeWindow(startTime, endTime).lookupStrategy(customLookupStrategy).build()
dataset = extractionService.getData(myEntity.get(), properties)
print(dataset.count())
from jpype.types import JInt
dataset.show(JInt(2))
As the example above shows, the extraction process is controlled by the parameters passed to the ExtractionProperties
builder. Again, we specify the time window for the entity data search (1 day and 10 seconds) and the lookup strategy to
apply during extraction, this time LAST_BEFORE_START with a custom lookup duration.
This instructs the process to always perform a data lookup, but to search only within the specified period before the
start of the time window. In this example the lookup duration is set to 8 hours.
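A custom lookup duration bounds how far back the search may go. The pure-Python sketch below models that idea under stated assumptions: the lookup considers only records with start - duration <= timestamp < start and returns the latest one, or nothing if that range is empty. The last_before_start() function and the (timestamp_ns, value) record layout are hypothetical; the LookupStrategy Javadoc is the authority on the exact boundary semantics.

```python
from datetime import timedelta
from typing import List, Optional, Tuple

Record = Tuple[int, int]  # (timestamp in ns, value); hypothetical layout


def last_before_start(records: List[Record], start_ns: int,
                      duration: Optional[timedelta] = None) -> Optional[Record]:
    """Latest record strictly before start_ns, optionally limited to a lookback duration."""
    lower = None
    if duration is not None:
        # timedelta // timedelta yields an int; convert microseconds to nanoseconds
        lower = start_ns - duration // timedelta(microseconds=1) * 1000
    candidates = [r for r in records
                  if r[0] < start_ns and (lower is None or r[0] >= lower)]
    return max(candidates, key=lambda r: r[0]) if candidates else None


HOUR_NS = 3600 * 10**9
records = [(0, 7), (10 * HOUR_NS, 8)]
start = 20 * HOUR_NS
print(last_before_start(records, start))                       # unbounded: finds the 10 h record
print(last_before_start(records, start, timedelta(hours=8)))   # None: nothing in the last 8 h
print(last_before_start(records, start, timedelta(hours=12)))  # finds the 10 h record again
```

When the bounded search finds nothing, only the data inside the time window is returned, which is the situation described in the expected output below.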
Expected application output:
Dataset count: 49660 (8 hours lookup was not enough to find older data - the original dataset is therefore returned)
+-----+------+----+-----+-----+-----+-------+-----+----+------+-----+----+------+------+------+-----+-----+-----+------+-----+------+-----+-----+-----+----+-------------+--------------------+------------------+-------------------+-------+-------------------+-------+------------------+--------+----------------+
|BATCH|BEAMID|BPNM|COMLN|CYCLE|CYTAG| DEST|DEST2|DURN|FREE14|FREE4|MISC|MISC_A|NCOMLN|NCYTAG|NDEST|NDURN|NMISC|NPARTY|NUSER| PARTY|SCNUM|SCTAG|SPCON|USER|__LSA_CYCLE__|__record_timestamp__|__record_version__| acqStamp| class| cyclestamp| device| property|selector|nxcals_entity_id|
+-----+------+----+-----+-----+-----+-------+-----+----+------+-----+----+------+------+------+-----+-----+-----+------+-----+------+-----+-----+-----+----+-------------+--------------------+------------------+-------------------+-------+-------------------+-------+------------------+--------+----------------+
| 0| 11658| 25| null| 16| null|PS_DUMP| NONE| 1| null| null|null| null| null| null| null| null| null| null| null|NOBEAM|24192| 2434| null|ZERO| ~~ZERO~~| 1586563201900000000| 0|1586563201900000000|CPS.TGM|1586563201900000000|CPS.TGM|FULL-TELEGRAM.STRC| null| 46955|
| 0| 11658| 29| null| 20| null|PS_DUMP| NONE| 1| null| null|null| null| null| null| null| null| null| null| null|NOBEAM|24192| 2434| null|ZERO| ~~ZERO~~| 1586563206700000000| 0|1586563206700000000|CPS.TGM|1586563206700000000|CPS.TGM|FULL-TELEGRAM.STRC| null| 46955|
+-----+------+----+-----+-----+-----+-------+-----+----+------+-----+----+------+------+------+-----+-----+-----+------+-----+------+-----+-----+-----+----+-------------+--------------------+------------------+-------------------+-------+-------------------+-------+------------------+--------+----------------+
only showing top 2 rows