Persistance Average Modification - Retrieve all persistet values since specified time for moving average

Hello,
I’m trying to calculate a moving average for a measurement value where my data-set has here and there some false values - coming directly from the measurements.It is not easy to distinguish the false values right away in my setup.

For data-smoothing I calculate a moving average with the persistence service average function - done in a rule, triggering the rule on every update of the real measurement.:

AverageValue = triggeringItem.averageSince(now.minusMinutes(30))

Then this value is persisted in the rule for the corresponding “average-Item”. However with this function I cannot rule out specific out-of-range values. This leads to wrong averaging. See example graph below. Red line is the original raw measurment item, green line the moving average (with 20min average in this example). The glitches down should be eliminated.

image

Two solution ideas:

  • I calculate my own average function in the rule:
    But for this I need to retrieve all historic values from now to x minutes in the past. Issue is that I do not know in advance how many historic state are persisted for the last x minutes timeframe as different sensors have different - also non-uniform - measurement intervals.
    How can I retrieve all historic Values from a persisted item in a specific timeframe (e.g. from now to x minutes in the past) and do some calculations with them in a rule - while not knowing in advance the item interval or the number of historic states to be expected?

  • Modify somehow the .averageSince() function with parameters to specify a value range which should be considered only for calculating the average.
    For the second optional unfortunately I have no idea how to start.

With the first option also more complicated function could be implemented (e.g. some kind of low-pass filter or weighted moving average functions).

Thanks for your ideas how this could be solved.

I’d turn persistence for these items off and explicitly call item.persist() in my rule only after I validated them so I only ever store valid values.

2 Likes

In the interests of Scientific Truth I’d persist both the raw and filtered data, in case I messed up or changed my mind later :wink:

But yes, it is a lot easier to filter data than alter someone else’s functions that process data.

Thanks for the hints that is potentially part of the solution

Drawback: this solution does still not allow to calculate own type of averaging function using historic values.

That’s not possible anyway at least not in a simple way. Well of course you can download all values one by one then apply whatever arithmetics, but why? averageSince() is the perfect function if you want a moving window average if all the values are validated. That’s no drawback but an efficient solution.

How do you know if a reading is valid or not? If you want to get rid of outliers you could use a median rather than the average, it is a valid stat and is not influenced by outliers. But I understand this could be not simple to calc.
If you are storing your data in influxdb you might look at writing a continuous query to calc the median.

Calculating this directly in the database is an option, however then we loose transparency to different persistence services.

How would it be possible to download all values up to a specific time (if I do not know the timestamps for each of the historic values)? It might not be that efficient, I agree, but still there are a lot of other smoothing methods available beside then simple averaging which then could be programmed individually depending on the needs.

This is beyond the capabilities of openHAB persistence service, which supports many “backends” and provides only a minimal common set of features.

Meaning, you’d need to script that externally.

That’s a question way beyond the original one so if you want to discuss please open a new thread.
And the answer highly depends on the use case, the sensor to exhibit and the nature of the reading so there’s no across-the-board answer like ‘median’ (although that’s a good one in many cases).

PersistenceViewer is just making calls to the REST API.

This works in Jython…

from core.actions import Exec
RESULT = Exec.executeCommandLine("/bin/sh@@-c@@/usr/bin/curl -s --connect-timeout 1 --max-time 10 -X GET --header \"Accept: application/json\" \"http://localhost:8080/rest/persistence/items/HEM1_Total_Energy?starttime=2019-11-14T05:55:55&endtime=2019-11-14T06:30\"", 10000)

Install the RESTDOCS UI and look under Persistence.

Thanks, this is a very good hint - I will have a detailed look and will try to build a solution out of that in a rule…

I just want to share here the procedure I followed finally to calculate two different type of Moving Average Methods:

  • Simple Moving average (average of the last x measurement readings)
  • Single Exponential Smoothing (At+1 = a Mt + (1 - a) Ft

The method can be defined via a parameter in the rule ( AveragingMethod ) - but could be also of course an item Parameter.

Item Definition:
Each Item which should be averaged should be
a) in the Group gMovingAverage
b) have a second corresponding item with the same name + postfix ("_MovingAverage")
which must tbe in the group “gAveragedMembers”

Rule:

   
    > rule "SprMovingAverageCalculation"
    > when
    > 	Member of gMovingAverage  changed
    > then
    > 	// example item which is to be updated with this rule:
    > 	// myItemName_MovingAverage
    > 
    > 	var Number AverageValue
    > 	var DateTime StartTime
    > 	var Number AveragingPeriod_Minutes
    > 	var Number ExponentialSmoothingFactor_Alpha
    > 	//var Item itemAverage
    > 	
    > 	// calculate the average
    > 	if( triggeringItem.state !== null )
    > 	{
    > 		// --------- define the averaging method
    > 		//var AveragingMethod = "MovingAverage"
    > 		var AveragingMethod = "SingleExponentialSmoothing"
    > 
    > 		// define all necessary parameters  - these could eventually also provided by separate parameter items (if they need to be dynamic changeable)
    > 		AveragingPeriod_Minutes = 30  // this is useful only for the Method "MovingAverage"
    > 		ExponentialSmoothingFactor_Alpha = 0.3
    > 		val Number local_AllowedDistortionPercent = 0.10
    > 
    > 		
    > 		// -----------------------------------------
    > 		// ------------------ Standard Averaging MEthod from persistance service 
    > 		// -----------------------------------------
    > 		AverageValue = triggeringItem.averageSince(now.minusMinutes(AveragingPeriod_Minutes))
    > 
    > 		val SearchItemName = ((triggeringItem.name) as String) + "_MovingAverage"
    > 		//  find the item containging the moving average
    > 		val itemAverage  = gAveragedMembers.members.findFirst[ name.equals( SearchItemName )]
    > 
    > 		// -----------------------------------------
    > 		// ------------------ Alternative Averaging Method using REST API  and skipping values out of range
    > 		// -----------------------------------------
    > 		StartTime = now.minusMinutes(AveragingPeriod_Minutes)
    > 
    > 		// retrieve the stored datapoints of the triggeringItem from persistance service via the REST Interface
    > 		val local_httpString = "http://localhost:8080/rest/persistence/items/" + ((triggeringItem.name) as String) + "?starttime=" + StartTime.toString("yyyy-MM-dd'T'HH:mm:ss")
    > 		val RESULT_json = sendHttpGetRequest(local_httpString,1000)
   
    > 
    > 		// get the number of datapoints from the json structure
    > 		var Number local_numdatapoints = transform("JSONPATH", "$.datapoints", RESULT_json)
    > 
    > 		// retrieve the persisted values also for the averaging item -> this is required to allow an exponential smoothing algorithm
    > 		// Note: actually we do not need to retrieve all datapoints from the same StartTime but we keep it for simplicity
    > 		val local_httpString_itemAverage = "http://localhost:8080/rest/persistence/items/" + ((itemAverage.name) as String) + "?starttime=" + StartTime.toString("yyyy-MM-dd'T'HH:mm:ss")
    > 		val RESULT_json_itemAverage = sendHttpGetRequest(local_httpString_itemAverage,1000)
    > 
    > 		// get the number of datapoints
    > 		var Number local_numdatapoints_itemAverage = transform("JSONPATH", "$.datapoints", RESULT_json_itemAverage)
    > 
    > 		// Note: extracting an array of data is not supported with JSONPATH - thus next line is invalid
    > 		//--- multiple JSON PATH results from an arry seems not supported by JSONPATH transformation - thus we use javascript transformation function
    > 		//var local_datapoints = transform("JSONPATH", "$.data", RESULT_json)
    > 		
    > 		// build the parameter string for the Javascript function. 
    > 		// unfortunately multiple arguments to Javascript functions is not supported in OpenHAB - Thus we concatenate the parameters in a single
    > 		// String parameter seperated by a | character
    > 		// Parameters are
    > 		// 		json_string_input     - json structure (as string) containing all data points for the measurement item
    > 		//		AllowedDistortionPercent   - Percent of allowed difference between the average and the actual measurement to take the actual 
    > 		//									measurement value into account for averaging
    > 		//		Average_non_clean			- the average value as calculated from simple persistance service
    > 		//		json_string_input_averaged   - json structure (as string) containing all data points for the averaged-item
    > 		//		AveragingMethod 
    > 		// 			Averaging Method can be one out of the following parameters:
    > 		// 			AveragingMethod = "MovingAverage"
    > 		// 			AveragingMethod = "SingleExponentialSmoothing"
    > 		//		ExponentialSmoothingFactor_Alpha  -- defines the exponential smoothing factor alpha 
    > 
    > 		var String local_paramstring = RESULT_json.concat("|").concat(String::valueOf(local_AllowedDistortionPercent)).concat("|").concat(String::valueOf(AverageValue)).concat("|").concat(RESULT_json_itemAverage).concat("|").concat(AveragingMethod).concat("|").concat(String::valueOf(ExponentialSmoothingFactor_Alpha))
    > 		logInfo("SprinklerRule:SprMovingAverageCalculation", "ParameterString for javascript transform of Item: " + triggeringItem.name + "= " + local_paramstring )
    > 		// invoke the Javascript Function - where the actual calculation is performed
    > 		var String  local_CleanAverageValue_string = transform("JS", "average_within_range.js", local_paramstring)
    > 		var local_CleanAverageValue = Float::parseFloat(local_CleanAverageValue_string) as Number
    > 		
    > 		// ------------- write result to the itemAverage
    > 		//   using standard average
    > 		//itemAverage.postUpdate( AverageValue )
    > 
    > 		// using local clean average
    > 		itemAverage.postUpdate( local_CleanAverageValue )
    > 	}
    > 	else
    > 	{
    > 		logInfo("SprinklerRule:SprMovingAverageCalculation", "Item: " + triggeringItem.name + " has state NULL: No average calculated ")
    > 	}
    > end  // end of rule SprMovingAverageCalculation

And the correspnding Javascript file (to be located in the …/transform directory ):

(function(inputParam) 
{

    // Input Parameters needs to be passed in single parameter, seperated by a | character
    //json_string_input | AllowedDistortionPercent | Average_non_clean | json_string_input_averaged | AveragingMethod | ExponentialSmoothingFactor_Alpha
    // Averaging Method can be one out of the following parameters:
    // AveragingMethod = "MovingAverage"
    // AveragingMethod = "SingleExponentialSmoothing"


    var inputArray = inputParam.split("|"); 
    var json_string_input = inputArray[0];
    var AllowedDistortionPercent = Number(inputArray[1]);
    var Average_non_clean = Number(inputArray[2]);
    var json_string_input_averaged = inputArray[3];
    var AveragingMethod = inputArray[4];
    var ExponentialSmoothingFactor_Alpha = Number(inputArray[5]);

    if(AveragingMethod!="MovingAverage" && AveragingMethod!="SingleExponentialSmoothing")
    {
        return ("Error in javascript: AveragingMethod not recognized");
    };

    //
    //  This is an exaple of an exptected json_string_input string
    // {
    //     "name": "the_item_name",
    //     "datapoints": "2",
    //     "data": [
    //       {
    //         "time": 1574888891000,
    //         "state": "551.35"
    //       },
    //       {
    //         "time": 1574889191000,
    //         "state": "-1.0"
    //       }
    //     ]
    //   }



    var json_object = JSON.parse(json_string_input);
    var json_object_average = JSON.parse(json_string_input_averaged);

    var datapoints = json_object.datapoints;
    var datapoints_average = json_object_average.datapoints;
    var Arithmetic_sum_valid_datapoints = 0.0;
    var valid_datapoints = 0;
    var local_CleanAverageValue = 0.0;
    var difference = 0.0;

    // ---------------------------------------------------
    // --------------- Moving Average Method
    // ---------------------------------------------------

    if(AveragingMethod=="MovingAverage")
    {
            
        
        for(i = 0, l = Number(json_object.data.length); i < l; i++) 
        {
            // `i` will take on the values `0`, `1`, `2`,..., i.e. in each iteration
            // we can access the next element in the array with `data.items[i]`, example:
            // 
            // var obj = data.items[i];
            // 
            // Since each element is an object (in our example),
            // we can now access the objects properties with `obj.id` and `obj.name`. 
            // We could also use `data.items[i].id`.
            
         
            if( Number(json_object.data[i].state) > 0)
            {
                difference = Math.abs(Number(json_object.data[i].state) - Average_non_clean);
                if(((Number(json_object.data[i].state) - Average_non_clean)/Average_non_clean) < AllowedDistortionPercent)
                {
                    // if current datapoint is within valid distortion range then we use this datapoint for the averaging calculation
                    Arithmetic_sum_valid_datapoints = Arithmetic_sum_valid_datapoints + Number(json_object.data[i].state);
                    valid_datapoints = valid_datapoints + 1;
                };
            };
            
        };

        if(valid_datapoints > 0)
        {
            local_CleanAverageValue = Math.round(Arithmetic_sum_valid_datapoints / valid_datapoints*1000)/1000;
        }
        else
        {
            local_CleanAverageValue = 0.0;
        };

    } // End of moving average method
    // ---------------------------------------------------
    // --------------- Single Exponential Smoothing Method
    // ---------------------------------------------------
    else if(AveragingMethod=="SingleExponentialSmoothing")
    {
        if(Number(json_object.data.length)>=1 && Number(json_object_average.data.length)>=1)
        {
            // the last entry in the array is the latest measurement
            i = Number(json_object.data.length)-1;
            j = Number(json_object_average.data.length)-1
            if( Number(json_object.data[i].state) > 0)
            {
                
                difference = Math.round(Math.abs(Number(json_object.data[i].state) - Number(json_object_average.data[j].state))*1000)/1000;
                if(((Number(json_object.data[i].state) - Average_non_clean)/Average_non_clean) < AllowedDistortionPercent)
                {
                    // there is a valid datapoint available - otherwise we use the same value as the last averaging value
                    // counter i should represent the time t for the new measuerment data but at the same time it should represent time t-1 for the
                    // averaging value series as the averaging value series should have always one datapoint less (as we compute this here in this function)
                    local_CleanAverageValue = Math.round((Number(json_object.data[i].state) * ExponentialSmoothingFactor_Alpha + Number(json_object_average.data[j].state) * (1.0 - ExponentialSmoothingFactor_Alpha))*1000)/1000
                }
                else
                {
                    // use the previous value again if the new measurement value seems to be "far" out of range
                    local_CleanAverageValue = Number(json_object_average.data[j].state)
                };
            }
            else
            {
                // use the previous value again if the new measurement value is invalid
                local_CleanAverageValue = Number(json_object_average.data[j].state)
            }
            
        };
       

    } // end of SingleExponentialSmoothing Method
    return (local_CleanAverageValue);
    
})(input)

It is quite a quick programming style and can for sure be made more efficient but I think it shows the concept.

2 Likes

Thanks a lot for sharing your solution.
Has to make some changes in Rule, to make it work on OH 4.0.4
Here it is.

// example item which is to be updated with this rule:
// myItemName_MovingAverage

val formatter = java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")
  
var DecimalType AverageValue
var ZonedDateTime StartTime
var long AveragingPeriod_Minutes
var double ExponentialSmoothingFactor_Alpha
//var Item itemAverage

// calculate the average
if (triggeringItem.state !== null) {
    // --------- define the averaging method
    //var AveragingMethod = "MovingAverage"
    var AveragingMethod = "SingleExponentialSmoothing"
    
    // define all necessary parameters  - these could eventually also provided by separate parameter items (if they need to be dynamic changeable)
    AveragingPeriod_Minutes = 30  // this is useful only for the Method "MovingAverage"
    ExponentialSmoothingFactor_Alpha = 0.2
    val Number local_AllowedDistortionPercent = 0.10


    // -----------------------------------------
    // ------------------ Standard Averaging MEthod from persistance service 
    // -----------------------------------------
    AverageValue = triggeringItem.averageSince(ZonedDateTime.now.minusMinutes(30))

    val SearchItemName = ((triggeringItem.name) as String) + "_MovingAverage"
    //  find the item containging the moving average
    val itemAverage = gAveragedMembers.members.findFirst[name.equals(SearchItemName)]

    // -----------------------------------------
    // ------------------ Alternative Averaging Method using REST API  and skipping values out of range
    // -----------------------------------------
    //StartTime = ZonedDateTime.now.minusMinutes(AveragingPeriod_Minutes)
    
    StartTime = ZonedDateTime.now.minusMinutes(AveragingPeriod_Minutes)
    
    logInfo("StartTime", "Item: " + StartTime)
    
    // retrieve the stored datapoints of the triggeringItem from persistance service via the REST Interface
    val local_httpString = "http://localhost:8080/rest/persistence/items/" + ((triggeringItem.name) as String) + "?starttime=" + StartTime.format(formatter) //.toString("yyyy-MM-dd'T'HH:mm:ss")
    val RESULT_json = sendHttpGetRequest(local_httpString, 1000)

    // get the number of datapoints from the json structure
    var Number local_numdatapoints = Integer::parseInt(transform("JSONPATH", "$.datapoints", RESULT_json))
    logInfo("SprinklerRule:SprMovingAverageCalculation", "local_numdatapoints: " + local_numdatapoints)

    // retrieve the persisted values also for the averaging item -> this is required to allow an exponential smoothing algorithm
    // Note: actually we do not need to retrieve all datapoints from the same StartTime but we keep it for simplicity
    val local_httpString_itemAverage = "http://localhost:8080/rest/persistence/items/" + ((itemAverage.name) as String) + "?starttime=" + StartTime.format(formatter) //.toString("yyyy-MM-dd'T'HH:mm:ss")
    val RESULT_json_itemAverage = sendHttpGetRequest(local_httpString_itemAverage, 1000)

    // get the number of datapoints
    var Number local_numdatapoints_itemAverage = Integer::parseInt(transform("JSONPATH", "$.datapoints", RESULT_json_itemAverage))
    logInfo("SprinklerRule:SprMovingAverageCalculation", "local_numdatapoints_itemAverage: " + local_numdatapoints_itemAverage)

    // Note: extracting an array of data is not supported with JSONPATH - thus next line is invalid
    //--- multiple JSON PATH results from an arry seems not supported by JSONPATH transformation - thus we use javascript transformation function
    //var local_datapoints = transform("JSONPATH", "$.data", RESULT_json)

    // build the parameter string for the Javascript function. 
    // unfortunately multiple arguments to Javascript functions is not supported in OpenHAB - Thus we concatenate the parameters in a single
    // String parameter seperated by a | character
    // Parameters are
    // 		json_string_input     - json structure (as string) containing all data points for the measurement item
    //		AllowedDistortionPercent   - Percent of allowed difference between the average and the actual measurement to take the actual 
    //									measurement value into account for averaging
    //		Average_non_clean			- the average value as calculated from simple persistance service
    //		json_string_input_averaged   - json structure (as string) containing all data points for the averaged-item
    //		AveragingMethod 
    // 			Averaging Method can be one out of the following parameters:
    // 			AveragingMethod = "MovingAverage"
    // 			AveragingMethod = "SingleExponentialSmoothing"
    //		ExponentialSmoothingFactor_Alpha  -- defines the exponential smoothing factor alpha 

    var String local_paramstring = RESULT_json.concat("|").concat(String:: valueOf(local_AllowedDistortionPercent)).concat("|").concat(String:: valueOf(AverageValue)).concat("|").concat(RESULT_json_itemAverage).concat("|").concat(AveragingMethod).concat("|").concat(String:: valueOf(ExponentialSmoothingFactor_Alpha))
    //logInfo("SprinklerRule:SprMovingAverageCalculation", "ParameterString for javascript transform of Item: " + triggeringItem.name + "= " + local_paramstring)
    // invoke the Javascript Function - where the actual calculation is performed
    var String local_CleanAverageValue_string = transform("JS", "config:js:MovingAverage", local_paramstring)
    
    logInfo("SprinklerRule:SprMovingAverageCalculation", "local_CleanAverageValue_string of Item: " + triggeringItem.name + "= " + local_CleanAverageValue_string)
    
    var local_CleanAverageValue = Float::parseFloat(local_CleanAverageValue_string) as Number

    // ------------- write result to the itemAverage
    //   using standard average
    //itemAverage.postUpdate( AverageValue )

    // using local clean average
    itemAverage.postUpdate(local_CleanAverageValue)
}
else {
    logInfo("SprinklerRule:SprMovingAverageCalculation", "Item: " + triggeringItem.name + " has state NULL: No average calculated ")
}