info@yenlo.com
deu
Menu
WSO2 11 min

Wie man MQTT-Ereignisse mit WSO2 MI 4.2.0 veröffentlicht und abonniert

Tauchen Sie ein in die Feinheiten der MQTT-Ereignisverarbeitung mit WSO2 MI 4.2.0 und Rob Blaauboer von Yenlo. Entdecken Sie, wie Sie Ihre IoT-Nachrichtenprozesse optimieren können.

Rob Blaauboer
Rob Blaauboer
Integration Consultant & WSO2 Trainer
Wie man MQTT Ereignisse mit WSO2 MI 4.2.0 veroeffentlicht und abonniert

MQTT (Message Queuing Telemetry Transport) ist ein nach dem Publish-Subscribe-Muster aufgebautes Nachrichtenprotokoll nach dem ISO-Standard. Es ist für Verbindungen wie z.B. IoT-Geräte mit entfernten Standorten konzipiert, bei denen eine „geringe Code-Größe“ erforderlich ist oder die Netzwerkbandbreite begrenzt ist.

In diesem Artikel werden wir veröffentlichen und MQTT-Nachrichten mit Hilfe von WSO2 Micro Integrator und Mosquitto abonnieren.

Wir beginnen mit der Installation von Mosquitto. Die Installation erfolgt auf CENTOS 7, und da es standardmäßig kein Mosquitto-Paket gibt, ist es notwendig, das epel (Extra Packages for Enterprise Linux), ein zusätzliches Software-Repository, zu installieren. Dort sind Pakete wie Mosquitto verfügbar, die wir installieren können. Für andere Umgebungen suchen Sie nach „MQTT installieren“ gefolgt von Ihrer Umgebung.
Wir werden mit den Anweisungen für CENTOS7 fortfahren

Installation

Öffnen Sie eine Terminal-Sitzung und geben Sie ein:

sudo yum -y install epel-release

Geben Sie Ihr Benutzerpasswort ein, wenn Sie dazu aufgefordert werden. Wenn der Benutzer nicht in der Liste der Sudoers ist, fügen Sie den Benutzer zur Sudoers-Liste hinzu. Suchen Sie in Google nach Anleitungen, falls Sie nicht wissen, wie Sie dies tun sollen.
Nachdem EPEL dem System hinzugefügt wurde, können wir das Mosquitto-Paket jetzt automatisch installieren (mit dem Parameter -y).

sudo yum -y install mosquitto

Geben Sie erneut Ihr Passwort ein, wenn Sie dazu aufgefordert werden..
Es gibt eine Standardkonfiguration. Lassen Sie uns Mosquitto starten. Geben Sie den folgenden Befehl in einer Terminal-Sitzung ein:

sudo systemctl start mosquitto

Wenn wir möchten, dass Mosquitto beim Start gestartet wird, geben wir Folgendes ein:

sudo systemctl enable mosquitto

Testen durch Senden einer Nachricht an ein Thema

Lassen Sie uns nun die Standardkonfiguration testen. Das Mosquitto-Paket wird mit einigen Befehlszeilen-MQTT-Clients geliefert. Wir werden einen davon verwenden, um uns an ein Thema auf unserem Broker zu abonnieren.

Ein Thema ist ein Label für Nachrichten, die hierarchisch organisiert sein können. Sie können an ein Thema veröffentlichen, und andere Personen können sich dafür abonnieren.

Nehmen wir an, wir haben eine Hierarchie wie folgt:

Sensoren

  • Fenstersensoren
  • Türsensoren
  • Temperatursensor

Dies ist natürlich eine fiktive Einrichtung, aber es zeigt die Hierarchie auf. Sie können sich für alle oder einige Themen abonnieren und Nachrichten empfangen.

Veröffentlichen / Abonnieren

Lassen Sie uns nun das Publish/Subscribe-Modell betrachten. Dazu benötigen wir zwei Terminalfenster, da wir eines verwenden, um das Thema zu abonnieren, und das andere, um auf das Thema zu veröffentlichen.

Lassen Sie uns nun das Publish/Subscribe-Modell betrachten. Dazu benötigen wir zwei Terminalfenster, da wir eines verwenden, um das Thema zu abonnieren, und das andere, um auf das Thema zu veröffentlichen.

mosquitto_sub -h localhost -t test

Der Parameter -h wird verwendet, um den Hostnamen des MQTT-Servers anzugeben, und -t ist der Themenname. Andere Einstellungen wie der Port sind Standardwerte.

mqtt events Testen durch Senden einer Nachricht an ein Thema

Sie sehen keine Ausgabe, nachdem Sie die Eingabetaste gedrückt haben, da mosquitto_sub auf das Eintreffen von Nachrichten wartet. Wechseln Sie zum anderen Terminalfenster und veröffentlichen Sie eine Nachricht:

    mosquitto_pub -h localhost -t test -m “hello world”

mqtt events Testen durch Senden einer Nachricht an ein Thema 2

Die Optionen für mosquitto_pub sind die gleichen wie für mosquitto_sub, aber diesmal verwenden wir die zusätzliche Option -m, um unsere Nachricht anzugeben. Drücken Sie die Eingabetaste und Sie sollten „Hallo Welt“ im anderen Terminalfenster sehen. Sie haben Ihre erste MQTT-Nachricht gesendet!

mqtt events Testen durch Senden einer Nachricht an ein Thema 3

Einrichten des Micro Integrator 4.2.0

Da der MQTT-Transport auf der AXIS2-Ebene (mit Ausnahme der Inbound Endpoints) angeordnet ist, müssen wir die Konfiguration des MQTT-Senders und -Listeners in der Datei [MI-HOME/conf/deployment.toml (auskommentieren)] wie folgt ändern:

[[transport.mqtt]

listener.enable = true

sender.enable = true

Einrichten des Micro Integrator 4.2.0

Da wir MQTT-Nachrichten empfangen und senden möchten, müssen wir auch den MQTT-Transport-Sender auskommentieren. Die Unterstützung für MQTT ist standardmäßig in MI 4.2.0 enthalten.

Darüber hinaus müssen wir die Datei org.eclipse.paho.client.mqttv3-1.2.0.jar von diesem Link herunterladen. Diese JAR-Datei muss dem [MI-HOME]/lib-Verzeichnis hinzugefügt werden. Der Micro Integrator muss neu gestartet werden, da die Bibliotheksdateien nur beim Start geladen werden.

In dieser Datei müssen wir den tatsächlichen Port und andere Einstellungen nicht konfigurieren. Das werden wir in einem Proxy tun.

Veröffentlichen von MQTT-Ereignissen

Wir werden eine Konfiguration erstellen, um Sensoren zu simulieren, die anzeigen, ob ein Mülleimer zu mehr als 75 % gefüllt ist. Da wir keine echten Sensoren haben, die dies tun, simulieren wir das Senden von Nachrichten an Mosquitto.

In unserem Fall erstellen wir einen einfachen Proxy, der einen Payload Factory Mediator (JSON) verwendet, um eine Nachricht zu definieren und sie an einen MQTT-Endpunkt (den Mosquitto-Server auf einem bestimmten Thema) zu senden.

Definieren wir einen Nachrichtentyp, der die Sensordaten simuliert, die wir uns vorstellen können. Wir haben ein Ereignis mit Metadaten, Korrelationsdaten und Nutzlastdaten. Die $1, $2 usw. sind Variablen, die anzeigen, wo und wann die Nachricht hereinkommt. Dies ist tatsächlich das Format des Payload Factory Mediators.

{
      "event": {
           "metaData": {
           "timestamp": $1,
           "sensorId": $2,
           "sensorName": BIN
},
      "correlationData":{
           "longitude": $3,
           "latitude": $4,
},
      "payloadData": {
           "fillpercentage": $5
          
}
}
}

Beginnen wir mit dem Senden einer Nachricht im JSON-Format. Wir halten das Beispiel einfach, aber auf der Yenlo-Bitbucket-Seite finden Sie ein umfangreicheres Setup.

Die Daten werden mit einer Zufallsfunktion innerhalb bestimmter Grenzen generiert. Wir simulieren einen Mülleimer in der Umgebung von Las Vegas. Dies wird erreicht, indem wir einen rechteckigen Bereich von Längen- / Breitengraden definieren, der Las Vegas abdeckt. Die Koordinaten 36.306316, -115.333456 stellen die linke obere Ecke dar und 35.984696, -114.923976 die rechte untere Ecke.

Um es realistischer zu machen, simulieren wir den Längen- / Breitengrad mit einem Skript-Mediator und einer Zufallsfunktion sowie die Sensor-ID und den Füllungsprozentsatz. Der Zeitstempel wird aus dem aktuellen Systemdatum und der Uhrzeit abgeleitet.

Zwei proxies

Um den MQTT-Transport zu testen, erstellen wir zwei Proxies.

Der erste Proxy simuliert eine Nachricht, die von einem Sensor gesendet wird.

Zwei proxies

Was wir tun, ist, dass wir eine zufällige Längen- und Breitengrad erhalten, eine zufällige Behälternummer erstellen und eine JSON-Nutzlast erstellen, die auf dem Nachrichtenformat und den gerade erstellten Daten basiert. Wir setzen zwei Eigenschaften, die anzeigen, dass wir nicht auf eine Antwort warten müssen (OUT_ONLY) und geben eine 202-Antwort zurück (FORCE_SC_ACCEPTED). Der Endpunkt, an den wir die Nachricht senden, ist ein MQTT-Endpunkt. Wir definieren den Verbindungsstring tatsächlich, überschreiben ihn jedoch mit den Parametern am Ende der Definition. Diese sind in der grafischen Übersicht nicht sichtbar.

<?xml version="1.0" encoding="UTF-8"?>

<proxy name="BinProxy" startOnLoad="true" transports="http https" xmlns="http://ws.apache.org/ns/synapse">

    <target>

        <endpoint>

            <address uri="mqtt:/BinProxy?mqtt.server.host.name=localhost&mqtt.server.port=1883&mqtt.client.id=yenlo&mqtt.topic.name=bin&mqtt.subscription.qos=2&mqtt.blocking.sender=true">

                <suspendOnFailure>

                    <initialDuration>-1</initialDuration>

                    <progressionFactor>1</progressionFactor>

                </suspendOnFailure>

                <markForSuspension>

                    <retriesBeforeSuspension>0</retriesBeforeSuspension>

                </markForSuspension>

            </address>

        </endpoint>

        <inSequence>

            <!--  -->

            <property name="LVLAT" scope="default" type="FLOAT" value="5"/>

            <property name="LVLON" scope="default" type="STRING" value="5"/>

            <sequence key="SET_LAT_LON_LV"/>

            <call-template target="RandomFunc">

                <with-param name="min" value="0"/>

                <with-param name="max" value="10000"/>

            </call-template>

            <payloadFactory media-type="json">

                <format>

                {

     "event": {

           "metaData": {

           "timestamp": $1,

           "sensorId": $2,

           "sensorName": BIN

},

     "correlationData":{

           "latitude": $3,

           "longitude": $4,

},

     "payloadData": {

           "fillpercentage": $5

}

}

}

</format>

                <args>

                    <arg evaluator="xml" expression="get-property('SYSTEM_DATE')"/>

                    <arg evaluator="xml" expression="get-property('RANDNO')"/>

                    <arg evaluator="xml" expression="get-property('LVLAT')"/>

                    <arg evaluator="xml" expression="get-property('LVLON')"/>

                    <arg value="100"/>

                </args>

            </payloadFactory>

            <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>

            <property name="FORCE_SC_ACCEPTED" scope="axis2" type="STRING" value="true"/>

        </inSequence>

        <outSequence/>

        <faultSequence/>

    </target>

    <parameter name="mqtt.connection.factory">mqttConFactory</parameter>

    <parameter name="mqtt.content.type">application/json</parameter>

    <parameter name="mqtt.subscription.qos">2</parameter>

    <parameter name="mqtt.topic.name">bin</parameter>

    <parameter name="mqtt.session.clean">false</parameter>

</proxy>

Empfangender proxy

Der andere Proxy lauscht auf dem Thema. Dieser ist noch einfacher.

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="ListenProxy" startOnLoad="true" transports="mqtt" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <!--  -->
            <log level="full"/>
            <drop/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </target>
    <parameter name="mqtt.connection.factory">mqttConFactory</parameter>
    <parameter name="mqtt.content.type">application/json</parameter>
    <parameter name="mqtt.server.port">1883</parameter>
    <parameter name="mqtt.subscription.qos">2</parameter>
    <parameter name="mqtt.client.id">yenlo</parameter>
    <parameter name="mqtt.topic.name">bin</parameter>
    <parameter name="mqtt.session.clean">false</parameter>
    <parameter name="mqtt.server.host.name">localhost</parameter>
</proxy>

Es verwendet den MQTT-Transport und das tatsächliche Thema ist in den Parametern definiert.

Die Nachricht, die aus MQTT gelesen wird, wird dann in einem vollständigen Log-Mediator angezeigt, der die JSON-Nachricht in eine SOAP-Nachricht umwandelt (standardmäßig ist in Micro Integrator alles eine SOAP-Nachricht).

Der Log-Mediator gibt die Nachricht auf der Konsole aus (und in der wso2carbon.log-Datei).

 

Sequenz und Sequenzvorlage

Es werden auch eine Sequenz und eine Sequenzvorlage verwendet.

Empfangender proxy

Der Inhalt ist recht einfach. Ich verwende JavaScript, um zufällige Werte zu berechnen. Die Sequenz sieht wie folgt aus.

Sequenz

Ich habe die linke obere Ecke fest codiert und ziehe die Werte für Breiten- und Längengrad ab und multipliziere sie mit einer Zufallszahl und füge den unteren Wert hinzu, um die Koordinate zu erhalten. Das wird in einer Variable gespeichert.

<?xml version="1.0" encoding="UTF-8"?>
<sequence name="SET_LAT_LON_LV" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
    <property name="left_lat" scope="default" type="STRING" value="36.306316"/>
    <property name="left_lon" scope="default" type="STRING" value="-115.333456"/>
    <property name="right_lat" scope="default" type="STRING" value="35.983605"/>
    <property name="right_lon" scope="default" type="STRING" value="-114.934917"/>
    <script language="js"><![CDATA[var result, result2, i, j, mini, mini2, maxi; 
                 //calculate lat
                 result = ''; 
  
                 mini = mc.getProperty('left_lat');
                 maxi = mc.getProperty('right_lat');
  				 result = ((mini - maxi)*Math.random())+ ~~mini;        
                 result2 =  result.toFixed(6);        
                 mc.setProperty("LVLAT", result2);
                
                 // calculate lon
                 mini = mc.getProperty('left_lon');
                 maxi = mc.getProperty('right_lon');
  				 result = (mini - maxi)*Math.random()+ ~~mini;       
                 result2 =  result.toFixed(6);      
                 mc.setProperty("LVLON", result2);]]></script>
</sequence>

Die Sequenzvorlage

Die Sequenzvorlage

Die gleiche Struktur wie bei der anderen.

<?xml version="1.0" encoding="UTF-8"?>
<template name="RandomFunc" xmlns="http://ws.apache.org/ns/synapse">
    <parameter defaultValue="" isMandatory="false" name="min"/>
    <parameter defaultValue="" isMandatory="false" name="max"/>
    <sequence>
        <property expression="$func:min" name="minval" scope="default" type="STRING"/>
        <property expression="$func:max" name="maxval" scope="default" type="STRING"/>
        <script language="js"><![CDATA[var result, i, j, mini, maxi; 
            result = ''; 
	mini = Math.ceil(mc.getProperty('minval'));
 	maxi = Math.floor(mc.getProperty('maxval'));
  	result = Math.round(Math.floor(Math.random() * (maxi - mini)) + mini);      mc.setProperty("RANDNO", result);]]></script>
        <log description="" level="custom">
            <property expression="get-property('RANDNO')" name="test"/>
        </log>
    </sequence>
</template>

Ausführen des BinProxy

Wenn wir ein einfaches SOAP-Projekt erstellen, können wir die WSDL des BinProxy verwenden, um eine SOAP-Nachricht zu erstellen,

Ausführen des BinProxy

Im Antwortfenster sehen wir nichts, da dies ein asynchroner Aufruf ist.

Was haben wir?

Auf der Konsole haben wir eine Dummy-Nachricht, jedoch mit realistischen Daten.

Hier ist ein Beispiel für eine Anfrage:

{

     "event": {

           "metaData": {

           "timestamp": 6/9/23, 1:27 PM,

           "sensorId": 2439.0,

           "sensorName": BIN

},

     "correlationData":{

           "latitude": 36.126444,

           "longitude": -115.297177,

},

     "payloadData": {

           "fillpercentage": 100

}

}

}

{

Wenn wir dies auf Google Maps abbilden, sehen wir, dass wir uns in Las Vegas befinden

mqtt location

Im nächsten Blog werden wir den Proxy in eine Aufgabe umwandeln und eine zunehmend größere Anzahl von Nachrichten simulieren, die schließlich einen Müllsammlungsvorgang auslösen werden. Dies hat nichts mit der Speicherbereinigung zu tun, sondern simuliert die Müllsammlung in der realen Welt.

deu
Schließen
Was ist auf unserer Speisekarte