Now it looks just the way it should!
To be honest, I have no idea how to diagnose a zigbee network. The program just visualizes the data received from the binding. ¯\_(ツ)_/¯
I installed your script and it works great. Thank you.
I had trouble with Python graphviz. A virtual env must be set for installing and using.
# Python install
python -m venv My_PyEnv
source My_PyEnv/bin/activate
pip install graphviz
pip install dot
# Linux install for dot
sudo apt install graphviz
Use source My_PyEnv/bin/activate before calling the Python script.
I just modified the mode to ‘circo’. It magically places the router nodes on a main circle, with all the connexion crossing between them and the battery things outside.
The inside of the circle is quite crowded with the connection lines, including very low LQI, and reading and understanding is a bit difficult.
So I excluded low LQI lines. Full modified code on second post below.
I use the ‘exec Binding’ to manually and periodically start the code:
UID: exec:command:089d4e880b
label: Cmd_4_Zigbee_Map
thingTypeUID: exec:command
configuration:
interval: 0
autorun: false
command: sudo /home/openhabian/My_Scripts/Zigbee_Map.sh
timeout: 15
#!/usr/bin/env python3
#import json # Only for debugging purposes
import re
import subprocess
import graphviz
from datetime import datetime
import os
output_filename = 'zigbee_map'
output_dir = '/etc/openhab/html/Zigbee_Map/'
output_format = 'svg' #png, svg, dot
#output_format = 'png' #png, svg, dot
output_node_shape = 'box' # box, ellipse, circle, diamond, house, hexagon, octagon
output_line_shape = 'curved' # none, line, polyline, curved, ortho, spline
output_line_shape = 'true' # none, line, polyline, curved, ortho, spline
output_engine = 'neato' # dot, fdp, neato, circo
output_engine = 'circo' # dot, fdp, neato, circo
pwd = "xxxxxxx" # -------------------- Change before usage! ----------------
zigbee_data = [];
cmd_basic = ["openhab-cli","console", "-p", pwd]
cmd_nodes = cmd_basic + ["zigbee", "nodes"]
cmd_neighbor = cmd_basic + ["zigbee", "neighbors"]
cmd_list_things = cmd_basic + ["things", "list"]
cmd_show_thing = cmd_basic + ["things", "show"]
# Zigbee properties
NETWORK_ADDR_SOURCE='network'
NETWORK_ADDR='network address'
MODEL = 'model'
IEEE_ADDR = 'ieee address'
LOGICAL_TYPE='logical type'
STATE = 'state'
LQI = 'lqi'
DEVICE_TYPE = 'device type'
RELATIONSHIP = 'relationship'
RELATIONSHIP_PARENT = 'parent'
RELATIONSHIP_SIBLING = 'sibling'
# Thing properties
LABEL = 'label'
STATUS = 'status'
ZB_NETWORK_ADDR = 'zigbee_networkaddress'
def export(filename, content):
file = open(filename, 'w')
file.write(content)
file.close()
return
def parse_nodes(raw_data):
result_data =[];
columns = []
i = 0
for line in raw_data.splitlines():
if not line[0:8] == ' ':
line = line.strip() # remove leading/trailing white spaces
line = re.sub(r'\s\s+',r'\t', line) # Replace multiple whitespaces by a tab as separator
if line:
if i == 0:
columns = re.sub(NETWORK_ADDR_SOURCE, NETWORK_ADDR, line.strip().lower()).split('\t')
if len(columns)>3:
i = i + 1
else:
d = {} # dictionary to store file data (each line)
data = [item.strip() for item in line.split('\t')]
for index, elem in enumerate(data):
d[columns[index]] = data[index]
result_data.append(d) # append dictionary to list
return result_data
def parse_neighbor(raw_data):
result_data = []
columns = [];
i = 0
for line in raw_data.splitlines():
line = line.strip() # remove leading/trailing white spaces
line = re.sub(r'\s\s+',r'\t', line) # Replace multiple whitespaces by a tab as separator
if i == 0:
columns = line.lower().split('\t')
if len(columns)>3:
i = i + 1
else:
d = {} # dictionary to store file data (each line)
data = [item.strip() for item in line.split('\t')]
for index, elem in enumerate(data):
d[columns[index]] = data[index]
result_data.append(d) # append dictionary to list
return result_data
def get_nodes_raw_data():
try:
proc = subprocess.run(cmd_nodes, capture_output=True)
nodes_raw_data = subprocess.check_output(cmd_nodes,stderr=subprocess.STDOUT).decode('utf-8')
print("Nodes:\n" + nodes_raw_data)
except subprocess.CalledProcessError as e:
raise RuntimeError("command '{}' return with error (code {}): {}\nerror: {}".format(e.cmd, e.returncode, e.output, e.stderr))
return nodes_raw_data
def get_neighbor_raw_data(network_ID):
try:
neighbors_raw_data = subprocess.check_output(cmd_neighbor+[network_ID], stderr=subprocess.STDOUT).decode('utf-8')
#export('tmp/neighbor' + network_ID + '.txt',neighbors_raw_data)
except subprocess.CalledProcessError as e:
raise RuntimeError("command '{}' return with error (code {}): {}\nerror: {}".format(e.cmd, e.returncode, e.output, e.stderr))
return neighbors_raw_data
def get_thing_raw_data(uid):
try:
thing_raw_data = subprocess.check_output(cmd_show_thing + [uid], stderr=subprocess.STDOUT).decode('utf-8')
#export('tmp/things.txt',thing_raw_data)
except subprocess.CalledProcessError as e:
raise RuntimeError("command '{}' return with error (code {}): {}\nerror: {}".format(e.cmd, e.returncode, e.output, e.stderr))
return thing_raw_data
def get_thingUIDs_raw_data():
try:
thingUIDs_raw_data = subprocess.check_output(cmd_list_things, stderr=subprocess.STDOUT).decode('utf-8')
#export('tmp/things.txt',thingUIDs_raw_data)
except subprocess.CalledProcessError as e:
raise RuntimeError("command '{}' return with error (code {}): {}\nerror: {}".format(e.cmd, e.returncode, e.output, e.stderr))
return thingUIDs_raw_data
def get_thingUIDs(raw_data):
thingUIDs = []
for line in raw_data.splitlines():
if line.startswith('zigbee:coordinator') or line.startswith('zigbee:device'):
thingUIDs.append(line.partition(' ')[0])
return thingUIDs
def get_thing_information(thingUIDs):
things = {}
for thingUID in thingUIDs:
print('Trying to get thing data for device: ' + thingUID)
raw_data = get_thing_raw_data(thingUID)
thing = {} # dictionary to store properties
for line in raw_data.splitlines():
elements = line.partition(':')
key = elements[0].strip().lower()
value = elements[2].strip()
if (not key == '') and not key in thing:
thing[key] = value
if not ZB_NETWORK_ADDR in thing:
thing[ZB_NETWORK_ADDR] = '0' # Must be the coordinator
things[thing[ZB_NETWORK_ADDR]] = thing
return things
#============================================================================================
# Main
#============================================================================================
print('Retrieving and parsing all Zigbee nodes..')
zigbee_data = parse_nodes(get_nodes_raw_data())
print('Retrieving all Things..')
thingUIDs = get_thingUIDs(get_thingUIDs_raw_data())
things = get_thing_information(thingUIDs)
# for debugging purposes only:
#print(json.dumps(things, indent=4))
#for debugging purposes only:
#print(json.dumps(zigbee_data, indent=4))
dot = graphviz.Digraph(comment='Zigbee-Map', name='ZigBee-Map', engine=output_engine)
dot.format = output_format
for device in zigbee_data:
network_ID = device[NETWORK_ADDR]
caption=''
if (MODEL in device):
caption = device[MODEL] + "\n" + network_ID + "\n" + device[IEEE_ADDR] + "\n" + device[DEVICE_TYPE] + "\n" + things[network_ID][LABEL]
else:
caption = "\n" + network_ID + "\n" + device[IEEE_ADDR] + "\n" + things[network_ID][LABEL]
boxcolor = 'darkgreen' if things[network_ID][STATUS] == 'ONLINE' else 'orange' if device[STATE]=='ONLINE' else 'red'
dot.node(network_ID, caption, color=boxcolor)
if (LOGICAL_TYPE in device) and ((device[LOGICAL_TYPE].lower()=="router") or (device[LOGICAL_TYPE].lower()=="coordinator")):
print("Trying to find neighbors for Zigbee device with network Address: " + network_ID)
neighbors = parse_neighbor(get_neighbor_raw_data(network_ID))
print ('Found: {} neighbors'.format(len(neighbors)))
device['Neighbors'] = neighbors
for neighbor in neighbors:
if (NETWORK_ADDR in neighbor):
if neighbor[RELATIONSHIP].lower() == RELATIONSHIP_PARENT:
thickness = '3'
elif neighbor[RELATIONSHIP].lower() == RELATIONSHIP_SIBLING:
thickness = '2'
else:
thickness = '1'
if int(neighbor[LQI]) < 20:
LQIcolor='transparent'
elif int(neighbor[LQI]) < 60:
LQIcolor='orange'
elif int(neighbor[LQI]) < 100:
LQIcolor='blue'
else:
LQIcolor='green'
dot.node(neighbor[NETWORK_ADDR], shape=output_node_shape)
# 2025-12-01 Black to Blue
#dot.edge(network_ID, neighbor[NETWORK_ADDR], neighbor[LQI], color='red' if int(neighbor[LQI]) < 20 else 'blue', penwidth=thickness)
if int(neighbor[LQI]) >= 20:
dot.edge(network_ID, neighbor[NETWORK_ADDR], neighbor[LQI],color=LQIcolor , penwidth=thickness)
dot.attr(label=datetime.now().strftime('%Y-%m-%d %H:%M:%S'), root="0", splines=output_line_shape, overlap="scale")
dot.render(output_dir + output_filename)