Compare commits


1 Commit

Author:  InsanePrawn
SHA1:    5c7ec5d0fa
Message: exec/file: remove_file(): use --one-file-system when recursive is requested
Date:    2023-01-06 03:14:31 +01:00
107 changed files with 1080 additions and 2829 deletions
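The exec/file.py hunk named in the commit message is not shown in this compare view. As a rough sketch of the described behavior, assuming the run_root_cmd helper imported elsewhere in this diff (the function body is illustrative, not the actual implementation):

```python
# Illustrative sketch only, not the real exec/file.py code.
from exec.cmd import run_root_cmd  # helper seen in the hunks below

def remove_file(path: str, recursive: bool = False):
    cmd = ["rm", "-f", path]
    if recursive:
        # --one-file-system stops a recursive rm from crossing mount points,
        # e.g. bind mounts inside a chroot that happen to live under `path`.
        cmd += ["-r", "--one-file-system"]
    result = run_root_cmd(cmd)
    if result.returncode != 0:
        raise Exception(f"Unable to remove {path}")
```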


@@ -1,5 +0,0 @@
/venv
/build
__pycache__
.mypy_cache
*.xml

.gitignore (vendored, 5 changes)

@@ -1,7 +1,4 @@
*.kate-swp
/venv
/build
venv/
__pycache__/
.coverage*
*.xml
*.egg-info

.gitlab-ci.yml

@@ -7,18 +7,14 @@ format:
stage: check
image: python
before_script:
- python3 -m venv venv
- venv/bin/pip3 install yapf autoflake
- pip install yapf autoflake
script:
- source venv/bin/activate
- ./format.sh --check
typecheck:
stage: check
image: python
before_script:
- python3 -m venv venv
- source venv/bin/activate
- pip install mypy
script:
- ./typecheck.sh --non-interactive --junit-xml mypy-report.xml
@@ -31,13 +27,12 @@ pytest:
image: archlinux
before_script:
- pacman -Sy --noconfirm --needed archlinux-keyring && pacman -Su --noconfirm python python-pip sudo git base-devel arch-install-scripts rsync
- python3 -m venv venv
- venv/bin/pip3 install -r test_requirements.txt -r requirements.txt
- pip install -r test_requirements.txt -r requirements.txt
- 'echo "kupfer ALL = (ALL) NOPASSWD: ALL" > /etc/sudoers.d/kupfer_all'
- useradd -m kupfer
- chmod 777 .
script:
- script -e -c 'su kupfer -s /bin/bash -c ". venv/bin/activate && INTEGRATION_TESTS_USE_GLOBAL_CONFIG=TRUE KUPFERBOOTSTRAP_WRAPPED=DOCKER ./pytest.sh --junit-xml=pytest-report.xml --cov-report=xml:coverage.xml integration_tests.py"'
- script -e -c 'su kupfer -s /bin/bash -c "INTEGRATION_TESTS_USE_GLOBAL_CONFIG=TRUE KUPFERBOOTSTRAP_WRAPPED=DOCKER ./pytest.sh --junit-xml=pytest-report.xml --cov-report=xml:coverage.xml integration_tests.py"'
coverage: '/(?i)total.*? (100(?:\.0+)?\%|[1-9]?\d(?:\.\d+)?\%)$/'
artifacts:
reports:
@@ -49,14 +44,12 @@ pytest:
build_docker:
stage: build
image: docker:latest
services:
- name: docker:dind
command: ["--mtu=1100"] # very low, safe value -.-
services: ['docker:dind']
variables:
DOCKER_DRIVER: vfs # overlay2 is not available on ZFS
DOCKER_TLS_CERTDIR: ""
script:
- 'docker build --pull -t "${CI_REGISTRY_IMAGE}:${CI_COMMIT_SHA}" -t "${CI_REGISTRY_IMAGE}:${CI_COMMIT_REF_SLUG}" .'
- 'echo "running sanity check" && docker run -it --rm "${CI_REGISTRY_IMAGE}:${CI_COMMIT_SHA}" kupferbootstrap --help'
only:
- branches
except:
@@ -83,8 +76,6 @@ push_docker:
DOCS_MAKE_TARGET: "html"
DOCS_MAKE_THREADS: 6
before_script: &docs_before_script
- python3 -m venv venv
- source venv/bin/activate
- pip install -r requirements.txt -r docs/requirements.txt
script: &docs_script
- make -C docs -j$DOCS_MAKE_THREADS SPHINXARGS="$DOCS_SPHINXARGS" $DOCS_MAKE_TARGET

Dockerfile

@@ -10,21 +10,22 @@ RUN pacman-key --init && \
android-tools openssh inetutils \
parted
RUN sed -i "s/EUID == 0/EUID == -1/g" "$(which makepkg)"
RUN sed -i "s/EUID == 0/EUID == -1/g" $(which makepkg)
RUN yes | pacman -Scc
RUN sed -i "s/SigLevel.*/SigLevel = Never/g" /etc/pacman.conf
ENV KUPFERBOOTSTRAP_WRAPPED=DOCKER
ENV PATH=/app/bin:/app/local/bin:/app/venv/bin:$PATH
ENV PATH=/app/bin:/app/local/bin:$PATH
WORKDIR /app
COPY . .
RUN python3 -m venv /app/venv
RUN /app/venv/bin/pip3 install -r requirements.txt
COPY requirements.txt .
RUN pip install -r requirements.txt
RUN /app/venv/bin/python3 -c "from kupferbootstrap.distro import distro; distro.get_kupfer_local(arch=None,in_chroot=False).repos_config_snippet()" | tee -a /etc/pacman.conf
COPY . .
RUN python -c "from distro import distro; distro.get_kupfer_local(arch=None,in_chroot=False).repos_config_snippet()" | tee -a /etc/pacman.conf
RUN useradd -m -g users kupfer
RUN echo "kupfer ALL=(ALL) NOPASSWD: ALL" | tee /etc/sudoers.d/kupfer

README.md

@@ -18,26 +18,14 @@ This will run a webserver on localhost:9999. Access it like `firefox http://loca
## Installation
0. If you're not on ArchLinux (i.e. don't have `pacman`, `makepkg`, etc. available in your $PATH), install Docker and add yourself to the docker group.
1. Create a python venv: `python3 -m venv venv`
1. Activate it: `source venv/bin/activate`
1. Install KBS: `pip3 install .`
Then run `kupferbootstrap`.
### Pro Tip:
- You can add a shell alias for `$(PWD)/venv/bin/kupferbootstrap` or create a symlink to it at `/usr/local/bin/kupferbootstrap` for quick access without needing to manually source the venv script every time.
- It is recommended to abbreviate `kupferbootstrap` to `kbs` for even less typing.
Install Docker, Python 3 with the libraries from `requirements.txt` and put `bin/` into your `PATH`.
Then use `kupferbootstrap`.
## Quickstart
1. Initialize config with defaults, configure your device and flavour: `kupferbootstrap config init`
1. Initialize PKGBUILDs and caches: `kupferbootstrap packages init`
1. Build an image and packages along the way: `kupferbootstrap image build`
## Development
### Docker
Put `BUILD` (the default) into `docker_version.txt` to always rebuild kupferbootstrap from this directory; otherwise the image is pulled from `registry.gitlab.com/kupfer/kupferbootstrap:$VERSION`, where `$VERSION` is the contents of `docker_version.txt`.
Put `dev` into `version.txt` to always rebuild kupferbootstrap from this directory and use `kupferbootstrap` as normal.

bin/kupferbootstrap (new executable file, 4 lines)

@@ -0,0 +1,4 @@
#!/bin/bash
# shellcheck disable=SC2068
python3 "$(dirname "$(dirname "$(readlink -f "${BASH_SOURCE[0]}")")")/main.py" $@

binfmt.py

@@ -5,10 +5,10 @@ import logging
from typing import Optional
from kupferbootstrap.chroot.abstract import Chroot
from kupferbootstrap.constants import Arch, QEMU_ARCHES
from kupferbootstrap.exec.cmd import run_root_cmd, CompletedProcess
from kupferbootstrap.utils import mount
from chroot.abstract import Chroot
from constants import Arch, QEMU_ARCHES
from exec.cmd import run_root_cmd
from utils import mount
def binfmt_info(chroot: Optional[Chroot] = None):
@@ -66,23 +66,25 @@ def binfmt_ensure_mounted(chroot: Optional[Chroot] = None):
binfmt_path = '/proc/sys/fs/binfmt_misc'
register_path = binfmt_path + '/register'
if chroot:
binfmt_path = chroot.get_path(binfmt_path)
register_path = chroot.get_path(register_path)
chroot.activate()
if not os.path.exists(register_path):
logging.info('mounting binfmt_misc')
result = (chroot.mount if chroot else mount)('binfmt_misc', binfmt_path, options=[], fs_type='binfmt_misc') # type: ignore[operator]
if (isinstance(result, CompletedProcess) and result.returncode != 0) or not result:
result = mount('binfmt_misc', binfmt_path, options=[], fs_type='binfmt_misc')
if result.returncode != 0:
raise Exception(f'Failed mounting binfmt_misc to {binfmt_path}')
def binfmt_register(arch: Arch, chroot: Optional[Chroot] = None):
def register(arch: Arch, chroot: Optional[Chroot] = None):
binfmt_path = '/proc/sys/fs/binfmt_misc'
register_path = binfmt_path + '/register'
is_arch_known(arch, True, 'register')
qemu_arch = QEMU_ARCHES[arch]
if binfmt_is_registered(arch, chroot=chroot):
if binfmt_is_registered(arch):
return
lines = binfmt_info(chroot=chroot)
lines = binfmt_info()
_runcmd = run_root_cmd
if chroot:
@@ -97,19 +99,15 @@ def binfmt_register(arch: Arch, chroot: Optional[Chroot] = None):
info = lines[qemu_arch]
code = info['line']
if arch == os.uname().machine:
logging.fatal("Attempted to register qemu binfmt for host architecture, skipping!")
return
# Register in binfmt_misc
logging.info(f"Registering qemu binfmt ({arch})")
_runcmd(f'echo "{code}" > "{register_path}" 2>/dev/null') # use path without chroot path prefix
if not binfmt_is_registered(arch, chroot=chroot):
if not binfmt_is_registered(arch):
logging.debug(f'binfmt line: {code}')
raise Exception(f'Failed to register qemu-user for {arch} with binfmt_misc, {binfmt_path}/{info["name"]} not found')
def binfmt_unregister(arch, chroot: Optional[Chroot] = None):
def unregister(arch, chroot: Optional[Chroot] = None):
is_arch_known(arch, True, 'unregister')
qemu_arch = QEMU_ARCHES[arch]
binfmt_ensure_mounted(chroot)
@@ -117,9 +115,6 @@ def binfmt_unregister(arch, chroot: Optional[Chroot] = None):
if chroot:
binfmt_file = chroot.get_path(binfmt_file)
if not os.path.exists(binfmt_file):
logging.debug(f"qemu binfmt for {arch} not registered")
return
logging.info(f"Unregistering qemu binfmt ({arch})")
run_root_cmd(f"echo -1 > {binfmt_file}")
if binfmt_is_registered(arch, chroot=chroot):
raise Exception(f'Failed to UNregister qemu-user for {arch} with binfmt_misc, {chroot=}')
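For context on the register writes above: the kernel's binfmt_misc interface accepts one colon-delimited line per handler, `:name:type:offset:magic:mask:interpreter:flags` (per the kernel's binfmt-misc documentation). A sketch of how such a line is assembled; the magic/mask bytes below are placeholders, not the real qemu values:

```python
# Sketch: build a binfmt_misc registration line. Magic/mask are placeholders.
def binfmt_line(name: str, magic: str, mask: str, interpreter: str, flags: str = "") -> str:
    # type "M" matches executables by magic bytes at the given offset (empty = 0)
    return f":{name}:M::{magic}:{mask}:{interpreter}:{flags}"

line = binfmt_line("qemu-aarch64", "\\x7fELF...", "\\xff\\xff...", "/usr/bin/qemu-aarch64-static")
# the code above echoes such a line into /proc/sys/fs/binfmt_misc/register as root
```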

cache/cli.py

@@ -2,11 +2,11 @@ import click
import os
import logging
from kupferbootstrap.config.state import config
from kupferbootstrap.constants import CHROOT_PATHS
from kupferbootstrap.exec.file import remove_file
from kupferbootstrap.packages.cli import cmd_clean as cmd_clean_pkgbuilds
from kupferbootstrap.wrapper import enforce_wrap
from config.state import config
from constants import CHROOT_PATHS
from exec.file import remove_file
from packages.cli import cmd_clean as cmd_clean_pkgbuilds
from wrapper import enforce_wrap
PATHS = list(CHROOT_PATHS.keys())

chroot/abstract.py

@@ -2,20 +2,18 @@ import atexit
import logging
import os
import subprocess
import sys
from copy import deepcopy
from shlex import quote as shell_quote
from typing import ClassVar, Iterable, Protocol, Union, Optional, Mapping
from uuid import uuid4
from kupferbootstrap.config.state import config
from kupferbootstrap.constants import Arch, CHROOT_PATHS, GCC_HOSTSPECS
from kupferbootstrap.distro.distro import get_base_distro, get_kupfer_local, RepoInfo
from kupferbootstrap.exec.cmd import FileDescriptor, run_root_cmd, generate_env_cmd, flatten_shell_script, wrap_in_bash, generate_cmd_su
from kupferbootstrap.exec.file import makedir, root_makedir, root_write_file, write_file
from kupferbootstrap.generator import generate_makepkg_conf
from kupferbootstrap.utils import mount, umount, check_findmnt, log_or_exception
from config.state import config
from constants import Arch, CHROOT_PATHS, GCC_HOSTSPECS
from distro.distro import get_base_distro, get_kupfer_local, RepoInfo
from exec.cmd import run_root_cmd, generate_env_cmd, flatten_shell_script, wrap_in_bash, generate_cmd_su
from exec.file import makedir, root_makedir, root_write_file, write_file
from generator import generate_makepkg_conf
from utils import mount, umount, check_findmnt, log_or_exception
from .helpers import BASE_CHROOT_PREFIX, BASIC_MOUNTS, base_chroot_name, make_abs_path
@@ -60,8 +58,7 @@ class AbstractChroot(Protocol):
capture_output: bool,
cwd: str,
fail_inactive: bool,
stdout: Optional[FileDescriptor],
stderr: Optional[FileDescriptor],
stdout: Optional[int],
):
pass
@@ -225,8 +222,7 @@ class Chroot(AbstractChroot):
capture_output: bool = False,
cwd: Optional[str] = None,
fail_inactive: bool = True,
stdout: Optional[FileDescriptor] = None,
stderr: Optional[FileDescriptor] = None,
stdout: Optional[int] = None,
switch_user: Optional[str] = None,
) -> Union[int, subprocess.CompletedProcess]:
if not self.active and fail_inactive:
@@ -250,7 +246,7 @@ class Chroot(AbstractChroot):
inner_cmd = wrap_in_bash(script, flatten_result=False)
cmd = flatten_shell_script(['chroot', self.path] + env_cmd + inner_cmd, shell_quote_items=True)
return run_root_cmd(cmd, env=outer_env, attach_tty=attach_tty, capture_output=capture_output, stdout=stdout, stderr=stderr)
return run_root_cmd(cmd, env=outer_env, attach_tty=attach_tty, capture_output=capture_output, stdout=stdout)
def mount_pkgbuilds(self, fail_if_mounted: bool = False) -> str:
return self.mount(
@@ -375,22 +371,20 @@ class Chroot(AbstractChroot):
packages: list[str],
refresh: bool = False,
allow_fail: bool = True,
redirect_stderr: bool = True,
) -> dict[str, Union[int, subprocess.CompletedProcess]]:
"""Try installing packages, fall back to installing one by one"""
results = {}
stderr = sys.stdout if redirect_stderr else sys.stderr
if refresh:
results['refresh'] = self.run_cmd('pacman -Syy --noconfirm', stderr=stderr)
results['refresh'] = self.run_cmd('pacman -Syy --noconfirm')
cmd = "pacman -S --noconfirm --needed --overwrite='/*'"
result = self.run_cmd(f'{cmd} -y {" ".join(packages)}', stderr=stderr)
result = self.run_cmd(f'{cmd} -y {" ".join(packages)}')
assert isinstance(result, subprocess.CompletedProcess)
results |= {package: result for package in packages}
if result.returncode != 0 and allow_fail:
results = {}
logging.debug('Falling back to serial installation')
for pkg in set(packages):
results[pkg] = self.run_cmd(f'{cmd} {pkg}', stderr=stderr)
results[pkg] = self.run_cmd(f'{cmd} {pkg}')
return results

chroot/base.py

@@ -1,15 +1,14 @@
import logging
import os
import sys
from glob import glob
from shutil import rmtree
from typing import ClassVar
from kupferbootstrap.constants import Arch
from kupferbootstrap.exec.cmd import run_root_cmd
from kupferbootstrap.exec.file import makedir, root_makedir
from kupferbootstrap.config.state import config
from constants import Arch
from exec.cmd import run_root_cmd
from exec.file import makedir, root_makedir
from config.state import config
from .abstract import Chroot, get_chroot
from .helpers import base_chroot_name
@@ -32,20 +31,17 @@ class BaseChroot(Chroot):
logging.info(f'Pacstrapping chroot {self.name}: {", ".join(self.base_packages)}')
result = run_root_cmd(
[
'pacstrap',
'-C',
pacman_conf_target,
'-G',
self.path,
*self.base_packages,
'--needed',
'--overwrite=*',
'-yyuu',
],
stderr=sys.stdout,
)
result = run_root_cmd([
'pacstrap',
'-C',
pacman_conf_target,
'-G',
self.path,
] + self.base_packages + [
'--needed',
'--overwrite=*',
'-yyuu',
])
if result.returncode != 0:
raise Exception(f'Failed to initialize chroot "{self.name}"')
self.initialized = True

chroot/build.py

@@ -4,11 +4,11 @@ import subprocess
from glob import glob
from typing import ClassVar, Optional
from kupferbootstrap.config.state import config
from kupferbootstrap.constants import Arch, GCC_HOSTSPECS, CROSSDIRECT_PKGS, CHROOT_PATHS
from kupferbootstrap.distro.distro import get_kupfer_local
from kupferbootstrap.exec.cmd import run_root_cmd
from kupferbootstrap.exec.file import makedir, remove_file, root_makedir, root_write_file, symlink
from config.state import config
from constants import Arch, GCC_HOSTSPECS, CROSSDIRECT_PKGS, CHROOT_PATHS
from distro.distro import get_kupfer_local
from exec.cmd import run_root_cmd
from exec.file import makedir, remove_file, root_makedir, root_write_file, symlink
from .abstract import Chroot, get_chroot
from .helpers import build_chroot_name
@@ -82,7 +82,6 @@ class BuildChroot(Chroot):
native_chroot.mount_pacman_cache()
native_chroot.mount_packages()
native_chroot.activate()
logging.debug(f"Installing {CROSSDIRECT_PKGS=} + {gcc=}")
results = dict(native_chroot.try_install_packages(
CROSSDIRECT_PKGS + [gcc],
refresh=True,
@@ -104,8 +103,8 @@ class BuildChroot(Chroot):
target_include_dir = os.path.join(self.path, 'include')
for target, source in {cc_path: gcc, target_lib_dir: 'lib', target_include_dir: 'usr/include'}.items():
if not (os.path.exists(target) or os.path.islink(target)):
logging.debug(f'Symlinking {source=} at {target=}')
if not os.path.exists(target):
logging.debug(f'Symlinking {source} at {target}')
symlink(source, target)
ld_so = os.path.basename(glob(f"{os.path.join(native_chroot.path, 'usr', 'lib', 'ld-linux-')}*")[0])
ld_so_target = os.path.join(target_lib_dir, ld_so)

chroot/cli.py

@@ -4,9 +4,9 @@ import os
from typing import Optional
from kupferbootstrap.config.state import config
from kupferbootstrap.wrapper import enforce_wrap
from kupferbootstrap.devices.device import get_profile_device
from config.state import config
from wrapper import enforce_wrap
from devices.device import get_profile_device
from .abstract import Chroot
from .base import get_base_chroot
@@ -30,7 +30,7 @@ def cmd_chroot(ctx: click.Context, type: str = 'build', name: Optional[str] = No
raise Exception(f'Unknown chroot type: "{type}"')
if type == 'rootfs':
from ..image.image import cmd_inspect
from image.image import cmd_inspect
assert isinstance(cmd_inspect, click.Command)
ctx.invoke(cmd_inspect, profile=name, shell=True)
return


@@ -3,12 +3,12 @@ import os
from typing import ClassVar, Optional
from kupferbootstrap.config.state import config
from kupferbootstrap.constants import Arch, BASE_PACKAGES
from kupferbootstrap.distro.repo import RepoInfo
from kupferbootstrap.distro.distro import get_kupfer_local, get_kupfer_https
from kupferbootstrap.exec.file import get_temp_dir, makedir, root_makedir
from kupferbootstrap.utils import check_findmnt
from config.state import config
from constants import Arch, BASE_PACKAGES
from distro.repo import RepoInfo
from distro.distro import get_kupfer_local, get_kupfer_https
from exec.file import get_temp_dir, makedir, root_makedir
from utils import check_findmnt
from .base import BaseChroot
from .build import BuildChroot

chroot/helpers.py

@@ -1,8 +1,8 @@
import os
from typing import Optional, TypedDict
from kupferbootstrap.config.state import config
from kupferbootstrap.constants import Arch
from config.state import config
from constants import Arch
BIND_BUILD_DIRS = 'BINDBUILDDIRS'
BASE_CHROOT_PREFIX = 'base_'

config/cli.py

@@ -1,16 +1,15 @@
import click
import logging
import os
from copy import deepcopy
from typing import Any, Callable, Iterable, Mapping, Optional, Union
from typing import Any, Iterable, Optional, Union
from kupferbootstrap.devices.device import get_devices, sanitize_device_name
from kupferbootstrap.flavours.flavour import get_flavours
from kupferbootstrap.utils import color_bold, colors_supported, color_mark_selected
from kupferbootstrap.wrapper import execute_without_exit
from devices.device import get_devices
from flavours.flavour import get_flavours
from .scheme import Profile
from .profile import PROFILE_EMPTY, PROFILE_DEFAULTS, resolve_profile_attr, SparseProfile
from .profile import PROFILE_EMPTY, PROFILE_DEFAULTS
from .state import config, CONFIG_DEFAULTS, CONFIG_SECTIONS, merge_configs
@@ -88,7 +87,6 @@ def prompt_profile(
raise Exception("profile name 'current' not allowed")
# don't use get_profile() here because we need the sparse profile
if name in config.file.profiles:
logging.debug(f"Merging with existing profile config for {name}")
profile |= config.file.profiles[name]
elif create:
logging.info(f"Profile {name} doesn't exist yet, creating new profile.")
@@ -96,26 +94,27 @@ def prompt_profile(
raise Exception(f'Unknown profile "{name}"')
logging.info(f'Configuring profile "{name}"')
changed = False
if not (no_parse or os.path.exists(os.path.join(config.get_path('pkgbuilds'), 'device'))):
logging.warning("PKGBUILDS NOT INITIALISED:\n"
"Usually we'd present you with detailed lists of choices for devices and flavours in this dialogue,\n"
"but your pkgbuilds.git seem to not have been cloned yet.\n\n"
"You can:\n1. complete the dialogue with default values for now\n"
"2. run `kupferbootstrap packages update` afterwards\n"
f"3. then get back to this dialogue by running `kupferbootstrap config profile init {name}`\n\n"
"You can also use `kupferbootstrap packages flavours` and `kupferbootstrap packages devices` to list them.")
no_parse = True
for key, current in profile.items():
current = profile[key]
text = f'profiles.{name}.{key}'
text = f'{name}.{key}'
if not no_parse and key in PARSEABLE_FIELDS:
parse_prompt = None
sanitize_func = None
if key == 'device':
parse_prompt = prompt_profile_device
sanitize_func = sanitize_device_name
elif key == 'flavour':
parse_prompt = prompt_profile_flavour
else:
raise Exception(f'config: Unhandled parseable field {key}, this is a bug in kupferbootstrap.')
result, _changed = parse_prompt(
current=current,
profile_name=name,
sparse_profiles=config.file.profiles,
use_colors=config.runtime.colors,
sanitize_func=sanitize_func,
) # type: ignore
result, _changed = parse_prompt(current, name) # type: ignore
else:
result, _changed = prompt_config(text=text, default=current, field_type=type(PROFILE_DEFAULTS[key])) # type: ignore
if _changed:
@@ -129,51 +128,23 @@ def prompt_choice(current: Optional[Any], key: str, choices: Iterable[Any], allo
res, _ = prompt_config(text=key, default=current, field_type=click.Choice(choices), show_choices=show_choices)
if allow_none and res == '':
res = None
return res, res != current
return res, res == current
def resolve_profile_field(current: Any, *kargs):
try:
return resolve_profile_attr(*kargs)
except KeyError as err:
logging.debug(err)
return current, None
def prompt_profile_device(current: Optional[str], profile_name: str) -> tuple[str, bool]:
devices = get_devices()
print(click.style("Pick your device!\nThese are the available devices:", bold=True))
for dev in sorted(devices.keys()):
print(devices[dev])
return prompt_choice(current, f'profiles.{profile_name}.device', devices.keys())
def prompt_wrappable(
attr_name: str,
native_cmd: Callable,
cli_cmd: list[str],
current: Optional[str],
profile_name: str,
sparse_profiles: Mapping[str, SparseProfile],
sanitize_func: Optional[Callable[[str], str]] = None,
use_colors: Optional[bool] = None,
) -> tuple[str, bool]:
use_colors = colors_supported(use_colors)
print(color_bold(f"Pick your {attr_name}!\nThese are the available choices:", use_colors=use_colors))
items = execute_without_exit(native_cmd, cli_cmd)
if items is None:
logging.warning("(wrapper mode, input for this field will not be checked for correctness)")
return prompt_config(text=f'profiles.{profile_name}.{attr_name}', default=current)
selected, inherited_from = resolve_profile_field(current, profile_name, attr_name, sparse_profiles)
if selected and sanitize_func:
selected = sanitize_func(selected)
for key in sorted(items.keys()):
text = items[key].nice_str(newlines=True, colors=use_colors)
if key == selected:
text = color_mark_selected(text, profile_name, inherited_from)
print(text + '\n')
return prompt_choice(current, f'profiles.{profile_name}.{attr_name}', items.keys())
def prompt_profile_device(*kargs, **kwargs) -> tuple[str, bool]:
return prompt_wrappable('device', get_devices, ['devices'], *kargs, **kwargs)
def prompt_profile_flavour(*kargs, **kwargs) -> tuple[str, bool]:
return prompt_wrappable('flavour', get_flavours, ['flavours'], *kargs, **kwargs)
def prompt_profile_flavour(current: Optional[str], profile_name: str) -> tuple[str, bool]:
flavours = get_flavours()
print(click.style("Pick your flavour!\nThese are the available flavours:", bold=True))
for f in sorted(flavours.keys()):
print(flavours[f])
return prompt_choice(current, f'profiles.{profile_name}.flavour', flavours.keys())
def config_dot_name_get(name: str, config: dict[str, Any], prefix: str = '') -> Any:
@@ -205,12 +176,7 @@ def prompt_for_save(retry_ctx: Optional[click.Context] = None):
If `retry_ctx` is passed, the context's command will be reexecuted with the same arguments if the user chooses to retry.
False will still be returned as the retry is expected to either save, perform another retry or abort.
"""
from ..wrapper import is_wrapped
if click.confirm(f'Do you want to save your changes to {config.runtime.config_file}?', default=True):
if is_wrapped():
logging.warning("Writing to config file inside wrapper."
"This is pointless and probably a bug."
"Your host config file will not be modified.")
return True
if retry_ctx:
if click.confirm('Retry? ("n" to quit without saving)', default=True):
@@ -235,8 +201,6 @@ noninteractive_flag = click.option('-N', '--non-interactive', is_flag=True)
noop_flag = click.option('--noop', '-n', help="Don't write changes to file", is_flag=True)
noparse_flag = click.option('--no-parse', help="Don't search PKGBUILDs for devices and flavours", is_flag=True)
CONFIG_MSG = ("Leave fields empty to leave them at their currently displayed value.")
@cmd_config.command(name='init')
@noninteractive_flag
@@ -260,7 +224,6 @@ def cmd_config_init(
):
"""Initialize the config file"""
if not non_interactive:
logging.info(CONFIG_MSG)
results: dict[str, dict] = {}
for section in sections:
if section not in CONFIG_SECTIONS:
@@ -276,14 +239,7 @@ def cmd_config_init(
results[section][key] = result
config.update(results)
print("Main configuration complete")
if not noop:
if prompt_for_save(ctx):
config.write()
else:
return
if 'profiles' in sections:
print("Configuring profiles")
current_profile = 'default' if 'current' not in config.file.profiles else config.file.profiles.current
new_current, _ = prompt_config('profiles.current', default=current_profile, field_type=str)
profile, changed = prompt_profile(new_current, create=True, no_parse=no_parse)
@@ -310,7 +266,6 @@ def cmd_config_set(ctx, key_vals: list[str], non_interactive: bool = False, noop
like `build.clean_mode=false` or alternatively just keys to get prompted if run interactively.
"""
config.enforce_config_loaded()
logging.info(CONFIG_MSG)
config_copy = deepcopy(config.file)
for pair in key_vals:
split_pair = pair.split('=')
@@ -368,7 +323,6 @@ def cmd_profile_init(ctx, name: Optional[str] = None, non_interactive: bool = Fa
profile = deepcopy(PROFILE_EMPTY)
if name == 'current':
raise Exception("profile name 'current' not allowed")
logging.info(CONFIG_MSG)
name = name or config.file.profiles.current
if name in config.file.profiles:
profile |= config.file.profiles[name]
@@ -379,9 +333,7 @@ def cmd_profile_init(ctx, name: Optional[str] = None, non_interactive: bool = Fa
config.update_profile(name, profile)
if not noop:
if not prompt_for_save(ctx):
logging.info("Not saving.")
return
config.write()
else:
logging.info(f'--noop passed, not writing to {config.runtime.config_file}!')

config/profile.py

@@ -21,10 +21,6 @@ PROFILE_DEFAULTS = Profile.fromDict(PROFILE_DEFAULTS_DICT)
PROFILE_EMPTY: Profile = {key: None for key in PROFILE_DEFAULTS.keys()} # type: ignore
class ProfileNotFoundException(Exception):
pass
def resolve_profile(
name: str,
sparse_profiles: dict[str, SparseProfile],
@@ -89,40 +85,3 @@ def resolve_profile(
resolved[name] = Profile.fromDict(full)
return resolved
def resolve_profile_attr(
profile_name: str,
attr_name: str,
profiles_sparse: dict[str, SparseProfile],
) -> tuple[str, str]:
"""
This function tries to resolve a profile attribute recursively,
and throws KeyError if the key is not found anywhere in the hierarchy.
Throws a ProfileNotFoundException if the profile is not in profiles_sparse
"""
if profile_name not in profiles_sparse:
raise ProfileNotFoundException(f"Unknown profile {profile_name}")
profile: Profile = profiles_sparse[profile_name]
if attr_name in profile:
return profile[attr_name], profile_name
if 'parent' not in profile:
raise KeyError(f'Profile attribute {attr_name} not found in {profile_name} and no parents')
parent = profile
parent_name = profile_name
seen = []
while True:
if attr_name in parent:
return parent[attr_name], parent_name
seen.append(parent_name)
if not parent.get('parent', None):
raise KeyError(f'Profile attribute {attr_name} not found in inheritance chain, '
f'we went down to {parent_name}.')
parent_name = parent['parent']
if parent_name in seen:
raise RecursionError(f"Profile recursion loop: profile {profile_name} couldn't be resolved"
f"because of a dependency loop:\n{' -> '.join([*seen, parent_name])}")
parent = profiles_sparse[parent_name]
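A worked example of the parent walk implemented by resolve_profile_attr (profile data invented):

```python
# Invented sparse profiles: 'mine' inherits everything but the flavour from 'base'.
profiles = {
    "base": SparseProfile(parent=None, device="sdm845-oneplus-enchilada", flavour="barebone"),
    "mine": SparseProfile(parent="base", flavour="phosh"),
}
resolve_profile_attr("mine", "flavour", profiles)   # ('phosh', 'mine')
resolve_profile_attr("mine", "device", profiles)    # walks to the parent: ('sdm845-oneplus-enchilada', 'base')
resolve_profile_attr("mine", "hostname", profiles)  # raises KeyError: not found in the chain
```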

config/scheme.py

@@ -3,11 +3,12 @@ from __future__ import annotations
from munch import Munch
from typing import Any, Optional, Mapping, Union
from kupferbootstrap.dictscheme import DictScheme
from kupferbootstrap.constants import Arch
from dataclass import DataClass, munchclass
from constants import Arch
class SparseProfile(DictScheme):
@munchclass()
class SparseProfile(DataClass):
parent: Optional[str]
device: Optional[str]
flavour: Optional[str]
@@ -22,6 +23,7 @@ class SparseProfile(DictScheme):
return f'{type(self)}{dict.__repr__(self.toDict())}'
@munchclass()
class Profile(SparseProfile):
parent: Optional[str]
device: str
@@ -34,11 +36,13 @@ class Profile(SparseProfile):
size_extra_mb: Union[str, int]
class WrapperSection(DictScheme):
@munchclass()
class WrapperSection(DataClass):
type: str # NOTE: rename to 'wrapper_type' if this causes problems
class BuildSection(DictScheme):
@munchclass()
class BuildSection(DataClass):
ccache: bool
clean_mode: bool
crosscompile: bool
@@ -46,18 +50,21 @@ class BuildSection(DictScheme):
threads: int
class PkgbuildsSection(DictScheme):
@munchclass()
class PkgbuildsSection(DataClass):
git_repo: str
git_branch: str
class PacmanSection(DictScheme):
@munchclass()
class PacmanSection(DataClass):
parallel_downloads: int
check_space: bool
repo_branch: str
class PathsSection(DictScheme):
@munchclass()
class PathsSection(DataClass):
cache_dir: str
chroots: str
pacman: str
@@ -69,12 +76,12 @@ class PathsSection(DictScheme):
rust: str
class ProfilesSection(DictScheme):
class ProfilesSection(DataClass):
current: str
default: SparseProfile
@classmethod
def transform(cls, values: Mapping[str, Any], validate: bool = True, allow_extra: bool = True, type_hints: Optional[dict[str, Any]] = None):
def transform(cls, values: Mapping[str, Any], validate: bool = True, allow_extra: bool = True):
results = {}
for k, v in values.items():
if k == 'current':
@@ -94,7 +101,8 @@ class ProfilesSection(DictScheme):
return f'{type(self)}{dict.__repr__(self.toDict())}'
class Config(DictScheme):
@munchclass()
class Config(DataClass):
wrapper: WrapperSection
build: BuildSection
pkgbuilds: PkgbuildsSection
@@ -130,7 +138,8 @@ class Config(DictScheme):
return Config(_vals, validate=validate)
class RuntimeConfiguration(DictScheme):
@munchclass()
class RuntimeConfiguration(DataClass):
verbose: bool
no_wrap: bool
error_shell: bool
@@ -138,11 +147,9 @@ class RuntimeConfiguration(DictScheme):
script_source_dir: Optional[str]
arch: Optional[Arch]
uid: Optional[int]
progress_bars: Optional[bool]
colors: Optional[bool]
class ConfigLoadState(DictScheme):
class ConfigLoadState(DataClass):
load_finished: bool
exception: Optional[Exception]

config/state.py

@@ -5,9 +5,9 @@ import toml
from copy import deepcopy
from typing import Mapping, Optional
from kupferbootstrap.constants import DEFAULT_PACKAGE_BRANCH
from constants import DEFAULT_PACKAGE_BRANCH
from .scheme import Config, ConfigLoadState, DictScheme, Profile, RuntimeConfiguration
from .scheme import Config, ConfigLoadState, DataClass, Profile, RuntimeConfiguration
from .profile import PROFILE_DEFAULTS, PROFILE_DEFAULTS_DICT, resolve_profile
CONFIG_DIR = appdirs.user_config_dir('kupfer')
@@ -61,8 +61,6 @@ CONFIG_RUNTIME_DEFAULTS: RuntimeConfiguration = RuntimeConfiguration.fromDict({
'script_source_dir': None,
'arch': None,
'uid': None,
'progress_bars': None,
'colors': None,
})
@@ -95,7 +93,7 @@ def merge_configs(conf_new: Mapping[str, dict], conf_base={}, warn_missing_defau
continue
logging.debug(f'Parsing config section "{outer_name}"')
# check if outer_conf is a dict
if not (isinstance(outer_conf, (dict, DictScheme))):
if not (isinstance(outer_conf, (dict, DataClass))):
parsed[outer_name] = outer_conf
else:
# init section
@@ -257,7 +255,7 @@ class ConfigStateHolder:
profile = self.get_profile(profile_name)
if field not in profile or not profile[field]:
m = (f'Profile "{profile_name}" has no {field.upper()} configured.\n'
f'Please run `kupferbootstrap config profile init {profile_name}`{arch_hint}')
f'Please run `kupferbootstrap config profile init {field}`{arch_hint}')
raise Exception(m)
return profile

config/test_config.py

@@ -7,9 +7,9 @@ import toml
from tempfile import mktemp, gettempdir as get_system_tempdir
from typing import Any, Optional
from kupferbootstrap.config.profile import PROFILE_DEFAULTS
from kupferbootstrap.config.scheme import Config, Profile
from kupferbootstrap.config.state import CONFIG_DEFAULTS, ConfigStateHolder
from config.profile import PROFILE_DEFAULTS
from config.scheme import Config, Profile
from config.state import CONFIG_DEFAULTS, ConfigStateHolder
def get_filename():
@@ -157,7 +157,7 @@ def test_config_save_modified(configstate_emptyfile: ConfigStateHolder):
def get_config_scheme(data: dict[str, Any], validate=True, allow_incomplete=False) -> Config:
"""
helper func to ignore a false type error.
for some reason, mypy argues about DictScheme.fromDict() instead of Config.fromDict() here
for some reason, mypy argues about DataClass.fromDict() instead of Config.fromDict() here
"""
return Config.fromDict(data, validate=validate, allow_incomplete=allow_incomplete) # type: ignore[call-arg]

constants.py

@@ -1,9 +1,9 @@
from .typehelpers import TypeAlias
from typing_extensions import TypeAlias
FASTBOOT = 'fastboot'
FLASH_PARTS = {
'FULL': 'full',
'ABOOT': 'abootimg',
'ROOTFS': 'rootfs',
'ABOOT': 'aboot',
'LK2ND': 'lk2nd',
'QHYPSTUB': 'qhypstub',
}
@@ -24,12 +24,7 @@ BASE_PACKAGES: list[str] = BASE_LOCAL_PACKAGES + [
'vim',
]
POST_INSTALL_CMDS = [
'kupfer-config apply',
'kupfer-config --user apply',
]
REPOS_CONFIG_FILE = "repos.yml"
POST_CMDS = ['kupfer-config apply']
REPOSITORIES = [
'boot',
@@ -42,8 +37,7 @@ REPOSITORIES = [
]
DEFAULT_PACKAGE_BRANCH = 'dev'
KUPFER_BRANCH_MARKER = '%kupfer_branch%'
KUPFER_HTTPS_BASE = f'https://gitlab.com/kupfer/packages/prebuilts/-/raw/{KUPFER_BRANCH_MARKER}'
KUPFER_HTTPS_BASE = 'https://gitlab.com/kupfer/packages/prebuilts/-/raw/%branch%'
KUPFER_HTTPS = KUPFER_HTTPS_BASE + '/$arch/$repo'
Arch: TypeAlias = str
@@ -89,7 +83,7 @@ COMPILE_ARCHES: dict[Arch, str] = {
GCC_HOSTSPECS: dict[DistroArch, dict[TargetArch, str]] = {
'x86_64': {
'x86_64': 'x86_64-pc-linux-gnu',
'aarch64': 'aarch64-unknown-linux-gnu',
'aarch64': 'aarch64-linux-gnu',
'armv7h': 'arm-unknown-linux-gnueabihf'
},
'aarch64': {

dataclass.py (new file, 102 lines)

@@ -0,0 +1,102 @@
from __future__ import annotations
from dataclasses import dataclass
from munch import Munch
from typing import ClassVar, Optional, Union, Mapping, Any, get_type_hints, get_origin, get_args, Iterable
from types import UnionType
def munchclass(*args, init=False, **kwargs):
return dataclass(*args, init=init, slots=True, **kwargs)
def resolve_type_hint(hint: type) -> Iterable[type]:
origin = get_origin(hint)
args: Iterable[type] = get_args(hint)
if origin is Optional:
args = set(list(args) + [type(None)])
if origin in [Union, UnionType, Optional]:
results: list[type] = []
for arg in args:
results += resolve_type_hint(arg)
return results
return [origin or hint]
class DataClass(Munch):
_type_hints: ClassVar[dict[str, Any]]
def __init__(self, d: dict = {}, validate: bool = True, **kwargs):
self.update(d | kwargs, validate=validate)
@classmethod
def transform(cls, values: Mapping[str, Any], validate: bool = True, allow_extra: bool = False) -> Any:
results = {}
values = dict(values)
for key in list(values.keys()):
value = values.pop(key)
type_hints = cls._type_hints
if key in type_hints:
_classes = tuple[type](resolve_type_hint(type_hints[key]))
optional = type(None) in _classes
if issubclass(_classes[0], dict):
assert isinstance(value, dict) or optional
target_class = _classes[0]
if target_class is dict:
target_class = Munch
if not isinstance(value, target_class):
if not (optional and value is None):
assert issubclass(target_class, Munch)
# despite the above assert, mypy doesn't seem to understand target_class is a Munch here
kwargs = {'validate': validate} if issubclass(target_class, DataClass) else {}
value = target_class.fromDict(value, **kwargs) # type:ignore[attr-defined]
# handle numerics
elif set(_classes).intersection([int, float]) and isinstance(value, str) and str not in _classes:
parsed_number = None
parsers: list[tuple[type, list]] = [(int, [10]), (int, [0]), (float, [])]
for _cls, args in parsers:
if _cls not in _classes:
continue
try:
parsed_number = _cls(value, *args)
break
except ValueError:
continue
if parsed_number is None:
if validate:
raise Exception(f"Couldn't parse string value {repr(value)} for key '{key}' into number formats: " +
(', '.join(list(c.__name__ for c in _classes))))
else:
value = parsed_number
if validate:
if not isinstance(value, _classes):
raise Exception(f'key "{key}" has value of wrong type! expected: '
f'{" ,".join([ c.__name__ for c in _classes])}; '
f'got: {type(value).__name__}; value: {value}')
elif validate and not allow_extra:
raise Exception(f'Unknown key "{key}"')
else:
if isinstance(value, dict) and not isinstance(value, Munch):
value = Munch.fromDict(value)
results[key] = value
if values:
if validate:
raise Exception(f'values contained unknown keys: {list(values.keys())}')
results |= values
return results
@classmethod
def fromDict(cls, values: Mapping[str, Any], validate: bool = True):
return cls(**cls.transform(values, validate))
def update(self, d: Mapping[str, Any], validate: bool = True):
Munch.update(self, type(self).transform(d, validate))
def __init_subclass__(cls):
super().__init_subclass__()
cls._type_hints = {name: hint for name, hint in get_type_hints(cls).items() if get_origin(hint) is not ClassVar}
def __repr__(self):
return f'{type(self)}{dict.__repr__(self.toDict())}'
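A small usage sketch for the machinery above (ExampleSection is invented; Optional comes from the typing import at the top of the file):

```python
# Invented section class to show DataClass.transform() validation and coercion.
@munchclass()
class ExampleSection(DataClass):
    threads: int
    name: str
    parent: Optional[str]

s = ExampleSection.fromDict({"threads": "4", "name": "demo", "parent": None})
assert s.threads == 4  # numeric strings are coerced via the parsers list above
ExampleSection.fromDict({"name": "x", "bogus": 1})  # raises Exception: Unknown key "bogus"
```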

devices/cli.py (new file, 13 lines)

@@ -0,0 +1,13 @@
import click
from .device import get_devices
@click.command(name='devices')
def cmd_devices():
'list the available devices and descriptions'
devices = get_devices()
if not devices:
raise Exception("No devices found!")
for d in sorted(devices.keys()):
print(devices[d])

devices/device.py

@@ -3,15 +3,15 @@ import os
from typing import Optional
from kupferbootstrap.config.state import config
from kupferbootstrap.constants import Arch, ARCHES
from kupferbootstrap.dictscheme import DictScheme
from kupferbootstrap.distro.distro import get_kupfer_local
from kupferbootstrap.distro.package import LocalPackage
from kupferbootstrap.packages.pkgbuild import Pkgbuild, _pkgbuilds_cache, discover_pkgbuilds, get_pkgbuild_by_path, init_pkgbuilds
from kupferbootstrap.utils import read_files_from_tar, color_str
from config.state import config
from constants import Arch, ARCHES
from config.scheme import DataClass, munchclass
from distro.distro import get_kupfer_local
from distro.package import LocalPackage
from packages.pkgbuild import Pkgbuild, _pkgbuilds_cache, discover_pkgbuilds, get_pkgbuild_by_path, init_pkgbuilds
from utils import read_files_from_tar
from .deviceinfo import DEFAULT_IMAGE_SECTOR_SIZE, DeviceInfo, parse_deviceinfo
from .deviceinfo import DeviceInfo, parse_deviceinfo
DEVICE_DEPRECATIONS = {
"oneplus-enchilada": "sdm845-oneplus-enchilada",
@@ -22,57 +22,21 @@ DEVICE_DEPRECATIONS = {
}
class DeviceSummary(DictScheme):
name: str
description: str
arch: str
package_name: Optional[str]
package_path: Optional[str]
def nice_str(self, newlines: bool = False, colors: bool = False) -> str:
separator = '\n' if newlines else ', '
assert bool(self.package_path) == bool(self.package_name)
package_path = {"Package Path": self.package_path} if self.package_path else {}
fields = {
"Device": self.name,
"Description": self.description or f"[no package {'description' if self.package_name else 'associated (?!)'} and deviceinfo not parsed]",
"Architecture": self.arch,
"Package Name": self.package_name or "no package associated. PROBABLY A BUG!",
**package_path,
}
return separator.join([f"{color_str(name, bold=True, use_colors=colors)}: {value}" for name, value in fields.items()])
class Device(DictScheme):
@munchclass()
class Device(DataClass):
name: str
arch: Arch
package: Pkgbuild
deviceinfo: Optional[DeviceInfo]
def __repr__(self):
return f'Device<{self.name},{self.arch},{self.package.path if self.package else "[no package]"}>'
return (f'Device "{self.name}": "{self.package.description if self.package else ""}", '
f'Architecture: {self.arch}, package: {self.package.name if self.package else "??? PROBABLY A BUG!"}')
def __str__(self):
return self.nice_str(newlines=True)
def nice_str(self, *args, **kwargs) -> str:
return self.get_summary().nice_str(*args, **kwargs)
def get_summary(self) -> DeviceSummary:
result: dict[str, Optional[str]] = {}
description = ((self.package.description if self.package else "").strip() or
(self.deviceinfo.get("name", "[No name in deviceinfo]") if self.deviceinfo else "")).strip()
result["name"] = self.name
result["description"] = description
result["arch"] = self.arch
result["package_name"] = self.package.name if self.package else None
result["package_path"] = self.package.path if self.package else None
return DeviceSummary(result)
def parse_deviceinfo(self, try_download: bool = True, lazy: bool = True) -> DeviceInfo:
def parse_deviceinfo(self, try_download: bool = True, lazy: bool = True):
if not lazy or 'deviceinfo' not in self or self.deviceinfo is None:
# avoid import loop
from kupferbootstrap.packages.build import check_package_version_built
from packages.build import check_package_version_built
is_built = check_package_version_built(self.package, self.arch, try_download=try_download)
if not is_built:
raise Exception(f"device package {self.package.name} for device {self.name} couldn't be acquired!")
@@ -96,16 +60,8 @@ class Device(DictScheme):
assert info.arch
assert info.arch == self.arch
self['deviceinfo'] = info
assert self.deviceinfo
return self.deviceinfo
def get_image_sectorsize(self, **kwargs) -> Optional[int]:
"""Gets the deviceinfo_rootfs_image_sector_size if defined, otherwise None"""
return self.parse_deviceinfo(**kwargs).get('rootfs_image_sector_size', None)
def get_image_sectorsize_default(self, **kwargs) -> int:
return self.get_image_sectorsize(**kwargs) or DEFAULT_IMAGE_SECTOR_SIZE
def check_devicepkg_name(name: str, log_level: Optional[int] = None):
valid = True
@@ -135,20 +91,6 @@ def parse_device_pkg(pkgbuild: Pkgbuild) -> Device:
return Device(name=name, arch=arch, package=pkgbuild, deviceinfo=None)
def sanitize_device_name(name: str, warn: bool = True) -> str:
if name not in DEVICE_DEPRECATIONS:
return name
warning = f"Deprecated device {name}"
replacement = DEVICE_DEPRECATIONS[name]
if replacement:
warning += (f': Device has been renamed to {replacement}! Please adjust your profile config!\n'
'This will become an error in a future version!')
name = replacement
if warn:
logging.warning(warning)
return name
_device_cache: dict[str, Device] = {}
_device_cache_populated: bool = False
@@ -173,7 +115,14 @@ def get_devices(pkgbuilds: Optional[dict[str, Pkgbuild]] = None, lazy: bool = Tr
def get_device(name: str, pkgbuilds: Optional[dict[str, Pkgbuild]] = None, lazy: bool = True, scan_all=False) -> Device:
global _device_cache, _device_cache_populated
assert lazy or pkgbuilds
name = sanitize_device_name(name)
if name in DEVICE_DEPRECATIONS:
warning = f"Deprecated device {name}"
replacement = DEVICE_DEPRECATIONS[name]
if replacement:
warning += (f': Device has been renamed to {replacement}! Please adjust your profile config!\n'
'This will become an error in a future version!')
name = replacement
logging.warning(warning)
if lazy and name in _device_cache:
return _device_cache[name]
if scan_all:
@@ -197,7 +146,7 @@ def get_device(name: str, pkgbuilds: Optional[dict[str, Pkgbuild]] = None, lazy:
if not os.path.exists(os.path.join(config.get_path('pkgbuilds'), relative_path)):
logging.debug(f'Exact device pkgbuild path "pkgbuilds/{relative_path}" doesn\'t exist, scanning entire repo')
return get_device(name, pkgbuilds=pkgbuilds, lazy=lazy, scan_all=True)
pkgbuild = [p for p in get_pkgbuild_by_path(relative_path, lazy=lazy) if p.name == pkgname][0]
pkgbuild = [p for p in get_pkgbuild_by_path(relative_path, lazy=lazy, _config=config) if p.name == pkgname][0]
device = parse_device_pkg(pkgbuild)
if lazy:
_device_cache[name] = device

devices/deviceinfo.py

@@ -5,20 +5,18 @@ import copy
import logging
import os
from typing import Mapping, Optional
from typing import Mapping
from kupferbootstrap.config.state import config
from kupferbootstrap.constants import Arch
from kupferbootstrap.dictscheme import DictScheme
from config.state import config
from constants import Arch
from dataclass import DataClass
PMOS_ARCHES_OVERRIDES: dict[str, Arch] = {
"armv7": 'armv7h',
}
DEFAULT_IMAGE_SECTOR_SIZE = 512
class DeviceInfo(DictScheme):
class DeviceInfo(DataClass):
arch: Arch
name: str
manufacturer: str
@@ -26,12 +24,10 @@ class DeviceInfo(DictScheme):
chassis: str
flash_pagesize: int
flash_method: str
rootfs_image_sector_size: Optional[int]
@classmethod
def transform(cls, values: Mapping[str, Optional[str]], **kwargs):
kwargs = {'allow_extra': True} | kwargs
return super().transform(values, **kwargs)
def transform(cls, values: Mapping[str, str], validate: bool = True, allow_extra: bool = True):
return super().transform(values, validate=validate, allow_extra=allow_extra)
# Variables from deviceinfo. Reference: <https://postmarketos.org/deviceinfo>
@@ -119,7 +115,7 @@ deviceinfo_chassis_types = [
]
def sanity_check(deviceinfo: dict[str, Optional[str]], device_name: str):
def sanity_check(deviceinfo: dict[str, str], device_name: str):
try:
_pmos_sanity_check(deviceinfo, device_name)
except RuntimeError as err:
@@ -133,7 +129,7 @@ def sanity_check(deviceinfo: dict[str, Optional[str]], device_name: str):
f"{err}")
def _pmos_sanity_check(info: dict[str, Optional[str]], device_name: str):
def _pmos_sanity_check(info: dict[str, str], device_name: str):
# Resolve path for more readable error messages
path = os.path.join(config.get_path('pkgbuilds'), 'device', device_name, 'deviceinfo')
@@ -198,19 +194,22 @@ def _pmos_sanity_check(info: dict[str, Optional[str]], device_name: str):
f" and try again: {path}")
def parse_kernel_suffix(deviceinfo: dict[str, Optional[str]], kernel: str = 'mainline') -> dict[str, Optional[str]]:
def parse_kernel_suffix(deviceinfo: dict[str, str], kernel: str = 'mainline') -> dict[str, str]:
"""
Remove the kernel suffix (as selected in 'pmbootstrap init') from
deviceinfo variables. Related:
https://wiki.postmarketos.org/wiki/Device_specific_package#Multiple_kernels
:param info: deviceinfo dict, e.g.:
{"a": "first", "b_mainline": "second", "b_downstream": "third"}
{"a": "first",
"b_mainline": "second",
"b_downstream": "third"}
:param device: which device info belongs to
:param kernel: which kernel suffix to remove (e.g. "mainline")
:returns: info, but with the configured kernel suffix removed, e.g:
{"a": "first", "b": "second", "b_downstream": "third"}
{"a": "first",
"b": "second",
"b_downstream": "third"}
"""
# Do nothing if the configured kernel isn't available in the kernel (e.g.
# after switching from device with multiple kernels to device with only one
@@ -241,7 +240,7 @@ def parse_deviceinfo(deviceinfo_lines: list[str], device_name: str, kernel='main
:param device: defaults to args.device
:param kernel: defaults to args.kernel
"""
info: dict[str, Optional[str]] = {}
info = {}
for line in deviceinfo_lines:
line = line.strip()
if line.startswith("#") or not line:
@@ -259,12 +258,12 @@ def parse_deviceinfo(deviceinfo_lines: list[str], device_name: str, kernel='main
# Assign empty string as default
for key in deviceinfo_attributes:
if key not in info:
info[key] = None
info[key] = ""
info = parse_kernel_suffix(info, kernel)
sanity_check(info, device_name)
if 'arch' in info:
arch = info['arch']
info['arch'] = PMOS_ARCHES_OVERRIDES.get(arch, arch) # type: ignore[arg-type]
info['arch'] = PMOS_ARCHES_OVERRIDES.get(arch, arch)
dev = DeviceInfo.fromDict(info)
return dev
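For illustration, deviceinfo files are shell-style key="value" assignments; a minimal invented sample of what parse_deviceinfo consumes (the sanity checks may require more attributes than shown here):

```python
# Invented sample input; the full attribute list is documented at
# https://postmarketos.org/deviceinfo
deviceinfo_lines = [
    'deviceinfo_name="OnePlus 6"',
    'deviceinfo_manufacturer="OnePlus"',
    'deviceinfo_arch="aarch64"',
    'deviceinfo_chassis="handset"',
]
info = parse_deviceinfo(deviceinfo_lines, device_name="sdm845-oneplus-enchilada")
```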

devices/test_device.py

@@ -4,8 +4,8 @@ import os
from copy import copy
from kupferbootstrap.config.state import ConfigStateHolder, config
from kupferbootstrap.packages.pkgbuild import init_pkgbuilds, discover_pkgbuilds, Pkgbuild, parse_pkgbuild
from config.state import ConfigStateHolder, config
from packages.pkgbuild import init_pkgbuilds, discover_pkgbuilds, Pkgbuild, parse_pkgbuild
from .device import Device, DEVICE_DEPRECATIONS, get_device, get_devices, parse_device_pkg, check_devicepkg_name
@@ -42,7 +42,7 @@ ONEPLUS_ENCHILADA_PKG = f'device-{ONEPLUS_ENCHILADA}'
def enchilada_pkgbuild(initialise_pkgbuilds_dir: ConfigStateHolder):
config = initialise_pkgbuilds_dir
config.try_load_file()
return parse_pkgbuild(os.path.join('device', ONEPLUS_ENCHILADA_PKG))[0]
return parse_pkgbuild(os.path.join('device', ONEPLUS_ENCHILADA_PKG), _config=config)[0]
def validate_oneplus_enchilada(d: Device):

devices/__init__.py

@@ -1,4 +1,4 @@
from kupferbootstrap.config.state import config
from config.state import config
from .deviceinfo import DeviceInfo, parse_deviceinfo
from .device import get_device

distro/distro.py (new file, 129 lines)

@@ -0,0 +1,129 @@
from typing import Generic, Mapping, Optional, TypeVar
from constants import Arch, ARCHES, BASE_DISTROS, REPOSITORIES, KUPFER_HTTPS, CHROOT_PATHS
from generator import generate_pacman_conf_body
from config.state import config
from .repo import BinaryPackageType, RepoInfo, Repo, LocalRepo, RemoteRepo
RepoType = TypeVar('RepoType', bound=Repo)
class Distro(Generic[RepoType]):
repos: Mapping[str, RepoType]
arch: str
def __init__(self, arch: Arch, repo_infos: dict[str, RepoInfo], scan=False):
assert (arch in ARCHES)
self.arch = arch
self.repos = dict[str, RepoType]()
for repo_name, repo_info in repo_infos.items():
self.repos[repo_name] = self._create_repo(
name=repo_name,
arch=arch,
url_template=repo_info.url_template,
options=repo_info.options,
scan=scan,
)
def _create_repo(self, **kwargs) -> RepoType:
raise NotImplementedError()
Repo(**kwargs)
def get_packages(self) -> dict[str, BinaryPackageType]:
""" get packages from all repos, semantically overlaying them"""
results = dict[str, BinaryPackageType]()
for repo in list(self.repos.values())[::-1]:
assert repo.packages is not None
results.update(repo.packages)
return results
def repos_config_snippet(self, extra_repos: Mapping[str, RepoInfo] = {}) -> str:
extras: list[Repo] = [
Repo(name, url_template=info.url_template, arch=self.arch, options=info.options, scan=False) for name, info in extra_repos.items()
]
return '\n\n'.join(repo.config_snippet() for repo in (extras + list(self.repos.values())))
def get_pacman_conf(self, extra_repos: Mapping[str, RepoInfo] = {}, check_space: bool = True, in_chroot: bool = True):
body = generate_pacman_conf_body(self.arch, check_space=check_space)
return body + self.repos_config_snippet(extra_repos)
def scan(self, lazy=True):
for repo in self.repos.values():
if not (lazy and repo.scanned):
repo.scan()
def is_scanned(self):
for repo in self.repos.values():
if not repo.scanned:
return False
return True
class LocalDistro(Distro[LocalRepo]):
def _create_repo(self, **kwargs) -> LocalRepo:
return LocalRepo(**kwargs)
class RemoteDistro(Distro[RemoteRepo]):
def _create_repo(self, **kwargs) -> RemoteRepo:
return RemoteRepo(**kwargs)
def get_base_distro(arch: str) -> RemoteDistro:
repos = {name: RepoInfo(url_template=url) for name, url in BASE_DISTROS[arch]['repos'].items()}
return RemoteDistro(arch=arch, repo_infos=repos, scan=False)
def get_kupfer(arch: str, url_template: str, scan: bool = False) -> Distro:
repos = {name: RepoInfo(url_template=url_template, options={'SigLevel': 'Never'}) for name in REPOSITORIES}
remote = not url_template.startswith('file://')
clss = RemoteDistro if remote else LocalDistro
distro = clss(
arch=arch,
repo_infos=repos,
scan=scan,
)
assert isinstance(distro, (LocalDistro, RemoteDistro))
return distro
_kupfer_https = dict[Arch, RemoteDistro]()
_kupfer_local = dict[Arch, LocalDistro]()
_kupfer_local_chroots = dict[Arch, LocalDistro]()
def get_kupfer_url(url: str = KUPFER_HTTPS, branch: Optional[str] = None) -> str:
"""gets the repo URL for `branch`, getting branch from config if `None` is passed."""
branch = config.file.pacman.repo_branch if branch is None else branch
return url.replace('%branch%', branch)
def get_kupfer_https(arch: Arch, scan: bool = False) -> RemoteDistro:
global _kupfer_https
if arch not in _kupfer_https or not _kupfer_https[arch]:
kupfer = get_kupfer(arch, get_kupfer_url(), scan)
assert isinstance(kupfer, RemoteDistro)
_kupfer_https[arch] = kupfer
item = _kupfer_https[arch]
if scan and not item.is_scanned():
item.scan()
return item
def get_kupfer_local(arch: Optional[Arch] = None, in_chroot: bool = True, scan: bool = False) -> LocalDistro:
global _kupfer_local, _kupfer_local_chroots
cache = _kupfer_local_chroots if in_chroot else _kupfer_local
arch = arch or config.runtime.arch
assert arch
if arch not in cache or not cache[arch]:
dir = CHROOT_PATHS['packages'] if in_chroot else config.get_path('packages')
kupfer = get_kupfer(arch, f"file://{dir}/$arch/$repo")
assert isinstance(kupfer, LocalDistro)
cache[arch] = kupfer
item = cache[arch]
if scan and not item.is_scanned():
item.scan()
return item
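A usage sketch for the module above (arch and branch values are illustrative):

```python
# Illustrative only: resolve the HTTPS repo URL for a branch, then scan a distro.
url = get_kupfer_url(branch="dev")      # substitutes %branch% in KUPFER_HTTPS
remote = get_kupfer_https("aarch64")    # cached RemoteDistro per arch
remote.scan(lazy=True)                  # fetch and parse each repo's .db once
packages = remote.get_packages()        # earlier-listed repos take precedence
```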

distro/package.py

@@ -2,10 +2,10 @@ import logging
import os
from shutil import copyfileobj
from typing import Optional, Union
from typing import Optional
from urllib.request import urlopen
from kupferbootstrap.exec.file import get_temp_dir, makedir
from exec.file import get_temp_dir, makedir
class PackageInfo:
@@ -17,7 +17,7 @@ class BinaryPackage(PackageInfo):
arch: str
filename: str
resolved_url: Optional[str]
_desc: Optional[dict[str, Union[str, list[str]]]]
_desc: Optional[dict[str, str]]
def __init__(
self,
@@ -39,25 +39,17 @@ class BinaryPackage(PackageInfo):
@classmethod
def parse_desc(clss, desc_str: str, resolved_repo_url=None):
"""Parses a desc file, returning a PackageInfo"""
desc: dict[str, Union[str, list[str]]] = {}
for segment in f'\n{desc_str}'.split('\n%'):
if not segment.strip():
continue
key, elements = (e.strip() for e in segment.strip().split('%\n', 1))
elements_split = elements.split('\n')
desc[key] = elements if len(elements_split) == 1 else elements_split
validated: dict[str, str] = {}
for key in ['NAME', 'VERSION', 'ARCH', 'FILENAME']:
assert key in desc
value = desc[key]
assert isinstance(value, str)
validated[key] = value
pruned_lines = ([line.strip() for line in desc_str.split('%') if line.strip()])
desc = {}
for key, value in zip(pruned_lines[0::2], pruned_lines[1::2]):
desc[key.strip()] = value.strip()
p = clss(
name=validated['NAME'],
version=validated['VERSION'],
arch=validated['ARCH'],
filename=validated['FILENAME'],
resolved_url='/'.join([resolved_repo_url, validated['FILENAME']]),
name=desc['NAME'],
version=desc['VERSION'],
arch=desc['ARCH'],
filename=desc['FILENAME'],
resolved_url='/'.join([resolved_repo_url, desc['FILENAME']]),
)
p._desc = desc
return p
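For context on parse_desc: pacman repo databases contain one desc file per package, with %KEY% headers each followed by value lines. An invented sample run through the parser above:

```python
# Invented package data in the pacman repo-db "desc" format.
sample = """%FILENAME%
hello-0.1-1-aarch64.pkg.tar.zst

%NAME%
hello

%VERSION%
0.1-1

%ARCH%
aarch64
"""
pkg = BinaryPackage.parse_desc(sample, resolved_repo_url="https://example.invalid/aarch64/main")
# pkg.resolved_url == "https://example.invalid/aarch64/main/hello-0.1-1-aarch64.pkg.tar.zst"
```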

distro/repo.py

@@ -2,13 +2,11 @@ from copy import deepcopy
import logging
import os
import tarfile
import tempfile
import urllib.request
from typing import Generic, TypeVar
from kupferbootstrap.config.state import config
from kupferbootstrap.exec.file import get_temp_dir
from kupferbootstrap.utils import download_file
from .package import BinaryPackage, LocalPackage, RemotePackage
BinaryPackageType = TypeVar('BinaryPackageType', bound=BinaryPackage)
@@ -22,12 +20,12 @@ def resolve_url(url_template, repo_name: str, arch: str):
class RepoInfo:
options: dict[str, str]
options: dict[str, str] = {}
url_template: str
def __init__(self, url_template: str, options: dict[str, str] = {}):
self.url_template = url_template
self.options = {} | options
self.options.update(options)
class Repo(RepoInfo, Generic[BinaryPackageType]):
@@ -41,39 +39,21 @@ class Repo(RepoInfo, Generic[BinaryPackageType]):
def resolve_url(self) -> str:
return resolve_url(self.url_template, repo_name=self.name, arch=self.arch)
def scan(self, allow_failure: bool = False) -> bool:
failed = False
def scan(self):
self.resolved_url = self.resolve_url()
self.remote = not self.resolved_url.startswith('file://')
try:
path = self.acquire_db_file()
index = tarfile.open(path)
except Exception as ex:
if not allow_failure:
raise ex
logging.error(f"Repo {self.name}, {self.arch}: Error acquiring repo DB: {ex!r}")
return False
path = self.acquire_db_file()
logging.debug(f'Parsing repo file at {path}')
for node in index.getmembers():
if os.path.basename(node.name) == 'desc':
pkgname = os.path.dirname(node.name)
logging.debug(f'Parsing desc file for {pkgname}')
fd = index.extractfile(node)
assert fd
contents = fd.read().decode()
try:
pkg = self._parse_desc(contents)
except Exception as ex:
if not allow_failure:
raise ex
logging.error(f'Repo {self.name}, {self.arch}: Error parsing desc for "{pkgname}": {ex!r}')
failed = True
continue
self.packages[pkg.name] = pkg
if failed:
return False
with tarfile.open(path) as index:
for node in index.getmembers():
if os.path.basename(node.name) == 'desc':
logging.debug(f'Parsing desc file for {os.path.dirname(node.name)}')
fd = index.extractfile(node)
assert fd
pkg = self._parse_desc(fd.read().decode())
self.packages[pkg.name] = pkg
self.scanned = True
return True
def _parse_desc(self, desc_text: str): # can't annotate the type properly :(
raise NotImplementedError()
@@ -114,11 +94,6 @@ class LocalRepo(Repo[LocalPackage]):
class RemoteRepo(Repo[RemotePackage]):
cache_repo_db: bool
def __init__(self, *kargs, cache_repo_db: bool = False, **kwargs):
self.cache_repo_db = cache_repo_db
super().__init__(*kargs, **kwargs)
def _parse_desc(self, desc_text: str) -> RemotePackage:
return RemotePackage.parse_desc(desc_text, resolved_repo_url=self.resolved_url)
@@ -126,9 +101,8 @@ class RemoteRepo(Repo[RemotePackage]):
def acquire_db_file(self) -> str:
uri = f'{self.resolved_url}/{self.name}.db'
logging.info(f'Downloading repo file from {uri}')
assert self.arch and self.name, f"repo has incomplete information: {self.name=}, {self.arch=}"
path = get_temp_dir() if not self.cache_repo_db else os.path.join(config.get_path('pacman'), 'repo_dbs', self.arch)
os.makedirs(path, exist_ok=True)
repo_file = f'{path}/{self.name}.tar.gz'
download_file(repo_file, uri, update=True)
return repo_file
with urllib.request.urlopen(uri) as request:
fd, path = tempfile.mkstemp()
with open(fd, 'wb') as writable:
writable.write(request.read())
return path
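
One side of this hunk fetches the repo DB into a throwaway tempfile with urllib; the other caches it under the pacman config dir via `download_file` (whose exact behavior is only inferred from the call site here). A hedged sketch of the plain-urllib variant, with an explicit timeout added for robustness:

```python
import tempfile
import urllib.request

def fetch_db(resolved_url: str, name: str) -> str:
    """Download '{name}.db' from resolved_url into a tempfile, return its path."""
    uri = f'{resolved_url}/{name}.db'
    # timeout is an addition here; the original call used the default
    with urllib.request.urlopen(uri, timeout=30) as response:
        fd, path = tempfile.mkstemp(suffix='.tar.gz')
        with open(fd, 'wb') as writable:
            writable.write(response.read())
    return path
```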

View File

@@ -1 +0,0 @@
BUILD

1
docs/.gitignore vendored
View File

@@ -2,7 +2,6 @@
.doctrees
html
source/cli
source/code
checkouts
versions
archived

View File

@@ -14,7 +14,7 @@ cleanbuild:
@$(MAKE) html
clean:
rm -rf html source/cli source/code .buildinfo .doctrees versions checkouts
rm -rf html source/cli .buildinfo .doctrees versions checkouts
html:
sphinx-build $(SPHINXARGS) $(buildargs) html

View File

@@ -1,7 +1,7 @@
# CLI Interface
```{eval-rst}
.. click:: kupferbootstrap.main:cli
.. click:: main:cli
:nested: none
:prog: kupferbootstrap

View File

@@ -6,13 +6,11 @@ orphan: true
only used to trigger builds of the submodule docs!
```{eval-rst}
.. currentmodule:: kupferbootstrap
.. autosummary::
:toctree: cli
:template: command.rst
:recursive:
binfmt
cache
chroot
config

View File

@@ -1,9 +0,0 @@
# Code
Code documentation is available here
```{toctree}
:glob: true
code/kupferbootstrap
```

View File

@@ -1,8 +0,0 @@
:nosearch:
:orphan:
.. autosummary::
:toctree: code
:recursive:
kupferbootstrap

View File

@@ -1,14 +1,10 @@
import logging
import os
from sphinx.config import getenv
from kupferbootstrap.utils import git
import sys
#sys.path.insert(0, os.path.abspath('../..'))
sys.path.insert(0, os.path.abspath('../..'))
extensions = [
'sphinx_click',
"sphinx.ext.autodoc",
'sphinx.ext.autosummary',
"sphinx.ext.linkcode",
'sphinx.ext.autosummary', # Create neat summary tables
'myst_parser'
]
myst_all_links_external = True
@@ -33,45 +29,4 @@ html_theme_options = {
"color-brand-content": "#eba38d",
"color-problematic": "#ff7564",
},
"source_repository": "https://gitlab.com/kupfer/kupferbootstrap",
"source_directory": "docs/source/",
}
autosummary_generate = True
autodoc_default_options = {
"members": True,
"undoc-members": True,
"show-inheritance": True,
"inherited-members": True,
}
autodoc_preserve_defaults = True
def get_version():
try:
res = git(
["rev-parse", "HEAD"],
dir=os.path.join(os.path.dirname(__file__), "../.."),
use_git_dir=True,
capture_output=True,
)
res.check_returncode()
ver = res.stdout.decode().strip()
logging.info(f"Detected git {ver=}")
return ver
except Exception as ex:
logging.warning("Couldn't get git branch:", exc_info=ex)
return "HEAD"
version = getenv("version") or get_version()
def linkcode_resolve(domain, info):
if domain != 'py':
return None
if not info['module']:
return None
filename = info['module'].replace('.', '/')
return "%s/-/blob/%s/src/%s.py" % (html_theme_options["source_repository"], version, filename)

View File

@@ -2,14 +2,10 @@
Kupferbootstrap uses [toml](https://en.wikipedia.org/wiki/TOML) for its configuration file.
The file can either be edited manually or managed via the [`kupferbootstrap config`](../../cli/config) subcommand.
The file can either be edited manually or managed via the {doc}`cli/config` subcommand.
```{hint}
You can quickly generate a default config by running {code}`kupferbootstrap config init -N`.
For an interactive dialogue, omit the `-N`.
```
## File Location
The configuration is stored in `~/.config/kupfer/kupferbootstrap.toml`, where `~` is your user's home folder.
@@ -58,7 +54,7 @@ This allows you to easily keep a number of slight variations of the same target
without the need to constantly modify your Kupferbootstrap configuration file.
You can easily create new profiles with
[kupferbootstrap config profile init](../../cli/config/#kupferbootstrap-config-profile-init).
[kupferbootstrap config profile init](../cli/config/#kupferbootstrap-config-profile-init).
Here's an example:
@@ -68,7 +64,7 @@ current = "graphical"
[profiles.default]
parent = ""
device = "sdm845-oneplus-enchilada"
device = "oneplus-enchilada"
flavour = "barebone"
pkgs_include = [ "wget", "rsync", "nano", "tmux", "zsh", "pv", ]
pkgs_exclude = []
@@ -93,7 +89,7 @@ flavour = "debug-shell"
[profiles.beryllium]
parent = "graphical"
device = "sdm845-xiaomi-beryllium-ebbg"
device = "xiaomi-beryllium-ebbg"
flavour = "gnome"
hostname = "pocof1"
```
@@ -101,7 +97,7 @@ hostname = "pocof1"
The `current` key in the `profiles` section controls which profile gets used by Kupferbootstrap by default.
The first subsection (`profiles.default`) describes the `default` profile
which gets created by [`kupferbootstrap config init`](../../cli/config/#kupferbootstrap-config-init).
which gets created by [config init](../cli/config/#kupferbootstrap-config-init).
Next, we have a `graphical` profile that defines a couple of graphical programs for all but the `recovery` profile,
since that doesn't have a GUI.
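
The docs describe profiles inheriting settings from a `parent`. A minimal sketch of how such a chain could be flattened (the resolution logic is assumed for illustration, not taken from the kupferbootstrap source; some values are simplified):

```python
profiles = {
    'default':   {'parent': '', 'device': 'sdm845-oneplus-enchilada', 'flavour': 'barebone'},
    'graphical': {'parent': 'default', 'flavour': 'phosh'},
    'beryllium': {'parent': 'graphical', 'device': 'sdm845-xiaomi-beryllium-ebbg', 'hostname': 'pocof1'},
}

def resolve(name: str) -> dict:
    profile = profiles[name]
    parent = profile.get('parent')
    base = resolve(parent) if parent else {}
    # child keys override inherited ones; 'parent' itself is dropped
    return base | {k: v for k, v in profile.items() if k != 'parent'}

assert resolve('beryllium')['flavour'] == 'phosh'   # inherited from 'graphical'
```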

View File

@@ -1,2 +0,0 @@
Module Index
============

View File

@@ -6,8 +6,7 @@ a tool to build and flash packages and images for the [Kupfer](https://gitlab.co
## Documentation pages
```{toctree}
usage/index
install
config
cli
code
genindex
```

View File

@@ -1,36 +0,0 @@
{% set reduced_name = fullname.split(".", 1)[-1] if fullname.startswith("kupferbootstrap.") else fullname %}
{{ fullname | escape | underline }}
.. rubric:: Description
.. automodule:: {{ fullname }}
:members:
:undoc-members:
.. currentmodule:: {{ fullname }}
{% if classes %}
.. rubric:: Classes
.. autosummary::
:toctree: .
{% for class in classes %}
{{ class }}
{% endfor %}
{% endif %}
{% if functions %}
.. rubric:: Functions
.. autosummary::
:toctree: .
{% for function in functions %}
{{ function }}
{% endfor %}
{% endif %}

View File

@@ -1,9 +1,6 @@
{% set reduced_name = fullname.split(".", 1)[-1] if fullname.startswith("kupferbootstrap.") else fullname %}
.. title: {{reduced_name}}
.. title: {{fullname}}
.. currentmodule:: {{ fullname }}
.. click:: {% if fullname == 'main' %}kupferbootstrap.main:cli{% else %}{{fullname}}.cli:cmd_{{reduced_name}}{% endif %}
:prog: kupferbootstrap {{reduced_name}}
.. click:: {% if fullname == 'main' %}main:cli{% else %}{{fullname}}.cli:cmd_{{fullname}}{% endif %}
:prog: kupferbootstrap {{fullname}}
:nested: full

View File

@@ -1,39 +0,0 @@
# FAQ
```{contents} Table of Contents
:class: this-will-duplicate-information-and-it-is-still-useful-here
:depth: 3
```
## Which devices are currently supported?
Currently very few!
See [the `devices` repo](https://gitlab.com/kupfer/packages/pkgbuilds/-/tree/dev/device). We use the same codenames as [postmarketOS](https://wiki.postmarketos.org/wiki/Devices) (although we prefix them with the SoC)
## How to port a new device or package?
See [Porting](../porting)
## How to build a specific package
See also: The full [`kupferbootstrap packages build` docs](../../cli/packages#kupferbootstrap-packages-build)
### Example
For rebuilding `kupfer-config` and `crossdirect`, defaulting to your device's architecture
```sh
kupferbootstrap packages build [--force] [--arch $target_arch] kupfer-config crossdirect
```
### By package path
You can also use a path snippet (`$repo/$pkgbase`) to the PKGBUILD folder as seen inside your pkgbuilds.git:
```sh
kupferbootstrap packages build [--force] main/kupfer-config cross/crossdirect
```

View File

@@ -1,9 +0,0 @@
# Usage
```{toctree}
quickstart
faq
install
config
porting
```

View File

@@ -1,94 +0,0 @@
# Porting
## Porting devices
### Homework
Before you can get started porting a device, you'll need to do some research:
1. Familiarize yourself with git basics.
1. Familiarize yourself with Arch Linux packaging, i.e. `PKGBUILD`s and `makepkg`
1. Familiarize yourself with the postmarketOS port of the device.
```{warning}
If there is no postmarketOS port yet, you'll probably need to get deep into kernel development.
We suggest [starting with a port to pmOS](https://wiki.postmarketos.org/wiki/Porting_to_a_new_device) then, especially if you're not familiar with the process already.
```
### Porting
1. Navigate to your pkgbuilds checkout
1. Follow the [general package porting guidelines](#porting-packages) to create a device-, kernel- and probably also a firmware-package for the device and SoC. Usually this roughly means porting the postmarketOS APKBUILDs to our PKGBUILD scheme.
You can get inspiration by comparing existing Kupfer ports (e.g. one of the SDM845 devices) to the [postmarketOS packages](https://gitlab.com/postmarketOS/pmaports/-/tree/master/device) for that device.
Usually you should start out by copying and then customizing the Kupfer packages for a device that's as similar to yours as possible, i.e. uses the same or a related SoC, if something like that is already available in Kupfer.
```{hint} Package Repos:
Device packages belong into `device/`, kernels into `linux/` and firmware into `firmware/`.
```
1. When submitting your MR, please include some information:
- what you have found to be working, broken, and not tested (and why)
- any necessary instructions for testing
- whether you'd be willing to maintain the device long-term (test kernel upgrades, submit device package updates, etc.)
### Gotchas
Please be aware of these gotchas:
- As of now, Kupfer only really supports platforms using Android's `aboot` bootloader, i.e. ex-Android phones. In order to support other boot modes (e.g. uboot on the Librem5 and Pine devices), we'll need to port and switch to postmarketOS's [boot-deploy](https://gitlab.com/postmarketOS/boot-deploy) first and add support for EFI setups to Kupferbootstrap.
## Porting packages
### Homework
Before you can get started, you'll need to do some research:
1. Familiarize yourself with git basics.
1. Familiarize yourself with Arch Linux packaging, i.e. `PKGBUILD`s and `makepkg`
### Development
```{warning}
Throughout the process, use git to version your changes.
- Don't procrastinate using git or committing until you're "done" or "have got something working", you'll regret it.
- Don't worry about a "clean" git history while you're developing; we can squash it up later.
- \[Force-]Push your changes regularly, just like committing. Don't wait for perfection.
```
1. Create a new git branch for your package locally.
```{hint}
It might be a good idea to get into the habit of prefixing branch names with \[a part of] your username and a slash like so:
`myNickname/myFeatureName`
This makes it easier to work in the same remote repo with multiple people.
```
1.
```{note}
The pkgbuilds git repo contains multiple package repositories, represented by folders at the top level (`main`, `cross`, `phosh`, etc.).
```
Try to choose a sensible package repo for your new packages and create new folders for each `pkgbase` inside the repo folder.
1. Navigate into the folder of the new package and create a new `PKGBUILD`; fill it with life!
1. **`_mode`**: Add the build mode at the top of the PKGBUILD.
```{hint}
If you're unsure what to pick, go with `_mode=host`. It'll use `crossdirect` to get speeds close to proper cross-compiling.
```
This determines whether the package is built in a foreign-arch chroot (`_mode=host`) executed with qemu-user, or cross-compiled for real (`_mode=cross`) from a host-architecture chroot. The package's build tooling has to specifically support the latter, so it's mostly useful for kernels and uncompiled packages.
1. **`_nodeps`**: (Optional) If your package doesn't require its listed dependencies to build
(usually because you're packaging a meta-package or only configs or scripts)
you can add `_nodeps=true` as the next line after the `_mode=` line to speed up packaging.
`makedeps` are still installed anyway.
1. Test building it with `kupferbootstrap packages build $pkgbase`
1. For any files and git repos downloaded by your PKGBUILD,
add them to a new `.gitignore` file in the same directory as your `PKGBUILD`.
```{hint}
Don't forget to `git add` the new `.gitignore` file!
```
1. Run `kupferbootstrap packages check` to make sure the formatting for your PKGBUILDs is okay.
```{warning}
This is **not** optional. MRs with failing CI will **not** be merged.
```
### Pushing
1. Fork the Kupfer pkgbuilds repo on Gitlab using the Fork button
1. Add your fork's **SSH** URI to your local git repo as a **new remote**: `git remote add fork git@gitlab...`
1. `git push -u fork $branchname` it
### Submitting the MR
When you're ready, open a Merge Request on the Kupfer pkgbuilds repo.
```{hint}
Prefix the MR title with `Draft: ` to indicate a Work In Progress state.
```

View File

@@ -1,9 +0,0 @@
# Quickstart
1. [Install](../install) Kupferbootstrap
1. [Configure](../config) it: `kupferbootstrap config init`
1. [Update your PKGBUILDs + SRCINFO cache](../../cli/packages#kupferbootstrap-packages-update): `kupferbootstrap packages update`
1. [Build an image](../../cli/image#kupferbootstrap-image-build): `kupferbootstrap image build`
1. [Flash the image](../../cli/image#kupferbootstrap-image-flash): `kupferbootstrap image flash abootimg && kupferbootstrap image flash full userdata`
See also: [Frequently Asked Questions](../faq)

View File

@@ -5,14 +5,10 @@ import subprocess
from subprocess import CompletedProcess # make it easy for users of this module
from shlex import quote as shell_quote
from typing import IO, Optional, Union
from kupferbootstrap.typehelpers import TypeAlias
from typing import Optional, Union, TypeAlias
ElevationMethod: TypeAlias = str
FileDescriptor: TypeAlias = Union[int, IO]
# as long as **only** sudo is supported, hardcode the default into ELEVATION_METHOD_DEFAULT.
# when other methods are added, all mentions of ELEVATION_METHOD_DEFAULT should be replaced by a config key.
@@ -40,8 +36,6 @@ def flatten_shell_script(script: Union[list[str], str], shell_quote_items: bool
cmds = script
if shell_quote_items:
cmds = [shell_quote(i) for i in cmds]
else:
cmds = [(i if i != '' else '""') for i in cmds]
script = " ".join(cmds)
if wrap_in_shell_quote:
script = shell_quote(script)
@@ -95,8 +89,8 @@ def run_cmd(
cwd: Optional[str] = None,
switch_user: Optional[str] = None,
elevation_method: Optional[ElevationMethod] = None,
stdout: Optional[FileDescriptor] = None,
stderr: Optional[FileDescriptor] = None,
stdout: Optional[int] = None,
stderr=None,
) -> Union[CompletedProcess, int]:
"execute `script` as `switch_user`, elevating and su'ing as necessary"
kwargs: dict = {}
@@ -105,12 +99,10 @@ def run_cmd(
env_cmd = generate_env_cmd(env)
kwargs['env'] = env
if not attach_tty:
if (stdout, stderr) == (None, None):
kwargs['capture_output'] = capture_output
else:
for name, fd in {'stdout': stdout, 'stderr': stderr}.items():
if fd is not None:
kwargs[name] = fd
kwargs |= {'stdout': stdout} if stdout else {'capture_output': capture_output}
if stderr:
kwargs['stderr'] = stderr
script = flatten_shell_script(script)
if cwd:
kwargs['cwd'] = cwd
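
Both `run_cmd` variants translate optional `stdout`/`stderr` file descriptors into `subprocess` kwargs, falling back to `capture_output` when neither is given. A standalone sketch of the newer per-fd loop (function name is illustrative):

```python
import sys

def build_kwargs(capture_output=False, stdout=None, stderr=None) -> dict:
    """Build the kwargs later passed through to subprocess.run()."""
    kwargs: dict = {}
    if (stdout, stderr) == (None, None):
        kwargs['capture_output'] = capture_output
    else:
        for name, fd in {'stdout': stdout, 'stderr': stderr}.items():
            if fd is not None:
                kwargs[name] = fd
    return kwargs

assert build_kwargs() == {'capture_output': False}
# e.g. merge a child's stderr into our stdout stream:
assert build_kwargs(stderr=sys.stdout) == {'stderr': sys.stdout}
```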

View File

@@ -8,8 +8,8 @@ from shutil import rmtree
from tempfile import mkdtemp
from typing import Optional, Union
from .cmd import run_cmd, run_root_cmd, elevation_noop, generate_cmd_su, wrap_in_bash, shell_quote
from kupferbootstrap.utils import get_user_name, get_group_name
from .cmd import run_root_cmd, elevation_noop, generate_cmd_su, wrap_in_bash, shell_quote
from utils import get_user_name, get_group_name
def try_native_filewrite(path: str, content: Union[str, bytes], chmod: Optional[str] = None) -> Optional[Exception]:
@@ -41,7 +41,7 @@ def chown(path: str, user: Optional[Union[str, int]] = None, group: Optional[Uni
raise Exception(f"Failed to change owner of '{path}' to '{owner}'")
def chmod(path, mode: Union[int, str] = 0o0755, force_sticky=True, privileged: bool = True):
def chmod(path, mode: Union[int, str] = 0o0755, force_sticky=True):
if not isinstance(mode, str):
octal = oct(mode)[2:]
else:
@@ -54,7 +54,7 @@ def chmod(path, mode: Union[int, str] = 0o0755, force_sticky=True, privileged: b
os.chmod(path, mode=octal) # type: ignore
except:
cmd = ["chmod", octal, path]
result = run_cmd(cmd, switch_user='root' if privileged else None)
result = run_root_cmd(cmd)
assert isinstance(result, subprocess.CompletedProcess)
if result.returncode:
raise Exception(f"Failed to set mode of '{path}' to '{chmod}'")
@@ -138,19 +138,13 @@ def remove_file(path: str, recursive=False):
rm = rmtree if recursive else os.unlink
rm(path) # type: ignore
except:
cmd = ['rm'] + (['-r'] if recursive else []) + [path]
cmd = ['rm', *(['-r', '--one-file-system'] if recursive else []), path]
rc = run_root_cmd(cmd).returncode
if rc:
raise Exception(f"Unable to remove {path}: cmd returned {rc}")
def makedir(
path,
user: Optional[Union[str, int]] = None,
group: Optional[Union[str, int]] = None,
parents: bool = True,
mode: Optional[Union[int, str]] = None,
):
def makedir(path, user: Optional[Union[str, int]] = None, group: Optional[Union[str, int]] = None, parents: bool = True):
if not root_check_exists(path):
try:
if parents:
@@ -159,8 +153,6 @@ def makedir(
os.mkdir(path)
except:
run_root_cmd(['mkdir'] + (['-p'] if parents else []) + [path])
if mode is not None:
chmod(path, mode=mode)
chown(path, user, group)
@@ -182,7 +174,7 @@ def symlink(source, target):
def get_temp_dir(register_cleanup=True, mode: int = 0o0755):
"create a new tempdir and sanitize ownership so root can access user files as god intended"
t = mkdtemp()
chmod(t, mode, privileged=False)
chmod(t, mode)
if register_cleanup:
atexit.register(remove_file, t, recursive=True)
return t
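
The exec/file helpers all follow one pattern: try the cheap native Python call first, and only shell out as root when permissions get in the way. A simplified sketch of `remove_file` with the `--one-file-system` guard this commit adds (using plain `sudo` + subprocess in place of the repo's `run_root_cmd` wrapper):

```python
import os
import subprocess
from shutil import rmtree

def remove_file(path: str, recursive: bool = False):
    try:
        rm = rmtree if recursive else os.unlink
        rm(path)  # type: ignore
    except OSError:
        # --one-file-system keeps 'rm -r' from descending into foreign mounts
        cmd = ['sudo', 'rm', *(['-r', '--one-file-system'] if recursive else []), path]
        rc = subprocess.run(cmd).returncode
        if rc:
            raise Exception(f"Unable to remove {path}: cmd returned {rc}")
```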

View File

@@ -8,7 +8,7 @@ from dataclasses import dataclass
from .cmd import run_root_cmd
from .file import chmod, chown, get_temp_dir, write_file
from kupferbootstrap.utils import get_gid, get_uid
from utils import get_gid, get_uid
TEMPDIR_MODE = 0o755

20
flavours/cli.py Normal file
View File

@@ -0,0 +1,20 @@
import click
from .flavour import get_flavours
profile_option = click.option('-p', '--profile', help="name of the profile to use", required=False, default=None)
@click.command(name='flavours')
def cmd_flavours():
'list information about available flavours'
flavours = get_flavours()
if not flavours:
raise Exception("No flavours found!")
for name in sorted(flavours.keys()):
f = flavours[name]
try:
f.parse_flavourinfo()
except:
pass
print(f)

View File

@@ -4,16 +4,16 @@ import json
import logging
import os
from dataclasses import dataclass
from typing import Optional
from kupferbootstrap.config.state import config
from kupferbootstrap.constants import FLAVOUR_DESCRIPTION_PREFIX, FLAVOUR_INFO_FILE
from kupferbootstrap.dictscheme import DictScheme
from kupferbootstrap.packages.pkgbuild import discover_pkgbuilds, get_pkgbuild_by_name, init_pkgbuilds, Pkgbuild
from kupferbootstrap.utils import color_str
from config.state import config
from constants import FLAVOUR_DESCRIPTION_PREFIX, FLAVOUR_INFO_FILE
from packages.pkgbuild import discover_pkgbuilds, get_pkgbuild_by_name, init_pkgbuilds, Pkgbuild
class FlavourInfo(DictScheme):
@dataclass
class FlavourInfo:
rootfs_size: int # rootfs size in GB
description: Optional[str]
@@ -21,7 +21,8 @@ class FlavourInfo(DictScheme):
return f'rootfs_size: {self.rootfs_size}'
class Flavour(DictScheme):
@dataclass
class Flavour:
name: str
pkgbuild: Pkgbuild
description: str
@@ -42,27 +43,7 @@ class Flavour(DictScheme):
return Flavour(name=name, pkgbuild=pkgbuild, description=description.strip(), flavour_info=None)
def __repr__(self):
return f'Flavour<"{self.name}": "{self.description}", package: {self.pkgbuild.name if self.pkgbuild else "??? PROBABLY A BUG!"}{f", {self.flavour_info}" if self.flavour_info else ""}>'
def __str__(self):
return self.nice_str()
def nice_str(self, newlines: bool = False, colors: bool = False) -> str:
separator = '\n' if newlines else ', '
def get_lines(k, v, key_prefix=''):
results = []
full_k = f'{key_prefix}.{k}' if key_prefix else k
if not isinstance(v, (dict, DictScheme)):
results = [f'{color_str(full_k, bold=True)}: {v}']
else:
for _k, _v in v.items():
if _k.startswith('_'):
continue
results += get_lines(_k, _v, key_prefix=full_k)
return results
return separator.join(get_lines(None, self))
return f'Flavour "{self.name}": "{self.description}", package: {self.pkgbuild.name if self.pkgbuild else "??? PROBABLY A BUG!"}{f", {self.flavour_info}" if self.flavour_info else ""}'
def parse_flavourinfo(self, lazy: bool = True):
if lazy and self.flavour_info is not None:

View File

@@ -6,7 +6,7 @@ autoflake_args=('--recursive' '--remove-unused-variables' '--remove-all-unused-i
format() {
files=("$@")
if [[ -z "${files[*]}" ]]; then
files=(*.py "src")
files=(".")
fi
yapf "${yapf_args[@]}" "${files[@]}"

View File

@@ -1,7 +1,7 @@
from typing import Optional
from .constants import Arch, CFLAGS_ARCHES, CFLAGS_GENERAL, COMPILE_ARCHES, GCC_HOSTSPECS
from .config.state import config
from constants import Arch, CFLAGS_ARCHES, CFLAGS_GENERAL, COMPILE_ARCHES, GCC_HOSTSPECS
from config.state import config
def generate_makepkg_conf(arch: Arch, cross: bool = False, chroot: Optional[str] = None) -> str:

View File

@@ -4,50 +4,35 @@ import click
from typing import Optional
from kupferbootstrap.config.state import config
from kupferbootstrap.constants import FLASH_PARTS, FASTBOOT, JUMPDRIVE, JUMPDRIVE_VERSION
from kupferbootstrap.exec.file import makedir
from kupferbootstrap.devices.device import get_profile_device
from kupferbootstrap.flavours.flavour import get_profile_flavour
from kupferbootstrap.flavours.cli import profile_option
from kupferbootstrap.wrapper import enforce_wrap
from config.state import config
from constants import FLASH_PARTS, FASTBOOT, JUMPDRIVE, JUMPDRIVE_VERSION
from exec.file import makedir
from devices.device import get_profile_device
from flavours.flavour import get_profile_flavour
from flavours.cli import profile_option
from wrapper import enforce_wrap
from .fastboot import fastboot_boot, fastboot_erase
from .fastboot import fastboot_boot, fastboot_erase_dtbo
from .image import get_device_name, losetup_rootfs_image, get_image_path, dump_aboot, dump_lk2nd
LK2ND = FLASH_PARTS['LK2ND']
ABOOT = FLASH_PARTS['ABOOT']
BOOT_TYPES = [ABOOT, LK2ND, JUMPDRIVE]
TYPES = [LK2ND, JUMPDRIVE, ABOOT]
@click.command(name='boot')
@profile_option
@click.argument('type', required=False, default=ABOOT, type=click.Choice(BOOT_TYPES))
@click.option('-b', '--sector-size', type=int, help="Override the device's sector size", default=None)
@click.option(
'--erase-dtbo/--no-erase-dtbo',
is_flag=True,
default=True,
show_default=True,
help="Erase the DTBO partition before flashing",
)
@click.option('--confirm', is_flag=True, help="Ask for confirmation before executing fastboot commands")
def cmd_boot(
type: str,
profile: Optional[str] = None,
sector_size: Optional[int] = None,
erase_dtbo: bool = True,
confirm: bool = False,
):
@click.argument('type', required=False, default=ABOOT, type=click.Choice(TYPES))
def cmd_boot(type: str, profile: Optional[str] = None):
"""Boot JumpDrive or the Kupfer aboot image. Erases Android DTBO in the process."""
enforce_wrap()
device = get_profile_device(profile)
flavour = get_profile_flavour(profile).name
deviceinfo = device.parse_deviceinfo()
sector_size = sector_size or device.get_image_sectorsize_default()
sector_size = deviceinfo.flash_pagesize
if not sector_size:
raise Exception(f"Device {device.name} has no rootfs_image_sector_size specified")
raise Exception(f"Device {device.name} has no flash_pagesize specified")
image_path = get_image_path(device, flavour)
strategy = deviceinfo.flash_method
if not strategy:
@@ -68,8 +53,7 @@ def cmd_boot(
path = dump_aboot(loop_device + 'p1')
else:
raise Exception(f'Unknown boot image type {type}')
if erase_dtbo:
fastboot_erase('dtbo', confirm=confirm)
fastboot_boot(path, confirm=confirm)
fastboot_erase_dtbo()
fastboot_boot(path)
else:
raise Exception(f'Unsupported flash strategy "{strategy}" for device {device.name}')
raise Exception(f"Unknown flash strategy {strategy} for device {device.name}")

37
image/fastboot.py Normal file
View File

@@ -0,0 +1,37 @@
import logging
import subprocess
def fastboot_erase_dtbo():
logging.info("Fastboot: Erasing DTBO")
subprocess.run(
[
'fastboot',
'erase',
'dtbo',
],
capture_output=True,
)
def fastboot_flash(partition, file):
logging.info(f"Fastboot: Flashing {file} to {partition}")
result = subprocess.run([
'fastboot',
'flash',
partition,
file,
])
if result.returncode != 0:
raise Exception(f'Failed to flash {file}')
def fastboot_boot(file):
logging.info(f"Fastboot: booting {file}")
result = subprocess.run([
'fastboot',
'boot',
file,
])
if result.returncode != 0:
raise Exception(f'Failed to boot {file} using fastboot')

96
image/flash.py Normal file
View File

@@ -0,0 +1,96 @@
import shutil
import os
import click
from typing import Optional
from constants import FLASH_PARTS, LOCATIONS
from exec.cmd import run_root_cmd
from exec.file import get_temp_dir
from devices.device import get_profile_device
from flavours.flavour import get_profile_flavour
from flavours.cli import profile_option
from wrapper import enforce_wrap
from .fastboot import fastboot_flash
from .image import dd_image, partprobe, shrink_fs, losetup_rootfs_image, losetup_destroy, dump_aboot, dump_lk2nd, dump_qhypstub, get_image_name, get_image_path
ABOOT = FLASH_PARTS['ABOOT']
LK2ND = FLASH_PARTS['LK2ND']
QHYPSTUB = FLASH_PARTS['QHYPSTUB']
ROOTFS = FLASH_PARTS['ROOTFS']
@click.command(name='flash')
@profile_option
@click.argument('what', type=click.Choice(list(FLASH_PARTS.values())))
@click.argument('location', type=str, required=False)
def cmd_flash(what: str, location: str, profile: Optional[str] = None):
"""Flash a partition onto a device. `location` takes either a path to a block device or one of emmc, sdcard"""
enforce_wrap()
device = get_profile_device(profile)
flavour = get_profile_flavour(profile).name
device_image_name = get_image_name(device, flavour)
device_image_path = get_image_path(device, flavour)
deviceinfo = device.parse_deviceinfo()
sector_size = deviceinfo.flash_pagesize
if not sector_size:
raise Exception(f"Device {device.name} has no flash_pagesize specified")
if what not in FLASH_PARTS.values():
raise Exception(f'Unknown what "{what}", must be one of {", ".join(FLASH_PARTS.values())}')
if what == ROOTFS:
if location is None:
raise Exception(f'You need to specify a location to flash {what} to')
path = ''
if location.startswith("/dev/"):
path = location
else:
if location not in LOCATIONS:
raise Exception(f'Invalid location {location}. Choose one of {", ".join(LOCATIONS)}')
dir = '/dev/disk/by-id'
for file in os.listdir(dir):
sanitized_file = file.replace('-', '').replace('_', '').lower()
if f'jumpdrive{location.split("-")[0]}' in sanitized_file:
path = os.path.realpath(os.path.join(dir, file))
partprobe(path)
result = run_root_cmd(['lsblk', path, '-o', 'SIZE'], capture_output=True)
if result.returncode != 0:
raise Exception(f'Failed to lsblk {path}')
if result.stdout == b'SIZE\n 0B\n':
raise Exception(f'Disk {path} has a size of 0B. That probably means it is not available (e.g. no '
'microSD inserted or no microSD card slot installed in the device) or corrupt or defective')
if path == '':
raise Exception('Unable to discover Jumpdrive')
minimal_image_dir = get_temp_dir(register_cleanup=True)
minimal_image_path = os.path.join(minimal_image_dir, f'minimal-{device_image_name}')
shutil.copyfile(device_image_path, minimal_image_path)
loop_device = losetup_rootfs_image(minimal_image_path, sector_size)
partprobe(loop_device)
shrink_fs(loop_device, minimal_image_path, sector_size)
losetup_destroy(loop_device)
result = dd_image(input=minimal_image_path, output=path)
if result.returncode != 0:
raise Exception(f'Failed to flash {minimal_image_path} to {path}')
else:
loop_device = losetup_rootfs_image(device_image_path, sector_size)
if what == ABOOT:
path = dump_aboot(f'{loop_device}p1')
fastboot_flash('boot', path)
elif what == LK2ND:
path = dump_lk2nd(f'{loop_device}p1')
fastboot_flash('lk2nd', path)
elif what == QHYPSTUB:
path = dump_qhypstub(f'{loop_device}p1')
fastboot_flash('qhypstub', path)
else:
raise Exception(f'Unknown what "{what}", this must be a bug in kupferbootstrap!')
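
The Jumpdrive discovery above normalizes `/dev/disk/by-id` entries and substring-matches them against the requested location. Isolated, the matching looks roughly like this (the sample device name is made up):

```python
import os

def find_jumpdrive(location: str, entries: list[str]) -> str:
    wanted = f'jumpdrive{location.split("-")[0]}'
    for file in entries:
        # drop separators and case so naming variations still match
        sanitized = file.replace('-', '').replace('_', '').lower()
        if wanted in sanitized:
            return os.path.join('/dev/disk/by-id', file)
    raise Exception('Unable to discover Jumpdrive')

assert find_jumpdrive('sdcard', ['usb-JumpDrive_sdcard_0-0:0']).endswith('0-0:0')
```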

View File

@@ -9,17 +9,17 @@ from signal import pause
from subprocess import CompletedProcess
from typing import Optional, Union
from kupferbootstrap.config.state import config, Profile
from kupferbootstrap.chroot.device import DeviceChroot, get_device_chroot
from kupferbootstrap.constants import Arch, BASE_LOCAL_PACKAGES, BASE_PACKAGES, POST_INSTALL_CMDS
from kupferbootstrap.distro.distro import get_base_distro, get_kupfer_https
from kupferbootstrap.devices.device import Device, get_profile_device
from kupferbootstrap.exec.cmd import run_root_cmd, generate_cmd_su
from kupferbootstrap.exec.file import get_temp_dir, root_write_file, root_makedir, makedir
from kupferbootstrap.flavours.flavour import Flavour, get_profile_flavour
from kupferbootstrap.net.ssh import copy_ssh_keys
from kupferbootstrap.packages.build import build_enable_qemu_binfmt, build_packages, filter_pkgbuilds
from kupferbootstrap.wrapper import enforce_wrap
from config.state import config, Profile
from chroot.device import DeviceChroot, get_device_chroot
from constants import Arch, BASE_LOCAL_PACKAGES, BASE_PACKAGES, POST_CMDS
from distro.distro import get_base_distro, get_kupfer_https
from devices.device import Device, get_profile_device
from exec.cmd import run_root_cmd, generate_cmd_su
from exec.file import root_write_file, root_makedir, makedir
from flavours.flavour import Flavour, get_profile_flavour
from net.ssh import copy_ssh_keys
from packages.build import build_enable_qemu_binfmt, build_packages, filter_pkgbuilds
from wrapper import enforce_wrap
# image files need to be slightly smaller than partitions to fit
IMG_FILE_ROOT_DEFAULT_SIZE = "1800M"
@@ -44,39 +44,10 @@ def partprobe(device: str):
return run_root_cmd(['partprobe', device])
def bytes_to_sectors(b: int, sector_size: int, round_up: bool = True):
sectors, rest = divmod(b, sector_size)
if rest and round_up:
sectors += 1
return sectors
def get_fs_size(partition: str) -> tuple[int, int]:
blocks_cmd = run_root_cmd(['dumpe2fs', '-h', partition], env={"LC_ALL": "C"}, capture_output=True)
if blocks_cmd.returncode != 0:
logging.debug(f"dumpe2fs stdout:\n: {blocks_cmd.stdout}")
logging.debug(f"dumpe2fs stderr:\n {blocks_cmd.stderr}")
raise Exception(f'Failed to detect new filesystem size of {partition}')
blocks_text = blocks_cmd.stdout.decode('utf-8') if blocks_cmd.stdout else ''
try:
fs_blocks = int(re.search('\\nBlock count:[ ]+([0-9]+)\\n', blocks_text, flags=re.MULTILINE).group(1)) # type: ignore[union-attr]
fs_block_size = int(re.search('\\nBlock size:[ ]+([0-9]+)\\n', blocks_text).group(1)) # type: ignore[union-attr]
except Exception as ex:
logging.debug(f"dumpe2fs stdout:\n {blocks_text}")
logging.debug(f"dumpe2fs stderr:\n: {blocks_cmd.stderr}")
logging.info("Failed to scrape block size and count from dumpe2fs:", ex)
raise ex
return fs_blocks, fs_block_size
def align_bytes(size_bytes: int, alignment: int = 4096) -> int:
rest = size_bytes % alignment
if rest:
size_bytes += alignment - rest
return size_bytes
def shrink_fs(loop_device: str, file: str, sector_size: int):
# 8: 512 bytes sectors
# 1: 4096 bytes sectors
sectors_blocks_factor = 4096 // sector_size
partprobe(loop_device)
logging.debug(f"Checking filesystem at {loop_device}p2")
result = run_root_cmd(['e2fsck', '-fy', f'{loop_device}p2'])
@@ -84,16 +55,18 @@ def shrink_fs(loop_device: str, file: str, sector_size: int):
# https://man7.org/linux/man-pages/man8/e2fsck.8.html#EXIT_CODE
raise Exception(f'Failed to e2fsck {loop_device}p2 with exit code {result.returncode}')
logging.info(f'Shrinking filesystem at {loop_device}p2')
result = run_root_cmd(['resize2fs', '-M', f'{loop_device}p2'])
logging.debug(f'Shrinking filesystem at {loop_device}p2')
result = run_root_cmd(['resize2fs', '-M', f'{loop_device}p2'], capture_output=True)
if result.returncode != 0:
print(result.stdout)
print(result.stderr)
raise Exception(f'Failed to resize2fs {loop_device}p2')
logging.debug(f'Reading size of shrunken filesystem on {loop_device}p2')
fs_blocks, fs_block_size = get_fs_size(f'{loop_device}p2')
sectors = bytes_to_sectors(fs_blocks * fs_block_size, sector_size)
logging.debug(f'Finding end block of shrunken filesystem on {loop_device}p2')
blocks = int(re.search('is now [0-9]+', result.stdout.decode('utf-8')).group(0).split(' ')[2]) # type: ignore
sectors = blocks * sectors_blocks_factor #+ 157812 - 25600
logging.info(f'Shrinking partition at {loop_device}p2 to {sectors} sectors ({sectors * sector_size} bytes)')
logging.debug(f'Shrinking partition at {loop_device}p2 to {sectors} sectors')
child_proccess = subprocess.Popen(
generate_cmd_su(['fdisk', '-b', str(sector_size), loop_device], switch_user='root'), # type: ignore
stdin=subprocess.PIPE,
@@ -119,7 +92,7 @@ def shrink_fs(loop_device: str, file: str, sector_size: int):
if returncode > 1:
raise Exception(f'Failed to shrink partition size of {loop_device}p2 with fdisk')
partprobe(loop_device).check_returncode()
partprobe(loop_device)
logging.debug(f'Finding end sector of partition at {loop_device}p2')
result = run_root_cmd(['fdisk', '-b', str(sector_size), '-l', loop_device], capture_output=True)
@@ -137,7 +110,7 @@ def shrink_fs(loop_device: str, file: str, sector_size: int):
if end_sector == 0:
raise Exception(f'Failed to find end sector of {loop_device}p2')
end_size = align_bytes((end_sector + 1) * sector_size, 4096)
end_size = (end_sector + 1) * sector_size
logging.debug(f'({end_sector} + 1) sectors * {sector_size} bytes/sector = {end_size} bytes')
logging.info(f'Truncating {file} to {end_size} bytes')
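
The helpers introduced in this file make the truncation math explicit: round the filesystem size up to whole sectors, then align the partition end up to a 4096-byte boundary. A worked example using the definitions shown above:

```python
def bytes_to_sectors(b: int, sector_size: int, round_up: bool = True) -> int:
    sectors, rest = divmod(b, sector_size)
    if rest and round_up:
        sectors += 1
    return sectors

def align_bytes(size_bytes: int, alignment: int = 4096) -> int:
    rest = size_bytes % alignment
    if rest:
        size_bytes += alignment - rest
    return size_bytes

# 123456 filesystem blocks of 4096 bytes on a 512-byte-sector device:
assert bytes_to_sectors(123456 * 4096, 512) == 123456 * 8
# a partition ending at sector 1000003 (512B sectors), aligned up to 4096:
assert align_bytes((1000003 + 1) * 512) == 512004096
```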
@@ -227,14 +200,14 @@ def mount_chroot(rootfs_source: str, boot_src: str, chroot: DeviceChroot):
def dump_file_from_image(image_path: str, file_path: str, target_path: Optional[str] = None):
target_path = target_path or os.path.join(get_temp_dir(), os.path.basename(file_path))
target_path = target_path or os.path.join('/tmp', os.path.basename(file_path))
result = run_root_cmd([
'debugfs',
image_path,
'-R',
f'\'dump /{file_path.lstrip("/")} {target_path}\'',
])
if result.returncode != 0 or not os.path.exists(target_path):
if result.returncode != 0:
raise Exception(f'Failed to dump {file_path} from /boot')
return target_path
@@ -281,31 +254,30 @@ def partition_device(device: str):
raise Exception(f'Failed to create partitions on {device}')
def create_filesystem(device: str, blocksize: Optional[int], label=None, options=[], fstype='ext4'):
"""Creates a new filesystem. Blocksize defaults"""
def create_filesystem(device: str, blocksize: int = 4096, label=None, options=[], fstype='ext4'):
# blocksize can be 4k max due to pagesize
blocksize = min(blocksize, 4096)
if fstype.startswith('ext'):
# blocksize for ext-fs must be >=1024
blocksize = max(blocksize, 1024)
labels = ['-L', label] if label else []
cmd = [f'mkfs.{fstype}', '-F', *labels]
if blocksize:
# blocksize can be 4k max due to pagesize
blocksize = min(blocksize, 4096)
if fstype.startswith('ext'):
# blocksize for ext-fs must be >=1024
blocksize = max(blocksize, 1024)
cmd += [
'-b',
str(blocksize),
]
cmd.append(device)
cmd = [
f'mkfs.{fstype}',
'-F',
'-b',
str(blocksize),
] + labels + [device]
result = run_root_cmd(cmd)
if result.returncode != 0:
raise Exception(f'Failed to create {fstype} filesystem on {device} with CMD: {cmd}')
def create_root_fs(device: str, blocksize: Optional[int]):
def create_root_fs(device: str, blocksize: int):
create_filesystem(device, blocksize=blocksize, label='kupfer_root', options=['-O', '^metadata_csum', '-N', '100000'])
def create_boot_fs(device: str, blocksize: Optional[int]):
def create_boot_fs(device: str, blocksize: int):
create_filesystem(device, blocksize=blocksize, label='kupfer_boot', fstype='ext2')
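
The blocksize rules in `create_filesystem` boil down to: never above the 4096-byte page size, and never below 1024 for ext filesystems. As standalone arithmetic (function name illustrative):

```python
def clamp_blocksize(blocksize: int, fstype: str = 'ext4') -> int:
    blocksize = min(blocksize, 4096)       # pagesize cap
    if fstype.startswith('ext'):
        blocksize = max(blocksize, 1024)   # ext minimum
    return blocksize

assert clamp_blocksize(16384) == 4096   # large flash pages get capped
assert clamp_blocksize(512) == 1024     # tiny sectors get raised for ext
```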
@@ -333,9 +305,8 @@ def install_rootfs(
)
chroot.add_sudo_config(config_name='wheel', privilegee='%wheel', password_required=True)
copy_ssh_keys(
chroot,
chroot.path,
user=user,
allow_fail=True,
)
files = {
'etc/pacman.conf': get_base_distro(arch).get_pacman_conf(
@@ -347,13 +318,12 @@ def install_rootfs(
}
for target, content in files.items():
root_write_file(os.path.join(chroot.path, target.lstrip('/')), content)
logging.info("Running post-install CMDs")
for cmd in POST_INSTALL_CMDS:
result = chroot.run_cmd(cmd)
if POST_CMDS:
logging.info("Running post-install CMDs")
result = chroot.run_cmd(' && '.join(POST_CMDS))
assert isinstance(result, subprocess.CompletedProcess)
if result.returncode != 0:
raise Exception(f'Error running post-install cmd: {cmd}')
raise Exception('Error running post_cmds')
logging.info('Preparing to unmount chroot')
res = chroot.run_cmd('sync && umount /boot', attach_tty=True)
@@ -371,61 +341,33 @@ def cmd_image():
"""Build, flash and boot device images"""
sectorsize_option = click.option(
'-b',
'--sector-size',
help="Override the device's sector size",
type=int,
default=None,
)
@cmd_image.command(name='build')
@click.argument('profile_name', required=False)
@click.option(
'--local-repos/--no-local-repos',
'-l/-L',
help='Whether to use local package repos at all or only use HTTPS repos.',
default=True,
show_default=True,
is_flag=True,
)
@click.option(
'--build-pkgs/--no-build-pkgs',
'-p/-P',
help='Whether to build missing/outdated local packages if local repos are enabled.',
default=True,
show_default=True,
is_flag=True,
)
@click.option(
'--no-download-pkgs',
help='Disable trying to download packages instead of building if building is enabled.',
default=False,
is_flag=True,
)
@click.option(
'--block-target',
help='Override the block device file to write the final image to',
type=click.Path(),
default=None,
)
@click.option(
'--skip-part-images',
help='Skip creating image files for the partitions and directly work on the target block device.',
default=False,
is_flag=True,
)
@sectorsize_option
def cmd_build(
profile_name: Optional[str] = None,
local_repos: bool = True,
build_pkgs: bool = True,
no_download_pkgs=False,
block_target: Optional[str] = None,
sector_size: Optional[int] = None,
skip_part_images: bool = False,
):
@click.option('--local-repos/--no-local-repos',
'-l/-L',
default=True,
show_default=True,
help='Whether to use local package repos at all or only use HTTPS repos.')
@click.option('--build-pkgs/--no-build-pkgs',
'-p/-P',
default=True,
show_default=True,
help='Whether to build missing/outdated local packages if local repos are enabled.')
@click.option('--no-download-pkgs',
is_flag=True,
default=False,
help='Disable trying to download packages instead of building if building is enabled.')
@click.option('--block-target', type=click.Path(), default=None, help='Override the block device file to write the final image to')
@click.option('--skip-part-images',
is_flag=True,
default=False,
help='Skip creating image files for the partitions and directly work on the target block device.')
def cmd_build(profile_name: Optional[str] = None,
local_repos: bool = True,
build_pkgs: bool = True,
no_download_pkgs=False,
block_target: Optional[str] = None,
skip_part_images: bool = False):
"""
Build a device image.
@@ -456,7 +398,10 @@ def cmd_build(
pkgbuilds |= set(filter_pkgbuilds(packages_extra, arch=arch, allow_empty_results=True, use_paths=False))
build_packages(pkgbuilds, arch, try_download=not no_download_pkgs)
sector_size = sector_size or device.get_image_sectorsize()
deviceinfo = device.parse_deviceinfo()
sector_size = deviceinfo.flash_pagesize
if not sector_size:
raise Exception(f"Device {device.name} has no flash_pagesize specified")
image_path = block_target or get_image_path(device, flavour.name)
@@ -465,7 +410,7 @@ def cmd_build(
logging.info(f'Creating new file at {image_path}')
create_img_file(image_path, f"{rootfs_size_mb}M")
loop_device = losetup_rootfs_image(image_path, sector_size or device.get_image_sectorsize_default())
loop_device = losetup_rootfs_image(image_path, sector_size)
partition_device(loop_device)
partprobe(loop_device)
@@ -508,17 +453,19 @@ def cmd_build(
@cmd_image.command(name='inspect')
@click.option('--shell', '-s', is_flag=True)
@sectorsize_option
@click.argument('profile', required=False)
def cmd_inspect(profile: Optional[str] = None, shell: bool = False, sector_size: Optional[int] = None):
"""Loop-mount the device image for inspection."""
def cmd_inspect(profile: Optional[str] = None, shell: bool = False):
"""Open a shell in a device image"""
config.enforce_profile_device_set()
config.enforce_profile_flavour_set()
enforce_wrap()
device = get_profile_device(profile)
arch = device.arch
flavour = get_profile_flavour(profile).name
sector_size = sector_size or device.get_image_sectorsize_default()
deviceinfo = device.parse_deviceinfo()
sector_size = deviceinfo.flash_pagesize
if not sector_size:
raise Exception(f"Device {device.name} has no flash_pagesize specified")
chroot = get_device_chroot(device.name, flavour, arch)
image_path = get_image_path(device, flavour)

View File

@@ -5,13 +5,13 @@ import pytest
from glob import glob
from subprocess import CompletedProcess
from kupferbootstrap.config.state import config, CONFIG_DEFAULTS
from kupferbootstrap.constants import SRCINFO_METADATA_FILE
from kupferbootstrap.exec.cmd import run_cmd
from kupferbootstrap.exec.file import get_temp_dir
from kupferbootstrap.logger import setup_logging
from kupferbootstrap.packages.cli import SRCINFO_CACHE_FILES, cmd_build, cmd_clean, cmd_init, cmd_update
from kupferbootstrap.utils import git_get_branch
from config.state import config, CONFIG_DEFAULTS
from constants import SRCINFO_METADATA_FILE
from exec.cmd import run_cmd
from exec.file import get_temp_dir
from logger import setup_logging
from packages.cli import SRCINFO_CACHE_FILES, cmd_build, cmd_clean, cmd_init, cmd_update
from utils import git_get_branch
tempdir = None
config.try_load_file()
@@ -37,11 +37,6 @@ def ctx() -> click.Context:
return click.Context(click.Command('integration_tests'))
def test_main_import():
from main import cli
assert cli
def test_config_load(ctx: click.Context):
path = config.runtime.config_file
assert path
@@ -52,8 +47,7 @@ def test_config_load(ctx: click.Context):
def test_packages_update(ctx: click.Context):
pkgbuilds_path = config.get_path('pkgbuilds')
assert config.runtime.script_source_dir
kbs_branch = git_get_branch(os.path.join(config.runtime.script_source_dir, "../.."))
kbs_branch = git_get_branch(config.runtime.script_source_dir)
# Gitlab CI integration: the CI checks out a detached commit, branch comes back empty.
if not kbs_branch and os.environ.get('CI', 'false') == 'true':
kbs_branch = os.environ.get('CI_COMMIT_BRANCH', '')

28
logger.py Normal file
View File

@@ -0,0 +1,28 @@
import click
import coloredlogs
import logging
import sys
def setup_logging(verbose: bool, log_setup: bool = True):
level_colors = coloredlogs.DEFAULT_LEVEL_STYLES | {'info': {'color': 'magenta', 'bright': True}, 'debug': {'color': 'blue', 'bright': True}}
field_colors = coloredlogs.DEFAULT_FIELD_STYLES | {'asctime': {'color': 'white', 'faint': True}}
level = logging.DEBUG if verbose else logging.INFO
coloredlogs.install(
stream=sys.stdout,
fmt='%(asctime)s %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
level=level,
level_styles=level_colors,
field_styles=field_colors,
)
if log_setup:
logging.debug('Logging set up.')
verbose_option = click.option(
'-v',
'--verbose',
is_flag=True,
help='Enables verbose logging',
)

View File

@@ -3,54 +3,36 @@
import click
import subprocess
from os import isatty
from traceback import format_exc, format_exception_only, format_tb
from typing import Optional
from .logger import color_option, logging, quiet_option, setup_logging, verbose_option
from .wrapper import get_wrapper_type, enforce_wrap, nowrapper_option
from .progressbar import progress_bars_option
from logger import logging, setup_logging, verbose_option
from wrapper import nowrapper_option, enforce_wrap
from .binfmt.cli import cmd_binfmt
from .config.cli import config, config_option, cmd_config
from .packages.cli import cmd_packages
from .flavours.cli import cmd_flavours
from .devices.cli import cmd_devices
from .net.cli import cmd_net
from .chroot.cli import cmd_chroot
from .cache.cli import cmd_cache
from .image.cli import cmd_image
from config.cli import config, config_option, cmd_config
from packages.cli import cmd_packages
from flavours.cli import cmd_flavours
from devices.cli import cmd_devices
from net.cli import cmd_net
from chroot.cli import cmd_chroot
from cache.cli import cmd_cache
from image.cli import cmd_image
@click.group()
@click.option('--error-shell', '-E', 'error_shell', is_flag=True, default=False, help='Spawn shell after error occurs')
@verbose_option
@quiet_option
@config_option
@nowrapper_option
@color_option
@progress_bars_option
def cli(
verbose: bool = False,
quiet: bool = False,
config_file: Optional[str] = None,
wrapper_override: Optional[bool] = None,
error_shell: bool = False,
force_colors: Optional[bool] = None,
force_progress_bars: Optional[bool] = None,
):
setup_logging(verbose, quiet=quiet, force_colors=force_colors)
# stdout is fd 1
config.runtime.colors = isatty(1) if force_colors is None else force_colors
def cli(verbose: bool = False, config_file: Optional[str] = None, wrapper_override: Optional[bool] = None, error_shell: bool = False):
setup_logging(verbose)
config.runtime.verbose = verbose
config.runtime.progress_bars = force_progress_bars
config.runtime.no_wrap = wrapper_override is False
config.runtime.error_shell = error_shell
config.try_load_file(config_file)
if config.file_state.exception:
logging.warning(f"Config file couldn't be loaded: {config.file_state.exception}")
if wrapper_override:
logging.info(f'Force-wrapping in wrapper-type: "{get_wrapper_type()}"!')
enforce_wrap()
@@ -78,7 +60,6 @@ def main():
exit(1)
cli.add_command(cmd_binfmt)
cli.add_command(cmd_cache)
cli.add_command(cmd_chroot)
cli.add_command(cmd_config)

View File

@@ -1,8 +1,8 @@
import click
import logging
from kupferbootstrap.exec.cmd import run_root_cmd
from kupferbootstrap.wrapper import check_programs_wrap
from exec.cmd import run_root_cmd
from wrapper import check_programs_wrap
from .ssh import run_ssh_command

View File

@@ -4,11 +4,10 @@ import os
import pathlib
import click
from kupferbootstrap.config.state import config
from kupferbootstrap.constants import SSH_COMMON_OPTIONS, SSH_DEFAULT_HOST, SSH_DEFAULT_PORT
from kupferbootstrap.exec.cmd import run_cmd
from kupferbootstrap.exec.file import write_file
from kupferbootstrap.wrapper import check_programs_wrap
from config.state import config
from constants import SSH_COMMON_OPTIONS, SSH_DEFAULT_HOST, SSH_DEFAULT_PORT
from exec.cmd import run_cmd
from wrapper import check_programs_wrap
@click.command(name='ssh')
@@ -84,16 +83,21 @@ def find_ssh_keys():
return keys
def copy_ssh_keys(chroot: Chroot, user: str, allow_fail: bool = False):
def copy_ssh_keys(root_dir: str, user: str):
check_programs_wrap(['ssh-keygen'])
ssh_dir_relative = os.path.join('/home', user, '.ssh')
ssh_dir = chroot.get_path(ssh_dir_relative)
authorized_keys_file_rel = os.path.join(ssh_dir_relative, 'authorized_keys')
authorized_keys_file = chroot.get_path(authorized_keys_file_rel)
authorized_keys_file = os.path.join(
root_dir,
'home',
user,
'.ssh',
'authorized_keys',
)
if os.path.exists(authorized_keys_file):
os.unlink(authorized_keys_file)
keys = find_ssh_keys()
if len(keys) == 0:
logging.warning("Could not find any ssh key to copy")
logging.info("Could not find any ssh key to copy")
create = click.confirm("Do you want me to generate an ssh key for you?", True)
if not create:
return
@@ -112,34 +116,15 @@ def copy_ssh_keys(chroot: Chroot, user: str, allow_fail: bool = False):
logging.fatal("Failed to generate ssh key")
keys = find_ssh_keys()
if not keys:
logging.warning("No SSH keys to be copied. Skipping.")
return
auth_key_lines = []
for key in keys:
pub = f'{key}.pub'
if not os.path.exists(pub):
logging.debug(f'Skipping key {key}: {pub} not found')
continue
try:
with open(pub, 'r') as file:
contents = file.read()
if not contents.strip():
continue
auth_key_lines.append(contents)
except Exception as ex:
logging.warning(f"Could not read ssh pub key {pub}", exc_info=ex)
continue
ssh_dir = os.path.join(root_dir, 'home', user, '.ssh')
if not os.path.exists(ssh_dir):
logging.info(f"Creating {ssh_dir_relative!r} dir in chroot {chroot.path!r}")
chroot.run_cmd(["mkdir", "-p", "-m", "700", ssh_dir_relative], switch_user=user)
logging.info(f"Writing SSH pub keys to {authorized_keys_file}")
try:
write_file(authorized_keys_file, "\n".join(auth_key_lines), user=str(chroot.get_uid(user)), mode="644")
except Exception as ex:
logging.error(f"Failed to write SSH authorized_keys_file at {authorized_keys_file!r}:", exc_info=ex)
if allow_fail:
return
raise ex from ex
os.makedirs(ssh_dir, exist_ok=True, mode=0o700)
with open(authorized_keys_file, 'a') as authorized_keys:
for key in keys:
pub = f'{key}.pub'
if not os.path.exists(pub):
logging.debug(f'Skipping key {key}: {pub} not found')
continue
with open(pub, 'r') as file:
authorized_keys.write(file.read())

View File

@@ -1,6 +1,6 @@
import subprocess
import click
from kupferbootstrap.wrapper import check_programs_wrap
from wrapper import check_programs_wrap
@click.command(name='telnet')

View File

@@ -3,24 +3,22 @@ import multiprocessing
import os
import shutil
import subprocess
import sys
from copy import deepcopy
from urllib.error import HTTPError
from typing import Iterable, Iterator, Optional
from kupferbootstrap.binfmt.binfmt import binfmt_is_registered, binfmt_register
from kupferbootstrap.constants import CROSSDIRECT_PKGS, QEMU_BINFMT_PKGS, GCC_HOSTSPECS, ARCHES, Arch, CHROOT_PATHS, MAKEPKG_CMD
from kupferbootstrap.config.state import config
from kupferbootstrap.exec.cmd import run_cmd, run_root_cmd
from kupferbootstrap.exec.file import makedir, remove_file, symlink
from kupferbootstrap.chroot.build import get_build_chroot, BuildChroot
from kupferbootstrap.distro.distro import get_kupfer_https, get_kupfer_local, get_kupfer_repo_names
from kupferbootstrap.distro.package import RemotePackage, LocalPackage
from kupferbootstrap.distro.repo import LocalRepo
from kupferbootstrap.progressbar import BAR_PADDING, get_levels_bar
from kupferbootstrap.wrapper import check_programs_wrap, is_wrapped
from kupferbootstrap.utils import ellipsize, sha256sum
from binfmt import register as binfmt_register, binfmt_is_registered
from constants import REPOSITORIES, CROSSDIRECT_PKGS, QEMU_BINFMT_PKGS, GCC_HOSTSPECS, ARCHES, Arch, CHROOT_PATHS, MAKEPKG_CMD
from config.state import config
from exec.cmd import run_cmd, run_root_cmd
from exec.file import makedir, remove_file, symlink
from chroot.build import get_build_chroot, BuildChroot
from distro.distro import get_kupfer_https, get_kupfer_local
from distro.package import RemotePackage, LocalPackage
from distro.repo import LocalRepo
from wrapper import check_programs_wrap, is_wrapped
from utils import sha256sum
from .pkgbuild import discover_pkgbuilds, filter_pkgbuilds, Pkgbase, Pkgbuild, SubPkgbuild
@@ -53,13 +51,13 @@ def get_makepkg_env(arch: Optional[Arch] = None):
def init_local_repo(repo: str, arch: Arch):
repo_dir = os.path.join(config.get_package_dir(arch), repo)
if not os.path.exists(repo_dir):
logging.info(f'Creating local repo "{repo}" ({arch})')
logging.info(f"Creating local repo {repo} ({arch})")
makedir(repo_dir)
for ext in ['db', 'files']:
filename_stripped = f'{repo}.{ext}'
filename = f'{filename_stripped}.tar.xz'
if not os.path.exists(os.path.join(repo_dir, filename)):
logging.info(f'Initialising local repo {f"{ext} " if ext != "db" else ""}db for repo "{repo}" ({arch})')
logging.info(f"Initialising local repo {f'{ext} ' if ext != 'db' else ''}db for repo {repo} ({arch})")
result = run_cmd(
[
'tar',
@@ -72,7 +70,7 @@ def init_local_repo(repo: str, arch: Arch):
)
assert isinstance(result, subprocess.CompletedProcess)
if result.returncode != 0:
raise Exception(f'Failed to create local repo "{repo}"')
raise Exception(f'Failed to create local repo {repo}')
symlink_path = os.path.join(repo_dir, filename_stripped)
if not os.path.islink(symlink_path):
if os.path.exists(symlink_path):
@@ -84,7 +82,7 @@ def init_prebuilts(arch: Arch):
"""Ensure that all `constants.REPOSITORIES` inside `dir` exist"""
prebuilts_dir = config.get_path('packages')
makedir(prebuilts_dir)
for repo in get_kupfer_repo_names(local=True):
for repo in REPOSITORIES:
init_local_repo(repo, arch)
@@ -229,7 +227,7 @@ def add_file_to_repo(file_path: str, repo_name: str, arch: Arch, remove_original
target_file,
]
logging.debug(f'repo: running cmd: {cmd}')
result = run_cmd(cmd, stderr=sys.stdout)
result = run_cmd(cmd)
assert isinstance(result, subprocess.CompletedProcess)
if result.returncode != 0:
raise Exception(f'Failed add package {target_file} to repo {repo_name}')
@@ -276,8 +274,8 @@ def add_package_to_repo(package: Pkgbuild, arch: Arch):
def try_download_package(dest_file_path: str, package: Pkgbuild, arch: Arch) -> Optional[str]:
logging.debug(f"checking if we can download {package.name}")
filename = os.path.basename(dest_file_path)
logging.debug(f"checking if we can download {filename}")
pkgname = package.name
repo_name = package.repo
repos = get_kupfer_https(arch, scan=True).repos
@@ -290,8 +288,7 @@ def try_download_package(dest_file_path: str, package: Pkgbuild, arch: Arch) ->
return None
repo_pkg: RemotePackage = repo.packages[pkgname]
if repo_pkg.version != package.version:
logging.debug(f"Package {pkgname} versions differ: local: {package.version}, "
f"remote: {repo_pkg.version}. Building instead.")
logging.debug(f"Package {pkgname} versions differ: local: {package.version}, remote: {repo_pkg.version}. Building instead.")
return None
if repo_pkg.filename != filename:
versions_str = f"local: {filename}, remote: {repo_pkg.filename}"
@@ -299,20 +296,6 @@ def try_download_package(dest_file_path: str, package: Pkgbuild, arch: Arch) ->
logging.debug(f"package filenames don't match: {versions_str}")
return None
logging.debug(f"ignoring compression extension difference: {versions_str}")
cache_file = os.path.join(config.get_path('pacman'), arch, repo_pkg.filename)
if os.path.exists(cache_file):
if not repo_pkg._desc or 'SHA256SUM' not in repo_pkg._desc:
cache_matches = False
extra_msg = ". However, we can't validate it, as the https repo doesnt provide a SHA256SUM for it."
else:
cache_matches = sha256sum(cache_file) == repo_pkg._desc['SHA256SUM']
extra_msg = (". However its checksum doesn't match." if not cache_matches else " and its checksum matches.")
logging.debug(f"While checking the HTTPS repo DB, we found a matching filename in the pacman cache{extra_msg}")
if cache_matches:
logging.info(f'copying cache file {cache_file} to repo as verified by remote checksum')
shutil.copy(cache_file, dest_file_path)
remove_file(cache_file)
return dest_file_path
url = repo_pkg.resolved_url
assert url
try:
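
The cache-reuse branch in this hunk only trusts a pacman-cache hit when its SHA256 matches the checksum published in the repo's desc data. Standalone, the check is just (a sketch; the repo's own `sha256sum` helper lives in `utils`):

```python
import hashlib

def sha256sum(path: str) -> str:
    digest = hashlib.sha256()
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(65536), b''):
            digest.update(chunk)
    return digest.hexdigest()

def cache_matches(cache_file: str, desc: dict) -> bool:
    # no published checksum -> we can't validate the cached file
    if 'SHA256SUM' not in desc:
        return False
    return sha256sum(cache_file) == desc['SHA256SUM']
```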
@@ -439,11 +422,10 @@ def setup_build_chroot(
extra_packages: list[str] = [],
add_kupfer_repos: bool = True,
clean_chroot: bool = False,
repo: Optional[dict[str, Pkgbuild]] = None,
) -> BuildChroot:
assert config.runtime.arch
if arch != config.runtime.arch:
build_enable_qemu_binfmt(arch, repo=repo or discover_pkgbuilds(), lazy=False)
build_enable_qemu_binfmt(arch)
init_prebuilts(arch)
chroot = get_build_chroot(arch, add_kupfer_repos=add_kupfer_repos)
chroot.mount_packages()
@@ -491,7 +473,7 @@ def setup_sources(package: Pkgbuild, lazy: bool = True):
assert config.runtime.arch
chroot = setup_build_chroot(config.runtime.arch)
logging.info(f'{package.path}: Setting up sources with makepkg')
result = chroot.run_cmd(makepkg_setup, cwd=dir, switch_user='kupfer', stderr=sys.stdout)
result = chroot.run_cmd(makepkg_setup, cwd=dir, switch_user='kupfer')
assert isinstance(result, subprocess.CompletedProcess)
if result.returncode != 0:
raise Exception(f'{package.path}: Failed to setup sources, exit code: {result.returncode}')
@@ -512,7 +494,6 @@ def build_package(
enable_ccache: bool = True,
clean_chroot: bool = False,
build_user: str = 'kupfer',
repo: Optional[dict[str, Pkgbuild]] = None,
):
makepkg_compile_opts = ['--holdver']
makepkg_conf_path = 'etc/makepkg.conf'
@@ -532,7 +513,6 @@ def build_package(
arch=arch,
extra_packages=deps,
clean_chroot=clean_chroot,
repo=repo,
)
assert config.runtime.arch
native_chroot = target_chroot
@@ -542,7 +522,6 @@ def build_package(
arch=config.runtime.arch,
extra_packages=['base-devel'] + CROSSDIRECT_PKGS,
clean_chroot=clean_chroot,
repo=repo,
)
if not package.mode:
logging.warning(f'Package {package.path} has no _mode set, assuming "host"')
@@ -575,7 +554,7 @@ def build_package(
build_root = target_chroot
makepkg_compile_opts += ['--nodeps' if package.nodeps else '--syncdeps']
env = deepcopy(get_makepkg_env(arch))
if foreign_arch and package.crossdirect and enable_crossdirect and package.name not in CROSSDIRECT_PKGS:
if foreign_arch and enable_crossdirect and package.name not in CROSSDIRECT_PKGS:
env['PATH'] = f"/native/usr/lib/crossdirect/{arch}:{env['PATH']}"
target_chroot.mount_crossdirect(native_chroot)
else:
@@ -597,14 +576,13 @@ def build_package(
setup_git_insecure_paths(build_root)
makepkg_conf_absolute = os.path.join('/', makepkg_conf_path)
build_cmd = ['source', '/etc/profile', '&&', *MAKEPKG_CMD, '--config', makepkg_conf_absolute, '--skippgpcheck', *makepkg_compile_opts]
build_cmd = MAKEPKG_CMD + ['--config', makepkg_conf_absolute, '--skippgpcheck'] + makepkg_compile_opts
logging.debug(f'Building: Running {build_cmd}')
result = build_root.run_cmd(
build_cmd,
inner_env=env,
cwd=os.path.join(CHROOT_PATHS['pkgbuilds'], package.path),
switch_user=build_user,
stderr=sys.stdout,
)
assert isinstance(result, subprocess.CompletedProcess)
if result.returncode != 0:
@@ -657,33 +635,17 @@ def get_unbuilt_package_levels(
includes_dependants = " (includes dependants)" if rebuild_dependants else ""
logging.info(f"Checking for unbuilt packages ({arch}) in dependency order{includes_dependants}:\n{get_pkg_levels_str(package_levels)}")
i = 0
total_levels = len(package_levels)
package_bar = get_levels_bar(
total=sum([len(lev) for lev in package_levels]),
desc=f"Checking pkgs ({arch})",
unit='pkgs',
fields={"levels_total": total_levels},
enable_rate=False,
)
counter_built = package_bar.add_subcounter('green')
counter_unbuilt = package_bar.add_subcounter('blue')
for level_num, level_packages in enumerate(package_levels):
level_num = level_num + 1
package_bar.update(0, name=" " * BAR_PADDING, level=level_num)
for level_packages in package_levels:
level = set[Pkgbuild]()
if not level_packages:
continue
def add_to_level(pkg, level, reason=''):
if reason:
reason = f': {reason}'
counter_unbuilt.update(force=True)
logging.info(f"Level {level}/{total_levels} ({arch}): Adding {package.path}{reason}")
logging.info(f"Level {i} ({arch}): Adding {package.path}{reason}")
level.add(package)
build_names.update(package.names())
for package in level_packages:
package_bar.update(0, force=True, name=ellipsize(package.name, padding=" ", length=BAR_PADDING))
if (force and package in packages):
add_to_level(package, level, 'query match and force=True')
elif rebuild_dependants and package in dependants:
@@ -691,14 +653,12 @@ def get_unbuilt_package_levels(
elif not check_package_version_built(package, arch, try_download=try_download, refresh_sources=refresh_sources):
add_to_level(package, level, 'package unbuilt')
else:
logging.info(f"Level {level_num}/{total_levels} ({arch}): {package.path}: Package doesn't need [re]building")
counter_built.update(force=True)
logging.info(f"Level {i}: {package.path} ({arch}): Package doesn't need [re]building")
logging.debug(f'Finished checking level {level_num}/{total_levels} ({arch}). Adding unbuilt pkgs: {get_pkg_names_str(level)}')
if level:
build_levels.append(level)
logging.debug(f'Finished checking level {i}. Adding unbuilt pkgs: {get_pkg_names_str(level)}')
i += 1
package_bar.close(clear=True)
return build_levels
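The levels iterated above are plain topological layers of the package dependency graph. A self-contained sketch of how such levels can be computed (simplified; the real code derives them from PKGBUILD metadata):

def dependency_levels(deps: dict[str, set[str]]) -> list[set[str]]:
    # deps maps package -> set of packages it depends on (within the repo)
    remaining = dict(deps)
    done: set[str] = set()
    levels: list[set[str]] = []
    while remaining:
        # everything whose dependencies are already satisfied forms one level
        level = {pkg for pkg, d in remaining.items() if d <= done}
        if not level:
            raise Exception(f'dependency cycle among: {sorted(remaining)}')
        for pkg in level:
            remaining.pop(pkg)
        done |= level
        levels.append(level)
    return levels

# c depends on b, b depends on a  =>  [{'a'}, {'b'}, {'c'}]
print(dependency_levels({'a': set(), 'b': {'a'}, 'c': {'b'}}))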
@@ -731,24 +691,11 @@ def build_packages(
logging.info(f"Build plan made:\n{get_pkg_levels_str(build_levels)}")
total_levels = len(build_levels)
package_bar = get_levels_bar(
desc=f'Building pkgs ({arch})',
color='purple',
unit='pkgs',
total=sum([len(lev) for lev in build_levels]),
fields={"levels_total": total_levels},
enable_rate=False,
)
files = []
updated_repos: set[str] = set()
package_bar.update(-1)
for level, need_build in enumerate(build_levels):
level = level + 1
package_bar.update(incr=0, force=True, name=" " * BAR_PADDING, level=level)
logging.info(f"(Level {level}/{total_levels}) Building {get_pkg_names_str(need_build)}")
logging.info(f"(Level {level}) Building {get_pkg_names_str(need_build)}")
for package in need_build:
package_bar.update(force=True, name=ellipsize(package.name, padding=" ", length=BAR_PADDING))
base = package.pkgbase if isinstance(package, SubPkgbuild) else package
assert isinstance(base, Pkgbase)
if package.is_built(arch):
@@ -761,21 +708,17 @@ def build_packages(
enable_crossdirect=enable_crossdirect,
enable_ccache=enable_ccache,
clean_chroot=clean_chroot,
repo=repo,
)
files += add_package_to_repo(package, arch)
updated_repos.add(package.repo)
for _arch in ['any', arch]:
if _arch in base.arches:
base._built_for.add(_arch)
package_bar.update()
# rescan affected repos
local_repos = get_kupfer_local(arch, in_chroot=False, scan=False)
for repo_name in updated_repos:
assert repo_name in local_repos.repos
local_repos.repos[repo_name].scan()
package_bar.close(clear=True)
return files
@@ -817,39 +760,20 @@ _qemu_enabled: dict[Arch, bool] = {arch: False for arch in ARCHES}
def build_enable_qemu_binfmt(arch: Arch, repo: Optional[dict[str, Pkgbuild]] = None, lazy: bool = True, native_chroot: Optional[BuildChroot] = None):
"""
Build and enable qemu-user-static, binfmt and crossdirect
Specify lazy=False to force building the packages.
"""
if arch not in ARCHES:
raise Exception(f'Unknown binfmt architecture "{arch}". Choices: {", ".join(ARCHES)}')
if _qemu_enabled[arch] or (lazy and binfmt_is_registered(arch)):
if not _qemu_enabled[arch]:
logging.info(f"qemu binfmt for {arch} was already enabled!")
raise Exception(f'Unknown architecture "{arch}". Choices: {", ".join(ARCHES)}')
logging.info('Installing qemu-user (building if necessary)')
if lazy and _qemu_enabled[arch] and binfmt_is_registered(arch):
_qemu_enabled[arch] = True
return
native = config.runtime.arch
assert native
if arch == native:
_qemu_enabled[arch] = True
logging.warning("Not enabling binfmt for host architecture!")
return
logging.info('Installing qemu-user (building if necessary)')
check_programs_wrap(['pacman', 'makepkg', 'pacstrap'])
# build qemu-user, binfmt, crossdirect
packages = list(CROSSDIRECT_PKGS)
hostspec = GCC_HOSTSPECS[arch][arch]
cross_gcc = f"{hostspec}-gcc"
if repo:
for pkg in repo.values():
if (pkg.name == cross_gcc or cross_gcc in pkg.provides):
if config.runtime.arch not in pkg.arches:
logging.debug(f"Package {pkg.path} matches {cross_gcc=} name but not arch: {pkg.arches=}")
continue
packages.append(pkg.path)
logging.debug(f"Adding gcc package {pkg.path} to the necessary crosscompilation tools")
break
build_packages_by_paths(
packages,
CROSSDIRECT_PKGS,
native,
repo=repo,
try_download=True,
@@ -870,6 +794,6 @@ def build_enable_qemu_binfmt(arch: Arch, repo: Optional[dict[str, Pkgbuild]] = N
assert p.startswith(hostdir)
_files.append(os.path.join(CHROOT_PATHS['packages'], p[len(hostdir):].lstrip('/')))
pkgfiles = _files
runcmd(['pacman', '-U', '--noconfirm', '--needed'] + pkgfiles, stderr=sys.stdout)
runcmd(['pacman', '-U', '--noconfirm', '--needed'] + pkgfiles)
binfmt_register(arch, chroot=native_chroot)
_qemu_enabled[arch] = True
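For context, binfmt_is_registered() boils down to inspecting the kernel's binfmt_misc registry. A sketch of that mechanism, assuming the conventional qemu-<arch> handler names; the project's actual helper in its binfmt module may check differently:

import os

def binfmt_is_registered_sketch(arch: str) -> bool:
    # binfmt_misc exposes one pseudo-file per registered handler
    entry = f'/proc/sys/fs/binfmt_misc/qemu-{arch}'
    if not os.path.exists(entry):
        return False
    with open(entry) as fd:
        # the first line of the entry reads "enabled" or "disabled"
        return fd.readline().strip() == 'enabled'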

View File

@@ -6,18 +6,18 @@ import os
from glob import glob
from typing import Iterable, Optional
from kupferbootstrap.config.state import config
from kupferbootstrap.constants import Arch, ARCHES, SRCINFO_FILE, SRCINFO_INITIALISED_FILE, SRCINFO_METADATA_FILE, SRCINFO_TARBALL_FILE, SRCINFO_TARBALL_URL
from kupferbootstrap.exec.cmd import run_cmd, shell_quote, CompletedProcess
from kupferbootstrap.exec.file import get_temp_dir, makedir, remove_file
from kupferbootstrap.devices.device import get_profile_device
from kupferbootstrap.distro.distro import get_kupfer_local, get_kupfer_url, get_kupfer_repo_names
from kupferbootstrap.distro.package import LocalPackage
from kupferbootstrap.net.ssh import run_ssh_command, scp_put_files
from kupferbootstrap.utils import download_file, git, sha256sum
from kupferbootstrap.wrapper import check_programs_wrap, enforce_wrap
from config.state import config
from constants import Arch, ARCHES, REPOSITORIES, SRCINFO_FILE, SRCINFO_INITIALISED_FILE, SRCINFO_METADATA_FILE, SRCINFO_TARBALL_FILE, SRCINFO_TARBALL_URL
from exec.cmd import run_cmd, shell_quote, CompletedProcess
from exec.file import get_temp_dir, makedir, remove_file
from devices.device import get_profile_device
from distro.distro import get_kupfer_local, get_kupfer_url
from distro.package import LocalPackage
from net.ssh import run_ssh_command, scp_put_files
from utils import download_file, git, sha256sum
from wrapper import check_programs_wrap, enforce_wrap
from .build import build_packages_by_paths, init_prebuilts
from .build import build_packages_by_paths
from .pkgbuild import discover_pkgbuilds, filter_pkgbuilds, get_pkgbuild_dirs, init_pkgbuilds
SRCINFO_CACHE_FILES = [SRCINFO_FILE, SRCINFO_INITIALISED_FILE, SRCINFO_METADATA_FILE]
@@ -158,7 +158,6 @@ def cmd_update(
discard_changes: bool = False,
):
"""Update PKGBUILDs git repo"""
enforce_wrap()
init_pkgbuilds(interactive=not non_interactive, lazy=False, update=True, switch_branch=switch_branch, discard_changes=discard_changes)
if init_caches:
init_pkgbuild_caches(clean_src_dirs=clean_src_dirs)
@@ -185,8 +184,6 @@ def cmd_init(
init_pkgbuilds(interactive=not non_interactive, lazy=False, update=update, switch_branch=switch_branch, discard_changes=discard_changes)
if init_caches:
init_pkgbuild_caches(clean_src_dirs=clean_src_dirs)
for arch in ARCHES:
init_prebuilts(arch)
@cmd_packages.command(name='build')
@@ -227,17 +224,15 @@ def cmd_sideload(paths: Iterable[str], arch: Optional[Arch] = None, no_build: bo
logging.fatal("No packages matched")
return
scp_put_files(files, '/tmp').check_returncode()
run_ssh_command(
[
'sudo',
'pacman',
'-U',
*[os.path.join('/tmp', os.path.basename(file)) for file in files],
'--noconfirm',
"'--overwrite=\\*'",
],
alloc_tty=True,
).check_returncode()
run_ssh_command([
'sudo',
'pacman',
'-U',
] + [os.path.join('/tmp', os.path.basename(file)) for file in files] + [
'--noconfirm',
"'--overwrite=\\*'",
],
alloc_tty=True).check_returncode()
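Both variants above assemble the same argv for the remote pacman call; for illustration, the list construction in isolation (the helper name is hypothetical):

import os

def remote_install_cmd(files: list[str], remote_dir: str = '/tmp') -> list[str]:
    # pacman -U installs local package files; --overwrite stays single-quoted
    # so the glob is expanded by the remote shell, not the local one
    return [
        'sudo', 'pacman', '-U',
        *[os.path.join(remote_dir, os.path.basename(f)) for f in files],
        '--noconfirm', "'--overwrite=\\*'",
    ]

print(remote_install_cmd(['packages/aarch64/foo-1.0-1-aarch64.pkg.tar.zst']))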
CLEAN_LOCATIONS = ['src', 'pkg', *SRCINFO_CACHE_FILES]
@@ -272,7 +267,7 @@ def cmd_clean(what: Iterable[str] = ['all'], force: bool = False, noop: bool = F
[
'clean',
'-dffX' + ('n' if noop else ''),
] + get_kupfer_repo_names(local=True),
] + REPOSITORIES,
dir=pkgbuilds,
)
if result.returncode != 0:
@@ -304,7 +299,7 @@ def cmd_clean(what: Iterable[str] = ['all'], force: bool = False, noop: bool = F
@cmd_packages.command(name='list')
def cmd_list():
"List information about available source packages (PKGBUILDs)"
pkgdir = os.path.join(config.get_path('pkgbuilds'), get_kupfer_repo_names(local=False)[0])
pkgdir = os.path.join(config.get_path('pkgbuilds'), REPOSITORIES[0])
if not os.path.exists(pkgdir):
raise Exception(f"PKGBUILDs seem not to be initialised yet: {pkgdir} doesn't exist!\n"
f"Try running `kupferbootstrap packages init` first!")
@@ -313,7 +308,7 @@ def cmd_list():
logging.info(f'Done! {len(packages)} Pkgbuilds:')
for name in sorted(packages.keys()):
p = packages[name]
print(f'name: {p.name}; ver: {p.version}; mode: {p.mode}; crossdirect: {p.crossdirect} provides: {p.provides}; replaces: {p.replaces};'
print(f'name: {p.name}; ver: {p.version}; provides: {p.provides}; replaces: {p.replaces};'
f'local_depends: {p.local_depends}; depends: {p.depends}')
@@ -346,7 +341,6 @@ def cmd_check(paths):
mode_key = '_mode'
nodeps_key = '_nodeps'
crossdirect_key = '_crossdirect'
pkgbase_key = 'pkgbase'
pkgname_key = 'pkgname'
arches_key = '_arches'
@@ -357,7 +351,6 @@ def cmd_check(paths):
required = {
mode_key: True,
nodeps_key: False,
crossdirect_key: False,
pkgbase_key: False,
pkgname_key: True,
'pkgdesc': False,
@@ -379,7 +372,6 @@ def cmd_check(paths):
commit_key: is_git_package,
source_key: False,
sha256sums_key: False,
'noextract': False,
}
pkgbuild_path = os.path.join(config.get_path('pkgbuilds'), package.path, 'PKGBUILD')
with open(pkgbuild_path, 'r') as file:
@@ -447,7 +439,7 @@ def cmd_check(paths):
formatted = False
reason = 'Found literal " although no special character was found in the line to justify the usage of a literal "'
if "'" in line and '"' not in line:
if "'" in line and not '"' in line:
formatted = False
reason = 'Found literal \' although either a literal " or no quotes should be used'

View File

@@ -6,17 +6,16 @@ import multiprocessing
import os
from joblib import Parallel, delayed
from typing import Iterable, Optional
from typing import Iterable, Optional, TypeAlias
from kupferbootstrap.config.state import config, ConfigStateHolder
from kupferbootstrap.constants import Arch
from kupferbootstrap.distro.distro import get_kupfer_repo_names
from kupferbootstrap.distro.package import PackageInfo
from kupferbootstrap.exec.file import remove_file
from kupferbootstrap.logger import setup_logging
from kupferbootstrap.utils import git, git_get_branch
from kupferbootstrap.wrapper import check_programs_wrap
from kupferbootstrap.typehelpers import TypeAlias
from config.state import config, ConfigStateHolder
from constants import REPOSITORIES
from constants import Arch
from distro.package import PackageInfo
from exec.file import remove_file
from logger import setup_logging
from utils import git, git_get_branch
from wrapper import check_programs_wrap
from .srcinfo_cache import SrcinfoMetaFile
@@ -156,7 +155,6 @@ class Pkgbuild(PackageInfo):
repo: str
mode: str
nodeps: bool
crossdirect: bool
path: str
pkgver: str
pkgrel: str
@@ -191,7 +189,6 @@ class Pkgbuild(PackageInfo):
self.repo = repo or ''
self.mode = ''
self.nodeps = False
self.crossdirect = True
self.path = relative_path
self.pkgver = ''
self.pkgrel = ''
@@ -203,8 +200,8 @@ class Pkgbuild(PackageInfo):
return ','.join([
'Pkgbuild(' + self.name,
repr(self.path),
str(self.version) + ("🔄" if self.sources_refreshed else ""),
repr(self.mode) + ')',
self.version + ("🔄" if self.sources_refreshed else ""),
self.mode + ')',
])
def names(self) -> list[str]:
@@ -225,7 +222,6 @@ class Pkgbuild(PackageInfo):
self.repo = pkg.repo
self.mode = pkg.mode
self.nodeps = pkg.nodeps
self.crossdirect = pkg.crossdirect
self.path = pkg.path
self.pkgver = pkg.pkgver
self.pkgrel = pkg.pkgrel
@@ -313,11 +309,8 @@ class SubPkgbuild(Pkgbuild):
self.sources_refreshed = False
self.update(pkgbase)
# set to None - will be replaced with base_pkg if still None after parsing
self.depends = None # type: ignore[assignment]
self.makedepends = None # type: ignore[assignment]
self.provides = None # type: ignore[assignment]
self.replaces = None # type: ignore[assignment]
self.provides = {}
self.replaces = []
def refresh_sources(self, lazy: bool = True):
assert self.pkgbase
@@ -339,7 +332,7 @@ def parse_pkgbuild(
global config
if _config:
config = _config
setup_logging(verbose=config.runtime.verbose, force_colors=config.runtime.colors, log_setup=False) # different subprocess needs log setup.
setup_logging(verbose=config.runtime.verbose, log_setup=False) # different subprocess needs log setup.
logging.info(f"Discovering PKGBUILD for {relative_pkg_dir}")
if force_refresh_srcinfo:
@@ -360,11 +353,7 @@ def parse_pkgbuild(
else:
raise Exception(msg)
# if _crossdirect is unset (None), it defaults to True
crossdirect_enabled = srcinfo_cache.build_crossdirect in (None, True)
base_package = Pkgbase(relative_pkg_dir, sources_refreshed=sources_refreshed, srcinfo_cache=srcinfo_cache)
base_package.crossdirect = crossdirect_enabled
base_package.mode = mode
base_package.nodeps = nodeps
base_package.repo = relative_pkg_dir.split('/')[0]
@@ -393,21 +382,13 @@ def parse_pkgbuild(
elif line.startswith('arch'):
current.arches.append(splits[1])
elif line.startswith('provides'):
if not current.provides:
current.provides = {}
current.provides = get_version_specs(splits[1], current.provides)
elif line.startswith('replaces'):
if not current.replaces:
current.replaces = []
current.replaces.append(splits[1])
elif splits[0] in ['depends', 'makedepends', 'checkdepends', 'optdepends']:
spec = splits[1].split(': ', 1)[0]
if not current.depends:
current.depends = (base_package.makedepends or {}).copy()
current.depends = get_version_specs(spec, current.depends)
if splits[0] == 'makedepends':
if not current.makedepends:
current.makedepends = {}
current.makedepends = get_version_specs(spec, current.makedepends)
results: list[Pkgbuild] = list(base_package.subpackages)
@@ -420,15 +401,6 @@ def parse_pkgbuild(
pkg.update_version()
if not (pkg.version == base_package.version):
raise Exception(f'Subpackage malformed! Versions differ! base: {base_package}, subpackage: {pkg}')
if isinstance(pkg, SubPkgbuild):
if pkg.depends is None:
pkg.depends = base_package.depends
if pkg.makedepends is None:
pkg.makedepends = base_package.makedepends
if pkg.replaces is None:
pkg.replaces = base_package.replaces
if pkg.provides is None:
pkg.provides = base_package.provides
return results
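The block removed in this hunk implemented None-as-sentinel inheritance: subpackage fields left unset during parsing fall back to the base package's values. A reduced sketch of the idea (hypothetical names):

from typing import Optional

class SubPkgSketch:
    # None means "not set while parsing; inherit from the base package"
    depends: Optional[dict] = None
    provides: Optional[dict] = None

def inherit_missing(sub: SubPkgSketch, base_depends: dict, base_provides: dict) -> None:
    if sub.depends is None:
        sub.depends = base_depends
    if sub.provides is None:
        sub.provides = base_provides

sub = SubPkgSketch()
inherit_missing(sub, {'glibc': []}, {})
assert sub.depends == {'glibc': []}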
@@ -467,13 +439,8 @@ def get_pkgbuild_dirs(quiet: bool = True, repositories: Optional[list[str]] = No
"""Gets the relative paths to directories containing PKGBUILDs, optionally warns about dirs without a PKGBUILD"""
pkgbuilds_dir = config.get_path('pkgbuilds')
paths = []
for repo in repositories or get_kupfer_repo_names(local=True):
path = os.path.join(pkgbuilds_dir, repo)
if not os.path.exists(path):
if not quiet:
logging.warning(f'repo "{repo}" can\'t be listed: "{path}" doesn\'t exist; skipping')
continue
for dir in os.listdir(path):
for repo in repositories or REPOSITORIES:
for dir in os.listdir(os.path.join(pkgbuilds_dir, repo)):
p = os.path.join(repo, dir)
if not os.path.exists(os.path.join(pkgbuilds_dir, p, 'PKGBUILD')):
if not quiet:

View File

@@ -7,25 +7,23 @@ import subprocess
from typing import Any, ClassVar, Optional
from kupferbootstrap.config.state import config
from kupferbootstrap.constants import MAKEPKG_CMD, SRCINFO_FILE, SRCINFO_METADATA_FILE, SRCINFO_INITIALISED_FILE
from kupferbootstrap.dictscheme import DictScheme
from kupferbootstrap.exec.cmd import run_cmd
from kupferbootstrap.utils import sha256sum
from config.state import config
from constants import MAKEPKG_CMD, SRCINFO_FILE, SRCINFO_METADATA_FILE, SRCINFO_INITIALISED_FILE
from dataclass import DataClass
from exec.cmd import run_cmd
from utils import sha256sum
SRCINFO_CHECKSUM_FILES = ['PKGBUILD', SRCINFO_FILE]
class JsonFile(DictScheme):
class JsonFile(DataClass):
_filename: ClassVar[str]
_relative_path: str
_strip_hidden: ClassVar[bool] = True
_sparse: ClassVar[bool] = False
def toJSON(self) -> str:
'Returns a json representation, with private keys that start with "_" filtered out'
return json.dumps(self.toDict(), indent=2)
return json.dumps({key: val for key, val in self.toDict().items() if not key.startswith('_')}, indent=2)
def write(self):
'Write the filtered json representation to disk'
@@ -68,19 +66,11 @@ class SrcInitialisedFile(JsonFile):
raise ex
srcinfo_meta_defaults = {
'build_mode': None,
"build_nodeps": None,
"build_crossdirect": None,
}
class SrcinfoMetaFile(JsonFile):
checksums: dict[str, str]
build_mode: Optional[str]
build_nodeps: Optional[bool]
build_crossdirect: Optional[bool]
_changed: bool
_filename: ClassVar[str] = SRCINFO_METADATA_FILE
@@ -100,8 +90,9 @@ class SrcinfoMetaFile(JsonFile):
s = SrcinfoMetaFile({
'_relative_path': relative_pkg_dir,
'_changed': True,
'build_mode': '',
'build_nodeps': None,
'checksums': {},
**srcinfo_meta_defaults,
})
return s, s.refresh_all()
@@ -127,11 +118,9 @@ class SrcinfoMetaFile(JsonFile):
if not force_refresh:
logging.debug(f'{metadata._relative_path}: srcinfo checksums match!')
lines = lines or metadata.read_srcinfo_file()
for build_field in srcinfo_meta_defaults.keys():
for build_field in ['build_mode', 'build_nodeps']:
if build_field not in metadata:
metadata.refresh_build_fields()
if write:
metadata.write()
break
else:
lines = metadata.refresh_all(write=write)
@@ -152,7 +141,8 @@ class SrcinfoMetaFile(JsonFile):
self._changed = True
def refresh_build_fields(self):
self.update(srcinfo_meta_defaults)
self['build_mode'] = None
self['build_nodeps'] = None
with open(os.path.join(config.get_path('pkgbuilds'), self._relative_path, 'PKGBUILD'), 'r') as file:
lines = file.read().split('\n')
for line in lines:
@@ -164,8 +154,6 @@ class SrcinfoMetaFile(JsonFile):
self.build_mode = val
elif key == '_nodeps':
self.build_nodeps = val.lower() == 'true'
elif key == '_crossdirect':
self.build_crossdirect = val.lower() == 'true'
else:
continue
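refresh_build_fields() scans the PKGBUILD for kupfer-specific `_key=value` lines. A standalone sketch of that scan, mirroring the keys handled above:

def parse_build_fields(pkgbuild_text: str) -> dict:
    # defaults mirror "unset"; only recognised keys are picked up
    fields = {'build_mode': None, 'build_nodeps': None, 'build_crossdirect': None}
    for line in pkgbuild_text.split('\n'):
        if '=' not in line:
            continue
        key, val = line.split('=', 1)
        if key == '_mode':
            fields['build_mode'] = val
        elif key == '_nodeps':
            fields['build_nodeps'] = val.lower() == 'true'
        elif key == '_crossdirect':
            fields['build_crossdirect'] = val.lower() == 'true'
    return fields

print(parse_build_fields('_mode=host\n_nodeps=true\n'))  # host, True, None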

View File

@@ -1,29 +0,0 @@
[project]
name = "kupferbootstrap"
dependencies = [
"click>=8.0.1",
"appdirs>=1.4.4",
"joblib>=1.0.1",
"toml",
"typing_extensions",
"coloredlogs",
"munch",
"requests",
"python-dateutil",
"enlighten",
"PyYAML",
]
dynamic = ["version"]
[project.scripts]
kupferbootstrap = "kupferbootstrap.main:main"
[tool.setuptools.package-data]
"*" = ["version.txt"]
[build-system]
requires = [ "setuptools>=41", "wheel", "setuptools-git-versioning<2", ]
build-backend = "setuptools.build_meta"
[tool.setuptools-git-versioning]
enabled = true

View File

@@ -1,4 +1,4 @@
#!/bin/bash
sudo -v
python -m pytest -v --cov=. --cov-branch --cov-report=term "$@" src/kupferbootstrap
python -m pytest -v --cov=. --cov-branch --cov-report=term "$@" ./*/test_*.py

View File

@@ -1 +1,10 @@
-e .
click>=8.0.1
appdirs>=1.4.4
joblib>=1.0.1
toml
typing_extensions
coloredlogs
munch
setuptools # required by munch
requests
python-dateutil

View File

@@ -1,44 +0,0 @@
import click
import os
from typing import Optional
from kupferbootstrap.constants import Arch, ARCHES
from .binfmt import binfmt_unregister, binfmt_is_registered
cmd_binfmt = click.Group('binfmt', help='Manage qemu binfmt for executing foreign architecture binaries')
arches_arg = click.argument('arches', type=click.Choice(ARCHES), nargs=-1, required=True)
arches_arg_optional = click.argument('arches', type=click.Choice(ARCHES), nargs=-1, required=False)
@cmd_binfmt.command('register', help='Register a binfmt handler with the kernel')
@arches_arg
def cmd_register(arches: list[Arch], disable_chroot: bool = False):
from ..packages.build import build_enable_qemu_binfmt
for arch in arches:
build_enable_qemu_binfmt(arch)
@cmd_binfmt.command('unregister', help='Unregister a binfmt handler from the kernel')
@arches_arg_optional
def cmd_unregister(arches: Optional[list[Arch]]):
for arch in arches or ARCHES:
binfmt_unregister(arch)
@cmd_binfmt.command('status', help='Get the status of a binfmt handler from the kernel')
@arches_arg_optional
def cmd_status(arches: Optional[list[Arch]]):
for arch in arches or ARCHES:
native = arch == os.uname().machine
active = binfmt_is_registered(arch)
if native and not active:
# boooring
continue
verb = click.style(
"is" if active else "is NOT",
fg='green' if (active ^ native) else 'red',
bold=True,
)
click.echo(f'Binfmt for {arch} {verb} set up! {"(host architecture!)" if native else ""}')

View File

@@ -1,80 +0,0 @@
import click
import logging
from json import dumps as json_dump
from typing import Optional
from kupferbootstrap.config.state import config
from kupferbootstrap.config.cli import resolve_profile_field
from kupferbootstrap.utils import color_mark_selected, colors_supported
from .device import get_devices, get_device
@click.command(name='devices')
@click.option('-j', '--json', is_flag=True, help='output machine-parsable JSON format')
@click.option(
'--force-parse-deviceinfo/--no-parse-deviceinfo',
is_flag=True,
default=None,
help="Force or disable deviceinfo parsing. The default is to try but continue if it fails.",
)
@click.option(
'--download-packages/--no-download-packages',
is_flag=True,
default=False,
help='Download packages while trying to parse deviceinfo',
)
@click.option('--output-file', type=click.Path(exists=False, file_okay=True), help="Dump JSON to file")
def cmd_devices(
json: bool = False,
force_parse_deviceinfo: Optional[bool] = True,
download_packages: bool = False,
output_file: Optional[str] = None,
):
'list the available devices and descriptions'
devices = get_devices()
if not devices:
raise Exception("No devices found!")
profile_device = None
profile_name = config.file.profiles.current
selected, inherited_from = None, None
try:
selected, inherited_from = resolve_profile_field(None, profile_name, 'device', config.file.profiles)
if selected:
profile_device = get_device(selected)
except Exception as ex:
logging.debug(f"Failed to get profile device for marking as currently selected, continuing anyway. Exception: {ex}")
output = ['']
json_output = {}
interactive_json = json and not output_file
if output_file:
json = True
use_colors = colors_supported(False if interactive_json else config.runtime.colors)
for name in sorted(devices.keys()):
device = devices[name]
assert device
if force_parse_deviceinfo in [None, True]:
try:
device.parse_deviceinfo(try_download=download_packages)
except Exception as ex:
if not force_parse_deviceinfo:
logging.debug(f"Failed to parse deviceinfo for extended description, not a problem: {ex}")
else:
raise ex
if json:
json_output[name] = device.get_summary().toDict()
if interactive_json:
continue
snippet = device.nice_str(colors=use_colors, newlines=True)
if profile_device and profile_device.name == device.name:
snippet = color_mark_selected(snippet, profile_name or '[unknown]', inherited_from)
output.append(f"{snippet}\n")
if interactive_json:
output = ['\n' + json_dump(json_output, indent=4)]
if output_file:
with open(output_file, 'w') as fd:
fd.write(json_dump(json_output))
for line in output:
print(line)

View File

@@ -1,299 +0,0 @@
from __future__ import annotations
import logging
import toml
from munch import Munch
from toml.encoder import TomlEncoder, TomlPreserveInlineDictEncoder
from typing import ClassVar, Generator, Optional, Union, Mapping, Any, get_type_hints, get_origin, get_args, Iterable
from .typehelpers import UnionType, NoneType
def resolve_type_hint(hint: type, ignore_origins: list[type] = []) -> Iterable[type]:
origin = get_origin(hint)
args: Iterable[type] = get_args(hint)
if origin in ignore_origins:
return [hint]
if origin is Optional:
args = set(list(args) + [NoneType])
if origin in [Union, UnionType, Optional]:
results: list[type] = []
for arg in args:
results += resolve_type_hint(arg, ignore_origins=ignore_origins)
return results
return [origin or hint]
def flatten_hints(hints: Any) -> Generator[Any, None, None]:
if not isinstance(hints, (list, tuple)):
yield hints
return
for i in hints:
yield from flatten_hints(i)
def resolve_dict_hints(hints: Any) -> Generator[tuple[Any, ...], None, None]:
for hint in flatten_hints(hints):
t_origin = get_origin(hint)
t_args = get_args(hint)
if t_origin == dict:
yield t_args
continue
if t_origin in [NoneType, Optional, Union, UnionType] and t_args:
yield from resolve_dict_hints(t_args)
continue
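resolve_type_hint() and resolve_dict_hints() are built on the typing module's introspection primitives. A few illustrative calls showing what those primitives return:

from typing import Optional, Union, get_args, get_origin

print(get_origin(Optional[int]))        # typing.Union
print(get_args(Optional[int]))          # (<class 'int'>, <class 'NoneType'>)
print(get_origin(dict[str, int]))       # <class 'dict'>
print(get_args(dict[str, int]))         # (<class 'str'>, <class 'int'>)
print(get_args(Union[int, str, None]))  # int, str and NoneType, flattened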
class DictScheme(Munch):
_type_hints: ClassVar[dict[str, Any]]
_strip_hidden: ClassVar[bool] = False
_sparse: ClassVar[bool] = False
def __init__(self, d: Mapping = {}, validate: bool = True, **kwargs):
self.update(dict(d) | kwargs, validate=validate)
@classmethod
def transform(
cls,
values: Mapping[str, Any],
*,
validate: bool = True,
allow_extra: bool = False,
type_hints: Optional[dict[str, Any]] = None,
) -> Any:
results: dict[str, Any] = {}
values = dict(values)
for key in list(values.keys()):
value = values.pop(key)
type_hints = cls._type_hints if type_hints is None else type_hints
if key in type_hints:
_classes = tuple[type](resolve_type_hint(type_hints[key]))
optional = bool(set([NoneType, None]).intersection(_classes))
if optional and value is None:
results[key] = None
continue
if issubclass(_classes[0], dict):
assert isinstance(value, dict) or (optional and value is None), f'{key=} is not dict: {value!r}, {_classes=}'
target_class = _classes[0]
if target_class in [None, NoneType, Optional]:
for target in _classes[1:]:
if target not in [None, NoneType, Optional]:
target_class = target
break
if target_class is dict:
dict_hints = list(resolve_dict_hints(type_hints[key]))
if len(dict_hints) != 1:
msg = f"transform(): Received wrong amount of type hints for key {key}: {len(dict_hints)}"
if validate:
raise Exception(msg)
logging.warning(msg)
if len(dict_hints) == 1 and value is not None:
if len(dict_hints[0]) != 2 or not all(dict_hints[0]):
logging.debug(f"Weird dict hints received: {dict_hints}")
continue
key_type, value_type = dict_hints[0]
if not isinstance(value, Mapping):
msg = f"Got non-mapping {value!r} for expected dict type: {key_type} => {value_type}. Allowed classes: {_classes}"
if validate:
raise Exception(msg)
logging.warning(msg)
results[key] = value
continue
if isinstance(key_type, type):
if issubclass(key_type, str):
target_class = Munch
else:
msg = f"{key=} subdict got wrong key type hint (expected str): {key_type}"
if validate:
raise Exception(msg)
logging.warning(msg)
if validate:
for k in value:
if not isinstance(k, tuple(flatten_hints(key_type))):
raise Exception(f'Subdict "{key}": wrong type for subkey "{k}": got: {type(k)}, expected: {key_type}')
dict_content_hints = {k: value_type for k in value}
value = cls.transform(value, validate=validate, allow_extra=allow_extra, type_hints=dict_content_hints)
if not isinstance(value, target_class):
if not (optional and value is None):
assert issubclass(target_class, Munch)
# despite the above assert, mypy doesn't seem to understand target_class is a Munch here
kwargs = {'validate': validate} if issubclass(target_class, DictScheme) else {}
value = target_class(value, **kwargs) # type:ignore[attr-defined]
else:
# print(f"nothing to do: '{key}' was already {target_class})
pass
# handle numerics
elif set(_classes).intersection([int, float]) and isinstance(value, str) and str not in _classes:
parsed_number = None
parsers: list[tuple[type, list]] = [(int, [10]), (int, [0]), (float, [])]
for _cls, args in parsers:
if _cls not in _classes:
continue
try:
parsed_number = _cls(value, *args)
break
except ValueError:
continue
if parsed_number is None:
if validate:
raise Exception(f"Couldn't parse string value {repr(value)} for key '{key}' into number formats: " +
(', '.join(list(c.__name__ for c in _classes))))
else:
value = parsed_number
if validate:
if not isinstance(value, _classes):
raise Exception(f'key "{key}" has value of wrong type! expected: '
f'{" ,".join([ c.__name__ for c in _classes])}; '
f'got: {type(value).__name__}; value: {value}')
elif validate and not allow_extra:
logging.debug(f"{cls}: unknown key '{key}': {value}")
raise Exception(f'{cls}: Unknown key "{key}"')
else:
if isinstance(value, dict) and not isinstance(value, Munch):
value = Munch.fromDict(value)
results[key] = value
if values:
if validate:
raise Exception(f'values contained unknown keys: {list(values.keys())}')
results |= values
return results
@classmethod
def fromDict(cls, values: Mapping[str, Any], validate: bool = True):
return cls(d=values, validate=validate)
def toDict(
self,
strip_hidden: Optional[bool] = None,
sparse: Optional[bool] = None,
):
return self.strip_dict(
self,
strip_hidden=strip_hidden,
sparse=sparse,
recursive=True,
)
@classmethod
def strip_dict(
cls,
d: dict[Any, Any],
strip_hidden: Optional[bool] = None,
sparse: Optional[bool] = None,
recursive: bool = True,
hints: Optional[dict[str, Any]] = None,
validate: bool = True,
) -> dict[Any, Any]:
# preserve original None-type args
_sparse = cls._sparse if sparse is None else sparse
_strip_hidden = cls._strip_hidden if strip_hidden is None else strip_hidden
hints = cls._type_hints if hints is None else hints
result = dict(d)
if not (_strip_hidden or _sparse or result):
return result
for k, v in d.items():
type_hint = resolve_type_hint(hints.get(k, "abc"))
if not isinstance(k, str):
msg = f"strip_dict(): unknown key type {k=}: {type(k)=}"
if validate:
raise Exception(msg)
logging.warning(f"{msg} (skipping)")
continue
if _strip_hidden and k.startswith('_'):
result.pop(k)
continue
if v is None:
if NoneType not in type_hint:
msg = f'encountered illegal null value at key "{k}" for typehint {type_hint}'
if validate:
raise Exception(msg)
logging.warning(msg)
if _sparse:
result.pop(k)
continue
if recursive and isinstance(v, dict):
if not v:
result[k] = {}
continue
if isinstance(v, DictScheme):
# pass None in sparse and strip_hidden
result[k] = v.toDict(strip_hidden=strip_hidden, sparse=sparse)
continue
if isinstance(v, Munch):
result[k] = v.toDict()
if k not in hints:
continue
_subhints = {}
_hints = resolve_type_hint(hints[k], [dict])
hints_flat = list(flatten_hints(_hints))
subclass = DictScheme
for hint in hints_flat:
if get_origin(hint) == dict:
_valtype = get_args(hint)[1]
_subhints = {n: _valtype for n in v.keys()}
break
if isinstance(hint, type) and issubclass(hint, DictScheme):
subclass = hint
_subhints = hint._type_hints
break
else:
# print(f"ignoring {hint=}")
continue
result[k] = subclass.strip_dict(
v,
hints=_subhints,
sparse=_sparse,
strip_hidden=_strip_hidden,
recursive=recursive,
)
return result
def update(self, d: Mapping[str, Any], validate: bool = True):
Munch.update(self, type(self).transform(d, validate=validate))
def __init_subclass__(cls):
super().__init_subclass__()
cls._type_hints = {name: hint for name, hint in get_type_hints(cls).items() if get_origin(hint) is not ClassVar}
def __repr__(self):
return f'{type(self)}{dict.__repr__(dict(self))}'
def toYAML(
self,
strip_hidden: Optional[bool] = None,
sparse: Optional[bool] = None,
**yaml_args,
) -> str:
import yaml
yaml_args = {'sort_keys': False} | yaml_args
dumped = yaml.dump(
self.toDict(strip_hidden=strip_hidden, sparse=sparse),
**yaml_args,
)
if dumped is None:
raise Exception(f"Failed to yaml-serialse {self}")
return dumped
def toToml(
self,
strip_hidden: Optional[bool] = None,
sparse: Optional[bool] = None,
encoder: Optional[TomlEncoder] = TomlPreserveInlineDictEncoder(),
) -> str:
return toml.dumps(
self.toDict(strip_hidden=strip_hidden, sparse=sparse),
encoder=encoder,
)
class TomlInlineDict(dict, toml.decoder.InlineTableDict):
pass
def toml_inline_dicts(value: Any) -> Any:
if not isinstance(value, Mapping):
return value
return TomlInlineDict({k: toml_inline_dicts(v) for k, v in value.items()})
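A miniature of what DictScheme.transform() does with those hints: reject unknown keys and coerce numeric strings where the hint allows it. This is a simplified sketch under stated assumptions, not the class itself:

from typing import Any, Optional, get_args, get_type_hints

class ProfileSketch:
    name: str
    port: Optional[int]

def coerce(values: dict[str, Any]) -> dict[str, Any]:
    hints = get_type_hints(ProfileSketch)
    out: dict[str, Any] = {}
    for key, value in values.items():
        if key not in hints:
            raise Exception(f'Unknown key "{key}"')
        allowed = get_args(hints[key]) or (hints[key],)
        if int in allowed and isinstance(value, str):
            value = int(value, 0)  # base 0 also accepts 0x... literals
        out[key] = value
    return out

print(coerce({'name': 'phone', 'port': '22'}))  # {'name': 'phone', 'port': 22}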

View File

@@ -1,247 +0,0 @@
import logging
from enum import IntFlag
from typing import Generic, Mapping, Optional, TypeVar
from kupferbootstrap.constants import Arch, ARCHES, REPOSITORIES, KUPFER_BRANCH_MARKER, KUPFER_HTTPS, CHROOT_PATHS
from kupferbootstrap.generator import generate_pacman_conf_body
from kupferbootstrap.config.state import config
from .repo import BinaryPackageType, RepoInfo, Repo, LocalRepo, RemoteRepo
from .repo_config import AbstrRepoConfig, BaseDistro, ReposConfigFile, REPOS_CONFIG_DEFAULT, get_repo_config as _get_repo_config
class DistroLocation(IntFlag):
REMOTE = 0
LOCAL = 1
CHROOT = 3
RepoType = TypeVar('RepoType', bound=Repo)
class Distro(Generic[RepoType]):
repos: Mapping[str, RepoType]
arch: str
def __init__(self, arch: Arch, repo_infos: dict[str, RepoInfo], scan=False):
assert (arch in ARCHES)
self.arch = arch
self.repos = dict[str, RepoType]()
for repo_name, repo_info in repo_infos.items():
self.repos[repo_name] = self._create_repo(
name=repo_name,
arch=arch,
url_template=repo_info.url_template,
options=repo_info.options,
scan=scan,
)
def _create_repo(self, **kwargs) -> RepoType:
raise NotImplementedError()
def get_packages(self) -> dict[str, BinaryPackageType]:
""" get packages from all repos, semantically overlaying them"""
results = dict[str, BinaryPackageType]()
for repo in list(self.repos.values())[::-1]:
assert repo.packages is not None
results.update(repo.packages)
return results
def repos_config_snippet(self, extra_repos: Mapping[str, RepoInfo] = {}) -> str:
extras: list[Repo] = [
Repo(name, url_template=info.url_template, arch=self.arch, options=info.options, scan=False) for name, info in extra_repos.items()
]
return '\n\n'.join(repo.config_snippet() for repo in (extras + list(self.repos.values())))
def get_pacman_conf(self, extra_repos: Mapping[str, RepoInfo] = {}, check_space: bool = True, in_chroot: bool = True):
body = generate_pacman_conf_body(self.arch, check_space=check_space)
return body + self.repos_config_snippet(extra_repos)
def scan(self, lazy=True):
for repo in self.repos.values():
if not (lazy and repo.scanned):
repo.scan()
def is_scanned(self):
for repo in self.repos.values():
if not repo.scanned:
return False
return True
class LocalDistro(Distro[LocalRepo]):
def _create_repo(self, **kwargs) -> LocalRepo:
return LocalRepo(**kwargs)
class RemoteDistro(Distro[RemoteRepo]):
def _create_repo(self, **kwargs) -> RemoteRepo:
return RemoteRepo(**kwargs)
def get_kupfer(arch: str, url_template: str, scan: bool = False) -> Distro:
repos = {name: RepoInfo(url_template=url_template, options={'SigLevel': 'Never'}) for name in REPOSITORIES}
remote = not url_template.startswith('file://')
clss = RemoteDistro if remote else LocalDistro
distro = clss(
arch=arch,
repo_infos=repos,
scan=scan,
)
assert isinstance(distro, (LocalDistro, RemoteDistro))
if remote:
assert isinstance(distro, RemoteDistro)
for repo in distro.repos.values():
repo.cache_repo_db = True
return distro
_kupfer_https: dict[Arch, RemoteDistro] = {}
_kupfer_local: dict[Arch, LocalDistro] = {}
_kupfer_local_chroots: dict[Arch, LocalDistro] = {}
def reset_distro_caches():
global _kupfer_https, _kupfer_local, _kupfer_local_chroots
for cache in _kupfer_https, _kupfer_local, _kupfer_local_chroots:
assert isinstance(cache, dict)
cache.clear()
def get_kupfer_url(url: str = KUPFER_HTTPS, branch: Optional[str] = None) -> str:
"""gets the repo URL for `branch`, getting branch from config if `None` is passed."""
branch = config.file.pacman.repo_branch if branch is None else branch
return url.replace(KUPFER_BRANCH_MARKER, branch)
def get_repo_config(*args, **kwargs) -> ReposConfigFile:
repo_config, changed = _get_repo_config(*args, **kwargs)
if changed:
logging.debug("Repo configs changed, resetting caches")
reset_distro_caches()
return repo_config
def get_kupfer_repo_names(local) -> list[str]:
configs = get_repo_config()
results = []
for repo, repo_config in configs.repos.items():
if not local and repo_config.local_only:
continue
results.append(repo)
return results
def get_RepoInfo(arch: Arch, repo_config: AbstrRepoConfig, default_url: Optional[str]) -> RepoInfo:
url = repo_config.remote_url or default_url
if isinstance(url, dict):
if arch not in url and not default_url:
raise Exception(f"Invalid repo config: Architecture {arch} not in remote_url mapping: {url}")
url = url.get(arch, default_url)
assert url
return RepoInfo(
url_template=get_kupfer_url(url),
options=repo_config.get('options', None) or {},
)
def get_base_distro(arch: Arch, scan: bool = False, unsigned: bool = True, cache_db: bool = True) -> RemoteDistro:
base_distros = get_repo_config().base_distros
if base_distros is None or arch not in base_distros:
base_distros = REPOS_CONFIG_DEFAULT.base_distros
assert base_distros
distro_config: BaseDistro
distro_config = base_distros.get(arch) # type: ignore[assignment]
repos = {}
for repo, repo_config in distro_config.repos.items():
if unsigned:
repo_config['options'] = (repo_config.get('options', None) or {}) | {'SigLevel': 'Never'}
repos[repo] = get_RepoInfo(arch, repo_config, default_url=distro_config.remote_url)
distro = RemoteDistro(arch=arch, repo_infos=repos, scan=False)
if cache_db:
for r in distro.repos.values():
assert isinstance(r, RemoteRepo)
r.cache_repo_db = True
if scan:
distro.scan()
return distro
def get_kupfer_distro(
arch: Arch,
location: DistroLocation,
scan: bool = False,
cache_db: bool = True,
) -> Distro:
global _kupfer_https, _kupfer_local, _kupfer_local_chroots
cls: type[Distro]
cache: Mapping[str, Distro]
repo_config = get_repo_config()
remote = False
if location == DistroLocation.REMOTE:
remote = True
cache = _kupfer_https
default_url = repo_config.remote_url or KUPFER_HTTPS
repos = {repo: get_RepoInfo(arch, conf, default_url) for repo, conf in repo_config.repos.items() if not conf.local_only}
cls = RemoteDistro
elif location in [DistroLocation.CHROOT, DistroLocation.LOCAL]:
if location == DistroLocation.CHROOT:
cache = _kupfer_local_chroots
pkgdir = CHROOT_PATHS['packages']
else:
assert location == DistroLocation.LOCAL
cache = _kupfer_local
pkgdir = config.get_path('packages')
default_url = f"file://{pkgdir}/$arch/$repo"
cls = LocalDistro
repos = {}
for name, repo in repo_config.repos.items():
repo = repo.copy()
repo.remote_url = default_url
repos[name] = get_RepoInfo(arch, repo, default_url)
else:
raise Exception(f"Unknown distro location {location}")
if cache is None:
cache = {}
assert arch
assert isinstance(cache, dict)
if arch not in cache or not cache[arch]:
distro = cls(
arch=arch,
repo_infos=repos,
scan=False,
)
assert isinstance(distro, (LocalDistro, RemoteDistro))
cache[arch] = distro
if remote and cache_db:
assert isinstance(distro, RemoteDistro)
for r in distro.repos.values():
r.cache_repo_db = True
if scan:
distro.scan()
return distro
item: Distro = cache[arch]
if scan and not item.is_scanned():
item.scan()
return item
def get_kupfer_https(arch: Arch, scan: bool = False, cache_db: bool = True) -> RemoteDistro:
d = get_kupfer_distro(arch, location=DistroLocation.REMOTE, scan=scan, cache_db=cache_db)
assert isinstance(d, RemoteDistro)
return d
def get_kupfer_local(arch: Optional[Arch] = None, scan: bool = False, in_chroot: bool = True) -> LocalDistro:
arch = arch or config.runtime.arch
assert arch
location = DistroLocation.CHROOT if in_chroot else DistroLocation.LOCAL
d = get_kupfer_distro(arch, location=location, scan=scan)
assert isinstance(d, LocalDistro)
return d
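The overlay semantics in get_packages() above rely on reversed iteration: repos listed first win because they are applied last. In isolation:

def overlay_packages(repos: dict[str, dict[str, str]]) -> dict[str, str]:
    # apply repos back-to-front so earlier (higher-priority) repos override
    results: dict[str, str] = {}
    for packages in list(repos.values())[::-1]:
        results.update(packages)
    return results

repos = {
    'main': {'linux': 'main/linux-1.0'},
    'fallback': {'linux': 'fallback/linux-0.9', 'vim': 'fallback/vim-9'},
}
print(overlay_packages(repos))  # linux resolves to main/linux-1.0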

View File

@@ -1,170 +0,0 @@
from __future__ import annotations
import logging
import os
import toml
import yaml
from copy import deepcopy
from typing import ClassVar, Optional, Mapping, Union
from ..config.state import config
from ..constants import Arch, BASE_DISTROS, KUPFER_HTTPS, REPOS_CONFIG_FILE, REPOSITORIES
from ..dictscheme import DictScheme, toml_inline_dicts, TomlPreserveInlineDictEncoder
from ..utils import sha256sum
REPOS_KEY = 'repos'
REMOTEURL_KEY = 'remote_url'
LOCALONLY_KEY = 'local_only'
OPTIONS_KEY = 'options'
BASEDISTROS_KEY = 'base_distros'
_current_config: Optional[ReposConfigFile]
class AbstrRepoConfig(DictScheme):
options: Optional[dict[str, str]]
_strip_hidden: ClassVar[bool] = True
_sparse: ClassVar[bool] = True
class BaseDistroRepo(AbstrRepoConfig):
remote_url: Optional[str]
class RepoConfig(AbstrRepoConfig):
remote_url: Optional[Union[str, dict[Arch, str]]]
local_only: Optional[bool]
class BaseDistro(DictScheme):
remote_url: Optional[str]
repos: dict[str, BaseDistroRepo]
class ReposConfigFile(DictScheme):
remote_url: Optional[str]
repos: dict[str, RepoConfig]
base_distros: dict[Arch, BaseDistro]
_path: Optional[str]
_checksum: Optional[str]
_strip_hidden: ClassVar[bool] = True
_sparse: ClassVar[bool] = True
def __init__(self, d, **kwargs):
super().__init__(d=d, **kwargs)
self[REPOS_KEY] = self.get(REPOS_KEY, {})
for repo_cls, defaults, repos, remote_url in [
(RepoConfig, REPO_DEFAULTS, self.get(REPOS_KEY), d.get(REMOTEURL_KEY, None)),
*[(BaseDistroRepo, BASE_DISTRO_DEFAULTS, _distro.repos, _distro.get(REMOTEURL_KEY, None)) for _distro in self.base_distros.values()],
]:
if repos is None:
continue
for name, repo in repos.items():
_repo = dict(defaults | (repo or {})) # type: ignore[operator]
if REMOTEURL_KEY not in repo and not repo.get(LOCALONLY_KEY, None):
_repo[REMOTEURL_KEY] = remote_url
repos[name] = repo_cls(_repo, **kwargs)
@staticmethod
def parse_config(path: str) -> ReposConfigFile:
try:
with open(path, 'r') as fd:
data = yaml.safe_load(fd)
data['_path'] = path
data['_checksum'] = sha256sum(path)
return ReposConfigFile(data, validate=True)
except Exception as ex:
logging.error(f'Error parsing repos config at "{path}":\n{ex}')
raise ex
def toToml(self, strip_hidden=None, sparse=None, encoder=TomlPreserveInlineDictEncoder()):
d = self.toDict(strip_hidden=strip_hidden, sparse=sparse)
for key in [REPOS_KEY]:
if key not in d or not isinstance(d[key], Mapping):
continue
inline = {name: {k: toml_inline_dicts(v) for k, v in value.items()} for name, value in d[key].items()}
logging.info(f"Inlined {key}: {inline}")
d[key] = inline
return toml.dumps(d, encoder=encoder)
REPO_DEFAULTS = {
LOCALONLY_KEY: None,
REMOTEURL_KEY: None,
OPTIONS_KEY: {
'SigLevel': 'Never'
},
}
BASE_DISTRO_DEFAULTS = {
REMOTEURL_KEY: None,
OPTIONS_KEY: None,
}
REPOS_CONFIG_DEFAULT = ReposConfigFile({
'_path': '__DEFAULTS__',
'_checksum': None,
REMOTEURL_KEY: KUPFER_HTTPS,
REPOS_KEY: {
'kupfer_local': REPO_DEFAULTS | {
LOCALONLY_KEY: True
},
**{
r: deepcopy(REPO_DEFAULTS) for r in REPOSITORIES
},
},
BASEDISTROS_KEY: {
arch: {
REMOTEURL_KEY: None,
'repos': {
k: {
'remote_url': v
} for k, v in arch_def['repos'].items()
},
} for arch, arch_def in BASE_DISTROS.items()
},
})
_current_config = None
def get_repo_config(
initialize_pkgbuilds: bool = False,
repo_config_file: Optional[str] = None,
) -> tuple[ReposConfigFile, bool]:
global _current_config
repo_config_file_default = os.path.join(config.get_path('pkgbuilds'), REPOS_CONFIG_FILE)
if repo_config_file is None:
repo_config_file_path = repo_config_file_default
else:
repo_config_file_path = repo_config_file
config_exists = os.path.exists(repo_config_file_path)
if not config_exists and _current_config is None:
if initialize_pkgbuilds:
from ..packages.pkgbuild import init_pkgbuilds
init_pkgbuilds(update=False)
return get_repo_config(initialize_pkgbuilds=False, repo_config_file=repo_config_file)
if repo_config_file is not None:
raise Exception(f"Requested repo config {repo_config_file} doesn't exist")
logging.warning(f"{repo_config_file_path} doesn't exist, using built-in repo config defaults")
_current_config = deepcopy(REPOS_CONFIG_DEFAULT)
return _current_config, False
changed = False
if (not _current_config) or (config_exists and _current_config._checksum != sha256sum(repo_config_file_path)):
if config_exists:
conf = ReposConfigFile.parse_config(repo_config_file_path)
else:
conf = REPOS_CONFIG_DEFAULT
changed = conf != (_current_config or {})
if changed:
_current_config = deepcopy(conf)
else:
logging.debug("Repo config: Cache hit!")
assert _current_config
return _current_config, changed
def get_repos(**kwargs) -> list[RepoConfig]:
config, _ = get_repo_config(**kwargs)
return list(config.repos.values())
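The layering in ReposConfigFile.__init__() is plain dict union (PEP 584): the right-hand operand wins, so per-repo settings override the defaults. A minimal demonstration (the URL is hypothetical):

from typing import Optional

REPO_DEFAULTS = {'local_only': None, 'remote_url': None, 'options': {'SigLevel': 'Never'}}

def merged_repo(overrides: Optional[dict]) -> dict:
    # right-hand operand of | takes precedence over the defaults
    return dict(REPO_DEFAULTS | (overrides or {}))

print(merged_repo(None))
print(merged_repo({'remote_url': 'https://example.org/$repo/$arch'}))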

View File

@@ -1,71 +0,0 @@
import click
import logging
from json import dumps as json_dump
from typing import Optional
from kupferbootstrap.config.cli import resolve_profile_field
from kupferbootstrap.config.state import config
from kupferbootstrap.utils import color_mark_selected, colors_supported
from .flavour import get_flavours, get_flavour
profile_option = click.option('-p', '--profile', help="name of the profile to use", required=False, default=None)
@click.command(name='flavours')
@click.option('-j', '--json', is_flag=True, help='output machine-parsable JSON format')
@click.option('--output-file', type=click.Path(exists=False, file_okay=True), help="Dump JSON to file")
def cmd_flavours(json: bool = False, output_file: Optional[str] = None):
'list information about available flavours'
results = []
json_results = {}
profile_flavour = None
flavours = get_flavours()
interactive_json = json and not output_file
use_colors = colors_supported(config.runtime.colors) and not interactive_json
profile_name = config.file.profiles.current
selected, inherited_from = None, None
if output_file:
json = True
if not flavours:
raise Exception("No flavours found!")
if not interactive_json:
try:
selected, inherited_from = resolve_profile_field(None, profile_name, 'flavour', config.file.profiles)
if selected:
profile_flavour = get_flavour(selected)
except Exception as ex:
logging.debug(f"Failed to get profile flavour for marking as currently selected, continuing anyway. Exception: {ex}")
for name in sorted(flavours.keys()):
f = flavours[name]
try:
f.parse_flavourinfo()
except Exception as ex:
logging.debug(f"A problem happened while parsing flavourinfo for {name}, continuing anyway. Exception: {ex}")
if not interactive_json:
snippet = f.nice_str(newlines=True, colors=use_colors)
if profile_flavour == f:
snippet = color_mark_selected(snippet, profile_name or '[unknown]', inherited_from)
snippet += '\n'
results += snippet.split('\n')
if json:
d = dict(f)
d["description"] = f.flavour_info.description if (f.flavour_info and f.flavour_info.description) else f.description
if "flavour_info" in d and d["flavour_info"]:
for k in set(d["flavour_info"].keys()) - set(['description']):
d[k] = d["flavour_info"][k]
del d["flavour_info"]
d["pkgbuild"] = f.pkgbuild.path if f.pkgbuild else None
d["package"] = f.pkgbuild.name
d["arches"] = sorted(f.pkgbuild.arches) if f.pkgbuild else None
json_results[name] = d
print()
if output_file:
with open(output_file, 'w') as fd:
fd.write(json_dump(json_results))
if interactive_json:
print(json_dump(json_results, indent=4))
else:
for r in results:
print(r)

View File

@@ -1,65 +0,0 @@
import click
import logging
from kupferbootstrap.exec.cmd import run_cmd, CompletedProcess
from typing import Optional
def confirm_cmd(cmd: list[str], color='green', default=True, msg='Really execute fastboot cmd?') -> bool:
return click.confirm(
f'{click.style(msg, fg=color, bold=True)} {" ".join(cmd)}',
default=default,
abort=False,
)
def fastboot_erase(target: str, confirm: bool = False):
if not target:
raise Exception(f"No fastboot erase target specified: {repr(target)}")
cmd = [
'fastboot',
'erase',
target,
]
if confirm:
if not confirm_cmd(cmd, msg=f'Really erase fastboot "{target}" partition?', color='yellow'):
raise Exception("user aborted")
logging.info(f"Fastboot: Erasing {target}")
run_cmd(
cmd,
capture_output=True,
)
def fastboot_flash(partition: str, file: str, sparse_size: Optional[str] = None, confirm: bool = False):
cmd = [
'fastboot',
*(['-S', sparse_size] if sparse_size is not None else []),
'flash',
partition,
file,
]
if confirm:
if not confirm_cmd(cmd):
raise Exception("user aborted")
logging.info(f"Fastboot: Flashing {file} to {partition}")
result = run_cmd(cmd)
assert isinstance(result, CompletedProcess)
if result.returncode != 0:
raise Exception(f'Failed to flash {file}')
def fastboot_boot(file, confirm: bool = False):
cmd = [
'fastboot',
'boot',
file,
]
if confirm:
if not confirm_cmd(cmd):
raise Exception("user aborted")
logging.info(f"Fastboot: booting {file}")
result = run_cmd(cmd)
assert isinstance(result, CompletedProcess)
if result.returncode != 0:
raise Exception(f'Failed to boot {file} using fastboot')

View File

@@ -1,150 +0,0 @@
import shutil
import os
import click
import logging
from typing import Optional
from kupferbootstrap.constants import FLASH_PARTS, LOCATIONS, FASTBOOT, JUMPDRIVE
from kupferbootstrap.exec.cmd import run_root_cmd
from kupferbootstrap.exec.file import get_temp_dir
from kupferbootstrap.devices.device import get_profile_device
from kupferbootstrap.flavours.flavour import get_profile_flavour
from kupferbootstrap.flavours.cli import profile_option
from kupferbootstrap.wrapper import enforce_wrap
from .fastboot import fastboot_flash
from .image import dd_image, dump_aboot, dump_lk2nd, dump_qhypstub, get_image_path, losetup_destroy, losetup_rootfs_image, partprobe, shrink_fs
ABOOT = FLASH_PARTS['ABOOT']
LK2ND = FLASH_PARTS['LK2ND']
QHYPSTUB = FLASH_PARTS['QHYPSTUB']
FULL_IMG = FLASH_PARTS['FULL']
DD = 'dd'
FLASH_METHODS = [FASTBOOT, JUMPDRIVE, DD]
def find_jumpdrive(location: str) -> str:
if location not in LOCATIONS:
raise Exception(f'Invalid location {location}. Choose one of {", ".join(LOCATIONS)}')
dir = '/dev/disk/by-id'
for file in os.listdir(dir):
sanitized_file = file.replace('-', '').replace('_', '').lower()
if f'jumpdrive{location.split("-")[0]}' in sanitized_file:
return os.path.realpath(os.path.join(dir, file))
raise Exception('Unable to discover Jumpdrive')
def test_blockdev(path: str):
partprobe(path)
result = run_root_cmd(['lsblk', path, '-o', 'SIZE'], capture_output=True)
if result.returncode != 0:
raise Exception(f'Failed to lsblk {path}')
if result.stdout == b'SIZE\n 0B\n':
raise Exception(f'Disk {path} has a size of 0B. That probably means it is not available (e.g. no '
'microSD inserted or no microSD card slot installed in the device) or corrupt or defective')
def prepare_minimal_image(source_path: str, sector_size: int) -> str:
minimal_image_dir = get_temp_dir(register_cleanup=True)
minimal_image_path = os.path.join(minimal_image_dir, f'minimal-{os.path.basename(source_path)}')
logging.info(f"Copying image {os.path.basename(source_path)} to {minimal_image_dir} for shrinking")
shutil.copyfile(source_path, minimal_image_path)
loop_device = losetup_rootfs_image(minimal_image_path, sector_size)
partprobe(loop_device)
shrink_fs(loop_device, minimal_image_path, sector_size)
losetup_destroy(loop_device)
return minimal_image_path
@click.command(name='flash')
@profile_option
@click.option('-m', '--method', type=click.Choice(FLASH_METHODS))
@click.option('--split-size', help='Chunk size when splitting the image into sparse files via fastboot')
@click.option('--shrink/--no-shrink', is_flag=True, default=True, help="Copy and shrink the image file to minimal size")
@click.option('-b', '--sector-size', type=int, help="Override the device's sector size", default=None)
@click.option('--confirm', is_flag=True, help="Ask for confirmation before executing fastboot commands")
@click.argument('what', type=click.Choice(list(FLASH_PARTS.values())))
@click.argument('location', type=str, required=False)
def cmd_flash(
what: str,
location: str,
method: Optional[str] = None,
split_size: Optional[str] = None,
profile: Optional[str] = None,
shrink: bool = True,
sector_size: Optional[int] = None,
confirm: bool = False,
):
"""
Flash a partition onto a device.
The syntax of LOCATION depends on the flashing method and is usually only required for flashing "full":
\b
- fastboot: the regular fastboot partition identifier. Usually "userdata"
- dd: a path to a block device
- jumpdrive: one of "emmc", "sdcard" or a path to a block device
"""
enforce_wrap()
device = get_profile_device(profile)
flavour = get_profile_flavour(profile).name
device_image_path = get_image_path(device, flavour)
deviceinfo = device.parse_deviceinfo()
sector_size = sector_size or device.get_image_sectorsize_default()
method = method or deviceinfo.flash_method
if what not in FLASH_PARTS.values():
raise Exception(f'Unknown what "{what}", must be one of {", ".join(FLASH_PARTS.values())}')
if location and location.startswith('aboot'):
raise Exception("You're trying to flash something "
f"to your aboot partition ({location!r}), "
"which contains the android bootloader itself.\n"
"This will brick your phone and is not what you want.\n"
'Aborting.\nDid you mean to flash to "boot"?')
if what == FULL_IMG:
path = ''
if method not in FLASH_METHODS:
raise Exception(f"Flash method {method} not supported!")
if not location:
raise Exception(f'You need to specify a location to flash {what} to')
path = ''
image_path = prepare_minimal_image(device_image_path, sector_size) if shrink else device_image_path
if method == FASTBOOT:
fastboot_flash(
partition=location,
file=image_path,
sparse_size=split_size if split_size is not None else '100M',
confirm=confirm,
)
elif method in [JUMPDRIVE, DD]:
if method == DD or location.startswith("/") or (location not in LOCATIONS and os.path.exists(location)):
path = location
elif method == JUMPDRIVE:
path = find_jumpdrive(location)
test_blockdev(path)
if dd_image(input=image_path, output=path).returncode != 0:
raise Exception(f'Failed to flash {image_path} to {path}')
else:
raise Exception(f'Unhandled flash method "{method}" for "{what}"')
else:
if method and method != FASTBOOT:
raise Exception(f'Flashing "{what}" with method "{method}" not supported, try no parameter or "{FASTBOOT}"')
loop_device = losetup_rootfs_image(device_image_path, sector_size)
if what == ABOOT:
path = dump_aboot(f'{loop_device}p1')
fastboot_flash(location or 'boot', path, confirm=confirm)
elif what == LK2ND:
path = dump_lk2nd(f'{loop_device}p1')
fastboot_flash(location or 'lk2nd', path, confirm=confirm)
elif what == QHYPSTUB:
path = dump_qhypstub(f'{loop_device}p1')
fastboot_flash(location or 'qhypstub', path, confirm=confirm)
else:
raise Exception(f'Unknown what "{what}", this must be a bug in kupferbootstrap!')

View File

@@ -1,49 +0,0 @@
import click
import coloredlogs
import logging
import sys
from typing import Optional
def setup_logging(verbose: bool, quiet: bool = False, force_colors: Optional[bool] = None, log_setup: bool = True):
level_colors = coloredlogs.DEFAULT_LEVEL_STYLES | {'info': {'color': 'magenta', 'bright': True}, 'debug': {'color': 'blue', 'bright': True}}
field_colors = coloredlogs.DEFAULT_FIELD_STYLES | {'asctime': {'color': 'white', 'faint': True}}
level = logging.DEBUG if verbose and not quiet else (logging.INFO if not quiet else logging.ERROR)
coloredlogs.install(
stream=sys.stdout,
fmt='%(asctime)s %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
level=level,
level_styles=level_colors,
field_styles=field_colors,
isatty=force_colors,
)
# don't raise Exceptions when e.g. output stream is closed
logging.raiseExceptions = False
if log_setup:
logging.debug('Logger: Logging set up.')
if force_colors is not None:
logging.debug(f'Logger: Force-{"en" if force_colors else "dis"}abled colors')
verbose_option = click.option(
'-v',
'--verbose',
is_flag=True,
help='Enables verbose logging',
)
quiet_option = click.option(
'-q',
'--quiet',
is_flag=True,
help='Disable most logging, only log errors. (Currently only affects KBS logging, not called subprograms)',
)
color_option = click.option(
'--force-colors/--no-colors',
is_flag=True,
default=None,
help='Force enable/disable log coloring. Defaults to autodetection.',
)
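
# Hedged usage sketch (not part of this diff): the option decorators above are
# meant to be stacked onto a click command; the command itself is invented here.
@click.command()
@verbose_option
@quiet_option
@color_option
def example_cmd(verbose: bool, quiet: bool, force_colors: Optional[bool]):
    setup_logging(verbose, quiet=quiet, force_colors=force_colors)
    logging.info('logging is configured')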

View File

@@ -1,52 +0,0 @@
import click
import sys
from enlighten import Counter, Manager, get_manager as _getmanager
from typing import Hashable, Optional
from .config.state import config
BAR_PADDING = 25
DEFAULT_OUTPUT = sys.stderr
managers: dict[Hashable, Manager] = {}
progress_bars_option = click.option(
'--force-progress-bars/--no-progress-bars',
is_flag=True,
default=None,
help='Force enable/disable progress bars. Defaults to autodetection.',
)
def get_manager(file=DEFAULT_OUTPUT, enabled: Optional[bool] = None) -> Manager:
global managers
m = managers.get(file, None)
if not m:
kwargs = {}
        # take the config value when the caller didn't pass `enabled`; an
        # explicit progress_bars=False in config also overrides enabled=True
        if enabled is None or config.runtime.progress_bars is False:
            enabled = config.runtime.progress_bars
if enabled is not None:
kwargs = {"enabled": enabled}
m = _getmanager(file, **kwargs)
managers[file] = m
return m
def get_progress_bar(*kargs, file=DEFAULT_OUTPUT, leave=False, **kwargs) -> Counter:
m = get_manager(file=file)
kwargs["file"] = file
kwargs["leave"] = leave
return m.counter(*kargs, **kwargs)
def get_levels_bar(*kargs, file=DEFAULT_OUTPUT, enable_rate=True, **kwargs):
kwargs["fields"] = {"name": "None", "level": 1, "levels_total": 1} | (kwargs.get("fields", None) or {})
f = (u'{desc}: {name}{desc_pad}{percentage:3.0f}%|{bar}| '
u'{count:{len_total}d}/{total:d} '
u'[lvl: {level}/{levels_total}] ')
if enable_rate:
f += u'[{elapsed}<{eta}, {rate:.2f}{unit_pad}{unit}/s]'
kwargs["bar_format"] = f
return get_progress_bar(*kargs, **kwargs)
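
# Hedged usage sketch (not part of this diff): a two-level progress display
# built from the helpers above; all field values here are invented.
def _demo_levels_bar():
    bar = get_levels_bar(
        desc='Building',
        unit='pkgs',
        total=4,
        fields={'name': 'device-example', 'level': 1, 'levels_total': 2},
    )
    for _ in range(4):
        bar.update()
    bar.close()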

View File

@@ -1,18 +0,0 @@
from typing import Union
try:
from typing import TypeAlias # type: ignore[attr-defined]
except ImportError:
from typing_extensions import TypeAlias
TypeAlias = TypeAlias  # explicit re-export, so `from .typehelpers import TypeAlias` works either way
try:
from types import UnionType
except ImportError:
UnionType: TypeAlias = Union # type: ignore[no-redef]
try:
from types import NoneType
except ImportError:
NoneType: TypeAlias = type(None) # type: ignore[no-redef]
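
# Hedged usage sketch (not part of this diff): with the fallbacks above,
# Optional-detection works identically on Python 3.9 and 3.10+; `is_optional`
# itself is invented for illustration.
from typing import Optional, get_args, get_origin

def is_optional(t) -> bool:
    # `X | None` has origin types.UnionType on 3.10+, typing.Union before that
    return get_origin(t) in (Union, UnionType) and NoneType in get_args(t)

assert is_optional(Optional[int]) and not is_optional(int)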

View File

@@ -1,159 +0,0 @@
import logging
import os
import pathlib
import subprocess
import sys
from kupferbootstrap.config.state import config
from kupferbootstrap.exec.file import makedir
from .wrapper import Wrapper, WRAPPER_PATHS
VERSION_FILE = "docker_version.txt"
DOCKER_FILE = "Dockerfile"
DOCKER_PATHS = WRAPPER_PATHS.copy()
def docker_volumes_args(volume_mappings: dict[str, str]) -> list[str]:
result = []
for source, destination in volume_mappings.items():
result += ['-v', f'{source}:{destination}:z']
return result
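
# Illustrative only (not part of this diff): the helper expands a mapping into
# repeated `-v source:destination:z` flags for `docker run`.
assert docker_volumes_args({'/home/me/.ssh': '/root/.ssh'}) == ['-v', '/home/me/.ssh:/root/.ssh:z']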
class DockerWrapper(Wrapper):
type: str = 'docker'
def wrap(self):
super().wrap()
script_path = config.runtime.script_source_dir
assert script_path
docker_path = script_path
tried = [docker_path]
if not os.path.exists(os.path.join(docker_path, DOCKER_FILE)):
docker_path = os.path.realpath(os.path.join(script_path, "../.."))
tried.append(docker_path)
if not os.path.exists(os.path.join(docker_path, DOCKER_FILE)):
_par_dir = os.path.dirname(script_path)
# handle venv
if os.path.basename(_par_dir) == "site-packages":
_path = os.path.join(_par_dir, "../../../..")
docker_path = os.path.realpath(_path)
tried.append(f"{_path} => {docker_path}")
logging.debug(f"{DOCKER_FILE!r} not found at {script_path!r}, trying {docker_path!r}")
version_file = os.path.join(script_path, '../..', VERSION_FILE)
if not os.path.exists(version_file):
_vfile = os.path.join(docker_path, VERSION_FILE)
logging.warning(f"{VERSION_FILE} not found at {version_file!r}."
f"\nTrying {_vfile!r}"
"\nDid you use `pip install .` instead of `pip install -e .`?")
if os.path.exists(_vfile):
version_file = _vfile
if os.path.exists(version_file):
with open(version_file) as fd:
version = fd.read().replace('\n', '').strip()
logging.debug(f"Read docker tag {version} from {version_file}")
else:
version = "BUILD"
            logging.error(f"{version_file!r} doesn't exist, defaulting docker tag to {version}!"
                          "\nThis installation is potentially broken!"
                          "\nDid you use `pip install .` instead of `pip install -e .` to install kupferbootstrap?"
                          f"\nTried locations: {[version_file, _vfile]}")
tag = f'registry.gitlab.com/kupfer/kupferbootstrap:{version}'
if version == 'BUILD':
logging.info(f'Building docker image "{tag}"')
cmd = [
'docker',
'build',
'.',
'-t',
tag,
] + (['-q'] if not config.runtime.verbose else [])
_dfile = os.path.join(docker_path, DOCKER_FILE)
if not os.path.exists(_dfile):
_sep = "\n -"
raise Exception(f'{DOCKER_FILE!r} not found. Tried locations:' + (_sep.join(["", *[repr(f"{p}/{DOCKER_FILE}") for p in tried]])))
            logging.debug(f'Running docker cmd (chdir={docker_path!r}): ' + ' '.join(cmd))
mute_docker = not config.runtime.verbose
result = subprocess.run(
cmd,
cwd=docker_path,
capture_output=mute_docker,
)
if result.returncode != 0:
error_msg = ('\n' + result.stderr.decode() + '\n') if mute_docker else ''
logging.fatal(f'Docker error: {error_msg}Failed to build docker image: see errors above: ^^^^')
exit(1)
else:
# Check if the image for the version already exists
result = subprocess.run(
[
'docker',
'images',
'-q',
tag,
],
capture_output=True,
)
if result.stdout == b'':
logging.info(f'Pulling kupferbootstrap docker image version \'{version}\'')
subprocess.run([
'docker',
'pull',
tag,
])
container_name = f'kupferbootstrap-{self.uuid}'
wrapped_config = self.generate_wrapper_config()
target_user = 'root' if config.runtime.uid == 0 else 'kupfer'
target_home = '/root' if target_user == 'root' else f'/home/{target_user}'
ssh_dir = os.path.join(pathlib.Path.home(), '.ssh')
if not os.path.exists(ssh_dir):
os.makedirs(ssh_dir, mode=0o700)
volumes = self.get_bind_mounts_default(wrapped_config, ssh_dir=ssh_dir, target_home=target_home)
for vol_name, vol_dest in DOCKER_PATHS.items():
vol_src = config.get_path(vol_name)
makedir(vol_src)
volumes[vol_src] = vol_dest
docker_cmd = [
'docker',
'run',
'--name',
container_name,
'--rm',
'--interactive',
'--tty',
'--privileged',
] + docker_volumes_args(volumes) + [tag]
kupfer_cmd = [
'kupferbootstrap',
'--config',
volumes[wrapped_config],
]
kupfer_cmd += self.argv_override or self.filter_args_wrapper(sys.argv[1:])
if config.runtime.uid:
kupfer_cmd = ['wrapper_su_helper', '--uid', str(config.runtime.uid), '--username', 'kupfer', '--'] + kupfer_cmd
cmd = docker_cmd + kupfer_cmd
logging.debug('Wrapping in docker:' + repr(cmd))
result = subprocess.run(cmd)
if self.should_exit:
exit(result.returncode)
return result.returncode
def stop(self):
subprocess.run(
[
'docker',
'kill',
self.identifier,
],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
wrapper = DockerWrapper()
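
# Hedged usage sketch (not part of this diff): callers import the module-level
# `wrapper` instance; wrap() re-runs the current kupferbootstrap invocation
# inside the container and, unless should_exit is cleared, exits with the
# container's return code. The __main__ guard is invented for illustration.
if __name__ == '__main__':
    wrapper.wrap()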

Some files were not shown because too many files have changed in this diff.