aboutsummaryrefslogtreecommitdiff
path: root/include/py/homekit/mqtt/_node.py
diff options
context:
space:
mode:
Diffstat (limited to 'include/py/homekit/mqtt/_node.py')
-rw-r--r--include/py/homekit/mqtt/_node.py92
1 files changed, 92 insertions, 0 deletions
diff --git a/include/py/homekit/mqtt/_node.py b/include/py/homekit/mqtt/_node.py
new file mode 100644
index 0000000..4e259a4
--- /dev/null
+++ b/include/py/homekit/mqtt/_node.py
@@ -0,0 +1,92 @@
+import logging
+import importlib
+
+from typing import List, TYPE_CHECKING, Optional
+from ._payload import MqttPayload
+from ._module import MqttModule
+if TYPE_CHECKING:
+ from ._wrapper import MqttWrapper
+else:
+ MqttWrapper = None
+
+
+class MqttNode:
+ _modules: List[MqttModule]
+ _module_subscriptions: dict[str, MqttModule]
+ _node_id: str
+ _node_secret: str
+ _payload_callbacks: list[callable]
+ _wrapper: Optional[MqttWrapper]
+
+ def __init__(self,
+ node_id: str,
+ node_secret: Optional[str] = None):
+ self._modules = []
+ self._module_subscriptions = {}
+ self._node_id = node_id
+ self._node_secret = node_secret
+ self._payload_callbacks = []
+ self._logger = logging.getLogger(self.__class__.__name__)
+ self._wrapper = None
+
+ def on_connect(self, wrapper: MqttWrapper):
+ self._wrapper = wrapper
+ for module in self._modules:
+ if not module.is_initialized():
+ module.on_connect(self)
+ module.set_initialized()
+
+ def on_disconnect(self):
+ self._wrapper = None
+ for module in self._modules:
+ module.unset_initialized()
+
+ def on_message(self, topic, payload):
+ if topic in self._module_subscriptions:
+ payload = self._module_subscriptions[topic].handle_payload(self, topic, payload)
+ if isinstance(payload, MqttPayload):
+ for f in self._payload_callbacks:
+ f(self, payload)
+
+ def load_module(self, module_name: str, *args, **kwargs) -> MqttModule:
+ module = importlib.import_module(f'..module.{module_name}', __name__)
+ if not hasattr(module, 'MODULE_NAME'):
+ raise RuntimeError(f'MODULE_NAME not found in module {module}')
+ cl = getattr(module, getattr(module, 'MODULE_NAME'))
+ instance = cl(*args, **kwargs)
+ self.add_module(instance)
+ return instance
+
+ def add_module(self, module: MqttModule):
+ self._modules.append(module)
+ if self._wrapper and self._wrapper._connected:
+ module.on_connect(self)
+ module.set_initialized()
+
+ def subscribe_module(self, topic: str, module: MqttModule, qos: int = 1):
+ if not self._wrapper or not self._wrapper._connected:
+ raise RuntimeError('not connected')
+
+ self._module_subscriptions[topic] = module
+ self._wrapper.subscribe(self.id, topic, qos)
+
+ def publish(self,
+ topic: str,
+ payload: bytes,
+ qos: int = 1):
+ self._wrapper.publish(self.id, topic, payload, qos)
+
+ def add_payload_callback(self, callback: callable):
+ self._payload_callbacks.append(callback)
+
+ @property
+ def id(self) -> str:
+ return self._node_id
+
+ @property
+ def secret(self) -> str:
+ return self._node_secret
+
+ @secret.setter
+ def secret(self, secret: str) -> None:
+ self._node_secret = secret