Based on the excellent post “Washing Machine State Machine”, which I had been using in the rules DSL, I finished my migration to Python and looked for a generic approach, which led me to the “transitions” library.
Here’s where I’ve landed so far:
from transitions import Machine,State
from core.log import logging, LOG_PREFIX, log_traceback
from core.rules import rule
from core.triggers import when
from threading import Timer
from personal.notification import notification
class MyMachine(Machine):
    """Power-driven state machine tracking one appliance's operating state.

    The machine is its own model: it moves between OFF, STANDBY, ACTIVE and
    FINISHED each time ``consume(power)`` is triggered with a new power
    reading.

    Attributes:
        thresholds: three-element sequence compared against the power reading
            (units are whatever the power item reports -- not visible here):
            below ``thresholds[0]`` counts as stopped, from ``thresholds[0]``
            up to (excluding) ``thresholds[1]`` counts as idle, and
            ``thresholds[2]`` or above counts as running.
        eqptname: base name; the state is published to ``<eqptname>_opstate``.
        powersource: name of the item this machine reads and switches off.
        nicename: human-readable name used in notifications.
    """

    def __init__(self, name, powersource, nicename, thresholds):
        """Create the machine in state OFF and register its transitions.

        Args:
            name: machine name, also used as prefix of the ``_opstate`` item.
            powersource: item name associated with this appliance.
            nicename: display name for notifications.
            thresholds: ``[stopped_below, idle_below, running_from]`` levels.
        """
        self.thresholds = thresholds
        self.eqptname = name
        self.powersource = powersource
        self.nicename = nicename
        # Pending delayed power-off timer armed on FINISHED (None when idle).
        self._finish_timer = None
        Machine.__init__(
            self,
            name=name,
            states=['OFF', 'STANDBY', 'ACTIVE', 'FINISHED'],
            initial='OFF',
            after_state_change='enter_state',
        )
        # Registration order matters: for a given source state the library
        # tries transitions in the order they were added and runs the first
        # one whose condition holds.
        self.add_transition('consume', 'OFF', 'STANDBY', conditions='is_hidle')
        self.add_transition('consume', 'ACTIVE', 'FINISHED', conditions='is_hidle')
        self.add_transition('consume', '*', 'ACTIVE', conditions='is_running')
        self.add_transition('consume', '*', 'OFF', conditions='is_stopped')

    def is_stopped(self, power):
        """Return True when not already OFF and power is below the first threshold."""
        return self.state != 'OFF' and power < self.thresholds[0]

    def is_hidle(self, power):
        """Return True when power lies in the idle band [thresholds[0], thresholds[1])."""
        return self.thresholds[0] <= power < self.thresholds[1]

    def is_running(self, power):
        """Return True when not already ACTIVE and power reaches thresholds[2]."""
        return self.state != 'ACTIVE' and power >= self.thresholds[2]

    def enter_state(self, power):
        """Publish the new state and manage the delayed power-off timer.

        Registered as ``after_state_change`` callback, so it receives the same
        ``power`` argument that was passed to ``consume`` (unused here).
        """
        notification(u"{} {}".format(self.nicename, self.state))
        events.postUpdate(self.eqptname + "_opstate", self.state)

        # A timer armed by an earlier FINISHED must not survive a state
        # change: otherwise FINISHED -> ACTIVE -> FINISHED within 5 minutes
        # would let the stale timer cut the power too early.
        stale_timer = self._finish_timer
        if stale_timer is not None:
            stale_timer.cancel()
            self._finish_timer = None

        if self.state == "FINISHED":
            # Cut the power if the appliance is still FINISHED 300 s from now.
            self._finish_timer = Timer(300, applianceTimerScript, [self])
            self._finish_timer.start()
def applianceTimerScript(*args):
    """Timer callback: switch the appliance off if it is still FINISHED.

    Args:
        *args: the first positional argument is the state machine whose
            ``state``, ``nicename`` and ``powersource`` attributes are read.
    """
    appliance = args[0]
    # The appliance left FINISHED in the meantime: nothing to do.
    if appliance.state != "FINISHED":
        return
    message = u"Cycle {} fini depuis 5mn, je coupe le courant".format(appliance.nicename)
    notification(message)
    events.sendCommand(appliance.powersource, "OFF")
# One state machine per monitored appliance; each entry gives the machine
# name, its power item, a display name and the three power thresholds.
machine_array = [
    MyMachine(
        name="lavelinge",
        powersource="prise5_energy",
        nicename=u"Lave-Linge",
        thresholds=[1.2, 2, 5],
    ),
    MyMachine(
        name="sechelinge",
        powersource="prise6_energy",
        nicename=u"Sèche-Linge",
        thresholds=[1.4, 2, 10],
    ),
]
def machine_trigger_generator(machine_array):
    """Build a decorator adding one "Item ... changed" trigger per machine.

    Args:
        machine_array: iterable of objects exposing a ``powersource`` item name.

    Returns:
        A decorator that applies ``when(...)`` to the decorated function once
        for every machine and then returns that same function.
    """
    def generated_triggers(rule_function):
        """Attach a change trigger for each machine's power item."""
        # Lazy generator: each trigger string is built right before it is used.
        trigger_texts = (
            "Item {} changed".format(machine.powersource)
            for machine in machine_array
        )
        for trigger_text in trigger_texts:
            when(trigger_text)(rule_function)
        return rule_function

    return generated_triggers
@rule("FSM : equipement consumes")
@machine_trigger_generator(machine_array)
def state_machine_lave_linge(event):
    """Feed the changed item's value to every machine bound to that item.

    Args:
        event: the triggering event; its ``itemName`` selects the machine(s)
            and its ``itemState.floatValue()`` is passed to ``consume``.
    """
    source_item = str(event.itemName)
    for appliance in machine_array:
        # Skip machines that are fed by a different item.
        if appliance.powersource != source_item:
            continue
        reading = event.itemState.floatValue()
        appliance.consume(reading)
This seems to work quite well. I’m not sure I’m using Transitions to its full potential, but it’s a start.