aboutsummaryrefslogtreecommitdiff
path: root/tools/mcuota.py
blob: 8d47c8ce6d698e07fec887147f43ad886f2ab12e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#!/usr/bin/env python3
import sys
import os.path
sys.path.extend([
    os.path.realpath(
        os.path.join(os.path.dirname(os.path.join(__file__)), '..')
    )
])

from time import sleep
from argparse import ArgumentParser
from src.home.config import config
from src.home.mqtt import MQTTRelay, MQTTRelayDevice


def guess_filename(product: str, build_target: str):
    return os.path.join(
        products_dir,
        product,
        '.pio',
        'build',
        build_target,
        'firmware.bin'
    )


def relayctl_publish_ota(filename: str,
                         home_id: str,
                         home_secret: str,
                         qos: int):
    global stop

    def published():
        global stop
        stop = True

    mqtt_relay = MQTTRelay(devices=MQTTRelayDevice(home_id=home_id, secret=home_secret))
    mqtt_relay.configure_tls()
    mqtt_relay.connect_and_loop(loop_forever=False)
    mqtt_relay.push_ota(home_id, filename, published, qos)
    while not stop:
        sleep(0.1)
    mqtt_relay.disconnect()


stop = False
products = {
    'relayctl': {
        'build_target': 'esp12e',
        'callback': relayctl_publish_ota
    }
}

products_dir = os.path.join(
    os.path.dirname(__file__),
    '..',
    'platformio'
)


def main():
    parser = ArgumentParser()
    parser.add_argument('--filename', type=str)
    parser.add_argument('--home-id', type=str, required=True)
    parser.add_argument('--product', type=str, required=True)
    parser.add_argument('--qos', type=int, default=1)

    config.load('mcuota_push', parser=parser)
    arg = parser.parse_args()

    if arg.product not in products:
        raise ValueError(f'invalid product: \'{arg.product}\' not found')

    if arg.home_id not in config['mqtt']['home_secrets']:
        raise ValueError(f'home_secret for home {arg.home_id} not found in config!')

    filename = arg.filename if arg.filename else guess_filename(arg.product, products[arg.product]['build_target'])
    if not os.path.exists(filename):
        raise OSError(f'file \'{filename}\' does not exists')

    print('Please confirm following OTA params.')
    print('')
    print(f'      Home ID: {arg.home_id}')
    print(f'      Product: {arg.product}')
    print(f'Firmware file: {filename}')
    print('')
    input('Press any key to continue or Ctrl+C to abort.')

    products[arg.product]['callback'](filename, arg.home_id, config['mqtt']['home_secrets'][arg.home_id], qos=arg.qos)


if __name__ == '__main__':
    try:
        main()
    except Exception as e:
        print(str(e), file=sys.stderr)
        sys.exit(1)