summaryrefslogtreecommitdiff
path: root/py_include/homekit/audio/amixer.py
blob: 5133c975245023f2d1c2e3a0c61f3896927c025c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import subprocess

from ..config import app_config as config
from threading import Lock
from typing import Union, List


_lock = Lock()
_default_step = 5


def has_control(s: str) -> bool:
    for control in config['amixer']['controls']:
        if control['name'] == s:
            return True
    return False


def get_caps(s: str) -> List[str]:
    for control in config['amixer']['controls']:
        if control['name'] == s:
            return control['caps']
    raise KeyError(f'control {s} not found')


def get_all() -> list:
    controls = []
    for control in config['amixer']['controls']:
        controls.append({
            'name': control['name'],
            'info': get(control['name']),
            'caps': control['caps']
        })
    return controls


def get(control: str):
    return call('get', control)


def mute(control):
    return call('set', control, 'mute')


def unmute(control):
    return call('set', control, 'unmute')


def cap(control):
    return call('set', control, 'cap')


def nocap(control):
    return call('set', control, 'nocap')


def _get_default_step() -> int:
    if 'step' in config['amixer']:
        return int(config['amixer']['step'])

    return _default_step


def incr(control, step=None):
    if step is None:
        step = _get_default_step()
    return call('set', control, f'{step}%+')


def decr(control, step=None):
    if step is None:
        step = _get_default_step()
    return call('set', control, f'{step}%-')


def call(*args, return_code=False) -> Union[int, str]:
    with _lock:
        result = subprocess.run([config['amixer']['bin'], *args],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        if return_code:
            return result.returncode

        if result.returncode != 0:
            raise AmixerError(result.stderr.decode().strip())

        return result.stdout.decode().strip()


class AmixerError(OSError):
    pass