Skip to content

This step-by-step-guide aims to walk you through the process of creating application demonstrating usage of NXCALS ingestion API.

What you will build

An application which generates and stores 10 messages in NXCALS Testbed environment in a asynchronous manner.

Create application

Create main class of the application:

public class IngestionExample {

        private static final Logger LOGGER = LoggerFactory.getLogger(IngestionExample.class);
        private static final String SYSTEM_NAME = "MOCK-SYSTEM";

        private static final long MAX_NUMBER_OF_MSG_TO_BE_SENT = 10L;

        static {
            System.setProperty("logging.config", "classpath:log4j2.yml");


            // NXCALS Testbed (for PRO access please contact Logging team!)
            System.setProperty("service.url",
                    "https://cs-ccr-testbed2.cern.ch:19093,https://cs-ccr-testbed3.cern.ch:19093,https://cs-ccr-nxcalstbs1.cern.ch:19093");
            System.setProperty("kafka.producer.bootstrap.servers",
                    "cs-ccr-nxcalstbs1.cern.ch:9092,cs-ccr-nxcalstbs2.cern.ch:9092,cs-ccr-nxcalstbs3.cern.ch:9092,cs-ccr-nxcalstbs4.cern.ch:9092");
        }

        public static void main(String[] args) {
            IngestionExample ingestionExample = new IngestionExample();
        }
}

add method for creation of a sample data to be published:

private static ImmutableData getExampleData() {
        final long stampNanos = TimeUnit.NANOSECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);

        DataBuilder builder = ImmutableData.builder();

        // entity key field as specified in the schema of the system definition
        builder.add("device", "device_test");

        // partition field based on the system declaration (used for data partitioning/indexing on the storage)
        builder.add("specification", "specification_test");

        // timestamp accordingly to the system (as pointed by the SYSTEM_NAME property) definition
        builder.add("timestamp", stampNanos);

        // user data (could be anything based on the actual use-case, since it's not enforced by the system)
        builder.add("array_field", new int[]{1, 2, 3, 3}, new int[]{1});
        builder.add("double_field", 123.6257D);
        builder.add("extra_time_field", stampNanos);
        builder.add("description", "This record produced by NXCALS examples");

        // some fields may be set to null, in such case we have to specify their type
        builder.addNull("selector", EntryType.STRING);

        return builder.build();
    }

Create a "first iteration" of a method for data publishing:

long runExample(long nrOfMessagesToSent) {
        AtomicLong nrOfMessagesSent = new AtomicLong(0);

        try (Publisher<ImmutableData> publisher = PublisherFactory.newInstance()
                .createPublisher(SYSTEM_NAME, Function.identity())) {

            LOGGER.info("Will try to publish {} messages with data records, via the NXCALS client publisher",
                    nrOfMessagesToSent);

            for (int i = 0; i < nrOfMessagesToSent; i++) {
                int msgNum = i + 1;

                // get example data to be sent
                ImmutableData exampleData = getExampleData();
                publisher.publish(exampleData);

                LOGGER.info("Published record for message #{} with timestamp: {}",
                        msgNum, exampleData.getEntry("timestamp").get());
                nrOfMessagesSent.incrementAndGet();
            }
        } catch (Exception ex) {
            LOGGER.error("We cannot send data to NXCALS for some reason:", ex);
        }
        LOGGER.info("Finished!");
        return nrOfMessagesSent.get();
    }

and add it to the main method:

public static void main(String[] args) {
        IngestionExample ingestionExample = new IngestionExample();
        ingestionExample.runExample(MAX_NUMBER_OF_MSG_TO_BE_SENT);
    }

Adapt long runExample(long nrOfMessagesToSent) method so it publishes the data in a asynchronous manner. Start with adding ThreadPoolExecutor to the main class:

// we send data to NXCALS in asynchronous way using the below thread pool
    private final ExecutorService ingestionExecutor = new ThreadPoolExecutor(1, 5, 2, TimeUnit.MINUTES,
            new ArrayBlockingQueue<>(100));

and proceed to the modification of the method itself:

long runExample(long nrOfMessagesToSent) {
        AtomicLong nrOfMessagesSent = new AtomicLong(0);

        try (Publisher<ImmutableData> publisher = PublisherFactory.newInstance()
                .createPublisher(SYSTEM_NAME, Function.identity())) {

            LOGGER.info("Will try to publish {} messages with data records, via the NXCALS client publisher",
                    nrOfMessagesToSent);

            List<CompletableFuture<Result>> confirmations = new ArrayList<>();

            for (int i = 0; i < nrOfMessagesToSent; i++) {
                int msgNum = i + 1;

                // get example data to be sent
                ImmutableData exampleData = getExampleData();

                // send data asynchronously to NXCALS
                CompletableFuture<Result> resultCallback = publisher.publishAsync(exampleData, ingestionExecutor);
                // once it has been sent we should get called via the returned callback of which
                // either result of exception will be not null
                resultCallback.handle((result, exception) -> {
                    // if we get exception means that something bad happened with our record (i.e. we used wrong
                    // timestamp <timestamp = 0>)
                    if (exception != null) {
                        LOGGER.error(format("Something bad happened while sending data: %s", exampleData),
                                exception);
                    } else {
                        LOGGER.info("Published record for message #{} with timestamp: {}",
                                msgNum, exampleData.getEntry("timestamp").get());
                        nrOfMessagesSent.incrementAndGet();
                    }
                    return result;
                });
                confirmations.add(resultCallback);
            }

            //wait for all confirmations
            CompletableFuture.allOf(confirmations.toArray(new CompletableFuture[] {})).join();
            ingestionExecutor.shutdown();
            LOGGER.info("Finished!");
        } catch (Exception ex) {
            LOGGER.error("We cannot send data to NXCALS for some reason: {}", ex);
        }
        return nrOfMessagesSent.get();
    }

Build application

Prepare gradle.properties file with the following content:

nxcalsVersion=1.4.0

log4jVersion=2.21.0
fasterxmlJacksonVersion=2.13.5

Important

Make sure that you have the latest production version of NXCALS!

Prepare a "minimal" version of build.gradle file and place it in the main directory of your application.

apply plugin:'application'
mainClassName = "cern.myproject.IngestionExample"

dependencies {
    compile group: 'cern.nxcals', name: 'nxcals-ingestion-api', version: nxcalsVersion

    compile group: 'org.apache.logging.log4j', name: 'log4j-1.2-api', version: log4jVersion
    compile group: 'org.apache.logging.log4j', name: 'log4j-slf4j-impl', version: log4jVersion

    //needed for log4j2 yaml file
    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: fasterxmlJacksonVersion
    compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: fasterxmlJacksonVersion
}

Important

Please note required dependency for data ingestion:

  • compile group: 'cern.nxcals', name: 'nxcals-ingestion-api'

Build and run the application:

../gradlew build run
Click to see expected application output...
> Task :ingestion-api-examples:run
2019-11-21 15:09:40.488 [WARN ] [main] URLConfigurationSource - No URLs will be polled as dynamic configuration sources.
2019-11-21 15:09:41.578 [INFO ] [main] IngestionExample - Will try to publish 10 messages with data records, via the NXCALS client publisher
2019-11-21 15:09:42.068 [INFO ] [kafka-producer-network-thread | producer-1] IngestionExample - Published record for message #1 with timestamp: 1574345381578000000
2019-11-21 15:09:42.069 [INFO ] [kafka-producer-network-thread | producer-1] IngestionExample - Published record for message #2 with timestamp: 1574345381584000000
2019-11-21 15:09:42.069 [INFO ] [kafka-producer-network-thread | producer-1] IngestionExample - Published record for message #3 with timestamp: 1574345381584000000
2019-11-21 15:09:42.069 [INFO ] [kafka-producer-network-thread | producer-1] IngestionExample - Published record for message #4 with timestamp: 1574345381584000000
2019-11-21 15:09:42.070 [INFO ] [kafka-producer-network-thread | producer-1] IngestionExample - Published record for message #5 with timestamp: 1574345381584000000
2019-11-21 15:09:42.070 [INFO ] [kafka-producer-network-thread | producer-1] IngestionExample - Published record for message #6 with timestamp: 1574345381584000000
2019-11-21 15:09:42.070 [INFO ] [kafka-producer-network-thread | producer-1] IngestionExample - Published record for message #7 with timestamp: 1574345381584000000
2019-11-21 15:09:42.070 [INFO ] [kafka-producer-network-thread | producer-1] IngestionExample - Published record for message #8 with timestamp: 1574345381584000000
2019-11-21 15:09:42.070 [INFO ] [kafka-producer-network-thread | producer-1] IngestionExample - Published record for message #9 with timestamp: 1574345381584000000
2019-11-21 15:09:42.070 [INFO ] [kafka-producer-network-thread | producer-1] IngestionExample - Published record for message #10 with timestamp: 1574345381584000000
2019-11-21 15:09:42.071 [INFO ] [main] IngestionExample - Finished!

Summary

You have successfully created application publishing 10 successive messages in a asynchronous way.

See also

Other pages related to the presented example: