Unexpected exception while processing MQTT message

  • Platform information:
    • Hardware: RBP4
    • OS: OH
    • Java Runtime Environment: Java 8
    • openHAB version: 2.5.9

For reading out my smartmeter I use a python script via a rule which gets called every 10 seconds.

Unfortunately this gets broken very often and I get a whole bunch of the following mqtt exceptions:

2020-10-26 14:33:04.550 [ERROR] [.moquette.broker.NewNettyMQTTHandler] - Unexpected exception while processing MQTT message. Closing Netty channel. CId=SmartMeterHousePythonClient
java.lang.NullPointerException: null
	at io.moquette.broker.MQTTConnection.handleConnectionLost(MQTTConnection.java:255) ~[?:?]
	at io.moquette.broker.NewNettyMQTTHandler.channelInactive(NewNettyMQTTHandler.java:82) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257) [bundleFile:4.1.42.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243) [bundleFile:4.1.42.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236) [bundleFile:4.1.42.Final]
	at io.moquette.broker.metrics.MQTTMessageLogger.channelInactive(MQTTMessageLogger.java:109) [bundleFile:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257) [bundleFile:4.1.42.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243) [bundleFile:4.1.42.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236) [bundleFile:4.1.42.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:393) [bundleFile:4.1.42.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:358) [bundleFile:4.1.42.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257) [bundleFile:4.1.42.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243) [bundleFile:4.1.42.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236) [bundleFile:4.1.42.Final]
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) [bundleFile:4.1.42.Final]
	at io.moquette.broker.AutoFlushHandler.channelInactive(AutoFlushHandler.java:90) [bundleFile:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257) [bundleFile:4.1.42.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243) [bundleFile:4.1.42.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236) [bundleFile:4.1.42.Final]
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) [bundleFile:4.1.42.Final]
	at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:278) [bundleFile:4.1.42.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257) [bundleFile:4.1.42.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243) [bundleFile:4.1.42.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236) [bundleFile:4.1.42.Final]
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) [bundleFile:4.1.42.Final]
	at io.moquette.broker.InflightResender.channelInactive(InflightResender.java:123) [bundleFile:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257) [bundleFile:4.1.42.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243) [bundleFile:4.1.42.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236) [bundleFile:4.1.42.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1417) [bundleFile:4.1.42.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257) [bundleFile:4.1.42.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243) [bundleFile:4.1.42.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:913) [bundleFile:4.1.42.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:819) [bundleFile:4.1.42.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) [bundleFile:4.1.42.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:510) [bundleFile:4.1.42.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:518) [bundleFile:4.1.42.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044) [bundleFile:4.1.42.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [bundleFile:4.1.42.Final]
	at io.netty.util.concurren

My python script is the following:

#!/usr/bin/env python
# encoding: utf-8
"""
Copyright by Alex Mack 2020
"""

import sys
import os
import datetime
import serial                       #needs an 'sudo -H pip install pyserial' if used via exec binding
import time
import re
import paho.mqtt.client as paho     #needs an 'sudo -H pip install paho-mqtt' if used via exec binding


#openhab remove commands
#sudo apt-get purge openhab2*
#sudo rm /etc/apt/sources.list.d/openhab2.list


# SML regexes
complete_message_regex = '1b1b1b1b01010101.*1b1b1b1b.{8}'
meter_108_regex = '77070100010800ff.{30}(.{8})01'
meter_208_regex = '77070100020800ff.{30}(.{8})01'
meter_167_regex = '77070100100700ff.{14}(.{4})01'

#MQTT Broker setup

brokerIP="192.168.1.27"
brokerPort=1883
brokerUser="Psst"
brokerPwd="notTellingThatOne"

#Connected Serial Device Address
serialDevice="/dev/ttyUSB1"

#Refreshing period in seconds
refreshSeconds=1
consumptionPublishedId=0
supplyPublishedId=0
currentConsumptionPublishedId=0

# Wait for mqtt client to close properly
MQTTClientConnectionClosed=0

def on_publish(client, userdata, result):
    if result == 3:
        #print "closing"
        disconnectMQTTClient(client)

def disconnectMQTTClient(client):
    client.loop_stop()
    client.disconnect()
    MQTTClientConnectionClosed=1

def on_disconnect():
    print "disconnected"

def setupMQTTBrokerClient():
    client = paho.Client("SmartMeterHousePythonClient")
    client.username_pw_set(brokerUser, brokerPwd)
    client.connect(brokerIP, brokerPort)

    client.on_publish = on_publish
    client.on_disconnect = on_disconnect
    client.loop_start()
    return client

def main():
    ser = serial.Serial(serialDevice, baudrate=9600, bytesize=8, parity='N', stopbits=1, timeout=0 )
    tty_error = 0
    searching = 1
    numberOfTrials = 0;

    client1 = setupMQTTBrokerClient()

    if tty_error == 0:
        while searching > 0 and numberOfTrials <= 100:
            numberOfTrials += 1
            data = ser.read(4096)
            data = data.encode('hex').lower()

            if re.match(complete_message_regex, data):

                m108 = re.search(meter_108_regex, data)
                m208 = re.search(meter_208_regex, data)
                m167 = re.search(meter_167_regex, data)

                if m208:
                    d208 = abs(int(m208.group(1),16)/10000.0)
                    if d208 is not None:
                        ret = client1.publish("smartmeters/house/totalSupply",d208)
                        supplyPublishedId=ret[1]
                        print "Gesamt-Einspeisung:{:.2f} KWh".format(d208)

                if m108:
                    d108 = abs(int(m108.group(1),16)/10000.0)
                    if d108 is not None:
                        ret = client1.publish("smartmeters/house/totalConsumption",d108)
                        consumptionPublishedId=ret[1]
                        print "Gesamtverbrauch: {:.3f}".format(d108)

                if m167:
                    d167 = abs(int(m167.group(1),16)/10.0)
                    if d167 is not None:
                        ret = client1.publish("smartmeters/house/currentConsumptionTotal",d167)
                        currentConsumptionPublishedId=ret[1]
                        print "Aktueller Gesamtverbrauch Haus: {:.3f}".format(d167)

                if m208 and m108 and m167:
                    searching = 0;

                #Current Usage calculated by openhab persistence
                #if m167:
                #    d167 = int(m167.group(1),16)
                #    print "Aktueller Verbrauch: {:.2f}".format(d167)

            time.sleep(refreshSeconds)

        #At this point we should have sent out all messages or aborted due to to much tries
        time.sleep(refreshSeconds)
        while MQTTClientConnectionClosed != 1:
            print "Closing connection manually"
            disconnectMQTTClient(client1)
            time.sleep(refreshSeconds)

if __name__ == '__main__':
    main()

I thought that this has something todo with not closing the mqtt client properly, that´s why I recently built in the while with the definite closing but it didn´t help

I would really appreciate any help

The moquette broker bundled with OH2 has exhibited problems for some users, but is no longer maintained by its authors.
The usual advice if you do run into problems with it, is to set up some other broker instead (typically mosquitto).

Perfect, thank you very much - switching to the mosquitto solved the problem.