Extracting field types
Based on the information originating from DataFrame, field data type can be determined in different ways. Selected DataFrames will be used to illustrate data type for:
Complete list of Spark data types is available on Spark Apache documentation pages
Example of a scalar data type
Required libraries for the sample code:
from nxcals.api.extraction.data.builders import DataQuery
from pyspark.sql.functions import col
import cern.nxcals.api.extraction.data.builders.DataQuery;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructField;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.spark.sql.functions.col;
Creation of DataFrame based on entity having scalar fields:
df1 = DataQuery.builder(spark).byEntities().system('CMW') \
.startTime('2018-04-29 00:00:00.000').endTime('2018-04-30 00:00:00.000') \
.entity().keyValues({'device': 'LHC.LUMISERVER', 'property': 'CrossingAngleIP1'}) \
.build()
Map<String, Object> keyValues = new HashMap<String, Object>();
keyValues.put("device", "LHC.LUMISERVER");
keyValues.put("property", "CrossingAngleIP1");
Dataset<Row> df1 = DataQuery.builder(spark).byEntities().system("CMW")
.startTime("2018-04-29 00:00:00.000").endTime("2018-04-30 00:00:00.000")
.entity().keyValues(keyValues)
.build();
In order to visualise schema (including DataFrame column types) the following method can be used:
df1.printSchema() # prints schema in the tree format
df1.printSchema(); // prints schema in the tree format
Click to see/hide expected application output...
root
|-- DeltaCrossingAngle: double (nullable = true)
|-- Moving: long (nullable = true)
|-- __record_timestamp__: long (nullable = true)
|-- __record_version__: long (nullable = true)
|-- acqStamp: long (nullable = true)
|-- class: string (nullable = true)
|-- cyclestamp: long (nullable = true)
|-- device: string (nullable = true)
|-- property: string (nullable = true)
|-- selector: string (nullable = true)
|-- nxcals_entity_id: long (nullable = true)
In order to retrieve that information in a form of array with all column names and their data types a dtype method can be used:
df1.dtypes # DataFrame property returning names and data types of all the columns
df1.dtypes(); // DataFrame property returning names and data types of all the columns
Click to see/hide expected application output...
[('DeltaCrossingAngle', 'double'),
('Moving', 'bigint'),
('__record_timestamp__', 'bigint'),
('__record_version__', 'bigint'),
('acqStamp', 'bigint'),
('class', 'string'),
('cyclestamp', 'bigint'),
('device', 'string'),
('property', 'string'),
('selector', 'string'),
('nxcals_entity_id', 'bigint')]
To obtain Spark data type we can refer to the DataFrame schema:
df1.schema # DataFrame property returning its schema as StructType(List(StructField(name, Spark dataType, nullable), ...))
df1.schema(); // DataFrame property returning its schema as StructType(List(StructField(name, Spark dataType, nullable), ...))
Click to see/hide expected application output...
StructType(List(
StructField(DeltaCrossingAngle,DoubleType,true),
StructField(Moving,LongType,true),StructField(__record_timestamp__,LongType,true),
StructField(__record_version__,LongType,true),StructField(acqStamp,LongType,true),
StructField(class,StringType,true),StructField(cyclestamp,LongType,true),
StructField(device,StringType,true),StructField(property,StringType,true),
StructField(selector,StringType,true),StructField(nxcals_entity_id,LongType,true)
))
or directly to the DataFrame schema fields:
df1.schema.fields # Schema property returning List of StructField(name, Spark dataType, nullable)
df1.schema().fields(); // Schema property returning List of StructField(name, Spark dataType, nullable)
Click to see/hide expected application output...
[StructField(DeltaCrossingAngle,DoubleType,true),
StructField(Moving,LongType,true),
StructField(__record_timestamp__,LongType,true),
StructField(__record_version__,LongType,true),
StructField(acqStamp,LongType,true),
StructField(class,StringType,true),
StructField(cyclestamp,LongType,true),
StructField(device,StringType,true),
StructField(property,StringType,true),
StructField(selector,StringType,true),
StructField(nxcals_entity_id,LongType,true)]
For convenience we can create the field name <-> Spark data type mapping:
# Getting data types from schema in Spark as a dictionary
d = dict([f.name, f.dataType] for f in df1.schema.fields)
d['Moving']
// Getting data types from schema in Spark as a dictionary
Map<String, DataType> d = Arrays.stream(df1.schema().fields())
.collect(Collectors.toMap(StructField::name, StructField::dataType));
d.get("Moving");
LongType
Example of a vector data type
Vector and matrix data are expressed in NXCALS as a complex type using two ArrayType types for holding of vectro/matrix data (called elements) and for describing "shape" of the data through the list of dimensions. The concept is ilustrated through the sample code below:
Creation of DataFrame containing vector data:
df2 = DataQuery.builder(spark).byVariables().system('CMW') \
.startTime('2018-05-21 00:00:00.000').endTime('2018-05-21 00:05:00.000') \
.variable('SPS.BCTDC.51895:TOTAL_INTENSITY') \
.build()
Dataset<Row> df2 = DataQuery.builder(spark).byVariables().system("CMW")
.startTime("2018-05-21 00:00:00.000").endTime("2018-05-21 00:05:00.000")
.variable("SPS.BCTDC.51895:TOTAL_INTENSITY")
.build();
having following schema:
df2.schema.fields
df2.schema().fields();
Click to see/hide expected application output...
StructType(List(
StructField(nxcals_value,StructType(List(
StructField(elements,ArrayType(DoubleType,true),true),
StructField(dimensions,ArrayType(IntegerType,true),true)
)),true),
StructField(nxcals_entity_id,LongType,true),StructField(nxcals_timestamp,LongType,true),
StructField(nxcals_variable_name,StringType,true)
))
Selecting first 3 vectors (elements):
elements = df2.withColumn("nx_elements", col("nxcals_value.elements")).withColumn("nx_dimensions", col("nxcals_value.dimensions")).select("nx_elements")
elements.take(3)
Dataset<Row> elements = df2.withColumn("nx_elements", col("nxcals_value.elements"))
.withColumn("nx_dimensions", col("nxcals_value.dimensions")).select("nx_elements");
elements.take(3);
Click to see/hide expected application output...
[Row(nx_elements=[0.2579849, 0.28976566, 0.30659077, 0.29101196, 0.27730262, 0.27481002, 0.25362283, ... , 0.27543315]),
Row(nx_elements=[0.22745048, 0.24552187, 0.24302925, 0.23118937, 0.2374209, 0.24552187, 0.22620416, ... , 0.24302925]),
Row(nx_elements=[61.52985, 1040.4697, 1529.6321, 1572.7029, 1562.2429, 1557.9358, 1555.4746, 1554.244, ... , 2.461194])
]
and their corresponding sizes (please note alternative notation for referencing field names):
dimensions = df2.withColumn("nx_elements", col("nxcals_value")["elements"]).withColumn("nx_dimensions", col("nxcals_value")["dimensions"]).select("nx_dimensions")
dimensions.take(3)
Dataset<Row> dimensions = df2.withColumn("nx_elements", col("nxcals_value").getField("elements"))
.withColumn("nx_dimensions", col("nxcals_value").getField("dimensions")).select("nx_dimensions");
dimensions.take(3);
Click to see/hide expected application output...
[Row(nx_dimensions=[1228]), Row(nx_dimensions=[1228]), Row(nx_dimensions=[1836])]
Example of a matrix data type
Creation of DataFrame containing Matrix data:
df3 = DataQuery.builder(spark).byVariables().system('CMW') \
.startTime('2018-08-15 00:00:00.000').endTime('2018-08-30 00:00:00.000') \
.variable('HIE-BCAM-T2M03:RAWMEAS#NPIXELS') \
.build()
Dataset<Row> df3 = DataQuery.builder(spark).byVariables().system("CMW")
.startTime("2018-08-15 00:00:00.000").endTime("2018-08-30 00:00:00.000")
.variable("HIE-BCAM-T2M03:RAWMEAS#NPIXELS")
.build();
Retrieving matrix data present in the DataFrame:
matrices = df3.withColumn("matrix", col("nxcals_value.elements")) \
.withColumn("dim1", col("nxcals_value.dimensions")[0]) \
.withColumn("dim2", col("nxcals_value.dimensions")[1]) \
.select("matrix", "dim1", "dim2")
matrices.take(2)
Dataset<Row> matrices = (Dataset<Row>) df3
.withColumn("matrix", col("nxcals_value.elements"))
.withColumn("dim1", col("nxcals_value.dimensions").getItem(0))
.withColumn("dim2", col("nxcals_value.dimensions").getItem(1))
.select("matrix", "dim1", "dim2");
matrices.take(2);
Click to see/hide expected application output...
[Row(matrix=[14, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 10, 17, 15, 0, 0, ... , 0], dim1=100, dim2=10),
Row(matrix=[14, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 10, 16, 13, 0, 0, ... , 0], dim1=100, dim2=10)]