info@yenlo.com
ned
Menu
WSO2 12 min

Implementatie van ActiveMQ en het retry-mechanisme in WSO2

In zijn nieuwste blog duikt Nasser diep in ActiveMQ en ontrafelt hij de complexiteit van het retry-mechanisme. Van de opzet van de infrastructuur tot monitoring en onderhoud. Ontdek hoe je ActiveMQ configureert, integreert en test binnen het WSO2-platform.

Nasser Ali Afarin
Nasser Ali Afarin
Integration Consultant
Implementatie van ActiveMQ en het retry mechanisme in WSO2

ActiveMQ is een populaire open-source message broker die betrouwbare berichtpatronen biedt voor verschillende toepassingen. In de context van WSO2 wordt ActiveMQ vaak gebruikt als de onderliggende message broker in het integratie- en middlewareplatform dat door WSO2 wordt geleverd. ActiveMQ kan ook worden gezien als een integraal onderdeel van een integratieplatform, aangezien een wachtrij kan worden gebruikt voor een Dead Letter Channel, een veelvoorkomend Enterprise Integration Pattern.

Implementatie in de context van ActiveMQ verwijst naar het proces van implementatie en configuratie van ActiveMQ binnen het WSO2-platform. Dit omvat het opzetten van de noodzakelijke infrastructuur, het configureren van berichtwachtrijen en -onderwerpen, en het integreren van ActiveMQ met andere componenten van het WSO2-platform. In de eenvoudigste vorm, bijvoorbeeld voor een snelle test, is het vrij eenvoudig te doen, bijvoorbeeld in een Docker-container. Echte implementaties zijn natuurlijk ingewikkelder.

Terugrollen van berichten in ActiveMQ in WSO2 biedt een betrouwbare manier om fouten in de verwerking van berichten af te handelen en ervoor te zorgen dat berichten niet verloren gaan of gedupliceerd worden. Door van deze functie gebruik te maken, kunnen ontwikkelaars robuuste berichtenapplicaties bouwen die verschillende gebruiksscenario’s aankunnen. Soms kan een bericht echter herhaaldelijk falen door een tijdelijk probleem, zoals een tijdelijke netwerkonderbreking of een niet-beschikbare downstream-service. In dergelijke gevallen kan een retry-mechanisme worden geïmplementeerd om het verwerken van het bericht na een vertraging opnieuw te proberen, wat de kans op succesvolle levering vergroot en voorkomt dat het bericht naar een dead-letter-wachtrij wordt verplaatst.

Om een retry-mechanisme in ActiveMQ in WSO2 te implementeren, kan je gebruikmaken van de ingebouwde retry-functie van WSO2 Micro Integrator. De retry-functie stelt u in staat het aantal pogingen om het bericht te verwerken, de vertraging tussen pogingen en de maximale duur van pogingen te specificeren. Je kunt ook een sequentie definiëren om berichten af te handelen die het maximale aantal pogingen hebben overschreden.

Overzicht van de stappen bij een ActiveMQ-implementatie in WSO2

overview of ActiveMQ rollout in WSO2

Infrastructuurinstelling

Allereerst moet je de infrastructuur opzetten die nodig is om ActiveMQ uit te voeren. Dit omvat doorgaans het implementeren van ActiveMQ op een server of cluster van servers, het zorgen voor de juiste netwerkconnectiviteit en het configureren van firewallregels indien nodig.

Configuratie

Vervolgens moet je ActiveMQ configureren om aan de eisen van je toepassing en het WSO2-platform te voldoen. Dit omvat het definiëren van wachtrijen en onderwerpen, het configureren van berichtenpersistentie, het instellen van beveiligings- en verificatiemechanismen, en het afstemmen van verschillende parameters voor optimale prestaties.

Integratie met WSO2

Nadat ActiveMQ is geconfigureerd, moet je het integreren met andere componenten van het WSO2-platform. Dit kan het configureren van WSO2 middleware-producten zoals WSO2 Micro Integrator (MI) of WSO2 API Manager omvatten om ActiveMQ te gebruiken als de message broker voor het afhandelen van berichtenstromen en communicatie tussen verschillende componenten.

Testen en validatie

Nadat de configuratie en integratie zijn voltooid, moet grondig testen en valideren worden uitgevoerd om de betrouwbaarheid en juistheid van de ActiveMQ-implementatie te waarborgen. Dit omvat het testen van berichtlevering, het verifiëren van het gedrag van wachtrijen en onderwerpen, het testen van hoge beschikbaarheid en failoverscenario’s, en het monitoren van prestatie-indicatoren.

Monitoring en onderhoud

Nadat ActiveMQ succesvol is geïmplementeerd, is het essentieel om monitoring- en onderhoudspraktijken in te stellen. Dit omvat het monitoren van de gezondheid en prestaties van de ActiveMQ-infrastructuur, het instellen van waarschuwingen en meldingen voor eventuele problemen, en het uitvoeren van regelmatige onderhoudstaken zoals software-updates en back-ups.

Het uitrollen van ActiveMQ in WSO2 vereist een goed begrip van zowel ActiveMQ als het WSO2-platform. Het wordt aanbevolen om de officiële documentatie en best practices van WSO2 en ActiveMQ communities te volgen om een succesvolle implementatie en efficiënt gebruik van ActiveMQ binnen het WSO2-platform te garanderen.

We veronderstellen dat stappen 1 en 2 zijn voltooid en richten ons op stap 3.

Implementatie

WSO2 configureren in MI

De Micro Integrator heeft op TOML gebaseerde productconfiguraties. Alle serverconfiguraties van je Micro Integrator-exemplaar kunnen worden toegepast met behulp van een enkel configuratiebestand, het “deployment.toml”-bestand (opgeslagen in de MI_HOME/conf-directory).

[[transport.jms.sender]]
name = "myQueueSender"
parameter.initial_naming_factory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory"
parameter.provider_url = "tcp://localhost:61616"
parameter.connection_factory_name = "QueueConnectionFactory"
parameter.connection_factory_type = "queue"
parameter.cache_level = "producer"
[transport.jms.listener]]
name = "myQueueConnectionFactory"
parameter.initial_naming_factory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory"
parameter.provider_url = "tcp://localhost:61616"
parameter.connection_factory_name = "QueueConnectionFactory"
parameter.connection_factory_type = "queue"
parameter.username = " yourUsername "
parameter.password = " yourPassword"

Na het toevoegen van deze configuraties moet je MI opnieuw starten.

Implementatie verzenden

Nu gaan we verder met het maken van de eerste API-bron, waarmee we informatie naar ActiveMQ zullen sturen.

<?xml version="1.0" encoding="UTF-8"?>
<api context="/activemq" name="ActiveMqAPI" xmlns="http://ws.apache.org/ns/synapse">
    <resource methods="GET">
        <inSequence>
            <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
            <call>
                <endpoint name="sample_queue_EP">
                    <address uri="jms:/testQueue?transport.jms.ConnectionFactory=myQueueConnectionFactory">
                        <timeout>
                            <duration>10000</duration>
                            <responseAction>fault</responseAction>
                        </timeout>
                        <suspendOnFailure>
                            <errorCodes>-1</errorCodes>
                            <initialDuration>0</initialDuration>
                            <progressionFactor>1.0</progressionFactor>
                            <maximumDuration>0</maximumDuration>
                        </suspendOnFailure>
                        <markForSuspension>
                            <errorCodes>-1</errorCodes>
                            <retriesBeforeSuspension>0</retriesBeforeSuspension>
                        </markForSuspension>
                    </address>
                </endpoint>
            </call>
            <payloadFactory media-type="json">
                <format>
            {
            	"Result":"Ok"
            }
            </format>
                <args/>
            </payloadFactory>
            <respond/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </resource>
</api>

Na het verzenden van het bericht naar deze API, zal de Call-mediator het naar de ActiveMQ-wachtrij sturen.

Tip:
1- Als je de OUT_ONLY-property niet gebruikt, krijgt u de onderstaande fout bij elk verzonden bericht naar de wachtrij.

Axis2Sender Unexpected error during sending message out.
org.apache.axis2.AxisFault: Did not receive a JMS response within 30000 ms to destination

De configuratie van de OUT_ONLY-property stelt ons in staat om MI te vertellen niet te wachten op een reactie van ActiveMQ.

Implementatie ontvangen

Om berichten van ActiveMQ te ontvangen, hebben we twee opties: Proxy-mediator of inboundEndpoint met protocol=”jms” config.

Ontvangen via Proxy

We kunnen een proxy definiëren voor het JMS-protocol en alle parameters gebruiken die zijn gedefinieerd in het dewployment.toml-bestand in de luistersectie of gespecificeerde en aangepaste eigenschappen. Hieronder maken we een voorbeeld:

<proxy xmlns="http://ws.apache.org/ns/synapse" name="sample_activemq_listener" transports="jms" statistics="disable" trace="disable" startOnLoad="true">
    <target>
        <inSequence>
            <log level="full">
                <property name="description" value="we got message from queue by proxy"></property>
            </log>
            <property action="set" name="OUT_ONLY" value="true" />
            <drop /> //Or do whatever your business need
        </inSequence>
        <faultSequence />
    </target>
    <parameter name="transport.jms.ConnectionFactory">myQueueConnectionFactory</parameter>
    <parameter name="transport.jms.Destination">testQueue</parameter>
    <description />
</proxy>

Ontvangen via inboundEndpoint

InboundEndpoint-artefacten kunnen JMS-parameters lezen uit configuratiebestanden en we kunnen alle benodigde parameters ervoor definiëren. Hieronder staat een voorbeeld ervan. Het kan worden gedaan zonder opnieuw op te starten, wat een voordeel is.

Dit InboundEndpoint luistert naar berichten op de jms testQueue-wachtrij. Wanneer een bericht wordt ontvangen, stuurt het InboundEndpoint het naar myMessageHandler voor verdere verwerking. Als de berichtverwerking mislukt, stuurt het InboundEndpoint het naar de QueueErrorHandler_Seq-sequentie voor toekomstige verwerking.

<inboundEndpoint name="sample_Inbound_EP" sequence=" myMessageHandler_Seq " onError=" QueueErrorHandler_Seq" protocol="jms" suspend="false" xmlns="http://ws.apache.org/ns/synapse">
    <parameters>
        <parameter name="interval">1000</parameter>
        <parameter name="sequential">true</parameter>
        <parameter name="coordination">true<inboundEndpoint name="sample_Inbound_EP" sequence=" myMessageHandler_Seq " onError=" QueueErrorHandler_Seq" protocol="jms" suspend="false" xmlns="http://ws.apache.org/ns/synapse">
    <parameters>
        <parameter name="interval">1000</parameter>
        <parameter name="sequential">true</parameter>
        <parameter name="coordination">true</parameter>
        <parameter name="transport.jms.Destination">testQueue</parameter>
        <parameter name="transport.jms.CacheLevel">3</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</parameter>
        <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
        <parameter name="java.naming.provider.url">failover:tcp://127.0.0.1:61616</parameter>
        <parameter name="transport.jms.SESSION_TRANSACTED">true</parameter>
        <parameter name="transport.jms.ConnectionFactoryType">queue</parameter>
        <parameter name="transport.jms.SharedSubscription">false</parameter>
        <parameter name="transport.jms.UserName"> yourUsername </parameter>
        <parameter name="transport.jms.Password"> yourPassword </parameter>
        <parameter name="transport.jms.ContentType">application/json</parameter>
    </parameters>
</inboundEndpoint>

Bericht verwerken

In dit geval halen we het bericht op van ActiveMQ met behulp van InboundEndPoint of Proxy en sturen het naar myMessageHandler_Seq-sequentie. In deze sequentie roepen we een eindpunt aan (bijvoorbeeld http://some_soap_service_url/) in blokkerende modus aan. Wanneer je een service aanroept in niet-blokkerende modus, keert de onderliggende workerthread terug zonder te wachten op de reactie. In blokkerende modus wordt de onderliggende workerthread geblokkeerd en wacht op de reactie nadat het verzoek naar het eindpunt is verzonden. Het is beter om de parameters “timeout” en “suspendOnFailure” te configureren op basis van je behoefte. Controleer deze link voor meer informatie over Endpoint Properties. Als de service beschikbaar is en het bericht en het antwoord accepteert, is alles in orde en MI stuurt automatisch een ACK naar ActiveMQ. In het andere geval, als we een foutmelding krijgen zoals server onbereikbaar, time-out, service niet beschikbaar, ongeldig antwoord of foutantwoord, zal dit “OnError” activeren in InboundEndPoint of “faultSequence” in Proxy.

Onthoud dat als u een geldig antwoord ontvangt met een fout, deze gevallen moeten worden afgehandeld na “call” mediator.

<sequence name="myMessageHandler_Seq" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
    <call blocking="true">
        <endpoint xmlns="http://ws.apache.org/ns/synapse">
            <address trace="disable" uri="http://some_soap_service_url/" format="soap11">
                <timeout>
                    <duration>30000</duration>
                    <responseAction>fault</responseAction>
                </timeout>
                <suspendOnFailure>
                    <errorCodes>-1</errorCodes>
                    <initialDuration>0</initialDuration>
                    <progressionFactor>1.0</progressionFactor>
                    <maximumDuration>0</maximumDuration>
                </suspendOnFailure>
                <markForSuspension>
                    <errorCodes>-1</errorCodes>
                </markForSuspension>
            </address>
        </endpoint>
    </call>
</sequence>

Hoe bericht terug te rollen

Wanneer de Micro Integrator berichten uit een ActiveMQ-wachtrij verwerkt, kan het worden geconfigureerd om berichten opnieuw aan te bieden die niet succesvol zijn verwerkt. Dit is handig om ervoor te zorgen dat berichten niet verloren gaan bij een storing.

<sequence name="QueueErrorHandler_Seq" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
    <property name="SET_ROLLBACK_ONLY" scope="default" type="STRING" value="true" />
</sequence>

Voeg de volgende regel toe aan de foutsequentie:

<property name="SET_ROLLBACK_ONLY" value="true" scope="axis2"/>

SET_ROLLBACK_ONLY:

Deze parameter moet worden gedefinieerd in WSO2-sequenties om het bericht opnieuw aan ActiveMQ aan te bieden.

Als voorbeeld, wanneer het ESB-profiel van WSO2 MI een bericht niet kan bezorgen aan de back-endservice vanwege een fout, wordt het doorgestuurd naar de foutsequentie in de configuratie. Wanneer de eigenschap SET_ROLLBACK_ONLY is ingesteld in de foutsequentie, informeert WSO2 MI ActiveMQ om het opnieuw in de wachtrij te plaatsen en het bericht opnieuw aan te bieden op basis van parameters gedefinieerd in configuraties.

Toepassen en gebruikmaken van parameters voor het retry-mechanisme

Door een retry-mechanisme in ActiveMQ in WSO2 te implementeren, kan je tijdelijke fouten afhandelen en de kans op succesvolle berichtaflevering vergroten. Deze functie biedt een betrouwbare manier om ervoor te zorgen dat berichten correct worden verwerkt en kan je helpen veerkrachtige berichtenapplicaties te bouwen.

Er is een manier om deze optie toe te voegen aan onze proxy of InboundEndpoints met behulp van InboundEndpoint-parameters.

        <parameter name="redeliveryPolicy.maximumRedeliveries">3</parameter>
        <parameter name="redeliveryPolicy.redeliveryDelay">60000</parameter>
        <parameter name="redeliveryPolicy. initialRedeliveryDelay">60000</parameter>

In dit voorbeeld is het retry-mechanisme geconfigureerd om de verwerking van het bericht tot drie keer opnieuw te proberen, met een vertraging van 60 seconden (60000 ms) tussen pogingen en een maximale duur van 60 seconden. Als het bericht na drie pogingen mislukt, wordt het doorgestuurd naar de gespecificeerde sequentie.

  • redeliveryPolicy.maximumRedeliveries: Maximaal aantal pogingen om het bericht af te leveren. Als ingesteld op -1 zal ActiveMQ onbeperkt opnieuw proberen.
  • transport.jms.SessionTransacted: Wanneer ingesteld op true, wordt de JMS-sessietransactie ingeschakeld voor de proxyservice.
  • redeliveryPolicy.redeliveryDelay: Vertragingstijd in milliseconden tussen pogingen.
  • transport.jms.CacheLevel: Dit moet worden ingesteld op de consument voor het ActiveMQ redelivery-mechanisme om te werken.

Bruikbare parameters

JMSXDeliveryCount: We kunnen het aantal pogingen krijgen met $trp:JMSXDeliveryCount, hieronder zetten we deze variabele naar een andere eigenschap.

<property expression="$trp:JMSXDeliveryCount" name="Retry_Count" scope="default" type="STRING" />

Onthoud dat deze eigenschap de eerste keer leeg is en na de eerste heraflevering van het bericht begint vanaf nummer 2.

Wanneer rollback niet werkt

Een van de belangrijkste onderdelen van de bovenstaande code is het gebruik van . Over het algemeen wordt het niet aanbevolen om blokkerende oproepen te gebruiken, maar het is verplicht om een blokkerende oproep te gebruiken na het ontvangen van een bericht uit de wachtrij als je een bericht naar een service wilt sturen door een endpoint aan te roepen. Als je de blokkeringsmodus gebruikt bij het aanroepen van een eindpunt, ontvang je het antwoord in dezelfde thread als de aanroeper-thread en zal de eigenschap SET_ROLLBACK_ONLY werken, anders werkt het niet.

Conclusie

Al met al zijn het terugdraaien van berichten een kritisch aspect van elk berichtensysteem, en ActiveMQ in WSO2 biedt robuuste ondersteuning voor deze functie. Door te begrijpen hoe het terugdraaien van berichten werkt, kunnen ontwikkelaars veerkrachtige berichtenapplicaties bouwen die fouten kunnen afhandelen en zorgen voor de aflevering van berichten. Je kunt een voorbeeldproject van Yenlo via deze link bekijken.

ned
Sluiten