#!/usr/bin/env python3
import logging
import re
import datetime
import json
from typing import Optional, Tuple
from argparse import ArgumentParser
from html import escape
from inverter_wrapper import InverterClientWrapper, wrapper_instance as inverter
from monitor import InverterMonitor, ChargingEvent
from inverterd import Format, InverterError
from telegram import (
Update,
ParseMode,
KeyboardButton,
InlineKeyboardButton,
InlineKeyboardMarkup,
ReplyKeyboardMarkup
)
from telegram.ext import (
Updater,
Filters,
CommandHandler,
MessageHandler,
CallbackContext,
CallbackQueryHandler
)
from telegram.error import TimedOut
monitor: Optional[InverterMonitor] = None
updater: Optional[Updater] = None
notify_to: list[int] = []
LT = escape('<=')
flags_map = {
'buzzer': 'BUZZ',
'overload_bypass': 'OLBP',
'escape_to_default_screen_after_1min_timeout': 'LCDE',
'overload_restart': 'OLRS',
'over_temp_restart': 'OTRS',
'backlight_on': 'BLON',
'alarm_on_on_primary_source_interrupt': 'ALRM',
'fault_code_record': 'FTCR',
}
_strings = {
'status': 'Status',
'generation': 'Generation',
# flags
'flag_buzzer': 'Buzzer',
'flag_overload_bypass': 'Overload bypass',
'flag_escape_to_default_screen_after_1min_timeout': 'Reset to default LCD page after 1min timeout',
'flag_overload_restart': 'Restart on overload',
'flag_over_temp_restart': 'Restart on overtemp',
'flag_backlight_on': 'LCD backlight',
'flag_alarm_on_on_primary_source_interrupt': 'Beep on primary source interruption',
'flag_fault_code_record': 'Fault code recording',
# monitor
'chrg_evt_started': 'Started charging from AC.',
'chrg_evt_finished': 'Finished charging from AC.',
'chrg_evt_disconnected': 'AC line disconnected.',
'chrg_evt_current_changed': 'AC charging current set to %dA.',
'chrg_evt_na_solar': 'AC line detected, but battery charging is unavailable due to active solar power line.'
}
logger = logging.getLogger(__name__)
#
# helpers
#
def _(key, *args):
global _strings
return (_strings[key] if key in _strings else f'{{{key}}}') % args
def get_usage(command: str, arguments: dict) -> str:
blocks = []
argument_names = []
argument_lines = []
for k, v in arguments.items():
argument_names.append(k)
argument_lines.append(
f'{k}
: {v}'
)
command = f'/{command}'
if argument_names:
command += ' ' + ' '.join(argument_names)
blocks.append(
'Usage\n'
f'{command}
'
)
if argument_lines:
blocks.append(
'Arguments\n' + '\n'.join(argument_lines)
)
return '\n\n'.join(blocks)
def get_markup() -> ReplyKeyboardMarkup:
button = [
[
_('status'),
_('generation')
],
]
return ReplyKeyboardMarkup(button, one_time_keyboard=False)
def reply(update: Update, text: str, reply_markup=None) -> None:
if reply_markup is None:
reply_markup = get_markup()
update.message.reply_text(text,
reply_markup=reply_markup,
parse_mode=ParseMode.HTML)
def handle_exc(update: Update, e) -> None:
logging.exception(str(e))
if isinstance(e, InverterError):
try:
err = json.loads(str(e))['message']
except json.decoder.JSONDecodeError:
err = str(e)
err = re.sub(r'((?:.*)?error:) (.*)', r'\1 \2', err)
reply(update, err)
elif not isinstance(e, TimedOut):
reply(update, 'exception: ' + str(e))
def beautify_table(s):
lines = s.split('\n')
lines = list(map(lambda line: re.sub(r'\s+', ' ', line), lines))
lines = list(map(lambda line: re.sub(r'(.*?): (.*)', r'\1: \2', line), lines))
return '\n'.join(lines)
#
# command/message handlers
#
def start(update: Update, context: CallbackContext) -> None:
reply(update, 'Select a command on the keyboard.')
def msg_status(update: Update, context: CallbackContext) -> None:
try:
gs = inverter.exec('get-status')['data']
# render response
power_direction = gs['battery_power_direction'].lower()
power_direction = re.sub(r'ge$', 'ging', power_direction)
charging_rate = ''
if power_direction == 'charging':
charging_rate = ' @ %s %s' % (
gs['battery_charging_current']['value'], gs['battery_charging_current']['unit'])
elif power_direction == 'discharging':
charging_rate = ' @ %s %s' % (
gs['battery_discharging_current']['value'], gs['battery_discharging_current']['unit'])
html = 'Battery: %s %s' % (gs['battery_voltage']['value'], gs['battery_voltage']['unit'])
html += ' (%s%s)' % (power_direction, charging_rate)
html += '\nLoad: %s %s' % (gs['ac_output_active_power']['value'], gs['ac_output_active_power']['unit'])
html += ' (%s%%)' % (gs['output_load_percent']['value'])
if gs['pv1_input_power']['value'] > 0:
html += '\nInput power: %s %s' % (gs['pv1_input_power']['value'], gs['pv1_input_power']['unit'])
if gs['grid_voltage']['value'] > 0 or gs['grid_freq']['value'] > 0:
html += '\nGenerator: %s %s' % (gs['grid_voltage']['unit'], gs['grid_voltage']['value'])
html += ', %s %s' % (gs['grid_freq']['value'], gs['grid_freq']['unit'])
# send response
reply(update, html)
except Exception as e:
handle_exc(update, e)
def msg_generation(update: Update, context: CallbackContext) -> None:
try:
today = datetime.date.today()
yday = today - datetime.timedelta(days=1)
yday2 = today - datetime.timedelta(days=2)
gs = inverter.exec('get-status')['data']
# sleep(0.1)
gen_today = inverter.exec('get-day-generated', (today.year, today.month, today.day))['data']
gen_yday = None
gen_yday2 = None
if yday.month == today.month:
# sleep(0.1)
gen_yday = inverter.exec('get-day-generated', (yday.year, yday.month, yday.day))['data']
if yday2.month == today.month:
# sleep(0.1)
gen_yday2 = inverter.exec('get-day-generated', (yday2.year, yday2.month, yday2.day))['data']
# render response
html = 'Input power: %s %s' % (gs['pv1_input_power']['value'], gs['pv1_input_power']['unit'])
html += ' (%s %s)' % (gs['pv1_input_voltage']['value'], gs['pv1_input_voltage']['unit'])
html += '\nToday: %s Wh' % (gen_today['wh'])
if gen_yday is not None:
html += '\nYesterday: %s Wh' % (gen_yday['wh'])
if gen_yday2 is not None:
html += '\nThe day before yesterday: %s Wh' % (gen_yday2['wh'])
# send response
reply(update, html)
except Exception as e:
handle_exc(update, e)
def msg_all(update: Update, context: CallbackContext) -> None:
reply(update, "Command not recognized. Please try again.")
def on_set_ac_charging_current(update: Update, context: CallbackContext) -> None:
allowed_values = inverter.exec('get-allowed-ac-charging-currents')['data']
try:
current = int(context.args[0])
if current not in allowed_values:
raise ValueError(f'invalid value {current}')
response = inverter.exec('set-max-ac-charging-current', (0, current))
reply(update, 'OK' if response['result'] == 'ok' else 'ERROR')
except (IndexError, ValueError):
usage = get_usage('setgencc', {
'A': 'max charging current, allowed values: ' + ', '.join(map(lambda x: str(x), allowed_values))
})
reply(update, usage)
def on_set_ac_charging_thresholds(update: Update, context: CallbackContext) -> None:
try:
cv = float(context.args[0])
dv = float(context.args[1])
if 44 <= cv <= 51 and 48 <= dv <= 58:
response = inverter.exec('set-charging-thresholds', (cv, dv))
reply(update, 'OK' if response['result'] == 'ok' else 'ERROR')
else:
raise ValueError('invalid values')
except (IndexError, ValueError):
usage = get_usage('setgenct', {
'CV': f'charging voltage, 44 {LT} CV {LT} 51',
'DV': f'discharging voltage, 48 {LT} DV {LT} 58'
})
reply(update, usage)
def on_set_battery_under_voltage(update: Update, context: CallbackContext) -> None:
try:
v = float(context.args[0])
if 40.0 <= v <= 48.0:
response = inverter.exec('set-battery-cut-off-voltage', (v,))
reply(update, 'OK' if response['result'] == 'ok' else 'ERROR')
monitor.set_battery_under_voltage(v)
else:
raise ValueError('invalid voltage')
except (IndexError, ValueError):
usage = get_usage('setbatuv', {
'V': f'floating point number, 40.0 {LT} V {LT} 48.0'
})
reply(update, usage)
def build_flags_keyboard(flags: dict) -> Tuple[str, InlineKeyboardMarkup]:
keyboard = []
for k, v in flags.items():
label = ('✅' if v else '❌') + ' ' + _(f'flag_{k}')
proto_flag = flags_map[k]
keyboard.append([InlineKeyboardButton(label, callback_data=f'flag_{proto_flag}')])
text = 'Press a button to toggle a flag.'
return text, InlineKeyboardMarkup(keyboard)
def on_flags(update: Update, context: CallbackContext) -> None:
flags = inverter.exec('get-flags')['data']
text, markup = build_flags_keyboard(flags)
reply(update, text, reply_markup=markup)
def on_status(update: Update, context: CallbackContext) -> None:
try:
status = inverter.exec('get-status', format=Format.TABLE)
reply(update, beautify_table(status))
except Exception as e:
handle_exc(update, e)
def on_config(update: Update, context: CallbackContext) -> None:
try:
rated = inverter.exec('get-rated', format=Format.TABLE)
reply(update, beautify_table(rated))
except Exception as e:
handle_exc(update, e)
def on_errors(update: Update, context: CallbackContext) -> None:
try:
errors = inverter.exec('get-errors', format=Format.TABLE)
reply(update, beautify_table(errors))
except Exception as e:
handle_exc(update, e)
def on_button(update: Update, context: CallbackContext) -> None:
query = update.callback_query
if query.data.startswith('flag_'):
flag = query.data[5:]
found = False
json_key = None
for k, v in flags_map.items():
if v == flag:
found = True
json_key = k
break
if not found:
query.answer('unknown flag')
return
flags = inverter.exec('get-flags')['data']
cur_flag_value = flags[json_key]
target_flag_value = '0' if cur_flag_value else '1'
# set flag
response = inverter.exec('set-flag', (flag, target_flag_value))
# notify user
query.answer('Done' if response['result'] == 'ok' else 'failed to toggle flag')
# edit message
flags[json_key] = not cur_flag_value
text, markup = build_flags_keyboard(flags)
query.edit_message_text(text, reply_markup=markup)
else:
query.answer('unexpected callback data')
def monitor_charging_event_handler(event: ChargingEvent, **kwargs):
key = None
args = []
if event == ChargingEvent.AC_CHARGING_STARTED:
key = 'started'
elif event == ChargingEvent.AC_CHARGING_FINISHED:
key = 'finished'
elif event == ChargingEvent.AC_DISCONNECTED:
key = 'disconnected'
elif event == ChargingEvent.AC_CURRENT_CHANGED:
key = 'current_changed'
args.append(kwargs['current'])
elif event == ChargingEvent.AC_CHARGING_UNAVAILABLE_BECAUSE_SOLAR:
key = 'na_solar'
if key is None:
logger.error('unknown charging event:', event)
return
text = _(f'chrg_evt_{key}', *args)
for chat_id in notify_to:
updater.bot.send_message(chat_id=chat_id,
text=text,
parse_mode='HTML',
reply_markup=get_markup())
def monitor_battery_event_handler(event):
pass
if __name__ == '__main__':
# command-line arguments
parser = ArgumentParser()
parser.add_argument('--token', required=True, type=str,
help='Telegram bot token')
parser.add_argument('--users-whitelist', nargs='+',
help='ID of users allowed to use the bot')
parser.add_argument('--notify-to', nargs='+')
parser.add_argument('--ac-current-range', nargs='+', default=(10, 30))
parser.add_argument('--inverterd-host', default='127.0.0.1', type=str)
parser.add_argument('--inverterd-port', default=8305, type=int)
parser.add_argument('--verbose', action='store_true')
args = parser.parse_args()
whitelist = list(map(lambda x: int(x), args.users_whitelist))
notify_to = list(map(lambda x: int(x), args.notify_to)) if args.notify_to is not None else []
# connect to inverterd
inverter.init(host=args.inverterd_host, port=args.inverterd_port)
# start monitoring
monitor = InverterMonitor(args.ac_current_range)
monitor.set_charging_event_handler(monitor_charging_event_handler)
monitor.set_battery_event_handler(monitor_battery_event_handler)
monitor.start()
# configure logging
logging_level = logging.DEBUG if args.verbose else logging.INFO
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging_level)
# configure bot
updater = Updater(args.token, request_kwargs={'read_timeout': 6, 'connect_timeout': 7})
dispatcher = updater.dispatcher
user_filter = Filters.user(whitelist)
dispatcher.add_handler(CommandHandler('start', start))
dispatcher.add_handler(MessageHandler(Filters.text(_('status')) & user_filter, msg_status))
dispatcher.add_handler(MessageHandler(Filters.text(_('generation')) & user_filter, msg_generation))
dispatcher.add_handler(CommandHandler('setgencc', on_set_ac_charging_current))
dispatcher.add_handler(CommandHandler('setgenct', on_set_ac_charging_thresholds))
dispatcher.add_handler(CommandHandler('setbatuv', on_set_battery_under_voltage))
dispatcher.add_handler(CallbackQueryHandler(on_button))
dispatcher.add_handler(CommandHandler('flags', on_flags))
dispatcher.add_handler(CommandHandler('status', on_status))
dispatcher.add_handler(CommandHandler('config', on_config))
dispatcher.add_handler(CommandHandler('errors', on_errors))
dispatcher.add_handler(MessageHandler(Filters.all & user_filter, msg_all))
# start the bot
updater.start_polling()
# run the bot until the user presses Ctrl-C or the process receives SIGINT, SIGTERM or SIGABRT
updater.idle()
monitor.stop()