Locking a shared queue (ArrayList) between rule

I am migrating my notification system to OpenHAB. It basically consist in queuing notifications in a String[] array (nt_queue) and processing the queue. I have doubt on handling the shared variable nt_queue between the 3 rules in charge of the notification system:

  1. ENQUEUING: when item SYS_Notification item receives a string command, the rule adds the received command to the nt_queue, gives it a UID and posts an update to the openhab bus to trigger the processing of the queue.

  2. PROCESSING: when item SYS_Notification has been updated, it triggers the processing of the first item of nt_queue and launches elementary notification tasks (OSD to Kodi, TTS in Alexa, push, email, lighting or whatever). This allows asynchroneous triggering but synchronisation in notification timing (for example between TTS and OSD)

  3. DEQUEUING: when elementary notification has ended (duration applies to TTS, light and OSD), item SYS_NotificationCompleted state is updated with its UID. This triggers removal of the processed notification from the nt_queue array and a postUpdate(SYS_Notification) to trigger again PROCESSING on the next item in the queue.

I have used reentrant lock to do so but this is not efficient for the processing which involves sendHttpGetRequest() timeout that will keep the process locked until timeout returns (see PROCESSING_V1). So I tried a PROCESSING_V2 to have the sendHttpGetRequest() outside the lock/unnlock block but this seems to exit without unlocking.

The 3 rules are in one file and here is the framework of the code handling the locking:

var queue_lock = new ReentrantLock
var List\<String\> nt_queue = newArrayList()

rule "ENQUEUE"   
when 
    Item SYS_Notification received command
then
    queue_lock.lock
    try{
    //here the code to add the received command to nt_queue
    }
    finally{ queue_lock.unlock() }
end

rule "DEQUEUE"   
when 
    Item SYS_NotificationCompleted received update
then
    queue_lock.lock
    try{
    //here the code to delete the notification from nt_queue
    }
    finally{ queue_lock.unlock() }
end

rule "PROCESS_V1"
when 
    Item SYS_Notification received update
then
    queue_lock.lock
    try{
    //here the code to read current notification to process (1st in the queue)
    //here the code to launch elementary notification task
    }
    finally{ queue_lock.unlock() }
end

rule "PROCESS_V2"
when 
    Item SYS_Notification received update
then
    queue_lock.lock
    try{
    //here the code to read current notification to process (1st in the queue)
    }
    finally{ queue_lock.unlock() }

    //here the code to launch elementary notification task
end

I am into Java but I read about sharedResource statement: would this be the right solution to lock properly nt_queue? Any other strategy?

Thank you in advance!

Locks are somewhat supported but dangerous to use in OH Rules. The biggest problem is some errors will bypass the finally clause meaning it is possible that the lock will never get unlocked. On the other hand, you have the problem that you only get 5 threads in which to run Rules. When you have locks and long running calls like to sendHttp*Request Actions you run the risk of running out.

First some questions:

  • How fast are these notifications being generated that this is even a problem?
  • How important is it that the notifications are guaranteed to be processed in order?

If the notifications are not occurring super fast then it is unlikely that the queue will ever have more than one notification in the queue in the first place so this looks really complicated.

My first recommendation would be to consider whether this approach is necessary.

If it is necessary then next I’d consider writing a small side service to implement this. Something you send the messages to and that code handles the locking and queueing.

The Rules DSL is simply never going to let you implement something like this in a safe manner. If your notifications are happening so fast that this is necessary then you will quickly consume all your Rules runtime threads and all your Rules will come to a halt because you will have 5 or more waiting for access to the lock.

If you still want to try, look into using one of the Collections that are made to implement this sort of thing like a ConcurrentLinkedQueue which implements the queueing and locking for you. But you will still run into the thread pool issue.

The notification system as is a core brick to me.
A) it collects and prioritizes notfications raised by the system:

  • preventing overlaps (for TTS or OSD)
  • synchronizing several notification channel (TTS / OSD/lighting)
  • interrupting an on-going notification to serve a more important one, then resume the one which was interrupted

B) it serves notifications at the right location (specific room or remote if home members are outside, based on presence tracking), to the right user or users, using the most appropriate communication channel (OSD, TTS, light, pushover, SMS, email or any other)

Locking only occurs while the queue is modified (enqueue / dequeue rules) or read (process rule). The processing triggers the different notification channels then exits so it does not need to maintain the lock during notification. This is why I tried to unlock right after (see rule Process_V2). But it does not seem to work.

If the try{} catch{} finally{} block does not exit properly when using lock, then I need a different approach. Will I have the same problem using JSR223? When you refer to a side-service, do you refer to a piece of binding or something external? Any template of this? Can I use the ConcurrentLinked queue directly from Rules DSL?

I was trying to get rid of the Python/Eventghost sub-system that handled the “intelligence” to move it all to OpenHAB because of its platform independency and Java core. I would like to avoid adding another piece of software for HA…

All the thread management synchronization and such is handled by the native language. So if you use JSR223 and Jython, you will be dealing with Jython problems instead of Rules DSL problems. So the try/catch/finally issues won’t be a problem but I guarantee there will be other problems. Every language has oddities.

I’m thinking more of a little service that runs along side OH that OH sends messages to and it handles all the queueing and locking and such externally to OH so you don’t risk tying up OH Rules threads.

See https://github.com/jpmens/mqttwarn and https://github.com/rkoshak/sensorReporter for a couple of examples in Python of what I’m talking about. The first one lets you send MQTT messages from OH and it forwards it to one or more of a couple score of notification services. The latter is a script I initially wrote to control the GPIOs on RPis not colocated with my OH server and to distribute BT presence sensors across the house. I and others have added additional capabilities over the years but the key is that all the processing takes place outside of OH. OH just sends/receives messages which don’t even require rules.

Then I strongly suggest using a Collection that implements the locking for you. They have both blocking and non-blocking queues. I recommend non-blocking.

You can use any standard Java class directly from the Rules DSL. You will probably have to import it though. You are using the Java List and ArrayList classes from Java above.