Introducing the WildFly MicroProfile Reactive Specifications Feature Pack

I am pleased to announce the 1.0.0.Beta1 release of the MicroProfile Reactive specifications feature pack for WildFly. It offers experimental support for the following MicroProfile specifications, which all focus on the reactive area:

  • MicroProfile Reactive Messaging 1.0 - this is a framework for building event-driven, data streaming and event sourcing applications using CDI. The streams, or channels, can be backed by a variety of messaging technologies. We currently ship connectors for: Apache Kafka, AMQP and MQTT.

  • MicroProfile Reactive Streams Operators 1.0 - Reactive Messaging is built on Reactive Streams. Reactive Streams Operators (RSO) gives you a way to manipulate and handle those streams.

  • MicroProfile Context Propagation 1.0 - The traditional way of propagating state using ThreadLocals does not work well in the reactive world. Async/reactive code often creates a 'pipeline' of code blocks that get executed 'later', in practice after the method defining them has returned and often on a different thread. MicroProfile Context Propagation is there to help you deal with this, so that your deferred code can still, for example, latch onto the transaction initiated by the calling method.

We are using the SmallRye implementations of each of these specifications.
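To make the ThreadLocal problem mentioned for Context Propagation concrete, here is a minimal JDK-only sketch. It does not use the MicroProfile API at all: the class name, the CURRENT_TX variable and the withCapturedContext() helper are hypothetical names invented for this illustration. It only shows why a deferred block running on another thread loses ThreadLocal state, and the capture-and-restore idea that a context propagation layer automates for you.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class ThreadLocalPropagationSketch {
    // Stands in for per-thread state such as the current transaction (hypothetical).
    static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    // Captures the caller's value now and installs it around the task when it runs later.
    static <T> Supplier<T> withCapturedContext(Supplier<T> task) {
        String captured = CURRENT_TX.get(); // read on the calling thread
        return () -> {
            String previous = CURRENT_TX.get();
            CURRENT_TX.set(captured);
            try {
                return task.get();
            } finally {
                // restore whatever the worker thread had before
                if (previous == null) {
                    CURRENT_TX.remove();
                } else {
                    CURRENT_TX.set(previous);
                }
            }
        };
    }

    // Returns { value seen by a plain deferred task, value seen by a wrapped one }.
    static String[] demo() throws Exception {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            CURRENT_TX.set("tx-1");
            Supplier<String> readTx = () -> String.valueOf(CURRENT_TX.get());
            String plain = CompletableFuture.supplyAsync(readTx, worker).get();
            String wrapped =
                    CompletableFuture.supplyAsync(withCapturedContext(readTx), worker).get();
            return new String[] {plain, wrapped};
        } finally {
            CURRENT_TX.remove();
            worker.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] seen = demo();
        System.out.println("plain deferred task sees:   " + seen[0]);
        System.out.println("wrapped deferred task sees: " + seen[1]);
    }
}
```

The plain task runs on the worker thread, where the ThreadLocal was never set, so it cannot see the caller's value; the wrapped task can. With MicroProfile Context Propagation you would not write this wrapper by hand: the specification defines ManagedExecutor and ThreadContext for that purpose.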

The source code for the feature pack can be found on GitHub. The README contains links to the specifications, as well as the SmallRye implementations of these and documentation.

Installing the feature pack

We decided to see what the interest is in using these MicroProfile Reactive specifications in WildFly before integrating them into the WildFly code itself, which is why we have shipped this as a Galleon feature pack. This is something that we plan on doing a lot more of in the future for experimental features. Galleon is a tool we have been using internally to compose the server for the past several major releases of WildFly.

To install the feature pack, download the latest version of Galleon. At the time of writing this is 4.2.5. Unzip it somewhere, and add its bin/ folder to your path.

Next, save a copy of provision.xml somewhere, and go to that folder in a terminal window. Then run:

$ galleon.sh provision ./provision.xml --dir=my-wildfly

This will take some time the first time you do it since it will download a lot of dependencies from Maven. Once that is done, subsequent attempts will be fast.

What this command does is:

  • It provisions a slimmed version (compared to the full download) of WildFly containing the relevant parts for a server running in the cloud. The main README of the project repository contains more information about this part. You can adjust provision.xml to choose other parts of the server you may be interested in.

  • It then provisions the full contents of the feature pack into our new server instance.

  • It outputs the provisioned server to the my-wildfly subdirectory, from where it can be started via the usual my-wildfly/bin/standalone.sh command.

Example

A short example of what these specs can do follows. The code snippets are inspired by the Quickstarts, so be sure to try those out!

First we have a method which generates a new price every five seconds:

    private Random random = new Random();

    @Outgoing("generated-price")
    public Flowable<Integer> generate() {
        return Flowable.interval(5, TimeUnit.SECONDS)
                .map(tick -> random.nextInt(100));
    }

The @Outgoing annotation comes from Reactive Messaging, and specifies that the stream of generated prices will be sent to a channel called 'generated-price'. Channels may be either in-memory, or they may be backed by a messaging provider.

In this case, we have another method (it can be in another class) annotated with @Incoming, using the same 'generated-price' name:

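    // Illustrative conversion rate; the value is an assumption, not given in the text
    private static final double CONVERSION_RATE = 0.88;

    @Incoming("generated-price")
    @Outgoing("to-kafka")
    public double process(int priceInUsd) {
        return priceInUsd * CONVERSION_RATE;
    }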

The @Incoming annotation tells Reactive Messaging to deliver the messages on the generated-price channel to this method. Since the name matches that of the @Outgoing annotation in the previous example, this method will receive all the prices generated by the generate() method. As both ends of the channel are methods in the application, with no connector configured for it, this becomes an in-memory stream.
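As a rough mental model of such an in-memory channel, the following JDK-only sketch wires a publisher to a mapping processor using java.util.concurrent.Flow. This is not how Reactive Messaging is implemented and it uses none of its API; the class names, the MapProcessor helper and the conversion rate of 0.5 are all assumptions made for the illustration. It only shows the idea that items emitted on one side are handed, in order, to the method on the other side.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class InMemoryChannelSketch {
    // Assumed rate, chosen so the arithmetic is exact in floating point.
    static final double CONVERSION_RATE = 0.5;

    // Plays the role of process(): consumes one channel and publishes to the next.
    static final class MapProcessor<I, O> extends SubmissionPublisher<O>
            implements Flow.Processor<I, O> {
        private final Function<I, O> fn;
        private Flow.Subscription upstream;

        MapProcessor(Function<I, O> fn) { this.fn = fn; }

        @Override public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            s.request(1); // ask for the first item
        }
        @Override public void onNext(I item) {
            submit(fn.apply(item)); // forward the converted item downstream
            upstream.request(1);    // then ask for the next one
        }
        @Override public void onError(Throwable t) { closeExceptionally(t); }
        @Override public void onComplete() { close(); }
    }

    // Pushes the given prices through "generated-price" -> process -> collector.
    static List<Double> run(List<Integer> generated) throws InterruptedException {
        List<Double> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<Integer> generatedPrice = new SubmissionPublisher<>();
             MapProcessor<Integer, Double> process =
                     new MapProcessor<Integer, Double>(usd -> usd * CONVERSION_RATE)) {
            generatedPrice.subscribe(process);
            process.subscribe(new Flow.Subscriber<Double>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                @Override public void onNext(Double price) { received.add(price); }
                @Override public void onError(Throwable t) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });
            generated.forEach(generatedPrice::submit);
            generatedPrice.close(); // signals completion through the whole chain
            done.await(5, TimeUnit.SECONDS);
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(List.of(10, 20, 30)));
    }
}
```

In the real application none of this plumbing is written by hand: matching channel names in @Outgoing and @Incoming are enough for the framework to connect the two methods.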

The process() method is also annotated with @Outgoing, so once its conversion has been done, the result is sent to the 'to-kafka' channel.

To map this channel to a Kafka stream, we need some configuration, using MicroProfile Config in a microprofile-config.properties that is part of the deployment:

# Selects the Kafka connector for the 'to-kafka' outgoing stream
mp.messaging.outgoing.to-kafka.connector=smallrye-kafka
# Maps the outgoing stream to the 'prices' Kafka topic
mp.messaging.outgoing.to-kafka.topic=prices
# Adds a serializer to convert the data (process() returns a double, so a Double serializer is used)
mp.messaging.outgoing.to-kafka.value.serializer=org.apache.kafka.common.serialization.DoubleSerializer

Next we create a Publisher that reads this data from Kafka.

    @Inject
    @Channel("from-kafka") Publisher<Double> prices;

This @Channel annotation on a Publisher is conceptually the same as if we had annotated a method with @Incoming("from-kafka"), but it allows us to do some cool tricks which we will see soon. It is not part of the current Reactive Messaging 1.0 specification, but will be part of 1.1. For now it is a SmallRye extension to the specification.

In the microprofile-config.properties that is part of the deployment, we configure this channel to read from the same Kafka topic:

# Selects the Kafka connector for the 'from-kafka' incoming stream
mp.messaging.incoming.from-kafka.connector=smallrye-kafka
# Maps the incoming stream to the 'prices' Kafka topic
mp.messaging.incoming.from-kafka.topic=prices
# Adds a deserializer to convert the data (the prices Publisher emits Double values)
mp.messaging.incoming.from-kafka.value.deserializer=org.apache.kafka.common.serialization.DoubleDeserializer

To summarise where we are so far: all the messages generated by our generate() method are sent, via an in-memory channel, to our process() method. The process() method does some conversion before sending them to a Kafka topic called 'prices'. We then listen to that Kafka topic, and are able to publish the messages from our prices Publisher instance.

Now that we have the converted stream in a Publisher instance we can access it from the non-reactive world, e.g. in a REST endpoint:

    @GET
    @Path("/prices")
    @Produces(MediaType.SERVER_SENT_EVENTS) // denotes that server-sent events (SSE) will be produced
    @SseElementType(MediaType.TEXT_PLAIN) // denotes that the data contained in each SSE is just regular text/plain data
    public Publisher<Double> readThreePrices() {
        // get the next three prices from the price stream
        return ReactiveStreams.fromPublisher(prices)
                .limit(3)
                .buildRs();
    }

To keep things simple, we will consider this basic version of the method first. As we got the stream into a Publisher by using the @Channel annotation, we have a bridge from the 'reactive world' into the 'user world'. Otherwise we would just have a chain of @Outgoing and @Incoming annotated methods (which of course may also be useful in some cases!).

First, we use the MicroProfile Reactive Streams Operators method ReactiveStreams.fromPublisher() to wrap the publisher. We then specify limit(3), which has the effect that once someone calls this method, the stream will terminate after receiving three prices. We call buildRs() to return a new Publisher for those three items. As the messages arrive every five seconds, the readThreePrices() method will return while our reactive stream is still receiving and re-emitting the three messages.
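What "terminate after receiving three prices" means at the level of Reactive Streams signals can be sketched with the JDK's own java.util.concurrent.Flow API. This is only an analogy under stated assumptions: LimitSketch and LimitingSubscriber are hypothetical names, a SubmissionPublisher stands in for the prices Publisher, and the real limit() operator in SmallRye may well be implemented differently. The sketch requests three items and cancels the upstream subscription once the third has arrived.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class LimitSketch {
    // Collects at most 'max' items, then cancels the upstream subscription.
    static final class LimitingSubscriber<T> implements Flow.Subscriber<T> {
        private final int max;
        private final List<T> received = new ArrayList<>();
        private final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        LimitingSubscriber(int max) { this.max = max; }

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(max); // never ask for more than we intend to take
        }
        @Override public void onNext(T item) {
            received.add(item);
            if (received.size() == max) {
                subscription.cancel(); // tell the source we are finished
                done.countDown();
            }
        }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }

        // Waits (up to five seconds) until the limit is reached or the stream ends.
        List<T> await() throws InterruptedException {
            done.await(5, TimeUnit.SECONDS);
            return received;
        }
    }

    // Publishes five prices but only the first three reach the subscriber.
    static List<Double> firstThree() throws InterruptedException {
        LimitingSubscriber<Double> subscriber = new LimitingSubscriber<>(3);
        try (SubmissionPublisher<Double> prices = new SubmissionPublisher<>()) {
            prices.subscribe(subscriber);
            for (double p = 1.0; p <= 5.0; p++) {
                prices.submit(p);
            }
            return subscriber.await();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(firstThree());
    }
}
```

Note that items are delivered on a different thread from the one that calls subscribe(), which mirrors the point above: the method that sets up the stream can return long before the items flow through it.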

Next, let’s see how MicroProfile Context Propagation is useful. We will modify the above method so that each of the three prices gets stored to a database:

    @PersistenceContext(unitName = "quickstart")
    EntityManager em;

    @Transactional // This method is transactional
    @GET
    @Path("/prices")
    @Produces(MediaType.SERVER_SENT_EVENTS) // denotes that server-sent events (SSE) will be produced
    @SseElementType(MediaType.TEXT_PLAIN) // denotes that the data contained in each SSE is just regular text/plain data
    public Publisher<Double> readThreePrices() {
        // get the next three prices from the price stream
        return ReactiveStreams.fromPublisher(prices)
                .limit(3)
                .map(price -> {
                    // Context propagation makes this block inherit the transaction of the caller
                    System.out.println("Storing price: " + price);
                    // store each price before we send them
                    Price priceEntity = new Price();
                    priceEntity.setValue(price);
                    // here we are all in the same transaction
                    // thanks to context propagation
                    em.persist(priceEntity);

                    return price;
                })
                .buildRs();
    }

First of all we have made the method transactional, so a transaction will be started when entering the method. We then read three prices exactly the same as before, but this time we have an extra call to map(). Inside the map() block, we save each price to a database. Thanks to Context Propagation (which is integrated with Reactive Streams Operators) this happens within the transaction of the readThreePrices() method, although that method will have completed by the time the prices come through.

Feedback

We’re keen to hear your feedback! Please raise any issues found at https://github.com/wildfly-extras/wildfly-mp-reactive-feature-pack/issues.

Kabir Khan

Kabir started using JBoss back in 2000, and joined the team in 2004. Since then he has worked on JBoss AS/WildFly as a core developer.

Location:  United Kingdom

Occupation:  Software Engineer
