From f29e139cbb7e4a4d539cba6e894ef4a6acd312d6 Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Wed, 31 May 2023 09:22:00 +0300 Subject: WIP: big refactoring --- src/relay_mqtt_http_proxy.py | 49 ++++++++++++++++++++++++++++---------------- 1 file changed, 31 insertions(+), 18 deletions(-) (limited to 'src/relay_mqtt_http_proxy.py') diff --git a/src/relay_mqtt_http_proxy.py b/src/relay_mqtt_http_proxy.py index 098facc..2bc2c4a 100755 --- a/src/relay_mqtt_http_proxy.py +++ b/src/relay_mqtt_http_proxy.py @@ -1,17 +1,19 @@ #!/usr/bin/env python3 from home import http from home.config import config -from home.mqtt import MqttRelay, MqttRelayState -from home.mqtt.esp import MqttEspDevice -from home.mqtt.payload import MqttPayload -from home.mqtt.payload.relay import InitialDiagnosticsPayload, DiagnosticsPayload -from typing import Optional +from home.mqtt import MqttPayload, MqttWrapper, MqttNode, MqttModule +from home.mqtt.module.relay import MqttRelayState, MqttRelayModule +from home.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload +from typing import Optional, Union -mqtt_relay: Optional[MqttRelay] = None +mqtt: Optional[MqttWrapper] = None +mqtt_nodes: dict[str, MqttNode] = {} +relay_modules: dict[str, Union[MqttRelayModule, MqttModule]] = {} relay_states: dict[str, MqttRelayState] = {} -def on_mqtt_message(device_id, message: MqttPayload): +def on_mqtt_message(node: MqttNode, + message: MqttPayload): if isinstance(message, InitialDiagnosticsPayload) or isinstance(message, DiagnosticsPayload): kwargs = dict(rssi=message.rssi, enabled=message.flags.state) if device_id not in relay_states: @@ -29,17 +31,22 @@ class RelayMqttHttpProxy(http.HTTPServer): async def _relay_on_off(self, enable: Optional[bool], req: http.Request): - device_id = req.match_info['id'] - device_secret = req.query['secret'] + node_id = req.match_info['id'] + node_secret = req.query['secret'] + + node = mqtt_nodes[node_id] + relay_module = relay_modules[node_id] if enable is None: - if device_id in relay_states and relay_states[device_id].ever_updated: - cur_state = relay_states[device_id].enabled + if node_id in relay_states and relay_states[node_id].ever_updated: + cur_state = relay_states[node_id].enabled else: cur_state = False enable = not cur_state - mqtt_relay.set_power(device_id, enable, device_secret) + if not node.secret: + node.secret = node_secret + relay_module.switchpower(enable) return self.ok() async def relay_on(self, req: http.Request): @@ -53,15 +60,21 @@ class RelayMqttHttpProxy(http.HTTPServer): if __name__ == '__main__': - config.load('relay_mqtt_http_proxy') + config.load_app('relay_mqtt_http_proxy') + + mqtt = MqttWrapper() + for device_id, data in config['relays'].items(): + mqtt_node = MqttNode(node_id=device_id) + relay_modules[device_id] = mqtt_node.load_module('relay') + mqtt_nodes[device_id] = mqtt_node + mqtt_node.add_payload_callback(on_mqtt_message) + mqtt.add_node(mqtt_node) + mqtt_node.add_payload_callback(on_mqtt_message) - mqtt_relay = MqttRelay(devices=[MqttEspDevice(id=device_id) for device_id in config.get('relay.devices')]) - mqtt_relay.configure_tls() - mqtt_relay.set_message_callback(on_mqtt_message) - mqtt_relay.connect_and_loop(loop_forever=False) + mqtt.connect_and_loop(loop_forever=False) proxy = RelayMqttHttpProxy(config.get_addr('server.listen')) try: proxy.run() except KeyboardInterrupt: - mqtt_relay.disconnect() + mqtt.disconnect() -- cgit v1.2.3 From 327a5298359027099631c3c9967b7585928cd367 Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Sat, 10 Jun 2023 21:54:56 +0300 Subject: port relay_mqtt_http_proxy to new config scheme; config: support addr types & normalization --- src/relay_mqtt_http_proxy.py | 89 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 71 insertions(+), 18 deletions(-) (limited to 'src/relay_mqtt_http_proxy.py') diff --git a/src/relay_mqtt_http_proxy.py b/src/relay_mqtt_http_proxy.py index 2bc2c4a..e13c04a 100755 --- a/src/relay_mqtt_http_proxy.py +++ b/src/relay_mqtt_http_proxy.py @@ -1,24 +1,69 @@ #!/usr/bin/env python3 +import logging + from home import http -from home.config import config -from home.mqtt import MqttPayload, MqttWrapper, MqttNode, MqttModule -from home.mqtt.module.relay import MqttRelayState, MqttRelayModule +from home.config import config, AppConfigUnit +from home.mqtt import MqttPayload, MqttWrapper, MqttNode, MqttModule, MqttNodesConfig +from home.mqtt.module.relay import MqttRelayState, MqttRelayModule, MqttPowerStatusPayload from home.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload from typing import Optional, Union + +logger = logging.getLogger(__name__) mqtt: Optional[MqttWrapper] = None mqtt_nodes: dict[str, MqttNode] = {} relay_modules: dict[str, Union[MqttRelayModule, MqttModule]] = {} relay_states: dict[str, MqttRelayState] = {} +mqtt_nodes_config = MqttNodesConfig() + + +class RelayMqttHttpProxyConfig(AppConfigUnit): + NAME = 'relay_mqtt_http_proxy' + + @classmethod + def schema(cls) -> Optional[dict]: + return { + 'relay_nodes': { + 'type': 'list', + 'required': True, + 'schema': { + 'type': 'string' + } + }, + 'listen_addr': cls._addr_schema(required=True) + } + + @staticmethod + def custom_validator(data): + relay_node_names = mqtt_nodes_config.get_nodes(filters=('relay',), only_names=True) + for node in data['relay_nodes']: + if node not in relay_node_names: + raise ValueError(f'unknown relay node "{node}"') + def on_mqtt_message(node: MqttNode, message: MqttPayload): + try: + is_legacy = mqtt_nodes_config[node.id]['relay']['legacy_topics'] + logger.debug(f'on_mqtt_message: relay {node.id} uses legacy topic names') + except KeyError: + is_legacy = False + kwargs = {} + if isinstance(message, InitialDiagnosticsPayload) or isinstance(message, DiagnosticsPayload): - kwargs = dict(rssi=message.rssi, enabled=message.flags.state) - if device_id not in relay_states: - relay_states[device_id] = MqttRelayState() - relay_states[device_id].update(**kwargs) + kwargs['rssi'] = message.rssi + if is_legacy: + kwargs['enabled'] = message.flags.state + + if not is_legacy and isinstance(message, MqttPowerStatusPayload): + kwargs['enabled'] = message.opened + + if len(kwargs): + logger.debug(f'on_mqtt_message: {node.id}: going to update relay state: {str(kwargs)}') + if node.id not in relay_states: + relay_states[node.id] = MqttRelayState() + relay_states[node.id].update(**kwargs) class RelayMqttHttpProxy(http.HTTPServer): @@ -44,8 +89,7 @@ class RelayMqttHttpProxy(http.HTTPServer): cur_state = False enable = not cur_state - if not node.secret: - node.secret = node_secret + node.secret = node_secret relay_module.switchpower(enable) return self.ok() @@ -60,20 +104,29 @@ class RelayMqttHttpProxy(http.HTTPServer): if __name__ == '__main__': - config.load_app('relay_mqtt_http_proxy') - - mqtt = MqttWrapper() - for device_id, data in config['relays'].items(): - mqtt_node = MqttNode(node_id=device_id) - relay_modules[device_id] = mqtt_node.load_module('relay') - mqtt_nodes[device_id] = mqtt_node + config.load_app(RelayMqttHttpProxyConfig) + + mqtt = MqttWrapper(client_id='relay_mqtt_http_proxy', + randomize_client_id=True) + for node_id in config.app_config['relay_nodes']: + node_data = mqtt_nodes_config.get_node(node_id) + mqtt_node = MqttNode(node_id=node_id) + module_kwargs = {} + try: + if node_data['relay']['legacy_topics']: + module_kwargs['legacy_topics'] = True + except KeyError: + pass + relay_modules[node_id] = mqtt_node.load_module('relay', **module_kwargs) + if 'legacy_topics' in module_kwargs: + mqtt_node.load_module('diagnostics') mqtt_node.add_payload_callback(on_mqtt_message) mqtt.add_node(mqtt_node) - mqtt_node.add_payload_callback(on_mqtt_message) + mqtt_nodes[node_id] = mqtt_node mqtt.connect_and_loop(loop_forever=False) - proxy = RelayMqttHttpProxy(config.get_addr('server.listen')) + proxy = RelayMqttHttpProxy(config.app_config['listen_addr']) try: proxy.run() except KeyboardInterrupt: -- cgit v1.2.3 From b0bf43e6a272d42a55158e657bd937cb82fc3d8d Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Sat, 10 Jun 2023 23:02:34 +0300 Subject: move files, rename home package to homekit --- src/relay_mqtt_http_proxy.py | 133 ------------------------------------------- 1 file changed, 133 deletions(-) delete mode 100755 src/relay_mqtt_http_proxy.py (limited to 'src/relay_mqtt_http_proxy.py') diff --git a/src/relay_mqtt_http_proxy.py b/src/relay_mqtt_http_proxy.py deleted file mode 100755 index e13c04a..0000000 --- a/src/relay_mqtt_http_proxy.py +++ /dev/null @@ -1,133 +0,0 @@ -#!/usr/bin/env python3 -import logging - -from home import http -from home.config import config, AppConfigUnit -from home.mqtt import MqttPayload, MqttWrapper, MqttNode, MqttModule, MqttNodesConfig -from home.mqtt.module.relay import MqttRelayState, MqttRelayModule, MqttPowerStatusPayload -from home.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload -from typing import Optional, Union - - -logger = logging.getLogger(__name__) -mqtt: Optional[MqttWrapper] = None -mqtt_nodes: dict[str, MqttNode] = {} -relay_modules: dict[str, Union[MqttRelayModule, MqttModule]] = {} -relay_states: dict[str, MqttRelayState] = {} - -mqtt_nodes_config = MqttNodesConfig() - - -class RelayMqttHttpProxyConfig(AppConfigUnit): - NAME = 'relay_mqtt_http_proxy' - - @classmethod - def schema(cls) -> Optional[dict]: - return { - 'relay_nodes': { - 'type': 'list', - 'required': True, - 'schema': { - 'type': 'string' - } - }, - 'listen_addr': cls._addr_schema(required=True) - } - - @staticmethod - def custom_validator(data): - relay_node_names = mqtt_nodes_config.get_nodes(filters=('relay',), only_names=True) - for node in data['relay_nodes']: - if node not in relay_node_names: - raise ValueError(f'unknown relay node "{node}"') - - -def on_mqtt_message(node: MqttNode, - message: MqttPayload): - try: - is_legacy = mqtt_nodes_config[node.id]['relay']['legacy_topics'] - logger.debug(f'on_mqtt_message: relay {node.id} uses legacy topic names') - except KeyError: - is_legacy = False - kwargs = {} - - if isinstance(message, InitialDiagnosticsPayload) or isinstance(message, DiagnosticsPayload): - kwargs['rssi'] = message.rssi - if is_legacy: - kwargs['enabled'] = message.flags.state - - if not is_legacy and isinstance(message, MqttPowerStatusPayload): - kwargs['enabled'] = message.opened - - if len(kwargs): - logger.debug(f'on_mqtt_message: {node.id}: going to update relay state: {str(kwargs)}') - if node.id not in relay_states: - relay_states[node.id] = MqttRelayState() - relay_states[node.id].update(**kwargs) - - -class RelayMqttHttpProxy(http.HTTPServer): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.get('/relay/{id}/on', self.relay_on) - self.get('/relay/{id}/off', self.relay_off) - self.get('/relay/{id}/toggle', self.relay_toggle) - - async def _relay_on_off(self, - enable: Optional[bool], - req: http.Request): - node_id = req.match_info['id'] - node_secret = req.query['secret'] - - node = mqtt_nodes[node_id] - relay_module = relay_modules[node_id] - - if enable is None: - if node_id in relay_states and relay_states[node_id].ever_updated: - cur_state = relay_states[node_id].enabled - else: - cur_state = False - enable = not cur_state - - node.secret = node_secret - relay_module.switchpower(enable) - return self.ok() - - async def relay_on(self, req: http.Request): - return await self._relay_on_off(True, req) - - async def relay_off(self, req: http.Request): - return await self._relay_on_off(False, req) - - async def relay_toggle(self, req: http.Request): - return await self._relay_on_off(None, req) - - -if __name__ == '__main__': - config.load_app(RelayMqttHttpProxyConfig) - - mqtt = MqttWrapper(client_id='relay_mqtt_http_proxy', - randomize_client_id=True) - for node_id in config.app_config['relay_nodes']: - node_data = mqtt_nodes_config.get_node(node_id) - mqtt_node = MqttNode(node_id=node_id) - module_kwargs = {} - try: - if node_data['relay']['legacy_topics']: - module_kwargs['legacy_topics'] = True - except KeyError: - pass - relay_modules[node_id] = mqtt_node.load_module('relay', **module_kwargs) - if 'legacy_topics' in module_kwargs: - mqtt_node.load_module('diagnostics') - mqtt_node.add_payload_callback(on_mqtt_message) - mqtt.add_node(mqtt_node) - mqtt_nodes[node_id] = mqtt_node - - mqtt.connect_and_loop(loop_forever=False) - - proxy = RelayMqttHttpProxy(config.app_config['listen_addr']) - try: - proxy.run() - except KeyboardInterrupt: - mqtt.disconnect() -- cgit v1.2.3