mirror of
https://gitlab.com/kupfer/kupferbootstrap.git
synced 2025-02-23 05:35:44 -05:00
346 lines
11 KiB
Python
346 lines
11 KiB
Python
import atexit
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import click
|
|
import logging
|
|
|
|
from chroot import Chroot, get_device_chroot
|
|
from constants import BASE_PACKAGES, DEVICES, FLAVOURS
|
|
from config import config
|
|
from distro import get_base_distro, get_kupfer_https, get_kupfer_local
|
|
from packages import build_enable_qemu_binfmt, discover_packages, build_packages
|
|
from ssh import copy_ssh_keys
|
|
from wrapper import enforce_wrap
|
|
from signal import pause
|
|
|
|
|
|
def shrink_fs(loop_device: str, file: str, sector_size: int):
|
|
# 8: 512 bytes sectors
|
|
# 1: 4096 bytes sectors
|
|
sectors_blocks_factor = 4096 // sector_size
|
|
|
|
logging.debug(f"Checking filesystem at {loop_device}p2")
|
|
result = subprocess.run(['e2fsck', '-fy', f'{loop_device}p2'])
|
|
if result.returncode > 2:
|
|
# https://man7.org/linux/man-pages/man8/e2fsck.8.html#EXIT_CODE
|
|
raise Exception(f'Failed to e2fsck {loop_device}p2 with exit code {result.returncode}')
|
|
|
|
logging.debug(f'Shrinking filesystem at {loop_device}p2')
|
|
result = subprocess.run(['resize2fs', '-M', f'{loop_device}p2'], capture_output=True)
|
|
if result.returncode != 0:
|
|
print(result.stdout)
|
|
print(result.stderr)
|
|
raise Exception(f'Failed to resize2fs {loop_device}p2')
|
|
|
|
logging.debug(f'Finding end block of shrunken filesystem on {loop_device}p2')
|
|
blocks = int(re.search('is now [0-9]+', result.stdout.decode('utf-8')).group(0).split(' ')[2])
|
|
sectors = blocks * sectors_blocks_factor #+ 157812 - 25600
|
|
|
|
logging.debug(f'Shrinking partition at {loop_device}p2 to {sectors} sectors')
|
|
child_proccess = subprocess.Popen(
|
|
['fdisk', '-b', str(sector_size), loop_device],
|
|
stdin=subprocess.PIPE,
|
|
)
|
|
child_proccess.stdin.write('\n'.join([
|
|
'd',
|
|
'2',
|
|
'n',
|
|
'p',
|
|
'2',
|
|
'',
|
|
f'+{sectors}',
|
|
'w',
|
|
'q',
|
|
]).encode('utf-8'))
|
|
|
|
child_proccess.communicate()
|
|
|
|
returncode = child_proccess.wait()
|
|
if returncode == 1:
|
|
# For some reason re-reading the partition table fails, but that is not a problem
|
|
subprocess.run(['partprobe'])
|
|
if returncode > 1:
|
|
raise Exception(f'Failed to shrink partition size of {loop_device}p2 with fdisk')
|
|
|
|
logging.debug(f'Finding end sector of partition at {loop_device}p2')
|
|
result = subprocess.run(['fdisk', '-b', str(sector_size), '-l', loop_device], capture_output=True)
|
|
if result.returncode != 0:
|
|
print(result.stdout)
|
|
print(result.stderr)
|
|
raise Exception(f'Failed to fdisk -l {loop_device}')
|
|
|
|
end_sector = 0
|
|
for line in result.stdout.decode('utf-8').split('\n'):
|
|
if line.startswith(f'{loop_device}p2'):
|
|
parts = list(filter(lambda part: part != '', line.split(' ')))
|
|
end_sector = int(parts[2])
|
|
|
|
if end_sector == 0:
|
|
raise Exception(f'Failed to find end sector of {loop_device}p2')
|
|
|
|
end_block = end_sector // sectors_blocks_factor
|
|
|
|
logging.debug(f'Truncating {file} to {end_block} blocks')
|
|
result = subprocess.run(['truncate', '-o', '-s', str(end_block), file])
|
|
if result.returncode != 0:
|
|
raise Exception(f'Failed to truncate {file}')
|
|
|
|
|
|
def get_device_and_flavour(profile: str = None) -> tuple[str, str]:
|
|
config.enforce_config_loaded()
|
|
profile = config.get_profile(profile)
|
|
if not profile['device']:
|
|
raise Exception("Please set the device using 'kupferbootstrap config init ...'")
|
|
|
|
if not profile['flavour']:
|
|
raise Exception("Please set the flavour using 'kupferbootstrap config init ...'")
|
|
|
|
return (profile['device'], profile['flavour'])
|
|
|
|
|
|
def get_image_name(device_chroot: Chroot) -> str:
|
|
return f'{device_chroot.name}.img'
|
|
|
|
|
|
def get_image_path(device_chroot: Chroot) -> str:
|
|
return os.path.join(config.get_path('images'), get_image_name(device_chroot))
|
|
|
|
|
|
def losetup_rootfs_image(image_path: str, sector_size: int) -> str:
|
|
logging.debug(f'Creating loop device for {image_path}')
|
|
result = subprocess.run([
|
|
'losetup',
|
|
'-f',
|
|
'-b',
|
|
str(sector_size),
|
|
'-P',
|
|
image_path,
|
|
])
|
|
if result.returncode != 0:
|
|
logging.fatal(f'Failed create loop device for {image_path}')
|
|
exit(1)
|
|
|
|
logging.debug(f'Finding loop device for {image_path}')
|
|
|
|
result = subprocess.run(['losetup', '-J'], capture_output=True)
|
|
if result.returncode != 0:
|
|
print(result.stdout)
|
|
print(result.stderr)
|
|
logging.fatal('Failed to list loop devices')
|
|
exit(1)
|
|
|
|
data = json.loads(result.stdout.decode('utf-8'))
|
|
loop_device = ''
|
|
for d in data['loopdevices']:
|
|
if d['back-file'] == image_path:
|
|
loop_device = d['name']
|
|
break
|
|
|
|
if loop_device == '':
|
|
raise Exception(f'Failed to find loop device for {image_path}')
|
|
|
|
def losetup_destroy():
|
|
logging.debug(f'Destroying loop device {loop_device} for {image_path}')
|
|
subprocess.run(
|
|
[
|
|
'losetup',
|
|
'-d',
|
|
loop_device,
|
|
],
|
|
stderr=subprocess.DEVNULL,
|
|
)
|
|
|
|
atexit.register(losetup_destroy)
|
|
|
|
return loop_device
|
|
|
|
|
|
def mount_rootfs_loop_device(loop_device, chroot: Chroot):
|
|
logging.debug(f'Mounting {loop_device}p2 at {chroot.path}')
|
|
|
|
chroot.mount_rootfs(loop_device + 'p2')
|
|
assert (os.path.ismount(chroot.path))
|
|
|
|
os.makedirs(chroot.get_path('boot'), exist_ok=True)
|
|
|
|
logging.debug(f'Mounting {loop_device}p1 at {chroot.path}/boot')
|
|
chroot.mount(loop_device + 'p1', '/boot', options=['defaults'])
|
|
|
|
|
|
def dump_bootimg(image_path: str) -> str:
|
|
path = '/tmp/boot.img'
|
|
result = subprocess.run([
|
|
'debugfs',
|
|
image_path,
|
|
'-R',
|
|
f'dump /boot.img {path}',
|
|
])
|
|
if result.returncode != 0:
|
|
logging.fatal('Failed to dump boot.img')
|
|
exit(1)
|
|
return path
|
|
|
|
|
|
def dump_lk2nd(image_path: str) -> str:
|
|
"""
|
|
This doesn't append the image with the appended DTB which is needed for some devices, so it should get added in the future.
|
|
"""
|
|
path = '/tmp/lk2nd.img'
|
|
result = subprocess.run([
|
|
'debugfs',
|
|
image_path,
|
|
'-R',
|
|
f'dump /lk2nd.img {path}',
|
|
])
|
|
if result.returncode != 0:
|
|
logging.fatal('Failed to dump lk2nd.img')
|
|
exit(1)
|
|
return path
|
|
|
|
|
|
def dump_qhypstub(image_path: str) -> str:
|
|
path = '/tmp/qhypstub.bin'
|
|
result = subprocess.run([
|
|
'debugfs',
|
|
image_path,
|
|
'-R',
|
|
f'dump /qhypstub.bin {path}',
|
|
])
|
|
if result.returncode != 0:
|
|
logging.fatal('Failed to dump qhypstub.bin')
|
|
exit(1)
|
|
return path
|
|
|
|
|
|
@click.group(name='image')
|
|
def cmd_image():
|
|
pass
|
|
|
|
|
|
@cmd_image.command(name='build')
|
|
@click.argument('profile_name', required=False)
|
|
@click.option('--build-pkgs/--no-build-pkgs', '-p/-P', default=True, help='Whether to build missing/outdated packages. Defaults to true.')
|
|
def cmd_build(profile_name: str = None, build_pkgs: bool = True):
|
|
enforce_wrap()
|
|
profile = config.get_profile(profile_name)
|
|
device, flavour = get_device_and_flavour(profile_name)
|
|
post_cmds = FLAVOURS[flavour].get('post_cmds', [])
|
|
|
|
user = profile['username'] or 'kupfer'
|
|
# TODO: PARSE DEVICE ARCH AND SECTOR SIZE
|
|
arch = 'aarch64'
|
|
sector_size = 4096
|
|
|
|
build_enable_qemu_binfmt(arch)
|
|
|
|
packages_dir = config.get_package_dir(arch)
|
|
if os.path.exists(os.path.join(packages_dir, 'main')):
|
|
extra_repos = get_kupfer_local(arch).repos
|
|
else:
|
|
extra_repos = get_kupfer_https(arch).repos
|
|
packages = BASE_PACKAGES + DEVICES[device] + FLAVOURS[flavour]['packages'] + profile['pkgs_include']
|
|
|
|
if build_pkgs:
|
|
repo = discover_packages()
|
|
build_packages(repo, [p for name, p in repo.items() if name in packages], arch)
|
|
|
|
chroot = get_device_chroot(device=device, flavour=flavour, arch=arch, packages=packages, extra_repos=extra_repos)
|
|
image_path = get_image_path(chroot)
|
|
|
|
os.makedirs(config.get_path('images'), exist_ok=True)
|
|
new_image = not os.path.exists(image_path)
|
|
if new_image:
|
|
result = subprocess.run([
|
|
'truncate',
|
|
'-s',
|
|
f"{FLAVOURS[flavour].get('size',2)}G",
|
|
image_path,
|
|
])
|
|
if result.returncode != 0:
|
|
raise Exception(f'Failed to allocate {image_path}')
|
|
|
|
loop_device = losetup_rootfs_image(image_path, sector_size)
|
|
|
|
if new_image:
|
|
boot_partition_size = '100MiB'
|
|
create_partition_table = ['mklabel', 'msdos']
|
|
create_boot_partition = ['mkpart', 'primary', 'ext2', '0%', boot_partition_size]
|
|
create_root_partition = ['mkpart', 'primary', boot_partition_size, '100%']
|
|
enable_boot = ['set', '1', 'boot', 'on']
|
|
result = subprocess.run([
|
|
'parted',
|
|
'--script',
|
|
loop_device,
|
|
] + create_partition_table + create_boot_partition + create_root_partition + enable_boot)
|
|
if result.returncode != 0:
|
|
raise Exception(f'Failed to create partitions on {loop_device}')
|
|
|
|
result = subprocess.run([
|
|
'mkfs.ext2',
|
|
'-F',
|
|
'-L',
|
|
'kupfer_boot',
|
|
f'{loop_device}p1',
|
|
])
|
|
if result.returncode != 0:
|
|
raise Exception(f'Failed to create ext2 filesystem on {loop_device}p1')
|
|
|
|
result = subprocess.run([
|
|
'mkfs.ext4',
|
|
'-O',
|
|
'^metadata_csum',
|
|
'-F',
|
|
'-L',
|
|
'kupfer_root',
|
|
f'{loop_device}p2',
|
|
])
|
|
if result.returncode != 0:
|
|
raise Exception(f'Failed to create ext4 filesystem on {loop_device}p2')
|
|
|
|
mount_rootfs_loop_device(loop_device, chroot)
|
|
|
|
chroot.mount_pacman_cache()
|
|
chroot.initialize()
|
|
chroot.activate()
|
|
chroot.create_user(
|
|
user=user,
|
|
password=profile['password'],
|
|
)
|
|
|
|
copy_ssh_keys(
|
|
chroot.path,
|
|
user=user,
|
|
)
|
|
with open(os.path.join(chroot.path, 'etc', 'pacman.conf'), 'w') as file:
|
|
file.write(get_base_distro(arch).get_pacman_conf(check_space=True, extra_repos=get_kupfer_https(arch).repos))
|
|
if post_cmds:
|
|
result = chroot.run_cmd(' && '.join(post_cmds))
|
|
if result.returncode != 0:
|
|
raise Exception('Error running post_cmds')
|
|
|
|
|
|
@cmd_image.command(name='inspect')
|
|
@click.option('--shell', '-s', is_flag=True)
|
|
def cmd_inspect(shell: bool = False):
|
|
device, flavour = get_device_and_flavour()
|
|
# TODO: get arch from profile
|
|
arch = 'aarch64'
|
|
# TODO: PARSE DEVICE SECTOR SIZE
|
|
sector_size = 4096
|
|
chroot = get_device_chroot(device, flavour, arch)
|
|
image_path = get_image_path(chroot)
|
|
loop_device = losetup_rootfs_image(image_path, sector_size)
|
|
|
|
mount_rootfs_loop_device(loop_device, chroot)
|
|
|
|
logging.info(f'Inspect the rootfs image at {chroot.path}')
|
|
|
|
if shell:
|
|
chroot.initialized = True
|
|
chroot.activate()
|
|
build_enable_qemu_binfmt(arch)
|
|
chroot.run_cmd('/bin/bash')
|
|
else:
|
|
pause()
|