diff --git a/config/__init__.py b/config/__init__.py index b11e958..3a8c815 100644 --- a/config/__init__.py +++ b/config/__init__.py @@ -1,370 +1,11 @@ -import appdirs import click -import os -import toml import logging from copy import deepcopy -from typing import Optional, Union, TypedDict, Any, Mapping +from typing import Any, Optional, Union -from constants import DEFAULT_PACKAGE_BRANCH - -CONFIG_DIR = appdirs.user_config_dir('kupfer') -CACHE_DIR = appdirs.user_cache_dir('kupfer') - -CONFIG_DEFAULT_PATH = os.path.join(CONFIG_DIR, 'kupferbootstrap.toml') - - -class Profile(TypedDict, total=False): - parent: str - device: str - flavour: str - pkgs_include: list[str] - pkgs_exclude: list[str] - hostname: str - username: str - password: Optional[str] - size_extra_mb: Union[str, int] - - -PROFILE_DEFAULTS: Profile = { - 'parent': '', - 'device': '', - 'flavour': '', - 'pkgs_include': [], - 'pkgs_exclude': [], - 'hostname': 'kupfer', - 'username': 'kupfer', - 'password': None, - 'size_extra_mb': "0", -} - -PROFILE_EMPTY: Profile = {key: None for key in PROFILE_DEFAULTS.keys()} # type: ignore - -CONFIG_DEFAULTS: dict = { - 'wrapper': { - 'type': 'docker', - }, - 'build': { - 'ccache': True, - 'clean_mode': True, - 'crosscompile': True, - 'crossdirect': True, - 'threads': 0, - }, - 'pkgbuilds': { - 'git_repo': 'https://gitlab.com/kupfer/packages/pkgbuilds.git', - 'git_branch': DEFAULT_PACKAGE_BRANCH, - }, - 'pacman': { - 'parallel_downloads': 4, - 'check_space': True, - 'repo_branch': DEFAULT_PACKAGE_BRANCH, - }, - 'paths': { - 'cache_dir': CACHE_DIR, - 'chroots': os.path.join('%cache_dir%', 'chroots'), - 'pacman': os.path.join('%cache_dir%', 'pacman'), - 'packages': os.path.join('%cache_dir%', 'packages'), - 'pkgbuilds': os.path.join('%cache_dir%', 'pkgbuilds'), - 'jumpdrive': os.path.join('%cache_dir%', 'jumpdrive'), - 'images': os.path.join('%cache_dir%', 'images'), - }, - 'profiles': { - 'current': 'default', - 'default': deepcopy(PROFILE_DEFAULTS), - }, -} -CONFIG_SECTIONS = list(CONFIG_DEFAULTS.keys()) - -CONFIG_RUNTIME_DEFAULTS = { - 'verbose': False, - 'config_file': None, - 'arch': None, - 'no_wrap': False, - 'script_source_dir': os.path.dirname(os.path.realpath(__file__)), - 'error_shell': False, -} - - -def resolve_path_template(path_template: str, paths: dict[str, str]) -> str: - terminator = '%' # i'll be back - result = path_template - for path_name, path in paths.items(): - result = result.replace(terminator + path_name + terminator, path) - return result - - -def resolve_profile( - name: str, - sparse_profiles: dict[str, Profile], - resolved: dict[str, Profile] = None, - _visited=None, -) -> dict[str, Profile]: - """ - Recursively resolves the specified profile by `name` and its parents to merge the config semantically, - applying include and exclude overrides along the hierarchy. - If `resolved` is passed `None`, a fresh dictionary will be created. - `resolved` will be modified in-place during parsing and also returned. - A sanitized `sparse_profiles` dict is assumed, no checking for unknown keys or incorrect data types is performed. - `_visited` should not be passed by users. - """ - if _visited is None: - _visited = list[str]() - if resolved is None: - resolved = dict[str, Profile]() - if name in _visited: - loop = list(_visited) - raise Exception(f'Dependency loop detected in profiles: {" -> ".join(loop+[loop[0]])}') - if name in resolved: - return resolved - - logging.debug(f'Resolving profile {name}') - _visited.append(name) - sparse = sparse_profiles[name] - full = deepcopy(sparse) - if 'parent' in sparse and (parent_name := sparse['parent']): - parent = resolve_profile(name=parent_name, sparse_profiles=sparse_profiles, resolved=resolved, _visited=_visited)[parent_name] - full = parent | sparse - # add up size_extra_mb - if 'size_extra_mb' in sparse: - size = sparse['size_extra_mb'] - if isinstance(size, str) and size.startswith('+'): - full['size_extra_mb'] = int(parent.get('size_extra_mb', 0)) + int(size.lstrip('+')) - else: - full['size_extra_mb'] = int(sparse['size_extra_mb']) - # join our includes with parent's - includes = set(parent.get('pkgs_include', []) + sparse.get('pkgs_include', [])) - if 'pkgs_exclude' in sparse: - includes -= set(sparse['pkgs_exclude']) - full['pkgs_include'] = list(includes) - - # join our includes with parent's - excludes = set(parent.get('pkgs_exclude', []) + sparse.get('pkgs_exclude', [])) - # our includes override parent excludes - if 'pkgs_include' in sparse: - excludes -= set(sparse['pkgs_include']) - full['pkgs_exclude'] = list(excludes) - - # now init missing keys - for key, value in PROFILE_DEFAULTS.items(): - if key not in full.keys(): - full[key] = None # type: ignore[literal-required] - if type(value) == list: - full[key] = [] # type: ignore[literal-required] - - full['size_extra_mb'] = int(full['size_extra_mb'] or 0) - - resolved[name] = full - return resolved - - -def sanitize_config(conf: dict[str, dict], warn_missing_defaultprofile=True) -> dict[str, dict]: - """checks the input config dict for unknown keys and returns only the known parts""" - return merge_configs(conf_new=conf, conf_base={}, warn_missing_defaultprofile=warn_missing_defaultprofile) - - -def merge_configs(conf_new: Mapping[str, dict], conf_base={}, warn_missing_defaultprofile=True) -> dict[str, dict]: - """ - Returns `conf_new` semantically merged into `conf_base`, after validating - `conf_new` keys against `CONFIG_DEFAULTS` and `PROFILE_DEFAULTS`. - Pass `conf_base={}` to get a sanitized version of `conf_new`. - NOTE: `conf_base` is NOT checked for invalid keys. Sanitize beforehand. - """ - parsed = deepcopy(conf_base) - - for outer_name, outer_conf in deepcopy(conf_new).items(): - # only handle known config sections - if outer_name not in CONFIG_DEFAULTS.keys(): - logging.warning(f'Skipped unknown config section "{outer_name}"') - continue - logging.debug(f'Parsing config section "{outer_name}"') - # check if outer_conf is a dict - if not isinstance(outer_conf, dict): - parsed[outer_name] = outer_conf - else: - # init section - if outer_name not in parsed: - parsed[outer_name] = {} - - # profiles need special handling: - # 1. profile names are unknown keys by definition, but we want 'default' to exist - # 2. A profile's subkeys must be compared against PROFILE_DEFAULTS.keys() - if outer_name == 'profiles': - if warn_missing_defaultprofile and 'default' not in outer_conf.keys(): - logging.warning('Default profile is not defined in config file') - - for profile_name, profile_conf in outer_conf.items(): - if not isinstance(profile_conf, dict): - if profile_name == 'current': - parsed[outer_name][profile_name] = profile_conf - else: - logging.warning('Skipped key "{profile_name}" in profile section: only subsections and "current" allowed') - continue - - # init profile - if profile_name not in parsed[outer_name]: - parsed[outer_name][profile_name] = {} - - for key, val in profile_conf.items(): - if key not in PROFILE_DEFAULTS: - logging.warning(f'Skipped unknown config item "{key}" in profile "{profile_name}"') - continue - parsed[outer_name][profile_name][key] = val - - else: - # handle generic inner config dict - for inner_name, inner_conf in outer_conf.items(): - if inner_name not in CONFIG_DEFAULTS[outer_name].keys(): - logging.warning(f'Skipped unknown config item "{inner_name}" in "{outer_name}"') - continue - parsed[outer_name][inner_name] = inner_conf - - return parsed - - -def dump_toml(conf) -> str: - return toml.dumps(conf) - - -def dump_file(file_path: str, config: dict, file_mode: int = 0o600): - - def _opener(path, flags): - return os.open(path, flags, file_mode) - - conf_dir = os.path.dirname(file_path) - if not os.path.exists(conf_dir): - os.makedirs(conf_dir) - old_umask = os.umask(0) - with open(file_path, 'w', opener=_opener) as f: - f.write(dump_toml(conf=config)) - os.umask(old_umask) - - -def parse_file(config_file: str, base: dict = CONFIG_DEFAULTS) -> dict: - """ - Parse the toml contents of `config_file`, validating keys against `CONFIG_DEFAULTS`. - The parsed results are semantically merged into `base` before returning. - `base` itself is NOT checked for invalid keys. - """ - _conf_file = config_file if config_file is not None else CONFIG_DEFAULT_PATH - logging.debug(f'Trying to load config file: {_conf_file}') - loaded_conf = toml.load(_conf_file) - return merge_configs(conf_new=loaded_conf, conf_base=base) - - -class ConfigLoadException(Exception): - inner = None - - def __init__(self, extra_msg='', inner_exception: Exception = None): - msg: list[str] = ['Config load failed!'] - if extra_msg: - msg.append(extra_msg) - if inner_exception: - self.inner = inner_exception - msg.append(str(inner_exception)) - super().__init__(self, ' '.join(msg)) - - -class ConfigLoadState: - load_finished = False - exception = None - - -class ConfigStateHolder: - # config options that are persisted to file - file: dict = {} - # runtime config not persisted anywhere - runtime: dict - file_state: ConfigLoadState - _profile_cache: dict[str, Profile] - - def __init__(self, file_conf_path: Optional[str] = None, runtime_conf={}, file_conf_base: dict = {}): - """init a stateholder, optionally loading `file_conf_path`""" - self.file_state = ConfigLoadState() - self.runtime = CONFIG_RUNTIME_DEFAULTS.copy() - self.runtime.update(runtime_conf) - self.runtime['arch'] = os.uname().machine - self.file.update(file_conf_base) - if file_conf_path: - self.try_load_file(file_conf_path) - - def try_load_file(self, config_file=None, base=CONFIG_DEFAULTS): - config_file = config_file or CONFIG_DEFAULT_PATH - self.runtime['config_file'] = config_file - self._profile_cache = None - try: - self.file = parse_file(config_file=config_file, base=base) - except Exception as ex: - self.file_state.exception = ex - self.file_state.load_finished = True - - def is_loaded(self) -> bool: - "returns True if a file was **sucessfully** loaded" - return self.file_state.load_finished and self.file_state.exception is None - - def enforce_config_loaded(self): - if not self.file_state.load_finished: - raise ConfigLoadException(Exception("Config file wasn't even parsed yet. This is probably a bug in kupferbootstrap :O")) - ex = self.file_state.exception - if ex: - if type(ex) == FileNotFoundError: - ex = Exception("File doesn't exist. Try running `kupferbootstrap config init` first?") - raise ex - - def get_profile(self, name: Optional[str] = None) -> Profile: - name = name or self.file['profiles']['current'] - self._profile_cache = resolve_profile(name=name, sparse_profiles=self.file['profiles'], resolved=self._profile_cache) - return self._profile_cache[name] - - def get_path(self, path_name: str) -> str: - paths = self.file['paths'] - return resolve_path_template(paths[path_name], paths) - - def get_package_dir(self, arch: str): - return os.path.join(self.get_path('packages'), arch) - - def dump(self) -> str: - """dump toml representation of `self.file`""" - return dump_toml(self.file) - - def write(self, path=None): - """write toml representation of `self.file` to `path`""" - if path is None: - path = self.runtime['config_file'] - os.makedirs(os.path.dirname(path), exist_ok=True) - dump_file(path, self.file) - logging.info(f'Created config file at {path}') - - def invalidate_profile_cache(self): - """Clear the profile cache (usually after modification)""" - self._profile_cache = None - - def update(self, config_fragment: dict[str, dict], warn_missing_defaultprofile: bool = True) -> bool: - """Update `self.file` with `config_fragment`. Returns `True` if the config was changed""" - merged = merge_configs(config_fragment, conf_base=self.file, warn_missing_defaultprofile=warn_missing_defaultprofile) - changed = self.file != merged - self.file = merged - if changed and 'profiles' in config_fragment and self.file['profiles'] != config_fragment['profiles']: - self.invalidate_profile_cache() - return changed - - def update_profile(self, name: str, profile: Profile, merge: bool = False, create: bool = True, prune: bool = True): - new = {} - if name not in self.file['profiles']: - if not create: - raise Exception(f'Unknown profile: {name}') - else: - if merge: - new = deepcopy(self.file['profiles'][name]) - - logging.debug(f'new: {new}') - logging.debug(f'profile: {profile}') - new |= profile - - if prune: - new = {key: val for key, val in new.items() if val is not None} - self.file['profiles'][name] = new - self.invalidate_profile_cache() +from .scheme import Profile +from .profile import PROFILE_EMPTY, PROFILE_DEFAULTS +from .state import ConfigStateHolder, CONFIG_DEFAULTS, CONFIG_SECTIONS, merge_configs def list_to_comma_str(str_list: list[str], default='') -> str: @@ -417,11 +58,10 @@ def prompt_config( changed = result != (original_default if field_type == list else default) and (true_or_zero(default) or true_or_zero(result)) if changed and echo_changes: print(f'value changed: "{text}" = "{result}"') - return result, changed -def prompt_profile(name: str, create: bool = True, defaults: Profile = {}) -> tuple[Profile, bool]: +def prompt_profile(name: str, create: bool = True, defaults: Union[Profile, dict] = {}) -> tuple[Profile, bool]: """Prompts the user for every field in `defaults`. Set values to None for an empty profile.""" profile: Any = PROFILE_EMPTY | defaults diff --git a/config/profile.py b/config/profile.py new file mode 100644 index 0000000..431e620 --- /dev/null +++ b/config/profile.py @@ -0,0 +1,83 @@ +import logging + +from copy import deepcopy + +from .scheme import Profile + +PROFILE_DEFAULTS = Profile.fromDict({ + 'parent': '', + 'device': '', + 'flavour': '', + 'pkgs_include': [], + 'pkgs_exclude': [], + 'hostname': 'kupfer', + 'username': 'kupfer', + 'password': None, + 'size_extra_mb': "0", +}) + +PROFILE_EMPTY: Profile = {key: None for key in PROFILE_DEFAULTS.keys()} # type: ignore + + +def resolve_profile( + name: str, + sparse_profiles: dict[str, Profile], + resolved: dict[str, Profile] = None, + _visited=None, +) -> dict[str, Profile]: + """ + Recursively resolves the specified profile by `name` and its parents to merge the config semantically, + applying include and exclude overrides along the hierarchy. + If `resolved` is passed `None`, a fresh dictionary will be created. + `resolved` will be modified in-place during parsing and also returned. + A sanitized `sparse_profiles` dict is assumed, no checking for unknown keys or incorrect data types is performed. + `_visited` should not be passed by users. + """ + if _visited is None: + _visited = list[str]() + if resolved is None: + resolved = dict[str, Profile]() + if name in _visited: + loop = list(_visited) + raise Exception(f'Dependency loop detected in profiles: {" -> ".join(loop+[loop[0]])}') + if name in resolved: + return resolved + + logging.debug(f'Resolving profile {name}') + _visited.append(name) + sparse = sparse_profiles[name] + full = deepcopy(sparse) + if 'parent' in sparse and (parent_name := sparse['parent']): + parent = resolve_profile(name=parent_name, sparse_profiles=sparse_profiles, resolved=resolved, _visited=_visited)[parent_name] + full = parent | sparse + # add up size_extra_mb + if 'size_extra_mb' in sparse: + size = sparse['size_extra_mb'] + if isinstance(size, str) and size.startswith('+'): + full['size_extra_mb'] = int(parent.get('size_extra_mb', 0)) + int(size.lstrip('+')) + else: + full['size_extra_mb'] = int(sparse['size_extra_mb']) + # join our includes with parent's + includes = set(parent.get('pkgs_include', []) + sparse.get('pkgs_include', [])) + if 'pkgs_exclude' in sparse: + includes -= set(sparse['pkgs_exclude']) + full['pkgs_include'] = list(includes) + + # join our includes with parent's + excludes = set(parent.get('pkgs_exclude', []) + sparse.get('pkgs_exclude', [])) + # our includes override parent excludes + if 'pkgs_include' in sparse: + excludes -= set(sparse['pkgs_include']) + full['pkgs_exclude'] = list(excludes) + + # now init missing keys + for key, value in PROFILE_DEFAULTS.items(): + if key not in full.keys(): + full[key] = None # type: ignore[literal-required] + if type(value) == list: + full[key] = [] # type: ignore[literal-required] + + full['size_extra_mb'] = int(full['size_extra_mb'] or 0) + + resolved[name] = full + return resolved diff --git a/config/state.py b/config/state.py new file mode 100644 index 0000000..968fb6c --- /dev/null +++ b/config/state.py @@ -0,0 +1,279 @@ +import appdirs +import logging +import os +import toml +from copy import deepcopy +from typing import Mapping, Optional + +from constants import DEFAULT_PACKAGE_BRANCH + +from .scheme import Profile +from .profile import PROFILE_DEFAULTS, resolve_profile + + +CONFIG_DIR = appdirs.user_config_dir('kupfer') +CACHE_DIR = appdirs.user_cache_dir('kupfer') +CONFIG_DEFAULT_PATH = os.path.join(CONFIG_DIR, 'kupferbootstrap.toml') + + +CONFIG_DEFAULTS: dict = { + 'wrapper': { + 'type': 'docker', + }, + 'build': { + 'ccache': True, + 'clean_mode': True, + 'crosscompile': True, + 'crossdirect': True, + 'threads': 0, + }, + 'pkgbuilds': { + 'git_repo': 'https://gitlab.com/kupfer/packages/pkgbuilds.git', + 'git_branch': DEFAULT_PACKAGE_BRANCH, + }, + 'pacman': { + 'parallel_downloads': 4, + 'check_space': True, + 'repo_branch': DEFAULT_PACKAGE_BRANCH, + }, + 'paths': { + 'cache_dir': CACHE_DIR, + 'chroots': os.path.join('%cache_dir%', 'chroots'), + 'pacman': os.path.join('%cache_dir%', 'pacman'), + 'packages': os.path.join('%cache_dir%', 'packages'), + 'pkgbuilds': os.path.join('%cache_dir%', 'pkgbuilds'), + 'jumpdrive': os.path.join('%cache_dir%', 'jumpdrive'), + 'images': os.path.join('%cache_dir%', 'images'), + }, + 'profiles': { + 'current': 'default', + 'default': deepcopy(PROFILE_DEFAULTS), + }, +} +CONFIG_SECTIONS = list(CONFIG_DEFAULTS.keys()) + +CONFIG_RUNTIME_DEFAULTS = { + 'verbose': False, + 'config_file': None, + 'arch': None, + 'no_wrap': False, + 'script_source_dir': os.path.dirname(os.path.realpath(__file__)), + 'error_shell': False, +} + + +def resolve_path_template(path_template: str, paths: dict[str, str]) -> str: + terminator = '%' # i'll be back + result = path_template + for path_name, path in paths.items(): + result = result.replace(terminator + path_name + terminator, path) + return result + + +def sanitize_config(conf: dict[str, dict], warn_missing_defaultprofile=True) -> dict[str, dict]: + """checks the input config dict for unknown keys and returns only the known parts""" + return merge_configs(conf_new=conf, conf_base={}, warn_missing_defaultprofile=warn_missing_defaultprofile) + + +def merge_configs(conf_new: Mapping[str, dict], conf_base={}, warn_missing_defaultprofile=True) -> dict[str, dict]: + """ + Returns `conf_new` semantically merged into `conf_base`, after validating + `conf_new` keys against `CONFIG_DEFAULTS` and `PROFILE_DEFAULTS`. + Pass `conf_base={}` to get a sanitized version of `conf_new`. + NOTE: `conf_base` is NOT checked for invalid keys. Sanitize beforehand. + """ + parsed = deepcopy(conf_base) + + for outer_name, outer_conf in deepcopy(conf_new).items(): + # only handle known config sections + if outer_name not in CONFIG_DEFAULTS.keys(): + logging.warning(f'Skipped unknown config section "{outer_name}"') + continue + logging.debug(f'Parsing config section "{outer_name}"') + # check if outer_conf is a dict + if not isinstance(outer_conf, dict): + parsed[outer_name] = outer_conf + else: + # init section + if outer_name not in parsed: + parsed[outer_name] = {} + + # profiles need special handling: + # 1. profile names are unknown keys by definition, but we want 'default' to exist + # 2. A profile's subkeys must be compared against PROFILE_DEFAULTS.keys() + if outer_name == 'profiles': + if warn_missing_defaultprofile and 'default' not in outer_conf.keys(): + logging.warning('Default profile is not defined in config file') + + for profile_name, profile_conf in outer_conf.items(): + if not isinstance(profile_conf, dict): + if profile_name == 'current': + parsed[outer_name][profile_name] = profile_conf + else: + logging.warning('Skipped key "{profile_name}" in profile section: only subsections and "current" allowed') + continue + + # init profile + if profile_name not in parsed[outer_name]: + parsed[outer_name][profile_name] = {} + + for key, val in profile_conf.items(): + if key not in PROFILE_DEFAULTS: + logging.warning(f'Skipped unknown config item "{key}" in profile "{profile_name}"') + continue + parsed[outer_name][profile_name][key] = val + + else: + # handle generic inner config dict + for inner_name, inner_conf in outer_conf.items(): + if inner_name not in CONFIG_DEFAULTS[outer_name].keys(): + logging.warning(f'Skipped unknown config item "{inner_name}" in "{outer_name}"') + continue + parsed[outer_name][inner_name] = inner_conf + + return parsed + + +def dump_toml(conf) -> str: + return toml.dumps(conf) + + +def dump_file(file_path: str, config: dict, file_mode: int = 0o600): + + def _opener(path, flags): + return os.open(path, flags, file_mode) + + conf_dir = os.path.dirname(file_path) + if not os.path.exists(conf_dir): + os.makedirs(conf_dir) + old_umask = os.umask(0) + with open(file_path, 'w', opener=_opener) as f: + f.write(dump_toml(conf=config)) + os.umask(old_umask) + + +def parse_file(config_file: str, base: dict = CONFIG_DEFAULTS) -> dict: + """ + Parse the toml contents of `config_file`, validating keys against `CONFIG_DEFAULTS`. + The parsed results are semantically merged into `base` before returning. + `base` itself is NOT checked for invalid keys. + """ + _conf_file = config_file if config_file is not None else CONFIG_DEFAULT_PATH + logging.debug(f'Trying to load config file: {_conf_file}') + loaded_conf = toml.load(_conf_file) + return merge_configs(conf_new=loaded_conf, conf_base=base) + + +class ConfigLoadException(Exception): + inner = None + + def __init__(self, extra_msg='', inner_exception: Exception = None): + msg: list[str] = ['Config load failed!'] + if extra_msg: + msg.append(extra_msg) + if inner_exception: + self.inner = inner_exception + msg.append(str(inner_exception)) + super().__init__(self, ' '.join(msg)) + + +class ConfigLoadState: + load_finished = False + exception = None + + +class ConfigStateHolder: + # config options that are persisted to file + file: dict = {} + # runtime config not persisted anywhere + runtime: dict + file_state: ConfigLoadState + _profile_cache: dict[str, Profile] + + def __init__(self, file_conf_path: Optional[str] = None, runtime_conf={}, file_conf_base: dict = {}): + """init a stateholder, optionally loading `file_conf_path`""" + self.file_state = ConfigLoadState() + self.runtime = CONFIG_RUNTIME_DEFAULTS.copy() + self.runtime.update(runtime_conf) + self.runtime['arch'] = os.uname().machine + self.file.update(file_conf_base) + if file_conf_path: + self.try_load_file(file_conf_path) + + def try_load_file(self, config_file=None, base=CONFIG_DEFAULTS): + config_file = config_file or CONFIG_DEFAULT_PATH + self.runtime['config_file'] = config_file + self._profile_cache = None + try: + self.file = parse_file(config_file=config_file, base=base) + except Exception as ex: + self.file_state.exception = ex + self.file_state.load_finished = True + + def is_loaded(self) -> bool: + "returns True if a file was **sucessfully** loaded" + return self.file_state.load_finished and self.file_state.exception is None + + def enforce_config_loaded(self): + if not self.file_state.load_finished: + raise ConfigLoadException(Exception("Config file wasn't even parsed yet. This is probably a bug in kupferbootstrap :O")) + ex = self.file_state.exception + if ex: + if type(ex) == FileNotFoundError: + ex = Exception("File doesn't exist. Try running `kupferbootstrap config init` first?") + raise ex + + def get_profile(self, name: Optional[str] = None) -> Profile: + name = name or self.file['profiles']['current'] + self._profile_cache = resolve_profile(name=name, sparse_profiles=self.file['profiles'], resolved=self._profile_cache) + return self._profile_cache[name] + + def get_path(self, path_name: str) -> str: + paths = self.file['paths'] + return resolve_path_template(paths[path_name], paths) + + def get_package_dir(self, arch: str): + return os.path.join(self.get_path('packages'), arch) + + def dump(self) -> str: + """dump toml representation of `self.file`""" + return dump_toml(self.file) + + def write(self, path=None): + """write toml representation of `self.file` to `path`""" + if path is None: + path = self.runtime['config_file'] + os.makedirs(os.path.dirname(path), exist_ok=True) + dump_file(path, self.file) + logging.info(f'Created config file at {path}') + + def invalidate_profile_cache(self): + """Clear the profile cache (usually after modification)""" + self._profile_cache = None + + def update(self, config_fragment: dict[str, dict], warn_missing_defaultprofile: bool = True) -> bool: + """Update `self.file` with `config_fragment`. Returns `True` if the config was changed""" + merged = merge_configs(config_fragment, conf_base=self.file, warn_missing_defaultprofile=warn_missing_defaultprofile) + changed = self.file != merged + self.file = merged + if changed and 'profiles' in config_fragment and self.file['profiles'] != config_fragment['profiles']: + self.invalidate_profile_cache() + return changed + + def update_profile(self, name: str, profile: Profile, merge: bool = False, create: bool = True, prune: bool = True): + new = {} + if name not in self.file['profiles']: + if not create: + raise Exception(f'Unknown profile: {name}') + else: + if merge: + new = deepcopy(self.file['profiles'][name]) + + logging.debug(f'new: {new}') + logging.debug(f'profile: {profile}') + new |= profile + + if prune: + new = {key: val for key, val in new.items() if val is not None} + self.file['profiles'][name] = new + self.invalidate_profile_cache() diff --git a/config/test_config.py b/config/test_config.py index 5ec0420..aecb3dc 100644 --- a/config/test_config.py +++ b/config/test_config.py @@ -5,8 +5,9 @@ from tempfile import mktemp, gettempdir as get_system_tempdir import toml from typing import Optional -from config import CONFIG_DEFAULTS, ConfigStateHolder, PROFILE_DEFAULTS +from config.profile import PROFILE_DEFAULTS from config.scheme import Config, Profile +from config.state import CONFIG_DEFAULTS, ConfigStateHolder def get_filename(): diff --git a/wrapper/wrapper.py b/wrapper/wrapper.py index 714e19f..4dcc6b3 100644 --- a/wrapper/wrapper.py +++ b/wrapper/wrapper.py @@ -5,7 +5,8 @@ import pathlib from typing import Protocol -from config import config, dump_file as dump_config_file +from config import config +from config.state import dump_file as dump_config_file from constants import CHROOT_PATHS