summaryrefslogtreecommitdiff
path: root/src/home/inverter/emulator.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/home/inverter/emulator.py')
-rw-r--r--src/home/inverter/emulator.py556
1 files changed, 0 insertions, 556 deletions
diff --git a/src/home/inverter/emulator.py b/src/home/inverter/emulator.py
deleted file mode 100644
index e86b8bb..0000000
--- a/src/home/inverter/emulator.py
+++ /dev/null
@@ -1,556 +0,0 @@
-import asyncio
-import logging
-
-from inverterd import Format
-
-from typing import Union
-from enum import Enum
-from ..util import Addr, stringify
-
-
-class InverterEnum(Enum):
- def as_text(self) -> str:
- raise RuntimeError('abstract method')
-
-
-class BatteryType(InverterEnum):
- AGM = 0
- Flooded = 1
- User = 2
-
- def as_text(self) -> str:
- return ('AGM', 'Flooded', 'User')[self.value]
-
-
-class InputVoltageRange(InverterEnum):
- Appliance = 0
- USP = 1
-
- def as_text(self) -> str:
- return ('Appliance', 'USP')[self.value]
-
-
-class OutputSourcePriority(InverterEnum):
- SolarUtilityBattery = 0
- SolarBatteryUtility = 1
-
- def as_text(self) -> str:
- return ('Solar-Utility-Battery', 'Solar-Battery-Utility')[self.value]
-
-
-class ChargeSourcePriority(InverterEnum):
- SolarFirst = 0
- SolarAndUtility = 1
- SolarOnly = 2
-
- def as_text(self) -> str:
- return ('Solar-First', 'Solar-and-Utility', 'Solar-only')[self.value]
-
-
-class MachineType(InverterEnum):
- OffGridTie = 0
- GridTie = 1
-
- def as_text(self) -> str:
- return ('Off-Grid-Tie', 'Grid-Tie')[self.value]
-
-
-class Topology(InverterEnum):
- TransformerLess = 0
- Transformer = 1
-
- def as_text(self) -> str:
- return ('Transformer-less', 'Transformer')[self.value]
-
-
-class OutputMode(InverterEnum):
- SingleOutput = 0
- ParallelOutput = 1
- Phase_1_of_3 = 2
- Phase_2_of_3 = 3
- Phase_3_of_3 = 4
-
- def as_text(self) -> str:
- return (
- 'Single output',
- 'Parallel output',
- 'Phase 1 of 3-phase output',
- 'Phase 2 of 3-phase output',
- 'Phase 3 of 3-phase'
- )[self.value]
-
-
-class SolarPowerPriority(InverterEnum):
- BatteryLoadUtility = 0
- LoadBatteryUtility = 1
-
- def as_text(self) -> str:
- return ('Battery-Load-Utility', 'Load-Battery-Utility')[self.value]
-
-
-class MPPTChargerStatus(InverterEnum):
- Abnormal = 0
- NotCharging = 1
- Charging = 2
-
- def as_text(self) -> str:
- return ('Abnormal', 'Not charging', 'Charging')[self.value]
-
-
-class BatteryPowerDirection(InverterEnum):
- DoNothing = 0
- Charge = 1
- Discharge = 2
-
- def as_text(self) -> str:
- return ('Do nothing', 'Charge', 'Discharge')[self.value]
-
-
-class DC_AC_PowerDirection(InverterEnum):
- DoNothing = 0
- AC_DC = 1
- DC_AC = 2
-
- def as_text(self) -> str:
- return ('Do nothing', 'AC/DC', 'DC/AC')[self.value]
-
-
-class LinePowerDirection(InverterEnum):
- DoNothing = 0
- Input = 1
- Output = 2
-
- def as_text(self) -> str:
- return ('Do nothing', 'Input', 'Output')[self.value]
-
-
-class WorkingMode(InverterEnum):
- PowerOnMode = 0
- StandbyMode = 1
- BypassMode = 2
- BatteryMode = 3
- FaultMode = 4
- HybridMode = 5
-
- def as_text(self) -> str:
- return (
- 'Power on mode',
- 'Standby mode',
- 'Bypass mode',
- 'Battery mode',
- 'Fault mode',
- 'Hybrid mode'
- )[self.value]
-
-
-class ParallelConnectionStatus(InverterEnum):
- NotExistent = 0
- Existent = 1
-
- def as_text(self) -> str:
- return ('Non-existent', 'Existent')[self.value]
-
-
-class LoadConnectionStatus(InverterEnum):
- Disconnected = 0
- Connected = 1
-
- def as_text(self) -> str:
- return ('Disconnected', 'Connected')[self.value]
-
-
-class ConfigurationStatus(InverterEnum):
- Default = 0
- Changed = 1
-
- def as_text(self) -> str:
- return ('Default', 'Changed')[self.value]
-
-
-_g_human_readable = {"grid_voltage": "Grid voltage",
- "grid_freq": "Grid frequency",
- "ac_output_voltage": "AC output voltage",
- "ac_output_freq": "AC output frequency",
- "ac_output_apparent_power": "AC output apparent power",
- "ac_output_active_power": "AC output active power",
- "output_load_percent": "Output load percent",
- "battery_voltage": "Battery voltage",
- "battery_voltage_scc": "Battery voltage from SCC",
- "battery_voltage_scc2": "Battery voltage from SCC2",
- "battery_discharge_current": "Battery discharge current",
- "battery_charge_current": "Battery charge current",
- "battery_capacity": "Battery capacity",
- "inverter_heat_sink_temp": "Inverter heat sink temperature",
- "mppt1_charger_temp": "MPPT1 charger temperature",
- "mppt2_charger_temp": "MPPT2 charger temperature",
- "pv1_input_power": "PV1 input power",
- "pv2_input_power": "PV2 input power",
- "pv1_input_voltage": "PV1 input voltage",
- "pv2_input_voltage": "PV2 input voltage",
- "configuration_status": "Configuration state",
- "mppt1_charger_status": "MPPT1 charger status",
- "mppt2_charger_status": "MPPT2 charger status",
- "load_connected": "Load connection",
- "battery_power_direction": "Battery power direction",
- "dc_ac_power_direction": "DC/AC power direction",
- "line_power_direction": "Line power direction",
- "local_parallel_id": "Local parallel ID",
- "ac_input_rating_voltage": "AC input rating voltage",
- "ac_input_rating_current": "AC input rating current",
- "ac_output_rating_voltage": "AC output rating voltage",
- "ac_output_rating_freq": "AC output rating frequency",
- "ac_output_rating_current": "AC output rating current",
- "ac_output_rating_apparent_power": "AC output rating apparent power",
- "ac_output_rating_active_power": "AC output rating active power",
- "battery_rating_voltage": "Battery rating voltage",
- "battery_recharge_voltage": "Battery re-charge voltage",
- "battery_redischarge_voltage": "Battery re-discharge voltage",
- "battery_under_voltage": "Battery under voltage",
- "battery_bulk_voltage": "Battery bulk voltage",
- "battery_float_voltage": "Battery float voltage",
- "battery_type": "Battery type",
- "max_charge_current": "Max charge current",
- "max_ac_charge_current": "Max AC charge current",
- "input_voltage_range": "Input voltage range",
- "output_source_priority": "Output source priority",
- "charge_source_priority": "Charge source priority",
- "parallel_max_num": "Parallel max num",
- "machine_type": "Machine type",
- "topology": "Topology",
- "output_mode": "Output mode",
- "solar_power_priority": "Solar power priority",
- "mppt": "MPPT string",
- "fault_code": "Fault code",
- "line_fail": "Line fail",
- "output_circuit_short": "Output circuit short",
- "inverter_over_temperature": "Inverter over temperature",
- "fan_lock": "Fan lock",
- "battery_voltage_high": "Battery voltage high",
- "battery_low": "Battery low",
- "battery_under": "Battery under",
- "over_load": "Over load",
- "eeprom_fail": "EEPROM fail",
- "power_limit": "Power limit",
- "pv1_voltage_high": "PV1 voltage high",
- "pv2_voltage_high": "PV2 voltage high",
- "mppt1_overload_warning": "MPPT1 overload warning",
- "mppt2_overload_warning": "MPPT2 overload warning",
- "battery_too_low_to_charge_for_scc1": "Battery too low to charge for SCC1",
- "battery_too_low_to_charge_for_scc2": "Battery too low to charge for SCC2",
- "buzzer": "Buzzer",
- "overload_bypass": "Overload bypass function",
- "escape_to_default_screen_after_1min_timeout": "Escape to default screen after 1min timeout",
- "overload_restart": "Overload restart",
- "over_temp_restart": "Over temperature restart",
- "backlight_on": "Backlight on",
- "alarm_on_on_primary_source_interrupt": "Alarm on on primary source interrupt",
- "fault_code_record": "Fault code record",
- "wh": "Wh"}
-
-
-class InverterEmulator:
- def __init__(self, addr: Addr, wait=True):
- self.status = {"grid_voltage": {"unit": "V", "value": 236.3},
- "grid_freq": {"unit": "Hz", "value": 50.0},
- "ac_output_voltage": {"unit": "V", "value": 229.9},
- "ac_output_freq": {"unit": "Hz", "value": 50.0},
- "ac_output_apparent_power": {"unit": "VA", "value": 207},
- "ac_output_active_power": {"unit": "Wh", "value": 146},
- "output_load_percent": {"unit": "%", "value": 4},
- "battery_voltage": {"unit": "V", "value": 49.1},
- "battery_voltage_scc": {"unit": "V", "value": 0.0},
- "battery_voltage_scc2": {"unit": "V", "value": 0.0},
- "battery_discharge_current": {"unit": "A", "value": 3},
- "battery_charge_current": {"unit": "A", "value": 0},
- "battery_capacity": {"unit": "%", "value": 69},
- "inverter_heat_sink_temp": {"unit": "°C", "value": 17},
- "mppt1_charger_temp": {"unit": "°C", "value": 0},
- "mppt2_charger_temp": {"unit": "°C", "value": 0},
- "pv1_input_power": {"unit": "Wh", "value": 0},
- "pv2_input_power": {"unit": "Wh", "value": 0},
- "pv1_input_voltage": {"unit": "V", "value": 0.0},
- "pv2_input_voltage": {"unit": "V", "value": 0.0},
- "configuration_status": ConfigurationStatus.Default,
- "mppt1_charger_status": MPPTChargerStatus.Abnormal,
- "mppt2_charger_status": MPPTChargerStatus.Abnormal,
- "load_connected": LoadConnectionStatus.Connected,
- "battery_power_direction": BatteryPowerDirection.Discharge,
- "dc_ac_power_direction": DC_AC_PowerDirection.DC_AC,
- "line_power_direction": LinePowerDirection.DoNothing,
- "local_parallel_id": 0}
-
- self.rated = {"ac_input_rating_voltage": {"unit": "V", "value": 230.0},
- "ac_input_rating_current": {"unit": "A", "value": 21.7},
- "ac_output_rating_voltage": {"unit": "V", "value": 230.0},
- "ac_output_rating_freq": {"unit": "Hz", "value": 50.0},
- "ac_output_rating_current": {"unit": "A", "value": 21.7},
- "ac_output_rating_apparent_power": {"unit": "VA", "value": 5000},
- "ac_output_rating_active_power": {"unit": "Wh", "value": 5000},
- "battery_rating_voltage": {"unit": "V", "value": 48.0},
- "battery_recharge_voltage": {"unit": "V", "value": 48.0},
- "battery_redischarge_voltage": {"unit": "V", "value": 55.0},
- "battery_under_voltage": {"unit": "V", "value": 42.0},
- "battery_bulk_voltage": {"unit": "V", "value": 57.6},
- "battery_float_voltage": {"unit": "V", "value": 54.0},
- "battery_type": BatteryType.User,
- "max_charge_current": {"unit": "A", "value": 60},
- "max_ac_charge_current": {"unit": "A", "value": 30},
- "input_voltage_range": InputVoltageRange.Appliance,
- "output_source_priority": OutputSourcePriority.SolarBatteryUtility,
- "charge_source_priority": ChargeSourcePriority.SolarAndUtility,
- "parallel_max_num": 6,
- "machine_type": MachineType.OffGridTie,
- "topology": Topology.TransformerLess,
- "output_mode": OutputMode.SingleOutput,
- "solar_power_priority": SolarPowerPriority.LoadBatteryUtility,
- "mppt": "2"}
-
- self.errors = {"fault_code": 0,
- "line_fail": False,
- "output_circuit_short": False,
- "inverter_over_temperature": False,
- "fan_lock": False,
- "battery_voltage_high": False,
- "battery_low": False,
- "battery_under": False,
- "over_load": False,
- "eeprom_fail": False,
- "power_limit": False,
- "pv1_voltage_high": False,
- "pv2_voltage_high": False,
- "mppt1_overload_warning": False,
- "mppt2_overload_warning": False,
- "battery_too_low_to_charge_for_scc1": False,
- "battery_too_low_to_charge_for_scc2": False}
-
- self.flags = {"buzzer": False,
- "overload_bypass": True,
- "escape_to_default_screen_after_1min_timeout": False,
- "overload_restart": True,
- "over_temp_restart": True,
- "backlight_on": False,
- "alarm_on_on_primary_source_interrupt": True,
- "fault_code_record": False}
-
- self.day_generated = 1000
-
- self.logger = logging.getLogger(self.__class__.__name__)
-
- host, port = addr
- asyncio.run(self.run_server(host, port, wait))
- # self.max_ac_charge_current = 30
- # self.max_charge_current = 60
- # self.charge_thresholds = [48, 54]
-
- async def run_server(self, host, port, wait: bool):
- server = await asyncio.start_server(self.client_handler, host, port)
- async with server:
- self.logger.info(f'listening on {host}:{port}')
- if wait:
- await server.serve_forever()
- else:
- asyncio.ensure_future(server.serve_forever())
-
- async def client_handler(self, reader, writer):
- client_fmt = Format.JSON
-
- def w(s: str):
- writer.write(s.encode('utf-8'))
-
- def return_error(message=None):
- w('err\r\n')
- if message:
- if client_fmt in (Format.JSON, Format.SIMPLE_JSON):
- w(stringify({
- 'result': 'error',
- 'message': message
- }))
- elif client_fmt in (Format.TABLE, Format.SIMPLE_TABLE):
- w(f'error: {message}')
- w('\r\n')
- w('\r\n')
-
- def return_ok(data=None):
- w('ok\r\n')
- if client_fmt in (Format.JSON, Format.SIMPLE_JSON):
- jdata = {
- 'result': 'ok'
- }
- if data:
- jdata['data'] = data
- w(stringify(jdata))
- w('\r\n')
- elif data:
- w(data)
- w('\r\n')
- w('\r\n')
-
- request = None
- while request != 'quit':
- try:
- request = await reader.read(255)
- if request == b'\x04':
- break
- request = request.decode('utf-8').strip()
- except Exception:
- break
-
- if request.startswith('format '):
- requested_format = request[7:]
- try:
- client_fmt = Format(requested_format)
- except ValueError:
- return_error('invalid format')
-
- return_ok()
-
- elif request.startswith('exec '):
- buf = request[5:].split(' ')
- command = buf[0]
- args = buf[1:]
-
- try:
- return_ok(self.process_command(client_fmt, command, *args))
- except ValueError as e:
- return_error(str(e))
-
- else:
- return_error(f'invalid token: {request}')
-
- try:
- await writer.drain()
- except ConnectionResetError as e:
- # self.logger.exception(e)
- pass
-
- writer.close()
-
- def process_command(self, fmt: Format, c: str, *args) -> Union[dict, str, list[int], None]:
- ac_charge_currents = [2, 10, 20, 30, 40, 50, 60]
-
- if c == 'get-status':
- return self.format_dict(self.status, fmt)
-
- elif c == 'get-rated':
- return self.format_dict(self.rated, fmt)
-
- elif c == 'get-errors':
- return self.format_dict(self.errors, fmt)
-
- elif c == 'get-flags':
- return self.format_dict(self.flags, fmt)
-
- elif c == 'get-day-generated':
- return self.format_dict({'wh': 1000}, fmt)
-
- elif c == 'get-allowed-ac-charge-currents':
- return self.format_list(ac_charge_currents, fmt)
-
- elif c == 'set-max-ac-charge-current':
- if int(args[0]) != 0:
- raise ValueError(f'invalid machine id: {args[0]}')
- amps = int(args[1])
- if amps not in ac_charge_currents:
- raise ValueError(f'invalid value: {amps}')
- self.rated['max_ac_charge_current']['value'] = amps
-
- elif c == 'set-charge-thresholds':
- self.rated['battery_recharge_voltage']['value'] = float(args[0])
- self.rated['battery_redischarge_voltage']['value'] = float(args[1])
-
- elif c == 'set-output-source-priority':
- self.rated['output_source_priority'] = OutputSourcePriority.SolarBatteryUtility if args[0] == 'SBU' else OutputSourcePriority.SolarUtilityBattery
-
- elif c == 'set-battery-cutoff-voltage':
- self.rated['battery_under_voltage']['value'] = float(args[0])
-
- elif c == 'set-flag':
- flag = args[0]
- val = bool(int(args[1]))
-
- if flag == 'BUZZ':
- k = 'buzzer'
- elif flag == 'OLBP':
- k = 'overload_bypass'
- elif flag == 'LCDE':
- k = 'escape_to_default_screen_after_1min_timeout'
- elif flag == 'OLRS':
- k = 'overload_restart'
- elif flag == 'OTRS':
- k = 'over_temp_restart'
- elif flag == 'BLON':
- k = 'backlight_on'
- elif flag == 'ALRM':
- k = 'alarm_on_on_primary_source_interrupt'
- elif flag == 'FTCR':
- k = 'fault_code_record'
- else:
- raise ValueError('invalid flag')
-
- self.flags[k] = val
-
- else:
- raise ValueError(f'{c}: unsupported command')
-
- @staticmethod
- def format_list(values: list, fmt: Format) -> Union[str, list]:
- if fmt in (Format.JSON, Format.SIMPLE_JSON):
- return values
- return '\n'.join(map(lambda v: str(v), values))
-
- @staticmethod
- def format_dict(data: dict, fmt: Format) -> Union[str, dict]:
- new_data = {}
- for k, v in data.items():
- new_val = None
- if fmt in (Format.JSON, Format.TABLE, Format.SIMPLE_TABLE):
- if isinstance(v, dict):
- new_val = v
- elif isinstance(v, InverterEnum):
- new_val = v.as_text()
- else:
- new_val = v
- elif fmt == Format.SIMPLE_JSON:
- if isinstance(v, dict):
- new_val = v['value']
- elif isinstance(v, InverterEnum):
- new_val = v.value
- else:
- new_val = str(v)
- new_data[k] = new_val
-
- if fmt in (Format.JSON, Format.SIMPLE_JSON):
- return new_data
-
- lines = []
-
- if fmt == Format.SIMPLE_TABLE:
- for k, v in new_data.items():
- buf = k
- if isinstance(v, dict):
- buf += ' ' + str(v['value']) + ' ' + v['unit']
- elif isinstance(v, InverterEnum):
- buf += ' ' + v.as_text()
- else:
- buf += ' ' + str(v)
- lines.append(buf)
-
- elif fmt == Format.TABLE:
- max_k_len = 0
- for k in new_data.keys():
- if len(_g_human_readable[k]) > max_k_len:
- max_k_len = len(_g_human_readable[k])
- for k, v in new_data.items():
- buf = _g_human_readable[k] + ':'
- buf += ' ' * (max_k_len - len(_g_human_readable[k]) + 1)
- if isinstance(v, dict):
- buf += str(v['value']) + ' ' + v['unit']
- elif isinstance(v, InverterEnum):
- buf += v.as_text()
- elif isinstance(v, bool):
- buf += str(int(v))
- else:
- buf += str(v)
- lines.append(buf)
-
- return '\n'.join(lines)