diff options
Diffstat (limited to 'src/home/inverter/emulator.py')
-rw-r--r-- | src/home/inverter/emulator.py | 556 |
1 files changed, 0 insertions, 556 deletions
diff --git a/src/home/inverter/emulator.py b/src/home/inverter/emulator.py deleted file mode 100644 index e86b8bb..0000000 --- a/src/home/inverter/emulator.py +++ /dev/null @@ -1,556 +0,0 @@ -import asyncio -import logging - -from inverterd import Format - -from typing import Union -from enum import Enum -from ..util import Addr, stringify - - -class InverterEnum(Enum): - def as_text(self) -> str: - raise RuntimeError('abstract method') - - -class BatteryType(InverterEnum): - AGM = 0 - Flooded = 1 - User = 2 - - def as_text(self) -> str: - return ('AGM', 'Flooded', 'User')[self.value] - - -class InputVoltageRange(InverterEnum): - Appliance = 0 - USP = 1 - - def as_text(self) -> str: - return ('Appliance', 'USP')[self.value] - - -class OutputSourcePriority(InverterEnum): - SolarUtilityBattery = 0 - SolarBatteryUtility = 1 - - def as_text(self) -> str: - return ('Solar-Utility-Battery', 'Solar-Battery-Utility')[self.value] - - -class ChargeSourcePriority(InverterEnum): - SolarFirst = 0 - SolarAndUtility = 1 - SolarOnly = 2 - - def as_text(self) -> str: - return ('Solar-First', 'Solar-and-Utility', 'Solar-only')[self.value] - - -class MachineType(InverterEnum): - OffGridTie = 0 - GridTie = 1 - - def as_text(self) -> str: - return ('Off-Grid-Tie', 'Grid-Tie')[self.value] - - -class Topology(InverterEnum): - TransformerLess = 0 - Transformer = 1 - - def as_text(self) -> str: - return ('Transformer-less', 'Transformer')[self.value] - - -class OutputMode(InverterEnum): - SingleOutput = 0 - ParallelOutput = 1 - Phase_1_of_3 = 2 - Phase_2_of_3 = 3 - Phase_3_of_3 = 4 - - def as_text(self) -> str: - return ( - 'Single output', - 'Parallel output', - 'Phase 1 of 3-phase output', - 'Phase 2 of 3-phase output', - 'Phase 3 of 3-phase' - )[self.value] - - -class SolarPowerPriority(InverterEnum): - BatteryLoadUtility = 0 - LoadBatteryUtility = 1 - - def as_text(self) -> str: - return ('Battery-Load-Utility', 'Load-Battery-Utility')[self.value] - - -class MPPTChargerStatus(InverterEnum): - Abnormal = 0 - NotCharging = 1 - Charging = 2 - - def as_text(self) -> str: - return ('Abnormal', 'Not charging', 'Charging')[self.value] - - -class BatteryPowerDirection(InverterEnum): - DoNothing = 0 - Charge = 1 - Discharge = 2 - - def as_text(self) -> str: - return ('Do nothing', 'Charge', 'Discharge')[self.value] - - -class DC_AC_PowerDirection(InverterEnum): - DoNothing = 0 - AC_DC = 1 - DC_AC = 2 - - def as_text(self) -> str: - return ('Do nothing', 'AC/DC', 'DC/AC')[self.value] - - -class LinePowerDirection(InverterEnum): - DoNothing = 0 - Input = 1 - Output = 2 - - def as_text(self) -> str: - return ('Do nothing', 'Input', 'Output')[self.value] - - -class WorkingMode(InverterEnum): - PowerOnMode = 0 - StandbyMode = 1 - BypassMode = 2 - BatteryMode = 3 - FaultMode = 4 - HybridMode = 5 - - def as_text(self) -> str: - return ( - 'Power on mode', - 'Standby mode', - 'Bypass mode', - 'Battery mode', - 'Fault mode', - 'Hybrid mode' - )[self.value] - - -class ParallelConnectionStatus(InverterEnum): - NotExistent = 0 - Existent = 1 - - def as_text(self) -> str: - return ('Non-existent', 'Existent')[self.value] - - -class LoadConnectionStatus(InverterEnum): - Disconnected = 0 - Connected = 1 - - def as_text(self) -> str: - return ('Disconnected', 'Connected')[self.value] - - -class ConfigurationStatus(InverterEnum): - Default = 0 - Changed = 1 - - def as_text(self) -> str: - return ('Default', 'Changed')[self.value] - - -_g_human_readable = {"grid_voltage": "Grid voltage", - "grid_freq": "Grid frequency", - "ac_output_voltage": "AC output voltage", - "ac_output_freq": "AC output frequency", - "ac_output_apparent_power": "AC output apparent power", - "ac_output_active_power": "AC output active power", - "output_load_percent": "Output load percent", - "battery_voltage": "Battery voltage", - "battery_voltage_scc": "Battery voltage from SCC", - "battery_voltage_scc2": "Battery voltage from SCC2", - "battery_discharge_current": "Battery discharge current", - "battery_charge_current": "Battery charge current", - "battery_capacity": "Battery capacity", - "inverter_heat_sink_temp": "Inverter heat sink temperature", - "mppt1_charger_temp": "MPPT1 charger temperature", - "mppt2_charger_temp": "MPPT2 charger temperature", - "pv1_input_power": "PV1 input power", - "pv2_input_power": "PV2 input power", - "pv1_input_voltage": "PV1 input voltage", - "pv2_input_voltage": "PV2 input voltage", - "configuration_status": "Configuration state", - "mppt1_charger_status": "MPPT1 charger status", - "mppt2_charger_status": "MPPT2 charger status", - "load_connected": "Load connection", - "battery_power_direction": "Battery power direction", - "dc_ac_power_direction": "DC/AC power direction", - "line_power_direction": "Line power direction", - "local_parallel_id": "Local parallel ID", - "ac_input_rating_voltage": "AC input rating voltage", - "ac_input_rating_current": "AC input rating current", - "ac_output_rating_voltage": "AC output rating voltage", - "ac_output_rating_freq": "AC output rating frequency", - "ac_output_rating_current": "AC output rating current", - "ac_output_rating_apparent_power": "AC output rating apparent power", - "ac_output_rating_active_power": "AC output rating active power", - "battery_rating_voltage": "Battery rating voltage", - "battery_recharge_voltage": "Battery re-charge voltage", - "battery_redischarge_voltage": "Battery re-discharge voltage", - "battery_under_voltage": "Battery under voltage", - "battery_bulk_voltage": "Battery bulk voltage", - "battery_float_voltage": "Battery float voltage", - "battery_type": "Battery type", - "max_charge_current": "Max charge current", - "max_ac_charge_current": "Max AC charge current", - "input_voltage_range": "Input voltage range", - "output_source_priority": "Output source priority", - "charge_source_priority": "Charge source priority", - "parallel_max_num": "Parallel max num", - "machine_type": "Machine type", - "topology": "Topology", - "output_mode": "Output mode", - "solar_power_priority": "Solar power priority", - "mppt": "MPPT string", - "fault_code": "Fault code", - "line_fail": "Line fail", - "output_circuit_short": "Output circuit short", - "inverter_over_temperature": "Inverter over temperature", - "fan_lock": "Fan lock", - "battery_voltage_high": "Battery voltage high", - "battery_low": "Battery low", - "battery_under": "Battery under", - "over_load": "Over load", - "eeprom_fail": "EEPROM fail", - "power_limit": "Power limit", - "pv1_voltage_high": "PV1 voltage high", - "pv2_voltage_high": "PV2 voltage high", - "mppt1_overload_warning": "MPPT1 overload warning", - "mppt2_overload_warning": "MPPT2 overload warning", - "battery_too_low_to_charge_for_scc1": "Battery too low to charge for SCC1", - "battery_too_low_to_charge_for_scc2": "Battery too low to charge for SCC2", - "buzzer": "Buzzer", - "overload_bypass": "Overload bypass function", - "escape_to_default_screen_after_1min_timeout": "Escape to default screen after 1min timeout", - "overload_restart": "Overload restart", - "over_temp_restart": "Over temperature restart", - "backlight_on": "Backlight on", - "alarm_on_on_primary_source_interrupt": "Alarm on on primary source interrupt", - "fault_code_record": "Fault code record", - "wh": "Wh"} - - -class InverterEmulator: - def __init__(self, addr: Addr, wait=True): - self.status = {"grid_voltage": {"unit": "V", "value": 236.3}, - "grid_freq": {"unit": "Hz", "value": 50.0}, - "ac_output_voltage": {"unit": "V", "value": 229.9}, - "ac_output_freq": {"unit": "Hz", "value": 50.0}, - "ac_output_apparent_power": {"unit": "VA", "value": 207}, - "ac_output_active_power": {"unit": "Wh", "value": 146}, - "output_load_percent": {"unit": "%", "value": 4}, - "battery_voltage": {"unit": "V", "value": 49.1}, - "battery_voltage_scc": {"unit": "V", "value": 0.0}, - "battery_voltage_scc2": {"unit": "V", "value": 0.0}, - "battery_discharge_current": {"unit": "A", "value": 3}, - "battery_charge_current": {"unit": "A", "value": 0}, - "battery_capacity": {"unit": "%", "value": 69}, - "inverter_heat_sink_temp": {"unit": "°C", "value": 17}, - "mppt1_charger_temp": {"unit": "°C", "value": 0}, - "mppt2_charger_temp": {"unit": "°C", "value": 0}, - "pv1_input_power": {"unit": "Wh", "value": 0}, - "pv2_input_power": {"unit": "Wh", "value": 0}, - "pv1_input_voltage": {"unit": "V", "value": 0.0}, - "pv2_input_voltage": {"unit": "V", "value": 0.0}, - "configuration_status": ConfigurationStatus.Default, - "mppt1_charger_status": MPPTChargerStatus.Abnormal, - "mppt2_charger_status": MPPTChargerStatus.Abnormal, - "load_connected": LoadConnectionStatus.Connected, - "battery_power_direction": BatteryPowerDirection.Discharge, - "dc_ac_power_direction": DC_AC_PowerDirection.DC_AC, - "line_power_direction": LinePowerDirection.DoNothing, - "local_parallel_id": 0} - - self.rated = {"ac_input_rating_voltage": {"unit": "V", "value": 230.0}, - "ac_input_rating_current": {"unit": "A", "value": 21.7}, - "ac_output_rating_voltage": {"unit": "V", "value": 230.0}, - "ac_output_rating_freq": {"unit": "Hz", "value": 50.0}, - "ac_output_rating_current": {"unit": "A", "value": 21.7}, - "ac_output_rating_apparent_power": {"unit": "VA", "value": 5000}, - "ac_output_rating_active_power": {"unit": "Wh", "value": 5000}, - "battery_rating_voltage": {"unit": "V", "value": 48.0}, - "battery_recharge_voltage": {"unit": "V", "value": 48.0}, - "battery_redischarge_voltage": {"unit": "V", "value": 55.0}, - "battery_under_voltage": {"unit": "V", "value": 42.0}, - "battery_bulk_voltage": {"unit": "V", "value": 57.6}, - "battery_float_voltage": {"unit": "V", "value": 54.0}, - "battery_type": BatteryType.User, - "max_charge_current": {"unit": "A", "value": 60}, - "max_ac_charge_current": {"unit": "A", "value": 30}, - "input_voltage_range": InputVoltageRange.Appliance, - "output_source_priority": OutputSourcePriority.SolarBatteryUtility, - "charge_source_priority": ChargeSourcePriority.SolarAndUtility, - "parallel_max_num": 6, - "machine_type": MachineType.OffGridTie, - "topology": Topology.TransformerLess, - "output_mode": OutputMode.SingleOutput, - "solar_power_priority": SolarPowerPriority.LoadBatteryUtility, - "mppt": "2"} - - self.errors = {"fault_code": 0, - "line_fail": False, - "output_circuit_short": False, - "inverter_over_temperature": False, - "fan_lock": False, - "battery_voltage_high": False, - "battery_low": False, - "battery_under": False, - "over_load": False, - "eeprom_fail": False, - "power_limit": False, - "pv1_voltage_high": False, - "pv2_voltage_high": False, - "mppt1_overload_warning": False, - "mppt2_overload_warning": False, - "battery_too_low_to_charge_for_scc1": False, - "battery_too_low_to_charge_for_scc2": False} - - self.flags = {"buzzer": False, - "overload_bypass": True, - "escape_to_default_screen_after_1min_timeout": False, - "overload_restart": True, - "over_temp_restart": True, - "backlight_on": False, - "alarm_on_on_primary_source_interrupt": True, - "fault_code_record": False} - - self.day_generated = 1000 - - self.logger = logging.getLogger(self.__class__.__name__) - - host, port = addr - asyncio.run(self.run_server(host, port, wait)) - # self.max_ac_charge_current = 30 - # self.max_charge_current = 60 - # self.charge_thresholds = [48, 54] - - async def run_server(self, host, port, wait: bool): - server = await asyncio.start_server(self.client_handler, host, port) - async with server: - self.logger.info(f'listening on {host}:{port}') - if wait: - await server.serve_forever() - else: - asyncio.ensure_future(server.serve_forever()) - - async def client_handler(self, reader, writer): - client_fmt = Format.JSON - - def w(s: str): - writer.write(s.encode('utf-8')) - - def return_error(message=None): - w('err\r\n') - if message: - if client_fmt in (Format.JSON, Format.SIMPLE_JSON): - w(stringify({ - 'result': 'error', - 'message': message - })) - elif client_fmt in (Format.TABLE, Format.SIMPLE_TABLE): - w(f'error: {message}') - w('\r\n') - w('\r\n') - - def return_ok(data=None): - w('ok\r\n') - if client_fmt in (Format.JSON, Format.SIMPLE_JSON): - jdata = { - 'result': 'ok' - } - if data: - jdata['data'] = data - w(stringify(jdata)) - w('\r\n') - elif data: - w(data) - w('\r\n') - w('\r\n') - - request = None - while request != 'quit': - try: - request = await reader.read(255) - if request == b'\x04': - break - request = request.decode('utf-8').strip() - except Exception: - break - - if request.startswith('format '): - requested_format = request[7:] - try: - client_fmt = Format(requested_format) - except ValueError: - return_error('invalid format') - - return_ok() - - elif request.startswith('exec '): - buf = request[5:].split(' ') - command = buf[0] - args = buf[1:] - - try: - return_ok(self.process_command(client_fmt, command, *args)) - except ValueError as e: - return_error(str(e)) - - else: - return_error(f'invalid token: {request}') - - try: - await writer.drain() - except ConnectionResetError as e: - # self.logger.exception(e) - pass - - writer.close() - - def process_command(self, fmt: Format, c: str, *args) -> Union[dict, str, list[int], None]: - ac_charge_currents = [2, 10, 20, 30, 40, 50, 60] - - if c == 'get-status': - return self.format_dict(self.status, fmt) - - elif c == 'get-rated': - return self.format_dict(self.rated, fmt) - - elif c == 'get-errors': - return self.format_dict(self.errors, fmt) - - elif c == 'get-flags': - return self.format_dict(self.flags, fmt) - - elif c == 'get-day-generated': - return self.format_dict({'wh': 1000}, fmt) - - elif c == 'get-allowed-ac-charge-currents': - return self.format_list(ac_charge_currents, fmt) - - elif c == 'set-max-ac-charge-current': - if int(args[0]) != 0: - raise ValueError(f'invalid machine id: {args[0]}') - amps = int(args[1]) - if amps not in ac_charge_currents: - raise ValueError(f'invalid value: {amps}') - self.rated['max_ac_charge_current']['value'] = amps - - elif c == 'set-charge-thresholds': - self.rated['battery_recharge_voltage']['value'] = float(args[0]) - self.rated['battery_redischarge_voltage']['value'] = float(args[1]) - - elif c == 'set-output-source-priority': - self.rated['output_source_priority'] = OutputSourcePriority.SolarBatteryUtility if args[0] == 'SBU' else OutputSourcePriority.SolarUtilityBattery - - elif c == 'set-battery-cutoff-voltage': - self.rated['battery_under_voltage']['value'] = float(args[0]) - - elif c == 'set-flag': - flag = args[0] - val = bool(int(args[1])) - - if flag == 'BUZZ': - k = 'buzzer' - elif flag == 'OLBP': - k = 'overload_bypass' - elif flag == 'LCDE': - k = 'escape_to_default_screen_after_1min_timeout' - elif flag == 'OLRS': - k = 'overload_restart' - elif flag == 'OTRS': - k = 'over_temp_restart' - elif flag == 'BLON': - k = 'backlight_on' - elif flag == 'ALRM': - k = 'alarm_on_on_primary_source_interrupt' - elif flag == 'FTCR': - k = 'fault_code_record' - else: - raise ValueError('invalid flag') - - self.flags[k] = val - - else: - raise ValueError(f'{c}: unsupported command') - - @staticmethod - def format_list(values: list, fmt: Format) -> Union[str, list]: - if fmt in (Format.JSON, Format.SIMPLE_JSON): - return values - return '\n'.join(map(lambda v: str(v), values)) - - @staticmethod - def format_dict(data: dict, fmt: Format) -> Union[str, dict]: - new_data = {} - for k, v in data.items(): - new_val = None - if fmt in (Format.JSON, Format.TABLE, Format.SIMPLE_TABLE): - if isinstance(v, dict): - new_val = v - elif isinstance(v, InverterEnum): - new_val = v.as_text() - else: - new_val = v - elif fmt == Format.SIMPLE_JSON: - if isinstance(v, dict): - new_val = v['value'] - elif isinstance(v, InverterEnum): - new_val = v.value - else: - new_val = str(v) - new_data[k] = new_val - - if fmt in (Format.JSON, Format.SIMPLE_JSON): - return new_data - - lines = [] - - if fmt == Format.SIMPLE_TABLE: - for k, v in new_data.items(): - buf = k - if isinstance(v, dict): - buf += ' ' + str(v['value']) + ' ' + v['unit'] - elif isinstance(v, InverterEnum): - buf += ' ' + v.as_text() - else: - buf += ' ' + str(v) - lines.append(buf) - - elif fmt == Format.TABLE: - max_k_len = 0 - for k in new_data.keys(): - if len(_g_human_readable[k]) > max_k_len: - max_k_len = len(_g_human_readable[k]) - for k, v in new_data.items(): - buf = _g_human_readable[k] + ':' - buf += ' ' * (max_k_len - len(_g_human_readable[k]) + 1) - if isinstance(v, dict): - buf += str(v['value']) + ' ' + v['unit'] - elif isinstance(v, InverterEnum): - buf += v.as_text() - elif isinstance(v, bool): - buf += str(int(v)) - else: - buf += str(v) - lines.append(buf) - - return '\n'.join(lines) |