diff --git a/chroot/__init__.py b/chroot/__init__.py
index 5a2d0da..2ba24da 100644
--- a/chroot/__init__.py
+++ b/chroot/__init__.py
@@ -5,9 +5,10 @@ import os
 from config import config
 from wrapper import enforce_wrap
 
-from .base import get_base_chroot, Chroot
-from .build import get_build_chroot
-from .device import get_device_chroot
+from .base import get_base_chroot, Chroot, BaseChroot
+from .build import get_build_chroot, BuildChroot
+#from .device import get_device_chroot, DeviceChroot
+from .helpers import get_chroot_path
 
 # export Chroot class
 Chroot = Chroot
@@ -23,32 +24,34 @@ def cmd_chroot(type: str = 'build', arch: str = None, enable_crossdirect=True):
         raise Exception('Unknown chroot type: ' + type)
     enforce_wrap()
+    chroot: Chroot
     if type == 'rootfs':
         if arch:
             name = 'rootfs_' + arch
         else:
             raise Exception('"rootfs" without args not yet implemented, sorry!')
             # TODO: name = config.get_profile()[...]
-        chroot_path = os.path.join(config.get_path('chroots'), name)
+        chroot_path = get_chroot_path(name)
         if not os.path.exists(chroot_path):
             raise Exception(f"rootfs {name} doesn't exist")
     else:
         if not arch:
-            #TODO: arch = config.get_profile()[...]
+            # TODO: arch = config.get_profile()[...]
             arch = 'aarch64'
         if type == 'base':
             chroot = get_base_chroot(arch)
-            if not os.path.exists(os.path.join(chroot.path, 'bin')):
+            if not os.path.exists(chroot.get_path('/bin')):
                 chroot.initialize()
             chroot.initialized = True
         elif type == 'build':
-            chroot = get_build_chroot(arch, activate=True)
-            if not os.path.exists(os.path.join(chroot.path, 'bin')):
-                chroot.initialize()
-            chroot.initialized = True
-            chroot.mount_pkgbuilds()
+            build_chroot: BuildChroot = get_build_chroot(arch, activate=True)
+            chroot = build_chroot  # type safety
+            if not os.path.exists(build_chroot.get_path('/bin')):
+                build_chroot.initialize()
+            build_chroot.initialized = True
+            build_chroot.mount_pkgbuilds()
             if config.file['build']['crossdirect'] and enable_crossdirect:
-                chroot.mount_crossdirect()
+                build_chroot.mount_crossdirect()
         else:
             raise Exception('Really weird bug')
diff --git a/chroot/abstract.py b/chroot/abstract.py
index a7c28ae..bcc7b9b 100644
--- a/chroot/abstract.py
+++ b/chroot/abstract.py
@@ -2,8 +2,9 @@ import atexit
 import logging
 import os
 import subprocess
-from typing import Protocol, Union
+from copy import deepcopy
 from shlex import quote as shell_quote
+from typing import Protocol, Union, Optional, Mapping
 
 from config import config
 from constants import Arch, CHROOT_PATHS
@@ -22,17 +23,17 @@ class AbstractChroot(Protocol):
     initialized: bool = False
     active: bool = False
     active_mounts: list[str] = []
-    extra_repos: dict[str, RepoInfo] = {}
+    extra_repos: Mapping[str, RepoInfo] = {}
     base_packages: list[str] = ['base']
 
     def __init__(
         self,
         name: str,
         arch: Arch,
-        copy_base: bool = None,
-        initialize: bool = False,
-        extra_repos: dict[str, RepoInfo] = {},
-        base_packages: list[str] = ['base', 'base-devel', 'git'],
+        copy_base: bool,
+        initialize: bool,
+        extra_repos: Mapping[str, RepoInfo],
+        base_packages: list[str],
         path_override: str = None,
     ):
         pass
@@ -40,25 +41,35 @@ class AbstractChroot(Protocol):
     def initialize(self, reset: bool = False, fail_if_initialized: bool = False):
         raise NotImplementedError()
 
-    def activate(self, *args, **kwargs):
+    def activate(self, fail_if_active: bool):
         pass
 
-    def get_path(self, *args, **kwargs):
+    def get_path(self, *joins: str):
         pass
 
-    def run_cmd(self, *args, **kwargs):
+    def run_cmd(
+        self,
+        script: Union[str, list[str]],
+        inner_env: dict[str, str],
+        outer_env: dict[str, str],
+        attach_tty: bool,
+        capture_output: bool,
+        cwd: str,
+        fail_inactive: bool,
+        stdout: Optional[int],
+    ):
         pass
 
-    def mount_pacman_cache(self, *args, **kwargs):
+    def mount_pacman_cache(self, fail_if_mounted: bool):
         pass
 
-    def mount_packages(self, *args, **kwargs):
+    def mount_packages(self, fail_if_mounted: bool):
         pass
 
-    def mount_pkgbuilds(self, *args, **kwargs):
+    def mount_pkgbuilds(self, fail_if_mounted: bool):
         pass
 
-    def try_install_packages(self, *args, **kwargs):
+    def try_install_packages(self, packages: list[str], refresh: bool, allow_fail: bool) -> dict[str, Union[int, subprocess.CompletedProcess]]:
         pass
 
 
@@ -73,7 +84,7 @@ class Chroot(AbstractChroot):
         arch: Arch,
         copy_base: bool = None,
         initialize: bool = False,
-        extra_repos: dict[str, RepoInfo] = {},
+        extra_repos: Mapping[str, RepoInfo] = {},
         base_packages: list[str] = ['base', 'base-devel', 'git'],
         path_override: str = None,
     ):
@@ -84,7 +95,7 @@ class Chroot(AbstractChroot):
         self.arch = arch
         self.path = path_override or os.path.join(config.get_path('chroots'), name)
         self.copy_base = copy_base
-        self.extra_repos = extra_repos.copy()
+        self.extra_repos = deepcopy(extra_repos)
         self.base_packages = base_packages
         if initialize:
             self.initialize()
@@ -108,7 +119,7 @@ class Chroot(AbstractChroot):
 
         self.create_rootfs(reset, pacman_conf_target, active_previously)
 
-    def get_path(self, *joins) -> str:
+    def get_path(self, *joins: str) -> str:
         if joins:
             joins = (joins[0].lstrip('/'),) + joins[1:]
 
@@ -194,22 +205,24 @@ class Chroot(AbstractChroot):
         self.umount_many(self.active_mounts)
         self.active = False
 
-    def run_cmd(self,
-                script: Union[str, list[str]],
-                inner_env: dict[str, str] = {},
-                outer_env: dict[str, str] = os.environ.copy() | {'QEMU_LD_PREFIX': '/usr/aarch64-linux-gnu'},
-                attach_tty: bool = False,
-                capture_output: bool = False,
-                cwd: str = None,
-                fail_inactive: bool = True,
-                stdout=None) -> subprocess.CompletedProcess:
+    def run_cmd(
+        self,
+        script: Union[str, list[str]],
+        inner_env: dict[str, str] = {},
+        outer_env: dict[str, str] = os.environ.copy() | {'QEMU_LD_PREFIX': '/usr/aarch64-linux-gnu'},
+        attach_tty: bool = False,
+        capture_output: bool = False,
+        cwd: Optional[str] = None,
+        fail_inactive: bool = True,
+        stdout: Optional[int] = None,
+    ) -> Union[int, subprocess.CompletedProcess]:
         if not self.active and fail_inactive:
             raise Exception(f'Chroot {self.name} is inactive, not running command! Hint: pass `fail_inactive=False`')
         if outer_env is None:
             outer_env = os.environ.copy()
         env_cmd = ['/usr/bin/env'] + [f'{shell_quote(key)}={shell_quote(value)}' for key, value in inner_env.items()]
         run_func = subprocess.call if attach_tty else subprocess.run
-        kwargs = {
+        kwargs: dict = {
             'env': outer_env,
         }
         if not attach_tty:
@@ -225,8 +238,10 @@ class Chroot(AbstractChroot):
             script,
         ]
         logging.debug(f'{self.name}: Running cmd: "{cmd}"')
-        result = run_func(cmd, **kwargs)
-        return result
+        if attach_tty:
+            return subprocess.call(cmd, **kwargs)
+        else:
+            return subprocess.run(cmd, **kwargs)
 
     def mount_pkgbuilds(self, fail_if_mounted: bool = False) -> str:
         return self.mount(
@@ -253,7 +268,7 @@ class Chroot(AbstractChroot):
             fail_if_mounted=fail_if_mounted,
         )
 
-    def write_makepkg_conf(self, target_arch: Arch, cross_chroot_relative: str, cross: bool = True) -> str:
+    def write_makepkg_conf(self, target_arch: Arch, cross_chroot_relative: Optional[str], cross: bool = True) -> str:
         """
         Generate a `makepkg.conf` or `makepkg_cross_$arch.conf` file in /etc.
         If `cross` is set makepkg will be configured to crosscompile for the foreign chroot at `cross_chroot_relative`
@@ -296,13 +311,19 @@ class Chroot(AbstractChroot):
         if result.returncode != 0:
             raise Exception('Failed to setup user')
 
-    def try_install_packages(self, packages: list[str], refresh: bool = False, allow_fail: bool = True) -> dict[str, subprocess.CompletedProcess]:
+    def try_install_packages(
+        self,
+        packages: list[str],
+        refresh: bool = False,
+        allow_fail: bool = True,
+    ) -> dict[str, Union[int, subprocess.CompletedProcess]]:
         """Try installing packages, fall back to installing one by one"""
         results = {}
         if refresh:
             results['refresh'] = self.run_cmd('pacman -Syy --noconfirm')
         cmd = "pacman -S --noconfirm --needed --overwrite='/*'"
         result = self.run_cmd(f'{cmd} -y {" ".join(packages)}')
+        assert isinstance(result, subprocess.CompletedProcess)
         results |= {package: result for package in packages}
         if result.returncode != 0 and allow_fail:
             results = {}
diff --git a/chroot/base.py b/chroot/base.py
index 78c9142..b32f7b6 100644
--- a/chroot/base.py
+++ b/chroot/base.py
@@ -43,9 +43,11 @@ class BaseChroot(Chroot):
         self.initialized = True
 
 
-def get_base_chroot(arch: Arch, **kwargs) -> Chroot:
+def get_base_chroot(arch: Arch, **kwargs) -> BaseChroot:
     name = base_chroot_name(arch)
     default = BaseChroot(name, arch, initialize=False, copy_base=False)
     if kwargs.pop('initialize', False):
         logging.debug('get_base_chroot: Had to remove "initialize" from args. This indicates a bug.')
-    return get_chroot(name, **kwargs, initialize=False, default=default)
+    chroot = get_chroot(name, **kwargs, initialize=False, default=default)
+    assert (isinstance(chroot, BaseChroot))
+    return chroot
diff --git a/chroot/build.py b/chroot/build.py
index 9d09225..6fcd337 100644
--- a/chroot/build.py
+++ b/chroot/build.py
@@ -1,7 +1,8 @@
-import glob
 import logging
 import os
 import subprocess
+from glob import glob
+from typing import Optional
 
 from config import config
 from constants import Arch, GCC_HOSTSPECS, CROSSDIRECT_PKGS, CHROOT_PATHS
@@ -66,7 +67,7 @@ class BuildChroot(Chroot):
         if active_previously:
             self.activate()
 
-    def mount_crossdirect(self, native_chroot: Chroot = None, fail_if_mounted: bool = False):
+    def mount_crossdirect(self, native_chroot: Optional[Chroot] = None, fail_if_mounted: bool = False):
         """
         mount `native_chroot` at `target_chroot`/native
         returns the absolute path that `native_chroot` has been mounted at.
@@ -85,10 +86,19 @@ class BuildChroot(Chroot):
         native_chroot.mount_pacman_cache()
         native_chroot.mount_packages()
         native_chroot.activate()
-        results = native_chroot.try_install_packages(CROSSDIRECT_PKGS + [gcc], refresh=True, allow_fail=False)
-        if results[gcc].returncode != 0:
+        results = dict(native_chroot.try_install_packages(
+            CROSSDIRECT_PKGS + [gcc],
+            refresh=True,
+            allow_fail=False,
+        ),)
+        res_gcc = results[gcc]
+        res_crossdirect = results['crossdirect']
+        assert isinstance(res_gcc, subprocess.CompletedProcess)
+        assert isinstance(res_crossdirect, subprocess.CompletedProcess)
+
+        if res_gcc.returncode != 0:
             logging.debug('Failed to install cross-compiler package {gcc}')
-        if results['crossdirect'].returncode != 0:
+        if res_crossdirect.returncode != 0:
             raise Exception('Failed to install crossdirect')
 
         cc_path = os.path.join(native_chroot.path, 'usr', 'bin', cc)
@@ -135,4 +145,5 @@ def get_build_chroot(arch: Arch, add_kupfer_repos: bool = True, **kwargs) -> Bui
     default = BuildChroot(name, arch, initialize=False, copy_base=True, extra_repos=repos)
     chroot = get_chroot(name, **kwargs, default=default)
     chroot.extra_repos = repos
+    assert (isinstance(chroot, BuildChroot))
     return chroot
diff --git a/chroot/device.py b/chroot/device.py
index 890e461..bdce35a 100644
--- a/chroot/device.py
+++ b/chroot/device.py
@@ -6,7 +6,7 @@ from utils import check_findmnt
 
 from .base import BaseChroot
 from .build import BuildChroot
-from .abstract import get_chroot, Chroot
+from .abstract import get_chroot
 
 
 class DeviceChroot(BuildChroot):
@@ -39,7 +39,16 @@ class DeviceChroot(BuildChroot):
         self.mount(source_path, '/', fs_type=fs_type, options=options)
 
 
-def get_device_chroot(device: str, flavour: str, arch: Arch, packages: list[str] = BASE_PACKAGES, extra_repos={}, **kwargs) -> Chroot:
+def get_device_chroot(
+    device: str,
+    flavour: str,
+    arch: Arch,
+    packages: list[str] = BASE_PACKAGES,
+    extra_repos={},
+    **kwargs,
+) -> DeviceChroot:
     name = f'rootfs_{device}-{flavour}'
     default = DeviceChroot(name, arch, initialize=False, copy_base=False, base_packages=packages, extra_repos=extra_repos)
-    return get_chroot(name, **kwargs, default=default)
+    chroot = get_chroot(name, **kwargs, default=default)
+    assert (isinstance(chroot, DeviceChroot))
+    return chroot
diff --git a/chroot/helpers.py b/chroot/helpers.py
index 328fbd1..1b223b2 100644
--- a/chroot/helpers.py
+++ b/chroot/helpers.py
@@ -1,4 +1,5 @@
 import os
+from typing import Union, Optional, TypedDict
 
 from config import config
 from constants import Arch
@@ -7,9 +8,16 @@ BIND_BUILD_DIRS = 'BINDBUILDDIRS'
 BASE_CHROOT_PREFIX = 'base_'
 BUILD_CHROOT_PREFIX = 'build_'
 
+
+class MountEntry(TypedDict):
+    src: str
+    type: Optional[str]
+    options: list[str]
+
+
 # inspired by arch-chroot
 # order of these matters!
-BASIC_MOUNTS = {
+BASIC_MOUNTS: dict[str, MountEntry] = {
     '/proc': {
         'src': 'proc',
         'type': 'proc',
diff --git a/config.py b/config.py
index 5abd927..67ddf6e 100644
--- a/config.py
+++ b/config.py
@@ -1,16 +1,28 @@
 import appdirs
+import click
 import os
 import toml
 import logging
 from copy import deepcopy
-import click
+from typing import Optional, Union, TypedDict, Any, Mapping
 
 CONFIG_DIR = appdirs.user_config_dir('kupfer')
 CACHE_DIR = appdirs.user_cache_dir('kupfer')
 CONFIG_DEFAULT_PATH = os.path.join(CONFIG_DIR, 'kupferbootstrap.toml')
 
-Profile = dict[str, str]
+
+class Profile(TypedDict, total=False):
+    parent: str
+    device: str
+    flavour: str
+    pkgs_include: list[str]
+    pkgs_exclude: list[str]
+    hostname: str
+    username: str
+    password: Optional[str]
+    size_extra_mb: Union[str, int]
+
 
 PROFILE_DEFAULTS: Profile = {
@@ -24,9 +36,9 @@ PROFILE_DEFAULTS: Profile = {
     'size_extra_mb': "0",
 }
 
-PROFILE_EMPTY: Profile = {key: None for key in PROFILE_DEFAULTS.keys()}
+PROFILE_EMPTY: Profile = {key: None for key in PROFILE_DEFAULTS.keys()}  # type: ignore
 
-CONFIG_DEFAULTS = {
+CONFIG_DEFAULTS: dict = {
     'wrapper': {
         'type': 'docker',
     },
@@ -132,9 +144,9 @@ def resolve_profile(
     # now init missing keys
     for key, value in PROFILE_DEFAULTS.items():
         if key not in full.keys():
-            full[key] = None
+            full[key] = None  # type: ignore[misc]
             if type(value) == list:
-                full[key] = []
+                full[key] = []  # type: ignore[misc]
 
     full['size_extra_mb'] = int(full['size_extra_mb'] or 0)
@@ -147,7 +159,7 @@ def sanitize_config(conf: dict[str, dict], warn_missing_defaultprofile=True) ->
     return merge_configs(conf_new=conf, conf_base={}, warn_missing_defaultprofile=warn_missing_defaultprofile)
 
 
-def merge_configs(conf_new: dict[str, dict], conf_base={}, warn_missing_defaultprofile=True) -> dict[str, dict]:
+def merge_configs(conf_new: Mapping[str, dict], conf_base={}, warn_missing_defaultprofile=True) -> dict[str, dict]:
     """
     Returns `conf_new` semantically merged into `conf_base`, after validating
    `conf_new` keys against `CONFIG_DEFAULTS` and `PROFILE_DEFAULTS`.
@@ -240,9 +252,8 @@ class ConfigLoadException(Exception):
     inner = None
 
     def __init__(self, extra_msg='', inner_exception: Exception = None):
-        msg = ['Config load failed!']
+        msg: list[str] = ['Config load failed!']
         if extra_msg:
-            msg[0].append(':')
             msg.append(extra_msg)
         if inner_exception:
             self.inner = inner_exception
@@ -263,9 +274,9 @@ class ConfigStateHolder:
     file: dict = {}
     # runtime config not persisted anywhere
     runtime: dict = CONFIG_RUNTIME_DEFAULTS
-    _profile_cache: dict[str, Profile] = None
+    _profile_cache: dict[str, Profile]
 
-    def __init__(self, runtime_conf={}, file_conf_path: str = None, file_conf_base: dict = {}):
+    def __init__(self, runtime_conf={}, file_conf_path: Optional[str] = None, file_conf_base: dict = {}):
         """init a stateholder, optionally loading `file_conf_path`"""
         self.runtime.update(runtime_conf)
         self.runtime['arch'] = os.uname().machine
@@ -274,11 +285,11 @@ class ConfigStateHolder:
         self.try_load_file(file_conf_path)
 
     def try_load_file(self, config_file=None, base=CONFIG_DEFAULTS):
-        _conf_file = config_file if config_file is not None else CONFIG_DEFAULT_PATH
-        self.runtime['config_file'] = _conf_file
+        config_file = config_file or CONFIG_DEFAULT_PATH
+        self.runtime['config_file'] = config_file
         self._profile_cache = None
         try:
-            self.file = parse_file(config_file=_conf_file, base=base)
+            self.file = parse_file(config_file=config_file, base=base)
         except Exception as ex:
             self.file_state.exception = ex
         self.file_state.load_finished = True
@@ -295,9 +306,8 @@ class ConfigStateHolder:
             ex = Exception("File doesn't exist. Try running `kupferbootstrap config init` first?")
             raise ex
 
-    def get_profile(self, name: str = None) -> Profile:
-        if not name:
-            name = self.file['profiles']['current']
+    def get_profile(self, name: Optional[str] = None) -> Profile:
+        name = name or self.file['profiles']['current']
         self._profile_cache = resolve_profile(name=name, sparse_profiles=self.file['profiles'], resolved=self._profile_cache)
         return self._profile_cache[name]
@@ -310,7 +320,7 @@ class ConfigStateHolder:
 
     def dump(self) -> str:
         """dump toml representation of `self.file`"""
-        dump_toml(self.file)
+        return dump_toml(self.file)
 
     def write(self, path=None):
         """write toml representation of `self.file` to `path`"""
@@ -333,7 +343,7 @@ class ConfigStateHolder:
             self.invalidate_profile_cache()
         return changed
 
-    def update_profile(self, name: str, profile: dict, merge: bool = False, create: bool = True, prune: bool = True):
+    def update_profile(self, name: str, profile: Profile, merge: bool = False, create: bool = True, prune: bool = True):
         new = {}
         if name not in self.file['profiles']:
             if not create:
@@ -366,11 +376,11 @@ def comma_str_to_list(s: str, default=None) -> list[str]:
 
 def prompt_config(
     text: str,
-    default: any,
+    default: Any,
     field_type: type = str,
     bold: bool = True,
     echo_changes: bool = True,
-) -> (any, bool):
+) -> tuple[Any, bool]:
     """
     prompts for a new value for a config key.
     returns the result and a boolean that indicates whether the result is different, considering empty strings and None equal to each other.
@@ -404,10 +414,10 @@ def prompt_config(
     return result, changed
 
 
-def prompt_profile(name: str, create: bool = True, defaults: Profile = {}) -> (Profile, bool):
+def prompt_profile(name: str, create: bool = True, defaults: Profile = {}) -> tuple[Profile, bool]:
     """Prompts the user for every field in `defaults`.
     Set values to None for an empty profile."""
-    profile = PROFILE_EMPTY | defaults
+    profile: Any = PROFILE_EMPTY | defaults
     # don't use get_profile() here because we need the sparse profile
     if name in config.file['profiles']:
         profile |= config.file['profiles'][name]
@@ -420,14 +430,14 @@ def prompt_profile(name: str, create: bool = True, defaults: Profile = {}) -> (P
     for key, current in profile.items():
         current = profile[key]
         text = f'{name}.{key}'
-        result, _changed = prompt_config(text=text, default=current, field_type=type(PROFILE_DEFAULTS[key]))
+        result, _changed = prompt_config(text=text, default=current, field_type=type(PROFILE_DEFAULTS[key]))  # type: ignore
         if _changed:
             profile[key] = result
             changed = True
     return profile, changed
 
 
-def config_dot_name_get(name: str, config: dict[str, any], prefix: str = ''):
+def config_dot_name_get(name: str, config: dict[str, Any], prefix: str = '') -> Any:
     if not isinstance(config, dict):
         raise Exception(f"Couldn't resolve config name: passed config is not a dict: {repr(config)}")
     split_name = name.split('.')
@@ -442,7 +452,7 @@ def config_dot_name_get(name: str, config: dict[str, any], prefix: str = ''):
     return config_dot_name_get(name=rest_name, config=value, prefix=prefix + name + '.')
 
 
-def config_dot_name_set(name: str, value: any, config: dict[str, any]):
+def config_dot_name_set(name: str, value: Any, config: dict[str, Any]):
     split_name = name.split('.')
     if len(split_name) > 1:
         config = config_dot_name_get('.'.join(split_name[:-1]), config)
@@ -482,7 +492,7 @@ noop_flag = click.option('--noop', '-n', help="Don't write changes to file", is_
 def cmd_config_init(sections: list[str] = CONFIG_SECTIONS, non_interactive: bool = False, noop: bool = False):
     """Initialize the config file"""
     if not non_interactive:
-        results = {}
+        results: dict[str, dict] = {}
         for section in sections:
             if section not in CONFIG_SECTIONS:
                 raise Exception(f'Unknown section: {section}')
@@ -526,7 +536,8 @@ def cmd_config_set(key_vals: list[str], non_interactive: bool = False, noop: boo
     for pair in key_vals:
         split_pair = pair.split('=')
         if len(split_pair) == 2:
-            key, value = split_pair
+            key: str = split_pair[0]
+            value: Any = split_pair[1]
             value_type = type(config_dot_name_get(key, CONFIG_DEFAULTS))
             if value_type != list:
                 value = click.types.convert_type(value_type)(value)
diff --git a/constants.py b/constants.py
index dea7ebb..69613c2 100644
--- a/constants.py
+++ b/constants.py
@@ -1,4 +1,4 @@
-from typing import TypeAlias
+from typing import TypeAlias, TypedDict
 
 FASTBOOT = 'fastboot'
 FLASH_PARTS = {
@@ -14,28 +14,35 @@ LOCATIONS = [EMMC, MICROSD]
 JUMPDRIVE = 'jumpdrive'
 JUMPDRIVE_VERSION = '0.8'
 
-BOOT_STRATEGIES = {
+BOOT_STRATEGIES: dict[str, str] = {
     'oneplus-enchilada': FASTBOOT,
     'xiaomi-beryllium-ebbg': FASTBOOT,
     'xiaomi-beryllium-tianma': FASTBOOT,
     'bq-paella': FASTBOOT,
 }
 
-DEVICES = {
+DEVICES: dict[str, list[str]] = {
     'oneplus-enchilada': ['device-sdm845-oneplus-enchilada'],
     'xiaomi-beryllium-ebbg': ['device-sdm845-xiaomi-beryllium-ebbg'],
     'xiaomi-beryllium-tianma': ['device-sdm845-xiaomi-beryllium-tianma'],
     'bq-paella': ['device-msm8916-bq-paella'],
 }
 
-BASE_PACKAGES = [
+BASE_PACKAGES: list[str] = [
     'base',
     'base-kupfer',
     'nano',
     'vim',
 ]
 
-FLAVOURS = {
+
+class Flavour(TypedDict, total=False):
+    packages: list[str]
+    post_cmds: list[str]
+    size: int
+
+
+FLAVOURS: dict[str, Flavour] = {
     'barebone': {
         'packages': [],
     },
diff --git a/distro/distro.py b/distro/distro.py
index 41727d4..21afe4c 100644
--- a/distro/distro.py
+++ b/distro/distro.py
@@ -1,4 +1,4 @@
-from typing import Optional
+from typing import Optional, Mapping
 
 from constants import ARCHES, BASE_DISTROS, REPOSITORIES, KUPFER_HTTPS, CHROOT_PATHS
 from generator import generate_pacman_conf_body
@@ -9,7 +9,7 @@ from .repo import RepoInfo, Repo
 
 
 class Distro:
-    repos: dict[str, Repo]
+    repos: Mapping[str, Repo]
     arch: str
 
     def __init__(self, arch: str, repo_infos: dict[str, RepoInfo], scan=False):
@@ -33,11 +33,11 @@ class Distro:
                 for package in repo.packages:
                     results[package.name] = package
 
-    def repos_config_snippet(self, extra_repos: dict[str, RepoInfo] = {}) -> str:
+    def repos_config_snippet(self, extra_repos: Mapping[str, RepoInfo] = {}) -> str:
         extras = [Repo(name, url_template=info.url_template, arch=self.arch, options=info.options, scan=False) for name, info in extra_repos.items()]
         return '\n\n'.join(repo.config_snippet() for repo in (list(self.repos.values()) + extras))
 
-    def get_pacman_conf(self, extra_repos: dict[str, RepoInfo] = {}, check_space: bool = True):
+    def get_pacman_conf(self, extra_repos: Mapping[str, RepoInfo] = {}, check_space: bool = True):
         body = generate_pacman_conf_body(self.arch, check_space=check_space)
         return body + self.repos_config_snippet(extra_repos)
diff --git a/distro/pkgbuild.py b/distro/pkgbuild.py
index 7cf8796..509d53b 100644
--- a/distro/pkgbuild.py
+++ b/distro/pkgbuild.py
@@ -66,6 +66,7 @@ def parse_pkgbuild(relative_pkg_dir: str, native_chroot: Chroot) -> list[Pkgbuil
         cwd=os.path.join(CHROOT_PATHS['pkgbuilds'], base_package.path),
         stdout=subprocess.PIPE,
     )
+    assert (isinstance(srcinfo, subprocess.CompletedProcess))
     lines = srcinfo.stdout.decode('utf-8').split('\n')
 
     current = base_package
diff --git a/generator.py b/generator.py
index 39051c2..3529d40 100644
--- a/generator.py
+++ b/generator.py
@@ -172,6 +172,7 @@ SRCEXT='.src.tar.gz'
 #PACMAN_AUTH=()
 '''
     if cross:
+        assert chroot
         chroot = chroot.strip('/')
         includes = f'-I/usr/{hostspec}/usr/include -I/{chroot}/usr/include'
         libs = f'-L/usr/{hostspec}/lib -L/{chroot}/usr/lib'
diff --git a/image.py b/image.py
index 70b6af3..4a9fe35 100644
--- a/image.py
+++ b/image.py
@@ -5,13 +5,13 @@ import re
 import subprocess
 import click
 import logging
-
 from signal import pause
 from subprocess import run, CompletedProcess
+from typing import Optional
 
-from chroot import Chroot, get_device_chroot
+from chroot.device import DeviceChroot, get_device_chroot
 from constants import BASE_PACKAGES, DEVICES, FLAVOURS
-from config import config
+from config import config, Profile
 from distro.distro import get_base_distro, get_kupfer_https, get_kupfer_local
 from packages import build_enable_qemu_binfmt, discover_packages, build_packages
 from ssh import copy_ssh_keys
@@ -58,7 +58,7 @@ def shrink_fs(loop_device: str, file: str, sector_size: int):
         raise Exception(f'Failed to resize2fs {loop_device}p2')
 
     logging.debug(f'Finding end block of shrunken filesystem on {loop_device}p2')
-    blocks = int(re.search('is now [0-9]+', result.stdout.decode('utf-8')).group(0).split(' ')[2])
+    blocks = int(re.search('is now [0-9]+', result.stdout.decode('utf-8')).group(0).split(' ')[2])  # type: ignore
     sectors = blocks * sectors_blocks_factor  #+ 157812 - 25600
 
     logging.debug(f'Shrinking partition at {loop_device}p2 to {sectors} sectors')
@@ -66,7 +66,7 @@ def shrink_fs(loop_device: str, file: str, sector_size: int):
         ['fdisk', '-b', str(sector_size), loop_device],
         stdin=subprocess.PIPE,
     )
-    child_proccess.stdin.write('\n'.join([
+    child_proccess.stdin.write('\n'.join([  # type: ignore
         'd',
         '2',
         'n',
@@ -114,9 +114,9 @@ def shrink_fs(loop_device: str, file: str, sector_size: int):
     partprobe(loop_device)
 
 
-def get_device_and_flavour(profile: str = None) -> tuple[str, str]:
+def get_device_and_flavour(profile_name: Optional[str] = None) -> tuple[str, str]:
     config.enforce_config_loaded()
-    profile = config.get_profile(profile)
+    profile = config.get_profile(profile_name)
 
     if not profile['device']:
         raise Exception("Please set the device using 'kupferbootstrap config init ...'")
@@ -184,7 +184,7 @@ def losetup_rootfs_image(image_path: str, sector_size: int) -> str:
     return loop_device
 
 
-def mount_chroot(rootfs_source: str, boot_src: str, chroot: Chroot):
+def mount_chroot(rootfs_source: str, boot_src: str, chroot: DeviceChroot):
     logging.debug(f'Mounting {rootfs_source} at {chroot.path}')
 
     chroot.mount_rootfs(rootfs_source)
@@ -318,6 +318,7 @@ def install_rootfs(rootfs_device: str, bootfs_device: str, device, flavour, arch
         file.write(get_base_distro(arch).get_pacman_conf(check_space=True, extra_repos=get_kupfer_https(arch).repos))
     if post_cmds:
         result = chroot.run_cmd(' && '.join(post_cmds))
+        assert isinstance(result, subprocess.CompletedProcess)
         if result.returncode != 0:
             raise Exception('Error running post_cmds')
@@ -344,9 +345,9 @@ def cmd_image():
 def cmd_build(profile_name: str = None, build_pkgs: bool = True, block_target: str = None, skip_part_images: bool = False):
     """Build a device image"""
     enforce_wrap()
-    profile = config.get_profile(profile_name)
+    profile: Profile = config.get_profile(profile_name)
     device, flavour = get_device_and_flavour(profile_name)
-    size_extra_mb = profile["size_extra_mb"]
+    size_extra_mb: int = int(profile["size_extra_mb"])
 
     # TODO: PARSE DEVICE ARCH AND SECTOR SIZE
     arch = 'aarch64'
@@ -378,7 +379,8 @@ def cmd_build(profile_name: str = None, build_pkgs: bool = True, block_target: s
         partition_device(loop_device)
         partprobe(loop_device)
 
-    boot_dev, root_dev = None, None
+    boot_dev: str
+    root_dev: str
     loop_boot = loop_device + 'p1'
     loop_root = loop_device + 'p2'
     if skip_part_images:
diff --git a/packages.py b/packages.py
index 8506e6a..bd3fc9b 100644
--- a/packages.py
+++ b/packages.py
@@ -8,10 +8,11 @@ from copy import deepcopy
 from joblib import Parallel, delayed
 from glob import glob
 from shutil import rmtree
+from typing import Iterable, Iterator, Any, Optional
 
 from constants import REPOSITORIES, CROSSDIRECT_PKGS, QEMU_BINFMT_PKGS, GCC_HOSTSPECS, ARCHES, Arch, CHROOT_PATHS, MAKEPKG_CMD
 from config import config
-from chroot import get_build_chroot, Chroot
+from chroot.build import get_build_chroot, BuildChroot
 from ssh import run_ssh_command, scp_put_files
 from wrapper import enforce_wrap
 from utils import git
@@ -96,7 +97,7 @@ def init_prebuilts(arch: Arch, dir: str = None):
 
 def discover_packages(parallel: bool = True) -> dict[str, Pkgbuild]:
     pkgbuilds_dir = config.get_path('pkgbuilds')
-    packages = {}
+    packages: dict[str, Pkgbuild] = {}
     paths = []
     init_pkgbuilds(interactive=False)
     for repo in REPOSITORIES:
@@ -141,9 +142,9 @@ def discover_packages(parallel: bool = True) -> dict[str, Pkgbuild]:
     return packages
 
 
-def filter_packages_by_paths(repo: dict[str, Pkgbuild], paths: list[str], allow_empty_results=True) -> list[Pkgbuild]:
+def filter_packages_by_paths(repo: dict[str, Pkgbuild], paths: Iterable[str], allow_empty_results=True) -> Iterable[Pkgbuild]:
     if 'all' in paths:
-        return repo.values()
+        return list(repo.values())
     result = []
     for pkg in repo.values():
         if pkg.path in paths:
@@ -154,7 +155,7 @@ def filter_packages_by_paths(repo: dict[str, Pkgbuild], paths: list[str], allow_
     return result
 
 
-def generate_dependency_chain(package_repo: dict[str, Pkgbuild], to_build: list[Pkgbuild]) -> list[set[Pkgbuild]]:
+def generate_dependency_chain(package_repo: dict[str, Pkgbuild], to_build: Iterable[Pkgbuild]) -> list[set[Pkgbuild]]:
     """
     This figures out all dependencies and their sub-dependencies for the selection and adds those packages to the selection.
     First the top-level packages get selected by searching the paths.
@@ -171,9 +172,11 @@ def generate_dependency_chain(package_repo: dict[str, Pkgbuild], to_build: list[
     def join_levels(levels: list[set[Pkgbuild]]) -> dict[Pkgbuild, int]:
         result = dict[Pkgbuild, int]()
         for i, level in enumerate(levels):
-            result[level] = i
+            for pkg in level:
+                result[pkg] = i
+        return result
 
-    def get_dependencies(package: Pkgbuild, package_repo: dict[str, Pkgbuild] = package_repo) -> list[Pkgbuild]:
+    def get_dependencies(package: Pkgbuild, package_repo: dict[str, Pkgbuild] = package_repo) -> Iterator[Pkgbuild]:
         for dep_name in package.depends:
             if dep_name in visited_names:
                 continue
@@ -182,7 +185,7 @@ def generate_dependency_chain(package_repo: dict[str, Pkgbuild], to_build: list[
             visit(dep_pkg)
             yield dep_pkg
 
-    def get_recursive_dependencies(package: Pkgbuild, package_repo: dict[str, Pkgbuild] = package_repo) -> list[Pkgbuild]:
+    def get_recursive_dependencies(package: Pkgbuild, package_repo: dict[str, Pkgbuild] = package_repo) -> Iterator[Pkgbuild]:
         for pkg in get_dependencies(package, package_repo):
             yield pkg
             for sub_pkg in get_recursive_dependencies(pkg, package_repo):
@@ -208,7 +211,7 @@ def generate_dependency_chain(package_repo: dict[str, Pkgbuild], to_build: list[
     level = 0
     # protect against dependency cycles
     repeat_count = 0
-    _last_level: set[Pkgbuild] = None
+    _last_level: Optional[set[Pkgbuild]] = None
     while dep_levels[level]:
         level_copy = dep_levels[level].copy()
         modified = False
@@ -333,7 +336,7 @@ def check_package_version_built(package: Pkgbuild, arch: Arch) -> bool:
         '--skippgpcheck',
         '--packagelist',
     ]
-    result = native_chroot.run_cmd(
+    result: Any = native_chroot.run_cmd(
         cmd,
         capture_output=True,
     )
@@ -358,7 +361,7 @@ def setup_build_chroot(
     extra_packages: list[str] = [],
     add_kupfer_repos: bool = True,
     clean_chroot: bool = False,
-) -> Chroot:
+) -> BuildChroot:
     init_prebuilts(arch)
     chroot = get_build_chroot(arch, add_kupfer_repos=add_kupfer_repos)
     chroot.mount_packages()
@@ -373,7 +376,7 @@ def setup_build_chroot(
     return chroot
 
 
-def setup_sources(package: Pkgbuild, chroot: Chroot, makepkg_conf_path='/etc/makepkg.conf', pkgbuilds_dir: str = None):
+def setup_sources(package: Pkgbuild, chroot: BuildChroot, makepkg_conf_path='/etc/makepkg.conf', pkgbuilds_dir: str = None):
     pkgbuilds_dir = pkgbuilds_dir if pkgbuilds_dir else CHROOT_PATHS['pkgbuilds']
     makepkg_setup_args = [
         '--config',
@@ -386,6 +389,7 @@ def setup_sources(package: Pkgbuild, chroot: Chroot, makepkg_conf_path='/etc/mak
 
     logging.info(f'Setting up sources for {package.path} in {chroot.name}')
     result = chroot.run_cmd(MAKEPKG_CMD + makepkg_setup_args, cwd=os.path.join(CHROOT_PATHS['pkgbuilds'], package.path))
+    assert isinstance(result, subprocess.CompletedProcess)
     if result.returncode != 0:
         raise Exception(f'Failed to check sources for {package.path}')
@@ -428,7 +432,9 @@ def build_package(
         logging.info('Setting up dependencies for cross-compilation')
         # include crossdirect for ccache symlinks and qemu-user
         results = native_chroot.try_install_packages(package.depends + CROSSDIRECT_PKGS + [f"{GCC_HOSTSPECS[native_chroot.arch][arch]}-gcc"])
-        if results['crossdirect'].returncode != 0:
+        res_crossdirect = results['crossdirect']
+        assert isinstance(res_crossdirect, subprocess.CompletedProcess)
+        if res_crossdirect.returncode != 0:
             raise Exception('Unable to install crossdirect')
         # mount foreign arch chroot inside native chroot
         chroot_relative = os.path.join(CHROOT_PATHS['chroots'], target_chroot.name)
@@ -450,7 +456,7 @@ def build_package(
             deps += ['ccache']
         logging.debug(('Building for native arch. ' if not foreign_arch else '') + 'Skipping crossdirect.')
     dep_install = target_chroot.try_install_packages(deps, allow_fail=False)
-    failed_deps = [name for name, res in dep_install.items() if res.returncode != 0]
+    failed_deps = [name for name, res in dep_install.items() if res.returncode != 0]  # type: ignore[union-attr]
     if failed_deps:
         raise Exception(f'Dependencies failed to install: {failed_deps}')
@@ -460,12 +466,12 @@ def build_package(
     build_cmd = f'makepkg --config {makepkg_conf_absolute} --skippgpcheck --needed --noconfirm --ignorearch {" ".join(makepkg_compile_opts)}'
     logging.debug(f'Building: Running {build_cmd}')
     result = build_root.run_cmd(build_cmd, inner_env=env, cwd=os.path.join(CHROOT_PATHS['pkgbuilds'], package.path))
-
+    assert isinstance(result, subprocess.CompletedProcess)
     if result.returncode != 0:
         raise Exception(f'Failed to compile package {package.path}')
 
 
-def get_unbuilt_package_levels(repo: dict[str, Pkgbuild], packages: list[Pkgbuild], arch: Arch, force: bool = False) -> list[set[Pkgbuild]]:
+def get_unbuilt_package_levels(repo: dict[str, Pkgbuild], packages: Iterable[Pkgbuild], arch: Arch, force: bool = False) -> list[set[Pkgbuild]]:
     package_levels = generate_dependency_chain(repo, packages)
     build_names = set[str]()
     build_levels = list[set[Pkgbuild]]()
@@ -486,7 +492,7 @@ def get_unbuilt_package_levels(repo: dict[str, Pkgbuil
 
 def build_packages(
     repo: dict[str, Pkgbuild],
-    packages: list[Pkgbuild],
+    packages: Iterable[Pkgbuild],
     arch: Arch,
     force: bool = False,
     enable_crosscompile: bool = True,
@@ -517,7 +523,7 @@ def build_packages(
 
 
 def build_packages_by_paths(
-    paths: list[str],
+    paths: Iterable[str],
     arch: Arch,
     repo: dict[str, Pkgbuild],
     force=False,
@@ -595,10 +601,9 @@ def cmd_build(paths: list[str], force=False, arch=None):
     build(paths, force, arch)
 
 
-def build(paths: list[str], force: bool, arch: Arch):
-    if arch is None:
-        # TODO: arch = config.get_profile()...
-        arch = 'aarch64'
+def build(paths: Iterable[str], force: bool, arch: Optional[Arch]):
+    # TODO: arch = config.get_profile()...
+    arch = arch or 'aarch64'
 
     if arch not in ARCHES:
         raise Exception(f'Unknown architecture "{arch}". Choices: {", ".join(ARCHES)}')
@@ -622,7 +627,7 @@ def build(paths: list[str], force: bool, arch: Arch):
 
 @cmd_packages.command(name='sideload')
 @click.argument('paths', nargs=-1)
-def cmd_sideload(paths: list[str]):
+def cmd_sideload(paths: Iterable[str]):
     """Build packages, copy to the device via SSH and install them"""
     files = build(paths, True, None)
     scp_put_files(files, '/tmp')
@@ -641,7 +646,7 @@ def cmd_sideload(paths: list[str]):
 @click.option('-f', '--force', is_flag=True, default=False, help="Don't prompt for confirmation")
 @click.option('-n', '--noop', is_flag=True, default=False, help="Print what would be removed but dont execute")
 @click.argument('what', type=click.Choice(['all', 'src', 'pkg']), nargs=-1)
-def cmd_clean(what: list[str] = ['all'], force: bool = False, noop: bool = False):
+def cmd_clean(what: Iterable[str] = ['all'], force: bool = False, noop: bool = False):
     """Remove files and directories not tracked in PKGBUILDs.git"""
     enforce_wrap()
     if noop:
diff --git a/utils.py b/utils.py
index dd008e3..ec04d87 100644
--- a/utils.py
+++ b/utils.py
@@ -1,13 +1,11 @@
-from shutil import which
 import atexit
-import subprocess
-
 import logging
-
-from os import PathLike
+import subprocess
+from shutil import which
+from typing import Optional, Union, Sequence
 
 
-def programs_available(programs) -> bool:
+def programs_available(programs: Union[str, Sequence[str]]) -> bool:
     if type(programs) is str:
         programs = [programs]
     for program in programs:
@@ -16,7 +14,7 @@ def programs_available(programs) -> bool:
     return True
 
 
-def umount(dest: PathLike, lazy=False):
+def umount(dest: str, lazy=False):
     return subprocess.run(
         [
             'umount',
@@ -27,7 +25,7 @@ def umount(dest: PathLike, lazy=False):
     )
 
 
-def mount(src: PathLike, dest: PathLike, options=['bind'], fs_type=None, register_unmount=True) -> subprocess.CompletedProcess:
+def mount(src: str, dest: str, options: list[str] = ['bind'], fs_type: Optional[str] = None, register_unmount=True) -> subprocess.CompletedProcess:
     opts = []
     for opt in options:
         opts += ['-o', opt]
@@ -47,7 +45,7 @@ def mount(src: PathLike, dest: PathLike, options=['bind'], fs_type=None, registe
     return result
 
 
-def check_findmnt(path: PathLike):
+def check_findmnt(path: str):
     result = subprocess.run(
         [
             'findmnt',