How to: MQTTv2 Eventbus rules

mqtt

(pascal) #1

Hi everyone.
Like some of you, I missed the MQTT eventbus feature in MQTTv2, so after reading through the forums and running into quite a few errors I finally managed to mimic the eventbus feature using rules.
I thought I would share this. I’m not a script or rule guru, so I’m sure this can be optimised.

In PaperUI I created:

  • MQTT broker: mqtt:broker:8ca384dd

  • Publish Trigger channel: mqtt:broker:8ca384dd:pub
    The Publish Trigger listens to everything (#) and the delimiter is “#”.

  • a Group item dg_AllItems

My mqtteventbus.rules:

import org.eclipse.smarthome.model.script.ScriptServiceUtil

// MQTT naming convention I use:
//
//commandPublishTopic = /openHAB/out/${item}/command
//stateSubscribeTopic = /openHAB/in/${item}/state

// MASTER
var boolean commandSubscribeTopic = false
var boolean stateSubscribeTopic = true
var boolean statePublishTopic = false
var boolean commandPublishTopic = true

// SLAVE
// var boolean commandSubscribeTopic = true
// var boolean stateSubscribeTopic = false
// var boolean statePublishTopic = true
// var boolean commandPublishTopic = false

// We need a group with all items to use as a "Member of" trigger.
// Creating one dynamically (Group dg_AllItems).
// ScriptServiceUtil.getItemRegistry.getItems() cannot be used as a "Member of" trigger (I've tried :-)

rule "populate dynamic group dg_AllItems"
when
        Time cron "0 0/1 * * * ?"
then
        // add all items to the group
        ScriptServiceUtil.getItemRegistry.getItems().forEach[item | dg_AllItems.addMember(item)]

        // remove all items which no longer exist (exact name match, so "Light" is not kept alive by "Light2")
        dg_AllItems.members.filter[s | ScriptServiceUtil.getItemRegistry.getItems().filter[i | i.name == s.name].size == 0].forEach[item | dg_AllItems.removeMember(item)]

        // remove the group from itself (it is in the registry too, so the first step added it)
        dg_AllItems.removeMember(dg_AllItems)

        //DEBUG dg_AllItems.members.forEach[ item | logInfo("dg_AllItems_member",item.name) ]
end

rule "Publish command to broker"
when
        Member of dg_AllItems received command
then
        if (commandPublishTopic) {
                val actions = getActions("mqtt","mqtt:broker:8ca384dd")
                actions.publishMQTT("/openHAB/out/"+triggeringItem.name+"/command",receivedCommand.toString)
        }
end

rule "Publish state to broker"
when
        Member of dg_AllItems changed
then
        if (statePublishTopic) {
                val actions = getActions("mqtt","mqtt:broker:8ca384dd")
                actions.publishMQTT("/openHAB/in/"+triggeringItem.name+"/state",triggeringItem.state.toString)
        }
end


rule "Subscribe to broker"
when
        Channel "mqtt:broker:8ca384dd:pub" triggered
then
        // the trigger event is "<topic>#<payload>" ("#" is the delimiter configured on the channel)
        var evnt_payload = receivedEvent.getEvent.toString.split("#")
        var mqtttopic = evnt_payload.get(0).toString.split("/")
        var mqttcommand = evnt_payload.get(1)
        var mqttitem = mqtttopic.get(mqtttopic.length-2)
        var mqttcommandorstate = mqtttopic.get(mqtttopic.length-1)
        // exact name match, so "Light" does not match "KitchenLight"
        var ItemExists = dg_AllItems.members.filter[item | item.name == mqttitem].size
        if (ItemExists > 0) {
                if (commandSubscribeTopic && mqttcommandorstate == "command") {
                        sendCommand(mqttitem,mqttcommand)
                }
                else if (stateSubscribeTopic && mqttcommandorstate == "state") {
                        postUpdate(mqttitem,mqttcommand)
                }
        }
end
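
To check the topic handling outside openHAB, here is a minimal Python sketch of the parsing that the "Subscribe to broker" rule performs on a Publish Trigger event. It is not openHAB code: the names BusMessage, parse_trigger_event and item_exists are made up for illustration, and so is the item name in the usage lines. Two choices in the sketch are assumptions of mine: it splits only on the first delimiter, so a payload containing "#" stays intact, and it compares item names exactly.

```python
from typing import NamedTuple, Optional


class BusMessage(NamedTuple):
    item: str      # item name taken from the topic
    kind: str      # last topic segment, e.g. "command" or "state"
    payload: str   # value to send as a command or post as a state


def parse_trigger_event(event: str, delimiter: str = "#") -> Optional[BusMessage]:
    """Split '<topic><delimiter><payload>' into item, kind and payload."""
    # Split once only, so a payload that contains the delimiter survives.
    topic, sep, payload = event.partition(delimiter)
    if not sep:
        return None  # no delimiter: not an event in the expected shape
    parts = topic.split("/")
    if len(parts) < 2:
        return None  # topic too short to hold both an item and a kind
    # Second-to-last segment is the item, last segment is the kind.
    return BusMessage(item=parts[-2], kind=parts[-1], payload=payload)


def item_exists(name: str, known_items: set) -> bool:
    # Exact match; a substring test would let "Light" match "KitchenLight".
    return name in known_items


msg = parse_trigger_event("/openHAB/in/KitchenLight/state#ON")
print(msg.item, msg.kind, msg.payload)  # KitchenLight state ON
```

Counting topic segments from the end keeps the sketch independent of how many prefix segments the topic has.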

The Time cron rule is only there because I didn’t want to add each item to the group manually; updating the group every minute is admittedly overkill. Of course, you could instead use a dedicated “Eventbus” group to control which items are included.
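
The group-maintenance rule boils down to a set difference between the item registry and the current group members. The Python sketch below is only an illustration of that idea, not openHAB code; plan_group_sync and the item names are hypothetical. It also shows why names should be compared exactly: "Light" is removed even though "Light2" still exists.

```python
def plan_group_sync(members: set, registry: set, group_name: str):
    """Return (to_add, to_remove) for one pass of the populate rule."""
    # Every registry item that is not yet a member, except the group itself.
    to_add = registry - members - {group_name}
    # Members whose exact name is gone from the registry, plus the group
    # itself if an earlier pass added it.
    to_remove = (members - registry) | (members & {group_name})
    return to_add, to_remove


members = {"Light", "OldSensor"}
registry = {"Light2", "dg_AllItems"}
to_add, to_remove = plan_group_sync(members, registry, "dg_AllItems")
print(sorted(to_add))     # ['Light2']
print(sorted(to_remove))  # ['Light', 'OldSensor']
```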


(Y. Gijbels) #2

I have a problem with the getActions part of the code.
It returns a null value, so the log reports that it couldn’t invoke the method.

val actions = getActions("mqtt","mqtt:broker:mybroker")

I’ve checked the ID of the broker, and the spelling is correct.
Any suggestions?


(pascal) #3

I had a bit of a struggle with it as well.
Did you create your broker through PaperUI or a .things file?


(Y. Gijbels) #4

PaperUI.

Tried the .things as well, with a different name but same error…


(pascal) #5

Strange…
Are you sure you are using MQTT version 2 and not MQTT v1?


(Y. Gijbels) #6

2.4.0 to be exact
Do I need to configure something in PaperUI (PublishTrigger, …) ?


(pascal) #7

2.4.0 is your openHAB version, not the MQTT binding version.

Can you open a karaf console and show the output of:

bundle:list | grep MQTT

It should show something like:

232 │ Active   │  80 │ 1.2.0                  │ Paho MQTT Client
234 │ Active   │  80 │ 0.10.0.oh240           │ Eclipse SmartHome MQTT Binding
235 │ Active   │  80 │ 0.10.0.oh240           │ Eclipse SmartHome MQTT Thing Binding
236 │ Active   │  80 │ 0.10.0.oh240           │ Eclipse SmartHome MQTT Transport Bundle

Yes, in PaperUI you need to add a Publish Trigger channel to your broker.


(Y. Gijbels) #8

It looks identical:

225 │ Active   │  80 │ 1.2.0                  │ Paho MQTT Client
226 │ Active   │  80 │ 0.10.0.oh240           │ Eclipse SmartHome MQTT Binding
227 │ Active   │  80 │ 0.10.0.oh240           │ Eclipse SmartHome MQTT Thing Binding
228 │ Active   │  80 │ 0.10.0.oh240           │ Eclipse SmartHome MQTT Transport Bundle

About that Publish Trigger channel: from my understanding it is needed to react to messages published on the broker. But I need to publish to the broker…


(pascal) #9

Did you try publishing to your broker with the mosquitto_pub command?

Your understanding of the Publish trigger channel is correct.
Once that channel is created, you will see all MQTT messages in karaf’s log:tail.


(Y. Gijbels) #10

Didn’t try that one.
But other items created via PaperUI are working correctly.
I’m able to receive and publish messages with these items, it’s just the rule that gives me problems.
So there is nothing wrong with the broker itself.


(Y. Gijbels) #11

For some reason it’s working now.
Unfortunately I’m unable to pinpoint where it went wrong.
When I log the ‘actions’ variable (logInfo) it still shows a null value, but the broker is receiving messages.
The only differences are that I removed the whitespace before and after the equals sign and created a new broker connection (for testing) in PaperUI.
Now both of the broker connections are working.
Thanks for the help.


(pascal) #12

Glad it works!


(Y. Gijbels) #13

Found why it wasn’t working, thanks to @vzorglub:

The variable ‘actions’ needs to be declared globally.
After this it works.


(Elser Marc) #14

Sorry, but can you or @vzorglub please explain why the actions variable needs to be declared globally? As a programmer I would consider the declaration inside the if blocks cleaner and would want to avoid global variables wherever possible.


(Vincent Regaud) #15

In his case, he needed the actions defined as a global because he was calling them inside timer lambdas.
You should be able to use a local actions variable.


(Elser Marc) #16

@pascal

There’s an error in the “Publish state to broker” rule. There’s no such thing as “receivedCommand” in the case of a member changing. What you need to do is “triggeringItem.state.toString()” instead. Maybe you want to update your code.

For those who want to stick closer to the original publish/subscribe topics of the old eventbus (for example because your master or client is not openHAB): check the eventbus.cfg file for the original topics, change the two publish rules accordingly, and in the “Subscribe to broker” rule change the line “var mqttcommandorstate = mqtttopic.get(mqtttopic.length-1)” to “var mqttcommandorstate = mqtttopic.get(2)”.

With these simple modifications I could re-use my old master, which is a HomeControl Android app where I didn’t want to remap hundreds of MQTT paths.


(pascal) #17

@marcelser
I see what you mean. Corrected it.


(Ferry) #18

This rule works perfectly. I would like to extend it with a system-start trigger so that the values of all items are sent when the system starts up. Is this possible?

Thnx


(Vincent Regaud) #19

You shouldn’t need to. The rule will trigger whenever items change value, and that happens at start-up too.
You could add a System started trigger to the group-building rule:

rule "populate dynamic group dg_AllItems"
when
        Time cron "0 0/1 * * * ?" or
        System started
then
        // add all items to the group
        ScriptServiceUtil.getItemRegistry.getItems().forEach[item | dg_AllItems.addMember(item)]

        // remove all items which no longer exist (exact name match, so "Light" is not kept alive by "Light2")
        dg_AllItems.members.filter[s | ScriptServiceUtil.getItemRegistry.getItems().filter[i | i.name == s.name].size == 0].forEach[item | dg_AllItems.removeMember(item)]

        // remove the group from itself (it is in the registry too, so the first step added it)
        dg_AllItems.removeMember(dg_AllItems)

        //DEBUG dg_AllItems.members.forEach[ item | logInfo("dg_AllItems_member",item.name) ]
end

(Ferry) #20

Thanx