When integrating systems, it’s often necessary to send a single input such, as a CSV file, to multiple destinations for further processing. In this blog, we’ll walk through a practical example of how to do this using the Smooks Mediator in WSO2 Micro Integrator.
You’ll learn how to split a CSV payload and route its contents to both an ActiveMQ message queue and a MySQL database. This pattern is useful when the same data needs to be processed by both a messaging system and a relational data store. The result: efficient parallel delivery of structured data with minimal complexity.
We’ll demonstrate how to split a payload and forward it to both a message queue and database using the Smooks Mediator in WSO2 Micro Integrator.
In the sample flow, a source file in CSV format is fetched by the Virtual File System (VFS) Proxy of WSO2 Micro Integrator. The flow is duplicated and sent to two separate endpoints: ActiveMQ Message Broker and MySQL. The data is split and processed, with each message added individually to the queue and inserted as a single row in the database table.
The following Figure 01 flow diagram illustrates our high-level setup.
Prerequisites
- JDK
- ActiveMQ
- MySQL
- WSO2 Micro Integrator
Configure Runtime Environment for WSO2
- JDK
WSO2 Micro Integrator has been tested with JDK versions11, 17 and 21. We use the 17.0.12 version. See the official documentation for details.
- ActiveMQ
This setup uses ActiveMQ 5.16.4 version.
- MySQL
This setup uses 9.0.1 Community Edition, see the official documentation for details.
- WSO2 Micro Integrator 4.4.0
- To start up the server go to Micro Integrator home and execute
sh bin/micro-integrator.sh
- To start up the server go to Micro Integrator home and execute
Setting up the Development Environment
- Visual Studio Code
WSO2 provides a Visual Studio Code extension to simplify low-code development and streamline deployment. Integrating Micro Integrator runtime with VS Code enables one-click deployment.
- Setup the database
In our use case, we need to insert the split records to the table create the following table in the MySQL database.
CREATE TABLE contacts ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(100), address VARCHAR(255), phonenumber VARCHAR(20) );
- Setup Micro Integrator for connecting to ActiveMQ and database
To support the connectivity between the ActiveMQ Message Broker and the MySQL database, we need to add drivers and jars.
- ActiveMQ
- Add the below JAR files from ACTIVE_MQ/lib and ACTIVE_MQ/lib/optional
- activemq-broker-5.16.4.jar
- activeio-core-3.1.4.jar ( from ACTIVE_MQ/lib/optional)
- activemq-client-5.16.4.jar
- activemq-kahadb-store-5.16.4.jar
- geronimo-j2ee-management_1.1_spec-1.0.1.jar
- geronimo-jms_1.1_spec-1.1.1.jar
- geronimo-jta_1.1_spec-1.1.1.jar
- hawtbuf-1.11.jar
- slf4j-api-1.7.36.jar
- Add the below JAR files from ACTIVE_MQ/lib and ACTIVE_MQ/lib/optional
- MySQL database
- Add the mysql-connector-j-9.2.0.jar to MI_HOME/bin
After adding the Jar files, restart Micro Integrator.
- Build Integration Development with Smooks
- Create the data source, which we will use to connect the database from Smooks Mediator.
1. <datasource>
2. <name>SmooksDatasource</name>
3. <jndiConfig useDataSourceFactory="false">
4. <name>SmooksDatasource</name>
5. </jndiConfig>
6. <definition type="RDBMS">
7. <configuration>
8. <driverClassName>com.mysql.jdbc.Driver</driverClassName>
9. <url>jdbc:mysql://localhost:3306/smooks</url>
10. <username>XXXXXXXXXX</username>
11. <password>XXXXXXXXXX</password>
12. </configuration>
13. </definition>
14. </datasource>
- Create the Local Entries for the Smooks Configuration as below.
- Transformation of the CSV to XML as our source is in the CSV format.
1. <?xml version="1.0" encoding="UTF-8"?>
2. <localEntry key="CSVToXMLSmooks" xmlns="http://ws.apache.org/ns/synapse">
3. <smooks-resource-list xmlns="http://www.milyn.org/xsd/smooks-1.1.xsd" xmlns:jms="http://www.milyn.org/xsd/smooks/jms-routing-1.2.xsd" xmlns:csv="http://www.milyn.org/xsd/smooks/csv-1.2.xsd" xmlns:ftl="http://www.milyn.org/xsd/smooks/freemarker-1.1.xsd">
4. <resource-config selector="org.xml.sax.driver">
5. <resource>org.milyn.csv.CSVReader</resource>
6. <param name="fields">name,address,phonenumber,country,postcode</param>
7. <param name="separator">,</param>
8. <param name="rootElementName">file</param>
9. <param name="recordElementName">data</param>
10. </resource-config>
11. </smooks-resource-list>
12. </localEntry>
- Split and Send to JMS Queue
1. <?xml version="1.0" encoding="UTF-8"?>
2. <localEntry key="XmlToJMSSmooks" xmlns="http://ws.apache.org/ns/synapse">
3. <smooks-resource-list xmlns="http://www.milyn.org/xsd/smooks-1.1.xsd" xmlns:ftl="http://www.milyn.org/xsd/smooks/freemarker-1.1.xsd" xmlns:xsl="http://www.milyn.org/xsd/smooks/xsl-1.1.xsd" xmlns:core="http://www.milyn.org/xsd/smooks/smooks-core-1.3.xsd" xmlns:jms="http://www.milyn.org/xsd/smooks/jms-routing-1.2.xsd">
4. <core:namespaces>
5. <core:namespace prefix="soapenv" uri="http://schemas.xmlsoap.org/soap/envelope/"/>
6. </core:namespaces>
7. <params>
8. <param name="stream.filter.type">SAX</param>
9. <param name="default.serialization.on">false</param>
10. </params>
11. <resource-config selector="data">
12. <resource>org.milyn.delivery.DomModelCreator</resource>
13. </resource-config>
14. <ftl:freemarker applyOnElement="data">
15. <ftl:template><!-- <data><name>${data.name}</name><address>${data.address}</address><phonenumber>${data.phonenumber}</phonenumber></data> -->
16. </ftl:template>
17. <ftl:use>
18. <ftl:bindTo id="dataItemXml"/>
19. </ftl:use>
20. </ftl:freemarker>
21. <jms:router routeOnElement="data" beanId="dataItemXml" destination="datasplitter">
22. <jms:jndi properties="https://2ae95bce.delivery.rocketcdn.me/repository/smooks/smooks.jndi.properties"/>
23. <jms:highWaterMark mark="-1"/>
24. </jms:router>
25. </smooks-resource-list>
26. </localEntry>
- Split and Send to database Table
1. <?xml version="1.0" encoding="UTF-8"?>
2. <localEntry key="XmlToDatabaseSmooks" xmlns="http://ws.apache.org/ns/synapse">
3. <smooks-resource-list xmlns="http://www.milyn.org/xsd/smooks-1.1.xsd" xmlns:ftl="http://www.milyn.org/xsd/smooks/freemarker-1.1.xsd" xmlns:jb="http://www.milyn.org/xsd/smooks/javabean-1.4.xsd" xmlns:core="http://www.milyn.org/xsd/smooks/smooks-core-1.3.xsd" xmlns:db="http://www.milyn.org/xsd/smooks/db-routing-1.1.xsd" xmlns:ds="http://www.milyn.org/xsd/smooks/datasource-1.3.xsd">
4. <core:filterSettings type="SAX"></core:filterSettings>
5. <jb:bean beanId="datas" class="java.util.Hashtable" createOnElement="data">
6. <jb:value data="data/*"></jb:value>
7. </jb:bean>
8. <ds:JNDI bindOnElement="#document" datasource="DBSource" datasourceJndi="SmooksDatasource"></ds:JNDI>
9. <db:executor executeOnElement="data" datasource="DBSource" executeBefore="false">
10. <db:statement>INSERT INTO contacts (name, address, phonenumber) VALUES ( ${datas.name}, ${datas.address}, ${datas.phonenumber} )</db:statement>
11. </db:executor>
12. </smooks-resource-list>
13. </localEntry>
- Create the Proxy Service to Integrate all together and execute the flow.
1. <?xml version="1.0" encoding="UTF-8"?>
2. <proxy name="FileConsumerProxy" startOnLoad="true" transports="http https vfs" xmlns="http://ws.apache.org/ns/synapse">
3. <target>
4. <inSequence>
5. <!-- 1. Log the file Information -->
6. <log category="INFO" logMessageID="true" logFullPayload="true">
7. <message>Processing File : $ctx:transport.vfs.FileName</message>
8. </log>
9. <!-- 2. Convert the CSV to XML as we need to send the XML to Queue -->
10. <smooks config-key="CSVToXMLSmooks">
11. <input type="text"/>
12. <output type="xml"/>
13. </smooks>
14. <!-- 3. Clone to send to Multiple Targets -->
15. <clone continueParent="true" id="SmooksRoute">
16. <target>
17. <!-- 4. Sending to smooks to split and send to ActiveMQ Queue -->
18. <sequence>
19. <smooks config-key="XmlToJMSSmooks" description="Sending Data To Queue">
20. <input type="xml"/>
21. <output type="xml"/>
22. </smooks>
23. </sequence>
24. </target>
25. <target>
26. <!-- 4. Sending to smooks to split and send to MySQL Database -->
27. <sequence>
28. <smooks config-key="XmlToDatabaseSmooks" description="Sending Data To Database">
29. <input type="xml"/>
30. <output type="xml"/>
31. </smooks>
32. </sequence>
33. </target>
34. </clone>
35. </inSequence>
36. <outSequence/>
37. <faultSequence/>
38. </target>
39. <parameter name="transport.PollInterval">15</parameter>
40. <parameter name="transport.vfs.FileURI">file:///Users/ajanthan/yenlo/blogs/2025/resources/in</parameter>
41. <parameter name="transport.vfs.ContentType">text/plain</parameter>
42. <parameter name="transport.vfs.ActionAfterProcess">MOVE</parameter>
43. <parameter name="transport.vfs.MoveAfterFailure">file:///Users/ajanthan/yenlo/blogs/2025/resources/fail</parameter>
44. <parameter name="transport.vfs.ActionAfterFailure">MOVE</parameter>
45. <parameter name="transport.vfs.FileNamePattern">.*.csv</parameter>
46. <parameter name="transport.vfs.MoveAfterProcess">file:///Users/ajanthan/yenlo/blogs/2025/resources/out</parameter>
47. </proxy>
- Create the smooks.jndi.properties file as below
1. java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
2. java.naming.provider.url = tcp://192.168.133.144:61616
3. queue.smooks.splitQueue = smooks.splitQueue
Below figure depicts the design view of the VS Code of WSO2 Micro Integrator Development.
- Testing and Verification
Once we configure VS Code with WSO2 Micro Integrator, we can directly start and deploy artifacts to the Integrator through VS Code. Since the VFS Listener serves as the integration’s trigger point, placing a file in the listener’s location will activate the integration, allowing us to view the records being pushed to both the queue and the database.
Sample file to be placed as below:
1. John Doe,123 Main St,555-1234,USA,10001
2. Jane Smith,456 Elm St,555-5678,Canada,M4B1B3
3. Alice Johnson,789 Oak Ave,555-9101,UK,SW1A1AA
4. Bob Brown,321 Pine Rd,555-1122,Australia,2000
5. Charlie Davis,654 Cedar Ln,555-3344,Germany,10115
6. Emily White,987 Birch Blvd,555-5566,France,75001
7. Frank Harris,741 Maple Dr,555-7788,India,110001
8. Grace Wilson,852 Walnut St,555-9900,Japan,100-0001
9. Henry Lewis,369 Spruce Ct,555-2233,Brazil,01000-000
10. Isabella Clark,258 Cherry Pl,555-4455,South Africa,8000
As we have 10-records there should be 10 messages in the queue and in the database contact’s table.
In conclusion, integrating Smooks with WSO2 Micro Integrator offers an efficient way to split and process CSV payloads, enabling seamless routing to multiple endpoints like ActiveMQ Message Broker and MySQL. This approach enhances data handling by breaking down complex payloads and ensuring each record is processed individually. By leveraging the VFS Proxy and Smooks Mediator, organizations can streamline integration flows, improve data routing, and simplify system interoperability.
Once this integration pattern is in place, it can be further enhanced by implementing robust error handling to deal with potential delivery failures to either ActiveMQ or MySQL, including retries and detailed logging. Monitoring tools—either native to WSO2 or external observability platforms—can be introduced to track processing metrics and identify bottlenecks in real time. Additionally, the transformation logic itself can be expanded with conditional routing or data enrichment based on business rules. This setup is also well-suited for scaling to larger datasets, real-time event streams, or batch processing scenarios, providing a solid foundation for more resilient and maintainable integrations.
Need more information? Get in touch with us we’ll be happy to help.