fb
WSO2 Enterprise Integrator 9 Minuten

RabbitMQ mit dem WSO2 Enterprise Integrator einrichten

Rob Blaauboer
Rob Blaauboer
Integration Consultant & WSO2 Trainer
RabbitMQ WSO2 Enterprise Integrator
Scrollen

Für nahezu jedes Unternehmen ist die Anwendung von Queues und Nachrichtenspeichern ein wesentlicher Bestandteil der IT-Umgebung. Hintergrund für diese Aussage ist, dass nicht jeder Prozess synchron ist. Gelegentlich müssen Nachrichten gespeichert werden und es existiert ein Integrationsmuster, welches als Dead Letter Channel bezeichnet wird. Dieses beschreibt die Situation, dass die Nachricht an einen Endpunkt zugestellt werden muss, der nicht verfügbar ist. 

RabbitMQ 1

Das sind alles Szenarien aus der Praxis. Anders ausgedrückt: In jedem Unternehmen wird es Situationen geben, bei denen Sie nicht in der Lage sind, Meldungen zuzustellen, a-synchrone Prozesse haben und Meldungen speichern müssen. 

Das Enterprise Integration Pattern (EIP) Dead Letter Channel 

Das Enterprise Integration Pattern Dead Letter Channel ist etwas, das Sie nicht lösen können, in dem Sie die Nachricht einfach weglassen oder sagen, na ja, Pech gehabt und es noch einmal versuchen. Das ist etwas, um das Sie sich kümmern müssen. Meiner Meinung nach wird dies mit einem messageStore oder einer Queue getan. Wenn eine Meldung nicht zugestellt werden kann, wird sie in der Queue bis zu dem Zeitpunkt gespeichert, an dem sie zugestellt werden kann.

Ablegen von Nachrichten in Queues

Sie können einen Nachricht in die Queue legen und einen anderen Prozess zu einem späteren Zeitpunkt zustellen lassen. So können Sie die Anzahl der Wiederholungen maximieren und Sie können auch eine garantierte Zustellung umsetzen. 

Was ist also eine Queue? In ihrer einfachsten Form ist sie ein Teil des Speichers. Dieser Speicherplatz kann ein Teil des internen Speichers sein. Es kann ein echtes Message-Queue-Produkt sein, oder auch in einer Datenbank. Der Gedanke ist, dass Nachrichten in einer Queue zu einem späteren Zeitpunkt durch einen anderen Prozess abgeholt und hoffentlich zugestellt werden können.

Der Enterprise Integrator von WSO2 unterstützt eine Reihe von Message Queues. Es gibt also activeMQ, WSO2 Message Broker, Apache qpid und RabbitMQ. Grundsätzlich aber auch andere Systeme, die sich an das AMQP-Protokoll halten.

RabbitMQ

In diesem Blog werden wir uns eine Implementierung von RabbitMQ in Verbindung mit dem Enterprise Integrator ansehen. Wir erstellen einen Proxy, der eine Nachricht in eine Queue stellt. Und wir erstellen einen weiteren Proxy, der die Nachricht aus der Queue nimmt und sie weiterschickt. Das ist etwas, das wir asynchron nennen, das heißt, dass es keinen direkten Anfrage-Antwort-Mechanismus gibt. Die Message wird gespeichert und zu einem späteren Zeitpunkt abgeholt. 

Welche Anwendungsfälle gibt es also, außer dem bereits erwähnten Dead Letter Channel? Es könnte ja sein, dass die Antwort nicht direkt verfügbar ist. Angenommen, Sie reichen einen Kostenvoranschlag für die Renovierung Ihres Hauses ein. Irgendjemand muss es sich ansehen, eine Berechnung vornehmen und sie Ihnen schicken. Dies ist ein Beispiel für einen Nachrichtenspeicher eines Nachrichtenprozessors.

RabbitMQ installieren

Wir installieren es auf Centos 7. Eine Installation unter Windows ist ebenfalls möglich, benötigt aber Chocolatey. 

Zunächst erstellen wir ein Repos mit den rabbitmq-erlang-Daten und den rabbitmq-server-Daten. Dieser Befehl erstellt das /etc/yum.repos.d/rabbitmq.repo auf einer Centos7-Umgebung.

cat <<EOF | sudo tee /etc/yum.repos.d/rabbitmq.repo 
[rabbitmq_erlang]
name=rabbitmq_erlang
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/\$basearch
gpgcheck=0
enabled=1

[rabbitmq_rabbitmq-server]
name=rabbitmq_rabbitmq-server
baseurl=https://packagecloud.io/rabbitmq/rabbitmq-server/el/7/\$basearch
gpgcheck=0
enabled=1
EOF

Der Rest der Prozedur ist recht einfach und besteht aus drei Schritten. Alle erfolgen mit erhöhten (sudo) Rechten. 

1. sudo yum install -y rabbitmq-server
2. sudo systemctl enable --now rabbitmq-server
3. sudo rabbitmq-plugins enable rabbitmq_management

Schritt 1 installiert RabbitMQ, Schritt 2 aktiviert RabbitMQ, um beim Booten gestartet zu werden und Schritt 3 aktiviert die Management-Konsole. Probieren wir es aus, indem wir auf http://localhost:15672/#/ zugreifen und uns mit Gast-Zugangsdaten anmelden.

RabbitMQ 2

Es funktioniert, aber ein Gast-Konto als Admin ist seltsam. Lassen Sie uns den Admin-Benutzer hinzufügen und den Gast deaktivieren. In einer Terminalsitzung geben Sie diese Zeilen ein:

sudo rabbitmqctl add_user admin
sudo rabbitmqctl set_user_tags admin administrator
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
sudo rabbitmqctl delete_user guest

Überprüfen Sie in der Admin-Konsole die vorzunehmenden Änderungen. Wie Sie erkennen können, können Sie einen Benutzer auch über die Konsole hinzufügen.

RabbitMQ 3

Enterprise Integrator einrichten

Der Enterprise Integrator nutzt die Datei axis2.xml in [EI-HOME]/conf/axis2, um diesen speziellen Transport für Empfänger und Sender zu ermöglichen. Neben der Standard-Factory habe ich eine neue Connection Factory (BlogFactory) erstellt. Wie Sie sehen, habe ich eine Reihe von Parametern zum Setup für den Sender hinzugefügt.

RabbitMQ 4
RabbitMQ 5

Laden Sie https://repo1.maven.org/maven2/com/rabbitmq/amqp-client/5.9.0/amqp-client-5.9.0.jar herunter und legen Sie es in das lib-Verzeichnis.

Speichern Sie die Datei axis2 und starten Sie den Enterprise Integrator mit dem Befehl sh integrator.sh. Wenn wir keine Fehler erhalten, dann sollten wir etwas Ähnliches wie das hier sehen: 

[2020-06-02 17:07:47,247]  INFO {org.apache.axis2.transport.rabbitmq.RabbitMQConnectionFactory} - RabbitMQ ConnectionFactory : AMQPConnectionFactory initialized

[2020-06-02 17:07:47,249]  INFO {org.apache.axis2.transport.rabbitmq.RabbitMQListener} - RabbitMQ AMQP Transport Receiver initialized...

Jetzt können wir anfangen, mit einer Queue zu arbeiten. Der Einfachheit halber werden wir eine Nachricht in eine Queue legt und sie wieder entfernen, wobei ein asynchroner Fluss entsteht. Dies geschieht gegen einen Datenbank-Datensatz, der als Soap-Nachricht gesendet und am Ende in der Datenbank abgelegt wird. Wir können den Datensatz in der Datenbank abfragen, um ihn zu sehen.

Wir legen eine einfache Datenbank an. In den Niederlanden findet ein Musikfestival namens Down The Rabbithole statt. Wir erstellen also eine Aufstellungstabelle mit Bands und Sängern. Wir werden dafür MySQL verwenden. Ändern Sie die SQL-Anweisungen bei Bedarf an Ihre rdbms. 

create database rabbithole;
use rabbithole;

CREATE TABLE `bands` (
 `bandid` INT(8) NOT NULL AUTO_INCREMENT,
 `bandname` VARCHAR(50),
 `country` VARCHAR(20),
 PRIMARY KEY (`bandid`)
);

insert into bands (bandid, bandname, country) values (1001000, "Charli XCX", "United Kingdom");

Um MySQL nutzen zu können, brauchen wir den MySQL JDBC-Treiber. Diese Datei (Download von hier) in das Verzeichnis [EI-HOME]/lib kopieren und das EI neu starten. Verwenden Sie in diesem Fall den Befehl sh integrator.sh -DosgiConsole, um den Treiber zu überprüfen.

Warten Sie, bis das EI gestartet ist, und drücken Sie „Enter“. Geben Sie lb mysql ein:

osgi> lb mysql

START LEVEL 6

   ID|State      |Level|Name

  105|Active     |    4|mysql_connector_java_5.1.39_bin (1.0.0)

osgi>

Dies zeigt, dass der Treiber geladen ist.

Erstellen einer API

Wir werden nun eine API erstellen, die eine Meldung an den RabbitMQ-Endpunkt sendet. Wir zeigen Ihnen nicht vollständig, wie eine API in Integration Studio erstellt wird, sondern wir zeigen das Design und den Quellcode. 

RabbitMQ 6
<api xmlns="http://ws.apache.org/ns/synapse" name="RabbitAPI" context="/new">
   <resource methods="POST">
      <inSequence>
         <property name="OUT_ONLY" value="true" scope="default" type="STRING"/>
         <log level="full"/>
         <call>
            <endpoint>
               <address uri="rabbitmq:/?rabbitmq.connection.factory=BlogFactory&rabbitmq.queue.name=Rabbithole"/>
            </endpoint>
         </call>
         <respond/>
      </inSequence>
      <outSequence/>
      <faultSequence/>
   </resource>
</api>

Ich zeige die Nutzlast mit dem Log-Mediator und setze nur aus, da ich keine Antwort vom Aufruf erhalte. Letztlich antworte ich nur auf den Client.

Die Entwicklung erfolgt im Integration Studio, ich habe mich aber dafür entschieden, sie in die Management UI des Enterprise Integrators zu kopieren. 

Dazu gehen Sie in die Management UI des Enterprise Integrators und klicken auf ‚API‘, ‚Add – API‘.

RabbitMQ 7

Klicken Sie auf ‚Switch to source view‘ und kopieren Sie den obigen Code direkt in das Fenster. Klicken Sie auf ‚Speichern‘.

RabbitMQ 8
RabbitMQ 9

Speichern Sie die API.

Eine Band hinzufügen

Wenn wir eine Band hinzufügen möchten, können wir sie mit dieser API in die Queue legen. Dabei müssen wir sicherstellen, dass wir die richtige Payload haben. Mit richtig sind in dieser Situation die Werte gemeint, die wir einfügen müssen. In diesem Fall brauchen wir einen JSON-Payload mit zwei Feldern: Bandname und Land. Wie kann ich das wissen? Na ja, wenn Sie sich die Datenbank ansehen, werden Sie sehen, dass bandid ein Feld mit Autoinkrement ist. In diesem Falle werde ich die beiden Felder mit einem DB Report Mediator in die Datenbank einfügen. Das Design benutzt die Namen, wie sie in der Datenbank verwendet werden.

Die Nachricht in Enterprise Integrator konsumieren 

Wenn man eine Nachricht in eine Queue stellt, muss man sie auch aus der Queue lesen.

Dies ist der Proxy, der die Queue von RabbitMQ Rabbithole liest. Bitte beachten Sie, dass der Transport speziell Rabbitmq und nicht JMS ist!

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="ReadRabbitMQ" startOnLoad="true" trace="enable" transports="rabbitmq" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
            <property expression="json-eval($.bandname)" name="bandname" scope="default" type="STRING"/>
            <property expression="json-eval($.country)" name="country" scope="default" type="STRING"/>
            <dbreport>
                <connection>
                    <pool>
                        <driver>com.mysql.jdbc.Driver</driver>
                        <url>jdbc:mysql://localhost:3306/rabbithole</url>
                        <user>root</user>
                        <password>root</password>
                    </pool>
                </connection>
                <statement>
                    <sql><![CDATA[INSERT INTO bands(bandname,country) VALUES(?,?)]]></sql>
                    <parameter expression="get-property('bandname')" type="CHAR"/>
                    <parameter expression="get-property('country')" type="CHAR"/>
                </statement>
            </dbreport>
            <log/>
            <drop/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </target>
    <parameter name="rabbitmq.exchange.name">exchange</parameter>
    <parameter name="transport.jms.ContentType">
        <rules>
            <jmsProperty>contentType</jmsProperty>
            <default>application/json</default>
        </rules>
    </parameter>
    <parameter name="rabbitmq.queue.name">Rabbithole</parameter>
    <parameter name="rabbitmq.connection.factory">AMQPConnectionFactory</parameter>
    <parameter name="rabbitmq.queue.route.key">route</parameter>
</proxy>

Wir fügen eine Nachricht mit SoapUI ein, wobei die Beatles aus Großbritannien eine Band sein werden, die der Liste hinzugefügt wird.

RabbitMQ 10

Es gibt keine Antwort, da dies nun ein asynchroner Dienst ist. Der Datensatz ist aber gespeichert! Wir können die Tabelle Bands überprüfen

RabbitMQ 11

RabbitMQ

RabbitMQ ist relativ einfach mit dem Enterprise Integrator zu verbinden. Es müssen einige spezifische Einstellungen vorgenommen werden, wie z. B. der Transport, der auf RabbitMQ eingestellt werden muss. Das hängt mit der speziellen JAR-Datei zusammen, die wir dem Enterprise Integrator hinzugefügt haben. Das in diesem Blog gegebene Beispiel ist recht einfach, aber RabbitMQ kann sicherlich auch in komplexeren Szenarien verwendet werden.