1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
#!/usr/bin/env python3
from enum import Enum
from typing import Optional
from telegram import ReplyKeyboardMarkup
from functools import partial
from home.config import config
from home.telegram import bot
from home.mqtt import MqttPayload, MqttNode, MqttWrapper
from home.mqtt.module.relay import MqttRelayModule, MqttRelayState
from home.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload
# Load the 'relay_mqtt_bot' app config and initialize the bot module before
# any language strings or handlers are registered below.
config.load_app('relay_mqtt_bot')
bot.initialize()

# User-facing strings, registered per language under the same keys.
bot.lang.ru(**{
    'start_message': "Выберите команду на клавиатуре",
    'unknown_command': "Неизвестная команда",
    'done': "Готово 👌",
})
bot.lang.en(**{
    'start_message': "Select command on the keyboard",
    'unknown_command': "Unknown command",
    'done': "Done 👌",
})
# Emoji prefix per relay type (looked up by the 'type' field of a relay's
# config entry).
type_emojis = dict(lamp='💡')

# Emoji shown for each UserAction value.
status_emoji = dict(on='✅', off='❌')

# MQTT wrapper instance; assigned in the __main__ section.
mqtt: Optional[MqttWrapper] = None

# Relay module per device id; populated in the __main__ section.
relay_nodes: dict[str, MqttRelayModule] = dict()

# Most recent state per node id; maintained by on_mqtt_message().
relay_states: dict[str, MqttRelayState] = dict()
class UserAction(Enum):
    """Actions a user can request for a relay.

    The member values double as keys into ``status_emoji``.
    """

    # Iteration order (ON, then OFF) determines the button order in a
    # keyboard row.
    ON = 'on'
    OFF = 'off'
def on_mqtt_message(node: MqttNode,
                    message: MqttPayload):
    """Record diagnostics payloads in ``relay_states``.

    Payloads that are neither ``InitialDiagnosticsPayload`` nor
    ``DiagnosticsPayload`` are ignored. For diagnostics payloads the
    per-node ``MqttRelayState`` (created on first use) is updated with
    ``rssi`` and ``enabled``; initial diagnostics also carry ``fw_version``.

    :param node: node the payload came from; ``node.id`` keys the state map
    :param message: decoded payload
    """
    if not isinstance(message, (InitialDiagnosticsPayload, DiagnosticsPayload)):
        return

    fields = {
        'rssi': message.rssi,
        'enabled': message.flags.state,
    }
    if isinstance(message, InitialDiagnosticsPayload):
        fields['fw_version'] = message.fw_version

    # Create the state object lazily, the first time a node reports in.
    try:
        state = relay_states[node.id]
    except KeyError:
        state = MqttRelayState()
        relay_states[node.id] = state
    state.update(**fields)
def enable_handler(node_id: str, ctx: bot.Context) -> None:
    """Call ``switchpower(True)`` on the relay of ``node_id`` and reply 'done'.

    :param node_id: key into ``relay_nodes``; an unknown id raises KeyError
    :param ctx: bot context used to localize and send the confirmation
    """
    relay = relay_nodes[node_id]
    relay.switchpower(True)
    confirmation = ctx.lang('done')
    ctx.reply(confirmation)
def disable_handler(node_id: str, ctx: bot.Context) -> None:
    """Call ``switchpower(False)`` on the relay of ``node_id`` and reply 'done'.

    :param node_id: key into ``relay_nodes``; an unknown id raises KeyError
    :param ctx: bot context used to localize and send the confirmation
    """
    relay = relay_nodes[node_id]
    relay.switchpower(False)
    confirmation = ctx.lang('done')
    ctx.reply(confirmation)
def start(ctx: bot.Context) -> None:
    """Reply with the localized 'start_message' string.

    Passed to ``bot.run`` as ``start_handler`` in the __main__ section.
    """
    greeting = ctx.lang('start_message')
    ctx.reply(greeting)
@bot.exceptionhandler
def exception_handler(e: Exception, ctx: bot.Context) -> bool:
    """Exception hook registered with the bot; always returns False.

    Neither argument is inspected.
    NOTE(review): presumably False tells the framework the exception was
    not handled here -- confirm against home.telegram.bot.
    """
    result = False
    return result
@bot.defaultreplymarkup
def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
    """Build the default reply keyboard: one row per configured relay.

    Each row holds one button per ``UserAction``, captioned
    ``<type emoji><status emoji> <label in the user's language>``. The
    captions match the texts registered with ``bot.handler`` in the
    __main__ section.

    NOTE(review): ``ctx`` is annotated Optional but ``ctx.user_lang`` is
    read unconditionally whenever at least one relay is configured --
    confirm that callers never pass None.
    """
    keyboard = []
    for _device_id, relay in config['relays'].items():
        labels = relay['labels']
        emoji = type_emojis[relay['type']]
        caption = labels[ctx.user_lang]
        keyboard.append([
            f'{emoji}{status_emoji[action.value]} {caption}'
            for action in UserAction
        ])
    return ReplyKeyboardMarkup(keyboard, one_time_keyboard=False)
if __name__ == '__main__':
    # Script entry point: create one MQTT node per configured relay,
    # register the bot's button handlers, then run the bot until it stops.
    mqtt = MqttWrapper()

    for device_id, data in config['relays'].items():
        # MQTT side: load the node's relay module and subscribe to its
        # payloads so relay_states is kept up to date.
        mqtt_node = MqttNode(node_id=device_id, node_secret=data['secret'])
        relay_nodes[device_id] = mqtt_node.load_module('relay')
        mqtt_node.add_payload_callback(on_mqtt_message)
        mqtt.add_node(mqtt_node)

        # Register the device's label under its id in both languages.
        labels = data['labels']
        bot.lang.ru(**{device_id: labels['ru']})
        bot.lang.en(**{device_id: labels['en']})

        # Bot side: each action gets one handler that matches the button
        # caption in every configured language (same format as markup()).
        type_emoji = type_emojis[data['type']]
        for action in UserAction:
            messages = [
                f'{type_emoji}{status_emoji[action.value]} {label}'
                for label in labels.values()
            ]
            callback = enable_handler if action == UserAction.ON else disable_handler
            # partial() binds device_id now, so each handler keeps its own id.
            bot.handler(texts=messages)(partial(callback, device_id))

    mqtt.configure_tls()
    mqtt.connect_and_loop(loop_forever=False)
    try:
        bot.run(start_handler=start)
    finally:
        # Disconnect even if bot.run() exits with an exception.
        mqtt.disconnect()
|