fb
WSO2 Enterprise Integrator 17 min

Reliable Message publishing (JMS) to WSO2 BAM using WSO2 MB

Yenlo
Integration Experts
6 dec no1
Scroll

In this WSO2 tutorial an overview how you can publish reliable messages to WSO2 BAM using the WSO2 Message Broker (WSO2 MB). All these componentens are part of the 100% open source integration platform WSO2. The mentioned WSO2 BAM is now (2016) rebranded to WSO2 Data Analytics Server (WSO2 DAS).

What is business activity monitoring (BAM)?

As per Gartner, business activity monitoring (BAM) is the processes and technologies that enhance situation awareness and enable analysis of critical business performance indicators based on real-time data. BAM is used to improve the speed and effectiveness of business operations by keeping track of what’s happening and making issues visible quickly.

Service-oriented architecture (SOA) encourages systems to expose business functions as services. It is required to start monitoring these exposed services in order to gain insights into, and achieve transparency in business activities. By plugging into services, data can be harnessed as valuable information. However, the amount of information that is flowing through an organization’s systems is enormous. Therefore, all this data needs to be collected and processed by a system capable of handling big data loads, capable of handling data volumes of TeraBytes, or even PetaBytes with relative ease.

BAM with WSO2

The BAM concept can be implemented through many different kinds of software tools. These tools enable enterprises to be proactive rather than reactive to stay ahead of the game. WSO2 BAM is one such open-source software tool. It is a flexible framework allowing to model your own key performance indicators to suit different stakeholders, be it business users, devops, CxOs, etc. WSO2 BAM achieves this level of flexibility, while facilitating technologies such as big data storage, big data analytics, and high performance data transfer.

WSO2 BAM has a new feature

With the new WSO2 BAM 2.5.0 release, the product is now fully integrated with Java Messaging Service (JMS), which enables loosely coupled communication together with asynchronous and reliable connection. This means that WSO2 BAM does not have to receive messages at the same time the sending client sends them and the sending client can send them and move on to other tasks; WSO2 BAM can receive them much later. It also ensures that a message is delivered once, and only once.

An enterprise application provider is likely to choose a messaging API over a tightly-coupled API, such as a remote procedure call (RPC), in the following circumstances. Look also to api management with the WSO2 API Manager.

  • The provider wants the components to not depend on information about other components’ interfaces, so components can be easily replaced.
  • The provider wants the application to run whether or not all components are up and running simultaneously.
  • The application business model allows a component to send information to another and to continue to operate without receiving an immediate response.

Let’s take an example

Consider the case where a company, say Company A, is operated under SOA. Company A has hosted several services online and they are exposed to the outside via an enterprise service bus (ESB), such as WSO2 ESB (as a proxy service).

6_dec_no1.png

Figure 1

Company A now wants to monitor these requests and responses from customers and the company wants to implement a BAM solution reliably with a minimum footprint and without disrupting the core business processes.

As explained in Figure 1, JMS would provide an ideal communication platform for this scenario. WSO2 BAM could then be integrated to the system to carry out the event monitoring requirements.

The solution can be depicted as illustrated in Figure 2.

6_dec_no2.png

Figure 2

What is a JMS Broker/Provider?

A JMS provider is a messaging system that implements the JMS interfaces and provides administrative and control features for a message-oriented middleware (MOM). Providers are implemented as either a Java JMS implementation or an adapter to a non-Java MOM.

WSO2 Message Broker (WSO2 MB) brings messaging and eventing capabilities to your SOA framework. It is based on JMS, and its features are basically implementations of the JMS specification, so any JMS client can communicate with WSO2 MB. It can be used as a standalone message broker or a distributed message brokering system. [4]

Hands-on implementation of WSO2 BAM together with WSO2 MB

Installation prerequisites

  • Java 1.6 or later
  • Apache Ant 1.7.0 or later
  • Apache Maven 3.0 or later
  • Web browser

Download the products

  • WSO2 ESB 4.8.1 (https://docs.wso2.com/display/ESB481/Downloading+the+Product)
  • WSO2 MB 2.2.0 (https://docs.wso2.com/display/MB220/Downloading+the+Product)
  • WSO2 BAM 2.5.0 Beta (The 2.5.0 version is yet to release. Please find the Beta version in the following link [5])
  • Unzip the products

Step 1: Enable JMS in ESB and BAM

  • Enable the JMS transport of WSO2 ESB to communicate with the Message Broker by editing the /repository/conf/axis2/axis2.xml file. Find a commented-out block for MB 2.x.x and uncomment it. Also uncomment the block for JMS in the same file
  • Let’s start MB with a port-offset of 1 and BAM with a 3. To do this, go to /repository/conf/carbon.xml file. Edit the Offset field, respectively.
  • Copy the following JAR files from /client-lib folder to /repository/components/lib in BAM and ESB. They are client libraries required from MB to ESB and BAM.
    • andes-client-0.13.wso2v4
    • geronimo-jms_1.1_spec-1.1.0.wso2v1
  • Open /repository/conf/jndi.properties file (If the file is not there you can create one) and point to the running MB (Let us start the MB later on) in both ESB and BAM. The file should have the following configuration.
[php]# register some connection factories
# connectionfactory.[jndiname] = [ConnectionURL]connectionfactory.QueueConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist=’tcp://localhost:5673′
connectionfactory.TopicConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist=’tcp://localhost:5673′
[/php]
  • Start MB by running /bin/wso2server.sh

Step 2: Create a backend Axis2 service

  • Go to /samples/axis2Server/src/SimpleStockQuoteService and build the backend service with Apache Ant. This will create a service .aar bundle and deploy it on the backend Axis2 server.
  • Go to /samples/axis2Server and start the Axis2 server by running ./axis2server.sh
  • Test whether the proxy service WSDL exists in http://localhost:9000/services/SimpleStockQuoteService?wsdl

Step 3: Add a SimpleStockQuote proxy in the ESB

  • Start the ESB server by running /bin/wso2server.sh
  • Go to ESB management console and select Main→Web Services→Add→Proxy Service from the side panel.
  • Click WSDL Based Proxy to create a new WSDL based proxy.
  • Fill the form as follows:
    • Insert a suitable Proxy Service Name (e.g. SSQSProxy)
    • Set http://localhost:9000/services/SimpleStockQuoteService?wsdl as the WSDL URI
    • Set SimpleStockQuoteService as the WSDL service
    • Set SimpleStockQuoteServiceHttpSoap11Endpoint as the WSDL Port
  • Click Create.
  • Click on the newly created proxy’s name (SSQSProxy in this case) to go to its dashboard.
  • Let’s add Log mediators to the in sequence and out sequence of the proxy service, just to monitor the messages. You can do it by clicking on the ‘Edit’ option in the dashboard.
  • Once this is completed, the final synapse config of ESB would look like as follows.
[php]<?xml version=”1.0″ encoding=”UTF-8″?>
<definitions >
<registry provider=”org.wso2.carbon.mediation.registry.WSO2Registry”>
<parameter name=”cachableDuration”>15000</parameter>
</registry>
<proxy name=”SSQSProxy” transports=”https http” startonload=”true” trace=”disable”>
<description>
<target>
<endpoint>
<wsdl service=”SimpleStockQuoteService” port=”SimpleStockQuoteServiceHttpSoap11Endpoint” uri=”http://localhost:9000/services/SimpleStockQuoteService?wsdl”>
</wsdl></endpoint>
<insequence>
<log level=”full”>
</log></insequence>
<outsequence>
<log level=”full”>
<send>
</send></log></outsequence>
</target>
</description></proxy>
<sequence name=”fault”>
<log level=”full”>
<property name=”MESSAGE” value=”Executing default ‘fault’ sequence”>
<property name=”ERROR_CODE” expression=”get-property(‘ERROR_CODE’)”>
<property name=”ERROR_MESSAGE” expression=”get-property(‘ERROR_MESSAGE’)”>
</property></property></property></log>
<drop>
</drop></sequence>
<sequence name=”main”>
<in>
<log level=”full”>
<filter source=”get-property(‘To’)” regex=”http://localhost:9000.*”>
<send>
</send></filter>
</log></in>
<out>
<send>
</send></out>
<description>The main sequence for the message mediation</description>
</sequence>
</definitions>
[/php]
  • Let us test the service by using an ESB sample client.Go to /samples/axis2Client and build the backend service with Apache Ant as follows:ant stockquote -Daddurl=https://localhost:8244/services/SSQSProxy -Dmode=quote -Dsymbol=WSO2
  • On the ESB management console, you should see the following log mediator outputs. Sample input to the service:
[php]To: https://localhost:8243/services/SSQSProxy, WSAction: urn:getQuote, SOAPAction: urn:getQuote, ReplyTo: http://www.w3.org/2005/08/addressing/anonymous, MessageID: urn:uuid:50535bf8-1c37-4fcf-afd6-85daf6d0f96a, Direction: request, Envelope:
<?xml version=”1.0″ encoding=”utf-8″?>
<soapenv:envelope xmlns_soapenv=”http://schemas.xmlsoap.org/soap/envelope/”>
<soapenv:header xmlns_wsa=”http://www.w3.org/2005/08/addressing”>
<wsa:to>https://localhost:8243/services/SSQSProxy</wsa:to>
<wsa:messageid>urn:uuid:50535bf8-1c37-4fcf-afd6-85daf6d0f96a</wsa:messageid>
<wsa:action>urn:getQuote</wsa:action>
</soapenv:header>
<soapenv:body>
<m0:getquote xmlns_m0=”http://services.samples”>
<m0:request>
<m0:symbol>WSO2</m0:symbol>
</m0:request>
</m0:getquote>
</soapenv:body>
</soapenv:envelope>

Sample output from the service:
To: http://www.w3.org/2005/08/addressing/anonymous, WSAction: , SOAPAction: , ReplyTo: http://www.w3.org/2005/08/addressing/anonymous, MessageID: urn:uuid:7aa671c1-7dae-48cd-8219-ca55b37be064, Direction: response, Envelope:
<?xml version=”1.0″ encoding=”utf-8″?>
<soapenv:envelope xmlns_soapenv=”http://schemas.xmlsoap.org/soap/envelope/”>
<soapenv:body>
<ns:getquoteresponse xmlns_ns=”http://services.samples”>
<ns:return xmlns_ax21=”http://services.samples/xsd” xmlns_xsi=”http://www.w3.org/2001/XMLSchema-instance” xsi_type=”ax21:GetQuoteResponse”>
<ax21:change>-2.8489171508187012</ax21:change>
<ax21:earnings>-9.728456590284134</ax21:earnings>
<ax21:high>95.47837722274852</ax21:high>
<ax21:last>91.3323987087055</ax21:last>
<ax21:lasttradetimestamp>Sat Oct 04 18:34:42 IST 2014</ax21:lasttradetimestamp>
<ax21:low>-91.05157483949503</ax21:low>
<ax21:marketcap>5.37472097959319E7</ax21:marketcap>
<ax21:name>WSO2 Company</ax21:name>
<ax21:open>94.95820639015217</ax21:open>
<ax21:peratio>-19.662021375532035</ax21:peratio>
<ax21:percentagechange>3.3511040016482827</ax21:percentagechange>
<ax21:prevclose>-85.01428631929733</ax21:prevclose>
<ax21:symbol>WSO2</ax21:symbol>
<ax21:volume>18169</ax21:volume>
</ns:return>
</ns:getquoteresponse>
</soapenv:body>
</soapenv:envelope>
[/php]

Now, we have successfully implemented the Figure 1 scenario. We can now add the BAM facility to the implementation as shown in Figure 2.

Step 4: Configure BAM to listen to MB

  • Start the BAM server by running /bin/wso2server.sh
  • Go to Configure → Event Processor Configs → Input Event Adapters → Add Input Event Adapter
  • Select the type to be JMS
  • Use the following configurations

Event Adaptor Name : mbjmsnputadapter

Event Adaptor Type : jms

JNDI Initial Context Factory Class : org.wso2.andes.jndi.PropertiesFileInitialContextFactory

URL of the JNDI provider : repository/conf/jndi.properties

Connection Factory JNDI Name : QueueConnectionFactory

Destination Type : queue

Enable Durable Subscription : false

  • Alternatively, you can specify an event adaptor configuration using an XML file and save it in/repository/deployment/server/inputeventadaptors directory, which is the Input event adaptor deployment directory. Since hot deployment is enabled, you can simply add/remove files to deploy/undeploy from the server.Deployed XML file of the above adapter would be like this.
[php]<?xml version=”1.0″ encoding=”UTF-8″?>
<inputeventadaptor name=”mbjmsnputadapter” statistics=”disable” trace=”disable” type=”jms” >
<property name=”java.naming.provider.url”>repository/conf/jndi.properties</property>
<property name=”transport.jms.SubscriptionDurable”>false</property>
<property name=”java.naming.factory.initial”>org.wso2.andes.jndi.PropertiesFileInitialContextFactory</property>
<property name=”transport.jms.ConnectionFactoryJNDIName”>QueueConnectionFactory</property>
<property name=”transport.jms.DestinationType”>queue</property>
</inputeventadaptor>
[/php]

Once an input adapter is made, it will be listening to the configured JMS provider. Now, we have to configure a stream to carry the messages caught by the adapter.

  • Go to BAM management console, select Main → Event Streams and click Add Event Stream
  • Enter details in the form that appears and click the Add Event Stream button at the end of the form.Here, we have the ability to choose to select the fields we need to monitor in the output stream. Dumping the entire output message into the BAM Cassandra database would be inefficient when it comes to data processing later on. Therefore, we will pick only a few fields from the output message to monitor.Example configuration is shown below.

6_dec_no3.png

Figure 3

Other than the server UI, you can also define the stream definition using a config file, which is in the repository/conf/data-bridge/stream-definitions.xml. However, this file is used to create streams only in the server startup (this file is not hot deployable).

[php]<streamdefinitions >
<streamdefinition>
{
“name”: “StockQuoteStreamMB”,
“version”: “1.0.0”,
“nickName”: “MB”,
“description”: “from MB”,
“payloadData”: [
{
“name”: “volume”,
“type”: “INT”
},
{
“name”: “name”,
“type”: “STRING”
},
{
“name”: “symbol”,
“type”: “STRING”
},
{
“name”: “price”,
“type”: “FLOAT”
}
]}
</streamdefinition>
</streamdefinitions>
[/php]
  • Once the event stream is added successfully, you will be asked if you want to create the event builder for the stream. Event builder links a stream with an adapter.Choose ‘custom event builder’ and click OK.

(Alternatively, perform the following steps. BAM management console → Event Streams → In-Flows of the relevant stream → Receive from External Event Stream(via Event Builder))

  • Enter details in the form that appears and click the Add Event Builder button at the end of the form.For the above example, we need to select the input adapter we created earlier. Since the payload is in the XML format, select the input mapping to be XML.Click ‘Advanced’ button to map the XML data to the stream using XPath. (You can use this tool to derive XPath from the output message)
  • Alternatively, you can specify an event builder configuration using an XML file and save it in/repository/deployment/server/eventbuildersdirectory, which is the event builder deployment directory. Since hot deployment is enabled, you can simply add/remove files to deploy/undeploy from the server.
[php]<?xml version=”1.0″ encoding=”UTF-8″?>
<eventbuilder name=”StockQuoteEventBuilderMB” statistics=”disable” trace=”disable” >
<from eventadaptorname=”mbjmsnputadapter” eventadaptortype=”jms”>
<property name=”transport.jms.Destination”>StockQuotesQueue</property>
</from>
<mapping custommapping=”enable” parentxpath=”/soapenv:Envelope” type=”xml”>
<xpathdefinition namespace=”http://schemas.xmlsoap.org/soap/envelope/” prefix=”soapenv”>
<xpathdefinition namespace=”http://schemas.xmlsoap.org/soap/envelope/” prefix=”soapenv”>
<xpathdefinition namespace=”http://services.samples” prefix=”ns”>
<xpathdefinition namespace=”http://services.samples/xsd” prefix=”ax21″>
<xpathdefinition namespace=”http://www.w3.org/2001/XMLSchema-instance” prefix=”xsi”>
<property>
<from xpath=”/soapenv:Envelope/soapenv:Body/ns:getQuoteResponse/ns:return/ax21:name”>
<to default=”default” name=”name” type=”string”>
</to></from></property>
<property>
<from xpath=”/soapenv:Envelope/soapenv:Body/ns:getQuoteResponse/ns:return/ax21:symbol”>
<to default=”default” name=”symbol” type=”string”>
</to></from></property>
<property>
<from xpath=”/soapenv:Envelope/soapenv:Body/ns:getQuoteResponse/ns:return/ax21:last”>
<to default=”0.0″ name=”price” type=”float”>
</to></from></property>
<property>
<from xpath=”/soapenv:Envelope/soapenv:Body/ns:getQuoteResponse/ns:return/ax21:volume”>
<to default=”0″ name=”volume” type=”int”>
</to></from></property>
</xpathdefinition></xpathdefinition></xpathdefinition></xpathdefinition></xpathdefinition></mapping>
<to streamname=”StockQuoteStreamMB” version=”1.0.0″>
</to></eventbuilder>
[/php]

Once all the configurations are made, you should see the queue in the MB management console under Queues → Browse

Step 5: Edit ESB set up to send a copy of the output messages to MB

  • We will introduce a clone mediator on the output sequence of the proxy service, so that it would send a copy of the response to MB.
  • Edit the proxy service in the Design View.
  • Go to Out-Sequence → Define Out Sequence → Click define inline → Edit
  • Replace the Send mediator with a Clone mediator and add two targets to it.
  • Add a Send mediator to one target. This will be the normal response path to the client.
  • Add a send mediator to the other target and define endpoint inline. This will point to the MB message queue. (Refer [7] for more information)
[php]jms:/StockQuotesQueue?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&java.naming.provider.url=repository/conf/jndi.properties&transport.jms.DestinationType=queue
[/php]
  • Add an endpoint property OUT_ONLY with value true. This will prompt ESB to send the JMS message without expecting a response.
  • Once this is completed, the final synapse config of ESB would look like as follows.
[php]<?xml version=”1.0″ encoding=”UTF-8″?>
<definitions >
<registry provider=”org.wso2.carbon.mediation.registry.WSO2Registry”>
<parameter name=”cachableDuration”>15000</parameter>
</registry>
<proxy name=”SSQSProxy” transports=”https http” startonload=”true” trace=”disable”>
<description>
<target>
<endpoint>
<wsdl service=”SimpleStockQuoteService” port=”SimpleStockQuoteServiceHttpSoap11Endpoint” uri=”http://localhost:9000/services/SimpleStockQuoteService?wsdl”>
</wsdl></endpoint>
<insequence>
<log level=”full”>
</log></insequence>
<outsequence>
<log level=”full”>
<clone>
<target>
<sequence>
<send>
</send></sequence>
</target>
<target>
<sequence>
<send>
<endpoint>
<address uri=”jms:/StockQuotesQueue?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&java.naming.provider.url=repository/conf/jndi.properties&transport.jms.DestinationType=queue”>
<property name=”OUT_ONLY” value=”true”>
</property></address></endpoint>
</send>
</sequence>
</target>
</clone>
</log></outsequence>
</target>
</description></proxy>
<sequence name=”fault”>
<log level=”full”>
<property name=”MESSAGE” value=”Executing default ‘fault’ sequence”>
<property name=”ERROR_CODE” expression=”get-property(‘ERROR_CODE’)”>
<property name=”ERROR_MESSAGE” expression=”get-property(‘ERROR_MESSAGE’)”>
</property></property></property></log>
<drop>
</drop></sequence>
<sequence name=”main”>
<in>
<log level=”full”>
<filter source=”get-property(‘To’)” regex=”http://localhost:9000.*”>
<send>
</send></filter>
</log></in>
<out>
<send>
</send></out>
<description>The main sequence for the message mediation</description>
</sequence>
</definitions>
[/php]

Step 6: Run the setup

  • Once the configuration is completed run the ESB stock quote client again.Go to /samples/axis2Client and build the backend service with Apache Ant as follows:

ant stockquote -Daddurl=https://localhost:8244/services/SSQSProxy -Dmode=quote -Dsymbol=WSO2

  • You should see request and the response in the ESB console and the proxy service UI.
  • Go to BAM management console and check the message in BAM Cassandra database.Tools → Cassandra Explorer → Connect to ClusterConnection URL: localhost:9163Username: adminPassword: admin
  • You should see the message in the Cassandra explorer.
  • Now, shutdown the BAM server and send another request from the stock quote client.
  • This time, you should see this message being pushed into the MB queue, since there is no listener to the JMS provider.
  • Once you turn the BAM server on again, it will receive the queued message from MB and it gets pushed into the Cassandra database.

Step 7: Create a Hive script and publish the data into a dashboard

  • Once the data is store in the Cassandra database, it can be used for further processing using Hive engine inside BAM.
  • This processed data can then be displayed on a dashboard in BAM.

References

  1. http://www.gartner.com/it-glossary/bam-business-activity-monitoring
  2. https://docs.wso2.com/display/BAM241/About+BAM
  3. http://docs.oracle.com/javaee/7/tutorial/doc/jms-concepts001.htm
  4. https://docs.wso2.com/display/MB210/Message+Broker+Features
  5. https://svn.wso2.org/repos/wso2/people/malith/BAM-2.5.0/BAM-2.5.0-Beta/wso2bam-2.5.0.zip
  6. http://xmlgrid.net/xpath.html
  7. https://docs.wso2.com/display/ESB481/ESB+as+a+JMS+Producer