diff options
Diffstat (limited to 'src/home/config/config.py')
-rw-r--r-- | src/home/config/config.py | 387 |
1 files changed, 0 insertions, 387 deletions
diff --git a/src/home/config/config.py b/src/home/config/config.py deleted file mode 100644 index 7344386..0000000 --- a/src/home/config/config.py +++ /dev/null @@ -1,387 +0,0 @@ -import yaml -import logging -import os -import cerberus -import cerberus.errors - -from abc import ABC -from typing import Optional, Any, MutableMapping, Union -from argparse import ArgumentParser -from enum import Enum, auto -from os.path import join, isdir, isfile -from ..util import Addr - - -class MyValidator(cerberus.Validator): - def _normalize_coerce_addr(self, value): - return Addr.fromstring(value) - - -MyValidator.types_mapping['addr'] = cerberus.TypeDefinition('Addr', (Addr,), ()) - - -CONFIG_DIRECTORIES = ( - join(os.environ['HOME'], '.config', 'homekit'), - '/etc/homekit' -) - - -class RootSchemaType(Enum): - DEFAULT = auto() - DICT = auto() - LIST = auto() - - -class BaseConfigUnit(ABC): - _data: MutableMapping[str, Any] - _logger: logging.Logger - - def __init__(self): - self._data = {} - self._logger = logging.getLogger(self.__class__.__name__) - - def __getitem__(self, key): - return self._data[key] - - def __setitem__(self, key, value): - raise NotImplementedError('overwriting config values is prohibited') - - def __contains__(self, key): - return key in self._data - - def load_from(self, path: str): - with open(path, 'r') as fd: - self._data = yaml.safe_load(fd) - - def get(self, - key: Optional[str] = None, - default=None): - if key is None: - return self._data - - cur = self._data - pts = key.split('.') - for i in range(len(pts)): - k = pts[i] - if i < len(pts)-1: - if k not in cur: - raise KeyError(f'key {k} not found') - else: - return cur[k] if k in cur else default - cur = self._data[k] - - raise KeyError(f'option {key} not found') - - -class ConfigUnit(BaseConfigUnit): - NAME = 'dumb' - - def __init__(self, name=None, load=True): - super().__init__() - - self._data = {} - self._logger = logging.getLogger(self.__class__.__name__) - - if self.NAME != 'dumb' and load: - self.load_from(self.get_config_path()) - self.validate() - - elif name is not None: - self.NAME = name - - @classmethod - def get_config_path(cls, name=None) -> str: - if name is None: - name = cls.NAME - if name is None: - raise ValueError('get_config_path: name is none') - - for dirname in CONFIG_DIRECTORIES: - if isdir(dirname): - filename = join(dirname, f'{name}.yaml') - if isfile(filename): - return filename - - raise IOError(f'\'{name}.yaml\' not found') - - @classmethod - def schema(cls) -> Optional[dict]: - return None - - @classmethod - def _addr_schema(cls, required=False, **kwargs): - return { - 'type': 'addr', - 'coerce': Addr.fromstring, - 'required': required, - **kwargs - } - - def validate(self): - schema = self.schema() - if not schema: - self._logger.warning('validate: no schema') - return - - if isinstance(self, AppConfigUnit): - schema['logging'] = { - 'type': 'dict', - 'schema': { - 'logging': {'type': 'boolean'} - } - } - - rst = RootSchemaType.DEFAULT - try: - if schema['type'] == 'dict': - rst = RootSchemaType.DICT - elif schema['type'] == 'list': - rst = RootSchemaType.LIST - elif schema['roottype'] == 'dict': - del schema['roottype'] - rst = RootSchemaType.DICT - except KeyError: - pass - - v = MyValidator() - - if rst == RootSchemaType.DICT: - normalized = v.validated({'document': self._data}, - {'document': { - 'type': 'dict', - 'keysrules': {'type': 'string'}, - 'valuesrules': schema - }})['document'] - elif rst == RootSchemaType.LIST: - v = MyValidator() - normalized = v.validated({'document': self._data}, {'document': schema})['document'] - else: - normalized = v.validated(self._data, schema) - - self._data = normalized - - try: - self.custom_validator(self._data) - except Exception as e: - raise cerberus.DocumentError(f'{self.__class__.__name__}: {str(e)}') - - @staticmethod - def custom_validator(data): - pass - - def get_addr(self, key: str): - return Addr.fromstring(self.get(key)) - - -class AppConfigUnit(ConfigUnit): - _logging_verbose: bool - _logging_fmt: Optional[str] - _logging_file: Optional[str] - - def __init__(self, *args, **kwargs): - super().__init__(load=False, *args, **kwargs) - self._logging_verbose = False - self._logging_fmt = None - self._logging_file = None - - def logging_set_fmt(self, fmt: str) -> None: - self._logging_fmt = fmt - - def logging_get_fmt(self) -> Optional[str]: - try: - return self['logging']['default_fmt'] - except KeyError: - return self._logging_fmt - - def logging_set_file(self, file: str) -> None: - self._logging_file = file - - def logging_get_file(self) -> Optional[str]: - try: - return self['logging']['file'] - except KeyError: - return self._logging_file - - def logging_set_verbose(self): - self._logging_verbose = True - - def logging_is_verbose(self) -> bool: - try: - return bool(self['logging']['verbose']) - except KeyError: - return self._logging_verbose - - -class TranslationUnit(BaseConfigUnit): - pass - - -class Translation: - LANGUAGES = ('en', 'ru') - _langs: dict[str, TranslationUnit] - - def __init__(self, name: str): - super().__init__() - self._langs = {} - for lang in self.LANGUAGES: - for dirname in CONFIG_DIRECTORIES: - if isdir(dirname): - filename = join(dirname, f'i18n-{lang}', f'{name}.yaml') - if lang in self._langs: - raise RuntimeError(f'{name}: translation unit for lang \'{lang}\' already loaded') - self._langs[lang] = TranslationUnit() - self._langs[lang].load_from(filename) - diff = set() - for data in self._langs.values(): - diff ^= data.get().keys() - if len(diff) > 0: - raise RuntimeError(f'{name}: translation units have difference in keys: ' + ', '.join(diff)) - - def get(self, lang: str) -> TranslationUnit: - return self._langs[lang] - - -class Config: - app_name: Optional[str] - app_config: AppConfigUnit - - def __init__(self): - self.app_name = None - self.app_config = AppConfigUnit() - - def load_app(self, - name: Optional[Union[str, AppConfigUnit, bool]] = None, - use_cli=True, - parser: ArgumentParser = None, - no_config=False): - global app_config - - if not no_config \ - and not isinstance(name, str) \ - and not isinstance(name, bool) \ - and issubclass(name, AppConfigUnit) or name == AppConfigUnit: - self.app_name = name.NAME - self.app_config = name() - app_config = self.app_config - else: - self.app_name = name if isinstance(name, str) else None - - if self.app_name is None and not use_cli: - raise RuntimeError('either config name must be none or use_cli must be True') - - no_config = name is False or no_config - path = None - - if use_cli: - if parser is None: - parser = ArgumentParser() - if not no_config: - parser.add_argument('-c', '--config', type=str, required=name is None, - help='Path to the config in TOML or YAML format') - parser.add_argument('-V', '--verbose', action='store_true') - parser.add_argument('--log-file', type=str) - parser.add_argument('--log-default-fmt', action='store_true') - args = parser.parse_args() - - if not no_config and args.config: - path = args.config - - if args.verbose: - self.app_config.logging_set_verbose() - if args.log_file: - self.app_config.logging_set_file(args.log_file) - if args.log_default_fmt: - self.app_config.logging_set_fmt(args.log_default_fmt) - - if not isinstance(name, ConfigUnit): - if not no_config and path is None: - path = ConfigUnit.get_config_path(name=self.app_name) - - if not no_config: - self.app_config.load_from(path) - self.app_config.validate() - - setup_logging(self.app_config.logging_is_verbose(), - self.app_config.logging_get_file(), - self.app_config.logging_get_fmt()) - - if use_cli: - return args - - -config = Config() - - -def is_development_mode() -> bool: - if 'HK_MODE' in os.environ and os.environ['HK_MODE'] == 'dev': - return True - - return ('logging' in config.app_config) and ('verbose' in config.app_config['logging']) and (config.app_config['logging']['verbose'] is True) - - -def setup_logging(verbose=False, log_file=None, default_fmt=None): - logging_level = logging.INFO - if is_development_mode() or verbose: - logging_level = logging.DEBUG - _add_logging_level('TRACE', logging.DEBUG-5) - - log_config = {'level': logging_level} - if not default_fmt: - log_config['format'] = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' - - if log_file is not None: - log_config['filename'] = log_file - log_config['encoding'] = 'utf-8' - - logging.basicConfig(**log_config) - - -# https://stackoverflow.com/questions/2183233/how-to-add-a-custom-loglevel-to-pythons-logging-facility/35804945#35804945 -def _add_logging_level(levelName, levelNum, methodName=None): - """ - Comprehensively adds a new logging level to the `logging` module and the - currently configured logging class. - - `levelName` becomes an attribute of the `logging` module with the value - `levelNum`. `methodName` becomes a convenience method for both `logging` - itself and the class returned by `logging.getLoggerClass()` (usually just - `logging.Logger`). If `methodName` is not specified, `levelName.lower()` is - used. - - To avoid accidental clobberings of existing attributes, this method will - raise an `AttributeError` if the level name is already an attribute of the - `logging` module or if the method name is already present - - Example - ------- - >>> addLoggingLevel('TRACE', logging.DEBUG - 5) - >>> logging.getLogger(__name__).setLevel("TRACE") - >>> logging.getLogger(__name__).trace('that worked') - >>> logging.trace('so did this') - >>> logging.TRACE - 5 - - """ - if not methodName: - methodName = levelName.lower() - - if hasattr(logging, levelName): - raise AttributeError('{} already defined in logging module'.format(levelName)) - if hasattr(logging, methodName): - raise AttributeError('{} already defined in logging module'.format(methodName)) - if hasattr(logging.getLoggerClass(), methodName): - raise AttributeError('{} already defined in logger class'.format(methodName)) - - # This method was inspired by the answers to Stack Overflow post - # http://stackoverflow.com/q/2183233/2988730, especially - # http://stackoverflow.com/a/13638084/2988730 - def logForLevel(self, message, *args, **kwargs): - if self.isEnabledFor(levelNum): - self._log(levelNum, message, args, **kwargs) - def logToRoot(message, *args, **kwargs): - logging.log(levelNum, message, *args, **kwargs) - - logging.addLevelName(levelNum, levelName) - setattr(logging, levelName, levelNum) - setattr(logging.getLoggerClass(), methodName, logForLevel) - setattr(logging, methodName, logToRoot)
\ No newline at end of file |