import ssl
import socket
import random
import hashlib
import base64
import re
import xmltodict
import sys
class NiceGate:
def __init__(self, host, target, source, username, descr, pwd):
self.host = host
self.target = target
self.source = source
self.username = username
self.descr = descr
self.pwd = pwd
# Client challenge, randomly generated
self.clientChallenge = '{:08x}'.format(random.randint(1, 9999999)).upper()
# Server challenge, send by server
self.serverChallenge = ""
self.commandSequence = 1
self.commandID = 0
self.sessionID = 1
self.debug = False
self.counterPing = 0
self.counterReconnect = 0
self.reconnectMessageID = 100000
self.connection = False
# Translate hex string to byte array
def hexStringToByteArray(self, hex):
return bytes.fromhex(hex)
# Get sha256
def mysha256(self, *args):
m = hashlib.sha256()
for a in args:
m.update(a)
return m.digest()
# Invert byte array
def invertArray(self, data):
return data[::-1]
# Generating command ID from seesion ID
def generateCommandID(self, sessionID):
i = self.commandSequence
self.commandSequence = i + 1
return ((i << 8) | (int(sessionID) & 255))
# Build sign for message
def buildSign(self, xmlCommand):
clientChallengeArr = self.hexStringToByteArray(self.clientChallenge)
serverChallengeArr = self.hexStringToByteArray(self.serverChallenge)
pairingPassword = base64.b64decode(self.pwd)
sessionPassword = self.mysha256(pairingPassword, self.invertArray(serverChallengeArr), self.invertArray(clientChallengeArr))
msgHash = self.mysha256(xmlCommand.encode())
sign = self.mysha256(msgHash, sessionPassword)
return '' + base64.b64encode(sign).decode("utf-8") + ''
# Check if sign needed
def isSignNeeded(self, commandType):
if commandType == 'VERIFY' or commandType == 'CONNECT' or commandType == 'PAIR':
return False
else:
return True
# Wrap message, protocol needed
def wrapMessage(self, xml):
if self.debug:
print (xml)
return ('\u0002' + xml + '\u0003').encode()
# Get all data from socket
def recvall(self):
BUFF_SIZE = 512
data = b''
while True:
part = self.serv.recv(BUFF_SIZE)
data += part
if len(part) < BUFF_SIZE:
# either 0 or end of data
break
return data
# Find Session ID in responce, SessionID used for MessageID generating
def findSessionID(self, msg):
match = re.search(r'Authentication\sid=[\'"]?([^\'" >]+)', msg)
if match:
self.sessionID = match.group(1)
# Find server challenge in responce, needed fo message signature
def findServerChallenge(self, msg):
match = re.search(r'sc=[\'"]?([^\'" >]+)', msg)
if match:
self.serverChallenge = match.group(1)
else:
print ('No server challenge found')
exit()
# Bild request
def buildMessage(self, commandType, body):
self.commandID = self.generateCommandID(self.sessionID)
startRequest = '\r\n'.format(self.commandID, self.source, self.target, commandType)
endRequest = '\r\n'
msg = self.wrapMessage(startRequest + body + (self.buildSign(startRequest + body) if self.isSignNeeded(commandType) else "") + endRequest)
self.serv.send(msg)
answer = self.recvall()
answer = answer.decode()
if self.debug:
print ('\r\n#####BEGIN####\r\n' + answer + '\r\n####END####\r\n')
self.findSessionID(answer)
return answer
# Connect to IT4WIFI
def connect(self, *args):
self.serv = ssl.wrap_socket(socket.socket(socket.AF_INET,socket.SOCK_STREAM))
self.serv.connect((self.host, 443))
if args:
for ar in args:
if ar == 'pair':
pair = self.buildMessage('PAIR', ''.format(self.username, self.descr))
print (pair)
exit()
verify = self.buildMessage('VERIFY', ''.format(self.username))
if re.search(r'Authentication\sid=[\'"]?([^\'" >]+)', verify):
connect = self.buildMessage('CONNECT', ''.format(self.username, self.clientChallenge))
self.findServerChallenge(connect)
self.connection = True
else:
print ('No user found')
self.connection = False
exit()
# Get IT4WIFI status
def status(self):
statusDict = {}
status = self.buildMessage('STATUS', '')
match = re.search(r'', status)
if match.group(0):
statusDictRaw = xmltodict.parse(match.group(0))
statusDict['Name'] = statusDictRaw['Response']['Interface']['Settings']['Name']
statusDict['ID'] = statusDictRaw['Response']['Devices']['Device']['@id']
statusDict['Status'] = statusDictRaw['Response']['Devices']['Device']['Properties']['DoorStatus']
statusDict['Obstruct'] = statusDictRaw['Response']['Devices']['Device']['Properties']['Obstruct']
if self.debug:
print(statusDict)
return statusDict
# Open, close or stop gates
def change(self, command):
change = self.buildMessage('CHANGE', '\n{}\n'.format(command))
#self.event()
# Ping for prevent sokcet close
def check(self):
check = self.buildMessage('CHECK', ''.format(self.sessionID, self.username))
return check
def event(self):
#self.serv.send(self.wrapMessage('s'))
print ('EVENT\r\n\r\n')
answer = self.recvall()
answer = answer.decode()
print (answer)
# Disconnect from IT4WIFI
def disconnect(self):
self.connection = False
self.commandID = 0
self.commandSequence = 1
self.serv.close()
# Time to ping counter
def __time_to_ping(self):
self.counterPing += 1
if self.counterPing >= 30:
self.counterPing = 0
return True
return False
# Time to reconnect counter
def __time_to_reconnect(self):
self.counterReconnect += 1
if self.counterReconnect >= 5:
self.counterReconnect = 0
return True
return False
# Ping IT4WIFI for keep alive socket connection
def ping(self):
ping = self.check()
if re.search(r'Error', ping):
if self.debug:
print('Check error. Disconnecting from gates...')
self.disconnect()
return False
else:
match = re.search(r'id=[\'"]?([^\'" >]+)', ping)
messageID = int(match.group(1))
if messageID > self.reconnectMessageID:
if self.debug:
print('Reconnect. messageID: ' + match.group(1) + ' Disconnecting from gates...')
self.disconnect()
return True
# Main loop
def loop(self):
if not self.connection:
if self.__time_to_reconnect():
if self.debug:
print ('Connecting to gates....')
self.commandID = 0
self.commandSequence = 1
self.connect()
else:
if self.__time_to_ping():
self.ping()