#!/usr/bin/env python3
import logging
import os
import tempfile
from enum import Enum
from datetime import datetime, timedelta
from html import escape
from typing import Optional, List, Dict, Tuple
from home.config import config
from home.api import WebAPIClient
from home.api.types import SoundSensorLocation, BotType
from home.api.errors import ApiResponseError
from home.media import SoundNodeClient, SoundRecordClient, SoundRecordFile, CameraNodeClient
from home.soundsensor import SoundSensorServerGuardClient
from home.util import parse_addr, chunks, filesize_fmt
from home.telegram import bot
from telegram.error import TelegramError
from telegram import ReplyKeyboardMarkup, InlineKeyboardMarkup, InlineKeyboardButton, User
from PIL import Image
config.load_app('sound_bot')
nodes = {}
for nodename, nodecfg in config['nodes'].items():
nodes[nodename] = parse_addr(nodecfg['addr'])
bot.initialize()
bot.lang.ru(
start_message="Выберите команду на клавиатуре",
unknown_command="Неизвестная команда",
unexpected_callback_data="Ошибка: неверные данные",
settings="Настройки микшера",
record="Запись",
loading="Загрузка...",
select_place="Выберите место:",
invalid_location="Неверное место",
invalid_interval="Неверная длительность",
unsupported_action="Неподдерживаемое действие",
# select_control="Выберите контрол для изменения настроек:",
control_state="Состояние контрола %s",
incr="громкость +",
decr="громкость -",
back="◀️ Назад",
n_min="%d мин.",
n_sec="%d сек.",
select_interval="Выберите длительность:",
place="Место",
beginning="Начало",
end="Конец",
record_result="Результат записи",
record_started='Запись запущена!',
record_error="Ошибка записи",
files="Локальные файлы",
remote_files="Файлы на сервере",
file_line="— Запись с %s до %s (%s)",
access_denied="Доступ запрещён",
guard_disable="Снять с охраны",
guard_enable="Поставить на охрану",
guard_status="Статус охраны",
guard_user_action_notification='Пользователь %s %s.',
guard_user_action_enable="включил охрану ✅",
guard_user_action_disable="выключил охрану ❌",
guard_status_enabled="Включена ✅",
guard_status_disabled="Выключена ❌",
done="Готово 👌",
sound_sensors="Датчики звука",
sound_sensors_info="Здесь можно получить информацию о последних срабатываниях датчиков звука.",
sound_sensors_no_24h_data="За последние 24 часа данных нет.",
sound_sensors_show_anything="Показать, что есть",
cameras="Камеры",
select_option="Выберите опцию",
w_flash="Со вспышкой",
wo_flash="Без вспышки",
)
bot.lang.en(
start_message="Select command on the keyboard",
unknown_command="Unknown command",
settings="Mixer settings",
record="Record",
unexpected_callback_data="Unexpected callback data",
loading="Loading...",
select_place="Select place:",
invalid_location="Invalid place",
invalid_interval="Invalid duration",
unsupported_action="Unsupported action",
# select_control="Select control to adjust its parameters:",
control_state="%s control state",
incr="vol +",
decr="vol -",
back="◀️ Back",
n_min="%d min.",
n_sec="%d s.",
select_interval="Select duration:",
place="Place",
beginning="Started",
end="Ended",
record_result="Result",
record_started='Recording started!',
record_error="Recording error",
files="Local files",
remote_files="Remote files",
file_line="— From %s to %s (%s)",
access_denied="Access denied",
guard_disable="Disable guard",
guard_enable="Enable guard",
guard_status="Guard status",
guard_user_action_notification='User %s %s.',
guard_user_action_enable="turned the guard ON ✅",
guard_user_action_disable="turn the guard OFF ❌",
guard_status_enabled="Active ✅",
guard_status_disabled="Disabled ❌",
done="Done 👌",
sound_sensors="Sound sensors",
sound_sensors_info="Here you can get information about last sound sensors hits.",
sound_sensors_no_24h_data="No data for the last 24 hours.",
sound_sensors_show_anything="Show me at least something",
cameras="Cameras",
select_option="Select option",
w_flash="With flash",
wo_flash="Without flash",
)
logger = logging.getLogger(__name__)
RenderedContent = Tuple[str, Optional[InlineKeyboardMarkup]]
record_client: Optional[SoundRecordClient] = None
node_client_links: Dict[str, SoundNodeClient] = {}
cam_client_links: Dict[str, CameraNodeClient] = {}
def node_client(node: str) -> SoundNodeClient:
if node not in node_client_links:
node_client_links[node] = SoundNodeClient(parse_addr(config['nodes'][node]['addr']))
return node_client_links[node]
def camera_client(cam: str) -> CameraNodeClient:
if cam not in node_client_links:
cam_client_links[cam] = CameraNodeClient(parse_addr(config['cameras'][cam]['addr']))
return cam_client_links[cam]
def node_exists(node: str) -> bool:
return node in config['nodes']
def camera_exists(name: str) -> bool:
return name in config['cameras']
def camera_settings(name: str) -> Optional[dict]:
try:
return config['cameras'][name]['settings']
except KeyError:
return None
def have_cameras() -> bool:
return 'cameras' in config and config['cameras']
def sound_sensor_exists(node: str) -> bool:
return node in config['sound_sensors']
def interval_defined(interval: int) -> bool:
return interval in config['bot']['record_intervals']
def callback_unpack(ctx: bot.Context) -> List[str]:
return ctx.callback_query.data[3:].split('/')
def manual_recording_allowed(user_id: int) -> bool:
return 'manual_record_allowlist' not in config['bot'] or user_id in config['bot']['manual_record_allowlist']
def guard_client() -> SoundSensorServerGuardClient:
return SoundSensorServerGuardClient(parse_addr(config['bot']['guard_server']))
# message renderers
# -----------------
class Renderer:
@classmethod
def places_markup(cls, ctx: bot.Context, callback_prefix: str) -> InlineKeyboardMarkup:
buttons = []
for node, nodeconfig in config['nodes'].items():
buttons.append([InlineKeyboardButton(nodeconfig['label'][ctx.user_lang], callback_data=f'{callback_prefix}/{node}')])
return InlineKeyboardMarkup(buttons)
@classmethod
def back_button(cls,
ctx: bot.Context,
buttons: list,
callback_data: str):
buttons.append([
InlineKeyboardButton(ctx.lang('back'), callback_data=callback_data)
])
class SettingsRenderer(Renderer):
@classmethod
def index(cls, ctx: bot.Context) -> RenderedContent:
html = f'{ctx.lang("settings")}\n\n'
html += ctx.lang('select_place')
return html, cls.places_markup(ctx, callback_prefix='s0')
@classmethod
def node(cls, ctx: bot.Context,
controls: List[dict]) -> RenderedContent:
node, = callback_unpack(ctx)
html = []
buttons = []
for control in controls:
html.append(f'{control["name"]}\n{escape(control["info"])}')
buttons.append([
InlineKeyboardButton(control['name'], callback_data=f's1/{node}/{control["name"]}')
])
html = "\n\n".join(html)
cls.back_button(ctx, buttons, callback_data='s0')
return html, InlineKeyboardMarkup(buttons)
@classmethod
def control(cls, ctx: bot.Context, data) -> RenderedContent:
node, control, *rest = callback_unpack(ctx)
html = '' + ctx.lang('control_state', control) + '\n\n'
html += escape(data['info'])
buttons = []
callback_prefix = f's2/{node}/{control}'
for cap in data['caps']:
if cap == 'mute':
muted = 'dB] [off]' in data['info']
act = 'unmute' if muted else 'mute'
buttons.append([InlineKeyboardButton(act, callback_data=f'{callback_prefix}/{act}')])
elif cap == 'cap':
cap_dis = 'Capture [off]' in data['info']
act = 'cap' if cap_dis else 'nocap'
buttons.append([InlineKeyboardButton(act, callback_data=f'{callback_prefix}/{act}')])
elif cap == 'volume':
buttons.append(
list(map(lambda s: InlineKeyboardButton(ctx.lang(s), callback_data=f'{callback_prefix}/{s}'),
['decr', 'incr']))
)
cls.back_button(ctx, buttons, callback_data=f's0/{node}')
return html, InlineKeyboardMarkup(buttons)
class RecordRenderer(Renderer):
@classmethod
def index(cls, ctx: bot.Context) -> RenderedContent:
html = f'{ctx.lang("record")}\n\n'
html += ctx.lang('select_place')
return html, cls.places_markup(ctx, callback_prefix='r0')
@classmethod
def node(cls, ctx: bot.Context, durations: List[int]) -> RenderedContent:
node, = callback_unpack(ctx)
html = ctx.lang('select_interval')
buttons = []
for s in durations:
if s >= 60:
m = int(s / 60)
label = ctx.lang('n_min', m)
else:
label = ctx.lang('n_sec', s)
buttons.append(InlineKeyboardButton(label, callback_data=f'r1/{node}/{s}'))
buttons = list(chunks(buttons, 3))
cls.back_button(ctx, buttons, callback_data=f'r0')
return html, InlineKeyboardMarkup(buttons)
@classmethod
def record_started(cls, ctx: bot.Context, rid: int) -> RenderedContent:
node, *rest = callback_unpack(ctx)
place = config['nodes'][node]['label'][ctx.user_lang]
html = f'{ctx.lang("record_started")} ({place}, id={rid})'
return html, None
@classmethod
def record_done(cls, info: dict, node: str, uid: int) -> str:
ulang = bot.db.get_user_lang(uid)
def lang(key, *args):
return bot.lang.get(key, ulang, *args)
rid = info['id']
fmt = '%d.%m.%y %H:%M:%S'
start_time = datetime.fromtimestamp(int(info['start_time'])).strftime(fmt)
stop_time = datetime.fromtimestamp(int(info['stop_time'])).strftime(fmt)
place = config['nodes'][node]['label'][ulang]
html = f'{lang("record_result")} ({place}, id={rid})\n\n'
html += f'{lang("beginning")}: {start_time}\n'
html += f'{lang("end")}: {stop_time}'
return html
@classmethod
def record_error(cls, info: dict, node: str, uid: int) -> str:
ulang = bot.db.get_user_lang(uid)
def lang(key, *args):
return bot.lang.get(key, ulang, *args)
place = config['nodes'][node]['label'][ulang]
rid = info['id']
html = f'{lang("record_error")} ({place}, id={rid})'
if 'error' in info:
html += '\n'+str(info['error'])
return html
class FilesRenderer(Renderer):
@classmethod
def index(cls, ctx: bot.Context) -> RenderedContent:
html = f'{ctx.lang("files")}\n\n'
html += ctx.lang('select_place')
return html, cls.places_markup(ctx, callback_prefix='f0')
@classmethod
def filelist(cls, ctx: bot.Context, files: List[SoundRecordFile]) -> RenderedContent:
node, = callback_unpack(ctx)
html_files = map(lambda file: cls.file(ctx, file, node), files)
html = '\n\n'.join(html_files)
buttons = []
cls.back_button(ctx, buttons, callback_data='f0')
return html, InlineKeyboardMarkup(buttons)
@classmethod
def file(cls, ctx: bot.Context, file: SoundRecordFile, node: str) -> str:
html = ctx.lang('file_line', file.start_humantime, file.stop_humantime, filesize_fmt(file.filesize))
if file.file_id is not None:
html += f'/audio_{node}_{file.file_id}'
return html
class RemoteFilesRenderer(FilesRenderer):
@classmethod
def index(cls, ctx: bot.Context) -> RenderedContent:
html = f'{ctx.lang("remote_files")}\n\n'
html += ctx.lang('select_place')
return html, cls.places_markup(ctx, callback_prefix='g0')
class SoundSensorRenderer(Renderer):
@classmethod
def places_markup(cls, ctx: bot.Context, callback_prefix: str) -> InlineKeyboardMarkup:
buttons = []
for sensor, sensor_label in config['sound_sensors'].items():
buttons.append(
[InlineKeyboardButton(sensor_label[ctx.user_lang], callback_data=f'{callback_prefix}/{sensor}')])
return InlineKeyboardMarkup(buttons)
@classmethod
def index(cls, ctx: bot.Context) -> RenderedContent:
html = f'{ctx.lang("sound_sensors_info")}\n\n'
html += ctx.lang('select_place')
return html, cls.places_markup(ctx, callback_prefix='S0')
@classmethod
def hits(cls, ctx: bot.Context, data, is_last=False) -> RenderedContent:
node, = callback_unpack(ctx)
buttons = []
if not data:
html = ctx.lang('sound_sensors_no_24h_data')
if not is_last:
buttons.append([InlineKeyboardButton(ctx.lang('sound_sensors_show_anything'), callback_data=f'S1/{node}')])
else:
html = ''
prev_date = None
for item in data:
item_date = item['time'].strftime('%d.%m.%y')
if prev_date is None or prev_date != item_date:
if html != '':
html += '\n\n'
html += f'{item_date}'
prev_date = item_date
html += '\n' + item['time'].strftime('%H:%M:%S') + f' (+{item["hits"]})'
cls.back_button(ctx, buttons, callback_data='S0')
return html, InlineKeyboardMarkup(buttons)
@classmethod
def hits_plain(cls, ctx: bot.Context, data, is_last=False) -> bytes:
node, = callback_unpack(ctx)
text = ''
prev_date = None
for item in data:
item_date = item['time'].strftime('%d.%m.%y')
if prev_date is None or prev_date != item_date:
if text != '':
text += '\n\n'
text += item_date
prev_date = item_date
text += '\n' + item['time'].strftime('%H:%M:%S') + f' (+{item["hits"]})'
return text.encode()
class CamerasRenderer(Renderer):
@classmethod
def index(cls, ctx: bot.Context) -> RenderedContent:
html = f'{ctx.lang("cameras")}\n\n'
html += ctx.lang('select_place')
return html, cls.places_markup(ctx, callback_prefix='c0')
@classmethod
def places_markup(cls, ctx: bot.Context, callback_prefix: str) -> InlineKeyboardMarkup:
buttons = []
for camera_name, camera_data in config['cameras'].items():
buttons.append(
[InlineKeyboardButton(camera_data['label'][ctx.user_lang], callback_data=f'{callback_prefix}/{camera_name}')])
return InlineKeyboardMarkup(buttons)
@classmethod
def camera(cls, ctx: bot.Context, flash_available: bool) -> RenderedContent:
node, = callback_unpack(ctx)
html = ctx.lang('select_option')
buttons = []
if flash_available:
buttons.append(InlineKeyboardButton(ctx.lang('w_flash'), callback_data=f'c1/{node}/1'))
buttons.append(InlineKeyboardButton(ctx.lang('wo_flash'), callback_data=f'c1/{node}/0'))
cls.back_button(ctx, [buttons], callback_data=f'c0')
return html, InlineKeyboardMarkup([buttons])
#
# @classmethod
# def record_started(cls, ctx: bot.Context, rid: int) -> RenderedContent:
# node, *rest = callback_unpack(ctx)
#
# place = config['nodes'][node]['label'][ctx.user_lang]
#
# html = f'{ctx.lang("record_started")} ({place}, id={rid})'
# return html, None
#
# @classmethod
# def record_done(cls, info: dict, node: str, uid: int) -> str:
# ulang = bot.db.get_user_lang(uid)
#
# def lang(key, *args):
# return bot.lang.get(key, ulang, *args)
#
# rid = info['id']
# fmt = '%d.%m.%y %H:%M:%S'
# start_time = datetime.fromtimestamp(int(info['start_time'])).strftime(fmt)
# stop_time = datetime.fromtimestamp(int(info['stop_time'])).strftime(fmt)
#
# place = config['nodes'][node]['label'][ulang]
#
# html = f'{lang("record_result")} ({place}, id={rid})\n\n'
# html += f'{lang("beginning")}: {start_time}\n'
# html += f'{lang("end")}: {stop_time}'
#
# return html
#
# @classmethod
# def record_error(cls, info: dict, node: str, uid: int) -> str:
# ulang = bot.db.get_user_lang(uid)
#
# def lang(key, *args):
# return bot.lang.get(key, ulang, *args)
#
# place = config['nodes'][node]['label'][ulang]
# rid = info['id']
#
# html = f'{lang("record_error")} ({place}, id={rid})'
# if 'error' in info:
# html += '\n'+str(info['error'])
#
# return html
# cameras handlers
# ----------------
@bot.handler(message='cameras', callback=r'^c0$')
def cameras(ctx: bot.Context):
""" List of cameras """
text, markup = CamerasRenderer.index(ctx)
if not ctx.is_callback_context():
return ctx.reply(text, markup=markup)
else:
ctx.answer()
return ctx.edit(text, markup=markup)
@bot.callbackhandler(callback=r'^c0/.*')
def camera_options(ctx: bot.Context) -> None:
""" List of options (with/without flash etc) """
cam, = callback_unpack(ctx)
if not camera_exists(cam):
ctx.answer(ctx.lang('invalid_location'))
return
ctx.answer()
flash_available = 'flash_available' in config['cameras'][cam] and config['cameras'][cam]['flash_available'] is True
text, markup = CamerasRenderer.camera(ctx, flash_available)
ctx.edit(text, markup)
@bot.callbackhandler(callback=r'^c1/.*')
def camera_capture(ctx: bot.Context) -> None:
""" Cheese """
cam, flash = callback_unpack(ctx)
flash = int(flash)
if not camera_exists(cam):
ctx.answer(ctx.lang('invalid_location'))
return
ctx.answer()
client = camera_client(cam)
fd = tempfile.NamedTemporaryFile(delete=False, suffix='.jpg')
fd.close()
client.capture(fd.name, with_flash=bool(flash))
logger.debug(f'captured photo ({cam}), saved to {fd.name}')
camera_config = config['cameras'][cam]
if 'rotate' in camera_config:
im = Image.open(fd.name)
im.rotate(camera_config['rotate'], expand=True)
# im.show()
im.save(fd.name)
logger.debug(f"rotated image {camera_config['rotate']} degrees")
try:
with open(fd.name, 'rb') as f:
bot.send_photo(ctx.user_id, photo=f)
except TelegramError as exc:
logger.exception(exc)
try:
os.unlink(fd.name)
except OSError as exc:
logger.exception(exc)
# settings handlers
# -----------------
@bot.handler(message='settings', callback=r'^s0$')
def settings(ctx: bot.Context):
""" List of nodes """
text, markup = SettingsRenderer.index(ctx)
if not ctx.is_callback_context():
return ctx.reply(text, markup=markup)
else:
ctx.answer()
return ctx.edit(text, markup=markup)
@bot.callbackhandler(callback=r'^s0/.*')
def settings_place(ctx: bot.Context):
""" List of controls """
node, = callback_unpack(ctx)
if not node_exists(node):
ctx.answer(ctx.lang('invalid_location'))
return
cl = node_client(node)
controls = cl.amixer_get_all()
ctx.answer()
text, markup = SettingsRenderer.node(ctx, controls)
ctx.edit(text, markup)
@bot.callbackhandler(callback=r'^s1/.*')
def settings_place_control(ctx: bot.Context):
""" List of available tunes for control """
node, control = callback_unpack(ctx)
if not node_exists(node):
ctx.answer(ctx.lang('invalid_location'))
return
cl = node_client(node)
control_data = cl.amixer_get(control)
ctx.answer()
text, markup = SettingsRenderer.control(ctx, control_data)
ctx.edit(text, markup)
@bot.callbackhandler(callback=r'^s2/.*')
def settings_place_control_action(ctx: bot.Context):
""" Tuning """
node, control, action = callback_unpack(ctx)
if not node_exists(node):
return
cl = node_client(node)
if not hasattr(cl, f'amixer_{action}'):
ctx.answer(ctx.lang('invalid_action'))
return
func = getattr(cl, f'amixer_{action}')
control_data = func(control)
ctx.answer()
text, markup = SettingsRenderer.control(ctx, control_data)
ctx.edit(text, markup)
# recording handlers
# ------------------
@bot.handler(message='record', callback=r'^r0$')
def record(ctx: bot.Context):
""" List of nodes """
if not manual_recording_allowed(ctx.user_id):
return ctx.reply(ctx.lang('access_denied'))
text, markup = RecordRenderer.index(ctx)
if not ctx.is_callback_context():
return ctx.reply(text, markup=markup)
else:
ctx.answer()
return ctx.edit(text, markup=markup)
@bot.callbackhandler(callback=r'^r0/.*')
def record_place(ctx: bot.Context):
""" List of available intervals """
node, = callback_unpack(ctx)
if not node_exists(node):
ctx.answer(ctx.lang('invalid_location'))
return
ctx.answer()
text, markup = RecordRenderer.node(ctx, config['bot']['record_intervals'])
ctx.edit(text, markup)
@bot.callbackhandler(callback=r'^r1/.*')
def record_place_interval(ctx: bot.Context):
""" Do record! """
node, interval = callback_unpack(ctx)
interval = int(interval)
if not node_exists(node):
ctx.answer(ctx.lang('invalid_location'))
return
if not interval_defined(interval):
ctx.answer(ctx.lang('invalid_interval'))
return
try:
record_id = record_client.record(node, interval, {'user_id': ctx.user_id, 'node': node})
except ApiResponseError as e:
ctx.answer(e.error_message)
logger.error(e)
return
ctx.answer()
html, markup = RecordRenderer.record_started(ctx, record_id)
ctx.edit(html, markup)
# sound sensor handlers
# ---------------------
@bot.handler(message='sound_sensors', callback=r'^S0$')
def sound_sensors(ctx: bot.Context):
""" List of places """
text, markup = SoundSensorRenderer.index(ctx)
if not ctx.is_callback_context():
return ctx.reply(text, markup=markup)
else:
ctx.answer()
return ctx.edit(text, markup=markup)
@bot.callbackhandler(callback=r'^S0/.*')
def sound_sensors_last_24h(ctx: bot.Context):
""" Last 24h log """
node, = callback_unpack(ctx)
if not sound_sensor_exists(node):
ctx.answer(ctx.lang('invalid location'))
return
ctx.answer()
cl = WebAPIClient()
data = cl.get_sound_sensor_hits(location=SoundSensorLocation[node.upper()],
after=datetime.now() - timedelta(hours=24))
text, markup = SoundSensorRenderer.hits(ctx, data)
if len(text) > 4096:
plain = SoundSensorRenderer.hits_plain(ctx, data)
bot.send_file(ctx.user_id, document=plain, filename='data.txt')
else:
ctx.edit(text, markup=markup)
@bot.callbackhandler(callback=r'^S1/.*')
def sound_sensors_last_anything(ctx: bot.Context):
""" Last _something_ """
node, = callback_unpack(ctx)
if not sound_sensor_exists(node):
ctx.answer(ctx.lang('invalid location'))
return
ctx.answer()
cl = WebAPIClient()
data = cl.get_last_sound_sensor_hits(location=SoundSensorLocation[node.upper()],
last=20)
text, markup = SoundSensorRenderer.hits(ctx, data, is_last=True)
if len(text) > 4096:
plain = SoundSensorRenderer.hits_plain(ctx, data)
bot.send_file(ctx.user_id, document=plain, filename='data.txt')
else:
ctx.edit(text, markup=markup)
# guard enable/disable handlers
# -----------------------------
class GuardUserAction(Enum):
ENABLE = 'enable'
DISABLE = 'disable'
if 'guard_server' in config['bot']:
@bot.handler(message='guard_status')
def guard_status(ctx: bot.Context):
guard = guard_client()
resp = guard.guard_status()
key = 'enabled' if resp['enabled'] is True else 'disabled'
ctx.reply(ctx.lang(f'guard_status_{key}'))
@bot.handler(message='guard_enable')
def guard_enable(ctx: bot.Context):
guard = guard_client()
guard.guard_enable()
ctx.reply(ctx.lang('done'))
_guard_notify(ctx.user, GuardUserAction.ENABLE)
@bot.handler(message='guard_disable')
def guard_disable(ctx: bot.Context):
guard = guard_client()
guard.guard_disable()
ctx.reply(ctx.lang('done'))
_guard_notify(ctx.user, GuardUserAction.DISABLE)
def _guard_notify(user: User, action: GuardUserAction):
def text_getter(lang: str):
action_name = bot.lang.get(f'guard_user_action_{action.value}', lang)
user_name = bot.user_any_name(user)
return 'ℹ ' + bot.lang.get('guard_user_action_notification', lang,
user.id, user_name, action_name)
bot.notify_all(text_getter, exclude=(user.id,))
@bot.defaultreplymarkup
def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
buttons = [
[ctx.lang('record'), ctx.lang('settings')],
# [ctx.lang('files'), ctx.lang('remote_files')],
]
if 'guard_server' in config['bot']:
buttons.append([
ctx.lang('guard_enable'), ctx.lang('guard_disable'), ctx.lang('guard_status')
])
buttons.append([ctx.lang('sound_sensors')])
if have_cameras():
buttons.append([ctx.lang('cameras')])
return ReplyKeyboardMarkup(buttons, one_time_keyboard=False)
# record client callbacks
# -----------------------
def record_onerror(info: dict, userdata: dict):
uid = userdata['user_id']
node = userdata['node']
html = RecordRenderer.record_error(info, node, uid)
try:
bot.notify_user(userdata['user_id'], html)
except TelegramError as exc:
logger.exception(exc)
finally:
record_client.forget(node, info['id'])
def record_onfinished(info: dict, fn: str, userdata: dict):
logger.info('record finished: ' + str(info))
uid = userdata['user_id']
node = userdata['node']
html = RecordRenderer.record_done(info, node, uid)
bot.notify_user(uid, html)
try:
# sending audiofile to telegram
with open(fn, 'rb') as f:
bot.send_audio(uid, audio=f, filename='audio.mp3')
# deleting temp file
try:
os.unlink(fn)
except OSError as exc:
logger.exception(exc)
bot.notify_user(uid, exc)
# remove the recording from sound_node's history
record_client.forget(node, info['id'])
# remove file from storage
# node_client(node).storage_delete(info['file']['fileid'])
except Exception as e:
logger.exception(e)
if __name__ == '__main__':
record_client = SoundRecordClient(nodes,
error_handler=record_onerror,
finished_handler=record_onfinished,
download_on_finish=True)
if 'api' in config:
bot.enable_logging(BotType.SOUND)
bot.run()
record_client.stop()