# Mirror of https://gitlab.com/kupfer/kupferbootstrap.git
# (synced 2025-02-22 13:15:44 -05:00)
import click
|
|
import json
|
|
import logging
|
|
import os
|
|
|
|
from glob import glob
|
|
from typing import Iterable, Optional
|
|
|
|
from config.state import config
|
|
from constants import Arch, ARCHES, SRCINFO_FILE, SRCINFO_INITIALISED_FILE, SRCINFO_METADATA_FILE, SRCINFO_TARBALL_FILE, SRCINFO_TARBALL_URL
|
|
from exec.cmd import run_cmd, shell_quote, CompletedProcess
|
|
from exec.file import get_temp_dir, makedir, remove_file
|
|
from devices.device import get_profile_device
|
|
from distro.distro import get_kupfer_local, get_kupfer_url, get_kupfer_repo_names
|
|
from distro.package import LocalPackage
|
|
from net.ssh import run_ssh_command, scp_put_files
|
|
from utils import download_file, git, sha256sum
|
|
from wrapper import check_programs_wrap, enforce_wrap
|
|
|
|
from .build import build_packages_by_paths, init_prebuilts
|
|
from .pkgbuild import discover_pkgbuilds, filter_pkgbuilds, get_pkgbuild_dirs, init_pkgbuilds
|
|
|
|
# Per-PKGBUILD cache files produced by SRCINFO parsing; these are what gets
# copied from the remote cache tarball and removed by `packages clean`.
SRCINFO_CACHE_FILES = [SRCINFO_FILE, SRCINFO_INITIALISED_FILE, SRCINFO_METADATA_FILE]
|
|
|
|
|
|
def build(
    paths: Iterable[str],
    force: bool,
    arch: Optional[Arch] = None,
    rebuild_dependants: bool = False,
    try_download: bool = False,
):
    """Build the PKGBUILDs at `paths` for `arch` (defaults to the profile device's arch).

    Validates the target architecture, then delegates to build_packages_by_paths()
    with the build options taken from the loaded config file.
    """
    config.enforce_config_loaded()
    enforce_wrap()
    # Fall back to the architecture of the configured profile's device.
    if not arch:
        arch = get_profile_device(hint_or_set_arch=True).arch
    if arch not in ARCHES:
        raise Exception(f'Unknown architecture "{arch}". Choices: {", ".join(ARCHES)}')
    build_opts = config.file.build
    return build_packages_by_paths(
        paths,
        arch,
        force=force,
        rebuild_dependants=rebuild_dependants,
        try_download=try_download,
        enable_crosscompile=build_opts.crosscompile,
        enable_crossdirect=build_opts.crossdirect,
        enable_ccache=build_opts.ccache,
        clean_chroot=build_opts.clean_mode,
    )
|
|
|
|
|
|
def init_pkgbuild_caches(clean_src_dirs: bool = True, remote_branch: Optional[str] = None):
    """Prime per-PKGBUILD SRCINFO caches from the remote cache tarball.

    Downloads the SRCINFO cache tarball for `remote_branch` (default: the
    configured branch, resolved by get_kupfer_url()), extracts it into a temp
    dir, and for each local PKGBUILD copies the cached SRCINFO files over —
    but only where the checksum recorded in the remote cache matches the local
    PKGBUILD. With `clean_src_dirs`, outdated src/ directories are removed.

    Raises an Exception if the tarball can't be extracted or a copy fails.
    """

    def read_srcinitialised_checksum(src_initialised):
        # Returns the 'PKGBUILD' checksum stored in the given JSON marker file.
        # The '!!!ERROR!!!' sentinel guarantees a mismatch when the key is absent.
        with open(src_initialised) as fd:
            d = json.load(fd)
            if isinstance(d, dict):
                return d.get('PKGBUILD', '!!!ERROR!!!')
            raise Exception("JSON content not a dictionary!")

    # get_kupfer_url() resolves repo branch variable in url
    url = get_kupfer_url(url=SRCINFO_TARBALL_URL, branch=remote_branch)
    cachetar = os.path.join(config.get_path('packages'), SRCINFO_TARBALL_FILE)
    makedir(os.path.dirname(cachetar))
    logging.info(f"Updating PKGBUILD caches from {url}" + (", pruning outdated src/ directories" if clean_src_dirs else ""))
    # download_file() returns whether the local tarball was actually refreshed.
    updated = download_file(cachetar, url)
    logging.info("Cache tarball was " + ('downloaded successfully' if updated else 'already up to date'))
    tmpdir = get_temp_dir()
    logging.debug(f"Extracting {cachetar} to {tmpdir}")
    res = run_cmd(['tar', 'xf', cachetar], cwd=tmpdir)
    assert isinstance(res, CompletedProcess)
    if res.returncode:
        raise Exception(f"failed to extract srcinfo cache archive '{cachetar}'")
    pkgbuild_dirs = get_pkgbuild_dirs()
    for pkg in pkgbuild_dirs:
        logging.info(f"{pkg}: analyzing cache")
        pkgdir = os.path.join(config.get_path('pkgbuilds'), pkg)
        srcdir = os.path.join(pkgdir, 'src')
        src_initialised = os.path.join(pkgdir, SRCINFO_INITIALISED_FILE)
        cachedir = os.path.join(tmpdir, pkg)
        pkgbuild_checksum = sha256sum(os.path.join(pkgdir, 'PKGBUILD'))
        # Start by assuming all cache files need copying; items are removed
        # below as they are found to be present and up to date.
        copy_files: set[str] = set(SRCINFO_CACHE_FILES)
        if os.path.exists(src_initialised):
            try:
                if read_srcinitialised_checksum(src_initialised) == pkgbuild_checksum:
                    # Local marker matches the local PKGBUILD: keep it, and skip
                    # any other cache files that already exist on disk.
                    copy_files.remove(SRCINFO_INITIALISED_FILE)
                    for f in copy_files.copy():
                        fpath = os.path.join(pkgdir, f)
                        if os.path.exists(fpath):
                            copy_files.remove(f)
                    if not copy_files:
                        logging.info(f"{pkg}: SRCINFO cache already up to date")
                        continue
            except Exception as ex:
                logging.warning(f"{pkg}: Something went wrong parsing {SRCINFO_INITIALISED_FILE}, treating as outdated!:\n{ex}")
            # Marker exists but is stale or unreadable: prune src/ if requested.
            if clean_src_dirs and os.path.exists(srcdir):
                logging.info(f"{pkg}: outdated src/ detected, removing")
                remove_file(srcdir, recursive=True)
                remove_file(src_initialised)
        if not os.path.exists(cachedir):
            logging.info(f"{pkg}: not found in remote repo cache, skipping")
            continue
        cache_initialised = os.path.join(cachedir, SRCINFO_INITIALISED_FILE)
        try:
            # Only accept the remote cache when it was generated from a
            # PKGBUILD identical to the local one.
            if read_srcinitialised_checksum(cache_initialised) != pkgbuild_checksum:
                logging.info(f"{pkg}: PKGBUILD checksum differs from remote repo cache, skipping")
                continue
        except Exception as ex:
            logging.warning(f"{pkg}: Failed to parse the remote repo's cached {SRCINFO_INITIALISED_FILE}, skipping!:\n{ex}")
            continue
        if not copy_files:
            continue
        logging.info(f"{pkg}: Copying srcinfo cache from remote repo")
        logging.debug(f'{pkg}: copying {copy_files}')
        copy_files_list = [shell_quote(os.path.join(cachedir, f)) for f in copy_files]
        res = run_cmd(f"cp {' '.join(copy_files_list)} {shell_quote(pkgdir)}/")
        assert isinstance(res, CompletedProcess)
        if res.returncode:
            raise Exception(f"{pkg}: failed to copy cache contents from {cachedir}")
|
|
|
|
|
|
# Shared click option decorators reused by multiple `packages` subcommands.

# Skip all interactive prompts.
non_interactive_flag = click.option('--non-interactive', is_flag=True)
# Toggle priming local SRCINFO caches from the remote cache tarball.
# NOTE(review): `is_flag=True` looks redundant when an on/off switch
# ('--x/--no-x') is declared — click infers a flag from that; verify.
init_caches_flag = click.option(
    '--init-caches/--no-init-caches',
    is_flag=True,
    default=True,
    show_default=True,
    help="Fill PKGBUILDs caches from HTTPS repo where checksums match",
)
# Toggle removal of outdated src/ dirs (see init_pkgbuild_caches()).
remove_outdated_src_flag = click.option(
    '--clean-src-dirs/--no-clean-src-dirs',
    is_flag=True,
    default=True,
    show_default=True,
    help="Remove outdated src/ directories to avoid problems",
)
switch_branch_flag = click.option('--switch-branch', is_flag=True, help="Force the branch to be corrected even in non-interactive mode")
discard_changes_flag = click.option('--discard-changes', is_flag=True, help="When switching branches, discard any locally changed conflicting files")
|
|
|
|
|
|
@click.group(name='packages')
def cmd_packages():
    """Build and manage packages and PKGBUILDs"""
    # Click group entry point; subcommands register via @cmd_packages.command().
|
|
|
|
|
|
@cmd_packages.command(name='update')
@non_interactive_flag
@init_caches_flag
@switch_branch_flag
@discard_changes_flag
@remove_outdated_src_flag
def cmd_update(
    non_interactive: bool = False,
    init_caches: bool = False,
    clean_src_dirs: bool = True,
    switch_branch: bool = False,
    discard_changes: bool = False,
):
    """Update PKGBUILDs git repo"""
    enforce_wrap()
    interactive = not non_interactive
    # Pull the PKGBUILDs repo (update=True forces a fetch, lazy=False a full init).
    init_pkgbuilds(
        interactive=interactive,
        lazy=False,
        update=True,
        switch_branch=switch_branch,
        discard_changes=discard_changes,
    )
    if init_caches:
        init_pkgbuild_caches(clean_src_dirs=clean_src_dirs)
    logging.info("Refreshing outdated SRCINFO caches")
    discover_pkgbuilds(lazy=False)
|
|
|
|
|
|
@cmd_packages.command(name='init')
@non_interactive_flag
@init_caches_flag
@switch_branch_flag
@discard_changes_flag
@remove_outdated_src_flag
@click.option('-u', '--update', is_flag=True, help='Use git pull to update the PKGBUILDs')
def cmd_init(
    non_interactive: bool = False,
    init_caches: bool = True,
    clean_src_dirs: bool = True,
    switch_branch: bool = False,
    discard_changes: bool = False,
    update: bool = False,
):
    "Ensure PKGBUILDs git repo is checked out locally"
    # Clone/check out the PKGBUILDs repo; only pull when --update was given.
    init_pkgbuilds(
        interactive=not non_interactive,
        lazy=False,
        update=update,
        switch_branch=switch_branch,
        discard_changes=discard_changes,
    )
    if init_caches:
        init_pkgbuild_caches(clean_src_dirs=clean_src_dirs)
    # Set up the local binary-repo skeleton for every supported architecture.
    for target_arch in ARCHES:
        init_prebuilts(target_arch)
|
|
|
|
|
|
@cmd_packages.command(name='build')
@click.option('--force', is_flag=True, default=False, help='Rebuild even if package is already built')
@click.option('--arch', default=None, required=False, type=click.Choice(ARCHES), help="The CPU architecture to build for")
@click.option('--rebuild-dependants', is_flag=True, default=False, help='Rebuild packages that depend on packages that will be [re]built')
@click.option('--no-download', is_flag=True, default=False, help="Don't try downloading packages from online repos before building")
@click.argument('paths', nargs=-1)
def cmd_build(paths: list[str], force=False, arch: Optional[Arch] = None, rebuild_dependants: bool = False, no_download: bool = False):
    """
    Build packages (and dependencies) by paths as required.

    The paths are specified relative to the PKGBUILDs dir, eg. "cross/crossdirect".

    Multiple paths may be specified as separate arguments.

    Packages that aren't built already will be downloaded from HTTPS repos unless --no-download is passed,
    if an exact version match exists on the server.
    """
    # CLI exposes the inverted flag; build() wants the positive sense.
    try_download = not no_download
    build(
        paths,
        force,
        arch=arch,
        rebuild_dependants=rebuild_dependants,
        try_download=try_download,
    )
|
|
|
|
|
|
@cmd_packages.command(name='sideload')
@click.argument('paths', nargs=-1)
@click.option('--arch', default=None, required=False, type=click.Choice(ARCHES), help="The CPU architecture to build for")
@click.option('-B', '--no-build', is_flag=True, default=False, help="Don't try to build packages, just copy and install")
def cmd_sideload(paths: Iterable[str], arch: Optional[Arch] = None, no_build: bool = False):
    """Build packages, copy to the device via SSH and install them"""
    if not paths:
        raise Exception("No packages specified")
    if not arch:
        arch = get_profile_device(hint_or_set_arch=True).arch
    if not no_build:
        build(paths, False, arch=arch, try_download=True)
    # Scan the local binary repo for the freshly built packages.
    repo: dict[str, LocalPackage] = get_kupfer_local(arch=arch, scan=True, in_chroot=False).get_packages()
    # Collect local file paths of packages whose name matched an argument.
    # NOTE(review): this matches pkg.name against the given paths — presumably
    # callers pass package names here rather than PKGBUILD dirs; verify.
    files = []
    for pkg in repo.values():
        if pkg.resolved_url and pkg.name in paths:
            files.append(pkg.resolved_url.split('file://')[1])
    logging.debug(f"Sideload: Found package files: {files}")
    if not files:
        logging.fatal("No packages matched")
        return
    # Push the package files to the device, then install them with pacman.
    scp_put_files(files, '/tmp').check_returncode()
    remote_paths = [os.path.join('/tmp', os.path.basename(file)) for file in files]
    run_ssh_command(
        [
            'sudo',
            'pacman',
            '-U',
            *remote_paths,
            '--noconfirm',
            "'--overwrite=\\*'",
        ],
        alloc_tty=True,
    ).check_returncode()
|
|
|
|
|
|
# PKGBUILD-relative locations that `packages clean` may remove.
CLEAN_LOCATIONS = ['src', 'pkg', *SRCINFO_CACHE_FILES]
|
|
|
|
|
|
@cmd_packages.command(name='clean')
@click.option('-f', '--force', is_flag=True, default=False, help="Don't prompt for confirmation")
@click.option('-n', '--noop', is_flag=True, default=False, help="Print what would be removed but dont execute")
@click.argument('what', type=click.Choice(['all', 'git', *CLEAN_LOCATIONS]), nargs=-1)
def cmd_clean(what: Iterable[str] = ('all',), force: bool = False, noop: bool = False):
    """
    Clean temporary files from PKGBUILDs

    Specifying no location defaults to the special value 'all', meaning all regular locations.

    There is also the special value 'git' which uses git to clean everything.

    Be careful with it, as it means re-downloading sources for your packages.
    """
    # Fixes vs. previous version: loop variable no longer shadows builtin `dir`;
    # the mutable default argument ['all'] became an immutable tuple; and the
    # non-git path no longer prompts for confirmation when nothing matched.
    if noop:
        logging.debug('Running in noop mode!')
    if force:
        logging.debug('Running in FORCE mode!')
    what = what or ['all']
    logging.debug(f'Clearing {what} from PKGBUILDs')
    pkgbuilds = config.get_path('pkgbuilds')
    if 'git' in what:
        check_programs_wrap(['git'])
        warning = "Really reset PKGBUILDs to git state completely?\nThis will erase any untracked changes to your PKGBUILDs directory."
        if not (noop or force or click.confirm(warning)):
            return
        result = git(
            [
                'clean',
                # append 'n' for a git-level dry run in noop mode
                '-dffX' + ('n' if noop else ''),
            ] + get_kupfer_repo_names(local=True),
            dir=pkgbuilds,
        )
        if result.returncode != 0:
            logging.fatal('Failed to git clean')
            exit(1)
    else:
        if 'all' in what:
            what = CLEAN_LOCATIONS
        what = set(what)
        dirs = []
        # PKGBUILDs live two levels deep (repo/pkgname), hence the '*/*' glob.
        for loc in CLEAN_LOCATIONS:
            if loc in what:
                logging.info(f'gathering {loc} instances')
                dirs += glob(os.path.join(pkgbuilds, '*', '*', loc))

        if not dirs:
            logging.info('Nothing to remove')
            return

        dir_lines = '\n'.join(dirs)
        verb = 'Would remove' if noop else 'Removing'
        logging.info(verb + ':\n' + dir_lines)

        if not (noop or force):
            if not click.confirm("Really remove all of these?", default=True):
                return

        if not noop:
            for target in dirs:
                remove_file(target, recursive=True)
|
|
|
|
|
|
@cmd_packages.command(name='list')
def cmd_list():
    "List information about available source packages (PKGBUILDs)"
    # Look in the first non-local kupfer repo dir to detect whether PKGBUILDs
    # have been checked out at all.
    pkgdir = os.path.join(config.get_path('pkgbuilds'), get_kupfer_repo_names(local=False)[0])
    if not os.path.exists(pkgdir):
        raise Exception(f"PKGBUILDs seem not to be initialised yet: {pkgdir} doesn't exist!\n"
                        f"Try running `kupferbootstrap packages init` first!")
    check_programs_wrap(['makepkg', 'pacman'])
    packages = discover_pkgbuilds()
    logging.info(f'Done! {len(packages)} Pkgbuilds:')
    for name in sorted(packages.keys()):
        p = packages[name]
        # Fixed format string: a ';' was missing after the crossdirect field and
        # a space was missing at the implicit string join (it printed
        # "...;local_depends:" glued together).
        print(f'name: {p.name}; ver: {p.version}; mode: {p.mode}; crossdirect: {p.crossdirect}; provides: {p.provides}; replaces: {p.replaces}; '
              f'local_depends: {p.local_depends}; depends: {p.depends}')
|
|
|
|
|
|
@cmd_packages.command(name='check')
@click.argument('paths', nargs=-1)
def cmd_check(paths):
    """Check that specified PKGBUILDs are formatted correctly"""
    # Linter for the kupfer PKGBUILD style: variables must appear in a fixed
    # order (see `required` below), multi-line arrays are indented with exactly
    # 4 spaces, quoting rules are enforced, and multi-element arrays must be
    # split across lines. Exits the process with status 1 on the first error.
    config.enforce_config_loaded()
    check_programs_wrap(['makepkg'])

    def check_quoteworthy(s: str) -> bool:
        # True if the line contains a character that justifies double quotes.
        quoteworthy = ['"', "'", "$", " ", ";", "&", "<", ">", "*", "?"]
        for symbol in quoteworthy:
            if symbol in s:
                return True
        return False

    paths = list(paths) or ['all']
    packages = filter_pkgbuilds(paths, allow_empty_results=False)

    for package in packages:
        name = package.name

        # -git packages must additionally declare `provides` and `_commit`.
        is_git_package = False
        if name.endswith('-git'):
            is_git_package = True

        required_arches = ''
        provided_arches: list[str] = []

        # Variable names that are special-cased in the checks below.
        mode_key = '_mode'
        nodeps_key = '_nodeps'
        crossdirect_key = '_crossdirect'
        pkgbase_key = 'pkgbase'
        pkgname_key = 'pkgname'
        arches_key = '_arches'
        arch_key = 'arch'
        commit_key = '_commit'
        source_key = 'source'
        sha256sums_key = 'sha256sums'
        # Expected variable order; the value says whether the key is mandatory.
        # Insertion order matters: the state machine walks this dict in order.
        required = {
            mode_key: True,
            nodeps_key: False,
            crossdirect_key: False,
            pkgbase_key: False,
            pkgname_key: True,
            'pkgdesc': False,
            'pkgver': True,
            'pkgrel': True,
            arches_key: True,
            arch_key: True,
            'license': True,
            'url': False,
            'provides': is_git_package,
            'conflicts': False,
            'replaces': False,
            'depends': False,
            'optdepends': False,
            'makedepends': False,
            'backup': False,
            'install': False,
            'options': False,
            commit_key: is_git_package,
            source_key: False,
            sha256sums_key: False,
            'noextract': False,
        }
        pkgbuild_path = os.path.join(config.get_path('pkgbuilds'), package.path, 'PKGBUILD')
        with open(pkgbuild_path, 'r') as file:
            content = file.read()
        if '\t' in content:
            logging.fatal(f'\\t is not allowed in {pkgbuild_path}')
            exit(1)
        lines = content.split('\n')
        if len(lines) == 0:
            logging.fatal(f'Empty {pkgbuild_path}')
            exit(1)
        # State machine over the file's lines:
        #   line_index — current line; key_index — position in `required`;
        #   hold_key   — True while inside a multi-line array (stay on same key).
        line_index = 0
        key_index = 0
        hold_key = False
        key = ""
        while True:
            line = lines[line_index]

            # Comment lines are skipped entirely.
            if line.startswith('#'):
                line_index += 1
                continue

            # Custom _underscore variables are ignored, except the special
            # kupfer ones that participate in the ordering check.
            if line.startswith('_') and line.split('=', 1)[0] not in [mode_key, nodeps_key, arches_key, commit_key]:
                line_index += 1
                continue

            formatted = True
            next_key = False
            next_line = False
            reason = ""

            if hold_key:
                # Inside a multi-line array: consume lines until ')'.
                next_line = True
            else:
                if key_index < len(required):
                    key = list(required)[key_index]
                    if line.startswith(key):
                        if key == pkgbase_key:
                            # a pkgbase declaration makes pkgname optional
                            required[pkgname_key] = False
                        if key == source_key:
                            # sources must come with sha256sums
                            required[sha256sums_key] = True
                        next_key = True
                        next_line = True
                    elif key in required and not required[key]:
                        # optional key absent: advance to the next expected key
                        next_key = True

            if line == ')':
                # closing line of a multi-line array
                hold_key = False
                next_key = True

            if key == arches_key:
                # remember the _arches hint for check_arches_hint() later
                required_arches = line.split('=')[1]

            # opening line of a multi-line array
            if line.endswith('=('):
                hold_key = True

            if line.startswith(' ') or line == ')':
                next_line = True

            if line.startswith(' ') and not line.startswith('    '):
                formatted = False
                reason = 'Multiline variables should be indented with 4 spaces'

            if '"' in line and not check_quoteworthy(line):
                formatted = False
                reason = 'Found literal " although no special character was found in the line to justify the usage of a literal "'

            if "'" in line and '"' not in line:
                formatted = False
                reason = 'Found literal \' although either a literal " or no qoutes should be used'

            # a '=( a b )' one-liner with multiple elements, or a multi-line
            # array whose last element shares the closing ')' line
            if ('=(' in line and ' ' in line and '"' not in line and not line.endswith('=(')) or (hold_key and line.endswith(')')):
                formatted = False
                reason = 'Multiple elements in a list need to be in separate lines'

            if formatted and not next_key and not next_line:
                if key_index == len(required):
                    # all expected variables consumed: a blank line must follow
                    if lines[line_index] == '':
                        break
                    else:
                        formatted = False
                        reason = 'Expected final emtpy line after all variables'
                else:
                    formatted = False
                    reason = f'Expected to find "{key}"'

            if not formatted:
                logging.fatal(f'Formatting error in {pkgbuild_path}: Line {line_index+1}: "{line}"')
                if reason != "":
                    logging.fatal(reason)
                exit(1)

            # Collect arch=() entries and compare them to the _arches hint.
            if key == arch_key:
                if line.endswith(')'):
                    if line.startswith(f'{arch_key}=('):
                        # single-line form `arch=(x)`: slice out the one value
                        check_arches_hint(pkgbuild_path, required_arches, [line[6:-1]])
                    else:
                        check_arches_hint(pkgbuild_path, required_arches, provided_arches)
                elif line.startswith('    '):
                    provided_arches.append(line[4:])

            if next_key and not hold_key:
                key_index += 1
            if next_line:
                line_index += 1

        logging.info(f'{package.path} nicely formatted!')
|
|
|
|
|
|
def check_arches_hint(path: str, required: str, provided: list[str]):
    """Warn about architectures missing from a PKGBUILD's arch=() list.

    When the `_arches` hint (`required`) is 'all', every arch in ARCHES is
    expected to appear in `provided`; a warning is logged for each one missing.
    """
    if required != 'all':
        return
    for missing in (candidate for candidate in ARCHES if candidate not in provided):
        logging.warning(f'Missing {missing} in arches list in {path}, because _arches hint is `all`')
|