from __future__ import annotations import logging import os import toml import yaml from copy import deepcopy from typing import ClassVar, Optional, Mapping from config.state import config from constants import Arch, BASE_DISTROS, KUPFER_HTTPS, REPOS_CONFIG_FILE, REPOSITORIES from dataclass import DataClass, toml_inline_dicts, TomlPreserveInlineDictEncoder from utils import sha256sum REPOS_KEY = 'repos' REMOTEURL_KEY = 'remote_url' LOCALONLY_KEY = 'local_only' OPTIONS_KEY = 'options' BASEDISTROS_KEY = 'base_distros' _current_config: Optional[ReposConfigFile] class AbstrRepoConfig(DataClass): options: Optional[dict[str, str]] _strip_hidden: ClassVar[bool] = True _sparse: ClassVar[bool] = True class BaseDistroRepo(AbstrRepoConfig): remote_url: Optional[str] class RepoConfig(AbstrRepoConfig): remote_url: Optional[str | dict[Arch, str]] local_only: Optional[bool] class BaseDistro(DataClass): remote_url: Optional[str] repos: dict[str, BaseDistroRepo] class ReposConfigFile(DataClass): remote_url: Optional[str] repos: dict[str, RepoConfig] base_distros: dict[Arch, BaseDistro] _path: Optional[str] _checksum: Optional[str] _strip_hidden: ClassVar[bool] = True _sparse: ClassVar[bool] = True def __init__(self, d, **kwargs): super().__init__(d=d, **kwargs) self[REPOS_KEY] = self.get(REPOS_KEY, {}) for repo_cls, defaults, repos, remote_url in [ (RepoConfig, REPO_DEFAULTS, self.get(REPOS_KEY), d.get(REMOTEURL_KEY, None)), *[(BaseDistroRepo, BASE_DISTRO_DEFAULTS, _distro.repos, _distro.get(REMOTEURL_KEY, None)) for _distro in self.base_distros.values()], ]: if repos is None: continue for name, repo in repos.items(): _repo = dict(defaults | (repo or {})) # type: ignore[operator] if REMOTEURL_KEY not in repo and not repo.get(LOCALONLY_KEY, None): _repo[REMOTEURL_KEY] = remote_url repos[name] = repo_cls(_repo, **kwargs) @staticmethod def parse_config(path: str) -> ReposConfigFile: try: with open(path, 'r') as fd: data = yaml.safe_load(fd) data['_path'] = path data['_checksum'] = sha256sum(path) return ReposConfigFile(data, validate=True) except Exception as ex: logging.error(f'Error parsing repos config at "{path}":\n{ex}') raise ex def toToml(self, strip_hidden=None, sparse=None, encoder=TomlPreserveInlineDictEncoder()): d = self.toDict(strip_hidden=strip_hidden, sparse=sparse) for key in [REPOS_KEY]: if key not in d or not isinstance(d[key], Mapping): continue inline = {name: {k: toml_inline_dicts(v) for k, v in value.items()} for name, value in d[key].items()} logging.info(f"Inlined {key}: {inline}") d[key] = inline return toml.dumps(d, encoder=encoder) REPO_DEFAULTS = { LOCALONLY_KEY: None, REMOTEURL_KEY: None, OPTIONS_KEY: { 'SigLevel': 'Never' }, } BASE_DISTRO_DEFAULTS = { REMOTEURL_KEY: None, OPTIONS_KEY: None, } REPOS_CONFIG_DEFAULT = ReposConfigFile({ '_path': '__DEFAULTS__', '_checksum': None, REMOTEURL_KEY: KUPFER_HTTPS, REPOS_KEY: { 'kupfer_local': REPO_DEFAULTS | { LOCALONLY_KEY: True }, **{r: deepcopy(REPO_DEFAULTS) for r in REPOSITORIES}, }, BASEDISTROS_KEY: { arch: { REMOTEURL_KEY: None, 'repos': {k: { 'remote_url': v } for k, v in arch_def['repos'].items()}, } for arch, arch_def in BASE_DISTROS.items() }, }) _current_config = None def get_repo_config( initialize_pkgbuilds: bool = False, repo_config_file: Optional[str] = None, ) -> tuple[ReposConfigFile, bool]: global _current_config repo_config_file_default = os.path.join(config.get_path('pkgbuilds'), REPOS_CONFIG_FILE) if repo_config_file is None: repo_config_file_path = repo_config_file_default else: repo_config_file_path = repo_config_file config_exists = os.path.exists(repo_config_file_path) if not config_exists and _current_config is None: if initialize_pkgbuilds: from packages.pkgbuild import init_pkgbuilds init_pkgbuilds(update=False) return get_repo_config(initialize_pkgbuilds=False, repo_config_file=repo_config_file) if repo_config_file is not None: raise Exception(f"Requested repo config {repo_config_file} doesn't exist") logging.warning(f"{repo_config_file_path} doesn't exist, using built-in repo config defaults") _current_config = deepcopy(REPOS_CONFIG_DEFAULT) return _current_config, False changed = False if (not _current_config) or (config_exists and _current_config._checksum != sha256sum(repo_config_file_path)): if config_exists: conf = ReposConfigFile.parse_config(repo_config_file_path) else: conf = REPOS_CONFIG_DEFAULT changed = conf != (_current_config or {}) if changed: _current_config = deepcopy(conf) else: logging.debug("Repo config: Cache hit!") assert _current_config return _current_config, changed def get_repos(**kwargs) -> list[RepoConfig]: config, _ = get_repo_config(**kwargs) return list(config.repos.values())