summaryrefslogtreecommitdiff
path: root/include/py/homekit/mqtt/_config.py
diff options
context:
space:
mode:
Diffstat (limited to 'include/py/homekit/mqtt/_config.py')
-rw-r--r--include/py/homekit/mqtt/_config.py183
1 files changed, 183 insertions, 0 deletions
diff --git a/include/py/homekit/mqtt/_config.py b/include/py/homekit/mqtt/_config.py
new file mode 100644
index 0000000..8aa3bfe
--- /dev/null
+++ b/include/py/homekit/mqtt/_config.py
@@ -0,0 +1,183 @@
+from ..config import ConfigUnit
+from typing import Optional, Union
+from ..util import Addr
+from collections import namedtuple
+
+MqttCreds = namedtuple('MqttCreds', 'username, password')
+
+
+class MqttConfig(ConfigUnit):
+ NAME = 'mqtt'
+
+ @classmethod
+ def schema(cls) -> Optional[dict]:
+ addr_schema = {
+ 'type': 'dict',
+ 'required': True,
+ 'schema': {
+ 'host': {'type': 'string', 'required': True},
+ 'port': {'type': 'integer', 'required': True}
+ }
+ }
+
+ schema = {}
+ for key in ('local', 'remote'):
+ schema[f'{key}_addr'] = addr_schema
+
+ schema['creds'] = {
+ 'type': 'dict',
+ 'required': True,
+ 'keysrules': {'type': 'string'},
+ 'valuesrules': {
+ 'type': 'dict',
+ 'schema': {
+ 'username': {'type': 'string', 'required': True},
+ 'password': {'type': 'string', 'required': True},
+ }
+ }
+ }
+
+ for key in ('client', 'server'):
+ schema[f'default_{key}_creds'] = {'type': 'string', 'required': True}
+
+ return schema
+
+ def remote_addr(self) -> Addr:
+ return Addr(host=self['remote_addr']['host'],
+ port=self['remote_addr']['port'])
+
+ def local_addr(self) -> Addr:
+ return Addr(host=self['local_addr']['host'],
+ port=self['local_addr']['port'])
+
+ def creds_by_name(self, name: str) -> MqttCreds:
+ return MqttCreds(username=self['creds'][name]['username'],
+ password=self['creds'][name]['password'])
+
+ def creds(self) -> MqttCreds:
+ return self.creds_by_name(self['default_client_creds'])
+
+ def server_creds(self) -> MqttCreds:
+ return self.creds_by_name(self['default_server_creds'])
+
+
+class MqttNodesConfig(ConfigUnit):
+ NAME = 'mqtt_nodes'
+
+ @classmethod
+ def schema(cls) -> Optional[dict]:
+ return {
+ 'common': {
+ 'type': 'dict',
+ 'schema': {
+ 'temphum': {
+ 'type': 'dict',
+ 'schema': {
+ 'interval': {'type': 'integer'}
+ }
+ },
+ 'password': {'type': 'string'}
+ }
+ },
+ 'nodes': {
+ 'type': 'dict',
+ 'required': True,
+ 'keysrules': {'type': 'string'},
+ 'valuesrules': {
+ 'type': 'dict',
+ 'schema': {
+ 'type': {'type': 'string', 'required': True, 'allowed': ['esp8266', 'linux', 'none'],},
+ 'board': {'type': 'string', 'allowed': ['nodemcu', 'd1_mini_lite', 'esp12e']},
+ 'temphum': {
+ 'type': 'dict',
+ 'schema': {
+ 'module': {'type': 'string', 'required': True, 'allowed': ['si7021', 'dht12']},
+ 'legacy_payload': {'type': 'boolean', 'required': False, 'default': False},
+ 'interval': {'type': 'integer'},
+ 'i2c_bus': {'type': 'integer'},
+ 'tcpserver': {
+ 'type': 'dict',
+ 'schema': {
+ 'port': {'type': 'integer', 'required': True}
+ }
+ }
+ }
+ },
+ 'relay': {
+ 'type': 'dict',
+ 'schema': {
+ 'device_type': {'type': 'string', 'allowed': ['lamp', 'pump', 'solenoid', 'cooler'], 'required': True},
+ 'legacy_topics': {'type': 'boolean'}
+ }
+ },
+ 'password': {'type': 'string'},
+ 'defines': {
+ 'type': 'dict',
+ 'keysrules': {'type': 'string'},
+ 'valuesrules': {'type': ['string', 'integer']}
+ }
+ }
+ }
+ }
+ }
+
+ @staticmethod
+ def custom_validator(data):
+ for name, node in data['nodes'].items():
+ if 'temphum' in node:
+ if node['type'] == 'linux':
+ if 'i2c_bus' not in node['temphum']:
+ raise KeyError(f'nodes.{name}.temphum: i2c_bus is missing but required for type=linux')
+ if node['type'] in ('esp8266',) and 'board' not in node:
+ raise KeyError(f'nodes.{name}: board is missing but required for type={node["type"]}')
+
+ def get_node(self, name: str) -> dict:
+ node = self['nodes'][name]
+ if node['type'] == 'none':
+ return node
+
+ try:
+ if 'password' not in node:
+ node['password'] = self['common']['password']
+ except KeyError:
+ pass
+
+ try:
+ if 'temphum' in node:
+ for ckey, cval in self['common']['temphum'].items():
+ if ckey not in node['temphum']:
+ node['temphum'][ckey] = cval
+ except KeyError:
+ pass
+
+ return node
+
+ def get_nodes(self,
+ filters: Optional[Union[list[str], tuple[str]]] = None,
+ only_names=False) -> Union[dict, list[str]]:
+ if filters:
+ for f in filters:
+ if f not in ('temphum', 'relay'):
+ raise ValueError(f'{self.__class__.__name__}::get_node(): invalid filter {f}')
+ reslist = []
+ resdict = {}
+ for name in self['nodes'].keys():
+ node = self.get_node(name)
+ if (not filters) or ('temphum' in filters and 'temphum' in node) or ('relay' in filters and 'relay' in node):
+ if only_names:
+ reslist.append(name)
+ else:
+ resdict[name] = node
+ return reslist if only_names else resdict
+
+ def node_uses_legacy_temphum_data_payload(self, node_id: str) -> bool:
+ try:
+ return self.get_node(node_id)['temphum']['legacy_payload']
+ except KeyError:
+ return False
+
+ def node_uses_legacy_relay_power_payload(self, node_id: str) -> bool:
+ try:
+ return self.get_node(node_id)['relay']['legacy_topics']
+ except KeyError:
+ return False