import logging import os from enum import auto, Enum from typing import Optional from config.state import config from constants import Arch, KEYRINGS_KEY, KEYRINGS_LOCAL_KEY from distro.repo_config import get_repo_config from exec.cmd import CompletedProcess, run_cmd from exec.file import makedir, remove_file from utils import extract_files_from_tar_generator, read_files_from_tar_recursive, sha256sum from .distro import Distro, get_base_distro, get_kupfer_local, get_kupfer_https from .package import BinaryPackage KEYRING_DIR = 'keyrings' KEYRING_DIST_DIR = 'dist' KEYRING_GPG_DIR = 'keyring' PKGNAME_MARKER = '.pkg.tar' PKG_KEYRING_FOLDER = 'usr/share/pacman/keyrings/' class DistroType(Enum): BASE = auto() LOCAL = auto() REMOTE = auto() KEYRING_LOCATIONS: dict[DistroType, str] = { DistroType.BASE: 'base', DistroType.LOCAL: 'local', DistroType.REMOTE: 'kupfer', } keyring_created: dict[tuple[Arch, DistroType], bool] = {} def keyring_is_created(arch: Arch, distro_type: DistroType) -> bool: return keyring_created.get((arch, distro_type), False) def init_keyring_dir( arch: Arch, distro_type: DistroType, target_path: Optional[str] = None, lazy: bool = True, ) -> dict[str, bool]: base_dir = target_path or get_keyring_path(arch, distro_type) keyring_dists = init_keyring_dist_dir(arch, distro_type, base_dir, lazy) gpg_changed = init_keyring_gpg_dir(arch, distro_type, keyring_dists, base_dir, lazy) keyring_created[(arch, distro_type)] = True return gpg_changed def init_keyring_gpg_dir( arch: Arch, distro_type: DistroType, keyring_dists: dict[str, tuple[str, bool]], base_dir: Optional[str] = None, lazy: bool = True, ) -> dict[str, bool]: base_dir = base_dir or get_keyring_path(arch, distro_type) gpg_dir = get_keyring_gpg_path(base_dir) exists = os.path.exists(os.path.join(gpg_dir, 'trustdb.gpg')) if exists and not lazy: remove_file(gpg_dir) exists = False lazy = lazy and exists if not lazy: run_cmd([get_pacman_key_binary(), '--init', '--gpgdir', gpg_dir]) results = {} for name, val in keyring_dists.items(): dist_dir, dist_changed = val if lazy and not dist_changed: results[name] = False continue logging.info(f"Importing dir {dist_dir} into {gpg_dir}") import_dist_keyring(gpg_dir, dist_dir) results[name] = True return results def import_dist_keyring( gpg_dir: str, dist_dir: str, ) -> CompletedProcess: assert gpg_dir and dist_dir and config.runtime.script_source_dir r = run_cmd([get_pacman_key_binary(), '--populate-from', dist_dir, '--populate', '--gpgdir', gpg_dir]) assert isinstance(r, CompletedProcess) return r def init_keyring_dist_dir( arch: Arch, distro_type: DistroType, base_dir: Optional[str] = None, lazy: bool = True, ) -> dict[str, tuple[str, bool]]: """ create keyrings/{arch}/dist. Returns a boolean indicating whether changes were made """ repo_config = get_repo_config()[0] base_dir = base_dir or get_keyring_path(arch, distro_type) dist_dir = get_keyring_dist_path(base_dir) pkg_names: list[str] = [] distro: Distro if distro_type == DistroType.BASE: pkg_names = repo_config.base_distros.get(arch, {}).get(KEYRINGS_KEY, None) or [] # type: ignore[call-overload] distro = get_base_distro(arch, scan=False) elif distro_type == DistroType.LOCAL: pkg_name = repo_config.get(KEYRINGS_LOCAL_KEY, None) pkg_names = [pkg_name] if pkg_name else [] distro = get_kupfer_local(arch, scan=False, in_chroot=False) elif distro_type == DistroType.REMOTE: pkg_names = repo_config.get(KEYRINGS_KEY, None) or [] distro = get_kupfer_https(arch, scan=False) logging.debug(f"Acquiring keyrings from {distro}: {pkg_names}") dist_pkgs, changed = acquire_dist_pkgs(pkg_names, distro, dist_dir) #if lazy and dist_pkgs and not changed and os.path.exists(dist_dir): # and keyring_is_created(arch, distro_type): # return {name: (get_keyring_dist_path(base_dir, name), False) for name, val in dist_pkgs.items()} makedir(dist_dir) dist_dirs = [] results = {} for name, _val in dist_pkgs.items(): dist_pkg, changed = _val _dir = os.path.join(dist_dir, name) results[name] = _dir, False if lazy and not changed and os.path.exists(_dir): logging.debug(f"Skipping extracting keyring pkg for {name}: dir exists and file unchanged") continue extract_keyring_pkg(dist_pkg, _dir) dist_dirs.append(_dir) results[name] = _dir, True return results def acquire_dist_pkgs(keyring_packages: list[str], distro: Distro, dist_dir: str) -> tuple[dict[str, tuple[str, bool]], bool]: if not keyring_packages: return {}, False pkgs: dict[str, BinaryPackage] = {} not_found = [] pkg: Optional[BinaryPackage] for name in keyring_packages: pkg = distro.find_package(name) if not pkg: not_found.append(name) continue pkgs[name] = pkg if not_found: raise Exception(f"Keyring packages for {distro.arch} not found: {not_found}") changed = False results = {} for name in pkgs: pkg = pkgs[name] assert PKGNAME_MARKER in pkg.filename comp_ext = pkg.filename.rsplit(PKGNAME_MARKER, 1)[1] filename = f'{name}.tar{comp_ext}' filepath = os.path.join(dist_dir, filename) checksum = None if not os.path.exists(filepath) else sha256sum(filepath) target_path, _changed = pkg.acquire(dist_dir, filename) _changed = _changed and checksum != sha256sum(filepath) results[name] = target_path, _changed if _changed: changed = True logging.debug(f"{target_path} changed") return results, changed def extract_keyring_pkg(pkg_path: str, dest_path: str): logging.debug(f"Extracting {pkg_path} to {dest_path}") extract_files_from_tar_generator( read_files_from_tar_recursive(pkg_path, [PKG_KEYRING_FOLDER]), dest_path, remove_prefix=PKG_KEYRING_FOLDER, ) def get_keyring_path(arch: Arch, distro_type: DistroType, *extra_paths) -> str: return os.path.join(config.get_path('pacman'), KEYRING_DIR, arch, KEYRING_LOCATIONS[distro_type], *extra_paths) def get_keyring_dist_path(base_dir: str, *name) -> str: return os.path.join(base_dir, KEYRING_DIST_DIR, *name) def get_keyring_gpg_path(base_dir: str) -> str: return os.path.join(base_dir, KEYRING_GPG_DIR) def get_pacman_key_binary() -> str: return os.path.join(config.runtime.script_source_dir, 'bin', 'pacman-key-user')