summaryrefslogtreecommitdiff
path: root/tools/mcuota.py
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2022-12-18 06:31:24 +0300
committerEvgeny Zinoviev <me@ch1p.io>2022-12-24 12:57:55 +0300
commit0a065f48be99d4ebae49de622a335f23e50c6ca0 (patch)
treeb591d91fac26e5bf7a4dd6d37178b978061ef060 /tools/mcuota.py
parent022ec129bb8f511a7bf8cf537f165afce2303262 (diff)
pump-mqtt-bot: wip; relayctl: somewhat stable
Diffstat (limited to 'tools/mcuota.py')
-rwxr-xr-xtools/mcuota.py98
1 files changed, 98 insertions, 0 deletions
diff --git a/tools/mcuota.py b/tools/mcuota.py
new file mode 100755
index 0000000..f7f2968
--- /dev/null
+++ b/tools/mcuota.py
@@ -0,0 +1,98 @@
+#!/usr/bin/env python3
+import sys
+import os.path
+sys.path.extend([
+ os.path.realpath(
+ os.path.join(os.path.dirname(os.path.join(__file__)), '..')
+ )
+])
+
+from time import sleep
+from argparse import ArgumentParser
+from src.home.config import config
+from src.home.mqtt import MQTTRelay
+
+
+def guess_filename(product: str, build_target: str):
+ return os.path.join(
+ products_dir,
+ product,
+ '.pio',
+ 'build',
+ build_target,
+ 'firmware.bin'
+ )
+
+
+def relayctl_publish_ota(filename: str,
+ home_id: str,
+ home_secret: str,
+ qos: int):
+ global stop
+
+ def published():
+ global stop
+ stop = True
+
+ mqtt_relay = MQTTRelay(home_id=home_id,
+ secret=home_secret)
+ mqtt_relay.configure_tls()
+ mqtt_relay.connect_and_loop(loop_forever=False)
+ mqtt_relay.push_ota(filename, published, qos)
+ while not stop:
+ sleep(0.1)
+ mqtt_relay.disconnect()
+
+
+stop = False
+products = {
+ 'relayctl': {
+ 'build_target': 'esp12e',
+ 'callback': relayctl_publish_ota
+ }
+}
+
+products_dir = os.path.join(
+ os.path.dirname(__file__),
+ '..',
+ 'platformio'
+)
+
+
+def main():
+ parser = ArgumentParser()
+ parser.add_argument('--filename', type=str)
+ parser.add_argument('--home-id', type=str, required=True)
+ parser.add_argument('--product', type=str, required=True)
+ parser.add_argument('--qos', type=int, default=1)
+
+ config.load('mcuota_push', parser=parser)
+ arg = parser.parse_args()
+
+ if arg.product not in products:
+ raise ValueError(f'invalid product: \'{arg.product}\' not found')
+
+ if arg.home_id not in config['mqtt']['home_secrets']:
+ raise ValueError(f'home_secret for home {arg.home_id} not found in config!')
+
+ filename = arg.filename if arg.filename else guess_filename(arg.product, products[arg.product]['build_target'])
+ if not os.path.exists(filename):
+ raise OSError(f'file \'{filename}\' does not exists')
+
+ print('Please confirm following OTA params.')
+ print('')
+ print(f' Home ID: {arg.home_id}')
+ print(f' Product: {arg.product}')
+ print(f'Firmware file: {filename}')
+ print('')
+ input('Press any key to continue or Ctrl+C to abort.')
+
+ products[arg.product]['callback'](filename, arg.home_id, config['mqtt']['home_secrets'][arg.home_id], qos=arg.qos)
+
+
+if __name__ == '__main__':
+ try:
+ main()
+ except Exception as e:
+ print(str(e), file=sys.stderr)
+ sys.exit(1)