Compare commits: prawn/dyna...dev

82 commits:

a28550825f, a176fad05a, a4cfc3c3e5, cebac83186, f05de7738a, b006cd8f4d,
4b2150940d, eaac9195ea, c074fbe42c, a75f32b4b1, 4cce7e57ae, 95147cecea,
ff8a529690, 2e504b7b00, a0c4036390, 4c5fe2cb1c, 6bcd132b53, c70b52e5c1,
60b38d895c, 7425356f10, bfce7c466d, 16f351a41c, 8376725652, 5b2f36c74d,
0951865868, e6f4a68c6b, 1374e2be74, 9bd2bd46a9, 8b0ca115a7, fc690eca8a,
eb2b0a6c75, fd4495dd58, 933b7c42ef, c86ce577d1, 46507f8dbe, 0d866c6287,
3c9b96f03f, 407d8893a3, 379e951526, ad80b3e889, efe4bf085d, de76641fa1,
edcad72f7a, 33e1214aef, 4ba5f87f1e, 3ac8fc0689, 6648a77822, 69b7ea9db2,
8a266f9149, 604f123067, 08285a7931, 68154467f3, dbc512ee3f, 7945a4756f,
fd2abd3805, 44eaf0d767, 74a7aeb668, acee95a003, b84d2202db, c357b0a968,
67590fe12b, cd1d0543fe, 6961cb7f36, eb13a7d093, 1a695adff4, d3cc5e9483,
cfd65f9638, 61b1444360, 4115d6ba00, 0353693025, f6fb521c8a, f113faa201,
91d2cd3681, b9969d8feb, 389d44e776, d2d9cb6c7c, ec0e430c00, 954592fc62,
e07306d5c4, dfd191060a, 13aa258794, 572142bf0b

51 changed files with 1200 additions and 465 deletions
.gitignore (vendored)

@@ -1,3 +1,4 @@
 *.kate-swp
+venv/
 __pycache__/
 .coverage*

.gitlab-ci.yml

@@ -7,7 +7,7 @@ format:
   stage: check
   image: python
   before_script:
-    - pip install yapf autoflake
+    - pip install yapf autoflake --break-system-packages
   script:
     - ./format.sh --check

@@ -15,7 +15,7 @@ typecheck:
   stage: check
   image: python
   before_script:
-    - pip install mypy
+    - pip install mypy --break-system-packages
   script:
     - ./typecheck.sh --non-interactive --junit-xml mypy-report.xml
   artifacts:

@@ -27,7 +27,7 @@ pytest:
   image: archlinux
   before_script:
    - pacman -Sy --noconfirm --needed archlinux-keyring && pacman -Su --noconfirm python python-pip sudo git base-devel arch-install-scripts rsync
-    - pip install -r test_requirements.txt -r requirements.txt
+    - pip install -r test_requirements.txt -r requirements.txt --break-system-packages
    - 'echo "kupfer ALL = (ALL) NOPASSWD: ALL" > /etc/sudoers.d/kupfer_all'
    - useradd -m kupfer
    - chmod 777 .

@@ -44,7 +44,9 @@ pytest:
 build_docker:
   stage: build
   image: docker:latest
-  services: ['docker:dind']
+  services:
+    - name: docker:dind
+      command: ["--mtu=1100"] # very low, safe value -.-
   variables:
     DOCKER_TLS_CERTDIR: ""
   script:

@@ -75,7 +77,7 @@ push_docker:
     DOCS_MAKE_TARGET: "html"
     DOCS_MAKE_THREADS: 6
   before_script: &docs_before_script
-    - pip install -r requirements.txt -r docs/requirements.txt
+    - pip install -r requirements.txt -r docs/requirements.txt --break-system-packages
   script: &docs_script
    - make -C docs -j$DOCS_MAKE_THREADS SPHINXARGS="$DOCS_SPHINXARGS" $DOCS_MAKE_TARGET
    - mv "docs/$DOCS_MAKE_TARGET" public
Dockerfile

@@ -21,7 +21,8 @@ ENV PATH=/app/bin:/app/local/bin:$PATH
 WORKDIR /app

 COPY requirements.txt .
-RUN pip install -r requirements.txt
+# TODO: pip packaging so we don't need --break-system-packages
+RUN pip install -r requirements.txt --break-system-packages

 COPY . .

binfmt/__init__.py (new, empty file)
binfmt/binfmt.py

@@ -7,7 +7,7 @@ from typing import Optional

 from chroot.abstract import Chroot
 from constants import Arch, QEMU_ARCHES
-from exec.cmd import run_root_cmd
+from exec.cmd import run_root_cmd, CompletedProcess
 from utils import mount


@@ -66,25 +66,23 @@ def binfmt_ensure_mounted(chroot: Optional[Chroot] = None):
     binfmt_path = '/proc/sys/fs/binfmt_misc'
     register_path = binfmt_path + '/register'
     if chroot:
         binfmt_path = chroot.get_path(binfmt_path)
         register_path = chroot.get_path(register_path)
         chroot.activate()
     if not os.path.exists(register_path):
         logging.info('mounting binfmt_misc')
-        result = mount('binfmt_misc', binfmt_path, options=[], fs_type='binfmt_misc')
-        if result.returncode != 0:
+        result = (chroot.mount if chroot else mount)('binfmt_misc', binfmt_path, options=[], fs_type='binfmt_misc')  # type: ignore[operator]
+        if (isinstance(result, CompletedProcess) and result.returncode != 0) or not result:
             raise Exception(f'Failed mounting binfmt_misc to {binfmt_path}')


-def register(arch: Arch, chroot: Optional[Chroot] = None):
+def binfmt_register(arch: Arch, chroot: Optional[Chroot] = None):
     binfmt_path = '/proc/sys/fs/binfmt_misc'
     register_path = binfmt_path + '/register'
     is_arch_known(arch, True, 'register')
     qemu_arch = QEMU_ARCHES[arch]
-    if binfmt_is_registered(arch):
+    if binfmt_is_registered(arch, chroot=chroot):
         return

-    lines = binfmt_info()
+    lines = binfmt_info(chroot=chroot)

     _runcmd = run_root_cmd
     if chroot:

@@ -99,15 +97,19 @@ def register(arch: Arch, chroot: Optional[Chroot] = None):
     info = lines[qemu_arch]
     code = info['line']

+    if arch == os.uname().machine:
+        logging.fatal("Attempted to register qemu binfmt for host architecture, skipping!")
+        return
+
     # Register in binfmt_misc
     logging.info(f"Registering qemu binfmt ({arch})")
     _runcmd(f'echo "{code}" > "{register_path}" 2>/dev/null')  # use path without chroot path prefix
-    if not binfmt_is_registered(arch):
+    if not binfmt_is_registered(arch, chroot=chroot):
         logging.debug(f'binfmt line: {code}')
         raise Exception(f'Failed to register qemu-user for {arch} with binfmt_misc, {binfmt_path}/{info["name"]} not found')


-def unregister(arch, chroot: Optional[Chroot] = None):
+def binfmt_unregister(arch, chroot: Optional[Chroot] = None):
     is_arch_known(arch, True, 'unregister')
     qemu_arch = QEMU_ARCHES[arch]
     binfmt_ensure_mounted(chroot)

@@ -115,6 +117,9 @@ def unregister(arch, chroot: Optional[Chroot] = None):
     if chroot:
         binfmt_file = chroot.get_path(binfmt_file)
     if not os.path.exists(binfmt_file):
         logging.debug(f"qemu binfmt for {arch} not registered")
         return
     logging.info(f"Unregistering qemu binfmt ({arch})")
     run_root_cmd(f"echo -1 > {binfmt_file}")
+    if binfmt_is_registered(arch, chroot=chroot):
+        raise Exception(f'Failed to UNregister qemu-user for {arch} with binfmt_misc, {chroot=}')
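For orientation, a minimal sketch of how the renamed entry points fit together after this change. The 'aarch64' argument is only an example value, and the calls assume a host where qemu-user binaries are installed and root commands are permitted:

    from binfmt.binfmt import binfmt_register, binfmt_unregister, binfmt_is_registered

    binfmt_register('aarch64')              # mounts binfmt_misc if needed, then registers qemu
    assert binfmt_is_registered('aarch64')  # verified against /proc/sys/fs/binfmt_misc
    binfmt_unregister('aarch64')            # now raises if the handler survives removal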
binfmt/cli.py (new file, 44 lines)
@@ -0,0 +1,44 @@
+import click
+import os
+
+from typing import Optional
+
+from constants import Arch, ARCHES
+
+from .binfmt import binfmt_unregister, binfmt_is_registered
+
+cmd_binfmt = click.Group('binfmt', help='Manage qemu binfmt for executing foreign architecture binaries')
+arches_arg = click.argument('arches', type=click.Choice(ARCHES), nargs=-1, required=True)
+arches_arg_optional = click.argument('arches', type=click.Choice(ARCHES), nargs=-1, required=False)
+
+
+@cmd_binfmt.command('register', help='Register a binfmt handler with the kernel')
+@arches_arg
+def cmd_register(arches: list[Arch], disable_chroot: bool = False):
+    from packages.build import build_enable_qemu_binfmt
+    for arch in arches:
+        build_enable_qemu_binfmt(arch)
+
+
+@cmd_binfmt.command('unregister', help='Unregister a binfmt handler from the kernel')
+@arches_arg_optional
+def cmd_unregister(arches: Optional[list[Arch]]):
+    for arch in arches or ARCHES:
+        binfmt_unregister(arch)
+
+
+@cmd_binfmt.command('status', help='Get the status of a binfmt handler from the kernel')
+@arches_arg_optional
+def cmd_status(arches: Optional[list[Arch]]):
+    for arch in arches or ARCHES:
+        native = arch == os.uname().machine
+        active = binfmt_is_registered(arch)
+        if native and not active:
+            # boooring
+            continue
+        verb = click.style(
+            "is" if active else "is NOT",
+            fg='green' if (active ^ native) else 'red',
+            bold=True,
+        )
+        click.echo(f'Binfmt for {arch} {verb} set up! {"(host architecture!)" if native else ""}')
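A usage sketch, assuming the new group is attached to kupferbootstrap's top-level click CLI; the `cli` group shown here is hypothetical glue for illustration, not part of the diff:

    import click

    from binfmt.cli import cmd_binfmt

    @click.group()
    def cli():
        """Hypothetical top-level entry point, for illustration only."""

    cli.add_command(cmd_binfmt)  # exposes `binfmt register|unregister|status`

    if __name__ == '__main__':
        cli()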
chroot/build.py (class BuildChroot)

@@ -82,6 +82,7 @@ class BuildChroot(Chroot):
         native_chroot.mount_pacman_cache()
         native_chroot.mount_packages()
+        native_chroot.activate()
         logging.debug(f"Installing {CROSSDIRECT_PKGS=} + {gcc=}")
         results = dict(native_chroot.try_install_packages(
             CROSSDIRECT_PKGS + [gcc],
             refresh=True,

@@ -103,8 +104,8 @@ class BuildChroot(Chroot):
         target_include_dir = os.path.join(self.path, 'include')

         for target, source in {cc_path: gcc, target_lib_dir: 'lib', target_include_dir: 'usr/include'}.items():
-            if not os.path.exists(target):
-                logging.debug(f'Symlinking {source} at {target}')
+            if not (os.path.exists(target) or os.path.islink(target)):
+                logging.debug(f'Symlinking {source=} at {target=}')
                 symlink(source, target)
         ld_so = os.path.basename(glob(f"{os.path.join(native_chroot.path, 'usr', 'lib', 'ld-linux-')}*")[0])
         ld_so_target = os.path.join(target_lib_dir, ld_so)
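The switch from a bare os.path.exists() to an additional os.path.islink() check closes a gap with dangling symlinks. A small self-contained demonstration (the temp paths and 'gcc' target are illustrative):

    import os
    import tempfile

    d = tempfile.mkdtemp()
    link = os.path.join(d, 'cc')
    os.symlink('/nonexistent/gcc', link)  # dangling symlink: target does not exist

    print(os.path.exists(link))  # False, exists() follows the link to its missing target
    print(os.path.islink(link))  # True, so the new check refuses to symlink over it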
config/cli.py
@@ -1,15 +1,16 @@
 import click
 import logging
 import os

 from copy import deepcopy
-from typing import Any, Iterable, Optional, Union
+from typing import Any, Callable, Iterable, Mapping, Optional, Union

-from devices.device import get_devices
+from devices.device import get_devices, sanitize_device_name
 from flavours.flavour import get_flavours
+from utils import color_bold, colors_supported, color_mark_selected
 from wrapper import execute_without_exit

 from .scheme import Profile
-from .profile import PROFILE_EMPTY, PROFILE_DEFAULTS
+from .profile import PROFILE_EMPTY, PROFILE_DEFAULTS, resolve_profile_attr, SparseProfile
 from .state import config, CONFIG_DEFAULTS, CONFIG_SECTIONS, merge_configs

@@ -87,6 +88,7 @@ def prompt_profile(
         raise Exception("profile name 'current' not allowed")
+    # don't use get_profile() here because we need the sparse profile
     if name in config.file.profiles:
         logging.debug(f"Merging with existing profile config for {name}")
         profile |= config.file.profiles[name]
     elif create:
         logging.info(f"Profile {name} doesn't exist yet, creating new profile.")

@@ -94,27 +96,26 @@ def prompt_profile(
         raise Exception(f'Unknown profile "{name}"')
     logging.info(f'Configuring profile "{name}"')
     changed = False
     if not (no_parse or os.path.exists(os.path.join(config.get_path('pkgbuilds'), 'device'))):
         logging.warning("PKGBUILDS NOT INITIALISED:\n"
                         "Usually we'd present you with detailed lists of choices for devices and flavours in this dialogue,\n"
                         "but your pkgbuilds.git seem to not have been cloned yet.\n\n"
                         "You can:\n1. complete the dialogue with default values for now\n"
                         "2. run `kupferbootstrap packages update` afterwards\n"
                         f"3. then get back to this dialogue by running `kupferbootstrap config profile init {name}`\n\n"
                         "You can also use `kupferbootstrap packages flavours` and `kupferbootstrap packages devices` to list them.")
         no_parse = True
     for key, current in profile.items():
-        current = profile[key]
-        text = f'{name}.{key}'
+        text = f'profiles.{name}.{key}'
         if not no_parse and key in PARSEABLE_FIELDS:
+            parse_prompt = None
+            sanitize_func = None
             if key == 'device':
                 parse_prompt = prompt_profile_device
+                sanitize_func = sanitize_device_name
             elif key == 'flavour':
                 parse_prompt = prompt_profile_flavour
             else:
                 raise Exception(f'config: Unhandled parseable field {key}, this is a bug in kupferbootstrap.')
-            result, _changed = parse_prompt(current, name)  # type: ignore
+            result, _changed = parse_prompt(
+                current=current,
+                profile_name=name,
+                sparse_profiles=config.file.profiles,
+                use_colors=config.runtime.colors,
+                sanitize_func=sanitize_func,
+            )  # type: ignore
         else:
             result, _changed = prompt_config(text=text, default=current, field_type=type(PROFILE_DEFAULTS[key]))  # type: ignore
         if _changed:

@@ -128,23 +129,51 @@ def prompt_choice(current: Optional[Any], key: str, choices: Iterable[Any], allo
     res, _ = prompt_config(text=key, default=current, field_type=click.Choice(choices), show_choices=show_choices)
     if allow_none and res == '':
         res = None
-    return res, res == current
+    return res, res != current


-def prompt_profile_device(current: Optional[str], profile_name: str) -> tuple[str, bool]:
-    devices = get_devices()
-    print(click.style("Pick your device!\nThese are the available devices:", bold=True))
-    for dev in sorted(devices.keys()):
-        print(f"{devices[dev]}\n")
-    return prompt_choice(current, f'profiles.{profile_name}.device', devices.keys())
+def resolve_profile_field(current: Any, *kargs):
+    try:
+        return resolve_profile_attr(*kargs)
+    except KeyError as err:
+        logging.debug(err)
+        return current, None


-def prompt_profile_flavour(current: Optional[str], profile_name: str) -> tuple[str, bool]:
-    flavours = get_flavours()
-    print(click.style("Pick your flavour!\nThese are the available flavours:", bold=True))
-    for f in sorted(flavours.keys()):
-        print(flavours[f])
-    return prompt_choice(current, f'profiles.{profile_name}.flavour', flavours.keys())
+def prompt_wrappable(
+    attr_name: str,
+    native_cmd: Callable,
+    cli_cmd: list[str],
+    current: Optional[str],
+    profile_name: str,
+    sparse_profiles: Mapping[str, SparseProfile],
+    sanitize_func: Optional[Callable[[str], str]] = None,
+    use_colors: Optional[bool] = None,
+) -> tuple[str, bool]:
+    use_colors = colors_supported(use_colors)
+
+    print(color_bold(f"Pick your {attr_name}!\nThese are the available choices:", use_colors=use_colors))
+    items = execute_without_exit(native_cmd, cli_cmd)
+    if items is None:
+        logging.warning("(wrapper mode, input for this field will not be checked for correctness)")
+        return prompt_config(text=f'profiles.{profile_name}.{attr_name}', default=current)
+    selected, inherited_from = resolve_profile_field(current, profile_name, attr_name, sparse_profiles)
+    if selected and sanitize_func:
+        selected = sanitize_func(selected)
+    for key in sorted(items.keys()):
+        text = items[key].nice_str(newlines=True, colors=use_colors)
+        if key == selected:
+            text = color_mark_selected(text, profile_name, inherited_from)
+        print(text + '\n')
+    return prompt_choice(current, f'profiles.{profile_name}.{attr_name}', items.keys())
+
+
+def prompt_profile_device(*kargs, **kwargs) -> tuple[str, bool]:
+    return prompt_wrappable('device', get_devices, ['devices'], *kargs, **kwargs)
+
+
+def prompt_profile_flavour(*kargs, **kwargs) -> tuple[str, bool]:
+    return prompt_wrappable('flavour', get_flavours, ['flavours'], *kargs, **kwargs)


 def config_dot_name_get(name: str, config: dict[str, Any], prefix: str = '') -> Any:

@@ -176,7 +205,12 @@ def prompt_for_save(retry_ctx: Optional[click.Context] = None):
     If `retry_ctx` is passed, the context's command will be reexecuted with the same arguments if the user chooses to retry.
     False will still be returned as the retry is expected to either save, perform another retry or arbort.
     """
+    from wrapper import is_wrapped
     if click.confirm(f'Do you want to save your changes to {config.runtime.config_file}?', default=True):
+        if is_wrapped():
+            logging.warning("Writing to config file inside wrapper."
+                            "This is pointless and probably a bug."
+                            "Your host config file will not be modified.")
         return True
     if retry_ctx:
         if click.confirm('Retry? ("n" to quit without saving)', default=True):

@@ -201,6 +235,8 @@ noninteractive_flag = click.option('-N', '--non-interactive', is_flag=True)
 noop_flag = click.option('--noop', '-n', help="Don't write changes to file", is_flag=True)
+noparse_flag = click.option('--no-parse', help="Don't search PKGBUILDs for devices and flavours", is_flag=True)

+CONFIG_MSG = ("Leave fields empty to leave them at their currently displayed value.")


 @cmd_config.command(name='init')
 @noninteractive_flag

@@ -224,6 +260,7 @@ def cmd_config_init(
 ):
     """Initialize the config file"""
     if not non_interactive:
+        logging.info(CONFIG_MSG)
     results: dict[str, dict] = {}
     for section in sections:
         if section not in CONFIG_SECTIONS:

@@ -239,7 +276,14 @@ def cmd_config_init(
             results[section][key] = result

     config.update(results)
     print("Main configuration complete")
+    if not noop:
+        if prompt_for_save(ctx):
+            config.write()
+        else:
+            return
+    if 'profiles' in sections:
+        print("Configuring profiles")
         current_profile = 'default' if 'current' not in config.file.profiles else config.file.profiles.current
         new_current, _ = prompt_config('profiles.current', default=current_profile, field_type=str)
         profile, changed = prompt_profile(new_current, create=True, no_parse=no_parse)

@@ -266,6 +310,7 @@ def cmd_config_set(ctx, key_vals: list[str], non_interactive: bool = False, noop
     like `build.clean_mode=false` or alternatively just keys to get prompted if run interactively.
     """
     config.enforce_config_loaded()
+    logging.info(CONFIG_MSG)
     config_copy = deepcopy(config.file)
     for pair in key_vals:
         split_pair = pair.split('=')

@@ -323,6 +368,7 @@ def cmd_profile_init(ctx, name: Optional[str] = None, non_interactive: bool = Fa
     profile = deepcopy(PROFILE_EMPTY)
     if name == 'current':
         raise Exception("profile name 'current' not allowed")
+    logging.info(CONFIG_MSG)
     name = name or config.file.profiles.current
     if name in config.file.profiles:
         profile |= config.file.profiles[name]

@@ -333,7 +379,9 @@ def cmd_profile_init(ctx, name: Optional[str] = None, non_interactive: bool = Fa
     config.update_profile(name, profile)
     if not noop:
         if not prompt_for_save(ctx):
+            logging.info("Not saving.")
             return
+
         config.write()
     else:
         logging.info(f'--noop passed, not writing to {config.runtime.config_file}!')
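One easily missed fix above: the second value returned by prompt_choice() is a "changed" flag, and the old code returned `res == current`, i.e. True exactly when nothing changed. A minimal illustration of the corrected semantics (the device names are made-up example values):

    def changed_flag(res, current):
        # mirrors the fixed `return res, res != current`
        return res != current

    assert changed_flag('sdm845-oneplus-enchilada', 'sdm845-oneplus-enchilada') is False  # kept value
    assert changed_flag('sdm845-xiaomi-beryllium', 'sdm845-oneplus-enchilada') is True    # new value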
config/profile.py

@@ -21,6 +21,10 @@ PROFILE_DEFAULTS = Profile.fromDict(PROFILE_DEFAULTS_DICT)
 PROFILE_EMPTY: Profile = {key: None for key in PROFILE_DEFAULTS.keys()}  # type: ignore


+class ProfileNotFoundException(Exception):
+    pass
+
+
 def resolve_profile(
     name: str,
     sparse_profiles: dict[str, SparseProfile],

@@ -85,3 +89,40 @@ def resolve_profile(

     resolved[name] = Profile.fromDict(full)
     return resolved
+
+
+def resolve_profile_attr(
+    profile_name: str,
+    attr_name: str,
+    profiles_sparse: dict[str, SparseProfile],
+) -> tuple[str, str]:
+    """
+    This function tries to resolve a profile attribute recursively,
+    and throws KeyError if the key is not found anywhere in the hierarchy.
+    Throws a ProfileNotFoundException if the profile is not in profiles_sparse
+    """
+    if profile_name not in profiles_sparse:
+        raise ProfileNotFoundException(f"Unknown profile {profile_name}")
+    profile: Profile = profiles_sparse[profile_name]
+    if attr_name in profile:
+        return profile[attr_name], profile_name
+
+    if 'parent' not in profile:
+        raise KeyError(f'Profile attribute {attr_name} not found in {profile_name} and no parents')
+    parent = profile
+    parent_name = profile_name
+    seen = []
+    while True:
+        if attr_name in parent:
+            return parent[attr_name], parent_name
+
+        seen.append(parent_name)
+
+        if not parent.get('parent', None):
+            raise KeyError(f'Profile attribute {attr_name} not found in inheritance chain, '
+                           f'we went down to {parent_name}.')
+        parent_name = parent['parent']
+        if parent_name in seen:
+            raise RecursionError(f"Profile recursion loop: profile {profile_name} couldn't be resolved"
+                                 f"because of a dependency loop:\n{' -> '.join([*seen, parent_name])}")
+        parent = profiles_sparse[parent_name]
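To make the resolution order concrete, a small sketch of resolve_profile_attr() walking the parent chain. Plain dicts stand in for SparseProfile objects here, and the profile names are invented:

    from config.profile import resolve_profile_attr

    profiles = {
        'default': {'device': 'sdm845-oneplus-enchilada', 'flavour': 'phosh'},
        'work': {'parent': 'default', 'flavour': 'barebone'},
    }

    print(resolve_profile_attr('work', 'flavour', profiles))  # ('barebone', 'work')
    print(resolve_profile_attr('work', 'device', profiles))   # ('sdm845-oneplus-enchilada', 'default')

The second tuple element names the profile that actually supplied the value, which is what the reworked config prompts use to mark inherited settings.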
config/scheme.py

@@ -3,12 +3,11 @@ from __future__ import annotations
 from munch import Munch
 from typing import Any, Optional, Mapping, Union

-from dataclass import DataClass, munchclass
+from dictscheme import DictScheme
 from constants import Arch


-@munchclass()
-class SparseProfile(DataClass):
+class SparseProfile(DictScheme):
     parent: Optional[str]
     device: Optional[str]
     flavour: Optional[str]

@@ -23,7 +22,6 @@ class SparseProfile(DataClass):
         return f'{type(self)}{dict.__repr__(self.toDict())}'


-@munchclass()
 class Profile(SparseProfile):
     parent: Optional[str]
     device: str

@@ -36,13 +34,11 @@ class Profile(SparseProfile):
     size_extra_mb: Union[str, int]


-@munchclass()
-class WrapperSection(DataClass):
+class WrapperSection(DictScheme):
     type: str  # NOTE: rename to 'wrapper_type' if this causes problems


-@munchclass()
-class BuildSection(DataClass):
+class BuildSection(DictScheme):
     ccache: bool
     clean_mode: bool
     crosscompile: bool

@@ -50,21 +46,18 @@ class BuildSection(DataClass):
     threads: int


-@munchclass()
-class PkgbuildsSection(DataClass):
+class PkgbuildsSection(DictScheme):
     git_repo: str
     git_branch: str


-@munchclass()
-class PacmanSection(DataClass):
+class PacmanSection(DictScheme):
     parallel_downloads: int
     check_space: bool
     repo_branch: str


-@munchclass()
-class PathsSection(DataClass):
+class PathsSection(DictScheme):
     cache_dir: str
     chroots: str
     pacman: str

@@ -76,7 +69,7 @@ class PathsSection(DataClass):
     rust: str


-class ProfilesSection(DataClass):
+class ProfilesSection(DictScheme):
     current: str
     default: SparseProfile


@@ -101,8 +94,7 @@ class ProfilesSection(DataClass):
         return f'{type(self)}{dict.__repr__(self.toDict())}'


-@munchclass()
-class Config(DataClass):
+class Config(DictScheme):
     wrapper: WrapperSection
     build: BuildSection
     pkgbuilds: PkgbuildsSection

@@ -138,8 +130,7 @@ class Config(DataClass):
         return Config(_vals, validate=validate)


-@munchclass()
-class RuntimeConfiguration(DataClass):
+class RuntimeConfiguration(DictScheme):
     verbose: bool
     no_wrap: bool
     error_shell: bool

@@ -151,7 +142,7 @@ class RuntimeConfiguration(DataClass):
     colors: Optional[bool]


-class ConfigLoadState(DataClass):
+class ConfigLoadState(DictScheme):
     load_finished: bool
     exception: Optional[Exception]

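Since every section class now derives from DictScheme instead of the munchclass-decorated DataClass, here is a minimal sketch of the resulting usage; the ExampleSection class is hypothetical and only illustrates the pattern the config sections above follow:

    from typing import Optional

    from dictscheme import DictScheme

    class ExampleSection(DictScheme):  # hypothetical section, for illustration only
        name: str
        retries: Optional[int]

    section = ExampleSection({'name': 'demo', 'retries': None})
    print(section.toYaml())  # keys stay in declaration order: sort_keys defaults to False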
config/state.py

@@ -7,7 +7,7 @@ from typing import Mapping, Optional

 from constants import DEFAULT_PACKAGE_BRANCH

-from .scheme import Config, ConfigLoadState, DataClass, Profile, RuntimeConfiguration
+from .scheme import Config, ConfigLoadState, DictScheme, Profile, RuntimeConfiguration
 from .profile import PROFILE_DEFAULTS, PROFILE_DEFAULTS_DICT, resolve_profile

 CONFIG_DIR = appdirs.user_config_dir('kupfer')

@@ -95,7 +95,7 @@ def merge_configs(conf_new: Mapping[str, dict], conf_base={}, warn_missing_defau
             continue
         logging.debug(f'Parsing config section "{outer_name}"')
         # check if outer_conf is a dict
-        if not (isinstance(outer_conf, (dict, DataClass))):
+        if not (isinstance(outer_conf, (dict, DictScheme))):
             parsed[outer_name] = outer_conf
         else:
             # init section

@@ -257,7 +257,7 @@ class ConfigStateHolder:
         profile = self.get_profile(profile_name)
         if field not in profile or not profile[field]:
             m = (f'Profile "{profile_name}" has no {field.upper()} configured.\n'
-                 f'Please run `kupferbootstrap config profile init {field}`{arch_hint}')
+                 f'Please run `kupferbootstrap config profile init {profile_name}`{arch_hint}')
             raise Exception(m)
         return profile

config/test_config.py

@@ -157,7 +157,7 @@ def test_config_save_modified(configstate_emptyfile: ConfigStateHolder):
 def get_config_scheme(data: dict[str, Any], validate=True, allow_incomplete=False) -> Config:
     """
     helper func to ignore a false type error.
-    for some reason, mypy argues about DataClass.fromDict() instead of Config.fromDict() here
+    for some reason, mypy argues about DictScheme.fromDict() instead of Config.fromDict() here
     """
     return Config.fromDict(data, validate=validate, allow_incomplete=allow_incomplete)  # type: ignore[call-arg]

constants.py

@@ -1,9 +1,9 @@
-from typing_extensions import TypeAlias
+from typehelpers import TypeAlias

 FASTBOOT = 'fastboot'
 FLASH_PARTS = {
     'ROOTFS': 'rootfs',
-    'ABOOT': 'aboot',
-    'FULL': 'full',
+    'ABOOT': 'abootimg',
     'LK2ND': 'lk2nd',
     'QHYPSTUB': 'qhypstub',
 }

@@ -24,7 +24,10 @@ BASE_PACKAGES: list[str] = BASE_LOCAL_PACKAGES + [
     'vim',
 ]

-POST_CMDS = ['kupfer-config apply']
+POST_INSTALL_CMDS = [
+    'kupfer-config apply',
+    'kupfer-config --user apply',
+]

 REPOS_CONFIG_FILE = "repos.yml"

@@ -86,7 +89,7 @@ COMPILE_ARCHES: dict[Arch, str] = {
 GCC_HOSTSPECS: dict[DistroArch, dict[TargetArch, str]] = {
     'x86_64': {
         'x86_64': 'x86_64-pc-linux-gnu',
-        'aarch64': 'aarch64-linux-gnu',
+        'aarch64': 'aarch64-unknown-linux-gnu',
         'armv7h': 'arm-unknown-linux-gnueabihf'
     },
     'aarch64': {
devices/cli.py

@@ -5,9 +5,10 @@ from json import dumps as json_dump
 from typing import Optional

 from config.state import config
-from utils import colors_supported, color_str
+from config.cli import resolve_profile_field
+from utils import color_mark_selected, colors_supported

-from .device import get_devices, get_profile_device
+from .device import get_devices, get_device


 @click.command(name='devices')

@@ -36,12 +37,14 @@ def cmd_devices(
     if not devices:
         raise Exception("No devices found!")
     profile_device = None
+    profile_name = config.file.profiles.current
+    selected, inherited_from = None, None
     try:
-        dev = get_profile_device()
-        assert dev
-        profile_device = dev
+        selected, inherited_from = resolve_profile_field(None, profile_name, 'device', config.file.profiles)
+        if selected:
+            profile_device = get_device(selected)
     except Exception as ex:
-        logging.debug(f"Failed to get profile device for visual highlighting, not a problem: {ex}")
+        logging.debug(f"Failed to get profile device for marking as currently selected, continuing anyway. Exception: {ex}")
     output = ['']
     json_output = {}
     interactive_json = json and not output_file

@@ -49,8 +52,6 @@ def cmd_devices(
         json = True
     use_colors = colors_supported(False if interactive_json else config.runtime.colors)
     for name in sorted(devices.keys()):
-        prefix = ''
-        suffix = ''
         device = devices[name]
         assert device
         if force_parse_deviceinfo in [None, True]:

@@ -66,14 +67,9 @@ def cmd_devices(
             json_output[name] = device.get_summary().toDict()
             if interactive_json:
                 continue
+        snippet = device.nice_str(colors=use_colors, newlines=True)
         if profile_device and profile_device.name == device.name:
-            prefix = color_str('>>> ', bold=True, fg="bright_green", use_colors=use_colors)
-            suffix = '\n\n'
-            suffix += color_str('Currently selected by profile', bold=True, use_colors=use_colors) + " "
-            suffix += color_str(f'"{config.file.profiles.current}"', bold=True, fg="bright_green", use_colors=use_colors)
-        snippet = f'{device.nice_str(colors=use_colors, newlines=True)}{suffix}'
-        # prefix each line in the snippet
-        snippet = '\n'.join([f'{prefix}{line}' for line in snippet.split('\n')])
+            snippet = color_mark_selected(snippet, profile_name or '[unknown]', inherited_from)
         output.append(f"{snippet}\n")
     if interactive_json:
         output = ['\n' + json_dump(json_output, indent=4)]
devices/device.py

@@ -5,13 +5,13 @@ from typing import Optional

 from config.state import config
 from constants import Arch, ARCHES
-from config.scheme import DataClass, munchclass
+from dictscheme import DictScheme
 from distro.distro import get_kupfer_local
 from distro.package import LocalPackage
 from packages.pkgbuild import Pkgbuild, _pkgbuilds_cache, discover_pkgbuilds, get_pkgbuild_by_path, init_pkgbuilds
 from utils import read_files_from_tar, color_str

-from .deviceinfo import DeviceInfo, parse_deviceinfo
+from .deviceinfo import DEFAULT_IMAGE_SECTOR_SIZE, DeviceInfo, parse_deviceinfo

 DEVICE_DEPRECATIONS = {
     "oneplus-enchilada": "sdm845-oneplus-enchilada",

@@ -22,7 +22,7 @@ DEVICE_DEPRECATIONS = {
 }


-class DeviceSummary(DataClass):
+class DeviceSummary(DictScheme):
     name: str
     description: str
     arch: str

@@ -43,8 +43,7 @@ class DeviceSummary(DataClass):
         return separator.join([f"{color_str(name, bold=True, use_colors=colors)}: {value}" for name, value in fields.items()])


-@munchclass()
-class Device(DataClass):
+class Device(DictScheme):
     name: str
     arch: Arch
     package: Pkgbuild

@@ -70,7 +69,7 @@ class Device(DataClass):
         result["package_path"] = self.package.path if self.package else None
         return DeviceSummary(result)

-    def parse_deviceinfo(self, try_download: bool = True, lazy: bool = True):
+    def parse_deviceinfo(self, try_download: bool = True, lazy: bool = True) -> DeviceInfo:
         if not lazy or 'deviceinfo' not in self or self.deviceinfo is None:
             # avoid import loop
             from packages.build import check_package_version_built

@@ -97,8 +96,16 @@ class Device(DataClass):
             assert info.arch
             assert info.arch == self.arch
             self['deviceinfo'] = info
+        assert self.deviceinfo
         return self.deviceinfo

+    def get_image_sectorsize(self, **kwargs) -> Optional[int]:
+        """Gets the deviceinfo_rootfs_image_sector_size if defined, otherwise None"""
+        return self.parse_deviceinfo(**kwargs).get('rootfs_image_sector_size', None)
+
+    def get_image_sectorsize_default(self, **kwargs) -> int:
+        return self.get_image_sectorsize(**kwargs) or DEFAULT_IMAGE_SECTOR_SIZE
+

 def check_devicepkg_name(name: str, log_level: Optional[int] = None):
     valid = True

@@ -128,6 +135,20 @@ def parse_device_pkg(pkgbuild: Pkgbuild) -> Device:
     return Device(name=name, arch=arch, package=pkgbuild, deviceinfo=None)


+def sanitize_device_name(name: str, warn: bool = True) -> str:
+    if name not in DEVICE_DEPRECATIONS:
+        return name
+    warning = f"Deprecated device {name}"
+    replacement = DEVICE_DEPRECATIONS[name]
+    if replacement:
+        warning += (f': Device has been renamed to {replacement}! Please adjust your profile config!\n'
+                    'This will become an error in a future version!')
+        name = replacement
+    if warn:
+        logging.warning(warning)
+    return name
+
+
 _device_cache: dict[str, Device] = {}
 _device_cache_populated: bool = False


@@ -152,14 +173,7 @@ def get_devices(pkgbuilds: Optional[dict[str, Pkgbuild]] = None, lazy: bool = Tr
 def get_device(name: str, pkgbuilds: Optional[dict[str, Pkgbuild]] = None, lazy: bool = True, scan_all=False) -> Device:
     global _device_cache, _device_cache_populated
     assert lazy or pkgbuilds
-    if name in DEVICE_DEPRECATIONS:
-        warning = f"Deprecated device {name}"
-        replacement = DEVICE_DEPRECATIONS[name]
-        if replacement:
-            warning += (f': Device has been renamed to {replacement}! Please adjust your profile config!\n'
-                        'This will become an error in a future version!')
-            name = replacement
-        logging.warning(warning)
+    name = sanitize_device_name(name)
     if lazy and name in _device_cache:
         return _device_cache[name]
     if scan_all:
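The deprecation handling is now reusable outside get_device(). A short example using a mapping that appears in DEVICE_DEPRECATIONS above:

    from devices.device import sanitize_device_name

    print(sanitize_device_name('oneplus-enchilada'))
    # logs a deprecation warning pointing at the new name,
    # then returns 'sdm845-oneplus-enchilada'

    print(sanitize_device_name('sdm845-oneplus-enchilada'))  # not deprecated: returned as-is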
devices/deviceinfo.py

@@ -5,18 +5,20 @@ import copy
 import logging
 import os

-from typing import Any, Mapping, Optional
+from typing import Mapping, Optional

 from config.state import config
 from constants import Arch
-from dataclass import DataClass
+from dictscheme import DictScheme

 PMOS_ARCHES_OVERRIDES: dict[str, Arch] = {
     "armv7": 'armv7h',
 }

+DEFAULT_IMAGE_SECTOR_SIZE = 512

-class DeviceInfo(DataClass):
+
+class DeviceInfo(DictScheme):
     arch: Arch
     name: str
     manufacturer: str

@@ -24,10 +26,12 @@ class DeviceInfo(DataClass):
     chassis: str
     flash_pagesize: int
     flash_method: str
+    rootfs_image_sector_size: Optional[int]

     @classmethod
-    def transform(cls, values: Mapping[str, str], validate: bool = True, allow_extra: bool = True, type_hints: Optional[dict[str, Any]] = None):
-        return super().transform(values, validate=validate, allow_extra=allow_extra)
+    def transform(cls, values: Mapping[str, Optional[str]], **kwargs):
+        kwargs = {'allow_extra': True} | kwargs
+        return super().transform(values, **kwargs)


 # Variables from deviceinfo. Reference: <https://postmarketos.org/deviceinfo>

@@ -115,7 +119,7 @@ deviceinfo_chassis_types = [
 ]


-def sanity_check(deviceinfo: dict[str, str], device_name: str):
+def sanity_check(deviceinfo: dict[str, Optional[str]], device_name: str):
     try:
         _pmos_sanity_check(deviceinfo, device_name)
     except RuntimeError as err:

@@ -129,7 +133,7 @@ def sanity_check(deviceinfo: dict[str, str], device_name: str):
             f"{err}")


-def _pmos_sanity_check(info: dict[str, str], device_name: str):
+def _pmos_sanity_check(info: dict[str, Optional[str]], device_name: str):
     # Resolve path for more readable error messages
     path = os.path.join(config.get_path('pkgbuilds'), 'device', device_name, 'deviceinfo')


@@ -194,7 +198,7 @@ def _pmos_sanity_check(info: dict[str, str], device_name: str):
         f" and try again: {path}")


-def parse_kernel_suffix(deviceinfo: dict[str, str], kernel: str = 'mainline') -> dict[str, str]:
+def parse_kernel_suffix(deviceinfo: dict[str, Optional[str]], kernel: str = 'mainline') -> dict[str, Optional[str]]:
     """
     Remove the kernel suffix (as selected in 'pmbootstrap init') from
     deviceinfo variables. Related:

@@ -240,7 +244,7 @@ def parse_deviceinfo(deviceinfo_lines: list[str], device_name: str, kernel='main
     :param device: defaults to args.device
     :param kernel: defaults to args.kernel
     """
-    info = {}
+    info: dict[str, Optional[str]] = {}
     for line in deviceinfo_lines:
         line = line.strip()
         if line.startswith("#") or not line:

@@ -258,12 +262,12 @@ def parse_deviceinfo(deviceinfo_lines: list[str], device_name: str, kernel='main
     # Assign empty string as default
     for key in deviceinfo_attributes:
         if key not in info:
-            info[key] = ""
+            info[key] = None

     info = parse_kernel_suffix(info, kernel)
     sanity_check(info, device_name)
     if 'arch' in info:
         arch = info['arch']
-        info['arch'] = PMOS_ARCHES_OVERRIDES.get(arch, arch)
+        info['arch'] = PMOS_ARCHES_OVERRIDES.get(arch, arch)  # type: ignore[arg-type]
     dev = DeviceInfo.fromDict(info)
     return dev
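How the new sector-size plumbing is meant to be consumed, as a sketch under the assumption that the device package and its deviceinfo are available locally; the device name is an example:

    from devices.device import get_device

    device = get_device('sdm845-oneplus-enchilada')
    device.get_image_sectorsize()          # Optional[int]: None unless the deviceinfo
                                           # defines rootfs_image_sector_size
    device.get_image_sectorsize_default()  # falls back to DEFAULT_IMAGE_SECTOR_SIZE (512)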
dictscheme.py (renamed from dataclass.py)

@@ -1,16 +1,13 @@
 from __future__ import annotations

 import logging
 import toml

-from dataclasses import dataclass
 from munch import Munch
 from toml.encoder import TomlEncoder, TomlPreserveInlineDictEncoder
 from typing import ClassVar, Generator, Optional, Union, Mapping, Any, get_type_hints, get_origin, get_args, Iterable
-from types import UnionType, NoneType
-
-
-def munchclass(*args, init=False, **kwargs):
-    return dataclass(*args, init=init, slots=True, **kwargs)
+from typehelpers import UnionType, NoneType


 def resolve_type_hint(hint: type, ignore_origins: list[type] = []) -> Iterable[type]:

@@ -41,7 +38,6 @@ def resolve_dict_hints(hints: Any) -> Generator[tuple[Any, ...], None, None]:
         t_origin = get_origin(hint)
         t_args = get_args(hint)
         if t_origin == dict:
-            print(f"Yielding {t_args=}")
             yield t_args
             continue
         if t_origin in [NoneType, Optional, Union, UnionType] and t_args:

@@ -49,34 +45,37 @@ def resolve_dict_hints(hints: Any) -> Generator[tuple[Any, ...], None, None]:
             continue


-class DataClass(Munch):
+class DictScheme(Munch):

     _type_hints: ClassVar[dict[str, Any]]
     _strip_hidden: ClassVar[bool] = False
     _sparse: ClassVar[bool] = False

     def __init__(self, d: Mapping = {}, validate: bool = True, **kwargs):
-        self.update(d | kwargs, validate=validate)
+        self.update(dict(d) | kwargs, validate=validate)

     @classmethod
     def transform(
         cls,
         values: Mapping[str, Any],
+        *,
         validate: bool = True,
         allow_extra: bool = False,
         type_hints: Optional[dict[str, Any]] = None,
     ) -> Any:
-        results = {}
+        results: dict[str, Any] = {}
         values = dict(values)
-        print(f"\ntransform function:\n{values}, {type_hints=}")
         for key in list(values.keys()):
             value = values.pop(key)
             type_hints = cls._type_hints if type_hints is None else type_hints
             if key in type_hints:
                 _classes = tuple[type](resolve_type_hint(type_hints[key]))
-                optional = NoneType in _classes
+                optional = bool(set([NoneType, None]).intersection(_classes))
                 if optional and value is None:
                     results[key] = None
                     continue
                 if issubclass(_classes[0], dict):
-                    assert isinstance(value, dict) or optional
+                    assert isinstance(value, dict) or (optional and value is None), f'{key=} is not dict: {value!r}, {_classes=}'
                     target_class = _classes[0]
                     if target_class in [None, NoneType, Optional]:
                         for target in _classes[1:]:

@@ -85,42 +84,46 @@ class DataClass(Munch):
                             break
                 if target_class is dict:
                     dict_hints = list(resolve_dict_hints(type_hints[key]))
-                    print(f"Got {key=} {dict_hints=}")
                     if len(dict_hints) != 1:
-                        print(f"Received wrong amount of type hints for key {key}: {len(dict_hints)}")
+                        msg = f"transform(): Received wrong amount of type hints for key {key}: {len(dict_hints)}"
+                        if validate:
+                            raise Exception(msg)
+                        logging.warning(msg)
                     if len(dict_hints) == 1 and value is not None:
                         if len(dict_hints[0]) != 2 or not all(dict_hints[0]):
-                            print(f"Weird dict hints received: {dict_hints}")
+                            logging.debug(f"Weird dict hints received: {dict_hints}")
                             continue
                         key_type, value_type = dict_hints[0]
                         if not isinstance(value, Mapping):
+                            msg = f"Got non-mapping {value!r} for expected dict type: {key_type} => {value_type}. Allowed classes: {_classes}"
                             if validate:
-                                raise Exception(
-                                    f"Got non-mapping {value!r} for expected dict type: {key_type} => {value_type}. Allowed classes: {_classes}")
-                            print(f"Got non-mapping {value!r} for expected dict type: {key_type} => {value_type}. Allowed classes: {_classes}")
+                                raise Exception(msg)
+                            logging.warning(msg)
                             results[key] = value
                             continue
                         if isinstance(key_type, type):
                             if issubclass(key_type, str):
                                 target_class = Munch
                             else:
-                                print(f"{key=} DICT WRONG KEY TYPE: {key_type}")
+                                msg = f"{key=} subdict got wrong key type hint (expected str): {key_type}"
+                                if validate:
+                                    raise Exception(msg)
+                                logging.warning(msg)
                         if validate:
                             for k in value:
                                 if not isinstance(k, tuple(flatten_hints(key_type))):
                                     raise Exception(f'Subdict "{key}": wrong type for subkey "{k}": got: {type(k)}, expected: {key_type}')
                         dict_content_hints = {k: value_type for k in value}
-                        print(f"tranforming: {value=} {dict_content_hints=}")
                         value = cls.transform(value, validate=validate, allow_extra=allow_extra, type_hints=dict_content_hints)
-                        print(f"tranformed: {value=}")
                 if not isinstance(value, target_class):
                     if not (optional and value is None):
                         assert issubclass(target_class, Munch)
                         # despite the above assert, mypy doesn't seem to understand target_class is a Munch here
-                        kwargs = {'validate': validate} if issubclass(target_class, DataClass) else {}
+                        kwargs = {'validate': validate} if issubclass(target_class, DictScheme) else {}
                         value = target_class(value, **kwargs)  # type:ignore[attr-defined]
                     else:
-                        print(f"nothing to do: '{key}' was already {target_class}")
+                        # print(f"nothing to do: '{key}' was already {target_class}")
+                        pass
                 # handle numerics
                 elif set(_classes).intersection([int, float]) and isinstance(value, str) and str not in _classes:
                     parsed_number = None

@@ -145,7 +148,6 @@ class DataClass(Munch):
                                 f'{" ,".join([ c.__name__ for c in _classes])}; '
                                 f'got: {type(value).__name__}; value: {value}')
             elif validate and not allow_extra:
-                import logging
                 logging.debug(f"{cls}: unknown key '{key}': {value}")
                 raise Exception(f'{cls}: Unknown key "{key}"')
             else:

@@ -183,6 +185,7 @@ class DataClass(Munch):
         sparse: Optional[bool] = None,
         recursive: bool = True,
         hints: Optional[dict[str, Any]] = None,
+        validate: bool = True,
     ) -> dict[Any, Any]:
         # preserve original None-type args
         _sparse = cls._sparse if sparse is None else sparse

@@ -190,64 +193,55 @@ class DataClass(Munch):
         hints = cls._type_hints if hints is None else hints
         result = dict(d)
         if not (_strip_hidden or _sparse or result):
-            print(f"shortcircuiting {d=}")
             return result
-        print(f"Stripping {result} with hints: {hints}")
         for k, v in d.items():
             type_hint = resolve_type_hint(hints.get(k, "abc"))
-            print(f"Working on key {k}, type hints: {type_hint}")
             if not isinstance(k, str):
-                print(f"skipping unknown key type {k=}")
+                msg = f"strip_dict(): unknown key type {k=}: {type(k)=}"
+                if validate:
+                    raise Exception(msg)
+                logging.warning(f"{msg} (skipping)")
                 continue
-            if strip_hidden and k.startswith('_'):
+            if _strip_hidden and k.startswith('_'):
                 result.pop(k)
                 continue
            if v is None:
                 if NoneType not in type_hint:
                     msg = f'encountered illegal null value at key "{k}" for typehint {type_hint}'
-                    if True:
+                    if validate:
                         raise Exception(msg)
-                    print(msg)
+                    logging.warning(msg)
                 if _sparse:
-                    print(f"popping empty {k}")
                     result.pop(k)
                     continue
-                print(f"encountered legal null value at {k}: {_sparse=}")
             if recursive and isinstance(v, dict):
                 if not v:
                     result[k] = {}
                     continue
-                if isinstance(v, DataClass):
-                    print(f"Dataclass detected in {k=}")
-                    result[k] = v.toDict(strip_hidden=strip_hidden, sparse=sparse)  # pass None in sparse and strip_hidden
+                if isinstance(v, DictScheme):
+                    # pass None in sparse and strip_hidden
+                    result[k] = v.toDict(strip_hidden=strip_hidden, sparse=sparse)
                     continue
                 if isinstance(v, Munch):
-                    print(f"Converting munch {k=}")
                     result[k] = v.toDict()
                 if k not in hints:
-                    print(f"skipping unknown {k=}")
                     continue
-                print(f"STRIPPING RECURSIVELY: {k}: {v}, parent hints: {hints[k]}")
                 _subhints = {}
                 _hints = resolve_type_hint(hints[k], [dict])
                 hints_flat = list(flatten_hints(_hints))
-                print(f"going over hints for {k}: {_hints=} {hints_flat=}")
-                subclass = DataClass
+                subclass = DictScheme
                 for hint in hints_flat:
-                    print(f"working on hint: {hint}")
                     if get_origin(hint) == dict:
                         _valtype = get_args(hint)[1]
                         _subhints = {n: _valtype for n in v.keys()}
-                        print(f"generated {_subhints=} from {_valtype=}")
                         break
-                    if isinstance(hint, type) and issubclass(hint, DataClass):
+                    if isinstance(hint, type) and issubclass(hint, DictScheme):
                         subclass = hint
                         _subhints = hint._type_hints
-                        print(f"found subhints: {_subhints}")
                         break
                     else:
-                        print(f"ignoring {hint=}")
-                print(f"STRIPPING SUBDICT {k=} WITH {_subhints=}")
+                        # print(f"ignoring {hint=}")
+                        continue
                 result[k] = subclass.strip_dict(
                     v,
                     hints=_subhints,

@@ -258,7 +252,7 @@ class DataClass(Munch):
         return result

     def update(self, d: Mapping[str, Any], validate: bool = True):
-        Munch.update(self, type(self).transform(d, validate))
+        Munch.update(self, type(self).transform(d, validate=validate))

     def __init_subclass__(cls):
         super().__init_subclass__()

@@ -275,10 +269,13 @@ class DataClass(Munch):
     ) -> str:
         import yaml
         yaml_args = {'sort_keys': False} | yaml_args
-        return yaml.dump(
+        dumped = yaml.dump(
             self.toDict(strip_hidden=strip_hidden, sparse=sparse),
             **yaml_args,
         )
+        if dumped is None:
+            raise Exception(f"Failed to yaml-serialse {self}")
+        return dumped

     def toToml(
         self,

distro/distro.py

@@ -1,3 +1,5 @@
+import logging
+
 from enum import IntFlag
 from typing import Generic, Mapping, Optional, TypeVar

@@ -91,6 +93,11 @@ def get_kupfer(arch: str, url_template: str, scan: bool = False) -> Distro:
         scan=scan,
     )
     assert isinstance(distro, (LocalDistro, RemoteDistro))
+    if remote:
+        assert isinstance(distro, RemoteDistro)
+        for repo in distro.repos.values():
+            repo.cache_repo_db = True
+
     return distro

@@ -115,6 +122,7 @@ def get_kupfer_url(url: str = KUPFER_HTTPS, branch: Optional[str] = None) -> str
 def get_repo_config(*args, **kwargs) -> ReposConfigFile:
     repo_config, changed = _get_repo_config(*args, **kwargs)
     if changed:
+        logging.debug("Repo configs changed, resetting caches")
         reset_distro_caches()
     return repo_config

@@ -132,6 +140,8 @@ def get_kupfer_repo_names(local) -> list[str]:
 def get_RepoInfo(arch: Arch, repo_config: AbstrRepoConfig, default_url: Optional[str]) -> RepoInfo:
     url = repo_config.remote_url or default_url
     if isinstance(url, dict):
+        if arch not in url and not default_url:
+            raise Exception(f"Invalid repo config: Architecture {arch} not in remote_url mapping: {url}")
         url = url.get(arch, default_url)
     assert url
     return RepoInfo(

@@ -140,39 +150,54 @@ def get_RepoInfo(arch: Arch, repo_config: AbstrRepoConfig, default_url: Optional
     )


-def get_base_distro(arch: Arch, scan: bool = False, unsigned: bool = True) -> RemoteDistro:
+def get_base_distro(arch: Arch, scan: bool = False, unsigned: bool = True, cache_db: bool = True) -> RemoteDistro:
     base_distros = get_repo_config().base_distros
     if base_distros is None or arch not in base_distros:
         base_distros = REPOS_CONFIG_DEFAULT.base_distros
     assert base_distros
-    distro: BaseDistro
-    distro = base_distros.get(arch)  # type: ignore[assignment]
+    distro_config: BaseDistro
+    distro_config = base_distros.get(arch)  # type: ignore[assignment]
     repos = {}
-    for repo, repo_config in distro.repos.items():
+    for repo, repo_config in distro_config.repos.items():
         if unsigned:
             repo_config['options'] = (repo_config.get('options', None) or {}) | {'SigLevel': 'Never'}
-        repos[repo] = get_RepoInfo(arch, repo_config, default_url=distro.remote_url)
+        repos[repo] = get_RepoInfo(arch, repo_config, default_url=distro_config.remote_url)

-    return RemoteDistro(arch=arch, repo_infos=repos, scan=scan)
+    distro = RemoteDistro(arch=arch, repo_infos=repos, scan=False)
+    if cache_db:
+        for r in distro.repos.values():
+            assert isinstance(r, RemoteRepo)
+            r.cache_repo_db = True
+    if scan:
+        distro.scan()
+    return distro


 def get_kupfer_distro(
     arch: Arch,
     location: DistroLocation,
     scan: bool = False,
+    cache_db: bool = True,
 ) -> Distro:
     global _kupfer_https, _kupfer_local, _kupfer_local_chroots
     cls: type[Distro]
     cache: Mapping[str, Distro]
     repo_config = get_repo_config()
+    remote = False
     if location == DistroLocation.REMOTE:
+        remote = True
         cache = _kupfer_https
         default_url = repo_config.remote_url or KUPFER_HTTPS
         repos = {repo: get_RepoInfo(arch, conf, default_url) for repo, conf in repo_config.repos.items() if not conf.local_only}
         cls = RemoteDistro
     elif location in [DistroLocation.CHROOT, DistroLocation.LOCAL]:
-        cache = _kupfer_local_chroots
-        pkgdir = CHROOT_PATHS['packages'] if location == DistroLocation.CHROOT else config.get_path('packages')
+        if location == DistroLocation.CHROOT:
+            cache = _kupfer_local_chroots
+            pkgdir = CHROOT_PATHS['packages']
+        else:
+            assert location == DistroLocation.LOCAL
+            cache = _kupfer_local
+            pkgdir = config.get_path('packages')
         default_url = f"file://{pkgdir}/$arch/$repo"
         cls = LocalDistro
         repos = {}

@@ -181,7 +206,7 @@ def get_kupfer_distro(
             repo.remote_url = default_url
             repos[name] = get_RepoInfo(arch, repo, default_url)
     else:
-        raise Exception(f"Unknown location {location}")
+        raise Exception(f"Unknown distro location {location}")
     if cache is None:
         cache = {}
     assert arch

@@ -190,26 +215,33 @@ def get_kupfer_distro(
         distro = cls(
             arch=arch,
             repo_infos=repos,
-            scan=scan,
+            scan=False,
         )
         assert isinstance(distro, (LocalDistro, RemoteDistro))
-        return distro
+        cache[arch] = distro
+        if remote and cache_db:
+            assert isinstance(distro, RemoteDistro)
+            for r in distro.repos.values():
+                r.cache_repo_db = True
+        if scan:
+            distro.scan()
+        return distro
     item: Distro = cache[arch]
     if scan and not item.is_scanned():
         item.scan()
     return item


-def get_kupfer_https(arch: Arch, scan: bool = False) -> RemoteDistro:
-    d = get_kupfer_distro(arch, location=DistroLocation.REMOTE, scan=scan)
+def get_kupfer_https(arch: Arch, scan: bool = False, cache_db: bool = True) -> RemoteDistro:
+    d = get_kupfer_distro(arch, location=DistroLocation.REMOTE, scan=scan, cache_db=cache_db)
     assert isinstance(d, RemoteDistro)
     return d


-def get_kupfer_local(arch: Optional[Arch] = None, in_chroot: bool = True, scan: bool = False) -> LocalDistro:
+def get_kupfer_local(arch: Optional[Arch] = None, scan: bool = False, in_chroot: bool = True) -> LocalDistro:
     arch = arch or config.runtime.arch
     assert arch
-    d = get_kupfer_distro(arch, location=DistroLocation.CHROOT if in_chroot else DistroLocation.LOCAL, scan=scan)
+    location = DistroLocation.CHROOT if in_chroot else DistroLocation.LOCAL
+    d = get_kupfer_distro(arch, location=location, scan=scan)
    assert isinstance(d, LocalDistro)
     return d
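A usage sketch for the reworked distro accessors; the arch value is an example, and scanning is now deferred to an explicit call instead of happening at construction:

    from distro.distro import get_kupfer_https, get_kupfer_local

    remote = get_kupfer_https('aarch64', scan=False, cache_db=True)   # repo DBs cached on disk
    local = get_kupfer_local('aarch64', scan=False, in_chroot=False)  # file:// package dirs

    remote.scan()  # deferred scan, triggered explicitly (or via scan=True on a later call)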
distro/package.py

@@ -2,7 +2,7 @@ import logging
 import os

 from shutil import copyfileobj
-from typing import Optional
+from typing import Optional, Union
 from urllib.request import urlopen

 from exec.file import get_temp_dir, makedir

@@ -17,7 +17,7 @@ class BinaryPackage(PackageInfo):
     arch: str
     filename: str
     resolved_url: Optional[str]
-    _desc: Optional[dict[str, str | list[str]]]
+    _desc: Optional[dict[str, Union[str, list[str]]]]

     def __init__(
         self,

@@ -39,7 +39,7 @@ class BinaryPackage(PackageInfo):
     @classmethod
     def parse_desc(clss, desc_str: str, resolved_repo_url=None):
         """Parses a desc file, returning a PackageInfo"""
-        desc: dict[str, str | list[str]] = {}
+        desc: dict[str, Union[str, list[str]]] = {}
         for segment in f'\n{desc_str}'.split('\n%'):
             if not segment.strip():
                 continue

distro/repo.py

@@ -2,11 +2,13 @@ from copy import deepcopy
 import logging
 import os
 import tarfile
-import tempfile
 import urllib.request

 from typing import Generic, TypeVar

+from config.state import config
+from exec.file import get_temp_dir
+from utils import download_file

 from .package import BinaryPackage, LocalPackage, RemotePackage

 BinaryPackageType = TypeVar('BinaryPackageType', bound=BinaryPackage)

@@ -112,6 +114,11 @@ class LocalRepo(Repo[LocalPackage]):


 class RemoteRepo(Repo[RemotePackage]):
+    cache_repo_db: bool
+
+    def __init__(self, *kargs, cache_repo_db: bool = False, **kwargs):
+        self.cache_repo_db = cache_repo_db
+        super().__init__(*kargs, **kwargs)

     def _parse_desc(self, desc_text: str) -> RemotePackage:
         return RemotePackage.parse_desc(desc_text, resolved_repo_url=self.resolved_url)

@@ -119,8 +126,9 @@ class RemoteRepo(Repo[RemotePackage]):
     def acquire_db_file(self) -> str:
         uri = f'{self.resolved_url}/{self.name}.db'
         logging.info(f'Downloading repo file from {uri}')
-        with urllib.request.urlopen(uri) as request:
-            fd, path = tempfile.mkstemp()
-            with open(fd, 'wb') as writable:
-                writable.write(request.read())
-        return path
+        assert self.arch and self.name, f"repo has incomplete information: {self.name=}, {self.arch=}"
+        path = get_temp_dir() if not self.cache_repo_db else os.path.join(config.get_path('pacman'), 'repo_dbs', self.arch)
+        os.makedirs(path, exist_ok=True)
+        repo_file = f'{path}/{self.name}.tar.gz'
+        download_file(repo_file, uri, update=True)
+        return repo_file
|
|
@@ -6,11 +6,11 @@ import toml
 import yaml

 from copy import deepcopy
-from typing import ClassVar, Optional, Mapping
+from typing import ClassVar, Optional, Mapping, Union

 from config.state import config
 from constants import Arch, BASE_DISTROS, KUPFER_HTTPS, REPOS_CONFIG_FILE, REPOSITORIES
-from dataclass import DataClass, toml_inline_dicts, TomlPreserveInlineDictEncoder
+from dictscheme import DictScheme, toml_inline_dicts, TomlPreserveInlineDictEncoder
 from utils import sha256sum

 REPOS_KEY = 'repos'
@@ -22,7 +22,7 @@ BASEDISTROS_KEY = 'base_distros'
 _current_config: Optional[ReposConfigFile]


-class AbstrRepoConfig(DataClass):
+class AbstrRepoConfig(DictScheme):
     options: Optional[dict[str, str]]
     _strip_hidden: ClassVar[bool] = True
     _sparse: ClassVar[bool] = True
@@ -33,16 +33,16 @@ class BaseDistroRepo(AbstrRepoConfig):


 class RepoConfig(AbstrRepoConfig):
-    remote_url: Optional[str | dict[Arch, str]]
+    remote_url: Optional[Union[str, dict[Arch, str]]]
     local_only: Optional[bool]


-class BaseDistro(DataClass):
+class BaseDistro(DictScheme):
     remote_url: Optional[str]
     repos: dict[str, BaseDistroRepo]


-class ReposConfigFile(DataClass):
+class ReposConfigFile(DictScheme):
     remote_url: Optional[str]
     repos: dict[str, RepoConfig]
     base_distros: dict[Arch, BaseDistro]
@@ -53,18 +53,18 @@ class ReposConfigFile(DataClass):

     def __init__(self, d, **kwargs):
         super().__init__(d=d, **kwargs)
+        self[REPOS_KEY] = self.get(REPOS_KEY, {})
         for repo_cls, defaults, repos, remote_url in [
-            (RepoConfig, REPO_DEFAULTS, self.get(REPOS_KEY, {}), d.get(REMOTEURL_KEY, None)),
+            (RepoConfig, REPO_DEFAULTS, self.get(REPOS_KEY), d.get(REMOTEURL_KEY, None)),
             *[(BaseDistroRepo, BASE_DISTRO_DEFAULTS, _distro.repos, _distro.get(REMOTEURL_KEY, None)) for _distro in self.base_distros.values()],
         ]:
             if repos is None:
                 continue
             for name, repo in repos.items():
-                _repo = defaults | (repo or {})  # type: ignore[operator]
+                _repo = dict(defaults | (repo or {}))  # type: ignore[operator]
                 if REMOTEURL_KEY not in repo and not repo.get(LOCALONLY_KEY, None):
                     _repo[REMOTEURL_KEY] = remote_url
                 repos[name] = repo_cls(_repo, **kwargs)
-        # self.repos = repos

     @staticmethod
     def parse_config(path: str) -> ReposConfigFile:
@@ -103,21 +103,25 @@ BASE_DISTRO_DEFAULTS = {
 }

 REPOS_CONFIG_DEFAULT = ReposConfigFile({
-    '_path': None,
+    '_path': '__DEFAULTS__',
     '_checksum': None,
     REMOTEURL_KEY: KUPFER_HTTPS,
     REPOS_KEY: {
-        'local': REPO_DEFAULTS | {
+        'kupfer_local': REPO_DEFAULTS | {
             LOCALONLY_KEY: True
         },
-        **{r: deepcopy(REPO_DEFAULTS) for r in REPOSITORIES},
+        **{
+            r: deepcopy(REPO_DEFAULTS) for r in REPOSITORIES
+        },
     },
     BASEDISTROS_KEY: {
         arch: {
             REMOTEURL_KEY: None,
-            'repos': {k: {
-                'remote_url': v
-            } for k, v in arch_def['repos'].items()},
+            'repos': {
+                k: {
+                    'remote_url': v
+                } for k, v in arch_def['repos'].items()
+            },
         } for arch, arch_def in BASE_DISTROS.items()
     },
 })
@@ -135,24 +139,30 @@ def get_repo_config(
         repo_config_file_path = repo_config_file_default
     else:
         repo_config_file_path = repo_config_file
-    if not os.path.exists(repo_config_file_path):
+    config_exists = os.path.exists(repo_config_file_path)
+    if not config_exists and _current_config is None:
+        if initialize_pkgbuilds:
+            from packages.pkgbuild import init_pkgbuilds
+            init_pkgbuilds(update=False)
+            return get_repo_config(initialize_pkgbuilds=False, repo_config_file=repo_config_file)
         if repo_config_file is not None:
             raise Exception(f"Requested repo config {repo_config_file} doesn't exist")
-        if not initialize_pkgbuilds:
-            logging.warning(f"{repo_config_file_path} doesn't exist, using default Repositories")
-            return deepcopy(REPOS_CONFIG_DEFAULT), False
-        from packages.pkgbuild import init_pkgbuilds
-        init_pkgbuilds()
-        return get_repo_config(initialize_pkgbuilds=False, repo_config_file=repo_config_file_path)
-    conf = _current_config
+        logging.warning(f"{repo_config_file_path} doesn't exist, using built-in repo config defaults")
+        _current_config = deepcopy(REPOS_CONFIG_DEFAULT)
+        return _current_config, False
     changed = False
-    if (not _current_config) or _current_config._path != repo_config_file_path or _current_config._checksum != sha256sum(repo_config_file_path):
-        conf = ReposConfigFile.parse_config(repo_config_file_path)
-        if repo_config_file_path == repo_config_file_default:
-            _current_config = conf
-        changed = True
-    assert conf
-    return conf, changed
+    if (not _current_config) or (config_exists and _current_config._checksum != sha256sum(repo_config_file_path)):
+        if config_exists:
+            conf = ReposConfigFile.parse_config(repo_config_file_path)
+        else:
+            conf = REPOS_CONFIG_DEFAULT
+        changed = conf != (_current_config or {})
+        if changed:
+            _current_config = deepcopy(conf)
+    else:
+        logging.debug("Repo config: Cache hit!")
+    assert _current_config
+    return _current_config, changed


 def get_repos(**kwargs) -> list[RepoConfig]:
@@ -11,6 +11,7 @@ only used to trigger builds of the submodule docs!
    :template: command.rst
+   :recursive:

    binfmt
    cache
    chroot
    config
@@ -6,7 +6,6 @@ a tool to build and flash packages and images for the [Kupfer](https://gitlab.co
 ## Documentation pages
-
 ```{toctree}
 install
 config
+usage/index
 cli
 ```
@@ -2,10 +2,14 @@

 Kupferbootstrap uses [toml](https://en.wikipedia.org/wiki/TOML) for its configuration file.

-The file can either be edited manually or managed via the {doc}`cli/config` subcommand.
+The file can either be edited manually or managed via the [`kupferbootstrap config`](../../cli/config) subcommand.

+```{hint}
+You can quickly generate a default config by running {code}`kupferbootstrap config init -N`.
+
+For an interactive dialogue, omit the `-N`.
+```

 ## File Location

 The configuration is stored in `~/.config/kupfer/kupferbootstrap.toml`, where `~` is your user's home folder.
@@ -54,7 +58,7 @@ This allows you to easily keep a number of slight variations of the same target
 without the need to constantly modify your Kupferbootstrap configuration file.

 You can easily create new profiles with
-[kupferbootstrap config profile init](../cli/config/#kupferbootstrap-config-profile-init).
+[kupferbootstrap config profile init](../../cli/config/#kupferbootstrap-config-profile-init).

 Here's an example:

@@ -64,7 +68,7 @@ current = "graphical"

 [profiles.default]
 parent = ""
-device = "oneplus-enchilada"
+device = "sdm845-oneplus-enchilada"
 flavour = "barebone"
 pkgs_include = [ "wget", "rsync", "nano", "tmux", "zsh", "pv", ]
 pkgs_exclude = []
@@ -89,7 +93,7 @@ flavour = "debug-shell"

 [profiles.beryllium]
 parent = "graphical"
-device = "xiaomi-beryllium-ebbg"
+device = "sdm845-xiaomi-beryllium-ebbg"
 flavour = "gnome"
 hostname = "pocof1"
 ```
@@ -97,7 +101,7 @@ hostname = "pocof1"
 The `current` key in the `profiles` section controls which profile gets used by Kupferbootstrap by default.

 The first subsection (`profiles.default`) describes the `default` profile
-which gets created by [config init](../cli/config/#kupferbootstrap-config-init).
+which gets created by [`kupferbootstrap config init`](../../cli/config/#kupferbootstrap-config-init).

 Next, we have a `graphical` profile that defines a couple of graphical programs for all but the `recovery` profile,
 since that doesn't have a GUI.

39 docs/source/usage/faq.md (new file)
@@ -0,0 +1,39 @@
# FAQ

```{contents} Table of Contents
:class: this-will-duplicate-information-and-it-is-still-useful-here
:depth: 3
```

## Which devices are currently supported?

Currently very few!
See [the `devices` repo](https://gitlab.com/kupfer/packages/pkgbuilds/-/tree/dev/device). We use the same codenames as [postmarketOS](https://wiki.postmarketos.org/wiki/Devices), although we prefix them with the SoC.

## How to port a new device or package?

See [Porting](../porting).

## How to build a specific package

See also: the full [`kupferbootstrap packages build` docs](../../cli/packages#kupferbootstrap-packages-build)

### Example

For rebuilding `kupfer-config` and `crossdirect`, defaulting to your device's architecture:

```sh
kupferbootstrap packages build [--force] [--arch $target_arch] kupfer-config crossdirect
```

### By package path

You can also use a path snippet (`$repo/$pkgbase`) to the PKGBUILD folder, as seen inside your pkgbuilds.git:

```sh
kupferbootstrap packages build [--force] main/kupfer-config cross/crossdirect
```

9 docs/source/usage/index.md (new file)
@@ -0,0 +1,9 @@
# Usage

```{toctree}
quickstart
faq
install
config
porting
```

94 docs/source/usage/porting.md (new file)
@@ -0,0 +1,94 @@
# Porting

## Porting devices

### Homework

Before you can get started porting a device, you'll need to do some research:

1. Familiarize yourself with git basics.
1. Familiarize yourself with Arch Linux packaging, i.e. `PKGBUILD`s and `makepkg`.
1. Familiarize yourself with the postmarketOS port of the device.
   ```{warning}
   If there is no postmarketOS port yet, you'll probably need to get deep into kernel development.
   We suggest [starting with a port to pmOS](https://wiki.postmarketos.org/wiki/Porting_to_a_new_device) then, especially if you're not familiar with the process already.
   ```

### Porting

1. Navigate to your pkgbuilds checkout.
1. Follow the [general package porting guidelines](#porting-packages) to create a device-, kernel- and probably also a firmware-package for the device and SoC. Usually this roughly means porting the postmarketOS APKBUILDs to our PKGBUILD scheme.
   You can get inspiration by comparing existing Kupfer ports (e.g. one of the SDM845 devices) to the [postmarketOS packages](https://gitlab.com/postmarketOS/pmaports/-/tree/master/device) for that device.
   Usually you should start out by copying and then customizing the Kupfer packages for a device that's as similar to yours as possible, i.e. one that uses the same or a related SoC, if something like that is already available in Kupfer.
   ```{hint} Package Repos:
   Device packages belong into `device/`, kernels into `linux/` and firmware into `firmware/`.
   ```
1. When submitting your MR, please include some information:
   - what you have found to be working, broken, and not tested (and why)
   - any necessary instructions for testing
   - whether you'd be willing to maintain the device long-term (test kernel upgrades, submit device package updates, etc.)

### Gotchas

Please be aware of these gotchas:

- As of now, Kupfer only really supports platforms using Android's `aboot` bootloader, i.e. ex-Android phones. In order to support other boot modes (e.g. uboot on the Librem5 and Pine devices), we'll need to port and switch to postmarketOS's [boot-deploy](https://gitlab.com/postmarketOS/boot-deploy) first and add support for EFI setups to Kupferbootstrap.

## Porting packages

### Homework

Before you can get started, you'll need to do some research:

1. Familiarize yourself with git basics.
1. Familiarize yourself with Arch Linux packaging, i.e. `PKGBUILD`s and `makepkg`.

### Development

```{warning}
Throughout the process, use git to version your changes.
- Don't procrastinate using git or committing until you're "done" or "have got something working"; you'll regret it.
- Don't worry about a "clean" git history while you're developing; we can squash it up later.
- \[Force-]push your changes regularly, just like committing. Don't wait for perfection.
```

1. Create a new git branch for your package locally.
   ```{hint}
   It might be a good idea to get into the habit of prefixing branch names with \[a part of] your username and a slash, like so:
   `myNickname/myFeatureName`
   This makes it easier to work in the same remote repo with multiple people.
   ```
1. Pick a package repo and folder for your new package.
   ```{note}
   The pkgbuilds git repo contains multiple package repositories, represented by folders at the top level (`main`, `cross`, `phosh`, etc.).
   ```
   Try to choose a sensible package repo for your new packages and create new folders for each `pkgbase` inside the repo folder.
1. Navigate into the folder of the new package and create a new `PKGBUILD`; fill it with life!
1. **`_mode`**: Add the build mode at the top of the PKGBUILD (see the sketch after this list).
   ```{hint}
   If you're unsure what to pick, go with `_mode=host`. It'll use `crossdirect` to get speeds close to proper cross-compiling.
   ```
   This determines whether the package is built using a foreign-arch chroot (`_mode=host`) executed with qemu-user, or using real cross-compilation (`_mode=cross`) from a host-architecture chroot. The package's build tooling has to specifically support the latter, so it's mostly useful for kernels and uncompiled packages.
1. **`_nodeps`**: (Optional) If your package doesn't require its listed dependencies to build
   (usually because you're packaging a meta-package or only configs or scripts),
   you can add `_nodeps=true` as the next line after the `_mode=` line to speed up packaging.
   `makedeps` are still installed anyway.
1. Test building it with `kupferbootstrap packages build $pkgbname`.
1. For any files and git repos downloaded by your PKGBUILD,
   add them to a new `.gitignore` file in the same directory as your `PKGBUILD`.
   ```{hint}
   Don't forget to `git add` the new `.gitignore` file!
   ```
1. Run `kupferbootstrap packages check` to make sure the formatting for your PKGBUILDs is okay.
   ```{warning}
   This is **not** optional. MRs with failing CI will **not** be merged.
   ```
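
A minimal sketch of such a PKGBUILD head, as referenced in the `_mode` step above. Everything besides the `_mode`/`_nodeps` lines is a made-up placeholder package, not an existing one:

```sh
# PKGBUILD (sketch; only _mode/_nodeps are the Kupfer-specific part)
_mode=host      # host: build in a foreign-arch chroot via qemu-user; cross: real cross-compiling, needs build-tool support
_nodeps=true    # optional: this example only packages a config file, so depends= aren't needed at build time

pkgname=example-config
pkgver=0.1
pkgrel=1
pkgdesc="Example config-only package (placeholder)"
arch=(any)
license=(MIT)

package() {
    install -d "$pkgdir/etc"
    echo "hello=world" > "$pkgdir/etc/example.conf"
}
```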

### Pushing

1. Fork the Kupfer pkgbuilds repo on Gitlab using the Fork button.
1. Add your fork's **SSH** URI to your local git repo as a **new remote**: `git remote add fork git@gitlab...`
1. Push the branch: `git push -u fork $branchname`

### Submitting the MR

When you're ready, open a Merge Request on the Kupfer pkgbuilds repo.

```{hint}
Prefix the MR title with `Draft: ` to indicate a Work In Progress state.
```

9 docs/source/usage/quickstart.md (new file)
@@ -0,0 +1,9 @@
# Quickstart

1. [Install](../install) Kupferbootstrap
1. [Configure](../config) it: `kupferbootstrap config init`
1. [Update your PKGBUILDs + SRCINFO cache](../../cli/packages#kupferbootstrap-packages-update): `kupferbootstrap packages update`
1. [Build an image](../../cli/image#kupferbootstrap-image-build): `kupferbootstrap image build`
1. [Flash the image](../../cli/image#kupferbootstrap-image-flash): `kupferbootstrap image flash abootimg && kupferbootstrap image flash full userdata`

See also: [Frequently Asked Questions](../faq)
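
The same flow condensed into one shell session; a sketch that reuses the exact commands from the list above:

```sh
kupferbootstrap config init           # interactive; add -N for non-interactive defaults
kupferbootstrap packages update       # fetch/update the PKGBUILDs + SRCINFO cache
kupferbootstrap image build           # build the image for the configured profile
kupferbootstrap image flash abootimg        # flash the boot image
kupferbootstrap image flash full userdata   # flash the full image to the userdata partition
```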
@@ -5,7 +5,9 @@ import subprocess

 from subprocess import CompletedProcess  # make it easy for users of this module
 from shlex import quote as shell_quote
-from typing import IO, Optional, Union, TypeAlias
+from typing import IO, Optional, Union
+
+from typehelpers import TypeAlias

 ElevationMethod: TypeAlias = str

@@ -38,6 +40,8 @@ def flatten_shell_script(script: Union[list[str], str], shell_quote_items: bool
         cmds = script
         if shell_quote_items:
             cmds = [shell_quote(i) for i in cmds]
+        else:
+            cmds = [(i if i != '' else '""') for i in cmds]
         script = " ".join(cmds)
     if wrap_in_shell_quote:
         script = shell_quote(script)

18 exec/file.py
@@ -8,7 +8,7 @@ from shutil import rmtree
 from tempfile import mkdtemp
 from typing import Optional, Union

-from .cmd import run_root_cmd, elevation_noop, generate_cmd_su, wrap_in_bash, shell_quote
+from .cmd import run_cmd, run_root_cmd, elevation_noop, generate_cmd_su, wrap_in_bash, shell_quote
 from utils import get_user_name, get_group_name


@@ -41,7 +41,7 @@ def chown(path: str, user: Optional[Union[str, int]] = None, group: Optional[Uni
         raise Exception(f"Failed to change owner of '{path}' to '{owner}'")


-def chmod(path, mode: Union[int, str] = 0o0755, force_sticky=True):
+def chmod(path, mode: Union[int, str] = 0o0755, force_sticky=True, privileged: bool = True):
     if not isinstance(mode, str):
         octal = oct(mode)[2:]
     else:
@@ -54,7 +54,7 @@ def chmod(path, mode: Union[int, str] = 0o0755, force_sticky=True):
         os.chmod(path, mode=octal)  # type: ignore
     except:
         cmd = ["chmod", octal, path]
-        result = run_root_cmd(cmd)
+        result = run_cmd(cmd, switch_user='root' if privileged else None)
         assert isinstance(result, subprocess.CompletedProcess)
         if result.returncode:
             raise Exception(f"Failed to set mode of '{path}' to '{chmod}'")
@@ -144,7 +144,13 @@ def remove_file(path: str, recursive=False):
         raise Exception(f"Unable to remove {path}: cmd returned {rc}")


-def makedir(path, user: Optional[Union[str, int]] = None, group: Optional[Union[str, int]] = None, parents: bool = True):
+def makedir(
+    path,
+    user: Optional[Union[str, int]] = None,
+    group: Optional[Union[str, int]] = None,
+    parents: bool = True,
+    mode: Optional[Union[int, str]] = None,
+):
     if not root_check_exists(path):
         try:
             if parents:
@@ -153,6 +159,8 @@ def makedir(path, user: Optional[Union[str, int]] = None, group: Optional[Union[
             os.mkdir(path)
         except:
             run_root_cmd(['mkdir'] + (['-p'] if parents else []) + [path])
+    if mode is not None:
+        chmod(path, mode=mode)
     chown(path, user, group)


@@ -174,7 +182,7 @@ def symlink(source, target):
 def get_temp_dir(register_cleanup=True, mode: int = 0o0755):
     "create a new tempdir and sanitize ownership so root can access user files as god intended"
     t = mkdtemp()
-    chmod(t, mode)
+    chmod(t, mode, privileged=False)
     if register_cleanup:
         atexit.register(remove_file, t, recursive=True)
     return t
@@ -4,10 +4,11 @@ import logging
 from json import dumps as json_dump
 from typing import Optional

+from config.cli import resolve_profile_field
 from config.state import config
-from utils import colors_supported, color_str
+from utils import color_mark_selected, colors_supported

-from .flavour import get_flavours, get_profile_flavour
+from .flavour import get_flavours, get_flavour

 profile_option = click.option('-p', '--profile', help="name of the profile to use", required=False, default=None)

@@ -23,13 +24,17 @@ def cmd_flavours(json: bool = False, output_file: Optional[str] = None):
     flavours = get_flavours()
     interactive_json = json and not output_file
     use_colors = colors_supported(config.runtime.colors) and not interactive_json
+    profile_name = config.file.profiles.current
+    selected, inherited_from = None, None
     if output_file:
         json = True
     if not flavours:
         raise Exception("No flavours found!")
     if not interactive_json:
         try:
-            profile_flavour = get_profile_flavour()
+            selected, inherited_from = resolve_profile_field(None, profile_name, 'flavour', config.file.profiles)
+            if selected:
+                profile_flavour = get_flavour(selected)
         except Exception as ex:
             logging.debug(f"Failed to get profile flavour for marking as currently selected, continuing anyway. Exception: {ex}")
     for name in sorted(flavours.keys()):
@@ -39,15 +44,11 @@ def cmd_flavours(json: bool = False, output_file: Optional[str] = None):
         except Exception as ex:
             logging.debug(f"A problem happened while parsing flavourinfo for {name}, continuing anyway. Exception: {ex}")
         if not interactive_json:
-            block = [*f.nice_str(newlines=True, colors=use_colors).split('\n'), '']
+            snippet = f.nice_str(newlines=True, colors=use_colors)
             if profile_flavour == f:
-                prefix = color_str('>>> ', bold=True, fg='bright_green', use_colors=use_colors)
-                block += [
-                    color_str("Currently selected by profile ", bold=True, use_colors=use_colors) +
-                    color_str(f'"{config.file.profiles.current}"\n', bold=True, fg="bright_green")
-                ]
-                block = [prefix + line for line in block]
-            results += block
+                snippet = color_mark_selected(snippet, profile_name or '[unknown]', inherited_from)
+            snippet += '\n'
+            results += snippet.split('\n')
         if json:
             d = dict(f)
             d["description"] = f.flavour_info.description if (f.flavour_info and f.flavour_info.description) else f.description
@@ -58,7 +59,7 @@ def cmd_flavours(json: bool = False, output_file: Optional[str] = None):
             d["pkgbuild"] = f.pkgbuild.path if f.pkgbuild else None
             d["package"] = f.pkgbuild.name
             d["arches"] = sorted(f.pkgbuild.arches) if f.pkgbuild else None
-            json_results[d["name"]] = d
+            json_results[name] = d
         print()
     if output_file:
         with open(output_file, 'w') as fd:
@@ -8,12 +8,12 @@ from typing import Optional

 from config.state import config
 from constants import FLAVOUR_DESCRIPTION_PREFIX, FLAVOUR_INFO_FILE
-from dataclass import DataClass
+from dictscheme import DictScheme
 from packages.pkgbuild import discover_pkgbuilds, get_pkgbuild_by_name, init_pkgbuilds, Pkgbuild
 from utils import color_str


-class FlavourInfo(DataClass):
+class FlavourInfo(DictScheme):
     rootfs_size: int  # rootfs size in GB
     description: Optional[str]

@@ -21,7 +21,7 @@ class FlavourInfo(DataClass):
         return f'rootfs_size: {self.rootfs_size}'


-class Flavour(DataClass):
+class Flavour(DictScheme):
     name: str
     pkgbuild: Pkgbuild
     description: str
@@ -53,7 +53,7 @@ class Flavour(DataClass):
         def get_lines(k, v, key_prefix=''):
             results = []
             full_k = f'{key_prefix}.{k}' if key_prefix else k
-            if not isinstance(v, (dict, DataClass)):
+            if not isinstance(v, (dict, DictScheme)):
                 results = [f'{color_str(full_k, bold=True)}: {v}']
             else:
                 for _k, _v in v.items():
@@ -12,27 +12,42 @@ from flavours.flavour import get_profile_flavour
 from flavours.cli import profile_option
 from wrapper import enforce_wrap

-from .fastboot import fastboot_boot, fastboot_erase_dtbo
+from .fastboot import fastboot_boot, fastboot_erase
 from .image import get_device_name, losetup_rootfs_image, get_image_path, dump_aboot, dump_lk2nd

 LK2ND = FLASH_PARTS['LK2ND']
 ABOOT = FLASH_PARTS['ABOOT']

-TYPES = [LK2ND, JUMPDRIVE, ABOOT]
+BOOT_TYPES = [ABOOT, LK2ND, JUMPDRIVE]


 @click.command(name='boot')
 @profile_option
-@click.argument('type', required=False, default=ABOOT, type=click.Choice(TYPES))
-def cmd_boot(type: str, profile: Optional[str] = None):
+@click.argument('type', required=False, default=ABOOT, type=click.Choice(BOOT_TYPES))
+@click.option('-b', '--sector-size', type=int, help="Override the device's sector size", default=None)
+@click.option(
+    '--erase-dtbo/--no-erase-dtbo',
+    is_flag=True,
+    default=True,
+    show_default=True,
+    help="Erase the DTBO partition before flashing",
+)
+@click.option('--confirm', is_flag=True, help="Ask for confirmation before executing fastboot commands")
+def cmd_boot(
+    type: str,
+    profile: Optional[str] = None,
+    sector_size: Optional[int] = None,
+    erase_dtbo: bool = True,
+    confirm: bool = False,
+):
     """Boot JumpDrive or the Kupfer aboot image. Erases Android DTBO in the process."""
     enforce_wrap()
     device = get_profile_device(profile)
     flavour = get_profile_flavour(profile).name
     deviceinfo = device.parse_deviceinfo()
-    sector_size = deviceinfo.flash_pagesize
+    sector_size = sector_size or device.get_image_sectorsize_default()
     if not sector_size:
-        raise Exception(f"Device {device.name} has no flash_pagesize specified")
+        raise Exception(f"Device {device.name} has no rootfs_image_sector_size specified")
     image_path = get_image_path(device, flavour)
     strategy = deviceinfo.flash_method
     if not strategy:
@@ -53,7 +68,8 @@ def cmd_boot(type: str, profile: Optional[str] = None):
             path = dump_aboot(loop_device + 'p1')
         else:
             raise Exception(f'Unknown boot image type {type}')
-        fastboot_erase_dtbo()
-        fastboot_boot(path)
+        if erase_dtbo:
+            fastboot_erase('dtbo', confirm=confirm)
+        fastboot_boot(path, confirm=confirm)
     else:
-        raise Exception(f"Unknown flash strategy {strategy} for device {device.name}")
+        raise Exception(f'Unsupported flash strategy "{strategy}" for device {device.name}')
@@ -1,37 +1,65 @@
+import click
 import logging
-import subprocess
+
+from exec.cmd import run_cmd, CompletedProcess
+from typing import Optional


-def fastboot_erase_dtbo():
-    logging.info("Fastboot: Erasing DTBO")
-    subprocess.run(
-        [
-            'fastboot',
-            'erase',
-            'dtbo',
-        ],
+def confirm_cmd(cmd: list[str], color='green', default=True, msg='Really execute fastboot cmd?') -> bool:
+    return click.confirm(
+        f'{click.style(msg, fg=color, bold=True)} {" ".join(cmd)}',
+        default=default,
+        abort=False,
     )


+def fastboot_erase(target: str, confirm: bool = False):
+    if not target:
+        raise Exception(f"No fastboot erase target specified: {repr(target)}")
+    cmd = [
+        'fastboot',
+        'erase',
+        target,
+    ]
+    if confirm:
+        if not confirm_cmd(cmd, msg=f'Really erase fastboot "{target}" partition?', color='yellow'):
+            raise Exception("user aborted")
+    logging.info(f"Fastboot: Erasing {target}")
+    run_cmd(
+        cmd,
+        capture_output=True,
+    )


-def fastboot_flash(partition, file):
-    logging.info(f"Fastboot: Flashing {file} to {partition}")
-    result = subprocess.run([
+def fastboot_flash(partition: str, file: str, sparse_size: Optional[str] = None, confirm: bool = False):
+    cmd = [
         'fastboot',
+        *(['-S', sparse_size] if sparse_size is not None else []),
         'flash',
         partition,
         file,
-    ])
+    ]
+    if confirm:
+        if not confirm_cmd(cmd):
+            raise Exception("user aborted")
+    logging.info(f"Fastboot: Flashing {file} to {partition}")
+    result = run_cmd(cmd)
+    assert isinstance(result, CompletedProcess)
     if result.returncode != 0:
         raise Exception(f'Failed to flash {file}')


-def fastboot_boot(file):
-    logging.info(f"Fastboot: booting {file}")
-    result = subprocess.run([
+def fastboot_boot(file, confirm: bool = False):
+    cmd = [
         'fastboot',
         'boot',
         file,
-    ])
+    ]
+    if confirm:
+        if not confirm_cmd(cmd):
+            raise Exception("user aborted")
+    logging.info(f"Fastboot: booting {file}")
+    result = run_cmd(cmd)
+    assert isinstance(result, CompletedProcess)
     if result.returncode != 0:
         raise Exception(f'Failed to boot {file} using fastboot')

152 image/flash.py
@@ -1,10 +1,11 @@
 import shutil
 import os
 import click
 import logging

 from typing import Optional

-from constants import FLASH_PARTS, LOCATIONS
+from constants import FLASH_PARTS, LOCATIONS, FASTBOOT, JUMPDRIVE
 from exec.cmd import run_root_cmd
+from exec.file import get_temp_dir
 from devices.device import get_profile_device
@@ -13,84 +14,137 @@ from flavours.cli import profile_option
 from wrapper import enforce_wrap

 from .fastboot import fastboot_flash
-from .image import dd_image, partprobe, shrink_fs, losetup_rootfs_image, losetup_destroy, dump_aboot, dump_lk2nd, dump_qhypstub, get_image_name, get_image_path
+from .image import dd_image, dump_aboot, dump_lk2nd, dump_qhypstub, get_image_path, losetup_destroy, losetup_rootfs_image, partprobe, shrink_fs

 ABOOT = FLASH_PARTS['ABOOT']
 LK2ND = FLASH_PARTS['LK2ND']
 QHYPSTUB = FLASH_PARTS['QHYPSTUB']
 ROOTFS = FLASH_PARTS['ROOTFS']
 FULL_IMG = FLASH_PARTS['FULL']

+DD = 'dd'
+
+FLASH_METHODS = [FASTBOOT, JUMPDRIVE, DD]
+
+
+def find_jumpdrive(location: str) -> str:
+    if location not in LOCATIONS:
+        raise Exception(f'Invalid location {location}. Choose one of {", ".join(LOCATIONS)}')
+    dir = '/dev/disk/by-id'
+    for file in os.listdir(dir):
+        sanitized_file = file.replace('-', '').replace('_', '').lower()
+        if f'jumpdrive{location.split("-")[0]}' in sanitized_file:
+            return os.path.realpath(os.path.join(dir, file))
+    raise Exception('Unable to discover Jumpdrive')
+
+
+def test_blockdev(path: str):
+    partprobe(path)
+    result = run_root_cmd(['lsblk', path, '-o', 'SIZE'], capture_output=True)
+    if result.returncode != 0:
+        raise Exception(f'Failed to lsblk {path}')
+    if result.stdout == b'SIZE\n 0B\n':
+        raise Exception(f'Disk {path} has a size of 0B. That probably means it is not available (e.g. no'
+                        'microSD inserted or no microSD card slot installed in the device) or corrupt or defect')
+
+
+def prepare_minimal_image(source_path: str, sector_size: int) -> str:
+    minimal_image_dir = get_temp_dir(register_cleanup=True)
+    minimal_image_path = os.path.join(minimal_image_dir, f'minimal-{os.path.basename(source_path)}')
+    logging.info(f"Copying image {os.path.basename(source_path)} to {minimal_image_dir} for shrinking")
+    shutil.copyfile(source_path, minimal_image_path)
+
+    loop_device = losetup_rootfs_image(minimal_image_path, sector_size)
+    partprobe(loop_device)
+    shrink_fs(loop_device, minimal_image_path, sector_size)
+    losetup_destroy(loop_device)
+    return minimal_image_path
+

 @click.command(name='flash')
 @profile_option
+@click.option('-m', '--method', type=click.Choice(FLASH_METHODS))
+@click.option('--split-size', help='Chunk size when splitting the image into sparse files via fastboot')
+@click.option('--shrink/--no-shrink', is_flag=True, default=True, help="Copy and shrink the image file to minimal size")
+@click.option('-b', '--sector-size', type=int, help="Override the device's sector size", default=None)
+@click.option('--confirm', is_flag=True, help="Ask for confirmation before executing fastboot commands")
 @click.argument('what', type=click.Choice(list(FLASH_PARTS.values())))
 @click.argument('location', type=str, required=False)
-def cmd_flash(what: str, location: str, profile: Optional[str] = None):
-    """Flash a partition onto a device. `location` takes either a path to a block device or one of emmc, sdcard"""
+def cmd_flash(
+    what: str,
+    location: str,
+    method: Optional[str] = None,
+    split_size: Optional[str] = None,
+    profile: Optional[str] = None,
+    shrink: bool = True,
+    sector_size: Optional[int] = None,
+    confirm: bool = False,
+):
+    """
+    Flash a partition onto a device.
+
+    The syntax of LOCATION depends on the flashing method and is usually only required for flashing "full":
+
+    \b
+    - fastboot: the regular fastboot partition identifier. Usually "userdata"
+    - dd: a path to a block device
+    - jumpdrive: one of "emmc", "sdcard" or a path to a block device
+    """
     enforce_wrap()
     device = get_profile_device(profile)
     flavour = get_profile_flavour(profile).name
-    device_image_name = get_image_name(device, flavour)
     device_image_path = get_image_path(device, flavour)

     deviceinfo = device.parse_deviceinfo()
-    sector_size = deviceinfo.flash_pagesize
-    if not sector_size:
-        raise Exception(f"Device {device.name} has no flash_pagesize specified")
+    sector_size = sector_size or device.get_image_sectorsize_default()
+    method = method or deviceinfo.flash_method

     if what not in FLASH_PARTS.values():
         raise Exception(f'Unknown what "{what}", must be one of {", ".join(FLASH_PARTS.values())}')

-    if what == ROOTFS:
-        if location is None:
-            raise Exception(f'You need to specify a location to flash {what} to')
+    if location and location.startswith('aboot'):
+        raise Exception("You're trying to flash something "
+                        f"to your aboot partition ({location!r}), "
+                        "which contains the android bootloader itself.\n"
+                        "This will brick your phone and is not what you want.\n"
+                        'Aborting.\nDid you mean to flash to "boot"?')

     if what == FULL_IMG:
-        path = ''
-        if location.startswith("/dev/"):
-            path = location
+        if method not in FLASH_METHODS:
+            raise Exception(f"Flash method {method} not supported!")
+        if not location:
+            raise Exception(f'You need to specify a location to flash {what} to')
+        path = ''
+        image_path = prepare_minimal_image(device_image_path, sector_size) if shrink else device_image_path
+        if method == FASTBOOT:
+            fastboot_flash(
+                partition=location,
+                file=image_path,
+                sparse_size=split_size if split_size is not None else '100M',
+                confirm=confirm,
+            )
+        elif method in [JUMPDRIVE, DD]:
+            if method == DD or location.startswith("/") or (location not in LOCATIONS and os.path.exists(location)):
+                path = location
+            elif method == JUMPDRIVE:
+                path = find_jumpdrive(location)
+            test_blockdev(path)
+            if dd_image(input=image_path, output=path).returncode != 0:
+                raise Exception(f'Failed to flash {image_path} to {path}')
         else:
-            if location not in LOCATIONS:
-                raise Exception(f'Invalid location {location}. Choose one of {", ".join(LOCATIONS)}')
-
-            dir = '/dev/disk/by-id'
-            for file in os.listdir(dir):
-                sanitized_file = file.replace('-', '').replace('_', '').lower()
-                if f'jumpdrive{location.split("-")[0]}' in sanitized_file:
-                    path = os.path.realpath(os.path.join(dir, file))
-                    partprobe(path)
-                    result = run_root_cmd(['lsblk', path, '-o', 'SIZE'], capture_output=True)
-                    if result.returncode != 0:
-                        raise Exception(f'Failed to lsblk {path}')
-                    if result.stdout == b'SIZE\n 0B\n':
-                        raise Exception(f'Disk {path} has a size of 0B. That probably means it is not available (e.g. no'
-                                        'microSD inserted or no microSD card slot installed in the device) or corrupt or defect')
-            if path == '':
-                raise Exception('Unable to discover Jumpdrive')
-
-            minimal_image_dir = get_temp_dir(register_cleanup=True)
-            minimal_image_path = os.path.join(minimal_image_dir, f'minimal-{device_image_name}')
-
-            shutil.copyfile(device_image_path, minimal_image_path)
-
-            loop_device = losetup_rootfs_image(minimal_image_path, sector_size)
-            partprobe(loop_device)
-            shrink_fs(loop_device, minimal_image_path, sector_size)
-            losetup_destroy(loop_device)
-
-            result = dd_image(input=minimal_image_path, output=path)
-
-            if result.returncode != 0:
-                raise Exception(f'Failed to flash {minimal_image_path} to {path}')
+            raise Exception(f'Unhandled flash method "{method}" for "{what}"')
     else:
+        if method and method != FASTBOOT:
+            raise Exception(f'Flashing "{what}" with method "{method}" not supported, try no parameter or "{FASTBOOT}"')
         loop_device = losetup_rootfs_image(device_image_path, sector_size)
         if what == ABOOT:
             path = dump_aboot(f'{loop_device}p1')
-            fastboot_flash('boot', path)
+            fastboot_flash(location or 'boot', path, confirm=confirm)
         elif what == LK2ND:
             path = dump_lk2nd(f'{loop_device}p1')
-            fastboot_flash('lk2nd', path)
+            fastboot_flash(location or 'lk2nd', path, confirm=confirm)
         elif what == QHYPSTUB:
             path = dump_qhypstub(f'{loop_device}p1')
-            fastboot_flash('qhypstub', path)
+            fastboot_flash(location or 'qhypstub', path, confirm=confirm)
         else:
             raise Exception(f'Unknown what "{what}", this must be a bug in kupferbootstrap!')
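
A usage sketch for the reworked flash command above. The partition name and block-device path are illustrative examples, not the only valid values; `full userdata` matches the quickstart docs:

```sh
# flash the full image to the fastboot partition "userdata",
# asking for confirmation before each fastboot call:
kupferbootstrap image flash --confirm full userdata

# write the full image to a block device via dd instead (the path is an example):
kupferbootstrap image flash -m dd full /dev/sda

# flash the aboot image; LOCATION defaults to the "boot" partition:
kupferbootstrap image flash abootimg
```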

199 image/image.py
@@ -11,11 +11,11 @@ from typing import Optional, Union

 from config.state import config, Profile
 from chroot.device import DeviceChroot, get_device_chroot
-from constants import Arch, BASE_LOCAL_PACKAGES, BASE_PACKAGES, POST_CMDS
+from constants import Arch, BASE_LOCAL_PACKAGES, BASE_PACKAGES, POST_INSTALL_CMDS
 from distro.distro import get_base_distro, get_kupfer_https
 from devices.device import Device, get_profile_device
 from exec.cmd import run_root_cmd, generate_cmd_su
-from exec.file import root_write_file, root_makedir, makedir
+from exec.file import get_temp_dir, root_write_file, root_makedir, makedir
 from flavours.flavour import Flavour, get_profile_flavour
 from net.ssh import copy_ssh_keys
 from packages.build import build_enable_qemu_binfmt, build_packages, filter_pkgbuilds
@@ -44,10 +44,39 @@ def partprobe(device: str):
     return run_root_cmd(['partprobe', device])


+def bytes_to_sectors(b: int, sector_size: int, round_up: bool = True):
+    sectors, rest = divmod(b, sector_size)
+    if rest and round_up:
+        sectors += 1
+    return sectors
+
+
+def get_fs_size(partition: str) -> tuple[int, int]:
+    blocks_cmd = run_root_cmd(['dumpe2fs', '-h', partition], env={"LC_ALL": "C"}, capture_output=True)
+    if blocks_cmd.returncode != 0:
+        logging.debug(f"dumpe2fs stdout:\n: {blocks_cmd.stdout}")
+        logging.debug(f"dumpe2fs stderr:\n {blocks_cmd.stderr}")
+        raise Exception(f'Failed to detect new filesystem size of {partition}')
+    blocks_text = blocks_cmd.stdout.decode('utf-8') if blocks_cmd.stdout else ''
+    try:
+        fs_blocks = int(re.search('\\nBlock count:[ ]+([0-9]+)\\n', blocks_text, flags=re.MULTILINE).group(1))  # type: ignore[union-attr]
+        fs_block_size = int(re.search('\\nBlock size:[ ]+([0-9]+)\\n', blocks_text).group(1))  # type: ignore[union-attr]
+    except Exception as ex:
+        logging.debug(f"dumpe2fs stdout:\n {blocks_text}")
+        logging.debug(f"dumpe2fs stderr:\n: {blocks_cmd.stderr}")
+        logging.info("Failed to scrape block size and count from dumpe2fs:", ex)
+        raise ex
+    return fs_blocks, fs_block_size
+
+
+def align_bytes(size_bytes: int, alignment: int = 4096) -> int:
+    rest = size_bytes % alignment
+    if rest:
+        size_bytes += alignment - rest
+    return size_bytes
+
+
 def shrink_fs(loop_device: str, file: str, sector_size: int):
-    # 8: 512 bytes sectors
-    # 1: 4096 bytes sectors
-    sectors_blocks_factor = 4096 // sector_size
     partprobe(loop_device)
     logging.debug(f"Checking filesystem at {loop_device}p2")
     result = run_root_cmd(['e2fsck', '-fy', f'{loop_device}p2'])
@@ -55,18 +84,16 @@ def shrink_fs(loop_device: str, file: str, sector_size: int):
         # https://man7.org/linux/man-pages/man8/e2fsck.8.html#EXIT_CODE
         raise Exception(f'Failed to e2fsck {loop_device}p2 with exit code {result.returncode}')

-    logging.debug(f'Shrinking filesystem at {loop_device}p2')
-    result = run_root_cmd(['resize2fs', '-M', f'{loop_device}p2'], capture_output=True)
+    logging.info(f'Shrinking filesystem at {loop_device}p2')
+    result = run_root_cmd(['resize2fs', '-M', f'{loop_device}p2'])
     if result.returncode != 0:
-        print(result.stdout)
-        print(result.stderr)
         raise Exception(f'Failed to resize2fs {loop_device}p2')

-    logging.debug(f'Finding end block of shrunken filesystem on {loop_device}p2')
-    blocks = int(re.search('is now [0-9]+', result.stdout.decode('utf-8')).group(0).split(' ')[2])  # type: ignore
-    sectors = blocks * sectors_blocks_factor
+    logging.debug(f'Reading size of shrunken filesystem on {loop_device}p2')
+    fs_blocks, fs_block_size = get_fs_size(f'{loop_device}p2')
+    sectors = bytes_to_sectors(fs_blocks * fs_block_size, sector_size)

-    logging.debug(f'Shrinking partition at {loop_device}p2 to {sectors} sectors')
+    logging.info(f'Shrinking partition at {loop_device}p2 to {sectors} sectors ({sectors * sector_size} bytes)')
     child_proccess = subprocess.Popen(
         generate_cmd_su(['fdisk', '-b', str(sector_size), loop_device], switch_user='root'),  # type: ignore
         stdin=subprocess.PIPE,
@@ -92,7 +119,7 @@ def shrink_fs(loop_device: str, file: str, sector_size: int):
     if returncode > 1:
         raise Exception(f'Failed to shrink partition size of {loop_device}p2 with fdisk')

-    partprobe(loop_device)
+    partprobe(loop_device).check_returncode()

     logging.debug(f'Finding end sector of partition at {loop_device}p2')
     result = run_root_cmd(['fdisk', '-b', str(sector_size), '-l', loop_device], capture_output=True)
@@ -110,7 +137,7 @@ def shrink_fs(loop_device: str, file: str, sector_size: int):
     if end_sector == 0:
         raise Exception(f'Failed to find end sector of {loop_device}p2')

-    end_size = (end_sector + 1) * sector_size
+    end_size = align_bytes((end_sector + 1) * sector_size, 4096)

     logging.debug(f'({end_sector} + 1) sectors * {sector_size} bytes/sector = {end_size} bytes')
     logging.info(f'Truncating {file} to {end_size} bytes')
@@ -200,14 +227,14 @@ def mount_chroot(rootfs_source: str, boot_src: str, chroot: DeviceChroot):


 def dump_file_from_image(image_path: str, file_path: str, target_path: Optional[str] = None):
-    target_path = target_path or os.path.join('/tmp', os.path.basename(file_path))
+    target_path = target_path or os.path.join(get_temp_dir(), os.path.basename(file_path))
     result = run_root_cmd([
         'debugfs',
         image_path,
         '-R',
         f'\'dump /{file_path.lstrip("/")} {target_path}\'',
     ])
-    if result.returncode != 0:
+    if result.returncode != 0 or not os.path.exists(target_path):
         raise Exception(f'Failed to dump {file_path} from /boot')
     return target_path

@@ -254,30 +281,31 @@ def partition_device(device: str):
         raise Exception(f'Failed to create partitions on {device}')


-def create_filesystem(device: str, blocksize: int = 4096, label=None, options=[], fstype='ext4'):
-    # blocksize can be 4k max due to pagesize
-    blocksize = min(blocksize, 4096)
-    if fstype.startswith('ext'):
-        # blocksize for ext-fs must be >=1024
-        blocksize = max(blocksize, 1024)
-
+def create_filesystem(device: str, blocksize: Optional[int], label=None, options=[], fstype='ext4'):
+    """Creates a new filesystem. Blocksize defaults"""
     labels = ['-L', label] if label else []
-    cmd = [
-        f'mkfs.{fstype}',
-        '-F',
-        '-b',
-        str(blocksize),
-    ] + labels + [device]
+    cmd = [f'mkfs.{fstype}', '-F', *labels]
+    if blocksize:
+        # blocksize can be 4k max due to pagesize
+        blocksize = min(blocksize, 4096)
+        if fstype.startswith('ext'):
+            # blocksize for ext-fs must be >=1024
+            blocksize = max(blocksize, 1024)
+        cmd += [
+            '-b',
+            str(blocksize),
+        ]
+    cmd.append(device)
     result = run_root_cmd(cmd)
     if result.returncode != 0:
         raise Exception(f'Failed to create {fstype} filesystem on {device} with CMD: {cmd}')


-def create_root_fs(device: str, blocksize: int):
+def create_root_fs(device: str, blocksize: Optional[int]):
     create_filesystem(device, blocksize=blocksize, label='kupfer_root', options=['-O', '^metadata_csum', '-N', '100000'])


-def create_boot_fs(device: str, blocksize: int):
+def create_boot_fs(device: str, blocksize: Optional[int]):
     create_filesystem(device, blocksize=blocksize, label='kupfer_boot', fstype='ext2')
@@ -305,8 +333,9 @@ def install_rootfs(
     )
     chroot.add_sudo_config(config_name='wheel', privilegee='%wheel', password_required=True)
     copy_ssh_keys(
-        chroot.path,
+        chroot,
         user=user,
+        allow_fail=True,
     )
     files = {
         'etc/pacman.conf': get_base_distro(arch).get_pacman_conf(
@@ -318,12 +347,13 @@ def install_rootfs(
     }
     for target, content in files.items():
         root_write_file(os.path.join(chroot.path, target.lstrip('/')), content)
-    if POST_CMDS:
-        logging.info("Running post-install CMDs")
-        result = chroot.run_cmd(' && '.join(POST_CMDS))
+
+    logging.info("Running post-install CMDs")
+    for cmd in POST_INSTALL_CMDS:
+        result = chroot.run_cmd(cmd)
         assert isinstance(result, subprocess.CompletedProcess)
         if result.returncode != 0:
-            raise Exception('Error running post_cmds')
+            raise Exception(f'Error running post-install cmd: {cmd}')

     logging.info('Preparing to unmount chroot')
     res = chroot.run_cmd('sync && umount /boot', attach_tty=True)
@@ -341,33 +371,61 @@ def cmd_image():
     """Build, flash and boot device images"""


+sectorsize_option = click.option(
+    '-b',
+    '--sector-size',
+    help="Override the device's sector size",
+    type=int,
+    default=None,
+)
+
+
 @cmd_image.command(name='build')
 @click.argument('profile_name', required=False)
-@click.option('--local-repos/--no-local-repos',
-              '-l/-L',
-              default=True,
-              show_default=True,
-              help='Whether to use local package repos at all or only use HTTPS repos.')
-@click.option('--build-pkgs/--no-build-pkgs',
-              '-p/-P',
-              default=True,
-              show_default=True,
-              help='Whether to build missing/outdated local packages if local repos are enabled.')
-@click.option('--no-download-pkgs',
-              is_flag=True,
-              default=False,
-              help='Disable trying to download packages instead of building if building is enabled.')
-@click.option('--block-target', type=click.Path(), default=None, help='Override the block device file to write the final image to')
-@click.option('--skip-part-images',
-              is_flag=True,
-              default=False,
-              help='Skip creating image files for the partitions and directly work on the target block device.')
-def cmd_build(profile_name: Optional[str] = None,
-              local_repos: bool = True,
-              build_pkgs: bool = True,
-              no_download_pkgs=False,
-              block_target: Optional[str] = None,
-              skip_part_images: bool = False):
+@click.option(
+    '--local-repos/--no-local-repos',
+    '-l/-L',
+    help='Whether to use local package repos at all or only use HTTPS repos.',
+    default=True,
+    show_default=True,
+    is_flag=True,
+)
+@click.option(
+    '--build-pkgs/--no-build-pkgs',
+    '-p/-P',
+    help='Whether to build missing/outdated local packages if local repos are enabled.',
+    default=True,
+    show_default=True,
+    is_flag=True,
+)
+@click.option(
+    '--no-download-pkgs',
+    help='Disable trying to download packages instead of building if building is enabled.',
+    default=False,
+    is_flag=True,
+)
+@click.option(
+    '--block-target',
+    help='Override the block device file to write the final image to',
+    type=click.Path(),
+    default=None,
+)
+@click.option(
+    '--skip-part-images',
+    help='Skip creating image files for the partitions and directly work on the target block device.',
+    default=False,
+    is_flag=True,
+)
+@sectorsize_option
+def cmd_build(
+    profile_name: Optional[str] = None,
+    local_repos: bool = True,
+    build_pkgs: bool = True,
+    no_download_pkgs=False,
+    block_target: Optional[str] = None,
+    sector_size: Optional[int] = None,
+    skip_part_images: bool = False,
+):
     """
     Build a device image.

@@ -398,10 +456,7 @@ def cmd_build(profile_name: Optional[str] = None,
         pkgbuilds |= set(filter_pkgbuilds(packages_extra, arch=arch, allow_empty_results=True, use_paths=False))
         build_packages(pkgbuilds, arch, try_download=not no_download_pkgs)

-    deviceinfo = device.parse_deviceinfo()
-    sector_size = deviceinfo.flash_pagesize
-    if not sector_size:
-        raise Exception(f"Device {device.name} has no flash_pagesize specified")
+    sector_size = sector_size or device.get_image_sectorsize()

     image_path = block_target or get_image_path(device, flavour.name)

@@ -410,7 +465,7 @@ def cmd_build(profile_name: Optional[str] = None,
     logging.info(f'Creating new file at {image_path}')
     create_img_file(image_path, f"{rootfs_size_mb}M")

-    loop_device = losetup_rootfs_image(image_path, sector_size)
+    loop_device = losetup_rootfs_image(image_path, sector_size or device.get_image_sectorsize_default())

     partition_device(loop_device)
     partprobe(loop_device)
@@ -453,19 +508,17 @@ def cmd_build(profile_name: Optional[str] = None,

 @cmd_image.command(name='inspect')
 @click.option('--shell', '-s', is_flag=True)
+@sectorsize_option
 @click.argument('profile', required=False)
-def cmd_inspect(profile: Optional[str] = None, shell: bool = False):
-    """Open a shell in a device image"""
+def cmd_inspect(profile: Optional[str] = None, shell: bool = False, sector_size: Optional[int] = None):
+    """Loop-mount the device image for inspection."""
     config.enforce_profile_device_set()
     config.enforce_profile_flavour_set()
     enforce_wrap()
     device = get_profile_device(profile)
     arch = device.arch
     flavour = get_profile_flavour(profile).name
-    deviceinfo = device.parse_deviceinfo()
-    sector_size = deviceinfo.flash_pagesize
-    if not sector_size:
-        raise Exception(f"Device {device.name} has no flash_pagesize specified")
+    sector_size = sector_size or device.get_image_sectorsize_default()

     chroot = get_device_chroot(device.name, flavour, arch)
     image_path = get_image_path(device, flavour)
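
The rebuilt image commands above share the new `-b`/`--sector-size` override via `sectorsize_option`; a usage sketch with illustrative flag values:

```sh
# build an image, forcing a 4096-byte sector size and HTTPS-only repos:
kupferbootstrap image build -b 4096 --no-local-repos

# loop-mount the resulting image and open a shell inside it:
kupferbootstrap image inspect --shell -b 4096
```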
@@ -37,6 +37,11 @@ def ctx() -> click.Context:
     return click.Context(click.Command('integration_tests'))


+def test_main_import():
+    from main import cli
+    assert cli
+
+
 def test_config_load(ctx: click.Context):
     path = config.runtime.config_file
     assert path

5 main.py
@@ -8,9 +8,10 @@ from traceback import format_exc, format_exception_only, format_tb
 from typing import Optional

 from logger import color_option, logging, quiet_option, setup_logging, verbose_option
-from wrapper import nowrapper_option, enforce_wrap
+from wrapper import get_wrapper_type, enforce_wrap, nowrapper_option
 from progressbar import progress_bars_option

+from binfmt.cli import cmd_binfmt
 from config.cli import config, config_option, cmd_config
 from packages.cli import cmd_packages
 from flavours.cli import cmd_flavours
@@ -49,6 +50,7 @@ def cli(
     if config.file_state.exception:
         logging.warning(f"Config file couldn't be loaded: {config.file_state.exception}")
     if wrapper_override:
+        logging.info(f'Force-wrapping in wrapper-type: "{get_wrapper_type()}"!')
         enforce_wrap()

@@ -76,6 +78,7 @@ def main():
         exit(1)


+cli.add_command(cmd_binfmt)
 cli.add_command(cmd_cache)
 cli.add_command(cmd_chroot)
 cli.add_command(cmd_config)

58 net/ssh.py
@@ -6,7 +6,9 @@ import click

 from config.state import config
 from constants import SSH_COMMON_OPTIONS, SSH_DEFAULT_HOST, SSH_DEFAULT_PORT
+from chroot.abstract import Chroot
 from exec.cmd import run_cmd
+from exec.file import write_file
 from wrapper import check_programs_wrap


@@ -83,21 +85,16 @@ def find_ssh_keys():
     return keys


-def copy_ssh_keys(root_dir: str, user: str):
+def copy_ssh_keys(chroot: Chroot, user: str, allow_fail: bool = False):
     check_programs_wrap(['ssh-keygen'])
-    authorized_keys_file = os.path.join(
-        root_dir,
-        'home',
-        user,
-        '.ssh',
-        'authorized_keys',
-    )
-    if os.path.exists(authorized_keys_file):
-        os.unlink(authorized_keys_file)
+    ssh_dir_relative = os.path.join('/home', user, '.ssh')
+    ssh_dir = chroot.get_path(ssh_dir_relative)
+    authorized_keys_file_rel = os.path.join(ssh_dir_relative, 'authorized_keys')
+    authorized_keys_file = chroot.get_path(authorized_keys_file_rel)

     keys = find_ssh_keys()
     if len(keys) == 0:
-        logging.info("Could not find any ssh key to copy")
+        logging.warning("Could not find any ssh key to copy")
         create = click.confirm("Do you want me to generate an ssh key for you?", True)
         if not create:
             return
@ -116,15 +113,34 @@ def copy_ssh_keys(root_dir: str, user: str):
|
|||
logging.fatal("Failed to generate ssh key")
|
||||
keys = find_ssh_keys()
|
||||
|
||||
ssh_dir = os.path.join(root_dir, 'home', user, '.ssh')
|
||||
if not os.path.exists(ssh_dir):
|
||||
os.makedirs(ssh_dir, exist_ok=True, mode=0o700)
|
||||
if not keys:
|
||||
logging.warning("No SSH keys to be copied. Skipping.")
|
||||
return
|
||||
|
||||
with open(authorized_keys_file, 'a') as authorized_keys:
|
||||
for key in keys:
|
||||
pub = f'{key}.pub'
|
||||
if not os.path.exists(pub):
|
||||
logging.debug(f'Skipping key {key}: {pub} not found')
|
||||
continue
|
||||
auth_key_lines = []
|
||||
for key in keys:
|
||||
pub = f'{key}.pub'
|
||||
if not os.path.exists(pub):
|
||||
logging.debug(f'Skipping key {key}: {pub} not found')
|
||||
continue
|
||||
try:
|
||||
with open(pub, 'r') as file:
|
||||
authorized_keys.write(file.read())
|
||||
contents = file.read()
|
||||
if not contents.strip():
|
||||
continue
|
||||
auth_key_lines.append(contents)
|
||||
except Exception as ex:
|
||||
logging.warning(f"Could not read ssh pub key {pub}", exc_info=ex)
|
||||
continue
|
||||
|
||||
if not os.path.exists(ssh_dir):
|
||||
logging.info(f"Creating {ssh_dir_relative!r} dir in chroot {chroot.path!r}")
|
||||
chroot.run_cmd(["mkdir", "-p", "-m", "700", ssh_dir_relative], switch_user=user)
|
||||
logging.info(f"Writing SSH pub keys to {authorized_keys_file}")
|
||||
try:
|
||||
write_file(authorized_keys_file, "\n".join(auth_key_lines), user=str(chroot.get_uid(user)), mode="644")
|
||||
except Exception as ex:
|
||||
logging.error(f"Failed to write SSH authorized_keys_file at {authorized_keys_file!r}:", exc_info=ex)
|
||||
if allow_fail:
|
||||
return
|
||||
raise ex from ex
|
||||
|
|
|
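The copy_ssh_keys() rework switches from composing host paths under a raw root_dir to resolving them through the Chroot abstraction, and from appending to authorized_keys as it goes to collecting all key material first and writing it once via write_file() with explicit ownership. A minimal sketch of the collect step under those assumptions (plain open() stands in for the project's helpers; gather_authorized_keys() is an illustrative name, not project API):

# Sketch of the collect-then-write pattern above: read every existing *.pub,
# skip empty or unreadable ones, and return one blob to write in a single go.
import logging
import os

def gather_authorized_keys(keys: list[str]) -> str:
    lines = []
    for key in keys:
        pub = f'{key}.pub'
        if not os.path.exists(pub):
            continue  # private key without a matching .pub
        try:
            with open(pub) as f:
                contents = f.read()
        except OSError as ex:
            logging.warning(f"Could not read ssh pub key {pub}", exc_info=ex)
            continue
        if contents.strip():
            lines.append(contents)
    return "\n".join(lines)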
packages/build.py

@@ -9,7 +9,7 @@ from copy import deepcopy
 from urllib.error import HTTPError
 from typing import Iterable, Iterator, Optional

-from binfmt import register as binfmt_register, binfmt_is_registered
+from binfmt.binfmt import binfmt_is_registered, binfmt_register
 from constants import CROSSDIRECT_PKGS, QEMU_BINFMT_PKGS, GCC_HOSTSPECS, ARCHES, Arch, CHROOT_PATHS, MAKEPKG_CMD
 from config.state import config
 from exec.cmd import run_cmd, run_root_cmd

@@ -290,7 +290,7 @@ def try_download_package(dest_file_path: str, package: Pkgbuild, arch: Arch) ->
         return None
     repo_pkg: RemotePackage = repo.packages[pkgname]
     if repo_pkg.version != package.version:
-        logging.debug(f"Package {pkgname} versions differ: local: {package.version}, remote: {repo_pkg.version}. Building instead.")
+        logging.debug(f"Package {pkgname} versions differ: local: {package.version}, "
+                      f"remote: {repo_pkg.version}. Building instead.")
         return None
     if repo_pkg.filename != filename:
         versions_str = f"local: {filename}, remote: {repo_pkg.filename}"

@@ -298,6 +299,20 @@ def try_download_package(dest_file_path: str, package: Pkgbuild, arch: Arch) ->
             logging.debug(f"package filenames don't match: {versions_str}")
             return None
         logging.debug(f"ignoring compression extension difference: {versions_str}")
+    cache_file = os.path.join(config.get_path('pacman'), arch, repo_pkg.filename)
+    if os.path.exists(cache_file):
+        if not repo_pkg._desc or 'SHA256SUM' not in repo_pkg._desc:
+            cache_matches = False
+            extra_msg = ". However, we can't validate it, as the https repo doesnt provide a SHA256SUM for it."
+        else:
+            cache_matches = sha256sum(cache_file) == repo_pkg._desc['SHA256SUM']
+            extra_msg = (". However its checksum doesn't match." if not cache_matches else " and its checksum matches.")
+        logging.debug(f"While checking the HTTPS repo DB, we found a matching filename in the pacman cache{extra_msg}")
+        if cache_matches:
+            logging.info(f'copying cache file {cache_file} to repo as verified by remote checksum')
+            shutil.copy(cache_file, dest_file_path)
+            remove_file(cache_file)
+            return dest_file_path
     url = repo_pkg.resolved_url
     assert url
     try:
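The new block in try_download_package() reuses a package from the local pacman cache only when the HTTPS repo publishes a SHA256SUM matching the cached file; in every other case it falls back to downloading. A standalone sketch of that decision (sha256sum() mirrors the helper the module imports from utils; take_from_cache() is an illustrative name, not project API):

# Sketch: accept a pacman-cached package only if its hash matches the
# checksum advertised by the remote repo's package description.
import hashlib
import os
import shutil

def sha256sum(path: str) -> str:
    digest = hashlib.sha256()
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(65536), b''):
            digest.update(chunk)
    return digest.hexdigest()

def take_from_cache(cache_file: str, dest: str, desc: dict) -> bool:
    expected = (desc or {}).get('SHA256SUM')
    if not (os.path.exists(cache_file) and expected):
        return False  # nothing cached, or no way to validate: download instead
    if sha256sum(cache_file) != expected:
        return False  # stale or corrupt cache entry
    shutil.copy(cache_file, dest)
    os.unlink(cache_file)  # the diff uses exec.file.remove_file() here
    return True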
@@ -424,10 +439,11 @@ def setup_build_chroot(
     extra_packages: list[str] = [],
     add_kupfer_repos: bool = True,
     clean_chroot: bool = False,
+    repo: Optional[dict[str, Pkgbuild]] = None,
 ) -> BuildChroot:
     assert config.runtime.arch
     if arch != config.runtime.arch:
-        build_enable_qemu_binfmt(arch)
+        build_enable_qemu_binfmt(arch, repo=repo or discover_pkgbuilds(), lazy=False)
     init_prebuilts(arch)
     chroot = get_build_chroot(arch, add_kupfer_repos=add_kupfer_repos)
     chroot.mount_packages()

@@ -496,6 +512,7 @@ def build_package(
     enable_ccache: bool = True,
     clean_chroot: bool = False,
     build_user: str = 'kupfer',
+    repo: Optional[dict[str, Pkgbuild]] = None,
 ):
     makepkg_compile_opts = ['--holdver']
     makepkg_conf_path = 'etc/makepkg.conf'

@@ -515,6 +532,7 @@ def build_package(
         arch=arch,
         extra_packages=deps,
         clean_chroot=clean_chroot,
+        repo=repo,
     )
     assert config.runtime.arch
     native_chroot = target_chroot

@@ -524,6 +542,7 @@ def build_package(
             arch=config.runtime.arch,
             extra_packages=['base-devel'] + CROSSDIRECT_PKGS,
             clean_chroot=clean_chroot,
+            repo=repo,
         )
     if not package.mode:
         logging.warning(f'Package {package.path} has no _mode set, assuming "host"')

@@ -556,7 +575,7 @@ def build_package(
         build_root = target_chroot
     makepkg_compile_opts += ['--nodeps' if package.nodeps else '--syncdeps']
     env = deepcopy(get_makepkg_env(arch))
-    if foreign_arch and enable_crossdirect and package.name not in CROSSDIRECT_PKGS:
+    if foreign_arch and package.crossdirect and enable_crossdirect and package.name not in CROSSDIRECT_PKGS:
         env['PATH'] = f"/native/usr/lib/crossdirect/{arch}:{env['PATH']}"
         target_chroot.mount_crossdirect(native_chroot)
     else:

@@ -578,7 +597,7 @@ def build_package(
         setup_git_insecure_paths(build_root)
     makepkg_conf_absolute = os.path.join('/', makepkg_conf_path)

-    build_cmd = MAKEPKG_CMD + ['--config', makepkg_conf_absolute, '--skippgpcheck'] + makepkg_compile_opts
+    build_cmd = ['source', '/etc/profile', '&&', *MAKEPKG_CMD, '--config', makepkg_conf_absolute, '--skippgpcheck', *makepkg_compile_opts]
     logging.debug(f'Building: Running {build_cmd}')
     result = build_root.run_cmd(
         build_cmd,

@@ -742,6 +761,7 @@ def build_packages(
             enable_crossdirect=enable_crossdirect,
             enable_ccache=enable_ccache,
             clean_chroot=clean_chroot,
+            repo=repo,
         )
         files += add_package_to_repo(package, arch)
         updated_repos.add(package.repo)
@@ -797,20 +817,39 @@ _qemu_enabled: dict[Arch, bool] = {arch: False for arch in ARCHES}


 def build_enable_qemu_binfmt(arch: Arch, repo: Optional[dict[str, Pkgbuild]] = None, lazy: bool = True, native_chroot: Optional[BuildChroot] = None):
     """
     Build and enable qemu-user-static, binfmt and crossdirect
     Specify lazy=False to force building the packages.
     """
     if arch not in ARCHES:
-        raise Exception(f'Unknown architecture "{arch}". Choices: {", ".join(ARCHES)}')
-    logging.info('Installing qemu-user (building if necessary)')
-    if lazy and _qemu_enabled[arch] and binfmt_is_registered(arch):
-        _qemu_enabled[arch] = True
+        raise Exception(f'Unknown binfmt architecture "{arch}". Choices: {", ".join(ARCHES)}')
+    if _qemu_enabled[arch] or (lazy and binfmt_is_registered(arch)):
+        if not _qemu_enabled[arch]:
+            logging.info(f"qemu binfmt for {arch} was already enabled!")
         return
+    native = config.runtime.arch
+    assert native
+    if arch == native:
+        _qemu_enabled[arch] = True
+        logging.warning("Not enabling binfmt for host architecture!")
+        return
+    logging.info('Installing qemu-user (building if necessary)')
     check_programs_wrap(['pacman', 'makepkg', 'pacstrap'])
     # build qemu-user, binfmt, crossdirect
+    packages = list(CROSSDIRECT_PKGS)
+    hostspec = GCC_HOSTSPECS[arch][arch]
+    cross_gcc = f"{hostspec}-gcc"
+    if repo:
+        for pkg in repo.values():
+            if (pkg.name == cross_gcc or cross_gcc in pkg.provides):
+                if config.runtime.arch not in pkg.arches:
+                    logging.debug(f"Package {pkg.path} matches {cross_gcc=} name but not arch: {pkg.arches=}")
+                    continue
+                packages.append(pkg.path)
+                logging.debug(f"Adding gcc package {pkg.path} to the necessary crosscompilation tools")
+                break
     build_packages_by_paths(
-        CROSSDIRECT_PKGS,
+        packages,
         native,
         repo=repo,
         try_download=True,
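build_enable_qemu_binfmt() now short-circuits when binfmt is already active (trusting the kernel's registration only in lazy mode) and refuses to register qemu for the host architecture. A sketch of the gating logic alone (the binfmt_misc path is an assumption based on qemu-user-static's usual entry naming; ARCHES here is illustrative, the project defines it in constants):

# Sketch: skip setup when already enabled; never register binfmt for the
# native architecture.
import os

ARCHES = ('x86_64', 'aarch64')  # illustration only

_qemu_enabled = {arch: False for arch in ARCHES}

def binfmt_is_registered(arch: str) -> bool:
    # assumption: qemu-user-static registers entries named qemu-<arch>
    return os.path.exists(f'/proc/sys/fs/binfmt_misc/qemu-{arch}')

def should_skip(arch: str, native: str, lazy: bool = True) -> bool:
    return arch == native or _qemu_enabled[arch] or (lazy and binfmt_is_registered(arch))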
packages/cli.py

@@ -158,6 +158,7 @@ def cmd_update(
     discard_changes: bool = False,
 ):
     """Update PKGBUILDs git repo"""
+    enforce_wrap()
     init_pkgbuilds(interactive=not non_interactive, lazy=False, update=True, switch_branch=switch_branch, discard_changes=discard_changes)
     if init_caches:
         init_pkgbuild_caches(clean_src_dirs=clean_src_dirs)

@@ -312,7 +313,7 @@ def cmd_list():
     logging.info(f'Done! {len(packages)} Pkgbuilds:')
     for name in sorted(packages.keys()):
         p = packages[name]
-        print(f'name: {p.name}; ver: {p.version}; provides: {p.provides}; replaces: {p.replaces};'
+        print(f'name: {p.name}; ver: {p.version}; mode: {p.mode}; crossdirect: {p.crossdirect} provides: {p.provides}; replaces: {p.replaces};'
               f'local_depends: {p.local_depends}; depends: {p.depends}')


@@ -345,6 +346,7 @@ def cmd_check(paths):

     mode_key = '_mode'
     nodeps_key = '_nodeps'
+    crossdirect_key = '_crossdirect'
     pkgbase_key = 'pkgbase'
     pkgname_key = 'pkgname'
     arches_key = '_arches'

@@ -355,6 +357,7 @@ def cmd_check(paths):
         required = {
             mode_key: True,
             nodeps_key: False,
+            crossdirect_key: False,
             pkgbase_key: False,
             pkgname_key: True,
             'pkgdesc': False,

@@ -376,6 +379,7 @@ def cmd_check(paths):
             commit_key: is_git_package,
             source_key: False,
             sha256sums_key: False,
+            'noextract': False,
         }
         pkgbuild_path = os.path.join(config.get_path('pkgbuilds'), package.path, 'PKGBUILD')
         with open(pkgbuild_path, 'r') as file:
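cmd_check() drives its validation off a header-to-required mapping, so supporting the new _crossdirect flag is a single dict entry. Roughly (missing_headers() is an illustrative helper, not the project's function):

# Sketch: flag mandatory PKGBUILD variables that never appear in the file.
required = {
    '_mode': True,
    '_nodeps': False,
    '_crossdirect': False,  # newly checked, optional
}

def missing_headers(lines: list[str]) -> list[str]:
    seen = {line.split('=', 1)[0].strip() for line in lines if '=' in line}
    return [key for key, mandatory in required.items() if mandatory and key not in seen]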
packages/pkgbuild.py

@@ -6,7 +6,7 @@ import multiprocessing
 import os

 from joblib import Parallel, delayed
-from typing import Iterable, Optional, TypeAlias
+from typing import Iterable, Optional

 from config.state import config, ConfigStateHolder
 from constants import Arch

@@ -16,6 +16,7 @@ from exec.file import remove_file
 from logger import setup_logging
 from utils import git, git_get_branch
 from wrapper import check_programs_wrap
+from typehelpers import TypeAlias

 from .srcinfo_cache import SrcinfoMetaFile

@@ -155,6 +156,7 @@ class Pkgbuild(PackageInfo):
     repo: str
     mode: str
     nodeps: bool
+    crossdirect: bool
     path: str
     pkgver: str
     pkgrel: str

@@ -189,6 +191,7 @@ class Pkgbuild(PackageInfo):
         self.repo = repo or ''
         self.mode = ''
         self.nodeps = False
+        self.crossdirect = True
         self.path = relative_path
         self.pkgver = ''
         self.pkgrel = ''

@@ -200,8 +203,8 @@ class Pkgbuild(PackageInfo):
         return ','.join([
             'Pkgbuild(' + self.name,
             repr(self.path),
-            self.version + ("🔄" if self.sources_refreshed else ""),
-            self.mode + ')',
+            str(self.version) + ("🔄" if self.sources_refreshed else ""),
+            repr(self.mode) + ')',
         ])

     def names(self) -> list[str]:

@@ -222,6 +225,7 @@ class Pkgbuild(PackageInfo):
         self.repo = pkg.repo
         self.mode = pkg.mode
         self.nodeps = pkg.nodeps
+        self.crossdirect = pkg.crossdirect
         self.path = pkg.path
         self.pkgver = pkg.pkgver
         self.pkgrel = pkg.pkgrel

@@ -309,8 +313,11 @@ class SubPkgbuild(Pkgbuild):
         self.sources_refreshed = False
         self.update(pkgbase)

-        self.provides = {}
-        self.replaces = []
+        # set to None - will be replaced with base_pkg if still None after parsing
+        self.depends = None  # type: ignore[assignment]
+        self.makedepends = None  # type: ignore[assignment]
+        self.provides = None  # type: ignore[assignment]
+        self.replaces = None  # type: ignore[assignment]

     def refresh_sources(self, lazy: bool = True):
         assert self.pkgbase

@@ -353,7 +360,11 @@ def parse_pkgbuild(
     else:
         raise Exception(msg)

+    # if _crossdirect is unset (None), it defaults to True
+    crossdirect_enabled = srcinfo_cache.build_crossdirect in (None, True)
+
     base_package = Pkgbase(relative_pkg_dir, sources_refreshed=sources_refreshed, srcinfo_cache=srcinfo_cache)
+    base_package.crossdirect = crossdirect_enabled
     base_package.mode = mode
     base_package.nodeps = nodeps
     base_package.repo = relative_pkg_dir.split('/')[0]

@@ -382,13 +393,21 @@ def parse_pkgbuild(
         elif line.startswith('arch'):
             current.arches.append(splits[1])
         elif line.startswith('provides'):
+            if not current.provides:
+                current.provides = {}
             current.provides = get_version_specs(splits[1], current.provides)
         elif line.startswith('replaces'):
+            if not current.replaces:
+                current.replaces = []
             current.replaces.append(splits[1])
         elif splits[0] in ['depends', 'makedepends', 'checkdepends', 'optdepends']:
             spec = splits[1].split(': ', 1)[0]
+            if not current.depends:
+                current.depends = (base_package.makedepends or {}).copy()
             current.depends = get_version_specs(spec, current.depends)
             if splits[0] == 'makedepends':
+                if not current.makedepends:
+                    current.makedepends = {}
                 current.makedepends = get_version_specs(spec, current.makedepends)

     results: list[Pkgbuild] = list(base_package.subpackages)

@@ -401,6 +420,15 @@ def parse_pkgbuild(
         pkg.update_version()
         if not (pkg.version == base_package.version):
             raise Exception(f'Subpackage malformed! Versions differ! base: {base_package}, subpackage: {pkg}')
+        if isinstance(pkg, SubPkgbuild):
+            if pkg.depends is None:
+                pkg.depends = base_package.depends
+            if pkg.makedepends is None:
+                pkg.makedepends = base_package.makedepends
+            if pkg.replaces is None:
+                pkg.replaces = base_package.replaces
+            if pkg.provides is None:
+                pkg.provides = base_package.provides
     return results

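The SubPkgbuild changes use None as a "never set by this subpackage" sentinel: the fields start out as None and, once parsing is done, anything still unset falls back to the pkgbase. The final fixup is equivalent to this sketch:

# Sketch of the sentinel fallback applied per subpackage after parsing.
def inherit_from_base(sub, base) -> None:
    for attr in ('depends', 'makedepends', 'provides', 'replaces'):
        if getattr(sub, attr) is None:
            setattr(sub, attr, getattr(base, attr))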
packages/srcinfo_cache.py

@@ -9,14 +9,14 @@ from typing import Any, ClassVar, Optional

 from config.state import config
 from constants import MAKEPKG_CMD, SRCINFO_FILE, SRCINFO_METADATA_FILE, SRCINFO_INITIALISED_FILE
-from dataclass import DataClass
+from dictscheme import DictScheme
 from exec.cmd import run_cmd
 from utils import sha256sum

 SRCINFO_CHECKSUM_FILES = ['PKGBUILD', SRCINFO_FILE]


-class JsonFile(DataClass):
+class JsonFile(DictScheme):

     _filename: ClassVar[str]
     _relative_path: str

@@ -68,11 +68,19 @@ class SrcInitialisedFile(JsonFile):
             raise ex


+srcinfo_meta_defaults = {
+    'build_mode': None,
+    "build_nodeps": None,
+    "build_crossdirect": None,
+}
+
+
 class SrcinfoMetaFile(JsonFile):

     checksums: dict[str, str]
     build_mode: Optional[str]
     build_nodeps: Optional[bool]
+    build_crossdirect: Optional[bool]

     _changed: bool
     _filename: ClassVar[str] = SRCINFO_METADATA_FILE

@@ -92,9 +100,8 @@ class SrcinfoMetaFile(JsonFile):
         s = SrcinfoMetaFile({
             '_relative_path': relative_pkg_dir,
             '_changed': True,
-            'build_mode': '',
-            'build_nodeps': None,
             'checksums': {},
+            **srcinfo_meta_defaults,
         })
         return s, s.refresh_all()

@@ -120,9 +127,11 @@ class SrcinfoMetaFile(JsonFile):
         if not force_refresh:
             logging.debug(f'{metadata._relative_path}: srcinfo checksums match!')
             lines = lines or metadata.read_srcinfo_file()
-            for build_field in ['build_mode', 'build_nodeps']:
+            for build_field in srcinfo_meta_defaults.keys():
                 if build_field not in metadata:
                     metadata.refresh_build_fields()
+                    if write:
                         metadata.write()
+                    break
         else:
             lines = metadata.refresh_all(write=write)

@@ -143,8 +152,7 @@ class SrcinfoMetaFile(JsonFile):
         self._changed = True

     def refresh_build_fields(self):
-        self['build_mode'] = None
-        self['build_nodeps'] = None
+        self.update(srcinfo_meta_defaults)
         with open(os.path.join(config.get_path('pkgbuilds'), self._relative_path, 'PKGBUILD'), 'r') as file:
             lines = file.read().split('\n')
         for line in lines:

@@ -156,6 +164,8 @@ class SrcinfoMetaFile(JsonFile):
                 self.build_mode = val
             elif key == '_nodeps':
                 self.build_nodeps = val.lower() == 'true'
+            elif key == '_crossdirect':
+                self.build_crossdirect = val.lower() == 'true'
             else:
                 continue

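With srcinfo_meta_defaults as the single source of truth for the cached build_* fields, initialisation, refresh_build_fields() and the staleness check all iterate the same dict, so a new _-header in PKGBUILDs needs only one new entry here. The staleness test reduces to:

# Sketch: cached metadata is stale if any known build field is missing.
srcinfo_meta_defaults = {
    'build_mode': None,
    'build_nodeps': None,
    'build_crossdirect': None,
}

def is_stale(metadata: dict) -> bool:
    return any(field not in metadata for field in srcinfo_meta_defaults)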
test_requirements.txt

@@ -1,2 +1,5 @@
+autoflake
+mypy
+yapf
 pytest
 pytest-cov
18 typehelpers.py (new file)
@@ -0,0 +1,18 @@
+from typing import Union
+
+try:
+    from typing import TypeAlias  # type: ignore[attr-defined]
+except ImportError:
+    from typing_extensions import TypeAlias
+
+TypeAlias = TypeAlias
+
+try:
+    from types import UnionType
+except ImportError:
+    UnionType: TypeAlias = Union  # type: ignore[no-redef]
+
+try:
+    from types import NoneType
+except ImportError:
+    NoneType: TypeAlias = type(None)  # type: ignore[no-redef]
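typehelpers.py concentrates the version-dependent typing imports in one place: TypeAlias, UnionType and NoneType resolve to the stdlib names where the interpreter has them, assuming typing_extensions is available as a fallback on older Pythons. Typical consumption then looks like:

# Usage sketch: import the compatibility alias instead of typing directly.
from typehelpers import TypeAlias

Digest: TypeAlias = str  # hypothetical alias for illustration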
50 utils.py
@@ -12,7 +12,7 @@ import tarfile

 from dateutil.parser import parse as parsedate
 from shutil import which
-from typing import Generator, IO, Optional, Union, Sequence
+from typing import Any, Generator, IO, Optional, Union, Sequence

 from exec.cmd import run_cmd, run_root_cmd


@@ -143,7 +143,13 @@ def download_file(path: str, url: str, update: bool = True):
     url_time = None
     if os.path.exists(path) and update:
         headers = requests.head(url).headers
-        if 'last-modified' in headers:
+        file_size = os.path.getsize(path)
+        missing = [i for i in ['Content-Length', 'last-modified'] if i not in headers]
+        if missing:
+            logging.debug(f"Headers not specified: {missing}")
+        if 'Content-Length' in headers and int(headers['Content-Length']) != file_size:
+            logging.debug(f"{path} size differs: local: {file_size}, http: {headers['Content-Length']}")
+        elif 'last-modified' in headers:
             url_time = parsedate(headers['last-modified']).astimezone()
             file_time = datetime.datetime.fromtimestamp(os.path.getmtime(path)).astimezone()
             if url_time == file_time:
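download_file() now checks Content-Length before falling back to Last-Modified, so a size mismatch forces a re-download even when the timestamps agree. A standalone sketch of the freshness decision (requests and python-dateutil as used by the module; local_copy_current() is an illustrative name):

# Sketch: decide via a HEAD request whether the local copy of url is current.
import datetime
import os

import requests
from dateutil.parser import parse as parsedate

def local_copy_current(path: str, url: str) -> bool:
    headers = requests.head(url).headers
    if 'Content-Length' in headers and int(headers['Content-Length']) != os.path.getsize(path):
        return False  # size differs: definitely stale
    if 'last-modified' in headers:
        url_time = parsedate(headers['last-modified']).astimezone()
        file_time = datetime.datetime.fromtimestamp(os.path.getmtime(path)).astimezone()
        return url_time == file_time
    return False  # no usable headers: treat as stale, like the code above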
@@ -195,3 +201,43 @@ def color_str(s: str, use_colors: Optional[bool] = None, **kwargs) -> str:
     if colors_supported(use_colors):
         return click.style(s, **kwargs)
     return s
+
+
+def color_green(s: str, **kwargs):
+    return color_str(s, fg="bright_green", **kwargs)
+
+
+def color_bold(s: str, **kwargs):
+    return color_str(s, bold=True, **kwargs)
+
+
+def color_mark_selected(
+    item: str,
+    profile_name: str,
+    inherited_from: Optional[str] = None,
+    msg_fmt: str = 'Currently selected by profile "%s"%s',
+    msg_item_colors: dict[str, Any] = dict(bold=True, fg="bright_green"),
+    marker: str = '>>> ',
+    marker_config: dict[str, Any] = dict(bold=True, fg="bright_green"),
+    split_on: str = '\n',
+    suffix: str = '\n\n',
+    use_colors: Optional[bool] = None,
+) -> str:
+
+    def bold(s: str, _bold=True, **kwargs):
+        return color_bold(s, use_colors=use_colors, **kwargs)
+
+    def green(s: str, **kwargs):
+        return color_green(s, use_colors=use_colors, **kwargs)
+
+    marker_full = color_str(marker, use_colors=use_colors, **marker_config)
+
+    msg_items = [color_str(profile_name, use_colors=use_colors, **msg_item_colors), '']
+    if inherited_from and inherited_from != profile_name:
+        msg_items[1] = ''.join([
+            bold(' (inherited from profile "'),
+            green(inherited_from, bold=True),
+            bold('")'),
+        ])
+    output = f'{item}{suffix}{msg_fmt % tuple(msg_items)}'
+    return '\n'.join([(marker_full + o) for o in output.split(split_on)])
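color_mark_selected() prefixes every line of an item with a highlighted marker and appends a 'Currently selected by profile …' note, optionally crediting the profile the selection was inherited from. A hypothetical call (the profile names are invented):

# Hypothetical usage: mark a flavour entry as selected via inheritance.
print(color_mark_selected('phosh', 'my-device-profile', inherited_from='default'))
# Every output line is prefixed with '>>> ' and the note names both profiles.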
wrapper/__init__.py

@@ -14,7 +14,7 @@ wrapper_impls: dict[str, Wrapper] = {
 }


-def get_wrapper_type(wrapper_type: Optional[str] = None):
+def get_wrapper_type(wrapper_type: Optional[str] = None) -> str:
     return wrapper_type or config.file.wrapper.type


@@ -28,14 +28,19 @@ def wrap(wrapper_type: Optional[str] = None):
     get_wrapper_impl(wrapper_type).wrap()


-def is_wrapped(wrapper_type: Optional[str] = None):
+def is_wrapped(wrapper_type: Optional[str] = None) -> bool:
     wrapper_type = get_wrapper_type(wrapper_type)
     return wrapper_type != 'none' and get_wrapper_impl(wrapper_type).is_wrapped()


+def needs_wrap(wrapper_type: Optional[str] = None) -> bool:
+    wrapper_type = wrapper_type or get_wrapper_type()
+    return wrapper_type != 'none' and not is_wrapped(wrapper_type) and not config.runtime.no_wrap
+
+
 def enforce_wrap(no_wrapper=False):
     wrapper_type = get_wrapper_type()
-    if wrapper_type != 'none' and not is_wrapped(wrapper_type) and not config.runtime.no_wrap and not no_wrapper:
+    if needs_wrap(wrapper_type) and not no_wrapper:
         logging.info(f'Wrapping in {wrapper_type}')
         wrap()


@@ -51,6 +56,26 @@ def wrap_if_foreign_arch(arch: Arch):
     enforce_wrap()


+def execute_without_exit(f, argv_override: Optional[list[str]], *args, **kwargs):
+    """If no wrap is needed, executes and returns f(*args, **kwargs).
+    If a wrap is determined to be necessary, force a wrap with argv_override applied.
+    If a wrap was forced, None is returned.
+    WARNING: No protection against f() returning None is taken."""
+    if not needs_wrap():
+        return f(*args, **kwargs)
+    assert get_wrapper_type() != 'none', "needs_wrap() should've returned False"
+    w = get_wrapper_impl()
+    w_cmd = w.argv_override
+    # we need to avoid throwing and catching SystemExit due to FDs getting closed otherwise
+    w_should_exit = w.should_exit
+    w.argv_override = argv_override
+    w.should_exit = False
+    w.wrap()
+    w.argv_override = w_cmd
+    w.should_exit = w_should_exit
+    return None
+
+
 nowrapper_option = click.option(
     '-w/-W',
     '--force-wrapper/--no-wrapper',
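execute_without_exit() temporarily disables should_exit and swaps in argv_override, so callers can trigger a wrap and regain control instead of the process exiting. A hedged usage sketch (compute_answer and the argv are invented for illustration):

# Hypothetical call site: run wrap-requiring work, continuing in the current
# process when no wrap is necessary.
def compute_answer() -> int:  # stand-in for a function that needs the wrapper
    return 42

answer = execute_without_exit(compute_answer, ['cache', 'number'])
if answer is None:
    pass  # a wrap was forced; the wrapped process handled ['cache', 'number']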
wrapper/docker.py

@@ -7,7 +7,7 @@ import sys
 from config.state import config
 from exec.file import makedir

-from .wrapper import BaseWrapper, WRAPPER_PATHS
+from .wrapper import Wrapper, WRAPPER_PATHS

 DOCKER_PATHS = WRAPPER_PATHS.copy()

@@ -19,10 +19,11 @@ def docker_volumes_args(volume_mappings: dict[str, str]) -> list[str]:
     return result


-class DockerWrapper(BaseWrapper):
+class DockerWrapper(Wrapper):
     type: str = 'docker'

     def wrap(self):
+        super().wrap()
         script_path = config.runtime.script_source_dir
         assert script_path
         with open(os.path.join(script_path, 'version.txt')) as version_file:

@@ -38,9 +39,15 @@ class DockerWrapper(BaseWrapper):
                 tag,
             ] + (['-q'] if not config.runtime.verbose else [])
             logging.debug('Running docker cmd: ' + ' '.join(cmd))
-            result = subprocess.run(cmd, cwd=script_path, capture_output=True)
+            mute_docker = not config.runtime.verbose
+            result = subprocess.run(
+                cmd,
+                cwd=script_path,
+                capture_output=mute_docker,
+            )
             if result.returncode != 0:
-                logging.fatal('Failed to build docker image:\n' + result.stderr.decode())
+                error_msg = ('\n' + result.stderr.decode() + '\n') if mute_docker else ''
+                logging.fatal(f'Docker error: {error_msg}Failed to build docker image: see errors above: ^^^^')
                 exit(1)
             else:
                 # Check if the image for the version already exists

@@ -86,15 +93,21 @@ class DockerWrapper(BaseWrapper):
             '--privileged',
         ] + docker_volumes_args(volumes) + [tag]

-        kupfer_cmd = ['kupferbootstrap', '--config', volumes[wrapped_config]] + self.filter_args_wrapper(sys.argv[1:])
+        kupfer_cmd = [
+            'kupferbootstrap',
+            '--config',
+            volumes[wrapped_config],
+        ]
+        kupfer_cmd += self.argv_override or self.filter_args_wrapper(sys.argv[1:])
         if config.runtime.uid:
             kupfer_cmd = ['wrapper_su_helper', '--uid', str(config.runtime.uid), '--username', 'kupfer', '--'] + kupfer_cmd

         cmd = docker_cmd + kupfer_cmd
         logging.debug('Wrapping in docker:' + repr(cmd))
         result = subprocess.run(cmd)

-        exit(result.returncode)
+        if self.should_exit:
+            exit(result.returncode)
+        return result.returncode

     def stop(self):
         subprocess.run(
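DockerWrapper.wrap() now captures the docker build output only when not running verbose: under --verbose the build logs stream live, otherwise the captured stderr is replayed on failure. The error path in isolation (build_image() is an illustrative stand-in, not project API):

# Sketch of the quiet-build error handling: stderr is only available when it
# was captured, so the replay happens solely in that case.
import subprocess

def build_image(cmd: list[str], cwd: str, verbose: bool) -> None:
    mute = not verbose
    result = subprocess.run(cmd, cwd=cwd, capture_output=mute)
    if result.returncode != 0:
        extra = ('\n' + result.stderr.decode() + '\n') if mute else ''
        raise SystemExit(f'Docker error: {extra}Failed to build docker image')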
wrapper/wrapper.py

@@ -15,7 +15,7 @@ WRAPPER_PATHS = CHROOT_PATHS | {
 }


-class Wrapper(Protocol):
+class WrapperProtocol(Protocol):
     """Wrappers wrap kupferbootstrap in some form of isolation from the host OS, i.e. docker or chroots"""

     def wrap(self):

@@ -31,15 +31,22 @@ class Wrapper(Protocol):
     """


-class BaseWrapper(Wrapper):
+class Wrapper(WrapperProtocol):
     uuid: str
     identifier: str
     type: str
-    wrapped_config_path: str
+    wrapped_config_path: Optional[str]
+    argv_override: Optional[list[str]]
+    should_exit: bool
+    atexit_registered: bool

     def __init__(self, random_id: Optional[str] = None, name: Optional[str] = None):
         self.uuid = str(random_id or uuid.uuid4())
         self.identifier = name or f'kupferbootstrap-{self.uuid}'
+        self.argv_override = None
+        self.should_exit = True
+        self.atexit_registered = False
+        self.wrapped_config_path = None

     def filter_args_wrapper(self, args):
         """filter out -c/--config since it doesn't apply in wrapper"""

@@ -73,13 +80,6 @@ class BaseWrapper(Wrapper):
     ) -> str:
         wrapped_config = f'{target_path.rstrip("/")}/{self.identifier}_wrapped.toml'

-        # FIXME: these at_exit hooks should go and be called from somewhere better suited
-        def at_exit():
-            self.stop()
-            os.remove(wrapped_config)
-
-        atexit.register(at_exit)
-
         dump_config_file(
             file_path=wrapped_config,
             config=(config.file | {

@@ -89,8 +89,16 @@ class BaseWrapper(Wrapper):
         self.wrapped_config_path = wrapped_config
         return wrapped_config

+    def at_exit(self):
+        if self.wrapped_config_path:
+            os.remove(self.wrapped_config_path)
+        self.stop()
+        self.atexit_registered = False
+
     def wrap(self):
-        raise NotImplementedError()
+        if not self.atexit_registered:
+            atexit.register(self.at_exit)
+            self.atexit_registered = True

     def stop(self):
         raise NotImplementedError()
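The at_exit() cleanup moved from an ad-hoc closure inside config generation to a method that wrap() registers exactly once, guarded by the atexit_registered flag. The register-once pattern in isolation (CleanupOnce is illustrative only):

# Sketch: wrap() may be called repeatedly, but the atexit hook is installed
# a single time; the hook resets the flag after running.
import atexit

class CleanupOnce:
    def __init__(self) -> None:
        self.atexit_registered = False

    def wrap(self) -> None:
        if not self.atexit_registered:
            atexit.register(self.at_exit)
            self.atexit_registered = True

    def at_exit(self) -> None:
        self.atexit_registered = False  # the real method also stops the wrapper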
wrapper_su_helper.py

@@ -21,7 +21,7 @@ def kupferbootstrap_su(cmd: list[str], uid: int = 1000, username: str = 'kupfer'
     user = pwd.getpwnam(username)
     home = user.pw_dir
     if uid != user.pw_uid:
-        run_cmd(['usermod', '-u', str(uid), username]).check_returncode()  # type: ignore[union-attr]
+        run_cmd(['usermod', '-o', '-u', str(uid), username]).check_returncode()  # type: ignore[union-attr]
         chown(home, username, recursive=False)
     logging.debug(f'wrapper_su_helper: running {cmd} as {repr(username)}')
     env_inject = ['env', f'{WRAPPER_ENV_VAR}={os.environ[WRAPPER_ENV_VAR]}'] if WRAPPER_ENV_VAR in os.environ else []