import datetime from typing import Optional from .. import MqttModule, MqttPayload, MqttNode MODULE_NAME = 'MqttRelayModule' class MqttPowerSwitchPayload(MqttPayload): FORMAT = '=12sB' PACKER = { 'state': lambda n: int(n), 'secret': lambda s: s.encode('utf-8') } UNPACKER = { 'state': lambda n: bool(n), 'secret': lambda s: s.decode('utf-8') } secret: str state: bool class MqttPowerStatusPayload(MqttPayload): FORMAT = '=B' PACKER = { 'opened': lambda n: int(n), } UNPACKER = { 'opened': lambda n: bool(n), } opened: bool class MqttRelayState: enabled: bool update_time: datetime.datetime rssi: int fw_version: int ever_updated: bool def __init__(self): self.ever_updated = False self.enabled = False self.rssi = 0 def update(self, enabled: bool, rssi: int, fw_version=None): self.ever_updated = True self.enabled = enabled self.rssi = rssi self.update_time = datetime.datetime.now() if fw_version: self.fw_version = fw_version class MqttRelayModule(MqttModule): def init(self, mqtt: MqttNode): mqtt.subscribe_module('relay/switch', self) mqtt.subscribe_module('relay/status', self) @staticmethod def switchpower(mqtt: MqttNode, enable: bool, secret: str): payload = MqttPowerSwitchPayload(secret=secret, state=enable) mqtt.publish('relay/switch', payload=payload.pack()) def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]: message = None if topic == 'relay/switch': message = MqttPowerSwitchPayload.unpack(payload) elif topic == 'relay/status': message = MqttPowerStatusPayload.unpack(payload) if message is not None: self._logger.debug(message) return message