parchbootstrap/distro/keyring.py

193 lines
6.5 KiB
Python

import logging
import os
from enum import auto, Enum
from typing import Optional
from config.state import config
from constants import Arch, KEYRINGS_KEY, KEYRINGS_LOCAL_KEY
from distro.repo_config import get_repo_config
from exec.cmd import CompletedProcess, run_cmd
from exec.file import makedir, remove_file
from utils import extract_files_from_tar_generator, read_files_from_tar_recursive, sha256sum
from .distro import Distro, get_base_distro, get_kupfer_local, get_kupfer_https
from .package import BinaryPackage
KEYRING_DIR = 'keyrings'
KEYRING_DIST_DIR = 'dist'
KEYRING_GPG_DIR = 'keyring'
PKGNAME_MARKER = '.pkg.tar'
PKG_KEYRING_FOLDER = 'usr/share/pacman/keyrings/'
class DistroType(Enum):
BASE = auto()
LOCAL = auto()
REMOTE = auto()
KEYRING_LOCATIONS: dict[DistroType, str] = {
DistroType.BASE: 'base',
DistroType.LOCAL: 'local',
DistroType.REMOTE: 'kupfer',
}
keyring_created: dict[tuple[Arch, DistroType], bool] = {}
def keyring_is_created(arch: Arch, distro_type: DistroType) -> bool:
return keyring_created.get((arch, distro_type), False)
def init_keyring_dir(
arch: Arch,
distro_type: DistroType,
target_path: Optional[str] = None,
lazy: bool = True,
) -> dict[str, bool]:
base_dir = target_path or get_keyring_path(arch, distro_type)
keyring_dists = init_keyring_dist_dir(arch, distro_type, base_dir, lazy)
gpg_changed = init_keyring_gpg_dir(arch, distro_type, keyring_dists, base_dir, lazy)
keyring_created[(arch, distro_type)] = True
return gpg_changed
def init_keyring_gpg_dir(
arch: Arch,
distro_type: DistroType,
keyring_dists: dict[str, tuple[str, bool]],
base_dir: Optional[str] = None,
lazy: bool = True,
) -> dict[str, bool]:
base_dir = base_dir or get_keyring_path(arch, distro_type)
gpg_dir = get_keyring_gpg_path(base_dir)
exists = os.path.exists(os.path.join(gpg_dir, 'trustdb.gpg'))
if exists and not lazy:
remove_file(gpg_dir)
exists = False
lazy = lazy and exists
if not lazy:
run_cmd(['pacman-key', '--init', '--gpgdir', gpg_dir])
results = {}
for name, val in keyring_dists.items():
dist_dir, dist_changed = val
if lazy and not dist_changed:
results[name] = False
continue
logging.info(f"Importing dir {dist_dir} into {gpg_dir}")
import_dist_keyring(gpg_dir, dist_dir)
results[name] = True
return results
def import_dist_keyring(
gpg_dir: str,
dist_dir: str,
) -> CompletedProcess:
assert gpg_dir and dist_dir and config.runtime.script_source_dir
r = run_cmd(['pacman-key', '--populate-from', dist_dir, '--populate', '--gpgdir', gpg_dir])
assert isinstance(r, CompletedProcess)
return r
def init_keyring_dist_dir(
arch: Arch,
distro_type: DistroType,
base_dir: Optional[str] = None,
lazy: bool = True,
) -> dict[str, tuple[str, bool]]:
"""
create keyrings/{arch}/dist. Returns a boolean indicating whether changes were made
"""
repo_config = get_repo_config()[0]
base_dir = base_dir or get_keyring_path(arch, distro_type)
dist_dir = get_keyring_dist_path(base_dir)
pkg_names: list[str] = []
distro: Distro
if distro_type == DistroType.BASE:
pkg_names = repo_config.base_distros.get(arch, {}).get(KEYRINGS_KEY, None) or [] # type: ignore[call-overload]
distro = get_base_distro(arch, scan=False)
elif distro_type == DistroType.LOCAL:
pkg_name = repo_config.get(KEYRINGS_LOCAL_KEY, None)
pkg_names = [pkg_name] if pkg_name else []
distro = get_kupfer_local(arch, scan=False, in_chroot=False)
elif distro_type == DistroType.REMOTE:
pkg_names = repo_config.get(KEYRINGS_KEY, None) or []
distro = get_kupfer_https(arch, scan=False)
logging.debug(f"Acquiring keyrings from {distro}: {pkg_names}")
dist_pkgs, changed = acquire_dist_pkgs(pkg_names, distro, dist_dir)
#if lazy and dist_pkgs and not changed and os.path.exists(dist_dir): # and keyring_is_created(arch, distro_type):
# return {name: (get_keyring_dist_path(base_dir, name), False) for name, val in dist_pkgs.items()}
makedir(dist_dir)
dist_dirs = []
results = {}
for name, _val in dist_pkgs.items():
dist_pkg, changed = _val
_dir = os.path.join(dist_dir, name)
results[name] = _dir, False
if lazy and not changed and os.path.exists(_dir):
logging.debug(f"Skipping extracting keyring pkg for {name}: dir exists and file unchanged")
continue
extract_keyring_pkg(dist_pkg, _dir)
dist_dirs.append(_dir)
results[name] = _dir, True
return results
def acquire_dist_pkgs(keyring_packages: list[str], distro: Distro, dist_dir: str) -> tuple[dict[str, tuple[str, bool]], bool]:
if not keyring_packages:
return {}, False
pkgs: dict[str, BinaryPackage] = {}
not_found = []
pkg: Optional[BinaryPackage]
for name in keyring_packages:
pkg = distro.find_package(name)
if not pkg:
not_found.append(name)
continue
pkgs[name] = pkg
if not_found:
raise Exception(f"Keyring packages for {distro.arch} not found: {not_found}")
changed = False
results = {}
for name in pkgs:
pkg = pkgs[name]
assert PKGNAME_MARKER in pkg.filename
comp_ext = pkg.filename.rsplit(PKGNAME_MARKER, 1)[1]
filename = f'{name}.tar{comp_ext}'
filepath = os.path.join(dist_dir, filename)
checksum = None if not os.path.exists(filepath) else sha256sum(filepath)
target_path, _changed = pkg.acquire(dist_dir, filename)
_changed = _changed and checksum != sha256sum(filepath)
results[name] = target_path, _changed
if _changed:
changed = True
logging.debug(f"{target_path} changed")
return results, changed
def extract_keyring_pkg(pkg_path: str, dest_path: str):
logging.debug(f"Extracting {pkg_path} to {dest_path}")
extract_files_from_tar_generator(
read_files_from_tar_recursive(pkg_path, [PKG_KEYRING_FOLDER]),
dest_path,
remove_prefix=PKG_KEYRING_FOLDER,
)
def get_keyring_path(arch: Arch, distro_type: DistroType, *extra_paths) -> str:
return os.path.join(config.get_path('pacman'), KEYRING_DIR, arch, KEYRING_LOCATIONS[distro_type], *extra_paths)
def get_keyring_dist_path(base_dir: str, *name) -> str:
return os.path.join(base_dir, KEYRING_DIST_DIR, *name)
def get_keyring_gpg_path(base_dir: str) -> str:
return os.path.join(base_dir, KEYRING_GPG_DIR)