#!/usr/bin/env python3
"""Command-line utility for talking to a single MQTT node.

Connects to the node selected with ``--node-id``, loads the requested
modules and, once the connection is up, optionally sends a relay state
(``--switch-relay``) and/or pushes an OTA firmware file (``--push-ota``).
"""
import os.path
import __py_include

from time import sleep
from typing import Optional
from argparse import ArgumentParser, ArgumentError

from homekit.config import config
from homekit.mqtt import MqttNode, MqttWrapper, get_mqtt_modules, MqttNodesConfig
from homekit.mqtt.module.relay import MqttRelayModule
from homekit.mqtt.module.ota import MqttOtaModule

# Shared state between the __main__ section (which fills it in) and the
# connect callback (which acts on it).
mqtt_node: Optional[MqttNode] = None
mqtt: Optional[MqttWrapper] = None

# Set only when --switch-relay was given and the 'relay' module is loaded.
relay_module: Optional[MqttRelayModule] = None
relay_val = None

# The OTA module is always loaded; ota_val holds the firmware path (or a
# falsy value when no push was requested).
ota_module: Optional[MqttOtaModule] = None
ota_val = False

no_wait = False
stop_loop = False


def on_mqtt_connect():
    """Run the requested one-shot commands once the MQTT connection is up.

    Sends the relay state if a relay command was requested, pushes the OTA
    file if one was requested, and asks the main loop to stop when
    ``--no-wait`` was given.
    """
    global stop_loop

    if relay_module:
        relay_module.switchpower(relay_val == 1)

    if ota_val:
        # The firmware path is validated before connecting (see __main__),
        # so a missing file never has to be reported from inside this callback.
        # NOTE(review): the meaning of the second argument (1) is defined by
        # MqttOtaModule.push_ota — confirm against that module.
        ota_module.push_ota(ota_val, 1)

    if no_wait:
        stop_loop = True


if __name__ == '__main__':
    nodes_config = MqttNodesConfig()

    parser = ArgumentParser()
    parser.add_argument('--node-id', type=str, required=True,
                        choices=nodes_config.get_nodes(only_names=True))
    parser.add_argument('--modules', type=str, choices=get_mqtt_modules(), nargs='*',
                        help='mqtt modules to include')
    parser.add_argument('--switch-relay', choices=[0, 1], type=int,
                        help='send relay state')
    parser.add_argument('--push-ota', type=str, metavar='OTA_FILENAME',
                        help='push OTA, receives path to firmware.bin')
    parser.add_argument('--no-wait', action='store_true',
                        help='execute command and exit')

    config.load_app(parser=parser, no_config=True)
    arg = parser.parse_args()

    if arg.no_wait:
        no_wait = True

    # argparse leaves --modules as None when the option is omitted, so
    # normalise it before any membership test or iteration.
    selected_modules = arg.modules or []

    if arg.switch_relay is not None and 'relay' not in selected_modules:
        raise ArgumentError(None, '--switch-relay is only allowed when \'relay\' module included in --modules')

    # Fail early, in the main thread, if the firmware file is missing.
    if arg.push_ota and not os.path.exists(arg.push_ota):
        raise OSError(f'--push-ota: file \"{arg.push_ota}\" does not exists')

    mqtt = MqttWrapper(randomize_client_id=True,
                       client_id='mqtt_node_util')
    mqtt.add_connect_callback(on_mqtt_connect)

    mqtt_node = MqttNode(node_id=arg.node_id,
                         node_secret=nodes_config.get_node(arg.node_id)['password'])
    mqtt.add_node(mqtt_node)

    # must-have modules
    ota_module = mqtt_node.load_module('ota')
    ota_val = arg.push_ota

    mqtt_node.load_module('diagnostics')

    for m in selected_modules:
        # Some nodes need legacy topic/payload handling for specific modules.
        kwargs = {}
        if m == 'relay' and MqttNodesConfig().node_uses_legacy_relay_power_payload(arg.node_id):
            kwargs['legacy_topics'] = True
        if m == 'temphum' and MqttNodesConfig().node_uses_legacy_temphum_data_payload(arg.node_id):
            kwargs['legacy_payload'] = True

        module_instance = mqtt_node.load_module(m, **kwargs)

        if m == 'relay' and arg.switch_relay is not None:
            relay_module = module_instance
            relay_val = arg.switch_relay

    try:
        # NOTE(review): with loop_forever=False this call is expected to
        # return while the connection is serviced elsewhere — confirm in
        # MqttWrapper. The main thread then polls stop_loop.
        mqtt.connect_and_loop(loop_forever=False)
        while not stop_loop:
            sleep(0.1)

    except KeyboardInterrupt:
        pass

    finally:
        mqtt.disconnect()