MQTT device initial state checking

I do exactly this.

I’ve not encountered that error before, but I handle this use case fairly differently, so maybe that is why. My code below relies on the fact that the devices publish messages pretty regularly, so the absence of any message for an extended period indicates a problem. I also use the LWT message to mark a service as offline immediately.

I don’t know if this will help you. I’ve written up a more generic implementation that works with non-MQTT devices as well here. The following is a simplified implementation that takes advantage of MQTT.

Items

Group:Switch:AND(ON, OFF) gSensorStatus "Sensor's Status [MAP(admin.map):%s]"
  <network>

Group:Switch gOfflineAlerted

Switch vCerberos_SensorReporter_Online "Cerberos sensorReporter [MAP(admin.map):%s]"
    <network> (gSensorStatus)
    { mqtt="<[mosquitto:status/sensor-reporters:command:OFF:.*cerberos sensorReporter is dead.*],<[mosquitto:status/cerberos/heartbeat/string:command:ON]", expire="11m,command=OFF" }

Switch vCerberos_SensorReporter_Online_Alerted (gOfflineAlerted)

In the above, I have a Group that all the Online Switches are members of, plus a second Group with an associated Alerted Item per device. The Alerted Item gets set when I generate an offline alert, so I only receive one alert per offline event.

As you can see, I subscribe to an LWT topic. I use the same topic for all my devices, so I use the regular expression filter to match only this device’s LWT messages. Then I subscribe to a heartbeat topic for that device and set the online Switch to ON for any message published there. This can be any or all of the topics your device uses. The contents of the message do not matter; the fact that a message was published is enough to set the Switch to ON. Finally, I use the Expire Binding to command the Switch to OFF if no messages are received for 11 minutes.

Depending on what you want, this one Item config may be sufficient for your purposes.
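
To cover another device, the same pair of Items would be repeated with only the device-specific parts changed. A minimal sketch, assuming a hypothetical second machine called hydra that publishes the same style of LWT message and uses the same heartbeat topic layout (both are assumptions, not taken from a real setup):

```
// hypothetical second device; only the name, the regex filter and the heartbeat topic differ
Switch vHydra_SensorReporter_Online "Hydra sensorReporter [MAP(admin.map):%s]"
    <network> (gSensorStatus)
    { mqtt="<[mosquitto:status/sensor-reporters:command:OFF:.*hydra sensorReporter is dead.*],<[mosquitto:status/hydra/heartbeat/string:command:ON]", expire="11m,command=OFF" }

Switch vHydra_SensorReporter_Online_Alerted (gOfflineAlerted)
```

The Alerted Item has to be named exactly like the Online Item plus "_Alerted", because the rules look it up by that name.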

Rules

import java.util.concurrent.locks.ReentrantLock
import java.util.Map

val ReentrantLock statusLock = new ReentrantLock
val Map<String, Timer> timers = newHashMap

rule "A sensor changed its online state"
when
	Item gSensorStatus received update
then
    try {
        statusLock.lock
        Thread::sleep(100)
        // Items whose last persisted update is less than one second old
        val recentUpdates = gSensorStatus.members.filter[sensor|sensor.lastUpdate("mapdb") != null && sensor.lastUpdate("mapdb").isAfter(now.minusSeconds(1).millis)]

        recentUpdates.forEach[sensor|
            val alerted = gOfflineAlerted.members.filter[a|a.name==sensor.name+"_Alerted"].head
            if(alerted == null) logError("admin", "Cannot find Item " + sensor.name+"_Alerted")

            // the states match only while the alert for the sensor's current state is still pending
            if(alerted != null && alerted.state == sensor.state && timers.get(sensor.name) == null){
                val currState = sensor.state
                // wait a few seconds and check again before sending alert
                timers.put(sensor.name, createTimer(now.plusSeconds(15), [|
                    if(sensor.state == currState) {
                        var name = transform("MAP", "admin.map", sensor.name)
                        if(name == "") name = sensor.name
                        aInfo.sendCommand(name + " is now " + transform("MAP", "admin.map", sensor.state.toString) + "!")
                        alerted.postUpdate(if(sensor.state == ON) OFF else ON)
                    }
                    // clear the entry so the next state change can schedule a new timer
                    timers.remove(sensor.name)
                ]))
            }
        ]
    }
    catch(Exception e){
        logError("admin", "Error processing an online status change: " + e.toString)
    }
    finally {
        statusLock.unlock
    }
end
 
rule "Reminder at 08:00 and system start"
when
	Time cron "0 0 8 * * ? *" or
	System started
then
    val message = new StringBuilder

    val offline = gSensorStatus.members.filter[sensor|sensor.state != ON]
    if(offline.size > 0) {
        message.append("The following sensors are offline: ")
        offline.forEach[sensor|
            var name = transform("MAP", "admin.map", sensor.name)
            if(name == "") name = sensor.name
            message.append(name)
            message.append(", ")
            gOfflineAlerted.members.filter[a|a.name==sensor.name+"_Alerted"].head.postUpdate(ON)
        ]
        message.delete(message.length-2, message.length)
        aInfo.sendCommand(message.toString)
    }

    gSensorStatus.members.filter[sensor|sensor.state == ON].forEach[sensor |
        sensor.sendCommand(ON) // reset the Expire timer
    ]
end 

rule "A sensor device came back online, request an update from the sensors"
when
  System started
then
	gDoorsTimers.members.filter[door|door.state == ON].forEach[ door | door.sendCommand(ON) ]
    aSensorUpdate.sendCommand(ON)
end

In the rules above, the first rule triggers whenever the sensor status Group receives an update. I pull the Items that received an update in the last second and, if the state still holds 15 seconds later, generate an alert message. I also set the associated Alerted Item to ON when I alert that a sensor is offline and to OFF when I alert that it is back online.

The second rule generates an alert when OH first starts up and at 08:00 with a list of all the sensors that are offline.
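
Both the Item labels and the rules look things up in admin.map: the ON/OFF states for the display and alert text, and the Item names for a friendlier device name. The actual file is not shown here, so the following is only a sketch of what transform/admin.map would need to contain, with illustrative right-hand values:

```
ON=online
OFF=offline
NULL=unknown
vCerberos_SensorReporter_Online=Cerberos sensorReporter
```

With entries like these, the alert built in the first rule would read "Cerberos sensorReporter is now offline!".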

Sitemap

  Text item=gSensorStatus {
        Frame item=vNetwork_Cerberos {
            Text item=vCerberos_SensorReporter_Online
            ...
        }
   }

I have other sensors that get listed on the sitemap, each in a separate frame. vNetwork_Cerberos is attached to the Network binding and shows whether the device itself is online. Other Items use the Network binding to see if certain services are running on that machine. It ends up looking like:

[image: screenshot of the resulting sitemap]

Finally, I have a System started rule (the last rule above) that publishes a message to my “update” topic, which all the MQTT devices subscribe to. They respond with their current sensor readings.
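
The Item behind aSensorUpdate is not listed in the Items section. With the same MQTT 1.x binding syntax, an outbound Item roughly like this would publish to such a topic whenever it receives an ON command. The topic name and the payload here are placeholders, not the real ones:

```
// hypothetical topic "sensors/getUpdate" and payload "update"
Switch aSensorUpdate "Request sensor update"
    { mqtt=">[mosquitto:sensors/getUpdate:command:ON:update]" }
```

The devices would then need to subscribe to that topic and republish their current readings when a message arrives.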

So I guess I am doing the same thing and not seeing any sort of error.

What are you running on?
