Compare commits

...

25 Commits

Author SHA1 Message Date
InsanePrawn 2ad4690c0a {devices,flavours}/cli: add --output-file for json dumping 2023-03-17 16:34:20 +01:00
InsanePrawn 161e14a438 distro/repo: scan(): add allow_failure parameter 2023-03-17 16:34:20 +01:00
InsanePrawn 066b6abaaa distro/distro: add scan parameter to get_base_distro() 2023-03-17 16:34:20 +01:00
InsanePrawn 9f5bafab57 distro/package: fix DESC parser 2023-03-17 16:34:20 +01:00
InsanePrawn 272d55b735 devices/cli: add --force-parse-deviceinfo and --download-packages 2023-03-17 16:34:20 +01:00
InsanePrawn af1d8d1737 flavours/cli: clean up json. (add architectures, flatten flavour_info, etc.) 2023-03-17 16:34:20 +01:00
InsanePrawn 78874a15e6 packages/cli: linter fixes 2023-03-17 16:34:20 +01:00
InsanePrawn f38fb798bc devices: don't pass config to parse_pkgbuild*() unnecessarily, that's only for multiprocessing 2023-03-17 16:34:20 +01:00
InsanePrawn de7b597518 logger: add --quiet flag to disable non-error logging 2023-03-17 16:34:20 +01:00
InsanePrawn f140fa36ce flavours/cli: colorise output, add -j/--json arg 2023-03-17 16:34:20 +01:00
InsanePrawn 69c73e41dd devices/cli: colorize output 2023-03-17 16:34:20 +01:00
InsanePrawn e269841038 utils: add colors_supported() and color_str() for terminal colors 2023-03-17 16:34:20 +01:00
InsanePrawn 932e739255 devices/cli: add --json parameter 2023-03-17 16:34:20 +01:00
InsanePrawn 63156776a2 devices/cli: make device list way more readable, add package name and path, mark currently selected 2023-03-17 16:34:20 +01:00
InsanePrawn 5edfac42ce main.py: default colors to isatty(stdout) if force_colors is None 2023-03-17 16:34:20 +01:00
InsanePrawn 00613096d5 config/state: add config.runtime.colors, fill in main.py 2023-03-17 16:34:20 +01:00
InsanePrawn c4797c709f logger: disable raising exceptions, e.g. when stdout is closed 2023-03-17 16:34:20 +01:00
InsanePrawn 28c68418a6 packages/build: get_unbuilt_package_levels(): use force=True while updating pkgbar 2023-03-17 16:34:20 +01:00
InsanePrawn cc1b4b3ee2 packages/build: redirect output from stderr to stdout 2023-03-17 16:34:20 +01:00
InsanePrawn ff3b5e70dd progressbar: add ellipsize() 2023-03-17 16:34:20 +01:00
InsanePrawn ac25266a00 packages: build: use progress bars for get_unbuilt_pkg_levels() and build_packages() 2023-03-17 16:34:20 +01:00
InsanePrawn c99463a0f6 progressbar: new module based on enlighten 2023-03-17 16:34:20 +01:00
InsanePrawn 6d6f582b71 exec/cmd: fix up stderr and stdout handling, fix capture_output overwriting env 2023-03-17 16:34:20 +01:00
InsanePrawn 785e41f8b7 logger: add --force-colors/--no-colors cli flag 2023-03-17 16:34:20 +01:00
InsanePrawn 4d03f238bb CI: fix for docker buildx 2023-03-17 16:26:53 +01:00
25 changed files with 475 additions and 100 deletions

View File

@@ -46,7 +46,6 @@ build_docker:
image: docker:latest
services: ['docker:dind']
variables:
DOCKER_DRIVER: vfs # overlay2 is not available on ZFS
DOCKER_TLS_CERTDIR: ""
script:
- 'docker build --pull -t "${CI_REGISTRY_IMAGE}:${CI_COMMIT_SHA}" -t "${CI_REGISTRY_IMAGE}:${CI_COMMIT_REF_SLUG}" .'

View File

@@ -10,7 +10,7 @@ RUN pacman-key --init && \
android-tools openssh inetutils \
parted
RUN sed -i "s/EUID == 0/EUID == -1/g" $(which makepkg)
RUN sed -i "s/EUID == 0/EUID == -1/g" "$(which makepkg)"
RUN yes | pacman -Scc

View File

@@ -2,6 +2,8 @@ import atexit
import logging
import os
import subprocess
import sys
from copy import deepcopy
from shlex import quote as shell_quote
from typing import ClassVar, Iterable, Protocol, Union, Optional, Mapping
@@ -10,7 +12,7 @@ from uuid import uuid4
from config.state import config
from constants import Arch, CHROOT_PATHS, GCC_HOSTSPECS
from distro.distro import get_base_distro, get_kupfer_local, RepoInfo
from exec.cmd import run_root_cmd, generate_env_cmd, flatten_shell_script, wrap_in_bash, generate_cmd_su
from exec.cmd import FileDescriptor, run_root_cmd, generate_env_cmd, flatten_shell_script, wrap_in_bash, generate_cmd_su
from exec.file import makedir, root_makedir, root_write_file, write_file
from generator import generate_makepkg_conf
from utils import mount, umount, check_findmnt, log_or_exception
@@ -58,7 +60,8 @@ class AbstractChroot(Protocol):
capture_output: bool,
cwd: str,
fail_inactive: bool,
stdout: Optional[int],
stdout: Optional[FileDescriptor],
stderr: Optional[FileDescriptor],
):
pass
@@ -222,7 +225,8 @@ class Chroot(AbstractChroot):
capture_output: bool = False,
cwd: Optional[str] = None,
fail_inactive: bool = True,
stdout: Optional[int] = None,
stdout: Optional[FileDescriptor] = None,
stderr: Optional[FileDescriptor] = None,
switch_user: Optional[str] = None,
) -> Union[int, subprocess.CompletedProcess]:
if not self.active and fail_inactive:
@@ -246,7 +250,7 @@ class Chroot(AbstractChroot):
inner_cmd = wrap_in_bash(script, flatten_result=False)
cmd = flatten_shell_script(['chroot', self.path] + env_cmd + inner_cmd, shell_quote_items=True)
return run_root_cmd(cmd, env=outer_env, attach_tty=attach_tty, capture_output=capture_output, stdout=stdout)
return run_root_cmd(cmd, env=outer_env, attach_tty=attach_tty, capture_output=capture_output, stdout=stdout, stderr=stderr)
def mount_pkgbuilds(self, fail_if_mounted: bool = False) -> str:
return self.mount(
@@ -371,20 +375,22 @@ class Chroot(AbstractChroot):
packages: list[str],
refresh: bool = False,
allow_fail: bool = True,
redirect_stderr: bool = True,
) -> dict[str, Union[int, subprocess.CompletedProcess]]:
"""Try installing packages, fall back to installing one by one"""
results = {}
stderr = sys.stdout if redirect_stderr else sys.stderr
if refresh:
results['refresh'] = self.run_cmd('pacman -Syy --noconfirm')
results['refresh'] = self.run_cmd('pacman -Syy --noconfirm', stderr=stderr)
cmd = "pacman -S --noconfirm --needed --overwrite='/*'"
result = self.run_cmd(f'{cmd} -y {" ".join(packages)}')
result = self.run_cmd(f'{cmd} -y {" ".join(packages)}', stderr=stderr)
assert isinstance(result, subprocess.CompletedProcess)
results |= {package: result for package in packages}
if result.returncode != 0 and allow_fail:
results = {}
logging.debug('Falling back to serial installation')
for pkg in set(packages):
results[pkg] = self.run_cmd(f'{cmd} {pkg}')
results[pkg] = self.run_cmd(f'{cmd} {pkg}', stderr=stderr)
return results
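
The redirect_stderr handling above works because subprocess accepts any writable file object (or raw fd) for stderr, so pointing it at sys.stdout merges both streams for the caller. A minimal standalone sketch of that idea outside the Chroot class (run_with_merged_output is a made-up helper name, and the command is illustrative only):

import subprocess
import sys

def run_with_merged_output(cmd: list[str], redirect_stderr: bool = True) -> subprocess.CompletedProcess:
    # sys.stdout works here as long as it is backed by a real file descriptor
    stderr = sys.stdout if redirect_stderr else sys.stderr
    return subprocess.run(cmd, stderr=stderr)

result = run_with_merged_output(['sh', '-c', 'echo "this went to stderr" >&2'])
print(result.returncode)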

View File

@@ -1,5 +1,6 @@
import logging
import os
import sys
from glob import glob
from shutil import rmtree
@@ -31,17 +32,20 @@ class BaseChroot(Chroot):
logging.info(f'Pacstrapping chroot {self.name}: {", ".join(self.base_packages)}')
result = run_root_cmd([
'pacstrap',
'-C',
pacman_conf_target,
'-G',
self.path,
] + self.base_packages + [
'--needed',
'--overwrite=*',
'-yyuu',
])
result = run_root_cmd(
[
'pacstrap',
'-C',
pacman_conf_target,
'-G',
self.path,
*self.base_packages,
'--needed',
'--overwrite=*',
'-yyuu',
],
stderr=sys.stdout,
)
if result.returncode != 0:
raise Exception(f'Failed to initialize chroot "{self.name}"')
self.initialized = True

View File

@@ -135,7 +135,7 @@ def prompt_profile_device(current: Optional[str], profile_name: str) -> tuple[st
devices = get_devices()
print(click.style("Pick your device!\nThese are the available devices:", bold=True))
for dev in sorted(devices.keys()):
print(devices[dev])
print(f"{devices[dev]}\n")
return prompt_choice(current, f'profiles.{profile_name}.device', devices.keys())

View File

@@ -147,6 +147,8 @@ class RuntimeConfiguration(DataClass):
script_source_dir: Optional[str]
arch: Optional[Arch]
uid: Optional[int]
progress_bars: Optional[bool]
colors: Optional[bool]
class ConfigLoadState(DataClass):

View File

@@ -61,6 +61,8 @@ CONFIG_RUNTIME_DEFAULTS: RuntimeConfiguration = RuntimeConfiguration.fromDict({
'script_source_dir': None,
'arch': None,
'uid': None,
'progress_bars': None,
'colors': None,
})

View File

@@ -1,13 +1,84 @@
import click
import logging
from .device import get_devices
from json import dumps as json_dump
from typing import Optional
from config.state import config
from utils import colors_supported, color_str
from .device import get_devices, get_profile_device
@click.command(name='devices')
def cmd_devices():
@click.option('-j', '--json', is_flag=True, help='output machine-parsable JSON format')
@click.option(
'--force-parse-deviceinfo/--no-parse-deviceinfo',
is_flag=True,
default=None,
help="Force or disable deviceinfo parsing. The default is to try but continue if it fails.",
)
@click.option(
'--download-packages/--no-download-packages',
is_flag=True,
default=False,
help='Download packages while trying to parse deviceinfo',
)
@click.option('--output-file', type=click.Path(exists=False, file_okay=True), help="Dump JSON to file")
def cmd_devices(
json: bool = False,
force_parse_deviceinfo: Optional[bool] = True,
download_packages: bool = False,
output_file: Optional[str] = None,
):
'list the available devices and descriptions'
devices = get_devices()
if not devices:
raise Exception("No devices found!")
for d in sorted(devices.keys()):
print(devices[d])
profile_device = None
try:
dev = get_profile_device()
assert dev
profile_device = dev
except Exception as ex:
logging.debug(f"Failed to get profile device for visual highlighting, not a problem: {ex}")
output = ['']
json_output = {}
interactive_json = json and not output_file
if output_file:
json = True
use_colors = colors_supported(False if interactive_json else config.runtime.colors)
for name in sorted(devices.keys()):
prefix = ''
suffix = ''
device = devices[name]
assert device
if force_parse_deviceinfo in [None, True]:
try:
device.parse_deviceinfo(try_download=download_packages)
except Exception as ex:
if not force_parse_deviceinfo:
logging.debug(f"Failed to parse deviceinfo for extended description, not a problem: {ex}")
else:
raise ex
if json:
json_output[name] = device.get_summary().toDict()
if interactive_json:
continue
if profile_device and profile_device.name == device.name:
prefix = color_str('>>> ', bold=True, fg="bright_green", use_colors=use_colors)
suffix = '\n\n'
suffix += color_str('Currently selected by profile', bold=True, use_colors=use_colors) + " "
suffix += color_str(f'"{config.file.profiles.current}"', bold=True, fg="bright_green", use_colors=use_colors)
snippet = f'{device.nice_str(colors=use_colors, newlines=True)}{suffix}'
# prefix each line in the snippet
snippet = '\n'.join([f'{prefix}{line}' for line in snippet.split('\n')])
output.append(f"{snippet}\n")
if interactive_json:
output = ['\n' + json_dump(json_output, indent=4)]
if output_file:
with open(output_file, 'w') as fd:
fd.write(json_dump(json_output))
for line in output:
print(line)
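
The --json/--output-file handling above boils down to: pretty-print JSON to the terminal when no output file is given, write a compact dump when one is. A small sketch of that pattern on its own (emit_json is a hypothetical helper, not part of the codebase; the sample data is made up):

import json
from typing import Optional

def emit_json(data: dict, output_file: Optional[str] = None) -> None:
    if output_file:
        # compact dump for files, mirroring the output_file branch above
        with open(output_file, 'w') as fd:
            fd.write(json.dumps(data))
    else:
        # human-readable dump for interactive use, mirroring interactive_json
        print(json.dumps(data, indent=4))

emit_json({'oneplus-enchilada': {'arch': 'aarch64'}})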

View File

@@ -9,7 +9,7 @@ from config.scheme import DataClass, munchclass
from distro.distro import get_kupfer_local
from distro.package import LocalPackage
from packages.pkgbuild import Pkgbuild, _pkgbuilds_cache, discover_pkgbuilds, get_pkgbuild_by_path, init_pkgbuilds
from utils import read_files_from_tar
from utils import read_files_from_tar, color_str
from .deviceinfo import DeviceInfo, parse_deviceinfo
@@ -22,6 +22,27 @@ DEVICE_DEPRECATIONS = {
}
class DeviceSummary(DataClass):
name: str
description: str
arch: str
package_name: Optional[str]
package_path: Optional[str]
def nice_str(self, newlines: bool = False, colors: bool = False) -> str:
separator = '\n' if newlines else ', '
assert bool(self.package_path) == bool(self.package_name)
package_path = {"Package Path": self.package_path} if self.package_path else {}
fields = {
"Device": self.name,
"Description": self.description or f"[no package {'description' if self.package_name else 'associated (?!)'} and deviceinfo not parsed]",
"Architecture": self.arch,
"Package Name": self.package_name or "no package associated. PROBABLY A BUG!",
**package_path,
}
return separator.join([f"{color_str(name, bold=True, use_colors=colors)}: {value}" for name, value in fields.items()])
@munchclass()
class Device(DataClass):
name: str
@@ -30,8 +51,24 @@ class Device(DataClass):
deviceinfo: Optional[DeviceInfo]
def __repr__(self):
return (f'Device "{self.name}": "{self.package.description if self.package else ""}", '
f'Architecture: {self.arch}, package: {self.package.name if self.package else "??? PROBABLY A BUG!"}')
return f'Device<{self.name},{self.arch},{self.package.path if self.package else "[no package]"}>'
def __str__(self):
return self.nice_str(newlines=True)
def nice_str(self, *args, **kwargs) -> str:
return self.get_summary().nice_str(*args, **kwargs)
def get_summary(self) -> DeviceSummary:
result: dict[str, Optional[str]] = {}
description = ((self.package.description if self.package else "").strip() or
(self.deviceinfo.get("name", "[No name in deviceinfo]") if self.deviceinfo else "")).strip()
result["name"] = self.name
result["description"] = description
result["arch"] = self.arch
result["package_name"] = self.package.name if self.package else None
result["package_path"] = self.package.path if self.package else None
return DeviceSummary(result)
def parse_deviceinfo(self, try_download: bool = True, lazy: bool = True):
if not lazy or 'deviceinfo' not in self or self.deviceinfo is None:
@@ -146,7 +183,7 @@ def get_device(name: str, pkgbuilds: Optional[dict[str, Pkgbuild]] = None, lazy:
if not os.path.exists(os.path.join(config.get_path('pkgbuilds'), relative_path)):
logging.debug(f'Exact device pkgbuild path "pkgbuilds/{relative_path}" doesn\'t exist, scanning entire repo')
return get_device(name, pkgbuilds=pkgbuilds, lazy=lazy, scan_all=True)
pkgbuild = [p for p in get_pkgbuild_by_path(relative_path, lazy=lazy, _config=config) if p.name == pkgname][0]
pkgbuild = [p for p in get_pkgbuild_by_path(relative_path, lazy=lazy) if p.name == pkgname][0]
device = parse_device_pkg(pkgbuild)
if lazy:
_device_cache[name] = device

View File

@@ -42,7 +42,7 @@ ONEPLUS_ENCHILADA_PKG = f'device-{ONEPLUS_ENCHILADA}'
def enchilada_pkgbuild(initialise_pkgbuilds_dir: ConfigStateHolder):
config = initialise_pkgbuilds_dir
config.try_load_file()
return parse_pkgbuild(os.path.join('device', ONEPLUS_ENCHILADA_PKG), _config=config)[0]
return parse_pkgbuild(os.path.join('device', ONEPLUS_ENCHILADA_PKG))[0]
def validate_oneplus_enchilada(d: Device):

View File

@@ -72,9 +72,9 @@ class RemoteDistro(Distro[RemoteRepo]):
return RemoteRepo(**kwargs)
def get_base_distro(arch: str) -> RemoteDistro:
def get_base_distro(arch: str, scan: bool = False) -> RemoteDistro:
repos = {name: RepoInfo(url_template=url) for name, url in BASE_DISTROS[arch]['repos'].items()}
return RemoteDistro(arch=arch, repo_infos=repos, scan=False)
return RemoteDistro(arch=arch, repo_infos=repos, scan=scan)
def get_kupfer(arch: str, url_template: str, scan: bool = False) -> Distro:

View File

@@ -17,7 +17,7 @@ class BinaryPackage(PackageInfo):
arch: str
filename: str
resolved_url: Optional[str]
_desc: Optional[dict[str, str]]
_desc: Optional[dict[str, str | list[str]]]
def __init__(
self,
@@ -39,17 +39,25 @@ class BinaryPackage(PackageInfo):
@classmethod
def parse_desc(clss, desc_str: str, resolved_repo_url=None):
"""Parses a desc file, returning a PackageInfo"""
pruned_lines = ([line.strip() for line in desc_str.split('%') if line.strip()])
desc = {}
for key, value in zip(pruned_lines[0::2], pruned_lines[1::2]):
desc[key.strip()] = value.strip()
desc: dict[str, str | list[str]] = {}
for segment in f'\n{desc_str}'.split('\n%'):
if not segment.strip():
continue
key, elements = (e.strip() for e in segment.strip().split('%\n', 1))
elements_split = elements.split('\n')
desc[key] = elements if len(elements_split) == 1 else elements_split
validated: dict[str, str] = {}
for key in ['NAME', 'VERSION', 'ARCH', 'FILENAME']:
assert key in desc
value = desc[key]
assert isinstance(value, str)
validated[key] = value
p = clss(
name=desc['NAME'],
version=desc['VERSION'],
arch=desc['ARCH'],
filename=desc['FILENAME'],
resolved_url='/'.join([resolved_repo_url, desc['FILENAME']]),
name=validated['NAME'],
version=validated['VERSION'],
arch=validated['ARCH'],
filename=validated['FILENAME'],
resolved_url='/'.join([resolved_repo_url, validated['FILENAME']]),
)
p._desc = desc
return p
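
The reworked parse_desc() treats a pacman DESC file as %KEY% headers followed by one or more value lines, keeping single values as strings and multi-line values as lists. A self-contained sketch of that splitting logic (the sample DESC content is made up for illustration):

sample = "%NAME%\nkupfer-hello\n\n%VERSION%\n1.0-1\n\n%DEPENDS%\nglibc\nbash\n"

desc: dict[str, str | list[str]] = {}
for segment in f'\n{sample}'.split('\n%'):
    if not segment.strip():
        continue
    key, elements = (e.strip() for e in segment.strip().split('%\n', 1))
    lines = elements.split('\n')
    # single value lines stay strings, multi-line values become lists
    desc[key] = elements if len(lines) == 1 else lines

print(desc)  # {'NAME': 'kupfer-hello', 'VERSION': '1.0-1', 'DEPENDS': ['glibc', 'bash']}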

View File

@@ -39,21 +39,39 @@ class Repo(RepoInfo, Generic[BinaryPackageType]):
def resolve_url(self) -> str:
return resolve_url(self.url_template, repo_name=self.name, arch=self.arch)
def scan(self):
def scan(self, allow_failure: bool = False) -> bool:
failed = False
self.resolved_url = self.resolve_url()
self.remote = not self.resolved_url.startswith('file://')
path = self.acquire_db_file()
try:
path = self.acquire_db_file()
index = tarfile.open(path)
except Exception as ex:
if not allow_failure:
raise ex
logging.error(f"Repo {self.name}, {self.arch}: Error acquiring repo DB: {ex!r}")
return False
logging.debug(f'Parsing repo file at {path}')
with tarfile.open(path) as index:
for node in index.getmembers():
if os.path.basename(node.name) == 'desc':
logging.debug(f'Parsing desc file for {os.path.dirname(node.name)}')
fd = index.extractfile(node)
assert fd
pkg = self._parse_desc(fd.read().decode())
self.packages[pkg.name] = pkg
for node in index.getmembers():
if os.path.basename(node.name) == 'desc':
pkgname = os.path.dirname(node.name)
logging.debug(f'Parsing desc file for {pkgname}')
fd = index.extractfile(node)
assert fd
contents = fd.read().decode()
try:
pkg = self._parse_desc(contents)
except Exception as ex:
if not allow_failure:
raise ex
logging.error(f'Repo {self.name}, {self.arch}: Error parsing desc for "{pkgname}": {ex!r}')
failed = True
continue
self.packages[pkg.name] = pkg
if failed:
return False
self.scanned = True
return True
def _parse_desc(self, desc_text: str): # can't annotate the type properly :(
raise NotImplementedError()

View File

@@ -5,10 +5,12 @@ import subprocess
from subprocess import CompletedProcess # make it easy for users of this module
from shlex import quote as shell_quote
from typing import Optional, Union, TypeAlias
from typing import IO, Optional, Union, TypeAlias
ElevationMethod: TypeAlias = str
FileDescriptor: TypeAlias = Union[int, IO]
# as long as **only** sudo is supported, hardcode the default into ELEVATION_METHOD_DEFAULT.
# when other methods are added, all mentions of ELEVATION_METHOD_DEFAULT should be replaced by a config key.
@@ -89,8 +91,8 @@ def run_cmd(
cwd: Optional[str] = None,
switch_user: Optional[str] = None,
elevation_method: Optional[ElevationMethod] = None,
stdout: Optional[int] = None,
stderr=None,
stdout: Optional[FileDescriptor] = None,
stderr: Optional[FileDescriptor] = None,
) -> Union[CompletedProcess, int]:
"execute `script` as `switch_user`, elevating and su'ing as necessary"
kwargs: dict = {}
@@ -99,10 +101,12 @@ def run_cmd(
env_cmd = generate_env_cmd(env)
kwargs['env'] = env
if not attach_tty:
kwargs |= {'stdout': stdout} if stdout else {'capture_output': capture_output}
if stderr:
kwargs['stderr'] = stderr
if (stdout, stderr) == (None, None):
kwargs['capture_output'] = capture_output
else:
for name, fd in {'stdout': stdout, 'stderr': stderr}.items():
if fd is not None:
kwargs[name] = fd
script = flatten_shell_script(script)
if cwd:
kwargs['cwd'] = cwd
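
The kwargs handling added to run_cmd() maps three cases onto subprocess.run: with nothing redirected it honours capture_output, otherwise it sets only the streams that were explicitly given, so capture_output can no longer clobber them. A reduced sketch of just that mapping (run_simple is a hypothetical helper, not the project's API):

import subprocess
import sys
from typing import IO, Optional, Union

FileDescriptor = Union[int, IO]

def run_simple(
    cmd: list[str],
    capture_output: bool = False,
    stdout: Optional[FileDescriptor] = None,
    stderr: Optional[FileDescriptor] = None,
) -> subprocess.CompletedProcess:
    kwargs: dict = {}
    if (stdout, stderr) == (None, None):
        # nothing redirected explicitly: let capture_output decide
        kwargs['capture_output'] = capture_output
    else:
        # only pass the streams that were actually requested
        for name, fd in {'stdout': stdout, 'stderr': stderr}.items():
            if fd is not None:
                kwargs[name] = fd
    return subprocess.run(cmd, **kwargs)

run_simple(['echo', 'hello'], stderr=sys.stdout)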

View File

@@ -1,20 +1,70 @@
import click
import logging
from .flavour import get_flavours
from json import dumps as json_dump
from typing import Optional
from config.state import config
from utils import colors_supported, color_str
from .flavour import get_flavours, get_profile_flavour
profile_option = click.option('-p', '--profile', help="name of the profile to use", required=False, default=None)
@click.command(name='flavours')
def cmd_flavours():
@click.option('-j', '--json', is_flag=True, help='output machine-parsable JSON format')
@click.option('--output-file', type=click.Path(exists=False, file_okay=True), help="Dump JSON to file")
def cmd_flavours(json: bool = False, output_file: Optional[str] = None):
'list information about available flavours'
results = []
json_results = {}
profile_flavour = None
flavours = get_flavours()
interactive_json = json and not output_file
use_colors = colors_supported(config.runtime.colors) and not interactive_json
if output_file:
json = True
if not flavours:
raise Exception("No flavours found!")
if not interactive_json:
try:
profile_flavour = get_profile_flavour()
except Exception as ex:
logging.debug(f"Failed to get profile flavour for marking as currently selected, continuing anyway. Exception: {ex}")
for name in sorted(flavours.keys()):
f = flavours[name]
try:
f.parse_flavourinfo()
except:
pass
print(f)
except Exception as ex:
logging.debug(f"A problem happened while parsing flavourinfo for {name}, continuing anyway. Exception: {ex}")
if not interactive_json:
block = [*f.nice_str(newlines=True, colors=use_colors).split('\n'), '']
if profile_flavour == f:
prefix = color_str('>>> ', bold=True, fg='bright_green', use_colors=use_colors)
block += [
color_str("Currently selected by profile ", bold=True, use_colors=use_colors) +
color_str(f'"{config.file.profiles.current}"\n', bold=True, fg="bright_green")
]
block = [prefix + line for line in block]
results += block
if json:
d = dict(f)
d["description"] = f.flavour_info.description if (f.flavour_info and f.flavour_info.description) else f.description
if "flavour_info" in d and d["flavour_info"]:
for k in set(d["flavour_info"].keys()) - set(['description']):
d[k] = d["flavour_info"][k]
del d["flavour_info"]
d["pkgbuild"] = f.pkgbuild.path if f.pkgbuild else None
d["package"] = f.pkgbuild.name
d["arches"] = sorted(f.pkgbuild.arches) if f.pkgbuild else None
json_results[d["name"]] = d
print()
if output_file:
with open(output_file, 'w') as fd:
fd.write(json_dump(json_results))
if interactive_json:
print(json_dump(json_results, indent=4))
else:
for r in results:
print(r)

View File

@@ -4,16 +4,16 @@ import json
import logging
import os
from dataclasses import dataclass
from typing import Optional
from config.state import config
from constants import FLAVOUR_DESCRIPTION_PREFIX, FLAVOUR_INFO_FILE
from dataclass import DataClass
from packages.pkgbuild import discover_pkgbuilds, get_pkgbuild_by_name, init_pkgbuilds, Pkgbuild
from utils import color_str
@dataclass
class FlavourInfo:
class FlavourInfo(DataClass):
rootfs_size: int # rootfs size in GB
description: Optional[str]
@@ -21,8 +21,7 @@ class FlavourInfo:
return f'rootfs_size: {self.rootfs_size}'
@dataclass
class Flavour:
class Flavour(DataClass):
name: str
pkgbuild: Pkgbuild
description: str
@@ -43,7 +42,27 @@ class Flavour:
return Flavour(name=name, pkgbuild=pkgbuild, description=description.strip(), flavour_info=None)
def __repr__(self):
return f'Flavour "{self.name}": "{self.description}", package: {self.pkgbuild.name if self.pkgbuild else "??? PROBABLY A BUG!"}{f", {self.flavour_info}" if self.flavour_info else ""}'
return f'Flavour<"{self.name}": "{self.description}", package: {self.pkgbuild.name if self.pkgbuild else "??? PROBABLY A BUG!"}{f", {self.flavour_info}" if self.flavour_info else ""}>'
def __str__(self):
return self.nice_str()
def nice_str(self, newlines: bool = False, colors: bool = False) -> str:
separator = '\n' if newlines else ', '
def get_lines(k, v, key_prefix=''):
results = []
full_k = f'{key_prefix}.{k}' if key_prefix else k
if not isinstance(v, (dict, DataClass)):
results = [f'{color_str(full_k, bold=True)}: {v}']
else:
for _k, _v in v.items():
if _k.startswith('_'):
continue
results += get_lines(_k, _v, key_prefix=full_k)
return results
return separator.join(get_lines(None, self))
def parse_flavourinfo(self, lazy: bool = True):
if lazy and self.flavour_info is not None:

View File

@@ -64,7 +64,7 @@ def shrink_fs(loop_device: str, file: str, sector_size: int):
logging.debug(f'Finding end block of shrunken filesystem on {loop_device}p2')
blocks = int(re.search('is now [0-9]+', result.stdout.decode('utf-8')).group(0).split(' ')[2]) # type: ignore
sectors = blocks * sectors_blocks_factor #+ 157812 - 25600
sectors = blocks * sectors_blocks_factor
logging.debug(f'Shrinking partition at {loop_device}p2 to {sectors} sectors')
child_proccess = subprocess.Popen(

View File

@@ -3,11 +3,13 @@ import coloredlogs
import logging
import sys
from typing import Optional
def setup_logging(verbose: bool, log_setup: bool = True):
def setup_logging(verbose: bool, quiet: bool = False, force_colors: Optional[bool] = None, log_setup: bool = True):
level_colors = coloredlogs.DEFAULT_LEVEL_STYLES | {'info': {'color': 'magenta', 'bright': True}, 'debug': {'color': 'blue', 'bright': True}}
field_colors = coloredlogs.DEFAULT_FIELD_STYLES | {'asctime': {'color': 'white', 'faint': True}}
level = logging.DEBUG if verbose else logging.INFO
level = logging.DEBUG if verbose and not quiet else (logging.INFO if not quiet else logging.ERROR)
coloredlogs.install(
stream=sys.stdout,
fmt='%(asctime)s %(levelname)s: %(message)s',
@@ -15,9 +17,14 @@ def setup_logging(verbose: bool, log_setup: bool = True):
level=level,
level_styles=level_colors,
field_styles=field_colors,
isatty=force_colors,
)
# don't raise Exceptions when e.g. output stream is closed
logging.raiseExceptions = False
if log_setup:
logging.debug('Logging set up.')
logging.debug('Logger: Logging set up.')
if force_colors is not None:
logging.debug(f'Logger: Force-{"en" if force_colors else "dis"}abled colors')
verbose_option = click.option(
@@ -26,3 +33,17 @@ verbose_option = click.option(
is_flag=True,
help='Enables verbose logging',
)
quiet_option = click.option(
'-q',
'--quiet',
is_flag=True,
help='Disable most logging, only log errors. (Currently only affects KBS logging, not called subprograms)',
)
color_option = click.option(
'--force-colors/--no-colors',
is_flag=True,
default=None,
help='Force enable/disable log coloring. Defaults to autodetection.',
)
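
The level selection in setup_logging() gives --quiet precedence over --verbose: quiet always yields ERROR, otherwise verbose lowers INFO to DEBUG. A quick standalone check of that rule (pick_level is a hypothetical helper mirroring the expression above):

import logging

def pick_level(verbose: bool, quiet: bool) -> int:
    return logging.DEBUG if verbose and not quiet else (logging.INFO if not quiet else logging.ERROR)

assert pick_level(verbose=False, quiet=False) == logging.INFO
assert pick_level(verbose=True, quiet=False) == logging.DEBUG
assert pick_level(verbose=True, quiet=True) == logging.ERROR   # quiet wins
assert pick_level(verbose=False, quiet=True) == logging.ERROR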

main.py
View File

@@ -3,11 +3,13 @@
import click
import subprocess
from os import isatty
from traceback import format_exc, format_exception_only, format_tb
from typing import Optional
from logger import logging, setup_logging, verbose_option
from logger import color_option, logging, quiet_option, setup_logging, verbose_option
from wrapper import nowrapper_option, enforce_wrap
from progressbar import progress_bars_option
from config.cli import config, config_option, cmd_config
from packages.cli import cmd_packages
@@ -22,11 +24,25 @@ from image.cli import cmd_image
@click.group()
@click.option('--error-shell', '-E', 'error_shell', is_flag=True, default=False, help='Spawn shell after error occurs')
@verbose_option
@quiet_option
@config_option
@nowrapper_option
def cli(verbose: bool = False, config_file: Optional[str] = None, wrapper_override: Optional[bool] = None, error_shell: bool = False):
setup_logging(verbose)
@color_option
@progress_bars_option
def cli(
verbose: bool = False,
quiet: bool = False,
config_file: Optional[str] = None,
wrapper_override: Optional[bool] = None,
error_shell: bool = False,
force_colors: Optional[bool] = None,
force_progress_bars: Optional[bool] = None,
):
setup_logging(verbose, quiet=quiet, force_colors=force_colors)
# stdout is fd 1
config.runtime.colors = isatty(1) if force_colors is None else force_colors
config.runtime.verbose = verbose
config.runtime.progress_bars = force_progress_bars
config.runtime.no_wrap = wrapper_override is False
config.runtime.error_shell = error_shell
config.try_load_file(config_file)

View File

@@ -3,6 +3,7 @@ import multiprocessing
import os
import shutil
import subprocess
import sys
from copy import deepcopy
from urllib.error import HTTPError
@@ -17,8 +18,9 @@ from chroot.build import get_build_chroot, BuildChroot
from distro.distro import get_kupfer_https, get_kupfer_local
from distro.package import RemotePackage, LocalPackage
from distro.repo import LocalRepo
from progressbar import BAR_PADDING, get_levels_bar
from wrapper import check_programs_wrap, is_wrapped
from utils import sha256sum
from utils import ellipsize, sha256sum
from .pkgbuild import discover_pkgbuilds, filter_pkgbuilds, Pkgbase, Pkgbuild, SubPkgbuild
@@ -227,7 +229,7 @@ def add_file_to_repo(file_path: str, repo_name: str, arch: Arch, remove_original
target_file,
]
logging.debug(f'repo: running cmd: {cmd}')
result = run_cmd(cmd)
result = run_cmd(cmd, stderr=sys.stdout)
assert isinstance(result, subprocess.CompletedProcess)
if result.returncode != 0:
raise Exception(f'Failed add package {target_file} to repo {repo_name}')
@@ -274,8 +276,8 @@ def add_package_to_repo(package: Pkgbuild, arch: Arch):
def try_download_package(dest_file_path: str, package: Pkgbuild, arch: Arch) -> Optional[str]:
logging.debug(f"checking if we can download {package.name}")
filename = os.path.basename(dest_file_path)
logging.debug(f"checking if we can download {filename}")
pkgname = package.name
repo_name = package.repo
repos = get_kupfer_https(arch, scan=True).repos
@@ -473,7 +475,7 @@ def setup_sources(package: Pkgbuild, lazy: bool = True):
assert config.runtime.arch
chroot = setup_build_chroot(config.runtime.arch)
logging.info(f'{package.path}: Setting up sources with makepkg')
result = chroot.run_cmd(makepkg_setup, cwd=dir, switch_user='kupfer')
result = chroot.run_cmd(makepkg_setup, cwd=dir, switch_user='kupfer', stderr=sys.stdout)
assert isinstance(result, subprocess.CompletedProcess)
if result.returncode != 0:
raise Exception(f'{package.path}: Failed to setup sources, exit code: {result.returncode}')
@@ -583,6 +585,7 @@ def build_package(
inner_env=env,
cwd=os.path.join(CHROOT_PATHS['pkgbuilds'], package.path),
switch_user=build_user,
stderr=sys.stdout,
)
assert isinstance(result, subprocess.CompletedProcess)
if result.returncode != 0:
@@ -635,17 +638,33 @@ def get_unbuilt_package_levels(
includes_dependants = " (includes dependants)" if rebuild_dependants else ""
logging.info(f"Checking for unbuilt packages ({arch}) in dependency order{includes_dependants}:\n{get_pkg_levels_str(package_levels)}")
i = 0
for level_packages in package_levels:
total_levels = len(package_levels)
package_bar = get_levels_bar(
total=sum([len(lev) for lev in package_levels]),
desc=f"Checking pkgs ({arch})",
unit='pkgs',
fields={"levels_total": total_levels},
enable_rate=False,
)
counter_built = package_bar.add_subcounter('green')
counter_unbuilt = package_bar.add_subcounter('blue')
for level_num, level_packages in enumerate(package_levels):
level_num = level_num + 1
package_bar.update(0, name=" " * BAR_PADDING, level=level_num)
level = set[Pkgbuild]()
if not level_packages:
continue
def add_to_level(pkg, level, reason=''):
if reason:
reason = f': {reason}'
logging.info(f"Level {i} ({arch}): Adding {package.path}{reason}")
counter_unbuilt.update(force=True)
logging.info(f"Level {level}/{total_levels} ({arch}): Adding {package.path}{reason}")
level.add(package)
build_names.update(package.names())
for package in level_packages:
package_bar.update(0, force=True, name=ellipsize(package.name, padding=" ", length=BAR_PADDING))
if (force and package in packages):
add_to_level(package, level, 'query match and force=True')
elif rebuild_dependants and package in dependants:
@@ -653,12 +672,14 @@ def get_unbuilt_package_levels(
elif not check_package_version_built(package, arch, try_download=try_download, refresh_sources=refresh_sources):
add_to_level(package, level, 'package unbuilt')
else:
logging.info(f"Level {i}: {package.path} ({arch}): Package doesn't need [re]building")
logging.info(f"Level {level_num}/{total_levels} ({arch}): {package.path}: Package doesn't need [re]building")
counter_built.update(force=True)
logging.debug(f'Finished checking level {level_num}/{total_levels} ({arch}). Adding unbuilt pkgs: {get_pkg_names_str(level)}')
if level:
build_levels.append(level)
logging.debug(f'Finished checking level {i}. Adding unbuilt pkgs: {get_pkg_names_str(level)}')
i += 1
package_bar.close(clear=True)
return build_levels
@@ -691,11 +712,24 @@ def build_packages(
logging.info(f"Build plan made:\n{get_pkg_levels_str(build_levels)}")
total_levels = len(build_levels)
package_bar = get_levels_bar(
desc=f'Building pkgs ({arch})',
color='purple',
unit='pkgs',
total=sum([len(lev) for lev in build_levels]),
fields={"levels_total": total_levels},
enable_rate=False,
)
files = []
updated_repos: set[str] = set()
package_bar.update(-1)
for level, need_build in enumerate(build_levels):
logging.info(f"(Level {level}) Building {get_pkg_names_str(need_build)}")
level = level + 1
package_bar.update(incr=0, force=True, name=" " * BAR_PADDING, level=level)
logging.info(f"(Level {level}/{total_levels}) Building {get_pkg_names_str(need_build)}")
for package in need_build:
package_bar.update(force=True, name=ellipsize(package.name, padding=" ", length=BAR_PADDING))
base = package.pkgbase if isinstance(package, SubPkgbuild) else package
assert isinstance(base, Pkgbase)
if package.is_built(arch):
@@ -714,11 +748,14 @@ def build_packages(
for _arch in ['any', arch]:
if _arch in base.arches:
base._built_for.add(_arch)
package_bar.update()
# rescan affected repos
local_repos = get_kupfer_local(arch, in_chroot=False, scan=False)
for repo_name in updated_repos:
assert repo_name in local_repos.repos
local_repos.repos[repo_name].scan()
package_bar.close(clear=True)
return files
@@ -794,6 +831,6 @@ def build_enable_qemu_binfmt(arch: Arch, repo: Optional[dict[str, Pkgbuild]] = N
assert p.startswith(hostdir)
_files.append(os.path.join(CHROOT_PATHS['packages'], p[len(hostdir):].lstrip('/')))
pkgfiles = _files
runcmd(['pacman', '-U', '--noconfirm', '--needed'] + pkgfiles)
runcmd(['pacman', '-U', '--noconfirm', '--needed'] + pkgfiles, stderr=sys.stdout)
binfmt_register(arch, chroot=native_chroot)
_qemu_enabled[arch] = True

View File

@@ -224,15 +224,17 @@ def cmd_sideload(paths: Iterable[str], arch: Optional[Arch] = None, no_build: bo
logging.fatal("No packages matched")
return
scp_put_files(files, '/tmp').check_returncode()
run_ssh_command([
'sudo',
'pacman',
'-U',
] + [os.path.join('/tmp', os.path.basename(file)) for file in files] + [
'--noconfirm',
"'--overwrite=\\*'",
],
alloc_tty=True).check_returncode()
run_ssh_command(
[
'sudo',
'pacman',
'-U',
*[os.path.join('/tmp', os.path.basename(file)) for file in files],
'--noconfirm',
"'--overwrite=\\*'",
],
alloc_tty=True,
).check_returncode()
CLEAN_LOCATIONS = ['src', 'pkg', *SRCINFO_CACHE_FILES]
@@ -439,7 +441,7 @@ def cmd_check(paths):
formatted = False
reason = 'Found literal " although no special character was found in the line to justify the usage of a literal "'
if "'" in line and not '"' in line:
if "'" in line and '"' not in line:
formatted = False
reason = 'Found literal \' although either a literal " or no qoutes should be used'

View File

@@ -332,7 +332,7 @@ def parse_pkgbuild(
global config
if _config:
config = _config
setup_logging(verbose=config.runtime.verbose, log_setup=False) # different subprocess needs log setup.
setup_logging(verbose=config.runtime.verbose, force_colors=config.runtime.colors, log_setup=False) # different subprocess needs log setup.
logging.info(f"Discovering PKGBUILD for {relative_pkg_dir}")
if force_refresh_srcinfo:

progressbar.py (new file)
View File

@@ -0,0 +1,52 @@
import click
import sys
from enlighten import Counter, Manager, get_manager as _getmanager
from typing import Hashable, Optional
from config.state import config
BAR_PADDING = 25
DEFAULT_OUTPUT = sys.stderr
managers: dict[Hashable, Manager] = {}
progress_bars_option = click.option(
'--force-progress-bars/--no-progress-bars',
is_flag=True,
default=None,
help='Force enable/disable progress bars. Defaults to autodetection.',
)
def get_manager(file=DEFAULT_OUTPUT, enabled: Optional[bool] = None) -> Manager:
global managers
m = managers.get(file, None)
if not m:
kwargs = {}
if enabled is None or config.runtime.progress_bars is False:
enabled = config.runtime.progress_bars
if enabled is not None:
kwargs = {"enabled": enabled}
m = _getmanager(file, **kwargs)
managers[file] = m
return m
def get_progress_bar(*kargs, file=DEFAULT_OUTPUT, leave=False, **kwargs) -> Counter:
m = get_manager(file=file)
kwargs["file"] = file
kwargs["leave"] = leave
return m.counter(*kargs, **kwargs)
def get_levels_bar(*kargs, file=DEFAULT_OUTPUT, enable_rate=True, **kwargs):
kwargs["fields"] = {"name": "None", "level": 1, "levels_total": 1} | (kwargs.get("fields", None) or {})
f = (u'{desc}: {name}{desc_pad}{percentage:3.0f}%|{bar}| '
u'{count:{len_total}d}/{total:d} '
u'[lvl: {level}/{levels_total}] ')
if enable_rate:
f += u'[{elapsed}<{eta}, {rate:.2f}{unit_pad}{unit}/s]'
kwargs["bar_format"] = f
return get_progress_bar(*kargs, **kwargs)
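
The new module keeps the enlighten plumbing behind get_progress_bar()/get_levels_bar(), so callers like packages/build.py only update named fields. A hedged usage sketch of the levels bar (package names and counts are made up; assumes this runs inside the kupferbootstrap tree with config loaded, so that "progressbar" and "utils" resolve to the modules shown in this diff):

from progressbar import BAR_PADDING, get_levels_bar
from utils import ellipsize

packages = ['linux-kupfer', 'device-oneplus-enchilada', 'kupfer-hello']
bar = get_levels_bar(
    total=len(packages),
    desc='Checking pkgs (aarch64)',
    unit='pkgs',
    fields={'levels_total': 1},
    enable_rate=False,
)
built = bar.add_subcounter('green')
for name in packages:
    # update(0, ...) refreshes the displayed fields without advancing the count
    bar.update(0, force=True, name=ellipsize(name, padding=' ', length=BAR_PADDING))
    built.update(force=True)  # pretend every package was already built
bar.close(clear=True)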

View File

@@ -8,3 +8,4 @@ munch
setuptools # required by munch
requests
python-dateutil
enlighten

View File

@@ -1,4 +1,5 @@
import atexit
import click
import datetime
import grp
import hashlib
@@ -169,3 +170,28 @@ def sha256sum(filename):
while n := f.readinto(mv):
h.update(mv[:n])
return h.hexdigest()
def ellipsize(s: str, length: int = 25, padding: Optional[str] = None, ellipsis: str = '...', rjust: bool = False):
"""
Ellipsize `s`, shortening it to `(length - len(ellipsis))` and appending `ellipsis` if `s` is longer than `length`.
If `padding` is non-empty and `s` is shorter than length, `s` is padded with `padding` until it's `length` long.
"""
if len(s) > length:
return s[:length - len(ellipsis)] + ellipsis
if not padding:
return s
pad = s.rjust if rjust else s.ljust
return pad(length, padding)
def colors_supported(force_colors: Optional[bool] = None) -> bool:
"If force_colors is None, returns isatty(stdout)"
# stdout is fd 1
return force_colors if force_colors is not None else os.isatty(1)
def color_str(s: str, use_colors: Optional[bool] = None, **kwargs) -> str:
if colors_supported(use_colors):
return click.style(s, **kwargs)
return s
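
Quick behaviour check for the helpers added above: ellipsize() truncates long strings to a fixed width (appending the ellipsis) or pads short ones, and color_str() only applies click styling when colors are wanted. A short illustration, assuming the kupferbootstrap utils module is importable:

from utils import ellipsize, color_str

print(ellipsize('device-oneplus-enchilada', length=10))               # 'device-...'
print(repr(ellipsize('linux', length=10, padding=' ')))               # 'linux     '
print(color_str('Currently selected', bold=True, use_colors=False))   # plain text, no ANSI codes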