fb
WSO2 10 Minuten

Mit ActiveMQ spielen

Rob Blaauboer
Rob Blaauboer
Playing with ActiveMQ
Scrollen

Für diesen Blog werden wir ActiveMQ als Broker verwenden. Wir richten einen Beispiel-Proxy ein, der die Nachricht in eine Queue sendet. Ein weiterer Proxy wird verwendet, der auf Nachrichten in dieser Queue wartet und die Nachricht an eine Datenbank senden wird. Durch diese Kombination können wir sehen, wie man mit einem JMS-Endpunkt interagiert, einen OUT_ONLY-Aufruf durchführt und sehen, wie man eine Nachricht von einer Queue an einen anderen Service sendet.

Wenn Sie ActiveMQ noch nicht installiert haben, müssen Sie die Software zuerst einrichten. Die unten stehende Installation ist für eine Linux-Umgebung und installiert Version 5.15.8, aber Sie können jede aktuelle Version von ActiveMQ v5 verwenden. Sie können auch Apache Artemis verwenden, aber das würde einige Änderungen in den jar-Dateien und der Konfiguration erfordern, daher empfehle ich, bei ActiveMQ v5 zu bleiben, wenn Sie die Schritte in diesem Blog ausführen.

Ändern Sie bitte die unten aufgeführten Installationsorte, um sie an Ihr System und Ihre Vorlieben anzupassen. Ich habe es schließlich auf meinem Desktop unter Centos/7 installiert.

wget http://archive.apache.org/dist/activemq/5.15.8/apache-activemq-5.15.8-bin.tar.gz -P /opt/wso2/Downloads
tar -C /opt/wso2/Desktop -zxvf /opt/wso2/Downloads/apache-activemq-5.15.8-bin.tar.gz
mv /opt/wso2/Desktop/apache-activemq-5.15.8 /opt/wso2/Desktop/active
cd /opt/wso2/Desktop/active/bin
./activemq console

Der letzte Befehl oben startet ActiveMQ im Kommandozeilen-Konsolenmodus. Dadurch können Sie die Log-Ausgabe von ActiveMQ direkt in der Kommandozeilen-Ausgabe sehen. Sobald ActiveMQ gestartet ist, öffnen Sie die webbasierte Verwaltungskonsole, indem Sie zu http://localhost:8161 gehen. In der Befehlszeilenausgabe sehen Sie die URL, die ähnlich lautet wie „INFO-Meldung | ActiveMQ WebConsole verfügbar unter http://0.0.0.0:8161/“.

Playing with ActiveMQ 1

Anmeldung mit admin/admin-Anmeldeinformationen.

Playing with ActiveMQ 2

Sie sollten nun die Konsole im Browser sehen.

Playing with ActiveMQ 3

Lassen Sie uns nun unsere WSO2 Enterprise Integrator-Installation für die Verbindung mit diesem ActiveMQ-Broker vorbereiten.
 Die folgenden Client-Bibliotheken kopieren Sie aus dem Verzeichnis <ACTIVEMQ_HOME>/lib in das Verzeichnis [EI_HOME]/lib. 


Für ActiveMQ 5.8.0 und höher müssen die folgenden Dateien kopiert werden: Bitte beachten Sie, dass einige Versionen der jar-Dateien eine höhere Versionsnummer haben (sie sind mit einem * gekennzeichnet und hängen von der ActiveMQ-Version ab, die Sie im ersten Schritt installiert haben).

  1. (*) activemq-broker-5.8.0.jar
  2. (*) activemq-client-5.8.0.jar
  3. (*) activemq-kahadb-store-5.8.0.jar  
  4. geronimo-jms_1.1_spec-1.1.1.jar 
  5. (*) geronimo-j2ee-management_1.1_spec-1.0.1.jar 
  6. geronimo-jta_1.0.1B_spec-1.0.1.jar
  7. (*) hawtbuf-1.9.jar
  8. (*) Slf4j-api-1.6.6.jar
  9. activeio-core-3.1.4.jar (available in the <ACTIVEMQ_HOME>/lib/optional directory)  

Dekommentieren Sie in der axis2.xml ([APIM-HOME]/conf/axis2) den JMS transportReceiver und JMS transportSender. Achten Sie darauf, dass Sie den vordefinierten ActiveMQ-basierten JMS transportReceiver unkommentiert lassen, da es weitere JMS transportReceiver-Alternativen in axis2.xml gibt.

Playing with ActiveMQ 4

Und aktivieren Sie auch den JMS transportSender.

Playing with ActiveMQ 5

Führen Sie einen Neustart des Enterprise Integrator durch, damit Ihre Änderungen wirksam werden.

Anwendungsbeispiel

Mein Anwendungsbeispiel ist ein Proxy mit einem Throttle-Mediator. Dieser Mediator lässt im Entscheidungszweig onAccept fünf Aufrufe pro Minute zu, alle anderen werden im Entscheidungszweig onReject verweigert.

Um die Meldungen, die gedrosselt werden, zu speichern, stelle ich sie in eine Queue und lese sie mit einem Proxy mit einem Intervall von einer Minute. So werden die durchlaufenden Nachrichten direkt in der Datenbank gespeichert und die anderen zu einem späteren Zeitpunkt hinzugefügt.

Allgemein ist davon auszugehen, dass die fünf Aufrufe pro Minute ein theoretisches Maximum darstellen. Da wir einen Proxy haben, der eine Queue mit einem Aufruf oder einer Nachricht pro Minute ausliest, können wir immer noch alle Nachrichten zustellen, ohne das Backend theoretisch abzuschalten. 

Erstellen einer Datenbank

Die Datensätze werden in einer Tabelle mit dem passenden Namen „Datensätze“ gespeichert. Den einzigen Wert, den ich einfüge, ist die Spalte DATE. Die ID und DTSTAMP werden automatisch eingefügt. Dies ist die MySQL-Definition:

DROP DATABASE active;
CREATE DATABASE active;
USE active;
CREATE TABLE IF NOT EXISTS records (
    ID INTEGER NOT NULL AUTO_INCREMENT  ,
    DATE VARCHAR(40) NOT NULL,
    DTSTAMP datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (ID)
 );

Proxy

Der Proxy, der die Meldungen empfängt, wird hier in der Entwurfsansicht gezeigt. Eine Nachricht kommt herein und wird entweder im OnAccept- oder im OnReject-Zweig an den Datenbank-Proxy (DBProxy) weitergeleitet, um einen Zeitstempel hinzuzufügen, und schließlich wird die Nachricht in der Queue gespeichert.

Playing with ActiveMQ 6
<?xml version="1.0" encoding="UTF-8"?>
<proxy name="ACProxy" startOnLoad="true" transports="http https" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <property name="OUT_ONLY" value="true"/>
            <throttle id="test">
                <policy>
                    <wsp:Policy wsu:id="WSO2MediatorThrottlingPolicy" xmlns:wsp="http://schemas.xmlsoap.org/ws/2004/09/policy" xmlns:wsu="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd">
                        <throttle:MediatorThrottleAssertion xmlns:throttle="http://www.wso2.org/products/wso2commons/throttle">                            <throttle:MaximumConcurrentAccess>0</throttle:MaximumConcurrentAccess>
                            <wsp:Policy>
<throttle:ID throttle:type="IP">other</throttle:ID>
                                <wsp:Policy>
                                    <throttle:Control>
                                        <wsp:Policy>
<throttle:MaximumCount>5</throttle:MaximumCount>                                            <throttle:UnitTime>60000</throttle:UnitTime>
<throttle:ProhibitTimePeriod>0</throttle:ProhibitTimePeriod>
                                        </wsp:Policy>
                                    </throttle:Control>
                                </wsp:Policy>
                            </wsp:Policy>
                        </throttle:MediatorThrottleAssertion>
                    </wsp:Policy>
                </policy>
                <onReject>
                    <payloadFactory media-type="xml">
                        <format>
                            <DATE xmlns="">$1</DATE>
                        </format>
                        <args>
                            <arg evaluator="xml" expression="get-property("SYSTEM_DATE", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")"/>
                        </args>
                    </payloadFactory>
                        <send>
                        <endpoint>
                            <address uri="jms:/mymessages?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&java.naming.provider.url=tcp://localhost:61616&transport.jms.DestinationType=queue">
                                <suspendOnFailure>
                                    <initialDuration>-1</initialDuration>
                                    <progressionFactor>-1</progressionFactor>
                                    <maximumDuration>0</maximumDuration>
                                </suspendOnFailure>
                                <markForSuspension>
       <retriesBeforeSuspension>0</retriesBeforeSuspension>
                                </markForSuspension>
                            </address>
                        </endpoint>
                    </send>
                </onReject>
                <onAccept>
                    <call>
                        <endpoint>
                            <address uri="http://localhost:8280/services/DBProxy">
                                <suspendOnFailure>
                                    <initialDuration>-1</initialDuration>
                                    <progressionFactor>-1</progressionFactor>
                                    <maximumDuration>0</maximumDuration>
                                </suspendOnFailure>
                                <markForSuspension>
       <retriesBeforeSuspension>0</retriesBeforeSuspension>
                                </markForSuspension>
                            </address>
                        </endpoint>
                    </call>
                </onAccept>
            </throttle>
            <respond/>
        </inSequence>
    </target>
</proxy>

Der Quelltext zeigt die Parameter für den Throttle- und andere Vermittler, wie die Eigenschaft OUT_ONLY.
 Wie Sie im Endpunkt sehen können, geben wir einen ausführlichen JMS-Endpunkt-URI an. Der interessante Teil is der Queue-Name am Anfang. Dieser Queue-Name muss natürlich mit dem Queue-Namen übereinstimmen, der im readQueue-Proxy verwendet wird. Eine Abweichung würde dazu führen, dass die Nachrichten nicht an das vorgesehene Ziel zugestellt werden.

Der DBProxy speichert den Datensatz in der Datenbank;

Playing with ActiveMQ 7

Der Proxy prüft, ob bereits ein DATE-Element vorhanden ist, das in diesem Fall aus der Queue stammt. Ist dies nicht der Fall, wird das DATE abgerufen, indem die Eigenschaft SYSTEM_DATE abgerufen und in einer DATE-Eigenschaft gespeichert wird. Dieser DATE-Wert wird dann in die Datensatztabelle eingefügt. 

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="DBProxy" startOnLoad="true" transports="https http" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <filter xpath="$body//DATE!=''">
                <then>
                    <property expression="$body//DATE" name="DATE" />
                </then>
                <else>
                    <property expression="get-property("SYSTEM_DATE", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")" name="DATE" />
                </else>
            </filter>
            <log level="custom">
                <property expression="get-property('DATE')" name="Value"/>
            </log>
            <dbreport>
                <connection>
                    <pool>
                        <driver>com.mysql.jdbc.Driver</driver>
                        <url>jdbc:mysql://localhost:3306/active</url>
                        <user>root</user>
                        <password>root</password>
                    </pool>
                </connection>
                <statement>
                    <sql><![CDATA[insert into records (DATE) values (?)]]></sql>
                    <parameter expression="get-property('DATE')" type="CHAR"/>
                </statement>
            </dbreport>
            <log level="custom">
                <property expression="get-property('DATE')" name="Record Stored"/>
            </log>
            <property name="OUT_ONLY" value="true"/>
            <payloadFactory media-type="xml">
                <format>
                    <message xmlns="">stored</message>
                </format>
                <args/>
            </payloadFactory>
            <respond/>
        </inSequence>
    </target></proxy> 

ReadQueue

Um aus der Queue zu lesen, habe ich einen neuen Proxy erstellt, der mit einem Intervall von einer Nachricht pro Minute eine Nachricht aus der Queue liest. Der Proxy lauscht auf den JMS-Transport und die Parameter (nur in der Sourceview sichtbar) definieren die Queue und die Lesegeschwindigkeit. 

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="readQueue" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <property name="OUT_ONLY" value="true"/>
            <log level="full"/>
            <call>
                <endpoint>
                    <address uri="http://localhost:8280/services/DBProxy">
                        <suspendOnFailure>
                            <initialDuration>-1</initialDuration>
                            <progressionFactor>-1</progressionFactor>
                            <maximumDuration>0</maximumDuration>
                        </suspendOnFailure>
                        <markForSuspension>
<retriesBeforeSuspension>0</retriesBeforeSuspension>
                        </markForSuspension>
                    </address>
                </endpoint>
            </call>
            <drop/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </target>
    <parameter name="transport.jms.DestinationType">queue</parameter>
    <parameter name="transport.jms.Destination">mymessages</parameter>
    <parameter name="transport.jms.ContentType">
        <rules>
            <jmsProperty>contentType</jmsProperty>
            <default>text/xml</default>
        </rules>
    </parameter>
    <parameter name="jms.proxy.throttle.enabled">true</parameter>
    <parameter name="jms.proxy.throttle.limitPerMinute">1</parameter>
    <parameter name="jms.proxy.throttle.mode">fixed-interval</parameter>
</proxy>
Playing with ActiveMQ 8

Der readQueue-Proxy sendet die Nachricht an den DBProxy-Endpunkt.

Ausprobieren

In diesem Beispiel wird eine leere SOAP-Nachricht verwendet, um einen Datensatz auszulösen, der entweder direkt oder über eine Queue in eine Datenbank eingefügt werden soll. Zum Auslösen dieser Einrichtung kann die Enterprise Integrator Try-It-Funktionalität verwendet werden. Betätigen Sie einfach 10 Mal innerhalb einer Minute die Taste Senden, um das folgende Ergebnis auszulösen:

Playing with ActiveMQ 9

Wenn alles wie erwartet funktioniert, wird das Ergebnis in der Tabelle „Datensätze“ angezeigt. Ich habe 10 Datensätze übermittelt. Fünf davon werden direkt hinzugefügt, die übrigen fünf werden ausgeblendet. Die DTSTAMP-Spalte zeigt, wann der Datensatz zur Datenbank hinzugefügt wurde, und die DATE-Spalte zeigt den Zeitstempel, den wir im ACProxy gesetzt haben. Hier sehen wir, dass die letzten 4 Nachrichten mit einem Intervall von 1 Minute gespeichert wurden.

MariaDB [active]> select * from records;
+----+-------------------------------+---------------------+
| ID | DATE                          | DTSTAMP             |
+----+-------------------------------+---------------------+
|  1 | 2021-06-10T13:36:42.448+02:00 | 2021-06-10 13:36:42 |
|  2 | 2021-06-10T13:36:43.421+02:00 | 2021-06-10 13:36:43 |
|  3 | 2021-06-10T13:36:44.349+02:00 | 2021-06-10 13:36:44 |
|  4 | 2021-06-10T13:36:45.269+02:00 | 2021-06-10 13:36:45 |
|  5 | 2021-06-10T13:36:46.203+02:00 | 2021-06-10 13:36:46 |
|  6 | 2021-06-10T13:36:47.041+02:00 | 2021-06-10 13:37:00 |
|  7 | 2021-06-10T13:36:50.366+02:00 | 2021-06-10 13:38:00 |
|  8 | 2021-06-10T13:36:51.536+02:00 | 2021-06-10 13:39:00 |
|  9 | 2021-06-10T13:36:52.503+02:00 | 2021-06-10 13:40:00 |
+----+-------------------------------+---------------------+
9 rows in set (0.00 sec)

Fazit

Dieser Blog ist eine einfache Übung zum Speichern von Nachrichten in einer Queue zur späteren Verarbeitung. Der Throttle-Mediator hat eine binäre Situation geschaffen: Es geht durch … oder nicht. Das ist nur ein Beispiel dafür, wie eine JMS-Message-Queue verwendet werden kann.