config: split up into state.py and profile.py, fixup tests
This commit is contained in:
parent
16fd2f1590
commit
7eefafc386
5 changed files with 371 additions and 367 deletions
|
@ -1,370 +1,11 @@
|
|||
import appdirs
|
||||
import click
|
||||
import os
|
||||
import toml
|
||||
import logging
|
||||
from copy import deepcopy
|
||||
from typing import Optional, Union, TypedDict, Any, Mapping
|
||||
from typing import Any, Optional, Union
|
||||
|
||||
from constants import DEFAULT_PACKAGE_BRANCH
|
||||
|
||||
CONFIG_DIR = appdirs.user_config_dir('kupfer')
|
||||
CACHE_DIR = appdirs.user_cache_dir('kupfer')
|
||||
|
||||
CONFIG_DEFAULT_PATH = os.path.join(CONFIG_DIR, 'kupferbootstrap.toml')
|
||||
|
||||
|
||||
class Profile(TypedDict, total=False):
|
||||
parent: str
|
||||
device: str
|
||||
flavour: str
|
||||
pkgs_include: list[str]
|
||||
pkgs_exclude: list[str]
|
||||
hostname: str
|
||||
username: str
|
||||
password: Optional[str]
|
||||
size_extra_mb: Union[str, int]
|
||||
|
||||
|
||||
PROFILE_DEFAULTS: Profile = {
|
||||
'parent': '',
|
||||
'device': '',
|
||||
'flavour': '',
|
||||
'pkgs_include': [],
|
||||
'pkgs_exclude': [],
|
||||
'hostname': 'kupfer',
|
||||
'username': 'kupfer',
|
||||
'password': None,
|
||||
'size_extra_mb': "0",
|
||||
}
|
||||
|
||||
PROFILE_EMPTY: Profile = {key: None for key in PROFILE_DEFAULTS.keys()} # type: ignore
|
||||
|
||||
CONFIG_DEFAULTS: dict = {
|
||||
'wrapper': {
|
||||
'type': 'docker',
|
||||
},
|
||||
'build': {
|
||||
'ccache': True,
|
||||
'clean_mode': True,
|
||||
'crosscompile': True,
|
||||
'crossdirect': True,
|
||||
'threads': 0,
|
||||
},
|
||||
'pkgbuilds': {
|
||||
'git_repo': 'https://gitlab.com/kupfer/packages/pkgbuilds.git',
|
||||
'git_branch': DEFAULT_PACKAGE_BRANCH,
|
||||
},
|
||||
'pacman': {
|
||||
'parallel_downloads': 4,
|
||||
'check_space': True,
|
||||
'repo_branch': DEFAULT_PACKAGE_BRANCH,
|
||||
},
|
||||
'paths': {
|
||||
'cache_dir': CACHE_DIR,
|
||||
'chroots': os.path.join('%cache_dir%', 'chroots'),
|
||||
'pacman': os.path.join('%cache_dir%', 'pacman'),
|
||||
'packages': os.path.join('%cache_dir%', 'packages'),
|
||||
'pkgbuilds': os.path.join('%cache_dir%', 'pkgbuilds'),
|
||||
'jumpdrive': os.path.join('%cache_dir%', 'jumpdrive'),
|
||||
'images': os.path.join('%cache_dir%', 'images'),
|
||||
},
|
||||
'profiles': {
|
||||
'current': 'default',
|
||||
'default': deepcopy(PROFILE_DEFAULTS),
|
||||
},
|
||||
}
|
||||
CONFIG_SECTIONS = list(CONFIG_DEFAULTS.keys())
|
||||
|
||||
CONFIG_RUNTIME_DEFAULTS = {
|
||||
'verbose': False,
|
||||
'config_file': None,
|
||||
'arch': None,
|
||||
'no_wrap': False,
|
||||
'script_source_dir': os.path.dirname(os.path.realpath(__file__)),
|
||||
'error_shell': False,
|
||||
}
|
||||
|
||||
|
||||
def resolve_path_template(path_template: str, paths: dict[str, str]) -> str:
|
||||
terminator = '%' # i'll be back
|
||||
result = path_template
|
||||
for path_name, path in paths.items():
|
||||
result = result.replace(terminator + path_name + terminator, path)
|
||||
return result
|
||||
|
||||
|
||||
def resolve_profile(
|
||||
name: str,
|
||||
sparse_profiles: dict[str, Profile],
|
||||
resolved: dict[str, Profile] = None,
|
||||
_visited=None,
|
||||
) -> dict[str, Profile]:
|
||||
"""
|
||||
Recursively resolves the specified profile by `name` and its parents to merge the config semantically,
|
||||
applying include and exclude overrides along the hierarchy.
|
||||
If `resolved` is passed `None`, a fresh dictionary will be created.
|
||||
`resolved` will be modified in-place during parsing and also returned.
|
||||
A sanitized `sparse_profiles` dict is assumed, no checking for unknown keys or incorrect data types is performed.
|
||||
`_visited` should not be passed by users.
|
||||
"""
|
||||
if _visited is None:
|
||||
_visited = list[str]()
|
||||
if resolved is None:
|
||||
resolved = dict[str, Profile]()
|
||||
if name in _visited:
|
||||
loop = list(_visited)
|
||||
raise Exception(f'Dependency loop detected in profiles: {" -> ".join(loop+[loop[0]])}')
|
||||
if name in resolved:
|
||||
return resolved
|
||||
|
||||
logging.debug(f'Resolving profile {name}')
|
||||
_visited.append(name)
|
||||
sparse = sparse_profiles[name]
|
||||
full = deepcopy(sparse)
|
||||
if 'parent' in sparse and (parent_name := sparse['parent']):
|
||||
parent = resolve_profile(name=parent_name, sparse_profiles=sparse_profiles, resolved=resolved, _visited=_visited)[parent_name]
|
||||
full = parent | sparse
|
||||
# add up size_extra_mb
|
||||
if 'size_extra_mb' in sparse:
|
||||
size = sparse['size_extra_mb']
|
||||
if isinstance(size, str) and size.startswith('+'):
|
||||
full['size_extra_mb'] = int(parent.get('size_extra_mb', 0)) + int(size.lstrip('+'))
|
||||
else:
|
||||
full['size_extra_mb'] = int(sparse['size_extra_mb'])
|
||||
# join our includes with parent's
|
||||
includes = set(parent.get('pkgs_include', []) + sparse.get('pkgs_include', []))
|
||||
if 'pkgs_exclude' in sparse:
|
||||
includes -= set(sparse['pkgs_exclude'])
|
||||
full['pkgs_include'] = list(includes)
|
||||
|
||||
# join our includes with parent's
|
||||
excludes = set(parent.get('pkgs_exclude', []) + sparse.get('pkgs_exclude', []))
|
||||
# our includes override parent excludes
|
||||
if 'pkgs_include' in sparse:
|
||||
excludes -= set(sparse['pkgs_include'])
|
||||
full['pkgs_exclude'] = list(excludes)
|
||||
|
||||
# now init missing keys
|
||||
for key, value in PROFILE_DEFAULTS.items():
|
||||
if key not in full.keys():
|
||||
full[key] = None # type: ignore[literal-required]
|
||||
if type(value) == list:
|
||||
full[key] = [] # type: ignore[literal-required]
|
||||
|
||||
full['size_extra_mb'] = int(full['size_extra_mb'] or 0)
|
||||
|
||||
resolved[name] = full
|
||||
return resolved
|
||||
|
||||
|
||||
def sanitize_config(conf: dict[str, dict], warn_missing_defaultprofile=True) -> dict[str, dict]:
|
||||
"""checks the input config dict for unknown keys and returns only the known parts"""
|
||||
return merge_configs(conf_new=conf, conf_base={}, warn_missing_defaultprofile=warn_missing_defaultprofile)
|
||||
|
||||
|
||||
def merge_configs(conf_new: Mapping[str, dict], conf_base={}, warn_missing_defaultprofile=True) -> dict[str, dict]:
|
||||
"""
|
||||
Returns `conf_new` semantically merged into `conf_base`, after validating
|
||||
`conf_new` keys against `CONFIG_DEFAULTS` and `PROFILE_DEFAULTS`.
|
||||
Pass `conf_base={}` to get a sanitized version of `conf_new`.
|
||||
NOTE: `conf_base` is NOT checked for invalid keys. Sanitize beforehand.
|
||||
"""
|
||||
parsed = deepcopy(conf_base)
|
||||
|
||||
for outer_name, outer_conf in deepcopy(conf_new).items():
|
||||
# only handle known config sections
|
||||
if outer_name not in CONFIG_DEFAULTS.keys():
|
||||
logging.warning(f'Skipped unknown config section "{outer_name}"')
|
||||
continue
|
||||
logging.debug(f'Parsing config section "{outer_name}"')
|
||||
# check if outer_conf is a dict
|
||||
if not isinstance(outer_conf, dict):
|
||||
parsed[outer_name] = outer_conf
|
||||
else:
|
||||
# init section
|
||||
if outer_name not in parsed:
|
||||
parsed[outer_name] = {}
|
||||
|
||||
# profiles need special handling:
|
||||
# 1. profile names are unknown keys by definition, but we want 'default' to exist
|
||||
# 2. A profile's subkeys must be compared against PROFILE_DEFAULTS.keys()
|
||||
if outer_name == 'profiles':
|
||||
if warn_missing_defaultprofile and 'default' not in outer_conf.keys():
|
||||
logging.warning('Default profile is not defined in config file')
|
||||
|
||||
for profile_name, profile_conf in outer_conf.items():
|
||||
if not isinstance(profile_conf, dict):
|
||||
if profile_name == 'current':
|
||||
parsed[outer_name][profile_name] = profile_conf
|
||||
else:
|
||||
logging.warning('Skipped key "{profile_name}" in profile section: only subsections and "current" allowed')
|
||||
continue
|
||||
|
||||
# init profile
|
||||
if profile_name not in parsed[outer_name]:
|
||||
parsed[outer_name][profile_name] = {}
|
||||
|
||||
for key, val in profile_conf.items():
|
||||
if key not in PROFILE_DEFAULTS:
|
||||
logging.warning(f'Skipped unknown config item "{key}" in profile "{profile_name}"')
|
||||
continue
|
||||
parsed[outer_name][profile_name][key] = val
|
||||
|
||||
else:
|
||||
# handle generic inner config dict
|
||||
for inner_name, inner_conf in outer_conf.items():
|
||||
if inner_name not in CONFIG_DEFAULTS[outer_name].keys():
|
||||
logging.warning(f'Skipped unknown config item "{inner_name}" in "{outer_name}"')
|
||||
continue
|
||||
parsed[outer_name][inner_name] = inner_conf
|
||||
|
||||
return parsed
|
||||
|
||||
|
||||
def dump_toml(conf) -> str:
|
||||
return toml.dumps(conf)
|
||||
|
||||
|
||||
def dump_file(file_path: str, config: dict, file_mode: int = 0o600):
|
||||
|
||||
def _opener(path, flags):
|
||||
return os.open(path, flags, file_mode)
|
||||
|
||||
conf_dir = os.path.dirname(file_path)
|
||||
if not os.path.exists(conf_dir):
|
||||
os.makedirs(conf_dir)
|
||||
old_umask = os.umask(0)
|
||||
with open(file_path, 'w', opener=_opener) as f:
|
||||
f.write(dump_toml(conf=config))
|
||||
os.umask(old_umask)
|
||||
|
||||
|
||||
def parse_file(config_file: str, base: dict = CONFIG_DEFAULTS) -> dict:
|
||||
"""
|
||||
Parse the toml contents of `config_file`, validating keys against `CONFIG_DEFAULTS`.
|
||||
The parsed results are semantically merged into `base` before returning.
|
||||
`base` itself is NOT checked for invalid keys.
|
||||
"""
|
||||
_conf_file = config_file if config_file is not None else CONFIG_DEFAULT_PATH
|
||||
logging.debug(f'Trying to load config file: {_conf_file}')
|
||||
loaded_conf = toml.load(_conf_file)
|
||||
return merge_configs(conf_new=loaded_conf, conf_base=base)
|
||||
|
||||
|
||||
class ConfigLoadException(Exception):
|
||||
inner = None
|
||||
|
||||
def __init__(self, extra_msg='', inner_exception: Exception = None):
|
||||
msg: list[str] = ['Config load failed!']
|
||||
if extra_msg:
|
||||
msg.append(extra_msg)
|
||||
if inner_exception:
|
||||
self.inner = inner_exception
|
||||
msg.append(str(inner_exception))
|
||||
super().__init__(self, ' '.join(msg))
|
||||
|
||||
|
||||
class ConfigLoadState:
|
||||
load_finished = False
|
||||
exception = None
|
||||
|
||||
|
||||
class ConfigStateHolder:
|
||||
# config options that are persisted to file
|
||||
file: dict = {}
|
||||
# runtime config not persisted anywhere
|
||||
runtime: dict
|
||||
file_state: ConfigLoadState
|
||||
_profile_cache: dict[str, Profile]
|
||||
|
||||
def __init__(self, file_conf_path: Optional[str] = None, runtime_conf={}, file_conf_base: dict = {}):
|
||||
"""init a stateholder, optionally loading `file_conf_path`"""
|
||||
self.file_state = ConfigLoadState()
|
||||
self.runtime = CONFIG_RUNTIME_DEFAULTS.copy()
|
||||
self.runtime.update(runtime_conf)
|
||||
self.runtime['arch'] = os.uname().machine
|
||||
self.file.update(file_conf_base)
|
||||
if file_conf_path:
|
||||
self.try_load_file(file_conf_path)
|
||||
|
||||
def try_load_file(self, config_file=None, base=CONFIG_DEFAULTS):
|
||||
config_file = config_file or CONFIG_DEFAULT_PATH
|
||||
self.runtime['config_file'] = config_file
|
||||
self._profile_cache = None
|
||||
try:
|
||||
self.file = parse_file(config_file=config_file, base=base)
|
||||
except Exception as ex:
|
||||
self.file_state.exception = ex
|
||||
self.file_state.load_finished = True
|
||||
|
||||
def is_loaded(self) -> bool:
|
||||
"returns True if a file was **sucessfully** loaded"
|
||||
return self.file_state.load_finished and self.file_state.exception is None
|
||||
|
||||
def enforce_config_loaded(self):
|
||||
if not self.file_state.load_finished:
|
||||
raise ConfigLoadException(Exception("Config file wasn't even parsed yet. This is probably a bug in kupferbootstrap :O"))
|
||||
ex = self.file_state.exception
|
||||
if ex:
|
||||
if type(ex) == FileNotFoundError:
|
||||
ex = Exception("File doesn't exist. Try running `kupferbootstrap config init` first?")
|
||||
raise ex
|
||||
|
||||
def get_profile(self, name: Optional[str] = None) -> Profile:
|
||||
name = name or self.file['profiles']['current']
|
||||
self._profile_cache = resolve_profile(name=name, sparse_profiles=self.file['profiles'], resolved=self._profile_cache)
|
||||
return self._profile_cache[name]
|
||||
|
||||
def get_path(self, path_name: str) -> str:
|
||||
paths = self.file['paths']
|
||||
return resolve_path_template(paths[path_name], paths)
|
||||
|
||||
def get_package_dir(self, arch: str):
|
||||
return os.path.join(self.get_path('packages'), arch)
|
||||
|
||||
def dump(self) -> str:
|
||||
"""dump toml representation of `self.file`"""
|
||||
return dump_toml(self.file)
|
||||
|
||||
def write(self, path=None):
|
||||
"""write toml representation of `self.file` to `path`"""
|
||||
if path is None:
|
||||
path = self.runtime['config_file']
|
||||
os.makedirs(os.path.dirname(path), exist_ok=True)
|
||||
dump_file(path, self.file)
|
||||
logging.info(f'Created config file at {path}')
|
||||
|
||||
def invalidate_profile_cache(self):
|
||||
"""Clear the profile cache (usually after modification)"""
|
||||
self._profile_cache = None
|
||||
|
||||
def update(self, config_fragment: dict[str, dict], warn_missing_defaultprofile: bool = True) -> bool:
|
||||
"""Update `self.file` with `config_fragment`. Returns `True` if the config was changed"""
|
||||
merged = merge_configs(config_fragment, conf_base=self.file, warn_missing_defaultprofile=warn_missing_defaultprofile)
|
||||
changed = self.file != merged
|
||||
self.file = merged
|
||||
if changed and 'profiles' in config_fragment and self.file['profiles'] != config_fragment['profiles']:
|
||||
self.invalidate_profile_cache()
|
||||
return changed
|
||||
|
||||
def update_profile(self, name: str, profile: Profile, merge: bool = False, create: bool = True, prune: bool = True):
|
||||
new = {}
|
||||
if name not in self.file['profiles']:
|
||||
if not create:
|
||||
raise Exception(f'Unknown profile: {name}')
|
||||
else:
|
||||
if merge:
|
||||
new = deepcopy(self.file['profiles'][name])
|
||||
|
||||
logging.debug(f'new: {new}')
|
||||
logging.debug(f'profile: {profile}')
|
||||
new |= profile
|
||||
|
||||
if prune:
|
||||
new = {key: val for key, val in new.items() if val is not None}
|
||||
self.file['profiles'][name] = new
|
||||
self.invalidate_profile_cache()
|
||||
from .scheme import Profile
|
||||
from .profile import PROFILE_EMPTY, PROFILE_DEFAULTS
|
||||
from .state import ConfigStateHolder, CONFIG_DEFAULTS, CONFIG_SECTIONS, merge_configs
|
||||
|
||||
|
||||
def list_to_comma_str(str_list: list[str], default='') -> str:
|
||||
|
@ -417,11 +58,10 @@ def prompt_config(
|
|||
changed = result != (original_default if field_type == list else default) and (true_or_zero(default) or true_or_zero(result))
|
||||
if changed and echo_changes:
|
||||
print(f'value changed: "{text}" = "{result}"')
|
||||
|
||||
return result, changed
|
||||
|
||||
|
||||
def prompt_profile(name: str, create: bool = True, defaults: Profile = {}) -> tuple[Profile, bool]:
|
||||
def prompt_profile(name: str, create: bool = True, defaults: Union[Profile, dict] = {}) -> tuple[Profile, bool]:
|
||||
"""Prompts the user for every field in `defaults`. Set values to None for an empty profile."""
|
||||
|
||||
profile: Any = PROFILE_EMPTY | defaults
|
||||
|
|
83
config/profile.py
Normal file
83
config/profile.py
Normal file
|
@ -0,0 +1,83 @@
|
|||
import logging
|
||||
|
||||
from copy import deepcopy
|
||||
|
||||
from .scheme import Profile
|
||||
|
||||
PROFILE_DEFAULTS = Profile.fromDict({
|
||||
'parent': '',
|
||||
'device': '',
|
||||
'flavour': '',
|
||||
'pkgs_include': [],
|
||||
'pkgs_exclude': [],
|
||||
'hostname': 'kupfer',
|
||||
'username': 'kupfer',
|
||||
'password': None,
|
||||
'size_extra_mb': "0",
|
||||
})
|
||||
|
||||
PROFILE_EMPTY: Profile = {key: None for key in PROFILE_DEFAULTS.keys()} # type: ignore
|
||||
|
||||
|
||||
def resolve_profile(
|
||||
name: str,
|
||||
sparse_profiles: dict[str, Profile],
|
||||
resolved: dict[str, Profile] = None,
|
||||
_visited=None,
|
||||
) -> dict[str, Profile]:
|
||||
"""
|
||||
Recursively resolves the specified profile by `name` and its parents to merge the config semantically,
|
||||
applying include and exclude overrides along the hierarchy.
|
||||
If `resolved` is passed `None`, a fresh dictionary will be created.
|
||||
`resolved` will be modified in-place during parsing and also returned.
|
||||
A sanitized `sparse_profiles` dict is assumed, no checking for unknown keys or incorrect data types is performed.
|
||||
`_visited` should not be passed by users.
|
||||
"""
|
||||
if _visited is None:
|
||||
_visited = list[str]()
|
||||
if resolved is None:
|
||||
resolved = dict[str, Profile]()
|
||||
if name in _visited:
|
||||
loop = list(_visited)
|
||||
raise Exception(f'Dependency loop detected in profiles: {" -> ".join(loop+[loop[0]])}')
|
||||
if name in resolved:
|
||||
return resolved
|
||||
|
||||
logging.debug(f'Resolving profile {name}')
|
||||
_visited.append(name)
|
||||
sparse = sparse_profiles[name]
|
||||
full = deepcopy(sparse)
|
||||
if 'parent' in sparse and (parent_name := sparse['parent']):
|
||||
parent = resolve_profile(name=parent_name, sparse_profiles=sparse_profiles, resolved=resolved, _visited=_visited)[parent_name]
|
||||
full = parent | sparse
|
||||
# add up size_extra_mb
|
||||
if 'size_extra_mb' in sparse:
|
||||
size = sparse['size_extra_mb']
|
||||
if isinstance(size, str) and size.startswith('+'):
|
||||
full['size_extra_mb'] = int(parent.get('size_extra_mb', 0)) + int(size.lstrip('+'))
|
||||
else:
|
||||
full['size_extra_mb'] = int(sparse['size_extra_mb'])
|
||||
# join our includes with parent's
|
||||
includes = set(parent.get('pkgs_include', []) + sparse.get('pkgs_include', []))
|
||||
if 'pkgs_exclude' in sparse:
|
||||
includes -= set(sparse['pkgs_exclude'])
|
||||
full['pkgs_include'] = list(includes)
|
||||
|
||||
# join our includes with parent's
|
||||
excludes = set(parent.get('pkgs_exclude', []) + sparse.get('pkgs_exclude', []))
|
||||
# our includes override parent excludes
|
||||
if 'pkgs_include' in sparse:
|
||||
excludes -= set(sparse['pkgs_include'])
|
||||
full['pkgs_exclude'] = list(excludes)
|
||||
|
||||
# now init missing keys
|
||||
for key, value in PROFILE_DEFAULTS.items():
|
||||
if key not in full.keys():
|
||||
full[key] = None # type: ignore[literal-required]
|
||||
if type(value) == list:
|
||||
full[key] = [] # type: ignore[literal-required]
|
||||
|
||||
full['size_extra_mb'] = int(full['size_extra_mb'] or 0)
|
||||
|
||||
resolved[name] = full
|
||||
return resolved
|
279
config/state.py
Normal file
279
config/state.py
Normal file
|
@ -0,0 +1,279 @@
|
|||
import appdirs
|
||||
import logging
|
||||
import os
|
||||
import toml
|
||||
from copy import deepcopy
|
||||
from typing import Mapping, Optional
|
||||
|
||||
from constants import DEFAULT_PACKAGE_BRANCH
|
||||
|
||||
from .scheme import Profile
|
||||
from .profile import PROFILE_DEFAULTS, resolve_profile
|
||||
|
||||
|
||||
CONFIG_DIR = appdirs.user_config_dir('kupfer')
|
||||
CACHE_DIR = appdirs.user_cache_dir('kupfer')
|
||||
CONFIG_DEFAULT_PATH = os.path.join(CONFIG_DIR, 'kupferbootstrap.toml')
|
||||
|
||||
|
||||
CONFIG_DEFAULTS: dict = {
|
||||
'wrapper': {
|
||||
'type': 'docker',
|
||||
},
|
||||
'build': {
|
||||
'ccache': True,
|
||||
'clean_mode': True,
|
||||
'crosscompile': True,
|
||||
'crossdirect': True,
|
||||
'threads': 0,
|
||||
},
|
||||
'pkgbuilds': {
|
||||
'git_repo': 'https://gitlab.com/kupfer/packages/pkgbuilds.git',
|
||||
'git_branch': DEFAULT_PACKAGE_BRANCH,
|
||||
},
|
||||
'pacman': {
|
||||
'parallel_downloads': 4,
|
||||
'check_space': True,
|
||||
'repo_branch': DEFAULT_PACKAGE_BRANCH,
|
||||
},
|
||||
'paths': {
|
||||
'cache_dir': CACHE_DIR,
|
||||
'chroots': os.path.join('%cache_dir%', 'chroots'),
|
||||
'pacman': os.path.join('%cache_dir%', 'pacman'),
|
||||
'packages': os.path.join('%cache_dir%', 'packages'),
|
||||
'pkgbuilds': os.path.join('%cache_dir%', 'pkgbuilds'),
|
||||
'jumpdrive': os.path.join('%cache_dir%', 'jumpdrive'),
|
||||
'images': os.path.join('%cache_dir%', 'images'),
|
||||
},
|
||||
'profiles': {
|
||||
'current': 'default',
|
||||
'default': deepcopy(PROFILE_DEFAULTS),
|
||||
},
|
||||
}
|
||||
CONFIG_SECTIONS = list(CONFIG_DEFAULTS.keys())
|
||||
|
||||
CONFIG_RUNTIME_DEFAULTS = {
|
||||
'verbose': False,
|
||||
'config_file': None,
|
||||
'arch': None,
|
||||
'no_wrap': False,
|
||||
'script_source_dir': os.path.dirname(os.path.realpath(__file__)),
|
||||
'error_shell': False,
|
||||
}
|
||||
|
||||
|
||||
def resolve_path_template(path_template: str, paths: dict[str, str]) -> str:
|
||||
terminator = '%' # i'll be back
|
||||
result = path_template
|
||||
for path_name, path in paths.items():
|
||||
result = result.replace(terminator + path_name + terminator, path)
|
||||
return result
|
||||
|
||||
|
||||
def sanitize_config(conf: dict[str, dict], warn_missing_defaultprofile=True) -> dict[str, dict]:
|
||||
"""checks the input config dict for unknown keys and returns only the known parts"""
|
||||
return merge_configs(conf_new=conf, conf_base={}, warn_missing_defaultprofile=warn_missing_defaultprofile)
|
||||
|
||||
|
||||
def merge_configs(conf_new: Mapping[str, dict], conf_base={}, warn_missing_defaultprofile=True) -> dict[str, dict]:
|
||||
"""
|
||||
Returns `conf_new` semantically merged into `conf_base`, after validating
|
||||
`conf_new` keys against `CONFIG_DEFAULTS` and `PROFILE_DEFAULTS`.
|
||||
Pass `conf_base={}` to get a sanitized version of `conf_new`.
|
||||
NOTE: `conf_base` is NOT checked for invalid keys. Sanitize beforehand.
|
||||
"""
|
||||
parsed = deepcopy(conf_base)
|
||||
|
||||
for outer_name, outer_conf in deepcopy(conf_new).items():
|
||||
# only handle known config sections
|
||||
if outer_name not in CONFIG_DEFAULTS.keys():
|
||||
logging.warning(f'Skipped unknown config section "{outer_name}"')
|
||||
continue
|
||||
logging.debug(f'Parsing config section "{outer_name}"')
|
||||
# check if outer_conf is a dict
|
||||
if not isinstance(outer_conf, dict):
|
||||
parsed[outer_name] = outer_conf
|
||||
else:
|
||||
# init section
|
||||
if outer_name not in parsed:
|
||||
parsed[outer_name] = {}
|
||||
|
||||
# profiles need special handling:
|
||||
# 1. profile names are unknown keys by definition, but we want 'default' to exist
|
||||
# 2. A profile's subkeys must be compared against PROFILE_DEFAULTS.keys()
|
||||
if outer_name == 'profiles':
|
||||
if warn_missing_defaultprofile and 'default' not in outer_conf.keys():
|
||||
logging.warning('Default profile is not defined in config file')
|
||||
|
||||
for profile_name, profile_conf in outer_conf.items():
|
||||
if not isinstance(profile_conf, dict):
|
||||
if profile_name == 'current':
|
||||
parsed[outer_name][profile_name] = profile_conf
|
||||
else:
|
||||
logging.warning('Skipped key "{profile_name}" in profile section: only subsections and "current" allowed')
|
||||
continue
|
||||
|
||||
# init profile
|
||||
if profile_name not in parsed[outer_name]:
|
||||
parsed[outer_name][profile_name] = {}
|
||||
|
||||
for key, val in profile_conf.items():
|
||||
if key not in PROFILE_DEFAULTS:
|
||||
logging.warning(f'Skipped unknown config item "{key}" in profile "{profile_name}"')
|
||||
continue
|
||||
parsed[outer_name][profile_name][key] = val
|
||||
|
||||
else:
|
||||
# handle generic inner config dict
|
||||
for inner_name, inner_conf in outer_conf.items():
|
||||
if inner_name not in CONFIG_DEFAULTS[outer_name].keys():
|
||||
logging.warning(f'Skipped unknown config item "{inner_name}" in "{outer_name}"')
|
||||
continue
|
||||
parsed[outer_name][inner_name] = inner_conf
|
||||
|
||||
return parsed
|
||||
|
||||
|
||||
def dump_toml(conf) -> str:
|
||||
return toml.dumps(conf)
|
||||
|
||||
|
||||
def dump_file(file_path: str, config: dict, file_mode: int = 0o600):
|
||||
|
||||
def _opener(path, flags):
|
||||
return os.open(path, flags, file_mode)
|
||||
|
||||
conf_dir = os.path.dirname(file_path)
|
||||
if not os.path.exists(conf_dir):
|
||||
os.makedirs(conf_dir)
|
||||
old_umask = os.umask(0)
|
||||
with open(file_path, 'w', opener=_opener) as f:
|
||||
f.write(dump_toml(conf=config))
|
||||
os.umask(old_umask)
|
||||
|
||||
|
||||
def parse_file(config_file: str, base: dict = CONFIG_DEFAULTS) -> dict:
|
||||
"""
|
||||
Parse the toml contents of `config_file`, validating keys against `CONFIG_DEFAULTS`.
|
||||
The parsed results are semantically merged into `base` before returning.
|
||||
`base` itself is NOT checked for invalid keys.
|
||||
"""
|
||||
_conf_file = config_file if config_file is not None else CONFIG_DEFAULT_PATH
|
||||
logging.debug(f'Trying to load config file: {_conf_file}')
|
||||
loaded_conf = toml.load(_conf_file)
|
||||
return merge_configs(conf_new=loaded_conf, conf_base=base)
|
||||
|
||||
|
||||
class ConfigLoadException(Exception):
|
||||
inner = None
|
||||
|
||||
def __init__(self, extra_msg='', inner_exception: Exception = None):
|
||||
msg: list[str] = ['Config load failed!']
|
||||
if extra_msg:
|
||||
msg.append(extra_msg)
|
||||
if inner_exception:
|
||||
self.inner = inner_exception
|
||||
msg.append(str(inner_exception))
|
||||
super().__init__(self, ' '.join(msg))
|
||||
|
||||
|
||||
class ConfigLoadState:
|
||||
load_finished = False
|
||||
exception = None
|
||||
|
||||
|
||||
class ConfigStateHolder:
|
||||
# config options that are persisted to file
|
||||
file: dict = {}
|
||||
# runtime config not persisted anywhere
|
||||
runtime: dict
|
||||
file_state: ConfigLoadState
|
||||
_profile_cache: dict[str, Profile]
|
||||
|
||||
def __init__(self, file_conf_path: Optional[str] = None, runtime_conf={}, file_conf_base: dict = {}):
|
||||
"""init a stateholder, optionally loading `file_conf_path`"""
|
||||
self.file_state = ConfigLoadState()
|
||||
self.runtime = CONFIG_RUNTIME_DEFAULTS.copy()
|
||||
self.runtime.update(runtime_conf)
|
||||
self.runtime['arch'] = os.uname().machine
|
||||
self.file.update(file_conf_base)
|
||||
if file_conf_path:
|
||||
self.try_load_file(file_conf_path)
|
||||
|
||||
def try_load_file(self, config_file=None, base=CONFIG_DEFAULTS):
|
||||
config_file = config_file or CONFIG_DEFAULT_PATH
|
||||
self.runtime['config_file'] = config_file
|
||||
self._profile_cache = None
|
||||
try:
|
||||
self.file = parse_file(config_file=config_file, base=base)
|
||||
except Exception as ex:
|
||||
self.file_state.exception = ex
|
||||
self.file_state.load_finished = True
|
||||
|
||||
def is_loaded(self) -> bool:
|
||||
"returns True if a file was **sucessfully** loaded"
|
||||
return self.file_state.load_finished and self.file_state.exception is None
|
||||
|
||||
def enforce_config_loaded(self):
|
||||
if not self.file_state.load_finished:
|
||||
raise ConfigLoadException(Exception("Config file wasn't even parsed yet. This is probably a bug in kupferbootstrap :O"))
|
||||
ex = self.file_state.exception
|
||||
if ex:
|
||||
if type(ex) == FileNotFoundError:
|
||||
ex = Exception("File doesn't exist. Try running `kupferbootstrap config init` first?")
|
||||
raise ex
|
||||
|
||||
def get_profile(self, name: Optional[str] = None) -> Profile:
|
||||
name = name or self.file['profiles']['current']
|
||||
self._profile_cache = resolve_profile(name=name, sparse_profiles=self.file['profiles'], resolved=self._profile_cache)
|
||||
return self._profile_cache[name]
|
||||
|
||||
def get_path(self, path_name: str) -> str:
|
||||
paths = self.file['paths']
|
||||
return resolve_path_template(paths[path_name], paths)
|
||||
|
||||
def get_package_dir(self, arch: str):
|
||||
return os.path.join(self.get_path('packages'), arch)
|
||||
|
||||
def dump(self) -> str:
|
||||
"""dump toml representation of `self.file`"""
|
||||
return dump_toml(self.file)
|
||||
|
||||
def write(self, path=None):
|
||||
"""write toml representation of `self.file` to `path`"""
|
||||
if path is None:
|
||||
path = self.runtime['config_file']
|
||||
os.makedirs(os.path.dirname(path), exist_ok=True)
|
||||
dump_file(path, self.file)
|
||||
logging.info(f'Created config file at {path}')
|
||||
|
||||
def invalidate_profile_cache(self):
|
||||
"""Clear the profile cache (usually after modification)"""
|
||||
self._profile_cache = None
|
||||
|
||||
def update(self, config_fragment: dict[str, dict], warn_missing_defaultprofile: bool = True) -> bool:
|
||||
"""Update `self.file` with `config_fragment`. Returns `True` if the config was changed"""
|
||||
merged = merge_configs(config_fragment, conf_base=self.file, warn_missing_defaultprofile=warn_missing_defaultprofile)
|
||||
changed = self.file != merged
|
||||
self.file = merged
|
||||
if changed and 'profiles' in config_fragment and self.file['profiles'] != config_fragment['profiles']:
|
||||
self.invalidate_profile_cache()
|
||||
return changed
|
||||
|
||||
def update_profile(self, name: str, profile: Profile, merge: bool = False, create: bool = True, prune: bool = True):
|
||||
new = {}
|
||||
if name not in self.file['profiles']:
|
||||
if not create:
|
||||
raise Exception(f'Unknown profile: {name}')
|
||||
else:
|
||||
if merge:
|
||||
new = deepcopy(self.file['profiles'][name])
|
||||
|
||||
logging.debug(f'new: {new}')
|
||||
logging.debug(f'profile: {profile}')
|
||||
new |= profile
|
||||
|
||||
if prune:
|
||||
new = {key: val for key, val in new.items() if val is not None}
|
||||
self.file['profiles'][name] = new
|
||||
self.invalidate_profile_cache()
|
|
@ -5,8 +5,9 @@ from tempfile import mktemp, gettempdir as get_system_tempdir
|
|||
import toml
|
||||
from typing import Optional
|
||||
|
||||
from config import CONFIG_DEFAULTS, ConfigStateHolder, PROFILE_DEFAULTS
|
||||
from config.profile import PROFILE_DEFAULTS
|
||||
from config.scheme import Config, Profile
|
||||
from config.state import CONFIG_DEFAULTS, ConfigStateHolder
|
||||
|
||||
|
||||
def get_filename():
|
||||
|
|
|
@ -5,7 +5,8 @@ import pathlib
|
|||
|
||||
from typing import Protocol
|
||||
|
||||
from config import config, dump_file as dump_config_file
|
||||
from config import config
|
||||
from config.state import dump_file as dump_config_file
|
||||
from constants import CHROOT_PATHS
|
||||
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue