Compare commits
24 Commits
prawn/chro
...
prawn/devi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2ad4690c0a | ||
|
|
161e14a438 | ||
|
|
066b6abaaa | ||
|
|
9f5bafab57 | ||
|
|
272d55b735 | ||
|
|
af1d8d1737 | ||
|
|
78874a15e6 | ||
|
|
f38fb798bc | ||
|
|
de7b597518 | ||
|
|
f140fa36ce | ||
|
|
69c73e41dd | ||
|
|
e269841038 | ||
|
|
932e739255 | ||
|
|
63156776a2 | ||
|
|
5edfac42ce | ||
|
|
00613096d5 | ||
|
|
c4797c709f | ||
|
|
28c68418a6 | ||
|
|
cc1b4b3ee2 | ||
|
|
ff3b5e70dd | ||
|
|
ac25266a00 | ||
|
|
c99463a0f6 | ||
|
|
6d6f582b71 | ||
|
|
785e41f8b7 |
@@ -2,6 +2,8 @@ import atexit
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
from copy import deepcopy
|
||||
from shlex import quote as shell_quote
|
||||
from typing import ClassVar, Iterable, Protocol, Union, Optional, Mapping
|
||||
@@ -10,7 +12,7 @@ from uuid import uuid4
|
||||
from config.state import config
|
||||
from constants import Arch, CHROOT_PATHS, GCC_HOSTSPECS
|
||||
from distro.distro import get_base_distro, get_kupfer_local, RepoInfo
|
||||
from exec.cmd import run_root_cmd, generate_env_cmd, flatten_shell_script, wrap_in_bash, generate_cmd_su
|
||||
from exec.cmd import FileDescriptor, run_root_cmd, generate_env_cmd, flatten_shell_script, wrap_in_bash, generate_cmd_su
|
||||
from exec.file import makedir, root_makedir, root_write_file, write_file
|
||||
from generator import generate_makepkg_conf
|
||||
from utils import mount, umount, check_findmnt, log_or_exception
|
||||
@@ -58,7 +60,8 @@ class AbstractChroot(Protocol):
|
||||
capture_output: bool,
|
||||
cwd: str,
|
||||
fail_inactive: bool,
|
||||
stdout: Optional[int],
|
||||
stdout: Optional[FileDescriptor],
|
||||
stderr: Optional[FileDescriptor],
|
||||
):
|
||||
pass
|
||||
|
||||
@@ -222,7 +225,8 @@ class Chroot(AbstractChroot):
|
||||
capture_output: bool = False,
|
||||
cwd: Optional[str] = None,
|
||||
fail_inactive: bool = True,
|
||||
stdout: Optional[int] = None,
|
||||
stdout: Optional[FileDescriptor] = None,
|
||||
stderr: Optional[FileDescriptor] = None,
|
||||
switch_user: Optional[str] = None,
|
||||
) -> Union[int, subprocess.CompletedProcess]:
|
||||
if not self.active and fail_inactive:
|
||||
@@ -246,7 +250,7 @@ class Chroot(AbstractChroot):
|
||||
inner_cmd = wrap_in_bash(script, flatten_result=False)
|
||||
cmd = flatten_shell_script(['chroot', self.path] + env_cmd + inner_cmd, shell_quote_items=True)
|
||||
|
||||
return run_root_cmd(cmd, env=outer_env, attach_tty=attach_tty, capture_output=capture_output, stdout=stdout)
|
||||
return run_root_cmd(cmd, env=outer_env, attach_tty=attach_tty, capture_output=capture_output, stdout=stdout, stderr=stderr)
|
||||
|
||||
def mount_pkgbuilds(self, fail_if_mounted: bool = False) -> str:
|
||||
return self.mount(
|
||||
@@ -371,20 +375,22 @@ class Chroot(AbstractChroot):
|
||||
packages: list[str],
|
||||
refresh: bool = False,
|
||||
allow_fail: bool = True,
|
||||
redirect_stderr: bool = True,
|
||||
) -> dict[str, Union[int, subprocess.CompletedProcess]]:
|
||||
"""Try installing packages, fall back to installing one by one"""
|
||||
results = {}
|
||||
stderr = sys.stdout if redirect_stderr else sys.stderr
|
||||
if refresh:
|
||||
results['refresh'] = self.run_cmd('pacman -Syy --noconfirm')
|
||||
results['refresh'] = self.run_cmd('pacman -Syy --noconfirm', stderr=stderr)
|
||||
cmd = "pacman -S --noconfirm --needed --overwrite='/*'"
|
||||
result = self.run_cmd(f'{cmd} -y {" ".join(packages)}')
|
||||
result = self.run_cmd(f'{cmd} -y {" ".join(packages)}', stderr=stderr)
|
||||
assert isinstance(result, subprocess.CompletedProcess)
|
||||
results |= {package: result for package in packages}
|
||||
if result.returncode != 0 and allow_fail:
|
||||
results = {}
|
||||
logging.debug('Falling back to serial installation')
|
||||
for pkg in set(packages):
|
||||
results[pkg] = self.run_cmd(f'{cmd} {pkg}')
|
||||
results[pkg] = self.run_cmd(f'{cmd} {pkg}', stderr=stderr)
|
||||
return results
|
||||
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
|
||||
from glob import glob
|
||||
from shutil import rmtree
|
||||
@@ -31,17 +32,20 @@ class BaseChroot(Chroot):
|
||||
|
||||
logging.info(f'Pacstrapping chroot {self.name}: {", ".join(self.base_packages)}')
|
||||
|
||||
result = run_root_cmd([
|
||||
'pacstrap',
|
||||
'-C',
|
||||
pacman_conf_target,
|
||||
'-G',
|
||||
self.path,
|
||||
] + self.base_packages + [
|
||||
'--needed',
|
||||
'--overwrite=*',
|
||||
'-yyuu',
|
||||
])
|
||||
result = run_root_cmd(
|
||||
[
|
||||
'pacstrap',
|
||||
'-C',
|
||||
pacman_conf_target,
|
||||
'-G',
|
||||
self.path,
|
||||
*self.base_packages,
|
||||
'--needed',
|
||||
'--overwrite=*',
|
||||
'-yyuu',
|
||||
],
|
||||
stderr=sys.stdout,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
raise Exception(f'Failed to initialize chroot "{self.name}"')
|
||||
self.initialized = True
|
||||
|
||||
@@ -135,7 +135,7 @@ def prompt_profile_device(current: Optional[str], profile_name: str) -> tuple[st
|
||||
devices = get_devices()
|
||||
print(click.style("Pick your device!\nThese are the available devices:", bold=True))
|
||||
for dev in sorted(devices.keys()):
|
||||
print(devices[dev])
|
||||
print(f"{devices[dev]}\n")
|
||||
return prompt_choice(current, f'profiles.{profile_name}.device', devices.keys())
|
||||
|
||||
|
||||
|
||||
@@ -147,6 +147,8 @@ class RuntimeConfiguration(DataClass):
|
||||
script_source_dir: Optional[str]
|
||||
arch: Optional[Arch]
|
||||
uid: Optional[int]
|
||||
progress_bars: Optional[bool]
|
||||
colors: Optional[bool]
|
||||
|
||||
|
||||
class ConfigLoadState(DataClass):
|
||||
|
||||
@@ -61,6 +61,8 @@ CONFIG_RUNTIME_DEFAULTS: RuntimeConfiguration = RuntimeConfiguration.fromDict({
|
||||
'script_source_dir': None,
|
||||
'arch': None,
|
||||
'uid': None,
|
||||
'progress_bars': None,
|
||||
'colors': None,
|
||||
})
|
||||
|
||||
|
||||
|
||||
@@ -1,13 +1,84 @@
|
||||
import click
|
||||
import logging
|
||||
|
||||
from .device import get_devices
|
||||
from json import dumps as json_dump
|
||||
from typing import Optional
|
||||
|
||||
from config.state import config
|
||||
from utils import colors_supported, color_str
|
||||
|
||||
from .device import get_devices, get_profile_device
|
||||
|
||||
|
||||
@click.command(name='devices')
|
||||
def cmd_devices():
|
||||
@click.option('-j', '--json', is_flag=True, help='output machine-parsable JSON format')
|
||||
@click.option(
|
||||
'--force-parse-deviceinfo/--no-parse-deviceinfo',
|
||||
is_flag=True,
|
||||
default=None,
|
||||
help="Force or disable deviceinfo parsing. The default is to try but continue if it fails.",
|
||||
)
|
||||
@click.option(
|
||||
'--download-packages/--no-download-packages',
|
||||
is_flag=True,
|
||||
default=False,
|
||||
help='Download packages while trying to parse deviceinfo',
|
||||
)
|
||||
@click.option('--output-file', type=click.Path(exists=False, file_okay=True), help="Dump JSON to file")
|
||||
def cmd_devices(
|
||||
json: bool = False,
|
||||
force_parse_deviceinfo: Optional[bool] = True,
|
||||
download_packages: bool = False,
|
||||
output_file: Optional[str] = None,
|
||||
):
|
||||
'list the available devices and descriptions'
|
||||
devices = get_devices()
|
||||
if not devices:
|
||||
raise Exception("No devices found!")
|
||||
for d in sorted(devices.keys()):
|
||||
print(devices[d])
|
||||
profile_device = None
|
||||
try:
|
||||
dev = get_profile_device()
|
||||
assert dev
|
||||
profile_device = dev
|
||||
except Exception as ex:
|
||||
logging.debug(f"Failed to get profile device for visual highlighting, not a problem: {ex}")
|
||||
output = ['']
|
||||
json_output = {}
|
||||
interactive_json = json and not output_file
|
||||
if output_file:
|
||||
json = True
|
||||
use_colors = colors_supported(False if interactive_json else config.runtime.colors)
|
||||
for name in sorted(devices.keys()):
|
||||
prefix = ''
|
||||
suffix = ''
|
||||
device = devices[name]
|
||||
assert device
|
||||
if force_parse_deviceinfo in [None, True]:
|
||||
try:
|
||||
device.parse_deviceinfo(try_download=download_packages)
|
||||
except Exception as ex:
|
||||
if not force_parse_deviceinfo:
|
||||
logging.debug(f"Failed to parse deviceinfo for extended description, not a problem: {ex}")
|
||||
else:
|
||||
raise ex
|
||||
|
||||
if json:
|
||||
json_output[name] = device.get_summary().toDict()
|
||||
if interactive_json:
|
||||
continue
|
||||
if profile_device and profile_device.name == device.name:
|
||||
prefix = color_str('>>> ', bold=True, fg="bright_green", use_colors=use_colors)
|
||||
suffix = '\n\n'
|
||||
suffix += color_str('Currently selected by profile', bold=True, use_colors=use_colors) + " "
|
||||
suffix += color_str(f'"{config.file.profiles.current}"', bold=True, fg="bright_green", use_colors=use_colors)
|
||||
snippet = f'{device.nice_str(colors=use_colors, newlines=True)}{suffix}'
|
||||
# prefix each line in the snippet
|
||||
snippet = '\n'.join([f'{prefix}{line}' for line in snippet.split('\n')])
|
||||
output.append(f"{snippet}\n")
|
||||
if interactive_json:
|
||||
output = ['\n' + json_dump(json_output, indent=4)]
|
||||
if output_file:
|
||||
with open(output_file, 'w') as fd:
|
||||
fd.write(json_dump(json_output))
|
||||
for line in output:
|
||||
print(line)
|
||||
|
||||
@@ -9,7 +9,7 @@ from config.scheme import DataClass, munchclass
|
||||
from distro.distro import get_kupfer_local
|
||||
from distro.package import LocalPackage
|
||||
from packages.pkgbuild import Pkgbuild, _pkgbuilds_cache, discover_pkgbuilds, get_pkgbuild_by_path, init_pkgbuilds
|
||||
from utils import read_files_from_tar
|
||||
from utils import read_files_from_tar, color_str
|
||||
|
||||
from .deviceinfo import DeviceInfo, parse_deviceinfo
|
||||
|
||||
@@ -22,6 +22,27 @@ DEVICE_DEPRECATIONS = {
|
||||
}
|
||||
|
||||
|
||||
class DeviceSummary(DataClass):
|
||||
name: str
|
||||
description: str
|
||||
arch: str
|
||||
package_name: Optional[str]
|
||||
package_path: Optional[str]
|
||||
|
||||
def nice_str(self, newlines: bool = False, colors: bool = False) -> str:
|
||||
separator = '\n' if newlines else ', '
|
||||
assert bool(self.package_path) == bool(self.package_name)
|
||||
package_path = {"Package Path": self.package_path} if self.package_path else {}
|
||||
fields = {
|
||||
"Device": self.name,
|
||||
"Description": self.description or f"[no package {'description' if self.package_name else 'associated (?!)'} and deviceinfo not parsed]",
|
||||
"Architecture": self.arch,
|
||||
"Package Name": self.package_name or "no package associated. PROBABLY A BUG!",
|
||||
**package_path,
|
||||
}
|
||||
return separator.join([f"{color_str(name, bold=True, use_colors=colors)}: {value}" for name, value in fields.items()])
|
||||
|
||||
|
||||
@munchclass()
|
||||
class Device(DataClass):
|
||||
name: str
|
||||
@@ -30,8 +51,24 @@ class Device(DataClass):
|
||||
deviceinfo: Optional[DeviceInfo]
|
||||
|
||||
def __repr__(self):
|
||||
return (f'Device "{self.name}": "{self.package.description if self.package else ""}", '
|
||||
f'Architecture: {self.arch}, package: {self.package.name if self.package else "??? PROBABLY A BUG!"}')
|
||||
return f'Device<{self.name},{self.arch},{self.package.path if self.package else "[no package]"}>'
|
||||
|
||||
def __str__(self):
|
||||
return self.nice_str(newlines=True)
|
||||
|
||||
def nice_str(self, *args, **kwargs) -> str:
|
||||
return self.get_summary().nice_str(*args, **kwargs)
|
||||
|
||||
def get_summary(self) -> DeviceSummary:
|
||||
result: dict[str, Optional[str]] = {}
|
||||
description = ((self.package.description if self.package else "").strip() or
|
||||
(self.deviceinfo.get("name", "[No name in deviceinfo]") if self.deviceinfo else "")).strip()
|
||||
result["name"] = self.name
|
||||
result["description"] = description
|
||||
result["arch"] = self.arch
|
||||
result["package_name"] = self.package.name if self.package else None
|
||||
result["package_path"] = self.package.path if self.package else None
|
||||
return DeviceSummary(result)
|
||||
|
||||
def parse_deviceinfo(self, try_download: bool = True, lazy: bool = True):
|
||||
if not lazy or 'deviceinfo' not in self or self.deviceinfo is None:
|
||||
@@ -146,7 +183,7 @@ def get_device(name: str, pkgbuilds: Optional[dict[str, Pkgbuild]] = None, lazy:
|
||||
if not os.path.exists(os.path.join(config.get_path('pkgbuilds'), relative_path)):
|
||||
logging.debug(f'Exact device pkgbuild path "pkgbuilds/{relative_path}" doesn\'t exist, scanning entire repo')
|
||||
return get_device(name, pkgbuilds=pkgbuilds, lazy=lazy, scan_all=True)
|
||||
pkgbuild = [p for p in get_pkgbuild_by_path(relative_path, lazy=lazy, _config=config) if p.name == pkgname][0]
|
||||
pkgbuild = [p for p in get_pkgbuild_by_path(relative_path, lazy=lazy) if p.name == pkgname][0]
|
||||
device = parse_device_pkg(pkgbuild)
|
||||
if lazy:
|
||||
_device_cache[name] = device
|
||||
|
||||
@@ -42,7 +42,7 @@ ONEPLUS_ENCHILADA_PKG = f'device-{ONEPLUS_ENCHILADA}'
|
||||
def enchilada_pkgbuild(initialise_pkgbuilds_dir: ConfigStateHolder):
|
||||
config = initialise_pkgbuilds_dir
|
||||
config.try_load_file()
|
||||
return parse_pkgbuild(os.path.join('device', ONEPLUS_ENCHILADA_PKG), _config=config)[0]
|
||||
return parse_pkgbuild(os.path.join('device', ONEPLUS_ENCHILADA_PKG))[0]
|
||||
|
||||
|
||||
def validate_oneplus_enchilada(d: Device):
|
||||
|
||||
@@ -72,9 +72,9 @@ class RemoteDistro(Distro[RemoteRepo]):
|
||||
return RemoteRepo(**kwargs)
|
||||
|
||||
|
||||
def get_base_distro(arch: str) -> RemoteDistro:
|
||||
def get_base_distro(arch: str, scan: bool = False) -> RemoteDistro:
|
||||
repos = {name: RepoInfo(url_template=url) for name, url in BASE_DISTROS[arch]['repos'].items()}
|
||||
return RemoteDistro(arch=arch, repo_infos=repos, scan=False)
|
||||
return RemoteDistro(arch=arch, repo_infos=repos, scan=scan)
|
||||
|
||||
|
||||
def get_kupfer(arch: str, url_template: str, scan: bool = False) -> Distro:
|
||||
|
||||
@@ -17,7 +17,7 @@ class BinaryPackage(PackageInfo):
|
||||
arch: str
|
||||
filename: str
|
||||
resolved_url: Optional[str]
|
||||
_desc: Optional[dict[str, str]]
|
||||
_desc: Optional[dict[str, str | list[str]]]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -39,17 +39,25 @@ class BinaryPackage(PackageInfo):
|
||||
@classmethod
|
||||
def parse_desc(clss, desc_str: str, resolved_repo_url=None):
|
||||
"""Parses a desc file, returning a PackageInfo"""
|
||||
|
||||
pruned_lines = ([line.strip() for line in desc_str.split('%') if line.strip()])
|
||||
desc = {}
|
||||
for key, value in zip(pruned_lines[0::2], pruned_lines[1::2]):
|
||||
desc[key.strip()] = value.strip()
|
||||
desc: dict[str, str | list[str]] = {}
|
||||
for segment in f'\n{desc_str}'.split('\n%'):
|
||||
if not segment.strip():
|
||||
continue
|
||||
key, elements = (e.strip() for e in segment.strip().split('%\n', 1))
|
||||
elements_split = elements.split('\n')
|
||||
desc[key] = elements if len(elements_split) == 1 else elements_split
|
||||
validated: dict[str, str] = {}
|
||||
for key in ['NAME', 'VERSION', 'ARCH', 'FILENAME']:
|
||||
assert key in desc
|
||||
value = desc[key]
|
||||
assert isinstance(value, str)
|
||||
validated[key] = value
|
||||
p = clss(
|
||||
name=desc['NAME'],
|
||||
version=desc['VERSION'],
|
||||
arch=desc['ARCH'],
|
||||
filename=desc['FILENAME'],
|
||||
resolved_url='/'.join([resolved_repo_url, desc['FILENAME']]),
|
||||
name=validated['NAME'],
|
||||
version=validated['VERSION'],
|
||||
arch=validated['ARCH'],
|
||||
filename=validated['FILENAME'],
|
||||
resolved_url='/'.join([resolved_repo_url, validated['FILENAME']]),
|
||||
)
|
||||
p._desc = desc
|
||||
return p
|
||||
|
||||
@@ -39,21 +39,39 @@ class Repo(RepoInfo, Generic[BinaryPackageType]):
|
||||
def resolve_url(self) -> str:
|
||||
return resolve_url(self.url_template, repo_name=self.name, arch=self.arch)
|
||||
|
||||
def scan(self):
|
||||
def scan(self, allow_failure: bool = False) -> bool:
|
||||
failed = False
|
||||
self.resolved_url = self.resolve_url()
|
||||
self.remote = not self.resolved_url.startswith('file://')
|
||||
path = self.acquire_db_file()
|
||||
try:
|
||||
path = self.acquire_db_file()
|
||||
index = tarfile.open(path)
|
||||
except Exception as ex:
|
||||
if not allow_failure:
|
||||
raise ex
|
||||
logging.error(f"Repo {self.name}, {self.arch}: Error acquiring repo DB: {ex!r}")
|
||||
return False
|
||||
logging.debug(f'Parsing repo file at {path}')
|
||||
with tarfile.open(path) as index:
|
||||
for node in index.getmembers():
|
||||
if os.path.basename(node.name) == 'desc':
|
||||
logging.debug(f'Parsing desc file for {os.path.dirname(node.name)}')
|
||||
fd = index.extractfile(node)
|
||||
assert fd
|
||||
pkg = self._parse_desc(fd.read().decode())
|
||||
self.packages[pkg.name] = pkg
|
||||
|
||||
for node in index.getmembers():
|
||||
if os.path.basename(node.name) == 'desc':
|
||||
pkgname = os.path.dirname(node.name)
|
||||
logging.debug(f'Parsing desc file for {pkgname}')
|
||||
fd = index.extractfile(node)
|
||||
assert fd
|
||||
contents = fd.read().decode()
|
||||
try:
|
||||
pkg = self._parse_desc(contents)
|
||||
except Exception as ex:
|
||||
if not allow_failure:
|
||||
raise ex
|
||||
logging.error(f'Repo {self.name}, {self.arch}: Error parsing desc for "{pkgname}": {ex!r}')
|
||||
failed = True
|
||||
continue
|
||||
self.packages[pkg.name] = pkg
|
||||
if failed:
|
||||
return False
|
||||
self.scanned = True
|
||||
return True
|
||||
|
||||
def _parse_desc(self, desc_text: str): # can't annotate the type properly :(
|
||||
raise NotImplementedError()
|
||||
|
||||
18
exec/cmd.py
18
exec/cmd.py
@@ -5,10 +5,12 @@ import subprocess
|
||||
|
||||
from subprocess import CompletedProcess # make it easy for users of this module
|
||||
from shlex import quote as shell_quote
|
||||
from typing import Optional, Union, TypeAlias
|
||||
from typing import IO, Optional, Union, TypeAlias
|
||||
|
||||
ElevationMethod: TypeAlias = str
|
||||
|
||||
FileDescriptor: TypeAlias = Union[int, IO]
|
||||
|
||||
# as long as **only** sudo is supported, hardcode the default into ELEVATION_METHOD_DEFAULT.
|
||||
# when other methods are added, all mentions of ELEVATION_METHOD_DEFAULT should be replaced by a config key.
|
||||
|
||||
@@ -89,8 +91,8 @@ def run_cmd(
|
||||
cwd: Optional[str] = None,
|
||||
switch_user: Optional[str] = None,
|
||||
elevation_method: Optional[ElevationMethod] = None,
|
||||
stdout: Optional[int] = None,
|
||||
stderr=None,
|
||||
stdout: Optional[FileDescriptor] = None,
|
||||
stderr: Optional[FileDescriptor] = None,
|
||||
) -> Union[CompletedProcess, int]:
|
||||
"execute `script` as `switch_user`, elevating and su'ing as necessary"
|
||||
kwargs: dict = {}
|
||||
@@ -99,10 +101,12 @@ def run_cmd(
|
||||
env_cmd = generate_env_cmd(env)
|
||||
kwargs['env'] = env
|
||||
if not attach_tty:
|
||||
kwargs |= {'stdout': stdout} if stdout else {'capture_output': capture_output}
|
||||
if stderr:
|
||||
kwargs['stderr'] = stderr
|
||||
|
||||
if (stdout, stderr) == (None, None):
|
||||
kwargs['capture_output'] = capture_output
|
||||
else:
|
||||
for name, fd in {'stdout': stdout, 'stderr': stderr}.items():
|
||||
if fd is not None:
|
||||
kwargs[name] = fd
|
||||
script = flatten_shell_script(script)
|
||||
if cwd:
|
||||
kwargs['cwd'] = cwd
|
||||
|
||||
@@ -1,20 +1,70 @@
|
||||
import click
|
||||
import logging
|
||||
|
||||
from .flavour import get_flavours
|
||||
from json import dumps as json_dump
|
||||
from typing import Optional
|
||||
|
||||
from config.state import config
|
||||
from utils import colors_supported, color_str
|
||||
|
||||
from .flavour import get_flavours, get_profile_flavour
|
||||
|
||||
profile_option = click.option('-p', '--profile', help="name of the profile to use", required=False, default=None)
|
||||
|
||||
|
||||
@click.command(name='flavours')
|
||||
def cmd_flavours():
|
||||
@click.option('-j', '--json', is_flag=True, help='output machine-parsable JSON format')
|
||||
@click.option('--output-file', type=click.Path(exists=False, file_okay=True), help="Dump JSON to file")
|
||||
def cmd_flavours(json: bool = False, output_file: Optional[str] = None):
|
||||
'list information about available flavours'
|
||||
results = []
|
||||
json_results = {}
|
||||
profile_flavour = None
|
||||
flavours = get_flavours()
|
||||
interactive_json = json and not output_file
|
||||
use_colors = colors_supported(config.runtime.colors) and not interactive_json
|
||||
if output_file:
|
||||
json = True
|
||||
if not flavours:
|
||||
raise Exception("No flavours found!")
|
||||
if not interactive_json:
|
||||
try:
|
||||
profile_flavour = get_profile_flavour()
|
||||
except Exception as ex:
|
||||
logging.debug(f"Failed to get profile flavour for marking as currently selected, continuing anyway. Exception: {ex}")
|
||||
for name in sorted(flavours.keys()):
|
||||
f = flavours[name]
|
||||
try:
|
||||
f.parse_flavourinfo()
|
||||
except:
|
||||
pass
|
||||
print(f)
|
||||
except Exception as ex:
|
||||
logging.debug(f"A problem happened while parsing flavourinfo for {name}, continuing anyway. Exception: {ex}")
|
||||
if not interactive_json:
|
||||
block = [*f.nice_str(newlines=True, colors=use_colors).split('\n'), '']
|
||||
if profile_flavour == f:
|
||||
prefix = color_str('>>> ', bold=True, fg='bright_green', use_colors=use_colors)
|
||||
block += [
|
||||
color_str("Currently selected by profile ", bold=True, use_colors=use_colors) +
|
||||
color_str(f'"{config.file.profiles.current}"\n', bold=True, fg="bright_green")
|
||||
]
|
||||
block = [prefix + line for line in block]
|
||||
results += block
|
||||
if json:
|
||||
d = dict(f)
|
||||
d["description"] = f.flavour_info.description if (f.flavour_info and f.flavour_info.description) else f.description
|
||||
if "flavour_info" in d and d["flavour_info"]:
|
||||
for k in set(d["flavour_info"].keys()) - set(['description']):
|
||||
d[k] = d["flavour_info"][k]
|
||||
del d["flavour_info"]
|
||||
d["pkgbuild"] = f.pkgbuild.path if f.pkgbuild else None
|
||||
d["package"] = f.pkgbuild.name
|
||||
d["arches"] = sorted(f.pkgbuild.arches) if f.pkgbuild else None
|
||||
json_results[d["name"]] = d
|
||||
print()
|
||||
if output_file:
|
||||
with open(output_file, 'w') as fd:
|
||||
fd.write(json_dump(json_results))
|
||||
if interactive_json:
|
||||
print(json_dump(json_results, indent=4))
|
||||
else:
|
||||
for r in results:
|
||||
print(r)
|
||||
|
||||
@@ -4,16 +4,16 @@ import json
|
||||
import logging
|
||||
import os
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
from config.state import config
|
||||
from constants import FLAVOUR_DESCRIPTION_PREFIX, FLAVOUR_INFO_FILE
|
||||
from dataclass import DataClass
|
||||
from packages.pkgbuild import discover_pkgbuilds, get_pkgbuild_by_name, init_pkgbuilds, Pkgbuild
|
||||
from utils import color_str
|
||||
|
||||
|
||||
@dataclass
|
||||
class FlavourInfo:
|
||||
class FlavourInfo(DataClass):
|
||||
rootfs_size: int # rootfs size in GB
|
||||
description: Optional[str]
|
||||
|
||||
@@ -21,8 +21,7 @@ class FlavourInfo:
|
||||
return f'rootfs_size: {self.rootfs_size}'
|
||||
|
||||
|
||||
@dataclass
|
||||
class Flavour:
|
||||
class Flavour(DataClass):
|
||||
name: str
|
||||
pkgbuild: Pkgbuild
|
||||
description: str
|
||||
@@ -43,7 +42,27 @@ class Flavour:
|
||||
return Flavour(name=name, pkgbuild=pkgbuild, description=description.strip(), flavour_info=None)
|
||||
|
||||
def __repr__(self):
|
||||
return f'Flavour "{self.name}": "{self.description}", package: {self.pkgbuild.name if self.pkgbuild else "??? PROBABLY A BUG!"}{f", {self.flavour_info}" if self.flavour_info else ""}'
|
||||
return f'Flavour<"{self.name}": "{self.description}", package: {self.pkgbuild.name if self.pkgbuild else "??? PROBABLY A BUG!"}{f", {self.flavour_info}" if self.flavour_info else ""}>'
|
||||
|
||||
def __str__(self):
|
||||
return self.nice_str()
|
||||
|
||||
def nice_str(self, newlines: bool = False, colors: bool = False) -> str:
|
||||
separator = '\n' if newlines else ', '
|
||||
|
||||
def get_lines(k, v, key_prefix=''):
|
||||
results = []
|
||||
full_k = f'{key_prefix}.{k}' if key_prefix else k
|
||||
if not isinstance(v, (dict, DataClass)):
|
||||
results = [f'{color_str(full_k, bold=True)}: {v}']
|
||||
else:
|
||||
for _k, _v in v.items():
|
||||
if _k.startswith('_'):
|
||||
continue
|
||||
results += get_lines(_k, _v, key_prefix=full_k)
|
||||
return results
|
||||
|
||||
return separator.join(get_lines(None, self))
|
||||
|
||||
def parse_flavourinfo(self, lazy: bool = True):
|
||||
if lazy and self.flavour_info is not None:
|
||||
|
||||
@@ -64,7 +64,7 @@ def shrink_fs(loop_device: str, file: str, sector_size: int):
|
||||
|
||||
logging.debug(f'Finding end block of shrunken filesystem on {loop_device}p2')
|
||||
blocks = int(re.search('is now [0-9]+', result.stdout.decode('utf-8')).group(0).split(' ')[2]) # type: ignore
|
||||
sectors = blocks * sectors_blocks_factor #+ 157812 - 25600
|
||||
sectors = blocks * sectors_blocks_factor
|
||||
|
||||
logging.debug(f'Shrinking partition at {loop_device}p2 to {sectors} sectors')
|
||||
child_proccess = subprocess.Popen(
|
||||
|
||||
27
logger.py
27
logger.py
@@ -3,11 +3,13 @@ import coloredlogs
|
||||
import logging
|
||||
import sys
|
||||
|
||||
from typing import Optional
|
||||
|
||||
def setup_logging(verbose: bool, log_setup: bool = True):
|
||||
|
||||
def setup_logging(verbose: bool, quiet: bool = False, force_colors: Optional[bool] = None, log_setup: bool = True):
|
||||
level_colors = coloredlogs.DEFAULT_LEVEL_STYLES | {'info': {'color': 'magenta', 'bright': True}, 'debug': {'color': 'blue', 'bright': True}}
|
||||
field_colors = coloredlogs.DEFAULT_FIELD_STYLES | {'asctime': {'color': 'white', 'faint': True}}
|
||||
level = logging.DEBUG if verbose else logging.INFO
|
||||
level = logging.DEBUG if verbose and not quiet else (logging.INFO if not quiet else logging.ERROR)
|
||||
coloredlogs.install(
|
||||
stream=sys.stdout,
|
||||
fmt='%(asctime)s %(levelname)s: %(message)s',
|
||||
@@ -15,9 +17,14 @@ def setup_logging(verbose: bool, log_setup: bool = True):
|
||||
level=level,
|
||||
level_styles=level_colors,
|
||||
field_styles=field_colors,
|
||||
isatty=force_colors,
|
||||
)
|
||||
# don't raise Exceptions when e.g. output stream is closed
|
||||
logging.raiseExceptions = False
|
||||
if log_setup:
|
||||
logging.debug('Logging set up.')
|
||||
logging.debug('Logger: Logging set up.')
|
||||
if force_colors is not None:
|
||||
logging.debug(f'Logger: Force-{"en" if force_colors else "dis"}abled colors')
|
||||
|
||||
|
||||
verbose_option = click.option(
|
||||
@@ -26,3 +33,17 @@ verbose_option = click.option(
|
||||
is_flag=True,
|
||||
help='Enables verbose logging',
|
||||
)
|
||||
|
||||
quiet_option = click.option(
|
||||
'-q',
|
||||
'--quiet',
|
||||
is_flag=True,
|
||||
help='Disable most logging, only log errors. (Currently only affects KBS logging, not called subprograms)',
|
||||
)
|
||||
|
||||
color_option = click.option(
|
||||
'--force-colors/--no-colors',
|
||||
is_flag=True,
|
||||
default=None,
|
||||
help='Force enable/disable log coloring. Defaults to autodetection.',
|
||||
)
|
||||
|
||||
22
main.py
22
main.py
@@ -3,11 +3,13 @@
|
||||
import click
|
||||
import subprocess
|
||||
|
||||
from os import isatty
|
||||
from traceback import format_exc, format_exception_only, format_tb
|
||||
from typing import Optional
|
||||
|
||||
from logger import logging, setup_logging, verbose_option
|
||||
from logger import color_option, logging, quiet_option, setup_logging, verbose_option
|
||||
from wrapper import nowrapper_option, enforce_wrap
|
||||
from progressbar import progress_bars_option
|
||||
|
||||
from config.cli import config, config_option, cmd_config
|
||||
from packages.cli import cmd_packages
|
||||
@@ -22,11 +24,25 @@ from image.cli import cmd_image
|
||||
@click.group()
|
||||
@click.option('--error-shell', '-E', 'error_shell', is_flag=True, default=False, help='Spawn shell after error occurs')
|
||||
@verbose_option
|
||||
@quiet_option
|
||||
@config_option
|
||||
@nowrapper_option
|
||||
def cli(verbose: bool = False, config_file: Optional[str] = None, wrapper_override: Optional[bool] = None, error_shell: bool = False):
|
||||
setup_logging(verbose)
|
||||
@color_option
|
||||
@progress_bars_option
|
||||
def cli(
|
||||
verbose: bool = False,
|
||||
quiet: bool = False,
|
||||
config_file: Optional[str] = None,
|
||||
wrapper_override: Optional[bool] = None,
|
||||
error_shell: bool = False,
|
||||
force_colors: Optional[bool] = None,
|
||||
force_progress_bars: Optional[bool] = None,
|
||||
):
|
||||
setup_logging(verbose, quiet=quiet, force_colors=force_colors)
|
||||
# stdout is fd 1
|
||||
config.runtime.colors = isatty(1) if force_colors is None else force_colors
|
||||
config.runtime.verbose = verbose
|
||||
config.runtime.progress_bars = force_progress_bars
|
||||
config.runtime.no_wrap = wrapper_override is False
|
||||
config.runtime.error_shell = error_shell
|
||||
config.try_load_file(config_file)
|
||||
|
||||
@@ -3,6 +3,7 @@ import multiprocessing
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
from copy import deepcopy
|
||||
from urllib.error import HTTPError
|
||||
@@ -17,8 +18,9 @@ from chroot.build import get_build_chroot, BuildChroot
|
||||
from distro.distro import get_kupfer_https, get_kupfer_local
|
||||
from distro.package import RemotePackage, LocalPackage
|
||||
from distro.repo import LocalRepo
|
||||
from progressbar import BAR_PADDING, get_levels_bar
|
||||
from wrapper import check_programs_wrap, is_wrapped
|
||||
from utils import sha256sum
|
||||
from utils import ellipsize, sha256sum
|
||||
|
||||
from .pkgbuild import discover_pkgbuilds, filter_pkgbuilds, Pkgbase, Pkgbuild, SubPkgbuild
|
||||
|
||||
@@ -227,7 +229,7 @@ def add_file_to_repo(file_path: str, repo_name: str, arch: Arch, remove_original
|
||||
target_file,
|
||||
]
|
||||
logging.debug(f'repo: running cmd: {cmd}')
|
||||
result = run_cmd(cmd)
|
||||
result = run_cmd(cmd, stderr=sys.stdout)
|
||||
assert isinstance(result, subprocess.CompletedProcess)
|
||||
if result.returncode != 0:
|
||||
raise Exception(f'Failed add package {target_file} to repo {repo_name}')
|
||||
@@ -274,8 +276,8 @@ def add_package_to_repo(package: Pkgbuild, arch: Arch):
|
||||
|
||||
|
||||
def try_download_package(dest_file_path: str, package: Pkgbuild, arch: Arch) -> Optional[str]:
|
||||
logging.debug(f"checking if we can download {package.name}")
|
||||
filename = os.path.basename(dest_file_path)
|
||||
logging.debug(f"checking if we can download {filename}")
|
||||
pkgname = package.name
|
||||
repo_name = package.repo
|
||||
repos = get_kupfer_https(arch, scan=True).repos
|
||||
@@ -473,7 +475,7 @@ def setup_sources(package: Pkgbuild, lazy: bool = True):
|
||||
assert config.runtime.arch
|
||||
chroot = setup_build_chroot(config.runtime.arch)
|
||||
logging.info(f'{package.path}: Setting up sources with makepkg')
|
||||
result = chroot.run_cmd(makepkg_setup, cwd=dir, switch_user='kupfer')
|
||||
result = chroot.run_cmd(makepkg_setup, cwd=dir, switch_user='kupfer', stderr=sys.stdout)
|
||||
assert isinstance(result, subprocess.CompletedProcess)
|
||||
if result.returncode != 0:
|
||||
raise Exception(f'{package.path}: Failed to setup sources, exit code: {result.returncode}')
|
||||
@@ -583,6 +585,7 @@ def build_package(
|
||||
inner_env=env,
|
||||
cwd=os.path.join(CHROOT_PATHS['pkgbuilds'], package.path),
|
||||
switch_user=build_user,
|
||||
stderr=sys.stdout,
|
||||
)
|
||||
assert isinstance(result, subprocess.CompletedProcess)
|
||||
if result.returncode != 0:
|
||||
@@ -635,17 +638,33 @@ def get_unbuilt_package_levels(
|
||||
includes_dependants = " (includes dependants)" if rebuild_dependants else ""
|
||||
logging.info(f"Checking for unbuilt packages ({arch}) in dependency order{includes_dependants}:\n{get_pkg_levels_str(package_levels)}")
|
||||
i = 0
|
||||
for level_packages in package_levels:
|
||||
total_levels = len(package_levels)
|
||||
package_bar = get_levels_bar(
|
||||
total=sum([len(lev) for lev in package_levels]),
|
||||
desc=f"Checking pkgs ({arch})",
|
||||
unit='pkgs',
|
||||
fields={"levels_total": total_levels},
|
||||
enable_rate=False,
|
||||
)
|
||||
counter_built = package_bar.add_subcounter('green')
|
||||
counter_unbuilt = package_bar.add_subcounter('blue')
|
||||
for level_num, level_packages in enumerate(package_levels):
|
||||
level_num = level_num + 1
|
||||
package_bar.update(0, name=" " * BAR_PADDING, level=level_num)
|
||||
level = set[Pkgbuild]()
|
||||
if not level_packages:
|
||||
continue
|
||||
|
||||
def add_to_level(pkg, level, reason=''):
|
||||
if reason:
|
||||
reason = f': {reason}'
|
||||
logging.info(f"Level {i} ({arch}): Adding {package.path}{reason}")
|
||||
counter_unbuilt.update(force=True)
|
||||
logging.info(f"Level {level}/{total_levels} ({arch}): Adding {package.path}{reason}")
|
||||
level.add(package)
|
||||
build_names.update(package.names())
|
||||
|
||||
for package in level_packages:
|
||||
package_bar.update(0, force=True, name=ellipsize(package.name, padding=" ", length=BAR_PADDING))
|
||||
if (force and package in packages):
|
||||
add_to_level(package, level, 'query match and force=True')
|
||||
elif rebuild_dependants and package in dependants:
|
||||
@@ -653,12 +672,14 @@ def get_unbuilt_package_levels(
|
||||
elif not check_package_version_built(package, arch, try_download=try_download, refresh_sources=refresh_sources):
|
||||
add_to_level(package, level, 'package unbuilt')
|
||||
else:
|
||||
logging.info(f"Level {i}: {package.path} ({arch}): Package doesn't need [re]building")
|
||||
logging.info(f"Level {level_num}/{total_levels} ({arch}): {package.path}: Package doesn't need [re]building")
|
||||
counter_built.update(force=True)
|
||||
|
||||
logging.debug(f'Finished checking level {level_num}/{total_levels} ({arch}). Adding unbuilt pkgs: {get_pkg_names_str(level)}')
|
||||
if level:
|
||||
build_levels.append(level)
|
||||
logging.debug(f'Finished checking level {i}. Adding unbuilt pkgs: {get_pkg_names_str(level)}')
|
||||
i += 1
|
||||
package_bar.close(clear=True)
|
||||
return build_levels
|
||||
|
||||
|
||||
@@ -691,11 +712,24 @@ def build_packages(
|
||||
|
||||
logging.info(f"Build plan made:\n{get_pkg_levels_str(build_levels)}")
|
||||
|
||||
total_levels = len(build_levels)
|
||||
package_bar = get_levels_bar(
|
||||
desc=f'Building pkgs ({arch})',
|
||||
color='purple',
|
||||
unit='pkgs',
|
||||
total=sum([len(lev) for lev in build_levels]),
|
||||
fields={"levels_total": total_levels},
|
||||
enable_rate=False,
|
||||
)
|
||||
files = []
|
||||
updated_repos: set[str] = set()
|
||||
package_bar.update(-1)
|
||||
for level, need_build in enumerate(build_levels):
|
||||
logging.info(f"(Level {level}) Building {get_pkg_names_str(need_build)}")
|
||||
level = level + 1
|
||||
package_bar.update(incr=0, force=True, name=" " * BAR_PADDING, level=level)
|
||||
logging.info(f"(Level {level}/{total_levels}) Building {get_pkg_names_str(need_build)}")
|
||||
for package in need_build:
|
||||
package_bar.update(force=True, name=ellipsize(package.name, padding=" ", length=BAR_PADDING))
|
||||
base = package.pkgbase if isinstance(package, SubPkgbuild) else package
|
||||
assert isinstance(base, Pkgbase)
|
||||
if package.is_built(arch):
|
||||
@@ -714,11 +748,14 @@ def build_packages(
|
||||
for _arch in ['any', arch]:
|
||||
if _arch in base.arches:
|
||||
base._built_for.add(_arch)
|
||||
package_bar.update()
|
||||
# rescan affected repos
|
||||
local_repos = get_kupfer_local(arch, in_chroot=False, scan=False)
|
||||
for repo_name in updated_repos:
|
||||
assert repo_name in local_repos.repos
|
||||
local_repos.repos[repo_name].scan()
|
||||
|
||||
package_bar.close(clear=True)
|
||||
return files
|
||||
|
||||
|
||||
@@ -794,6 +831,6 @@ def build_enable_qemu_binfmt(arch: Arch, repo: Optional[dict[str, Pkgbuild]] = N
|
||||
assert p.startswith(hostdir)
|
||||
_files.append(os.path.join(CHROOT_PATHS['packages'], p[len(hostdir):].lstrip('/')))
|
||||
pkgfiles = _files
|
||||
runcmd(['pacman', '-U', '--noconfirm', '--needed'] + pkgfiles)
|
||||
runcmd(['pacman', '-U', '--noconfirm', '--needed'] + pkgfiles, stderr=sys.stdout)
|
||||
binfmt_register(arch, chroot=native_chroot)
|
||||
_qemu_enabled[arch] = True
|
||||
|
||||
@@ -224,15 +224,17 @@ def cmd_sideload(paths: Iterable[str], arch: Optional[Arch] = None, no_build: bo
|
||||
logging.fatal("No packages matched")
|
||||
return
|
||||
scp_put_files(files, '/tmp').check_returncode()
|
||||
run_ssh_command([
|
||||
'sudo',
|
||||
'pacman',
|
||||
'-U',
|
||||
] + [os.path.join('/tmp', os.path.basename(file)) for file in files] + [
|
||||
'--noconfirm',
|
||||
"'--overwrite=\\*'",
|
||||
],
|
||||
alloc_tty=True).check_returncode()
|
||||
run_ssh_command(
|
||||
[
|
||||
'sudo',
|
||||
'pacman',
|
||||
'-U',
|
||||
*[os.path.join('/tmp', os.path.basename(file)) for file in files],
|
||||
'--noconfirm',
|
||||
"'--overwrite=\\*'",
|
||||
],
|
||||
alloc_tty=True,
|
||||
).check_returncode()
|
||||
|
||||
|
||||
CLEAN_LOCATIONS = ['src', 'pkg', *SRCINFO_CACHE_FILES]
|
||||
@@ -439,7 +441,7 @@ def cmd_check(paths):
|
||||
formatted = False
|
||||
reason = 'Found literal " although no special character was found in the line to justify the usage of a literal "'
|
||||
|
||||
if "'" in line and not '"' in line:
|
||||
if "'" in line and '"' not in line:
|
||||
formatted = False
|
||||
reason = 'Found literal \' although either a literal " or no qoutes should be used'
|
||||
|
||||
|
||||
@@ -332,7 +332,7 @@ def parse_pkgbuild(
|
||||
global config
|
||||
if _config:
|
||||
config = _config
|
||||
setup_logging(verbose=config.runtime.verbose, log_setup=False) # different subprocess needs log setup.
|
||||
setup_logging(verbose=config.runtime.verbose, force_colors=config.runtime.colors, log_setup=False) # different subprocess needs log setup.
|
||||
logging.info(f"Discovering PKGBUILD for {relative_pkg_dir}")
|
||||
|
||||
if force_refresh_srcinfo:
|
||||
|
||||
52
progressbar.py
Normal file
52
progressbar.py
Normal file
@@ -0,0 +1,52 @@
|
||||
import click
|
||||
import sys
|
||||
|
||||
from enlighten import Counter, Manager, get_manager as _getmanager
|
||||
from typing import Hashable, Optional
|
||||
|
||||
from config.state import config
|
||||
|
||||
BAR_PADDING = 25
|
||||
DEFAULT_OUTPUT = sys.stderr
|
||||
|
||||
managers: dict[Hashable, Manager] = {}
|
||||
|
||||
progress_bars_option = click.option(
|
||||
'--force-progress-bars/--no-progress-bars',
|
||||
is_flag=True,
|
||||
default=None,
|
||||
help='Force enable/disable progress bars. Defaults to autodetection.',
|
||||
)
|
||||
|
||||
|
||||
def get_manager(file=DEFAULT_OUTPUT, enabled: Optional[bool] = None) -> Manager:
|
||||
global managers
|
||||
m = managers.get(file, None)
|
||||
if not m:
|
||||
kwargs = {}
|
||||
if enabled is None or config.runtime.progress_bars is False:
|
||||
enabled = config.runtime.progress_bars
|
||||
if enabled is not None:
|
||||
kwargs = {"enabled": enabled}
|
||||
m = _getmanager(file, **kwargs)
|
||||
managers[file] = m
|
||||
return m
|
||||
|
||||
|
||||
def get_progress_bar(*kargs, file=DEFAULT_OUTPUT, leave=False, **kwargs) -> Counter:
|
||||
m = get_manager(file=file)
|
||||
|
||||
kwargs["file"] = file
|
||||
kwargs["leave"] = leave
|
||||
return m.counter(*kargs, **kwargs)
|
||||
|
||||
|
||||
def get_levels_bar(*kargs, file=DEFAULT_OUTPUT, enable_rate=True, **kwargs):
|
||||
kwargs["fields"] = {"name": "None", "level": 1, "levels_total": 1} | (kwargs.get("fields", None) or {})
|
||||
f = (u'{desc}: {name}{desc_pad}{percentage:3.0f}%|{bar}| '
|
||||
u'{count:{len_total}d}/{total:d} '
|
||||
u'[lvl: {level}/{levels_total}] ')
|
||||
if enable_rate:
|
||||
f += u'[{elapsed}<{eta}, {rate:.2f}{unit_pad}{unit}/s]'
|
||||
kwargs["bar_format"] = f
|
||||
return get_progress_bar(*kargs, **kwargs)
|
||||
@@ -8,3 +8,4 @@ munch
|
||||
setuptools # required by munch
|
||||
requests
|
||||
python-dateutil
|
||||
enlighten
|
||||
|
||||
26
utils.py
26
utils.py
@@ -1,4 +1,5 @@
|
||||
import atexit
|
||||
import click
|
||||
import datetime
|
||||
import grp
|
||||
import hashlib
|
||||
@@ -169,3 +170,28 @@ def sha256sum(filename):
|
||||
while n := f.readinto(mv):
|
||||
h.update(mv[:n])
|
||||
return h.hexdigest()
|
||||
|
||||
|
||||
def ellipsize(s: str, length: int = 25, padding: Optional[str] = None, ellipsis: str = '...', rjust: bool = False):
|
||||
"""
|
||||
Ellipsize `s`, shortening it to `(length - len(ellipsis))` and appending `ellipsis` if `s` is longer than `length`.
|
||||
If `padding` is non-empty and `s` is shorter than length, `s` is padded with `padding` until it's `length` long.
|
||||
"""
|
||||
if len(s) > length:
|
||||
return s[:length - len(ellipsis)] + ellipsis
|
||||
if not padding:
|
||||
return s
|
||||
pad = s.rjust if rjust else s.ljust
|
||||
return pad(length, padding)
|
||||
|
||||
|
||||
def colors_supported(force_colors: Optional[bool] = None) -> bool:
|
||||
"If force_colors is None, returns isatty(stdout)"
|
||||
# stdout is fd 1
|
||||
return force_colors if force_colors is not None else os.isatty(1)
|
||||
|
||||
|
||||
def color_str(s: str, use_colors: Optional[bool] = None, **kwargs) -> str:
|
||||
if colors_supported(use_colors):
|
||||
return click.style(s, **kwargs)
|
||||
return s
|
||||
|
||||
Reference in New Issue
Block a user