Asynchronous Events to Synchronous Event

Problem Statement

At times we want to process events that arrive asynchronously, but only once we have recent data from every event in a related set. This can be difficult to implement without a lot of custom programming.

As an example, MQTT does not guarantee the order in which messages are received. So if we need to process data from MQTT, and we only want to do that once a recent set of MQTT messages has been received, we have to do a lot of work to track the timing of those messages.

Concept

Asynchronous to Synchronous provides a simple way to group items such that a single event is generated when all of those items have been updated within a predetermined time.

Use Case

The MQTT Car Presence device is a great way to automate garage door opening and closing. It works by connecting to your local Wi-Fi network and publishing to 3 separate MQTT topics. You need all 3 payloads (On/Off, Signal Strength, Up Time) to determine whether the car is arriving or leaving, but the order in which the topics arrive is not guaranteed. Consequently, a fair bit of coding is normally required to track the timing of the messages before taking an action such as opening or closing the garage door. This method removes all of that extra work by tracking the items and the last time each was updated, then generating a single event to indicate that the tracked items are all in sync with one another.
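For instance, once all three topics have arrived within the aggregation window, a single rule can make the arrive/leave decision. This is only a minimal sketch, and the names are hypothetical: it assumes a gCarPresence group under gAsync_to_Sync_Aggregator (so the auto-created proxy switch is gCarPresence_Sync_Switch), plus CarPresence_OnOff and GarageDoor_Switch items matching your own setup.

from core.rules import rule
from core.triggers import when

@rule("Car presence garage door")
@when("Item gCarPresence_Sync_Switch received command ON")
def car_presence(event):
    # The proxy switch only turns ON once all three car-presence topics
    # have updated within the aggregation window, so it is safe to read them together.
    if items["CarPresence_OnOff"] == ON:
        events.sendCommand("GarageDoor_Switch", "ON")    # car arriving: open the door
    else:
        events.sendCommand("GarageDoor_Switch", "OFF")   # car leaving: close the door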

Required Groups

async_to_sync.items -> create in items folder

// Main Groups
Group	gAsync_to_Sync
Group	gAsync_to_Sync_Aggregator				(gAsync_to_Sync)
Group	gAsync_to_Sync_Proxy_Switches			(gAsync_to_Sync)

Example

Items

// Example
Group	gMQTTTestGroup							(gAsync_to_Sync_Aggregator)			{Async_Time=".5"}	// .5 seconds to collect events
Switch	MQTTTestSwitch1							(gMQTTTestGroup)					{Async_Time="10"}   // override default time above
Switch	MQTTTestSwitch2							(gMQTTTestGroup)                                        // uses default group time
Switch	gMQTTTestGroup_Sync_Switch				(gAsync_to_Sync_Proxy_Switches)		// ON when all updates occur within the aggregation time, OFF when not. NOTE: this item is auto-created by the script

Behavior

A group called gMQTTTestGroup is created to contain the items from which we want to collect updates.

The last item is a proxy switch that is turned ON when all updates have occurred within the group's Async_Time. This proxy switch item is created automatically by the script and is shown here for illustration only (you can create it yourself if you want).

Additionally, if some items are not updated as frequently as others, you can specify an Async_Time for those items to override the group value, as shown above for MQTTTestSwitch1.
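A rule that consumes the example group then only needs to trigger on that proxy switch. A minimal sketch, using the example items above and the same logger style as the script below:

from core.rules import rule
from core.triggers import when
from org.slf4j import LoggerFactory

log = LoggerFactory.getLogger("org.eclipse.smarthome.model.script.Rules")

@rule("React to synchronized MQTT test items")
@when("Item gMQTTTestGroup_Sync_Switch received command ON")
def mqtt_test_in_sync(event):
    # Both test switches were updated within their aggregation windows,
    # so their states can be read together as a consistent set.
    log.info("Switch1 {} Switch2 {}".format(items["MQTTTestSwitch1"], items["MQTTTestSwitch2"]))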

Jython Code

async_to_sync.py -> create in automation/jsr223/python/personal


from datetime import datetime

from core import osgi,items
from core.triggers import when
from core.rules import rule
from core.metadata import get_value,set_value

from org.slf4j import Logger, LoggerFactory

scriptExtension.importPreset("RuleSupport")
scriptExtension.importPreset("RuleSimple")

log = LoggerFactory.getLogger("org.eclipse.smarthome.model.script.Rules")

event_aggregator_group = ir.getItem("gAsync_to_Sync_Aggregator")
proxy_switches_group = ir.getItem("gAsync_to_Sync_Proxy_Switches")

@rule("Async to Sync Aggregator")
@when("Descendent of gAsync_to_Sync_Aggregator received update")
def aggregator(event):
    event_item=ir.getItem(event.itemName)
    log.info ('Async event from item {}'.format(event_item))

    for event_group in event_aggregator_group.members:#find group(s) item is a member of 
        if event_item in event_group.members:
            group_aggregation_time = get_value(event_group.name,'Async_Time')
            if group_aggregation_time is None:
                log.warn ('Async group {} has no Async_Time metadata'.format(event_group))
                return
 
            group_aggregation_time=float(group_aggregation_time)
            log.info ('Async group {} aggregation time {}'.format(event_group,group_aggregation_time))

            proxy_switch_name = event_group.name+'_Sync_Switch'
            #create proxy switch item if needed
            try:
                proxy_switch_item = ir.getItem(proxy_switch_name)
            except:
                proxy_switch_item = items.add_item (proxy_switch_name,item_type="Switch",groups=[proxy_switches_group.name])

            now = datetime.now()
            set_value (event_item.name,"Last_Update",str(now))# store event time for this item

            events_in_sync=True
            for item in event_group.members:
                last_update = get_value(item.name,'Last_Update')
                if last_update is None: # no events from this item, exit
                    events_in_sync=False
                    break

                last_update_time = datetime.strptime(last_update, "%Y-%m-%d %H:%M:%S.%f")
                log.info('Item {} Last update {}'.format(item.name,last_update_time))
                delta=now-last_update_time

                aggregation_time = group_aggregation_time # get group time or use item time if given
                item_aggregation_time = get_value(item.name,'Async_Time')
                if item_aggregation_time is not None:
                    aggregation_time=float(item_aggregation_time)
                    log.info ('Async item {} aggregation time {}'.format(item,aggregation_time))

                if delta.total_seconds() > aggregation_time: #determine if events are in sync
                    events_in_sync=False
                    break
        
            if events_in_sync:
                events.sendCommand(proxy_switch_item,'ON')
            else:
                events.sendCommand(proxy_switch_item,'OFF')
     

The slf4j import is not needed, since there is a log attribute created for the rule function. Also, it does not look like you are using the core.osgi import. There are several things that would be cleaned up by running this through pylint too :wink:!
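For reference, a sketch of what that looks like (assuming the helper libraries' @rule decorator attaches a log attribute to the decorated function):

@rule("Async to Sync Aggregator")
@when("Descendent of gAsync_to_Sync_Aggregator received update")
def aggregator(event):
    # the logger attached by the @rule decorator replaces the slf4j import
    aggregator.log.info("Async event from item {}".format(event.itemName))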

As for your tutorial, it is not clear to me exactly what the problem is that you are trying to solve and how your code helps. I think it would be helpful if you could provide some real-world examples of where and how you have used this.

I’ll throw in an example.

Modbus periodically polls many “registers” in one transfer from a remote device. We might consider that to be a “set” of related data.
However, that set of raw data gets processed one piece at a time into individual Items. Due to openHAB's asynchronous nature, it's not really possible to determine reliably which would be the last Item to get processed.

If you want to process two Items as a pair, there is a risk that triggering from the update of either one would leave you working with one “old” state and one “new” state. Let’s say you have something send “03” and “99”, which together represent a meter reading of 3.99. Maybe next time you get “04” and “01”. The risk is processing that into a false 04.99 if you don’t use matching data.

The only existing workaround is a crude delay after either update, and hoping for the best.

This method allows a positive action as soon as all of the Items in a set are “ready”.
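To make that concrete with this tutorial's approach: put the paired register Items in an aggregator group and trigger from its proxy switch. A minimal sketch, with hypothetical names: it assumes Meter_High and Meter_Low Items under a gMeter aggregator group, and a Meter_Reading Number Item for the result.

from core.rules import rule
from core.triggers import when

@rule("Combine Modbus meter registers")
@when("Item gMeter_Sync_Switch received command ON")
def combine_meter(event):
    # Both registers were updated within the aggregation window,
    # so the two states come from the same Modbus transfer.
    high = int(str(items["Meter_High"]))   # e.g. "03"
    low = int(str(items["Meter_Low"]))     # e.g. "99"
    events.postUpdate("Meter_Reading", str(high + low / 100.0))  # 3.99, never a mixed 04.99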

Added a use case to the text.

Thank you for saying what I didn’t :slight_smile: