I’m doing a rewrite of my sensorReporter script (Note: link is to my work in progress branch), applying all the lessons I’ve learned in how to write Python over the years. sensorReporter has both an MQTT Connection and a REST Connection. The MQTT Connection has always been two directional (i.e. can publish sensor readings and receive command messages) but the REST Connection was only able to publish. As part of my rewrite I’ve updated the REST Connection to be two directional, able to subscribe to the openHAB Server-Sent Events (SSE) feed and publish to the REST API endpoints.
I figured there would be others building external applications that want to integrate through openHAB’s REST API so here is how I did it.
Requirements
The over all use case for sensorReporter is to have a light weight service running on a machine separate from the one where openHAB is running. This service provides extensions I call capabilities that can be enabled. There are three types of capabilities:
- Sensors: call a script, poll a GPIO pin, read a DHT11 sensor, etc. and publish the readings
- Actuators: receive a message and perform some action (send a GPIO pin to high or low, call a script, etc), publishing the result
- Connections: provide two way communication between sensorReporter and external clients, namely openHAB; currently supports MQTT and now openHAB’s REST API.
Eventually I plan on implementing Homie for the MQTT but for now MQTT would be used for integration with external non-openHAB services (or openHAB if you prefer).
The openHAB REST API Connection makes some restrictions for simplicity.
- all sensor readings and actuator results are posted as updates to configured Items using the REST API Items endpoint.
- all Actuators only listen for commands on configured Items.
Example round trip data flow
- openHAB issues a command to Item Test_Actuator_CMD
- sensorReporter has an Actuator that is listening for commands to that Item. The string contents of the command are passed to the Actuator.
- The Actuator performs it’s action based on the contents of the command.
- The Actuator publishes the results of the action to Test_Actuator_Results
How it Works
This example is written in Python 3. For publishing Item state updates the requests library is used to issue PUT commands to http://<address of openHAB>:<port of openHAB>/rest/items/<Item Name>/state
. For subscribing we use openHAB’s SSE endpoint http://<address of openHAB>:<port>/rest/events
.
Publishing
def publish(self, state, item):
"""Publishes the passed in state to the passed in Item as an update."""
try:
log.debug("Publishing message %s to %s", state, item)
response = requests.put("{}/rest/items/{}/state"
.format(self.openhab_url, item),
data=state, timeout=10)
response.raise_for_status()
except ConnectionError:
log.error("Failed to connect to %s\n%s", self.openhab_url,
traceback.format_exc())
except requests.exceptions.Timeout:
log.error("Timed out connecting to %s")
except requests.exceptions.HTTPError as ex:
log.error("Received and unsuccessful response code %s", ex)
It couldn’t be more straight forward. Issue a PUT to the right URL with the passed in state as the data. Check to make sure that the request was successful and log an error if it wasn’t.
Exercise for the reader: add message caching if it fails with a Timeout or ConnectionError and send them when the connection is restored.
Subscribing
The subscription is through SSE. Unlike websockets, SSE is unidirectional. The messages originate at the server, openHAB in this case, and are sent to the clients. The SSE endpoint for openHAB is http://<address of openHAB>:8080/rest/events
.
In Python there are several third party libraries that will subscribe to the SSE feed and let you write code to process each event. I chose sseclient-py as one of the easiest to use. There may be better options. To use it:
- open a streaming request to the events endpoint using requests
- create an sseclient.SSEClient(stream) to process the stream of events
- create a loop to process the events as they come in.
Because of the way sensorReporter works, I put the loop that processes the messages into a separate Thread.
...
# Subscribe to SSE events and start processing the events
stream = requests.get("{}/rest/events".format(self.openhab_url),
stream=True)
self.client = sseclient.SSEClient(stream)
self.thread = Thread(target=self._get_messages)
self.thread.start()
self.stop = False
def _get_messages(self):
"""Blocks until stop is set to True. Loops through all the events on the
SSE subscription and if it's a command to a registered Item, call's that
Item's handler.
"""
for event in self.client.events():
# If we are stopping exit.
if self.stop:
return
# See if this is an event we care about. Commands on registered Items.
decoded = json.loads(event.data)
if decoded["type"] == "ItemCommandEvent":
item = decoded["topic"].replace("smarthome/items/", "").replace("/command", "")
if item in self.registered:
payload = json.loads(decoded["payload"])
msg = payload["value"]
log.info("Received command from %s: %s", item, msg)
self.registered[item](msg)
The events are JSON and look like
{"topic":"smarthome/items/vRichPhone_Manticore_Net/command","payload":"{\"type\":\"OnOff\",\"value\":\"ON\"}","type":"ItemCommandEvent"}
Putting it All Together
"""Communicator that publishes and subscribes to openHAB's REST API.
Classes:
- openhab_rest: publishes state updates to openHAB Items.
"""
import logging
import json
import traceback
from threading import Thread
import requests
import sseclient
from core.connection import Connection
log = logging.getLogger(__name__.split(".")[1])
class OpenhabREST(Connection):
"""Publishes a state to a given openHAB Item. Expects there to be a URL
parameter set to the base URL of the openHAB instance. Subscribes to the OH
SSE feed for commands on the registered Items.
"""
def __init__(self, msg_processor, params):
"""Starts the SSE subscription and registers for commands on
RefreshItem. Expects the following params:
- "URL": base URL of the openHAB instance NOTE: does not support TLS.
- "RefreshItem": Name of the openHAB Item that, when it receives a
command will cause sensor_reporter to publish the most recent states of
all the sensors.
- msg_processor: message handler for command to the RefreshItem
"""
super().__init__(msg_processor, params, log)
log.info("Initializing openHAB REST Connection...")
self.openhab_url = params("URL")
self.refresh_item = params("RefreshItem")
self.registered = {}
self.registered[self.refresh_item] = msg_processor
# Subscribe to SSE events and start processing the events
stream = requests.get("{}/rest/events".format(self.openhab_url),
stream=True)
self.client = sseclient.SSEClient(stream)
self.thread = Thread(target=self._get_messages)
self.thread.start()
self.stop = False
def _get_messages(self):
"""Blocks until stop is set to True. Loops through all the events on the
SSE subscription and if it's a command to a registered Item, call's that
Item's handler.
"""
for event in self.client.events():
# If we are stopping exit.
if self.stop:
return
# See if this is an event we care about. Commands on registered Items.
decoded = json.loads(event.data)
if decoded["type"] == "ItemCommandEvent":
item = decoded["topic"].replace("smarthome/items/", "").replace("/command", "")
if item in self.registered:
payload = json.loads(decoded["payload"])
msg = payload["value"]
log.info("Received command from %s: %s", item, msg)
self.registered[item](msg)
def register(self, item, handler):
"""Actuators register the passed in Item with this Connection. When that
Item receives a command, the handler is called with the received command.
The handler is expected to accept a single string as an argument.
"""
self.registered[item] = handler
def publish(self, state, item):
"""Publishes the passed in state to the passed in Item as an update."""
try:
log.debug("Publishing message %s to %s", state, item)
response = requests.put("{}/rest/items/{}/state"
.format(self.openhab_url, item),
data=state, timeout=10)
response.raise_for_status()
except ConnectionError:
log.error("Failed to connect to %s\n%s", self.openhab_url,
traceback.format_exc())
except requests.exceptions.Timeout:
log.error("Timed out connecting to %s")
except requests.exceptions.HTTPError as ex:
log.error("Received and unsuccessful response code %s", ex)
def disconnect(self):
"""Stops the event processing loop."""
log.info("Disconnecting from openHAB SSE")
self.stop = True
self.thread.join()
The above is still a work in progress and subject to change as I progress with my rewrite but the publish/subscribe parts work. I hope it provides inspiration and help for others trying to so the same.