distro: use repo_config properly

This commit is contained in:
InsanePrawn 2023-04-16 03:31:35 +02:00
parent 572142bf0b
commit 13aa258794
2 changed files with 42 additions and 25 deletions

View file

@ -1,3 +1,5 @@
import logging
from enum import IntFlag
from typing import Generic, Mapping, Optional, TypeVar
@ -115,6 +117,7 @@ def get_kupfer_url(url: str = KUPFER_HTTPS, branch: Optional[str] = None) -> str
def get_repo_config(*args, **kwargs) -> ReposConfigFile:
repo_config, changed = _get_repo_config(*args, **kwargs)
if changed:
logging.debug("Repo configs changed, resetting caches")
reset_distro_caches()
return repo_config
@ -132,6 +135,8 @@ def get_kupfer_repo_names(local) -> list[str]:
def get_RepoInfo(arch: Arch, repo_config: AbstrRepoConfig, default_url: Optional[str]) -> RepoInfo:
url = repo_config.remote_url or default_url
if isinstance(url, dict):
if arch not in url and not default_url:
raise Exception(f"Invalid repo config: Architecture {arch} not in remote_url mapping: {url}")
url = url.get(arch, default_url)
assert url
return RepoInfo(
@ -171,8 +176,13 @@ def get_kupfer_distro(
repos = {repo: get_RepoInfo(arch, conf, default_url) for repo, conf in repo_config.repos.items() if not conf.local_only}
cls = RemoteDistro
elif location in [DistroLocation.CHROOT, DistroLocation.LOCAL]:
cache = _kupfer_local_chroots
pkgdir = CHROOT_PATHS['packages'] if location == DistroLocation.CHROOT else config.get_path('packages')
if location == DistroLocation.CHROOT:
cache = _kupfer_local_chroots
pkgdir = CHROOT_PATHS['packages']
else:
assert location == DistroLocation.LOCAL
cache = _kupfer_local
pkgdir = config.get_path('packages')
default_url = f"file://{pkgdir}/$arch/$repo"
cls = LocalDistro
repos = {}
@ -181,7 +191,7 @@ def get_kupfer_distro(
repo.remote_url = default_url
repos[name] = get_RepoInfo(arch, repo, default_url)
else:
raise Exception(f"Unknown location {location}")
raise Exception(f"Unknown distro location {location}")
if cache is None:
cache = {}
assert arch
@ -193,8 +203,8 @@ def get_kupfer_distro(
scan=scan,
)
assert isinstance(distro, (LocalDistro, RemoteDistro))
return distro
cache[arch] = distro
return distro
item: Distro = cache[arch]
if scan and not item.is_scanned():
item.scan()
@ -210,6 +220,7 @@ def get_kupfer_https(arch: Arch, scan: bool = False) -> RemoteDistro:
def get_kupfer_local(arch: Optional[Arch] = None, in_chroot: bool = True, scan: bool = False) -> LocalDistro:
arch = arch or config.runtime.arch
assert arch
d = get_kupfer_distro(arch, location=DistroLocation.CHROOT if in_chroot else DistroLocation.LOCAL, scan=scan)
location = DistroLocation.CHROOT if in_chroot else DistroLocation.LOCAL
d = get_kupfer_distro(arch, location=location, scan=scan)
assert isinstance(d, LocalDistro)
return d

View file

@ -53,18 +53,18 @@ class ReposConfigFile(DataClass):
def __init__(self, d, **kwargs):
super().__init__(d=d, **kwargs)
self[REPOS_KEY] = self.get(REPOS_KEY, {})
for repo_cls, defaults, repos, remote_url in [
(RepoConfig, REPO_DEFAULTS, self.get(REPOS_KEY, {}), d.get(REMOTEURL_KEY, None)),
(RepoConfig, REPO_DEFAULTS, self.get(REPOS_KEY), d.get(REMOTEURL_KEY, None)),
*[(BaseDistroRepo, BASE_DISTRO_DEFAULTS, _distro.repos, _distro.get(REMOTEURL_KEY, None)) for _distro in self.base_distros.values()],
]:
if repos is None:
continue
for name, repo in repos.items():
_repo = defaults | (repo or {}) # type: ignore[operator]
_repo = dict(defaults | (repo or {})) # type: ignore[operator]
if REMOTEURL_KEY not in repo and not repo.get(LOCALONLY_KEY, None):
_repo[REMOTEURL_KEY] = remote_url
repos[name] = repo_cls(_repo, **kwargs)
# self.repos = repos
@staticmethod
def parse_config(path: str) -> ReposConfigFile:
@ -103,11 +103,11 @@ BASE_DISTRO_DEFAULTS = {
}
REPOS_CONFIG_DEFAULT = ReposConfigFile({
'_path': None,
'_path': '__DEFAULTS__',
'_checksum': None,
REMOTEURL_KEY: KUPFER_HTTPS,
REPOS_KEY: {
'local': REPO_DEFAULTS | {
'kupfer_local': REPO_DEFAULTS | {
LOCALONLY_KEY: True
},
**{r: deepcopy(REPO_DEFAULTS) for r in REPOSITORIES},
@ -135,24 +135,30 @@ def get_repo_config(
repo_config_file_path = repo_config_file_default
else:
repo_config_file_path = repo_config_file
if not os.path.exists(repo_config_file_path):
config_exists = os.path.exists(repo_config_file_path)
if not config_exists and _current_config is None:
if initialize_pkgbuilds:
from packages.pkgbuild import init_pkgbuilds
init_pkgbuilds(update=False)
return get_repo_config(initialize_pkgbuilds=False, repo_config_file=repo_config_file)
if repo_config_file is not None:
raise Exception(f"Requested repo config {repo_config_file} doesn't exist")
if not initialize_pkgbuilds:
logging.warning(f"{repo_config_file_path} doesn't exist, using default Repositories")
return deepcopy(REPOS_CONFIG_DEFAULT), False
from packages.pkgbuild import init_pkgbuilds
init_pkgbuilds()
return get_repo_config(initialize_pkgbuilds=False, repo_config_file=repo_config_file_path)
conf = _current_config
logging.warning(f"{repo_config_file_path} doesn't exist, using built-in repo config defaults")
_current_config = deepcopy(REPOS_CONFIG_DEFAULT)
return _current_config, False
changed = False
if (not _current_config) or _current_config._path != repo_config_file_path or _current_config._checksum != sha256sum(repo_config_file_path):
conf = ReposConfigFile.parse_config(repo_config_file_path)
if repo_config_file_path == repo_config_file_default:
_current_config = conf
changed = True
assert conf
return conf, changed
if (not _current_config) or (config_exists and _current_config._checksum != sha256sum(repo_config_file_path)):
if config_exists:
conf = ReposConfigFile.parse_config(repo_config_file_path)
else:
conf = REPOS_CONFIG_DEFAULT
changed = conf != (_current_config or {})
if changed:
_current_config = deepcopy(conf)
else:
logging.debug("Repo config: Cache hit!")
assert _current_config
return _current_config, changed
def get_repos(**kwargs) -> list[RepoConfig]: