info@yenlo.com
ned
Menu
WSO2 10 min

Spelen met ActiveMQ

Rob Blaauboer
Rob Blaauboer
Integration Consultant & WSO2 Trainer
Playing with ActiveMQ

We gaan ActiveMQ als broker gebruiken voor de opzet die we in deze blog bespreken. Daarbij gaan we een voorbeeldproxy opzetten die een bericht naar een wachtrij gaat sturen. Een andere proxy gaat berichten afluisteren die in de wachtrij staan en ze dan doorsturen naar een database. Door deze combinatie kunnen we zien hoe de interactie met een JMS-endpoint werkt, hoe we een OUT_ONLY kunnen aanroepen en hoe we een bericht vanuit een wachtrij naar een andere dienst kunnen sturen.

Als je ActiveMQ nog niet geïnstalleerd had staan, installeer de software dan eerst. De onderstaande installatie is voor een Linux-omgeving en zal versie 5.15.8 installeren, maar je kunt ook een andere recente versie van ActiveMQ v5 gebruiken. Zelfs Apache Artemis kan gebruikt worden, maar let er dan op dat er een aantal veranderingen in jar-bestanden en in de configuratie nodig zijn, dus raad ik aan om het bij ActiveMQ v5 te houden als je de stappen in deze blog volgt.

Pas in het vervolg de locaties van de installatie aan, zodat het aansluit bij jouw systeem en voorkeuren. Ik installeer het op mijn desktop op Centos 7.

wget http://archive.apache.org/dist/activemq/5.15.8/apache-activemq-5.15.8-bin.tar.gz -P /opt/wso2/Downloads
tar -C /opt/wso2/Desktop -zxvf /opt/wso2/Downloads/apache-activemq-5.15.8-bin.tar.gz
mv /opt/wso2/Desktop/apache-activemq-5.15.8 /opt/wso2/Desktop/active
cd /opt/wso2/Desktop/active/bin
./activemq console

Het laatste commando hierboven zal ActiveMQ in de opdrachtregel console-modus opstarten. Hierdoor krijg je de ActiveMQ log-output direct als output bij de opdrachtregels te zien. Nadat ActiveMQ opgestart is, open je de web-based adminconsole door naar http://localhost:8161 te gaan. Je kunt de URL in de output van de opdrachtregels zien staan bij een opmerking in de trend van ‘INFO message | ActiveMQ WebConsole available at http://0.0.0.0:8161/’.

Playing with ActiveMQ 1

Log in met de gegevens admin/admin.

Playing with ActiveMQ 2

Je zou nu de console in de browser moeten zien.

Playing with ActiveMQ 3

Het is nu tijd om onze WSO2 Enterprise Integrator installatie voor te bereiden en dan verbinding te maken met deze ActiveMQ broker. Kopieer de volgende bibliotheken vanuit de map <ACTIVEMQ_HOME>/lib naar de map [EI_HOME]/lib.

Voor ActiveMQ 5.8.0 en hoger dienen de onderstaande bestanden gekopieerd te worden. Merk op dat sommige versies van de jar-bestanden hogere versienummers hebben (ze worden met een * aangeduid en zijn afhankelijk van de ActiveMQ versie die je in de eerste stap hebt geïnstalleerd).

  1. (*) activemq-broker-5.8.0.jar
  2. (*) activemq-client-5.8.0.jar
  3. (*) activemq-kahadb-store-5.8.0.jar  
  4. geronimo-jms_1.1_spec-1.1.1.jar 
  5. (*) geronimo-j2ee-management_1.1_spec-1.0.1.jar 
  6. geronimo-jta_1.0.1B_spec-1.0.1.jar
  7. (*) hawtbuf-1.9.jar
  8. (*) Slf4j-api-1.6.6.jar
  9. activeio-core-3.1.4.jar (available in the <ACTIVEMQ_HOME>/lib/optional directory)  

Haal in axis2.xml ([APIM-HOME]/conf/axis2 de commentaartekens weg voor JMS transportReceiver en JMS transportSender. Let op dat je de voorgedefinieerde commentaartekens van de ActiveMQ based JMS transportReceiver weghaalt, want er zijn er meer JMS transportReceiver alternatieven in axis2.xml.

Playing with ActiveMQ 4

En activeer ook de JMS transportSender.

Playing with ActiveMQ 5

Start de Enterprise Integrator opnieuw, zodat deze wijzigingen doorgevoerd worden.

Use Case

Mijn use case is een proxy met een throttle mediator. Deze mediator staat vijf aanroepen per minuut toe op de onAccept beslissingstak, alle verdere aanroepen komen in de onReject beslissingstak en worden genegeerd.

Om de berichten die afgewezen worden op te slaan, zetten we ze in een wachtrij en lezen we ze met een proxy met een interval van één minuut. Op die manier worden de berichten die gelijk doorkomen meteen in de database opgeslagen en de andere berichten er later aan toegevoegd.

Het idee is dat de vijf aanroepen per minuut een theoretisch maximum is. Omdat we een proxy hebben die de berichten uit een berichtenwachtrij afleest met één aanroep of bericht per minuut, kunnen we nog steeds alle berichten behandelen zonder de back-end te overbelasten. 

Een database maken

De registraties zullen opgeslagen worden in een tabel die de gepaste naam ‘records’ draagt. De enige waarde die ik ga invoeren is de DATE kolom voor datums, want ID en DTSTAMP worden automatisch gedaan. Dit is de MySQL definitie:

DROP DATABASE active;
CREATE DATABASE active;
USE active;
CREATE TABLE IF NOT EXISTS records (
    ID INTEGER NOT NULL AUTO_INCREMENT  ,
    DATE VARCHAR(40) NOT NULL,
    DTSTAMP datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (ID)
 );

De proxy

De proxy die de berichten ontvangt wordt hier in Design View getoond. Het is relatief eenvoudig: er komt een bericht binnen en gaat ofwel naar de database proxy (DBProxy) in de OnAccept tak of het komt in de onReject tak en het wordt doorgegeven aan een payload mediator om er een timestamp aan toe te voegen, waarna het bericht uiteindelijk wordt opgeslagen in de wachtrij.

vervanging
<?xml version="1.0" encoding="UTF-8"?>
<proxy name="ACProxy" startOnLoad="true" transports="http https" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <property name="OUT_ONLY" value="true"/>
            <throttle id="test">
                <policy>
                    <wsp:Policy wsu:id="WSO2MediatorThrottlingPolicy" xmlns:wsp="http://schemas.xmlsoap.org/ws/2004/09/policy" xmlns:wsu="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd">
                        <throttle:MediatorThrottleAssertion xmlns:throttle="http://www.wso2.org/products/wso2commons/throttle">                            <throttle:MaximumConcurrentAccess>0</throttle:MaximumConcurrentAccess>
                            <wsp:Policy>
<throttle:ID throttle:type="IP">other</throttle:ID>
                                <wsp:Policy>
                                    <throttle:Control>
                                        <wsp:Policy>
<throttle:MaximumCount>5</throttle:MaximumCount>                                            <throttle:UnitTime>60000</throttle:UnitTime>
<throttle:ProhibitTimePeriod>0</throttle:ProhibitTimePeriod>
                                        </wsp:Policy>
                                    </throttle:Control>
                                </wsp:Policy>
                            </wsp:Policy>
                        </throttle:MediatorThrottleAssertion>
                    </wsp:Policy>
                </policy>
                <onReject>
                    <payloadFactory media-type="xml">
                        <format>
                            <DATE xmlns="">$1</DATE>
                        </format>
                        <args>
                            <arg evaluator="xml" expression="get-property("SYSTEM_DATE", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")"/>
                        </args>
                    </payloadFactory>
                        <call>
                        <endpoint>
                            <address uri="jms:/mymessages?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&java.naming.provider.url=tcp://localhost:61616&transport.jms.DestinationType=queue">
                                <suspendOnFailure>
                                    <initialDuration>-1</initialDuration>
                                    <progressionFactor>-1</progressionFactor>
                                    <maximumDuration>0</maximumDuration>
                                </suspendOnFailure>
                                <markForSuspension>
       <retriesBeforeSuspension>0</retriesBeforeSuspension>
                                </markForSuspension>
                            </address>
                        </endpoint>
                    </call>
                </onReject>
                <onAccept>
                    <call>
                        <endpoint>
                            <address uri="http://localhost:8280/services/DBProxy">
                                <suspendOnFailure>
                                    <initialDuration>-1</initialDuration>
                                    <progressionFactor>-1</progressionFactor>
                                    <maximumDuration>0</maximumDuration>
                                </suspendOnFailure>
                                <markForSuspension>
       <retriesBeforeSuspension>0</retriesBeforeSuspension>
                                </markForSuspension>
                            </address>
                        </endpoint>
                    </call>
                </onAccept>
            </throttle>
            <respond/>
        </inSequence>
    </target>
</proxy>

De bron laat de parameters voor de throttle mediator en de andere mediators zien, waaronder de instelling OUT_ONLY. Zoals je kunt zien aan het eindpunt, specificeren we uitgebreid een JMS eindpunt URI. Het interessante deel is de naam van de wachtrij in het begin. Het is logisch dat de naam van de wachtrij hetzelfde moet zijn als de naam van de wachtrij die gebruikt wordt in de readQueue proxy. Een verschil zou ertoe leiden dat berichten niet afgeleverd worden op de bedoelde eindbestemming.

De DBProxy slaat de registraties op in de database:

Playing with ActiveMQ 7

De proxy controleert of er al een DATE-element is. Zo ja, dan kwam het uit de wachtrij, en zo niet, dan leest het de datum af uit de SYSTEM_DATE eigenschap en slaat dat op als DATE-eigenschap. Deze waarde voor DATE wordt vervolgens in de ‘records’ tabel gezet. 

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="DBProxy" startOnLoad="true" transports="https http" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <filter xpath="$body//DATE!=''">
                <then>
                    <property expression="$body//DATE" name="DATE" />
                </then>
                <else>
                    <property expression="get-property("SYSTEM_DATE", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")" name="DATE" />
                </else>
            </filter>
            <log level="custom">
                <property expression="get-property('DATE')" name="Value"/>
            </log>
            <dbreport>
                <connection>
                    <pool>
                        <driver>com.mysql.jdbc.Driver</driver>
                        <url>jdbc:mysql://localhost:3306/active</url>
                        <user>root</user>
                        <password>root</password>
                    </pool>
                </connection>
                <statement>
                    <sql><![CDATA[insert into records (DATE) values (?)]]></sql>
                    <parameter expression="get-property('DATE')" type="CHAR"/>
                </statement>
            </dbreport>
            <log level="custom">
                <property expression="get-property('DATE')" name="Record Stored"/>
            </log>
            <property name="OUT_ONLY" value="true"/>
            <payloadFactory media-type="xml">
                <format>
                    <message xmlns="">stored</message>
                </format>
                <args/>
            </payloadFactory>
            <respond/>
        </inSequence>
    </target></proxy> 

ReadQueue

Om de wachtrij te kunnen uitlezen, heb ik een nieuwe proxy gemaakt die met intervallen van één minuut berichten in de wachtrij uitleest. De proxy luistert naar de JMS transport en de parameters (alleen zichtbaar in de sourceview) bepalen de wachtrij en de snelheid van het uitlezen.

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="readQueue" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <property name="OUT_ONLY" value="true"/>
            <log level="full"/>
            <call>
                <endpoint>
                    <address uri="http://localhost:8280/services/DBProxy">
                        <suspendOnFailure>
                            <initialDuration>-1</initialDuration>
                            <progressionFactor>-1</progressionFactor>
                            <maximumDuration>0</maximumDuration>
                        </suspendOnFailure>
                        <markForSuspension>
<retriesBeforeSuspension>0</retriesBeforeSuspension>
                        </markForSuspension>
                    </address>
                </endpoint>
            </call>
            <drop/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </target>
    <parameter name="transport.jms.DestinationType">queue</parameter>
    <parameter name="transport.jms.Destination">mymessages</parameter>
    <parameter name="transport.jms.ContentType">
        <rules>
            <jmsProperty>contentType</jmsProperty>
            <default>text/xml</default>
        </rules>
    </parameter>
    <parameter name="jms.proxy.throttle.enabled">true</parameter>
    <parameter name="jms.proxy.throttle.limitPerMinute">1</parameter>
    <parameter name="jms.proxy.throttle.mode">fixed-interval</parameter>
</proxy>
Playing with ActiveMQ 8

De readQueue proxy stuurt het vervolgens naar het DBProxy-eindpunt.

Uitproberen

Dit voorbeeld gebruikt een leeg SOAP Bericht om een record te triggeren en aan de database toe te voegen, ofwel direct, of via de wachtrij. De Try-It functie van de Enterprise Integrator is binnen deze set-up te gebruiken. Klik gewoon 10 keer binnen een minuut op ‘Send’ en je krijgt het volgende resultaat:

Playing with ActiveMQ 9

Als alles naar verwachting werkt, dan zal het resultaat te zien zijn in de ‘records’ tabel. Ik heb 10 registraties ingestuurd. Vijf ervan zijn direct toegevoegd, de andere vijf werden afgehouden. De DTSTAMP kolom die aangeeft wanneer een registratie aan de database is toegevoegd en de DATE kolom met datums laten de timestamps zien die we in de ACProxy ingesteld hebben. We kunnen hier zien dat de laatste 4 berichten steeds met een interval van 1 minuut opgeslagen zijn.

MariaDB [active]> select * from records;
+----+-------------------------------+---------------------+
| ID | DATE                          | DTSTAMP             |
+----+-------------------------------+---------------------+
|  1 | 2021-06-10T13:36:42.448+02:00 | 2021-06-10 13:36:42 |
|  2 | 2021-06-10T13:36:43.421+02:00 | 2021-06-10 13:36:43 |
|  3 | 2021-06-10T13:36:44.349+02:00 | 2021-06-10 13:36:44 |
|  4 | 2021-06-10T13:36:45.269+02:00 | 2021-06-10 13:36:45 |
|  5 | 2021-06-10T13:36:46.203+02:00 | 2021-06-10 13:36:46 |
|  6 | 2021-06-10T13:36:47.041+02:00 | 2021-06-10 13:37:00 |
|  7 | 2021-06-10T13:36:50.366+02:00 | 2021-06-10 13:38:00 |
|  8 | 2021-06-10T13:36:51.536+02:00 | 2021-06-10 13:39:00 |
|  9 | 2021-06-10T13:36:52.503+02:00 | 2021-06-10 13:40:00 |
+----+-------------------------------+---------------------+
9 rows in set (0.00 sec)

Conclusie

Deze blog is een eenvoudige oefening om berichten op te slaan in een wachtrij om ze later te verwerken. De throttle mediator zorgt voor een binaire situatie: het bericht gaat door… of niet. Dit is slechts één voorbeeld van hoe je een JMS berichtenwachtrij kunt gebruiken. 

ned
Sluiten
Wat staat er op ons menu