diff --git a/distro/gpg.py b/distro/gpg.py index 4fe15df..c354f59 100644 --- a/distro/gpg.py +++ b/distro/gpg.py @@ -1,16 +1,34 @@ import logging import os -from exec.cmd import run_cmd +from typing import Optional, TypedDict + +from config.state import config +from exec.cmd import run_cmd, CompletedProcess from exec.file import get_temp_dir, makedir, write_file +PKG_KEY_FILE = "package_signing_key.pgp" +REPO_KEY_FILE = "repo_signing_key.pgp" + +GPG_HOME_DIR = "gpghome" + +KUPFER_DEFAULT_NAME = "Kupfer Local Signing" +KUFER_DEFAULT_EMAIL = "local@kupfer.mobi" +KUPFER_DEFAULT_COMMENT = "Generated by kupferbootstrap" + GPG_ARGS = ["--batch", "--no-tty"] +class Fingerprints(TypedDict): + pkg: str + repo: str + + + def get_gpg_creation_script( - key_name: str = "Kupfer Local Signing", - email: str = "local@kupfer.mobi", - comment: str = "Generated by kupferbootstrap", + key_name: str = KUPFER_DEFAULT_NAME, + email: str = KUFER_DEFAULT_EMAIL, + comment: str = KUPFER_DEFAULT_COMMENT, ): return f""" %echo Generating a new ed25519 GPG key for "{key_name} <{email}> # {comment}" @@ -39,12 +57,88 @@ def create_secret_key(location: str, *, gpg_binary: str = "gpg", **creation_args temp_dir = get_temp_dir() script_file = os.path.join(temp_dir, "__gpg_creation_script") write_file(script_file, content=get_gpg_creation_script(**creation_args)) - logging.info(f"Creating new GPG key for {location!r}") - run_cmd([gpg_binary, *GPG_ARGS, "--homedir", temp_dir, "--generate-key", script_file]).check_returncode() - res = run_cmd( - [gpg_binary, *GPG_ARGS, "--homedir", temp_dir, "--armor", "--export-secret-keys"], capture_output=True - ) + run_cmd([gpg_binary, *GPG_ARGS, "--homedir", temp_dir, "--generate-key", script_file], capture_output=True).check_returncode() # type: ignore[union-attr] + res = run_cmd([gpg_binary, *GPG_ARGS, "--homedir", temp_dir, "--armor", "--export-secret-keys"], capture_output=True) + assert isinstance(res, CompletedProcess) if not (res.stdout and res.stdout.strip()): raise Exception(f"Failed to get secret GPG key from stdout: {res.stdout=}\n{res.stderr=}") logging.debug(f"Writing GPG private key to {location}") write_file(location, content=res.stdout, mode="600") + + +def import_gpg_key( + key_file: str, + gpgdir: str, + *, + gpg_binary: str = "gpg", +): + res = run_cmd([gpg_binary, "--homedir", gpgdir, *GPG_ARGS, "--import", key_file], capture_output=True) + assert isinstance(res, CompletedProcess) + res.check_returncode() + + +def detect_key_id(location: str, gpg_binary: str = "gpg"): + res = run_cmd([gpg_binary, *GPG_ARGS, "--with-colons", "--show-keys", location], capture_output=True) + assert isinstance(res, CompletedProcess) + if res.returncode: + raise Exception(f"Failed to scan {location} for a gpg key id:\n{res.stdout=}\n\n{res.stderr=}") + text = res.stdout.decode().strip() + for line in text.split("\n"): + if line.startswith("fpr:"): + fp: str = line.rstrip(":").rsplit(":")[-1] + if not fp or not fp.isalnum(): + raise Exception(f"Failed to detect GPG fingerprint fron line {line}") + return fp.strip() + raise Exception(f"GPG Fingerprint line (fpr:) not found in GPG stdout: {text!r}") + + +def ensure_gpg_initialised( + gpg_base_dir: str, + gpg_binary: str = "gpg", + email: str = KUFER_DEFAULT_EMAIL, + gpgdir: Optional[str] = None, +) -> Fingerprints: + repo_key = os.path.join(gpg_base_dir, REPO_KEY_FILE) + pkg_key = os.path.join(gpg_base_dir, PKG_KEY_FILE) + gpgdir = gpgdir or os.path.join(gpg_base_dir, GPG_HOME_DIR) + makedir(gpgdir) + names = {"repo": "Repo Signing", "pkg": "Package Signing"} + fingerprints: Fingerprints = {} # type: ignore[typeddict-item] + for key_type, key_file in {"repo": repo_key, "pkg": pkg_key}.items(): + if not os.path.exists(key_file): + key_name = f"Kupfer Local {names[key_type]}" + logging.info(f"Creating new GPG key for {key_name!r} <{email}> at {key_file!r}") + create_secret_key(key_file, key_name=key_name) + import_gpg_key(key_file, gpg_binary=gpg_binary, gpgdir=gpgdir) + fingerprints[key_type] = detect_key_id(key_file) # type: ignore[literal-required] + pkg_fp = fingerprints["pkg"] + repo_fp = fingerprints["repo"] + logging.debug(f"Ensuring package build GPG key {pkg_fp!r} is signed by repo key {repo_fp}") + res = run_cmd( + [ + gpg_binary, + *GPG_ARGS, + "--yes", + "--homedir", + gpgdir, + "--default-key", + repo_fp, + "--trusted-key", + pkg_fp, + "--sign-key", + pkg_fp, + ], + capture_output=True, + ) + assert isinstance(res, CompletedProcess) + if res.returncode: + raise Exception(f"Failed to sign package GPG key {pkg_fp!r} with repo key {repo_fp!r}:\n{res.stdout=}\n{res.stderr=}") + logging.debug("GPG setup done") + return fingerprints + +def init_keys(*kargs, lazy: bool = True, **kwargs) -> None: + if lazy and config.runtime.gpg_initialized: + return + fps = ensure_gpg_initialised(*kargs, **kwargs) + config.runtime.gpg_pkg_key = fps["pkg"] + config.runtime.gpg_repo_key = fps["repo"]