From 1815f4be372b44c1ac738e01b59eec5db538c22f Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Mon, 25 Oct 2021 00:15:36 +0300 Subject: nice upgrade --- inverter-bot | 233 +++++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 170 insertions(+), 63 deletions(-) (limited to 'inverter-bot') diff --git a/inverter-bot b/inverter-bot index a56e9fe..de7d99e 100755 --- a/inverter-bot +++ b/inverter-bot @@ -2,16 +2,18 @@ import logging, re, datetime, json from inverterd import Format, Client as InverterClient, InverterError -from typing import Optional +from typing import Optional, Tuple from argparse import ArgumentParser from html import escape -from pprint import pprint -from time import sleep +# from pprint import pprint +# from time import sleep from strings import lang as _ from telegram import ( Update, ParseMode, KeyboardButton, + InlineKeyboardButton, + InlineKeyboardMarkup, ReplyKeyboardMarkup ) from telegram.ext import ( @@ -19,11 +21,25 @@ from telegram.ext import ( Filters, CommandHandler, MessageHandler, - CallbackContext + CallbackContext, + CallbackQueryHandler ) from telegram.error import TimedOut +LT = escape('<=') +flags_map = { + 'buzzer': 'BUZZ', + 'overload_bypass': 'OLBP', + 'escape_to_default_screen_after_1min_timeout': 'LCDE', + 'overload_restart': 'OLRS', + 'over_temp_restart': 'OTRS', + 'backlight_on': 'BLON', + 'alarm_on_on_primary_source_interrupt': 'ALRM', + 'fault_code_record': 'FTCR', +} + + class InverterClientWrapper: def __init__(self, host: str, port: str): self._host = host @@ -39,7 +55,10 @@ class InverterClientWrapper: def exec(self, command: str, arguments: tuple = (), format=Format.JSON): try: self._inverter.format(format) - return self._inverter.exec(command, arguments) + response = self._inverter.exec(command, arguments) + if format == Format.JSON: + response = json.loads(response) + return response except InverterError as e: raise e except Exception as e: @@ -59,24 +78,49 @@ inverter: Optional[InverterClientWrapper] = None # +def get_usage(command: str, arguments: dict) -> str: + blocks = [] + argument_names = [] + argument_lines = [] + for k, v in arguments.items(): + argument_names.append(k) + argument_lines.append( + f'{k}: {v}' + ) + + command = f'/{command}' + if argument_names: + command += ' ' + ' '.join(argument_names) + + blocks.append( + 'Usage\n' + f'{command}' + ) + + if argument_lines: + blocks.append( + 'Arguments\n' + '\n'.join(argument_lines) + ) + + return '\n\n'.join(blocks) + + def get_markup() -> ReplyKeyboardMarkup: button = [ [ _('status'), _('generation') ], - [ - _('gs'), - _('ri'), - _('errors') - ] ] return ReplyKeyboardMarkup(button, one_time_keyboard=False) -def reply(update: Update, text: str) -> None: +def reply(update: Update, text: str, reply_markup=None) -> None: + if reply_markup is None: + reply_markup = get_markup() + update.message.reply_text(text, - reply_markup=get_markup(), + reply_markup=reply_markup, parse_mode=ParseMode.HTML) @@ -101,17 +145,19 @@ def beautify_table(s): lines = list(map(lambda line: re.sub(r'(.*?): (.*)', r'\1: \2', line), lines)) return '\n'.join(lines) + # # command/message handlers # + def start(update: Update, context: CallbackContext) -> None: reply(update, 'Select a command on the keyboard.') def msg_status(update: Update, context: CallbackContext) -> None: try: - gs = json.loads(inverter.exec('get-status'))['data'] + gs = inverter.exec('get-status')['data'] # render response power_direction = gs['battery_power_direction'].lower() @@ -151,20 +197,20 @@ def msg_generation(update: Update, context: CallbackContext) -> None: yday = today - datetime.timedelta(days=1) yday2 = today - datetime.timedelta(days=2) - gs = json.loads(inverter.exec('get-status'))['data'] + gs = inverter.exec('get-status')['data'] # sleep(0.1) - gen_today = json.loads(inverter.exec('get-day-generated', (today.year, today.month, today.day)))['data'] + gen_today = inverter.exec('get-day-generated', (today.year, today.month, today.day))['data'] gen_yday = None gen_yday2 = None if yday.month == today.month: # sleep(0.1) - gen_yday = json.loads(inverter.exec('get-day-generated', (yday.year, yday.month, yday.day)))['data'] + gen_yday = inverter.exec('get-day-generated', (yday.year, yday.month, yday.day))['data'] if yday2.month == today.month: # sleep(0.1) - gen_yday2 = json.loads(inverter.exec('get-day-generated', (yday2.year, yday2.month, yday2.day)))['data'] + gen_yday2 = inverter.exec('get-day-generated', (yday2.year, yday2.month, yday2.day))['data'] # render response html = 'Input power: %s %s' % (gs['pv1_input_power']['value'], gs['pv1_input_power']['unit']) @@ -184,50 +230,26 @@ def msg_generation(update: Update, context: CallbackContext) -> None: handle_exc(update, e) -def msg_gs(update: Update, context: CallbackContext) -> None: - try: - status = inverter.exec('get-status', format=Format.TABLE) - reply(update, beautify_table(status)) - except Exception as e: - handle_exc(update, e) - - -def msg_ri(update: Update, context: CallbackContext) -> None: - try: - rated = inverter.exec('get-rated', format=Format.TABLE) - reply(update, beautify_table(rated)) - except Exception as e: - handle_exc(update, e) - - -def msg_errors(update: Update, context: CallbackContext) -> None: - try: - errors = inverter.exec('get-errors', format=Format.TABLE) - reply(update, beautify_table(errors)) - except Exception as e: - handle_exc(update, e) - - def msg_all(update: Update, context: CallbackContext) -> None: reply(update, "Command not recognized. Please try again.") def on_set_ac_charging_current(update: Update, context: CallbackContext) -> None: + allowed_values = inverter.exec('get-allowed-ac-charging-currents')['data'] + try: current = int(context.args[0]) - allowed_values = json.loads(inverter.exec('get-allowed-ac-charging-currents'))['data'] - if current not in allowed_values: - raise ValueError(f'invalid value {current}, allowed values: ' + ', '.join(map(lambda x: str(x), allowed_values))) + raise ValueError(f'invalid value {current}') - response = json.loads(inverter.exec('set-max-ac-charging-current', (0, current))) + response = inverter.exec('set-max-ac-charging-current', (0, current)) reply(update, 'OK' if response['result'] == 'ok' else 'ERROR') - except IndexError: - reply(update, escape('Usage: /setacchargingcurrent ')) - - except ValueError as e: - handle_exc(update, e) + except (IndexError, ValueError): + usage = get_usage('setgencc', { + 'A': 'max charging current, allowed values: ' + ', '.join(map(lambda x: str(x), allowed_values)) + }) + reply(update, usage) def on_set_ac_charging_thresholds(update: Update, context: CallbackContext) -> None: @@ -236,15 +258,17 @@ def on_set_ac_charging_thresholds(update: Update, context: CallbackContext) -> N dv = float(context.args[1]) if 44 <= cv <= 51 and 48 <= dv <= 58: - response = json.loads(inverter.exec('set-charging-thresholds', (cv, dv))) + response = inverter.exec('set-charging-thresholds', (cv, dv)) reply(update, 'OK' if response['result'] == 'ok' else 'ERROR') else: raise ValueError('invalid values') except (IndexError, ValueError): - reply(update, escape('Usage: /setacchargingthresholds CV DV\n\n' - '44 <= CV <= 51\n' - '48 <= DV <= 58')) + usage = get_usage('setgenct', { + 'CV': f'charging voltage 44, {LT} CV {LT} 51', + 'DV': f'discharging voltage, 48 {LT} DV {LT} 58' + }) + reply(update, usage) def on_set_battery_under_voltage(update: Update, context: CallbackContext) -> None: @@ -252,14 +276,93 @@ def on_set_battery_under_voltage(update: Update, context: CallbackContext) -> No v = float(context.args[0]) if 40.0 <= v <= 48.0: - response = json.loads(inverter.exec('set-battery-cut-off-voltage', (v,))) + response = inverter.exec('set-battery-cut-off-voltage', (v,)) reply(update, 'OK' if response['result'] == 'ok' else 'ERROR') else: raise ValueError('invalid voltage') except (IndexError, ValueError): - reply(update, escape('Usage: /setbatteryundervoltage VOLTAGE\n\n' - 'VOLTAGE must be a floating point number between 40.0 and 48.0')) + usage = get_usage('setbatuv', { + 'V': f'floating point number, 40.0 {LT} V {LT} 48.0' + }) + reply(update, usage) + + +def build_flags_keyboard(flags: dict) -> Tuple[str, InlineKeyboardMarkup]: + keyboard = [] + for k, v in flags.items(): + label = ('✅' if v else '❌') + ' ' + _(f'flag_{k}') + proto_flag = flags_map[k] + keyboard.append([InlineKeyboardButton(label, callback_data=f'flag_{proto_flag}')]) + + text = 'Press a button to toggle a flag.' + + return text, InlineKeyboardMarkup(keyboard) + + +def on_flags(update: Update, context: CallbackContext) -> None: + flags = inverter.exec('get-flags')['data'] + text, markup = build_flags_keyboard(flags) + reply(update, text, reply_markup=markup) + + +def on_status(update: Update, context: CallbackContext) -> None: + try: + status = inverter.exec('get-status', format=Format.TABLE) + reply(update, beautify_table(status)) + except Exception as e: + handle_exc(update, e) + + +def on_config(update: Update, context: CallbackContext) -> None: + try: + rated = inverter.exec('get-rated', format=Format.TABLE) + reply(update, beautify_table(rated)) + except Exception as e: + handle_exc(update, e) + + +def on_errors(update: Update, context: CallbackContext) -> None: + try: + errors = inverter.exec('get-errors', format=Format.TABLE) + reply(update, beautify_table(errors)) + except Exception as e: + handle_exc(update, e) + + +def on_button(update: Update, context: CallbackContext) -> None: + query = update.callback_query + + if query.data.startswith('flag_'): + flag = query.data[5:] + found = False + json_key = None + for k, v in flags_map.items(): + if v == flag: + found = True + json_key = k + break + if not found: + query.answer('unknown flag') + return + + flags = inverter.exec('get-flags')['data'] + cur_flag_value = flags[json_key] + target_flag_value = '0' if cur_flag_value else '1' + + # set flag + response = inverter.exec('set-flag', (flag, target_flag_value)) + + # notify user + query.answer('Done' if response['result'] == 'ok' else 'failed to toggle flag') + + # edit message + flags[json_key] = not cur_flag_value + text, markup = build_flags_keyboard(flags) + query.edit_message_text(text, reply_markup=markup) + + else: + query.answer('unexpected callback data') if __name__ == '__main__': @@ -291,13 +394,17 @@ if __name__ == '__main__': dispatcher.add_handler(CommandHandler('start', start)) dispatcher.add_handler(MessageHandler(Filters.text(_('status')) & user_filter, msg_status)) dispatcher.add_handler(MessageHandler(Filters.text(_('generation')) & user_filter, msg_generation)) - dispatcher.add_handler(MessageHandler(Filters.text(_('gs')) & user_filter, msg_gs)) - dispatcher.add_handler(MessageHandler(Filters.text(_('ri')) & user_filter, msg_ri)) - dispatcher.add_handler(MessageHandler(Filters.text(_('errors')) & user_filter, msg_errors)) - dispatcher.add_handler(CommandHandler('setacchargingcurrent', on_set_ac_charging_current)) - dispatcher.add_handler(CommandHandler('setacchargingthresholds', on_set_ac_charging_thresholds)) - dispatcher.add_handler(CommandHandler('setbatteryundervoltage', on_set_battery_under_voltage)) + dispatcher.add_handler(CommandHandler('setgencc', on_set_ac_charging_current)) + dispatcher.add_handler(CommandHandler('setgenct', on_set_ac_charging_thresholds)) + dispatcher.add_handler(CommandHandler('setbatuv', on_set_battery_under_voltage)) + + dispatcher.add_handler(CallbackQueryHandler(on_button)) + dispatcher.add_handler(CommandHandler('flags', on_flags)) + dispatcher.add_handler(CommandHandler('status', on_status)) + dispatcher.add_handler(CommandHandler('config', on_config)) + dispatcher.add_handler(CommandHandler('errors', on_errors)) + dispatcher.add_handler(MessageHandler(Filters.all & user_filter, msg_all)) -- cgit v1.2.3