There are several ways to interact with Spark SQL, including SQL and the DataFrame API. Since the same execution engine is used regardless of which API is chosen, identical results are produced. In practice this means that a developer is free to switch back and forth between the different APIs, depending on the use case and convenience.
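As a minimal, self-contained illustration of this equivalence (a sketch assuming only an active SparkSession named spark; the demo data and names are purely illustrative), the same aggregation can be expressed through SQL and through the DataFrame API and returns the same rows:

demo = spark.createDataFrame(
    [('CPS.USER.TOF', 298.55), ('CPS.USER.TOF', 298.60), ('CPS.USER.MD1', 298.58)],
    ['selector', 'current'])
demo.createOrReplaceTempView('demo_table')

# the same aggregation, once via SQL and once via the DataFrame API
sql_result = spark.sql("SELECT selector, avg(current) avg_curr FROM demo_table GROUP BY selector")
api_result = demo.groupBy('selector').avg('current').withColumnRenamed('avg(current)', 'avg_curr')

# both show identical rows; calling .explain() on either frame reveals comparable physical plans
sql_result.show()
api_result.show()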

This document presents two ways of working with tabular data retrieved by Spark from the NXCALS data query builders. The first method uses the Spark SQL module, which allows pure SQL queries to be used; the second one operates directly on data frames using "SQL-like" methods.

Below is an initial dataset retrieved using the standard NXCALS Data Extraction API. It will be used in the examples given in this document.

# source the nxcals query builders
from nxcals.api.extraction.data.builders import *

df = DevicePropertyDataQuery.builder(spark).system('CMW') \
    .startTime('2018-05-21 00:00:00.000').endTime('2018-05-23 13:30:00.000') \
    .entity().device('ZT10.QFO03').property('Acquisition').build()

df.select('acqStamp', 'current').where("selector = 'CPS.USER.TOF'").show(10)
// source the nxcals query builders
import cern.nxcals.api.extraction.data.builders.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> df = DevicePropertyDataQuery.builder(spark).system("CMW")
        .startTime("2018-05-21 00:00:00.000").endTime("2018-05-23 13:30:00.000")
        .entity().device("ZT10.QFO03").property("Acquisition").build();

df.select("acqStamp", "current").where("selector = 'CPS.USER.TOF'").show(10);
// source the nxcals query builders
import cern.nxcals.api.extraction.data.builders._

val df = DevicePropertyDataQuery.builder(spark).system("CMW").
    startTime("2018-05-21 00:00:00.000").endTime("2018-05-23 13:30:00.000").
    entity().device("ZT10.QFO03").property("Acquisition").build()

df.select("acqStamp", "current").where("selector = 'CPS.USER.TOF'").show(10)
+-------------------+---------+                                                 
|           acqStamp|  current|
+-------------------+---------+
|1527033954500238525|298.55392|
|1527034184900238525|298.59647|
|1527034626500238525|298.55054|
|1527034774100238525|      0.0|
|1527035666900238525|298.54538|
|1527038154500238525|298.57965|
|1527039245300238525|298.57782|
|1527042204500238525| 298.5944|
|1527042787700238525|298.55682|
|1527043500500238525|298.55563|
+-------------------+---------+
only showing top 10 rows

Spark SQL module example

The Spark SQL module allows SQL queries to be mixed seamlessly with Spark programs. It provides a native SQL parser that supports ANSI SQL.

Below is an SQL statement that selects the average, minimum and maximum current grouped by selector.

df.createOrReplaceTempView('temp_table')
df2 = spark.sql("SELECT selector, avg(t.current) avg_curr, min(t.current) min_curr, max(t.current) max_curr\
                 FROM temp_table t\
                 GROUP BY selector")
df2.show()
df.createOrReplaceTempView("temp_table");
Dataset<Row> df2 = spark.sql("SELECT selector, avg(t.current) avg_curr, min(t.current) min_curr, max(t.current) max_curr "
        + "FROM temp_table t "
        + "GROUP BY selector");
df2.show();
df.createOrReplaceTempView("temp_table")
val df2 = spark.sql("""SELECT selector, avg(t.current) avg_curr, min(t.current) min_curr, max(t.current) max_curr
                     | FROM temp_table t
                     | GROUP BY selector""".stripMargin)
df2.show()
+-----------------+------------------+--------------+---------+                 
|         selector|          avg_curr|      min_curr| max_curr|
+-----------------+------------------+--------------+---------+
|     CPS.USER.MD3| 297.0989710070555|           0.0| 298.6248|
|     CPS.USER.MD5|  296.960583224237|           0.0| 298.6245|
|CPS.USER.LHCPROBE|293.64540793678975|           0.0|298.61832|
|    CPS.USER.LHC4| 297.2184859233085|           0.0|298.62473|
|     CPS.USER.MD8| 297.1799289967247|           0.0|298.62982|
|    CPS.USER.ZERO|297.04639872378556| -1.2241147E-4|302.42722|
|     CPS.USER.MD4|297.47680294579806|-3.8781664E-25|298.61862|
| CPS.USER.SFTPRO1| 297.2300317265046| -7.446488E-12|302.40848|
|     CPS.USER.MD2|298.59462280273436|      298.5527| 298.6154|
|    CPS.USER.LHC2|298.57755334036693|     298.54474| 298.6235|
|     CPS.USER.MD1| 297.7527660727501|           0.0|298.62097|
|     CPS.USER.TOF|297.09244274083983| -1.2207247E-4| 302.4574|
|      CPS.USER.AD|297.80882665353226|      3.63E-43| 298.6256|
|    CPS.USER.ION2|298.02999528190344|           0.0| 298.6191|
|     CPS.USER.MD7| 298.5799343532986|     298.54492| 298.6182|
|CPS.USER.LHCINDIV| 298.5811757054822|     298.55173|298.62177|
| CPS.USER.SFTPRO2|295.48305652912643|  1.377532E-39|298.63828|
|   CPS.USER.EAST1| 297.3271756939288|-2.7881993E-15|298.62817|
|   CPS.USER.EAST2|  297.151471722651|   -0.12526703|298.62723|
|     CPS.USER.MD6|297.76037549514666|           0.0| 298.6259|
+-----------------+------------------+--------------+---------+

JOIN example

"Table aliases" can be created for a number of dataframes. In this case we refer to the BPM_LHC_TEST variable in order to create a join between the two "tables":

df_bpm = DataQuery.builder(spark).byVariables().system('CMW') \
    .startTime('2017-05-22 00:00:00.000').endTime('2017-05-23 13:30:00.000') \
    .variable('BPM_LHC_TEST') \
    .build()

df_bpm.createOrReplaceTempView('bpm_temp_table')
df2 = spark.sql( "SELECT t1.boolField result, count(t1.boolField) cnt\
                  FROM temp_table t1\
                  JOIN bpm_temp_table t2 ON t2.cycleStamp = t1.cycleStamp\
                  WHERE t2.floatField < 0.1\
                  GROUP BY t1.boolField")
df2.show()
Dataset<Row> df_bpm = DataQuery.builder(spark).byVariables().system("CMW")
        .startTime("2017-05-22 00:00:00.000").endTime("2017-05-23 13:30:00.000")
        .variable("BPM_LHC_TEST")
        .build();

df_bpm.createOrReplaceTempView("bpm_temp_table");
df2 = spark.sql( "SELECT t1.boolField result, count(t1.boolField) cnt "
        + "FROM temp_table t1 "
        + "JOIN bpm_temp_table t2 ON t2.cycleStamp = t1.cycleStamp "
        + "WHERE t2.floatField < 0.1 "
        + "GROUP BY t1.boolField");
df2.show();
val df_bpm = DataQuery.builder(spark).byVariables().system("CMW").
    startTime("2017-05-22 00:00:00.000").endTime("2017-05-23 13:30:00.000").
    variable("BPM_LHC_TEST").
    build()

df_bpm.createOrReplaceTempView("bpm_temp_table")
val df2 = spark.sql("""SELECT t1.boolField result, count(t1.boolField) cnt
                      | FROM temp_table t1
                      | JOIN bpm_temp_table t2 ON t2.cycleStamp = t1.cycleStamp
                      | WHERE t2.floatField < 0.1
                      | GROUP BY t1.boolField""".stripMargin)
df2.show()
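For comparison, the same join can also be expressed directly on the data frames, without registering temporary views. Below is a sketch in Python, assuming the df and df_bpm frames created above:

import pyspark.sql.functions as func

df2 = df.alias('t1') \
    .join(df_bpm.alias('t2'), func.col('t1.cycleStamp') == func.col('t2.cycleStamp')) \
    .where(func.col('t2.floatField') < 0.1) \
    .groupBy('t1.boolField') \
    .agg(func.count('t1.boolField').alias('cnt')) \
    .withColumnRenamed('boolField', 'result')
df2.show()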

Spark DataFrame API

A DataFrame is a dataset organized into named columns. Conceptually, it is equivalent to a table in a relational database. The DataFrame API provides easier access to data since a DataFrame conceptually resembles a table, and its multitude of methods allows constructs similar to SQL statements to be built.

Note

In Python, with the help of the NumPy library, the preparation of timestamps is simplified.

from datetime import datetime
from numpy import datetime64

start = datetime64('2018-05-23T00:05:53.500000000').astype(datetime)
end = datetime64('2018-05-23T01:37:39.100000000').astype(datetime)

df2 = df.filter(df.acqStamp.between(start, end)).groupBy('selector').avg('current')
df2.show()
private static long getNanosFromInstant(Instant instantTime) {
    return TimeUnit.SECONDS.toNanos(instantTime.getEpochSecond()) + instantTime.getNano();
}
...

DateTimeFormatter FORMATTER =  DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.n");

Instant startTime = LocalDateTime.parse("2018-05-23 00:05:53.500000000", FORMATTER).toInstant(ZoneOffset.UTC);
Instant endTime = LocalDateTime.parse("2018-05-23 01:37:39.100000000", FORMATTER).toInstant(ZoneOffset.UTC);

df2 = df.filter(df.col("acqStamp")
        .between(getNanosFromInstant(startTime), getNanosFromInstant(endTime)))
        .groupBy("selector").avg("current");
df2.show();
import java.time.Instant
import java.time.format.DateTimeFormatter;
import java.time.ZoneOffset;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;

def getNanosFromInstant(instantTime: Instant): Long = {
    return TimeUnit.SECONDS.toNanos(instantTime.getEpochSecond()) + instantTime.getNano()
}

val FORMATTER =  DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.n")

val start = LocalDateTime.parse("2018-05-23 00:05:53.500000000", FORMATTER).toInstant(ZoneOffset.UTC)
val end = LocalDateTime.parse("2018-05-23 01:37:39.100000000", FORMATTER).toInstant(ZoneOffset.UTC)

val df2 = df.filter(df.col("acqStamp").between(getNanosFromInstant(start), getNanosFromInstant(end))).
    groupBy("selector").avg("current")

df2.show()

An equivalent statement based directly on timestamps (expressed in nanoseconds) follows:

import pyspark.sql.functions as func

df2 = df.filter(df.acqStamp.between(1527033953500000000, 1527039459100000000)) \
    .groupBy('selector').avg('current').select('selector', func.col('avg(current)').alias('avg_current'))
df2.show()
import org.apache.spark.sql.functions;

df2 = df.filter(df.col("acqStamp").between(1527033953500000000L, 1527039459100000000L))
        .groupBy("selector").avg("current").select(functions.col("selector"), functions.col("avg(current)").alias("avg_current"));
df2.show();
import org.apache.spark.sql.functions._

val df2 = df.filter(df.col("acqStamp").between(1527033953500000000L, 1527039459100000000L)).
    groupBy("selector").avg("current").select(col("selector"), col("avg(current)").alias("avg_current"))
df2.show()
+----------------+------------------+
|        selector|       avg_current|
+----------------+------------------+
|    CPS.USER.MD5|298.58718856092526|
|   CPS.USER.ZERO| 297.2412097626962|
|CPS.USER.SFTPRO1|297.79839171566476|
|    CPS.USER.TOF|296.75962792448775|
|     CPS.USER.AD|298.58368326822915|
|  CPS.USER.EAST2|295.75220027114364|
+----------------+------------------+
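The nanosecond values used above correspond to the wall-clock instants of the previous example. A minimal Python sketch of the conversion (assuming NumPy, as in the earlier note, with the strings interpreted as UTC; the start_ns and end_ns names are illustrative):

from numpy import datetime64

# nanoseconds since the epoch as plain integers, directly usable in between()
start_ns = int(datetime64('2018-05-23T00:05:53.500000000').astype('int64'))
end_ns = int(datetime64('2018-05-23T01:37:39.100000000').astype('int64'))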

"Where" statements with "like" (an alternative to filter):

df2.where("selector like 'CPS.USER.S%'").show()
df2.where("selector like 'CPS.USER.S%'").show();
df2.where("selector like 'CPS.USER.S%'").show()
+----------------+------------------+
|        selector|       avg_current|
+----------------+------------------+
|CPS.USER.SFTPRO1|297.79839171566476|
+----------------+------------------+

Ordering:

df2.orderBy(df2.selector.desc()).show()
df2.orderBy(functions.col("selector").desc()).show();
df2.orderBy(df2("selector").desc).show()

Selecting distinct (alternative to groupBy):

df.select('selector').distinct().show()
df.select("selector").distinct().show();
df.select("selector").distinct.show()

Other useful operations

There is a multitude of actions that can be performed on a dataframe which have no direct counterpart among typical SQL operations. Some of them are listed below; for the complete list refer to the Spark DataFrame API documentation.

Using crosstab (for demonstration purposes only):

df.limit(50).crosstab('current','selector').show()
df.limit(50).stat().crosstab("current","selector").show();
df.limit(50).stat.crosstab("current","selector").sort("current_selector").show()
+----------------+-----------+--------------+-------------+------------+----------------+------------+-------------+
|current_selector|CPS.USER.AD|CPS.USER.EAST2|CPS.USER.LHC4|CPS.USER.MD5|CPS.USER.SFTPRO1|CPS.USER.TOF|CPS.USER.ZERO|
+----------------+-----------+--------------+-------------+------------+----------------+------------+-------------+
|             0.0|          0|             0|            0|           0|               0|           1|            0|
|       298.54538|          0|             0|            0|           0|               0|           1|            0|
|       298.54794|          0|             0|            0|           0|               1|           1|            0|
|       298.55026|          0|             1|            0|           0|               0|           0|            0|
|       298.55054|          0|             0|            0|           0|               0|           1|            0|
|       298.55188|          0|             0|            0|           0|               1|           0|            0|
|       298.55392|          0|             0|            0|           0|               0|           1|            0|
|       298.55466|          0|             0|            0|           0|               1|           0|            0|
|        298.5551|          0|             0|            0|           0|               0|           0|            1|
|       298.55563|          0|             0|            0|           0|               0|           1|            0|
|       298.55588|          0|             0|            0|           0|               0|           0|            1|
|       298.55594|          0|             0|            0|           0|               0|           0|            1|
|       298.55646|          0|             0|            0|           0|               1|           0|            0|
|       298.55682|          0|             1|            0|           0|               0|           1|            0|
|       298.55743|          0|             0|            0|           0|               0|           0|            1|
|        298.5598|          0|             0|            0|           0|               0|           0|            1|
|       298.56097|          0|             0|            1|           0|               0|           0|            0|
|       298.56552|          0|             0|            0|           0|               1|           0|            0|
|       298.56604|          0|             0|            0|           0|               0|           0|            1|
|       298.57184|          0|             0|            0|           0|               0|           0|            1|
+----------------+-----------+--------------+-------------+------------+----------------+------------+-------------+
only showing top 20 rows

Dropping duplicates (can be seen as yet another alternative to groupBy):

df2.select('selector').dropDuplicates().show()
df2.select("selector").dropDuplicates().show();
df2.select("selector").dropDuplicates().show()

Dropping rows with null values (in any of the columns):

df.select('selector','current_max').dropna().count()
df.select("selector","current_max").na().drop().count();
df.select("selector","current_max").dropna().count()
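Related variants of the same operation are available; a short sketch (the column subset and fill value below are purely illustrative):

# drop rows that contain a null only in the listed columns
df.select('selector', 'current').dropna(subset=['current']).count()

# or replace nulls with a default value instead of dropping the rows
df.select('selector', 'current').fillna({'current': 0.0}).count()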

Adding columns:

df2.select('selector','avg_current').withColumn('avg_exp',func.exp('avg_current')).show()
df2.select("selector","avg_current").withColumn("avg_exp", functions.exp("avg_current")).show();
df2.select("selector","avg_current").withColumn("avg_exp",exp("avg_current")).show()
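withColumn is also convenient for deriving a human-readable time column from the nanosecond acqStamp used throughout this document. A short Python sketch (the acq_time column name is illustrative):

import pyspark.sql.functions as func

# scale the nanosecond stamp down to seconds and cast it to a Spark timestamp
df.withColumn('acq_time', (func.col('acqStamp') / 1e9).cast('timestamp')) \
    .select('acqStamp', 'acq_time', 'current').show(5)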

Dropping columns:

df3 = df2.drop('selector')
Dataset<Row> df3 = df2.drop("selector");
val df3 = df2.drop("selector")