Flume - Water Monitoring


Anyone have any experience with FLUME?

It seems they have an API, which might make it easy to integrate with OH.

Squid :squid:

I have been using the first-gen version of Flume for almost a year. Once calibrated, it works great at detecting water usage and leaks through their mobile app. I have not built an openHAB binding, but I have put together a quick Python script that uses their API to collect the water usage every few minutes and store it in my own InfluxDB. Glad to share the script if interested.

Oh, I’ve been looking for something like this for some time. But their FAQ says it’s currently for the US market only, and sadly it seems to work only through their cloud API, with no local access.

If anyone knows a similar product which is available in the EU and comes without a mandatory cloud-connection, please tell me. :slight_smile:

@basriram

Would love to see your Python script. Would you mind sharing?

Squid :squid:

Not the prettiest code, with a lot of hardcoding, but it gets the job done: it pulls the data every 15 minutes via a cron job and inserts it into InfluxDB 2.0. You need to set up your clientid, clientsecret, username and password for Flume API usage, and obtain the userid and deviceid using Postman or other means.

import json
import time
from datetime import datetime, timedelta

import requests
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
from pytz import timezone

# InfluxDB 2.0 target; the bucket name follows the 1.x-style 'database/retention_policy' pattern
database = 'waterdb'
retention_policy = 'autogen'
bucket = f'{database}/{retention_policy}'
token = '<influxdb2.0 token>'

class FlumeUsageScraper:
    API_url = 'https://api.flumetech.com/oauth/token'
    export_url = 'https://api.flumetech.com/users/<userid>/devices/<deviceid>/query'

    def pretty_print_POST(self, req):
        print('{}\n{}\n{}\n\n{}'.format(
            '-----------START-----------',
            req.method + ' ' + req.url,
            '\n'.join('{}: {}'.format(k, v) for k, v in req.headers.items()),
            req.body,
        ))

    def get_water_usage(self, mytime):
        headers = {
        'Accept' : 'application/json, text/javascript, */*; q=0.01',
        'Accept-Encoding' : 'gzip, deflate, br',
        'Accept-Language':'en-US,en;q=0.9',
        'X-Requested-With': 'XMLHttpRequest',
        'User-Agent' : 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36'
        }

        data = {
         'grant_type':'password',
         'client_id':'<clientid>',
         'client_secret':'<clientsecret>',
         'username':'<username>',
         'password':'<password>'
         }

        # request_id is an arbitrary label; the response uses it as the key of the result list
        querydata = {'queries':[{'bucket':'MIN','since_datetime':mytime,'request_id':'5705766de1914'}]}
        session = requests.session()
        # Step 1: exchange the credentials for an OAuth access token
        initreq = requests.Request('POST', self.API_url, data=data, headers=headers)
        initprep = session.prepare_request(initreq)
        # Debug output; note that this prints the request body, including the password and client secret
        self.pretty_print_POST(initprep)
        response = session.send(initprep)
        response.raise_for_status()
        mydata = response.json()['data']
        access_token = mydata[0]['access_token']
        # Step 2: query the per-minute usage with the bearer token
        headers['Authorization'] = 'Bearer ' + access_token
        headers['Content-Type'] = 'application/json'
        jsondata = json.dumps(querydata)
        exportreq = requests.Request('POST', self.export_url, data=jsondata, headers=headers)
        exportprep = session.prepare_request(exportreq)
        self.pretty_print_POST(exportprep)
        response = session.send(exportprep)
        response.raise_for_status()
        return response.json()

    def run(self):
        # Look back 16 minutes so consecutive 15-minute cron runs overlap slightly
        lastfifteenmin = datetime.now() - timedelta(minutes=16)
        since = lastfifteenmin.strftime('%Y-%m-%d %H:%M:%S')
        print(since)
        data = self.get_water_usage(since)
        jsondata = data['data'][0]['5705766de1914']
        client = InfluxDBClient(url='http://influxdb.iot.lan:8086', token=token, org='waterdborg')
        # Synchronous writes, so every point is sent before the script exits
        write_api = client.write_api(write_options=SYNCHRONOUS)
        for entry in jsondata:
            if entry['value'] > 0:
                dt = datetime.strptime(entry['datetime'], '%Y-%m-%d %H:%M:%S')
                # Treat the naive timestamp as US/Central and convert it to epoch seconds;
                # timestamp() honours the attached zone, unlike time.mktime(), which
                # assumes the machine's local zone
                cdt = timezone('US/Central').localize(dt)
                ts = int(cdt.timestamp())
                print(entry['datetime'])
                print(entry['value'])
                point = Point("flumedata").field("value", float(entry['value'])).time(ts, write_precision=WritePrecision.S)
                print(point.to_line_protocol())
                write_api.write(bucket=bucket, record=point)
            else:
                print('not writing 0 value for ts ' + entry['datetime'])
        write_api.close()
        client.close()

if __name__ == '__main__':
    scraper = FlumeUsageScraper()
    scraper.run()
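
For the userid and deviceid step, Postman is not strictly required. The following is only a rough sketch, under two assumptions that are not confirmed anywhere in this thread: that the access token returned by the `oauth/token` call is a JWT whose payload carries a `user_id` claim, and that the device list lives at `users/<userid>/devices` (guessed from the shape of the query URL in the script). The helper names `jwt_payload` and `devices_url` are made up for illustration.

```python
import base64
import json


def jwt_payload(token: str) -> dict:
    """Decode the payload segment of a JWT without verifying its signature."""
    payload_b64 = token.split('.')[1]
    # base64url payloads usually drop their '=' padding; restore it before decoding
    payload_b64 += '=' * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(payload_b64))


def devices_url(access_token: str) -> str:
    """Build the (assumed) device-list URL for the user encoded in the token."""
    user_id = jwt_payload(access_token)['user_id']  # assumed claim name
    return f'https://api.flumetech.com/users/{user_id}/devices'
```

If those assumptions hold, a GET on the returned URL with the same `Authorization: Bearer ...` header the script already builds should list the devices, and the deviceid can be read from that response.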
