This document presents possible NXCALS implementations of selected CALS extraction API methods. The NXCALS examples make use of the Data Extraction API.
Note
All CALS Java examples used in this document assume initialization of the metadata and timeseries services. The NXCALS examples require Kerberos authentication and a SparkSession to be present (for more details refer to the NXCALS example projects).
The corresponding code (for both projects) is presented below. In the case of NXCALS there are two variants of the initialization code: one making use of the Spring Framework and the other based on vanilla Java:
import cern.accsoft.cals.extr.client.service.MetaDataService;
import cern.accsoft.cals.extr.client.service.ServiceBuilder;
import cern.accsoft.cals.extr.client.service.TimeseriesDataService;
import cern.accsoft.cals.extr.domain.core.datasource.DataLocationPreferences;
public class Demo {
private MetaDataService metaDataService;
private TimeseriesDataService timeseriesDataService;
public Demo() {
final ServiceBuilder builder = ServiceBuilder
.getInstance("TIMBER", "TIMBER", DataLocationPreferences.LDB_PRO);
metaDataService = builder.createMetaService();
timeseriesDataService = builder.createTimeseriesService();
}
public static void main(final String args[]) {
final Demo demo = new Demo();
// code snippets follow
}
}
import cern.nxcals.api.extraction.data.builders.DataQuery;
import cern.nxcals.api.extraction.data.builders.DevicePropertyDataQuery;
import org.apache.spark.SparkContext;
import org.apache.spark.ml.feature.Bucketizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Import;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
// source the nxcals query builders
@SpringBootApplication
@Import(SparkContext.class)
public class NxcalsDemo {
static {
// for the exact settings please refer to example project
System.setProperty("kerberos.keytab", "keytab_file_location");
System.setProperty("kerberos.principal", "principal");
System.setProperty("service.url", "NXCALS_service_url");
}
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(NxcalsDemo.class, args);
SparkSession spark = context.getBean(SparkSession.class);
// code snippets follow
}
}
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
public class NxcalsDemoNoSpring {
static {
// for the exact settings please refer to example project
System.setProperty("kerberos.keytab", "keytab_file_location");
System.setProperty("kerberos.principal", "principal");
System.setProperty("service.url", "NXCALS_service_url");
}
private static final String HDP_CLUSTER_SPARK_JAVA_HOME = "/var/nxcals/jdk1.11";
public static void main(String[] args) {
SparkConf sparkConf = createSparkConf();
SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();
// code snippets follow
}
// Creates Spark configuration
private static SparkConf createSparkConf() {
return new SparkConf().setAppName("Example-Application")
.setMaster("local[*]")
.set("spark.submit.deployMode", "client")
.set("spark.ui.port", "4050")
.set("spark.yarn.appMasterEnv.JAVA_HOME", HDP_CLUSTER_SPARK_JAVA_HOME)
.set("yarn.app.mapreduce.am.env.JAVA_HOME", HDP_CLUSTER_SPARK_JAVA_HOME)
.set("mapreduce.map.env.JAVA_HOME", HDP_CLUSTER_SPARK_JAVA_HOME)
.set("mapreduce.reduce.env.JAVA_HOME", HDP_CLUSTER_SPARK_JAVA_HOME)
.set("spark.yarn.jars", "hdfs:///project/nxcals/lib/spark-/*.jar")
.set("spark.yarn.am.extraClassPath", "/usr/hdp/hadoop/lib/native")
.set("spark.executor.extraClassPath", "/usr/hdp/hadoop/lib/native")
.set("spark.executor.instances", "4")
.set("spark.executor.cores", "1")
.set("spark.executor.memory", "1g")
.set("spark.executorEnv.JAVA_HOME", HDP_CLUSTER_SPARK_JAVA_HOME)
.set("sql.caseSensitive", "true")
.set("spark.kerberos.access.hadoopFileSystems", "nxcals");
}
}
Getting data for the most recent value prior to the time window
CALS implementation
Variable var = demo.metaDataService
.getVariablesWithNameInListofStrings(Arrays.asList("ZT10.QFO03.ACQUISITION:CURRENT"))
.getVariable(0);
Timestamp refTime = Timestamp.from(Instant.parse("2018-05-23T00:05:54.500Z"));
LoggingTimeInterval loggingTimeInterval = LoggingTimeInterval.DAY;
demo.timeseriesDataService
.getLastDataPriorToTimestampWithinUserInterval(var, refTime, loggingTimeInterval)
.print(LoggingTimeZone.UTC_TIME);
Timestamp(UTC_TIME): "2018-05-23 00:05:53.500" Value: 298.55
NXCALS equivalent
Assuming that the interval duration prior to the beginning of the time window (reference time) is equal to 1 day:
# Necessary libraries.
# The most convenient way of handling timestamps with nanoseconds is Numpy:
import numpy
ref_time = numpy.datetime64('2018-05-23T00:05:54.500')
interval_start = ref_time - numpy.timedelta64(1,'D')
df = DevicePropertyDataQuery.builder(spark).system('CMW') \
.startTime(interval_start).endTime(ref_time) \
.entity().device('ZT10.QFO03').property('Acquisition').build() \
.select('cyclestamp', 'current').orderBy('cyclestamp', ascending=False).limit(1)
df.show()
// source the nxcals query builders
DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.n");
Instant refTime = LocalDateTime.parse("2018-05-23 00:05:54.500000000", FORMATTER).toInstant(ZoneOffset.UTC);
Instant intervalStart = refTime.minus((Duration.ofDays(1)));
Dataset<Row> df = DevicePropertyDataQuery.builder(spark).system("CMW")
.startTime(intervalStart).endTime(refTime)
.entity().device("ZT10.QFO03").property("Acquisition").build()
.select("cyclestamp", "current").orderBy(functions.desc("cyclestamp")).limit(1);
df.show();
+-------------------+---------+
| cycleStamp| current|
+-------------------+---------+
|1527033953500000000|298.55392|
+-------------------+---------+
Getting data for a variable within a time window and the most recent value prior to the time window
CALS implementation
Variable var = demo.metaDataService
.getVariablesWithNameInListofStrings(Arrays.asList("ZT10.QFO03.ACQUISITION:CURRENT"))
.getVariable(0);
Timestamp startTime = Timestamp.from(Instant.parse("2018-05-21T00:00:00.000Z"));
Timestamp endTime = Timestamp.from(Instant.parse("2018-05-23T13:30:00.000Z"));
demo.timeseriesDataService
.getDataInTimeWindowAndLastDataPriorToTimeWindowWithinDefaultInterval(
var, startTime, endTime
).stream().sorted(Comparator.comparing(TimeseriesData::getStamp))
.limit(5).forEach(Demo::printValue);
...
private static void printValue(TimeseriesData data) {
try {
System.out.printf("%s -> %f\n",
data.getFormattedStamp(LoggingTimeZone.UTC_TIME), data.getDoubleValue());
} catch (NoSuchMethodException e) {
e.printStackTrace();
}
}
2018-05-23 00:05:53.500 -> 298.550000
2018-05-23 00:05:54.700 -> 298.590000
2018-05-23 00:05:55.900 -> 298.600000
2018-05-23 00:05:57.100 -> 298.580000
2018-05-23 00:05:58.300 -> 298.610000
NXCALS equivalent
Assuming that the interval duration prior to the beginning of the time window (start time) is equal to 1 month. This is not exactly the same implementation as in CALS, where data is sampled before the reference time starting with a small range that is extended until the most recent value is found.
The example below would have to implement such iterations directly in the code or rely on one of the available timeseries libraries for Spark (no concrete recommendation at the moment of writing this document). A sketch of such an iterative lookup is given after the example.
# Necessary libraries:
import numpy
start_time = numpy.datetime64('2018-05-21 00:00:00.000')
interval_start = start_time - numpy.timedelta64(30,'D')
df= DevicePropertyDataQuery.builder(spark).system('CMW') \
.startTime(start_time).endTime('2018-05-23 13:30:00.000') \
.entity().device('ZT10.QFO03').property('Acquisition').build() \
.select('cyclestamp', 'current').orderBy('cyclestamp', ascending=False) \
.unionAll ( \
DevicePropertyDataQuery.builder(spark).system('CMW') \
.startTime(interval_start).endTime(start_time) \
.entity().device('ZT10.QFO03').property('Acquisition').build() \
.select('cyclestamp', 'current').orderBy('cyclestamp', ascending=False).limit(1))
df.sort('cyclestamp').show(5)
DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.n");
Instant startTime = LocalDateTime.parse("2018-05-21 00:00:00.000000000", FORMATTER).toInstant(ZoneOffset.UTC);
Instant intervalStart = startTime.atOffset(ZoneOffset.UTC).minus(1, ChronoUnit.MONTHS).toInstant();
Dataset<Row> df= DevicePropertyDataQuery.builder(spark).system("CMW")
.startTime(startTime).endTime("2018-05-23 13:30:00.000")
.entity().device("ZT10.QFO03").property("Acquisition").build()
.select("cyclestamp", "current").orderBy(functions.desc("cyclestamp"))
.unionAll (
DevicePropertyDataQuery.builder(spark).system("CMW")
.startTime(intervalStart).endTime(startTime)
.entity().device("ZT10.QFO03").property("Acquisition").build()
.select("cyclestamp", "current").orderBy(functions.desc("cyclestamp")).limit(1));
df.sort("cyclestamp").show(5);
+-------------------+---------+
| cycleStamp| current|
+-------------------+---------+
|1526860799500000000|298.55893|
|1526860801900000000|298.59607|
|1526860803100000000|298.61493|
|1526860804300000000| 298.5677|
|1526860805500000000|298.56143|
+-------------------+---------+
only showing top 5 rows
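For reference, the iterative lookup mentioned above could be sketched as follows. This is only a minimal illustration reusing the DevicePropertyDataQuery builder and the imports from the initialization snippet; the initial 1-hour range, the 30-day cap and the doubling of the interval are arbitrary choices, not an NXCALS recommendation:
private static Dataset<Row> lastValuePriorTo(SparkSession spark, Instant refTime) {
    Duration range = Duration.ofHours(1);     // initial search range (arbitrary choice)
    Duration maxRange = Duration.ofDays(30);  // give up after one month (arbitrary choice)
    while (range.compareTo(maxRange) <= 0) {
        Dataset<Row> candidate = DevicePropertyDataQuery.builder(spark).system("CMW")
                .startTime(refTime.minus(range)).endTime(refTime)
                .entity().device("ZT10.QFO03").property("Acquisition").build()
                .select("cyclestamp", "current")
                .orderBy(functions.desc("cyclestamp")).limit(1);
        if (candidate.count() > 0) {
            return candidate;                 // most recent value found within this range
        }
        range = range.multipliedBy(2);        // no data yet, extend the range and retry
    }
    return null;                              // nothing found within maxRange
}
The result of lastValuePriorTo(spark, startTime) could then be unioned with the time window query in place of the fixed one-month interval used above.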
Getting data for a variable within a time window filtered by timestamps
Important
For this particular example, we have selected a signal that is cycle-timestamped in order to obtain the same values at the same timestamps for both CALS and NXCALS. That might not be the case for signals using acquisition timestamps (which differ between the two systems).
CALS implementation
final Demo demo = new Demo();
Variable var = demo.metaDataService
.getVariablesWithNameInListofStrings(Arrays.asList("ZT10.QFO03.ACQUISITION:CURRENT"))
.getVariable(0);
List<Timestamp> stamps = Arrays.asList(
Timestamp.from(Instant.parse("2018-05-23T00:05:54.700Z")),
Timestamp.from(Instant.parse("2018-05-23T04:35:59.500Z")),
Timestamp.from(Instant.parse("2018-05-23T05:11:29.500Z"))
);
demo.timeseriesDataService.getDataFilteredByTimestamps(var, stamps)
.print(LoggingTimeZone.UTC_TIME);
Variable: ZT10.QFO03.ACQUISITION:CURRENT
Timestamp(UTC_TIME): "2018-05-23 00:05:53.500" Value: 298.55
Timestamp(UTC_TIME): "2018-05-23 01:15:53.500" Value: 298.58
Timestamp(UTC_TIME): "2018-05-23 02:21:19.900" Value: 298.6
Getting the initial data within the specified time window, which we would like to filter using 3 timestamps:
# Necessary libraries:
from datetime import datetime
from pyspark.sql import Row
import numpy
df = DevicePropertyDataQuery.builder(spark).system('CMW') \
.startTime('2018-05-21 00:00:00.000').endTime('2018-05-23 13:30:00.000') \
.entity().device('ZT10.QFO03').property('Acquisition').build()
df.select('cyclestamp','current').show()
DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.n");
Dataset<Row> df = DevicePropertyDataQuery.builder(spark).system("CMW")
.startTime("2018-05-21 00:00:00.000").endTime("2018-05-23 13:30:00.000")
.entity().device("ZT10.QFO03").property("Acquisition").build();
df.select("cyclestamp","current").show();
+-------------------+---------+
| cycleStamp| current|
+-------------------+---------+
|1527033953500000000|298.55392|
|1527034067500000000|298.55743|
|1527034183900000000|298.59647|
|1527034625500000000|298.55054|
|1527034773100000000| 0.0|
|1527034803100000000|298.57617|
|1527034879900000000|298.61642|
|1527035437900000000| 298.5551|
|1527035665900000000|298.54538|
|1527035813500000000|298.59042|
|1527036504700000000|298.55646|
|1527037625500000000|298.59042|
|1527037686700000000|298.57977|
|1527038153500000000|298.57965|
|1527038187100000000|298.57898|
|1527038679100000000|298.56552|
|1527039022300000000| 298.5598|
|1527039244300000000|298.57782|
|1527039289900000000| 298.5763|
|1527039459100000000|298.57492|
+-------------------+---------+
only showing top 20 rows
df2 = spark.createDataFrame([
Row(times=numpy.datetime64('2018-05-23T00:05:53.500000000').astype(datetime)),
Row(times=numpy.datetime64('2018-05-23T01:15:53.500000000').astype(datetime)),
Row(times=numpy.datetime64('2018-05-23T02:21:19.900000000').astype(datetime))])
private static Row createRowWithTimeinNanos(String localDateString, DateTimeFormatter formatter) {
Instant instantTime = LocalDateTime.parse(localDateString, formatter).toInstant(ZoneOffset.UTC);
return RowFactory
.create(TimeUnit.SECONDS.toNanos(instantTime.getEpochSecond()) + instantTime.getNano());
}
...
StructField[] structFields = new StructField[]{
new StructField("times", DataTypes.LongType, false, Metadata.empty())
};
StructType structType = new StructType(structFields);
List<Row> rows = new ArrayList<>();
rows.add(createRowWithTimeinNanos("2018-05-23 00:05:53.500000000", FORMATTER));
rows.add(createRowWithTimeinNanos("2018-05-23 01:15:53.500000000", FORMATTER));
rows.add(createRowWithTimeinNanos("2018-05-23 02:21:19.900000000", FORMATTER));
Dataset<Row> df2 = spark.createDataFrame(rows, structType);
Final result:
df.join(df2, df.cyclestamp == df2.times ) \
.select('cyclestamp', 'current').show()
df.join(df2, df.col("cyclestamp").equalTo(df2.col("times")))
.select("cyclestamp", "current").show();
+-------------------+---------+
| cycleStamp| current|
+-------------------+---------+
|1527038153500000000|298.57965|
|1527033953500000000|298.55392|
|1527042079900000000|298.60202|
+-------------------+---------+
In order to verify the correctness of the timestamps we can use the Numpy library again:
print(numpy.datetime64(1527038153500000000,'ns'))
System.out.println(Instant.ofEpochMilli(1527038153500L).plusNanos(0));
2018-05-23T01:15:53.500000000
Getting data distribution for a variable within a time window
CALS implementation
Variable var = demo.metaDataService
.getVariablesWithNameInListofStrings(Arrays.asList("ZT10.QFO03.ACQUISITION:CURRENT"))
.getVariable(0);
Timestamp startTime = Timestamp.from(Instant.parse("2018-05-21T00:00:00.000Z"));
Timestamp endTime = Timestamp.from(Instant.parse("2018-05-23T13:30:00.000Z"));
DataDistributionBinSet dataDistributionBinSet = demo.timeseriesDataService
.getDataDistributionInTimewindowForBinsNumber(var,startTime, endTime,10);
for(int bucketNr=0; bucketNr<10; bucketNr++) {
System.out.println(dataDistributionBinSet.get(bucketNr));
}
DataDistributionBinImpl [bottomLimit=-0.13, topLimit=30.129, valuesCount=615]
DataDistributionBinImpl [bottomLimit=30.129, topLimit=60.388, valuesCount=4]
DataDistributionBinImpl [bottomLimit=60.388, topLimit=90.647, valuesCount=0]
DataDistributionBinImpl [bottomLimit=90.647, topLimit=120.906, valuesCount=1]
DataDistributionBinImpl [bottomLimit=120.906, topLimit=151.165, valuesCount=28]
DataDistributionBinImpl [bottomLimit=151.165, topLimit=181.424, valuesCount=42]
DataDistributionBinImpl [bottomLimit=181.424, topLimit=211.683, valuesCount=32]
DataDistributionBinImpl [bottomLimit=211.683, topLimit=241.942, valuesCount=11]
DataDistributionBinImpl [bottomLimit=241.942, topLimit=272.201, valuesCount=0]
DataDistributionBinImpl [bottomLimit=272.201, topLimit=302.46, valuesCount=139055]
NXCALS equivalent
Importing the necessary libraries, including the "functions" module for aggregates:
from pyspark.ml.feature import Bucketizer
from pyspark.sql.functions import max, min
Retrieving sample data:
data = DevicePropertyDataQuery.builder(spark).system('CMW') \
.startTime('2018-05-21 00:00:00.000').endTime('2018-05-23 13:30:00.000') \
.entity().device('ZT10.QFO03').property('Acquisition').build() \
.select('current')
Dataset<Row> data = DevicePropertyDataQuery.builder(spark).system("CMW")
.startTime("2018-05-21 00:00:00.000").endTime("2018-05-23 13:30:00.000")
.entity().device("ZT10.QFO03").property("Acquisition").build()
.select("current");
Retrieving min / max values:
minmax=data.select(min('current').alias('minval'), max('current') \
.alias('maxval')).collect()[0]
print(minmax)
Row minmax = ((Row[])(data.select(functions.min("current")
.alias("minval"), functions.max("current").alias("maxval"))).collect())[0];
Row(minval=-0.12526702880859375, maxval=302.4573974609375)
Creating 10 buckets for values range:
nr_buckets = 10
width = (minmax.maxval - minmax.minval) / nr_buckets
print(width)
splits = [ minmax.minval + width * x for x in range(1, nr_buckets - 1) ]
splits = [-float("inf")]+[minmax.minval]+splits+[minmax.maxval]+[float("inf")]
print(splits)
bucketizer = Bucketizer(splits=splits, inputCol="current", outputCol="bucketedFeatures")
bucketed_data = bucketizer.transform(data)
int nrBuckets = 10;
float minVal = minmax.getFloat(0);
float maxVal = minmax.getFloat(1);
float inf = Float.MAX_VALUE;
double width = (maxVal - minVal) / nrBuckets;
System.out.print(width);
double[] splits = new double[nrBuckets + 3];
splits[0] = -inf;
splits[ nrBuckets +2 ] = inf;
for (int x = 0; x<=nrBuckets; x++) {
splits[x+1] = minVal + width * x;
}
Bucketizer bucketizer = new Bucketizer()
.setSplits(splits).setInputCol("current").setOutputCol("bucketedFeatures");
Dataset<Row> bucketedData = bucketizer.transform(data);
30.25826644897461
[-inf, -0.12526702880859375, 30.132999420166016, 60.391265869140625, 90.64953231811523, 120.90779876708984, 151.16606521606445, 181.42433166503906, 211.68259811401367, 241.94086456298828, 302.4573974609375, inf]
Verifying results:
bucketed_data.select('bucketedFeatures').sort('bucketedFeatures') \
.groupBy('bucketedFeatures').count().show()
bucketedData.select("bucketedFeatures").sort("bucketedFeatures")
.groupBy("bucketedFeatures").count().show();
+----------------+------+
|bucketedFeatures| count|
+----------------+------+
| 1.0| 615|
| 2.0| 4|
| 4.0| 1|
| 5.0| 28|
| 6.0| 42|
| 7.0| 32|
| 8.0| 11|
| 9.0|139054|
| 10.0| 1|
+----------------+------+
Getting data for a vector numeric variable within a time window filtered by indices
CALS implementation
Variable var = demo.metaDataService
.getVariablesWithNameInListofStrings(Arrays.asList("SPS.BCTDC.51895:TOTAL_INTENSITY"))
.getVariable(0);
Timestamp startTime = Timestamp.from(Instant.parse("2018-06-21T00:15:00.000Z"));
Timestamp endTime = Timestamp.from(Instant.parse("2018-06-21T00:20:00.000Z"));
int[] indices = new int[] { 2, 4 };
demo.timeseriesDataService
.getVectornumericDataInTimeWindowFilteredByVectorIndices(
var, startTime, endTime, indices
).stream().sorted(Comparator.comparing(TimeseriesData::getStamp))
.limit(5).forEach(Demo::printValues);
...
private static void printValues(TimeseriesData data) {
try {
double[] values = data.getDoubleValues();
System.out.printf("%s -> [%f, %f]\n", data.getFormattedStamp(LoggingTimeZone.UTC_TIME), values[0],
values[1]);
} catch (NoSuchMethodException e) {
e.printStackTrace();
}
}
2018-06-21 00:15:18.135 -> [0.216857, 0.182584]
2018-06-21 00:15:21.735 -> [540.232060, 836.190700]
2018-06-21 00:15:48.135 -> [0.219973, 0.189438]
2018-06-21 00:15:51.735 -> [531.002600, 823.269400]
2018-06-21 00:16:18.135 -> [0.292881, 0.284157]
NXCALS equivalent
Getting some vectornumeric data within a time window:
df = DataQuery.builder(spark).byVariables().system('CMW') \
.startTime('2018-05-21 00:00:00.000').endTime('2018-05-21 00:05:00.000') \
.variable('SPS.BCTDC.51895:TOTAL_INTENSITY') \
.build()
Dataset<Row> df = DataQuery.builder(spark).byVariables().system("CMW")
.startTime("2018-05-21 00:00:00.000").endTime("2018-05-21 00:05:00.000")
.variable("SPS.BCTDC.51895:TOTAL_INTENSITY")
.build();
Getting rows with timestamp, value pairs:
inplist=df.select('nxcals_timestamp','nxcals_value').collect()
List<Row> inplist= df.select("nxcals_timestamp","nxcals_value").collectAsList();
Extracting columns 2 and 4 from each vector and returning the result as a dataframe (a Java sketch of the same operation is given after the Python code below):
indlist=[1,3]
outlist=[]
for element in inplist:
retarray=[]
if element.nxcals_value is not None:
for ind in indlist:
retarray.append(element.nxcals_value.elements[ind])
outlist.append(Row(nxcals_timestamp=element.nxcals_timestamp, nxcals_value=Row(elements=retarray, dimensions=[len(retarray)])))
else:
outlist.append(Row(nxcals_timestamp=element.nxcals_timestamp, nxcals_value=None))
outdf=spark.createDataFrame(outlist)
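The document does not provide a Java version of this step; a minimal sketch could look as follows. It collects the rows to the driver and rebuilds them, like the Python version above. The output schema (an elements/dimensions struct) and the outdf variable name are illustrative assumptions, and ((Number) ...).doubleValue() is used so that the sketch does not depend on whether the vector elements are stored as floats or doubles:
// Illustrative output schema mirroring the Python rows: a struct with 'elements' and 'dimensions'
StructType valueType = new StructType(new StructField[]{
        new StructField("elements", DataTypes.createArrayType(DataTypes.DoubleType), true, Metadata.empty()),
        new StructField("dimensions", DataTypes.createArrayType(DataTypes.IntegerType), true, Metadata.empty())
});
StructType outType = new StructType(new StructField[]{
        new StructField("nxcals_timestamp", DataTypes.LongType, true, Metadata.empty()),
        new StructField("nxcals_value", valueType, true, Metadata.empty())
});
int[] indices = {1, 3}; // same indices as in the Python example
List<Row> outRows = new ArrayList<>();
for (Row element : inplist) {
    Long timestamp = element.getAs("nxcals_timestamp");
    Row value = element.getAs("nxcals_value");
    if (value != null) {
        List<Object> elements = value.getList(value.fieldIndex("elements"));
        List<Double> selected = new ArrayList<>();
        for (int ind : indices) {
            // works regardless of the numeric type used for the vector elements
            selected.add(((Number) elements.get(ind)).doubleValue());
        }
        outRows.add(RowFactory.create(timestamp,
                RowFactory.create(selected, java.util.Collections.singletonList(selected.size()))));
    } else {
        outRows.add(RowFactory.create(timestamp, null));
    }
}
Dataset<Row> outdf = spark.createDataFrame(outRows, outType);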
As a result of the above operation, the following elements:
df.sort('nxcals_timestamp').select('nxcals_timestamp','nxcals_value').limit(5).collect()
df.sort("nxcals_timestamp").select("nxcals_timestamp","nxcals_value").limit(5).show();
got converted to:
outdf.sort('nxcals_timestamp').limit(5).collect()
df.sort("nxcals_timestamp").limit(5).show();
[Row(nxcals_timestamp=1526860806135000000, nxcals_value=Row(dimensions=[2], elements=[1040.4697265625, 1572.702880859375])),
Row(nxcals_timestamp=1526860816935000000, nxcals_value=None),
Row(nxcals_timestamp=1526860824135000000, nxcals_value=None),
Row(nxcals_timestamp=1526860834935000000, nxcals_value=Row(dimensions=[2], elements=[17.86440658569336, 33.05991744995117])),
Row(nxcals_timestamp=1526860842135000000, nxcals_value=Row(dimensions=[2], elements=[1034.31689453125, 1561.6275634765625]))]
Getting multi column timeseries data set
CALS implementation
VariableSet vars = demo.metaDataService
.getVariablesWithNameInListofStrings(
Arrays.asList("SPS.BCTDC.41435:INT_FLATTOP", "SPS.BCTDC.51895:SBF_INTENSITY"));
Timestamp startTime = Timestamp.from(Instant.parse("2018-05-21T00:00:00.000Z"));
Timestamp endTime = Timestamp.from(Instant.parse("2018-05-21T00:24:00.000Z"));
MultiColumnTimeseriesDataSet multiColumn = demo.timeseriesDataService
.getMultiColumnDataInTimeWindow(vars, startTime, endTime);
multiColumn.getTimestamps().stream().sorted(Comparator.reverseOrder()).limit(2)
.forEach(t -> {
System.out.println(multiColumn.getRowOfData(t));
});
[-9.17, 0.3198]
[7648.55, 2824.8354]
NXCALS equivalent
As input we provide a list of variables (no restriction on data type) and a time window. As output we expect to obtain a list of combined timestamps from all the variables, together with the corresponding values (if available for a given timestamp/variable).
Selection of 2 variables (textual and vectornumeric):
vdf1 = DataQuery.builder(spark).byVariables().system('CMW') \
.startTime('2018-05-20 00:00:00.000').endTime('2018-05-20 00:12:00.000') \
.variable('SPS.BCTDC.41435:INT_FLATTOP') \
.build() \
.withColumnRenamed('nxcals_value', 'value1')
vdf2 = DataQuery.builder(spark).byVariables().system('CMW') \
.startTime('2018-05-20 00:00:00.000').endTime('2018-05-20 00:12:00.000') \
.variable('SPS.BCTDC.51895:SBF_INTENSITY') \
.build() \
.withColumnRenamed('nxcals_value', 'value2')
Dataset<Row> vdf1 = DataQuery.builder(spark).byVariables().system("CMW")
.startTime("2018-05-20 00:00:00.000").endTime("2018-05-20 00:12:00.000")
.variable("SPS.BCTDC.41435:INT_FLATTOP")
.build()
.withColumnRenamed("nxcals_value", "value1");
Dataset<Row> vdf2 = DataQuery.builder(spark).byVariables().system("CMW")
.startTime("2018-05-20 00:00:00.000").endTime("2018-05-20 00:12:00.000")
.variable("SPS.BCTDC.51895:SBF_INTENSITY")
.build()
.withColumnRenamed("nxcals_value", "value2");
Creation of combined and unique timestamps:
tsdf = vdf1.select('nxcals_timestamp') \
.union( vdf2.select('nxcals_timestamp')).distinct()
Dataset<Row> tsdf = vdf1.select("nxcals_timestamp")
.union( vdf2.select("nxcals_timestamp")).distinct();
Renaming of identical column names (for the final selection). Since textual signals can be "nullable", their null values are replaced with the "N/A" literal in order to distinguish them from non-existent values for a given timestamp:
vdf1=vdf1.withColumnRenamed('nxcals_timestamp', 'ts1').fillna("N/A")
vdf2=vdf2.withColumnRenamed('nxcals_timestamp', 'ts2')
vdf1 = vdf1.withColumnRenamed("nxcals_timestamp", "ts1").na().fill("N/A");
vdf2 = vdf2.withColumnRenamed("nxcals_timestamp", "ts2");
Final join with timestamps:
outdf=tsdf.join(vdf1, tsdf.nxcals_timestamp == vdf1.ts1, how='leftouter') \
.join(vdf2, tsdf.nxcals_timestamp == vdf2.ts2, how='leftouter')
outdf.select('nxcals_timestamp','value1','value2') \
.sort('nxcals_timestamp', ascending=False).show(2)
Dataset<Row> outdf=tsdf.join(vdf1, tsdf.col("nxcals_timestamp").equalTo(vdf1.col("ts1")), "leftouter")
.join(vdf2, tsdf.col("nxcals_timestamp").equalTo(vdf2.col("ts2")), "leftouter");
outdf.select("nxcals_timestamp","value1","value2")
.sort(functions.desc("nxcals_timestamp")).show(2);
+-------------------+----------+---------+
| nxcals_timestamp| value1| value2|
+-------------------+----------+---------+
|1526775118935000000|-6.5800004|0.3047213|
|1526775108135000000| 7648.546|2883.2888|
+-------------------+----------+---------+
only showing top 2 rows