I have been working away on HABApp, and I’ve made some routines to emulate some OH2 functionality. Specifically:
- sendMail
- pushNotification
- say
- CreateTimer
The say implementation is a bit specific: it sends text to one (or more) Chromecast device(s) using the Google TTS service, and assumes you have a web server you can write the TTS files to, so that they can be served to the Chromecast devices. You could serve them from a built-in web server, but I already have one on my OH2 server anyway (OH2 has one as well).
Here is what I have:
SendMail (only tested with gmail)
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email.mime.image import MIMEImage
from email.mime.application import MIMEApplication
from email import encoders
# Default sender address passed along by sendMail(); replace with your own.
mail_from = 'me@gmail.com'
# Default list of recipient addresses passed along by sendMail().
mail_to = ['someone@gmail.com']
class MailServer(HABApp.Rule):
    """Queue e-mails and deliver them one at a time over SMTP with STARTTLS.

    Messages handed to :meth:`send` are put on an asyncio queue and are
    delivered in order by the long-running :meth:`Push` coroutine.

    :param server: host name of the SMTP server
    :param port: port of the SMTP server
    :param login: user name used for the SMTP login
    :param password: password used for the SMTP login
    """

    def __init__(self, server, port, login, password):
        super().__init__()
        self.log = logging.getLogger('MyRule.'+self.__class__.__name__)
        self.__runtime = self._Rule__runtime
        self.serveraddr = server
        self.server_port = port
        self.login = login
        self.password = password
        self.loop = self.__runtime.loop
        # NOTE(review): the ``loop`` argument of asyncio.Queue was removed in
        # Python 3.10 - drop it when running on a newer interpreter.
        self.q = asyncio.Queue(loop=self.loop)
        self.run_soon(self.Push)

    def send(self, subject, message, mail_from, mail_to, attachment):
        """Schedule a message for delivery (callable from synchronous code)."""
        self.run_soon(self.add_to_queue, subject, message, mail_from, mail_to, attachment)

    async def add_to_queue(self, subject, message, mail_from, mail_to, attachment):
        """Put one message description on the delivery queue."""
        await self.q.put((subject, message, mail_from, mail_to, attachment))

    async def Push(self):
        """Consume the queue forever, sending one message after the other."""
        while True:
            subject, message, mail_from, mail_to, attachment = await self.q.get()
            await self.sendmail(subject, message, mail_from, mail_to, attachment)

    async def sendmail(self, subject='No Subject', message='', mail_from=None, mail_to=None, attachment=None):
        """Build the MIME message and send it; errors are logged, not raised.

        Nothing is sent when there is no recipient, or when there is neither
        a message text nor an attachment.

        :param subject: subject line
        :param message: plain-text body
        :param mail_from: sender address
        :param mail_to: list of recipient addresses (a single string is accepted)
        :param attachment: local file path or http(s) URL of a file to attach
        """
        if not mail_to or (not message and not attachment):
            return
        if isinstance(mail_to, str):
            # ",".join() on a plain string would split it into characters
            mail_to = [mail_to]
        try:
            msg = MIMEMultipart()
            msg['Subject'] = subject
            msg['From'] = mail_from
            msg['To'] = ",".join(mail_to)
            msg.attach(MIMEText(message))
            msg = await self.attach(msg, attachment)
            # smtplib is blocking: run it in the default executor so the
            # event loop keeps running while the mail is being delivered
            await asyncio.get_event_loop().run_in_executor(None, self.send_message, msg)
        except Exception as e:
            self.log.exception(e)

    async def attach(self, msg, attachment):
        """Attach a local file or the content of an http(s) URL to ``msg``.

        :param msg: the MIME message to extend
        :param attachment: file path or URL; falsy values leave ``msg`` untouched
        :return: ``msg`` (the same object that was passed in)
        """
        if not attachment:
            return msg

        payload = None
        if os.path.exists(attachment):
            # context manager so the file handle is always closed
            with open(attachment, "rb") as fh:
                payload = fh.read()
        elif attachment.startswith('http'):
            async with self.async_http.get(attachment) as resp:
                if resp.status == 200:
                    payload = await resp.read()

        if payload:
            name = os.path.basename(attachment)
            try:
                part = MIMEImage(payload, Name=name)
            except TypeError:
                # MIMEImage could not guess an image subtype: send the data
                # as a generic binary attachment instead of dropping the mail
                part = MIMEApplication(payload, Name=name)
            part.add_header('Content-Disposition', 'attachment', filename=name)
            msg.attach(part)
        return msg

    def send_message(self, msg):
        """Open an SMTP connection, upgrade it with STARTTLS, log in and send ``msg``."""
        with smtplib.SMTP(self.serveraddr, self.server_port) as server:
            server.starttls()
            server.login(self.login, self.password)
            server.send_message(msg)
        # only report success once the server has accepted the message
        self.log.info('Mail message: %s sent' % msg['subject'])
# Single shared mail rule; fill in your own SMTP account details.
ms = MailServer('smtp.gmail.com', 587, 'your-email-here@gmail.com', 'your-paswword')


def sendMail(subject='No Subject', message='', attachment=None):
    """Queue a mail from ``mail_from`` to ``mail_to`` on the shared mail rule.

    :param subject: subject line of the mail
    :param message: plain-text body
    :param attachment: optional file path or URL handed on to the mail rule
    """
    ms.send(
        subject=subject,
        message=message,
        mail_from=mail_from,
        mail_to=mail_to,
        attachment=attachment,
    )
pushNotification (needs the pushno library and a Prowl API key)
from pushno import PushNotification
class prowl(HABApp.Rule):
    """Queue push notifications and send them one at a time through pushno.

    :param apikey: Prowl API key; when ``None`` the key is read from
        ``params.prowl_api.value``
    """

    def __init__(self, apikey=None):
        super().__init__()
        self.log = logging.getLogger('MyRule.'+self.__class__.__name__)
        self.__runtime = self._Rule__runtime
        self.apikey = apikey
        if self.apikey is None:
            self.apikey = params.prowl_api.value
        self.pn = PushNotification("prowl", api_key=self.apikey, application="HABApp")
        self.loop = self.__runtime.loop
        # NOTE(review): the ``loop`` argument of asyncio.Queue was removed in
        # Python 3.10 - drop it when running on a newer interpreter.
        self.q = asyncio.Queue(loop=self.loop)
        self.run_soon(self.Push)

    def send(self, header, message):
        """Schedule a notification for delivery (callable from synchronous code)."""
        self.run_soon(self.add_to_queue, header, message)

    async def add_to_queue(self, header, message):
        """Put one ``(header, message)`` pair on the delivery queue."""
        await self.q.put((header, message))

    async def Push(self):
        """Consume the queue forever, sending one notification after the other.

        A failed send is logged and the loop carries on, so one bad request
        does not stop all later notifications.
        """
        while True:
            header, message = await self.q.get()
            try:
                # the send call is blocking: keep it off the event loop
                await asyncio.get_event_loop().run_in_executor(
                    None, lambda: self.pn.send(event=header, description=message))
            except asyncio.CancelledError:
                raise
            except Exception as e:
                self.log.exception(e)
            else:
                self.log.info('message: %s: %s sent' % (header, message))
# Single shared notification rule, created with its default API key lookup.
pn = prowl()


def pushNotification(header, message):
    """Queue a push notification on the shared notification rule.

    :param header: event title of the notification
    :param message: description text of the notification
    """
    pn.send(header=header, message=message)
say (needs the pychromecast library and a google.cloud TTS API key)
import os
import re

# Path of the Google Cloud credentials file picked up by the TTS client.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = '/etc/openhab2/HABApp/GoogleTTSAPIKey.json'
# The code below refers to the module as ``pychromecast``.
import pychromecast
from google.cloud import texttospeech
def get_ip():
    """Return the local IPv4 address of this host as a string.

    A UDP socket is "connected" to an arbitrary address and the address the
    socket got bound to is read back. ``'127.0.0.1'`` is returned when that
    fails with a socket/OS error.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # doesn't even have to be reachable
        s.connect(('10.255.255.255', 1))
        return s.getsockname()[0]
    except OSError:
        # only network failures fall back; KeyboardInterrupt etc. propagate
        return '127.0.0.1'
    finally:
        s.close()
class GoogleHome(HABApp.Rule):
    """
    Speak text on a Google Home / Chromecast device using Google Cloud TTS.

    The synthesized MP3 files are written below ``basedir + cachedir`` and
    the device is given an ``http://<ip>/<cachedir>/<file>`` URL, so a web
    server serving that directory is required.

    A host or a devicename is mandatory.
    :param devicename: string : the ip or friendly name of the Google Home
    :param host: the host of the Google Home
    :param port: the port to contact the Google Home by ip
    :param ip: ip of your local http server (detected with get_ip() if None)
    """
    # Shared by all instances: they write to the same cache directory.
    file_lock = threading.Lock()
    delete_lock = threading.Lock()
    # device name -> asyncio.Queue of texts waiting to be spoken
    q = {}

    def __init__(self, devicename = None, host = None, port = None, ip = None):
        super().__init__()
        self.log = logging.getLogger('MyRule.'+self.__class__.__name__)
        self.__runtime = self._Rule__runtime
        if devicename is not None:
            chromecasts = pychromecast.get_chromecasts()
            self.cc = next(
                (c for c in chromecasts
                 if c.host == devicename or c.device.friendly_name == devicename),
                None)
            if self.cc is None:
                available_devices = [c.device.friendly_name for c in chromecasts]
                raise ValueError('Unable to found %s device. Available devices : %s'%(devicename, available_devices))
        elif host is not None:
            self.cc = pychromecast.Chromecast(host, port)
        else:
            raise ValueError('host or devicename is mandatory to create a GoogleHome object.')

        # get local ip, for http server address
        self.ip = get_ip() if ip is None else ip
        self.name = self.cc.device.friendly_name

        # add Queue to class variable self.q for synchronizing multiple GoogleHome instances
        self.loop = self.__runtime.loop
        # NOTE(review): the ``loop`` argument of asyncio.Queue was removed in
        # Python 3.10 - drop it when running on a newer interpreter.
        self.q[self.name] = asyncio.Queue(loop=self.loop)

        self.file_name_max = 50
        self.basedir = "/var/www/html/"
        self.cachedir = "/mp3_cache/"
        self.dir = os.path.normpath(self.basedir+self.cachedir)
        self.file_name = 'output.mp3'
        self.file_path = os.path.join(self.dir,self.file_name)
        self.delete_output()
        self.run_soon(self.saying)

    def say(self, text):
        """Schedule ``text`` to be spoken (callable from synchronous code)."""
        self.run_soon(self.async_say, text)

    async def async_say(self, text):
        """Queue ``text``; the first caller also cleans up once all queues are drained."""
        await self.q[self.name].put(text)
        if not self.delete_lock.locked():
            with self.delete_lock:
                # snapshot: another instance may register its queue meanwhile
                for q in list(self.q.values()):
                    await q.join()
                self.delete_output()

    async def saying(self, lang = 'en-GB'):
        """Consume this device's queue forever, speaking one text at a time.

        A failure while synthesizing or playing is logged and the loop carries
        on. ``task_done()`` is always called, so ``join()`` in
        :meth:`async_say` cannot hang on a failed text.
        """
        while True:
            text = await self.q[self.name].get()
            try:
                with self.file_lock:
                    ttsurl = await self.googleTTS_builder(text, lang)
                self.log.info('%s: say: %s' % (self.name, text))
                await self.play(ttsurl+'?'+str(random.randint(0,1000))) #caching issue workaround
            except asyncio.CancelledError:
                raise
            except Exception as e:
                self.log.exception(e)
            finally:
                self.q[self.name].task_done()

    async def play(self, url, contenttype = 'audio/mp3'):
        """Play ``url`` on the device and return once playback has stopped."""
        self.cc.wait()
        mc = self.cc.media_controller
        mc.play_media(url, contenttype)
        mc.block_until_active()
        # wait for the text to start ...
        while mc.status.player_is_idle:
            await asyncio.sleep(0.1)
        # ... and then for it to end
        while mc.status.player_is_playing:
            await asyncio.sleep(0.1)
        self.log.info("%s: played url %s duration: %ss" % (self.name, url, mc.status.duration))

    def delete_output(self):
        """Prune cache files older than 30 days and remove the generic output file."""
        with self.file_lock:
            self.delete_old_files(30)
            if os.path.exists(self.file_path):
                self.log.debug('deleted: %s' % self.file_path)
                os.remove(self.file_path)

    def delete_old_files(self, days = 30):
        """Delete regular files in the cache directory not modified for ``days`` days."""
        if not os.path.isdir(self.dir):
            # nothing cached yet (the directory is created on first synthesis)
            return
        cutoff = time.time() - (days * 86400)
        for f in os.listdir(self.dir):
            file_path = os.path.join(self.dir,f)
            if os.path.isfile(file_path) and os.path.getmtime(file_path) < cutoff:
                os.remove(file_path)
                self.log.debug('deleted old TTS file: %s' % file_path)

    def filename_from_text(self, text):
        """Return ``(absolute path, file name)`` of the cache file for ``text``.

        Short texts get a readable name derived from the text. Texts longer
        than ``file_name_max`` get a name derived from a hash of the text.
        Giving every long text the same file name would make the cache lookup
        in :meth:`get_url` return the audio of a *different* long text.
        """
        if len(text) > self.file_name_max:
            import hashlib
            file = hashlib.sha256(text.encode('utf-8')).hexdigest() + '.mp3'
        else:
            file = re.sub('[^0-9a-zA-Z]', '_', text) + '.mp3'
        return os.path.join(self.dir, file), file

    def get_url(self, text):
        """Return the URL of the cached file for ``text``, or None if not cached."""
        file_path, file = self.filename_from_text(text)
        if os.path.exists(file_path):
            os.utime(file_path) #update file access and modified time
            self.log.debug('TTS file found: %s' % (file))
            return "http://"+self.ip+os.path.join(self.cachedir,file)
        return None

    async def googleTTS_builder(self, text, lang = "en-GB"):
        """Return a URL serving ``text`` as speech, synthesizing it if not cached.

        :param text: the text to speak
        :param lang: language code passed to the voice selection
        :return: ``http://`` URL of the MP3 file on the local web server
        """
        url = self.get_url(text)
        if url is not None:
            self.log.debug('found existing url: ...%s' % url[-20:])
            return url
        self.log.debug('Generating TTS file for %s...' % text[:30])
        # Instantiates a client
        client = texttospeech.TextToSpeechClient()
        # Set the text input to be synthesized
        synthesis_input = texttospeech.types.SynthesisInput(text=text)
        # Build the voice request: language code, voice name and ssml gender
        voice = texttospeech.types.VoiceSelectionParams(
            language_code=lang,
            name='en-GB-Standard-C', #A and C = Female, B and D are male
            #name="en-GB-Wavenet-A", #more expensive version of above
            ssml_gender=texttospeech.enums.SsmlVoiceGender.FEMALE)
        # Select the type of audio file you want returned
        audio_config = texttospeech.types.AudioConfig(
            audio_encoding=texttospeech.enums.AudioEncoding.MP3,
            pitch=0.0, sample_rate_hertz=16000, speaking_rate=0.8, volume_gain_db=0.0)
        # Perform the text-to-speech request on the text input with the selected
        # voice parameters and audio file type
        response = client.synthesize_speech(synthesis_input, voice, audio_config)
        # create the cache directory if needed; real errors are not hidden
        os.makedirs(self.dir, exist_ok=True)
        file_path, file = self.filename_from_text(text)
        # The response's audio_content is binary.
        with open(file_path, 'wb') as out:
            # Write the response to the output file.
            out.write(response.audio_content)
        self.log.debug('TTS file %s written' % file)
        return "http://"+self.ip+os.path.join(self.cachedir,file)
# Define your chromecasts here: friendly name -> connection details.
# The ip must be a quoted string, and the key must be exactly 'ip'.
chromecast = {
    'Google Home Upstairs': {
        'ip': '192.168.100.201',
        'port': 8009,
    },
    'Google Mini 1': {
        'ip': '192.168.100.80',
        'port': 8009,
    },
}
# Address of the web server that serves the generated TTS files.
http_server = 'your web server ip here'
# One GoogleHome rule per configured device.
ccs = []
for name, cc in chromecast.items():
    log.debug('loaded chromecast_params: %s: host=%s port=%s http_server:%s' % (name, cc['ip'], cc['port'], http_server))
    ccs.append(GoogleHome(host=cc['ip'], port=cc['port'], ip=http_server))
def say(text=''):
    """Speak ``text`` on every device collected in ``ccs``."""
    for device in ccs:
        device.say(text)
CreateTimer:
from threading import Timer
import time
class createTimer(HABApp.Rule):
    '''
    General timer class using threading.Timer

    :param seconds: delay before ``exec_func`` is called; a negative value
        means the timer is never started
    :param exec_func: callable run when the timer expires (may be None)
    :param args: positional arguments passed to ``exec_func``
    :param kwargs: keyword arguments passed to ``exec_func``; the special
        keyword ``run`` (default False) starts the timer immediately and is
        not passed on
    '''
    def __init__(self, seconds=-1, exec_func=None, *args, **kwargs):
        super().__init__()
        self.log = logging.getLogger('MyRule.'+self.__class__.__name__)
        self.t = seconds
        self.exec_func = exec_func
        self.start_time = time.time()
        # take 'run' from the incoming kwargs; self.kwargs does not exist yet
        run = kwargs.pop('run', False)
        self.args = args
        self.kwargs = kwargs
        # placeholder so cancel()/is_running work before the first start()
        self.timer = Timer(self.t, self.function_to_run)
        if self.t >= 0 and run:
            self.start()

    def function_to_run(self):
        '''Call ``exec_func`` with the stored arguments (runs in the timer thread).'''
        if self.exec_func is not None:
            self.exec_func(*self.args, **self.kwargs)

    @property
    def is_running(self):
        '''True while the timer thread is alive.'''
        return self.timer.is_alive()

    @property
    def time_till_run(self):
        '''Seconds left until the timer fires, or 0 when it is not running.'''
        if self.is_running:
            return max(0, self.t - (time.time() - self.start_time))
        return 0

    def reschedule(self, seconds=None):
        '''Restart the timer, optionally with a new delay.'''
        self.start(seconds)

    def start(self, seconds=None, *args, **kwargs):
        '''
        Cancel a pending run and start the timer again.

        :param seconds: new delay; None keeps the previous one
        :param args: if given, replace the stored positional arguments
        :param kwargs: if given, replace the stored keyword arguments
        '''
        if seconds is not None:
            self.t = seconds
        self.timer.cancel()
        if self.t < 0:
            return
        if args:
            self.args = args
        if kwargs:
            self.kwargs = kwargs
        self.start_time = time.time()
        # a threading.Timer can only be started once, so build a new one
        self.timer = Timer(self.t, self.function_to_run)
        self.timer.daemon = True
        self.timer.start()

    def cancel(self):
        '''Cancel a pending run; has no effect if the timer already fired.'''
        self.timer.cancel()
CreateTimer is used like this:
class PatioDoor(HABApp.Rule):
    """Example rule that owns a timer for detecting a door left open."""

    def __init__(self):
        super().__init__()
        self.log = logging.getLogger('MyRule.'+type(self).__name__)
        self.online_t = 15
        self.offline_t = 60
        door_offline = self.item['patio_door_online'].value == "Offline"
        # time to decide door has not closed properly
        self.t = self.offline_t if door_offline else self.online_t
        self.closed_timer = createTimer(self.t, self.door_not_closed, run=False)
You can then call self.closed_timer.start(), self.closed_timer.cancel() and so on. start() cancels the timer and restarts it, which is useful. This is a thread-based timer; I am thinking of making an asyncio-based timer, but I’m not sure if it’s worth the effort.
These are just examples; you can make of them what you will.