kupferbootstrap/packages/cli.py

496 lines
19 KiB
Python

import click
import json
import logging
import os
from glob import glob
from typing import Iterable, Optional
from config.state import config
from constants import Arch, ARCHES, SRCINFO_FILE, SRCINFO_INITIALISED_FILE, SRCINFO_METADATA_FILE, SRCINFO_TARBALL_FILE, SRCINFO_TARBALL_URL
from exec.cmd import run_cmd, shell_quote, CompletedProcess
from exec.file import get_temp_dir, makedir, remove_file
from devices.device import get_profile_device
from distro.distro import get_kupfer_local, get_kupfer_url, get_kupfer_repo_names
from distro.package import LocalPackage
from net.ssh import run_ssh_command, scp_put_files
from utils import download_file, git, sha256sum
from wrapper import check_programs_wrap, enforce_wrap
from .build import build_packages_by_paths, init_prebuilts
from .pkgbuild import discover_pkgbuilds, filter_pkgbuilds, get_pkgbuild_dirs, init_pkgbuilds
SRCINFO_CACHE_FILES = [SRCINFO_FILE, SRCINFO_INITIALISED_FILE, SRCINFO_METADATA_FILE]
def build(
paths: Iterable[str],
force: bool,
arch: Optional[Arch] = None,
rebuild_dependants: bool = False,
try_download: bool = False,
):
config.enforce_config_loaded()
enforce_wrap()
arch = arch or get_profile_device(hint_or_set_arch=True).arch
if arch not in ARCHES:
raise Exception(f'Unknown architecture "{arch}". Choices: {", ".join(ARCHES)}')
return build_packages_by_paths(
paths,
arch,
force=force,
rebuild_dependants=rebuild_dependants,
try_download=try_download,
enable_crosscompile=config.file.build.crosscompile,
enable_crossdirect=config.file.build.crossdirect,
enable_ccache=config.file.build.ccache,
clean_chroot=config.file.build.clean_mode,
)
def init_pkgbuild_caches(clean_src_dirs: bool = True, remote_branch: Optional[str] = None):
def read_srcinitialised_checksum(src_initialised):
with open(src_initialised) as fd:
d = json.load(fd)
if isinstance(d, dict):
return d.get('PKGBUILD', '!!!ERROR!!!')
raise Exception("JSON content not a dictionary!")
# get_kupfer_url() resolves repo branch variable in url
url = get_kupfer_url(url=SRCINFO_TARBALL_URL, branch=remote_branch)
cachetar = os.path.join(config.get_path('packages'), SRCINFO_TARBALL_FILE)
makedir(os.path.dirname(cachetar))
logging.info(f"Updating PKGBUILD caches from {url}" + (", pruning outdated src/ directories" if clean_src_dirs else ""))
updated = download_file(cachetar, url)
logging.info("Cache tarball was " + ('downloaded successfully' if updated else 'already up to date'))
tmpdir = get_temp_dir()
logging.debug(f"Extracting {cachetar} to {tmpdir}")
res = run_cmd(['tar', 'xf', cachetar], cwd=tmpdir)
assert isinstance(res, CompletedProcess)
if res.returncode:
raise Exception(f"failed to extract srcinfo cache archive '{cachetar}'")
pkgbuild_dirs = get_pkgbuild_dirs()
for pkg in pkgbuild_dirs:
logging.info(f"{pkg}: analyzing cache")
pkgdir = os.path.join(config.get_path('pkgbuilds'), pkg)
srcdir = os.path.join(pkgdir, 'src')
src_initialised = os.path.join(pkgdir, SRCINFO_INITIALISED_FILE)
cachedir = os.path.join(tmpdir, pkg)
pkgbuild_checksum = sha256sum(os.path.join(pkgdir, 'PKGBUILD'))
copy_files: set[str] = set(SRCINFO_CACHE_FILES)
if os.path.exists(src_initialised):
try:
if read_srcinitialised_checksum(src_initialised) == pkgbuild_checksum:
copy_files.remove(SRCINFO_INITIALISED_FILE)
for f in copy_files.copy():
fpath = os.path.join(pkgdir, f)
if os.path.exists(fpath):
copy_files.remove(f)
if not copy_files:
logging.info(f"{pkg}: SRCINFO cache already up to date")
continue
except Exception as ex:
logging.warning(f"{pkg}: Something went wrong parsing {SRCINFO_INITIALISED_FILE}, treating as outdated!:\n{ex}")
if clean_src_dirs and os.path.exists(srcdir):
logging.info(f"{pkg}: outdated src/ detected, removing")
remove_file(srcdir, recursive=True)
remove_file(src_initialised)
if not os.path.exists(cachedir):
logging.info(f"{pkg}: not found in remote repo cache, skipping")
continue
cache_initialised = os.path.join(cachedir, SRCINFO_INITIALISED_FILE)
try:
if read_srcinitialised_checksum(cache_initialised) != pkgbuild_checksum:
logging.info(f"{pkg}: PKGBUILD checksum differs from remote repo cache, skipping")
continue
except Exception as ex:
logging.warning(f"{pkg}: Failed to parse the remote repo's cached {SRCINFO_INITIALISED_FILE}, skipping!:\n{ex}")
continue
if not copy_files:
continue
logging.info(f"{pkg}: Copying srcinfo cache from remote repo")
logging.debug(f'{pkg}: copying {copy_files}')
copy_files_list = [shell_quote(os.path.join(cachedir, f)) for f in copy_files]
res = run_cmd(f"cp {' '.join(copy_files_list)} {shell_quote(pkgdir)}/")
assert isinstance(res, CompletedProcess)
if res.returncode:
raise Exception(f"{pkg}: failed to copy cache contents from {cachedir}")
non_interactive_flag = click.option('--non-interactive', is_flag=True)
init_caches_flag = click.option(
'--init-caches/--no-init-caches',
is_flag=True,
default=True,
show_default=True,
help="Fill PKGBUILDs caches from HTTPS repo where checksums match",
)
remove_outdated_src_flag = click.option(
'--clean-src-dirs/--no-clean-src-dirs',
is_flag=True,
default=True,
show_default=True,
help="Remove outdated src/ directories to avoid problems",
)
switch_branch_flag = click.option('--switch-branch', is_flag=True, help="Force the branch to be corrected even in non-interactive mode")
discard_changes_flag = click.option('--discard-changes', is_flag=True, help="When switching branches, discard any locally changed conflicting files")
@click.group(name='packages')
def cmd_packages():
"""Build and manage packages and PKGBUILDs"""
@cmd_packages.command(name='update')
@non_interactive_flag
@init_caches_flag
@switch_branch_flag
@discard_changes_flag
@remove_outdated_src_flag
def cmd_update(
non_interactive: bool = False,
init_caches: bool = False,
clean_src_dirs: bool = True,
switch_branch: bool = False,
discard_changes: bool = False,
):
"""Update PKGBUILDs git repo"""
enforce_wrap()
init_pkgbuilds(interactive=not non_interactive, lazy=False, update=True, switch_branch=switch_branch, discard_changes=discard_changes)
if init_caches:
init_pkgbuild_caches(clean_src_dirs=clean_src_dirs)
logging.info("Refreshing outdated SRCINFO caches")
discover_pkgbuilds(lazy=False)
@cmd_packages.command(name='init')
@non_interactive_flag
@init_caches_flag
@switch_branch_flag
@discard_changes_flag
@remove_outdated_src_flag
@click.option('-u', '--update', is_flag=True, help='Use git pull to update the PKGBUILDs')
def cmd_init(
non_interactive: bool = False,
init_caches: bool = True,
clean_src_dirs: bool = True,
switch_branch: bool = False,
discard_changes: bool = False,
update: bool = False,
):
"Ensure PKGBUILDs git repo is checked out locally"
init_pkgbuilds(interactive=not non_interactive, lazy=False, update=update, switch_branch=switch_branch, discard_changes=discard_changes)
if init_caches:
init_pkgbuild_caches(clean_src_dirs=clean_src_dirs)
for arch in ARCHES:
init_prebuilts(arch)
@cmd_packages.command(name='build')
@click.option('--force', is_flag=True, default=False, help='Rebuild even if package is already built')
@click.option('--arch', default=None, required=False, type=click.Choice(ARCHES), help="The CPU architecture to build for")
@click.option('--rebuild-dependants', is_flag=True, default=False, help='Rebuild packages that depend on packages that will be [re]built')
@click.option('--no-download', is_flag=True, default=False, help="Don't try downloading packages from online repos before building")
@click.argument('paths', nargs=-1)
def cmd_build(paths: list[str], force=False, arch: Optional[Arch] = None, rebuild_dependants: bool = False, no_download: bool = False):
"""
Build packages (and dependencies) by paths as required.
The paths are specified relative to the PKGBUILDs dir, eg. "cross/crossdirect".
Multiple paths may be specified as separate arguments.
Packages that aren't built already will be downloaded from HTTPS repos unless --no-download is passed,
if an exact version match exists on the server.
"""
build(paths, force, arch=arch, rebuild_dependants=rebuild_dependants, try_download=not no_download)
@cmd_packages.command(name='sideload')
@click.argument('paths', nargs=-1)
@click.option('--arch', default=None, required=False, type=click.Choice(ARCHES), help="The CPU architecture to build for")
@click.option('-B', '--no-build', is_flag=True, default=False, help="Don't try to build packages, just copy and install")
def cmd_sideload(paths: Iterable[str], arch: Optional[Arch] = None, no_build: bool = False):
"""Build packages, copy to the device via SSH and install them"""
if not paths:
raise Exception("No packages specified")
arch = arch or get_profile_device(hint_or_set_arch=True).arch
if not no_build:
build(paths, False, arch=arch, try_download=True)
repo: dict[str, LocalPackage] = get_kupfer_local(arch=arch, scan=True, in_chroot=False).get_packages()
files = [pkg.resolved_url.split('file://')[1] for pkg in repo.values() if pkg.resolved_url and pkg.name in paths]
logging.debug(f"Sideload: Found package files: {files}")
if not files:
logging.fatal("No packages matched")
return
scp_put_files(files, '/tmp').check_returncode()
run_ssh_command(
[
'sudo',
'pacman',
'-U',
*[os.path.join('/tmp', os.path.basename(file)) for file in files],
'--noconfirm',
"'--overwrite=\\*'",
],
alloc_tty=True,
).check_returncode()
CLEAN_LOCATIONS = ['src', 'pkg', *SRCINFO_CACHE_FILES]
@cmd_packages.command(name='clean')
@click.option('-f', '--force', is_flag=True, default=False, help="Don't prompt for confirmation")
@click.option('-n', '--noop', is_flag=True, default=False, help="Print what would be removed but dont execute")
@click.argument('what', type=click.Choice(['all', 'git', *CLEAN_LOCATIONS]), nargs=-1)
def cmd_clean(what: Iterable[str] = ['all'], force: bool = False, noop: bool = False):
"""
Clean temporary files from PKGBUILDs
Specifying no location defaults to the special value 'all', meaning all regular locations.
There is also the special value 'git' which uses git to clean everything.
Be careful with it, as it means re-downloading sources for your packages.
"""
if noop:
logging.debug('Running in noop mode!')
if force:
logging.debug('Running in FORCE mode!')
what = what or ['all']
logging.debug(f'Clearing {what} from PKGBUILDs')
pkgbuilds = config.get_path('pkgbuilds')
if 'git' in what:
check_programs_wrap(['git'])
warning = "Really reset PKGBUILDs to git state completely?\nThis will erase any untracked changes to your PKGBUILDs directory."
if not (noop or force or click.confirm(warning)):
return
result = git(
[
'clean',
'-dffX' + ('n' if noop else ''),
] + get_kupfer_repo_names(local=True),
dir=pkgbuilds,
)
if result.returncode != 0:
logging.fatal('Failed to git clean')
exit(1)
else:
if 'all' in what:
what = CLEAN_LOCATIONS
what = set(what)
dirs = []
for loc in CLEAN_LOCATIONS:
if loc in what:
logging.info(f'gathering {loc} instances')
dirs += glob(os.path.join(pkgbuilds, '*', '*', loc))
dir_lines = '\n'.join(dirs)
verb = 'Would remove' if noop else 'Removing'
logging.info(verb + ':\n' + dir_lines)
if not (noop or force):
if not click.confirm("Really remove all of these?", default=True):
return
if not noop:
for dir in dirs:
remove_file(dir, recursive=True)
@cmd_packages.command(name='list')
def cmd_list():
"List information about available source packages (PKGBUILDs)"
pkgdir = os.path.join(config.get_path('pkgbuilds'), get_kupfer_repo_names(local=False)[0])
if not os.path.exists(pkgdir):
raise Exception(f"PKGBUILDs seem not to be initialised yet: {pkgdir} doesn't exist!\n"
f"Try running `kupferbootstrap packages init` first!")
check_programs_wrap(['makepkg', 'pacman'])
packages = discover_pkgbuilds()
logging.info(f'Done! {len(packages)} Pkgbuilds:')
for name in sorted(packages.keys()):
p = packages[name]
print(f'name: {p.name}; ver: {p.version}; mode: {p.mode}; crossdirect: {p.crossdirect} provides: {p.provides}; replaces: {p.replaces};'
f'local_depends: {p.local_depends}; depends: {p.depends}')
@cmd_packages.command(name='check')
@click.argument('paths', nargs=-1)
def cmd_check(paths):
"""Check that specified PKGBUILDs are formatted correctly"""
config.enforce_config_loaded()
check_programs_wrap(['makepkg'])
def check_quoteworthy(s: str) -> bool:
quoteworthy = ['"', "'", "$", " ", ";", "&", "<", ">", "*", "?"]
for symbol in quoteworthy:
if symbol in s:
return True
return False
paths = list(paths) or ['all']
packages = filter_pkgbuilds(paths, allow_empty_results=False)
for package in packages:
name = package.name
is_git_package = False
if name.endswith('-git'):
is_git_package = True
required_arches = ''
provided_arches: list[str] = []
mode_key = '_mode'
nodeps_key = '_nodeps'
crossdirect_key = '_crossdirect'
pkgbase_key = 'pkgbase'
pkgname_key = 'pkgname'
arches_key = '_arches'
arch_key = 'arch'
commit_key = '_commit'
source_key = 'source'
sha256sums_key = 'sha256sums'
required = {
mode_key: True,
nodeps_key: False,
crossdirect_key: False,
pkgbase_key: False,
pkgname_key: True,
'pkgdesc': False,
'pkgver': True,
'pkgrel': True,
arches_key: True,
arch_key: True,
'license': True,
'url': False,
'provides': is_git_package,
'conflicts': False,
'replaces': False,
'depends': False,
'optdepends': False,
'makedepends': False,
'backup': False,
'install': False,
'options': False,
commit_key: is_git_package,
source_key: False,
sha256sums_key: False,
'noextract': False,
}
pkgbuild_path = os.path.join(config.get_path('pkgbuilds'), package.path, 'PKGBUILD')
with open(pkgbuild_path, 'r') as file:
content = file.read()
if '\t' in content:
logging.fatal(f'\\t is not allowed in {pkgbuild_path}')
exit(1)
lines = content.split('\n')
if len(lines) == 0:
logging.fatal(f'Empty {pkgbuild_path}')
exit(1)
line_index = 0
key_index = 0
hold_key = False
key = ""
while True:
line = lines[line_index]
if line.startswith('#'):
line_index += 1
continue
if line.startswith('_') and line.split('=', 1)[0] not in [mode_key, nodeps_key, arches_key, commit_key]:
line_index += 1
continue
formatted = True
next_key = False
next_line = False
reason = ""
if hold_key:
next_line = True
else:
if key_index < len(required):
key = list(required)[key_index]
if line.startswith(key):
if key == pkgbase_key:
required[pkgname_key] = False
if key == source_key:
required[sha256sums_key] = True
next_key = True
next_line = True
elif key in required and not required[key]:
next_key = True
if line == ')':
hold_key = False
next_key = True
if key == arches_key:
required_arches = line.split('=')[1]
if line.endswith('=('):
hold_key = True
if line.startswith(' ') or line == ')':
next_line = True
if line.startswith(' ') and not line.startswith(' '):
formatted = False
reason = 'Multiline variables should be indented with 4 spaces'
if '"' in line and not check_quoteworthy(line):
formatted = False
reason = 'Found literal " although no special character was found in the line to justify the usage of a literal "'
if "'" in line and '"' not in line:
formatted = False
reason = 'Found literal \' although either a literal " or no qoutes should be used'
if ('=(' in line and ' ' in line and '"' not in line and not line.endswith('=(')) or (hold_key and line.endswith(')')):
formatted = False
reason = 'Multiple elements in a list need to be in separate lines'
if formatted and not next_key and not next_line:
if key_index == len(required):
if lines[line_index] == '':
break
else:
formatted = False
reason = 'Expected final emtpy line after all variables'
else:
formatted = False
reason = f'Expected to find "{key}"'
if not formatted:
logging.fatal(f'Formatting error in {pkgbuild_path}: Line {line_index+1}: "{line}"')
if reason != "":
logging.fatal(reason)
exit(1)
if key == arch_key:
if line.endswith(')'):
if line.startswith(f'{arch_key}=('):
check_arches_hint(pkgbuild_path, required_arches, [line[6:-1]])
else:
check_arches_hint(pkgbuild_path, required_arches, provided_arches)
elif line.startswith(' '):
provided_arches.append(line[4:])
if next_key and not hold_key:
key_index += 1
if next_line:
line_index += 1
logging.info(f'{package.path} nicely formatted!')
def check_arches_hint(path: str, required: str, provided: list[str]):
if required == 'all':
for arch in ARCHES:
if arch not in provided:
logging.warning(f'Missing {arch} in arches list in {path}, because _arches hint is `all`')