diff --git a/constants.py b/constants.py index 723dcc0..29b7bad 100644 --- a/constants.py +++ b/constants.py @@ -172,3 +172,6 @@ SRCINFO_TARBALL_URL = f'{KUPFER_HTTPS_BASE}/{SRCINFO_TARBALL_FILE}' FLAVOUR_INFO_FILE = 'flavourinfo.json' FLAVOUR_DESCRIPTION_PREFIX = 'kupfer flavour:' + +LUKS_LABEL_DEFAULT = 'kupfer_crypt' +LUKS_MAPPER_DEFAULT = 'kupfer-crypt' diff --git a/image/cryptsetup.py b/image/cryptsetup.py new file mode 100644 index 0000000..b7edce7 --- /dev/null +++ b/image/cryptsetup.py @@ -0,0 +1,138 @@ +import atexit +import logging +import os + +from typing import Optional + +from constants import LUKS_LABEL_DEFAULT +from chroot.build import BuildChroot +from exec.cmd import run_cmd, CompletedProcess + + +def get_accessible_user(path): + return None if os.access(path, os.R_OK) else 'root' + + +def check_dev_exists(device_path: str, verb: str = 'find'): + if not os.path.exists(device_path): + raise Exception(f"Can't {verb} LUKS on {device_path!r}: file does not exist") + + +def get_cryptmapper_path(mapper_name: str) -> str: + return f'/dev/mapper/{mapper_name}' + + +def mapper_exists(mapper_name: str, chroot: Optional[BuildChroot]) -> bool: + path = get_cryptmapper_path(mapper_name) + paths = [path] + if chroot: + paths.append(chroot.get_path(path)) + for p in paths: + if os.path.exists(p): + return True + return False + + +def get_password_io(password: Optional[str]) -> Optional[bytes]: + return password.encode() if password else None + + +def is_luks(device_path: str, native_chroot: Optional[BuildChroot] = None) -> bool: + check_dev_exists(device_path, 'check') + run_func = native_chroot.run_cmd if native_chroot else run_cmd + user = get_accessible_user(device_path) + cmd = ["blkid", '--match-token', 'TYPE=crypto_LUKS', device_path] + result = run_func(cmd, capture_output=True, switch_user=user) # type: ignore[operator] + assert isinstance(result, CompletedProcess) + return bool(result.stdout and result.stdout.strip()) + + +def luks_create( + backing_device: str, + label: str = LUKS_LABEL_DEFAULT, + native_chroot: Optional[BuildChroot] = None, + password: Optional[str] = None, + extra_opts: list[str] = [], + use_random: bool = True, + cipher: Optional[str] = None, + pbkdf: Optional[str] = None, + iter_time: Optional[int] = None, +): + check_dev_exists(backing_device, 'create') + run_func = native_chroot.run_cmd if native_chroot else run_cmd + extra_opts = list(extra_opts) # copy list before modification + if use_random: + extra_opts += ['--use-random'] + if cipher: + extra_opts += ['--cipher', cipher] + if pbkdf: + extra_opts += ['--pbkdf', pbkdf] + if iter_time is not None: + extra_opts += ['--iter-time', str(iter_time)] + if label: + extra_opts += ['--label', label] + logging.info(f"Creating LUKS volume at {backing_device!r}{' (unattended)' if password else ''}") + result = run_func( # type: ignore[operator] + ['cryptsetup', '-q', 'luksFormat', *extra_opts, backing_device], + switch_user=get_accessible_user(backing_device), + attach_tty=not password, + stdin_input=get_password_io(password), + ) + rc = result if isinstance(result, int) else result.returncode + if rc: + raise Exception("Failed to format LUKS device: cryptsetup error^^^^") + + +def luks_open( + backing_device: str, + mapper_name: str, + extra_opts: list[str] = [], + password: Optional[str] = None, + native_chroot: Optional[BuildChroot] = None, + schedule_close: bool = True, + idempotent: bool = False, +): + check_dev_exists(backing_device, 'open') + run_func = native_chroot.run_cmd if native_chroot else run_cmd + if mapper_exists(mapper_name, native_chroot): + if idempotent: + logging.debug(f"LUKS mapper {mapper_name!r} already open") + return + raise Exception(f"Can't open LUKS for {backing_device!r} with mapper name {mapper_name!r}: " + "mapper file already exists") + logging.info(f"Opening LUKS mapper {mapper_name!r} for {backing_device!r}") + result = run_func( # type: ignore[operator] + ['cryptsetup', 'luksOpen', *extra_opts, backing_device, mapper_name], + switch_user='root', + attach_tty=not password, + stdin_input=get_password_io(password), + ) + rc = result if isinstance(result, int) else result.returncode + if rc: + raise Exception("Failed to open LUKS device: cryptsetup error^^^^") + if schedule_close: + atexit.register(luks_close, mapper_name, native_chroot=native_chroot, idempotent=True) + logging.info(f"LUKS mapper {mapper_name!r} opened!") + + +def luks_close( + mapper_name: str, + native_chroot: Optional[BuildChroot] = None, + extra_opts: list[str] = [], + idempotent: bool = False, +): + run_func = native_chroot.run_cmd if native_chroot else run_cmd + if not mapper_exists(mapper_name, native_chroot): + if idempotent: + logging.debug(f"LUKS mapper {mapper_name!r} already closed") + return 0 + raise Exception(f"Can't close LUKS mapper {mapper_name!r}: mapper doesn't exist") + logging.info(f"Closing LUKS mapper {mapper_name!r}") + result = run_func( # type: ignore[operator] + ['cryptsetup', 'close', *extra_opts, mapper_name], + switch_user='root', + ) + rc = result if isinstance(result, int) else result.returncode + if rc: + raise Exception("Failed to close LUKS device: cryptsetup error^^^^") + logging.info(f"LUKS mapper {mapper_name!r} closed.")