info@yenlo.com
ned
Menu
WSO2 9 min

WSO2 EI Publiceer bericht naar meerdere ActiveMQ wachtrijen in een handeling

ActiveMQ berichten transmitter biedt een functie genaamd "Composite Destination" [1]. Met deze functie kan hetzelfde bericht in een keer naar meerdere fysieke ActiveMQ wachtrijen worden gepubliceerd. In deze blog zullen we deze functie evalueren met behulp van WSO2 EI als bericht producent en consument.

Jenanathan Yogendran
Jenanathan Yogendran
Integration Consultant
WSO2 EI Publiceer bericht naar meerdere ActiveMQ wachtrijen in een handeling

ActiveMQ berichten transmitter biedt een functie genaamd “Composite Destination”. Met deze functie kan hetzelfde bericht in een keer naar meerdere fysieke ActiveMQ wachtrijen worden gepubliceerd. In deze blog zullen we deze functie evalueren met behulp van WSO2 EI als bericht producent en consument.

Het onderstaande diagram laat zien hoe EI en ActiveMQ worden ingesteld om de functie Composite Destination te evalueren.

Hier maken we een REST API (fungeert als JMS uitgever) in EI aan die een XML-bericht accepteert en het bericht publiceert naar ActiveMQ wachtrijen wachtrij1 en wachtrij2. Deze handeling zal een enkele handeling zijn (d.w.z. een enkele aanroep naar ActiveMQ). Vervolgens zullen we twee JMS berichten consumerende proxy’s aanmaken om het bericht uit de respectieve wachtrijen te lezen en te verifiëren of beide hetzelfde bericht hebben geconsumeerd.

WSO2 EI Publiceer bericht naar meerdere ActiveMQ wachtrijen

Gebruikte productversies
– WSO2 EI 6.6.0
– ActiveMQ 5.15.12

1. Configureer WSO2 EI met ActiveMQ om berichten te publiceren en te consumeren

Volg de instructies in de WSO2 documentatie om de setup te configureren 
https://docs.wso2.com/display/EI660/Configure+with+ActiveMQ+#ConfigurewithActiveMQ-SettingupWSO2EIandActiveMQ 
 
Wachtrij JNDI naam “amqQueue” zal later worden gebruikt in onze REST API en Proxy diensten. Aangezien zowel EI als ActiveMQ op de lokale machine draaien, wordt localhost gebruikt in de provider-URL.

<transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener"> 

        <parameter name="amqTopic" locked="false"> 

            <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter> 

            <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter> 

            <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter> 

            <parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter> 

        </parameter> 

        <parameter name="amqQueue" locked="false"> 

            <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter> 

            <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter> 

            <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter> 

            <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter> 

        </parameter> 

        <parameter name="default" locked="false"> 

            <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter> 

            <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter> 

            <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter> 

            <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter> 

        </parameter> 

    </transportReceiver> 
transportSender class="org.apache.axis2.transport.jms.JMSSender" name="jms"> 

        <parameter locked="false" name="amqTopic"> 

            <parameter locked="false" name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter> 

            <parameter locked="false" name="java.naming.provider.url">tcp://localhost:61616</parameter> 

            <parameter locked="false" name="transport.jms.ConnectionFactoryJNDIName">TopicConnectionFactory</parameter> 

            <parameter locked="false" name="transport.jms.ConnectionFactoryType">topic</parameter> 

        </parameter> 

        <parameter locked="false" name="amqQueue"> 

            <parameter locked="false" name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter> 

            <parameter locked="false" name="java.naming.provider.url">tcp://localhost:61616</parameter> 

            <parameter locked="false" name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</parameter> 

            <parameter locked="false" name="transport.jms.ConnectionFactoryType">queue</parameter> 

        </parameter> 

        <parameter locked="false" name="default"> 

            <parameter locked="false" name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter> 

            <parameter locked="false" name="java.naming.provider.url">tcp://localhost:61616</parameter> 

            <parameter locked="false" name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</parameter> 

            <parameter locked="false" name="transport.jms.ConnectionFactoryType">queue</parameter> 

        </parameter> 

    </transportSender> 

2. Maak de onderstaande REST API in EI aan om elk XML-bericht te accepteren en te publiceren naar beide wachtrijen wachtrij1 en wachtrij2

Om hetzelfde bericht in een handeling te publiceren, moeten we de wachtrij namen opgeven als door komma’s gescheiden waarden [1]. Zoals u kunt zien, wordt dit gedaan in de eigenschap bemiddelaar “QueueName” en gebruikt bij de constructie van JMS uitgever URL. JMS uitgever URL gebruikt de JNDI naam “amqQueue” gedefinieerd in de transportzenderconfiguratie
 
<property value=”queue1,queue2” name=”QueueName” scope=”default” type=”STRING”/> 

<?xml version="1.0" encoding="UTF-8"?> 

<api context="/publish" name="jms-publisher" xmlns="http://ws.apache.org/ns/synapse"> 

    <resource methods="POST" uri-template="/message"> 

        <inSequence> 

            <log level="custom"> 

              <property value="Message received. Publish to queue." name="LOG"/> 

            </log> 

            <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/> 

            <property name="OUT_ONLY" scope="default" type="STRING" value="true"/> 

            <property name="messageType" scope="axis2" type="STRING" value="application/xml"/> 

            <property value="queue1,queue2" name="QueueName" scope="default" type="STRING"/> 

            <header name="To" scope="default" expression="concat('jms:/',get-property('QueueName'),'?transport.jms.ConnectionFactory=amqQueue')"/> 

            <property name="REST_URL_POSTFIX" action="remove" scope="axis2"/> 

            <call> 

            <endpoint> 

                <default> 

                    <timeout> 

                        <duration>10000</duration> 

                        <responseAction>fault</responseAction> 

                    </timeout> 

                    <suspendOnFailure> 

                        <errorCodes>-1</errorCodes> 

                        <initialDuration>0</initialDuration> 

                        <progressionFactor>1.0</progressionFactor> 

                        <maximumDuration>0</maximumDuration> 

                    </suspendOnFailure> 

                    <markForSuspension> 

                        <errorCodes>-1</errorCodes> 

                    </markForSuspension> 

                </default> 

            </endpoint> 

            </call> 

        </inSequence> 

        <outSequence/> 

        <faultSequence> 

        </faultSequence> 

    </resource> 

</api> 

3. Maak JMS Proxy aan om berichten van wachtrij 1 te consumeren

3. Maak JMS Proxy aan om berichten van wachtrij 1 te consumeren

Hier verwijst transport.jms.ConnectionFactory naar de JNDI naam “amqQueue” die is gedefinieerd in de JMS-transportontvanger en de naam van de bronwachtrij (wachtrij1) wordt vermeld in de parameter transport.jms.Destination

<?xml version="1.0" encoding="UTF-8"?> 

<proxy xmlns="http://ws.apache.org/ns/synapse" name="jms-consumer-one" startOnLoad="true" statistics="disable" trace="disable" transports="jms"> 

  <target> 

    <inSequence> 

      <log level="full"> 

        <property value="Consumer 1: Message consumed from queue." name="LOG" /> 

      </log> 

      <log level="custom"> 

        <property value="Consumer 1: Message processed. Send ACK to queue." name="LOG" /> 

      </log> 

    </inSequence> 

    <outSequence /> 

    <faultSequence> 

      <log level="custom"> 

        <property value="Consumer 1: Error occurred. Send NACK to queue." name="LOG" /> 

      </log> 

      <property name="SET_ROLLBACK_ONLY" scope="axis2" type="STRING" value="true" /> 

      <drop /> 

    </faultSequence> 

  </target> 

  <parameter name="transport.jms.ConnectionFactory">amqQueue</parameter> 

  <parameter name="transport.jms.SessionAcknowledgement">CLIENT_ACKNOWLEDGE</parameter> 

  <parameter name="transport.jms.DestinationType">queue</parameter> 

  <parameter name="transport.jms.Destination">queue1</parameter> 

  <parameter name="transport.jms.ContentType"> 

    <rules xmlns=""> 

      <jmsProperty>contentType</jmsProperty> 

      <default>application/xml</default> 

    </rules> 

  </parameter> 

  <parameter name="transport.jms.SessionTransacted">true</parameter> 

  <parameter name="transport.jms.CacheLevel">consumer</parameter> 

  <description /> 

</proxy> 

4. Maak JMS consument aan om uit wachtrij 2 te consumeren

Hier verwijst transport.jms.ConnectionFactory naar de JNDI naam “amqQueue” die is gedefinieerd in de JMS transportontvanger en de naam van de bronwachtrij (wachtrij2) wordt vermeld in de parameter transport.jms.Destination

<?xml version="1.0" encoding="UTF-8"?> 

<proxy xmlns="http://ws.apache.org/ns/synapse" name="jms-consumer-two" startOnLoad="true" statistics="disable" trace="disable" transports="jms"> 

  <target> 

    <inSequence> 

      <log level="full"> 

        <property value="Consumer 2: Message consumed from queue." name="LOG" /> 

      </log> 

      <log level="custom"> 

        <property value="Consumer 2: Message processed. Send ACK to queue." name="LOG" /> 

      </log> 

    </inSequence> 

    <outSequence /> 

    <faultSequence> 

      <log level="custom"> 

        <property value="Consumer 2: Error occurred. Send NACK to queue." name="LOG" /> 

      </log> 

      <property name="SET_ROLLBACK_ONLY" scope="axis2" type="STRING" value="true" /> 

      <drop /> 

    </faultSequence> 

  </target> 

  <parameter name="transport.jms.ConnectionFactory">amqQueue</parameter> 

  <parameter name="transport.jms.SessionAcknowledgement">CLIENT_ACKNOWLEDGE</parameter> 

  <parameter name="transport.jms.DestinationType">queue</parameter> 

  <parameter name="transport.jms.Destination">queue2</parameter> 

  <parameter name="transport.jms.ContentType"> 

    <rules xmlns=""> 

      <jmsProperty>contentType</jmsProperty> 

      <default>application/xml</default> 

    </rules> 

  </parameter> 

  <parameter name="transport.jms.SessionTransacted">true</parameter> 

  <parameter name="transport.jms.CacheLevel">consumer</parameter> 

  <description /> 

</proxy> 

5. Controleer de berichtenstroom

Publiceer een voorbeeld xml bericht door de gemaakte REST API aan te roepen

curl --location --request POST 'https://localhost:8243/publish/message' \ 

--header 'Content-Type: application/xml' \ 

--data-raw '<test/>' 

Observatie van EI logboeken

– REST API accepteert het bericht en publiceert naar wachtrijen (één handeling)
– Consument proxy 1 heeft hetzelfde bericht uit wachtrij 1 geconsumeerd
– Consument proxy 2 heeft hetzelfde bericht uit wachtrij 2 geconsumeerd

Observatie van EI logboeken

Meer informatie

 

ned
Sluiten