diff options
author | Evgeny Zinoviev <me@ch1p.io> | 2022-12-18 05:33:54 +0300 |
---|---|---|
committer | Evgeny Zinoviev <me@ch1p.io> | 2022-12-18 05:33:54 +0300 |
commit | 9af0e28b94fd44d2340b0b9ddf2dcccc11d48271 (patch) | |
tree | b27dea0aee4226a802e641070b202eaa88b9455a | |
parent | 09a6b8d1ed6ac97a044023987cb40dbd16d45bcb (diff) |
mqtt refactoring (wip)
-rw-r--r-- | src/home/mqtt/message/__init__.py | 2 | ||||
-rw-r--r-- | src/home/mqtt/message/inverter.py | 86 | ||||
-rw-r--r-- | src/home/mqtt/message/sensors.py | 19 | ||||
-rw-r--r-- | src/home/mqtt/mqtt.py | 30 | ||||
-rw-r--r-- | src/home/mqtt/payload/base_payload.py | 129 | ||||
-rw-r--r-- | src/home/mqtt/payload/inverter.py | 73 | ||||
-rw-r--r-- | src/home/mqtt/payload/relay.py | 54 | ||||
-rw-r--r-- | src/home/mqtt/payload/sensors.py | 20 | ||||
-rwxr-xr-x | src/inverter_mqtt_receiver.py | 73 | ||||
-rwxr-xr-x | src/inverter_mqtt_sender.py | 40 | ||||
-rwxr-xr-x | src/sensors_mqtt_receiver.py | 22 | ||||
-rwxr-xr-x | src/sensors_mqtt_sender.py | 29 |
12 files changed, 366 insertions, 211 deletions
diff --git a/src/home/mqtt/message/__init__.py b/src/home/mqtt/message/__init__.py deleted file mode 100644 index 2a2221b..0000000 --- a/src/home/mqtt/message/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from .inverter import Status, Generation -from .sensors import Temperature diff --git a/src/home/mqtt/message/inverter.py b/src/home/mqtt/message/inverter.py deleted file mode 100644 index d36aad5..0000000 --- a/src/home/mqtt/message/inverter.py +++ /dev/null @@ -1,86 +0,0 @@ -import struct - -from typing import Tuple - - -class Status: - # 46 bytes - format = 'IHHHHHHBHHHHHBHHHHHHHH' - - def pack(self, time: int, data: dict) -> bytes: - bits = 0 - bits |= (data['mppt1_charger_status'] & 0x3) - bits |= (data['mppt2_charger_status'] & 0x3) << 2 - bits |= (data['battery_power_direction'] & 0x3) << 4 - bits |= (data['dc_ac_power_direction'] & 0x3) << 6 - bits |= (data['line_power_direction'] & 0x3) << 8 - bits |= (data['load_connected'] & 0x1) << 10 - - return struct.pack( - self.format, - time, - int(data['grid_voltage'] * 10), - int(data['grid_freq'] * 10), - int(data['ac_output_voltage'] * 10), - int(data['ac_output_freq'] * 10), - data['ac_output_apparent_power'], - data['ac_output_active_power'], - data['output_load_percent'], - int(data['battery_voltage'] * 10), - int(data['battery_voltage_scc'] * 10), - int(data['battery_voltage_scc2'] * 10), - data['battery_discharge_current'], - data['battery_charge_current'], - data['battery_capacity'], - data['inverter_heat_sink_temp'], - data['mppt1_charger_temp'], - data['mppt2_charger_temp'], - data['pv1_input_power'], - data['pv2_input_power'], - int(data['pv1_input_voltage'] * 10), - int(data['pv2_input_voltage'] * 10), - bits - ) - - def unpack(self, buf: bytes) -> Tuple[int, dict]: - data = struct.unpack(self.format, buf) - return data[0], { - 'grid_voltage': data[1] / 10, - 'grid_freq': data[2] / 10, - 'ac_output_voltage': data[3] / 10, - 'ac_output_freq': data[4] / 10, - 'ac_output_apparent_power': data[5], - 'ac_output_active_power': data[6], - 'output_load_percent': data[7], - 'battery_voltage': data[8] / 10, - 'battery_voltage_scc': data[9] / 10, - 'battery_voltage_scc2': data[10] / 10, - 'battery_discharge_current': data[11], - 'battery_charge_current': data[12], - 'battery_capacity': data[13], - 'inverter_heat_sink_temp': data[14], - 'mppt1_charger_temp': data[15], - 'mppt2_charger_temp': data[16], - 'pv1_input_power': data[17], - 'pv2_input_power': data[18], - 'pv1_input_voltage': data[19] / 10, - 'pv2_input_voltage': data[20] / 10, - 'mppt1_charger_status': data[21] & 0x03, - 'mppt2_charger_status': (data[21] >> 2) & 0x03, - 'battery_power_direction': (data[21] >> 4) & 0x03, - 'dc_ac_power_direction': (data[21] >> 6) & 0x03, - 'line_power_direction': (data[21] >> 8) & 0x03, - 'load_connected': (data[21] >> 10) & 0x01, - } - - -class Generation: - # 8 bytes - format = 'II' - - def pack(self, time: int, wh: int) -> bytes: - return struct.pack(self.format, int(time), wh) - - def unpack(self, buf: bytes) -> tuple: - data = struct.unpack(self.format, buf) - return tuple(data) diff --git a/src/home/mqtt/message/sensors.py b/src/home/mqtt/message/sensors.py deleted file mode 100644 index ee522f0..0000000 --- a/src/home/mqtt/message/sensors.py +++ /dev/null @@ -1,19 +0,0 @@ -import struct - -from typing import Tuple - - -class Temperature: - format = 'IhH' - - def pack(self, time: int, temp: float, rh: float) -> bytes: - return struct.pack( - self.format, - time, - int(temp*100), - int(rh*100) - ) - - def unpack(self, buf: bytes) -> Tuple[int, float, float]: - data = struct.unpack(self.format, buf) - return data[0], data[1]/100, data[2]/100 diff --git a/src/home/mqtt/mqtt.py b/src/home/mqtt/mqtt.py index b360d22..b3334b5 100644 --- a/src/home/mqtt/mqtt.py +++ b/src/home/mqtt/mqtt.py @@ -17,18 +17,18 @@ def username_and_password() -> Tuple[str, str]: class MQTTBase: def __init__(self, clean_session=True): - self.client = mqtt.Client(client_id=config['mqtt']['client_id'], - protocol=mqtt.MQTTv311, - clean_session=clean_session) - self.client.on_connect = self.on_connect - self.client.on_disconnect = self.on_disconnect - self.client.on_message = self.on_message + self._client = mqtt.Client(client_id=config['mqtt']['client_id'], + protocol=mqtt.MQTTv311, + clean_session=clean_session) + self._client.on_connect = self.on_connect + self._client.on_disconnect = self.on_disconnect + self._client.on_message = self.on_message - self.home_id = 1 + self._logger = logging.getLogger(self.__class__.__name__) username, password = username_and_password() if username and password: - self.client.username_pw_set(username, password) + self._client.username_pw_set(username, password) def configure_tls(self): ca_certs = os.path.realpath(os.path.join( @@ -39,23 +39,23 @@ class MQTTBase: 'assets', 'mqtt_ca.crt' )) - self.client.tls_set(ca_certs=ca_certs, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2) + self._client.tls_set(ca_certs=ca_certs, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2) def connect_and_loop(self, loop_forever=True): host = config['mqtt']['host'] port = config['mqtt']['port'] - self.client.connect(host, port, 60) + self._client.connect(host, port, 60) if loop_forever: - self.client.loop_forever() + self._client.loop_forever() else: - self.client.loop_start() + self._client.loop_start() def on_connect(self, client: mqtt.Client, userdata, flags, rc): - logger.info("Connected with result code " + str(rc)) + self._logger.info("Connected with result code " + str(rc)) def on_disconnect(self, client: mqtt.Client, userdata, rc): - logger.info("Disconnected with result code " + str(rc)) + self._logger.info("Disconnected with result code " + str(rc)) def on_message(self, client: mqtt.Client, userdata, msg): - logger.info(msg.topic + ": " + str(msg.payload)) + self._logger.info(msg.topic + ": " + str(msg.payload)) diff --git a/src/home/mqtt/payload/base_payload.py b/src/home/mqtt/payload/base_payload.py new file mode 100644 index 0000000..c9ec907 --- /dev/null +++ b/src/home/mqtt/payload/base_payload.py @@ -0,0 +1,129 @@ +import abc +import struct + +from typing import Generic, TypeVar + + +class MQTTPayload(abc.ABC): + FORMAT = '' + PACKER = {} + UNPACKER = {} + + def __init__(self, **kwargs): + for field in self.__class__.__annotations__: + setattr(self, field, kwargs[field]) + + def pack(self): + args = [] + bf_number = -1 + bf_arg = 0 + bf_progress = 0 + + for field, field_type in self.__class__.__annotations__.items(): + field_type_origin = None + if hasattr(field_type, '__extra__') or hasattr(field_type, '__origin__'): + try: + field_type_origin = field_type.__extra__ + except AttributeError: + field_type_origin = field_type.__origin__ + + if field_type_origin is not None and issubclass(field_type_origin, MQTTPayloadBitField): + n, s, b = field_type.__args__ + if n != bf_number: + if bf_number != -1: + args.append(bf_arg) + bf_number = n + bf_progress = 0 + bf_arg = 0 + bf_arg |= (getattr(self, field) & (2 ** b - 1)) << bf_progress + bf_progress += b + + else: + if bf_number != -1: + args.append(bf_arg) + bf_number = -1 + bf_progress = 0 + bf_arg = 0 + + args.append(self._pack_field(field)) + + if bf_number != -1: + args.append(bf_arg) + + return struct.pack(self.FORMAT, *args) + + @classmethod + def unpack(cls, buf: bytes): + data = struct.unpack(cls.FORMAT, buf) + kwargs = {} + i = 0 + bf_number = -1 + bf_progress = 0 + + for field, field_type in cls.__annotations__.items(): + field_type_origin = None + if hasattr(field_type, '__extra__') or hasattr(field_type, '__origin__'): + try: + field_type_origin = field_type.__extra__ + except AttributeError: + field_type_origin = field_type.__origin__ + + if field_type_origin is not None and issubclass(field_type_origin, MQTTPayloadBitField): + n, s, b = field_type.__args__ + if n != bf_number: + bf_number = n + bf_progress = 0 + kwargs[field] = (data[i] >> bf_progress) & (2 ** b - 1) + bf_progress += b + continue # don't increment i + + if bf_number != -1: + bf_number = -1 + i += 1 + + if issubclass(field_type, MQTTPayloadCustomField): + kwargs[field] = field_type.unpack(data[i]) + else: + kwargs[field] = cls._unpack_field(field, data[i]) + i += 1 + return cls(**kwargs) + + def _pack_field(self, name): + val = getattr(self, name) + if self.PACKER and name in self.PACKER: + return self.PACKER[name](val) + else: + return val + + @classmethod + def _unpack_field(cls, name, val): + if isinstance(val, MQTTPayloadCustomField): + return + if cls.UNPACKER and name in cls.UNPACKER: + return cls.UNPACKER[name](val) + else: + return val + + +class MQTTPayloadCustomField(abc.ABC): + def __init__(self, **kwargs): + for field in self.__class__.__annotations__: + setattr(self, field, kwargs[field]) + + @abc.abstractmethod + def __index__(self): + pass + + @classmethod + @abc.abstractmethod + def unpack(cls, *args, **kwargs): + pass + + +NT = TypeVar('NT') # number of bit field +ST = TypeVar('ST') # size in bytes +BT = TypeVar('BT') # size in bits of particular value + + +class MQTTPayloadBitField(int, Generic[NT, ST, BT]): + pass diff --git a/src/home/mqtt/payload/inverter.py b/src/home/mqtt/payload/inverter.py new file mode 100644 index 0000000..b3f4edd --- /dev/null +++ b/src/home/mqtt/payload/inverter.py @@ -0,0 +1,73 @@ +import struct + +from .base_payload import MQTTPayload, MQTTPayloadBitField +from typing import Tuple + +_mult_10 = lambda n: int(n*10) +_div_10 = lambda n: n/10 + + +class Status(MQTTPayload): + # 46 bytes + FORMAT = 'IHHHHHHBHHHHHBHHHHHHHH' + + PACKER = { + 'grid_voltage': _mult_10, + 'grid_freq': _mult_10, + 'ac_output_voltage': _mult_10, + 'ac_output_freq': _mult_10, + 'battery_voltage': _mult_10, + 'battery_voltage_scc': _mult_10, + 'battery_voltage_scc2': _mult_10, + 'pv1_input_voltage': _mult_10, + 'pv2_input_voltage': _mult_10 + } + UNPACKER = { + 'grid_voltage': _div_10, + 'grid_freq': _div_10, + 'ac_output_voltage': _div_10, + 'ac_output_freq': _div_10, + 'battery_voltage': _div_10, + 'battery_voltage_scc': _div_10, + 'battery_voltage_scc2': _div_10, + 'pv1_input_voltage': _div_10, + 'pv2_input_voltage': _div_10 + } + + time: int + grid_voltage: float + grid_freq: float + ac_output_voltage: float + ac_output_freq: float + ac_output_apparent_power: int + ac_output_active_power: int + output_load_percent: int + battery_voltage: float + battery_voltage_scc: float + battery_voltage_scc2: float + battery_discharge_current: int + battery_charge_current: int + battery_capacity: int + inverter_heat_sink_temp: int + mppt1_charger_temp: int + mppt2_charger_temp: int + pv1_input_power: int + pv2_input_power: int + pv1_input_voltage: float + pv2_input_voltage: float + + # H + mppt1_charger_status: MQTTPayloadBitField[0, 16, 2] + mppt2_charger_status: MQTTPayloadBitField[0, 16, 2] + battery_power_direction: MQTTPayloadBitField[0, 16, 2] + dc_ac_power_direction: MQTTPayloadBitField[0, 16, 2] + line_power_direction: MQTTPayloadBitField[0, 16, 2] + load_connected: MQTTPayloadBitField[0, 16, 1] + + +class Generation(MQTTPayload): + # 8 bytes + FORMAT = 'II' + + time: int + wh: int diff --git a/src/home/mqtt/payload/relay.py b/src/home/mqtt/payload/relay.py new file mode 100644 index 0000000..2a327ba --- /dev/null +++ b/src/home/mqtt/payload/relay.py @@ -0,0 +1,54 @@ +from .base_payload import MQTTPayload, MQTTPayloadCustomField + + +class StatFlags(MQTTPayloadCustomField): + state: bool + config_changed_value_present: bool + config_changed: bool + + @staticmethod + def unpack(flags: int): + state = flags & 0x1 + ccvp = (flags >> 1) & 0x1 + cc = (flags >> 2) & 0x1 + return StatFlags(state=(state == 1), + config_changed_value_present=(ccvp == 1), + config_changed=(cc == 1)) + + def __index__(self): + bits = 0 + bits |= (int(self.state) & 0x1) + bits |= (int(self.config_changed_value_present) & 0x1) << 1 + bits |= (int(self.config_changed) & 0x1) << 2 + return bits + + +class InitialStatPayload(MQTTPayload): + FORMAT = 'IBbIB' + + ip: int + fw_version: int + rssi: int + free_heap: int + flags: StatFlags + + +class StatPayload(MQTTPayload): + FORMAT = 'bIB' + + rssi: int + free_heap: int + flags: StatFlags + + +class PowerPayload(MQTTPayload): + FORMAT = '12sB' + PACKER = { + 'state': lambda n: int(n) + } + UNPACKER = { + 'state': lambda n: bool(n) + } + + secret: str + state: bool diff --git a/src/home/mqtt/payload/sensors.py b/src/home/mqtt/payload/sensors.py new file mode 100644 index 0000000..3ecc243 --- /dev/null +++ b/src/home/mqtt/payload/sensors.py @@ -0,0 +1,20 @@ +from .base_payload import MQTTPayload + +_mult_100 = lambda n: int(n*100) +_div_100 = lambda n: n/100 + + +class Temperature(MQTTPayload): + FORMAT = 'IhH' + PACKER = { + 'temp': _mult_100, + 'rh': _mult_100, + } + UNPACKER = { + 'temp': _div_100, + 'rh': _div_100, + } + + time: int + temp: float + rh: float diff --git a/src/inverter_mqtt_receiver.py b/src/inverter_mqtt_receiver.py index d3a487b..ec250ae 100755 --- a/src/inverter_mqtt_receiver.py +++ b/src/inverter_mqtt_receiver.py @@ -4,12 +4,10 @@ import re import logging from home.mqtt import MQTTBase -from home.mqtt.message import Status, Generation +from home.mqtt.payload.inverter import Status, Generation from home.database import InverterDatabase from home.config import config -logger = logging.getLogger(__name__) - class MQTTReceiver(MQTTBase): def __init__(self): @@ -18,55 +16,54 @@ class MQTTReceiver(MQTTBase): def on_connect(self, client: mqtt.Client, userdata, flags, rc): super().on_connect(client, userdata, flags, rc) - logger.info("subscribing to home/#") + self._logger.info("subscribing to home/#") client.subscribe('home/#', qos=1) def on_message(self, client: mqtt.Client, userdata, msg): try: - match = re.match(r'home/(\d+)/(status|gen)', msg.topic) + match = re.match(r'(?:home|hk)/(\d+)/(status|gen)', msg.topic) if not match: return + # FIXME string home_id must be supported home_id, what = int(match.group(1)), match.group(2) if what == 'gen': - packer = Generation() - client_time, watts = packer.unpack(msg.payload) - self.database.add_generation(home_id, client_time, watts) + gen = Generation.unpack(msg.payload) + self.database.add_generation(home_id, gen.time, gen.wh) elif what == 'status': - packer = Status() - client_time, data = packer.unpack(msg.payload) + s = Status.unpack(msg.payload) self.database.add_status(home_id, - client_time, - grid_voltage=int(data['grid_voltage']*10), - grid_freq=int(data['grid_freq'] * 10), - ac_output_voltage=int(data['ac_output_voltage'] * 10), - ac_output_freq=int(data['ac_output_freq'] * 10), - ac_output_apparent_power=data['ac_output_apparent_power'], - ac_output_active_power=data['ac_output_active_power'], - output_load_percent=data['output_load_percent'], - battery_voltage=int(data['battery_voltage'] * 10), - battery_voltage_scc=int(data['battery_voltage_scc'] * 10), - battery_voltage_scc2=int(data['battery_voltage_scc2'] * 10), - battery_discharge_current=data['battery_discharge_current'], - battery_charge_current=data['battery_charge_current'], - battery_capacity=data['battery_capacity'], - inverter_heat_sink_temp=data['inverter_heat_sink_temp'], - mppt1_charger_temp=data['mppt1_charger_temp'], - mppt2_charger_temp=data['mppt2_charger_temp'], - pv1_input_power=data['pv1_input_power'], - pv2_input_power=data['pv2_input_power'], - pv1_input_voltage=int(data['pv1_input_voltage'] * 10), - pv2_input_voltage=int(data['pv2_input_voltage'] * 10), - mppt1_charger_status=data['mppt1_charger_status'], - mppt2_charger_status=data['mppt2_charger_status'], - battery_power_direction=data['battery_power_direction'], - dc_ac_power_direction=data['dc_ac_power_direction'], - line_power_direction=data['line_power_direction'], - load_connected=data['load_connected']) + client_time=s.time, + grid_voltage=int(s.grid_voltage*10), + grid_freq=int(s.grid_freq * 10), + ac_output_voltage=int(s.ac_output_voltage * 10), + ac_output_freq=int(s.ac_output_freq * 10), + ac_output_apparent_power=s.ac_output_apparent_power, + ac_output_active_power=s.ac_output_active_power, + output_load_percent=s.output_load_percent, + battery_voltage=int(s.battery_voltage * 10), + battery_voltage_scc=int(s.battery_voltage_scc * 10), + battery_voltage_scc2=int(s.battery_voltage_scc2 * 10), + battery_discharge_current=s.battery_discharge_current, + battery_charge_current=s.battery_charge_current, + battery_capacity=s.battery_capacity, + inverter_heat_sink_temp=s.inverter_heat_sink_temp, + mppt1_charger_temp=s.mppt1_charger_temp, + mppt2_charger_temp=s.mppt2_charger_temp, + pv1_input_power=s.pv1_input_power, + pv2_input_power=s.pv2_input_power, + pv1_input_voltage=int(s.pv1_input_voltage * 10), + pv2_input_voltage=int(s.pv2_input_voltage * 10), + mppt1_charger_status=s.mppt1_charger_status, + mppt2_charger_status=s.mppt2_charger_status, + battery_power_direction=s.battery_power_direction, + dc_ac_power_direction=s.dc_ac_power_direction, + line_power_direction=s.line_power_direction, + load_connected=s.load_connected) except Exception as e: - logger.exception(str(e)) + self._logger.exception(str(e)) if __name__ == '__main__': diff --git a/src/inverter_mqtt_sender.py b/src/inverter_mqtt_sender.py index 4e06436..74191a2 100755 --- a/src/inverter_mqtt_sender.py +++ b/src/inverter_mqtt_sender.py @@ -1,6 +1,4 @@ #!/usr/bin/env python3 -import paho.mqtt.client as mqtt -import logging import time import datetime import json @@ -8,21 +6,18 @@ import inverterd from home.config import config from home.mqtt import MQTTBase, poll_tick -from home.mqtt.message import Status, Generation - -logger = logging.getLogger(__name__) +from home.mqtt.payload.inverter import Status, Generation class MQTTClient(MQTTBase): def __init__(self): super().__init__() - self.inverter = inverterd.Client() - self.inverter.connect() - self.inverter.format(inverterd.Format.SIMPLE_JSON) + self._home_id = config['mqtt']['home_id'] - def on_connect(self, client: mqtt.Client, userdata, flags, rc): - super().on_connect(client, userdata, flags, rc) + self._inverter = inverterd.Client() + self._inverter.connect() + self._inverter.format(inverterd.Format.SIMPLE_JSON) def poll_inverter(self): freq = int(config['mqtt']['inverter']['poll_freq']) @@ -36,18 +31,18 @@ class MQTTClient(MQTTBase): # read status now = time.time() try: - raw = self.inverter.exec('get-status') + raw = self._inverter.exec('get-status') except inverterd.InverterError as e: - logger.error(f'inverter error: {str(e)}') + self._logger.error(f'inverter error: {str(e)}') # TODO send to server continue data = json.loads(raw)['data'] + status = Status(time=round(now), **data) # FIXME this will crash with 99% probability - packer = Status() - self.client.publish(f'home/{self.home_id}/status', - payload=packer.pack(round(now), data), - qos=1) + self._client.publish(f'hk/{self._home_id}/status', + payload=status.pack(), + qos=1) # read today's generation stat now = time.time() @@ -55,18 +50,17 @@ class MQTTClient(MQTTBase): gen_prev = now today = datetime.date.today() try: - raw = self.inverter.exec('get-day-generated', (today.year, today.month, today.day)) + raw = self._inverter.exec('get-day-generated', (today.year, today.month, today.day)) except inverterd.InverterError as e: - logger.error(f'inverter error: {str(e)}') + self._logger.error(f'inverter error: {str(e)}') # TODO send to server continue - # print('raw:', raw, type(raw)) data = json.loads(raw)['data'] - packer = Generation() - self.client.publish(f'home/{self.home_id}/gen', - payload=packer.pack(round(now), data['wh']), - qos=1) + gen = Generation(time=round(now), wh=data['wh']) + self._client.publish(f'hk/{self._home_id}/gen', + payload=gen.pack(), + qos=1) if __name__ == '__main__': diff --git a/src/sensors_mqtt_receiver.py b/src/sensors_mqtt_receiver.py index 011ee44..99b6af2 100755 --- a/src/sensors_mqtt_receiver.py +++ b/src/sensors_mqtt_receiver.py @@ -1,16 +1,13 @@ #!/usr/bin/env python3 import paho.mqtt.client as mqtt -import logging import re from home.mqtt import MQTTBase from home.config import config -from home.mqtt.message import Temperature +from home.mqtt.payload.sensors import Temperature from home.api.types import TemperatureSensorLocation from home.database import SensorsDatabase -logger = logging.getLogger(__name__) - def get_sensor_type(sensor: str) -> TemperatureSensorLocation: for item in TemperatureSensorLocation: @@ -26,27 +23,26 @@ class MQTTServer(MQTTBase): def on_connect(self, client: mqtt.Client, userdata, flags, rc): super().on_connect(client, userdata, flags, rc) - logger.info("subscribing to home/#") + self._logger.info("subscribing to home/#") client.subscribe('home/#', qos=1) def on_message(self, client: mqtt.Client, userdata, msg): try: variants = '|'.join([s.name.lower() for s in TemperatureSensorLocation]) - match = re.match(rf'home/(\d+)/si7021/({variants})', msg.topic) + match = re.match(rf'(?:home|hk)/(\d+)/si7021/({variants})', msg.topic) if not match: return + # FIXME string home_id must be supported home_id = int(match.group(1)) sensor = get_sensor_type(match.group(2)) - packer = Temperature() - client_time, temp, rh = packer.unpack(msg.payload) - - self.database.add_temperature(home_id, client_time, sensor, - temp=int(temp*100), - rh=int(rh*100)) + payload = Temperature.unpack(msg.payload) + self.database.add_temperature(home_id, payload.time, sensor, + temp=int(payload.temp*100), + rh=int(payload.rh*100)) except Exception as e: - logger.exception(str(e)) + self._logger.exception(str(e)) if __name__ == '__main__': diff --git a/src/sensors_mqtt_sender.py b/src/sensors_mqtt_sender.py index f4f8ec9..2cf2717 100755 --- a/src/sensors_mqtt_sender.py +++ b/src/sensors_mqtt_sender.py @@ -1,24 +1,21 @@ #!/usr/bin/env python3 -import paho.mqtt.client as mqtt -import logging import time import json from home.util import parse_addr, MySimpleSocketClient from home.mqtt import MQTTBase, poll_tick -from home.mqtt.message import Temperature +from home.mqtt.payload.sensors import Temperature from home.config import config -logger = logging.getLogger(__name__) - class MQTTClient(MQTTBase): - def on_connect(self, client: mqtt.Client, userdata, flags, rc): - super().on_connect(client, userdata, flags, rc) + def __init__(self): + super().__init__(self) + self._home_id = config['mqtt']['home_id'] def poll(self): freq = int(config['mqtt']['sensors']['poll_freq']) - logger.debug(f'freq={freq}') + self._logger.debug(f'freq={freq}') g = poll_tick(freq) while True: @@ -28,7 +25,7 @@ class MQTTClient(MQTTBase): self.publish_si7021(host, port, k) def publish_si7021(self, host: str, port: int, name: str): - logging.debug(f"publish_si7021/{name}: {host}:{port}") + self._logger.debug(f"publish_si7021/{name}: {host}:{port}") try: now = time.time() @@ -40,14 +37,16 @@ class MQTTClient(MQTTBase): temp = response['temp'] humidity = response['humidity'] - logging.debug(f'publish_si7021/{name}: temp={temp} humidity={humidity}') + self._logger.debug(f'publish_si7021/{name}: temp={temp} humidity={humidity}') - packer = Temperature() - self.client.publish(f'home/{self.home_id}/si7021/{name}', - payload=packer.pack(round(now), temp, humidity), - qos=1) + pld = Temperature(time=round(now), + temp=temp, + rh=humidity) + self._client.publish(f'hk/{self._home_id}/si7021/{name}', + payload=pld.pack(), + qos=1) except Exception as e: - logger.exception(e) + self._logger.exception(e) if __name__ == '__main__': |