WSO2 Tutorial: Publishing and Subscribing to MQTT events with WSO2 Enterprise Integrator 6.4.0

MQTT (Message Queuing Telemetry Transport) is an ISO standard publish-subscribe messaging protocol. It is designed for connections with remote locations, such as IoT devices, where a "small code footprint" is required or the network bandwidth is limited. In this WSO2 tutorial we will publish and subscribe to MQTT messages using WSO2 Enterprise Integrator and Mosquitto.

We start by installing Mosquitto. We install on CentOS 7, and since there is no Mosquitto package available by default, we first need to install EPEL (Extra Packages for Enterprise Linux), an extra software repository that contains many packages, including Mosquitto. For other environments, search for "install MQTT [your environment]".

We will continue with the instructions for CentOS 7.

Installing

Open a terminal session and enter:

sudo yum -y install epel-release

Enter your user password when asked. If the user is not in the list of sudoers, add the user to the sudoers list first. Search online for instructions if you do not know how to do this.

With EPEL added to the system we can now install the Mosquitto package. The -y parameter answers yes to all prompts, so the install runs unattended.

sudo yum -y install mosquitto

Again enter your password when asked.

Mosquitto comes with a default configuration. Let's start Mosquitto. Enter the following command in a terminal session:

sudo systemctl start mosquitto

To have Mosquitto start at boot, enter the following:

sudo systemctl enable mosquitto

Testing by sending a message to a topic

Now let's test the default configuration. The Mosquitto package comes with some command line MQTT clients. We'll use one of them to subscribe to a topic on our broker.

A topic is a label for messages that can be organized in a hierarchy. You can publish to a topic and other people can subscribe to it.

Suppose we have a hierarchy like:

Sensors
    1.    Windowsensors
    2.    Doorsensors
    3.    Temperaturesensor

This is of course a dummy setup, but it does show the hierarchy. You can subscribe to all topics or only to some of them, and you will receive the messages published to the topics you subscribed to.
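To make the hierarchy more concrete: in MQTT the levels of a topic are separated by a forward slash, and a subscriber can use the wildcards + (exactly one level) and # (all remaining levels) in a topic filter. The following is a minimal Python sketch of that matching rule. It is an illustration only, not how Mosquitto implements it, and topic names such as Sensors/Doorsensors/frontdoor are hypothetical extensions of the dummy hierarchy above.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a subscription filter.

    '+' matches exactly one level; '#' matches the parent level and
    everything below it. Simplified sketch: it does not treat
    '$'-prefixed system topics specially and does not validate
    that '#' is the last level of the filter.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            return True  # multi-level wildcard: the rest always matches
        if i >= len(topic_levels):
            return False  # the filter has more levels than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # literal level differs
    return len(filter_levels) == len(topic_levels)


# Hypothetical topic names based on the dummy hierarchy.
print(topic_matches("Sensors/#", "Sensors/Doorsensors/frontdoor"))
print(topic_matches("Sensors/+", "Sensors/Doorsensors/frontdoor"))
```

With this rule, a subscription to Sensors/# receives messages from every sensor, while Sensors/Doorsensors/+ only receives messages from topics directly under Doorsensors.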

Publish / Subscribe

We are now going to look at the publish / subscribe model. We need two terminals: one subscribes to the topic and the other publishes to it.

In the first terminal, type the following command (using the mosquitto_sub command line tool) to subscribe to the test topic:

mosquitto_sub -h localhost -t test

-h is used to specify the hostname of the MQTT server, and -t is the topic name. Other settings like port are the default values.

You'll see no output after hitting ENTER because mosquitto_sub is waiting for messages to arrive. Switch back to your other terminal and publish a message:

mosquitto_pub -h localhost -t test -m "hello world"

The options for mosquitto_pub are the same as mosquitto_sub, though this time we use the additional -m option to specify our message. Hit ENTER, and you should see hello world pop up in the other terminal. You've sent your first MQTT message!
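To give an idea of how little overhead MQTT adds, the sketch below builds by hand the bytes of an MQTT 3.1.1 PUBLISH packet for the message we just sent (QoS 0, no retain flag). The helper names encode_remaining_length and encode_publish are made up for this illustration. A real client such as mosquitto_pub also exchanges CONNECT and DISCONNECT packets with the broker, which are left out here.

```python
def encode_remaining_length(length: int) -> bytes:
    """Encode the variable-length 'remaining length' field of the fixed header."""
    encoded = bytearray()
    while True:
        byte = length % 128
        length //= 128
        if length > 0:
            byte |= 0x80  # continuation bit: more length bytes follow
        encoded.append(byte)
        if length == 0:
            return bytes(encoded)


def encode_publish(topic: str, message: str) -> bytes:
    """Build a QoS 0 PUBLISH packet (hypothetical helper, MQTT 3.1.1 layout)."""
    topic_bytes = topic.encode("utf-8")
    payload = message.encode("utf-8")
    # Variable header: 2-byte big-endian topic length followed by the topic name.
    # QoS 0 packets carry no packet identifier.
    variable_header = len(topic_bytes).to_bytes(2, "big") + topic_bytes
    remaining = len(variable_header) + len(payload)
    # 0x30 = packet type PUBLISH (3) in the high nibble, flags 0 (QoS 0, no retain).
    fixed_header = bytes([0x30]) + encode_remaining_length(remaining)
    return fixed_header + variable_header + payload


packet = encode_publish("test", "hello world")
print(len(packet), packet.hex())
```

For hello world on the topic test, the complete packet is 19 bytes: the 11-byte message plus 8 bytes for the header and the topic name.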

Setting up WSO2 Enterprise Integrator 6.4.0

Since the MQTT transport is configured at the Axis2 level (with the exception of the inbound endpoints), we need to edit the [EI-HOME]/conf/axis2/axis2.xml file. Because we want to both receive and send MQTT messages, we uncomment the MQTT transport listener as well as the MQTT transport sender.

The values here can be overridden when the proxy or inbound endpoint is defined. The listener configuration should look as follows:

<transportReceiver class="org.apache.axis2.transport.mqtt.MqttListener" name="mqtt">
    <parameter locked="false" name="mqttConFactory">
        <parameter locked="false" name="mqtt.server.host.name">localhost</parameter>
        <parameter locked="false" name="mqtt.server.port">1883</parameter>
        <parameter locked="false" name="mqtt.client.id">esb.test.listener</parameter>
        <parameter locked="false" name="mqtt.topic.name">esb.test1</parameter>
    </parameter>
</transportReceiver>

Furthermore, we need to download org.eclipse.paho.client.mqttv3-1.1.0.jar from this link and add it to the [EI-HOME]/lib directory. The Enterprise Integrator needs to be restarted afterwards, since the lib files are only loaded at startup.

Publishing MQTT events

We will create a setup where we simulate sensors that indicate when a garbage bin is more than 75% full. Since we do not have such sensors, we will mimic the messages being sent to Mosquitto. In our case we create a simple proxy that uses a payload factory mediator (JSON) to define a message and send it to an MQTT endpoint (the Mosquitto server, on a specific topic).

Let's define a message type that simulates the sensor data we can envision. We have an event with metaData, correlationData and payloadData. $1, $2, etc. are placeholders that the payload factory mediator replaces with its arguments, for example where and when the message originated. This is the format used by the payload factory mediator.

{
    "event": {
        "metaData": {
            "timestamp": "$1",
            "sensorId": $2,
            "sensorName": "BIN"
        },
        "correlationData": {
            "longitude": $3,
            "latitude": $4
        },
        "payloadData": {
            "fillpercentage": $5
        }
    }
}
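To see what the placeholders do, here is a rough Python approximation of the substitution. It is not how WSO2 implements the payload factory mediator; the function fill_template is a hypothetical helper. The sketch assumes that string values (the timestamp and the sensor name) are quoted in the template and that there are no trailing commas, so that the filled-in result parses as valid JSON. The argument values are taken from the sample request at the end of this tutorial.

```python
import json
import re

# Message format with $n placeholders; string values are quoted (assumption).
TEMPLATE = """
{
  "event": {
    "metaData": {"timestamp": "$1", "sensorId": $2, "sensorName": "BIN"},
    "correlationData": {"longitude": $3, "latitude": $4},
    "payloadData": {"fillpercentage": $5}
  }
}
"""


def fill_template(template: str, args: list) -> str:
    """Replace $1, $2, ... with the corresponding (1-based) argument."""
    def substitute(match):
        return str(args[int(match.group(1)) - 1])
    return re.sub(r"\$(\d+)", substitute, template)


filled = fill_template(
    TEMPLATE, ["12/20/18 4:42 PM", 3590.0, -115.162291, 36.224301, 100]
)
event = json.loads(filled)  # raises ValueError if the result is not valid JSON
print(event["event"]["payloadData"]["fillpercentage"])
```

Parsing the result with json.loads is a quick way to catch an unquoted string or a stray comma in the format before it ends up in a proxy.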

Let's start with sending a message in JSON. We will keep the sample simple but will include a more elaborate setup on the Yenlo Bitbucket page.

The data will be generated using a random function within certain boundaries. We will simulate a garbage bin in the greater Las Vegas area. This is done by defining a rectangular area of latitude / longitude that encompasses Las Vegas: 36.306316, -115.333456 is the top-left corner and 35.984696, -114.923976 is the bottom-right corner.

To make it more realistic, we will generate the longitude / latitude, the sensorId and the fill percentage with a script mediator and a random function. The timestamp will be taken from the current system date/time.
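The idea behind the random data can be sketched in a few lines of Python. This is not the script mediator used in the proxy, only an illustration of the same approach. The function name random_bin_reading is hypothetical, the sensorId range of 0 to 10000 is the range the proxy passes to its random function, and the fill percentage range of 75 to 100 is an assumption based on the 75% threshold mentioned earlier.

```python
import random
from datetime import datetime

# Corners of the rectangle from the text, as (latitude, longitude).
TOP_LEFT = (36.306316, -115.333456)
BOTTOM_RIGHT = (35.984696, -114.923976)


def random_bin_reading(rng: random.Random) -> dict:
    """Generate one simulated sensor reading inside the Las Vegas rectangle."""
    latitude = rng.uniform(BOTTOM_RIGHT[0], TOP_LEFT[0])
    longitude = rng.uniform(TOP_LEFT[1], BOTTOM_RIGHT[1])
    return {
        "timestamp": datetime.now().isoformat(timespec="seconds"),
        "sensorId": rng.randint(0, 10000),
        "latitude": round(latitude, 6),
        "longitude": round(longitude, 6),
        # Assumption: only bins that are more than 75% full report in.
        "fillpercentage": rng.randint(75, 100),
    }


reading = random_bin_reading(random.Random())
print(reading)
```

Passing in a random.Random instance makes it possible to use a fixed seed when a repeatable series of test messages is needed.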

Two proxies

In order to test out the MQTT transport we create two proxies.

The first proxy will simulate a message being sent from a sensor.

We get a random longitude and latitude, create a random bin number, and create a JSON payload with the message format and the data we just created. We then set two properties: OUT_ONLY indicates that we do not need to wait for an answer, and FORCE_SC_ACCEPTED gives a 202 response back to the caller. The endpoint we send the message to is an MQTT endpoint. We define the connection string on the endpoint but override it using the parameters at the end of the definition. These parameters are not visible in the graphical overview.

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="BinProxy" startOnLoad="true" transports="http https" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <endpoint>
            <address uri="mqtt:/BinProxy?mqtt.server.host.name=localhost&amp;mqtt.server.port=1883&amp;mqtt.client.id=eyenlo&amp;mqtt.topic.name=bin&amp;mqtt.subscription.qos=2&amp;mqtt.blocking.sender=true"/>
        </endpoint>
        <inSequence>
            <sequence key="LVLONGLAT"/>
            <call-template target="RandomFunc">
                <with-param name="min" value="0"/>
                <with-param name="max" value="10000"/>
            </call-template>
            <payloadFactory media-type="json">
                <format>
                {
                    "event": {
                        "metaData": {
                            "timestamp": "$1",
                            "sensorId": $2,
                            "sensorName": "BIN"
                        },
                        "correlationData": {
                            "longitude": $3,
                            "latitude": $4
                        },
                        "payloadData": {
                            "fillpercentage": $5
                        }
                    }
                }
                </format>
                <args>
                    <arg evaluator="xml" expression="get-property('SYSTEM_DATE')"/>
                    <arg evaluator="xml" expression="get-property('RANDNO')"/>
                    <arg evaluator="xml" expression="get-property('LVLAT')"/>
                    <arg evaluator="xml" expression="get-property('LVLON')"/>
                    <arg value="100"/>
                </args>
            </payloadFactory>
            <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
            <property name="FORCE_SC_ACCEPTED" scope="axis2" type="STRING" value="true"/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </target>
    <parameter name="mqtt.connection.factory">mqttConFactory</parameter>
    <parameter name="mqtt.content.type">text/plain</parameter>
    <parameter name="mqtt.subscription.qos">2</parameter>
    <parameter name="mqtt.topic.name">bin</parameter>
    <parameter name="mqtt.session.clean">false</parameter>
</proxy>
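The connection settings in the address URI of the endpoint are ordinary query parameters. As a small illustration, the Python sketch below splits the URI from BinProxy into its parts using only the standard library, with the XML entity &amp; written as a plain &. It only shows how the URI is structured and says nothing about how the MQTT transport itself parses it.

```python
from urllib.parse import parse_qsl, urlsplit

# Address URI from BinProxy, with &amp; unescaped to &.
uri = (
    "mqtt:/BinProxy?mqtt.server.host.name=localhost&mqtt.server.port=1883"
    "&mqtt.client.id=eyenlo&mqtt.topic.name=bin"
    "&mqtt.subscription.qos=2&mqtt.blocking.sender=true"
)

parts = urlsplit(uri)
params = dict(parse_qsl(parts.query))

print(parts.scheme, parts.path)
for name, value in params.items():
    print(f"{name} = {value}")
```

Listing the parameters this way makes it easy to compare them with the parameter elements at the end of the proxy definition, which repeat the topic name and the QoS level.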

Listening proxy

The other proxy listens on the topic. This one is even more spartan.

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="ListenProxy" startOnLoad="true" transports="mqtt" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <log level="full"/>
            <drop/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </target>
    <parameter name="mqtt.connection.factory">mqttConFactory</parameter>
    <parameter name="mqtt.content.type">text/plain</parameter>
    <parameter name="mqtt.subscription.qos">2</parameter>
    <parameter name="mqtt.topic.name">bin</parameter>
    <parameter name="mqtt.session.clean">false</parameter>
</proxy>

It is on the MQTT transport and the actual topic is defined in the parameters.

The message that is read from MQTT is subsequently passed to a full log mediator, which shows the JSON message inside a SOAP message (by default, everything in the Enterprise Integrator is wrapped in a SOAP message).

The log mediator puts the message on the console (and in wso2carbon.log).

What do we have?

We have a proxy that, when called, creates a dummy message with realistic data.

This is a sample request:

{
     "metaData": {
           "timestamp": 12/20/18 4:42 PM,
           "sensorId": 3590.0,
           "sensorName": BIN
     },
     "correlationData":{
           "longitude": -115.162291,
           "latitude": 36.224301,
     },
     "payloadData": {
           "fillpercentage": 100
     }
}

When we map this on Google Maps we see that we are in Las Vegas.
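The same check can be done without a map. The small Python sketch below tests whether the coordinates from the sample request fall inside the rectangle we defined for Las Vegas. The function name in_las_vegas_box is made up for this illustration.

```python
# Corners of the rectangle from the text, as (latitude, longitude).
TOP_LEFT = (36.306316, -115.333456)
BOTTOM_RIGHT = (35.984696, -114.923976)


def in_las_vegas_box(latitude: float, longitude: float) -> bool:
    """Check whether a point lies inside the simulated Las Vegas rectangle."""
    return (
        BOTTOM_RIGHT[0] <= latitude <= TOP_LEFT[0]
        and TOP_LEFT[1] <= longitude <= BOTTOM_RIGHT[1]
    )


# Latitude and longitude from the sample request.
print(in_las_vegas_box(36.224301, -115.162291))
```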

Please leave a comment if you have any questions or have a look at one of my other blogs.

Published August 8, 2019

Rob Blaauboer

Rob is a Senior Business Consultant and Solution Architect with more than twenty years of experience. In addition to his work he is an active blogger, working on a number of articles on the ‘Internet of Things’ and a WSO2 ‘Getting Started with …’ series (WSO2 tutorial) in which he talks about WSO2 components and their purpose, especially aimed at non-technical readers. Rob is a WSO2 expert and official WSO2 trainer.
