Compare commits
10 commits
dev
...
prawn/prog
Author | SHA1 | Date | |
---|---|---|---|
|
791e4d69ac | ||
|
578a6ce3e5 | ||
|
acf3136f99 | ||
|
8ece98a7d7 | ||
|
5c834a86b9 | ||
|
025dedc4b6 | ||
|
219eb1cb75 | ||
|
d381d04932 | ||
|
47649e2916 | ||
|
dac454dc67 |
12 changed files with 190 additions and 41 deletions
|
@ -2,6 +2,8 @@ import atexit
|
|||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
from copy import deepcopy
|
||||
from shlex import quote as shell_quote
|
||||
from typing import ClassVar, Iterable, Protocol, Union, Optional, Mapping
|
||||
|
@ -10,7 +12,7 @@ from uuid import uuid4
|
|||
from config.state import config
|
||||
from constants import Arch, CHROOT_PATHS, GCC_HOSTSPECS
|
||||
from distro.distro import get_base_distro, get_kupfer_local, RepoInfo
|
||||
from exec.cmd import run_root_cmd, generate_env_cmd, flatten_shell_script, wrap_in_bash, generate_cmd_su
|
||||
from exec.cmd import FileDescriptor, run_root_cmd, generate_env_cmd, flatten_shell_script, wrap_in_bash, generate_cmd_su
|
||||
from exec.file import makedir, root_makedir, root_write_file, write_file
|
||||
from generator import generate_makepkg_conf
|
||||
from utils import mount, umount, check_findmnt, log_or_exception
|
||||
|
@ -58,7 +60,8 @@ class AbstractChroot(Protocol):
|
|||
capture_output: bool,
|
||||
cwd: str,
|
||||
fail_inactive: bool,
|
||||
stdout: Optional[int],
|
||||
stdout: Optional[FileDescriptor],
|
||||
stderr: Optional[FileDescriptor],
|
||||
):
|
||||
pass
|
||||
|
||||
|
@ -222,7 +225,8 @@ class Chroot(AbstractChroot):
|
|||
capture_output: bool = False,
|
||||
cwd: Optional[str] = None,
|
||||
fail_inactive: bool = True,
|
||||
stdout: Optional[int] = None,
|
||||
stdout: Optional[FileDescriptor] = None,
|
||||
stderr: Optional[FileDescriptor] = None,
|
||||
switch_user: Optional[str] = None,
|
||||
) -> Union[int, subprocess.CompletedProcess]:
|
||||
if not self.active and fail_inactive:
|
||||
|
@ -246,7 +250,7 @@ class Chroot(AbstractChroot):
|
|||
inner_cmd = wrap_in_bash(script, flatten_result=False)
|
||||
cmd = flatten_shell_script(['chroot', self.path] + env_cmd + inner_cmd, shell_quote_items=True)
|
||||
|
||||
return run_root_cmd(cmd, env=outer_env, attach_tty=attach_tty, capture_output=capture_output, stdout=stdout)
|
||||
return run_root_cmd(cmd, env=outer_env, attach_tty=attach_tty, capture_output=capture_output, stdout=stdout, stderr=stderr)
|
||||
|
||||
def mount_pkgbuilds(self, fail_if_mounted: bool = False) -> str:
|
||||
return self.mount(
|
||||
|
@ -371,20 +375,22 @@ class Chroot(AbstractChroot):
|
|||
packages: list[str],
|
||||
refresh: bool = False,
|
||||
allow_fail: bool = True,
|
||||
redirect_stderr: bool = True,
|
||||
) -> dict[str, Union[int, subprocess.CompletedProcess]]:
|
||||
"""Try installing packages, fall back to installing one by one"""
|
||||
results = {}
|
||||
stderr = sys.stdout if redirect_stderr else sys.stderr
|
||||
if refresh:
|
||||
results['refresh'] = self.run_cmd('pacman -Syy --noconfirm')
|
||||
results['refresh'] = self.run_cmd('pacman -Syy --noconfirm', stderr=stderr)
|
||||
cmd = "pacman -S --noconfirm --needed --overwrite='/*'"
|
||||
result = self.run_cmd(f'{cmd} -y {" ".join(packages)}')
|
||||
result = self.run_cmd(f'{cmd} -y {" ".join(packages)}', stderr=stderr)
|
||||
assert isinstance(result, subprocess.CompletedProcess)
|
||||
results |= {package: result for package in packages}
|
||||
if result.returncode != 0 and allow_fail:
|
||||
results = {}
|
||||
logging.debug('Falling back to serial installation')
|
||||
for pkg in set(packages):
|
||||
results[pkg] = self.run_cmd(f'{cmd} {pkg}')
|
||||
results[pkg] = self.run_cmd(f'{cmd} {pkg}', stderr=stderr)
|
||||
return results
|
||||
|
||||
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import logging
|
||||
import os
|
||||
import sys
|
||||
|
||||
from glob import glob
|
||||
from shutil import rmtree
|
||||
|
@ -31,17 +32,20 @@ class BaseChroot(Chroot):
|
|||
|
||||
logging.info(f'Pacstrapping chroot {self.name}: {", ".join(self.base_packages)}')
|
||||
|
||||
result = run_root_cmd([
|
||||
'pacstrap',
|
||||
'-C',
|
||||
pacman_conf_target,
|
||||
'-G',
|
||||
self.path,
|
||||
] + self.base_packages + [
|
||||
'--needed',
|
||||
'--overwrite=*',
|
||||
'-yyuu',
|
||||
])
|
||||
result = run_root_cmd(
|
||||
[
|
||||
'pacstrap',
|
||||
'-C',
|
||||
pacman_conf_target,
|
||||
'-G',
|
||||
self.path,
|
||||
*self.base_packages,
|
||||
'--needed',
|
||||
'--overwrite=*',
|
||||
'-yyuu',
|
||||
],
|
||||
stderr=sys.stdout,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
raise Exception(f'Failed to initialize chroot "{self.name}"')
|
||||
self.initialized = True
|
||||
|
|
|
@ -147,6 +147,8 @@ class RuntimeConfiguration(DataClass):
|
|||
script_source_dir: Optional[str]
|
||||
arch: Optional[Arch]
|
||||
uid: Optional[int]
|
||||
progress_bars: Optional[bool]
|
||||
colors: Optional[bool]
|
||||
|
||||
|
||||
class ConfigLoadState(DataClass):
|
||||
|
|
|
@ -61,6 +61,8 @@ CONFIG_RUNTIME_DEFAULTS: RuntimeConfiguration = RuntimeConfiguration.fromDict({
|
|||
'script_source_dir': None,
|
||||
'arch': None,
|
||||
'uid': None,
|
||||
'progress_bars': None,
|
||||
'colors': None,
|
||||
})
|
||||
|
||||
|
||||
|
|
18
exec/cmd.py
18
exec/cmd.py
|
@ -5,10 +5,12 @@ import subprocess
|
|||
|
||||
from subprocess import CompletedProcess # make it easy for users of this module
|
||||
from shlex import quote as shell_quote
|
||||
from typing import Optional, Union, TypeAlias
|
||||
from typing import IO, Optional, Union, TypeAlias
|
||||
|
||||
ElevationMethod: TypeAlias = str
|
||||
|
||||
FileDescriptor: TypeAlias = Union[int, IO]
|
||||
|
||||
# as long as **only** sudo is supported, hardcode the default into ELEVATION_METHOD_DEFAULT.
|
||||
# when other methods are added, all mentions of ELEVATION_METHOD_DEFAULT should be replaced by a config key.
|
||||
|
||||
|
@ -89,8 +91,8 @@ def run_cmd(
|
|||
cwd: Optional[str] = None,
|
||||
switch_user: Optional[str] = None,
|
||||
elevation_method: Optional[ElevationMethod] = None,
|
||||
stdout: Optional[int] = None,
|
||||
stderr=None,
|
||||
stdout: Optional[FileDescriptor] = None,
|
||||
stderr: Optional[FileDescriptor] = None,
|
||||
) -> Union[CompletedProcess, int]:
|
||||
"execute `script` as `switch_user`, elevating and su'ing as necessary"
|
||||
kwargs: dict = {}
|
||||
|
@ -99,10 +101,12 @@ def run_cmd(
|
|||
env_cmd = generate_env_cmd(env)
|
||||
kwargs['env'] = env
|
||||
if not attach_tty:
|
||||
kwargs |= {'stdout': stdout} if stdout else {'capture_output': capture_output}
|
||||
if stderr:
|
||||
kwargs['stderr'] = stderr
|
||||
|
||||
if (stdout, stderr) == (None, None):
|
||||
kwargs['capture_output'] = capture_output
|
||||
else:
|
||||
for name, fd in {'stdout': stdout, 'stderr': stderr}.items():
|
||||
if fd is not None:
|
||||
kwargs[name] = fd
|
||||
script = flatten_shell_script(script)
|
||||
if cwd:
|
||||
kwargs['cwd'] = cwd
|
||||
|
|
18
logger.py
18
logger.py
|
@ -3,8 +3,10 @@ import coloredlogs
|
|||
import logging
|
||||
import sys
|
||||
|
||||
from typing import Optional
|
||||
|
||||
def setup_logging(verbose: bool, log_setup: bool = True):
|
||||
|
||||
def setup_logging(verbose: bool, force_colors: Optional[bool] = None, log_setup: bool = True):
|
||||
level_colors = coloredlogs.DEFAULT_LEVEL_STYLES | {'info': {'color': 'magenta', 'bright': True}, 'debug': {'color': 'blue', 'bright': True}}
|
||||
field_colors = coloredlogs.DEFAULT_FIELD_STYLES | {'asctime': {'color': 'white', 'faint': True}}
|
||||
level = logging.DEBUG if verbose else logging.INFO
|
||||
|
@ -15,9 +17,14 @@ def setup_logging(verbose: bool, log_setup: bool = True):
|
|||
level=level,
|
||||
level_styles=level_colors,
|
||||
field_styles=field_colors,
|
||||
isatty=force_colors,
|
||||
)
|
||||
# don't raise Exceptions when e.g. output stream is closed
|
||||
logging.raiseExceptions = False
|
||||
if log_setup:
|
||||
logging.debug('Logging set up.')
|
||||
logging.debug('Logger: Logging set up.')
|
||||
if force_colors is not None:
|
||||
logging.debug(f'Logger: Force-{"en" if force_colors else "dis"}abled colors')
|
||||
|
||||
|
||||
verbose_option = click.option(
|
||||
|
@ -26,3 +33,10 @@ verbose_option = click.option(
|
|||
is_flag=True,
|
||||
help='Enables verbose logging',
|
||||
)
|
||||
|
||||
color_option = click.option(
|
||||
'--force-colors/--no-colors',
|
||||
is_flag=True,
|
||||
default=None,
|
||||
help='Force enable/disable log coloring. Defaults to autodetection.',
|
||||
)
|
||||
|
|
20
main.py
20
main.py
|
@ -3,11 +3,13 @@
|
|||
import click
|
||||
import subprocess
|
||||
|
||||
from os import isatty
|
||||
from traceback import format_exc, format_exception_only, format_tb
|
||||
from typing import Optional
|
||||
|
||||
from logger import logging, setup_logging, verbose_option
|
||||
from logger import color_option, logging, setup_logging, verbose_option
|
||||
from wrapper import nowrapper_option, enforce_wrap
|
||||
from progressbar import progress_bars_option
|
||||
|
||||
from config.cli import config, config_option, cmd_config
|
||||
from packages.cli import cmd_packages
|
||||
|
@ -24,9 +26,21 @@ from image.cli import cmd_image
|
|||
@verbose_option
|
||||
@config_option
|
||||
@nowrapper_option
|
||||
def cli(verbose: bool = False, config_file: Optional[str] = None, wrapper_override: Optional[bool] = None, error_shell: bool = False):
|
||||
setup_logging(verbose)
|
||||
@color_option
|
||||
@progress_bars_option
|
||||
def cli(
|
||||
verbose: bool = False,
|
||||
config_file: Optional[str] = None,
|
||||
wrapper_override: Optional[bool] = None,
|
||||
error_shell: bool = False,
|
||||
force_colors: Optional[bool] = None,
|
||||
force_progress_bars: Optional[bool] = None,
|
||||
):
|
||||
setup_logging(verbose, force_colors=force_colors)
|
||||
# stdout is fd 1
|
||||
config.runtime.colors = isatty(1) if force_colors is None else force_colors
|
||||
config.runtime.verbose = verbose
|
||||
config.runtime.progress_bars = force_progress_bars
|
||||
config.runtime.no_wrap = wrapper_override is False
|
||||
config.runtime.error_shell = error_shell
|
||||
config.try_load_file(config_file)
|
||||
|
|
|
@ -3,6 +3,7 @@ import multiprocessing
|
|||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
from copy import deepcopy
|
||||
from urllib.error import HTTPError
|
||||
|
@ -17,8 +18,9 @@ from chroot.build import get_build_chroot, BuildChroot
|
|||
from distro.distro import get_kupfer_https, get_kupfer_local
|
||||
from distro.package import RemotePackage, LocalPackage
|
||||
from distro.repo import LocalRepo
|
||||
from progressbar import BAR_PADDING, get_levels_bar
|
||||
from wrapper import check_programs_wrap, is_wrapped
|
||||
from utils import sha256sum
|
||||
from utils import ellipsize, sha256sum
|
||||
|
||||
from .pkgbuild import discover_pkgbuilds, filter_pkgbuilds, Pkgbase, Pkgbuild, SubPkgbuild
|
||||
|
||||
|
@ -227,7 +229,7 @@ def add_file_to_repo(file_path: str, repo_name: str, arch: Arch, remove_original
|
|||
target_file,
|
||||
]
|
||||
logging.debug(f'repo: running cmd: {cmd}')
|
||||
result = run_cmd(cmd)
|
||||
result = run_cmd(cmd, stderr=sys.stdout)
|
||||
assert isinstance(result, subprocess.CompletedProcess)
|
||||
if result.returncode != 0:
|
||||
raise Exception(f'Failed add package {target_file} to repo {repo_name}')
|
||||
|
@ -274,8 +276,8 @@ def add_package_to_repo(package: Pkgbuild, arch: Arch):
|
|||
|
||||
|
||||
def try_download_package(dest_file_path: str, package: Pkgbuild, arch: Arch) -> Optional[str]:
|
||||
logging.debug(f"checking if we can download {package.name}")
|
||||
filename = os.path.basename(dest_file_path)
|
||||
logging.debug(f"checking if we can download {filename}")
|
||||
pkgname = package.name
|
||||
repo_name = package.repo
|
||||
repos = get_kupfer_https(arch, scan=True).repos
|
||||
|
@ -473,7 +475,7 @@ def setup_sources(package: Pkgbuild, lazy: bool = True):
|
|||
assert config.runtime.arch
|
||||
chroot = setup_build_chroot(config.runtime.arch)
|
||||
logging.info(f'{package.path}: Setting up sources with makepkg')
|
||||
result = chroot.run_cmd(makepkg_setup, cwd=dir, switch_user='kupfer')
|
||||
result = chroot.run_cmd(makepkg_setup, cwd=dir, switch_user='kupfer', stderr=sys.stdout)
|
||||
assert isinstance(result, subprocess.CompletedProcess)
|
||||
if result.returncode != 0:
|
||||
raise Exception(f'{package.path}: Failed to setup sources, exit code: {result.returncode}')
|
||||
|
@ -583,6 +585,7 @@ def build_package(
|
|||
inner_env=env,
|
||||
cwd=os.path.join(CHROOT_PATHS['pkgbuilds'], package.path),
|
||||
switch_user=build_user,
|
||||
stderr=sys.stdout,
|
||||
)
|
||||
assert isinstance(result, subprocess.CompletedProcess)
|
||||
if result.returncode != 0:
|
||||
|
@ -635,17 +638,33 @@ def get_unbuilt_package_levels(
|
|||
includes_dependants = " (includes dependants)" if rebuild_dependants else ""
|
||||
logging.info(f"Checking for unbuilt packages ({arch}) in dependency order{includes_dependants}:\n{get_pkg_levels_str(package_levels)}")
|
||||
i = 0
|
||||
for level_packages in package_levels:
|
||||
total_levels = len(package_levels)
|
||||
package_bar = get_levels_bar(
|
||||
total=sum([len(lev) for lev in package_levels]),
|
||||
desc=f"Checking pkgs ({arch})",
|
||||
unit='pkgs',
|
||||
fields={"levels_total": total_levels},
|
||||
enable_rate=False,
|
||||
)
|
||||
counter_built = package_bar.add_subcounter('green')
|
||||
counter_unbuilt = package_bar.add_subcounter('blue')
|
||||
for level_num, level_packages in enumerate(package_levels):
|
||||
level_num = level_num + 1
|
||||
package_bar.update(0, name=" " * BAR_PADDING, level=level_num)
|
||||
level = set[Pkgbuild]()
|
||||
if not level_packages:
|
||||
continue
|
||||
|
||||
def add_to_level(pkg, level, reason=''):
|
||||
if reason:
|
||||
reason = f': {reason}'
|
||||
logging.info(f"Level {i} ({arch}): Adding {package.path}{reason}")
|
||||
counter_unbuilt.update(force=True)
|
||||
logging.info(f"Level {level}/{total_levels} ({arch}): Adding {package.path}{reason}")
|
||||
level.add(package)
|
||||
build_names.update(package.names())
|
||||
|
||||
for package in level_packages:
|
||||
package_bar.update(0, force=True, name=ellipsize(package.name, padding=" ", length=BAR_PADDING))
|
||||
if (force and package in packages):
|
||||
add_to_level(package, level, 'query match and force=True')
|
||||
elif rebuild_dependants and package in dependants:
|
||||
|
@ -653,12 +672,14 @@ def get_unbuilt_package_levels(
|
|||
elif not check_package_version_built(package, arch, try_download=try_download, refresh_sources=refresh_sources):
|
||||
add_to_level(package, level, 'package unbuilt')
|
||||
else:
|
||||
logging.info(f"Level {i}: {package.path} ({arch}): Package doesn't need [re]building")
|
||||
logging.info(f"Level {level_num}/{total_levels} ({arch}): {package.path}: Package doesn't need [re]building")
|
||||
counter_built.update(force=True)
|
||||
|
||||
logging.debug(f'Finished checking level {level_num}/{total_levels} ({arch}). Adding unbuilt pkgs: {get_pkg_names_str(level)}')
|
||||
if level:
|
||||
build_levels.append(level)
|
||||
logging.debug(f'Finished checking level {i}. Adding unbuilt pkgs: {get_pkg_names_str(level)}')
|
||||
i += 1
|
||||
package_bar.close(clear=True)
|
||||
return build_levels
|
||||
|
||||
|
||||
|
@ -691,11 +712,24 @@ def build_packages(
|
|||
|
||||
logging.info(f"Build plan made:\n{get_pkg_levels_str(build_levels)}")
|
||||
|
||||
total_levels = len(build_levels)
|
||||
package_bar = get_levels_bar(
|
||||
desc=f'Building pkgs ({arch})',
|
||||
color='purple',
|
||||
unit='pkgs',
|
||||
total=sum([len(lev) for lev in build_levels]),
|
||||
fields={"levels_total": total_levels},
|
||||
enable_rate=False,
|
||||
)
|
||||
files = []
|
||||
updated_repos: set[str] = set()
|
||||
package_bar.update(-1)
|
||||
for level, need_build in enumerate(build_levels):
|
||||
logging.info(f"(Level {level}) Building {get_pkg_names_str(need_build)}")
|
||||
level = level + 1
|
||||
package_bar.update(incr=0, force=True, name=" " * BAR_PADDING, level=level)
|
||||
logging.info(f"(Level {level}/{total_levels}) Building {get_pkg_names_str(need_build)}")
|
||||
for package in need_build:
|
||||
package_bar.update(force=True, name=ellipsize(package.name, padding=" ", length=BAR_PADDING))
|
||||
base = package.pkgbase if isinstance(package, SubPkgbuild) else package
|
||||
assert isinstance(base, Pkgbase)
|
||||
if package.is_built(arch):
|
||||
|
@ -714,11 +748,14 @@ def build_packages(
|
|||
for _arch in ['any', arch]:
|
||||
if _arch in base.arches:
|
||||
base._built_for.add(_arch)
|
||||
package_bar.update()
|
||||
# rescan affected repos
|
||||
local_repos = get_kupfer_local(arch, in_chroot=False, scan=False)
|
||||
for repo_name in updated_repos:
|
||||
assert repo_name in local_repos.repos
|
||||
local_repos.repos[repo_name].scan()
|
||||
|
||||
package_bar.close(clear=True)
|
||||
return files
|
||||
|
||||
|
||||
|
@ -794,6 +831,6 @@ def build_enable_qemu_binfmt(arch: Arch, repo: Optional[dict[str, Pkgbuild]] = N
|
|||
assert p.startswith(hostdir)
|
||||
_files.append(os.path.join(CHROOT_PATHS['packages'], p[len(hostdir):].lstrip('/')))
|
||||
pkgfiles = _files
|
||||
runcmd(['pacman', '-U', '--noconfirm', '--needed'] + pkgfiles)
|
||||
runcmd(['pacman', '-U', '--noconfirm', '--needed'] + pkgfiles, stderr=sys.stdout)
|
||||
binfmt_register(arch, chroot=native_chroot)
|
||||
_qemu_enabled[arch] = True
|
||||
|
|
|
@ -332,7 +332,7 @@ def parse_pkgbuild(
|
|||
global config
|
||||
if _config:
|
||||
config = _config
|
||||
setup_logging(verbose=config.runtime.verbose, log_setup=False) # different subprocess needs log setup.
|
||||
setup_logging(verbose=config.runtime.verbose, force_colors=config.runtime.colors, log_setup=False) # different subprocess needs log setup.
|
||||
logging.info(f"Discovering PKGBUILD for {relative_pkg_dir}")
|
||||
|
||||
if force_refresh_srcinfo:
|
||||
|
|
52
progressbar.py
Normal file
52
progressbar.py
Normal file
|
@ -0,0 +1,52 @@
|
|||
import click
|
||||
import sys
|
||||
|
||||
from enlighten import Counter, Manager, get_manager as _getmanager
|
||||
from typing import Hashable, Optional
|
||||
|
||||
from config.state import config
|
||||
|
||||
BAR_PADDING = 25
|
||||
DEFAULT_OUTPUT = sys.stderr
|
||||
|
||||
managers: dict[Hashable, Manager] = {}
|
||||
|
||||
progress_bars_option = click.option(
|
||||
'--force-progress-bars/--no-progress-bars',
|
||||
is_flag=True,
|
||||
default=None,
|
||||
help='Force enable/disable progress bars. Defaults to autodetection.',
|
||||
)
|
||||
|
||||
|
||||
def get_manager(file=DEFAULT_OUTPUT, enabled: Optional[bool] = None) -> Manager:
|
||||
global managers
|
||||
m = managers.get(file, None)
|
||||
if not m:
|
||||
kwargs = {}
|
||||
if enabled is None or config.runtime.progress_bars is False:
|
||||
enabled = config.runtime.progress_bars
|
||||
if enabled is not None:
|
||||
kwargs = {"enabled": enabled}
|
||||
m = _getmanager(file, **kwargs)
|
||||
managers[file] = m
|
||||
return m
|
||||
|
||||
|
||||
def get_progress_bar(*kargs, file=DEFAULT_OUTPUT, leave=False, **kwargs) -> Counter:
|
||||
m = get_manager(file=file)
|
||||
|
||||
kwargs["file"] = file
|
||||
kwargs["leave"] = leave
|
||||
return m.counter(*kargs, **kwargs)
|
||||
|
||||
|
||||
def get_levels_bar(*kargs, file=DEFAULT_OUTPUT, enable_rate=True, **kwargs):
|
||||
kwargs["fields"] = {"name": "None", "level": 1, "levels_total": 1} | (kwargs.get("fields", None) or {})
|
||||
f = (u'{desc}: {name}{desc_pad}{percentage:3.0f}%|{bar}| '
|
||||
u'{count:{len_total}d}/{total:d} '
|
||||
u'[lvl: {level}/{levels_total}] ')
|
||||
if enable_rate:
|
||||
f += u'[{elapsed}<{eta}, {rate:.2f}{unit_pad}{unit}/s]'
|
||||
kwargs["bar_format"] = f
|
||||
return get_progress_bar(*kargs, **kwargs)
|
|
@ -8,3 +8,4 @@ munch
|
|||
setuptools # required by munch
|
||||
requests
|
||||
python-dateutil
|
||||
enlighten
|
||||
|
|
13
utils.py
13
utils.py
|
@ -169,3 +169,16 @@ def sha256sum(filename):
|
|||
while n := f.readinto(mv):
|
||||
h.update(mv[:n])
|
||||
return h.hexdigest()
|
||||
|
||||
|
||||
def ellipsize(s: str, length: int = 25, padding: Optional[str] = None, ellipsis: str = '...', rjust: bool = False):
|
||||
"""
|
||||
Ellipsize `s`, shortening it to `(length - len(ellipsis))` and appending `ellipsis` if `s` is longer than `length`.
|
||||
If `padding` is non-empty and `s` is shorter than length, `s` is padded with `padding` until it's `length` long.
|
||||
"""
|
||||
if len(s) > length:
|
||||
return s[:length - len(ellipsis)] + ellipsis
|
||||
if not padding:
|
||||
return s
|
||||
pad = s.rjust if rjust else s.ljust
|
||||
return pad(length, padding)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue