In gegarandeerde asynchrone berichtenbezorgingssystemen worden bericht-brokers gebruikt om berichten van uitgevers aan consumenten te bezorgen. In sommige scenario’s moet een bericht dat naar een berichten-broker-wachtrij wordt gepubliceerd worden gerouteerd naar een specifieke consument. Dit gebeurt op basis van de inhoud van het bericht (content based routering).
De algemene oplossing hiervoor is om álle consumenten die op de betreffende wachtrij zijn aangesloten álle berichten te laten consumeren en dan de berichten eruit te filteren op basis van de inhoud. Maar dit zorgt voor extra verwerkingacties aan de kant van de consument.
In deze blogpost zullen we bespreken hoe we de op inhoud gebaseerde routering kunnen bereiken met ActiveMQ’s “Selectors”-functie en extra verwerkingsacties aan de kant van de consument kunnen vermijden. We laten zien hoe u de WSO2 JMS-proxy configureert om u te abonneren op ActiveMQ met selector-/filtercriteria.
ActiveMQ-berichtkiezers worden meestal toegepast op bericht-headers. Maar het kan ook worden toegepast op XML-payload met behulp van XPATH-gebaseerde selectors. In deze blogpost gaan we alleen in op selectors die op header-teksten zijn gebaseerd.
Scenario
Laten we zeggen dat we twee berichtconsumenten in een wachtrij hebben en dat één consument het bericht moet verwerken met header groupID=group1 en de andere consument met header groupID=group2.
Oplossing
WSO2 EI treedt zowel op als producent als consument van berichten. We zullen JMS Proxy-consumenten configureren om de respectieve berichten te consumeren door de selector/filter toe te voegen aan de proxyconfiguratie.
We gebruikten
– WSO2 EI 6.6.0
– ActiveMQ 5.15.12
Maar het werkt ook op de Micro Integrator. De configuratie is in wezen hetzelfde, hoewel de configuratie wordt uitgevoerd in deployment.toml in het geval van de Micro Integrator. En niet rechtstreeks in het bestand as2.xml, zoals het geval is voor de Enterprise Integrator.
Configureer WSO2 EI met ActiveMQ om berichten te publiceren en te consumeren
We zullen de installatie van WSO2 en ActiveMQ niet volledig beschrijven . U kunt de instructies in de WSO2-documentatie volgen om de installatie te configureren of door deze blogs bekijken .
Maak JMS-producent REST API in EI
Deze API accepteert een xml-bericht als invoer en publiceert het bericht in een wachtrij. Tijdens het publiceren van het bericht zou de groupID JMS-header worden ingesteld op basis van de groupID-waarde van de payload. Deze groupID JMS-header wordt gebruikt in de JMS-consumentenconfiguratie om de relevante berichten te selecteren of te filteren.
Om in het ontwerpaanzicht te passen, hebben we een eigenschapsgroep gebruikt om enkele eigenschappen te bundelen. Dit is alleen voor weergavedoeleinden.
<?xml version="1.0" encoding="UTF-8"?>
<api context="/publish" name="jms-publisher" xmlns="http://ws.apache.org/ns/synapse">
<resource methods="POST" uri-template="/message">
<inSequence>
<log level="custom">
<property name="LOG" value="Message received. Publish to queue."/>
</log>
<log level="full"/>
<propertyGroup>
<property expression="//groupID/text()" name="groupID" scope="transport" type="STRING"/>
<property name="FORCE_SC_ACCEPTED" scope="axis2" type="STRING" value="true"/>
<property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
<property name="messageType" scope="axis2" type="STRING" value="application/xml"/>
<property name="QueueName" scope="default" type="STRING" value="queue1"/>
</propertyGroup>
<header expression="concat('jms:/',get-property('QueueName'),'?transport.jms.ConnectionFactory=myQueueSender')" name="To" scope="default"/>
<property action="remove" name="REST_URL_POSTFIX" scope="axis2"/>
<call>
<endpoint>
<default>
<timeout>
<duration>10000</duration>
<responseAction>fault</responseAction>
</timeout>
<suspendOnFailure>
<errorCodes>-1</errorCodes>
<initialDuration>0</initialDuration>
<progressionFactor>1.0</progressionFactor>
<maximumDuration>0</maximumDuration>
</suspendOnFailure>
<markForSuspension>
<errorCodes>-1</errorCodes>
<retriesBeforeSuspension>0</retriesBeforeSuspension>
</markForSuspension>
</default>
</endpoint>
</call>
</inSequence>
<outSequence/>
<faultSequence/>
</resource>
</api>
Maak JMS Consumer 1 in EI
Deze proxy zou alleen de berichten moeten consumeren met JMS-header groupID=group1. Om de ActiveMQ te vertellen om de berichten met groupID=group1 header te verzenden, zie hieronder in de proxy de selector/filtervoorwaarde:
<parameter name="transport.jms.MessageSelector">groupID='group1'</parameter>
<?xml version="1.0" encoding="UTF-8"?>
<proxy name="jmsconsumer1" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse">
<target>
<inSequence>
<log level="full">
<property name="LOG" value="Consumer1 : Message consumed from queue."/>
</log>
<log level="custom">
<property name="LOG" value="Consumer1 : Message processed. Send ACK to queue."/>
</log>
</inSequence>
<outSequence/>
<faultSequence>
<log level="custom">
<property name="LOG" value="Consumer1 : Error occurred. Send NACK to queue."/>
</log>
<property name="SET_ROLLBACK_ONLY" scope="axis2" type="STRING" value="true"/>
</faultSequence>
</target>
<parameter name="transport.jms.SessionAcknowledgement">CLIENT_ACKNOWLEDGE</parameter>
<parameter name="transport.jms.DestinationType">queue</parameter>
<parameter name="transport.jms.MessageSelector">groupID='group1'</parameter>
<parameter name="transport.jms.Destination">queue1</parameter>
<parameter name="transport.jms.ContentType">
<rules xmlns="">
<jmsProperty>contentType</jmsProperty>
<default>application/xml</default>
</rules>
</parameter>
<parameter name="transport.jms.SessionTransacted">true</parameter>
<parameter name="transport.jms.ConnectionFactory">myQueueListener</parameter>
<parameter name="transport.jms.CacheLevel">consumer</parameter>
</proxy>
De proxy is vrij eenvoudig, vooral vanuit de ontwerpweergave die alleen de informatie op hoog niveau toont en niet de parameters in die weergave. Daarvoor moet u naar de bronweergave gaan. De onderstaande afbeelding beschrijft beide jmsconsumer-proxy’s omdat ze behoorlijk op elkaar lijken.
Maak JMS Consumer 2 in EI
Deze proxy zou alleen de berichten moeten consumeren met JMS-header groupID=group1. Om de ActiveMQ te vertellen om de berichten met groupID=group2 header te verzenden, zie hieronder in de proxy de selector/filtervoorwaarde:
<parameter name="transport.jms.MessageSelector">groupID='group2'</parameter>
<?xml version="1.0" encoding="UTF-8"?>
<proxy name="jmsconsumer2" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse">
<target>
<inSequence>
<log level="full">
<property name="LOG" value="Consumer2 : Message consumed from queue."/>
</log>
<log level="custom">
<property name="LOG" value="Consumer2 : Message processed. Send ACK to queue."/>
</log>
</inSequence>
<outSequence/>
<faultSequence>
<log level="custom">
<property name="LOG" value="Consumer2 : Error occurred. Send NACK to queue."/>
</log>
<property name="SET_ROLLBACK_ONLY" scope="axis2" type="STRING" value="true"/>
</faultSequence>
</target>
<parameter name="transport.jms.SessionAcknowledgement">CLIENT_ACKNOWLEDGE</parameter>
<parameter name="transport.jms.DestinationType">queue</parameter>
<parameter name="transport.jms.MessageSelector">groupID='group2'</parameter>
<parameter name="transport.jms.Destination">queue1</parameter>
<parameter name="transport.jms.ContentType">
<rules xmlns="">
<jmsProperty>contentType</jmsProperty>
<default>application/xml</default>
</rules>
</parameter>
<parameter name="transport.jms.SessionTransacted">true</parameter>
<parameter name="transport.jms.ConnectionFactory">myQueueListener</parameter>
<parameter name="transport.jms.CacheLevel">consumer</parameter>
</proxy>
Test de opstelling
We sturen verzoeken met groupID-waarden group1, group2 en group3. U kunt CURL gebruiken om het bericht naar de opengestelde API te sturen. De opdracht zou zijn:
curl -k --location --request POST 'https://localhost:8243/publish/message' --header 'Content-Type: application/xml' --data '<groupID>group1</groupID>'
Op de console zien we:
Zoals we in de logboeken kunnen zien, wordt volgens de gedefinieerde selector/filtervoorwaarden bericht met groupID=group1 gebruikt door consument 1 en wordt bericht met groupID=group2 gebruikt door consument 2.
Bericht met groupID=group3 niet geconsumeerd omdat het niet overeenkomt met een filtervoorwaarde die voor beide consumenten is gedefinieerd. Als een nieuwe consument zich inschrijft in de wachtrij met selector groupID=’group3′, worden deze berichten verbruikt.
Zoals je kunt zien als we door het bericht bladeren, is het groeps-ID drie en daarom niet opgepikt door JMS-consument 1 of JMS-consument 2.