Using RHOSAK from WildFly

Introduction

RHOSAK (full name: Red Hat OpenShift Streams for Apache Kafka) is a cloud service hosted by Red Hat which makes setting up, managing and scaling Apache Kafka instances very easy. Also, you get the peace of mind of knowing the instances are patched with the latest security fixes.

Apache Kafka is an open source, distributed streaming platform that enables (among other things) the development of real-time, event-driven applications.

WildFly integrates with Apache Kafka via the MicroProfile Reactive Messaging subsystem, which implements the MicroProfile Reactive Messaging specification.

In this blog we will see how to write a simple application which sends and receives messages to/from a Kafka instance.

We will then show how to run the application locally, using the configuration contained in the application.

After that, we will set up a RHOSAK instance, create a topic and deploy our application into OpenShift. An interesting point here is that we will save the server URL and the credentials needed to connect to it in an OpenShift secret. We then map the secret via the MicroProfile Config subsystem. The end result is that we override values hard-coded in the application (i.e. the ones we used for the standalone case) from an external source.

The source code for the example can be found at https://github.com/kabir/blog-mp-reactive-messaging-rhosak. It contains a README for the RHOSAK steps covered here.

Let’s get started!

The Application

The core part of the application is pretty straightforward: it is an @ApplicationScoped CDI bean called MessagingBean. The full source code can be found here. We will just outline the most important points below:

@Inject
@Channel("to-kafka")
private Emitter<String> emitter;

This injects a MicroProfile Reactive Messaging Emitter into the bean. The @Channel annotation comes from MicroProfile Reactive Messaging, and allows us to send messages to the MicroProfile Reactive Messaging stream named in the annotation (in this case the name is ‘to-kafka’). We send messages in the following method:

public Response send(String value) {
    System.out.println("Sending " + value);
    emitter.send(value);
    return Response.accepted().build();
}

This method is called from a class called UserResource which handles POST requests to add data.

Next we have a method using the @Incoming annotation, again from MicroProfile Reactive Messaging, which receives messages from the ‘from-kafka’ MicroProfile Reactive Messaging stream.

@Incoming("from-kafka")
public void receive(String value) {
    System.out.println("Received: " + value);
    synchronized (recentlyReceived) {
        // Drop the oldest entry first so that at most three are kept
        if (recentlyReceived.size() >= 3) {
            recentlyReceived.removeFirst();
        }
        recentlyReceived.add(value);
    }
}

It adds the messages to a list containing the three most recent entries. UserResource contains a method handling GET requests which returns the contents of this list.
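The bounded list logic can also be looked at in isolation. The following is a minimal sketch in plain Java, not code from the example repository: the class name RecentMessages and its methods are hypothetical, and it assumes that "most recent" simply means the last values added.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical helper, not part of the example repository.
final class RecentMessages {
    private final int capacity;
    private final Deque<String> entries = new ArrayDeque<>();

    RecentMessages(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be at least 1");
        }
        this.capacity = capacity;
    }

    // Evict the oldest entry before adding once the buffer is full.
    synchronized void add(String value) {
        if (entries.size() >= capacity) {
            entries.removeFirst();
        }
        entries.addLast(value);
    }

    // Return a copy so callers never see the buffer change underneath them.
    synchronized List<String> snapshot() {
        return new ArrayList<>(entries);
    }

    public static void main(String[] args) {
        RecentMessages recent = new RecentMessages(3);
        for (String value : List.of("one", "two", "three", "four")) {
            recent.add(value);
        }
        System.out.println(recent.snapshot()); // prints [two, three, four]
    }
}
```

Returning a copy from snapshot() means a GET handler can serialise the list without holding the lock while the @Incoming method keeps receiving.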

Then we have a MicroProfile Config properties file, microprofile-config.properties, which does the mapping to Kafka.

The contents of the file are as follows:

# This will be overwritten by the entries set up in the initialize-server.cli script
mp.messaging.connector.smallrye-kafka.bootstrap.servers=localhost:9092

# Configure the 'to-kafka' channel to write to. We write String entries to the Kafka topic 'testing'
mp.messaging.outgoing.to-kafka.connector=smallrye-kafka
mp.messaging.outgoing.to-kafka.topic=testing
mp.messaging.outgoing.to-kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer

# Configure the 'from-kafka' channel we receive messages from. We receive String entries from Kafka topic 'testing'
mp.messaging.incoming.from-kafka.connector=smallrye-kafka
mp.messaging.incoming.from-kafka.topic=testing
mp.messaging.incoming.from-kafka.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Configure Kafka group.id to prevent warn message - if not set, some default value is generated automatically.
mp.messaging.connector.smallrye-kafka.group.id="microprofile-reactive-messaging-kafka-group-id"

The formats of the property keys can be found in the WildFly documentation which also goes into more depth about what each entry means.

In short, we’re pointing to a Kafka instance running on localhost:9092, which is the default port Kafka will run on. We’re pointing the @Channel("to-kafka") annotated Emitter we saw earlier to Kafka’s testing topic, and pointing the @Incoming("from-kafka") annotated receive() method to the same testing topic.

Since both are using the same underlying Kafka topic, messages sent via the Emitter will be received in the receive() method. Finally, since we are sending Strings, we need to tell Kafka to use the String serializer/deserializer.
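To make the key format concrete, here is a small sketch that parses a few of the properties above with java.util.Properties and looks up the topic for each channel. It is only an illustration of the naming scheme mp.messaging.[incoming|outgoing].[channel].topic; the class ChannelTopics and its methods are hypothetical, and this is not how the Reactive Messaging implementation reads its configuration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper that only illustrates the property key format.
final class ChannelTopics {

    // Parse properties-file text into a Properties object.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // direction is "incoming" or "outgoing"; returns null if no topic is configured.
    static String topicFor(Properties config, String direction, String channel) {
        return config.getProperty("mp.messaging." + direction + "." + channel + ".topic");
    }

    public static void main(String[] args) {
        Properties config = parse(String.join("\n",
                "mp.messaging.outgoing.to-kafka.connector=smallrye-kafka",
                "mp.messaging.outgoing.to-kafka.topic=testing",
                "mp.messaging.incoming.from-kafka.connector=smallrye-kafka",
                "mp.messaging.incoming.from-kafka.topic=testing"));
        String out = topicFor(config, "outgoing", "to-kafka");
        String in = topicFor(config, "incoming", "from-kafka");
        // Both channels resolve to the same topic, so sent messages come back in.
        System.out.println(out + " -> " + in + ", same topic: " + out.equals(in));
    }
}
```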

Running the Application Locally

Since the intent of this article is to show integration with RHOSAK, we won’t go into too many details here, as it has been covered in this previous blog post.

The steps are:

  • Make sure WildFly is running, e.g. by one of the following two approaches

    • Download the latest WildFly zip. Note: it must be AT LEAST WildFly 27.0.0.Alpha4 since this project uses Jakarta EE dependencies, and prior to 27.0.0.Alpha4 WildFly was using the legacy Java EE dependencies. Enable the MicroProfile Reactive Messaging and Reactive Streams Operators extensions/subsystems by running the following operations in a CLI session:

      batch
      /extension=org.wildfly.extension.microprofile.reactive-messaging-smallrye:add
      /extension=org.wildfly.extension.microprofile.reactive-streams-operators-smallrye:add
      /subsystem=microprofile-reactive-streams-operators-smallrye:add
      /subsystem=microprofile-reactive-messaging-smallrye:add
      run-batch
      
      reload
  • Make sure you have a Kafka server running, for example by following steps 1 and 2 of the Kafka Quickstart.

  • In a clone of https://github.com/kabir/blog-mp-reactive-messaging-rhosak run mvn package wildfly:deploy to build and deploy our application

  • Finally post messages to the application, and read them again by running the following commands in a terminal

    $ curl -X POST http://localhost:8080/wildfly-microprofile-reactive-messaging-rhosak-1.0.0-SNAPSHOT/one
    $ curl -X POST http://localhost:8080/wildfly-microprofile-reactive-messaging-rhosak-1.0.0-SNAPSHOT/two
    $ curl http://localhost:8080/wildfly-microprofile-reactive-messaging-rhosak-1.0.0-SNAPSHOT
    [one, two]
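If you would rather drive the application from Java than from curl, the same requests can be sketched with the JDK's java.net.http client. The class MessagingClient is hypothetical; it assumes the values are simple path segments that need no URL encoding, and that the base URL is the local one used above unless another is passed as the first argument.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client mirroring the curl commands; not part of the example repository.
final class MessagingClient {

    // POST with an empty body to <baseUrl>/<value>, like `curl -X POST`.
    static HttpRequest postRequest(String baseUrl, String value) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/" + value))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    // GET on the base URL returns the recently received values.
    static HttpRequest listRequest(String baseUrl) {
        return HttpRequest.newBuilder(URI.create(baseUrl)).GET().build();
    }

    public static void main(String[] args) throws InterruptedException {
        String baseUrl = args.length > 0 ? args[0]
                : "http://localhost:8080/wildfly-microprofile-reactive-messaging-rhosak-1.0.0-SNAPSHOT";
        HttpClient client = HttpClient.newHttpClient();
        try {
            for (String value : new String[] {"one", "two"}) {
                HttpResponse<Void> posted =
                        client.send(postRequest(baseUrl, value), HttpResponse.BodyHandlers.discarding());
                System.out.println("POST " + value + " -> HTTP " + posted.statusCode());
            }
            HttpResponse<String> listed =
                    client.send(listRequest(baseUrl), HttpResponse.BodyHandlers.ofString());
            System.out.println(listed.body());
        } catch (IOException e) {
            System.err.println("Could not reach " + baseUrl + ": " + e);
        }
    }
}
```

Since the messages travel through Kafka before they reach the receive() method, a GET issued immediately after the POSTs may not show them yet.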

You may now stop WildFly and Kafka.

Running WildFly in OpenShift with Kafka provided by RHOSAK

Setting up a Kafka instance on RHOSAK and creating a secret with connection information

First you need to set up a Kafka instance on RHOSAK. Since the rhoas command line client is still under active development, the exact instructions for doing so might change. So rather than summarising everything you need to do here, see the prerequisites section of the example application repository for how to install the rhoas client.

Once you have the rhoas client installed, follow the instructions in the example application repository to perform the following steps:

  • Login to RHOSAK

  • Create a Kafka instance, and set it as the active instance

  • Create a Kafka topic

  • Create a service account used to authenticate with the Kafka instance, and grant it access to produce/consume messages on the Kafka instance

  • Create an OpenShift secret called rhoas containing

    • the address of the Kafka instance

    • the service account details

The secret will be called rhoas and contains the following entries:

  • KAFKA_HOST - the address and port of the Kafka instance running on RHOSAK

  • RHOAS_SERVICE_ACCOUNT_CLIENT_ID - the id of the service account used to authenticate with the Kafka instance

  • RHOAS_SERVICE_ACCOUNT_CLIENT_SECRET - the secret used to log in the client

  • RHOAS_SERVICE_ACCOUNT_OAUTH_TOKEN_URL - ignored in this example

Additional application configuration to run in OpenShift and connect to RHOSAK

Although we are not quite ready to deploy our application yet, it is worth knowing that we will be using Helm to deploy our application to OpenShift.

To deploy an application using Helm, you use Helm Charts. The Helm chart for our application can be found at helm.yml, and has the following contents:

build:
  uri: https://github.com/kabir/blog-mp-reactive-messaging-rhosak.git
  mode: bootable-jar
deploy:
  replicas: 1
  volumeMounts:
    - name: rhoas
      mountPath: /etc/config/rhoas
      readOnly: true
  volumes:
    - name: rhoas
      secret:
        secretName: rhoas

This tells Helm to build a bootable jar of WildFly, which is a single jar containing both the relevant parts of WildFly and our application.

Further, it says to only create one pod running WildFly, and mounts the rhoas secret we created earlier under the directory /etc/config/rhoas on the pod running the server. This directory will contain a file for each entry in our secret. The file name will be the name of the entry, and the contents of the file will be the value of the entry.
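The "one file per entry" layout is easy to picture with a few lines of plain Java. The sketch below is not the SmallRye implementation; SecretDirectory is a hypothetical class, the host name and client id are placeholders, and stripping surrounding whitespace from the file contents is a choice made for this sketch. It writes two entries into a temporary directory and reads them back as key/value pairs.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical reader for a mounted secret directory; values below are placeholders.
final class SecretDirectory {

    // File name becomes the key, file content becomes the value.
    static Map<String, String> read(Path dir) {
        Map<String, String> entries = new TreeMap<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(dir)) {
            for (Path file : files) {
                // Skip sub-directories; only regular files are treated as entries.
                if (Files.isRegularFile(file)) {
                    entries.put(file.getFileName().toString(), Files.readString(file).strip());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return entries;
    }

    // Simulate the mounted secret in a temporary directory and read it back.
    static Map<String, String> demo() {
        try {
            Path dir = Files.createTempDirectory("rhoas");
            Files.writeString(dir.resolve("KAFKA_HOST"), "kafka.example.com:443\n");
            Files.writeString(dir.resolve("RHOAS_SERVICE_ACCOUNT_CLIENT_ID"), "my-client-id");
            return read(dir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // prints {KAFKA_HOST=kafka.example.com:443, RHOAS_SERVICE_ACCOUNT_CLIENT_ID=my-client-id}
        System.out.println(demo());
    }
}
```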

When deploying an application into OpenShift using Helm, it will look for a Maven profile called openshift in the application’s POM. The relevant part of our pom.xml is:

<profile>
    <id>openshift</id>
    <build>
        <plugins>
            <plugin>
                <groupId>org.wildfly.plugins</groupId>
                <artifactId>wildfly-jar-maven-plugin</artifactId>
                <version>${version.wildfly-jar.maven.plugin}</version>
                <configuration>
                    <feature-pack-location>wildfly@maven(org.jboss.universe:community-universe)#${version.server.bootable-jar}</feature-pack-location>
                    <layers>
                        <layer>cloud-server</layer>
                        <layer>microprofile-reactive-messaging-kafka</layer>
                    </layers>
                    <plugin-options>
                        <jboss-fork-embedded>true</jboss-fork-embedded>
                    </plugin-options>
                    <cli-sessions>
                        <cli-session>
                            <!-- do not resolve expression as they reference env vars that -->
                            <!-- can be set at runtime -->
                            <resolve-expressions>false</resolve-expressions>
                            <script-files>
                                <script>src/main/scripts/initialize-server.cli</script>
                            </script-files>
                        </cli-session>
                    </cli-sessions>
                    <cloud/>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>package</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</profile>

The org.wildfly.plugins:wildfly-jar-maven-plugin plugin is used to create a bootable jar containing the application. We tell it to use the following Galleon layers when provisioning the server jar:

  • microprofile-reactive-messaging-kafka - this provides the MicroProfile Reactive Messaging functionality and the Kafka connector, as well as other dependencies needed by the Reactive Messaging implementation such as CDI. We briefly mentioned this layer in the Running the Application Locally section.

  • cloud-server - this is a trimmed down base server, whose main aim is to offer Jakarta RESTful Web Services functionality along with server dependencies needed to support those.

The plugin will also run the src/main/scripts/initialize-server.cli WildFly CLI script when configuring the server. Its contents are:

echo "Adding the 'rhoas' secret volume mount as a MicroProfile Config source..."

/subsystem=microprofile-config-smallrye/config-source=rhosak-binding:add(dir={path=/etc/config/rhoas})

echo "Adding the MicroProfile Config entries mapping the secret values..."

/subsystem=microprofile-config-smallrye/\
config-source=reactive-messaging-properties:add(properties={\
mp.messaging.connector.smallrye-kafka.bootstrap.servers=${KAFKA_HOST},\
mp.messaging.connector.smallrye-kafka.security.protocol=SASL_SSL,\
mp.messaging.connector.smallrye-kafka.sasl.mechanism=PLAIN,\
mp.messaging.connector.smallrye-kafka.sasl.jaas.config="\n\
org.apache.kafka.common.security.plain.PlainLoginModule required\n\
username=\"${RHOAS_SERVICE_ACCOUNT_CLIENT_ID}\"\n\
password=\"${RHOAS_SERVICE_ACCOUNT_CLIENT_SECRET}\";"\
}, ordinal=500)

First of all it is worth noting that we don’t need to enable the MicroProfile Reactive Messaging and Reactive Streams Operators extensions/subsystems in this case. This is unlike when we were using the downloaded WildFly zip archive earlier. This is because when a server is provisioned using Galleon, the microprofile-reactive-messaging-kafka layer takes care of that for us.

The first thing the CLI script does is mount the path /etc/config/rhoas (i.e. where our Helm chart told OpenShift to mount our rhoas secret) as a MicroProfile Config ConfigSource (in this case as a FileSystem ConfigSource supported by our underlying SmallRye implementation of MicroProfile Config).

After this config source is mounted, we can reference values from it in other places that can use MicroProfile Config values. This is what we are doing in the next block, where we tell WildFly’s MicroProfile Config subsystem to add the following properties:

  • mp.messaging.connector.smallrye-kafka.bootstrap.servers uses KAFKA_HOST from our rhoas secret. Adding this here overrides the value that we hardcoded in the microprofile-config.properties earlier.

  • mp.messaging.connector.smallrye-kafka.security.protocol and mp.messaging.connector.smallrye-kafka.sasl.mechanism are used to secure the connection and enable authentication via SASL since RHOSAK is secured. The Security chapter of the Kafka documentation explains these values in more detail.

  • mp.messaging.connector.smallrye-kafka.sasl.jaas.config sets up JAAS configuration to provide the RHOAS_SERVICE_ACCOUNT_CLIENT_ID and RHOAS_SERVICE_ACCOUNT_CLIENT_SECRET from our rhoas secret to authenticate with RHOSAK.
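The shape of the sasl.jaas.config value is easier to see when assembled in code. This is a sketch only: the class JaasConfig is hypothetical, the credentials are placeholders, and it assumes the values contain no double quotes or backslashes (it rejects them rather than guessing at escaping rules). The parts are separated by spaces here, where the CLI script uses line breaks.

```java
// Hypothetical helper that assembles a PLAIN login module entry.
final class JaasConfig {

    // Wrap a value in double quotes, refusing characters that would need escaping.
    static String quote(String value) {
        if (value.contains("\"") || value.contains("\\")) {
            throw new IllegalArgumentException("quotes and backslashes are not handled by this sketch");
        }
        return "\"" + value + "\"";
    }

    // Login module class, control flag, the two options, and a terminating semicolon.
    static String plainLogin(String clientId, String clientSecret) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required"
                + " username=" + quote(clientId)
                + " password=" + quote(clientSecret) + ";";
    }

    public static void main(String[] args) {
        // Placeholder credentials; in OpenShift these come from the rhoas secret.
        System.out.println(plainLogin("my-client-id", "my-client-secret"));
    }
}
```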

So in short the above configuration makes values from our secret available to WildFly, overrides the location of the Kafka server, and adds more MicroProfile Config properties to enable SSL and authentication.
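The override works because MicroProfile Config asks the config source with the highest ordinal first, and the default ordinal of microprofile-config.properties is 100 while our CLI script uses 500. The sketch below models that lookup, plus a very simplified ${NAME} expansion, in plain Java. It is not the SmallRye Config implementation: the class OrdinalLookup is hypothetical, the ordinal of 100 given to the rhosak-binding source is an assumption, the host name is a placeholder, and unresolved expressions are simply left in place.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified model of ordinal-based lookup; not the SmallRye Config implementation.
final class OrdinalLookup {

    static final class Source {
        final String name;
        final int ordinal;
        final Map<String, String> values;

        Source(String name, int ordinal, Map<String, String> values) {
            this.name = name;
            this.ordinal = ordinal;
            this.values = values;
        }
    }

    private static final Pattern EXPRESSION = Pattern.compile("\\$\\{([^}]+)\\}");

    // The source with the highest ordinal that defines the key wins.
    static String raw(List<Source> sources, String key) {
        return sources.stream()
                .filter(s -> s.values.containsKey(key))
                .max(Comparator.comparingInt((Source s) -> s.ordinal))
                .map(s -> s.values.get(key))
                .orElse(null);
    }

    // Look up the key, then replace each ${NAME} with the value found for NAME.
    static String resolve(List<Source> sources, String key) {
        String value = raw(sources, key);
        if (value == null) {
            return null;
        }
        Matcher matcher = EXPRESSION.matcher(value);
        StringBuilder out = new StringBuilder();
        while (matcher.find()) {
            String replacement = raw(sources, matcher.group(1));
            matcher.appendReplacement(out,
                    Matcher.quoteReplacement(replacement == null ? matcher.group() : replacement));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    static final String SERVERS = "mp.messaging.connector.smallrye-kafka.bootstrap.servers";

    static List<Source> demoSources() {
        return List.of(
                new Source("microprofile-config.properties", 100, Map.of(SERVERS, "localhost:9092")),
                new Source("rhosak-binding", 100, Map.of("KAFKA_HOST", "kafka.example.com:443")),
                new Source("reactive-messaging-properties", 500, Map.of(SERVERS, "${KAFKA_HOST}")));
    }

    public static void main(String[] args) {
        System.out.println(resolve(demoSources(), SERVERS)); // prints kafka.example.com:443
    }
}
```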

Deploying our application

Now that we have configured everything properly, it is time to test our application!

First you will need to install Helm, and use it to add the wildfly Helm repository as outlined in https://docs.wildfly.org/wildfly-charts/

Then from the root folder of your local copy of the example repository, run:

$ helm install rhosak-example -f ./helm.yml wildfly/wildfly

This will return quickly, but that does not mean the application is up and running yet. Check the application in the OpenShift console or using oc get deployment rhosak-example -w. Essentially, two pods are started: one for your application, and another which builds the bootable jar. Once the build pod is done and has published the resulting image, the pod running the application can start properly.

Accessing our application running on OpenShift

First we need the URL of our application on OpenShift:

$ oc get route
NAME             HOST/PORT                                                          PATH   SERVICES         PORT    TERMINATION     WILDCARD
rhosak-example   rhosak-example-kkhan1-dev.apps.sandbox.x8i5.p1.openshiftapps.com          rhosak-example   <all>   edge/Redirect   None

In my case the URL is rhosak-example-kkhan1-dev.apps.sandbox.x8i5.p1.openshiftapps.com. You should of course substitute that with the URL of your application in the following steps.

Next, let’s add some entries using curl:

$ curl -X POST https://rhosak-example-kkhan1-dev.apps.sandbox.x8i5.p1.openshiftapps.com/one
$ curl -X POST https://rhosak-example-kkhan1-dev.apps.sandbox.x8i5.p1.openshiftapps.com/two

These will be sent to Kafka, and received again by the application which will keep a list of the most recently received values. Note that the https:// is needed - if left out, the commands will appear to work, but no data will actually be posted.

To read this list of recently received values, we can run curl again:

$ curl https://rhosak-example-kkhan1-dev.apps.sandbox.x8i5.p1.openshiftapps.com
[one, two]

Conclusion

Compared to running locally, the RHOSAK steps look a lot more involved. However, we have achieved a lot!

If we break down what we have actually done, it looks simpler:

  • Use rhoas to set up Kafka, a topic, and a service account authorised to publish/consume messages

  • Create a secret called rhoas containing the location of the Kafka instance and credentials to access it

  • Configure our application to use it by:

    • Mounting the secret under /etc/config/rhoas in the Helm Chart

    • Using org.wildfly.plugins:wildfly-jar-maven-plugin to

      • provision a trimmed down server with the required functionality

      • run a CLI script when building the server to mount the /etc/config/rhoas folder as a MicroProfile Config ConfigSource and use values from that to override the location of the server, and add properties to turn on SSL, SASL authentication, and provide the credentials from our secret to authenticate

I hope this guide will be helpful to people wanting to try RHOSAK from WildFly for the first time.