MQTT 2.5 Event Bus

Tags: #<Tag:0x00007f5c98cd4b90>

This keeps coming up so here is a quick tutorial on how to set up an event bus configuration using the MQTT 2.5 M1 binding. I don’t know if all of these features exist for the 2.4 Release version.

Setting up the binding and MQTT Broker Thing is beyond the scope of this tutorial.

What’s an Event Bus?

There are times when one may have more than one OH instance that needs to have some or all of it’s Items synchronized across all the instances. In the MQTT 1.x binding there was a special configuration that enabled this that no longer exists in MQTT 2.x.

The event bus will have three parts, the subscription, the publishing, and online status reporting.

We will use the following conventions:

topic purpose
<openHAB name>/out/<Item>/command Topic where all commands that occur on <openHAB name> are published
<openHAB name>/out/<Item>/state Topic where all state updates that occur on <openHAB name> are published
<openHAB name>/in/<Item>/command Topic where commands that occur in another openHAB instance are published that should be reflected in <openHAB name>. You can have more than one of these groups of topics that you subscribe to if you have three or more openHAB instances to synchronize.
<openHAB name>/in/<Item>/state Topic where state updates that occur in another openHAB instance are published that should be reflected in <openHAB name>You can have more than one of these groups of topics that you subscribe to if you have three or more openHAB instances to synchronize.

<openHAB name> is a name you chose to uniquely identify each openHAB instance. <Item> will be replaced with the Item name.

Note that you will want to use the same name for all Items that need to be synchronized across the OH instances.

One important thing to note is that the synchronization can be one way. For example, one OH instance can just subscribe to another OH instance without publishing their own commands and updates. This raises a little bit of confusion as to which OH instance needs to publish to which set of topics and which to subscribe. Unfortunately the answer depends on how you are doing the synchronization.

Let’s say we have two OH instances that we want to synchronize: openHAB-main and openHAB-remote.

Then for commands openHAB-main would publish to openHAB-main/out/<Item>/command and subscribe to openHAB-remote/out/<Item>/command. openHAB-remote would publish to openHAB-remote/out/<Item>/command and subscribe to openHAB-main/out/<Item>/command.

Now let’s say that we have a bunch of OH instances and they are all just publishing sensor reading to openHAB-main. In that case openHAB-main would not publish at all and openHAB-main would subscribe to openHAB-main/in/<Item>/command. All the other instances would publish to that same topic.

For the code below, we will be using the following:

Variable Value for examples Purpose
eb_out_gr PubItems Group containing all the Items whose commands and updates are published to the bus.
eb_name openhab-remote Name for the publishing OH instance, will be the root of the topics.
eb_br mqtt:broker:mosquitto Thing ID for the MQTT Broker Thing
eb_in_chan mqtt:broker:mosquitto:eventbus Channel ID for the Trigger Channel defined on the MQTT Broker Thing for the OH instance that subscribes to the event bus


The first step to publish commands and updates to the out topics is to create a Group. Be sure to give the Group a Type. It doesn’t matter what type we choose because we will not be aggregating the states of it’s members, but without a Type the Group will not receive events that are necessary to drive Rules. For this tutorial I’ll use:

Group:String PubItems

All Items that need to be published to the “event bus” need to be a member of this Group.

Next we will need Rules.


from core.rules import rule
from core.triggers import when

# !!!! Fill in these variables !!!!
eb_out_gr = "PubItems"          # name of the Group containing the Items to publish on the bus
eb_name = "openhab-remote"      # name of the openHAB instance, <openHAB name>
eb_br = "mqtt:broker:mosquitto" # Thing ID for the MQTT Broker Thing

@rule("Publish Item events",
      description=("Publishes commands and update events for all members of {} "
                   "to the configured topic".format(eb_out_gr)),
@when("Member of {} received command".format(eb_out_gr))
@when("Member of {} received update".format(eb_out_gr))
def pub(event):
    is_cmd = hasattr(event, 'itemCommand')
    msg = str(event.itemCommand if is_cmd else event.itemState)
    topic = "{}/out/{}/{}".format(eb_name, event.itemName,
                                  "command" if is_cmd else "state")
    retained = False if is_cmd else True"Publishing {} to {} on {}".format(msg, topic, eb_br))
    action = actions.get("mqtt", eb_br)
    if action:
        action.publishMQTT(topic, msg, retained)
        pub.log.error("There is no broker thing {}!".format(eb_br))    

Rules DSL:

rule "Publish commands to the event bus"
    Member of PubItems received command
    val mqttActions = getActions("mqtt","mqtt:broker:mosquitto") // use your broker Thing ID

rule "Publish updates to the event bus"
    Member of PubItems received update
    val mqttActions = getActions("mqtt","mqtt:broker:mosquitto") // use your broker Thing ID

That’s all there is to it. All commands to all members of PubItems get published to the command topic and all updates get published to the state topic.


For subscription we need to create a publish trigger Channel on the MQTT Broker Thing on the subscribing instance of OH and configure it to subscribe to the output topics from the other openHAB, or the input topics for this openHAB instance. You can use the wild card subscription to use this one Channel for all the Items. Be sure to define a separator character so we can get the MQTT topic from the event in our Rule.

The screenshot above only subscribes to the command topic. If you want both commands and updates you can use openHAB-remote/out/# as the topic. # is used as the separator character. In the Rule we will split on # to get the topic and message. Take note of the Thing Channel ID, we will use that to trigger the Rule.

With the Channel in place we only need a single Rule.


from core.rules import rule
from core.triggers import when

# !!!! Fill in this variable !!!!
eb_in_chan = "mqtt:broker:mosquitto:eventbus" # Thing Event Channel ID

@rule("Eventbus subscribe",
      description="Subscribe to eventbus events and synchronize the Items.",
@when("Channel {} triggered".format(eb_in_chan))
def eb_sub(event):
    topic = event.event.split("#")[0]
    state = event.event.split("#")[1]
    item_name = topic.split("/")[2]
    event_type = topic.split("/")[3]

    if item_name not in items:
        eb_sub.log.debug("Local openHAB does not have Item {}, ignoring."
    elif event_type == "command":
        events.sendCommand(item_name, state)
        events.postUpdate(item_name, state)

Rules DSL:

rule "Subscribe for commands and updates from the event bus"
    Channel 'mqtt:broker:broker:main-openHAB' triggered
    var topic = receivedEvent.toString.split("#").get(0)
    var state = receivedEvent.toString.split("#").get(1)
    val itemName = topic.split("/").get(2)
    val type = topic.split("/").get(3)

    if(type == "command") sendCommand(itemName, state)
    else postUpdate(itemName, state)

Note that the above handles both commands and updates.

Online Status

It is also desirable to know when the publishing OH instance goes offline. To handle this, we configure the LWT on the MQTT Broker Thing.

  • Topic: <openHAB-Name>/status
  • Retained: True
  • Message: “OFFLINE”

When the OH instance loses it’s connection to the MQTT Broker, the Broker will publish “OFFLINE” to <openHAB-Name>/status. Since the message is retained, even if the subscribers are not online, they will get the OFFLINE message when they reconnect.

Next we need a Rule triggered at Systems started to publish ONLINE as a retained message when the OH instance comes back online.

Rules DSL:

from core.rules import rule
from core.triggers import when

# !!!! Fill in these variables !!!!
eb_name = "openhab-instance" # openHAB's instance name, <openHAB-name>
eb_br = "mqtt:broker:mosquitto" # MQTT Broker Thing ID

@rule("Eventbus Online",
      description="Publish that this instance of OH is now online",
@when("System started")
@when("Thing {} changed to ONLINE".format(eb_br))
def online(event):"Reporting eventbus as online")
    actions.get("mqtt", eb_br).publishMQTT("{}/status".format(eb_name),
                                                              "ONLINE", True)

Rules DSL:

rule "Eventbus Online"
    System started or
    Thing mqtt:broker:mosquitto changed to ONLINE
    logInfo("Reporting eventbus as online")
    getActions("mqtt", "mqtt:broker:mosquitto").publishMQTT("openhab-remote/status", "ONLINE", True)

I use them in Jython in a more basic Form than yours but this is it in principle:

@rule("Publish all")
@when("Member of OutItems changed")
def MQTTPublishOut(event):
    output1 = event.itemName
    output2 = str(event.itemState)

@rule("Receive all")
@when("Channel mqtt:broker:f79d2a84:TriggerIn triggered")
def MQTTPublishIn(event):
    input1 = event.event
    input2 = input1.split("/")
    input3 = input2[1].split("#")
    events.sendCommand(input3[0], input3[1])

Best regards Johannes

Hi Guys,

I hope that someone can give me the redeeming answer because it is starting to drive me crazy. I’ve been playing around for a few days but can’t get out.

I use the event bus rule of a Rich. This works fine, however, he sent to the Mqtt broker an On or and OFF instead of 1 or a 0.
I would like to transform this.

Can someone help me on my way? Do I have to do the transformation via the Item file or in the rule?


The Event Bus is intended to work as a way to federate two OH instances. OH understands ON and OFF.

If you have something that isn’t OH and it expects something besides ON and OFF, you should not be using the Event Bus to integrate that device/system or you should adjust that device/system so it understands OH’s states.

To properly integrate a device/subsystem with OH, you would create a Generic MQTT Thing (assuming it doesn’t support Homie or HA standards) and define Channels. In the Channels you can define a transformation that will convert the ON to 1 and OFF to 0.

Thank you for your quick response Rich.

I have a Mqtt channel which also sends the status of the item. Only this one will send a 0 or a 1.

I was so happy with your rule that I can choose which item to put on the bus

Switch Output_2 “Output_2 [%s]” (PubItems) { channel=“knx:device:9d0e2d38:1”, channel=“mqtt:topic:mybroker:satelthing:output_2” [profile=“follow”]}

if I do a transformation in the channels does this work with the rule above?

No. That isn’t what the Rule is intended to do. It publishs the openHAB states to topics. On the subscriber side it expects openHAB states.

It is not designed to nor is it intended to be used with some other system beyond two or more OH instances talking to each other.

If you cannot make whatever it is that you are using that needs the 1 and 0 instead of ON and OFF, this is not the correct approach.

This is the right approach. This Rule does nothing for you.

Thnx Rich

Thank you for your clear explanation always. Where would the community be without you.
Can you perhaps explain why the Mqqt also sends the statuses of all other items when there is a change in status of 1 of the items?
I do not use the rule but only via the mqtt things.
This is quite a big load with multiple items.

You will have to describe the behavior more. Are you saying the Rules in the OP does this? Or are you saying that your Generic Things are doing this?

I don’t see how this behavior is possible given the Rules above. The Rule only triggers when a member of PubItems receives a command or receives an update. Then the Rule only publishes on that one Item that triggered the Rule. If it’s sending updates for ALL Items, that means that the Rule is triggering for all Items which means that all of your Items are receiving an update at the same time.

Sorry I wasn’t entirely clear.
I do not use the rule as above but only the Items and the channels.

When the staus of output 1 changes, the mqtt binding also immediately sends the status of output 2.

Items file

Switch Output_1 “Output_1 [%s]” (Test) { channel=“knx:device:9d0e2d38:2”, channel=“mqtt:topic:mybroker:satelthing:output_1” [profile=“follow”]}
Switch Output_2 “Output_2 [%s]” (Test) { channel=“knx:device:9d0e2d38:1”, channel=“mqtt:topic:mybroker:satelthing:output_2” [profile=“follow”]}

Mqtt things

    Type switch : output_1 "Output_1" [ stateTopic="output_1", commandTopic="output_1/state", postCommand="true" ] 
    Type switch : output_2 "Output_2" [ stateTopic="output_2", commandTopic="output_2/state", postCommand="true" ]

You will need to post a new thread and hope someone who knows more about the MQTT binding, especially when using .things files to configure them will see and can help.

ok thanks


Is there not a possibility to use your rule anyway? Possibly create a proxy item and have it converted to 0 and 1

You could (you can do just about anything you want), but why would you? Just create a Thing, configure the transform on the Channel and avoid the extra Item and rule. That’s what the Generic Things are for. What you are proposing, given the information I have so far, send like an awful lot of extra work just to avoid the little bit of work of creating the Thing.

The problem with that is that I also want to link an alarm system (Satel) to another system. However, this alarm system only allows 1 connection. With a status change of 1 zone, the system immediately sends the status of all zones. This means that when 1 zone is changed, 200 statuses are immediately sent over the broker.
With the rule above I can override this by replacing meber or group changed instead of received command.

If I use the thing, all statuses are also sent.
So I am looking for a solution to overcome this.

Is that really a problem? MQTT brokers are designed to presses tens of thousands of messages with hundreds of connections on machines like an RPi. I found these 200 messages are overloading the broker. So what other system is actually experiencing a problem?

It’s best not to try to fix things that are not actually a problem.

But it you do want to user a rule liked the above, than you need to code it significantly differently. You will have to add logic to figure out what sort of message it is and convert it as appropriate

This is great!

I actually had this idea the other day, and wondered how I could connect more OH systems to eachother… The idea was to use two or three Rpi´s placed strategic in our house to cover every corner of Zigbee/Zwave or whatever else needed near these particular places… (I live in a rather big one level house, so wireless communication is impossible everywhere with just one single coordinator/gateway).

I understand MQTT is an option, but I also wondered if there are any plans to make this possibilty more “integrated” directly into OH. (mainly because I´m still struggling with understanding MQTT fully).

All plans I’m aware of involve MQTT. I have plans to improve the above and perhaps there will be tighter integration with openHAB, but I know of no current plans to implement this using anything but MQTT. It is, IMHO, the best tool for the job.

Some of the things I’m considering include:

  • automatically create the Itemson the subscriber side
  • automatically discover remote OH instances
  • turn this into a Rule Template to make installation and use easier
  • add it as a feature to the MQTT binding or create a separate binding

What direction this goes will depend on what direction OH 3 goes. And I don’t want to do too much to it until it gets merged into the Helper Libraries.

I understand you wont put too much effort into it. But it sure would be nice to see those things mentioned beeing included.
The best option would be, if this “event bus” could be added directly into OH. I have a little idea, that many would find it highly usefull.

I said I won’t put much effort into it until what I’ve already submitted gets merged. I didn’t say I’m not going to not work on it much at all for ever.

That’s exactly what rule templates are.

Sorry thats what I meant… I took that for granted I gues…Sorry!

What I meant was, the event bus got a part of the system itself, as an direct option in the setup where you specify wheter you need to activate this even bus, and if yes, then there would be an option where you place to where, and which funktions/items/things etc you want to share between the systems… The actualy communcation should then be established automaticly. Perhas like main system “download” a list of all resources/items/things ext from the other(s) system(s).