Luckily, ActiveMQ message broker provides a feature called “Composite Destination”. This feature allows to publish the same message to multiple physical ActiveMQ queues in single operation. In this blog post, we will evaluate this feature using WSO2 EI as message producer and consumer.
Below diagram depicts, how EI and ActiveMQ will be setup to evaluate the Composite Destination feature.
Here we will create a REST API (Acts as JMS publisher) in EI which will accept an XML message and publish the message to ActiveMQ queues queue1 and queue2. This operation will be a single operation (i.e. single call to ActiveMQ). Then we will create two JMS message consuming proxies to read the message from respective queues and verify whether both have consumed the same message.
Used product versions
– WSO2 EI 6.6.0
– ActiveMQ 5.15.12
1. Configure WSO2 EI with ActiveMQ to publish and consume messages
Follow the instruction provided in WSO2 documentation to configure the setup
https://wso2docs.atlassian.net/wiki/spaces/EI660/overview
You can use below JMS receiver and sender configurations in <EI_HOME>/conf/axis2/axis2.xml file. Queue JNDI name “amqQueue” will be used in our REST API and Proxy services later. Since both EI and ActiveMQ are running in local machine, localhost is used in provider URL.
<transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
<parameter name="amqTopic" locked="false">
<parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
<parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
<parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>
<parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter>
</parameter>
<parameter name="amqQueue" locked="false">
<parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
<parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
<parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
<parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
</parameter>
<parameter name="default" locked="false">
<parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
<parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
<parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
<parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
</parameter>
</transportReceiver>
transportSender class="org.apache.axis2.transport.jms.JMSSender" name="jms">
<parameter locked="false" name="amqTopic">
<parameter locked="false" name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
<parameter locked="false" name="java.naming.provider.url">tcp://localhost:61616</parameter>
<parameter locked="false" name="transport.jms.ConnectionFactoryJNDIName">TopicConnectionFactory</parameter>
<parameter locked="false" name="transport.jms.ConnectionFactoryType">topic</parameter>
</parameter>
<parameter locked="false" name="amqQueue">
<parameter locked="false" name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
<parameter locked="false" name="java.naming.provider.url">tcp://localhost:61616</parameter>
<parameter locked="false" name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</parameter>
<parameter locked="false" name="transport.jms.ConnectionFactoryType">queue</parameter>
</parameter>
<parameter locked="false" name="default">
<parameter locked="false" name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
<parameter locked="false" name="java.naming.provider.url">tcp://localhost:61616</parameter>
<parameter locked="false" name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</parameter>
<parameter locked="false" name="transport.jms.ConnectionFactoryType">queue</parameter>
</parameter>
</transportSender>
2. Create the below REST API in EI to accept any XML message and publish to both queues queue1 and queue2
In order to publish the same message in single operation we need to provide the queue names as comma separated values [1]. As you can see, this is done in the property mediator “QueueName” and used in the construction of JMS publisher URL. JMS publisher URL uses the JNDI name “amqQueue” defined in the transport sender configuration
<property value=”queue1,queue2” name=”QueueName” scope=”default” type=”STRING”/>
<?xml version="1.0" encoding="UTF-8"?>
<api context="/publish" name="jms-publisher" xmlns="http://ws.apache.org/ns/synapse">
<resource methods="POST" uri-template="/message">
<inSequence>
<log level="custom">
<property value="Message received. Publish to queue." name="LOG"/>
</log>
<property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
<property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
<property name="messageType" scope="axis2" type="STRING" value="application/xml"/>
<property value="queue1,queue2" name="QueueName" scope="default" type="STRING"/>
<header name="To" scope="default" expression="concat('jms:/',get-property('QueueName'),'?transport.jms.ConnectionFactory=amqQueue')"/>
<property name="REST_URL_POSTFIX" action="remove" scope="axis2"/>
<call>
<endpoint>
<default>
<timeout>
<duration>10000</duration>
<responseAction>fault</responseAction>
</timeout>
<suspendOnFailure>
<errorCodes>-1</errorCodes>
<initialDuration>0</initialDuration>
<progressionFactor>1.0</progressionFactor>
<maximumDuration>0</maximumDuration>
</suspendOnFailure>
<markForSuspension>
<errorCodes>-1</errorCodes>
</markForSuspension>
</default>
</endpoint>
</call>
</inSequence>
<outSequence/>
<faultSequence>
</faultSequence>
</resource>
</api>
3. Create JMS Proxy to consume messages from queue 1
Here transport.jms.ConnectionFactory refers to the JNDI name “amqQueue” defined in JMS transport receiver and source queue name(queue1) is mentioned in transport.jms.Destination parameter
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse" name="jms-consumer-one" startOnLoad="true" statistics="disable" trace="disable" transports="jms">
<target>
<inSequence>
<log level="full">
<property value="Consumer 1: Message consumed from queue." name="LOG" />
</log>
<log level="custom">
<property value="Consumer 1: Message processed. Send ACK to queue." name="LOG" />
</log>
</inSequence>
<outSequence />
<faultSequence>
<log level="custom">
<property value="Consumer 1: Error occurred. Send NACK to queue." name="LOG" />
</log>
<property name="SET_ROLLBACK_ONLY" scope="axis2" type="STRING" value="true" />
<drop />
</faultSequence>
</target>
<parameter name="transport.jms.ConnectionFactory">amqQueue</parameter>
<parameter name="transport.jms.SessionAcknowledgement">CLIENT_ACKNOWLEDGE</parameter>
<parameter name="transport.jms.DestinationType">queue</parameter>
<parameter name="transport.jms.Destination">queue1</parameter>
<parameter name="transport.jms.ContentType">
<rules xmlns="">
<jmsProperty>contentType</jmsProperty>
<default>application/xml</default>
</rules>
</parameter>
<parameter name="transport.jms.SessionTransacted">true</parameter>
<parameter name="transport.jms.CacheLevel">consumer</parameter>
<description />
</proxy>
4. Create JMS consumer to consume from queue 2
Here transport.jms.ConnectionFactory refers to the JNDI name “amqQueue” defined in JMS transport receiver and source queue name(queue2) is mentioned in transport.jms.Destination parameter
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse" name="jms-consumer-two" startOnLoad="true" statistics="disable" trace="disable" transports="jms">
<target>
<inSequence>
<log level="full">
<property value="Consumer 2: Message consumed from queue." name="LOG" />
</log>
<log level="custom">
<property value="Consumer 2: Message processed. Send ACK to queue." name="LOG" />
</log>
</inSequence>
<outSequence />
<faultSequence>
<log level="custom">
<property value="Consumer 2: Error occurred. Send NACK to queue." name="LOG" />
</log>
<property name="SET_ROLLBACK_ONLY" scope="axis2" type="STRING" value="true" />
<drop />
</faultSequence>
</target>
<parameter name="transport.jms.ConnectionFactory">amqQueue</parameter>
<parameter name="transport.jms.SessionAcknowledgement">CLIENT_ACKNOWLEDGE</parameter>
<parameter name="transport.jms.DestinationType">queue</parameter>
<parameter name="transport.jms.Destination">queue2</parameter>
<parameter name="transport.jms.ContentType">
<rules xmlns="">
<jmsProperty>contentType</jmsProperty>
<default>application/xml</default>
</rules>
</parameter>
<parameter name="transport.jms.SessionTransacted">true</parameter>
<parameter name="transport.jms.CacheLevel">consumer</parameter>
<description />
</proxy>
5. Verify the message flow
Publish a sample xml message by calling the REST API created
curl --location --request POST 'https://localhost:8243/publish/message' \
--header 'Content-Type: application/xml' \
--data-raw '<test/>'
Observation from EI logs
– REST API accepts the message and publish to queues (single operation)
– Consumer proxy 1 consumed the same message from queue 1
– Consumer proxy 2 consumed the same message from queue 2