summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2024-02-24 02:00:40 +0300
committerEvgeny Zinoviev <me@ch1p.io>2024-02-24 02:00:40 +0300
commitc9b351a08e31aa0c34892065f02f9ef710b6cd34 (patch)
tree509b7886b76c8f432ad9809945d59b0c5e3aca77
parent2a5c34b28d7842ee2de2937faa36f78a1f5364fd (diff)
mqtt changes
-rwxr-xr-xbin/mqtt_node_util.py7
-rwxr-xr-xbin/temphum_mqtt_node.py114
-rw-r--r--include/py/homekit/mqtt/_wrapper.py1
-rw-r--r--include/py/homekit/mqtt/module/ota.py6
-rw-r--r--include/py/homekit/temphum/base.py2
5 files changed, 74 insertions, 56 deletions
diff --git a/bin/mqtt_node_util.py b/bin/mqtt_node_util.py
index a685f26..657fcf9 100755
--- a/bin/mqtt_node_util.py
+++ b/bin/mqtt_node_util.py
@@ -54,6 +54,8 @@ if __name__ == '__main__':
help='send relay state')
parser.add_argument('--push-ota', type=str, metavar='OTA_FILENAME',
help='push OTA, receives path to firmware.bin (not .elf!)')
+ parser.add_argument('--custom-ota-topic', type=str,
+ help='only needed for update very old devices')
parser.add_argument('--no-wait', action='store_true',
help='execute command and exit')
@@ -86,7 +88,10 @@ if __name__ == '__main__':
mqtt.add_node(mqtt_node)
# must-have modules
- ota_module = mqtt_node.load_module('ota')
+ ota_kwargs = {}
+ if arg.custom_ota_topic:
+ ota_kwargs['custom_ota_topic'] = arg.custom_ota_topic
+ ota_module = mqtt_node.load_module('ota', **ota_kwargs)
ota_val = arg.push_ota
mqtt_node.load_module('diagnostics')
diff --git a/bin/temphum_mqtt_node.py b/bin/temphum_mqtt_node.py
index 9ea436d..4259f65 100755
--- a/bin/temphum_mqtt_node.py
+++ b/bin/temphum_mqtt_node.py
@@ -1,79 +1,91 @@
#!/usr/bin/env python3
+import __py_include
import asyncio
-import json
import logging
-import __py_include
+from datetime import datetime
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
from typing import Optional
-
+from argparse import ArgumentParser
from homekit.config import config
+from homekit.mqtt import MqttNodesConfig, MqttNode, MqttWrapper
+from homekit.mqtt.module.temphum import MqttTempHumModule, MqttTemphumDataPayload
from homekit.temphum import SensorType, BaseSensor
from homekit.temphum.i2c import create_sensor
-logger = logging.getLogger(__name__)
-sensor: Optional[BaseSensor] = None
-lock = asyncio.Lock()
-delay = 0.01
+_logger = logging.getLogger(__name__)
+_sensor: Optional[BaseSensor] = None
+_lock = asyncio.Lock()
+_mqtt: MqttWrapper
+_mqtt_ndoe: MqttNode
+_mqtt_temphum: MqttTempHumModule
+_stopped = True
+_scheduler = AsyncIOScheduler()
+_sched_task_added = False
async def get_measurements():
- async with lock:
- await asyncio.sleep(delay)
-
- temp = sensor.temperature()
- rh = sensor.humidity()
+ async with _lock:
+ temp = _sensor.temperature()
+ rh = _sensor.humidity()
return rh, temp
-async def handle_client(reader, writer):
- request = None
- while request != 'quit':
- try:
- request = await reader.read(255)
- if request == b'\x04':
- break
- request = request.decode('utf-8').strip()
- except Exception:
- break
+def on_mqtt_connect():
+ global _stopped, _sched_task_added
+ _stopped = False
+
+ if not _sched_task_added:
+ _scheduler.add_job(on_sched_task, 'interval', seconds=60, next_run_time=datetime.now())
+ _scheduler.start()
+ _sched_task_added = True
- if request == 'read':
- try:
- rh, temp = await asyncio.wait_for(get_measurements(), timeout=3)
- data = dict(humidity=rh, temp=temp)
- except asyncio.TimeoutError as e:
- logger.exception(e)
- data = dict(error='i2c call timed out')
- else:
- data = dict(error='invalid request')
- writer.write((json.dumps(data) + '\r\n').encode('utf-8'))
- try:
- await writer.drain()
- except ConnectionResetError:
- pass
+def on_mqtt_disconnect():
+ global _stopped
+ _stopped = True
- writer.close()
+async def on_sched_task():
+ if _stopped:
+ return
-async def run_server(host, port):
- server = await asyncio.start_server(handle_client, host, port)
- async with server:
- logger.info('Server started.')
- await server.serve_forever()
+ rh, temp = await get_measurements()
+ payload = MqttTemphumDataPayload(temp=temp, rh=rh)
+ _mqtt_node.publish('data', payload.pack())
if __name__ == '__main__':
- config.load_app()
+ parser = ArgumentParser()
+ parser.add_argument('--node-id',
+ type=str,
+ required=True,
+ choices=MqttNodesConfig().get_nodes(only_names=True),
+ help='node id must be defined in the config')
+ args = config.load_app(parser=parser)
- if 'measure_delay' in config['sensor']:
- delay = float(config['sensor']['measure_delay'])
+ node_cfg = MqttNodesConfig()[args.node_id]
+ _sensor = create_sensor(SensorType(node_cfg['temphum']['module']),
+ int(node_cfg['temphum']['i2c_bus']))
- sensor = create_sensor(SensorType(config['sensor']['type']),
- int(config['sensor']['bus']))
+ _mqtt = MqttWrapper(client_id=args.node_id)
+ _mqtt.add_connect_callback(on_mqtt_connect)
+ _mqtt.add_disconnect_callback(on_mqtt_disconnect)
+
+ _mqtt_node = MqttNode(node_id=args.node_id,
+ node_secret=MqttNodesConfig.get_node(args.node_id)['password'])
+ _mqtt.add_node(_mqtt_node)
+
+ _mqtt_temphum = _mqtt_node.load_module('temphum')
try:
- host, port = config.get_addr('server.listen')
- asyncio.run(run_server(host, port))
- except KeyboardInterrupt:
- logging.info('Exiting...')
+ _mqtt.connect_and_loop(loop_forever=True)
+
+ except (KeyboardInterrupt, SystemExit):
+ if _scheduler:
+ _scheduler.shutdown()
+ _logger.info('Exiting...')
+
+ finally:
+ _mqtt.disconnect() \ No newline at end of file
diff --git a/include/py/homekit/mqtt/_wrapper.py b/include/py/homekit/mqtt/_wrapper.py
index 5fc33fe..68af093 100644
--- a/include/py/homekit/mqtt/_wrapper.py
+++ b/include/py/homekit/mqtt/_wrapper.py
@@ -44,7 +44,6 @@ class MqttWrapper(Mqtt):
except Exception as e:
self._logger.exception(e)
-
def on_message(self, client: mqtt.Client, userdata, msg):
try:
topic = msg.topic
diff --git a/include/py/homekit/mqtt/module/ota.py b/include/py/homekit/mqtt/module/ota.py
index 2f9b216..16931e9 100644
--- a/include/py/homekit/mqtt/module/ota.py
+++ b/include/py/homekit/mqtt/module/ota.py
@@ -42,10 +42,12 @@ class OtaPayload(MqttPayload):
class MqttOtaModule(MqttModule):
_ota_request: Optional[tuple[str, int]]
+ _custom_ota_topic: Optional[str]
- def __init__(self, *args, **kwargs):
+ def __init__(self, custom_ota_topic=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self._ota_request = None
+ self._custom_ota_topic = custom_ota_topic
def on_connect(self, mqtt: MqttNode):
super().on_connect(mqtt)
@@ -64,7 +66,7 @@ class MqttOtaModule(MqttModule):
def do_push_ota(self, secret: str, filename: str, qos: int):
payload = OtaPayload(secret=secret, filename=filename)
- self._mqtt_node_ref.publish('ota',
+ self._mqtt_node_ref.publish('ota' if not self._custom_ota_topic else self._custom_ota_topic,
payload=payload.pack(),
qos=qos)
diff --git a/include/py/homekit/temphum/base.py b/include/py/homekit/temphum/base.py
index 602cab7..c9d5111 100644
--- a/include/py/homekit/temphum/base.py
+++ b/include/py/homekit/temphum/base.py
@@ -16,4 +16,4 @@ class BaseSensor(ABC):
class SensorType(Enum):
Si7021 = 'si7021'
- DHT12 = 'dht12' \ No newline at end of file
+ DHT12 = 'dht12'