1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
|
#!/usr/bin/env python3
from enum import Enum
from typing import Optional
from telegram import ReplyKeyboardMarkup, User
from home.config import config
from home.telegram import bot
from home.telegram._botutil import user_any_name
from home.relay.sunxi_h3_client import RelayClient
from home.api.types import BotType
config.load('pump_bot')
bot.initialize()
bot.lang.ru(
start_message="Выберите команду на клавиатуре",
unknown_command="Неизвестная команда",
enable="Включить",
enable_silently="Включить тихо",
enabled="Включен ✅",
disable="Выключить",
disable_silently="Выключить тихо",
disabled="Выключен ❌",
status="Статус",
done="Готово 👌",
user_action_notification='Пользователь <a href="tg://user?id=%d">%s</a> <b>%s</b> насос.',
user_action_on="включил",
user_action_off="выключил",
)
bot.lang.en(
start_message="Select command on the keyboard",
unknown_command="Unknown command",
enable="Turn ON",
enable_silently="Turn ON silently",
enabled="Turned ON ✅",
disable="Turn OFF",
disable_silently="Turn OFF silently",
disabled="Turned OFF ❌",
status="Status",
done="Done 👌",
user_action_notification='User <a href="tg://user?id=%d">%s</a> turned the pump <b>%s</b>.',
user_action_on="ON",
user_action_off="OFF",
)
class UserAction(Enum):
ON = 'on'
OFF = 'off'
def get_relay() -> RelayClient:
relay = RelayClient(host=config['relay']['ip'], port=config['relay']['port'])
relay.connect()
return relay
def on(ctx: bot.Context, silent=False) -> None:
get_relay().on()
ctx.reply(ctx.lang('done'))
if not silent:
notify(ctx.user, UserAction.ON)
def off(ctx: bot.Context, silent=False) -> None:
get_relay().off()
ctx.reply(ctx.lang('done'))
if not silent:
notify(ctx.user, UserAction.OFF)
def notify(user: User, action: UserAction) -> None:
def text_getter(lang: str):
action_name = bot.lang.get(f'user_action_{action.value}', lang)
user_name = user_any_name(user)
return 'ℹ ' + bot.lang.get('user_action_notification', lang,
user.id, user_name, action_name)
bot.notify_all(text_getter, exclude=(user.id,))
@bot.handler(message='enable')
def enable_handler(ctx: bot.Context) -> None:
on(ctx)
@bot.handler(message='enable_silently')
def enable_s_handler(ctx: bot.Context) -> None:
on(ctx, True)
@bot.handler(message='disable')
def disable_handler(ctx: bot.Context) -> None:
off(ctx)
@bot.handler(message='disable_silently')
def disable_s_handler(ctx: bot.Context) -> None:
off(ctx, True)
@bot.handler(message='status')
def status(ctx: bot.Context) -> None:
ctx.reply(
ctx.lang('enabled') if get_relay().status() == 'on' else ctx.lang('disabled')
)
@bot.defaultreplymarkup
def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
buttons = [
[ctx.lang('enable'), ctx.lang('disable')],
]
if ctx.user_id in config['bot']['silent_users']:
buttons.append([ctx.lang('enable_silently'), ctx.lang('disable_silently')])
buttons.append([ctx.lang('status')])
return ReplyKeyboardMarkup(buttons, one_time_keyboard=False)
if __name__ == '__main__':
bot.enable_logging(BotType.PUMP)
bot.run()
|