summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/home/config/_validators.py46
-rw-r--r--src/home/config/config.py59
-rw-r--r--src/home/mqtt/_node.py16
-rw-r--r--src/home/mqtt/_wrapper.py8
-rw-r--r--src/home/mqtt/module/ota.py11
-rw-r--r--src/home/mqtt/module/relay.py6
-rw-r--r--src/home/mqtt/module/temphum.py1
-rwxr-xr-xsrc/inverter_mqtt_util.py2
-rwxr-xr-xsrc/mqtt_node_util.py8
-rwxr-xr-xsrc/pump_bot.py2
-rwxr-xr-xsrc/relay_mqtt_bot.py45
-rwxr-xr-xsrc/relay_mqtt_http_proxy.py43
-rwxr-xr-xsrc/sensors_mqtt_sender.py58
-rwxr-xr-xsrc/temphum_mqtt_receiver.py (renamed from src/sensors_mqtt_receiver.py)23
14 files changed, 182 insertions, 146 deletions
diff --git a/src/home/config/_validators.py b/src/home/config/_validators.py
new file mode 100644
index 0000000..963a25f
--- /dev/null
+++ b/src/home/config/_validators.py
@@ -0,0 +1,46 @@
+import logging
+import inspect
+
+from cerberus import Validator, DocumentError
+
+
+__all__ = [
+ 'linux_boards_validator'
+]
+
+_logger = logging.getLogger(__name__)
+
+
+def validate(schema, data):
+ v = Validator(schema)
+ if not v.validate(data):
+ frame = inspect.currentframe().f_back
+ caller_name = frame.f_code.co_name
+ raise DocumentError(f'{caller_name}: failed to validate data: ' + v.errors)
+
+
+def linux_boards_validator(data) -> None:
+ validate({
+ 'type': 'dict',
+ 'valuesrules': {
+ 'type': 'dict',
+ 'schema': {
+ 'mdns': {'type': 'string', 'required': True},
+ 'board': {'type': 'string', 'required': True},
+ 'network': {'type': 'list', 'required': True, 'empty': False},
+ 'ram': {'type': 'integer', 'required': True},
+ 'ext_hdd': {
+ 'type': 'list',
+ 'schema': {
+ 'type': 'dict',
+ 'schema': {
+ 'mountpoint': {'type': 'string', 'required': True},
+ 'size': {'type': 'integer', 'required': True}
+ }
+ },
+ },
+ 'services': {'type': 'list', 'empty': False},
+ 'online': {'type': 'boolean', 'required': True}
+ }
+ }
+ }, data)
diff --git a/src/home/config/config.py b/src/home/config/config.py
index 4681685..b79fecb 100644
--- a/src/home/config/config.py
+++ b/src/home/config/config.py
@@ -7,39 +7,50 @@ from os.path import join, isdir, isfile
from typing import Optional, Any, MutableMapping
from argparse import ArgumentParser
from ..util import parse_addr
+import _validators as validators
-def _get_config_path(name: str) -> str:
- formats = ['toml', 'yaml']
+_validators = {}
+
+
+def _get_validator(name: str) -> Optional[callable]:
+ if hasattr(validators, f'{name}_validator'):
+ return getattr(validators, f'{name}_validator')
+ if name in _validators:
+ return _validators[name]
+ return None
+
- dirname = join(os.environ['HOME'], '.config', name)
+def add_validator(name: str, f: callable):
+ _validators[name] = f
- if isdir(dirname):
- for fmt in formats:
- filename = join(dirname, f'config.{fmt}')
- if isfile(filename):
- return filename
- raise IOError(f'config not found in {dirname}')
+def _get_config_path(name: str) -> str:
+ formats = ['toml', 'yaml']
+
+ dirnames = [
+ join(os.environ['HOME'], '.config', 'homekit'),
+ '/etc/homekit'
+ ]
- else:
- filenames = [join(os.environ['HOME'], '.config', f'{name}.{format}') for format in formats]
- for file in filenames:
- if isfile(file):
- return file
+ for dirname in dirnames:
+ if isdir(dirname):
+ for fmt in formats:
+ filename = join(dirname, f'{name}.{fmt}')
+ if isfile(filename):
+ return filename
- raise IOError(f'config not found')
+ raise IOError(f'config \'{name}\' not found')
-class ConfigStore:
+class SingleConfig:
data: MutableMapping[str, Any]
- app_name: Optional[str]
def __init__(self):
self.data = {}
- self.app_name = None
- def load(self, name: Optional[str] = None,
+ def load(self,
+ name: Optional[str] = None,
use_cli=True,
parser: ArgumentParser = None):
self.app_name = name
@@ -126,6 +137,16 @@ class ConfigStore:
return self.data.items()
+class Config:
+ app_name: Optional[str]
+
+ def __init__(self):
+
+ self.app_name = None
+
+
+
+
config = ConfigStore()
diff --git a/src/home/mqtt/_node.py b/src/home/mqtt/_node.py
index ddf5ba2..4e259a4 100644
--- a/src/home/mqtt/_node.py
+++ b/src/home/mqtt/_node.py
@@ -14,13 +14,17 @@ class MqttNode:
_modules: List[MqttModule]
_module_subscriptions: dict[str, MqttModule]
_node_id: str
+ _node_secret: str
_payload_callbacks: list[callable]
_wrapper: Optional[MqttWrapper]
- def __init__(self, node_id: str):
+ def __init__(self,
+ node_id: str,
+ node_secret: Optional[str] = None):
self._modules = []
self._module_subscriptions = {}
self._node_id = node_id
+ self._node_secret = node_secret
self._payload_callbacks = []
self._logger = logging.getLogger(self.__class__.__name__)
self._wrapper = None
@@ -42,7 +46,7 @@ class MqttNode:
payload = self._module_subscriptions[topic].handle_payload(self, topic, payload)
if isinstance(payload, MqttPayload):
for f in self._payload_callbacks:
- f(payload)
+ f(self, payload)
def load_module(self, module_name: str, *args, **kwargs) -> MqttModule:
module = importlib.import_module(f'..module.{module_name}', __name__)
@@ -78,3 +82,11 @@ class MqttNode:
@property
def id(self) -> str:
return self._node_id
+
+ @property
+ def secret(self) -> str:
+ return self._node_secret
+
+ @secret.setter
+ def secret(self, secret: str) -> None:
+ self._node_secret = secret
diff --git a/src/home/mqtt/_wrapper.py b/src/home/mqtt/_wrapper.py
index 41f9d89..0b32197 100644
--- a/src/home/mqtt/_wrapper.py
+++ b/src/home/mqtt/_wrapper.py
@@ -9,11 +9,15 @@ from ..util import strgen
class MqttWrapper(Mqtt):
_nodes: list[MqttNode]
- def __init__(self, topic_prefix='hk', randomize_client_id=False):
+ def __init__(self,
+ topic_prefix='hk',
+ randomize_client_id=False,
+ clean_session=True):
client_id = config['mqtt']['client_id']
if randomize_client_id:
client_id += '_'+strgen(6)
- super().__init__(clean_session=True, client_id=client_id)
+ super().__init__(clean_session=clean_session,
+ client_id=client_id)
self._nodes = []
self._topic_prefix = topic_prefix
diff --git a/src/home/mqtt/module/ota.py b/src/home/mqtt/module/ota.py
index e71cccc..70c5475 100644
--- a/src/home/mqtt/module/ota.py
+++ b/src/home/mqtt/module/ota.py
@@ -41,7 +41,7 @@ class OtaPayload(MqttPayload):
class MqttOtaModule(MqttModule):
- _ota_request: Optional[tuple[str, str, int]]
+ _ota_request: Optional[tuple[str, int]]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -52,9 +52,9 @@ class MqttOtaModule(MqttModule):
mqtt.subscribe_module("otares", self)
if self._ota_request is not None:
- secret, filename, qos = self._ota_request
+ filename, qos = self._ota_request
self._ota_request = None
- self.do_push_ota(secret, filename, qos)
+ self.do_push_ota(self._mqtt_node_ref.secret, filename, qos)
def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]:
if topic == 'otares':
@@ -69,10 +69,9 @@ class MqttOtaModule(MqttModule):
qos=qos)
def push_ota(self,
- secret: str,
filename: str,
qos: int):
if not self._initialized:
- self._ota_request = (secret, filename, qos)
+ self._ota_request = (filename, qos)
else:
- self.do_push_ota(secret, filename, qos)
+ self.do_push_ota(filename, qos)
diff --git a/src/home/mqtt/module/relay.py b/src/home/mqtt/module/relay.py
index ae88ddb..5383fb6 100644
--- a/src/home/mqtt/module/relay.py
+++ b/src/home/mqtt/module/relay.py
@@ -64,9 +64,9 @@ class MqttRelayModule(MqttModule):
mqtt.subscribe_module('relay/status', self)
def switchpower(self,
- enable: bool,
- secret: str):
- payload = MqttPowerSwitchPayload(secret=secret, state=enable)
+ enable: bool):
+ payload = MqttPowerSwitchPayload(secret=self._mqtt_node_ref.secret,
+ state=enable)
self._mqtt_node_ref.publish('relay/switch', payload=payload.pack())
def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]:
diff --git a/src/home/mqtt/module/temphum.py b/src/home/mqtt/module/temphum.py
index 83ae34d..0e22793 100644
--- a/src/home/mqtt/module/temphum.py
+++ b/src/home/mqtt/module/temphum.py
@@ -48,6 +48,7 @@ class MqttTemphumDataPayload(MqttPayload):
class MqttTempHumModule(MqttModule):
def __init__(self,
sensor: Optional[BaseSensor] = None,
+ write_to_database=False,
*args, **kwargs):
if sensor is not None:
kwargs['tick_interval'] = 10
diff --git a/src/inverter_mqtt_util.py b/src/inverter_mqtt_util.py
index edea29a..fa3bdf5 100755
--- a/src/inverter_mqtt_util.py
+++ b/src/inverter_mqtt_util.py
@@ -11,7 +11,7 @@ if __name__ == '__main__':
config.load('inverter_mqtt_util', parser=parser)
arg = parser.parse_args()
- mqtt = MqttWrapper()
+ mqtt = MqttWrapper(clean_session=arg.mode[0] != 'receiver')
node = MqttNode(node_id='inverter')
module_kwargs = {}
if arg.mode[0] == 'sender':
diff --git a/src/mqtt_node_util.py b/src/mqtt_node_util.py
index 0af430a..70eae95 100755
--- a/src/mqtt_node_util.py
+++ b/src/mqtt_node_util.py
@@ -30,7 +30,7 @@ if __name__ == '__main__':
raise ArgumentError(None, '--relay is only allowed when \'relay\' module included in --modules')
mqtt = MqttWrapper(randomize_client_id=True)
- mqtt_node = MqttNode(node_id=arg.node_id)
+ mqtt_node = MqttNode(node_id=arg.node_id, node_secret=arg.node_secret)
mqtt.add_node(mqtt_node)
@@ -44,9 +44,7 @@ if __name__ == '__main__':
if m == 'relay' and arg.switch_relay is not None:
if not arg.node_secret:
raise ArgumentError(None, '--switch-relay requires --node-secret')
- module_instance.switchpower(mqtt_node,
- arg.switch_relay == 1,
- arg.node_secret)
+ module_instance.switchpower(arg.switch_relay == 1)
mqtt.configure_tls()
try:
@@ -58,7 +56,7 @@ if __name__ == '__main__':
if not arg.node_secret:
raise ArgumentError(None, 'pushing OTA requires --node-secret')
- ota_module.push_ota(arg.node_secret, arg.push_ota, 1)
+ ota_module.push_ota(arg.push_ota, 1)
while True:
sleep(0.1)
diff --git a/src/pump_bot.py b/src/pump_bot.py
index ab73097..1d56044 100755
--- a/src/pump_bot.py
+++ b/src/pump_bot.py
@@ -208,7 +208,7 @@ def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
return ReplyKeyboardMarkup(buttons, one_time_keyboard=False)
-def mqtt_payload_callback(payload: MqttPayload):
+def mqtt_payload_callback(mqtt_node: MqttNode, payload: MqttPayload):
global watering_mcu_status
types_the_node_can_send = (
diff --git a/src/relay_mqtt_bot.py b/src/relay_mqtt_bot.py
index de5671c..8fcf423 100755
--- a/src/relay_mqtt_bot.py
+++ b/src/relay_mqtt_bot.py
@@ -6,10 +6,9 @@ from functools import partial
from home.config import config
from home.telegram import bot
-from home.mqtt import MqttRelay, MqttRelayState
-from home.mqtt.esp import MqttEspDevice
-from home.mqtt.payload import MqttPayload
-from home.mqtt.payload.relay import InitialDiagnosticsPayload, DiagnosticsPayload
+from home.mqtt import MqttPayload, MqttNode, MqttWrapper
+from home.mqtt.module.relay import MqttRelayModule, MqttRelayState
+from home.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload
config.load('relay_mqtt_bot')
@@ -36,7 +35,9 @@ status_emoji = {
}
-mqtt_relay: Optional[MqttRelay] = None
+# mqtt_relay: Optional[MqttRelayModule] = None
+mqtt: Optional[MqttWrapper] = None
+relay_nodes: dict[str, MqttRelayModule] = {}
relay_states: dict[str, MqttRelayState] = {}
@@ -45,23 +46,24 @@ class UserAction(Enum):
OFF = 'off'
-def on_mqtt_message(home_id, message: MqttPayload):
+def on_mqtt_message(node: MqttNode,
+ message: MqttPayload):
if isinstance(message, InitialDiagnosticsPayload) or isinstance(message, DiagnosticsPayload):
kwargs = dict(rssi=message.rssi, enabled=message.flags.state)
if isinstance(message, InitialDiagnosticsPayload):
kwargs['fw_version'] = message.fw_version
- if home_id not in relay_states:
- relay_states[home_id] = MqttRelayState()
- relay_states[home_id].update(**kwargs)
+ if node.id not in relay_states:
+ relay_states[node.id] = MqttRelayState()
+ relay_states[node.id].update(**kwargs)
-def enable_handler(home_id: str, ctx: bot.Context) -> None:
- mqtt_relay.set_power(home_id, True)
+def enable_handler(node_id: str, ctx: bot.Context) -> None:
+ relay_nodes[node_id].switchpower(True)
ctx.reply(ctx.lang('done'))
-def disable_handler(home_id: str, ctx: bot.Context) -> None:
- mqtt_relay.set_power(home_id, False)
+def disable_handler(node_id: str, ctx: bot.Context) -> None:
+ relay_nodes[node_id].switchpower(False)
ctx.reply(ctx.lang('done'))
@@ -88,9 +90,13 @@ def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
if __name__ == '__main__':
devices = []
+ mqtt = MqttWrapper()
for device_id, data in config['relays'].items():
- devices.append(MqttEspDevice(id=device_id,
- secret=data['secret']))
+ mqtt_node = MqttNode(node_id=device_id, node_secret=data['secret'])
+ relay_nodes[device_id] = mqtt_node.load_module('relay')
+ mqtt_node.add_payload_callback(on_mqtt_message)
+ mqtt.add_node(mqtt_node)
+
labels = data['labels']
bot.lang.ru(**{device_id: labels['ru']})
bot.lang.en(**{device_id: labels['en']})
@@ -103,12 +109,9 @@ if __name__ == '__main__':
messages.append(f'{type_emoji}{status_emoji[action.value]} {labels[_lang]}')
bot.handler(texts=messages)(partial(enable_handler if action == UserAction.ON else disable_handler, device_id))
- mqtt_relay = MqttRelay(devices=devices)
- mqtt_relay.set_message_callback(on_mqtt_message)
- mqtt_relay.configure_tls()
- mqtt_relay.connect_and_loop(loop_forever=False)
+ mqtt.configure_tls()
+ mqtt.connect_and_loop(loop_forever=False)
- # bot.enable_logging(BotType.RELAY_MQTT)
bot.run(start_handler=start)
- mqtt_relay.disconnect()
+ mqtt.disconnect()
diff --git a/src/relay_mqtt_http_proxy.py b/src/relay_mqtt_http_proxy.py
index 60d2963..05cc88d 100755
--- a/src/relay_mqtt_http_proxy.py
+++ b/src/relay_mqtt_http_proxy.py
@@ -1,16 +1,19 @@
#!/usr/bin/env python3
from home import http
from home.config import config
-from home.mqtt import MqttRelay, MqttRelayState
-from home.mqtt import MqttPayload
+from home.mqtt import MqttPayload, MqttWrapper, MqttNode
+from home.mqtt.module.relay import MqttRelayState, MqttRelayModule
from home.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload
from typing import Optional
-mqtt_relay: Optional[MqttRelay] = None
+mqtt: Optional[MqttWrapper] = None
+mqtt_nodes: dict[str, MqttNode] = {}
+relay_modules: dict[str, MqttRelayModule] = {}
relay_states: dict[str, MqttRelayState] = {}
-def on_mqtt_message(device_id, message: MqttPayload):
+def on_mqtt_message(node: MqttNode,
+ message: MqttPayload):
if isinstance(message, InitialDiagnosticsPayload) or isinstance(message, DiagnosticsPayload):
kwargs = dict(rssi=message.rssi, enabled=message.flags.state)
if device_id not in relay_states:
@@ -28,17 +31,22 @@ class RelayMqttHttpProxy(http.HTTPServer):
async def _relay_on_off(self,
enable: Optional[bool],
req: http.Request):
- device_id = req.match_info['id']
- device_secret = req.query['secret']
+ node_id = req.match_info['id']
+ node_secret = req.query['secret']
+
+ node = mqtt_nodes[node_id]
+ relay_module = relay_modules[node_id]
if enable is None:
- if device_id in relay_states and relay_states[device_id].ever_updated:
- cur_state = relay_states[device_id].enabled
+ if node_id in relay_states and relay_states[node_id].ever_updated:
+ cur_state = relay_states[node_id].enabled
else:
cur_state = False
enable = not cur_state
- mqtt_relay.set_power(device_id, enable, device_secret)
+ if not node.secret:
+ node.secret = node_secret
+ relay_module.switchpower(enable)
return self.ok()
async def relay_on(self, req: http.Request):
@@ -54,13 +62,20 @@ class RelayMqttHttpProxy(http.HTTPServer):
if __name__ == '__main__':
config.load('relay_mqtt_http_proxy')
- mqtt_relay = MqttRelay(devices=[MqttEspDevice(id=device_id) for device_id in config.get('relay.devices')])
- mqtt_relay.configure_tls()
- mqtt_relay.set_message_callback(on_mqtt_message)
- mqtt_relay.connect_and_loop(loop_forever=False)
+ mqtt = MqttWrapper()
+ for device_id, data in config['relays'].items():
+ mqtt_node = MqttNode(node_id=device_id)
+ relay_modules[device_id] = mqtt_node.load_module('relay')
+ mqtt_nodes[device_id] = mqtt_node
+ mqtt_node.add_payload_callback(on_mqtt_message)
+ mqtt.add_node(mqtt_node)
+ mqtt_node.add_payload_callback(on_mqtt_message)
+
+ mqtt.configure_tls()
+ mqtt.connect_and_loop(loop_forever=False)
proxy = RelayMqttHttpProxy(config.get_addr('server.listen'))
try:
proxy.run()
except KeyboardInterrupt:
- mqtt_relay.disconnect()
+ mqtt.disconnect()
diff --git a/src/sensors_mqtt_sender.py b/src/sensors_mqtt_sender.py
deleted file mode 100755
index 393962a..0000000
--- a/src/sensors_mqtt_sender.py
+++ /dev/null
@@ -1,58 +0,0 @@
-#!/usr/bin/env python3
-import time
-import json
-
-from home.util import parse_addr, MySimpleSocketClient
-from home.mqtt import Mqtt, poll_tick
-from home.mqtt.payload.sensors import Temperature
-from home.config import config
-
-
-class MqttClient(Mqtt):
- def __init__(self):
- super().__init__(self)
- self._home_id = config['mqtt']['home_id']
-
- def poll(self):
- freq = int(config['mqtt']['sensors']['poll_freq'])
- self._logger.debug(f'freq={freq}')
-
- g = poll_tick(freq)
- while True:
- time.sleep(next(g))
- for k, v in config['mqtt']['sensors']['si7021'].items():
- host, port = parse_addr(v['addr'])
- self.publish_si7021(host, port, k)
-
- def publish_si7021(self, host: str, port: int, name: str):
- self._logger.debug(f"publish_si7021/{name}: {host}:{port}")
-
- try:
- now = time.time()
- socket = MySimpleSocketClient(host, port)
-
- socket.write('read')
- response = json.loads(socket.read().strip())
-
- temp = response['temp']
- humidity = response['humidity']
-
- self._logger.debug(f'publish_si7021/{name}: temp={temp} humidity={humidity}')
-
- pld = Temperature(time=round(now),
- temp=temp,
- rh=humidity)
- self._client.publish(f'hk/{self._home_id}/si7021/{name}',
- payload=pld.pack(),
- qos=1)
- except Exception as e:
- self._logger.exception(e)
-
-
-if __name__ == '__main__':
- config.load('sensors_mqtt_sender')
-
- client = MqttClient()
- client.configure_tls()
- client.connect_and_loop(loop_forever=False)
- client.poll()
diff --git a/src/sensors_mqtt_receiver.py b/src/temphum_mqtt_receiver.py
index f7cb467..970d92e 100755
--- a/src/sensors_mqtt_receiver.py
+++ b/src/temphum_mqtt_receiver.py
@@ -2,18 +2,8 @@
import paho.mqtt.client as mqtt
import re
-from home.mqtt import Mqtt
from home.config import config
-from home.mqtt.payload.sensors import Temperature
-from home.api.types import TemperatureSensorLocation
-from home.database import SensorsDatabase
-
-
-def get_sensor_type(sensor: str) -> TemperatureSensorLocation:
- for item in TemperatureSensorLocation:
- if sensor == item.name.lower():
- return item
- raise ValueError(f'unexpected sensor value: {sensor}')
+from home.mqtt import MqttWrapper, MqttNode
class MqttServer(Mqtt):
@@ -47,7 +37,12 @@ class MqttServer(Mqtt):
if __name__ == '__main__':
- config.load('sensors_mqtt_receiver')
+ config.load('temphum_mqtt_receiver')
+
+ mqtt = MqttWrapper(clean_session=False)
+ node = MqttNode(node_id='+')
+ node.load_module('temphum', write_to_database=True)
+ mqtt.add_node(node)
- server = MqttServer()
- server.connect_and_loop()
+ mqtt.configure_tls()
+ mqtt.connect_and_loop() \ No newline at end of file