MQTT device initial state checking

I’m trying to come up with an efficient way to keep track of my MQTT devices’ online state, and to query them when they come back up or when the system starts.

Previously I had a rule fire on system start that would start a timer, which queried all my devices for their states after a delay long enough for the MQTT service to be up and connected. This solution is a bit of a kludge, and only works when the system comes up.

Yesterday I decided it was time to improve this part of my installation and move to keeping track of online states using the ONLINE/OFFLINE messages in the LWT topic for each device. It occurred to me that I could also use the ONLINE message as a trigger to send a status query command, which works great when the system is already up and running.
Below you can see the item I am using to capture the LWT messages. I then fire the ONLINE message back out to the command topic, which makes the device respond with its status because it does not recognise ONLINE as a valid command. (I am doing this because the MQTT binding does not seem to allow sending null payloads from items.)

String mqtt_bedroom_overhead "Bedroom Overhead [%s]" (mqtt_group) {mqtt=">[alfred-mqtt:command/bedroom/ceilinglight/POWER:command:ONLINE:${command}], <[alfred-mqtt:telemetry/bedroom/ceilinglight/LWT:command:default]"}

The issue I am having is at system start. As you can see in the log entries below, the binding parses the incoming messages in a separate thread, thus generating the outgoing message while the broker is still finishing setting itself up. So the broker has not posted that it is finished connecting before another thread is done parsing the retained LWT message and trying to send that message back out.

13:05:18.667 [INFO ] - Starting MQTT broker connection 'alfred-mqtt'
13:05:18.789 [INFO ] - Item 'mqtt_bedroom_overhead' received command Online
13:05:18.796 [WARN ] - Broker connection not started. Cannot publish message to topic 'command/bedroom/ceilinglight/POWER'

Has anyone else had a similar issue?

Any thoughts on how to handle this case, aside from creating a rule for each device or continuing to use my startup rule?

Do you think this counts as a bug, and if so, where do I report it?

Thanks for your input
Mike

I do exactly this.

I’ve not encountered that error before but I handle this use case fairly differently so maybe that is why. Below is my code which relies on the fact that the devices pretty regularly publish messages so an absence of a message for an extended period of time can indicate an error. I also use the LWT message to indicate that a Service has gone offline immediately.

I don’t know if this will help you. I’ve written up a more generic implementation that works with non-MQTT devices as well here. The following is a simplified implementation that takes advantage of MQTT.

Items

Group:Switch:AND(ON, OFF) gSensorStatus "Sensor's Status [MAP(admin.map):%s]"
  <network>

Group:Switch gOfflineAlerted

Switch vCerberos_SensorReporter_Online "Cerberos sensorReporter [MAP(admin.map):%s]"
    <network> (gSensorStatus)
    { mqtt="<[mosquitto:status/sensor-reporters:command:OFF:.*cerberos sensorReporter is dead.*],<[mosquitto:status/cerberos/heartbeat/string:command:ON]", expire="11m,command=OFF" }

Switch vCerberos_SensorReporter_Online_Alerted (gOfflineAlerted)

In the above, I have a Group that all the Online Switches are members of. I also have a second Group with an associated Alerted Item for each Switch; that Item gets set when I generate an offline alert, so I only receive one alert per offline event.

As you can see, I subscribe to an LWT topic. I use the same topic for all my devices, so I use the matching REGEX to match only this device’s LWT messages. Then I subscribe to a heartbeat topic for that device and set the online Switch to ON for any message published there. This can be any or all of the topics your device uses. The contents of the message do not matter; the fact that a message was published is enough to set the Switch to ON. Finally, I use the Expire binding to command the Switch to OFF if no messages are received for 11 minutes.

Depending on what you want, this one Item config may be sufficient for your purposes.
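For anyone who wants to see the logic of that Item config outside openHAB, here is a minimal Python sketch. It is only an illustration, not how the MQTT or Expire bindings are implemented; the class name `OnlineTracker`, its methods, and the injectable `clock` are all made up for this example. The regex and the 11 minute timeout are taken from the Item above.

```python
import re
import time


class OnlineTracker:
    """One device: any heartbeat marks it online; a matching LWT or a timeout marks it offline."""

    def __init__(self, lwt_pattern, timeout_s=11 * 60, clock=time.monotonic):
        self.lwt_regex = re.compile(lwt_pattern)
        self.timeout_s = timeout_s
        self.clock = clock          # injectable so the timeout can be tested
        self.online = False
        self.last_seen = None

    def on_lwt(self, payload):
        # The LWT topic is shared by all devices, so only react to this device's message.
        if self.lwt_regex.fullmatch(payload):
            self.online = False

    def on_heartbeat(self, payload=None):
        # The payload is ignored: the arrival of a message is what counts.
        self.online = True
        self.last_seen = self.clock()

    def is_online(self):
        # Rough stand-in for expire="11m,command=OFF": go offline after a silent period.
        if self.online and self.clock() - self.last_seen >= self.timeout_s:
            self.online = False
        return self.online


if __name__ == "__main__":
    tracker = OnlineTracker(r".*cerberos sensorReporter is dead.*")
    tracker.on_heartbeat("12345")
    print(tracker.is_online())
    tracker.on_lwt("cerberos sensorReporter is dead")
    print(tracker.is_online())
```

The point of the design is that the device is treated as offline in two independent ways: immediately when the broker publishes its LWT, and eventually when it simply goes quiet.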

Rules

import java.util.concurrent.locks.ReentrantLock
import java.util.Map

val ReentrantLock statusLock = new ReentrantLock
val Map<String, Timer> timers = newHashMap

rule "A sensor changed its online state"
when
	Item gSensorStatus received update
then
    try {
    	statusLock.lock
    	Thread::sleep(100)
    	// keep only the members that were updated within the last second
    	val recentUpdates = gSensorStatus.members.filter[sensor|sensor.lastUpdate("mapdb") != null && sensor.lastUpdate("mapdb").isAfter(now.minusSeconds(1).millis)]

    	recentUpdates.forEach[sensor|
    		val alerted = gOfflineAlerted.members.filter[a|a.name==sensor.name+"_Alerted"].head
    		if(alerted == null) logError("admin", "Cannot find Item " + sensor.name+"_Alerted")
    		
    		if(alerted != null && alerted.state == sensor.state && timers.get(sensor.name) == null){
    			val currState = sensor.state
    			// wait a few seconds and check again before sending alert
    			timers.put(sensor.name, createTimer(now.plusSeconds(15), [|
    				if(sensor.state == currState) {
    					var name = transform("MAP", "admin.map", sensor.name)
    					if(name == "") name = sensor.name
		    			aInfo.sendCommand(name + " is now " + transform("MAP", "admin.map", sensor.state.toString) + "!")
		    			alerted.postUpdate(if(sensor.state == ON) OFF else ON)   					
    				}
    			]))
    		}
    	]
    }
    catch(Exception e){
    	logError("admin", "Error processing an online status change: " + e.toString)
    }
    finally {
    	statusLock.unlock
    }
end
 
rule "Reminder at 08:00 and system start"
when
	Time cron "0 0 8 * * ? *" or
	System started
then
    val message = new StringBuilder 
    
    val offline = gSensorStatus.members.filter[sensor|sensor.state != ON]
    if(offline.size > 0) {
    	message.append("The following sensors are offline: ")
    	offline.forEach[sensor|
    		var name = transform("MAP", "admin.map", sensor.name)
    		if(name == "") name = sensor.name
    		message.append(name)
    		message.append(", ")
            gOfflineAlerted.members.filter[a|a.name==sensor.name+"_Alerted"].head.postUpdate(ON)
    	]
    	message.delete(message.length-2, message.length)
    	aInfo.sendCommand(message.toString)
    }
    
    gSensorStatus.members.filter[sensor|sensor.state == ON].forEach[sensor |
    	sensor.sendCommand(ON) // reset the Expire timer
    ]
end 

rule "A sensor device came back online, request an update from the sensors"
when
  System started
then
	gDoorsTimers.members.filter[door|door.state == ON].forEach[ door | door.sendCommand(ON) ]
    aSensorUpdate.sendCommand(ON)
end

In the above rules I have one rule that triggers whenever the sensor online Group receives an update. I pull the Items that received an update in the last second and generate an alert message for them. I also set the associated Alerted Item to ON when I alert that the sensor is offline, and to OFF when I alert that it is back online.

The second rule generates an alert when OH first starts up and at 08:00 with a list of all the sensors that are offline.
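The alert-once check in the first rule can be hard to read at a glance, so here is a small Python sketch of just that decision. It is a simplified illustration and leaves out the lock, the persistence lookup, and the 15 second re-check timer; the function name `next_alert` and its return shape are invented for this example.

```python
def next_alert(sensor_online, offline_alerted):
    """Return (message, new_offline_alerted); message is None when there is nothing new to report.

    sensor_online   -- True when the Online Switch is ON
    offline_alerted -- True when the matching _Alerted Item is ON
    """
    # The rule only alerts when the flag "agrees" with the sensor state:
    #   sensor ON  and flag ON  -> it was reported offline and is back: alert, clear the flag
    #   sensor OFF and flag OFF -> it has not been reported offline yet: alert, set the flag
    if sensor_online == offline_alerted:
        message = "online" if sensor_online else "offline"
        return message, not sensor_online
    # Otherwise this state has already been reported, so stay quiet.
    return None, offline_alerted


if __name__ == "__main__":
    flag = False
    for state in (False, False, True, True):
        message, flag = next_alert(state, flag)
        print(state, message, flag)
```

Repeated updates with the same state produce no further message, which is what keeps it to one alert per offline event.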

Sitemap

  Text item=gSensorStatus {
        Frame item=vNetwork_Cerberos {
            Text item=vCerberos_SensorReporter_Online
            ...
        }
   }

I have other sensors that get listed on the sitemap, each in a separate frame. vNetwork_Cerberos is attached to the Network binding and shows whether the device itself is online. Other Items use the Network binding to see if certain services are running on that machine. It ends up looking like:

image

Finally, I have a System started rule that publishes a message to my “update” topic, which all the MQTT devices subscribe to. They respond with their current sensor readings.

So I guess I am doing the same thing and not seeing any sort of error.

What are you running on?


As usual, an excessively clear and elaborate answer!

While this example is great, it’s a bit of overkill for my use case. At present my application of it is not so mission critical as yours seems. I have, however, improved my implementation after studying yours.

Previously my startup rule manually published messages to each device to get its status. Now that I have implemented this LWT catching method from above, reading your implementation gave me the idea to just use a lambda to loop through all my devices and use the online item to query them.

So I changed all of the items to look like this. I turned autoupdate off and added a second inbound MQTT binding on the LWT topic that updates the state, so that only the device’s LWT message can change the item’s state. The first inbound binding still brings the LWT message in as a command, which sends the ONLINE command out to get the device status.

String mqtt_dev_bedroom_overhead "Bedroom Overhead Lamp [%s]" (mqtt_group) {autoupdate="false", mqtt=">[alfred-mqtt:command/bedroom/ceilinglight/POWER:command:ONLINE:${command}], <[alfred-mqtt:telemetry/bedroom/ceilinglight/LWT:command:default], <[alfred-mqtt:telemetry/bedroom/ceilinglight/LWT:state:default]"}

My startup rule is still in use, because I can’t get around the issue of the messages being parsed before the broker has finished connecting. I don’t have to add each device manually anymore, though.

mqtt_group.members.filter[a|a.name.toString.contains("_dev_")].forEach[ device| device.sendCommand("ONLINE")]

I just use a lambda that sends an online command to each device, thus triggering the status request messages, but not affecting the state of the item.

Thanks again for your help, and for all of the tutorials and examples you have all over the forum. I have learned much from you, and continue to every day!
Mike

This may not be too helpful… but for those having control over their MQTT devices… I have each MQTT node send their Ethernet status at a set interval, and use the expire binding on an item to trigger an alert when the heartbeat is absent. Works like a charm.
The nodes also report if they have rebooted; this last time stamp is captured.

image

Great minds think alike. I do the same. That is where that “Cerberos sensorReporter Uptime” comes from. My MQTT device (in this particular case a Pi running sensorReporter) periodically reports the number of seconds it has been running as a heartbeat, which, along with the Expire binding, is what drives the “Cerberos sensorReporter” online status.

I just parse the number of seconds to days:hours:minutes:seconds instead of a DateTime.
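As an illustration of that conversion, here is a minimal Python sketch. The function name `format_uptime` and the zero-padded output layout are assumptions made for this example, not the code actually used in the setup described above.

```python
def format_uptime(total_seconds):
    """Turn an uptime in seconds into a days:hours:minutes:seconds string."""
    days, remainder = divmod(int(total_seconds), 86400)   # 86400 seconds per day
    hours, remainder = divmod(remainder, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{days}:{hours:02d}:{minutes:02d}:{seconds:02d}"


if __name__ == "__main__":
    print(format_uptime(93784))  # 1 day, 2 hours, 3 minutes, 4 seconds
```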

The same code and approach I posted above also work for any sort of device that reports periodically. For example, the Nest status gets updated every minute or so, and my zwave smoke alarms have a heartbeat channel. These are convenient. However, if you have devices that are not as easy to split out into a separate Online Switch, you can use the slightly more complicated approach that I wrote up in the design pattern I linked above.

The thing I like about this approach thus far is that to add something new to be monitored, all I need to do is create the Items (an Online and an Alerted) for each one, link them to however they get updated, add the Expire binding, and add them to the right Groups. The Rules remain unchanged.

In fact, as an anecdote, I added some Network binding Things to monitor some of my other services. When I created the Items and added them to the gSensorStatus Group I got a couple of alerts. I thought I had a bug in my code. It turned out I had two services that I rarely use that failed to come back up the last time I had a power outage.

Well this gave me a few ideas. I’ve added some more device statistics to my setup, mostly just for my curiosity at this point.

Good idea. I have updated my devices’ configuration so that they report a status message every 5 minutes (previously they were only sending keep-alive packets to the MQTT server). From those messages I am now capturing the message time, RSSI, and uptime as well.

I am still using only the LWT message for the online status, because that way I will always get an offline message. Even if a device disconnects ungracefully, the server will post the offline message after a specified delay (I believe they are set to 5 or 15 minutes).

I will be coming back to this thread for reference when I start incorporating push notifications and alerts into my installation. That code of yours @rlkoshak is very portable, and will make it easy to move to having online/offline alerts for any type of device.
