This post will be a little different. You will not see new code here; instead you will see a refactoring of my existing code. If you recall, I have three places so far where I need to create and keep track of a bunch of Timers, one for each Item that generates an event. This requires a dict and a bunch of bookkeeping logic that gets repeated in each case. So I’ve extracted that code into a library module and adjusted my Rules to use the library code instead.
First the library module, placed in $OH_CONF/automation/lib/python/personal/timer_mgr.py
"""
A class to centralize management of multiple Timers in cases where one needs to keep
track of one timer per Item
"""
from core.jsr223.scope import ir
from core.jsr223.scope import items
from core.actions import ScriptExecution
from org.joda.time import DateTime
from core.log import log_traceback

class timer_mgr(object):
    """
    Keeps and manages a dictionary of Timers keyed on an Item name.
    """

    def __init__(self):
        self.timers = {}

    @log_traceback
    def __not_flapping(self, itemName):
        """
        Called when the Timer expires. Call the function and delete the timer from the dict.
        This function ensures that the dict gets cleaned up.
        Args:
            itemName: the name of the Item associated with the Timer
        """
        try:
            self.timers[itemName]['not_flapping']()
        finally:
            del self.timers[itemName]

    @log_traceback
    def timer_check(self, itemName, interval, function, flapping_function=None, reschedule=False):
        """
        Call to check whether an Item is flapping. If no Timer exists, create a new timer to run the passed
        in function.
        If a Timer exists, reschedule it if reschedule is True. If a flapping_function was passed, run it.
        Args:
            itemName: The name of the Item we are monitoring for flapping
            interval: How long the Item needs to remain the same state before calling function, in milliseconds
            function: Function to call when the timer expires
            flapping_function: Optional function to call if the Item is found to be flapping
            reschedule: Optional flag that causes the Timer to be rescheduled when the Item is flapping
        """
        # Timer exists, reschedule and call flapping_function if necessary
        if itemName in self.timers:
            # Reschedule the Timer if desired
            if reschedule:
                self.timers[itemName]['timer'].reschedule(DateTime.now().plusMillis(interval))
            # Cancel the Timer if not rescheduling
            else:
                self.timers[itemName]['timer'].cancel()
                del self.timers[itemName]
            # Call the flapping function
            if flapping_function: flapping_function()
        # No timer exists, create the Timer
        else:
            item = ir.getItem(itemName)
            self.timers[itemName] = { 'orig_state':   item.state,
                                      'timer':        ScriptExecution.createTimer(DateTime.now().plusMillis(interval), lambda: self.__not_flapping(itemName)),
                                      'flapping':     flapping_function,
                                      'not_flapping': function }

    def has_timer(self, itemName):
        """
        Returns True if there is a Timer for this Item, False otherwise
        """
        return itemName in self.timers

    @log_traceback
    def cancel(self, itemName):
        """
        Cancels the Timer associated with this Item if one exists
        """
        if not self.has_timer(itemName): return
        self.timers[itemName]['timer'].cancel()
        del self.timers[itemName]
Features of note include:
- all of the dict book keeping is centralized in this one place, no more forgetting to del the timer when we are done
- timer_check is where most of the real work is done. It operates in two modes. If the timer already exists we reschedule the timer if told to and we call the flapping function if given one. If the timer doesn’t exist we create it. We use our own function instead of the passed in lambda so we have the opportunity to clean up the dict.
- Most of the time I expect to use only timer_check, but has_timer and cancel are there for when they are needed.
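For anyone who wants to play with the two modes of timer_check outside of openHAB, here is a minimal sketch of the same idea. It is not the library above: it assumes the standard library's threading.Timer as a stand-in for the openHAB Timer, takes the interval in seconds rather than milliseconds, and the class name SketchTimerManager is made up for illustration. Since threading.Timer cannot be rescheduled, "reschedule" here means cancel and start a fresh timer with the originally stored function.

```python
import threading
import time

class SketchTimerManager(object):
    """Hypothetical stand-in for timer_mgr, built on threading.Timer."""

    def __init__(self):
        self.timers = {}              # key -> {"timer": Timer, "function": callable}
        self.lock = threading.Lock()  # timers fire on their own threads

    def _expired(self, key, timer):
        # Remove the entry before running the callback so the dict is
        # cleaned up even if the callback raises.
        with self.lock:
            entry = self.timers.get(key)
            if entry is None or entry["timer"] is not timer:
                return  # cancelled or replaced in the meantime
            del self.timers[key]
        entry["function"]()

    def _start(self, key, seconds, function):
        # The lambda looks up "timer" when it runs, after it has been assigned.
        timer = threading.Timer(seconds, lambda: self._expired(key, timer))
        timer.daemon = True
        self.timers[key] = {"timer": timer, "function": function}
        timer.start()

    def timer_check(self, key, seconds, function, flapping_function=None, reschedule=False):
        with self.lock:
            existing = self.timers.pop(key, None)
            if existing is None:
                # Mode 1: no timer yet, so create one
                self._start(key, seconds, function)
            else:
                # Mode 2: a timer exists, so the key is "flapping"
                existing["timer"].cancel()
                if reschedule:
                    self._start(key, seconds, existing["function"])
        if existing is not None and flapping_function:
            flapping_function()

    def has_timer(self, key):
        return key in self.timers

calls = []
mgr = SketchTimerManager()
for _ in range(2):
    # The second pass happens while the first timer is still running
    mgr.timer_check("Test", 0.2,
                    lambda: calls.append("stable"),
                    flapping_function=lambda: calls.append("flapping"),
                    reschedule=True)
    time.sleep(0.05)
time.sleep(0.5)
print("calls: {}".format(calls))
```

The second call finds the running timer, reports flapping and pushes the expiry back, so the "stable" callback should run only once, after the state has been quiet for the full interval.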
Here is a test script:
from core.log import logging, LOG_PREFIX
log = logging.getLogger("{}.TEST".format(LOG_PREFIX))

import personal.timer_mgr
reload(personal.timer_mgr)
from personal.timer_mgr import timer_mgr

timers = timer_mgr()

def flapping():
    log.info("Test was found to be flapping")

def not_flapping():
    log.info("Test was found not to be flapping")

from time import sleep

log.info("TEST: Flapping, no lambda, no reschedule: no output")
timers.timer_check("Test", 1000, not_flapping)
sleep(0.5)
timers.timer_check("Test", 1000, not_flapping)
sleep(0.5)

log.info("TEST: Flapping, with lambda, no reschedule: Test was found to be flapping")
timers.timer_check("Test", 1000, not_flapping, flapping_function=flapping)
sleep(0.5)
timers.timer_check("Test", 1000, not_flapping, flapping_function=flapping)
sleep(0.5)

log.info("TEST: Flapping, with lambda and reschedule: Test was found to be flapping, Test was found not to be flapping")
timers.timer_check("Test", 1000, not_flapping, flapping_function=flapping, reschedule=True)
sleep(0.5)
timers.timer_check("Test", 1000, not_flapping, flapping_function=flapping, reschedule=True)
sleep(1.1)

log.info("TEST: Not flapping: Test was found not to be flapping")
timers.timer_check("Test", 1000, not_flapping)
sleep(1.1)

log.info("TEST: hasTimer and cancel")
timers.timer_check("Test", 1000, not_flapping)
sleep(0.5)
if timers.has_timer("Test"): log.info("Timer exists")
log.info("Cancelling timer")
timers.cancel("Test")
sleep(0.6)
log.info("Cancelling non-existent timer")
if not timers.has_timer("Test"): log.info("No timer exists")
timers.cancel("Test")
log.info("Done")
And here are the new versions of my offline alerting Rule and my door reminder rule, both of which have been modified to use this new class.
from core.rules import rule
from core.triggers import when
from core.metadata import get_key_value, set_metadata
from personal.util import send_info, get_name
from threading import Timer
from core.log import log_traceback
from personal.timer_mgr import timer_mgr

# -----------------------------------------------------------------------------
timers = timer_mgr()
on_off_map = { ON: 'online', OFF: 'offline' }

@log_traceback
def alert_timer_expired(itemName, name, origState, log):
    alerted = get_key_value(itemName, "Alert", "alerted") or "OFF"
    if items[itemName] != origState:
        log.warning("In alert_timer_expired and {}'s current state of {} is different from its original state of {}.".format(name, items[itemName], origState))

    # If our alerted flag equals the Item's state we need to generate an alert
    if str(items[itemName]) == alerted:
        send_info("{} is now {}".format(name, on_off_map[items[itemName]]), log)
        set_metadata(itemName, "Alert", { "alerted": 'OFF' if alerted == 'ON' else 'ON' }, overwrite=False)
    else: log.warning("Alert timer expired for {} but current state doesn't match alerted: {} != {}".format(name, items[itemName], alerted))

def alert_timer_flapping(itemName, name, log):
    alerted = get_key_value(itemName, "Alert", "alerted") or "OFF"
    log.warning("{} is flapping! Alerted = {} and current state = {}".format(name, alerted, items[itemName]))

@rule("Device online/offline", description="A device whose online/offline status we track changed state", tags=["admin"])
@when("Member of gSensorStatus changed")
def status_alert(event):
    name = get_name(event.itemName)

    if isinstance(event.oldItemState, UnDefType):
        status_alert.log.warning("{} is in an undef type, canceling any running timers".format(name))
        timers.cancel(event.itemName)
        return

    timers.timer_check(event.itemName, 60000,
                       lambda: alert_timer_expired(event.itemName, name, event.itemState, status_alert.log),
                       flapping_function=lambda: alert_timer_flapping(event.itemName, name, status_alert.log),
                       reschedule=True)

# -----------------------------------------------------------------------------
@rule("System status reminder", description="Send a message with a list of offline sensors at 08:00 and System start", tags=["admin"])
@when("Time cron 0 0 8 * * ?")
@when("System started")
def status_reminder(event):
    numNull = len(filter(lambda item: isinstance(item.state, UnDefType), ir.getItem("gSensorStatus").members))
    if numNull > 0: status_reminder.log.warning("There are {} sensors in an unknown state!".format(numNull))

    offline = filter(lambda item: item.state == OFF, ir.getItem("gSensorStatus").members)
    if len(offline) == 0:
        status_reminder.log.info("All sensors are online")
        return

    offlineMessage = "The following sensors are known to be offline: {}".format(", ".join(map(lambda sensor: "{}".format(get_name(sensor.name)), sorted(sensor for sensor in offline))))
    for sensor in offline: set_metadata(sensor.name, "Alert", { "alerted" : "ON"}, overwrite=False)
    send_info(offlineMessage, status_reminder.log)

# -----------------------------------------------------------------------------
@rule("Power Meter online status", description="Zwave has marked the power meter as offline", tags=["admin"])
@when("Thing zwave:device:dongle:node7 changed")
def pm_online(event):
    pm_online.log.info("Power meter Thing changed status {}".format(event.statusInfo.status))
    if event.statusInfo.status == ONLINE: events.sendCommand("vPowerMeter_Online", "ON")
    else: events.sendCommand("vPowerMeter_Online", "OFF")

# -----------------------------------------------------------------------------
@rule("Process heartbeat", description="Process an uptime heartbeat message to ping the online status of a sensor", tags=["admin"])
@when("Member of SensorEvents received update")
def heartbeat(event):
    events.sendCommand(event.itemName.replace("Uptime", "Online"), "ON")
There are a couple of other changes from when I originally posted this one. See if you can find them. Notice how there is no more book keeping code in this Rule.
from core.rules import rule
from core.triggers import when
from personal.util import get_name, send_alert
from core.actions import ScriptExecution
from org.joda.time import DateTime
from core.log import log_traceback
from core.metadata import get_key_value
from personal.timer_mgr import timer_mgr

flapping_timers = timer_mgr()
reminder_timers = timer_mgr()

def is_night(): return items["vTimeOfDay"] == StringType("NIGHT") or items["vTimeOfDay"] == StringType("BED")

@log_traceback
def get_mins(itemName, log):
    value = get_key_value(itemName, "Static", "rem_mins")
    return 60 if not value else value.intValue()

@log_traceback
def reminder_timer(itemName, name, mins, log):
    if items[itemName] != OPEN:
        log.warning("{} open timer expired but door is not open!".format(name))
        return

    send_alert("{} has been open for {} minutes".format(name, mins), log)

    if is_night():
        log.info("Rescheduling timer because it is night")
        reminder_timers.timer_check(itemName, int(mins*60*1000),
                                    lambda: reminder_timer(itemName, name, mins, log),
                                    reschedule=True)

@log_traceback
def not_flapping(itemName, origState, name, log):
    if items[itemName] != origState:
        log.warning("{} was flapping, false alarm!".format(name))
    else:
        # Update the timestamp
        events.postUpdate(itemName+"_LastUpdate", DateTime.now().toString())

        # Set a reminder timer
        mins = get_mins(itemName, log)
        if items[itemName] == OPEN:
            reminder_timers.timer_check(itemName, int(mins*60*1000),
                                        lambda: reminder_timer(itemName, name, mins, log),
                                        reschedule=True)
        elif items[itemName] == CLOSED:
            if reminder_timers.has_timer(itemName): log.info("{} has a timer!".format(name))
            reminder_timers.cancel(itemName)

        # Create the message to log and maybe alert
        alert = False
        change = "opened" if origState == OPEN else "closed"
        time = ""
        present = ""
        if is_night():
            time = " and it is night"
            alert = True
        if items["vPresent"] == OFF:
            present = " and no one is home"
            alert = True
        msg = "The {} was {}{}{}!".format(name, change, time, present)
        if alert: send_alert(msg, log)
        else: log.info(msg)

def flapping(name, state, log):
    log.warning("{} appears to be flapping, it is now {}!".format(name, state))

# TODO: rework, these reminders are not useful any longer
@rule("Door reminder", description="Track when doors change state and set an alert if they remain open for too long", tags=["entry"])
@when("Member of gDoorSensors changed")
def door_changed(event):
    if isinstance(event.oldItemState, UnDefType): return

    name = get_name(event.itemName)
    state = event.itemState

    if state == UNDEF:
        door_changed.log.warning("{} is in an unknown state!".format(name))
        return

    # If a door changes within a second we treat it as flapping
    flapping_timers.timer_check(event.itemName, 1000,
                                lambda: not_flapping(event.itemName, state, name, door_changed.log),
                                lambda: flapping(name, state, door_changed.log),
                                True)

@rule("Reset timers for OPEN doors", description="Reschedules the Timers for doors that are open on restart", tags=["entry"])
@when("System started")
def reschedule_door_timers(event):
    for door in filter(lambda door: door.state == OPEN, ir.getItem("gDoorSensors").members):
        mins = get_mins(door.name, reschedule_door_timers.log)
        name = get_name(door.name)
        # Bind the loop values as default arguments so each Timer keeps its own door
        reminder_timers.timer_check(door.name, int(mins*60*1000),
                                    lambda doorName=door.name, name=name, mins=mins: reminder_timer(doorName, name, mins, reschedule_door_timers.log),
                                    reschedule=True)
These Rules actually have two sets of Timers, one to detect flapping and the other to set a reminder. To avoid collisions, and the more complicated code needed to distinguish between the flapping timers and the reminder timers for the same Item, we create two instances of the timer_mgr class (this is why it’s a class, BTW, and not a simple script). The reduction in code here is fairly significant and IMHO the code is easier to follow because it isn’t burdened by all the dict bookkeeping.
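To illustrate why two instances avoid collisions, here is a tiny sketch. TinyTimerRegistry is a made-up stand-in that only tracks names, not real Timers, and "FrontDoor" is a made-up Item name. The point is just that the dict is created in __init__, so every instance has its own, and the same Item name can live in both without interfering.

```python
class TinyTimerRegistry(object):
    """Hypothetical stand-in for timer_mgr: tracks names only, no real Timers."""

    def __init__(self):
        # Created per instance, so two registries never share entries
        self.timers = {}

    def add(self, item_name, purpose):
        self.timers[item_name] = purpose

    def has_timer(self, item_name):
        return item_name in self.timers

    def cancel(self, item_name):
        self.timers.pop(item_name, None)

flapping_registry = TinyTimerRegistry()
reminder_registry = TinyTimerRegistry()

# The same Item name is used as the key in both registries
flapping_registry.add("FrontDoor", "debounce")
reminder_registry.add("FrontDoor", "reminder")

# Cancelling the flapping entry leaves the reminder entry alone
flapping_registry.cancel("FrontDoor")
flapping_gone = not flapping_registry.has_timer("FrontDoor")
reminder_kept = reminder_registry.has_timer("FrontDoor")
print("flapping gone: {}, reminder kept: {}".format(flapping_gone, reminder_kept))
```

With a single shared dict the key would have to encode both the Item name and the purpose, which is the extra complexity the two instances avoid.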
So, at least for me, this refactoring is a win. I don’t necessarily share this to mean you should be using my timer_mgr class. My purpose in posting is to show that even pretty early on as you transition your Rules you will learn new things and discover new ways to accomplish the same thing. Don’t be afraid to go back and apply what you have learned to code you’ve already migrated.
In this Rule there are a couple of improvements I can make still. I’ll leave that to you to find.
If anyone does find this a useful library, there are some things I can do to improve it that I already have in mind (e.g. letting you pass a number with units so you can pass minutes, for example). Any other improvement ideas are welcome.
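As a rough idea of what the units improvement could look like, here is a sketch of a conversion helper. None of this exists in the library above: the function name to_millis and the unit abbreviations are assumptions made up for illustration, and timer_check would still receive plain milliseconds.

```python
# Hypothetical unit table; not part of the timer_mgr library in this post
MILLIS_PER_UNIT = {
    "ms": 1,
    "s": 1000,
    "m": 60 * 1000,
    "h": 60 * 60 * 1000,
}

def to_millis(value, unit="ms"):
    """Convert a number with a unit abbreviation to whole milliseconds."""
    if unit not in MILLIS_PER_UNIT:
        raise ValueError("Unknown time unit: {}".format(unit))
    return int(value * MILLIS_PER_UNIT[unit])

# Values like the ones used in the Rules above
reminder_ms = to_millis(60, "m")   # replaces int(mins*60*1000)
debounce_ms = to_millis(1, "s")    # replaces the literal 1000
print("reminder: {} ms, debounce: {} ms".format(reminder_ms, debounce_ms))
```

A call site could then read something like timers.timer_check(itemName, to_millis(60, "m"), function), which keeps the existing timer_check signature unchanged.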
Previous: Journey to JSR223 Python 5 of 9
Next: Journey to JSR223 Python 7 of?