info@yenlo.com
deu
Menu
WSO2 13 min

Bereitstellung von ActiveMQ und seinem Wiederholungsmechanismus in WSO2

In seinem neuesten Blog taucht Nasser tief in ActiveMQ ein und entwirrt die Komplexitäten seines Wiederholungsmechanismus. Von der Einrichtung der Infrastruktur bis zur Überwachung und Wartung. Erfahren Sie, wie Sie ActiveMQ innerhalb der WSO2-Plattform konfigurieren, integrieren und testen.

Nasser Ali Afarin
Nasser Ali Afarin
Integration Expert
Bereitstellung von ActiveMQ und seinem Wiederholungsmechanismus in WSO2

ActiveMQ ist ein beliebter Open-Source-Nachrichtenbroker, der zuverlässige Nachrichtenmuster für verschiedene Anwendungen bietet. Im Kontext von WSO2 wird ActiveMQ häufig als der zugrunde liegende Nachrichtenbroker in der Integrations- und Middleware-Plattform von WSO2 verwendet. ActiveMQ kann auch als integraler Bestandteil einer Integrationsplattform betrachtet werden, da eine Warteschlange für einen Dead Letter Channel verwendet werden kann, ein gängiges Muster für die unternehmensweite Integration.

Im Kontext von ActiveMQ bezieht sich die Einführung auf den Prozess der Bereitstellung und Konfiguration von ActiveMQ innerhalb der WSO2-Plattform. Dies umfasst das Einrichten der erforderlichen Infrastruktur, die Konfiguration von Nachrichtenwarteschlangen und -themen sowie die Integration von ActiveMQ mit anderen Komponenten der WSO2-Plattform. In seiner einfachsten Form für einen schnellen Test ist es zum Beispiel in einem Docker-Container recht einfach zu tun. Reale Bereitstellungen sind natürlich komplizierter.

Rückrollnachrichten in ActiveMQ in WSO2 bieten eine zuverlässige Möglichkeit, mit Fehlern bei der Nachrichtenverarbeitung umzugehen und sicherzustellen, dass Nachrichten nicht verloren gehen oder dupliziert werden. Durch die Nutzung dieses Features können Entwickler robuste Messaging-Anwendungen erstellen, die eine Vielzahl von Anwendungsfällen bewältigen können. Manchmal kann jedoch eine Nachricht aufgrund eines vorübergehenden Problems, wie einer vorübergehenden Netzwerkstörung oder eines nicht verfügbaren nachgelagerten Dienstes, wiederholt fehlschlagen. In solchen Fällen kann ein Wiederholungsmechanismus implementiert werden, um die Verarbeitung der Nachricht nach einer Verzögerung zu wiederholen, was die Chancen auf erfolgreiche Zustellung erhöhen und vermeiden kann, die Nachricht in eine Warteschlange für abgelehnte Nachrichten zu verschieben.

Um einen Wiederholungsmechanismus in ActiveMQ in WSO2 zu implementieren, können Sie die integrierte Wiederholungsfunktion des WSO2 Micro Integrator verwenden. Die Wiederholungsfunktion ermöglicht es Ihnen, die Anzahl der Wiederholungen der Verarbeitung der Nachricht, die Verzögerung zwischen den Wiederholungen und die maximale Dauer der Wiederholung zu spezifizieren. Sie können auch eine Sequenz definieren, um Nachrichten zu behandeln, die die maximale Anzahl der Wiederholungen überschritten haben.

Hochrangiger Überblick über die Schritte bei einer ActiveMQ-Bereitstellung in WSO2

overview of ActiveMQ rollout in WSO2

Infrastrukturaufbau

Zuerst müssen Sie die Infrastruktur einrichten, die für den Betrieb von ActiveMQ erforderlich ist. Dies umfasst in der Regel die Bereitstellung von ActiveMQ auf einem Server oder Cluster von Servern, die Sicherstellung einer ordnungsgemäßen Netzwerkverbindung und die Konfiguration von Firewall-Regeln, wenn dies erforderlich ist.

Konfiguration

Als nächstes müssen Sie ActiveMQ so konfigurieren, dass es den Anforderungen Ihrer Anwendung und der WSO2-Plattform entspricht. Dies umfasst das Definieren von Warteschlangen und Themen, die Konfiguration der Nachrichtenpersistenz, das Einrichten von Sicherheits- und Authentifizierungsmechanismen sowie das Abstimmen verschiedener Parameter für optimale Leistung.

Integration mit WSO2

Nach der Konfiguration müssen Sie ActiveMQ in andere Komponenten der WSO2-Plattform integrieren. Dies kann die Konfiguration von Middleware-Produkten wie WSO2 Micro Integrator (MI) oder WSO2 API Manager beinhalten, um ActiveMQ als Nachrichtenbroker für die Handhabung von Nachrichtenflüssen und die Kommunikation zwischen verschiedenen Komponenten zu verwenden.

Testen und Validieren

Nach Abschluss der Konfiguration und Integration sollten gründliche Tests und Validierungen durchgeführt werden, um die Zuverlässigkeit und Richtigkeit der ActiveMQ-Bereitstellung sicherzustellen. Dies umfasst Tests zur Nachrichtenzustellung, Überprüfung des Verhaltens von Warteschlangen und Themen, Tests von Hochverfügbarkeits- und Failover-Szenarien sowie die Überwachung von Leistungskennzahlen.

Überwachung und Wartung

Nach erfolgreicher Bereitstellung von ActiveMQ ist es wichtig, Überwachungs- und Wartungspraktiken zu etablieren. Dies umfasst die Überwachung der Gesundheit und Leistung der ActiveMQ-Infrastruktur, das Einrichten von Warnungen und Benachrichtigungen für etwaige Probleme sowie regelmäßige Wartungsaufgaben wie Softwareupdates und Backups.

Die Bereitstellung von ActiveMQ in WSO2 erfordert ein gutes Verständnis sowohl von ActiveMQ als auch von der WSO2-Plattform. Es wird empfohlen, der offiziellen Dokumentation und den bewährten Methoden der WSO2- und ActiveMQ-Community zu folgen, um eine erfolgreiche Bereitstellung und effiziente Nutzung von ActiveMQ innerhalb der WSO2-Plattform zu gewährleisten. Wir nehmen an, dass Schritte 1 und 2 abgeschlossen sind und konzentrieren uns auf Schritt 3.

Implementierung

WSO2 in MI konfigurieren

Der Micro Integrator verfügt über TOML-basierte Produktkonfigurationen. Alle konfigurationen auf Serverebene Ihrer Micro Integrator-Instanz können mit einer einzelnen Konfigurationsdatei angewendet werden, die die „deployment.toml“-Datei ist (gespeichert im MI_HOME/conf-Verzeichnis).

[[transport.jms.sender]]
name = "myQueueSender"
parameter.initial_naming_factory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory"
parameter.provider_url = "tcp://localhost:61616"
parameter.connection_factory_name = "QueueConnectionFactory"
parameter.connection_factory_type = "queue"
parameter.cache_level = "producer"
[transport.jms.listener]]
name = "myQueueConnectionFactory"
parameter.initial_naming_factory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory"
parameter.provider_url = "tcp://localhost:61616"
parameter.connection_factory_name = "QueueConnectionFactory"
parameter.connection_factory_type = "queue"
parameter.username = " yourUsername "
parameter.password = " yourPassword"

Nachdem Sie diese Konfigurationen hinzugefügt haben, müssen Sie MI neu starten.

Send Implementierung

Jetzt werden wir damit beginnen, die erste API-Ressource zu erstellen, mit der wir Informationen an die ActiveMQ senden.

<?xml version="1.0" encoding="UTF-8"?>
<api context="/activemq" name="ActiveMqAPI" xmlns="http://ws.apache.org/ns/synapse">
    <resource methods="GET">
        <inSequence>
            <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
            <call>
                <endpoint name="sample_queue_EP">
                    <address uri="jms:/testQueue?transport.jms.ConnectionFactory=myQueueConnectionFactory">
                        <timeout>
                            <duration>10000</duration>
                            <responseAction>fault</responseAction>
                        </timeout>
                        <suspendOnFailure>
                            <errorCodes>-1</errorCodes>
                            <initialDuration>0</initialDuration>
                            <progressionFactor>1.0</progressionFactor>
                            <maximumDuration>0</maximumDuration>
                        </suspendOnFailure>
                        <markForSuspension>
                            <errorCodes>-1</errorCodes>
                            <retriesBeforeSuspension>0</retriesBeforeSuspension>
                        </markForSuspension>
                    </address>
                </endpoint>
            </call>
            <payloadFactory media-type="json">
                <format>
            {
            	"Result":"Ok"
            }
            </format>
                <args/>
            </payloadFactory>
            <respond/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </resource>
</api>

Nach dem Senden der Nachricht an diese API sendet der Aufrufer die Nachricht an die ActiveMQ-Warteschlange.

Tipp:
1- Wenn Sie die OUT_ONLY-Eigenschaft nicht verwenden, sehen Sie den unten stehenden Fehler bei jeder gesendeten Nachricht an die Warteschlange.

Axis2Sender Unexpected error during sending message out.
org.apache.axis2.AxisFault: Did not receive a JMS response within 30000 ms to destination

Die Konfiguration der OUT_ONLY-Eigenschaft ermöglicht es uns, dem MI mitzuteilen, nicht auf eine Antwort von der ActiveMQ zu warten.

Empfangsimplementierung

Für den Empfang von Nachrichten von ActiveMQ haben wir zwei Optionen: die Verwendung des Proxy-Mediators oder von InboundEndpoint mit der Konfiguration protocol=“jms“.

Empfangen durch Proxy

Wir können einen Proxy für das JMS-Protokoll definieren und alle in der Dewployment.toml-Datei im Listener-Abschnitt oder angegebenen und benutzerdefinierten Eigenschaften definierten Parameter verwenden. Im Folgenden erstellen wir ein Beispiel:

<proxy xmlns="http://ws.apache.org/ns/synapse" name="sample_activemq_listener" transports="jms" statistics="disable" trace="disable" startOnLoad="true">
    <target>
        <inSequence>
            <log level="full">
                <property name="description" value="we got message from queue by proxy"></property>
            </log>
            <property action="set" name="OUT_ONLY" value="true" />
            <drop /> //Or do whatever your business need
        </inSequence>
        <faultSequence />
    </target>
    <parameter name="transport.jms.ConnectionFactory">myQueueConnectionFactory</parameter>
    <parameter name="transport.jms.Destination">testQueue</parameter>
    <description />
</proxy>

Empfangen durch InboundEndpoint

InboundEndpoint-Artefakte können JMS-Parameter aus Konfigurationsdateien lesen, wir können auch alle benötigten Parameter dafür definieren. Im Folgenden finden Sie ein Beispiel dafür. Es kann ohne Neustart durchgeführt werden, was ein Vorteil ist.

Dieses InboundEndpoint lauscht auf Nachrichten in der JMS-TestQueue-Warteschlange. Wenn eine Nachricht empfangen wird, sendet das InboundEndpoint sie an myMessageHandler für weitere Prozesse. Wenn die Nachrichtenverarbeitung fehlschlägt, sendet das InboundEndpoint sie an die QueueErrorHandler_Seq-Sequenz für zukünftige Verarbeitung.

<inboundEndpoint name="sample_Inbound_EP" sequence=" myMessageHandler_Seq " onError=" QueueErrorHandler_Seq" protocol="jms" suspend="false" xmlns="http://ws.apache.org/ns/synapse">
    <parameters>
        <parameter name="interval">1000</parameter>
        <parameter name="sequential">true</parameter>
        <parameter name="coordination">true<inboundEndpoint name="sample_Inbound_EP" sequence=" myMessageHandler_Seq " onError=" QueueErrorHandler_Seq" protocol="jms" suspend="false" xmlns="http://ws.apache.org/ns/synapse">
    <parameters>
        <parameter name="interval">1000</parameter>
        <parameter name="sequential">true</parameter>
        <parameter name="coordination">true</parameter>
        <parameter name="transport.jms.Destination">testQueue</parameter>
        <parameter name="transport.jms.CacheLevel">3</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</parameter>
        <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
        <parameter name="java.naming.provider.url">failover:tcp://127.0.0.1:61616</parameter>
        <parameter name="transport.jms.SESSION_TRANSACTED">true</parameter>
        <parameter name="transport.jms.ConnectionFactoryType">queue</parameter>
        <parameter name="transport.jms.SharedSubscription">false</parameter>
        <parameter name="transport.jms.UserName"> yourUsername </parameter>
        <parameter name="transport.jms.Password"> yourPassword </parameter>
        <parameter name="transport.jms.ContentType">application/json</parameter>
    </parameters>
</inboundEndpoint>

Prozessnachricht

In diesem Fall erhalten wir die Nachricht von ActiveMQ mit InboundEndPoint oder Proxy und senden sie an die Sequenz myMessageHandler_Seq. In dieser Sequenz rufen wir einen Endpunkt auf (zum Beispiel http://some_soap_service_url/) im Blockierungsmodus auf. Wenn Sie einen Dienst im nicht blockierenden Modus aufrufen, kehrt der zugrunde liegende Worker-Thread ohne Warten auf die Antwort zurück. Im Blockierungsmodus wird der zugrunde liegende Worker-Thread blockiert und wartet auf die Antwort, nachdem er die Anfrage an den Endpunkt gesendet hat. Es ist ratsam, die Parameter „timeout“ und „suspendOnFailure“ basierend auf Ihren Anforderungen zu konfigurieren. Bitte prüfen Sie diesen Link, um mehr über Endpunkt-Eigenschaften zu erfahren. Wenn der Dienst verfügbar ist und die Nachricht und Antwort akzeptiert, ist alles in Ordnung und MI sendet automatisch ein ACK an ActiveMQ. Andernfalls, wenn wir einen Fehler wie Server nicht erreichbar oder Timeout oder Dienst nicht verfügbar, ungültige Antwort oder Fehlerantwort erhalten, lösen diese Situationen „OnError“ in InboundEndPoint oder „faultSequence“ in Proxy aus.

Denken Sie daran, wenn Sie eine gültige Antwort mit einem Fehler erhalten haben, sollten diese Fälle nach dem „call“-Mediator behandelt werden.

<sequence name="myMessageHandler_Seq" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
    <call blocking="true">
        <endpoint xmlns="http://ws.apache.org/ns/synapse">
            <address trace="disable" uri="http://some_soap_service_url/" format="soap11">
                <timeout>
                    <duration>30000</duration>
                    <responseAction>fault</responseAction>
                </timeout>
                <suspendOnFailure>
                    <errorCodes>-1</errorCodes>
                    <initialDuration>0</initialDuration>
                    <progressionFactor>1.0</progressionFactor>
                    <maximumDuration>0</maximumDuration>
                </suspendOnFailure>
                <markForSuspension>
                    <errorCodes>-1</errorCodes>
                </markForSuspension>
            </address>
        </endpoint>
    </call>
</sequence>

Wie man eine Nachricht zurückrollt

Wenn der Micro Integrator Nachrichten aus einer ActiveMQ-Warteschlange verbraucht, kann er so konfiguriert werden, dass Nachrichten, die nicht verarbeitet werden können, erneut zugestellt werden. Dies ist nützlich, um sicherzustellen, dass Nachrichten im Falle eines Fehlers nicht verloren gehen.

<sequence name="QueueErrorHandler_Seq" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
    <property name="SET_ROLLBACK_ONLY" scope="default" type="STRING" value="true" />
</sequence>

Fügen Sie die folgende Zeile in Ihre Fehlersequenz ein:

<property name="SET_ROLLBACK_ONLY" value="true" scope="axis2"/>

SET_ROLLBACK_ONLY:

Diese Eigenschaft muss in WSO2-Sequenzen definiert werden, um die Nachricht an ActiveMQ erneut zuzustellen.

Als Beispiel, wenn das ESB-Profil von WSO2 MI aufgrund eines Fehlers eine Nachricht nicht an den Back-End-Dienst zustellen kann, wird es zur Fehlersequenz in der Konfiguration geroutet. Wenn die SET_ROLLBACK_ONLY-Eigenschaft in der Fehlersequenz festgelegt ist, informiert der WSO2 MI ActiveMQ, es erneut in die Warteschlange zu stellen und die Nachricht basierend auf den in der Konfiguration definierten Parametern erneut zuzustellen.

Anwenden und Verwenden von Parametern des Wiederholungsmechanismus

Durch die Implementierung eines Wiederholungsmechanismus in ActiveMQ in WSO2 können Sie vorübergehende Fehler behandeln und die Chancen auf erfolgreiche Nachrichtenzustellung erhöhen. Diese Funktion bietet eine zuverlässige Möglichkeit sicherzustellen, dass Nachrichten korrekt verarbeitet werden, und kann Ihnen helfen, robuste Messaging-Anwendungen zu erstellen.

Es gibt eine Möglichkeit, diese Option zu unseren Proxies oder InboundEndpoints mit InboundEndpoint-Parametern hinzuzufügen.

        <parameter name="redeliveryPolicy.maximumRedeliveries">3</parameter>
        <parameter name="redeliveryPolicy.redeliveryDelay">60000</parameter>
        <parameter name="redeliveryPolicy. initialRedeliveryDelay">60000</parameter>

In diesem Beispiel ist der Wiederholungsmechanismus so konfiguriert, dass die Verarbeitung der Nachricht bis zu dreimal wiederholt wird, mit einer Verzögerung von 60 Sekunden (60000 ms) zwischen den Wiederholungen und einer maximalen Dauer von 60 Sekunden. Wenn die Nachricht nach drei Wiederholungen fehlschlägt, wird sie an die angegebene Sequenz gerichtet.

  • redeliveryPolicy.maximumRedeliveries: Maximale Anzahl der Wiederholungen für die Zustellung der Nachricht. Wenn auf -1 gesetzt, wird ActiveMQ unendlich wiederholen.
  • transport.jms.SessionTransacted: Wenn auf true gesetzt, aktiviert dies die JMS-Sitzungstransaktion für den Proxy-Dienst.
  • redeliveryPolicy.redeliveryDelay: Verzögerungszeit in Millisekunden zwischen den Wiederholungen.
  • transport.jms.CacheLevel: Dies muss für den Verbraucher eingestellt werden, damit der ActiveMQ-Wiederholungsmechanismus funktioniert.

Verwendbare Parameter

JMSXDeliveryCount: Wir können die Anzahl der Wiederholungen mit $trp:JMSXDeliveryCount erhalten, unten setzen wir diese Variable auf eine andere Eigenschaft.

<property expression="$trp:JMSXDeliveryCount" name="Retry_Count" scope="default" type="STRING" />

Denken Sie daran, dass diese Eigenschaft beim ersten Mal null ist und nach der ersten Wiederholung der Nachricht bei Nummer 2 startet.

Wenn das Zurückrollen nicht funktioniert

Einer der wichtigsten Teile des obigen Codes ist die Verwendung von . Es wird im Allgemeinen nicht empfohlen, blockierende Aufrufe zu verwenden, aber es ist zwingend erforderlich, einen blockierenden Aufruf nach dem Empfangen einer Nachricht aus der Warteschlange zu verwenden, wenn Sie eine Nachricht an einen Dienst senden möchten, indem Sie einen Endpunkt aufrufen. Wenn Sie den Blockierungsmodus im Aufruf eines Endpunkts verwenden, erhalten Sie die Antwort im gleichen Thread wie der Aufrufer-Thread, und die Eigenschaft SET_ROLLBACK_ONLY funktioniert, sonst funktioniert sie nicht.

Fazit

Zusammenfassend sind das Zurückrollen von Nachrichten ein entscheidender Aspekt eines jeden Nachrichtensystems, und ActiveMQ in WSO2 bietet robuste Unterstützung für dieses Feature. Durch das Verständnis, wie das Zurückrollen von Nachrichten funktioniert, können Entwickler widerstandsfähige Messaging-Anwendungen erstellen, die Ausfälle bewältigen können und die Zustellung von Nachrichten sicherstellen. Sie können auch auf das Beispielprojekt von Yenlo über diesen Link zugreifen.

deu
Schließen
Was ist auf unserer Speisekarte