#!/usr/bin/env python3
"""Telegram bot front-end for an inverterd-controlled inverter.

The bot exposes a reply keyboard with "status" and "generation" buttons and a
set of slash commands for reading and changing inverter settings.  All
handlers are restricted to the user IDs given with ``--users-whitelist``.
"""
import logging
import re
import datetime
import json

from inverterd import Format, Client as InverterClient, InverterError
from typing import Optional, Tuple
from argparse import ArgumentParser
from html import escape
from strings import lang as _
from telegram import (
    Update,
    ParseMode,
    KeyboardButton,
    InlineKeyboardButton,
    InlineKeyboardMarkup,
    ReplyKeyboardMarkup
)
from telegram.ext import (
    Updater,
    Filters,
    CommandHandler,
    MessageHandler,
    CallbackContext,
    CallbackQueryHandler
)
from telegram.error import TimedOut


# HTML-escaped "<=" for use in messages sent with parse_mode=HTML.
LT = escape('<=')

# Maps the keys of the 'get-flags' JSON response to the flag codes that are
# passed to the 'set-flag' command (and embedded in callback data).
flags_map = {
    'buzzer': 'BUZZ',
    'overload_bypass': 'OLBP',
    'escape_to_default_screen_after_1min_timeout': 'LCDE',
    'overload_restart': 'OLRS',
    'over_temp_restart': 'OTRS',
    'backlight_on': 'BLON',
    'alarm_on_on_primary_source_interrupt': 'ALRM',
    'fault_code_record': 'FTCR',
}


class InverterClientWrapper:
    """Thin wrapper around the inverterd client that reconnects on failure."""

    def __init__(self, host: str, port: int):
        """Remember the endpoint and connect to it immediately."""
        self._host = host
        self._port = port
        self._inverter = None

        self.create()

    def create(self):
        """(Re)create the underlying client and connect it."""
        self._inverter = InverterClient(host=self._host, port=self._port)
        self._inverter.connect()

    def exec(self, command: str, arguments: tuple = (), format=Format.JSON):
        """Run ``command`` on inverterd and return its response.

        :param command: inverterd command name, e.g. ``'get-status'``
        :param arguments: positional arguments for the command
        :param format: response format; JSON responses are decoded before
            being returned, any other format is returned as received
        :raises InverterError: re-raised untouched when the client reports one
        :raises Exception: any other failure is re-raised after one
            best-effort reconnect attempt
        """
        try:
            self._inverter.format(format)
            response = self._inverter.exec(command, arguments)
            if format == Format.JSON:
                response = json.loads(response)
            return response
        except InverterError:
            # The client reported an error itself; no reconnect is attempted.
            raise
        except Exception:
            # Anything else: try to reconnect so that the next call has a
            # chance to succeed, then let the caller see the original error.
            try:
                self.create()
            except Exception:
                logging.exception('failed to reconnect to inverterd')
            raise


# Set by main(); the handlers below use it as the shared client.
inverter: Optional[InverterClientWrapper] = None


#
# helpers
#

def get_usage(command: str, arguments: dict) -> str:
    """Build the usage text for a command.

    :param command: command name without the leading slash
    :param arguments: mapping of argument name to its description
    :return: a "Usage" block, followed by an "Arguments" block when
        ``arguments`` is not empty
    """
    argument_names = list(arguments)
    argument_lines = [f'{k}: {v}' for k, v in arguments.items()]

    command = f'/{command}'
    if argument_names:
        command += ' ' + ' '.join(argument_names)

    blocks = [
        'Usage\n'
        f'{command}'
    ]

    if argument_lines:
        blocks.append(
            'Arguments\n' + '\n'.join(argument_lines)
        )

    return '\n\n'.join(blocks)


def get_markup() -> ReplyKeyboardMarkup:
    """Return the persistent reply keyboard with the two main buttons."""
    button = [
        [
            _('status'),
            _('generation')
        ],
    ]
    return ReplyKeyboardMarkup(button, one_time_keyboard=False)


def reply(update: Update, text: str, reply_markup=None) -> None:
    """Reply to the incoming message with HTML-formatted ``text``.

    The default reply keyboard is attached unless ``reply_markup`` is given.
    """
    if reply_markup is None:
        reply_markup = get_markup()

    update.message.reply_text(text,
                              reply_markup=reply_markup,
                              parse_mode=ParseMode.HTML)


def handle_exc(update: Update, e) -> None:
    """Log an exception raised by a handler and report it to the user.

    ``InverterError`` payloads are decoded as JSON and their ``message`` field
    is shown; if that is not possible the raw text is used.  ``TimedOut``
    errors are logged but not reported.
    """
    logging.exception(str(e))

    if isinstance(e, InverterError):
        try:
            err = json.loads(str(e))['message']
        except (ValueError, KeyError, TypeError):
            # Not JSON, or JSON without a 'message' field: show the raw text.
            err = str(e)
        # Messages are sent with parse_mode=HTML, so the text must be escaped.
        err = escape(str(err), quote=False)
        # NOTE(review): this substitution re-emits the match unchanged;
        # presumably markup around the "error:" prefix was intended.
        err = re.sub(r'((?:.*)?error:) (.*)', r'\1 \2', err)
        reply(update, err)

    elif not isinstance(e, TimedOut):
        reply(update, 'exception: ' + escape(str(e), quote=False))


def beautify_table(s):
    """Collapse runs of whitespace in every line of a table response."""
    lines = s.split('\n')
    lines = [re.sub(r'\s+', ' ', line) for line in lines]
    # NOTE(review): this substitution re-emits the match unchanged;
    # presumably markup around the "key:" part was intended.
    lines = [re.sub(r'(.*?): (.*)', r'\1: \2', line) for line in lines]
    return '\n'.join(lines)


def _value_unit(item: dict) -> str:
    """Format a ``{'value': ..., 'unit': ...}`` reading as "value unit"."""
    return '%s %s' % (item['value'], item['unit'])


def _get_day_generated(day: datetime.date) -> dict:
    """Return the 'data' of 'get-day-generated' for the given date."""
    return inverter.exec('get-day-generated',
                         (day.year, day.month, day.day))['data']


#
# command/message handlers
#

def start(update: Update, context: CallbackContext) -> None:
    """Handle /start: show the keyboard hint."""
    reply(update, 'Select a command on the keyboard.')


def msg_status(update: Update, context: CallbackContext) -> None:
    """Handle the "status" button: battery, load, input and generator summary."""
    try:
        gs = inverter.exec('get-status')['data']

        # render response
        power_direction = gs['battery_power_direction'].lower()
        # "charge"/"discharge" -> "charging"/"discharging"
        power_direction = re.sub(r'ge$', 'ging', power_direction)

        charging_rate = ''
        if power_direction == 'charging':
            charging_rate = ' @ ' + _value_unit(gs['battery_charging_current'])
        elif power_direction == 'discharging':
            charging_rate = ' @ ' + _value_unit(gs['battery_discharging_current'])

        html = 'Battery: ' + _value_unit(gs['battery_voltage'])
        html += ' (%s%s, ' % (gs['battery_capacity']['value'],
                              gs['battery_capacity']['unit'])
        html += '%s%s)' % (power_direction, charging_rate)

        html += '\nLoad: ' + _value_unit(gs['ac_output_active_power'])
        html += ' (%s%%)' % (gs['output_load_percent']['value'],)

        if gs['pv1_input_power']['value'] > 0:
            html += '\nInput power: ' + _value_unit(gs['pv1_input_power'])

        if gs['grid_voltage']['value'] > 0 or gs['grid_freq']['value'] > 0:
            # value first, then unit, like every other reading
            html += '\nGenerator: ' + _value_unit(gs['grid_voltage'])
            html += ', ' + _value_unit(gs['grid_freq'])

        # send response
        reply(update, html)
    except Exception as e:
        handle_exc(update, e)


def msg_generation(update: Update, context: CallbackContext) -> None:
    """Handle the "generation" button: current input and recent daily totals."""
    try:
        today = datetime.date.today()
        yday = today - datetime.timedelta(days=1)
        yday2 = today - datetime.timedelta(days=2)

        gs = inverter.exec('get-status')['data']

        gen_today = _get_day_generated(today)

        # Previous days are only queried when they fall in the current month.
        gen_yday = _get_day_generated(yday) if yday.month == today.month else None
        gen_yday2 = _get_day_generated(yday2) if yday2.month == today.month else None

        # render response
        html = 'Input power: ' + _value_unit(gs['pv1_input_power'])
        html += ' (%s)' % _value_unit(gs['pv1_input_voltage'])

        html += '\nToday: %s Wh' % (gen_today['wh'],)

        if gen_yday is not None:
            html += '\nYesterday: %s Wh' % (gen_yday['wh'],)

        if gen_yday2 is not None:
            html += '\nThe day before yesterday: %s Wh' % (gen_yday2['wh'],)

        # send response
        reply(update, html)
    except Exception as e:
        handle_exc(update, e)


def msg_all(update: Update, context: CallbackContext) -> None:
    """Fallback handler for any message no other handler matched."""
    reply(update, "Command not recognized. Please try again.")


def on_set_ac_charging_current(update: Update, context: CallbackContext) -> None:
    """Handle /setgencc A: set the max AC charging current.

    ``A`` must be one of the values reported by
    'get-allowed-ac-charging-currents'; otherwise the usage text is sent.
    """
    try:
        allowed_values = inverter.exec('get-allowed-ac-charging-currents')['data']
    except Exception as e:
        handle_exc(update, e)
        return

    try:
        current = int(context.args[0])
        if current not in allowed_values:
            raise ValueError(f'invalid value {current}')

        response = inverter.exec('set-max-ac-charging-current', (0, current))
        reply(update, 'OK' if response['result'] == 'ok' else 'ERROR')

    except (IndexError, ValueError):
        usage = get_usage('setgencc', {
            'A': 'max charging current, allowed values: ' + ', '.join(map(str, allowed_values))
        })
        reply(update, usage)

    except Exception as e:
        handle_exc(update, e)


def on_set_ac_charging_thresholds(update: Update, context: CallbackContext) -> None:
    """Handle /setgenct CV DV: set the charging/discharging voltage thresholds.

    Requires 44 <= CV <= 51 and 48 <= DV <= 58; otherwise the usage text is
    sent.
    """
    try:
        cv = float(context.args[0])
        dv = float(context.args[1])

        if not (44 <= cv <= 51 and 48 <= dv <= 58):
            raise ValueError('invalid values')

        response = inverter.exec('set-charging-thresholds', (cv, dv))
        reply(update, 'OK' if response['result'] == 'ok' else 'ERROR')

    except (IndexError, ValueError):
        usage = get_usage('setgenct', {
            'CV': f'charging voltage, 44 {LT} CV {LT} 51',
            'DV': f'discharging voltage, 48 {LT} DV {LT} 58'
        })
        reply(update, usage)

    except Exception as e:
        handle_exc(update, e)


def on_set_battery_under_voltage(update: Update, context: CallbackContext) -> None:
    """Handle /setbatuv V: set the battery cut-off voltage (40.0 <= V <= 48.0)."""
    try:
        v = float(context.args[0])

        if not (40.0 <= v <= 48.0):
            raise ValueError('invalid voltage')

        response = inverter.exec('set-battery-cut-off-voltage', (v,))
        reply(update, 'OK' if response['result'] == 'ok' else 'ERROR')

    except (IndexError, ValueError):
        usage = get_usage('setbatuv', {
            'V': f'floating point number, 40.0 {LT} V {LT} 48.0'
        })
        reply(update, usage)

    except Exception as e:
        handle_exc(update, e)


def build_flags_keyboard(flags: dict) -> Tuple[str, InlineKeyboardMarkup]:
    """Build the message text and inline keyboard for toggling flags.

    :param flags: mapping of 'get-flags' key to its current (truthy/falsy)
        value; every key must be present in ``flags_map``
    :return: ``(text, markup)`` with one button per flag, whose callback data
        is ``flag_<code>``
    """
    keyboard = []
    for k, v in flags.items():
        label = ('✅' if v else '❌') + ' ' + _(f'flag_{k}')
        proto_flag = flags_map[k]
        keyboard.append([InlineKeyboardButton(label, callback_data=f'flag_{proto_flag}')])

    text = 'Press a button to toggle a flag.'
    return text, InlineKeyboardMarkup(keyboard)


def on_flags(update: Update, context: CallbackContext) -> None:
    """Handle /flags: show the current flags as an inline keyboard."""
    try:
        flags = inverter.exec('get-flags')['data']
        text, markup = build_flags_keyboard(flags)
        reply(update, text, reply_markup=markup)
    except Exception as e:
        handle_exc(update, e)


def on_status(update: Update, context: CallbackContext) -> None:
    """Handle /status: send the full 'get-status' table."""
    try:
        status = inverter.exec('get-status', format=Format.TABLE)
        reply(update, beautify_table(status))
    except Exception as e:
        handle_exc(update, e)


def on_config(update: Update, context: CallbackContext) -> None:
    """Handle /config: send the 'get-rated' table."""
    try:
        rated = inverter.exec('get-rated', format=Format.TABLE)
        reply(update, beautify_table(rated))
    except Exception as e:
        handle_exc(update, e)


def on_errors(update: Update, context: CallbackContext) -> None:
    """Handle /errors: send the 'get-errors' table."""
    try:
        errors = inverter.exec('get-errors', format=Format.TABLE)
        reply(update, beautify_table(errors))
    except Exception as e:
        handle_exc(update, e)


def on_button(update: Update, context: CallbackContext) -> None:
    """Handle inline keyboard presses: toggle the flag named in callback data.

    The keyboard is only redrawn when the inverter confirmed the change, so a
    failed toggle never shows the wrong state.
    """
    query = update.callback_query
    data = query.data or ''

    if not data.startswith('flag_'):
        query.answer('unexpected callback data')
        return

    flag = data[5:]
    # reverse lookup: flag code -> key in the 'get-flags' response
    json_key = next((k for k, v in flags_map.items() if v == flag), None)
    if json_key is None:
        query.answer('unknown flag')
        return

    try:
        flags = inverter.exec('get-flags')['data']
        cur_flag_value = flags[json_key]
        target_flag_value = '0' if cur_flag_value else '1'

        # set flag
        response = inverter.exec('set-flag', (flag, target_flag_value))
        ok = response['result'] == 'ok'
    except Exception as e:
        # handle_exc() replies via update.message, which is not used here
        # because this handler works with the callback query.
        logging.exception(str(e))
        query.answer('failed to toggle flag')
        return

    # notify user
    query.answer('Done' if ok else 'failed to toggle flag')
    if not ok:
        return

    # edit message
    flags[json_key] = not cur_flag_value
    text, markup = build_flags_keyboard(flags)
    query.edit_message_text(text, reply_markup=markup)


def main() -> None:
    """Parse arguments, connect to inverterd and run the bot until stopped."""
    global inverter

    # command-line arguments
    parser = ArgumentParser()
    parser.add_argument('--token', required=True, type=str,
                        help='Telegram bot token')
    parser.add_argument('--users-whitelist', nargs='+', required=True, type=int,
                        help='ID of users allowed to use the bot')
    parser.add_argument('--inverterd-host', default='127.0.0.1', type=str)
    parser.add_argument('--inverterd-port', default=8305, type=int)
    args = parser.parse_args()

    whitelist = list(args.users_whitelist)

    # configure logging
    logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                        level=logging.INFO)

    # connect to inverterd
    inverter = InverterClientWrapper(host=args.inverterd_host, port=args.inverterd_port)

    # configure bot
    updater = Updater(args.token, request_kwargs={'read_timeout': 6, 'connect_timeout': 7})
    dispatcher = updater.dispatcher

    user_filter = Filters.user(whitelist)

    # Every handler that talks to the inverter is restricted to the whitelist.
    dispatcher.add_handler(CommandHandler('start', start, filters=user_filter))
    dispatcher.add_handler(MessageHandler(Filters.text(_('status')) & user_filter, msg_status))
    dispatcher.add_handler(MessageHandler(Filters.text(_('generation')) & user_filter, msg_generation))

    dispatcher.add_handler(CommandHandler('setgencc', on_set_ac_charging_current, filters=user_filter))
    dispatcher.add_handler(CommandHandler('setgenct', on_set_ac_charging_thresholds, filters=user_filter))
    dispatcher.add_handler(CommandHandler('setbatuv', on_set_battery_under_voltage, filters=user_filter))

    dispatcher.add_handler(CallbackQueryHandler(on_button))
    dispatcher.add_handler(CommandHandler('flags', on_flags, filters=user_filter))
    dispatcher.add_handler(CommandHandler('status', on_status, filters=user_filter))
    dispatcher.add_handler(CommandHandler('config', on_config, filters=user_filter))
    dispatcher.add_handler(CommandHandler('errors', on_errors, filters=user_filter))

    dispatcher.add_handler(MessageHandler(Filters.all & user_filter, msg_all))

    # start the bot
    updater.start_polling()

    # run the bot until the user presses Ctrl-C or the process receives SIGINT, SIGTERM or SIGABRT
    updater.idle()


if __name__ == '__main__':
    main()