info@yenlo.com
ned
Menu
WSO2 14 min

Payloads splitsen met Smooks in WSO2 Micro Integrator voor ActiveMQ en MySQ

Ajanthan Eliyathamby Yenlo
Ajanthan Eliyathamby
Integratie Expert
Payloads splitsen met Smooks

Bij het integreren van systemen is het vaak nodig om een enkele invoer, zoals een CSV-bestand, naar meerdere bestemmingen te sturen voor verdere verwerking. In deze blog volg je de stappen van een praktisch voorbeeld hoe je dit kunt doen in WSO2 Micro Integrator met behulp van de Smooks Mediator.

Je leert hoe je een CSV-payload kunt splitsen en de inhoud ervan kunt doorsturen naar zowel een ActiveMQ-message queue als een MySQL-database. Dit patroon is handig wanneer dezelfde data zowel door een berichtensysteem als een relationele datastore verwerkt moet worden. Het resultaat: een efficiënte, parallelle levering van gestructureerde data met minimale complexiteit.
We demonstreren hoe je een payload kunt splitsen en doorsturen naar zowel een message queue als een database met behulp van de Smooks Mediator in WSO2 Micro Integrator.

In het onderstaande workflow voorbeeld wordt een bronbestand in CSV-formaat opgehaald door de Virtual File System (VFS) Proxy van WSO2 Micro Integrator. De workflow wordt gekopieerd en naar twee aparte endpoints gestuurd: ActiveMQ Message Broker en MySQL. De data wordt gesplitst en verwerkt: elk bericht wordt afzonderlijk aan de queue toegevoegd en als één enkele rij in de database tabel ingevoegd.

Onderstaande diagram 01 geeft de workflow in hoofdlijnen weer.

hl fd
hl fd

Vereisten: 

  1. JDK 
  2. ActiveMQ 
  3. MySQL 
  4. WSO2 Micro Integrator 

Configuratie van de runtime-omgeving voor WSO2 

  1. JDK 

WSO2 Micro Integrator is getest met de JDK versies 11, 17 en 21. Wij gebruiken de 17.0.12 versie. Zie de officiele documentatie voor details. 

  1. ActiveMQ 

            Deze configuratie gebruikt de ActiveMQ 5.16.4 versie.  

  1. MySQL 

Deze configuratie gebruikt de 9.0.1 Community Edition. Zie de officiele documentatie voor details.  

  1. WSO2 Micro Integrator 4.4.0 
  2. Download Micro Integrator 4.4.0  
  3. Om de server te starten, ga je naar Micro Integrator, home en voer je het volgende uit: sh bin/micro-integrator.sh 

Het opzetten van de ontwikkelomgeving 

  1. Visual Studio Code 

WSO2 biedt een Visual Studio Code-extensie die low-code ontwikkeling vereenvoudigt en het uitrollen van integraties versnelt. Door de Micro Integrator-runtime te integreren met VS Code wordt uitrol met één klik mogelijk.  

wso2 mi
  1. De database instellen 

In onze use case moeten we de gesplitste records in de tabel invoegen. Maak hiervoor de volgende tabel aan in de MySQL-database. 

CREATE TABLE contacts ( 
id INT AUTO_INCREMENT PRIMARY KEY, 
name VARCHAR(100), 
address VARCHAR(255), 
phonenumber VARCHAR(20) 
); 

3. Micro Integrator instellen voor de verbinding met ActiveMQ en de database 

Om de verbinding tussen de ActiveMQ Message Broker en de MySQL-database goed te laten werken, voeg je de benodigde drivers en JAR-bestanden toe. 

  • ActiveMQ 
    • Voeg onderstaande JAR files toe van ACTIVE_MQ/lib en ACTIVE_MQ/lib/optional 
      • activemq-broker-5.16.4.jar 
      • activeio-core-3.1.4.jar ( from ACTIVE_MQ/lib/optional) 
      • activemq-client-5.16.4.jar 
      • activemq-kahadb-store-5.16.4.jar 
      • geronimo-j2ee-management_1.1_spec-1.0.1.jar 
      • geronimo-jms_1.1_spec-1.1.1.jar 
      • geronimo-jta_1.1_spec-1.1.1.jar 
      • hawtbuf-1.11.jar 
      • slf4j-api-1.7.36.jar 
  • MySQL database 
    • Voeg de mysql-connector-j-9.2.0.jar to MI_HOME/bin toe. 

Start de Micro Integrator opnieuw op, nadat je de JAR-bestanden hebt toegevoegd,  

4. Integratieontwikkeling opzetten met Smooks 

  • Maak de datasource aan die we gebruiken om vanuit de Smooks Mediator verbinding te maken met de database 

1. <datasource>  
2.     <name>SmooksDatasource</name> 
3.     <jndiConfig useDataSourceFactory="false"> 
4.         <name>SmooksDatasource</name> 
5.     </jndiConfig> 
6.     <definition type="RDBMS"> 
7.         <configuration> 
8.             <driverClassName>com.mysql.jdbc.Driver</driverClassName> 
9.             <url>jdbc:mysql://localhost:3306/smooks</url> 
10.             <username>XXXXXXXXXX</username> 
11.             <password>XXXXXXXXXX</password> 
12.         </configuration> 
13.     </definition> 
14. </datasource> 

  • Maak de lokale entries voor de Smooks-configuratie aan, zoals hieronder weergegeven 
    • Transformeer de CSV naar XML, omdat onze brongegevens in CSV-formaat zijn. 

1. <?xml version="1.0" encoding="UTF-8"?> 
2. <localEntry key="CSVToXMLSmooks" xmlns="http://ws.apache.org/ns/synapse"> 
3.     <smooks-resource-list xmlns="http://www.milyn.org/xsd/smooks-1.1.xsd" xmlns:jms="http://www.milyn.org/xsd/smooks/jms-routing-1.2.xsd" xmlns:csv="http://www.milyn.org/xsd/smooks/csv-1.2.xsd" xmlns:ftl="http://www.milyn.org/xsd/smooks/freemarker-1.1.xsd"> 
4.         <resource-config selector="org.xml.sax.driver"> 
5.             <resource>org.milyn.csv.CSVReader</resource> 
6.             <param name="fields">name,address,phonenumber,country,postcode</param> 
7.             <param name="separator">,</param> 
8.             <param name="rootElementName">file</param> 
9.             <param name="recordElementName">data</param> 
10.         </resource-config> 
11.     </smooks-resource-list> 
12. </localEntry> 

  • Opsplitsen en versturen naar JMS

1. <?xml version="1.0" encoding="UTF-8"?> 
2. <localEntry key="XmlToJMSSmooks" xmlns="http://ws.apache.org/ns/synapse"> 
3.     <smooks-resource-list xmlns="http://www.milyn.org/xsd/smooks-1.1.xsd" xmlns:ftl="http://www.milyn.org/xsd/smooks/freemarker-1.1.xsd" xmlns:xsl="http://www.milyn.org/xsd/smooks/xsl-1.1.xsd" xmlns:core="http://www.milyn.org/xsd/smooks/smooks-core-1.3.xsd" xmlns:jms="http://www.milyn.org/xsd/smooks/jms-routing-1.2.xsd"> 
4.         <core:namespaces> 
5.             <core:namespace prefix="soapenv" uri="http://schemas.xmlsoap.org/soap/envelope/"/> 
6.         </core:namespaces> 
7.         <params> 
8.             <param name="stream.filter.type">SAX</param> 
9.             <param name="default.serialization.on">false</param> 
10.         </params> 
11.         <resource-config selector="data"> 
12.             <resource>org.milyn.delivery.DomModelCreator</resource> 
13.         </resource-config> 
14.         <ftl:freemarker applyOnElement="data"> 
15.             <ftl:template><!-- <data><name>${data.name}</name><address>${data.address}</address><phonenumber>${data.phonenumber}</phonenumber></data> --> 
16.             </ftl:template> 
17.             <ftl:use> 
18.                 <ftl:bindTo id="dataItemXml"/> 
19.             </ftl:use> 
20.         </ftl:freemarker> 
21.         <jms:router routeOnElement="data" beanId="dataItemXml" destination="datasplitter"> 
22.             <jms:jndi properties="https://2ae95bce.delivery.rocketcdn.me/repository/smooks/smooks.jndi.properties"/> 
23.             <jms:highWaterMark mark="-1"/> 
24.         </jms:router> 
25.     </smooks-resource-list> 
26. </localEntry> 

  • Opsplitsen en versturen naar de database tabel 

1. <?xml version="1.0" encoding="UTF-8"?> 
2. <localEntry key="XmlToDatabaseSmooks" xmlns="http://ws.apache.org/ns/synapse"> 
3.     <smooks-resource-list xmlns="http://www.milyn.org/xsd/smooks-1.1.xsd" xmlns:ftl="http://www.milyn.org/xsd/smooks/freemarker-1.1.xsd" xmlns:jb="http://www.milyn.org/xsd/smooks/javabean-1.4.xsd" xmlns:core="http://www.milyn.org/xsd/smooks/smooks-core-1.3.xsd" xmlns:db="http://www.milyn.org/xsd/smooks/db-routing-1.1.xsd" xmlns:ds="http://www.milyn.org/xsd/smooks/datasource-1.3.xsd"> 
4.         <core:filterSettings type="SAX"></core:filterSettings> 
5.         <jb:bean beanId="datas" class="java.util.Hashtable" createOnElement="data"> 
6.             <jb:value data="data/*"></jb:value> 
7.         </jb:bean> 
8.         <ds:JNDI bindOnElement="#document" datasource="DBSource" datasourceJndi="SmooksDatasource"></ds:JNDI> 
9.         <db:executor executeOnElement="data" datasource="DBSource" executeBefore="false"> 
10.             <db:statement>INSERT INTO contacts (name, address, phonenumber) VALUES ( ${datas.name}, ${datas.address}, ${datas.phonenumber} )</db:statement> 
11.         </db:executor> 
12.     </smooks-resource-list> 
13. </localEntry> 

  • Maak de Proxy Service aan om alles te integreren en het proces uit te voeren. 

1. <?xml version="1.0" encoding="UTF-8"?> 
2. <proxy name="FileConsumerProxy" startOnLoad="true" transports="http https vfs" xmlns="http://ws.apache.org/ns/synapse"> 
3.   <target> 
4.     <inSequence> 
5.       <!-- 1. Log the file Information --> 
6.       <log category="INFO" logMessageID="true" logFullPayload="true"> 
7.         <message>Processing File : $ctx:transport.vfs.FileName</message> 
8.       </log> 
9.       <!-- 2. Convert the CSV to XML as we need to send the XML to Queue --> 
10.       <smooks config-key="CSVToXMLSmooks"> 
11.         <input type="text"/> 
12.         <output type="xml"/> 
13.       </smooks> 
14.       <!-- 3. Clone to send to Multiple Targets --> 
15.       <clone continueParent="true" id="SmooksRoute"> 
16.         <target> 
17.           <!-- 4. Sending to smooks to split and send to ActiveMQ Queue --> 
18.           <sequence> 
19.             <smooks config-key="XmlToJMSSmooks" description="Sending Data To Queue"> 
20.               <input type="xml"/> 
21.               <output type="xml"/> 
22.             </smooks> 
23.           </sequence> 
24.         </target> 
25.         <target> 
26.           <!-- 4. Sending to smooks to split and send to MySQL Database  --> 
27.           <sequence> 
28.             <smooks config-key="XmlToDatabaseSmooks" description="Sending Data To Database"> 
29.               <input type="xml"/> 
30.               <output type="xml"/> 
31.             </smooks> 
32.           </sequence> 
33.         </target> 
34.       </clone> 
35.     </inSequence> 
36.     <outSequence/> 
37.     <faultSequence/> 
38.   </target> 
39.   <parameter name="transport.PollInterval">15</parameter> 
40.   <parameter name="transport.vfs.FileURI">file:///Users/ajanthan/yenlo/blogs/2025/resources/in</parameter> 
41.   <parameter name="transport.vfs.ContentType">text/plain</parameter> 
42.   <parameter name="transport.vfs.ActionAfterProcess">MOVE</parameter> 
43.   <parameter name="transport.vfs.MoveAfterFailure">file:///Users/ajanthan/yenlo/blogs/2025/resources/fail</parameter> 
44.   <parameter name="transport.vfs.ActionAfterFailure">MOVE</parameter> 
45.   <parameter name="transport.vfs.FileNamePattern">.*.csv</parameter> 
46.   <parameter name="transport.vfs.MoveAfterProcess">file:///Users/ajanthan/yenlo/blogs/2025/resources/out</parameter> 
47. </proxy> 

48.

  • Maak het smooks.jndi.properties bestand aan zoals hieronder weergegeven. 

1. java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory 
2. java.naming.provider.url = tcp://192.168.133.144:61616 
3. queue.smooks.splitQueue = smooks.splitQueue 

De onderstaande afbeelding toont de ontwerpweergave in VS Code voor de WSO2 Micro Integrator-ontwikkeling 

image

5. Testen en verifiëren 

Zodra je VS Code hebt geconfigureerd met WSO2 Micro Integrator, kun je rechtstreeks starten en artifacts naar de Integrator uitrollen via VS Code. Omdat de VFS Listener fungeert als triggerpunt voor de integratie, wordt de integratie geactiveerd zodra je een bestand in de locatie van de listener plaatst. Zo kun je de records volgen die naar zowel de queue als de database worden gestuurd. 

Het voorbeeldbestand plaats je als volgt: 

1. John Doe,123 Main St,555-1234,USA,10001 
2. Jane Smith,456 Elm St,555-5678,Canada,M4B1B3 
3. Alice Johnson,789 Oak Ave,555-9101,UK,SW1A1AA 
4. Bob Brown,321 Pine Rd,555-1122,Australia,2000 
5. Charlie Davis,654 Cedar Ln,555-3344,Germany,10115 
6. Emily White,987 Birch Blvd,555-5566,France,75001 
7. Frank Harris,741 Maple Dr,555-7788,India,110001 
8. Grace Wilson,852 Walnut St,555-9900,Japan,100-0001 
9. Henry Lewis,369 Spruce Ct,555-2233,Brazil,01000-000 
10. Isabella Clark,258 Cherry Pl,555-4455,South Africa,8000
 

Omdat we 10 records hebben, zouden er 10 berichten in de queue en in de database contacten-tabel moeten staan. 

image
image

Samengevat biedt het integreren van Smooks met de WSO2 Micro Integrator een efficiënte manier om CSV-gegevens te splitsen en te verwerken, zodat je ze moeiteloos naar meerdere endpoints zoals ActiveMQ Message Broker en MySQL kunt sturen. Deze aanpak verbetert de verwerking van gegevens door complexe payloads op te splitsen en ervoor te zorgen dat elk record afzonderlijk wordt verwerkt. Door in te zetten op de VFS Proxy en de Smooks Mediator kunnen organisaties integratiestromen stroomlijnen, de datarouting verbeteren en de interoperabiliteit tussen systemen vereenvoudigen. 

Zodra dit integratiepatroon is opgezet, kun je het verder verbeteren door robuuste foutafhandeling te implementeren. Zo kun je mogelijke afleverproblemen naar zowel ActiveMQ als MySQL opvangen, inclusief retries en gedetailleerde logging. Monitoringtools zowel native in WSO2 als externe observability-platforms kun je gebruiken om verwerkingsstatistieken bij te houden en knelpunten in real time te identificeren. Aanvullend kan de transformatielogica zelf uitgebreid worden met conditionele routing of data-verrijking op basis van bedrijfsregels. Deze opzet is ook goed geschikt om op te schalen naar grotere datasets, real-time event streams of batchverwerkingsscenario’s, en biedt zo een solide basis voor meer robuuste en onderhoudsvriendelijke integraties. 
Meer informatie? Neem contact met ons op en we helpen je graag verder.

Whitepaper: API-beveiliging

wp API Security mockup
Download Whitepaper
ned
Sluiten