"""MQTT module that publishes and receives temperature/humidity readings."""

from typing import Optional

from .._node import MqttNode
from .._module import MqttModule
from .._payload import MqttPayload
from ...temphum import BaseSensor


def two_digits_precision(x: float) -> float:
    """Return *x* rounded to two decimal places."""
    return round(x, 2)


MODULE_NAME = 'MqttTempHumModule'
DATA_TOPIC = 'temphum/data'


class MqttTemphumLegacyDataPayload(MqttPayload):
    """Legacy data payload carrying only temperature and relative humidity.

    NOTE(review): FORMAT looks like a ``struct`` format string (two doubles,
    standard size, native byte order) -- confirm against ``MqttPayload``.
    """

    FORMAT = '=dd'
    # Per-field post-processing applied to values on unpack (both readings
    # are rounded to two decimals).
    UNPACKER = {
        'temp': two_digits_precision,
        'rh': two_digits_precision,
    }

    temp: float
    rh: float


class MqttTemphumDataPayload(MqttTemphumLegacyDataPayload):
    """Current data payload: the legacy fields plus an ``error`` flag.

    ``UNPACKER`` is inherited from the legacy payload class.
    """

    FORMAT = '=ddb'

    # NOTE(review): ``temp`` and ``rh`` are redeclared on purpose; presumably
    # ``MqttPayload`` reads only the annotations declared on the concrete
    # class -- verify before removing them.
    temp: float
    rh: float
    error: int


class MqttTempHumModule(MqttModule):
    """MQTT module that publishes sensor readings and decodes received ones.

    With a sensor attached, ``tick()`` reads it and publishes a payload to
    ``DATA_TOPIC``. Independently of that, the module subscribes to
    ``DATA_TOPIC`` on connect and unpacks whatever arrives there.
    """

    _legacy_payload: bool

    def __init__(self,
                 sensor: Optional[BaseSensor] = None,
                 legacy_payload=False,
                 write_to_database=False,
                 *args, **kwargs):
        """Initialise the module.

        Args:
            sensor: Sensor to poll in ``tick()``; ``None`` disables polling.
            legacy_payload: Use ``MqttTemphumLegacyDataPayload`` (without the
                ``error`` field) instead of ``MqttTemphumDataPayload``.
            write_to_database: Accepted for interface compatibility; it is
                not used by this class and is not forwarded to the base
                class.
            *args: Forwarded to ``MqttModule.__init__``.
            **kwargs: Forwarded to ``MqttModule.__init__``.
        """
        if sensor is not None:
            # A sensor needs periodic polling; this overrides any
            # caller-supplied tick_interval.
            kwargs['tick_interval'] = 10
        super().__init__(*args, **kwargs)
        self._sensor = sensor
        self._legacy_payload = legacy_payload

    def on_connect(self, mqtt: MqttNode):
        """Subscribe this module to ``DATA_TOPIC`` once the node connects."""
        super().on_connect(mqtt)
        mqtt.subscribe_module(DATA_TOPIC, self)

    def tick(self):
        """Read the sensor and publish one data payload.

        Does nothing when no sensor is attached. If reading fails, a payload
        is still published with ``error=1``; any value that was not read
        stays at 0.
        """
        if not self._sensor:
            return

        error = 0
        temp = 0
        rh = 0
        try:
            temp = self._sensor.temperature()
            rh = self._sensor.humidity()
        except Exception:
            # Best-effort: report the failure via the error flag instead of
            # crashing the tick loop. Catching Exception (not a bare except)
            # lets KeyboardInterrupt/SystemExit propagate.
            self._logger.exception('failed to read temperature/humidity sensor')
            error = 1

        pld = self._get_data_payload_cls()(temp=temp, rh=rh, error=error)
        self._mqtt_node_ref.publish(DATA_TOPIC, pld.pack())

    def handle_payload(self,
                       mqtt: MqttNode,
                       topic: str,
                       payload: bytes) -> Optional[MqttPayload]:
        """Decode a message received on ``DATA_TOPIC``.

        Args:
            mqtt: Node the message arrived on (unused here).
            topic: Topic of the incoming message.
            payload: Raw message bytes.

        Returns:
            The unpacked payload object for ``DATA_TOPIC`` messages, or
            ``None`` for any other topic.
        """
        if topic != DATA_TOPIC:
            return None

        message = self._get_data_payload_cls().unpack(payload)
        self._logger.debug(message)
        return message

    def _get_data_payload_cls(self):
        """Return the payload class matching the ``legacy_payload`` setting."""
        if self._legacy_payload:
            return MqttTemphumLegacyDataPayload
        return MqttTemphumDataPayload