MQTT (Message Queuing Telemetry Transport) is an ISO standard publish-subscribe-based messaging protocol. It is designed for connections with remote locations, for example IoT devices, where a small code footprint is required or the network bandwidth is limited. In this WSO2 tutorial we will publish and subscribe to MQTT messages using WSO2 Enterprise Integrator and Mosquitto.
We start by installing Mosquitto. We do the install on CentOS 7, and since there is no Mosquitto package by default, it is necessary to install EPEL (Extra Packages for Enterprise Linux), an extra software repository full of packages, like Mosquitto, that we can install. For other environments, google install MQTT [your environment].
We will continue with the instructions for CentOS 7.
Open a terminal session and enter:
sudo yum -y install epel-release
Fill in your user password when asked. If the user is not in the list of sudoers, add the user to the sudoers list. Google how to do so if you do not know how to do this.
With EPEL added to the system we can now install the Mosquitto package. The -y parameter answers yes to the prompts automatically, so the install runs unattended.
sudo yum -y install mosquitto
Again enter your password when asked.
There is a default configuration. Let’s start Mosquitto. Enter the following command in a terminal session:
sudo systemctl start mosquitto
When we want to have Mosquitto started at boot we enter the following:
sudo systemctl enable mosquitto
Testing by sending a message to a topic
Now let's test the default configuration. The Mosquitto package comes with some command line MQTT clients. We'll use one of them to subscribe to a topic on our broker.
A topic is a label for messages that can be organized in a hierarchy. You can publish to a topic and other people can subscribe to it.
Suppose we have a hierarchy like: Sensors
This of course is a dummy setup but does show the hierarchy. You can subscribe to all or some topics and will receive the messages published to them.
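To make the idea of subscribing to all or some topics concrete, here is a minimal sketch of how an MQTT topic filter is matched against a topic name. MQTT separates hierarchy levels with `/`, uses `+` as a single-level wildcard and `#` as a multi-level wildcard. The function name `topic_matches` and the topic names below `Sensors` are made up for illustration, and the sketch leaves out special cases such as topics starting with `$`.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic name matches a subscription filter.

    Simplified sketch: handles '+' (one level) and '#' (all remaining levels),
    but not the special rules for topics that start with '$'.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    for i, level in enumerate(filter_levels):
        if level == "#":
            # '#' is only valid as the last level; it matches the parent
            # level and everything beneath it.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            return False

    # Without '#', filter and topic must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


# Hypothetical topic names, only for illustration.
for topic_filter in ("Sensors/#", "Sensors/+/bin42", "Sensors/+"):
    print(topic_filter, topic_matches(topic_filter, "Sensors/LasVegas/bin42"))
```

A subscriber using `Sensors/#` would receive everything below `Sensors`, while `Sensors/+/bin42` only picks out one bin across all areas.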
Publish / Subscribe
We are now going to look at the publish / subscribe model. We need two terminals: one subscribes to the topic and the other publishes to it.
In the first terminal, type the following command (using the mosquitto_sub command-line tool) to subscribe to the test topic:
mosquitto_sub -h localhost -t test
-h is used to specify the hostname of the MQTT server, and -t is the topic name. Other settings, such as the port (1883), keep their default values.
You'll see no output after hitting ENTER because mosquitto_sub is waiting for messages to arrive. Switch back to your other terminal and publish a message:
mosquitto_pub -h localhost -t test -m "hello world"
The options for mosquitto_pub are the same as mosquitto_sub, though this time we use the additional -m option to specify our message. Hit ENTER, and you should see hello world pop up in the other terminal. You've sent your first MQTT message!
Setting up WSO2 Enterprise Integrator 6.4.0
Since the MQTT transport is configured at the Axis2 level (with the exception of the inbound endpoints), we need to edit the [EI-HOME]/conf/axis2/axis2.xml file and uncomment the MQTT transport listener.
Since we want to send as well as receive MQTT messages, we also need to uncomment the MQTT transport sender.
The values here can be overridden when the proxy or inbound endpoint is defined. The listener configuration should look as follows:
<transportReceiver class="org.apache.axis2.transport.mqtt.MqttListener" name="mqtt">
<parameter locked="false" name="mqttConFactory">
<parameter locked="false" name="mqtt.server.host.name">localhost</parameter>
<parameter locked="false" name="mqtt.server.port">1883</parameter>
<parameter locked="false" name="mqtt.client.id">esb.test.listener</parameter>
<parameter locked="false" name="mqtt.topic.name">esb.test1</parameter>
Furthermore, we need to download the org.eclipse.paho.client.mqttv3-1.1.0.jar from this link. This jar file needs to be added to the [EI-HOME]/lib directory. The Enterprise Integrator needs to be restarted afterwards, since the lib files are only loaded at startup.
Publishing MQTT events
We will create a setup where we simulate sensors that indicate when a garbage bin equipped with a sensor is more than 75% full. Since we do not have sensors that will do that, we will mimic messages being sent to Mosquitto. In our case we create a simple proxy that uses a payload factory mediator (JSON) to define a message and send it to an MQTT endpoint (the Mosquitto server on a specific topic).
Let's define a message type that simulates the sensor data we can envision. We have an event with metaData, correlationData and payloadData. The $1, $2, etc. are placeholders that the payload factory mediator replaces with argument values, such as where and when the message originated. This is the format used by the payload factory mediator.
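As a rough illustration of how the numbered placeholders work, the sketch below fills a JSON template with a list of arguments, where `$1` takes the first argument, `$2` the second, and so on. It only mimics the idea in Python and is not the mediator's implementation. The helper `fill_template` and the field names other than `payloadData` and `timestamp` are assumptions made for this example.

```python
import json
import re


def fill_template(template: str, args: list) -> str:
    """Replace $1, $2, ... in the template with args[0], args[1], ..."""

    def substitute(match):
        index = int(match.group(1)) - 1  # $1 refers to the first argument
        return str(args[index])

    return re.sub(r"\$(\d+)", substitute, template)


# Hypothetical message format; only payloadData and timestamp come from the text.
TEMPLATE = (
    '{"event": {"payloadData": {'
    '"timestamp": "$1", "sensorId": $2, "latitude": $3, "longitude": $4}}}'
)

# Argument order: system date, random bin number, latitude, longitude.
message = fill_template(TEMPLATE, ["12/20/18 4:42 PM", 4711, 36.1, -115.1])
print(json.dumps(json.loads(message), indent=2))
```

Because string values are quoted in the template and numbers are not, the result stays valid JSON after substitution.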
Let’s start with sending a message in JSON. We will keep the sample simple but will include a more elaborate setup on the Yenlo Bitbucket page.
The data will be generated using the random function within certain boundaries. We will simulate a garbage can in the greater Las Vegas area. This is done by defining a rectangular area of latitude / longitude that encompasses Las Vegas. The point 36.306316, -115.333456 is the top left-hand corner and 35.984696, -114.923976 is the bottom right-hand corner.
In order to make it more realistic we will simulate the longitude / latitude with a script mediator and a random function as well as the sensorId and the fill percentage. Timestamp will be taken from the current system date/time.
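The random data generation described above could look roughly like the following sketch. It is written in Python rather than in the script mediator, purely to show the arithmetic. The function name `random_bin_reading`, the field names and the fill percentage range of 76 to 100 are assumptions. The corner coordinates come from the text, and the 0 to 10000 range for the bin number is assumed to correspond to the min and max parameters of the proxy.

```python
import random
from datetime import datetime

# Corners of the rectangle around Las Vegas (latitude, longitude).
NORTH, WEST = 36.306316, -115.333456   # top left-hand corner
SOUTH, EAST = 35.984696, -114.923976   # bottom right-hand corner


def random_bin_reading(rng=random):
    """Create one simulated reading for a garbage bin in the Las Vegas area."""
    return {
        "sensorId": rng.randint(0, 10000),         # assumed bin number range
        "latitude": rng.uniform(SOUTH, NORTH),     # between bottom and top edge
        "longitude": rng.uniform(WEST, EAST),      # between left and right edge
        "fillPercentage": rng.randint(76, 100),    # assumed: more than 75% full
        "timestamp": datetime.now().isoformat(timespec="seconds"),
    }


print(random_bin_reading())
```

Passing a seeded `random.Random` instance as `rng` makes the simulated readings reproducible, which is handy when testing.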
In order to test out the MQTT transport we create two proxies.
The first proxy will simulate a message being sent from a sensor.
What we do is get a random longitude and latitude, create a random bin number, and create a JSON-based payload with the message format and the data we just created. We set two properties that indicate that we do not need to wait for an answer (OUT_ONLY) and that a 202 response is given back (FORCE_SC_ACCEPTED). The endpoint we are sending it to is an MQTT endpoint. We actually define the connection string but override it using the parameters at the end of the definition. These are not visible in the graphical overview.
<?xml version="1.0" encoding="UTF-8"?>
<proxy name="BinProxy" startOnLoad="true" transports="http https" xmlns="http://ws.apache.org/ns/synapse">
<with-param name="min" value="0"/>
<with-param name="max" value="10000"/>
<arg evaluator="xml" expression="get-property('SYSTEM_DATE')"/>
<arg evaluator="xml" expression="get-property('RANDNO')"/>
<arg evaluator="xml" expression="get-property('LVLAT')"/>
<arg evaluator="xml" expression="get-property('LVLON')"/>
<property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
<property name="FORCE_SC_ACCEPTED" scope="axis2" type="STRING" value="true"/>
The other proxy listens on the topic. This one is even more spartan.
<?xml version="1.0" encoding="UTF-8"?>
<proxy name="ListenProxy" startOnLoad="true" transports="mqtt" xmlns="http://ws.apache.org/ns/synapse">
It is on the MQTT transport and the actual topic is defined in the parameters.
The message that is read from MQTT is subsequently passed to a full log mediator, which shows the JSON message inside a SOAP message (everything in the Enterprise Integrator is by default wrapped in a SOAP message).
The log mediator puts the message on the console (and in wso2carbon.log).
What do we have?
We have a proxy that, when called, creates a dummy message with realistic data.
This is a sample request:
"timestamp": 12/20/18 4:42 PM,
When we map this on Google Maps we see that we are in Las Vegas.
Please leave a comment if you have any questions or have a look at one of my other blogs.