ActiveMQ Message Group Feature with WSO2 Micro Integrator

Jenanathan Yogendran
Integration Consultant

For guaranteed message order, load balancing and failover

A guaranteed delivery messaging system ensures the delivery of the message from sender to receiver even when receiver is offline. WSO2 Micro Integrator can be used to build a guaranteed delivery messaging system along with a message broker.

This functionality is also available in WSO2 Enterprise Integrator, but we refer to Micro Integrator (MI) throughout because it is the more recent product.

WSO2 MI provides out-of-the-box support for integrating with message brokers from different vendors. One of the most popular open-source message brokers is ActiveMQ. In addition to normal message brokering functionality, ActiveMQ provides more advanced features for implementing different use cases in asynchronous message delivery.


In one of our previous blog posts, we discussed the “Exclusive Consumer” feature of ActiveMQ, which preserves message order in guaranteed delivery. In this blog post, we discuss the “Message Groups” feature provided by ActiveMQ.

Message Groups

The Message Groups feature is an enhancement of the “Exclusive Consumer” feature. The JMSXGroupID header is used to group messages, and grouping lets us implement use cases such as guaranteed ordered delivery, load balancing between multiple consumers, and high availability/failover of consumers.

ActiveMQ uses the JMSXGroupID header value to determine which consumer a message should be dispatched to. It associates consumers with message groups based on this header value. If no consumer is associated with a message group yet, one is chosen from the available consumers. Until the chosen consumer goes offline, all messages with the same message group (JMSXGroupID) are delivered to that consumer.

If the chosen consumer goes offline, ActiveMQ picks another consumer that is not associated with any message group. If there is no such consumer, it chooses from the available consumers, which may already be associated with a message group.
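The assignment rules described above can be sketched as a small toy model. This is not ActiveMQ code: the class name `ToyBroker`, its methods, and the choice of "first matching consumer" are assumptions made purely for illustration.

```python
class ToyBroker:
    """Toy model of the group-to-consumer rules described in this post."""

    def __init__(self, consumers):
        self.online = list(consumers)  # consumers currently connected
        self.owner = {}                # JMSXGroupID value -> consumer

    def disconnect(self, consumer):
        self.online.remove(consumer)
        # Groups owned by the departed consumer become unassigned.
        self.owner = {g: c for g, c in self.owner.items() if c != consumer}

    def dispatch(self, group_id):
        if group_id not in self.owner:
            busy = set(self.owner.values())
            # Prefer a consumer that owns no group yet, else any online one.
            free = [c for c in self.online if c not in busy]
            self.owner[group_id] = (free or self.online)[0]
        return self.owner[group_id]


broker = ToyBroker(["consumer1", "consumer2"])
first = [broker.dispatch(g) for g in ["group1", "group2", "group1", "group2"]]
broker.disconnect("consumer2")
after_failover = broker.dispatch("group2")
print(first, after_failover)
```

In this model each group sticks to its first consumer, and group2 only moves to consumer1 once consumer2 has disconnected. The sketch assumes at least one consumer is online when a message is dispatched.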

In this blog post, we evaluate this feature using WSO2 MI as both message producer and consumer. We use a REST API that acts as a JMS producer and two JMS proxies as JMS consumers. You can, of course, also use a proxy as the JMS producer. Proxies are capable of listening on a JMS queue (in addition to other transports).

The REST API accepts an XML payload and, based on a payload element, populates the JMSXGroupID header and publishes the payload to the queue. Both JMS consumers consume messages from the same queue. We evaluate how ActiveMQ dispatches the messages between the consumers based on the JMSXGroupID header value.

We used
– WSO2 MI 4.1.0
– ActiveMQ 5.15.12

Keep in mind that this will work on the Enterprise Integrator as well!

Configure WSO2 EI with ActiveMQ to publish and consume messages

We will not describe the installation of WSO2 and ActiveMQ in full. See other Yenlo blogs on this topic, or follow the instructions in the WSO2 documentation to configure the setup.

Create JMS producer REST API in EI

This API accepts an XML message and publishes it to the queue “queue1” along with the JMS header JMSXGroupID. The value of this header is extracted from the incoming API request payload:

<property expression="//groupID/text()" name="JMSXGroupID" scope="transport" type="STRING"/>

In Source view, the API looks like this. Note that we use a Property Group mediator so that the Design view fits on the page of this blog.

<?xml version="1.0" encoding="UTF-8"?>
<api context="/publish" name="jms-publisher" xmlns="http://ws.apache.org/ns/synapse">
    <resource methods="POST" uri-template="/message">
        <inSequence>
            <log level="custom">
                <property name="LOG" value="Message received. Publish to queue."/>
            </log>
            <propertyGroup>
                <property expression="//groupID/text()" name="JMSXGroupID" scope="transport" type="STRING"/>
                <property name="FORCE_SC_ACCEPTED" scope="axis2" type="STRING" value="true"/>
                <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
                <property name="messageType" scope="axis2" type="STRING" value="application/xml"/>
                <property name="QueueName" scope="default" type="STRING" value="queue1"/>
            </propertyGroup>
            <header expression="concat('jms:/',get-property('QueueName'),'?transport.jms.ConnectionFactory=myQueueSender')" name="To" scope="default"/>
            <property action="remove" name="REST_URL_POSTFIX" scope="axis2"/>
            <call>
                <endpoint>
                    <default>
                        <timeout>
                            <duration>10000</duration>
                            <responseAction>fault</responseAction>
                        </timeout>
                        <suspendOnFailure>
                            <errorCodes>-1</errorCodes>
                            <initialDuration>0</initialDuration>
                            <progressionFactor>1.0</progressionFactor>
                            <maximumDuration>0</maximumDuration>
                        </suspendOnFailure>
                        <markForSuspension>
                            <errorCodes>-1</errorCodes>
                            <retriesBeforeSuspension>0</retriesBeforeSuspension>
                        </markForSuspension>
                    </default>
                </endpoint>
            </call>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </resource>
</api>
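To make the header-population step in the API above concrete, here is a minimal Python sketch of the same idea: read the groupID element from the payload and turn it into a JMSXGroupID header. The function name `jms_headers_for` and the `<order>` wrapper payload are hypothetical, and returning no header for a missing element is a choice of this sketch, not a statement about how Synapse treats an empty XPath result.

```python
import xml.etree.ElementTree as ET


def jms_headers_for(payload: str) -> dict:
    """Derive the JMSXGroupID header from an XML request payload."""
    root = ET.fromstring(payload)
    # iter() also visits the root element, so a bare <groupID> payload
    # matches, much like the XPath //groupID used in the API.
    node = next(root.iter("groupID"), None)
    if node is None or not node.text:
        return {}  # sketch-only choice: no element, no header
    return {"JMSXGroupID": node.text}


print(jms_headers_for("<groupID>group1</groupID>"))
print(jms_headers_for("<order><groupID>group2</groupID></order>"))
print(jms_headers_for("<order/>"))
```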

Create JMS Consumer 1 in EI

This proxy acts as a JMS consumer and is subscribed to the queue “queue1”. Once a message is consumed, it logs the payload and the JMSXGroupID header value.

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="jmsconsumer1" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <log level="full">
                <property name="LOG" value="Consumer1 : Message consumed from queue."/>
                <property expression="$trp:JMSXGroupID" name="groupID"/>
            </log>
            <log level="custom">
                <property name="LOG" value="Consumer1 : Message processed. Send ACK to queue."/>
            </log>
        </inSequence>
        <outSequence/>
        <faultSequence>
            <log level="full">
                <property name="LOG" value="Consumer1 : Message processed. Send NACK to queue."/>
            </log>
        </faultSequence>
    </target>
    <parameter name="transport.jms.SessionAcknowledgement">CLIENT_ACKNOWLEDGE</parameter>
    <parameter name="transport.jms.DestinationType">queue</parameter>
    <parameter name="transport.jms.Destination">queue1</parameter>
    <parameter name="transport.jms.ContentType">
        <rules xmlns="">
            <jmsProperty>contentType</jmsProperty>
            <default>application/xml</default>
        </rules>
    </parameter>
    <parameter name="transport.jms.SessionTransacted">true</parameter>
    <parameter name="transport.jms.ConnectionFactory">myQueueListener</parameter>
    <parameter name="transport.jms.CacheLevel">consumer</parameter>
</proxy>

Create JMS Consumer 2 in EI

This proxy also acts as a JMS consumer and is subscribed to the queue “queue1”. Once a message is consumed, it logs the payload and the JMSXGroupID header value.

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="jmsconsumer2" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <log level="full">
                <property name="LOG" value="Consumer2 : Message consumed from queue."/>
                <property expression="$trp:JMSXGroupID" name="groupID"/>
            </log>
            <log level="custom">
                <property name="LOG" value="Consumer2 : Message processed. Send ACK to queue."/>
            </log>
        </inSequence>
        <outSequence/>
        <faultSequence>
            <drop/>
        </faultSequence>
    </target>
    <parameter name="transport.jms.SessionAcknowledgement">CLIENT_ACKNOWLEDGE</parameter>
    <parameter name="transport.jms.DestinationType">queue</parameter>
    <parameter name="transport.jms.Destination">queue1</parameter>
    <parameter name="transport.jms.ContentType">
        <rules xmlns="">
            <jmsProperty>contentType</jmsProperty>
            <default>application/xml</default>
        </rules>
    </parameter>
    <parameter name="transport.jms.SessionTransacted">true</parameter>
    <parameter name="transport.jms.ConnectionFactory">myQueueListener</parameter>
    <parameter name="transport.jms.CacheLevel">consumer</parameter>
</proxy>

Testing the setup

We send two messages with groupID equal to group1 and another two messages with groupID equal to group2.

You can send the requests with a tool such as SoapUI or with curl:

curl -k --location --request POST 'https://localhost:8253/publish/message' --header 'Content-Type: application/xml' --data '<groupID>group1</groupID>'

curl -k --location --request POST 'https://localhost:8253/publish/message' --header 'Content-Type: application/xml' --data '<groupID>group2</groupID>'

In order to make the logs more readable, I’ve changed the text color.

[2022-11-25 09:49:40,947]  INFO {LogMediator} – {api:jms-publisher} LOG = Message received. Publish to queue.
[2022-11-25 09:49:41,041]  INFO {LogMediator} – {proxy:jmsconsumer1} To: , WSAction: urn:mediate, SOAPAction: urn:mediate, MessageID: ID:ip-172-31-39-100.eu-central-1.compute.internal-32983-1669364529898-9:3:1:1:1, Direction: request, LOG = Consumer1 : Message consumed from queue., groupID = group1, Envelope: <?xml version='1.0' encoding='utf-8'?><soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"><soapenv:Body><groupID>group1</groupID></soapenv:Body></soapenv:Envelope>
[2022-11-25 09:49:41,041]  INFO {LogMediator} – {proxy:jmsconsumer1} LOG = Consumer1 : Message processed. Send ACK to queue.
[2022-11-25 09:50:49,094]  INFO {LogMediator} – {api:jms-publisher} LOG = Message received. Publish to queue.
[2022-11-25 09:50:49,238]  INFO {LogMediator} – {proxy:jmsconsumer2} To: , WSAction: urn:mediate, SOAPAction: urn:mediate, MessageID: ID:ip-172-31-39-100.eu-central-1.compute.internal-32983-1669364529898-9:4:1:1:1, Direction: request, LOG = Consumer2 : Message consumed from queue., groupID = group2, Envelope: <?xml version='1.0' encoding='utf-8'?><soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"><soapenv:Body><groupID>group2</groupID></soapenv:Body></soapenv:Envelope>
[2022-11-25 09:50:49,238]  INFO {LogMediator} – {proxy:jmsconsumer2} LOG = Consumer2 : Message processed. Send ACK to queue.

As we can see in the consumer logs above, ActiveMQ has chosen consumer 1 for messages with groupID = group1 and consumer 2 for messages with groupID = group2. Subsequent messages with the same groupID are dispatched to the associated consumer.

Deactivate Consumer 2

Now let's see how messages are dispatched when one consumer is offline. In our case, we deactivate consumer 2 while consumer 1 stays active. You can deactivate a JMS proxy via the Micro Integrator Dashboard or the Management API (or by undeploying the proxy).

[2022-11-25 10:12:25,406]  INFO {ServiceTaskManager} – JMS Polling server task stopped for service jmsconsumer2 MessageListenerTask{workerState=3, idleExecutionCount=51, idle=true, connected=true, listenerPaused=false, connectionReceivedError=false}
[2022-11-25 10:12:26,083]  INFO {ServiceTaskManager} – Task manager for service : jmsconsumer2 shutdown
[2022-11-25 10:12:26,084]  INFO {JMSListener} – Stopped listening for JMS messages to service : jmsconsumer2
[2022-11-25 10:12:26,084]  INFO {ProxyService} – {proxy:jmsconsumer2} Stopped the proxy service : jmsconsumer2

After deactivating consumer 2, send a message with groupID=group2. This message is now dispatched to consumer 1. Since no online consumer is associated with this groupID any more, ActiveMQ picks another available consumer. In our case, that is consumer 1.

[2022-11-25 10:15:09,254] INFO {LogMediator} – {api:jms-publisher} LOG = Message received. Publish to queue.
[2022-11-25 10:15:09,478] INFO {LogMediator} – {proxy:jmsconsumer1} To: , WSAction: urn:mediate, SOAPAction: urn:mediate, MessageID: ID:ip-172-31-39-100.eu-central-1.compute.internal-36290-1669366889587-9:5:1:1:1, Direction: request, LOG = Consumer1 : Message consumed from queue., groupID = group2, Envelope: group2
[2022-11-25 10:15:09,478] INFO {LogMediator} – {proxy:jmsconsumer1} LOG = Consumer1 : Message processed. Send ACK to queue.

Now let’s see what happens when consumer 2 comes back online. Activate consumer 2 via the management console and send one message with groupID=group1 and one with groupID=group2.

[2022-11-25 10:16:51,513] INFO {JMSListener} – Connection attempt: 1 for JMS Provider for service: jmsconsumer2 was successful!
[2022-11-25 10:16:51,517] INFO {ServiceTaskManager} – Task manager for service : jmsconsumer2 [re-]initialized
[2022-11-25 10:16:51,527] INFO {ServiceTaskManager} – JMS Polling task activated with state: 1 for service jmsconsumer2
[2022-11-25 10:16:52,519] INFO {JMSListener} – Started to listen on destination : queue1 of type queue for service jmsconsumer2
[2022-11-25 10:16:52,520] INFO {ProxyService} – {proxy:jmsconsumer2} Started the proxy service : jmsconsumer2
[2022-11-25 10:17:02,386] INFO {LogMediator} – {api:jms-publisher} LOG = Message received. Publish to queue.
[2022-11-25 10:17:02,742] INFO {LogMediator} – {proxy:jmsconsumer1} To: , WSAction: urn:mediate, SOAPAction: urn:mediate, MessageID: ID:ip-172-31-39-100.eu-central-1.compute.internal-36290-1669366889587-9:6:1:1:1, Direction: request, LOG = Consumer1 : Message consumed from queue., groupID = group2, Envelope: group2
[2022-11-25 10:17:02,743] INFO {LogMediator} – {proxy:jmsconsumer1} LOG = Consumer1 : Message processed. Send ACK to queue.

As we see in the log, both messages are dispatched to consumer 1, because ActiveMQ associated both message groups with consumer 1 in the previous step.

Making a change

Now let's see how we can tell ActiveMQ to choose a different consumer for a message group that is already assigned to a consumer. In our case, both message groups are now assigned to consumer 1, and we want to route the group2 messages to consumer 2 again. This can be done by sending a message with a JMSXGroupSeq header that has a negative value.

Let’s update our JMS publisher to set JMSXGroupSeq to -1 if the incoming request has a closeGroup transport header with the value true.

As source code:

<?xml version="1.0" encoding="UTF-8"?>
<api context="/publish" name="jms-publisher" xmlns="http://ws.apache.org/ns/synapse">
    <resource methods="POST" uri-template="/message">
        <inSequence>
            <log level="custom">
                <property name="LOG" value="Message received. Publish to queue."/>
            </log>
            <property expression="//groupID/text()" name="JMSXGroupID" scope="transport" type="STRING"/>
            <filter regex="^true$" source="$trp:closeGroup">
                <then>
                    <property name="JMSXGroupSeq" scope="transport" type="STRING" value="-1"/>
                </then>
                <else/>
            </filter>
            <property name="FORCE_SC_ACCEPTED" scope="axis2" type="STRING" value="true"/>
            <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
            <property name="messageType" scope="axis2" type="STRING" value="application/xml"/>
            <property name="QueueName" scope="default" type="STRING" value="queue1"/>
            <header expression="concat('jms:/',get-property('QueueName'),'?transport.jms.ConnectionFactory=myQueueSender')" name="To" scope="default"/>
            <property action="remove" name="REST_URL_POSTFIX" scope="axis2"/>
            <call>
                <endpoint>
                    <default>
                        <timeout>
                            <duration>10000</duration>
                            <responseAction>fault</responseAction>
                        </timeout>
                        <suspendOnFailure>
                            <errorCodes>-1</errorCodes>
                            <initialDuration>0</initialDuration>
                            <progressionFactor>1.0</progressionFactor>
                            <maximumDuration>0</maximumDuration>
                        </suspendOnFailure>
                        <markForSuspension>
                            <errorCodes>-1</errorCodes>
                            <retriesBeforeSuspension>0</retriesBeforeSuspension>
                        </markForSuspension>
                    </default>
                </endpoint>
            </call>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </resource>
</api>

Send a message with groupID=group2 and the header closeGroup: true. Use either curl or SoapUI.

curl -k --location --request POST 'https://localhost:8253/publish/message' --header 'Content-Type: application/xml' --header 'closeGroup: true' --data '<groupID>group2</groupID>'
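If you prefer a script over curl or SoapUI, the same request can be sketched in Python with the standard library only. The helper names `build_request` and `send` are hypothetical. Host, port and path are taken from the curl commands in this post, and disabling certificate checks mirrors curl's -k flag, which is only appropriate for a local test setup.

```python
import http.client
import ssl


def build_request(group_id, close_group=False):
    """Return (path, body, headers) for one publish call."""
    headers = {"Content-Type": "application/xml"}
    if close_group:
        # Header name as used by the filter in the updated API.
        headers["closeGroup"] = "true"
    body = f"<groupID>{group_id}</groupID>".encode("utf-8")
    return "/publish/message", body, headers


def send(group_id, close_group=False, host="localhost", port=8253):
    """POST one message and return the HTTP status code."""
    context = ssl.create_default_context()
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE  # like curl -k; test setups only
    path, body, headers = build_request(group_id, close_group)
    conn = http.client.HTTPSConnection(host, port, context=context)
    try:
        conn.request("POST", path, body=body, headers=headers)
        return conn.getresponse().status
    finally:
        conn.close()
```

Calling `send("group2", close_group=True)` corresponds to the curl command above, and `send("group2")` to the plain group2 message.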

Then send some more group2 messages. As we can see in the logs, the first group2 message with closeGroup: true (i.e. JMSXGroupSeq = -1) still reaches consumer 1. But this message closes the message group, and as a result subsequent group2 messages are dispatched to another consumer (consumer 2).

[2022-11-29 13:13:58,784] INFO {org.apache.axis2.transport.jms.JMSListener} – Connection attempt: 1 for JMS Provider for service: jmsconsumer2 was successful!
[2022-11-29 13:13:58,787] INFO {org.apache.axis2.transport.jms.ServiceTaskManager} – Task manager for service : jmsconsumer2 [re-]initialized
[2022-11-29 13:13:58,787] INFO {org.apache.axis2.transport.jms.ServiceTaskManager} – JMS Polling task activated with state: 1 for service jmsconsumer2
[2022-11-29 13:13:59,796] INFO {org.apache.axis2.transport.jms.JMSListener} – Started to listen on destination : queue1 of type queue for service jmsconsumer2
[2022-11-29 13:13:59,796] INFO {org.apache.synapse.core.axis2.ProxyService} – {proxy:jmsconsumer2} Started the proxy service : jmsconsumer2
[2022-11-29 13:14:21,202] INFO {org.apache.synapse.mediators.builtin.LogMediator} – {api:jms-publisher} LOG = Message received. Publish to queue.
[2022-11-29 13:14:21,202] INFO {org.apache.synapse.mediators.builtin.LogMediator} – {api:jms-publisher} Value = Hier
[2022-11-29 13:14:21,203] INFO {org.apache.synapse.mediators.builtin.LogMediator} – {api:jms-publisher} To = jms:/queue1?transport.jms.ConnectionFactory=myQueueSender
[2022-11-29 13:14:21,203] INFO {org.apache.synapse.mediators.builtin.LogMediator} – {api:jms-publisher} To: jms:/queue1?transport.jms.ConnectionFactory=myQueueSender, MessageID: urn:uuid:5404bea4-fea1-4d97-868f-1b6af1c155f5, correlation_id: 5404bea4-fea1-4d97-868f-1b6af1c155f5, Direction: request, Envelope: group2
[2022-11-29 13:14:21,273] INFO {org.apache.synapse.mediators.builtin.LogMediator} – {proxy:jmsconsumer1} To: , WSAction: urn:mediate, SOAPAction: urn:mediate, MessageID: ID:ip-172-31-39-100.eu-central-1.compute.internal-39942-1669718798195-9:9:1:1:4, Direction: request, LOG = Consumer1 : Message consumed from queue., groupID = group2, Envelope: group2
[2022-11-29 13:14:21,273] INFO {org.apache.synapse.mediators.builtin.LogMediator} – {proxy:jmsconsumer1} LOG = Consumer1 : Message processed. Send ACK to queue.
[2022-11-29 13:14:25,397] INFO {org.apache.synapse.mediators.builtin.LogMediator} – {api:jms-publisher} LOG = Message received. Publish to queue.
[2022-11-29 13:14:25,398] INFO {org.apache.synapse.mediators.builtin.LogMediator} – {api:jms-publisher} To = jms:/queue1?transport.jms.ConnectionFactory=myQueueSender
[2022-11-29 13:14:25,399] INFO {org.apache.synapse.mediators.builtin.LogMediator} – {api:jms-publisher} To: jms:/queue1?transport.jms.ConnectionFactory=myQueueSender, MessageID: urn:uuid:6d92c98d-da75-4ea7-9a0a-7a64854b2752, correlation_id: 6d92c98d-da75-4ea7-9a0a-7a64854b2752, Direction: request, Envelope: group2
[2022-11-29 13:14:25,424] INFO {org.apache.synapse.mediators.builtin.LogMediator} – {proxy:jmsconsumer2} To: , WSAction: urn:mediate, SOAPAction: urn:mediate, MessageID: ID:ip-172-31-39-100.eu-central-1.compute.internal-39942-1669718798195-9:10:1:1:4, Direction: request, LOG = Consumer2 : Message consumed from queue., groupID = group2, Envelope: group2
[2022-11-29 13:14:30,823] INFO {org.apache.synapse.mediators.builtin.LogMediator} – {proxy:jmsconsumer2} LOG = Consumer2 : Message processed. Send ACK to queue.
[2022-11-29 13:14:36,282] INFO {org.apache.synapse.mediators.builtin.LogMediator} – {api:jms-publisher} LOG = Message received. Publish to queue.
[2022-11-29 13:14:36,283] INFO {org.apache.synapse.mediators.builtin.LogMediator} – {api:jms-publisher} To = jms:/queue1?transport.jms.ConnectionFactory=myQueueSender
[2022-11-29 13:14:36,283] INFO {org.apache.synapse.mediators.builtin.LogMediator} – {api:jms-publisher} To: jms:/queue1?transport.jms.ConnectionFactory=myQueueSender, MessageID: urn:uuid:e21cb03f-0a52-4252-a3ce-218176e33363, correlation_id: e21cb03f-0a52-4252-a3ce-218176e33363, Direction: request, Envelope: group1
[2022-11-29 13:14:36,290] INFO {org.apache.synapse.mediators.builtin.LogMediator} – {proxy:jmsconsumer1} To: , WSAction: urn:mediate, SOAPAction: urn:mediate, MessageID: ID:ip-172-31-39-100.eu-central-1.compute.internal-39942-1669718798195-9:3:1:1:5, Direction: request, LOG = Consumer1 : Message consumed from queue., groupID = group1, Envelope: group1
[2022-11-29 13:14:36,290] INFO {org.apache.synapse.mediators.builtin.LogMediator} – {proxy:jmsconsumer1} LOG = Consumer1 : Message processed. Send ACK to queue.
[2022-11-29 13:14:40,485] INFO {org.apache.synapse.mediators.builtin.LogMediator} – {api:jms-publisher} LOG = Message received. Publish to queue.
[2022-11-29 13:14:40,486] INFO {org.apache.synapse.mediators.builtin.LogMediator} – {api:jms-publisher} To = jms:/queue1?transport.jms.ConnectionFactory=myQueueSender
[2022-11-29 13:14:40,486] INFO {org.apache.synapse.mediators.builtin.LogMediator} – {api:jms-publisher} To: jms:/queue1?transport.jms.ConnectionFactory=myQueueSender, MessageID: urn:uuid:e1975933-c482-4651-8873-16b728300e47, correlation_id: e1975933-c482-4651-8873-16b728300e47, Direction: request, Envelope: group2
[2022-11-29 13:14:40,532] INFO {org.apache.synapse.mediators.builtin.LogMediator} – {proxy:jmsconsumer2} To: , WSAction: urn:mediate, SOAPAction: urn:mediate, MessageID: ID:ip-172-31-39-100.eu-central-1.compute.internal-39942-1669718798195-9:4:1:1:5, Direction: request, LOG = Consumer2 : Message consumed from queue., groupID = group2, Envelope: group2
[2022-11-29 13:14:40,532] INFO {org.apache.synapse.mediators.builtin.LogMediator} – {proxy:jmsconsumer2} LOG = Consumer2 : Message processed. Send ACK to queue.
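The whole sequence from this post (failover, sticky assignment after consumer 2 returns, and closing the group) can be replayed with a toy model. Again, this is an illustrative sketch and not ActiveMQ's implementation: `ToyBroker`, its methods and the "first matching consumer" selection are assumptions.

```python
class ToyBroker:
    """Toy model of message groups, including closing a group."""

    def __init__(self, consumers):
        self.online = list(consumers)
        self.owner = {}  # JMSXGroupID value -> consumer

    def connect(self, consumer):
        self.online.append(consumer)

    def disconnect(self, consumer):
        self.online.remove(consumer)
        self.owner = {g: c for g, c in self.owner.items() if c != consumer}

    def dispatch(self, group_id, group_seq=0):
        if group_id not in self.owner:
            busy = set(self.owner.values())
            free = [c for c in self.online if c not in busy]
            self.owner[group_id] = (free or self.online)[0]
        consumer = self.owner[group_id]
        if group_seq < 0:
            # Negative JMSXGroupSeq: deliver to the current owner,
            # then forget the association so the group can move.
            del self.owner[group_id]
        return consumer


broker = ToyBroker(["consumer1", "consumer2"])
trace = [broker.dispatch("group1"), broker.dispatch("group2")]
broker.disconnect("consumer2")
trace.append(broker.dispatch("group2"))      # failover to consumer1
broker.connect("consumer2")
trace.append(broker.dispatch("group2"))      # still consumer1 (sticky)
trace.append(broker.dispatch("group2", -1))  # closing message, consumer1
trace.append(broker.dispatch("group2"))      # reassigned to consumer2
trace.append(broker.dispatch("group1"))      # unchanged, consumer1
print(trace)
```

After the closing message, group2 goes to consumer 2 in this model because consumer 1 still owns group1, which matches the order of events in the logs above.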
