diff --git a/config/scheme.py b/config/scheme.py index 09ece1c..a0554ad 100644 --- a/config/scheme.py +++ b/config/scheme.py @@ -81,7 +81,7 @@ class ProfilesSection(DataClass): default: SparseProfile @classmethod - def transform(cls, values: Mapping[str, Any], validate: bool = True, allow_extra: bool = True): + def transform(cls, values: Mapping[str, Any], validate: bool = True, allow_extra: bool = True, type_hints: Optional[dict[str, Any]] = None): results = {} for k, v in values.items(): if k == 'current': diff --git a/constants.py b/constants.py index b24ea30..9e829d5 100644 --- a/constants.py +++ b/constants.py @@ -26,6 +26,8 @@ BASE_PACKAGES: list[str] = BASE_LOCAL_PACKAGES + [ POST_CMDS = ['kupfer-config apply'] +REPOS_CONFIG_FILE = "repos.toml" + REPOSITORIES = [ 'boot', 'cross', diff --git a/dataclass.py b/dataclass.py index 4dd9948..53d44fc 100644 --- a/dataclass.py +++ b/dataclass.py @@ -1,11 +1,12 @@ from __future__ import annotations +import toml + from dataclasses import dataclass from munch import Munch -from typing import ClassVar, Optional, Union, Mapping, Any, get_type_hints, get_origin, get_args, GenericAlias, Iterable -from types import UnionType - -NoneType = type(None) +from toml.encoder import TomlEncoder, TomlPreserveInlineDictEncoder +from typing import ClassVar, Generator, Optional, Union, Mapping, Any, get_type_hints, get_origin, get_args, Iterable +from types import UnionType, NoneType def munchclass(*args, init=False, **kwargs): @@ -27,36 +28,92 @@ def resolve_type_hint(hint: type, ignore_origins: list[type] = []) -> Iterable[t return [origin or hint] +def flatten_hints(hints: Any) -> Generator[Any, None, None]: + if not isinstance(hints, (list, tuple)): + yield hints + return + for i in hints: + yield from flatten_hints(i) + + +def resolve_dict_hints(hints: Any) -> Generator[tuple[Any, ...], None, None]: + for hint in flatten_hints(hints): + t_origin = get_origin(hint) + t_args = get_args(hint) + if t_origin == dict: + print(f"Yielding {t_args=}") + yield t_args + continue + if t_origin in [NoneType, Optional, Union, UnionType] and t_args: + yield from resolve_dict_hints(t_args) + continue + + class DataClass(Munch): _type_hints: ClassVar[dict[str, Any]] _strip_hidden: ClassVar[bool] = False _sparse: ClassVar[bool] = False - def __init__(self, d: dict = {}, validate: bool = True, **kwargs): + def __init__(self, d: Mapping = {}, validate: bool = True, **kwargs): self.update(d | kwargs, validate=validate) @classmethod - def transform(cls, values: Mapping[str, Any], validate: bool = True, allow_extra: bool = False) -> Any: + def transform(cls, values: Mapping[str, Any], validate: bool = True, allow_extra: bool = False, type_hints: Optional[dict[str, Any]] = None) -> Any: results = {} values = dict(values) + print(f"\ntransform function:\n{values}, {type_hints=}") for key in list(values.keys()): value = values.pop(key) - type_hints = cls._type_hints + type_hints = cls._type_hints if type_hints is None else type_hints if key in type_hints: _classes = tuple[type](resolve_type_hint(type_hints[key])) optional = NoneType in _classes if issubclass(_classes[0], dict): assert isinstance(value, dict) or optional target_class = _classes[0] + if target_class in [None, NoneType, Optional]: + for target in _classes[1:]: + if target not in [None, NoneType, Optional]: + target_class = target + break if target_class is dict: - target_class = Munch + dict_hints = list(resolve_dict_hints(type_hints[key])) + print(f"Got {key=} {dict_hints=}") + if len(dict_hints) != 1: + print(f"Received wrong amount of type hints for key {key}: {len(dict_hints)}") + if len(dict_hints) == 1 and value is not None: + if len(dict_hints[0]) != 2 or not all(dict_hints[0]): + print(f"Weird dict hints received: {dict_hints}") + continue + key_type, value_type = dict_hints[0] + if not isinstance(value, Mapping): + if validate: + raise Exception(f"Got non-mapping {value!r} for expected dict type: {key_type} => {value_type}. Allowed classes: {_classes}") + print(f"Got non-mapping {value!r} for expected dict type: {key_type} => {value_type}. Allowed classes: {_classes}") + results[key] = value + continue + if isinstance(key_type, type): + if issubclass(key_type, str): + target_class = Munch + else: + print(f"{key=} DICT WRONG KEY TYPE: {key_type}") + if validate: + for k in value: + if not isinstance(k, tuple(flatten_hints(key_type))): + raise Exception(f'Subdict "{key}": wrong type for subkey "{k}": got: {type(k)}, expected: {key_type}') + dict_content_hints = {k: value_type for k in value} + print(f"tranforming: {value=} {dict_content_hints=}") + value = cls.transform(value, validate=validate, allow_extra=allow_extra, type_hints=dict_content_hints) + print(f"tranformed: {value=}") if not isinstance(value, target_class): if not (optional and value is None): assert issubclass(target_class, Munch) # despite the above assert, mypy doesn't seem to understand target_class is a Munch here kwargs = {'validate': validate} if issubclass(target_class, DataClass) else {} - value = target_class.fromDict(value, **kwargs) # type:ignore[attr-defined] + value = target_class(value, **kwargs) # type:ignore[attr-defined] + else: + print(f"nothing to do: '{key}' was already {target_class}") # handle numerics elif set(_classes).intersection([int, float]) and isinstance(value, str) and str not in _classes: parsed_number = None @@ -81,7 +138,9 @@ class DataClass(Munch): f'{" ,".join([ c.__name__ for c in _classes])}; ' f'got: {type(value).__name__}; value: {value}') elif validate and not allow_extra: - raise Exception(f'Unknown key "{key}"') + import logging + logging.debug(f"{cls}: unknown key '{key}': {value}") + raise Exception(f'{cls}: Unknown key "{key}"') else: if isinstance(value, dict) and not isinstance(value, Munch): value = Munch.fromDict(value) @@ -102,14 +161,95 @@ class DataClass(Munch): strip_hidden: Optional[bool] = None, sparse: Optional[bool] = None, ): - return strip_dict( + return self.strip_dict( self, - hints=self._type_hints, - strip_hidden=self._strip_hidden if strip_hidden is None else strip_hidden, - sparse=self._sparse if sparse is None else sparse, + strip_hidden=strip_hidden, + sparse=sparse, recursive=True, ) + @classmethod + def strip_dict( + cls, + d: dict[Any, Any], + strip_hidden: Optional[bool] = None, + sparse: Optional[bool] = None, + recursive: bool = True, + hints: Optional[dict[str, Any]] = None, + ) -> dict[Any, Any]: + # preserve original None-type args + _sparse = cls._sparse if sparse is None else sparse + _strip_hidden = cls._strip_hidden if strip_hidden is None else strip_hidden + hints = cls._type_hints if hints is None else hints + result = dict(d) + if not (_strip_hidden or _sparse or result): + print(f"shortcircuiting {d=}") + return result + print(f"Stripping {result} with hints: {hints}") + for k, v in d.items(): + type_hint = resolve_type_hint(hints.get(k, "abc")) + print(f"Working on key {k}, type hints: {type_hint}") + if not isinstance(k, str): + print(f"skipping unknown key type {k=}") + continue + if strip_hidden and k.startswith('_'): + result.pop(k) + continue + if v is None: + if NoneType not in type_hint: + msg = f'encountered illegal null value at key "{k}" for typehint {type_hint}' + if True: + raise Exception(msg) + print(msg) + if _sparse: + print(f"popping empty {k}") + result.pop(k) + continue + print(f"encountered legal null value at {k}: {_sparse=}") + if recursive and isinstance(v, dict): + if not v: + result[k] = {} + continue + if isinstance(v, DataClass): + print(f"Dataclass detected in {k=}") + result[k] = v.toDict(strip_hidden=strip_hidden, sparse=sparse) # pass None in sparse and strip_hidden + continue + if isinstance(v, Munch): + print(f"Converting munch {k=}") + result[k] = v.toDict() + if k not in hints: + print(f"skipping unknown {k=}") + continue + print(f"STRIPPING RECURSIVELY: {k}: {v}, parent hints: {hints[k]}") + _subhints = {} + _hints = resolve_type_hint(hints[k], [dict]) + hints_flat = list(flatten_hints(_hints)) + print(f"going over hints for {k}: {_hints=} {hints_flat=}") + subclass = DataClass + for hint in hints_flat: + print(f"working on hint: {hint}") + if get_origin(hint) == dict: + _valtype = get_args(hint)[1] + _subhints = {n: _valtype for n in v.keys()} + print(f"generated {_subhints=} from {_valtype=}") + break + if isinstance(hint, type) and issubclass(hint, DataClass): + subclass = hint + _subhints = hint._type_hints + print(f"found subhints: {_subhints}") + break + else: + print(f"ignoring {hint=}") + print(f"STRIPPING SUBDICT {k=} WITH {_subhints=}") + result[k] = subclass.strip_dict( + v, + hints=_subhints, + sparse=_sparse, + strip_hidden=_strip_hidden, + recursive=recursive, + ) + return result + def update(self, d: Mapping[str, Any], validate: bool = True): Munch.update(self, type(self).transform(d, validate)) @@ -118,93 +258,38 @@ class DataClass(Munch): cls._type_hints = {name: hint for name, hint in get_type_hints(cls).items() if get_origin(hint) is not ClassVar} def __repr__(self): - return f'{type(self)}{dict.__repr__(self.toDict())}' + return f'{type(self)}{dict.__repr__(dict(self))}' - def toYaml(self, strip_hidden: bool = False, sparse: bool = False, **yaml_args) -> str: + def toYAML( + self, + strip_hidden: Optional[bool] = None, + sparse: Optional[bool] = None, + **yaml_args + ) -> str: import yaml + yaml_args = {'sort_keys': False} | yaml_args return yaml.dump( self.toDict(strip_hidden=strip_hidden, sparse=sparse), **yaml_args, ) - def toToml(self, strip_hidden: bool = False, sparse: bool = False, **toml_args) -> str: - import toml + def toToml( + self, + strip_hidden: Optional[bool] = None, + sparse: Optional[bool] = None, + encoder: Optional[TomlEncoder] = TomlPreserveInlineDictEncoder() + ) -> str: return toml.dumps( self.toDict(strip_hidden=strip_hidden, sparse=sparse), - **toml_args, + encoder=encoder, ) -def flatten_hints(hints: Any) -> list[Any]: - if not isinstance(hints, (list, tuple)): - yield hints - return - for i in hints: - yield from flatten_hints(i) +class TomlInlineDict(dict, toml.decoder.InlineTableDict): + pass -def strip_dict( - d: dict[Any, Any], - hints: dict[str, Any], - strip_hidden: bool = False, - sparse: bool = False, - recursive: bool = True, -) -> dict[Any, Any]: - result = dict(d) - if not (strip_hidden or sparse or result): - print(f"shortcircuiting {d=}") - return result - print(f"Stripping {result} with hints: {hints}") - for k, v in d.items(): - if not isinstance(k, str): - print(f"skipping unknown key type {k=}") - continue - if strip_hidden and k.startswith('_'): - result.pop(k) - continue - if sparse and (v is None and NoneType in resolve_type_hint(hints.get(k, "abc"))): - print(f"popping empty {k}") - result.pop(k) - continue - if recursive and isinstance(v, dict): - if not v: - result[k] = {} - continue - if isinstance(v, DataClass): - print(f"Dataclass detected in {k=}") - result[k] = v.toDict(strip_hidden=strip_hidden, sparse=sparse) - continue - if isinstance(v, Munch): - print(f"Converting munch {k=}") - result[k] = v.toDict() - if k not in hints: - print(f"skipping unknown {k=}") - continue - print(f"STRIPPING RECURSIVELY: {k}: {v}, parent hints: {hints[k]}") - _subhints = {} - _hints = resolve_type_hint(hints[k], [dict]) - hints_flat = list(flatten_hints(_hints)) - print(f"going over hints for {k}: {_hints=} {hints_flat=}") - - for hint in hints_flat: - print(f"working on hint: {hint}") - if get_origin(hint) == dict: - _valtype = get_args(hint)[1] - _subhints = {n: _valtype for n in v.keys()} - print(f"generated {_subhints=} from {_valtype=}") - break - if isinstance(hint, type) and issubclass(hint, DataClass): - _subhints = hint._type_hints - print(f"found subhints: {_subhints}") - break - else: - print(f"ignoring {hint=}") - print(f"STRIPPING SUBDICT {k=} WITH {_subhints=}") - result[k] = strip_dict( - v, - hints=_subhints, - sparse=sparse, - strip_hidden=strip_hidden, - recursive=recursive, - ) - return result +def toml_inline_dicts(value: Any) -> Any: + if not isinstance(value, Mapping): + return value + return TomlInlineDict({k: toml_inline_dicts(v) for k, v in value.items()}) diff --git a/devices/deviceinfo.py b/devices/deviceinfo.py index 9fdd73a..c8df9cf 100644 --- a/devices/deviceinfo.py +++ b/devices/deviceinfo.py @@ -5,7 +5,7 @@ import copy import logging import os -from typing import Mapping +from typing import Any, Mapping, Optional from config.state import config from constants import Arch @@ -26,7 +26,7 @@ class DeviceInfo(DataClass): flash_method: str @classmethod - def transform(cls, values: Mapping[str, str], validate: bool = True, allow_extra: bool = True): + def transform(cls, values: Mapping[str, str], validate: bool = True, allow_extra: bool = True, type_hints: Optional[dict[str, Any]] = None): return super().transform(values, validate=validate, allow_extra=allow_extra) diff --git a/distro/distro.py b/distro/distro.py index 326056a..2861b5a 100644 --- a/distro/distro.py +++ b/distro/distro.py @@ -1,3 +1,4 @@ +from enum import IntFlag from typing import Generic, Mapping, Optional, TypeVar from constants import Arch, ARCHES, BASE_DISTROS, REPOSITORIES, KUPFER_HTTPS, CHROOT_PATHS @@ -5,6 +6,14 @@ from generator import generate_pacman_conf_body from config.state import config from .repo import BinaryPackageType, RepoInfo, Repo, LocalRepo, RemoteRepo +from .repo_config import AbstrRepoConfig, BaseDistro, ReposConfigFile, REPOS_CONFIG_DEFAULT, get_repo_config as _get_repo_config + + +class DistroLocation(IntFlag): + REMOTE = 0 + LOCAL = 1 + CHROOT = 3 + RepoType = TypeVar('RepoType', bound=Repo) @@ -72,11 +81,6 @@ class RemoteDistro(Distro[RemoteRepo]): return RemoteRepo(**kwargs) -def get_base_distro(arch: str, scan: bool = False) -> RemoteDistro: - repos = {name: RepoInfo(url_template=url) for name, url in BASE_DISTROS[arch]['repos'].items()} - return RemoteDistro(arch=arch, repo_infos=repos, scan=scan) - - def get_kupfer(arch: str, url_template: str, scan: bool = False) -> Distro: repos = {name: RepoInfo(url_template=url_template, options={'SigLevel': 'Never'}) for name in REPOSITORIES} remote = not url_template.startswith('file://') @@ -90,9 +94,16 @@ def get_kupfer(arch: str, url_template: str, scan: bool = False) -> Distro: return distro -_kupfer_https = dict[Arch, RemoteDistro]() -_kupfer_local = dict[Arch, LocalDistro]() -_kupfer_local_chroots = dict[Arch, LocalDistro]() +_kupfer_https: dict[Arch, RemoteDistro] = {} +_kupfer_local: dict[Arch, LocalDistro] = {} +_kupfer_local_chroots: dict[Arch, LocalDistro] = {} + + +def reset_distro_caches(): + global _kupfer_https, _kupfer_local, _kupfer_local_chroots + for cache in _kupfer_https, _kupfer_local, _kupfer_local_chroots: + assert isinstance(cache, dict) + cache.clear() def get_kupfer_url(url: str = KUPFER_HTTPS, branch: Optional[str] = None) -> str: @@ -101,29 +112,103 @@ def get_kupfer_url(url: str = KUPFER_HTTPS, branch: Optional[str] = None) -> str return url.replace('%branch%', branch) -def get_kupfer_https(arch: Arch, scan: bool = False) -> RemoteDistro: - global _kupfer_https - if arch not in _kupfer_https or not _kupfer_https[arch]: - kupfer = get_kupfer(arch, get_kupfer_url(), scan) - assert isinstance(kupfer, RemoteDistro) - _kupfer_https[arch] = kupfer - item = _kupfer_https[arch] +def get_repo_config(*args, **kwargs) -> ReposConfigFile: + repo_config, changed = _get_repo_config(*args, **kwargs) + if changed: + reset_distro_caches() + return repo_config + + +def get_kupfer_repo_names(local) -> list[str]: + configs = get_repo_config() + results = [] + for repo, repo_config in configs.repos.items(): + if not local and repo_config.local_only: + continue + results.append(repo) + return results + + +def get_RepoInfo(arch: Arch, repo_config: AbstrRepoConfig, default_url: Optional[str]) -> RepoInfo: + url = repo_config.remote_url or default_url + if isinstance(url, dict): + url = url.get(arch, default_url) + assert url + return RepoInfo( + url_template=url, + options=repo_config.get('options', None) or {}, + ) + + +def get_base_distro(arch: Arch, scan: bool = False) -> RemoteDistro: + base_distros = get_repo_config().base_distros + if base_distros is None or arch not in base_distros: + base_distros = REPOS_CONFIG_DEFAULT.base_distros + assert base_distros + distro: BaseDistro + distro = base_distros.get(arch) # type: ignore[assignment] + repos = {} + for repo, repo_config in distro.repos.items(): + repos[repo] = get_RepoInfo(arch, repo_config, default_url=distro.remote_url) + + return RemoteDistro(arch=arch, repo_infos=repos, scan=scan) + + +def get_kupfer_distro( + arch: Arch, + location: DistroLocation, + scan: bool = False, +) -> Distro: + global _kupfer_https, _kupfer_local, _kupfer_local_chroots + cls: type[Distro] + cache: Mapping[str, Distro] + repo_config = get_repo_config() + if location == DistroLocation.REMOTE: + cache = _kupfer_https + default_url = get_kupfer_url(repo_config.remote_url or KUPFER_HTTPS) + repos = {repo: get_RepoInfo(arch, conf, default_url) for repo, conf in repo_config.repos.items() if not conf.local_only} + cls = RemoteDistro + elif location in [DistroLocation.CHROOT, DistroLocation.LOCAL]: + cache = _kupfer_local_chroots + pkgdir = CHROOT_PATHS['packages'] if location == DistroLocation.CHROOT else config.get_path('packages') + default_url = f"file://{pkgdir}/$arch/$repo" + cls = LocalDistro + repos = {} + for name, repo in repo_config.repos.items(): + repo = repo.copy() + repo.remote_url = default_url + repos[name] = get_RepoInfo(arch, repo, default_url) + else: + raise Exception(f"Unknown location {location}") + if cache is None: + cache = {} + assert arch + assert isinstance(cache, dict) + if arch not in cache or not cache[arch]: + distro = cls( + arch=arch, + repo_infos=repos, + scan=scan, + ) + assert isinstance(distro, (LocalDistro, RemoteDistro)) + return distro + cache[arch] = distro + item: Distro = cache[arch] if scan and not item.is_scanned(): item.scan() return item +def get_kupfer_https(arch: Arch, scan: bool = False) -> RemoteDistro: + d = get_kupfer_distro(arch, location=DistroLocation.REMOTE, scan=scan) + assert isinstance(d, RemoteDistro) + return d + + def get_kupfer_local(arch: Optional[Arch] = None, in_chroot: bool = True, scan: bool = False) -> LocalDistro: - global _kupfer_local, _kupfer_local_chroots - cache = _kupfer_local_chroots if in_chroot else _kupfer_local arch = arch or config.runtime.arch assert arch - if arch not in cache or not cache[arch]: - dir = CHROOT_PATHS['packages'] if in_chroot else config.get_path('packages') - kupfer = get_kupfer(arch, f"file://{dir}/$arch/$repo") - assert isinstance(kupfer, LocalDistro) - cache[arch] = kupfer - item = cache[arch] - if scan and not item.is_scanned(): - item.scan() - return item + d = get_kupfer_distro(arch, location=DistroLocation.CHROOT if in_chroot else DistroLocation.LOCAL, scan=scan) + assert isinstance(d, LocalDistro) + return d + diff --git a/distro/repo_config.py b/distro/repo_config.py new file mode 100644 index 0000000..655aca5 --- /dev/null +++ b/distro/repo_config.py @@ -0,0 +1,184 @@ +from __future__ import annotations + +import logging +import os +import toml +import yaml + +from copy import deepcopy +from typing import Any, ClassVar, Optional, Mapping + +from config.state import config +from constants import Arch, BASE_DISTROS, KUPFER_HTTPS, REPOS_CONFIG_FILE, REPOSITORIES +from dataclass import DataClass, Munch, toml_inline_dicts, TomlEncoder, TomlPreserveInlineDictEncoder +from utils import sha256sum + + +REPOS_KEY = 'repos' +NAME_KEY = 'name' +REMOTEURL_KEY = 'remote_url' +LOCALONLY_KEY = 'local_only' +OPTIONS_KEY = 'options' +BASEDISTROS_KEY = 'base_distros' + +_current_config: Optional[ReposConfigFile] + + +class AbstrRepoConfig(DataClass): + options: Optional[dict[str, str]] + _strip_hidden: ClassVar[bool] = True + _sparse: ClassVar[bool] = True + + +class BaseDistroRepo(AbstrRepoConfig): + remote_url: Optional[str] + + +class RepoConfig(AbstrRepoConfig): + name: str + remote_url: Optional[str | dict[Arch, str]] + local_only: Optional[bool] + + +class BaseDistro(DataClass): + remote_url: Optional[str] + repos: dict[str, BaseDistroRepo] + + +class ReposConfigFile(DataClass): + remote_url: Optional[str] + repos: dict[str, RepoConfig] + base_distros: dict[Arch, BaseDistro] + _path: Optional[str] + _checksum: Optional[str] + _strip_hidden: ClassVar[bool] = True + _sparse: ClassVar[bool] = True + + def __init__(self, d, **kwargs): + remote_url = d.get(REMOTEURL_KEY, None) + super().__init__(d=d, **kwargs) + for repo_cls, defaults, repos in [ + (RepoConfig, REPO_DEFAULTS, self.get(REPOS_KEY, {})), + *[(BaseDistroRepo, BASE_DISTRO_DEFAULTS, _distro.repos) for _distro in self.base_distros.values()], + ]: + if repos is None: + continue + for name, repo in repos.items(): + _repo = defaults | (repo or {}) # type: ignore[operator] + if REMOTEURL_KEY not in repo and not repo.get(LOCALONLY_KEY, None): + _repo[REMOTEURL_KEY] = remote_url + repos[name] = repo_cls(_repo, **kwargs) + # self.repos = repos + + def toDict(self, strip_hidden: Optional[bool] = None, sparse: Optional[bool] = None): + d = super().toDict(strip_hidden=strip_hidden, sparse=sparse) + if REPOS_KEY in d: + for v in d[REPOS_KEY].values(): + if isinstance(v, dict) and NAME_KEY in v: + v.pop(NAME_KEY) + return d + + @staticmethod + def parse_config(path: str) -> ReposConfigFile: + try: + with open(path, 'r') as fd: + data = toml.load(fd) + data['_path'] = path + data['_checksum'] = sha256sum(path) + return ReposConfigFile(data, validate=True) + except Exception as ex: + logging.error(f'Error parsing repos config at "{path}":\n{ex}') + raise ex + + def toToml(self, strip_hidden=None, sparse=None, encoder=TomlPreserveInlineDictEncoder()): + d = self.toDict(strip_hidden=strip_hidden, sparse=sparse) + for key in [REPOS_KEY, ]: + if key not in d or not isinstance(d[key], Mapping): + continue + inline = {name: {k: toml_inline_dicts(v) for k, v in value.items()} for name, value in d[key].items()} + logging.info(f"Inlined {key}: {inline}") + d[key] = inline + return toml.dumps(d, encoder=encoder) + + def toToml_old(self, **kwargs): + """Dumps specific TOML format, kwargs are ignored.""" + def toml_line(k, v): + assert isinstance(k, str) + if isinstance(v, dict): + assert isinstance(v, Munch) + return f'{k} = ' + v.toYAML(default_flow_style=True).strip('\n') + #return toml.dumps({k: (v if not isinstance(v, dict) else toml.}, encoder=toml.encoder.TomlPreserveInlineDictEncoder).strip('\n') + + res = '' + for k in self.keys(): + if k == REPOS_KEY: + continue + res + + for k, v in self.repos.items(): + res += f"[repos.{k}]\n" + for subk, subv in v.items(): + res += toml_line(subk, subv) + '\n' + res += '\n' + return res + + +REPO_DEFAULTS = { + LOCALONLY_KEY: None, + REMOTEURL_KEY: None, + OPTIONS_KEY: {'SigLevel': 'Never'} +} + +BASE_DISTRO_DEFAULTS = { + REMOTEURL_KEY: None, + OPTIONS_KEY: None, +} + +REPOS_CONFIG_DEFAULT = ReposConfigFile({ + REMOTEURL_KEY: KUPFER_HTTPS, + REPOS_KEY: { + 'local': REPO_DEFAULTS | {LOCALONLY_KEY: True}, + **{r: deepcopy(REPO_DEFAULTS) for r in REPOSITORIES}, + }, + BASEDISTROS_KEY: { + arch: { + 'repos': {k: {'remote_url': v} for k, v in arch_def['repos'].items()}, + } + for arch, arch_def in BASE_DISTROS.items() + }, +}) + + +def get_repo_config( + initialize_pkgbuilds: bool = False, + repo_config_file: Optional[str] = None, +) -> tuple[ReposConfigFile, bool]: + global _current_config + pkgbuilds_dir = config.get_path('pkgbuilds') + repo_config_file_default = os.path.join(pkgbuilds_dir, REPOS_CONFIG_FILE) + if repo_config_file is None: + repo_config_file_path = repo_config_file_default + else: + repo_config_file_path = repo_config_file + if not os.path.exists(repo_config_file_path): + if repo_config_file is not None: + raise Exception(f"Requested repo config {repo_config_file} doesn't exist") + if not initialize_pkgbuilds: + logging.warning(f"{repo_config_file_path} doesn't exist, using default Repositories") + return deepcopy(REPOS_CONFIG_DEFAULT), False + from packages.pkgbuild import init_pkgbuilds + init_pkgbuilds() + return get_repo_config(initialize_pkgbuilds=False, repo_config_file=repo_config_file_path) + conf = _current_config + changed = False + if not conf or conf._path != repo_config_file_path or conf._checksum != sha256sum(repo_config_file_path): + conf = ReposConfigFile.parse_config(repo_config_file_path) + if repo_config_file_path == repo_config_file_default: + _current_config = conf + changed = True + return conf, changed + + +def get_repos(**kwargs) -> list[RepoConfig]: + config, _ = get_repo_config(**kwargs) + return list(config.repos.values()) diff --git a/packages/build.py b/packages/build.py index 6de2e40..532b690 100644 --- a/packages/build.py +++ b/packages/build.py @@ -10,12 +10,12 @@ from urllib.error import HTTPError from typing import Iterable, Iterator, Optional from binfmt import register as binfmt_register, binfmt_is_registered -from constants import REPOSITORIES, CROSSDIRECT_PKGS, QEMU_BINFMT_PKGS, GCC_HOSTSPECS, ARCHES, Arch, CHROOT_PATHS, MAKEPKG_CMD +from constants import CROSSDIRECT_PKGS, QEMU_BINFMT_PKGS, GCC_HOSTSPECS, ARCHES, Arch, CHROOT_PATHS, MAKEPKG_CMD from config.state import config from exec.cmd import run_cmd, run_root_cmd from exec.file import makedir, remove_file, symlink from chroot.build import get_build_chroot, BuildChroot -from distro.distro import get_kupfer_https, get_kupfer_local +from distro.distro import get_kupfer_https, get_kupfer_local, get_kupfer_repo_names from distro.package import RemotePackage, LocalPackage from distro.repo import LocalRepo from progressbar import BAR_PADDING, get_levels_bar @@ -84,7 +84,7 @@ def init_prebuilts(arch: Arch): """Ensure that all `constants.REPOSITORIES` inside `dir` exist""" prebuilts_dir = config.get_path('packages') makedir(prebuilts_dir) - for repo in REPOSITORIES: + for repo in get_kupfer_repo_names(local=True): init_local_repo(repo, arch) diff --git a/packages/cli.py b/packages/cli.py index 1ac93db..5c17ad9 100644 --- a/packages/cli.py +++ b/packages/cli.py @@ -7,11 +7,11 @@ from glob import glob from typing import Iterable, Optional from config.state import config -from constants import Arch, ARCHES, REPOSITORIES, SRCINFO_FILE, SRCINFO_INITIALISED_FILE, SRCINFO_METADATA_FILE, SRCINFO_TARBALL_FILE, SRCINFO_TARBALL_URL +from constants import Arch, ARCHES, SRCINFO_FILE, SRCINFO_INITIALISED_FILE, SRCINFO_METADATA_FILE, SRCINFO_TARBALL_FILE, SRCINFO_TARBALL_URL from exec.cmd import run_cmd, shell_quote, CompletedProcess from exec.file import get_temp_dir, makedir, remove_file from devices.device import get_profile_device -from distro.distro import get_kupfer_local, get_kupfer_url +from distro.distro import get_kupfer_local, get_kupfer_url, get_kupfer_repo_names from distro.package import LocalPackage from net.ssh import run_ssh_command, scp_put_files from utils import download_file, git, sha256sum @@ -269,7 +269,7 @@ def cmd_clean(what: Iterable[str] = ['all'], force: bool = False, noop: bool = F [ 'clean', '-dffX' + ('n' if noop else ''), - ] + REPOSITORIES, + ] + get_kupfer_repo_names(local=True), dir=pkgbuilds, ) if result.returncode != 0: @@ -301,7 +301,7 @@ def cmd_clean(what: Iterable[str] = ['all'], force: bool = False, noop: bool = F @cmd_packages.command(name='list') def cmd_list(): "List information about available source packages (PKGBUILDs)" - pkgdir = os.path.join(config.get_path('pkgbuilds'), REPOSITORIES[0]) + pkgdir = os.path.join(config.get_path('pkgbuilds'), get_kupfer_repo_names(local=False)[0]) if not os.path.exists(pkgdir): raise Exception(f"PKGBUILDs seem not to be initialised yet: {pkgdir} doesn't exist!\n" f"Try running `kupferbootstrap packages init` first!") diff --git a/packages/pkgbuild.py b/packages/pkgbuild.py index dd2e5c4..a2d13e5 100644 --- a/packages/pkgbuild.py +++ b/packages/pkgbuild.py @@ -9,8 +9,8 @@ from joblib import Parallel, delayed from typing import Iterable, Optional, TypeAlias from config.state import config, ConfigStateHolder -from constants import REPOSITORIES from constants import Arch +from distro.distro import get_kupfer_repo_names from distro.package import PackageInfo from exec.file import remove_file from logger import setup_logging @@ -439,8 +439,13 @@ def get_pkgbuild_dirs(quiet: bool = True, repositories: Optional[list[str]] = No """Gets the relative paths to directories containing PKGBUILDs, optionally warns about dirs without a PKGBUILD""" pkgbuilds_dir = config.get_path('pkgbuilds') paths = [] - for repo in repositories or REPOSITORIES: - for dir in os.listdir(os.path.join(pkgbuilds_dir, repo)): + for repo in repositories or get_kupfer_repo_names(local=True): + path = os.path.join(pkgbuilds_dir, repo) + if not os.path.exists(path): + if not quiet: + logging.warning(f'repo "{repo}" can\'t be listed: "{path}" doesn\'t exist; skipping') + continue + for dir in os.listdir(path): p = os.path.join(repo, dir) if not os.path.exists(os.path.join(pkgbuilds_dir, p, 'PKGBUILD')): if not quiet: diff --git a/packages/srcinfo_cache.py b/packages/srcinfo_cache.py index fb36c1b..112591f 100644 --- a/packages/srcinfo_cache.py +++ b/packages/srcinfo_cache.py @@ -20,10 +20,12 @@ class JsonFile(DataClass): _filename: ClassVar[str] _relative_path: str + _strip_hidden: ClassVar[bool] = True + _sparse: ClassVar[bool] = False def toJSON(self) -> str: 'Returns a json representation, with private keys that start with "_" filtered out' - return json.dumps({key: val for key, val in self.toDict().items() if not key.startswith('_')}, indent=2) + return json.dumps(self.toDict(), indent=2) def write(self): 'Write the filtered json representation to disk'