#!/usr/bin/env python3 import logging, re, datetime, json, subprocess, os from inverterd import Format, Client as InverterClient, InverterError from typing import Optional from argparse import ArgumentParser from html import escape from pprint import pprint from time import sleep from strings import lang as _ from telegram import ( Update, ParseMode, KeyboardButton, ReplyKeyboardMarkup ) from telegram.ext import ( Updater, Filters, CommandHandler, MessageHandler, CallbackContext ) from telegram.error import TimedOut class InverterClientWrapper: def __init__(self, host: str, port: str): self._host = host self._port = port self._inverter = None self.create() def create(self): self._inverter = InverterClient(host=self._host, port=self._port) self._inverter.connect() def exec(self, command: str, arguments: tuple = (), format=Format.JSON): try: self._inverter.format(format) return self._inverter.exec(command, arguments) except InverterError as e: raise e except Exception as e: # silently try to reconnect try: self.create() except Exception: pass raise e inverter: Optional[InverterClientWrapper] = None # # helpers # def get_markup() -> ReplyKeyboardMarkup: button = [ [ _('status'), _('generation') ], [ _('gs'), _('ri'), _('errors') ] ] return ReplyKeyboardMarkup(button, one_time_keyboard=False) def reply(update: Update, text: str) -> None: update.message.reply_text(text, reply_markup=get_markup(), parse_mode=ParseMode.HTML) def handle_exc(update: Update, e) -> None: logging.exception(str(e)) if isinstance(e, InverterError): try: err = json.loads(str(e))['message'] except json.decoder.JSONDecodeError: err = str(e) reply(update, f'Error: {err}') elif not isinstance(e, TimedOut): reply(update, 'exception: ' + str(e)) def beautify_table(s): lines = s.split('\n') lines = list(map(lambda line: re.sub(r'\s+', ' ', line), lines)) lines = list(map(lambda line: re.sub(r'(.*?): (.*)', r'\1: \2', line), lines)) return '\n'.join(lines) # # command/message handlers # def start(update: Update, context: CallbackContext) -> None: reply(update, 'Select a command on the keyboard.') def msg_status(update: Update, context: CallbackContext) -> None: try: gs = json.loads(inverter.exec('get-status'))['data'] # render response power_direction = gs['battery_power_direction'].lower() power_direction = re.sub(r'ge$', 'ging', power_direction) charging_rate = '' if power_direction == 'charging': charging_rate = ' @ %s %s' % ( gs['battery_charging_current']['value'], gs['battery_charging_current']['unit']) elif power_direction == 'discharging': charging_rate = ' @ %s %s' % ( gs['battery_discharging_current']['value'], gs['battery_discharging_current']['unit']) html = 'Battery: %s %s' % (gs['battery_voltage']['value'], gs['battery_voltage']['unit']) html += ' (%s%s, ' % (gs['battery_capacity']['value'], gs['battery_capacity']['unit']) html += '%s%s)' % (power_direction, charging_rate) html += '\nLoad: %s %s' % (gs['ac_output_active_power']['value'], gs['ac_output_active_power']['unit']) html += ' (%s%%)' % (gs['output_load_percent']['value']) if gs['pv1_input_power']['value'] > 0: html += '\nInput power: %s %s' % (gs['pv1_input_power']['value'], gs['pv1_input_power']['unit']) if gs['grid_voltage']['value'] > 0 or gs['grid_freq']['value'] > 0: html += '\nGenerator: %s %s' % (gs['grid_voltage']['unit'], gs['grid_voltage']['value']) html += ', %s %s' % (gs['grid_freq']['value'], gs['grid_freq']['unit']) # send response reply(update, html) except Exception as e: handle_exc(update, e) def msg_shell(update: Update, context: CallbackContext) -> None: try: argv = re.findall('^shell (.*)$', update.message.text)[0].split() result = subprocess.run(argv, capture_output=True) if result.returncode != 0: raise ChildProcessError('spawned process returned ' + str(result.returncode)) buf = '[stdout] ' + result.stdout.decode('utf-8') buf += '\n[stderr] ' + result.stderr.decode('utf-8') reply(update, escape(buf)) except Exception as e: logging.exception(str(e)) reply(update, 'exception: ' + escape(str(e))) def msg_spawn(update: Update, context: CallbackContext) -> None: try: argv = re.findall('^spawn (.*)$', update.message.text)[0].split() os.spawnlp(os.P_NOWAIT, argv[0], *argv) except Exception as e: logging.exception(str(e)) reply(update, 'exception: ' + escape(str(e))) def msg_generation(update: Update, context: CallbackContext) -> None: try: today = datetime.date.today() yday = today - datetime.timedelta(days=1) yday2 = today - datetime.timedelta(days=2) gs = json.loads(inverter.exec('get-status'))['data'] # sleep(0.1) gen_today = json.loads(inverter.exec('get-day-generated', (today.year, today.month, today.day)))['data'] gen_yday = None gen_yday2 = None if yday.month == today.month: # sleep(0.1) gen_yday = json.loads(inverter.exec('get-day-generated', (yday.year, yday.month, yday.day)))['data'] if yday2.month == today.month: # sleep(0.1) gen_yday2 = json.loads(inverter.exec('get-day-generated', (yday2.year, yday2.month, yday2.day)))['data'] # render response html = 'Input power: %s %s' % (gs['pv1_input_power']['value'], gs['pv1_input_power']['unit']) html += ' (%s %s)' % (gs['pv1_input_voltage']['value'], gs['pv1_input_voltage']['unit']) html += '\nToday: %s Wh' % (gen_today['wh']) if gen_yday is not None: html += '\nYesterday: %s Wh' % (gen_yday['wh']) if gen_yday2 is not None: html += '\nThe day before yesterday: %s Wh' % (gen_yday2['wh']) # send response reply(update, html) except Exception as e: handle_exc(update, e) def msg_gs(update: Update, context: CallbackContext) -> None: try: status = inverter.exec('get-status', format=Format.TABLE) reply(update, beautify_table(status)) except Exception as e: handle_exc(update, e) def msg_ri(update: Update, context: CallbackContext) -> None: try: rated = inverter.exec('get-rated', format=Format.TABLE) reply(update, beautify_table(rated)) except Exception as e: handle_exc(update, e) def msg_errors(update: Update, context: CallbackContext) -> None: try: errors = inverter.exec('get-errors', format=Format.TABLE) reply(update, beautify_table(errors)) except Exception as e: handle_exc(update, e) def msg_all(update: Update, context: CallbackContext) -> None: reply(update, "Command not recognized. Please try again.") if __name__ == '__main__': # command-line arguments parser = ArgumentParser() parser.add_argument('--token', required=True, type=str, help='Telegram bot token') parser.add_argument('--users-whitelist', nargs='+', help='ID of users allowed to use the bot') parser.add_argument('--inverterd-host', default='127.0.0.1', type=str) parser.add_argument('--inverterd-port', default=8305, type=int) parser.add_argument('--shell-admin', required=True, type=int) args = parser.parse_args() whitelist = list(map(lambda x: int(x), args.users_whitelist)) # connect to inverterd inverter = InverterClientWrapper(host=args.inverterd_host, port=args.inverterd_port) # configure logging logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO) # configure bot updater = Updater(args.token, request_kwargs={'read_timeout': 6, 'connect_timeout': 7}) dispatcher = updater.dispatcher user_filter = Filters.user(whitelist) shell_admin_filter = Filters.user((args.shell_admin, )) dispatcher.add_handler(CommandHandler('start', start)) dispatcher.add_handler(MessageHandler(Filters.regex(r'^shell ') & shell_admin_filter, msg_shell)) dispatcher.add_handler(MessageHandler(Filters.regex(r'^spawn ') & shell_admin_filter, msg_spawn)) dispatcher.add_handler(MessageHandler(Filters.text(_('status')) & user_filter, msg_status)) dispatcher.add_handler(MessageHandler(Filters.text(_('generation')) & user_filter, msg_generation)) dispatcher.add_handler(MessageHandler(Filters.text(_('gs')) & user_filter, msg_gs)) dispatcher.add_handler(MessageHandler(Filters.text(_('ri')) & user_filter, msg_ri)) dispatcher.add_handler(MessageHandler(Filters.text(_('errors')) & user_filter, msg_errors)) dispatcher.add_handler(MessageHandler(Filters.all & user_filter, msg_all)) # start the bot updater.start_polling() # run the bot until the user presses Ctrl-C or the process receives SIGINT, SIGTERM or SIGABRT updater.idle()