[possible new binding] openHAB Eventbus to MQTT

Okay, I tried to build the openHAB event bus with HABApp. I have a problem subscribing to states. Subscribing to commands works fine.

I installed HABApp on both openHAB instances as described in the documentation. I call them MASTER and SLAVE. On the master I installed the MQTT 2.x binding and Mosquitto and configured a Thing that has access to the Mosquitto broker. On the slave I also installed the MQTT 2.x binding and configured a Thing that has access to the Mosquitto broker on the master. Both Things have a Publish Trigger channel: the master's subscribes to /openhab/out/+/command and the slave's subscribes to /openhab/in/+/state.
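
For reference, MQTT topic names are case-sensitive and `+` matches exactly one topic level, so the filter on each channel has to line up exactly with what the other side publishes. Below is a minimal Python sketch of that matching, not the broker's actual implementation. `topic_matches` is an illustrative helper (it is not part of HABApp or the MQTT binding), `Light1` is a hypothetical item name, and filter validation and `$` topics are ignored:

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a subscription filter.

    Simplified sketch: supports the single-level wildcard '+' and the
    multi-level wildcard '#', without validating the filter itself.
    """
    filter_levels = topic_filter.split('/')
    topic_levels = topic.split('/')

    for i, level in enumerate(filter_levels):
        if level == '#':
            # '#' matches everything from this level on (including nothing)
            return True
        if i >= len(topic_levels):
            # the filter has more levels than the topic
            return False
        if level != '+' and level != topic_levels[i]:
            # literal levels must be identical, including upper/lower case
            return False
    # without '#', filter and topic must have the same number of levels
    return len(filter_levels) == len(topic_levels)


# Filter configured on the slave's channel vs. some candidate topics
print(topic_matches('/openhab/in/+/state', '/openhab/in/Light1/state'))
print(topic_matches('/openhab/in/+/state', '/openHAB/in/Light1/state'))
print(topic_matches('/openhab/in/+/state', '/openhab/out/Light1/command'))
```

Only the first candidate matches. The second differs in case only, and the third is on the command branch.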

So here is the master’s config.yml:

directories:
  logging: log  # Folder where the logs will be written to
  rules: rules # Folder from which the rule files will be loaded
  param: params # Folder from which the parameter files will be loaded
  config: config # Folder from which configuration files (e.g. for textual thing configuration) will be loaded
  lib: lib # Folder where additional libraries can be placed
location:
  latitude: 0.0
  longitude: 0.0
  elevation: 0.0
mqtt:
  connection:
    client_id: HABApp
    host: <master_ip>
    port: 1883
    user: ''
    password: ''
    tls: false
    tls_ca_cert: ''  # Path to a CA certificate that will be treated as trusted
    tls_insecure: true
  general:
    listen_only: false  # If True HABApp will not publish any value to the broker
  publish:
    qos: 0  # Default QoS when publishing values
    retain: false # Default retain flag when publishing values
  subscribe:
    qos: 0  # Default QoS for subscribing
    topics:
    - '#'
    - 0
openhab:
  connection:
    host: <master_ip>
    port: 8080
    user: ''
    password: ''
  general:
    listen_only: false  # If True HABApp will not change anything on the openHAB instance.
    wait_for_openhab: true # If True HABApp will wait for items from the openHAB instance before loading any rules on startup
  ping:
    enabled: true  # If enabled the configured item will show how long it takes to send an update from HABApp and get the updated value back from openhabin milliseconds
    item: HABApp_Ping # Name of the Numberitem
    interval: 10 # Seconds between two pings

And here is the slave’s config.yml:

directories:
  logging: log  # Folder where the logs will be written to
  rules: rules # Folder from which the rule files will be loaded
  param: params # Folder from which the parameter files will be loaded
  config: config # Folder from which configuration files (e.g. for textual thing configuration) will be loaded
  lib: lib # Folder where additional libraries can be placed
location:
  latitude: 0.0
  longitude: 0.0
  elevation: 0.0
mqtt:
  connection:
    client_id: HABApp
    host: <master_ip>
    port: 1883
    user: ''
    password: ''
    tls: false
    tls_ca_cert: ''  # Path to a CA certificate that will be treated as trusted
    tls_insecure: true
  general:
    listen_only: false  # If True HABApp will not publish any value to the broker
  publish:
    qos: 0  # Default QoS when publishing values
    retain: false # Default retain flag when publishing values
  subscribe:
    qos: 0  # Default QoS for subscribing
    topics:
    - '#'
    - 0
openhab:
  connection:
    host: <slave_ip>
    port: 8080
    user: ''
    password: ''
  general:
    listen_only: false  # If True HABApp will not change anything on the openHAB instance.
    wait_for_openhab: true # If True HABApp will wait for items from the openHAB instance before loading any rules on startup
  ping:
    enabled: true  # If enabled the configured item will show how long it takes to send an update from HABApp and get the updated value back from openhabin milliseconds
    item: HABApp_Ping # Name of the Numberitem
    interval: 10 # Seconds between two pings

Here is the mqtt.py rule for HABApp on the master:

import HABApp
from HABApp.openhab.events import ItemStateEvent
from HABApp.openhab.items import Thing
from HABApp.openhab.items import OpenhabItem
from HABApp.mqtt.items import MqttItem


class ExampleOpenhabToMQTTRule(HABApp.Rule):
    """This Rule mirrors all updates from OpenHAB to MQTT"""

    def __init__(self):
        super().__init__()

        for item in HABApp.core.Items.get_all_items():
            #if isinstance(item, (Thing, MqttItem)):
            if not isinstance(item, OpenhabItem):
                continue
            item.listen_event(self.process_update, ItemStateEvent)

    def process_update(self, event):
        assert isinstance(event, ItemStateEvent)

        print(f'/openhab/{event.name} <- {event.value}')
        self.mqtt.publish(f'/openhab/in/{event.name}/state', str(event.value))


ExampleOpenhabToMQTTRule()

And here is the counterpart for the slave:

import HABApp
from HABApp.openhab.events import ItemCommandEvent
from HABApp.openhab.items import Thing
from HABApp.openhab.items import OpenhabItem
from HABApp.mqtt.items import MqttItem


class ExampleOpenhabToMQTTRule(HABApp.Rule):
    """This rule mirrors all commands from openHAB to MQTT"""

    def __init__(self):
        super().__init__()

        for item in HABApp.core.Items.get_all_items():
            #if isinstance(item, (Thing, MqttItem)):
            if not isinstance(item, OpenhabItem):
                continue
            item.listen_event(self.process_update, ItemCommandEvent)

    def process_update(self, event):
        assert isinstance(event, ItemCommandEvent)

        print(f'/openhab/{event.name} <- {event.value}')
        self.mqtt.publish(f'/openhab/out/{event.name}/command', str(event.value))


ExampleOpenhabToMQTTRule()

Then I created a mqtt.rule file on both instances to subscribe using the Publish Trigger channel. Here is the one for the master:

import org.eclipse.smarthome.model.script.ScriptServiceUtil

// MQTT naming convention I use:
//
//commandPublishTopic = /openHAB/out/${item}/command
//stateSubscribeTopic = /openHAB/in/${item}/state

// MASTER
var boolean commandSubscribeTopic = true
var boolean stateSubscribeTopic = false

// SLAVE
// var boolean commandSubscribeTopic = false
// var boolean stateSubscribeTopic = true

// We need a group with all items as a Member of trigger.
// Creating one dynamically. (Group dg_AllItems)
// ScriptServiceUtil.getItemRegistry.getItems() can not be used as a Member of trigger (I've tried :-)

rule "populate dynamic group dg_AllItems"
when
        Time cron "0 0/1 * * * ?"
then
        // add all items to the group
        ScriptServiceUtil.getItemRegistry.getItems().forEach[item | gSmartHome.addMember(item) ]

        // remove all items which no longer exist
        gSmartHome.members.filter(s | ScriptServiceUtil.getItemRegistry.getItems().filter[i | i.name.contains(s.name)].length == 0).forEach[item | gSmartHome.removeMember(item)]

        // remove the group from itself (odd.)
        gSmartHome.removeMember(gSmartHome)

        //DEBUG dg_AllItems.members.forEach[ item | logInfo("dg_AllItems_member",item.name) ]
end
/*
rule "publish commands to broker"
when
        Time cron "0 0/1 * * * ?"
then
        if (commandPublishTopic){
            val actions = getActions("mqtt","mqtt:broker:mosquitto")
            actions.publishMQTT("/openHAB/out/"+triggeringItem.name+"/command",receivedCommand.toString)
  }
end


rule "Publish command to broker"
when
    Member of gSmartHome received command
then
    if (commandPublishTopic){
            val actions = getActions("mqtt","mqtt:broker:mosquitto")
            actions.publishMQTT("/openHAB/out/"+triggeringItem.name+"/command",receivedCommand.toString)
  }
end

rule "Publish state to broker"
when
        Member of gSmartHome changed
then
        if (statePublishTopic){
                val actions = getActions("mqtt","mqtt:broker:mosquitto")
                actions.publishMQTT("/openHAB/in/"+triggeringItem.name+"/state",triggeringItem.state.toString)
      }
end

rule "publish states to broker"
when
        Time cron "0 0/1 * * * ?"
then
        if (statePublishTopic){
                val actions = getActions("mqtt","mqtt:broker:mosquitto")
                actions.publishMQTT("/openHAB/in/"+triggeringItem.name+"/state",triggeringItem.state.toString)
      }

end
*/

rule "Subscribe to broker"
when
        Channel "mqtt:broker:mosquitto:openhabvm-updates" triggered
then
        var evnt_payload = receivedEvent.getEvent.toString.split("#")
        var mqtttopic = evnt_payload.get(0).toString.split("/")
        var mqttcommand = evnt_payload.get(1)
        var mqttitem = mqtttopic.get(mqtttopic.length-2)
        var mqttcommandorstate = mqtttopic.get(mqtttopic.length-1)
        var ItemExists = gSmartHome.members.filter[item | item.name.contains(mqttitem)].length
        if (ItemExists > 0){

                if (commandSubscribeTopic &&   mqttcommandorstate == "command") {
                        sendCommand(mqttitem,mqttcommand)
                }
                else {
                        if (stateSubscribeTopic && mqttcommandorstate == "state") {
                                postUpdate(mqttitem,mqttcommand)
                        }
                }
        }
end

And here is the one for the slave:

import org.eclipse.smarthome.model.script.ScriptServiceUtil

// MQTT naming convention I use:
//
//commandPublishTopic = /openHAB/out/${item}/command
//stateSubscribeTopic = /openHAB/in/${item}/state

// MASTER
//var boolean commandSubscribeTopic = true
//var boolean stateSubscribeTopic = false

// SLAVE
var boolean commandSubscribeTopic = false
var boolean stateSubscribeTopic = true

// We need a group with all items as a Member of trigger.
// Creating one dynamically. (Group dg_AllItems)
// ScriptServiceUtil.getItemRegistry.getItems() can not be used as a Member of trigger (I've tried :-)

rule "populate dynamic group dg_AllItems"
when
        Time cron "0 0/1 * * * ?"
then
        // add all items to the group
        ScriptServiceUtil.getItemRegistry.getItems().forEach[item | gSmartHome.addMember(item) ]

        // remove all items which no longer exist
        gSmartHome.members.filter(s | ScriptServiceUtil.getItemRegistry.getItems().filter[i | i.name.contains(s.name)].length == 0).forEach[item | gSmartHome.removeMember(item)]

        // remove the group from itself (odd.)
        gSmartHome.removeMember(gSmartHome)

        //DEBUG dg_AllItems.members.forEach[ item | logInfo("dg_AllItems_member",item.name) ]
end
/*
rule "publish commands to broker"
when
        Time cron "0 0/1 * * * ?"
then
        if (commandPublishTopic){
            val actions = getActions("mqtt","mqtt:broker:mosquitto")
            actions.publishMQTT("/openHAB/out/"+triggeringItem.name+"/command",receivedCommand.toString)
  }
end


rule "Publish command to broker"
when
    Member of gSmartHome received command
then
    if (commandPublishTopic){
            val actions = getActions("mqtt","mqtt:broker:mosquitto")
            actions.publishMQTT("/openHAB/out/"+triggeringItem.name+"/command",receivedCommand.toString)
  }
end

rule "Publish state to broker"
when
        Member of gSmartHome changed
then
        if (statePublishTopic){
                val actions = getActions("mqtt","mqtt:broker:mosquitto")
                actions.publishMQTT("/openHAB/in/"+triggeringItem.name+"/state",triggeringItem.state.toString)
      }
end

rule "publish states to broker"
when
        Time cron "0 0/1 * * * ?"
then
        if (statePublishTopic){
                val actions = getActions("mqtt","mqtt:broker:mosquitto")
                actions.publishMQTT("/openHAB/in/"+triggeringItem.name+"/state",triggeringItem.state.toString)
      }

end
*/

rule "Subscribe to broker"
when
        Channel "mqtt:broker:general:openhabvm-updates" triggered
then
        var evnt_payload = receivedEvent.getEvent.toString.split("#")
        var mqtttopic = evnt_payload.get(0).toString.split("/")
        var mqttcommand = evnt_payload.get(1)
        var mqttitem = mqtttopic.get(mqtttopic.length-2)
        var mqttcommandorstate = mqtttopic.get(mqtttopic.length-1)
        var ItemExists = gSmartHome.members.filter[item | item.name.contains(mqttitem)].length
        if (ItemExists > 0){

                if (commandSubscribeTopic &&   mqttcommandorstate == "command") {
                        sendCommand(mqttitem,mqttcommand)
                }
                else {
                        if (stateSubscribeTopic && mqttcommandorstate == "state") {
                                postUpdate(mqttitem,mqttcommand)
                        }
                }
        }
end
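
To make it easier to check what the "Subscribe to broker" rule does with an incoming event, here is a rough Python sketch of the same parsing logic. It assumes the trigger event arrives as `topic#payload`, which is what the `split("#")` in the rule implies. `BusMessage`, `parse_trigger_event`, `item_exists` and the item names are hypothetical and only used for illustration:

```python
from typing import Iterable, NamedTuple, Optional


class BusMessage(NamedTuple):
    item: str     # second-to-last topic level, e.g. "Light1"
    kind: str     # last topic level: "command" or "state"
    payload: str  # everything after the first separator


def parse_trigger_event(event: str, separator: str = '#') -> Optional[BusMessage]:
    """Split a 'topic#payload' trigger event into item, kind and payload."""
    topic, sep, payload = event.partition(separator)
    if not sep:
        # no separator found: this is not a 'topic#payload' event
        return None
    levels = topic.split('/')
    if len(levels) < 2:
        # topic too short to contain both an item and a kind
        return None
    return BusMessage(item=levels[-2], kind=levels[-1], payload=payload)


def item_exists(name: str, known_items: Iterable[str], exact: bool = True) -> bool:
    """Check whether an item is known.

    exact=False mimics the substring test item.name.contains(...) from the rule.
    """
    if exact:
        return name in known_items
    return any(name in known for known in known_items)


msg = parse_trigger_event('/openhab/in/Light1/state#ON')
print(msg)
print(item_exists('Light', ['Light1'], exact=False))  # substring test
print(item_exists('Light', ['Light1']))               # exact name test
```

Two differences from the rule are deliberate: `partition` keeps a payload that itself contains `#` in one piece, and the exact name check avoids treating `Light` as existing just because `Light1` does.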

Well, it is nearly the same as what you can find here: How to: MQTTv2 Eventbus rules

What works better is publishing: states and commands are published almost 100% reliably. I could not achieve that using only the linked rules, and I never achieved it with @rlkoshak’s event bus solution: MQTT 2.5+ Event Bus

So this approach might be the most stable. Unfortunately, subscribing to states on the slave does not work. On the master, as I said, I have no problem subscribing to commands from the slave.

I must be missing something here. As you can see, I have also named the Things differently, but that should not matter: only the channel and the Mosquitto connection are needed. I can definitely reach the master's broker from the slave. I can verify this on the command line, and publishing works.

Does anyone see an error? Perhaps someone would like to test this for themselves. The broker installation and how to configure the Thing can actually be copied from the two links.

Kind regards