from .._payload import MqttPayload, MqttPayloadCustomField from .._node import MqttNode, MqttModule from typing import Optional MODULE_NAME = 'MqttDiagnosticsModule' class DiagnosticsFlags(MqttPayloadCustomField): state: bool config_changed_value_present: bool config_changed: bool @staticmethod def unpack(flags: int): # _logger.debug(f'StatFlags.unpack: flags={flags}') state = flags & 0x1 ccvp = (flags >> 1) & 0x1 cc = (flags >> 2) & 0x1 # _logger.debug(f'StatFlags.unpack: state={state}') return DiagnosticsFlags(state=(state == 1), config_changed_value_present=(ccvp == 1), config_changed=(cc == 1)) def __index__(self): bits = 0 bits |= (int(self.state) & 0x1) bits |= (int(self.config_changed_value_present) & 0x1) << 1 bits |= (int(self.config_changed) & 0x1) << 2 return bits class InitialDiagnosticsPayload(MqttPayload): FORMAT = '=IBbIB' ip: int fw_version: int rssi: int free_heap: int flags: DiagnosticsFlags class DiagnosticsPayload(MqttPayload): FORMAT = '=bIB' rssi: int free_heap: int flags: DiagnosticsFlags class MqttDiagnosticsModule(MqttModule): def on_connect(self, mqtt: MqttNode): super().on_connect(mqtt) for topic in ('diag', 'd1ag', 'stat', 'stat1'): mqtt.subscribe_module(topic, self) def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]: message = None if topic in ('stat', 'diag'): message = DiagnosticsPayload.unpack(payload) elif topic in ('stat1', 'd1ag'): message = InitialDiagnosticsPayload.unpack(payload) if message: self._logger.debug(message) return message