#!/usr/bin/env python3 import logging import __py_include from homekit import http from homekit.config import config, AppConfigUnit from homekit.mqtt import MqttPayload, MqttWrapper, MqttNode, MqttModule, MqttNodesConfig from homekit.mqtt.module.relay import MqttRelayState, MqttRelayModule, MqttPowerStatusPayload from homekit.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload from typing import Optional, Union logger = logging.getLogger(__name__) mqtt: Optional[MqttWrapper] = None mqtt_nodes: dict[str, MqttNode] = {} relay_modules: dict[str, Union[MqttRelayModule, MqttModule]] = {} relay_states: dict[str, MqttRelayState] = {} mqtt_nodes_config = MqttNodesConfig() class RelayMqttHttpProxyConfig(AppConfigUnit): NAME = 'relay_mqtt_http_proxy' @classmethod def schema(cls) -> Optional[dict]: return { 'relay_nodes': { 'type': 'list', 'required': True, 'schema': { 'type': 'string' } }, 'listen_addr': cls._addr_schema(required=True) } @staticmethod def custom_validator(data): relay_node_names = mqtt_nodes_config.get_nodes(filters=('relay',), only_names=True) for node in data['relay_nodes']: if node not in relay_node_names: raise ValueError(f'unknown relay node "{node}"') def on_mqtt_message(node: MqttNode, message: MqttPayload): try: is_legacy = mqtt_nodes_config[node.id]['relay']['legacy_topics'] logger.debug(f'on_mqtt_message: relay {node.id} uses legacy topic names') except KeyError: is_legacy = False kwargs = {} if isinstance(message, InitialDiagnosticsPayload) or isinstance(message, DiagnosticsPayload): kwargs['rssi'] = message.rssi if is_legacy: kwargs['enabled'] = message.flags.state if not is_legacy and isinstance(message, MqttPowerStatusPayload): kwargs['enabled'] = message.opened if len(kwargs): logger.debug(f'on_mqtt_message: {node.id}: going to update relay state: {str(kwargs)}') if node.id not in relay_states: relay_states[node.id] = MqttRelayState() relay_states[node.id].update(**kwargs) class RelayMqttHttpProxy(http.HTTPServer): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.get('/relay/{id}/on', self.relay_on) self.get('/relay/{id}/off', self.relay_off) self.get('/relay/{id}/toggle', self.relay_toggle) async def _relay_on_off(self, enable: Optional[bool], req: http.Request): node_id = req.match_info['id'] node_secret = req.query['secret'] node = mqtt_nodes[node_id] relay_module = relay_modules[node_id] if enable is None: if node_id in relay_states and relay_states[node_id].ever_updated: cur_state = relay_states[node_id].enabled else: cur_state = False enable = not cur_state node.secret = node_secret relay_module.switchpower(enable) return self.ok() async def relay_on(self, req: http.Request): return await self._relay_on_off(True, req) async def relay_off(self, req: http.Request): return await self._relay_on_off(False, req) async def relay_toggle(self, req: http.Request): return await self._relay_on_off(None, req) if __name__ == '__main__': config.load_app(RelayMqttHttpProxyConfig) mqtt = MqttWrapper(client_id='relay_mqtt_http_proxy', randomize_client_id=True) for node_id in config.app_config['relay_nodes']: node_data = mqtt_nodes_config.get_node(node_id) mqtt_node = MqttNode(node_id=node_id) module_kwargs = {} try: if node_data['relay']['legacy_topics']: module_kwargs['legacy_topics'] = True except KeyError: pass relay_modules[node_id] = mqtt_node.load_module('relay', **module_kwargs) if 'legacy_topics' in module_kwargs: mqtt_node.load_module('diagnostics') mqtt_node.add_payload_callback(on_mqtt_message) mqtt.add_node(mqtt_node) mqtt_nodes[node_id] = mqtt_node mqtt.connect_and_loop(loop_forever=False) proxy = RelayMqttHttpProxy(config.app_config['listen_addr']) try: proxy.run() except KeyboardInterrupt: mqtt.disconnect()