MQTT 2.5 Event Bus

Tags: #<Tag:0x00007f7465eb6d98>

For JSR223 Python users, I’ve checked in a reusable library implementation of the below at https://github.com/openhab-scripters/openhab-helper-libraries/pull/257. I’ll update this thread when it gets merged. In the mean time check it out and give it a test and report back here if you have any problems.

This keeps coming up so here is a quick tutorial on how to set up an event bus configuration using the MQTT 2.5 M1 binding. I don’t know if all of these features exist for the 2.4 Release version.

Setting up the binding and MQTT Broker Thing is beyond the scope of this tutorial.

What’s an Event Bus?

There are times when one may have more than one OH instance that needs to have some or all of it’s Items synchronized across all the instances. In the MQTT 1.x binding there was a special configuration that enabled this that no longer exists in MQTT 2.x.

The event bus will have two parts, the subscription and the publishing.

We will use the following conventions:

topic purpose
<openHAB name>/out/<Item>/command Topic where all commands that occur on <openHAB name> are published
<openHAB name>/out/<Item>/state Topic where all state updates that occur on <openHAB name> are published
<openHAB name>/in/<Item>/command Topic where commands that occur in another openHAB instance are published that should be reflected in <openHAB name>. You can have more than one of these groups of topics that you subscribe to if you have three or more openHAB instances to synchronize.
<openHAB name>/in/<Item>/state Topic where state updates that occur in another openHAB instance are published that should be reflected in <openHAB name>You can have more than one of these groups of topics that you subscribe to if you have three or more openHAB instances to synchronize.

<openHAB name> is a name you chose to uniquely identify each openHAB instance. <Item> will be replaced with the Item name.

Note that you will want to use the same name for all Items that need to be synchronized across the OH instances.

One important thing to note is that the synchronization can be one way. For example, one OH instance can just subscribe to another OH instance without publishing their own commands and updates. This raises a little bit of confusion as to which OH instance needs to publish to which set of topics and which to subscribe. Unfortunately the answer depends on how you are doing the synchronization.

Let’s say we have two OH instances that we want to synchronize: openHAB-main and openHAB-remote.

Then for commands openHAB-main would publish to openHAB-main/out/<Item>/command and subscribe to openHAB-remote/out/<Item>/command. openHAB-remote would publish to openHAB-remote/out/<Item>/command and subscribe to openHAB-main/out/<Item>/command.

Now let’s say that we have a bunch of OH instances and the are all just publishing sensor reading to openHAB-main. In that case openHAB-main would not publish at all and openHAB-main would subscribe to openHAB-main/in/<Item>/command. All the other instances would publish to that same topic.

Publishing

The first step to publish commands and updates to the out topics is to create a Group. Be sure to give the Group a Type. It doesn’t matter what type we choose because we will not be aggregating the states of it’s members, but without a Type the Group will not receive events that are necessary to drive Rules [NOTE to self: Is this the case for Member of triggers?]. For this tutorial I’ll use:

Group:String PubItems

All Items that need to be published to the “event bus” need to be a member of this Group.

Next we will need two Rules.

rule "Publish commands to the event bus"
when
    Member of PubItems received command
then
    val mqttActions = getActions("mqtt","mqtt:systemBroker:embedded-mqtt-broker") // use your broker Thing ID
    mqttActions.publishMQTT("main-openHAB/out/"+triggeringItem.name+"/command",receivedCommand.toString)
end

rule "Publish updates to the event bus"
when
    Member of PubItems received update
then
    val mqttActions = getActions("mqtt","mqtt:systemBroker:embedded-mqtt-broker") // use your broker Thing ID
    mqttActions.publishMQTT("openHAB-main/out/"+triggeringItem.name+"/state",triggeringItem.state.toString)
end    

NOTE: I show them as separate Rules to make it easier to only partially implement (e.g. only synchronize updates). They could be combined into one I believe.

That’s all there is to it. All commands to all members of PubItems get published to the command topic and all updates get published to the state topic.

Subscribing

For subscription we need to create a publish trigger Channel on the MQTT Broker Thing and configure it to subscribe to the output topics from the other openHAB, or the input topics for this openHAB instance. You can use the wild card subscription to use this one Channel for all the Items. Be sure to define a separator character so we can get the MQTT topic from the event in our Rule.

The screenshot above only subscribes to the command topic. If you want both commands and updates you can use openHAB-remote/out/# as the topic.

With the Channel in place we only need a single Rule.

rule "Subscribe for commands and updates from the event bus"
when
    Channel 'mqtt:broker:broker:main-openHAB' triggered
then
    val itemName = receivedEvent.split("/").get(2)
    val type = receivedEvent.split("/").get(3)
    val state = receivedEvent.split("#").get(1)

    if(type == "command") sendCommand(itemName, state)
    else postUpdate(itemName, state)
end

Note that the above handles both commands and updates.

Future work

As OH 3 comes closer and closer and a distribution mechanism is built to distribute Rule templates, the Rules above will become available as an installable Rules template so you don’t need to type it all in. The JSR223 versions of these Rules should look very similar. If someone codes them up I will happily add them to the OP.

7 Likes

I use them in Jython in a more basic Form than yours but this is it in principle:

@rule("Publish all")
@when("Member of OutItems changed")
def MQTTPublishOut(event):
    output1 = event.itemName
    output2 = str(event.itemState)
    actions.get("mqtt","mqtt:broker:f79d2a84").publishMQTT("allItemsout/"+output1,output2)

@rule("Receive all")
@when("Channel mqtt:broker:f79d2a84:TriggerIn triggered")
def MQTTPublishIn(event):
    input1 = event.event
    input2 = input1.split("/")
    input3 = input2[1].split("#")
    events.sendCommand(input3[0], input3[1])

Best regards Johannes

Hi Guys,

I hope that someone can give me the redeeming answer because it is starting to drive me crazy. I’ve been playing around for a few days but can’t get out.

I use the event bus rule of a Rich. This works fine, however, he sent to the Mqtt broker an On or and OFF instead of 1 or a 0.
I would like to transform this.

Can someone help me on my way? Do I have to do the transformation via the Item file or in the rule?

Thanks

The Event Bus is intended to work as a way to federate two OH instances. OH understands ON and OFF.

If you have something that isn’t OH and it expects something besides ON and OFF, you should not be using the Event Bus to integrate that device/system or you should adjust that device/system so it understands OH’s states.

To properly integrate a device/subsystem with OH, you would create a Generic MQTT Thing (assuming it doesn’t support Homie or HA standards) and define Channels. In the Channels you can define a transformation that will convert the ON to 1 and OFF to 0.

Thank you for your quick response Rich.

I have a Mqtt channel which also sends the status of the item. Only this one will send a 0 or a 1.

I was so happy with your rule that I can choose which item to put on the bus

Switch Output_2 “Output_2 [%s]” (PubItems) { channel=“knx:device:9d0e2d38:1”, channel=“mqtt:topic:mybroker:satelthing:output_2” [profile=“follow”]}

if I do a transformation in the channels does this work with the rule above?

No. That isn’t what the Rule is intended to do. It publishs the openHAB states to topics. On the subscriber side it expects openHAB states.

It is not designed to nor is it intended to be used with some other system beyond two or more OH instances talking to each other.

If you cannot make whatever it is that you are using that needs the 1 and 0 instead of ON and OFF, this is not the correct approach.

This is the right approach. This Rule does nothing for you.

Thnx Rich

Thank you for your clear explanation always. Where would the community be without you.
Can you perhaps explain why the Mqqt also sends the statuses of all other items when there is a change in status of 1 of the items?
I do not use the rule but only via the mqtt things.
This is quite a big load with multiple items.

You will have to describe the behavior more. Are you saying the Rules in the OP does this? Or are you saying that your Generic Things are doing this?

I don’t see how this behavior is possible given the Rules above. The Rule only triggers when a member of PubItems receives a command or receives an update. Then the Rule only publishes on that one Item that triggered the Rule. If it’s sending updates for ALL Items, that means that the Rule is triggering for all Items which means that all of your Items are receiving an update at the same time.

Sorry I wasn’t entirely clear.
I do not use the rule as above but only the Items and the channels.

When the staus of output 1 changes, the mqtt binding also immediately sends the status of output 2.

Items file

Switch Output_1 “Output_1 [%s]” (Test) { channel=“knx:device:9d0e2d38:2”, channel=“mqtt:topic:mybroker:satelthing:output_1” [profile=“follow”]}
Switch Output_2 “Output_2 [%s]” (Test) { channel=“knx:device:9d0e2d38:1”, channel=“mqtt:topic:mybroker:satelthing:output_2” [profile=“follow”]}

Mqtt things

    Type switch : output_1 "Output_1" [ stateTopic="output_1", commandTopic="output_1/state", postCommand="true" ] 
    Type switch : output_2 "Output_2" [ stateTopic="output_2", commandTopic="output_2/state", postCommand="true" ]

You will need to post a new thread and hope someone who knows more about the MQTT binding, especially when using .things files to configure them will see and can help.

ok thanks

Rich,

Is there not a possibility to use your rule anyway? Possibly create a proxy item and have it converted to 0 and 1

You could (you can do just about anything you want), but why would you? Just create a Thing, configure the transform on the Channel and avoid the extra Item and rule. That’s what the Generic Things are for. What you are proposing, given the information I have so far, send like an awful lot of extra work just to avoid the little bit of work of creating the Thing.

The problem with that is that I also want to link an alarm system (Satel) to another system. However, this alarm system only allows 1 connection. With a status change of 1 zone, the system immediately sends the status of all zones. This means that when 1 zone is changed, 200 statuses are immediately sent over the broker.
With the rule above I can override this by replacing meber or group changed instead of received command.

If I use the thing, all statuses are also sent.
So I am looking for a solution to overcome this.

Is that really a problem? MQTT brokers are designed to presses tens of thousands of messages with hundreds of connections on machines like an RPi. I found these 200 messages are overloading the broker. So what other system is actually experiencing a problem?

It’s best not to try to fix things that are not actually a problem.

But it you do want to user a rule liked the above, than you need to code it significantly differently. You will have to add logic to figure out what sort of message it is and convert it as appropriate