aboutsummaryrefslogtreecommitdiff
path: root/src/inverter-bot
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2021-11-02 21:29:25 +0300
committerEvgeny Zinoviev <me@ch1p.io>2021-11-02 21:59:29 +0300
commiteb970844576f0f9d84b5a385f615582b50e0afa9 (patch)
tree184992ca7371a48e96afdc88dbe57ab998df59e7 /src/inverter-bot
parent3346b29044740090d4ecee0056e456e90888c218 (diff)
implement AC charging program
Diffstat (limited to 'src/inverter-bot')
-rwxr-xr-xsrc/inverter-bot463
1 files changed, 463 insertions, 0 deletions
diff --git a/src/inverter-bot b/src/inverter-bot
new file mode 100755
index 0000000..e5d9fc6
--- /dev/null
+++ b/src/inverter-bot
@@ -0,0 +1,463 @@
+#!/usr/bin/env python3
+import logging
+import re
+import datetime
+import json
+
+from typing import Optional, Tuple
+from argparse import ArgumentParser
+from html import escape
+from inverter_wrapper import InverterClientWrapper, wrapper_instance as inverter
+from monitor import InverterMonitor, ChargingEvent
+from inverterd import Format, InverterError
+from telegram import (
+ Update,
+ ParseMode,
+ KeyboardButton,
+ InlineKeyboardButton,
+ InlineKeyboardMarkup,
+ ReplyKeyboardMarkup
+)
+from telegram.ext import (
+ Updater,
+ Filters,
+ CommandHandler,
+ MessageHandler,
+ CallbackContext,
+ CallbackQueryHandler
+)
+from telegram.error import TimedOut
+
+
+monitor: Optional[InverterMonitor] = None
+updater: Optional[Updater] = None
+notify_to: list[int] = []
+LT = escape('<=')
+flags_map = {
+ 'buzzer': 'BUZZ',
+ 'overload_bypass': 'OLBP',
+ 'escape_to_default_screen_after_1min_timeout': 'LCDE',
+ 'overload_restart': 'OLRS',
+ 'over_temp_restart': 'OTRS',
+ 'backlight_on': 'BLON',
+ 'alarm_on_on_primary_source_interrupt': 'ALRM',
+ 'fault_code_record': 'FTCR',
+}
+_strings = {
+ 'status': 'Status',
+ 'generation': 'Generation',
+
+ # flags
+ 'flag_buzzer': 'Buzzer',
+ 'flag_overload_bypass': 'Overload bypass',
+ 'flag_escape_to_default_screen_after_1min_timeout': 'Reset to default LCD page after 1min timeout',
+ 'flag_overload_restart': 'Restart on overload',
+ 'flag_over_temp_restart': 'Restart on overtemp',
+ 'flag_backlight_on': 'LCD backlight',
+ 'flag_alarm_on_on_primary_source_interrupt': 'Beep on primary source interruption',
+ 'flag_fault_code_record': 'Fault code recording',
+
+ # monitor
+ 'chrg_evt_started': 'Started charging from AC.',
+ 'chrg_evt_finished': 'Finished charging from AC.',
+ 'chrg_evt_disconnected': 'AC line disconnected.',
+ 'chrg_evt_current_changed': 'AC charging current set to <b>%dA</b>.',
+ 'chrg_evt_na_solar': 'AC line detected, but battery charging is unavailable due to active solar power line.'
+}
+logger = logging.getLogger(__name__)
+
+
+#
+# helpers
+#
+
+def _(key, *args):
+ global _strings
+ return (_strings[key] if key in _strings else f'{{{key}}}') % args
+
+
+def get_usage(command: str, arguments: dict) -> str:
+ blocks = []
+ argument_names = []
+ argument_lines = []
+ for k, v in arguments.items():
+ argument_names.append(k)
+ argument_lines.append(
+ f'<code>{k}</code>: {v}'
+ )
+
+ command = f'/{command}'
+ if argument_names:
+ command += ' ' + ' '.join(argument_names)
+
+ blocks.append(
+ '<b>Usage</b>\n'
+ f'<code>{command}</code>'
+ )
+
+ if argument_lines:
+ blocks.append(
+ '<b>Arguments</b>\n' + '\n'.join(argument_lines)
+ )
+
+ return '\n\n'.join(blocks)
+
+
+def get_markup() -> ReplyKeyboardMarkup:
+ button = [
+ [
+ _('status'),
+ _('generation')
+ ],
+ ]
+ return ReplyKeyboardMarkup(button, one_time_keyboard=False)
+
+
+def reply(update: Update, text: str, reply_markup=None) -> None:
+ if reply_markup is None:
+ reply_markup = get_markup()
+
+ update.message.reply_text(text,
+ reply_markup=reply_markup,
+ parse_mode=ParseMode.HTML)
+
+
+def handle_exc(update: Update, e) -> None:
+ logging.exception(str(e))
+
+ if isinstance(e, InverterError):
+ try:
+ err = json.loads(str(e))['message']
+ except json.decoder.JSONDecodeError:
+ err = str(e)
+ err = re.sub(r'((?:.*)?error:) (.*)', r'<b>\1</b> \2', err)
+ reply(update, err)
+
+ elif not isinstance(e, TimedOut):
+ reply(update, 'exception: ' + str(e))
+
+
+def beautify_table(s):
+ lines = s.split('\n')
+ lines = list(map(lambda line: re.sub(r'\s+', ' ', line), lines))
+ lines = list(map(lambda line: re.sub(r'(.*?): (.*)', r'<b>\1:</b> \2', line), lines))
+ return '\n'.join(lines)
+
+
+#
+# command/message handlers
+#
+
+
+def start(update: Update, context: CallbackContext) -> None:
+ reply(update, 'Select a command on the keyboard.')
+
+
+def msg_status(update: Update, context: CallbackContext) -> None:
+ try:
+ gs = inverter.exec('get-status')['data']
+
+ # render response
+ power_direction = gs['battery_power_direction'].lower()
+ power_direction = re.sub(r'ge$', 'ging', power_direction)
+
+ charging_rate = ''
+ if power_direction == 'charging':
+ charging_rate = ' @ %s %s' % (
+ gs['battery_charging_current']['value'], gs['battery_charging_current']['unit'])
+ elif power_direction == 'discharging':
+ charging_rate = ' @ %s %s' % (
+ gs['battery_discharging_current']['value'], gs['battery_discharging_current']['unit'])
+
+ html = '<b>Battery:</b> %s %s' % (gs['battery_voltage']['value'], gs['battery_voltage']['unit'])
+ html += ' (%s%s)' % (power_direction, charging_rate)
+
+ html += '\n<b>Load:</b> %s %s' % (gs['ac_output_active_power']['value'], gs['ac_output_active_power']['unit'])
+ html += ' (%s%%)' % (gs['output_load_percent']['value'])
+
+ if gs['pv1_input_power']['value'] > 0:
+ html += '\n<b>Input power:</b> %s %s' % (gs['pv1_input_power']['value'], gs['pv1_input_power']['unit'])
+
+ if gs['grid_voltage']['value'] > 0 or gs['grid_freq']['value'] > 0:
+ html += '\n<b>Generator:</b> %s %s' % (gs['grid_voltage']['unit'], gs['grid_voltage']['value'])
+ html += ', %s %s' % (gs['grid_freq']['value'], gs['grid_freq']['unit'])
+
+ # send response
+ reply(update, html)
+ except Exception as e:
+ handle_exc(update, e)
+
+
+def msg_generation(update: Update, context: CallbackContext) -> None:
+ try:
+ today = datetime.date.today()
+ yday = today - datetime.timedelta(days=1)
+ yday2 = today - datetime.timedelta(days=2)
+
+ gs = inverter.exec('get-status')['data']
+ # sleep(0.1)
+
+ gen_today = inverter.exec('get-day-generated', (today.year, today.month, today.day))['data']
+ gen_yday = None
+ gen_yday2 = None
+
+ if yday.month == today.month:
+ # sleep(0.1)
+ gen_yday = inverter.exec('get-day-generated', (yday.year, yday.month, yday.day))['data']
+
+ if yday2.month == today.month:
+ # sleep(0.1)
+ gen_yday2 = inverter.exec('get-day-generated', (yday2.year, yday2.month, yday2.day))['data']
+
+ # render response
+ html = '<b>Input power:</b> %s %s' % (gs['pv1_input_power']['value'], gs['pv1_input_power']['unit'])
+ html += ' (%s %s)' % (gs['pv1_input_voltage']['value'], gs['pv1_input_voltage']['unit'])
+
+ html += '\n<b>Today:</b> %s Wh' % (gen_today['wh'])
+
+ if gen_yday is not None:
+ html += '\n<b>Yesterday:</b> %s Wh' % (gen_yday['wh'])
+
+ if gen_yday2 is not None:
+ html += '\n<b>The day before yesterday:</b> %s Wh' % (gen_yday2['wh'])
+
+ # send response
+ reply(update, html)
+ except Exception as e:
+ handle_exc(update, e)
+
+
+def msg_all(update: Update, context: CallbackContext) -> None:
+ reply(update, "Command not recognized. Please try again.")
+
+
+def on_set_ac_charging_current(update: Update, context: CallbackContext) -> None:
+ allowed_values = inverter.exec('get-allowed-ac-charging-currents')['data']
+
+ try:
+ current = int(context.args[0])
+ if current not in allowed_values:
+ raise ValueError(f'invalid value {current}')
+
+ response = inverter.exec('set-max-ac-charging-current', (0, current))
+ reply(update, 'OK' if response['result'] == 'ok' else 'ERROR')
+
+ except (IndexError, ValueError):
+ usage = get_usage('setgencc', {
+ 'A': 'max charging current, allowed values: ' + ', '.join(map(lambda x: str(x), allowed_values))
+ })
+ reply(update, usage)
+
+
+def on_set_ac_charging_thresholds(update: Update, context: CallbackContext) -> None:
+ try:
+ cv = float(context.args[0])
+ dv = float(context.args[1])
+
+ if 44 <= cv <= 51 and 48 <= dv <= 58:
+ response = inverter.exec('set-charging-thresholds', (cv, dv))
+ reply(update, 'OK' if response['result'] == 'ok' else 'ERROR')
+ else:
+ raise ValueError('invalid values')
+
+ except (IndexError, ValueError):
+ usage = get_usage('setgenct', {
+ 'CV': f'charging voltage, 44 {LT} CV {LT} 51',
+ 'DV': f'discharging voltage, 48 {LT} DV {LT} 58'
+ })
+ reply(update, usage)
+
+
+def on_set_battery_under_voltage(update: Update, context: CallbackContext) -> None:
+ try:
+ v = float(context.args[0])
+
+ if 40.0 <= v <= 48.0:
+ response = inverter.exec('set-battery-cut-off-voltage', (v,))
+ reply(update, 'OK' if response['result'] == 'ok' else 'ERROR')
+ monitor.set_battery_under_voltage(v)
+ else:
+ raise ValueError('invalid voltage')
+
+ except (IndexError, ValueError):
+ usage = get_usage('setbatuv', {
+ 'V': f'floating point number, 40.0 {LT} V {LT} 48.0'
+ })
+ reply(update, usage)
+
+
+def build_flags_keyboard(flags: dict) -> Tuple[str, InlineKeyboardMarkup]:
+ keyboard = []
+ for k, v in flags.items():
+ label = ('✅' if v else '❌') + ' ' + _(f'flag_{k}')
+ proto_flag = flags_map[k]
+ keyboard.append([InlineKeyboardButton(label, callback_data=f'flag_{proto_flag}')])
+
+ text = 'Press a button to toggle a flag.'
+
+ return text, InlineKeyboardMarkup(keyboard)
+
+
+def on_flags(update: Update, context: CallbackContext) -> None:
+ flags = inverter.exec('get-flags')['data']
+ text, markup = build_flags_keyboard(flags)
+ reply(update, text, reply_markup=markup)
+
+
+def on_status(update: Update, context: CallbackContext) -> None:
+ try:
+ status = inverter.exec('get-status', format=Format.TABLE)
+ reply(update, beautify_table(status))
+ except Exception as e:
+ handle_exc(update, e)
+
+
+def on_config(update: Update, context: CallbackContext) -> None:
+ try:
+ rated = inverter.exec('get-rated', format=Format.TABLE)
+ reply(update, beautify_table(rated))
+ except Exception as e:
+ handle_exc(update, e)
+
+
+def on_errors(update: Update, context: CallbackContext) -> None:
+ try:
+ errors = inverter.exec('get-errors', format=Format.TABLE)
+ reply(update, beautify_table(errors))
+ except Exception as e:
+ handle_exc(update, e)
+
+
+def on_test(update: Update, context: CallbackContext) -> None:
+ monitor_charging_event_handler(ChargingEvent.AC_CURRENT_CHANGED, current=20)
+
+
+def on_button(update: Update, context: CallbackContext) -> None:
+ query = update.callback_query
+
+ if query.data.startswith('flag_'):
+ flag = query.data[5:]
+ found = False
+ json_key = None
+ for k, v in flags_map.items():
+ if v == flag:
+ found = True
+ json_key = k
+ break
+ if not found:
+ query.answer('unknown flag')
+ return
+
+ flags = inverter.exec('get-flags')['data']
+ cur_flag_value = flags[json_key]
+ target_flag_value = '0' if cur_flag_value else '1'
+
+ # set flag
+ response = inverter.exec('set-flag', (flag, target_flag_value))
+
+ # notify user
+ query.answer('Done' if response['result'] == 'ok' else 'failed to toggle flag')
+
+ # edit message
+ flags[json_key] = not cur_flag_value
+ text, markup = build_flags_keyboard(flags)
+ query.edit_message_text(text, reply_markup=markup)
+
+ else:
+ query.answer('unexpected callback data')
+
+
+def monitor_charging_event_handler(event: ChargingEvent, **kwargs):
+ key = None
+ args = []
+
+ if event == ChargingEvent.AC_CHARGING_STARTED:
+ key = 'started'
+ elif event == ChargingEvent.AC_CHARGING_FINISHED:
+ key = 'finished'
+ elif event == ChargingEvent.AC_DISCONNECTED:
+ key = 'disconnected'
+ elif event == ChargingEvent.AC_CURRENT_CHANGED:
+ key = 'current_changed'
+ args.append(kwargs['current'])
+ elif event == ChargingEvent.AC_CHARGING_UNAVAILABLE_BECAUSE_SOLAR:
+ key = 'na_solar'
+
+ if key is None:
+ logger.error('unknown charging event:', event)
+ return
+
+ text = _(f'chrg_evt_{key}', *args)
+
+ for chat_id in notify_to:
+ updater.bot.send_message(chat_id=chat_id,
+ text=text,
+ parse_mode='HTML',
+ reply_markup=get_markup())
+
+
+def monitor_battery_event_handler(event):
+ pass
+
+
+if __name__ == '__main__':
+ # command-line arguments
+ parser = ArgumentParser()
+ parser.add_argument('--token', required=True, type=str,
+ help='Telegram bot token')
+ parser.add_argument('--users-whitelist', nargs='+',
+ help='ID of users allowed to use the bot')
+ parser.add_argument('--notify-to', nargs='+')
+ parser.add_argument('--ac-current-range', nargs='+', default=(10, 30))
+ parser.add_argument('--inverterd-host', default='127.0.0.1', type=str)
+ parser.add_argument('--inverterd-port', default=8305, type=int)
+ parser.add_argument('--verbose', action='store_true')
+ args = parser.parse_args()
+
+ whitelist = list(map(lambda x: int(x), args.users_whitelist))
+ notify_to = list(map(lambda x: int(x), args.notify_to)) if args.notify_to is not None else []
+
+ # connect to inverterd
+ inverter.init(host=args.inverterd_host, port=args.inverterd_port)
+
+ # start monitoring
+ monitor = InverterMonitor(args.ac_current_range)
+ monitor.set_charging_event_handler(monitor_charging_event_handler)
+ monitor.set_battery_event_handler(monitor_battery_event_handler)
+ monitor.start()
+
+ # configure logging
+ logging_level = logging.DEBUG if args.verbose else logging.INFO
+ logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+ level=logging_level)
+
+ # configure bot
+ updater = Updater(args.token, request_kwargs={'read_timeout': 6, 'connect_timeout': 7})
+ dispatcher = updater.dispatcher
+
+ user_filter = Filters.user(whitelist)
+
+ dispatcher.add_handler(CommandHandler('start', start))
+ dispatcher.add_handler(MessageHandler(Filters.text(_('status')) & user_filter, msg_status))
+ dispatcher.add_handler(MessageHandler(Filters.text(_('generation')) & user_filter, msg_generation))
+
+ dispatcher.add_handler(CommandHandler('setgencc', on_set_ac_charging_current))
+ dispatcher.add_handler(CommandHandler('setgenct', on_set_ac_charging_thresholds))
+ dispatcher.add_handler(CommandHandler('setbatuv', on_set_battery_under_voltage))
+
+ dispatcher.add_handler(CallbackQueryHandler(on_button))
+ dispatcher.add_handler(CommandHandler('flags', on_flags))
+ dispatcher.add_handler(CommandHandler('status', on_status))
+ dispatcher.add_handler(CommandHandler('config', on_config))
+ dispatcher.add_handler(CommandHandler('errors', on_errors))
+ dispatcher.add_handler(CommandHandler('test', on_test))
+
+ dispatcher.add_handler(MessageHandler(Filters.all & user_filter, msg_all))
+
+ # start the bot
+ updater.start_polling()
+
+ # run the bot until the user presses Ctrl-C or the process receives SIGINT, SIGTERM or SIGABRT
+ updater.idle()
+
+ monitor.stop()