packages: parse_pkgbuild(): banish SRCINFO caching into new file srcinfo_cache.py

InsanePrawn 2022-09-11 00:36:43 +02:00
parent 2f8d53648e
commit 686a62685e
2 changed files with 168 additions and 84 deletions


@@ -1,25 +1,23 @@
from __future__ import annotations
import click
import json
import logging
import multiprocessing
import os
import subprocess
from joblib import Parallel, delayed
from typing import Iterable, Optional
from config import config, ConfigStateHolder
from constants import REPOSITORIES
from dataclass import DataClass
from exec.cmd import run_cmd
from constants import Arch, MAKEPKG_CMD, SRCINFO_FILE, SRCINFO_METADATA_FILE
from constants import Arch
from distro.package import PackageInfo
from logger import setup_logging
from utils import git, sha256sum
from utils import git
from wrapper import check_programs_wrap
from .srcinfo_cache import SrcinfoMetaFile
def clone_pkbuilds(pkgbuilds_dir: str, repo_url: str, branch: str, interactive=False, update=True):
check_programs_wrap(['git'])
@@ -42,7 +40,7 @@ def clone_pkbuilds(pkgbuilds_dir: str, repo_url: str, branch: str, interactive=F
if interactive:
if not click.confirm('Would you like to try updating the PKGBUILDs repo?'):
return
result = git(['pull'], pkgbuilds_dir)
result = git(['pull'], dir=pkgbuilds_dir)
if result.returncode != 0:
raise Exception('failed to update pkgbuilds')
@@ -205,16 +203,6 @@ class SubPkgbuild(Pkgbuild):
self.pkgbase.refresh_sources(lazy=lazy)
class SrcinfoMetaFile(DataClass):
class SrcinfoMetaChecksums(DataClass):
PKGBUILD: str
SRCINFO: str
checksums: SrcinfoMetaChecksums
build_mode: str
def parse_pkgbuild(
relative_pkg_dir: str,
_config: Optional[ConfigStateHolder] = None,
@@ -228,74 +216,20 @@ def parse_pkgbuild(
if _config:
config = _config
setup_logging(verbose=config.runtime.verbose, log_setup=False) # different subprocess needs log setup.
logging.info(f"Parsing PKGBUILD for {relative_pkg_dir}")
pkgbuilds_dir = config.get_path('pkgbuilds')
pkgdir = os.path.join(pkgbuilds_dir, relative_pkg_dir)
filename = os.path.join(pkgdir, 'PKGBUILD')
mode = None
logging.info(f"Discovering PKGBUILD for {relative_pkg_dir}")
srcinfo_file = os.path.join(pkgdir, SRCINFO_FILE)
srcinfo_meta_file = os.path.join(pkgdir, SRCINFO_METADATA_FILE)
refresh = force_refresh_srcinfo or not os.path.exists(srcinfo_meta_file)
if not refresh and not os.path.exists(srcinfo_meta_file):
logging.debug(f"{relative_pkg_dir}: {SRCINFO_METADATA_FILE} doesn't exist, running makepkg --printsrcinfo")
refresh = True
# parse metadata (mostly checksums)
if not refresh:
try:
with open(srcinfo_meta_file, 'r') as meta_fd:
metadata_raw = json.load(meta_fd)
metadata = SrcinfoMetaFile.fromDict(metadata_raw, validate=True)
except Exception as ex:
logging.debug(f"{relative_pkg_dir}: something went wrong parsing json from {srcinfo_meta_file},"
f"running makepkg instead instead: {ex}")
refresh = True
# validate checksums
if not refresh:
assert metadata and metadata.checksums
for filename, checksum in metadata.checksums.items():
file_sum = sha256sum(os.path.join(pkgdir, filename))
if file_sum != checksum:
logging.debug(f"{relative_pkg_dir}: Checksums for {filename} don't match")
refresh = True
break
# checksums are valid!
logging.debug(f'{relative_pkg_dir}: srcinfo cache hit!')
mode = metadata.build_mode
with open(srcinfo_file, 'r') as srcinfo_fd:
lines = srcinfo_fd.read().split('\n')
if force_refresh_srcinfo:
logging.info('force-refreshing SRCINFOs')
# parse SRCINFO cache metadata and get correct SRCINFO lines
srcinfo_cache, lines = SrcinfoMetaFile.handle_directory(relative_pkg_dir, force_refresh=force_refresh_srcinfo, write=True)
assert lines and srcinfo_cache
assert 'build_mode' in srcinfo_cache
mode = srcinfo_cache.build_mode
if mode not in ['host', 'cross']:
err = 'an invalid' if mode is not None else 'no'
err_end = f": {repr(mode)}" if mode is not None else "."
raise Exception(f'{relative_pkg_dir}/PKGBUILD has {err} mode configured{err_end}')
# do the actual refresh
if refresh:
logging.debug(f"Parsing {filename}")
with open(filename, 'r') as file:
for line in file.read().split('\n'):
if line.startswith('_mode='):
mode = line.split('=')[1]
break
if mode not in ['host', 'cross']:
err = 'an invalid' if mode else 'no'
raise Exception(f'{relative_pkg_dir}/PKGBUILD has {err} mode configured' + (f": {repr(mode)}" if mode is not None else ""))
srcinfo_proc = run_cmd(
MAKEPKG_CMD + ['--printsrcinfo'],
cwd=pkgdir,
stdout=subprocess.PIPE,
)
assert (isinstance(srcinfo_proc, subprocess.CompletedProcess))
output = srcinfo_proc.stdout.decode('utf-8')
lines = output.split('\n')
with open(srcinfo_file, 'w') as srcinfo_fd:
srcinfo_fd.write(output)
checksums = {os.path.basename(p): sha256sum(p) for p in [filename, srcinfo_file]}
metadata = SrcinfoMetaFile.fromDict({
'build_mode': mode,
'checksums': checksums,
}, validate=True)
with open(srcinfo_meta_file, 'w') as meta_fd:
json.dump(metadata, meta_fd)
assert mode
base_package = Pkgbase(relative_pkg_dir, sources_refreshed=sources_refreshed)
base_package.mode = mode
base_package.repo = relative_pkg_dir.split('/')[0]
@@ -392,7 +326,7 @@ def discover_pkgbuilds(parallel: bool = True, lazy: bool = True, repositories: O
continue
paths.append(p)
logging.info("Parsing PKGBUILDs")
logging.info(f"Discovering PKGBUILDs{f' in repositories: {repositories}' if repositories else ''}")
results = []
if parallel:
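For orientation before the new module below: the cache it maintains is a small srcinfo_meta.json written next to each PKGBUILD. Going by the Checksums fields and the write()/toJSON() methods further down, its on-disk shape is roughly the following sketch (the hash values are made-up placeholders, and the exact key for the SRCINFO checksum follows the SRCINFO_FILE constant):

{
    "build_mode": "cross",
    "checksums": {
        "PKGBUILD": "<sha256 of PKGBUILD>",
        "SRCINFO": "<sha256 of the SRCINFO file>"
    }
}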

packages/srcinfo_cache.py (new file, +150)

@@ -0,0 +1,150 @@
from __future__ import annotations
import json
import logging
import os
import subprocess
from typing import Optional
from config import config
from constants import MAKEPKG_CMD, SRCINFO_FILE, SRCINFO_METADATA_FILE
from dataclass import DataClass
from exec.cmd import run_cmd
from utils import sha256sum
SRCINFO_CHECKSUM_FILES = ['PKGBUILD', SRCINFO_FILE]

class SrcinfoMetaFile(DataClass):

    class Checksums(DataClass):
        PKGBUILD: str
        SRCINFO: str

    checksums: Checksums
    build_mode: Optional[str]

    _relative_path: str

    @staticmethod
    def parse_existing(relative_pkg_dir: str) -> SrcinfoMetaFile:
        'tries to parse the srcinfo_meta.json file in the specified pkgbuild dir'
        pkgdir = os.path.join(config.get_path('pkgbuilds'), relative_pkg_dir)
        srcinfo_meta_file = os.path.join(pkgdir, SRCINFO_METADATA_FILE)
        if not os.path.exists(srcinfo_meta_file):
            raise Exception(f"{relative_pkg_dir}: {SRCINFO_METADATA_FILE} doesn't exist")
        with open(srcinfo_meta_file, 'r') as meta_fd:
            metadata_raw = json.load(meta_fd) | {'_relative_path': relative_pkg_dir}
        return SrcinfoMetaFile.fromDict(metadata_raw, validate=True)

    @staticmethod
    def generate_new(relative_pkg_dir: str, write: bool = True) -> tuple[SrcinfoMetaFile, list[str]]:
        'Creates a new SrcinfoMetaFile object with checksums, creating a SRCINFO as necessary'
        s = SrcinfoMetaFile({'_relative_path': relative_pkg_dir, 'build_mode': '', 'checksums': {}}, validate=True)
        return s, s.refresh_all(write=write)

    @staticmethod
    def handle_directory(relative_pkg_dir: str, force_refresh: bool = False, write: bool = True) -> tuple[SrcinfoMetaFile, list[str]]:
        lines = None
        # try reading existing cache metadata
        try:
            metadata = SrcinfoMetaFile.parse_existing(relative_pkg_dir)
        except Exception as ex:
            logging.debug(f"{relative_pkg_dir}: something went wrong parsing json from {SRCINFO_METADATA_FILE}, "
                          f"running `makepkg --printsrcinfo` instead: {ex}")
            return SrcinfoMetaFile.generate_new(relative_pkg_dir, write=write)
        # if for whatever reason only the SRCINFO got deleted but PKGBUILD has not been modified,
        # we do want the checksum verification to work. So regenerate SRCINFO first.
        if not os.path.exists(os.path.join(config.get_path('pkgbuilds'), relative_pkg_dir, SRCINFO_FILE)):
            lines = metadata.refresh_srcinfo()
        if not metadata.validate_checksums():
            # metadata is invalid
            return SrcinfoMetaFile.generate_new(relative_pkg_dir, write=write)
        # metadata is valid
        assert metadata
        if not force_refresh:
            logging.debug(f'{metadata._relative_path}: srcinfo checksums match!')
            lines = lines or metadata.read_srcinfo_file()
        else:
            lines = metadata.refresh_all(write=write)
        return metadata, lines

    def refresh_checksums(self):
        pkgdir = os.path.join(config.get_path('pkgbuilds'), self._relative_path)
        checksums = self.Checksums({p: sha256sum(os.path.join(pkgdir, p)) for p in SRCINFO_CHECKSUM_FILES})
        if 'checksums' not in self or self.checksums is None:
            self['checksums'] = checksums
        else:
            self.checksums.clear()
            self.checksums.update(checksums)

    def refresh_build_mode(self):
        self['build_mode'] = None
        with open(os.path.join(config.get_path('pkgbuilds'), self._relative_path, 'PKGBUILD'), 'r') as file:
            lines = file.read().split('\n')
        for line in lines:
            if line.startswith('_mode='):
                self.build_mode = line.split('=', 1)[1].strip("\"'")
                return

    def refresh_srcinfo(self) -> list[str]:
        'Run `makepkg --printsrcinfo` to create an updated SRCINFO file and return the lines from it'
        logging.info(f"{self._relative_path}: Generating SRCINFO with makepkg")
        pkgdir = os.path.join(config.get_path('pkgbuilds'), self._relative_path)
        srcinfo_file = os.path.join(pkgdir, SRCINFO_FILE)
        sproc = run_cmd(
            MAKEPKG_CMD + ['--printsrcinfo'],
            cwd=pkgdir,
            stdout=subprocess.PIPE,
        )
        assert (isinstance(sproc, subprocess.CompletedProcess))
        if sproc.returncode:
            raise Exception(f"{self._relative_path}: makepkg failed to parse the PKGBUILD! Error code: {sproc.returncode}")
        output = sproc.stdout.decode('utf-8')
        with open(srcinfo_file, 'w') as srcinfo_fd:
            srcinfo_fd.write(output)
        return output.split('\n')

    def read_srcinfo_file(self) -> list[str]:
        with open(os.path.join(config.get_path('pkgbuilds'), self._relative_path, SRCINFO_FILE), 'r') as srcinfo_fd:
            lines = srcinfo_fd.read().split('\n')
        return lines

    def refresh_all(self, write: bool = True) -> list[str]:
        lines = self.refresh_srcinfo()
        self.refresh_checksums()
        self.refresh_build_mode()
        if write:
            self.write()
        return lines

    def validate_checksums(self) -> bool:
        "Returns True if all checksummed files exist and checksums match"
        pkgdir = os.path.join(config.get_path('pkgbuilds'), self._relative_path)
        assert self.checksums
        for filename in SRCINFO_CHECKSUM_FILES:
            if filename not in self.checksums:
                logging.debug(f"{self._relative_path}: No checksum for {filename} available")
                return False
            checksum = self.checksums[filename]
            path = os.path.join(pkgdir, filename)
            if not os.path.exists(path):
                logging.debug(f"{self._relative_path}: can't checksum '{filename}': file doesn't exist")
                return False
            file_sum = sha256sum(path)
            if file_sum != checksum:
                logging.debug(f'{self._relative_path}: Checksum for file "{filename}" doesn\'t match')
                return False
        return True

    def toJSON(self) -> str:
        'Returns a json representation, with private keys that start with "_" filtered out'
        return json.dumps({key: val for key, val in self.toDict().items() if not key.startswith('_')})

    def write(self):
        'Write the filtered json representation to disk as srcinfo_meta.json'
        filepath = os.path.join(config.get_path('pkgbuilds'), self._relative_path, SRCINFO_METADATA_FILE)
        logging.debug(f'{self._relative_path}: writing {SRCINFO_METADATA_FILE}')
        with open(filepath, 'w') as fd:
            fd.write(self.toJSON())
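
To tie the two files together: a minimal usage sketch of the new class, mirroring what the reworked parse_pkgbuild() above now does. The package path 'main/some-package' is a made-up example, and the absolute import path is only assumed from the new file's location:

from packages.srcinfo_cache import SrcinfoMetaFile

# Parse the cached metadata for one PKGBUILD directory, regenerating the
# SRCINFO and srcinfo_meta.json files only when the stored checksums are stale.
srcinfo_cache, lines = SrcinfoMetaFile.handle_directory('main/some-package', force_refresh=False, write=True)

assert 'build_mode' in srcinfo_cache
mode = srcinfo_cache.build_mode  # 'host' or 'cross', read from the _mode= line in the PKGBUILD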