WSO2 Enterprise Integrator 9 minutes

Setting up RabbitMQ with WSO2 Enterprise Integrator

Rob Blaauboer
Rob Blaauboer
Integration Consultant & WSO2 Trainer
RabbitMQ WSO2 Enterprise Integrator
Scroll

For almost every enterprise the use of queues and message stores are an essential part of the IT landscape. The reason for this statement is that not all processes are synchronous. Sometimes messages need to be stored and there is an integration pattern called the dead letter channel. This describes the situation that the message needs to be delivered to an endpoint that is unavailable. 

RabbitMQ 1

These are all real-life scenarios. In other words, for every organization there will be situations where you’re not able to deliver messages, have a-synchronous processes and need to store messages. 

The Enterprise Integration Pattern (EIP) Dead Letter Channel

The Enterprise Integration Pattern Dead Letter Channel is a pattern that is not something you can solve by simply dropping the message, or by saying well; tough luck and try it again. It is something that you need to take care of. This is, in my opinion, done with a messageStore or queue. When a message cannot be delivered the queue will store it until the time that it can be delivered.

Putting messages on Queues

They can put a message on the queue and let another process be delivered at a later point. You can maximize the number of retries and you can also implement a guaranteed delivery 

So, what is a queue? Well, in its simplest form it is a piece of storage. That piece of storage can be a part of your internal memory. It can be a real message queue product, or even in a database. The idea is that messages on a queue can be picked up at a later stage by another process and hopefully they can be delivered.

The Enterprise integrator from WSO2 supports a number of message queues. So, there is activeMQ, WSO2 Message Broker, Apache qpid and RabbitMQ. But basically, also other systems that adhere to the AMQP protocol.

RabbitMQ

In this blog, we are going to look at an implementation of RabbitMQ in conjunction with the Enterprise Integrator. We will actually create a proxy that will put a message on a queue. And we will create another proxy that will take the message off the queue and send this forth. This is something that we call asynchronous, meaning that there is not a direct request response mechanism. The message gets stored and picked up at a later stage. 

So, what kind of use cases are there, other than the previously mentioned dead letter channel? Well, it could be that the answer is not directly available. Let’s say you submit a quote for remodeling of your house. Someone needs to look at it, do a calculation and send it to you. That is one example of a message processor message store.

Installing RabbitMQ

We are going to install it on Centos 7. A Windows installation is also possible but uses Chocolatey. 

Let’s first create a repo with both the rabbitmq-erlang data and the rabbitmq-server data. This command creates the /etc/yum.repos.d/rabbitmq.repo on a Centos7 environment.

cat <<EOF | sudo tee /etc/yum.repos.d/rabbitmq.repo 
[rabbitmq_erlang]
name=rabbitmq_erlang
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/\$basearch
gpgcheck=0
enabled=1

[rabbitmq_rabbitmq-server]
name=rabbitmq_rabbitmq-server
baseurl=https://packagecloud.io/rabbitmq/rabbitmq-server/el/7/\$basearch
gpgcheck=0
enabled=1
EOF

The rest of the procedure is quite easy and takes three steps. All are done with elevated (sudo) rights. 

1. sudo yum install -y rabbitmq-server
2. sudo systemctl enable --now rabbitmq-server
3. sudo rabbitmq-plugins enable rabbitmq_management

Step 1 installs RabbitMQ, step 2 enables RabbitMQ to be started at boot and step 3 enables the management console. Let us try it out by accessing http://localhost:15672/#/ and login in with guest/guest credentials.

RabbitMQ 2

It works, but a guest account as admin is strange. Let us add the admin user and disable the guest. In a terminal session enter these lines:

sudo rabbitmqctl add_user admin
sudo rabbitmqctl set_user_tags admin administrator
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
sudo rabbitmqctl delete_user guest

Check the admin console for the changes to be made. As you can see, you can also add a user via the console.

RabbitMQ 3

Setting up Enterprise Integrator

The Enterprise Integrator uses the axis2.xml file in [EI-HOME]/conf/axis2 to enable this specific transport for both Receiver and Sender. I’ve created a new Connection Factory (BlogFactory) next to the default factory. Please see that I’ve added a number of parameters to the setup for the Sender.

RabbitMQ 4
RabbitMQ 5

Download the https://repo1.maven.org/maven2/com/rabbitmq/amqp-client/5.9.0/amqp-client-5.9.0.jar and put it in the lib directory.

Save the axis2 file and start the Enterprise Integrator using the sh integrator.sh command. If we do not get any errors, we should be seeing something like this: 

[2020-06-02 17:07:47,247]  INFO {org.apache.axis2.transport.rabbitmq.RabbitMQConnectionFactory} - RabbitMQ ConnectionFactory : AMQPConnectionFactory initialized

[2020-06-02 17:07:47,249]  INFO {org.apache.axis2.transport.rabbitmq.RabbitMQListener} - RabbitMQ AMQP Transport Receiver initialized...

We can now start working with a queue. To keep it simple we are going to put a message on a queue and take it off, creating an asynchronous flow. We will do this against a database record that is sent as a soap message and in the end put in the database. We can query the database to see the record.

We create a simple database. In the Netherlands there is a music festival called Down The Rabbithole. So, let’s create a lineup table of bands and singers. We will use MySQL for this. Change the SQL statements to your rdbms if needed. 

create database rabbithole;
use rabbithole;

CREATE TABLE `bands` (
 `bandid` INT(8) NOT NULL AUTO_INCREMENT,
 `bandname` VARCHAR(50),
 `country` VARCHAR(20),
 PRIMARY KEY (`bandid`)
);

insert into bands (bandid, bandname, country) values (1001000, "Charli XCX", "United Kingdom");

To be able to use MySQL we need the MySQL JDBC driver. Copy this file (download from here) to the [EI-HOME]/lib directory and restart the EI. In this case, use the sh integrator.sh -DosgiConsole command to check the driver.

Wait until the EI is started and press ‘Enter’. Type in lb mysql:

osgi> lb mysql

START LEVEL 6

   ID|State      |Level|Name

  105|Active     |    4|mysql_connector_java_5.1.39_bin (1.0.0)

osgi>

This shows the driver is loaded.

Creating an API

We will now create an API that sends a message to the RabbitMQ endpoint. We will not completely show you how to create an API in Integration Studio but rather show the design and source code. 

RabbitMQ 6
<api xmlns="http://ws.apache.org/ns/synapse" name="RabbitAPI" context="/new">
   <resource methods="POST">
      <inSequence>
         <property name="OUT_ONLY" value="true" scope="default" type="STRING"/>
         <log level="full"/>
         <call>
            <endpoint>
               <address uri="rabbitmq:/?rabbitmq.connection.factory=BlogFactory&rabbitmq.queue.name=Rabbithole"/>
            </endpoint>
         </call>
         <respond/>
      </inSequence>
      <outSequence/>
      <faultSequence/>
   </resource>
</api>

I am showing the payload with the log mediator and set out only since I do not get a response from the call. In the end I am just responding to the client.

The development is done in Integration Studio, but I’ve chosen to copy it to the Management UI of the Enterprise Integrator. 

Simply go to the Management UI of the Enterprise Integrator and Click on ‘API’, ‘Add – API’.

RabbitMQ 7

Click on ‘Switch to source view’ and copy the code above directly into the Window. Press ‘Save’.

RabbitMQ 8
RabbitMQ 9

Save the API.

Adding a band

When you want to add a band, we can use this API to put it on the queue. We need to make sure that we have the right payload. Right, in this situation, means the values that we need to insert. In this case we need a JSON payload with two fields: bandname and country. How do I know? Well, when you look at the database you will see that bandid is a field with auto increment. In this case I will insert the two fields with a DB Report Mediator in the database. The design uses the names as they are used in the database.

Consume the message in Enterprise Integrator 

When you put a message on a queue, you also need to read it from the queue.

This is the proxy that reads the RabbitMQ Rabbithole Queue. Note that the transport is specifically rabbitmq and not JMS!

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="ReadRabbitMQ" startOnLoad="true" trace="enable" transports="rabbitmq" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
            <property expression="json-eval($.bandname)" name="bandname" scope="default" type="STRING"/>
            <property expression="json-eval($.country)" name="country" scope="default" type="STRING"/>
            <dbreport>
                <connection>
                    <pool>
                        <driver>com.mysql.jdbc.Driver</driver>
                        <url>jdbc:mysql://localhost:3306/rabbithole</url>
                        <user>root</user>
                        <password>root</password>
                    </pool>
                </connection>
                <statement>
                    <sql><![CDATA[INSERT INTO bands(bandname,country) VALUES(?,?)]]></sql>
                    <parameter expression="get-property('bandname')" type="CHAR"/>
                    <parameter expression="get-property('country')" type="CHAR"/>
                </statement>
            </dbreport>
            <log/>
            <drop/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </target>
    <parameter name="rabbitmq.exchange.name">exchange</parameter>
    <parameter name="transport.jms.ContentType">
        <rules>
            <jmsProperty>contentType</jmsProperty>
            <default>application/json</default>
        </rules>
    </parameter>
    <parameter name="rabbitmq.queue.name">Rabbithole</parameter>
    <parameter name="rabbitmq.connection.factory">AMQPConnectionFactory</parameter>
    <parameter name="rabbitmq.queue.route.key">route</parameter>
</proxy>

We insert a message using SoapUI where the Beatles from the UK will be a band that is added to the list.

RabbitMQ 10

There is no response because this is now an asynchronous service. But the record is stored! We can check the bands table. 

RabbitMQ 11

RabbitMQ

RabbitMQ is relatively easy to integrate with the Enterprise Integrator. There are some specific settings that you need to set, like the transport that needs to be set to RabbitMQ. This is related to the special JAR file that we added to the Enterprise Integrator. The example that we have given in this blog is quite simple, but RabbitMQ can surely be used in more complex scenarios.