Der ActiveMQ Message Broker bietet eine Funktion namens „Composite Destination“. Dank dieser Funktion kann dieselbe Nachricht in einem einzigen Vorgang in mehreren physischen ActiveMQ-Warteschlangen veröffentlicht werden. In diesem Blogbeitrag werden wir diese Funktion unter Verwendung von WSO2 EI als Nachrichtenproduzent und -konsument evaluieren.
Das folgende Diagramm zeigt, wie EI und ActiveMQ eingerichtet werden, um die Funktion Composite Destination zu evaluieren.
Hier werden wir eine REST-API (fungiert als JMS Publisher) in EI erstellen, die eine XML-Nachricht annimmt und die Nachricht in den ActiveMQ-Warteschlangen queue1 und queue2 veröffentlicht. Dieser Vorgang ist ein einzelner Vorgang (d. h. ein einzelner Aufruf von ActiveMQ). Dann erstellen wir zwei nachrichtenkonsumierende JMS-Proxys, um die Nachricht aus den jeweiligen Warteschlangen zu lesen und zu überprüfen, ob beide dieselbe Nachricht konsumiert haben.
Verwendete Produktversionen
– WSO2 EI 6.6.0
– ActiveMQ 5.15.12
1. Konfigurieren von WSO2 EI mit ActiveMQ zum Veröffentlichen und Konsumieren von Nachrichten
Befolgen Sie die Anweisungen in der WSO2-Dokumentation, um das Setup zu konfigurieren
https://docs.wso2.com/display/EI660/Configure+with+ActiveMQ+#ConfigurewithActiveMQ-SettingupWSO2EIandActiveMQ
Sie können die folgenden JMS-Empfänger- und Senderkonfigurationen in <EI_HOME>/conf/axis2/axis2.xml file verwenden.
Der JNDI-Name der Warteschlange „amqQueue“ wird später in unserer REST-API und den Proxy-Diensten verwendet.
Da sowohl EI als auch ActiveMQ auf dem lokalen Rechner ausgeführt werden, wird localhost in der Provider-URL verwendet.
<transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
<parameter name="amqTopic" locked="false">
<parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
<parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
<parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>
<parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter>
</parameter>
<parameter name="amqQueue" locked="false">
<parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
<parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
<parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
<parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
</parameter>
<parameter name="default" locked="false">
<parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
<parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
<parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
<parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
</parameter>
</transportReceiver>
transportSender class="org.apache.axis2.transport.jms.JMSSender" name="jms">
<parameter locked="false" name="amqTopic">
<parameter locked="false" name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
<parameter locked="false" name="java.naming.provider.url">tcp://localhost:61616</parameter>
<parameter locked="false" name="transport.jms.ConnectionFactoryJNDIName">TopicConnectionFactory</parameter>
<parameter locked="false" name="transport.jms.ConnectionFactoryType">topic</parameter>
</parameter>
<parameter locked="false" name="amqQueue">
<parameter locked="false" name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
<parameter locked="false" name="java.naming.provider.url">tcp://localhost:61616</parameter>
<parameter locked="false" name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</parameter>
<parameter locked="false" name="transport.jms.ConnectionFactoryType">queue</parameter>
</parameter>
<parameter locked="false" name="default">
<parameter locked="false" name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
<parameter locked="false" name="java.naming.provider.url">tcp://localhost:61616</parameter>
<parameter locked="false" name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</parameter>
<parameter locked="false" name="transport.jms.ConnectionFactoryType">queue</parameter>
</parameter>
</transportSender>
2. Erstellen Sie die folgende REST-API in EI, um eine beliebige XML-Nachricht anzunehmen und in den beiden Warteschlangen queue1 und queue2 zu veröffentlichen.
Um dieselbe Nachricht bei einem einzigen Vorgang zu veröffentlichen, müssen wir die Namen der Warteschlangen als kommagetrennte Werte angeben [1]. Wie Sie sehen können, erfolgt dies im Property Mediator „QueueName“ und wird bei der Erstellung der JMS-Publisher-URL verwendet. Die JMS-Publisher-URL verwendet den JNDI-Namen „amqQueue“, der in der Transportsenderkonfiguration definiert ist
<property value=“queue1,queue2“ name=“QueueName“ scope=“default“ type=“STRING“/>
<?xml version="1.0" encoding="UTF-8"?>
<api context="/publish" name="jms-publisher" xmlns="http://ws.apache.org/ns/synapse">
<resource methods="POST" uri-template="/message">
<inSequence>
<log level="custom">
<property value="Message received. Publish to queue." name="LOG"/>
</log>
<property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
<property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
<property name="messageType" scope="axis2" type="STRING" value="application/xml"/>
<property value="queue1,queue2" name="QueueName" scope="default" type="STRING"/>
<header name="To" scope="default" expression="concat('jms:/',get-property('QueueName'),'?transport.jms.ConnectionFactory=amqQueue')"/>
<property name="REST_URL_POSTFIX" action="remove" scope="axis2"/>
<call>
<endpoint>
<default>
<timeout>
<duration>10000</duration>
<responseAction>fault</responseAction>
</timeout>
<suspendOnFailure>
<errorCodes>-1</errorCodes>
<initialDuration>0</initialDuration>
<progressionFactor>1.0</progressionFactor>
<maximumDuration>0</maximumDuration>
</suspendOnFailure>
<markForSuspension>
<errorCodes>-1</errorCodes>
</markForSuspension>
</default>
</endpoint>
</call>
</inSequence>
<outSequence/>
<faultSequence>
</faultSequence>
</resource>
</api>
3. Erstellen Sie einen JMS-Proxy, um Nachrichten aus queue1 zu konsumieren
Hier bezieht sich transport.jms.ConnectionFactory auf den JNDI-Namen „amqQueue“, der im JMS-Transportempfänger definiert ist, und der Name der Quellwarteschlange (queue1) wird im Parameter transport.jms.Destination erwähnt
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse" name="jms-consumer-one" startOnLoad="true" statistics="disable" trace="disable" transports="jms">
<target>
<inSequence>
<log level="full">
<property value="Consumer 1: Message consumed from queue." name="LOG" />
</log>
<log level="custom">
<property value="Consumer 1: Message processed. Send ACK to queue." name="LOG" />
</log>
</inSequence>
<outSequence />
<faultSequence>
<log level="custom">
<property value="Consumer 1: Error occurred. Send NACK to queue." name="LOG" />
</log>
<property name="SET_ROLLBACK_ONLY" scope="axis2" type="STRING" value="true" />
<drop />
</faultSequence>
</target>
<parameter name="transport.jms.ConnectionFactory">amqQueue</parameter>
<parameter name="transport.jms.SessionAcknowledgement">CLIENT_ACKNOWLEDGE</parameter>
<parameter name="transport.jms.DestinationType">queue</parameter>
<parameter name="transport.jms.Destination">queue1</parameter>
<parameter name="transport.jms.ContentType">
<rules xmlns="">
<jmsProperty>contentType</jmsProperty>
<default>application/xml</default>
</rules>
</parameter>
<parameter name="transport.jms.SessionTransacted">true</parameter>
<parameter name="transport.jms.CacheLevel">consumer</parameter>
<description />
</proxy>
4. Erstellen Sie einen JMS-Konsumenten, der aus der queue2 konsumiert
Hier bezieht sich transport.jms.ConnectionFactory auf den JNDI-Namen „amqQueue“, der im JMS-Transportempfänger definiert ist, und der Name der Quellwarteschlange (queue2) wird im Parameter transport.jms.Destination erwähnt
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse" name="jms-consumer-two" startOnLoad="true" statistics="disable" trace="disable" transports="jms">
<target>
<inSequence>
<log level="full">
<property value="Consumer 2: Message consumed from queue." name="LOG" />
</log>
<log level="custom">
<property value="Consumer 2: Message processed. Send ACK to queue." name="LOG" />
</log>
</inSequence>
<outSequence />
<faultSequence>
<log level="custom">
<property value="Consumer 2: Error occurred. Send NACK to queue." name="LOG" />
</log>
<property name="SET_ROLLBACK_ONLY" scope="axis2" type="STRING" value="true" />
<drop />
</faultSequence>
</target>
<parameter name="transport.jms.ConnectionFactory">amqQueue</parameter>
<parameter name="transport.jms.SessionAcknowledgement">CLIENT_ACKNOWLEDGE</parameter>
<parameter name="transport.jms.DestinationType">queue</parameter>
<parameter name="transport.jms.Destination">queue2</parameter>
<parameter name="transport.jms.ContentType">
<rules xmlns="">
<jmsProperty>contentType</jmsProperty>
<default>application/xml</default>
</rules>
</parameter>
<parameter name="transport.jms.SessionTransacted">true</parameter>
<parameter name="transport.jms.CacheLevel">consumer</parameter>
<description />
</proxy>
5. Überprüfen Sie den Nachrichtenfluss
Veröffentlichen Sie eine XML-Beispielnachricht, indem Sie die erstellte REST-API aufrufen
curl --location --request POST 'https://localhost:8243/publish/message' \
--header 'Content-Type: application/xml' \
--data-raw '<test/>'
Beobachtung aus EI-Protokollen
– REST-API nimmt die Nachricht an und veröffentlicht sie in Warteschlangen (einzelner Vorgang)
– Konsumentenproxy 1 hat dieselbe Nachricht aus queue1 konsumiert
– Konsumentenproxy 2 hat dieselbe Nachricht aus queue2 konsumiert