Python Example of Subscribing to openHAB's SSE Feed

I’m doing a rewrite of my sensorReporter script (Note: link is to my work in progress branch), applying all the lessons I’ve learned in how to write Python over the years. sensorReporter has both an MQTT Connection and a REST Connection. The MQTT Connection has always been two directional (i.e. can publish sensor readings and receive command messages) but the REST Connection was only able to publish. As part of my rewrite I’ve updated the REST Connection to be two directional, able to subscribe to the openHAB Server-Sent Events (SSE) feed and publish to the REST API endpoints.

I figured there would be others building external applications that want to integrate through openHAB’s REST API so here is how I did it.

Requirements

The overall use case for sensorReporter is to have a lightweight service running on a machine separate from the one where openHAB is running. This service provides extensions I call capabilities that can be enabled. There are three types of capabilities:

  • Sensors: call a script, poll a GPIO pin, read a DHT11 sensor, etc. and publish the readings
  • Actuators: receive a message and perform some action (send a GPIO pin to high or low, call a script, etc), publishing the result
  • Connections: provide two way communication between sensorReporter and external clients, namely openHAB; currently supports MQTT and now openHAB’s REST API.

Eventually I plan on implementing Homie for the MQTT but for now MQTT would be used for integration with external non-openHAB services (or openHAB if you prefer).

The openHAB REST API Connection imposes some restrictions for simplicity:

  • all sensor readings and actuator results are posted as updates to configured Items using the REST API Items endpoint.
  • all Actuators only listen for commands on configured Items.

Example round trip data flow

  1. openHAB issues a command to Item Test_Actuator_CMD
  2. sensorReporter has an Actuator that is listening for commands to that Item. The string contents of the command are passed to the Actuator.
  3. The Actuator performs its action based on the contents of the command.
  4. The Actuator publishes the results of the action to Test_Actuator_Results

How it Works

This example is written in Python 3. For publishing Item state updates the requests library is used to issue PUT commands to http://<address of openHAB>:<port of openHAB>/rest/items/<Item Name>/state. For subscribing we use openHAB’s SSE endpoint http://<address of openHAB>:<port>/rest/events.

Publishing

    def publish(self, state, item):
        """Publishes the passed in state to the passed in Item as an update."""
        try:
            log.debug("Publishing message %s to %s", state, item)
            response = requests.put("{}/rest/items/{}/state"
                                    .format(self.openhab_url, item),
                                    data=state, timeout=10)
            response.raise_for_status()
        # requests raises its own ConnectionError, not the built-in one.
        except requests.exceptions.ConnectionError:
            log.error("Failed to connect to %s\n%s", self.openhab_url,
                      traceback.format_exc())
        except requests.exceptions.Timeout:
            log.error("Timed out connecting to %s", self.openhab_url)
        except requests.exceptions.HTTPError as ex:
            log.error("Received an unsuccessful response code %s", ex)

It couldn’t be more straightforward. Issue a PUT to the right URL with the passed in state as the data. Check to make sure that the request was successful and log an error if it wasn’t.

Exercise for the reader: add message caching if it fails with a Timeout or ConnectionError and send them when the connection is restored.
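
One possible starting point for that exercise, as a minimal sketch and not something sensorReporter does today: keep updates in a bounded queue and send them oldest first, stopping at the first failure. The names `CachingPublisher`, `send` and `max_cached` are hypothetical; `send` stands in for any function that issues the PUT and raises when openHAB cannot be reached.

```python
from collections import deque


class CachingPublisher:
    """Buffers updates that could not be sent and retries them later.

    `send` is any callable taking (state, item) that raises OSError when
    the server cannot be reached (requests' ConnectionError and Timeout
    are both OSError subclasses).
    """

    def __init__(self, send, max_cached=100):
        self.send = send
        # Bounded so a long outage cannot exhaust memory; oldest entries drop first.
        self.cache = deque(maxlen=max_cached)

    def publish(self, state, item):
        """Queues the update, then tries to flush the queue in order."""
        self.cache.append((state, item))
        return self.flush()

    def flush(self):
        """Sends cached updates oldest first, stopping at the first failure."""
        while self.cache:
            state, item = self.cache[0]
            try:
                self.send(state, item)
            except OSError:
                return False  # still offline; keep everything that is left
            self.cache.popleft()
        return True
```

With this shape, every new publish also retries whatever is still queued, so the backlog drains as soon as the connection comes back. A timer that calls `flush()` periodically would cover the case where no new readings arrive.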

Subscribing

The subscription is through SSE. Unlike websockets, SSE is unidirectional. The messages originate at the server, openHAB in this case, and are sent to the clients. The SSE endpoint for openHAB is http://<address of openHAB>:8080/rest/events.

In Python there are several third party libraries that will subscribe to the SSE feed and let you write code to process each event. I chose sseclient-py as one of the easiest to use. There may be better options. To use it:

  • open a streaming request to the events endpoint using requests
  • create an sseclient.SSEClient(stream) to process the stream of events
  • create a loop to process the events as they come in.

Because of the way sensorReporter works, I put the loop that processes the messages into a separate Thread.

...

        # Subscribe to SSE events and start processing the events
        stream = requests.get("{}/rest/events".format(self.openhab_url),
                              stream=True)
        self.client = sseclient.SSEClient(stream)

        # Set the flag before starting the thread so the loop never sees it unset.
        self.stop = False
        self.thread = Thread(target=self._get_messages)
        self.thread.start()

    def _get_messages(self):
        """Blocks until stop is set to True. Loops through all the events on the
        SSE subscription and if it's a command to a registered Item, calls that
        Item's handler.
        """
        for event in self.client.events():
            # If we are stopping exit.
            if self.stop:
                return

            # See if this is an event we care about. Commands on registered Items.
            decoded = json.loads(event.data)
            if decoded["type"] == "ItemCommandEvent":
                item = decoded["topic"].replace("smarthome/items/", "").replace("/command", "")
                if item in self.registered:
                    payload = json.loads(decoded["payload"])
                    msg = payload["value"]
                    log.info("Received command from %s: %s", item, msg)
                    self.registered[item](msg)

The events are JSON and look like this (note that the payload is itself JSON encoded as a string):

{"topic":"smarthome/items/vRichPhone_Manticore_Net/command","payload":"{\"type\":\"OnOff\",\"value\":\"ON\"}","type":"ItemCommandEvent"}
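
To try the parsing step without a running openHAB, here is a small standalone sketch that applies the same logic to the sample event above. The function name `parse_command` is hypothetical, and it splits the topic on "/" where the Connection code uses string replacement.

```python
import json


def parse_command(raw):
    """Returns (item, command) for an ItemCommandEvent, otherwise None."""
    event = json.loads(raw)
    if event.get("type") != "ItemCommandEvent":
        return None
    # The topic looks like smarthome/items/<Item Name>/command
    item = event["topic"].split("/")[2]
    # The payload is a JSON document encoded as a string, so decode it again.
    payload = json.loads(event["payload"])
    return item, payload["value"]


raw = ('{"topic":"smarthome/items/vRichPhone_Manticore_Net/command",'
       '"payload":"{\\"type\\":\\"OnOff\\",\\"value\\":\\"ON\\"}",'
       '"type":"ItemCommandEvent"}')
print(parse_command(raw))  # ('vRichPhone_Manticore_Net', 'ON')
```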

Putting it All Together

"""Communicator that publishes and subscribes to openHAB's REST API.
Classes:
    - OpenhabREST: publishes state updates to openHAB Items and subscribes
      to commands on registered Items.
"""
import logging
import json
import traceback
from threading import Thread
import requests
import sseclient
from core.connection import Connection

log = logging.getLogger(__name__.split(".")[1])

class OpenhabREST(Connection):
    """Publishes a state to a given openHAB Item. Expects there to be a URL
    parameter set to the base URL of the openHAB instance. Subscribes to the OH
    SSE feed for commands on the registered Items.
    """

    def __init__(self, msg_processor, params):
        """Starts the SSE subscription and registers for commands on
        RefreshItem. Expects the following params:
        - "URL": base URL of the openHAB instance NOTE: does not support TLS.
        - "RefreshItem": Name of the openHAB Item that, when it receives a
        command will cause sensor_reporter to publish the most recent states of
        all the sensors.
        - msg_processor: message handler for command to the RefreshItem
        """
        super().__init__(msg_processor, params, log)
        log.info("Initializing openHAB REST Connection...")

        self.openhab_url = params("URL")
        self.refresh_item = params("RefreshItem")
        self.registered = {}
        self.registered[self.refresh_item] = msg_processor

        # Subscribe to SSE events and start processing the events
        stream = requests.get("{}/rest/events".format(self.openhab_url),
                              stream=True)
        self.client = sseclient.SSEClient(stream)

        # Set the flag before starting the thread so the loop never sees it unset.
        self.stop = False
        self.thread = Thread(target=self._get_messages)
        self.thread.start()

    def _get_messages(self):
        """Blocks until stop is set to True. Loops through all the events on the
        SSE subscription and if it's a command to a registered Item, calls that
        Item's handler.
        """
        for event in self.client.events():
            # If we are stopping exit.
            if self.stop:
                return

            # See if this is an event we care about. Commands on registered Items.
            decoded = json.loads(event.data)
            if decoded["type"] == "ItemCommandEvent":
                item = decoded["topic"].replace("smarthome/items/", "").replace("/command", "")
                if item in self.registered:
                    payload = json.loads(decoded["payload"])
                    msg = payload["value"]
                    log.info("Received command from %s: %s", item, msg)
                    self.registered[item](msg)

    def register(self, item, handler):
        """Actuators register the passed in Item with this Connection. When that
        Item receives a command, the handler is called with the received command.
        The handler is expected to accept a single string as an argument.
        """
        self.registered[item] = handler

    def publish(self, state, item):
        """Publishes the passed in state to the passed in Item as an update."""
        try:
            log.debug("Publishing message %s to %s", state, item)
            response = requests.put("{}/rest/items/{}/state"
                                    .format(self.openhab_url, item),
                                    data=state, timeout=10)
            response.raise_for_status()
        # requests raises its own ConnectionError, not the built-in one.
        except requests.exceptions.ConnectionError:
            log.error("Failed to connect to %s\n%s", self.openhab_url,
                      traceback.format_exc())
        except requests.exceptions.Timeout:
            log.error("Timed out connecting to %s", self.openhab_url)
        except requests.exceptions.HTTPError as ex:
            log.error("Received an unsuccessful response code %s", ex)

    def disconnect(self):
        """Stops the event processing loop."""
        log.info("Disconnecting from openHAB SSE")
        self.stop = True
        self.thread.join()

The above is still a work in progress and subject to change as I progress with my rewrite but the publish/subscribe parts work. I hope it provides inspiration and help for others trying to do the same.

Wouldn’t it be easier to just use HABApp for the OpenHAB and MQTT connection and then just create a single rule that calls your functions and contains your logic?
That way you get reconnection handling and lots of other nice things without having to focus on the connection side.
This is literally the use case for HABApp:
Make python code work with openhab and mqtt.

I wrote sensorReporter (2016) a long time before you wrote HABApp. I have a number of people who still use it including myself and so I continue to maintain it. And I did not write it to be openHAB specific, though this REST API stuff is openHAB specific. But I’d be happy to add a way to connect to Home Assistant or Node-Red or anything else if desired.

sensorReporter does also support MQTT and it is in fact what I mostly use. In fact when I first wrote it it was called mqttReporter. However others had requested openHAB REST API support early on which was added though it only supported publishing to openHAB.

I’m going through a rewrite of it right now and took it as a challenge to myself to see if I could get two-way support on the openHAB REST API without long polling. The above is the result of that challenge to myself. And indeed, I need to add in reconnection handling but presenting a complete and usable solution is not the point of this post.

The purpose of the post is not “hey, come use sensorReporter!” The purpose of the post is to provide an example of interacting with openHAB’s SSE feed as the topic comes up with relative frequency, usually for people trying to build their own UI. Focusing on the connection side is the point of the post. All the discussion about how sensorReporter works is there to put the example code into context.

Sorry if I came off as rude, this was not my intention at all. :slight_smile:

I was just wondering why you were burdening yourself with all this work when there already is a library that jumps through all the hoops for you. You even could have built it on top of HABApp and created a working instance with all your sensor integrations.

It’s just that I have seen so many half assed python scripts in this forum where users struggle with integration just because they don’t know or forgot about HABApp. It really hurts sometimes. :wink:

But learning new things is always good and if the post helps someone it was definitely worth the effort!

Ultimately, I’m exploring to see if there will be a way to implement an Event Bus using a JSONDB Rule Template that does not depend on MQTT. The above was just a handy and practical way to explore the SSE. That way users would only need to import the JSON, create a Rule from it and configure a URL, or even better have it autodiscover the other OH instances. I’m not yet convinced all of this is possible but won’t know until I try.

But isn’t that ultimately what HABApp provides?
An event bus, a scheduler and some convenience functions to interact with openhab and mqtt you can use.
Imho all you have to do is write the json to rule converter (of course the devil is always in the detail :wink: ).

It just seems that you go through a lot of effort to (re)create something that already exists and provides 90% of the features you are looking for. I won’t nag any further but I really encourage you to take a look at HABApp and at least try it out. Maybe you’ll see some features and ideas you like that you want to take over, or you can give me some feedback so I can improve HABApp. :slight_smile:

When I say event bus I mean linking the event buses of two or more openHAB instances. I have three requirements for the openHAB event bus:

  1. It shall be native to openHAB.
  2. It shall require minimal to no configuration on the part of the user. Putting in the address of the peer OH ins
  3. It shall link two or more openHAB instances so that one can have primary/secondary relationships between instances on an item by item basis (i.e. some Items are primary on one instance and others are primary on a different instance) which will be critical to providing some semblance of support for OH 1.x bindings once OH 3 is released.

HABApp fails on 1 and 2. I can’t say if it works for 3 but my understanding is HABApp is more about providing an alternative rules environment. I’m not after providing a way for users to run rules remotely. I’m after providing an easy way for users who need it to continue to use 1.x bindings in their openHAB setup with as little work on their part as possible.

My existing MQTT Event Bus implementation almost fails on 1 (Moquette’s future as part of OH is in doubt in my understanding) and 2. So I’m exploring the SSE option which solves 1 and improves 2.

Imho it is not possible to share events across multiple openhab instances.
Sharing items and their values and value changes however definitely works.
I’ve even made a sample rule that mirrors changes of all items to an mqtt broker.
The only thing that is missing is a rule on the other openhab instance that listens to the published topics and posts the values to the openhab items.

The only configuration needed is the mqtt broker, the rest works out of the box.
Imho that should check 1 and 2 already.

That’s the idea behind it. But it also happens to provide a pythonic and easy way of interacting with openhab in python.

I got that, but my suggestion was that you do some clever rules and python magic and distribute working instance(s) of HABApp.
Basically that you build your application logic (which is some kind of rules) on top of the provided framework.

A command is an event, not a state. And it is absolutely possible to share events across the instances of OH. I’m doing it now with my two OH instances that are federated using an MQTT event bus. The only gotcha is commands can only go one direction, from the “primary” to the “secondary”. Updates (not just changes) flow from the “secondary” to the “primary”.

This is not theoretical stuff. It’s been working since shortly after the release of MQTT 2.4.

HABApp is not openHAB native so until it comes with OH or can be installed as an add-on it can never meet 1. HABApp itself requires some configuration. The MQTT requires some configuration. That already makes it require more configuration than the MQTT version that I’m trying to replace with something simpler.

Me too, and mine works both ways (Marketplace MQTT Event Bus) and has both a Jython and Rules DSL version posted. I plan on doing a JavaScript Rule Template as well so users can import it through PaperUI on the OH 2.x side and the replacement to PaperUI on the OH 3 side without needing to install anything else but set up MQTT.

But a number of people lost their s#@t because “MQTT is too complicated.” I had to add about 2000 words to that post to cover the MQTT basics. So any dependency on MQTT is a non-starter. Any approach that requires anything like manually configuring MQTT Broker Things and Generic MQTT Things is going to fail 2.

One of the justifications for dropping 1.x bindings in OH 3 was the ability to federate openHAB instances. Kai specifically pointed to that link above as the justification. But it is clear too that for some users any dependency on MQTT will put the capability completely out of reach.

It is also why I have 1. as a requirement. This has to be something a user can install, set up, and configure all from the PaperUI and the OH 3 user interface with no other external dependencies, particularly external dependencies that are not a part of the openHAB project itself.

Can it be done using HABApp? Obviously. But if the exact same capability written in native OH rules is already too complicated, HABApp is never going to be viable to solve this particular problem.

Yes, you are right. I wrote something different than what I was thinking. :face_with_raised_eyebrow:
I wanted to write that item related events (e.g. change, command, update) can be mapped to a different instance (because items can easily be created there).
However ThingEvents can not be mapped to another instance.

I totally misunderstood what you meant with “native”.

The expectations of some people! :tired_face:
And MQTT (the basics) is not really hard if you spend the 3 minutes at the wikipedia article, but I guess nobody wants to learn anything anymore.

Correct. :slight_smile:

I tend to agree. MQTT is the most appropriate mechanism to implement the event bus to federate OH instances. And I’ve not completely given up on it. If I can update my rules to make the other openHAB instance follow the Home Assistant or Homie standards then the OH instances could auto-discover each other. Maybe that would ease the burden enough for users who refuse to learn a little about MQTT.

I’ve thought about writing a binding but as a rule bindings are not supposed to know anything about Items which puts a damper on what’s possible event bus wise. It could go into the core but it’s really difficult to get anything new into the core. But in all of those cases using the REST SSE is most likely going to be the approach as it lacks the external dependency (hence the OP). All you need is the URL of the other OH instance. However, one drawback of the SSE approach is you don’t get updates, only commands and changes on Items. You can also get Thing events (not sure how useful that is for openHAB federation) and rule events (if using the NGRE) but not Item updates. For the event bus, I do think that updates are needed because updates, even to the same state, can be meaningful (e.g. a motion sensor at a remote OH controlling a light at the local OH instance).

Anyway, I think you now understand what my end goal is and why. I’m not trying to replace or ignore HABApp. HABApp is just not able to meet the requirements. And while some users may be a bit demanding on this point, I think getting this as easy to set up and configure as possible is important to help address the storm coming our way when OH 3 comes out without support for 1.x bindings.

Thanks for the exchange. It’s important to avoid “not invented here” so I’m glad you brought up HABApp.

You definitely do get item updates (they are called ItemStateEvent :roll_eyes: ), I process them regularly in my rules.
While it is possible to get ThingEvents, contrary to ItemEvents it is not possible to generate them on the secondary side. Hence my statement that they can not be mapped.

Don’t get me started on this one :wink:

I was thinking you can probably use the GenericEventTrigger to trigger a rule on all item related events and then use a little bit of JavaScript to forward the event to the other instance using the REST API, without MQTT or anything external involved. This should work in a JSON rule (possibly a rule template configured with the IP of the other instance as a parameter) on OH2 if you have the NGRE installed:

{
   "triggers": [
      {
        "id": "1",
        "configuration": {
          "eventSource": "",
          "eventTopic": "smarthome/*",
          "eventTypes": "ItemStateChangedEvent,ItemCommandEvent"
        },
        "type": "core.GenericEventTrigger"
      }
    ],
    "conditions": [],
    "actions": [
      {
        "inputs": {},
        "id": "2",
        "configuration": {
          "type": "application/javascript",
          "script": "
print('topic=' + event.topic + ',payload=' + event.payload);
var payload = JSON.parse(event.payload);
switch (event.type) {
  case 'ItemStateChangedEvent':
    // send state update to other instance via HTTP (PUT /rest/items/itemName/state)
    break;
  case 'ItemCommandEvent':
    // send command to other instance via HTTP (POST /rest/items/itemName)
    break;
  // ...etc
}"
        },
        "type": "script.ScriptAction"
      }
    ]
}

(line breaks added in the script parameter for clarity)

I get this when I play with items at least, so it seems to work:

18:44:31.214 [INFO ] [smarthome.event.ItemCommandEvent     ] - Item 'NewItem' received command 2
18:44:31.216 [INFO ] [smarthome.event.ItemStateChangedEvent] - Item 'NewItem' changed from 1 to 2
topic=smarthome/items/NewItem/command,payload={"type":"String","value":"2"}
topic=smarthome/items/NewItem/statechanged,payload={"type":"String","value":"2","oldType":"String","oldValue":"1"}
18:44:33.239 [INFO ] [smarthome.event.ItemCommandEvent     ] - Item 'NewItem' received command 3
topic=smarthome/items/NewItem/command,payload={"type":"String","value":"3"}
18:44:33.242 [INFO ] [smarthome.event.ItemStateChangedEvent] - Item 'NewItem' changed from 2 to 3
topic=smarthome/items/NewItem/statechanged,payload={"type":"String","value":"3","oldType":"String","oldValue":"2"}

You can easily parse the item name from the topic, and assume by convention there’s one with the same name on the other side.
So this might be worth exploring for a dependency-free solution, though I don’t know if it’s practical to have a rule triggered on every event.
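
To make the mapping in the script comments concrete, here is a rough sketch of the forwarding decision. It is written in Python only to match the rest of the thread (the rule itself would do this in JavaScript), and `build_forward_request` and the `http://other:8080` base URL are hypothetical names used for illustration.

```python
import json


def build_forward_request(base_url, event_type, topic, payload):
    """Maps a local Item event to the REST call to make on the other instance.

    Returns (method, url, body), or None for event types that are not forwarded.
    """
    # Topics look like smarthome/items/<Item Name>/<suffix>
    item = topic.split("/")[2]
    value = json.loads(payload)["value"]
    if event_type == "ItemStateChangedEvent":
        # State update: PUT /rest/items/<Item Name>/state
        return "PUT", "{}/rest/items/{}/state".format(base_url, item), value
    if event_type == "ItemCommandEvent":
        # Command: POST /rest/items/<Item Name>
        return "POST", "{}/rest/items/{}".format(base_url, item), value
    return None


print(build_forward_request(
    "http://other:8080", "ItemCommandEvent",
    "smarthome/items/NewItem/command", '{"type":"String","value":"2"}'))
```

The returned tuple could then be handed to whatever HTTP client is available, with the value sent as the plain-text request body.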

That was ultimately the direction I was heading. I’ve been taking it little step by little step. I’d love to have a way to filter the events a little bit so it’s easier to avoid infinite loops, but a rule like you just posted is absolutely the direction I’m headed. I want it to be JavaScript JSON Rules and use the SSE and the REST APIs.

The rule you posted solves half of the event bus and very closely matches the Jython Rules I’ve already written. The other half, subscribing to events from the other side, is harder because it’s difficult to avoid infinite loops.

For example, let’s say I have a Switch on a remote OH instance that I want to control from a local OH instance. I need to forward only command events for that Switch from the local OH instance to the remote, and only updates/changes from the remote OH instance to the local OH. If I forward updates and commands from local to remote I end up with an infinite loop. So there definitely needs to be a leader/follower relationship or, as I’ve called it a “real” and “proxy”. The “real” Item is linked to the device and only forwards updates. The “proxy” is unlinked and only forwards commands.

I can probably handle that via tags or metadata though. If there are no Items tagged to publish to the event bus, assume that all updates and no commands need to be published. If Items are tagged, they can be tagged with either command or update and the rule will only publish those events for those Items. That way the leader OH instance (i.e. the one with the “proxies”) won’t be bombarding the follower with events for Items it doesn’t know nor care about. And since the events are all pushed by the rule we don’t need to even worry about the SSE.
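
As a rough sketch of that tagging rule, written in Python here only to pin down the logic (the eventual rule would be JavaScript): `should_forward` and the `tagged` mapping are hypothetical names, and how the tags actually get read from Items is left open.

```python
def should_forward(item, kind, tagged):
    """Decides whether an event of `kind` ("command" or "update") is forwarded.

    `tagged` maps Item names to the set of event kinds they are tagged with.
    """
    if not tagged:
        # Nothing is tagged: publish every update and no commands.
        return kind == "update"
    # Something is tagged: publish only the tagged kinds for the tagged Items.
    return kind in tagged.get(item, set())


tags = {"Light_Proxy": {"command"}, "Motion_Sensor": {"update"}}
print(should_forward("Light_Proxy", "command", tags))   # True
print(should_forward("Light_Proxy", "update", tags))    # False
print(should_forward("Untagged_Item", "update", tags))  # False
```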

I like this idea. I’ll need to try it out. I really want it to be a JSON rule or rule template using JavaScript so there is no external dependency. It didn’t occur to me that when one OH instance publishes to the other’s Items through REST there’s no need for the subscription to the SSE in the first place. It would be limited to just one-to-one but that should be fine for users who want to continue to use 1.x bindings through federation. I could probably expand it to support many-to-many but one step at a time.

Thanks! Sometimes I get myopic in my approach. I was trying to reproduce what the MQTT implementation in Jython does and totally missed the shortcut!

Thanks. When I briefly watched the SSE I guess I never had just an update or I totally missed it. There were a lot of events scrolling by and I mainly cared only about the commands. It’s useful to know even if it’s looking like I don’t need the SSE after all. :smiley:

True, the risk of getting infinite loops is very real. You can probably mitigate it with the set of assumptions you detailed: commands are initiated on the primary instance only (where the UIs/automation logic are) and should be forwarded to the one where the link is, never the other way around; state updates should either only be forwarded from the linked instance to the primary, or go both ways (bidirectional should work when there’s a change, so you might get a round-trip to “stabilize” the state on both instances and that’s it). You may also use groups or tags or metadata (or a prefix to the item names that you use for topic filtering, e.g. smarthome/items/fw_*/*, though that’s a little ugly).
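
For what it’s worth, the prefix convention is easy to check outside of openHAB. The sketch below uses Python’s standard `fnmatch` module to apply the same pattern to a topic; whether openHAB’s own topic filter follows identical glob semantics is an assumption, and `is_forwarded` is a hypothetical helper name.

```python
from fnmatch import fnmatchcase

# Only Items whose names start with fw_ take part in forwarding.
FORWARD_PATTERN = "smarthome/items/fw_*/*"


def is_forwarded(topic):
    """Returns True when the topic belongs to an Item with the fw_ prefix."""
    return fnmatchcase(topic, FORWARD_PATTERN)


print(is_forwarded("smarthome/items/fw_Light/command"))  # True
print(is_forwarded("smarthome/items/NewItem/command"))   # False
```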