import atexit
import click
import logging
import os

from typing import Optional

from constants import LUKS_LABEL_DEFAULT
from chroot.build import BuildChroot
from exec.cmd import run_cmd, CompletedProcess

# Reusable click option for commands that handle device images.
# default=None keeps "flag not given" distinguishable from an explicit
# --encryption / --no-encryption on the command line.
encryption_option = click.option(
    '--encryption/--no-encryption',
    help="Force applying/ignoring LUKS encryption when handling the device image. "
    "Defaults to using the Profile's setting.",
    default=None,
    is_flag=True,
)


def _get_returncode(result) -> int:
    """Normalise a command result to its return code.

    The call sites in this module handle both a plain int and an object
    exposing `.returncode`, so both shapes are accepted here.
    """
    return result if isinstance(result, int) else result.returncode


def get_accessible_user(path):
    """Return the `switch_user` value to use for commands that access `path`.

    Returns None when `os.access(path, os.R_OK)` succeeds, otherwise 'root'.
    """
    return None if os.access(path, os.R_OK) else 'root'


def check_dev_exists(device_path: str, verb: str = 'find'):
    """Raise an Exception if `device_path` does not exist.

    `verb` is only used to phrase the error message ("Can't <verb> LUKS on ...").
    """
    if not os.path.exists(device_path):
        raise Exception(f"Can't {verb} LUKS on {device_path!r}: file does not exist")


def get_cryptmapper_path(mapper_name: str) -> str:
    """Return the /dev/mapper path for the given mapper name."""
    return f'/dev/mapper/{mapper_name}'


def mapper_exists(mapper_name: str, chroot: Optional[BuildChroot]) -> bool:
    """Check whether the mapper device node exists.

    The path is checked as-is and, when `chroot` is given, additionally at the
    location returned by `chroot.get_path()` for that path.
    """
    path = get_cryptmapper_path(mapper_name)
    candidates = [path]
    if chroot:
        candidates.append(chroot.get_path(path))
    return any(os.path.exists(candidate) for candidate in candidates)


def get_password_io(password: Optional[str]) -> Optional[bytes]:
    """Encode `password` to bytes for use as stdin input; None if empty or unset."""
    return password.encode() if password else None


def is_luks(device_path: str, native_chroot: Optional[BuildChroot] = None) -> bool:
    """Return True if `blkid` reports `device_path` as TYPE=crypto_LUKS.

    The command runs through `native_chroot.run_cmd` when a chroot is given,
    otherwise through the module-level `run_cmd`. Any non-blank stdout from
    blkid is treated as a match.

    Raises:
        Exception: if `device_path` does not exist.
    """
    check_dev_exists(device_path, 'check')
    run_func = native_chroot.run_cmd if native_chroot else run_cmd
    user = get_accessible_user(device_path)
    cmd = ["blkid", '--match-token', 'TYPE=crypto_LUKS', device_path]
    result = run_func(cmd, capture_output=True, switch_user=user)  # type: ignore[operator]
    assert isinstance(result, CompletedProcess)
    return bool(result.stdout and result.stdout.strip())


def get_luks_offset(
    mapper_name: str,
    native_chroot: Optional[BuildChroot] = None,
) -> tuple[int, int]:
    """Parse `cryptsetup status <mapper_name>` for the payload offset and sector size.

    Returns:
        `(offset, sector_size)` as the integers found on the "offset:" and
        "sector size:" lines. NOTE(review): the unit of the offset is whatever
        cryptsetup prints on that line (presumably sectors) - confirm.

    Raises:
        Exception: if the mapper device does not exist, the command produced
            no output, a value could not be parsed as an integer, or one of
            the two lines is missing from the output.
    """
    device_path = get_cryptmapper_path(mapper_name)
    check_dev_exists(device_path, 'get offset of')
    run_func = native_chroot.run_cmd if native_chroot else run_cmd
    user = get_accessible_user(device_path)
    cmd = ['cryptsetup', 'status', mapper_name]
    result = run_func(cmd, capture_output=True, switch_user=user)  # type: ignore[operator]
    assert isinstance(result, CompletedProcess)

    # Captured stdout is bytes here (it is decoded below); keep one consistent type.
    raw_stdout: bytes = result.stdout.strip() if result.stdout else b''
    if not raw_stdout:
        raise Exception(f"Couldn't get LUKS offset for {mapper_name!r} from 'cryptsetup status': empty stdout: {raw_stdout!r}")
    text = raw_stdout.decode()

    # -1 marks "not found yet"; every marker must be filled in by the loop below.
    markers = {'offset': -1, 'sector size': -1}
    for line in text.split('\n'):
        line = line.strip()
        for item in markers:
            prefix = f'{item}:'
            if not line.startswith(prefix):
                continue
            # The value is the first whitespace-separated token after the marker,
            # e.g. "offset:  32768 sectors" -> "32768".
            fields = line[len(prefix):].split()
            try:
                markers[item] = int(fields[0])
            except (IndexError, ValueError) as ex:
                raise Exception(
                    f"Couldn't get LUKS {item=} for {mapper_name!r} due to an exception parsing cryptsetup output: {ex}"
                ) from ex

    missing = [item for item, value in markers.items() if value == -1]
    if missing:
        logging.debug(f"Failed to find {missing!r} in stdout: {text}")
        raise Exception(f"Failed to find LUKS offset for {mapper_name!r}: {', '.join(missing)} line not found")
    return markers['offset'], markers['sector size']


def luks_create(
    backing_device: str,
    label: str = LUKS_LABEL_DEFAULT,
    native_chroot: Optional[BuildChroot] = None,
    password: Optional[str] = None,
    extra_opts: Optional[list[str]] = None,
    use_random: bool = True,
    cipher: Optional[str] = None,
    pbkdf: Optional[str] = None,
    iter_time: Optional[int] = None,
):
    """Format `backing_device` as a LUKS volume via `cryptsetup -q luksFormat`.

    Args:
        backing_device: path of the device or image file to format; must exist.
        label: passed as `--label` when non-empty.
        native_chroot: when given, the command runs via its `run_cmd`.
        password: when set, it is fed to cryptsetup on stdin and no TTY is
            attached; otherwise a TTY is attached (presumably so cryptsetup
            can prompt interactively - confirm against run_cmd).
        extra_opts: additional cryptsetup options, placed before the options
            generated here. The caller's list is never modified.
        use_random: adds `--use-random`.
        cipher: passed as `--cipher` when set.
        pbkdf: passed as `--pbkdf` when set.
        iter_time: passed as `--iter-time` when not None (0 is passed through).

    Raises:
        Exception: if `backing_device` does not exist or cryptsetup returns
            a non-zero exit code.
    """
    check_dev_exists(backing_device, 'create')
    run_func = native_chroot.run_cmd if native_chroot else run_cmd

    # Work on a private copy so neither the caller's list nor a default is mutated.
    opts = list(extra_opts) if extra_opts else []
    if use_random:
        opts += ['--use-random']
    if cipher:
        opts += ['--cipher', cipher]
    if pbkdf:
        opts += ['--pbkdf', pbkdf]
    if iter_time is not None:
        opts += ['--iter-time', str(iter_time)]
    if label:
        opts += ['--label', label]

    logging.info(f"Creating LUKS volume at {backing_device!r}{' (unattended)' if password else ''}")
    result = run_func(  # type: ignore[operator]
        ['cryptsetup', '-q', 'luksFormat', *opts, backing_device],
        switch_user=get_accessible_user(backing_device),
        attach_tty=not password,
        stdin_input=get_password_io(password),
    )
    if _get_returncode(result):
        raise Exception("Failed to format LUKS device: cryptsetup error^^^^")


def luks_open(
    backing_device: str,
    mapper_name: str,
    extra_opts: Optional[list[str]] = None,
    password: Optional[str] = None,
    native_chroot: Optional[BuildChroot] = None,
    schedule_close: bool = True,
    idempotent: bool = False,
):
    """Open the LUKS volume on `backing_device` as `/dev/mapper/<mapper_name>`.

    Args:
        backing_device: path of the LUKS device or image; must exist.
        mapper_name: name for the device-mapper node.
        extra_opts: additional options inserted after `luksOpen`.
        password: when set, fed to cryptsetup on stdin and no TTY is attached.
        native_chroot: when given, the command runs via its `run_cmd` and the
            mapper existence check also looks inside the chroot.
        schedule_close: register an idempotent `luks_close` for this mapper
            with `atexit` after a successful open.
        idempotent: if the mapper already exists, return quietly instead of
            raising. In that case no close handler is registered.

    Raises:
        Exception: if `backing_device` does not exist, the mapper already
            exists (and `idempotent` is False), or cryptsetup fails.
    """
    check_dev_exists(backing_device, 'open')
    run_func = native_chroot.run_cmd if native_chroot else run_cmd

    if mapper_exists(mapper_name, native_chroot):
        if idempotent:
            logging.debug(f"LUKS mapper {mapper_name!r} already open")
            return
        raise Exception(f"Can't open LUKS for {backing_device!r} with mapper name {mapper_name!r}: "
                        "mapper file already exists")

    logging.info(f"Opening LUKS mapper {mapper_name!r} for {backing_device!r}")
    result = run_func(  # type: ignore[operator]
        ['cryptsetup', 'luksOpen', *(extra_opts or []), backing_device, mapper_name],
        switch_user='root',
        attach_tty=not password,
        stdin_input=get_password_io(password),
    )
    if _get_returncode(result):
        raise Exception("Failed to open LUKS device: cryptsetup error^^^^")

    if schedule_close:
        # idempotent=True so the exit handler stays quiet if the mapper was closed earlier.
        atexit.register(luks_close, mapper_name, native_chroot=native_chroot, idempotent=True)
    logging.info(f"LUKS mapper {mapper_name!r} opened!")


def luks_close(
    mapper_name: str,
    native_chroot: Optional[BuildChroot] = None,
    extra_opts: Optional[list[str]] = None,
    idempotent: bool = False,
):
    """Close the LUKS mapper `mapper_name` via `cryptsetup close`.

    Args:
        mapper_name: name of the device-mapper node to close.
        native_chroot: when given, the command runs via its `run_cmd` and the
            mapper existence check also looks inside the chroot.
        extra_opts: additional options inserted after `close`.
        idempotent: if the mapper does not exist, return 0 instead of raising.

    Returns:
        0 when the mapper was already closed and `idempotent` is set;
        None after a successful close.

    Raises:
        Exception: if the mapper does not exist (and `idempotent` is False)
            or cryptsetup returns a non-zero exit code.
    """
    run_func = native_chroot.run_cmd if native_chroot else run_cmd

    if not mapper_exists(mapper_name, native_chroot):
        if idempotent:
            logging.debug(f"LUKS mapper {mapper_name!r} already closed")
            return 0
        raise Exception(f"Can't close LUKS mapper {mapper_name!r}: mapper doesn't exist")

    logging.info(f"Closing LUKS mapper {mapper_name!r}")
    result = run_func(  # type: ignore[operator]
        ['cryptsetup', 'close', *(extra_opts or []), mapper_name],
        switch_user='root',
    )
    if _get_returncode(result):
        raise Exception("Failed to close LUKS device: cryptsetup error^^^^")
    logging.info(f"LUKS mapper {mapper_name!r} closed.")