import atexit import json import logging import os import re import subprocess from subprocess import CompletedProcess from typing import Optional, Union from chroot.build import BuildChroot, get_build_chroot from chroot.device import DeviceChroot, get_device_chroot from config.state import config, Profile from constants import Arch, LUKS_MAPPER_DEFAULT, POST_INSTALL_CMDS from distro.distro import get_base_distro, get_kupfer_https from devices.device import Device from exec.cmd import run_root_cmd, generate_cmd_su from exec.file import get_temp_dir, root_write_file, root_makedir from flavours.flavour import Flavour from net.ssh import copy_ssh_keys from utils import programs_available from .cryptsetup import is_luks, get_luks_offset, luks_close, luks_open MAPPER_DIR = '/dev/mapper/' # image files need to be slightly smaller than partitions to fit IMG_FILE_ROOT_DEFAULT_SIZE = "1800M" IMG_FILE_BOOT_DEFAULT_SIZE = "90M" def dd_image(input: str, output: str, blocksize='1M') -> CompletedProcess: cmd = [ 'dd', f'if={input}', f'of={output}', f'bs={blocksize}', 'oflag=direct', 'status=progress', 'conv=sync,noerror', ] logging.debug(f'running dd cmd: {cmd}') return run_root_cmd(cmd) def partprobe(device: str): return run_root_cmd(['partprobe', device]) def bytes_to_sectors(b: int, sector_size: int, round_up: bool = True): sectors, rest = divmod(b, sector_size) if rest and round_up: sectors += 1 return sectors def get_fs_size(partition: str) -> tuple[int, int]: blocks_cmd = run_root_cmd(['dumpe2fs', '-h', partition], env={"LC_ALL": "C"}, capture_output=True) if blocks_cmd.returncode != 0: logging.debug(f"dumpe2fs stdout:\n: {blocks_cmd.stdout}") logging.debug(f"dumpe2fs stderr:\n {blocks_cmd.stderr}") raise Exception(f'Failed to detect new filesystem size of {partition}') blocks_text = blocks_cmd.stdout.decode('utf-8') if blocks_cmd.stdout else '' try: fs_blocks = int(re.search('\\nBlock count:[ ]+([0-9]+)\\n', blocks_text, flags=re.MULTILINE).group(1)) # type: ignore[union-attr] fs_block_size = int(re.search('\\nBlock size:[ ]+([0-9]+)\\n', blocks_text).group(1)) # type: ignore[union-attr] except Exception as ex: logging.debug(f"dumpe2fs stdout:\n {blocks_text}") logging.debug(f"dumpe2fs stderr:\n: {blocks_cmd.stderr}") logging.info("Failed to scrape block size and count from dumpe2fs:", ex) raise ex return fs_blocks, fs_block_size def align_bytes(size_bytes: int, alignment: int = 4096) -> int: rest = size_bytes % alignment if rest: size_bytes += alignment - rest return size_bytes def shrink_fs( loop_device: str, file: str, sector_size: int, encrypted: Optional[bool] = None, encryption_password: Optional[str] = None, crypt_mapper=LUKS_MAPPER_DEFAULT, ): partprobe(loop_device) root_partition = f'{loop_device}p2' root_partition_fs = root_partition if not (encrypted is False): root_partition_fs, native_chroot, encrypted = resolve_rootfs_crypt( root_partition, fail_on_unencrypted=bool(encrypted), crypt_mapper=crypt_mapper, password=encryption_password, ) logging.debug(f"Checking filesystem at {root_partition_fs}") result = run_root_cmd(['e2fsck', '-fy', root_partition_fs]) if result.returncode > 2: # https://man7.org/linux/man-pages/man8/e2fsck.8.html#EXIT_CODE raise Exception(f'Failed to e2fsck {root_partition_fs} with exit code {result.returncode}') logging.info(f'Shrinking filesystem at {root_partition_fs}') result = run_root_cmd(['resize2fs', '-M', root_partition_fs]) if result.returncode != 0: raise Exception(f'Failed to resize2fs {root_partition_fs}') logging.debug(f'Reading size of shrunken filesystem on {root_partition_fs}') fs_blocks, fs_block_size = get_fs_size(root_partition_fs) sectors = bytes_to_sectors(fs_blocks * fs_block_size, sector_size) logging.debug(f"shrunken FS length is {fs_blocks} blocks * {fs_block_size} bytes = {sectors} bytes") _, image_size = find_end_sector(loop_device, root_partition, sector_size) if image_size == -1: raise Exception(f'Failed to find pre-repartition size of {loop_device}') if encrypted: if sectors > image_size: raise Exception("Shrunk FS size allegedly larger than the image itself; this is probably " f"a kupferbootstrap parsing bug. shrunk partition end={sectors}, image size={image_size}, {sector_size=}") old_sectors = sectors luks_offset, luks_sector_size = get_luks_offset(crypt_mapper, native_chroot) #luks_offset_bytes = align_bytes((luks_offset + 1) * luks_sector_size, sector_size) luks_offset_normalized = bytes_to_sectors(luks_offset * luks_sector_size, sector_size) logging.debug(f"Discovered LUKS attrs: {luks_offset=}, {luks_sector_size=}, {luks_offset_normalized=}") luks_close(crypt_mapper, native_chroot) sectors += luks_offset_normalized + 1024 logging.debug(f"Increasing sectors from {old_sectors} to {sectors} ({sectors - old_sectors}) to leave space for the LUKS header") logging.info(f'Shrinking partition at {root_partition} to {sectors} {sector_size}b sectors ({sectors * sector_size} bytes)') child_proccess = subprocess.Popen( generate_cmd_su(['fdisk', '-b', str(sector_size), loop_device], switch_user='root'), # type: ignore stdin=subprocess.PIPE, ) child_proccess.stdin.write('\n'.join([ # type: ignore 'd', '2', 'n', 'p', '2', '', f'+{sectors}', 'w', 'q', ]).encode('utf-8')) child_proccess.communicate() returncode = child_proccess.wait() if returncode == 1: # For some reason re-reading the partition table fails, but that is not a problem partprobe(loop_device) if returncode > 1: raise Exception(f'Failed to shrink partition size of {root_partition} with fdisk') partprobe(loop_device).check_returncode() end_sector, _ = find_end_sector(loop_device, root_partition, sector_size) if end_sector == -1: raise Exception(f'Failed to find end sector of {root_partition}') if end_sector > image_size: logging.warning(f"Clipping sectors ({end_sector}) to {image_size=}") end_sector = image_size end_size = align_bytes((end_sector + 1024) * sector_size, 4096) logging.debug(f'({end_sector} + 1) sectors * {sector_size} bytes/sector = {end_size} bytes') logging.info(f'Truncating {file} to {end_size} bytes') result = subprocess.run(['truncate', '-s', str(end_size), file]) if result.returncode != 0: raise Exception(f'Failed to truncate {file}') partprobe(loop_device) def find_end_sector(device: str, partition: str, sector_size: int) -> tuple[int, int]: """Return (last_sector_index, sector_count) of a partition on a device, returns (-1, -1) if not found""" logging.debug(f'Finding end sector of partition at {partition}') result = run_root_cmd(['fdisk', '-b', str(sector_size), '-l', device], capture_output=True) if result.returncode != 0: print(result.stdout) print(result.stderr) raise Exception(f'Failed to fdisk -l {device}') end_sector = -1 num_sectors = -1 for line in result.stdout.decode('utf-8').split('\n'): if line.startswith(partition): parts = list(filter(lambda part: part != '', line.split(' '))) end_sector = int(parts[2]) num_sectors = int(parts[3]) return end_sector, num_sectors def losetup_destroy(loop_device): logging.debug(f'Destroying loop device {loop_device}') run_root_cmd( [ 'losetup', '-d', loop_device, ], stderr=subprocess.DEVNULL, ) def get_device_name(device: Union[str, Device]) -> str: return device.name if isinstance(device, Device) else device def get_flavour_name(flavour: Union[str, Flavour]) -> str: if isinstance(flavour, Flavour): return flavour.name return flavour def get_image_name(device: Union[str, Device], flavour: Union[str, Flavour], img_type='full') -> str: return f'{get_device_name(device)}-{get_flavour_name(flavour)}-{img_type}.img' def get_image_path(device: Union[str, Device], flavour: Union[str, Flavour], img_type='full') -> str: return os.path.join(config.get_path('images'), get_image_name(device, flavour, img_type)) def losetup_setup_image(image_path: str, sector_size: int) -> str: logging.debug(f'Creating loop device for {image_path} with sector size {sector_size}') result = run_root_cmd([ 'losetup', '-f', '-b', str(sector_size), '-P', image_path, ]) if result.returncode != 0: raise Exception(f'Failed to create loop device for {image_path}') logging.debug(f'Finding loop device for {image_path}') result = subprocess.run(['losetup', '-J'], capture_output=True) if result.returncode != 0: print(result.stdout) print(result.stderr) raise Exception('Failed to list loop devices') data = json.loads(result.stdout.decode('utf-8')) loop_device = '' for d in data['loopdevices']: if d['back-file'] == image_path: loop_device = d['name'] break if loop_device == '': raise Exception(f'Failed to find loop device for {image_path}') partprobe(loop_device) atexit.register(losetup_destroy, loop_device) return loop_device def resolve_rootfs_crypt( rootfs_source: str, password: Optional[str] = None, crypt_mapper: str = LUKS_MAPPER_DEFAULT, native_chroot: Optional[BuildChroot] = None, fail_on_unencrypted: bool = True, ) -> tuple[str, Optional[BuildChroot], bool]: assert config.runtime.arch is_encrypted = False if not (native_chroot or programs_available(['blkid'])): native_chroot = get_build_chroot(config.runtime.arch, packages=['base', 'util-linux']) if is_luks(rootfs_source, native_chroot=native_chroot): if not (native_chroot or programs_available(['cryptsetup'])): native_chroot = get_build_chroot(config.runtime.arch, packages=['base', 'cryptsetup', 'util-linux']) luks_open(rootfs_source, crypt_mapper, password=password, native_chroot=native_chroot) rootfs_source = f'{MAPPER_DIR}{crypt_mapper}' is_encrypted = True elif fail_on_unencrypted: hint = '' if rootfs_source.startswith(MAPPER_DIR): hint = (f' HINT: path starts with {MAPPER_DIR!r}, probably already a decrypted volume.' ' This is likely a kupferbootstrap bug.') raise Exception(f"Error: {rootfs_source!r} is not an encrypted LUKS volume.{hint}") return rootfs_source, native_chroot, is_encrypted def mount_chroot( rootfs_source: str, boot_src: str, device_chroot: DeviceChroot, encrypted: Optional[bool] = None, password: Optional[str] = None, native_chroot: Optional[BuildChroot] = None, crypt_mapper: str = LUKS_MAPPER_DEFAULT, ): if encrypted is not False: rootfs_source, native_chroot, encrypted = resolve_rootfs_crypt( rootfs_source, native_chroot=native_chroot, crypt_mapper=crypt_mapper, fail_on_unencrypted=bool(encrypted), password=password, ) logging.debug(f'Mounting {rootfs_source} at {device_chroot.path}') device_chroot.mount_rootfs(rootfs_source) assert (os.path.ismount(device_chroot.path)) root_makedir(device_chroot.get_path('boot')) logging.debug(f'Mounting {boot_src} at {device_chroot.path}/boot') device_chroot.mount(boot_src, '/boot', options=['defaults']) def dump_file_from_image(image_path: str, file_path: str, target_path: Optional[str] = None): target_path = target_path or os.path.join(get_temp_dir(), os.path.basename(file_path)) result = run_root_cmd([ 'debugfs', image_path, '-R', f'\'dump /{file_path.lstrip("/")} {target_path}\'', ]) if result.returncode != 0 or not os.path.exists(target_path): raise Exception(f'Failed to dump {file_path} from /boot') return target_path def dump_aboot(image_path: str) -> str: return dump_file_from_image(image_path, file_path='/aboot.img') def dump_lk2nd(image_path: str) -> str: """ This doesn't append the image with the appended DTB which is needed for some devices, so it should get added in the future. """ return dump_file_from_image(image_path, file_path='/lk2nd.img') def dump_qhypstub(image_path: str) -> str: return dump_file_from_image(image_path, file_path='/qhyptstub.img') def create_img_file(image_path: str, size_str: str): result = subprocess.run([ 'truncate', '-s', size_str, image_path, ]) if result.returncode != 0: raise Exception(f'Failed to allocate {image_path}') return image_path def partition_device(device: str, boot_partition_size: Optional[str] = None): if boot_partition_size is None: boot_partition_size = IMG_FILE_BOOT_DEFAULT_SIZE if boot_partition_size and boot_partition_size[-1] in ['M', 'G', 'K']: boot_partition_size = f'{boot_partition_size}iB' create_partition_table = ['mklabel', 'msdos'] create_boot_partition = ['mkpart', 'primary', 'ext2', '0%', boot_partition_size] create_root_partition = ['mkpart', 'primary', boot_partition_size, '100%'] enable_boot = ['set', '1', 'boot', 'on'] result = run_root_cmd([ 'parted', '--script', device, ] + create_partition_table + create_boot_partition + create_root_partition + enable_boot) if result.returncode != 0: raise Exception(f'Failed to create partitions on {device}') def create_filesystem(device: str, blocksize: Optional[int], label=None, options=[], fstype='ext4'): """Creates a new filesystem. Blocksize defaults""" labels = ['-L', label] if label else [] cmd = [f'mkfs.{fstype}', '-F', *labels] if blocksize: # blocksize can be 4k max due to pagesize blocksize = min(blocksize, 4096) if fstype.startswith('ext'): # blocksize for ext-fs must be >=1024 blocksize = max(blocksize, 1024) cmd += [ '-b', str(blocksize), ] cmd.append(device) result = run_root_cmd(cmd) if result.returncode != 0: raise Exception(f'Failed to create {fstype} filesystem on {device} with CMD: {cmd}') def create_root_fs(device: str, blocksize: Optional[int]): create_filesystem(device, blocksize=blocksize, label='kupfer_root', options=['-O', '^metadata_csum', '-N', '100000']) def create_boot_fs(device: str, blocksize: Optional[int]): create_filesystem(device, blocksize=blocksize, label='kupfer_boot', fstype='ext2') def install_rootfs( rootfs_device: str, bootfs_device: str, device: Union[str, Device], flavour: Flavour, arch: Arch, packages: list[str], use_local_repos: bool, profile: Profile, encrypted: bool, ): user = profile.username or 'kupfer' chroot = get_device_chroot(device=get_device_name(device), flavour=flavour.name, arch=arch, packages=packages, use_local_repos=use_local_repos) # rootfs_device must be passed the crypt_mapper if encrypted is True if encrypted: assert rootfs_device.startswith(MAPPER_DIR) mount_chroot( rootfs_device, bootfs_device, chroot, encrypted=False, # rootfs_device is already the crypt_mapper ) chroot.mount_pacman_cache() chroot.initialize() chroot.activate() chroot.create_user( user=user, password=profile.password, ) chroot.add_sudo_config(config_name='wheel', privilegee='%wheel', password_required=True) copy_ssh_keys( chroot.path, user=user, ) files = { 'etc/pacman.conf': get_base_distro(arch).get_pacman_conf( check_space=True, extra_repos=get_kupfer_https(arch).repos, in_chroot=True, ), 'etc/hostname': profile.hostname or 'kupfer', } for target, content in files.items(): root_write_file(os.path.join(chroot.path, target.lstrip('/')), content) logging.info("Running post-install CMDs") for cmd in POST_INSTALL_CMDS: result = chroot.run_cmd(cmd) assert isinstance(result, subprocess.CompletedProcess) if result.returncode != 0: raise Exception(f'Error running post-install cmd: {cmd}') logging.info('Preparing to unmount chroot') res = chroot.run_cmd('sync && umount /boot', attach_tty=True) logging.debug(f'rc: {res}') chroot.deactivate() logging.debug(f'Unmounting rootfs at "{chroot.path}"') res = run_root_cmd(['umount', chroot.path]) assert isinstance(res, CompletedProcess) logging.debug(f'rc: {res.returncode}')