diff options
author | Evgeny Zinoviev <me@ch1p.io> | 2022-11-06 20:40:42 +0300 |
---|---|---|
committer | Evgeny Zinoviev <me@ch1p.io> | 2022-11-06 20:53:55 +0300 |
commit | 75ee161b6eb64cf19c8a9718d15047443f3e4ebe (patch) | |
tree | ccebc9cbd2709ad13a14ec00372fdcfe9226cd9f /src/home/inverter | |
parent | 28c67c4510a3bee574b4077be35147dba257c8f7 (diff) |
inverter_bot: refactor and introduce new functions
Diffstat (limited to 'src/home/inverter')
-rw-r--r-- | src/home/inverter/emulator.py | 556 |
1 files changed, 556 insertions, 0 deletions
diff --git a/src/home/inverter/emulator.py b/src/home/inverter/emulator.py new file mode 100644 index 0000000..e86b8bb --- /dev/null +++ b/src/home/inverter/emulator.py @@ -0,0 +1,556 @@ +import asyncio +import logging + +from inverterd import Format + +from typing import Union +from enum import Enum +from ..util import Addr, stringify + + +class InverterEnum(Enum): + def as_text(self) -> str: + raise RuntimeError('abstract method') + + +class BatteryType(InverterEnum): + AGM = 0 + Flooded = 1 + User = 2 + + def as_text(self) -> str: + return ('AGM', 'Flooded', 'User')[self.value] + + +class InputVoltageRange(InverterEnum): + Appliance = 0 + USP = 1 + + def as_text(self) -> str: + return ('Appliance', 'USP')[self.value] + + +class OutputSourcePriority(InverterEnum): + SolarUtilityBattery = 0 + SolarBatteryUtility = 1 + + def as_text(self) -> str: + return ('Solar-Utility-Battery', 'Solar-Battery-Utility')[self.value] + + +class ChargeSourcePriority(InverterEnum): + SolarFirst = 0 + SolarAndUtility = 1 + SolarOnly = 2 + + def as_text(self) -> str: + return ('Solar-First', 'Solar-and-Utility', 'Solar-only')[self.value] + + +class MachineType(InverterEnum): + OffGridTie = 0 + GridTie = 1 + + def as_text(self) -> str: + return ('Off-Grid-Tie', 'Grid-Tie')[self.value] + + +class Topology(InverterEnum): + TransformerLess = 0 + Transformer = 1 + + def as_text(self) -> str: + return ('Transformer-less', 'Transformer')[self.value] + + +class OutputMode(InverterEnum): + SingleOutput = 0 + ParallelOutput = 1 + Phase_1_of_3 = 2 + Phase_2_of_3 = 3 + Phase_3_of_3 = 4 + + def as_text(self) -> str: + return ( + 'Single output', + 'Parallel output', + 'Phase 1 of 3-phase output', + 'Phase 2 of 3-phase output', + 'Phase 3 of 3-phase' + )[self.value] + + +class SolarPowerPriority(InverterEnum): + BatteryLoadUtility = 0 + LoadBatteryUtility = 1 + + def as_text(self) -> str: + return ('Battery-Load-Utility', 'Load-Battery-Utility')[self.value] + + +class MPPTChargerStatus(InverterEnum): + Abnormal = 0 + NotCharging = 1 + Charging = 2 + + def as_text(self) -> str: + return ('Abnormal', 'Not charging', 'Charging')[self.value] + + +class BatteryPowerDirection(InverterEnum): + DoNothing = 0 + Charge = 1 + Discharge = 2 + + def as_text(self) -> str: + return ('Do nothing', 'Charge', 'Discharge')[self.value] + + +class DC_AC_PowerDirection(InverterEnum): + DoNothing = 0 + AC_DC = 1 + DC_AC = 2 + + def as_text(self) -> str: + return ('Do nothing', 'AC/DC', 'DC/AC')[self.value] + + +class LinePowerDirection(InverterEnum): + DoNothing = 0 + Input = 1 + Output = 2 + + def as_text(self) -> str: + return ('Do nothing', 'Input', 'Output')[self.value] + + +class WorkingMode(InverterEnum): + PowerOnMode = 0 + StandbyMode = 1 + BypassMode = 2 + BatteryMode = 3 + FaultMode = 4 + HybridMode = 5 + + def as_text(self) -> str: + return ( + 'Power on mode', + 'Standby mode', + 'Bypass mode', + 'Battery mode', + 'Fault mode', + 'Hybrid mode' + )[self.value] + + +class ParallelConnectionStatus(InverterEnum): + NotExistent = 0 + Existent = 1 + + def as_text(self) -> str: + return ('Non-existent', 'Existent')[self.value] + + +class LoadConnectionStatus(InverterEnum): + Disconnected = 0 + Connected = 1 + + def as_text(self) -> str: + return ('Disconnected', 'Connected')[self.value] + + +class ConfigurationStatus(InverterEnum): + Default = 0 + Changed = 1 + + def as_text(self) -> str: + return ('Default', 'Changed')[self.value] + + +_g_human_readable = {"grid_voltage": "Grid voltage", + "grid_freq": "Grid frequency", + "ac_output_voltage": "AC output voltage", + "ac_output_freq": "AC output frequency", + "ac_output_apparent_power": "AC output apparent power", + "ac_output_active_power": "AC output active power", + "output_load_percent": "Output load percent", + "battery_voltage": "Battery voltage", + "battery_voltage_scc": "Battery voltage from SCC", + "battery_voltage_scc2": "Battery voltage from SCC2", + "battery_discharge_current": "Battery discharge current", + "battery_charge_current": "Battery charge current", + "battery_capacity": "Battery capacity", + "inverter_heat_sink_temp": "Inverter heat sink temperature", + "mppt1_charger_temp": "MPPT1 charger temperature", + "mppt2_charger_temp": "MPPT2 charger temperature", + "pv1_input_power": "PV1 input power", + "pv2_input_power": "PV2 input power", + "pv1_input_voltage": "PV1 input voltage", + "pv2_input_voltage": "PV2 input voltage", + "configuration_status": "Configuration state", + "mppt1_charger_status": "MPPT1 charger status", + "mppt2_charger_status": "MPPT2 charger status", + "load_connected": "Load connection", + "battery_power_direction": "Battery power direction", + "dc_ac_power_direction": "DC/AC power direction", + "line_power_direction": "Line power direction", + "local_parallel_id": "Local parallel ID", + "ac_input_rating_voltage": "AC input rating voltage", + "ac_input_rating_current": "AC input rating current", + "ac_output_rating_voltage": "AC output rating voltage", + "ac_output_rating_freq": "AC output rating frequency", + "ac_output_rating_current": "AC output rating current", + "ac_output_rating_apparent_power": "AC output rating apparent power", + "ac_output_rating_active_power": "AC output rating active power", + "battery_rating_voltage": "Battery rating voltage", + "battery_recharge_voltage": "Battery re-charge voltage", + "battery_redischarge_voltage": "Battery re-discharge voltage", + "battery_under_voltage": "Battery under voltage", + "battery_bulk_voltage": "Battery bulk voltage", + "battery_float_voltage": "Battery float voltage", + "battery_type": "Battery type", + "max_charge_current": "Max charge current", + "max_ac_charge_current": "Max AC charge current", + "input_voltage_range": "Input voltage range", + "output_source_priority": "Output source priority", + "charge_source_priority": "Charge source priority", + "parallel_max_num": "Parallel max num", + "machine_type": "Machine type", + "topology": "Topology", + "output_mode": "Output mode", + "solar_power_priority": "Solar power priority", + "mppt": "MPPT string", + "fault_code": "Fault code", + "line_fail": "Line fail", + "output_circuit_short": "Output circuit short", + "inverter_over_temperature": "Inverter over temperature", + "fan_lock": "Fan lock", + "battery_voltage_high": "Battery voltage high", + "battery_low": "Battery low", + "battery_under": "Battery under", + "over_load": "Over load", + "eeprom_fail": "EEPROM fail", + "power_limit": "Power limit", + "pv1_voltage_high": "PV1 voltage high", + "pv2_voltage_high": "PV2 voltage high", + "mppt1_overload_warning": "MPPT1 overload warning", + "mppt2_overload_warning": "MPPT2 overload warning", + "battery_too_low_to_charge_for_scc1": "Battery too low to charge for SCC1", + "battery_too_low_to_charge_for_scc2": "Battery too low to charge for SCC2", + "buzzer": "Buzzer", + "overload_bypass": "Overload bypass function", + "escape_to_default_screen_after_1min_timeout": "Escape to default screen after 1min timeout", + "overload_restart": "Overload restart", + "over_temp_restart": "Over temperature restart", + "backlight_on": "Backlight on", + "alarm_on_on_primary_source_interrupt": "Alarm on on primary source interrupt", + "fault_code_record": "Fault code record", + "wh": "Wh"} + + +class InverterEmulator: + def __init__(self, addr: Addr, wait=True): + self.status = {"grid_voltage": {"unit": "V", "value": 236.3}, + "grid_freq": {"unit": "Hz", "value": 50.0}, + "ac_output_voltage": {"unit": "V", "value": 229.9}, + "ac_output_freq": {"unit": "Hz", "value": 50.0}, + "ac_output_apparent_power": {"unit": "VA", "value": 207}, + "ac_output_active_power": {"unit": "Wh", "value": 146}, + "output_load_percent": {"unit": "%", "value": 4}, + "battery_voltage": {"unit": "V", "value": 49.1}, + "battery_voltage_scc": {"unit": "V", "value": 0.0}, + "battery_voltage_scc2": {"unit": "V", "value": 0.0}, + "battery_discharge_current": {"unit": "A", "value": 3}, + "battery_charge_current": {"unit": "A", "value": 0}, + "battery_capacity": {"unit": "%", "value": 69}, + "inverter_heat_sink_temp": {"unit": "°C", "value": 17}, + "mppt1_charger_temp": {"unit": "°C", "value": 0}, + "mppt2_charger_temp": {"unit": "°C", "value": 0}, + "pv1_input_power": {"unit": "Wh", "value": 0}, + "pv2_input_power": {"unit": "Wh", "value": 0}, + "pv1_input_voltage": {"unit": "V", "value": 0.0}, + "pv2_input_voltage": {"unit": "V", "value": 0.0}, + "configuration_status": ConfigurationStatus.Default, + "mppt1_charger_status": MPPTChargerStatus.Abnormal, + "mppt2_charger_status": MPPTChargerStatus.Abnormal, + "load_connected": LoadConnectionStatus.Connected, + "battery_power_direction": BatteryPowerDirection.Discharge, + "dc_ac_power_direction": DC_AC_PowerDirection.DC_AC, + "line_power_direction": LinePowerDirection.DoNothing, + "local_parallel_id": 0} + + self.rated = {"ac_input_rating_voltage": {"unit": "V", "value": 230.0}, + "ac_input_rating_current": {"unit": "A", "value": 21.7}, + "ac_output_rating_voltage": {"unit": "V", "value": 230.0}, + "ac_output_rating_freq": {"unit": "Hz", "value": 50.0}, + "ac_output_rating_current": {"unit": "A", "value": 21.7}, + "ac_output_rating_apparent_power": {"unit": "VA", "value": 5000}, + "ac_output_rating_active_power": {"unit": "Wh", "value": 5000}, + "battery_rating_voltage": {"unit": "V", "value": 48.0}, + "battery_recharge_voltage": {"unit": "V", "value": 48.0}, + "battery_redischarge_voltage": {"unit": "V", "value": 55.0}, + "battery_under_voltage": {"unit": "V", "value": 42.0}, + "battery_bulk_voltage": {"unit": "V", "value": 57.6}, + "battery_float_voltage": {"unit": "V", "value": 54.0}, + "battery_type": BatteryType.User, + "max_charge_current": {"unit": "A", "value": 60}, + "max_ac_charge_current": {"unit": "A", "value": 30}, + "input_voltage_range": InputVoltageRange.Appliance, + "output_source_priority": OutputSourcePriority.SolarBatteryUtility, + "charge_source_priority": ChargeSourcePriority.SolarAndUtility, + "parallel_max_num": 6, + "machine_type": MachineType.OffGridTie, + "topology": Topology.TransformerLess, + "output_mode": OutputMode.SingleOutput, + "solar_power_priority": SolarPowerPriority.LoadBatteryUtility, + "mppt": "2"} + + self.errors = {"fault_code": 0, + "line_fail": False, + "output_circuit_short": False, + "inverter_over_temperature": False, + "fan_lock": False, + "battery_voltage_high": False, + "battery_low": False, + "battery_under": False, + "over_load": False, + "eeprom_fail": False, + "power_limit": False, + "pv1_voltage_high": False, + "pv2_voltage_high": False, + "mppt1_overload_warning": False, + "mppt2_overload_warning": False, + "battery_too_low_to_charge_for_scc1": False, + "battery_too_low_to_charge_for_scc2": False} + + self.flags = {"buzzer": False, + "overload_bypass": True, + "escape_to_default_screen_after_1min_timeout": False, + "overload_restart": True, + "over_temp_restart": True, + "backlight_on": False, + "alarm_on_on_primary_source_interrupt": True, + "fault_code_record": False} + + self.day_generated = 1000 + + self.logger = logging.getLogger(self.__class__.__name__) + + host, port = addr + asyncio.run(self.run_server(host, port, wait)) + # self.max_ac_charge_current = 30 + # self.max_charge_current = 60 + # self.charge_thresholds = [48, 54] + + async def run_server(self, host, port, wait: bool): + server = await asyncio.start_server(self.client_handler, host, port) + async with server: + self.logger.info(f'listening on {host}:{port}') + if wait: + await server.serve_forever() + else: + asyncio.ensure_future(server.serve_forever()) + + async def client_handler(self, reader, writer): + client_fmt = Format.JSON + + def w(s: str): + writer.write(s.encode('utf-8')) + + def return_error(message=None): + w('err\r\n') + if message: + if client_fmt in (Format.JSON, Format.SIMPLE_JSON): + w(stringify({ + 'result': 'error', + 'message': message + })) + elif client_fmt in (Format.TABLE, Format.SIMPLE_TABLE): + w(f'error: {message}') + w('\r\n') + w('\r\n') + + def return_ok(data=None): + w('ok\r\n') + if client_fmt in (Format.JSON, Format.SIMPLE_JSON): + jdata = { + 'result': 'ok' + } + if data: + jdata['data'] = data + w(stringify(jdata)) + w('\r\n') + elif data: + w(data) + w('\r\n') + w('\r\n') + + request = None + while request != 'quit': + try: + request = await reader.read(255) + if request == b'\x04': + break + request = request.decode('utf-8').strip() + except Exception: + break + + if request.startswith('format '): + requested_format = request[7:] + try: + client_fmt = Format(requested_format) + except ValueError: + return_error('invalid format') + + return_ok() + + elif request.startswith('exec '): + buf = request[5:].split(' ') + command = buf[0] + args = buf[1:] + + try: + return_ok(self.process_command(client_fmt, command, *args)) + except ValueError as e: + return_error(str(e)) + + else: + return_error(f'invalid token: {request}') + + try: + await writer.drain() + except ConnectionResetError as e: + # self.logger.exception(e) + pass + + writer.close() + + def process_command(self, fmt: Format, c: str, *args) -> Union[dict, str, list[int], None]: + ac_charge_currents = [2, 10, 20, 30, 40, 50, 60] + + if c == 'get-status': + return self.format_dict(self.status, fmt) + + elif c == 'get-rated': + return self.format_dict(self.rated, fmt) + + elif c == 'get-errors': + return self.format_dict(self.errors, fmt) + + elif c == 'get-flags': + return self.format_dict(self.flags, fmt) + + elif c == 'get-day-generated': + return self.format_dict({'wh': 1000}, fmt) + + elif c == 'get-allowed-ac-charge-currents': + return self.format_list(ac_charge_currents, fmt) + + elif c == 'set-max-ac-charge-current': + if int(args[0]) != 0: + raise ValueError(f'invalid machine id: {args[0]}') + amps = int(args[1]) + if amps not in ac_charge_currents: + raise ValueError(f'invalid value: {amps}') + self.rated['max_ac_charge_current']['value'] = amps + + elif c == 'set-charge-thresholds': + self.rated['battery_recharge_voltage']['value'] = float(args[0]) + self.rated['battery_redischarge_voltage']['value'] = float(args[1]) + + elif c == 'set-output-source-priority': + self.rated['output_source_priority'] = OutputSourcePriority.SolarBatteryUtility if args[0] == 'SBU' else OutputSourcePriority.SolarUtilityBattery + + elif c == 'set-battery-cutoff-voltage': + self.rated['battery_under_voltage']['value'] = float(args[0]) + + elif c == 'set-flag': + flag = args[0] + val = bool(int(args[1])) + + if flag == 'BUZZ': + k = 'buzzer' + elif flag == 'OLBP': + k = 'overload_bypass' + elif flag == 'LCDE': + k = 'escape_to_default_screen_after_1min_timeout' + elif flag == 'OLRS': + k = 'overload_restart' + elif flag == 'OTRS': + k = 'over_temp_restart' + elif flag == 'BLON': + k = 'backlight_on' + elif flag == 'ALRM': + k = 'alarm_on_on_primary_source_interrupt' + elif flag == 'FTCR': + k = 'fault_code_record' + else: + raise ValueError('invalid flag') + + self.flags[k] = val + + else: + raise ValueError(f'{c}: unsupported command') + + @staticmethod + def format_list(values: list, fmt: Format) -> Union[str, list]: + if fmt in (Format.JSON, Format.SIMPLE_JSON): + return values + return '\n'.join(map(lambda v: str(v), values)) + + @staticmethod + def format_dict(data: dict, fmt: Format) -> Union[str, dict]: + new_data = {} + for k, v in data.items(): + new_val = None + if fmt in (Format.JSON, Format.TABLE, Format.SIMPLE_TABLE): + if isinstance(v, dict): + new_val = v + elif isinstance(v, InverterEnum): + new_val = v.as_text() + else: + new_val = v + elif fmt == Format.SIMPLE_JSON: + if isinstance(v, dict): + new_val = v['value'] + elif isinstance(v, InverterEnum): + new_val = v.value + else: + new_val = str(v) + new_data[k] = new_val + + if fmt in (Format.JSON, Format.SIMPLE_JSON): + return new_data + + lines = [] + + if fmt == Format.SIMPLE_TABLE: + for k, v in new_data.items(): + buf = k + if isinstance(v, dict): + buf += ' ' + str(v['value']) + ' ' + v['unit'] + elif isinstance(v, InverterEnum): + buf += ' ' + v.as_text() + else: + buf += ' ' + str(v) + lines.append(buf) + + elif fmt == Format.TABLE: + max_k_len = 0 + for k in new_data.keys(): + if len(_g_human_readable[k]) > max_k_len: + max_k_len = len(_g_human_readable[k]) + for k, v in new_data.items(): + buf = _g_human_readable[k] + ':' + buf += ' ' * (max_k_len - len(_g_human_readable[k]) + 1) + if isinstance(v, dict): + buf += str(v['value']) + ' ' + v['unit'] + elif isinstance(v, InverterEnum): + buf += v.as_text() + elif isinstance(v, bool): + buf += str(int(v)) + else: + buf += str(v) + lines.append(buf) + + return '\n'.join(lines) |