From 0a065f48be99d4ebae49de622a335f23e50c6ca0 Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Sun, 18 Dec 2022 06:31:24 +0300 Subject: pump-mqtt-bot: wip; relayctl: somewhat stable --- tools/mcuota.py | 98 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 98 insertions(+) create mode 100755 tools/mcuota.py (limited to 'tools/mcuota.py') diff --git a/tools/mcuota.py b/tools/mcuota.py new file mode 100755 index 0000000..f7f2968 --- /dev/null +++ b/tools/mcuota.py @@ -0,0 +1,98 @@ +#!/usr/bin/env python3 +import sys +import os.path +sys.path.extend([ + os.path.realpath( + os.path.join(os.path.dirname(os.path.join(__file__)), '..') + ) +]) + +from time import sleep +from argparse import ArgumentParser +from src.home.config import config +from src.home.mqtt import MQTTRelay + + +def guess_filename(product: str, build_target: str): + return os.path.join( + products_dir, + product, + '.pio', + 'build', + build_target, + 'firmware.bin' + ) + + +def relayctl_publish_ota(filename: str, + home_id: str, + home_secret: str, + qos: int): + global stop + + def published(): + global stop + stop = True + + mqtt_relay = MQTTRelay(home_id=home_id, + secret=home_secret) + mqtt_relay.configure_tls() + mqtt_relay.connect_and_loop(loop_forever=False) + mqtt_relay.push_ota(filename, published, qos) + while not stop: + sleep(0.1) + mqtt_relay.disconnect() + + +stop = False +products = { + 'relayctl': { + 'build_target': 'esp12e', + 'callback': relayctl_publish_ota + } +} + +products_dir = os.path.join( + os.path.dirname(__file__), + '..', + 'platformio' +) + + +def main(): + parser = ArgumentParser() + parser.add_argument('--filename', type=str) + parser.add_argument('--home-id', type=str, required=True) + parser.add_argument('--product', type=str, required=True) + parser.add_argument('--qos', type=int, default=1) + + config.load('mcuota_push', parser=parser) + arg = parser.parse_args() + + if arg.product not in products: + raise ValueError(f'invalid product: \'{arg.product}\' not found') + + if arg.home_id not in config['mqtt']['home_secrets']: + raise ValueError(f'home_secret for home {arg.home_id} not found in config!') + + filename = arg.filename if arg.filename else guess_filename(arg.product, products[arg.product]['build_target']) + if not os.path.exists(filename): + raise OSError(f'file \'{filename}\' does not exists') + + print('Please confirm following OTA params.') + print('') + print(f' Home ID: {arg.home_id}') + print(f' Product: {arg.product}') + print(f'Firmware file: {filename}') + print('') + input('Press any key to continue or Ctrl+C to abort.') + + products[arg.product]['callback'](filename, arg.home_id, config['mqtt']['home_secrets'][arg.home_id], qos=arg.qos) + + +if __name__ == '__main__': + try: + main() + except Exception as e: + print(str(e), file=sys.stderr) + sys.exit(1) -- cgit v1.2.3