ZigBee network map when using only the ZigBee Binding

Now it looks just the way it should!
To be honest, I have no idea how to diagnose a zigbee network. The program just visualizes the data received from the binding. ¯\_(ツ)_/¯

I installed your script and it works great. Thank you.

I had trouble with Python graphviz. A virtual env must be set for installing and using.

# Python install

python -m venv My_PyEnv

source My_PyEnv/bin/activate

pip install graphviz

pip install dot

# Linux install for dot

sudo apt install graphviz

Use source My_PyEnv/bin/activate before calling the Python script.

I just modified the mode to ‘circo’. It magically places the router nodes on a main circle, with all the connexion crossing between them and the battery things outside.

The inside of the circle is quite crowded with the connection lines, including very low LQI, and reading and understanding is a bit difficult.

So I excluded low LQI lines. Full modified code on second post below.

I use the ‘exec Binding’ to manually and periodically start the code:

UID: exec:command:089d4e880b
label: Cmd_4_Zigbee_Map
thingTypeUID: exec:command
configuration:
  interval: 0
  autorun: false
  command: sudo /home/openhabian/My_Scripts/Zigbee_Map.sh
  timeout: 15
#!/usr/bin/env python3

#import json # Only for debugging purposes
import re
import subprocess
import graphviz
from datetime import datetime
import os

output_filename = 'zigbee_map'
output_dir = '/etc/openhab/html/Zigbee_Map/'
output_format = 'svg' #png, svg, dot
#output_format = 'png' #png, svg, dot
output_node_shape = 'box' # box, ellipse, circle, diamond, house, hexagon, octagon
output_line_shape = 'curved' # none, line, polyline, curved, ortho, spline
output_line_shape = 'true' # none, line, polyline, curved, ortho, spline

output_engine = 'neato' # dot, fdp, neato, circo
output_engine = 'circo' # dot, fdp, neato, circo

pwd = "xxxxxxx"   # -------------------- Change before usage! ----------------

zigbee_data = [];
cmd_basic = ["openhab-cli","console", "-p", pwd]
cmd_nodes = cmd_basic + ["zigbee", "nodes"]
cmd_neighbor = cmd_basic + ["zigbee", "neighbors"]
cmd_list_things = cmd_basic + ["things", "list"]
cmd_show_thing = cmd_basic + ["things", "show"]

# Zigbee properties
NETWORK_ADDR_SOURCE='network'
NETWORK_ADDR='network address'
MODEL = 'model'
IEEE_ADDR = 'ieee address'
LOGICAL_TYPE='logical type'
STATE = 'state'
LQI = 'lqi'
DEVICE_TYPE = 'device type'
RELATIONSHIP = 'relationship'
RELATIONSHIP_PARENT = 'parent'
RELATIONSHIP_SIBLING = 'sibling'

# Thing properties
LABEL = 'label'
STATUS = 'status'
ZB_NETWORK_ADDR = 'zigbee_networkaddress'

def export(filename, content):
    file = open(filename, 'w')
    file.write(content)
    file.close()
    return

def parse_nodes(raw_data):
    result_data =[];
    columns = []
    i = 0
    for line in raw_data.splitlines():
        if not line[0:8] == '        ':
            line = line.strip()                     # remove leading/trailing white spaces
            line = re.sub(r'\s\s+',r'\t', line)     # Replace multiple whitespaces by a tab as separator
            if line:
                if i == 0:
                    columns = re.sub(NETWORK_ADDR_SOURCE, NETWORK_ADDR, line.strip().lower()).split('\t')
                    if len(columns)>3:
                        i = i + 1
                else:
                    d = {}                          # dictionary to store file data (each line)
                    data = [item.strip() for item in line.split('\t')]
                    for index, elem in enumerate(data):
                        d[columns[index]] = data[index]
                    result_data.append(d)           # append dictionary to list
    return result_data

def parse_neighbor(raw_data):
    result_data = []
    columns = [];
    i = 0
    for line in raw_data.splitlines():
        line = line.strip()                     # remove leading/trailing white spaces
        line = re.sub(r'\s\s+',r'\t', line)     # Replace multiple whitespaces by a tab as separator
        if i == 0:
            columns = line.lower().split('\t')
            if len(columns)>3:
                i = i + 1
        else:
            d = {}                              # dictionary to store file data (each line)
            data = [item.strip() for item in line.split('\t')]
            for index, elem in enumerate(data):
                d[columns[index]] = data[index]
            result_data.append(d)               # append dictionary to list
    return result_data

def get_nodes_raw_data():
    try:
        proc = subprocess.run(cmd_nodes, capture_output=True)
        nodes_raw_data = subprocess.check_output(cmd_nodes,stderr=subprocess.STDOUT).decode('utf-8')
        print("Nodes:\n" + nodes_raw_data)
    except subprocess.CalledProcessError as e:
        raise RuntimeError("command '{}' return with error (code {}): {}\nerror: {}".format(e.cmd, e.returncode, e.output, e.stderr))
    return nodes_raw_data

def get_neighbor_raw_data(network_ID):
    try:
        neighbors_raw_data = subprocess.check_output(cmd_neighbor+[network_ID], stderr=subprocess.STDOUT).decode('utf-8')
        #export('tmp/neighbor' + network_ID + '.txt',neighbors_raw_data)
    except subprocess.CalledProcessError as e:
        raise RuntimeError("command '{}' return with error (code {}): {}\nerror: {}".format(e.cmd, e.returncode, e.output, e.stderr))
    return neighbors_raw_data

def get_thing_raw_data(uid):
    try:
        thing_raw_data = subprocess.check_output(cmd_show_thing + [uid], stderr=subprocess.STDOUT).decode('utf-8')
        #export('tmp/things.txt',thing_raw_data)
    except subprocess.CalledProcessError as e:
        raise RuntimeError("command '{}' return with error (code {}): {}\nerror: {}".format(e.cmd, e.returncode, e.output, e.stderr))
    return thing_raw_data

def get_thingUIDs_raw_data():
    try:
        thingUIDs_raw_data = subprocess.check_output(cmd_list_things, stderr=subprocess.STDOUT).decode('utf-8')
        #export('tmp/things.txt',thingUIDs_raw_data)
    except subprocess.CalledProcessError as e:
        raise RuntimeError("command '{}' return with error (code {}): {}\nerror: {}".format(e.cmd, e.returncode, e.output, e.stderr))
    return thingUIDs_raw_data

def get_thingUIDs(raw_data):
    thingUIDs = []
    for line in raw_data.splitlines():
        if line.startswith('zigbee:coordinator') or line.startswith('zigbee:device'):
            thingUIDs.append(line.partition(' ')[0])
    return thingUIDs

def get_thing_information(thingUIDs):
    things = {}
    for thingUID in thingUIDs:
        print('Trying to get thing data for device: ' + thingUID)
        raw_data = get_thing_raw_data(thingUID)
        thing = {} # dictionary to store properties
        for line in raw_data.splitlines():
            elements = line.partition(':')
            key = elements[0].strip().lower()
            value = elements[2].strip()
            if (not key == '') and not key in thing:
                thing[key] = value
        if not ZB_NETWORK_ADDR in thing:
            thing[ZB_NETWORK_ADDR] = '0'    # Must be the coordinator
        things[thing[ZB_NETWORK_ADDR]] = thing
    return things

#============================================================================================
#                   Main
#============================================================================================
print('Retrieving and parsing all Zigbee nodes..')
zigbee_data = parse_nodes(get_nodes_raw_data())
print('Retrieving all Things..')
thingUIDs = get_thingUIDs(get_thingUIDs_raw_data())
things = get_thing_information(thingUIDs)
# for debugging purposes only:
#print(json.dumps(things, indent=4))

#for debugging purposes only:
#print(json.dumps(zigbee_data, indent=4))

dot = graphviz.Digraph(comment='Zigbee-Map', name='ZigBee-Map', engine=output_engine)
dot.format = output_format

for device in zigbee_data:
    network_ID = device[NETWORK_ADDR]
    caption=''
    if (MODEL in device):
        caption = device[MODEL] + "\n" + network_ID + "\n" + device[IEEE_ADDR] + "\n" + device[DEVICE_TYPE] + "\n" + things[network_ID][LABEL]
    else:
        caption = "\n" + network_ID + "\n" + device[IEEE_ADDR] + "\n" + things[network_ID][LABEL]
    boxcolor = 'darkgreen' if things[network_ID][STATUS] == 'ONLINE' else 'orange' if device[STATE]=='ONLINE' else 'red'
    dot.node(network_ID, caption, color=boxcolor)
    if (LOGICAL_TYPE in device) and ((device[LOGICAL_TYPE].lower()=="router") or (device[LOGICAL_TYPE].lower()=="coordinator")):
        print("Trying to find neighbors for Zigbee device with network Address: " + network_ID)
        neighbors = parse_neighbor(get_neighbor_raw_data(network_ID))
        print ('Found: {} neighbors'.format(len(neighbors)))
        device['Neighbors'] = neighbors

        for neighbor in neighbors:
            if (NETWORK_ADDR in neighbor):
                if neighbor[RELATIONSHIP].lower() == RELATIONSHIP_PARENT:
                    thickness = '3'
                elif neighbor[RELATIONSHIP].lower() == RELATIONSHIP_SIBLING:
                    thickness = '2'
                else:
                    thickness = '1'

                if int(neighbor[LQI]) < 20:
                    LQIcolor='transparent'
                elif  int(neighbor[LQI]) < 60:
                    LQIcolor='orange'
                elif  int(neighbor[LQI]) < 100:
                    LQIcolor='blue'
                else:
                    LQIcolor='green'

                dot.node(neighbor[NETWORK_ADDR], shape=output_node_shape)

                # 2025-12-01 Black to Blue
                #dot.edge(network_ID, neighbor[NETWORK_ADDR], neighbor[LQI], color='red' if int(neighbor[LQI]) < 20 else 'blue', penwidth=thickness)
                if int(neighbor[LQI]) >= 20:
                    dot.edge(network_ID, neighbor[NETWORK_ADDR], neighbor[LQI],color=LQIcolor , penwidth=thickness)

dot.attr(label=datetime.now().strftime('%Y-%m-%d %H:%M:%S'), root="0", splines=output_line_shape, overlap="scale")
dot.render(output_dir + output_filename)