MQTT Event Bus with HABApp

HABApp-MQTT-Event-Bus

A MQTT Event Bus for openHAB 2.x and 3.x using HABApp. Tested with openHAB 3. This should work equivalent like the Event bus binding from the old MQTT 1.x binding.

Description:
Publish/receive all states/commmands directly on the openHAB eventbus.

Usage:
Perfect for integrating multiple openHAB instances or broadcasting all events.

Preparation

Install HABApp

At first you have to install HABApp.

Then I changed the permissions with

sudo chown -R openhab:openhab /opt/habapp
sudo chown -R openhab:openhab /etc/openhab/habapp

HABApp should also work with the permissions in the installations steps. However, my problem was that the log files were not created then.

If your are using openHAB 2 instead of openHAB 3 you have to use /etc/openhab2/habapp instead of /etc/openhab/habapp.

Also you have to make sure that the systemd file looks like following:

[Unit]
Description=HABApp
Documentation=https://habapp.readthedocs.io
After=openhab.service

[Service]
Type=simple
User=openhab
Group=openhab
UMask=002
ExecStart=/opt/habapp/bin/habapp -c /etc/openhab/habapp

[Install]
WantedBy=multi-user.target

So please make sure that User and Group are openhab because it could be that there will be not logs in /var/log/openhab2 or /var/log/openhab.

Install the mosquitto MQTT broker

The next step is to install the mosquitto MQTT broker on the master with

sudo apt install mosquitto mosquitto-clients

The slaves only need mosquitto-clients because all slaves will later be connected to the master.

Then you have to edit the mosquitto.conf file:

# Place your local configuration in /etc/mosquitto/conf.d/
#
# A full description of the configuration file is at
# /usr/share/doc/mosquitto/examples/mosquitto.conf.example

listener 1883 0.0.0.0

pid_file /run/mosquitto/mosquitto.pid

persistence true
persistence_location /var/lib/mosquitto/

log_dest file /var/log/mosquitto/mosquitto.log

include_dir /etc/mosquitto/conf.d

allow_anonymous true

Of course you can use a password which mean you should not have to use allow_anonymous true. The more important thing is that you have to use listener 1883 0.0.0.0. This means that the mosquitto broker will be public accessible for all slaves (maybe if you want with a password).

Configure HABapp

Configure the MQTT and openHAB connection

At first we will configure the config.yml for HABApp:

directories:
  logging: /var/log/openhab  # Folder where the logs will be written to
  rules: rules # Folder from which the rule files will be loaded
  param: params # Folder from which the parameter files will be loaded
  config: config # Folder from which configuration files (e.g. for textual thing configuration) will be loaded
  lib: lib # Folder where additional libraries can be placed
location:
  latitude: 0.0
  longitude: 0.0
  elevation: 0.0
mqtt:
  connection:
    client_id: <client_name>
    host: <public_ip_of_mosquitto_broker>
    port: 1883
    user: ''
    password: ''
    tls: false
    tls_ca_cert: ''  # Path to a CA certificate that will be treated as trusted
    tls_insecure: true
  general:
    listen_only: false  # If True HABApp will not publish any value to the broker
  publish:
    qos: 0  # Default QoS when publishing values
    retain: false # Default retain flag when publishing values
  subscribe:
    qos: 0  # Default QoS for subscribing
    topics:
    - '#'
    - 0
openhab:
  connection:
    host: <public_ip_of_openhab_instance>
    port: 8080
    user: '<openhab_user>'
    password: '<openhab_password>'
  general:
    listen_only: false  # If True HABApp will not change anything on the openHAB instance.
    wait_for_openhab: true # If True HABApp will wait for items from the openHAB instance before loading any rules on startup
  ping:
    enabled: true  # If enabled the configured item will show how long it takes to send an update from HABApp and get the updated value back from openhabin milliseconds
    item: HABApp_Ping # Name of the Numberitem
    interval: 10 # Seconds between two pings

The <client_name> can be random. On the master it could contain the word master and on the slave(s) it could be contain slave for separation. All openHAB instances should refer to the same public ip adress so <public_ip_of_mosquitto_broker> should be always the same. As you seen in the mosquitto.conf we use the public port 1883 so maybe you want to use 8883 or another port number.

If you all the anonymous access to the mosquitto broker user and password could be empty. If not fill in the username and password. I also changed tls to false and tls_insecure to true. Please check your mosquitto.conf! If you will use tls then you have to set tls to true and tls_insecure to false!

In the openhab section you can use localhost for <public_ip_of_openhab_instance> because it will refer to the openhab instance you are currently using. For a better separation I used the public ip address. If you are using openHAB 3 you should replace <openhab_user> and <openhab_password> with the username and password you need to access the openHAB settings panel. If you are using openHAB 2 it could be that there is no username and password needed. Please check your openHAB configuration!

In the next step we will add following to the logging.yml file:

  MQTTEventBusHandler:
    class: HABApp.core.lib.handler.MidnightRotatingFileHandler
    filename: '/var/log/openhab/EventBus.log'
    maxBytes: 1_048_576
    backupCount: 3

    formatter: Frontail_format
    level: DEBUG

and

  MQTTEventBusHandler:
    class: HABApp.core.lib.handler.MidnightRotatingFileHandler
    filename: '/var/log/openhab/EventBus.log'
    maxBytes: 1_048_576
    backupCount: 3

    formatter: Frontail_format
    level: DEBUG

Configure the logging handler

So in my case the config.yml looks like this:

levels:
  WARNING: WARN

formatters:
  HABApp_format:
    format: '[%(asctime)s] [%(name)25s] %(levelname)8s | %(message)s'

  Frontail_format:
    format: '%(asctime)s.%(msecs)03d [%(levelname)-5s] [%(name)-36s] - %(message)s'
    datefmt: '%Y-%m-%d %H:%M:%S'

handlers:
  # There are several Handlers available:
  #  - logging.handlers.RotatingFileHandler:
  #    Will rotate when the file reaches a certain size (see python logging documentation for args)
  #  - HABApp.core.lib.handler.MidnightRotatingFileHandler:
  #    Will wait until the file reaches a certain size and then rotate on midnight
  #  - More handlers:
  #    https://docs.python.org/3/library/logging.handlers.html#rotatingfilehandler

  HABApp_default:
    class: HABApp.core.lib.handler.MidnightRotatingFileHandler
    filename: '/var/log/openhab/HABApp.log'
    maxBytes: 1_048_576
    backupCount: 3

    formatter: Frontail_format
    level: DEBUG

  EventFile:
    class: HABApp.core.lib.handler.MidnightRotatingFileHandler
    filename: 'HABApp_events.log'
    maxBytes: 1_048_576
    backupCount: 3

    formatter: HABApp_format
    level: DEBUG

  BufferEventFile:
    class: logging.handlers.MemoryHandler
    capacity: 10
    formatter: HABApp_format
    target: EventFile
    level: DEBUG

  MQTTEventBusHandler:
    class: HABApp.core.lib.handler.MidnightRotatingFileHandler
    filename: '/var/log/openhab/EventBus.log'
    maxBytes: 1_048_576
    backupCount: 3

    formatter: Frontail_format
    level: DEBUG

loggers:
  HABApp:
    level: INFO
    handlers:
      - HABApp_default
    propagate: False

  MQTTEventBus:
    level: DEBUG
    handlers:
      - MQTTEventBusHandler
    propagate: False

  HABApp.EventBus:
    level: INFO
    handlers:
      - BufferEventFile
    propagate: False

This will add EventBus.log file to '/var/log/openhab/. If you are using openHAB 2 please make sure that the log file will be in /var/log/openhab2.

Installation

You can install it for openHAB 3 with:

wget https://raw.githubusercontent.com/Michdo93/HABApp-MQTT-Event-Bus/main/mqtt_event_bus.py -P /etc/openhab/habapp/rules
sudo chmod +x /etc/openhab/habapp/rules/mqtt_event_bus.py
sudo chown -R openhab:openhab /etc/openhab/habapp/rules/mqtt_event_bus.py

And for openHAB 2 with:

wget https://raw.githubusercontent.com/Michdo93/HABApp-MQTT-Event-Bus/main/mqtt_event_bus.py -P /etc/openhab2/habapp/rules
sudo chmod +x /etc/openhab2/habapp/rules/mqtt_event_bus.py
sudo chown -R openhab:openhab /etc/openhab2/habapp/rules/mqtt_event_bus.py

or copy this code and create a file with sudo nano /etc/openhab/habapp/rules/mqtt_event_bus.py or sudo nano /etc/openhab2/habapp/rules/mqtt_event_bus.py

import logging

import HABApp
from HABApp import Parameter
from HABApp.core.events import ValueChangeEvent, ValueUpdateEvent
from HABApp.mqtt.items import MqttItem
from HABApp.openhab.events import ItemCommandEvent, ItemStateEvent
from HABApp.openhab.items import OpenhabItem

log = logging.getLogger('MQTTEventBus')

# These are the configuration values that will be used to setup the MqttEventBus
log_state = Parameter('mqtt_event_bus', 'log_state', default_value=True).value
statePublishTopic = Parameter(
    'mqtt_event_bus', 'statePublishTopic', default_value='').value
commandPublishTopic = Parameter(
    'mqtt_event_bus', 'commandPublishTopic', default_value='').value
stateSubscribeTopic = Parameter(
    'mqtt_event_bus', 'stateSubscribeTopic', default_value='').value
commandSubscribeTopic = Parameter(
    'mqtt_event_bus', 'commandSubscribeTopic', default_value='').value


class MqttEventBus(HABApp.Rule):
    def __init__(self):
        super().__init__()

        for item in self.get_items(type=OpenhabItem):
            if statePublishTopic != '':
                item.listen_event(self.on_item_state, ItemStateEvent)

            if commandPublishTopic != '':
                item.listen_event(self.on_item_command, ItemCommandEvent)

            if commandSubscribeTopic != '':
                topic_command = commandSubscribeTopic.replace(
                    "${item}", str(item.name))

                mqtt_item_command = MqttItem.get_create_item(f'{topic_command}')
                mqtt_item_command.listen_event(
                    self.on_mqtt_command, ValueUpdateEvent)

            if stateSubscribeTopic != '':
                topic_state = stateSubscribeTopic.replace(
                    "${item}", str(item.name))

                mqtt_item_state = MqttItem.get_create_item(f'{topic_state}')
                mqtt_item_state.listen_event(
                    self.on_mqtt_state, ValueUpdateEvent)

    def on_mqtt_command(self, event):
        assert isinstance(event, ValueUpdateEvent)

        name = event.name
        itemPosition = commandSubscribeTopic.split("${item}")[0].count("/")

        item = name.split("/")[itemPosition]
        cmd = event.value

        log.info(f'Subscribed MQTT topic {event.name} with {cmd}')
        log.info(f'{item} predicted to become {cmd}')

        self.openhab.send_command(item, cmd)

    def on_item_state(self, event: ItemStateEvent):
        topicString = statePublishTopic.replace(
            "${item}", event.name)
        topic = f'{topicString}'
        value = event.value

        log.info(f'Published  MQTT topic {topic} with {value}')

        self.mqtt.publish(topic, str(value))

    def on_item_command(self, event: ItemCommandEvent):
        topicString = commandPublishTopic.replace(
            "${item}", event.name)
        topic = f'{topicString}'
        value = event.value

        log.info(f'Published  MQTT topic {topic} with {value}')

        self.mqtt.publish(topic, str(value))

    def on_mqtt_state(self, event):
        assert isinstance(event, ValueUpdateEvent)

        name = event.name
        itemPosition = stateSubscribeTopic.split("${item}")[0].count("/")

        item = name.split("/")[itemPosition]
        state = event.value

        log.info(f'Subscribed MQTT topic {name} with {state}')
        self.openhab.post_update(item, state)


class LogItemStateRule(HABApp.Rule):
    """This rule logs the item state in the mqtt event bus log file"""

    def __init__(self):
        super().__init__()

        for item in self.get_items(type=OpenhabItem):
            item.listen_event(self.on_item_change, ValueChangeEvent)

    def on_item_change(self, event):
        assert isinstance(event, ValueChangeEvent)
        log.info(f'{event.name} changed from {event.old_value} to {event.value}')


MqttEventBus()

# Create logger rule only if configured
if log_state:
    LogItemStateRule()

After that you have to restart habapp with:

sudo systemctl restart habapp.service

This should create a file like this in /etc/openhab/habapp/params/mqtt_event_bus.yml or /etc/openhab2/habapp/params/mqtt_event_bus.yml:

log_state: true
statePublishTopic: ''
commandPublishTopic: ''
stateSubscribeTopic: ''
commandSubscribeTopic: ''

If not you can create it with sudo nano /etc/openhab/habapp/params/mqtt_event_bus.yml or sudo nano /etc/openhab2/habapp/params/mqtt_event_bus.yml. Alternatively you can download it with wget https://raw.githubusercontent.com/Michdo93/HABApp-MQTT-Event-Bus/main/mqtt_event_bus.yml -O /etc/openhab/habapp/params/mqtt_event_bus.yml or wget https://raw.githubusercontent.com/Michdo93/HABApp-MQTT-Event-Bus/main/mqtt_event_bus.yml -O /etc/openhab2/habapp/params/mqtt_event_bus.yml.

Master / Slave example

Please make sure that the slave(s) will be connected to the master’s mosquitto broker!

Usage

You have to make sure that the item names on the master and on the slave(s) are equal. The master should contain all items from all slaves. But not all slaves should contain all items from the master. This means that the slaves can have different items with different names. Also the slaves could have only a few items from the master. This can be thought of as a restricted user who only has access to a few items. For example, that a slave is in the bathroom and the openHAB instance in the bathroom then only allows the items in the bathroom to be operated. The master/slave principle can only subscribe where the corresponding item is present. Otherwise it will be published, but a slave or even none of the slaves will access this topic. Conversely, the master should be able to subscribe to everything that the slaves publish.

Configuration example 1

On the master you can create a configuration like this:

log_state: true
statePublishTopic: openHAB/in/${item}/state
commandPublishTopic: ''
stateSubscribeTopic: ''
commandSubscribeTopic: openHAB/out/${item}/command

and on the slave like this:

log_state: true
statePublishTopic: ''
commandPublishTopic: openHAB/out/${item}/command
stateSubscribeTopic: openHAB/in/${item}/state
commandSubscribeTopic: ''

Or on the master you can download it with wget https://raw.githubusercontent.com/Michdo93/HABApp-MQTT-Event-Bus/main/example/mqtt_event_bus.yml.master1 -O /etc/openhab/habapp/params/mqtt_event_bus.yml or wget https://raw.githubusercontent.com/Michdo93/HABApp-MQTT-Event-Bus/main/example/mqtt_event_bus.yml.master1 -O /etc/openhab2/habapp/params/mqtt_event_bus.yml. On the slave you can download it with wget https://raw.githubusercontent.com/Michdo93/HABApp-MQTT-Event-Bus/main/example/mqtt_event_bus.yml.slave1 -O /etc/openhab/habapp/params/mqtt_event_bus.yml or wget https://raw.githubusercontent.com/Michdo93/HABApp-MQTT-Event-Bus/main/example/mqtt_event_bus.yml.slave1 -O /etc/openhab2/habapp/params/mqtt_event_bus.yml.

Configuration example 2

On the master you can create a configuration like this:

log_state: true
statePublishTopic: /messages/states/${item}
commandPublishTopic: ''
stateSubscribeTopic: ''
commandSubscribeTopic: /messages/commands/${item}

and on the slave like this:

log_state: true
statePublishTopic: ''
commandPublishTopic: /messages/commands/${item}
stateSubscribeTopic: /messages/states/${item}
commandSubscribeTopic: ''

Or on the master you can download it with wget https://raw.githubusercontent.com/Michdo93/HABApp-MQTT-Event-Bus/main/example/mqtt_event_bus.yml.master2 -O /etc/openhab/habapp/params/mqtt_event_bus.yml or wget https://raw.githubusercontent.com/Michdo93/HABApp-MQTT-Event-Bus/main/example/mqtt_event_bus.yml.master2 -O /etc/openhab2/habapp/params/mqtt_event_bus.yml. On the slave you can download it with wget https://raw.githubusercontent.com/Michdo93/HABApp-MQTT-Event-Bus/main/example/mqtt_event_bus.yml.slave2 -O /etc/openhab/habapp/params/mqtt_event_bus.yml or wget https://raw.githubusercontent.com/Michdo93/HABApp-MQTT-Event-Bus/main/example/mqtt_event_bus.yml.slave2 -O /etc/openhab2/habapp/params/mqtt_event_bus.yml.

Proof of concept

Check the master logging

The EventBus.log on the master could look like this:

2021-12-04 00:35:02.326 [INFO ] [MQTTEventBus                        ] - Published MQTT topic /openHAB/in/testSwitch/state with ON
2021-12-04 00:35:02.332 [INFO ] [MQTTEventBus                        ] - testSwitch changed from OFF to ON
2021-12-04 00:35:09.845 [INFO ] [MQTTEventBus                        ] - Subscribed MQTT topic/openHAB/out/testSwitch/command with OFF
2021-12-04 00:35:09.846 [INFO ] [MQTTEventBus                        ] - testSwitch predicted to become OFF
2021-12-04 00:35:09.889 [INFO ] [MQTTEventBus                        ] - Published MQTT topic /openHAB/in/testSwitch/state with OFF
2021-12-04 00:35:09.894 [INFO ] [MQTTEventBus                        ] - testSwitch changed from ON to OFF
2021-12-04 00:35:16.777 [INFO ] [MQTTEventBus                        ] - Subscribed MQTT topic/openHAB/out/testSwitch/command with ON
2021-12-04 00:35:16.779 [INFO ] [MQTTEventBus                        ] - testSwitch predicted to become ON
2021-12-04 00:35:16.817 [INFO ] [MQTTEventBus                        ] - Published MQTT topic /openHAB/in/testSwitch/state with ON
2021-12-04 00:35:16.823 [INFO ] [MQTTEventBus                        ] - testSwitch changed from OFF to ON
2021-12-04 00:35:20.787 [INFO ] [MQTTEventBus                        ] - Published MQTT topic /openHAB/in/testSwitch/state with OFF
2021-12-04 00:35:20.792 [INFO ] [MQTTEventBus                        ] - testSwitch changed from ON to OFF

Check the slave logging

The EventBus.log on the slave(s) could look like this:

2021-12-04 00:35:02.337 [INFO ] [MQTTEventBus                        ] - Subscribed MQTT topic /openHAB/in/testSwitch/state with ON
2021-12-04 00:35:02.396 [INFO ] [MQTTEventBus                        ] - testSwitch changed from OFF to ON
2021-12-04 00:35:09.837 [INFO ] [MQTTEventBus                        ] - Published MQTT topic /openHAB/out/testSwitch/command with OFF
2021-12-04 00:35:09.847 [INFO ] [MQTTEventBus                        ] - testSwitch changed from ON to OFF
2021-12-04 00:35:09.893 [INFO ] [MQTTEventBus                        ] - Subscribed MQTT topic /openHAB/in/testSwitch/state with OFF
2021-12-04 00:35:16.767 [INFO ] [MQTTEventBus                        ] - Published MQTT topic /openHAB/out/testSwitch/command with ON
2021-12-04 00:35:16.781 [INFO ] [MQTTEventBus                        ] - testSwitch changed from OFF to ON
2021-12-04 00:35:16.821 [INFO ] [MQTTEventBus                        ] - Subscribed MQTT topic /openHAB/in/testSwitch/state with ON
2021-12-04 00:35:20.792 [INFO ] [MQTTEventBus                        ] - Subscribed MQTT topic /openHAB/in/testSwitch/state with OFF
2021-12-04 00:35:20.831 [INFO ] [MQTTEventBus                        ] - testSwitch changed from ON to OFF
2 Likes

Very nice write up! I’m really impressed. :+1: :+1:


I’ve had some more Ideas based on your example that would further simplify things.
My idea was to have a parameter file in the params folder where one would configure if it is a master or a slave.
Then all the logic can be put into a single file and shared evenly across all systems.
This makes it easier to deploy changes.

Note that I used negative indices on the topic and the new self.get_items function.
What do you think?

import logging

import HABApp
from HABApp import Parameter
from HABApp.core.events import ValueChangeEvent, ValueUpdateEvent
from HABApp.mqtt.items import MqttItem
from HABApp.openhab.events import ItemCommandEvent, ItemStateEvent
from HABApp.openhab.items import OpenhabItem

log = logging.getLogger('MQTTEventBus')

# These are the configuration values that will be used to setup the MqttEventBus
is_master = Parameter('mqtt_event_bus', 'master', default_value=True).value
log_state = Parameter('mqtt_event_bus', 'log_state', default_value=True).value
topic_prefix = Parameter('mqtt_event_bus', 'topic_prefix', default_value='openHAB/').value


class MqttEventBusMaster(HABApp.Rule):
    """This rule sends states to mqtt and commands to openhab"""
    def __init__(self):
        super().__init__()

        for item in self.get_items(type=OpenhabItem):
            item.listen_event(self.on_item_state, ItemStateEvent)

            mqtt_item = MqttItem.get_create_item(f'{topic_prefix}/{item.name}/command')
            mqtt_item.listen_event(self.on_mqtt_command, ValueUpdateEvent)

    def on_mqtt_command(self, event):
        assert isinstance(event, ValueUpdateEvent)

        name = event.name
        item = name.split("/")[-1]
        cmd = event.value

        log.info(f'Subscribed MQTT topic {event.name} with {cmd}')
        log.info(f'{item} predicted to become {cmd}')

        self.openhab.send_command(item, cmd)

    def on_item_state(self, event: ItemStateEvent):
        topic = f'/openHAB/{event.name}/state'
        value = event.value

        log.info(f'Published  MQTT topic {topic} with {value}')

        self.mqtt.publish(topic, value)


class MqttEventBusSlave(HABApp.Rule):
    """This rule sends commands to mqtt and state changes to openhab"""
    def __init__(self):
        super().__init__()
        
        for item in self.get_items(type=OpenhabItem):
            item.listen_event(self.on_item_command, ItemCommandEvent)

            mqtt_item = MqttItem.get_create_item(f'{topic_prefix}/{item.name}/state')
            mqtt_item.listen_event(self.on_mqtt_state, ValueUpdateEvent)

    def on_item_command(self, event: ItemCommandEvent):
        topic = f'/openHAB/{event.name}/command'
        value = event.value

        log.info(f'Published  MQTT topic {topic} with {value}')

        self.mqtt.publish(topic, value)

    def on_mqtt_state(self, event):

        name = event.name
        item = name.split("/")[-1]
        state = event.value

        log.info(f'Subscribed MQTT topic {name} with {state}')
        self.openhab.post_update(item, state)


class LogItemStateRule(HABApp.Rule):
    """This rule logs the item state in the mqtt event bus log file"""
    
    def __init__(self):
        super().__init__()
        
        for item in self.get_items(type=OpenhabItem):
            item.listen_event(self.on_item_change, ValueChangeEvent)
            
    def on_item_change(self, event):
        assert isinstance(event, ValueChangeEvent)
        log.info(f'{event.name} changed from {event.old_value} to {event.value}')


# create master or slave - depending on what is configured
if is_master:
    MqttEventBusMaster()
else:
    MqttEventBusSlave()

# Create logger rule only if configured
if log_state:
    LogItemStateRule()

2 Likes

Thank you very, very much. You can see that I just got the code to work yesterday. Thought a configuration would be nicer and haven’t thought about it yet. Thought that can be revised later. Well, I would say that this is the big advantage of a community. I save myself the work accordingly. Thank you very much. I will test this promptly and revise my post.

There was a litte error. I fixed it. Another point is that you have to make sure that the params directory exists. This could be done with

sudo mkdir /etc/openhab/habapp/params
sudo chmod openhab:openhab /etc/openhab/habapp/params

What also can produce errors is if you make changes to the Python files like changing is_master from true to false that the params file not save this change. If this occurs then you can fix it by deleting this file. Furthermore you should remember that the params yml file has the same name as the Python file only the file extension differs with .yml and .py. So if you create the file mqtt_event_bus.py in /etc/openhab/habapp/rules the file mqtt_event_bus.yml will be created in /etc/openhab/habapp/params.

Here the file for the master:

import logging

import HABApp
from HABApp import Parameter
from HABApp.core.events import ValueChangeEvent, ValueUpdateEvent
from HABApp.mqtt.items import MqttItem
from HABApp.openhab.events import ItemCommandEvent, ItemStateEvent
from HABApp.openhab.items import OpenhabItem

log = logging.getLogger('MQTTEventBus')

# These are the configuration values that will be used to setup the MqttEventBus
is_master = Parameter('mqtt_event_bus', 'master', default_value=True).value
log_state = Parameter('mqtt_event_bus', 'log_state', default_value=True).value
topic_prefix = Parameter('mqtt_event_bus', 'topic_prefix',
                         default_value='/openHAB').value


class MqttEventBusMaster(HABApp.Rule):
    """This rule sends states to mqtt and commands to openhab"""

    def __init__(self):
        super().__init__()

        for item in self.get_items(type=OpenhabItem):
            item.listen_event(self.on_item_state, ItemStateEvent)

            mqtt_item = MqttItem.get_create_item(
                f'{topic_prefix}/{item.name}/command')
            mqtt_item.listen_event(self.on_mqtt_command, ValueUpdateEvent)

    def on_mqtt_command(self, event):
        assert isinstance(event, ValueUpdateEvent)

        name = event.name
        item = name.split("/")[-2]
        cmd = event.value

        log.info(f'Subscribed MQTT topic {event.name} with {cmd}')
        log.info(f'{item} predicted to become {cmd}')

        self.openhab.send_command(item, cmd)

    def on_item_state(self, event: ItemStateEvent):
        topic = f'/openHAB/{event.name}/state'
        value = event.value

        log.info(f'Published  MQTT topic {topic} with {value}')

        self.mqtt.publish(topic, value)


class MqttEventBusSlave(HABApp.Rule):
    """This rule sends commands to mqtt and state changes to openhab"""

    def __init__(self):
        super().__init__()

        for item in self.get_items(type=OpenhabItem):
            item.listen_event(self.on_item_command, ItemCommandEvent)

            mqtt_item = MqttItem.get_create_item(
                f'{topic_prefix}/{item.name}/state')
            mqtt_item.listen_event(self.on_mqtt_state, ValueUpdateEvent)

    def on_item_command(self, event: ItemCommandEvent):
        topic = f'/openHAB/{event.name}/command'
        value = event.value

        log.info(f'Published  MQTT topic {topic} with {value}')

        self.mqtt.publish(topic, value)

    def on_mqtt_state(self, event):

        name = event.name
        item = name.split("/")[-2]
        state = event.value

        log.info(f'Subscribed MQTT topic {name} with {state}')
        self.openhab.post_update(item, state)


class LogItemStateRule(HABApp.Rule):
    """This rule logs the item state in the mqtt event bus log file"""

    def __init__(self):
        super().__init__()

        for item in self.get_items(type=OpenhabItem):
            item.listen_event(self.on_item_change, ValueChangeEvent)

    def on_item_change(self, event):
        assert isinstance(event, ValueChangeEvent)
        log.info(f'{event.name} changed from {event.old_value} to {event.value}')


# create master or slave - depending on what is configured
if is_master:
    MqttEventBusMaster()
else:
    MqttEventBusSlave()

# Create logger rule only if configured
if log_state:
    LogItemStateRule()

And here the file for the slave:

import logging

import HABApp
from HABApp import Parameter
from HABApp.core.events import ValueChangeEvent, ValueUpdateEvent
from HABApp.mqtt.items import MqttItem
from HABApp.openhab.events import ItemCommandEvent, ItemStateEvent
from HABApp.openhab.items import OpenhabItem

log = logging.getLogger('MQTTEventBus')

# These are the configuration values that will be used to setup the MqttEventBus
is_master = Parameter('mqtt_event_bus', 'master', default_value=False).value
log_state = Parameter('mqtt_event_bus', 'log_state', default_value=True).value
topic_prefix = Parameter('mqtt_event_bus', 'topic_prefix',
                         default_value='/openHAB').value


class MqttEventBusMaster(HABApp.Rule):
    """This rule sends states to mqtt and commands to openhab"""

    def __init__(self):
        super().__init__()

        for item in self.get_items(type=OpenhabItem):
            item.listen_event(self.on_item_state, ItemStateEvent)

            mqtt_item = MqttItem.get_create_item(
                f'{topic_prefix}/{item.name}/command')
            mqtt_item.listen_event(self.on_mqtt_command, ValueUpdateEvent)

    def on_mqtt_command(self, event):
        assert isinstance(event, ValueUpdateEvent)

        name = event.name
        item = name.split("/")[-2]
        cmd = event.value

        log.info(f'Subscribed MQTT topic {event.name} with {cmd}')
        log.info(f'{item} predicted to become {cmd}')

        self.openhab.send_command(item, cmd)

    def on_item_state(self, event: ItemStateEvent):
        topic = f'/openHAB/{event.name}/state'
        value = event.value

        log.info(f'Published  MQTT topic {topic} with {value}')

        self.mqtt.publish(topic, value)


class MqttEventBusSlave(HABApp.Rule):
    """This rule sends commands to mqtt and state changes to openhab"""

    def __init__(self):
        super().__init__()

        for item in self.get_items(type=OpenhabItem):
            item.listen_event(self.on_item_command, ItemCommandEvent)

            mqtt_item = MqttItem.get_create_item(
                f'{topic_prefix}/{item.name}/state')
            mqtt_item.listen_event(self.on_mqtt_state, ValueUpdateEvent)

    def on_item_command(self, event: ItemCommandEvent):
        topic = f'/openHAB/{event.name}/command'
        value = event.value

        log.info(f'Published  MQTT topic {topic} with {value}')

        self.mqtt.publish(topic, value)

    def on_mqtt_state(self, event):

        name = event.name
        item = name.split("/")[-2]
        state = event.value

        log.info(f'Subscribed MQTT topic {name} with {state}')
        self.openhab.post_update(item, state)


class LogItemStateRule(HABApp.Rule):
    """This rule logs the item state in the mqtt event bus log file"""

    def __init__(self):
        super().__init__()

        for item in self.get_items(type=OpenhabItem):
            item.listen_event(self.on_item_change, ValueChangeEvent)

    def on_item_change(self, event):
        assert isinstance(event, ValueChangeEvent)
        log.info(f'{event.name} changed from {event.old_value} to {event.value}')


# create master or slave - depending on what is configured
if is_master:
    MqttEventBusMaster()
else:
    MqttEventBusSlave()

# Create logger rule only if configured
if log_state:
    LogItemStateRule()

Both you can find on this GitHub repository: https://github.com/Michdo93/HABApp-MQTT-Event-Bus/

My first attempt you can find here: https://github.com/Michdo93/HABApp-MQTT-Event-Bus/tree/v0.8.0

I just titled my attempt as v0.8.0, because it was actually already close to the solution. It worked, but is now easier to configure.

I’ll update my first post and the GitHub page next so that there are installation and configuration instructions for use. Once again a thousand thanks for your improvements. It stands to reason that you are a little more familiar with HABApp than I am :rofl:

The name of the param file is the first argument in the Parameter('mqtt_event_bus, …) calll. You can pick any name you like. The specified default value is the value that will be used if the entry does not exist yet.

It’s possible to automatically reload the rule file if the parameter file changes.
Note that this only because a parameter is used for rule creation, if you use the Parameters in an existing rule it’s not required to reload.

You don’t have to create two different source files for the master and the slave!! Just create the parameter file on the slave first and set the ‘master’ entry (manually) to false.
Imho you should publish one source file and two configs (e.g. for master and slave).
The workflow is then to put the params file in the params folder and only then add the rule file.

:wink: :wink:

Thanks. You are right. I just read over it. Was very, very late yesterday. I sat down again after 11 pm and was climbing before.

However, we can skip this.

The default value could be empty. I will show you, what I have done today.

import logging

import HABApp
from HABApp import Parameter
from HABApp.core.events import ValueChangeEvent, ValueUpdateEvent
from HABApp.mqtt.items import MqttItem
from HABApp.openhab.events import ItemCommandEvent, ItemStateEvent
from HABApp.openhab.items import OpenhabItem

log = logging.getLogger('MQTTEventBus')

# These are the configuration values that will be used to setup the MqttEventBus
is_master = Parameter('mqtt_event_bus', 'master', default_value=True).value
log_state = Parameter('mqtt_event_bus', 'log_state', default_value=True).value
statePublishTopic = Parameter(
    'mqtt_event_bus', 'statePublishTopic', default_value='openHAB/in/${item}/state').value
commandPublishTopic = Parameter(
    'mqtt_event_bus', 'commandPublishTopic', default_value='openHAB/out/${item}/command').value
stateSubscribeTopic = Parameter(
    'mqtt_event_bus', 'stateSubscribeTopic', default_value='openHAB/in/${item}/state').value
commandSubscribeTopic = Parameter(
    'mqtt_event_bus', 'commandSubscribeTopic', default_value='openHAB/out/${item}/command').value


class MqttEventBusMaster(HABApp.Rule):
    """This rule sends states to mqtt and commands to openhab"""

    def __init__(self):
        super().__init__()

        for item in self.get_items(type=OpenhabItem):
            item.listen_event(self.on_item_state, ItemStateEvent)

            topic = commandSubscribeTopic.replace("${item}", "{}").format(item.name)

            mqtt_item = MqttItem.get_create_item(f'{topic}')
            mqtt_item.listen_event(self.on_mqtt_command, ValueUpdateEvent)

    def on_mqtt_command(self, event):
        assert isinstance(event, ValueUpdateEvent)

        name = event.name
        itemPosition = commandSubscribeTopic.split("${item}")[0].count("/")

        item = name.split("/")[itemPosition]
        cmd = event.value

        log.info(f'Subscribed MQTT topic {event.name} with {cmd}')
        log.info(f'{item} predicted to become {cmd}')

        self.openhab.send_command(item, cmd)

    def on_item_state(self, event: ItemStateEvent):
        topicString = statePublishTopic.replace("${item}", "{}").format(event.name)
        topic = f'{topicString}'
        value = event.value

        log.info(f'Published  MQTT topic {topic} with {value}')

        self.mqtt.publish(topic, value)


class MqttEventBusSlave(HABApp.Rule):
    """This rule sends commands to mqtt and state changes to openhab"""

    def __init__(self):
        super().__init__()

        for item in self.get_items(type=OpenhabItem):
            item.listen_event(self.on_item_command, ItemCommandEvent)

            topic = stateSubscribeTopic.replace("${item}", "{}").format(item.name)

            mqtt_item = MqttItem.get_create_item(f'{topic}')
            mqtt_item.listen_event(self.on_mqtt_state, ValueUpdateEvent)

    def on_item_command(self, event: ItemCommandEvent):
        topicString = commandPublishTopic.replace("${item}", "{}").format(event.name)
        topic = f'{topicString}'
        value=event.value

        log.info(f'Published  MQTT topic {topic} with {value}')

        self.mqtt.publish(topic, value)

    def on_mqtt_state(self, event):

        name=event.name
        itemPosition=stateSubscribeTopic.split("${item}")[0].count("/")

        item=name.split("/")[itemPosition]
        state=event.value

        log.info(f'Subscribed MQTT topic {name} with {state}')
        self.openhab.post_update(item, state)


class LogItemStateRule(HABApp.Rule):
    """This rule logs the item state in the mqtt event bus log file"""

    def __init__(self):
        super().__init__()

        for item in self.get_items(type = OpenhabItem):
            item.listen_event(self.on_item_change, ValueChangeEvent)

    def on_item_change(self, event):
        assert isinstance(event, ValueChangeEvent)
        log.info(f'{event.name} changed from {event.old_value} to {event.value}')


# create master or slave - depending on what is configured
if is_master:
    MqttEventBusMaster()
else:
    MqttEventBusSlave()

# Create logger rule only if configured
if log_state:
    LogItemStateRule()

So a few changes will be done. This can then be used in all MQTT scenarios after the changes. Not only master and slave.

What you can do currently is:

Currently I have this on the master:

master: true
log_state: true
statePublishTopic: openHAB/in/${item}/state
commandPublishTopic: openHAB/out/${item}/command
stateSubscribeTopic: openHAB/in/${item}/state
commandSubscribeTopic: openHAB/out/${item}/command

And this on the slave:

master: false
log_state: true
statePublishTopic: openHAB/in/${item}/state
commandPublishTopic: openHAB/out/${item}/command
stateSubscribeTopic: openHAB/in/${item}/state
commandSubscribeTopic: openHAB/out/${item}/command

I want to remove is_master. In the next step. But first, what is the advantage of my change.

You can change the topics with your own configuration very easy. You can do something like this:

statePublishTopic: /openHAB/in/${item}/state
commandPublishTopic: /openHAB/out/${item}/command
stateSubscribeTopic: /openHAB/in/${item}/state
commandSubscribeTopic: /openHAB/out/${item}/command

or this:

statePublishTopic: /openHAB/${item}/state
commandPublishTopic: /openHAB/${item}/command
stateSubscribeTopic: /openHAB/${item}/state
commandSubscribeTopic: /openHAB/${item}/command

or maybe this:

statePublishTopic: /messages/states/${item}
commandPublishTopic: /messages/commands/${item}
stateSubscribeTopic: /messages/states/${item}
commandSubscribeTopic: /messages/commands/${item}

So this is why in the next step the default value for this four topics should be empty and should be set not in the python file but in the yml file for configuration.

Remember that the ${item} syntax is like in the MQTT 1.x binding.

There I have in my master/slave configuration on the master following:

# Name of the broker as it is defined in the openhab.cfg. If this property is not available, no event bus MQTT binding will be created.
broker=MQTTBroker

# When available, all status updates which occur on the openHAB event bus are published to the provided topic. The message content will
# be the status. The variable ${item} will be replaced during publishing with the item name for which the state was received.
statePublishTopic=/messages/states/${item}

# When available, all commands which occur on the openHAB event bus are published to the provided topic. The message content will be the
# command. The variable ${item} will be replaced during publishing with the item name for which the command was received.
#commandPublishTopic=

# When available, all status updates received on this topic will be posted to the openHAB event bus. The message content is assumed to be
# a string representation of the status. The topic should include the variable ${item} to indicate which part of the topic contains the
# item name which can be used for posting the received value to the event bus.
#stateSubscribeTopic=

# When available, all commands received on this topic will be posted to the openHAB event bus. The message content is assumed to be a
# string representation of the command. The topic should include the variable ${item} to indicate which part of the topic contains the
# item name which can be used for posting the received value to the event bus.
commandSubscribeTopic=/messages/commands/${item}

and on the slave:

# Name of the broker as it is defined in the openhab.cfg. If this property is not available, no event bus MQTT binding will be created.
broker=MQTTBroker

# When available, all status updates which occur on the openHAB event bus are published to the provided topic. The message content will
# be the status. The variable ${item} will be replaced during publishing with the item name for which the state was received.
#statePublishTopic=

# When available, all commands which occur on the openHAB event bus are published to the provided topic. The message content will be the
# command. The variable ${item} will be replaced during publishing with the item name for which the command was received.
commandPublishTopic=/messages/commands/${item}

# When available, all status updates received on this topic will be posted to the openHAB event bus. The message content is assumed to be
# a string representation of the status. The topic should include the variable ${item} to indicate which part of the topic contains the
# item name which can be used for posting the received value to the event bus.
stateSubscribeTopic=/messages/states/${item}

# When available, all commands received on this topic will be posted to the openHAB event bus. The message content is assumed to be a
# string representation of the command. The topic should include the variable ${item} to indicate which part of the topic contains the
# item name which can be used for posting the received value to the event bus.
#commandSubscribeTopic=

So instead of using is_master with true or false I will make it like in the MQTT 1.x binding. This means I have to check if a paramter is empty or not.

What should not work is the parameter config from above. Because this will lead to an infinity loop. So at least in the next update there will be only one class.

But I’m out for now and then go so slowly climb again… Until soon.

So I am done. Here the complete workaround:

import logging

import HABApp
from HABApp import Parameter
from HABApp.core.events import ValueChangeEvent, ValueUpdateEvent
from HABApp.mqtt.items import MqttItem
from HABApp.openhab.events import ItemCommandEvent, ItemStateEvent
from HABApp.openhab.items import OpenhabItem

log = logging.getLogger('MQTTEventBus')

# These are the configuration values that will be used to setup the MqttEventBus
log_state = Parameter('mqtt_event_bus', 'log_state', default_value=True).value
statePublishTopic = Parameter(
    'mqtt_event_bus', 'statePublishTopic', default_value='').value
commandPublishTopic = Parameter(
    'mqtt_event_bus', 'commandPublishTopic', default_value='').value
stateSubscribeTopic = Parameter(
    'mqtt_event_bus', 'stateSubscribeTopic', default_value='').value
commandSubscribeTopic = Parameter(
    'mqtt_event_bus', 'commandSubscribeTopic', default_value='').value


class MqttEventBus(HABApp.Rule):
    def __init__(self):
        super().__init__()

        for item in self.get_items(type=OpenhabItem):
            if statePublishTopic != '':
                item.listen_event(self.on_item_state, ItemStateEvent)

            if commandPublishTopic != '':
                item.listen_event(self.on_item_command, ItemCommandEvent)

            if commandSubscribeTopic != '':
                topic_command = commandSubscribeTopic.replace(
                    "${item}", "{}").format(item.name)

                mqtt_item_command = MqttItem.get_create_item(f'{topic_command}')
                mqtt_item_command.listen_event(
                    self.on_mqtt_command, ValueUpdateEvent)

            if stateSubscribeTopic != '':
                topic_state = stateSubscribeTopic.replace(
                    "${item}", "{}").format(item.name)

                mqtt_item_state = MqttItem.get_create_item(f'{topic_state}')
                mqtt_item_state.listen_event(
                    self.on_mqtt_state, ValueUpdateEvent)

    def on_mqtt_command(self, event):
        assert isinstance(event, ValueUpdateEvent)

        name = event.name
        itemPosition = commandSubscribeTopic.split("${item}")[0].count("/")

        item = name.split("/")[itemPosition]
        cmd = event.value

        log.info(f'Subscribed MQTT topic {event.name} with {cmd}')
        log.info(f'{item} predicted to become {cmd}')

        self.openhab.send_command(item, cmd)

    def on_item_state(self, event: ItemStateEvent):
        topicString = statePublishTopic.replace(
            "${item}", "{}").format(event.name)
        topic = f'{topicString}'
        value = event.value

        log.info(f'Published  MQTT topic {topic} with {value}')

        self.mqtt.publish(topic, value)

    def on_item_command(self, event: ItemCommandEvent):
        topicString = commandPublishTopic.replace(
            "${item}", "{}").format(event.name)
        topic = f'{topicString}'
        value = event.value

        log.info(f'Published  MQTT topic {topic} with {value}')

        self.mqtt.publish(topic, value)

    def on_mqtt_state(self, event):
        assert isinstance(event, ValueUpdateEvent)

        name = event.name
        itemPosition = stateSubscribeTopic.split("${item}")[0].count("/")

        item = name.split("/")[itemPosition]
        state = event.value

        log.info(f'Subscribed MQTT topic {name} with {state}')
        self.openhab.post_update(item, state)


class LogItemStateRule(HABApp.Rule):
    """This rule logs the item state in the mqtt event bus log file"""

    def __init__(self):
        super().__init__()

        for item in self.get_items(type=OpenhabItem):
            item.listen_event(self.on_item_change, ValueChangeEvent)

    def on_item_change(self, event):
        assert isinstance(event, ValueChangeEvent)
        log.info(f'{event.name} changed from {event.old_value} to {event.value}')


MqttEventBus()

# Create logger rule only if configured
if log_state:
    LogItemStateRule()

On my master I have this configuration:

log_state: true
statePublishTopic: openHAB/in/${item}/state
commandPublishTopic: ''
stateSubscribeTopic: ''
commandSubscribeTopic: openHAB/out/${item}/command

And on my slave:

log_state: true
statePublishTopic: ''
commandPublishTopic: openHAB/out/${item}/command
stateSubscribeTopic: openHAB/in/${item}/state
commandSubscribeTopic: ''

Also tested with following topic structure:

log_state: true
statePublishTopic: /messages/states/${item}
commandPublishTopic: ''
stateSubscribeTopic: ''
commandSubscribeTopic: /messages/commands/${item}
log_state: true
statePublishTopic: ''
commandPublishTopic: /messages/commands/${item}
stateSubscribeTopic: /messages/states/${item}
commandSubscribeTopic: ''

Just another idea:
Why don’t you put a generic topic definition in the parameter file, e.g.

topic: my/mqtt/${type}/${item}

where ${type} is command/state. This is not as generic as before but makes configuration easier (but then you need an is_master config entry again).

In every case I’d group the topics in the parameter file by functionality or by logic:

log_state: true
publish:
    commands: /messages/commands/${item}
    states: /messages/states/${item}
subscribe:
    states: ''
    commands: ''

Instead of an empty string as default you can also use ‘-’ or something different so it looks a little bit nicer in the config file.


.replace("${item}", "{}").format(item.name)

This will crash if the user enters a single ‘{’ (or another format wildcard) in the topic. The correct way would be

.replace("${item}", item.name)

Also you forgot to replace the wildcard for publish in the snippet.

1 Like

Ok, I now updated my first post to the current version. Tomorrow I can test it more extensively in my lab. There are about 3000 items available. And if I already draw the comparison to the MQTT 1.x binding… There it worked also over MQTT to execute rules on the other instances. They also use send_command and post_update etc.

I already see the first weaknesses when I access it from remote:

[2021-12-06 16:32:42,921] [   HABApp.mqtt.connection]    ERROR | Disconnect: Out of memory. (1)

Maybe too much items…

[2021-12-06 16:32:36,281] [            HABApp.Worker]    ERROR | File "/opt/habapp/lib/python3.8/site-packages/paho/mqtt/client.py", line 1263, in publish
[2021-12-06 16:32:36,281] [            HABApp.Worker]    ERROR |     raise TypeError(
[2021-12-06 16:32:36,281] [            HABApp.Worker]    ERROR |
[2021-12-06 16:32:36,281] [            HABApp.Worker]    ERROR | TypeError: payload must be a string, bytearray, int, float or None.

I think I forget to parse all values to string. This should be done during the string replace, so that the values are really transferred as string.

What also appears in the logs are:

[2021-12-06 16:32:36,242] [            HABApp.Worker]  WARNING | Execution of MqttEventBus.on_item_state took too long: 0.88s

And of course:

[   HABApp.mqtt.connection]     INFO | Disconnect: No error. (0)

What happens is that it will disconnect and connect again:

...............
[2021-12-06 16:32:21,529] [            HABApp.Worker]    ERROR |      STATUS.connected = False
[2021-12-06 16:32:21,529] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-06 16:32:21,529] [            HABApp.Worker]    ERROR |
[2021-12-06 16:32:21,530] [            HABApp.Worker]    ERROR | ConnectionError: Mqtt client not connected
[2021-12-06 16:32:21,506] [            HABApp.Worker]    ERROR | Error in MqttEventBus.on_item_state: Mqtt client not connected

You have to set a different mqtt-id for the different HABApp instances.

Both instances have different mqtt-id’s. What seems to be a big problem is that it will connect again and again and again and again. The problem is that there are to many topics and the HABApp.Worker or HABApp.mqtt.connection can’t handle it.

[2021-12-07 09:42:02,578] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,578] [            HABApp.Worker]    ERROR |      STATUS.connected = False
[2021-12-07 09:42:02,578] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,578] [            HABApp.Worker]    ERROR |
[2021-12-07 09:42:02,579] [            HABApp.Worker]    ERROR | ConnectionError: Mqtt client not connected
[2021-12-07 09:42:02,579] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 1.53s. Maybe there are not enough threads?
[2021-12-07 09:42:02,525] [            HABApp.Worker]    ERROR | Error in MqttEventBus.on_item_state: Mqtt client not connected
[2021-12-07 09:42:02,608] [            HABApp.Worker]    ERROR | File "/opt/habapp/lib/python3.8/site-packages/HABApp/core/wrappedfunction.py", line 93, in __run
[2021-12-07 09:42:02,608] [            HABApp.Worker]    ERROR |     self._func(*args, **kwargs)
[2021-12-07 09:42:02,609] [            HABApp.Worker]    ERROR | File "/etc/openhab/habapp/rules/mqtt_event_bus.py", line 73, in on_item_state
[2021-12-07 09:42:02,609] [            HABApp.Worker]    ERROR |     65   def on_item_state(self, event: ItemStateEvent):
[2021-12-07 09:42:02,609] [            HABApp.Worker]    ERROR |  (...)
[2021-12-07 09:42:02,609] [            HABApp.Worker]    ERROR |     69       value = event.value
[2021-12-07 09:42:02,609] [            HABApp.Worker]    ERROR |     70
[2021-12-07 09:42:02,609] [            HABApp.Worker]    ERROR |     71       log.info(f'Published  MQTT topic {topic} with {value}')
[2021-12-07 09:42:02,610] [            HABApp.Worker]    ERROR |     72
[2021-12-07 09:42:02,610] [            HABApp.Worker]    ERROR | --> 73       self.mqtt.publish(topic, value)
[2021-12-07 09:42:02,610] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,610] [            HABApp.Worker]    ERROR |      self = </etc/openhab/habapp/rules/mqtt_event_bus.py.MqttEventBus object at 0x7f2bf32be790>
[2021-12-07 09:42:02,610] [            HABApp.Worker]    ERROR |      event = <ItemStateEvent name: iKueche_Miele_Kuehlschrank_Gefrierstatus, value: Running>
[2021-12-07 09:42:02,610] [            HABApp.Worker]    ERROR |      ItemStateEvent = <class 'HABApp.openhab.events.item_events.ItemStateEvent'>
[2021-12-07 09:42:02,610] [            HABApp.Worker]    ERROR |      value = 'Running'
[2021-12-07 09:42:02,611] [            HABApp.Worker]    ERROR |      event.value = 'Running'
[2021-12-07 09:42:02,611] [            HABApp.Worker]    ERROR |      log.info = <method 'Logger.info' of <Logger MQTTEventBus (WARNING)> __init__.py:1436>
[2021-12-07 09:42:02,611] [            HABApp.Worker]    ERROR |      self.mqtt.publish = <function 'publish' mqtt_interface.py:16>
[2021-12-07 09:42:02,611] [            HABApp.Worker]    ERROR |      topic = '/messages/states/iKueche_Miele_Kuehlschrank_Gefrierstatus'
[2021-12-07 09:42:02,611] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,611] [            HABApp.Worker]    ERROR |
[2021-12-07 09:42:02,611] [            HABApp.Worker]    ERROR | File "/opt/habapp/lib/python3.8/site-packages/HABApp/mqtt/mqtt_interface.py", line 34, in publish
[2021-12-07 09:42:02,612] [            HABApp.Worker]    ERROR |     16   def publish(topic: str, payload: typing.Any, qos: int = None, retain: bool = None) -> int:
[2021-12-07 09:42:02,612] [            HABApp.Worker]    ERROR |  (...)
[2021-12-07 09:42:02,612] [            HABApp.Worker]    ERROR |     30       assert isinstance(retain, bool) or retain is None, type(retain)
[2021-12-07 09:42:02,612] [            HABApp.Worker]    ERROR |     31
[2021-12-07 09:42:02,612] [            HABApp.Worker]    ERROR |     32       config = HABApp.config.CONFIG.mqtt
[2021-12-07 09:42:02,479] [            HABApp.Worker]    ERROR | Error in MqttEventBus.on_item_state: Mqtt client not connected
[2021-12-07 09:42:02,616] [            HABApp.Worker]    ERROR | File "/opt/habapp/lib/python3.8/site-packages/HABApp/core/wrappedfunction.py", line 93, in __run
[2021-12-07 09:42:02,617] [            HABApp.Worker]    ERROR |     self._func(*args, **kwargs)
[2021-12-07 09:42:02,617] [            HABApp.Worker]    ERROR | File "/etc/openhab/habapp/rules/mqtt_event_bus.py", line 73, in on_item_state
[2021-12-07 09:42:02,617] [            HABApp.Worker]    ERROR |     65   def on_item_state(self, event: ItemStateEvent):
[2021-12-07 09:42:02,617] [            HABApp.Worker]    ERROR |  (...)
[2021-12-07 09:42:02,617] [            HABApp.Worker]    ERROR |     69       value = event.value
[2021-12-07 09:42:02,617] [            HABApp.Worker]    ERROR |     70
[2021-12-07 09:42:02,618] [            HABApp.Worker]    ERROR |     71       log.info(f'Published  MQTT topic {topic} with {value}')
[2021-12-07 09:42:02,618] [            HABApp.Worker]    ERROR |     72
[2021-12-07 09:42:02,618] [            HABApp.Worker]    ERROR | --> 73       self.mqtt.publish(topic, value)
[2021-12-07 09:42:02,618] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,618] [            HABApp.Worker]    ERROR |      self = </etc/openhab/habapp/rules/mqtt_event_bus.py.MqttEventBus object at 0x7f2bf32be790>
[2021-12-07 09:42:02,618] [            HABApp.Worker]    ERROR |      event = <ItemStateEvent name: iKueche_Miele_Kuehlschrank_ZielKuehlschrankTemperatur, value: 7>
[2021-12-07 09:42:02,619] [            HABApp.Worker]    ERROR |      ItemStateEvent = <class 'HABApp.openhab.events.item_events.ItemStateEvent'>
[2021-12-07 09:42:02,619] [            HABApp.Worker]    ERROR |      value = 7
[2021-12-07 09:42:02,619] [            HABApp.Worker]    ERROR |      event.value = 7
[2021-12-07 09:42:02,619] [            HABApp.Worker]    ERROR |      log.info = <method 'Logger.info' of <Logger MQTTEventBus (WARNING)> __init__.py:1436>
[2021-12-07 09:42:02,554] [            HABApp.Worker]    ERROR | Error in MqttEventBus.on_item_state: Mqtt client not connected
[2021-12-07 09:42:02,620] [            HABApp.Worker]    ERROR | File "/opt/habapp/lib/python3.8/site-packages/HABApp/core/wrappedfunction.py", line 93, in __run
[2021-12-07 09:42:02,620] [            HABApp.Worker]    ERROR |     self._func(*args, **kwargs)
[2021-12-07 09:42:02,620] [            HABApp.Worker]    ERROR | File "/etc/openhab/habapp/rules/mqtt_event_bus.py", line 73, in on_item_state
[2021-12-07 09:42:02,620] [            HABApp.Worker]    ERROR |     65   def on_item_state(self, event: ItemStateEvent):
[2021-12-07 09:42:02,620] [            HABApp.Worker]    ERROR |  (...)
[2021-12-07 09:42:02,620] [            HABApp.Worker]    ERROR |     69       value = event.value
[2021-12-07 09:42:02,621] [            HABApp.Worker]    ERROR |     70
[2021-12-07 09:42:02,621] [            HABApp.Worker]    ERROR |     71       log.info(f'Published  MQTT topic {topic} with {value}')
[2021-12-07 09:42:02,621] [            HABApp.Worker]    ERROR |     72
[2021-12-07 09:42:02,621] [            HABApp.Worker]    ERROR | --> 73       self.mqtt.publish(topic, value)
[2021-12-07 09:42:02,621] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,621] [            HABApp.Worker]    ERROR |      self = </etc/openhab/habapp/rules/mqtt_event_bus.py.MqttEventBus object at 0x7f2bf32be790>
[2021-12-07 09:42:02,621] [            HABApp.Worker]    ERROR |      event = <ItemStateEvent name: iKueche_Miele_Kuehlschrank_GefrierTuer, value: CLOSED>
[2021-12-07 09:42:02,622] [            HABApp.Worker]    ERROR |      ItemStateEvent = <class 'HABApp.openhab.events.item_events.ItemStateEvent'>
[2021-12-07 09:42:02,622] [            HABApp.Worker]    ERROR |      value = 'CLOSED'
[2021-12-07 09:42:02,622] [            HABApp.Worker]    ERROR |      event.value = 'CLOSED'
[2021-12-07 09:42:02,622] [            HABApp.Worker]    ERROR |      log.info = <method 'Logger.info' of <Logger MQTTEventBus (WARNING)> __init__.py:1436>
[2021-12-07 09:42:02,622] [            HABApp.Worker]    ERROR |      self.mqtt.publish = <function 'publish' mqtt_interface.py:16>
[2021-12-07 09:42:02,622] [            HABApp.Worker]    ERROR |      topic = '/messages/states/iKueche_Miele_Kuehlschrank_GefrierTuer'
[2021-12-07 09:42:02,622] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,622] [            HABApp.Worker]    ERROR |
[2021-12-07 09:42:02,623] [            HABApp.Worker]    ERROR | File "/opt/habapp/lib/python3.8/site-packages/HABApp/mqtt/mqtt_interface.py", line 34, in publish
[2021-12-07 09:42:02,623] [            HABApp.Worker]    ERROR |     16   def publish(topic: str, payload: typing.Any, qos: int = None, retain: bool = None) -> int:
[2021-12-07 09:42:02,623] [            HABApp.Worker]    ERROR |  (...)
[2021-12-07 09:42:02,623] [            HABApp.Worker]    ERROR |     30       assert isinstance(retain, bool) or retain is None, type(retain)
[2021-12-07 09:42:02,623] [            HABApp.Worker]    ERROR |     31
[2021-12-07 09:42:02,623] [            HABApp.Worker]    ERROR |     32       config = HABApp.config.CONFIG.mqtt
[2021-12-07 09:42:02,624] [            HABApp.Worker]    ERROR |     33
[2021-12-07 09:42:02,624] [            HABApp.Worker]    ERROR | --> 34       if not __is_connected():
[2021-12-07 09:42:02,624] [            HABApp.Worker]    ERROR |     35           return mqtt.MQTT_ERR_NO_CONN
[2021-12-07 09:42:02,624] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,624] [            HABApp.Worker]    ERROR |      topic = '/messages/states/iKueche_Miele_Kuehlschrank_GefrierTuer'
[2021-12-07 09:42:02,557] [            HABApp.Worker]    ERROR |      qos = None
[2021-12-07 09:42:02,625] [            HABApp.Worker]    ERROR |      retain = None
[2021-12-07 09:42:02,625] [            HABApp.Worker]    ERROR |      config = <HABApp.config._conf_mqtt.Mqtt object at 0x7f2c1e75b550>
[2021-12-07 09:42:02,625] [            HABApp.Worker]    ERROR |      HABApp.config.CONFIG.mqtt = <HABApp.config._conf_mqtt.Mqtt object at 0x7f2c1e75b550>
[2021-12-07 09:42:02,625] [            HABApp.Worker]    ERROR |      mqtt.MQTT_ERR_NO_CONN = 4
[2021-12-07 09:42:02,625] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,625] [            HABApp.Worker]    ERROR |
[2021-12-07 09:42:02,626] [            HABApp.Worker]    ERROR | File "/opt/habapp/lib/python3.8/site-packages/HABApp/mqtt/mqtt_interface.py", line 13, in __is_connected
[2021-12-07 09:42:02,626] [            HABApp.Worker]    ERROR |     10   def __is_connected() -> bool:
[2021-12-07 09:42:02,626] [            HABApp.Worker]    ERROR |     11       if STATUS.connected:
[2021-12-07 09:42:02,626] [            HABApp.Worker]    ERROR |     12           return True
[2021-12-07 09:42:02,626] [            HABApp.Worker]    ERROR | --> 13       raise ConnectionError('Mqtt client not connected')
[2021-12-07 09:42:02,626] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,626] [            HABApp.Worker]    ERROR |      STATUS.connected = False
[2021-12-07 09:42:02,627] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,627] [            HABApp.Worker]    ERROR |
[2021-12-07 09:42:02,627] [            HABApp.Worker]    ERROR | ConnectionError: Mqtt client not connected
[2021-12-07 09:42:02,627] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 1.58s. Maybe there are not enough threads?
[2021-12-07 09:42:02,534] [            HABApp.Worker]    ERROR | Error in MqttEventBus.on_item_state: Mqtt client not connected
[2021-12-07 09:42:02,644] [            HABApp.Worker]    ERROR | File "/opt/habapp/lib/python3.8/site-packages/HABApp/core/wrappedfunction.py", line 93, in __run
[2021-12-07 09:42:02,645] [            HABApp.Worker]    ERROR |     self._func(*args, **kwargs)
[2021-12-07 09:42:02,645] [            HABApp.Worker]    ERROR | File "/etc/openhab/habapp/rules/mqtt_event_bus.py", line 73, in on_item_state
[2021-12-07 09:42:02,645] [            HABApp.Worker]    ERROR |     65   def on_item_state(self, event: ItemStateEvent):
[2021-12-07 09:42:02,645] [            HABApp.Worker]    ERROR |  (...)
[2021-12-07 09:42:02,645] [            HABApp.Worker]    ERROR |     69       value = event.value
[2021-12-07 09:42:02,645] [            HABApp.Worker]    ERROR |     70
[2021-12-07 09:42:02,646] [            HABApp.Worker]    ERROR |     71       log.info(f'Published  MQTT topic {topic} with {value}')
[2021-12-07 09:42:02,646] [            HABApp.Worker]    ERROR |     72
[2021-12-07 09:42:02,646] [            HABApp.Worker]    ERROR | --> 73       self.mqtt.publish(topic, value)
[2021-12-07 09:42:02,646] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,646] [            HABApp.Worker]    ERROR |      self = </etc/openhab/habapp/rules/mqtt_event_bus.py.MqttEventBus object at 0x7f2bf32be790>
[2021-12-07 09:42:02,646] [            HABApp.Worker]    ERROR |      event = <ItemStateEvent name: iKueche_Miele_Kuehlschrank_ZielGefrierTemperatur, value: -21>
[2021-12-07 09:42:02,646] [            HABApp.Worker]    ERROR |      ItemStateEvent = <class 'HABApp.openhab.events.item_events.ItemStateEvent'>
[2021-12-07 09:42:02,647] [            HABApp.Worker]    ERROR |      value = -21
[2021-12-07 09:42:02,647] [            HABApp.Worker]    ERROR |      event.value = -21
[2021-12-07 09:42:02,647] [            HABApp.Worker]    ERROR |      log.info = <method 'Logger.info' of <Logger MQTTEventBus (WARNING)> __init__.py:1436>
[2021-12-07 09:42:02,647] [            HABApp.Worker]    ERROR |      self.mqtt.publish = <function 'publish' mqtt_interface.py:16>
[2021-12-07 09:42:02,647] [            HABApp.Worker]    ERROR |      topic = '/messages/states/iKueche_Miele_Kuehlschrank_ZielGefrierTemperatur'
[2021-12-07 09:42:02,647] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,647] [            HABApp.Worker]    ERROR |
[2021-12-07 09:42:02,648] [            HABApp.Worker]    ERROR | File "/opt/habapp/lib/python3.8/site-packages/HABApp/mqtt/mqtt_interface.py", line 34, in publish
[2021-12-07 09:42:02,648] [            HABApp.Worker]    ERROR |     16   def publish(topic: str, payload: typing.Any, qos: int = None, retain: bool = None) -> int:
[2021-12-07 09:42:02,561] [            HABApp.Worker]    ERROR | Error in MqttEventBus.on_item_state: Mqtt client not connected
[2021-12-07 09:42:02,703] [            HABApp.Worker]    ERROR | File "/opt/habapp/lib/python3.8/site-packages/HABApp/core/wrappedfunction.py", line 93, in __run
[2021-12-07 09:42:02,703] [            HABApp.Worker]    ERROR |     self._func(*args, **kwargs)
[2021-12-07 09:42:02,703] [            HABApp.Worker]    ERROR | File "/etc/openhab/habapp/rules/mqtt_event_bus.py", line 73, in on_item_state
[2021-12-07 09:42:02,703] [            HABApp.Worker]    ERROR |     65   def on_item_state(self, event: ItemStateEvent):
[2021-12-07 09:42:02,703] [            HABApp.Worker]    ERROR |  (...)
[2021-12-07 09:42:02,704] [            HABApp.Worker]    ERROR |     69       value = event.value
[2021-12-07 09:42:02,704] [            HABApp.Worker]    ERROR |     70
[2021-12-07 09:42:02,704] [            HABApp.Worker]    ERROR |     71       log.info(f'Published  MQTT topic {topic} with {value}')
[2021-12-07 09:42:02,704] [            HABApp.Worker]    ERROR |     72
[2021-12-07 09:42:02,704] [            HABApp.Worker]    ERROR | --> 73       self.mqtt.publish(topic, value)
[2021-12-07 09:42:02,704] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,619] [            HABApp.Worker]    ERROR |      self.mqtt.publish = <function 'publish' mqtt_interface.py:16>
[2021-12-07 09:42:02,705] [            HABApp.Worker]    ERROR |      topic = '/messages/states/iKueche_Miele_Kuehlschrank_ZielKuehlschrankTemperatur'
[2021-12-07 09:42:02,705] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,705] [            HABApp.Worker]    ERROR |
[2021-12-07 09:42:02,705] [            HABApp.Worker]    ERROR | File "/opt/habapp/lib/python3.8/site-packages/HABApp/mqtt/mqtt_interface.py", line 34, in publish
[2021-12-07 09:42:02,705] [            HABApp.Worker]    ERROR |     16   def publish(topic: str, payload: typing.Any, qos: int = None, retain: bool = None) -> int:
[2021-12-07 09:42:02,705] [            HABApp.Worker]    ERROR |  (...)
[2021-12-07 09:42:02,706] [            HABApp.Worker]    ERROR |     30       assert isinstance(retain, bool) or retain is None, type(retain)
[2021-12-07 09:42:02,706] [            HABApp.Worker]    ERROR |     31
[2021-12-07 09:42:02,706] [            HABApp.Worker]    ERROR |     32       config = HABApp.config.CONFIG.mqtt
[2021-12-07 09:42:02,706] [            HABApp.Worker]    ERROR |     33
[2021-12-07 09:42:02,706] [            HABApp.Worker]    ERROR | --> 34       if not __is_connected():
[2021-12-07 09:42:02,706] [            HABApp.Worker]    ERROR |     35           return mqtt.MQTT_ERR_NO_CONN
[2021-12-07 09:42:02,706] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,707] [            HABApp.Worker]    ERROR |      topic = '/messages/states/iKueche_Miele_Kuehlschrank_ZielKuehlschrankTemperatur'
[2021-12-07 09:42:02,707] [            HABApp.Worker]    ERROR |      payload = 7
[2021-12-07 09:42:02,707] [            HABApp.Worker]    ERROR |      typing.Any = typing.Any
[2021-12-07 09:42:02,707] [            HABApp.Worker]    ERROR |      qos = None
[2021-12-07 09:42:02,707] [            HABApp.Worker]    ERROR |      retain = None
[2021-12-07 09:42:02,707] [            HABApp.Worker]    ERROR |      config = <HABApp.config._conf_mqtt.Mqtt object at 0x7f2c1e75b550>
[2021-12-07 09:42:02,707] [            HABApp.Worker]    ERROR |      HABApp.config.CONFIG.mqtt = <HABApp.config._conf_mqtt.Mqtt object at 0x7f2c1e75b550>
[2021-12-07 09:42:02,707] [            HABApp.Worker]    ERROR |      mqtt.MQTT_ERR_NO_CONN = 4
[2021-12-07 09:42:02,708] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,708] [            HABApp.Worker]    ERROR |
[2021-12-07 09:42:02,708] [            HABApp.Worker]    ERROR | File "/opt/habapp/lib/python3.8/site-packages/HABApp/mqtt/mqtt_interface.py", line 13, in __is_connected
[2021-12-07 09:42:02,708] [            HABApp.Worker]    ERROR |     10   def __is_connected() -> bool:
[2021-12-07 09:42:02,708] [            HABApp.Worker]    ERROR |     11       if STATUS.connected:
[2021-12-07 09:42:02,708] [            HABApp.Worker]    ERROR |     12           return True
[2021-12-07 09:42:02,708] [            HABApp.Worker]    ERROR | --> 13       raise ConnectionError('Mqtt client not connected')
[2021-12-07 09:42:02,709] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,709] [            HABApp.Worker]    ERROR |      STATUS.connected = False
[2021-12-07 09:42:02,709] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,709] [            HABApp.Worker]    ERROR |
[2021-12-07 09:42:02,709] [            HABApp.Worker]    ERROR | ConnectionError: Mqtt client not connected
[2021-12-07 09:42:02,709] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 1.66s. Maybe there are not enough threads?
[2021-12-07 09:42:02,677] [            HABApp.Worker]    ERROR | Error in MqttEventBus.on_item_state: Mqtt client not connected
[2021-12-07 09:42:02,721] [            HABApp.Worker]    ERROR | File "/opt/habapp/lib/python3.8/site-packages/HABApp/core/wrappedfunction.py", line 93, in __run
[2021-12-07 09:42:02,721] [            HABApp.Worker]    ERROR |     self._func(*args, **kwargs)
[2021-12-07 09:42:02,721] [            HABApp.Worker]    ERROR | File "/etc/openhab/habapp/rules/mqtt_event_bus.py", line 73, in on_item_state
[2021-12-07 09:42:02,721] [            HABApp.Worker]    ERROR |     65   def on_item_state(self, event: ItemStateEvent):
[2021-12-07 09:42:02,721] [            HABApp.Worker]    ERROR |  (...)
[2021-12-07 09:42:02,722] [            HABApp.Worker]    ERROR |     69       value = event.value
[2021-12-07 09:42:02,722] [            HABApp.Worker]    ERROR |     70
[2021-12-07 09:42:02,722] [            HABApp.Worker]    ERROR |     71       log.info(f'Published  MQTT topic {topic} with {value}')
[2021-12-07 09:42:02,722] [            HABApp.Worker]    ERROR |     72
[2021-12-07 09:42:02,722] [            HABApp.Worker]    ERROR | --> 73       self.mqtt.publish(topic, value)
[2021-12-07 09:42:02,722] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,722] [            HABApp.Worker]    ERROR |      self = </etc/openhab/habapp/rules/mqtt_event_bus.py.MqttEventBus object at 0x7f2bf32be790>
[2021-12-07 09:42:02,723] [            HABApp.Worker]    ERROR |      event = <ItemStateEvent name: iBad_Miele_Trockner_Programmtyp, value: 0>
[2021-12-07 09:42:02,723] [            HABApp.Worker]    ERROR |      ItemStateEvent = <class 'HABApp.openhab.events.item_events.ItemStateEvent'>
[2021-12-07 09:42:02,723] [            HABApp.Worker]    ERROR |      value = '0'
[2021-12-07 09:42:02,723] [            HABApp.Worker]    ERROR |      event.value = '0'
[2021-12-07 09:42:02,723] [            HABApp.Worker]    ERROR |      log.info = <method 'Logger.info' of <Logger MQTTEventBus (WARNING)> __init__.py:1436>
[2021-12-07 09:42:02,723] [            HABApp.Worker]    ERROR |      self.mqtt.publish = <function 'publish' mqtt_interface.py:16>
[2021-12-07 09:42:02,723] [            HABApp.Worker]    ERROR |      topic = '/messages/states/iBad_Miele_Trockner_Programmtyp'
[2021-12-07 09:42:02,724] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,724] [            HABApp.Worker]    ERROR |
[2021-12-07 09:42:02,724] [            HABApp.Worker]    ERROR | File "/opt/habapp/lib/python3.8/site-packages/HABApp/mqtt/mqtt_interface.py", line 34, in publish
[2021-12-07 09:42:02,724] [            HABApp.Worker]    ERROR |     16   def publish(topic: str, payload: typing.Any, qos: int = None, retain: bool = None) -> int:
[2021-12-07 09:42:02,724] [            HABApp.Worker]    ERROR |  (...)
[2021-12-07 09:42:02,724] [            HABApp.Worker]    ERROR |     30       assert isinstance(retain, bool) or retain is None, type(retain)
[2021-12-07 09:42:02,724] [            HABApp.Worker]    ERROR |     31
[2021-12-07 09:42:02,725] [            HABApp.Worker]    ERROR |     32       config = HABApp.config.CONFIG.mqtt
[2021-12-07 09:42:02,725] [            HABApp.Worker]    ERROR |     33
[2021-12-07 09:42:02,725] [            HABApp.Worker]    ERROR | --> 34       if not __is_connected():
[2021-12-07 09:42:02,648] [            HABApp.Worker]    ERROR |  (...)
[2021-12-07 09:42:02,740] [            HABApp.Worker]    ERROR |     30       assert isinstance(retain, bool) or retain is None, type(retain)
[2021-12-07 09:42:02,740] [            HABApp.Worker]    ERROR |     31
[2021-12-07 09:42:02,740] [            HABApp.Worker]    ERROR |     32       config = HABApp.config.CONFIG.mqtt
[2021-12-07 09:42:02,740] [            HABApp.Worker]    ERROR |     33
[2021-12-07 09:42:02,741] [            HABApp.Worker]    ERROR | --> 34       if not __is_connected():
[2021-12-07 09:42:02,741] [            HABApp.Worker]    ERROR |     35           return mqtt.MQTT_ERR_NO_CONN
[2021-12-07 09:42:02,741] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,741] [            HABApp.Worker]    ERROR |      topic = '/messages/states/iKueche_Miele_Kuehlschrank_ZielGefrierTemperatur'
[2021-12-07 09:42:02,741] [            HABApp.Worker]    ERROR |      payload = -21
[2021-12-07 09:42:02,741] [            HABApp.Worker]    ERROR |      typing.Any = typing.Any
[2021-12-07 09:42:02,742] [            HABApp.Worker]    ERROR |      qos = None
[2021-12-07 09:42:02,742] [            HABApp.Worker]    ERROR |      retain = None
[2021-12-07 09:42:02,742] [            HABApp.Worker]    ERROR |      config = <HABApp.config._conf_mqtt.Mqtt object at 0x7f2c1e75b550>
[2021-12-07 09:42:02,742] [            HABApp.Worker]    ERROR |      HABApp.config.CONFIG.mqtt = <HABApp.config._conf_mqtt.Mqtt object at 0x7f2c1e75b550>
[2021-12-07 09:42:02,742] [            HABApp.Worker]    ERROR |      mqtt.MQTT_ERR_NO_CONN = 4
[2021-12-07 09:42:02,742] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,742] [            HABApp.Worker]    ERROR |
[2021-12-07 09:42:02,742] [            HABApp.Worker]    ERROR | File "/opt/habapp/lib/python3.8/site-packages/HABApp/mqtt/mqtt_interface.py", line 13, in __is_connected
[2021-12-07 09:42:02,743] [            HABApp.Worker]    ERROR |     10   def __is_connected() -> bool:
[2021-12-07 09:42:02,743] [            HABApp.Worker]    ERROR |     11       if STATUS.connected:
[2021-12-07 09:42:02,743] [            HABApp.Worker]    ERROR |     12           return True
[2021-12-07 09:42:02,743] [            HABApp.Worker]    ERROR | --> 13       raise ConnectionError('Mqtt client not connected')
[2021-12-07 09:42:02,743] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,743] [            HABApp.Worker]    ERROR |      STATUS.connected = False
[2021-12-07 09:42:02,744] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,744] [            HABApp.Worker]    ERROR |
[2021-12-07 09:42:02,744] [            HABApp.Worker]    ERROR | ConnectionError: Mqtt client not connected
[2021-12-07 09:42:02,744] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 1.70s. Maybe there are not enough threads?
[2021-12-07 09:42:02,704] [            HABApp.Worker]    ERROR |      self = </etc/openhab/habapp/rules/mqtt_event_bus.py.MqttEventBus object at 0x7f2bf32be790>
[2021-12-07 09:42:02,755] [            HABApp.Worker]    ERROR |      event = <ItemStateEvent name: iKueche_Miele_Kuehlschrank_AktGefrierTemperatur, value: -21>
[2021-12-07 09:42:02,755] [            HABApp.Worker]    ERROR |      ItemStateEvent = <class 'HABApp.openhab.events.item_events.ItemStateEvent'>
[2021-12-07 09:42:02,756] [            HABApp.Worker]    ERROR |      value = -21
[2021-12-07 09:42:02,756] [            HABApp.Worker]    ERROR |      event.value = -21
[2021-12-07 09:42:02,756] [            HABApp.Worker]    ERROR |      log.info = <method 'Logger.info' of <Logger MQTTEventBus (WARNING)> __init__.py:1436>
[2021-12-07 09:42:02,756] [            HABApp.Worker]    ERROR |      self.mqtt.publish = <function 'publish' mqtt_interface.py:16>
[2021-12-07 09:42:02,756] [            HABApp.Worker]    ERROR |      topic = '/messages/states/iKueche_Miele_Kuehlschrank_AktGefrierTemperatur'
[2021-12-07 09:42:02,756] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,757] [            HABApp.Worker]    ERROR |
[2021-12-07 09:42:02,757] [            HABApp.Worker]    ERROR | File "/opt/habapp/lib/python3.8/site-packages/HABApp/mqtt/mqtt_interface.py", line 34, in publish
[2021-12-07 09:42:02,757] [            HABApp.Worker]    ERROR |     16   def publish(topic: str, payload: typing.Any, qos: int = None, retain: bool = None) -> int:
[2021-12-07 09:42:02,757] [            HABApp.Worker]    ERROR |  (...)
[2021-12-07 09:42:02,757] [            HABApp.Worker]    ERROR |     30       assert isinstance(retain, bool) or retain is None, type(retain)
[2021-12-07 09:42:02,757] [            HABApp.Worker]    ERROR |     31
[2021-12-07 09:42:02,757] [            HABApp.Worker]    ERROR |     32       config = HABApp.config.CONFIG.mqtt
[2021-12-07 09:42:02,758] [            HABApp.Worker]    ERROR |     33
[2021-12-07 09:42:02,758] [            HABApp.Worker]    ERROR | --> 34       if not __is_connected():
[2021-12-07 09:42:02,758] [            HABApp.Worker]    ERROR |     35           return mqtt.MQTT_ERR_NO_CONN
[2021-12-07 09:42:02,758] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,758] [            HABApp.Worker]    ERROR |      topic = '/messages/states/iKueche_Miele_Kuehlschrank_AktGefrierTemperatur'
[2021-12-07 09:42:02,758] [            HABApp.Worker]    ERROR |      payload = -21
[2021-12-07 09:42:02,758] [            HABApp.Worker]    ERROR |      typing.Any = typing.Any
[2021-12-07 09:42:02,758] [            HABApp.Worker]    ERROR |      qos = None
[2021-12-07 09:42:02,759] [            HABApp.Worker]    ERROR |      retain = None
[2021-12-07 09:42:02,759] [            HABApp.Worker]    ERROR |      config = <HABApp.config._conf_mqtt.Mqtt object at 0x7f2c1e75b550>
[2021-12-07 09:42:02,759] [            HABApp.Worker]    ERROR |      HABApp.config.CONFIG.mqtt = <HABApp.config._conf_mqtt.Mqtt object at 0x7f2c1e75b550>
[2021-12-07 09:42:02,759] [            HABApp.Worker]    ERROR |      mqtt.MQTT_ERR_NO_CONN = 4
[2021-12-07 09:42:02,759] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,759] [            HABApp.Worker]    ERROR |
[2021-12-07 09:42:02,759] [            HABApp.Worker]    ERROR | File "/opt/habapp/lib/python3.8/site-packages/HABApp/mqtt/mqtt_interface.py", line 13, in __is_connected
[2021-12-07 09:42:02,760] [            HABApp.Worker]    ERROR |     10   def __is_connected() -> bool:
[2021-12-07 09:42:02,760] [            HABApp.Worker]    ERROR |     11       if STATUS.connected:
[2021-12-07 09:42:02,760] [            HABApp.Worker]    ERROR |     12           return True
[2021-12-07 09:42:02,760] [            HABApp.Worker]    ERROR | --> 13       raise ConnectionError('Mqtt client not connected')
[2021-12-07 09:42:02,699] [            HABApp.Worker]    ERROR | Error in MqttEventBus.on_item_state: Mqtt client not connected
[2021-12-07 09:42:02,802] [            HABApp.Worker]    ERROR | File "/opt/habapp/lib/python3.8/site-packages/HABApp/core/wrappedfunction.py", line 93, in __run
[2021-12-07 09:42:02,802] [            HABApp.Worker]    ERROR |     self._func(*args, **kwargs)
[2021-12-07 09:42:02,802] [            HABApp.Worker]    ERROR | File "/etc/openhab/habapp/rules/mqtt_event_bus.py", line 73, in on_item_state
[2021-12-07 09:42:02,802] [            HABApp.Worker]    ERROR |     65   def on_item_state(self, event: ItemStateEvent):
[2021-12-07 09:42:02,802] [            HABApp.Worker]    ERROR |  (...)
[2021-12-07 09:42:02,803] [            HABApp.Worker]    ERROR |     69       value = event.value
[2021-12-07 09:42:02,803] [            HABApp.Worker]    ERROR |     70
[2021-12-07 09:42:02,803] [            HABApp.Worker]    ERROR |     71       log.info(f'Published  MQTT topic {topic} with {value}')
[2021-12-07 09:42:02,803] [            HABApp.Worker]    ERROR |     72
[2021-12-07 09:42:02,803] [            HABApp.Worker]    ERROR | --> 73       self.mqtt.publish(topic, value)
[2021-12-07 09:42:02,803] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,803] [            HABApp.Worker]    ERROR |      self = </etc/openhab/habapp/rules/mqtt_event_bus.py.MqttEventBus object at 0x7f2bf32be790>
[2021-12-07 09:42:02,803] [            HABApp.Worker]    ERROR |      event = <ItemStateEvent name: iBad_Miele_Trockner_Programm, value: 0>
[2021-12-07 09:42:02,804] [            HABApp.Worker]    ERROR |      ItemStateEvent = <class 'HABApp.openhab.events.item_events.ItemStateEvent'>
[2021-12-07 09:42:02,804] [            HABApp.Worker]    ERROR |      value = '0'
[2021-12-07 09:42:02,804] [            HABApp.Worker]    ERROR |      event.value = '0'
[2021-12-07 09:42:02,804] [            HABApp.Worker]    ERROR |      log.info = <method 'Logger.info' of <Logger MQTTEventBus (WARNING)> __init__.py:1436>
[2021-12-07 09:42:02,804] [            HABApp.Worker]    ERROR |      self.mqtt.publish = <function 'publish' mqtt_interface.py:16>
[2021-12-07 09:42:02,804] [            HABApp.Worker]    ERROR |      topic = '/messages/states/iBad_Miele_Trockner_Programm'
[2021-12-07 09:42:02,804] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,624] [            HABApp.Worker]    ERROR |      payload = 'CLOSED'
[2021-12-07 09:42:02,813] [            HABApp.Worker]    ERROR |      typing.Any = typing.Any
[2021-12-07 09:42:02,813] [            HABApp.Worker]    ERROR |      qos = None
[2021-12-07 09:42:02,813] [            HABApp.Worker]    ERROR |      retain = None
[2021-12-07 09:42:02,813] [            HABApp.Worker]    ERROR |      config = <HABApp.config._conf_mqtt.Mqtt object at 0x7f2c1e75b550>
[2021-12-07 09:42:02,814] [            HABApp.Worker]    ERROR |      HABApp.config.CONFIG.mqtt = <HABApp.config._conf_mqtt.Mqtt object at 0x7f2c1e75b550>
[2021-12-07 09:42:02,814] [            HABApp.Worker]    ERROR |      mqtt.MQTT_ERR_NO_CONN = 4
[2021-12-07 09:42:02,814] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,814] [            HABApp.Worker]    ERROR |
[2021-12-07 09:42:02,814] [            HABApp.Worker]    ERROR | File "/opt/habapp/lib/python3.8/site-packages/HABApp/mqtt/mqtt_interface.py", line 13, in __is_connected
[2021-12-07 09:42:02,814] [            HABApp.Worker]    ERROR |     10   def __is_connected() -> bool:
[2021-12-07 09:42:02,814] [            HABApp.Worker]    ERROR |     11       if STATUS.connected:
[2021-12-07 09:42:02,815] [            HABApp.Worker]    ERROR |     12           return True
[2021-12-07 09:42:02,815] [            HABApp.Worker]    ERROR | --> 13       raise ConnectionError('Mqtt client not connected')
[2021-12-07 09:42:02,815] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,815] [            HABApp.Worker]    ERROR |      STATUS.connected = False
[2021-12-07 09:42:02,815] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 09:42:02,815] [            HABApp.Worker]    ERROR |
[2021-12-07 09:42:02,815] [            HABApp.Worker]    ERROR | ConnectionError: Mqtt client not connected

Does it connect without the rules?

Of course. With the rule it is also connected but not the whole time.

As example if I run mosquitto_sub -v -h localhost -p 1883 -t '#' I will get:

/messages/states/nao1_OnlineState OFF
/messages/states/nao1_ResponseTime (null)
/messages/states/OpenHABVM_Swap_Used 2
/messages/states/OpenHABVM_Swap_Available 1424
/messages/states/OpenHABVM_Swap_Available_Percent 99.9
/messages/states/OpenHABVM_Memory_Available_Percent 50.6
/messages/states/OpenHABVM_Sensor_CPUTemp (null)
/messages/states/OpenHABVM_Storage_Available 21293
/messages/states/OpenHABVM_Storage_Used_Percent 57.6
/messages/states/OpenHABVM_Process_used (null)
/messages/states/OpenHABVM_Memory_Used_Percent 49.4
/messages/states/OpenHABVM_Process_load (null)
/messages/states/OpenHABVM_Swap_Used_Percent 0.1
/messages/states/OpenHABVM_Storage_Used 28974
/messages/states/OpenHABVM_Memory_Used 1943
/messages/states/OpenHABVM_Memory_Available 1993
/messages/states/OpenHABVM_Storage_Available_Percent 42.4
/messages/states/pepper02_OnlineState OFF
/messages/states/pepper02_ResponseTime (null)
/messages/states/OpenHABVM_Swap_Used 2
/messages/states/OpenHABVM_Swap_Available 1424
/messages/states/OpenHABVM_Swap_Available_Percent 99.9
/messages/states/OpenHABVM_Memory_Available_Percent 50.6
/messages/states/OpenHABVM_Sensor_CPUTemp (null)
/messages/states/OpenHABVM_Storage_Available 21293
/messages/states/OpenHABVM_Storage_Used_Percent 57.6
/messages/states/OpenHABVM_Process_used (null)
/messages/states/OpenHABVM_Memory_Used_Percent 49.4
/messages/states/OpenHABVM_Process_load (null)
/messages/states/OpenHABVM_Swap_Used_Percent 0.1
/messages/states/OpenHABVM_Storage_Used 28974
/messages/states/OpenHABVM_Memory_Used 1943
/messages/states/OpenHABVM_Memory_Available 1994
/messages/states/OpenHABVM_Storage_Available_Percent 42.4

...

So the rules are working, too. They can publish and subscribe. The only problem is that not all the time because too much topics. I know that mosquitto broker can do that. With the MQTT 1.x binding we also had about 3000+ topics running at the same time.

So then a few topics go again and again until the limit is reached. This means that I have to press a switch for a lamp a second time and then it works.

Maybe something basic towards thread pools from HABApp.

Hm - strange. Maybe it’s too much traffic and the connection congests.
What happens if you don’t subscribe to ‘#’ but only the topics of the state or command?

What do you mean? Where should I set this? In the config.yml of HABApp? Or maybe in my code above?

Yes - unter mqtt → subscribe → topics there is ‘#’ by default.
You can reduce the topics to the topics from the config just to see if this solves the issue.

Edit:
I ran the HABApp Benchmark and on mqtt HABApp can process ~ 1700msg/sec on my machine.
If you have more than that is most likely the issue.

I changed following: https://github.com/eclipse/paho.mqtt.python/commit/332834e7ec293f1bfe385180b8691863af5a7738

The same problem with loosing the connection remains.

Partially I could also run openHAB Rules via MQTT, where then send_command or post_update are executed. That would work in principle. These are stuck due to the fact that not all states or commands are sent at the moment when they actually change. I mean you can see that the MQTT connection is only gone for a few milliseconds, but that apparently already disturbs the system.

I changed following in the HABApp mqtt_connection.py

@log_exception
def on_disconnect(client, userdata, rc):
    log.log(logging.INFO if not rc else logging.ERROR, f'Disconnect: {mqtt.error_string(rc)} ({rc})')
    STATUS.connected = False
    client.reconnect()

It does not have any effect.

[2021-12-07 15:37:33,516] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 0.19s. Maybe there are not enough threads?
[2021-12-07 15:37:33,524] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 0.19s. Maybe there are not enough threads?
[2021-12-07 15:37:33,524] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 0.19s. Maybe there are not enough threads?
[2021-12-07 15:37:33,524] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 0.19s. Maybe there are not enough threads?
[2021-12-07 15:37:33,524] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 0.19s. Maybe there are not enough threads?
[2021-12-07 15:37:33,525] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 0.19s. Maybe there are not enough threads?
[2021-12-07 15:37:33,525] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 0.19s. Maybe there are not enough threads?
[2021-12-07 15:37:33,525] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 0.19s. Maybe there are not enough threads?
[2021-12-07 15:37:33,525] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 0.19s. Maybe there are not enough threads?
[2021-12-07 15:37:33,525] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 0.19s. Maybe there are not enough threads?
[2021-12-07 15:37:33,525] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 0.19s. Maybe there are not enough threads?
[2021-12-07 15:37:33,526] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 0.19s. Maybe there are not enough threads?
[2021-12-07 15:37:33,526] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 0.19s. Maybe there are not enough threads?
[2021-12-07 15:37:33,526] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 0.19s. Maybe there are not enough threads?
[2021-12-07 15:37:33,526] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 0.19s. Maybe there are not enough threads?
[2021-12-07 15:37:33,526] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 0.19s. Maybe there are not enough threads?
[2021-12-07 15:37:33,526] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 0.19s. Maybe there are not enough threads?
[2021-12-07 15:37:33,526] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 0.19s. Maybe there are not enough threads?
[2021-12-07 15:37:33,527] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 0.19s. Maybe there are not enough threads?
[2021-12-07 15:37:33,527] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 0.12s. Maybe there are not enough threads?
[2021-12-07 15:37:33,527] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 0.12s. Maybe there are not enough threads?
[2021-12-07 15:37:33,527] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 0.12s. Maybe there are not enough threads?
[2021-12-07 15:37:33,527] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 0.10s. Maybe there are not enough threads?
[2021-12-07 15:37:33,527] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 0.10s. Maybe there are not enough threads?
[2021-12-07 15:37:33,501] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 0.17s. Maybe there are not enough threads?
[2021-12-07 15:37:33,516] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 0.19s. Maybe there are not enough threads?
[2021-12-07 15:37:33,516] [            HABApp.Worker]  WARNING | Starting of MqttEventBus.on_item_state took too long: 0.19s. Maybe there are not enough threads?
[2021-12-07 15:37:33,520] [            HABApp.Worker]    ERROR |      config = <HABApp.config._conf_mqtt.Mqtt object at 0x7fdf2e85f550>
[2021-12-07 15:37:33,530] [            HABApp.Worker]    ERROR |      HABApp.config.CONFIG.mqtt = <HABApp.config._conf_mqtt.Mqtt object at 0x7fdf2e85f550>
[2021-12-07 15:37:33,530] [            HABApp.Worker]    ERROR |      mqtt.MQTT_ERR_NO_CONN = 4
[2021-12-07 15:37:33,531] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 15:37:33,531] [            HABApp.Worker]    ERROR |
[2021-12-07 15:37:33,531] [            HABApp.Worker]    ERROR | File "/opt/habapp/lib/python3.8/site-packages/HABApp/mqtt/mqtt_interface.py", line 13, in __is_connected
[2021-12-07 15:37:33,531] [            HABApp.Worker]    ERROR |     10   def __is_connected() -> bool:
[2021-12-07 15:37:33,531] [            HABApp.Worker]    ERROR |     11       if STATUS.connected:
[2021-12-07 15:37:33,531] [            HABApp.Worker]    ERROR |     12           return True
[2021-12-07 15:37:33,523] [            HABApp.Worker]    ERROR |     70
[2021-12-07 15:37:33,533] [            HABApp.Worker]    ERROR |     71       log.info(f'Published  MQTT topic {topic} with {value}')
[2021-12-07 15:37:33,531] [            HABApp.Worker]    ERROR | --> 13       raise ConnectionError('Mqtt client not connected')
[2021-12-07 15:37:33,535] [            HABApp.Worker]    ERROR |     72
[2021-12-07 15:37:33,536] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 15:37:33,536] [            HABApp.Worker]    ERROR | --> 73       self.mqtt.publish(topic, str(value))
[2021-12-07 15:37:33,539] [            HABApp.Worker]    ERROR |      STATUS.connected = False
[2021-12-07 15:37:33,540] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 15:37:33,545] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 15:37:33,555] [            HABApp.Worker]    ERROR |      self = </etc/openhab/habapp/rules/mqtt_event_bus.py.MqttEventBus object at 0x7fdf03df1160>
[2021-12-07 15:37:33,563] [            HABApp.Worker]    ERROR |
[2021-12-07 15:37:33,565] [            HABApp.Worker]    ERROR |      event = <ItemStateEvent name: OpenHABVM_Memory_Used, value: 2074>
[2021-12-07 15:37:33,566] [            HABApp.Worker]    ERROR | ConnectionError: Mqtt client not connected
[2021-12-07 15:37:33,567] [            HABApp.Worker]    ERROR |      ItemStateEvent = <class 'HABApp.openhab.events.item_events.ItemStateEvent'>
[2021-12-07 15:37:33,570] [            HABApp.Worker]    ERROR |      value = 2074
[2021-12-07 15:37:33,571] [            HABApp.Worker]    ERROR |      event.value = 2074
[2021-12-07 15:37:33,573] [            HABApp.Worker]    ERROR |      log.info = <method 'Logger.info' of <Logger MQTTEventBus (WARNING)> __init__.py:1436>
[2021-12-07 15:37:33,573] [            HABApp.Worker]    ERROR |      self.mqtt.publish = <function 'publish' mqtt_interface.py:16>
[2021-12-07 15:37:33,573] [            HABApp.Worker]    ERROR |      topic = '/messages/states/OpenHABVM_Memory_Used'
[2021-12-07 15:37:33,573] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 15:37:33,573] [            HABApp.Worker]    ERROR |
[2021-12-07 15:37:33,573] [            HABApp.Worker]    ERROR | File "/opt/habapp/lib/python3.8/site-packages/HABApp/mqtt/mqtt_interface.py", line 34, in publish
[2021-12-07 15:37:33,573] [            HABApp.Worker]    ERROR |     16   def publish(topic: str, payload: typing.Any, qos: int = None, retain: bool = None) -> int:
[2021-12-07 15:37:33,573] [            HABApp.Worker]    ERROR |  (...)
[2021-12-07 15:37:33,573] [            HABApp.Worker]    ERROR |     30       assert isinstance(retain, bool) or retain is None, type(retain)
[2021-12-07 15:37:33,573] [            HABApp.Worker]    ERROR |     31
[2021-12-07 15:37:33,573] [            HABApp.Worker]    ERROR |     32       config = HABApp.config.CONFIG.mqtt
[2021-12-07 15:37:33,573] [            HABApp.Worker]    ERROR |     33
[2021-12-07 15:37:33,573] [            HABApp.Worker]    ERROR | --> 34       if not __is_connected():
[2021-12-07 15:37:33,574] [            HABApp.Worker]    ERROR |     35           return mqtt.MQTT_ERR_NO_CONN
[2021-12-07 15:37:33,574] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 15:37:33,574] [            HABApp.Worker]    ERROR |      topic = '/messages/states/OpenHABVM_Memory_Used'
[2021-12-07 15:37:33,574] [            HABApp.Worker]    ERROR |      payload = '2074'
[2021-12-07 15:37:33,574] [            HABApp.Worker]    ERROR |      typing.Any = typing.Any
[2021-12-07 15:37:33,574] [            HABApp.Worker]    ERROR |      qos = None
[2021-12-07 15:37:33,574] [            HABApp.Worker]    ERROR |      retain = None
[2021-12-07 15:37:33,574] [            HABApp.Worker]    ERROR |      config = <HABApp.config._conf_mqtt.Mqtt object at 0x7fdf2e85f550>
[2021-12-07 15:37:33,574] [            HABApp.Worker]    ERROR |      HABApp.config.CONFIG.mqtt = <HABApp.config._conf_mqtt.Mqtt object at 0x7fdf2e85f550>
[2021-12-07 15:37:33,574] [            HABApp.Worker]    ERROR |      mqtt.MQTT_ERR_NO_CONN = 4
[2021-12-07 15:37:33,574] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 15:37:33,574] [            HABApp.Worker]    ERROR |
[2021-12-07 15:37:33,574] [            HABApp.Worker]    ERROR | File "/opt/habapp/lib/python3.8/site-packages/HABApp/mqtt/mqtt_interface.py", line 13, in __is_connected
[2021-12-07 15:37:33,574] [            HABApp.Worker]    ERROR |     10   def __is_connected() -> bool:
[2021-12-07 15:37:33,575] [            HABApp.Worker]    ERROR |     11       if STATUS.connected:
[2021-12-07 15:37:33,575] [            HABApp.Worker]    ERROR |     12           return True
[2021-12-07 15:37:33,575] [            HABApp.Worker]    ERROR | --> 13       raise ConnectionError('Mqtt client not connected')
[2021-12-07 15:37:33,575] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 15:37:33,575] [            HABApp.Worker]    ERROR |      STATUS.connected = False
[2021-12-07 15:37:33,575] [            HABApp.Worker]    ERROR |     ..................................................
[2021-12-07 15:37:33,575] [            HABApp.Worker]    ERROR |
[2021-12-07 15:37:33,575] [            HABApp.Worker]    ERROR | ConnectionError: Mqtt client not connected
[2021-12-07 15:37:37,276] [HABApp.openhab.connection]  WARNING | Status 404 for PUT http://localhost:8080/rest/items/HABApp_Ping/state/ NULL

I still can’t wrap my head around the issue and feel if there is something missing.
You are using 3.1 with basic auth as described in the docs, right?
How much messages do you want to process per second?

The error shows like this:
You get many messages, then suddenly the on_item_state took too long messages start to pile up and then you get the disconnect. Is that correct?


The messages that get sent while HABApp is disconnected are obviously lost. But after the connect everything should work again and this seems to be the case from the logfile.

I tested it with openHAB 2 and openHAB 3. Both in the “newest” version except of course openHAB 2 and openHAB 3. Both master’s (openHAB 2 and openHAB 3) are on a Ubuntu 20.04 VM with Python 3.8. The slaves are Raspberry Pi’s with openhabian and Python 3.7. HABApp is in the current version (0.31.1).

How much messages I want to process is a good question. Is all messages a good answer? Let me give another example I tested:

import logging

import HABApp
from HABApp import Parameter
from HABApp.core.events import ValueChangeEvent, ValueUpdateEvent
from HABApp.mqtt.items import MqttItem
from HABApp.openhab.events import ItemCommandEvent, ItemStateEvent
from HABApp.openhab.items import OpenhabItem

log = logging.getLogger('MQTTEventBus')

# These are the configuration values that will be used to setup the MqttEventBus
log_state = Parameter('mqtt_event_bus', 'log_state', default_value=True).value
statePublishTopic = Parameter(
    'mqtt_event_bus', 'statePublishTopic', default_value='').value
commandPublishTopic = Parameter(
    'mqtt_event_bus', 'commandPublishTopic', default_value='').value
stateSubscribeTopic = Parameter(
    'mqtt_event_bus', 'stateSubscribeTopic', default_value='').value
commandSubscribeTopic = Parameter(
    'mqtt_event_bus', 'commandSubscribeTopic', default_value='').value


class MqttEventBus(HABApp.Rule):
    def __init__(self):
        super().__init__()

        for item in self.get_items(type=OpenhabItem):
            self.openhab.post_update(item.name, item.get_value())
            self.openhab.send_command(item.name, item.get_value())
            if statePublishTopic != '':
                item.listen_event(self.on_item_state, ItemStateEvent)

            if commandPublishTopic != '':
                item.listen_event(self.on_item_command, ItemCommandEvent)

            if commandSubscribeTopic != '':
                topic_command = commandSubscribeTopic.replace(
                    "${item}", str(item.name))

                mqtt_item_command = MqttItem.get_create_item(
                    f'{topic_command}')
                mqtt_item_command.listen_event(
                    self.on_mqtt_command, ValueUpdateEvent)

            if stateSubscribeTopic != '':
                topic_state = stateSubscribeTopic.replace(
                    "${item}", str(item.name))

                mqtt_item_state = MqttItem.get_create_item(f'{topic_state}')
                mqtt_item_state.listen_event(
                    self.on_mqtt_state, ValueUpdateEvent)

    def on_mqtt_command(self, event):
        assert isinstance(event, ValueUpdateEvent)

        name = event.name
        itemPosition = commandSubscribeTopic.split("${item}")[0].count("/")

        item = name.split("/")[itemPosition]
        cmd = event.value

        log.info(f'Subscribed MQTT topic {event.name} with {cmd}')
        log.info(f'{item} predicted to become {cmd}')

        self.openhab.send_command(item, cmd)

    def on_item_state(self, event: ItemStateEvent):
        topicString = statePublishTopic.replace(
            "${item}", event.name)
        topic = f'{topicString}'
        value = event.value

        log.info(f'Published  MQTT topic {topic} with {value}')

        self.mqtt.publish(topic, str(value), true)

    def on_item_command(self, event: ItemCommandEvent):
        topicString = commandPublishTopic.replace(
            "${item}", event.name)
        topic = f'{topicString}'
        value = event.value

        log.info(f'Published  MQTT topic {topic} with {value}')

        self.mqtt.publish(topic, str(value))

    def on_mqtt_state(self, event):
        assert isinstance(event, ValueUpdateEvent)

        name = event.name
        itemPosition = stateSubscribeTopic.split("${item}")[0].count("/")

        item = name.split("/")[itemPosition]
        state = event.value

        log.info(f'Subscribed MQTT topic {name} with {state}')
        self.openhab.post_update(item, state)


class LogItemStateRule(HABApp.Rule):
    """This rule logs the item state in the mqtt event bus log file"""

    def __init__(self):
        super().__init__()

        for item in self.get_items(type=OpenhabItem):
            item.listen_event(self.on_item_change, ValueChangeEvent)

    def on_item_change(self, event):
        assert isinstance(event, ValueChangeEvent)
        log.info(f'{event.name} changed from {event.old_value} to {event.value}')


MqttEventBus()

# Create logger rule only if configured
if log_state:
    LogItemStateRule()

One change was that the states will be published as retained message:

self.mqtt.publish(topic, str(value), true)

What I later changed and what is the point with how many topic is thate I added following:

        for item in self.get_items(type=OpenhabItem):
            self.openhab.post_update(item.name, item.get_value())
            self.openhab.send_command(item.name, item.get_value())

Well let me explain my thoughts. The mqtt publisher will publish if an item or an command receives and update. After starting openHAB and/or HABApp not all items will change their states or will receive an update. So maybe there are many item states NULL or UNKNOWN. By using this the state will not be changed but it can be triggered for MQTT. However, the fact is that this works for State but not for Command.

I counted and recorded 4402 topics yesterday. Let’s say pi times thumb that so about 3800 are the States from the Master. Then I have only about 200 Command Topics on the opposite side.

It is clear that the commands are not called the same as the states and that one would have to come up with something else for this.

As I said, I have rules running that work with the MQTT 1.x binding, but not with this Event Bus solution and all others from MQTT 2.x+ on. The rules always work only partially. The reason is that send_command and post_update do not distribute the changes reliably. At least not comparable to the MQTT 1.x binding.

My workaround is also to distribute all states and commands first after startup, so that all topics are known. It actually works that the slave takes over all states from the master. They are partly occupied with Null or Unknown. But not only. openHAB has partly already obtained the states of items via the Things before HABApp has loaded MQTT under certain circumstances. As a rule, starting openHAB takes longer than starting HABApp. For test purposes, I have partly restarted HABApp during operation. Otherwise items that have not changed for a long time would not be detected and the slave would not know their states. But this could be needed for some rules. Another scenario that I have is that I use a persistence. So if after the start directly a value from a database is set and this should not change for a longer time, the same problem would occur.

I think this can be understood and hope it was explained understandably.

I have noticed that if it does not know a topic, it does not execute the send_command of a rule and the topic does not show up in a scan with MQTT.fx.

In expectation, if there are approx. 3800 State topics, would be yes that there would be so approx. 3800 command topics. This is not quite true, because there are String items, Number items etc… How many command topics there would have to be, I don’t know. What I do know, for example, is that there is not a command topic for all lamps. And since they all have a switch, there should be at least all lamp items at the end.

When I look at this more closely, I can certainly break this down to very many other things in the same way. It would be enough for me that a topic is only created if it is created by a state or command. But apparently the rules don’t work like that.

One thing that I have noticed again and again is that individual state changes are not sent. Then only one of them has the right state. This always happens when paho-mqtt has to reconnect at the wrong moment. With a low load, where I tested only one item, I could change and receive the state 100x and also change and receive the command 100x. It doesn’t complain about out of memory or that there is a disconnect or too few threads etc…

Phew, sorry for the long explanation.

You misread. :slight_smile: I asked how many messages per second you want to process.
Because too many messages per second will lead to congestion and it’s possible that the pi
is not beefy enough to process them all.

How about the following workaround which publishes all item states at startup in chunks of 50 items?
That way the startup should be more graceful.

class MqttEventBus(HABApp.Rule):
    def __init__(self):
        super().__init__()

        all_items = self.get_items(type=OpenhabItem)

        for item in all_items:
            ...

        # publish current item state in chunks
        chunk_size = 50
        chunk_nr = 0
        chunk = all_items[chunk_nr * chunk_size:(chunk_nr + 1) * chunk_size]
        while chunk:
            self.run.at(chunk_nr + 1, self._publish_current_state, chunk)
            chunk_nr += 1
            chunk = all_items[chunk_nr * chunk_size:(chunk_nr + 1) * chunk_size]

    def _publish_current_state(self, items: List[OpenhabItem]):
        for item in items:
            topic = statePublishTopic.replace("${item}", item.name)
            self.mqtt.publish(topic, str(item.value))