#!/usr/bin/env python3
"""Telegram bot that reports the state of an inverter served by inverterd.

Whitelisted users get a reply keyboard with status/generation/table buttons
and a few ``/set...`` commands that change inverter settings.
"""
import logging
import re
import datetime
import json

from inverterd import Format, Client as InverterClient, InverterError
from typing import Optional
from argparse import ArgumentParser
from html import escape
from pprint import pprint
from time import sleep
from strings import lang as _
from telegram import (
    Update,
    ParseMode,
    KeyboardButton,
    ReplyKeyboardMarkup
)
from telegram.ext import (
    Updater,
    Filters,
    CommandHandler,
    MessageHandler,
    CallbackContext
)
from telegram.error import TimedOut


class InverterClientWrapper:
    """Wrapper around the inverterd client that reconnects after a failure."""

    def __init__(self, host: str, port: int):
        """Remember the endpoint and connect to it immediately.

        :param host: inverterd host
        :param port: inverterd port (the command line passes an int)
        """
        self._host = host
        self._port = port
        self._inverter = None

        self.create()

    def create(self):
        """Create a fresh client for the stored endpoint and connect it."""
        self._inverter = InverterClient(host=self._host, port=self._port)
        self._inverter.connect()

    def exec(self, command: str, arguments: tuple = (), format=Format.JSON):
        """Run ``command`` on the inverter and return the raw response.

        :param command: inverterd command name
        :param arguments: positional arguments for the command
        :param format: response format requested before executing
        :raises InverterError: re-raised unchanged, without reconnecting
        :raises Exception: any other failure is re-raised after one
            best-effort reconnect attempt
        """
        try:
            self._inverter.format(format)
            return self._inverter.exec(command, arguments)
        except InverterError:
            raise
        except Exception:
            # Best effort: try to have a fresh connection ready for the next
            # call. The original error is what the caller gets either way.
            try:
                self.create()
            except Exception:
                logging.warning('failed to reconnect to inverterd', exc_info=True)
            raise


inverter: Optional[InverterClientWrapper] = None


#
# helpers
#

def get_markup() -> ReplyKeyboardMarkup:
    """Build the persistent reply keyboard with all message buttons."""
    button = [
        [
            _('status'),
            _('generation')
        ],
        [
            _('gs'),
            _('ri'),
            _('errors')
        ]
    ]
    return ReplyKeyboardMarkup(button, one_time_keyboard=False)


def reply(update: Update, text: str) -> None:
    """Reply to the incoming message with HTML-parsed ``text`` and the keyboard."""
    update.message.reply_text(text,
                              reply_markup=get_markup(),
                              parse_mode=ParseMode.HTML)


def handle_exc(update: Update, e) -> None:
    """Log exception ``e`` and report it to the user.

    Must be called from an ``except`` block so the traceback is logged.
    ``InverterError`` is reported by its JSON ``message`` field when it has
    one, otherwise by its text. ``TimedOut`` is only logged. Everything else
    is reported with an ``exception: `` prefix.
    """
    logging.exception(str(e))

    if isinstance(e, InverterError):
        try:
            err = json.loads(str(e))['message']
        except (ValueError, KeyError, TypeError):
            # not JSON, or JSON without a usable 'message' field
            err = str(e)
        # replies are parsed as HTML, so raw '<' or '&' would be rejected
        reply(update, escape(str(err), quote=False))

    elif not isinstance(e, TimedOut):
        reply(update, 'exception: ' + escape(str(e), quote=False))


def beautify_table(s):
    """Collapse every run of whitespace inside each line of ``s`` to one space."""
    return '\n'.join(re.sub(r'\s+', ' ', line) for line in s.split('\n'))


def _with_unit(field) -> str:
    """Format a ``{'value': ..., 'unit': ...}`` status field as ``value unit``."""
    return '%s %s' % (field['value'], field['unit'])


def _day_generated(day: datetime.date):
    """Return the ``data`` part of the ``get-day-generated`` response for ``day``."""
    response = inverter.exec('get-day-generated', (day.year, day.month, day.day))
    return json.loads(response)['data']


def _reply_table(update: Update, command: str) -> None:
    """Run ``command`` in table format and reply with the tidied output."""
    try:
        table = inverter.exec(command, format=Format.TABLE)
        reply(update, beautify_table(table))
    except Exception as e:
        handle_exc(update, e)


#
# command/message handlers
#

def start(update: Update, context: CallbackContext) -> None:
    """Handle /start: show the keyboard."""
    reply(update, 'Select a command on the keyboard.')


def msg_status(update: Update, context: CallbackContext) -> None:
    """Reply with a short battery / load / input / generator summary."""
    try:
        gs = json.loads(inverter.exec('get-status'))['data']

        # render response
        power_direction = gs['battery_power_direction'].lower()
        # turn a trailing "ge" into "ging" (e.g. "charge" -> "charging")
        power_direction = re.sub(r'ge$', 'ging', power_direction)

        charging_rate = ''
        if power_direction == 'charging':
            charging_rate = ' @ ' + _with_unit(gs['battery_charging_current'])
        elif power_direction == 'discharging':
            charging_rate = ' @ ' + _with_unit(gs['battery_discharging_current'])

        html = 'Battery: ' + _with_unit(gs['battery_voltage'])
        html += ' (%s%s, ' % (gs['battery_capacity']['value'], gs['battery_capacity']['unit'])
        html += '%s%s)' % (power_direction, charging_rate)

        html += '\nLoad: ' + _with_unit(gs['ac_output_active_power'])
        html += ' (%s%%)' % (gs['output_load_percent']['value'])

        if gs['pv1_input_power']['value'] > 0:
            html += '\nInput power: ' + _with_unit(gs['pv1_input_power'])

        if gs['grid_voltage']['value'] > 0 or gs['grid_freq']['value'] > 0:
            # value first, then unit, like every other field
            html += '\nGenerator: ' + _with_unit(gs['grid_voltage'])
            html += ', ' + _with_unit(gs['grid_freq'])

        # send response
        reply(update, html)
    except Exception as e:
        handle_exc(update, e)


def msg_generation(update: Update, context: CallbackContext) -> None:
    """Reply with current PV input and the energy generated on recent days."""
    try:
        today = datetime.date.today()
        yday = today - datetime.timedelta(days=1)
        yday2 = today - datetime.timedelta(days=2)

        gs = json.loads(inverter.exec('get-status'))['data']

        gen_today = _day_generated(today)

        # Earlier days are only queried while they fall in the current month.
        # NOTE(review): presumably a limit of the inverter's history; confirm.
        gen_yday = None
        gen_yday2 = None

        if yday.month == today.month:
            gen_yday = _day_generated(yday)

        if yday2.month == today.month:
            gen_yday2 = _day_generated(yday2)

        # render response
        html = 'Input power: ' + _with_unit(gs['pv1_input_power'])
        html += ' (%s)' % _with_unit(gs['pv1_input_voltage'])

        html += '\nToday: %s Wh' % (gen_today['wh'])

        if gen_yday is not None:
            html += '\nYesterday: %s Wh' % (gen_yday['wh'])

        if gen_yday2 is not None:
            html += '\nThe day before yesterday: %s Wh' % (gen_yday2['wh'])

        # send response
        reply(update, html)
    except Exception as e:
        handle_exc(update, e)


def msg_gs(update: Update, context: CallbackContext) -> None:
    """Reply with the ``get-status`` table."""
    _reply_table(update, 'get-status')


def msg_ri(update: Update, context: CallbackContext) -> None:
    """Reply with the ``get-rated`` table."""
    _reply_table(update, 'get-rated')


def msg_errors(update: Update, context: CallbackContext) -> None:
    """Reply with the ``get-errors`` table."""
    _reply_table(update, 'get-errors')


def msg_all(update: Update, context: CallbackContext) -> None:
    """Fallback for any message no other handler matched."""
    reply(update, "Command not recognized. Please try again.")


def on_set_ac_charging_current(update: Update, context: CallbackContext) -> None:
    """Handle /setacchargingcurrent CURRENT.

    CURRENT must be one of the values returned by
    ``get-allowed-ac-charging-currents``. A missing argument yields the usage
    text; every other failure is reported through ``handle_exc``.
    """
    try:
        current = int(context.args[0])

        allowed_values = json.loads(inverter.exec('get-allowed-ac-charging-currents'))['data']
        if current not in allowed_values:
            raise ValueError(f'invalid value {current}, allowed values: '
                             + ', '.join(map(str, allowed_values)))

        response = json.loads(inverter.exec('set-max-ac-charging-current', (0, current)))
        reply(update, 'OK' if response['result'] == 'ok' else 'ERROR')

    except IndexError:
        reply(update, escape('Usage: /setacchargingcurrent CURRENT'))

    except Exception as e:
        # includes ValueError (bad number / value not allowed) and inverter errors
        handle_exc(update, e)


def on_set_ac_charging_thresholds(update: Update, context: CallbackContext) -> None:
    """Handle /setacchargingthresholds CV DV (44 <= CV <= 51, 48 <= DV <= 58)."""
    # Argument parsing is separate from the inverter call so that only bad
    # input produces the usage text.
    try:
        cv = float(context.args[0])
        dv = float(context.args[1])
        if not (44 <= cv <= 51 and 48 <= dv <= 58):
            raise ValueError('invalid values')
    except (IndexError, ValueError):
        reply(update, escape('Usage: /setacchargingthresholds CV DV\n\n'
                             '44 <= CV <= 51\n'
                             '48 <= DV <= 58'))
        return

    try:
        response = json.loads(inverter.exec('set-charging-thresholds', (cv, dv)))
        reply(update, 'OK' if response['result'] == 'ok' else 'ERROR')
    except Exception as e:
        handle_exc(update, e)


def on_set_battery_under_voltage(update: Update, context: CallbackContext) -> None:
    """Handle /setbatteryundervoltage VOLTAGE (40.0 <= VOLTAGE <= 48.0)."""
    try:
        v = float(context.args[0])
        if not 40.0 <= v <= 48.0:
            raise ValueError('invalid voltage')
    except (IndexError, ValueError):
        reply(update, escape('Usage: /setbatteryundervoltage VOLTAGE\n\n'
                             'VOLTAGE must be a floating point number between 40.0 and 48.0'))
        return

    try:
        response = json.loads(inverter.exec('set-battery-cut-off-voltage', (v,)))
        reply(update, 'OK' if response['result'] == 'ok' else 'ERROR')
    except Exception as e:
        handle_exc(update, e)


if __name__ == '__main__':
    # command-line arguments
    parser = ArgumentParser()
    parser.add_argument('--token', required=True, type=str,
                        help='Telegram bot token')
    # Required: without a whitelist there is nobody the bot may answer.
    parser.add_argument('--users-whitelist', nargs='+', type=int, required=True,
                        help='ID of users allowed to use the bot')
    parser.add_argument('--inverterd-host', default='127.0.0.1', type=str)
    parser.add_argument('--inverterd-port', default=8305, type=int)
    args = parser.parse_args()

    whitelist = args.users_whitelist

    # configure logging first so connection problems are logged in this format
    logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                        level=logging.INFO)

    # connect to inverterd
    inverter = InverterClientWrapper(host=args.inverterd_host, port=args.inverterd_port)

    # configure bot
    updater = Updater(args.token, request_kwargs={'read_timeout': 6, 'connect_timeout': 7})
    dispatcher = updater.dispatcher

    user_filter = Filters.user(whitelist)

    dispatcher.add_handler(CommandHandler('start', start))

    dispatcher.add_handler(MessageHandler(Filters.text(_('status')) & user_filter, msg_status))
    dispatcher.add_handler(MessageHandler(Filters.text(_('generation')) & user_filter, msg_generation))
    dispatcher.add_handler(MessageHandler(Filters.text(_('gs')) & user_filter, msg_gs))
    dispatcher.add_handler(MessageHandler(Filters.text(_('ri')) & user_filter, msg_ri))
    dispatcher.add_handler(MessageHandler(Filters.text(_('errors')) & user_filter, msg_errors))

    # These commands change inverter settings, so they are restricted to the
    # whitelist like the message handlers.
    dispatcher.add_handler(CommandHandler('setacchargingcurrent', on_set_ac_charging_current,
                                          filters=user_filter))
    dispatcher.add_handler(CommandHandler('setacchargingthresholds', on_set_ac_charging_thresholds,
                                          filters=user_filter))
    dispatcher.add_handler(CommandHandler('setbatteryundervoltage', on_set_battery_under_voltage,
                                          filters=user_filter))

    dispatcher.add_handler(MessageHandler(Filters.all & user_filter, msg_all))

    # start the bot
    updater.start_polling()

    # run the bot until the user presses Ctrl-C or the process receives SIGINT, SIGTERM or SIGABRT
    updater.idle()