info@yenlo.com
eng
Menu
WSO2 12 min

Rollout ActiveMQ and it’s retry mechanism in WSO2

In his latest blog, Nasser takes a deep dive into ActiveMQ, unraveling the complexities of its retry mechanism. From infrastructure setup to monitoring and maintenance. Discover how to configure, integrate, and test ActiveMQ within the WSO2 platform

Nasser Ali Afarin
Nasser Ali Afarin
Integration Consultant
overview of ActiveMQ rollout in WSO2

ActiveMQ is a popular open-source message broker that provides reliable messaging patterns for various applications. In the context of WSO2, ActiveMQ is often used as the underlying message broker in the integration and middleware platform provided by WSO2. ActiveMQ also can be seen as an integral part of an integration platform since a queue can be used for a Dead Letter Channel, a common Enterprise Integration Pattern.

Rollout in the context of ActiveMQ refers to the process of deploying and configuring ActiveMQ within the WSO2 platform. It involves setting up the necessary infrastructure, configuring message queues and topics, and integrating ActiveMQ with other components of the WSO2 platform. In its simplest form for a quick test, it is quite easy to do, for instance in a Docker container. Real world roll outs are of course more complicated.

Rollback messages in ActiveMQ in WSO2 provide a reliable way to handle message processing failures and ensure that messages are not lost or duplicated. By utilizing this feature, developers can build robust messaging applications that can handle a variety of use cases.
However, sometimes a message may fail repeatedly due to a transient issue, such as a temporary network disruption or a downstream service being unavailable. In such cases, a retry mechanism can be implemented to retry processing the message after a delay, which can increase the chances of successful delivery and avoid moving the message to a dead-letter queue.

To implement a retry mechanism in ActiveMQ in WSO2, you can use the built-in retry feature of WSO2 Micro Integrator. The retry feature allows you to specify the number of times to retry processing the message, the delay between retries, and the maximum duration to retry. You can also define a sequence to handle messages that have exceeded the maximum number of retries.

High-level overview of the steps involved in an ActiveMQ rollout in WSO2

overview of ActiveMQ rollout in WSO2

Infrastructure Setup

First, you need to set up the infrastructure required to run ActiveMQ. This typically involves deploying ActiveMQ on a server or cluster of servers, ensuring proper network connectivity, and configuring firewall rules if necessary.

Configuration

Next, you need to configure ActiveMQ to meet the requirements of your application and the WSO2 platform. This includes defining queues and topics, configuring message persistence, setting up security and authentication mechanisms, and tuning various parameters for optimal performance.

Integration with WSO2

Once ActiveMQ is configured, you need to integrate it with other components of the WSO2 platform. This may involve configuring WSO2 middleware products such as WSO2 Micro Integrator (MI) or WSO2 API Manager to use ActiveMQ as the message broker for handling message flows and communication between different components.

Testing and Validation

After the configuration and integration are complete, thorough testing and validation should be performed to ensure the reliability and correctness of the ActiveMQ deployment. This includes testing message delivery, verifying the behavior of queues and topics, testing high availability and failover scenarios, and monitoring performance metrics.

Monitoring and Maintenance

Once ActiveMQ is successfully rolled out, it is essential to establish monitoring and maintenance practices. This involves monitoring the health and performance of the ActiveMQ infrastructure, setting up alerts and notifications for any issues, and performing regular maintenance tasks such as software updates and backups.

Rolling out ActiveMQ in WSO2 requires a good understanding of both ActiveMQ and the WSO2 platform. It is recommended to follow the official documentation and best practices provided by WSO2 and ActiveMQ communities to ensure a successful deployment and efficient utilization of ActiveMQ within the WSO2 platform. We suppose steps 1 and 2 are done and focus on step 3.

Implementation

Configure WSO2 in MI

The Micro Integrator has TOML-based product configurations. All the server-level configurations of your Micro Integrator instance can be applied using a single configuration file, which is the “deployment.toml” file (stored in the MI_HOME/conf directory)

[[transport.jms.sender]]
name = "myQueueSender"
parameter.initial_naming_factory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory"
parameter.provider_url = "tcp://localhost:61616"
parameter.connection_factory_name = "QueueConnectionFactory"
parameter.connection_factory_type = "queue"
parameter.cache_level = "producer"
[transport.jms.listener]]
name = "myQueueConnectionFactory"
parameter.initial_naming_factory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory"
parameter.provider_url = "tcp://localhost:61616"
parameter.connection_factory_name = "QueueConnectionFactory"
parameter.connection_factory_type = "queue"
parameter.username = " yourUsername "
parameter.password = " yourPassword"

After adding these configurations, you need to restart MI.

Send Implementation

Now we will proceed to create the first API resource, with which we will send information to the ActiveMQ.

<?xml version="1.0" encoding="UTF-8"?>
<api context="/activemq" name="ActiveMqAPI" xmlns="http://ws.apache.org/ns/synapse">
    <resource methods="GET">
        <inSequence>
            <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
            <call>
                <endpoint name="sample_queue_EP">
                    <address uri="jms:/testQueue?transport.jms.ConnectionFactory=myQueueConnectionFactory">
                        <timeout>
                            <duration>10000</duration>
                            <responseAction>fault</responseAction>
                        </timeout>
                        <suspendOnFailure>
                            <errorCodes>-1</errorCodes>
                            <initialDuration>0</initialDuration>
                            <progressionFactor>1.0</progressionFactor>
                            <maximumDuration>0</maximumDuration>
                        </suspendOnFailure>
                        <markForSuspension>
                            <errorCodes>-1</errorCodes>
                            <retriesBeforeSuspension>0</retriesBeforeSuspension>
                        </markForSuspension>
                    </address>
                </endpoint>
            </call>
            <payloadFactory media-type="json">
                <format>
            {
            	"Result":"Ok"
            }
            </format>
                <args/>
            </payloadFactory>
            <respond/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </resource>
</api>

After sending the message to this API, Call mediator will send it to ActiveMQ queue.

Tip:
1- If you do not use OUT_ONLY property, you face the error below in each send message to queue.

Axis2Sender Unexpected error during sending message out.
org.apache.axis2.AxisFault: Did not receive a JMS response within 30000 ms to destination

The configuration of the OUT_ONLY property that allows us to tell the MI not to wait for a response from the ActiveMQ.

Receive Implementation

For receive messages from ActiveMQ, we have two options, use Proxy mediator or inboundEndpoint with protocol=”jms” config.

Receive by Proxy

We can define a proxy for JMS protocol and use all parameters defined in dewployment.toml file in listener section or specified and custom properties. Below we create a sample:

<proxy xmlns="http://ws.apache.org/ns/synapse" name="sample_activemq_listener" transports="jms" statistics="disable" trace="disable" startOnLoad="true">
    <target>
        <inSequence>
            <log level="full">
                <property name="description" value="we got message from queue by proxy"></property>
            </log>
            <property action="set" name="OUT_ONLY" value="true" />
            <drop /> //Or do whatever your business need
        </inSequence>
        <faultSequence />
    </target>
    <parameter name="transport.jms.ConnectionFactory">myQueueConnectionFactory</parameter>
    <parameter name="transport.jms.Destination">testQueue</parameter>
    <description />
</proxy>

Receive by inboundEndpoint

InboundEndpoint artefacts can read jms parameters from config files also we can define all needed parameters for it. Below there is a sample of it. It can be done without restarting, which is a benefit.

This InboundEndpoint will listen for messages on the jms testQueue queue. When a message is received, the InboundEndpoint will send it to myMessageHandler for further processes. If the message processing fails, the InboundEndpoint will send it to QueueErrorHandler_Seq sequence for future processing.

<inboundEndpoint name="sample_Inbound_EP" sequence=" myMessageHandler_Seq " onError=" QueueErrorHandler_Seq" protocol="jms" suspend="false" xmlns="http://ws.apache.org/ns/synapse">
    <parameters>
        <parameter name="interval">1000</parameter>
        <parameter name="sequential">true</parameter>
        <parameter name="coordination">true<inboundEndpoint name="sample_Inbound_EP" sequence=" myMessageHandler_Seq " onError=" QueueErrorHandler_Seq" protocol="jms" suspend="false" xmlns="http://ws.apache.org/ns/synapse">
    <parameters>
        <parameter name="interval">1000</parameter>
        <parameter name="sequential">true</parameter>
        <parameter name="coordination">true</parameter>
        <parameter name="transport.jms.Destination">testQueue</parameter>
        <parameter name="transport.jms.CacheLevel">3</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</parameter>
        <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
        <parameter name="java.naming.provider.url">failover:tcp://127.0.0.1:61616</parameter>
        <parameter name="transport.jms.SESSION_TRANSACTED">true</parameter>
        <parameter name="transport.jms.ConnectionFactoryType">queue</parameter>
        <parameter name="transport.jms.SharedSubscription">false</parameter>
        <parameter name="transport.jms.UserName"> yourUsername </parameter>
        <parameter name="transport.jms.Password"> yourPassword </parameter>
        <parameter name="transport.jms.ContentType">application/json</parameter>
    </parameters>
</inboundEndpoint>

Process message

In this case we get the message from ActiveMQ using InboundEndPoint or Proxy and send it to myMessageHandler_Seq sequence. In this sequence we call an endpoint (for example http://some_soap_service_url/) in blocking mode. When you invoke a service in non-blocking mode, the underlying worker thread returns without waiting for the response. In blocking mode, the underlying worker thread gets blocked and waits for the response after sending the request to the endpoint. It is better to configure “timeout” and “suspendOnFailure” parameters based on your need. Please check this link to get more about Endpoint Properties. If service is available and accepts message and response, then everything is ok and MI will send ACK to ActiveMQ automatically, in the other case if we receive any server unreachable or timeout or service unavailable error, Invalid response or fault response, these situations will fire “OnError” in InboundEndPoint or “faultSequence” in Proxy.

Remember if you received a valid response containing error, these types of case should handle after “call” mediator.

<sequence name="myMessageHandler_Seq" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
    <call blocking="true">
        <endpoint xmlns="http://ws.apache.org/ns/synapse">
            <address trace="disable" uri="http://some_soap_service_url/" format="soap11">
                <timeout>
                    <duration>30000</duration>
                    <responseAction>fault</responseAction>
                </timeout>
                <suspendOnFailure>
                    <errorCodes>-1</errorCodes>
                    <initialDuration>0</initialDuration>
                    <progressionFactor>1.0</progressionFactor>
                    <maximumDuration>0</maximumDuration>
                </suspendOnFailure>
                <markForSuspension>
                    <errorCodes>-1</errorCodes>
                </markForSuspension>
            </address>
        </endpoint>
    </call>
</sequence>

How to rollback message

When the Micro Integrator consumes messages from an ActiveMQ queue, it can be configured to redeliver messages that fail to process. This is useful for ensuring that messages are not lost in the event of a failure.

<sequence name="QueueErrorHandler_Seq" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
    <property name="SET_ROLLBACK_ONLY" scope="default" type="STRING" value="true" />
</sequence>

Add the following line in your fault sequence:

<property name="SET_ROLLBACK_ONLY" value="true" scope="axis2"/>

SET_ROLLBACK_ONLY:

This parameter must be defined in wso2 sequences to redeliver the message to ActiveMQ.

As an example, when the ESB Profile of WSO2 MI is unable to deliver a message to the back-end service due to an error, it will be routed to the fault sequence in the configuration. When the SET_ROLLBACK_ONLY property is set in the fault sequence, the WSO2 MI informs ActiveMQ to requeue it and redeliver the message based on parameters defined in configs.

Apply and use Retry mechanism parameters

By implementing a retry mechanism in ActiveMQ in WSO2, you can handle transient failures and increase the chances of successful message delivery. This feature provides a reliable way to ensure that messages are processed correctly and can help you build resilient messaging applications.

There is a way to add this option to our proxy or InboundEndPoints using InboundEndpoint parameters.

        <parameter name="redeliveryPolicy.maximumRedeliveries">3</parameter>
        <parameter name="redeliveryPolicy.redeliveryDelay">60000</parameter>
        <parameter name="redeliveryPolicy. initialRedeliveryDelay">60000</parameter>

In this example, the retry feature is configured to retry processing the message up to three times, with a delay of 60 seconds (60000 ms) between retries and a maximum duration of 60 seconds. If the message fails after three retries, it will be directed to the specified sequence.

  • redeliveryPolicy.maximumRedeliveries: Maximum number of retries for delivering the message. If set to -1 ActiveMQ will retry indefinitely.
  • transport.jms.SessionTransacted: When set to true, this enables the JMS session transaction for the proxy service.
  • redeliveryPolicy.redeliveryDelay: Delay time in milliseconds between retries.
  • transport.jms.CacheLevel: This needs to be set to the consumer for the ActiveMQ redelivery mechanism to work.

Usable Parameters

JMSXDeliveryCount:
We can get retries count with $trp:JMSXDeliveryCount, below we put this variable to another property.

<property expression="$trp:JMSXDeliveryCount" name="Retry_Count" scope="default" type="STRING" />

Remember that for the first time this property is null and after first redeliver of message, it will start from number 2.

When rollback does not work

One of most important part of above code is using . It is recommended to not use blocking calls in general, but it is mandatory to use blocking call after receive message from queue if you want to send a message to a service by call an endpoint. If you use blocking mode in call an endpoint you will get response in the same thread as caller thread and SET_ROLLBACK_ONLY property will work, otherwise it will not work.

Conclusion

In conclusion, rollback messages are a critical aspect of any messaging system, and ActiveMQ in WSO2 provides robust support for this feature. By understanding how rollback messages work, developers can build resilient messaging applications that can handle failures and ensure message delivery. You can access sample project from Yenlo by this link also.

Whitepaper: API Security

API Security
Download Whitepaper
eng
Close