import datetime from typing import Optional from .. import MqttModule, MqttPayload, MqttNode MODULE_NAME = 'MqttRelayModule' class MqttPowerSwitchPayload(MqttPayload): FORMAT = '=12sB' PACKER = { 'state': lambda n: int(n), 'secret': lambda s: s.encode('utf-8') } UNPACKER = { 'state': lambda n: bool(n), 'secret': lambda s: s.decode('utf-8') } secret: str state: bool class MqttPowerStatusPayload(MqttPayload): FORMAT = '=B' PACKER = { 'opened': lambda n: int(n), } UNPACKER = { 'opened': lambda n: bool(n), } opened: bool class MqttRelayState: enabled: bool update_time: datetime.datetime rssi: int fw_version: int ever_updated: bool def __init__(self): self.ever_updated = False self.enabled = False self.rssi = 0 def update(self, enabled: bool, rssi: int, fw_version=None): self.ever_updated = True self.enabled = enabled self.rssi = rssi self.update_time = datetime.datetime.now() if fw_version: self.fw_version = fw_version class MqttRelayModule(MqttModule): _legacy_topics: bool def __init__(self, legacy_topics=False, *args, **kwargs): super().__init__(*args, **kwargs) self._legacy_topics = legacy_topics def on_connect(self, mqtt: MqttNode): super().on_connect(mqtt) mqtt.subscribe_module(self._get_switch_topic(), self) mqtt.subscribe_module('relay/status', self) def switchpower(self, enable: bool): payload = MqttPowerSwitchPayload(secret=self._mqtt_node_ref.secret, state=enable) self._mqtt_node_ref.publish(self._get_switch_topic(), payload=payload.pack()) def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]: message = None if topic == self._get_switch_topic(): message = MqttPowerSwitchPayload.unpack(payload) elif topic == 'relay/status': message = MqttPowerStatusPayload.unpack(payload) if message is not None: self._logger.debug(message) return message def _get_switch_topic(self) -> str: return 'relay/power' if self._legacy_topics else 'relay/switch'