from typing import Optional

from .._node import MqttNode
from .._module import MqttModule
from .._payload import MqttPayload
from ...temphum import BaseSensor


def two_digits_precision(x):
    """Return *x* rounded to two decimal places."""
    return round(x, 2)


MODULE_NAME = 'MqttTempHumModule'
DATA_TOPIC = 'temphum/data'


class MqttTemphumDataPayload(MqttPayload):
    """Payload carrying one temperature / relative-humidity reading.

    ``FORMAT`` uses ``struct``-style notation: standard-size native byte
    order (``=``), two doubles (``temp``, ``rh``) and one signed byte
    (``error``).
    NOTE(review): how ``FORMAT`` and ``UNPACKER`` are consumed is defined by
    ``MqttPayload``, which is not visible here; presumably ``UNPACKER``
    callables post-process the named fields on unpack -- confirm.
    """

    FORMAT = '=ddb'
    UNPACKER = {
        'temp': two_digits_precision,
        'rh': two_digits_precision
    }

    temp: float
    rh: float
    error: int  # tick() sets 0 on a successful read, 1 on a failed one


# NOTE: disabled node enumeration, kept for reference.
# class MqttTempHumNodes(HashableEnum):
#     KBN_SH_HALL = auto()
#     KBN_SH_BATHROOM = auto()
#     KBN_SH_LIVINGROOM = auto()
#     KBN_SH_BEDROOM = auto()
#
#     KBN_BH_2FL = auto()
#     KBN_BH_2FL_STREET = auto()
#     KBN_BH_1FL_LIVINGROOM = auto()
#     KBN_BH_1FL_BEDROOM = auto()
#     KBN_BH_1FL_BATHROOM = auto()
#
#     KBN_NH_1FL_INV = auto()
#     KBN_NH_1FL_CENTER = auto()
#     KBN_NH_1LF_KT = auto()
#     KBN_NH_1FL_DS = auto()
#     KBN_NH_1FS_EZ = auto()
#
#     SPB_FLAT120_CABINET = auto()


class MqttTempHumModule(MqttModule):
    """MQTT module that publishes and/or receives temperature-humidity data.

    With a sensor attached, ``tick`` publishes readings to ``DATA_TOPIC``.
    On connect the module also subscribes to ``DATA_TOPIC`` and decodes
    incoming payloads in ``handle_payload``.
    """

    def __init__(self,
                 sensor: Optional[BaseSensor] = None,
                 write_to_database=False,
                 *args, **kwargs):
        """Create the module.

        Args:
            sensor: Sensor to read from; ``None`` means nothing is published
                from ``tick``.
            write_to_database: Accepted for interface compatibility; it is
                not used anywhere in this file.
            *args, **kwargs: Forwarded to ``MqttModule.__init__``.
        """
        if sensor is not None:
            # A sensor-backed module always ticks with an interval of 10,
            # overriding any caller-supplied ``tick_interval``.
            kwargs['tick_interval'] = 10
        super().__init__(*args, **kwargs)
        self._sensor = sensor

    def on_connect(self, mqtt: MqttNode):
        """Subscribe this module to ``DATA_TOPIC`` once connected."""
        super().on_connect(mqtt)
        mqtt.subscribe_module(DATA_TOPIC, self)

    def tick(self):
        """Read the sensor and publish the result to ``DATA_TOPIC``.

        Does nothing when no sensor is attached. If reading fails, the
        payload is still published with ``error=1``; values that were not
        read stay at ``0``.
        """
        if self._sensor is None:
            return

        error = 0
        temp = 0
        rh = 0
        try:
            temp = self._sensor.temperature()
            rh = self._sensor.humidity()
        except Exception:
            # Best effort: report the failure via the error flag instead of
            # crashing, but do not swallow KeyboardInterrupt/SystemExit.
            self._logger.exception('failed to read temperature/humidity sensor')
            error = 1

        pld = MqttTemphumDataPayload(temp=temp, rh=rh, error=error)
        self._mqtt_node_ref.publish(DATA_TOPIC, pld.pack())

    def handle_payload(self,
                       mqtt: MqttNode,
                       topic: str,
                       payload: bytes) -> Optional[MqttPayload]:
        """Decode a payload received on ``DATA_TOPIC``.

        Returns:
            The unpacked ``MqttTemphumDataPayload`` for ``DATA_TOPIC``;
            ``None`` for any other topic.
        """
        if topic != DATA_TOPIC:
            return None

        message = MqttTemphumDataPayload.unpack(payload)
        self._logger.debug(message)
        return message