mirror of
https://gitlab.com/kupfer/kupferbootstrap.git
synced 2025-02-23 21:55:43 -05:00
179 lines
6.7 KiB
Python
179 lines
6.7 KiB
Python
import atexit
|
|
import click
|
|
import logging
|
|
import os
|
|
|
|
from typing import Optional
|
|
|
|
from constants import LUKS_LABEL_DEFAULT
|
|
from chroot.build import BuildChroot
|
|
from exec.cmd import run_cmd, CompletedProcess
|
|
|
|
encryption_option = click.option(
|
|
'--encryption/--no-encryption',
|
|
help="Force applying/ignoring LUKS encryption when handling the device image."
|
|
"Defaults to using the Profile's setting.",
|
|
default=None,
|
|
is_flag=True,
|
|
)
|
|
|
|
|
|
def get_accessible_user(path):
|
|
return None if os.access(path, os.R_OK) else 'root'
|
|
|
|
|
|
def check_dev_exists(device_path: str, verb: str = 'find'):
|
|
if not os.path.exists(device_path):
|
|
raise Exception(f"Can't {verb} LUKS on {device_path!r}: file does not exist")
|
|
|
|
|
|
def get_cryptmapper_path(mapper_name: str) -> str:
|
|
return f'/dev/mapper/{mapper_name}'
|
|
|
|
|
|
def mapper_exists(mapper_name: str, chroot: Optional[BuildChroot]) -> bool:
|
|
path = get_cryptmapper_path(mapper_name)
|
|
paths = [path]
|
|
if chroot:
|
|
paths.append(chroot.get_path(path))
|
|
for p in paths:
|
|
if os.path.exists(p):
|
|
return True
|
|
return False
|
|
|
|
|
|
def get_password_io(password: Optional[str]) -> Optional[bytes]:
|
|
return password.encode() if password else None
|
|
|
|
|
|
def is_luks(device_path: str, native_chroot: Optional[BuildChroot] = None) -> bool:
|
|
check_dev_exists(device_path, 'check')
|
|
run_func = native_chroot.run_cmd if native_chroot else run_cmd
|
|
user = get_accessible_user(device_path)
|
|
cmd = ["blkid", '--match-token', 'TYPE=crypto_LUKS', device_path]
|
|
result = run_func(cmd, capture_output=True, switch_user=user) # type: ignore[operator]
|
|
assert isinstance(result, CompletedProcess)
|
|
return bool(result.stdout and result.stdout.strip())
|
|
|
|
|
|
def get_luks_offset(
|
|
mapper_name: str,
|
|
native_chroot: Optional[BuildChroot] = None,
|
|
) -> tuple[int, int]:
|
|
device_path = get_cryptmapper_path(mapper_name)
|
|
check_dev_exists(device_path, 'get offset of')
|
|
run_func = native_chroot.run_cmd if native_chroot else run_cmd
|
|
user = get_accessible_user(device_path)
|
|
stdout: str = ''
|
|
cmd = ['cryptsetup', 'status', mapper_name]
|
|
result = run_func(cmd, capture_output=True, switch_user=user) # type: ignore[operator]
|
|
assert isinstance(result, CompletedProcess)
|
|
if not (result.stdout and (stdout := result.stdout.strip())):
|
|
raise Exception(f"Couldn't get LUKS offset for {mapper_name!r} from 'cryptsetup status': empty stdout: {stdout!r}")
|
|
markers = {'offset': -1, 'sector size': -1}
|
|
for line in stdout.decode().split('\n'):
|
|
line = line.strip()
|
|
for item in markers:
|
|
offset_marker = f'{item}:'
|
|
if line.startswith(offset_marker):
|
|
try:
|
|
markers[item] = int(line.split(offset_marker)[-1].strip().split(' ')[0])
|
|
except Exception as ex:
|
|
raise Exception(f"Couldn't get LUKS {item=} for {mapper_name!r} due to an exception parsing cryptsetup output: {ex}")
|
|
for i in markers.values():
|
|
if i != -1:
|
|
continue
|
|
logging.debug(f"Failed to find ':' in stdout: {stdout}")
|
|
raise Exception(f"Failed to find LUKS offset for {mapper_name!r}: Offset line not found")
|
|
return markers['offset'], markers['sector size']
|
|
|
|
|
|
def luks_create(
|
|
backing_device: str,
|
|
label: str = LUKS_LABEL_DEFAULT,
|
|
native_chroot: Optional[BuildChroot] = None,
|
|
password: Optional[str] = None,
|
|
extra_opts: list[str] = [],
|
|
use_random: bool = True,
|
|
cipher: Optional[str] = None,
|
|
pbkdf: Optional[str] = None,
|
|
iter_time: Optional[int] = None,
|
|
):
|
|
check_dev_exists(backing_device, 'create')
|
|
run_func = native_chroot.run_cmd if native_chroot else run_cmd
|
|
extra_opts = list(extra_opts) # copy list before modification
|
|
if use_random:
|
|
extra_opts += ['--use-random']
|
|
if cipher:
|
|
extra_opts += ['--cipher', cipher]
|
|
if pbkdf:
|
|
extra_opts += ['--pbkdf', pbkdf]
|
|
if iter_time is not None:
|
|
extra_opts += ['--iter-time', str(iter_time)]
|
|
if label:
|
|
extra_opts += ['--label', label]
|
|
logging.info(f"Creating LUKS volume at {backing_device!r}{' (unattended)' if password else ''}")
|
|
result = run_func( # type: ignore[operator]
|
|
['cryptsetup', '-q', 'luksFormat', *extra_opts, backing_device],
|
|
switch_user=get_accessible_user(backing_device),
|
|
attach_tty=not password,
|
|
stdin_input=get_password_io(password),
|
|
)
|
|
rc = result if isinstance(result, int) else result.returncode
|
|
if rc:
|
|
raise Exception("Failed to format LUKS device: cryptsetup error^^^^")
|
|
|
|
|
|
def luks_open(
|
|
backing_device: str,
|
|
mapper_name: str,
|
|
extra_opts: list[str] = [],
|
|
password: Optional[str] = None,
|
|
native_chroot: Optional[BuildChroot] = None,
|
|
schedule_close: bool = True,
|
|
idempotent: bool = False,
|
|
):
|
|
check_dev_exists(backing_device, 'open')
|
|
run_func = native_chroot.run_cmd if native_chroot else run_cmd
|
|
if mapper_exists(mapper_name, native_chroot):
|
|
if idempotent:
|
|
logging.debug(f"LUKS mapper {mapper_name!r} already open")
|
|
return
|
|
raise Exception(f"Can't open LUKS for {backing_device!r} with mapper name {mapper_name!r}: "
|
|
"mapper file already exists")
|
|
logging.info(f"Opening LUKS mapper {mapper_name!r} for {backing_device!r}")
|
|
result = run_func( # type: ignore[operator]
|
|
['cryptsetup', 'luksOpen', *extra_opts, backing_device, mapper_name],
|
|
switch_user='root',
|
|
attach_tty=not password,
|
|
stdin_input=get_password_io(password),
|
|
)
|
|
rc = result if isinstance(result, int) else result.returncode
|
|
if rc:
|
|
raise Exception("Failed to open LUKS device: cryptsetup error^^^^")
|
|
if schedule_close:
|
|
atexit.register(luks_close, mapper_name, native_chroot=native_chroot, idempotent=True)
|
|
logging.info(f"LUKS mapper {mapper_name!r} opened!")
|
|
|
|
|
|
def luks_close(
|
|
mapper_name: str,
|
|
native_chroot: Optional[BuildChroot] = None,
|
|
extra_opts: list[str] = [],
|
|
idempotent: bool = False,
|
|
):
|
|
run_func = native_chroot.run_cmd if native_chroot else run_cmd
|
|
if not mapper_exists(mapper_name, native_chroot):
|
|
if idempotent:
|
|
logging.debug(f"LUKS mapper {mapper_name!r} already closed")
|
|
return 0
|
|
raise Exception(f"Can't close LUKS mapper {mapper_name!r}: mapper doesn't exist")
|
|
logging.info(f"Closing LUKS mapper {mapper_name!r}")
|
|
result = run_func( # type: ignore[operator]
|
|
['cryptsetup', 'close', *extra_opts, mapper_name],
|
|
switch_user='root',
|
|
)
|
|
rc = result if isinstance(result, int) else result.returncode
|
|
if rc:
|
|
raise Exception("Failed to close LUKS device: cryptsetup error^^^^")
|
|
logging.info(f"LUKS mapper {mapper_name!r} closed.")
|