WIP: keyring init done(?)

This commit is contained in:
InsanePrawn 2023-04-17 17:29:20 +02:00
parent e068b3587e
commit 38edce080f
3 changed files with 197 additions and 2 deletions

View file

@ -57,6 +57,11 @@ DistroArch: TypeAlias = Arch
TargetArch: TypeAlias = Arch TargetArch: TypeAlias = Arch
KEYRINGS_KEY = 'keyrings' KEYRINGS_KEY = 'keyrings'
KEYRINGS_LOCAL_KEY = 'local_keyring'
KEYRING_REMOTE_NAME = "kupfer-keyring"
KEYRINGS_LOCAL_NAME = KEYRING_REMOTE_NAME + '-local'
ALARM_REPOS = { ALARM_REPOS = {
'core': 'http://mirror.archlinuxarm.org/$arch/$repo', 'core': 'http://mirror.archlinuxarm.org/$arch/$repo',

188
distro/keyring.py Normal file
View file

@ -0,0 +1,188 @@
import logging
import os
from enum import auto, Enum
from typing import Optional
from config.state import config
from constants import Arch, KEYRINGS_KEY, KEYRINGS_LOCAL_KEY
from exec.cmd import CompletedProcess, run_cmd
from exec.file import makedir, remove_file
from repo_config import get_repo_config
from utils import extract_files_from_tar_generator, read_files_from_tar_recursive
from .distro import Distro, get_base_distro, get_kupfer_local, get_kupfer_https
from .package import BinaryPackage
KEYRING_DIR = 'keyrings'
KEYRING_DIST_DIR = 'dist'
KEYRING_GPG_DIR = 'keyring'
PKGNAME_MARKER = '.pkg.tar'
PKG_KEYRING_FOLDER = 'usr/share/pacman/keyrings/'
class DistroType(Enum):
BASE = auto
LOCAL = auto
REMOTE = auto
KEYRING_LOCATIONS: dict[DistroType, str] = {
DistroType.BASE: 'base',
DistroType.LOCAL: 'local',
DistroType.REMOTE: 'kupfer',
}
keyring_created: dict[tuple[Arch, DistroType], bool] = {}
def keyring_is_created(arch: Arch, distro_type: DistroType) -> bool:
return keyring_created.get((arch, distro_type), False)
def init_keyring_dir(
arch: Arch,
distro_type: DistroType,
target_path: Optional[str] = None,
lazy: bool = True,
) -> dict[str, bool]:
base_dir = target_path or get_keyring_path(arch, distro_type)
keyring_dists = init_keyring_dist_dir(arch, distro_type, base_dir, lazy)
gpg_changed = init_keyring_gpg_dir(arch, distro_type, keyring_dists, base_dir, lazy)
keyring_created[(arch, distro_type)] = True
return gpg_changed
def init_keyring_gpg_dir(
arch: Arch,
distro_type: DistroType,
keyring_dists: dict[str, tuple[str, bool]],
base_dir: Optional[str] = None,
lazy: bool = True,
) -> dict[str, bool]:
base_dir = base_dir or get_keyring_path(arch, distro_type)
gpg_dir = get_keyring_gpg_path(base_dir)
exists = os.path.exists(gpg_dir)
if exists and not lazy:
remove_file(gpg_dir)
exists = False
lazy = lazy and exists
makedir(gpg_dir)
results = {}
for name, val in keyring_dists.items():
dist_dir, dist_changed = val
if lazy and not dist_changed:
results[name] = False
continue
import_dist_keyring(gpg_dir, dist_dir)
results[name] = True
return results
def import_dist_keyring(
gpg_dir: str,
dist_dir: str,
) -> CompletedProcess:
assert gpg_dir and dist_dir and config.runtime.script_source_dir
pacman_key = os.path.join(config.runtime.script_source_dir, 'bin', 'pacman-key-user')
r = run_cmd([pacman_key, '--populate-from', dist_dir, '--populate', '--gpgdir', gpg_dir])
assert isinstance(r, CompletedProcess)
return r
def init_keyring_dist_dir(
arch: Arch,
distro_type: DistroType,
base_dir: Optional[str] = None,
lazy: bool = True,
) -> dict[str, tuple[str, bool]]:
"""
create keyrings/{arch}/dist. Returns a boolean indicating whether changes were made
"""
repo_config = get_repo_config()[0]
base_dir = base_dir or get_keyring_path(arch, distro_type)
dist_dir = get_keyring_dist_path(base_dir)
pkg_names: list[str] = []
distro: Distro
if distro_type == DistroType.BASE:
pkg_names = repo_config.base_distros.get(arch, {}).get(KEYRINGS_KEY, None) or []
distro = get_base_distro(arch, scan=False)
elif distro_type == DistroType.LOCAL:
pkg_name = repo_config.get(KEYRINGS_LOCAL_KEY, None)
pkg_names = [pkg_name] if pkg_name else []
distro = get_kupfer_local(arch, scan=False, in_chroot=False)
elif distro_type == DistroType.REMOTE:
pkg_names = repo_config.get(KEYRINGS_KEY, None) or []
distro = get_kupfer_https(arch, scan=False)
dist_pkgs, changed = acquire_dist_pkgs(pkg_names, distro, base_dir)
if lazy and dist_pkgs and not changed and os.path.exists(dist_dir): # and keyring_is_created(arch, distro_type):
return {name: (val[0], False) for name, val in dist_pkgs.items()}
makedir(dist_dir)
dist_dirs = []
results = {}
for name, _val in dist_pkgs.items():
dist_pkg, changed = _val
_dir = os.path.join(dist_dir, name)
results[name] = _dir, False
if lazy and not changed and os.path.exists(_dir):
continue
extract_keyring_pkg(dist_pkg, _dir)
dist_dirs.append(_dir)
results[name] = dist_pkg, True
return results
def acquire_dist_pkgs(keyring_packages: list[str], distro: Distro, dist_dir: str) -> tuple[dict[str, tuple[str, bool]], bool]:
if not keyring_packages:
return {}, False
pkgs = {}
not_found = []
distro.scan(lazy=True)
repos: dict[str, BinaryPackage] = distro.get_packages()
pkg: BinaryPackage
for name in keyring_packages:
if name not in repos:
not_found.append(name)
continue
pkg = repos[name]
pkgs[name] = pkg
if not_found:
raise Exception(f"Keyring packages for {distro.arch} not found: {not_found}")
changed = False
results = {}
for name in pkgs:
assert isinstance(pkgs[name], BinaryPackage)
pkg = pkgs[name]
assert PKGNAME_MARKER in pkg.filename
comp_ext = pkg.filename.rsplit(PKGNAME_MARKER, 1)[1]
target_path, _changed = pkg.acquire(dist_dir, f'{name}.tar{comp_ext}')
results[name] = target_path, _changed
if _changed:
logging.debug(f"{target_path} changed")
changed = True
return results, changed
def extract_keyring_pkg(pkg_path: str, dest_path: str):
extract_files_from_tar_generator(
read_files_from_tar_recursive(pkg_path, PKG_KEYRING_FOLDER),
dest_path,
remove_prefix=PKG_KEYRING_FOLDER,
)
def get_keyring_path(arch: Arch, distro_type: DistroType, *extra_paths) -> str:
return os.path.join(config.get_path('pacman'), KEYRING_DIR, arch, KEYRING_LOCATIONS[distro_type], *extra_paths)
def get_keyring_dist_path(base_dir: str) -> str:
return os.path.join(base_dir, KEYRING_DIST_DIR)
def get_keyring_gpg_path(base_dir: str) -> str:
return os.path.join(base_dir, KEYRING_GPG_DIR)

View file

@ -9,7 +9,7 @@ from copy import deepcopy
from typing import ClassVar, Optional, Mapping, Union from typing import ClassVar, Optional, Mapping, Union
from config.state import config from config.state import config
from constants import Arch, BASE_DISTROS, KUPFER_HTTPS, KEYRINGS_KEY, REPOS_CONFIG_FILE, REPOSITORIES from constants import Arch, BASE_DISTROS, KUPFER_HTTPS, KEYRINGS_KEY, KEYRINGS_LOCAL_KEY, KEYRINGS_LOCAL_NAME, KEYRING_REMOTE_NAME, REPOS_CONFIG_FILE, REPOSITORIES
from dictscheme import DictScheme, toml_inline_dicts, TomlPreserveInlineDictEncoder from dictscheme import DictScheme, toml_inline_dicts, TomlPreserveInlineDictEncoder
from utils import sha256sum from utils import sha256sum
@ -46,6 +46,7 @@ class BaseDistro(DictScheme):
class ReposConfigFile(DictScheme): class ReposConfigFile(DictScheme):
remote_url: Optional[str] remote_url: Optional[str]
keyrings: Optional[list[str]] keyrings: Optional[list[str]]
local_keyring: Optional[str]
repos: dict[str, RepoConfig] repos: dict[str, RepoConfig]
base_distros: dict[Arch, BaseDistro] base_distros: dict[Arch, BaseDistro]
_path: Optional[str] _path: Optional[str]
@ -108,7 +109,8 @@ REPOS_CONFIG_DEFAULT = ReposConfigFile({
'_path': '__DEFAULTS__', '_path': '__DEFAULTS__',
'_checksum': None, '_checksum': None,
REMOTEURL_KEY: KUPFER_HTTPS, REMOTEURL_KEY: KUPFER_HTTPS,
KEYRINGS_KEY: [], KEYRINGS_KEY: [KEYRING_REMOTE_NAME],
KEYRINGS_LOCAL_KEY: KEYRINGS_LOCAL_NAME,
REPOS_KEY: { REPOS_KEY: {
'kupfer_local': REPO_DEFAULTS | { 'kupfer_local': REPO_DEFAULTS | {
LOCALONLY_KEY: True LOCALONLY_KEY: True