Queue initialisation

I want to implement a dynamic queue that is filled whenever a sendTelegram call fails.
Every minute I check whether the queue has elements and try to resend them.
I tried several different ways of initialising the queues, but I always get an error message when adding an item to the queue:
Rule 'Stromausfall': cannot invoke method public abstract boolean java.util.Queue.add(java.lang.Object) on null

How do I initialise it correctly?


import java.util.Queue
import java.util.LinkedList

val Queue<String> ListeBot = new LinkedList<String>()
val Queue<String> ListeMessage = new LinkedList<String>()
val Queue<DateTime> ListeTime = new LinkedList<DateTime>()

val Procedures.Procedure2<String, String> fnSendTelegram = [
												Bot,
												Message
|
	val success = sendTelegram(Bot, Message)

	if (!success)
	{
		ListeMessage.add(Message)
		ListeBot.add(Bot)
		ListeTime.add(now())
	}
]


rule "ResendAllTelegrams"
when
	Time cron "17 0/1 * * * ?"
then
	if (ListeTime !== null)  // null means not yet initialised
	{
		var MessageTime = ListeTime.peek
		while (MessageTime !== null)
		{
			val Message = ListeMessage.peek + "\r\nErster Versuch: " + MessageTime.toString
			val success = sendTelegram(ListeBot.peek, Message)
			if (success)
			{
				ListeTime.remove
				ListeMessage.remove
				ListeBot.remove
				MessageTime = ListeTime.peek
			}
			else
				MessageTime = null
		}
	}
end

rule "Stromausfall"
when
	Item str_Sys_StromWeg changed
	or
	Item str_Sys_StromDa changed
then

	if (swi_Sys_StartUp.state == OFF)
	{
		// Unix time
		val StromWegUNIX = Long::parseLong(str_Sys_StromWeg.state.toString)
		val StromDaUNIX = Long::parseLong(str_Sys_StromDa.state.toString)

		// Date & time
//		val StromWeg = new DateTime(StromWegUNIX * 1000)
//		val StromDa = new DateTime(StromDaUNIX * 1000)

		// Duration of the power outage
		val Dauer = (StromDaUNIX - StromWegUNIX)

		val DauerString = fnSekundenZuString.apply(Dauer)

		if (Dauer > 0)
			fnSendTelegram.apply("HausWartung", "Stromausfall beendet!\r\nDauer " + DauerString)
		else
			fnSendTelegram.apply("HausWartung", "Stromausfall!")
	}
end

This seems overly complex to me.

But besides that, the cause of your error is that global vals and vars cannot see each other, so inside the lambda your Queues are null. You have to pass the Queues to the Procedure as arguments.
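As a rough sketch of what passing the Queues as arguments could look like for the original code: the lambda becomes a Procedure5 and the Rule hands the three global Queues over when it calls apply. The parameter names bots, messages and times are made up for this example, and the Rule is cut down to a single trigger; this is just typed in, not tested.

```xtend
import java.util.Queue
import java.util.LinkedList

val Queue<String> ListeBot = new LinkedList<String>()
val Queue<String> ListeMessage = new LinkedList<String>()
val Queue<DateTime> ListeTime = new LinkedList<DateTime>()

// The three Queues arrive as arguments because the lambda cannot see the other globals
val Procedures.Procedure5<String, String, Queue<String>, Queue<String>, Queue<DateTime>> fnSendTelegram = [
    Bot, Message, bots, messages, times |
    val success = sendTelegram(Bot, Message)
    if (!success) {
        // remember the failed message for a later retry
        bots.add(Bot)
        messages.add(Message)
        times.add(now)
    }
]

rule "Stromausfall"
when
    Item str_Sys_StromWeg changed
then
    // globals are visible inside a Rule, so they can be handed to the lambda here
    fnSendTelegram.apply("HausWartung", "Stromausfall!", ListeBot, ListeMessage, ListeTime)
end
```

The resend Rule can keep using ListeBot, ListeMessage and ListeTime directly, since Rules can see the globals; only the global lambda cannot.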

Another problem is that this approach has a high probability of bringing your Rules to a halt. There are only five Rule execution threads available to you. If for some reason Telegram goes down for more than five minutes, you will have five copies of “ResendAllTelegrams” running at the same time, and no other Rules will be able to run until Telegram comes back and one of those five copies finishes sending the missed telegrams and exits.

A third problem is that ResendAllTelegrams will furiously try to resend the messages as fast as it possibly can (times five). I suspect Telegram will eventually ban your IP or your API key for spamming the service with requests.
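If you would rather keep the cron Rule, one possible way to stop copies of it from piling up is to guard it with a lock, so that a run which finds the previous run still busy simply exits. This is only a sketch under assumptions: it uses java.util.concurrent.locks.ReentrantLock, resendLock is a name made up here, and it addresses only the thread pile-up, not how often Telegram gets called.

```xtend
import java.util.Queue
import java.util.LinkedList
import java.util.concurrent.locks.ReentrantLock

val Queue<String> ListeBot = new LinkedList<String>()
val Queue<String> ListeMessage = new LinkedList<String>()
val Queue<DateTime> ListeTime = new LinkedList<DateTime>()

// hypothetical guard so that only one copy of the Rule works on the Queues at a time
val ReentrantLock resendLock = new ReentrantLock

rule "ResendAllTelegrams"
when
    Time cron "17 0/1 * * * ?"
then
    // a previous run is still busy: give the execution thread back immediately
    if (!resendLock.tryLock) return;
    try {
        var MessageTime = ListeTime.peek
        while (MessageTime !== null) {
            val Message = ListeMessage.peek + "\r\nErster Versuch: " + MessageTime.toString
            if (sendTelegram(ListeBot.peek, Message)) {
                ListeTime.remove
                ListeMessage.remove
                ListeBot.remove
                MessageTime = ListeTime.peek
            } else {
                MessageTime = null // stop at the first failure and wait for the next cron run
            }
        }
    } finally {
        resendLock.unlock
    }
end
```

With the guard in place at most one execution thread is tied up by this Rule, however long Telegram stays unreachable.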

I would use Timers instead of a Queue to resend the messages:

import java.util.Map

val Map<String, Timer> timers = createHashMap

val Procedures.Procedure3<String, String, Map<String, Timer>> fnSendTelegram = [ Bot, Message, timers |
    val success = sendTelegram(Bot, Message)
    if(success) return;

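    // the time of the first attempt doubles as the key for this message's timer
    val key = now.toString
    timers.put(key, createTimer(now.plusMinutes(1), [ |
        // named "resent" because redeclaring "success" here would clash with the outer val
        val resent = sendTelegram(Bot, Message + "\r\nErster Versuch: " + key)
        if(!resent) timers.get(key).reschedule(now.plusMinutes(1))
        else timers.remove(key)
    ]))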
]

rule "Clear missed messages"
when
    // some Item received a command
then
    timers.values.forEach[ t | t?.cancel ]
    timers.clear
end

In the above approach, on a failure, each message gets its own timer that retries sending that message every minute. I added a new rule to clear out the missed messages should you ever need to.

Another approach uses a single Timer in place of the Rule you currently have, which avoids the problem of running out of execution threads.

import java.util.List
import java.util.ArrayList

val List<List<String>> missedMessages = new ArrayList<List<String>>()

val Procedures.Procedure3<String, String, List<List<String>>> fnSendTelegram = [ Bot, Message, missed |
    val success = sendTelegram(Bot, Message)
    if(success) return;

    // store bot, message and time of the first attempt, in that order
    val List<String> msg = createArrayList
    msg.add(Bot)
    msg.add(Message)
    msg.add(now.toString)

    // a timer is already running if missed has elements
    val timerRunning = missed.size > 0
    missed.add(msg)

    // create the timer to catch up with the missed elements
    if(!timerRunning) {
        createTimer(now.plusMinutes(1), [ |
            while(missed.size > 0){
                // iterate over a copy so that entries can be removed from missed
                new ArrayList<List<String>>(missed).forEach[ List<String> m |
                    val suc = sendTelegram(m.get(0), m.get(1) + "\r\nErster Versuch: " + m.get(2))
                    if(suc) missed.remove(m)
                    Thread::sleep(1000) // avoid spamming Telegram service
                ]
                // wait a minute before the next round if anything is left
                if(missed.size > 0) Thread::sleep(60000)
            }
        ])
    }
]

rule "Clear missed messages"
when
    // some Item received a command
then
    missedMessages.clear
end

The code isn’t quite as clean as the separate Timers for each message, but it has the advantage that the messages will be resent in their original order (more or less: if the Telegram service comes back online in the middle of the forEach loop, the messages will be sent out of order). The way it works is that a failed message is added to the list of missed messages. If the list was empty before, meaning no Timer is running yet, it also creates a Timer that tries to send the missed messages again every minute until the list is empty.

I personally like the first approach much better but you might want to preserve the order which would require the second approach.

If you absolutely must have the messages sent in order it gets a little more complex:

import java.util.List
import java.util.ArrayList

val List<List<String>> missedMessages = new ArrayList<List<String>>()

val Procedures.Procedure3<String, String, List<List<String>>> fnSendTelegram = [ Bot, Message, missed |
    val success = sendTelegram(Bot, Message)
    if(success) return;

    // store bot, message and time of the first attempt, in that order
    val List<String> msg = createArrayList
    msg.add(Bot)
    msg.add(Message)
    msg.add(now.toString)

    // a timer is already running if missed has elements
    val timerRunning = missed.size > 0
    missed.add(msg)

    // create the timer to catch up with the missed elements
    if(!timerRunning) {
        createTimer(now.plusMinutes(1), [ |
            while(missed.size > 0){
                var cont = true
                // always send the oldest message first and stop at the first failure
                while(missed.size > 0 && cont){
                    val next = missed.get(0)
                    cont = sendTelegram(next.get(0), next.get(1) + "\r\nErster Versuch: " + next.get(2))
                    if(cont) {
                        missed.remove(0)
                        Thread::sleep(1000)
                    }
                }
                // wait a minute before the next round if anything is left
                if(missed.size > 0) Thread::sleep(60000)
            }
        ])
    }
]

rule "Clear missed messages"
when
    // some Item received a command
then
    missedMessages.clear
end

In the above, we immediately stop trying to send messages in the Timer when one message fails and wait another minute before trying again. This will avoid trying to send subsequent messages we know will fail and it lets us preserve the original order of the messages.

NOTE: All of these approaches are just typed in. There are likely typos.


First of all, thanks a lot for digging into the problem and writing down your thoughts about it. I appreciate your efforts for this community.

You solved my immediate problem with the error message and pointed me to the problem with the possibility of spamming the service. With these hints my solution works (at least for one queued message).

Next, I need to understand how your different concepts work in detail. One big advantage I can see right now is that I can get rid of the cron-scheduled rule.