diff options
Diffstat (limited to 'src/home/mqtt/payload/relay.py')
-rw-r--r-- | src/home/mqtt/payload/relay.py | 90 |
1 files changed, 8 insertions, 82 deletions
diff --git a/src/home/mqtt/payload/relay.py b/src/home/mqtt/payload/relay.py index 1a38201..4902991 100644 --- a/src/home/mqtt/payload/relay.py +++ b/src/home/mqtt/payload/relay.py @@ -1,53 +1,13 @@ -import hashlib +from .base_payload import MqttPayload +from .esp import ( + OTAResultPayload, + OTAPayload, + InitialDiagnosticsPayload, + DiagnosticsPayload +) -from .base_payload import MQTTPayload, MQTTPayloadCustomField - -# _logger = logging.getLogger(__name__) - -class StatFlags(MQTTPayloadCustomField): - state: bool - config_changed_value_present: bool - config_changed: bool - - @staticmethod - def unpack(flags: int): - # _logger.debug(f'StatFlags.unpack: flags={flags}') - state = flags & 0x1 - ccvp = (flags >> 1) & 0x1 - cc = (flags >> 2) & 0x1 - # _logger.debug(f'StatFlags.unpack: state={state}') - return StatFlags(state=(state == 1), - config_changed_value_present=(ccvp == 1), - config_changed=(cc == 1)) - - def __index__(self): - bits = 0 - bits |= (int(self.state) & 0x1) - bits |= (int(self.config_changed_value_present) & 0x1) << 1 - bits |= (int(self.config_changed) & 0x1) << 2 - return bits - - -class InitialStatPayload(MQTTPayload): - FORMAT = '=IBbIB' - - ip: int - fw_version: int - rssi: int - free_heap: int - flags: StatFlags - - -class StatPayload(MQTTPayload): - FORMAT = '=bIB' - - rssi: int - free_heap: int - flags: StatFlags - - -class PowerPayload(MQTTPayload): +class PowerPayload(MqttPayload): FORMAT = '=12sB' PACKER = { 'state': lambda n: int(n), @@ -60,37 +20,3 @@ class PowerPayload(MQTTPayload): secret: str state: bool - - -class OTAResultPayload(MQTTPayload): - FORMAT = '=BB' - result: int - error_code: int - - -class OTAPayload(MQTTPayload): - secret: str - filename: str - - # structure of returned data: - # - # uint8_t[len(secret)] secret; - # uint8_t[16] md5; - # *uint8_t data - - def pack(self): - buf = bytearray(self.secret.encode()) - m = hashlib.md5() - with open(self.filename, 'rb') as fd: - content = fd.read() - m.update(content) - buf.extend(m.digest()) - buf.extend(content) - return buf - - def unpack(cls, buf: bytes): - raise RuntimeError(f'{cls.__class__.__name__}.unpack: not implemented') - # secret = buf[:12].decode() - # filename = buf[12:].decode() - # return OTAPayload(secret=secret, filename=filename) - |