summaryrefslogtreecommitdiff
path: root/src/home/mqtt/_config.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/home/mqtt/_config.py')
-rw-r--r--src/home/mqtt/_config.py165
1 files changed, 0 insertions, 165 deletions
diff --git a/src/home/mqtt/_config.py b/src/home/mqtt/_config.py
deleted file mode 100644
index 9ba9443..0000000
--- a/src/home/mqtt/_config.py
+++ /dev/null
@@ -1,165 +0,0 @@
-from ..config import ConfigUnit
-from typing import Optional, Union
-from ..util import Addr
-from collections import namedtuple
-
-MqttCreds = namedtuple('MqttCreds', 'username, password')
-
-
-class MqttConfig(ConfigUnit):
- NAME = 'mqtt'
-
- @classmethod
- def schema(cls) -> Optional[dict]:
- addr_schema = {
- 'type': 'dict',
- 'required': True,
- 'schema': {
- 'host': {'type': 'string', 'required': True},
- 'port': {'type': 'integer', 'required': True}
- }
- }
-
- schema = {}
- for key in ('local', 'remote'):
- schema[f'{key}_addr'] = addr_schema
-
- schema['creds'] = {
- 'type': 'dict',
- 'required': True,
- 'keysrules': {'type': 'string'},
- 'valuesrules': {
- 'type': 'dict',
- 'schema': {
- 'username': {'type': 'string', 'required': True},
- 'password': {'type': 'string', 'required': True},
- }
- }
- }
-
- for key in ('client', 'server'):
- schema[f'default_{key}_creds'] = {'type': 'string', 'required': True}
-
- return schema
-
- def remote_addr(self) -> Addr:
- return Addr(host=self['remote_addr']['host'],
- port=self['remote_addr']['port'])
-
- def local_addr(self) -> Addr:
- return Addr(host=self['local_addr']['host'],
- port=self['local_addr']['port'])
-
- def creds_by_name(self, name: str) -> MqttCreds:
- return MqttCreds(username=self['creds'][name]['username'],
- password=self['creds'][name]['password'])
-
- def creds(self) -> MqttCreds:
- return self.creds_by_name(self['default_client_creds'])
-
- def server_creds(self) -> MqttCreds:
- return self.creds_by_name(self['default_server_creds'])
-
-
-class MqttNodesConfig(ConfigUnit):
- NAME = 'mqtt_nodes'
-
- @classmethod
- def schema(cls) -> Optional[dict]:
- return {
- 'common': {
- 'type': 'dict',
- 'schema': {
- 'temphum': {
- 'type': 'dict',
- 'schema': {
- 'interval': {'type': 'integer'}
- }
- },
- 'password': {'type': 'string'}
- }
- },
- 'nodes': {
- 'type': 'dict',
- 'required': True,
- 'keysrules': {'type': 'string'},
- 'valuesrules': {
- 'type': 'dict',
- 'schema': {
- 'type': {'type': 'string', 'required': True, 'allowed': ['esp8266', 'linux', 'none'],},
- 'board': {'type': 'string', 'allowed': ['nodemcu', 'd1_mini_lite', 'esp12e']},
- 'temphum': {
- 'type': 'dict',
- 'schema': {
- 'module': {'type': 'string', 'required': True, 'allowed': ['si7021', 'dht12']},
- 'interval': {'type': 'integer'},
- 'i2c_bus': {'type': 'integer'},
- 'tcpserver': {
- 'type': 'dict',
- 'schema': {
- 'port': {'type': 'integer', 'required': True}
- }
- }
- }
- },
- 'relay': {
- 'type': 'dict',
- 'schema': {
- 'device_type': {'type': 'string', 'allowed': ['lamp', 'pump', 'solenoid'], 'required': True},
- 'legacy_topics': {'type': 'boolean'}
- }
- },
- 'password': {'type': 'string'}
- }
- }
- }
- }
-
- @staticmethod
- def custom_validator(data):
- for name, node in data['nodes'].items():
- if 'temphum' in node:
- if node['type'] == 'linux':
- if 'i2c_bus' not in node['temphum']:
- raise KeyError(f'nodes.{name}.temphum: i2c_bus is missing but required for type=linux')
- if node['type'] in ('esp8266',) and 'board' not in node:
- raise KeyError(f'nodes.{name}: board is missing but required for type={node["type"]}')
-
- def get_node(self, name: str) -> dict:
- node = self['nodes'][name]
- if node['type'] == 'none':
- return node
-
- try:
- if 'password' not in node:
- node['password'] = self['common']['password']
- except KeyError:
- pass
-
- try:
- if 'temphum' in node:
- for ckey, cval in self['common']['temphum'].items():
- if ckey not in node['temphum']:
- node['temphum'][ckey] = cval
- except KeyError:
- pass
-
- return node
-
- def get_nodes(self,
- filters: Optional[Union[list[str], tuple[str]]] = None,
- only_names=False) -> Union[dict, list[str]]:
- if filters:
- for f in filters:
- if f not in ('temphum', 'relay'):
- raise ValueError(f'{self.__class__.__name__}::get_node(): invalid filter {f}')
- reslist = []
- resdict = {}
- for name in self['nodes'].keys():
- node = self.get_node(name)
- if (not filters) or ('temphum' in filters and 'temphum' in node) or ('relay' in filters and 'relay' in node):
- if only_names:
- reslist.append(name)
- else:
- resdict[name] = node
- return reslist if only_names else resdict