Extracting field types

The data type of a field can be determined in several ways from the information carried by a DataFrame. Selected DataFrames are used below to illustrate scalar, vector and matrix data types.

The complete list of Spark data types is available in the Apache Spark documentation.

Example of a scalar data type

Required libraries for the sample code:

Python:
from nxcals.api.extraction.data.builders import DataQuery
from pyspark.sql.functions import col
Java:
import cern.nxcals.api.extraction.data.builders.DataQuery;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructField;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.spark.sql.functions.col;

Creation of a DataFrame based on an entity with scalar fields:

Python:
df1 = DataQuery.builder(spark).byEntities().system('CMW') \
    .startTime('2018-04-29 00:00:00.000').endTime('2018-04-30 00:00:00.000') \
    .entity().keyValues({'device': 'LHC.LUMISERVER', 'property': 'CrossingAngleIP1'}) \
    .build()
Java:
Map<String, Object> keyValues = new HashMap<>();
keyValues.put("device", "LHC.LUMISERVER");
keyValues.put("property", "CrossingAngleIP1");

Dataset<Row> df1 = DataQuery.builder(spark).byEntities().system("CMW")
    .startTime("2018-04-29 00:00:00.000").endTime("2018-04-30 00:00:00.000")
    .entity().keyValues(keyValues)
    .build();

To display the schema, including the DataFrame column types, the printSchema method can be used:

Python:
df1.printSchema()   # prints schema in the tree format
Java:
df1.printSchema(); // prints schema in the tree format
Expected output:
root
 |-- DeltaCrossingAngle: double (nullable = true)
 |-- Moving: long (nullable = true)
 |-- __record_timestamp__: long (nullable = true)
 |-- __record_version__: long (nullable = true)
 |-- acqStamp: long (nullable = true)
 |-- class: string (nullable = true)
 |-- cyclestamp: long (nullable = true)
 |-- device: string (nullable = true)
 |-- property: string (nullable = true)
 |-- selector: string (nullable = true)
 |-- nxcals_entity_id: long (nullable = true)

To retrieve that information as an array of all column names and their data types, dtypes can be used:

Python:
df1.dtypes # DataFrame property returning names and data types of all the columns
Java:
df1.dtypes(); // Dataset method returning names and data types of all the columns
Expected output:
[('DeltaCrossingAngle', 'double'),
 ('Moving', 'bigint'),
 ('__record_timestamp__', 'bigint'),
 ('__record_version__', 'bigint'),
 ('acqStamp', 'bigint'),
 ('class', 'string'),
 ('cyclestamp', 'bigint'),
 ('device', 'string'),
 ('property', 'string'),
 ('selector', 'string'),
 ('nxcals_entity_id', 'bigint')]
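
Because the Python result is a plain list of (name, type) pairs, it can be filtered with ordinary Python. The sketch below is only an illustration: it copies the list from the expected output above instead of querying NXCALS, and the helper name columns_of_type is hypothetical.

```python
# (name, type) pairs copied from the expected output of df1.dtypes above
dtypes = [
    ('DeltaCrossingAngle', 'double'),
    ('Moving', 'bigint'),
    ('__record_timestamp__', 'bigint'),
    ('__record_version__', 'bigint'),
    ('acqStamp', 'bigint'),
    ('class', 'string'),
    ('cyclestamp', 'bigint'),
    ('device', 'string'),
    ('property', 'string'),
    ('selector', 'string'),
    ('nxcals_entity_id', 'bigint'),
]


def columns_of_type(dtypes, type_name):
    """Return the names of all columns whose type string equals type_name."""
    return [name for name, dtype in dtypes if dtype == type_name]


string_columns = columns_of_type(dtypes, 'string')
print(string_columns)  # ['class', 'device', 'property', 'selector']
```

With a live DataFrame, df1.dtypes could be passed in directly, and the resulting names could then be handed to df1.select(*string_columns).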

To obtain the Spark data types, we can refer to the DataFrame schema:

Python:
df1.schema # DataFrame property returning its schema as StructType(List(StructField(name, Spark dataType, nullable), ...))
Java:
df1.schema(); // Dataset method returning its schema as StructType(List(StructField(name, Spark dataType, nullable), ...))
Expected output:
StructType(List(
    StructField(DeltaCrossingAngle,DoubleType,true),
    StructField(Moving,LongType,true),StructField(__record_timestamp__,LongType,true),
    StructField(__record_version__,LongType,true),StructField(acqStamp,LongType,true),
    StructField(class,StringType,true),StructField(cyclestamp,LongType,true),
    StructField(device,StringType,true),StructField(property,StringType,true),
    StructField(selector,StringType,true),StructField(nxcals_entity_id,LongType,true)
))

or directly to the DataFrame schema fields:

Python:
df1.schema.fields # Schema property returning List of StructField(name, Spark dataType, nullable)
Java:
df1.schema().fields(); // Schema method returning an array of StructField(name, Spark dataType, nullable)
Expected output:
[StructField(DeltaCrossingAngle,DoubleType,true),
 StructField(Moving,LongType,true),
 StructField(__record_timestamp__,LongType,true),
 StructField(__record_version__,LongType,true),
 StructField(acqStamp,LongType,true),
 StructField(class,StringType,true),
 StructField(cyclestamp,LongType,true),
 StructField(device,StringType,true),
 StructField(property,StringType,true),
 StructField(selector,StringType,true),
 StructField(nxcals_entity_id,LongType,true)]
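
Note that the dtypes output uses Spark SQL's short type names (for example bigint), while the schema output uses the type class names (LongType). The sketch below lists the correspondence for the types that appear on this page. The dictionary name is hypothetical, the mapping is deliberately incomplete, and the sample pairs are copied from the outputs above.

```python
# Short type names (as reported by dtypes) mapped to the Spark type class
# names (as reported by the schema). Only types used on this page are listed.
SHORT_NAME_TO_SPARK_TYPE = {
    'double': 'DoubleType',
    'bigint': 'LongType',
    'int': 'IntegerType',
    'string': 'StringType',
}

# A few (name, short type) pairs copied from the dtypes output above
dtypes = [('DeltaCrossingAngle', 'double'), ('Moving', 'bigint'), ('class', 'string')]

# Translate each short name to the class name shown in the schema output
spark_types = {name: SHORT_NAME_TO_SPARK_TYPE[short] for name, short in dtypes}
print(spark_types['Moving'])  # LongType
```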

For convenience, we can create a mapping from field name to Spark data type:

Python:
# Getting data types from the schema as a dictionary
d = {f.name: f.dataType for f in df1.schema.fields}
d['Moving']
Java:
// Getting data types from the schema as a map
Map<String, DataType> d = Arrays.stream(df1.schema().fields())
        .collect(Collectors.toMap(StructField::name, StructField::dataType));
d.get("Moving");
Expected output:
LongType

Example of a vector data type

Vector and matrix data are expressed in NXCALS as a complex type built from two ArrayType fields: one holding the vector or matrix data (called elements) and one describing the "shape" of the data as a list of dimensions. The concept is illustrated by the sample code below:

Creation of a DataFrame containing vector data:

Python:
df2 = DataQuery.builder(spark).byVariables().system('CMW') \
    .startTime('2018-05-21 00:00:00.000').endTime('2018-05-21 00:05:00.000') \
    .variable('SPS.BCTDC.51895:TOTAL_INTENSITY') \
    .build()
Java:
Dataset<Row> df2 = DataQuery.builder(spark).byVariables().system("CMW")
    .startTime("2018-05-21 00:00:00.000").endTime("2018-05-21 00:05:00.000")
    .variable("SPS.BCTDC.51895:TOTAL_INTENSITY")
    .build();

It has the following schema:

Python:
df2.schema
Java:
df2.schema();
Expected output:
StructType(List(
       StructField(nxcals_value,StructType(List(
           StructField(elements,ArrayType(DoubleType,true),true),
           StructField(dimensions,ArrayType(IntegerType,true),true)
           )),true),
       StructField(nxcals_entity_id,LongType,true),StructField(nxcals_timestamp,LongType,true),
       StructField(nxcals_variable_name,StringType,true)
   ))

Selecting first 3 vectors (elements):

Python:
elements = df2.withColumn("nx_elements", col("nxcals_value.elements")) \
    .withColumn("nx_dimensions", col("nxcals_value.dimensions")) \
    .select("nx_elements")
elements.take(3)
Java:
Dataset<Row> elements = df2.withColumn("nx_elements", col("nxcals_value.elements"))
        .withColumn("nx_dimensions", col("nxcals_value.dimensions")).select("nx_elements");
elements.take(3);
Expected output:
[Row(nx_elements=[0.2579849, 0.28976566, 0.30659077, 0.29101196, 0.27730262, 0.27481002, 0.25362283, ... , 0.27543315]),
 Row(nx_elements=[0.22745048, 0.24552187, 0.24302925, 0.23118937, 0.2374209, 0.24552187, 0.22620416, ... , 0.24302925]),
 Row(nx_elements=[61.52985, 1040.4697, 1529.6321, 1572.7029, 1562.2429, 1557.9358, 1555.4746, 1554.244, ... , 2.461194])]

and their corresponding sizes (please note alternative notation for referencing field names):

Python:
dimensions = df2.withColumn("nx_elements", col("nxcals_value")["elements"]) \
    .withColumn("nx_dimensions", col("nxcals_value")["dimensions"]) \
    .select("nx_dimensions")
dimensions.take(3)
Java:
Dataset<Row> dimensions = df2.withColumn("nx_elements", col("nxcals_value").getField("elements"))
        .withColumn("nx_dimensions", col("nxcals_value").getField("dimensions")).select("nx_dimensions");
dimensions.take(3);
Expected output:
[Row(nx_dimensions=[1228]), Row(nx_dimensions=[1228]), Row(nx_dimensions=[1836])]
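
Since dimensions describes the shape of elements, a simple sanity check is to compare the number of elements with the product of the dimensions. The sketch below assumes that this relation holds for NXCALS data (the text above implies it but does not state it). The helper name shape_matches and the shortened sample record are hypothetical.

```python
from math import prod


def shape_matches(elements, dimensions):
    """True when the element count equals the product of the dimensions."""
    return len(elements) == prod(dimensions)


# Hypothetical, shortened record mimicking nxcals_value of a vector variable
nxcals_value = {'elements': [0.25, 0.28, 0.30, 0.29], 'dimensions': [4]}

print(shape_matches(nxcals_value['elements'], nxcals_value['dimensions']))  # True
```

The same check applies unchanged to matrix data, where dimensions holds two entries.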

Example of a matrix data type

Creation of a DataFrame containing matrix data:

Python:
df3 = DataQuery.builder(spark).byVariables().system('CMW') \
    .startTime('2018-08-15 00:00:00.000').endTime('2018-08-30 00:00:00.000') \
    .variable('HIE-BCAM-T2M03:RAWMEAS#NPIXELS') \
    .build()
Java:
Dataset<Row> df3 = DataQuery.builder(spark).byVariables().system("CMW")
    .startTime("2018-08-15 00:00:00.000").endTime("2018-08-30 00:00:00.000")
    .variable("HIE-BCAM-T2M03:RAWMEAS#NPIXELS")
    .build();

Retrieving matrix data present in the DataFrame:

Python:
matrices = df3.withColumn("matrix", col("nxcals_value.elements")) \
        .withColumn("dim1", col("nxcals_value.dimensions")[0]) \
        .withColumn("dim2", col("nxcals_value.dimensions")[1]) \
        .select("matrix", "dim1", "dim2")
matrices.take(2)
Java:
Dataset<Row> matrices = df3
        .withColumn("matrix", col("nxcals_value.elements"))
        .withColumn("dim1", col("nxcals_value.dimensions").getItem(0))
        .withColumn("dim2", col("nxcals_value.dimensions").getItem(1))
        .select("matrix", "dim1", "dim2");
matrices.take(2);
Expected output:
[Row(matrix=[14, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 10, 17, 15, 0, 0, ... , 0], dim1=100, dim2=10),
 Row(matrix=[14, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 10, 16, 13, 0, 0, ... , 0], dim1=100, dim2=10)]
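
The matrix column is a flat list, so the two dimensions are needed to recover rows and columns. The sketch below shows one way to do this in plain Python after the rows have been collected. It assumes row-major ordering (dim1 rows of dim2 values each), which the text above does not confirm. The helper name to_rows and the small sample matrix are hypothetical.

```python
def to_rows(elements, dim1, dim2):
    """Split a flat list into dim1 rows of dim2 values (row-major order assumed)."""
    if len(elements) != dim1 * dim2:
        raise ValueError("number of elements does not match dim1 * dim2")
    return [elements[r * dim2:(r + 1) * dim2] for r in range(dim1)]


# Hypothetical 2 x 3 matrix stored as a flat list
flat = [1, 2, 3, 4, 5, 6]
rows = to_rows(flat, 2, 3)
print(rows)  # [[1, 2, 3], [4, 5, 6]]
```

For a collected Row from the example above, the call would look like to_rows(row.matrix, row.dim1, row.dim2).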