From c9b351a08e31aa0c34892065f02f9ef710b6cd34 Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Sat, 24 Feb 2024 02:00:40 +0300 Subject: mqtt changes --- bin/mqtt_node_util.py | 7 ++- bin/temphum_mqtt_node.py | 114 +++++++++++++++++++--------------- include/py/homekit/mqtt/_wrapper.py | 1 - include/py/homekit/mqtt/module/ota.py | 6 +- include/py/homekit/temphum/base.py | 2 +- 5 files changed, 74 insertions(+), 56 deletions(-) diff --git a/bin/mqtt_node_util.py b/bin/mqtt_node_util.py index a685f26..657fcf9 100755 --- a/bin/mqtt_node_util.py +++ b/bin/mqtt_node_util.py @@ -54,6 +54,8 @@ if __name__ == '__main__': help='send relay state') parser.add_argument('--push-ota', type=str, metavar='OTA_FILENAME', help='push OTA, receives path to firmware.bin (not .elf!)') + parser.add_argument('--custom-ota-topic', type=str, + help='only needed for update very old devices') parser.add_argument('--no-wait', action='store_true', help='execute command and exit') @@ -86,7 +88,10 @@ if __name__ == '__main__': mqtt.add_node(mqtt_node) # must-have modules - ota_module = mqtt_node.load_module('ota') + ota_kwargs = {} + if arg.custom_ota_topic: + ota_kwargs['custom_ota_topic'] = arg.custom_ota_topic + ota_module = mqtt_node.load_module('ota', **ota_kwargs) ota_val = arg.push_ota mqtt_node.load_module('diagnostics') diff --git a/bin/temphum_mqtt_node.py b/bin/temphum_mqtt_node.py index 9ea436d..4259f65 100755 --- a/bin/temphum_mqtt_node.py +++ b/bin/temphum_mqtt_node.py @@ -1,79 +1,91 @@ #!/usr/bin/env python3 +import __py_include import asyncio -import json import logging -import __py_include +from datetime import datetime +from apscheduler.schedulers.asyncio import AsyncIOScheduler from typing import Optional - +from argparse import ArgumentParser from homekit.config import config +from homekit.mqtt import MqttNodesConfig, MqttNode, MqttWrapper +from homekit.mqtt.module.temphum import MqttTempHumModule, MqttTemphumDataPayload from homekit.temphum import SensorType, BaseSensor from homekit.temphum.i2c import create_sensor -logger = logging.getLogger(__name__) -sensor: Optional[BaseSensor] = None -lock = asyncio.Lock() -delay = 0.01 +_logger = logging.getLogger(__name__) +_sensor: Optional[BaseSensor] = None +_lock = asyncio.Lock() +_mqtt: MqttWrapper +_mqtt_ndoe: MqttNode +_mqtt_temphum: MqttTempHumModule +_stopped = True +_scheduler = AsyncIOScheduler() +_sched_task_added = False async def get_measurements(): - async with lock: - await asyncio.sleep(delay) - - temp = sensor.temperature() - rh = sensor.humidity() + async with _lock: + temp = _sensor.temperature() + rh = _sensor.humidity() return rh, temp -async def handle_client(reader, writer): - request = None - while request != 'quit': - try: - request = await reader.read(255) - if request == b'\x04': - break - request = request.decode('utf-8').strip() - except Exception: - break +def on_mqtt_connect(): + global _stopped, _sched_task_added + _stopped = False + + if not _sched_task_added: + _scheduler.add_job(on_sched_task, 'interval', seconds=60, next_run_time=datetime.now()) + _scheduler.start() + _sched_task_added = True - if request == 'read': - try: - rh, temp = await asyncio.wait_for(get_measurements(), timeout=3) - data = dict(humidity=rh, temp=temp) - except asyncio.TimeoutError as e: - logger.exception(e) - data = dict(error='i2c call timed out') - else: - data = dict(error='invalid request') - writer.write((json.dumps(data) + '\r\n').encode('utf-8')) - try: - await writer.drain() - except ConnectionResetError: - pass +def on_mqtt_disconnect(): + global _stopped + _stopped = True - writer.close() +async def on_sched_task(): + if _stopped: + return -async def run_server(host, port): - server = await asyncio.start_server(handle_client, host, port) - async with server: - logger.info('Server started.') - await server.serve_forever() + rh, temp = await get_measurements() + payload = MqttTemphumDataPayload(temp=temp, rh=rh) + _mqtt_node.publish('data', payload.pack()) if __name__ == '__main__': - config.load_app() + parser = ArgumentParser() + parser.add_argument('--node-id', + type=str, + required=True, + choices=MqttNodesConfig().get_nodes(only_names=True), + help='node id must be defined in the config') + args = config.load_app(parser=parser) - if 'measure_delay' in config['sensor']: - delay = float(config['sensor']['measure_delay']) + node_cfg = MqttNodesConfig()[args.node_id] + _sensor = create_sensor(SensorType(node_cfg['temphum']['module']), + int(node_cfg['temphum']['i2c_bus'])) - sensor = create_sensor(SensorType(config['sensor']['type']), - int(config['sensor']['bus'])) + _mqtt = MqttWrapper(client_id=args.node_id) + _mqtt.add_connect_callback(on_mqtt_connect) + _mqtt.add_disconnect_callback(on_mqtt_disconnect) + + _mqtt_node = MqttNode(node_id=args.node_id, + node_secret=MqttNodesConfig.get_node(args.node_id)['password']) + _mqtt.add_node(_mqtt_node) + + _mqtt_temphum = _mqtt_node.load_module('temphum') try: - host, port = config.get_addr('server.listen') - asyncio.run(run_server(host, port)) - except KeyboardInterrupt: - logging.info('Exiting...') + _mqtt.connect_and_loop(loop_forever=True) + + except (KeyboardInterrupt, SystemExit): + if _scheduler: + _scheduler.shutdown() + _logger.info('Exiting...') + + finally: + _mqtt.disconnect() \ No newline at end of file diff --git a/include/py/homekit/mqtt/_wrapper.py b/include/py/homekit/mqtt/_wrapper.py index 5fc33fe..68af093 100644 --- a/include/py/homekit/mqtt/_wrapper.py +++ b/include/py/homekit/mqtt/_wrapper.py @@ -44,7 +44,6 @@ class MqttWrapper(Mqtt): except Exception as e: self._logger.exception(e) - def on_message(self, client: mqtt.Client, userdata, msg): try: topic = msg.topic diff --git a/include/py/homekit/mqtt/module/ota.py b/include/py/homekit/mqtt/module/ota.py index 2f9b216..16931e9 100644 --- a/include/py/homekit/mqtt/module/ota.py +++ b/include/py/homekit/mqtt/module/ota.py @@ -42,10 +42,12 @@ class OtaPayload(MqttPayload): class MqttOtaModule(MqttModule): _ota_request: Optional[tuple[str, int]] + _custom_ota_topic: Optional[str] - def __init__(self, *args, **kwargs): + def __init__(self, custom_ota_topic=None, *args, **kwargs): super().__init__(*args, **kwargs) self._ota_request = None + self._custom_ota_topic = custom_ota_topic def on_connect(self, mqtt: MqttNode): super().on_connect(mqtt) @@ -64,7 +66,7 @@ class MqttOtaModule(MqttModule): def do_push_ota(self, secret: str, filename: str, qos: int): payload = OtaPayload(secret=secret, filename=filename) - self._mqtt_node_ref.publish('ota', + self._mqtt_node_ref.publish('ota' if not self._custom_ota_topic else self._custom_ota_topic, payload=payload.pack(), qos=qos) diff --git a/include/py/homekit/temphum/base.py b/include/py/homekit/temphum/base.py index 602cab7..c9d5111 100644 --- a/include/py/homekit/temphum/base.py +++ b/include/py/homekit/temphum/base.py @@ -16,4 +16,4 @@ class BaseSensor(ABC): class SensorType(Enum): Si7021 = 'si7021' - DHT12 = 'dht12' \ No newline at end of file + DHT12 = 'dht12' -- cgit v1.2.3