Supporting Utilities

The NXCALS APIs include a collection of utility methods designed to assist users with specific tasks. These utilities act as a convenient add-on to the core APIs, streamlining common operations such as file handling.

HdfsUtils – Utilities for HDFS Interactions

The HdfsUtils class provides a set of utility methods for interacting with the Hadoop Distributed File System (HDFS). It simplifies common operations such as listing files in a directory and copying files from HDFS to a local destination.

Key Features

  • List Files in HDFS: Retrieve all files from a specified directory, excluding subdirectories.
  • Copy Files to Local Storage: Copy files from HDFS to a local destination.
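
Both operations are demonstrated below. The following Java example queries NXCALS for variable data, writes the result to HDFS as Parquet, lists the produced files, and copies them to a local directory: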
import cern.nxcals.api.extraction.data.builders.DataQuery;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.List;

import static cern.nxcals.api.utils.HdfsUtils.copyToLocal;
import static cern.nxcals.api.utils.HdfsUtils.listFiles;

SparkSession spark = SparkSession.builder().getOrCreate();

try {
    String startTime = "2024-11-04 00:00:00";
    String endTime = "2024-11-04 10:00:00";

    // Query NXCALS for the variable data within the given time window
    Dataset<Row> df = DataQuery.builder(spark).variables()
            .system("CMW").nameEq("SPS.ABWLM:NO_BUNCHES_CYCLE")
            .timeWindow(startTime, endTime).build();

    // Write the dataset to HDFS in Parquet format
    String parquetPath = "/user/{username}/parquet";
    df.write().mode("overwrite").parquet(parquetPath);

    // List the Parquet files (subdirectories are excluded) and copy them locally
    List<Path> files = listFiles(parquetPath);
    copyToLocal(parquetPath, "/tmp/parquet");
} catch (Exception e) {
    e.printStackTrace();
}
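
The same workflow in Python uses the HdfsUtils wrapper class:
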
from nxcals.api.extraction.data.builders import DataQuery
from nxcals.api.common.utils.hdfs_utils import HdfsUtils

# 'spark' is an existing SparkSession (e.g. the one provided by an NXCALS session)
hdfs_utils = HdfsUtils(spark)

try:
    start_time = "2024-11-04 00:00:00"
    end_time = "2024-11-04 10:00:00"

    # Query NXCALS for the variable data within the given time window
    df = DataQuery.builder(spark).variables() \
        .system('CMW') \
        .nameEq('SPS.ABWLM:NO_BUNCHES_CYCLE') \
        .timeWindow(start_time, end_time) \
        .build()

    # Write the dataset to HDFS in Parquet format
    parquet_path = "/tmp/{username}/parquet_output"
    df.write.mode("overwrite").parquet(parquet_path)

    # List the Parquet files and copy them to local storage
    files = hdfs_utils.list_files(parquet_path)
    hdfs_utils.copy_to_local(parquet_path, "/tmp/parquet_local")

except Exception as e:
    print("Error:", e)

Note

Please replace {username} with your username, so that the output paths point to locations you have write access to.