summaryrefslogtreecommitdiff
path: root/src/home/bot
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2022-11-06 20:40:42 +0300
committerEvgeny Zinoviev <me@ch1p.io>2022-11-06 20:53:55 +0300
commit75ee161b6eb64cf19c8a9718d15047443f3e4ebe (patch)
treeccebc9cbd2709ad13a14ec00372fdcfe9226cd9f /src/home/bot
parent28c67c4510a3bee574b4077be35147dba257c8f7 (diff)
inverter_bot: refactor and introduce new functions
Diffstat (limited to 'src/home/bot')
-rw-r--r--src/home/bot/__init__.py6
-rw-r--r--src/home/bot/errors.py2
-rw-r--r--src/home/bot/lang.py81
-rw-r--r--src/home/bot/reporting.py22
-rw-r--r--src/home/bot/store.py32
-rw-r--r--src/home/bot/util.py57
-rw-r--r--src/home/bot/wrapper.py369
7 files changed, 0 insertions, 569 deletions
diff --git a/src/home/bot/__init__.py b/src/home/bot/__init__.py
deleted file mode 100644
index 41ad78e..0000000
--- a/src/home/bot/__init__.py
+++ /dev/null
@@ -1,6 +0,0 @@
-from .reporting import ReportingHelper
-from .lang import LangPack
-from .wrapper import Wrapper, Context, text_filter, handlermethod, IgnoreMarkup
-from .store import Store
-from .errors import *
-from .util import command_usage, user_any_name
diff --git a/src/home/bot/errors.py b/src/home/bot/errors.py
deleted file mode 100644
index 74eee6f..0000000
--- a/src/home/bot/errors.py
+++ /dev/null
@@ -1,2 +0,0 @@
-class StoreNotEnabledError(Exception):
- pass \ No newline at end of file
diff --git a/src/home/bot/lang.py b/src/home/bot/lang.py
deleted file mode 100644
index 624c748..0000000
--- a/src/home/bot/lang.py
+++ /dev/null
@@ -1,81 +0,0 @@
-from __future__ import annotations
-
-import logging
-
-from typing import Union, Optional, List, Dict
-
-logger = logging.getLogger(__name__)
-
-
-class LangStrings(dict):
- _lang: Optional[str]
-
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self._lang = None
-
- def setlang(self, lang: str):
- self._lang = lang
-
- def __missing__(self, key):
- logger.warning(f'key {key} is missing in language {self._lang}')
- return '{%s}' % key
-
- def __setitem__(self, key, value):
- raise NotImplementedError(f'setting translation strings this way is prohibited (was trying to set {key}={value})')
-
-
-class LangPack:
- strings: Dict[str, LangStrings[str, str]]
- default_lang: str
-
- def __init__(self):
- self.strings = {}
- self.default_lang = 'en'
-
- def ru(self, **kwargs) -> None:
- self.set(kwargs, 'ru')
-
- def en(self, **kwargs) -> None:
- self.set(kwargs, 'en')
-
- def set(self,
- strings: Union[LangStrings, dict],
- lang: str) -> None:
-
- if isinstance(strings, dict) and not isinstance(strings, LangStrings):
- strings = LangStrings(**strings)
- strings.setlang(lang)
-
- if lang not in self.strings:
- self.strings[lang] = strings
- else:
- self.strings[lang].update(strings)
-
- def all(self, key):
- result = []
- for strings in self.strings.values():
- result.append(strings[key])
- return result
-
- @property
- def languages(self) -> List[str]:
- return list(self.strings.keys())
-
- def get(self, key: str, lang: str, *args) -> str:
- if args:
- return self.strings[lang][key] % args
- else:
- return self.strings[lang][key]
-
- def __call__(self, *args, **kwargs):
- return self.strings[self.default_lang][args[0]]
-
- def __getitem__(self, key):
- return self.strings[self.default_lang][key]
-
- def __setitem__(self, key, value):
- raise NotImplementedError('setting translation strings this way is prohibited')
-
- def __contains__(self, key):
- return key in self.strings[self.default_lang]
diff --git a/src/home/bot/reporting.py b/src/home/bot/reporting.py
deleted file mode 100644
index df3da2a..0000000
--- a/src/home/bot/reporting.py
+++ /dev/null
@@ -1,22 +0,0 @@
-import logging
-
-from telegram import Message
-from ..api import WebAPIClient as APIClient
-from ..api.errors import ApiResponseError
-from ..api.types import BotType
-
-logger = logging.getLogger(__name__)
-
-
-class ReportingHelper:
- def __init__(self, client: APIClient, bot_type: BotType):
- self.client = client
- self.bot_type = bot_type
-
- def report(self, message, text: str = None) -> None:
- if text is None:
- text = message.text
- try:
- self.client.log_bot_request(self.bot_type, message.chat_id, text)
- except ApiResponseError as error:
- logger.exception(error)
diff --git a/src/home/bot/store.py b/src/home/bot/store.py
deleted file mode 100644
index e655d8f..0000000
--- a/src/home/bot/store.py
+++ /dev/null
@@ -1,32 +0,0 @@
-from ..database.sqlite import SQLiteBase
-
-
-class Store(SQLiteBase):
- def __init__(self):
- super().__init__()
-
- def schema_init(self, version: int) -> None:
- if version < 1:
- cursor = self.cursor()
- cursor.execute("""CREATE TABLE IF NOT EXISTS users (
- id INTEGER PRIMARY KEY,
- lang TEXT NOT NULL
- )""")
- self.commit()
-
- def get_user_lang(self, user_id: int, default: str = 'en') -> str:
- cursor = self.cursor()
- cursor.execute('SELECT lang FROM users WHERE id=?', (user_id,))
- row = cursor.fetchone()
-
- if row is None:
- cursor.execute('INSERT INTO users (id, lang) VALUES (?, ?)', (user_id, default))
- self.commit()
- return default
- else:
- return row[0]
-
- def set_user_lang(self, user_id: int, lang: str) -> None:
- cursor = self.cursor()
- cursor.execute('UPDATE users SET lang=? WHERE id=?', (lang, user_id))
- self.commit()
diff --git a/src/home/bot/util.py b/src/home/bot/util.py
deleted file mode 100644
index 4f80a67..0000000
--- a/src/home/bot/util.py
+++ /dev/null
@@ -1,57 +0,0 @@
-from telegram import User
-from .lang import LangStrings
-
-_strings = {
- 'en': LangStrings(
- usage='Usage',
- arguments='Arguments'
- ),
- 'ru': LangStrings(
- usage='Использование',
- arguments='Аргументы'
- )
-}
-
-
-def command_usage(command: str, arguments: dict, language='en') -> str:
- if language not in _strings:
- raise ValueError('unsupported language')
-
- blocks = []
- argument_names = []
- argument_lines = []
- for k, v in arguments.items():
- argument_names.append(k)
- argument_lines.append(
- f'<code>{k}</code>: {v}'
- )
-
- command = f'/{command}'
- if argument_names:
- command += ' ' + ' '.join(argument_names)
-
- blocks.append(
- f'<b>{_strings[language]["usage"]}</b>\n'
- f'<code>{command}</code>'
- )
-
- if argument_lines:
- blocks.append(
- f'<b>{_strings[language]["arguments"]}</b>\n' + '\n'.join(argument_lines)
- )
-
- return '\n\n'.join(blocks)
-
-
-def user_any_name(user: User) -> str:
- name = [user.first_name, user.last_name]
- name = list(filter(lambda s: s is not None, name))
- name = ' '.join(name).strip()
-
- if not name:
- name = user.username
-
- if not name:
- name = str(user.id)
-
- return name
diff --git a/src/home/bot/wrapper.py b/src/home/bot/wrapper.py
deleted file mode 100644
index 98946ed..0000000
--- a/src/home/bot/wrapper.py
+++ /dev/null
@@ -1,369 +0,0 @@
-import logging
-import traceback
-
-from html import escape
-from telegram import (
- Update,
- ParseMode,
- ReplyKeyboardMarkup,
- CallbackQuery,
- User,
- Message,
-)
-from telegram.ext import (
- Updater,
- Filters,
- BaseFilter,
- Handler,
- CommandHandler,
- MessageHandler,
- CallbackQueryHandler,
- CallbackContext,
- ConversationHandler
-)
-from telegram.error import TimedOut
-from ..config import config
-from typing import Optional, Union, List, Tuple
-from .store import Store
-from .lang import LangPack
-from ..api.types import BotType
-from ..api import WebAPIClient
-from .reporting import ReportingHelper
-
-logger = logging.getLogger(__name__)
-languages = {
- 'en': 'English',
- 'ru': 'Русский'
-}
-LANG_STARTED, = range(1)
-user_filter: Optional[BaseFilter] = None
-
-
-def default_langpack() -> LangPack:
- lang = LangPack()
- lang.en(
- start_message="Select command on the keyboard.",
- unknown_message="Unknown message",
- cancel="Cancel",
- select_language="Select language on the keyboard.",
- invalid_language="Invalid language. Please try again.",
- saved='Saved.',
- )
- lang.ru(
- start_message="Выберите команду на клавиатуре.",
- unknown_message="Неизвестная команда",
- cancel="Отмена",
- select_language="Выберите язык на клавиатуре.",
- invalid_language="Неверный язык. Пожалуйста, попробуйте снова",
- saved="Настройки сохранены."
- )
- return lang
-
-
-def init_user_filter():
- global user_filter
- if user_filter is None:
- if 'users' in config['bot']:
- logger.info('allowed users: ' + str(config['bot']['users']))
- user_filter = Filters.user(config['bot']['users'])
- else:
- user_filter = Filters.all # not sure if this is correct
-
-
-def text_filter(*args):
- init_user_filter()
- return Filters.text(args[0] if isinstance(args[0], list) else [*args]) & user_filter
-
-
-def exc2text(e: Exception) -> str:
- tb = ''.join(traceback.format_tb(e.__traceback__))
- return f'{e.__class__.__name__}: ' + escape(str(e)) + "\n\n" + escape(tb)
-
-
-class IgnoreMarkup:
- pass
-
-
-class Context:
- _update: Optional[Update]
- _callback_context: Optional[CallbackContext]
- _markup_getter: callable
- _lang: LangPack
- _store: Optional[Store]
- _user_lang: Optional[str]
-
- def __init__(self,
- update: Optional[Update],
- callback_context: Optional[CallbackContext],
- markup_getter: callable,
- lang: LangPack,
- store: Optional[Store]):
- self._update = update
- self._callback_context = callback_context
- self._markup_getter = markup_getter
- self._lang = lang
- self._store = store
- self._user_lang = None
-
- def reply(self, text, markup=None):
- if markup is None:
- markup = self._markup_getter(self)
- kwargs = dict(parse_mode=ParseMode.HTML)
- if not isinstance(markup, IgnoreMarkup):
- kwargs['reply_markup'] = markup
- return self._update.message.reply_text(text, **kwargs)
-
- def reply_exc(self, e: Exception) -> None:
- self.reply(exc2text(e))
-
- def answer(self, text: str = None):
- self.callback_query.answer(text)
-
- def edit(self, text, markup=None):
- kwargs = dict(parse_mode=ParseMode.HTML)
- if not isinstance(markup, IgnoreMarkup):
- kwargs['reply_markup'] = markup
- self.callback_query.edit_message_text(text, **kwargs)
-
- @property
- def text(self) -> str:
- return self._update.message.text
-
- @property
- def callback_query(self) -> CallbackQuery:
- return self._update.callback_query
-
- @property
- def args(self) -> Optional[List[str]]:
- return self._callback_context.args
-
- @property
- def user_id(self) -> int:
- return self.user.id
-
- @property
- def user(self) -> User:
- return self._update.effective_user
-
- @property
- def user_lang(self) -> str:
- if self._user_lang is None:
- self._user_lang = self._store.get_user_lang(self.user_id)
- return self._user_lang
-
- def lang(self, key: str, *args) -> str:
- return self._lang.get(key, self.user_lang, *args)
-
- def is_callback_context(self) -> bool:
- return self._update.callback_query and self._update.callback_query.data and self._update.callback_query.data != ''
-
-
-def handlermethod(f: callable):
- def _handler(self, update: Update, context: CallbackContext, *args, **kwargs):
- ctx = Context(update,
- callback_context=context,
- markup_getter=self.markup,
- lang=self.lang,
- store=self.store)
- try:
- return f(self, ctx, *args, **kwargs)
- except Exception as e:
- if not self.exception_handler(e, ctx) and not isinstance(e, TimedOut):
- logger.exception(e)
- if not ctx.is_callback_context():
- ctx.reply_exc(e)
- else:
- self.notify_user(ctx.user_id, exc2text(e))
- return _handler
-
-
-class Wrapper:
- store: Optional[Store]
- updater: Updater
- lang: LangPack
- reporting: Optional[ReportingHelper]
-
- def __init__(self,
- store: Optional[Store] = None):
- self.updater = Updater(config['bot']['token'],
- request_kwargs={'read_timeout': 6, 'connect_timeout': 7})
- self.lang = default_langpack()
- self.store = store if store else Store()
- self.reporting = None
-
- init_user_filter()
-
- dispatcher = self.updater.dispatcher
- dispatcher.add_handler(CommandHandler('start', self.wrap(self.start), user_filter))
-
- # transparently log all messages
- self.add_handler(MessageHandler(Filters.all & user_filter, self.logging_message_handler), group=10)
- self.add_handler(CallbackQueryHandler(self.logging_callback_handler), group=10)
-
- def run(self):
- self._lang_setup()
- self.updater.dispatcher.add_handler(
- MessageHandler(Filters.all & user_filter, self.wrap(self.any))
- )
-
- # start the bot
- self.updater.start_polling()
-
- # run the bot until the user presses Ctrl-C or the process receives SIGINT, SIGTERM or SIGABRT
- self.updater.idle()
-
- def enable_logging(self, bot_type: BotType):
- api = WebAPIClient(timeout=3)
- api.enable_async()
-
- self.reporting = ReportingHelper(api, bot_type)
-
- def logging_message_handler(self, update: Update, context: CallbackContext):
- if self.reporting is None:
- return
-
- self.reporting.report(update.message)
-
- def logging_callback_handler(self, update: Update, context: CallbackContext):
- if self.reporting is None:
- return
-
- self.reporting.report(update.callback_query.message, text=update.callback_query.data)
-
- def wrap(self, f: callable):
- def handler(update: Update, context: CallbackContext):
- ctx = Context(update,
- callback_context=context,
- markup_getter=self.markup,
- lang=self.lang,
- store=self.store)
-
- try:
- return f(ctx)
- except Exception as e:
- if not self.exception_handler(e, ctx) and not isinstance(e, TimedOut):
- logger.exception(e)
- if not ctx.is_callback_context():
- ctx.reply_exc(e)
- else:
- self.notify_user(ctx.user_id, exc2text(e))
-
- return handler
-
- def add_handler(self, handler: Handler, group=0):
- self.updater.dispatcher.add_handler(handler, group=group)
-
- def start(self, ctx: Context):
- if 'start_message' not in self.lang:
- ctx.reply('Please define start_message or override start()')
- return
-
- ctx.reply(ctx.lang('start_message'))
-
- def any(self, ctx: Context):
- if 'invalid_command' not in self.lang:
- ctx.reply('Please define invalid_command or override any()')
- return
-
- ctx.reply(ctx.lang('invalid_command'))
-
- def markup(self, ctx: Optional[Context]) -> Optional[ReplyKeyboardMarkup]:
- return None
-
- def exception_handler(self, e: Exception, ctx: Context) -> Optional[bool]:
- pass
-
- def notify_all(self, text_getter: callable, exclude: Tuple[int] = ()) -> None:
- if 'notify_users' not in config['bot']:
- logger.error('notify_all() called but no notify_users directive found in the config')
- return
-
- for user_id in config['bot']['notify_users']:
- if user_id in exclude:
- continue
-
- text = text_getter(self.store.get_user_lang(user_id))
- self.updater.bot.send_message(chat_id=user_id,
- text=text,
- parse_mode='HTML')
-
- def notify_user(self, user_id: int, text: Union[str, Exception], **kwargs) -> None:
- if isinstance(text, Exception):
- text = exc2text(text)
- self.updater.bot.send_message(chat_id=user_id, text=text, parse_mode='HTML', **kwargs)
-
- def send_photo(self, user_id, **kwargs):
- self.updater.bot.send_photo(chat_id=user_id, **kwargs)
-
- def send_audio(self, user_id, **kwargs):
- self.updater.bot.send_audio(chat_id=user_id, **kwargs)
-
- def send_file(self, user_id, **kwargs):
- self.updater.bot.send_document(chat_id=user_id, **kwargs)
-
- def edit_message_text(self, user_id, message_id, *args, **kwargs):
- self.updater.bot.edit_message_text(chat_id=user_id, message_id=message_id, parse_mode='HTML', *args, **kwargs)
-
- def delete_message(self, user_id, message_id):
- self.updater.bot.delete_message(chat_id=user_id, message_id=message_id)
-
- #
- # Language Selection
- #
-
- def _lang_setup(self):
- supported = self.lang.languages
- if len(supported) > 1:
- cancel_filter = Filters.text(self.lang.all('cancel'))
-
- self.add_handler(ConversationHandler(
- entry_points=[CommandHandler('lang', self.wrap(self._lang_command), user_filter)],
- states={
- LANG_STARTED: [
- *list(map(lambda key: MessageHandler(text_filter(languages[key]),
- self.wrap(self._lang_input)), supported)),
- MessageHandler(user_filter & ~cancel_filter, self.wrap(self._lang_invalid_input))
- ]
- },
- fallbacks=[MessageHandler(user_filter & cancel_filter, self.wrap(self._lang_cancel_input))]
- ))
-
- def _lang_command(self, ctx: Context):
- logger.debug(f'current language: {ctx.user_lang}')
-
- buttons = []
- for name in languages.values():
- buttons.append(name)
- markup = ReplyKeyboardMarkup([buttons, [ctx.lang('cancel')]], one_time_keyboard=False)
-
- ctx.reply(ctx.lang('select_language'), markup=markup)
- return LANG_STARTED
-
- def _lang_input(self, ctx: Context):
- lang = None
- for key, value in languages.items():
- if value == ctx.text:
- lang = key
- break
-
- if lang is None:
- raise ValueError('could not find the language')
-
- self.store.set_user_lang(ctx.user_id, lang)
-
- ctx.reply(ctx.lang('saved'), markup=IgnoreMarkup())
-
- self.start(ctx)
- return ConversationHandler.END
-
- def _lang_invalid_input(self, ctx: Context):
- ctx.reply(self.lang('invalid_language'), markup=IgnoreMarkup())
- return LANG_STARTED
-
- def _lang_cancel_input(self, ctx: Context):
- self.start(ctx)
- return ConversationHandler.END
-
- @property
- def user_filter(self):
- return user_filter