WSO2 Enterprise Integrator 9 minuten

RabbitMQ in combinatie met de WSO2 Enterprise Integrator gebruiken

Rob Blaauboer
Rob Blaauboer
Integration Consultant & WSO2 Trainer
RabbitMQ WSO2 Enterprise Integrator
Scroll

Voor bijna iedere enterprise is het gebruik van wachtrijen en message stores een essentieel onderdeel in het IT-landschap. De redenatie achter deze uitspraak ligt in het feit dat niet alle processen synchroon lopen. Soms moeten berichten opgeslagen worden of gebruik je het integratiepatroon dat dead letter channel genoemd wordt. Hierin staat de situatie beschreven dat het bericht afgeleverd moet worden op een eindpunt dat niet beschikbaar is. 

RabbitMQ 1

Dit zijn allemaal real-life scenario’s. In andere woorden: voor iedere organisatie zullen er situaties voorkomen waarin je niet alle berichten direct kunt afleveren of a-synchrone processen gebruikt en je berichten zult moeten opslaan.

Het Enterprise Integration Pattern (EIP) Dead Letter Channel

Het Enterprise Integration patroon ‘Dead Letter Channel’ is een patroon dat niet zomaar opgelost kan worden door het bewuste bericht te laten vervallen, of door te zeggen: “Helaas, probeer het nog een keer.” Het is iets waar je daadwerkelijk wat mee moet doen. Naar mijn mening doe je dat het beste met een message store of een wachtrij (queue). Wanneer een bericht niet afgeleverd kan worden, zal de wachtrij de eenheid en de tijd opslaan tot het moment dat het afgeleverd kan worden.

Berichten in wachtrijen zetten

Ze kunnen berichten in de wachtrij zetten en door een ander proces op een later tijdstip laten afleveren. Je kunt het aantal pogingen instellen en zelfs een gegarandeerde aflevering implementeren.

Dus, wat is een wachtrij precies? In uitgeklede vorm is het een stukje opslag. Die opslagruimte kan deel zijn van je interne geheugen, het kan een message queue product zijn of zelfs in een database zitten. Het idee is dat berichten in een wachtrij later opgepikt kunnen worden door een ander proces en zo hopelijk afgeleverd kunnen worden.

De Enterprise integrator van WSO2 ondersteunt een aantal berichtenwachtrijen, waaronder activeMQ, WSO2 Message Broker, Apache qpid en RabbitMQ. Maar eigenlijk kunnen alle systemen die het AMQP-protocol volgen aangesloten worden.

RabbitMQ

In deze blog gaan we kijken naar een implementatie van RabbitMQ in samenspel met de Enterprise Integrator. We gaan daadwerkelijk een proxy aanmaken die een bericht in een wachtrij gaat zetten. En we gaan een andere proxy creëren die vervolgens dat bericht uit de wachtrij haalt en het doorstuurt. Dit is een proces dat we asynchroon noemen, wat inhoudt dat er geen direct aanvraag-respons mechanisme is. Het bericht wordt opgeslagen en op een later moment weer opgepikt.

Welke use cases passen hierbij, behalve het eerder genoemde dead letter channel? Het zou kunnen zijn dat het antwoord niet meteen beschikbaar is. Laten we zeggen dat je een offerte aanvraagt voor de herinrichting van je huis. Daar zal iemand naar moeten kijken, een berekening moeten uitvoeren en stuurt die naar je terug. Dat is een voorbeeld waarin een bericht verwerkt en opgeslagen wordt.

RabbitMQ installeren

We gaan RabbitMQ installeren op Centos 7. Een Windows installatie ook mogelijk, maar dat gebruikt Chocolatey. Laten we eerst een repo aanmaken met de zowel de rabbitmq-erlang data al de rabbitmq-server data. Dit commando maakt de /etc/yum.repos.d/rabbitmq.repo aan in een Centos7-omgeving.

cat <<EOF | sudo tee /etc/yum.repos.d/rabbitmq.repo 
[rabbitmq_erlang]
name=rabbitmq_erlang
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/\$basearch
gpgcheck=0
enabled=1

[rabbitmq_rabbitmq-server]
name=rabbitmq_rabbitmq-server
baseurl=https://packagecloud.io/rabbitmq/rabbitmq-server/el/7/\$basearch
gpgcheck=0
enabled=1
EOF

De rest van de procedure is gemakkelijk en gaat in drie stappen. Iedere stap wordt gedaan met verhoogde (sudo-)rechten. 

1. sudo yum install -y rabbitmq-server
2. sudo systemctl enable --now rabbitmq-server
3. sudo rabbitmq-plugins enable rabbitmq_management

Stap 1 installeert RabbitMQ, stap 2 zorgt ervoor dat RabbitMQ start bij het opstarten en stap 3 schakelt de management console in. Laten we het uitproberen door naar http://localhost:15672/#/ te gaan en in te loggen met de inloggegevens: guest/guest.

RabbitMQ 2

Het werkt, maar als admin op een gastaccount werken is vreemd. Laten we daarom de admin gebruiker toevoegen en ‘guest’ deactiveren. Voer deze regels in in een terminalsessie:

sudo rabbitmqctl add_user admin
sudo rabbitmqctl set_user_tags admin administrator
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
sudo rabbitmqctl delete_user guest

Check de admin console voor de te maken wijzigingen. Zoals je kunt zien, kun je de gebruiker ook via de console toevoegen.

RabbitMQ 3

De Enterprise Integrator instellen

De Enterprise Integrator maakt gebruik van het bestand axis2.xml in de map [EI-HOME]/conf/axis2 om dit specifieke transport voor zowel de Receiver en Sender mogelijk te maken. Ik heb een nieuwe Connection Factory (BlogFactory) naast de default factory aangemaakt. Merk op dat ik een aantal parameters aan de set-up voor de Sender heb toegevoegd.

RabbitMQ 4
RabbitMQ 5

Download de https://repo1.maven.org/maven2/com/rabbitmq/amqp-client/5.9.0/amqp-client-5.9.0.jar en zet het in de lib-map. Sla het axis2-bestand op en start Enterprise Integrator met het sh integrator.sh commando. Als we geen foutmeldingen krijgen, zouden we iets dergelijks te zien moeten krijgen: 

[2020-06-02 17:07:47,247]  INFO {org.apache.axis2.transport.rabbitmq.RabbitMQConnectionFactory} - RabbitMQ ConnectionFactory : AMQPConnectionFactory initialized

[2020-06-02 17:07:47,249]  INFO {org.apache.axis2.transport.rabbitmq.RabbitMQListener} - RabbitMQ AMQP Transport Receiver initialized...

We kunnen nu met de wachtrij aan de slag. Om het eenvoudig te houden gaan we een bericht in een wachtrij zetten en het eruit halen door een asynchrone flow te creëren. We doen dit voor een database registratie die verzonden wordt als een SOAP-bericht en uiteindelijk in de database wordt gezet. We kunnen een query gebruiken op de database om de registratie in te zien.

We maken een simpele database. In Nederland is er een muziekfestival dat Down The Rabbithole heet. Laten we daar een tabel voor maken met een line-up aan bands en zangers. We doen dat met MySQL. Verander de SQL-statements voor je rdbms, indien nodig.

create database rabbithole;
use rabbithole;

CREATE TABLE `bands` (
 `bandid` INT(8) NOT NULL AUTO_INCREMENT,
 `bandname` VARCHAR(50),
 `country` VARCHAR(20),
 PRIMARY KEY (`bandid`)
);

insert into bands (bandid, bandname, country) values (1001000, "Charli XCX", "United Kingdom");

Om MySQL te kunnen gebruiken, hebben we de MySQL JDBC driver nodig. kopieer dit bestand (download de driver hier) naar de [EI-HOME]/lib map start de EI opnieuw. In dit geval gebruiken we het sh integrator.sh -DosgiConsole commando om de driver te controleren.

Wacht totdat de EI opgestart is en druk op ‘Enter’. Type in: lb mysql:

osgi> lb mysql

START LEVEL 6

   ID|State      |Level|Name

  105|Active     |    4|mysql_connector_java_5.1.39_bin (1.0.0)

osgi>

Dat laat zien dat de driver geladen is.

Een API maken

We zullen nu een API maken die een bericht naar het RabbitMQ eindpunt stuurt. Ik praat je niet helemaal door het proces hoe we een API maken in de Integration Studio, maar laat je liever het ontwerp van de broncode zien. 

RabbitMQ 6
<api xmlns="http://ws.apache.org/ns/synapse" name="RabbitAPI" context="/new">
   <resource methods="POST">
      <inSequence>
         <property name="OUT_ONLY" value="true" scope="default" type="STRING"/>
         <log level="full"/>
         <call>
            <endpoint>
               <address uri="rabbitmq:/?rabbitmq.connection.factory=BlogFactory&rabbitmq.queue.name=Rabbithole"/>
            </endpoint>
         </call>
         <respond/>
      </inSequence>
      <outSequence/>
      <faultSequence/>
   </resource>
</api>

Ik laat de payload met de log mediator zien en begin alleen omdat ik geen respons op de oproep krijg. Uiteindelijk reageer ik alleen naar de client.

De ontwikkeling is klaar in de Integration Studio, maar ik heb ervoor gekozen om het naar de Management UI van de Enterprise Integrator te kopiëren. 

Dat is eenvoudig. Ga naar de Management UI van de Enterprise Integrator en klik daar op ‘API’, ‘Add – API’.

RabbitMQ 7

Klik op vervolgens op ‘Switch to source view’ en kopieer de bovenstaande code direct in het scherm. Klik op ‘Save’.

RabbitMQ 8
RabbitMQ 9

Sla de API op.

Een band toevoegen

We kunnen nu de API gebruiken als we een band toe willen voegen aan de wachtrij. We moeten ervoor zorgen dat het de juiste payload heeft. Dat zijn in deze situatie de waarden die we moeten invoeren. In dit geval hebben we een JSON payload nodig met twee velden: ‘bandname’ en ‘country’. Hoe weet ik dat? Nou, als je kijkt naar de database, dan zul je opmerken dat ‘bandid’ een veld is met een auto increment. In dit geval ga ik de twee velden in de database invoegen met een DB Report Mediator. Het design gebruikt de namen zoals ze in de database gebruikt worden.

Het bericht in de Enterprise Integrator 

Als je een bericht in een wachtrij zet, zul je het ook weer uit de wachtrij moeten halen of uitlezen.

Dit is de proxy die de RabbitMQ Rabbithole Queue uitleest. Merk op dat het transport specifiek rabbitmq is en geen JMS!

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="ReadRabbitMQ" startOnLoad="true" trace="enable" transports="rabbitmq" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
            <property expression="json-eval($.bandname)" name="bandname" scope="default" type="STRING"/>
            <property expression="json-eval($.country)" name="country" scope="default" type="STRING"/>
            <dbreport>
                <connection>
                    <pool>
                        <driver>com.mysql.jdbc.Driver</driver>
                        <url>jdbc:mysql://localhost:3306/rabbithole</url>
                        <user>root</user>
                        <password>root</password>
                    </pool>
                </connection>
                <statement>
                    <sql><![CDATA[INSERT INTO bands(bandname,country) VALUES(?,?)]]></sql>
                    <parameter expression="get-property('bandname')" type="CHAR"/>
                    <parameter expression="get-property('country')" type="CHAR"/>
                </statement>
            </dbreport>
            <log/>
            <drop/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </target>
    <parameter name="rabbitmq.exchange.name">exchange</parameter>
    <parameter name="transport.jms.ContentType">
        <rules>
            <jmsProperty>contentType</jmsProperty>
            <default>application/json</default>
        </rules>
    </parameter>
    <parameter name="rabbitmq.queue.name">Rabbithole</parameter>
    <parameter name="rabbitmq.connection.factory">AMQPConnectionFactory</parameter>
    <parameter name="rabbitmq.queue.route.key">route</parameter>
</proxy>

We plaatsen een bericht met behulp van SoapUI waar de Beatles uit het VK een band is die aan de lijst wordt toegevoegd.

RabbitMQ 10

Er is geen respons, omdat dit nu een asynchrone service is, maar de registratie is opgeslagen! We kunnen de bands in de tabel bekijken.

RabbitMQ 11

RabbitMQ

RabbitMQ is relatief makkelijk te integreren met de Enterprise Integrator. Er zijn een aantal specifieke settings die je in moet stellen, zoals het transport dat op RabbitMQ gezet moet worden. Dit is gerelateerd aan het speciale JAR-bestand dat we aan de Enterprise Integrator hebben toegevoegd. Het voorbeeld dat we in deze blog hebben gegeven is relatief eenvoudig, maar RabbitMQ kan uiteraard gebruikt worden voor veel complexere scenarios.