To make multiple calls, or to simulate a “for-each” or “while-do” loop, we can combine two mediators in WSO2 ESB: the Iterate and Aggregate mediators.
The Iterate mediator splits a message with repeating elements into multiple messages and can call an endpoint in each iteration.
The Aggregate mediator should be configured with a minimum message count equal to the number of messages that the Iterate mediator sends out. We can also specify a timeout, which is how long the aggregator waits for the responses. If we don’t specify the min and max, the Aggregate mediator waits for as many responses as there were iterated calls. These examples were built with WSO2 ESB version 4.9.0.
Figure 1 Graphical representation of iterate and aggregate flow
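As a minimal sketch of the completion settings described above, the fragment below shows an Aggregate mediator that leaves both counts unset (-1) and only limits the waiting time. The id and the onComplete expression are borrowed from the example later in this post; the timeout value of 10 and the log mediator are arbitrary choices for illustration, not a recommended configuration.

```xml
<aggregate id="IterateAccountCreate">
    <!-- timeout: how long to wait for responses, in seconds
         (10 is an arbitrary value for this sketch) -->
    <completeCondition timeout="10">
        <!-- -1 means "not set": the aggregate then waits for as many
             messages as the matching Iterate mediator produced -->
        <messageCount min="-1" max="-1"/>
    </completeCondition>
    <onComplete expression="//ns:echoStringResponse" xmlns:ns="http://echo.services.core.carbon.wso2.org">
        <!-- log the whole aggregated message -->
        <log level="full"/>
    </onComplete>
</aggregate>
```

Setting min explicitly, as the full example in this post does, makes the expected number of responses visible in the configuration instead of relying on the default.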
Request message
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:v0="http://com/unisys/gem/uleaf/WSO2/v0">
    <soapenv:Header/>
    <soapenv:Body>
        <v0:WSO2Request>
            <MessageType>createAccounts</MessageType>
            <ParameterList>
                <Account>1</Account>
                <Account>2</Account>
                <Account>3</Account>
                <Account>4</Account>
            </ParameterList>
        </v0:WSO2Request>
    </soapenv:Body>
</soapenv:Envelope>
Sequence with iterate and aggregate example
<?xml version="1.0" encoding="UTF-8"?>
<sequence name="iterate-aggregate-example" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
    <property description="countRequests" expression="count(//Account)" name="countRequests" scope="default" type="STRING"/>
    <iterate expression="//Account" id="IterateAccountCreate" sequential="true">
        <target>
            <sequence>
                <payloadFactory media-type="xml">
                    <format>
                        <echo:echoString xmlns:echo="http://echo.services.core.carbon.wso2.org">
                            <in xmlns="">$1</in>
                        </echo:echoString>
                    </format>
                    <args>
                        <arg evaluator="xml" expression="//Account/text()"/>
                    </args>
                </payloadFactory>
                <property action="remove" description="TRANSPORT_HEADERS" name="TRANSPORT_HEADERS" scope="axis2"/>
                <property description="OperationName" name="OperationName" scope="default" type="STRING" value="echoString"/>
                <header action="remove" name="To" scope="default"/>
                <header name="Action" scope="default" value="urn:echoString"/>
                <header name="SOAPAction" scope="transport" value="urn:echoString"/>
                <log category="DEBUG" description="***Request***">
                    <property expression="$body" name="request"/>
                </log>
                <call>
                    <endpoint key="echo"/>
                </call>
                <log category="DEBUG" description="***Response***" separator="***Response***">
                    <property expression="$body" name="create_instanties_response"/>
                </log>
            </sequence>
        </target>
    </iterate>
    <aggregate description="" id="IterateAccountCreate">
        <completeCondition timeout="10">
            <messageCount max="{get-property('countRequests')}" min="{get-property('countRequests')}"/>
        </completeCondition>
        <onComplete expression="//ns:echoStringResponse" xmlns:ns="http://echo.services.core.carbon.wso2.org">
            <log description="***aggregated***" separator="***aggregated***">
                <property expression="$body" name="aggregated_body"/>
            </log>
        </onComplete>
    </aggregate>
</sequence>
Response message
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
    <soapenv:Body>
        <ns:echoStringResponse xmlns:ns="http://echo.services.core.carbon.wso2.org">
            <return>1</return>
        </ns:echoStringResponse>
        <ns:echoStringResponse xmlns:ns="http://echo.services.core.carbon.wso2.org">
            <return>2</return>
        </ns:echoStringResponse>
        <ns:echoStringResponse xmlns:ns="http://echo.services.core.carbon.wso2.org">
            <return>3</return>
        </ns:echoStringResponse>
        <ns:echoStringResponse xmlns:ns="http://echo.services.core.carbon.wso2.org">
            <return>4</return>
        </ns:echoStringResponse>
    </soapenv:Body>
</soapenv:Envelope>
Important notes for setting of the iterate and aggregate mediators
1. The ids of the Iterate and Aggregate mediators should be the same
The shared id tells the Aggregate mediator which requests to wait for. It also allows the Iterate and Aggregate mediators to sit in different sequences, for example Iterate in the input sequence and Aggregate in the output sequence.
id="IterateAccountCreate"
2. Non-blocking calls
When the Aggregate mediator is used, the calls must be non-blocking; otherwise the aggregation does not proceed. This applies to the default configuration of ESB version 4.9.
<call>
<endpoint key="echo"/>
</call>
3. Aggregate expression
The aggregate expression selects the response node on which the aggregation groups the responses into a single body. Once the aggregation completes, the body contains one element per response, each matching the node selected by the onComplete expression.
<onComplete expression="//ns:echoStringResponse" xmlns:ns="http://echo.services.core.carbon.wso2.org">
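To see the three notes working together, here is a hedged sketch of a proxy service that iterates in the input sequence and aggregates in the output sequence. The proxy name AccountsSplitAggregateProxy is hypothetical; the id, the echo endpoint key, and the XPath expressions are taken from the example above. It uses the non-blocking Send mediator instead of Call, so that the responses arrive in the output sequence. Treat it as an illustration of the pattern, not as a tested configuration.

```xml
<proxy name="AccountsSplitAggregateProxy" transports="http https" startOnLoad="true"
       xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <!-- Note 1: same id as the aggregate in the outSequence -->
            <iterate id="IterateAccountCreate" expression="//Account">
                <target>
                    <sequence>
                        <payloadFactory media-type="xml">
                            <format>
                                <echo:echoString xmlns:echo="http://echo.services.core.carbon.wso2.org">
                                    <in xmlns="">$1</in>
                                </echo:echoString>
                            </format>
                            <args>
                                <arg evaluator="xml" expression="//Account/text()"/>
                            </args>
                        </payloadFactory>
                        <header name="Action" scope="default" value="urn:echoString"/>
                        <!-- Note 2: non-blocking send; each response is
                             handled by the outSequence -->
                        <send>
                            <endpoint key="echo"/>
                        </send>
                    </sequence>
                </target>
            </iterate>
        </inSequence>
        <outSequence>
            <aggregate id="IterateAccountCreate">
                <!-- Note 3: every node matching this expression is
                     collected into the aggregated body -->
                <onComplete expression="//ns:echoStringResponse"
                            xmlns:ns="http://echo.services.core.carbon.wso2.org">
                    <!-- return the aggregated message to the client -->
                    <send/>
                </onComplete>
            </aggregate>
        </outSequence>
    </target>
</proxy>
```

Because the aggregate sits in a different sequence from the iterate, the matching id is the only thing that ties the responses back to the split requests.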
Nested iterations
To transform complex messages (responses), we can nest iterations. Inside the first Iterate mediator we can place another sequence that contains its own iterate-aggregate pair. This lets us work with nested payloads, or with the payloads returned from both iterations.
Aggregated nested responses
<aggregatedCategories>
    <category>
        <categoryNode1>value</categoryNode1>
        <categoryNode2>value</categoryNode2>
        <aggregatedSubcategories>
            <subcategory>
                <node1>value1</node1>
                <node2>value2</node2>
            </subcategory>
            <subcategory>
                <node1>value3</node1>
                <node2>value4</node2>
            </subcategory>
        </aggregatedSubcategories>
    </category>
    <category>
        <categoryNode1>value</categoryNode1>
        <categoryNode2>value</categoryNode2>
        <aggregatedSubcategories>
            <subcategory>
                <node1>value5</node1>
                <node2>value6</node2>
            </subcategory>
            <subcategory>
                <node1>value7</node1>
                <node2>value8</node2>
            </subcategory>
        </aggregatedSubcategories>
    </category>
</aggregatedCategories>
Property in the operation scope inside the aggregate mediator
When aggregation is nested, we should set a property inside the aggregate mediator's onComplete and use it to combine the payload in the outer aggregate mediator. The property must be in the “operation” scope.
<aggregate id="IterateCategories">
    <completeCondition timeout="60">
        <messageCount max="-1" min="{get-property('countCategories')}"/>
    </completeCondition>
    <onComplete expression="//category">
        <property description="aggregatedCategories" expression="$body" name="aggregatedCategories" scope="operation" type="OM"/>
    </onComplete>
</aggregate>
Sequence with nested iterations
This sequence searches categories in the first iteration, then searches subcategories in the second nested iteration-aggregation. The outer aggregate will contain nested responses of aggregated categories and subcategories inside of each category.
<?xml version="1.0" encoding="UTF-8"?>
<sequence name="categories-subcategories" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
    <call-template description="search-getCategories" target="search-getEntities">
        <with-param name="searchQuery" value="from ConversationCategory cc"/>
    </call-template>
    <property description="countCategories"
    <iterate expression="//ns18:Entity" id="IterateCategories"
        <target>
            <sequence>
                <property description="currentCategory"
                <switch source="get-property('conversationSubCatNode')!=''">
                    <case regex="true">
                        <enrich description="currentCategory">
                            <source clone="true" property="currentCategory"
                            <target type="body"/>
                        </enrich>
                        <xslt description="gather-references-cat-SubCat"
                            <property name="entity-field" value="ConversationCategory
                        </xslt>
                        <call-template description="getSubcategories"
                        <property description="ConversationSubCategories"
                    </case>
                    <default/>
                </switch>
                <payloadFactory description="category_subcategory" media-type="xml">
                    <format>
                        <category xmlns="">
                            $1
                            <subcategories>$2</subcategories>
                        </category>
                    </format>
                    <args>
                        <arg evaluator="xml" expression="get-property('currentCategory')"/>
                        <arg evaluator="xml" expression="get-property('operation','aggregatedSubcategories')"/>
                    </args>
                </payloadFactory>
            </sequence>
        </target>
    </iterate>
    <aggregate id="IterateCategories">
        <completeCondition timeout="60">
            <messageCount max="-1" min="{get-property('countCategories')}"/>
        </completeCondition>
        <onComplete expression="//category">
            <property description="aggregatedCategories" expression="$body" name="aggregatedCategories" scope="operation" type="OM"/>
        </onComplete>
    </aggregate>
    <payloadFactory media-type="xml">
        <format>
            <categories xmlns="">$1</categories>
        </format>
        <args>
            <arg evaluator="xml" expression="get-property('operation','aggregatedCategories')"/>
        </args>
    </payloadFactory>
    <log description="***aggregatedCategories***"
        <property expression="$body" name="BODY"/>
    </log>
    <xslt description="category-subcategory-type" key="gov:/my-project/xslt/category-subcategory-type.xslt"/>
    <log description="***TransformCategoriesResponse***" separator="***TransformCategoriesResponse***">
        <property expression="$body" name="BODY"/>
    </log>
</sequence>
The payload aggregated in the first (outer) iteration is a custom one: it contains the current category together with the aggregatedSubcategories property produced by the second (inner) iteration.
<payloadFactory description="category_subcategory" media-type="xml">
    <format>
        <category xmlns="">
            $1
            <subcategories>$2</subcategories>
        </category>
    </format>
    <args>
        <arg evaluator="xml" expression="get-property('currentCategory')"/>
        <arg evaluator="xml" expression="get-property('operation','aggregatedSubcategories')"/>
    </args>
</payloadFactory>
The second (inner) iterate-aggregate pair for the subcategories is placed in a separate template; it could equally be a sequence.
<call-template description="iterate-subcategories" target="iterate-subcategories"/>
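The post does not show the iterate-subcategories template itself, so the following is only a sketch of what such a template might look like, following the same pattern as the outer pair. The template name and the aggregatedSubcategories property come from the snippets above. The XPath expressions //subcategoryRef and //subcategory, the countSubcategories property, the IterateSubcategories id, and the subcategoryService endpoint key are hypothetical placeholders that would need to match the real payloads and endpoints.

```xml
<template name="iterate-subcategories" xmlns="http://ws.apache.org/ns/synapse">
    <sequence>
        <!-- Hypothetical XPath: count the subcategory references in the current payload -->
        <property name="countSubcategories" expression="count(//subcategoryRef)"
                  scope="default" type="STRING"/>
        <!-- The inner pair uses its own id, distinct from the outer IterateCategories -->
        <iterate id="IterateSubcategories" expression="//subcategoryRef" sequential="true">
            <target>
                <sequence>
                    <!-- Non-blocking call to a hypothetical endpoint -->
                    <call>
                        <endpoint key="subcategoryService"/>
                    </call>
                </sequence>
            </target>
        </iterate>
        <aggregate id="IterateSubcategories">
            <completeCondition timeout="60">
                <messageCount max="-1" min="{get-property('countSubcategories')}"/>
            </completeCondition>
            <!-- Hypothetical XPath for the nodes returned by each call -->
            <onComplete expression="//subcategory">
                <!-- Store the aggregated body in the operation scope so that the
                     outer iteration can read it with
                     get-property('operation','aggregatedSubcategories') -->
                <property name="aggregatedSubcategories" expression="$body"
                          scope="operation" type="OM"/>
            </onComplete>
        </aggregate>
    </sequence>
</template>
```

Giving the inner pair its own id keeps its responses separate from those of the outer IterateCategories pair.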