info@yenlo.com
ned
Menu
WSO2 11 min

Hoe MQTT-gebeurtenissen publiceren en abonneren met WSO2 MI 4.2.0

Duik in de complexiteiten van MQTT Event afhandeling met behulp van WSO2 MI 4.2.0, samen met Rob Blaauboer bij Yenlo. Ontdek hoe je jouw IoT-berichtgevingsprocessen kunt stroomlijnen.

Rob Blaauboer
Rob Blaauboer
Integration Consultant & WSO2 Trainer
Hoe MQTT gebeurtenissen publiceren en abonneren met WSO2 MI 4.2.0

MQTT (Message Queuing Telemetry Transport) is een op de publish-subscribe gebaseerd berichtprotocol dat voldoet aan de ISO-standaard. Het is ontworpen voor verbindingen, zoals IoT-apparaten met afgelegen locaties waar een “kleine code footprint” vereist is of de netwerkbandbreedte beperkt is.

In dit artikel zullen we MQTT-berichten publiceren en abonneren met behulp van WSO2 Micro Integrator en Mosquitto.

We beginnen met het installeren van Mosquitto. We voeren de installatie uit op CENTOS 7 en aangezien er standaard geen Mosquitto-pakket is, is het nodig om epel (Extra Packages for Enterprise Linux), een extra software repository, te installeren. Hierin zitten pakketten zoals Mosquitto die we kunnen installeren. Voor andere omgevingen kun je zoeken naar “mqtt installeren” in combinatie met jouw omgeving.
We gaan verder met de instructies voor CENTOS7.

Installatie

Open een terminalsessie en voer het volgende in:

sudo yum -y install epel-release

Vul je gebruikerswachtwoord in wanneer hierom wordt gevraagd. Als de gebruiker niet in de lijst van sudoers staat, voeg de gebruiker dan toe aan de lijst van sudoers. Zoek op Google hoe je dit kunt doen als je niet weet hoe het moet.
Nu EPEL aan het systeem is toegevoegd, kunnen we het mosquitto-pakket automatisch installeren (met behulp van de parameter -y).

sudo yum -y install mosquitto

Voer opnieuw je wachtwoord in wanneer hierom wordt gevraagd. Er is een standaardconfiguratie beschikbaar. Laten we Mosquitto starten. Voer de volgende opdracht in een terminalsessie in:

sudo systemctl start mosquitto

Als je wilt dat Mosquitto wordt gestart bij het opstarten van het systeem, voer dan het volgende in:

sudo systemctl enable mosquitto

Testen door een bericht naar een onderwerp te sturen

Laten we nu de standaardconfiguratie testen. Het mosquitto-pakket wordt geleverd met enkele command line MQTT-clients. We zullen er een van gebruiken om te abonneren op een onderwerp op onze broker.

Een onderwerp is een label voor berichten die georganiseerd kunnen worden in een hiërarchie. Je kunt publiceren naar een onderwerp en andere mensen kunnen zich erop abonneren.

Laten we aannemen dat we de volgende hiërarchie hebben:

Sensoren

  • Raamsensoren
  • Deursensoren
  • Temperatuursensor

Dit is natuurlijk een dummy-opstelling, maar het laat wel de hiërarchie zien. Je kunt je abonneren op alle of enkele onderwerpen en berichten ontvangen.

Publiceren / Abonneren

We gaan nu kijken naar het publiceren / abonneren-model (ook wel bekend als Pub / Sub). We hebben twee terminals nodig, omdat de ene bedoeld is om zich te abonneren op het onderwerp en de andere om te publiceren naar het onderwerp.

Typ in de eerste terminal de volgende opdracht (met behulp van de mosquitto_sub commandline tool) om je te abonneren op het testonderwerp:

mosquitto_sub -h localhost -t test

– h wordt gebruikt om de hostnaam van de MQTT-server op te geven en -t is de onderwerpsnaam. Andere instellingen zoals de poort zijn de standaardwaarden.

mqtt events Testen door een bericht naar een onderwerp te sturen

Je ziet geen uitvoer nadat je op ENTER hebt gedrukt, omdat mosquitto_sub wacht op berichten. Schakel terug naar je andere terminal en publiceer een bericht:

    mosquitto_pub -h localhost -t test -m “hello world”

mqtt events Testen door een bericht naar een onderwerp te sturen 2

De opties voor mosquitto_pub zijn hetzelfde als voor mosquitto_sub, maar deze keer gebruiken we de extra -m optie om ons bericht op te geven. Druk op ENTER en je zou “hello world” moeten zien verschijnen in de andere terminal. Je hebt je eerste MQTT-bericht verstuurd!

mqtt events Testen door een bericht naar een onderwerp te sturen 3

Instellen van de Micro Integrator 4.2.0

Aangezien het MQTT-transport is geregeld op het AXIS2-niveau (met uitzondering van de inbound endpoints), moeten we het bestand [MI-HOME/conf/deployment.toml bewerken en de MQTT-zender- en luisterconfiguratie wijzigen (uitcommentariëren / uncomment) zoals hieronder:

[[transport.mqtt]

listener.enable = true

sender.enable = true

Instellen van de Micro Integrator 4.2.0

Aangezien we MQTT-berichten willen ontvangen en verzenden, moeten we ook de mqtt-transportzender uitcommentariëren. Ondersteuning voor MQTT is standaard aanwezig in MI 4.2.0.

Daarnaast moeten we het bestand org.eclipse.paho.client.mqttv3-1.2.0.jar downloaden via deze link. Dit JAR-bestand moet worden toegevoegd aan de [MI-HOME]/lib. De Micro Integrator moet opnieuw worden gestart omdat de lib-bestanden alleen worden geladen bij het opstarten.

We hoeven de werkelijke poort enzovoort niet te configureren in dit bestand, dat zullen we doen in een proxy.

MQTT-gebeurtenissen publiceren

We zullen een opstelling maken waarin we sensoren simuleren die aangeven wanneer een vuilnisbak uitgerust met een sensor voor meer dan 75% vol is. Aangezien we geen sensoren hebben die dit kunnen doen, zullen we daadwerkelijk berichten simuleren die naar Mosquitto worden verzonden.

In ons geval creëren we een eenvoudige proxy die een Payload Factory mediator (json) gebruikt om een bericht te definiëren en dit naar een mqtt-eindpunt (de Mosquitto-server) te sturen op een specifiek onderwerp.

Laten we een berichttype definiëren dat de sensorgegevens simuleert die we kunnen bedenken. We hebben een gebeurtenis met metaData, correlationData en payloadData. De $1, $2, etc. zijn variabelen die aangeven waar en wanneer het bericht binnenkomt. Dit is eigenlijk het formaat van de payload factory mediator.{

     {
 "event": {
           "metaData": {
           "timestamp": $1,
           "sensorId": $2,
           "sensorName": BIN
},
      "correlationData":{
           "longitude": $3,
           "latitude": $4,
},
      "payloadData": {
           "fillpercentage": $5
          
}
}
}

Laten we beginnen met het verzenden van een bericht in JSON-indeling. We houden het voorbeeld eenvoudig, maar we zullen een meer uitgebreide setup laten zien op de Yenlo Bitbucket-pagina.

De gegevens worden gegenereerd met behulp van de random-functie binnen bepaalde grenzen. We simuleren een vuilnisbak in het gebied van Las Vegas. Dit wordt gedaan door een rechthoekig gebied van lengte- / breedtegraad te definiëren dat Las Vegas omvat. De waarden 36.306316, -115.333456 zijn de linkerbovenhoek en 35.984696, -114.923976 zijn de rechteronderhoek.

Om het realistischer te maken, simuleren we de lengte- / breedtegraad met een scriptmediator en een randomfunctie, evenals de sensorId en het vulpercentage. De tijdstempel wordt genomen van de huidige systeemdatum / tijd.

Twee proxies

Om het MQTT-transport uit te testen, maken we twee proxies.

De eerste proxy simuleert een bericht dat wordt ingesteld vanuit een sensor.

Twee proxies

Wat we doen is een willekeurige lengte- en breedtegraad verkrijgen, een willekeurig baknummer maken, een payload in JSON-indeling maken met het berichtformaat en de zojuist gemaakte gegevens. We stellen twee eigenschappen in die aangeven dat we niet hoeven te wachten op een antwoord (OUT_ONLY) en een 202-respons teruggeven (FORCE_SC_ACCEPTED). Het eindpunt waarnaar we het verzenden is een mqtt-eindpunt. We definiëren eigenlijk de verbindingsreeks, maar overschrijven deze met de parameters aan het einde van de definitie. Deze zijn niet zichtbaar in het grafische overzicht.

<?xml version="1.0" encoding="UTF-8"?>

<proxy name="BinProxy" startOnLoad="true" transports="http https" xmlns="http://ws.apache.org/ns/synapse">

    <target>

        <endpoint>

            <address uri="mqtt:/BinProxy?mqtt.server.host.name=localhost&mqtt.server.port=1883&mqtt.client.id=yenlo&mqtt.topic.name=bin&mqtt.subscription.qos=2&mqtt.blocking.sender=true">

                <suspendOnFailure>

                    <initialDuration>-1</initialDuration>

                    <progressionFactor>1</progressionFactor>

                </suspendOnFailure>

                <markForSuspension>

                    <retriesBeforeSuspension>0</retriesBeforeSuspension>

                </markForSuspension>

            </address>

        </endpoint>

        <inSequence>

            <!--  -->

            <property name="LVLAT" scope="default" type="FLOAT" value="5"/>

            <property name="LVLON" scope="default" type="STRING" value="5"/>

            <sequence key="SET_LAT_LON_LV"/>

            <call-template target="RandomFunc">

                <with-param name="min" value="0"/>

                <with-param name="max" value="10000"/>

            </call-template>

            <payloadFactory media-type="json">

                <format>

                {

     "event": {

           "metaData": {

           "timestamp": $1,

           "sensorId": $2,

           "sensorName": BIN

},

     "correlationData":{

           "latitude": $3,

           "longitude": $4,

},

     "payloadData": {

           "fillpercentage": $5

}

}

}

</format>

                <args>

                    <arg evaluator="xml" expression="get-property('SYSTEM_DATE')"/>

                    <arg evaluator="xml" expression="get-property('RANDNO')"/>

                    <arg evaluator="xml" expression="get-property('LVLAT')"/>

                    <arg evaluator="xml" expression="get-property('LVLON')"/>

                    <arg value="100"/>

                </args>

            </payloadFactory>

            <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>

            <property name="FORCE_SC_ACCEPTED" scope="axis2" type="STRING" value="true"/>

        </inSequence>

        <outSequence/>

        <faultSequence/>

    </target>

    <parameter name="mqtt.connection.factory">mqttConFactory</parameter>

    <parameter name="mqtt.content.type">application/json</parameter>

    <parameter name="mqtt.subscription.qos">2</parameter>

    <parameter name="mqtt.topic.name">bin</parameter>

    <parameter name="mqtt.session.clean">false</parameter>

</proxy>

Luisterende proxy

De andere proxy luistert naar het onderwerp. Deze is nog eenvoudiger.

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="ListenProxy" startOnLoad="true" transports="mqtt" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <!--  -->
            <log level="full"/>
            <drop/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </target>
    <parameter name="mqtt.connection.factory">mqttConFactory</parameter>
    <parameter name="mqtt.content.type">application/json</parameter>
    <parameter name="mqtt.server.port">1883</parameter>
    <parameter name="mqtt.subscription.qos">2</parameter>
    <parameter name="mqtt.client.id">yenlo</parameter>
    <parameter name="mqtt.topic.name">bin</parameter>
    <parameter name="mqtt.session.clean">false</parameter>
    <parameter name="mqtt.server.host.name">localhost</parameter>
</proxy>

Het is gebaseerd op het MQTT-transport en het werkelijke onderwerp is gedefinieerd in de parameters.

Het bericht dat wordt gelezen van MQTT wordt vervolgens in een full log mediator geplaatst, waarbij het json-bericht wordt getoond in een soap-bericht (alles in de Micro Integrator is standaard een soap-bericht).

De log mediator plaatst het bericht op de console (en in wso2carbon.log).

 

Sequentie en sequentie-sjabloon

Er wordt ook een sequentie en een sequentie-sjabloon gebruikt.

Luisterende proxy

De inhoud is vrij eenvoudig. Ik gebruik JavaScript om willekeurige waarden te berekenen. De sequentie ziet er als volgt uit.

sequentie

Ik heb de linkerbovenhoek en rechteronderhoek van de lengte- en breedtegraad hard gecodeerd en trek de waarden af voor zowel breedte- als lengtegraad, vermenigvuldig dit met een willekeurig getal en voeg de onderste waarde toe om de coördinaat te krijgen. Dit wordt opgeslagen in een waarde.

<?xml version="1.0" encoding="UTF-8"?>
<sequence name="SET_LAT_LON_LV" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
    <property name="left_lat" scope="default" type="STRING" value="36.306316"/>
    <property name="left_lon" scope="default" type="STRING" value="-115.333456"/>
    <property name="right_lat" scope="default" type="STRING" value="35.983605"/>
    <property name="right_lon" scope="default" type="STRING" value="-114.934917"/>
    <script language="js"><![CDATA[var result, result2, i, j, mini, mini2, maxi; 
                 //calculate lat
                 result = ''; 
  
                 mini = mc.getProperty('left_lat');
                 maxi = mc.getProperty('right_lat');
  				 result = ((mini - maxi)*Math.random())+ ~~mini;        
                 result2 =  result.toFixed(6);        
                 mc.setProperty("LVLAT", result2);
                
                 // calculate lon
                 mini = mc.getProperty('left_lon');
                 maxi = mc.getProperty('right_lon');
  				 result = (mini - maxi)*Math.random()+ ~~mini;       
                 result2 =  result.toFixed(6);      
                 mc.setProperty("LVLON", result2);]]></script>
</sequence>

De Sequence template

De Sequence template

Dezelfde structuur als de andere.

<?xml version="1.0" encoding="UTF-8"?>
<template name="RandomFunc" xmlns="http://ws.apache.org/ns/synapse">
    <parameter defaultValue="" isMandatory="false" name="min"/>
    <parameter defaultValue="" isMandatory="false" name="max"/>
    <sequence>
        <property expression="$func:min" name="minval" scope="default" type="STRING"/>
        <property expression="$func:max" name="maxval" scope="default" type="STRING"/>
        <script language="js"><![CDATA[var result, i, j, mini, maxi; 
            result = ''; 
	mini = Math.ceil(mc.getProperty('minval'));
 	maxi = Math.floor(mc.getProperty('maxval'));
  	result = Math.round(Math.floor(Math.random() * (maxi - mini)) + mini);      mc.setProperty("RANDNO", result);]]></script>
        <log description="" level="custom">
            <property expression="get-property('RANDNO')" name="test"/>
        </log>
    </sequence>
</template>

Uitvoeren van de BinProxy

Wanneer we een eenvoudig SOAP-project maken, kunnen we de WSDL van de BinProxy gebruiken om een SOAP-bericht te maken,

Uitvoeren van de BinProxy

We zien niets in het antwoordvenster, omdat dit een asynchrone oproep is.

Wat hebben we nu?

Maar op de console hebben we een dummy-bericht, maar wel met realistische gegevens.

Dit is een voorbeeldverzoek:

{

     "event": {

           "metaData": {

           "timestamp": 6/9/23, 1:27 PM,

           "sensorId": 2439.0,

           "sensorName": BIN

},

     "correlationData":{

           "latitude": 36.126444,

           "longitude": -115.297177,

},

     "payloadData": {

           "fillpercentage": 100

}

}

}

{

Als we dit op Google Maps mappen, zien we dat we ons in Las Vegas bevinden.

mqtt locatie

In de volgende blog zullen we de proxy omzetten in een taak en een steeds groter aantal berichten simuleren dat uiteindelijk een vuilnisophalingsrun zal activeren. Dit heeft niets te maken met geheugenvuilnisophaling, maar meer met de vuilnisophaling in de echte wereld die we simuleren.

ned
Sluiten