import hashlib from typing import Optional from ..mqtt import MqttPayload from .._node import MqttModule, MqttNode MODULE_NAME = 'MqttOtaModule' class OtaResultPayload(MqttPayload): FORMAT = '=BB' result: int error_code: int class OtaPayload(MqttPayload): secret: str filename: str # structure of returned data: # # uint8_t[len(secret)] secret; # uint8_t[16] md5; # *uint8_t data def pack(self): buf = bytearray(self.secret.encode()) m = hashlib.md5() with open(self.filename, 'rb') as fd: content = fd.read() m.update(content) buf.extend(m.digest()) buf.extend(content) return buf def unpack(cls, buf: bytes): raise RuntimeError(f'{cls.__class__.__name__}.unpack: not implemented') # secret = buf[:12].decode() # filename = buf[12:].decode() # return OTAPayload(secret=secret, filename=filename) class MqttOtaModule(MqttModule): def init(self, mqtt: MqttNode): mqtt.subscribe_module("otares", self) def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]: if topic == 'otares': message = OtaResultPayload.unpack(payload) self._logger.debug(message) return message # def push_ota(self, # node_id, # filename: str, # publish_callback: callable, # qos: int): # device = next(d for d in self._devices if d.id == device_id) # assert device.secret is not None, 'device secret not specified' # # self._ota_publish_callback = publish_callback # payload = OtaPayload(secret=device.secret, filename=filename) # publish_result = self._client.publish(f'hk/{device.id}/{self.TOPIC_LEAF}/admin/ota', # payload=payload.pack(), # qos=qos) # self._ota_mid = publish_result.mid # self._client.loop_write()