MicroProfile Reactive Messaging 2.0 in WildFly 25
For WildFly 25, we upgraded the MicroProfile Reactive Messaging support from version 1 to 2. Version 2 introduces a new @Channel annotation which, in conjunction with the new Emitter interface, makes it possible to push data into MicroProfile Reactive Messaging streams from code initiated by a user.
The MicroProfile Reactive Messaging implementation in WildFly is based on the SmallRye Reactive Messaging project. The version included in WildFly 25 introduces a new API that gives you more control over how you interact with Kafka, and we expose this API in WildFly.
This post will:

- Take a simple web application, consisting of a few HTML pages, and add a Servlet filter to push information about page visits into Reactive Messaging via an Emitter. These messages will be forwarded on to Kafka.
- Show a standalone application to read the last visited page per user from Kafka via the Kafka Streams API.
- Deploy the above application into WildFly, bundling the Kafka Streams API (which we don’t ship in WildFly) to read the last visited page per user.
The code for the application is on GitHub. Additionally, you can find more information about the MicroProfile Reactive Messaging functionality in WildFly in the WildFly documentation.
Running the application
See the GitHub repository README for instructions on how to build and run the different parts of the application. Here we will focus on explaining how it works.
The main application
The main application is contained in the app folder.
The core of the application is a few HTML pages which link to each other. We want to track which user visited which page, and we do this by enhancing the application with a Servlet filter called MessagingFilter:
public class MessagingFilter extends HttpFilter {
@Inject
@Channel("from-filter")
Emitter<PageVisit> messagingEmitter;
First, we have a field of type Emitter, called messagingEmitter, injected via CDI and annotated with @Channel. This Emitter instance makes it a breeze to push data to the MicroProfile Reactive Messaging stream indicated by the value of the @Channel annotation (i.e. from-filter).
@Override
public void doFilter(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws IOException, ServletException {
String user = getUsername(request);
String address = request.getRemoteAddr();
String page = Paths.get(request.getRequestURI()).getFileName().toString();
PageVisit pv = new PageVisit(user, address, page);
messagingEmitter.send(pv);
Next, we gather information about the request (user, address and page name) and bundle it up in a PageVisit bean. We then use the injected Emitter to send the PageVisit instance, which forwards it to the from-filter stream.
// Disable caching for the html pages
response.addHeader("Cache-control", "no-store");
response.addHeader("Pragma", "no-cache");
filterChain.doFilter(request, response);
}
Getting the user name is simulated in the following method, which randomly chooses a user for the current request. For the purposes of this demo, this gives us a few different users in the recorded data as we click the links in the running application.
private String getUsername(HttpServletRequest servletRequest) {
// Pretend we're looking up the authenticated user
switch ((int)Math.round(Math.random() * 3)) {
case 0:
return "bob";
case 1:
return "emma";
case 2:
return "frank";
case 3:
return "linda";
}
return null;
}
}
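For the filter to see the page visits it needs to be mapped to the HTML pages. How the example registers the filter is not shown here; a minimal sketch, assuming registration via the @WebFilter annotation rather than web.xml:

// Assumption: annotation-based registration; the example may use web.xml instead
@WebFilter(urlPatterns = "*.html")
public class MessagingFilter extends HttpFilter {
    // ... as shown above
}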
Next, we have an ApplicationScoped CDI bean called MessagingBean:
@ApplicationScoped
public class MessagingBean {
@Inject
@Channel("special")
Emitter<PageVisit> special;
@Incoming("from-filter")
@Outgoing("kafka-visits")
public Message<PageVisit> fromFilter(PageVisit pageVisit) {
if (pageVisit.getPage().equals("3.html")) {
special.send(pageVisit);
}
Message<PageVisit> msg = Message.of(pageVisit);
msg = KafkaMetadataUtil.writeOutgoingKafkaMetadata(
msg,
OutgoingKafkaRecordMetadata
.<String>builder()
.withKey(pageVisit.getUserName())
.build());
return msg;
}
@Incoming("special")
public void special(PageVisit pageVisit) {
System.out.println("===> " + pageVisit.getUserName() + " visited " + pageVisit.getPage());
}
}
The fromFilter() method is annotated with the @Incoming("from-filter") annotation (from version 1 of the specification) and will receive all messages that were sent via our previous Emitter.
Since both the @Incoming and @Channel annotations use the value from-filter (i.e. they match), we end up with a simple in-memory stream. We could of course have routed this via Kafka, but for this example I wanted to keep the configuration needed to map to Kafka as simple as possible. The WildFly documentation goes into more detail about how to configure MicroProfile Reactive Messaging streams to consume from Kafka topics.
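For illustration, routing the filter’s messages via Kafka instead would mean splitting the channel into an outgoing/incoming pair in microprofile-config.properties. A hypothetical sketch (the topic name, the renamed incoming channel and the deserializer class are all made up for this example):

# Hypothetical: back the Emitter's channel with Kafka...
mp.messaging.outgoing.from-filter.connector=smallrye-kafka
mp.messaging.outgoing.from-filter.topic=filter-visits
mp.messaging.outgoing.from-filter.value.serializer=org.wildfly.blog.reactive.messaging.common.PageVisitsSerializer
# ...and consume from a renamed incoming channel on the same topic,
# changing the bean's annotation to @Incoming("filter-visits-in")
mp.messaging.incoming.filter-visits-in.connector=smallrye-kafka
mp.messaging.incoming.filter-visits-in.topic=filter-visits
mp.messaging.incoming.filter-visits-in.value.deserializer=org.wildfly.blog.reactive.messaging.common.PageVisitsDeserializer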
The fromFilter() method is also annotated with @Outgoing("kafka-visits"), so all incoming messages from the from-filter stream are forwarded on to the kafka-visits stream.
The kafka-visits stream is backed by Kafka (we will see how to map this stream onto a Kafka topic in a moment). In this case we decide that we want messages sent on this topic to have a Kafka key, so we:
- Wrap the incoming PageVisit object in a Message object, which comes from the MicroProfile Reactive Messaging specification.
- Create an OutgoingKafkaRecordMetadata instance, where we set the key of the record to be the user, and add this metadata to the message by calling KafkaMetadataUtil.writeOutgoingKafkaMetadata(). The mentioned classes come from the new SmallRye Kafka API (a sketch of the consuming side follows this list).
- Finally, return the massaged Message containing our received PageVisit instance, which will forward it to the kafka-visits stream.
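The metadata API also works in the consuming direction. The example application does not do this, but a method receiving from a Kafka-backed channel could read the record key back via KafkaMetadataUtil.readIncomingKafkaMetadata(). A minimal sketch (the channel name is hypothetical):

@Incoming("kafka-visits-in") // hypothetical Kafka-backed incoming channel
public CompletionStage<Void> inspectKey(Message<PageVisit> msg) {
    Optional<IncomingKafkaRecordMetadata<String, PageVisit>> metadata =
            KafkaMetadataUtil.readIncomingKafkaMetadata(msg);
    // The key we set on the producing side is the user name
    metadata.ifPresent(m -> System.out.println("Kafka record key: " + m.getKey()));
    return msg.ack();
}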
Another thing going on in this example is that we’re using an injected Emitter to 'fork' the sending of the received data to an additional location. In fromFilter(), if the page 3.html was visited, we also send the received PageVisit via the injected Emitter. This in turn sends the PageVisit instance on the special stream indicated by its @Channel annotation.
The special() method, annotated with @Incoming("special"), receives messages from the special stream (i.e. the ones sent via the Emitter).
When running the application and clicking on the 3 link, you should see output in the server logs. Additionally, every click on any link will show up in the Kafka consumer logs mentioned in the example README. So, in addition to making it easy to send data from user-initiated code, Emitter is useful for 'forking' streams, so you can send data to more than one location. This functionality was not present in version 1 of the specification.
To map the kafka-visits stream to a Kafka topic, we add the following configuration in microprofile-config.properties:
mp.messaging.connector.smallrye-kafka.bootstrap.servers=localhost:9092
mp.messaging.outgoing.kafka-visits.connector=smallrye-kafka
mp.messaging.outgoing.kafka-visits.topic=page-visits
mp.messaging.outgoing.kafka-visits.value.serializer=org.wildfly.blog.reactive.messaging.common.PageVisitsSerializer
This points the connector at localhost:9092 to connect to Kafka, maps the kafka-visits stream to the page-visits Kafka topic, and specifies that PageVisitsSerializer should be used to serialize the PageVisit instances we send to Kafka. The WildFly documentation contains more detailed information about this configuration.
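For reference, a Kafka value serializer is simply an implementation of org.apache.kafka.common.serialization.Serializer. A sketch of what PageVisitsSerializer might look like (the wire format and the address accessor on PageVisit are assumptions, not the example’s actual implementation):

public class PageVisitsSerializer implements Serializer<PageVisit> {
    @Override
    public byte[] serialize(String topic, PageVisit data) {
        if (data == null) {
            return null;
        }
        // Hypothetical wire format: user, address and page, space separated
        // (getAddress() is an assumed accessor on PageVisit)
        String s = data.getUserName() + " " + data.getAddress() + " " + data.getPage();
        return s.getBytes(StandardCharsets.UTF_8);
    }
}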
If you deploy the application into WildFly, as outlined in the example README, and you performed the optional step of connecting a Kafka consumer, you should see output similar to this in the Kafka consumer terminal as you click the links in the application hosted at http://localhost:8080/app/:
frank 127.0.0.1 app
emma 127.0.0.1 3.html
frank 127.0.0.1 1.html
linda 127.0.0.1 3.html
frank 127.0.0.1 1.html
emma 127.0.0.1 2.html
frank 127.0.0.1 3.html
When you visit 3.html, there will be additional output from the special() method in WildFly’s server.log:
===> emma visited 3.html
===> linda visited 3.html
===> frank visited 3.html
Reading data from Kafka in a standalone application
While it is nice to be able to send (and receive, although not shown in this example) messages via Kafka, we may want to query the data in Kafka later.
The code for the command line application to query data from Kafka is contained in the streams folder. It contains a very simple (I am a beginner at this part) application to get the most recent page visit per user. It uses the Kafka Streams API to interact with Kafka.
The Main class calls through to a more interesting DataStoreWrapper class.
public static void main(String[] args) throws Exception {
try (DataStoreWrapper dsw = new DataStoreWrapper()) {
dsw.init();
Map<String, String> lastPagesByUser = Collections.emptyMap();
try {
lastPagesByUser = dsw.readLastVisitedPageByUsers();
} catch (InvalidStateStoreException e) {
// Ignored; we retry below after giving the stream time to settle
}
if (lastPagesByUser.size() == 0) {
// It seems that although the stream is reported as RUNNING
// in dsw.init() it still needs some time to settle. Until that
// happens there is no data or we get InvalidStateStoreException
Thread.sleep(4000);
lastPagesByUser = dsw.readLastVisitedPageByUsers();
}
System.out.println("Last pages visited:\n" + lastPagesByUser);
}
}
Note: There is some error handling here. In case you get no entries, or if you get InvalidStateStoreException, try increasing the timeout in the sleep.
Looking at the DataStoreWrapper class, the first thing to note is that it is 'CDI ready'. Although this section will run it as a standalone application where CDI is not relevant, we will reuse this class later in an application deployed in WildFly.
@ApplicationScoped
public class DataStoreWrapper implements Closeable {
private volatile KafkaStreams streams;
We will initialise this streams instance in the init() method below.
@Inject
private ConfigSupplier configSupplier = new ConfigSupplier() {
@Override
public String getBootstrapServers() {
return "localhost:9092";
}
@Override
public String getTopicName() {
return "page-visits";
}
};
The configSupplier field is initialised to an implementation of ConfigSupplier which hard codes the values of the Kafka bootstrap servers and the topic name. When deploying this into WildFly later, we will use MicroProfile Config to set these values and avoid hard coding them.
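For reference, ConfigSupplier is a small interface of our own; judging from the anonymous implementation above, it boils down to:

public interface ConfigSupplier {
    String getBootstrapServers();
    String getTopicName();
}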
DataStoreWrapper() {
}
Next, we take a look at the init() method, where we set up the ability to query the stream.
@PostConstruct
void init() {
try {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, configSupplier.getBootstrapServers());
props.putIfAbsent(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, PageVisitSerde.class.getName());
// For this we want to read all the data
props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
The above sets configuration properties to connect to Kafka and sets the Serdes for (de)serialising the Kafka record keys and values. The class PageVisitSerde is used to (de)serialise our PageVisit class from earlier. By setting auto.offset.reset to earliest, we also specify that we want to read all the data stored on this topic, not just newly arriving records.
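A Serde simply pairs a serializer with a deserializer. A sketch of what PageVisitSerde might look like, reusing the serializer from the main application (the deserializer class is an assumed counterpart):

public class PageVisitSerde implements Serde<PageVisit> {
    @Override
    public Serializer<PageVisit> serializer() {
        return new PageVisitsSerializer();
    }

    @Override
    public Deserializer<PageVisit> deserializer() {
        return new PageVisitsDeserializer(); // assumed counterpart to PageVisitsSerializer
    }
}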
final StreamsBuilder builder = new StreamsBuilder();
KeyValueBytesStoreSupplier stateStore = Stores.inMemoryKeyValueStore("test-store");
KTable<String, PageVisit> source = builder.table(
configSupplier.getTopicName(),
Materialized.<String, PageVisit>as(stateStore)
.withKeySerde(Serdes.String())
.withValueSerde(new PageVisitSerde()));
final Topology topology = builder.build();
this.streams = new KafkaStreams(topology, props);
Now we create a KTable associated with the Kafka topic, and create a StateStore from that. Since the KTable is keyed by the Kafka record key, and above we used the user as the key when sending to Kafka, we get one entry (the latest) for each user. Note this is a very simple example, and not an in-depth exploration of the Kafka Streams API, so of course more advanced views on the stored data are possible; a hypothetical one is sketched below.
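As an illustration of such a view (not part of the example application), the same KTable could be re-grouped to count, per page, how many users’ most recent visit was to that page:

// Hypothetical extension: a KTable count counts the current rows per group,
// so this yields the number of users whose latest visit was to each page
KTable<String, Long> usersPerPage = source
        .groupBy((user, visit) -> KeyValue.pair(visit.getPage(), visit),
                Grouped.with(Serdes.String(), new PageVisitSerde()))
        .count();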
final CountDownLatch startLatch = new CountDownLatch(1);
final AtomicReference<KafkaStreams.State> state = new AtomicReference<>();
streams.setStateListener((newState, oldState) -> {
state.set(newState);
switch (newState) {
case RUNNING:
case ERROR:
case PENDING_SHUTDOWN:
startLatch.countDown();
}
});
this.streams.start();
startLatch.await(10, TimeUnit.SECONDS);
System.out.println("Stream started");
if (state.get() != KafkaStreams.State.RUNNING) {
throw new IllegalStateException();
}
Finally, we start the stream and, using the state listener and latch set up above, wait for it to reach the RUNNING state.
} catch (Exception e) {
if (this.streams != null) {
this.streams.close();
}
throw new RuntimeException(e);
}
}
The readLastVisitedPageByUsers() method uses the StateStore we set up earlier and returns all the entries it finds:
public Map<String, String> readLastVisitedPageByUsers() {
StoreQueryParameters<ReadOnlyKeyValueStore<String, PageVisit>> sqp = StoreQueryParameters.fromNameAndType("test-store", QueryableStoreTypes.keyValueStore());
final ReadOnlyKeyValueStore<String, PageVisit> store = this.streams.store(sqp);
Map<String, String> lastPageByUser = new HashMap<>();
KeyValueIterator<String, PageVisit> it = store.all();
it.forEachRemaining(keyValue -> lastPageByUser.put(keyValue.key, keyValue.value.getPage()));
return lastPageByUser;
}
@PreDestroy
public void close() {
this.streams.close();
}
}
If you run the application, following the instructions in the example README, you should see output like this:
Stream started
Last pages visited:
{frank=3.html, emma=2.html, linda=3.html}
As already mentioned, this will be the latest page visited for each user.
Reading data from Kafka in a WildFly application
WildFly does not ship with the Kafka Streams API, but we can still deploy the application above into WildFly with some adjustments in how we package it. The example README contains more details, but in a nutshell we:
- Include the Kafka Streams API jar in our deployment.
- Make sure we don’t include the Kafka Streams API jar’s transitive dependencies in our deployment, since they already exist in WildFly.
- Modify the deployment’s META-INF/MANIFEST.MF to set up a dependency on the org.apache.kafka.client JBoss Module (a sketch follows this list). This module contains the Kafka client jar, which is needed by the Kafka Streams API.
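The manifest entry uses WildFly’s standard Dependencies mechanism for declaring JBoss Module dependencies; the relevant part of META-INF/MANIFEST.MF looks like this:

Manifest-Version: 1.0
Dependencies: org.apache.kafka.client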
In our standalone application, we hardcoded the bootstrap servers and the topic name. When deploying to WildFly we would like to avoid recompiling the application if, say, Kafka moves somewhere else, so we specify this information in microprofile-config.properties:
kafka.bootstrap.servers=localhost:9092
kafka.topic=page-visits
We then create an implementation of the ConfigSupplier interface in MpConfigConfigSupplier. This is an ApplicationScoped CDI bean which gets injected with the MicroProfile Config instance containing the properties from the microprofile-config.properties file:
@ApplicationScoped
public class MpConfigConfigSupplier implements ConfigSupplier {
@Inject
Config config;
@Override
public String getBootstrapServers() {
return config.getValue("kafka.bootstrap.servers", String.class);
}
@Override
public String getTopicName() {
return config.getValue("kafka.topic", String.class);
}
}
Our DataStoreWrapper class from earlier is a CDI bean, so our MpConfigConfigSupplier will be injected into its configSupplier field, replacing the default implementation used in the standalone application case:
@ApplicationScoped
public class DataStoreWrapper implements Closeable {
private volatile KafkaStreams streams;
@Inject
private ConfigSupplier configSupplier = new ConfigSupplier() {
// -- SNIP --
// This implementation gets replaced by the injected MpConfigConfigSupplier
In order to be able to call this from a client, we add a simple REST endpoint:
@Path("/")
@Produces(MediaType.APPLICATION_JSON)
public class StreamsEndpoint {
@Inject
DataStoreWrapper wrapper;
@GET
@Path("/last-visited")
public Map<String, String> getLastVisited() {
return wrapper.readLastVisitedPageByUsers();
}
}
This simply delegates to our DataStoreWrapper.
If you deploy the application as outlined in the example README, and visit http://localhost:8080/streams/last-visited, you should see output like:
{"frank":"3.html","emma":"2.html","linda":"3.html"}
Conclusion
We have seen how to leverage the new Emitter in MicroProfile Reactive Messaging 2.0 to push data into MicroProfile Reactive Messaging streams, and how to send data to Kafka. We also used the new SmallRye Kafka API to set the Kafka record key on the data sent to Kafka.
Although we did not receive data from Kafka in this example, we leveraged the Kafka Streams API to read the data we stored in Kafka in a standalone application as well as in an application deployed to WildFly.
References
The WildFly documentation contains more information on the various configuration options for using MicroProfile Reactive Messaging with Kafka in WildFly.
Also, the SmallRye Reactive Messaging Kafka Connector documentation contains a fuller reference of configuration options for Kafka, as well as more information about MicroProfile Reactive Messaging in general.
Finally, the MicroProfile Reactive Messaging specification can be found in the eclipse/microprofile-reactive-messaging GitHub project.