mirror of
https://gitlab.com/kupfer/kupferbootstrap.git
synced 2025-02-22 21:25:43 -05:00
config: add data schemas based on munch in scheme.py, add unit tests
This commit is contained in:
parent
4298d15178
commit
16fd2f1590
4 changed files with 193 additions and 8 deletions
|
@ -3,7 +3,7 @@
|
|||
Kupfer Linux bootstrapping tool - drives pacstrap, makepkg, mkfs and fastboot, just to name a few.
|
||||
|
||||
## Installation
|
||||
Install Docker, Python 3 with libraries `click`, `appdirs`, `joblib`, `toml`, `typing_extentions`, and `coloredlogs` and put `bin/` into your `PATH`.
|
||||
Install Docker, Python 3 with the libraries from `requirements.txt` and put `bin/` into your `PATH`.
|
||||
Then use `kupferbootstrap`.
|
||||
|
||||
## Usage
|
||||
|
|
159
config/scheme.py
Normal file
159
config/scheme.py
Normal file
|
@ -0,0 +1,159 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional, Union, Mapping, Any, get_type_hints, get_origin, get_args, Iterable
|
||||
from munch import Munch
|
||||
|
||||
|
||||
def munchclass(*args, init=False, **kwargs):
|
||||
return dataclass(*args, init=init, **kwargs)
|
||||
|
||||
|
||||
def resolve_type_hint(hint: type):
|
||||
origin = get_origin(hint)
|
||||
args: Iterable[type] = get_args(hint)
|
||||
if origin is Optional:
|
||||
args = set(list(args) + [type(None)])
|
||||
if origin in [Union, Optional]:
|
||||
results = []
|
||||
for arg in args:
|
||||
results += resolve_type_hint(arg)
|
||||
return results
|
||||
return [origin or hint]
|
||||
|
||||
|
||||
class DataClass(Munch):
|
||||
|
||||
def __init__(self, validate: bool = True, **kwargs):
|
||||
self.update(kwargs, validate=validate)
|
||||
|
||||
@classmethod
|
||||
def transform(cls, values: Mapping[str, Any], validate: bool = True) -> Any:
|
||||
results = {}
|
||||
values = dict(values)
|
||||
for key in list(values.keys()):
|
||||
value = values.pop(key)
|
||||
type_hints = cls._type_hints
|
||||
if key in type_hints:
|
||||
_classes = tuple(resolve_type_hint(type_hints[key]))
|
||||
if issubclass(_classes[0], dict):
|
||||
assert isinstance(value, dict)
|
||||
target_class = _classes[0]
|
||||
if not issubclass(_classes[0], Munch):
|
||||
target_class = DataClass
|
||||
if not isinstance(value, target_class):
|
||||
value = target_class.fromDict(value, validate=validate)
|
||||
if validate:
|
||||
if not isinstance(value, _classes):
|
||||
raise Exception(f'key "{key}" has value of wrong type {_classes}: {value}')
|
||||
elif validate:
|
||||
raise Exception(f'Unknown key "{key}"')
|
||||
else:
|
||||
if isinstance(value, dict) and not isinstance(value, Munch):
|
||||
value = DataClass.fromDict(value)
|
||||
results[key] = value
|
||||
if values:
|
||||
if validate:
|
||||
raise Exception(f'values contained unknown keys: {list(values.keys())}')
|
||||
results |= values
|
||||
|
||||
return results
|
||||
|
||||
@classmethod
|
||||
def fromDict(cls, values: Mapping[str, Any], validate: bool = True):
|
||||
return cls(**cls.transform(values, validate))
|
||||
|
||||
def update(self, d: Mapping[str, Any], validate: bool = True):
|
||||
Munch.update(self, type(self).transform(d, validate))
|
||||
|
||||
def __init_subclass__(cls):
|
||||
super().__init_subclass__()
|
||||
cls._type_hints = get_type_hints(cls)
|
||||
|
||||
|
||||
@munchclass()
|
||||
class Profile(DataClass):
|
||||
parent: str
|
||||
device: str
|
||||
flavour: str
|
||||
pkgs_include: list[str]
|
||||
pkgs_exclude: list[str]
|
||||
hostname: str
|
||||
username: str
|
||||
password: Optional[str]
|
||||
size_extra_mb: Union[str, int]
|
||||
|
||||
|
||||
@munchclass()
|
||||
class WrapperSection(DataClass):
|
||||
type: str # NOTE: rename to 'wrapper_type' if this causes problems
|
||||
|
||||
|
||||
@munchclass()
|
||||
class BuildSection(DataClass):
|
||||
ccache: bool
|
||||
clean_mode: bool
|
||||
crosscompile: bool
|
||||
crossdirect: bool
|
||||
threads: int
|
||||
|
||||
|
||||
@munchclass()
|
||||
class PkgbuildsSection(DataClass):
|
||||
git_repo: str
|
||||
git_branch: str
|
||||
|
||||
|
||||
@munchclass()
|
||||
class PacmanSection(DataClass):
|
||||
parallel_downloads: int
|
||||
check_space: bool
|
||||
repo_branch: str
|
||||
|
||||
|
||||
@munchclass()
|
||||
class PathsSection(DataClass):
|
||||
cache_dir: str
|
||||
chroots: str
|
||||
pacman: str
|
||||
packages: str
|
||||
pkgbuilds: str
|
||||
jumpdrive: str
|
||||
images: str
|
||||
|
||||
|
||||
@munchclass()
|
||||
class ProfilesSection(DataClass):
|
||||
current: str
|
||||
default: Profile
|
||||
|
||||
|
||||
@munchclass()
|
||||
class Config(DataClass):
|
||||
wrapper: WrapperSection
|
||||
build: BuildSection
|
||||
pkgbuilds: PkgbuildsSection
|
||||
pacman: PacmanSection
|
||||
paths: PathsSection
|
||||
profiles: ProfilesSection
|
||||
|
||||
@classmethod
|
||||
def fromDict(cls, values: Mapping[str, Any], validate: bool = True, allow_incomplete: bool = False):
|
||||
values = dict(values) # copy for later modification
|
||||
_vals = {}
|
||||
for name, _class in cls._type_hints.items():
|
||||
if name not in values:
|
||||
if not allow_incomplete:
|
||||
raise Exception(f'Config key {name} not found')
|
||||
continue
|
||||
value = values.pop(name)
|
||||
if not isinstance(value, _class):
|
||||
value = _class.fromDict(value, validate=validate)
|
||||
_vals[name] = value
|
||||
|
||||
if values:
|
||||
if validate:
|
||||
raise Exception(f'values contained unknown keys: {list(values.keys())}')
|
||||
_vals |= values
|
||||
|
||||
return Config(**_vals, validate=validate)
|
|
@ -5,7 +5,8 @@ from tempfile import mktemp, gettempdir as get_system_tempdir
|
|||
import toml
|
||||
from typing import Optional
|
||||
|
||||
from config import CONFIG_DEFAULTS, ConfigStateHolder, Profile
|
||||
from config import CONFIG_DEFAULTS, ConfigStateHolder, PROFILE_DEFAULTS
|
||||
from config.scheme import Config, Profile
|
||||
|
||||
|
||||
def get_filename():
|
||||
|
@ -85,14 +86,17 @@ def dict_filter_out_None(d: dict):
|
|||
return {k: v for k, v in d.items() if v is not None}
|
||||
|
||||
|
||||
def compare_to_defaults(config: dict, defaults: dict = CONFIG_DEFAULTS):
|
||||
def compare_to_defaults(config: dict, defaults: dict = CONFIG_DEFAULTS, filter_None_from_defaults: Optional[bool] = None):
|
||||
if filter_None_from_defaults is None:
|
||||
filter_None_from_defaults = not isinstance(config, Config)
|
||||
# assert sections match
|
||||
assert config.keys() == defaults.keys()
|
||||
for section, section_defaults in defaults.items():
|
||||
assert section in config
|
||||
assert isinstance(section_defaults, dict)
|
||||
# Filter out None values from defaults - they're not written unless set
|
||||
section_defaults = dict_filter_out_None(section_defaults)
|
||||
if filter_None_from_defaults:
|
||||
section_defaults = dict_filter_out_None(section_defaults)
|
||||
section_values_config = config[section]
|
||||
if section != 'profiles':
|
||||
assert section_values_config == section_defaults
|
||||
|
@ -101,8 +105,12 @@ def compare_to_defaults(config: dict, defaults: dict = CONFIG_DEFAULTS):
|
|||
assert CURRENT_KEY in section_defaults.keys()
|
||||
assert section_defaults.keys() == section_values_config.keys()
|
||||
assert section_defaults[CURRENT_KEY] == section_values_config[CURRENT_KEY]
|
||||
for key in set(section_defaults.keys()) - set([CURRENT_KEY]):
|
||||
assert dict_filter_out_None(section_defaults[key]) == section_values_config[key]
|
||||
for profile_name, profile in section_defaults.items():
|
||||
if profile_name == CURRENT_KEY:
|
||||
continue # not a profile
|
||||
if filter_None_from_defaults:
|
||||
profile = dict_filter_out_None(profile)
|
||||
assert profile == section_values_config[profile_name]
|
||||
|
||||
|
||||
def load_toml_file(path) -> dict:
|
||||
|
@ -111,6 +119,7 @@ def load_toml_file(path) -> dict:
|
|||
assert text
|
||||
return toml.loads(text)
|
||||
|
||||
|
||||
def get_path_from_stateholder(c: ConfigStateHolder):
|
||||
return c.runtime['config_file']
|
||||
|
||||
|
@ -141,8 +150,24 @@ def test_config_save_modified(configstate_emptyfile: ConfigStateHolder):
|
|||
compare_to_defaults(load_toml_file(get_path_from_stateholder(c)), defaults_modified)
|
||||
|
||||
|
||||
def test_config_scheme_defaults():
|
||||
c = Config.fromDict(CONFIG_DEFAULTS, validate=True, allow_incomplete=False)
|
||||
assert c
|
||||
compare_to_defaults(c)
|
||||
|
||||
|
||||
def test_config_scheme_modified():
|
||||
modifications = {'wrapper': {'type': 'none'}, 'build': {'crossdirect': False}}
|
||||
assert set(modifications.keys()).issubset(CONFIG_DEFAULTS.keys())
|
||||
d = {section_name: (section | modifications.get(section_name, {})) for section_name, section in CONFIG_DEFAULTS.items()}
|
||||
c = Config.fromDict(d, validate=True, allow_incomplete=False)
|
||||
assert c
|
||||
assert c.build.crossdirect is False
|
||||
assert c.wrapper.type == 'none'
|
||||
|
||||
|
||||
def test_profile():
|
||||
p = None
|
||||
p = Profile()
|
||||
p = Profile.fromDict(PROFILE_DEFAULTS)
|
||||
assert p is not None
|
||||
assert isinstance(p, dict)
|
||||
assert isinstance(p, Profile)
|
||||
|
|
|
@ -4,3 +4,4 @@ joblib==1.0.1
|
|||
toml
|
||||
typing_extensions
|
||||
coloredlogs
|
||||
munch
|
||||
|
|
Loading…
Add table
Reference in a new issue