Compare commits

...
Sign in to create a new pull request.

44 commits

Author SHA1 Message Date
InsanePrawn
0e103f5a40 add_package_to_repo: create foreign arch repo dir before copying foreign-arch packages 2022-09-16 19:45:25 +02:00
InsanePrawn
f9cf76e937 packages: check_package_built(): makedir() the other arch's repo dir before trying to copy our any-arch package there 2022-09-16 19:45:25 +02:00
InsanePrawn
4c3e264de3 wrapper/docker: create volume dirs ourselfes for better permissions and podman compat 2022-09-16 19:45:25 +02:00
InsanePrawn
ccec875a0c wrapper/docker: fix indentation (only version needs to be pulled from with open():) 2022-09-16 19:45:25 +02:00
InsanePrawn
156612bf73 dataclass.resolve_type_hint(): add conversion from str to [int,float] if str not in types 2022-09-16 19:45:25 +02:00
InsanePrawn
4833753975 config/scheme: move DataClass to dataclass.py 2022-09-16 19:45:25 +02:00
InsanePrawn
b86b7c94f0 config: DataClass.transform(): add allow_extra=False parameter 2022-09-16 19:45:25 +02:00
InsanePrawn
e00160f6df packages: move filter_packages() to pkgbuild, rename to filter_pkgbuilds() 2022-09-16 19:45:25 +02:00
InsanePrawn
d89ad54fc5 constants.py: remove DEVICES array, now comes from pkgbuilds.git 2022-09-16 19:45:25 +02:00
InsanePrawn
cc2e24285f image.py: use Device instead of the device name from config 2022-09-16 19:45:25 +02:00
InsanePrawn
86b4b30685 exec: makedir() accept Union[str, int] for user and group 2022-08-29 20:20:16 +02:00
InsanePrawn
6c26260001 chroot: add chroot.get_uid(user: str), use in chroot.mount_{ccache,rust} to apply correct ownership 2022-08-29 20:20:16 +02:00
InsanePrawn
ba58aa1a29 wrapper: add WRAPPER_PATHS to point ccache and rust to predictable locations 2022-08-29 20:20:16 +02:00
InsanePrawn
0378f7fdf6 requirements.txt: add setuptools required by munch 2022-08-29 19:05:02 +02:00
InsanePrawn
58fd212560 Dockerfile: clean up pkgconfig-aarch64 leftover 2022-08-29 04:54:05 +02:00
InsanePrawn
2ef5f27c6c config: introduce rust cache 2022-08-29 04:44:16 +02:00
InsanePrawn
4285cf734c config: introduce per-arch persisted ccache dir 2022-08-29 04:44:16 +02:00
InsanePrawn
fd1f759429 chroot: add chroot.mount_chroots() to mount /chroot and use in cmd_chroot() 2022-08-29 04:44:16 +02:00
InsanePrawn
8274a31068 pkgbuild.discover_pkgbuilds(): warn and skip directories that don't contain a PKGBUILD 2022-08-29 04:44:16 +02:00
InsanePrawn
3c2e6fe2d0 packages and image: wrap more upfront on missing binaries 2022-08-28 17:21:16 +02:00
InsanePrawn
a76ad5ac4b packages.filter_packages(): only filter by arch if arch is not None 2022-08-28 17:20:35 +02:00
InsanePrawn
9f1281f1cb wrapper_su_helper.py: use su -P to allocate a pseudo-TTY 2022-08-28 07:38:52 +02:00
InsanePrawn
57be536781 packages.cmd_sideload(): fix escape of --overwrite=* 2022-08-28 07:32:01 +02:00
InsanePrawn
b5214d9cd6 packages: respect package arches before and during building 2022-08-28 07:32:01 +02:00
InsanePrawn
39b98d30ae chroot.create_user(): add primary_group parameter 2022-08-28 07:32:01 +02:00
InsanePrawn
7b05fa4fdb packages.check_package_version_built(): use Pkgbuild.get_filename() instead of running makepkg --packagelist 2022-08-28 07:32:01 +02:00
InsanePrawn
2f98ffc79d pkgbuild: add get_filename(arch) 2022-08-28 05:49:21 +02:00
InsanePrawn
ea88397f1f packages.filter_packages(): optionally check package arch 2022-08-28 05:49:18 +02:00
InsanePrawn
d9a88e1474 packages: use user 'kupfer' in chroots for building 2022-08-28 05:49:18 +02:00
InsanePrawn
dcccc9bdc8 chroot: add chroot.add_sudo_config() 2022-08-28 05:49:18 +02:00
InsanePrawn
fc92298100 chroot.create_user(): add optional uid and non_unique parameter 2022-08-28 02:22:54 +02:00
InsanePrawn
20975feec6 chroot.run_cmd(): add switch_user parameter 2022-08-28 02:22:54 +02:00
InsanePrawn
4dc134c8f8 exec/cmd: generate_cmd_{su,elevated}: tolerate flat string as input for cmd instead of list 2022-08-28 02:22:54 +02:00
InsanePrawn
ac7d16e4a7 exec.file.write_file(): fix situation where file exists but stat fails due to permissions 2022-08-28 02:22:54 +02:00
InsanePrawn
bef0efc637 global: refactor to use config.{file,runtime}.$member instead of config.file["$member"] 2022-08-27 17:06:48 +02:00
InsanePrawn
13ad63446e DockerWrapper.wrap(): run as config.runtime.uid instead of root 2022-08-27 06:03:36 +02:00
InsanePrawn
6b64989a3b config: add config.runtime.uid 2022-08-27 05:56:45 +02:00
InsanePrawn
4c77a16bba main: add -w to *enforce* wrapping 2022-08-27 05:56:45 +02:00
InsanePrawn
57d5ed474f typecheck.sh: show error codes 2022-08-27 03:46:07 +02:00
InsanePrawn
114755888e packages: circumvent git dubious ownership errors in pkgbuilds.git due to chrootery 2022-08-27 03:45:19 +02:00
InsanePrawn
b154f835e6 constants: add QEMU_ARCHES 2022-08-26 22:55:08 +02:00
InsanePrawn
bc31f9822a constants.py: add armv7h support 2022-08-26 22:55:08 +02:00
InsanePrawn
08fc10bf11 Pkgbuild: add refresh_sources() 2022-08-26 03:41:59 +02:00
InsanePrawn
6e8fd9f622 packages/pkgbuild: cache parsed pkgbuilds by path, add get_pkgbuild_by_path(), Pkgbuild.update(pkgb) 2022-08-26 03:30:32 +02:00
32 changed files with 710 additions and 419 deletions

View file

@ -2,22 +2,16 @@ FROM archlinux:base-devel
RUN pacman-key --init && \
pacman -Sy --noconfirm archlinux-keyring && \
pacman -Su --noconfirm \
pacman -Su --noconfirm --needed \
python python-pip \
arch-install-scripts rsync \
aarch64-linux-gnu-gcc aarch64-linux-gnu-binutils aarch64-linux-gnu-glibc aarch64-linux-gnu-linux-api-headers \
git \
git sudo \
android-tools openssh inetutils \
parted
RUN sed -i "s/EUID == 0/EUID == -1/g" $(which makepkg)
RUN cd /tmp && \
git clone https://aur.archlinux.org/aarch64-linux-gnu-pkg-config.git && \
cd aarch64-linux-gnu-pkg-config && \
makepkg -s --skippgpcheck && \
pacman -U --noconfirm *.pkg*
RUN yes | pacman -Scc
RUN sed -i "s/SigLevel.*/SigLevel = Never/g" /etc/pacman.conf
@ -32,5 +26,7 @@ RUN pip install -r requirements.txt
COPY . .
RUN python -c "from distro import distro; distro.get_kupfer_local(arch=None,in_chroot=False).repos_config_snippet()" | tee -a /etc/pacman.conf
RUN useradd -m -g users kupfer
RUN echo "kupfer ALL=(ALL) NOPASSWD: ALL" | tee /etc/sudoers.d/kupfer
WORKDIR /

View file

@ -3,6 +3,7 @@
import os
import logging
from constants import Arch, QEMU_ARCHES
from exec.cmd import run_root_cmd
from utils import mount
@ -38,11 +39,15 @@ def binfmt_info():
return full
def is_registered(arch: str) -> bool:
return os.path.exists("/proc/sys/fs/binfmt_misc/qemu-" + arch)
def is_registered(arch: Arch) -> bool:
qemu_arch = QEMU_ARCHES[arch]
return os.path.exists("/proc/sys/fs/binfmt_misc/qemu-" + qemu_arch)
def register(arch):
def register(arch: Arch):
if arch not in QEMU_ARCHES:
raise Exception(f'binfmt.register(): unknown arch {arch} (not in QEMU_ARCHES)')
qemu_arch = QEMU_ARCHES[arch]
if is_registered(arch):
return
@ -51,7 +56,7 @@ def register(arch):
# Build registration string
# https://en.wikipedia.org/wiki/Binfmt_misc
# :name:type:offset:magic:mask:interpreter:flags
info = lines[arch]
info = lines[qemu_arch]
code = info['line']
binfmt = '/proc/sys/fs/binfmt_misc'
register = binfmt + '/register'
@ -70,7 +75,10 @@ def register(arch):
def unregister(arch):
binfmt_file = "/proc/sys/fs/binfmt_misc/qemu-" + arch
if arch not in QEMU_ARCHES:
raise Exception(f'binfmt.unregister(): unknown arch {arch} (not in QEMU_ARCHES)')
qemu_arch = QEMU_ARCHES[arch]
binfmt_file = "/proc/sys/fs/binfmt_misc/qemu-" + qemu_arch
if not os.path.exists(binfmt_file):
return
logging.info(f"Unregistering qemu binfmt ({arch})")

View file

@ -6,7 +6,8 @@ from config import config
from constants import BOOT_STRATEGIES, FLASH_PARTS, FASTBOOT, JUMPDRIVE, JUMPDRIVE_VERSION
from exec.file import makedir
from fastboot import fastboot_boot, fastboot_erase_dtbo
from image import get_device_and_flavour, losetup_rootfs_image, get_image_path, dump_aboot, dump_lk2nd
from image import get_flavour, get_device_name, losetup_rootfs_image, get_image_path, dump_aboot, dump_lk2nd
from packages.device import get_profile_device
from wrapper import enforce_wrap
LK2ND = FLASH_PARTS['LK2ND']
@ -20,7 +21,8 @@ TYPES = [LK2ND, JUMPDRIVE, ABOOT]
def cmd_boot(type):
"""Boot JumpDrive or the Kupfer aboot image. Erases Android DTBO in the process."""
enforce_wrap()
device, flavour = get_device_and_flavour()
device = get_profile_device()
flavour = get_flavour()
# TODO: parse arch and sector size
sector_size = 4096
image_path = get_image_path(device, flavour)
@ -28,7 +30,7 @@ def cmd_boot(type):
if strategy == FASTBOOT:
if type == JUMPDRIVE:
file = f'boot-{device}.img'
file = f'boot-{get_device_name(device)}.img'
path = os.path.join(config.get_path('jumpdrive'), file)
makedir(os.path.dirname(path))
if not os.path.exists(path):

View file

@ -50,11 +50,14 @@ def cmd_chroot(type: str = 'build', arch: str = None, enable_crossdirect=True):
build_chroot.initialize()
build_chroot.initialized = True
build_chroot.mount_pkgbuilds()
if config.file['build']['crossdirect'] and enable_crossdirect:
build_chroot.mount_chroots()
assert arch and config.runtime.arch
if config.file.build.crossdirect and enable_crossdirect and arch != config.runtime.arch:
build_chroot.mount_crossdirect()
else:
raise Exception('Really weird bug')
chroot.mount_packages()
chroot.activate()
logging.debug(f'Starting shell in {chroot.name}:')
chroot.run_cmd('bash', attach_tty=True)

View file

@ -10,7 +10,7 @@ from uuid import uuid4
from config import config
from constants import Arch, CHROOT_PATHS, GCC_HOSTSPECS
from distro.distro import get_base_distro, get_kupfer_local, RepoInfo
from exec.cmd import run_root_cmd, generate_env_cmd, flatten_shell_script, wrap_in_bash
from exec.cmd import run_root_cmd, generate_env_cmd, flatten_shell_script, wrap_in_bash, generate_cmd_su
from exec.file import makedir, root_makedir, root_write_file, write_file
from generator import generate_makepkg_conf
from utils import mount, umount, check_findmnt, log_or_exception
@ -223,12 +223,14 @@ class Chroot(AbstractChroot):
cwd: Optional[str] = None,
fail_inactive: bool = True,
stdout: Optional[int] = None,
switch_user: Optional[str] = None,
) -> Union[int, subprocess.CompletedProcess]:
if not self.active and fail_inactive:
raise Exception(f'Chroot {self.name} is inactive, not running command! Hint: pass `fail_inactive=False`')
if outer_env is None:
outer_env = {}
native = config.runtime['arch']
native = config.runtime.arch
assert native
if self.arch != native and 'QEMU_LD_PREFIX' not in outer_env:
outer_env = dict(outer_env) # copy dict for modification
outer_env |= {'QEMU_LD_PREFIX': f'/usr/{GCC_HOSTSPECS[native][self.arch]}'}
@ -238,7 +240,11 @@ class Chroot(AbstractChroot):
script = flatten_shell_script(script, shell_quote_items=False, wrap_in_shell_quote=False)
if cwd:
script = f"cd {shell_quote(cwd)} && ( {script} )"
cmd = flatten_shell_script(['chroot', self.path] + env_cmd + wrap_in_bash(script, flatten_result=False), shell_quote_items=True)
if switch_user:
inner_cmd = generate_cmd_su(script, switch_user=switch_user, elevation_method='none', force_su=True)
else:
inner_cmd = wrap_in_bash(script, flatten_result=False)
cmd = flatten_shell_script(['chroot', self.path] + env_cmd + inner_cmd, shell_quote_items=True)
return run_root_cmd(cmd, env=outer_env, attach_tty=attach_tty, capture_output=capture_output, stdout=stdout)
@ -267,6 +273,13 @@ class Chroot(AbstractChroot):
fail_if_mounted=fail_if_mounted,
)
def mount_chroots(self, fail_if_mounted: bool = False) -> str:
return self.mount(
absolute_source=config.get_path('chroots'),
relative_destination=CHROOT_PATHS['chroots'].lstrip('/'),
fail_if_mounted=fail_if_mounted,
)
def write_makepkg_conf(self, target_arch: Arch, cross_chroot_relative: Optional[str], cross: bool = True) -> str:
"""
Generate a `makepkg.conf` or `makepkg_cross_$arch.conf` file in /etc.
@ -285,7 +298,7 @@ class Chroot(AbstractChroot):
user = None
group = None
if check_space is None:
check_space = config.file['pacman']['check_space']
check_space = config.file.pacman.check_space
if not absolute_path:
path = self.get_path('/etc')
root_makedir(path)
@ -305,26 +318,53 @@ class Chroot(AbstractChroot):
def create_user(
self,
user='kupfer',
password='123456',
groups=['network', 'video', 'audio', 'optical', 'storage', 'input', 'scanner', 'games', 'lp', 'rfkill', 'wheel'],
user: str = 'kupfer',
password: Optional[str] = None,
groups: list[str] = ['network', 'video', 'audio', 'optical', 'storage', 'input', 'scanner', 'games', 'lp', 'rfkill', 'wheel'],
primary_group: Optional[str] = 'users',
uid: Optional[int] = None,
non_unique: bool = False,
):
user = user or 'kupfer'
uid_param = f'-u {uid}' if uid is not None else ''
unique_param = '--non-unique' if non_unique else ''
pgroup_param = f'-g {primary_group}' if primary_group else ''
install_script = f'''
set -e
if ! id -u "{user}" >/dev/null 2>&1; then
useradd -m {user}
useradd -m {unique_param} {uid_param} {pgroup_param} {user}
fi
usermod -a -G {",".join(groups)} {user}
chown {user}:{user} /home/{user} -R
usermod -a -G {",".join(groups)} {unique_param} {uid_param} {pgroup_param} {user}
chown {user}:{primary_group if primary_group else user} /home/{user} -R
'''
if password:
install_script += f'echo "{user}:{password}" | chpasswd'
else:
install_script += f'echo "Set user password:" && passwd {user}'
result = self.run_cmd(install_script)
assert isinstance(result, subprocess.CompletedProcess)
if result.returncode != 0:
raise Exception('Failed to setup user')
raise Exception(f'Failed to setup user {user} in self.name')
def get_uid(self, user: Union[str, int]) -> int:
if isinstance(user, int):
return user
if user == 'root':
return 0
res = self.run_cmd(['id', '-u', user], capture_output=True)
assert isinstance(res, subprocess.CompletedProcess)
if res.returncode or not res.stdout:
raise Exception(f"chroot {self.name}: Couldnt detect uid for user {user}: {repr(res.stdout)}")
uid = res.stdout.decode()
return int(uid)
def add_sudo_config(self, config_name: str = 'wheel', privilegee: str = '%wheel', password_required: bool = True):
if '.' in config_name:
raise Exception(f"won't create sudoers.d file {config_name} since it will be ignored by sudo because it contains a dot!")
comment = ('# allow ' + (f'members of group {privilegee.strip("%")}' if privilegee.startswith('%') else f'user {privilegee}') +
'to run any program as root' + ('' if password_required else ' without a password'))
line = privilegee + (' ALL=(ALL:ALL) ALL' if password_required else ' ALL=(ALL) NOPASSWD: ALL')
root_write_file(self.get_path(f'/etc/sudoers.d/{config_name}'), f'{comment}\n{line}')
def try_install_packages(
self,

View file

@ -69,7 +69,8 @@ class BuildChroot(Chroot):
"""
target_arch = self.arch
if not native_chroot:
native_chroot = get_build_chroot(config.runtime['arch'])
assert config.runtime.arch
native_chroot = get_build_chroot(config.runtime.arch)
host_arch = native_chroot.arch
hostspec = GCC_HOSTSPECS[host_arch][target_arch]
cc = f'{hostspec}-cc'
@ -131,6 +132,32 @@ class BuildChroot(Chroot):
fail_if_mounted=fail_if_mounted,
)
def mount_ccache(self, user: str = 'kupfer', fail_if_mounted: bool = False):
mount_source = os.path.join(config.file.paths.ccache, self.arch)
mount_dest = os.path.join(f'/home/{user}' if user != 'root' else '/root', '.ccache')
uid = self.get_uid(user)
makedir(mount_source, user=uid)
return self.mount(
absolute_source=mount_source,
relative_destination=mount_dest,
fail_if_mounted=fail_if_mounted,
)
def mount_rust(self, user: str = 'kupfer', fail_if_mounted: bool = False) -> list[str]:
results = []
uid = self.get_uid(user)
mount_source_base = config.file.paths.rust # apparently arch-agnostic
for rust_dir in ['cargo', 'rustup']:
mount_source = os.path.join(mount_source_base, rust_dir)
mount_dest = os.path.join(f'/home/{user}' if user != 'root' else '/root', f'.{rust_dir}')
makedir(mount_source, user=uid)
results.append(self.mount(
absolute_source=mount_source,
relative_destination=mount_dest,
fail_if_mounted=fail_if_mounted,
))
return results
def get_build_chroot(arch: Arch, add_kupfer_repos: bool = True, **kwargs) -> BuildChroot:
name = build_chroot_name(arch)

View file

@ -66,8 +66,8 @@ def prompt_profile(name: str, create: bool = True, defaults: Union[Profile, dict
profile: Any = PROFILE_EMPTY | defaults
# don't use get_profile() here because we need the sparse profile
if name in config.file['profiles']:
profile |= config.file['profiles'][name]
if name in config.file.profiles:
profile |= config.file.profiles[name]
elif create:
logging.info(f"Profile {name} doesn't exist yet, creating new profile.")
else:
@ -113,7 +113,7 @@ def prompt_for_save(retry_ctx: Optional[click.Context] = None):
If `retry_ctx` is passed, the context's command will be reexecuted with the same arguments if the user chooses to retry.
False will still be returned as the retry is expected to either save, perform another retry or arbort.
"""
if click.confirm(f'Do you want to save your changes to {config.runtime["config_file"]}?', default=True):
if click.confirm(f'Do you want to save your changes to {config.runtime.config_file}?', default=True):
return True
if retry_ctx:
if click.confirm('Retry? ("n" to quit without saving)', default=True):
@ -171,7 +171,7 @@ def cmd_config_init(ctx, sections: list[str] = CONFIG_SECTIONS, non_interactive:
config.update(results)
if 'profiles' in sections:
current_profile = 'default' if 'current' not in config.file['profiles'] else config.file['profiles']['current']
current_profile = 'default' if 'current' not in config.file.profiles else config.file.profiles.current
new_current, _ = prompt_config('profile.current', default=current_profile, field_type=str)
profile, changed = prompt_profile(new_current, create=True)
config.update_profile(new_current, profile)
@ -182,7 +182,7 @@ def cmd_config_init(ctx, sections: list[str] = CONFIG_SECTIONS, non_interactive:
if not noop:
config.write()
else:
logging.info(f'--noop passed, not writing to {config.runtime["config_file"]}!')
logging.info(f'--noop passed, not writing to {config.runtime.config_file}!')
@cmd_config.command(name='set')
@ -250,8 +250,8 @@ def cmd_profile():
def cmd_profile_init(ctx, name: str, non_interactive: bool = False, noop: bool = False):
"""Create or edit a profile"""
profile = deepcopy(PROFILE_EMPTY)
if name in config.file['profiles']:
profile |= config.file['profiles'][name]
if name in config.file.profiles:
profile |= config.file.profiles[name]
if not non_interactive:
profile, _changed = prompt_profile(name, create=True)
@ -262,4 +262,4 @@ def cmd_profile_init(ctx, name: str, non_interactive: bool = False, noop: bool =
return
config.write()
else:
logging.info(f'--noop passed, not writing to {config.runtime["config_file"]}!')
logging.info(f'--noop passed, not writing to {config.runtime.config_file}!')

View file

@ -1,81 +1,12 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional, Union, Mapping, Any, get_type_hints, get_origin, get_args, Iterable
from munch import Munch
from typing import Any, Optional, Mapping, Union
from dataclass import DataClass, munchclass
from constants import Arch
def munchclass(*args, init=False, **kwargs):
return dataclass(*args, init=init, slots=True, **kwargs)
def resolve_type_hint(hint: type):
origin = get_origin(hint)
args: Iterable[type] = get_args(hint)
if origin is Optional:
args = set(list(args) + [type(None)])
if origin in [Union, Optional]:
results = []
for arg in args:
results += resolve_type_hint(arg)
return results
return [origin or hint]
class DataClass(Munch):
def __init__(self, d: dict = {}, validate: bool = True, **kwargs):
self.update(d | kwargs, validate=validate)
@classmethod
def transform(cls, values: Mapping[str, Any], validate: bool = True) -> Any:
results = {}
values = dict(values)
for key in list(values.keys()):
value = values.pop(key)
type_hints = cls._type_hints
if key in type_hints:
_classes = tuple(resolve_type_hint(type_hints[key]))
if issubclass(_classes[0], dict):
assert isinstance(value, dict)
target_class = _classes[0]
if not issubclass(_classes[0], Munch):
target_class = DataClass
if not isinstance(value, target_class):
value = target_class.fromDict(value, validate=validate)
if validate:
if not isinstance(value, _classes):
raise Exception(f'key "{key}" has value of wrong type {_classes}: {value}')
elif validate:
raise Exception(f'Unknown key "{key}"')
else:
if isinstance(value, dict) and not isinstance(value, Munch):
value = Munch.fromDict(value)
results[key] = value
if values:
if validate:
raise Exception(f'values contained unknown keys: {list(values.keys())}')
results |= values
return results
@classmethod
def fromDict(cls, values: Mapping[str, Any], validate: bool = True):
return cls(**cls.transform(values, validate))
def update(self, d: Mapping[str, Any], validate: bool = True):
Munch.update(self, type(self).transform(d, validate))
def __init_subclass__(cls):
super().__init_subclass__()
cls._type_hints = get_type_hints(cls)
def __repr__(self):
return f'{type(self)}{dict.__repr__(self.toDict())}'
@munchclass()
class SparseProfile(DataClass):
parent: Optional[str]
@ -141,6 +72,8 @@ class PathsSection(DataClass):
pkgbuilds: str
jumpdrive: str
images: str
ccache: str
rust: str
class ProfilesSection(DataClass):
@ -148,12 +81,14 @@ class ProfilesSection(DataClass):
default: SparseProfile
@classmethod
def transform(cls, values: Mapping[str, Any], validate: bool = True):
def transform(cls, values: Mapping[str, Any], validate: bool = True, allow_extra: bool = True):
results = {}
for k, v in values.items():
if k == 'current':
results[k] = v
continue
if not allow_extra and k != 'default':
raise Exception(f'Unknown key {k} in profiles section (Hint: extra_keys not allowed for some reason)')
if not isinstance(v, dict):
raise Exception(f'profile {v} is not a dict!')
results[k] = SparseProfile.fromDict(v, validate=True)
@ -176,7 +111,13 @@ class Config(DataClass):
profiles: ProfilesSection
@classmethod
def fromDict(cls, values: Mapping[str, Any], validate: bool = True, allow_incomplete: bool = False):
def fromDict(
cls,
values: Mapping[str, Any],
validate: bool = True,
allow_extra: bool = False,
allow_incomplete: bool = False,
):
values = dict(values) # copy for later modification
_vals = {}
for name, _class in cls._type_hints.items():
@ -200,11 +141,12 @@ class Config(DataClass):
@munchclass()
class RuntimeConfiguration(DataClass):
verbose: bool
config_file: Optional[str]
arch: Optional[Arch]
no_wrap: bool
script_source_dir: str
error_shell: bool
config_file: Optional[str]
script_source_dir: Optional[str]
arch: Optional[Arch]
uid: Optional[int]
class ConfigLoadState(DataClass):

View file

@ -42,6 +42,8 @@ CONFIG_DEFAULTS_DICT = {
'pkgbuilds': os.path.join('%cache_dir%', 'pkgbuilds'),
'jumpdrive': os.path.join('%cache_dir%', 'jumpdrive'),
'images': os.path.join('%cache_dir%', 'images'),
'ccache': os.path.join('%cache_dir%', 'ccache'),
'rust': os.path.join('%cache_dir%', 'rust'),
},
'profiles': {
'current': 'default',
@ -53,11 +55,12 @@ CONFIG_SECTIONS = list(CONFIG_DEFAULTS.keys())
CONFIG_RUNTIME_DEFAULTS: RuntimeConfiguration = RuntimeConfiguration.fromDict({
'verbose': False,
'config_file': None,
'arch': None,
'no_wrap': False,
'script_source_dir': os.path.dirname(os.path.dirname(os.path.realpath(__file__))),
'error_shell': False,
'config_file': None,
'script_source_dir': None,
'arch': None,
'uid': None,
})
@ -194,14 +197,16 @@ class ConfigStateHolder:
self.file = Config.fromDict(merge_configs(conf_new=file_conf_base, conf_base=CONFIG_DEFAULTS))
self.file_state = ConfigLoadState()
self.runtime = RuntimeConfiguration.fromDict(CONFIG_RUNTIME_DEFAULTS | runtime_conf)
self.runtime['arch'] = os.uname().machine
self.runtime.arch = os.uname().machine
self.runtime.script_source_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
self.runtime.uid = os.getuid()
self._profile_cache = {}
if file_conf_path:
self.try_load_file(file_conf_path)
def try_load_file(self, config_file=None, base=CONFIG_DEFAULTS):
config_file = config_file or CONFIG_DEFAULT_PATH
self.runtime['config_file'] = config_file
self.runtime.config_file = config_file
self._profile_cache = None
try:
self.file = parse_file(config_file=config_file, base=base)
@ -224,8 +229,8 @@ class ConfigStateHolder:
raise ex
def get_profile(self, name: Optional[str] = None) -> Profile:
name = name or self.file['profiles']['current']
self._profile_cache = resolve_profile(name=name, sparse_profiles=self.file['profiles'], resolved=self._profile_cache)
name = name or self.file.profiles.current
self._profile_cache = resolve_profile(name=name, sparse_profiles=self.file.profiles, resolved=self._profile_cache)
return self._profile_cache[name]
def enforce_profile_device_set(self, profile_name: Optional[str] = None, hint_or_set_arch: bool = False) -> Profile:
@ -252,7 +257,7 @@ class ConfigStateHolder:
return profile
def get_path(self, path_name: str) -> str:
paths = self.file['paths']
paths = self.file.paths
return resolve_path_template(paths[path_name], paths)
def get_package_dir(self, arch: str):
@ -265,7 +270,8 @@ class ConfigStateHolder:
def write(self, path=None):
"""write toml representation of `self.file` to `path`"""
if path is None:
path = self.runtime['config_file']
path = self.runtime.config_file
assert path
os.makedirs(os.path.dirname(path), exist_ok=True)
dump_file(path, self.file)
logging.info(f'Created config file at {path}')
@ -279,18 +285,18 @@ class ConfigStateHolder:
merged = merge_configs(config_fragment, conf_base=self.file, warn_missing_defaultprofile=warn_missing_defaultprofile)
changed = self.file != merged
self.file.update(merged)
if changed and 'profiles' in config_fragment and self.file['profiles'] != config_fragment['profiles']:
if changed and 'profiles' in config_fragment and self.file.profiles != config_fragment['profiles']:
self.invalidate_profile_cache()
return changed
def update_profile(self, name: str, profile: Profile, merge: bool = False, create: bool = True, prune: bool = True):
new = {}
if name not in self.file['profiles']:
if name not in self.file.profiles:
if not create:
raise Exception(f'Unknown profile: {name}')
else:
if merge:
new = deepcopy(self.file['profiles'][name])
new = deepcopy(self.file.profiles[name])
logging.debug(f'new: {new}')
logging.debug(f'profile: {profile}')
@ -298,5 +304,5 @@ class ConfigStateHolder:
if prune:
new = {key: val for key, val in new.items() if val is not None}
self.file['profiles'][name] = new
self.file.profiles[name] = new
self.invalidate_profile_cache()

View file

@ -53,7 +53,7 @@ def validate_ConfigStateHolder(c: ConfigStateHolder, should_load: Optional[bool]
def test_fixture_configstate(conf_fixture: str, exists: bool, request):
configstate = request.getfixturevalue(conf_fixture)
assert 'config_file' in configstate.runtime
confpath = configstate.runtime['config_file']
confpath = configstate.runtime.config_file
assert isinstance(confpath, str)
assert confpath
assert exists == os.path.exists(confpath)
@ -124,12 +124,13 @@ def load_toml_file(path) -> dict:
def get_path_from_stateholder(c: ConfigStateHolder):
return c.runtime['config_file']
return c.runtime.config_file
def test_config_save_nonexistant(configstate_nonexistant: ConfigStateHolder):
c = configstate_nonexistant
confpath = c.runtime['config_file']
confpath = c.runtime.config_file
assert confpath
assert not os.path.exists(confpath)
c.write()
assert confpath

View file

@ -23,14 +23,6 @@ BOOT_STRATEGIES: dict[str, str] = {
'bq-paella': FASTBOOT,
}
DEVICES: dict[str, list[str]] = {
'oneplus-enchilada': ['device-sdm845-oneplus-enchilada'],
'oneplus-fajita': ['device-sdm845-oneplus-fajita'],
'xiaomi-beryllium-ebbg': ['device-sdm845-xiaomi-beryllium-ebbg'],
'xiaomi-beryllium-tianma': ['device-sdm845-xiaomi-beryllium-tianma'],
'bq-paella': ['device-msm8916-bq-paella'],
}
BASE_PACKAGES: list[str] = [
'base',
'base-kupfer',
@ -91,11 +83,20 @@ Arch: TypeAlias = str
ARCHES = [
'x86_64',
'aarch64',
'armv7h',
]
DistroArch: TypeAlias = Arch
TargetArch: TypeAlias = Arch
ALARM_REPOS = {
'core': 'http://mirror.archlinuxarm.org/$arch/$repo',
'extra': 'http://mirror.archlinuxarm.org/$arch/$repo',
'community': 'http://mirror.archlinuxarm.org/$arch/$repo',
'alarm': 'http://mirror.archlinuxarm.org/$arch/$repo',
'aur': 'http://mirror.archlinuxarm.org/$arch/$repo',
}
BASE_DISTROS: dict[DistroArch, dict[str, dict[str, str]]] = {
'x86_64': {
'repos': {
@ -105,42 +106,58 @@ BASE_DISTROS: dict[DistroArch, dict[str, dict[str, str]]] = {
},
},
'aarch64': {
'repos': {
'core': 'http://mirror.archlinuxarm.org/$arch/$repo',
'extra': 'http://mirror.archlinuxarm.org/$arch/$repo',
'community': 'http://mirror.archlinuxarm.org/$arch/$repo',
'alarm': 'http://mirror.archlinuxarm.org/$arch/$repo',
'aur': 'http://mirror.archlinuxarm.org/$arch/$repo',
},
'repos': ALARM_REPOS,
},
'armv7h': {
'repos': ALARM_REPOS,
},
}
COMPILE_ARCHES: dict[Arch, str] = {
'x86_64': 'amd64',
'aarch64': 'arm64',
'armv7h': 'arm',
}
GCC_HOSTSPECS: dict[DistroArch, dict[TargetArch, str]] = {
'x86_64': {
'x86_64': 'x86_64-pc-linux-gnu',
'aarch64': 'aarch64-linux-gnu',
'armv7h': 'arm-unknown-linux-gnueabihf'
},
'aarch64': {
'aarch64': 'aarch64-unknown-linux-gnu',
}
},
'armv7h': {
'armv7h': 'armv7l-unknown-linux-gnueabihf'
},
}
CFLAGS_GENERAL = ['-O2', '-pipe', '-fstack-protector-strong']
CFLAGS_ALARM = [
' -fno-plt',
'-fexceptions',
'-Wp,-D_FORTIFY_SOURCE=2',
'-Wformat',
'-Werror=format-security',
'-fstack-clash-protection',
]
CFLAGS_ARCHES: dict[Arch, list[str]] = {
'x86_64': ['-march=x86-64', '-mtune=generic'],
'aarch64': [
'-march=armv8-a',
'-fexceptions',
'-Wp,-D_FORTIFY_SOURCE=2',
'-Wformat',
'-Werror=format-security',
'-fstack-clash-protection',
]
] + CFLAGS_ALARM,
'armv7h': [
'-march=armv7-a',
'-mfloat-abi=hard',
'-mfpu=neon',
] + CFLAGS_ALARM,
}
QEMU_ARCHES: dict[Arch, str] = {
'x86_64': 'x86_64',
'aarch64': 'aarch64',
'armv7h': 'arm',
}
QEMU_BINFMT_PKGS = ['qemu-user-static-bin', 'binfmt-qemu-static']

92
dataclass.py Normal file
View file

@ -0,0 +1,92 @@
from dataclasses import dataclass
from munch import Munch
from typing import Optional, Union, Mapping, Any, get_type_hints, get_origin, get_args, Iterable
def munchclass(*args, init=False, **kwargs):
return dataclass(*args, init=init, slots=True, **kwargs)
def resolve_type_hint(hint: type) -> Iterable[type]:
origin = get_origin(hint)
args: Iterable[type] = get_args(hint)
if origin is Optional:
args = set(list(args) + [type(None)])
if origin in [Union, Optional]:
results: list[type] = []
for arg in args:
results += resolve_type_hint(arg)
return results
return [origin or hint]
class DataClass(Munch):
def __init__(self, d: dict = {}, validate: bool = True, **kwargs):
self.update(d | kwargs, validate=validate)
@classmethod
def transform(cls, values: Mapping[str, Any], validate: bool = True, allow_extra: bool = False) -> Any:
results = {}
values = dict(values)
for key in list(values.keys()):
value = values.pop(key)
type_hints = cls._type_hints
if key in type_hints:
_classes = tuple[type](resolve_type_hint(type_hints[key]))
if issubclass(_classes[0], dict):
assert isinstance(value, dict)
target_class = _classes[0]
if target_class is dict:
target_class = Munch
if not isinstance(value, target_class):
assert issubclass(target_class, Munch)
# despite the above assert, mypy doesn't seem to understand target_class is a Munch here
value = target_class.fromDict(value, validate=validate) # type:ignore[attr-defined]
# handle numerics
elif set(_classes).intersection([int, float]) and isinstance(value, str) and str not in _classes:
parsed_number = None
parsers: list[tuple[type, list]] = [(int, [10]), (int, [0]), (float, [])]
for _cls, args in parsers:
if _cls not in _classes:
continue
try:
parsed_number = _cls(value, *args)
break
except ValueError:
continue
if parsed_number is None:
if validate:
raise Exception(f"Couldn't parse string value {repr(value)} for key '{key}' into number formats: " +
(', '.join(list(c.__name__ for c in _classes))))
else:
value = parsed_number
if validate:
if not isinstance(value, _classes):
raise Exception(f'key "{key}" has value of wrong type {_classes}: {value}')
elif validate and not allow_extra:
raise Exception(f'Unknown key "{key}"')
else:
if isinstance(value, dict) and not isinstance(value, Munch):
value = Munch.fromDict(value)
results[key] = value
if values:
if validate:
raise Exception(f'values contained unknown keys: {list(values.keys())}')
results |= values
return results
@classmethod
def fromDict(cls, values: Mapping[str, Any], validate: bool = True):
return cls(**cls.transform(values, validate))
def update(self, d: Mapping[str, Any], validate: bool = True):
Munch.update(self, type(self).transform(d, validate))
def __init_subclass__(cls):
super().__init_subclass__()
cls._type_hints = get_type_hints(cls)
def __repr__(self):
return f'{type(self)}{dict.__repr__(self.toDict())}'

View file

@ -75,7 +75,7 @@ _kupfer_local_chroots = dict[Arch, Distro]()
def get_kupfer_https(arch: Arch, scan: bool = False) -> Distro:
global _kupfer_https
if arch not in _kupfer_https or not _kupfer_https[arch]:
_kupfer_https[arch] = get_kupfer(arch, KUPFER_HTTPS.replace('%branch%', config.file['pacman']['repo_branch']), scan)
_kupfer_https[arch] = get_kupfer(arch, KUPFER_HTTPS.replace('%branch%', config.file.pacman.repo_branch), scan)
item = _kupfer_https[arch]
if scan and not item.is_scanned():
item.scan()
@ -85,7 +85,8 @@ def get_kupfer_https(arch: Arch, scan: bool = False) -> Distro:
def get_kupfer_local(arch: Optional[Arch] = None, in_chroot: bool = True, scan: bool = False) -> Distro:
global _kupfer_local, _kupfer_local_chroots
cache = _kupfer_local_chroots if in_chroot else _kupfer_local
arch = arch or config.runtime['arch']
arch = arch or config.runtime.arch
assert arch
if arch not in cache or not cache[arch]:
dir = CHROOT_PATHS['packages'] if in_chroot else config.get_path('packages')
cache[arch] = get_kupfer(arch, f"file://{dir}/$arch/$repo")

View file

@ -14,6 +14,7 @@ ElevationMethod: TypeAlias = str
ELEVATION_METHOD_DEFAULT = "sudo"
ELEVATION_METHODS: dict[ElevationMethod, list[str]] = {
"none": [],
"sudo": ['sudo', '--'],
}
@ -47,15 +48,18 @@ def wrap_in_bash(cmd: Union[list[str], str], flatten_result=True) -> Union[str,
return res
def generate_cmd_elevated(cmd: list[str], elevation_method: ElevationMethod):
def generate_cmd_elevated(cmd: Union[list[str], str], elevation_method: ElevationMethod):
"wraps `cmd` in the necessary commands to escalate, e.g. `['sudo', '--', cmd]`."
if isinstance(cmd, str):
cmd = wrap_in_bash(cmd, flatten_result=False)
assert not isinstance(cmd, str) # typhints cmd as list[str]
if elevation_method not in ELEVATION_METHODS:
raise Exception(f"Unknown elevation method {elevation_method}")
return ELEVATION_METHODS[elevation_method] + cmd
def generate_cmd_su(
cmd: list[str],
cmd: Union[list[str], str],
switch_user: str,
elevation_method: Optional[ElevationMethod] = None,
force_su: bool = False,

View file

@ -82,8 +82,12 @@ def write_file(
fstat: os.stat_result
exists = root_check_exists(path)
dirname = os.path.dirname(path)
failed = False
if exists:
fstat = os.stat(path)
try:
fstat = os.stat(path)
except PermissionError:
failed = True
else:
chown_user = chown_user or get_user_name(os.getuid())
chown_group = chown_group or get_group_name(os.getgid())
@ -94,9 +98,10 @@ def write_file(
if mode:
if not mode.isnumeric():
raise Exception(f"Unknown file mode '{mode}' (must be numeric): {path}")
if not exists or stat.filemode(int(mode, 8)) != stat.filemode(fstat.st_mode):
if not exists or failed or stat.filemode(int(mode, 8)) != stat.filemode(fstat.st_mode):
chmod_mode = mode
failed = try_native_filewrite(path, content, chmod_mode)
if not failed:
failed = try_native_filewrite(path, content, chmod_mode) is not None
if exists or failed:
if failed:
try:
@ -139,7 +144,7 @@ def remove_file(path: str, recursive=False):
raise Exception(f"Unable to remove {path}: cmd returned {rc}")
def makedir(path, user: Optional[str] = None, group: Optional[str] = None, parents: bool = True):
def makedir(path, user: Optional[Union[str, int]] = None, group: Optional[Union[str, int]] = None, parents: bool = True):
if not root_check_exists(path):
try:
if parents:

View file

@ -6,7 +6,8 @@ from constants import FLASH_PARTS, LOCATIONS
from exec.cmd import run_root_cmd
from exec.file import get_temp_dir
from fastboot import fastboot_flash
from image import dd_image, partprobe, shrink_fs, losetup_rootfs_image, losetup_destroy, dump_aboot, dump_lk2nd, dump_qhypstub, get_device_and_flavour, get_image_name, get_image_path
from image import dd_image, partprobe, shrink_fs, losetup_rootfs_image, losetup_destroy, dump_aboot, dump_lk2nd, dump_qhypstub, get_flavour, get_image_name, get_image_path
from packages.device import get_profile_device
from wrapper import enforce_wrap
ABOOT = FLASH_PARTS['ABOOT']
@ -21,7 +22,8 @@ ROOTFS = FLASH_PARTS['ROOTFS']
def cmd_flash(what: str, location: str):
"""Flash a partition onto a device. `location` takes either a path to a block device or one of emmc, sdcard"""
enforce_wrap()
device, flavour = get_device_and_flavour()
device = get_profile_device()
flavour = get_flavour()
device_image_name = get_image_name(device, flavour)
device_image_path = get_image_path(device, flavour)

View file

@ -7,7 +7,8 @@ def generate_makepkg_conf(arch: Arch, cross: bool = False, chroot: str = None) -
Generate a makepkg.conf. For use with crosscompiling, specify `cross=True` and pass as `chroot`
the relative path inside the native chroot where the foreign chroot will be mounted.
"""
hostspec = GCC_HOSTSPECS[config.runtime['arch'] if cross else arch][arch]
assert config.runtime.arch
hostspec = GCC_HOSTSPECS[config.runtime.arch if cross else arch][arch]
cflags = CFLAGS_ARCHES[arch] + CFLAGS_GENERAL
if cross and not chroot:
raise Exception('Cross-compile makepkg conf requested but no chroot path given: "{chroot}"')
@ -233,7 +234,7 @@ Color
#NoProgressBar
{'' if check_space else '#'}CheckSpace
VerbosePkgLists
ParallelDownloads = {config.file['pacman']['parallel_downloads']}
ParallelDownloads = {config.file.pacman.parallel_downloads}
# By default, pacman accepts packages signed by keys that its local keyring
# trusts (see pacman-key and its man page), as well as unsigned packages.

View file

@ -7,18 +7,18 @@ import click
import logging
from signal import pause
from subprocess import CompletedProcess
from typing import Optional
from typing import Optional, Union
from chroot.device import DeviceChroot, get_device_chroot
from constants import Arch, BASE_PACKAGES, DEVICES, FLAVOURS
from constants import Arch, BASE_PACKAGES, FLAVOURS
from config import config, Profile
from distro.distro import get_base_distro, get_kupfer_https
from exec.cmd import run_root_cmd, generate_cmd_su
from exec.file import root_write_file, root_makedir, makedir
from packages import build_enable_qemu_binfmt, build_packages_by_paths
from packages.device import get_profile_device
from packages.device import Device, get_profile_device
from ssh import copy_ssh_keys
from wrapper import wrap_if_foreign_arch
from wrapper import check_programs_wrap, wrap_if_foreign_arch
# image files need to be slightly smaller than partitions to fit
IMG_FILE_ROOT_DEFAULT_SIZE = "1800M"
@ -131,23 +131,25 @@ def losetup_destroy(loop_device):
)
def get_device_and_flavour(profile_name: Optional[str] = None) -> tuple[str, str]:
def get_flavour(profile_name: Optional[str] = None) -> str:
config.enforce_config_loaded()
profile = config.get_profile(profile_name)
if not profile['device']:
raise Exception("Please set the device using 'kupferbootstrap config init ...'")
if not profile['flavour']:
raise Exception("Please set the flavour using 'kupferbootstrap config init ...'")
return (profile['device'], profile['flavour'])
return profile['flavour']
def get_image_name(device, flavour, img_type='full') -> str:
return f'{device}-{flavour}-{img_type}.img'
def get_device_name(device: Union[str, Device]) -> str:
return device.name if isinstance(device, Device) else device
def get_image_path(device, flavour, img_type='full') -> str:
def get_image_name(device: Union[str, Device], flavour, img_type='full') -> str:
return f'{get_device_name(device)}-{flavour}-{img_type}.img'
def get_image_path(device: Union[str, Device], flavour, img_type='full') -> str:
return os.path.join(config.get_path('images'), get_image_name(device, flavour, img_type))
@ -299,7 +301,7 @@ def create_boot_fs(device: str, blocksize: int):
def install_rootfs(
rootfs_device: str,
bootfs_device: str,
device: str,
device: Union[str, Device],
flavour: str,
arch: Arch,
packages: list[str],
@ -308,7 +310,7 @@ def install_rootfs(
):
user = profile['username'] or 'kupfer'
post_cmds = FLAVOURS[flavour].get('post_cmds', [])
chroot = get_device_chroot(device=device, flavour=flavour, arch=arch, packages=packages, use_local_repos=use_local_repos)
chroot = get_device_chroot(device=get_device_name(device), flavour=flavour, arch=arch, packages=packages, use_local_repos=use_local_repos)
mount_chroot(rootfs_device, bootfs_device, chroot)
@ -319,6 +321,7 @@ def install_rootfs(
user=user,
password=profile['password'],
)
chroot.add_sudo_config(config_name='wheel', privilegee='%wheel', password_required=True)
copy_ssh_keys(
chroot.path,
user=user,
@ -329,7 +332,6 @@ def install_rootfs(
extra_repos=get_kupfer_https(arch).repos,
in_chroot=True,
),
'etc/sudoers.d/wheel': "# allow members of group wheel to execute any command\n%wheel ALL=(ALL:ALL) ALL\n",
'etc/hostname': profile['hostname'],
}
for target, content in files.items():
@ -388,18 +390,19 @@ def cmd_build(profile_name: str = None,
Unless overriden, required packages will be built or preferably downloaded from HTTPS repos.
"""
arch = get_profile_device(profile_name).arch
wrap_if_foreign_arch(arch)
device = get_profile_device(profile_name)
arch = device.arch
check_programs_wrap(['makepkg', 'pacman', 'pacstrap'])
profile: Profile = config.get_profile(profile_name)
device, flavour = get_device_and_flavour(profile_name)
flavour = get_flavour(profile_name)
size_extra_mb: int = int(profile["size_extra_mb"])
sector_size = 4096
rootfs_size_mb = FLAVOURS[flavour].get('size', 2) * 1000
packages = BASE_PACKAGES + DEVICES[device] + FLAVOURS[flavour]['packages'] + profile['pkgs_include']
packages = BASE_PACKAGES + [device.package.name] + FLAVOURS[flavour]['packages'] + profile['pkgs_include']
if arch != config.runtime['arch']:
if arch != config.runtime.arch:
build_enable_qemu_binfmt(arch)
if local_repos and build_pkgs:
@ -459,9 +462,10 @@ def cmd_build(profile_name: str = None,
@click.argument('profile', required=False)
def cmd_inspect(profile: str = None, shell: bool = False):
"""Open a shell in a device image"""
arch = get_profile_device(profile).arch
device = get_profile_device(profile)
arch = device.arch
wrap_if_foreign_arch(arch)
device, flavour = get_device_and_flavour(profile)
flavour = get_flavour(profile)
# TODO: PARSE DEVICE SECTOR SIZE
sector_size = 4096
chroot = get_device_chroot(device, flavour, arch)
@ -475,7 +479,7 @@ def cmd_inspect(profile: str = None, shell: bool = False):
if shell:
chroot.initialized = True
chroot.activate()
if arch != config.runtime['arch']:
if arch != config.runtime.arch:
logging.info('Installing requisites for foreign-arch shell')
build_enable_qemu_binfmt(arch)
logging.info('Starting inspection shell')

1
local/bin/wrapper_su_helper Symbolic link
View file

@ -0,0 +1 @@
../../wrapper_su_helper.py

View file

@ -1,32 +0,0 @@
#!/bin/sh
set -e
wget https://raw.githubusercontent.com/archlinuxarm/PKGBUILDs/master/core/pacman/makepkg.conf -O etc/makepkg.conf
sed -i "s/@CARCH@/aarch64/g" etc/makepkg.conf
sed -i "s/@CHOST@/aarch64-unknown-linux-gnu/g" etc/makepkg.conf
sed -i "s/@CARCHFLAGS@/-march=armv8-a /g" etc/makepkg.conf
sed -i "s/xz /xz -T0 /g" etc/makepkg.conf
sed -i "s/ check / !check /g" etc/makepkg.conf
chroot="/chroot/base_aarch64"
include="-I\${CROOT}/usr/include -I$chroot/usr/include"
lib_croot="\${CROOT}/lib"
lib_chroot="$chroot/usr/lib"
cat >>etc/makepkg.conf <<EOF
export CROOT="/usr/aarch64-linux-gnu"
export ARCH="arm64"
export CROSS_COMPILE="aarch64-linux-gnu-"
export CC="aarch64-linux-gnu-gcc $include -L$lib_croot -L$lib_chroot"
export CXX="aarch64-linux-gnu-g++ $include -L$lib_croot -L$lib_chroot"
export CFLAGS="\$CFLAGS $include"
export CXXFLAGS="\$CXXFLAGS $include"
export LDFLAGS="\$LDFLAGS,-L$lib_croot,-L$lib_chroot,-rpath-link,$lib_croot,-rpath-link,$lib_chroot"
export PACMAN_CHROOT="$chroot"
EOF
# TODO: Set PACKAGER
wget https://raw.githubusercontent.com/archlinuxarm/PKGBUILDs/master/core/pacman/pacman.conf -O etc/pacman.conf
sed -i "s/@CARCH@/aarch64/g" etc/pacman.conf
sed -i "s/#ParallelDownloads.*/ParallelDownloads = 8/g" etc/pacman.conf
sed -i "s/SigLevel.*/SigLevel = Never/g" etc/pacman.conf
sed -i "s/^CheckSpace/#CheckSpace/g" etc/pacman.conf
sed -i "s|Include = /etc/pacman.d/mirrorlist|Server = http://mirror.archlinuxarm.org/\$arch/\$repo|g" etc/pacman.conf

20
main.py
View file

@ -1,11 +1,13 @@
#!/usr/bin/env python3
import click
from traceback import format_exc as get_trace
import subprocess
from traceback import format_exc as get_trace
from typing import Optional
from logger import logging, setup_logging, verbose_option
from wrapper import nowrapper_option
from wrapper import nowrapper_option, enforce_wrap
from config import config, config_option, cmd_config
from forwarding import cmd_forwarding
from packages import cmd_packages
@ -23,23 +25,25 @@ from ssh import cmd_ssh
@verbose_option
@config_option
@nowrapper_option
def cli(verbose: bool = False, config_file: str = None, no_wrapper: bool = False, error_shell: bool = False):
def cli(verbose: bool = False, config_file: str = None, wrapper_override: Optional[bool] = None, error_shell: bool = False):
setup_logging(verbose)
config.runtime['verbose'] = verbose
config.runtime['no_wrap'] = no_wrapper
config.runtime['error_shell'] = error_shell
config.runtime.verbose = verbose
config.runtime.no_wrap = wrapper_override is False
config.runtime.error_shell = error_shell
config.try_load_file(config_file)
if wrapper_override:
enforce_wrap()
def main():
try:
return cli(prog_name='kupferbootstrap')
except Exception as ex:
if config.runtime['verbose']:
if config.runtime.verbose:
logging.fatal(get_trace())
else:
logging.fatal(ex)
if config.runtime['error_shell']:
if config.runtime.error_shell:
logging.info('Starting error shell. Type exit to quit.')
subprocess.call('/bin/bash')
exit(1)

View file

@ -9,9 +9,9 @@ from glob import glob
from urllib.error import HTTPError
from urllib.request import urlopen
from shutil import copyfileobj
from typing import Iterable, Iterator, Any, Optional
from typing import Iterable, Iterator, Optional
from binfmt import register as binfmt_register
from binfmt import register as binfmt_register, QEMU_ARCHES
from constants import REPOSITORIES, CROSSDIRECT_PKGS, QEMU_BINFMT_PKGS, GCC_HOSTSPECS, ARCHES, Arch, CHROOT_PATHS, MAKEPKG_CMD
from config import config
from exec.cmd import run_cmd, run_root_cmd
@ -22,7 +22,7 @@ from ssh import run_ssh_command, scp_put_files
from wrapper import enforce_wrap, check_programs_wrap, wrap_if_foreign_arch
from utils import git
from .pkgbuild import discover_pkgbuilds, init_pkgbuilds, Pkgbuild
from .pkgbuild import discover_pkgbuilds, filter_pkgbuilds, init_pkgbuilds, Pkgbuild
from .device import get_profile_device
pacman_cmd = [
@ -36,7 +36,7 @@ pacman_cmd = [
def get_makepkg_env(arch: Optional[Arch] = None):
# has to be a function because calls to `config` must be done after config file was read
threads = config.file['build']['threads'] or multiprocessing.cpu_count()
threads = config.file.build.threads or multiprocessing.cpu_count()
env = {key: val for key, val in os.environ.items() if not key.split('_', maxsplit=1)[0] in ['CI', 'GITLAB', 'FF']}
env |= {
'LANG': 'C',
@ -77,33 +77,6 @@ def init_prebuilts(arch: Arch, dir: str = None):
raise Exception(f'Failed to create local repo {repo}')
def filter_packages(
paths: Iterable[str],
repo: Optional[dict[str, Pkgbuild]] = None,
allow_empty_results=True,
use_paths=True,
use_names=True,
) -> Iterable[Pkgbuild]:
if not allow_empty_results and not paths:
raise Exception("Can't search for packages: no query given")
repo = repo or discover_pkgbuilds()
if 'all' in paths:
return list(repo.values())
result = []
for pkg in repo.values():
comparison = set()
if use_paths:
comparison.add(pkg.path)
if use_names:
comparison.add(pkg.name)
if comparison.intersection(paths):
result += [pkg]
if not allow_empty_results and not result:
raise Exception('No packages matched by paths: ' + ', '.join([f'"{p}"' for p in paths]))
return result
def generate_dependency_chain(package_repo: dict[str, Pkgbuild], to_build: Iterable[Pkgbuild]) -> list[set[Pkgbuild]]:
"""
This figures out all dependencies and their sub-dependencies for the selection and adds those packages to the selection.
@ -262,7 +235,7 @@ def strip_compression_extension(filename: str):
for ext in ['zst', 'xz', 'gz', 'bz2']:
if filename.endswith(f'.pkg.tar.{ext}'):
return filename[:-(len(ext) + 1)]
logging.warning(f"file {filename} matches no known package extension")
logging.debug(f"file {filename} matches no known package extension")
return filename
@ -286,7 +259,9 @@ def add_package_to_repo(package: Pkgbuild, arch: Arch):
for repo_arch in ARCHES:
if repo_arch == arch:
continue
copy_target = os.path.join(config.get_package_dir(repo_arch), package.repo, file)
repo_dir = os.path.join(config.get_package_dir(repo_arch), package.repo)
makedir(repo_dir)
copy_target = os.path.join(repo_dir, file)
shutil.copy(repo_file, copy_target)
add_file_to_repo(copy_target, package.repo, repo_arch)
@ -331,47 +306,21 @@ def try_download_package(dest_file_path: str, package: Pkgbuild, arch: Arch) ->
def check_package_version_built(package: Pkgbuild, arch: Arch, try_download: bool = False) -> bool:
enforce_wrap()
native_chroot = setup_build_chroot(config.runtime['arch'])
config_path = '/' + native_chroot.write_makepkg_conf(
target_arch=arch,
cross_chroot_relative=os.path.join('chroot', arch),
cross=True,
)
cmd = ['cd', os.path.join(CHROOT_PATHS['pkgbuilds'], package.path), '&&'] + MAKEPKG_CMD + [
'--config',
config_path,
'--nobuild',
'--noprepare',
'--skippgpcheck',
'--packagelist',
]
result: Any = native_chroot.run_cmd(
cmd,
capture_output=True,
)
if result.returncode != 0:
raise Exception(f'Failed to get package list for {package.path}:' + '\n' + result.stdout.decode() + '\n' + result.stderr.decode())
missing = True
for line in result.stdout.decode('utf-8').split('\n'):
if not line:
continue
basename = os.path.basename(line)
file = os.path.join(config.get_package_dir(arch), package.repo, basename)
filename_stripped = strip_compression_extension(file)
logging.debug(f'Checking if {file} is built')
filename = package.get_filename(arch)
filename_stripped = strip_compression_extension(filename)
logging.debug(f'Checking if {filename_stripped} is built')
for ext in ['xz', 'zst']:
file = os.path.join(config.get_package_dir(arch), package.repo, f'{filename_stripped}.{ext}')
if not filename_stripped.endswith('.pkg.tar'):
logging.debug(f'skipping unknown file extension {basename}')
continue
raise Exception(f'stripped filename has unknown extension. {filename}')
if os.path.exists(file) or (try_download and try_download_package(file, package, arch)):
missing = False
add_file_to_repo(file, repo_name=package.repo, arch=arch)
# copy arch=(any) packages to all arches
if filename_stripped.endswith('any.pkg.tar'):
logging.debug("any-arch pkg detected")
target_repo_file = os.path.join(config.get_package_dir(arch), package.repo, basename)
target_repo_file = os.path.join(config.get_package_dir(arch), package.repo, filename)
if os.path.exists(target_repo_file):
missing = False
else:
@ -379,7 +328,7 @@ def check_package_version_built(package: Pkgbuild, arch: Arch, try_download: boo
for repo_arch in ARCHES:
if repo_arch == arch:
continue # we already checked that
other_repo_path = os.path.join(config.get_package_dir(repo_arch), package.repo, basename)
other_repo_path = os.path.join(config.get_package_dir(repo_arch), package.repo, filename)
if os.path.exists(other_repo_path):
missing = False
logging.info(f"package {file} found in {repo_arch} repos, copying to {arch}")
@ -392,12 +341,16 @@ def check_package_version_built(package: Pkgbuild, arch: Arch, try_download: boo
for repo_arch in ARCHES:
if repo_arch == arch:
continue # we already have that
copy_target = os.path.join(config.get_package_dir(repo_arch), package.repo, basename)
repo_dir = os.path.join(config.get_package_dir(repo_arch), package.repo)
copy_target = os.path.join(repo_dir, filename)
if not os.path.exists(copy_target):
logging.info(f"copying to {copy_target}")
makedir(repo_dir)
shutil.copyfile(target_repo_file, copy_target)
add_file_to_repo(copy_target, package.repo, repo_arch)
return not missing
if not missing:
return True
return False
def setup_build_chroot(
@ -406,7 +359,8 @@ def setup_build_chroot(
add_kupfer_repos: bool = True,
clean_chroot: bool = False,
) -> BuildChroot:
if arch != config.runtime['arch']:
assert config.runtime.arch
if arch != config.runtime.arch:
wrap_if_foreign_arch(arch)
build_enable_qemu_binfmt(arch)
init_prebuilts(arch)
@ -420,10 +374,22 @@ def setup_build_chroot(
chroot.mount_pkgbuilds()
if extra_packages:
chroot.try_install_packages(extra_packages, allow_fail=False)
assert config.runtime.uid is not None
chroot.create_user('kupfer', password='12345678', uid=config.runtime.uid, non_unique=True)
if not os.path.exists(chroot.get_path('/etc/sudoers.d/kupfer_nopw')):
chroot.add_sudo_config('kupfer_nopw', 'kupfer', password_required=False)
return chroot
def setup_sources(package: Pkgbuild, chroot: BuildChroot, makepkg_conf_path='/etc/makepkg.conf'):
def setup_git_insecure_paths(chroot: BuildChroot, username: str = 'kupfer'):
chroot.run_cmd(
["git", "config", "--global", "--add", "safe.directory", "'*'"],
switch_user=username,
).check_returncode() # type: ignore[union-attr]
def setup_sources(package: Pkgbuild, chroot: BuildChroot, makepkg_conf_path='/etc/makepkg.conf', switch_user: str = 'kupfer'):
makepkg_setup_args = [
'--config',
makepkg_conf_path,
@ -434,7 +400,13 @@ def setup_sources(package: Pkgbuild, chroot: BuildChroot, makepkg_conf_path='/et
]
logging.info(f'Setting up sources for {package.path} in {chroot.name}')
result = chroot.run_cmd(MAKEPKG_CMD + makepkg_setup_args, cwd=os.path.join(CHROOT_PATHS['pkgbuilds'], package.path))
setup_git_insecure_paths(chroot)
result = chroot.run_cmd(
MAKEPKG_CMD + makepkg_setup_args,
cwd=os.path.join(CHROOT_PATHS['pkgbuilds'], package.path),
inner_env=get_makepkg_env(chroot.arch),
switch_user=switch_user,
)
assert isinstance(result, subprocess.CompletedProcess)
if result.returncode != 0:
raise Exception(f'Failed to check sources for {package.path}')
@ -448,19 +420,23 @@ def build_package(
enable_crossdirect: bool = True,
enable_ccache: bool = True,
clean_chroot: bool = False,
build_user: str = 'kupfer',
):
makepkg_compile_opts = ['--holdver']
makepkg_conf_path = 'etc/makepkg.conf'
repo_dir = repo_dir if repo_dir else config.get_path('pkgbuilds')
foreign_arch = config.runtime['arch'] != arch
foreign_arch = config.runtime.arch != arch
deps = (list(set(package.depends) - set(package.names())))
needs_rust = 'rust' in deps
build_root: BuildChroot
target_chroot = setup_build_chroot(
arch=arch,
extra_packages=deps,
clean_chroot=clean_chroot,
)
assert config.runtime.arch
native_chroot = target_chroot if not foreign_arch else setup_build_chroot(
arch=config.runtime['arch'],
arch=config.runtime.arch,
extra_packages=['base-devel'] + CROSSDIRECT_PKGS,
clean_chroot=clean_chroot,
)
@ -475,6 +451,7 @@ def build_package(
env = deepcopy(get_makepkg_env(arch))
if enable_ccache:
env['PATH'] = f"/usr/lib/ccache:{env['PATH']}"
native_chroot.mount_ccache(user=build_user)
logging.info('Setting up dependencies for cross-compilation')
# include crossdirect for ccache symlinks and qemu-user
results = native_chroot.try_install_packages(package.depends + CROSSDIRECT_PKGS + [f"{GCC_HOSTSPECS[native_chroot.arch][arch]}-gcc"])
@ -506,12 +483,22 @@ def build_package(
if failed_deps:
raise Exception(f'Dependencies failed to install: {failed_deps}')
if enable_ccache:
build_root.mount_ccache(user=build_user)
if needs_rust:
build_root.mount_rust(user=build_user)
setup_git_insecure_paths(build_root)
makepkg_conf_absolute = os.path.join('/', makepkg_conf_path)
setup_sources(package, build_root, makepkg_conf_path=makepkg_conf_absolute)
build_cmd = f'makepkg --config {makepkg_conf_absolute} --skippgpcheck --needed --noconfirm --ignorearch {" ".join(makepkg_compile_opts)}'
logging.debug(f'Building: Running {build_cmd}')
result = build_root.run_cmd(build_cmd, inner_env=env, cwd=os.path.join(CHROOT_PATHS['pkgbuilds'], package.path))
result = build_root.run_cmd(
build_cmd,
inner_env=env,
cwd=os.path.join(CHROOT_PATHS['pkgbuilds'], package.path),
switch_user=build_user,
)
assert isinstance(result, subprocess.CompletedProcess)
if result.returncode != 0:
raise Exception(f'Failed to compile package {package.path}')
@ -520,15 +507,19 @@ def build_package(
def get_dependants(
repo: dict[str, Pkgbuild],
packages: Iterable[Pkgbuild],
arch: Arch,
recursive: bool = True,
) -> set[Pkgbuild]:
names = set([pkg.name for pkg in packages])
to_add = set[Pkgbuild]()
for pkg in repo.values():
if set.intersection(names, set(pkg.depends)):
if not set([arch, 'any']).intersection(pkg.arches):
logging.warn(f'get_dependants: skipping matched pkg {pkg.name} due to wrong arch: {pkg.arches}')
continue
to_add.add(pkg)
if recursive and to_add:
to_add.update(get_dependants(repo, to_add))
to_add.update(get_dependants(repo, to_add, arch=arch))
return to_add
@ -543,7 +534,7 @@ def get_unbuilt_package_levels(
repo = repo or discover_pkgbuilds()
dependants = set[Pkgbuild]()
if rebuild_dependants:
dependants = get_dependants(repo, packages)
dependants = get_dependants(repo, packages, arch=arch)
package_levels = generate_dependency_chain(repo, set(packages).union(dependants))
build_names = set[str]()
build_levels = list[set[Pkgbuild]]()
@ -574,6 +565,7 @@ def build_packages(
enable_ccache: bool = True,
clean_chroot: bool = False,
):
check_programs_wrap(['makepkg', 'pacman', 'pacstrap'])
init_prebuilts(arch)
build_levels = get_unbuilt_package_levels(
packages,
@ -619,9 +611,11 @@ def build_packages_by_paths(
if isinstance(paths, str):
paths = [paths]
for _arch in set([arch, config.runtime['arch']]):
check_programs_wrap(['makepkg', 'pacman', 'pacstrap'])
assert config.runtime.arch
for _arch in set([arch, config.runtime.arch]):
init_prebuilts(_arch)
packages = filter_packages(paths, repo=repo, allow_empty_results=False)
packages = filter_pkgbuilds(paths, arch=arch, repo=repo, allow_empty_results=False)
return build_packages(
packages,
arch,
@ -645,10 +639,11 @@ def build_enable_qemu_binfmt(arch: Arch, repo: Optional[dict[str, Pkgbuild]] = N
logging.info('Installing qemu-user (building if necessary)')
if lazy and _qemu_enabled[arch]:
return
native = config.runtime['arch']
native = config.runtime.arch
assert native
if arch == native:
return
wrap_if_foreign_arch(arch)
check_programs_wrap([f'qemu-{QEMU_ARCHES[arch]}-static', 'pacman', 'makepkg'])
# build qemu-user, binfmt, crossdirect
build_packages_by_paths(
CROSSDIRECT_PKGS,
@ -706,7 +701,6 @@ def build(
rebuild_dependants: bool = False,
try_download: bool = False,
):
# TODO: arch = config.get_profile()...
arch = arch or get_profile_device(hint_or_set_arch=True).arch
if arch not in ARCHES:
@ -719,10 +713,10 @@ def build(
force=force,
rebuild_dependants=rebuild_dependants,
try_download=try_download,
enable_crosscompile=config.file['build']['crosscompile'],
enable_crossdirect=config.file['build']['crossdirect'],
enable_ccache=config.file['build']['ccache'],
clean_chroot=config.file['build']['clean_mode'],
enable_crosscompile=config.file.build.crosscompile,
enable_crossdirect=config.file.build.crossdirect,
enable_ccache=config.file.build.ccache,
clean_chroot=config.file.build.clean_mode,
)
@ -751,7 +745,7 @@ def cmd_sideload(paths: Iterable[str], arch: Optional[Arch] = None, no_build: bo
'-U',
] + [os.path.join('/tmp', os.path.basename(file)) for file in files] + [
'--noconfirm',
'--overwrite=\\*',
"'--overwrite=\\*'",
],
alloc_tty=True).check_returncode()
@ -830,7 +824,7 @@ def cmd_check(paths):
return False
paths = list(paths)
packages = filter_packages(paths, allow_empty_results=False)
packages = filter_pkgbuilds(paths, allow_empty_results=False)
for package in packages:
name = package.name

View file

@ -6,7 +6,7 @@ from typing import Optional
from config import config
from constants import Arch, ARCHES
from config.scheme import DataClass, munchclass
from .pkgbuild import discover_pkgbuilds, _pkgbuilds_cache, Pkgbuild, parse_pkgbuild
from .pkgbuild import discover_pkgbuilds, get_pkgbuild_by_path, _pkgbuilds_cache, Pkgbuild
DEVICE_DEPRECATIONS = {
"oneplus-enchilada": "sdm845-oneplus-enchilada",
@ -105,8 +105,7 @@ def get_device(name: str, pkgbuilds: Optional[dict[str, Pkgbuild]] = None, lazy:
else:
relative_path = os.path.join('device', pkgname)
assert os.path.exists(os.path.join(config.get_path('pkgbuilds'), relative_path))
pkgbuild = [p for p in parse_pkgbuild(relative_path, _config=config) if p.name == pkgname][0]
_pkgbuilds_cache[pkgname] = pkgbuild
pkgbuild = [p for p in get_pkgbuild_by_path(relative_path, lazy=lazy, _config=config) if p.name == pkgname][0]
device = parse_device_pkg(pkgbuild)
if lazy:
_device_cache[name] = device

View file

@ -6,11 +6,11 @@ import multiprocessing
import os
import subprocess
from constants import REPOSITORIES
from joblib import Parallel, delayed
from typing import Optional, Sequence
from typing import Iterable, Optional
from config import config, ConfigStateHolder
from constants import REPOSITORIES
from exec.cmd import run_cmd
from constants import Arch, MAKEPKG_CMD
from distro.package import PackageInfo
@ -47,8 +47,8 @@ def clone_pkbuilds(pkgbuilds_dir: str, repo_url: str, branch: str, interactive=F
def init_pkgbuilds(interactive=False):
pkgbuilds_dir = config.get_path('pkgbuilds')
repo_url = config.file['pkgbuilds']['git_repo']
branch = config.file['pkgbuilds']['git_branch']
repo_url = config.file.pkgbuilds.git_repo
branch = config.file.pkgbuilds.git_branch
clone_pkbuilds(pkgbuilds_dir, repo_url, branch, interactive=interactive, update=False)
@ -65,6 +65,7 @@ class Pkgbuild(PackageInfo):
path: str
pkgver: str
pkgrel: str
sources_refreshed: bool
def __init__(
self,
@ -74,6 +75,7 @@ class Pkgbuild(PackageInfo):
provides: list[str] = [],
replaces: list[str] = [],
repo: Optional[str] = None,
sources_refreshed: bool = False,
) -> None:
"""
Create new Pkgbuild representation for file located at `{relative_path}/PKGBUILD`.
@ -91,9 +93,15 @@ class Pkgbuild(PackageInfo):
self.path = relative_path
self.pkgver = ''
self.pkgrel = ''
self.sources_refreshed = sources_refreshed
def __repr__(self):
return f'Pkgbuild({self.name},{repr(self.path)},{self.version},{self.mode})'
return ','.join([
'Pkgbuild(' + self.name,
repr(self.path),
self.version + ("🔄" if self.sources_refreshed else ""),
self.mode + ')',
])
def names(self):
return list(set([self.name] + self.provides + self.replaces))
@ -102,14 +110,69 @@ class Pkgbuild(PackageInfo):
"""updates `self.version` from `self.pkgver` and `self.pkgrel`"""
self.version = f'{self.pkgver}-{self.pkgrel}'
def update(self, pkg: Pkgbuild):
self.version = pkg.version
self.arches = list(pkg.arches)
self.depends = list(pkg.depends)
self.provides = list(pkg.provides)
self.replaces = list(pkg.replaces)
self.local_depends = list(pkg.local_depends)
self.repo = pkg.repo
self.mode = pkg.mode
self.path = pkg.path
self.pkgver = pkg.pkgver
self.pkgrel = pkg.pkgrel
self.sources_refreshed = self.sources_refreshed or pkg.sources_refreshed
self.update_version()
def refresh_sources(self):
raise NotImplementedError()
def get_filename(self, arch: Arch):
if not self.version:
self.update_version()
if self.arches[0] == 'any':
arch = 'any'
return f'{self.name}-{self.version}-{arch}.pkg.tar.zst'
class Pkgbase(Pkgbuild):
subpackages: Sequence[SubPkgbuild]
subpackages: list[SubPkgbuild]
def __init__(self, relative_path: str, subpackages: Sequence[SubPkgbuild] = [], **args):
def __init__(self, relative_path: str, subpackages: list[SubPkgbuild] = [], **args):
self.subpackages = list(subpackages)
super().__init__(relative_path, **args)
def update(self, pkg: Pkgbuild):
if not isinstance(pkg, Pkgbase):
raise Exception(f"Tried to update pkgbase {self.name} with non-base pkg {pkg}")
Pkgbuild.update(self, pkg)
sub_dict = {p.name: p for p in self.subpackages}
self.subpackages.clear()
for new_pkg in pkg.subpackages:
name = new_pkg.name
if name not in sub_dict:
sub_dict[name] = new_pkg
else:
sub_dict[name].update(new_pkg)
updated = sub_dict[name]
updated.sources_refreshed = self.sources_refreshed
self.subpackages.append(updated)
def refresh_sources(self, lazy: bool = True):
'''
Reloads the pkgbuild from disk.
Does **NOT** actually perform the makepkg action to refresh the pkgver() first!
'''
if lazy and self.sources_refreshed:
return
parsed = parse_pkgbuild(self.path, sources_refreshed=True)
basepkgs = [p for p in parsed if isinstance(p, Pkgbase)]
if not len(basepkgs) == 1:
raise Exception(f"error refreshing {self.name}: wrong number of base packages found: {basepkgs}")
self.sources_refreshed = True
self.update(basepkgs[0])
class SubPkgbuild(Pkgbuild):
pkgbase: Pkgbase
@ -119,28 +182,25 @@ class SubPkgbuild(Pkgbuild):
self.name = name
self.pkgbase = pkgbase
self.version = pkgbase.version
self.arches = pkgbase.arches
self.depends = list(pkgbase.depends)
self.sources_refreshed = False
self.update(pkgbase)
self.provides = []
self.replaces = []
self.local_depends = list(pkgbase.local_depends)
self.repo = pkgbase.repo
self.mode = pkgbase.mode
self.path = pkgbase.path
self.pkgver = pkgbase.pkgver
self.pkgrel = pkgbase.pkgrel
self.update_version()
def refresh_sources(self, lazy: bool = True):
assert self.pkgbase
self.pkgbase.refresh_sources(lazy=lazy)
def parse_pkgbuild(relative_pkg_dir: str, _config: Optional[ConfigStateHolder] = None) -> Sequence[Pkgbuild]:
def parse_pkgbuild(relative_pkg_dir: str, _config: Optional[ConfigStateHolder] = None, sources_refreshed: bool = False) -> list[Pkgbuild]:
"""
Since function may run in a different subprocess, we need to be passed the config via parameter
"""
global config
if _config:
config = _config
setup_logging(verbose=config.runtime['verbose'], log_setup=False) # different thread needs log setup.
setup_logging(verbose=config.runtime.verbose, log_setup=False) # different thread needs log setup.
logging.info(f"Parsing PKGBUILD for {relative_pkg_dir}")
pkgbuilds_dir = config.get_path('pkgbuilds')
pkgdir = os.path.join(pkgbuilds_dir, relative_pkg_dir)
@ -156,7 +216,7 @@ def parse_pkgbuild(relative_pkg_dir: str, _config: Optional[ConfigStateHolder] =
raise Exception((f'{relative_pkg_dir}/PKGBUILD has {"no" if mode is None else "an invalid"} mode configured') +
(f': "{mode}"' if mode is not None else ''))
base_package = Pkgbase(relative_pkg_dir)
base_package = Pkgbase(relative_pkg_dir, sources_refreshed=sources_refreshed)
base_package.mode = mode
base_package.repo = relative_pkg_dir.split('/')[0]
srcinfo = run_cmd(
@ -197,7 +257,7 @@ def parse_pkgbuild(relative_pkg_dir: str, _config: Optional[ConfigStateHolder] =
elif line.startswith('depends') or line.startswith('makedepends') or line.startswith('checkdepends') or line.startswith('optdepends'):
current.depends.append(splits[1].split('=')[0].split(': ')[0])
results: Sequence[Pkgbuild] = list(base_package.subpackages)
results: list[Pkgbuild] = list(base_package.subpackages)
if len(results) > 1:
logging.debug(f" Split package detected: {base_package.name}: {results}")
base_package.update_version()
@ -214,9 +274,21 @@ def parse_pkgbuild(relative_pkg_dir: str, _config: Optional[ConfigStateHolder] =
_pkgbuilds_cache = dict[str, Pkgbuild]()
_pkgbuilds_paths = dict[str, list[Pkgbuild]]()
_pkgbuilds_scanned: bool = False
def get_pkgbuild_by_path(relative_path: str, lazy: bool = True, _config: Optional[ConfigStateHolder] = None) -> list[Pkgbuild]:
global _pkgbuilds_cache, _pkgbuilds_paths
if lazy and relative_path in _pkgbuilds_paths:
return _pkgbuilds_paths[relative_path]
parsed = parse_pkgbuild(relative_path, _config=_config)
_pkgbuilds_paths[relative_path] = parsed
for pkg in parsed:
_pkgbuilds_cache[pkg.name] = pkg
return parsed
def discover_pkgbuilds(parallel: bool = True, lazy: bool = True) -> dict[str, Pkgbuild]:
global _pkgbuilds_cache, _pkgbuilds_scanned
if lazy and _pkgbuilds_scanned:
@ -228,22 +300,38 @@ def discover_pkgbuilds(parallel: bool = True, lazy: bool = True) -> dict[str, Pk
init_pkgbuilds(interactive=False)
for repo in REPOSITORIES:
for dir in os.listdir(os.path.join(pkgbuilds_dir, repo)):
paths.append(os.path.join(repo, dir))
results = []
p = os.path.join(repo, dir)
if not os.path.exists(os.path.join(pkgbuilds_dir, p, 'PKGBUILD')):
logging.warning(f"{p} doesn't include a PKGBUILD file; skipping")
continue
paths.append(p)
logging.info("Parsing PKGBUILDs")
logging.debug(f"About to parse pkgbuilds. verbosity: {config.runtime['verbose']}")
results = []
if parallel:
chunks = (Parallel(n_jobs=multiprocessing.cpu_count() * 4)(delayed(parse_pkgbuild)(path, config) for path in paths))
paths_filtered = paths
if lazy:
# filter out cached packages as the caches don't cross process boundaries
paths_filtered = []
for p in paths:
if p in _pkgbuilds_paths:
# use cache
results += _pkgbuilds_paths[p]
else:
paths_filtered += [p]
chunks = (Parallel(n_jobs=multiprocessing.cpu_count() * 4)(
delayed(get_pkgbuild_by_path)(path, lazy=lazy, _config=config) for path in paths_filtered))
else:
chunks = (parse_pkgbuild(path) for path in paths)
chunks = (get_pkgbuild_by_path(path, lazy=lazy) for path in paths)
_pkgbuilds_paths.clear()
# one list of packages per path
for pkglist in chunks:
_pkgbuilds_paths[pkglist[0].path] = pkglist
results += pkglist
logging.debug('Building package dictionary!')
logging.info('Building package dictionary')
for package in results:
for name in [package.name] + package.replaces:
if name in packages:
@ -255,11 +343,11 @@ def discover_pkgbuilds(parallel: bool = True, lazy: bool = True) -> dict[str, Pk
package.local_depends = package.depends.copy()
for dep in package.depends.copy():
found = dep in packages
for p in packages.values():
for pkg in packages.values():
if found:
break
if dep in p.names():
logging.debug(f'Found {p.name} that provides {dep}')
if dep in pkg.names():
logging.debug(f'Found {pkg.name} that provides {dep}')
found = True
break
if not found:
@ -270,3 +358,41 @@ def discover_pkgbuilds(parallel: bool = True, lazy: bool = True) -> dict[str, Pk
_pkgbuilds_cache.update(packages)
_pkgbuilds_scanned = True
return packages
def filter_pkgbuilds(
paths: Iterable[str],
repo: Optional[dict[str, Pkgbuild]] = None,
arch: Optional[Arch] = None,
allow_empty_results=True,
use_paths=True,
use_names=True,
) -> Iterable[Pkgbuild]:
if not (use_names or use_paths):
raise Exception('Error: filter_packages instructed to match neither by names nor paths; impossible!')
if not allow_empty_results and not paths:
raise Exception("Can't search for packages: no query given")
repo = repo or discover_pkgbuilds()
if 'all' in paths:
all_pkgs = list(repo.values())
if arch:
all_pkgs = [pkg for pkg in all_pkgs if set([arch, 'any']).intersection(pkg.arches)]
return all_pkgs
result = []
for pkg in repo.values():
comparison = set()
if use_paths:
comparison.add(pkg.path)
if use_names:
comparison.add(pkg.name)
matches = list(comparison.intersection(paths))
if matches:
assert pkg.arches
if arch and not set([arch, 'any']).intersection(pkg.arches):
logging.warn(f"Pkg {pkg.name} matches query {matches[0]} but isn't available for architecture {arch}: {pkg.arches}")
continue
result += [pkg]
if not allow_empty_results and not result:
raise Exception('No packages matched by paths: ' + ', '.join([f'"{p}"' for p in paths]))
return result

View file

@ -42,7 +42,7 @@ ONEPLUS_ENCHILADA_PKG = f'device-{ONEPLUS_ENCHILADA}'
def enchilada_pkgbuild(initialise_pkgbuilds_dir: ConfigStateHolder):
config = initialise_pkgbuilds_dir
config.try_load_file()
return parse_pkgbuild(os.path.join('device', ONEPLUS_ENCHILADA_PKG), config)[0]
return parse_pkgbuild(os.path.join('device', ONEPLUS_ENCHILADA_PKG), _config=config)[0]
def validate_oneplus_enchilada(d: Device):

View file

@ -5,3 +5,4 @@ toml
typing_extensions
coloredlogs
munch
setuptools # required by munch

2
ssh.py
View file

@ -32,7 +32,7 @@ def run_ssh_command(cmd: list[str] = [],
extra_args = []
if len(keys) > 0:
extra_args += ['-i', keys[0]]
if config.runtime['verbose']:
if config.runtime.verbose:
extra_args += ['-v']
if alloc_tty:
extra_args += ['-t']

View file

@ -1,2 +1,2 @@
#!/bin/bash
git ls-files \*.py | sort -u | xargs mypy --pretty --install-types --ignore-missing-imports "$@"
git ls-files \*.py | sort -u | xargs mypy --pretty --show-error-codes --install-types --ignore-missing-imports "$@"

View file

@ -15,7 +15,7 @@ wrapper_impls: dict[str, Wrapper] = {
def get_wrapper_type(wrapper_type: str = None):
return wrapper_type or config.file['wrapper']['type']
return wrapper_type or config.file.wrapper.type
def get_wrapper_impl(wrapper_type: str = None) -> Wrapper:
@ -34,7 +34,7 @@ def is_wrapped(wrapper_type: str = None):
def enforce_wrap(no_wrapper=False):
wrapper_type = get_wrapper_type()
if wrapper_type != 'none' and not is_wrapped(wrapper_type) and not config.runtime['no_wrap'] and not no_wrapper:
if wrapper_type != 'none' and not is_wrapped(wrapper_type) and not config.runtime.no_wrap and not no_wrapper:
logging.info(f'Wrapping in {wrapper_type}')
wrap()
@ -51,10 +51,10 @@ def wrap_if_foreign_arch(arch: Arch):
nowrapper_option = click.option(
'-W',
'--no-wrapper',
'no_wrapper',
'-w/-W',
'--force-wrapper/--no-wrapper',
'wrapper_override',
is_flag=True,
default=False,
help='Disable the docker wrapper. Defaults to autodetection.',
default=None,
help='Force or disable the docker wrapper. Defaults to autodetection.',
)

View file

@ -5,10 +5,11 @@ import subprocess
import sys
from config import config
from constants import CHROOT_PATHS
from .wrapper import BaseWrapper
from exec.file import makedir
DOCKER_PATHS = CHROOT_PATHS.copy()
from .wrapper import BaseWrapper, WRAPPER_PATHS
DOCKER_PATHS = WRAPPER_PATHS.copy()
def docker_volumes_args(volume_mappings: dict[str, str]) -> list[str]:
@ -22,69 +23,77 @@ class DockerWrapper(BaseWrapper):
type: str = 'docker'
def wrap(self):
script_path = config.runtime['script_source_dir']
script_path = config.runtime.script_source_dir
with open(os.path.join(script_path, 'version.txt')) as version_file:
version = version_file.read().replace('\n', '')
tag = f'registry.gitlab.com/kupfer/kupferbootstrap:{version}'
if version == 'dev':
logging.info(f'Building docker image "{tag}"')
cmd = [
'docker',
'build',
'.',
'-t',
tag,
] + (['-q'] if not config.runtime['verbose'] else [])
logging.debug('Running docker cmd: ' + ' '.join(cmd))
result = subprocess.run(cmd, cwd=script_path, capture_output=True)
if result.returncode != 0:
logging.fatal('Failed to build docker image:\n' + result.stderr.decode())
exit(1)
else:
# Check if the image for the version already exists
result = subprocess.run(
[
'docker',
'images',
'-q',
tag,
],
capture_output=True,
)
if result.stdout == b'':
logging.info(f'Pulling kupferbootstrap docker image version \'{version}\'')
subprocess.run([
'docker',
'pull',
tag,
])
container_name = f'kupferbootstrap-{self.uuid}'
wrapped_config = self.generate_wrapper_config()
ssh_dir = os.path.join(pathlib.Path.home(), '.ssh')
if not os.path.exists(ssh_dir):
os.makedirs(ssh_dir, mode=0o700)
volumes = self.get_bind_mounts_default(wrapped_config)
volumes |= dict({config.get_path(vol_name): vol_dest for vol_name, vol_dest in DOCKER_PATHS.items()})
docker_cmd = [
tag = f'registry.gitlab.com/kupfer/kupferbootstrap:{version}'
if version == 'dev':
logging.info(f'Building docker image "{tag}"')
cmd = [
'docker',
'run',
'--name',
container_name,
'--rm',
'--interactive',
'--tty',
'--privileged',
] + docker_volumes_args(volumes) + [tag]
'build',
'.',
'-t',
tag,
] + (['-q'] if not config.runtime.verbose else [])
logging.debug('Running docker cmd: ' + ' '.join(cmd))
result = subprocess.run(cmd, cwd=script_path, capture_output=True)
if result.returncode != 0:
logging.fatal('Failed to build docker image:\n' + result.stderr.decode())
exit(1)
else:
# Check if the image for the version already exists
result = subprocess.run(
[
'docker',
'images',
'-q',
tag,
],
capture_output=True,
)
if result.stdout == b'':
logging.info(f'Pulling kupferbootstrap docker image version \'{version}\'')
subprocess.run([
'docker',
'pull',
tag,
])
container_name = f'kupferbootstrap-{self.uuid}'
kupfer_cmd = ['kupferbootstrap', '--config', '/root/.config/kupfer/kupferbootstrap.toml'] + self.filter_args_wrapper(sys.argv[1:])
wrapped_config = self.generate_wrapper_config()
cmd = docker_cmd + kupfer_cmd
logging.debug('Wrapping in docker:' + repr(cmd))
result = subprocess.run(cmd)
target_user = 'root' if config.runtime.uid == 0 else 'kupfer'
target_home = '/root' if target_user == 'root' else f'/home/{target_user}'
exit(result.returncode)
ssh_dir = os.path.join(pathlib.Path.home(), '.ssh')
if not os.path.exists(ssh_dir):
os.makedirs(ssh_dir, mode=0o700)
volumes = self.get_bind_mounts_default(wrapped_config, ssh_dir=ssh_dir, target_home=target_home)
for vol_name, vol_dest in DOCKER_PATHS.items():
vol_src = config.get_path(vol_name)
makedir(vol_src)
volumes[vol_src] = vol_dest
docker_cmd = [
'docker',
'run',
'--name',
container_name,
'--rm',
'--interactive',
'--tty',
'--privileged',
] + docker_volumes_args(volumes) + [tag]
kupfer_cmd = ['kupferbootstrap', '--config', volumes[wrapped_config]] + self.filter_args_wrapper(sys.argv[1:])
if config.runtime.uid:
kupfer_cmd = ['wrapper_su_helper', '--uid', str(config.runtime.uid), '--username', 'kupfer', '--'] + kupfer_cmd
cmd = docker_cmd + kupfer_cmd
logging.debug('Wrapping in docker:' + repr(cmd))
result = subprocess.run(cmd)
exit(result.returncode)
def stop(self):
subprocess.run(

View file

@ -9,6 +9,11 @@ from config import config
from config.state import dump_file as dump_config_file
from constants import CHROOT_PATHS
WRAPPER_PATHS = CHROOT_PATHS | {
'ccache': '/ccache',
'rust': '/rust',
}
class Wrapper(Protocol):
"""Wrappers wrap kupferbootstrap in some form of isolation from the host OS, i.e. docker or chroots"""
@ -27,7 +32,7 @@ class Wrapper(Protocol):
class BaseWrapper(Wrapper):
id: str
uuid: str
identifier: str
type: str
wrapped_config_path: str
@ -63,7 +68,7 @@ class BaseWrapper(Wrapper):
def generate_wrapper_config(
self,
target_path: str = '/tmp/kupferbootstrap',
paths: dict[str, str] = CHROOT_PATHS,
paths: dict[str, str] = WRAPPER_PATHS,
config_overrides: dict[str, dict] = {},
) -> str:
wrapped_config = f'{target_path.rstrip("/")}/{self.identifier}_wrapped.toml'

33
wrapper_su_helper.py Executable file
View file

@ -0,0 +1,33 @@
#!/bin/python3
import click
import pwd
from logger import logging, setup_logging
from exec.cmd import run_cmd, flatten_shell_script
from exec.file import chown
@click.command('kupferbootstrap_su')
@click.option('--username', default='kupfer', help="The user's name. If --uid is provided, the user's uid will be changed to this in passwd")
@click.option('--uid', default=1000, type=int, help='uid to change $username to and run as')
@click.argument('cmd', type=str, nargs=-1)
def kupferbootstrap_su(cmd: list[str], uid: int = 1000, username: str = 'kupfer'):
"Changes `username`'s uid to `uid` and executes kupferbootstrap as that user"
cmd = list(cmd)
user = pwd.getpwnam(username)
home = user.pw_dir
if uid != user.pw_uid:
run_cmd(['usermod', '-u', str(uid), username]).check_returncode() # type: ignore[union-attr]
chown(home, username, recursive=False)
logging.debug(f'wrapper_su_helper: running {cmd} as {repr(username)}')
su_cmd = ['sudo', 'su', '-P', username, '-c', flatten_shell_script(cmd, wrap_in_shell_quote=True, shell_quote_items=True)]
result = run_cmd(su_cmd, attach_tty=True)
assert isinstance(result, int)
exit(result)
if __name__ == '__main__':
setup_logging(True)
kupferbootstrap_su(prog_name='kupferbootstrap_su_helper')