summaryrefslogtreecommitdiff
path: root/src/pump_bot.py
blob: ae36e2775dc840419ddcc789ec55360c1a06d195 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#!/usr/bin/env python3
from typing import Optional
from home.config import config
from home.bot import Wrapper, Context, text_filter, user_any_name
from home.relay import RelayClient
from home.api.types import BotType
from telegram import ReplyKeyboardMarkup, User
from telegram.ext import MessageHandler
from enum import Enum
from functools import partial

bot: Optional[Wrapper] = None


class UserAction(Enum):
    ON = 'on'
    OFF = 'off'


def get_relay() -> RelayClient:
    relay = RelayClient(host=config['relay']['ip'], port=config['relay']['port'])
    relay.connect()
    return relay


def on(silent: bool, ctx: Context) -> None:
    get_relay().on()
    ctx.reply(ctx.lang('done'))
    if not silent:
        notify(ctx.user, UserAction.ON)


def off(silent: bool, ctx: Context) -> None:
    get_relay().off()
    ctx.reply(ctx.lang('done'))
    if not silent:
        notify(ctx.user, UserAction.OFF)


def status(ctx: Context) -> None:
    ctx.reply(
        ctx.lang('enabled') if get_relay().status() == 'on' else ctx.lang('disabled')
    )


def notify(user: User, action: UserAction) -> None:
    def text_getter(lang: str):
        action_name = bot.lang.get(f'user_action_{action.value}', lang)
        user_name = user_any_name(user)
        return 'ℹ ' + bot.lang.get('user_action_notification', lang,
                                   user.id, user_name, action_name)

    bot.notify_all(text_getter, exclude=(user.id,))


class PumpBot(Wrapper):
    def __init__(self):
        super().__init__()

        self.lang.ru(
            start_message="Выберите команду на клавиатуре",
            unknown_command="Неизвестная команда",

            enable="Включить",
            enable_silently="Включить тихо",
            enabled="Включен ✅",

            disable="Выключить",
            disable_silently="Выключить тихо",
            disabled="Выключен ❌",

            status="Статус",
            done="Готово 👌",
            user_action_notification='Пользователь <a href="tg://user?id=%d">%s</a> <b>%s</b> насос.',
            user_action_on="включил",
            user_action_off="выключил",
        )

        self.lang.en(
            start_message="Select command on the keyboard",
            unknown_command="Unknown command",

            enable="Turn ON",
            enable_silently="Turn ON silently",
            enabled="Turned ON ✅",

            disable="Turn OFF",
            disable_silently="Turn OFF silently",
            disabled="Turned OFF ❌",

            status="Status",
            done="Done 👌",
            user_action_notification='User <a href="tg://user?id=%d">%s</a> turned the pump <b>%s</b>.',
            user_action_on="ON",
            user_action_off="OFF",
        )

        self.add_handler(MessageHandler(text_filter(self.lang.all('enable')), self.wrap(partial(on, False))))
        self.add_handler(MessageHandler(text_filter(self.lang.all('disable')), self.wrap(partial(off, False))))

        self.add_handler(MessageHandler(text_filter(self.lang.all('enable_silently')), self.wrap(partial(on, True))))
        self.add_handler(MessageHandler(text_filter(self.lang.all('disable_silently')), self.wrap(partial(off, True))))

        self.add_handler(MessageHandler(text_filter(self.lang.all('status')), self.wrap(status)))

    def markup(self, ctx: Optional[Context]) -> Optional[ReplyKeyboardMarkup]:
        buttons = [
            [ctx.lang('enable'), ctx.lang('disable')],
        ]

        if ctx.user_id in config['bot']['silent_users']:
            buttons.append([ctx.lang('enable_silently'), ctx.lang('disable_silently')])

        buttons.append([ctx.lang('status')])

        return ReplyKeyboardMarkup(buttons, one_time_keyboard=False)


if __name__ == '__main__':
    config.load('pump_bot')

    bot = PumpBot()
    bot.enable_logging(BotType.PUMP)
    bot.run()