diff options
-rw-r--r-- | requirements.txt | 2 | ||||
-rw-r--r-- | src/home/config/_validators.py | 46 | ||||
-rw-r--r-- | src/home/config/config.py | 59 | ||||
-rw-r--r-- | src/home/mqtt/_node.py | 16 | ||||
-rw-r--r-- | src/home/mqtt/_wrapper.py | 8 | ||||
-rw-r--r-- | src/home/mqtt/module/ota.py | 11 | ||||
-rw-r--r-- | src/home/mqtt/module/relay.py | 6 | ||||
-rw-r--r-- | src/home/mqtt/module/temphum.py | 1 | ||||
-rwxr-xr-x | src/inverter_mqtt_util.py | 2 | ||||
-rwxr-xr-x | src/mqtt_node_util.py | 8 | ||||
-rwxr-xr-x | src/pump_bot.py | 2 | ||||
-rwxr-xr-x | src/relay_mqtt_bot.py | 45 | ||||
-rwxr-xr-x | src/relay_mqtt_http_proxy.py | 43 | ||||
-rwxr-xr-x | src/sensors_mqtt_sender.py | 58 | ||||
-rwxr-xr-x | src/temphum_mqtt_receiver.py (renamed from src/sensors_mqtt_receiver.py) | 23 | ||||
-rw-r--r-- | systemd/sensors_mqtt_receiver.service | 4 | ||||
-rw-r--r-- | systemd/sensors_mqtt_sender.service | 13 |
17 files changed, 185 insertions, 162 deletions
diff --git a/requirements.txt b/requirements.txt index 46f9b8c..e11d512 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,7 +14,7 @@ apscheduler~=3.9.1 psutil~=5.9.1 aioshutil~=1.1 scikit-image~=0.19.3 - +cerberus~=1.3.4 # following can be installed from debian repositories # matplotlib~=3.5.0 diff --git a/src/home/config/_validators.py b/src/home/config/_validators.py new file mode 100644 index 0000000..963a25f --- /dev/null +++ b/src/home/config/_validators.py @@ -0,0 +1,46 @@ +import logging +import inspect + +from cerberus import Validator, DocumentError + + +__all__ = [ + 'linux_boards_validator' +] + +_logger = logging.getLogger(__name__) + + +def validate(schema, data): + v = Validator(schema) + if not v.validate(data): + frame = inspect.currentframe().f_back + caller_name = frame.f_code.co_name + raise DocumentError(f'{caller_name}: failed to validate data: ' + v.errors) + + +def linux_boards_validator(data) -> None: + validate({ + 'type': 'dict', + 'valuesrules': { + 'type': 'dict', + 'schema': { + 'mdns': {'type': 'string', 'required': True}, + 'board': {'type': 'string', 'required': True}, + 'network': {'type': 'list', 'required': True, 'empty': False}, + 'ram': {'type': 'integer', 'required': True}, + 'ext_hdd': { + 'type': 'list', + 'schema': { + 'type': 'dict', + 'schema': { + 'mountpoint': {'type': 'string', 'required': True}, + 'size': {'type': 'integer', 'required': True} + } + }, + }, + 'services': {'type': 'list', 'empty': False}, + 'online': {'type': 'boolean', 'required': True} + } + } + }, data) diff --git a/src/home/config/config.py b/src/home/config/config.py index 4681685..b79fecb 100644 --- a/src/home/config/config.py +++ b/src/home/config/config.py @@ -7,39 +7,50 @@ from os.path import join, isdir, isfile from typing import Optional, Any, MutableMapping from argparse import ArgumentParser from ..util import parse_addr +import _validators as validators -def _get_config_path(name: str) -> str: - formats = ['toml', 'yaml'] +_validators = {} + + +def _get_validator(name: str) -> Optional[callable]: + if hasattr(validators, f'{name}_validator'): + return getattr(validators, f'{name}_validator') + if name in _validators: + return _validators[name] + return None + - dirname = join(os.environ['HOME'], '.config', name) +def add_validator(name: str, f: callable): + _validators[name] = f - if isdir(dirname): - for fmt in formats: - filename = join(dirname, f'config.{fmt}') - if isfile(filename): - return filename - raise IOError(f'config not found in {dirname}') +def _get_config_path(name: str) -> str: + formats = ['toml', 'yaml'] + + dirnames = [ + join(os.environ['HOME'], '.config', 'homekit'), + '/etc/homekit' + ] - else: - filenames = [join(os.environ['HOME'], '.config', f'{name}.{format}') for format in formats] - for file in filenames: - if isfile(file): - return file + for dirname in dirnames: + if isdir(dirname): + for fmt in formats: + filename = join(dirname, f'{name}.{fmt}') + if isfile(filename): + return filename - raise IOError(f'config not found') + raise IOError(f'config \'{name}\' not found') -class ConfigStore: +class SingleConfig: data: MutableMapping[str, Any] - app_name: Optional[str] def __init__(self): self.data = {} - self.app_name = None - def load(self, name: Optional[str] = None, + def load(self, + name: Optional[str] = None, use_cli=True, parser: ArgumentParser = None): self.app_name = name @@ -126,6 +137,16 @@ class ConfigStore: return self.data.items() +class Config: + app_name: Optional[str] + + def __init__(self): + + self.app_name = None + + + + config = ConfigStore() diff --git a/src/home/mqtt/_node.py b/src/home/mqtt/_node.py index ddf5ba2..4e259a4 100644 --- a/src/home/mqtt/_node.py +++ b/src/home/mqtt/_node.py @@ -14,13 +14,17 @@ class MqttNode: _modules: List[MqttModule] _module_subscriptions: dict[str, MqttModule] _node_id: str + _node_secret: str _payload_callbacks: list[callable] _wrapper: Optional[MqttWrapper] - def __init__(self, node_id: str): + def __init__(self, + node_id: str, + node_secret: Optional[str] = None): self._modules = [] self._module_subscriptions = {} self._node_id = node_id + self._node_secret = node_secret self._payload_callbacks = [] self._logger = logging.getLogger(self.__class__.__name__) self._wrapper = None @@ -42,7 +46,7 @@ class MqttNode: payload = self._module_subscriptions[topic].handle_payload(self, topic, payload) if isinstance(payload, MqttPayload): for f in self._payload_callbacks: - f(payload) + f(self, payload) def load_module(self, module_name: str, *args, **kwargs) -> MqttModule: module = importlib.import_module(f'..module.{module_name}', __name__) @@ -78,3 +82,11 @@ class MqttNode: @property def id(self) -> str: return self._node_id + + @property + def secret(self) -> str: + return self._node_secret + + @secret.setter + def secret(self, secret: str) -> None: + self._node_secret = secret diff --git a/src/home/mqtt/_wrapper.py b/src/home/mqtt/_wrapper.py index 41f9d89..0b32197 100644 --- a/src/home/mqtt/_wrapper.py +++ b/src/home/mqtt/_wrapper.py @@ -9,11 +9,15 @@ from ..util import strgen class MqttWrapper(Mqtt): _nodes: list[MqttNode] - def __init__(self, topic_prefix='hk', randomize_client_id=False): + def __init__(self, + topic_prefix='hk', + randomize_client_id=False, + clean_session=True): client_id = config['mqtt']['client_id'] if randomize_client_id: client_id += '_'+strgen(6) - super().__init__(clean_session=True, client_id=client_id) + super().__init__(clean_session=clean_session, + client_id=client_id) self._nodes = [] self._topic_prefix = topic_prefix diff --git a/src/home/mqtt/module/ota.py b/src/home/mqtt/module/ota.py index e71cccc..70c5475 100644 --- a/src/home/mqtt/module/ota.py +++ b/src/home/mqtt/module/ota.py @@ -41,7 +41,7 @@ class OtaPayload(MqttPayload): class MqttOtaModule(MqttModule): - _ota_request: Optional[tuple[str, str, int]] + _ota_request: Optional[tuple[str, int]] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -52,9 +52,9 @@ class MqttOtaModule(MqttModule): mqtt.subscribe_module("otares", self) if self._ota_request is not None: - secret, filename, qos = self._ota_request + filename, qos = self._ota_request self._ota_request = None - self.do_push_ota(secret, filename, qos) + self.do_push_ota(self._mqtt_node_ref.secret, filename, qos) def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]: if topic == 'otares': @@ -69,10 +69,9 @@ class MqttOtaModule(MqttModule): qos=qos) def push_ota(self, - secret: str, filename: str, qos: int): if not self._initialized: - self._ota_request = (secret, filename, qos) + self._ota_request = (filename, qos) else: - self.do_push_ota(secret, filename, qos) + self.do_push_ota(filename, qos) diff --git a/src/home/mqtt/module/relay.py b/src/home/mqtt/module/relay.py index ae88ddb..5383fb6 100644 --- a/src/home/mqtt/module/relay.py +++ b/src/home/mqtt/module/relay.py @@ -64,9 +64,9 @@ class MqttRelayModule(MqttModule): mqtt.subscribe_module('relay/status', self) def switchpower(self, - enable: bool, - secret: str): - payload = MqttPowerSwitchPayload(secret=secret, state=enable) + enable: bool): + payload = MqttPowerSwitchPayload(secret=self._mqtt_node_ref.secret, + state=enable) self._mqtt_node_ref.publish('relay/switch', payload=payload.pack()) def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]: diff --git a/src/home/mqtt/module/temphum.py b/src/home/mqtt/module/temphum.py index 83ae34d..0e22793 100644 --- a/src/home/mqtt/module/temphum.py +++ b/src/home/mqtt/module/temphum.py @@ -48,6 +48,7 @@ class MqttTemphumDataPayload(MqttPayload): class MqttTempHumModule(MqttModule): def __init__(self, sensor: Optional[BaseSensor] = None, + write_to_database=False, *args, **kwargs): if sensor is not None: kwargs['tick_interval'] = 10 diff --git a/src/inverter_mqtt_util.py b/src/inverter_mqtt_util.py index edea29a..fa3bdf5 100755 --- a/src/inverter_mqtt_util.py +++ b/src/inverter_mqtt_util.py @@ -11,7 +11,7 @@ if __name__ == '__main__': config.load('inverter_mqtt_util', parser=parser) arg = parser.parse_args() - mqtt = MqttWrapper() + mqtt = MqttWrapper(clean_session=arg.mode[0] != 'receiver') node = MqttNode(node_id='inverter') module_kwargs = {} if arg.mode[0] == 'sender': diff --git a/src/mqtt_node_util.py b/src/mqtt_node_util.py index 0af430a..70eae95 100755 --- a/src/mqtt_node_util.py +++ b/src/mqtt_node_util.py @@ -30,7 +30,7 @@ if __name__ == '__main__': raise ArgumentError(None, '--relay is only allowed when \'relay\' module included in --modules') mqtt = MqttWrapper(randomize_client_id=True) - mqtt_node = MqttNode(node_id=arg.node_id) + mqtt_node = MqttNode(node_id=arg.node_id, node_secret=arg.node_secret) mqtt.add_node(mqtt_node) @@ -44,9 +44,7 @@ if __name__ == '__main__': if m == 'relay' and arg.switch_relay is not None: if not arg.node_secret: raise ArgumentError(None, '--switch-relay requires --node-secret') - module_instance.switchpower(mqtt_node, - arg.switch_relay == 1, - arg.node_secret) + module_instance.switchpower(arg.switch_relay == 1) mqtt.configure_tls() try: @@ -58,7 +56,7 @@ if __name__ == '__main__': if not arg.node_secret: raise ArgumentError(None, 'pushing OTA requires --node-secret') - ota_module.push_ota(arg.node_secret, arg.push_ota, 1) + ota_module.push_ota(arg.push_ota, 1) while True: sleep(0.1) diff --git a/src/pump_bot.py b/src/pump_bot.py index ab73097..1d56044 100755 --- a/src/pump_bot.py +++ b/src/pump_bot.py @@ -208,7 +208,7 @@ def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]: return ReplyKeyboardMarkup(buttons, one_time_keyboard=False) -def mqtt_payload_callback(payload: MqttPayload): +def mqtt_payload_callback(mqtt_node: MqttNode, payload: MqttPayload): global watering_mcu_status types_the_node_can_send = ( diff --git a/src/relay_mqtt_bot.py b/src/relay_mqtt_bot.py index de5671c..8fcf423 100755 --- a/src/relay_mqtt_bot.py +++ b/src/relay_mqtt_bot.py @@ -6,10 +6,9 @@ from functools import partial from home.config import config from home.telegram import bot -from home.mqtt import MqttRelay, MqttRelayState -from home.mqtt.esp import MqttEspDevice -from home.mqtt.payload import MqttPayload -from home.mqtt.payload.relay import InitialDiagnosticsPayload, DiagnosticsPayload +from home.mqtt import MqttPayload, MqttNode, MqttWrapper +from home.mqtt.module.relay import MqttRelayModule, MqttRelayState +from home.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload config.load('relay_mqtt_bot') @@ -36,7 +35,9 @@ status_emoji = { } -mqtt_relay: Optional[MqttRelay] = None +# mqtt_relay: Optional[MqttRelayModule] = None +mqtt: Optional[MqttWrapper] = None +relay_nodes: dict[str, MqttRelayModule] = {} relay_states: dict[str, MqttRelayState] = {} @@ -45,23 +46,24 @@ class UserAction(Enum): OFF = 'off' -def on_mqtt_message(home_id, message: MqttPayload): +def on_mqtt_message(node: MqttNode, + message: MqttPayload): if isinstance(message, InitialDiagnosticsPayload) or isinstance(message, DiagnosticsPayload): kwargs = dict(rssi=message.rssi, enabled=message.flags.state) if isinstance(message, InitialDiagnosticsPayload): kwargs['fw_version'] = message.fw_version - if home_id not in relay_states: - relay_states[home_id] = MqttRelayState() - relay_states[home_id].update(**kwargs) + if node.id not in relay_states: + relay_states[node.id] = MqttRelayState() + relay_states[node.id].update(**kwargs) -def enable_handler(home_id: str, ctx: bot.Context) -> None: - mqtt_relay.set_power(home_id, True) +def enable_handler(node_id: str, ctx: bot.Context) -> None: + relay_nodes[node_id].switchpower(True) ctx.reply(ctx.lang('done')) -def disable_handler(home_id: str, ctx: bot.Context) -> None: - mqtt_relay.set_power(home_id, False) +def disable_handler(node_id: str, ctx: bot.Context) -> None: + relay_nodes[node_id].switchpower(False) ctx.reply(ctx.lang('done')) @@ -88,9 +90,13 @@ def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]: if __name__ == '__main__': devices = [] + mqtt = MqttWrapper() for device_id, data in config['relays'].items(): - devices.append(MqttEspDevice(id=device_id, - secret=data['secret'])) + mqtt_node = MqttNode(node_id=device_id, node_secret=data['secret']) + relay_nodes[device_id] = mqtt_node.load_module('relay') + mqtt_node.add_payload_callback(on_mqtt_message) + mqtt.add_node(mqtt_node) + labels = data['labels'] bot.lang.ru(**{device_id: labels['ru']}) bot.lang.en(**{device_id: labels['en']}) @@ -103,12 +109,9 @@ if __name__ == '__main__': messages.append(f'{type_emoji}{status_emoji[action.value]} {labels[_lang]}') bot.handler(texts=messages)(partial(enable_handler if action == UserAction.ON else disable_handler, device_id)) - mqtt_relay = MqttRelay(devices=devices) - mqtt_relay.set_message_callback(on_mqtt_message) - mqtt_relay.configure_tls() - mqtt_relay.connect_and_loop(loop_forever=False) + mqtt.configure_tls() + mqtt.connect_and_loop(loop_forever=False) - # bot.enable_logging(BotType.RELAY_MQTT) bot.run(start_handler=start) - mqtt_relay.disconnect() + mqtt.disconnect() diff --git a/src/relay_mqtt_http_proxy.py b/src/relay_mqtt_http_proxy.py index 60d2963..05cc88d 100755 --- a/src/relay_mqtt_http_proxy.py +++ b/src/relay_mqtt_http_proxy.py @@ -1,16 +1,19 @@ #!/usr/bin/env python3 from home import http from home.config import config -from home.mqtt import MqttRelay, MqttRelayState -from home.mqtt import MqttPayload +from home.mqtt import MqttPayload, MqttWrapper, MqttNode +from home.mqtt.module.relay import MqttRelayState, MqttRelayModule from home.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload from typing import Optional -mqtt_relay: Optional[MqttRelay] = None +mqtt: Optional[MqttWrapper] = None +mqtt_nodes: dict[str, MqttNode] = {} +relay_modules: dict[str, MqttRelayModule] = {} relay_states: dict[str, MqttRelayState] = {} -def on_mqtt_message(device_id, message: MqttPayload): +def on_mqtt_message(node: MqttNode, + message: MqttPayload): if isinstance(message, InitialDiagnosticsPayload) or isinstance(message, DiagnosticsPayload): kwargs = dict(rssi=message.rssi, enabled=message.flags.state) if device_id not in relay_states: @@ -28,17 +31,22 @@ class RelayMqttHttpProxy(http.HTTPServer): async def _relay_on_off(self, enable: Optional[bool], req: http.Request): - device_id = req.match_info['id'] - device_secret = req.query['secret'] + node_id = req.match_info['id'] + node_secret = req.query['secret'] + + node = mqtt_nodes[node_id] + relay_module = relay_modules[node_id] if enable is None: - if device_id in relay_states and relay_states[device_id].ever_updated: - cur_state = relay_states[device_id].enabled + if node_id in relay_states and relay_states[node_id].ever_updated: + cur_state = relay_states[node_id].enabled else: cur_state = False enable = not cur_state - mqtt_relay.set_power(device_id, enable, device_secret) + if not node.secret: + node.secret = node_secret + relay_module.switchpower(enable) return self.ok() async def relay_on(self, req: http.Request): @@ -54,13 +62,20 @@ class RelayMqttHttpProxy(http.HTTPServer): if __name__ == '__main__': config.load('relay_mqtt_http_proxy') - mqtt_relay = MqttRelay(devices=[MqttEspDevice(id=device_id) for device_id in config.get('relay.devices')]) - mqtt_relay.configure_tls() - mqtt_relay.set_message_callback(on_mqtt_message) - mqtt_relay.connect_and_loop(loop_forever=False) + mqtt = MqttWrapper() + for device_id, data in config['relays'].items(): + mqtt_node = MqttNode(node_id=device_id) + relay_modules[device_id] = mqtt_node.load_module('relay') + mqtt_nodes[device_id] = mqtt_node + mqtt_node.add_payload_callback(on_mqtt_message) + mqtt.add_node(mqtt_node) + mqtt_node.add_payload_callback(on_mqtt_message) + + mqtt.configure_tls() + mqtt.connect_and_loop(loop_forever=False) proxy = RelayMqttHttpProxy(config.get_addr('server.listen')) try: proxy.run() except KeyboardInterrupt: - mqtt_relay.disconnect() + mqtt.disconnect() diff --git a/src/sensors_mqtt_sender.py b/src/sensors_mqtt_sender.py deleted file mode 100755 index 393962a..0000000 --- a/src/sensors_mqtt_sender.py +++ /dev/null @@ -1,58 +0,0 @@ -#!/usr/bin/env python3 -import time -import json - -from home.util import parse_addr, MySimpleSocketClient -from home.mqtt import Mqtt, poll_tick -from home.mqtt.payload.sensors import Temperature -from home.config import config - - -class MqttClient(Mqtt): - def __init__(self): - super().__init__(self) - self._home_id = config['mqtt']['home_id'] - - def poll(self): - freq = int(config['mqtt']['sensors']['poll_freq']) - self._logger.debug(f'freq={freq}') - - g = poll_tick(freq) - while True: - time.sleep(next(g)) - for k, v in config['mqtt']['sensors']['si7021'].items(): - host, port = parse_addr(v['addr']) - self.publish_si7021(host, port, k) - - def publish_si7021(self, host: str, port: int, name: str): - self._logger.debug(f"publish_si7021/{name}: {host}:{port}") - - try: - now = time.time() - socket = MySimpleSocketClient(host, port) - - socket.write('read') - response = json.loads(socket.read().strip()) - - temp = response['temp'] - humidity = response['humidity'] - - self._logger.debug(f'publish_si7021/{name}: temp={temp} humidity={humidity}') - - pld = Temperature(time=round(now), - temp=temp, - rh=humidity) - self._client.publish(f'hk/{self._home_id}/si7021/{name}', - payload=pld.pack(), - qos=1) - except Exception as e: - self._logger.exception(e) - - -if __name__ == '__main__': - config.load('sensors_mqtt_sender') - - client = MqttClient() - client.configure_tls() - client.connect_and_loop(loop_forever=False) - client.poll() diff --git a/src/sensors_mqtt_receiver.py b/src/temphum_mqtt_receiver.py index f7cb467..970d92e 100755 --- a/src/sensors_mqtt_receiver.py +++ b/src/temphum_mqtt_receiver.py @@ -2,18 +2,8 @@ import paho.mqtt.client as mqtt import re -from home.mqtt import Mqtt from home.config import config -from home.mqtt.payload.sensors import Temperature -from home.api.types import TemperatureSensorLocation -from home.database import SensorsDatabase - - -def get_sensor_type(sensor: str) -> TemperatureSensorLocation: - for item in TemperatureSensorLocation: - if sensor == item.name.lower(): - return item - raise ValueError(f'unexpected sensor value: {sensor}') +from home.mqtt import MqttWrapper, MqttNode class MqttServer(Mqtt): @@ -47,7 +37,12 @@ class MqttServer(Mqtt): if __name__ == '__main__': - config.load('sensors_mqtt_receiver') + config.load('temphum_mqtt_receiver') + + mqtt = MqttWrapper(clean_session=False) + node = MqttNode(node_id='+') + node.load_module('temphum', write_to_database=True) + mqtt.add_node(node) - server = MqttServer() - server.connect_and_loop() + mqtt.configure_tls() + mqtt.connect_and_loop()
\ No newline at end of file diff --git a/systemd/sensors_mqtt_receiver.service b/systemd/sensors_mqtt_receiver.service index e67c112..5b9ff6a 100644 --- a/systemd/sensors_mqtt_receiver.service +++ b/systemd/sensors_mqtt_receiver.service @@ -1,12 +1,12 @@ [Unit] -Description=sensors mqtt receiver +Description=temphum mqtt receiver After=network.target [Service] User=user Group=user Restart=on-failure -ExecStart=python3 /home/user/home/src/sensors_mqtt_receiver.py +ExecStart=python3 /home/user/home/src/temphum_mqtt_receiver.py WorkingDirectory=/home/user [Install] diff --git a/systemd/sensors_mqtt_sender.service b/systemd/sensors_mqtt_sender.service deleted file mode 100644 index a271d72..0000000 --- a/systemd/sensors_mqtt_sender.service +++ /dev/null @@ -1,13 +0,0 @@ -[Unit] -Description=Sensors MQTT sender -After=temphumd.service - -[Service] -User=user -Group=user -Restart=on-failure -ExecStart=/home/user/homekit/src/sensors_mqtt_sender.py -WorkingDirectory=/home/user - -[Install] -WantedBy=multi-user.target
\ No newline at end of file |