Working with timestamps

In NXCALS, timestamps are expressed in nanoseconds and stored internally as longs. When extracting data, some additional steps are needed to display them in a human-readable format.
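As a rough illustration of what such a conversion involves, here is a minimal sketch using only the Python standard library. It assumes the long counts nanoseconds since the Unix epoch in UTC, which is consistent with the sample values shown later on this page. The helper name format_ns is hypothetical and not part of NXCALS.

```python
from datetime import datetime, timezone

def format_ns(ns: int) -> str:
    """Format a nanosecond epoch value (assumed UTC) as a readable string."""
    # Split into whole seconds and the nanosecond remainder, because
    # datetime itself only has microsecond resolution.
    seconds, nanos = divmod(ns, 1_000_000_000)
    base = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return f"{base:%Y-%m-%d %H:%M:%S}.{nanos:09d}"

print(format_ns(1525307581763000000))  # prints 2018-05-03 00:33:01.763000000
```

Doing this value by value is fine for a handful of timestamps, but it does not scale to a whole Spark column.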

With the introduction of Pandas UDFs for PySpark, the task has become straightforward, since they make it possible to define low-overhead, high-performance UDFs:

  1. Use the pandas_udf decorator to declare a Pandas UDF
  2. Write the UDF body for the timestamp conversion (using the pandas to_datetime function)
  3. Select from the Spark dataframe using the newly created function
  4. Optionally, convert the result to a Pandas dataframe (for convenience)

import pandas as pd
from nxcals.api.extraction.data.builders import DataQuery
from pyspark.sql.functions import pandas_udf, PandasUDFType



# Define a UDF that converts a timestamp stored as a long into a human-readable form
@pandas_udf('timestamp', PandasUDFType.SCALAR)
def to_stamp(stamps):
    return pd.to_datetime(stamps, unit='ns')

# Get some sample data
df = DataQuery.builder(spark).byEntities().system('CMW') \
    .startTime('2018-05-03 00:00:00.000').endTime('2018-05-03 01:00:00.000') \
    .entity().keyValue('device', 'LHC.LUMISERVER').keyValue('property', 'CrossingAngleIP1') \
    .build()

# Add a formatted timestamp column and output the result
df.withColumn("stamp", to_stamp(df.acqStamp)).select('stamp', 'acqStamp').toPandas()

Result:

                      stamp            acqStamp
0   2018-05-03 00:33:01.763 1525307581763000000
1   2018-05-03 00:33:01.761 1525307581761000000
2   2018-05-03 00:05:01.545 1525305901545000000
3   2018-05-03 00:04:46.820 1525305886820000000
4   2018-05-03 00:32:27.560 1525307547560000000
5   2018-05-03 00:04:35.350 1525305875350000000
...

Note

The to_stamp function takes a pandas.Series named stamps and returns the result of to_datetime(stamps, unit='ns') as a pandas.Series. Because to_datetime is vectorized over a pandas.Series, the Pandas UDF is much faster than a standard row-at-a-time UDF.
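The difference between the two styles can be sketched with pandas alone, without a Spark session. The example below reuses two acqStamp values from the result above; the row-wise variant only approximates what a standard UDF does and is not how Spark invokes it.

```python
import pandas as pd

# Sample acqStamp values (nanoseconds since the epoch) taken from the result above
stamps = pd.Series([1525307581763000000, 1525305901545000000], name="acqStamp")

# Vectorized: one call converts the whole Series, as in the body of to_stamp
vectorized = pd.to_datetime(stamps, unit="ns")

# Row-at-a-time: one Python-level call per value, similar in spirit to a standard UDF
row_wise = stamps.apply(lambda ns: pd.Timestamp(int(ns), unit="ns"))

# Both approaches yield the same timestamps; only the cost per value differs
print(vectorized.tolist())
print((vectorized == row_wise).all())
```

On a Series of a few values the difference is negligible; the vectorized form should pay off as the number of rows per batch grows.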