Type annotate all the things!

This commit is contained in:
InsanePrawn 2022-02-18 06:32:04 +01:00
parent a7da033845
commit 7a324555da
14 changed files with 213 additions and 134 deletions

View file

@ -5,9 +5,10 @@ import os
from config import config
from wrapper import enforce_wrap
from .base import get_base_chroot, Chroot
from .build import get_build_chroot
from .device import get_device_chroot
from .base import get_base_chroot, Chroot, BaseChroot
from .build import get_build_chroot, BuildChroot
#from .device import get_device_chroot, DeviceChroot
from .helpers import get_chroot_path
# export Chroot class
Chroot = Chroot
@ -23,32 +24,34 @@ def cmd_chroot(type: str = 'build', arch: str = None, enable_crossdirect=True):
raise Exception('Unknown chroot type: ' + type)
enforce_wrap()
chroot: Chroot
if type == 'rootfs':
if arch:
name = 'rootfs_' + arch
else:
raise Exception('"rootfs" without args not yet implemented, sorry!')
# TODO: name = config.get_profile()[...]
chroot_path = os.path.join(config.get_path('chroots'), name)
chroot_path = get_chroot_path(name)
if not os.path.exists(chroot_path):
raise Exception(f"rootfs {name} doesn't exist")
else:
if not arch:
#TODO: arch = config.get_profile()[...]
# TODO: arch = config.get_profile()[...]
arch = 'aarch64'
if type == 'base':
chroot = get_base_chroot(arch)
if not os.path.exists(os.path.join(chroot.path, 'bin')):
if not os.path.exists(chroot.get_path('/bin')):
chroot.initialize()
chroot.initialized = True
elif type == 'build':
chroot = get_build_chroot(arch, activate=True)
if not os.path.exists(os.path.join(chroot.path, 'bin')):
chroot.initialize()
chroot.initialized = True
chroot.mount_pkgbuilds()
build_chroot: BuildChroot = get_build_chroot(arch, activate=True)
chroot = build_chroot # type safety
if not os.path.exists(build_chroot.get_path('/bin')):
build_chroot.initialize()
build_chroot.initialized = True
build_chroot.mount_pkgbuilds()
if config.file['build']['crossdirect'] and enable_crossdirect:
chroot.mount_crossdirect()
build_chroot.mount_crossdirect()
else:
raise Exception('Really weird bug')

View file

@ -2,8 +2,9 @@ import atexit
import logging
import os
import subprocess
from typing import Protocol, Union
from copy import deepcopy
from shlex import quote as shell_quote
from typing import Protocol, Union, Optional, Mapping
from config import config
from constants import Arch, CHROOT_PATHS
@ -22,17 +23,17 @@ class AbstractChroot(Protocol):
initialized: bool = False
active: bool = False
active_mounts: list[str] = []
extra_repos: dict[str, RepoInfo] = {}
extra_repos: Mapping[str, RepoInfo] = {}
base_packages: list[str] = ['base']
def __init__(
self,
name: str,
arch: Arch,
copy_base: bool = None,
initialize: bool = False,
extra_repos: dict[str, RepoInfo] = {},
base_packages: list[str] = ['base', 'base-devel', 'git'],
copy_base: bool,
initialize: bool,
extra_repos: Mapping[str, RepoInfo],
base_packages: list[str],
path_override: str = None,
):
pass
@ -40,25 +41,35 @@ class AbstractChroot(Protocol):
def initialize(self, reset: bool = False, fail_if_initialized: bool = False):
raise NotImplementedError()
def activate(self, *args, **kwargs):
def activate(self, fail_if_active: bool):
pass
def get_path(self, *args, **kwargs):
def get_path(self, *joins: str):
pass
def run_cmd(self, *args, **kwargs):
def run_cmd(
self,
script: Union[str, list[str]],
inner_env: dict[str, str],
outer_env: dict[str, str],
attach_tty: bool,
capture_output: bool,
cwd: str,
fail_inactive: bool,
stdout: Optional[int],
):
pass
def mount_pacman_cache(self, *args, **kwargs):
def mount_pacman_cache(self, fail_if_mounted: bool):
pass
def mount_packages(self, *args, **kwargs):
def mount_packages(self, fail_if_mounted: bool):
pass
def mount_pkgbuilds(self, *args, **kwargs):
def mount_pkgbuilds(self, fail_if_mounted: bool):
pass
def try_install_packages(self, *args, **kwargs):
def try_install_packages(self, packages: list[str], refresh: bool, allow_fail: bool) -> dict[str, Union[int, subprocess.CompletedProcess]]:
pass
@ -73,7 +84,7 @@ class Chroot(AbstractChroot):
arch: Arch,
copy_base: bool = None,
initialize: bool = False,
extra_repos: dict[str, RepoInfo] = {},
extra_repos: Mapping[str, RepoInfo] = {},
base_packages: list[str] = ['base', 'base-devel', 'git'],
path_override: str = None,
):
@ -84,7 +95,7 @@ class Chroot(AbstractChroot):
self.arch = arch
self.path = path_override or os.path.join(config.get_path('chroots'), name)
self.copy_base = copy_base
self.extra_repos = extra_repos.copy()
self.extra_repos = deepcopy(extra_repos)
self.base_packages = base_packages
if initialize:
self.initialize()
@ -108,7 +119,7 @@ class Chroot(AbstractChroot):
self.create_rootfs(reset, pacman_conf_target, active_previously)
def get_path(self, *joins) -> str:
def get_path(self, *joins: str) -> str:
if joins:
joins = (joins[0].lstrip('/'),) + joins[1:]
@ -194,22 +205,24 @@ class Chroot(AbstractChroot):
self.umount_many(self.active_mounts)
self.active = False
def run_cmd(self,
script: Union[str, list[str]],
inner_env: dict[str, str] = {},
outer_env: dict[str, str] = os.environ.copy() | {'QEMU_LD_PREFIX': '/usr/aarch64-linux-gnu'},
attach_tty: bool = False,
capture_output: bool = False,
cwd: str = None,
fail_inactive: bool = True,
stdout=None) -> subprocess.CompletedProcess:
def run_cmd(
self,
script: Union[str, list[str]],
inner_env: dict[str, str] = {},
outer_env: dict[str, str] = os.environ.copy() | {'QEMU_LD_PREFIX': '/usr/aarch64-linux-gnu'},
attach_tty: bool = False,
capture_output: bool = False,
cwd: Optional[str] = None,
fail_inactive: bool = True,
stdout: Optional[int] = None,
) -> Union[int, subprocess.CompletedProcess]:
if not self.active and fail_inactive:
raise Exception(f'Chroot {self.name} is inactive, not running command! Hint: pass `fail_inactive=False`')
if outer_env is None:
outer_env = os.environ.copy()
env_cmd = ['/usr/bin/env'] + [f'{shell_quote(key)}={shell_quote(value)}' for key, value in inner_env.items()]
run_func = subprocess.call if attach_tty else subprocess.run
kwargs = {
kwargs: dict = {
'env': outer_env,
}
if not attach_tty:
@ -225,8 +238,10 @@ class Chroot(AbstractChroot):
script,
]
logging.debug(f'{self.name}: Running cmd: "{cmd}"')
result = run_func(cmd, **kwargs)
return result
if attach_tty:
return subprocess.call(cmd, **kwargs)
else:
return subprocess.run(cmd, **kwargs)
def mount_pkgbuilds(self, fail_if_mounted: bool = False) -> str:
return self.mount(
@ -253,7 +268,7 @@ class Chroot(AbstractChroot):
fail_if_mounted=fail_if_mounted,
)
def write_makepkg_conf(self, target_arch: Arch, cross_chroot_relative: str, cross: bool = True) -> str:
def write_makepkg_conf(self, target_arch: Arch, cross_chroot_relative: Optional[str], cross: bool = True) -> str:
"""
Generate a `makepkg.conf` or `makepkg_cross_$arch.conf` file in /etc.
If `cross` is set makepkg will be configured to crosscompile for the foreign chroot at `cross_chroot_relative`
@ -296,13 +311,19 @@ class Chroot(AbstractChroot):
if result.returncode != 0:
raise Exception('Failed to setup user')
def try_install_packages(self, packages: list[str], refresh: bool = False, allow_fail: bool = True) -> dict[str, subprocess.CompletedProcess]:
def try_install_packages(
self,
packages: list[str],
refresh: bool = False,
allow_fail: bool = True,
) -> dict[str, Union[int, subprocess.CompletedProcess]]:
"""Try installing packages, fall back to installing one by one"""
results = {}
if refresh:
results['refresh'] = self.run_cmd('pacman -Syy --noconfirm')
cmd = "pacman -S --noconfirm --needed --overwrite='/*'"
result = self.run_cmd(f'{cmd} -y {" ".join(packages)}')
assert isinstance(result, subprocess.CompletedProcess)
results |= {package: result for package in packages}
if result.returncode != 0 and allow_fail:
results = {}

View file

@ -43,9 +43,11 @@ class BaseChroot(Chroot):
self.initialized = True
def get_base_chroot(arch: Arch, **kwargs) -> Chroot:
def get_base_chroot(arch: Arch, **kwargs) -> BaseChroot:
name = base_chroot_name(arch)
default = BaseChroot(name, arch, initialize=False, copy_base=False)
if kwargs.pop('initialize', False):
logging.debug('get_base_chroot: Had to remove "initialize" from args. This indicates a bug.')
return get_chroot(name, **kwargs, initialize=False, default=default)
chroot = get_chroot(name, **kwargs, initialize=False, default=default)
assert (isinstance(chroot, BaseChroot))
return chroot

View file

@ -1,7 +1,8 @@
import glob
import logging
import os
import subprocess
from glob import glob
from typing import Optional
from config import config
from constants import Arch, GCC_HOSTSPECS, CROSSDIRECT_PKGS, CHROOT_PATHS
@ -66,7 +67,7 @@ class BuildChroot(Chroot):
if active_previously:
self.activate()
def mount_crossdirect(self, native_chroot: Chroot = None, fail_if_mounted: bool = False):
def mount_crossdirect(self, native_chroot: Optional[Chroot] = None, fail_if_mounted: bool = False):
"""
mount `native_chroot` at `target_chroot`/native
returns the absolute path that `native_chroot` has been mounted at.
@ -85,10 +86,19 @@ class BuildChroot(Chroot):
native_chroot.mount_pacman_cache()
native_chroot.mount_packages()
native_chroot.activate()
results = native_chroot.try_install_packages(CROSSDIRECT_PKGS + [gcc], refresh=True, allow_fail=False)
if results[gcc].returncode != 0:
results = dict(native_chroot.try_install_packages(
CROSSDIRECT_PKGS + [gcc],
refresh=True,
allow_fail=False,
),)
res_gcc = results[gcc]
res_crossdirect = results['crossdirect']
assert isinstance(res_gcc, subprocess.CompletedProcess)
assert isinstance(res_crossdirect, subprocess.CompletedProcess)
if res_gcc.returncode != 0:
logging.debug('Failed to install cross-compiler package {gcc}')
if results['crossdirect'].returncode != 0:
if res_crossdirect.returncode != 0:
raise Exception('Failed to install crossdirect')
cc_path = os.path.join(native_chroot.path, 'usr', 'bin', cc)
@ -135,4 +145,5 @@ def get_build_chroot(arch: Arch, add_kupfer_repos: bool = True, **kwargs) -> Bui
default = BuildChroot(name, arch, initialize=False, copy_base=True, extra_repos=repos)
chroot = get_chroot(name, **kwargs, default=default)
chroot.extra_repos = repos
assert (isinstance(chroot, BuildChroot))
return chroot

View file

@ -6,7 +6,7 @@ from utils import check_findmnt
from .base import BaseChroot
from .build import BuildChroot
from .abstract import get_chroot, Chroot
from .abstract import get_chroot
class DeviceChroot(BuildChroot):
@ -39,7 +39,16 @@ class DeviceChroot(BuildChroot):
self.mount(source_path, '/', fs_type=fs_type, options=options)
def get_device_chroot(device: str, flavour: str, arch: Arch, packages: list[str] = BASE_PACKAGES, extra_repos={}, **kwargs) -> Chroot:
def get_device_chroot(
device: str,
flavour: str,
arch: Arch,
packages: list[str] = BASE_PACKAGES,
extra_repos={},
**kwargs,
) -> DeviceChroot:
name = f'rootfs_{device}-{flavour}'
default = DeviceChroot(name, arch, initialize=False, copy_base=False, base_packages=packages, extra_repos=extra_repos)
return get_chroot(name, **kwargs, default=default)
chroot = get_chroot(name, **kwargs, default=default)
assert (isinstance(chroot, DeviceChroot))
return chroot

View file

@ -1,4 +1,5 @@
import os
from typing import Union, Optional, TypedDict
from config import config
from constants import Arch
@ -7,9 +8,16 @@ BIND_BUILD_DIRS = 'BINDBUILDDIRS'
BASE_CHROOT_PREFIX = 'base_'
BUILD_CHROOT_PREFIX = 'build_'
class MountEntry(TypedDict):
src: str
type: Optional[str]
options: list[str]
# inspired by arch-chroot
# order of these matters!
BASIC_MOUNTS = {
BASIC_MOUNTS: dict[str, MountEntry] = {
'/proc': {
'src': 'proc',
'type': 'proc',

View file

@ -1,16 +1,28 @@
import appdirs
import click
import os
import toml
import logging
from copy import deepcopy
import click
from typing import Optional, Union, TypedDict, Any, Mapping
CONFIG_DIR = appdirs.user_config_dir('kupfer')
CACHE_DIR = appdirs.user_cache_dir('kupfer')
CONFIG_DEFAULT_PATH = os.path.join(CONFIG_DIR, 'kupferbootstrap.toml')
Profile = dict[str, str]
class Profile(TypedDict, total=False):
parent: str
device: str
flavour: str
pkgs_include: list[str]
pkgs_exclude: list[str]
hostname: str
username: str
password: Optional[str]
size_extra_mb: Union[str, int]
PROFILE_DEFAULTS: Profile = {
'parent': '',
@ -24,9 +36,9 @@ PROFILE_DEFAULTS: Profile = {
'size_extra_mb': "0",
}
PROFILE_EMPTY: Profile = {key: None for key in PROFILE_DEFAULTS.keys()}
PROFILE_EMPTY: Profile = {key: None for key in PROFILE_DEFAULTS.keys()} # type: ignore
CONFIG_DEFAULTS = {
CONFIG_DEFAULTS: dict = {
'wrapper': {
'type': 'docker',
},
@ -132,9 +144,9 @@ def resolve_profile(
# now init missing keys
for key, value in PROFILE_DEFAULTS.items():
if key not in full.keys():
full[key] = None
full[key] = None # type: ignore[misc]
if type(value) == list:
full[key] = []
full[key] = [] # type: ignore[misc]
full['size_extra_mb'] = int(full['size_extra_mb'] or 0)
@ -147,7 +159,7 @@ def sanitize_config(conf: dict[str, dict], warn_missing_defaultprofile=True) ->
return merge_configs(conf_new=conf, conf_base={}, warn_missing_defaultprofile=warn_missing_defaultprofile)
def merge_configs(conf_new: dict[str, dict], conf_base={}, warn_missing_defaultprofile=True) -> dict[str, dict]:
def merge_configs(conf_new: Mapping[str, dict], conf_base={}, warn_missing_defaultprofile=True) -> dict[str, dict]:
"""
Returns `conf_new` semantically merged into `conf_base`, after validating
`conf_new` keys against `CONFIG_DEFAULTS` and `PROFILE_DEFAULTS`.
@ -240,9 +252,8 @@ class ConfigLoadException(Exception):
inner = None
def __init__(self, extra_msg='', inner_exception: Exception = None):
msg = ['Config load failed!']
msg: list[str] = ['Config load failed!']
if extra_msg:
msg[0].append(':')
msg.append(extra_msg)
if inner_exception:
self.inner = inner_exception
@ -263,9 +274,9 @@ class ConfigStateHolder:
file: dict = {}
# runtime config not persisted anywhere
runtime: dict = CONFIG_RUNTIME_DEFAULTS
_profile_cache: dict[str, Profile] = None
_profile_cache: dict[str, Profile]
def __init__(self, runtime_conf={}, file_conf_path: str = None, file_conf_base: dict = {}):
def __init__(self, runtime_conf={}, file_conf_path: Optional[str] = None, file_conf_base: dict = {}):
"""init a stateholder, optionally loading `file_conf_path`"""
self.runtime.update(runtime_conf)
self.runtime['arch'] = os.uname().machine
@ -274,11 +285,11 @@ class ConfigStateHolder:
self.try_load_file(file_conf_path)
def try_load_file(self, config_file=None, base=CONFIG_DEFAULTS):
_conf_file = config_file if config_file is not None else CONFIG_DEFAULT_PATH
self.runtime['config_file'] = _conf_file
config_file = config_file or CONFIG_DEFAULT_PATH
self.runtime['config_file'] = config_file
self._profile_cache = None
try:
self.file = parse_file(config_file=_conf_file, base=base)
self.file = parse_file(config_file=config_file, base=base)
except Exception as ex:
self.file_state.exception = ex
self.file_state.load_finished = True
@ -295,9 +306,8 @@ class ConfigStateHolder:
ex = Exception("File doesn't exist. Try running `kupferbootstrap config init` first?")
raise ex
def get_profile(self, name: str = None) -> Profile:
if not name:
name = self.file['profiles']['current']
def get_profile(self, name: Optional[str] = None) -> Profile:
name = name or self.file['profiles']['current']
self._profile_cache = resolve_profile(name=name, sparse_profiles=self.file['profiles'], resolved=self._profile_cache)
return self._profile_cache[name]
@ -310,7 +320,7 @@ class ConfigStateHolder:
def dump(self) -> str:
"""dump toml representation of `self.file`"""
dump_toml(self.file)
return dump_toml(self.file)
def write(self, path=None):
"""write toml representation of `self.file` to `path`"""
@ -333,7 +343,7 @@ class ConfigStateHolder:
self.invalidate_profile_cache()
return changed
def update_profile(self, name: str, profile: dict, merge: bool = False, create: bool = True, prune: bool = True):
def update_profile(self, name: str, profile: Profile, merge: bool = False, create: bool = True, prune: bool = True):
new = {}
if name not in self.file['profiles']:
if not create:
@ -366,11 +376,11 @@ def comma_str_to_list(s: str, default=None) -> list[str]:
def prompt_config(
text: str,
default: any,
default: Any,
field_type: type = str,
bold: bool = True,
echo_changes: bool = True,
) -> (any, bool):
) -> tuple[Any, bool]:
"""
prompts for a new value for a config key. returns the result and a boolean that indicates
whether the result is different, considering empty strings and None equal to each other.
@ -404,10 +414,10 @@ def prompt_config(
return result, changed
def prompt_profile(name: str, create: bool = True, defaults: Profile = {}) -> (Profile, bool):
def prompt_profile(name: str, create: bool = True, defaults: Profile = {}) -> tuple[Profile, bool]:
"""Prompts the user for every field in `defaults`. Set values to None for an empty profile."""
profile = PROFILE_EMPTY | defaults
profile: Any = PROFILE_EMPTY | defaults
# don't use get_profile() here because we need the sparse profile
if name in config.file['profiles']:
profile |= config.file['profiles'][name]
@ -420,14 +430,14 @@ def prompt_profile(name: str, create: bool = True, defaults: Profile = {}) -> (P
for key, current in profile.items():
current = profile[key]
text = f'{name}.{key}'
result, _changed = prompt_config(text=text, default=current, field_type=type(PROFILE_DEFAULTS[key]))
result, _changed = prompt_config(text=text, default=current, field_type=type(PROFILE_DEFAULTS[key])) # type: ignore
if _changed:
profile[key] = result
changed = True
return profile, changed
def config_dot_name_get(name: str, config: dict[str, any], prefix: str = ''):
def config_dot_name_get(name: str, config: dict[str, Any], prefix: str = '') -> Any:
if not isinstance(config, dict):
raise Exception(f"Couldn't resolve config name: passed config is not a dict: {repr(config)}")
split_name = name.split('.')
@ -442,7 +452,7 @@ def config_dot_name_get(name: str, config: dict[str, any], prefix: str = ''):
return config_dot_name_get(name=rest_name, config=value, prefix=prefix + name + '.')
def config_dot_name_set(name: str, value: any, config: dict[str, any]):
def config_dot_name_set(name: str, value: Any, config: dict[str, Any]):
split_name = name.split('.')
if len(split_name) > 1:
config = config_dot_name_get('.'.join(split_name[:-1]), config)
@ -482,7 +492,7 @@ noop_flag = click.option('--noop', '-n', help="Don't write changes to file", is_
def cmd_config_init(sections: list[str] = CONFIG_SECTIONS, non_interactive: bool = False, noop: bool = False):
"""Initialize the config file"""
if not non_interactive:
results = {}
results: dict[str, dict] = {}
for section in sections:
if section not in CONFIG_SECTIONS:
raise Exception(f'Unknown section: {section}')
@ -526,7 +536,8 @@ def cmd_config_set(key_vals: list[str], non_interactive: bool = False, noop: boo
for pair in key_vals:
split_pair = pair.split('=')
if len(split_pair) == 2:
key, value = split_pair
key: str = split_pair[0]
value: Any = split_pair[1]
value_type = type(config_dot_name_get(key, CONFIG_DEFAULTS))
if value_type != list:
value = click.types.convert_type(value_type)(value)

View file

@ -1,4 +1,4 @@
from typing import TypeAlias
from typing import TypeAlias, TypedDict
FASTBOOT = 'fastboot'
FLASH_PARTS = {
@ -14,28 +14,35 @@ LOCATIONS = [EMMC, MICROSD]
JUMPDRIVE = 'jumpdrive'
JUMPDRIVE_VERSION = '0.8'
BOOT_STRATEGIES = {
BOOT_STRATEGIES: dict[str, str] = {
'oneplus-enchilada': FASTBOOT,
'xiaomi-beryllium-ebbg': FASTBOOT,
'xiaomi-beryllium-tianma': FASTBOOT,
'bq-paella': FASTBOOT,
}
DEVICES = {
DEVICES: dict[str, list[str]] = {
'oneplus-enchilada': ['device-sdm845-oneplus-enchilada'],
'xiaomi-beryllium-ebbg': ['device-sdm845-xiaomi-beryllium-ebbg'],
'xiaomi-beryllium-tianma': ['device-sdm845-xiaomi-beryllium-tianma'],
'bq-paella': ['device-msm8916-bq-paella'],
}
BASE_PACKAGES = [
BASE_PACKAGES: list[str] = [
'base',
'base-kupfer',
'nano',
'vim',
]
FLAVOURS = {
class Flavour(TypedDict, total=False):
packages: list[str]
post_cmds: list[str]
size: int
FLAVOURS: dict[str, Flavour] = {
'barebone': {
'packages': [],
},

View file

@ -1,4 +1,4 @@
from typing import Optional
from typing import Optional, Mapping
from constants import ARCHES, BASE_DISTROS, REPOSITORIES, KUPFER_HTTPS, CHROOT_PATHS
from generator import generate_pacman_conf_body
@ -9,7 +9,7 @@ from .repo import RepoInfo, Repo
class Distro:
repos: dict[str, Repo]
repos: Mapping[str, Repo]
arch: str
def __init__(self, arch: str, repo_infos: dict[str, RepoInfo], scan=False):
@ -33,11 +33,11 @@ class Distro:
for package in repo.packages:
results[package.name] = package
def repos_config_snippet(self, extra_repos: dict[str, RepoInfo] = {}) -> str:
def repos_config_snippet(self, extra_repos: Mapping[str, RepoInfo] = {}) -> str:
extras = [Repo(name, url_template=info.url_template, arch=self.arch, options=info.options, scan=False) for name, info in extra_repos.items()]
return '\n\n'.join(repo.config_snippet() for repo in (list(self.repos.values()) + extras))
def get_pacman_conf(self, extra_repos: dict[str, RepoInfo] = {}, check_space: bool = True):
def get_pacman_conf(self, extra_repos: Mapping[str, RepoInfo] = {}, check_space: bool = True):
body = generate_pacman_conf_body(self.arch, check_space=check_space)
return body + self.repos_config_snippet(extra_repos)

View file

@ -66,6 +66,7 @@ def parse_pkgbuild(relative_pkg_dir: str, native_chroot: Chroot) -> list[Pkgbuil
cwd=os.path.join(CHROOT_PATHS['pkgbuilds'], base_package.path),
stdout=subprocess.PIPE,
)
assert (isinstance(srcinfo, subprocess.CompletedProcess))
lines = srcinfo.stdout.decode('utf-8').split('\n')
current = base_package

View file

@ -172,6 +172,7 @@ SRCEXT='.src.tar.gz'
#PACMAN_AUTH=()
'''
if cross:
assert chroot
chroot = chroot.strip('/')
includes = f'-I/usr/{hostspec}/usr/include -I/{chroot}/usr/include'
libs = f'-L/usr/{hostspec}/lib -L/{chroot}/usr/lib'

View file

@ -5,13 +5,13 @@ import re
import subprocess
import click
import logging
from signal import pause
from subprocess import run, CompletedProcess
from typing import Optional
from chroot import Chroot, get_device_chroot
from chroot.device import DeviceChroot, get_device_chroot
from constants import BASE_PACKAGES, DEVICES, FLAVOURS
from config import config
from config import config, Profile
from distro.distro import get_base_distro, get_kupfer_https, get_kupfer_local
from packages import build_enable_qemu_binfmt, discover_packages, build_packages
from ssh import copy_ssh_keys
@ -58,7 +58,7 @@ def shrink_fs(loop_device: str, file: str, sector_size: int):
raise Exception(f'Failed to resize2fs {loop_device}p2')
logging.debug(f'Finding end block of shrunken filesystem on {loop_device}p2')
blocks = int(re.search('is now [0-9]+', result.stdout.decode('utf-8')).group(0).split(' ')[2])
blocks = int(re.search('is now [0-9]+', result.stdout.decode('utf-8')).group(0).split(' ')[2]) # type: ignore
sectors = blocks * sectors_blocks_factor #+ 157812 - 25600
logging.debug(f'Shrinking partition at {loop_device}p2 to {sectors} sectors')
@ -66,7 +66,7 @@ def shrink_fs(loop_device: str, file: str, sector_size: int):
['fdisk', '-b', str(sector_size), loop_device],
stdin=subprocess.PIPE,
)
child_proccess.stdin.write('\n'.join([
child_proccess.stdin.write('\n'.join([ # type: ignore
'd',
'2',
'n',
@ -114,9 +114,9 @@ def shrink_fs(loop_device: str, file: str, sector_size: int):
partprobe(loop_device)
def get_device_and_flavour(profile: str = None) -> tuple[str, str]:
def get_device_and_flavour(profile_name: Optional[str] = None) -> tuple[str, str]:
config.enforce_config_loaded()
profile = config.get_profile(profile)
profile = config.get_profile(profile_name)
if not profile['device']:
raise Exception("Please set the device using 'kupferbootstrap config init ...'")
@ -184,7 +184,7 @@ def losetup_rootfs_image(image_path: str, sector_size: int) -> str:
return loop_device
def mount_chroot(rootfs_source: str, boot_src: str, chroot: Chroot):
def mount_chroot(rootfs_source: str, boot_src: str, chroot: DeviceChroot):
logging.debug(f'Mounting {rootfs_source} at {chroot.path}')
chroot.mount_rootfs(rootfs_source)
@ -318,6 +318,7 @@ def install_rootfs(rootfs_device: str, bootfs_device: str, device, flavour, arch
file.write(get_base_distro(arch).get_pacman_conf(check_space=True, extra_repos=get_kupfer_https(arch).repos))
if post_cmds:
result = chroot.run_cmd(' && '.join(post_cmds))
assert isinstance(result, subprocess.CompletedProcess)
if result.returncode != 0:
raise Exception('Error running post_cmds')
@ -344,9 +345,9 @@ def cmd_image():
def cmd_build(profile_name: str = None, build_pkgs: bool = True, block_target: str = None, skip_part_images: bool = False):
"""Build a device image"""
enforce_wrap()
profile = config.get_profile(profile_name)
profile: Profile = config.get_profile(profile_name)
device, flavour = get_device_and_flavour(profile_name)
size_extra_mb = profile["size_extra_mb"]
size_extra_mb: int = int(profile["size_extra_mb"])
# TODO: PARSE DEVICE ARCH AND SECTOR SIZE
arch = 'aarch64'
@ -378,7 +379,8 @@ def cmd_build(profile_name: str = None, build_pkgs: bool = True, block_target: s
partition_device(loop_device)
partprobe(loop_device)
boot_dev, root_dev = None, None
boot_dev: str
root_dev: str
loop_boot = loop_device + 'p1'
loop_root = loop_device + 'p2'
if skip_part_images:

View file

@ -8,10 +8,11 @@ from copy import deepcopy
from joblib import Parallel, delayed
from glob import glob
from shutil import rmtree
from typing import Iterable, Iterator, Any, Optional
from constants import REPOSITORIES, CROSSDIRECT_PKGS, QEMU_BINFMT_PKGS, GCC_HOSTSPECS, ARCHES, Arch, CHROOT_PATHS, MAKEPKG_CMD
from config import config
from chroot import get_build_chroot, Chroot
from chroot.build import get_build_chroot, BuildChroot
from ssh import run_ssh_command, scp_put_files
from wrapper import enforce_wrap
from utils import git
@ -96,7 +97,7 @@ def init_prebuilts(arch: Arch, dir: str = None):
def discover_packages(parallel: bool = True) -> dict[str, Pkgbuild]:
pkgbuilds_dir = config.get_path('pkgbuilds')
packages = {}
packages: dict[str, Pkgbuild] = {}
paths = []
init_pkgbuilds(interactive=False)
for repo in REPOSITORIES:
@ -141,9 +142,9 @@ def discover_packages(parallel: bool = True) -> dict[str, Pkgbuild]:
return packages
def filter_packages_by_paths(repo: dict[str, Pkgbuild], paths: list[str], allow_empty_results=True) -> list[Pkgbuild]:
def filter_packages_by_paths(repo: dict[str, Pkgbuild], paths: Iterable[str], allow_empty_results=True) -> Iterable[Pkgbuild]:
if 'all' in paths:
return repo.values()
return list(repo.values())
result = []
for pkg in repo.values():
if pkg.path in paths:
@ -154,7 +155,7 @@ def filter_packages_by_paths(repo: dict[str, Pkgbuild], paths: list[str], allow_
return result
def generate_dependency_chain(package_repo: dict[str, Pkgbuild], to_build: list[Pkgbuild]) -> list[set[Pkgbuild]]:
def generate_dependency_chain(package_repo: dict[str, Pkgbuild], to_build: Iterable[Pkgbuild]) -> list[set[Pkgbuild]]:
"""
This figures out all dependencies and their sub-dependencies for the selection and adds those packages to the selection.
First the top-level packages get selected by searching the paths.
@ -171,9 +172,11 @@ def generate_dependency_chain(package_repo: dict[str, Pkgbuild], to_build: list[
def join_levels(levels: list[set[Pkgbuild]]) -> dict[Pkgbuild, int]:
result = dict[Pkgbuild, int]()
for i, level in enumerate(levels):
result[level] = i
for pkg in level:
result[pkg] = i
return result
def get_dependencies(package: Pkgbuild, package_repo: dict[str, Pkgbuild] = package_repo) -> list[Pkgbuild]:
def get_dependencies(package: Pkgbuild, package_repo: dict[str, Pkgbuild] = package_repo) -> Iterator[Pkgbuild]:
for dep_name in package.depends:
if dep_name in visited_names:
continue
@ -182,7 +185,7 @@ def generate_dependency_chain(package_repo: dict[str, Pkgbuild], to_build: list[
visit(dep_pkg)
yield dep_pkg
def get_recursive_dependencies(package: Pkgbuild, package_repo: dict[str, Pkgbuild] = package_repo) -> list[Pkgbuild]:
def get_recursive_dependencies(package: Pkgbuild, package_repo: dict[str, Pkgbuild] = package_repo) -> Iterator[Pkgbuild]:
for pkg in get_dependencies(package, package_repo):
yield pkg
for sub_pkg in get_recursive_dependencies(pkg, package_repo):
@ -208,7 +211,7 @@ def generate_dependency_chain(package_repo: dict[str, Pkgbuild], to_build: list[
level = 0
# protect against dependency cycles
repeat_count = 0
_last_level: set[Pkgbuild] = None
_last_level: Optional[set[Pkgbuild]] = None
while dep_levels[level]:
level_copy = dep_levels[level].copy()
modified = False
@ -333,7 +336,7 @@ def check_package_version_built(package: Pkgbuild, arch: Arch) -> bool:
'--skippgpcheck',
'--packagelist',
]
result = native_chroot.run_cmd(
result: Any = native_chroot.run_cmd(
cmd,
capture_output=True,
)
@ -358,7 +361,7 @@ def setup_build_chroot(
extra_packages: list[str] = [],
add_kupfer_repos: bool = True,
clean_chroot: bool = False,
) -> Chroot:
) -> BuildChroot:
init_prebuilts(arch)
chroot = get_build_chroot(arch, add_kupfer_repos=add_kupfer_repos)
chroot.mount_packages()
@ -373,7 +376,7 @@ def setup_build_chroot(
return chroot
def setup_sources(package: Pkgbuild, chroot: Chroot, makepkg_conf_path='/etc/makepkg.conf', pkgbuilds_dir: str = None):
def setup_sources(package: Pkgbuild, chroot: BuildChroot, makepkg_conf_path='/etc/makepkg.conf', pkgbuilds_dir: str = None):
pkgbuilds_dir = pkgbuilds_dir if pkgbuilds_dir else CHROOT_PATHS['pkgbuilds']
makepkg_setup_args = [
'--config',
@ -386,6 +389,7 @@ def setup_sources(package: Pkgbuild, chroot: Chroot, makepkg_conf_path='/etc/mak
logging.info(f'Setting up sources for {package.path} in {chroot.name}')
result = chroot.run_cmd(MAKEPKG_CMD + makepkg_setup_args, cwd=os.path.join(CHROOT_PATHS['pkgbuilds'], package.path))
assert isinstance(result, subprocess.CompletedProcess)
if result.returncode != 0:
raise Exception(f'Failed to check sources for {package.path}')
@ -428,7 +432,9 @@ def build_package(
logging.info('Setting up dependencies for cross-compilation')
# include crossdirect for ccache symlinks and qemu-user
results = native_chroot.try_install_packages(package.depends + CROSSDIRECT_PKGS + [f"{GCC_HOSTSPECS[native_chroot.arch][arch]}-gcc"])
if results['crossdirect'].returncode != 0:
res_crossdirect = results['crossdirect']
assert isinstance(res_crossdirect, subprocess.CompletedProcess)
if res_crossdirect.returncode != 0:
raise Exception('Unable to install crossdirect')
# mount foreign arch chroot inside native chroot
chroot_relative = os.path.join(CHROOT_PATHS['chroots'], target_chroot.name)
@ -450,7 +456,7 @@ def build_package(
deps += ['ccache']
logging.debug(('Building for native arch. ' if not foreign_arch else '') + 'Skipping crossdirect.')
dep_install = target_chroot.try_install_packages(deps, allow_fail=False)
failed_deps = [name for name, res in dep_install.items() if res.returncode != 0]
failed_deps = [name for name, res in dep_install.items() if res.returncode != 0] # type: ignore[union-attr]
if failed_deps:
raise Exception(f'Dependencies failed to install: {failed_deps}')
@ -460,12 +466,12 @@ def build_package(
build_cmd = f'makepkg --config {makepkg_conf_absolute} --skippgpcheck --needed --noconfirm --ignorearch {" ".join(makepkg_compile_opts)}'
logging.debug(f'Building: Running {build_cmd}')
result = build_root.run_cmd(build_cmd, inner_env=env, cwd=os.path.join(CHROOT_PATHS['pkgbuilds'], package.path))
assert isinstance(result, subprocess.CompletedProcess)
if result.returncode != 0:
raise Exception(f'Failed to compile package {package.path}')
def get_unbuilt_package_levels(repo: dict[str, Pkgbuild], packages: list[Pkgbuild], arch: Arch, force: bool = False) -> list[set[Pkgbuild]]:
def get_unbuilt_package_levels(repo: dict[str, Pkgbuild], packages: Iterable[Pkgbuild], arch: Arch, force: bool = False) -> list[set[Pkgbuild]]:
package_levels = generate_dependency_chain(repo, packages)
build_names = set[str]()
build_levels = list[set[Pkgbuild]]()
@ -486,7 +492,7 @@ def get_unbuilt_package_levels(repo: dict[str, Pkgbuild], packages: list[Pkgbuil
def build_packages(
repo: dict[str, Pkgbuild],
packages: list[Pkgbuild],
packages: Iterable[Pkgbuild],
arch: Arch,
force: bool = False,
enable_crosscompile: bool = True,
@ -517,7 +523,7 @@ def build_packages(
def build_packages_by_paths(
paths: list[str],
paths: Iterable[str],
arch: Arch,
repo: dict[str, Pkgbuild],
force=False,
@ -595,10 +601,9 @@ def cmd_build(paths: list[str], force=False, arch=None):
build(paths, force, arch)
def build(paths: list[str], force: bool, arch: Arch):
if arch is None:
# TODO: arch = config.get_profile()...
arch = 'aarch64'
def build(paths: Iterable[str], force: bool, arch: Optional[Arch]):
# TODO: arch = config.get_profile()...
arch = arch or 'aarch64'
if arch not in ARCHES:
raise Exception(f'Unknown architecture "{arch}". Choices: {", ".join(ARCHES)}')
@ -622,7 +627,7 @@ def build(paths: list[str], force: bool, arch: Arch):
@cmd_packages.command(name='sideload')
@click.argument('paths', nargs=-1)
def cmd_sideload(paths: list[str]):
def cmd_sideload(paths: Iterable[str]):
"""Build packages, copy to the device via SSH and install them"""
files = build(paths, True, None)
scp_put_files(files, '/tmp')
@ -641,7 +646,7 @@ def cmd_sideload(paths: list[str]):
@click.option('-f', '--force', is_flag=True, default=False, help="Don't prompt for confirmation")
@click.option('-n', '--noop', is_flag=True, default=False, help="Print what would be removed but dont execute")
@click.argument('what', type=click.Choice(['all', 'src', 'pkg']), nargs=-1)
def cmd_clean(what: list[str] = ['all'], force: bool = False, noop: bool = False):
def cmd_clean(what: Iterable[str] = ['all'], force: bool = False, noop: bool = False):
"""Remove files and directories not tracked in PKGBUILDs.git"""
enforce_wrap()
if noop:

View file

@ -1,13 +1,11 @@
from shutil import which
import atexit
import subprocess
import logging
from os import PathLike
import subprocess
from shutil import which
from typing import Optional, Union, Sequence
def programs_available(programs) -> bool:
def programs_available(programs: Union[str, Sequence[str]]) -> bool:
if type(programs) is str:
programs = [programs]
for program in programs:
@ -16,7 +14,7 @@ def programs_available(programs) -> bool:
return True
def umount(dest: PathLike, lazy=False):
def umount(dest: str, lazy=False):
return subprocess.run(
[
'umount',
@ -27,7 +25,7 @@ def umount(dest: PathLike, lazy=False):
)
def mount(src: PathLike, dest: PathLike, options=['bind'], fs_type=None, register_unmount=True) -> subprocess.CompletedProcess:
def mount(src: str, dest: str, options: list[str] = ['bind'], fs_type: Optional[str] = None, register_unmount=True) -> subprocess.CompletedProcess:
opts = []
for opt in options:
opts += ['-o', opt]
@ -47,7 +45,7 @@ def mount(src: PathLike, dest: PathLike, options=['bind'], fs_type=None, registe
return result
def check_findmnt(path: PathLike):
def check_findmnt(path: str):
result = subprocess.run(
[
'findmnt',