aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2023-06-07 22:41:22 +0300
committerEvgeny Zinoviev <me@ch1p.io>2023-06-07 22:41:22 +0300
commit5aad97192dc5941147d91d28486eaff5a61afeb0 (patch)
tree45953b3e884d128f7c92387b773db47b0b603a56
parentae8070b2dd8c05dc6ed057d733f6d57f396ad100 (diff)
parentebe7990bf3dd475f1963a0c1eaaa5f230f3d5d48 (diff)
Merge branch 'mqtt-refactoring' of ch1p.io:homekit into mqtt-refactoring
-rw-r--r--src/home/mqtt/module/inverter.py128
-rw-r--r--src/home/mqtt/module/temphum.py25
-rwxr-xr-xsrc/inverter_mqtt_receiver.py74
-rwxr-xr-xsrc/inverter_mqtt_sender.py72
-rwxr-xr-xsrc/inverter_mqtt_util.py23
-rw-r--r--systemd/inverter_mqtt_receiver.service13
-rw-r--r--systemd/inverter_mqtt_sender.service2
7 files changed, 181 insertions, 156 deletions
diff --git a/src/home/mqtt/module/inverter.py b/src/home/mqtt/module/inverter.py
index 9cf2978..d927a06 100644
--- a/src/home/mqtt/module/inverter.py
+++ b/src/home/mqtt/module/inverter.py
@@ -1,13 +1,31 @@
-import struct
+import time
+import json
+import datetime
+try:
+ import inverterd
+except:
+ pass
+from typing import Optional
+from .._module import MqttModule
from .._node import MqttNode
from .._payload import MqttPayload, bit_field
+try:
+ from home.database import InverterDatabase
+except:
+ pass
_mult_10 = lambda n: int(n*10)
_div_10 = lambda n: n/10
-class Status(MqttPayload):
+MODULE_NAME = 'MqttInverterModule'
+
+STATUS_TOPIC = 'status'
+GENERATION_TOPIC = 'generation'
+
+
+class MqttInverterStatusPayload(MqttPayload):
# 46 bytes
FORMAT = 'IHHHHHHBHHHHHBHHHHHHHH'
@@ -65,7 +83,7 @@ class Status(MqttPayload):
load_connected: bit_field(0, 16, 1)
-class Generation(MqttPayload):
+class MqttInverterGenerationPayload(MqttPayload):
# 8 bytes
FORMAT = 'II'
@@ -73,5 +91,105 @@ class Generation(MqttPayload):
wh: int
-class MqttInverterModule(MqttNode):
- pass
+class MqttInverterModule(MqttModule):
+ _status_poll_freq: int
+ _generation_poll_freq: int
+ _inverter: Optional[inverterd.Client]
+ _database: Optional[InverterDatabase]
+ _gen_prev: float
+
+ def __init__(self, status_poll_freq=0, generation_poll_freq=0):
+ super().__init__(tick_interval=status_poll_freq)
+ self._status_poll_freq = status_poll_freq
+ self._generation_poll_freq = generation_poll_freq
+
+ # this defines whether this is a publisher or a subscriber
+ if status_poll_freq > 0:
+ self._inverter = inverterd.Client()
+ self._inverter.connect()
+ self._inverter.format(inverterd.Format.SIMPLE_JSON)
+ self._database = None
+ else:
+ self._inverter = None
+ self._database = InverterDatabase()
+
+ self._gen_prev = 0
+
+ def on_connect(self, mqtt: MqttNode):
+ super().on_connect(mqtt)
+ if not self._inverter:
+ mqtt.subscribe_module(STATUS_TOPIC, self)
+ mqtt.subscribe_module(GENERATION_TOPIC, self)
+
+ def tick(self):
+ if not self._inverter:
+ return
+
+ # read status
+ now = time.time()
+ try:
+ raw = self._inverter.exec('get-status')
+ except inverterd.InverterError as e:
+ self._logger.error(f'inverter error: {str(e)}')
+ # TODO send to server
+ return
+
+ data = json.loads(raw)['data']
+ status = MqttInverterStatusPayload(time=round(now), **data)
+ self._mqtt_node_ref.publish(STATUS_TOPIC, status.pack())
+
+ # read today's generation stat
+ now = time.time()
+ if self._gen_prev == 0 or now - self._gen_prev >= self._generation_poll_freq:
+ self._gen_prev = now
+ today = datetime.date.today()
+ try:
+ raw = self._inverter.exec('get-day-generated', (today.year, today.month, today.day))
+ except inverterd.InverterError as e:
+ self._logger.error(f'inverter error: {str(e)}')
+ # TODO send to server
+ return
+
+ data = json.loads(raw)['data']
+ gen = MqttInverterGenerationPayload(time=round(now), wh=data['wh'])
+ self._mqtt_node_ref.publish(GENERATION_TOPIC, gen.pack())
+
+ def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]:
+ home_id = 1 # legacy compat
+
+ if topic == STATUS_TOPIC:
+ s = MqttInverterStatusPayload.unpack(payload)
+ self._database.add_status(home_id=home_id,
+ client_time=s.time,
+ grid_voltage=int(s.grid_voltage*10),
+ grid_freq=int(s.grid_freq * 10),
+ ac_output_voltage=int(s.ac_output_voltage * 10),
+ ac_output_freq=int(s.ac_output_freq * 10),
+ ac_output_apparent_power=s.ac_output_apparent_power,
+ ac_output_active_power=s.ac_output_active_power,
+ output_load_percent=s.output_load_percent,
+ battery_voltage=int(s.battery_voltage * 10),
+ battery_voltage_scc=int(s.battery_voltage_scc * 10),
+ battery_voltage_scc2=int(s.battery_voltage_scc2 * 10),
+ battery_discharge_current=s.battery_discharge_current,
+ battery_charge_current=s.battery_charge_current,
+ battery_capacity=s.battery_capacity,
+ inverter_heat_sink_temp=s.inverter_heat_sink_temp,
+ mppt1_charger_temp=s.mppt1_charger_temp,
+ mppt2_charger_temp=s.mppt2_charger_temp,
+ pv1_input_power=s.pv1_input_power,
+ pv2_input_power=s.pv2_input_power,
+ pv1_input_voltage=int(s.pv1_input_voltage * 10),
+ pv2_input_voltage=int(s.pv2_input_voltage * 10),
+ mppt1_charger_status=s.mppt1_charger_status,
+ mppt2_charger_status=s.mppt2_charger_status,
+ battery_power_direction=s.battery_power_direction,
+ dc_ac_power_direction=s.dc_ac_power_direction,
+ line_power_direction=s.line_power_direction,
+ load_connected=s.load_connected)
+ return s
+
+ elif topic == GENERATION_TOPIC:
+ gen = MqttInverterGenerationPayload.unpack(payload)
+ self._database.add_generation(home_id, gen.time, gen.wh)
+ return gen
diff --git a/src/home/mqtt/module/temphum.py b/src/home/mqtt/module/temphum.py
index 5501f91..83ae34d 100644
--- a/src/home/mqtt/module/temphum.py
+++ b/src/home/mqtt/module/temphum.py
@@ -9,6 +9,7 @@ from ...temphum import BaseSensor
two_digits_precision = lambda x: round(x, 2)
MODULE_NAME = 'MqttTempHumModule'
+DATA_TOPIC = 'temphum/data'
class MqttTemphumDataPayload(MqttPayload):
@@ -45,22 +46,38 @@ class MqttTemphumDataPayload(MqttPayload):
class MqttTempHumModule(MqttModule):
- def __init__(self, sensor: Optional[BaseSensor] = None, *args, **kwargs):
+ def __init__(self,
+ sensor: Optional[BaseSensor] = None,
+ *args, **kwargs):
+ if sensor is not None:
+ kwargs['tick_interval'] = 10
super().__init__(*args, **kwargs)
self._sensor = sensor
def on_connect(self, mqtt: MqttNode):
super().on_connect(mqtt)
- mqtt.subscribe_module('temphum/data', self)
+ mqtt.subscribe_module(DATA_TOPIC, self)
def tick(self):
- pass
+ if not self._sensor:
+ return
+
+ error = 0
+ temp = 0
+ rh = 0
+ try:
+ temp = self._sensor.temperature()
+ rh = self._sensor.humidity()
+ except:
+ error = 1
+ pld = MqttTemphumDataPayload(temp=temp, rh=rh, error=error)
+ self._mqtt_node_ref.publish(DATA_TOPIC, pld.pack())
def handle_payload(self,
mqtt: MqttNode,
topic: str,
payload: bytes) -> Optional[MqttPayload]:
- if topic == 'temphum/data':
+ if topic == DATA_TOPIC:
message = MqttTemphumDataPayload.unpack(payload)
self._logger.debug(message)
return message
diff --git a/src/inverter_mqtt_receiver.py b/src/inverter_mqtt_receiver.py
deleted file mode 100755
index cc0f1ff..0000000
--- a/src/inverter_mqtt_receiver.py
+++ /dev/null
@@ -1,74 +0,0 @@
-#!/usr/bin/env python3
-import paho.mqtt.client as mqtt
-import re
-
-from home.mqtt import Mqtt
-from home.mqtt.payload.inverter import Status, Generation
-from home.database import InverterDatabase
-from home.config import config
-
-
-class MqttReceiver(Mqtt):
- def __init__(self):
- super().__init__(clean_session=False)
- self.database = InverterDatabase()
-
- def on_connect(self, client: mqtt.Client, userdata, flags, rc):
- super().on_connect(client, userdata, flags, rc)
- self._logger.info("subscribing to hk/#")
- client.subscribe('hk/#', qos=1)
-
- def on_message(self, client: mqtt.Client, userdata, msg):
- super().on_message(client, userdata, msg)
- try:
- match = re.match(r'(?:home|hk)/(\d+)/(status|gen)', msg.topic)
- if not match:
- return
-
- # FIXME string home_id must be supported
- home_id, what = int(match.group(1)), match.group(2)
- if what == 'gen':
- gen = Generation.unpack(msg.payload)
- self.database.add_generation(home_id, gen.time, gen.wh)
-
- elif what == 'status':
- s = Status.unpack(msg.payload)
- self.database.add_status(home_id,
- client_time=s.time,
- grid_voltage=int(s.grid_voltage*10),
- grid_freq=int(s.grid_freq * 10),
- ac_output_voltage=int(s.ac_output_voltage * 10),
- ac_output_freq=int(s.ac_output_freq * 10),
- ac_output_apparent_power=s.ac_output_apparent_power,
- ac_output_active_power=s.ac_output_active_power,
- output_load_percent=s.output_load_percent,
- battery_voltage=int(s.battery_voltage * 10),
- battery_voltage_scc=int(s.battery_voltage_scc * 10),
- battery_voltage_scc2=int(s.battery_voltage_scc2 * 10),
- battery_discharge_current=s.battery_discharge_current,
- battery_charge_current=s.battery_charge_current,
- battery_capacity=s.battery_capacity,
- inverter_heat_sink_temp=s.inverter_heat_sink_temp,
- mppt1_charger_temp=s.mppt1_charger_temp,
- mppt2_charger_temp=s.mppt2_charger_temp,
- pv1_input_power=s.pv1_input_power,
- pv2_input_power=s.pv2_input_power,
- pv1_input_voltage=int(s.pv1_input_voltage * 10),
- pv2_input_voltage=int(s.pv2_input_voltage * 10),
- mppt1_charger_status=s.mppt1_charger_status,
- mppt2_charger_status=s.mppt2_charger_status,
- battery_power_direction=s.battery_power_direction,
- dc_ac_power_direction=s.dc_ac_power_direction,
- line_power_direction=s.line_power_direction,
- load_connected=s.load_connected)
-
- except Exception as e:
- self._logger.exception(str(e))
-
-
-if __name__ == '__main__':
- config.load('inverter_mqtt_receiver')
-
- server = MqttReceiver()
- server.connect_and_loop()
-
diff --git a/src/inverter_mqtt_sender.py b/src/inverter_mqtt_sender.py
deleted file mode 100755
index 3215d18..0000000
--- a/src/inverter_mqtt_sender.py
+++ /dev/null
@@ -1,72 +0,0 @@
-#!/usr/bin/env python3
-import time
-import datetime
-import json
-import inverterd
-
-from home.config import config
-from home.mqtt import Mqtt, poll_tick
-from home.mqtt.payload.inverter import Status, Generation
-
-
-class MqttClient(Mqtt):
- def __init__(self):
- super().__init__()
-
- self._home_id = config['mqtt']['home_id']
-
- self._inverter = inverterd.Client()
- self._inverter.connect()
- self._inverter.format(inverterd.Format.SIMPLE_JSON)
-
- def poll_inverter(self):
- freq = int(config['mqtt']['inverter']['poll_freq'])
- gen_freq = int(config['mqtt']['inverter']['generation_poll_freq'])
-
- g = poll_tick(freq)
- gen_prev = 0
- while True:
- time.sleep(next(g))
-
- # read status
- now = time.time()
- try:
- raw = self._inverter.exec('get-status')
- except inverterd.InverterError as e:
- self._logger.error(f'inverter error: {str(e)}')
- # TODO send to server
- continue
-
- data = json.loads(raw)['data']
- status = Status(time=round(now), **data) # FIXME this will crash with 99% probability
-
- self._client.publish(f'hk/{self._home_id}/status',
- payload=status.pack(),
- qos=1)
-
- # read today's generation stat
- now = time.time()
- if gen_prev == 0 or now - gen_prev >= gen_freq:
- gen_prev = now
- today = datetime.date.today()
- try:
- raw = self._inverter.exec('get-day-generated', (today.year, today.month, today.day))
- except inverterd.InverterError as e:
- self._logger.error(f'inverter error: {str(e)}')
- # TODO send to server
- continue
-
- data = json.loads(raw)['data']
- gen = Generation(time=round(now), wh=data['wh'])
- self._client.publish(f'hk/{self._home_id}/gen',
- payload=gen.pack(),
- qos=1)
-
-
-if __name__ == '__main__':
- config.load('inverter_mqtt_sender')
-
- client = MqttClient()
- client.configure_tls()
- client.connect_and_loop(loop_forever=False)
- client.poll_inverter() \ No newline at end of file
diff --git a/src/inverter_mqtt_util.py b/src/inverter_mqtt_util.py
new file mode 100755
index 0000000..edea29a
--- /dev/null
+++ b/src/inverter_mqtt_util.py
@@ -0,0 +1,23 @@
+#!/usr/bin/env python3
+from argparse import ArgumentParser
+from home.config import config
+from home.mqtt import MqttWrapper, MqttNode
+
+
+if __name__ == '__main__':
+ parser = ArgumentParser()
+ parser.add_argument('mode', type=str, choices=('sender', 'receiver'), nargs=1)
+
+ config.load('inverter_mqtt_util', parser=parser)
+ arg = parser.parse_args()
+
+ mqtt = MqttWrapper()
+ node = MqttNode(node_id='inverter')
+ module_kwargs = {}
+ if arg.mode[0] == 'sender':
+ module_kwargs['status_poll_freq'] = int(config['mqtt']['inverter']['poll_freq'])
+ module_kwargs['generation_poll_freq'] = int(config['mqtt']['inverter']['generation_poll_freq'])
+ node.load_module('inverter', **module_kwargs)
+ mqtt.add_node(node)
+
+ mqtt.connect_and_loop()
diff --git a/systemd/inverter_mqtt_receiver.service b/systemd/inverter_mqtt_receiver.service
new file mode 100644
index 0000000..fedf11f
--- /dev/null
+++ b/systemd/inverter_mqtt_receiver.service
@@ -0,0 +1,13 @@
+[Unit]
+Description=Inverter MQTT receiver
+After=clickhouse-server.service
+
+[Service]
+User=user
+Group=user
+Restart=on-failure
+ExecStart=/home/user/homekit/src/inverter_mqtt_util.py receiver
+WorkingDirectory=/home/user
+
+[Install]
+WantedBy=multi-user.target \ No newline at end of file
diff --git a/systemd/inverter_mqtt_sender.service b/systemd/inverter_mqtt_sender.service
index e3925f6..34272bb 100644
--- a/systemd/inverter_mqtt_sender.service
+++ b/systemd/inverter_mqtt_sender.service
@@ -6,7 +6,7 @@ After=inverterd.service
User=user
Group=user
Restart=on-failure
-ExecStart=/home/user/homekit/src/inverter_mqtt_sender.py
+ExecStart=/home/user/homekit/src/inverter_mqtt_util.py sender
WorkingDirectory=/home/user
[Install]