
WSO2 EI: Publish a message to multiple ActiveMQ queues in a single operation

The ActiveMQ message broker provides a feature called “Composite Destination” [1]. It allows a producer to publish the same message to multiple physical ActiveMQ queues in a single operation. In this blog post, we evaluate this feature using WSO2 EI as both the message producer and the message consumer.

Jenanathan Yogendran
Integration Consultant

The diagram below shows how EI and ActiveMQ are set up to evaluate the Composite Destination feature.

We will create a REST API in EI that acts as the JMS publisher: it accepts an XML message and publishes it to the ActiveMQ queues queue1 and queue2 in a single operation (i.e. a single call to ActiveMQ). We will then create two JMS consumer proxies, one per queue, and verify that both consume the same message.

[Diagram: WSO2 EI publishing a message to multiple ActiveMQ queues]

Product versions used
– WSO2 EI 6.6.0 
– ActiveMQ 5.15.12

1. Configure WSO2 EI with ActiveMQ to publish and consume messages  

Follow the instructions in the WSO2 documentation to configure the setup:
https://wso2docs.atlassian.net/wiki/spaces/EI660/overview
 
You can use the JMS receiver and sender configurations below in the <EI_HOME>/conf/axis2/axis2.xml file. The connection factory entry named “amqQueue” is referenced later by our REST API and proxy services. Since both EI and ActiveMQ run on the local machine, localhost is used in the provider URL.

<transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
    <parameter name="amqTopic" locked="false">
        <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
        <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>
        <parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter>
    </parameter>
    <parameter name="amqQueue" locked="false">
        <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
        <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
        <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
    </parameter>
    <parameter name="default" locked="false">
        <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
        <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
        <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
    </parameter>
</transportReceiver>

<transportSender class="org.apache.axis2.transport.jms.JMSSender" name="jms">
    <parameter locked="false" name="amqTopic">
        <parameter locked="false" name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
        <parameter locked="false" name="java.naming.provider.url">tcp://localhost:61616</parameter>
        <parameter locked="false" name="transport.jms.ConnectionFactoryJNDIName">TopicConnectionFactory</parameter>
        <parameter locked="false" name="transport.jms.ConnectionFactoryType">topic</parameter>
    </parameter>
    <parameter locked="false" name="amqQueue">
        <parameter locked="false" name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
        <parameter locked="false" name="java.naming.provider.url">tcp://localhost:61616</parameter>
        <parameter locked="false" name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</parameter>
        <parameter locked="false" name="transport.jms.ConnectionFactoryType">queue</parameter>
    </parameter>
    <parameter locked="false" name="default">
        <parameter locked="false" name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
        <parameter locked="false" name="java.naming.provider.url">tcp://localhost:61616</parameter>
        <parameter locked="false" name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</parameter>
        <parameter locked="false" name="transport.jms.ConnectionFactoryType">queue</parameter>
    </parameter>
</transportSender>

2. Create the REST API below in EI to accept any XML message and publish it to both queue1 and queue2

To publish the same message in a single operation, we need to provide the queue names as comma-separated values [1]. As you can see, this is done in the property mediator “QueueName”, whose value is then used to construct the JMS publisher URL. The JMS publisher URL refers to the connection factory entry “amqQueue” defined in the transport sender configuration.

<property value="queue1,queue2" name="QueueName" scope="default" type="STRING"/>

<?xml version="1.0" encoding="UTF-8"?>
<api context="/publish" name="jms-publisher" xmlns="http://ws.apache.org/ns/synapse">
    <resource methods="POST" uri-template="/message">
        <inSequence>
            <log level="custom">
                <property value="Message received. Publish to queue." name="LOG"/>
            </log>
            <!-- Return 202 Accepted to the caller; no response is expected from the broker -->
            <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
            <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
            <property name="messageType" scope="axis2" type="STRING" value="application/xml"/>
            <!-- Comma-separated queue names form the composite destination -->
            <property value="queue1,queue2" name="QueueName" scope="default" type="STRING"/>
            <header name="To" scope="default" expression="concat('jms:/',get-property('QueueName'),'?transport.jms.ConnectionFactory=amqQueue')"/>
            <!-- Prevent the resource path (/message) from being appended to the JMS URL -->
            <property name="REST_URL_POSTFIX" action="remove" scope="axis2"/>
            <call>
                <endpoint>
                    <default>
                        <timeout>
                            <duration>10000</duration>
                            <responseAction>fault</responseAction>
                        </timeout>
                        <suspendOnFailure>
                            <errorCodes>-1</errorCodes>
                            <initialDuration>0</initialDuration>
                            <progressionFactor>1.0</progressionFactor>
                            <maximumDuration>0</maximumDuration>
                        </suspendOnFailure>
                        <markForSuspension>
                            <errorCodes>-1</errorCodes>
                        </markForSuspension>
                    </default>
                </endpoint>
            </call>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </resource>
</api>
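
For reference, with QueueName set to queue1,queue2, the To header expression in the API above evaluates to jms:/queue1,queue2?transport.jms.ConnectionFactory=amqQueue. When the queue list is fixed, the same publish step could presumably be written with that URL as a literal address endpoint instead of a computed To header. The fragment below is a minimal sketch of that variant and has not been verified against EI 6.6.0. It is meant to replace the QueueName property, the To header and the call mediator inside the inSequence; the other properties stay as they are.

```xml
<!-- Sketch only: publish step with the composite JMS URL written out literally.
     Assumes fixed queue names and the "amqQueue" sender connection factory from axis2.xml. -->
<call>
    <endpoint>
        <address uri="jms:/queue1,queue2?transport.jms.ConnectionFactory=amqQueue"/>
    </endpoint>
</call>
```

Keeping the queue names in a property, as the API above does, is the more flexible choice if the list of target queues may later be computed per request.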

3. Create a JMS proxy to consume messages from queue1

Here, transport.jms.ConnectionFactory refers to the connection factory entry “amqQueue” defined in the JMS transport receiver, and the source queue name (queue1) is given in the transport.jms.Destination parameter.

<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse" name="jms-consumer-one" startOnLoad="true" statistics="disable" trace="disable" transports="jms">
  <target>
    <inSequence>
      <log level="full">
        <property value="Consumer 1: Message consumed from queue." name="LOG" />
      </log>
      <log level="custom">
        <property value="Consumer 1: Message processed. Send ACK to queue." name="LOG" />
      </log>
    </inSequence>
    <outSequence />
    <faultSequence>
      <log level="custom">
        <property value="Consumer 1: Error occurred. Send NACK to queue." name="LOG" />
      </log>
      <property name="SET_ROLLBACK_ONLY" scope="axis2" type="STRING" value="true" />
      <drop />
    </faultSequence>
  </target>
  <parameter name="transport.jms.ConnectionFactory">amqQueue</parameter>
  <parameter name="transport.jms.SessionAcknowledgement">CLIENT_ACKNOWLEDGE</parameter>
  <parameter name="transport.jms.DestinationType">queue</parameter>
  <parameter name="transport.jms.Destination">queue1</parameter>
  <parameter name="transport.jms.ContentType">
    <rules xmlns="">
      <jmsProperty>contentType</jmsProperty>
      <default>application/xml</default>
    </rules>
  </parameter>
  <parameter name="transport.jms.SessionTransacted">true</parameter>
  <parameter name="transport.jms.CacheLevel">consumer</parameter>
  <description />
</proxy>

4. Create a JMS proxy to consume messages from queue2

Here, transport.jms.ConnectionFactory refers to the connection factory entry “amqQueue” defined in the JMS transport receiver, and the source queue name (queue2) is given in the transport.jms.Destination parameter.

<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse" name="jms-consumer-two" startOnLoad="true" statistics="disable" trace="disable" transports="jms">
  <target>
    <inSequence>
      <log level="full">
        <property value="Consumer 2: Message consumed from queue." name="LOG" />
      </log>
      <log level="custom">
        <property value="Consumer 2: Message processed. Send ACK to queue." name="LOG" />
      </log>
    </inSequence>
    <outSequence />
    <faultSequence>
      <log level="custom">
        <property value="Consumer 2: Error occurred. Send NACK to queue." name="LOG" />
      </log>
      <property name="SET_ROLLBACK_ONLY" scope="axis2" type="STRING" value="true" />
      <drop />
    </faultSequence>
  </target>
  <parameter name="transport.jms.ConnectionFactory">amqQueue</parameter>
  <parameter name="transport.jms.SessionAcknowledgement">CLIENT_ACKNOWLEDGE</parameter>
  <parameter name="transport.jms.DestinationType">queue</parameter>
  <parameter name="transport.jms.Destination">queue2</parameter>
  <parameter name="transport.jms.ContentType">
    <rules xmlns="">
      <jmsProperty>contentType</jmsProperty>
      <default>application/xml</default>
    </rules>
  </parameter>
  <parameter name="transport.jms.SessionTransacted">true</parameter>
  <parameter name="transport.jms.CacheLevel">consumer</parameter>
  <description />
</proxy>

5. Verify the message flow 

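Publish a sample XML message by calling the REST API created above. If EI is running with its default self-signed certificate, curl may additionally need the --insecure option.

curl --location --request POST 'https://localhost:8243/publish/message' \
--header 'Content-Type: application/xml' \
--data-raw '<test/>'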

Observations from the EI logs

– The REST API accepts the message and publishes it to the queues (single operation)
– Consumer proxy 1 consumes the same message from queue1
– Consumer proxy 2 consumes the same message from queue2

