config/: rework code around parsing profiles, add scheme.SparseProfile to account for the partial profiles in config
This commit is contained in:
parent
688f9e2375
commit
2d13d82943
4 changed files with 123 additions and 34 deletions
|
@ -2,9 +2,9 @@ import logging
|
||||||
|
|
||||||
from copy import deepcopy
|
from copy import deepcopy
|
||||||
|
|
||||||
from .scheme import Profile
|
from .scheme import Profile, SparseProfile
|
||||||
|
|
||||||
PROFILE_DEFAULTS = Profile.fromDict({
|
PROFILE_DEFAULTS_DICT = {
|
||||||
'parent': '',
|
'parent': '',
|
||||||
'device': '',
|
'device': '',
|
||||||
'flavour': '',
|
'flavour': '',
|
||||||
|
@ -14,14 +14,15 @@ PROFILE_DEFAULTS = Profile.fromDict({
|
||||||
'username': 'kupfer',
|
'username': 'kupfer',
|
||||||
'password': None,
|
'password': None,
|
||||||
'size_extra_mb': "0",
|
'size_extra_mb': "0",
|
||||||
})
|
}
|
||||||
|
PROFILE_DEFAULTS = Profile.fromDict(PROFILE_DEFAULTS_DICT)
|
||||||
|
|
||||||
PROFILE_EMPTY: Profile = {key: None for key in PROFILE_DEFAULTS.keys()} # type: ignore
|
PROFILE_EMPTY: Profile = {key: None for key in PROFILE_DEFAULTS.keys()} # type: ignore
|
||||||
|
|
||||||
|
|
||||||
def resolve_profile(
|
def resolve_profile(
|
||||||
name: str,
|
name: str,
|
||||||
sparse_profiles: dict[str, Profile],
|
sparse_profiles: dict[str, SparseProfile],
|
||||||
resolved: dict[str, Profile] = None,
|
resolved: dict[str, Profile] = None,
|
||||||
_visited=None,
|
_visited=None,
|
||||||
) -> dict[str, Profile]:
|
) -> dict[str, Profile]:
|
||||||
|
@ -45,8 +46,10 @@ def resolve_profile(
|
||||||
|
|
||||||
logging.debug(f'Resolving profile {name}')
|
logging.debug(f'Resolving profile {name}')
|
||||||
_visited.append(name)
|
_visited.append(name)
|
||||||
sparse = sparse_profiles[name]
|
sparse = sparse_profiles[name].copy()
|
||||||
full = deepcopy(sparse)
|
full = deepcopy(sparse)
|
||||||
|
if name != 'default' and 'parent' not in sparse:
|
||||||
|
sparse['parent'] = 'default'
|
||||||
if 'parent' in sparse and (parent_name := sparse['parent']):
|
if 'parent' in sparse and (parent_name := sparse['parent']):
|
||||||
parent = resolve_profile(name=parent_name, sparse_profiles=sparse_profiles, resolved=resolved, _visited=_visited)[parent_name]
|
parent = resolve_profile(name=parent_name, sparse_profiles=sparse_profiles, resolved=resolved, _visited=_visited)[parent_name]
|
||||||
full = parent | sparse
|
full = parent | sparse
|
||||||
|
@ -71,13 +74,13 @@ def resolve_profile(
|
||||||
full['pkgs_exclude'] = list(excludes)
|
full['pkgs_exclude'] = list(excludes)
|
||||||
|
|
||||||
# now init missing keys
|
# now init missing keys
|
||||||
for key, value in PROFILE_DEFAULTS.items():
|
for key, value in PROFILE_DEFAULTS_DICT.items():
|
||||||
if key not in full.keys():
|
if key not in full.keys():
|
||||||
full[key] = None # type: ignore[literal-required]
|
full[key] = value # type: ignore[literal-required]
|
||||||
if type(value) == list:
|
if type(value) == list:
|
||||||
full[key] = [] # type: ignore[literal-required]
|
full[key] = [] # type: ignore[literal-required]
|
||||||
|
|
||||||
full['size_extra_mb'] = int(full['size_extra_mb'] or 0)
|
full['size_extra_mb'] = int(full['size_extra_mb'] or 0)
|
||||||
|
|
||||||
resolved[name] = full
|
resolved[name] = Profile.fromDict(full)
|
||||||
return resolved
|
return resolved
|
||||||
|
|
|
@ -8,7 +8,7 @@ from constants import Arch
|
||||||
|
|
||||||
|
|
||||||
def munchclass(*args, init=False, **kwargs):
|
def munchclass(*args, init=False, **kwargs):
|
||||||
return dataclass(*args, init=init, **kwargs)
|
return dataclass(*args, init=init, slots=True, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
def resolve_type_hint(hint: type):
|
def resolve_type_hint(hint: type):
|
||||||
|
@ -26,8 +26,8 @@ def resolve_type_hint(hint: type):
|
||||||
|
|
||||||
class DataClass(Munch):
|
class DataClass(Munch):
|
||||||
|
|
||||||
def __init__(self, validate: bool = True, **kwargs):
|
def __init__(self, d: dict = {}, validate: bool = True, **kwargs):
|
||||||
self.update(kwargs, validate=validate)
|
self.update(d | kwargs, validate=validate)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def transform(cls, values: Mapping[str, Any], validate: bool = True) -> Any:
|
def transform(cls, values: Mapping[str, Any], validate: bool = True) -> Any:
|
||||||
|
@ -72,10 +72,29 @@ class DataClass(Munch):
|
||||||
super().__init_subclass__()
|
super().__init_subclass__()
|
||||||
cls._type_hints = get_type_hints(cls)
|
cls._type_hints = get_type_hints(cls)
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f'{type(self)}{dict.__repr__(self.toDict())}'
|
||||||
|
|
||||||
|
|
||||||
@munchclass()
|
@munchclass()
|
||||||
class Profile(DataClass):
|
class SparseProfile(DataClass):
|
||||||
parent: str
|
parent: Optional[str]
|
||||||
|
device: Optional[str]
|
||||||
|
flavour: Optional[str]
|
||||||
|
pkgs_include: Optional[list[str]]
|
||||||
|
pkgs_exclude: Optional[list[str]]
|
||||||
|
hostname: Optional[str]
|
||||||
|
username: Optional[str]
|
||||||
|
password: Optional[str]
|
||||||
|
size_extra_mb: Optional[Union[str, int]]
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f'{type(self)}{dict.__repr__(self.toDict())}'
|
||||||
|
|
||||||
|
|
||||||
|
@munchclass()
|
||||||
|
class Profile(SparseProfile):
|
||||||
|
parent: Optional[str]
|
||||||
device: str
|
device: str
|
||||||
flavour: str
|
flavour: str
|
||||||
pkgs_include: list[str]
|
pkgs_include: list[str]
|
||||||
|
@ -124,10 +143,9 @@ class PathsSection(DataClass):
|
||||||
images: str
|
images: str
|
||||||
|
|
||||||
|
|
||||||
@munchclass()
|
|
||||||
class ProfilesSection(DataClass):
|
class ProfilesSection(DataClass):
|
||||||
current: str
|
current: str
|
||||||
default: Profile
|
default: SparseProfile
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def transform(cls, values: Mapping[str, Any], validate: bool = True):
|
def transform(cls, values: Mapping[str, Any], validate: bool = True):
|
||||||
|
@ -136,11 +154,16 @@ class ProfilesSection(DataClass):
|
||||||
if k == 'current':
|
if k == 'current':
|
||||||
results[k] = v
|
results[k] = v
|
||||||
continue
|
continue
|
||||||
results[k] = Profile.fromDict(v, validate=True)
|
if not isinstance(v, dict):
|
||||||
|
raise Exception(f'profile {v} is not a dict!')
|
||||||
|
results[k] = SparseProfile.fromDict(v, validate=True)
|
||||||
return results
|
return results
|
||||||
|
|
||||||
def update(self, d, validate: bool = True):
|
def update(self, d, validate: bool = True):
|
||||||
Munch.update(self, self.transform(d, validate=validate))
|
Munch.update(self, self.transform(values=d, validate=validate))
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f'{type(self)}{dict.__repr__(self.toDict())}'
|
||||||
|
|
||||||
|
|
||||||
@munchclass()
|
@munchclass()
|
||||||
|
@ -184,7 +207,11 @@ class RuntimeConfiguration(DataClass):
|
||||||
error_shell: bool
|
error_shell: bool
|
||||||
|
|
||||||
|
|
||||||
@munchclass()
|
|
||||||
class ConfigLoadState(DataClass):
|
class ConfigLoadState(DataClass):
|
||||||
load_finished: bool = False
|
load_finished: bool
|
||||||
exception: Optional[Exception] = None
|
exception: Optional[Exception]
|
||||||
|
|
||||||
|
def __init__(self, d: dict = {}):
|
||||||
|
self.load_finished = False
|
||||||
|
self.exception = None
|
||||||
|
self.update(d)
|
||||||
|
|
|
@ -7,14 +7,14 @@ from typing import Mapping, Optional
|
||||||
|
|
||||||
from constants import DEFAULT_PACKAGE_BRANCH
|
from constants import DEFAULT_PACKAGE_BRANCH
|
||||||
|
|
||||||
from .scheme import Config, ConfigLoadState, Profile, RuntimeConfiguration
|
from .scheme import Config, ConfigLoadState, DataClass, Profile, RuntimeConfiguration
|
||||||
from .profile import PROFILE_DEFAULTS, resolve_profile
|
from .profile import PROFILE_DEFAULTS, PROFILE_DEFAULTS_DICT, resolve_profile
|
||||||
|
|
||||||
CONFIG_DIR = appdirs.user_config_dir('kupfer')
|
CONFIG_DIR = appdirs.user_config_dir('kupfer')
|
||||||
CACHE_DIR = appdirs.user_cache_dir('kupfer')
|
CACHE_DIR = appdirs.user_cache_dir('kupfer')
|
||||||
CONFIG_DEFAULT_PATH = os.path.join(CONFIG_DIR, 'kupferbootstrap.toml')
|
CONFIG_DEFAULT_PATH = os.path.join(CONFIG_DIR, 'kupferbootstrap.toml')
|
||||||
|
|
||||||
CONFIG_DEFAULTS: Config = Config.fromDict({
|
CONFIG_DEFAULTS_DICT = {
|
||||||
'wrapper': {
|
'wrapper': {
|
||||||
'type': 'docker',
|
'type': 'docker',
|
||||||
},
|
},
|
||||||
|
@ -45,9 +45,10 @@ CONFIG_DEFAULTS: Config = Config.fromDict({
|
||||||
},
|
},
|
||||||
'profiles': {
|
'profiles': {
|
||||||
'current': 'default',
|
'current': 'default',
|
||||||
'default': deepcopy(PROFILE_DEFAULTS),
|
'default': deepcopy(PROFILE_DEFAULTS_DICT),
|
||||||
},
|
},
|
||||||
})
|
}
|
||||||
|
CONFIG_DEFAULTS: Config = Config.fromDict(CONFIG_DEFAULTS_DICT)
|
||||||
CONFIG_SECTIONS = list(CONFIG_DEFAULTS.keys())
|
CONFIG_SECTIONS = list(CONFIG_DEFAULTS.keys())
|
||||||
|
|
||||||
CONFIG_RUNTIME_DEFAULTS: RuntimeConfiguration = RuntimeConfiguration.fromDict({
|
CONFIG_RUNTIME_DEFAULTS: RuntimeConfiguration = RuntimeConfiguration.fromDict({
|
||||||
|
@ -84,12 +85,12 @@ def merge_configs(conf_new: Mapping[str, dict], conf_base={}, warn_missing_defau
|
||||||
|
|
||||||
for outer_name, outer_conf in deepcopy(conf_new).items():
|
for outer_name, outer_conf in deepcopy(conf_new).items():
|
||||||
# only handle known config sections
|
# only handle known config sections
|
||||||
if outer_name not in CONFIG_DEFAULTS.keys():
|
if outer_name not in CONFIG_SECTIONS:
|
||||||
logging.warning(f'Skipped unknown config section "{outer_name}"')
|
logging.warning(f'Skipped unknown config section "{outer_name}"')
|
||||||
continue
|
continue
|
||||||
logging.debug(f'Parsing config section "{outer_name}"')
|
logging.debug(f'Parsing config section "{outer_name}"')
|
||||||
# check if outer_conf is a dict
|
# check if outer_conf is a dict
|
||||||
if not isinstance(outer_conf, dict):
|
if not (isinstance(outer_conf, (dict, DataClass))):
|
||||||
parsed[outer_name] = outer_conf
|
parsed[outer_name] = outer_conf
|
||||||
else:
|
else:
|
||||||
# init section
|
# init section
|
||||||
|
@ -103,8 +104,9 @@ def merge_configs(conf_new: Mapping[str, dict], conf_base={}, warn_missing_defau
|
||||||
if warn_missing_defaultprofile and 'default' not in outer_conf.keys():
|
if warn_missing_defaultprofile and 'default' not in outer_conf.keys():
|
||||||
logging.warning('Default profile is not defined in config file')
|
logging.warning('Default profile is not defined in config file')
|
||||||
|
|
||||||
|
update = dict[str, dict]()
|
||||||
for profile_name, profile_conf in outer_conf.items():
|
for profile_name, profile_conf in outer_conf.items():
|
||||||
if not isinstance(profile_conf, dict):
|
if not isinstance(profile_conf, (dict, Profile)):
|
||||||
if profile_name == 'current':
|
if profile_name == 'current':
|
||||||
parsed[outer_name][profile_name] = profile_conf
|
parsed[outer_name][profile_name] = profile_conf
|
||||||
else:
|
else:
|
||||||
|
@ -112,14 +114,18 @@ def merge_configs(conf_new: Mapping[str, dict], conf_base={}, warn_missing_defau
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# init profile
|
# init profile
|
||||||
if profile_name not in parsed[outer_name]:
|
if profile_name in parsed[outer_name]:
|
||||||
parsed[outer_name][profile_name] = {}
|
profile = parsed[outer_name][profile_name]
|
||||||
|
else:
|
||||||
|
profile = {}
|
||||||
|
|
||||||
for key, val in profile_conf.items():
|
for key, val in profile_conf.items():
|
||||||
if key not in PROFILE_DEFAULTS:
|
if key not in PROFILE_DEFAULTS:
|
||||||
logging.warning(f'Skipped unknown config item "{key}" in profile "{profile_name}"')
|
logging.warning(f'Skipped unknown config item "{key}" in profile "{profile_name}"')
|
||||||
continue
|
continue
|
||||||
parsed[outer_name][profile_name][key] = val
|
profile[key] = val
|
||||||
|
update |= {profile_name: profile}
|
||||||
|
parsed[outer_name].update(update)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# handle generic inner config dict
|
# handle generic inner config dict
|
||||||
|
@ -209,11 +215,12 @@ class ConfigStateHolder:
|
||||||
|
|
||||||
def enforce_config_loaded(self):
|
def enforce_config_loaded(self):
|
||||||
if not self.file_state.load_finished:
|
if not self.file_state.load_finished:
|
||||||
raise ConfigLoadException(Exception("Config file wasn't even parsed yet. This is probably a bug in kupferbootstrap :O"))
|
m = "Config file wasn't even parsed yet. This is probably a bug in kupferbootstrap :O"
|
||||||
|
raise ConfigLoadException(Exception(m))
|
||||||
ex = self.file_state.exception
|
ex = self.file_state.exception
|
||||||
if ex:
|
if ex:
|
||||||
if type(ex) == FileNotFoundError:
|
if type(ex) == FileNotFoundError:
|
||||||
ex = Exception("File doesn't exist. Try running `kupferbootstrap config init` first?")
|
ex = Exception("Config file doesn't exist. Try running `kupferbootstrap config init` first?")
|
||||||
raise ex
|
raise ex
|
||||||
|
|
||||||
def get_profile(self, name: Optional[str] = None) -> Profile:
|
def get_profile(self, name: Optional[str] = None) -> Profile:
|
||||||
|
@ -221,6 +228,29 @@ class ConfigStateHolder:
|
||||||
self._profile_cache = resolve_profile(name=name, sparse_profiles=self.file['profiles'], resolved=self._profile_cache)
|
self._profile_cache = resolve_profile(name=name, sparse_profiles=self.file['profiles'], resolved=self._profile_cache)
|
||||||
return self._profile_cache[name]
|
return self._profile_cache[name]
|
||||||
|
|
||||||
|
def enforce_profile_device_set(self, profile_name: Optional[str] = None, hint_or_set_arch: bool = False) -> Profile:
|
||||||
|
arch_hint = ''
|
||||||
|
if not hint_or_set_arch:
|
||||||
|
self.enforce_config_loaded()
|
||||||
|
else:
|
||||||
|
arch_hint = (' or specifiy the target architecture by passing `--arch` to the current command,\n'
|
||||||
|
'e.g. `kupferbootstrap packages build --arch x86_64`')
|
||||||
|
if not self.is_loaded():
|
||||||
|
if not self.file_state.exception:
|
||||||
|
raise Exception('Error enforcing/ config profile device: config hadnt even been loaded yet.\n'
|
||||||
|
'This is a bug in kupferbootstrap!')
|
||||||
|
raise Exception("Profile device couldn't be resolved because the config file couldn't be loaded.\n"
|
||||||
|
"If the config doesn't exist, try running `kupferbootstrap config init`.\n"
|
||||||
|
f"Error: {self.file_state.exception}")
|
||||||
|
if profile_name and profile_name not in self.file.profiles:
|
||||||
|
raise Exception(f'Unknown profile "{profile_name}". Please run `kupferbootstrap config profile init`{arch_hint}')
|
||||||
|
profile = self.get_profile(profile_name)
|
||||||
|
if not profile.device:
|
||||||
|
m = (f'Profile "{profile_name}" has no device configured.\n'
|
||||||
|
f'Please run `kupferbootstrap config profile init device`{arch_hint}')
|
||||||
|
raise Exception(m)
|
||||||
|
return profile
|
||||||
|
|
||||||
def get_path(self, path_name: str) -> str:
|
def get_path(self, path_name: str) -> str:
|
||||||
paths = self.file['paths']
|
paths = self.file['paths']
|
||||||
return resolve_path_template(paths[path_name], paths)
|
return resolve_path_template(paths[path_name], paths)
|
||||||
|
|
|
@ -7,7 +7,7 @@ import toml
|
||||||
from tempfile import mktemp, gettempdir as get_system_tempdir
|
from tempfile import mktemp, gettempdir as get_system_tempdir
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
from config.profile import PROFILE_DEFAULTS
|
from config.profile import Profile, PROFILE_DEFAULTS
|
||||||
from config.scheme import Config, Profile
|
from config.scheme import Config, Profile
|
||||||
from config.state import CONFIG_DEFAULTS, ConfigStateHolder
|
from config.state import CONFIG_DEFAULTS, ConfigStateHolder
|
||||||
|
|
||||||
|
@ -185,3 +185,32 @@ def test_profile():
|
||||||
p = Profile.fromDict(PROFILE_DEFAULTS)
|
p = Profile.fromDict(PROFILE_DEFAULTS)
|
||||||
assert p is not None
|
assert p is not None
|
||||||
assert isinstance(p, Profile)
|
assert isinstance(p, Profile)
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_profile():
|
||||||
|
c = ConfigStateHolder()
|
||||||
|
d = {'username': 'kupfer123', 'hostname': 'test123'}
|
||||||
|
c.file.profiles['testprofile'] = d
|
||||||
|
p = c.get_profile('testprofile')
|
||||||
|
assert p
|
||||||
|
assert isinstance(p, Profile)
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_profile_from_disk(configstate_emptyfile):
|
||||||
|
profile_name = 'testprofile'
|
||||||
|
device = 'sdm845-oneplus-enchilada'
|
||||||
|
c = configstate_emptyfile
|
||||||
|
c.file.profiles.default.device = device
|
||||||
|
d = {'parent': 'default', 'username': 'kupfer123', 'hostname': 'test123'}
|
||||||
|
c.file.profiles[profile_name] = d
|
||||||
|
filepath = c.runtime.config_file
|
||||||
|
assert filepath
|
||||||
|
c.write()
|
||||||
|
del c
|
||||||
|
c = ConfigStateHolder(filepath)
|
||||||
|
c.try_load_file(filepath)
|
||||||
|
c.enforce_config_loaded()
|
||||||
|
p: Profile = c.get_profile(profile_name)
|
||||||
|
assert isinstance(p, Profile)
|
||||||
|
assert 'device' in p
|
||||||
|
assert p.device == device
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue