1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
import importlib
import logging
from typing import TYPE_CHECKING, Callable, List, Optional

from ._module import MqttModule
from ._payload import MqttPayload

if TYPE_CHECKING:
    from ._wrapper import MqttWrapper
else:
    MqttWrapper = None
class MqttNode:
    """A node identified by ``node_id`` that owns modules and routes MQTT traffic.

    The node is attached to a wrapper via :meth:`on_connect` and detached via
    :meth:`on_disconnect`. While attached, modules may subscribe to topics
    through :meth:`subscribe_module`; incoming messages for those topics are
    handed to the owning module, and any ``MqttPayload`` it returns is passed
    on to the registered payload callbacks.
    """

    # Project types are written as forward-reference strings: ``MqttWrapper``
    # is only imported under ``TYPE_CHECKING`` (it is ``None`` at runtime), so
    # these annotations must not be evaluated when the class is created.
    _modules: "list[MqttModule]"
    _module_subscriptions: "dict[str, MqttModule]"
    _node_id: str
    _payload_callbacks: "list[Callable[[MqttPayload], object]]"
    _wrapper: "Optional[MqttWrapper]"
    _logger: logging.Logger

    def __init__(self, node_id: str):
        """Create a detached node with no modules, subscriptions or callbacks."""
        self._modules = []
        self._module_subscriptions = {}
        self._node_id = node_id
        self._payload_callbacks = []
        self._logger = logging.getLogger(self.__class__.__name__)
        self._wrapper = None

    def _initialize_module(self, module: "MqttModule") -> None:
        """Run the module's ``on_connect`` hook and mark it as initialized."""
        module.on_connect(self)
        module.set_initialized()

    def on_connect(self, wrapper: "MqttWrapper") -> None:
        """Attach ``wrapper`` and initialize every module not yet initialized."""
        self._wrapper = wrapper
        for module in self._modules:
            if not module.is_initialized():
                self._initialize_module(module)

    def on_disconnect(self) -> None:
        """Detach from the wrapper and mark every module as uninitialized."""
        self._wrapper = None
        for module in self._modules:
            module.unset_initialized()

    def on_message(self, topic, payload) -> None:
        """Dispatch an incoming message.

        If a module subscribed to ``topic``, ``payload`` is replaced by the
        result of that module's ``handle_payload(self, topic, payload)``.
        The payload callbacks are invoked, in registration order, only when
        the resulting payload is an ``MqttPayload`` instance.
        """
        if topic in self._module_subscriptions:
            handler = self._module_subscriptions[topic]
            payload = handler.handle_payload(self, topic, payload)
        if not isinstance(payload, MqttPayload):
            return
        for callback in self._payload_callbacks:
            callback(payload)

    def load_module(self, module_name: str, *args, **kwargs) -> "MqttModule":
        """Import ``..module.<module_name>``, instantiate its class and add it.

        The imported Python module must define ``MODULE_NAME``, the name of
        the attribute (class) to instantiate with ``*args`` / ``**kwargs``.

        Returns:
            The newly created instance, already registered via ``add_module``.

        Raises:
            RuntimeError: if the imported module has no ``MODULE_NAME``.
        """
        module = importlib.import_module(f'..module.{module_name}', __name__)
        if not hasattr(module, 'MODULE_NAME'):
            raise RuntimeError(f'MODULE_NAME not found in module {module}')
        module_class = getattr(module, module.MODULE_NAME)
        instance = module_class(*args, **kwargs)
        self.add_module(instance)
        return instance

    def add_module(self, module: "MqttModule") -> None:
        """Register ``module``; initialize it right away if already connected."""
        self._modules.append(module)
        if self._wrapper and self._wrapper._connected:
            self._initialize_module(module)

    def subscribe_module(self, topic: str, module: "MqttModule", qos: int = 1) -> None:
        """Route messages on ``topic`` to ``module`` and subscribe on the wrapper.

        Raises:
            RuntimeError: if no wrapper is attached or it is not connected.
        """
        if not self._wrapper or not self._wrapper._connected:
            raise RuntimeError('not connected')
        self._module_subscriptions[topic] = module
        self._wrapper.subscribe(self.id, topic, qos)

    def publish(self,
                topic: str,
                payload: bytes,
                qos: int = 1) -> None:
        """Publish ``payload`` on ``topic`` through the attached wrapper.

        Raises:
            RuntimeError: if no wrapper is attached (before ``on_connect`` or
                after ``on_disconnect``).
        """
        # Without this guard a detached node fails with an opaque
        # AttributeError on None; report it the way subscribe_module() does.
        if self._wrapper is None:
            raise RuntimeError('not connected')
        self._wrapper.publish(self.id, topic, payload, qos)

    def add_payload_callback(self, callback: "Callable[[MqttPayload], object]") -> None:
        """Register ``callback`` to be called with each ``MqttPayload`` received."""
        self._payload_callbacks.append(callback)

    @property
    def id(self) -> str:
        """The node id given at construction time."""
        return self._node_id
|