CERN Extraction API

The CERN Extraction API currently contains the following services:

Warning

Running Python code for this API requires a specific setup as described here.

Fill Service

Service responsible for retrieving data related to LHC fills.

Available methods

Optional<Fill> findFill(int number) 
List<Fill>     findFills(Instant startTime, Instant endTime)
More info can be found in Javadoc for FillService.

Examples

Find a fill using its number

The first step consists of obtaining an instance of FillService using a Spark session:

SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("MY_APP");
SparkSession session = SparkSession.builder().config(sparkConf).getOrCreate();

FillService fillService = Services.newInstance(session).fillService();
from nxcals.spark_session_builder import get_or_create # on swan create spark session from UI (star icon)
spark = get_or_create()

Services = spark._jvm.cern.nxcals.api.custom.service.Services

fillService = Services.newInstance(spark._jsparkSession).fillService()
from cern.nxcals.api.custom.service import Services
from org.apache.spark import SparkConf
from org.apache.spark.sql import SparkSession

sparkConf = SparkConf().setMaster("local[*]").setAppName("MY_APP")
session = SparkSession.builder().config(sparkConf).getOrCreate()

fillService = Services.newInstance(session).fillService()

Once the session is established, data can be retrieved:

Fill fill = fillService.findFill(3000)
        .orElseThrow(() -> new IllegalArgumentException("No such a fill"));

Utilities.printFillInfo(fill);
fill = fillService.findFill(3000)

if fill.isEmpty():
    raise ValueError("No such a fill")

Utilities.printFillInfo(fill.get())

The retrieved fill can then be printed using the following helper class:

class Utilities {

    // Not private: this method is called from outside the class (e.g. Utilities::printFillInfo)
    static void printFillInfo(Fill fill) {
        StringBuilder outputMsg = new StringBuilder();

        outputMsg.append("Fill: ").append(fill.getNumber());
        outputMsg.append(", Validity: ").append(validityToString(fill.getValidity())).append("\n");

        List<BeamMode> beamModes = fill.getBeamModes();
        beamModes.forEach(b -> outputMsg.append(addBeamModeInfo(b)));

        System.out.println(outputMsg);
    }

    private static String addBeamModeInfo(BeamMode beamMode) {
        return "\tBeam mode name: " + beamMode.getBeamModeValue() + ", Validity: " + validityToString(beamMode.getValidity()) + "\n";
    }

    private static String validityToString(TimeWindow validity) {
        return validity.getStartTime() + " " + validity.getEndTime();
    }
}
class Utilities:
    @staticmethod
    def printFillInfo(fill):
        outputMsg = "Fill: " + str(fill.getNumber())
        outputMsg += ", Validity: " + Utilities.__validityToString(fill.getValidity()) + "\n"

        beamModes = fill.getBeamModes()
        for beamMode in beamModes:
            outputMsg += Utilities.__addBeamModeInfo(beamMode)

        print(outputMsg)

    @staticmethod
    def __addBeamModeInfo(beamMode):
        return "\tBeam mode name: " + beamMode.getBeamModeValue() + ", Validity: " + Utilities.__validityToString(beamMode.getValidity()) + "\n"

    @staticmethod
    def __validityToString(validity):
        return validity.getStartTime().toString() + " " + validity.getEndTime().toString()
Expected application output:
Fill: 3000, Validity: 2012-08-24T19:57:16.447Z 2012-08-25T00:06:59.143Z
    Beam mode name: INJPHYS, Validity: 2012-08-24T19:57:16.447Z 2012-08-24T19:59:48.239Z
    Beam mode name: INJPROB, Validity: 2012-08-24T19:59:48.239Z 2012-08-24T20:11:06.159Z
    Beam mode name: INJPHYS, Validity: 2012-08-24T20:11:06.159Z 2012-08-24T20:41:19.970Z
    Beam mode name: PRERAMP, Validity: 2012-08-24T20:41:19.970Z 2012-08-24T20:45:43.583Z
    Beam mode name: RAMP, Validity: 2012-08-24T20:45:43.583Z 2012-08-24T20:59:00.208Z
    Beam mode name: FLATTOP, Validity: 2012-08-24T20:59:00.208Z 2012-08-24T21:09:26.405Z
    Beam mode name: SQUEEZE, Validity: 2012-08-24T21:09:26.405Z 2012-08-24T21:27:18.544Z
    Beam mode name: ADJUST, Validity: 2012-08-24T21:27:18.544Z 2012-08-24T21:35:51.550Z
    Beam mode name: STABLE, Validity: 2012-08-24T21:35:51.550Z 2012-08-25T00:00:44.951Z
    Beam mode name: BEAMDUMP, Validity: 2012-08-25T00:00:44.951Z 2012-08-25T00:06:53.084Z
    Beam mode name: RAMPDOWN, Validity: 2012-08-25T00:06:53.084Z 2012-08-25T00:06:59.143Z
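
The validity windows printed above are ISO-8601 UTC instants, so durations can be derived from them directly. The following is a minimal sketch in plain Python, working on the printed strings rather than on the Fill objects themselves; the helper names parse_instant and validity_duration are hypothetical and not part of the API. It computes how long fill 3000 stayed in the STABLE beam mode.

```python
from datetime import datetime, timedelta


def parse_instant(text: str) -> datetime:
    """Parse an ISO-8601 UTC instant such as '2012-08-24T21:35:51.550Z'."""
    # Replace the trailing 'Z' so that fromisoformat also accepts it on older Python 3 versions.
    return datetime.fromisoformat(text.replace("Z", "+00:00"))


def validity_duration(start: str, end: str) -> timedelta:
    """Return the length of a validity window given its start and end instants."""
    return parse_instant(end) - parse_instant(start)


# STABLE beam mode of fill 3000, values copied from the output above
stable = validity_duration("2012-08-24T21:35:51.550Z", "2012-08-25T00:00:44.951Z")
print(stable)  # 2:24:53.401000
```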

Find a list of fills within a time range

List<Fill> fills = fillService.findFills(
        TimeUtils.getInstantFromString("2018-04-25 00:00:00.000000000"),
        TimeUtils.getInstantFromString("2018-04-28 00:00:00.000000000")
);

fills.forEach(Utilities::printFillInfo);
TimeUtils = spark._jvm.cern.nxcals.api.utils.TimeUtils

fills = fillService.findFills(TimeUtils.getInstantFromString("2018-04-25 00:00:00.000000000"),
    TimeUtils.getInstantFromString("2018-04-28 00:00:00.000000000"))

for fill in fills:
    Utilities.printFillInfo(fill)
from cern.nxcals.api.utils import TimeUtils

fills = fillService.findFills(TimeUtils.getInstantFromString("2018-04-25 00:00:00.000000000"),
    TimeUtils.getInstantFromString("2018-04-28 00:00:00.000000000"))

for fill in fills:
    Utilities.printFillInfo(fill)
Expected application output:
Fill: 6611, Validity: 2018-04-24T16:57:54.689Z 2018-04-25T07:15:18.817Z
    Beam mode name: CYCLING, Validity: 2018-04-24T16:57:54.689Z 2018-04-24T17:16:51.466Z
    Beam mode name: SETUP, Validity: 2018-04-24T17:16:51.466Z 2018-04-24T18:56:27.642Z
    Beam mode name: INJPROB, Validity: 2018-04-24T18:56:27.642Z 2018-04-24T19:13:20.554Z
    Beam mode name: INJPHYS, Validity: 2018-04-24T19:13:20.554Z 2018-04-24T19:52:14.632Z
    Beam mode name: PRERAMP, Validity: 2018-04-24T19:52:14.632Z 2018-04-24T19:54:30.026Z
    Beam mode name: RAMP, Validity: 2018-04-24T19:54:30.026Z 2018-04-24T20:14:44.451Z
    Beam mode name: FLATTOP, Validity: 2018-04-24T20:14:44.451Z 2018-04-24T20:17:40.211Z
    Beam mode name: SQUEEZE, Validity: 2018-04-24T20:17:40.211Z 2018-04-24T20:28:38.170Z
    Beam mode name: ADJUST, Validity: 2018-04-24T20:28:38.170Z 2018-04-24T20:35:27.441Z
    Beam mode name: STABLE, Validity: 2018-04-24T20:35:27.441Z 2018-04-25T06:00:49.975Z
    Beam mode name: ADJUST, Validity: 2018-04-25T06:00:49.975Z 2018-04-25T07:09:16.574Z
    Beam mode name: BEAMDUMP, Validity: 2018-04-25T07:09:16.574Z 2018-04-25T07:10:50.770Z
    Beam mode name: RAMPDOWN, Validity: 2018-04-25T07:10:50.770Z 2018-04-25T07:15:18.817Z

Fill: 6612, Validity: 2018-04-25T07:15:18.817Z 2018-04-25T20:58:43.338Z
    Beam mode name: RAMPDOWN, Validity: 2018-04-25T07:15:18.817Z 2018-04-25T07:47:43.690Z
    Beam mode name: SETUP, Validity: 2018-04-25T07:47:43.690Z 2018-04-25T08:24:19.236Z
    Beam mode name: INJPROB, Validity: 2018-04-25T08:24:19.236Z 2018-04-25T08:55:44.730Z
    Beam mode name: SETUP, Validity: 2018-04-25T08:55:44.730Z 2018-04-25T09:00:50.865Z
    Beam mode name: SETUP, Validity: 2018-04-25T09:00:50.865Z 2018-04-25T09:06:37.199Z
    Beam mode name: INJPROB, Validity: 2018-04-25T09:06:37.199Z 2018-04-25T10:17:40.710Z
    Beam mode name: INJPHYS, Validity: 2018-04-25T10:17:40.710Z 2018-04-25T11:24:47.827Z
    Beam mode name: PRERAMP, Validity: 2018-04-25T11:24:47.827Z 2018-04-25T11:29:46.379Z
    Beam mode name: RAMP, Validity: 2018-04-25T11:29:46.379Z 2018-04-25T11:50:12.375Z
    Beam mode name: FLATTOP, Validity: 2018-04-25T11:50:12.375Z 2018-04-25T11:57:10.353Z
    Beam mode name: SQUEEZE, Validity: 2018-04-25T11:57:10.353Z 2018-04-25T12:08:05.002Z
    Beam mode name: ADJUST, Validity: 2018-04-25T12:08:05.002Z 2018-04-25T12:16:59.279Z
    Beam mode name: STABLE, Validity: 2018-04-25T12:16:59.279Z 2018-04-25T20:53:16.814Z
    Beam mode name: BEAMDUMP, Validity: 2018-04-25T20:53:16.814Z 2018-04-25T20:54:39.637Z
    Beam mode name: RAMPDOWN, Validity: 2018-04-25T20:54:39.637Z 2018-04-25T20:58:43.338Z

Aggregation Service

Service responsible for retrieving aggregated data.

Available methods

Dataset<Row> getData(Variable variable, WindowAggregationProperties properties); 
Dataset<Row> getData(Entity entity, WindowAggregationProperties properties);

Dataset<Row> getData(Variable variable, DatasetAggregationProperties properties); 
Dataset<Row> getData(Entity entity, DatasetAggregationProperties properties);

More info can be found in Javadoc for AggregationService.

Available functions

A full list of available aggregation functions can be found in the Javadoc for aggregation functions.

Getting an instance of the service

An instance can be obtained through the cern.nxcals.api.custom.service.Services gateway, configured with the cern.nxcals.api.config.SparkProperties utility, which is designed to simplify the creation of a Spark session:

AggregationService aggregationService = Services.newInstance(SparkProperties.defaults("MY_APP"))
        .aggregationService();
from nxcals.spark_session_builder import get_or_create  # on swan create spark session from UI (star icon)
spark = get_or_create()

Services = spark._jvm.cern.nxcals.api.custom.service.Services

aggregationService = Services.newInstance(spark._jsparkSession).aggregationService()
from cern.nxcals.api.custom.service import Services
from cern.nxcals.api.config import SparkProperties

aggregationService = Services.newInstance(SparkProperties.defaults("MY_APP")) \
    .aggregationService()

Once we have an instance of the service, we can perform data aggregations as in the following examples.

Examples

Get average data for variable scaled on given intervals

Variable myVariable = variableService.findOne(Variables.suchThat()
        .variableName().eq("CPS.TGM:CYCLE"))
        .orElseThrow(() -> new IllegalArgumentException("Could not obtain variable from service"));
Instant startTime = TimeUtils.getInstantFromString("2020-04-25 00:00:00.000000000");
Instant endTime = TimeUtils.getInstantFromString("2020-04-27 00:00:00.000000000");

WindowAggregationProperties properties = WindowAggregationProperties.builder()
        .timeWindow(startTime, endTime)
        .interval(8, ChronoUnit.HOURS)
        .function((AggregationFunctions.AVG)).build();
Dataset<Row> dataset = aggregationService.getData(myVariable, properties);
dataset.show();
ServiceClientFactory = spark._jvm.cern.nxcals.api.extraction.metadata.ServiceClientFactory
Variables = spark._jvm.cern.nxcals.api.extraction.metadata.queries.Variables
TimeUtils = spark._jvm.cern.nxcals.api.utils.TimeUtils
ChronoUnit = spark._jvm.java.time.temporal.ChronoUnit

AggregationFunctions = spark._jvm.cern.nxcals.api.custom.service.aggregation.AggregationFunctions
WindowAggregationProperties = spark._jvm.cern.nxcals.api.custom.service.aggregation.WindowAggregationProperties

variableService = ServiceClientFactory.createVariableService()

myVariable = variableService.findOne(Variables.suchThat().variableName().eq("CPS.TGM:CYCLE"))

if myVariable.isEmpty():
    raise ValueError("Could not obtain variable from service")

startTime = TimeUtils.getInstantFromString("2020-04-25 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-27 00:00:00.000000000")

properties = WindowAggregationProperties.builder().timeWindow(startTime, endTime) \
    .interval(8, ChronoUnit.HOURS).function((AggregationFunctions.AVG)).build()

print(properties)

dataset = aggregationService.getData(myVariable.get(), properties)
dataset.show()
from cern.nxcals.api.extraction.metadata import ServiceClientFactory
from cern.nxcals.api.extraction.metadata.queries import Variables
from cern.nxcals.api.utils import TimeUtils
from java.time.temporal import ChronoUnit

from cern.nxcals.api.custom.service.aggregation import AggregationFunctions
from cern.nxcals.api.custom.service.aggregation import WindowAggregationProperties

variableService = ServiceClientFactory.createVariableService()

myVariable = variableService.findOne(Variables.suchThat().variableName().eq("CPS.TGM:CYCLE"))

if myVariable.isEmpty():
    raise ValueError("Could not obtain variable from service")

startTime = TimeUtils.getInstantFromString("2020-04-25 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-27 00:00:00.000000000")

properties = WindowAggregationProperties.builder().timeWindow(startTime, endTime) \
    .interval(8, ChronoUnit.HOURS).function((AggregationFunctions.AVG)).build()

print(properties)

dataset = aggregationService.getData(myVariable.get(), properties)
dataset.show()

As the example above shows, the aggregation process is controlled by the parameters passed to the WindowAggregationProperties builder. In this common case, we specify the time window for the variable data search and the aggregation interval (8 hours). Finally, we specify the function (average) that is applied to the data grouped inside each interval.

Expected application output:
+-------------------+------------------+
|          timestamp|             value|
+-------------------+------------------+
|1587772800000000000|10.502899951667473|
|1587801600000000000|10.497009244154432|
|1587830400000000000|10.501449975833737|
|1587859200000000000|10.498791540785499|
|1587888000000000000|10.5              |
|1587916800000000000|10.501661531025317|
+-------------------+------------------+
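
The timestamp column in the output above appears to hold the start of each aggregation interval, expressed in nanoseconds since the Unix epoch. The following sketch reproduces those values in plain Python. It is inferred from the printed output only, not from the NXCALS implementation, and the function name window_starts is hypothetical.

```python
from datetime import datetime, timezone

NANOS_PER_SECOND = 1_000_000_000


def window_starts(start, end, interval_seconds):
    """Return interval start timestamps (ns since epoch) covering [start, end)."""
    start_ns = int(start.timestamp()) * NANOS_PER_SECOND
    end_ns = int(end.timestamp()) * NANOS_PER_SECOND
    step_ns = interval_seconds * NANOS_PER_SECOND
    # range() excludes the end, so no interval starts exactly at the end of the time window
    return list(range(start_ns, end_ns, step_ns))


start = datetime(2020, 4, 25, tzinfo=timezone.utc)
end = datetime(2020, 4, 27, tzinfo=timezone.utc)

# 8-hour intervals over a 48-hour window give six interval starts
starts = window_starts(start, end, 8 * 3600)
for ts in starts:
    print(ts)
```

Under these assumptions the six printed values match the six rows of the table: a 48-hour time window split into 8-hour intervals.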

Get max data for entity scaled on given intervals

Window aggregations for entities are very similar to those for variables. However, since an entity query results in a dataframe that contains multiple data columns, we need to specify explicitly which column to aggregate. The target field name is therefore provided via the aggregationField() method of the WindowAggregationProperties builder.

Map<String, Object> keyValues = ImmutableMap
        .of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC");

Entity myEntity = entityService.findOne(Entities.suchThat()
        .keyValues().eq(cmwSystemSpec, keyValues))
        .orElseThrow(() -> new IllegalArgumentException("Could not obtain entity from service"));
Instant startTime = TimeUtils.getInstantFromString("2020-04-10 00:00:00.000000000");
Instant endTime = TimeUtils.getInstantFromString("2020-04-14 00:00:10.000000000");

WindowAggregationProperties properties = WindowAggregationProperties.builder()
        .timeWindow(startTime, endTime)
        .interval(1, ChronoUnit.DAYS)
        .function((AggregationFunctions.MAX))
        .aggregationField("CYCLE")
        .build();
Dataset<Row> dataset = aggregationService.getData(myEntity, properties);
dataset.show();
ServiceClientFactory = spark._jvm.cern.nxcals.api.extraction.metadata.ServiceClientFactory
Entities = spark._jvm.cern.nxcals.api.extraction.metadata.queries.Entities
TimeUtils = spark._jvm.cern.nxcals.api.utils.TimeUtils
ChronoUnit = spark._jvm.java.time.temporal.ChronoUnit

AggregationFunctions = spark._jvm.cern.nxcals.api.custom.service.aggregation.AggregationFunctions
WindowAggregationProperties = spark._jvm.cern.nxcals.api.custom.service.aggregation.WindowAggregationProperties

DEVICE_KEY_NAME = spark._jvm.cern.nxcals.api.custom.domain.CmwSystemConstants.DEVICE_KEY_NAME
PROPERTY_KEY_NAME = spark._jvm.cern.nxcals.api.custom.domain.CmwSystemConstants.PROPERTY_KEY_NAME
ImmutableMap = spark._jvm.com.google.common.collect.ImmutableMap

systemService = ServiceClientFactory.createSystemSpecService()
entityService = ServiceClientFactory.createEntityService()

cmwSystemSpec = systemService.findByName("CMW")

if cmwSystemSpec.isEmpty():
    raise ValueError("No such system")

keyValues = ImmutableMap.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC")

myEntity = entityService.findOne(Entities.suchThat().keyValues().eq(cmwSystemSpec.get(), keyValues))

if myEntity.isEmpty():
    raise ValueError("Could not obtain entity from service")

startTime = TimeUtils.getInstantFromString("2020-04-10 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-14 00:00:10.000000000")

properties = WindowAggregationProperties.builder().timeWindow(startTime, endTime) \
    .interval(1, ChronoUnit.DAYS).function((AggregationFunctions.MAX)).aggregationField("CYCLE").build()

dataset = aggregationService.getData(myEntity.get(), properties)
dataset.show()
from cern.nxcals.api.extraction.metadata import ServiceClientFactory
from cern.nxcals.api.extraction.metadata.queries import Entities

from cern.nxcals.api.utils import TimeUtils
from java.time.temporal import ChronoUnit

from cern.nxcals.api.custom.service.aggregation import AggregationFunctions
from cern.nxcals.api.custom.service.aggregation import WindowAggregationProperties

systemService = ServiceClientFactory.createSystemSpecService()
entityService = ServiceClientFactory.createEntityService()

cmwSystemSpec = systemService.findByName("CMW")

if cmwSystemSpec.isEmpty():
    raise ValueError("No such system")

from cern.nxcals.api.custom.domain.CmwSystemConstants import DEVICE_KEY_NAME
from cern.nxcals.api.custom.domain.CmwSystemConstants import PROPERTY_KEY_NAME
from com.google.common.collect import ImmutableMap

keyValues = ImmutableMap.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC")

myEntity = entityService.findOne(Entities.suchThat().keyValues().eq(cmwSystemSpec.get(), keyValues))

if myEntity.isEmpty():
    raise ValueError("Could not obtain entity from service")

startTime = TimeUtils.getInstantFromString("2020-04-10 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-14 00:00:10.000000000")

properties = WindowAggregationProperties.builder().timeWindow(startTime, endTime) \
    .interval(1, ChronoUnit.DAYS).function((AggregationFunctions.MAX)).aggregationField("CYCLE").build()

dataset = aggregationService.getData(myEntity.get(), properties)
dataset.show()
Expected application output:
+-------------------+-----+
|          timestamp|value|
+-------------------+-----+
|1586476800000000000|   20|
|1586563200000000000|   20|
|1586649600000000000|   20|
|1586736000000000000|   20|
|1586822400000000000|    7|
+-------------------+-----+

Get repeated data for variable scaled on given intervals

Variable myVariable = variableService.findOne(Variables.suchThat()
        .variableName().eq("CPS.TGM:CYCLE"))
        .orElseThrow(() -> new IllegalArgumentException("Could not obtain variable from service"));
Instant startTime = TimeUtils.getInstantFromString("2020-04-25 00:00:00.000000000");
Instant endTime = TimeUtils.getInstantFromString("2020-04-25 00:05:00.000000000");

WindowAggregationProperties properties = WindowAggregationProperties.builder()
        .timeWindow(startTime, endTime)
        .interval(20, ChronoUnit.SECONDS)
        .function((AggregationFunctions.REPEAT)).build();
Dataset<Row> dataset = aggregationService.getData(myVariable, properties);
dataset.show();
ServiceClientFactory = spark._jvm.cern.nxcals.api.extraction.metadata.ServiceClientFactory
Variables = spark._jvm.cern.nxcals.api.extraction.metadata.queries.Variables
TimeUtils = spark._jvm.cern.nxcals.api.utils.TimeUtils
ChronoUnit = spark._jvm.java.time.temporal.ChronoUnit

AggregationFunctions = spark._jvm.cern.nxcals.api.custom.service.aggregation.AggregationFunctions
WindowAggregationProperties = spark._jvm.cern.nxcals.api.custom.service.aggregation.WindowAggregationProperties

variableService = ServiceClientFactory.createVariableService()

myVariable = variableService.findOne(Variables.suchThat().variableName().eq("CPS.TGM:CYCLE"))

if myVariable.isEmpty():
    raise ValueError("Could not obtain variable from service")

startTime = TimeUtils.getInstantFromString("2020-04-25 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-25 00:05:00.000000000")

properties = WindowAggregationProperties.builder().timeWindow(startTime, endTime) \
    .interval(20, ChronoUnit.SECONDS).function((AggregationFunctions.REPEAT)).build()
dataset = aggregationService.getData(myVariable.get(), properties)
dataset.show()
from cern.nxcals.api.extraction.metadata import ServiceClientFactory
from cern.nxcals.api.extraction.metadata.queries import Variables
from cern.nxcals.api.utils import TimeUtils
from java.time.temporal import ChronoUnit

from cern.nxcals.api.custom.service.aggregation import AggregationFunctions
from cern.nxcals.api.custom.service.aggregation import WindowAggregationProperties

variableService = ServiceClientFactory.createVariableService()

myVariable = variableService.findOne(Variables.suchThat().variableName().eq("CPS.TGM:CYCLE"))

if myVariable.isEmpty():
    raise ValueError("Could not obtain variable from service")

startTime = TimeUtils.getInstantFromString("2020-04-25 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-25 00:05:00.000000000")

properties = WindowAggregationProperties.builder().timeWindow(startTime, endTime) \
    .interval(20, ChronoUnit.SECONDS).function((AggregationFunctions.REPEAT)).build()
dataset = aggregationService.getData(myVariable.get(), properties)
dataset.show()
Expected application output:
+-------------------+-----+
|          timestamp|value|
+-------------------+-----+
|1587772800000000000|    8|
|1587772820000000000|   20|
|1587772840000000000|   10|
|1587772860000000000|    2|
|1587772880000000000|   12|
|1587772900000000000|    6|
|1587772920000000000|   16|
|1587772940000000000|    9|
|1587772960000000000|   20|
|1587772980000000000|   11|
|1587773000000000000|    3|
|1587773020000000000|   12|
|1587773040000000000|    7|
|1587773060000000000|   17|
|1587773080000000000|    9|
+-------------------+-----+

Get interpolated data for entity scaled on given intervals

The following example demonstrates how to interpolate data on generated intervals. Interpolation is a special function, as it requires data to be present both before and after the calculated data point. To achieve that, this function is configured to perform data lookup beyond the specified time window. By default, the lookup extends the time window by up to 1 week on both sides. If the default is not enough for a given use case, it can be changed by specifying custom expand parameters via the expandTimeWindowBy method.

Map<String, Object> keyValues = ImmutableMap
        .of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC");

Entity myEntity = entityService.findOne(Entities.suchThat()
        .keyValues().eq(cmwSystemSpec, keyValues))
        .orElseThrow(() -> new IllegalArgumentException("Could not obtain entity from service"));
Instant startTime = TimeUtils.getInstantFromString("2020-02-10 00:00:00.000000000");
Instant endTime = TimeUtils.getInstantFromString("2020-08-10 00:00:00.000000000");

AggregationFunction customInterpolateFunc = AggregationFunctions.INTERPOLATE
        .expandTimeWindowBy(1, 0, ChronoUnit.MONTHS);

WindowAggregationProperties properties = WindowAggregationProperties.builder()
        .timeWindow(startTime, endTime)
        .interval(2, ChronoUnit.MONTHS)
        .function(customInterpolateFunc)
        .aggregationField("CYCLE")
        .build();
Dataset<Row> dataset = aggregationService.getData(myEntity, properties);
dataset.show();
ServiceClientFactory = spark._jvm.cern.nxcals.api.extraction.metadata.ServiceClientFactory
Entities = spark._jvm.cern.nxcals.api.extraction.metadata.queries.Entities
TimeUtils = spark._jvm.cern.nxcals.api.utils.TimeUtils
ChronoUnit = spark._jvm.java.time.temporal.ChronoUnit

AggregationFunctions = spark._jvm.cern.nxcals.api.custom.service.aggregation.AggregationFunctions
WindowAggregationProperties = spark._jvm.cern.nxcals.api.custom.service.aggregation.WindowAggregationProperties

DEVICE_KEY_NAME = spark._jvm.cern.nxcals.api.custom.domain.CmwSystemConstants.DEVICE_KEY_NAME
PROPERTY_KEY_NAME = spark._jvm.cern.nxcals.api.custom.domain.CmwSystemConstants.PROPERTY_KEY_NAME
ImmutableMap = spark._jvm.com.google.common.collect.ImmutableMap

entityService = ServiceClientFactory.createEntityService()
systemService = ServiceClientFactory.createSystemSpecService()

cmwSystemSpec = systemService.findByName("CMW")

if cmwSystemSpec.isEmpty():
    raise ValueError("No such system")

keyValues = ImmutableMap.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC")

myEntity = entityService.findOne(Entities.suchThat().keyValues().eq(cmwSystemSpec.get(), keyValues))
if myEntity.isEmpty():
    raise ValueError("Could not obtain entity from service")


startTime = TimeUtils.getInstantFromString("2020-02-10 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-08-10 00:00:00.000000000")

customInterpolateFunc = AggregationFunctions.INTERPOLATE.expandTimeWindowBy(1, 0, ChronoUnit.MONTHS)

properties = WindowAggregationProperties.builder().timeWindow(startTime, endTime) \
    .interval(2, ChronoUnit.MONTHS).function(customInterpolateFunc).aggregationField("CYCLE").build()
dataset = aggregationService.getData(myEntity.get(), properties)
dataset.show()
from cern.nxcals.api.extraction.metadata import ServiceClientFactory
from cern.nxcals.api.extraction.metadata.queries import Entities
from cern.nxcals.api.utils import TimeUtils
from java.time.temporal import ChronoUnit

from cern.nxcals.api.custom.service.aggregation import AggregationFunctions
from cern.nxcals.api.custom.service.aggregation import WindowAggregationProperties

entityService = ServiceClientFactory.createEntityService()
systemService = ServiceClientFactory.createSystemSpecService()

cmwSystemSpec = systemService.findByName("CMW")

if cmwSystemSpec.isEmpty():
    raise ValueError("No such system")

from cern.nxcals.api.custom.domain.CmwSystemConstants import DEVICE_KEY_NAME
from cern.nxcals.api.custom.domain.CmwSystemConstants import PROPERTY_KEY_NAME
from com.google.common.collect import ImmutableMap

keyValues = ImmutableMap.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC")

myEntity = entityService.findOne(Entities.suchThat().keyValues().eq(cmwSystemSpec.get(), keyValues))
if myEntity.isEmpty():
    raise ValueError("Could not obtain entity from service")


startTime = TimeUtils.getInstantFromString("2020-02-10 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-08-10 00:00:00.000000000")

customInterpolateFunc = AggregationFunctions.INTERPOLATE.expandTimeWindowBy(1, 0, ChronoUnit.MONTHS)

properties = WindowAggregationProperties.builder().timeWindow(startTime, endTime) \
    .interval(2, ChronoUnit.MONTHS).function(customInterpolateFunc).aggregationField("CYCLE").build()
dataset = aggregationService.getData(myEntity.get(), properties)
dataset.show()
Expected application output:
+-------------------+------------------+
|          timestamp|             value|
+-------------------+------------------+
|1581292800000000000| 4.356387673842736|
|1586552292000000000| 7.472222222222222|
|1591811784000000000|17.708333333333332|
+-------------------+------------------+
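
The timestamps above are not aligned to calendar months. They are consistent with the interval being built from the estimated duration of ChronoUnit.MONTHS, which Java defines as 31556952 / 12 seconds (about 30.44 days). The sketch below checks this in plain Python. That the service computes intervals this way is an assumption drawn from the printed output, and the constant name JAVA_MONTH_SECONDS is hypothetical.

```python
from datetime import datetime, timezone

NANOS_PER_SECOND = 1_000_000_000

# Estimated duration of java.time.temporal.ChronoUnit.MONTHS, in seconds
JAVA_MONTH_SECONDS = 31_556_952 // 12

start_s = int(datetime(2020, 2, 10, tzinfo=timezone.utc).timestamp())
end_s = int(datetime(2020, 8, 10, tzinfo=timezone.utc).timestamp())
step_s = 2 * JAVA_MONTH_SECONDS  # interval(2, ChronoUnit.MONTHS)

# Interval starts in nanoseconds since the epoch; the end of the window is excluded
month_window_starts = [s * NANOS_PER_SECOND for s in range(start_s, end_s, step_s)]
for ts in month_window_starts:
    print(ts)
```

With these inputs the sketch yields three interval starts, matching the three rows of the table; a fourth interval would begin after the end of the time window.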

Important

By default, the REPEAT and INTERPOLATE functions look up data beyond the provided time window. REPEAT looks up data before the start of the window; INTERPOLATE does the same and additionally looks up data after the end. The default lookup on each side is one (1) week and is configurable via the expandTimeWindowBy method.
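
To illustrate why the lookup range matters, here is a conceptual sketch in plain Python. It is not the NXCALS implementation: the function names, the sample data, the inclusive range boundaries and the use of linear interpolation are all assumptions made for illustration. It shows that a value can only be produced when a sample exists inside the lookup range.

```python
from bisect import bisect_right


def repeat_value(samples, t, lookback):
    """Last value at or before t, looking back at most `lookback`; None if there is none.

    `samples` is a list of (timestamp, value) pairs sorted by timestamp.
    """
    times = [ts for ts, _ in samples]
    i = bisect_right(times, t) - 1  # index of the last sample at or before t
    if i < 0 or samples[i][0] < t - lookback:
        return None
    return samples[i][1]


def interpolate_value(samples, t, lookback, lookahead):
    """Linear interpolation at t between the nearest samples before and after it."""
    times = [ts for ts, _ in samples]
    i = bisect_right(times, t) - 1
    if i < 0 or samples[i][0] < t - lookback:
        return None  # no sample before t inside the lookup range
    t0, v0 = samples[i]
    if t0 == t:
        return v0  # a sample exactly at t needs no interpolation
    if i + 1 >= len(samples) or samples[i + 1][0] > t + lookahead:
        return None  # no sample after t inside the lookup range
    t1, v1 = samples[i + 1]
    return v0 + (v1 - v0) * (t - t0) / (t1 - t0)


# Hypothetical samples: (timestamp, value)
samples = [(0, 10.0), (100, 20.0)]

print(repeat_value(samples, 50, lookback=60))                    # sample at 0 is in range
print(repeat_value(samples, 50, lookback=40))                    # sample at 0 is too old
print(interpolate_value(samples, 50, lookback=60, lookahead=60))
print(interpolate_value(samples, 50, lookback=60, lookahead=40))  # sample at 100 is too far
```

In this sketch, widening lookback or lookahead plays the role that expandTimeWindowBy plays for the service: it makes more distant samples eligible.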

Get data for variable aligned on provided dataset

SparkSession sparkSession = SparkSession.active();

Variable drivingVariable = variableService.findOne(Variables.suchThat()
        .variableName().eq("HX:FILLN"))
        .orElseThrow(() -> new IllegalArgumentException("Could not obtain variable from service"));
Instant startTime = TimeUtils.getInstantFromString("2016-03-01 00:00:00.000000000");
Instant endTime = TimeUtils.getInstantFromString("2016-05-01 00:00:00.000000000");
Dataset<Row> drivingDataset = DataQuery.getFor(sparkSession,
        TimeWindow.between(startTime, endTime), drivingVariable);

Variable myVariable = variableService.findOne(Variables.suchThat().variableName().eq("CPS.TGM:CYCLE"))
        .orElseThrow(() -> new IllegalArgumentException("Could not obtain variable from service"));

DatasetAggregationProperties properties = DatasetAggregationProperties.builder()
        .drivingDataset(drivingDataset).build();
Dataset<Row> dataset = aggregationService.getData(myVariable, properties);
dataset.show();
ServiceClientFactory = spark._jvm.cern.nxcals.api.extraction.metadata.ServiceClientFactory
Variables = spark._jvm.cern.nxcals.api.extraction.metadata.queries.Variables
TimeUtils = spark._jvm.cern.nxcals.api.utils.TimeUtils
TimeWindow = spark._jvm.cern.nxcals.api.domain.TimeWindow

DatasetAggregationProperties = spark._jvm.cern.nxcals.api.custom.service.aggregation.DatasetAggregationProperties
DataQuery = spark._jvm.cern.nxcals.api.extraction.data.builders.DataQuery

variableService = ServiceClientFactory.createVariableService()

drivingVariable = variableService.findOne(Variables.suchThat().variableName().eq("HX:FILLN"))
if drivingVariable.isEmpty():
    raise ValueError("Could not obtain variable from service")

startTime = TimeUtils.getInstantFromString("2016-03-01 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2016-05-01 00:00:00.000000000")

drivingDataset = DataQuery.getFor(spark._jsparkSession, TimeWindow.between(startTime, endTime), drivingVariable.get())

myVariable = variableService.findOne(Variables.suchThat().variableName().eq("CPS.TGM:CYCLE"))

if myVariable.isEmpty():
    raise ValueError("Could not obtain variable from service")

properties = DatasetAggregationProperties.builder().drivingDataset(drivingDataset).build()
dataset = aggregationService.getData(myVariable.get(), properties)
dataset.show()
from cern.nxcals.api.extraction.metadata import ServiceClientFactory
from cern.nxcals.api.extraction.metadata.queries import Variables
from cern.nxcals.api.utils import TimeUtils
from cern.nxcals.api.domain import TimeWindow

from cern.nxcals.api.custom.service.aggregation import DatasetAggregationProperties
from cern.nxcals.api.extraction.data.builders import DataQuery
from org.apache.spark.sql import SparkSession

variableService = ServiceClientFactory.createVariableService()

sparkSession = SparkSession.active()

drivingVariable = variableService.findOne(Variables.suchThat().variableName().eq("HX:FILLN"))
if drivingVariable.isEmpty():
    raise ValueError("Could not obtain variable from service")


startTime = TimeUtils.getInstantFromString("2016-03-01 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2016-05-01 00:00:00.000000000")

drivingDataset = DataQuery.getFor(sparkSession, TimeWindow.between(startTime, endTime), drivingVariable.get())

myVariable = variableService.findOne(Variables.suchThat().variableName().eq("CPS.TGM:CYCLE"))

if myVariable.isEmpty():
    raise ValueError("Could not obtain variable from service")

properties = DatasetAggregationProperties.builder().drivingDataset(drivingDataset).build()
dataset = aggregationService.getData(myVariable.get(), properties)
dataset.show()
Expected application output:
+-------------------+-----+
|          timestamp|value|
+-------------------+-----+
|1457613630355000000|   20|
|1458524879726000000|   20|
|1458932691946000000|   21|
|1458984392272000000|   14|
|1458996045140000000|   12|
|1459062900019000000|   16|
|1459114836147000000|   10|
|1459123375668000000|   18|
|1459135498154000000|   14|
|1459171936969000000|   11|
|1459213975163000000|    3|
|1459236680271000000|   20|
|1459286996157000000|   26|
|1459312886723000000|    2|
|1459372973278000000|    2|
|1459384868421000000|    1|
|1459442811801000000|   21|
|1459448882707000000|   14|
|1459450875860000000|   21|
|1459454413582000000|    4|
+-------------------+-----+
only showing top 20 rows
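
The timestamp values in these outputs appear to be nanoseconds since the Unix epoch. A minimal sketch for turning them into readable UTC datetimes in plain Python follows; the helper name nanos_to_datetime is hypothetical. Integer arithmetic is used to avoid floating-point rounding on such large numbers, and Python datetimes keep microsecond precision only.

```python
from datetime import datetime, timedelta, timezone


def nanos_to_datetime(timestamp_ns):
    """Convert nanoseconds since the Unix epoch to a timezone-aware UTC datetime."""
    seconds, nanos = divmod(timestamp_ns, 1_000_000_000)
    # Sub-microsecond digits are dropped, since datetime cannot represent them
    return datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(microseconds=nanos // 1000)


# First row of the output above
first_row = nanos_to_datetime(1457613630355000000)
print(first_row.isoformat())
```

The first row converts to 10 March 2016, which lies inside the time window used to query the driving dataset.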

Get data for entity aligned on provided dataset

Aligning entity data on a dataset is very similar to aligning variable data. However, since an entity query results in a dataset that contains multiple data columns, we need to specify explicitly which column to use. The target field name is therefore provided via the aggregationField() method of the DatasetAggregationProperties builder.

Map<String, Object> keyValues = ImmutableMap
        .of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC");

Entity myEntity = entityService.findOne(Entities.suchThat()
        .keyValues().eq(cmwSystemSpec, keyValues))
        .orElseThrow(() -> new IllegalArgumentException("Could not obtain entity from service"));

DatasetAggregationProperties properties = DatasetAggregationProperties.builder()
        .drivingDataset(drivingDataset)
        .aggregationField("CYCLE")
        .build();
Dataset<Row> dataset = aggregationService.getData(myEntity, properties);
dataset.show();
DatasetAggregationProperties = spark._jvm.cern.nxcals.api.custom.service.aggregation.DatasetAggregationProperties
Entities = spark._jvm.cern.nxcals.api.extraction.metadata.queries.Entities
DEVICE_KEY_NAME = spark._jvm.cern.nxcals.api.custom.domain.CmwSystemConstants.DEVICE_KEY_NAME
PROPERTY_KEY_NAME = spark._jvm.cern.nxcals.api.custom.domain.CmwSystemConstants.PROPERTY_KEY_NAME
ImmutableMap = spark._jvm.com.google.common.collect.ImmutableMap

keyValues = ImmutableMap.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC")

myEntity = entityService.findOne(Entities.suchThat().keyValues().eq(cmwSystemSpec.get(), keyValues))

if myEntity.isEmpty():
    raise ValueError("Could not obtain entity from service")

properties = DatasetAggregationProperties.builder().drivingDataset(drivingDataset).aggregationField("CYCLE").build()
dataset = aggregationService.getData(myEntity.get(), properties)
dataset.show()
from cern.nxcals.api.custom.service.aggregation import DatasetAggregationProperties
from cern.nxcals.api.extraction.metadata.queries import Entities
from cern.nxcals.api.custom.domain.CmwSystemConstants import DEVICE_KEY_NAME
from cern.nxcals.api.custom.domain.CmwSystemConstants import PROPERTY_KEY_NAME
from com.google.common.collect import ImmutableMap

keyValues = ImmutableMap.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC")

myEntity = entityService.findOne(Entities.suchThat().keyValues().eq(cmwSystemSpec.get(), keyValues))

if myEntity.isEmpty():
    raise ValueError("Could not obtain entity from service")

properties = DatasetAggregationProperties.builder().drivingDataset(drivingDataset).aggregationField("CYCLE").build()
dataset = aggregationService.getData(myEntity.get(), properties)
dataset.show()
Expected application output:
+-------------------+-----+
|          timestamp|value|
+-------------------+-----+
|1457613630355000000|   20|
|1458524879726000000|   20|
|1458932691946000000|   21|
|1458984392272000000|   14|
|1458996045140000000|   12|
|1459062900019000000|   16|
|1459114836147000000|   10|
|1459123375668000000|   18|
|1459135498154000000|   14|
|1459171936969000000|   11|
|1459213975163000000|    3|
|1459236680271000000|   20|
|1459286996157000000|   26|
|1459312886723000000|    2|
|1459372973278000000|    2|
|1459384868421000000|    1|
|1459442811801000000|   21|
|1459448882707000000|   14|
|1459450875860000000|   21|
|1459454413582000000|    4|
+-------------------+-----+
only showing top 20 rows

Important

Any dataset implementation can be used as the driving dataset for the alignment operation. By default, the aggregation service expects the NXCALS extraction timestamp column (nxcals_timestamp). If the provided dataset uses a different or custom timestamp column, you can specify it via the timestampField() method of the AggregationProperties builder.

Extraction Service

Service responsible for retrieving data based on a time window and the provided extraction properties. The extraction properties also define the data lookup strategy to apply during data retrieval.

Available methods

Dataset<Row> getData(Variable variable, ExtractionProperties properties);
Dataset<Row> getData(Entity entity, ExtractionProperties properties);

More info can be found in Javadoc for ExtractionService.

Available lookup strategies

A lookup strategy defines whether (and exactly how) the extraction process expands the data search to identify records beyond the provided time window. A full list of available lookup strategies can be found in the Javadoc for lookup strategies.
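As a rough illustration of the idea, the sketch below models the two strategies used in the examples of this section on a plain Python list. It is a toy model, not the NXCALS implementation: the `extract` function, the local `LookupStrategy` enum, the inclusive window bounds and the sample records are all assumptions made for the example.

```python
from enum import Enum


class LookupStrategy(Enum):
    """Toy stand-in for the two strategies used in this section (not the NXCALS class)."""
    LAST_BEFORE_START = "always look up"
    LAST_BEFORE_START_IF_EMPTY = "look up only if the window is empty"


def extract(records, start, end, strategy):
    """Return the records inside [start, end], optionally preceded by a looked-up record.

    `records` is a list of (timestamp, value) tuples sorted by timestamp.
    Inclusive window bounds are an assumption of this sketch.
    """
    in_window = [r for r in records if start <= r[0] <= end]
    before = [r for r in records if r[0] < start]

    # Always look up, or look up only when the window itself holds no data.
    do_lookup = strategy is LookupStrategy.LAST_BEFORE_START or not in_window
    if do_lookup and before:
        return [before[-1]] + in_window
    return in_window


# Hypothetical sample data: one record before the window, two inside it.
records = [(5, "a"), (12, "b"), (15, "c")]

always = extract(records, 10, 20, LookupStrategy.LAST_BEFORE_START)
if_empty = extract(records, 10, 20, LookupStrategy.LAST_BEFORE_START_IF_EMPTY)
empty_window = extract(records, 6, 9, LookupStrategy.LAST_BEFORE_START_IF_EMPTY)

print(always)        # [(5, 'a'), (12, 'b'), (15, 'c')]
print(if_empty)      # [(12, 'b'), (15, 'c')]
print(empty_window)  # [(5, 'a')]
```

In this model the "always" strategy adds the last record before the window start, while the "if empty" strategy adds it only when the window itself contains no records.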

Getting an instance of the service

An instance can be obtained through the cern.nxcals.api.custom.service.Services gateway, configured with the cern.nxcals.api.config.SparkProperties utility, which is designed to simplify the creation of a Spark session:

ExtractionService extractionService = Services.newInstance(SparkProperties.defaults("MY_APP"))
        .extractionService();
from nxcals.spark_session_builder import get_or_create # on swan create spark session from UI (star icon)
spark = get_or_create()

Services = spark._jvm.cern.nxcals.api.custom.service.Services

extractionService = Services.newInstance(spark._jsparkSession).extractionService()
from cern.nxcals.api.custom.service import Services
from cern.nxcals.api.config import SparkProperties

extractionService = Services.newInstance(SparkProperties.defaults("MY_APP")).extractionService()

Once we have an instance of the service, we can perform data extraction as in the following examples.

Examples

Get data for variable, but do not perform lookup if data exists

Variable myVariable = variableService.findOne(Variables.suchThat()
        .variableName().eq("CPS.TGM:CYCLE"))
        .orElseThrow(() -> new IllegalArgumentException("Could not obtain variable from service"));
Instant startTime = TimeUtils.getInstantFromString("2020-04-25 00:00:00.000000000");
Instant endTime = TimeUtils.getInstantFromString("2020-04-26 00:00:00.000000000");

ExtractionProperties properties = ExtractionProperties.builder()
        .timeWindow(startTime, endTime)
        .lookupStrategy(LookupStrategy.LAST_BEFORE_START_IF_EMPTY).build();
Dataset<Row> dataset = extractionService.getData(myVariable, properties);

System.out.println("Dataset count: " + dataset.count());
dataset.show();
ServiceClientFactory = spark._jvm.cern.nxcals.api.extraction.metadata.ServiceClientFactory
Variables = spark._jvm.cern.nxcals.api.extraction.metadata.queries.Variables
TimeUtils = spark._jvm.cern.nxcals.api.utils.TimeUtils
ExtractionProperties = spark._jvm.cern.nxcals.api.custom.service.extraction.ExtractionProperties
LookupStrategy = spark._jvm.cern.nxcals.api.custom.service.extraction.LookupStrategy

variableService = ServiceClientFactory.createVariableService()

myVariable = variableService.findOne(Variables.suchThat().variableName().eq("CPS.TGM:CYCLE"))

if myVariable.isEmpty():
    raise ValueError("Could not obtain variable from service")

startTime = TimeUtils.getInstantFromString("2020-04-25 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-26 00:00:00.000000000")

properties = ExtractionProperties.builder().timeWindow(startTime, endTime).lookupStrategy(LookupStrategy.LAST_BEFORE_START_IF_EMPTY).build()

dataset = extractionService.getData(myVariable.get(), properties)

print(dataset.count())
dataset.show()
from cern.nxcals.api.extraction.metadata.queries import Variables
from cern.nxcals.api.utils import TimeUtils
from cern.nxcals.api.custom.service.extraction import ExtractionProperties, LookupStrategy

myVariable = variableService.findOne(Variables.suchThat().variableName().eq("CPS.TGM:CYCLE"))

if myVariable.isEmpty():
    raise ValueError("Could not obtain variable from service")

startTime = TimeUtils.getInstantFromString("2020-04-25 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-26 00:00:00.000000000")

properties = ExtractionProperties.builder().timeWindow(startTime, endTime).lookupStrategy(LookupStrategy.LAST_BEFORE_START_IF_EMPTY).build()

dataset = extractionService.getData(myVariable.get(), properties)

print(dataset.count())
dataset.show()

As the example above shows, the extraction process is controlled by the parameters passed to the ExtractionProperties builder. In this common case, we set the time window for the variable data search (1 day).

Finally, we specify the lookup strategy (LAST_BEFORE_START_IF_EMPTY) to apply during the extraction process. This strategy instructs the process to perform a lookup only if the dataset for the given period is empty.

In this example we expect data within the given time window, so no lookup takes place.

Expected application output:
Dataset count: 49655
+------------+----------------+-------------------+--------------------+
|nxcals_value|nxcals_entity_id|   nxcals_timestamp|nxcals_variable_name|
+------------+----------------+-------------------+--------------------+
|          16|           46955|1587772954300000000|       CPS.TGM:CYCLE|
|           2|           46955|1587773242300000000|       CPS.TGM:CYCLE|
|          17|           46955|1587773929900000000|       CPS.TGM:CYCLE|
|           3|           46955|1587774148300000000|       CPS.TGM:CYCLE|
|           9|           46955|1587774262300000000|       CPS.TGM:CYCLE|
|          12|           46955|1587774897100000000|       CPS.TGM:CYCLE|
|           4|           46955|1587775993900000000|       CPS.TGM:CYCLE|
|           7|           46955|1587776136700000000|       CPS.TGM:CYCLE|
|          13|           46955|1587776778700000000|       CPS.TGM:CYCLE|
|           9|           46955|1587777046300000000|       CPS.TGM:CYCLE|
|           7|           46955|1587777459100000000|       CPS.TGM:CYCLE|
|          16|           46955|1587777513100000000|       CPS.TGM:CYCLE|
|           1|           46955|1587778528300000000|       CPS.TGM:CYCLE|
|           8|           46955|1587778541500000000|       CPS.TGM:CYCLE|
|          13|           46955|1587778936300000000|       CPS.TGM:CYCLE|
|           8|           46955|1587779237500000000|       CPS.TGM:CYCLE|
|          19|           46955|1587779361100000000|       CPS.TGM:CYCLE|
|           9|           46955|1587779725900000000|       CPS.TGM:CYCLE|
|           9|           46955|1587780352300000000|       CPS.TGM:CYCLE|
|           9|           46955|1587780735100000000|       CPS.TGM:CYCLE|
+------------+----------------+-------------------+--------------------+
only showing top 20 rows

Get data for variable and perform lookup regardless of whether data exists

Variable myVariable = variableService.findOne(Variables.suchThat()
        .variableName().eq("CPS.TGM:CYCLE"))
        .orElseThrow(() -> new IllegalArgumentException("Could not obtain variable from service"));
Instant startTime = TimeUtils.getInstantFromString("2020-04-25 00:00:00.000000000");
Instant endTime = TimeUtils.getInstantFromString("2020-04-26 00:00:00.000000000");

ExtractionProperties properties = ExtractionProperties.builder()
        .timeWindow(startTime, endTime)
        .lookupStrategy(LookupStrategy.LAST_BEFORE_START).build();
Dataset<Row> dataset = extractionService.getData(myVariable, properties);

System.out.println("Dataset count: " + dataset.count());
dataset.show();
ServiceClientFactory = spark._jvm.cern.nxcals.api.extraction.metadata.ServiceClientFactory
Variables = spark._jvm.cern.nxcals.api.extraction.metadata.queries.Variables
TimeUtils = spark._jvm.cern.nxcals.api.utils.TimeUtils
ExtractionProperties = spark._jvm.cern.nxcals.api.custom.service.extraction.ExtractionProperties
LookupStrategy = spark._jvm.cern.nxcals.api.custom.service.extraction.LookupStrategy

variableService = ServiceClientFactory.createVariableService()

myVariable = variableService.findOne(Variables.suchThat().variableName().eq("CPS.TGM:CYCLE"))

if myVariable.isEmpty():
    raise ValueError("Could not obtain variable from service")

startTime = TimeUtils.getInstantFromString("2020-04-25 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-26 00:00:00.000000000")

properties = ExtractionProperties.builder().timeWindow(startTime, endTime).lookupStrategy(LookupStrategy.LAST_BEFORE_START).build()
dataset = extractionService.getData(myVariable.get(), properties)

print(dataset.count())
dataset.show()
from cern.nxcals.api.extraction.metadata.queries import Variables
from cern.nxcals.api.utils import TimeUtils
from cern.nxcals.api.custom.service.extraction import ExtractionProperties, LookupStrategy

myVariable = variableService.findOne(Variables.suchThat().variableName().eq("CPS.TGM:CYCLE"))

if myVariable.isEmpty():
    raise ValueError("Could not obtain variable from service")

startTime = TimeUtils.getInstantFromString("2020-04-25 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-26 00:00:00.000000000")

properties = ExtractionProperties.builder().timeWindow(startTime, endTime).lookupStrategy(LookupStrategy.LAST_BEFORE_START).build()
dataset = extractionService.getData(myVariable.get(), properties)

print(dataset.count())
dataset.show()

As the example above shows, the extraction process is controlled by the parameters passed to the ExtractionProperties builder. In this common case, we set the time window for the variable data search (1 day).

Finally, we specify the lookup strategy (LAST_BEFORE_START) to apply during the extraction process. This strategy instructs the process to always perform a data lookup.

In this example we expect the data for the given time window plus one (1) extra value compared with the previous example. The additional value is the result of the lookup.

Expected application output:
Dataset count: 49656 (+1 value from the above example, as product of the lookup action)
+------------+----------------+-------------------+--------------------+
|nxcals_value|nxcals_entity_id|   nxcals_timestamp|nxcals_variable_name|
+------------+----------------+-------------------+--------------------+
|           8|           46955|1587772799500000000|       CPS.TGM:CYCLE|
|          16|           46955|1587772954300000000|       CPS.TGM:CYCLE|
|           2|           46955|1587773242300000000|       CPS.TGM:CYCLE|
|          17|           46955|1587773929900000000|       CPS.TGM:CYCLE|
|           3|           46955|1587774148300000000|       CPS.TGM:CYCLE|
|           9|           46955|1587774262300000000|       CPS.TGM:CYCLE|
|          12|           46955|1587774897100000000|       CPS.TGM:CYCLE|
|           4|           46955|1587775993900000000|       CPS.TGM:CYCLE|
|           7|           46955|1587776136700000000|       CPS.TGM:CYCLE|
|          13|           46955|1587776778700000000|       CPS.TGM:CYCLE|
|           9|           46955|1587777046300000000|       CPS.TGM:CYCLE|
|           7|           46955|1587777459100000000|       CPS.TGM:CYCLE|
|          16|           46955|1587777513100000000|       CPS.TGM:CYCLE|
|           1|           46955|1587778528300000000|       CPS.TGM:CYCLE|
|           8|           46955|1587778541500000000|       CPS.TGM:CYCLE|
|          13|           46955|1587778936300000000|       CPS.TGM:CYCLE|
|           8|           46955|1587779237500000000|       CPS.TGM:CYCLE|
|          19|           46955|1587779361100000000|       CPS.TGM:CYCLE|
|           9|           46955|1587779725900000000|       CPS.TGM:CYCLE|
|           9|           46955|1587780352300000000|       CPS.TGM:CYCLE|
+------------+----------------+-------------------+--------------------+
only showing top 20 rows
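The effect of the lookup can be checked directly on the timestamps. The sketch below assumes that nxcals_timestamp holds nanoseconds since the Unix epoch and that the time strings in the example are interpreted as UTC; neither is stated here, but both are consistent with the values shown. The helper `from_nanos` is a hypothetical name introduced for this example.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def from_nanos(nanos):
    """Convert nanoseconds since the Unix epoch to a UTC datetime (microsecond precision)."""
    return EPOCH + timedelta(microseconds=nanos // 1_000)


# Start of the time window used in the example, assumed to be UTC.
window_start = datetime(2020, 4, 25, tzinfo=timezone.utc)

looked_up = from_nanos(1587772799500000000)        # first row of the output above
first_in_window = from_nanos(1587772954300000000)  # second row of the output above

print(looked_up.isoformat())        # 2020-04-24T23:59:59.500000+00:00
print(first_in_window.isoformat())  # 2020-04-25T00:02:34.300000+00:00
print(looked_up < window_start <= first_in_window)  # True
```

Under these assumptions, the extra record lies half a second before the start of the window, which is what one would expect from a value added by the lookup.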

Get data for entity, but do not perform lookup if data exists

Map<String, Object> keyValues = ImmutableMap
        .of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC");
Entity myEntity = entityService.findOne(Entities.suchThat()
        .keyValues().eq(cmwSystemSpec, keyValues))
        .orElseThrow(() -> new IllegalArgumentException("Could not obtain entity from service"));
Instant startTime = TimeUtils.getInstantFromString("2020-04-10 00:00:00.000000000");
Instant endTime = TimeUtils.getInstantFromString("2020-04-11 00:00:10.000000000");

ExtractionProperties properties = ExtractionProperties.builder()
        .timeWindow(startTime, endTime)
        .lookupStrategy(LookupStrategy.LAST_BEFORE_START_IF_EMPTY).build();
Dataset<Row> dataset = extractionService.getData(myEntity, properties);

System.out.println("Dataset count: " + dataset.count());
dataset.show(2);
ServiceClientFactory = spark._jvm.cern.nxcals.api.extraction.metadata.ServiceClientFactory
Entities = spark._jvm.cern.nxcals.api.extraction.metadata.queries.Entities
TimeUtils = spark._jvm.cern.nxcals.api.utils.TimeUtils
ExtractionProperties = spark._jvm.cern.nxcals.api.custom.service.extraction.ExtractionProperties
LookupStrategy = spark._jvm.cern.nxcals.api.custom.service.extraction.LookupStrategy

DEVICE_KEY_NAME = spark._jvm.cern.nxcals.api.custom.domain.CmwSystemConstants.DEVICE_KEY_NAME
PROPERTY_KEY_NAME = spark._jvm.cern.nxcals.api.custom.domain.CmwSystemConstants.PROPERTY_KEY_NAME
ImmutableMap = spark._jvm.com.google.common.collect.ImmutableMap

entityService = ServiceClientFactory.createEntityService()
systemService = ServiceClientFactory.createSystemSpecService()

cmwSystemSpec = systemService.findByName("CMW")

if cmwSystemSpec.isEmpty():
    raise ValueError("No such system")

keyValues = ImmutableMap.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC")

myEntity = entityService.findOne(Entities.suchThat().keyValues().eq(cmwSystemSpec.get(), keyValues))

if myEntity.isEmpty():
    raise ValueError("Could not obtain entity from service")

startTime = TimeUtils.getInstantFromString("2020-04-10 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-11 00:00:10.000000000")

properties = ExtractionProperties.builder().timeWindow(startTime, endTime).lookupStrategy(LookupStrategy.LAST_BEFORE_START_IF_EMPTY).build()
dataset = extractionService.getData(myEntity.get(), properties)

print(dataset.count())

dataset.show(2)
from cern.nxcals.api.extraction.metadata.queries import Entities
from cern.nxcals.api.utils import TimeUtils
from cern.nxcals.api.custom.service.extraction import ExtractionProperties, LookupStrategy
from cern.nxcals.api.custom.domain.CmwSystemConstants import DEVICE_KEY_NAME, PROPERTY_KEY_NAME
from com.google.common.collect import ImmutableMap

keyValues = ImmutableMap.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC")

myEntity = entityService.findOne(Entities.suchThat().keyValues().eq(cmwSystemSpec.get(), keyValues))

if myEntity.isEmpty():
    raise ValueError("Could not obtain entity from service")

startTime = TimeUtils.getInstantFromString("2020-04-10 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-11 00:00:10.000000000")

properties = ExtractionProperties.builder().timeWindow(startTime, endTime).lookupStrategy(LookupStrategy.LAST_BEFORE_START_IF_EMPTY).build()
dataset = extractionService.getData(myEntity.get(), properties)

print(dataset.count())

from jpype.types import JInt
dataset.show(JInt(2))

As the example above shows, the extraction process is controlled by the parameters passed to the ExtractionProperties builder. In this common case, we set the time window for the entity data search (1 day and 10 seconds).

Finally, we specify the lookup strategy (LAST_BEFORE_START_IF_EMPTY) to apply during the extraction process. This strategy instructs the process to perform a lookup only if the dataset for the given period is empty.

In this example we expect data within the given time window, so no lookup takes place.

Expected application output:
Dataset count: 49660
+-----+------+----+-----+-----+-----+-------+-----+----+------+-----+----+------+------+------+-----+-----+-----+------+-----+------+-----+-----+-----+----+-------------+--------------------+------------------+-------------------+-------+-------------------+-------+------------------+--------+----------------+
|BATCH|BEAMID|BPNM|COMLN|CYCLE|CYTAG|   DEST|DEST2|DURN|FREE14|FREE4|MISC|MISC_A|NCOMLN|NCYTAG|NDEST|NDURN|NMISC|NPARTY|NUSER| PARTY|SCNUM|SCTAG|SPCON|USER|__LSA_CYCLE__|__record_timestamp__|__record_version__|           acqStamp|  class|         cyclestamp| device|          property|selector|nxcals_entity_id|
+-----+------+----+-----+-----+-----+-------+-----+----+------+-----+----+------+------+------+-----+-----+-----+------+-----+------+-----+-----+-----+----+-------------+--------------------+------------------+-------------------+-------+-------------------+-------+------------------+--------+----------------+
|    0| 11658|  25| null|   16| null|PS_DUMP| NONE|   1|  null| null|null|  null|  null|  null| null| null| null|  null| null|NOBEAM|24192| 2434| null|ZERO|     ~~ZERO~~| 1586563201900000000|                 0|1586563201900000000|CPS.TGM|1586563201900000000|CPS.TGM|FULL-TELEGRAM.STRC|    null|           46955|
|    0| 11658|  29| null|   20| null|PS_DUMP| NONE|   1|  null| null|null|  null|  null|  null| null| null| null|  null| null|NOBEAM|24192| 2434| null|ZERO|     ~~ZERO~~| 1586563206700000000|                 0|1586563206700000000|CPS.TGM|1586563206700000000|CPS.TGM|FULL-TELEGRAM.STRC|    null|           46955|
+-----+------+----+-----+-----+-----+-------+-----+----+------+-----+----+------+------+------+-----+-----+-----+------+-----+------+-----+-----+-----+----+-------------+--------------------+------------------+-------------------+-------+-------------------+-------+------------------+--------+----------------+
only showing top 2 rows

Get data for entity and perform lookup regardless of whether data exists

Map<String, Object> keyValues = ImmutableMap
        .of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC");
Entity myEntity = entityService.findOne(Entities.suchThat()
        .keyValues().eq(cmwSystemSpec, keyValues))
        .orElseThrow(() -> new IllegalArgumentException("Could not obtain entity from service"));
Instant startTime = TimeUtils.getInstantFromString("2020-04-10 00:00:00.000000000");
Instant endTime = TimeUtils.getInstantFromString("2020-04-11 00:00:10.000000000");

ExtractionProperties properties = ExtractionProperties.builder()
        .timeWindow(startTime, endTime)
        .lookupStrategy(LookupStrategy.LAST_BEFORE_START).build();
Dataset<Row> dataset = extractionService.getData(myEntity, properties);

System.out.println("Dataset count: " + dataset.count());
dataset.show(2);
ServiceClientFactory = spark._jvm.cern.nxcals.api.extraction.metadata.ServiceClientFactory
Entities = spark._jvm.cern.nxcals.api.extraction.metadata.queries.Entities
TimeUtils = spark._jvm.cern.nxcals.api.utils.TimeUtils
ExtractionProperties = spark._jvm.cern.nxcals.api.custom.service.extraction.ExtractionProperties
LookupStrategy = spark._jvm.cern.nxcals.api.custom.service.extraction.LookupStrategy

DEVICE_KEY_NAME = spark._jvm.cern.nxcals.api.custom.domain.CmwSystemConstants.DEVICE_KEY_NAME
PROPERTY_KEY_NAME = spark._jvm.cern.nxcals.api.custom.domain.CmwSystemConstants.PROPERTY_KEY_NAME
ImmutableMap = spark._jvm.com.google.common.collect.ImmutableMap

entityService = ServiceClientFactory.createEntityService()
systemService = ServiceClientFactory.createSystemSpecService()

cmwSystemSpec = systemService.findByName("CMW")

if cmwSystemSpec.isEmpty():
    raise ValueError("No such system")

keyValues = ImmutableMap.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC")
myEntity = entityService.findOne(Entities.suchThat().keyValues().eq(cmwSystemSpec.get(), keyValues))

if myEntity.isEmpty():
    raise ValueError("Could not obtain entity from service")

startTime = TimeUtils.getInstantFromString("2020-04-10 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-11 00:00:10.000000000")

properties = ExtractionProperties.builder().timeWindow(startTime, endTime).lookupStrategy(LookupStrategy.LAST_BEFORE_START).build()
dataset = extractionService.getData(myEntity.get(), properties)

print(dataset.count())
dataset.show(2)
from cern.nxcals.api.extraction.metadata.queries import Entities
from cern.nxcals.api.utils import TimeUtils
from cern.nxcals.api.custom.service.extraction import ExtractionProperties, LookupStrategy
from cern.nxcals.api.custom.domain.CmwSystemConstants import DEVICE_KEY_NAME, PROPERTY_KEY_NAME
from com.google.common.collect import ImmutableMap

keyValues = ImmutableMap.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC")
myEntity = entityService.findOne(Entities.suchThat().keyValues().eq(cmwSystemSpec.get(), keyValues))

if myEntity.isEmpty():
    raise ValueError("Could not obtain entity from service")

startTime = TimeUtils.getInstantFromString("2020-04-10 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-11 00:00:10.000000000")

properties = ExtractionProperties.builder().timeWindow(startTime, endTime).lookupStrategy(LookupStrategy.LAST_BEFORE_START).build()
dataset = extractionService.getData(myEntity.get(), properties)

print(dataset.count())
from jpype.types import JInt
dataset.show(JInt(2))

As the example above shows, the extraction process is controlled by the parameters passed to the ExtractionProperties builder. In this common case, we set the time window for the entity data search (1 day and 10 seconds).

Finally, we specify the lookup strategy (LAST_BEFORE_START) to apply during the extraction process. This strategy instructs the process to always perform a data lookup.

In this example we expect the data for the given time window plus one (1) extra record compared with the previous example. The additional record is the result of the lookup.

Expected application output:
Dataset count: 49661 (+1 value from the above example, as product of the lookup action)
+-----+------+----+-----+-----+-----+-------+-----+----+------+-----+----+------+------+------+-----+-----+-----+------+-----+------+-----+-----+-----+----+--------------------+--------------------+------------------+-------------------+-------+-------------------+-------+------------------+--------+----------------+
|BATCH|BEAMID|BPNM|COMLN|CYCLE|CYTAG|   DEST|DEST2|DURN|FREE14|FREE4|MISC|MISC_A|NCOMLN|NCYTAG|NDEST|NDURN|NMISC|NPARTY|NUSER| PARTY|SCNUM|SCTAG|SPCON|USER|       __LSA_CYCLE__|__record_timestamp__|__record_version__|           acqStamp|  class|         cyclestamp| device|          property|selector|nxcals_entity_id|
+-----+------+----+-----+-----+-----+-------+-----+----+------+-----+----+------+------+------+-----+-----+-----+------+-----+------+-----+-----+-----+----+--------------------+--------------------+------------------+-------------------+-------+-------------------+-------+------------------+--------+----------------+
|    0| 16529|   1| null|    1| null|PS_DUMP| NONE|   3|  null| null|null|  null|  null|  null| null| null| null|  null| null|NOBEAM|21710| 2434| null|LHC4|LHC25#48B_BCMS_PS...| 1586476799500000000|                 0|1586476799500000000|CPS.TGM|1586476799500000000|CPS.TGM|FULL-TELEGRAM.STRC|    null|           46955|
|    0| 11658|  25| null|   16| null|PS_DUMP| NONE|   1|  null| null|null|  null|  null|  null| null| null| null|  null| null|NOBEAM|24192| 2434| null|ZERO|            ~~ZERO~~| 1586563201900000000|                 0|1586563201900000000|CPS.TGM|1586563201900000000|CPS.TGM|FULL-TELEGRAM.STRC|    null|           46955|
+-----+------+----+-----+-----+-----+-------+-----+----+------+-----+----+------+------+------+-----+-----+-----+------+-----+------+-----+-----+-----+----+--------------------+--------------------+------------------+-------------------+-------+-------------------+-------+------------------+--------+----------------+
only showing top 2 rows

Get data for entity and perform lookup with custom lookup duration

Map<String, Object> keyValues = ImmutableMap
        .of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC");
Entity myEntity = entityService.findOne(Entities.suchThat()
        .keyValues().eq(cmwSystemSpec, keyValues))
        .orElseThrow(() -> new IllegalArgumentException("Could not obtain entity from service"));
Instant startTime = TimeUtils.getInstantFromString("2020-04-10 00:00:00.000000000");
Instant endTime = TimeUtils.getInstantFromString("2020-04-11 00:00:10.000000000");

LookupStrategy customLookupStrategy = LookupStrategy.LAST_BEFORE_START
        .withLookupDuration(8, ChronoUnit.HOURS);
ExtractionProperties properties = ExtractionProperties.builder()
        .timeWindow(startTime, endTime)
        .lookupStrategy(customLookupStrategy).build();
Dataset<Row> dataset = extractionService.getData(myEntity, properties);

System.out.println("Dataset count: " + dataset.count());
dataset.show(2);
ServiceClientFactory = spark._jvm.cern.nxcals.api.extraction.metadata.ServiceClientFactory
Entities = spark._jvm.cern.nxcals.api.extraction.metadata.queries.Entities
TimeUtils = spark._jvm.cern.nxcals.api.utils.TimeUtils
ChronoUnit = spark._jvm.java.time.temporal.ChronoUnit
ExtractionProperties = spark._jvm.cern.nxcals.api.custom.service.extraction.ExtractionProperties
LookupStrategy = spark._jvm.cern.nxcals.api.custom.service.extraction.LookupStrategy

DEVICE_KEY_NAME = spark._jvm.cern.nxcals.api.custom.domain.CmwSystemConstants.DEVICE_KEY_NAME
PROPERTY_KEY_NAME = spark._jvm.cern.nxcals.api.custom.domain.CmwSystemConstants.PROPERTY_KEY_NAME
ImmutableMap = spark._jvm.com.google.common.collect.ImmutableMap

entityService = ServiceClientFactory.createEntityService()
systemService = ServiceClientFactory.createSystemSpecService()

cmwSystemSpec = systemService.findByName("CMW")

if cmwSystemSpec.isEmpty():
    raise ValueError("No such system")

keyValues = ImmutableMap.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC")
myEntity = entityService.findOne(Entities.suchThat().keyValues().eq(cmwSystemSpec.get(), keyValues))

if myEntity.isEmpty():
    raise ValueError("Could not obtain entity from service")

startTime = TimeUtils.getInstantFromString("2020-04-10 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-11 00:00:10.000000000")

customLookupStrategy = LookupStrategy.LAST_BEFORE_START.withLookupDuration(8, ChronoUnit.HOURS)
properties = ExtractionProperties.builder().timeWindow(startTime, endTime).lookupStrategy(customLookupStrategy).build()
dataset = extractionService.getData(myEntity.get(), properties)

print(dataset.count())
dataset.show(2)
from cern.nxcals.api.extraction.metadata.queries import Entities
from cern.nxcals.api.utils import TimeUtils
from java.time.temporal import ChronoUnit
from cern.nxcals.api.custom.service.extraction import ExtractionProperties, LookupStrategy
from cern.nxcals.api.custom.domain.CmwSystemConstants import DEVICE_KEY_NAME, PROPERTY_KEY_NAME
from com.google.common.collect import ImmutableMap

keyValues = ImmutableMap.of(DEVICE_KEY_NAME, "CPS.TGM", PROPERTY_KEY_NAME, "FULL-TELEGRAM.STRC")
myEntity = entityService.findOne(Entities.suchThat().keyValues().eq(cmwSystemSpec.get(), keyValues))

if myEntity.isEmpty():
    raise ValueError("Could not obtain entity from service")

startTime = TimeUtils.getInstantFromString("2020-04-10 00:00:00.000000000")
endTime = TimeUtils.getInstantFromString("2020-04-11 00:00:10.000000000")

customLookupStrategy = LookupStrategy.LAST_BEFORE_START.withLookupDuration(8, ChronoUnit.HOURS)
properties = ExtractionProperties.builder().timeWindow(startTime, endTime).lookupStrategy(customLookupStrategy).build()
dataset = extractionService.getData(myEntity.get(), properties)

print(dataset.count())
from jpype.types import JInt
dataset.show(JInt(2))

As the example above shows, the extraction process is controlled by the parameters passed to the ExtractionProperties builder. In this common case, we set the time window for the entity data search (1 day and 10 seconds).

Finally, we specify the lookup strategy (LAST_BEFORE_START) with a custom lookup duration to apply during the extraction process. This strategy instructs the process to always perform a data lookup, but to search only within the specified duration. In this example the lookup duration is set to 8 hours.
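To make the role of the lookup duration concrete, here is a minimal sketch on plain Python data. It is a toy model under stated assumptions, not the NXCALS implementation: the function `extract_with_lookup`, the inclusive window bounds and the sample records are hypothetical and serve only to show how a bounded lookup can miss older data.

```python
from datetime import datetime, timedelta


def extract_with_lookup(records, start, end, lookup_duration):
    """Return the records in [start, end], preceded by the last record found
    in [start - lookup_duration, start), if there is one.

    `records` is a list of (timestamp, value) tuples sorted by timestamp.
    """
    in_window = [r for r in records if start <= r[0] <= end]
    lookup_from = start - lookup_duration
    candidates = [r for r in records if lookup_from <= r[0] < start]
    # candidates[-1:] is an empty list when nothing lies within the lookup duration.
    return candidates[-1:] + in_window


start = datetime(2020, 4, 10)
end = datetime(2020, 4, 11)

# Hypothetical sample data for the sketch.
records = [
    (datetime(2020, 4, 9, 12, 0), "old"),  # 12 hours before the window start
    (datetime(2020, 4, 10, 6, 0), "new"),  # inside the window
]

narrow = extract_with_lookup(records, start, end, timedelta(hours=8))
wide = extract_with_lookup(records, start, end, timedelta(hours=24))

print([value for _, value in narrow])  # ['new']
print([value for _, value in wide])    # ['old', 'new']
```

In this toy data the only older record lies 12 hours before the window start, so an 8-hour lookup returns just the records inside the window, while a 24-hour lookup also returns the older one. The comments above show the output of the sketch only; the output of the actual NXCALS example follows.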

Expected application output:
Dataset count: 49660 (the 8-hour lookup was not long enough to find older data, so the original dataset is returned)
+-----+------+----+-----+-----+-----+-------+-----+----+------+-----+----+------+------+------+-----+-----+-----+------+-----+------+-----+-----+-----+----+-------------+--------------------+------------------+-------------------+-------+-------------------+-------+------------------+--------+----------------+
|BATCH|BEAMID|BPNM|COMLN|CYCLE|CYTAG|   DEST|DEST2|DURN|FREE14|FREE4|MISC|MISC_A|NCOMLN|NCYTAG|NDEST|NDURN|NMISC|NPARTY|NUSER| PARTY|SCNUM|SCTAG|SPCON|USER|__LSA_CYCLE__|__record_timestamp__|__record_version__|           acqStamp|  class|         cyclestamp| device|          property|selector|nxcals_entity_id|
+-----+------+----+-----+-----+-----+-------+-----+----+------+-----+----+------+------+------+-----+-----+-----+------+-----+------+-----+-----+-----+----+-------------+--------------------+------------------+-------------------+-------+-------------------+-------+------------------+--------+----------------+
|    0| 11658|  25| null|   16| null|PS_DUMP| NONE|   1|  null| null|null|  null|  null|  null| null| null| null|  null| null|NOBEAM|24192| 2434| null|ZERO|     ~~ZERO~~| 1586563201900000000|                 0|1586563201900000000|CPS.TGM|1586563201900000000|CPS.TGM|FULL-TELEGRAM.STRC|    null|           46955|
|    0| 11658|  29| null|   20| null|PS_DUMP| NONE|   1|  null| null|null|  null|  null|  null| null| null| null|  null| null|NOBEAM|24192| 2434| null|ZERO|     ~~ZERO~~| 1586563206700000000|                 0|1586563206700000000|CPS.TGM|1586563206700000000|CPS.TGM|FULL-TELEGRAM.STRC|    null|           46955|
+-----+------+----+-----+-----+-----+-------+-----+----+------+-----+----+------+------+------+-----+-----+-----+------+-----+------+-----+-----+-----+----+-------------+--------------------+------------------+-------------------+-------+-------------------+-------+------------------+--------+----------------+
only showing top 2 rows
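The timestamp columns in the output above (for example `__record_timestamp__` and `acqStamp`) are long integers. Assuming they are nanoseconds since the Unix epoch in UTC, they can be made readable with a small helper such as the one below. The name `format_epoch_nanos` is hypothetical and not part of the API; the sketch uses only the Python standard library.

```python
from datetime import datetime, timezone


def format_epoch_nanos(nanos):
    """Format nanoseconds since the Unix epoch as a UTC timestamp string."""
    # datetime only has microsecond resolution, so the sub-second part
    # is kept as an integer and appended with full nanosecond precision.
    seconds, remainder = divmod(nanos, 1_000_000_000)
    moment = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return f"{moment:%Y-%m-%d %H:%M:%S}.{remainder:09d}"


# Values taken from the two rows shown in the output above.
first = format_epoch_nanos(1586563201900000000)
second = format_epoch_nanos(1586563206700000000)

print(first)   # 2020-04-11 00:00:01.900000000
print(second)  # 2020-04-11 00:00:06.700000000
```

Under that assumption, both rows fall within the final 10 seconds of the requested time window.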