Compare commits

...
Sign in to create a new pull request.

10 commits

12 changed files with 190 additions and 41 deletions

View file

@ -2,6 +2,8 @@ import atexit
import logging import logging
import os import os
import subprocess import subprocess
import sys
from copy import deepcopy from copy import deepcopy
from shlex import quote as shell_quote from shlex import quote as shell_quote
from typing import ClassVar, Iterable, Protocol, Union, Optional, Mapping from typing import ClassVar, Iterable, Protocol, Union, Optional, Mapping
@ -10,7 +12,7 @@ from uuid import uuid4
from config.state import config from config.state import config
from constants import Arch, CHROOT_PATHS, GCC_HOSTSPECS from constants import Arch, CHROOT_PATHS, GCC_HOSTSPECS
from distro.distro import get_base_distro, get_kupfer_local, RepoInfo from distro.distro import get_base_distro, get_kupfer_local, RepoInfo
from exec.cmd import run_root_cmd, generate_env_cmd, flatten_shell_script, wrap_in_bash, generate_cmd_su from exec.cmd import FileDescriptor, run_root_cmd, generate_env_cmd, flatten_shell_script, wrap_in_bash, generate_cmd_su
from exec.file import makedir, root_makedir, root_write_file, write_file from exec.file import makedir, root_makedir, root_write_file, write_file
from generator import generate_makepkg_conf from generator import generate_makepkg_conf
from utils import mount, umount, check_findmnt, log_or_exception from utils import mount, umount, check_findmnt, log_or_exception
@ -58,7 +60,8 @@ class AbstractChroot(Protocol):
capture_output: bool, capture_output: bool,
cwd: str, cwd: str,
fail_inactive: bool, fail_inactive: bool,
stdout: Optional[int], stdout: Optional[FileDescriptor],
stderr: Optional[FileDescriptor],
): ):
pass pass
@ -222,7 +225,8 @@ class Chroot(AbstractChroot):
capture_output: bool = False, capture_output: bool = False,
cwd: Optional[str] = None, cwd: Optional[str] = None,
fail_inactive: bool = True, fail_inactive: bool = True,
stdout: Optional[int] = None, stdout: Optional[FileDescriptor] = None,
stderr: Optional[FileDescriptor] = None,
switch_user: Optional[str] = None, switch_user: Optional[str] = None,
) -> Union[int, subprocess.CompletedProcess]: ) -> Union[int, subprocess.CompletedProcess]:
if not self.active and fail_inactive: if not self.active and fail_inactive:
@ -246,7 +250,7 @@ class Chroot(AbstractChroot):
inner_cmd = wrap_in_bash(script, flatten_result=False) inner_cmd = wrap_in_bash(script, flatten_result=False)
cmd = flatten_shell_script(['chroot', self.path] + env_cmd + inner_cmd, shell_quote_items=True) cmd = flatten_shell_script(['chroot', self.path] + env_cmd + inner_cmd, shell_quote_items=True)
return run_root_cmd(cmd, env=outer_env, attach_tty=attach_tty, capture_output=capture_output, stdout=stdout) return run_root_cmd(cmd, env=outer_env, attach_tty=attach_tty, capture_output=capture_output, stdout=stdout, stderr=stderr)
def mount_pkgbuilds(self, fail_if_mounted: bool = False) -> str: def mount_pkgbuilds(self, fail_if_mounted: bool = False) -> str:
return self.mount( return self.mount(
@ -371,20 +375,22 @@ class Chroot(AbstractChroot):
packages: list[str], packages: list[str],
refresh: bool = False, refresh: bool = False,
allow_fail: bool = True, allow_fail: bool = True,
redirect_stderr: bool = True,
) -> dict[str, Union[int, subprocess.CompletedProcess]]: ) -> dict[str, Union[int, subprocess.CompletedProcess]]:
"""Try installing packages, fall back to installing one by one""" """Try installing packages, fall back to installing one by one"""
results = {} results = {}
stderr = sys.stdout if redirect_stderr else sys.stderr
if refresh: if refresh:
results['refresh'] = self.run_cmd('pacman -Syy --noconfirm') results['refresh'] = self.run_cmd('pacman -Syy --noconfirm', stderr=stderr)
cmd = "pacman -S --noconfirm --needed --overwrite='/*'" cmd = "pacman -S --noconfirm --needed --overwrite='/*'"
result = self.run_cmd(f'{cmd} -y {" ".join(packages)}') result = self.run_cmd(f'{cmd} -y {" ".join(packages)}', stderr=stderr)
assert isinstance(result, subprocess.CompletedProcess) assert isinstance(result, subprocess.CompletedProcess)
results |= {package: result for package in packages} results |= {package: result for package in packages}
if result.returncode != 0 and allow_fail: if result.returncode != 0 and allow_fail:
results = {} results = {}
logging.debug('Falling back to serial installation') logging.debug('Falling back to serial installation')
for pkg in set(packages): for pkg in set(packages):
results[pkg] = self.run_cmd(f'{cmd} {pkg}') results[pkg] = self.run_cmd(f'{cmd} {pkg}', stderr=stderr)
return results return results

View file

@ -1,5 +1,6 @@
import logging import logging
import os import os
import sys
from glob import glob from glob import glob
from shutil import rmtree from shutil import rmtree
@ -31,17 +32,20 @@ class BaseChroot(Chroot):
logging.info(f'Pacstrapping chroot {self.name}: {", ".join(self.base_packages)}') logging.info(f'Pacstrapping chroot {self.name}: {", ".join(self.base_packages)}')
result = run_root_cmd([ result = run_root_cmd(
'pacstrap', [
'-C', 'pacstrap',
pacman_conf_target, '-C',
'-G', pacman_conf_target,
self.path, '-G',
] + self.base_packages + [ self.path,
'--needed', *self.base_packages,
'--overwrite=*', '--needed',
'-yyuu', '--overwrite=*',
]) '-yyuu',
],
stderr=sys.stdout,
)
if result.returncode != 0: if result.returncode != 0:
raise Exception(f'Failed to initialize chroot "{self.name}"') raise Exception(f'Failed to initialize chroot "{self.name}"')
self.initialized = True self.initialized = True

View file

@ -147,6 +147,8 @@ class RuntimeConfiguration(DataClass):
script_source_dir: Optional[str] script_source_dir: Optional[str]
arch: Optional[Arch] arch: Optional[Arch]
uid: Optional[int] uid: Optional[int]
progress_bars: Optional[bool]
colors: Optional[bool]
class ConfigLoadState(DataClass): class ConfigLoadState(DataClass):

View file

@ -61,6 +61,8 @@ CONFIG_RUNTIME_DEFAULTS: RuntimeConfiguration = RuntimeConfiguration.fromDict({
'script_source_dir': None, 'script_source_dir': None,
'arch': None, 'arch': None,
'uid': None, 'uid': None,
'progress_bars': None,
'colors': None,
}) })

View file

@ -5,10 +5,12 @@ import subprocess
from subprocess import CompletedProcess # make it easy for users of this module from subprocess import CompletedProcess # make it easy for users of this module
from shlex import quote as shell_quote from shlex import quote as shell_quote
from typing import Optional, Union, TypeAlias from typing import IO, Optional, Union, TypeAlias
ElevationMethod: TypeAlias = str ElevationMethod: TypeAlias = str
FileDescriptor: TypeAlias = Union[int, IO]
# as long as **only** sudo is supported, hardcode the default into ELEVATION_METHOD_DEFAULT. # as long as **only** sudo is supported, hardcode the default into ELEVATION_METHOD_DEFAULT.
# when other methods are added, all mentions of ELEVATION_METHOD_DEFAULT should be replaced by a config key. # when other methods are added, all mentions of ELEVATION_METHOD_DEFAULT should be replaced by a config key.
@ -89,8 +91,8 @@ def run_cmd(
cwd: Optional[str] = None, cwd: Optional[str] = None,
switch_user: Optional[str] = None, switch_user: Optional[str] = None,
elevation_method: Optional[ElevationMethod] = None, elevation_method: Optional[ElevationMethod] = None,
stdout: Optional[int] = None, stdout: Optional[FileDescriptor] = None,
stderr=None, stderr: Optional[FileDescriptor] = None,
) -> Union[CompletedProcess, int]: ) -> Union[CompletedProcess, int]:
"execute `script` as `switch_user`, elevating and su'ing as necessary" "execute `script` as `switch_user`, elevating and su'ing as necessary"
kwargs: dict = {} kwargs: dict = {}
@ -99,10 +101,12 @@ def run_cmd(
env_cmd = generate_env_cmd(env) env_cmd = generate_env_cmd(env)
kwargs['env'] = env kwargs['env'] = env
if not attach_tty: if not attach_tty:
kwargs |= {'stdout': stdout} if stdout else {'capture_output': capture_output} if (stdout, stderr) == (None, None):
if stderr: kwargs['capture_output'] = capture_output
kwargs['stderr'] = stderr else:
for name, fd in {'stdout': stdout, 'stderr': stderr}.items():
if fd is not None:
kwargs[name] = fd
script = flatten_shell_script(script) script = flatten_shell_script(script)
if cwd: if cwd:
kwargs['cwd'] = cwd kwargs['cwd'] = cwd

View file

@ -3,8 +3,10 @@ import coloredlogs
import logging import logging
import sys import sys
from typing import Optional
def setup_logging(verbose: bool, log_setup: bool = True):
def setup_logging(verbose: bool, force_colors: Optional[bool] = None, log_setup: bool = True):
level_colors = coloredlogs.DEFAULT_LEVEL_STYLES | {'info': {'color': 'magenta', 'bright': True}, 'debug': {'color': 'blue', 'bright': True}} level_colors = coloredlogs.DEFAULT_LEVEL_STYLES | {'info': {'color': 'magenta', 'bright': True}, 'debug': {'color': 'blue', 'bright': True}}
field_colors = coloredlogs.DEFAULT_FIELD_STYLES | {'asctime': {'color': 'white', 'faint': True}} field_colors = coloredlogs.DEFAULT_FIELD_STYLES | {'asctime': {'color': 'white', 'faint': True}}
level = logging.DEBUG if verbose else logging.INFO level = logging.DEBUG if verbose else logging.INFO
@ -15,9 +17,14 @@ def setup_logging(verbose: bool, log_setup: bool = True):
level=level, level=level,
level_styles=level_colors, level_styles=level_colors,
field_styles=field_colors, field_styles=field_colors,
isatty=force_colors,
) )
# don't raise Exceptions when e.g. output stream is closed
logging.raiseExceptions = False
if log_setup: if log_setup:
logging.debug('Logging set up.') logging.debug('Logger: Logging set up.')
if force_colors is not None:
logging.debug(f'Logger: Force-{"en" if force_colors else "dis"}abled colors')
verbose_option = click.option( verbose_option = click.option(
@ -26,3 +33,10 @@ verbose_option = click.option(
is_flag=True, is_flag=True,
help='Enables verbose logging', help='Enables verbose logging',
) )
color_option = click.option(
'--force-colors/--no-colors',
is_flag=True,
default=None,
help='Force enable/disable log coloring. Defaults to autodetection.',
)

20
main.py
View file

@ -3,11 +3,13 @@
import click import click
import subprocess import subprocess
from os import isatty
from traceback import format_exc, format_exception_only, format_tb from traceback import format_exc, format_exception_only, format_tb
from typing import Optional from typing import Optional
from logger import logging, setup_logging, verbose_option from logger import color_option, logging, setup_logging, verbose_option
from wrapper import nowrapper_option, enforce_wrap from wrapper import nowrapper_option, enforce_wrap
from progressbar import progress_bars_option
from config.cli import config, config_option, cmd_config from config.cli import config, config_option, cmd_config
from packages.cli import cmd_packages from packages.cli import cmd_packages
@ -24,9 +26,21 @@ from image.cli import cmd_image
@verbose_option @verbose_option
@config_option @config_option
@nowrapper_option @nowrapper_option
def cli(verbose: bool = False, config_file: Optional[str] = None, wrapper_override: Optional[bool] = None, error_shell: bool = False): @color_option
setup_logging(verbose) @progress_bars_option
def cli(
verbose: bool = False,
config_file: Optional[str] = None,
wrapper_override: Optional[bool] = None,
error_shell: bool = False,
force_colors: Optional[bool] = None,
force_progress_bars: Optional[bool] = None,
):
setup_logging(verbose, force_colors=force_colors)
# stdout is fd 1
config.runtime.colors = isatty(1) if force_colors is None else force_colors
config.runtime.verbose = verbose config.runtime.verbose = verbose
config.runtime.progress_bars = force_progress_bars
config.runtime.no_wrap = wrapper_override is False config.runtime.no_wrap = wrapper_override is False
config.runtime.error_shell = error_shell config.runtime.error_shell = error_shell
config.try_load_file(config_file) config.try_load_file(config_file)

View file

@ -3,6 +3,7 @@ import multiprocessing
import os import os
import shutil import shutil
import subprocess import subprocess
import sys
from copy import deepcopy from copy import deepcopy
from urllib.error import HTTPError from urllib.error import HTTPError
@ -17,8 +18,9 @@ from chroot.build import get_build_chroot, BuildChroot
from distro.distro import get_kupfer_https, get_kupfer_local from distro.distro import get_kupfer_https, get_kupfer_local
from distro.package import RemotePackage, LocalPackage from distro.package import RemotePackage, LocalPackage
from distro.repo import LocalRepo from distro.repo import LocalRepo
from progressbar import BAR_PADDING, get_levels_bar
from wrapper import check_programs_wrap, is_wrapped from wrapper import check_programs_wrap, is_wrapped
from utils import sha256sum from utils import ellipsize, sha256sum
from .pkgbuild import discover_pkgbuilds, filter_pkgbuilds, Pkgbase, Pkgbuild, SubPkgbuild from .pkgbuild import discover_pkgbuilds, filter_pkgbuilds, Pkgbase, Pkgbuild, SubPkgbuild
@ -227,7 +229,7 @@ def add_file_to_repo(file_path: str, repo_name: str, arch: Arch, remove_original
target_file, target_file,
] ]
logging.debug(f'repo: running cmd: {cmd}') logging.debug(f'repo: running cmd: {cmd}')
result = run_cmd(cmd) result = run_cmd(cmd, stderr=sys.stdout)
assert isinstance(result, subprocess.CompletedProcess) assert isinstance(result, subprocess.CompletedProcess)
if result.returncode != 0: if result.returncode != 0:
raise Exception(f'Failed add package {target_file} to repo {repo_name}') raise Exception(f'Failed add package {target_file} to repo {repo_name}')
@ -274,8 +276,8 @@ def add_package_to_repo(package: Pkgbuild, arch: Arch):
def try_download_package(dest_file_path: str, package: Pkgbuild, arch: Arch) -> Optional[str]: def try_download_package(dest_file_path: str, package: Pkgbuild, arch: Arch) -> Optional[str]:
logging.debug(f"checking if we can download {package.name}")
filename = os.path.basename(dest_file_path) filename = os.path.basename(dest_file_path)
logging.debug(f"checking if we can download {filename}")
pkgname = package.name pkgname = package.name
repo_name = package.repo repo_name = package.repo
repos = get_kupfer_https(arch, scan=True).repos repos = get_kupfer_https(arch, scan=True).repos
@ -473,7 +475,7 @@ def setup_sources(package: Pkgbuild, lazy: bool = True):
assert config.runtime.arch assert config.runtime.arch
chroot = setup_build_chroot(config.runtime.arch) chroot = setup_build_chroot(config.runtime.arch)
logging.info(f'{package.path}: Setting up sources with makepkg') logging.info(f'{package.path}: Setting up sources with makepkg')
result = chroot.run_cmd(makepkg_setup, cwd=dir, switch_user='kupfer') result = chroot.run_cmd(makepkg_setup, cwd=dir, switch_user='kupfer', stderr=sys.stdout)
assert isinstance(result, subprocess.CompletedProcess) assert isinstance(result, subprocess.CompletedProcess)
if result.returncode != 0: if result.returncode != 0:
raise Exception(f'{package.path}: Failed to setup sources, exit code: {result.returncode}') raise Exception(f'{package.path}: Failed to setup sources, exit code: {result.returncode}')
@ -583,6 +585,7 @@ def build_package(
inner_env=env, inner_env=env,
cwd=os.path.join(CHROOT_PATHS['pkgbuilds'], package.path), cwd=os.path.join(CHROOT_PATHS['pkgbuilds'], package.path),
switch_user=build_user, switch_user=build_user,
stderr=sys.stdout,
) )
assert isinstance(result, subprocess.CompletedProcess) assert isinstance(result, subprocess.CompletedProcess)
if result.returncode != 0: if result.returncode != 0:
@ -635,17 +638,33 @@ def get_unbuilt_package_levels(
includes_dependants = " (includes dependants)" if rebuild_dependants else "" includes_dependants = " (includes dependants)" if rebuild_dependants else ""
logging.info(f"Checking for unbuilt packages ({arch}) in dependency order{includes_dependants}:\n{get_pkg_levels_str(package_levels)}") logging.info(f"Checking for unbuilt packages ({arch}) in dependency order{includes_dependants}:\n{get_pkg_levels_str(package_levels)}")
i = 0 i = 0
for level_packages in package_levels: total_levels = len(package_levels)
package_bar = get_levels_bar(
total=sum([len(lev) for lev in package_levels]),
desc=f"Checking pkgs ({arch})",
unit='pkgs',
fields={"levels_total": total_levels},
enable_rate=False,
)
counter_built = package_bar.add_subcounter('green')
counter_unbuilt = package_bar.add_subcounter('blue')
for level_num, level_packages in enumerate(package_levels):
level_num = level_num + 1
package_bar.update(0, name=" " * BAR_PADDING, level=level_num)
level = set[Pkgbuild]() level = set[Pkgbuild]()
if not level_packages:
continue
def add_to_level(pkg, level, reason=''): def add_to_level(pkg, level, reason=''):
if reason: if reason:
reason = f': {reason}' reason = f': {reason}'
logging.info(f"Level {i} ({arch}): Adding {package.path}{reason}") counter_unbuilt.update(force=True)
logging.info(f"Level {level}/{total_levels} ({arch}): Adding {package.path}{reason}")
level.add(package) level.add(package)
build_names.update(package.names()) build_names.update(package.names())
for package in level_packages: for package in level_packages:
package_bar.update(0, force=True, name=ellipsize(package.name, padding=" ", length=BAR_PADDING))
if (force and package in packages): if (force and package in packages):
add_to_level(package, level, 'query match and force=True') add_to_level(package, level, 'query match and force=True')
elif rebuild_dependants and package in dependants: elif rebuild_dependants and package in dependants:
@ -653,12 +672,14 @@ def get_unbuilt_package_levels(
elif not check_package_version_built(package, arch, try_download=try_download, refresh_sources=refresh_sources): elif not check_package_version_built(package, arch, try_download=try_download, refresh_sources=refresh_sources):
add_to_level(package, level, 'package unbuilt') add_to_level(package, level, 'package unbuilt')
else: else:
logging.info(f"Level {i}: {package.path} ({arch}): Package doesn't need [re]building") logging.info(f"Level {level_num}/{total_levels} ({arch}): {package.path}: Package doesn't need [re]building")
counter_built.update(force=True)
logging.debug(f'Finished checking level {level_num}/{total_levels} ({arch}). Adding unbuilt pkgs: {get_pkg_names_str(level)}')
if level: if level:
build_levels.append(level) build_levels.append(level)
logging.debug(f'Finished checking level {i}. Adding unbuilt pkgs: {get_pkg_names_str(level)}')
i += 1 i += 1
package_bar.close(clear=True)
return build_levels return build_levels
@ -691,11 +712,24 @@ def build_packages(
logging.info(f"Build plan made:\n{get_pkg_levels_str(build_levels)}") logging.info(f"Build plan made:\n{get_pkg_levels_str(build_levels)}")
total_levels = len(build_levels)
package_bar = get_levels_bar(
desc=f'Building pkgs ({arch})',
color='purple',
unit='pkgs',
total=sum([len(lev) for lev in build_levels]),
fields={"levels_total": total_levels},
enable_rate=False,
)
files = [] files = []
updated_repos: set[str] = set() updated_repos: set[str] = set()
package_bar.update(-1)
for level, need_build in enumerate(build_levels): for level, need_build in enumerate(build_levels):
logging.info(f"(Level {level}) Building {get_pkg_names_str(need_build)}") level = level + 1
package_bar.update(incr=0, force=True, name=" " * BAR_PADDING, level=level)
logging.info(f"(Level {level}/{total_levels}) Building {get_pkg_names_str(need_build)}")
for package in need_build: for package in need_build:
package_bar.update(force=True, name=ellipsize(package.name, padding=" ", length=BAR_PADDING))
base = package.pkgbase if isinstance(package, SubPkgbuild) else package base = package.pkgbase if isinstance(package, SubPkgbuild) else package
assert isinstance(base, Pkgbase) assert isinstance(base, Pkgbase)
if package.is_built(arch): if package.is_built(arch):
@ -714,11 +748,14 @@ def build_packages(
for _arch in ['any', arch]: for _arch in ['any', arch]:
if _arch in base.arches: if _arch in base.arches:
base._built_for.add(_arch) base._built_for.add(_arch)
package_bar.update()
# rescan affected repos # rescan affected repos
local_repos = get_kupfer_local(arch, in_chroot=False, scan=False) local_repos = get_kupfer_local(arch, in_chroot=False, scan=False)
for repo_name in updated_repos: for repo_name in updated_repos:
assert repo_name in local_repos.repos assert repo_name in local_repos.repos
local_repos.repos[repo_name].scan() local_repos.repos[repo_name].scan()
package_bar.close(clear=True)
return files return files
@ -794,6 +831,6 @@ def build_enable_qemu_binfmt(arch: Arch, repo: Optional[dict[str, Pkgbuild]] = N
assert p.startswith(hostdir) assert p.startswith(hostdir)
_files.append(os.path.join(CHROOT_PATHS['packages'], p[len(hostdir):].lstrip('/'))) _files.append(os.path.join(CHROOT_PATHS['packages'], p[len(hostdir):].lstrip('/')))
pkgfiles = _files pkgfiles = _files
runcmd(['pacman', '-U', '--noconfirm', '--needed'] + pkgfiles) runcmd(['pacman', '-U', '--noconfirm', '--needed'] + pkgfiles, stderr=sys.stdout)
binfmt_register(arch, chroot=native_chroot) binfmt_register(arch, chroot=native_chroot)
_qemu_enabled[arch] = True _qemu_enabled[arch] = True

View file

@ -332,7 +332,7 @@ def parse_pkgbuild(
global config global config
if _config: if _config:
config = _config config = _config
setup_logging(verbose=config.runtime.verbose, log_setup=False) # different subprocess needs log setup. setup_logging(verbose=config.runtime.verbose, force_colors=config.runtime.colors, log_setup=False) # different subprocess needs log setup.
logging.info(f"Discovering PKGBUILD for {relative_pkg_dir}") logging.info(f"Discovering PKGBUILD for {relative_pkg_dir}")
if force_refresh_srcinfo: if force_refresh_srcinfo:

52
progressbar.py Normal file
View file

@ -0,0 +1,52 @@
import click
import sys
from enlighten import Counter, Manager, get_manager as _getmanager
from typing import Hashable, Optional
from config.state import config
BAR_PADDING = 25
DEFAULT_OUTPUT = sys.stderr
managers: dict[Hashable, Manager] = {}
progress_bars_option = click.option(
'--force-progress-bars/--no-progress-bars',
is_flag=True,
default=None,
help='Force enable/disable progress bars. Defaults to autodetection.',
)
def get_manager(file=DEFAULT_OUTPUT, enabled: Optional[bool] = None) -> Manager:
global managers
m = managers.get(file, None)
if not m:
kwargs = {}
if enabled is None or config.runtime.progress_bars is False:
enabled = config.runtime.progress_bars
if enabled is not None:
kwargs = {"enabled": enabled}
m = _getmanager(file, **kwargs)
managers[file] = m
return m
def get_progress_bar(*kargs, file=DEFAULT_OUTPUT, leave=False, **kwargs) -> Counter:
m = get_manager(file=file)
kwargs["file"] = file
kwargs["leave"] = leave
return m.counter(*kargs, **kwargs)
def get_levels_bar(*kargs, file=DEFAULT_OUTPUT, enable_rate=True, **kwargs):
kwargs["fields"] = {"name": "None", "level": 1, "levels_total": 1} | (kwargs.get("fields", None) or {})
f = (u'{desc}: {name}{desc_pad}{percentage:3.0f}%|{bar}| '
u'{count:{len_total}d}/{total:d} '
u'[lvl: {level}/{levels_total}] ')
if enable_rate:
f += u'[{elapsed}<{eta}, {rate:.2f}{unit_pad}{unit}/s]'
kwargs["bar_format"] = f
return get_progress_bar(*kargs, **kwargs)

View file

@ -8,3 +8,4 @@ munch
setuptools # required by munch setuptools # required by munch
requests requests
python-dateutil python-dateutil
enlighten

View file

@ -169,3 +169,16 @@ def sha256sum(filename):
while n := f.readinto(mv): while n := f.readinto(mv):
h.update(mv[:n]) h.update(mv[:n])
return h.hexdigest() return h.hexdigest()
def ellipsize(s: str, length: int = 25, padding: Optional[str] = None, ellipsis: str = '...', rjust: bool = False):
"""
Ellipsize `s`, shortening it to `(length - len(ellipsis))` and appending `ellipsis` if `s` is longer than `length`.
If `padding` is non-empty and `s` is shorter than length, `s` is padded with `padding` until it's `length` long.
"""
if len(s) > length:
return s[:length - len(ellipsis)] + ellipsis
if not padding:
return s
pad = s.rjust if rjust else s.ljust
return pad(length, padding)