Supporting Utilities
The NXCALS APIs include a collection of utility methods that assist users with specific tasks, streamlining common operations and providing additional support for various API interactions.
HdfsUtils – Utilities for HDFS Interactions
The HdfsUtils class provides a set of utility methods for interacting with the Hadoop Distributed File System (HDFS). It simplifies common operations such as listing files in a directory and copying files from HDFS to a local destination.
Key Features
- List Files in HDFS: Retrieve all files from a specified directory, excluding subdirectories.
- Copy Files to Local Storage: Seamlessly copy files from HDFS to a local destination.
The following Java example writes query results to Parquet on HDFS, lists the resulting files, and copies them to a local directory:

import cern.nxcals.api.extraction.data.builders.DataQuery;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.List;

import static cern.nxcals.api.utils.HdfsUtils.copyToLocal;
import static cern.nxcals.api.utils.HdfsUtils.listFiles;

SparkSession spark = SparkSession.builder().getOrCreate();

try {
    String startTime = "2024-11-04 00:00:00";
    String endTime = "2024-11-04 10:00:00";

    // Extract variable data for the given time window
    Dataset<Row> df = DataQuery.builder(spark).variables()
            .system("CMW").nameEq("SPS.ABWLM:NO_BUNCHES_CYCLE")
            .timeWindow(startTime, endTime).build();

    // Write the dataset to HDFS as Parquet
    String parquetPath = "/user/{username}/parquet";
    df.write().mode("overwrite").parquet(parquetPath);

    // List the files produced in the HDFS directory (subdirectories excluded)
    List<Path> files = listFiles(parquetPath);

    // Copy the files from HDFS to a local destination
    copyToLocal(parquetPath, "/tmp/parquet");
} catch (Exception e) {
    e.printStackTrace();
}
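The returned List<Path> can be inspected directly, for example to log the Parquet part files that were produced. A minimal sketch using the standard Hadoop Path API:

// Print the name of each file found in the HDFS directory;
// Path#getName() returns the final component of each path.
for (Path file : files) {
    System.out.println(file.getName());
}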
The equivalent Python example wraps the same functionality in the HdfsUtils helper class:

from nxcals.api.extraction.data.builders import DataQuery
from nxcals.api.common.utils.hdfs_utils import HdfsUtils

hdfs_utils = HdfsUtils(spark)

try:
    start_time = "2024-11-04 00:00:00"
    end_time = "2024-11-04 10:00:00"

    # Extract variable data for the given time window
    df = DataQuery.builder(spark).variables() \
        .system('CMW') \
        .nameEq('SPS.ABWLM:NO_BUNCHES_CYCLE') \
        .timeWindow(start_time, end_time) \
        .build()

    # Write the dataset to HDFS as Parquet
    parquet_path = "/tmp/{username}/parquet_output"
    df.write.mode("overwrite").parquet(parquet_path)

    # List the files produced in the HDFS directory (subdirectories excluded)
    files = hdfs_utils.list_files(parquet_path)

    # Copy the files from HDFS to a local destination
    hdfs_utils.copy_to_local(parquet_path, "/tmp/parquet_local")
except Exception as e:
    print("Error:", e)
Note
Please replace {username} with your username, and make sure the paths point to locations you can write to.
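To confirm that the copy succeeded, the local files can be read back with Spark. A hedged sketch: the file:// scheme forces Spark to read from the local filesystem rather than HDFS, and this assumes Spark runs on the same machine the files were copied to (e.g. local mode):

// Read the locally copied Parquet files back into a DataFrame;
// the file:// prefix points Spark at the local filesystem.
Dataset<Row> local = spark.read().parquet("file:///tmp/parquet");
local.show(5);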