image: add new module: cryptsetup

This commit is contained in:
InsanePrawn 2023-07-08 18:43:28 +02:00
parent 370510e09f
commit 6de8137c90
2 changed files with 141 additions and 0 deletions

View file

@ -172,3 +172,6 @@ SRCINFO_TARBALL_URL = f'{KUPFER_HTTPS_BASE}/{SRCINFO_TARBALL_FILE}'
FLAVOUR_INFO_FILE = 'flavourinfo.json'
FLAVOUR_DESCRIPTION_PREFIX = 'kupfer flavour:'
LUKS_LABEL_DEFAULT = 'kupfer_crypt'
LUKS_MAPPER_DEFAULT = 'kupfer-crypt'

138
image/cryptsetup.py Normal file
View file

@ -0,0 +1,138 @@
import atexit
import logging
import os
from typing import Optional
from constants import LUKS_LABEL_DEFAULT
from chroot.build import BuildChroot
from exec.cmd import run_cmd, CompletedProcess
def get_accessible_user(path):
return None if os.access(path, os.R_OK) else 'root'
def check_dev_exists(device_path: str, verb: str = 'find'):
if not os.path.exists(device_path):
raise Exception(f"Can't {verb} LUKS on {device_path!r}: file does not exist")
def get_cryptmapper_path(mapper_name: str) -> str:
return f'/dev/mapper/{mapper_name}'
def mapper_exists(mapper_name: str, chroot: Optional[BuildChroot]) -> bool:
path = get_cryptmapper_path(mapper_name)
paths = [path]
if chroot:
paths.append(chroot.get_path(path))
for p in paths:
if os.path.exists(p):
return True
return False
def get_password_io(password: Optional[str]) -> Optional[bytes]:
return password.encode() if password else None
def is_luks(device_path: str, native_chroot: Optional[BuildChroot] = None) -> bool:
check_dev_exists(device_path, 'check')
run_func = native_chroot.run_cmd if native_chroot else run_cmd
user = get_accessible_user(device_path)
cmd = ["blkid", '--match-token', 'TYPE=crypto_LUKS', device_path]
result = run_func(cmd, capture_output=True, switch_user=user) # type: ignore[operator]
assert isinstance(result, CompletedProcess)
return bool(result.stdout and result.stdout.strip())
def luks_create(
backing_device: str,
label: str = LUKS_LABEL_DEFAULT,
native_chroot: Optional[BuildChroot] = None,
password: Optional[str] = None,
extra_opts: list[str] = [],
use_random: bool = True,
cipher: Optional[str] = None,
pbkdf: Optional[str] = None,
iter_time: Optional[int] = None,
):
check_dev_exists(backing_device, 'create')
run_func = native_chroot.run_cmd if native_chroot else run_cmd
extra_opts = list(extra_opts) # copy list before modification
if use_random:
extra_opts += ['--use-random']
if cipher:
extra_opts += ['--cipher', cipher]
if pbkdf:
extra_opts += ['--pbkdf', pbkdf]
if iter_time is not None:
extra_opts += ['--iter-time', str(iter_time)]
if label:
extra_opts += ['--label', label]
logging.info(f"Creating LUKS volume at {backing_device!r}{' (unattended)' if password else ''}")
result = run_func( # type: ignore[operator]
['cryptsetup', '-q', 'luksFormat', *extra_opts, backing_device],
switch_user=get_accessible_user(backing_device),
attach_tty=not password,
stdin_input=get_password_io(password),
)
rc = result if isinstance(result, int) else result.returncode
if rc:
raise Exception("Failed to format LUKS device: cryptsetup error^^^^")
def luks_open(
backing_device: str,
mapper_name: str,
extra_opts: list[str] = [],
password: Optional[str] = None,
native_chroot: Optional[BuildChroot] = None,
schedule_close: bool = True,
idempotent: bool = False,
):
check_dev_exists(backing_device, 'open')
run_func = native_chroot.run_cmd if native_chroot else run_cmd
if mapper_exists(mapper_name, native_chroot):
if idempotent:
logging.debug(f"LUKS mapper {mapper_name!r} already open")
return
raise Exception(f"Can't open LUKS for {backing_device!r} with mapper name {mapper_name!r}: "
"mapper file already exists")
logging.info(f"Opening LUKS mapper {mapper_name!r} for {backing_device!r}")
result = run_func( # type: ignore[operator]
['cryptsetup', 'luksOpen', *extra_opts, backing_device, mapper_name],
switch_user='root',
attach_tty=not password,
stdin_input=get_password_io(password),
)
rc = result if isinstance(result, int) else result.returncode
if rc:
raise Exception("Failed to open LUKS device: cryptsetup error^^^^")
if schedule_close:
atexit.register(luks_close, mapper_name, native_chroot=native_chroot, idempotent=True)
logging.info(f"LUKS mapper {mapper_name!r} opened!")
def luks_close(
mapper_name: str,
native_chroot: Optional[BuildChroot] = None,
extra_opts: list[str] = [],
idempotent: bool = False,
):
run_func = native_chroot.run_cmd if native_chroot else run_cmd
if not mapper_exists(mapper_name, native_chroot):
if idempotent:
logging.debug(f"LUKS mapper {mapper_name!r} already closed")
return 0
raise Exception(f"Can't close LUKS mapper {mapper_name!r}: mapper doesn't exist")
logging.info(f"Closing LUKS mapper {mapper_name!r}")
result = run_func( # type: ignore[operator]
['cryptsetup', 'close', *extra_opts, mapper_name],
switch_user='root',
)
rc = result if isinstance(result, int) else result.returncode
if rc:
raise Exception("Failed to close LUKS device: cryptsetup error^^^^")
logging.info(f"LUKS mapper {mapper_name!r} closed.")