summaryrefslogtreecommitdiff
path: root/src/home/mqtt/module/diagnostics.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/home/mqtt/module/diagnostics.py')
-rw-r--r--src/home/mqtt/module/diagnostics.py64
1 files changed, 0 insertions, 64 deletions
diff --git a/src/home/mqtt/module/diagnostics.py b/src/home/mqtt/module/diagnostics.py
deleted file mode 100644
index 5db5e99..0000000
--- a/src/home/mqtt/module/diagnostics.py
+++ /dev/null
@@ -1,64 +0,0 @@
-from .._payload import MqttPayload, MqttPayloadCustomField
-from .._node import MqttNode, MqttModule
-from typing import Optional
-
-MODULE_NAME = 'MqttDiagnosticsModule'
-
-
-class DiagnosticsFlags(MqttPayloadCustomField):
- state: bool
- config_changed_value_present: bool
- config_changed: bool
-
- @staticmethod
- def unpack(flags: int):
- # _logger.debug(f'StatFlags.unpack: flags={flags}')
- state = flags & 0x1
- ccvp = (flags >> 1) & 0x1
- cc = (flags >> 2) & 0x1
- # _logger.debug(f'StatFlags.unpack: state={state}')
- return DiagnosticsFlags(state=(state == 1),
- config_changed_value_present=(ccvp == 1),
- config_changed=(cc == 1))
-
- def __index__(self):
- bits = 0
- bits |= (int(self.state) & 0x1)
- bits |= (int(self.config_changed_value_present) & 0x1) << 1
- bits |= (int(self.config_changed) & 0x1) << 2
- return bits
-
-
-class InitialDiagnosticsPayload(MqttPayload):
- FORMAT = '=IBbIB'
-
- ip: int
- fw_version: int
- rssi: int
- free_heap: int
- flags: DiagnosticsFlags
-
-
-class DiagnosticsPayload(MqttPayload):
- FORMAT = '=bIB'
-
- rssi: int
- free_heap: int
- flags: DiagnosticsFlags
-
-
-class MqttDiagnosticsModule(MqttModule):
- def on_connect(self, mqtt: MqttNode):
- super().on_connect(mqtt)
- for topic in ('diag', 'd1ag', 'stat', 'stat1'):
- mqtt.subscribe_module(topic, self)
-
- def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]:
- message = None
- if topic in ('stat', 'diag'):
- message = DiagnosticsPayload.unpack(payload)
- elif topic in ('stat1', 'd1ag'):
- message = InitialDiagnosticsPayload.unpack(payload)
- if message:
- self._logger.debug(message)
- return message