from ..config import ConfigUnit from typing import Optional, Union from ..util import Addr from collections import namedtuple MqttCreds = namedtuple('MqttCreds', 'username, password') class MqttConfig(ConfigUnit): NAME = 'mqtt' @classmethod def schema(cls) -> Optional[dict]: addr_schema = { 'type': 'dict', 'required': True, 'schema': { 'host': {'type': 'string', 'required': True}, 'port': {'type': 'integer', 'required': True} } } schema = {} for key in ('local', 'remote'): schema[f'{key}_addr'] = addr_schema schema['creds'] = { 'type': 'dict', 'required': True, 'keysrules': {'type': 'string'}, 'valuesrules': { 'type': 'dict', 'schema': { 'username': {'type': 'string', 'required': True}, 'password': {'type': 'string', 'required': True}, } } } for key in ('client', 'server'): schema[f'default_{key}_creds'] = {'type': 'string', 'required': True} return schema def remote_addr(self) -> Addr: return Addr(host=self['remote_addr']['host'], port=self['remote_addr']['port']) def local_addr(self) -> Addr: return Addr(host=self['local_addr']['host'], port=self['local_addr']['port']) def creds_by_name(self, name: str) -> MqttCreds: return MqttCreds(username=self['creds'][name]['username'], password=self['creds'][name]['password']) def creds(self) -> MqttCreds: return self.creds_by_name(self['default_client_creds']) def server_creds(self) -> MqttCreds: return self.creds_by_name(self['default_server_creds']) class MqttNodesConfig(ConfigUnit): NAME = 'mqtt_nodes' @classmethod def schema(cls) -> Optional[dict]: return { 'common': { 'type': 'dict', 'schema': { 'temphum': { 'type': 'dict', 'schema': { 'interval': {'type': 'integer'} } }, 'password': {'type': 'string'} } }, 'nodes': { 'type': 'dict', 'required': True, 'keysrules': {'type': 'string'}, 'valuesrules': { 'type': 'dict', 'schema': { 'type': {'type': 'string', 'required': True, 'allowed': ['esp8266', 'linux', 'none'],}, 'board': {'type': 'string', 'allowed': ['nodemcu', 'd1_mini_lite', 'esp12e']}, 'temphum': { 'type': 'dict', 'schema': { 'module': {'type': 'string', 'required': True, 'allowed': ['si7021', 'dht12']}, 'interval': {'type': 'integer'}, 'i2c_bus': {'type': 'integer'}, 'tcpserver': { 'type': 'dict', 'schema': { 'port': {'type': 'integer', 'required': True} } } } }, 'relay': { 'type': 'dict', 'schema': { 'device_type': {'type': 'string', 'allowed': ['lamp', 'pump', 'solenoid'], 'required': True}, 'legacy_topics': {'type': 'boolean'} } }, 'password': {'type': 'string'} } } } } @staticmethod def custom_validator(data): for name, node in data['nodes'].items(): if 'temphum' in node: if node['type'] == 'linux': if 'i2c_bus' not in node['temphum']: raise KeyError(f'nodes.{name}.temphum: i2c_bus is missing but required for type=linux') if node['type'] in ('esp8266',) and 'board' not in node: raise KeyError(f'nodes.{name}: board is missing but required for type={node["type"]}') def get_node(self, name: str) -> dict: node = self['nodes'][name] if node['type'] == 'none': return node try: if 'password' not in node: node['password'] = self['common']['password'] except KeyError: pass try: if 'temphum' in node: for ckey, cval in self['common']['temphum'].items(): if ckey not in node['temphum']: node['temphum'][ckey] = cval except KeyError: pass return node def get_nodes(self, filters: Optional[Union[list[str], tuple[str]]] = None, only_names=False) -> Union[dict, list[str]]: if filters: for f in filters: if f not in ('temphum', 'relay'): raise ValueError(f'{self.__class__.__name__}::get_node(): invalid filter {f}') reslist = [] resdict = {} for name in self['nodes'].keys(): node = self.get_node(name) if (not filters) or ('temphum' in filters and 'temphum' in node) or ('relay' in filters and 'relay' in node): if only_names: reslist.append(name) else: resdict[name] = node return reslist if only_names else resdict