
Iterate and aggregate mediators with call mediator in WSO2 ESB

Jenny Gligorovska
Integration Consultant

To make multiple calls, or to simulate a “for-each” or “while-do” loop, we can combine two mediators in the WSO2 ESB: the iterate and aggregate mediators.

The Iterate mediator splits a message with repetitive elements into multiple messages and can call an endpoint in each iteration.

The Aggregate mediator should be set to receive a minimum number of messages equal to the exact count of messages that go into the Iterate mediator. We can also specify a timeout, which is how long the aggregator waits for the responses. If we don’t specify the min and max, the aggregate waits for the count of iterated calls. These examples were built in WSO2 ESB version 4.9.0.

Figure 1: Graphical representation of iterate and aggregate flow

Request message

<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:v0="http://com/unisys/gem/uleaf/WSO2/v0">
   <soapenv:Header/>
   <soapenv:Body>
      <v0:WSO2Request>
         <MessageType>createAccounts</MessageType>
         <ParameterList>
            <Account>1</Account>
            <Account>2</Account>
            <Account>3</Account>
            <Account>4</Account>
         </ParameterList>
      </v0:WSO2Request>
   </soapenv:Body>
</soapenv:Envelope>

Sequence with iterate and aggregate example

<?xml version="1.0" encoding="UTF-8"?>
<sequence name="iterate-aggregate-example" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
  <property description="countRequests" expression="count(//Account)"
    name="countRequests" scope="default" type="STRING"/>

  <iterate expression="//Account" id="IterateAccountCreate"
    sequential="true">
    <target>
      <sequence>
        <payloadFactory media-type="xml">
          <format>
            <echo:echoString xmlns:echo="http://echo.services.core.carbon.wso2.org">
              <in xmlns="">$1</in>
            </echo:echoString>
          </format>
          <args>
            <arg evaluator="xml" expression="//Account/text()"/>
          </args>
        </payloadFactory>
        <property action="remove" description="TRANSPORT_HEADERS"
          name="TRANSPORT_HEADERS" scope="axis2"/>

        <property description="OperationName" name="OperationName"
          scope="default" type="STRING" value="echoString"/>

        <header action="remove" name="To" scope="default"/>
        <header name="Action" scope="default" value="urn:echoString"/>
        <header name="SOAPAction" scope="transport" value="urn:echoString"/>
        <log category="DEBUG" description="***Request***">
          <property expression="$body" name="request"/>
        </log>
        <call>
          <endpoint key="echo"/>
        </call>
        <log category="DEBUG" description="***Response***" separator="***Response***">
          <property expression="$body" name="create_instanties_response"/>
        </log>
      </sequence>
    </target>
  </iterate>
  <aggregate description="" id="IterateAccountCreate">
    <completeCondition timeout="10">
      <messageCount max="{get-property('countRequests')}" min="{get-property('countRequests')}"/>
    </completeCondition>
    <onComplete expression="//ns:echoStringResponse" xmlns:ns="http://echo.services.core.carbon.wso2.org">
      <log description="***aggregated***" separator="***aggregated***">
        <property expression="$body" name="aggregated_body"/>
      </log>
    </onComplete>
  </aggregate>
</sequence>

Response message

<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
   <soapenv:Body>
      <ns:echoStringResponse xmlns:ns="http://echo.services.core.carbon.wso2.org">
         <return>1</return>
      </ns:echoStringResponse>
      <ns:echoStringResponse xmlns:ns="http://echo.services.core.carbon.wso2.org">
         <return>2</return>
      </ns:echoStringResponse>
      <ns:echoStringResponse xmlns:ns="http://echo.services.core.carbon.wso2.org">
         <return>3</return>
      </ns:echoStringResponse>
      <ns:echoStringResponse xmlns:ns="http://echo.services.core.carbon.wso2.org">
         <return>4</return>
      </ns:echoStringResponse>
   </soapenv:Body>
</soapenv:Envelope>

Important notes for setting of the iterate and aggregate mediators

1.    The ids of the Iterate and Aggregate mediators should be the same

That way the Aggregate mediator knows which requests to wait for. It also means iterate and aggregate can sit in different sequences, for example iterate in the input sequence and aggregate in the output sequence.

     id="IterateAccountCreate"
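
As a rough illustration of the split across sequences, the sketch below puts the iterate in a proxy's in sequence and the aggregate with the same id in its out sequence. The proxy name IterateAggregateProxy is hypothetical, and the sketch uses the send mediator rather than call so that responses arrive in the out sequence. It reuses the echo endpoint and namespace from the example above, and it has not been tested against a running ESB.

```xml
<proxy name="IterateAggregateProxy" transports="http https" xmlns="http://ws.apache.org/ns/synapse">
  <target>
    <inSequence>
      <!-- Split the request; the id links this iterate to the aggregate below -->
      <iterate expression="//Account" id="IterateAccountCreate">
        <target>
          <sequence>
            <!-- send (not call): each response is handled in the out sequence -->
            <send>
              <endpoint key="echo"/>
            </send>
          </sequence>
        </target>
      </iterate>
    </inSequence>
    <outSequence>
      <!-- Same id as the iterate, so the responses are correlated -->
      <aggregate id="IterateAccountCreate">
        <completeCondition timeout="10">
          <!-- -1 means no explicit bound; wait for the iterated messages -->
          <messageCount max="-1" min="-1"/>
        </completeCondition>
        <onComplete expression="//ns:echoStringResponse" xmlns:ns="http://echo.services.core.carbon.wso2.org">
          <!-- Return the aggregated body to the client -->
          <send/>
        </onComplete>
      </aggregate>
    </outSequence>
  </target>
</proxy>
```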

2.    Non-blocking calls

The calls should be non-blocking when we use the aggregate mediator; otherwise the aggregation does not proceed. This applies to the default configuration of ESB version 4.9.

        <call>
          <endpoint key="echo"/>
        </call>
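
For contrast, the fragment below places the non-blocking form next to the blocking form that this note advises against. It assumes the call mediator's blocking attribute is available in your ESB version; check the Call mediator documentation for your release before relying on it.

```xml
<!-- Non-blocking (default): no "blocking" attribute, as used in the example above -->
<call>
  <endpoint key="echo"/>
</call>

<!-- Blocking variant: avoid this inside an iterate that is paired with an aggregate -->
<call blocking="true">
  <endpoint key="echo"/>
</call>
```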

3.    Aggregate expression

The aggregate expression selects the response node on which the aggregation groups the responses and puts them into one body. After the aggregation completes, the body contains multiple elements of the node selected by the onComplete expression.

<onComplete expression="//ns:echoStringResponse" xmlns:ns="http://echo.services.core.carbon.wso2.org">

Nested iterations

To transform complex messages (responses), we can create nested iterations. Inside the first iterate, we can put another sequence with its own iterate-aggregate pair. That way we can work with nested payloads, or with payloads returned from both iterations.

Aggregated nested responses

<aggregatedCategories>
      <category>
            <categoryNode1>value</categoryNode1>
            <categoryNode2>value</categoryNode2>
            <aggregatedSubcategories>
                  <subcategory>
                       <node1>value1</node1>
                       <node2>value2</node2>
                  </subcategory>
                  <subcategory>
                       <node1>value3</node1>
                       <node2>value4</node2>
                  </subcategory>
            </aggregatedSubcategories>
      </category>
      <category>
            <categoryNode1>value</categoryNode1>
            <categoryNode2>value</categoryNode2>
            <aggregatedSubcategories>
                  <subcategory>
                       <node1>value5</node1>
                       <node2>value6</node2>
                  </subcategory>
                  <subcategory>
                       <node1>value7</node1>
                       <node2>value8</node2>
                  </subcategory>
            </aggregatedSubcategories>
      </category>
</aggregatedCategories>

Property inside of aggregate mediator in scope operation

If we use aggregation in a nested manner, we should put a property inside of the aggregate and use it to combine the payload in the outer aggregate mediator. The property should be in scope “operation”.

  <aggregate id="IterateCategories">
    <completeCondition timeout="60">
      <messageCount max="-1" min="{get-property('countCategories')}"/>
    </completeCondition>
    <onComplete expression="//category">
      <property description="aggregatedCategories" expression="$body"
        name="aggregatedCategories" scope="operation" type="OM"/>

    </onComplete>
  </aggregate>

Sequence with nested iterations

This sequence searches categories in the first iteration, then searches subcategories in the second nested iteration-aggregation. The outer aggregate will contain nested responses of aggregated categories and subcategories inside of each category.

<?xml version="1.0" encoding="UTF-8"?>
<sequence name="categories-subcategories" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
  <call-template description="search-getCategories" target="search-getEntities">
    <with-param name="searchQuery" value="from ConversationCategory cc"/>
  </call-template>
  <property description="countCategories"
    expression="count(//ns18:Entity)" name="countCategories"
    scope="default" type="STRING" xmlns:ns18="http://organisation.com/entity"/>

  <iterate expression="//ns18:Entity" id="IterateCategories"
    sequential="true" xmlns:ns18="http://organisation.com/entity">
    <target>
      <sequence>
        <property description="currentCategory"
          expression="//ns18:Entity" name="currentCategory"
          scope="default" type="STRING" xmlns:ns18="http://organisation.com/entity"/>
        <property description="conversationSubCatNode"
          expression="//ns18:AttributeValue[ns18:Name='conversationSubCat']"
          name="conversationSubCatNode" scope="default"
          type="STRING" xmlns:ns18="http://organisation.com/entity"/>
        <switch source="get-property('conversationSubCatNode')!=''">
          <case regex="true">
            <enrich description="currentCategory">
              <source clone="true" property="currentCategory"
type="property"/>

              <target type="body"/>
            </enrich>
            <xslt description="gather-references-cat-SubCat"
              key="gov:custom/gatherReferences.xslt" source="$body">

              <property name="entity-field" value="ConversationCategory
conversationSubCat"
/>

            </xslt>
            <call-template description="getSubcategories"
target="getEntities"/>

            <property description="ConversationSubCategories"
              expression="$body/ns18:GetEntitiesResponse"
              name="ConversationSubCategories" scope="default" type="STRING" xmlns:ns18="http://organisation.com/entity"/>
            <call-template description="iterate-subcategories" target="iterate-subcategories"/>
          </case>
          <default/>
        </switch>
        <payloadFactory description="category_subcategory" media-type="xml">
          <format>
            <category xmlns="">
                  $1
            <subcategories>$2</subcategories>
            </category>
          </format>
          <args>
            <arg evaluator="xml" expression="get-property('currentCategory')"/>
            <arg evaluator="xml" expression="get-property('operation','aggregatedSubcategories')"/>
          </args>
        </payloadFactory>
      </sequence>
    </target>
  </iterate>
  <aggregate id="IterateCategories">
    <completeCondition timeout="60">
      <messageCount max="-1" min="{get-property('countCategories')}"/>
    </completeCondition>
    <onComplete expression="//category">
      <property description="aggregatedCategories" expression="$body"
        name="aggregatedCategories" scope="operation" type="OM"/>

    </onComplete>
  </aggregate>
  <payloadFactory media-type="xml">
    <format>
      <categories xmlns="">$1</categories>
    </format>
    <args>
      <arg evaluator="xml" expression="get-property('operation','aggregatedCategories')"/>
    </args>
  </payloadFactory>
  <log description="***aggregatedCategories***"
separator="***aggregatedCategories***">

    <property expression="$body" name="BODY"/>
  </log>
  <xslt description="category-subcategory-type" key="gov:/my-project/xslt/category-subcategory-type.xslt"/>
  <log description="***TransformCategoriesResponse***" separator="***TransformCategoriesResponse***">
    <property expression="$body" name="BODY"/>
  </log>
</sequence>

The payload aggregated by the first (outer) iteration is custom: it contains the current category and the aggregatedSubcategories from the second (inner) iteration.

        <payloadFactory description="category_subcategory" media-type="xml">
          <format>
            <category xmlns="">
              $1
              <subcategories>$2</subcategories>
            </category>
          </format>
          <args>
            <arg evaluator="xml" expression="get-property('currentCategory')"/>
            <arg evaluator="xml" expression="get-property('operation','aggregatedSubcategories')"/>
          </args>
        </payloadFactory>

The second (inner) iterate-aggregate pair for the subcategories lives in a separate template; it could also be a sequence.

            <call-template description="iterate-subcategories" target="iterate-subcategories"/>
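
The article does not show the inner template itself. The sketch below is one possible shape for it, mirroring the outer iterate-aggregate pattern. Several things are assumptions and not taken from the original project: the id IterateSubcategories, the countSubcategories property, the subcategory wrapper element, and the idea that the body still holds the subcategory entities when the template is called.

```xml
<template name="iterate-subcategories" xmlns="http://ws.apache.org/ns/synapse">
  <sequence>
    <!-- Assumption: the body still holds the subcategory entities at this point -->
    <property name="countSubcategories" expression="count(//ns18:Entity)"
      scope="default" type="STRING" xmlns:ns18="http://organisation.com/entity"/>
    <iterate expression="//ns18:Entity" id="IterateSubcategories"
      sequential="true" xmlns:ns18="http://organisation.com/entity">
      <target>
        <sequence>
          <!-- Wrap each entity in a hypothetical <subcategory> element -->
          <payloadFactory media-type="xml">
            <format>
              <subcategory xmlns="">$1</subcategory>
            </format>
            <args>
              <arg evaluator="xml" expression="//ns18:Entity"
                xmlns:ns18="http://organisation.com/entity"/>
            </args>
          </payloadFactory>
        </sequence>
      </target>
    </iterate>
    <!-- Same id as the inner iterate, and different from the outer pair -->
    <aggregate id="IterateSubcategories">
      <completeCondition timeout="60">
        <messageCount max="-1" min="{get-property('countSubcategories')}"/>
      </completeCondition>
      <onComplete expression="//subcategory">
        <!-- Operation scope lets the outer payloadFactory read the result -->
        <property name="aggregatedSubcategories" expression="$body"
          scope="operation" type="OM"/>
      </onComplete>
    </aggregate>
  </sequence>
</template>
```

Giving the inner pair its own id keeps its responses from being correlated with those of the outer IterateCategories pair.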

References

https://docs.wso2.com/display/ESB490/Iterate+Mediator
https://docs.wso2.com/display/ESB490/Aggregate+Mediator
