Hey
I never had the drive to write a binding for it, but I do have a Python script using MQTT that pulls the name, date, door and type of event, and it has been running for a year already.
Just make sure you generate a token, and change the MQTT settings and the log folder location in this script.
import paho.mqtt.client as mqtt
import logging
import signal
import sys
import json
import websocket
import ssl
import time
from logging.handlers import TimedRotatingFileHandler
from datetime import datetime
# File logging for the bridge: one log file, rotated every 5 days,
# keeping at most 5 rotated backups.
log_file = '/etc/openhab/misc/unifi/unifi_api.log'
formatter = logging.Formatter(
    fmt='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
handler = TimedRotatingFileHandler(
    filename=log_file,
    when="D",
    interval=5,
    backupCount=5,
)
handler.setFormatter(formatter)
handler.setLevel(logging.INFO)

# Every function in this script logs through this named logger.
logger = logging.getLogger("system_logs")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
# MQTT settings
BROKER = "127.0.0.1"  # IP address of the MQTT broker
USERNAME = "openhabian"  # broker user name
PASSWORD = "mysecretpassword"  # broker password
CLIENT_ID = "system_logs_api"  # client id handed to mqtt.Client()
# WebSocket settings
# URL the script connects to for device notifications.
WS_HOST = "wss://192.168.1.1:12445/api/v1/developer/devices/notifications"
# Place the token generated in the UniFi Access interface here; it is sent
# as an "Authorization: Bearer ..." header when the WebSocket is opened.
TOKEN = "yourtoken"
# Create an MQTT client
logger.info("Initializing MQTT client.")
# paho-mqtt >= 2.0 expects the callback API version as the first positional
# argument, so ``mqtt.Client(CLIENT_ID)`` fails there.  VERSION1 is requested
# because on_connect uses the (client, userdata, flags, rc) signature.
if hasattr(mqtt, "CallbackAPIVersion"):
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, client_id=CLIENT_ID)
else:
    # paho-mqtt 1.x: no callback API version argument exists.
    client = mqtt.Client(client_id=CLIENT_ID)
client.username_pw_set(USERNAME, PASSWORD)
def on_connect(client, userdata, flags, rc):
    """Log the outcome of an MQTT connection attempt.

    A result code ``rc`` of 0 is logged as success; any other value is
    logged as an error together with the code.
    """
    if rc != 0:
        logger.error(f"Failed to connect to MQTT broker, Code: {rc}")
        return
    logger.info("Connected to MQTT broker.")
# Register the connect callback before connecting so the result gets logged.
client.on_connect = on_connect
# Connect to MQTT broker and start loop
# NOTE(review): connect() is not guarded, so an unreachable broker at start-up
# presumably raises here and ends the script - confirm this is intended.
client.connect(BROKER)
client.loop_start()
logger.info("MQTT client loop started.")
from datetime import datetime
from datetime import datetime, timezone
def extract_door_unlock_event(logs):
    """Extract door-unlock details from one decoded notification.

    Args:
        logs: The decoded notification. Only a dict containing
            ``logs['data']['_source']['event']`` is processed; any other
            input (including the ``"Hello"`` greeting string) yields no
            events.

    Returns:
        A list that is empty, or holds one dict with the keys
        ``Timestamp`` (local time as ``YYYY-MM-DD HH:MM:SS``),
        ``Display_Name``, ``Result`` and ``Door``. Fields missing from
        the notification are reported as ``'N/A'``. A list is returned on
        every path, so callers can always iterate the result.
    """
    door_unlock_events = []
    if not isinstance(logs, dict):
        return door_unlock_events

    try:
        source = logs['data']['_source']
        event = source['event']
    except (KeyError, TypeError):
        # Not shaped like an access-log notification; nothing to extract.
        return door_unlock_events

    try:
        if event.get('type', '') != 'access.door.unlock':
            logger.info("Event type is not access.door.unlock.")
            return door_unlock_events

        # 'published' is treated as milliseconds since the epoch, in UTC.
        timestamp_ms = event.get('published', 0)
        if timestamp_ms:
            # Build a timezone-aware UTC datetime directly (utcfromtimestamp
            # is deprecated), then convert it to the machine's local zone.
            timestamp_utc = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
            timestamp_local = timestamp_utc.astimezone().strftime('%Y-%m-%d %H:%M:%S')
        else:
            timestamp_local = 'N/A'

        # A missing actor/target falls back to 'N/A' instead of discarding
        # the whole event through the error handler below.
        actor = source.get('actor') or {}
        targets = source.get('target') or []
        door_name = next(
            (t.get('display_name', 'N/A') for t in targets if t.get('type') == 'door'),
            'N/A',
        )

        door_unlock_events.append({
            'Timestamp': timestamp_local,
            'Display_Name': actor.get('display_name', 'N/A'),
            'Result': event.get('result', 'N/A'),
            'Door': door_name,
        })
    except (AttributeError, TypeError, ValueError, OverflowError, OSError) as e:
        # Malformed field types or an out-of-range timestamp.
        logger.error(f"Error extracting door unlock event: {e}")
    return door_unlock_events
def publish_with_retry(topic, payload, retries=3):
    """Publish ``payload`` to ``topic``, making up to ``retries`` attempts.

    Returns:
        True as soon as one publish call reports ``MQTT_ERR_SUCCESS``;
        False when every attempt failed. Each failed attempt is logged.
    """
    for attempt_no in range(1, retries + 1):
        outcome = client.publish(topic, payload)
        if outcome.rc == mqtt.MQTT_ERR_SUCCESS:
            return True
        logger.error(f"Failed to publish to {topic}, attempt {attempt_no}/{retries}, result code: {outcome.rc}")
    return False
def publish_all_with_retry(event, retries=3):
    """Publish the four fields of a door-unlock event, retrying failures.

    Every field is published once. Then, for up to ``retries`` rounds, the
    publishes whose result code is not ``MQTT_ERR_SUCCESS`` are logged and
    sent again. Anything still failing after the last round is logged once
    more. Nothing is returned.

    Args:
        event: Dict with the keys ``Timestamp``, ``Display_Name``, ``Door``
            and ``Result``.
        retries: Number of retry rounds after the initial publish.
    """
    messages = [
        ("openhab/door_unlock/Timestamp", event['Timestamp']),
        ("openhab/door_unlock/Display_Name", event['Display_Name']),
        ("openhab/door_unlock/Door", event['Door']),
        ("openhab/door_unlock/Result", event['Result']),
    ]

    def send(batch):
        # Publish each (topic, payload) pair, keeping the result next to it.
        return [(client.publish(topic, payload), topic, payload) for topic, payload in batch]

    outcomes = send(messages)

    for attempt_no in range(1, retries + 1):
        to_resend = []
        for info, topic, payload in outcomes:
            if info.rc == mqtt.MQTT_ERR_SUCCESS:
                continue
            logger.error(f"Failed to publish to {topic}, attempt {attempt_no}/{retries}, result code: {info.rc}")
            to_resend.append((topic, payload))
        if not to_resend:
            break
        # Only the messages that failed are sent again.
        outcomes = send(to_resend)

    # Report whatever is still failing once the retry rounds are used up.
    for info, topic, _payload in outcomes:
        if info.rc != mqtt.MQTT_ERR_SUCCESS:
            logger.error(f"Failed to publish to {topic} after {retries} retries.")
def on_message(ws, message):
    """Handle one WebSocket message and publish any door-unlock events.

    The ``"Hello"`` greeting, messages that are not valid JSON, JSON values
    that are not objects, and events other than ``access.logs.add`` are
    ignored without logging. Any other failure is logged as an error.
    """
    if message == '"Hello"':
        return

    try:
        parsed = json.loads(message)
        # Anything that is not an "access.logs.add" object is ignored.
        if not isinstance(parsed, dict) or parsed.get('event') != 'access.logs.add':
            return

        logger.info(f"Processing access.logs.add event: {message}")
        unlock_events = extract_door_unlock_event(parsed)
        if not unlock_events:
            logger.info("No door unlock event found.")
            return

        for unlock in unlock_events:
            # All fields of the event are published together, with retries.
            publish_all_with_retry(unlock)
            logger.info(f"Published door unlock event: {unlock}")
    except json.JSONDecodeError:
        # Non-JSON frames are dropped silently.
        return
    except Exception as e:
        logger.error(f"Error processing message: {e}")
def on_error(ws, error):
    """Log an error reported for the WebSocket connection."""
    report = f"WebSocket error: {error}"
    logger.error(report)
def on_close(ws, close_status_code, close_msg):
    """Log that the WebSocket closed, with its status code and message."""
    report = f"WebSocket closed: {close_status_code}, {close_msg}"
    logger.info(report)
def on_open(ws):
    """Log that the WebSocket connection has been opened."""
    report = "WebSocket connection opened."
    logger.info(report)
import time
def start_websocket(trace=False, reconnect_delay=5):
    """Listen on the notification WebSocket, reconnecting indefinitely.

    Opens ``WS_HOST`` with ``TOKEN`` as a Bearer token and dispatches to
    ``on_open`` / ``on_message`` / ``on_error`` / ``on_close``. Whenever
    ``run_forever`` returns or raises, the function waits and connects
    again, so it only ends when the process exits.

    Args:
        trace: Enable websocket-client's wire-level tracing. Off by
            default because the trace output includes the handshake
            request headers, and therefore the Authorization token.
        reconnect_delay: Seconds to wait before each reconnection attempt.
    """
    headers = {"Authorization": f"Bearer {TOKEN}"}
    websocket.enableTrace(bool(trace))

    while True:  # Keep trying to reconnect in case of disconnection
        try:
            ws = websocket.WebSocketApp(
                WS_HOST,
                header=headers,
                on_open=on_open,
                on_message=on_message,
                on_error=on_error,
                on_close=on_close,
            )
            # NOTE(review): certificate verification is disabled, so the
            # server's identity is not checked. Presumably the console uses
            # a certificate that would not validate - confirm this is
            # acceptable on your network.
            ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
        except Exception as e:
            logger.error(f"Error while establishing WebSocket connection: {e}")
        logger.info(f"WebSocket connection lost. Attempting to reconnect in {reconnect_delay} seconds...")
        time.sleep(reconnect_delay)
# Signal handling for graceful exit
def handle_exit(sig, frame):
    """Signal handler: shut the MQTT client down and exit with status 0.

    Logs the exit, disconnects from the broker, stops the MQTT loop and
    then terminates the process.
    """
    logger.info("Exiting...")
    client.disconnect()
    client.loop_stop()
    # Same effect as sys.exit(0), which raises SystemExit.
    raise SystemExit(0)
# Register signal handlers for clean exit.  SIGINT covers Ctrl+C; SIGTERM is
# what systemd sends by default when the service is stopped, so without it
# the MQTT client would never be disconnected cleanly under the unit file.
signal.signal(signal.SIGINT, handle_exit)
signal.signal(signal.SIGTERM, handle_exit)
# Start WebSocket and listen for events (without manual loop); this call
# blocks and reconnects on its own.
start_websocket()
To make a service that starts automatically, just create a systemd unit file:
sudo nano /etc/systemd/system/unifi_log.service
[Unit]
Description=Unifi log MQTT Service
After=network.target
[Service]
ExecStart=/usr/bin/python3 /etc/openhab/misc/unifi/unifi_api.py
Restart=on-failure
User=openhab
Group=openhab
[Install]
WantedBy=multi-user.target
Then, for openHAB, adapt this for the MQTT binding:
Bridge mqtt:broker:system_logs_api "System Logs broker" [ host="127.0.0.1", secure=false, username="openhabian", password="mysecretpassword", clientID="openhab_system_logs" ] {
Thing topic system_logs "System Logs topic" {
Channels:
Type string : timestamp "Timestamp" [ stateTopic="openhab/door_unlock/Timestamp" ]
Type string : name "Name" [ stateTopic="openhab/door_unlock/Display_Name" ]
Type string : event "Door" [ stateTopic="openhab/door_unlock/Door" ]
Type string : result "Result" [ stateTopic="openhab/door_unlock/Result" ]
}
}
//unifi_logs
String UserActivity_Timestamp "Timestamp [%s]" {channel="mqtt:topic:system_logs_api:system_logs:timestamp"}
String UserActivity_Name "Name [%s]" {channel="mqtt:topic:system_logs_api:system_logs:name"}
String UserActivity_Event "Event [%s]" {channel="mqtt:topic:system_logs_api:system_logs:event"}
String UserActivity_Result "Result [%s]" {channel="mqtt:topic:system_logs_api:system_logs:result"}