In garantierten asynchronen Nachrichtenübermittlungssystemen werden Nachrichtenbroker verwendet, um Nachrichten von Publishern an Consumer zu übermitteln. In einigen Szenarien soll eine in einer Nachrichtenbroker-Warteschlange veröffentlichte Nachricht basierend auf dem Nachrichteninhalt an einen bestimmten Consumer weitergeleitet werden (inhaltsbasiertes Routing).
Die übliche Lösung dafür ist, alle mit der jeweiligen Warteschlange verbundenen Consumer alle Nachrichten konsumieren zu lassen und die Nachrichten basierend auf dem Inhalt herauszufiltern. Dies führt jedoch zu einem zusätzlichen Verarbeitungsaufwand beim Consumer.
In diesem Blogbeitrag werden wir beschreiben, wie wir das inhaltsbasierte Routing mit der Funktion „Selectors“ von ActiveMQ erreichen und den zusätzlichen Verarbeitungsaufwand beim Consumer vermeiden können. Wir werden Ihnen zeigen, wie Sie den WSO2 JMS-Proxy konfigurieren, um ActiveMQ mit Selektor-/Filterkriterien zu nutzen.
ActiveMQ-Nachrichtenselektoren werden normalerweise in Nachrichten-Headern angewendet. Sie können aber auch mit XPATH-basierten Selektoren für XML-Nutzdaten angewendet werden. In diesem Blogbeitrag werden wir uns nur mit den Header-basierten Selektoren befassen.
Szenario
Angenommen, wir haben zwei Nachrichten-Consumer für eine Warteschlange und ein Consumer soll die Nachricht mit dem Header groupID=group1 und der andere Consumer mit dem Header groupID=group2 verarbeiten.
Lösung
WSO2 EI wird sowohl als Nachrichtenproduzent als auch als Consumer fungieren. Wir werden die JMS-Proxy-Consumer so konfigurieren, dass sie die entsprechenden Nachrichten konsumieren. Dazu fügen wir der Proxy-Konfiguration den Selektor/Filter hinzu.
Wir haben Folgendes verwendet:
– WSO2 EI 6.6.0
– ActiveMQ 5.15.12
Es würde aber auch mit dem Micro Integrator funktionieren. Die Konfiguration ist im Wesentlichen dieselbe, obwohl die Konfiguration beim Micro Integrator in der Datei deployment.toml erfolgt und nicht direkt in der Datei axis2.xml, wie es beim Enterprise Integrator der Fall ist.
Konfigurieren von WSO2 EI mit ActiveMQ zum Veröffentlichen und Konsumieren von Nachrichten
Wir werden die Installation von WSO2 und ActiveMQ nicht vollständig beschreiben. Sie können die Anweisungen in der WSO2-Dokumentation befolgen, um das Setup zu konfigurieren, oder sich diese Blogs durchlesen.
JMS-Produzent REST-API im EI erstellen
Diese API würde eine XML-Nachricht als Eingabe akzeptieren und die Nachricht in einer Warteschlange veröffentlichen. Bei der Veröffentlichung der Nachricht würde sie die groupID des JMS-Headers auf Grundlage des groupID-Wertes aus den Nutzdaten festlegen. Diese groupID des JMS-Headers wird in der Konfiguration des JMS-Consumers verwendet, um die relevanten Nachrichten auszuwählen oder zu filtern.
Um der Entwurfsansicht gerecht zu werden, haben wir eine Eigenschaftsgruppe verwendet, um einige Eigenschaften zu bündeln. Dies dient nur zu Darstellungszwecken.
<?xml version="1.0" encoding="UTF-8"?>
<api context="/publish" name="jms-publisher" xmlns="http://ws.apache.org/ns/synapse">
<resource methods="POST" uri-template="/message">
<inSequence>
<log level="custom">
<property name="LOG" value="Message received. Publish to queue."/>
</log>
<log level="full"/>
<propertyGroup>
<property expression="//groupID/text()" name="groupID" scope="transport" type="STRING"/>
<property name="FORCE_SC_ACCEPTED" scope="axis2" type="STRING" value="true"/>
<property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
<property name="messageType" scope="axis2" type="STRING" value="application/xml"/>
<property name="QueueName" scope="default" type="STRING" value="queue1"/>
</propertyGroup>
<header expression="concat('jms:/',get-property('QueueName'),'?transport.jms.ConnectionFactory=myQueueSender')" name="To" scope="default"/>
<property action="remove" name="REST_URL_POSTFIX" scope="axis2"/>
<call>
<endpoint>
<default>
<timeout>
<duration>10000</duration>
<responseAction>fault</responseAction>
</timeout>
<suspendOnFailure>
<errorCodes>-1</errorCodes>
<initialDuration>0</initialDuration>
<progressionFactor>1.0</progressionFactor>
<maximumDuration>0</maximumDuration>
</suspendOnFailure>
<markForSuspension>
<errorCodes>-1</errorCodes>
<retriesBeforeSuspension>0</retriesBeforeSuspension>
</markForSuspension>
</default>
</endpoint>
</call>
</inSequence>
<outSequence/>
<faultSequence/>
</resource>
</api>
JMS-Consumer 1 im EI erstellen
Dieser Proxy soll nur die Nachrichten mit JMS-Header groupID=group1 konsumieren. Sehen Sie sich unten im Proxy die Selektor-/Filterbedingung an, um ActiveMQ anzuweisen, die Nachrichten mit dem Header groupID=group1 zu versenden:
<parameter name="transport.jms.MessageSelector">groupID='group1'</parameter>
<?xml version="1.0" encoding="UTF-8"?>
<proxy name="jmsconsumer1" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse">
<target>
<inSequence>
<log level="full">
<property name="LOG" value="Consumer1 : Message consumed from queue."/>
</log>
<log level="custom">
<property name="LOG" value="Consumer1 : Message processed. Send ACK to queue."/>
</log>
</inSequence>
<outSequence/>
<faultSequence>
<log level="custom">
<property name="LOG" value="Consumer1 : Error occurred. Send NACK to queue."/>
</log>
<property name="SET_ROLLBACK_ONLY" scope="axis2" type="STRING" value="true"/>
</faultSequence>
</target>
<parameter name="transport.jms.SessionAcknowledgement">CLIENT_ACKNOWLEDGE</parameter>
<parameter name="transport.jms.DestinationType">queue</parameter>
<parameter name="transport.jms.MessageSelector">groupID='group1'</parameter>
<parameter name="transport.jms.Destination">queue1</parameter>
<parameter name="transport.jms.ContentType">
<rules xmlns="">
<jmsProperty>contentType</jmsProperty>
<default>application/xml</default>
</rules>
</parameter>
<parameter name="transport.jms.SessionTransacted">true</parameter>
<parameter name="transport.jms.ConnectionFactory">myQueueListener</parameter>
<parameter name="transport.jms.CacheLevel">consumer</parameter>
</proxy>
Der Proxy ist vor allem in der Entwurfsansicht recht simpel gehalten, da lediglich die übergeordneten Informationen, nicht aber die Parameter angezeigt werden. (Sie müssen dafür in die Quellansicht wechseln). Das folgende Bild beschreibt beide jmsconsumer-Proxys, da sie sich sehr ähneln.
JMS-Consumer 2 im EI erstellen
Dieser Proxy soll nur die Nachrichten mit JMS-Header groupID=group1 konsumieren. Sehen Sie sich unten im Proxy die Selektor-/Filterbedingung an, um ActiveMQ anzuweisen, die Nachrichten mit dem Header groupID=group2 zu versenden:
<parameter name="transport.jms.MessageSelector">groupID='group2'</parameter>
<?xml version="1.0" encoding="UTF-8"?>
<proxy name="jmsconsumer2" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse">
<target>
<inSequence>
<log level="full">
<property name="LOG" value="Consumer2 : Message consumed from queue."/>
</log>
<log level="custom">
<property name="LOG" value="Consumer2 : Message processed. Send ACK to queue."/>
</log>
</inSequence>
<outSequence/>
<faultSequence>
<log level="custom">
<property name="LOG" value="Consumer2 : Error occurred. Send NACK to queue."/>
</log>
<property name="SET_ROLLBACK_ONLY" scope="axis2" type="STRING" value="true"/>
</faultSequence>
</target>
<parameter name="transport.jms.SessionAcknowledgement">CLIENT_ACKNOWLEDGE</parameter>
<parameter name="transport.jms.DestinationType">queue</parameter>
<parameter name="transport.jms.MessageSelector">groupID='group2'</parameter>
<parameter name="transport.jms.Destination">queue1</parameter>
<parameter name="transport.jms.ContentType">
<rules xmlns="">
<jmsProperty>contentType</jmsProperty>
<default>application/xml</default>
</rules>
</parameter>
<parameter name="transport.jms.SessionTransacted">true</parameter>
<parameter name="transport.jms.ConnectionFactory">myQueueListener</parameter>
<parameter name="transport.jms.CacheLevel">consumer</parameter>
</proxy>
Das Setup testen.
Wir werden Anfragen mit den groupID-Werten group1, group2 und group3 senden. Sie können CURL verwenden, um die Nachricht an die exponierte API zu senden. Der Befehl würde lauten:
curl -k --location --request POST 'https://localhost:8243/publish/message' --header 'Content-Type: application/xml' --data '<groupID>group1</groupID>'
Auf der Konsole sehen wir:
Wie wir in den Protokollen sehen können, wird die Nachricht mit groupID=group1 von Consumer 1 und die Nachricht mit groupID=group2 von Consumer 2 gemäß den definierten Selektor-/Filterbedingungen konsumiert.
Die Nachricht mit groupID=group3 wird nicht konsumiert, da sie keiner der für beide Consumer definierten Filterbedingungen entspricht. Wenn ein neuer Consumer die Warteschlange mit dem Selektor groupID=’group3′ abonniert, werden diese Nachrichten konsumiert.
Wie Sie beim Durchgehen der Nachricht sehen können, ist die groupid drei und wird daher weder von JMS-Consumer 1 noch von JMS-Consumer 2 erfasst.