summaryrefslogtreecommitdiff
path: root/bin
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2024-02-24 02:00:40 +0300
committerEvgeny Zinoviev <me@ch1p.io>2024-02-24 02:00:40 +0300
commitc9b351a08e31aa0c34892065f02f9ef710b6cd34 (patch)
tree509b7886b76c8f432ad9809945d59b0c5e3aca77 /bin
parent2a5c34b28d7842ee2de2937faa36f78a1f5364fd (diff)
mqtt changes
Diffstat (limited to 'bin')
-rwxr-xr-xbin/mqtt_node_util.py7
-rwxr-xr-xbin/temphum_mqtt_node.py114
2 files changed, 69 insertions, 52 deletions
diff --git a/bin/mqtt_node_util.py b/bin/mqtt_node_util.py
index a685f26..657fcf9 100755
--- a/bin/mqtt_node_util.py
+++ b/bin/mqtt_node_util.py
@@ -54,6 +54,8 @@ if __name__ == '__main__':
help='send relay state')
parser.add_argument('--push-ota', type=str, metavar='OTA_FILENAME',
help='push OTA, receives path to firmware.bin (not .elf!)')
+ parser.add_argument('--custom-ota-topic', type=str,
+ help='only needed for update very old devices')
parser.add_argument('--no-wait', action='store_true',
help='execute command and exit')
@@ -86,7 +88,10 @@ if __name__ == '__main__':
mqtt.add_node(mqtt_node)
# must-have modules
- ota_module = mqtt_node.load_module('ota')
+ ota_kwargs = {}
+ if arg.custom_ota_topic:
+ ota_kwargs['custom_ota_topic'] = arg.custom_ota_topic
+ ota_module = mqtt_node.load_module('ota', **ota_kwargs)
ota_val = arg.push_ota
mqtt_node.load_module('diagnostics')
diff --git a/bin/temphum_mqtt_node.py b/bin/temphum_mqtt_node.py
index 9ea436d..4259f65 100755
--- a/bin/temphum_mqtt_node.py
+++ b/bin/temphum_mqtt_node.py
@@ -1,79 +1,91 @@
#!/usr/bin/env python3
+import __py_include
import asyncio
-import json
import logging
-import __py_include
+from datetime import datetime
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
from typing import Optional
-
+from argparse import ArgumentParser
from homekit.config import config
+from homekit.mqtt import MqttNodesConfig, MqttNode, MqttWrapper
+from homekit.mqtt.module.temphum import MqttTempHumModule, MqttTemphumDataPayload
from homekit.temphum import SensorType, BaseSensor
from homekit.temphum.i2c import create_sensor
-logger = logging.getLogger(__name__)
-sensor: Optional[BaseSensor] = None
-lock = asyncio.Lock()
-delay = 0.01
+_logger = logging.getLogger(__name__)
+_sensor: Optional[BaseSensor] = None
+_lock = asyncio.Lock()
+_mqtt: MqttWrapper
+_mqtt_ndoe: MqttNode
+_mqtt_temphum: MqttTempHumModule
+_stopped = True
+_scheduler = AsyncIOScheduler()
+_sched_task_added = False
async def get_measurements():
- async with lock:
- await asyncio.sleep(delay)
-
- temp = sensor.temperature()
- rh = sensor.humidity()
+ async with _lock:
+ temp = _sensor.temperature()
+ rh = _sensor.humidity()
return rh, temp
-async def handle_client(reader, writer):
- request = None
- while request != 'quit':
- try:
- request = await reader.read(255)
- if request == b'\x04':
- break
- request = request.decode('utf-8').strip()
- except Exception:
- break
+def on_mqtt_connect():
+ global _stopped, _sched_task_added
+ _stopped = False
+
+ if not _sched_task_added:
+ _scheduler.add_job(on_sched_task, 'interval', seconds=60, next_run_time=datetime.now())
+ _scheduler.start()
+ _sched_task_added = True
- if request == 'read':
- try:
- rh, temp = await asyncio.wait_for(get_measurements(), timeout=3)
- data = dict(humidity=rh, temp=temp)
- except asyncio.TimeoutError as e:
- logger.exception(e)
- data = dict(error='i2c call timed out')
- else:
- data = dict(error='invalid request')
- writer.write((json.dumps(data) + '\r\n').encode('utf-8'))
- try:
- await writer.drain()
- except ConnectionResetError:
- pass
+def on_mqtt_disconnect():
+ global _stopped
+ _stopped = True
- writer.close()
+async def on_sched_task():
+ if _stopped:
+ return
-async def run_server(host, port):
- server = await asyncio.start_server(handle_client, host, port)
- async with server:
- logger.info('Server started.')
- await server.serve_forever()
+ rh, temp = await get_measurements()
+ payload = MqttTemphumDataPayload(temp=temp, rh=rh)
+ _mqtt_node.publish('data', payload.pack())
if __name__ == '__main__':
- config.load_app()
+ parser = ArgumentParser()
+ parser.add_argument('--node-id',
+ type=str,
+ required=True,
+ choices=MqttNodesConfig().get_nodes(only_names=True),
+ help='node id must be defined in the config')
+ args = config.load_app(parser=parser)
- if 'measure_delay' in config['sensor']:
- delay = float(config['sensor']['measure_delay'])
+ node_cfg = MqttNodesConfig()[args.node_id]
+ _sensor = create_sensor(SensorType(node_cfg['temphum']['module']),
+ int(node_cfg['temphum']['i2c_bus']))
- sensor = create_sensor(SensorType(config['sensor']['type']),
- int(config['sensor']['bus']))
+ _mqtt = MqttWrapper(client_id=args.node_id)
+ _mqtt.add_connect_callback(on_mqtt_connect)
+ _mqtt.add_disconnect_callback(on_mqtt_disconnect)
+
+ _mqtt_node = MqttNode(node_id=args.node_id,
+ node_secret=MqttNodesConfig.get_node(args.node_id)['password'])
+ _mqtt.add_node(_mqtt_node)
+
+ _mqtt_temphum = _mqtt_node.load_module('temphum')
try:
- host, port = config.get_addr('server.listen')
- asyncio.run(run_server(host, port))
- except KeyboardInterrupt:
- logging.info('Exiting...')
+ _mqtt.connect_and_loop(loop_forever=True)
+
+ except (KeyboardInterrupt, SystemExit):
+ if _scheduler:
+ _scheduler.shutdown()
+ _logger.info('Exiting...')
+
+ finally:
+ _mqtt.disconnect() \ No newline at end of file