We are going to use ActiveMQ as our broker for this blog. We’ll set up an example proxy that sends a message to a queue. A second proxy listens for messages on that queue and passes them on to be stored in a database. This combination shows how to interact with a JMS endpoint, how to do an OUT_ONLY invocation, and how to send a message from a queue to another service.
If you do not have ActiveMQ installed, install it first. The installation below is for a Linux environment and installs version 5.15.8, but you can use any recent version of ActiveMQ v5. You can also use Apache Artemis, but that requires changes to the jar files and the configuration, so I advise staying on ActiveMQ v5 when following the steps in this blog.
Change the installation locations below to match your system and preferences. I am installing it on my desktop on CentOS 7.
wget http://archive.apache.org/dist/activemq/5.15.8/apache-activemq-5.15.8-bin.tar.gz -P /opt/wso2/Downloads
tar -C /opt/wso2/Desktop -zxvf /opt/wso2/Downloads/apache-activemq-5.15.8-bin.tar.gz
mv /opt/wso2/Desktop/apache-activemq-5.15.8 /opt/wso2/Desktop/active
cd /opt/wso2/Desktop/active/bin
./activemq console
The last command above starts ActiveMQ in console mode, so the ActiveMQ log output appears directly in the terminal. Once ActiveMQ is started, open the web-based admin console at http://localhost:8161. The URL is also reported in the log output, in a line similar to ‘INFO message | ActiveMQ WebConsole available at http://0.0.0.0:8161/’.
Log in with the admin/admin credentials.
You should now see the console in the browser.
Now let’s prepare our WSO2 Enterprise Integrator installation to connect to this ActiveMQ broker. Copy the following client libraries from the <ACTIVEMQ_HOME>/lib directory to the [EI_HOME]/lib directory.
For ActiveMQ 5.8.0 and above, the following files need to be copied. Note that some of the jar files will have a higher version number; these are marked with (*) and depend on the ActiveMQ version you installed in the first step.
- (*) activemq-broker-5.8.0.jar
- (*) activemq-client-5.8.0.jar
- (*) activemq-kahadb-store-5.8.0.jar
- geronimo-jms_1.1_spec-1.1.1.jar
- (*) geronimo-j2ee-management_1.1_spec-1.0.1.jar
- geronimo-jta_1.0.1B_spec-1.0.1.jar
- (*) hawtbuf-1.9.jar
- (*) slf4j-api-1.6.6.jar
- activeio-core-3.1.4.jar (available in the <ACTIVEMQ_HOME>/lib/optional directory)
In axis2.xml (in [EI_HOME]/conf/axis2), uncomment the JMS transportReceiver and the JMS transportSender. axis2.xml contains several alternative JMS transportReceiver definitions, so make sure you uncomment the predefined ActiveMQ-based one.
And also activate the JMS transportSender.
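As a rough guide, the uncommented sections could look like the sketch below. It is trimmed to the queue-related connection factories, and the connection factory names and the broker URL tcp://localhost:61616 are assumptions based on a default ActiveMQ setup, so compare it with the definitions in your own axis2.xml rather than copying it as-is.

```xml
<!-- Sketch of the ActiveMQ-based JMS listener in axis2.xml (trimmed; values are assumptions) -->
<transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
    <parameter name="myQueueConnectionFactory" locked="false">
        <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
        <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
        <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
    </parameter>

    <!-- "default" is picked up by proxies that do not name a connection factory -->
    <parameter name="default" locked="false">
        <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
        <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
        <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
    </parameter>
</transportReceiver>

<!-- The sender needs no further parameters for this example -->
<transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender"/>
```

If your broker runs on another host or port, the java.naming.provider.url value is the place to change it.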
Restart the Enterprise Integrator for your changes to take effect.
Use Case
My use case is a proxy with a throttle mediator. This mediator allows five invocations per minute through the onAccept branch; all others are denied and end up in the onReject branch.
To keep the messages that are throttled out, I put them on a queue and read them with a second proxy at an interval of one minute. This way the messages that get through are stored in the database directly, and the others are added at a later stage.
The general idea is that five invocations per minute is a theoretical maximum for the back end. Because the second proxy reads only one message per minute from the queue, we can still deliver all messages without overloading the back end.
Creating a database
The records will be stored in a table aptly named ‘records’. The only value I will insert is the DATE column; ID and DTSTAMP are filled in automatically. This is the MySQL definition:
DROP DATABASE IF EXISTS active;
CREATE DATABASE active;
USE active;
CREATE TABLE IF NOT EXISTS records (
ID INTEGER NOT NULL AUTO_INCREMENT ,
DATE VARCHAR(40) NOT NULL,
DTSTAMP datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (ID)
);
The proxy
The proxy that receives the messages is quite simple. A message comes in and is handled in one of two ways: in the onAccept branch it goes straight to the database proxy (DBProxy); in the onReject branch a payloadFactory mediator adds a timestamp and the message is then stored on the queue.
<?xml version="1.0" encoding="UTF-8"?>
<proxy name="ACProxy" startOnLoad="true" transports="http https" xmlns="http://ws.apache.org/ns/synapse">
<target>
<inSequence>
<property name="OUT_ONLY" value="true"/>
<throttle id="test">
<policy>
<wsp:Policy wsu:id="WSO2MediatorThrottlingPolicy" xmlns:wsp="http://schemas.xmlsoap.org/ws/2004/09/policy" xmlns:wsu="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd">
<throttle:MediatorThrottleAssertion xmlns:throttle="http://www.wso2.org/products/wso2commons/throttle">
<throttle:MaximumConcurrentAccess>0</throttle:MaximumConcurrentAccess>
<wsp:Policy>
<throttle:ID throttle:type="IP">other</throttle:ID>
<wsp:Policy>
<throttle:Control>
<wsp:Policy>
<throttle:MaximumCount>5</throttle:MaximumCount>
<throttle:UnitTime>60000</throttle:UnitTime>
<throttle:ProhibitTimePeriod>0</throttle:ProhibitTimePeriod>
</wsp:Policy>
</throttle:Control>
</wsp:Policy>
</wsp:Policy>
</throttle:MediatorThrottleAssertion>
</wsp:Policy>
</policy>
<onReject>
<payloadFactory media-type="xml">
<format>
<DATE xmlns="">$1</DATE>
</format>
<args>
<arg evaluator="xml" expression="get-property(&quot;SYSTEM_DATE&quot;, &quot;yyyy-MM-dd'T'HH:mm:ss.SSSXXX&quot;)"/>
</args>
</payloadFactory>
<call>
<endpoint>
<address uri="jms:/mymessages?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;java.naming.provider.url=tcp://localhost:61616&amp;transport.jms.DestinationType=queue">
<suspendOnFailure>
<initialDuration>-1</initialDuration>
<progressionFactor>-1</progressionFactor>
<maximumDuration>0</maximumDuration>
</suspendOnFailure>
<markForSuspension>
<retriesBeforeSuspension>0</retriesBeforeSuspension>
</markForSuspension>
</address>
</endpoint>
</call>
</onReject>
<onAccept>
<call>
<endpoint>
<address uri="http://localhost:8280/services/DBProxy">
<suspendOnFailure>
<initialDuration>-1</initialDuration>
<progressionFactor>-1</progressionFactor>
<maximumDuration>0</maximumDuration>
</suspendOnFailure>
<markForSuspension>
<retriesBeforeSuspension>0</retriesBeforeSuspension>
</markForSuspension>
</address>
</endpoint>
</call>
</onAccept>
</throttle>
<respond/>
</inSequence>
</target>
</proxy>
The source shows the parameters for the throttle mediator and the other mediators, such as the OUT_ONLY property. In the endpoint we specify an elaborate JMS endpoint URI. The interesting part is the queue name at the beginning (mymessages). That queue name must be the same as the one used in the readQueue proxy; with a mismatch, messages are not delivered to their intended destination.
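To make the parts of that URI easier to read, the sketch below repeats the same JMS address as a separate named endpoint with a comment per parameter. The endpoint name MyMessagesQueue is hypothetical and not part of the original setup; the URI itself is the one used in ACProxy, with the ampersands escaped as XML requires.

```xml
<!-- Hypothetical named endpoint; the name MyMessagesQueue is an assumption -->
<endpoint name="MyMessagesQueue" xmlns="http://ws.apache.org/ns/synapse">
    <!--
      jms:/mymessages                          the queue name (must match readQueue)
      transport.jms.ConnectionFactoryJNDIName  JNDI name of the connection factory
      java.naming.factory.initial              ActiveMQ JNDI context factory class
      java.naming.provider.url                 address of the ActiveMQ broker
      transport.jms.DestinationType            queue (as opposed to topic)
    -->
    <address uri="jms:/mymessages?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;java.naming.provider.url=tcp://localhost:61616&amp;transport.jms.DestinationType=queue"/>
</endpoint>
```

With such an endpoint deployed, the inline endpoint in the onReject branch could be replaced by a reference like <endpoint key="MyMessagesQueue"/>, which keeps the queue name in one place on the sending side.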
The DBProxy stores the record in the database.
The proxy checks whether the message already contains a DATE element, in which case it came from the queue. If not, the date is retrieved from the SYSTEM_DATE property. Either way the value ends up in a DATE property, which is then inserted into the records table.
<?xml version="1.0" encoding="UTF-8"?>
<proxy name="DBProxy" startOnLoad="true" transports="https http" xmlns="http://ws.apache.org/ns/synapse">
<target>
<inSequence>
<filter xpath="$body//DATE!=''">
<then>
<property expression="$body//DATE" name="DATE" />
</then>
<else>
<property expression="get-property(&quot;SYSTEM_DATE&quot;, &quot;yyyy-MM-dd'T'HH:mm:ss.SSSXXX&quot;)" name="DATE" />
</else>
</filter>
<log level="custom">
<property expression="get-property('DATE')" name="Value"/>
</log>
<dbreport>
<connection>
<pool>
<driver>com.mysql.jdbc.Driver</driver>
<url>jdbc:mysql://localhost:3306/active</url>
<user>root</user>
<password>root</password>
</pool>
</connection>
<statement>
<sql><![CDATA[insert into records (DATE) values (?)]]></sql>
<parameter expression="get-property('DATE')" type="CHAR"/>
</statement>
</dbreport>
<log level="custom">
<property expression="get-property('DATE')" name="Record Stored"/>
</log>
<property name="OUT_ONLY" value="true"/>
<payloadFactory media-type="xml">
<format>
<message xmlns="">stored</message>
</format>
<args/>
</payloadFactory>
<respond/>
</inSequence>
</target>
</proxy>
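To make the two branches of the filter concrete, a message that reaches DBProxy via the queue could look roughly like the sketch below. The SOAP envelope wrapping is an assumption about how the message arrives; the DATE element is the one added in the onReject branch of ACProxy, and the value is taken from the results shown later in this blog.

```xml
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
    <soapenv:Body>
        <!-- Added by the payloadFactory in the onReject branch of ACProxy -->
        <DATE>2021-06-10T13:36:47.041+02:00</DATE>
    </soapenv:Body>
</soapenv:Envelope>
```

A message that comes directly from the onAccept branch has no DATE element in its body, so the filter takes the else branch and the current SYSTEM_DATE is used instead.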
ReadQueue
To read from the queue, I have created a new proxy that reads messages from the queue at intervals of one minute. The proxy listens on the JMS transport, and its parameters (only visible in the Source view) define the queue and the speed of reading.
<?xml version="1.0" encoding="UTF-8"?>
<proxy name="readQueue" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse">
<target>
<inSequence>
<property name="OUT_ONLY" value="true"/>
<log level="full"/>
<call>
<endpoint>
<address uri="http://localhost:8280/services/DBProxy">
<suspendOnFailure>
<initialDuration>-1</initialDuration>
<progressionFactor>-1</progressionFactor>
<maximumDuration>0</maximumDuration>
</suspendOnFailure>
<markForSuspension>
<retriesBeforeSuspension>0</retriesBeforeSuspension>
</markForSuspension>
</address>
</endpoint>
</call>
<drop/>
</inSequence>
<outSequence/>
<faultSequence/>
</target>
<parameter name="transport.jms.DestinationType">queue</parameter>
<parameter name="transport.jms.Destination">mymessages</parameter>
<parameter name="transport.jms.ContentType">
<rules>
<jmsProperty>contentType</jmsProperty>
<default>text/xml</default>
</rules>
</parameter>
<parameter name="jms.proxy.throttle.enabled">true</parameter>
<parameter name="jms.proxy.throttle.limitPerMinute">1</parameter>
<parameter name="jms.proxy.throttle.mode">fixed-interval</parameter>
</proxy>
The readQueue proxy sends each message it reads to the DBProxy endpoint.
Trying it
This example uses an empty SOAP message to trigger a record being added to the database, either directly or via the queue. The Enterprise Integrator Try-It functionality can be used to trigger this setup: simply press Send 10 times within a minute.
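If you prefer a different SOAP client than Try-It, an empty SOAP 1.1 message could look like the sketch below. The target address http://localhost:8280/services/ACProxy is an assumption, derived from the DBProxy address used earlier in this blog.

```xml
<!-- Empty SOAP 1.1 message; post it to the ACProxy service (address is an assumption) -->
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
    <soapenv:Header/>
    <soapenv:Body/>
</soapenv:Envelope>
```

The body is deliberately empty: the timestamp is added by the proxies, not by the client.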
If all works as expected, the result is visible in the ‘records’ table. I’ve submitted 10 records: five of them were added directly and the other five were throttled out. The DTSTAMP column shows when the record was added to the database, and the DATE column shows the timestamp that was set when the message was received (in the ACProxy for queued messages, in the DBProxy for direct ones). In the output below, rows 1 to 5 were stored immediately, while the queued messages shown (rows 6 to 9) were stored at intervals of one minute.
MariaDB [active]> select * from records;
+----+-------------------------------+---------------------+
| ID | DATE                          | DTSTAMP             |
+----+-------------------------------+---------------------+
|  1 | 2021-06-10T13:36:42.448+02:00 | 2021-06-10 13:36:42 |
|  2 | 2021-06-10T13:36:43.421+02:00 | 2021-06-10 13:36:43 |
|  3 | 2021-06-10T13:36:44.349+02:00 | 2021-06-10 13:36:44 |
|  4 | 2021-06-10T13:36:45.269+02:00 | 2021-06-10 13:36:45 |
|  5 | 2021-06-10T13:36:46.203+02:00 | 2021-06-10 13:36:46 |
|  6 | 2021-06-10T13:36:47.041+02:00 | 2021-06-10 13:37:00 |
|  7 | 2021-06-10T13:36:50.366+02:00 | 2021-06-10 13:38:00 |
|  8 | 2021-06-10T13:36:51.536+02:00 | 2021-06-10 13:39:00 |
|  9 | 2021-06-10T13:36:52.503+02:00 | 2021-06-10 13:40:00 |
+----+-------------------------------+---------------------+
9 rows in set (0.00 sec)
Conclusion
This blog is a simple exercise to store messages in a queue for later processing. The throttle mediator created a binary situation: it goes through… or not. This is just one example of how a JMS message queue can be used.