Compare commits


15 commits

Author  SHA1  Message  Date
Syboxez Blank  e783ec6632  image/image.py: Fix off-by-one error when creating boot partition in full image  2023-12-14 21:03:14 -06:00
InsanePrawn  4c5609423e  constants: change default luks label to kupfer_cryptroot  2023-09-13 18:14:41 +02:00
InsanePrawn  4792eafe80  image: bump /boot default size to 200MB  2023-09-13 18:14:41 +02:00
InsanePrawn  d2942945e4  image: pass sector_size to partition_device()  2023-09-13 18:14:41 +02:00
InsanePrawn  fbe43456f8  image: don't pass block size to mkfs.ext*  2023-09-13 18:14:41 +02:00
InsanePrawn  03c95dcb6a  image/image: rename losetup_rootfs_image() to losetup_setup_image()  2023-09-13 18:14:41 +02:00
InsanePrawn  095ecb672f  image: use IMG_FILE_BOOT_DEFAULT_SIZE to calculate shrunk boot partition size  2023-09-13 18:14:41 +02:00
InsanePrawn  dd4a4212a3  config/cli: init PKGBUILDs after main config is complete  2023-09-13 18:14:41 +02:00
InsanePrawn  a9cd8178c8  image: add LUKS support and --[no-]encryption CLI flag to build & inspect subcommands  2023-09-13 18:14:41 +02:00
InsanePrawn  6de8137c90  image: add new module: cryptsetup  2023-09-13 18:14:41 +02:00
InsanePrawn  370510e09f  config/scheme: add Profile.encryption and Profile.encryption_password  2023-09-13 18:14:41 +02:00
InsanePrawn  d6900172fe  exec/cmd: run_cmd(): add new params: stdin, stdin_input, check asddsadsa  2023-09-13 18:14:30 +02:00
InsanePrawn  1e446e6f80  config/state: remove superflous attrs from Profile, as they already exist in SparseProfile  2023-09-13 18:14:30 +02:00
InsanePrawn  829d80ede0  chroot/abstract: add switch_user param to chroot.run_cmd()'s signature  2023-09-13 18:14:30 +02:00
InsanePrawn  8437613e6e  image/image: move CLI methods to image/cli.py  2023-09-13 18:14:30 +02:00
28 changed files with 691 additions and 557 deletions


@@ -60,8 +60,12 @@ class AbstractChroot(Protocol):
         capture_output: bool,
         cwd: str,
         fail_inactive: bool,
+        switch_user: Optional[str],
         stdout: Optional[FileDescriptor],
         stderr: Optional[FileDescriptor],
+        stdin: Optional[FileDescriptor],
+        stdin_input: Optional[str],
+        check: Optional[bool],
     ):
         pass
@@ -225,9 +229,12 @@ class Chroot(AbstractChroot):
         capture_output: bool = False,
         cwd: Optional[str] = None,
         fail_inactive: bool = True,
+        switch_user: Optional[str] = None,
         stdout: Optional[FileDescriptor] = None,
         stderr: Optional[FileDescriptor] = None,
-        switch_user: Optional[str] = None,
+        stdin: Optional[FileDescriptor] = None,
+        stdin_input: Optional[str] = None,
+        check: Optional[bool] = None,
     ) -> Union[int, subprocess.CompletedProcess]:
         if not self.active and fail_inactive:
             raise Exception(f'Chroot {self.name} is inactive, not running command! Hint: pass `fail_inactive=False`')
@@ -244,13 +251,23 @@ class Chroot(AbstractChroot):
         script = flatten_shell_script(script, shell_quote_items=False, wrap_in_shell_quote=False)
         if cwd:
             script = f"cd {shell_quote(cwd)} && ( {script} )"
-        if switch_user:
+        if switch_user and switch_user != 'root':
            inner_cmd = generate_cmd_su(script, switch_user=switch_user, elevation_method='none', force_su=True)
         else:
            inner_cmd = wrap_in_bash(script, flatten_result=False)
         cmd = flatten_shell_script(['chroot', self.path] + env_cmd + inner_cmd, shell_quote_items=True)
-        return run_root_cmd(cmd, env=outer_env, attach_tty=attach_tty, capture_output=capture_output, stdout=stdout, stderr=stderr)
+        return run_root_cmd(
+            cmd,
+            env=outer_env,
+            attach_tty=attach_tty,
+            capture_output=capture_output,
+            stdout=stdout,
+            stderr=stderr,
+            stdin=stdin,
+            stdin_input=stdin_input,
+            check=check,
+        )

     def mount_pkgbuilds(self, fail_if_mounted: bool = False) -> str:
         return self.mount(
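As a rough illustration (not part of the diff) of what the extended `Chroot.run_cmd()` signature enables: the snippet below assumes an already-active chroot object, e.g. obtained via `get_device_chroot()`, and the command, file path and content are made up.

```python
# Hypothetical call; `chroot` stands for an active Chroot instance.
# The new parameters mirror the exec/cmd run_cmd() additions further below:
# `stdin_input` is fed to the command's stdin, `check` follows subprocess's
# check= semantics, and switch_user='root' now skips the extra `su` wrapper.
result = chroot.run_cmd(
    ['tee', '/etc/hostname'],
    switch_user='root',
    stdin_input='kupfer-phone\n',   # written to tee's stdin
    capture_output=True,
    check=True,                     # raise instead of returning a failing result
)
```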


@@ -82,7 +82,6 @@ class BuildChroot(Chroot):
         native_chroot.mount_pacman_cache()
         native_chroot.mount_packages()
         native_chroot.activate()
-        logging.debug(f"Installing {CROSSDIRECT_PKGS=} + {gcc=}")
         results = dict(native_chroot.try_install_packages(
             CROSSDIRECT_PKGS + [gcc],
             refresh=True,
@@ -104,8 +103,8 @@ class BuildChroot(Chroot):
         target_include_dir = os.path.join(self.path, 'include')
         for target, source in {cc_path: gcc, target_lib_dir: 'lib', target_include_dir: 'usr/include'}.items():
-            if not (os.path.exists(target) or os.path.islink(target)):
-                logging.debug(f'Symlinking {source=} at {target=}')
+            if not os.path.exists(target):
+                logging.debug(f'Symlinking {source} at {target}')
                 symlink(source, target)
         ld_so = os.path.basename(glob(f"{os.path.join(native_chroot.path, 'usr', 'lib', 'ld-linux-')}*")[0])
         ld_so_target = os.path.join(target_lib_dir, ld_so)


@@ -7,6 +7,7 @@ from typing import Optional
 from config.state import config
 from wrapper import enforce_wrap
 from devices.device import get_profile_device
+from image.cli import cmd_inspect

 from .abstract import Chroot
 from .base import get_base_chroot
@@ -30,7 +31,6 @@ def cmd_chroot(ctx: click.Context, type: str = 'build', name: Optional[str] = No
         raise Exception(f'Unknown chroot type: "{type}"')

     if type == 'rootfs':
-        from image.image import cmd_inspect
         assert isinstance(cmd_inspect, click.Command)
         ctx.invoke(cmd_inspect, profile=name, shell=True)
         return


@@ -1,5 +1,6 @@
 import click
 import logging
+import os

 from copy import deepcopy
 from typing import Any, Callable, Iterable, Mapping, Optional, Union
@@ -260,6 +261,7 @@ def cmd_config_init(
 ):
     """Initialize the config file"""
     if not non_interactive:
+        from packages.cli import cmd_init as cmd_init_pkgbuilds
         logging.info(CONFIG_MSG)
         results: dict[str, dict] = {}
         for section in sections:
@@ -282,6 +284,13 @@ def cmd_config_init(
             config.write()
         else:
             return
+    if not non_interactive and not os.path.exists(os.path.join(config.get_path('pkgbuilds'), '.git')):
+        extra_msg = " This way, we can give you a list of devices and flavours later" if 'profiles' in sections else ''
+        if click.confirm(
+                f"It seems you don't have our PKGBUILDs checked out yet.\nWould you like KBS to fetch them?{extra_msg}",
+                default=True,
+        ):
+            execute_without_exit(click.Context(cmd_config).invoke, ['packages', 'init'], cmd_init_pkgbuilds)
     if 'profiles' in sections:
         print("Configuring profiles")
         current_profile = 'default' if 'current' not in config.file.profiles else config.file.profiles.current


@@ -15,6 +15,8 @@ PROFILE_DEFAULTS_DICT = {
     'username': 'kupfer',
     'password': None,
     'size_extra_mb': "0",
+    'encryption': None,
+    'encryption_password': None,
 }

 PROFILE_DEFAULTS = Profile.fromDict(PROFILE_DEFAULTS_DICT)


@@ -17,20 +17,20 @@ class SparseProfile(DictScheme):
     username: Optional[str]
     password: Optional[str]
     size_extra_mb: Optional[Union[str, int]]
+    encryption: Optional[bool]
+    encryption_password: Optional[str]

     def __repr__(self):
         return f'{type(self)}{dict.__repr__(self.toDict())}'


 class Profile(SparseProfile):
-    parent: Optional[str]
     device: str
     flavour: str
     pkgs_include: list[str]
     pkgs_exclude: list[str]
     hostname: str
     username: str
-    password: Optional[str]
     size_extra_mb: Union[str, int]


@@ -89,7 +89,7 @@ COMPILE_ARCHES: dict[Arch, str] = {
 GCC_HOSTSPECS: dict[DistroArch, dict[TargetArch, str]] = {
     'x86_64': {
         'x86_64': 'x86_64-pc-linux-gnu',
-        'aarch64': 'aarch64-unknown-linux-gnu',
+        'aarch64': 'aarch64-linux-gnu',
         'armv7h': 'arm-unknown-linux-gnueabihf'
     },
     'aarch64': {
@@ -172,3 +172,6 @@ SRCINFO_TARBALL_URL = f'{KUPFER_HTTPS_BASE}/{SRCINFO_TARBALL_FILE}'
 FLAVOUR_INFO_FILE = 'flavourinfo.json'
 FLAVOUR_DESCRIPTION_PREFIX = 'kupfer flavour:'
+
+LUKS_LABEL_DEFAULT = 'kupfer_cryptroot'
+LUKS_MAPPER_DEFAULT = 'kupfer-crypt'


@@ -52,7 +52,7 @@ class DictScheme(Munch):
     _sparse: ClassVar[bool] = False

     def __init__(self, d: Mapping = {}, validate: bool = True, **kwargs):
-        self.update(dict(d) | kwargs, validate=validate)
+        self.update(d | kwargs, validate=validate)

     @classmethod
     def transform(
@@ -269,13 +269,10 @@ class DictScheme(Munch):
     ) -> str:
         import yaml
         yaml_args = {'sort_keys': False} | yaml_args
-        dumped = yaml.dump(
+        return yaml.dump(
             self.toDict(strip_hidden=strip_hidden, sparse=sparse),
             **yaml_args,
         )
-        if dumped is None:
-            raise Exception(f"Failed to yaml-serialse {self}")
-        return dumped

     def toToml(
         self,

@@ -2,14 +2,10 @@
 Kupferbootstrap uses [toml](https://en.wikipedia.org/wiki/TOML) for its configuration file.

-The file can either be edited manually or managed via the [`kupferbootstrap config`](../../cli/config) subcommand.
+The file can either be edited manually or managed via the {doc}`cli/config` subcommand.

-```{hint}
 You can quickly generate a default config by running {code}`kupferbootstrap config init -N`.
-For an interactive dialogue, omit the `-N`.
-```

 ## File Location

 The configuration is stored in `~/.config/kupfer/kupferbootstrap.toml`, where `~` is your user's home folder.
@@ -58,7 +54,7 @@ This allows you to easily keep a number of slight variations of the same target
 without the need to constantly modify your Kupferbootstrap configuration file.

 You can easily create new profiles with
-[kupferbootstrap config profile init](../../cli/config/#kupferbootstrap-config-profile-init).
+[kupferbootstrap config profile init](../cli/config/#kupferbootstrap-config-profile-init).

 Here's an example:
@@ -101,7 +97,7 @@ hostname = "pocof1"
 The `current` key in the `profiles` section controlls which profile gets used by Kupferbootstrap by default.

 The first subsection (`profiles.default`) describes the `default` profile
-which gets created by [`kupferbootstrap config init`](../../cli/config/#kupferbootstrap-config-init).
+which gets created by [config init](../cli/config/#kupferbootstrap-config-init).
 Next, we have a `graphical` profile that defines a couple of graphical programs for all but the `recovery` profile,
 since that doesn't have a GUI.


@@ -6,6 +6,7 @@ a tool to build and flash packages and images for the [Kupfer](https://gitlab.co
 ## Documentation pages

 ```{toctree}
-usage/index
+install
+config
 cli
 ```


@@ -1,39 +0,0 @@
# FAQ
```{contents} Table of Contents
:class: this-will-duplicate-information-and-it-is-still-useful-here
:depth: 3
```
## Which devices are currently supported?
Currently very few!
See [the `devices` repo](https://gitlab.com/kupfer/packages/pkgbuilds/-/tree/dev/device). We use the same codenames as [postmarketOS](https://wiki.postmarketos.org/wiki/Devices) (although we prefix them with the SoC)
## How to port a new device or package?
See [Porting](../porting)
## How to build a specific package
See also: The full [`kupferbootstrap packages build` docs](../../cli/packages#kupferbootstrap-packages-build)
### Example
For rebuilding `kupfer-config` and `crossdirect`, defaulting to your device's architecture
```sh
kupferbootstrap packages build [--force] [--arch $target_arch] kupfer-config crossdirect
```
### By package path
You can also use the a path snippet (`$repo/$pkgbase`) to the PKGBUILD folder as seen inside your pkgbuilds.git:
```sh
kupferbootstrap packages build [--force] main/kupfer-config cross/crossdirect
```


@@ -1,9 +0,0 @@
# Usage
```{toctree}
quickstart
faq
install
config
porting
```


@@ -1,94 +0,0 @@
# Porting
## Porting devices
### Homework
Before you can get started porting a device, you'll need to do some research:
1. Familiarize yourself with git basics.
1. Familiarize yourself with Arch Linux packaging, i.e. `PKGBUILD`s and `makepkg`
1. Familiarize yourself with the postmarketOS port of the device.
```{warning}
If there is no postmarketOS port yet, you'll probably need to get deep into kernel development.
We suggest [starting with a port to pmOS](https://wiki.postmarketos.org/wiki/Porting_to_a_new_device) then, especially if you're not familiar with the process already.
```
### Porting
1. Navigate to your pkgbuilds checkout
1. Follow the [general package porting guidelines](#porting-packages) to create a device-, kernel- and probably also a firmware-package for the device and SoC. Usually this roughly means porting the postmarketOS APKBUILDs to our PKGBUILD scheme.
You can get inspiration by comparing existing Kupfer ports (e.g. one of the SDM845 devices) to the [postmarketOS packages](https://gitlab.com/postmarketOS/pmaports/-/tree/master/device) for that device.
Usually you should start out by copying and then customizing the Kupfer packages for a device that's as similar to yours as possible, i.e. uses the same or a related SoC, if something like that is already available in Kupfer.
```{hint} Package Repos:
Device packages belong into `device/`, kernels into `linux/` and firmware into `firmware/`.
```
1. When submitting your MR, please include some information:
- what you have found to be working, broken, and not tested (and why)
- any necessary instructions for testing
- whether you'd be willing to maintain the device long-term (test kernel upgrades, submit device package updates, etc.)
### Gotchas
Please be aware of these gotchas:
- As of now, Kupfer only really supports platforms using Android's `aboot` bootloader, i.e. ex-Android phones. In order to support other boot modes (e.g. uboot on the Librem5 and Pine devices), we'll need to port and switch to postmarketOS's [boot-deploy](https://gitlab.com/postmarketOS/boot-deploy) first and add support for EFI setups to Kupferbootstrap.
## Porting packages
### Homework
Before you can get started, you'll need to do some research:
1. Familiarize yourself with git basics.
1. Familiarize yourself with Arch Linux packaging, i.e. `PKGBUILD`s and `makepkg`
### Development
```{warning}
Throughout the process, use git to version your changes.
- Don't procrastinate using git or committing until you're "done" or "have got something working", you'll regret it.
- Don't worry about a "clean" git history while you're developing; we can squash it up later.
- \[Force-]Push your changes regularly, just like committing. Don't wait for perfection.
```
1. Create a new git branch for your package locally.
```{hint}
It might be a good ideaa to get into the habit of prefixing branch names with \[a part of] your username and a slash like so:
`myNickname/myFeatureNme`
This makes it easier to work in the same remote repo with multiple people.
```
1.
```{note}
The pkgbuilds git repo contains multiple package repositories, represented by folders at the top level (`main`, `cross`, `phosh`, etc.).
```
Try to choose a sensible package repo for your new packages and create new folders for each `pkgbase` inside the repo folder.
1. Navigate into the folder of the new package and create a new `PKGBUILD`; fill it with life!
1. **`_mode`**: Add the build mode at the top of the PKGBUILD.
```{hint}
If you're unsure what to pick, go with `_mode=host`. It'll use `crossdirect` to get speeds close to proper cross-compiling.
```
This determines whether it's built using a foreign-arch chroot (`_mode=host`) executed with qemu-user, or using real cross-compilation (`_mode=cross`) from a host-architecture chroot, but the package's build tooling has to specifically support the latter, so it's mostly useful for kernels and uncompiled packages.
1. **`_nodeps`**: (Optional) If your package doesn't require its listed dependencies to build
(usually because you're packaging a meta-package or only configs or scripts)
you can add `_nodeps=true` as the next line after the `_mode=` line to speed up packaging.
`makedeps` are still installed anyway.
1. Test building it with `kupferbootstrap packages build $pkgbname`
1. For any files and git repos downloaded by your PKGBUILD,
add them to a new `.gitignore` file in the same directory as your `PKGBUILD`.
```{hint}
Don't forget to `git add` the new `.gitignore` file!
```
1. Run `kupferbootstrap packages check` to make sure the formatting for your PKGBUILDs is okay.
```{warning}
This is **not** optional. MRs with failing CI will **not** be merged.
```
### Pushing
1. Fork the Kupfer pkgbuilds repo on Gitlab using the Fork button
1. Add your fork's **SSH** URI to your local git repo as a **new remote**: `git remote add fork git@gitlab...`
1. `git push -u fork $branchname` it
### Submitting the MR
When you're ready, open a Merge Request on the Kupfer pkgbuilds repo.
```{hint}
Prefix the MR title with `Draft: ` to indicate a Work In Progress state.
```


@@ -1,9 +0,0 @@
# Quickstart
1. [Install](../install) Kupferbootstrap
1. [Configure](../config) it: `kuperbootstrap config init`
1. [Update your PKGBUILDs + SRCINFO cache](../../cli/packages#kupferbootstrap-packages-update): `kupferbootstrap packages update`
1. [Build an image](../../cli/image#kupferbootstrap-image-build): `kupferbootstrap image build`
1. [Flash the image](../../cli/image#kupferbootstrap-image-flash): `kupferbootstrap image flash abootimg && kupferbootstrap image flash full userdata`
See also: [Frequently Asked Questions](../faq)


@@ -97,6 +97,9 @@ def run_cmd(
     elevation_method: Optional[ElevationMethod] = None,
     stdout: Optional[FileDescriptor] = None,
     stderr: Optional[FileDescriptor] = None,
+    stdin: Optional[FileDescriptor] = None,
+    stdin_input: Optional[str] = None,
+    check: Optional[bool] = None,
 ) -> Union[CompletedProcess, int]:
     "execute `script` as `switch_user`, elevating and su'ing as necessary"
     kwargs: dict = {}
@@ -111,6 +114,10 @@ def run_cmd(
     for name, fd in {'stdout': stdout, 'stderr': stderr}.items():
         if fd is not None:
             kwargs[name] = fd
+    for name, value in {'stdin': stdin, 'input': stdin_input, 'check': check}.items():
+        if value is not None:
+            kwargs[name] = value
+
     script = flatten_shell_script(script)
     if cwd:
         kwargs['cwd'] = cwd
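For reference, the three new keyword arguments are simply collected into the kwargs that reach the underlying subprocess call, so they behave roughly like their `subprocess.run()` counterparts. The snippet below is an illustrative equivalence only (device path is made up), not code from the diff:

```python
import subprocess

# Rough equivalence of the new run_cmd() parameters:
#   stdin       -> stdin=...   an already-open file descriptor
#   stdin_input -> input=...   data written to the child's stdin
#   check       -> check=True  raise CalledProcessError on a non-zero exit code
subprocess.run(
    ['blkid', '--match-token', 'TYPE=crypto_LUKS', '/dev/loop0p2'],
    input=None,
    check=False,
    capture_output=True,
)
```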


@@ -144,13 +144,7 @@ def remove_file(path: str, recursive=False):
         raise Exception(f"Unable to remove {path}: cmd returned {rc}")


-def makedir(
-    path,
-    user: Optional[Union[str, int]] = None,
-    group: Optional[Union[str, int]] = None,
-    parents: bool = True,
-    mode: Optional[Union[int, str]] = None,
-):
+def makedir(path, user: Optional[Union[str, int]] = None, group: Optional[Union[str, int]] = None, parents: bool = True):
     if not root_check_exists(path):
         try:
             if parents:
@@ -159,8 +153,6 @@ def makedir(
                 os.mkdir(path)
         except:
             run_root_cmd(['mkdir'] + (['-p'] if parents else []) + [path])
-    if mode is not None:
-        chmod(path, mode=mode)
     chown(path, user, group)
@@ -179,9 +171,9 @@ def symlink(source, target):
         raise Exception(f'Symlink creation of {target} pointing at {source} failed')


-def get_temp_dir(register_cleanup=True, mode: int = 0o0755):
+def get_temp_dir(register_cleanup=True, mode: int = 0o0755, prefix='kupfertmp_'):
     "create a new tempdir and sanitize ownership so root can access user files as god intended"
-    t = mkdtemp()
+    t = mkdtemp(prefix=prefix)
     chmod(t, mode, privileged=False)
     if register_cleanup:
         atexit.register(remove_file, t, recursive=True)
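A small hedged usage sketch of the extended `get_temp_dir()`; the custom prefix is a made-up example:

```python
from exec.file import get_temp_dir

# The default call now yields something like /tmp/kupfertmp_abc123 (suffix chosen by mkdtemp()).
workdir = get_temp_dir()                          # registered for cleanup at exit by default
scratch = get_temp_dir(prefix='kupfer_flash_')    # hypothetical custom prefix
```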


@@ -13,7 +13,7 @@ from flavours.cli import profile_option
 from wrapper import enforce_wrap

 from .fastboot import fastboot_boot, fastboot_erase
-from .image import get_device_name, losetup_rootfs_image, get_image_path, dump_aboot, dump_lk2nd
+from .image import get_device_name, losetup_setup_image, get_image_path, dump_aboot, dump_lk2nd

 LK2ND = FLASH_PARTS['LK2ND']
 ABOOT = FLASH_PARTS['ABOOT']
@@ -61,7 +61,7 @@ def cmd_boot(
         if not os.path.exists(path):
             urllib.request.urlretrieve(f'https://github.com/dreemurrs-embedded/Jumpdrive/releases/download/{JUMPDRIVE_VERSION}/{file}', path)
     else:
-        loop_device = losetup_rootfs_image(image_path, sector_size)
+        loop_device = losetup_setup_image(image_path, sector_size)
         if type == LK2ND:
             path = dump_lk2nd(loop_device + 'p1')
         elif type == ABOOT:


@@ -1,6 +1,248 @@
+import click
+import logging
+import os
+
+from signal import pause
+from typing import Optional
+
+from config.state import config, Profile
+from constants import BASE_LOCAL_PACKAGES, BASE_PACKAGES, LUKS_MAPPER_DEFAULT
+from devices.device import get_profile_device
+from exec.file import makedir
+from flavours.flavour import get_profile_flavour
+from packages.build import build_enable_qemu_binfmt, build_packages, filter_pkgbuilds
+from wrapper import enforce_wrap
+
 from .boot import cmd_boot
+from .cryptsetup import encryption_option, get_cryptmapper_path, luks_close, luks_create, luks_open
 from .flash import cmd_flash
-from .image import cmd_image
+from .image import (
+    IMG_DEFAULT_SIZE_BOOT_MB,
+    create_boot_fs,
+    create_img_file,
+    create_root_fs,
+    dd_image,
+    get_device_chroot,
+    get_image_path,
+    install_rootfs,
+    losetup_setup_image,
+    mount_chroot,
+    partprobe,
+    partition_device,
+)
+
+
+@click.group(name='image')
+def cmd_image():
+    """Build, flash and boot device images"""
+
+
 for cmd in [cmd_boot, cmd_flash]:
     cmd_image.add_command(cmd)
sectorsize_option = click.option(
'-b',
'--sector-size',
help="Override the device's sector size",
type=int,
default=None,
)
@cmd_image.command(name='build')
@click.argument('profile_name', required=False)
@click.option(
'--local-repos/--no-local-repos',
'-l/-L',
help='Whether to use local package repos at all or only use HTTPS repos.',
default=True,
show_default=True,
is_flag=True,
)
@click.option(
'--build-pkgs/--no-build-pkgs',
'-p/-P',
help='Whether to build missing/outdated local packages if local repos are enabled.',
default=True,
show_default=True,
is_flag=True,
)
@click.option(
'--no-download-pkgs',
help='Disable trying to download packages instead of building if building is enabled.',
default=False,
is_flag=True,
)
@click.option(
'--block-target',
help='Override the block device file to write the final image to',
type=click.Path(),
default=None,
)
@click.option(
'--skip-part-images',
help='Skip creating image files for the partitions and directly work on the target block device.',
default=False,
is_flag=True,
)
@encryption_option
@sectorsize_option
def cmd_build(
profile_name: Optional[str] = None,
local_repos: bool = True,
build_pkgs: bool = True,
no_download_pkgs=False,
block_target: Optional[str] = None,
sector_size: Optional[int] = None,
skip_part_images: bool = False,
encryption: Optional[bool] = None,
encryption_password: Optional[str] = None,
encryption_mapper: str = LUKS_MAPPER_DEFAULT,
):
"""
Build a device image.
Unless overriden, required packages will be built or preferably downloaded from HTTPS repos.
"""
config.enforce_profile_device_set()
config.enforce_profile_flavour_set()
enforce_wrap()
device = get_profile_device(profile_name)
arch = device.arch
# check_programs_wrap(['makepkg', 'pacman', 'pacstrap'])
profile: Profile = config.get_profile(profile_name)
flavour = get_profile_flavour(profile_name)
rootfs_size_mb = flavour.parse_flavourinfo().rootfs_size * 1000 + int(profile.size_extra_mb)
bootfs_size_mb = IMG_DEFAULT_SIZE_BOOT_MB
if encryption is None:
encryption = profile.encryption
packages = BASE_LOCAL_PACKAGES + [device.package.name, flavour.pkgbuild.name]
packages_extra = BASE_PACKAGES + profile.pkgs_include
if encryption:
packages_extra += ['cryptsetup', 'util-linux'] # TODO: select osk-sdl here somehow
if arch != config.runtime.arch:
build_enable_qemu_binfmt(arch)
if local_repos and build_pkgs:
logging.info("Making sure all packages are built")
# enforce that local base packages are built
pkgbuilds = set(filter_pkgbuilds(packages, arch=arch, allow_empty_results=False, use_paths=False))
# extra packages might be a mix of package names that are in our PKGBUILDs and packages from the base distro
pkgbuilds |= set(filter_pkgbuilds(packages_extra, arch=arch, allow_empty_results=True, use_paths=False))
build_packages(pkgbuilds, arch, try_download=not no_download_pkgs)
sector_size = sector_size or device.get_image_sectorsize() or 512
image_path = block_target or get_image_path(device, flavour.name)
makedir(os.path.dirname(image_path))
logging.info(f'Creating new file at {image_path}')
create_img_file(image_path, f"{rootfs_size_mb + bootfs_size_mb}M")
loop_device = losetup_setup_image(image_path, sector_size or device.get_image_sectorsize_default())
partition_device(loop_device, sector_size=sector_size, boot_partition_size_mb=bootfs_size_mb)
partprobe(loop_device)
boot_dev: str
root_dev: str
root_dev_raw: str
loop_boot = loop_device + 'p1'
loop_root = loop_device + 'p2'
if skip_part_images:
boot_dev = loop_boot
root_dev = loop_root
else:
logging.info('Creating per-partition image files')
boot_dev = create_img_file(get_image_path(device, flavour, 'boot'), f'{bootfs_size_mb}M')
root_dev = create_img_file(get_image_path(device, flavour, 'root'), f'{rootfs_size_mb - 200}M')
root_dev_raw = root_dev
if encryption:
encryption_password = encryption_password or profile.encryption_password
if not encryption_password:
encryption_password = click.prompt(
"Please enter your encryption password (input hidden)",
hide_input=True,
confirmation_prompt=True,
)
luks_create(root_dev, password=encryption_password)
luks_open(root_dev, mapper_name=encryption_mapper, password=encryption_password)
root_dev = get_cryptmapper_path(encryption_mapper)
assert os.path.exists(root_dev)
create_root_fs(root_dev)
create_boot_fs(boot_dev)
install_rootfs(
root_dev,
boot_dev,
device,
flavour,
arch,
list(set(packages) | set(packages_extra)),
local_repos,
profile,
encrypted=bool(encryption),
)
if encryption:
luks_close(mapper_name=encryption_mapper)
if not skip_part_images:
logging.info('Copying partition image files into full image:')
logging.info(f'Block-copying /boot to {image_path}')
dd_image(input=boot_dev, output=loop_boot)
logging.info(f'Block-copying rootfs to {image_path}')
dd_image(input=root_dev_raw, output=loop_root)
logging.info(f'Done! Image saved to {image_path}')
@cmd_image.command(name='inspect')
@click.option('--shell', '-s', is_flag=True)
@click.option('--use-local-repos', '-l', is_flag=True)
@sectorsize_option
@encryption_option
@click.argument('profile', required=False)
def cmd_inspect(
profile: Optional[str] = None,
shell: bool = False,
sector_size: Optional[int] = None,
use_local_repos: bool = False,
encryption: Optional[bool] = None,
):
"""Loop-mount the device image for inspection."""
config.enforce_profile_device_set()
config.enforce_profile_flavour_set()
enforce_wrap()
profile_conf = config.get_profile(profile)
device = get_profile_device(profile)
arch = device.arch
flavour = get_profile_flavour(profile).name
sector_size = sector_size or device.get_image_sectorsize_default()
chroot = get_device_chroot(device.name, flavour, arch, packages=[], use_local_repos=use_local_repos)
image_path = get_image_path(device, flavour)
loop_device = losetup_setup_image(image_path, sector_size)
partprobe(loop_device)
mount_chroot(loop_device + 'p2', loop_device + 'p1', chroot, password=profile_conf.encryption_password)
logging.info(f'Inspect the rootfs image at {chroot.path}')
if shell:
chroot.initialized = True
chroot.activate()
if arch != config.runtime.arch:
logging.info('Installing requisites for foreign-arch shell')
build_enable_qemu_binfmt(arch)
logging.info('Starting inspection shell')
chroot.run_cmd('/bin/bash')
else:
pause()
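To summarize the encryption handling that `cmd_build` above introduces, here is a hedged condensation; `resolve_encryption` is a hypothetical helper written only for illustration, mirroring the precedence CLI flag > profile setting > interactive prompt:

```python
from typing import Optional
import click

def resolve_encryption(cli_flag: Optional[bool], cli_password: Optional[str], profile) -> tuple[bool, Optional[str]]:
    """Hypothetical helper restating cmd_build()'s fallback order above."""
    encryption = cli_flag if cli_flag is not None else bool(profile.encryption)
    password = cli_password or profile.encryption_password
    if encryption and not password:
        # same interactive prompt cmd_build() falls back to
        password = click.prompt(
            "Please enter your encryption password (input hidden)",
            hide_input=True,
            confirmation_prompt=True,
        )
    return encryption, password
```

With encryption enabled, the root partition image is then formatted and opened via LUKS, and the filesystem is created on the mapper device instead of the raw file, as the code above shows.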

image/cryptsetup.py (new file, +179 lines)

@@ -0,0 +1,179 @@
import atexit
import click
import logging
import os
from typing import Optional
from constants import LUKS_LABEL_DEFAULT
from chroot.build import BuildChroot
from exec.cmd import run_cmd, CompletedProcess
encryption_option = click.option(
'--encryption/--no-encryption',
help="Force applying/ignoring LUKS encryption when handling the device image."
"Defaults to using the Profile's setting.",
default=None,
is_flag=True,
)
def get_accessible_user(path):
return None if os.access(path, os.R_OK) else 'root'
def check_dev_exists(device_path: str, verb: str = 'find'):
if not os.path.exists(device_path):
raise Exception(f"Can't {verb} LUKS on {device_path!r}: file does not exist")
def get_cryptmapper_path(mapper_name: str) -> str:
return f'/dev/mapper/{mapper_name}'
def mapper_exists(mapper_name: str, chroot: Optional[BuildChroot]) -> bool:
path = get_cryptmapper_path(mapper_name)
paths = [path]
if chroot:
paths.append(chroot.get_path(path))
for p in paths:
if os.path.exists(p):
return True
return False
def get_password_io(password: Optional[str]) -> Optional[bytes]:
return password.encode() if password else None
def is_luks(device_path: str, native_chroot: Optional[BuildChroot] = None) -> bool:
check_dev_exists(device_path, 'check')
run_func = native_chroot.run_cmd if native_chroot else run_cmd
user = get_accessible_user(device_path)
cmd = ["blkid", '--match-token', 'TYPE=crypto_LUKS', device_path]
result = run_func(cmd, capture_output=True, switch_user=user) # type: ignore[operator]
assert isinstance(result, CompletedProcess)
return bool(result.stdout and result.stdout.strip())
def get_luks_offset(
mapper_name: str,
native_chroot: Optional[BuildChroot] = None,
) -> tuple[int, int]:
device_path = get_cryptmapper_path(mapper_name)
check_dev_exists(device_path, 'get offset of')
run_func = native_chroot.run_cmd if native_chroot else run_cmd
user = get_accessible_user(device_path)
stdout: str = ''
cmd = ['cryptsetup', 'status', mapper_name]
result = run_func(cmd, capture_output=True, switch_user=user) # type: ignore[operator]
assert isinstance(result, CompletedProcess)
if not (result.stdout and (stdout := result.stdout.strip())):
raise Exception(f"Couldn't get LUKS offset for {mapper_name!r} from 'cryptsetup status': empty stdout: {stdout!r}")
markers = {'offset': -1, 'sector size': -1}
for line in stdout.decode().split('\n'):
line = line.strip()
for item in markers:
offset_marker = f'{item}:'
if line.startswith(offset_marker):
try:
markers[item] = int(line.split(offset_marker)[-1].strip().split(' ')[0])
except Exception as ex:
raise Exception(f"Couldn't get LUKS {item=} for {mapper_name!r} due to an exception parsing cryptsetup output: {ex}")
for i in markers.values():
if i != -1:
continue
logging.debug(f"Failed to find ':' in stdout: {stdout}")
raise Exception(f"Failed to find LUKS offset for {mapper_name!r}: Offset line not found")
return markers['offset'], markers['sector size']
def luks_create(
backing_device: str,
label: str = LUKS_LABEL_DEFAULT,
native_chroot: Optional[BuildChroot] = None,
password: Optional[str] = None,
extra_opts: list[str] = [],
use_random: bool = True,
cipher: Optional[str] = None,
pbkdf: Optional[str] = None,
iter_time: Optional[int] = None,
):
check_dev_exists(backing_device, 'create')
run_func = native_chroot.run_cmd if native_chroot else run_cmd
extra_opts = list(extra_opts) # copy list before modification
if use_random:
extra_opts += ['--use-random']
if cipher:
extra_opts += ['--cipher', cipher]
if pbkdf:
extra_opts += ['--pbkdf', pbkdf]
if iter_time is not None:
extra_opts += ['--iter-time', str(iter_time)]
if label:
extra_opts += ['--label', label]
logging.info(f"Creating LUKS volume at {backing_device!r}{' (unattended)' if password else ''}")
result = run_func( # type: ignore[operator]
['cryptsetup', '-q', 'luksFormat', *extra_opts, backing_device],
switch_user=get_accessible_user(backing_device),
attach_tty=not password,
stdin_input=get_password_io(password),
)
rc = result if isinstance(result, int) else result.returncode
if rc:
raise Exception("Failed to format LUKS device: cryptsetup error^^^^")
def luks_open(
backing_device: str,
mapper_name: str,
extra_opts: list[str] = [],
password: Optional[str] = None,
native_chroot: Optional[BuildChroot] = None,
schedule_close: bool = True,
idempotent: bool = False,
):
check_dev_exists(backing_device, 'open')
run_func = native_chroot.run_cmd if native_chroot else run_cmd
if mapper_exists(mapper_name, native_chroot):
if idempotent:
logging.debug(f"LUKS mapper {mapper_name!r} already open")
return
raise Exception(f"Can't open LUKS for {backing_device!r} with mapper name {mapper_name!r}: "
"mapper file already exists")
logging.info(f"Opening LUKS mapper {mapper_name!r} for {backing_device!r}")
result = run_func( # type: ignore[operator]
['cryptsetup', 'luksOpen', *extra_opts, backing_device, mapper_name],
switch_user='root',
attach_tty=not password,
stdin_input=get_password_io(password),
)
rc = result if isinstance(result, int) else result.returncode
if rc:
raise Exception("Failed to open LUKS device: cryptsetup error^^^^")
if schedule_close:
atexit.register(luks_close, mapper_name, native_chroot=native_chroot, idempotent=True)
logging.info(f"LUKS mapper {mapper_name!r} opened!")
def luks_close(
mapper_name: str,
native_chroot: Optional[BuildChroot] = None,
extra_opts: list[str] = [],
idempotent: bool = False,
):
run_func = native_chroot.run_cmd if native_chroot else run_cmd
if not mapper_exists(mapper_name, native_chroot):
if idempotent:
logging.debug(f"LUKS mapper {mapper_name!r} already closed")
return 0
raise Exception(f"Can't close LUKS mapper {mapper_name!r}: mapper doesn't exist")
logging.info(f"Closing LUKS mapper {mapper_name!r}")
result = run_func( # type: ignore[operator]
['cryptsetup', 'close', *extra_opts, mapper_name],
switch_user='root',
)
rc = result if isinstance(result, int) else result.returncode
if rc:
raise Exception("Failed to close LUKS device: cryptsetup error^^^^")
logging.info(f"LUKS mapper {mapper_name!r} closed.")


@@ -5,6 +5,7 @@ import logging

 from typing import Optional

+from config.state import config
 from constants import FLASH_PARTS, LOCATIONS, FASTBOOT, JUMPDRIVE
 from exec.cmd import run_root_cmd
 from exec.file import get_temp_dir
@@ -14,7 +15,8 @@ from flavours.cli import profile_option
 from wrapper import enforce_wrap

 from .fastboot import fastboot_flash
-from .image import dd_image, dump_aboot, dump_lk2nd, dump_qhypstub, get_image_path, losetup_destroy, losetup_rootfs_image, partprobe, shrink_fs
+from .image import dd_image, dump_aboot, dump_lk2nd, dump_qhypstub, get_image_path, losetup_destroy, losetup_setup_image, partprobe, shrink_fs
+from .cryptsetup import encryption_option

 ABOOT = FLASH_PARTS['ABOOT']
 LK2ND = FLASH_PARTS['LK2ND']
@@ -47,15 +49,15 @@ def test_blockdev(path: str):
                         'microSD inserted or no microSD card slot installed in the device) or corrupt or defect')


-def prepare_minimal_image(source_path: str, sector_size: int) -> str:
+def prepare_minimal_image(source_path: str, sector_size: int, encrypted: Optional[bool], encryption_password: Optional[str]) -> str:
     minimal_image_dir = get_temp_dir(register_cleanup=True)
     minimal_image_path = os.path.join(minimal_image_dir, f'minimal-{os.path.basename(source_path)}')
     logging.info(f"Copying image {os.path.basename(source_path)} to {minimal_image_dir} for shrinking")
     shutil.copyfile(source_path, minimal_image_path)

-    loop_device = losetup_rootfs_image(minimal_image_path, sector_size)
+    loop_device = losetup_setup_image(minimal_image_path, sector_size)
     partprobe(loop_device)
-    shrink_fs(loop_device, minimal_image_path, sector_size)
+    shrink_fs(loop_device, minimal_image_path, sector_size, encrypted, encryption_password)
     losetup_destroy(loop_device)
     return minimal_image_path
@@ -67,6 +69,7 @@ def prepare_minimal_image(source_path: str, sector_size: int) -> str:
 @click.option('--shrink/--no-shrink', is_flag=True, default=True, help="Copy and shrink the image file to minimal size")
 @click.option('-b', '--sector-size', type=int, help="Override the device's sector size", default=None)
 @click.option('--confirm', is_flag=True, help="Ask for confirmation before executing fastboot commands")
+@encryption_option
 @click.argument('what', type=click.Choice(list(FLASH_PARTS.values())))
 @click.argument('location', type=str, required=False)
 def cmd_flash(
@@ -78,6 +81,7 @@ def cmd_flash(
     shrink: bool = True,
     sector_size: Optional[int] = None,
     confirm: bool = False,
+    encryption: Optional[bool] = None,
 ):
     """
     Flash a partition onto a device.
@@ -115,7 +119,12 @@ def cmd_flash(
         if not location:
             raise Exception(f'You need to specify a location to flash {what} to')
         path = ''
-        image_path = prepare_minimal_image(device_image_path, sector_size) if shrink else device_image_path
+        image_path = prepare_minimal_image(
+            device_image_path,
+            sector_size,
+            encrypted=encryption,
+            encryption_password=config.get_profile(profile).encryption_password,
+        ) if shrink else device_image_path
         if method == FASTBOOT:
             fastboot_flash(
                 partition=location,
@@ -136,7 +145,7 @@ def cmd_flash(
     else:
         if method and method != FASTBOOT:
             raise Exception(f'Flashing "{what}" with method "{method}" not supported, try no parameter or "{FASTBOOT}"')
-        loop_device = losetup_rootfs_image(device_image_path, sector_size)
+        loop_device = losetup_setup_image(device_image_path, sector_size)
         if what == ABOOT:
             path = dump_aboot(f'{loop_device}p1')
             fastboot_flash(location or 'boot', path, confirm=confirm)
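An illustrative call of the reworked `prepare_minimal_image()`; the image path and password are placeholders:

```python
from image.flash import prepare_minimal_image

# Hypothetical call mirroring cmd_flash() above: shrink a copy of the image and let
# shrink_fs() decide whether the rootfs is LUKS-encrypted (encrypted=None) and, if so,
# open it with the profile's password.
small_image = prepare_minimal_image(
    '/path/to/full.img',            # placeholder image path
    512,                            # sector size
    encrypted=None,
    encryption_password='hunter2',  # placeholder
)
```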


@@ -1,29 +1,29 @@
 import atexit
 import json
+import logging
 import os
 import re
 import subprocess
-import click
-import logging

-from signal import pause
 from subprocess import CompletedProcess
 from typing import Optional, Union

-from config.state import config, Profile
+from chroot.build import BuildChroot, get_build_chroot
 from chroot.device import DeviceChroot, get_device_chroot
-from constants import Arch, BASE_LOCAL_PACKAGES, BASE_PACKAGES, POST_INSTALL_CMDS
+from config.state import config, Profile
+from constants import Arch, LUKS_MAPPER_DEFAULT, POST_INSTALL_CMDS
 from distro.distro import get_base_distro, get_kupfer_https
-from devices.device import Device, get_profile_device
+from devices.device import Device
 from exec.cmd import run_root_cmd, generate_cmd_su
-from exec.file import get_temp_dir, root_write_file, root_makedir, makedir
+from exec.file import get_temp_dir, root_write_file, root_makedir
-from flavours.flavour import Flavour, get_profile_flavour
+from flavours.flavour import Flavour
 from net.ssh import copy_ssh_keys
-from packages.build import build_enable_qemu_binfmt, build_packages, filter_pkgbuilds
-from wrapper import enforce_wrap
+from utils import programs_available

-# image files need to be slightly smaller than partitions to fit
-IMG_FILE_ROOT_DEFAULT_SIZE = "1800M"
-IMG_FILE_BOOT_DEFAULT_SIZE = "90M"
+from .cryptsetup import is_luks, get_luks_offset, luks_close, luks_open
+
+MAPPER_DIR = '/dev/mapper/'
+IMG_DEFAULT_SIZE_BOOT_MB = 200


 def dd_image(input: str, output: str, blocksize='1M') -> CompletedProcess:
@@ -76,24 +76,58 @@ def align_bytes(size_bytes: int, alignment: int = 4096) -> int:
     return size_bytes


-def shrink_fs(loop_device: str, file: str, sector_size: int):
+def shrink_fs(
+    loop_device: str,
+    file: str,
+    sector_size: int,
+    encrypted: Optional[bool] = None,
+    encryption_password: Optional[str] = None,
+    crypt_mapper=LUKS_MAPPER_DEFAULT,
+):
     partprobe(loop_device)
-    logging.debug(f"Checking filesystem at {loop_device}p2")
-    result = run_root_cmd(['e2fsck', '-fy', f'{loop_device}p2'])
+    root_partition = f'{loop_device}p2'
+    root_partition_fs = root_partition
+    if not (encrypted is False):
+        root_partition_fs, native_chroot, encrypted = resolve_rootfs_crypt(
+            root_partition,
+            fail_on_unencrypted=bool(encrypted),
+            crypt_mapper=crypt_mapper,
+            password=encryption_password,
+        )
+    logging.debug(f"Checking filesystem at {root_partition_fs}")
+    result = run_root_cmd(['e2fsck', '-fy', root_partition_fs])
     if result.returncode > 2:
         # https://man7.org/linux/man-pages/man8/e2fsck.8.html#EXIT_CODE
-        raise Exception(f'Failed to e2fsck {loop_device}p2 with exit code {result.returncode}')
+        raise Exception(f'Failed to e2fsck {root_partition_fs} with exit code {result.returncode}')

-    logging.info(f'Shrinking filesystem at {loop_device}p2')
-    result = run_root_cmd(['resize2fs', '-M', f'{loop_device}p2'])
+    logging.info(f'Shrinking filesystem at {root_partition_fs}')
+    result = run_root_cmd(['resize2fs', '-M', root_partition_fs])
     if result.returncode != 0:
-        raise Exception(f'Failed to resize2fs {loop_device}p2')
+        raise Exception(f'Failed to resize2fs {root_partition_fs}')

-    logging.debug(f'Reading size of shrunken filesystem on {loop_device}p2')
-    fs_blocks, fs_block_size = get_fs_size(f'{loop_device}p2')
+    logging.debug(f'Reading size of shrunken filesystem on {root_partition_fs}')
+    fs_blocks, fs_block_size = get_fs_size(root_partition_fs)
     sectors = bytes_to_sectors(fs_blocks * fs_block_size, sector_size)
+    logging.debug(f"shrunken FS length is {fs_blocks} blocks * {fs_block_size} bytes = {sectors} bytes")

-    logging.info(f'Shrinking partition at {loop_device}p2 to {sectors} sectors ({sectors * sector_size} bytes)')
+    _, image_size = find_end_sector(loop_device, root_partition, sector_size)
+    if image_size == -1:
+        raise Exception(f'Failed to find pre-repartition size of {loop_device}')
+
+    if encrypted:
+        if sectors > image_size:
+            raise Exception("Shrunk FS size allegedly larger than the image itself; this is probably "
+                            f"a kupferbootstrap parsing bug. shrunk partition end={sectors}, image size={image_size}, {sector_size=}")
+        old_sectors = sectors
+        luks_offset, luks_sector_size = get_luks_offset(crypt_mapper, native_chroot)
+        #luks_offset_bytes = align_bytes((luks_offset + 1) * luks_sector_size, sector_size)
+        luks_offset_normalized = bytes_to_sectors(luks_offset * luks_sector_size, sector_size)
+        logging.debug(f"Discovered LUKS attrs: {luks_offset=}, {luks_sector_size=}, {luks_offset_normalized=}")
+        luks_close(crypt_mapper, native_chroot)
+        sectors += luks_offset_normalized + 1024
+        logging.debug(f"Increasing sectors from {old_sectors} to {sectors} ({sectors - old_sectors}) to leave space for the LUKS header")
+    logging.info(f'Shrinking partition at {root_partition} to {sectors} {sector_size}b sectors ({sectors * sector_size} bytes)')
     child_proccess = subprocess.Popen(
         generate_cmd_su(['fdisk', '-b', str(sector_size), loop_device], switch_user='root'),  # type: ignore
         stdin=subprocess.PIPE,
@@ -117,27 +151,18 @@ def shrink_fs(loop_device: str, file: str, sector_size: int):
     # For some reason re-reading the partition table fails, but that is not a problem
     partprobe(loop_device)
     if returncode > 1:
-        raise Exception(f'Failed to shrink partition size of {loop_device}p2 with fdisk')
+        raise Exception(f'Failed to shrink partition size of {root_partition} with fdisk')

     partprobe(loop_device).check_returncode()

-    logging.debug(f'Finding end sector of partition at {loop_device}p2')
-    result = run_root_cmd(['fdisk', '-b', str(sector_size), '-l', loop_device], capture_output=True)
-    if result.returncode != 0:
-        print(result.stdout)
-        print(result.stderr)
-        raise Exception(f'Failed to fdisk -l {loop_device}')
-
-    end_sector = 0
-    for line in result.stdout.decode('utf-8').split('\n'):
-        if line.startswith(f'{loop_device}p2'):
-            parts = list(filter(lambda part: part != '', line.split(' ')))
-            end_sector = int(parts[2])
-
-    if end_sector == 0:
-        raise Exception(f'Failed to find end sector of {loop_device}p2')
+    end_sector, _ = find_end_sector(loop_device, root_partition, sector_size)
+    if end_sector == -1:
+        raise Exception(f'Failed to find end sector of {root_partition}')

-    end_size = align_bytes((end_sector + 1) * sector_size, 4096)
+    if end_sector > image_size:
+        logging.warning(f"Clipping sectors ({end_sector}) to {image_size=}")
+        end_sector = image_size
+    end_size = align_bytes((end_sector + 1024) * sector_size, 4096)
     logging.debug(f'({end_sector} + 1) sectors * {sector_size} bytes/sector = {end_size} bytes')

     logging.info(f'Truncating {file} to {end_size} bytes')
@@ -147,6 +172,26 @@ def shrink_fs(loop_device: str, file: str, sector_size: int):
     partprobe(loop_device)


+def find_end_sector(device: str, partition: str, sector_size: int) -> tuple[int, int]:
+    """Return (last_sector_index, sector_count) of a partition on a device, returns (-1, -1) if not found"""
+    logging.debug(f'Finding end sector of partition at {partition}')
+    result = run_root_cmd(['fdisk', '-b', str(sector_size), '-l', device], capture_output=True)
+    if result.returncode != 0:
+        print(result.stdout)
+        print(result.stderr)
+        raise Exception(f'Failed to fdisk -l {device}')
+
+    end_sector = -1
+    num_sectors = -1
+    for line in result.stdout.decode('utf-8').split('\n'):
+        if line.startswith(partition):
+            parts = list(filter(lambda part: part != '', line.split(' ')))
+            end_sector = int(parts[2])
+            num_sectors = int(parts[3])
+
+    return end_sector, num_sectors
+
+
 def losetup_destroy(loop_device):
     logging.debug(f'Destroying loop device {loop_device}')
     run_root_cmd(
@@ -177,7 +222,7 @@ def get_image_path(device: Union[str, Device], flavour: Union[str, Flavour], img
     return os.path.join(config.get_path('images'), get_image_name(device, flavour, img_type))


-def losetup_rootfs_image(image_path: str, sector_size: int) -> str:
+def losetup_setup_image(image_path: str, sector_size: int) -> str:
     logging.debug(f'Creating loop device for {image_path} with sector size {sector_size}')
     result = run_root_cmd([
         'losetup',
@@ -214,16 +259,59 @@ def losetup_rootfs_image(image_path: str, sector_size: int) -> str:
     return loop_device


-def mount_chroot(rootfs_source: str, boot_src: str, chroot: DeviceChroot):
-    logging.debug(f'Mounting {rootfs_source} at {chroot.path}')
-    chroot.mount_rootfs(rootfs_source)
-    assert (os.path.ismount(chroot.path))
-    root_makedir(chroot.get_path('boot'))
-    logging.debug(f'Mounting {boot_src} at {chroot.path}/boot')
-    chroot.mount(boot_src, '/boot', options=['defaults'])
+def resolve_rootfs_crypt(
+    rootfs_source: str,
+    password: Optional[str] = None,
+    crypt_mapper: str = LUKS_MAPPER_DEFAULT,
+    native_chroot: Optional[BuildChroot] = None,
+    fail_on_unencrypted: bool = True,
+) -> tuple[str, Optional[BuildChroot], bool]:
+    assert config.runtime.arch
+    is_encrypted = False
+    if not (native_chroot or programs_available(['blkid'])):
+        native_chroot = get_build_chroot(config.runtime.arch, packages=['base', 'util-linux'])
+    if is_luks(rootfs_source, native_chroot=native_chroot):
+        if not (native_chroot or programs_available(['cryptsetup'])):
+            native_chroot = get_build_chroot(config.runtime.arch, packages=['base', 'cryptsetup', 'util-linux'])
+        luks_open(rootfs_source, crypt_mapper, password=password, native_chroot=native_chroot)
+        rootfs_source = f'{MAPPER_DIR}{crypt_mapper}'
+        is_encrypted = True
+    elif fail_on_unencrypted:
+        hint = ''
+        if rootfs_source.startswith(MAPPER_DIR):
+            hint = (f' HINT: path starts with {MAPPER_DIR!r}, probably already a decrypted volume.'
+                    ' This is likely a kupferbootstrap bug.')
+        raise Exception(f"Error: {rootfs_source!r} is not an encrypted LUKS volume.{hint}")
+    return rootfs_source, native_chroot, is_encrypted
+
+
+def mount_chroot(
+    rootfs_source: str,
+    boot_src: str,
+    device_chroot: DeviceChroot,
+    encrypted: Optional[bool] = None,
+    password: Optional[str] = None,
+    native_chroot: Optional[BuildChroot] = None,
+    crypt_mapper: str = LUKS_MAPPER_DEFAULT,
+):
+    if encrypted is not False:
+        rootfs_source, native_chroot, encrypted = resolve_rootfs_crypt(
+            rootfs_source,
+            native_chroot=native_chroot,
+            crypt_mapper=crypt_mapper,
+            fail_on_unencrypted=bool(encrypted),
+            password=password,
+        )
+    logging.debug(f'Mounting {rootfs_source} at {device_chroot.path}')
+
+    device_chroot.mount_rootfs(rootfs_source)
+    assert (os.path.ismount(device_chroot.path))
+    root_makedir(device_chroot.get_path('boot'))
+
+    logging.debug(f'Mounting {boot_src} at {device_chroot.path}/boot')
+    device_chroot.mount(boot_src, '/boot', options=['defaults'])


 def dump_file_from_image(image_path: str, file_path: str, target_path: Optional[str] = None):
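A hedged sketch of the two ways the reworked `mount_chroot()` can be driven; the device paths, device/flavour names and the password are placeholders, not values from the diff:

```python
from chroot.device import get_device_chroot
from image.image import mount_chroot

# Placeholder device/flavour names; a real call derives these from the profile.
chroot = get_device_chroot(device='sdm845-oneplus-enchilada', flavour='barebone',
                           arch='aarch64', packages=[], use_local_repos=False)

# 1. Let mount_chroot() probe the partition itself: if blkid reports crypto_LUKS,
#    resolve_rootfs_crypt() opens it and the mapper device is mounted instead.
mount_chroot('/dev/loop0p2', '/dev/loop0p1', chroot, password='hunter2')

# 2. Or, when the caller has already opened the mapper (as install_rootfs() does),
#    pass encrypted=False to skip detection entirely.
mount_chroot('/dev/mapper/kupfer-crypt', '/dev/loop0p1', chroot, encrypted=False)
```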
@@ -266,11 +354,12 @@ def create_img_file(image_path: str, size_str: str):
     return image_path


-def partition_device(device: str):
-    boot_partition_size = '100MiB'
+def partition_device(device: str, sector_size: int, boot_partition_size_mb: int = IMG_DEFAULT_SIZE_BOOT_MB):
+    initial_offset = 1048576 // sector_size  # 2048 for 512, 256 for 4096
+    boot_partition_size: int = align_bytes((boot_partition_size_mb + 1) * 1024 * 1024, 4096)
     create_partition_table = ['mklabel', 'msdos']
-    create_boot_partition = ['mkpart', 'primary', 'ext2', '0%', boot_partition_size]
-    create_root_partition = ['mkpart', 'primary', boot_partition_size, '100%']
+    create_boot_partition = ['mkpart', 'primary', 'ext2', f'{initial_offset}s', f'{boot_partition_size}b']
+    create_root_partition = ['mkpart', 'primary', f'{bytes_to_sectors(boot_partition_size, sector_size) + initial_offset}s', '100%']
     enable_boot = ['set', '1', 'boot', 'on']
     result = run_root_cmd([
         'parted',
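A small worked example of the new offset arithmetic, assuming the default 200 MB boot partition; division is exact here, so the rounding behaviour of `bytes_to_sectors()` does not matter:

```python
# Illustrative re-statement of partition_device()'s math above.
MB = 1024 * 1024
for sector_size in (512, 4096):
    initial_offset = 1048576 // sector_size                  # 1 MiB gap: 2048 or 256 sectors
    boot_bytes = (200 + 1) * MB                               # 210763776; already 4096-aligned,
                                                              # so align_bytes() leaves it unchanged
    root_start = boot_bytes // sector_size + initial_offset   # first sector of the root partition
    print(f"{sector_size}: boot ends at byte {boot_bytes}, root starts at sector {root_start}")
# 512  -> root starts at sector 413696
# 4096 -> root starts at sector 51712
```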
@@ -281,7 +370,7 @@
         raise Exception(f'Failed to create partitions on {device}')


-def create_filesystem(device: str, blocksize: Optional[int], label=None, options=[], fstype='ext4'):
+def create_filesystem(device: str, blocksize: Optional[int] = None, label=None, options=[], fstype='ext4'):
     """Creates a new filesystem. Blocksize defaults"""
     labels = ['-L', label] if label else []
     cmd = [f'mkfs.{fstype}', '-F', *labels]
@ -301,12 +390,12 @@ def create_filesystem(device: str, blocksize: Optional[int], label=None, options
raise Exception(f'Failed to create {fstype} filesystem on {device} with CMD: {cmd}') raise Exception(f'Failed to create {fstype} filesystem on {device} with CMD: {cmd}')
def create_root_fs(device: str, blocksize: Optional[int]): def create_root_fs(device: str):
create_filesystem(device, blocksize=blocksize, label='kupfer_root', options=['-O', '^metadata_csum', '-N', '100000']) create_filesystem(device, label='kupfer_root', options=['-O', '^metadata_csum', '-N', '100000'])
def create_boot_fs(device: str, blocksize: Optional[int]): def create_boot_fs(device: str):
create_filesystem(device, blocksize=blocksize, label='kupfer_boot', fstype='ext2') create_filesystem(device, label='kupfer_boot', fstype='ext2')
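As a concrete example, the argv that create_root_fs() would assemble after the blocksize was dropped might look as follows (a sketch only, assuming the elided remainder of create_filesystem() appends the extra options and the target device):

# For create_root_fs('/dev/loop0p2'):
cmd = ['mkfs.ext4', '-F', '-L', 'kupfer_root', '-O', '^metadata_csum', '-N', '100000', '/dev/loop0p2']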
def install_rootfs(
@@ -318,24 +407,33 @@ def install_rootfs(
packages: list[str],
use_local_repos: bool,
profile: Profile,
encrypted: bool,
):
user = profile['username'] or 'kupfer'
user = profile.username or 'kupfer'
chroot = get_device_chroot(device=get_device_name(device), flavour=flavour.name, arch=arch, packages=packages, use_local_repos=use_local_repos)
mount_chroot(rootfs_device, bootfs_device, chroot)
# rootfs_device must be passed the crypt_mapper if encrypted is True
if encrypted:
assert rootfs_device.startswith(MAPPER_DIR)
mount_chroot(
rootfs_device,
bootfs_device,
chroot,
encrypted=False,  # rootfs_device is already the crypt_mapper
)
chroot.mount_pacman_cache()
chroot.initialize()
chroot.activate()
chroot.create_user(
user=user,
password=profile['password'],
password=profile.password,
)
chroot.add_sudo_config(config_name='wheel', privilegee='%wheel', password_required=True)
copy_ssh_keys(
chroot,
chroot.path,
user=user,
allow_fail=True,
)
files = {
'etc/pacman.conf': get_base_distro(arch).get_pacman_conf(
@@ -343,7 +441,7 @@ def install_rootfs(
extra_repos=get_kupfer_https(arch).repos,
in_chroot=True,
),
'etc/hostname': profile['hostname'] or 'kupfer',
'etc/hostname': profile.hostname or 'kupfer',
}
for target, content in files.items():
root_write_file(os.path.join(chroot.path, target.lstrip('/')), content)
@@ -364,177 +462,3 @@ def install_rootfs(
res = run_root_cmd(['umount', chroot.path])
assert isinstance(res, CompletedProcess)
logging.debug(f'rc: {res.returncode}')
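The comment above pins down the calling convention: with encryption enabled, the caller unlocks the LUKS volume first and hands install_rootfs() the device-mapper node instead of the raw partition. A hedged illustration (the mapper name is hypothetical and /dev/mapper/ is the assumed value of MAPPER_DIR):

MAPPER_DIR = '/dev/mapper/'    # assumed value for this sketch
crypt_mapper = 'kupfer_crypt'  # hypothetical mapper name

root_partition = '/dev/loop0p2'
# After luks_open(root_partition, crypt_mapper, ...), the unlocked device is:
rootfs_device = MAPPER_DIR + crypt_mapper
assert rootfs_device.startswith(MAPPER_DIR)  # the check install_rootfs() performs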
@click.group(name='image')
def cmd_image():
"""Build, flash and boot device images"""
sectorsize_option = click.option(
'-b',
'--sector-size',
help="Override the device's sector size",
type=int,
default=None,
)
@cmd_image.command(name='build')
@click.argument('profile_name', required=False)
@click.option(
'--local-repos/--no-local-repos',
'-l/-L',
help='Whether to use local package repos at all or only use HTTPS repos.',
default=True,
show_default=True,
is_flag=True,
)
@click.option(
'--build-pkgs/--no-build-pkgs',
'-p/-P',
help='Whether to build missing/outdated local packages if local repos are enabled.',
default=True,
show_default=True,
is_flag=True,
)
@click.option(
'--no-download-pkgs',
help='Disable trying to download packages instead of building if building is enabled.',
default=False,
is_flag=True,
)
@click.option(
'--block-target',
help='Override the block device file to write the final image to',
type=click.Path(),
default=None,
)
@click.option(
'--skip-part-images',
help='Skip creating image files for the partitions and directly work on the target block device.',
default=False,
is_flag=True,
)
@sectorsize_option
def cmd_build(
profile_name: Optional[str] = None,
local_repos: bool = True,
build_pkgs: bool = True,
no_download_pkgs=False,
block_target: Optional[str] = None,
sector_size: Optional[int] = None,
skip_part_images: bool = False,
):
"""
Build a device image.
Unless overridden, required packages will be built or preferably downloaded from HTTPS repos.
"""
config.enforce_profile_device_set()
config.enforce_profile_flavour_set()
enforce_wrap()
device = get_profile_device(profile_name)
arch = device.arch
# check_programs_wrap(['makepkg', 'pacman', 'pacstrap'])
profile: Profile = config.get_profile(profile_name)
flavour = get_profile_flavour(profile_name)
rootfs_size_mb = flavour.parse_flavourinfo().rootfs_size * 1000 + int(profile.size_extra_mb)
packages = BASE_LOCAL_PACKAGES + [device.package.name, flavour.pkgbuild.name]
packages_extra = BASE_PACKAGES + profile.pkgs_include
if arch != config.runtime.arch:
build_enable_qemu_binfmt(arch)
if local_repos and build_pkgs:
logging.info("Making sure all packages are built")
# enforce that local base packages are built
pkgbuilds = set(filter_pkgbuilds(packages, arch=arch, allow_empty_results=False, use_paths=False))
# extra packages might be a mix of package names that are in our PKGBUILDs and packages from the base distro
pkgbuilds |= set(filter_pkgbuilds(packages_extra, arch=arch, allow_empty_results=True, use_paths=False))
build_packages(pkgbuilds, arch, try_download=not no_download_pkgs)
sector_size = sector_size or device.get_image_sectorsize()
image_path = block_target or get_image_path(device, flavour.name)
makedir(os.path.dirname(image_path))
logging.info(f'Creating new file at {image_path}')
create_img_file(image_path, f"{rootfs_size_mb}M")
loop_device = losetup_rootfs_image(image_path, sector_size or device.get_image_sectorsize_default())
partition_device(loop_device)
partprobe(loop_device)
boot_dev: str
root_dev: str
loop_boot = loop_device + 'p1'
loop_root = loop_device + 'p2'
if skip_part_images:
boot_dev = loop_boot
root_dev = loop_root
else:
logging.info('Creating per-partition image files')
boot_dev = create_img_file(get_image_path(device, flavour, 'boot'), IMG_FILE_BOOT_DEFAULT_SIZE)
root_dev = create_img_file(get_image_path(device, flavour, 'root'), f'{rootfs_size_mb - 200}M')
create_boot_fs(boot_dev, sector_size)
create_root_fs(root_dev, sector_size)
install_rootfs(
root_dev,
boot_dev,
device,
flavour,
arch,
list(set(packages) | set(packages_extra)),
local_repos,
profile,
)
if not skip_part_images:
logging.info('Copying partition image files into full image:')
logging.info(f'Block-copying /boot to {image_path}')
dd_image(input=boot_dev, output=loop_boot)
logging.info(f'Block-copying rootfs to {image_path}')
dd_image(input=root_dev, output=loop_root)
logging.info(f'Done! Image saved to {image_path}')
@cmd_image.command(name='inspect')
@click.option('--shell', '-s', is_flag=True)
@sectorsize_option
@click.argument('profile', required=False)
def cmd_inspect(profile: Optional[str] = None, shell: bool = False, sector_size: Optional[int] = None):
"""Loop-mount the device image for inspection."""
config.enforce_profile_device_set()
config.enforce_profile_flavour_set()
enforce_wrap()
device = get_profile_device(profile)
arch = device.arch
flavour = get_profile_flavour(profile).name
sector_size = sector_size or device.get_image_sectorsize_default()
chroot = get_device_chroot(device.name, flavour, arch)
image_path = get_image_path(device, flavour)
loop_device = losetup_rootfs_image(image_path, sector_size)
partprobe(loop_device)
mount_chroot(loop_device + 'p2', loop_device + 'p1', chroot)
logging.info(f'Inspect the rootfs image at {chroot.path}')
if shell:
chroot.initialized = True
chroot.activate()
if arch != config.runtime.arch:
logging.info('Installing requisites for foreign-arch shell')
build_enable_qemu_binfmt(arch)
logging.info('Starting inspection shell')
chroot.run_cmd('/bin/bash')
else:
pause()
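A hedged sketch of exercising these subcommands programmatically through click's test runner (the profile name is a placeholder; the flag spellings are taken from the options defined above):

from click.testing import CliRunner

runner = CliRunner()
# Roughly equivalent to: kupferbootstrap image build myprofile --no-local-repos --sector-size 4096
result = runner.invoke(cmd_image, ['build', 'myprofile', '--no-local-repos', '--sector-size', '4096'])
print(result.exit_code, result.output)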

View file

@@ -37,11 +37,6 @@ def ctx() -> click.Context:
return click.Context(click.Command('integration_tests'))
def test_main_import():
from main import cli
assert cli
def test_config_load(ctx: click.Context):
path = config.runtime.config_file
assert path

View file

@@ -6,9 +6,7 @@ import click
from config.state import config
from constants import SSH_COMMON_OPTIONS, SSH_DEFAULT_HOST, SSH_DEFAULT_PORT
from chroot.abstract import Chroot
from exec.cmd import run_cmd
from exec.file import write_file
from wrapper import check_programs_wrap
@@ -85,16 +83,21 @@ def find_ssh_keys():
return keys
def copy_ssh_keys(chroot: Chroot, user: str, allow_fail: bool = False):
def copy_ssh_keys(root_dir: str, user: str):
check_programs_wrap(['ssh-keygen'])
ssh_dir_relative = os.path.join('/home', user, '.ssh')
ssh_dir = chroot.get_path(ssh_dir_relative)
authorized_keys_file_rel = os.path.join(ssh_dir_relative, 'authorized_keys')
authorized_keys_file = chroot.get_path(authorized_keys_file_rel)
authorized_keys_file = os.path.join(
root_dir,
'home',
user,
'.ssh',
'authorized_keys',
)
if os.path.exists(authorized_keys_file):
os.unlink(authorized_keys_file)
keys = find_ssh_keys()
if len(keys) == 0:
logging.warning("Could not find any ssh key to copy")
logging.info("Could not find any ssh key to copy")
create = click.confirm("Do you want me to generate an ssh key for you?", True)
if not create:
return
@@ -113,34 +116,15 @@ def copy_ssh_keys(chroot: Chroot, user: str, allow_fail: bool = False):
logging.fatal("Failed to generate ssh key")
keys = find_ssh_keys()
if not keys:
logging.warning("No SSH keys to be copied. Skipping.")
return
auth_key_lines = []
ssh_dir = os.path.join(root_dir, 'home', user, '.ssh')
if not os.path.exists(ssh_dir):
os.makedirs(ssh_dir, exist_ok=True, mode=0o700)
with open(authorized_keys_file, 'a') as authorized_keys:
for key in keys:
pub = f'{key}.pub'
if not os.path.exists(pub):
logging.debug(f'Skipping key {key}: {pub} not found')
continue
try:
with open(pub, 'r') as file:
contents = file.read()
authorized_keys.write(file.read())
if not contents.strip():
continue
auth_key_lines.append(contents)
except Exception as ex:
logging.warning(f"Could not read ssh pub key {pub}", exc_info=ex)
continue
if not os.path.exists(ssh_dir):
logging.info(f"Creating {ssh_dir_relative!r} dir in chroot {chroot.path!r}")
chroot.run_cmd(["mkdir", "-p", "-m", "700", ssh_dir_relative], switch_user=user)
logging.info(f"Writing SSH pub keys to {authorized_keys_file}")
try:
write_file(authorized_keys_file, "\n".join(auth_key_lines), user=str(chroot.get_uid(user)), mode="644")
except Exception as ex:
logging.error(f"Failed to write SSH authorized_keys_file at {authorized_keys_file!r}:", exc_info=ex)
if allow_fail:
return
raise ex from ex
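The two variants above differ mainly in how ~/.ssh is created and who owns it. A standalone sketch of the plain-filesystem approach with the conventional permission bits (paths and key list are placeholders; chown to the target user is deliberately left out here):

import os

def write_authorized_keys(home_dir: str, pub_keys: list[str]) -> str:
    # ~/.ssh must be 0700; authorized_keys is conventionally 0644 or 0600.
    ssh_dir = os.path.join(home_dir, '.ssh')
    os.makedirs(ssh_dir, mode=0o700, exist_ok=True)
    auth_file = os.path.join(ssh_dir, 'authorized_keys')
    with open(auth_file, 'w') as fd:
        fd.write('\n'.join(key.strip() for key in pub_keys) + '\n')
    os.chmod(auth_file, 0o644)
    return auth_file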

View file

@@ -290,8 +290,7 @@ def try_download_package(dest_file_path: str, package: Pkgbuild, arch: Arch) ->
return None
repo_pkg: RemotePackage = repo.packages[pkgname]
if repo_pkg.version != package.version:
logging.debug(f"Package {pkgname} versions differ: local: {package.version}, "
f"remote: {repo_pkg.version}. Building instead.")
logging.debug(f"Package {pkgname} versions differ: local: {package.version}, remote: {repo_pkg.version}. Building instead.")
return None
if repo_pkg.filename != filename:
versions_str = f"local: {filename}, remote: {repo_pkg.filename}"
@@ -299,20 +298,6 @@ def try_download_package(dest_file_path: str, package: Pkgbuild, arch: Arch) ->
logging.debug(f"package filenames don't match: {versions_str}")
return None
logging.debug(f"ignoring compression extension difference: {versions_str}")
cache_file = os.path.join(config.get_path('pacman'), arch, repo_pkg.filename)
if os.path.exists(cache_file):
if not repo_pkg._desc or 'SHA256SUM' not in repo_pkg._desc:
cache_matches = False
extra_msg = ". However, we can't validate it, as the https repo doesnt provide a SHA256SUM for it."
else:
cache_matches = sha256sum(cache_file) == repo_pkg._desc['SHA256SUM']
extra_msg = (". However its checksum doesn't match." if not cache_matches else " and its checksum matches.")
logging.debug(f"While checking the HTTPS repo DB, we found a matching filename in the pacman cache{extra_msg}")
if cache_matches:
logging.info(f'copying cache file {cache_file} to repo as verified by remote checksum')
shutil.copy(cache_file, dest_file_path)
remove_file(cache_file)
return dest_file_path
url = repo_pkg.resolved_url
assert url
try:
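The cache-validation branch above hinges on a sha256sum() helper; a straightforward reading of it (implementation assumed, chunked so large package archives don't have to fit in memory):

import hashlib

def sha256sum(path: str, chunk_size: int = 1024 * 1024) -> str:
    digest = hashlib.sha256()
    with open(path, 'rb') as fd:
        # Read the file in chunks and feed them into the running digest.
        for chunk in iter(lambda: fd.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()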
@@ -439,11 +424,10 @@ def setup_build_chroot(
extra_packages: list[str] = [],
add_kupfer_repos: bool = True,
clean_chroot: bool = False,
repo: Optional[dict[str, Pkgbuild]] = None,
) -> BuildChroot:
assert config.runtime.arch
if arch != config.runtime.arch:
build_enable_qemu_binfmt(arch, repo=repo or discover_pkgbuilds(), lazy=False)
build_enable_qemu_binfmt(arch, lazy=False)
init_prebuilts(arch)
chroot = get_build_chroot(arch, add_kupfer_repos=add_kupfer_repos)
chroot.mount_packages()
@@ -512,7 +496,6 @@ def build_package(
enable_ccache: bool = True,
clean_chroot: bool = False,
build_user: str = 'kupfer',
repo: Optional[dict[str, Pkgbuild]] = None,
):
makepkg_compile_opts = ['--holdver']
makepkg_conf_path = 'etc/makepkg.conf'
@@ -532,7 +515,6 @@ def build_package(
arch=arch,
extra_packages=deps,
clean_chroot=clean_chroot,
repo=repo,
)
assert config.runtime.arch
native_chroot = target_chroot
@@ -542,7 +524,6 @@ def build_package(
arch=config.runtime.arch,
extra_packages=['base-devel'] + CROSSDIRECT_PKGS,
clean_chroot=clean_chroot,
repo=repo,
)
if not package.mode:
logging.warning(f'Package {package.path} has no _mode set, assuming "host"')
@@ -575,7 +556,7 @@ def build_package(
build_root = target_chroot
makepkg_compile_opts += ['--nodeps' if package.nodeps else '--syncdeps']
env = deepcopy(get_makepkg_env(arch))
if foreign_arch and package.crossdirect and enable_crossdirect and package.name not in CROSSDIRECT_PKGS:
if foreign_arch and enable_crossdirect and package.name not in CROSSDIRECT_PKGS:
env['PATH'] = f"/native/usr/lib/crossdirect/{arch}:{env['PATH']}"
target_chroot.mount_crossdirect(native_chroot)
else:
@@ -761,7 +742,6 @@ def build_packages(
enable_crossdirect=enable_crossdirect,
enable_ccache=enable_ccache,
clean_chroot=clean_chroot,
repo=repo,
)
files += add_package_to_repo(package, arch)
updated_repos.add(package.repo)
@@ -836,20 +816,8 @@ def build_enable_qemu_binfmt(arch: Arch, repo: Optional[dict[str, Pkgbuild]] = N
logging.info('Installing qemu-user (building if necessary)')
check_programs_wrap(['pacman', 'makepkg', 'pacstrap'])
# build qemu-user, binfmt, crossdirect
packages = list(CROSSDIRECT_PKGS)
hostspec = GCC_HOSTSPECS[arch][arch]
cross_gcc = f"{hostspec}-gcc"
if repo:
for pkg in repo.values():
if (pkg.name == cross_gcc or cross_gcc in pkg.provides):
if config.runtime.arch not in pkg.arches:
logging.debug(f"Package {pkg.path} matches {cross_gcc=} name but not arch: {pkg.arches=}")
continue
packages.append(pkg.path)
logging.debug(f"Adding gcc package {pkg.path} to the necessary crosscompilation tools")
break
build_packages_by_paths(
packages,
CROSSDIRECT_PKGS,
native,
repo=repo,
try_download=True,

View file

@@ -313,7 +313,7 @@ def cmd_list():
logging.info(f'Done! {len(packages)} Pkgbuilds:')
for name in sorted(packages.keys()):
p = packages[name]
print(f'name: {p.name}; ver: {p.version}; mode: {p.mode}; crossdirect: {p.crossdirect} provides: {p.provides}; replaces: {p.replaces};'
print(f'name: {p.name}; ver: {p.version}; mode: {p.mode}; provides: {p.provides}; replaces: {p.replaces};'
f'local_depends: {p.local_depends}; depends: {p.depends}')
@@ -346,7 +346,6 @@ def cmd_check(paths):
mode_key = '_mode'
nodeps_key = '_nodeps'
crossdirect_key = '_crossdirect'
pkgbase_key = 'pkgbase'
pkgname_key = 'pkgname'
arches_key = '_arches'
@@ -357,7 +356,6 @@ def cmd_check(paths):
required = {
mode_key: True,
nodeps_key: False,
crossdirect_key: False,
pkgbase_key: False,
pkgname_key: True,
'pkgdesc': False,

View file

@@ -156,7 +156,6 @@ class Pkgbuild(PackageInfo):
repo: str
mode: str
nodeps: bool
crossdirect: bool
path: str
pkgver: str
pkgrel: str
@@ -191,7 +190,6 @@ class Pkgbuild(PackageInfo):
self.repo = repo or ''
self.mode = ''
self.nodeps = False
self.crossdirect = True
self.path = relative_path
self.pkgver = ''
self.pkgrel = ''
@@ -225,7 +223,6 @@ class Pkgbuild(PackageInfo):
self.repo = pkg.repo
self.mode = pkg.mode
self.nodeps = pkg.nodeps
self.crossdirect = pkg.crossdirect
self.path = pkg.path
self.pkgver = pkg.pkgver
self.pkgrel = pkg.pkgrel
@@ -313,11 +310,8 @@ class SubPkgbuild(Pkgbuild):
self.sources_refreshed = False
self.update(pkgbase)
# set to None - will be replaced with base_pkg if still None after parsing
self.depends = None  # type: ignore[assignment]
self.makedepends = None  # type: ignore[assignment]
self.provides = None  # type: ignore[assignment]
self.replaces = None  # type: ignore[assignment]
self.provides = {}
self.replaces = []
def refresh_sources(self, lazy: bool = True):
assert self.pkgbase
@@ -360,11 +354,7 @@ def parse_pkgbuild(
else:
raise Exception(msg)
# if _crossdirect is unset (None), it defaults to True
crossdirect_enabled = srcinfo_cache.build_crossdirect in (None, True)
base_package = Pkgbase(relative_pkg_dir, sources_refreshed=sources_refreshed, srcinfo_cache=srcinfo_cache)
base_package.crossdirect = crossdirect_enabled
base_package.mode = mode
base_package.nodeps = nodeps
base_package.repo = relative_pkg_dir.split('/')[0]
@@ -393,21 +383,13 @@ def parse_pkgbuild(
elif line.startswith('arch'):
current.arches.append(splits[1])
elif line.startswith('provides'):
if not current.provides:
current.provides = {}
current.provides = get_version_specs(splits[1], current.provides)
elif line.startswith('replaces'):
if not current.replaces:
current.replaces = []
current.replaces.append(splits[1])
elif splits[0] in ['depends', 'makedepends', 'checkdepends', 'optdepends']:
spec = splits[1].split(': ', 1)[0]
if not current.depends:
current.depends = (base_package.makedepends or {}).copy()
current.depends = get_version_specs(spec, current.depends)
if splits[0] == 'makedepends':
if not current.makedepends:
current.makedepends = {}
current.makedepends = get_version_specs(spec, current.makedepends)
results: list[Pkgbuild] = list(base_package.subpackages)
@@ -420,15 +402,6 @@ def parse_pkgbuild(
pkg.update_version()
if not (pkg.version == base_package.version):
raise Exception(f'Subpackage malformed! Versions differ! base: {base_package}, subpackage: {pkg}')
if isinstance(pkg, SubPkgbuild):
if pkg.depends is None:
pkg.depends = base_package.depends
if pkg.makedepends is None:
pkg.makedepends = base_package.makedepends
if pkg.replaces is None:
pkg.replaces = base_package.replaces
if pkg.provides is None:
pkg.provides = base_package.provides
return results

View file

@@ -68,19 +68,11 @@ class SrcInitialisedFile(JsonFile):
raise ex
srcinfo_meta_defaults = {
'build_mode': None,
"build_nodeps": None,
"build_crossdirect": None,
}
class SrcinfoMetaFile(JsonFile):
checksums: dict[str, str]
build_mode: Optional[str]
build_nodeps: Optional[bool]
build_crossdirect: Optional[bool]
_changed: bool
_filename: ClassVar[str] = SRCINFO_METADATA_FILE
@@ -100,8 +92,9 @@ class SrcinfoMetaFile(JsonFile):
s = SrcinfoMetaFile({
'_relative_path': relative_pkg_dir,
'_changed': True,
'build_mode': '',
'build_nodeps': None,
'checksums': {},
**srcinfo_meta_defaults,
})
return s, s.refresh_all()
@@ -127,11 +120,9 @@ class SrcinfoMetaFile(JsonFile):
if not force_refresh:
logging.debug(f'{metadata._relative_path}: srcinfo checksums match!')
lines = lines or metadata.read_srcinfo_file()
for build_field in srcinfo_meta_defaults.keys():
for build_field in ['build_mode', 'build_nodeps']:
if build_field not in metadata:
metadata.refresh_build_fields()
if write:
metadata.write()
break
else:
lines = metadata.refresh_all(write=write)
@@ -152,7 +143,8 @@ class SrcinfoMetaFile(JsonFile):
self._changed = True
def refresh_build_fields(self):
self.update(srcinfo_meta_defaults)
self['build_mode'] = None
self['build_nodeps'] = None
with open(os.path.join(config.get_path('pkgbuilds'), self._relative_path, 'PKGBUILD'), 'r') as file:
lines = file.read().split('\n')
for line in lines:
@@ -164,8 +156,6 @@ class SrcinfoMetaFile(JsonFile):
self.build_mode = val
elif key == '_nodeps':
self.build_nodeps = val.lower() == 'true'
elif key == '_crossdirect':
self.build_crossdirect = val.lower() == 'true'
else:
continue
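To make the magic-comment convention concrete, a small self-contained sketch of extracting the _mode / _nodeps (and optionally _crossdirect) assignments from a PKGBUILD, in the spirit of refresh_build_fields() above (the parsing details are assumed for illustration, not copied from the repo):

def parse_build_fields(pkgbuild_text: str) -> dict:
    # Lines like `_mode=cross`, `_nodeps=true`, `_crossdirect=false` near the top
    # of a PKGBUILD configure how kupferbootstrap builds the package.
    fields = {'build_mode': None, 'build_nodeps': None, 'build_crossdirect': None}
    for line in pkgbuild_text.splitlines():
        if '=' not in line or line.lstrip().startswith('#'):
            continue
        key, _, val = line.partition('=')
        key, val = key.strip(), val.strip().strip('"\'')
        if key == '_mode':
            fields['build_mode'] = val
        elif key == '_nodeps':
            fields['build_nodeps'] = val.lower() == 'true'
        elif key == '_crossdirect':
            fields['build_crossdirect'] = val.lower() == 'true'
    return fields

print(parse_build_fields('_mode=cross\n_nodeps=false\n_crossdirect=true\n'))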