#!/usr/bin/env python3
import logging, re, datetime, json, subprocess, os
from inverterd import Format, Client as InverterClient, InverterError
from typing import Optional
from argparse import ArgumentParser
from html import escape
from pprint import pprint
from time import sleep
from strings import lang as _
from telegram import (
Update,
ParseMode,
KeyboardButton,
ReplyKeyboardMarkup
)
from telegram.ext import (
Updater,
Filters,
CommandHandler,
MessageHandler,
CallbackContext
)
from telegram.error import TimedOut
class InverterClientWrapper:
    """Wrapper around the inverterd client that tries to reconnect after failures.

    The wrapper remembers the host and port it was created with, so that a
    fresh client can be built and connected whenever a command fails with
    anything other than an InverterError.
    """

    def __init__(self, host: str, port: int):
        """Store the connection parameters and connect immediately.

        :param host: inverterd host, passed through to the client as-is
        :param port: inverterd port, passed through to the client as-is
        """
        self._host = host
        self._port = port
        self._inverter = None
        self.create()

    def create(self):
        """Build a new client for the stored host/port and connect it."""
        self._inverter = InverterClient(host=self._host, port=self._port)
        self._inverter.connect()

    def exec(self, command: str, arguments: tuple = (), format=Format.JSON):
        """Select the response format, run a command and return the client's response.

        :param command: command name passed to the client's exec()
        :param arguments: positional arguments for the command
        :param format: response format to select before running the command
        :return: whatever the underlying client's exec() returns
        :raises InverterError: re-raised unchanged, without reconnecting
        :raises Exception: any other failure is re-raised after one reconnect attempt
        """
        try:
            self._inverter.format(format)
            return self._inverter.exec(command, arguments)
        except InverterError:
            raise
        except Exception:
            # Best-effort reconnect so that the next call has a chance to succeed.
            # A failed reconnect must not mask the original error, but it is
            # logged instead of being dropped without a trace.
            try:
                self.create()
            except Exception:
                logging.exception('failed to reconnect to inverterd at %s:%s',
                                  self._host, self._port)
            raise
# Shared inverterd client used by the message handlers below. It stays None
# until the __main__ block assigns an InverterClientWrapper to it.
inverter: Optional[InverterClientWrapper] = None
#
# helpers
#
def get_markup() -> ReplyKeyboardMarkup:
    """Build the persistent two-row reply keyboard with the bot's commands."""
    top_row = [_('status'), _('generation')]
    bottom_row = [_('gs'), _('ri'), _('errors')]
    return ReplyKeyboardMarkup([top_row, bottom_row], one_time_keyboard=False)
def reply(update: Update, text: str) -> None:
    """Answer the incoming message with HTML-parsed text and the command keyboard.

    :param update: update whose message is replied to
    :param text: message body; sent with ParseMode.HTML
    """
    message = update.message
    message.reply_text(
        text,
        reply_markup=get_markup(),
        parse_mode=ParseMode.HTML,
    )
def handle_exc(update: Update, e) -> None:
    """Log an exception raised by a handler and report it to the chat.

    InverterError is reported as ``Error: <message>``, where the message is
    taken from the ``message`` field when the error text is a JSON object
    that has one, and is the raw error text otherwise. TimedOut is logged but
    not reported. Every other exception is reported as ``exception: <text>``.

    :param update: update whose message receives the error report
    :param e: the exception being handled; must be called from an ``except``
        block so that logging.exception() can attach the traceback
    """
    logging.exception(str(e))

    if isinstance(e, InverterError):
        try:
            err = json.loads(str(e))['message']
        except (ValueError, KeyError, TypeError):
            # not JSON at all, or JSON without a usable 'message' field
            err = str(e)
        # reply() sends with ParseMode.HTML, so the text has to be escaped
        reply(update, f'Error: {escape(str(err))}')

    elif not isinstance(e, TimedOut):
        reply(update, 'exception: ' + escape(str(e)))
def beautify_table(s):
    """Normalize whitespace in a multi-line table, one line at a time.

    Every run of whitespace inside a line is collapsed to a single space;
    the number and order of lines is preserved.

    :param s: table text with lines separated by ``\\n``
    :return: the cleaned-up text, lines joined with ``\\n``
    """
    cleaned = []
    for row in s.split('\n'):
        row = re.sub(r'\s+', ' ', row)
        # NOTE(review): as written, the replacement re-emits both groups with
        # the same ': ' separator, so this pass appears to leave the row
        # unchanged -- confirm whether markup around the key was intended.
        row = re.sub(r'(.*?): (.*)', r'\1: \2', row)
        cleaned.append(row)
    return '\n'.join(cleaned)
#
# command/message handlers
#
def start(update: Update, context: CallbackContext) -> None:
    """Handle /start: point the user at the command keyboard."""
    greeting = 'Select a command on the keyboard.'
    reply(update, greeting)
def msg_status(update: Update, context: CallbackContext) -> None:
    """Reply with a short summary of the inverter's current status.

    Runs ``get-status`` and renders battery voltage, capacity and power
    direction (with the charging or discharging current when applicable) and
    the AC output load. The PV input power and the generator (grid) readings
    are added only when they are non-zero. Any exception is passed to
    handle_exc().
    """
    try:
        gs = json.loads(inverter.exec('get-status'))['data']

        # render response
        power_direction = gs['battery_power_direction'].lower()
        # turn a trailing "ge" into "ging" (e.g. "charge" -> "charging")
        power_direction = re.sub(r'ge$', 'ging', power_direction)

        charging_rate = ''
        if power_direction == 'charging':
            charging_rate = ' @ %s %s' % (
                gs['battery_charging_current']['value'], gs['battery_charging_current']['unit'])
        elif power_direction == 'discharging':
            charging_rate = ' @ %s %s' % (
                gs['battery_discharging_current']['value'], gs['battery_discharging_current']['unit'])

        html = 'Battery: %s %s' % (gs['battery_voltage']['value'], gs['battery_voltage']['unit'])
        html += ' (%s%s, ' % (gs['battery_capacity']['value'], gs['battery_capacity']['unit'])
        html += '%s%s)' % (power_direction, charging_rate)

        html += '\nLoad: %s %s' % (gs['ac_output_active_power']['value'], gs['ac_output_active_power']['unit'])
        html += ' (%s%%)' % (gs['output_load_percent']['value'])

        if gs['pv1_input_power']['value'] > 0:
            html += '\nInput power: %s %s' % (gs['pv1_input_power']['value'], gs['pv1_input_power']['unit'])

        if gs['grid_voltage']['value'] > 0 or gs['grid_freq']['value'] > 0:
            # value first, then unit -- same order as every other field
            html += '\nGenerator: %s %s' % (gs['grid_voltage']['value'], gs['grid_voltage']['unit'])
            html += ', %s %s' % (gs['grid_freq']['value'], gs['grid_freq']['unit'])

        # send response
        reply(update, html)
    except Exception as e:
        handle_exc(update, e)
def msg_shell(update: Update, context: CallbackContext) -> None:
    """Run the command that follows ``shell `` and reply with its output.

    The command line is split on whitespace and executed without a shell.
    A non-zero exit status is reported as an exception; otherwise the
    HTML-escaped stdout and stderr are sent back. Any failure is logged and
    reported to the chat.
    """
    try:
        command_line = re.findall('^shell (.*)$', update.message.text)[0]
        proc = subprocess.run(command_line.split(), capture_output=True)
        if proc.returncode != 0:
            raise ChildProcessError('spawned process returned ' + str(proc.returncode))

        stdout_text = proc.stdout.decode('utf-8')
        stderr_text = proc.stderr.decode('utf-8')
        report = '[stdout] ' + stdout_text + '\n[stderr] ' + stderr_text
        reply(update, escape(report))
    except Exception as exc:
        logging.exception(str(exc))
        reply(update, 'exception: ' + escape(str(exc)))
def msg_spawn(update: Update, context: CallbackContext) -> None:
    """Start the program that follows ``spawn `` without waiting for it.

    The command line is split on whitespace; the first word is the program
    and the full word list is its argv. Nothing is sent on success; any
    failure is logged and reported to the chat.
    """
    try:
        words = re.findall('^spawn (.*)$', update.message.text)[0].split()
        launch = os.spawnlp
        launch(os.P_NOWAIT, words[0], *words)
    except Exception as exc:
        logging.exception(str(exc))
        reply(update, 'exception: ' + escape(str(exc)))
def msg_generation(update: Update, context: CallbackContext) -> None:
    """Reply with the current PV input and the energy generated on recent days.

    Fetches the status first, then ``get-day-generated`` for today and, only
    when they fall in the same month as today, for yesterday and the day
    before. The response is rendered once all data has been fetched. Any
    exception is passed to handle_exc().
    """
    try:
        today = datetime.date.today()
        one_day = datetime.timedelta(days=1)
        yesterday = today - one_day
        day_before = yesterday - one_day

        status = json.loads(inverter.exec('get-status'))['data']

        # (line template, date) in the order the lines are fetched and shown
        history = (
            ('\nToday: %s Wh', today),
            ('\nYesterday: %s Wh', yesterday),
            ('\nThe day before yesterday: %s Wh', day_before),
        )
        fetched = []
        for template, day in history:
            # only days within the current month are requested
            if day.month != today.month:
                continue
            raw = inverter.exec('get-day-generated', (day.year, day.month, day.day))
            fetched.append((template, json.loads(raw)['data']))

        # render response
        pv_power = status['pv1_input_power']
        text = 'Input power: %s %s' % (pv_power['value'], pv_power['unit'])
        pv_voltage = status['pv1_input_voltage']
        text += ' (%s %s)' % (pv_voltage['value'], pv_voltage['unit'])
        for template, generated in fetched:
            text += template % (generated['wh'])

        # send response
        reply(update, text)
    except Exception as exc:
        handle_exc(update, exc)
def msg_gs(update: Update, context: CallbackContext) -> None:
    """Reply with the full ``get-status`` output in table format."""
    try:
        table = inverter.exec('get-status', format=Format.TABLE)
        text = beautify_table(table)
        reply(update, text)
    except Exception as exc:
        handle_exc(update, exc)
def msg_ri(update: Update, context: CallbackContext) -> None:
    """Reply with the ``get-rated`` output in table format."""
    try:
        table = inverter.exec('get-rated', format=Format.TABLE)
        text = beautify_table(table)
        reply(update, text)
    except Exception as exc:
        handle_exc(update, exc)
def msg_errors(update: Update, context: CallbackContext) -> None:
    """Reply with the ``get-errors`` output in table format."""
    try:
        table = inverter.exec('get-errors', format=Format.TABLE)
        text = beautify_table(table)
        reply(update, text)
    except Exception as exc:
        handle_exc(update, exc)
def msg_all(update: Update, context: CallbackContext) -> None:
    """Fallback handler: tell the user the message was not understood."""
    notice = "Command not recognized. Please try again."
    reply(update, notice)
if __name__ == '__main__':
    # command-line arguments
    parser = ArgumentParser()
    parser.add_argument('--token', required=True, type=str,
                        help='Telegram bot token')
    # Required and parsed as integers by argparse: a missing or malformed
    # whitelist is reported as a usage error instead of crashing later.
    parser.add_argument('--users-whitelist', nargs='+', required=True, type=int,
                        help='ID of users allowed to use the bot')
    parser.add_argument('--inverterd-host', default='127.0.0.1', type=str)
    parser.add_argument('--inverterd-port', default=8305, type=int)
    parser.add_argument('--shell-admin', required=True, type=int)
    args = parser.parse_args()

    whitelist = list(args.users_whitelist)

    # configure logging first, so that anything logged while connecting
    # already uses this format
    logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                        level=logging.INFO)

    # connect to inverterd
    inverter = InverterClientWrapper(host=args.inverterd_host, port=args.inverterd_port)

    # configure bot
    updater = Updater(args.token, request_kwargs={'read_timeout': 6, 'connect_timeout': 7})
    dispatcher = updater.dispatcher

    user_filter = Filters.user(whitelist)
    shell_admin_filter = Filters.user((args.shell_admin, ))

    # NOTE(review): /start is registered without user_filter, so it answers
    # any user -- confirm this is intended.
    dispatcher.add_handler(CommandHandler('start', start))
    # shell/spawn are restricted to the single --shell-admin user
    dispatcher.add_handler(MessageHandler(Filters.regex(r'^shell ') & shell_admin_filter, msg_shell))
    dispatcher.add_handler(MessageHandler(Filters.regex(r'^spawn ') & shell_admin_filter, msg_spawn))
    # keyboard commands, restricted to whitelisted users
    dispatcher.add_handler(MessageHandler(Filters.text(_('status')) & user_filter, msg_status))
    dispatcher.add_handler(MessageHandler(Filters.text(_('generation')) & user_filter, msg_generation))
    dispatcher.add_handler(MessageHandler(Filters.text(_('gs')) & user_filter, msg_gs))
    dispatcher.add_handler(MessageHandler(Filters.text(_('ri')) & user_filter, msg_ri))
    dispatcher.add_handler(MessageHandler(Filters.text(_('errors')) & user_filter, msg_errors))
    # registered last: catch-all for anything else sent by a whitelisted user
    dispatcher.add_handler(MessageHandler(Filters.all & user_filter, msg_all))

    # start the bot
    updater.start_polling()

    # run the bot until the user presses Ctrl-C or the process receives SIGINT, SIGTERM or SIGABRT
    updater.idle()