import ssl
import socket
import random
import hashlib
import base64
import re
import sys

# xmltodict is a third-party package that only NiceGate.status() needs.
# Keep the module importable without it and fail at the point of use instead.
try:
    import xmltodict
except ImportError:
    xmltodict = None


class NiceGate:
    """Client for an IT4WIFI gate interface reached over a TLS socket.

    Typical use: create an instance, call :meth:`connect`, then call
    :meth:`loop` periodically so the connection is pinged and, after a
    disconnect, re-established.

    NOTE(review): several string literals below are passed to ``.format()``
    without containing any ``{}`` placeholder, and :meth:`status` searches
    with an empty pattern. That looks like markup was lost from these
    literals at some point; they are reproduced unchanged here and should be
    confirmed against the upstream protocol definition.
    """

    # Number of loop() calls between two keep-alive pings while connected.
    PING_INTERVAL = 30
    # Number of loop() calls between two reconnect attempts while disconnected.
    RECONNECT_INTERVAL = 5

    def __init__(self, host, target, source, username, descr, pwd, port=443):
        """Store the connection settings and reset the protocol state.

        Args:
            host: Address the TLS socket connects to.
            target: Value passed to the request header formatting
                (presumably the gateway identifier; confirm).
            source: Value passed to the request header formatting
                (presumably this client's identifier; confirm).
            username: User name sent in PAIR / VERIFY / CONNECT / CHECK.
            descr: Description sent in the PAIR request.
            pwd: Base64-encoded pairing password used to sign messages.
            port: TCP port to connect to (defaults to 443).
        """
        self.host = host
        self.port = port
        self.target = target
        self.source = source
        self.username = username
        self.descr = descr
        self.pwd = pwd
        # Client challenge: 8 upper-case hex digits. It feeds the session
        # password, so it is drawn from the OS entropy source rather than
        # the predictable default generator.
        self.clientChallenge = '{:08x}'.format(
            random.SystemRandom().randint(1, 9999999)).upper()
        # Server challenge, sent by the server in the CONNECT answer.
        self.serverChallenge = ""
        self.commandSequence = 1
        self.commandID = 0
        self.sessionID = 1
        self.debug = False
        self.counterPing = 0
        self.counterReconnect = 0
        # A ping answer whose id exceeds this value triggers a reconnect.
        self.reconnectMessageID = 100000
        self.connection = False
        # TLS socket; created by connect().
        self.serv = None

    def hexStringToByteArray(self, hex):
        """Translate a hex string to bytes."""
        return bytes.fromhex(hex)

    def mysha256(self, *args):
        """Return the SHA-256 digest of all byte arguments concatenated."""
        digest = hashlib.sha256()
        for chunk in args:
            digest.update(chunk)
        return digest.digest()

    def invertArray(self, data):
        """Return ``data`` in reverse order."""
        return data[::-1]

    def generateCommandID(self, sessionID):
        """Build the next command ID from the sequence counter and session ID.

        The sequence number occupies the bits above the lowest byte and the
        low byte of ``sessionID`` fills the lowest byte. The sequence counter
        is incremented as a side effect.
        """
        sequence = self.commandSequence
        self.commandSequence = sequence + 1
        return (sequence << 8) | (int(sessionID) & 255)

    def buildSign(self, xmlCommand):
        """Return the base64 signature for ``xmlCommand``.

        The session password is SHA-256(pairing password, reversed server
        challenge, reversed client challenge); the signature is
        SHA-256(SHA-256(message), session password).
        """
        clientChallengeArr = self.hexStringToByteArray(self.clientChallenge)
        serverChallengeArr = self.hexStringToByteArray(self.serverChallenge)
        pairingPassword = base64.b64decode(self.pwd)
        sessionPassword = self.mysha256(
            pairingPassword,
            self.invertArray(serverChallengeArr),
            self.invertArray(clientChallengeArr))
        msgHash = self.mysha256(xmlCommand.encode())
        sign = self.mysha256(msgHash, sessionPassword)
        # NOTE(review): the empty prefix/suffix look like the remains of
        # wrapper text around the signature; confirm.
        return '' + base64.b64encode(sign).decode("utf-8") + ''

    def isSignNeeded(self, commandType):
        """Return False for VERIFY, CONNECT and PAIR, True otherwise."""
        return commandType not in ('VERIFY', 'CONNECT', 'PAIR')

    def wrapMessage(self, xml):
        """Frame ``xml`` between STX (0x02) and ETX (0x03) and encode it."""
        if self.debug:
            print(xml)
        return ('\u0002' + xml + '\u0003').encode()

    def recvall(self):
        """Read from the socket until a short (or empty) chunk arrives.

        NOTE(review): an answer whose length is an exact multiple of the
        buffer size makes this wait for another chunk.
        """
        BUFF_SIZE = 512
        chunks = []
        while True:
            part = self.serv.recv(BUFF_SIZE)
            chunks.append(part)
            if len(part) < BUFF_SIZE:
                # either 0 or end of data
                break
        return b''.join(chunks)

    def findSessionID(self, msg):
        """Store the session ID found in a response, if there is one.

        The session ID is used when generating command IDs.
        """
        match = re.search(r'Authentication\sid=[\'"]?([^\'" >]+)', msg)
        if match:
            self.sessionID = match.group(1)

    def findServerChallenge(self, msg):
        """Store the server challenge found in a response.

        The challenge is needed for message signatures.

        Raises:
            SystemExit: if the response carries no server challenge.
        """
        match = re.search(r'sc=[\'"]?([^\'" >]+)', msg)
        if match:
            self.serverChallenge = match.group(1)
        else:
            print('No server challenge found')
            sys.exit()

    def buildMessage(self, commandType, body):
        """Build, sign if needed, and send a request; return the answer text.

        Side effects: advances the command ID and updates ``sessionID`` when
        the answer contains one.
        """
        self.commandID = self.generateCommandID(self.sessionID)
        # NOTE(review): this literal has no placeholders for the four
        # arguments passed to format(); see the class docstring.
        startRequest = '\r\n'.format(self.commandID, self.source, self.target, commandType)
        endRequest = '\r\n'
        payload = startRequest + body
        sign = self.buildSign(payload) if self.isSignNeeded(commandType) else ""
        self.serv.send(self.wrapMessage(payload + sign + endRequest))
        answer = self.recvall().decode()
        if self.debug:
            print('\r\n#####BEGIN####\r\n' + answer + '\r\n####END####\r\n')
        self.findSessionID(answer)
        return answer

    def connect(self, *args):
        """Open the TLS connection and authenticate.

        Passing ``'pair'`` as an argument sends a PAIR request, prints the
        answer and exits. Otherwise VERIFY and CONNECT are sent and
        ``self.connection`` is set to True on success.

        Raises:
            OSError: if the socket cannot be connected.
            SystemExit: after pairing, when the user is not found, or when
                the CONNECT answer has no server challenge.
        """
        # ssl.wrap_socket() was removed in Python 3.12. It performed no
        # certificate or host name validation, so that behaviour is kept.
        # NOTE(review): the peer is therefore NOT authenticated by TLS.
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.check_hostname = False  # must be cleared before verify_mode
        context.verify_mode = ssl.CERT_NONE
        self.serv = context.wrap_socket(
            socket.socket(socket.AF_INET, socket.SOCK_STREAM))
        try:
            self.serv.connect((self.host, self.port))
        except OSError:
            # Do not leave a half-open socket behind on a failed attempt.
            self.serv.close()
            raise

        if 'pair' in args:
            pair = self.buildMessage('PAIR', ''.format(self.username, self.descr))
            print(pair)
            sys.exit()

        verify = self.buildMessage('VERIFY', ''.format(self.username))
        if re.search(r'Authentication\sid=[\'"]?([^\'" >]+)', verify):
            connect = self.buildMessage('CONNECT', ''.format(self.username, self.clientChallenge))
            self.findServerChallenge(connect)
            self.connection = True
        else:
            print('No user found')
            self.connection = False
            sys.exit()

    def status(self):
        """Request the IT4WIFI status and return it as a dict.

        Returns:
            A dict with the keys ``Name``, ``ID``, ``Status`` and
            ``Obstruct``, or an empty dict when nothing could be extracted
            from the answer.

        Raises:
            ImportError: if an answer has to be parsed but xmltodict is not
                installed.
        """
        statusDict = {}
        status = self.buildMessage('STATUS', '')
        # NOTE(review): an empty pattern always matches the empty string, so
        # the branch below never runs as written; see the class docstring.
        match = re.search(r'', status)
        if match and match.group(0):
            if xmltodict is None:
                raise ImportError('xmltodict is required to parse the status response')
            response = xmltodict.parse(match.group(0))['Response']
            device = response['Devices']['Device']
            statusDict['Name'] = response['Interface']['Settings']['Name']
            statusDict['ID'] = device['@id']
            statusDict['Status'] = device['Properties']['DoorStatus']
            statusDict['Obstruct'] = device['Properties']['Obstruct']
        if self.debug:
            print(statusDict)
        return statusDict

    def change(self, command):
        """Send a CHANGE request (open, close or stop the gates)."""
        self.buildMessage('CHANGE', '\n{}\n'.format(command))

    def check(self):
        """Send a CHECK request (ping that keeps the socket open)."""
        return self.buildMessage('CHECK', ''.format(self.sessionID, self.username))

    def event(self):
        """Read whatever is pending on the socket and print it."""
        print('EVENT\r\n\r\n')
        answer = self.recvall().decode()
        print(answer)

    def disconnect(self):
        """Close the socket, if any, and reset the command counters."""
        self.connection = False
        self.commandID = 0
        self.commandSequence = 1
        if self.serv is not None:
            self.serv.close()

    def __time_to_ping(self):
        """Count one tick; return True every PING_INTERVAL ticks."""
        self.counterPing += 1
        if self.counterPing >= self.PING_INTERVAL:
            self.counterPing = 0
            return True
        return False

    def __time_to_reconnect(self):
        """Count one tick; return True every RECONNECT_INTERVAL ticks."""
        self.counterReconnect += 1
        if self.counterReconnect >= self.RECONNECT_INTERVAL:
            self.counterReconnect = 0
            return True
        return False

    def ping(self):
        """Ping the IT4WIFI to keep the socket connection alive.

        Returns:
            True when the ping was answered. False when the ping failed
            (socket error, ``Error`` in the answer, or no usable message id);
            the connection is then closed so that :meth:`loop` reconnects.
        """
        try:
            answer = self.check()
        except OSError:
            # A dead socket must flip the state to "disconnected"; otherwise
            # loop() would keep pinging it forever.
            if self.debug:
                print('Check error. Disconnecting from gates...')
            self.disconnect()
            return False

        if re.search(r'Error', answer):
            if self.debug:
                print('Check error. Disconnecting from gates...')
            self.disconnect()
            return False

        match = re.search(r'id=[\'"]?([^\'" >]+)', answer)
        try:
            messageID = int(match.group(1)) if match else None
        except ValueError:
            messageID = None
        if messageID is None:
            # Empty or malformed answer (e.g. peer closed the connection).
            if self.debug:
                print('Check error. Disconnecting from gates...')
            self.disconnect()
            return False

        if messageID > self.reconnectMessageID:
            if self.debug:
                print('Reconnect. messageID: ' + match.group(1) + ' Disconnecting from gates...')
            self.disconnect()
        return True

    def loop(self):
        """Run one tick of the main loop.

        While disconnected, try to reconnect every RECONNECT_INTERVAL ticks;
        while connected, ping every PING_INTERVAL ticks.
        """
        if not self.connection:
            if self.__time_to_reconnect():
                if self.debug:
                    print('Connecting to gates....')
                self.commandID = 0
                self.commandSequence = 1
                self.connect()
        elif self.__time_to_ping():
            self.ping()