This step-by-step guide walks you through creating an application that demonstrates the use of the NXCALS ingestion API.
What you will build
An application that generates 10 messages and stores them in the NXCALS Testbed environment in an asynchronous manner.
Create application
Create the main class of the application:
public class IngestionExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(IngestionExample.class);
    private static final String SYSTEM_NAME = "MOCK-SYSTEM";
    private static final long MAX_NUMBER_OF_MSG_TO_BE_SENT = 10L;

    static {
        System.setProperty("logging.config", "classpath:log4j2.yml");
        // NXCALS Testbed (for PRO access please contact the Logging team!)
        System.setProperty("service.url",
                "https://cs-ccr-testbed2.cern.ch:19093,https://cs-ccr-testbed3.cern.ch:19093,https://cs-ccr-nxcalstbs1.cern.ch:19093");
        System.setProperty("kafka.producer.bootstrap.servers",
                "cs-ccr-nxcalstbs1.cern.ch:9092,cs-ccr-nxcalstbs2.cern.ch:9092,cs-ccr-nxcalstbs3.cern.ch:9092,cs-ccr-nxcalstbs4.cern.ch:9092");
    }

    public static void main(String[] args) {
        IngestionExample ingestionExample = new IngestionExample();
    }
}
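The static initializer above sets the properties unconditionally, so any value passed on the command line with -D is overwritten when the class is loaded. A minimal sketch of an alternative, assuming you want command-line values to take precedence; the helper name setIfAbsent and the placeholder URL are hypothetical and not part of NXCALS:

```java
class PropertyDefaultsSketch {

    // Hypothetical helper: sets a system property only when it has no value yet,
    // so that a value supplied with -Dkey=value on the command line is preserved.
    // Returns the value that is in effect after the call.
    static String setIfAbsent(String key, String defaultValue) {
        String current = System.getProperty(key);
        if (current == null) {
            System.setProperty(key, defaultValue);
            return defaultValue;
        }
        return current;
    }

    public static void main(String[] args) {
        // Placeholder value for illustration only; use the real endpoints from the guide.
        String effective = setIfAbsent("service.url", "https://example.invalid:19093");
        System.out.println("service.url = " + effective);
    }
}
```

Whether this is preferable depends on how the application is deployed; for a fixed Testbed example the unconditional version is simpler.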
Add a method that creates sample data to be published:
private static ImmutableData getExampleData() {
    // current time in nanoseconds (millisecond resolution)
    final long stampNanos = TimeUnit.NANOSECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    DataBuilder builder = ImmutableData.builder();
    // entity key field, as specified in the schema of the system definition
    builder.add("device", "device_test");
    // partition field, based on the system declaration (used for data partitioning/indexing on the storage)
    builder.add("specification", "specification_test");
    // timestamp field, according to the definition of the system identified by SYSTEM_NAME
    builder.add("timestamp", stampNanos);
    // user data (can be anything the actual use case needs, since it is not enforced by the system)
    builder.add("array_field", new int[]{1, 2, 3, 3}, new int[]{1});
    builder.add("double_field", 123.6257D);
    builder.add("extra_time_field", stampNanos);
    builder.add("description", "This record produced by NXCALS examples");
    // some fields may be set to null; in that case their type has to be specified
    builder.addNull("selector", EntryType.STRING);
    return builder.build();
}
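The timestamp in getExampleData() is derived from System.currentTimeMillis(), so it is expressed in nanoseconds but only has millisecond resolution. The sketch below shows the same conversion in isolation, together with an alternative based on java.time.Instant that keeps whatever sub-millisecond precision the system clock provides. The class and method names are illustrative and not part of the NXCALS API:

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

class TimestampSketch {

    // Same conversion as in getExampleData(): epoch milliseconds to epoch nanoseconds.
    static long millisToNanos(long epochMillis) {
        return TimeUnit.NANOSECONDS.convert(epochMillis, TimeUnit.MILLISECONDS);
    }

    // Alternative: epoch nanoseconds from an Instant (whole seconds plus nanosecond part).
    // The exact-arithmetic methods throw ArithmeticException instead of silently overflowing.
    static long instantToNanos(Instant instant) {
        return Math.addExact(
                Math.multiplyExact(instant.getEpochSecond(), 1_000_000_000L),
                instant.getNano());
    }

    public static void main(String[] args) {
        System.out.println(millisToNanos(1574345381578L)); // prints 1574345381578000000
        System.out.println(instantToNanos(Instant.now()));
    }
}
```

The first value matches the timestamp of message #1 in the expected output of this guide; the six trailing zeros reflect the millisecond resolution.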
Create a "first iteration" of a method for data publishing:
long runExample(long nrOfMessagesToSent) {
    AtomicLong nrOfMessagesSent = new AtomicLong(0);
    try (Publisher<ImmutableData> publisher = PublisherFactory.newInstance()
            .createPublisher(SYSTEM_NAME, Function.identity())) {
        LOGGER.info("Will try to publish {} messages with data records, via the NXCALS client publisher",
                nrOfMessagesToSent);
        for (int i = 0; i < nrOfMessagesToSent; i++) {
            int msgNum = i + 1;
            // get example data to be sent
            ImmutableData exampleData = getExampleData();
            publisher.publish(exampleData);
            LOGGER.info("Published record for message #{} with timestamp: {}",
                    msgNum, exampleData.getEntry("timestamp").get());
            nrOfMessagesSent.incrementAndGet();
        }
    } catch (Exception ex) {
        LOGGER.error("We cannot send data to NXCALS for some reason:", ex);
    }
    LOGGER.info("Finished!");
    return nrOfMessagesSent.get();
}
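The method above relies on try-with-resources to close the publisher, including when publishing throws, and it returns the number of messages sent before the failure. A self-contained sketch of that control flow, using a hypothetical FakePublisher stand-in rather than the NXCALS Publisher:

```java
class SyncPublishSketch {

    // Hypothetical stand-in for a publisher; it fails on the message number given by failAt.
    static final class FakePublisher implements AutoCloseable {
        private final int failAt;
        private boolean closed;

        FakePublisher(int failAt) {
            this.failAt = failAt;
        }

        void publish(int msgNum) {
            if (msgNum == failAt) {
                throw new IllegalStateException("Cannot publish message #" + msgNum);
            }
        }

        @Override
        public void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    // Publishes messages 1..total and returns how many were sent before any failure.
    static long publishAll(FakePublisher fake, int total) {
        long sent = 0;
        try (FakePublisher publisher = fake) {
            for (int msgNum = 1; msgNum <= total; msgNum++) {
                publisher.publish(msgNum);
                sent++;
            }
        } catch (IllegalStateException ex) {
            // The resource is closed before the catch block runs.
            System.out.println("Stopped early: " + ex.getMessage() + ", closed=" + fake.isClosed());
        }
        return sent;
    }

    public static void main(String[] args) {
        // Fails on message #3, so two messages are counted as sent.
        System.out.println(publishAll(new FakePublisher(3), 10)); // prints 2
    }
}
```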
Then call it from the main method:
public static void main(String[] args) {
    IngestionExample ingestionExample = new IngestionExample();
    ingestionExample.runExample(MAX_NUMBER_OF_MSG_TO_BE_SENT);
}
Adapt the long runExample(long nrOfMessagesToSent) method so that it publishes the data in an asynchronous manner.
Start by adding a ThreadPoolExecutor to the main class:
// data is sent to NXCALS asynchronously using the thread pool below
private final ExecutorService ingestionExecutor = new ThreadPoolExecutor(1, 5, 2, TimeUnit.MINUTES,
        new ArrayBlockingQueue<>(100));
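The executor above has a core size of 1, a maximum size of 5 and a bounded queue of 100 tasks. With a bounded queue, ThreadPoolExecutor starts threads beyond the core size only once the queue is full, and with the default rejection policy it throws RejectedExecutionException when both the queue and the pool are saturated. The sketch below demonstrates this with deliberately tiny limits (core 1, maximum 2, queue of 2); the class name and the blocking tasks are illustrative only, and how many tasks each publishAsync call actually submits is not something this guide specifies:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecutorSaturationSketch {

    // Submits blocking tasks to a tiny pool and returns how many of them were accepted.
    static int countAccepted(int tasks) {
        // Same constructor as in the guide, but with core=1, max=2 and a queue capacity of 2.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 2, TimeUnit.MINUTES,
                new ArrayBlockingQueue<>(2));
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    // Each task blocks until the latch is released, keeping its thread busy.
                    executor.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                    accepted++;
                } catch (RejectedExecutionException e) {
                    // Queue is full and the maximum number of threads is already running.
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            executor.shutdown(); // accepted tasks still run to completion
        }
        return accepted;
    }

    public static void main(String[] args) {
        // 1 task on the core thread + 2 queued + 1 on the extra thread; the fifth is rejected.
        System.out.println(countAccepted(5)); // prints 4
    }
}
```

Applied to the settings in the guide, the same rule gives room for 100 queued tasks plus 5 running ones, which is ample for 10 messages.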
Then modify the method itself:
long runExample(long nrOfMessagesToSent) {
    AtomicLong nrOfMessagesSent = new AtomicLong(0);
    try (Publisher<ImmutableData> publisher = PublisherFactory.newInstance()
            .createPublisher(SYSTEM_NAME, Function.identity())) {
        LOGGER.info("Will try to publish {} messages with data records, via the NXCALS client publisher",
                nrOfMessagesToSent);
        List<CompletableFuture<Result>> confirmations = new ArrayList<>();
        for (int i = 0; i < nrOfMessagesToSent; i++) {
            int msgNum = i + 1;
            // get example data to be sent
            ImmutableData exampleData = getExampleData();
            // send data asynchronously to NXCALS
            CompletableFuture<Result> resultCallback = publisher.publishAsync(exampleData, ingestionExecutor);
            // once the data has been sent, the callback below is invoked;
            // either the result or the exception will be non-null
            resultCallback.handle((result, exception) -> {
                // an exception means that something went wrong with the record
                // (e.g. an invalid timestamp such as timestamp = 0 was used)
                if (exception != null) {
                    LOGGER.error("Something bad happened while sending data: {}", exampleData, exception);
                } else {
                    LOGGER.info("Published record for message #{} with timestamp: {}",
                            msgNum, exampleData.getEntry("timestamp").get());
                    nrOfMessagesSent.incrementAndGet();
                }
                return result;
            });
            confirmations.add(resultCallback);
        }
        // wait for all confirmations
        CompletableFuture.allOf(confirmations.toArray(new CompletableFuture[0])).join();
        LOGGER.info("Finished!");
    } catch (Exception ex) {
        // pass the exception as the last argument (no placeholder) so that the stack trace is logged
        LOGGER.error("We cannot send data to NXCALS for some reason:", ex);
    } finally {
        // shut the executor down even if waiting for the confirmations failed
        ingestionExecutor.shutdown();
    }
    return nrOfMessagesSent.get();
}
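In the method above, the confirmations list collects the futures returned by publishAsync. If any of them completes exceptionally, CompletableFuture.allOf(...).join() throws a CompletionException and the method falls through to its catch block. A possible variation, sketched below with plain JDK classes, is to collect the futures returned by handle(...) instead: those complete normally once the callback has run, so join() waits for every callback and the counter is final when it returns. The fakePublishAsync method is a hypothetical stand-in for the publisher, not the NXCALS API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

class AsyncConfirmationSketch {

    // Hypothetical stand-in for an asynchronous publish call:
    // it fails for non-positive timestamps and acknowledges everything else.
    static CompletableFuture<String> fakePublishAsync(long timestamp, ExecutorService executor) {
        return CompletableFuture.supplyAsync(() -> {
            if (timestamp <= 0) {
                throw new IllegalArgumentException("Illegal timestamp: " + timestamp);
            }
            return "ack-" + timestamp;
        }, executor);
    }

    // Publishes all timestamps asynchronously and returns the number of successful sends.
    static long publishAll(long[] timestamps) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AtomicLong sent = new AtomicLong();
        try {
            List<CompletableFuture<String>> confirmations = new ArrayList<>();
            for (long timestamp : timestamps) {
                // handle() receives either a result or an exception and completes normally.
                CompletableFuture<String> handled = fakePublishAsync(timestamp, executor)
                        .handle((result, exception) -> {
                            if (exception == null) {
                                sent.incrementAndGet();
                            }
                            return result;
                        });
                confirmations.add(handled);
            }
            // Waits until every callback has run; does not throw for individual failures.
            CompletableFuture.allOf(confirmations.toArray(new CompletableFuture[0])).join();
        } finally {
            executor.shutdown();
        }
        return sent.get();
    }

    public static void main(String[] args) {
        // One of the four timestamps is invalid, so three sends succeed.
        System.out.println(publishAll(new long[] {1L, 2L, 0L, 3L})); // prints 3
    }
}
```

Which list to wait on is a design choice: waiting on the original futures makes a single failure abort the whole run, while waiting on the handled futures treats failures per record.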
Build application
Prepare a gradle.properties file with the following content:
nxcalsVersion=1.5.2
log4jVersion=2.21.0
fasterxmlJacksonVersion=2.13.5
Important
Make sure that you have the latest production version of NXCALS!
Prepare a "minimal" version of the build.gradle file and place it in the main directory of your application:
apply plugin: 'application'

mainClassName = "cern.myproject.IngestionExample"

dependencies {
    compile group: 'cern.nxcals', name: 'nxcals-ingestion-api', version: nxcalsVersion
    compile group: 'org.apache.logging.log4j', name: 'log4j-1.2-api', version: log4jVersion
    compile group: 'org.apache.logging.log4j', name: 'log4j-slf4j-impl', version: log4jVersion
    // needed for the log4j2 YAML configuration file
    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: fasterxmlJacksonVersion
    compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: fasterxmlJacksonVersion
}
Important
Please note the dependency required for data ingestion:
compile group: 'cern.nxcals', name: 'nxcals-ingestion-api'
Build and run the application:
../gradlew build run
Expected application output:
> Task :ingestion-api-examples:run
2019-11-21 15:09:40.488 [WARN ] [main] URLConfigurationSource - No URLs will be polled as dynamic configuration sources.
2019-11-21 15:09:41.578 [INFO ] [main] IngestionExample - Will try to publish 10 messages with data records, via the NXCALS client publisher
2019-11-21 15:09:42.068 [INFO ] [kafka-producer-network-thread | producer-1] IngestionExample - Published record for message #1 with timestamp: 1574345381578000000
2019-11-21 15:09:42.069 [INFO ] [kafka-producer-network-thread | producer-1] IngestionExample - Published record for message #2 with timestamp: 1574345381584000000
2019-11-21 15:09:42.069 [INFO ] [kafka-producer-network-thread | producer-1] IngestionExample - Published record for message #3 with timestamp: 1574345381584000000
2019-11-21 15:09:42.069 [INFO ] [kafka-producer-network-thread | producer-1] IngestionExample - Published record for message #4 with timestamp: 1574345381584000000
2019-11-21 15:09:42.070 [INFO ] [kafka-producer-network-thread | producer-1] IngestionExample - Published record for message #5 with timestamp: 1574345381584000000
2019-11-21 15:09:42.070 [INFO ] [kafka-producer-network-thread | producer-1] IngestionExample - Published record for message #6 with timestamp: 1574345381584000000
2019-11-21 15:09:42.070 [INFO ] [kafka-producer-network-thread | producer-1] IngestionExample - Published record for message #7 with timestamp: 1574345381584000000
2019-11-21 15:09:42.070 [INFO ] [kafka-producer-network-thread | producer-1] IngestionExample - Published record for message #8 with timestamp: 1574345381584000000
2019-11-21 15:09:42.070 [INFO ] [kafka-producer-network-thread | producer-1] IngestionExample - Published record for message #9 with timestamp: 1574345381584000000
2019-11-21 15:09:42.070 [INFO ] [kafka-producer-network-thread | producer-1] IngestionExample - Published record for message #10 with timestamp: 1574345381584000000
2019-11-21 15:09:42.071 [INFO ] [main] IngestionExample - Finished!
Summary
You have successfully created an application that publishes 10 successive messages in an asynchronous way.
See also
Other pages related to the presented example: