Wired opening sensor integration

I think the MQTT section for the sensor isn’t right.

My contact sensors look like this:

    Connections:
        MQTT:
            Switch:
                StateDest: garagedoor1/state

A number of changes needed to be made to the connections sections to support Homie MQTT. Each sensor has one or more outputs and you need to configure a StateDest (when using MQTT) separately for each output you want to use.

For a GPIO Sensor there are three outputs (Switch, ShortButtonPress, and LongButtonPress). You need to configure a state topic separately on each of the ones you want to use. I only use the Switch above, so I’ve configured that one and ignore the other two.

I don’t think it works to just define the MQTT Connections section like that any more.

This was the solution: Switch. Thank you! Now the state is published via MQTT every time I plug the pin in or out.

At the beginning I had problems with the syntax; I had never heard of indentation before. Now I know that even the order is important.

I will now try to follow your way through the i2c part, I will try my very best!

Just a short sign of life.
After 5 days I have worked through errors and more errors, 1 working pin, 16 pins but no action… most of it through ChatGPT, because of my lack of skills.

Now I have a piece of code that can handle 8 pins (the first bank of the PCF8575). This should also be suitable for the PCF8574… and looking back, I would rather buy those at this point :slight_smile:

I had some trouble talking to the PCF (I finally tried smbus) and understanding how to fit it into your code. This is what I have now:

from typing import Dict, Any
import smbus
from core.sensor import Sensor
from core import utils

# Connection dict constant
OUT_SWITCH = "Switch"

class PCF8575Pin(Sensor):
    """Publishes the current state of a configured PCF8575 pin."""

    def __init__(self,
                 publishers: Dict[str, 'connection.Connection'],
                 dev_cfg: Dict[str, Any]) -> None:
        """ Initializes the connection to the PCF8575 pin.

            Parameters:
                - "i2c_bus_no"    : The I2C bus number
                - "i2c_address"   : The I2C address of the PCF8575
                - "i2c_PIN"       : The pin number to read
        """
        super().__init__(publishers, dev_cfg)

        # Check if the required keys are present in dev_cfg
        if "i2c_bus_no" not in dev_cfg or "i2c_address" not in dev_cfg or "i2c_PIN" not in dev_cfg:
            raise ValueError("Required keys 'i2c_bus_no', 'i2c_address', and 'i2c_PIN' are missing in device configuration")

        # Initialize I2C connection
        self.i2c_bus_no = dev_cfg["i2c_bus_no"]
        self.i2c_address = dev_cfg["i2c_address"]
        self.bus = smbus.SMBus(self.i2c_bus_no)
        self.pin = dev_cfg["i2c_PIN"]

        self.state = self.read_pin_state()

        self.log.info("Configured PCF8575Pin %s: I2C bus %d, address 0x%x, pin %d",
                      self.name, self.i2c_bus_no, self.i2c_address, self.pin)

        self.publish_state()

        # Configure the output channel for OUT_SWITCH
        utils.configure_device_channel(self.comm, is_output=True, output_name=OUT_SWITCH,
                                       name="switch state")
        self._register(self.comm)

        # Start checking the pin state
        self.check_state()

    def read_pin_state(self) -> bool:
        """ Reads the state of the configured pin. """
        data = self.bus.read_byte(self.i2c_address)
        return bool((data >> self.pin) & 1)

    def check_state(self) -> None:
        """ Checks the current state of the pin. """
        self.state = self.read_pin_state()
        self.publish_state()

    def publish_state(self) -> None:
        """ Publishes the current state of the pin."""
        msg = "ON" if self.state else "OFF"
        self._send(msg, self.comm, OUT_SWITCH)

    def cleanup(self) -> None:
        """Clean up."""
        self.log.debug("Cleaning up PCF8575Pin %s", self.name)
        # No specific cleanup needed for I2C pins

Looks reasonable so far. The test will be in whether it works.

Be careful with ChatGPT. It might do better with generic code, but in my experience it takes as much work to fix the code as you would need to write it in the first place.

These are my first lines of code. I had never been in touch with Python before, so ChatGPT was the only way to make this real.

It is only a question of providing the needed information and asking the right question. So now, I have a working piece of code in /i2c/pcf8575.py:

from typing import Dict, Any
import smbus
from core.sensor import Sensor
from core import utils

# Connection dict constant
OUT_SWITCH = "Switch"

class PCF8575Pin(Sensor):
    """Publishes the current state of a configured PCF8575 pin."""

    def __init__(self,
                 publishers: Dict[str, 'connection.Connection'],
                 dev_cfg: Dict[str, Any]) -> None:
        """ Initializes the connection to the PCF8575 pin.

            Parameters:
                - "i2c_bus_no"    : The I2C bus number
                - "i2c_address"   : The I2C address of the PCF8575
                - "i2c_PIN"       : The pin number to read
        """
        super().__init__(publishers, dev_cfg)

        # Check if the required keys are present in dev_cfg
        if "i2c_bus_no" not in dev_cfg or "i2c_address" not in dev_cfg or "i2c_PIN" not in dev_cfg:
            raise ValueError("Required keys 'i2c_bus_no', 'i2c_address', and 'i2c_PIN' are missing in device configuration")

        # Initialize I2C connection
        self.i2c_bus_no = dev_cfg["i2c_bus_no"]
        self.i2c_address = dev_cfg["i2c_address"]
        self.bus = smbus.SMBus(self.i2c_bus_no)
        self.pin = dev_cfg["i2c_PIN"]

        self.state = self.read_pin_state()

        self.log.info("Configured PCF8575Pin %s: I2C bus %d, address 0x%x, pin %d",
                      self.name, self.i2c_bus_no, self.i2c_address, self.pin)

        self.publish_state()

        # Configure the output channel for OUT_SWITCH
        utils.configure_device_channel(self.comm, is_output=True, output_name=OUT_SWITCH,
                                       name="switch state")
        self._register(self.comm)

        # Start checking the pin state
        self.check_state()

    def read_pin_state(self) -> bool:
        """ Reads the state of the configured pin. """
        data = self.bus.read_i2c_block_data(self.i2c_address, 0, 2)
        combined_data = (data[1] << 8) | data[0]
        pin_state = (combined_data >> self.pin) & 1
        return bool(pin_state)

    def check_state(self) -> None:
        """ Checks the current state of the pin. """
        self.state = self.read_pin_state()
        self.publish_state()

    def publish_state(self) -> None:
        """ Publishes the current state of the pin."""
        msg = "ON" if self.state else "OFF"
        self._send(msg, self.comm, OUT_SWITCH)

    def cleanup(self) -> None:
        """Clean up."""
        self.log.debug("Cleaning up PCF8575Pin %s", self.name)
        # No specific cleanup needed for I2C pins


What this code does, I think:
A poll interval of 1 second is set (through the yml) and on each poll the program reads 2 bytes. Each of those 16 bits represents one of the pins 0-15, and the code picks out the bit for the configured pin. So for each sensor, both bytes are read once every second.
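
The bit handling described above can be tried out without any hardware. This is only a minimal sketch of the arithmetic: the function names `combine_ports` and `pin_is_high` are made up for illustration and are not part of sensor_reporter, and it assumes the first byte read carries pins 0-7 and the second byte pins 8-15, as in `read_pin_state` above.

```python
def combine_ports(low_byte: int, high_byte: int) -> int:
    """Merge the two bytes read from the expander into one 16-bit word.

    Assumption: low_byte holds pins 0-7 and high_byte holds pins 8-15.
    """
    return ((high_byte & 0xFF) << 8) | (low_byte & 0xFF)


def pin_is_high(word: int, pin: int) -> bool:
    """Return True if the bit for the given pin (0-15) is set in the word."""
    if not 0 <= pin <= 15:
        raise ValueError("pin must be between 0 and 15")
    return bool((word >> pin) & 1)


# Example: bit 2 set in the first byte, bit 7 set in the second byte,
# which corresponds to pin 2 and pin 15 in the combined word.
word = combine_ports(0b00000100, 0b10000000)
states = {pin: pin_is_high(word, pin) for pin in range(16)}
```

Because one read already contains all 16 pin states, the same word could in principle be decoded for every pin at once instead of being read again per sensor.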

What could be done:
Using an interrupt and event detection to save a lot of traffic.

Since polling is a normal feature in sensor_reporter, this solution should be OK as it is?
All the tests were great, the PIN states are reliably published.

Thank you so much for all your valuable input.

If polling is good enough for your purposes it’s perfectly OK from sensorReporter’s perspective. That’s why the polling option is there. Event detection is usually better since it’s more timely and doesn’t miss events that take less than a second to complete, but sometimes polling is all you can do.
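
To make the point about missed events concrete, here is a small hardware-free simulation. It is a sketch under assumptions: the function name `polled_samples` and the timings are invented for the example, and times are given in whole milliseconds to keep the arithmetic exact.

```python
from typing import List


def polled_samples(pulse_start_ms: int, pulse_end_ms: int,
                   poll_interval_ms: int, duration_ms: int) -> List[bool]:
    """Sample a contact that is active only from pulse_start_ms (inclusive)
    to pulse_end_ms (exclusive), once every poll_interval_ms."""
    samples = []
    for t in range(0, duration_ms + 1, poll_interval_ms):
        samples.append(pulse_start_ms <= t < pulse_end_ms)
    return samples


# A 500 ms pulse that falls between two polls is never seen.
short_pulse_seen = any(polled_samples(1200, 1700, 1000, 3000))

# A 1300 ms pulse spans the poll at t = 2000 ms and is seen once.
long_pulse_seen = any(polled_samples(1200, 2500, 1000, 3000))
```

With a 1 second poll, anything that opens and closes again within the same second can slip through, which is fine for a door or window contact but less so for a short button press.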

OK, I gave it a try, but I was not able to open a GPIO as an input pin for the interrupt. The very pin I choose is always reported as busy, while the others are fine. At this point I really have no idea.

But you are right, it should be fine for my needs.

Next thing is testing with really long cables and 30-40 devices, which should be enough for opening & movement detection.

Hi There, good evening,

I wanted to give a little feedback on this project.

First of all, I had a circuit board manufactured; suitable for a DIN Rail:

From top to bottom: pull-up resistors, incoming contacts, a resistor and capacitor as an RC filter, 3 Schmitt triggers

This works really well regarding debounce, wiring, and mounting the PCF. The price of the board as shown is about 3€. The MQTT messages are incredibly fast and accurate within openHAB, as I managed to implement an interrupt! CPU usage seems to be around 6%.
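
The interrupt code itself was not posted, so the following is only a hedged sketch of how an interrupt-driven reader could be structured, not the implementation used here. It assumes the expander's interrupt line triggers a callback on some GPIO, and that one 16-bit read then returns all pin states. The names `changed_pins`, `InterruptReader`, `read_word`, and `publish` are hypothetical; the I2C read and the MQTT publish are passed in as plain functions so the logic can run without hardware.

```python
from typing import Callable, List, Tuple


def changed_pins(previous: int, current: int,
                 width: int = 16) -> List[Tuple[int, bool]]:
    """Return (pin, new_state) for every pin whose bit differs."""
    diff = (previous ^ current) & ((1 << width) - 1)
    return [(pin, bool((current >> pin) & 1))
            for pin in range(width) if (diff >> pin) & 1]


class InterruptReader:
    """Publishes only the pins that changed since the last read."""

    def __init__(self, read_word: Callable[[], int],
                 publish: Callable[[int, str], None]) -> None:
        self._read_word = read_word      # hypothetical: wraps the I2C read
        self._publish = publish          # hypothetical: wraps the MQTT send
        self._last = read_word()         # remember the starting state

    def on_interrupt(self) -> None:
        """Call this from the GPIO edge callback for the interrupt line."""
        current = self._read_word()
        for pin, high in changed_pins(self._last, current):
            self._publish(pin, "ON" if high else "OFF")
        self._last = current


# Simulated run: pin 2 goes low, then high again.
readings = iter([0xFFFF, 0xFFFB, 0xFFFF])
published: List[Tuple[int, str]] = []
reader = InterruptReader(lambda: next(readings),
                         lambda pin, msg: published.append((pin, msg)))
reader.on_interrupt()
reader.on_interrupt()
```

Compared with polling every sensor once a second, a design like this reads the bus only when something changed and sends one message per changed pin.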

I also tested a cable length of around 20 m, but on the 12 V side of the relays…

So right now, I’m quite happy and a little bit exhausted. Maybe I can give some final feedback when everything is built up and working in a real environment.
