aboutsummaryrefslogtreecommitdiff
path: root/inverter-bot
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2021-11-02 21:29:25 +0300
committerEvgeny Zinoviev <me@ch1p.io>2021-11-02 21:59:29 +0300
commiteb970844576f0f9d84b5a385f615582b50e0afa9 (patch)
tree184992ca7371a48e96afdc88dbe57ab998df59e7 /inverter-bot
parent3346b29044740090d4ecee0056e456e90888c218 (diff)
implement AC charging program
Diffstat (limited to 'inverter-bot')
-rwxr-xr-xinverter-bot414
1 files changed, 0 insertions, 414 deletions
diff --git a/inverter-bot b/inverter-bot
deleted file mode 100755
index f41efbe..0000000
--- a/inverter-bot
+++ /dev/null
@@ -1,414 +0,0 @@
-#!/usr/bin/env python3
-import logging, re, datetime, json
-
-from inverterd import Format, Client as InverterClient, InverterError
-from typing import Optional, Tuple
-from argparse import ArgumentParser
-from html import escape
-# from pprint import pprint
-# from time import sleep
-from strings import lang as _
-from telegram import (
- Update,
- ParseMode,
- KeyboardButton,
- InlineKeyboardButton,
- InlineKeyboardMarkup,
- ReplyKeyboardMarkup
-)
-from telegram.ext import (
- Updater,
- Filters,
- CommandHandler,
- MessageHandler,
- CallbackContext,
- CallbackQueryHandler
-)
-from telegram.error import TimedOut
-
-
-LT = escape('<=')
-flags_map = {
- 'buzzer': 'BUZZ',
- 'overload_bypass': 'OLBP',
- 'escape_to_default_screen_after_1min_timeout': 'LCDE',
- 'overload_restart': 'OLRS',
- 'over_temp_restart': 'OTRS',
- 'backlight_on': 'BLON',
- 'alarm_on_on_primary_source_interrupt': 'ALRM',
- 'fault_code_record': 'FTCR',
-}
-
-
-class InverterClientWrapper:
- def __init__(self, host: str, port: str):
- self._host = host
- self._port = port
- self._inverter = None
-
- self.create()
-
- def create(self):
- self._inverter = InverterClient(host=self._host, port=self._port)
- self._inverter.connect()
-
- def exec(self, command: str, arguments: tuple = (), format=Format.JSON):
- try:
- self._inverter.format(format)
- response = self._inverter.exec(command, arguments)
- if format == Format.JSON:
- response = json.loads(response)
- return response
- except InverterError as e:
- raise e
- except Exception as e:
- # silently try to reconnect
- try:
- self.create()
- except Exception:
- pass
- raise e
-
-
-inverter: Optional[InverterClientWrapper] = None
-
-
-#
-# helpers
-#
-
-
-def get_usage(command: str, arguments: dict) -> str:
- blocks = []
- argument_names = []
- argument_lines = []
- for k, v in arguments.items():
- argument_names.append(k)
- argument_lines.append(
- f'<code>{k}</code>: {v}'
- )
-
- command = f'/{command}'
- if argument_names:
- command += ' ' + ' '.join(argument_names)
-
- blocks.append(
- '<b>Usage</b>\n'
- f'<code>{command}</code>'
- )
-
- if argument_lines:
- blocks.append(
- '<b>Arguments</b>\n' + '\n'.join(argument_lines)
- )
-
- return '\n\n'.join(blocks)
-
-
-def get_markup() -> ReplyKeyboardMarkup:
- button = [
- [
- _('status'),
- _('generation')
- ],
- ]
- return ReplyKeyboardMarkup(button, one_time_keyboard=False)
-
-
-def reply(update: Update, text: str, reply_markup=None) -> None:
- if reply_markup is None:
- reply_markup = get_markup()
-
- update.message.reply_text(text,
- reply_markup=reply_markup,
- parse_mode=ParseMode.HTML)
-
-
-def handle_exc(update: Update, e) -> None:
- logging.exception(str(e))
-
- if isinstance(e, InverterError):
- try:
- err = json.loads(str(e))['message']
- except json.decoder.JSONDecodeError:
- err = str(e)
- err = re.sub(r'((?:.*)?error:) (.*)', r'<b>\1</b> \2', err)
- reply(update, err)
-
- elif not isinstance(e, TimedOut):
- reply(update, 'exception: ' + str(e))
-
-
-def beautify_table(s):
- lines = s.split('\n')
- lines = list(map(lambda line: re.sub(r'\s+', ' ', line), lines))
- lines = list(map(lambda line: re.sub(r'(.*?): (.*)', r'<b>\1:</b> \2', line), lines))
- return '\n'.join(lines)
-
-
-#
-# command/message handlers
-#
-
-
-def start(update: Update, context: CallbackContext) -> None:
- reply(update, 'Select a command on the keyboard.')
-
-
-def msg_status(update: Update, context: CallbackContext) -> None:
- try:
- gs = inverter.exec('get-status')['data']
-
- # render response
- power_direction = gs['battery_power_direction'].lower()
- power_direction = re.sub(r'ge$', 'ging', power_direction)
-
- charging_rate = ''
- if power_direction == 'charging':
- charging_rate = ' @ %s %s' % (
- gs['battery_charging_current']['value'], gs['battery_charging_current']['unit'])
- elif power_direction == 'discharging':
- charging_rate = ' @ %s %s' % (
- gs['battery_discharging_current']['value'], gs['battery_discharging_current']['unit'])
-
- html = '<b>Battery:</b> %s %s' % (gs['battery_voltage']['value'], gs['battery_voltage']['unit'])
- html += ' (%s%s)' % (power_direction, charging_rate)
-
- html += '\n<b>Load:</b> %s %s' % (gs['ac_output_active_power']['value'], gs['ac_output_active_power']['unit'])
- html += ' (%s%%)' % (gs['output_load_percent']['value'])
-
- if gs['pv1_input_power']['value'] > 0:
- html += '\n<b>Input power:</b> %s %s' % (gs['pv1_input_power']['value'], gs['pv1_input_power']['unit'])
-
- if gs['grid_voltage']['value'] > 0 or gs['grid_freq']['value'] > 0:
- html += '\n<b>Generator:</b> %s %s' % (gs['grid_voltage']['unit'], gs['grid_voltage']['value'])
- html += ', %s %s' % (gs['grid_freq']['value'], gs['grid_freq']['unit'])
-
- # send response
- reply(update, html)
- except Exception as e:
- handle_exc(update, e)
-
-
-def msg_generation(update: Update, context: CallbackContext) -> None:
- try:
- today = datetime.date.today()
- yday = today - datetime.timedelta(days=1)
- yday2 = today - datetime.timedelta(days=2)
-
- gs = inverter.exec('get-status')['data']
- # sleep(0.1)
-
- gen_today = inverter.exec('get-day-generated', (today.year, today.month, today.day))['data']
- gen_yday = None
- gen_yday2 = None
-
- if yday.month == today.month:
- # sleep(0.1)
- gen_yday = inverter.exec('get-day-generated', (yday.year, yday.month, yday.day))['data']
-
- if yday2.month == today.month:
- # sleep(0.1)
- gen_yday2 = inverter.exec('get-day-generated', (yday2.year, yday2.month, yday2.day))['data']
-
- # render response
- html = '<b>Input power:</b> %s %s' % (gs['pv1_input_power']['value'], gs['pv1_input_power']['unit'])
- html += ' (%s %s)' % (gs['pv1_input_voltage']['value'], gs['pv1_input_voltage']['unit'])
-
- html += '\n<b>Today:</b> %s Wh' % (gen_today['wh'])
-
- if gen_yday is not None:
- html += '\n<b>Yesterday:</b> %s Wh' % (gen_yday['wh'])
-
- if gen_yday2 is not None:
- html += '\n<b>The day before yesterday:</b> %s Wh' % (gen_yday2['wh'])
-
- # send response
- reply(update, html)
- except Exception as e:
- handle_exc(update, e)
-
-
-def msg_all(update: Update, context: CallbackContext) -> None:
- reply(update, "Command not recognized. Please try again.")
-
-
-def on_set_ac_charging_current(update: Update, context: CallbackContext) -> None:
- allowed_values = inverter.exec('get-allowed-ac-charging-currents')['data']
-
- try:
- current = int(context.args[0])
- if current not in allowed_values:
- raise ValueError(f'invalid value {current}')
-
- response = inverter.exec('set-max-ac-charging-current', (0, current))
- reply(update, 'OK' if response['result'] == 'ok' else 'ERROR')
-
- except (IndexError, ValueError):
- usage = get_usage('setgencc', {
- 'A': 'max charging current, allowed values: ' + ', '.join(map(lambda x: str(x), allowed_values))
- })
- reply(update, usage)
-
-
-def on_set_ac_charging_thresholds(update: Update, context: CallbackContext) -> None:
- try:
- cv = float(context.args[0])
- dv = float(context.args[1])
-
- if 44 <= cv <= 51 and 48 <= dv <= 58:
- response = inverter.exec('set-charging-thresholds', (cv, dv))
- reply(update, 'OK' if response['result'] == 'ok' else 'ERROR')
- else:
- raise ValueError('invalid values')
-
- except (IndexError, ValueError):
- usage = get_usage('setgenct', {
- 'CV': f'charging voltage, 44 {LT} CV {LT} 51',
- 'DV': f'discharging voltage, 48 {LT} DV {LT} 58'
- })
- reply(update, usage)
-
-
-def on_set_battery_under_voltage(update: Update, context: CallbackContext) -> None:
- try:
- v = float(context.args[0])
-
- if 40.0 <= v <= 48.0:
- response = inverter.exec('set-battery-cut-off-voltage', (v,))
- reply(update, 'OK' if response['result'] == 'ok' else 'ERROR')
- else:
- raise ValueError('invalid voltage')
-
- except (IndexError, ValueError):
- usage = get_usage('setbatuv', {
- 'V': f'floating point number, 40.0 {LT} V {LT} 48.0'
- })
- reply(update, usage)
-
-
-def build_flags_keyboard(flags: dict) -> Tuple[str, InlineKeyboardMarkup]:
- keyboard = []
- for k, v in flags.items():
- label = ('✅' if v else '❌') + ' ' + _(f'flag_{k}')
- proto_flag = flags_map[k]
- keyboard.append([InlineKeyboardButton(label, callback_data=f'flag_{proto_flag}')])
-
- text = 'Press a button to toggle a flag.'
-
- return text, InlineKeyboardMarkup(keyboard)
-
-
-def on_flags(update: Update, context: CallbackContext) -> None:
- flags = inverter.exec('get-flags')['data']
- text, markup = build_flags_keyboard(flags)
- reply(update, text, reply_markup=markup)
-
-
-def on_status(update: Update, context: CallbackContext) -> None:
- try:
- status = inverter.exec('get-status', format=Format.TABLE)
- reply(update, beautify_table(status))
- except Exception as e:
- handle_exc(update, e)
-
-
-def on_config(update: Update, context: CallbackContext) -> None:
- try:
- rated = inverter.exec('get-rated', format=Format.TABLE)
- reply(update, beautify_table(rated))
- except Exception as e:
- handle_exc(update, e)
-
-
-def on_errors(update: Update, context: CallbackContext) -> None:
- try:
- errors = inverter.exec('get-errors', format=Format.TABLE)
- reply(update, beautify_table(errors))
- except Exception as e:
- handle_exc(update, e)
-
-
-def on_button(update: Update, context: CallbackContext) -> None:
- query = update.callback_query
-
- if query.data.startswith('flag_'):
- flag = query.data[5:]
- found = False
- json_key = None
- for k, v in flags_map.items():
- if v == flag:
- found = True
- json_key = k
- break
- if not found:
- query.answer('unknown flag')
- return
-
- flags = inverter.exec('get-flags')['data']
- cur_flag_value = flags[json_key]
- target_flag_value = '0' if cur_flag_value else '1'
-
- # set flag
- response = inverter.exec('set-flag', (flag, target_flag_value))
-
- # notify user
- query.answer('Done' if response['result'] == 'ok' else 'failed to toggle flag')
-
- # edit message
- flags[json_key] = not cur_flag_value
- text, markup = build_flags_keyboard(flags)
- query.edit_message_text(text, reply_markup=markup)
-
- else:
- query.answer('unexpected callback data')
-
-
-if __name__ == '__main__':
- # command-line arguments
- parser = ArgumentParser()
- parser.add_argument('--token', required=True, type=str,
- help='Telegram bot token')
- parser.add_argument('--users-whitelist', nargs='+',
- help='ID of users allowed to use the bot')
- parser.add_argument('--inverterd-host', default='127.0.0.1', type=str)
- parser.add_argument('--inverterd-port', default=8305, type=int)
- args = parser.parse_args()
-
- whitelist = list(map(lambda x: int(x), args.users_whitelist))
-
- # connect to inverterd
- inverter = InverterClientWrapper(host=args.inverterd_host, port=args.inverterd_port)
-
- # configure logging
- logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
- level=logging.INFO)
-
- # configure bot
- updater = Updater(args.token, request_kwargs={'read_timeout': 6, 'connect_timeout': 7})
- dispatcher = updater.dispatcher
-
- user_filter = Filters.user(whitelist)
-
- dispatcher.add_handler(CommandHandler('start', start))
- dispatcher.add_handler(MessageHandler(Filters.text(_('status')) & user_filter, msg_status))
- dispatcher.add_handler(MessageHandler(Filters.text(_('generation')) & user_filter, msg_generation))
-
- dispatcher.add_handler(CommandHandler('setgencc', on_set_ac_charging_current))
- dispatcher.add_handler(CommandHandler('setgenct', on_set_ac_charging_thresholds))
- dispatcher.add_handler(CommandHandler('setbatuv', on_set_battery_under_voltage))
-
- dispatcher.add_handler(CallbackQueryHandler(on_button))
- dispatcher.add_handler(CommandHandler('flags', on_flags))
- dispatcher.add_handler(CommandHandler('status', on_status))
- dispatcher.add_handler(CommandHandler('config', on_config))
- dispatcher.add_handler(CommandHandler('errors', on_errors))
-
-
- dispatcher.add_handler(MessageHandler(Filters.all & user_filter, msg_all))
-
- # start the bot
- updater.start_polling()
-
- # run the bot until the user presses Ctrl-C or the process receives SIGINT, SIGTERM or SIGABRT
- updater.idle()