1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
|
#!/usr/bin/env python3
import sys
from enum import Enum
from typing import Optional, Union
from telegram import ReplyKeyboardMarkup
from functools import partial
from home.config import config, AppConfigUnit, TranslationsUnit
from home.telegram import bot
from home.telegram.config import TelegramBotConfig
from home.mqtt import MqttPayload, MqttNode, MqttWrapper, MqttModule
from home.mqtt import MqttNodesConfig
from home.mqtt.module.relay import MqttRelayModule, MqttRelayState
from home.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload
# This file is an executable entry point: bail out with a non-zero status
# when it is imported instead of being run directly.
if __name__ != '__main__':
    print('this script can not be imported as module', file=sys.stderr)
    sys.exit(1)

# Queried by RelayMqttBotConfig.custom_validator for the names of known
# relay nodes.
mqtt_nodes_config = MqttNodesConfig()
class RelayMqttBotConfig(AppConfigUnit, TelegramBotConfig):
    """Configuration unit for the relay MQTT bot.

    Extends the schema provided by ``TelegramBotConfig`` with a required
    ``relay_nodes`` list of node names, and validates those names against
    the relay nodes reported by ``mqtt_nodes_config``.
    """

    NAME = 'relay_mqtt_bot'

    def __init__(self):
        super().__init__()
        # Translation unit for MQTT node names; not read by any method of
        # this class yet (see get_relay_name_translated).
        self._mqtt_nodes_strings = TranslationsUnit('mqtt_nodes')

    @staticmethod
    def schema() -> Optional[dict]:
        """Return the validation schema for this configuration unit.

        The result is the ``TelegramBotConfig`` schema with an additional
        required ``relay_nodes`` key holding a list of strings.
        """
        # A static method has nothing to bind super() to, and the one-argument
        # form ``super(TelegramBotConfig)`` is an unbound object that exposes
        # no ``schema`` attribute, so the parent schema is requested from the
        # class explicitly. ``or {}`` tolerates a parent that returns None,
        # which the Optional[dict] return annotation allows.
        return {
            **(TelegramBotConfig.schema() or {}),
            'relay_nodes': {
                'type': 'list',
                'required': True,
                'schema': {
                    'type': 'string'
                }
            },
        }

    @staticmethod
    def custom_validator(data):
        """Check that every entry of ``data['relay_nodes']`` is a known relay node.

        Raises:
            ValueError: if a listed node name is not among the names returned
                by ``mqtt_nodes_config.get_nodes`` for the ``relay`` filter.
        """
        relay_node_names = mqtt_nodes_config.get_nodes(filters=('relay',), only_names=True)
        for node in data['relay_nodes']:
            if node not in relay_node_names:
                raise ValueError(f'unknown relay node "{node}"')

    def get_relay_name_translated(self, lang: str, relay_name: str) -> str:
        """Unimplemented stub: currently returns None for every input.

        NOTE(review): presumably meant to look ``relay_name`` up in
        ``self._mqtt_nodes_strings`` for ``lang`` -- confirm and implement.
        """
        pass
# Load this application's configuration unit before the bot is initialised.
config.load_app(RelayMqttBotConfig)
bot.initialize()

# Static interface strings, one table per supported language.
_RU_STRINGS = {
    'start_message': 'Выберите команду на клавиатуре',
    'unknown_command': 'Неизвестная команда',
    'done': 'Готово 👌',
}
_EN_STRINGS = {
    'start_message': 'Select command on the keyboard',
    'unknown_command': 'Unknown command',
    'done': 'Done 👌',
}

bot.lang.ru(**_RU_STRINGS)
bot.lang.en(**_EN_STRINGS)
# Emoji shown in front of a button label, looked up by the relay's
# configured ``type``.
type_emojis = dict(lamp='💡')

# Emoji for each action, looked up by UserAction value.
status_emoji = dict(on='✅', off='❌')

# MQTT wrapper instance; assigned in the startup section further down.
mqtt: Optional[MqttWrapper] = None

# Node id -> module returned by ``load_module('relay')`` for that node.
relay_nodes: dict[str, Union[MqttRelayModule, MqttModule]] = dict()

# Node id -> state object updated from diagnostics payloads.
relay_states: dict[str, MqttRelayState] = dict()
class UserAction(Enum):
    """Actions offered for each relay.

    Values double as keys into ``status_emoji``; member order determines the
    order of the buttons in a keyboard row.
    """

    ON = 'on'
    OFF = 'off'
def on_mqtt_message(node: MqttNode,
                    message: MqttPayload):
    """Store diagnostics reported by a node in ``relay_states``.

    Only ``InitialDiagnosticsPayload`` and ``DiagnosticsPayload`` messages are
    processed; any other payload is ignored. The state entry for ``node.id``
    is created on first use and then updated with the reported ``rssi`` and
    ``enabled`` values, plus ``fw_version`` for initial diagnostics.
    """
    if not isinstance(message, (InitialDiagnosticsPayload, DiagnosticsPayload)):
        return

    fields = {'rssi': message.rssi, 'enabled': message.flags.state}
    # Only the initial diagnostics message carries the firmware version.
    if isinstance(message, InitialDiagnosticsPayload):
        fields['fw_version'] = message.fw_version

    if node.id not in relay_states:
        relay_states[node.id] = MqttRelayState()
    relay_states[node.id].update(**fields)
def enable_handler(node_id: str, ctx: bot.Context) -> None:
    """Call ``switchpower(True)`` on the relay for ``node_id`` and reply 'done'."""
    relay = relay_nodes[node_id]
    relay.switchpower(True)

    confirmation = ctx.lang('done')
    ctx.reply(confirmation)
def disable_handler(node_id: str, ctx: bot.Context) -> None:
    """Call ``switchpower(False)`` on the relay for ``node_id`` and reply 'done'."""
    relay = relay_nodes[node_id]
    relay.switchpower(False)

    confirmation = ctx.lang('done')
    ctx.reply(confirmation)
def start(ctx: bot.Context) -> None:
    """Reply with the localized ``start_message`` string."""
    greeting = ctx.lang('start_message')
    ctx.reply(greeting)
@bot.exceptionhandler
def exception_handler(e: Exception, ctx: bot.Context) -> bool:
    """Exception hook registered with the bot; ignores its arguments.

    NOTE(review): the constant False presumably tells the bot framework that
    the exception was not handled here -- confirm against home.telegram.bot.
    """
    handled = False
    return handled
@bot.defaultreplymarkup
def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
    """Build the default reply keyboard: one row per relay, one button per action.

    Each button text is ``<type emoji><action emoji> <label>``, with the label
    taken from the relay's ``labels`` for the user's language. These are the
    same texts the startup section registers handlers for.

    Args:
        ctx: Context of the current update, or None when there is none.

    Returns:
        The keyboard markup, or None when ``ctx`` is None.
    """
    if ctx is None:
        # No context means no user language to choose labels with; the
        # original code raised AttributeError here.
        return None

    user_lang = ctx.user_lang
    buttons = []
    for _device_id, data in config['relays'].items():
        # The label and the type emoji are the same for every action button.
        label = data['labels'][user_lang]
        type_emoji = type_emojis[data['type']]
        buttons.append([f'{type_emoji}{status_emoji[action.value]} {label}'
                        for action in UserAction])
    return ReplyKeyboardMarkup(buttons, one_time_keyboard=False)
# NOTE(review): not referenced anywhere in the visible code; kept in case
# something outside this view relies on it.
devices = []

mqtt = MqttWrapper(client_id='relay_mqtt_bot')
for device_id, data in config['relays'].items():
    # Register the node with MQTT and remember its relay module by id.
    mqtt_node = MqttNode(node_id=device_id, node_secret=data['secret'])
    relay_nodes[device_id] = mqtt_node.load_module('relay')
    mqtt_node.add_payload_callback(on_mqtt_message)
    mqtt.add_node(mqtt_node)

    # Expose the device's label as a translation string named by its id.
    labels = data['labels']
    bot.lang.ru(**{device_id: labels['ru']})
    bot.lang.en(**{device_id: labels['en']})

    type_emoji = type_emojis[data['type']]
    for action in UserAction:
        # One handler per action, matching the button text in every language
        # the device has a label for.
        messages = [f'{type_emoji}{status_emoji[action.value]} {label}'
                    for label in labels.values()]
        callback = enable_handler if action == UserAction.ON else disable_handler
        bot.handler(texts=messages)(partial(callback, device_id))

mqtt.configure_tls()
mqtt.connect_and_loop(loop_forever=False)
try:
    bot.run(start_handler=start)
finally:
    # Disconnect from MQTT even when the bot loop exits with an exception.
    mqtt.disconnect()
|