summaryrefslogtreecommitdiff
path: root/bin/sound_node.py
blob: 90e6997279e6f1899eef1f11d3e434b0dbfddcf9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#!/usr/bin/env python3
import os
import __py_include

from typing import Optional

from homekit.config import config
from homekit.audio import amixer
from homekit.media import MediaNodeServer, SoundRecordStorage, SoundRecorder
from homekit import http


# This script must be run as root because it runs arecord.
# Implements an HTTP API for amixer and arecord.
# -------------------------------------------

def _amixer_control_response(control):
    """Build the success response describing one amixer control.

    Calls ``amixer.get(control)`` and then ``amixer.get_caps(control)`` and
    hands both results to ``http.ok`` in a dict with the keys ``'caps'`` and
    ``'info'``.

    :param control: name of the control; passed to ``amixer`` unchanged
        (callers in this file validate it with ``amixer.has_control`` first).
    :return: whatever ``http.ok`` returns for that dict.
    """
    # Same query order as before: current state first, capabilities second.
    control_info = amixer.get(control)
    control_caps = amixer.get_caps(control)

    payload = dict(caps=control_caps, info=control_info)
    return http.ok(payload)


class SoundNodeServer(MediaNodeServer):
    """``MediaNodeServer`` subclass that adds GET endpoints backed by ``amixer``.

    Endpoints registered in ``__init__`` (in addition to whatever the base
    class sets up):

    * ``/amixer/get-all/`` -- result of ``amixer.get_all()``.
    * ``/amixer/get/{control}/`` -- caps and info of a single control.
    * ``/amixer/{op}/{control}/`` where the pattern lists ``mute``,
      ``unmute``, ``cap`` and ``nocap`` -- calls ``amixer.<op>(control)``.
    * ``/amixer/{op}/{control}/`` where the pattern lists ``incr`` and
      ``decr`` -- calls ``amixer.<op>(control, step=...)``.

    Handlers report bad input by raising ``ValueError``; how that is turned
    into an HTTP response is not visible in this file.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the base server, then register the amixer routes.

        All positional and keyword arguments are forwarded to
        ``MediaNodeServer.__init__`` untouched.
        """
        super().__init__(*args, **kwargs)

        # Registered in this exact order via the inherited ``get`` method.
        amixer_routes = (
            ('/amixer/get-all/', self.amixer_get_all),
            ('/amixer/get/{control}/', self.amixer_get),
            ('/amixer/{op:mute|unmute|cap|nocap}/{control}/', self.amixer_set),
            ('/amixer/{op:incr|decr}/{control}/', self.amixer_volume),
        )
        for path, handler in amixer_routes:
            self.get(path, handler)

    @staticmethod
    def _require_control(request: http.Request):
        """Return the ``control`` path parameter after validating it.

        :raises ValueError: if ``amixer.has_control`` rejects the name.
        """
        name = request.match_info['control']
        if amixer.has_control(name):
            return name
        raise ValueError(f'invalid control: {name}')

    async def amixer_get_all(self, request: http.Request):
        """Respond with the data returned by ``amixer.get_all()``."""
        return self.ok(amixer.get_all())

    async def amixer_get(self, request: http.Request):
        """Respond with caps and info of the control named in the URL.

        :raises ValueError: if the control is not known to ``amixer``.
        """
        return _amixer_control_response(self._require_control(request))

    async def amixer_set(self, request: http.Request):
        """Apply the ``op`` from the URL to the control, then report its state.

        ``op`` is looked up as an attribute of the ``amixer`` module and
        called with the control name as its only argument.

        :raises ValueError: if the control is not known to ``amixer``.
        """
        action_name = request.match_info['op']
        target = self._require_control(request)

        # The route pattern only lists mute/unmute/cap/nocap for ``op``.
        getattr(amixer, action_name)(target)

        return _amixer_control_response(target)

    async def amixer_volume(self, request: http.Request):
        """Apply the ``op`` from the URL with an optional step, then report.

        The optional ``step`` query parameter is converted with ``int`` and
        must lie within 1..50 inclusive; when it is absent, ``step=None`` is
        passed to the ``amixer`` function.

        :raises ValueError: if the control is not known to ``amixer``, if
            ``step`` is not an integer literal, or if it is out of range.
        """
        action_name = request.match_info['op']
        target = self._require_control(request)

        # Resolve the amixer function before looking at the query string,
        # matching the original evaluation order.
        adjust = getattr(amixer, action_name)

        step: Optional[int] = None
        if 'step' in request.query:
            step = int(request.query['step'])
            if step < 1 or step > 50:
                raise ValueError('invalid step value')

        adjust(target, step=step)

        return _amixer_control_response(target)


if __name__ == '__main__':
    # Being root is a property of the effective *user* id. The previous check
    # looked at the effective group id, which let a non-root user whose group
    # is 0 through and could reject root running with a different group.
    if os.geteuid() != 0:
        raise RuntimeError("Must be run as root.")

    config.load_app('sound_node')

    # Storage is configured through the node.storage config entry.
    storage = SoundRecordStorage(config['node']['storage'])

    # The recorder is started before the HTTP server; start_thread()
    # presumably launches it in the background (defined in homekit.media).
    recorder = SoundRecorder(storage=storage)
    recorder.start_thread()

    # The listen address is taken from the node.listen config entry.
    server = SoundNodeServer(recorder=recorder,
                             storage=storage,
                             addr=config.get_addr('node.listen'))
    server.run()