Compare commits
20 commits
dev
...
prawn/keyr
Author | SHA1 | Date | |
---|---|---|---|
|
aaf94de0ac | ||
|
07436a0ad2 | ||
|
871c4f27c7 | ||
|
166a8620a7 | ||
|
6f09fe4403 | ||
|
4a48e78ec0 | ||
|
3034afe5a8 | ||
|
e79859b0a0 | ||
|
3e957254f5 | ||
|
d2e0fad436 | ||
|
38b23de9ad | ||
|
c576dc8a51 | ||
|
30c3fa77fd | ||
|
a982f8c966 | ||
|
38edce080f | ||
|
e068b3587e | ||
|
0c56038ed6 | ||
|
d527769473 | ||
|
ba5aa209dd | ||
|
7666b91efc |
16 changed files with 563 additions and 43 deletions
|
@ -248,7 +248,7 @@ class Chroot(AbstractChroot):
|
|||
inner_cmd = generate_cmd_su(script, switch_user=switch_user, elevation_method='none', force_su=True)
|
||||
else:
|
||||
inner_cmd = wrap_in_bash(script, flatten_result=False)
|
||||
cmd = flatten_shell_script(['chroot', self.path] + env_cmd + inner_cmd, shell_quote_items=True)
|
||||
cmd = flatten_shell_script(["unshare", "--fork", "--pid", 'chroot', self.path] + env_cmd + inner_cmd, shell_quote_items=True)
|
||||
|
||||
return run_root_cmd(cmd, env=outer_env, attach_tty=attach_tty, capture_output=capture_output, stdout=stdout, stderr=stderr)
|
||||
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import atexit
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
|
@ -7,6 +8,7 @@ from typing import ClassVar, Optional
|
|||
from config.state import config
|
||||
from constants import Arch, GCC_HOSTSPECS, CROSSDIRECT_PKGS, CHROOT_PATHS
|
||||
from distro.distro import get_kupfer_local
|
||||
from distro.gpg import GPG_HOME_DIR
|
||||
from exec.cmd import run_root_cmd
|
||||
from exec.file import makedir, remove_file, root_makedir, root_write_file, symlink
|
||||
|
||||
|
@ -159,6 +161,25 @@ class BuildChroot(Chroot):
|
|||
))
|
||||
return results
|
||||
|
||||
def mount_gpg(self, fail_if_mounted: bool = False, schedule_gpg_kill: bool = True) -> str:
|
||||
res = self.mount(
|
||||
absolute_source=config.get_path('gpg'),
|
||||
relative_destination=CHROOT_PATHS['gpg'].lstrip('/'),
|
||||
fail_if_mounted=fail_if_mounted,
|
||||
)
|
||||
if schedule_gpg_kill:
|
||||
atexit.register(self.kill_gpg_agent)
|
||||
return res
|
||||
|
||||
def get_gpg_home(self, host_path: bool = False) -> str:
|
||||
gpg_home = os.path.join(CHROOT_PATHS['gpg']. GPG_HOME_DIR)
|
||||
if host_path:
|
||||
gpg_home = self.get_path(gpg_home)
|
||||
return gpg_home
|
||||
|
||||
def kill_gpg_agent(self) -> subprocess.CompletedProcess:
|
||||
res = self.run_cmd(["timeout", "2s", "gpgconf", "--kill", "gpg-agent"], inner_env={"GNUPGHOME": self.get_gpg_home()})
|
||||
logging.debug(f"GPG agent killed: {res.returncode=}, {res.stdout=}, {res.stderr}")
|
||||
|
||||
def get_build_chroot(arch: Arch, add_kupfer_repos: bool = True, **kwargs) -> BuildChroot:
|
||||
name = build_chroot_name(arch)
|
||||
|
|
|
@ -44,6 +44,8 @@ class BuildSection(DictScheme):
|
|||
crosscompile: bool
|
||||
crossdirect: bool
|
||||
threads: int
|
||||
sign_pkgs: bool
|
||||
sign_repos: bool
|
||||
|
||||
|
||||
class PkgbuildsSection(DictScheme):
|
||||
|
@ -67,6 +69,7 @@ class PathsSection(DictScheme):
|
|||
images: str
|
||||
ccache: str
|
||||
rust: str
|
||||
gpg: str
|
||||
|
||||
|
||||
class ProfilesSection(DictScheme):
|
||||
|
@ -140,6 +143,9 @@ class RuntimeConfiguration(DictScheme):
|
|||
uid: Optional[int]
|
||||
progress_bars: Optional[bool]
|
||||
colors: Optional[bool]
|
||||
gpg_initialized: bool
|
||||
gpg_pkg_key: Optional[str]
|
||||
gpg_repo_key: Optional[str]
|
||||
|
||||
|
||||
class ConfigLoadState(DictScheme):
|
||||
|
|
|
@ -24,6 +24,8 @@ CONFIG_DEFAULTS_DICT = {
|
|||
'crosscompile': True,
|
||||
'crossdirect': True,
|
||||
'threads': 0,
|
||||
'sign_pkgs': True,
|
||||
'sign_repos': False,
|
||||
},
|
||||
'pkgbuilds': {
|
||||
'git_repo': 'https://gitlab.com/kupfer/packages/pkgbuilds.git',
|
||||
|
@ -44,6 +46,7 @@ CONFIG_DEFAULTS_DICT = {
|
|||
'images': os.path.join('%cache_dir%', 'images'),
|
||||
'ccache': os.path.join('%cache_dir%', 'ccache'),
|
||||
'rust': os.path.join('%cache_dir%', 'rust'),
|
||||
'gpg': os.path.join('%cache_dir%', 'gpg'),
|
||||
},
|
||||
'profiles': {
|
||||
'current': 'default',
|
||||
|
@ -63,6 +66,9 @@ CONFIG_RUNTIME_DEFAULTS: RuntimeConfiguration = RuntimeConfiguration.fromDict({
|
|||
'uid': None,
|
||||
'progress_bars': None,
|
||||
'colors': None,
|
||||
'gpg_initialized': False,
|
||||
'gpg_pkg_key': None,
|
||||
'gpg_repo_key': None,
|
||||
})
|
||||
|
||||
|
||||
|
|
25
constants.py
25
constants.py
|
@ -1,4 +1,4 @@
|
|||
from typehelpers import TypeAlias
|
||||
from typehelpers import TypeAlias, Union
|
||||
|
||||
FASTBOOT = 'fastboot'
|
||||
FLASH_PARTS = {
|
||||
|
@ -56,6 +56,12 @@ ARCHES = [
|
|||
DistroArch: TypeAlias = Arch
|
||||
TargetArch: TypeAlias = Arch
|
||||
|
||||
KEYRINGS_KEY = 'keyrings'
|
||||
KEYRINGS_LOCAL_KEY = 'local_keyring'
|
||||
|
||||
KEYRING_REMOTE_NAME = "kupfer-keyring"
|
||||
KEYRINGS_LOCAL_NAME = KEYRING_REMOTE_NAME + '-local'
|
||||
|
||||
ALARM_REPOS = {
|
||||
'core': 'http://mirror.archlinuxarm.org/$arch/$repo',
|
||||
'extra': 'http://mirror.archlinuxarm.org/$arch/$repo',
|
||||
|
@ -64,20 +70,22 @@ ALARM_REPOS = {
|
|||
'aur': 'http://mirror.archlinuxarm.org/$arch/$repo',
|
||||
}
|
||||
|
||||
BASE_DISTROS: dict[DistroArch, dict[str, dict[str, str]]] = {
|
||||
ALARM_DISTRO: dict[str, Union[dict[str, str], list[str]]] = {
|
||||
'repos': ALARM_REPOS,
|
||||
KEYRINGS_KEY: ['archlinuxarm-keyring'],
|
||||
}
|
||||
|
||||
BASE_DISTROS: dict[DistroArch, dict[str, Union[dict[str, str], list[str]]]] = {
|
||||
'x86_64': {
|
||||
'repos': {
|
||||
'core': 'https://geo.mirror.pkgbuild.com/$repo/os/$arch',
|
||||
'extra': 'https://geo.mirror.pkgbuild.com/$repo/os/$arch',
|
||||
'community': 'https://geo.mirror.pkgbuild.com/$repo/os/$arch',
|
||||
},
|
||||
KEYRINGS_KEY: ['archlinux-keyring'],
|
||||
},
|
||||
'aarch64': {
|
||||
'repos': ALARM_REPOS,
|
||||
},
|
||||
'armv7h': {
|
||||
'repos': ALARM_REPOS,
|
||||
},
|
||||
'aarch64': ALARM_DISTRO.copy(),
|
||||
'armv7h': ALARM_DISTRO.copy(),
|
||||
}
|
||||
|
||||
COMPILE_ARCHES: dict[Arch, str] = {
|
||||
|
@ -148,6 +156,7 @@ CHROOT_PATHS = {
|
|||
'packages': '/packages',
|
||||
'pkgbuilds': '/pkgbuilds',
|
||||
'images': '/images',
|
||||
'gpg': '/gpg',
|
||||
}
|
||||
|
||||
WRAPPER_TYPES = [
|
||||
|
|
|
@ -80,7 +80,7 @@ class Device(DictScheme):
|
|||
if self.package.name not in pkgs:
|
||||
raise Exception(f"device package {self.package.name} somehow not in repos, this is a kupferbootstrap bug")
|
||||
pkg = pkgs[self.package.name]
|
||||
file_path = pkg.acquire()
|
||||
file_path, _ = pkg.acquire()
|
||||
assert file_path
|
||||
assert os.path.exists(file_path)
|
||||
deviceinfo_path = 'etc/kupfer/deviceinfo'
|
||||
|
|
|
@ -8,7 +8,7 @@ from generator import generate_pacman_conf_body
|
|||
from config.state import config
|
||||
|
||||
from .repo import BinaryPackageType, RepoInfo, Repo, LocalRepo, RemoteRepo
|
||||
from .repo_config import AbstrRepoConfig, BaseDistro, ReposConfigFile, REPOS_CONFIG_DEFAULT, get_repo_config as _get_repo_config
|
||||
from .repo_config import AbstrRepoConfig, BaseDistro, REMOTEURL_KEY, ReposConfigFile, REPOS_CONFIG_DEFAULT, get_repo_config as _get_repo_config
|
||||
|
||||
|
||||
class DistroLocation(IntFlag):
|
||||
|
@ -49,6 +49,15 @@ class Distro(Generic[RepoType]):
|
|||
results.update(repo.packages)
|
||||
return results
|
||||
|
||||
def find_package(self, name: str) -> Optional[BinaryPackageType]:
|
||||
for repo in self.repos.values():
|
||||
if not repo.scanned:
|
||||
repo.scan()
|
||||
p = repo.packages.get(name, None)
|
||||
if p:
|
||||
return p
|
||||
return None
|
||||
|
||||
def repos_config_snippet(self, extra_repos: Mapping[str, RepoInfo] = {}) -> str:
|
||||
extras: list[Repo] = [
|
||||
Repo(name, url_template=info.url_template, arch=self.arch, options=info.options, scan=False) for name, info in extra_repos.items()
|
||||
|
@ -138,7 +147,7 @@ def get_kupfer_repo_names(local) -> list[str]:
|
|||
|
||||
|
||||
def get_RepoInfo(arch: Arch, repo_config: AbstrRepoConfig, default_url: Optional[str]) -> RepoInfo:
|
||||
url = repo_config.remote_url or default_url
|
||||
url = repo_config.get(REMOTEURL_KEY, None) or default_url
|
||||
if isinstance(url, dict):
|
||||
if arch not in url and not default_url:
|
||||
raise Exception(f"Invalid repo config: Architecture {arch} not in remote_url mapping: {url}")
|
||||
|
@ -161,7 +170,7 @@ def get_base_distro(arch: Arch, scan: bool = False, unsigned: bool = True, cache
|
|||
for repo, repo_config in distro_config.repos.items():
|
||||
if unsigned:
|
||||
repo_config['options'] = (repo_config.get('options', None) or {}) | {'SigLevel': 'Never'}
|
||||
repos[repo] = get_RepoInfo(arch, repo_config, default_url=distro_config.remote_url)
|
||||
repos[repo] = get_RepoInfo(arch, repo_config, default_url=distro_config.get(REMOTEURL_KEY, None))
|
||||
|
||||
distro = RemoteDistro(arch=arch, repo_infos=repos, scan=False)
|
||||
if cache_db:
|
||||
|
@ -187,7 +196,7 @@ def get_kupfer_distro(
|
|||
if location == DistroLocation.REMOTE:
|
||||
remote = True
|
||||
cache = _kupfer_https
|
||||
default_url = repo_config.remote_url or KUPFER_HTTPS
|
||||
default_url = repo_config.get(REMOTEURL_KEY, None) or KUPFER_HTTPS
|
||||
repos = {repo: get_RepoInfo(arch, conf, default_url) for repo, conf in repo_config.repos.items() if not conf.local_only}
|
||||
cls = RemoteDistro
|
||||
elif location in [DistroLocation.CHROOT, DistroLocation.LOCAL]:
|
||||
|
|
144
distro/gpg.py
Normal file
144
distro/gpg.py
Normal file
|
@ -0,0 +1,144 @@
|
|||
import logging
|
||||
import os
|
||||
|
||||
from typing import Optional, TypedDict
|
||||
|
||||
from config.state import config
|
||||
from exec.cmd import run_cmd, CompletedProcess
|
||||
from exec.file import get_temp_dir, makedir, write_file
|
||||
|
||||
PKG_KEY_FILE = "package_signing_key.pgp"
|
||||
REPO_KEY_FILE = "repo_signing_key.pgp"
|
||||
|
||||
GPG_HOME_DIR = "gpghome"
|
||||
|
||||
KUPFER_DEFAULT_NAME = "Kupfer Local Signing"
|
||||
KUFER_DEFAULT_EMAIL = "local@kupfer.mobi"
|
||||
KUPFER_DEFAULT_COMMENT = "Generated by kupferbootstrap"
|
||||
|
||||
GPG_ARGS = ["--batch", "--no-tty"]
|
||||
|
||||
|
||||
class Fingerprints(TypedDict):
|
||||
pkg: str
|
||||
repo: str
|
||||
|
||||
|
||||
|
||||
def get_gpg_creation_script(
|
||||
key_name: str = KUPFER_DEFAULT_NAME,
|
||||
email: str = KUFER_DEFAULT_EMAIL,
|
||||
comment: str = KUPFER_DEFAULT_COMMENT,
|
||||
):
|
||||
return f"""
|
||||
%echo Generating a new ed25519 GPG key for "{key_name} <{email}> # {comment}"
|
||||
|
||||
%no-protection
|
||||
|
||||
Key-Type: eddsa
|
||||
Key-Curve: Ed25519
|
||||
Key-Usage: cert,sign
|
||||
Subkey-Type: ecdh
|
||||
Subkey-Curve: Curve25519
|
||||
Subkey-Usage: encrypt
|
||||
|
||||
Name-Real: {key_name}
|
||||
Name-Comment: {comment}
|
||||
Name-Email: {email}
|
||||
Expire-Date: 0
|
||||
# Do a commit here, so that we can later print "done"
|
||||
%commit
|
||||
%echo done
|
||||
"""
|
||||
|
||||
|
||||
def create_secret_key(location: str, *, gpg_binary: str = "gpg", **creation_args):
|
||||
makedir(os.path.dirname(location))
|
||||
temp_dir = get_temp_dir()
|
||||
script_file = os.path.join(temp_dir, "__gpg_creation_script")
|
||||
write_file(script_file, content=get_gpg_creation_script(**creation_args))
|
||||
run_cmd([gpg_binary, *GPG_ARGS, "--homedir", temp_dir, "--generate-key", script_file], capture_output=True).check_returncode() # type: ignore[union-attr]
|
||||
res = run_cmd([gpg_binary, *GPG_ARGS, "--homedir", temp_dir, "--armor", "--export-secret-keys"], capture_output=True)
|
||||
assert isinstance(res, CompletedProcess)
|
||||
if not (res.stdout and res.stdout.strip()):
|
||||
raise Exception(f"Failed to get secret GPG key from stdout: {res.stdout=}\n{res.stderr=}")
|
||||
logging.debug(f"Writing GPG private key to {location}")
|
||||
write_file(location, content=res.stdout, mode="600")
|
||||
|
||||
|
||||
def import_gpg_key(
|
||||
key_file: str,
|
||||
gpgdir: str,
|
||||
*,
|
||||
gpg_binary: str = "gpg",
|
||||
):
|
||||
res = run_cmd([gpg_binary, "--homedir", gpgdir, *GPG_ARGS, "--import", key_file], capture_output=True)
|
||||
assert isinstance(res, CompletedProcess)
|
||||
res.check_returncode()
|
||||
|
||||
|
||||
def detect_key_id(location: str, gpg_binary: str = "gpg"):
|
||||
res = run_cmd([gpg_binary, *GPG_ARGS, "--with-colons", "--show-keys", location], capture_output=True)
|
||||
assert isinstance(res, CompletedProcess)
|
||||
if res.returncode:
|
||||
raise Exception(f"Failed to scan {location} for a gpg key id:\n{res.stdout=}\n\n{res.stderr=}")
|
||||
text = res.stdout.decode().strip()
|
||||
for line in text.split("\n"):
|
||||
if line.startswith("fpr:"):
|
||||
fp: str = line.rstrip(":").rsplit(":")[-1]
|
||||
if not fp or not fp.isalnum():
|
||||
raise Exception(f"Failed to detect GPG fingerprint fron line {line}")
|
||||
return fp.strip()
|
||||
raise Exception(f"GPG Fingerprint line (fpr:) not found in GPG stdout: {text!r}")
|
||||
|
||||
|
||||
def ensure_gpg_initialised(
|
||||
gpg_base_dir: str,
|
||||
gpg_binary: str = "gpg",
|
||||
email: str = KUFER_DEFAULT_EMAIL,
|
||||
gpgdir: Optional[str] = None,
|
||||
) -> Fingerprints:
|
||||
repo_key = os.path.join(gpg_base_dir, REPO_KEY_FILE)
|
||||
pkg_key = os.path.join(gpg_base_dir, PKG_KEY_FILE)
|
||||
gpgdir = gpgdir or os.path.join(gpg_base_dir, GPG_HOME_DIR)
|
||||
makedir(gpgdir)
|
||||
names = {"repo": "Repo Signing", "pkg": "Package Signing"}
|
||||
fingerprints: Fingerprints = {} # type: ignore[typeddict-item]
|
||||
for key_type, key_file in {"repo": repo_key, "pkg": pkg_key}.items():
|
||||
if not os.path.exists(key_file):
|
||||
key_name = f"Kupfer Local {names[key_type]}"
|
||||
logging.info(f"Creating new GPG key for {key_name!r} <{email}> at {key_file!r}")
|
||||
create_secret_key(key_file, key_name=key_name)
|
||||
import_gpg_key(key_file, gpg_binary=gpg_binary, gpgdir=gpgdir)
|
||||
fingerprints[key_type] = detect_key_id(key_file) # type: ignore[literal-required]
|
||||
pkg_fp = fingerprints["pkg"]
|
||||
repo_fp = fingerprints["repo"]
|
||||
logging.debug(f"Ensuring package build GPG key {pkg_fp!r} is signed by repo key {repo_fp}")
|
||||
res = run_cmd(
|
||||
[
|
||||
gpg_binary,
|
||||
*GPG_ARGS,
|
||||
"--yes",
|
||||
"--homedir",
|
||||
gpgdir,
|
||||
"--default-key",
|
||||
repo_fp,
|
||||
"--trusted-key",
|
||||
pkg_fp,
|
||||
"--sign-key",
|
||||
pkg_fp,
|
||||
],
|
||||
capture_output=True,
|
||||
)
|
||||
assert isinstance(res, CompletedProcess)
|
||||
if res.returncode:
|
||||
raise Exception(f"Failed to sign package GPG key {pkg_fp!r} with repo key {repo_fp!r}:\n{res.stdout=}\n{res.stderr=}")
|
||||
logging.debug("GPG setup done")
|
||||
return fingerprints
|
||||
|
||||
def init_keys(*kargs, lazy: bool = True, **kwargs) -> None:
|
||||
if lazy and config.runtime.gpg_initialized:
|
||||
return
|
||||
fps = ensure_gpg_initialised(*kargs, **kwargs)
|
||||
config.runtime.gpg_pkg_key = fps["pkg"]
|
||||
config.runtime.gpg_repo_key = fps["repo"]
|
193
distro/keyring.py
Normal file
193
distro/keyring.py
Normal file
|
@ -0,0 +1,193 @@
|
|||
import logging
|
||||
import os
|
||||
|
||||
from enum import auto, Enum
|
||||
from typing import Optional
|
||||
|
||||
from config.state import config
|
||||
from constants import Arch, KEYRINGS_KEY, KEYRINGS_LOCAL_KEY
|
||||
from distro.repo_config import get_repo_config
|
||||
from exec.cmd import CompletedProcess, run_cmd
|
||||
from exec.file import makedir, remove_file
|
||||
from utils import extract_files_from_tar_generator, read_files_from_tar_recursive, sha256sum
|
||||
|
||||
from .distro import Distro, get_base_distro, get_kupfer_local, get_kupfer_https
|
||||
from .package import BinaryPackage
|
||||
|
||||
KEYRING_DIR = 'keyrings'
|
||||
KEYRING_DIST_DIR = 'dist'
|
||||
KEYRING_GPG_DIR = 'keyring'
|
||||
|
||||
PKGNAME_MARKER = '.pkg.tar'
|
||||
|
||||
PKG_KEYRING_FOLDER = 'usr/share/pacman/keyrings/'
|
||||
|
||||
|
||||
class DistroType(Enum):
|
||||
BASE = auto()
|
||||
LOCAL = auto()
|
||||
REMOTE = auto()
|
||||
|
||||
|
||||
KEYRING_LOCATIONS: dict[DistroType, str] = {
|
||||
DistroType.BASE: 'base',
|
||||
DistroType.LOCAL: 'local',
|
||||
DistroType.REMOTE: 'kupfer',
|
||||
}
|
||||
|
||||
keyring_created: dict[tuple[Arch, DistroType], bool] = {}
|
||||
|
||||
|
||||
def keyring_is_created(arch: Arch, distro_type: DistroType) -> bool:
|
||||
return keyring_created.get((arch, distro_type), False)
|
||||
|
||||
|
||||
def init_keyring_dir(
|
||||
arch: Arch,
|
||||
distro_type: DistroType,
|
||||
target_path: Optional[str] = None,
|
||||
lazy: bool = True,
|
||||
) -> dict[str, bool]:
|
||||
base_dir = target_path or get_keyring_path(arch, distro_type)
|
||||
keyring_dists = init_keyring_dist_dir(arch, distro_type, base_dir, lazy)
|
||||
gpg_changed = init_keyring_gpg_dir(arch, distro_type, keyring_dists, base_dir, lazy)
|
||||
keyring_created[(arch, distro_type)] = True
|
||||
return gpg_changed
|
||||
|
||||
|
||||
def init_keyring_gpg_dir(
|
||||
arch: Arch,
|
||||
distro_type: DistroType,
|
||||
keyring_dists: dict[str, tuple[str, bool]],
|
||||
base_dir: Optional[str] = None,
|
||||
lazy: bool = True,
|
||||
) -> dict[str, bool]:
|
||||
base_dir = base_dir or get_keyring_path(arch, distro_type)
|
||||
gpg_dir = get_keyring_gpg_path(base_dir)
|
||||
exists = os.path.exists(os.path.join(gpg_dir, 'trustdb.gpg'))
|
||||
if exists and not lazy:
|
||||
remove_file(gpg_dir)
|
||||
exists = False
|
||||
lazy = lazy and exists
|
||||
if not lazy:
|
||||
run_cmd(['pacman-key', '--init', '--gpgdir', gpg_dir])
|
||||
results = {}
|
||||
for name, val in keyring_dists.items():
|
||||
dist_dir, dist_changed = val
|
||||
if lazy and not dist_changed:
|
||||
results[name] = False
|
||||
continue
|
||||
logging.info(f"Importing dir {dist_dir} into {gpg_dir}")
|
||||
import_dist_keyring(gpg_dir, dist_dir)
|
||||
results[name] = True
|
||||
return results
|
||||
|
||||
|
||||
def import_dist_keyring(
|
||||
gpg_dir: str,
|
||||
dist_dir: str,
|
||||
) -> CompletedProcess:
|
||||
assert gpg_dir and dist_dir and config.runtime.script_source_dir
|
||||
r = run_cmd(['pacman-key', '--populate-from', dist_dir, '--populate', '--gpgdir', gpg_dir])
|
||||
assert isinstance(r, CompletedProcess)
|
||||
return r
|
||||
|
||||
|
||||
def init_keyring_dist_dir(
|
||||
arch: Arch,
|
||||
distro_type: DistroType,
|
||||
base_dir: Optional[str] = None,
|
||||
lazy: bool = True,
|
||||
) -> dict[str, tuple[str, bool]]:
|
||||
"""
|
||||
create keyrings/{arch}/dist. Returns a boolean indicating whether changes were made
|
||||
"""
|
||||
repo_config = get_repo_config()[0]
|
||||
base_dir = base_dir or get_keyring_path(arch, distro_type)
|
||||
dist_dir = get_keyring_dist_path(base_dir)
|
||||
|
||||
pkg_names: list[str] = []
|
||||
distro: Distro
|
||||
if distro_type == DistroType.BASE:
|
||||
pkg_names = repo_config.base_distros.get(arch, {}).get(KEYRINGS_KEY, None) or [] # type: ignore[call-overload]
|
||||
distro = get_base_distro(arch, scan=False)
|
||||
elif distro_type == DistroType.LOCAL:
|
||||
pkg_name = repo_config.get(KEYRINGS_LOCAL_KEY, None)
|
||||
pkg_names = [pkg_name] if pkg_name else []
|
||||
distro = get_kupfer_local(arch, scan=False, in_chroot=False)
|
||||
elif distro_type == DistroType.REMOTE:
|
||||
pkg_names = repo_config.get(KEYRINGS_KEY, None) or []
|
||||
distro = get_kupfer_https(arch, scan=False)
|
||||
logging.debug(f"Acquiring keyrings from {distro}: {pkg_names}")
|
||||
dist_pkgs, changed = acquire_dist_pkgs(pkg_names, distro, dist_dir)
|
||||
#if lazy and dist_pkgs and not changed and os.path.exists(dist_dir): # and keyring_is_created(arch, distro_type):
|
||||
# return {name: (get_keyring_dist_path(base_dir, name), False) for name, val in dist_pkgs.items()}
|
||||
|
||||
makedir(dist_dir)
|
||||
dist_dirs = []
|
||||
results = {}
|
||||
for name, _val in dist_pkgs.items():
|
||||
dist_pkg, changed = _val
|
||||
_dir = os.path.join(dist_dir, name)
|
||||
results[name] = _dir, False
|
||||
if lazy and not changed and os.path.exists(_dir):
|
||||
logging.debug(f"Skipping extracting keyring pkg for {name}: dir exists and file unchanged")
|
||||
continue
|
||||
extract_keyring_pkg(dist_pkg, _dir)
|
||||
dist_dirs.append(_dir)
|
||||
results[name] = _dir, True
|
||||
return results
|
||||
|
||||
|
||||
def acquire_dist_pkgs(keyring_packages: list[str], distro: Distro, dist_dir: str) -> tuple[dict[str, tuple[str, bool]], bool]:
|
||||
if not keyring_packages:
|
||||
return {}, False
|
||||
pkgs: dict[str, BinaryPackage] = {}
|
||||
not_found = []
|
||||
pkg: Optional[BinaryPackage]
|
||||
for name in keyring_packages:
|
||||
pkg = distro.find_package(name)
|
||||
if not pkg:
|
||||
not_found.append(name)
|
||||
continue
|
||||
pkgs[name] = pkg
|
||||
if not_found:
|
||||
raise Exception(f"Keyring packages for {distro.arch} not found: {not_found}")
|
||||
|
||||
changed = False
|
||||
results = {}
|
||||
for name in pkgs:
|
||||
pkg = pkgs[name]
|
||||
assert PKGNAME_MARKER in pkg.filename
|
||||
comp_ext = pkg.filename.rsplit(PKGNAME_MARKER, 1)[1]
|
||||
filename = f'{name}.tar{comp_ext}'
|
||||
filepath = os.path.join(dist_dir, filename)
|
||||
checksum = None if not os.path.exists(filepath) else sha256sum(filepath)
|
||||
target_path, _changed = pkg.acquire(dist_dir, filename)
|
||||
_changed = _changed and checksum != sha256sum(filepath)
|
||||
results[name] = target_path, _changed
|
||||
if _changed:
|
||||
changed = True
|
||||
logging.debug(f"{target_path} changed")
|
||||
return results, changed
|
||||
|
||||
|
||||
def extract_keyring_pkg(pkg_path: str, dest_path: str):
|
||||
logging.debug(f"Extracting {pkg_path} to {dest_path}")
|
||||
extract_files_from_tar_generator(
|
||||
read_files_from_tar_recursive(pkg_path, [PKG_KEYRING_FOLDER]),
|
||||
dest_path,
|
||||
remove_prefix=PKG_KEYRING_FOLDER,
|
||||
)
|
||||
|
||||
|
||||
def get_keyring_path(arch: Arch, distro_type: DistroType, *extra_paths) -> str:
|
||||
return os.path.join(config.get_path('pacman'), KEYRING_DIR, arch, KEYRING_LOCATIONS[distro_type], *extra_paths)
|
||||
|
||||
|
||||
def get_keyring_dist_path(base_dir: str, *name) -> str:
|
||||
return os.path.join(base_dir, KEYRING_DIST_DIR, *name)
|
||||
|
||||
|
||||
def get_keyring_gpg_path(base_dir: str) -> str:
|
||||
return os.path.join(base_dir, KEYRING_GPG_DIR)
|
|
@ -1,11 +1,10 @@
|
|||
import logging
|
||||
import os
|
||||
|
||||
from shutil import copyfileobj
|
||||
from typing import Optional, Union
|
||||
from urllib.request import urlopen
|
||||
|
||||
from exec.file import get_temp_dir, makedir
|
||||
from exec.file import copy_file, get_temp_dir, makedir
|
||||
from utils import download_file, sha256sum
|
||||
|
||||
|
||||
class PackageInfo:
|
||||
|
@ -62,32 +61,38 @@ class BinaryPackage(PackageInfo):
|
|||
p._desc = desc
|
||||
return p
|
||||
|
||||
def acquire(self) -> str:
|
||||
def acquire(self, dest_dir: Optional[str] = None, filename: Optional[str] = None) -> tuple[str, bool]:
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class LocalPackage(BinaryPackage):
|
||||
|
||||
def acquire(self) -> str:
|
||||
def acquire(self, dest_dir: Optional[str] = None, filename: Optional[str] = None) -> tuple[str, bool]:
|
||||
changed = False
|
||||
assert self.resolved_url and self.filename and self.filename in self.resolved_url
|
||||
path = f'{self.resolved_url.split("file://")[1]}'
|
||||
assert os.path.exists(path) or print(path)
|
||||
return path
|
||||
if dest_dir:
|
||||
makedir(dest_dir)
|
||||
target = os.path.join(dest_dir, filename or self.filename)
|
||||
if os.path.getsize(path) != os.path.getsize(target) or sha256sum(path) != sha256sum(target):
|
||||
copy_file(path, target, follow_symlinks=True)
|
||||
changed = True
|
||||
path = target
|
||||
return path, changed
|
||||
|
||||
|
||||
class RemotePackage(BinaryPackage):
|
||||
|
||||
def acquire(self, dest_dir: Optional[str] = None) -> str:
|
||||
def acquire(self, dest_dir: Optional[str] = None, filename: Optional[str] = None) -> tuple[str, bool]:
|
||||
assert self.resolved_url and '.pkg.tar.' in self.resolved_url
|
||||
url = f"{self.resolved_url}"
|
||||
assert url
|
||||
|
||||
dest_dir = dest_dir or get_temp_dir()
|
||||
makedir(dest_dir)
|
||||
dest_file_path = os.path.join(dest_dir, self.filename)
|
||||
dest_file_path = os.path.join(dest_dir, filename or self.filename)
|
||||
|
||||
logging.info(f"Trying to download package {url}")
|
||||
with urlopen(url) as fsrc, open(dest_file_path, 'wb') as fdst:
|
||||
copyfileobj(fsrc, fdst)
|
||||
logging.info(f"{self.filename} downloaded from repos")
|
||||
return dest_file_path
|
||||
changed = download_file(dest_file_path, url)
|
||||
logging.info(f"{self.filename} {'already ' if not changed else ''}downloaded from repos")
|
||||
return dest_file_path, changed
|
||||
|
|
|
@ -9,7 +9,7 @@ from copy import deepcopy
|
|||
from typing import ClassVar, Optional, Mapping, Union
|
||||
|
||||
from config.state import config
|
||||
from constants import Arch, BASE_DISTROS, KUPFER_HTTPS, REPOS_CONFIG_FILE, REPOSITORIES
|
||||
from constants import Arch, BASE_DISTROS, KUPFER_HTTPS, KEYRINGS_KEY, KEYRINGS_LOCAL_KEY, KEYRINGS_LOCAL_NAME, KEYRING_REMOTE_NAME, REPOS_CONFIG_FILE, REPOSITORIES
|
||||
from dictscheme import DictScheme, toml_inline_dicts, TomlPreserveInlineDictEncoder
|
||||
from utils import sha256sum
|
||||
|
||||
|
@ -39,11 +39,14 @@ class RepoConfig(AbstrRepoConfig):
|
|||
|
||||
class BaseDistro(DictScheme):
|
||||
remote_url: Optional[str]
|
||||
keyrings: Optional[list[str]]
|
||||
repos: dict[str, BaseDistroRepo]
|
||||
|
||||
|
||||
class ReposConfigFile(DictScheme):
|
||||
remote_url: Optional[str]
|
||||
keyrings: Optional[list[str]]
|
||||
local_keyring: Optional[str]
|
||||
repos: dict[str, RepoConfig]
|
||||
base_distros: dict[Arch, BaseDistro]
|
||||
_path: Optional[str]
|
||||
|
@ -67,10 +70,11 @@ class ReposConfigFile(DictScheme):
|
|||
repos[name] = repo_cls(_repo, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
def parse_config(path: str) -> ReposConfigFile:
|
||||
def parse_config(path: str, insert_defaults: bool = True) -> ReposConfigFile:
|
||||
defaults = REPOS_CONFIG_DEFAULTS_DICT if insert_defaults else {}
|
||||
try:
|
||||
with open(path, 'r') as fd:
|
||||
data = yaml.safe_load(fd)
|
||||
data = defaults | yaml.safe_load(fd)
|
||||
data['_path'] = path
|
||||
data['_checksum'] = sha256sum(path)
|
||||
return ReposConfigFile(data, validate=True)
|
||||
|
@ -102,10 +106,12 @@ BASE_DISTRO_DEFAULTS = {
|
|||
OPTIONS_KEY: None,
|
||||
}
|
||||
|
||||
REPOS_CONFIG_DEFAULT = ReposConfigFile({
|
||||
REPOS_CONFIG_DEFAULTS_DICT = {
|
||||
'_path': '__DEFAULTS__',
|
||||
'_checksum': None,
|
||||
REMOTEURL_KEY: KUPFER_HTTPS,
|
||||
KEYRINGS_KEY: [KEYRING_REMOTE_NAME],
|
||||
KEYRINGS_LOCAL_KEY: KEYRINGS_LOCAL_NAME,
|
||||
REPOS_KEY: {
|
||||
'kupfer_local': REPO_DEFAULTS | {
|
||||
LOCALONLY_KEY: True
|
||||
|
@ -117,14 +123,17 @@ REPOS_CONFIG_DEFAULT = ReposConfigFile({
|
|||
BASEDISTROS_KEY: {
|
||||
arch: {
|
||||
REMOTEURL_KEY: None,
|
||||
KEYRINGS_KEY: arch_def[KEYRINGS_KEY].copy() if KEYRINGS_KEY in arch_def else None,
|
||||
'repos': {
|
||||
k: {
|
||||
'remote_url': v
|
||||
} for k, v in arch_def['repos'].items()
|
||||
} for k, v in arch_def['repos'].items() # type: ignore[union-attr]
|
||||
},
|
||||
} for arch, arch_def in BASE_DISTROS.items()
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
REPOS_CONFIG_DEFAULT = ReposConfigFile(REPOS_CONFIG_DEFAULTS_DICT)
|
||||
|
||||
_current_config = None
|
||||
|
||||
|
|
11
exec/file.py
11
exec/file.py
|
@ -4,7 +4,7 @@ import os
|
|||
import stat
|
||||
import subprocess
|
||||
|
||||
from shutil import rmtree
|
||||
from shutil import copyfile, rmtree
|
||||
from tempfile import mkdtemp
|
||||
from typing import Optional, Union
|
||||
|
||||
|
@ -41,7 +41,7 @@ def chown(path: str, user: Optional[Union[str, int]] = None, group: Optional[Uni
|
|||
raise Exception(f"Failed to change owner of '{path}' to '{owner}'")
|
||||
|
||||
|
||||
def chmod(path, mode: Union[int, str] = 0o0755, force_sticky=True, privileged: bool = True):
|
||||
def chmod(path: str, mode: Union[int, str] = 0o0755, force_sticky=True, privileged: bool = True):
|
||||
if not isinstance(mode, str):
|
||||
octal = oct(mode)[2:]
|
||||
else:
|
||||
|
@ -60,11 +60,14 @@ def chmod(path, mode: Union[int, str] = 0o0755, force_sticky=True, privileged: b
|
|||
raise Exception(f"Failed to set mode of '{path}' to '{chmod}'")
|
||||
|
||||
|
||||
def root_check_exists(path):
|
||||
copy_file = copyfile
|
||||
|
||||
|
||||
def root_check_exists(path: str):
|
||||
return os.path.exists(path) or run_root_cmd(['[', '-e', path, ']']).returncode == 0
|
||||
|
||||
|
||||
def root_check_is_dir(path):
|
||||
def root_check_is_dir(path: str):
|
||||
return os.path.isdir(path) or run_root_cmd(['[', '-d', path, ']'])
|
||||
|
||||
|
||||
|
|
|
@ -446,7 +446,7 @@ def cmd_build(
|
|||
packages_extra = BASE_PACKAGES + profile.pkgs_include
|
||||
|
||||
if arch != config.runtime.arch:
|
||||
build_enable_qemu_binfmt(arch)
|
||||
build_enable_qemu_binfmt(arch, try_download=not no_download_pkgs)
|
||||
|
||||
if local_repos and build_pkgs:
|
||||
logging.info("Making sure all packages are built")
|
||||
|
|
|
@ -16,6 +16,7 @@ from exec.cmd import run_cmd, run_root_cmd
|
|||
from exec.file import makedir, remove_file, symlink
|
||||
from chroot.build import get_build_chroot, BuildChroot
|
||||
from distro.distro import get_kupfer_https, get_kupfer_local, get_kupfer_repo_names
|
||||
from distro.gpg import init_keys, GPG_HOME_DIR
|
||||
from distro.package import RemotePackage, LocalPackage
|
||||
from distro.repo import LocalRepo
|
||||
from progressbar import BAR_PADDING, get_levels_bar
|
||||
|
@ -213,6 +214,14 @@ def add_file_to_repo(file_path: str, repo_name: str, arch: Arch, remove_original
|
|||
)
|
||||
if remove_original:
|
||||
remove_file(file_path)
|
||||
sig_file = "{file_path}.sig"
|
||||
if os.path.exists(sig_file):
|
||||
shutil.copy(
|
||||
sig_file,
|
||||
repo_dir,
|
||||
)
|
||||
if remove_original:
|
||||
remove_file(sig_file)
|
||||
|
||||
# clean up same name package from pacman cache
|
||||
cache_file = os.path.join(pacman_cache_dir, file_name)
|
||||
|
@ -316,7 +325,7 @@ def try_download_package(dest_file_path: str, package: Pkgbuild, arch: Arch) ->
|
|||
url = repo_pkg.resolved_url
|
||||
assert url
|
||||
try:
|
||||
path = repo_pkg.acquire()
|
||||
path, _ = repo_pkg.acquire()
|
||||
assert os.path.exists(path)
|
||||
return path
|
||||
except HTTPError as e:
|
||||
|
@ -440,10 +449,11 @@ def setup_build_chroot(
|
|||
add_kupfer_repos: bool = True,
|
||||
clean_chroot: bool = False,
|
||||
repo: Optional[dict[str, Pkgbuild]] = None,
|
||||
try_download: bool = True,
|
||||
) -> BuildChroot:
|
||||
assert config.runtime.arch
|
||||
if arch != config.runtime.arch:
|
||||
build_enable_qemu_binfmt(arch, repo=repo or discover_pkgbuilds(), lazy=False)
|
||||
build_enable_qemu_binfmt(arch, repo=repo or discover_pkgbuilds(), try_download=try_download, lazy=False)
|
||||
init_prebuilts(arch)
|
||||
chroot = get_build_chroot(arch, add_kupfer_repos=add_kupfer_repos)
|
||||
chroot.mount_packages()
|
||||
|
@ -513,6 +523,7 @@ def build_package(
|
|||
clean_chroot: bool = False,
|
||||
build_user: str = 'kupfer',
|
||||
repo: Optional[dict[str, Pkgbuild]] = None,
|
||||
try_download: bool = False,
|
||||
):
|
||||
makepkg_compile_opts = ['--holdver']
|
||||
makepkg_conf_path = 'etc/makepkg.conf'
|
||||
|
@ -533,6 +544,7 @@ def build_package(
|
|||
extra_packages=deps,
|
||||
clean_chroot=clean_chroot,
|
||||
repo=repo,
|
||||
try_download=try_download,
|
||||
)
|
||||
assert config.runtime.arch
|
||||
native_chroot = target_chroot
|
||||
|
@ -543,6 +555,7 @@ def build_package(
|
|||
extra_packages=['base-devel'] + CROSSDIRECT_PKGS,
|
||||
clean_chroot=clean_chroot,
|
||||
repo=repo,
|
||||
try_download=try_download,
|
||||
)
|
||||
if not package.mode:
|
||||
logging.warning(f'Package {package.path} has no _mode set, assuming "host"')
|
||||
|
@ -598,6 +611,13 @@ def build_package(
|
|||
makepkg_conf_absolute = os.path.join('/', makepkg_conf_path)
|
||||
|
||||
build_cmd = ['source', '/etc/profile', '&&', *MAKEPKG_CMD, '--config', makepkg_conf_absolute, '--skippgpcheck', *makepkg_compile_opts]
|
||||
if config.file.build.sign_pkgs:
|
||||
logging.debug("Package signing requested; adding makepkg args and GNUPGHOME env var")
|
||||
init_keys(config.get_path("gpg"), lazy=True)
|
||||
assert config.runtime.gpg_pkg_key
|
||||
build_cmd.extend(["--sign", "--key", config.runtime.gpg_pkg_key])
|
||||
env["GNUPGHOME"] = os.path.join(CHROOT_PATHS["gpg"], GPG_HOME_DIR)
|
||||
target_chroot.mount_gpg()
|
||||
logging.debug(f'Building: Running {build_cmd}')
|
||||
result = build_root.run_cmd(
|
||||
build_cmd,
|
||||
|
@ -762,6 +782,7 @@ def build_packages(
|
|||
enable_ccache=enable_ccache,
|
||||
clean_chroot=clean_chroot,
|
||||
repo=repo,
|
||||
try_download=try_download,
|
||||
)
|
||||
files += add_package_to_repo(package, arch)
|
||||
updated_repos.add(package.repo)
|
||||
|
@ -816,7 +837,12 @@ def build_packages_by_paths(
|
|||
_qemu_enabled: dict[Arch, bool] = {arch: False for arch in ARCHES}
|
||||
|
||||
|
||||
def build_enable_qemu_binfmt(arch: Arch, repo: Optional[dict[str, Pkgbuild]] = None, lazy: bool = True, native_chroot: Optional[BuildChroot] = None):
|
||||
def build_enable_qemu_binfmt(
|
||||
arch: Arch, repo: Optional[dict[str, Pkgbuild]] = None,
|
||||
lazy: bool = True,
|
||||
native_chroot: Optional[BuildChroot] = None,
|
||||
try_download: bool = True,
|
||||
) -> None:
|
||||
"""
|
||||
Build and enable qemu-user-static, binfmt and crossdirect
|
||||
Specify lazy=False to force building the packages.
|
||||
|
@ -852,7 +878,7 @@ def build_enable_qemu_binfmt(arch: Arch, repo: Optional[dict[str, Pkgbuild]] = N
|
|||
packages,
|
||||
native,
|
||||
repo=repo,
|
||||
try_download=True,
|
||||
try_download=try_download,
|
||||
enable_crosscompile=False,
|
||||
enable_crossdirect=False,
|
||||
enable_ccache=False,
|
||||
|
|
|
@ -10,3 +10,4 @@ requests
|
|||
python-dateutil
|
||||
enlighten
|
||||
PyYAML
|
||||
zstandard
|
||||
|
|
90
utils.py
90
utils.py
|
@ -11,6 +11,7 @@ import subprocess
|
|||
import tarfile
|
||||
|
||||
from dateutil.parser import parse as parsedate
|
||||
from io import BytesIO
|
||||
from shutil import which
|
||||
from typing import Any, Generator, IO, Optional, Union, Sequence
|
||||
|
||||
|
@ -129,15 +130,102 @@ def get_gid(group: Union[int, str]) -> int:
|
|||
return grp.getgrnam(group).gr_gid
|
||||
|
||||
|
||||
def is_zstd(data):
|
||||
"""
|
||||
Returns True if the given byte stream is compressed with the zstd algorithm,
|
||||
False otherwise. This function performs a simplified version of the actual zstd
|
||||
header validation, using hardcoded values.
|
||||
"""
|
||||
# Check for the magic number at the beginning of the stream
|
||||
if len(data) < 4 or data[:4] != b"\x28\xb5\x2f\xfd":
|
||||
logging.debug("zstd header not found")
|
||||
return False
|
||||
# Check the frame descriptor block size
|
||||
if len(data) < 8:
|
||||
return False
|
||||
frame_size = data[4] & 0x7F | (data[5] & 0x7F) << 7 | (data[6] & 0x7F) << 14 | (data[7] & 0x07) << 21
|
||||
if frame_size < 1 or frame_size > 1 << 31:
|
||||
return False
|
||||
# Check the frame descriptor block for the checksum
|
||||
if len(data) < 18:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def decompress_if_zstd(stream):
|
||||
"""
|
||||
Given a byte stream, returns either the original stream or the decompressed stream
|
||||
if it is compressed with the zstd algorithm.
|
||||
"""
|
||||
if isinstance(stream, str):
|
||||
stream = open(stream, 'rb')
|
||||
data = stream.peek(18)[:18]
|
||||
if not is_zstd(data):
|
||||
logging.debug(f"{data=} Not zstd, skipping")
|
||||
return tarfile.open(fileobj=stream)
|
||||
logging.debug(f"Decompressing {stream=}")
|
||||
import zstandard as zstd
|
||||
dctx = zstd.ZstdDecompressor()
|
||||
return tarfile.open(fileobj=BytesIO(dctx.stream_reader(stream).read()), mode='r:tar')
|
||||
|
||||
|
||||
def open_tar(tar_file: str) -> tarfile.TarFile:
|
||||
return decompress_if_zstd(tar_file)
|
||||
|
||||
|
||||
def read_files_from_tar(tar_file: str, files: Sequence[str]) -> Generator[tuple[str, IO], None, None]:
|
||||
assert os.path.exists(tar_file)
|
||||
with tarfile.open(tar_file) as index:
|
||||
with open_tar(tar_file) as index:
|
||||
for path in files:
|
||||
fd = index.extractfile(index.getmember(path))
|
||||
assert fd
|
||||
yield path, fd
|
||||
|
||||
|
||||
def read_files_from_tar_recursive(tar_file: str, paths: Sequence[str], append_slash: bool = True) -> Generator[tuple[str, IO], None, None]:
|
||||
"""
|
||||
Returns tar FDs to files that lie under the directories specified in paths.
|
||||
HINT: deactivate append_slash to get glob-like behaviour, as if all paths ended with *
|
||||
"""
|
||||
assert os.path.exists(tar_file)
|
||||
paths = [f"{p.strip('/')}/" for p in paths]
|
||||
with open_tar(tar_file) as index:
|
||||
for member in index.getmembers():
|
||||
file_path = member.path
|
||||
if member.isfile() and check_file_matches(file_path, paths):
|
||||
logging.debug(f"tar: Returning {file_path}")
|
||||
fd = index.extractfile(member)
|
||||
assert fd
|
||||
yield file_path, fd
|
||||
else:
|
||||
logging.debug(f'tar: unmatched {file_path} for query {paths}')
|
||||
|
||||
|
||||
def check_file_matches(file_path: str, queries: list[str]) -> bool:
|
||||
for query in queries:
|
||||
if file_path.startswith(query):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def extract_files_from_tar_generator(
|
||||
tar_generator: Generator[tuple[str, IO], None, None],
|
||||
output_dir: str,
|
||||
remove_prefix: str = '',
|
||||
append_slash: bool = True,
|
||||
):
|
||||
remove_prefix = remove_prefix.strip('/')
|
||||
if append_slash and remove_prefix:
|
||||
remove_prefix += '/'
|
||||
for file_path, fd in tar_generator:
|
||||
assert file_path.startswith(remove_prefix)
|
||||
output_path = os.path.join(output_dir, file_path[len(remove_prefix):].lstrip('/'))
|
||||
os.makedirs(os.path.dirname(output_path), exist_ok=True)
|
||||
with open(output_path, 'wb') as f:
|
||||
logging.debug(f"Extracting {file_path}")
|
||||
f.write(fd.read())
|
||||
|
||||
|
||||
def download_file(path: str, url: str, update: bool = True):
|
||||
"""Download a file over http[s]. With `update`, tries to use mtime timestamps to download only changed files."""
|
||||
url_time = None
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue