Hi Folks,
based on the scripts and ideas of this thread, I’ve created a completely new script that is not affected by the problems described here (like update cycle, etc.). My approach also gets rid of the problem with some coordinators that are not showing the neighbor data.
My new script does not use the REST Api, but instead uses the built-in zigbee commands in the openhab console. The script is written in python and only needs standard plugins that should be available everywhere.
Cons: As the script accesses the console several times for 1 complete run, it takes some seconds to complete, depending on the number of nodes in the network. In my case it takes about 20s.
This the output, that comes out on my zigbee network (Engine “neato”):

Engine “dot”:

Here is the script source:
#!/usr/bin/env python3
#import json # Only for debugging purposes
import re
import subprocess
import graphviz
from datetime import datetime
import os
output_filename = 'zigbee_map'
output_dir = '/etc/openhab/html/'
output_format = 'svg' #png, svg, dot
output_node_shape = 'box' # box, ellipse, circle, diamond, house, hexagon, octagon
output_line_shape = 'curved' # none, line, polyline, curved, ortho, spline
output_engine = 'neato' # dot, fdp, neato, circo
pwd = "Enter your console password here" # -------------------- Change before usage! ----------------
zigbee_data = [];
cmd_basic = ["openhab-cli","console", "-p", pwd]
cmd_nodes = cmd_basic + ["zigbee", "nodes"]
cmd_neighbor = cmd_basic + ["zigbee", "neighbors"]
cmd_list_things = cmd_basic + ["things", "list"]
cmd_show_thing = cmd_basic + ["things", "show"]
# Zigbee properties
NETWORK_ADDR_SOURCE='network'
NETWORK_ADDR='network address'
MODEL = 'model'
IEEE_ADDR = 'ieee address'
LOGICAL_TYPE='logical type'
STATE = 'state'
LQI = 'lqi'
DEVICE_TYPE = 'device type'
RELATIONSHIP = 'relationship'
RELATIONSHIP_PARENT = 'parent'
RELATIONSHIP_SIBLING = 'sibling'
# Thing properties
LABEL = 'label'
STATUS = 'status'
ZB_NETWORK_ADDR = 'zigbee_networkaddress'
def export(filename, content):
file = open(filename, 'w')
file.write(content)
file.close()
return
def parse_nodes(raw_data):
result_data =[];
columns = []
i = 0
for line in raw_data.splitlines():
if not line[0:8] == ' ':
line = line.strip() # remove leading/trailing white spaces
line = re.sub(r'\s\s+',r'\t', line) # Replace multiple whitespaces by a tab as separator
if line:
if i == 0:
columns = re.sub(NETWORK_ADDR_SOURCE, NETWORK_ADDR, line.strip().lower()).split('\t')
if len(columns)>3:
i = i + 1
else:
d = {} # dictionary to store file data (each line)
data = [item.strip() for item in line.split('\t')]
for index, elem in enumerate(data):
d[columns[index]] = data[index]
result_data.append(d) # append dictionary to list
return result_data
def parse_neighbor(raw_data):
result_data = []
columns = [];
i = 0
for line in raw_data.splitlines():
line = line.strip() # remove leading/trailing white spaces
line = re.sub(r'\s\s+',r'\t', line) # Replace multiple whitespaces by a tab as separator
if i == 0:
columns = line.lower().split('\t')
if len(columns)>3:
i = i + 1
else:
d = {} # dictionary to store file data (each line)
data = [item.strip() for item in line.split('\t')]
for index, elem in enumerate(data):
d[columns[index]] = data[index]
result_data.append(d) # append dictionary to list
return result_data
def get_nodes_raw_data():
try:
proc = subprocess.run(cmd_nodes, capture_output=True)
nodes_raw_data = subprocess.check_output(cmd_nodes,stderr=subprocess.STDOUT).decode('utf-8')
print("Nodes:\n" + nodes_raw_data)
except subprocess.CalledProcessError as e:
raise RuntimeError("command '{}' return with error (code {}): {}\nerror: {}".format(e.cmd, e.returncode, e.output, e.stderr))
return nodes_raw_data
def get_neighbor_raw_data(network_ID):
try:
neighbors_raw_data = subprocess.check_output(cmd_neighbor+[network_ID], stderr=subprocess.STDOUT).decode('utf-8')
#export('tmp/neighbor' + network_ID + '.txt',neighbors_raw_data)
except subprocess.CalledProcessError as e:
raise RuntimeError("command '{}' return with error (code {}): {}\nerror: {}".format(e.cmd, e.returncode, e.output, e.stderr))
return neighbors_raw_data
def get_thing_raw_data(uid):
try:
thing_raw_data = subprocess.check_output(cmd_show_thing + [uid], stderr=subprocess.STDOUT).decode('utf-8')
#export('tmp/things.txt',thing_raw_data)
except subprocess.CalledProcessError as e:
raise RuntimeError("command '{}' return with error (code {}): {}\nerror: {}".format(e.cmd, e.returncode, e.output, e.stderr))
return thing_raw_data
def get_thingUIDs_raw_data():
try:
thingUIDs_raw_data = subprocess.check_output(cmd_list_things, stderr=subprocess.STDOUT).decode('utf-8')
#export('tmp/things.txt',thingUIDs_raw_data)
except subprocess.CalledProcessError as e:
raise RuntimeError("command '{}' return with error (code {}): {}\nerror: {}".format(e.cmd, e.returncode, e.output, e.stderr))
return thingUIDs_raw_data
def get_thingUIDs(raw_data):
thingUIDs = []
for line in raw_data.splitlines():
if line.startswith('zigbee:coordinator') or line.startswith('zigbee:device'):
thingUIDs.append(line.partition(' ')[0])
return thingUIDs
def get_thing_information(thingUIDs):
things = {}
for thingUID in thingUIDs:
print('Trying to get thing data for device: ' + thingUID)
raw_data = get_thing_raw_data(thingUID)
thing = {} # dictionary to store properties
for line in raw_data.splitlines():
elements = line.partition(':')
key = elements[0].strip().lower()
value = elements[2].strip()
if (not key == '') and not key in thing:
thing[key] = value
if not ZB_NETWORK_ADDR in thing:
thing[ZB_NETWORK_ADDR] = '0' # Must be the coordinator
things[thing[ZB_NETWORK_ADDR]] = thing
return things
#============================================================================================
# Main
#============================================================================================
print('Retrieving and parsing all Zigbee nodes..')
zigbee_data = parse_nodes(get_nodes_raw_data())
print('Retrieving all Things..')
thingUIDs = get_thingUIDs(get_thingUIDs_raw_data())
things = get_thing_information(thingUIDs)
# for debugging purposes only:
#print(json.dumps(things, indent=4))
#for debugging purposes only:
#print(json.dumps(zigbee_data, indent=4))
dot = graphviz.Digraph(comment='Zigbee-Map', name='ZigBee-Map', engine=output_engine)
dot.format = output_format
for device in zigbee_data:
network_ID = device[NETWORK_ADDR]
caption=''
if (MODEL in device):
caption = device[MODEL] + "\n" + network_ID + "\n" + device[IEEE_ADDR] + "\n" + device[DEVICE_TYPE] + "\n" + things[network_ID][LABEL]
else:
caption = "\n" + network_ID + "\n" + device[IEEE_ADDR] + "\n" + things[network_ID][LABEL]
boxcolor = 'darkgreen' if things[network_ID][STATUS] == 'ONLINE' else 'orange' if device[STATE]=='ONLINE' else 'red'
dot.node(network_ID, caption, color=boxcolor)
if (LOGICAL_TYPE in device) and ((device[LOGICAL_TYPE].lower()=="router") or (device[LOGICAL_TYPE].lower()=="coordinator")):
print("Trying to find neighbors for Zigbee device with network Address: " + network_ID)
neighbors = parse_neighbor(get_neighbor_raw_data(network_ID))
print ('Found: {} neighbors'.format(len(neighbors)))
device['Neighbors'] = neighbors
for neighbor in neighbors:
if (NETWORK_ADDR in neighbor):
if neighbor[RELATIONSHIP].lower() == RELATIONSHIP_PARENT:
thickness = '2'
elif neighbor[RELATIONSHIP].lower() == RELATIONSHIP_SIBLING:
thickness = '1'
else:
thickness = '0.5'
dot.node(neighbor[NETWORK_ADDR], shape=output_node_shape)
dot.edge(network_ID, neighbor[NETWORK_ADDR], neighbor[LQI], color='red' if int(neighbor[LQI]) < 20 else 'black', penwidth=thickness)
dot.attr(label=datetime.now().strftime('%Y-%m-%d %H:%M:%S'), root="0", splines=output_line_shape, overlap="scale")
dot.render(output_dir + output_filename)
I schedule the script via the executer addon in openhab.
Feedback would be nice, maybe we can improve it even more.
Have fun!