diff --git a/exec/file.py b/exec/file.py index 852ad48..167d3f8 100644 --- a/exec/file.py +++ b/exec/file.py @@ -171,9 +171,9 @@ def symlink(source, target): raise Exception(f'Symlink creation of {target} pointing at {source} failed') -def get_temp_dir(register_cleanup=True, mode: int = 0o0755): +def get_temp_dir(register_cleanup=True, mode: int = 0o0755, prefix='kupfertmp_'): "create a new tempdir and sanitize ownership so root can access user files as god intended" - t = mkdtemp() + t = mkdtemp(prefix=prefix) chmod(t, mode, privileged=False) if register_cleanup: atexit.register(remove_file, t, recursive=True) diff --git a/image/cli.py b/image/cli.py index c4b4143..3833339 100644 --- a/image/cli.py +++ b/image/cli.py @@ -6,7 +6,7 @@ from signal import pause from typing import Optional from config.state import config, Profile -from constants import BASE_LOCAL_PACKAGES, BASE_PACKAGES +from constants import BASE_LOCAL_PACKAGES, BASE_PACKAGES, LUKS_MAPPER_DEFAULT from devices.device import get_profile_device from exec.file import makedir from flavours.flavour import get_profile_flavour @@ -14,6 +14,7 @@ from packages.build import build_enable_qemu_binfmt, build_packages, filter_pkgb from wrapper import enforce_wrap from .boot import cmd_boot +from .cryptsetup import encryption_option, get_cryptmapper_path, luks_close, luks_create, luks_open from .flash import cmd_flash from .image import ( IMG_FILE_BOOT_DEFAULT_SIZE, @@ -84,6 +85,7 @@ sectorsize_option = click.option( default=False, is_flag=True, ) +@encryption_option @sectorsize_option def cmd_build( profile_name: Optional[str] = None, @@ -93,6 +95,9 @@ def cmd_build( block_target: Optional[str] = None, sector_size: Optional[int] = None, skip_part_images: bool = False, + encryption: Optional[bool] = None, + encryption_password: Optional[str] = None, + encryption_mapper: str = LUKS_MAPPER_DEFAULT, ): """ Build a device image. @@ -110,9 +115,15 @@ def cmd_build( flavour = get_profile_flavour(profile_name) rootfs_size_mb = flavour.parse_flavourinfo().rootfs_size * 1000 + int(profile.size_extra_mb) + if encryption is None: + encryption = profile.encryption + packages = BASE_LOCAL_PACKAGES + [device.package.name, flavour.pkgbuild.name] packages_extra = BASE_PACKAGES + profile.pkgs_include + if encryption: + packages_extra += ['cryptsetup', 'util-linux'] # TODO: select osk-sdl here somehow + if arch != config.runtime.arch: build_enable_qemu_binfmt(arch) @@ -140,6 +151,7 @@ def cmd_build( boot_dev: str root_dev: str + root_dev_raw: str loop_boot = loop_device + 'p1' loop_root = loop_device + 'p2' if skip_part_images: @@ -150,8 +162,23 @@ def cmd_build( boot_dev = create_img_file(get_image_path(device, flavour, 'boot'), IMG_FILE_BOOT_DEFAULT_SIZE) root_dev = create_img_file(get_image_path(device, flavour, 'root'), f'{rootfs_size_mb - 200}M') - create_boot_fs(boot_dev, sector_size) + root_dev_raw = root_dev + + if encryption: + encryption_password = encryption_password or profile.encryption_password + if not encryption_password: + encryption_password = click.prompt( + "Please enter your encryption password (input hidden)", + hide_input=True, + confirmation_prompt=True, + ) + luks_create(root_dev, password=encryption_password) + luks_open(root_dev, mapper_name=encryption_mapper, password=encryption_password) + root_dev = get_cryptmapper_path(encryption_mapper) + + assert os.path.exists(root_dev) create_root_fs(root_dev, sector_size) + create_boot_fs(boot_dev, sector_size) install_rootfs( root_dev, @@ -162,14 +189,17 @@ def cmd_build( list(set(packages) | set(packages_extra)), local_repos, profile, + encrypted=bool(encryption), ) + if encryption: + luks_close(mapper_name=encryption_mapper) if not skip_part_images: logging.info('Copying partition image files into full image:') logging.info(f'Block-copying /boot to {image_path}') dd_image(input=boot_dev, output=loop_boot) logging.info(f'Block-copying rootfs to {image_path}') - dd_image(input=root_dev, output=loop_root) + dd_image(input=root_dev_raw, output=loop_root) logging.info(f'Done! Image saved to {image_path}') @@ -178,17 +208,20 @@ def cmd_build( @click.option('--shell', '-s', is_flag=True) @click.option('--use-local-repos', '-l', is_flag=True) @sectorsize_option +@encryption_option @click.argument('profile', required=False) def cmd_inspect( profile: Optional[str] = None, shell: bool = False, sector_size: Optional[int] = None, use_local_repos: bool = False, + encryption: Optional[bool] = None, ): """Loop-mount the device image for inspection.""" config.enforce_profile_device_set() config.enforce_profile_flavour_set() enforce_wrap() + profile_conf = config.get_profile(profile) device = get_profile_device(profile) arch = device.arch flavour = get_profile_flavour(profile).name @@ -198,7 +231,7 @@ def cmd_inspect( image_path = get_image_path(device, flavour) loop_device = losetup_rootfs_image(image_path, sector_size) partprobe(loop_device) - mount_chroot(loop_device + 'p2', loop_device + 'p1', chroot) + mount_chroot(loop_device + 'p2', loop_device + 'p1', chroot, password=profile_conf.encryption_password) logging.info(f'Inspect the rootfs image at {chroot.path}') diff --git a/image/cryptsetup.py b/image/cryptsetup.py index b7edce7..d4dfaa7 100644 --- a/image/cryptsetup.py +++ b/image/cryptsetup.py @@ -1,4 +1,5 @@ import atexit +import click import logging import os @@ -8,6 +9,14 @@ from constants import LUKS_LABEL_DEFAULT from chroot.build import BuildChroot from exec.cmd import run_cmd, CompletedProcess +encryption_option = click.option( + '--encryption/--no-encryption', + help="Force applying/ignoring LUKS encryption when handling the device image." + "Defaults to using the Profile's setting.", + default=None, + is_flag=True, +) + def get_accessible_user(path): return None if os.access(path, os.R_OK) else 'root' @@ -47,6 +56,38 @@ def is_luks(device_path: str, native_chroot: Optional[BuildChroot] = None) -> bo return bool(result.stdout and result.stdout.strip()) +def get_luks_offset( + mapper_name: str, + native_chroot: Optional[BuildChroot] = None, +) -> tuple[int, int]: + device_path = get_cryptmapper_path(mapper_name) + check_dev_exists(device_path, 'get offset of') + run_func = native_chroot.run_cmd if native_chroot else run_cmd + user = get_accessible_user(device_path) + stdout: str = '' + cmd = ['cryptsetup', 'status', mapper_name] + result = run_func(cmd, capture_output=True, switch_user=user) # type: ignore[operator] + assert isinstance(result, CompletedProcess) + if not (result.stdout and (stdout := result.stdout.strip())): + raise Exception(f"Couldn't get LUKS offset for {mapper_name!r} from 'cryptsetup status': empty stdout: {stdout!r}") + markers = {'offset': -1, 'sector size': -1} + for line in stdout.decode().split('\n'): + line = line.strip() + for item in markers: + offset_marker = f'{item}:' + if line.startswith(offset_marker): + try: + markers[item] = int(line.split(offset_marker)[-1].strip().split(' ')[0]) + except Exception as ex: + raise Exception(f"Couldn't get LUKS {item=} for {mapper_name!r} due to an exception parsing cryptsetup output: {ex}") + for i in markers.values(): + if i != -1: + continue + logging.debug(f"Failed to find ':' in stdout: {stdout}") + raise Exception(f"Failed to find LUKS offset for {mapper_name!r}: Offset line not found") + return markers['offset'], markers['sector size'] + + def luks_create( backing_device: str, label: str = LUKS_LABEL_DEFAULT, diff --git a/image/flash.py b/image/flash.py index 5def17c..9f11ed3 100644 --- a/image/flash.py +++ b/image/flash.py @@ -5,6 +5,7 @@ import logging from typing import Optional +from config.state import config from constants import FLASH_PARTS, LOCATIONS, FASTBOOT, JUMPDRIVE from exec.cmd import run_root_cmd from exec.file import get_temp_dir @@ -15,6 +16,7 @@ from wrapper import enforce_wrap from .fastboot import fastboot_flash from .image import dd_image, dump_aboot, dump_lk2nd, dump_qhypstub, get_image_path, losetup_destroy, losetup_rootfs_image, partprobe, shrink_fs +from .cryptsetup import encryption_option ABOOT = FLASH_PARTS['ABOOT'] LK2ND = FLASH_PARTS['LK2ND'] @@ -47,7 +49,7 @@ def test_blockdev(path: str): 'microSD inserted or no microSD card slot installed in the device) or corrupt or defect') -def prepare_minimal_image(source_path: str, sector_size: int) -> str: +def prepare_minimal_image(source_path: str, sector_size: int, encrypted: Optional[bool], encryption_password: Optional[str]) -> str: minimal_image_dir = get_temp_dir(register_cleanup=True) minimal_image_path = os.path.join(minimal_image_dir, f'minimal-{os.path.basename(source_path)}') logging.info(f"Copying image {os.path.basename(source_path)} to {minimal_image_dir} for shrinking") @@ -55,7 +57,7 @@ def prepare_minimal_image(source_path: str, sector_size: int) -> str: loop_device = losetup_rootfs_image(minimal_image_path, sector_size) partprobe(loop_device) - shrink_fs(loop_device, minimal_image_path, sector_size) + shrink_fs(loop_device, minimal_image_path, sector_size, encrypted, encryption_password) losetup_destroy(loop_device) return minimal_image_path @@ -67,6 +69,7 @@ def prepare_minimal_image(source_path: str, sector_size: int) -> str: @click.option('--shrink/--no-shrink', is_flag=True, default=True, help="Copy and shrink the image file to minimal size") @click.option('-b', '--sector-size', type=int, help="Override the device's sector size", default=None) @click.option('--confirm', is_flag=True, help="Ask for confirmation before executing fastboot commands") +@encryption_option @click.argument('what', type=click.Choice(list(FLASH_PARTS.values()))) @click.argument('location', type=str, required=False) def cmd_flash( @@ -78,6 +81,7 @@ def cmd_flash( shrink: bool = True, sector_size: Optional[int] = None, confirm: bool = False, + encryption: Optional[bool] = None, ): """ Flash a partition onto a device. @@ -115,7 +119,12 @@ def cmd_flash( if not location: raise Exception(f'You need to specify a location to flash {what} to') path = '' - image_path = prepare_minimal_image(device_image_path, sector_size) if shrink else device_image_path + image_path = prepare_minimal_image( + device_image_path, + sector_size, + encrypted=encryption, + encryption_password=config.get_profile(profile).encryption_password, + ) if shrink else device_image_path if method == FASTBOOT: fastboot_flash( partition=location, diff --git a/image/image.py b/image/image.py index 8efe33f..bebe82d 100644 --- a/image/image.py +++ b/image/image.py @@ -7,15 +7,21 @@ import subprocess from subprocess import CompletedProcess from typing import Optional, Union -from config.state import config, Profile +from chroot.build import BuildChroot, get_build_chroot from chroot.device import DeviceChroot, get_device_chroot -from constants import Arch, POST_INSTALL_CMDS +from config.state import config, Profile +from constants import Arch, LUKS_MAPPER_DEFAULT, POST_INSTALL_CMDS from distro.distro import get_base_distro, get_kupfer_https from devices.device import Device from exec.cmd import run_root_cmd, generate_cmd_su from exec.file import get_temp_dir, root_write_file, root_makedir from flavours.flavour import Flavour from net.ssh import copy_ssh_keys +from utils import programs_available + +from .cryptsetup import is_luks, get_luks_offset, luks_close, luks_open + +MAPPER_DIR = '/dev/mapper/' # image files need to be slightly smaller than partitions to fit IMG_FILE_ROOT_DEFAULT_SIZE = "1800M" @@ -72,24 +78,58 @@ def align_bytes(size_bytes: int, alignment: int = 4096) -> int: return size_bytes -def shrink_fs(loop_device: str, file: str, sector_size: int): +def shrink_fs( + loop_device: str, + file: str, + sector_size: int, + encrypted: Optional[bool] = None, + encryption_password: Optional[str] = None, + crypt_mapper=LUKS_MAPPER_DEFAULT, +): partprobe(loop_device) - logging.debug(f"Checking filesystem at {loop_device}p2") - result = run_root_cmd(['e2fsck', '-fy', f'{loop_device}p2']) + root_partition = f'{loop_device}p2' + root_partition_fs = root_partition + if not (encrypted is False): + root_partition_fs, native_chroot, encrypted = resolve_rootfs_crypt( + root_partition, + fail_on_unencrypted=bool(encrypted), + crypt_mapper=crypt_mapper, + password=encryption_password, + ) + logging.debug(f"Checking filesystem at {root_partition_fs}") + result = run_root_cmd(['e2fsck', '-fy', root_partition_fs]) if result.returncode > 2: # https://man7.org/linux/man-pages/man8/e2fsck.8.html#EXIT_CODE - raise Exception(f'Failed to e2fsck {loop_device}p2 with exit code {result.returncode}') + raise Exception(f'Failed to e2fsck {root_partition_fs} with exit code {result.returncode}') - logging.info(f'Shrinking filesystem at {loop_device}p2') - result = run_root_cmd(['resize2fs', '-M', f'{loop_device}p2']) + logging.info(f'Shrinking filesystem at {root_partition_fs}') + result = run_root_cmd(['resize2fs', '-M', root_partition_fs]) if result.returncode != 0: - raise Exception(f'Failed to resize2fs {loop_device}p2') + raise Exception(f'Failed to resize2fs {root_partition_fs}') - logging.debug(f'Reading size of shrunken filesystem on {loop_device}p2') - fs_blocks, fs_block_size = get_fs_size(f'{loop_device}p2') + logging.debug(f'Reading size of shrunken filesystem on {root_partition_fs}') + fs_blocks, fs_block_size = get_fs_size(root_partition_fs) sectors = bytes_to_sectors(fs_blocks * fs_block_size, sector_size) + logging.debug(f"shrunken FS length is {fs_blocks} blocks * {fs_block_size} bytes = {sectors} bytes") - logging.info(f'Shrinking partition at {loop_device}p2 to {sectors} sectors ({sectors * sector_size} bytes)') + _, image_size = find_end_sector(loop_device, root_partition, sector_size) + if image_size == -1: + raise Exception(f'Failed to find pre-repartition size of {loop_device}') + + if encrypted: + if sectors > image_size: + raise Exception("Shrunk FS size allegedly larger than the image itself; this is probably " + f"a kupferbootstrap parsing bug. shrunk partition end={sectors}, image size={image_size}, {sector_size=}") + old_sectors = sectors + luks_offset, luks_sector_size = get_luks_offset(crypt_mapper, native_chroot) + #luks_offset_bytes = align_bytes((luks_offset + 1) * luks_sector_size, sector_size) + luks_offset_normalized = bytes_to_sectors(luks_offset * luks_sector_size, sector_size) + logging.debug(f"Discovered LUKS attrs: {luks_offset=}, {luks_sector_size=}, {luks_offset_normalized=}") + luks_close(crypt_mapper, native_chroot) + sectors += luks_offset_normalized + 1024 + logging.debug(f"Increasing sectors from {old_sectors} to {sectors} ({sectors - old_sectors}) to leave space for the LUKS header") + + logging.info(f'Shrinking partition at {root_partition} to {sectors} {sector_size}b sectors ({sectors * sector_size} bytes)') child_proccess = subprocess.Popen( generate_cmd_su(['fdisk', '-b', str(sector_size), loop_device], switch_user='root'), # type: ignore stdin=subprocess.PIPE, @@ -113,27 +153,18 @@ def shrink_fs(loop_device: str, file: str, sector_size: int): # For some reason re-reading the partition table fails, but that is not a problem partprobe(loop_device) if returncode > 1: - raise Exception(f'Failed to shrink partition size of {loop_device}p2 with fdisk') + raise Exception(f'Failed to shrink partition size of {root_partition} with fdisk') partprobe(loop_device).check_returncode() - logging.debug(f'Finding end sector of partition at {loop_device}p2') - result = run_root_cmd(['fdisk', '-b', str(sector_size), '-l', loop_device], capture_output=True) - if result.returncode != 0: - print(result.stdout) - print(result.stderr) - raise Exception(f'Failed to fdisk -l {loop_device}') + end_sector, _ = find_end_sector(loop_device, root_partition, sector_size) + if end_sector == -1: + raise Exception(f'Failed to find end sector of {root_partition}') - end_sector = 0 - for line in result.stdout.decode('utf-8').split('\n'): - if line.startswith(f'{loop_device}p2'): - parts = list(filter(lambda part: part != '', line.split(' '))) - end_sector = int(parts[2]) - - if end_sector == 0: - raise Exception(f'Failed to find end sector of {loop_device}p2') - - end_size = align_bytes((end_sector + 1) * sector_size, 4096) + if end_sector > image_size: + logging.warning(f"Clipping sectors ({end_sector}) to {image_size=}") + end_sector = image_size + end_size = align_bytes((end_sector + 1024) * sector_size, 4096) logging.debug(f'({end_sector} + 1) sectors * {sector_size} bytes/sector = {end_size} bytes') logging.info(f'Truncating {file} to {end_size} bytes') @@ -143,6 +174,26 @@ def shrink_fs(loop_device: str, file: str, sector_size: int): partprobe(loop_device) +def find_end_sector(device: str, partition: str, sector_size: int) -> tuple[int, int]: + """Return (last_sector_index, sector_count) of a partition on a device, returns (-1, -1) if not found""" + logging.debug(f'Finding end sector of partition at {partition}') + result = run_root_cmd(['fdisk', '-b', str(sector_size), '-l', device], capture_output=True) + if result.returncode != 0: + print(result.stdout) + print(result.stderr) + raise Exception(f'Failed to fdisk -l {device}') + + end_sector = -1 + num_sectors = -1 + for line in result.stdout.decode('utf-8').split('\n'): + if line.startswith(partition): + parts = list(filter(lambda part: part != '', line.split(' '))) + end_sector = int(parts[2]) + num_sectors = int(parts[3]) + + return end_sector, num_sectors + + def losetup_destroy(loop_device): logging.debug(f'Destroying loop device {loop_device}') run_root_cmd( @@ -210,16 +261,59 @@ def losetup_rootfs_image(image_path: str, sector_size: int) -> str: return loop_device -def mount_chroot(rootfs_source: str, boot_src: str, chroot: DeviceChroot): - logging.debug(f'Mounting {rootfs_source} at {chroot.path}') +def resolve_rootfs_crypt( + rootfs_source: str, + password: Optional[str] = None, + crypt_mapper: str = LUKS_MAPPER_DEFAULT, + native_chroot: Optional[BuildChroot] = None, + fail_on_unencrypted: bool = True, +) -> tuple[str, Optional[BuildChroot], bool]: + assert config.runtime.arch + is_encrypted = False + if not (native_chroot or programs_available(['blkid'])): + native_chroot = get_build_chroot(config.runtime.arch, packages=['base', 'util-linux']) + if is_luks(rootfs_source, native_chroot=native_chroot): + if not (native_chroot or programs_available(['cryptsetup'])): + native_chroot = get_build_chroot(config.runtime.arch, packages=['base', 'cryptsetup', 'util-linux']) + luks_open(rootfs_source, crypt_mapper, password=password, native_chroot=native_chroot) + rootfs_source = f'{MAPPER_DIR}{crypt_mapper}' + is_encrypted = True + elif fail_on_unencrypted: + hint = '' + if rootfs_source.startswith(MAPPER_DIR): + hint = (f' HINT: path starts with {MAPPER_DIR!r}, probably already a decrypted volume.' + ' This is likely a kupferbootstrap bug.') + raise Exception(f"Error: {rootfs_source!r} is not an encrypted LUKS volume.{hint}") + return rootfs_source, native_chroot, is_encrypted - chroot.mount_rootfs(rootfs_source) - assert (os.path.ismount(chroot.path)) - root_makedir(chroot.get_path('boot')) +def mount_chroot( + rootfs_source: str, + boot_src: str, + device_chroot: DeviceChroot, + encrypted: Optional[bool] = None, + password: Optional[str] = None, + native_chroot: Optional[BuildChroot] = None, + crypt_mapper: str = LUKS_MAPPER_DEFAULT, +): + if encrypted is not False: + rootfs_source, native_chroot, encrypted = resolve_rootfs_crypt( + rootfs_source, + native_chroot=native_chroot, + crypt_mapper=crypt_mapper, + fail_on_unencrypted=bool(encrypted), + password=password, + ) - logging.debug(f'Mounting {boot_src} at {chroot.path}/boot') - chroot.mount(boot_src, '/boot', options=['defaults']) + logging.debug(f'Mounting {rootfs_source} at {device_chroot.path}') + + device_chroot.mount_rootfs(rootfs_source) + assert (os.path.ismount(device_chroot.path)) + + root_makedir(device_chroot.get_path('boot')) + + logging.debug(f'Mounting {boot_src} at {device_chroot.path}/boot') + device_chroot.mount(boot_src, '/boot', options=['defaults']) def dump_file_from_image(image_path: str, file_path: str, target_path: Optional[str] = None): @@ -314,18 +408,28 @@ def install_rootfs( packages: list[str], use_local_repos: bool, profile: Profile, + encrypted: bool, ): - user = profile['username'] or 'kupfer' + user = profile.username or 'kupfer' chroot = get_device_chroot(device=get_device_name(device), flavour=flavour.name, arch=arch, packages=packages, use_local_repos=use_local_repos) - mount_chroot(rootfs_device, bootfs_device, chroot) + # rootfs_device must be passed the crypt_mapper if encrypted is True + if encrypted: + assert rootfs_device.startswith(MAPPER_DIR) + + mount_chroot( + rootfs_device, + bootfs_device, + chroot, + encrypted=False, # rootfs_device is already the crypt_mapper + ) chroot.mount_pacman_cache() chroot.initialize() chroot.activate() chroot.create_user( user=user, - password=profile['password'], + password=profile.password, ) chroot.add_sudo_config(config_name='wheel', privilegee='%wheel', password_required=True) copy_ssh_keys( @@ -338,7 +442,7 @@ def install_rootfs( extra_repos=get_kupfer_https(arch).repos, in_chroot=True, ), - 'etc/hostname': profile['hostname'] or 'kupfer', + 'etc/hostname': profile.hostname or 'kupfer', } for target, content in files.items(): root_write_file(os.path.join(chroot.path, target.lstrip('/')), content)