mirror of
https://gitlab.com/kupfer/kupferbootstrap.git
synced 2025-02-22 13:15:44 -05:00
540 lines
18 KiB
Python
540 lines
18 KiB
Python
import atexit
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import click
|
|
import logging
|
|
from signal import pause
|
|
from subprocess import CompletedProcess
|
|
from typing import Optional, Union
|
|
|
|
from config.state import config, Profile
|
|
from chroot.device import DeviceChroot, get_device_chroot
|
|
from constants import Arch, BASE_LOCAL_PACKAGES, BASE_PACKAGES, POST_INSTALL_CMDS
|
|
from distro.distro import get_base_distro, get_kupfer_https
|
|
from devices.device import Device, get_profile_device
|
|
from exec.cmd import run_root_cmd, generate_cmd_su
|
|
from exec.file import get_temp_dir, root_write_file, root_makedir, makedir
|
|
from flavours.flavour import Flavour, get_profile_flavour
|
|
from net.ssh import copy_ssh_keys
|
|
from packages.build import build_enable_qemu_binfmt, build_packages, filter_pkgbuilds
|
|
from wrapper import enforce_wrap
|
|
|
|
# Default sizes for the per-partition image files. They need to be slightly
# smaller than the partitions they are later copied into, or they won't fit.
IMG_FILE_ROOT_DEFAULT_SIZE = '1800M'
IMG_FILE_BOOT_DEFAULT_SIZE = '90M'
|
|
|
|
|
|
def dd_image(input: str, output: str, blocksize='1M') -> CompletedProcess:
    """Block-copy ``input`` to ``output`` by running ``dd`` as root.

    Args:
        input: path handed to dd as ``if=``.
        output: path handed to dd as ``of=``.
        blocksize: value handed to dd as ``bs=``.

    Returns:
        Whatever ``run_root_cmd`` returns for the dd invocation; the exit
        code is not checked here.
    """
    operands = {'if': input, 'of': output, 'bs': blocksize}
    cmd = ['dd']
    cmd += [f'{key}={value}' for key, value in operands.items()]
    cmd += ['oflag=direct', 'status=progress', 'conv=sync,noerror']
    logging.debug(f'running dd cmd: {cmd}')
    return run_root_cmd(cmd)
|
|
|
|
|
|
def partprobe(device: str):
    """Run ``partprobe`` on ``device`` as root and return the command result."""
    probe_cmd = ['partprobe', device]
    return run_root_cmd(probe_cmd)
|
|
|
|
|
|
def bytes_to_sectors(b: int, sector_size: int, round_up: bool = True):
    """Convert a byte count into a number of ``sector_size``-byte sectors.

    A partially filled trailing sector counts as a full sector when
    ``round_up`` is set and is dropped otherwise.
    """
    whole_sectors = b // sector_size
    has_partial_sector = b % sector_size != 0
    if has_partial_sector and round_up:
        return whole_sectors + 1
    return whole_sectors
|
|
|
|
|
|
def get_fs_size(partition: str) -> tuple[int, int]:
    """Read the block count and block size of the filesystem on ``partition``.

    Runs ``dumpe2fs -h`` as root (with ``LC_ALL=C`` so the field names are
    not localised) and scrapes the ``Block count:`` and ``Block size:``
    header fields from its output.

    Returns:
        A ``(block_count, block_size)`` tuple.

    Raises:
        Exception: if ``dumpe2fs`` exits with a non-zero code. If a field is
            missing or unparsable, the underlying exception is logged and
            re-raised unchanged.
    """
    blocks_cmd = run_root_cmd(['dumpe2fs', '-h', partition], env={"LC_ALL": "C"}, capture_output=True)
    if blocks_cmd.returncode != 0:
        logging.debug(f"dumpe2fs stdout:\n {blocks_cmd.stdout}")
        logging.debug(f"dumpe2fs stderr:\n {blocks_cmd.stderr}")
        raise Exception(f'Failed to detect new filesystem size of {partition}')
    blocks_text = blocks_cmd.stdout.decode('utf-8') if blocks_cmd.stdout else ''
    try:
        # re.search() returns None when a field is missing; the resulting
        # AttributeError is handled below together with int() failures.
        fs_blocks = int(re.search(r'\nBlock count:[ ]+([0-9]+)\n', blocks_text).group(1))  # type: ignore[union-attr]
        fs_block_size = int(re.search(r'\nBlock size:[ ]+([0-9]+)\n', blocks_text).group(1))  # type: ignore[union-attr]
    except Exception as ex:
        logging.debug(f"dumpe2fs stdout:\n {blocks_text}")
        logging.debug(f"dumpe2fs stderr:\n {blocks_cmd.stderr}")
        # The exception must be part of the message itself: an extra
        # positional argument without a placeholder makes logging fail to
        # format the record.
        logging.info(f"Failed to scrape block size and count from dumpe2fs: {ex}")
        raise
    return fs_blocks, fs_block_size
|
|
|
|
|
|
def align_bytes(size_bytes: int, alignment: int = 4096) -> int:
    """Round ``size_bytes`` up to the next multiple of ``alignment``.

    Values that already are a multiple of ``alignment`` are returned as-is.
    """
    # (-size_bytes) % alignment is exactly the padding missing up to the
    # next multiple, and 0 when size_bytes is already aligned.
    padding = (-size_bytes) % alignment
    return size_bytes + padding
|
|
|
|
|
|
def shrink_fs(loop_device: str, file: str, sector_size: int):
    """Shrink the filesystem and partition 2 of ``loop_device``, then truncate ``file``.

    Steps, all on ``<loop_device>p2``:

    1. check the filesystem with ``e2fsck -fy``
    2. minimise it with ``resize2fs -M``
    3. recreate the partition with ``fdisk`` so it matches the new size
    4. truncate ``file`` right after the partition's end sector, aligned up
       to 4096 bytes

    Args:
        loop_device: loop device the image is attached to.
        file: path of the image file that gets truncated.
        sector_size: sector size in bytes, passed to ``fdisk -b`` and used
            for all byte/sector conversions.

    Raises:
        Exception: if one of the external tools fails or the partition's end
            sector can't be found in the ``fdisk -l`` output.
        subprocess.CalledProcessError: if ``partprobe`` fails after the
            partition table was rewritten.
    """
    root_part = f'{loop_device}p2'

    partprobe(loop_device)
    logging.debug(f"Checking filesystem at {root_part}")
    result = run_root_cmd(['e2fsck', '-fy', root_part])
    if result.returncode > 2:
        # Exit codes up to 2 are tolerated, see
        # https://man7.org/linux/man-pages/man8/e2fsck.8.html#EXIT_CODE
        raise Exception(f'Failed to e2fsck {root_part} with exit code {result.returncode}')

    logging.info(f'Shrinking filesystem at {root_part}')
    result = run_root_cmd(['resize2fs', '-M', root_part])
    if result.returncode != 0:
        raise Exception(f'Failed to resize2fs {root_part}')

    logging.debug(f'Reading size of shrunken filesystem on {root_part}')
    fs_blocks, fs_block_size = get_fs_size(root_part)
    sectors = bytes_to_sectors(fs_blocks * fs_block_size, sector_size)

    logging.info(f'Shrinking partition at {root_part} to {sectors} sectors ({sectors * sector_size} bytes)')
    # Answers for fdisk's interactive prompts: delete partition 2, then
    # recreate it as primary partition 2 with the default first sector and
    # the new size, write the table and quit.
    fdisk_script = '\n'.join([
        'd',
        '2',
        'n',
        'p',
        '2',
        '',
        f'+{sectors}',
        'w',
        'q',
    ])
    fdisk_proc = subprocess.Popen(
        generate_cmd_su(['fdisk', '-b', str(sector_size), loop_device], switch_user='root'),  # type: ignore
        stdin=subprocess.PIPE,
    )
    # communicate() sends the script, closes stdin and waits for fdisk to
    # exit, so no separate stdin.write()/wait() is needed.
    fdisk_proc.communicate(input=fdisk_script.encode('utf-8'))
    returncode = fdisk_proc.returncode
    if returncode == 1:
        # For some reason re-reading the partition table fails, but that is not a problem
        partprobe(loop_device)
    if returncode > 1:
        raise Exception(f'Failed to shrink partition size of {root_part} with fdisk')

    partprobe(loop_device).check_returncode()

    logging.debug(f'Finding end sector of partition at {root_part}')
    result = run_root_cmd(['fdisk', '-b', str(sector_size), '-l', loop_device], capture_output=True)
    if result.returncode != 0:
        logging.error(f'fdisk stdout:\n {result.stdout}')
        logging.error(f'fdisk stderr:\n {result.stderr}')
        raise Exception(f'Failed to fdisk -l {loop_device}')

    end_sector = 0
    for line in result.stdout.decode('utf-8').split('\n'):
        if line.startswith(root_part):
            # columns: device, start sector, end sector, ...
            end_sector = int(line.split()[2])

    if end_sector == 0:
        raise Exception(f'Failed to find end sector of {root_part}')

    # Sector numbers are zero-based, so the image has to hold end_sector + 1 sectors.
    end_size = align_bytes((end_sector + 1) * sector_size, 4096)

    logging.debug(f'({end_sector} + 1) sectors * {sector_size} bytes/sector = {end_size} bytes')
    logging.info(f'Truncating {file} to {end_size} bytes')
    result = subprocess.run(['truncate', '-s', str(end_size), file])
    if result.returncode != 0:
        raise Exception(f'Failed to truncate {file}')
    partprobe(loop_device)
|
|
|
|
|
|
def losetup_destroy(loop_device):
    """Detach ``loop_device`` with ``losetup -d`` (as root).

    The command's stderr is discarded and its exit code is not checked.
    """
    logging.debug(f'Destroying loop device {loop_device}')
    detach_cmd = ['losetup', '-d', loop_device]
    run_root_cmd(detach_cmd, stderr=subprocess.DEVNULL)
|
|
|
|
|
|
def get_device_name(device: Union[str, Device]) -> str:
    """Return ``device.name`` for a ``Device``; any other value is returned unchanged."""
    if isinstance(device, Device):
        return device.name
    return device
|
|
|
|
|
|
def get_flavour_name(flavour: Union[str, Flavour]) -> str:
    """Return ``flavour.name`` for a ``Flavour``; any other value is returned unchanged."""
    return flavour.name if isinstance(flavour, Flavour) else flavour
|
|
|
|
|
|
def get_image_name(device: Union[str, Device], flavour: Union[str, Flavour], img_type='full') -> str:
    """Build the image file name ``<device>-<flavour>-<img_type>.img``."""
    device_name = get_device_name(device)
    flavour_name = get_flavour_name(flavour)
    return '{}-{}-{}.img'.format(device_name, flavour_name, img_type)
|
|
|
|
|
|
def get_image_path(device: Union[str, Device], flavour: Union[str, Flavour], img_type='full') -> str:
    """Return the path of the image file inside the configured ``images`` directory."""
    images_dir = config.get_path('images')
    file_name = get_image_name(device, flavour, img_type)
    return os.path.join(images_dir, file_name)
|
|
|
|
|
|
def losetup_rootfs_image(image_path: str, sector_size: int) -> str:
    """Attach ``image_path`` to a free loop device and return the device's path.

    The device is created as root with partition scanning (``-P``) and the
    given sector size (``-b``). It is then looked up in the JSON listing of
    ``losetup -J`` by its backing file. The device is registered for
    detachment via ``losetup_destroy`` at interpreter exit.

    Args:
        image_path: image file to attach.
        sector_size: logical sector size in bytes for the loop device.

    Returns:
        The ``name`` losetup reports for the loop device.

    Raises:
        Exception: if the loop device can't be created, listed or found.
    """
    logging.debug(f'Creating loop device for {image_path} with sector size {sector_size}')
    result = run_root_cmd(['losetup', '-f', '-b', str(sector_size), '-P', image_path])
    if result.returncode != 0:
        raise Exception(f'Failed to create loop device for {image_path}')

    logging.debug(f'Finding loop device for {image_path}')

    result = subprocess.run(['losetup', '-J'], capture_output=True)
    if result.returncode != 0:
        logging.error(f'losetup stdout:\n {result.stdout}')
        logging.error(f'losetup stderr:\n {result.stderr}')
        raise Exception('Failed to list loop devices')

    # losetup lists backing files by their resolved absolute path, so a
    # relative or symlinked image_path has to be matched in resolved form too.
    wanted_paths = {image_path, os.path.realpath(image_path)}
    data = json.loads(result.stdout.decode('utf-8'))
    loop_device = next(
        (entry['name'] for entry in data['loopdevices'] if entry['back-file'] in wanted_paths),
        '',
    )

    if loop_device == '':
        raise Exception(f'Failed to find loop device for {image_path}')
    partprobe(loop_device)

    atexit.register(losetup_destroy, loop_device)

    return loop_device
|
|
|
|
|
|
def mount_chroot(rootfs_source: str, boot_src: str, chroot: DeviceChroot):
    """Mount the root filesystem at the chroot's path and the boot filesystem at its ``/boot``.

    Args:
        rootfs_source: source handed to ``chroot.mount_rootfs``.
        boot_src: source mounted at ``/boot`` with the ``defaults`` option.
        chroot: the chroot to mount into.
    """
    logging.debug(f'Mounting {rootfs_source} at {chroot.path}')
    chroot.mount_rootfs(rootfs_source)
    assert os.path.ismount(chroot.path)

    # the mountpoint has to exist inside the freshly mounted rootfs
    boot_dir = chroot.get_path('boot')
    root_makedir(boot_dir)

    logging.debug(f'Mounting {boot_src} at {chroot.path}/boot')
    chroot.mount(boot_src, '/boot', options=['defaults'])
|
|
|
|
|
|
def dump_file_from_image(image_path: str, file_path: str, target_path: Optional[str] = None):
    """Extract ``file_path`` from the filesystem image at ``image_path`` using ``debugfs``.

    Args:
        image_path: filesystem image to read from.
        file_path: path of the file inside the image; a leading ``/`` is optional.
        target_path: where to write the file on the host. Defaults to the
            file's basename inside the directory returned by ``get_temp_dir()``.

    Returns:
        The path the file was dumped to.

    Raises:
        Exception: if ``debugfs`` fails or the target file doesn't exist afterwards.
    """
    if not target_path:
        target_path = os.path.join(get_temp_dir(), os.path.basename(file_path))
    path_in_image = '/' + file_path.lstrip('/')
    # NOTE(review): the request carries literal single quotes — presumably
    # because run_root_cmd passes the command through a shell; verify.
    debugfs_request = f"'dump {path_in_image} {target_path}'"
    result = run_root_cmd(['debugfs', image_path, '-R', debugfs_request])
    dumped = result.returncode == 0 and os.path.exists(target_path)
    if not dumped:
        raise Exception(f'Failed to dump {file_path} from /boot')
    return target_path
|
|
|
|
|
|
def dump_aboot(image_path: str) -> str:
    """Dump ``/aboot.img`` from the image at ``image_path`` and return the dumped file's path."""
    aboot_file = '/aboot.img'
    return dump_file_from_image(image_path, file_path=aboot_file)
|
|
|
|
|
|
def dump_lk2nd(image_path: str) -> str:
    """Dump ``/lk2nd.img`` from the image at ``image_path`` and return the dumped file's path.

    The dumped image does not have a DTB appended, which some devices need;
    that should get added in the future.
    """
    lk2nd_file = '/lk2nd.img'
    return dump_file_from_image(image_path, file_path=lk2nd_file)
|
|
|
|
|
|
def dump_qhypstub(image_path: str) -> str:
    """Dump the qhypstub image from the image at ``image_path`` and return the dumped file's path."""
    # NOTE(review): 'qhyptstub' (extra 't') differs from this function's name
    # and looks like a typo of 'qhypstub' — confirm against the file name
    # that actually gets installed into /boot before changing it.
    qhypstub_file = '/qhyptstub.img'
    return dump_file_from_image(image_path, file_path=qhypstub_file)
|
|
|
|
|
|
def create_img_file(image_path: str, size_str: str):
    """Set the file at ``image_path`` to ``size_str`` using ``truncate -s``.

    Returns:
        ``image_path``, for convenient chaining.

    Raises:
        Exception: if ``truncate`` exits with a non-zero code.
    """
    truncate_cmd = ['truncate', '-s', size_str, image_path]
    if subprocess.run(truncate_cmd).returncode != 0:
        raise Exception(f'Failed to allocate {image_path}')
    return image_path
|
|
|
|
|
|
def partition_device(device: str):
    """Write a fresh msdos partition table with a boot and a root partition to ``device``.

    Partition 1 spans ``0%`` to ``100MiB`` (created as ``ext2`` and flagged
    ``boot``), partition 2 takes the remaining space. Runs ``parted --script``
    as root.

    Raises:
        Exception: if ``parted`` exits with a non-zero code.
    """
    boot_end = '100MiB'
    parted_steps = [
        ['mklabel', 'msdos'],
        ['mkpart', 'primary', 'ext2', '0%', boot_end],
        ['mkpart', 'primary', boot_end, '100%'],
        ['set', '1', 'boot', 'on'],
    ]
    cmd = ['parted', '--script', device]
    for step in parted_steps:
        cmd.extend(step)
    if run_root_cmd(cmd).returncode != 0:
        raise Exception(f'Failed to create partitions on {device}')
|
|
|
|
|
|
def create_filesystem(device: str, blocksize: Optional[int], label=None, options=None, fstype='ext4'):
    """Create a new filesystem on ``device`` by running ``mkfs.<fstype> -F`` as root.

    Args:
        device: block device or image file to format.
        blocksize: requested block size in bytes. It is clamped to at most
            4096 and, for ``ext*`` filesystems, to at least 1024. When falsy,
            no ``-b`` is passed and mkfs uses its own default.
        label: optional filesystem label, passed via ``-L``.
        options: optional extra mkfs arguments, placed before ``device``.
        fstype: filesystem type; selects the ``mkfs.<fstype>`` binary.

    Raises:
        Exception: if mkfs exits with a non-zero code.
    """
    cmd = [f'mkfs.{fstype}', '-F']
    if label:
        cmd += ['-L', label]
    if blocksize:
        # blocksize can be 4k max due to pagesize
        blocksize = min(blocksize, 4096)
        if fstype.startswith('ext'):
            # blocksize for ext-fs must be >=1024
            blocksize = max(blocksize, 1024)
        cmd += ['-b', str(blocksize)]
    # Caller-supplied options must actually reach mkfs; copy them so the
    # caller's list is never mutated.
    cmd += list(options or [])
    cmd.append(device)
    result = run_root_cmd(cmd)
    if result.returncode != 0:
        raise Exception(f'Failed to create {fstype} filesystem on {device} with CMD: {cmd}')
|
|
|
|
|
|
def create_root_fs(device: str, blocksize: Optional[int]):
    """Format ``device`` as the root filesystem, labelled ``kupfer_root``.

    Hands the mkfs options ``-O ^metadata_csum -N 100000`` to
    ``create_filesystem`` and uses its default filesystem type.
    """
    root_fs_options = ['-O', '^metadata_csum', '-N', '100000']
    create_filesystem(device, blocksize=blocksize, label='kupfer_root', options=root_fs_options)
|
|
|
|
|
|
def create_boot_fs(device: str, blocksize: Optional[int]):
    """Format ``device`` as the ext2 boot filesystem, labelled ``kupfer_boot``."""
    boot_fs_settings = {'label': 'kupfer_boot', 'fstype': 'ext2'}
    create_filesystem(device, blocksize=blocksize, **boot_fs_settings)
|
|
|
|
|
|
def install_rootfs(
    rootfs_device: str,
    bootfs_device: str,
    device: Union[str, Device],
    flavour: Flavour,
    arch: Arch,
    packages: list[str],
    use_local_repos: bool,
    profile: Profile,
):
    """Install the system into the given root and boot filesystems.

    Mounts both filesystems as a device chroot, initialises it, creates the
    profile's user, writes ``etc/pacman.conf`` and ``etc/hostname``, runs
    ``POST_INSTALL_CMDS`` inside the chroot and finally unmounts everything.

    Args:
        rootfs_device: source mounted as the chroot's root.
        bootfs_device: source mounted at the chroot's ``/boot``.
        device: target device or its name.
        flavour: flavour to install.
        arch: target architecture.
        packages: packages the chroot is set up with.
        use_local_repos: forwarded to ``get_device_chroot``.
        profile: supplies ``username``, ``password`` and ``hostname``.

    Raises:
        Exception: if one of the post-install commands fails.
    """
    user = profile['username'] or 'kupfer'
    chroot = get_device_chroot(
        device=get_device_name(device),
        flavour=flavour.name,
        arch=arch,
        packages=packages,
        use_local_repos=use_local_repos,
    )

    mount_chroot(rootfs_device, bootfs_device, chroot)

    chroot.mount_pacman_cache()
    chroot.initialize()
    chroot.activate()
    chroot.create_user(user=user, password=profile['password'])
    chroot.add_sudo_config(config_name='wheel', privilegee='%wheel', password_required=True)
    copy_ssh_keys(chroot, user=user, allow_fail=True)

    # config files to place into the rootfs, as (relative path, content)
    pacman_conf = get_base_distro(arch).get_pacman_conf(
        check_space=True,
        extra_repos=get_kupfer_https(arch).repos,
        in_chroot=True,
    )
    hostname = profile['hostname'] or 'kupfer'
    for target, content in (('etc/pacman.conf', pacman_conf), ('etc/hostname', hostname)):
        root_write_file(os.path.join(chroot.path, target.lstrip('/')), content)

    logging.info("Running post-install CMDs")
    for cmd in POST_INSTALL_CMDS:
        result = chroot.run_cmd(cmd)
        assert isinstance(result, subprocess.CompletedProcess)
        if result.returncode != 0:
            raise Exception(f'Error running post-install cmd: {cmd}')

    logging.info('Preparing to unmount chroot')
    # /boot is unmounted from inside the chroot before the chroot is deactivated
    res = chroot.run_cmd('sync && umount /boot', attach_tty=True)
    logging.debug(f'rc: {res}')
    chroot.deactivate()

    logging.debug(f'Unmounting rootfs at "{chroot.path}"')
    res = run_root_cmd(['umount', chroot.path])
    assert isinstance(res, CompletedProcess)
    logging.debug(f'rc: {res.returncode}')
|
|
|
|
|
|
# Click command group `image`. The sub-commands in this file attach
# themselves to it via `@cmd_image.command(...)`. The docstring below is the
# group's help text.
@click.group(name='image')
def cmd_image():
    """Build, flash and boot device images"""
|
|
|
|
|
|
# Reusable `-b/--sector-size` option, shared by the image sub-commands.
# It stays None unless given, so each command can fall back to the device's value.
sectorsize_option = click.option(
    '-b',
    '--sector-size',
    type=int,
    default=None,
    help="Override the device's sector size",
)
|
|
|
|
|
|
@cmd_image.command(name='build')
@click.argument('profile_name', required=False)
@click.option(
    '--local-repos/--no-local-repos',
    '-l/-L',
    help='Whether to use local package repos at all or only use HTTPS repos.',
    default=True,
    show_default=True,
    is_flag=True,
)
@click.option(
    '--build-pkgs/--no-build-pkgs',
    '-p/-P',
    help='Whether to build missing/outdated local packages if local repos are enabled.',
    default=True,
    show_default=True,
    is_flag=True,
)
@click.option(
    '--no-download-pkgs',
    help='Disable trying to download packages instead of building if building is enabled.',
    default=False,
    is_flag=True,
)
@click.option(
    '--block-target',
    help='Override the block device file to write the final image to',
    type=click.Path(),
    default=None,
)
@click.option(
    '--skip-part-images',
    help='Skip creating image files for the partitions and directly work on the target block device.',
    default=False,
    is_flag=True,
)
@sectorsize_option
def cmd_build(
    profile_name: Optional[str] = None,
    local_repos: bool = True,
    build_pkgs: bool = True,
    no_download_pkgs=False,
    block_target: Optional[str] = None,
    sector_size: Optional[int] = None,
    skip_part_images: bool = False,
):
    """
    Build a device image.

    Unless overriden, required packages will be built or preferably downloaded from HTTPS repos.
    """

    # --- resolve profile, device and flavour ---
    config.enforce_profile_device_set()
    config.enforce_profile_flavour_set()
    enforce_wrap()
    device = get_profile_device(profile_name)
    arch = device.arch
    # check_programs_wrap(['makepkg', 'pacman', 'pacstrap'])
    profile: Profile = config.get_profile(profile_name)
    flavour = get_profile_flavour(profile_name)
    flavour_size = flavour.parse_flavourinfo().rootfs_size
    rootfs_size_mb = flavour_size * 1000 + int(profile.size_extra_mb)

    packages = BASE_LOCAL_PACKAGES + [device.package.name, flavour.pkgbuild.name]
    packages_extra = BASE_PACKAGES + profile.pkgs_include

    if arch != config.runtime.arch:
        build_enable_qemu_binfmt(arch)

    # --- make sure the packages exist ---
    if local_repos and build_pkgs:
        logging.info("Making sure all packages are built")
        # enforce that local base packages are built
        required = filter_pkgbuilds(packages, arch=arch, allow_empty_results=False, use_paths=False)
        pkgbuilds = set(required)
        # extra packages might be a mix of package names that are in our PKGBUILDs and packages from the base distro
        optional = filter_pkgbuilds(packages_extra, arch=arch, allow_empty_results=True, use_paths=False)
        pkgbuilds |= set(optional)
        build_packages(pkgbuilds, arch, try_download=not no_download_pkgs)

    # --- create and partition the full image ---
    sector_size = sector_size or device.get_image_sectorsize()

    image_path = block_target or get_image_path(device, flavour.name)

    makedir(os.path.dirname(image_path))

    logging.info(f'Creating new file at {image_path}')
    create_img_file(image_path, f"{rootfs_size_mb}M")

    loop_device = losetup_rootfs_image(image_path, sector_size or device.get_image_sectorsize_default())

    partition_device(loop_device)
    partprobe(loop_device)

    # --- pick the filesystems to install into ---
    loop_boot = loop_device + 'p1'
    loop_root = loop_device + 'p2'
    # by default work directly on the loop partitions ...
    boot_dev: str = loop_boot
    root_dev: str = loop_root
    if not skip_part_images:
        # ... unless separate per-partition image files are wanted
        logging.info('Creating per-partition image files')
        boot_dev = create_img_file(get_image_path(device, flavour, 'boot'), IMG_FILE_BOOT_DEFAULT_SIZE)
        root_dev = create_img_file(get_image_path(device, flavour, 'root'), f'{rootfs_size_mb - 200}M')

    create_boot_fs(boot_dev, sector_size)
    create_root_fs(root_dev, sector_size)

    all_packages = list(set(packages) | set(packages_extra))
    install_rootfs(
        root_dev,
        boot_dev,
        device,
        flavour,
        arch,
        all_packages,
        local_repos,
        profile,
    )

    # --- assemble the full image from the partition images ---
    if not skip_part_images:
        logging.info('Copying partition image files into full image:')
        logging.info(f'Block-copying /boot to {image_path}')
        dd_image(input=boot_dev, output=loop_boot)
        logging.info(f'Block-copying rootfs to {image_path}')
        dd_image(input=root_dev, output=loop_root)

    logging.info(f'Done! Image saved to {image_path}')
|
|
|
|
|
|
@cmd_image.command(name='inspect')
@click.option('--shell', '-s', is_flag=True)
@sectorsize_option
@click.argument('profile', required=False)
def cmd_inspect(profile: Optional[str] = None, shell: bool = False, sector_size: Optional[int] = None):
    """Loop-mount the device image for inspection."""
    config.enforce_profile_device_set()
    config.enforce_profile_flavour_set()
    enforce_wrap()
    device = get_profile_device(profile)
    arch = device.arch
    flavour = get_profile_flavour(profile).name
    sector_size = sector_size or device.get_image_sectorsize_default()

    chroot = get_device_chroot(device.name, flavour, arch)
    image_path = get_image_path(device, flavour)
    loop_device = losetup_rootfs_image(image_path, sector_size)
    partprobe(loop_device)
    root_part = loop_device + 'p2'
    boot_part = loop_device + 'p1'
    mount_chroot(root_part, boot_part, chroot)

    logging.info(f'Inspect the rootfs image at {chroot.path}')

    if not shell:
        # without a shell, keep the image mounted until a signal arrives
        pause()
        return

    # the image already holds an installed system, so mark the chroot as
    # initialized before activating it
    chroot.initialized = True
    chroot.activate()
    if arch != config.runtime.arch:
        logging.info('Installing requisites for foreign-arch shell')
        build_enable_qemu_binfmt(arch)
    logging.info('Starting inspection shell')
    chroot.run_cmd('/bin/bash')
|