import hashlib from .base_payload import MqttPayload, MqttPayloadCustomField class OTAResultPayload(MqttPayload): FORMAT = '=BB' result: int error_code: int class OTAPayload(MqttPayload): secret: str filename: str # structure of returned data: # # uint8_t[len(secret)] secret; # uint8_t[16] md5; # *uint8_t data def pack(self): buf = bytearray(self.secret.encode()) m = hashlib.md5() with open(self.filename, 'rb') as fd: content = fd.read() m.update(content) buf.extend(m.digest()) buf.extend(content) return buf def unpack(cls, buf: bytes): raise RuntimeError(f'{cls.__class__.__name__}.unpack: not implemented') # secret = buf[:12].decode() # filename = buf[12:].decode() # return OTAPayload(secret=secret, filename=filename) class DiagnosticsFlags(MqttPayloadCustomField): state: bool config_changed_value_present: bool config_changed: bool @staticmethod def unpack(flags: int): # _logger.debug(f'StatFlags.unpack: flags={flags}') state = flags & 0x1 ccvp = (flags >> 1) & 0x1 cc = (flags >> 2) & 0x1 # _logger.debug(f'StatFlags.unpack: state={state}') return DiagnosticsFlags(state=(state == 1), config_changed_value_present=(ccvp == 1), config_changed=(cc == 1)) def __index__(self): bits = 0 bits |= (int(self.state) & 0x1) bits |= (int(self.config_changed_value_present) & 0x1) << 1 bits |= (int(self.config_changed) & 0x1) << 2 return bits class InitialDiagnosticsPayload(MqttPayload): FORMAT = '=IBbIB' ip: int fw_version: int rssi: int free_heap: int flags: DiagnosticsFlags class DiagnosticsPayload(MqttPayload): FORMAT = '=bIB' rssi: int free_heap: int flags: DiagnosticsFlags