MQTT (Message Queuing Telemetry Transport) is an ISO standard publish-subscribe-based messaging protocol. It is designed for connections, e.g., IoT devices with remote locations where a “small code footprint” is required or the network bandwidth is limited.
In this article we will publish and subscribe to MQTT messages using WSO2 Micro Integrator and Mosquitto.
We start with installing Mosquitto. We do the installation on CENTOS 7 and since there is no mosquito package by default. It is necessary to install epel (Extra Packages for Enterprise Linux) , an extra software repository. This is full of packages like Mosquitto that we can install. For other environments, google install mqtt [your environment].
We will continue with the instructions for CENTOS7.
Installing
Open a terminal session and enter:
sudo yum -y install epel-release
Fill in your user password when asked. If the user is not in the list of sudoers, add the user to the sudoers list. Google how to do so if you do not know how to do this.
With EPEL added to the system we can now auto install (using -y parameter) the mosquitto package.
sudo yum -y install mosquitto
Again, enter your password when asked.
There is a default configuration. Let us start mosquitto. Enter the following command in a terminal session:
sudo systemctl start mosquitto
When we want to have mosquito started at boot we enter the following:
sudo systemctl enable mosquitto
Testing by sending a message to a topic
Now let us test the default configuration. The mosquitto package comes with some command line MQTT clients. We will use one of them to subscribe to a topic on our broker.
A topic is a label for messages that can be organized in a hierarchy. You can publish to a topic and other people can subscribe to it.
Suppose we have a hierarchy like:
Sensors
- Window sensors
- Door sensors
- Temperature sensors
This of course is a dummy setup but does show the hierarchy. You can subscribe to all or some topics and will receive messages.
Publish or subscibe
We are now going to look at the publish / subscribe model. We need two terminals since one is meant to subscribe to the topic and the other one is used to publish to the topic.
Type in the first terminal the following command (using the mosquitto_sub commandline tool) to subscribe to the test topic:
mosquitto_sub -h localhost -t test
-h is used to specify the hostname of the MQTT server, and -t is the topic name. Other settings like port are the default values.
You will see no output after hitting ENTER because mosquitto_sub is waiting for messages to arrive. Switch back to your other terminal and publish a message:
mosquitto_pub -h localhost -t test -m “hello world”
The options for mosquitto_pub are the same as mosquitto_sub, though this time we use the additional -m option to specify our message. Hit ENTER, and you should see hello world pop up in the other terminal. You have sent your first MQTT message!
Setting up the Micro Integrator 4.2.0
Since the MQTT transport is arranged on the AXIS2 level (with the exception of the Inbound endpoints) we need to edit the [MI-HOME /conf/deployment.toml file and change (uncomment) the MQTT sender and listener configuration to be as follows:
[[transport.mqtt]listener.enable = true
sender.enable = true
Since we want to receive and send mqtt messages we also need to uncomment the mqtt transport sender. Support for MQTT is standard in MI 4.2.0.
Furthermore we need to download the org.eclipse.paho.client.mqttv3-1.2.0.jar from this link. This jar file needs to be added to the [MI-HOME]/lib. The Micro Integrator needs to be restarted since the lib files are only loaded at startup.
We do not have to configure the actual port and so on in this file, we will do it in a proxy.
Publishing MQTT events
We will create a setup where we simulate sensors that will indicate when a garbage bin equipped with a sensor is more than 75% full. Since we do not have sensors that will do that we will actually mimic messages being sent in to Mosquitto.
In our case we create a simple proxy that will use a payload factory mediator (json) to define a message and send it to an mqtt endpoint (the mosquito server on a specific topic.
Let us define a message type that simulates the sensor data that we can envision. We have an event with metaData, correlationData and payloadData. The $1, $2 etc are variables that indicate where and when the message comes in. This is actually the format of the payload factory mediator.
{
"event": {
"metaData": {
"timestamp": $1,
"sensorId": $2,
"sensorName": BIN
},
"correlationData":{
"longitude": $3,
"latitude": $4,
},
"payloadData": {
"fillpercentage": $5
}
}
}
Let us start with sending a message in Json. We will keep the sample simple but will include a more elaborate setup on the Yenlo bitbucket page.
The data will be generated using the random function within certain boundaries. We will simulate a garbage can in the greater Las Vegas area. This is done by defining a rectangular area of longitude / latitude that encompasses Las Vegas. The 36.306316, -115.333456 is the left top hand corner and 35.984696, -114.923976 the right bottom corner.
In order to make it more realistic we will simulate the longitude / latitude with a script mediator and a random function as well as the sensorId and the fill percentage. Timestamp will be taken from the current system date/time.
Two proxies
In order to test out the mqtt transport we create two proxies.
The first proxy will simulate a message being set from a sensor.
What we do is we get a random longitude and latitude, create a random bin number, create a json based payload with the message format and the data we just created We set two properties that indicate that we do not need to wait for an answer (OUT_ONLY) and give back a 202 response (FORCE_SC_ACCEPTED). The endpoint we are sending it to is an mqqt endpoint . We actually define the connection string but override using the parameters at the end of the definition. These are not visible in the graphical overview.
<?xml version="1.0" encoding="UTF-8"?>
<proxy name="BinProxy" startOnLoad="true" transports="http https" xmlns="http://ws.apache.org/ns/synapse">
<target>
<endpoint>
<address uri="mqtt:/BinProxy?mqtt.server.host.name=localhost&mqtt.server.port=1883&mqtt.client.id=yenlo&mqtt.topic.name=bin&mqtt.subscription.qos=2&mqtt.blocking.sender=true">
<suspendOnFailure>
<initialDuration>-1</initialDuration>
<progressionFactor>1</progressionFactor>
</suspendOnFailure>
<markForSuspension>
<retriesBeforeSuspension>0</retriesBeforeSuspension>
</markForSuspension>
</address>
</endpoint>
<inSequence>
<!-- -->
<property name="LVLAT" scope="default" type="FLOAT" value="5"/>
<property name="LVLON" scope="default" type="STRING" value="5"/>
<sequence key="SET_LAT_LON_LV"/>
<call-template target="RandomFunc">
<with-param name="min" value="0"/>
<with-param name="max" value="10000"/>
</call-template>
<payloadFactory media-type="json">
<format>
{
"event": {
"metaData": {
"timestamp": $1,
"sensorId": $2,
"sensorName": BIN
},
"correlationData":{
"latitude": $3,
"longitude": $4,
},
"payloadData": {
"fillpercentage": $5
}
}
}
</format>
<args>
<arg evaluator="xml" expression="get-property('SYSTEM_DATE')"/>
<arg evaluator="xml" expression="get-property('RANDNO')"/>
<arg evaluator="xml" expression="get-property('LVLAT')"/>
<arg evaluator="xml" expression="get-property('LVLON')"/>
<arg value="100"/>
</args>
</payloadFactory>
<property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
<property name="FORCE_SC_ACCEPTED" scope="axis2" type="STRING" value="true"/>
</inSequence>
<outSequence/>
<faultSequence/>
</target>
<parameter name="mqtt.connection.factory">mqttConFactory</parameter>
<parameter name="mqtt.content.type">application/json</parameter>
<parameter name="mqtt.subscription.qos">2</parameter>
<parameter name="mqtt.topic.name">bin</parameter>
<parameter name="mqtt.session.clean">false</parameter>
</proxy>
Listening proxy
The other proxy listens on the topic. This one is even more spartan.
<?xml version="1.0" encoding="UTF-8"?>
<proxy name="ListenProxy" startOnLoad="true" transports="mqtt" xmlns="http://ws.apache.org/ns/synapse">
<target>
<inSequence>
<!-- -->
<log level="full"/>
<drop/>
</inSequence>
<outSequence/>
<faultSequence/>
</target>
<parameter name="mqtt.connection.factory">mqttConFactory</parameter>
<parameter name="mqtt.content.type">application/json</parameter>
<parameter name="mqtt.server.port">1883</parameter>
<parameter name="mqtt.subscription.qos">2</parameter>
<parameter name="mqtt.client.id">yenlo</parameter>
<parameter name="mqtt.topic.name">bin</parameter>
<parameter name="mqtt.session.clean">false</parameter>
<parameter name="mqtt.server.host.name">localhost</parameter>
</proxy>
It is on the mqtt transport, and the actual topic is defined in the parameters.
The message that is read from mqtt is subsequently put in a full log mediator showing the json message in a soap message (everything in the Micro Integrator is by default a soap message).
The log mediator puts the message on the console (and in wso2carbon.log).
Sequence and sequence template
There are also a sequence and a sequence template in use.
The content is quite simple. I use JavaScript to calculate random values. The Sequence looks like this.
I have hardcoded the top left and bottom right Lat / Lon and subtract the values for both Latitude and Longitude and multiply it with a random number and add the bottom value to get the coordinate. That is stored in a value.
<?xml version="1.0" encoding="UTF-8"?>
<sequence name="SET_LAT_LON_LV" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
<property name="left_lat" scope="default" type="STRING" value="36.306316"/>
<property name="left_lon" scope="default" type="STRING" value="-115.333456"/>
<property name="right_lat" scope="default" type="STRING" value="35.983605"/>
<property name="right_lon" scope="default" type="STRING" value="-114.934917"/>
<script language="js"><![CDATA[var result, result2, i, j, mini, mini2, maxi;
//calculate lat
result = '';
mini = mc.getProperty('left_lat');
maxi = mc.getProperty('right_lat');
result = ((mini - maxi)*Math.random())+ ~~mini;
result2 = result.toFixed(6);
mc.setProperty("LVLAT", result2);
// calculate lon
mini = mc.getProperty('left_lon');
maxi = mc.getProperty('right_lon');
result = (mini - maxi)*Math.random()+ ~~mini;
result2 = result.toFixed(6);
mc.setProperty("LVLON", result2);]]></script>
</sequence>
De Sequence template
The same structure as the other one.
<?xml version="1.0" encoding="UTF-8"?>
<template name="RandomFunc" xmlns="http://ws.apache.org/ns/synapse">
<parameter defaultValue="" isMandatory="false" name="min"/>
<parameter defaultValue="" isMandatory="false" name="max"/>
<sequence>
<property expression="$func:min" name="minval" scope="default" type="STRING"/>
<property expression="$func:max" name="maxval" scope="default" type="STRING"/>
<script language="js"><![CDATA[var result, i, j, mini, maxi;
result = '';
mini = Math.ceil(mc.getProperty('minval'));
maxi = Math.floor(mc.getProperty('maxval'));
result = Math.round(Math.floor(Math.random() * (maxi - mini)) + mini); mc.setProperty("RANDNO", result);]]></script>
<log description="" level="custom">
<property expression="get-property('RANDNO')" name="test"/>
</log>
</sequence>
</template>
Executing the BinProxy
When we create a simple Soap project we can use the BinProxy’s WSDL to create a SOAP message,
We do not see anything in the response windows as this is an asynchronous call.
What do we have?
But on the console we have a dummy message, however with realistic data.
This is a sample request:
{
"event": {
"metaData": {
"timestamp": 6/9/23, 1:27 PM,
"sensorId": 2439.0,
"sensorName": BIN
},
"correlationData":{
"latitude": 36.126444,
"longitude": -115.297177,
},
"payloadData": {
"fillpercentage": 100
}
}
}
{
When we map this on Google Maps we see that we are in Las Vegas.
In the next blog we will turn the proxy into a task and will simulate an increasingly larger number of messages that eventually will trigger a garbage collection run. This has nothing to do with memory garbage collection but more with the real-world garbage collection that we simulate.