a lot: profiles, some more help strings. partial: exceptions instead of exit()

Signed-off-by: InsanePrawn <insane.prawny@gmail.com>
This commit is contained in:
InsanePrawn 2021-09-29 02:00:59 +02:00
parent e705af21f5
commit f09deaa9a5
9 changed files with 190 additions and 106 deletions

16
boot.py
View file

@ -2,13 +2,20 @@ import os
import urllib.request
from image import get_device_and_flavour, get_image_name, dump_bootimg, dump_lk2nd
from fastboot import fastboot_boot, fastboot_erase_dtbo
from constants import BOOT_STRATEGIES, FASTBOOT, JUMPDRIVE, LK2ND, JUMPDRIVE_VERSION
from constants import BOOT_STRATEGIES, FLASH_PARTS, FASTBOOT, JUMPDRIVE, JUMPDRIVE_VERSION
import click
import logging
LK2ND = FLASH_PARTS['LK2ND']
BOOTIMG = FLASH_PARTS['BOOTIMG']
TYPES = [LK2ND, JUMPDRIVE, BOOTIMG]
@click.command(name='boot')
@click.argument('type', required=False)
@click.argument('type', required=False, default=BOOTIMG)
def cmd_boot(type):
f"""Flash one of {', '.join(TYPES)}"""
device, flavour = get_device_and_flavour()
image_name = get_image_name(device, flavour)
strategy = BOOT_STRATEGIES[device]
@ -21,8 +28,9 @@ def cmd_boot(type):
urllib.request.urlretrieve(f'https://github.com/dreemurrs-embedded/Jumpdrive/releases/download/{JUMPDRIVE_VERSION}/{file}', path)
elif type == LK2ND:
path = dump_lk2nd(image_name)
else:
elif type == BOOTIMG:
path = dump_bootimg(image_name)
else:
raise Exception(f'Unknown boot image type {type}')
fastboot_erase_dtbo()
fastboot_boot(path)

View file

@ -69,6 +69,22 @@ def create_chroot(
return chroot_path
def run_chroot_cmd(
script: str,
chroot_name,
chroot_base_path: str = None,
):
chroot_path = get_chroot_path(chroot_name, override_basepath=chroot_base_path)
result = subprocess.run([
'arch-chroot',
chroot_path,
'/bin/bash',
'-c',
script,
])
return result
def create_chroot_user(
chroot_name,
chroot_base_path: str = None,
@ -76,23 +92,18 @@ def create_chroot_user(
password='123456',
groups=['network', 'video', 'audio', 'optical', 'storage', 'input', 'scanner', 'games', 'lp', 'rfkill', 'wheel'],
):
chroot_path = get_chroot_path(chroot_name, override_basepath=chroot_base_path)
install_script = f'''
set -e
if ! id -u "{user}" >/dev/null 2>&1; then
useradd -m {user}
fi
usermod -a -G {",".join(groups)} {user}
echo "{user}:{password}" | chpasswd
chown {user}:{user} /home/{user} -R
'''
result = subprocess.run([
'arch-chroot',
chroot_path,
'/bin/bash',
'-c',
install_script,
])
if password:
install_script += f'echo "{user}:{password}" | chpasswd'
else:
install_script += 'passwd'
result = run_chroot_cmd(install_script, chroot_name=chroot_name, chroot_base_path=chroot_base_path)
if result.returncode != 0:
logging.fatal('Failed to setup user')
exit(1)
raise Exception('Failed to setup user')

View file

@ -7,7 +7,10 @@ import click
CONFIG_DEFAULT_PATH = os.path.join(appdirs.user_config_dir('kupfer'), 'kupferbootstrap.toml')
PROFILE_DEFAULTS = {
Profile = dict[str, str]
PROFILE_DEFAULTS: Profile = {
'parent': '',
'device': '',
'flavour': '',
'pkgs_include': [],
@ -30,6 +33,7 @@ CONFIG_DEFAULTS = {
'pkgbuilds': os.path.abspath(os.getcwd()),
},
'profiles': {
'current': 'default',
'default': deepcopy(PROFILE_DEFAULTS),
},
}
@ -41,6 +45,61 @@ CONFIG_RUNTIME_DEFAULTS = {
}
def resolve_profile(
name: str,
sparse_profiles: dict[str, Profile],
resolved: dict[str, Profile] = None,
_visited=None,
) -> dict[str, Profile]:
"""
Recursively resolves the specified profile by `name` and its parents to merge the config semantically,
applying include and exclude overrides along the hierarchy.
If `resolved` is passed `None`, a fresh dictionary will be created.
`resolved` will be modified in-place during parsing and also returned.
A sanitized `sparse_profiles` dict is assumed, no checking for unknown keys or incorrect data types is performed.
`_visited` should not be passed by users.
"""
if _visited is None:
_visited = list[str]()
if resolved is None:
resolved = dict[str, Profile]()
if name in _visited:
loop = list(_visited)
raise Exception(f'Dependency loop detected in profiles: {" -> ".join(loop+[loop[0]])}')
if name in resolved:
return resolved
_visited.append(name)
sparse = sparse_profiles[name]
full = deepcopy(sparse)
if 'parent' in sparse and (parent_name := sparse['parent']):
parent = resolve_profile(name=parent_name, sparse_profiles=sparse_profiles, resolved=resolved, _visited=_visited)[parent_name]
full = parent | sparse
# join our includes with parent's
includes = set(parent.get('pkgs_include', []) + sparse.get('pkgs_include', []))
if 'pkgs_exclude' in sparse:
includes -= set(sparse['pkgs_exclude'])
full['pkgs_include'] = list(includes)
# join our includes with parent's
excludes = set(parent.get('pkgs_exclude', []) + sparse.get('pkgs_exclude', []))
# our includes override parent excludes
if 'pkgs_include' in sparse:
excludes -= set(sparse['pkgs_include'])
full['pkgs_exclude'] = list(excludes)
# now init missing keys
for key, value in PROFILE_DEFAULTS.items():
if key not in full.keys():
full[key] = None
if type(value) == list:
full[key] = []
resolved[name] = full
return resolved
def sanitize_config(conf: dict, warn_missing_defaultprofile=True) -> dict:
"""checks the input config dict for unknown keys and returns only the known parts"""
return merge_configs(conf_new=conf, conf_base={}, warn_missing_defaultprofile=warn_missing_defaultprofile)
@ -78,7 +137,10 @@ def merge_configs(conf_new: dict, conf_base={}, warn_missing_defaultprofile=True
for profile_name, profile_conf in outer_conf.items():
if not isinstance(profile_conf, dict):
logging.warning('Skipped key "{profile_name}" in profile section: only subsections allowed')
if profile_name == 'current':
parsed[outer_name][profile_name] = profile_conf
else:
logging.warning('Skipped key "{profile_name}" in profile section: only subsections and "current" allowed')
continue
# init profile
@ -159,6 +221,7 @@ class ConfigStateHolder:
file: dict = {}
# runtime config not persisted anywhere
runtime: dict = CONFIG_RUNTIME_DEFAULTS
_profile_cache: dict[str, Profile] = None
def __init__(self, runtime_conf={}, file_conf_path: str = None, file_conf_base: dict = {}):
"""init a stateholder, optionally loading `file_conf_path`"""
@ -171,6 +234,7 @@ class ConfigStateHolder:
def try_load_file(self, config_file=None, base=CONFIG_DEFAULTS):
_conf_file = config_file if config_file is not None else CONFIG_DEFAULT_PATH
self.runtime['config_file'] = _conf_file
self._profile_cache = None
try:
self.file = parse_file(config_file=_conf_file, base=base)
except Exception as ex:
@ -190,6 +254,12 @@ class ConfigStateHolder:
msg = "File doesn't exist. Try running `kupferbootstrap config init` first?"
raise ConfigLoadException(extra_msg=msg, inner_exception=ex)
def get_profile(self, name: str = None):
if not name:
name = self.file['profiles']['current']
self._profile_cache = resolve_profile(name, self.file['profiles'], resolved=self._profile_cache)
return self._profile_cache[name]
config = ConfigStateHolder(file_conf_base=CONFIG_DEFAULTS)
@ -213,5 +283,16 @@ if __name__ == '__main__':
except ConfigLoadException as ex:
logging.fatal(str(ex))
conf = deepcopy(CONFIG_DEFAULTS)
conf['profiles']['pinephone'] = {'hostname': 'slowphone', 'pkgs_include': ['zsh', 'tmux', 'mpv', 'firefox']}
conf['profiles']['pinephone'] = {
'hostname': 'slowphone',
'parent': '',
'pkgs_include': ['zsh', 'tmux', 'mpv', 'firefox'],
'pkgs_exclude': ['pixman-git'],
}
conf['profiles']['yeetphone'] = {
'parent': 'pinephone',
'hostname': 'yeetphone',
'pkgs_include': ['pixman-git'],
'pkgs_exclude': ['tmux'],
}
print(toml.dumps(conf))

View file

@ -1,10 +1,10 @@
FASTBOOT = 'fastboot'
ROOTFS = 'rootfs'
BOOTIMG = 'bootimg'
LK2ND = 'lk2nd'
QHYPSTUB = 'qhypstub'
FLASH_PARTS = {
'ROOTFS': 'rootfs',
'BOOTIMG': 'bootimg',
'LK2ND': 'lk2nd',
'QHYPSTUB': 'qhypstub',
}
EMMC = 'emmc'
EMMCFILE = 'emmc-file'
MICROSD = 'microsd'
@ -28,9 +28,16 @@ DEVICES = {
}
FLAVOURS = {
'barebone': [],
'debug-shell': ['hook-debug-shell'],
'gnome': ['gnome'],
'barebone': {
'packages': []
},
'debug-shell': {
'packages': ['hook-debug-shell']
},
'gnome': {
'packages': ['gnome', 'archlinux-appstream-data', 'gnome-software-packagekit-plugin'],
'post_cmds': ['systemctl enable gdm']
},
}
REPOSITORIES = [

View file

@ -38,7 +38,7 @@ class Repo:
remote: bool
def scan(self):
self.resolved_url = resolve_url(self.url_template, self.repo_name, self.arch)
self.resolved_url = resolve_url(self.url_template, repo_name=self.repo_name, arch=self.arch)
self.remote = not self.resolved_url.startswith('file://')
# TODO
@ -50,6 +50,10 @@ class Repo:
if scan:
self.scan()
def config_snippet(self) -> str:
options = {'Server': self.url_template} | self.options.items()
return ('[%s]\n' % self.name) + '\n'.join([f"{key} = {value}" for key, value in options])
class RepoInfo:
options: dict[str, str] = {}
@ -85,6 +89,9 @@ class Distro:
for package in repo.packages:
results[package.name] = package
def config_snippet(self) -> str:
return '\n'.join(repo.config_snippet() for repo in self.repos)
_base_distros: dict[str, Distro] = None

View file

@ -1,5 +1,5 @@
import atexit
from constants import BOOTIMG, LK2ND, LOCATIONS, QHYPSTUB, ROOTFS
from constants import FLASH_PARTS, LOCATIONS
from fastboot import fastboot_flash
import shutil
from image import dump_bootimg, dump_lk2nd, dump_qhypstub, get_device_and_flavour, get_image_name
@ -7,7 +7,11 @@ import os
import subprocess
import click
import tempfile
from logger import logging
BOOTIMG = FLASH_PARTS['BOOTIMG']
LK2ND = FLASH_PARTS['LK2ND']
QHYPSTUB = FLASH_PARTS['QHYPSTUB']
ROOTFS = FLASH_PARTS['ROOTFS']
@click.command(name='flash')
@ -17,13 +21,14 @@ def cmd_flash(what, location):
device, flavour = get_device_and_flavour()
image_name = get_image_name(device, flavour)
if what not in FLASH_PARTS.values():
raise Exception(f'Unknown what "{what}", must be one of {", ".join(FLASH_PARTS.values())}')
if what == ROOTFS:
if location == None:
logging.info(f'You need to specify a location to flash {what} to')
exit(1)
if location is None:
raise Exception(f'You need to specify a location to flash {what} to')
if location not in LOCATIONS:
logging.info(f'Invalid location {location}. Choose one of {", ".join(LOCATIONS)} for location')
exit(1)
raise Exception(f'Invalid location {location}. Choose one of {", ".join(LOCATIONS)}')
path = ''
dir = '/dev/disk/by-id'
@ -33,16 +38,13 @@ def cmd_flash(what, location):
path = os.path.realpath(os.path.join(dir, file))
result = subprocess.run(['lsblk', path, '-o', 'SIZE'], capture_output=True)
if result.returncode != 0:
logging.info(f'Failed to lsblk {path}')
exit(1)
raise Exception(f'Failed to lsblk {path}')
if result.stdout == b'SIZE\n 0B\n':
logging.info(
raise Exception(
f'Disk {path} has a size of 0B. That probably means it is not available (e.g. no microSD inserted or no microSD card slot installed in the device) or corrupt or defect'
)
exit(1)
if path == '':
logging.fatal(f'Unable to discover Jumpdrive')
exit(1)
raise Exception('Unable to discover Jumpdrive')
image_dir = tempfile.gettempdir()
image_path = os.path.join(image_dir, f'minimal-{image_name}')
@ -60,8 +62,7 @@ def cmd_flash(what, location):
image_path,
])
if result.returncode != 0:
logging.fatal(f'Failed to e2fsck {image_path}')
exit(1)
raise Exception(f'Failed to e2fsck {image_path}')
result = subprocess.run([
'resize2fs',
@ -69,8 +70,7 @@ def cmd_flash(what, location):
image_path,
])
if result.returncode != 0:
logging.fatal(f'Failed to resize2fs {image_path}')
exit(1)
raise Exception(f'Failed to resize2fs {image_path}')
if location.endswith('-file'):
part_mount = '/mnt/kupfer/fs'
@ -95,8 +95,7 @@ def cmd_flash(what, location):
part_mount,
])
if result.returncode != 0:
logging.fatal(f'Failed to mount {path} to {part_mount}')
exit(1)
raise Exception(f'Failed to mount {path} to {part_mount}')
dir = os.path.join(part_mount, '.stowaways')
if not os.path.exists(dir):
@ -113,8 +112,7 @@ def cmd_flash(what, location):
os.path.join(dir, 'kupfer.img'),
])
if result.returncode != 0:
logging.fatal(f'Failed to mount {path} to {part_mount}')
exit(1)
raise Exception(f'Failed to mount {path} to {part_mount}')
else:
result = subprocess.run([
'dd',
@ -127,8 +125,7 @@ def cmd_flash(what, location):
'conv=sync,noerror',
])
if result.returncode != 0:
logging.info(f'Failed to flash {image_path} to {path}')
exit(1)
raise Exception(f'Failed to flash {image_path} to {path}')
elif what == BOOTIMG:
path = dump_bootimg(image_name)
@ -140,5 +137,4 @@ def cmd_flash(what, location):
path = dump_qhypstub(image_name)
fastboot_flash('qhypstub', path)
else:
logging.fatal(f'Unknown what {what}')
exit(1)
raise Exception(f'Unknown what "{what}", this must be a bug in kupferbootstrap!')

View file

@ -3,25 +3,20 @@ import os
import subprocess
import click
from logger import logging
from chroot import create_chroot, create_chroot_user, get_chroot_path
from chroot import create_chroot, create_chroot_user, get_chroot_path, run_chroot_cmd
from constants import DEVICES, FLAVOURS, REPOSITORIES
from config import config
def get_device_and_flavour() -> tuple[str, str]:
if not os.path.exists('.device'):
logging.fatal(f'Please set the device using \'kupferbootstrap image device ...\'')
exit(1)
if not os.path.exists('.flavour'):
logging.fatal(f'Please set the flavour using \'kupferbootstrap image flavour ...\'')
exit(1)
def get_device_and_flavour(profile=None) -> tuple[str, str]:
profile = config.get_profile(profile)
if not profile['device']:
raise Exception("Please set the device using 'kupferbootstrap config init ...'")
with open('.device', 'r') as file:
device = file.read()
with open('.flavour', 'r') as file:
flavour = file.read()
if not profile['flavour']:
raise Exception("Please set the flavour using 'kupferbootstrap config init ...'")
return (device, flavour)
return (profile['device'], profile['flavour'])
def get_image_name(device, flavour) -> str:
@ -96,7 +91,7 @@ def dump_qhypstub(image_name: str) -> str:
f'dump /boot/qhypstub.bin {path}',
])
if result.returncode != 0:
logging.fatal(f'Faild to dump qhypstub.bin')
logging.fatal('Faild to dump qhypstub.bin')
exit(1)
return path
@ -106,40 +101,11 @@ def cmd_image():
pass
@click.command(name='device')
@click.argument('device')
def cmd_device(device):
for key in DEVICES.keys():
if '-'.join(key.split('-')[1:]) == device:
device = key
break
if device not in DEVICES:
logging.fatal(f'Unknown device {device}. Pick one from:\n{", ".join(DEVICES.keys())}')
exit(1)
logging.info(f'Setting device to {device}')
with open('.device', 'w') as file:
file.write(device)
@click.command(name='flavour')
@click.argument('flavour')
def cmd_flavour(flavour):
if flavour not in FLAVOURS:
logging.fatal(f'Unknown flavour {flavour}. Pick one from:\n{", ".join(FLAVOURS.keys())}')
exit(1)
logging.info(f'Setting flavour to {flavour}')
with open('.flavour', 'w') as file:
file.write(flavour)
@click.command(name='build')
def cmd_build():
profile = config.get_profile()
device, flavour = get_device_and_flavour()
post_cmds = FLAVOURS[flavour].get('post_cmds', [])
image_name = get_image_name(device, flavour)
if not os.path.exists(image_name):
@ -172,14 +138,18 @@ def cmd_build():
else:
url = 'https://gitlab.com/kupfer/packages/prebuilts/-/raw/main/$repo'
extra_repos = {repo: {'Server': url} for repo in REPOSITORIES}
packages = ['base', 'base-kupfer'] + DEVICES[device] + FLAVOURS[flavour]['packages'] + profile['pkgs_include']
create_chroot(
chroot_name,
packages=['base', 'base-kupfer'] + DEVICES[device] + FLAVOURS[flavour],
packages=packages,
pacman_conf='/app/local/etc/pacman.conf',
extra_repos=extra_repos,
)
create_chroot_user(chroot_name)
create_chroot_user(chroot_name, user=profile['username'], password=profile['password'])
if post_cmds:
result = run_chroot_cmd(' && '.join(post_cmds, chroot_name))
if result.returncode != 0:
raise Exception('Error running post_cmds')
"""
@ -199,7 +169,5 @@ def cmd_inspect():
signal.pause()
"""
cmd_image.add_command(cmd_device)
cmd_image.add_command(cmd_flavour)
cmd_image.add_command(cmd_build)
# cmd_image.add_command(cmd_inspect)

View file

@ -1,6 +1,7 @@
import click
import logging
import sys
from traceback import format_exc as get_trace
def setup_logging(verbose: bool):

View file

@ -6,7 +6,7 @@ from flash import cmd_flash
from ssh import cmd_ssh
from forwarding import cmd_forwarding
from telnet import cmd_telnet
from logger import setup_logging, verbose_option
from logger import logging, setup_logging, verbose_option, get_trace
import click
from config import config, config_option
from wrapper import enforce_wrap, nowrapper_option
@ -25,7 +25,12 @@ def cli(verbose: bool = False, config_file: str = None, no_wrapper: bool = False
def main():
return cli(prog_name='kupferbootstrap')
try:
return cli(prog_name='kupferbootstrap')
except Exception as err:
logging.debug(get_trace())
logging.fatal(err)
exit(1)
cli.add_command(cmd_cache)