from __future__ import annotations import json import logging import os import subprocess from typing import Any, ClassVar, Optional from config.state import config from constants import MAKEPKG_CMD, SRCINFO_FILE, SRCINFO_METADATA_FILE, SRCINFO_INITIALISED_FILE from dictscheme import DictScheme from exec.cmd import run_cmd from utils import sha256sum SRCINFO_CHECKSUM_FILES = ['PKGBUILD', SRCINFO_FILE] class JsonFile(DictScheme): _filename: ClassVar[str] _relative_path: str _strip_hidden: ClassVar[bool] = True _sparse: ClassVar[bool] = False def toJSON(self) -> str: 'Returns a json representation, with private keys that start with "_" filtered out' return json.dumps(self.toDict(), indent=2) def write(self): 'Write the filtered json representation to disk' filepath = os.path.join(config.get_path('pkgbuilds'), self._relative_path, self._filename) logging.debug(f'{self._relative_path}: writing {self._filename}') with open(filepath, 'w') as fd: fd.write(self.toJSON()) @classmethod def _read_file(cls, relative_path) -> Optional[dict]: pkgdir = os.path.join(config.get_path('pkgbuilds'), relative_path) filepath = os.path.join(pkgdir, cls._filename) if not os.path.exists(filepath): raise Exception(f"{relative_path}: {cls._filename} doesn't exist") with open(filepath, 'r') as fd: contents = json.load(fd) return contents def read(self) -> Optional[dict[str, Any]]: """ Try reading and parsing the JSON file. Due to the way this class works, it should be a dict (or empty). No error handling is provided, bring your own try/catch! """ return type(self)._read_file(self._relative_path) class SrcInitialisedFile(JsonFile): PKGBUILD: str _filename: ClassVar[str] = SRCINFO_INITIALISED_FILE def __init__(self, relative_path: str, raise_exception: bool = False): self._relative_path = relative_path try: content = self.read() assert isinstance(content, dict) self.update(content) except Exception as ex: if raise_exception: raise ex srcinfo_meta_defaults = { 'build_mode': None, "build_nodeps": None, "build_crossdirect": None, } class SrcinfoMetaFile(JsonFile): checksums: dict[str, str] build_mode: Optional[str] build_nodeps: Optional[bool] build_crossdirect: Optional[bool] _changed: bool _filename: ClassVar[str] = SRCINFO_METADATA_FILE @staticmethod def parse_existing(relative_pkg_dir: str) -> SrcinfoMetaFile: 'tries to parse the srcinfo_meta.json file in the specified pkgbuild dir' metadata_raw = SrcinfoMetaFile._read_file(relative_pkg_dir) return SrcinfoMetaFile.fromDict(metadata_raw | { '_relative_path': relative_pkg_dir, '_changed': False, }) @staticmethod def generate_new(relative_pkg_dir: str, write: bool = True) -> tuple[SrcinfoMetaFile, list[str]]: 'Creates a new SrcinfoMetaFile object with checksums, creating a SRCINFO as necessary' s = SrcinfoMetaFile({ '_relative_path': relative_pkg_dir, '_changed': True, 'checksums': {}, **srcinfo_meta_defaults, }) return s, s.refresh_all() @staticmethod def handle_directory(relative_pkg_dir: str, force_refresh: bool = False, write: bool = True) -> tuple[SrcinfoMetaFile, list[str]]: lines = None # try reading existing cache metadata try: metadata = SrcinfoMetaFile.parse_existing(relative_pkg_dir) except Exception as ex: logging.debug(f"{relative_pkg_dir}: something went wrong parsing json from {SrcinfoMetaFile._filename}," f"running `makepkg --printsrcinfo` instead instead: {ex}") return SrcinfoMetaFile.generate_new(relative_pkg_dir, write=write) # if for whatever reason only the SRCINFO got deleted but PKGBUILD has not been modified, # we do want the checksum verification to work. So regenerate SRCINFO first. if not os.path.exists(os.path.join(config.get_path('pkgbuilds'), relative_pkg_dir, SRCINFO_FILE)): lines = metadata.refresh_srcinfo() if not metadata.validate_checksums(): # metadata is invalid return SrcinfoMetaFile.generate_new(relative_pkg_dir, write=write) # metadata is valid assert metadata if not force_refresh: logging.debug(f'{metadata._relative_path}: srcinfo checksums match!') lines = lines or metadata.read_srcinfo_file() for build_field in srcinfo_meta_defaults.keys(): if build_field not in metadata: metadata.refresh_build_fields() if write: metadata.write() break else: lines = metadata.refresh_all(write=write) return metadata, lines def refresh_checksums(self): pkgdir = os.path.join(config.get_path('pkgbuilds'), self._relative_path) if 'checksums' not in self: self['checksums'] = None checksums_old = self.checksums.copy() checksums = {p: sha256sum(os.path.join(pkgdir, p)) for p in SRCINFO_CHECKSUM_FILES} if self.checksums is None: self.checksums = checksums else: self.checksums.clear() self.checksums.update(checksums) if checksums != checksums_old: self._changed = True def refresh_build_fields(self): self.update(srcinfo_meta_defaults) with open(os.path.join(config.get_path('pkgbuilds'), self._relative_path, 'PKGBUILD'), 'r') as file: lines = file.read().split('\n') for line in lines: if not line.startswith('_') or '=' not in line: continue key, val = line.split('=', 1) val = val.strip("\"'") if key == '_mode': self.build_mode = val elif key == '_nodeps': self.build_nodeps = val.lower() == 'true' elif key == '_crossdirect': self.build_crossdirect = val.lower() == 'true' else: continue def refresh_srcinfo(self) -> list[str]: 'Run `makepkg --printsrcinfo` to create an updated SRCINFO file and return the lines from it' logging.info(f"{self._relative_path}: Generating SRCINFO with makepkg") pkgdir = os.path.join(config.get_path('pkgbuilds'), self._relative_path) srcinfo_file = os.path.join(pkgdir, SRCINFO_FILE) sproc = run_cmd( MAKEPKG_CMD + ['--printsrcinfo'], cwd=pkgdir, stdout=subprocess.PIPE, ) assert (isinstance(sproc, subprocess.CompletedProcess)) if sproc.returncode: raise Exception(f"{self._relative_path}: makepkg failed to parse the PKGBUILD! Error code: {sproc.returncode}") output = sproc.stdout.decode('utf-8') with open(srcinfo_file, 'w') as srcinfo_fd: srcinfo_fd.write(output) return output.split('\n') def read_srcinfo_file(self) -> list[str]: with open(os.path.join(config.get_path('pkgbuilds'), self._relative_path, SRCINFO_FILE), 'r') as srcinfo_fd: lines = srcinfo_fd.read().split('\n') return lines def refresh_all(self, write: bool = True) -> list[str]: lines = self.refresh_srcinfo() self.refresh_checksums() self.refresh_build_fields() if write: self.write() return lines def validate_checksums(self) -> bool: "Returns True if all checksummed files exist and checksums match" pkgdir = os.path.join(config.get_path('pkgbuilds'), self._relative_path) assert self.checksums for filename in SRCINFO_CHECKSUM_FILES: if filename not in self.checksums: logging.debug(f"{self._relative_path}: No checksum for {filename} available") return False checksum = self.checksums[filename] path = os.path.join(pkgdir, filename) if not os.path.exists(path): logging.debug(f"{self._relative_path}: can't checksum'{filename}: file doesn't exist") return False file_sum = sha256sum(path) if file_sum != checksum: logging.debug(f'{self._relative_path}: Checksum for file "{filename}" doesn\'t match') return False return True def is_src_initialised(self) -> bool: checksum = self.checksums["PKGBUILD"] assert checksum try: initfile = SrcInitialisedFile(self._relative_path, raise_exception=True) if "PKGBUILD" not in initfile: raise Exception("'PKGBUILD' not in parser output") initialised_checksum = initfile.PKGBUILD except Exception as ex: logging.debug(f"{self._relative_path}: Couldn't read or parse {SRCINFO_INITIALISED_FILE}: {ex}") initialised_checksum = None result = checksum == initialised_checksum if initialised_checksum and not result: logging.debug("Sources were set up for a different version. " f"Current PKGBUILD checksum: {checksum}; " f"Initialised for: {initialised_checksum}") return result def write_src_initialised(self): initfile = SrcInitialisedFile(self._relative_path) self.refresh_checksums() initfile.PKGBUILD = self.checksums["PKGBUILD"] initfile.write()