kupferbootstrap/config/state.py

322 lines
13 KiB
Python
Raw Normal View History

import appdirs
import logging
import os
import toml
from copy import deepcopy
from typing import Mapping, Optional
from constants import DEFAULT_PACKAGE_BRANCH
from .scheme import Config, ConfigLoadState, DataClass, Profile, RuntimeConfiguration
from .profile import PROFILE_DEFAULTS, PROFILE_DEFAULTS_DICT, resolve_profile
CONFIG_DIR = appdirs.user_config_dir('kupfer')
CACHE_DIR = appdirs.user_cache_dir('kupfer')
CONFIG_DEFAULT_PATH = os.path.join(CONFIG_DIR, 'kupferbootstrap.toml')
CONFIG_DEFAULTS_DICT = {
'wrapper': {
'type': 'docker',
},
'build': {
'ccache': True,
'clean_mode': True,
'crosscompile': True,
'crossdirect': True,
'threads': 0,
},
'pkgbuilds': {
'git_repo': 'https://gitlab.com/kupfer/packages/pkgbuilds.git',
'git_branch': DEFAULT_PACKAGE_BRANCH,
},
'pacman': {
'parallel_downloads': 4,
2022-08-25 16:53:13 +02:00
'check_space': False, # TODO: investigate why True causes issues
'repo_branch': DEFAULT_PACKAGE_BRANCH,
},
'paths': {
'cache_dir': CACHE_DIR,
'chroots': os.path.join('%cache_dir%', 'chroots'),
'pacman': os.path.join('%cache_dir%', 'pacman'),
'packages': os.path.join('%cache_dir%', 'packages'),
'pkgbuilds': os.path.join('%cache_dir%', 'pkgbuilds'),
'jumpdrive': os.path.join('%cache_dir%', 'jumpdrive'),
'images': os.path.join('%cache_dir%', 'images'),
'ccache': os.path.join('%cache_dir%', 'ccache'),
2022-08-29 04:43:12 +02:00
'rust': os.path.join('%cache_dir%', 'rust'),
},
'profiles': {
'current': 'default',
'default': deepcopy(PROFILE_DEFAULTS_DICT),
},
}
CONFIG_DEFAULTS: Config = Config.fromDict(CONFIG_DEFAULTS_DICT)
CONFIG_SECTIONS = list(CONFIG_DEFAULTS.keys())
2022-08-18 05:37:24 +02:00
CONFIG_RUNTIME_DEFAULTS: RuntimeConfiguration = RuntimeConfiguration.fromDict({
'verbose': False,
'no_wrap': False,
'error_shell': False,
2022-08-27 03:49:07 +02:00
'config_file': None,
'script_source_dir': None,
'arch': None,
'uid': None,
2022-08-18 05:37:24 +02:00
})
def resolve_path_template(path_template: str, paths: dict[str, str]) -> str:
terminator = '%' # i'll be back
result = path_template
for path_name, path in paths.items():
result = result.replace(terminator + path_name + terminator, path)
return result
def sanitize_config(conf: dict[str, dict], warn_missing_defaultprofile=True) -> dict[str, dict]:
"""checks the input config dict for unknown keys and returns only the known parts"""
return merge_configs(conf_new=conf, conf_base={}, warn_missing_defaultprofile=warn_missing_defaultprofile)
def merge_configs(conf_new: Mapping[str, dict], conf_base={}, warn_missing_defaultprofile=True) -> dict[str, dict]:
"""
Returns `conf_new` semantically merged into `conf_base`, after validating
`conf_new` keys against `CONFIG_DEFAULTS` and `PROFILE_DEFAULTS`.
Pass `conf_base={}` to get a sanitized version of `conf_new`.
NOTE: `conf_base` is NOT checked for invalid keys. Sanitize beforehand.
"""
parsed = deepcopy(dict(conf_base))
for outer_name, outer_conf in deepcopy(conf_new).items():
# only handle known config sections
if outer_name not in CONFIG_SECTIONS:
logging.warning(f'Skipped unknown config section "{outer_name}"')
continue
logging.debug(f'Parsing config section "{outer_name}"')
# check if outer_conf is a dict
if not (isinstance(outer_conf, (dict, DataClass))):
parsed[outer_name] = outer_conf
else:
# init section
if outer_name not in parsed:
parsed[outer_name] = {}
# profiles need special handling:
# 1. profile names are unknown keys by definition, but we want 'default' to exist
# 2. A profile's subkeys must be compared against PROFILE_DEFAULTS.keys()
if outer_name == 'profiles':
if warn_missing_defaultprofile and 'default' not in outer_conf.keys():
logging.warning('Default profile is not defined in config file')
update = dict[str, dict]()
for profile_name, profile_conf in outer_conf.items():
if not isinstance(profile_conf, (dict, Profile)):
if profile_name == 'current':
parsed[outer_name][profile_name] = profile_conf
else:
logging.warning(f'Skipped key "{profile_name}" in profile section: only subsections and "current" allowed')
continue
# init profile
if profile_name in parsed[outer_name]:
profile = parsed[outer_name][profile_name]
else:
profile = {}
for key, val in profile_conf.items():
if key not in PROFILE_DEFAULTS:
logging.warning(f'Skipped unknown config item "{key}" in profile "{profile_name}"')
continue
profile[key] = val
update |= {profile_name: profile}
parsed[outer_name].update(update)
else:
# handle generic inner config dict
for inner_name, inner_conf in outer_conf.items():
if inner_name not in CONFIG_DEFAULTS[outer_name].keys():
logging.warning(f'Skipped unknown config item "{inner_name}" in section "{outer_name}"')
continue
parsed[outer_name][inner_name] = inner_conf
return parsed
def dump_toml(conf) -> str:
return toml.dumps(conf)
def dump_file(file_path: str, config: dict, file_mode: int = 0o600):
def _opener(path, flags):
return os.open(path, flags, file_mode)
conf_dir = os.path.dirname(file_path)
if not os.path.exists(conf_dir):
os.makedirs(conf_dir)
old_umask = os.umask(0)
with open(file_path, 'w', opener=_opener) as f:
f.write(dump_toml(conf=config))
os.umask(old_umask)
def parse_file(config_file: str, base: dict = CONFIG_DEFAULTS) -> dict:
"""
Parse the toml contents of `config_file`, validating keys against `CONFIG_DEFAULTS`.
The parsed results are semantically merged into `base` before returning.
`base` itself is NOT checked for invalid keys.
"""
_conf_file = config_file if config_file is not None else CONFIG_DEFAULT_PATH
logging.debug(f'Trying to load config file: {_conf_file}')
loaded_conf = toml.load(_conf_file)
return merge_configs(conf_new=loaded_conf, conf_base=base)
class ConfigLoadException(Exception):
inner = None
def __init__(self, extra_msg='', inner_exception: Optional[Exception] = None):
msg: list[str] = ['Config load failed!']
if extra_msg:
msg.append(extra_msg)
if inner_exception:
self.inner = inner_exception
msg.append(str(inner_exception))
super().__init__(self, ' '.join(msg))
class ConfigStateHolder:
# config options that are persisted to file
2022-08-18 05:37:24 +02:00
file: Config
# runtime config not persisted anywhere
2022-08-18 05:37:24 +02:00
runtime: RuntimeConfiguration
file_state: ConfigLoadState
_profile_cache: Optional[dict[str, Profile]]
def __init__(self, file_conf_path: Optional[str] = None, runtime_conf={}, file_conf_base: dict = {}):
"""init a stateholder, optionally loading `file_conf_path`"""
2022-08-18 05:37:24 +02:00
self.file = Config.fromDict(merge_configs(conf_new=file_conf_base, conf_base=CONFIG_DEFAULTS))
self.file_state = ConfigLoadState()
2022-08-18 05:37:24 +02:00
self.runtime = RuntimeConfiguration.fromDict(CONFIG_RUNTIME_DEFAULTS | runtime_conf)
2022-08-27 03:49:07 +02:00
self.runtime.arch = os.uname().machine
self.runtime.script_source_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
self.runtime.uid = os.getuid()
self._profile_cache = {}
if file_conf_path:
self.try_load_file(file_conf_path)
def try_load_file(self, config_file=None, base=CONFIG_DEFAULTS_DICT):
config_file = config_file or CONFIG_DEFAULT_PATH
self.runtime.config_file = config_file
self._profile_cache = None
try:
self.file = Config.fromDict(parse_file(config_file=config_file, base=base), validate=True)
self.file_state.exception = None
except Exception as ex:
self.file_state.exception = ex
self.file_state.load_finished = True
def is_loaded(self) -> bool:
"returns True if a file was **sucessfully** loaded"
return self.file_state.load_finished and self.file_state.exception is None
def enforce_config_loaded(self):
if not self.file_state.load_finished:
m = "Config file wasn't even parsed yet. This is probably a bug in kupferbootstrap :O"
raise ConfigLoadException(Exception(m))
ex = self.file_state.exception
if ex:
if type(ex) == FileNotFoundError:
ex = Exception("Config file doesn't exist. Try running `kupferbootstrap config init` first?")
raise ex
def get_profile(self, name: Optional[str] = None) -> Profile:
name = name or self.file.profiles.current
self._profile_cache = resolve_profile(name=name, sparse_profiles=self.file.profiles, resolved=self._profile_cache)
return self._profile_cache[name]
def _enforce_profile_field(self, field: str, profile_name: Optional[str] = None, hint_or_set_arch: bool = False) -> Profile:
# TODO: device
profile_name = profile_name if profile_name is not None else self.file.profiles.current
arch_hint = ''
if not hint_or_set_arch:
self.enforce_config_loaded()
else:
arch_hint = (' or specifiy the target architecture by passing `--arch` to the current command,\n'
'e.g. `kupferbootstrap packages build --arch x86_64`')
if not self.is_loaded():
if not self.file_state.exception:
raise Exception(f'Error enforcing config profile {field}: config hadn\'t even been loaded yet.\n'
'This is a bug in kupferbootstrap!')
raise Exception(f"Profile {field} couldn't be resolved because the config file couldn't be loaded.\n"
"If the config doesn't exist, try running `kupferbootstrap config init`.\n"
f"Error: {self.file_state.exception}")
if profile_name and profile_name not in self.file.profiles:
raise Exception(f'Unknown profile "{profile_name}". Please run `kupferbootstrap config profile init`{arch_hint}')
profile = self.get_profile(profile_name)
if field not in profile or not profile[field]:
m = (f'Profile "{profile_name}" has no {field.upper()} configured.\n'
f'Please run `kupferbootstrap config profile init {field}`{arch_hint}')
raise Exception(m)
return profile
def enforce_profile_device_set(self, **kwargs) -> Profile:
return self._enforce_profile_field(field='device', **kwargs)
def enforce_profile_flavour_set(self, **kwargs) -> Profile:
return self._enforce_profile_field(field='flavour', **kwargs)
def get_path(self, path_name: str) -> str:
paths = self.file.paths
return resolve_path_template(paths[path_name], paths)
def get_package_dir(self, arch: str):
return os.path.join(self.get_path('packages'), arch)
def dump(self) -> str:
"""dump toml representation of `self.file`"""
return dump_toml(self.file)
def write(self, path=None):
"""write toml representation of `self.file` to `path`"""
if path is None:
path = self.runtime.config_file
assert path
os.makedirs(os.path.dirname(path), exist_ok=True)
new = not os.path.exists(path)
dump_file(path, self.file)
logging.info(f'{"Created" if new else "Written changes to"} config file at {path}')
def invalidate_profile_cache(self):
"""Clear the profile cache (usually after modification)"""
self._profile_cache = None
def update(self, config_fragment: dict[str, dict], warn_missing_defaultprofile: bool = True) -> bool:
"""Update `self.file` with `config_fragment`. Returns `True` if the config was changed"""
merged = merge_configs(config_fragment, conf_base=self.file, warn_missing_defaultprofile=warn_missing_defaultprofile)
changed = self.file.toDict() != merged
2022-08-18 05:37:24 +02:00
self.file.update(merged)
if changed and 'profiles' in config_fragment and self.file.profiles.toDict() != config_fragment['profiles']:
self.invalidate_profile_cache()
return changed
def update_profile(self, name: str, profile: Profile, merge: bool = False, create: bool = True, prune: bool = True):
new = {}
if name not in self.file.profiles:
if not create:
raise Exception(f'Unknown profile: {name}')
else:
if merge:
new = deepcopy(self.file.profiles[name])
logging.debug(f'new: {new}')
logging.debug(f'profile: {profile}')
new |= profile
if prune:
new = {key: val for key, val in new.items() if val is not None}
self.file.profiles[name] = new
self.invalidate_profile_cache()
config: ConfigStateHolder = ConfigStateHolder(file_conf_base=CONFIG_DEFAULTS)