
Supporting Utilities

The NXCALS APIs include a collection of utility methods designed to assist users with specific tasks. These utilities serve as a convenient add-on, streamlining common operations such as HDFS access and array handling, and providing additional support for various API interactions.

HdfsUtils – Utilities for HDFS Interactions

The HdfsUtils class provides a set of utility methods for interacting with the Hadoop Distributed File System (HDFS). It simplifies common operations such as listing files in a directory and copying files from HDFS to a local destination.

Key Features

  • List Files in HDFS: Retrieve all files from a specified directory, excluding subdirectories.
  • Copy Files to Local Storage: Seamlessly copy files from HDFS to a local destination.

Example

Java:

import cern.nxcals.api.extraction.data.builders.DataQuery;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.List;

import static cern.nxcals.api.utils.HdfsUtils.copyToLocal;
import static cern.nxcals.api.utils.HdfsUtils.listFiles;

SparkSession spark = SparkSession.builder().getOrCreate();

try {
    String startTime = "2024-11-04 00:00:00";
    String endTime = "2024-11-04 10:00:00";

    Dataset<Row> df = DataQuery.builder(spark).variables()
            .system("CMW").nameEq("SPS.ABWLM:NO_BUNCHES_CYCLE")
            .timeWindow(startTime, endTime).build();

    String parquetPath = "/user/{username}/parquet";
    df.write().mode("overwrite").parquet(parquetPath);

    // List the parquet part files that were just written and copy them to a local directory
    List<Path> files = listFiles(parquetPath);
    copyToLocal(parquetPath, "/tmp/parquet");
} catch (Exception e) {
    e.printStackTrace();
}

Python:

from nxcals.api.extraction.data.builders import DataQuery
from nxcals.api.common.utils.hdfs_utils import HdfsUtils

hdfs_utils = HdfsUtils(spark)
try:
    start_time = "2024-11-04 00:00:00"
    end_time = "2024-11-04 10:00:00"

    df = DataQuery.builder(spark).variables() \
        .system('CMW') \
        .nameEq('SPS.ABWLM:NO_BUNCHES_CYCLE') \
        .timeWindow(start_time, end_time) \
        .build()

    parquet_path = "/tmp/{username}/parquet_output"
    df.write.mode("overwrite").parquet(parquet_path)

    # List the parquet part files that were just written and copy them to a local directory
    files = hdfs_utils.list_files(parquet_path)
    hdfs_utils.copy_to_local(parquet_path, "/tmp/parquet_local")

except Exception as e:
    print("Error:", e)

Note

Please replace {username} in the paths above with your username, or point the paths to any location you have write access to.

ArrayUtils – Utilities for handling arrays (vectors and matrices)

The ArrayUtils class provides a utility method for reshaping structured array fields in Spark DataFrames. It is commonly used with NXCALS data formats, where arrays are stored as structs with elements and dimensions:

{
  "elements": [...],
  "dimensions": [...]
}

ArrayUtils allows for automatic detection and reshaping of such fields into native Python lists or nested arrays using NumPy.
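
Conceptually, reshaping such a record means turning the flat elements list into an n-dimensional array with dimensions as the target shape. Below is a minimal NumPy sketch of that idea (illustrative only, not the ArrayUtils implementation; the record dictionary is a made-up stand-in for the struct shown above):

import numpy as np

# A stand-in for an NXCALS array struct: a flat element list plus the target dimensions.
record = {"elements": list(range(12)), "dimensions": [3, 4]}

# Reshape the flat element list into a 3 x 4 array.
matrix = np.array(record["elements"]).reshape(record["dimensions"])
print(matrix.shape)  # (3, 4)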

API Reference

ArrayUtils.reshape(df: DataFrame, array_columns: Optional[List[str]] = None) -> DataFrame

Reshapes structured array columns in a DataFrame into proper multi-dimensional arrays.

Parameters:

  • df — The input DataFrame.
  • array_columns — List of column names to reshape. If None, all fields matching the array pattern will be auto-detected.

Returns:

  • pyspark.sql.DataFrame — A new DataFrame with reshaped array columns.

Example

Data before being reshaped. Struct fields such as elements=[...] and dimensions=[100, 10] are present in the record:

Expected application output:
Row(nxcals_value=Row(elements=[54, 0, 0, 0, 0, 0, 0, 0, 0, 0, 53, 0, 0, 0, 0, 0, 0, 0, 0, 0, 47, 0, 0, 0, 0, 0, 0, 0, 0, 0, 47, 0, 0, 0, 0, 0, 0, 0, 0, 0, ... , 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], dimensions=[100, 10]), nxcals_entity_id=57161, nxcals_timestamp=1535565694050000000, nxcals_variable_name='HIE-BCAM-PDL:RAWMEAS#NPIXELS')

Conversion of all detected array columns in the DataFrame:

from nxcals.api.extraction.data.builders import DataQuery
from nxcals.api.common.utils.array_utils import ArrayUtils

start_time = "2018-08-29 00:00:00.000"
end_time = "2018-08-30 00:01:01.000"

df = DataQuery.builder(spark).variables() \
    .system('CMW') \
    .nameEq('HIE-BCAM-PDL:RAWMEAS#NPIXELS') \
    .timeWindow(start_time, end_time) \
    .build()

ArrayUtils.reshape(df).head()

Conversion of user-specified array columns (in this particular example the only one present, which is nxcals_value):

ArrayUtils.reshape(df, ['nxcals_value']).head()

Expected application output:
Row(nxcals_value=[[54, 0, 0, 0, 0, 0, 0, 0, 0, 0], [53, 0, 0, 0, 0, 0, 0, 0, 0, 0], [47, 0, 0, 0, 0, 0, 0, 0, 0, 0], [47, 0, 0, 0, 0, 0, 0, 0, 0, 0], ... , [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]], nxcals_entity_id=57161, nxcals_timestamp=1535565694050000000, nxcals_variable_name='HIE-BCAM-PDL:RAWMEAS#NPIXELS')
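
Once reshaped, the values in the column are plain nested lists, so they can be passed straight to NumPy for further processing. A short sketch, assuming the df from the example above (the numpy import is only needed for this snippet, not by ArrayUtils):

import numpy as np

# Take the first reshaped record and turn its value into a 2-D array.
row = ArrayUtils.reshape(df, ['nxcals_value']).head()
matrix = np.array(row.nxcals_value)

print(matrix.shape)  # (100, 10) for this variable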