TEMP: WIP: add repo_config

TEMP cause it spams a bunch of prints in dataclass handling
This commit is contained in:
InsanePrawn 2023-03-27 09:05:30 +02:00
parent 72f4d4948e
commit ff1c31e157
10 changed files with 493 additions and 130 deletions

View file

@ -81,7 +81,7 @@ class ProfilesSection(DataClass):
default: SparseProfile default: SparseProfile
@classmethod @classmethod
def transform(cls, values: Mapping[str, Any], validate: bool = True, allow_extra: bool = True): def transform(cls, values: Mapping[str, Any], validate: bool = True, allow_extra: bool = True, type_hints: Optional[dict[str, Any]] = None):
results = {} results = {}
for k, v in values.items(): for k, v in values.items():
if k == 'current': if k == 'current':

View file

@ -26,6 +26,8 @@ BASE_PACKAGES: list[str] = BASE_LOCAL_PACKAGES + [
POST_CMDS = ['kupfer-config apply'] POST_CMDS = ['kupfer-config apply']
REPOS_CONFIG_FILE = "repos.toml"
REPOSITORIES = [ REPOSITORIES = [
'boot', 'boot',
'cross', 'cross',

View file

@ -1,11 +1,12 @@
from __future__ import annotations from __future__ import annotations
import toml
from dataclasses import dataclass from dataclasses import dataclass
from munch import Munch from munch import Munch
from typing import ClassVar, Optional, Union, Mapping, Any, get_type_hints, get_origin, get_args, GenericAlias, Iterable from toml.encoder import TomlEncoder, TomlPreserveInlineDictEncoder
from types import UnionType from typing import ClassVar, Generator, Optional, Union, Mapping, Any, get_type_hints, get_origin, get_args, Iterable
from types import UnionType, NoneType
NoneType = type(None)
def munchclass(*args, init=False, **kwargs): def munchclass(*args, init=False, **kwargs):
@ -27,36 +28,92 @@ def resolve_type_hint(hint: type, ignore_origins: list[type] = []) -> Iterable[t
return [origin or hint] return [origin or hint]
def flatten_hints(hints: Any) -> Generator[Any, None, None]:
if not isinstance(hints, (list, tuple)):
yield hints
return
for i in hints:
yield from flatten_hints(i)
def resolve_dict_hints(hints: Any) -> Generator[tuple[Any, ...], None, None]:
for hint in flatten_hints(hints):
t_origin = get_origin(hint)
t_args = get_args(hint)
if t_origin == dict:
print(f"Yielding {t_args=}")
yield t_args
continue
if t_origin in [NoneType, Optional, Union, UnionType] and t_args:
yield from resolve_dict_hints(t_args)
continue
class DataClass(Munch): class DataClass(Munch):
_type_hints: ClassVar[dict[str, Any]] _type_hints: ClassVar[dict[str, Any]]
_strip_hidden: ClassVar[bool] = False _strip_hidden: ClassVar[bool] = False
_sparse: ClassVar[bool] = False _sparse: ClassVar[bool] = False
def __init__(self, d: dict = {}, validate: bool = True, **kwargs): def __init__(self, d: Mapping = {}, validate: bool = True, **kwargs):
self.update(d | kwargs, validate=validate) self.update(d | kwargs, validate=validate)
@classmethod @classmethod
def transform(cls, values: Mapping[str, Any], validate: bool = True, allow_extra: bool = False) -> Any: def transform(cls, values: Mapping[str, Any], validate: bool = True, allow_extra: bool = False, type_hints: Optional[dict[str, Any]] = None) -> Any:
results = {} results = {}
values = dict(values) values = dict(values)
print(f"\ntransform function:\n{values}, {type_hints=}")
for key in list(values.keys()): for key in list(values.keys()):
value = values.pop(key) value = values.pop(key)
type_hints = cls._type_hints type_hints = cls._type_hints if type_hints is None else type_hints
if key in type_hints: if key in type_hints:
_classes = tuple[type](resolve_type_hint(type_hints[key])) _classes = tuple[type](resolve_type_hint(type_hints[key]))
optional = NoneType in _classes optional = NoneType in _classes
if issubclass(_classes[0], dict): if issubclass(_classes[0], dict):
assert isinstance(value, dict) or optional assert isinstance(value, dict) or optional
target_class = _classes[0] target_class = _classes[0]
if target_class in [None, NoneType, Optional]:
for target in _classes[1:]:
if target not in [None, NoneType, Optional]:
target_class = target
break
if target_class is dict: if target_class is dict:
target_class = Munch dict_hints = list(resolve_dict_hints(type_hints[key]))
print(f"Got {key=} {dict_hints=}")
if len(dict_hints) != 1:
print(f"Received wrong amount of type hints for key {key}: {len(dict_hints)}")
if len(dict_hints) == 1 and value is not None:
if len(dict_hints[0]) != 2 or not all(dict_hints[0]):
print(f"Weird dict hints received: {dict_hints}")
continue
key_type, value_type = dict_hints[0]
if not isinstance(value, Mapping):
if validate:
raise Exception(f"Got non-mapping {value!r} for expected dict type: {key_type} => {value_type}. Allowed classes: {_classes}")
print(f"Got non-mapping {value!r} for expected dict type: {key_type} => {value_type}. Allowed classes: {_classes}")
results[key] = value
continue
if isinstance(key_type, type):
if issubclass(key_type, str):
target_class = Munch
else:
print(f"{key=} DICT WRONG KEY TYPE: {key_type}")
if validate:
for k in value:
if not isinstance(k, tuple(flatten_hints(key_type))):
raise Exception(f'Subdict "{key}": wrong type for subkey "{k}": got: {type(k)}, expected: {key_type}')
dict_content_hints = {k: value_type for k in value}
print(f"tranforming: {value=} {dict_content_hints=}")
value = cls.transform(value, validate=validate, allow_extra=allow_extra, type_hints=dict_content_hints)
print(f"tranformed: {value=}")
if not isinstance(value, target_class): if not isinstance(value, target_class):
if not (optional and value is None): if not (optional and value is None):
assert issubclass(target_class, Munch) assert issubclass(target_class, Munch)
# despite the above assert, mypy doesn't seem to understand target_class is a Munch here # despite the above assert, mypy doesn't seem to understand target_class is a Munch here
kwargs = {'validate': validate} if issubclass(target_class, DataClass) else {} kwargs = {'validate': validate} if issubclass(target_class, DataClass) else {}
value = target_class.fromDict(value, **kwargs) # type:ignore[attr-defined] value = target_class(value, **kwargs) # type:ignore[attr-defined]
else:
print(f"nothing to do: '{key}' was already {target_class}")
# handle numerics # handle numerics
elif set(_classes).intersection([int, float]) and isinstance(value, str) and str not in _classes: elif set(_classes).intersection([int, float]) and isinstance(value, str) and str not in _classes:
parsed_number = None parsed_number = None
@ -81,7 +138,9 @@ class DataClass(Munch):
f'{" ,".join([ c.__name__ for c in _classes])}; ' f'{" ,".join([ c.__name__ for c in _classes])}; '
f'got: {type(value).__name__}; value: {value}') f'got: {type(value).__name__}; value: {value}')
elif validate and not allow_extra: elif validate and not allow_extra:
raise Exception(f'Unknown key "{key}"') import logging
logging.debug(f"{cls}: unknown key '{key}': {value}")
raise Exception(f'{cls}: Unknown key "{key}"')
else: else:
if isinstance(value, dict) and not isinstance(value, Munch): if isinstance(value, dict) and not isinstance(value, Munch):
value = Munch.fromDict(value) value = Munch.fromDict(value)
@ -102,14 +161,95 @@ class DataClass(Munch):
strip_hidden: Optional[bool] = None, strip_hidden: Optional[bool] = None,
sparse: Optional[bool] = None, sparse: Optional[bool] = None,
): ):
return strip_dict( return self.strip_dict(
self, self,
hints=self._type_hints, strip_hidden=strip_hidden,
strip_hidden=self._strip_hidden if strip_hidden is None else strip_hidden, sparse=sparse,
sparse=self._sparse if sparse is None else sparse,
recursive=True, recursive=True,
) )
@classmethod
def strip_dict(
cls,
d: dict[Any, Any],
strip_hidden: Optional[bool] = None,
sparse: Optional[bool] = None,
recursive: bool = True,
hints: Optional[dict[str, Any]] = None,
) -> dict[Any, Any]:
# preserve original None-type args
_sparse = cls._sparse if sparse is None else sparse
_strip_hidden = cls._strip_hidden if strip_hidden is None else strip_hidden
hints = cls._type_hints if hints is None else hints
result = dict(d)
if not (_strip_hidden or _sparse or result):
print(f"shortcircuiting {d=}")
return result
print(f"Stripping {result} with hints: {hints}")
for k, v in d.items():
type_hint = resolve_type_hint(hints.get(k, "abc"))
print(f"Working on key {k}, type hints: {type_hint}")
if not isinstance(k, str):
print(f"skipping unknown key type {k=}")
continue
if strip_hidden and k.startswith('_'):
result.pop(k)
continue
if v is None:
if NoneType not in type_hint:
msg = f'encountered illegal null value at key "{k}" for typehint {type_hint}'
if True:
raise Exception(msg)
print(msg)
if _sparse:
print(f"popping empty {k}")
result.pop(k)
continue
print(f"encountered legal null value at {k}: {_sparse=}")
if recursive and isinstance(v, dict):
if not v:
result[k] = {}
continue
if isinstance(v, DataClass):
print(f"Dataclass detected in {k=}")
result[k] = v.toDict(strip_hidden=strip_hidden, sparse=sparse) # pass None in sparse and strip_hidden
continue
if isinstance(v, Munch):
print(f"Converting munch {k=}")
result[k] = v.toDict()
if k not in hints:
print(f"skipping unknown {k=}")
continue
print(f"STRIPPING RECURSIVELY: {k}: {v}, parent hints: {hints[k]}")
_subhints = {}
_hints = resolve_type_hint(hints[k], [dict])
hints_flat = list(flatten_hints(_hints))
print(f"going over hints for {k}: {_hints=} {hints_flat=}")
subclass = DataClass
for hint in hints_flat:
print(f"working on hint: {hint}")
if get_origin(hint) == dict:
_valtype = get_args(hint)[1]
_subhints = {n: _valtype for n in v.keys()}
print(f"generated {_subhints=} from {_valtype=}")
break
if isinstance(hint, type) and issubclass(hint, DataClass):
subclass = hint
_subhints = hint._type_hints
print(f"found subhints: {_subhints}")
break
else:
print(f"ignoring {hint=}")
print(f"STRIPPING SUBDICT {k=} WITH {_subhints=}")
result[k] = subclass.strip_dict(
v,
hints=_subhints,
sparse=_sparse,
strip_hidden=_strip_hidden,
recursive=recursive,
)
return result
def update(self, d: Mapping[str, Any], validate: bool = True): def update(self, d: Mapping[str, Any], validate: bool = True):
Munch.update(self, type(self).transform(d, validate)) Munch.update(self, type(self).transform(d, validate))
@ -118,93 +258,38 @@ class DataClass(Munch):
cls._type_hints = {name: hint for name, hint in get_type_hints(cls).items() if get_origin(hint) is not ClassVar} cls._type_hints = {name: hint for name, hint in get_type_hints(cls).items() if get_origin(hint) is not ClassVar}
def __repr__(self): def __repr__(self):
return f'{type(self)}{dict.__repr__(self.toDict())}' return f'{type(self)}{dict.__repr__(dict(self))}'
def toYaml(self, strip_hidden: bool = False, sparse: bool = False, **yaml_args) -> str: def toYAML(
self,
strip_hidden: Optional[bool] = None,
sparse: Optional[bool] = None,
**yaml_args
) -> str:
import yaml import yaml
yaml_args = {'sort_keys': False} | yaml_args
return yaml.dump( return yaml.dump(
self.toDict(strip_hidden=strip_hidden, sparse=sparse), self.toDict(strip_hidden=strip_hidden, sparse=sparse),
**yaml_args, **yaml_args,
) )
def toToml(self, strip_hidden: bool = False, sparse: bool = False, **toml_args) -> str: def toToml(
import toml self,
strip_hidden: Optional[bool] = None,
sparse: Optional[bool] = None,
encoder: Optional[TomlEncoder] = TomlPreserveInlineDictEncoder()
) -> str:
return toml.dumps( return toml.dumps(
self.toDict(strip_hidden=strip_hidden, sparse=sparse), self.toDict(strip_hidden=strip_hidden, sparse=sparse),
**toml_args, encoder=encoder,
) )
def flatten_hints(hints: Any) -> list[Any]: class TomlInlineDict(dict, toml.decoder.InlineTableDict):
if not isinstance(hints, (list, tuple)): pass
yield hints
return
for i in hints:
yield from flatten_hints(i)
def strip_dict( def toml_inline_dicts(value: Any) -> Any:
d: dict[Any, Any], if not isinstance(value, Mapping):
hints: dict[str, Any], return value
strip_hidden: bool = False, return TomlInlineDict({k: toml_inline_dicts(v) for k, v in value.items()})
sparse: bool = False,
recursive: bool = True,
) -> dict[Any, Any]:
result = dict(d)
if not (strip_hidden or sparse or result):
print(f"shortcircuiting {d=}")
return result
print(f"Stripping {result} with hints: {hints}")
for k, v in d.items():
if not isinstance(k, str):
print(f"skipping unknown key type {k=}")
continue
if strip_hidden and k.startswith('_'):
result.pop(k)
continue
if sparse and (v is None and NoneType in resolve_type_hint(hints.get(k, "abc"))):
print(f"popping empty {k}")
result.pop(k)
continue
if recursive and isinstance(v, dict):
if not v:
result[k] = {}
continue
if isinstance(v, DataClass):
print(f"Dataclass detected in {k=}")
result[k] = v.toDict(strip_hidden=strip_hidden, sparse=sparse)
continue
if isinstance(v, Munch):
print(f"Converting munch {k=}")
result[k] = v.toDict()
if k not in hints:
print(f"skipping unknown {k=}")
continue
print(f"STRIPPING RECURSIVELY: {k}: {v}, parent hints: {hints[k]}")
_subhints = {}
_hints = resolve_type_hint(hints[k], [dict])
hints_flat = list(flatten_hints(_hints))
print(f"going over hints for {k}: {_hints=} {hints_flat=}")
for hint in hints_flat:
print(f"working on hint: {hint}")
if get_origin(hint) == dict:
_valtype = get_args(hint)[1]
_subhints = {n: _valtype for n in v.keys()}
print(f"generated {_subhints=} from {_valtype=}")
break
if isinstance(hint, type) and issubclass(hint, DataClass):
_subhints = hint._type_hints
print(f"found subhints: {_subhints}")
break
else:
print(f"ignoring {hint=}")
print(f"STRIPPING SUBDICT {k=} WITH {_subhints=}")
result[k] = strip_dict(
v,
hints=_subhints,
sparse=sparse,
strip_hidden=strip_hidden,
recursive=recursive,
)
return result

View file

@ -5,7 +5,7 @@ import copy
import logging import logging
import os import os
from typing import Mapping from typing import Any, Mapping, Optional
from config.state import config from config.state import config
from constants import Arch from constants import Arch
@ -26,7 +26,7 @@ class DeviceInfo(DataClass):
flash_method: str flash_method: str
@classmethod @classmethod
def transform(cls, values: Mapping[str, str], validate: bool = True, allow_extra: bool = True): def transform(cls, values: Mapping[str, str], validate: bool = True, allow_extra: bool = True, type_hints: Optional[dict[str, Any]] = None):
return super().transform(values, validate=validate, allow_extra=allow_extra) return super().transform(values, validate=validate, allow_extra=allow_extra)

View file

@ -1,3 +1,4 @@
from enum import IntFlag
from typing import Generic, Mapping, Optional, TypeVar from typing import Generic, Mapping, Optional, TypeVar
from constants import Arch, ARCHES, BASE_DISTROS, REPOSITORIES, KUPFER_HTTPS, CHROOT_PATHS from constants import Arch, ARCHES, BASE_DISTROS, REPOSITORIES, KUPFER_HTTPS, CHROOT_PATHS
@ -5,6 +6,14 @@ from generator import generate_pacman_conf_body
from config.state import config from config.state import config
from .repo import BinaryPackageType, RepoInfo, Repo, LocalRepo, RemoteRepo from .repo import BinaryPackageType, RepoInfo, Repo, LocalRepo, RemoteRepo
from .repo_config import AbstrRepoConfig, BaseDistro, ReposConfigFile, REPOS_CONFIG_DEFAULT, get_repo_config as _get_repo_config
class DistroLocation(IntFlag):
REMOTE = 0
LOCAL = 1
CHROOT = 3
RepoType = TypeVar('RepoType', bound=Repo) RepoType = TypeVar('RepoType', bound=Repo)
@ -72,11 +81,6 @@ class RemoteDistro(Distro[RemoteRepo]):
return RemoteRepo(**kwargs) return RemoteRepo(**kwargs)
def get_base_distro(arch: str, scan: bool = False) -> RemoteDistro:
repos = {name: RepoInfo(url_template=url) for name, url in BASE_DISTROS[arch]['repos'].items()}
return RemoteDistro(arch=arch, repo_infos=repos, scan=scan)
def get_kupfer(arch: str, url_template: str, scan: bool = False) -> Distro: def get_kupfer(arch: str, url_template: str, scan: bool = False) -> Distro:
repos = {name: RepoInfo(url_template=url_template, options={'SigLevel': 'Never'}) for name in REPOSITORIES} repos = {name: RepoInfo(url_template=url_template, options={'SigLevel': 'Never'}) for name in REPOSITORIES}
remote = not url_template.startswith('file://') remote = not url_template.startswith('file://')
@ -90,9 +94,16 @@ def get_kupfer(arch: str, url_template: str, scan: bool = False) -> Distro:
return distro return distro
_kupfer_https = dict[Arch, RemoteDistro]() _kupfer_https: dict[Arch, RemoteDistro] = {}
_kupfer_local = dict[Arch, LocalDistro]() _kupfer_local: dict[Arch, LocalDistro] = {}
_kupfer_local_chroots = dict[Arch, LocalDistro]() _kupfer_local_chroots: dict[Arch, LocalDistro] = {}
def reset_distro_caches():
global _kupfer_https, _kupfer_local, _kupfer_local_chroots
for cache in _kupfer_https, _kupfer_local, _kupfer_local_chroots:
assert isinstance(cache, dict)
cache.clear()
def get_kupfer_url(url: str = KUPFER_HTTPS, branch: Optional[str] = None) -> str: def get_kupfer_url(url: str = KUPFER_HTTPS, branch: Optional[str] = None) -> str:
@ -101,29 +112,103 @@ def get_kupfer_url(url: str = KUPFER_HTTPS, branch: Optional[str] = None) -> str
return url.replace('%branch%', branch) return url.replace('%branch%', branch)
def get_kupfer_https(arch: Arch, scan: bool = False) -> RemoteDistro: def get_repo_config(*args, **kwargs) -> ReposConfigFile:
global _kupfer_https repo_config, changed = _get_repo_config(*args, **kwargs)
if arch not in _kupfer_https or not _kupfer_https[arch]: if changed:
kupfer = get_kupfer(arch, get_kupfer_url(), scan) reset_distro_caches()
assert isinstance(kupfer, RemoteDistro) return repo_config
_kupfer_https[arch] = kupfer
item = _kupfer_https[arch]
def get_kupfer_repo_names(local) -> list[str]:
configs = get_repo_config()
results = []
for repo, repo_config in configs.repos.items():
if not local and repo_config.local_only:
continue
results.append(repo)
return results
def get_RepoInfo(arch: Arch, repo_config: AbstrRepoConfig, default_url: Optional[str]) -> RepoInfo:
url = repo_config.remote_url or default_url
if isinstance(url, dict):
url = url.get(arch, default_url)
assert url
return RepoInfo(
url_template=url,
options=repo_config.get('options', None) or {},
)
def get_base_distro(arch: Arch, scan: bool = False) -> RemoteDistro:
base_distros = get_repo_config().base_distros
if base_distros is None or arch not in base_distros:
base_distros = REPOS_CONFIG_DEFAULT.base_distros
assert base_distros
distro: BaseDistro
distro = base_distros.get(arch) # type: ignore[assignment]
repos = {}
for repo, repo_config in distro.repos.items():
repos[repo] = get_RepoInfo(arch, repo_config, default_url=distro.remote_url)
return RemoteDistro(arch=arch, repo_infos=repos, scan=scan)
def get_kupfer_distro(
arch: Arch,
location: DistroLocation,
scan: bool = False,
) -> Distro:
global _kupfer_https, _kupfer_local, _kupfer_local_chroots
cls: type[Distro]
cache: Mapping[str, Distro]
repo_config = get_repo_config()
if location == DistroLocation.REMOTE:
cache = _kupfer_https
default_url = get_kupfer_url(repo_config.remote_url or KUPFER_HTTPS)
repos = {repo: get_RepoInfo(arch, conf, default_url) for repo, conf in repo_config.repos.items() if not conf.local_only}
cls = RemoteDistro
elif location in [DistroLocation.CHROOT, DistroLocation.LOCAL]:
cache = _kupfer_local_chroots
pkgdir = CHROOT_PATHS['packages'] if location == DistroLocation.CHROOT else config.get_path('packages')
default_url = f"file://{pkgdir}/$arch/$repo"
cls = LocalDistro
repos = {}
for name, repo in repo_config.repos.items():
repo = repo.copy()
repo.remote_url = default_url
repos[name] = get_RepoInfo(arch, repo, default_url)
else:
raise Exception(f"Unknown location {location}")
if cache is None:
cache = {}
assert arch
assert isinstance(cache, dict)
if arch not in cache or not cache[arch]:
distro = cls(
arch=arch,
repo_infos=repos,
scan=scan,
)
assert isinstance(distro, (LocalDistro, RemoteDistro))
return distro
cache[arch] = distro
item: Distro = cache[arch]
if scan and not item.is_scanned(): if scan and not item.is_scanned():
item.scan() item.scan()
return item return item
def get_kupfer_https(arch: Arch, scan: bool = False) -> RemoteDistro:
d = get_kupfer_distro(arch, location=DistroLocation.REMOTE, scan=scan)
assert isinstance(d, RemoteDistro)
return d
def get_kupfer_local(arch: Optional[Arch] = None, in_chroot: bool = True, scan: bool = False) -> LocalDistro: def get_kupfer_local(arch: Optional[Arch] = None, in_chroot: bool = True, scan: bool = False) -> LocalDistro:
global _kupfer_local, _kupfer_local_chroots
cache = _kupfer_local_chroots if in_chroot else _kupfer_local
arch = arch or config.runtime.arch arch = arch or config.runtime.arch
assert arch assert arch
if arch not in cache or not cache[arch]: d = get_kupfer_distro(arch, location=DistroLocation.CHROOT if in_chroot else DistroLocation.LOCAL, scan=scan)
dir = CHROOT_PATHS['packages'] if in_chroot else config.get_path('packages') assert isinstance(d, LocalDistro)
kupfer = get_kupfer(arch, f"file://{dir}/$arch/$repo") return d
assert isinstance(kupfer, LocalDistro)
cache[arch] = kupfer
item = cache[arch]
if scan and not item.is_scanned():
item.scan()
return item

184
distro/repo_config.py Normal file
View file

@ -0,0 +1,184 @@
from __future__ import annotations
import logging
import os
import toml
import yaml
from copy import deepcopy
from typing import Any, ClassVar, Optional, Mapping
from config.state import config
from constants import Arch, BASE_DISTROS, KUPFER_HTTPS, REPOS_CONFIG_FILE, REPOSITORIES
from dataclass import DataClass, Munch, toml_inline_dicts, TomlEncoder, TomlPreserveInlineDictEncoder
from utils import sha256sum
REPOS_KEY = 'repos'
NAME_KEY = 'name'
REMOTEURL_KEY = 'remote_url'
LOCALONLY_KEY = 'local_only'
OPTIONS_KEY = 'options'
BASEDISTROS_KEY = 'base_distros'
_current_config: Optional[ReposConfigFile]
class AbstrRepoConfig(DataClass):
options: Optional[dict[str, str]]
_strip_hidden: ClassVar[bool] = True
_sparse: ClassVar[bool] = True
class BaseDistroRepo(AbstrRepoConfig):
remote_url: Optional[str]
class RepoConfig(AbstrRepoConfig):
name: str
remote_url: Optional[str | dict[Arch, str]]
local_only: Optional[bool]
class BaseDistro(DataClass):
remote_url: Optional[str]
repos: dict[str, BaseDistroRepo]
class ReposConfigFile(DataClass):
remote_url: Optional[str]
repos: dict[str, RepoConfig]
base_distros: dict[Arch, BaseDistro]
_path: Optional[str]
_checksum: Optional[str]
_strip_hidden: ClassVar[bool] = True
_sparse: ClassVar[bool] = True
def __init__(self, d, **kwargs):
remote_url = d.get(REMOTEURL_KEY, None)
super().__init__(d=d, **kwargs)
for repo_cls, defaults, repos in [
(RepoConfig, REPO_DEFAULTS, self.get(REPOS_KEY, {})),
*[(BaseDistroRepo, BASE_DISTRO_DEFAULTS, _distro.repos) for _distro in self.base_distros.values()],
]:
if repos is None:
continue
for name, repo in repos.items():
_repo = defaults | (repo or {}) # type: ignore[operator]
if REMOTEURL_KEY not in repo and not repo.get(LOCALONLY_KEY, None):
_repo[REMOTEURL_KEY] = remote_url
repos[name] = repo_cls(_repo, **kwargs)
# self.repos = repos
def toDict(self, strip_hidden: Optional[bool] = None, sparse: Optional[bool] = None):
d = super().toDict(strip_hidden=strip_hidden, sparse=sparse)
if REPOS_KEY in d:
for v in d[REPOS_KEY].values():
if isinstance(v, dict) and NAME_KEY in v:
v.pop(NAME_KEY)
return d
@staticmethod
def parse_config(path: str) -> ReposConfigFile:
try:
with open(path, 'r') as fd:
data = toml.load(fd)
data['_path'] = path
data['_checksum'] = sha256sum(path)
return ReposConfigFile(data, validate=True)
except Exception as ex:
logging.error(f'Error parsing repos config at "{path}":\n{ex}')
raise ex
def toToml(self, strip_hidden=None, sparse=None, encoder=TomlPreserveInlineDictEncoder()):
d = self.toDict(strip_hidden=strip_hidden, sparse=sparse)
for key in [REPOS_KEY, ]:
if key not in d or not isinstance(d[key], Mapping):
continue
inline = {name: {k: toml_inline_dicts(v) for k, v in value.items()} for name, value in d[key].items()}
logging.info(f"Inlined {key}: {inline}")
d[key] = inline
return toml.dumps(d, encoder=encoder)
def toToml_old(self, **kwargs):
"""Dumps specific TOML format, kwargs are ignored."""
def toml_line(k, v):
assert isinstance(k, str)
if isinstance(v, dict):
assert isinstance(v, Munch)
return f'{k} = ' + v.toYAML(default_flow_style=True).strip('\n')
#return toml.dumps({k: (v if not isinstance(v, dict) else toml.}, encoder=toml.encoder.TomlPreserveInlineDictEncoder).strip('\n')
res = ''
for k in self.keys():
if k == REPOS_KEY:
continue
res
for k, v in self.repos.items():
res += f"[repos.{k}]\n"
for subk, subv in v.items():
res += toml_line(subk, subv) + '\n'
res += '\n'
return res
REPO_DEFAULTS = {
LOCALONLY_KEY: None,
REMOTEURL_KEY: None,
OPTIONS_KEY: {'SigLevel': 'Never'}
}
BASE_DISTRO_DEFAULTS = {
REMOTEURL_KEY: None,
OPTIONS_KEY: None,
}
REPOS_CONFIG_DEFAULT = ReposConfigFile({
REMOTEURL_KEY: KUPFER_HTTPS,
REPOS_KEY: {
'local': REPO_DEFAULTS | {LOCALONLY_KEY: True},
**{r: deepcopy(REPO_DEFAULTS) for r in REPOSITORIES},
},
BASEDISTROS_KEY: {
arch: {
'repos': {k: {'remote_url': v} for k, v in arch_def['repos'].items()},
}
for arch, arch_def in BASE_DISTROS.items()
},
})
def get_repo_config(
initialize_pkgbuilds: bool = False,
repo_config_file: Optional[str] = None,
) -> tuple[ReposConfigFile, bool]:
global _current_config
pkgbuilds_dir = config.get_path('pkgbuilds')
repo_config_file_default = os.path.join(pkgbuilds_dir, REPOS_CONFIG_FILE)
if repo_config_file is None:
repo_config_file_path = repo_config_file_default
else:
repo_config_file_path = repo_config_file
if not os.path.exists(repo_config_file_path):
if repo_config_file is not None:
raise Exception(f"Requested repo config {repo_config_file} doesn't exist")
if not initialize_pkgbuilds:
logging.warning(f"{repo_config_file_path} doesn't exist, using default Repositories")
return deepcopy(REPOS_CONFIG_DEFAULT), False
from packages.pkgbuild import init_pkgbuilds
init_pkgbuilds()
return get_repo_config(initialize_pkgbuilds=False, repo_config_file=repo_config_file_path)
conf = _current_config
changed = False
if not conf or conf._path != repo_config_file_path or conf._checksum != sha256sum(repo_config_file_path):
conf = ReposConfigFile.parse_config(repo_config_file_path)
if repo_config_file_path == repo_config_file_default:
_current_config = conf
changed = True
return conf, changed
def get_repos(**kwargs) -> list[RepoConfig]:
config, _ = get_repo_config(**kwargs)
return list(config.repos.values())

View file

@ -10,12 +10,12 @@ from urllib.error import HTTPError
from typing import Iterable, Iterator, Optional from typing import Iterable, Iterator, Optional
from binfmt import register as binfmt_register, binfmt_is_registered from binfmt import register as binfmt_register, binfmt_is_registered
from constants import REPOSITORIES, CROSSDIRECT_PKGS, QEMU_BINFMT_PKGS, GCC_HOSTSPECS, ARCHES, Arch, CHROOT_PATHS, MAKEPKG_CMD from constants import CROSSDIRECT_PKGS, QEMU_BINFMT_PKGS, GCC_HOSTSPECS, ARCHES, Arch, CHROOT_PATHS, MAKEPKG_CMD
from config.state import config from config.state import config
from exec.cmd import run_cmd, run_root_cmd from exec.cmd import run_cmd, run_root_cmd
from exec.file import makedir, remove_file, symlink from exec.file import makedir, remove_file, symlink
from chroot.build import get_build_chroot, BuildChroot from chroot.build import get_build_chroot, BuildChroot
from distro.distro import get_kupfer_https, get_kupfer_local from distro.distro import get_kupfer_https, get_kupfer_local, get_kupfer_repo_names
from distro.package import RemotePackage, LocalPackage from distro.package import RemotePackage, LocalPackage
from distro.repo import LocalRepo from distro.repo import LocalRepo
from progressbar import BAR_PADDING, get_levels_bar from progressbar import BAR_PADDING, get_levels_bar
@ -84,7 +84,7 @@ def init_prebuilts(arch: Arch):
"""Ensure that all `constants.REPOSITORIES` inside `dir` exist""" """Ensure that all `constants.REPOSITORIES` inside `dir` exist"""
prebuilts_dir = config.get_path('packages') prebuilts_dir = config.get_path('packages')
makedir(prebuilts_dir) makedir(prebuilts_dir)
for repo in REPOSITORIES: for repo in get_kupfer_repo_names(local=True):
init_local_repo(repo, arch) init_local_repo(repo, arch)

View file

@ -7,11 +7,11 @@ from glob import glob
from typing import Iterable, Optional from typing import Iterable, Optional
from config.state import config from config.state import config
from constants import Arch, ARCHES, REPOSITORIES, SRCINFO_FILE, SRCINFO_INITIALISED_FILE, SRCINFO_METADATA_FILE, SRCINFO_TARBALL_FILE, SRCINFO_TARBALL_URL from constants import Arch, ARCHES, SRCINFO_FILE, SRCINFO_INITIALISED_FILE, SRCINFO_METADATA_FILE, SRCINFO_TARBALL_FILE, SRCINFO_TARBALL_URL
from exec.cmd import run_cmd, shell_quote, CompletedProcess from exec.cmd import run_cmd, shell_quote, CompletedProcess
from exec.file import get_temp_dir, makedir, remove_file from exec.file import get_temp_dir, makedir, remove_file
from devices.device import get_profile_device from devices.device import get_profile_device
from distro.distro import get_kupfer_local, get_kupfer_url from distro.distro import get_kupfer_local, get_kupfer_url, get_kupfer_repo_names
from distro.package import LocalPackage from distro.package import LocalPackage
from net.ssh import run_ssh_command, scp_put_files from net.ssh import run_ssh_command, scp_put_files
from utils import download_file, git, sha256sum from utils import download_file, git, sha256sum
@ -269,7 +269,7 @@ def cmd_clean(what: Iterable[str] = ['all'], force: bool = False, noop: bool = F
[ [
'clean', 'clean',
'-dffX' + ('n' if noop else ''), '-dffX' + ('n' if noop else ''),
] + REPOSITORIES, ] + get_kupfer_repo_names(local=True),
dir=pkgbuilds, dir=pkgbuilds,
) )
if result.returncode != 0: if result.returncode != 0:
@ -301,7 +301,7 @@ def cmd_clean(what: Iterable[str] = ['all'], force: bool = False, noop: bool = F
@cmd_packages.command(name='list') @cmd_packages.command(name='list')
def cmd_list(): def cmd_list():
"List information about available source packages (PKGBUILDs)" "List information about available source packages (PKGBUILDs)"
pkgdir = os.path.join(config.get_path('pkgbuilds'), REPOSITORIES[0]) pkgdir = os.path.join(config.get_path('pkgbuilds'), get_kupfer_repo_names(local=False)[0])
if not os.path.exists(pkgdir): if not os.path.exists(pkgdir):
raise Exception(f"PKGBUILDs seem not to be initialised yet: {pkgdir} doesn't exist!\n" raise Exception(f"PKGBUILDs seem not to be initialised yet: {pkgdir} doesn't exist!\n"
f"Try running `kupferbootstrap packages init` first!") f"Try running `kupferbootstrap packages init` first!")

View file

@ -9,8 +9,8 @@ from joblib import Parallel, delayed
from typing import Iterable, Optional, TypeAlias from typing import Iterable, Optional, TypeAlias
from config.state import config, ConfigStateHolder from config.state import config, ConfigStateHolder
from constants import REPOSITORIES
from constants import Arch from constants import Arch
from distro.distro import get_kupfer_repo_names
from distro.package import PackageInfo from distro.package import PackageInfo
from exec.file import remove_file from exec.file import remove_file
from logger import setup_logging from logger import setup_logging
@ -439,8 +439,13 @@ def get_pkgbuild_dirs(quiet: bool = True, repositories: Optional[list[str]] = No
"""Gets the relative paths to directories containing PKGBUILDs, optionally warns about dirs without a PKGBUILD""" """Gets the relative paths to directories containing PKGBUILDs, optionally warns about dirs without a PKGBUILD"""
pkgbuilds_dir = config.get_path('pkgbuilds') pkgbuilds_dir = config.get_path('pkgbuilds')
paths = [] paths = []
for repo in repositories or REPOSITORIES: for repo in repositories or get_kupfer_repo_names(local=True):
for dir in os.listdir(os.path.join(pkgbuilds_dir, repo)): path = os.path.join(pkgbuilds_dir, repo)
if not os.path.exists(path):
if not quiet:
logging.warning(f'repo "{repo}" can\'t be listed: "{path}" doesn\'t exist; skipping')
continue
for dir in os.listdir(path):
p = os.path.join(repo, dir) p = os.path.join(repo, dir)
if not os.path.exists(os.path.join(pkgbuilds_dir, p, 'PKGBUILD')): if not os.path.exists(os.path.join(pkgbuilds_dir, p, 'PKGBUILD')):
if not quiet: if not quiet:

View file

@ -20,10 +20,12 @@ class JsonFile(DataClass):
_filename: ClassVar[str] _filename: ClassVar[str]
_relative_path: str _relative_path: str
_strip_hidden: ClassVar[bool] = True
_sparse: ClassVar[bool] = False
def toJSON(self) -> str: def toJSON(self) -> str:
'Returns a json representation, with private keys that start with "_" filtered out' 'Returns a json representation, with private keys that start with "_" filtered out'
return json.dumps({key: val for key, val in self.toDict().items() if not key.startswith('_')}, indent=2) return json.dumps(self.toDict(), indent=2)
def write(self): def write(self):
'Write the filtered json representation to disk' 'Write the filtered json representation to disk'