#!/usr/bin/env python3 import logging from home import http from home.config import config, AppConfigUnit from home.mqtt import MqttPayload, MqttWrapper, MqttNode, MqttModule, MqttNodesConfig from home.mqtt.module.relay import MqttRelayState, MqttRelayModule, MqttPowerStatusPayload from home.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload from typing import Optional, Union logger = logging.getLogger(__name__) mqtt: Optional[MqttWrapper] = None mqtt_nodes: dict[str, MqttNode] = {} relay_modules: dict[str, Union[MqttRelayModule, MqttModule]] = {} relay_states: dict[str, MqttRelayState] = {} mqtt_nodes_config = MqttNodesConfig() class RelayMqttHttpProxyConfig(AppConfigUnit): NAME = 'relay_mqtt_http_proxy' @classmethod def schema(cls) -> Optional[dict]: return { 'relay_nodes': { 'type': 'list', 'required': True, 'schema': { 'type': 'string' } }, 'listen_addr': cls._addr_schema(required=True) } @staticmethod def custom_validator(data): relay_node_names = mqtt_nodes_config.get_nodes(filters=('relay',), only_names=True) for node in data['relay_nodes']: if node not in relay_node_names: raise ValueError(f'unknown relay node "{node}"') def on_mqtt_message(node: MqttNode, message: MqttPayload): try: is_legacy = mqtt_nodes_config[node.id]['relay']['legacy_topics'] logger.debug(f'on_mqtt_message: relay {node.id} uses legacy topic names') except KeyError: is_legacy = False kwargs = {} if isinstance(message, InitialDiagnosticsPayload) or isinstance(message, DiagnosticsPayload): kwargs['rssi'] = message.rssi if is_legacy: kwargs['enabled'] = message.flags.state if not is_legacy and isinstance(message, MqttPowerStatusPayload): kwargs['enabled'] = message.opened if len(kwargs): logger.debug(f'on_mqtt_message: {node.id}: going to update relay state: {str(kwargs)}') if node.id not in relay_states: relay_states[node.id] = MqttRelayState() relay_states[node.id].update(**kwargs) class RelayMqttHttpProxy(http.HTTPServer): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.get('/relay/{id}/on', self.relay_on) self.get('/relay/{id}/off', self.relay_off) self.get('/relay/{id}/toggle', self.relay_toggle) async def _relay_on_off(self, enable: Optional[bool], req: http.Request): node_id = req.match_info['id'] node_secret = req.query['secret'] node = mqtt_nodes[node_id] relay_module = relay_modules[node_id] if enable is None: if node_id in relay_states and relay_states[node_id].ever_updated: cur_state = relay_states[node_id].enabled else: cur_state = False enable = not cur_state node.secret = node_secret relay_module.switchpower(enable) return self.ok() async def relay_on(self, req: http.Request): return await self._relay_on_off(True, req) async def relay_off(self, req: http.Request): return await self._relay_on_off(False, req) async def relay_toggle(self, req: http.Request): return await self._relay_on_off(None, req) if __name__ == '__main__': config.load_app(RelayMqttHttpProxyConfig) mqtt = MqttWrapper(client_id='relay_mqtt_http_proxy', randomize_client_id=True) for node_id in config.app_config['relay_nodes']: node_data = mqtt_nodes_config.get_node(node_id) mqtt_node = MqttNode(node_id=node_id) module_kwargs = {} try: if node_data['relay']['legacy_topics']: module_kwargs['legacy_topics'] = True except KeyError: pass relay_modules[node_id] = mqtt_node.load_module('relay', **module_kwargs) if 'legacy_topics' in module_kwargs: mqtt_node.load_module('diagnostics') mqtt_node.add_payload_callback(on_mqtt_message) mqtt.add_node(mqtt_node) mqtt_nodes[node_id] = mqtt_node mqtt.connect_and_loop(loop_forever=False) proxy = RelayMqttHttpProxy(config.app_config['listen_addr']) try: proxy.run() except KeyboardInterrupt: mqtt.disconnect()