In guaranteed asynchronous message delivery systems, message brokers are used to deliver messages from publishers to consumers. In some scenarios, a message published to a message broker queue should be routed to a specific consumer based on the message content (content-based routing).
The common solution is to let every consumer connected to the queue consume all the messages and filter them by content. But this adds processing overhead on the consumer side.
In this blog post, we discuss how to achieve content-based routing with ActiveMQ’s “Selectors” feature and avoid that processing overhead on the consumer side. We show how to configure a WSO2 JMS proxy to subscribe to ActiveMQ with selector/filtering criteria.
ActiveMQ message selectors are typically applied to message headers, but they can also be applied to the XML payload using XPath-based selectors. In this blog post, we only consider header-based selectors.
Scenario
Let’s say we have two message consumers for a queue: one consumer should process the messages with header groupID=group1 and the other consumer the messages with header groupID=group2.
Solution
WSO2 EI will act both as message producer and consumer. We will configure JMS Proxy consumers to consume the respective messages by adding the selector/filter in the proxy configuration.
We used
– WSO2 EI 6.6.0
– ActiveMQ 5.15.12
This also works on the Micro Integrator. The configuration is essentially the same, except that for the Micro Integrator it is done in deployment.toml rather than directly in the axis2.xml file, as is the case for the Enterprise Integrator.
Configure WSO2 EI with ActiveMQ to publish and consume messages
We will not describe the installation of WSO2 and ActiveMQ in full. You can follow the instructions provided in the WSO2 documentation to configure the setup or look at these blogs.
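For orientation, the connection factories referenced later in this post (myQueueSender in the API, myQueueListener in the proxies) are defined in the JMS transport section of axis2.xml on EI. The following is a minimal sketch, assuming ActiveMQ runs locally on its default port (tcp://localhost:61616). Adjust the provider URL to your broker and treat the WSO2 documentation as the authoritative reference.

```xml
<!-- JMS listener: used by the JMS proxy consumers (myQueueListener) -->
<transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
    <parameter name="myQueueListener" locked="false">
        <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
        <!-- Assumption: broker on localhost with the default port -->
        <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
        <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
    </parameter>
</transportReceiver>

<!-- JMS sender: used by the publishing API (myQueueSender) -->
<transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender">
    <parameter name="myQueueSender" locked="false">
        <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
        <!-- Assumption: same local broker as the listener -->
        <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
        <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
    </parameter>
</transportSender>
```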
Create JMS producer REST API in EI
This API accepts an XML message as input and publishes it to a queue. While publishing the message, it sets the groupID JMS header based on the groupID value from the payload. This groupID JMS header is used in the JMS consumer configuration to select or filter the relevant messages.
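As an illustration, a request body could look like the following. Only the groupID element matters to the API, since the XPath //groupID/text() used in the configuration picks it up anywhere in the document. The order and item elements are hypothetical and only stand in for the rest of a payload.

```xml
<!-- Hypothetical payload: only the groupID element is read by the API -->
<order>
    <groupID>group1</groupID>
    <item>example</item>
</order>
```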
In order to fit the design view, we have used a property group to bundle some properties. This is only for display purposes.
<?xml version="1.0" encoding="UTF-8"?>
<api context="/publish" name="jms-publisher" xmlns="http://ws.apache.org/ns/synapse">
<resource methods="POST" uri-template="/message">
<inSequence>
<log level="custom">
<property name="LOG" value="Message received. Publish to queue."/>
</log>
<log level="full"/>
<propertyGroup>
<property expression="//groupID/text()" name="groupID" scope="transport" type="STRING"/>
<property name="FORCE_SC_ACCEPTED" scope="axis2" type="STRING" value="true"/>
<property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
<property name="messageType" scope="axis2" type="STRING" value="application/xml"/>
<property name="QueueName" scope="default" type="STRING" value="queue1"/>
</propertyGroup>
<header expression="concat('jms:/',get-property('QueueName'),'?transport.jms.ConnectionFactory=myQueueSender')" name="To" scope="default"/>
<property action="remove" name="REST_URL_POSTFIX" scope="axis2"/>
<call>
<endpoint>
<default>
<timeout>
<duration>10000</duration>
<responseAction>fault</responseAction>
</timeout>
<suspendOnFailure>
<errorCodes>-1</errorCodes>
<initialDuration>0</initialDuration>
<progressionFactor>1.0</progressionFactor>
<maximumDuration>0</maximumDuration>
</suspendOnFailure>
<markForSuspension>
<errorCodes>-1</errorCodes>
<retriesBeforeSuspension>0</retriesBeforeSuspension>
</markForSuspension>
</default>
</endpoint>
</call>
</inSequence>
<outSequence/>
<faultSequence/>
</resource>
</api>
Create JMS Consumer 1 in EI
This proxy should consume only the messages with JMS header groupID=group1. To tell ActiveMQ to dispatch only the messages with the groupID=group1 header to this proxy, the proxy defines the following selector/filtering condition:
<parameter name="transport.jms.MessageSelector">groupID='group1'</parameter>
<?xml version="1.0" encoding="UTF-8"?>
<proxy name="jmsconsumer1" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse">
<target>
<inSequence>
<log level="full">
<property name="LOG" value="Consumer1 : Message consumed from queue."/>
</log>
<log level="custom">
<property name="LOG" value="Consumer1 : Message processed. Send ACK to queue."/>
</log>
</inSequence>
<outSequence/>
<faultSequence>
<log level="custom">
<property name="LOG" value="Consumer1 : Error occurred. Send NACK to queue."/>
</log>
<property name="SET_ROLLBACK_ONLY" scope="axis2" type="STRING" value="true"/>
</faultSequence>
</target>
<parameter name="transport.jms.SessionAcknowledgement">CLIENT_ACKNOWLEDGE</parameter>
<parameter name="transport.jms.DestinationType">queue</parameter>
<parameter name="transport.jms.MessageSelector">groupID='group1'</parameter>
<parameter name="transport.jms.Destination">queue1</parameter>
<parameter name="transport.jms.ContentType">
<rules xmlns="">
<jmsProperty>contentType</jmsProperty>
<default>application/xml</default>
</rules>
</parameter>
<parameter name="transport.jms.SessionTransacted">true</parameter>
<parameter name="transport.jms.ConnectionFactory">myQueueListener</parameter>
<parameter name="transport.jms.CacheLevel">consumer</parameter>
</proxy>
The proxy is quite simple, especially in the design view, which shows only the high-level information and not the parameters (you need to go to the source view for those). The image below describes both jmsconsumer proxies as they are quite similar.
Create JMS Consumer 2 in EI
This proxy should consume only the messages with JMS header groupID=group2. To tell ActiveMQ to dispatch only the messages with the groupID=group2 header to this proxy, the proxy defines the following selector/filtering condition:
<parameter name="transport.jms.MessageSelector">groupID='group2'</parameter>
<?xml version="1.0" encoding="UTF-8"?>
<proxy name="jmsconsumer2" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse">
<target>
<inSequence>
<log level="full">
<property name="LOG" value="Consumer2 : Message consumed from queue."/>
</log>
<log level="custom">
<property name="LOG" value="Consumer2 : Message processed. Send ACK to queue."/>
</log>
</inSequence>
<outSequence/>
<faultSequence>
<log level="custom">
<property name="LOG" value="Consumer2 : Error occurred. Send NACK to queue."/>
</log>
<property name="SET_ROLLBACK_ONLY" scope="axis2" type="STRING" value="true"/>
</faultSequence>
</target>
<parameter name="transport.jms.SessionAcknowledgement">CLIENT_ACKNOWLEDGE</parameter>
<parameter name="transport.jms.DestinationType">queue</parameter>
<parameter name="transport.jms.MessageSelector">groupID='group2'</parameter>
<parameter name="transport.jms.Destination">queue1</parameter>
<parameter name="transport.jms.ContentType">
<rules xmlns="">
<jmsProperty>contentType</jmsProperty>
<default>application/xml</default>
</rules>
</parameter>
<parameter name="transport.jms.SessionTransacted">true</parameter>
<parameter name="transport.jms.ConnectionFactory">myQueueListener</parameter>
<parameter name="transport.jms.CacheLevel">consumer</parameter>
</proxy>
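The selector value is a standard JMS message selector, whose syntax is based on a subset of SQL-92 conditional expressions, so a condition is not limited to a single equality check. The fragments below are a sketch of alternative values for the same proxy parameter. They are illustrative only and not part of the setup tested in this post; a proxy would use just one of them.

```xml
<!-- Match any of several groups -->
<parameter name="transport.jms.MessageSelector">groupID IN ('group1', 'group2')</parameter>

<!-- Match by prefix -->
<parameter name="transport.jms.MessageSelector">groupID LIKE 'group%'</parameter>

<!-- Combine the custom header with a standard JMS header; the greater-than sign is escaped for XML -->
<parameter name="transport.jms.MessageSelector">groupID = 'group1' AND JMSPriority &gt; 4</parameter>
```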
Test the setup
We will send requests with groupID values group1, group2 and group3. You can use curl to send the messages to the exposed API. For group1, the command would be:
curl -k --location --request POST 'https://localhost:8243/publish/message' --header 'Content-Type: application/xml' --data '<groupID>group1</groupID>'
On the console we see:
As we can see in the logs, the message with groupID=group1 is consumed by consumer 1 and the message with groupID=group2 is consumed by consumer 2, according to the selector/filter conditions defined.
The message with groupID=group3 is not consumed because it does not match the filter condition of either consumer. If a new consumer subscribes to the queue with selector groupID='group3', these messages will be consumed.
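A third consumer for the group3 messages could be sketched by copying one of the proxies above and changing only the name, the log text and the selector. The proxy name jmsconsumer3 is hypothetical; everything else mirrors jmsconsumer1 and jmsconsumer2.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Hypothetical third consumer: same as jmsconsumer1/2 except for name, logs and selector -->
<proxy name="jmsconsumer3" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <log level="full">
                <property name="LOG" value="Consumer3 : Message consumed from queue."/>
            </log>
        </inSequence>
        <outSequence/>
        <faultSequence>
            <log level="custom">
                <property name="LOG" value="Consumer3 : Error occurred. Send NACK to queue."/>
            </log>
            <property name="SET_ROLLBACK_ONLY" scope="axis2" type="STRING" value="true"/>
        </faultSequence>
    </target>
    <parameter name="transport.jms.SessionAcknowledgement">CLIENT_ACKNOWLEDGE</parameter>
    <parameter name="transport.jms.DestinationType">queue</parameter>
    <!-- Only messages whose groupID header equals group3 are dispatched to this proxy -->
    <parameter name="transport.jms.MessageSelector">groupID='group3'</parameter>
    <parameter name="transport.jms.Destination">queue1</parameter>
    <parameter name="transport.jms.ContentType">
        <rules xmlns="">
            <jmsProperty>contentType</jmsProperty>
            <default>application/xml</default>
        </rules>
    </parameter>
    <parameter name="transport.jms.SessionTransacted">true</parameter>
    <parameter name="transport.jms.ConnectionFactory">myQueueListener</parameter>
    <parameter name="transport.jms.CacheLevel">consumer</parameter>
</proxy>
```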