Compare commits


44 commits

Author SHA1 Message Date
InsanePrawn 0e103f5a40 add_package_to_repo: create foreign arch repo dir before copying foreign-arch packages 2022-09-16 19:45:25 +02:00
InsanePrawn f9cf76e937 packages: check_package_built(): makedir() the other arch's repo dir before trying to copy our any-arch package there 2022-09-16 19:45:25 +02:00
InsanePrawn 4c3e264de3 wrapper/docker: create volume dirs ourselfes for better permissions and podman compat 2022-09-16 19:45:25 +02:00
InsanePrawn ccec875a0c wrapper/docker: fix indentation (only version needs to be pulled from with open():) 2022-09-16 19:45:25 +02:00
InsanePrawn 156612bf73 dataclass.resolve_type_hint(): add conversion from str to [int,float] if str not in types 2022-09-16 19:45:25 +02:00
InsanePrawn 4833753975 config/scheme: move DataClass to dataclass.py 2022-09-16 19:45:25 +02:00
InsanePrawn b86b7c94f0 config: DataClass.transform(): add allow_extra=False parameter 2022-09-16 19:45:25 +02:00
InsanePrawn e00160f6df packages: move filter_packages() to pkgbuild, rename to filter_pkgbuilds() 2022-09-16 19:45:25 +02:00
InsanePrawn d89ad54fc5 constants.py: remove DEVICES array, now comes from pkgbuilds.git 2022-09-16 19:45:25 +02:00
InsanePrawn cc2e24285f image.py: use Device instead of the device name from config 2022-09-16 19:45:25 +02:00
InsanePrawn 86b4b30685 exec: makedir() accept Union[str, int] for user and group 2022-08-29 20:20:16 +02:00
InsanePrawn 6c26260001 chroot: add chroot.get_uid(user: str), use in chroot.mount_{ccache,rust} to apply correct ownership 2022-08-29 20:20:16 +02:00
InsanePrawn ba58aa1a29 wrapper: add WRAPPER_PATHS to point ccache and rust to predictable locations 2022-08-29 20:20:16 +02:00
InsanePrawn 0378f7fdf6 requirements.txt: add setuptools required by munch 2022-08-29 19:05:02 +02:00
InsanePrawn 58fd212560 Dockerfile: clean up pkgconfig-aarch64 leftover 2022-08-29 04:54:05 +02:00
InsanePrawn 2ef5f27c6c config: introduce rust cache 2022-08-29 04:44:16 +02:00
InsanePrawn 4285cf734c config: introduce per-arch persisted ccache dir 2022-08-29 04:44:16 +02:00
InsanePrawn fd1f759429 chroot: add chroot.mount_chroots() to mount /chroot and use in cmd_chroot() 2022-08-29 04:44:16 +02:00
InsanePrawn 8274a31068 pkgbuild.discover_pkgbuilds(): warn and skip directories that don't contain a PKGBUILD 2022-08-29 04:44:16 +02:00
InsanePrawn 3c2e6fe2d0 packages and image: wrap more upfront on missing binaries 2022-08-28 17:21:16 +02:00
InsanePrawn a76ad5ac4b packages.filter_packages(): only filter by arch if arch is not None 2022-08-28 17:20:35 +02:00
InsanePrawn 9f1281f1cb wrapper_su_helper.py: use su -P to allocate a pseudo-TTY 2022-08-28 07:38:52 +02:00
InsanePrawn 57be536781 packages.cmd_sideload(): fix escape of --overwrite=* 2022-08-28 07:32:01 +02:00
InsanePrawn b5214d9cd6 packages: respect package arches before and during building 2022-08-28 07:32:01 +02:00
InsanePrawn 39b98d30ae chroot.create_user(): add primary_group parameter 2022-08-28 07:32:01 +02:00
InsanePrawn 7b05fa4fdb packages.check_package_version_built(): use Pkgbuild.get_filename() instead of running makepkg --packagelist 2022-08-28 07:32:01 +02:00
InsanePrawn 2f98ffc79d pkgbuild: add get_filename(arch) 2022-08-28 05:49:21 +02:00
InsanePrawn ea88397f1f packages.filter_packages(): optionally check package arch 2022-08-28 05:49:18 +02:00
InsanePrawn d9a88e1474 packages: use user 'kupfer' in chroots for building 2022-08-28 05:49:18 +02:00
InsanePrawn dcccc9bdc8 chroot: add chroot.add_sudo_config() 2022-08-28 05:49:18 +02:00
InsanePrawn fc92298100 chroot.create_user(): add optional uid and non_unique parameter 2022-08-28 02:22:54 +02:00
InsanePrawn 20975feec6 chroot.run_cmd(): add switch_user parameter 2022-08-28 02:22:54 +02:00
InsanePrawn 4dc134c8f8 exec/cmd: generate_cmd_{su,elevated}: tolerate flat string as input for cmd instead of list 2022-08-28 02:22:54 +02:00
InsanePrawn ac7d16e4a7 exec.file.write_file(): fix situation where file exists but stat fails due to permissions 2022-08-28 02:22:54 +02:00
InsanePrawn bef0efc637 global: refactor to use config.{file,runtime}.$member instead of config.file["$member"] 2022-08-27 17:06:48 +02:00
InsanePrawn 13ad63446e DockerWrapper.wrap(): run as config.runtime.uid instead of root 2022-08-27 06:03:36 +02:00
InsanePrawn 6b64989a3b config: add config.runtime.uid 2022-08-27 05:56:45 +02:00
InsanePrawn 4c77a16bba main: add -w to *enforce* wrapping 2022-08-27 05:56:45 +02:00
InsanePrawn 57d5ed474f typecheck.sh: show error codes 2022-08-27 03:46:07 +02:00
InsanePrawn 114755888e packages: circumvent git dubious ownership errors in pkgbuilds.git due to chrootery 2022-08-27 03:45:19 +02:00
InsanePrawn b154f835e6 constants: add QEMU_ARCHES 2022-08-26 22:55:08 +02:00
InsanePrawn bc31f9822a constants.py: add armv7h support 2022-08-26 22:55:08 +02:00
InsanePrawn 08fc10bf11 Pkgbuild: add refresh_sources() 2022-08-26 03:41:59 +02:00
InsanePrawn 6e8fd9f622 packages/pkgbuild: cache parsed pkgbuilds by path, add get_pkgbuild_by_path(), Pkgbuild.update(pkgb) 2022-08-26 03:30:32 +02:00
32 changed files with 710 additions and 419 deletions

@@ -2,22 +2,16 @@ FROM archlinux:base-devel
 RUN pacman-key --init && \
     pacman -Sy --noconfirm archlinux-keyring && \
-    pacman -Su --noconfirm \
+    pacman -Su --noconfirm --needed \
         python python-pip \
         arch-install-scripts rsync \
         aarch64-linux-gnu-gcc aarch64-linux-gnu-binutils aarch64-linux-gnu-glibc aarch64-linux-gnu-linux-api-headers \
-        git \
+        git sudo \
         android-tools openssh inetutils \
         parted
 RUN sed -i "s/EUID == 0/EUID == -1/g" $(which makepkg)
-RUN cd /tmp && \
-    git clone https://aur.archlinux.org/aarch64-linux-gnu-pkg-config.git && \
-    cd aarch64-linux-gnu-pkg-config && \
-    makepkg -s --skippgpcheck && \
-    pacman -U --noconfirm *.pkg*
 RUN yes | pacman -Scc
 RUN sed -i "s/SigLevel.*/SigLevel = Never/g" /etc/pacman.conf
@@ -32,5 +26,7 @@ RUN pip install -r requirements.txt
 COPY . .
 RUN python -c "from distro import distro; distro.get_kupfer_local(arch=None,in_chroot=False).repos_config_snippet()" | tee -a /etc/pacman.conf
+RUN useradd -m -g users kupfer
+RUN echo "kupfer ALL=(ALL) NOPASSWD: ALL" | tee /etc/sudoers.d/kupfer
 WORKDIR /

@@ -3,6 +3,7 @@
 import os
 import logging

+from constants import Arch, QEMU_ARCHES
 from exec.cmd import run_root_cmd
 from utils import mount
@@ -38,11 +39,15 @@ def binfmt_info():
     return full


-def is_registered(arch: str) -> bool:
-    return os.path.exists("/proc/sys/fs/binfmt_misc/qemu-" + arch)
+def is_registered(arch: Arch) -> bool:
+    qemu_arch = QEMU_ARCHES[arch]
+    return os.path.exists("/proc/sys/fs/binfmt_misc/qemu-" + qemu_arch)


-def register(arch):
+def register(arch: Arch):
+    if arch not in QEMU_ARCHES:
+        raise Exception(f'binfmt.register(): unknown arch {arch} (not in QEMU_ARCHES)')
+    qemu_arch = QEMU_ARCHES[arch]
     if is_registered(arch):
         return
@@ -51,7 +56,7 @@ def register(arch):
     # Build registration string
     # https://en.wikipedia.org/wiki/Binfmt_misc
     # :name:type:offset:magic:mask:interpreter:flags
-    info = lines[arch]
+    info = lines[qemu_arch]
     code = info['line']
     binfmt = '/proc/sys/fs/binfmt_misc'
     register = binfmt + '/register'
@@ -70,7 +75,10 @@ def register(arch):
 def unregister(arch):
-    binfmt_file = "/proc/sys/fs/binfmt_misc/qemu-" + arch
+    if arch not in QEMU_ARCHES:
+        raise Exception(f'binfmt.unregister(): unknown arch {arch} (not in QEMU_ARCHES)')
+    qemu_arch = QEMU_ARCHES[arch]
+    binfmt_file = "/proc/sys/fs/binfmt_misc/qemu-" + qemu_arch
     if not os.path.exists(binfmt_file):
         return
     logging.info(f"Unregistering qemu binfmt ({arch})")

@@ -6,7 +6,8 @@ from config import config
 from constants import BOOT_STRATEGIES, FLASH_PARTS, FASTBOOT, JUMPDRIVE, JUMPDRIVE_VERSION
 from exec.file import makedir
 from fastboot import fastboot_boot, fastboot_erase_dtbo
-from image import get_device_and_flavour, losetup_rootfs_image, get_image_path, dump_aboot, dump_lk2nd
+from image import get_flavour, get_device_name, losetup_rootfs_image, get_image_path, dump_aboot, dump_lk2nd
+from packages.device import get_profile_device
 from wrapper import enforce_wrap

 LK2ND = FLASH_PARTS['LK2ND']
@@ -20,7 +21,8 @@ TYPES = [LK2ND, JUMPDRIVE, ABOOT]
 def cmd_boot(type):
     """Boot JumpDrive or the Kupfer aboot image. Erases Android DTBO in the process."""
     enforce_wrap()
-    device, flavour = get_device_and_flavour()
+    device = get_profile_device()
+    flavour = get_flavour()
     # TODO: parse arch and sector size
     sector_size = 4096
     image_path = get_image_path(device, flavour)
@@ -28,7 +30,7 @@ def cmd_boot(type):
     if strategy == FASTBOOT:
         if type == JUMPDRIVE:
-            file = f'boot-{device}.img'
+            file = f'boot-{get_device_name(device)}.img'
             path = os.path.join(config.get_path('jumpdrive'), file)
             makedir(os.path.dirname(path))
             if not os.path.exists(path):

@@ -50,11 +50,14 @@ def cmd_chroot(type: str = 'build', arch: str = None, enable_crossdirect=True):
         build_chroot.initialize()
         build_chroot.initialized = True
         build_chroot.mount_pkgbuilds()
-        if config.file['build']['crossdirect'] and enable_crossdirect:
+        build_chroot.mount_chroots()
+        assert arch and config.runtime.arch
+        if config.file.build.crossdirect and enable_crossdirect and arch != config.runtime.arch:
             build_chroot.mount_crossdirect()
     else:
         raise Exception('Really weird bug')
+    chroot.mount_packages()
     chroot.activate()
     logging.debug(f'Starting shell in {chroot.name}:')
     chroot.run_cmd('bash', attach_tty=True)

@@ -10,7 +10,7 @@ from uuid import uuid4
 from config import config
 from constants import Arch, CHROOT_PATHS, GCC_HOSTSPECS
 from distro.distro import get_base_distro, get_kupfer_local, RepoInfo
-from exec.cmd import run_root_cmd, generate_env_cmd, flatten_shell_script, wrap_in_bash
+from exec.cmd import run_root_cmd, generate_env_cmd, flatten_shell_script, wrap_in_bash, generate_cmd_su
 from exec.file import makedir, root_makedir, root_write_file, write_file
 from generator import generate_makepkg_conf
 from utils import mount, umount, check_findmnt, log_or_exception
@@ -223,12 +223,14 @@ class Chroot(AbstractChroot):
         cwd: Optional[str] = None,
         fail_inactive: bool = True,
         stdout: Optional[int] = None,
+        switch_user: Optional[str] = None,
     ) -> Union[int, subprocess.CompletedProcess]:
         if not self.active and fail_inactive:
             raise Exception(f'Chroot {self.name} is inactive, not running command! Hint: pass `fail_inactive=False`')
         if outer_env is None:
             outer_env = {}
-        native = config.runtime['arch']
+        native = config.runtime.arch
+        assert native
         if self.arch != native and 'QEMU_LD_PREFIX' not in outer_env:
             outer_env = dict(outer_env)  # copy dict for modification
             outer_env |= {'QEMU_LD_PREFIX': f'/usr/{GCC_HOSTSPECS[native][self.arch]}'}
@@ -238,7 +240,11 @@ class Chroot(AbstractChroot):
         script = flatten_shell_script(script, shell_quote_items=False, wrap_in_shell_quote=False)
         if cwd:
             script = f"cd {shell_quote(cwd)} && ( {script} )"
-        cmd = flatten_shell_script(['chroot', self.path] + env_cmd + wrap_in_bash(script, flatten_result=False), shell_quote_items=True)
+        if switch_user:
+            inner_cmd = generate_cmd_su(script, switch_user=switch_user, elevation_method='none', force_su=True)
+        else:
+            inner_cmd = wrap_in_bash(script, flatten_result=False)
+        cmd = flatten_shell_script(['chroot', self.path] + env_cmd + inner_cmd, shell_quote_items=True)
         return run_root_cmd(cmd, env=outer_env, attach_tty=attach_tty, capture_output=capture_output, stdout=stdout)
@@ -267,6 +273,13 @@ class Chroot(AbstractChroot):
             fail_if_mounted=fail_if_mounted,
         )

+    def mount_chroots(self, fail_if_mounted: bool = False) -> str:
+        return self.mount(
+            absolute_source=config.get_path('chroots'),
+            relative_destination=CHROOT_PATHS['chroots'].lstrip('/'),
+            fail_if_mounted=fail_if_mounted,
+        )
+
     def write_makepkg_conf(self, target_arch: Arch, cross_chroot_relative: Optional[str], cross: bool = True) -> str:
         """
         Generate a `makepkg.conf` or `makepkg_cross_$arch.conf` file in /etc.
@@ -285,7 +298,7 @@ class Chroot(AbstractChroot):
         user = None
         group = None
         if check_space is None:
-            check_space = config.file['pacman']['check_space']
+            check_space = config.file.pacman.check_space
         if not absolute_path:
             path = self.get_path('/etc')
             root_makedir(path)
@@ -305,26 +318,53 @@ class Chroot(AbstractChroot):
     def create_user(
         self,
-        user='kupfer',
-        password='123456',
-        groups=['network', 'video', 'audio', 'optical', 'storage', 'input', 'scanner', 'games', 'lp', 'rfkill', 'wheel'],
+        user: str = 'kupfer',
+        password: Optional[str] = None,
+        groups: list[str] = ['network', 'video', 'audio', 'optical', 'storage', 'input', 'scanner', 'games', 'lp', 'rfkill', 'wheel'],
+        primary_group: Optional[str] = 'users',
+        uid: Optional[int] = None,
+        non_unique: bool = False,
     ):
         user = user or 'kupfer'
+        uid_param = f'-u {uid}' if uid is not None else ''
+        unique_param = '--non-unique' if non_unique else ''
+        pgroup_param = f'-g {primary_group}' if primary_group else ''
         install_script = f'''
             set -e
             if ! id -u "{user}" >/dev/null 2>&1; then
-                useradd -m {user}
+                useradd -m {unique_param} {uid_param} {pgroup_param} {user}
             fi
-            usermod -a -G {",".join(groups)} {user}
-            chown {user}:{user} /home/{user} -R
+            usermod -a -G {",".join(groups)} {unique_param} {uid_param} {pgroup_param} {user}
+            chown {user}:{primary_group if primary_group else user} /home/{user} -R
         '''
         if password:
             install_script += f'echo "{user}:{password}" | chpasswd'
         else:
             install_script += f'echo "Set user password:" && passwd {user}'
         result = self.run_cmd(install_script)
+        assert isinstance(result, subprocess.CompletedProcess)
         if result.returncode != 0:
-            raise Exception('Failed to setup user')
+            raise Exception(f'Failed to setup user {user} in self.name')
+
+    def get_uid(self, user: Union[str, int]) -> int:
+        if isinstance(user, int):
+            return user
+        if user == 'root':
+            return 0
+        res = self.run_cmd(['id', '-u', user], capture_output=True)
+        assert isinstance(res, subprocess.CompletedProcess)
+        if res.returncode or not res.stdout:
+            raise Exception(f"chroot {self.name}: Couldnt detect uid for user {user}: {repr(res.stdout)}")
+        uid = res.stdout.decode()
+        return int(uid)
+
+    def add_sudo_config(self, config_name: str = 'wheel', privilegee: str = '%wheel', password_required: bool = True):
+        if '.' in config_name:
+            raise Exception(f"won't create sudoers.d file {config_name} since it will be ignored by sudo because it contains a dot!")
+        comment = ('# allow ' + (f'members of group {privilegee.strip("%")}' if privilegee.startswith('%') else f'user {privilegee}') +
+                   'to run any program as root' + ('' if password_required else ' without a password'))
+        line = privilegee + (' ALL=(ALL:ALL) ALL' if password_required else ' ALL=(ALL) NOPASSWD: ALL')
+        root_write_file(self.get_path(f'/etc/sudoers.d/{config_name}'), f'{comment}\n{line}')
+
     def try_install_packages(
         self,
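A sketch of how run_cmd() now assembles the final command line when switch_user is passed. The helpers here are simplified stand-ins for exec.cmd's wrap_in_bash() and generate_cmd_su(), so details (su flags, quoting) differ in the real code:

from shlex import quote as shell_quote

def wrap_in_bash_sketch(script: str) -> list[str]:
    return ['/bin/bash', '-c', script]

def generate_cmd_su_sketch(script: str, switch_user: str) -> list[str]:
    # the real helper honors elevation_method='none' and force_su=True
    return ['su', switch_user, '-c', script]

def chroot_cmd_sketch(chroot_path: str, script: str, switch_user=None) -> str:
    inner = generate_cmd_su_sketch(script, switch_user) if switch_user else wrap_in_bash_sketch(script)
    return ' '.join(shell_quote(part) for part in ['chroot', chroot_path] + inner)

print(chroot_cmd_sketch('/chroot/build_aarch64', 'whoami', switch_user='kupfer'))
# chroot /chroot/build_aarch64 su kupfer -c whoami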

@@ -69,7 +69,8 @@ class BuildChroot(Chroot):
         """
         target_arch = self.arch
         if not native_chroot:
-            native_chroot = get_build_chroot(config.runtime['arch'])
+            assert config.runtime.arch
+            native_chroot = get_build_chroot(config.runtime.arch)
         host_arch = native_chroot.arch
         hostspec = GCC_HOSTSPECS[host_arch][target_arch]
         cc = f'{hostspec}-cc'
@@ -131,6 +132,32 @@ class BuildChroot(Chroot):
             fail_if_mounted=fail_if_mounted,
         )

+    def mount_ccache(self, user: str = 'kupfer', fail_if_mounted: bool = False):
+        mount_source = os.path.join(config.file.paths.ccache, self.arch)
+        mount_dest = os.path.join(f'/home/{user}' if user != 'root' else '/root', '.ccache')
+        uid = self.get_uid(user)
+        makedir(mount_source, user=uid)
+        return self.mount(
+            absolute_source=mount_source,
+            relative_destination=mount_dest,
+            fail_if_mounted=fail_if_mounted,
+        )
+
+    def mount_rust(self, user: str = 'kupfer', fail_if_mounted: bool = False) -> list[str]:
+        results = []
+        uid = self.get_uid(user)
+        mount_source_base = config.file.paths.rust  # apparently arch-agnostic
+        for rust_dir in ['cargo', 'rustup']:
+            mount_source = os.path.join(mount_source_base, rust_dir)
+            mount_dest = os.path.join(f'/home/{user}' if user != 'root' else '/root', f'.{rust_dir}')
+            makedir(mount_source, user=uid)
+            results.append(self.mount(
+                absolute_source=mount_source,
+                relative_destination=mount_dest,
+                fail_if_mounted=fail_if_mounted,
+            ))
+        return results
+

 def get_build_chroot(arch: Arch, add_kupfer_repos: bool = True, **kwargs) -> BuildChroot:
     name = build_chroot_name(arch)
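The mount helpers above all compute a host-side source plus a chroot-relative destination. A standalone sketch of mount_ccache()'s path logic (the base path is illustrative, not the configured default):

import os

def ccache_mount_paths(ccache_base: str, arch: str, user: str = 'kupfer') -> tuple:
    # one cache dir per target arch on the host ...
    source = os.path.join(ccache_base, arch)
    # ... bind-mounted to the build user's ~/.ccache inside the chroot
    dest = os.path.join(f'/home/{user}' if user != 'root' else '/root', '.ccache')
    return source, dest

print(ccache_mount_paths('/var/cache/kupfer/ccache', 'aarch64'))
# ('/var/cache/kupfer/ccache/aarch64', '/home/kupfer/.ccache')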

@@ -66,8 +66,8 @@ def prompt_profile(name: str, create: bool = True, defaults: Union[Profile, dict
     profile: Any = PROFILE_EMPTY | defaults
     # don't use get_profile() here because we need the sparse profile
-    if name in config.file['profiles']:
-        profile |= config.file['profiles'][name]
+    if name in config.file.profiles:
+        profile |= config.file.profiles[name]
     elif create:
         logging.info(f"Profile {name} doesn't exist yet, creating new profile.")
     else:
@@ -113,7 +113,7 @@ def prompt_for_save(retry_ctx: Optional[click.Context] = None):
     If `retry_ctx` is passed, the context's command will be reexecuted with the same arguments if the user chooses to retry.
     False will still be returned as the retry is expected to either save, perform another retry or arbort.
     """
-    if click.confirm(f'Do you want to save your changes to {config.runtime["config_file"]}?', default=True):
+    if click.confirm(f'Do you want to save your changes to {config.runtime.config_file}?', default=True):
         return True
     if retry_ctx:
         if click.confirm('Retry? ("n" to quit without saving)', default=True):
@@ -171,7 +171,7 @@ def cmd_config_init(ctx, sections: list[str] = CONFIG_SECTIONS, non_interactive:
     config.update(results)
     if 'profiles' in sections:
-        current_profile = 'default' if 'current' not in config.file['profiles'] else config.file['profiles']['current']
+        current_profile = 'default' if 'current' not in config.file.profiles else config.file.profiles.current
         new_current, _ = prompt_config('profile.current', default=current_profile, field_type=str)
         profile, changed = prompt_profile(new_current, create=True)
         config.update_profile(new_current, profile)
@@ -182,7 +182,7 @@ def cmd_config_init(ctx, sections: list[str] = CONFIG_SECTIONS, non_interactive:
     if not noop:
         config.write()
     else:
-        logging.info(f'--noop passed, not writing to {config.runtime["config_file"]}!')
+        logging.info(f'--noop passed, not writing to {config.runtime.config_file}!')


 @cmd_config.command(name='set')
@@ -250,8 +250,8 @@ def cmd_profile():
 def cmd_profile_init(ctx, name: str, non_interactive: bool = False, noop: bool = False):
     """Create or edit a profile"""
     profile = deepcopy(PROFILE_EMPTY)
-    if name in config.file['profiles']:
-        profile |= config.file['profiles'][name]
+    if name in config.file.profiles:
+        profile |= config.file.profiles[name]
     if not non_interactive:
         profile, _changed = prompt_profile(name, create=True)
@@ -262,4 +262,4 @@ def cmd_profile_init(ctx, name: str, non_interactive: bool = False, noop: bool =
             return
         config.write()
     else:
-        logging.info(f'--noop passed, not writing to {config.runtime["config_file"]}!')
+        logging.info(f'--noop passed, not writing to {config.runtime.config_file}!')

@@ -1,81 +1,12 @@
 from __future__ import annotations

-from dataclasses import dataclass
-from typing import Optional, Union, Mapping, Any, get_type_hints, get_origin, get_args, Iterable
 from munch import Munch
+from typing import Any, Optional, Mapping, Union

+from dataclass import DataClass, munchclass
 from constants import Arch


-def munchclass(*args, init=False, **kwargs):
-    return dataclass(*args, init=init, slots=True, **kwargs)
-
-
-def resolve_type_hint(hint: type):
-    origin = get_origin(hint)
-    args: Iterable[type] = get_args(hint)
-    if origin is Optional:
-        args = set(list(args) + [type(None)])
-    if origin in [Union, Optional]:
-        results = []
-        for arg in args:
-            results += resolve_type_hint(arg)
-        return results
-    return [origin or hint]
-
-
-class DataClass(Munch):
-
-    def __init__(self, d: dict = {}, validate: bool = True, **kwargs):
-        self.update(d | kwargs, validate=validate)
-
-    @classmethod
-    def transform(cls, values: Mapping[str, Any], validate: bool = True) -> Any:
-        results = {}
-        values = dict(values)
-        for key in list(values.keys()):
-            value = values.pop(key)
-            type_hints = cls._type_hints
-            if key in type_hints:
-                _classes = tuple(resolve_type_hint(type_hints[key]))
-                if issubclass(_classes[0], dict):
-                    assert isinstance(value, dict)
-                    target_class = _classes[0]
-                    if not issubclass(_classes[0], Munch):
-                        target_class = DataClass
-                    if not isinstance(value, target_class):
-                        value = target_class.fromDict(value, validate=validate)
-                if validate:
-                    if not isinstance(value, _classes):
-                        raise Exception(f'key "{key}" has value of wrong type {_classes}: {value}')
-            elif validate:
-                raise Exception(f'Unknown key "{key}"')
-            else:
-                if isinstance(value, dict) and not isinstance(value, Munch):
-                    value = Munch.fromDict(value)
-            results[key] = value
-        if values:
-            if validate:
-                raise Exception(f'values contained unknown keys: {list(values.keys())}')
-            results |= values
-        return results
-
-    @classmethod
-    def fromDict(cls, values: Mapping[str, Any], validate: bool = True):
-        return cls(**cls.transform(values, validate))
-
-    def update(self, d: Mapping[str, Any], validate: bool = True):
-        Munch.update(self, type(self).transform(d, validate))
-
-    def __init_subclass__(cls):
-        super().__init_subclass__()
-        cls._type_hints = get_type_hints(cls)
-
-    def __repr__(self):
-        return f'{type(self)}{dict.__repr__(self.toDict())}'
-
-
 @munchclass()
 class SparseProfile(DataClass):
     parent: Optional[str]
@@ -141,6 +72,8 @@ class PathsSection(DataClass):
     pkgbuilds: str
     jumpdrive: str
     images: str
+    ccache: str
+    rust: str


 class ProfilesSection(DataClass):
@@ -148,12 +81,14 @@
     default: SparseProfile

     @classmethod
-    def transform(cls, values: Mapping[str, Any], validate: bool = True):
+    def transform(cls, values: Mapping[str, Any], validate: bool = True, allow_extra: bool = True):
         results = {}
         for k, v in values.items():
             if k == 'current':
                 results[k] = v
                 continue
+            if not allow_extra and k != 'default':
+                raise Exception(f'Unknown key {k} in profiles section (Hint: extra_keys not allowed for some reason)')
             if not isinstance(v, dict):
                 raise Exception(f'profile {v} is not a dict!')
             results[k] = SparseProfile.fromDict(v, validate=True)
@@ -176,7 +111,13 @@ class Config(DataClass):
     profiles: ProfilesSection

     @classmethod
-    def fromDict(cls, values: Mapping[str, Any], validate: bool = True, allow_incomplete: bool = False):
+    def fromDict(
+        cls,
+        values: Mapping[str, Any],
+        validate: bool = True,
+        allow_extra: bool = False,
+        allow_incomplete: bool = False,
+    ):
         values = dict(values)  # copy for later modification
         _vals = {}
         for name, _class in cls._type_hints.items():
@@ -200,11 +141,12 @@
 @munchclass()
 class RuntimeConfiguration(DataClass):
     verbose: bool
-    config_file: Optional[str]
-    arch: Optional[Arch]
     no_wrap: bool
-    script_source_dir: str
     error_shell: bool
+    config_file: Optional[str]
+    script_source_dir: Optional[str]
+    arch: Optional[Arch]
+    uid: Optional[int]


 class ConfigLoadState(DataClass):

@@ -42,6 +42,8 @@ CONFIG_DEFAULTS_DICT = {
         'pkgbuilds': os.path.join('%cache_dir%', 'pkgbuilds'),
         'jumpdrive': os.path.join('%cache_dir%', 'jumpdrive'),
         'images': os.path.join('%cache_dir%', 'images'),
+        'ccache': os.path.join('%cache_dir%', 'ccache'),
+        'rust': os.path.join('%cache_dir%', 'rust'),
     },
     'profiles': {
         'current': 'default',
@@ -53,11 +55,12 @@ CONFIG_SECTIONS = list(CONFIG_DEFAULTS.keys())
 CONFIG_RUNTIME_DEFAULTS: RuntimeConfiguration = RuntimeConfiguration.fromDict({
     'verbose': False,
-    'config_file': None,
-    'arch': None,
     'no_wrap': False,
-    'script_source_dir': os.path.dirname(os.path.dirname(os.path.realpath(__file__))),
     'error_shell': False,
+    'config_file': None,
+    'script_source_dir': None,
+    'arch': None,
+    'uid': None,
 })
@@ -194,14 +197,16 @@ class ConfigStateHolder:
         self.file = Config.fromDict(merge_configs(conf_new=file_conf_base, conf_base=CONFIG_DEFAULTS))
         self.file_state = ConfigLoadState()
         self.runtime = RuntimeConfiguration.fromDict(CONFIG_RUNTIME_DEFAULTS | runtime_conf)
-        self.runtime['arch'] = os.uname().machine
+        self.runtime.arch = os.uname().machine
+        self.runtime.script_source_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
+        self.runtime.uid = os.getuid()
         self._profile_cache = {}
         if file_conf_path:
             self.try_load_file(file_conf_path)

     def try_load_file(self, config_file=None, base=CONFIG_DEFAULTS):
         config_file = config_file or CONFIG_DEFAULT_PATH
-        self.runtime['config_file'] = config_file
+        self.runtime.config_file = config_file
         self._profile_cache = None
         try:
             self.file = parse_file(config_file=config_file, base=base)
@@ -224,8 +229,8 @@ class ConfigStateHolder:
         raise ex

     def get_profile(self, name: Optional[str] = None) -> Profile:
-        name = name or self.file['profiles']['current']
-        self._profile_cache = resolve_profile(name=name, sparse_profiles=self.file['profiles'], resolved=self._profile_cache)
+        name = name or self.file.profiles.current
+        self._profile_cache = resolve_profile(name=name, sparse_profiles=self.file.profiles, resolved=self._profile_cache)
         return self._profile_cache[name]

     def enforce_profile_device_set(self, profile_name: Optional[str] = None, hint_or_set_arch: bool = False) -> Profile:
@@ -252,7 +257,7 @@ class ConfigStateHolder:
         return profile

     def get_path(self, path_name: str) -> str:
-        paths = self.file['paths']
+        paths = self.file.paths
         return resolve_path_template(paths[path_name], paths)

     def get_package_dir(self, arch: str):
@@ -265,7 +270,8 @@ class ConfigStateHolder:
     def write(self, path=None):
         """write toml representation of `self.file` to `path`"""
         if path is None:
-            path = self.runtime['config_file']
+            path = self.runtime.config_file
+        assert path
         os.makedirs(os.path.dirname(path), exist_ok=True)
         dump_file(path, self.file)
         logging.info(f'Created config file at {path}')
@@ -279,18 +285,18 @@ class ConfigStateHolder:
         merged = merge_configs(config_fragment, conf_base=self.file, warn_missing_defaultprofile=warn_missing_defaultprofile)
         changed = self.file != merged
         self.file.update(merged)
-        if changed and 'profiles' in config_fragment and self.file['profiles'] != config_fragment['profiles']:
+        if changed and 'profiles' in config_fragment and self.file.profiles != config_fragment['profiles']:
             self.invalidate_profile_cache()
         return changed

     def update_profile(self, name: str, profile: Profile, merge: bool = False, create: bool = True, prune: bool = True):
         new = {}
-        if name not in self.file['profiles']:
+        if name not in self.file.profiles:
             if not create:
                 raise Exception(f'Unknown profile: {name}')
         else:
             if merge:
-                new = deepcopy(self.file['profiles'][name])
+                new = deepcopy(self.file.profiles[name])

         logging.debug(f'new: {new}')
         logging.debug(f'profile: {profile}')
@@ -298,5 +304,5 @@ class ConfigStateHolder:
         if prune:
             new = {key: val for key, val in new.items() if val is not None}
-        self.file['profiles'][name] = new
+        self.file.profiles[name] = new
         self.invalidate_profile_cache()
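The config.file['x']['y'] → config.file.x.y rewrite that runs through this whole PR works because the config objects are Munch instances (and DataClass subclasses built on Munch), which expose dict keys as attributes — both spellings stay valid. A runnable illustration (requires the munch package, already a dependency here):

from munch import Munch

conf = Munch.fromDict({'pacman': {'parallel_downloads': 8, 'check_space': True}})
# attribute access and item access hit the same underlying data
assert conf.pacman.parallel_downloads == conf['pacman']['parallel_downloads'] == 8
conf.pacman.check_space = False
assert conf['pacman']['check_space'] is False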

@@ -53,7 +53,7 @@ def validate_ConfigStateHolder(c: ConfigStateHolder, should_load: Optional[bool]
 def test_fixture_configstate(conf_fixture: str, exists: bool, request):
     configstate = request.getfixturevalue(conf_fixture)
     assert 'config_file' in configstate.runtime
-    confpath = configstate.runtime['config_file']
+    confpath = configstate.runtime.config_file
     assert isinstance(confpath, str)
     assert confpath
     assert exists == os.path.exists(confpath)
@@ -124,12 +124,13 @@ def load_toml_file(path) -> dict:
 def get_path_from_stateholder(c: ConfigStateHolder):
-    return c.runtime['config_file']
+    return c.runtime.config_file


 def test_config_save_nonexistant(configstate_nonexistant: ConfigStateHolder):
     c = configstate_nonexistant
-    confpath = c.runtime['config_file']
+    confpath = c.runtime.config_file
+    assert confpath
     assert not os.path.exists(confpath)
     c.write()
     assert confpath

@@ -23,14 +23,6 @@ BOOT_STRATEGIES: dict[str, str] = {
     'bq-paella': FASTBOOT,
 }

-DEVICES: dict[str, list[str]] = {
-    'oneplus-enchilada': ['device-sdm845-oneplus-enchilada'],
-    'oneplus-fajita': ['device-sdm845-oneplus-fajita'],
-    'xiaomi-beryllium-ebbg': ['device-sdm845-xiaomi-beryllium-ebbg'],
-    'xiaomi-beryllium-tianma': ['device-sdm845-xiaomi-beryllium-tianma'],
-    'bq-paella': ['device-msm8916-bq-paella'],
-}
-
 BASE_PACKAGES: list[str] = [
     'base',
     'base-kupfer',
@@ -91,11 +83,20 @@ Arch: TypeAlias = str
 ARCHES = [
     'x86_64',
     'aarch64',
+    'armv7h',
 ]

 DistroArch: TypeAlias = Arch
 TargetArch: TypeAlias = Arch

+ALARM_REPOS = {
+    'core': 'http://mirror.archlinuxarm.org/$arch/$repo',
+    'extra': 'http://mirror.archlinuxarm.org/$arch/$repo',
+    'community': 'http://mirror.archlinuxarm.org/$arch/$repo',
+    'alarm': 'http://mirror.archlinuxarm.org/$arch/$repo',
+    'aur': 'http://mirror.archlinuxarm.org/$arch/$repo',
+}
+
 BASE_DISTROS: dict[DistroArch, dict[str, dict[str, str]]] = {
     'x86_64': {
         'repos': {
@@ -105,42 +106,58 @@ BASE_DISTROS: dict[DistroArch, dict[str, dict[str, str]]] = {
         },
     },
     'aarch64': {
-        'repos': {
-            'core': 'http://mirror.archlinuxarm.org/$arch/$repo',
-            'extra': 'http://mirror.archlinuxarm.org/$arch/$repo',
-            'community': 'http://mirror.archlinuxarm.org/$arch/$repo',
-            'alarm': 'http://mirror.archlinuxarm.org/$arch/$repo',
-            'aur': 'http://mirror.archlinuxarm.org/$arch/$repo',
-        },
+        'repos': ALARM_REPOS,
     },
+    'armv7h': {
+        'repos': ALARM_REPOS,
+    },
 }

 COMPILE_ARCHES: dict[Arch, str] = {
     'x86_64': 'amd64',
     'aarch64': 'arm64',
+    'armv7h': 'arm',
 }

 GCC_HOSTSPECS: dict[DistroArch, dict[TargetArch, str]] = {
     'x86_64': {
         'x86_64': 'x86_64-pc-linux-gnu',
         'aarch64': 'aarch64-linux-gnu',
+        'armv7h': 'arm-unknown-linux-gnueabihf'
     },
     'aarch64': {
         'aarch64': 'aarch64-unknown-linux-gnu',
-    }
+    },
+    'armv7h': {
+        'armv7h': 'armv7l-unknown-linux-gnueabihf'
+    },
 }

 CFLAGS_GENERAL = ['-O2', '-pipe', '-fstack-protector-strong']

-CFLAGS_ARCHES: dict[Arch, list[str]] = {
-    'x86_64': ['-march=x86-64', '-mtune=generic'],
-    'aarch64': [
-        '-march=armv8-a',
+CFLAGS_ALARM = [
+    ' -fno-plt',
     '-fexceptions',
     '-Wp,-D_FORTIFY_SOURCE=2',
     '-Wformat',
     '-Werror=format-security',
     '-fstack-clash-protection',
 ]
+
+CFLAGS_ARCHES: dict[Arch, list[str]] = {
+    'x86_64': ['-march=x86-64', '-mtune=generic'],
+    'aarch64': [
+        '-march=armv8-a',
+    ] + CFLAGS_ALARM,
+    'armv7h': [
+        '-march=armv7-a',
+        '-mfloat-abi=hard',
+        '-mfpu=neon',
+    ] + CFLAGS_ALARM,
+}
+
+QEMU_ARCHES: dict[Arch, str] = {
+    'x86_64': 'x86_64',
+    'aarch64': 'aarch64',
+    'armv7h': 'arm',
+}

 QEMU_BINFMT_PKGS = ['qemu-user-static-bin', 'binfmt-qemu-static']
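How the new CFLAGS_ALARM factoring composes: generate_makepkg_conf() (see the hunk below) builds the final flag list as CFLAGS_ARCHES[arch] + CFLAGS_GENERAL. A standalone check for the new armv7h target, with values copied verbatim from the diff (including the stray leading space in ' -fno-plt'):

CFLAGS_GENERAL = ['-O2', '-pipe', '-fstack-protector-strong']
CFLAGS_ALARM = [
    ' -fno-plt',
    '-fexceptions',
    '-Wp,-D_FORTIFY_SOURCE=2',
    '-Wformat',
    '-Werror=format-security',
    '-fstack-clash-protection',
]
CFLAGS_ARCHES = {
    'x86_64': ['-march=x86-64', '-mtune=generic'],
    'aarch64': ['-march=armv8-a'] + CFLAGS_ALARM,
    'armv7h': ['-march=armv7-a', '-mfloat-abi=hard', '-mfpu=neon'] + CFLAGS_ALARM,
}

print(' '.join(CFLAGS_ARCHES['armv7h'] + CFLAGS_GENERAL))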

dataclass.py (new file, 92 lines)

@@ -0,0 +1,92 @@
+from dataclasses import dataclass
+from munch import Munch
+from typing import Optional, Union, Mapping, Any, get_type_hints, get_origin, get_args, Iterable
+
+
+def munchclass(*args, init=False, **kwargs):
+    return dataclass(*args, init=init, slots=True, **kwargs)
+
+
+def resolve_type_hint(hint: type) -> Iterable[type]:
+    origin = get_origin(hint)
+    args: Iterable[type] = get_args(hint)
+    if origin is Optional:
+        args = set(list(args) + [type(None)])
+    if origin in [Union, Optional]:
+        results: list[type] = []
+        for arg in args:
+            results += resolve_type_hint(arg)
+        return results
+    return [origin or hint]
+
+
+class DataClass(Munch):
+
+    def __init__(self, d: dict = {}, validate: bool = True, **kwargs):
+        self.update(d | kwargs, validate=validate)
+
+    @classmethod
+    def transform(cls, values: Mapping[str, Any], validate: bool = True, allow_extra: bool = False) -> Any:
+        results = {}
+        values = dict(values)
+        for key in list(values.keys()):
+            value = values.pop(key)
+            type_hints = cls._type_hints
+            if key in type_hints:
+                _classes = tuple[type](resolve_type_hint(type_hints[key]))
+                if issubclass(_classes[0], dict):
+                    assert isinstance(value, dict)
+                    target_class = _classes[0]
+                    if target_class is dict:
+                        target_class = Munch
+                    if not isinstance(value, target_class):
+                        assert issubclass(target_class, Munch)
+                        # despite the above assert, mypy doesn't seem to understand target_class is a Munch here
+                        value = target_class.fromDict(value, validate=validate)  # type:ignore[attr-defined]
+                # handle numerics
+                elif set(_classes).intersection([int, float]) and isinstance(value, str) and str not in _classes:
+                    parsed_number = None
+                    parsers: list[tuple[type, list]] = [(int, [10]), (int, [0]), (float, [])]
+                    for _cls, args in parsers:
+                        if _cls not in _classes:
+                            continue
+                        try:
+                            parsed_number = _cls(value, *args)
+                            break
+                        except ValueError:
+                            continue
+                    if parsed_number is None:
+                        if validate:
+                            raise Exception(f"Couldn't parse string value {repr(value)} for key '{key}' into number formats: " +
+                                            (', '.join(list(c.__name__ for c in _classes))))
+                    else:
+                        value = parsed_number
+                if validate:
+                    if not isinstance(value, _classes):
+                        raise Exception(f'key "{key}" has value of wrong type {_classes}: {value}')
+            elif validate and not allow_extra:
+                raise Exception(f'Unknown key "{key}"')
+            else:
+                if isinstance(value, dict) and not isinstance(value, Munch):
+                    value = Munch.fromDict(value)
+            results[key] = value
+        if values:
+            if validate:
+                raise Exception(f'values contained unknown keys: {list(values.keys())}')
+            results |= values
+        return results
+
+    @classmethod
+    def fromDict(cls, values: Mapping[str, Any], validate: bool = True):
+        return cls(**cls.transform(values, validate))
+
+    def update(self, d: Mapping[str, Any], validate: bool = True):
+        Munch.update(self, type(self).transform(d, validate))
+
+    def __init_subclass__(cls):
+        super().__init_subclass__()
+        cls._type_hints = get_type_hints(cls)
+
+    def __repr__(self):
+        return f'{type(self)}{dict.__repr__(self.toDict())}'
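What the new str→number coercion in transform() buys in practice: string values (e.g. from TOML or CLI input) get parsed into the annotated numeric type, while validation still rejects garbage. A small sketch, assuming this dataclass.py is on the import path (Limits is a hypothetical example class, not from the codebase):

from typing import Optional

from dataclass import DataClass

class Limits(DataClass):
    size_mb: int
    ratio: Optional[float]

limits = Limits.fromDict({'size_mb': '1800', 'ratio': '0.5'})
assert limits.size_mb == 1800 and limits.ratio == 0.5

try:
    Limits.fromDict({'size_mb': 'not-a-number'})
except Exception as ex:
    print(ex)  # Couldn't parse string value 'not-a-number' for key 'size_mb' ...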

@@ -75,7 +75,7 @@ _kupfer_local_chroots = dict[Arch, Distro]()
 def get_kupfer_https(arch: Arch, scan: bool = False) -> Distro:
     global _kupfer_https
     if arch not in _kupfer_https or not _kupfer_https[arch]:
-        _kupfer_https[arch] = get_kupfer(arch, KUPFER_HTTPS.replace('%branch%', config.file['pacman']['repo_branch']), scan)
+        _kupfer_https[arch] = get_kupfer(arch, KUPFER_HTTPS.replace('%branch%', config.file.pacman.repo_branch), scan)
     item = _kupfer_https[arch]
     if scan and not item.is_scanned():
         item.scan()
@@ -85,7 +85,8 @@ def get_kupfer_https(arch: Arch, scan: bool = False) -> Distro:
 def get_kupfer_local(arch: Optional[Arch] = None, in_chroot: bool = True, scan: bool = False) -> Distro:
     global _kupfer_local, _kupfer_local_chroots
     cache = _kupfer_local_chroots if in_chroot else _kupfer_local
-    arch = arch or config.runtime['arch']
+    arch = arch or config.runtime.arch
+    assert arch
     if arch not in cache or not cache[arch]:
         dir = CHROOT_PATHS['packages'] if in_chroot else config.get_path('packages')
         cache[arch] = get_kupfer(arch, f"file://{dir}/$arch/$repo")

@@ -14,6 +14,7 @@ ElevationMethod: TypeAlias = str
 ELEVATION_METHOD_DEFAULT = "sudo"

 ELEVATION_METHODS: dict[ElevationMethod, list[str]] = {
+    "none": [],
     "sudo": ['sudo', '--'],
 }
@@ -47,15 +48,18 @@ def wrap_in_bash(cmd: Union[list[str], str], flatten_result=True) -> Union[str,
     return res


-def generate_cmd_elevated(cmd: list[str], elevation_method: ElevationMethod):
+def generate_cmd_elevated(cmd: Union[list[str], str], elevation_method: ElevationMethod):
     "wraps `cmd` in the necessary commands to escalate, e.g. `['sudo', '--', cmd]`."
+    if isinstance(cmd, str):
+        cmd = wrap_in_bash(cmd, flatten_result=False)
+    assert not isinstance(cmd, str)  # typhints cmd as list[str]
     if elevation_method not in ELEVATION_METHODS:
         raise Exception(f"Unknown elevation method {elevation_method}")
     return ELEVATION_METHODS[elevation_method] + cmd


 def generate_cmd_su(
-    cmd: list[str],
+    cmd: Union[list[str], str],
     switch_user: str,
     elevation_method: Optional[ElevationMethod] = None,
     force_su: bool = False,
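The flat-string tolerance means callers can pass a whole shell snippet instead of a pre-split argv: the string is wrapped in bash, then prefixed with the elevation command. A standalone sketch of that flow (simplified stand-ins, not the exec.cmd implementations):

ELEVATION_METHODS = {'none': [], 'sudo': ['sudo', '--']}

def generate_cmd_elevated_sketch(cmd, elevation_method: str = 'sudo') -> list:
    if isinstance(cmd, str):
        cmd = ['/bin/bash', '-c', cmd]  # i.e. wrap_in_bash(cmd, flatten_result=False)
    if elevation_method not in ELEVATION_METHODS:
        raise Exception(f'Unknown elevation method {elevation_method}')
    return ELEVATION_METHODS[elevation_method] + cmd

print(generate_cmd_elevated_sketch('echo hi && id'))
# ['sudo', '--', '/bin/bash', '-c', 'echo hi && id']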

@@ -82,8 +82,12 @@ def write_file(
     fstat: os.stat_result
     exists = root_check_exists(path)
     dirname = os.path.dirname(path)
+    failed = False
     if exists:
-        fstat = os.stat(path)
+        try:
+            fstat = os.stat(path)
+        except PermissionError:
+            failed = True
     else:
         chown_user = chown_user or get_user_name(os.getuid())
         chown_group = chown_group or get_group_name(os.getgid())
@@ -94,9 +98,10 @@ def write_file(
     if mode:
         if not mode.isnumeric():
             raise Exception(f"Unknown file mode '{mode}' (must be numeric): {path}")
-        if not exists or stat.filemode(int(mode, 8)) != stat.filemode(fstat.st_mode):
+        if not exists or failed or stat.filemode(int(mode, 8)) != stat.filemode(fstat.st_mode):
             chmod_mode = mode
-    failed = try_native_filewrite(path, content, chmod_mode)
+    if not failed:
+        failed = try_native_filewrite(path, content, chmod_mode) is not None
     if exists or failed:
         if failed:
             try:
@@ -139,7 +144,7 @@ def remove_file(path: str, recursive=False):
         raise Exception(f"Unable to remove {path}: cmd returned {rc}")


-def makedir(path, user: Optional[str] = None, group: Optional[str] = None, parents: bool = True):
+def makedir(path, user: Optional[Union[str, int]] = None, group: Optional[Union[str, int]] = None, parents: bool = True):
     if not root_check_exists(path):
         try:
             if parents:
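makedir() accepting numeric ids is what lets BuildChroot.mount_ccache() pass the uid resolved via Chroot.get_uid() straight through. A standalone sketch of the ownership handling under that assumption (makedir_sketch is a stand-in, not the real function; shutil.chown resolves string names and passes numeric ids through):

import os
import shutil
from typing import Optional, Union

def makedir_sketch(path: str, user: Optional[Union[str, int]] = None,
                   group: Optional[Union[str, int]] = None, parents: bool = True):
    if parents:
        os.makedirs(path, exist_ok=True)
    else:
        os.mkdir(path)
    if user is not None or group is not None:
        shutil.chown(path, user=user, group=group)  # may need elevated privileges

makedir_sketch('/tmp/ccache-demo/aarch64', user=os.getuid())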

@@ -6,7 +6,8 @@ from constants import FLASH_PARTS, LOCATIONS
 from exec.cmd import run_root_cmd
 from exec.file import get_temp_dir
 from fastboot import fastboot_flash
-from image import dd_image, partprobe, shrink_fs, losetup_rootfs_image, losetup_destroy, dump_aboot, dump_lk2nd, dump_qhypstub, get_device_and_flavour, get_image_name, get_image_path
+from image import dd_image, partprobe, shrink_fs, losetup_rootfs_image, losetup_destroy, dump_aboot, dump_lk2nd, dump_qhypstub, get_flavour, get_image_name, get_image_path
+from packages.device import get_profile_device
 from wrapper import enforce_wrap

 ABOOT = FLASH_PARTS['ABOOT']
@@ -21,7 +22,8 @@ ROOTFS = FLASH_PARTS['ROOTFS']
 def cmd_flash(what: str, location: str):
     """Flash a partition onto a device. `location` takes either a path to a block device or one of emmc, sdcard"""
     enforce_wrap()
-    device, flavour = get_device_and_flavour()
+    device = get_profile_device()
+    flavour = get_flavour()
     device_image_name = get_image_name(device, flavour)
     device_image_path = get_image_path(device, flavour)

@@ -7,7 +7,8 @@ def generate_makepkg_conf(arch: Arch, cross: bool = False, chroot: str = None) -
     Generate a makepkg.conf. For use with crosscompiling, specify `cross=True` and pass as `chroot`
     the relative path inside the native chroot where the foreign chroot will be mounted.
     """
-    hostspec = GCC_HOSTSPECS[config.runtime['arch'] if cross else arch][arch]
+    assert config.runtime.arch
+    hostspec = GCC_HOSTSPECS[config.runtime.arch if cross else arch][arch]
     cflags = CFLAGS_ARCHES[arch] + CFLAGS_GENERAL
     if cross and not chroot:
         raise Exception('Cross-compile makepkg conf requested but no chroot path given: "{chroot}"')
@@ -233,7 +234,7 @@ Color
 #NoProgressBar
 {'' if check_space else '#'}CheckSpace
 VerbosePkgLists
-ParallelDownloads = {config.file['pacman']['parallel_downloads']}
+ParallelDownloads = {config.file.pacman.parallel_downloads}

 # By default, pacman accepts packages signed by keys that its local keyring
 # trusts (see pacman-key and its man page), as well as unsigned packages.

View file

@ -7,18 +7,18 @@ import click
import logging import logging
from signal import pause from signal import pause
from subprocess import CompletedProcess from subprocess import CompletedProcess
from typing import Optional from typing import Optional, Union
from chroot.device import DeviceChroot, get_device_chroot from chroot.device import DeviceChroot, get_device_chroot
from constants import Arch, BASE_PACKAGES, DEVICES, FLAVOURS from constants import Arch, BASE_PACKAGES, FLAVOURS
from config import config, Profile from config import config, Profile
from distro.distro import get_base_distro, get_kupfer_https from distro.distro import get_base_distro, get_kupfer_https
from exec.cmd import run_root_cmd, generate_cmd_su from exec.cmd import run_root_cmd, generate_cmd_su
from exec.file import root_write_file, root_makedir, makedir from exec.file import root_write_file, root_makedir, makedir
from packages import build_enable_qemu_binfmt, build_packages_by_paths from packages import build_enable_qemu_binfmt, build_packages_by_paths
from packages.device import get_profile_device from packages.device import Device, get_profile_device
from ssh import copy_ssh_keys from ssh import copy_ssh_keys
from wrapper import wrap_if_foreign_arch from wrapper import check_programs_wrap, wrap_if_foreign_arch
# image files need to be slightly smaller than partitions to fit # image files need to be slightly smaller than partitions to fit
IMG_FILE_ROOT_DEFAULT_SIZE = "1800M" IMG_FILE_ROOT_DEFAULT_SIZE = "1800M"
@ -131,23 +131,25 @@ def losetup_destroy(loop_device):
) )
def get_device_and_flavour(profile_name: Optional[str] = None) -> tuple[str, str]: def get_flavour(profile_name: Optional[str] = None) -> str:
config.enforce_config_loaded() config.enforce_config_loaded()
profile = config.get_profile(profile_name) profile = config.get_profile(profile_name)
if not profile['device']:
raise Exception("Please set the device using 'kupferbootstrap config init ...'")
if not profile['flavour']: if not profile['flavour']:
raise Exception("Please set the flavour using 'kupferbootstrap config init ...'") raise Exception("Please set the flavour using 'kupferbootstrap config init ...'")
return (profile['device'], profile['flavour']) return profile['flavour']
def get_image_name(device, flavour, img_type='full') -> str: def get_device_name(device: Union[str, Device]) -> str:
return f'{device}-{flavour}-{img_type}.img' return device.name if isinstance(device, Device) else device
def get_image_path(device, flavour, img_type='full') -> str: def get_image_name(device: Union[str, Device], flavour, img_type='full') -> str:
return f'{get_device_name(device)}-{flavour}-{img_type}.img'
def get_image_path(device: Union[str, Device], flavour, img_type='full') -> str:
return os.path.join(config.get_path('images'), get_image_name(device, flavour, img_type)) return os.path.join(config.get_path('images'), get_image_name(device, flavour, img_type))
@ -299,7 +301,7 @@ def create_boot_fs(device: str, blocksize: int):
def install_rootfs( def install_rootfs(
rootfs_device: str, rootfs_device: str,
bootfs_device: str, bootfs_device: str,
device: str, device: Union[str, Device],
flavour: str, flavour: str,
arch: Arch, arch: Arch,
packages: list[str], packages: list[str],
@ -308,7 +310,7 @@ def install_rootfs(
): ):
user = profile['username'] or 'kupfer' user = profile['username'] or 'kupfer'
 post_cmds = FLAVOURS[flavour].get('post_cmds', [])
-chroot = get_device_chroot(device=device, flavour=flavour, arch=arch, packages=packages, use_local_repos=use_local_repos)
+chroot = get_device_chroot(device=get_device_name(device), flavour=flavour, arch=arch, packages=packages, use_local_repos=use_local_repos)
 mount_chroot(rootfs_device, bootfs_device, chroot)
@@ -319,6 +321,7 @@ def install_rootfs(
 user=user,
 password=profile['password'],
 )
+chroot.add_sudo_config(config_name='wheel', privilegee='%wheel', password_required=True)
 copy_ssh_keys(
 chroot.path,
 user=user,
@@ -329,7 +332,6 @@ def install_rootfs(
 extra_repos=get_kupfer_https(arch).repos,
 in_chroot=True,
 ),
-'etc/sudoers.d/wheel': "# allow members of group wheel to execute any command\n%wheel ALL=(ALL:ALL) ALL\n",
 'etc/hostname': profile['hostname'],
 }
 for target, content in files.items():
@@ -388,18 +390,19 @@ def cmd_build(profile_name: str = None,
 Unless overriden, required packages will be built or preferably downloaded from HTTPS repos.
 """
-arch = get_profile_device(profile_name).arch
-wrap_if_foreign_arch(arch)
+device = get_profile_device(profile_name)
+arch = device.arch
+check_programs_wrap(['makepkg', 'pacman', 'pacstrap'])
 profile: Profile = config.get_profile(profile_name)
-device, flavour = get_device_and_flavour(profile_name)
+flavour = get_flavour(profile_name)
 size_extra_mb: int = int(profile["size_extra_mb"])
 sector_size = 4096
 rootfs_size_mb = FLAVOURS[flavour].get('size', 2) * 1000
-packages = BASE_PACKAGES + DEVICES[device] + FLAVOURS[flavour]['packages'] + profile['pkgs_include']
+packages = BASE_PACKAGES + [device.package.name] + FLAVOURS[flavour]['packages'] + profile['pkgs_include']
-if arch != config.runtime['arch']:
+if arch != config.runtime.arch:
 build_enable_qemu_binfmt(arch)
 if local_repos and build_pkgs:
@@ -459,9 +462,10 @@ def cmd_build(profile_name: str = None,
 @click.argument('profile', required=False)
 def cmd_inspect(profile: str = None, shell: bool = False):
 """Open a shell in a device image"""
-arch = get_profile_device(profile).arch
+device = get_profile_device(profile)
+arch = device.arch
 wrap_if_foreign_arch(arch)
-device, flavour = get_device_and_flavour(profile)
+flavour = get_flavour(profile)
 # TODO: PARSE DEVICE SECTOR SIZE
 sector_size = 4096
 chroot = get_device_chroot(device, flavour, arch)
@@ -475,7 +479,7 @@ def cmd_inspect(profile: str = None, shell: bool = False):
 if shell:
 chroot.initialized = True
 chroot.activate()
-if arch != config.runtime['arch']:
+if arch != config.runtime.arch:
 logging.info('Installing requisites for foreign-arch shell')
 build_enable_qemu_binfmt(arch)
 logging.info('Starting inspection shell')
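The static `etc/sudoers.d/wheel` literal removed above is now generated via `chroot.add_sudo_config()`. A minimal sketch of what such a helper plausibly writes, assuming it renders a sudoers drop-in under `etc/sudoers.d/` inside the chroot (only the call signatures appear in this diff; the body below is illustrative, not the project's actual implementation):

import os

def add_sudo_config(chroot_path: str, config_name: str, privilegee: str, password_required: bool = True) -> None:
    # Renders e.g. '%wheel ALL=(ALL:ALL) ALL' for the wheel group, or
    # 'kupfer ALL=(ALL:ALL) NOPASSWD: ALL' when password_required=False.
    tag = '' if password_required else 'NOPASSWD: '
    target = os.path.join(chroot_path, 'etc/sudoers.d', config_name)
    with open(target, 'w') as fd:
        fd.write(f'{privilegee} ALL=(ALL:ALL) {tag}ALL\n')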

local/bin/wrapper_su_helper Symbolic link

@@ -0,0 +1 @@
+../../wrapper_su_helper.py


@@ -1,32 +0,0 @@
-#!/bin/sh
-set -e
-wget https://raw.githubusercontent.com/archlinuxarm/PKGBUILDs/master/core/pacman/makepkg.conf -O etc/makepkg.conf
-sed -i "s/@CARCH@/aarch64/g" etc/makepkg.conf
-sed -i "s/@CHOST@/aarch64-unknown-linux-gnu/g" etc/makepkg.conf
-sed -i "s/@CARCHFLAGS@/-march=armv8-a /g" etc/makepkg.conf
-sed -i "s/xz /xz -T0 /g" etc/makepkg.conf
-sed -i "s/ check / !check /g" etc/makepkg.conf
-chroot="/chroot/base_aarch64"
-include="-I\${CROOT}/usr/include -I$chroot/usr/include"
-lib_croot="\${CROOT}/lib"
-lib_chroot="$chroot/usr/lib"
-cat >>etc/makepkg.conf <<EOF
-export CROOT="/usr/aarch64-linux-gnu"
-export ARCH="arm64"
-export CROSS_COMPILE="aarch64-linux-gnu-"
-export CC="aarch64-linux-gnu-gcc $include -L$lib_croot -L$lib_chroot"
-export CXX="aarch64-linux-gnu-g++ $include -L$lib_croot -L$lib_chroot"
-export CFLAGS="\$CFLAGS $include"
-export CXXFLAGS="\$CXXFLAGS $include"
-export LDFLAGS="\$LDFLAGS,-L$lib_croot,-L$lib_chroot,-rpath-link,$lib_croot,-rpath-link,$lib_chroot"
-export PACMAN_CHROOT="$chroot"
-EOF
-# TODO: Set PACKAGER
-wget https://raw.githubusercontent.com/archlinuxarm/PKGBUILDs/master/core/pacman/pacman.conf -O etc/pacman.conf
-sed -i "s/@CARCH@/aarch64/g" etc/pacman.conf
-sed -i "s/#ParallelDownloads.*/ParallelDownloads = 8/g" etc/pacman.conf
-sed -i "s/SigLevel.*/SigLevel = Never/g" etc/pacman.conf
-sed -i "s/^CheckSpace/#CheckSpace/g" etc/pacman.conf
-sed -i "s|Include = /etc/pacman.d/mirrorlist|Server = http://mirror.archlinuxarm.org/\$arch/\$repo|g" etc/pacman.conf

main.py

@@ -1,11 +1,13 @@
 #!/usr/bin/env python3
 import click
-from traceback import format_exc as get_trace
 import subprocess
+from traceback import format_exc as get_trace
+from typing import Optional
 from logger import logging, setup_logging, verbose_option
-from wrapper import nowrapper_option
+from wrapper import nowrapper_option, enforce_wrap
 from config import config, config_option, cmd_config
 from forwarding import cmd_forwarding
 from packages import cmd_packages
@@ -23,23 +25,25 @@ from ssh import cmd_ssh
 @verbose_option
 @config_option
 @nowrapper_option
-def cli(verbose: bool = False, config_file: str = None, no_wrapper: bool = False, error_shell: bool = False):
+def cli(verbose: bool = False, config_file: str = None, wrapper_override: Optional[bool] = None, error_shell: bool = False):
 setup_logging(verbose)
-config.runtime['verbose'] = verbose
-config.runtime['no_wrap'] = no_wrapper
-config.runtime['error_shell'] = error_shell
+config.runtime.verbose = verbose
+config.runtime.no_wrap = wrapper_override is False
+config.runtime.error_shell = error_shell
 config.try_load_file(config_file)
+if wrapper_override:
+enforce_wrap()
 def main():
 try:
 return cli(prog_name='kupferbootstrap')
 except Exception as ex:
-if config.runtime['verbose']:
+if config.runtime.verbose:
 logging.fatal(get_trace())
 else:
 logging.fatal(ex)
-if config.runtime['error_shell']:
+if config.runtime.error_shell:
 logging.info('Starting error shell. Type exit to quit.')
 subprocess.call('/bin/bash')
 exit(1)
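The new `wrapper_override` parameter is deliberately tri-state rather than boolean. A short sketch of the resulting behaviour, using the names from the diff above:

from config import config
from wrapper import enforce_wrap

def apply_wrapper_override(wrapper_override):
    # None: neither -w nor -W was passed; wrapping stays autodetected.
    # False: -W/--no-wrapper was passed; never wrap.
    config.runtime.no_wrap = wrapper_override is False
    # True: -w/--force-wrapper was passed; wrap immediately.
    if wrapper_override:
        enforce_wrap()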

packages/__init__.py

@@ -9,9 +9,9 @@ from glob import glob
 from urllib.error import HTTPError
 from urllib.request import urlopen
 from shutil import copyfileobj
-from typing import Iterable, Iterator, Any, Optional
+from typing import Iterable, Iterator, Optional
-from binfmt import register as binfmt_register
+from binfmt import register as binfmt_register, QEMU_ARCHES
 from constants import REPOSITORIES, CROSSDIRECT_PKGS, QEMU_BINFMT_PKGS, GCC_HOSTSPECS, ARCHES, Arch, CHROOT_PATHS, MAKEPKG_CMD
 from config import config
 from exec.cmd import run_cmd, run_root_cmd
@@ -22,7 +22,7 @@ from ssh import run_ssh_command, scp_put_files
 from wrapper import enforce_wrap, check_programs_wrap, wrap_if_foreign_arch
 from utils import git
-from .pkgbuild import discover_pkgbuilds, init_pkgbuilds, Pkgbuild
+from .pkgbuild import discover_pkgbuilds, filter_pkgbuilds, init_pkgbuilds, Pkgbuild
 from .device import get_profile_device
 pacman_cmd = [
@@ -36,7 +36,7 @@ pacman_cmd = [
 def get_makepkg_env(arch: Optional[Arch] = None):
 # has to be a function because calls to `config` must be done after config file was read
-threads = config.file['build']['threads'] or multiprocessing.cpu_count()
+threads = config.file.build.threads or multiprocessing.cpu_count()
 env = {key: val for key, val in os.environ.items() if not key.split('_', maxsplit=1)[0] in ['CI', 'GITLAB', 'FF']}
 env |= {
 'LANG': 'C',
@@ -77,33 +77,6 @@ def init_prebuilts(arch: Arch, dir: str = None):
 raise Exception(f'Failed to create local repo {repo}')
-def filter_packages(
-paths: Iterable[str],
-repo: Optional[dict[str, Pkgbuild]] = None,
-allow_empty_results=True,
-use_paths=True,
-use_names=True,
-) -> Iterable[Pkgbuild]:
-if not allow_empty_results and not paths:
-raise Exception("Can't search for packages: no query given")
-repo = repo or discover_pkgbuilds()
-if 'all' in paths:
-return list(repo.values())
-result = []
-for pkg in repo.values():
-comparison = set()
-if use_paths:
-comparison.add(pkg.path)
-if use_names:
-comparison.add(pkg.name)
-if comparison.intersection(paths):
-result += [pkg]
-if not allow_empty_results and not result:
-raise Exception('No packages matched by paths: ' + ', '.join([f'"{p}"' for p in paths]))
-return result
 def generate_dependency_chain(package_repo: dict[str, Pkgbuild], to_build: Iterable[Pkgbuild]) -> list[set[Pkgbuild]]:
 """
 This figures out all dependencies and their sub-dependencies for the selection and adds those packages to the selection.
@@ -262,7 +235,7 @@ def strip_compression_extension(filename: str):
 for ext in ['zst', 'xz', 'gz', 'bz2']:
 if filename.endswith(f'.pkg.tar.{ext}'):
 return filename[:-(len(ext) + 1)]
-logging.warning(f"file {filename} matches no known package extension")
+logging.debug(f"file {filename} matches no known package extension")
 return filename
@@ -286,7 +259,9 @@ def add_package_to_repo(package: Pkgbuild, arch: Arch):
 for repo_arch in ARCHES:
 if repo_arch == arch:
 continue
-copy_target = os.path.join(config.get_package_dir(repo_arch), package.repo, file)
+repo_dir = os.path.join(config.get_package_dir(repo_arch), package.repo)
+makedir(repo_dir)
+copy_target = os.path.join(repo_dir, file)
 shutil.copy(repo_file, copy_target)
 add_file_to_repo(copy_target, package.repo, repo_arch)
@@ -331,47 +306,21 @@ def try_download_package(dest_file_path: str, package: Pkgbuild, arch: Arch) ->
 def check_package_version_built(package: Pkgbuild, arch: Arch, try_download: bool = False) -> bool:
-enforce_wrap()
-native_chroot = setup_build_chroot(config.runtime['arch'])
-config_path = '/' + native_chroot.write_makepkg_conf(
-target_arch=arch,
-cross_chroot_relative=os.path.join('chroot', arch),
-cross=True,
-)
-cmd = ['cd', os.path.join(CHROOT_PATHS['pkgbuilds'], package.path), '&&'] + MAKEPKG_CMD + [
-'--config',
-config_path,
-'--nobuild',
-'--noprepare',
-'--skippgpcheck',
-'--packagelist',
-]
-result: Any = native_chroot.run_cmd(
-cmd,
-capture_output=True,
-)
-if result.returncode != 0:
-raise Exception(f'Failed to get package list for {package.path}:' + '\n' + result.stdout.decode() + '\n' + result.stderr.decode())
 missing = True
-for line in result.stdout.decode('utf-8').split('\n'):
-if not line:
-continue
-basename = os.path.basename(line)
-file = os.path.join(config.get_package_dir(arch), package.repo, basename)
-filename_stripped = strip_compression_extension(file)
-logging.debug(f'Checking if {file} is built')
+filename = package.get_filename(arch)
+filename_stripped = strip_compression_extension(filename)
+logging.debug(f'Checking if {filename_stripped} is built')
+for ext in ['xz', 'zst']:
+file = os.path.join(config.get_package_dir(arch), package.repo, f'{filename_stripped}.{ext}')
 if not filename_stripped.endswith('.pkg.tar'):
-logging.debug(f'skipping unknown file extension {basename}')
-continue
+raise Exception(f'stripped filename has unknown extension. {filename}')
 if os.path.exists(file) or (try_download and try_download_package(file, package, arch)):
 missing = False
 add_file_to_repo(file, repo_name=package.repo, arch=arch)
 # copy arch=(any) packages to all arches
 if filename_stripped.endswith('any.pkg.tar'):
 logging.debug("any-arch pkg detected")
-target_repo_file = os.path.join(config.get_package_dir(arch), package.repo, basename)
+target_repo_file = os.path.join(config.get_package_dir(arch), package.repo, filename)
 if os.path.exists(target_repo_file):
 missing = False
 else:
@@ -379,7 +328,7 @@ def check_package_version_built(package: Pkgbuild, arch: Arch, try_download: boo
 for repo_arch in ARCHES:
 if repo_arch == arch:
 continue # we already checked that
-other_repo_path = os.path.join(config.get_package_dir(repo_arch), package.repo, basename)
+other_repo_path = os.path.join(config.get_package_dir(repo_arch), package.repo, filename)
 if os.path.exists(other_repo_path):
 missing = False
 logging.info(f"package {file} found in {repo_arch} repos, copying to {arch}")
@@ -392,12 +341,16 @@ def check_package_version_built(package: Pkgbuild, arch: Arch, try_download: boo
 for repo_arch in ARCHES:
 if repo_arch == arch:
 continue # we already have that
-copy_target = os.path.join(config.get_package_dir(repo_arch), package.repo, basename)
+repo_dir = os.path.join(config.get_package_dir(repo_arch), package.repo)
+copy_target = os.path.join(repo_dir, filename)
 if not os.path.exists(copy_target):
 logging.info(f"copying to {copy_target}")
+makedir(repo_dir)
 shutil.copyfile(target_repo_file, copy_target)
 add_file_to_repo(copy_target, package.repo, repo_arch)
-return not missing
+if not missing:
+return True
+return False
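The rewritten check no longer shells out to `makepkg --packagelist`; it derives the expected file name from the parsed PKGBUILD and probes both compression suffixes makepkg can produce. A condensed sketch of that lookup (helper name hypothetical; paths follow the repo layout used above):

import os

def candidate_package_files(pkg_dir: str, repo: str, filename_stripped: str) -> list[str]:
    # Pkgbuild.get_filename() yields e.g. 'foo-1.0-1-aarch64.pkg.tar.zst'; with
    # the compression suffix stripped it ends in '.pkg.tar', and both the 'xz'
    # and 'zst' variants are then probed on disk.
    return [os.path.join(pkg_dir, repo, f'{filename_stripped}.{ext}') for ext in ['xz', 'zst']]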
 def setup_build_chroot(
@@ -406,7 +359,8 @@ def setup_build_chroot(
 add_kupfer_repos: bool = True,
 clean_chroot: bool = False,
 ) -> BuildChroot:
-if arch != config.runtime['arch']:
+assert config.runtime.arch
+if arch != config.runtime.arch:
 wrap_if_foreign_arch(arch)
 build_enable_qemu_binfmt(arch)
 init_prebuilts(arch)
@@ -420,10 +374,22 @@ def setup_build_chroot(
 chroot.mount_pkgbuilds()
 if extra_packages:
 chroot.try_install_packages(extra_packages, allow_fail=False)
+assert config.runtime.uid is not None
+chroot.create_user('kupfer', password='12345678', uid=config.runtime.uid, non_unique=True)
+if not os.path.exists(chroot.get_path('/etc/sudoers.d/kupfer_nopw')):
+chroot.add_sudo_config('kupfer_nopw', 'kupfer', password_required=False)
 return chroot
-def setup_sources(package: Pkgbuild, chroot: BuildChroot, makepkg_conf_path='/etc/makepkg.conf'):
+def setup_git_insecure_paths(chroot: BuildChroot, username: str = 'kupfer'):
+chroot.run_cmd(
+["git", "config", "--global", "--add", "safe.directory", "'*'"],
+switch_user=username,
+).check_returncode() # type: ignore[union-attr]
+def setup_sources(package: Pkgbuild, chroot: BuildChroot, makepkg_conf_path='/etc/makepkg.conf', switch_user: str = 'kupfer'):
 makepkg_setup_args = [
 '--config',
 makepkg_conf_path,
@@ -434,7 +400,13 @@ def setup_sources(package: Pkgbuild, chroot: BuildChroot, makepkg_conf_path='/et
 ]
 logging.info(f'Setting up sources for {package.path} in {chroot.name}')
-result = chroot.run_cmd(MAKEPKG_CMD + makepkg_setup_args, cwd=os.path.join(CHROOT_PATHS['pkgbuilds'], package.path))
+setup_git_insecure_paths(chroot)
+result = chroot.run_cmd(
+MAKEPKG_CMD + makepkg_setup_args,
+cwd=os.path.join(CHROOT_PATHS['pkgbuilds'], package.path),
+inner_env=get_makepkg_env(chroot.arch),
+switch_user=switch_user,
+)
 assert isinstance(result, subprocess.CompletedProcess)
 if result.returncode != 0:
 raise Exception(f'Failed to check sources for {package.path}')
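The `safe.directory '*'` setting lifts git's repository-ownership check; without it, git commands run by makepkg would refuse to operate now that sources are prepared as the unprivileged `kupfer` user on pkgbuild trees mounted from the host.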
@@ -448,19 +420,23 @@ def build_package(
 enable_crossdirect: bool = True,
 enable_ccache: bool = True,
 clean_chroot: bool = False,
+build_user: str = 'kupfer',
 ):
 makepkg_compile_opts = ['--holdver']
 makepkg_conf_path = 'etc/makepkg.conf'
 repo_dir = repo_dir if repo_dir else config.get_path('pkgbuilds')
-foreign_arch = config.runtime['arch'] != arch
+foreign_arch = config.runtime.arch != arch
 deps = (list(set(package.depends) - set(package.names())))
+needs_rust = 'rust' in deps
+build_root: BuildChroot
 target_chroot = setup_build_chroot(
 arch=arch,
 extra_packages=deps,
 clean_chroot=clean_chroot,
 )
+assert config.runtime.arch
 native_chroot = target_chroot if not foreign_arch else setup_build_chroot(
-arch=config.runtime['arch'],
+arch=config.runtime.arch,
 extra_packages=['base-devel'] + CROSSDIRECT_PKGS,
 clean_chroot=clean_chroot,
 )
@@ -475,6 +451,7 @@ def build_package(
 env = deepcopy(get_makepkg_env(arch))
 if enable_ccache:
 env['PATH'] = f"/usr/lib/ccache:{env['PATH']}"
+native_chroot.mount_ccache(user=build_user)
 logging.info('Setting up dependencies for cross-compilation')
 # include crossdirect for ccache symlinks and qemu-user
 results = native_chroot.try_install_packages(package.depends + CROSSDIRECT_PKGS + [f"{GCC_HOSTSPECS[native_chroot.arch][arch]}-gcc"])
@@ -506,12 +483,22 @@ def build_package(
 if failed_deps:
 raise Exception(f'Dependencies failed to install: {failed_deps}')
+if enable_ccache:
+build_root.mount_ccache(user=build_user)
+if needs_rust:
+build_root.mount_rust(user=build_user)
+setup_git_insecure_paths(build_root)
 makepkg_conf_absolute = os.path.join('/', makepkg_conf_path)
 setup_sources(package, build_root, makepkg_conf_path=makepkg_conf_absolute)
 build_cmd = f'makepkg --config {makepkg_conf_absolute} --skippgpcheck --needed --noconfirm --ignorearch {" ".join(makepkg_compile_opts)}'
 logging.debug(f'Building: Running {build_cmd}')
-result = build_root.run_cmd(build_cmd, inner_env=env, cwd=os.path.join(CHROOT_PATHS['pkgbuilds'], package.path))
+result = build_root.run_cmd(
+build_cmd,
+inner_env=env,
+cwd=os.path.join(CHROOT_PATHS['pkgbuilds'], package.path),
+switch_user=build_user,
+)
 assert isinstance(result, subprocess.CompletedProcess)
 if result.returncode != 0:
 raise Exception(f'Failed to compile package {package.path}')
@@ -520,15 +507,19 @@ def build_package(
 def get_dependants(
 repo: dict[str, Pkgbuild],
 packages: Iterable[Pkgbuild],
+arch: Arch,
 recursive: bool = True,
 ) -> set[Pkgbuild]:
 names = set([pkg.name for pkg in packages])
 to_add = set[Pkgbuild]()
 for pkg in repo.values():
 if set.intersection(names, set(pkg.depends)):
+if not set([arch, 'any']).intersection(pkg.arches):
+logging.warn(f'get_dependants: skipping matched pkg {pkg.name} due to wrong arch: {pkg.arches}')
+continue
 to_add.add(pkg)
 if recursive and to_add:
-to_add.update(get_dependants(repo, to_add))
+to_add.update(get_dependants(repo, to_add, arch=arch))
 return to_add
@@ -543,7 +534,7 @@ def get_unbuilt_package_levels(
 repo = repo or discover_pkgbuilds()
 dependants = set[Pkgbuild]()
 if rebuild_dependants:
-dependants = get_dependants(repo, packages)
+dependants = get_dependants(repo, packages, arch=arch)
 package_levels = generate_dependency_chain(repo, set(packages).union(dependants))
 build_names = set[str]()
 build_levels = list[set[Pkgbuild]]()
@@ -574,6 +565,7 @@ def build_packages(
 enable_ccache: bool = True,
 clean_chroot: bool = False,
 ):
+check_programs_wrap(['makepkg', 'pacman', 'pacstrap'])
 init_prebuilts(arch)
 build_levels = get_unbuilt_package_levels(
 packages,
@@ -619,9 +611,11 @@ def build_packages_by_paths(
 if isinstance(paths, str):
 paths = [paths]
-for _arch in set([arch, config.runtime['arch']]):
+check_programs_wrap(['makepkg', 'pacman', 'pacstrap'])
+assert config.runtime.arch
+for _arch in set([arch, config.runtime.arch]):
 init_prebuilts(_arch)
-packages = filter_packages(paths, repo=repo, allow_empty_results=False)
+packages = filter_pkgbuilds(paths, arch=arch, repo=repo, allow_empty_results=False)
 return build_packages(
 packages,
 arch,
@@ -645,10 +639,11 @@ def build_enable_qemu_binfmt(arch: Arch, repo: Optional[dict[str, Pkgbuild]] = N
 logging.info('Installing qemu-user (building if necessary)')
 if lazy and _qemu_enabled[arch]:
 return
-native = config.runtime['arch']
+native = config.runtime.arch
+assert native
 if arch == native:
 return
-wrap_if_foreign_arch(arch)
+check_programs_wrap([f'qemu-{QEMU_ARCHES[arch]}-static', 'pacman', 'makepkg'])
 # build qemu-user, binfmt, crossdirect
 build_packages_by_paths(
 CROSSDIRECT_PKGS,
@@ -706,7 +701,6 @@ def build(
 rebuild_dependants: bool = False,
 try_download: bool = False,
 ):
-# TODO: arch = config.get_profile()...
 arch = arch or get_profile_device(hint_or_set_arch=True).arch
 if arch not in ARCHES:
@@ -719,10 +713,10 @@ def build(
 force=force,
 rebuild_dependants=rebuild_dependants,
 try_download=try_download,
-enable_crosscompile=config.file['build']['crosscompile'],
-enable_crossdirect=config.file['build']['crossdirect'],
-enable_ccache=config.file['build']['ccache'],
-clean_chroot=config.file['build']['clean_mode'],
+enable_crosscompile=config.file.build.crosscompile,
+enable_crossdirect=config.file.build.crossdirect,
+enable_ccache=config.file.build.ccache,
+clean_chroot=config.file.build.clean_mode,
 )
@@ -751,7 +745,7 @@ def cmd_sideload(paths: Iterable[str], arch: Optional[Arch] = None, no_build: bo
 '-U',
 ] + [os.path.join('/tmp', os.path.basename(file)) for file in files] + [
 '--noconfirm',
-'--overwrite=\\*',
+"'--overwrite=\\*'",
 ],
 alloc_tty=True).check_returncode()
@@ -830,7 +824,7 @@ def cmd_check(paths):
 return False
 paths = list(paths)
-packages = filter_packages(paths, allow_empty_results=False)
+packages = filter_pkgbuilds(paths, allow_empty_results=False)
 for package in packages:
 name = package.name

packages/device.py

@@ -6,7 +6,7 @@ from typing import Optional
 from config import config
 from constants import Arch, ARCHES
 from config.scheme import DataClass, munchclass
-from .pkgbuild import discover_pkgbuilds, _pkgbuilds_cache, Pkgbuild, parse_pkgbuild
+from .pkgbuild import discover_pkgbuilds, get_pkgbuild_by_path, _pkgbuilds_cache, Pkgbuild
 DEVICE_DEPRECATIONS = {
 "oneplus-enchilada": "sdm845-oneplus-enchilada",
@@ -105,8 +105,7 @@ def get_device(name: str, pkgbuilds: Optional[dict[str, Pkgbuild]] = None, lazy:
 else:
 relative_path = os.path.join('device', pkgname)
 assert os.path.exists(os.path.join(config.get_path('pkgbuilds'), relative_path))
-pkgbuild = [p for p in parse_pkgbuild(relative_path, _config=config) if p.name == pkgname][0]
-_pkgbuilds_cache[pkgname] = pkgbuild
+pkgbuild = [p for p in get_pkgbuild_by_path(relative_path, lazy=lazy, _config=config) if p.name == pkgname][0]
 device = parse_device_pkg(pkgbuild)
 if lazy:
 _device_cache[name] = device

packages/pkgbuild.py

@@ -6,11 +6,11 @@ import multiprocessing
 import os
 import subprocess
-from constants import REPOSITORIES
 from joblib import Parallel, delayed
-from typing import Optional, Sequence
+from typing import Iterable, Optional
 from config import config, ConfigStateHolder
+from constants import REPOSITORIES
 from exec.cmd import run_cmd
 from constants import Arch, MAKEPKG_CMD
 from distro.package import PackageInfo
@@ -47,8 +47,8 @@ def clone_pkbuilds(pkgbuilds_dir: str, repo_url: str, branch: str, interactive=F
 def init_pkgbuilds(interactive=False):
 pkgbuilds_dir = config.get_path('pkgbuilds')
-repo_url = config.file['pkgbuilds']['git_repo']
-branch = config.file['pkgbuilds']['git_branch']
+repo_url = config.file.pkgbuilds.git_repo
+branch = config.file.pkgbuilds.git_branch
 clone_pkbuilds(pkgbuilds_dir, repo_url, branch, interactive=interactive, update=False)
@@ -65,6 +65,7 @@ class Pkgbuild(PackageInfo):
 path: str
 pkgver: str
 pkgrel: str
+sources_refreshed: bool
 def __init__(
 self,
@@ -74,6 +75,7 @@ class Pkgbuild(PackageInfo):
 provides: list[str] = [],
 replaces: list[str] = [],
 repo: Optional[str] = None,
+sources_refreshed: bool = False,
 ) -> None:
 """
 Create new Pkgbuild representation for file located at `{relative_path}/PKGBUILD`.
@@ -91,9 +93,15 @@ class Pkgbuild(PackageInfo):
 self.path = relative_path
 self.pkgver = ''
 self.pkgrel = ''
+self.sources_refreshed = sources_refreshed
 def __repr__(self):
-return f'Pkgbuild({self.name},{repr(self.path)},{self.version},{self.mode})'
+return ','.join([
+'Pkgbuild(' + self.name,
+repr(self.path),
+self.version + ("🔄" if self.sources_refreshed else ""),
+self.mode + ')',
+])
 def names(self):
 return list(set([self.name] + self.provides + self.replaces))
@@ -102,14 +110,69 @@ class Pkgbuild(PackageInfo):
 """updates `self.version` from `self.pkgver` and `self.pkgrel`"""
 self.version = f'{self.pkgver}-{self.pkgrel}'
+def update(self, pkg: Pkgbuild):
+self.version = pkg.version
+self.arches = list(pkg.arches)
+self.depends = list(pkg.depends)
+self.provides = list(pkg.provides)
+self.replaces = list(pkg.replaces)
+self.local_depends = list(pkg.local_depends)
+self.repo = pkg.repo
+self.mode = pkg.mode
+self.path = pkg.path
+self.pkgver = pkg.pkgver
+self.pkgrel = pkg.pkgrel
+self.sources_refreshed = self.sources_refreshed or pkg.sources_refreshed
+self.update_version()
+def refresh_sources(self):
+raise NotImplementedError()
+def get_filename(self, arch: Arch):
+if not self.version:
+self.update_version()
+if self.arches[0] == 'any':
+arch = 'any'
+return f'{self.name}-{self.version}-{arch}.pkg.tar.zst'
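A usage sketch for the new `get_filename()`; the package values and the bare `Pkgbuild` construction here are hypothetical, used only to show the arch=(any) handling:

pkg = Pkgbuild('device/device-example', repo='device')
pkg.name, pkg.pkgver, pkg.pkgrel = 'device-example', '1', '3'
pkg.arches = ['any']
pkg.update_version()  # version becomes '1-3'
# arch=(any) packages resolve to an 'any' filename regardless of the arch asked for:
assert pkg.get_filename('aarch64') == 'device-example-1-3-any.pkg.tar.zst'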
 class Pkgbase(Pkgbuild):
-subpackages: Sequence[SubPkgbuild]
+subpackages: list[SubPkgbuild]
-def __init__(self, relative_path: str, subpackages: Sequence[SubPkgbuild] = [], **args):
+def __init__(self, relative_path: str, subpackages: list[SubPkgbuild] = [], **args):
 self.subpackages = list(subpackages)
 super().__init__(relative_path, **args)
+def update(self, pkg: Pkgbuild):
+if not isinstance(pkg, Pkgbase):
+raise Exception(f"Tried to update pkgbase {self.name} with non-base pkg {pkg}")
+Pkgbuild.update(self, pkg)
+sub_dict = {p.name: p for p in self.subpackages}
+self.subpackages.clear()
+for new_pkg in pkg.subpackages:
+name = new_pkg.name
+if name not in sub_dict:
+sub_dict[name] = new_pkg
+else:
+sub_dict[name].update(new_pkg)
+updated = sub_dict[name]
+updated.sources_refreshed = self.sources_refreshed
+self.subpackages.append(updated)
+def refresh_sources(self, lazy: bool = True):
+'''
+Reloads the pkgbuild from disk.
+Does **NOT** actually perform the makepkg action to refresh the pkgver() first!
+'''
+if lazy and self.sources_refreshed:
+return
+parsed = parse_pkgbuild(self.path, sources_refreshed=True)
+basepkgs = [p for p in parsed if isinstance(p, Pkgbase)]
+if not len(basepkgs) == 1:
+raise Exception(f"error refreshing {self.name}: wrong number of base packages found: {basepkgs}")
+self.sources_refreshed = True
+self.update(basepkgs[0])
 class SubPkgbuild(Pkgbuild):
 pkgbase: Pkgbase
@@ -119,28 +182,25 @@ class SubPkgbuild(Pkgbuild):
 self.name = name
 self.pkgbase = pkgbase
-self.version = pkgbase.version
-self.arches = pkgbase.arches
-self.depends = list(pkgbase.depends)
+self.sources_refreshed = False
+self.update(pkgbase)
 self.provides = []
 self.replaces = []
-self.local_depends = list(pkgbase.local_depends)
-self.repo = pkgbase.repo
-self.mode = pkgbase.mode
-self.path = pkgbase.path
-self.pkgver = pkgbase.pkgver
-self.pkgrel = pkgbase.pkgrel
-self.update_version()
+def refresh_sources(self, lazy: bool = True):
+assert self.pkgbase
+self.pkgbase.refresh_sources(lazy=lazy)
-def parse_pkgbuild(relative_pkg_dir: str, _config: Optional[ConfigStateHolder] = None) -> Sequence[Pkgbuild]:
+def parse_pkgbuild(relative_pkg_dir: str, _config: Optional[ConfigStateHolder] = None, sources_refreshed: bool = False) -> list[Pkgbuild]:
 """
 Since function may run in a different subprocess, we need to be passed the config via parameter
 """
 global config
 if _config:
 config = _config
-setup_logging(verbose=config.runtime['verbose'], log_setup=False) # different thread needs log setup.
+setup_logging(verbose=config.runtime.verbose, log_setup=False) # different thread needs log setup.
 logging.info(f"Parsing PKGBUILD for {relative_pkg_dir}")
 pkgbuilds_dir = config.get_path('pkgbuilds')
 pkgdir = os.path.join(pkgbuilds_dir, relative_pkg_dir)
@@ -156,7 +216,7 @@ def parse_pkgbuild(relative_pkg_dir: str, _config: Optional[ConfigStateHolder] =
 raise Exception((f'{relative_pkg_dir}/PKGBUILD has {"no" if mode is None else "an invalid"} mode configured') +
 (f': "{mode}"' if mode is not None else ''))
-base_package = Pkgbase(relative_pkg_dir)
+base_package = Pkgbase(relative_pkg_dir, sources_refreshed=sources_refreshed)
 base_package.mode = mode
 base_package.repo = relative_pkg_dir.split('/')[0]
 srcinfo = run_cmd(
@@ -197,7 +257,7 @@ def parse_pkgbuild(relative_pkg_dir: str, _config: Optional[ConfigStateHolder] =
 elif line.startswith('depends') or line.startswith('makedepends') or line.startswith('checkdepends') or line.startswith('optdepends'):
 current.depends.append(splits[1].split('=')[0].split(': ')[0])
-results: Sequence[Pkgbuild] = list(base_package.subpackages)
+results: list[Pkgbuild] = list(base_package.subpackages)
 if len(results) > 1:
 logging.debug(f" Split package detected: {base_package.name}: {results}")
 base_package.update_version()
@@ -214,9 +274,21 @@ def parse_pkgbuild(relative_pkg_dir: str, _config: Optional[ConfigStateHolder] =
 _pkgbuilds_cache = dict[str, Pkgbuild]()
+_pkgbuilds_paths = dict[str, list[Pkgbuild]]()
 _pkgbuilds_scanned: bool = False
+def get_pkgbuild_by_path(relative_path: str, lazy: bool = True, _config: Optional[ConfigStateHolder] = None) -> list[Pkgbuild]:
+global _pkgbuilds_cache, _pkgbuilds_paths
+if lazy and relative_path in _pkgbuilds_paths:
+return _pkgbuilds_paths[relative_path]
+parsed = parse_pkgbuild(relative_path, _config=_config)
+_pkgbuilds_paths[relative_path] = parsed
+for pkg in parsed:
+_pkgbuilds_cache[pkg.name] = pkg
+return parsed
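The path cache means repeated lookups are cheap and hand back the very same parsed objects; a usage sketch (the path is hypothetical):

first = get_pkgbuild_by_path('device/device-example')
again = get_pkgbuild_by_path('device/device-example')  # lazy=True: served from _pkgbuilds_paths
assert first is again  # same cached list, no re-parse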
 def discover_pkgbuilds(parallel: bool = True, lazy: bool = True) -> dict[str, Pkgbuild]:
 global _pkgbuilds_cache, _pkgbuilds_scanned
 if lazy and _pkgbuilds_scanned:
@@ -228,22 +300,38 @@ def discover_pkgbuilds(parallel: bool = True, lazy: bool = True) -> dict[str, Pk
 init_pkgbuilds(interactive=False)
 for repo in REPOSITORIES:
 for dir in os.listdir(os.path.join(pkgbuilds_dir, repo)):
-paths.append(os.path.join(repo, dir))
-results = []
+p = os.path.join(repo, dir)
+if not os.path.exists(os.path.join(pkgbuilds_dir, p, 'PKGBUILD')):
+logging.warning(f"{p} doesn't include a PKGBUILD file; skipping")
+continue
+paths.append(p)
 logging.info("Parsing PKGBUILDs")
-logging.debug(f"About to parse pkgbuilds. verbosity: {config.runtime['verbose']}")
+results = []
 if parallel:
-chunks = (Parallel(n_jobs=multiprocessing.cpu_count() * 4)(delayed(parse_pkgbuild)(path, config) for path in paths))
-else:
-chunks = (parse_pkgbuild(path) for path in paths)
+paths_filtered = paths
+if lazy:
+# filter out cached packages as the caches don't cross process boundaries
+paths_filtered = []
+for p in paths:
+if p in _pkgbuilds_paths:
+# use cache
+results += _pkgbuilds_paths[p]
+else:
+paths_filtered += [p]
+chunks = (Parallel(n_jobs=multiprocessing.cpu_count() * 4)(
+delayed(get_pkgbuild_by_path)(path, lazy=lazy, _config=config) for path in paths_filtered))
+else:
+chunks = (get_pkgbuild_by_path(path, lazy=lazy) for path in paths)
+_pkgbuilds_paths.clear()
+# one list of packages per path
 for pkglist in chunks:
+_pkgbuilds_paths[pkglist[0].path] = pkglist
 results += pkglist
-logging.debug('Building package dictionary!')
+logging.info('Building package dictionary')
 for package in results:
 for name in [package.name] + package.replaces:
 if name in packages:
@@ -255,11 +343,11 @@ def discover_pkgbuilds(parallel: bool = True, lazy: bool = True) -> dict[str, Pk
 package.local_depends = package.depends.copy()
 for dep in package.depends.copy():
 found = dep in packages
-for p in packages.values():
+for pkg in packages.values():
 if found:
 break
-if dep in p.names():
-logging.debug(f'Found {p.name} that provides {dep}')
+if dep in pkg.names():
+logging.debug(f'Found {pkg.name} that provides {dep}')
 found = True
 break
 if not found:
@@ -270,3 +358,41 @@ def discover_pkgbuilds(parallel: bool = True, lazy: bool = True) -> dict[str, Pk
 _pkgbuilds_cache.update(packages)
 _pkgbuilds_scanned = True
 return packages
+def filter_pkgbuilds(
+paths: Iterable[str],
+repo: Optional[dict[str, Pkgbuild]] = None,
+arch: Optional[Arch] = None,
+allow_empty_results=True,
+use_paths=True,
+use_names=True,
+) -> Iterable[Pkgbuild]:
+if not (use_names or use_paths):
+raise Exception('Error: filter_packages instructed to match neither by names nor paths; impossible!')
+if not allow_empty_results and not paths:
+raise Exception("Can't search for packages: no query given")
+repo = repo or discover_pkgbuilds()
+if 'all' in paths:
+all_pkgs = list(repo.values())
+if arch:
+all_pkgs = [pkg for pkg in all_pkgs if set([arch, 'any']).intersection(pkg.arches)]
+return all_pkgs
+result = []
+for pkg in repo.values():
+comparison = set()
+if use_paths:
+comparison.add(pkg.path)
+if use_names:
+comparison.add(pkg.name)
+matches = list(comparison.intersection(paths))
+if matches:
+assert pkg.arches
+if arch and not set([arch, 'any']).intersection(pkg.arches):
+logging.warn(f"Pkg {pkg.name} matches query {matches[0]} but isn't available for architecture {arch}: {pkg.arches}")
+continue
+result += [pkg]
+if not allow_empty_results and not result:
+raise Exception('No packages matched by paths: ' + ', '.join([f'"{p}"' for p in paths]))
+return result
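A usage sketch for the relocated filter (the query strings are hypothetical):

repo = discover_pkgbuilds()
# match by path or name; raise instead of returning an empty result:
pkgs = filter_pkgbuilds(['cross/crossdirect'], arch='aarch64', repo=repo, allow_empty_results=False)
# 'all' short-circuits to every PKGBUILD, reduced to arch-compatible ones:
everything = filter_pkgbuilds(['all'], arch='aarch64', repo=repo)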


@@ -42,7 +42,7 @@ ONEPLUS_ENCHILADA_PKG = f'device-{ONEPLUS_ENCHILADA}'
 def enchilada_pkgbuild(initialise_pkgbuilds_dir: ConfigStateHolder):
 config = initialise_pkgbuilds_dir
 config.try_load_file()
-return parse_pkgbuild(os.path.join('device', ONEPLUS_ENCHILADA_PKG), config)[0]
+return parse_pkgbuild(os.path.join('device', ONEPLUS_ENCHILADA_PKG), _config=config)[0]
 def validate_oneplus_enchilada(d: Device):

requirements.txt

@@ -5,3 +5,4 @@ toml
 typing_extensions
 coloredlogs
 munch
+setuptools # required by munch

ssh.py

@@ -32,7 +32,7 @@ def run_ssh_command(cmd: list[str] = [],
 extra_args = []
 if len(keys) > 0:
 extra_args += ['-i', keys[0]]
-if config.runtime['verbose']:
+if config.runtime.verbose:
 extra_args += ['-v']
 if alloc_tty:
 extra_args += ['-t']


@@ -1,2 +1,2 @@
 #!/bin/bash
-git ls-files \*.py | sort -u | xargs mypy --pretty --install-types --ignore-missing-imports "$@"
+git ls-files \*.py | sort -u | xargs mypy --pretty --show-error-codes --install-types --ignore-missing-imports "$@"

wrapper/__init__.py

@@ -15,7 +15,7 @@ wrapper_impls: dict[str, Wrapper] = {
 def get_wrapper_type(wrapper_type: str = None):
-return wrapper_type or config.file['wrapper']['type']
+return wrapper_type or config.file.wrapper.type
 def get_wrapper_impl(wrapper_type: str = None) -> Wrapper:
@@ -34,7 +34,7 @@ def is_wrapped(wrapper_type: str = None):
 def enforce_wrap(no_wrapper=False):
 wrapper_type = get_wrapper_type()
-if wrapper_type != 'none' and not is_wrapped(wrapper_type) and not config.runtime['no_wrap'] and not no_wrapper:
+if wrapper_type != 'none' and not is_wrapped(wrapper_type) and not config.runtime.no_wrap and not no_wrapper:
 logging.info(f'Wrapping in {wrapper_type}')
 wrap()
@@ -51,10 +51,10 @@ def wrap_if_foreign_arch(arch: Arch):
 nowrapper_option = click.option(
-'-W',
+'-w/-W',
-'--no-wrapper',
+'--force-wrapper/--no-wrapper',
-'no_wrapper',
+'wrapper_override',
 is_flag=True,
-default=False,
+default=None,
-help='Disable the docker wrapper. Defaults to autodetection.',
+help='Force or disable the docker wrapper. Defaults to autodetection.',
 )
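In practice this means `kupferbootstrap -w ...` now forces wrapping up front via `enforce_wrap()`, `kupferbootstrap -W ...` sets `config.runtime.no_wrap` and never wraps, and omitting both flags leaves `wrapper_override` as `None` so autodetection decides.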

wrapper/docker.py

@@ -5,10 +5,11 @@ import subprocess
 import sys
 from config import config
-from constants import CHROOT_PATHS
-from .wrapper import BaseWrapper
-DOCKER_PATHS = CHROOT_PATHS.copy()
+from exec.file import makedir
+from .wrapper import BaseWrapper, WRAPPER_PATHS
+DOCKER_PATHS = WRAPPER_PATHS.copy()
 def docker_volumes_args(volume_mappings: dict[str, str]) -> list[str]:
@@ -22,7 +23,7 @@ class DockerWrapper(BaseWrapper):
 type: str = 'docker'
 def wrap(self):
-script_path = config.runtime['script_source_dir']
+script_path = config.runtime.script_source_dir
 with open(os.path.join(script_path, 'version.txt')) as version_file:
 version = version_file.read().replace('\n', '')
 tag = f'registry.gitlab.com/kupfer/kupferbootstrap:{version}'
@@ -34,7 +35,7 @@ class DockerWrapper(BaseWrapper):
 '.',
 '-t',
 tag,
-] + (['-q'] if not config.runtime['verbose'] else [])
+] + (['-q'] if not config.runtime.verbose else [])
 logging.debug('Running docker cmd: ' + ' '.join(cmd))
 result = subprocess.run(cmd, cwd=script_path, capture_output=True)
 if result.returncode != 0:
@@ -62,11 +63,17 @@ class DockerWrapper(BaseWrapper):
 wrapped_config = self.generate_wrapper_config()
+target_user = 'root' if config.runtime.uid == 0 else 'kupfer'
+target_home = '/root' if target_user == 'root' else f'/home/{target_user}'
 ssh_dir = os.path.join(pathlib.Path.home(), '.ssh')
 if not os.path.exists(ssh_dir):
 os.makedirs(ssh_dir, mode=0o700)
-volumes = self.get_bind_mounts_default(wrapped_config)
-volumes |= dict({config.get_path(vol_name): vol_dest for vol_name, vol_dest in DOCKER_PATHS.items()})
+volumes = self.get_bind_mounts_default(wrapped_config, ssh_dir=ssh_dir, target_home=target_home)
+for vol_name, vol_dest in DOCKER_PATHS.items():
+vol_src = config.get_path(vol_name)
+makedir(vol_src)
+volumes[vol_src] = vol_dest
 docker_cmd = [
 'docker',
 'run',
@@ -78,7 +85,9 @@ class DockerWrapper(BaseWrapper):
 '--privileged',
 ] + docker_volumes_args(volumes) + [tag]
-kupfer_cmd = ['kupferbootstrap', '--config', '/root/.config/kupfer/kupferbootstrap.toml'] + self.filter_args_wrapper(sys.argv[1:])
+kupfer_cmd = ['kupferbootstrap', '--config', volumes[wrapped_config]] + self.filter_args_wrapper(sys.argv[1:])
+if config.runtime.uid:
+kupfer_cmd = ['wrapper_su_helper', '--uid', str(config.runtime.uid), '--username', 'kupfer', '--'] + kupfer_cmd
 cmd = docker_cmd + kupfer_cmd
 logging.debug('Wrapping in docker:' + repr(cmd))

wrapper/wrapper.py

@@ -9,6 +9,11 @@ from config import config
 from config.state import dump_file as dump_config_file
 from constants import CHROOT_PATHS
+WRAPPER_PATHS = CHROOT_PATHS | {
+'ccache': '/ccache',
+'rust': '/rust',
+}
 class Wrapper(Protocol):
 """Wrappers wrap kupferbootstrap in some form of isolation from the host OS, i.e. docker or chroots"""
@@ -27,7 +32,7 @@ class Wrapper(Protocol):
 class BaseWrapper(Wrapper):
-id: str
+uuid: str
 identifier: str
 type: str
 wrapped_config_path: str
@@ -63,7 +68,7 @@ class BaseWrapper(Wrapper):
 def generate_wrapper_config(
 self,
 target_path: str = '/tmp/kupferbootstrap',
-paths: dict[str, str] = CHROOT_PATHS,
+paths: dict[str, str] = WRAPPER_PATHS,
 config_overrides: dict[str, dict] = {},
 ) -> str:
 wrapped_config = f'{target_path.rstrip("/")}/{self.identifier}_wrapped.toml'

wrapper_su_helper.py Executable file

@@ -0,0 +1,33 @@
+#!/bin/python3
+import click
+import pwd
+from logger import logging, setup_logging
+from exec.cmd import run_cmd, flatten_shell_script
+from exec.file import chown
+@click.command('kupferbootstrap_su')
+@click.option('--username', default='kupfer', help="The user's name. If --uid is provided, the user's uid will be changed to this in passwd")
+@click.option('--uid', default=1000, type=int, help='uid to change $username to and run as')
+@click.argument('cmd', type=str, nargs=-1)
+def kupferbootstrap_su(cmd: list[str], uid: int = 1000, username: str = 'kupfer'):
+"Changes `username`'s uid to `uid` and executes kupferbootstrap as that user"
+cmd = list(cmd)
+user = pwd.getpwnam(username)
+home = user.pw_dir
+if uid != user.pw_uid:
+run_cmd(['usermod', '-u', str(uid), username]).check_returncode() # type: ignore[union-attr]
+chown(home, username, recursive=False)
+logging.debug(f'wrapper_su_helper: running {cmd} as {repr(username)}')
+su_cmd = ['sudo', 'su', '-P', username, '-c', flatten_shell_script(cmd, wrap_in_shell_quote=True, shell_quote_items=True)]
+result = run_cmd(su_cmd, attach_tty=True)
+assert isinstance(result, int)
+exit(result)
+if __name__ == '__main__':
+setup_logging(True)
+kupferbootstrap_su(prog_name='kupferbootstrap_su_helper')
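For reference, the docker wrapper above invokes this helper as `wrapper_su_helper --uid <host uid> --username kupfer -- kupferbootstrap ...`, so the container-side `kupfer` user takes on the calling user's uid before kupferbootstrap starts, keeping files written to the bind mounts owned by the host user.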