These are my first lines of code, I was never in touch with python, so ChatGPT was the only way to make this real.
It is only a question of providing the needed information and asking the right question. So now, I have a working piece of code in /i2c/pcf8575.py:
from typing import Dict, Any
import smbus
from core.sensor import Sensor
from core import utils
# Connection dict constant
OUT_SWITCH = "Switch"
class PCF8575Pin(Sensor):
"""Publishes the current state of a configured PCF8575 pin."""
def __init__(self,
publishers: Dict[str, 'connection.Connection'],
dev_cfg: Dict[str, Any]) -> None:
""" Initializes the connection to the PCF8575 pin.
Parameters:
- "i2c_bus_no" : The I2C bus number
- "i2c_address" : The I2C address of the PCF8575
- "i2c_PIN" : The pin number to read
"""
super().__init__(publishers, dev_cfg)
# Check if the required keys are present in dev_cfg
if "i2c_bus_no" not in dev_cfg or "i2c_address" not in dev_cfg or "i2c_PIN" not in dev_cfg:
raise ValueError("Required keys 'i2c_bus_no', 'i2c_address', and 'i2c_PIN' are missing in device configuration")
# Initialize I2C connection
self.i2c_bus_no = dev_cfg["i2c_bus_no"]
self.i2c_address = dev_cfg["i2c_address"]
self.bus = smbus.SMBus(self.i2c_bus_no)
self.pin = dev_cfg["i2c_PIN"]
self.state = self.read_pin_state()
self.log.info("Configured PCF8575Pin %s: I2C bus %d, address 0x%x, pin %d",
self.name, self.i2c_bus_no, self.i2c_address, self.pin)
self.publish_state()
# Configure the output channel for OUT_SWITCH
utils.configure_device_channel(self.comm, is_output=True, output_name=OUT_SWITCH,
name="switch state")
self._register(self.comm)
# Start checking the pin state
self.check_state()
def read_pin_state(self) -> bool:
""" Reads the state of the configured pin. """
data = self.bus.read_i2c_block_data(self.i2c_address, 0, 2)
combined_data = (data[1] << 8) | data[0]
pin_state = (combined_data >> self.pin) & 1
return bool(pin_state)
def check_state(self) -> None:
""" Checks the current state of the pin. """
self.state = self.read_pin_state()
self.publish_state()
def publish_state(self) -> None:
""" Publishes the current state of the pin."""
msg = "ON" if self.state else "OFF"
self._send(msg, self.comm, OUT_SWITCH)
def cleanup(self) -> None:
"""Clean up."""
self.log.debug("Cleaning up PCF8575Pin %s", self.name)
# No specific cleanup needed for I2C pins
What this Code does, I think:
A Poll of 1 second is set ( through yml ) and the program reads 2 bytes. One of those 16 bit is a “1”, representing the Pin 0-15. So for each sensor all the bytes are read once every second.
What could be done:
Using an Interupt and EventDetection to save a lot of traffic.
Since Poll is a normal feature in sensor_reporter, this solution as it is should be OK?
All the tests were great, the PIN states are reliably published.
Thank you so much for all your valuable input.