From f09deaa9a59b72bfcd91d2f0f8336840e1f61e20 Mon Sep 17 00:00:00 2001 From: InsanePrawn Date: Wed, 29 Sep 2021 02:00:59 +0200 Subject: [PATCH] a lot: profiles, some more help strings. partial: exceptions instead of exit() Signed-off-by: InsanePrawn --- boot.py | 16 +++++++--- chroot.py | 35 +++++++++++++-------- config.py | 87 ++++++++++++++++++++++++++++++++++++++++++++++++++-- constants.py | 25 +++++++++------ distro.py | 9 +++++- flash.py | 46 +++++++++++++-------------- image.py | 68 +++++++++++----------------------------- logger.py | 1 + main.py | 9 ++++-- 9 files changed, 190 insertions(+), 106 deletions(-) diff --git a/boot.py b/boot.py index 90ea766..fd7e0a5 100644 --- a/boot.py +++ b/boot.py @@ -2,13 +2,20 @@ import os import urllib.request from image import get_device_and_flavour, get_image_name, dump_bootimg, dump_lk2nd from fastboot import fastboot_boot, fastboot_erase_dtbo -from constants import BOOT_STRATEGIES, FASTBOOT, JUMPDRIVE, LK2ND, JUMPDRIVE_VERSION +from constants import BOOT_STRATEGIES, FLASH_PARTS, FASTBOOT, JUMPDRIVE, JUMPDRIVE_VERSION import click +import logging + +LK2ND = FLASH_PARTS['LK2ND'] +BOOTIMG = FLASH_PARTS['BOOTIMG'] + +TYPES = [LK2ND, JUMPDRIVE, BOOTIMG] @click.command(name='boot') -@click.argument('type', required=False) +@click.argument('type', required=False, default=BOOTIMG) def cmd_boot(type): + f"""Flash one of {', '.join(TYPES)}""" device, flavour = get_device_and_flavour() image_name = get_image_name(device, flavour) strategy = BOOT_STRATEGIES[device] @@ -21,8 +28,9 @@ def cmd_boot(type): urllib.request.urlretrieve(f'https://github.com/dreemurrs-embedded/Jumpdrive/releases/download/{JUMPDRIVE_VERSION}/{file}', path) elif type == LK2ND: path = dump_lk2nd(image_name) - else: + elif type == BOOTIMG: path = dump_bootimg(image_name) - + else: + raise Exception(f'Unknown boot image type {type}') fastboot_erase_dtbo() fastboot_boot(path) diff --git a/chroot.py b/chroot.py index 9048bd2..f4ecd42 100644 --- a/chroot.py +++ b/chroot.py @@ -69,6 +69,22 @@ def create_chroot( return chroot_path +def run_chroot_cmd( + script: str, + chroot_name, + chroot_base_path: str = None, +): + chroot_path = get_chroot_path(chroot_name, override_basepath=chroot_base_path) + result = subprocess.run([ + 'arch-chroot', + chroot_path, + '/bin/bash', + '-c', + script, + ]) + return result + + def create_chroot_user( chroot_name, chroot_base_path: str = None, @@ -76,23 +92,18 @@ def create_chroot_user( password='123456', groups=['network', 'video', 'audio', 'optical', 'storage', 'input', 'scanner', 'games', 'lp', 'rfkill', 'wheel'], ): - chroot_path = get_chroot_path(chroot_name, override_basepath=chroot_base_path) - install_script = f''' + set -e if ! id -u "{user}" >/dev/null 2>&1; then useradd -m {user} fi usermod -a -G {",".join(groups)} {user} - echo "{user}:{password}" | chpasswd chown {user}:{user} /home/{user} -R ''' - result = subprocess.run([ - 'arch-chroot', - chroot_path, - '/bin/bash', - '-c', - install_script, - ]) + if password: + install_script += f'echo "{user}:{password}" | chpasswd' + else: + install_script += 'passwd' + result = run_chroot_cmd(install_script, chroot_name=chroot_name, chroot_base_path=chroot_base_path) if result.returncode != 0: - logging.fatal('Failed to setup user') - exit(1) + raise Exception('Failed to setup user') diff --git a/config.py b/config.py index db86296..94cfadf 100644 --- a/config.py +++ b/config.py @@ -7,7 +7,10 @@ import click CONFIG_DEFAULT_PATH = os.path.join(appdirs.user_config_dir('kupfer'), 'kupferbootstrap.toml') -PROFILE_DEFAULTS = { +Profile = dict[str, str] + +PROFILE_DEFAULTS: Profile = { + 'parent': '', 'device': '', 'flavour': '', 'pkgs_include': [], @@ -30,6 +33,7 @@ CONFIG_DEFAULTS = { 'pkgbuilds': os.path.abspath(os.getcwd()), }, 'profiles': { + 'current': 'default', 'default': deepcopy(PROFILE_DEFAULTS), }, } @@ -41,6 +45,61 @@ CONFIG_RUNTIME_DEFAULTS = { } +def resolve_profile( + name: str, + sparse_profiles: dict[str, Profile], + resolved: dict[str, Profile] = None, + _visited=None, +) -> dict[str, Profile]: + """ + Recursively resolves the specified profile by `name` and its parents to merge the config semantically, + applying include and exclude overrides along the hierarchy. + If `resolved` is passed `None`, a fresh dictionary will be created. + `resolved` will be modified in-place during parsing and also returned. + A sanitized `sparse_profiles` dict is assumed, no checking for unknown keys or incorrect data types is performed. + `_visited` should not be passed by users. + """ + if _visited is None: + _visited = list[str]() + if resolved is None: + resolved = dict[str, Profile]() + if name in _visited: + loop = list(_visited) + raise Exception(f'Dependency loop detected in profiles: {" -> ".join(loop+[loop[0]])}') + if name in resolved: + return resolved + + _visited.append(name) + sparse = sparse_profiles[name] + full = deepcopy(sparse) + if 'parent' in sparse and (parent_name := sparse['parent']): + parent = resolve_profile(name=parent_name, sparse_profiles=sparse_profiles, resolved=resolved, _visited=_visited)[parent_name] + full = parent | sparse + + # join our includes with parent's + includes = set(parent.get('pkgs_include', []) + sparse.get('pkgs_include', [])) + if 'pkgs_exclude' in sparse: + includes -= set(sparse['pkgs_exclude']) + full['pkgs_include'] = list(includes) + + # join our includes with parent's + excludes = set(parent.get('pkgs_exclude', []) + sparse.get('pkgs_exclude', [])) + # our includes override parent excludes + if 'pkgs_include' in sparse: + excludes -= set(sparse['pkgs_include']) + full['pkgs_exclude'] = list(excludes) + + # now init missing keys + for key, value in PROFILE_DEFAULTS.items(): + if key not in full.keys(): + full[key] = None + if type(value) == list: + full[key] = [] + + resolved[name] = full + return resolved + + def sanitize_config(conf: dict, warn_missing_defaultprofile=True) -> dict: """checks the input config dict for unknown keys and returns only the known parts""" return merge_configs(conf_new=conf, conf_base={}, warn_missing_defaultprofile=warn_missing_defaultprofile) @@ -78,7 +137,10 @@ def merge_configs(conf_new: dict, conf_base={}, warn_missing_defaultprofile=True for profile_name, profile_conf in outer_conf.items(): if not isinstance(profile_conf, dict): - logging.warning('Skipped key "{profile_name}" in profile section: only subsections allowed') + if profile_name == 'current': + parsed[outer_name][profile_name] = profile_conf + else: + logging.warning('Skipped key "{profile_name}" in profile section: only subsections and "current" allowed') continue # init profile @@ -159,6 +221,7 @@ class ConfigStateHolder: file: dict = {} # runtime config not persisted anywhere runtime: dict = CONFIG_RUNTIME_DEFAULTS + _profile_cache: dict[str, Profile] = None def __init__(self, runtime_conf={}, file_conf_path: str = None, file_conf_base: dict = {}): """init a stateholder, optionally loading `file_conf_path`""" @@ -171,6 +234,7 @@ class ConfigStateHolder: def try_load_file(self, config_file=None, base=CONFIG_DEFAULTS): _conf_file = config_file if config_file is not None else CONFIG_DEFAULT_PATH self.runtime['config_file'] = _conf_file + self._profile_cache = None try: self.file = parse_file(config_file=_conf_file, base=base) except Exception as ex: @@ -190,6 +254,12 @@ class ConfigStateHolder: msg = "File doesn't exist. Try running `kupferbootstrap config init` first?" raise ConfigLoadException(extra_msg=msg, inner_exception=ex) + def get_profile(self, name: str = None): + if not name: + name = self.file['profiles']['current'] + self._profile_cache = resolve_profile(name, self.file['profiles'], resolved=self._profile_cache) + return self._profile_cache[name] + config = ConfigStateHolder(file_conf_base=CONFIG_DEFAULTS) @@ -213,5 +283,16 @@ if __name__ == '__main__': except ConfigLoadException as ex: logging.fatal(str(ex)) conf = deepcopy(CONFIG_DEFAULTS) - conf['profiles']['pinephone'] = {'hostname': 'slowphone', 'pkgs_include': ['zsh', 'tmux', 'mpv', 'firefox']} + conf['profiles']['pinephone'] = { + 'hostname': 'slowphone', + 'parent': '', + 'pkgs_include': ['zsh', 'tmux', 'mpv', 'firefox'], + 'pkgs_exclude': ['pixman-git'], + } + conf['profiles']['yeetphone'] = { + 'parent': 'pinephone', + 'hostname': 'yeetphone', + 'pkgs_include': ['pixman-git'], + 'pkgs_exclude': ['tmux'], + } print(toml.dumps(conf)) diff --git a/constants.py b/constants.py index 5f91b3d..947ebfe 100644 --- a/constants.py +++ b/constants.py @@ -1,10 +1,10 @@ FASTBOOT = 'fastboot' - -ROOTFS = 'rootfs' -BOOTIMG = 'bootimg' -LK2ND = 'lk2nd' -QHYPSTUB = 'qhypstub' - +FLASH_PARTS = { + 'ROOTFS': 'rootfs', + 'BOOTIMG': 'bootimg', + 'LK2ND': 'lk2nd', + 'QHYPSTUB': 'qhypstub', +} EMMC = 'emmc' EMMCFILE = 'emmc-file' MICROSD = 'microsd' @@ -28,9 +28,16 @@ DEVICES = { } FLAVOURS = { - 'barebone': [], - 'debug-shell': ['hook-debug-shell'], - 'gnome': ['gnome'], + 'barebone': { + 'packages': [] + }, + 'debug-shell': { + 'packages': ['hook-debug-shell'] + }, + 'gnome': { + 'packages': ['gnome', 'archlinux-appstream-data', 'gnome-software-packagekit-plugin'], + 'post_cmds': ['systemctl enable gdm'] + }, } REPOSITORIES = [ diff --git a/distro.py b/distro.py index f80be8d..6aabdf9 100644 --- a/distro.py +++ b/distro.py @@ -38,7 +38,7 @@ class Repo: remote: bool def scan(self): - self.resolved_url = resolve_url(self.url_template, self.repo_name, self.arch) + self.resolved_url = resolve_url(self.url_template, repo_name=self.repo_name, arch=self.arch) self.remote = not self.resolved_url.startswith('file://') # TODO @@ -50,6 +50,10 @@ class Repo: if scan: self.scan() + def config_snippet(self) -> str: + options = {'Server': self.url_template} | self.options.items() + return ('[%s]\n' % self.name) + '\n'.join([f"{key} = {value}" for key, value in options]) + class RepoInfo: options: dict[str, str] = {} @@ -85,6 +89,9 @@ class Distro: for package in repo.packages: results[package.name] = package + def config_snippet(self) -> str: + return '\n'.join(repo.config_snippet() for repo in self.repos) + _base_distros: dict[str, Distro] = None diff --git a/flash.py b/flash.py index fed839a..fc92dba 100644 --- a/flash.py +++ b/flash.py @@ -1,5 +1,5 @@ import atexit -from constants import BOOTIMG, LK2ND, LOCATIONS, QHYPSTUB, ROOTFS +from constants import FLASH_PARTS, LOCATIONS from fastboot import fastboot_flash import shutil from image import dump_bootimg, dump_lk2nd, dump_qhypstub, get_device_and_flavour, get_image_name @@ -7,7 +7,11 @@ import os import subprocess import click import tempfile -from logger import logging + +BOOTIMG = FLASH_PARTS['BOOTIMG'] +LK2ND = FLASH_PARTS['LK2ND'] +QHYPSTUB = FLASH_PARTS['QHYPSTUB'] +ROOTFS = FLASH_PARTS['ROOTFS'] @click.command(name='flash') @@ -17,13 +21,14 @@ def cmd_flash(what, location): device, flavour = get_device_and_flavour() image_name = get_image_name(device, flavour) + if what not in FLASH_PARTS.values(): + raise Exception(f'Unknown what "{what}", must be one of {", ".join(FLASH_PARTS.values())}') + if what == ROOTFS: - if location == None: - logging.info(f'You need to specify a location to flash {what} to') - exit(1) + if location is None: + raise Exception(f'You need to specify a location to flash {what} to') if location not in LOCATIONS: - logging.info(f'Invalid location {location}. Choose one of {", ".join(LOCATIONS)} for location') - exit(1) + raise Exception(f'Invalid location {location}. Choose one of {", ".join(LOCATIONS)}') path = '' dir = '/dev/disk/by-id' @@ -33,16 +38,13 @@ def cmd_flash(what, location): path = os.path.realpath(os.path.join(dir, file)) result = subprocess.run(['lsblk', path, '-o', 'SIZE'], capture_output=True) if result.returncode != 0: - logging.info(f'Failed to lsblk {path}') - exit(1) + raise Exception(f'Failed to lsblk {path}') if result.stdout == b'SIZE\n 0B\n': - logging.info( + raise Exception( f'Disk {path} has a size of 0B. That probably means it is not available (e.g. no microSD inserted or no microSD card slot installed in the device) or corrupt or defect' ) - exit(1) if path == '': - logging.fatal(f'Unable to discover Jumpdrive') - exit(1) + raise Exception('Unable to discover Jumpdrive') image_dir = tempfile.gettempdir() image_path = os.path.join(image_dir, f'minimal-{image_name}') @@ -60,8 +62,7 @@ def cmd_flash(what, location): image_path, ]) if result.returncode != 0: - logging.fatal(f'Failed to e2fsck {image_path}') - exit(1) + raise Exception(f'Failed to e2fsck {image_path}') result = subprocess.run([ 'resize2fs', @@ -69,8 +70,7 @@ def cmd_flash(what, location): image_path, ]) if result.returncode != 0: - logging.fatal(f'Failed to resize2fs {image_path}') - exit(1) + raise Exception(f'Failed to resize2fs {image_path}') if location.endswith('-file'): part_mount = '/mnt/kupfer/fs' @@ -95,8 +95,7 @@ def cmd_flash(what, location): part_mount, ]) if result.returncode != 0: - logging.fatal(f'Failed to mount {path} to {part_mount}') - exit(1) + raise Exception(f'Failed to mount {path} to {part_mount}') dir = os.path.join(part_mount, '.stowaways') if not os.path.exists(dir): @@ -113,8 +112,7 @@ def cmd_flash(what, location): os.path.join(dir, 'kupfer.img'), ]) if result.returncode != 0: - logging.fatal(f'Failed to mount {path} to {part_mount}') - exit(1) + raise Exception(f'Failed to mount {path} to {part_mount}') else: result = subprocess.run([ 'dd', @@ -127,8 +125,7 @@ def cmd_flash(what, location): 'conv=sync,noerror', ]) if result.returncode != 0: - logging.info(f'Failed to flash {image_path} to {path}') - exit(1) + raise Exception(f'Failed to flash {image_path} to {path}') elif what == BOOTIMG: path = dump_bootimg(image_name) @@ -140,5 +137,4 @@ def cmd_flash(what, location): path = dump_qhypstub(image_name) fastboot_flash('qhypstub', path) else: - logging.fatal(f'Unknown what {what}') - exit(1) + raise Exception(f'Unknown what "{what}", this must be a bug in kupferbootstrap!') diff --git a/image.py b/image.py index b051eda..9e27913 100644 --- a/image.py +++ b/image.py @@ -3,25 +3,20 @@ import os import subprocess import click from logger import logging -from chroot import create_chroot, create_chroot_user, get_chroot_path +from chroot import create_chroot, create_chroot_user, get_chroot_path, run_chroot_cmd from constants import DEVICES, FLAVOURS, REPOSITORIES from config import config -def get_device_and_flavour() -> tuple[str, str]: - if not os.path.exists('.device'): - logging.fatal(f'Please set the device using \'kupferbootstrap image device ...\'') - exit(1) - if not os.path.exists('.flavour'): - logging.fatal(f'Please set the flavour using \'kupferbootstrap image flavour ...\'') - exit(1) +def get_device_and_flavour(profile=None) -> tuple[str, str]: + profile = config.get_profile(profile) + if not profile['device']: + raise Exception("Please set the device using 'kupferbootstrap config init ...'") - with open('.device', 'r') as file: - device = file.read() - with open('.flavour', 'r') as file: - flavour = file.read() + if not profile['flavour']: + raise Exception("Please set the flavour using 'kupferbootstrap config init ...'") - return (device, flavour) + return (profile['device'], profile['flavour']) def get_image_name(device, flavour) -> str: @@ -96,7 +91,7 @@ def dump_qhypstub(image_name: str) -> str: f'dump /boot/qhypstub.bin {path}', ]) if result.returncode != 0: - logging.fatal(f'Faild to dump qhypstub.bin') + logging.fatal('Faild to dump qhypstub.bin') exit(1) return path @@ -106,40 +101,11 @@ def cmd_image(): pass -@click.command(name='device') -@click.argument('device') -def cmd_device(device): - for key in DEVICES.keys(): - if '-'.join(key.split('-')[1:]) == device: - device = key - break - - if device not in DEVICES: - logging.fatal(f'Unknown device {device}. Pick one from:\n{", ".join(DEVICES.keys())}') - exit(1) - - logging.info(f'Setting device to {device}') - - with open('.device', 'w') as file: - file.write(device) - - -@click.command(name='flavour') -@click.argument('flavour') -def cmd_flavour(flavour): - if flavour not in FLAVOURS: - logging.fatal(f'Unknown flavour {flavour}. Pick one from:\n{", ".join(FLAVOURS.keys())}') - exit(1) - - logging.info(f'Setting flavour to {flavour}') - - with open('.flavour', 'w') as file: - file.write(flavour) - - @click.command(name='build') def cmd_build(): + profile = config.get_profile() device, flavour = get_device_and_flavour() + post_cmds = FLAVOURS[flavour].get('post_cmds', []) image_name = get_image_name(device, flavour) if not os.path.exists(image_name): @@ -172,14 +138,18 @@ def cmd_build(): else: url = 'https://gitlab.com/kupfer/packages/prebuilts/-/raw/main/$repo' extra_repos = {repo: {'Server': url} for repo in REPOSITORIES} - + packages = ['base', 'base-kupfer'] + DEVICES[device] + FLAVOURS[flavour]['packages'] + profile['pkgs_include'] create_chroot( chroot_name, - packages=['base', 'base-kupfer'] + DEVICES[device] + FLAVOURS[flavour], + packages=packages, pacman_conf='/app/local/etc/pacman.conf', extra_repos=extra_repos, ) - create_chroot_user(chroot_name) + create_chroot_user(chroot_name, user=profile['username'], password=profile['password']) + if post_cmds: + result = run_chroot_cmd(' && '.join(post_cmds, chroot_name)) + if result.returncode != 0: + raise Exception('Error running post_cmds') """ @@ -199,7 +169,5 @@ def cmd_inspect(): signal.pause() """ -cmd_image.add_command(cmd_device) -cmd_image.add_command(cmd_flavour) cmd_image.add_command(cmd_build) # cmd_image.add_command(cmd_inspect) diff --git a/logger.py b/logger.py index fc79ac4..081a104 100644 --- a/logger.py +++ b/logger.py @@ -1,6 +1,7 @@ import click import logging import sys +from traceback import format_exc as get_trace def setup_logging(verbose: bool): diff --git a/main.py b/main.py index 1004edf..b0f2bc0 100644 --- a/main.py +++ b/main.py @@ -6,7 +6,7 @@ from flash import cmd_flash from ssh import cmd_ssh from forwarding import cmd_forwarding from telnet import cmd_telnet -from logger import setup_logging, verbose_option +from logger import logging, setup_logging, verbose_option, get_trace import click from config import config, config_option from wrapper import enforce_wrap, nowrapper_option @@ -25,7 +25,12 @@ def cli(verbose: bool = False, config_file: str = None, no_wrapper: bool = False def main(): - return cli(prog_name='kupferbootstrap') + try: + return cli(prog_name='kupferbootstrap') + except Exception as err: + logging.debug(get_trace()) + logging.fatal(err) + exit(1) cli.add_command(cmd_cache)