# kupferbootstrap/packages/pkgbuild.py

from __future__ import annotations
import click
import logging
import multiprocessing
import os
import subprocess
from joblib import Parallel, delayed
from typing import Optional
from config import config, ConfigStateHolder
from constants import REPOSITORIES
from exec.cmd import run_cmd
from constants import Arch, MAKEPKG_CMD
from distro.package import PackageInfo
from logger import setup_logging
from utils import git
from wrapper import check_programs_wrap
def clone_pkgbuilds(pkgbuilds_dir: str, repo_url: str, branch: str, interactive=False, update=True):
check_programs_wrap(['git'])
git_dir = os.path.join(pkgbuilds_dir, '.git')
if not os.path.exists(git_dir):
        logging.info(f'Cloning branch {branch} from {repo_url}')
result = git(['clone', '-b', branch, repo_url, pkgbuilds_dir])
if result.returncode != 0:
raise Exception('Error cloning pkgbuilds')
else:
result = git(['--git-dir', git_dir, 'branch', '--show-current'], capture_output=True)
current_branch = result.stdout.decode().strip()
if current_branch != branch:
logging.warning(f'pkgbuilds repository is on the wrong branch: {current_branch}, requested: {branch}')
if interactive and click.confirm('Would you like to switch branches?', default=False):
result = git(['switch', branch], dir=pkgbuilds_dir)
if result.returncode != 0:
raise Exception('failed switching branches')
if update:
if interactive:
if not click.confirm('Would you like to try updating the PKGBUILDs repo?'):
return
result = git(['pull'], pkgbuilds_dir)
if result.returncode != 0:
raise Exception('failed to update pkgbuilds')
def init_pkgbuilds(interactive=False):
pkgbuilds_dir = config.get_path('pkgbuilds')
repo_url = config.file.pkgbuilds.git_repo
branch = config.file.pkgbuilds.git_branch
    clone_pkgbuilds(pkgbuilds_dir, repo_url, branch, interactive=interactive, update=False)
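
# Usage sketch (illustrative only; not called from this module): once the config is
# loaded, init_pkgbuilds() clones the configured PKGBUILDs repo on first use and
# otherwise only checks that the existing checkout is on the requested branch.
#
#   init_pkgbuilds(interactive=True)  # interactive=True prompts before switching branches
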
class Pkgbuild(PackageInfo):
name: str
version: str
arches: list[Arch]
depends: list[str]
provides: list[str]
replaces: list[str]
local_depends: list[str]
repo: str
mode: str
path: str
pkgver: str
pkgrel: str
sources_refreshed: bool
def __init__(
self,
relative_path: str,
arches: list[Arch] = [],
depends: list[str] = [],
provides: list[str] = [],
replaces: list[str] = [],
repo: Optional[str] = None,
sources_refreshed: bool = False,
) -> None:
"""
        Create a new Pkgbuild representation for the file located at `{relative_path}/PKGBUILD`.
`relative_path` will be stored in `self.path`.
"""
self.name = os.path.basename(relative_path)
self.version = ''
self.arches = list(arches)
self.depends = list(depends)
self.provides = list(provides)
self.replaces = list(replaces)
self.local_depends = []
self.repo = repo or ''
self.mode = ''
self.path = relative_path
self.pkgver = ''
self.pkgrel = ''
self.sources_refreshed = sources_refreshed
def __repr__(self):
return ','.join([
'Pkgbuild(' + self.name,
repr(self.path),
self.version + ("🔄" if self.sources_refreshed else ""),
self.mode + ')',
])
def names(self):
return list(set([self.name] + self.provides + self.replaces))
def update_version(self):
"""updates `self.version` from `self.pkgver` and `self.pkgrel`"""
self.version = f'{self.pkgver}-{self.pkgrel}'
def update(self, pkg: Pkgbuild):
self.version = pkg.version
self.arches = list(pkg.arches)
self.depends = list(pkg.depends)
self.provides = list(pkg.provides)
self.replaces = list(pkg.replaces)
self.local_depends = list(pkg.local_depends)
self.repo = pkg.repo
self.mode = pkg.mode
self.path = pkg.path
self.pkgver = pkg.pkgver
self.pkgrel = pkg.pkgrel
self.sources_refreshed = self.sources_refreshed or pkg.sources_refreshed
self.update_version()
def refresh_sources(self):
raise NotImplementedError()
class Pkgbase(Pkgbuild):
subpackages: list[SubPkgbuild]
def __init__(self, relative_path: str, subpackages: list[SubPkgbuild] = [], **args):
self.subpackages = list(subpackages)
super().__init__(relative_path, **args)
def update(self, pkg: Pkgbuild):
if not isinstance(pkg, Pkgbase):
raise Exception(f"Tried to update pkgbase {self.name} with non-base pkg {pkg}")
Pkgbuild.update(self, pkg)
sub_dict = {p.name: p for p in self.subpackages}
self.subpackages.clear()
for new_pkg in pkg.subpackages:
name = new_pkg.name
if name not in sub_dict:
sub_dict[name] = new_pkg
else:
sub_dict[name].update(new_pkg)
updated = sub_dict[name]
updated.sources_refreshed = self.sources_refreshed
self.subpackages.append(updated)
def refresh_sources(self, lazy: bool = True):
'''
Reloads the pkgbuild from disk.
        Does **NOT** run makepkg to refresh pkgver() first!
'''
if lazy and self.sources_refreshed:
return
parsed = parse_pkgbuild(self.path, sources_refreshed=True)
basepkgs = [p for p in parsed if isinstance(p, Pkgbase)]
        if len(basepkgs) != 1:
raise Exception(f"error refreshing {self.name}: wrong number of base packages found: {basepkgs}")
self.sources_refreshed = True
self.update(basepkgs[0])
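
# Note: when a PKGBUILD declares a pkgbase, each pkgname entry is represented as a
# SubPkgbuild attached to the shared Pkgbase; subpackages delegate source refreshing
# to that pkgbase (see SubPkgbuild.refresh_sources below).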
class SubPkgbuild(Pkgbuild):
pkgbase: Pkgbase
def __init__(self, name: str, pkgbase: Pkgbase):
self.name = name
self.pkgbase = pkgbase
self.sources_refreshed = False
self.update(pkgbase)
self.provides = []
self.replaces = []
def refresh_sources(self, lazy: bool = True):
assert self.pkgbase
self.pkgbase.refresh_sources(lazy=lazy)
def parse_pkgbuild(relative_pkg_dir: str, _config: Optional[ConfigStateHolder] = None, sources_refreshed: bool = False) -> list[Pkgbuild]:
"""
    Since this function may run in a different subprocess, the config has to be passed in as a parameter.
"""
global config
if _config:
config = _config
setup_logging(verbose=config.runtime.verbose, log_setup=False) # different thread needs log setup.
logging.info(f"Parsing PKGBUILD for {relative_pkg_dir}")
pkgbuilds_dir = config.get_path('pkgbuilds')
pkgdir = os.path.join(pkgbuilds_dir, relative_pkg_dir)
filename = os.path.join(pkgdir, 'PKGBUILD')
logging.debug(f"Parsing {filename}")
mode = None
with open(filename, 'r') as file:
for line in file.read().split('\n'):
if line.startswith('_mode='):
mode = line.split('=')[1]
break
if mode not in ['host', 'cross']:
raise Exception((f'{relative_pkg_dir}/PKGBUILD has {"no" if mode is None else "an invalid"} mode configured') +
(f': "{mode}"' if mode is not None else ''))
base_package = Pkgbase(relative_pkg_dir, sources_refreshed=sources_refreshed)
base_package.mode = mode
base_package.repo = relative_pkg_dir.split('/')[0]
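    # Have makepkg emit the .SRCINFO metadata, then parse it line by line below.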
srcinfo = run_cmd(
MAKEPKG_CMD + ['--printsrcinfo'],
cwd=pkgdir,
stdout=subprocess.PIPE,
)
    assert isinstance(srcinfo, subprocess.CompletedProcess)
lines = srcinfo.stdout.decode('utf-8').split('\n')
current: Pkgbuild = base_package
multi_pkgs = False
for line_raw in lines:
line = line_raw.strip()
if not line:
continue
splits = line.split(' = ')
if line.startswith('pkgbase'):
base_package.name = splits[1]
multi_pkgs = True
elif line.startswith('pkgname'):
if multi_pkgs:
current = SubPkgbuild(splits[1], base_package)
assert isinstance(base_package.subpackages, list)
base_package.subpackages.append(current)
else:
current.name = splits[1]
elif line.startswith('pkgver'):
current.pkgver = splits[1]
elif line.startswith('pkgrel'):
current.pkgrel = splits[1]
elif line.startswith('arch'):
current.arches.append(splits[1])
elif line.startswith('provides'):
current.provides.append(splits[1])
elif line.startswith('replaces'):
current.replaces.append(splits[1])
        elif line.startswith(('depends', 'makedepends', 'checkdepends', 'optdepends')):
current.depends.append(splits[1].split('=')[0].split(': ')[0])
results: list[Pkgbuild] = list(base_package.subpackages)
if len(results) > 1:
logging.debug(f" Split package detected: {base_package.name}: {results}")
base_package.update_version()
else:
results = [base_package]
for pkg in results:
assert isinstance(pkg, Pkgbuild)
pkg.depends = list(set(pkg.depends)) # deduplicate dependencies
pkg.update_version()
        if pkg.version != base_package.version:
raise Exception(f'Subpackage malformed! Versions differ! base: {base_package}, subpackage: {pkg}')
return results
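
# Example (sketch): parsing a single package directory, relative to the PKGBUILDs
# checkout; 'main/some-package' is a hypothetical path.
#
#   for pkg in parse_pkgbuild('main/some-package'):
#       print(pkg.name, pkg.version, pkg.mode)

# Module-level caches: parsed packages keyed by name, and parse results keyed by relative path.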
_pkgbuilds_cache = dict[str, Pkgbuild]()
_pkgbuilds_paths = dict[str, list[Pkgbuild]]()
_pkgbuilds_scanned: bool = False
def get_pkgbuild_by_path(relative_path: str, lazy: bool = True, _config: Optional[ConfigStateHolder] = None) -> list[Pkgbuild]:
global _pkgbuilds_cache, _pkgbuilds_paths
if lazy and relative_path in _pkgbuilds_paths:
return _pkgbuilds_paths[relative_path]
parsed = parse_pkgbuild(relative_path, _config=_config)
_pkgbuilds_paths[relative_path] = parsed
for pkg in parsed:
_pkgbuilds_cache[pkg.name] = pkg
return parsed
def discover_pkgbuilds(parallel: bool = True, lazy: bool = True) -> dict[str, Pkgbuild]:
global _pkgbuilds_cache, _pkgbuilds_scanned
if lazy and _pkgbuilds_scanned:
logging.debug("Reusing cached pkgbuilds repo")
return _pkgbuilds_cache.copy()
pkgbuilds_dir = config.get_path('pkgbuilds')
packages: dict[str, Pkgbuild] = {}
paths = []
init_pkgbuilds(interactive=False)
for repo in REPOSITORIES:
for dir in os.listdir(os.path.join(pkgbuilds_dir, repo)):
paths.append(os.path.join(repo, dir))
logging.info("Parsing PKGBUILDs")
results = []
if parallel:
paths_filtered = paths
if lazy:
# filter out cached packages as the caches don't cross process boundaries
paths_filtered = []
for p in paths:
if p in _pkgbuilds_paths:
# use cache
results += _pkgbuilds_paths[p]
else:
paths_filtered += [p]
chunks = (Parallel(n_jobs=multiprocessing.cpu_count() * 4)(
delayed(get_pkgbuild_by_path)(path, lazy=lazy, _config=config) for path in paths_filtered))
else:
chunks = (get_pkgbuild_by_path(path, lazy=lazy) for path in paths)
_pkgbuilds_paths.clear()
# one list of packages per path
for pkglist in chunks:
_pkgbuilds_paths[pkglist[0].path] = pkglist
results += pkglist
logging.debug('Building package dictionary!')
for package in results:
for name in [package.name] + package.replaces:
if name in packages:
                logging.warning(f'Overriding {packages[name]} with {package}')
packages[name] = package
# This filters the deps to only include the ones that are provided in this repo
for package in packages.values():
package.local_depends = package.depends.copy()
for dep in package.depends.copy():
found = dep in packages
for pkg in packages.values():
if found:
break
if dep in pkg.names():
logging.debug(f'Found {pkg.name} that provides {dep}')
found = True
break
if not found:
logging.debug(f'Removing {dep} from dependencies')
package.local_depends.remove(dep)
_pkgbuilds_cache.clear()
_pkgbuilds_cache.update(packages)
_pkgbuilds_scanned = True
return packages
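
# Usage sketch (illustrative; assumes the kupferbootstrap config has been initialized):
#
#   pkgbuilds = discover_pkgbuilds()
#   pkg = pkgbuilds.get('some-package')  # hypothetical package name
#   if pkg:
#       print(pkg.version, pkg.local_depends)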