Marketplace MQTT Event Bus

Thanks Rich, it makes more sense after your first few sentences. Best to break it up like so:

Main Instance (Publishes only)

  • insert rules
  • insert broker
  • etc etc

Also, setting the groups as string gave me errors with OnOff Types FYI.

Question, so this would not work if the remote instance was sending commands? Let's say the remote instance has a ZWave scene controller physically in the house - you push buttons etc. so it sends commands back to the Main instance, which runs rules from those commands. So not only ON/OFF commands but numbers such as power metering readings and scene numbers etc., even OPEN/CLOSED from Contacts.

Effectively, totally bi-directional. This bidirectional works just fine with MQTT 1.x event bus using all item types

Cheers

There is no basic rule. It all depends on which openHAB instance is actually connected to the device, how many openHAB instances are connected to each other, and the nature of the device. That’s why it’s important to make sure to understand the data flows.

The thing you need to avoid is infinite loops. Infinite loops occur when you subscribe to both the update and the command channel for a given Item on both OH instances.

But you wouldn’t need to publish updates from that scene controller right? All you care about are publishing the Commands. And you wouldn’t be commanding this Item from the openHAB that isn’t connected to the controller. All you care about is subscribing to the commands.

So those Items are members of PubItems_CMD only on the machine connected to the controller. The other openHAB instance subscribes to the commands only and publishes nothing for that Item. You don’t care about the updates at all for that Item. It’s only the commands that are generated by physically pressing the buttons on the controller.

These are sensors. So for these Items you don’t care about commands. In the case of Contacts you can’t command them if you wanted to. So the instance of openHAB actually connected to the device puts those Items into PubItems_UPD. The other openHAB instance only subscribes. It doesn’t own the device and the device is a sensor. You wouldn’t be updating that device in the other openHAB instance anyway so don’t publish anything at all for those Items.

Even in the MQTT 1.x event bus if you didn’t get these settings right an infinite loop occurred. I fought against many such loops in the past with MQTT 1.x.

But, the above is by design more flexible and more capable than the MQTT 1.x event bus. It was never intended to be a mere reimplementation. I had plans to create a simpler version that more closely mimicked the old event bus only even simpler but that became redundant when the Remote openHAB add-on was released. So this tutorial is mainly kept around to handle those cases that are too complicated to be implemented by the Remote openHAB add-on.

If you want simple mirroring that’s the tool to use. If you are on OH 2 and want simple mirroring of everything, the MQTT 1.x event bus is what you want to use. If you want to have control over how the Items are mirrored on an Item by Item basis then this tutorial is for you. But that means you have to understand the data flows on an Item by Item basis.

The only real difference here from the 1.x EventBus is that in order to allow the control and flexibility I can’t just hide these details like the 1.x binding did. But make no mistake, it worked pretty much the same way to avoid infinite loops.

Which openHAB instance “owns” the Item (i.e. is connected to the actual device)? That will usually be the one that publishes the state updates, unless the Item is commanded from the binding (e.g. a motion sensor, remote controller, etc) in which case only the commands are published.

The openHAB instance that has a proxy for the Item usually wants to know what state the device actually is in. So in that case it will subscribe to updates. But for actuators it also wants to remotely control them so it will publish commands.

You never want a case where you both publish and subscribe to commands on the same OH instance at the same time. When that happens:

  1. OH 1 publishes a command
  2. OH 2 receives the command and sendCommand on the Item
  3. OH 2 publishes the command
  4. OH 1 receives the command and sendCommand on the Item
  5. return to 1.

The same occurs if you both publish and subscribe for updates on the same OH instance at the same time.

  1. OH 1 publishes an update
  2. OH 2 receives the update and postUpdate on the Item
  3. OH 2 publishes the update
  4. OH 1 receives the update and postUpdate on the Item
  5. return to 1.
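
To make the loop concrete, here is a minimal Python sketch that just counts how often a single command crosses the bus. It is a toy model, not openHAB code: `simulate`, the instance names `OH1`/`OH2` and the `max_hops` cut-off are all made up for illustration, and it assumes both instances subscribe to each other's commands.

```python
from collections import deque


def simulate(publishes_commands, max_hops=10):
    """Count how many times one command crosses the bus between two instances.

    publishes_commands maps an instance name to True when the Item is in that
    instance's command-publishing group (hypothetical model of PubItems_CMD).
    Both instances are assumed to subscribe to each other's commands.
    """
    peer = {"OH1": "OH2", "OH2": "OH1"}
    queue = deque(["OH1"])  # OH1 issues the first command locally
    hops = 0
    while queue and hops < max_hops:
        source = queue.popleft()
        if not publishes_commands[source]:
            continue  # the command stays local, nothing goes on the bus
        hops += 1
        # The peer receives the message and commands its own Item,
        # which in turn triggers the peer's publish rule.
        queue.append(peer[source])
    return hops


# Both instances publish commands: only the cut-off stops the ping-pong.
print(simulate({"OH1": True, "OH2": True}))
# Only one instance publishes commands: the message crosses the bus once.
print(simulate({"OH1": True, "OH2": False}))
```

The same model applies to updates if you read "command" as "update" and the group as PubItems_UPD.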

It’s still two way.

Thanks Rich, I’m going to have another stab today at doing this - all makes sense. I had used MQTT1.x successfully for some time to do what I needed but the implementation confused me using MQTT2

Very detailed response, thank you! I can’t use the Remote openHAB binding because I’m not using OH3 - I have no desire to do that yet. For now, it’s about getting off the legacy 1.x bindings, which I’ve all but done except for MQTT.

Thank you!

Hi Rich, I still can’t get it to work.

Main instance (Called, Garage, runs the UI, Habpanel) - has a proxy item for StudySw1 which is a light switch. I also want it to receive updates to the Power channels

Switch BedRoom1Sw1  "Study"    (Group_HabPanel_Dashboard,gLights_Random,gAllLights,gInsideLights,PubItems_CMD)            { alexa="PowerController.powerState" }
Number BedRoom1Num1 "Study - Current Consumption [%.1f W]"  (gAllLightsUsage,gEnergyRealTime,PubItems_UPD)
Number BedRoom1Num2 "Study - Usage [%.1f kW]"               (gPowerUsage,PubItems_UPD)



Remote Instance (called House)- has the real item with Zwave binding for StudySw1 which is a light switch
I want it to send the power channel updates to the Main instance

/*ZWave Switch Bedroom Lights*/
Switch BedRoom1Sw1  "Study"                                   (Group_HabPanel_Dashboard,gLights_Random,gAllLights,gInsideLights,PubItems_CMD)               { channel="zwave:device:12c97eda:node30:switch_binary1"}
Number BedRoom1Num1 "Study - Current Consumption [%.1f W]"    (gAllLightsUsage,gEnergyRealTime,PubItems_UPD)                                                { channel="zwave:device:12c97eda:node30:meter_watts1"}
Number BedRoom1Num2 "Study - Usage [%.1f kW]"                 (gPowerUsage,PubItems_UPD)                                                                    { channel="zwave:device:12c97eda:node30:meter_kwh1"}

Main Instance rules:


val eb_br = "mqtt:broker:broker" // Thing ID of the broker connection
val eb_name = "Garage"              // <client ID>

rule "Publish commands to the event bus"
when
    Member of PubItems_CMD received command // eb_cmd_gr
then
    val mqttActions = getActions("mqtt",eb_br)
    mqttActions.publishMQTT(eb_name+"/out/"+triggeringItem.name+"/command",receivedCommand.toString)
end

rule "Publish updates to the event bus"
when
    Member of PubItems_UPD received update // eb_upd_gr
then
    val mqttActions = getActions("mqtt",eb_br)
    mqttActions.publishMQTT(eb_name+"/out/"+triggeringItem.name+"/state",triggeringItem.state.toString)
end


rule "Subscribe for commands and updates from the event bus"
when
    Channel "mqtt:broker:broker:remote-updates" triggered
then
    var topic = receivedEvent.toString.split("#").get(0)
    var state = receivedEvent.toString.split("#").get(1)
    val itemName = topic.split("/").get(2)
    val type = topic.split("/").get(3)

    if(type == "command") sendCommand(itemName, state)
    else postUpdate(itemName, state)
end

And the channel

Remote Instance

Rules:


val eb_br = "mqtt:broker:broker" // Thing ID of the broker connection
val eb_name = "House"              // <client ID>

rule "Publish commands to the event bus"
when
    Member of PubItems_CMD received command // eb_cmd_gr
then
    val mqttActions = getActions("mqtt",eb_br)
    mqttActions.publishMQTT(eb_name+"/out/"+triggeringItem.name+"/command",receivedCommand.toString)
end

rule "Publish updates to the event bus"
when
    Member of PubItems_UPD received update // eb_upd_gr
then
    val mqttActions = getActions("mqtt",eb_br)
    mqttActions.publishMQTT(eb_name+"/out/"+triggeringItem.name+"/state",triggeringItem.state.toString)
end


rule "Subscribe for commands and updates from the event bus"
when
    Channel "mqtt:broker:broker:remote-updates" triggered
then
    var topic = receivedEvent.toString.split("#").get(0)
    var state = receivedEvent.toString.split("#").get(1)
    val itemName = topic.split("/").get(2)
    val type = topic.split("/").get(3)

    if(type == "command") sendCommand(itemName, state)
    else postUpdate(itemName, state)
end

Remote Channel configuration

Problem:

  1. I can see the switch updates coming from the Main/Proxy/UI instance to the Remote instance, but the light wont switch on/off

  2. I can’t see the power updates coming from the Remote instance to the Main instance

OK, let’s take this one Item at a time.

First of all, your Items are called BedRoom1Sw1 but they are labeled “Study” which is a little confusing. Make sure you are looking at the right Items.

One requirement is that the Items have to have the same name in both Garage and House. That seems to be the case here which is good.

Based on the two sets of Item definitions BedRoom1Sw1 is “owned” by House and Garage has the proxy.

BedRoom1Sw1 is a switch that controls a light. So we need the following data flows.

Direction       | Type     | Purpose
----------------|----------|--------
House to Garage | Updates  | When the light changes state we need to represent that change in the proxy Item at Garage
Garage to House | Commands | The rules and HABPanel and such needs to be able to control this light which means it needs to be able to send commands.

To achieve that:

There are multiple ways to do this but in a one-to-one topology like this I find it easiest to have only one of the OH instances “own” the MQTT topics but that will require more changes to the rules. So instead let’s let each OH instance use its own out topics and they each subscribe to each other’s.

OH Instance | Subscription topic
------------|-------------------
Garage      | House/out/*
House       | Garage/out/*

This will set up the two way traffic and it handles both commands and updates.

Now for the Items:

OH Instance | Configuration for BedRoom1Sw1
------------|-----------------------------
House       | BedRoom1Sw1 is a member of PubItems_UPD
Garage      | BedRoom1Sw1 is a member of PubItems_CMD

Updates to the Items on House will be published and Garage will receive that message and update the proxy Item, keeping them in sync. Commands on the proxy Item from Rules or the UI will be published and House will receive them and command the Item linked to the actual device.

A nice additional thing you can do is for the proxy Item on Garage set autoupdate to false. That means the Item itself won’t change state on Garage until it also does so on House, giving you a confirmation that the light actually turned on or off as commanded.

The configuration for the other two Items is even easier because the data flow is actually just one way and all we care about are the updates.

Garage is already subscribed to the right topics so the only thing you need to do on House is add BedRoom1Num1 and BedRoom1Num2 to PubItems_UPD. That’s all there is to it. When the Zwave binding updates the Item that update will be published and Garage will receive the update and postUpdate to the proxy Item, keeping them in sync.

For the rules you probably already have everything configured as it needs to be. You need all three rules in both instances. eb_name on the House set of rules needs to be “House” and eb_name on the Garage set of rules needs to be “Garage”. Other than that they will be the same (unless the Broker Thing is different on the two instances in which case set eb_br as appropriate for each instance).

Looking at your config as posted:

  • the Switch is a member of the wrong Group on House; it should be a member of PubItems_UPD
  • the Number Items should not be a member of PubItems_UPD on Garage, only on House; there is nothing to publish from Garage for these sensor Items
  • the broker Trigger Channels on both OH instances should subscribe to everything under out, not just the state or the command subtopics.
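
If it helps to see the Group membership side by side, here is a rough Python sketch that lists the topics each instance would publish and flags Items that both instances publish the same kind of event for. Nothing here is openHAB API: `published_topics` and `loop_risks` are hypothetical helpers, the sets simply stand in for PubItems_CMD and PubItems_UPD, and the check assumes each instance subscribes to everything under the other's out topic.

```python
def published_topics(instance, config):
    """Topics an instance publishes, using the <name>/out/<Item>/<kind> layout."""
    topics = [f"{instance}/out/{item}/command" for item in sorted(config["cmd"])]
    topics += [f"{instance}/out/{item}/state" for item in sorted(config["upd"])]
    return topics


def loop_risks(a, b):
    """Items for which both instances publish the same kind of event."""
    risks = set()
    for group in ("cmd", "upd"):
        risks |= {(item, group) for item in a[group] & b[group]}
    return sorted(risks)


# Group membership as originally posted (same groups on both instances).
posted_house = {"cmd": {"BedRoom1Sw1"}, "upd": {"BedRoom1Num1", "BedRoom1Num2"}}
posted_garage = {"cmd": {"BedRoom1Sw1"}, "upd": {"BedRoom1Num1", "BedRoom1Num2"}}

# Group membership after the fixes listed above.
house = {"cmd": set(), "upd": {"BedRoom1Sw1", "BedRoom1Num1", "BedRoom1Num2"}}
garage = {"cmd": {"BedRoom1Sw1"}, "upd": set()}

print(loop_risks(posted_house, posted_garage))  # three overlapping entries
print(loop_risks(house, garage))                # no overlap
print(published_topics("House", house))
print(published_topics("Garage", garage))
```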

Thanks Rich, I was pretty close . After making those fixes it still does not work.

Ive checked it all and updated as required. For the channel in MQTT, with those topics you’ve listed do I need a separator character?

Also, using autoupdate=“false” on the proxy, wont allow the light to switch on in the UI at all - so I’ve removed it for now.

Once changing the MQTT Channel configuration to Garage/out/* & House/out/* I no longer get any log events :frowning:

Yes, put in “#”. Otherwise the topic name is not passed along with the message in the event from the subscription channel. Without the topic name we have no idea which Item and whether it’s an update or a command. There should be errors in the log when the subscription rule runs if the separation character is missing.
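
For illustration, here is a small Python sketch of what the subscription rule does with the event string once the separator is set. It assumes the event arrives as `<topic>#<payload>` and uses the `<client ID>/out/<Item name>/<command|state>` topic layout from the rules in this thread; `parse_event` is a hypothetical helper, not part of openHAB.

```python
def parse_event(event, separator="#"):
    """Split a trigger-channel event into (item_name, kind, payload)."""
    topic, found, payload = event.partition(separator)
    if not found:
        # Without the separator only the payload arrives, so the Item name
        # and the command/state distinction are lost.
        raise ValueError(f"no {separator!r} separator in event: {event!r}")
    parts = topic.split("/")
    if len(parts) != 4:
        raise ValueError(f"unexpected topic layout: {topic!r}")
    _instance, _direction, item_name, kind = parts
    return item_name, kind, payload


print(parse_event("House/out/BedRoom1Sw1/state#ON"))
print(parse_event("Garage/out/BedRoom1Sw1/command#OFF"))
```

Calling `parse_event("ON")` raises a `ValueError`, which mirrors the errors you would expect from the rule when the separator is missing.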

It won’t work right until the event bus works. When it’s false, House will have to publish the state update before HABPanel on Garage will show the light has switched.

OK cool. Unfortunately, no joy still.

Proxy/House receives nothing at all - log shows nothing. Topics have * at the end in the channel configuration (different from your how-to post at the top)

FIXED!!!

You had given me the wrong channel configuration rich :wink:

This is what it needed, it needed the + and the #, not the * you had mentioned…

Thanks so much - Light going on/off now, ill wait a bit and see if the energy figures are sent from House to Garage…

Hi Rich

It seems something isnt quite right with regards to Scene Controllers.

The house has quite a lot of them, but now they are throwing these errors. As you suggested, I put them in PubItems_CMD as they send commands only, but the rules for those commands are actually on the Garage instance.

Would this be why I receive this error?

17:56:33.869 [WARN ] [verter.ZWaveMultiLevelSwitchConverter] - No conversion in ZWaveMultiLevelSwitchConverter to StringType
17:56:39.661 [INFO ] [smarthome.event.ItemStateChangedEvent] - Kitchen_AeoButton_Scene changed from 2.0 to 1.0
17:56:40.758 [INFO ] [smarthome.event.ItemStateChangedEvent] - Kitchen_AeoButton_Scene changed from 1.0 to 2.0
17:56:40.846 [WARN ] [verter.ZWaveMultiLevelSwitchConverter] - No conversion in ZWaveMultiLevelSwitchConverter to StringType

That was a typo on my part. Getting old sucks and I didn’t have my glasses on. * and # are completely indistinguishable without my glasses.

You need to use #. I’ll update my reply above as well to correct that. In the mean time:

OH Instance | Subscription Topic
------------|-------------------
Garage      | House/out/#
House       | Garage/out/#

You will want to use # instead of + though. + only works at one level and # covers the whole subtree.
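
To make the difference concrete, here is a minimal Python sketch of MQTT topic-filter matching. It is not code from the broker or from openHAB; `topic_matches` is a hypothetical helper written only for illustration, and it ignores edge cases such as topics starting with `$`.

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT topic filter matches a concrete topic.

    '+' matches exactly one level, '#' matches the rest of the subtree and
    must be the last level of the filter. '*' has no special meaning.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    return len(filter_levels) == len(topic_levels)


topic = "House/out/BedRoom1Sw1/state"
print(topic_matches("House/out/#", topic))    # True: '#' covers the subtree
print(topic_matches("House/out/+", topic))    # False: '+' covers one level only
print(topic_matches("House/out/+/+", topic))  # True: one '+' per level
print(topic_matches("House/out/*", topic))    # False: '*' is a literal character
```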

The rules need to be on both instances.

The warning is coming from the Zwave binding. I don’t really know what it means. It’s complaining about a StringType. Maybe add some logging to the subscription rule on House to see if you can figure out which Item it’s complaining about. Since it’s co

Hi Rich, The items worked on both instances perfectly using MQTT1x for about 2 years so it wont be the ZWave binding - it’s related to MQTT

07:02:22.759 [WARN ] [verter.ZWaveMultiLevelSwitchConverter] - No conversion in ZWaveMultiLevelSwitchConverter to StringType
07:02:26.127 [WARN ] [verter.ZWaveMultiLevelSwitchConverter] - No conversion in ZWaveMultiLevelSwitchConverter to StringType

Well, that part is pretty clear about being completely to do with zwave.

It’s not very helpful by not telling if it is complaining about inbound or outbound traffic, or what channel it relates to. But MultiLevelSwitch might well be something like a scene controller.

That’s even less direct help, but if we assume that the zwave device has not changed then it’s not likely to be about an inbound message suddenly changing from numeric to string.
It could be about a linked Item being of incorrect type.
Or it could be about trying to send an inappropriate command from your openHAB to a scene controller … which you might have accidentally set up by event bus.

What worked or didn’t work with the MQTT 1.x binding is completely irrelevant. We are not using the MQTT 1.x binding here.

And I didn’t say that the problem is the Zwave binding, but that the warning is coming from the Zwave binding. And as rossko57 points out, it’s not a very informative warning so all we can say is what has already been said:

  • the warning is coming from the Zwave binding
  • the warning is complaining about StringType which is the state carried by a String Item

Beyond that :man_shrugging:

Fair enough, ive no idea either. I guess ill keep looking when I get some time - thanks Rich/Rossko

You might use a bit of deduction. If you’ve got a timestamped complaint from zwave, suspected to be about some command issued via openHAB Item, all of those are shown in your events.log with timestamps too.

Hi,

I tried to use the Event Bus on openHAB 3.1. I used the openHAB Helper Libraries from CrazyIvan359 (CrazyIvan359/openhab-helper-libraries on GitHub) and installed Jython inside Settings → Automation.

The mqtt_pub.py will work without any problems. If I use MQTT.fx I can see that all topics are available. But the mqtt_sub.py will make some problems…

12:43:58.123 [DEBUG] [jsr223.jython.core.rules             ] - Added rule 'Reload MQTT Event Bus Subscription'
12:43:58.148 [INFO ] [jsr223.jython.mqtt_eb                ] - Successfully created rule Reload MQTT Event Bus Subscription
12:43:58.168 [INFO ] [jsr223.jython.mqtt_eb                ] - Using event bus name of remote-openhab
12:43:58.237 [INFO ] [sr223.jython.MQTT Event Bus Publisher] - Using event bus name of remote-openhab
12:43:58.243 [INFO ] [jsr223.jython.mqtt_eb                ] - Publishing OFF to  <not_important> on mqtt:broker:mosquitto with retained 1
12:43:58.288 [INFO ] [sr223.jython.MQTT Event Bus Publisher] - Using event bus name of remote-openhab
12:43:58.293 [INFO ] [jsr223.jython.mqtt_eb                ] - Publishing UNDEF to  <not_important> on mqtt:broker:mosquitto with retained 1
12:43:58.390 [WARN ] [jsr223.jython.core.triggers          ] - when: "Channel mqtt:broker:broker:eventbus triggered" could not be parsed because Channel 'mqtt:broker:broker:eventbus' does not exist
12:43:58.397 [WARN ] [jsr223.jython.core.rules             ] - rule: not creating rule 'MQTT Event Bus Subscription' due to an invalid trigger definition
12:43:58.405 [ERROR] [on.Reload MQTT Event Bus Subscription] - Failed to create rule MQTT Event Bus Subscription
12:43:58.406 [ERROR] [on.Reload MQTT Event Bus Subscription] - Failed to create MQTT Event Bus Subscription!
12:43:58.411 [ERROR] [jsr223.jython.mqtt_eb                ] - Failed to create MQTT Event Bus Subscription!

The MQTT Event Bus Subscription is also not listed as script.

While I don’t think that the Python implementation has anything that won’t work on OH 3, it’s completely untested. As it says at the openhab-rules-tools repo, one of the requirements for the Python implementations is OH 2.x, not 2.x+. None of them have been tested on OH 3 and the majority of them are known not to work on OH 3.

That being said, the error is pretty straight forward.

when: “Channel mqtt:broker:broker:eventbus triggered” could not be parsed because Channel ‘mqtt:broker:broker:eventbus’ does not exist

Did you create the subscription event Channel on the Broker Thing? Did you copy that Channel’s ID to mqtt_eb_in_chan in configuration.py?

I have created a MQTT Event Bus solution with HABApp. Possibly this is a solution that many are looking for.

You can find the post here or checkout my GitHub repository.

Two different master/slave solutions with two or more openHAB instances are given as example.

I also tried it by rewriting the above rules with the openHAB Helper Libraries and Scripted Automation. You can find that on GitHub as well.

Both approaches have the same problem. If you are using too many Items at the same time it will not work. Maybe someone can fix this.

Let me share the latest findings with you: The solution with the openHAB helper libraries seems to work partially. One problem you have here is the triggering channel of the Thing. With the instance that published, I have no problems. With the instance that subscribes, the triggering channel is spammed. When starting openHAB, first all models are loaded and then everything that belongs to Scripted Automation. The observed effect was that as soon as the MQTT Thing which points to the broker is loaded, the channel for the event bus is triggered. With about 3000 items openHAB is then so busy that my Jython script is no longer loaded and the whole thing does not work accordingly. If I disable this Thing, then restart openHAB and enable this Thing, the Jython script is loaded and ready to work.

But this makes a restart of openHAB more complicated, because you would have to check if the script is loaded and then you are only allowed to enable the Thing for the MQTT broker.

With a lighter data load the Jython script loads without problems, and subscribing then works without problems too. With a high data load the trigger channel does not always register that a new message was received via the MQTT client, so the Jython subscription script is not always executed. This means that individual states or commands can be lost. I would also attribute this to an overload.

To reduce the data load, you could proceed as follows at the receiver:

  - id: stateUpdates
    channelTypeUID: mqtt:publishTrigger
    label: State Updates Subscription
    description: ""
    configuration:
      stateTopic: /messages/states/iBad_Hue_Lampe1_Schalter
      separator: "#"
  - id: stateUpdates2
    channelTypeUID: mqtt:publishTrigger
    label: State Updates Subscription
    description: null
    configuration:
      stateTopic: /messages/states/iBad_Hue_Lampe2_Schalter
      separator: "#"

You create a trigger channel for each item. Or optionally only for the items for which you also want to subscribe something. What is missing in this example, of course, is that the Jython script assumes only a single trigger channel. You would have to add several individual channels in the configuration and then iterate through them in a loop in the Jython script.
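
As a rough idea of how that list of channels could be produced rather than typed by hand, here is a Python sketch that builds one channel definition per Item as plain dictionaries mirroring the fields above. It does not touch openHAB at all: `trigger_channels` is a hypothetical helper, the generated `id` and `label` values are made up, and turning the result into actual Thing configuration is left open.

```python
def trigger_channels(item_names, topic_prefix="/messages/states/", separator="#"):
    """Build one publishTrigger channel definition per Item name."""
    channels = []
    for index, name in enumerate(item_names, start=1):
        channels.append({
            "id": f"stateUpdates{index}",  # hypothetical naming scheme
            "channelTypeUID": "mqtt:publishTrigger",
            "label": f"State Updates Subscription ({name})",
            "configuration": {
                "stateTopic": topic_prefix + name,
                "separator": separator,
            },
        })
    return channels


items = ["iBad_Hue_Lampe1_Schalter", "iBad_Hue_Lampe2_Schalter"]
for channel in trigger_channels(items):
    print(channel["id"], channel["configuration"]["stateTopic"])
```

The subscription script would then need one trigger per generated channel ID instead of the single channel it assumes today.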

This would be a way to reduce what all may be received. It would be conceivable with several devices that they only have to subscribe to different topics.

What you could also do is work with a black list and a white list, similar to what @rlkoshak has done with annotations and puball. I have left this out in my example.

To get more specific again on how subscribing works… The MQTT client receives a message, the trigger channel of the Thing recognizes this or triggers, then the subscribe rule is executed in the Jython script. However, if there is for example this item only on the master but not on the slave, the script can also not change the state of an item or execute a command to an item. This data load should be reduced. Processes would be started unnecessarily, which then hinder the reception of, let’s say, valid messages.

So if we could say that we don’t work with + or # as wildcards, but only with the Items that are present on the instance that subscribes, we would stop this. The wildcards + and # ultimately also let through messages for which no Item can be found.

I don’t know if it’s possible to create a Thing by Rule or the Channels to the Script by Rule, because then you could just do it in such a way that all Items (would be in use in the Jython Script in a different context) are passed through and accordingly a Trigger Channel is added one by one.

By the way, the event bus works fine when I control openHAB from devices that access openHAB via MQTT. My Pepper robot plays music, lowers shutters or turns on lights, which means both states and commands work. It is just the trigger channels that are overloaded because of the wildcards.

You need to use QOS 1 or 2 on both the publishing and subscribing side. Under heavy load MQTT allows messages to simply be dropped on QOS 0. Based on what you’ve described I’m not certain the problem is necessarily OH specific, at least not in total.

It’s probably possible but by the time you go that far the Remote openHAB binding is probably a better choice anyway. There is a ThingRegistry and ThingManager which has similar capabilities as the RuleRegistry and RuleManager which is used to create Rules.

This behavior is also probably heavily dependent on the machine OH is running on and what rules you are using and how you are writing those rules. While I’ve not tested at 3000, I have at around 1000 using JS Scripting in UI rules and I’ve seen none of these problems. The reading and parsing of UI rules is much more light weight than doing so from files so perhaps there is less chance that the loading problem you describe can happen. I’m also running on a relatively hefty VM and not an RPi.

Another thing built into my implementations on purpose is that you have to choose which Items to publish. I filter the events that get published on the publisher side through Group membership. It does not and should not publish everything all the time by default. The user should make a deliberate choice on what Items get published.