
This document presents possible NXCALS implementations of selected CALS extraction API methods. The NXCALS examples make use of the Data Extraction API.

Note

All CALS Java examples used in this document assume initialization of the metadata and timeseries services. The NXCALS examples require Kerberos authentication and a SparkSession being present (for more details refer to the NXCALS example projects).

The corresponding initialization code (for both projects) is presented below. In the case of NXCALS there are two variants of the initialization code: one making use of the Spring Framework and the other based on vanilla Java:

import cern.accsoft.cals.extr.client.service.MetaDataService;
import cern.accsoft.cals.extr.client.service.ServiceBuilder;
import cern.accsoft.cals.extr.client.service.TimeseriesDataService;
import cern.accsoft.cals.extr.domain.core.datasource.DataLocationPreferences;

public class Demo {

    private MetaDataService metaDataService;
    private TimeseriesDataService timeseriesDataService;     

    public Demo() {
        final ServiceBuilder builder = ServiceBuilder
            .getInstance("TIMBER", "TIMBER", DataLocationPreferences.LDB_PRO);

        metaDataService = builder.createMetaService();
        timeseriesDataService = builder.createTimeseriesService();
    }

    public static void main(final String args[]) {

        final Demo demo = new Demo();

        // code snippets follow
    }
}

import cern.nxcals.api.extraction.data.builders.DataQuery;
import cern.nxcals.api.extraction.data.builders.DevicePropertyDataQuery;
import org.apache.spark.SparkContext;
import org.apache.spark.ml.feature.Bucketizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Import;

import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// source the nxcals query builders

@SpringBootApplication
@Import(SparkContext.class)
public class NxcalsDemo {

    static {
        // for the exact settings please refer to example project
        System.setProperty("kerberos.keytab", "keytab_file_location");
        System.setProperty("kerberos.principal", "principal");
        System.setProperty("service.url", "NXCALS_service_url");
    }

    public static void main(String[] args) {

        ConfigurableApplicationContext context = SpringApplication.run(NxcalsDemo.class, args);
        SparkSession spark = context.getBean(SparkSession.class);

        // code snippets follow
    }
}

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public class NxcalsDemoNoSpring {

    static {
        // for the exact settings please refer to example project
        System.setProperty("kerberos.keytab", "keytab_file_location");
        System.setProperty("kerberos.principal", "principal");
        System.setProperty("service.url", "NXCALS_service_url");
    }

    private static final String HDP_CLUSTER_SPARK_JAVA_HOME = "/var/nxcals/jdk1.11";

    public static void main(String[] args) {

        SparkConf sparkConf = createSparkConf();
        SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();

        // code snippets follow
    }

    // Creates Spark configuration
    private static SparkConf createSparkConf() {
        return new SparkConf().setAppName("Example-Application")
                .setMaster("local[*]")
                .set("spark.submit.deployMode", "client")
                .set("spark.ui.port", "4050")

                .set("spark.yarn.appMasterEnv.JAVA_HOME",  HDP_CLUSTER_SPARK_JAVA_HOME)
                .set("yarn.app.mapreduce.am.env.JAVA_HOME", HDP_CLUSTER_SPARK_JAVA_HOME)
                .set("mapreduce.map.env.JAVA_HOME", HDP_CLUSTER_SPARK_JAVA_HOME)
                .set("mapreduce.reduce.env.JAVA_HOME", HDP_CLUSTER_SPARK_JAVA_HOME)

                .set("spark.yarn.jars", "hdfs:///project/nxcals/lib/spark-/*.jar")
                .set("spark.yarn.am.extraClassPath", "/usr/hdp/hadoop/lib/native")

                .set("spark.executor.extraClassPath", "/usr/hdp/hadoop/lib/native")
                .set("spark.executor.instances", "4")
                .set("spark.executor.cores", "1")
                .set("spark.executor.memory", "1g")
                .set("spark.executorEnv.JAVA_HOME", HDP_CLUSTER_SPARK_JAVA_HOME)

                .set("sql.caseSensitive", "true")
                .set("spark.kerberos.access.hadoopFileSystems", "nxcals");
    }
}
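The Python snippets in the sections below assume that a SparkSession named spark is already available (as created in the Java initialization above, or as provided by an NXCALS PySpark environment). Purely as an illustration (the NXCALS-specific Kerberos and YARN settings are deliberately omitted; for the complete configuration refer to the NXCALS example projects), a bare session could be created like this:

from pyspark.sql import SparkSession

# Sketch only: a bare SparkSession without the NXCALS-specific configuration
# shown in createSparkConf() above.
spark = SparkSession.builder \
    .appName("Example-Application") \
    .master("local[*]") \
    .getOrCreate()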

Getting data for the most recent value prior to the time window

CALS implementation

Variable var = demo.metaDataService
  .getVariablesWithNameInListofStrings(Arrays.asList("ZT10.QFO03.ACQUISITION:CURRENT"))
  .getVariable(0);

Timestamp refTime = Timestamp.from(Instant.parse("2018-05-23T00:05:54.500Z"));
LoggingTimeInterval loggingTimeInterval = LoggingTimeInterval.DAY;

demo.timeseriesDataService
  .getLastDataPriorToTimestampWithinUserInterval(var, refTime, loggingTimeInterval)
  .print(LoggingTimeZone.UTC_TIME);
    Timestamp(UTC_TIME): "2018-05-23 00:05:53.500" Value: 298.55

NXCALS equivalent

Assuming that the interval duration prior to the beginning of the time window (reference time) is equal to 1 day:

# Necessary libraries.
# The most convenient way of handling timestamps with nanosecond precision is NumPy:

import numpy

ref_time = numpy.datetime64('2018-05-23T00:05:54.500')
interval_start = ref_time  - numpy.timedelta64(1,'D')

df = DevicePropertyDataQuery.builder(spark).system('CMW') \
    .startTime(interval_start).endTime(ref_time) \
    .entity().device('ZT10.QFO03').property('Acquisition').build() \
    .select('cyclestamp', 'current').orderBy('cyclestamp', ascending=False).limit(1)

df.show()
// source the nxcals query builders
DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.n");
Instant refTime = LocalDateTime.parse("2018-05-23 00:05:54.500000000", FORMATTER).toInstant(ZoneOffset.UTC);
Instant intervalStart = refTime.minus((Duration.ofDays(1)));

Dataset<Row> df = DevicePropertyDataQuery.builder(spark).system("CMW")
        .startTime(intervalStart).endTime(refTime)
        .entity().device("ZT10.QFO03").property("Acquisition").build()
        .select("cyclestamp", "current").orderBy(functions.desc("cyclestamp")).limit(1);

df.show();
+-------------------+---------+
|         cycleStamp|  current|
+-------------------+---------+
|1527033953500000000|298.55392|
+-------------------+---------+

Getting data for a variable within a time window and the most recent value prior to the time window

CALS implementation

Variable var = demo.metaDataService
    .getVariablesWithNameInListofStrings(Arrays.asList("ZT10.QFO03.ACQUISITION:CURRENT"))
    .getVariable(0);

Timestamp startTime = Timestamp.from(Instant.parse("2018-05-21T00:00:00.000Z"));
Timestamp endTime = Timestamp.from(Instant.parse("2018-05-23T13:30:00.000Z"));

demo.timeseriesDataService
        .getDataInTimeWindowAndLastDataPriorToTimeWindowWithinDefaultInterval(
                var, startTime, endTime
        ).stream().sorted(Comparator.comparing(TimeseriesData::getStamp))
        .limit(5).forEach(Demo::printValue);

...

private static void printValue(TimeseriesData data) {
    try {
        System.out.printf("%s -> %f\n",
                data.getFormattedStamp(LoggingTimeZone.UTC_TIME), data.getDoubleValue());
    } catch (NoSuchMethodException e) {
        e.printStackTrace();
    }
}
2018-05-23 00:05:53.500 -> 298.550000
2018-05-23 00:05:54.700 -> 298.590000
2018-05-23 00:05:55.900 -> 298.600000
2018-05-23 00:05:57.100 -> 298.580000
2018-05-23 00:05:58.300 -> 298.610000   

NXCALS equivalent

Assuming that the interval duration prior to the beginning of the time window (start time) is equal to 1 month. This is not exactly the same implementation as in CALS, where data before the reference time is sampled starting with a small range which is then extended in case the most recent value is still not found.

An example reproducing that behaviour would have to implement the iterations directly in the code or through one of the available timeseries libraries for Spark (no concrete recommendation at the moment of writing this document); a rough sketch is given at the end of this section.

# Necessary libraries:
import numpy

start_time = numpy.datetime64('2018-05-21 00:00:00.000')
interval_start = start_time - numpy.timedelta64(30,'D')

df= DevicePropertyDataQuery.builder(spark).system('CMW') \
    .startTime(start_time).endTime('2018-05-23 13:30:00.000') \
    .entity().device('ZT10.QFO03').property('Acquisition').build() \
    .select('cyclestamp', 'current').orderBy('cyclestamp', ascending=False) \
    .unionAll ( \
    DevicePropertyDataQuery.builder(spark).system('CMW') \
        .startTime(interval_start).endTime(start_time) \
        .entity().device('ZT10.QFO03').property('Acquisition').build() \
        .select('cyclestamp', 'current').orderBy('cyclestamp', ascending=False).limit(1))

df.sort('cyclestamp').show(5)    
DateTimeFormatter FORMATTER =  DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.n");
Instant startTime = LocalDateTime.parse("2018-05-21 00:00:00.000000000", FORMATTER).toInstant(ZoneOffset.UTC);
Instant intervalStart = startTime.atOffset(ZoneOffset.UTC).minus(1, ChronoUnit.MONTHS).toInstant();

Dataset<Row> df= DevicePropertyDataQuery.builder(spark).system("CMW")
        .startTime(startTime).endTime("2018-05-23 13:30:00.000")
        .entity().device("ZT10.QFO03").property("Acquisition").build()
        .select("cyclestamp", "current").orderBy(functions.desc("cyclestamp"))
        .unionAll (
                DevicePropertyDataQuery.builder(spark).system("CMW")
                        .startTime(intervalStart).endTime(startTime)
                        .entity().device("ZT10.QFO03").property("Acquisition").build()
                        .select("cyclestamp", "current").orderBy(functions.desc("cyclestamp")).limit(1));

df.sort("cyclestamp").show(5);
+-------------------+---------+
|         cycleStamp|  current|
+-------------------+---------+
|1526860799500000000|298.55893|
|1526860801900000000|298.59607|
|1526860803100000000|298.61493|
|1526860804300000000| 298.5677|
|1526860805500000000|298.56143|
+-------------------+---------+
only showing top 5 rows
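For completeness, here is a minimal sketch (an illustration only, not part of the original examples) of the iterative lookback mentioned above: the lookback window is repeatedly doubled until a value prior to the start time is found or a maximum lookback is reached. The names lookback, max_lookback and last_prior are hypothetical.

# Sketch only: iteratively extend the lookback window until the most recent
# value prior to start_time is found; assumes spark, DevicePropertyDataQuery
# and start_time (numpy.datetime64) as defined above.
lookback = numpy.timedelta64(1, 'D')
max_lookback = numpy.timedelta64(365, 'D')
last_prior = None

while last_prior is None and lookback <= max_lookback:
    rows = DevicePropertyDataQuery.builder(spark).system('CMW') \
        .startTime(start_time - lookback).endTime(start_time) \
        .entity().device('ZT10.QFO03').property('Acquisition').build() \
        .select('cyclestamp', 'current') \
        .orderBy('cyclestamp', ascending=False).limit(1).collect()
    if rows:
        last_prior = rows[0]
    else:
        lookback = lookback * 2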

Getting data for a variable within a time window filtered by timestamps

Important

For this particular example, we have selected a signal that is cycle-timestamped in order to obtain the same values at the same timestamps for both CALS and NXCALS. That might not be the case for signals using acquisition timestamps (which differ between the two systems).

CALS implementation

final Demo demo = new Demo();

Variable var = demo.metaDataService
    .getVariablesWithNameInListofStrings(Arrays.asList("ZT10.QFO03.ACQUISITION:CURRENT"))
    .getVariable(0);

List<Timestamp> stamps = Arrays.asList(
    Timestamp.from(Instant.parse("2018-05-23T00:05:54.700Z")),
    Timestamp.from(Instant.parse("2018-05-23T04:35:59.500Z")),
    Timestamp.from(Instant.parse("2018-05-23T05:11:29.500Z"))
);

demo.timeseriesDataService.getDataFilteredByTimestamps(var, stamps)
    .print(LoggingTimeZone.UTC_TIME);

Variable: ZT10.QFO03.ACQUISITION:CURRENT
    Timestamp(UTC_TIME): "2018-05-23 00:05:53.500" Value: 298.55
    Timestamp(UTC_TIME): "2018-05-23 01:15:53.500" Value: 298.58
    Timestamp(UTC_TIME): "2018-05-23 02:21:19.900" Value: 298.6
NXCALS equivalent

Getting the initial data within the specified time window, which we would like to filter using 3 timestamps:

# Necessary libraries:
from datetime import datetime
import numpy

df = DevicePropertyDataQuery.builder(spark).system('CMW') \
    .startTime('2018-05-21 00:00:00.000').endTime('2018-05-23 13:30:00.000') \
    .entity().device('ZT10.QFO03').property('Acquisition').build()

df.select('cyclestamp','current').show()
DateTimeFormatter FORMATTER =  DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.n");

Dataset<Row> df = DevicePropertyDataQuery.builder(spark).system("CMW")
        .startTime("2018-05-21 00:00:00.000").endTime("2018-05-23 13:30:00.000")
        .entity().device("ZT10.QFO03").property("Acquisition").build();

df.select("cyclestamp","current").show();

+-------------------+---------+
|         cycleStamp|  current|
+-------------------+---------+
|1527033953500000000|298.55392|
|1527034067500000000|298.55743|
|1527034183900000000|298.59647|
|1527034625500000000|298.55054|
|1527034773100000000|      0.0|
|1527034803100000000|298.57617|
|1527034879900000000|298.61642|
|1527035437900000000| 298.5551|
|1527035665900000000|298.54538|
|1527035813500000000|298.59042|
|1527036504700000000|298.55646|
|1527037625500000000|298.59042|
|1527037686700000000|298.57977|
|1527038153500000000|298.57965|
|1527038187100000000|298.57898|
|1527038679100000000|298.56552|
|1527039022300000000| 298.5598|
|1527039244300000000|298.57782|
|1527039289900000000| 298.5763|
|1527039459100000000|298.57492|
+-------------------+---------+
only showing top 20 rows
Creation of a dataframe containing the 3 timestamps:

df2 = spark.createDataFrame([
    Row(times=numpy.datetime64('2018-05-23T00:05:53.500000000').astype(datetime)),
    Row(times=numpy.datetime64('2018-05-23T01:15:53.500000000').astype(datetime)),
    Row(times=numpy.datetime64('2018-05-23T02:21:19.900000000').astype(datetime))])
private static Row createRowWithTimeinNanos(String localDateString, DateTimeFormatter formatter) {

    Instant instantTime = LocalDateTime.parse(localDateString, formatter).toInstant(ZoneOffset.UTC);
    return RowFactory
            .create(TimeUnit.SECONDS.toNanos(instantTime.getEpochSecond()) + instantTime.getNano());
}

...

StructField[] structFields = new StructField[]{
        new StructField("times", DataTypes.LongType, false, Metadata.empty())
};

StructType structType = new StructType(structFields);

List<Row> rows = new ArrayList<>();
rows.add(createRowWithTimeinNanos("2018-05-23 00:05:53.500000000", FORMATTER));
rows.add(createRowWithTimeinNanos("2018-05-23 01:15:53.500000000", FORMATTER));
rows.add(createRowWithTimeinNanos("2018-05-23 02:21:19.900000000", FORMATTER));

Dataset<Row> df2 = spark.createDataFrame(rows, structType);

Final result:

df.join(df2, df.cyclestamp == df2.times ) \
    .select('cyclestamp', 'current').show()
df.join(df2, df.col("cyclestamp").equalTo(df2.col("times")))
        .select("cyclestamp", "current").show();  
+-------------------+---------+
|         cycleStamp|  current|
+-------------------+---------+
|1527038153500000000|298.57965|
|1527033953500000000|298.55392|
|1527042079900000000|298.60202|
+-------------------+---------+

In order to verify the correctness of the timestamps we can use the NumPy library again:

print(numpy.datetime64(1527038153500000000,'ns'))
System.out.println(Instant.ofEpochMilli(1527038153500L).plusNanos(0));
2018-05-23T01:15:53.500000000

Getting data distribution for a variable within a time window

CALS implementation

Variable var = demo.metaDataService
        .getVariablesWithNameInListofStrings(Arrays.asList("ZT10.QFO03.ACQUISITION:CURRENT"))
        .getVariable(0);
Timestamp startTime = Timestamp.from(Instant.parse("2018-05-21T00:00:00.000Z"));
Timestamp endTime = Timestamp.from(Instant.parse("2018-05-23T13:30:00.000Z"));

DataDistributionBinSet dataDistributionBinSet = demo.timeseriesDataService
    .getDataDistributionInTimewindowForBinsNumber(var,startTime, endTime,10);

for(int bucketNr=0; bucketNr<10; bucketNr++) {
    System.out.println(dataDistributionBinSet.get(bucketNr));
}
DataDistributionBinImpl [bottomLimit=-0.13, topLimit=30.129, valuesCount=615]
DataDistributionBinImpl [bottomLimit=30.129, topLimit=60.388, valuesCount=4]
DataDistributionBinImpl [bottomLimit=60.388, topLimit=90.647, valuesCount=0]
DataDistributionBinImpl [bottomLimit=90.647, topLimit=120.906, valuesCount=1]
DataDistributionBinImpl [bottomLimit=120.906, topLimit=151.165, valuesCount=28]
DataDistributionBinImpl [bottomLimit=151.165, topLimit=181.424, valuesCount=42]
DataDistributionBinImpl [bottomLimit=181.424, topLimit=211.683, valuesCount=32]
DataDistributionBinImpl [bottomLimit=211.683, topLimit=241.942, valuesCount=11]
DataDistributionBinImpl [bottomLimit=241.942, topLimit=272.201, valuesCount=0]
DataDistributionBinImpl [bottomLimit=272.201, topLimit=302.46, valuesCount=139055]

NXCALS equivalent

Importing the necessary libraries, including the "functions" module for aggregates:

from pyspark.ml.feature import Bucketizer
from pyspark.sql.functions import min, max

Retrieving sample data:

data = DevicePropertyDataQuery.builder(spark).system('CMW') \
    .startTime('2018-05-21 00:00:00.000').endTime('2018-05-23 13:30:00.000') \
    .entity().device('ZT10.QFO03').property('Acquisition').build() \
    .select('current')
Dataset<Row> data = DevicePropertyDataQuery.builder(spark).system("CMW")
        .startTime("2018-05-21 00:00:00.000").endTime("2018-05-23 13:30:00.000")
        .entity().device("ZT10.QFO03").property("Acquisition").build()
        .select("current");

Retrieving min / max values:

minmax=data.select(min('current').alias('minval'), max('current') \
                   .alias('maxval')).collect()[0]
print(minmax)
Row minmax = ((Row[])(data.select(functions.min("current")
        .alias("minval"), functions.max("current").alias("maxval"))).collect())[0];
Row(minval=-0.12526702880859375, maxval=302.4573974609375)

Creating 10 buckets covering the range of values:

nr_buckets = 10
width = (minmax.maxval - minmax.minval) / nr_buckets

print(width)

splits = [ minmax.minval + width * x for x in range(1, nr_buckets - 1) ]
splits = [-float("inf")]+[minmax.minval]+splits+[minmax.maxval]+[float("inf")]
print(splits)

bucketizer = Bucketizer(splits=splits, inputCol="current", outputCol="bucketedFeatures")
bucketed_data = bucketizer.transform(data)
int nrBuckets = 10;
float minVal = minmax.getFloat(0);
float maxVal = minmax.getFloat(1);

float inf = Float.MAX_VALUE;

double width = (maxVal - minVal) / nrBuckets;
System.out.print(width);

double[] splits = new double[nrBuckets + 3];
splits[0] = -inf;
splits[ nrBuckets +2 ] = inf;
for (int x = 0; x<=nrBuckets; x++) {
    splits[x+1] = minVal + width * x;
}

Bucketizer bucketizer = new Bucketizer()
        .setSplits(splits).setInputCol("current").setOutputCol("bucketedFeatures");
Dataset<Row> bucketedData = bucketizer.transform(data);
30.25826644897461
[-inf, -0.12526702880859375, 30.132999420166016, 60.391265869140625, 90.64953231811523, 120.90779876708984, 151.16606521606445, 181.42433166503906, 211.68259811401367, 241.94086456298828, 302.4573974609375, inf]

Verifying results:

bucketed_data.select('bucketedFeatures').sort('bucketedFeatures') \
    .groupBy('bucketedFeatures').count().show()
bucketedData.select("bucketedFeatures").sort("bucketedFeatures")
        .groupBy("bucketedFeatures").count().show();
+----------------+------+
|bucketedFeatures| count|
+----------------+------+
|             1.0|   615|
|             2.0|     4|
|             4.0|     1|
|             5.0|    28|
|             6.0|    42|
|             7.0|    32|
|             8.0|    11|
|             9.0|139054|
|            10.0|     1|
+----------------+------+
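As a quick cross-check of the distribution (a sketch, not part of the original example), the bin index can also be computed with plain column arithmetic from the min/max values obtained earlier. Note that these indices are 0-based from the minimum value, i.e. shifted by one with respect to the Bucketizer output above; the name binned is hypothetical.

from pyspark.sql.functions import col, floor, least, lit

# Sketch: arithmetic binning; values equal to the maximum are clamped into the last bin.
# Assumes data, minmax, width and nr_buckets from the snippets above.
binned = data.withColumn(
    'bin', least(floor((col('current') - minmax.minval) / width), lit(nr_buckets - 1)))
binned.groupBy('bin').count().orderBy('bin').show()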

Getting data for a vector numeric variable within a time window filtered by indices

CALS implementation

Variable var = demo.metaDataService
        .getVariablesWithNameInListofStrings(Arrays.asList("SPS.BCTDC.51895:TOTAL_INTENSITY"))
        .getVariable(0);

Timestamp startTime = Timestamp.from(Instant.parse("2018-06-21T00:15:00.000Z"));
Timestamp endTime = Timestamp.from(Instant.parse("2018-06-21T00:20:00.000Z"));

int[] indices = new int[] { 2, 4 };

demo.timeseriesDataService
        .getVectornumericDataInTimeWindowFilteredByVectorIndices(
                var, startTime, endTime, indices
        ).stream().sorted(Comparator.comparing(TimeseriesData::getStamp))
        .limit(5).forEach(Demo::printValues);

...

private static void printValues(TimeseriesData data) {
    try {
        double[] values = data.getDoubleValues();
        System.out.printf("%s -> [%f, %f]\n", data.getFormattedStamp(LoggingTimeZone.UTC_TIME), values[0],
                values[1]);
    } catch (NoSuchMethodException e) {
        e.printStackTrace();
    }
}
2018-06-21 00:15:18.135 -> [0.216857, 0.182584]
2018-06-21 00:15:21.735 -> [540.232060, 836.190700]
2018-06-21 00:15:48.135 -> [0.219973, 0.189438]
2018-06-21 00:15:51.735 -> [531.002600, 823.269400]
2018-06-21 00:16:18.135 -> [0.292881, 0.284157]

NXCALS equivalent

Getting some vectornumeric data within a time window:

df = DataQuery.builder(spark).byVariables().system('CMW') \
    .startTime('2018-05-21 00:00:00.000').endTime('2018-05-21 00:05:00.000') \
    .variable('SPS.BCTDC.51895:TOTAL_INTENSITY') \
    .build()
Dataset<Row> df = DataQuery.builder(spark).byVariables().system("CMW")
        .startTime("2018-05-21 00:00:00.000").endTime("2018-05-21 00:05:00.000")
        .variable("SPS.BCTDC.51895:TOTAL_INTENSITY")
        .build();

Getting rows with (timestamp, value) pairs:

inplist=df.select('nxcals_timestamp','nxcals_value').collect()
List<Row> inplist= df.select("nxcals_timestamp","nxcals_value").collectAsList();

Extracting columns 2 and 4 from each vector and returning the result as a dataframe (an alternative that avoids collecting the rows to the driver is sketched after the output below):

indlist=[1,3]
outlist=[]

for element in inplist:
    retarray=[]
    if element.nxcals_value is not None:
        for ind in indlist:
            retarray.append(element.nxcals_value.elements[ind])
        outlist.append(Row(nxcals_timestamp=element.nxcals_timestamp, nxcals_value=Row(elements=retarray, dimensions=[len(retarray)])))
    else:
        outlist.append(Row(nxcals_timestamp=element.nxcals_timestamp, nxcals_value=None))

outdf=spark.createDataFrame(outlist)

As a result of the above operation, the following elements:

df.sort('nxcals_timestamp').select('nxcals_timestamp','nxcals_value').limit(5).collect()
df.sort("nxcals_timestamp").select("nxcals_timestamp","nxcals_value").limit(5).show();

got converted to:

outdf.sort('nxcals_timestamp').limit(5).collect()
df.sort("nxcals_timestamp").limit(5).show();
[Row(nxcals_timestamp=1526860806135000000, nxcals_value=Row(dimensions=[2], elements=[1040.4697265625, 1572.702880859375])),
 Row(nxcals_timestamp=1526860816935000000, nxcals_value=None),
 Row(nxcals_timestamp=1526860824135000000, nxcals_value=None),
 Row(nxcals_timestamp=1526860834935000000, nxcals_value=Row(dimensions=[2], elements=[17.86440658569336, 33.05991744995117])),
 Row(nxcals_timestamp=1526860842135000000, nxcals_value=Row(dimensions=[2], elements=[1034.31689453125, 1561.6275634765625]))]
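For larger datasets, a similar extraction could be done without collecting the rows to the driver by operating on the nested elements array with Spark functions (a sketch, assuming the vector value is exposed as a struct with elements and dimensions fields, as in the rows above; outdf2 is a hypothetical name). Array positions in element_at are 1-based, so the 0-based indices 1 and 3 become positions 2 and 4:

from pyspark.sql.functions import col, element_at, array, struct, lit

# Sketch: select the vector elements at 0-based indices 1 and 3 directly in Spark.
# Rows with a null vector end up with null elements here instead of a null struct.
outdf2 = df.select(
    'nxcals_timestamp',
    struct(
        array(element_at(col('nxcals_value.elements'), 2),
              element_at(col('nxcals_value.elements'), 4)).alias('elements'),
        array(lit(2)).alias('dimensions')
    ).alias('nxcals_value'))

outdf2.sort('nxcals_timestamp').limit(5).show()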

Getting a multi-column timeseries data set

CALS implementation

VariableSet vars = demo.metaDataService
    .getVariablesWithNameInListofStrings(
            Arrays.asList("SPS.BCTDC.41435:INT_FLATTOP", "SPS.BCTDC.51895:SBF_INTENSITY"));

Timestamp startTime = Timestamp.from(Instant.parse("2018-05-21T00:00:00.000Z"));
Timestamp endTime = Timestamp.from(Instant.parse("2018-05-21T00:24:00.000Z"));

MultiColumnTimeseriesDataSet multiColumn = demo.timeseriesDataService
        .getMultiColumnDataInTimeWindow(vars, startTime, endTime);

multiColumn.getTimestamps().stream().sorted(Comparator.reverseOrder()).limit(2)
        .forEach(t -> {
            System.out.println(multiColumn.getRowOfData(t));
        });
[-9.17, 0.3198]
[7648.55, 2824.8354]

NXCALS equivalent

As input we provide a list of variables (no restriction on data type) and a time window. As output we expect to obtain a list of combined timestamps from all the variables with the corresponding values (if available for a given timestamp/variable).

Selection of 2 variables (textual and vectornumeric):

vdf1 = DataQuery.builder(spark).byVariables().system('CMW') \
    .startTime('2018-05-20 00:00:00.000').endTime('2018-05-20 00:12:00.000') \
    .variable('SPS.BCTDC.41435:INT_FLATTOP') \
    .build() \
    .withColumnRenamed('nxcals_value', 'value1')
vdf2 = DataQuery.builder(spark).byVariables().system('CMW') \
    .startTime('2018-05-20 00:00:00.000').endTime('2018-05-20 00:12:00.000') \
    .variable('SPS.BCTDC.51895:SBF_INTENSITY') \
    .build() \
    .withColumnRenamed('nxcals_value', 'value2')
Dataset<Row> vdf1 = DataQuery.builder(spark).byVariables().system("CMW")
        .startTime("2018-05-20 00:00:00.000").endTime("2018-05-20 00:12:00.000")
        .variable("SPS.BCTDC.41435:INT_FLATTOP")
        .build()
        .withColumnRenamed("nxcals_value", "value1");
Dataset<Row> vdf2 = DataQuery.builder(spark).byVariables().system("CMW")
        .startTime("2018-05-20 00:00:00.000").endTime("2018-05-20 00:12:00.000")
        .variable("SPS.BCTDC.51895:SBF_INTENSITY")
        .build()
        .withColumnRenamed("nxcals_value", "value2");

Creation of combined and unique timestamps:

tsdf = vdf1.select('nxcals_timestamp') \
    .union( vdf2.select('nxcals_timestamp')).distinct()
Dataset<Row> tsdf = vdf1.select("nxcals_timestamp")
        .union( vdf2.select("nxcals_timestamp")).distinct();

Renaming of the identical column names (for the final selection). Since textual signals can be nullable, their null values are replaced with the "N/A" literal in order to distinguish them from values that do not exist for a given timestamp:

vdf1=vdf1.withColumnRenamed('nxcals_timestamp', 'ts1').fillna("N/A")
vdf2=vdf2.withColumnRenamed('nxcals_timestamp', 'ts2')
vdf1 = vdf1.withColumnRenamed("nxcals_timestamp", "ts1").na().fill("N/A");
vdf2 = vdf2.withColumnRenamed("nxcals_timestamp", "ts2");

Final join with timestamps:

outdf=tsdf.join(vdf1, tsdf.nxcals_timestamp == vdf1.ts1, how='leftouter') \
    .join(vdf2, tsdf.nxcals_timestamp == vdf2.ts2, how='leftouter')

outdf.select('nxcals_timestamp','value1','value2') \
    .sort('nxcals_timestamp', ascending=False).show(2)
Dataset<Row> outdf=tsdf.join(vdf1, tsdf.col("nxcals_timestamp").equalTo(vdf1.col("ts1")), "leftouter")
        .join(vdf2, tsdf.col("nxcals_timestamp").equalTo(vdf2.col("ts2")), "leftouter");

outdf.select("nxcals_timestamp","value1","value2")
        .sort(functions.desc("nxcals_timestamp")).show(2);
+-------------------+----------+---------+
|   nxcals_timestamp|    value1|   value2|
+-------------------+----------+---------+
|1526775118935000000|-6.5800004|0.3047213|
|1526775108135000000|  7648.546|2883.2888|
+-------------------+----------+---------+

only showing top 2 rows
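
Alternatively (a sketch only, not part of the original example), a similar result could be obtained with a single full outer join on the timestamp column, using vdf1 and vdf2 as created above (before the timestamp renames); the "N/A" replacement for textual values could be applied in the same way as before.

# Sketch: full outer join on nxcals_timestamp instead of building a separate
# timestamp dataframe; the name joined is hypothetical.
joined = vdf1.select('nxcals_timestamp', 'value1') \
    .join(vdf2.select('nxcals_timestamp', 'value2'),
          on='nxcals_timestamp', how='full_outer')

joined.sort('nxcals_timestamp', ascending=False).show(2)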