diff --git a/packages/pkgbuild.py b/packages/pkgbuild.py index 31352d4..3e0fbab 100644 --- a/packages/pkgbuild.py +++ b/packages/pkgbuild.py @@ -1,25 +1,23 @@ from __future__ import annotations import click -import json import logging import multiprocessing import os -import subprocess from joblib import Parallel, delayed from typing import Iterable, Optional from config import config, ConfigStateHolder from constants import REPOSITORIES -from dataclass import DataClass -from exec.cmd import run_cmd -from constants import Arch, MAKEPKG_CMD, SRCINFO_FILE, SRCINFO_METADATA_FILE +from constants import Arch from distro.package import PackageInfo from logger import setup_logging -from utils import git, sha256sum +from utils import git from wrapper import check_programs_wrap +from .srcinfo_cache import SrcinfoMetaFile + def clone_pkbuilds(pkgbuilds_dir: str, repo_url: str, branch: str, interactive=False, update=True): check_programs_wrap(['git']) @@ -42,7 +40,7 @@ def clone_pkbuilds(pkgbuilds_dir: str, repo_url: str, branch: str, interactive=F if interactive: if not click.confirm('Would you like to try updating the PKGBUILDs repo?'): return - result = git(['pull'], pkgbuilds_dir) + result = git(['pull'], dir=pkgbuilds_dir) if result.returncode != 0: raise Exception('failed to update pkgbuilds') @@ -205,16 +203,6 @@ class SubPkgbuild(Pkgbuild): self.pkgbase.refresh_sources(lazy=lazy) -class SrcinfoMetaFile(DataClass): - - class SrcinfoMetaChecksums(DataClass): - PKGBUILD: str - SRCINFO: str - - checksums: SrcinfoMetaChecksums - build_mode: str - - def parse_pkgbuild( relative_pkg_dir: str, _config: Optional[ConfigStateHolder] = None, @@ -228,74 +216,20 @@ def parse_pkgbuild( if _config: config = _config setup_logging(verbose=config.runtime.verbose, log_setup=False) # different subprocess needs log setup. - logging.info(f"Parsing PKGBUILD for {relative_pkg_dir}") - pkgbuilds_dir = config.get_path('pkgbuilds') - pkgdir = os.path.join(pkgbuilds_dir, relative_pkg_dir) - filename = os.path.join(pkgdir, 'PKGBUILD') - mode = None + logging.info(f"Discovering PKGBUILD for {relative_pkg_dir}") - srcinfo_file = os.path.join(pkgdir, SRCINFO_FILE) - srcinfo_meta_file = os.path.join(pkgdir, SRCINFO_METADATA_FILE) - refresh = force_refresh_srcinfo or not os.path.exists(srcinfo_meta_file) - if not refresh and not os.path.exists(srcinfo_meta_file): - logging.debug(f"{relative_pkg_dir}: {SRCINFO_METADATA_FILE} doesn't exist, running makepkg --printsrcinfo") - refresh = True - # parse metadata (mostly checksums) - if not refresh: - try: - with open(srcinfo_meta_file, 'r') as meta_fd: - metadata_raw = json.load(meta_fd) - metadata = SrcinfoMetaFile.fromDict(metadata_raw, validate=True) - except Exception as ex: - logging.debug(f"{relative_pkg_dir}: something went wrong parsing json from {srcinfo_meta_file}," - f"running makepkg instead instead: {ex}") - refresh = True - # validate checksums - if not refresh: - assert metadata and metadata.checksums - for filename, checksum in metadata.checksums.items(): - file_sum = sha256sum(os.path.join(pkgdir, filename)) - if file_sum != checksum: - logging.debug(f"{relative_pkg_dir}: Checksums for {filename} don't match") - refresh = True - break - # checksums are valid! - logging.debug(f'{relative_pkg_dir}: srcinfo cache hit!') - mode = metadata.build_mode - with open(srcinfo_file, 'r') as srcinfo_fd: - lines = srcinfo_fd.read().split('\n') + if force_refresh_srcinfo: + logging.info('force-refreshing SRCINFOs') + # parse SRCINFO cache metadata and get correct SRCINFO lines + srcinfo_cache, lines = SrcinfoMetaFile.handle_directory(relative_pkg_dir, force_refresh=force_refresh_srcinfo, write=True) + assert lines and srcinfo_cache + assert 'build_mode' in srcinfo_cache + mode = srcinfo_cache.build_mode + if mode not in ['host', 'cross']: + err = 'an invalid' if mode is not None else 'no' + err_end = f": {repr(mode)}" if mode is not None else "." + raise Exception(f'{relative_pkg_dir}/PKGBUILD has {err} mode configured{err_end}') - # do the actual refresh - if refresh: - logging.debug(f"Parsing {filename}") - with open(filename, 'r') as file: - for line in file.read().split('\n'): - if line.startswith('_mode='): - mode = line.split('=')[1] - break - if mode not in ['host', 'cross']: - err = 'an invalid' if mode else 'no' - raise Exception(f'{relative_pkg_dir}/PKGBUILD has {err} mode configured' + (f": {repr(mode)}" if mode is not None else "")) - - srcinfo_proc = run_cmd( - MAKEPKG_CMD + ['--printsrcinfo'], - cwd=pkgdir, - stdout=subprocess.PIPE, - ) - assert (isinstance(srcinfo_proc, subprocess.CompletedProcess)) - output = srcinfo_proc.stdout.decode('utf-8') - lines = output.split('\n') - with open(srcinfo_file, 'w') as srcinfo_fd: - srcinfo_fd.write(output) - checksums = {os.path.basename(p): sha256sum(p) for p in [filename, srcinfo_file]} - metadata = SrcinfoMetaFile.fromDict({ - 'build_mode': mode, - 'checksums': checksums, - }, validate=True) - with open(srcinfo_meta_file, 'w') as meta_fd: - json.dump(metadata, meta_fd) - - assert mode base_package = Pkgbase(relative_pkg_dir, sources_refreshed=sources_refreshed) base_package.mode = mode base_package.repo = relative_pkg_dir.split('/')[0] @@ -392,7 +326,7 @@ def discover_pkgbuilds(parallel: bool = True, lazy: bool = True, repositories: O continue paths.append(p) - logging.info("Parsing PKGBUILDs") + logging.info(f"Discovering PKGBUILDs{f' in repositories: {repositories}' if repositories else ''}") results = [] if parallel: diff --git a/packages/srcinfo_cache.py b/packages/srcinfo_cache.py new file mode 100644 index 0000000..bb530e2 --- /dev/null +++ b/packages/srcinfo_cache.py @@ -0,0 +1,150 @@ +from __future__ import annotations + +import json +import logging +import os +import subprocess + +from typing import Optional + +from config import config +from constants import MAKEPKG_CMD, SRCINFO_FILE, SRCINFO_METADATA_FILE +from dataclass import DataClass +from exec.cmd import run_cmd +from utils import sha256sum + +SRCINFO_CHECKSUM_FILES = ['PKGBUILD', SRCINFO_FILE] + + +class SrcinfoMetaFile(DataClass): + + class Checksums(DataClass): + PKGBUILD: str + SRCINFO: str + + checksums: Checksums + build_mode: Optional[str] + + _relative_path: str + + @staticmethod + def parse_existing(relative_pkg_dir: str) -> SrcinfoMetaFile: + 'tries to parse the srcinfo_meta.json file in the specified pkgbuild dir' + pkgdir = os.path.join(config.get_path('pkgbuilds'), relative_pkg_dir) + srcinfo_meta_file = os.path.join(pkgdir, SRCINFO_METADATA_FILE) + if not os.path.exists(srcinfo_meta_file): + raise Exception(f"{relative_pkg_dir}: {SRCINFO_METADATA_FILE} doesn't exist") + with open(srcinfo_meta_file, 'r') as meta_fd: + metadata_raw = json.load(meta_fd) | {'_relative_path': relative_pkg_dir} + return SrcinfoMetaFile.fromDict(metadata_raw, validate=True) + + @staticmethod + def generate_new(relative_pkg_dir: str, write: bool = True) -> tuple[SrcinfoMetaFile, list[str]]: + 'Creates a new SrcinfoMetaFile object with checksums, creating a SRCINFO as necessary' + s = SrcinfoMetaFile({'_relative_path': relative_pkg_dir, 'build_mode': '', 'checksums': {}}, validate=True) + return s, s.refresh_all() + + @staticmethod + def handle_directory(relative_pkg_dir: str, force_refresh: bool = False, write: bool = True) -> tuple[SrcinfoMetaFile, list[str]]: + lines = None + # try reading existing cache metadata + try: + metadata = SrcinfoMetaFile.parse_existing(relative_pkg_dir) + except Exception as ex: + logging.debug(f"{relative_pkg_dir}: something went wrong parsing json from {SRCINFO_METADATA_FILE}," + f"running `makepkg --printsrcinfo` instead instead: {ex}") + return SrcinfoMetaFile.generate_new(relative_pkg_dir, write=write) + # if for whatever reason only the SRCINFO got deleted but PKGBUILD has not been modified, + # we do want the checksum verification to work. So regenerate SRCINFO first. + if not os.path.exists(os.path.join(config.get_path('pkgbuilds'), relative_pkg_dir, SRCINFO_FILE)): + lines = metadata.refresh_srcinfo() + if not metadata.validate_checksums(): + # metadata is invalid + return SrcinfoMetaFile.generate_new(relative_pkg_dir, write=write) + # metadata is valid + assert metadata + if not force_refresh: + logging.debug(f'{metadata._relative_path}: srcinfo checksums match!') + lines = lines or metadata.read_srcinfo_file() + else: + lines = metadata.refresh_all(write=write) + return metadata, lines + + def refresh_checksums(self): + pkgdir = os.path.join(config.get_path('pkgbuilds'), self._relative_path) + checksums = self.Checksums({p: sha256sum(os.path.join(pkgdir, p)) for p in SRCINFO_CHECKSUM_FILES}) + if 'checksums' not in self or self.checksums is None: + self['checksums'] = checksums + else: + self.checksums.clear() + self.checksums.update(checksums) + + def refresh_build_mode(self): + self['build_mode'] = None + with open(os.path.join(config.get_path('pkgbuilds'), self._relative_path, 'PKGBUILD'), 'r') as file: + lines = file.read().split('\n') + for line in lines: + if line.startswith('_mode='): + self.build_mode = line.split('=', 1)[1].strip("\"'") + return + + def refresh_srcinfo(self) -> list[str]: + 'Run `makepkg --printsrcinfo` to create an updated SRCINFO file and return the lines from it' + logging.info(f"{self._relative_path}: Generating SRCINFO with makepkg") + pkgdir = os.path.join(config.get_path('pkgbuilds'), self._relative_path) + srcinfo_file = os.path.join(pkgdir, SRCINFO_FILE) + sproc = run_cmd( + MAKEPKG_CMD + ['--printsrcinfo'], + cwd=pkgdir, + stdout=subprocess.PIPE, + ) + assert (isinstance(sproc, subprocess.CompletedProcess)) + if sproc.returncode: + raise Exception(f"{self._relative_path}: makepkg failed to parse the PKGBUILD! Error code: {sproc.returncode}") + output = sproc.stdout.decode('utf-8') + with open(srcinfo_file, 'w') as srcinfo_fd: + srcinfo_fd.write(output) + return output.split('\n') + + def read_srcinfo_file(self) -> list[str]: + with open(os.path.join(config.get_path('pkgbuilds'), self._relative_path, SRCINFO_FILE), 'r') as srcinfo_fd: + lines = srcinfo_fd.read().split('\n') + return lines + + def refresh_all(self, write: bool = True) -> list[str]: + lines = self.refresh_srcinfo() + self.refresh_checksums() + self.refresh_build_mode() + if write: + self.write() + return lines + + def validate_checksums(self) -> bool: + "Returns True if all checksummed files exist and checksums match" + pkgdir = os.path.join(config.get_path('pkgbuilds'), self._relative_path) + assert self.checksums + for filename in SRCINFO_CHECKSUM_FILES: + if filename not in self.checksums: + logging.debug(f"{self._relative_path}: No checksum for {filename} available") + return False + checksum = self.checksums[filename] + path = os.path.join(pkgdir, filename) + if not os.path.exists(path): + logging.debug(f"{self._relative_path}: can't checksum'{filename}: file doesn't exist") + return False + file_sum = sha256sum(path) + if file_sum != checksum: + logging.debug(f'{self._relative_path}: Checksum for file "{filename}" doesn\'t match') + return False + return True + + def toJSON(self) -> str: + 'Returns a json representation, with private keys that start with "_" filtered out' + return json.dumps({key: val for key, val in self.toDict().items() if not key.startswith('_')}) + + def write(self): + 'Write the filtered json representation to disk as srcinfo_meta.json' + filepath = os.path.join(config.get_path('pkgbuilds'), self._relative_path, SRCINFO_METADATA_FILE) + logging.debug(f'{self._relative_path}: writing {SRCINFO_METADATA_FILE}') + with open(filepath, 'w') as fd: + fd.write(self.toJSON())