image: add LUKS support and --[no-]encryption CLI flag to build & inspect subcommands

This commit is contained in:
InsanePrawn 2023-07-09 03:20:33 +02:00
parent 39b9cebc54
commit 6a7bfbb49b
5 changed files with 236 additions and 49 deletions

View file

@ -171,9 +171,9 @@ def symlink(source, target):
raise Exception(f'Symlink creation of {target} pointing at {source} failed') raise Exception(f'Symlink creation of {target} pointing at {source} failed')
def get_temp_dir(register_cleanup=True, mode: int = 0o0755): def get_temp_dir(register_cleanup=True, mode: int = 0o0755, prefix='kupfertmp_'):
"create a new tempdir and sanitize ownership so root can access user files as god intended" "create a new tempdir and sanitize ownership so root can access user files as god intended"
t = mkdtemp() t = mkdtemp(prefix=prefix)
chmod(t, mode, privileged=False) chmod(t, mode, privileged=False)
if register_cleanup: if register_cleanup:
atexit.register(remove_file, t, recursive=True) atexit.register(remove_file, t, recursive=True)

View file

@ -6,7 +6,7 @@ from signal import pause
from typing import Optional from typing import Optional
from config.state import config, Profile from config.state import config, Profile
from constants import BASE_LOCAL_PACKAGES, BASE_PACKAGES from constants import BASE_LOCAL_PACKAGES, BASE_PACKAGES, LUKS_MAPPER_DEFAULT
from devices.device import get_profile_device from devices.device import get_profile_device
from exec.file import makedir from exec.file import makedir
from flavours.flavour import get_profile_flavour from flavours.flavour import get_profile_flavour
@ -14,6 +14,7 @@ from packages.build import build_enable_qemu_binfmt, build_packages, filter_pkgb
from wrapper import enforce_wrap from wrapper import enforce_wrap
from .boot import cmd_boot from .boot import cmd_boot
from .cryptsetup import encryption_option, get_cryptmapper_path, luks_close, luks_create, luks_open
from .flash import cmd_flash from .flash import cmd_flash
from .image import ( from .image import (
IMG_FILE_BOOT_DEFAULT_SIZE, IMG_FILE_BOOT_DEFAULT_SIZE,
@ -84,6 +85,7 @@ sectorsize_option = click.option(
default=False, default=False,
is_flag=True, is_flag=True,
) )
@encryption_option
@sectorsize_option @sectorsize_option
def cmd_build( def cmd_build(
profile_name: Optional[str] = None, profile_name: Optional[str] = None,
@ -93,6 +95,9 @@ def cmd_build(
block_target: Optional[str] = None, block_target: Optional[str] = None,
sector_size: Optional[int] = None, sector_size: Optional[int] = None,
skip_part_images: bool = False, skip_part_images: bool = False,
encryption: Optional[bool] = None,
encryption_password: Optional[str] = None,
encryption_mapper: str = LUKS_MAPPER_DEFAULT,
): ):
""" """
Build a device image. Build a device image.
@ -110,9 +115,15 @@ def cmd_build(
flavour = get_profile_flavour(profile_name) flavour = get_profile_flavour(profile_name)
rootfs_size_mb = flavour.parse_flavourinfo().rootfs_size * 1000 + int(profile.size_extra_mb) rootfs_size_mb = flavour.parse_flavourinfo().rootfs_size * 1000 + int(profile.size_extra_mb)
if encryption is None:
encryption = profile.encryption
packages = BASE_LOCAL_PACKAGES + [device.package.name, flavour.pkgbuild.name] packages = BASE_LOCAL_PACKAGES + [device.package.name, flavour.pkgbuild.name]
packages_extra = BASE_PACKAGES + profile.pkgs_include packages_extra = BASE_PACKAGES + profile.pkgs_include
if encryption:
packages_extra += ['cryptsetup', 'util-linux'] # TODO: select osk-sdl here somehow
if arch != config.runtime.arch: if arch != config.runtime.arch:
build_enable_qemu_binfmt(arch) build_enable_qemu_binfmt(arch)
@ -140,6 +151,7 @@ def cmd_build(
boot_dev: str boot_dev: str
root_dev: str root_dev: str
root_dev_raw: str
loop_boot = loop_device + 'p1' loop_boot = loop_device + 'p1'
loop_root = loop_device + 'p2' loop_root = loop_device + 'p2'
if skip_part_images: if skip_part_images:
@ -150,8 +162,23 @@ def cmd_build(
boot_dev = create_img_file(get_image_path(device, flavour, 'boot'), IMG_FILE_BOOT_DEFAULT_SIZE) boot_dev = create_img_file(get_image_path(device, flavour, 'boot'), IMG_FILE_BOOT_DEFAULT_SIZE)
root_dev = create_img_file(get_image_path(device, flavour, 'root'), f'{rootfs_size_mb - 200}M') root_dev = create_img_file(get_image_path(device, flavour, 'root'), f'{rootfs_size_mb - 200}M')
create_boot_fs(boot_dev, sector_size) root_dev_raw = root_dev
if encryption:
encryption_password = encryption_password or profile.encryption_password
if not encryption_password:
encryption_password = click.prompt(
"Please enter your encryption password (input hidden)",
hide_input=True,
confirmation_prompt=True,
)
luks_create(root_dev, password=encryption_password)
luks_open(root_dev, mapper_name=encryption_mapper, password=encryption_password)
root_dev = get_cryptmapper_path(encryption_mapper)
assert os.path.exists(root_dev)
create_root_fs(root_dev, sector_size) create_root_fs(root_dev, sector_size)
create_boot_fs(boot_dev, sector_size)
install_rootfs( install_rootfs(
root_dev, root_dev,
@ -162,14 +189,17 @@ def cmd_build(
list(set(packages) | set(packages_extra)), list(set(packages) | set(packages_extra)),
local_repos, local_repos,
profile, profile,
encrypted=bool(encryption),
) )
if encryption:
luks_close(mapper_name=encryption_mapper)
if not skip_part_images: if not skip_part_images:
logging.info('Copying partition image files into full image:') logging.info('Copying partition image files into full image:')
logging.info(f'Block-copying /boot to {image_path}') logging.info(f'Block-copying /boot to {image_path}')
dd_image(input=boot_dev, output=loop_boot) dd_image(input=boot_dev, output=loop_boot)
logging.info(f'Block-copying rootfs to {image_path}') logging.info(f'Block-copying rootfs to {image_path}')
dd_image(input=root_dev, output=loop_root) dd_image(input=root_dev_raw, output=loop_root)
logging.info(f'Done! Image saved to {image_path}') logging.info(f'Done! Image saved to {image_path}')
@ -178,17 +208,20 @@ def cmd_build(
@click.option('--shell', '-s', is_flag=True) @click.option('--shell', '-s', is_flag=True)
@click.option('--use-local-repos', '-l', is_flag=True) @click.option('--use-local-repos', '-l', is_flag=True)
@sectorsize_option @sectorsize_option
@encryption_option
@click.argument('profile', required=False) @click.argument('profile', required=False)
def cmd_inspect( def cmd_inspect(
profile: Optional[str] = None, profile: Optional[str] = None,
shell: bool = False, shell: bool = False,
sector_size: Optional[int] = None, sector_size: Optional[int] = None,
use_local_repos: bool = False, use_local_repos: bool = False,
encryption: Optional[bool] = None,
): ):
"""Loop-mount the device image for inspection.""" """Loop-mount the device image for inspection."""
config.enforce_profile_device_set() config.enforce_profile_device_set()
config.enforce_profile_flavour_set() config.enforce_profile_flavour_set()
enforce_wrap() enforce_wrap()
profile_conf = config.get_profile(profile)
device = get_profile_device(profile) device = get_profile_device(profile)
arch = device.arch arch = device.arch
flavour = get_profile_flavour(profile).name flavour = get_profile_flavour(profile).name
@ -198,7 +231,7 @@ def cmd_inspect(
image_path = get_image_path(device, flavour) image_path = get_image_path(device, flavour)
loop_device = losetup_rootfs_image(image_path, sector_size) loop_device = losetup_rootfs_image(image_path, sector_size)
partprobe(loop_device) partprobe(loop_device)
mount_chroot(loop_device + 'p2', loop_device + 'p1', chroot) mount_chroot(loop_device + 'p2', loop_device + 'p1', chroot, password=profile_conf.encryption_password)
logging.info(f'Inspect the rootfs image at {chroot.path}') logging.info(f'Inspect the rootfs image at {chroot.path}')

View file

@ -1,4 +1,5 @@
import atexit import atexit
import click
import logging import logging
import os import os
@ -8,6 +9,14 @@ from constants import LUKS_LABEL_DEFAULT
from chroot.build import BuildChroot from chroot.build import BuildChroot
from exec.cmd import run_cmd, CompletedProcess from exec.cmd import run_cmd, CompletedProcess
encryption_option = click.option(
'--encryption/--no-encryption',
help="Force applying/ignoring LUKS encryption when handling the device image."
"Defaults to using the Profile's setting.",
default=None,
is_flag=True,
)
def get_accessible_user(path): def get_accessible_user(path):
return None if os.access(path, os.R_OK) else 'root' return None if os.access(path, os.R_OK) else 'root'
@ -47,6 +56,38 @@ def is_luks(device_path: str, native_chroot: Optional[BuildChroot] = None) -> bo
return bool(result.stdout and result.stdout.strip()) return bool(result.stdout and result.stdout.strip())
def get_luks_offset(
mapper_name: str,
native_chroot: Optional[BuildChroot] = None,
) -> tuple[int, int]:
device_path = get_cryptmapper_path(mapper_name)
check_dev_exists(device_path, 'get offset of')
run_func = native_chroot.run_cmd if native_chroot else run_cmd
user = get_accessible_user(device_path)
stdout: str = ''
cmd = ['cryptsetup', 'status', mapper_name]
result = run_func(cmd, capture_output=True, switch_user=user) # type: ignore[operator]
assert isinstance(result, CompletedProcess)
if not (result.stdout and (stdout := result.stdout.strip())):
raise Exception(f"Couldn't get LUKS offset for {mapper_name!r} from 'cryptsetup status': empty stdout: {stdout!r}")
markers = {'offset': -1, 'sector size': -1}
for line in stdout.decode().split('\n'):
line = line.strip()
for item in markers:
offset_marker = f'{item}:'
if line.startswith(offset_marker):
try:
markers[item] = int(line.split(offset_marker)[-1].strip().split(' ')[0])
except Exception as ex:
raise Exception(f"Couldn't get LUKS {item=} for {mapper_name!r} due to an exception parsing cryptsetup output: {ex}")
for i in markers.values():
if i != -1:
continue
logging.debug(f"Failed to find ':' in stdout: {stdout}")
raise Exception(f"Failed to find LUKS offset for {mapper_name!r}: Offset line not found")
return markers['offset'], markers['sector size']
def luks_create( def luks_create(
backing_device: str, backing_device: str,
label: str = LUKS_LABEL_DEFAULT, label: str = LUKS_LABEL_DEFAULT,

View file

@ -5,6 +5,7 @@ import logging
from typing import Optional from typing import Optional
from config.state import config
from constants import FLASH_PARTS, LOCATIONS, FASTBOOT, JUMPDRIVE from constants import FLASH_PARTS, LOCATIONS, FASTBOOT, JUMPDRIVE
from exec.cmd import run_root_cmd from exec.cmd import run_root_cmd
from exec.file import get_temp_dir from exec.file import get_temp_dir
@ -15,6 +16,7 @@ from wrapper import enforce_wrap
from .fastboot import fastboot_flash from .fastboot import fastboot_flash
from .image import dd_image, dump_aboot, dump_lk2nd, dump_qhypstub, get_image_path, losetup_destroy, losetup_rootfs_image, partprobe, shrink_fs from .image import dd_image, dump_aboot, dump_lk2nd, dump_qhypstub, get_image_path, losetup_destroy, losetup_rootfs_image, partprobe, shrink_fs
from .cryptsetup import encryption_option
ABOOT = FLASH_PARTS['ABOOT'] ABOOT = FLASH_PARTS['ABOOT']
LK2ND = FLASH_PARTS['LK2ND'] LK2ND = FLASH_PARTS['LK2ND']
@ -47,7 +49,7 @@ def test_blockdev(path: str):
'microSD inserted or no microSD card slot installed in the device) or corrupt or defect') 'microSD inserted or no microSD card slot installed in the device) or corrupt or defect')
def prepare_minimal_image(source_path: str, sector_size: int) -> str: def prepare_minimal_image(source_path: str, sector_size: int, encrypted: Optional[bool], encryption_password: Optional[str]) -> str:
minimal_image_dir = get_temp_dir(register_cleanup=True) minimal_image_dir = get_temp_dir(register_cleanup=True)
minimal_image_path = os.path.join(minimal_image_dir, f'minimal-{os.path.basename(source_path)}') minimal_image_path = os.path.join(minimal_image_dir, f'minimal-{os.path.basename(source_path)}')
logging.info(f"Copying image {os.path.basename(source_path)} to {minimal_image_dir} for shrinking") logging.info(f"Copying image {os.path.basename(source_path)} to {minimal_image_dir} for shrinking")
@ -55,7 +57,7 @@ def prepare_minimal_image(source_path: str, sector_size: int) -> str:
loop_device = losetup_rootfs_image(minimal_image_path, sector_size) loop_device = losetup_rootfs_image(minimal_image_path, sector_size)
partprobe(loop_device) partprobe(loop_device)
shrink_fs(loop_device, minimal_image_path, sector_size) shrink_fs(loop_device, minimal_image_path, sector_size, encrypted, encryption_password)
losetup_destroy(loop_device) losetup_destroy(loop_device)
return minimal_image_path return minimal_image_path
@ -67,6 +69,7 @@ def prepare_minimal_image(source_path: str, sector_size: int) -> str:
@click.option('--shrink/--no-shrink', is_flag=True, default=True, help="Copy and shrink the image file to minimal size") @click.option('--shrink/--no-shrink', is_flag=True, default=True, help="Copy and shrink the image file to minimal size")
@click.option('-b', '--sector-size', type=int, help="Override the device's sector size", default=None) @click.option('-b', '--sector-size', type=int, help="Override the device's sector size", default=None)
@click.option('--confirm', is_flag=True, help="Ask for confirmation before executing fastboot commands") @click.option('--confirm', is_flag=True, help="Ask for confirmation before executing fastboot commands")
@encryption_option
@click.argument('what', type=click.Choice(list(FLASH_PARTS.values()))) @click.argument('what', type=click.Choice(list(FLASH_PARTS.values())))
@click.argument('location', type=str, required=False) @click.argument('location', type=str, required=False)
def cmd_flash( def cmd_flash(
@ -78,6 +81,7 @@ def cmd_flash(
shrink: bool = True, shrink: bool = True,
sector_size: Optional[int] = None, sector_size: Optional[int] = None,
confirm: bool = False, confirm: bool = False,
encryption: Optional[bool] = None,
): ):
""" """
Flash a partition onto a device. Flash a partition onto a device.
@ -115,7 +119,12 @@ def cmd_flash(
if not location: if not location:
raise Exception(f'You need to specify a location to flash {what} to') raise Exception(f'You need to specify a location to flash {what} to')
path = '' path = ''
image_path = prepare_minimal_image(device_image_path, sector_size) if shrink else device_image_path image_path = prepare_minimal_image(
device_image_path,
sector_size,
encrypted=encryption,
encryption_password=config.get_profile(profile).encryption_password,
) if shrink else device_image_path
if method == FASTBOOT: if method == FASTBOOT:
fastboot_flash( fastboot_flash(
partition=location, partition=location,

View file

@ -7,15 +7,21 @@ import subprocess
from subprocess import CompletedProcess from subprocess import CompletedProcess
from typing import Optional, Union from typing import Optional, Union
from config.state import config, Profile from chroot.build import BuildChroot, get_build_chroot
from chroot.device import DeviceChroot, get_device_chroot from chroot.device import DeviceChroot, get_device_chroot
from constants import Arch, POST_INSTALL_CMDS from config.state import config, Profile
from constants import Arch, LUKS_MAPPER_DEFAULT, POST_INSTALL_CMDS
from distro.distro import get_base_distro, get_kupfer_https from distro.distro import get_base_distro, get_kupfer_https
from devices.device import Device from devices.device import Device
from exec.cmd import run_root_cmd, generate_cmd_su from exec.cmd import run_root_cmd, generate_cmd_su
from exec.file import get_temp_dir, root_write_file, root_makedir from exec.file import get_temp_dir, root_write_file, root_makedir
from flavours.flavour import Flavour from flavours.flavour import Flavour
from net.ssh import copy_ssh_keys from net.ssh import copy_ssh_keys
from utils import programs_available
from .cryptsetup import is_luks, get_luks_offset, luks_close, luks_open
MAPPER_DIR = '/dev/mapper/'
# image files need to be slightly smaller than partitions to fit # image files need to be slightly smaller than partitions to fit
IMG_FILE_ROOT_DEFAULT_SIZE = "1800M" IMG_FILE_ROOT_DEFAULT_SIZE = "1800M"
@ -72,24 +78,58 @@ def align_bytes(size_bytes: int, alignment: int = 4096) -> int:
return size_bytes return size_bytes
def shrink_fs(loop_device: str, file: str, sector_size: int): def shrink_fs(
loop_device: str,
file: str,
sector_size: int,
encrypted: Optional[bool] = None,
encryption_password: Optional[str] = None,
crypt_mapper=LUKS_MAPPER_DEFAULT,
):
partprobe(loop_device) partprobe(loop_device)
logging.debug(f"Checking filesystem at {loop_device}p2") root_partition = f'{loop_device}p2'
result = run_root_cmd(['e2fsck', '-fy', f'{loop_device}p2']) root_partition_fs = root_partition
if not (encrypted is False):
root_partition_fs, native_chroot, encrypted = resolve_rootfs_crypt(
root_partition,
fail_on_unencrypted=bool(encrypted),
crypt_mapper=crypt_mapper,
password=encryption_password,
)
logging.debug(f"Checking filesystem at {root_partition_fs}")
result = run_root_cmd(['e2fsck', '-fy', root_partition_fs])
if result.returncode > 2: if result.returncode > 2:
# https://man7.org/linux/man-pages/man8/e2fsck.8.html#EXIT_CODE # https://man7.org/linux/man-pages/man8/e2fsck.8.html#EXIT_CODE
raise Exception(f'Failed to e2fsck {loop_device}p2 with exit code {result.returncode}') raise Exception(f'Failed to e2fsck {root_partition_fs} with exit code {result.returncode}')
logging.info(f'Shrinking filesystem at {loop_device}p2') logging.info(f'Shrinking filesystem at {root_partition_fs}')
result = run_root_cmd(['resize2fs', '-M', f'{loop_device}p2']) result = run_root_cmd(['resize2fs', '-M', root_partition_fs])
if result.returncode != 0: if result.returncode != 0:
raise Exception(f'Failed to resize2fs {loop_device}p2') raise Exception(f'Failed to resize2fs {root_partition_fs}')
logging.debug(f'Reading size of shrunken filesystem on {loop_device}p2') logging.debug(f'Reading size of shrunken filesystem on {root_partition_fs}')
fs_blocks, fs_block_size = get_fs_size(f'{loop_device}p2') fs_blocks, fs_block_size = get_fs_size(root_partition_fs)
sectors = bytes_to_sectors(fs_blocks * fs_block_size, sector_size) sectors = bytes_to_sectors(fs_blocks * fs_block_size, sector_size)
logging.debug(f"shrunken FS length is {fs_blocks} blocks * {fs_block_size} bytes = {sectors} bytes")
logging.info(f'Shrinking partition at {loop_device}p2 to {sectors} sectors ({sectors * sector_size} bytes)') _, image_size = find_end_sector(loop_device, root_partition, sector_size)
if image_size == -1:
raise Exception(f'Failed to find pre-repartition size of {loop_device}')
if encrypted:
if sectors > image_size:
raise Exception("Shrunk FS size allegedly larger than the image itself; this is probably "
f"a kupferbootstrap parsing bug. shrunk partition end={sectors}, image size={image_size}, {sector_size=}")
old_sectors = sectors
luks_offset, luks_sector_size = get_luks_offset(crypt_mapper, native_chroot)
#luks_offset_bytes = align_bytes((luks_offset + 1) * luks_sector_size, sector_size)
luks_offset_normalized = bytes_to_sectors(luks_offset * luks_sector_size, sector_size)
logging.debug(f"Discovered LUKS attrs: {luks_offset=}, {luks_sector_size=}, {luks_offset_normalized=}")
luks_close(crypt_mapper, native_chroot)
sectors += luks_offset_normalized + 1024
logging.debug(f"Increasing sectors from {old_sectors} to {sectors} ({sectors - old_sectors}) to leave space for the LUKS header")
logging.info(f'Shrinking partition at {root_partition} to {sectors} {sector_size}b sectors ({sectors * sector_size} bytes)')
child_proccess = subprocess.Popen( child_proccess = subprocess.Popen(
generate_cmd_su(['fdisk', '-b', str(sector_size), loop_device], switch_user='root'), # type: ignore generate_cmd_su(['fdisk', '-b', str(sector_size), loop_device], switch_user='root'), # type: ignore
stdin=subprocess.PIPE, stdin=subprocess.PIPE,
@ -113,27 +153,18 @@ def shrink_fs(loop_device: str, file: str, sector_size: int):
# For some reason re-reading the partition table fails, but that is not a problem # For some reason re-reading the partition table fails, but that is not a problem
partprobe(loop_device) partprobe(loop_device)
if returncode > 1: if returncode > 1:
raise Exception(f'Failed to shrink partition size of {loop_device}p2 with fdisk') raise Exception(f'Failed to shrink partition size of {root_partition} with fdisk')
partprobe(loop_device).check_returncode() partprobe(loop_device).check_returncode()
logging.debug(f'Finding end sector of partition at {loop_device}p2') end_sector, _ = find_end_sector(loop_device, root_partition, sector_size)
result = run_root_cmd(['fdisk', '-b', str(sector_size), '-l', loop_device], capture_output=True) if end_sector == -1:
if result.returncode != 0: raise Exception(f'Failed to find end sector of {root_partition}')
print(result.stdout)
print(result.stderr)
raise Exception(f'Failed to fdisk -l {loop_device}')
end_sector = 0 if end_sector > image_size:
for line in result.stdout.decode('utf-8').split('\n'): logging.warning(f"Clipping sectors ({end_sector}) to {image_size=}")
if line.startswith(f'{loop_device}p2'): end_sector = image_size
parts = list(filter(lambda part: part != '', line.split(' '))) end_size = align_bytes((end_sector + 1024) * sector_size, 4096)
end_sector = int(parts[2])
if end_sector == 0:
raise Exception(f'Failed to find end sector of {loop_device}p2')
end_size = align_bytes((end_sector + 1) * sector_size, 4096)
logging.debug(f'({end_sector} + 1) sectors * {sector_size} bytes/sector = {end_size} bytes') logging.debug(f'({end_sector} + 1) sectors * {sector_size} bytes/sector = {end_size} bytes')
logging.info(f'Truncating {file} to {end_size} bytes') logging.info(f'Truncating {file} to {end_size} bytes')
@ -143,6 +174,26 @@ def shrink_fs(loop_device: str, file: str, sector_size: int):
partprobe(loop_device) partprobe(loop_device)
def find_end_sector(device: str, partition: str, sector_size: int) -> tuple[int, int]:
"""Return (last_sector_index, sector_count) of a partition on a device, returns (-1, -1) if not found"""
logging.debug(f'Finding end sector of partition at {partition}')
result = run_root_cmd(['fdisk', '-b', str(sector_size), '-l', device], capture_output=True)
if result.returncode != 0:
print(result.stdout)
print(result.stderr)
raise Exception(f'Failed to fdisk -l {device}')
end_sector = -1
num_sectors = -1
for line in result.stdout.decode('utf-8').split('\n'):
if line.startswith(partition):
parts = list(filter(lambda part: part != '', line.split(' ')))
end_sector = int(parts[2])
num_sectors = int(parts[3])
return end_sector, num_sectors
def losetup_destroy(loop_device): def losetup_destroy(loop_device):
logging.debug(f'Destroying loop device {loop_device}') logging.debug(f'Destroying loop device {loop_device}')
run_root_cmd( run_root_cmd(
@ -210,16 +261,59 @@ def losetup_rootfs_image(image_path: str, sector_size: int) -> str:
return loop_device return loop_device
def mount_chroot(rootfs_source: str, boot_src: str, chroot: DeviceChroot): def resolve_rootfs_crypt(
logging.debug(f'Mounting {rootfs_source} at {chroot.path}') rootfs_source: str,
password: Optional[str] = None,
crypt_mapper: str = LUKS_MAPPER_DEFAULT,
native_chroot: Optional[BuildChroot] = None,
fail_on_unencrypted: bool = True,
) -> tuple[str, Optional[BuildChroot], bool]:
assert config.runtime.arch
is_encrypted = False
if not (native_chroot or programs_available(['blkid'])):
native_chroot = get_build_chroot(config.runtime.arch, packages=['base', 'util-linux'])
if is_luks(rootfs_source, native_chroot=native_chroot):
if not (native_chroot or programs_available(['cryptsetup'])):
native_chroot = get_build_chroot(config.runtime.arch, packages=['base', 'cryptsetup', 'util-linux'])
luks_open(rootfs_source, crypt_mapper, password=password, native_chroot=native_chroot)
rootfs_source = f'{MAPPER_DIR}{crypt_mapper}'
is_encrypted = True
elif fail_on_unencrypted:
hint = ''
if rootfs_source.startswith(MAPPER_DIR):
hint = (f' HINT: path starts with {MAPPER_DIR!r}, probably already a decrypted volume.'
' This is likely a kupferbootstrap bug.')
raise Exception(f"Error: {rootfs_source!r} is not an encrypted LUKS volume.{hint}")
return rootfs_source, native_chroot, is_encrypted
chroot.mount_rootfs(rootfs_source)
assert (os.path.ismount(chroot.path))
root_makedir(chroot.get_path('boot')) def mount_chroot(
rootfs_source: str,
boot_src: str,
device_chroot: DeviceChroot,
encrypted: Optional[bool] = None,
password: Optional[str] = None,
native_chroot: Optional[BuildChroot] = None,
crypt_mapper: str = LUKS_MAPPER_DEFAULT,
):
if encrypted is not False:
rootfs_source, native_chroot, encrypted = resolve_rootfs_crypt(
rootfs_source,
native_chroot=native_chroot,
crypt_mapper=crypt_mapper,
fail_on_unencrypted=bool(encrypted),
password=password,
)
logging.debug(f'Mounting {boot_src} at {chroot.path}/boot') logging.debug(f'Mounting {rootfs_source} at {device_chroot.path}')
chroot.mount(boot_src, '/boot', options=['defaults'])
device_chroot.mount_rootfs(rootfs_source)
assert (os.path.ismount(device_chroot.path))
root_makedir(device_chroot.get_path('boot'))
logging.debug(f'Mounting {boot_src} at {device_chroot.path}/boot')
device_chroot.mount(boot_src, '/boot', options=['defaults'])
def dump_file_from_image(image_path: str, file_path: str, target_path: Optional[str] = None): def dump_file_from_image(image_path: str, file_path: str, target_path: Optional[str] = None):
@ -314,18 +408,28 @@ def install_rootfs(
packages: list[str], packages: list[str],
use_local_repos: bool, use_local_repos: bool,
profile: Profile, profile: Profile,
encrypted: bool,
): ):
user = profile['username'] or 'kupfer' user = profile.username or 'kupfer'
chroot = get_device_chroot(device=get_device_name(device), flavour=flavour.name, arch=arch, packages=packages, use_local_repos=use_local_repos) chroot = get_device_chroot(device=get_device_name(device), flavour=flavour.name, arch=arch, packages=packages, use_local_repos=use_local_repos)
mount_chroot(rootfs_device, bootfs_device, chroot) # rootfs_device must be passed the crypt_mapper if encrypted is True
if encrypted:
assert rootfs_device.startswith(MAPPER_DIR)
mount_chroot(
rootfs_device,
bootfs_device,
chroot,
encrypted=False, # rootfs_device is already the crypt_mapper
)
chroot.mount_pacman_cache() chroot.mount_pacman_cache()
chroot.initialize() chroot.initialize()
chroot.activate() chroot.activate()
chroot.create_user( chroot.create_user(
user=user, user=user,
password=profile['password'], password=profile.password,
) )
chroot.add_sudo_config(config_name='wheel', privilegee='%wheel', password_required=True) chroot.add_sudo_config(config_name='wheel', privilegee='%wheel', password_required=True)
copy_ssh_keys( copy_ssh_keys(
@ -338,7 +442,7 @@ def install_rootfs(
extra_repos=get_kupfer_https(arch).repos, extra_repos=get_kupfer_https(arch).repos,
in_chroot=True, in_chroot=True,
), ),
'etc/hostname': profile['hostname'] or 'kupfer', 'etc/hostname': profile.hostname or 'kupfer',
} }
for target, content in files.items(): for target, content in files.items():
root_write_file(os.path.join(chroot.path, target.lstrip('/')), content) root_write_file(os.path.join(chroot.path, target.lstrip('/')), content)