import yaml import logging import os import cerberus import cerberus.errors import re from abc import ABC from typing import Optional, Any, MutableMapping, Union from argparse import ArgumentParser from enum import Enum, auto from os.path import join, isdir, isfile from ..util import Addr class MyValidator(cerberus.Validator): def _normalize_coerce_addr(self, value): return Addr.fromstring(value) MyValidator.types_mapping['addr'] = cerberus.TypeDefinition('Addr', (Addr,), ()) CONFIG_DIRECTORIES = ( join(os.environ['HOME'], '.config', 'homekit'), '/etc/homekit' ) class RootSchemaType(Enum): DEFAULT = auto() DICT = auto() LIST = auto() class BaseConfigUnit(ABC): _data: MutableMapping[str, Any] _logger: logging.Logger def __init__(self): self._data = {} self._logger = logging.getLogger(self.__class__.__name__) def __iter__(self): return iter(self._data) def __getitem__(self, key): return self._data[key] def __setitem__(self, key, value): raise NotImplementedError('overwriting config values is prohibited') def __contains__(self, key): return key in self._data def load_from(self, path: str): print(f'loading config from {path}') with open(path, 'r') as fd: self._data = yaml.safe_load(fd) if self._data is None: raise TypeError(f'config file {path} is empty') def get(self, key: Optional[str] = None, default=None): if key is None: return self._data cur = self._data pts = str(key).split('.') for i in range(len(pts)): k = pts[i] if i < len(pts)-1: if k not in cur: raise KeyError(f'key {k} not found') else: return cur[k] if k in cur else default cur = self._data[k] raise KeyError(f'option {key} not found') def values(self): return self._data.values() def keys(self): return self._data.keys() def items(self): return self._data.items() class ConfigUnit(BaseConfigUnit): NAME = 'dumb' _instance = None __initialized: bool def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs) cls._instance = None def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super(ConfigUnit, cls).__new__(cls, *args, **kwargs) if cls._instance is not None: cls._instance.__initialized = False return cls._instance def __init__(self, name=None, load=True): if self.__initialized: return super().__init__() self._data = {} self._logger = logging.getLogger(self.__class__.__name__) if self.NAME != 'dumb' and load: self.load_from(self.get_config_path()) self.validate() elif name is not None: self.NAME = name self.__initialized = True @classmethod def get_config_path(cls, name=None) -> str: if name is None: name = cls.NAME if name is None: raise ValueError('get_config_path: name is none') for dirname in CONFIG_DIRECTORIES: if isdir(dirname): filename = join(dirname, f'{name}.yaml') if isfile(filename): return filename raise IOError(f'\'{name}.yaml\' not found') @classmethod def schema(cls) -> Optional[dict]: return None @classmethod def _addr_schema(cls, required=False, mac=False, only_ip=False, **kwargs): def validate_mac_address(field, value, error): if not re.match("[0-9a-fA-F]{2}([-:])[0-9a-fA-F]{2}(\\1[0-9a-fA-F]{2}){4}$", value): error(field, "Invalid MAC address format") if mac: l_kwargs = { 'type': 'string', 'check_with': validate_mac_address } else: l_kwargs = { 'type': 'addr', 'coerce': Addr.fromstring if not only_ip else Addr.fromipstring, } return { 'required': required, **l_kwargs, **kwargs } def validate(self): schema = self.schema() if not schema: self._logger.warning('validate: no schema') return if isinstance(self, AppConfigUnit): schema['logging'] = { 'type': 'dict', 'schema': { 'verbose': {'type': 'boolean'} } } rst = RootSchemaType.DEFAULT try: if schema['type'] == 'dict': rst = RootSchemaType.DICT elif schema['type'] == 'list': rst = RootSchemaType.LIST elif schema['roottype'] == 'dict': del schema['roottype'] rst = RootSchemaType.DICT except KeyError: pass v = MyValidator() need_document = False if rst == RootSchemaType.DICT: normalized = v.validated({'document': self._data}, {'document': { 'type': 'dict', 'keysrules': {'type': 'string'}, 'valuesrules': schema }}) need_document = True elif rst == RootSchemaType.LIST: v = MyValidator() normalized = v.validated({'document': self._data}, {'document': schema}) need_document = True else: normalized = v.validated(self._data, schema) if not normalized: raise cerberus.DocumentError(f'validation failed: {v.errors}') if need_document: normalized = normalized['document'] self._data = normalized try: self.custom_validator(self._data) except Exception as e: raise cerberus.DocumentError(f'{self.__class__.__name__}: {str(e)}') @staticmethod def custom_validator(data): pass def get_addr(self, key: str): return Addr.fromstring(self.get(key)) class AppConfigUnit(ConfigUnit): _logging_verbose: bool _logging_fmt: Optional[str] _logging_file: Optional[str] def __init__(self, *args, **kwargs): super().__init__(load=False, *args, **kwargs) self._logging_verbose = False self._logging_fmt = None self._logging_file = None def logging_set_fmt(self, fmt: str) -> None: self._logging_fmt = fmt def logging_get_fmt(self) -> Optional[str]: try: return self['logging']['default_fmt'] except (KeyError, TypeError): return self._logging_fmt def logging_set_file(self, file: str) -> None: self._logging_file = file def logging_get_file(self) -> Optional[str]: try: return self['logging']['file'] except (KeyError, TypeError): return self._logging_file def logging_set_verbose(self): self._logging_verbose = True def logging_is_verbose(self) -> bool: try: return bool(self['logging']['verbose']) except (KeyError, TypeError): return self._logging_verbose class Language(Enum): RU = 'ru' EN = 'en' def name(self): if self == Language.RU: return 'Русский' elif self == Language.EN: return 'English' class TranslationUnit(BaseConfigUnit): pass TranslationInstances = {} class Translation: DEFAULT_LANGUAGE = Language.RU _langs: dict[Language, TranslationUnit] __initialized: bool # def __init_subclass__(cls, **kwargs): # super().__init_subclass__(**kwargs) # cls._instance = None def __new__(cls, *args, **kwargs): unit = args[0] if unit not in TranslationInstances: TranslationInstances[unit] = super(Translation, cls).__new__(cls) TranslationInstances[unit].__initialized = False return TranslationInstances[unit] def __init__(self, name: str): if self.__initialized: return self._langs = {} for lang in Language: for dirname in CONFIG_DIRECTORIES: if isdir(dirname): filename = join(dirname, f'i18n-{lang.value}', f'{name}.yaml') if lang in self._langs: raise RuntimeError(f'{name}: translation unit for lang \'{lang}\' already loaded') self._langs[lang] = TranslationUnit() self._langs[lang].load_from(filename) diff = set() for data in self._langs.values(): diff ^= data.get().keys() if len(diff) > 0: raise RuntimeError(f'{name}: translation units have difference in keys: ' + ', '.join(diff)) self.__initialized = True def get(self, lang: Language = DEFAULT_LANGUAGE) -> TranslationUnit: return self._langs[lang] class Config: app_name: Optional[str] app_config: AppConfigUnit def __init__(self): self.app_name = None self.app_config = AppConfigUnit() def load_app(self, name: Optional[Union[str, AppConfigUnit, bool]] = None, use_cli=True, parser: ArgumentParser = None, no_config=False): global app_config if not no_config \ and not isinstance(name, str) \ and not isinstance(name, bool) \ and issubclass(name, AppConfigUnit) or name == AppConfigUnit: self.app_name = name.NAME self.app_config = name() app_config = self.app_config else: self.app_name = name if isinstance(name, str) else None if self.app_name is None and not use_cli: raise RuntimeError('either config name must be none or use_cli must be True') no_config = name is False or no_config path = None if use_cli: if parser is None: parser = ArgumentParser() if not no_config: parser.add_argument('-c', '--config', type=str, required=name is None, help='Path to the config in TOML or YAML format') parser.add_argument('-V', '--verbose', action='store_true') parser.add_argument('--log-file', type=str) parser.add_argument('--log-default-fmt', action='store_true') args = parser.parse_args() if not no_config and args.config: path = args.config if args.verbose: self.app_config.logging_set_verbose() if args.log_file: self.app_config.logging_set_file(args.log_file) if args.log_default_fmt: self.app_config.logging_set_fmt(args.log_default_fmt) if not isinstance(name, ConfigUnit): if not no_config and path is None: path = ConfigUnit.get_config_path(name=self.app_name) if not no_config: self.app_config.load_from(path) self.app_config.validate() setup_logging(self.app_config.logging_is_verbose(), self.app_config.logging_get_file(), self.app_config.logging_get_fmt()) if use_cli: return args config = Config() def is_development_mode() -> bool: if 'HK_MODE' in os.environ and os.environ['HK_MODE'] == 'dev': return True return ('logging' in config.app_config) and ('verbose' in config.app_config['logging']) and (config.app_config['logging']['verbose'] is True) def setup_logging(verbose=False, log_file=None, default_fmt=None): logging_level = logging.INFO if is_development_mode() or verbose: logging_level = logging.DEBUG _add_logging_level('TRACE', logging.DEBUG-5) log_config = {'level': logging_level} if not default_fmt: log_config['format'] = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' if log_file is not None: log_config['filename'] = log_file log_config['encoding'] = 'utf-8' logging.basicConfig(**log_config) # https://stackoverflow.com/questions/2183233/how-to-add-a-custom-loglevel-to-pythons-logging-facility/35804945#35804945 def _add_logging_level(levelName, levelNum, methodName=None): """ Comprehensively adds a new logging level to the `logging` module and the currently configured logging class. `levelName` becomes an attribute of the `logging` module with the value `levelNum`. `methodName` becomes a convenience method for both `logging` itself and the class returned by `logging.getLoggerClass()` (usually just `logging.Logger`). If `methodName` is not specified, `levelName.lower()` is used. To avoid accidental clobberings of existing attributes, this method will raise an `AttributeError` if the level name is already an attribute of the `logging` module or if the method name is already present Example ------- >>> addLoggingLevel('TRACE', logging.DEBUG - 5) >>> logging.getLogger(__name__).setLevel("TRACE") >>> logging.getLogger(__name__).trace('that worked') >>> logging.trace('so did this') >>> logging.TRACE 5 """ if not methodName: methodName = levelName.lower() if hasattr(logging, levelName): raise AttributeError('{} already defined in logging module'.format(levelName)) if hasattr(logging, methodName): raise AttributeError('{} already defined in logging module'.format(methodName)) if hasattr(logging.getLoggerClass(), methodName): raise AttributeError('{} already defined in logger class'.format(methodName)) # This method was inspired by the answers to Stack Overflow post # http://stackoverflow.com/q/2183233/2988730, especially # http://stackoverflow.com/a/13638084/2988730 def logForLevel(self, message, *args, **kwargs): if self.isEnabledFor(levelNum): self._log(levelNum, message, args, **kwargs) def logToRoot(message, *args, **kwargs): logging.log(levelNum, message, *args, **kwargs) logging.addLevelName(levelNum, levelName) setattr(logging, levelName, levelNum) setattr(logging.getLoggerClass(), methodName, logForLevel) setattr(logging, methodName, logToRoot)