aboutsummaryrefslogtreecommitdiff
path: root/bin/relay_mqtt_bot.py
blob: 3ad0a9b9e2c2884c946a8783d5e6d119309e90f1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
#!/usr/bin/env python3
import sys
import __py_include

from enum import Enum
from typing import Optional, Union
from telegram import ReplyKeyboardMarkup
from functools import partial

from homekit.config import config, AppConfigUnit, Translation
from homekit.telegram import bot
from homekit.telegram.config import TelegramBotConfig
from homekit.mqtt import MqttPayload, MqttNode, MqttWrapper, MqttModule, MqttNodesConfig
from homekit.mqtt.module.relay import MqttRelayModule, MqttRelayState
from homekit.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload


if __name__ != '__main__':
    print(f'this script can not be imported as module', file=sys.stderr)
    sys.exit(1)


mqtt_nodes_config = MqttNodesConfig()


class RelayMqttBotConfig(AppConfigUnit, TelegramBotConfig):
    NAME = 'relay_mqtt_bot'

    _strings: Translation

    def __init__(self):
        super().__init__()
        self._strings = Translation('mqtt_nodes')

    @classmethod
    def schema(cls) -> Optional[dict]:
        return {
            **super(TelegramBotConfig).schema(),
            'relay_nodes': {
                'type': 'list',
                'required': True,
                'schema': {
                    'type': 'string'
                }
            },
        }

    @staticmethod
    def custom_validator(data):
        relay_node_names = mqtt_nodes_config.get_nodes(filters=('relay',), only_names=True)
        for node in data['relay_nodes']:
            if node not in relay_node_names:
                raise ValueError(f'unknown relay node "{node}"')

    def get_relay_name_translated(self, lang: str, relay_name: str) -> str:
        return self._strings.get(lang)[relay_name]['relay']


config.load_app(RelayMqttBotConfig)

bot.initialize()
bot.lang.ru(
    start_message="Выберите команду на клавиатуре",
    unknown_command="Неизвестная команда",
    done="Готово 👌",
)
bot.lang.en(
    start_message="Select command on the keyboard",
    unknown_command="Unknown command",
    done="Done 👌",
)


type_emojis = {
    'lamp': '💡'
}
status_emoji = {
    'on': '✅',
    'off': '❌'
}


mqtt: MqttWrapper
relay_nodes: dict[str, Union[MqttRelayModule, MqttModule]] = {}
relay_states: dict[str, MqttRelayState] = {}


class UserAction(Enum):
    ON = 'on'
    OFF = 'off'


def on_mqtt_message(node: MqttNode,
                    message: MqttPayload):
    if isinstance(message, InitialDiagnosticsPayload) or isinstance(message, DiagnosticsPayload):
        kwargs = dict(rssi=message.rssi, enabled=message.flags.state)
        if isinstance(message, InitialDiagnosticsPayload):
            kwargs['fw_version'] = message.fw_version
        if node.id not in relay_states:
            relay_states[node.id] = MqttRelayState()
        relay_states[node.id].update(**kwargs)


async def enable_handler(node_id: str, ctx: bot.Context) -> None:
    relay_nodes[node_id].switchpower(True)
    await ctx.reply(ctx.lang('done'))


async def disable_handler(node_id: str, ctx: bot.Context) -> None:
    relay_nodes[node_id].switchpower(False)
    await ctx.reply(ctx.lang('done'))


async def start(ctx: bot.Context) -> None:
    await ctx.reply(ctx.lang('start_message'))


@bot.exceptionhandler
async def exception_handler(e: Exception, ctx: bot.Context) -> bool:
    return False


@bot.defaultreplymarkup
def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
    buttons = []
    for node_id in config.app_config['relay_nodes']:
        node_data = mqtt_nodes_config.get_node(node_id)
        type_emoji = type_emojis[node_data['relay']['device_type']]
        row = [f'{type_emoji}{status_emoji[i.value]} {config.app_config.get_relay_name_translated(ctx.user_lang, node_id)}'
               for i in UserAction]
        buttons.append(row)
    return ReplyKeyboardMarkup(buttons, one_time_keyboard=False)


devices = []
mqtt = MqttWrapper(client_id='relay_mqtt_bot')
for node_id in config.app_config['relay_nodes']:
    node_data = mqtt_nodes_config.get_node(node_id)
    mqtt_node = MqttNode(node_id=node_id,
                         node_secret=node_data['password'])
    module_kwargs = {}
    try:
        if node_data['relay']['legacy_topics']:
            module_kwargs['legacy_topics'] = True
    except KeyError:
        pass
    relay_nodes[node_id] = mqtt_node.load_module('relay',  **module_kwargs)
    mqtt_node.add_payload_callback(on_mqtt_message)
    mqtt.add_node(mqtt_node)

    type_emoji = type_emojis[node_data['relay']['device_type']]

    for action in UserAction:
        messages = []
        for _lang in Translation.LANGUAGES:
            _label = config.app_config.get_relay_name_translated(_lang, node_id)
            messages.append(f'{type_emoji}{status_emoji[action.value]} {_label}')
        bot.handler(texts=messages)(partial(enable_handler if action == UserAction.ON else disable_handler, node_id))

mqtt.connect_and_loop(loop_forever=False)

bot.run(start_handler=start)

mqtt.disconnect()