from __future__ import annotations

import abc
import logging
import threading
from time import sleep
from typing import TYPE_CHECKING, Optional

from ..util import next_tick_gen

if TYPE_CHECKING:
    from ._node import MqttNode
    from ._payload import MqttPayload


class MqttModule(abc.ABC):
    """Base class for modules attached to an MQTT node.

    A module is told about connection state through :meth:`on_connect` and
    :meth:`on_disconnect`.  When constructed with a truthy ``tick_interval``
    it also runs :meth:`tick` periodically on a background thread for as
    long as it is connected.
    """

    _tick_interval: int
    _initialized: bool
    _connected: bool
    _ticker: Optional[threading.Thread]
    _mqtt_node_ref: Optional[MqttNode]

    def __init__(self, tick_interval: int = 0) -> None:
        """Create the module.

        Args:
            tick_interval: Value handed to ``next_tick_gen`` to pace the
                ticker.  A falsy value (the default ``0``) disables the
                ticker entirely.
        """
        self._tick_interval = tick_interval
        self._initialized = False
        self._ticker = None
        self._logger = logging.getLogger(self.__class__.__name__)
        self._connected = False
        self._mqtt_node_ref = None

    def on_connect(self, mqtt: MqttNode) -> None:
        """Record the connection and start the ticker if one is configured.

        Args:
            mqtt: The node that connected; kept as the current node
                reference until :meth:`on_disconnect` is called.
        """
        self._connected = True
        self._mqtt_node_ref = mqtt
        if self._tick_interval:
            self._start_ticker()

    def on_disconnect(self, mqtt: MqttNode) -> None:
        """Record the disconnection and drop the node reference.

        Clearing the connected flag is also what makes the ticker loop exit
        after its current sleep.

        Args:
            mqtt: The node that disconnected (not used here).
        """
        self._connected = False
        self._mqtt_node_ref = None

    def is_initialized(self) -> bool:
        """Return the current value of the initialized flag."""
        return self._initialized

    def set_initialized(self) -> None:
        """Set the initialized flag."""
        self._initialized = True

    def unset_initialized(self) -> None:
        """Clear the initialized flag."""
        self._initialized = False

    def tick(self) -> None:
        """Periodic hook run on the ticker thread; no-op by default."""

    def _tick(self) -> None:
        """Ticker thread body: sleep, then call :meth:`tick`, while connected.

        Each value produced by ``next_tick_gen`` is used as the sleep
        duration before the next :meth:`tick` call.
        """
        delays = next_tick_gen(self._tick_interval)
        while self._connected:
            sleep(next(delays))
            # The connection may have dropped while we were asleep.
            if not self._connected:
                break
            try:
                self.tick()
            except Exception:
                # An uncaught exception would end this thread and silently
                # stop all further ticks until the next reconnect.  Log it
                # and keep the schedule going instead.
                self._logger.exception('tick() failed')

    def _start_ticker(self) -> None:
        """Start the ticker thread unless one is already running."""
        if self._ticker is not None and self._ticker.is_alive():
            return

        # Include the node id in the thread name when a node is attached.
        name_part = f'{self._mqtt_node_ref.id}/' if self._mqtt_node_ref else ''
        self._ticker = threading.Thread(
            target=self._tick,
            name=f'mqtt:{self.__class__.__name__}/{name_part}ticker',
        )
        self._ticker.start()

    def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]:
        """Handle an incoming message; the base implementation ignores it.

        Args:
            mqtt: The node the message belongs to.
            topic: Topic the message arrived on.
            payload: Raw message bytes.

        Returns:
            Always ``None`` here.  Subclasses may return an ``MqttPayload``.
        """
        return None