import socket from enum import Enum class Format(Enum): JSON = 'json' SIMPLE_JSON = 'simple-json' TABLE = 'table' SIMPLE_TABLE = 'simple-table' class Client: def __init__(self, port=8305, host='127.0.0.1'): self._host = host self._port = port self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) def __del__(self): self.sock.close() def connect(self): self.sock.connect((self._host, self._port)) def _write(self, line): self.sock.sendall((line+'\r\n').encode()) def _read(self): buf = bytearray() while True: buf.extend(self.sock.recv(256)) if b'\r\n\r\n' in buf: break response = buf.decode().strip().split("\r\n") status = response.pop(0) if status not in ('ok', 'err'): raise InverterError(f"Unexpected status '{status}'") if status == 'err': raise InverterError(response[0] if len(response) >= 0 else "Unknown error") return '\r\n'.join(response) def protocol(self, v): self._write(f'v {v}') return self._read() def format(self, fmt): if isinstance(fmt, Format): fmt = fmt.value self._write(f'format {fmt}') return self._read() def exec(self, command, arguments=()): buf = f'exec {command}' for arg in arguments: buf += f' {arg}' self._write(buf) return self._read() class InverterError(RuntimeError): pass