1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
#!/usr/bin/env python3
import logging
import __py_include
from homekit import http
from homekit.config import config, AppConfigUnit
from homekit.mqtt import MqttPayload, MqttWrapper, MqttNode, MqttModule, MqttNodesConfig
from homekit.mqtt.module.relay import MqttRelayState, MqttRelayModule, MqttPowerStatusPayload
from homekit.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload
from typing import Optional, Union
logger = logging.getLogger(__name__)
mqtt: Optional[MqttWrapper] = None
mqtt_nodes: dict[str, MqttNode] = {}
relay_modules: dict[str, Union[MqttRelayModule, MqttModule]] = {}
relay_states: dict[str, MqttRelayState] = {}
mqtt_nodes_config = MqttNodesConfig()
class RelayMqttHttpProxyConfig(AppConfigUnit):
NAME = 'relay_mqtt_http_proxy'
@classmethod
def schema(cls) -> Optional[dict]:
return {
'relay_nodes': {
'type': 'list',
'required': True,
'schema': {
'type': 'string'
}
},
'listen_addr': cls._addr_schema(required=True)
}
@staticmethod
def custom_validator(data):
relay_node_names = mqtt_nodes_config.get_nodes(filters=('relay',), only_names=True)
for node in data['relay_nodes']:
if node not in relay_node_names:
raise ValueError(f'unknown relay node "{node}"')
def on_mqtt_message(node: MqttNode,
message: MqttPayload):
try:
is_legacy = mqtt_nodes_config[node.id]['relay']['legacy_topics']
logger.debug(f'on_mqtt_message: relay {node.id} uses legacy topic names')
except KeyError:
is_legacy = False
kwargs = {}
if isinstance(message, InitialDiagnosticsPayload) or isinstance(message, DiagnosticsPayload):
kwargs['rssi'] = message.rssi
if is_legacy:
kwargs['enabled'] = message.flags.state
if not is_legacy and isinstance(message, MqttPowerStatusPayload):
kwargs['enabled'] = message.opened
if len(kwargs):
logger.debug(f'on_mqtt_message: {node.id}: going to update relay state: {str(kwargs)}')
if node.id not in relay_states:
relay_states[node.id] = MqttRelayState()
relay_states[node.id].update(**kwargs)
class RelayMqttHttpProxy(http.HTTPServer):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.get('/relay/{id}/on', self.relay_on)
self.get('/relay/{id}/off', self.relay_off)
self.get('/relay/{id}/toggle', self.relay_toggle)
async def _relay_on_off(self,
enable: Optional[bool],
req: http.Request):
node_id = req.match_info['id']
node_secret = req.query['secret']
node = mqtt_nodes[node_id]
relay_module = relay_modules[node_id]
if enable is None:
if node_id in relay_states and relay_states[node_id].ever_updated:
cur_state = relay_states[node_id].enabled
else:
cur_state = False
enable = not cur_state
node.secret = node_secret
relay_module.switchpower(enable)
return self.ok()
async def relay_on(self, req: http.Request):
return await self._relay_on_off(True, req)
async def relay_off(self, req: http.Request):
return await self._relay_on_off(False, req)
async def relay_toggle(self, req: http.Request):
return await self._relay_on_off(None, req)
if __name__ == '__main__':
config.load_app(RelayMqttHttpProxyConfig)
mqtt = MqttWrapper(client_id='relay_mqtt_http_proxy',
randomize_client_id=True)
for node_id in config.app_config['relay_nodes']:
node_data = mqtt_nodes_config.get_node(node_id)
mqtt_node = MqttNode(node_id=node_id)
module_kwargs = {}
try:
if node_data['relay']['legacy_topics']:
module_kwargs['legacy_topics'] = True
except KeyError:
pass
relay_modules[node_id] = mqtt_node.load_module('relay', **module_kwargs)
if 'legacy_topics' in module_kwargs:
mqtt_node.load_module('diagnostics')
mqtt_node.add_payload_callback(on_mqtt_message)
mqtt.add_node(mqtt_node)
mqtt_nodes[node_id] = mqtt_node
mqtt.connect_and_loop(loop_forever=False)
proxy = RelayMqttHttpProxy(config.app_config['listen_addr'])
try:
proxy.run()
except KeyboardInterrupt:
mqtt.disconnect()
|