WIP check-in 5 (work in progress)

This commit is contained in:
InsanePrawn 2022-03-13 22:17:02 +01:00
parent af067b2cfa
commit d274ec0f8b
18 changed files with 728 additions and 387 deletions

View file

@ -8,7 +8,7 @@ from wrapper import enforce_wrap
from .abstract import Chroot
from .base import get_base_chroot
from .build import get_build_chroot, BuildChroot
#from .device import get_device_chroot, DeviceChroot
# from .device import get_device_chroot, DeviceChroot
from .helpers import get_chroot_path
# export Chroot class
@ -59,4 +59,3 @@ def cmd_chroot(type: str = 'build', arch: str = None, enable_crossdirect=True):
chroot.activate()
logging.debug(f'Starting shell in {chroot.name}:')
chroot.run_cmd('bash', attach_tty=True)
chroot.run_cmd('bash', attach_tty=True)

142
distro/abstract.py Normal file
View file

@ -0,0 +1,142 @@
from copy import deepcopy
from typing import Optional, Mapping, ChainMap, Any
from .version import compare_package_versions
class PackageInfo:
    """Basic package metadata: identity, version, and relationship lists."""

    name: str
    version: str
    _filename: Optional[str]  # pre-known package file name, if any
    depends: list[str]
    provides: list[str]
    replaces: list[str]

    def __init__(self, name: str, version: str, filename: Optional[str] = None):
        # fix: `filename: str = None` was an implicit Optional (disallowed by PEP 484)
        self.name = name
        self.version = version
        self._filename = filename
        # fresh lists per instance; callers append to these after construction
        self.depends = []
        self.provides = []
        self.replaces = []

    def __repr__(self):
        return f'{self.name}@{self.version}'

    def compare_version(self, other: str) -> int:
        """Returns -1 if `other` is newer than `self`, 0 if `self == other`, 1 if `self` is newer than `other`"""
        return compare_package_versions(self.version, other)

    def get_filename(self, ext='.zst') -> str:
        # NOTE(review): `ext` is unused here; subclasses seem to use it when
        # synthesizing a fallback file name — confirm that is intended.
        assert self._filename
        return self._filename

    def acquire(self) -> Optional[str]:
        """
        Acquires the package through either build or download.
        Returns the downloaded file's path.
        """
        raise NotImplementedError()

    def is_remote(self) -> bool:
        raise NotImplementedError()
class RepoSearchResult:
    """Repo search results split along qualifier.

    Truthy iff at least one of the member lists is non-empty.
    """

    # annotations quoted: PackageInfo is declared earlier in this module
    exact_name: "list[PackageInfo]"
    provides: "list[PackageInfo]"
    replaces: "list[PackageInfo]"

    def __init__(self):
        self.exact_name = []
        self.provides = []
        self.replaces = []

    def __bool__(self) -> bool:
        # fix: `a and b and c` returned a *list* (a TypeError for __bool__) and
        # was falsy whenever any single list was empty. Intended semantics
        # (see callers doing `if providers or allow_empty`) are:
        # truthy unless *all* result lists are empty.
        return bool(self.exact_name or self.provides or self.replaces)
# A result source is any repo-like object that produced a RepoSearchResult.
ResultSource = Any
# Mapping from the producing source to its search result.
ResultSources = Mapping[ResultSource, RepoSearchResult]
class MergedResults:
    """Aggregates RepoSearchResults from several sources into per-qualifier mappings.

    Each mapping goes from a found package to the source that produced it;
    later sources win when the same package appears in more than one source.
    """

    # annotations quoted: these names are declared earlier in this module
    results: "ResultSources"
    exact_name: "Mapping[PackageInfo, ResultSource]"
    replaces: "Mapping[PackageInfo, ResultSource]"
    provides: "Mapping[PackageInfo, ResultSource]"

    def __init__(self, sources: "Optional[ResultSources]" = None):
        # fix: avoid a mutable default argument; None means "no initial sources"
        self.results = {}
        self.update(sources)

    def update(self, additional_sources: "Optional[ResultSources]" = None):
        """Merge `additional_sources` into `self.results` and rebuild the lookup mappings."""
        assert isinstance(self.results, dict)
        if additional_sources:
            self.results.update(additional_sources)
        # rebuild from scratch so overridden entries don't linger
        self.exact_name = {}
        self.replaces = {}
        self.provides = {}
        for source, results in self.results.items():
            for source_category, target_category in [
                (results.exact_name, self.exact_name),
                (results.replaces, self.replaces),
                (results.provides, self.provides),
            ]:
                for pkg in source_category:
                    target_category[pkg] = source
class RepoInfo:
    """Describes a pacman repository (name, URL template, options) and its packages."""

    name: str
    options: dict[str, str]
    url_template: str
    packages: "dict[str, PackageInfo]"
    remote: bool

    def __init__(self, name: str, url_template: str, options: dict[str, str] = {}):
        self.name = name
        self.url_template = url_template
        # deepcopy so the (shared, mutable) default dict and caller dicts are never aliased
        self.options = deepcopy(options)
        # fix: `packages` was never initialised, making acquire_package(),
        # get_providers() and subclass scans fail with AttributeError on a
        # freshly constructed RepoInfo
        self.packages = {}
        self.remote = not url_template.startswith('file://')

    def acquire_package(self, package: "PackageInfo") -> Optional[str]:
        """Acquire `package`, which must belong to this repo; returns its local path."""
        if package not in self.packages.values():
            raise NotImplementedError(f'Package {package} did not come from our repo')
        return package.acquire()

    def config_snippet(self) -> str:
        """Render this repo as a pacman.conf section: [name] plus `key = value` lines."""
        options = {'Server': self.url_template} | self.options
        return ('[%s]\n' % self.name) + '\n'.join([f"{key} = {value}" for key, value in options.items()])

    def scan(self, refresh: bool = False):
        # base class has nothing to scan; subclasses parse the repo database here
        pass

    def get_providers(self, name: str) -> "RepoSearchResult":
        """Collect packages matching `name` exactly or via provides/replaces."""
        results = RepoSearchResult()
        for package in self.packages.values():
            if name == package.name:
                results.exact_name.append(package)
            if name in package.provides:
                results.provides.append(package)
            if name in package.replaces:
                results.replaces.append(package)
        return results
class DistroInfo:
    """A distribution as a named collection of repositories."""

    repos: "Mapping[str, RepoInfo]"

    def get_packages(self) -> "Mapping[str, PackageInfo]":
        """ get packages from all repos, semantically overlaying them"""
        # ChainMap: earlier repos shadow later ones for duplicate package names.
        # (Runtime subscript `ChainMap[str, PackageInfo](...)` dropped: it adds
        # no behaviour, only a type-argument evaluation at call time.)
        return ChainMap(*[repo.packages for repo in self.repos.values()])

    def get_providers(self, name: str, allow_empty: bool = False) -> "MergedResults":
        """Return a MergedResults aggregating each repo's providers for package `name`.

        NOTE(review): `allow_empty` is currently unused — kept for interface
        compatibility; confirm whether empty results should be filtered.
        """
        # fix: the comprehension previously rebound `name` to the repo name,
        # so each repo was queried for *its own name* instead of the package.
        return MergedResults({repo_name: repo.get_providers(name) for repo_name, repo in self.repos.items()})

View file

@ -1,39 +1,18 @@
from typing import Optional, Mapping, ChainMap
from typing import Optional, Mapping
from constants import ARCHES, BASE_DISTROS, REPOSITORIES, KUPFER_HTTPS, CHROOT_PATHS
from constants import Arch, ARCHES, BASE_DISTROS, REPOSITORIES, KUPFER_HTTPS, CHROOT_PATHS
from generator import generate_pacman_conf_body
from config import config
from .package import PackageInfo
from .repo import RepoInfo, Repo, RepoSearchResult
class DistroInfo:
repos: Mapping[str, Repo]
def get_packages(self) -> Mapping[str, PackageInfo]:
""" get packages from all repos, semantically overlaying them"""
# results = {}
# for repo in list(self.repos.values())[::-1]: # TODO: figure if the list even needs to be reversed
# assert repo.packages is not None
# for package in repo.packages.values():
# results[package.name] = package
# return results
return ChainMap[str, PackageInfo](*[repo.packages for repo in list(self.repos.values())])
def get_providers(self, name: str, allow_empty: bool = False) -> dict[str, RepoSearchResult]:
"""Returns a mapping from repo.name to RepoSearchResult"""
for repo in self.repos:
providers = repo.get_providers(name)
# check whether we got empty lists as result. results class implements custom __bool__()
if providers or allow_empty:
yield repo.name, providers
from .abstract import RepoInfo, DistroInfo
from .repo import Repo
class Distro(DistroInfo):
arch: str
repos: Mapping[str, Repo]
def __init__(self, arch: str, repo_infos: dict[str, RepoInfo], scan=False):
def __init__(self, arch: str, repo_infos: Mapping[str, RepoInfo], scan=False):
assert (arch in ARCHES)
self.arch = arch
self.repos = dict[str, Repo]()
@ -56,23 +35,36 @@ class Distro(DistroInfo):
def get_base_distro(arch: str) -> Distro:
repos = {name: RepoInfo(url_template=url) for name, url in BASE_DISTROS[arch]['repos'].items()}
repos = {name: RepoInfo(name, url_template=url) for name, url in BASE_DISTROS[arch]['repos'].items()}
return Distro(arch=arch, repo_infos=repos, scan=False)
def get_kupfer(arch: str, url_template: str) -> Distro:
repos = {name: RepoInfo(url_template=url_template, options={'SigLevel': 'Never'}) for name in REPOSITORIES}
repos: Mapping[str, Repo] = {name: Repo(name, url_template=url_template, arch=arch, options={'SigLevel': 'Never'}) for name in REPOSITORIES}
return Distro(
arch=arch,
repo_infos=repos,
)
def get_kupfer_https(arch: str) -> Distro:
return get_kupfer(arch, KUPFER_HTTPS)
kupfer_https: dict[Arch, Distro]
kupfer_local: dict[Arch, dict[bool, Distro]]
def get_kupfer_local(arch: Optional[str] = None, in_chroot: bool = True) -> Distro:
def get_kupfer_https(arch: Arch) -> Distro:
global kupfer_https
if arch not in kupfer_https or not kupfer_https[arch]:
kupfer_https[arch] = get_kupfer(arch, KUPFER_HTTPS)
return kupfer_https[arch]
def get_kupfer_local(arch: Optional[Arch] = None, in_chroot: bool = True) -> Distro:
global kupfer_local
arch = arch or config.runtime['arch']
dir = CHROOT_PATHS['packages'] if in_chroot else config.get_path('packages')
return get_kupfer(arch, f"file://{dir}/$arch/$repo")
if arch not in kupfer_local:
kupfer_local[arch] = {}
locals = kupfer_local[arch]
if in_chroot not in locals or not locals[in_chroot]:
locals[in_chroot] = get_kupfer(arch, f"file://{dir}/$arch/$repo")
return locals[in_chroot]

View file

@ -3,81 +3,43 @@ from typing import Optional
import logging
from constants import Arch
from utils import download_file
from .version import compare_package_versions
from .abstract import PackageInfo
class PackageInfo:
name: str
version: str
class Package(PackageInfo):
arch: Arch
_filename: Optional[str]
depends: list[str]
provides: list[str]
replaces: list[str]
def __init__(self, name: str, version: str, arch: Arch, filename: str = None):
self.name = name
self.version = version
self.arch = arch
self._filename = filename
self.depends = []
self.provides = []
self.replaces = []
def __repr__(self):
return f'{self.name}@{self.version}'
def compare_version(self, other: str) -> int:
"""Returns -1 if `other` is newer than `self`, 0 if `self == other`, 1 if `self` is newer than `other`"""
return compare_package_versions(self.version, other)
def get_filename(self, ext='.zst') -> str:
return self._filename or f'{self.name}-{self.version}-{self.arch}.pkg.tar{ext}'
def acquire(self) -> Optional[str]:
"""
Acquires the package through either build or download.
Returns the downloaded file's path.
"""
raise NotImplementedError()
def is_remote(self) -> bool:
raise NotImplementedError()
class RemotePackage(PackageInfo):
resolved_url: Optional[str] = None
repo_name: str
md5sum: Optional[str]
def __init__(self, repo_name: str, *args, resolved_url: Optional[str] = None, **kwargs):
def __init__(self, arch: Arch, repo_name: str, *args, resolved_url: Optional[str] = None, **kwargs):
self.repo_name = repo_name
self.resolved_url = resolved_url
super().__init__(*args, **kwargs)
def acquire(self):
assert self.resolved_url
assert self.is_remote()
return download_file(f'{self.resolved_url}/{self.get_filename()}')
def get_filename(self, ext='.zst') -> str:
return self._filename or f'{self.name}-{self.version}-{self.arch}.pkg.tar{ext}'
def is_remote(self) -> bool:
return bool(self.resolved_url and not self.resolved_url.startswith('file://'))
def parse_package_desc(desc_str: str, arch: Arch, repo_name: str, resolved_url=None) -> PackageInfo:
"""Parses a desc file, returning a PackageInfo"""
@staticmethod
def parse_desc(desc_str: str, repo_name: str, resolved_url=None) -> Package:
"""Parses a desc file, returning a Package"""
pruned_lines = ([line.strip() for line in desc_str.split('%') if line.strip()])
desc = {}
for key, value in zip(pruned_lines[0::2], pruned_lines[1::2]):
desc[key.strip()] = value.strip()
return RemotePackage(name=desc['NAME'],
package = Package(name=desc['NAME'],
version=desc['VERSION'],
arch=arch,
arch=desc['ARCH'],
filename=desc['FILENAME'],
resolved_url=resolved_url,
repo_name=repo_name)
package.md5sum = desc.get('MD5SUM', None)
return package
def split_version_str(version_str) -> tuple[str, str]:

11
distro/remote/package.py Normal file
View file

@ -0,0 +1,11 @@
from utils import download_file
from .package import Package
class RemotePackage(Package):
    """A Package whose file lives on a remote mirror and is fetched over the network."""

    def acquire(self):
        """Download this package's file from its resolved repo URL; returns the local path."""
        assert self.resolved_url
        assert self.is_remote()
        url = f'{self.resolved_url}/{self.get_filename()}'
        return download_file(url)

View file

@ -1,16 +1,13 @@
from copy import deepcopy
from io import BufferedReader
from itertools import chain
from typing import Any, Mapping
import logging
import os
import tarfile
from config import config
from utils import download_file
from .package import PackageInfo, parse_package_desc
from .abstract import RepoInfo
from .package import Package
def resolve_url(url_template, repo_name: str, arch: str):
@ -20,95 +17,36 @@ def resolve_url(url_template, repo_name: str, arch: str):
return result
class RepoSearchResult:
"""Repo search results split along qualifier. Truthy value is calculated on whether all members are empty"""
exact_name: list[PackageInfo]
provides: list[PackageInfo]
replaces: list[PackageInfo]
def __init__(self):
self.exact_name = []
self.provides = []
self.replaces = []
def __bool__(self):
return self.exact_name and self.provides and self.replaces
ResultSources = Mapping[Any, RepoSearchResult]
class MergedResults:
results: ResultSources
exact_name: list[PackageInfo] = []
provides: list[PackageInfo] = []
replaces: list[PackageInfo] = []
def __init__(self, sources: ResultSources = {}):
pass
def update(self, additional_sources: ResultSources = {}):
for source, result in additional_sources.items():
self.results[source] = result
self.exact_name = chain()
class RepoInfo:
options: dict[str, str] = {}
url_template: str
packages: dict[str, PackageInfo]
remote: bool
def __init__(self, url_template: str, options: dict[str, str] = {}):
self.url_template = url_template
self.options = deepcopy(options)
self.remote = not url_template.startswith('file://')
def acquire_package(self, package: PackageInfo) -> str:
if package not in self.packages.values():
raise NotImplementedError(f'Package {package} did not come from our repo')
return package.acquire()
def scan(self, refresh: bool = False):
pass
def get_providers(self, name: str) -> RepoSearchResult:
results = RepoSearchResult()
for package in self.packages.values():
if name == package.name:
results.exact_name.append(package)
if name in package.provides:
results.provides.apend(package)
if name in package.replaces:
results.replaces.append(package)
return results
class Repo(RepoInfo):
name: str
resolved_url: str
arch: str
scanned: bool = False
scanned: bool
def __init__(self, name: str, url_template: str, arch: str, options: dict[str, str] = {}, scan=False):
self.scanned = False
self.packages = {}
self.name = name
self.url_template = url_template
self.arch = arch
super().__init__(url_template=url_template, options=options)
super().__init__(name, url_template=url_template, options=options)
if scan:
self.scan()
def get_package_from_desc(self, desc_str: str) -> PackageInfo:
return parse_package_desc(desc_str=desc_str, arch=self.arch, repo_name=self.name, resolved_url=self.resolved_url)
def scan(self, refresh: bool = False):
if refresh or not self.scanned:
def acquire_index(self) -> str:
"""[Download and] return local file path to repo .db file"""
self.resolved_url = resolve_url(self.url_template, repo_name=self.name, arch=self.arch)
self.remote = not self.resolved_url.startswith('file://')
uri = f'{self.resolved_url}/{self.name}.db'
path = ''
if self.remote:
logging.debug(f'Downloading repo file from {uri}')
path = download_file(uri)
else:
path = uri.split('file://')[1]
return path
def scan(self, refresh: bool = False):
if refresh or not self.scanned:
path = self.acquire_index()
logging.debug(f'Parsing repo file at {path}')
with tarfile.open(path) as index:
for node in index.getmembers():
@ -117,15 +55,6 @@ class Repo(RepoInfo):
with index.extractfile(node) as reader: # type: ignore
assert isinstance(reader, BufferedReader)
desc = reader.read().decode()
pkg = self.get_package_from_desc(desc)
pkg = Package.parse_desc(desc, repo_name=self.name, resolved_url=self.resolved_url)
self.packages[pkg.name] = pkg
self.scanned = True
def config_snippet(self) -> str:
options = {'Server': self.url_template} | self.options
return ('[%s]\n' % self.name) + '\n'.join([f"{key} = {value}" for key, value in options.items()])
def get_RepoInfo(self):
return RepoInfo(url_template=self.url_template, options=self.options)
return RepoInfo(url_template=self.url_template, options=self.options)

View file

@ -1,5 +1,5 @@
from enum import IntEnum
from typing import Optional, NamedTuple, TypeVar, Sequence, Union
from typing import NamedTuple, Sequence, Union
# free-form python port of https://gitlab.archlinux.org/pacman/pacman/-/blob/master/lib/libalpm/version.c

View file

@ -13,7 +13,7 @@ from chroot.device import DeviceChroot, get_device_chroot
from constants import Arch, BASE_PACKAGES, DEVICES, FLAVOURS
from config import config, Profile
from distro.distro import get_base_distro, get_kupfer_https
from packages.local_repo import get_repo, LocalRepo
#from packages.local_repo import get_repo, LocalRepo
from ssh import copy_ssh_keys
from wrapper import enforce_wrap

View file

@ -11,8 +11,8 @@ from ssh import run_ssh_command, scp_put_files
from wrapper import enforce_wrap
from utils import git
#from .pkgbuild import Pkgbuild
from .local_repo import LocalRepo, get_repo
# from .pkgbuild import Pkgbuild
#from .local_repo import get_local_repo
def build(paths: Iterable[str], force: bool, arch: Optional[Arch]):
@ -23,7 +23,7 @@ def build(paths: Iterable[str], force: bool, arch: Optional[Arch]):
raise Exception(f'Unknown architecture "{arch}". Choices: {", ".join(ARCHES)}')
enforce_wrap()
config.enforce_config_loaded()
local_repo = get_repo()
local_repo = get_local_repo()
local_repo.init(arch)
# repo: dict[str, Pkgbuild] = local_repo.discover_packages()
if arch != config.runtime['arch']:
@ -50,7 +50,7 @@ def cmd_packages():
def cmd_update(non_interactive: bool = False):
"""Update PKGBUILDs git repo"""
enforce_wrap()
get_repo().pkgbuilds.init(interactive=not non_interactive)
get_local_repo().pkgbuilds.init(interactive=not non_interactive)
@cmd_packages.command(name='build')
@ -135,7 +135,7 @@ def cmd_clean(what: Iterable[str] = ['all'], force: bool = False, noop: bool = F
@cmd_packages.command(name='list')
def cmd_list():
enforce_wrap()
repo = get_repo()
repo = get_local_repo()
logging.info('Discovering packages.')
packages = repo.discover_packages()
logging.info('Done! Pkgbuilds:')
@ -151,7 +151,7 @@ def cmd_check(paths: list[str]):
"""Check that specified PKGBUILDs are formatted correctly"""
enforce_wrap()
paths = list(paths)
repo = get_repo()
repo = get_local_repo()
packages = repo.pkgbuilds.filter_packages_by_paths(paths, allow_empty_results=False)
for package in packages:

View file

@ -1,17 +0,0 @@
from distro.package import PackageInfo
from distro.version import compare_package_versions, VerComp
from .source_repo import SourceRepo, SourcePackage
class HybridPackage(PackageInfo):
pkgbuild: Pkgbuild
binary_package: PackageInfo
def __init__(self, source_pkgbuild: Pkgbuild, binary_package: PackageInfo):
self.pkgbuild = source_pkgbuild
self.binary_package = binary_package
def acquire(self, download=True, build=True) -> str:
version_comparison = self.binary_package.compare_version(Pkgbuild.version)

34
packages/local_distro.py Normal file
View file

@ -0,0 +1,34 @@
import logging
import subprocess
import os
from typing import Optional
from binfmt import register as binfmt_register
from config import config
from chroot.build import setup_build_chroot
from constants import Arch, ARCHES, QEMU_BINFMT_PKGS, REPOSITORIES
from wrapper import enforce_wrap
from distro.distro import Distro
from .local_repo import LocalRepo
class LocalDistro(Distro):
    # Marker subclass: a Distro whose repositories live on the local filesystem.
    # No behaviour is overridden (yet).
    pass
# Per-arch cache of LocalDistro instances; filled lazily by get_local_distro().
_local_distros = dict[Arch, LocalDistro]()
def get_local_distro(arch: Arch, repo_names: list[str] = REPOSITORIES) -> LocalDistro:
    """Return the per-arch cached LocalDistro, creating and caching it on first use.

    `repo_names` selects which repositories back the distro; it is only
    consulted when the cache entry has to be created.
    """
    global _local_distros
    cached = _local_distros.get(arch)
    if not cached:
        repos = {name: LocalRepo(name, arch) for name in repo_names}
        cached = LocalDistro(arch, repos)
        _local_distros[arch] = cached
    return cached
def get_local_distro_flat(arch: Arch, flat_repo_name: str = "local"):
    """Return a LocalDistro for `arch` backed by exactly one repo, `flat_repo_name`."""
    single_repo = [flat_repo_name]
    return get_local_distro(arch, single_repo)

View file

@ -1,42 +1,32 @@
from hashlib import md5
import logging
import os
import shutil
import subprocess
from typing import Optional
from typing import Iterable
from binfmt import register as binfmt_register
from config import config
from constants import REPOSITORIES, QEMU_BINFMT_PKGS, ARCHES, Arch
from distro.distro import Distro
from constants import Arch, CHROOT_PATHS
from distro.repo import Repo
from wrapper import enforce_wrap
from distro.abstract import PackageInfo
from utils import md5sum_file
from .pkgbuild import Pkgbuild
from .source_repo import SourceRepo, get_repo as get_source_repo
from .helpers import setup_build_chroot
from .pkgbuild import Pkgbuild, Pkgbase, SubPkgbuild
class LocalRepo(Repo):
initialized: bool = False
pkgbuilds: SourceRepo
repo_dir: str
def __init__(self, repo_dir: str = None):
def __init__(self, name: str, arch: Arch, repo_dir: Optional[str] = None, options: dict[str, str] = {'SigLevel': 'Never'}, scan=False):
self.repo_dir = repo_dir or config.get_path('packages')
self.pkgbuilds = get_source_repo()
self.full_path = os.path.join(self.repo_dir, arch, name)
super().__init__(name=name, url_template=f'file://{CHROOT_PATHS["packages"]}/$arch/$repo', arch=arch, options=options, scan=scan)
def init(self, arch: Arch, discover_packages: bool = True, parallel: bool = True):
"""Ensure that all `constants.REPOSITORIES` inside `self.repo_dir` exist"""
self.pkgbuilds.init()
if discover_packages:
self.pkgbuilds.discover_packages(parallel=parallel)
def init(self):
"""Create repo database files"""
if not self.initialized:
for _arch in set([arch, config.runtime['arch']]):
for repo in REPOSITORIES:
repo_dir = os.path.join(self.repo_dir, arch, repo)
repo = self.name
repo_dir = os.path.join(self.repo_dir, self.arch, repo)
os.makedirs(repo_dir, exist_ok=True)
for ext1 in ['db', 'files']:
for ext2 in ['', '.tar.xz']:
@ -60,10 +50,9 @@ class LocalRepo(Repo):
self.init()
super().scan(refresh=refresh)
def add_file_to_repo(self, file_path: str, repo_name: str, arch: Arch):
repo_dir = os.path.join(self.repo_dir, arch, repo_name)
pacman_cache_dir = os.path.join(config.get_path('pacman'), arch)
def copy_file_to_repo(self, file_path: str) -> str:
file_name = os.path.basename(file_path)
repo_dir = self.full_path
target_file = os.path.join(repo_dir, file_name)
os.makedirs(repo_dir, exist_ok=True)
@ -79,27 +68,24 @@ class LocalRepo(Repo):
else:
logging.warning('Exact package file (confirmed by hash) was already in the repo. Skipped and deleted.')
os.unlink(file_path)
return os.path.join(repo_dir, file_name), md5sum
# clean up same name package from pacman cache
cache_file = os.path.join(pacman_cache_dir, file_name)
if os.path.exists(cache_file) and md5sum_file(cache_file) != md5sum:
logging.debug(f'Removing stale cache file (checksum mismatch): {cache_file}')
os.unlink(cache_file)
def run_repo_add(self, target_file: str):
cmd = [
'repo-add',
'--remove',
os.path.join(
repo_dir,
f'{repo_name}.db.tar.xz',
self.full_path,
f'{self.name}.db.tar.xz',
),
target_file,
]
logging.debug(f'repo: running cmd: {cmd}')
result = subprocess.run(cmd)
if result.returncode != 0:
raise Exception(f'Failed to add package {target_file} to repo {repo_name}')
raise Exception(f'Failed to add package {target_file} to repo {self.name}')
for ext in ['db', 'files']:
file = os.path.join(repo_dir, f'{repo_name}.{ext}')
file = os.path.join(self.full_path, f'{self.name}.{ext}')
if os.path.exists(file + '.tar.xz'):
os.unlink(file)
shutil.copyfile(file + '.tar.xz', file)
@ -107,109 +93,29 @@ class LocalRepo(Repo):
if os.path.exists(old):
os.unlink(old)
def add_package_to_repo(self, package: Pkgbuild, arch: Arch):
logging.info(f'Adding {package.path} to repo {package.repo}')
pkgbuild_dir = self.pkgbuilds.pkgbuilds_dir
def add_file_to_repo(self, file_path: str):
pacman_cache_dir = os.path.join(config.get_path('pacman'), self.arch)
file_name = os.path.basename(file_path)
# copy file to repo dir
target_file, md5sum = self.copy_file_to_repo(file_path)
# clean up same name package from pacman cache
cache_file = os.path.join(pacman_cache_dir, file_name)
if os.path.exists(cache_file) and md5sum_file(cache_file) != md5sum:
logging.debug(f'Removing stale cache file (checksum mismatch): {cache_file}')
os.unlink(cache_file)
self.run_repo_add(target_file)
return target_file
def add_package_to_repo(self, package: Pkgbuild):
logging.info(f'Adding {package.name} at {package.path} to repo {self.name}')
pkgbuild_dir = package.path
assert package.path
files = []
for file in os.listdir(pkgbuild_dir):
# Forced extension by makepkg.conf
if file.endswith('.pkg.tar.xz') or file.endswith('.pkg.tar.zst'):
assert package.repo and package.repo.name
repo_name = package.repo.name
repo_dir = os.path.join(self.repo_dir, arch, repo_name)
files.append(os.path.join(repo_dir, file))
self.add_file_to_repo(os.path.join(pkgbuild_dir, file), repo_name, arch)
files.append(self.add_file_to_repo(os.path.join(pkgbuild_dir, file)))
return files
def check_package_version_built(self, package: Pkgbuild, arch: Arch) -> bool:
native_chroot = setup_build_chroot(config.runtime['arch'])
missing = False
for line in package.get_pkg_filenames(arch, native_chroot):
if not line:
continue
assert package.repo and package.repo.name
file = os.path.join(self.repo_dir, arch, package.repo.name, os.path.basename(line))
logging.debug(f'Checking if {file} is built')
if os.path.exists(file):
self.add_file_to_repo(file, repo_name=package.repo.name, arch=arch)
else:
missing = True
return not missing
def get_unbuilt_package_levels(self, packages: Iterable[Pkgbuild], arch: Arch, force: bool = False) -> list[set[Pkgbuild]]:
package_levels = self.pkgbuilds.generate_dependency_chain(packages)
build_names = set[str]()
build_levels = list[set[Pkgbuild]]()
i = 0
for level_packages in package_levels:
level = set[Pkgbuild]()
for package in level_packages:
if ((not self.check_package_version_built(package, arch)) or set.intersection(set(package.depends), set(build_names)) or
(force and package in packages)):
level.add(package)
build_names.update(package.names())
if level:
build_levels.append(level)
logging.debug(f'Adding to level {i}:' + '\n' + ('\n'.join([p.name for p in level])))
i += 1
return build_levels
def build_packages(
self,
packages: Iterable[Pkgbuild],
arch: Arch,
force: bool = False,
enable_crosscompile: bool = True,
enable_crossdirect: bool = True,
enable_ccache: bool = True,
clean_chroot: bool = False,
):
build_levels = self.get_unbuilt_package_levels(packages, arch, force=force)
if not build_levels:
logging.info('Everything built already')
return
self.pkgbuilds.build_package_levels(
build_levels,
arch=arch,
force=force,
enable_crosscompile=enable_crosscompile,
enable_crossdirect=enable_crossdirect,
enable_ccache=enable_ccache,
clean_chroot=clean_chroot,
)
_local_repos = dict[Arch, LocalRepo]()
def get_local_repo(arch: Arch) -> LocalRepo:
global _local_repos
if arch not in _local_repos or not _local_repos[arch]:
_local_repos[arch] = LocalRepo()
return _local_repos[arch]
def build_enable_qemu_binfmt(foreign_arch: Arch):
if foreign_arch not in ARCHES:
raise Exception(f'Unknown architecture "{foreign_arch}". Choices: {", ".join(ARCHES)}')
enforce_wrap()
native = config.runtime['arch']
native_repo = get_local_repo(native)
native_repo.init()
# build qemu-user, binfmt, crossdirect
chroot = setup_build_chroot(native)
logging.info('Installing qemu-user (building if necessary)')
qemu_pkgs = [native_repo.pkgbuilds.pkgbuilds[pkg] for pkg in QEMU_BINFMT_PKGS]
native_repo.build_packages(
qemu_pkgs,
native,
enable_crosscompile=False,
enable_crossdirect=False,
enable_ccache=False,
)
subprocess.run(['pacman', '-Syy', '--noconfirm', '--needed', '--config', os.path.join(chroot.path, 'etc/pacman.conf')] + QEMU_BINFMT_PKGS)
if foreign_arch != native:
binfmt_register(foreign_arch)

204
packages/meta_distro.py Normal file
View file

@ -0,0 +1,204 @@
import logging
import subprocess
import os
from typing import Iterable, Iterator, Mapping, Optional
from binfmt import register as binfmt_register
from config import config
from chroot.build import setup_build_chroot
from distro.abstract import DistroInfo, PackageInfo
#from distro.distro import Distro
from constants import Arch, ARCHES, QEMU_BINFMT_PKGS
from wrapper import enforce_wrap
from .pkgbuild import Pkgbuild
from .local_distro import LocalDistro
from .source_distro import SourceDistro
from .meta_package import MetaPackage
class MetaDistro(DistroInfo):
def __init__(
self,
source_distro: SourceDistro,
remote_distro: DistroInfo,
local_distro: LocalDistro,
):
pass
def get_unbuilt_package_levels(self, packages: Iterable[PackageInfo], arch: Arch, force: bool = False) -> list[set[Pkgbuild]]:
package_levels = self.pkgbuilds.generate_dependency_chain(packages)
build_names = set[str]()
build_levels = list[set[Pkgbuild]]()
i = 0
for level_packages in package_levels:
level = set[Pkgbuild]()
for package in level_packages:
if ((not self.check_package_version_built(package, arch)) or set.intersection(set(package.depends), set(build_names)) or
(force and package in packages)):
level.add(package)
build_names.update(package.names())
if level:
build_levels.append(level)
logging.debug(f'Adding to level {i}:' + '\n' + ('\n'.join([p.name for p in level])))
i += 1
return build_levels
def generate_dependency_chain(self, to_build: Iterable[MetaPackage]) -> list[set[Pkgbuild]]:
"""
This figures out all dependencies and their sub-dependencies for the selection and adds those packages to the selection.
First the top-level packages get selected by searching the paths.
Then their dependencies and sub-dependencies and so on get added to the selection.
"""
visited = set[Pkgbuild]()
visited_names = set[str]()
dep_levels: list[set[Pkgbuild]] = [set(), set()]
package_repo = self.pkgbuilds
def visit(package: Pkgbuild, visited=visited, visited_names=visited_names):
visited.add(package)
visited_names.update(package.names())
def join_levels(levels: list[set[Pkgbuild]]) -> dict[Pkgbuild, int]:
result = dict[Pkgbuild, int]()
for i, level in enumerate(levels):
for pkg in level:
result[pkg] = i
return result
def get_dependencies(package: Pkgbuild, package_repo: dict[str, Pkgbuild] = package_repo) -> Iterator[Pkgbuild]:
for dep_name in package.depends:
if dep_name in visited_names:
continue
elif dep_name in package_repo:
dep_pkg = package_repo[dep_name]
visit(dep_pkg)
yield dep_pkg
def get_recursive_dependencies(package: Pkgbuild, package_repo: dict[str, Pkgbuild] = package_repo) -> Iterator[Pkgbuild]:
for pkg in get_dependencies(package, package_repo):
yield pkg
for sub_pkg in get_recursive_dependencies(pkg, package_repo):
yield sub_pkg
logging.debug('Generating dependency chain:')
# init level 0
for package in to_build:
visit(package)
dep_levels[0].add(package)
logging.debug(f'Adding requested package {package.name}')
# add dependencies of our requested builds to level 0
for dep_pkg in get_recursive_dependencies(package):
logging.debug(f"Adding {package.name}'s dependency {dep_pkg.name} to level 0")
dep_levels[0].add(dep_pkg)
visit(dep_pkg)
"""
Starting with `level` = 0, iterate over the packages in `dep_levels[level]`:
1. Moving packages that are dependencies of other packages up to `level`+1
2. Adding yet unadded local dependencies of all pkgs on `level` to `level`+1
3. increment level
"""
level = 0
# protect against dependency cycles
repeat_count = 0
_last_level: Optional[set[Pkgbuild]] = None
while dep_levels[level]:
level_copy = dep_levels[level].copy()
modified = False
logging.debug(f'Scanning dependency level {level}')
if level > 100:
raise Exception('Dependency chain reached 100 levels depth, this is probably a bug. Aborting!')
for pkg in level_copy:
pkg_done = False
if pkg not in dep_levels[level]:
# pkg has been moved, move on
continue
# move pkg to level+1 if something else depends on it
for other_pkg in level_copy:
if pkg == other_pkg:
continue
if pkg_done:
break
if not issubclass(type(other_pkg), Pkgbuild):
raise Exception('Not a Pkgbuild object:' + repr(other_pkg))
for dep_name in other_pkg.depends:
if dep_name in pkg.names():
dep_levels[level].remove(pkg)
dep_levels[level + 1].add(pkg)
logging.debug(f'Moving {pkg.name} to level {level+1} because {other_pkg.name} depends on it as {dep_name}')
modified = True
pkg_done = True
break
for dep_name in pkg.depends:
if dep_name in visited_names:
continue
elif dep_name in package_repo:
dep_pkg = package_repo[dep_name]
logging.debug(f"Adding {pkg.name}'s dependency {dep_name} to level {level}")
dep_levels[level].add(dep_pkg)
visit(dep_pkg)
modified = True
if _last_level == dep_levels[level]:
repeat_count += 1
else:
repeat_count = 0
if repeat_count > 10:
raise Exception(f'Probable dependency cycle detected: Level has been passed on unmodifed multiple times: #{level}: {_last_level}')
_last_level = dep_levels[level].copy()
if not modified: # if the level was modified, make another pass.
level += 1
dep_levels.append(set[Pkgbuild]())
# reverse level list into buildorder (deps first!), prune empty levels
return list([lvl for lvl in dep_levels[::-1] if lvl])
def build_packages(
    self,
    packages: Iterable[Pkgbuild],
    arch: Arch,
    force: bool = False,
    enable_crosscompile: bool = True,
    enable_crossdirect: bool = True,
    enable_ccache: bool = True,
    clean_chroot: bool = False,
):
    """Build `packages` (and any unbuilt dependencies) for `arch`.

    Determines the not-yet-built dependency levels first and hands them to
    `self.pkgbuilds` for the actual level-by-level build; returns early when
    there is nothing left to build.
    """
    # Guard clause: no unbuilt levels means all requested work is done.
    levels = self.get_unbuilt_package_levels(packages, arch, force=force)
    if not levels:
        logging.info('Everything built already')
        return
    self.pkgbuilds.build_package_levels(
        levels,
        arch=arch,
        force=force,
        enable_crosscompile=enable_crosscompile,
        enable_crossdirect=enable_crossdirect,
        enable_ccache=enable_ccache,
        clean_chroot=clean_chroot,
    )
def get_packages(self) -> Mapping[str, MetaPackage]:
    """Return the package index keyed by package name.

    Behavior is delegated unchanged to the parent class; this override
    apparently exists only to narrow the declared value type to
    `MetaPackage` — confirm the parent actually yields MetaPackage values.
    """
    return super().get_packages()
def build_enable_qemu_binfmt(self, foreign_arch: Arch):
    """Build and install qemu-user/binfmt support so `foreign_arch` binaries can run.

    Builds the QEMU_BINFMT_PKGS natively (cross tooling disabled), installs them
    into the native build chroot via pacman, and finally registers the binfmt
    handler for the foreign architecture.

    Raises:
        Exception: if `foreign_arch` is not one of the known ARCHES.
    """
    if foreign_arch not in ARCHES:
        raise Exception(f'Unknown architecture "{foreign_arch}". Choices: {", ".join(ARCHES)}')
    # NOTE(review): presumably ensures we run inside the wrapper environment
    # before touching chroots — confirm against wrapper.enforce_wrap().
    enforce_wrap()
    native = config.runtime['arch']
    # This object must represent the native architecture: qemu itself is built natively.
    assert self.arch == native
    self.init()
    # build qemu-user, binfmt, crossdirect
    chroot = setup_build_chroot(native)
    logging.info('Installing qemu-user (building if necessary)')
    qemu_pkgs = [pkg for pkgname, pkg in self.get_packages().items() if pkgname in QEMU_BINFMT_PKGS]
    # Cross-compilation/crossdirect/ccache are explicitly disabled: these are
    # the packages that *provide* the cross tooling.
    self.build_packages(
        qemu_pkgs,
        native,
        enable_crosscompile=False,
        enable_crossdirect=False,
        enable_ccache=False,
    )
    # Install the packages into the chroot using the chroot's own pacman.conf.
    # NOTE(review): the subprocess.run() result is not checked — a failing
    # pacman invocation goes unnoticed; consider check=True.
    subprocess.run(['pacman', '-Syy', '--noconfirm', '--needed', '--config', os.path.join(chroot.path, 'etc/pacman.conf')] + QEMU_BINFMT_PKGS)
    if foreign_arch != native:
        # Register the binfmt_misc handler for the foreign architecture.
        binfmt_register(foreign_arch)

24
packages/meta_package.py Normal file
View file

@ -0,0 +1,24 @@
from typing import Optional
from distro.package import Package, PackageInfo
from distro.version import compare_package_versions, VerComp
from .source_repo import SourceRepo, SourcePackage
from .local_package import LocalPackage
from distro.remote.package import RemotePackage
from .pkgbuild import Pkgbuild
class MetaPackage(PackageInfo):
    """Unified view of one package across its possible sources.

    A package may exist as a buildable PKGBUILD, a locally built binary,
    and/or a remote binary. Any of the three may be None, but at least one
    must be provided.
    """
    pkgbuild: Optional[Pkgbuild]
    local_package: Optional[LocalPackage]
    remote_package: Optional[RemotePackage]

    def __init__(self, source_pkgbuild: Optional[Pkgbuild], local_package: Optional[PackageInfo], remote_package: Optional[PackageInfo]):
        self.pkgbuild = source_pkgbuild
        self.local_package = local_package
        self.remote_package = remote_package
        # Initialise the PackageInfo base from the first available source so
        # name/version (and thus __repr__/compare_version) work on the meta
        # package. The original WIP code skipped base initialisation entirely.
        source = source_pkgbuild or local_package or remote_package
        if source is None:
            raise ValueError('MetaPackage needs at least one of pkgbuild/local_package/remote_package')
        super().__init__(source.name, source.version)

    def acquire(self, download: bool = True, build: bool = True) -> str:
        """Return a path to the package file, preferring the newest source.

        `download` / `build` control whether the remote package or the
        PKGBUILD may be used to obtain the file.
        """
        # Fixed from WIP: the original referenced a nonexistent
        # `self.binary_package` and the *class* attribute `Pkgbuild.version`.
        candidates: list[PackageInfo] = []
        if self.local_package:
            candidates.append(self.local_package)
        if download and self.remote_package:
            candidates.append(self.remote_package)
        if build and self.pkgbuild:
            candidates.append(self.pkgbuild)
        if not candidates:
            raise Exception(f'No available source to acquire package {self.name}')
        # Pick the highest version; compare_version() returns 1 if `self` is
        # newer than the argument.
        best = candidates[0]
        for candidate in candidates[1:]:
            if candidate.compare_version(best.version) == 1:
                best = candidate
        result = best.acquire()
        assert result
        return result

64
packages/meta_repo.py Normal file
View file

@ -0,0 +1,64 @@
from hashlib import md5
import logging
import os
import shutil
import subprocess
from typing import Iterable
from config import config
from constants import Arch
from distro.repo import RepoInfo
from .pkgbuild import Pkgbuild
from .source_repo import SourceRepo, get_repo as get_source_repo
from .local_distro import get_local_distro, LocalRepo
from .helpers import setup_build_chroot
class MetaRepo(LocalRepo):
    """Combines PKGBUILD sources with a binary LocalRepo for one architecture.

    NOTE(review): WIP class — `self.pkgbuilds`, `self.repo_dir` and
    `self.scan` are used below but never assigned/defined here; presumably
    they come from LocalRepo or are still to be wired up. Confirm.
    """

    def __init__(self, name: str, local_repo: LocalRepo):
        # NOTE(review): super().__init__() is not called — verify LocalRepo
        # does not require its own initialisation.
        self.name = name
        self.local_repo = local_repo
        self.arch = local_repo.arch

    def init(self, discover_packages: bool = True, parallel: bool = True):
        """Initialise PKGBUILD handling and the underlying local repo.

        `discover_packages` additionally scans for PKGBUILDs (without
        refreshing sources); `parallel` is forwarded to the discovery step.
        """
        self.pkgbuilds.init()
        if discover_packages:
            self.pkgbuilds.discover_packages(refresh=False, parallel=parallel)
        self.local_repo.init()

    def add_package_to_repo(self, package: Pkgbuild, arch: Arch) -> list[str]:
        """Move freshly built package archives into the binary repo.

        Scans the pkgbuilds dir for built `.pkg.tar.*` files, adds each to the
        local repo under `package`'s repo section, and returns the resulting
        repo file paths.
        """
        logging.info(f'Adding {package.path} to repo {package.repo}')
        pkgbuild_dir = self.pkgbuilds.pkgbuilds_dir
        files = []
        for file in os.listdir(pkgbuild_dir):
            # Forced extension by makepkg.conf
            if file.endswith('.pkg.tar.xz') or file.endswith('.pkg.tar.zst'):
                assert package.repo and package.repo.name
                repo_name = package.repo.name
                repo_dir = os.path.join(self.repo_dir, arch, repo_name)
                files.append(os.path.join(repo_dir, file))
                self.local_repo.add_file_to_repo(os.path.join(pkgbuild_dir, file), repo_name, arch)
        if files and self.local_repo.scanned:
            # Refresh the in-memory index if it had already been scanned.
            # NOTE(review): calls self.scan() rather than
            # self.local_repo.scan() — confirm this is intended.
            self.scan(refresh=True)
        return files

    def check_package_version_built(self, package: Pkgbuild) -> bool:
        """Return True if every file `package` would produce already exists in the repo.

        Existing files are (re-)registered with the repo as a side effect.
        """
        native_chroot = setup_build_chroot(config.runtime['arch'])
        missing = False
        for line in package.get_pkg_filenames(self.arch, native_chroot):
            if not line:
                continue
            assert package.repo and package.repo.name
            file = os.path.join(self.repo_dir, self.arch, package.repo.name, os.path.basename(line))
            logging.debug(f'Checking if {file} is built')
            if os.path.exists(file):
                # NOTE(review): uses self.add_file_to_repo here but
                # self.local_repo.add_file_to_repo above — confirm which is right.
                self.add_file_to_repo(file, repo_name=package.repo.name, arch=self.arch)
            else:
                missing = True
        return not missing

View file

@ -1,6 +0,0 @@
from .local_repo import LocalRepo, get_repo
#from
class MirroredRepo:
local_repo: LocalRepo

View file

@ -5,44 +5,46 @@ import os
import subprocess
from copy import deepcopy
from typing import Any, Iterable, Optional
from typing import Any, Iterable, Optional, Sequence
from chroot.build import BuildChroot
from config import config
from constants import Arch, CHROOT_PATHS, MAKEPKG_CMD, CROSSDIRECT_PKGS, GCC_HOSTSPECS
from distro.package import PackageInfo
from distro.repo import Repo
from distro.abstract import PackageInfo
from .helpers import setup_build_chroot, get_makepkg_env
class Pkgbuild:
class Pkgbuild(PackageInfo):
name: str
version: str
arches: list[Arch]
depends: list[str]
provides: list[str]
replaces: list[str]
local_depends: list[str]
repo: Optional[Repo] = None
mode = ''
path = ''
pkgver = ''
pkgrel = ''
source_packages: dict[Arch, 'SourcePackage']
source_packages: dict[Arch, SourcePackage]
def __init__(
self,
relative_path: str,
repo: Repo,
arches: list[Arch] = [],
depends: list[str] = [],
provides: list[str] = [],
replaces: list[str] = [],
) -> None:
"""Create new Pkgbuild representation for file located at `relative_path/PKGBUILD`. `relative_path` will be written to `self.path`"""
self.name = os.path.basename(relative_path)
self.version = ''
self.repo = repo
self.path = relative_path
self.depends = deepcopy(depends)
self.provides = deepcopy(provides)
self.replaces = deepcopy(replaces)
self.arches = deepcopy(arches)
self.source_packages = {}
def __repr__(self):
@ -51,10 +53,6 @@ class Pkgbuild:
def names(self):
    """Return every name this package answers to: its own name plus all provides/replaces entries (deduplicated)."""
    aliases = {self.name}
    aliases.update(self.provides)
    aliases.update(self.replaces)
    return list(aliases)
def getPackageInfo(self, arch: Arch, ext='zst'):
assert self.name and self.version and self.pkgver and self.pkgrel
return PackageInfo(name=self.name, version=self.version, arch=arch)
def get_pkg_filenames(self, arch: Arch, native_chroot: BuildChroot) -> Iterable[str]:
config_path = '/' + native_chroot.write_makepkg_conf(
target_arch=arch,
@ -203,14 +201,34 @@ class Pkgbuild:
class Pkgbase(Pkgbuild):
subpackages: list[Pkgbuild]
subpackages: Sequence[SubPkgbuild]
def __init__(self, relative_path: str, subpackages: list[Pkgbuild] = [], **args):
self.subpackages = deepcopy(subpackages)
def __init__(self, relative_path: str, subpackages: Sequence[SubPkgbuild] = [], **args):
self.subpackages = list(subpackages)
super().__init__(relative_path, **args)
def parse_pkgbuild(relative_pkg_dir: str, native_chroot: BuildChroot) -> list[Pkgbuild]:
class SubPkgbuild(Pkgbuild):
    """One split package declared inside a Pkgbase's PKGBUILD."""
    pkgbase: Pkgbase

    def __init__(self, name: str, pkgbase: Pkgbase):
        self.name = name
        self.pkgbase = pkgbase
        # Inherit the base section's values as *independent* copies.
        # The original WIP code aliased `pkgbase.arches` directly, so a later
        # `current.arches.append(...)` in the parser would mutate the base and
        # every sibling subpackage; it also dropped the base's default
        # depends/provides/replaces, which the replaced deepcopy(base_package)
        # code path used to inherit.
        self.depends = deepcopy(pkgbase.depends)
        self.provides = deepcopy(pkgbase.provides)
        self.replaces = deepcopy(pkgbase.replaces)
        self.local_depends = []
        self.arches = deepcopy(pkgbase.arches)
        self.version = pkgbase.version
        self.mode = pkgbase.mode
        self.path = pkgbase.path
        self.pkgver = pkgbase.pkgver
        self.pkgrel = pkgbase.pkgrel
        # Fixed from WIP: without this, the attribute only existed as a bare
        # class annotation and any access raised AttributeError
        # (super().__init__() is intentionally not called here).
        self.source_packages = {}
def parse_pkgbuild(relative_pkg_dir: str, native_chroot: BuildChroot) -> Sequence[Pkgbuild]:
mode = None
with open(os.path.join(native_chroot.get_path(CHROOT_PATHS['pkgbuilds']), relative_pkg_dir, 'PKGBUILD'), 'r') as file:
for line in file.read().split('\n'):
@ -232,7 +250,7 @@ def parse_pkgbuild(relative_pkg_dir: str, native_chroot: BuildChroot) -> list[Pk
assert (isinstance(srcinfo, subprocess.CompletedProcess))
lines = srcinfo.stdout.decode('utf-8').split('\n')
current = base_package
current: Pkgbuild = base_package
multi_pkgs = False
for line_raw in lines:
line = line_raw.strip()
@ -244,24 +262,28 @@ def parse_pkgbuild(relative_pkg_dir: str, native_chroot: BuildChroot) -> list[Pk
multi_pkgs = True
elif line.startswith('pkgname'):
if multi_pkgs:
if current is not base_package:
current = SubPkgbuild(splits[1], base_package)
assert isinstance(base_package.subpackages, list)
base_package.subpackages.append(current)
current = deepcopy(base_package)
else:
current.name = splits[1]
elif line.startswith('pkgver'):
current.pkgver = splits[1]
elif line.startswith('pkgrel'):
current.pkgrel = splits[1]
elif line.startswith('arch'):
current.arches.append(splits[1])
elif line.startswith('provides'):
current.provides.append(splits[1])
elif line.startswith('replaces'):
current.replaces.append(splits[1])
elif line.startswith('depends') or line.startswith('makedepends') or line.startswith('checkdepends') or line.startswith('optdepends'):
current.depends.append(splits[1].split('=')[0].split(': ')[0])
current.depends = list(set(current.depends))
results = base_package.subpackages or [base_package]
results: Sequence[Pkgbuild] = list(base_package.subpackages) or [base_package]
for pkg in results:
assert isinstance(pkg, Pkgbuild)
pkg.depends = list(set(pkg.depends))
pkg.update_version()
if not (pkg.pkgver == base_package.pkgver and pkg.pkgrel == base_package.pkgrel):
raise Exception('subpackage malformed! pkgver differs!')
@ -286,7 +308,6 @@ class SourcePackage(PackageInfo):
self.path = self.pkgbuild.path
self.pkgbuild.update_version()
self.version = self.pkgbuild.version
self.version = self.pkgbuild.version
def acquire(self):
return os.path.join(self.pkgbuild.build(arch=self.arch), self.get_filename())

76
packages/source_distro.py Normal file
View file

@ -0,0 +1,76 @@
import logging
from typing import Iterable
from constants import Arch
from distro.abstract import DistroInfo
from .source_repo import SourceRepo, Pkgbuild
class SourceDistro(DistroInfo):
repos: dict[str, SourceRepo]
def build_package_levels(
    self,
    build_levels: list[set[Pkgbuild]],
    arch: Arch,
    force: bool = False,
    enable_crosscompile: bool = True,
    enable_crossdirect: bool = True,
    enable_ccache: bool = True,
    clean_chroot: bool = False,
):
    """Build the given dependency levels in order, one package at a time.

    Each entry of `build_levels` is a set of Pkgbuilds that can be built once
    all earlier levels are done.
    """
    # NOTE(review): `force` is accepted for interface parity but not used
    # here — confirm whether it should be forwarded to Pkgbuild.build().
    for level_index, level_pkgs in enumerate(build_levels):
        logging.info(f"(Level {level_index}) Building {', '.join([x.name for x in level_pkgs])}")
        for pkgbuild in level_pkgs:
            pkgbuild.build(
                arch=arch,
                enable_crosscompile=enable_crosscompile,
                enable_crossdirect=enable_crossdirect,
                enable_ccache=enable_ccache,
                clean_chroot=clean_chroot,
            )
def build_packages(
    self,
    packages: Iterable[Pkgbuild],
    arch: Arch,
    force: bool = False,
    enable_crosscompile: bool = True,
    enable_crossdirect: bool = True,
    enable_ccache: bool = True,
    clean_chroot: bool = False,
):
    """Build `packages` for `arch` as a single flat dependency level."""
    # Wrap everything into one level; ordering is then up to the caller.
    single_level = [set(packages)]
    self.build_package_levels(
        single_level,
        arch=arch,
        force=force,
        enable_crosscompile=enable_crosscompile,
        enable_crossdirect=enable_crossdirect,
        enable_ccache=enable_ccache,
        clean_chroot=clean_chroot,
    )
def build_packages_by_paths(
    self,
    paths: Iterable[str],
    arch: Arch,
    force=False,
    enable_crosscompile: bool = True,
    enable_crossdirect: bool = True,
    enable_ccache: bool = True,
    clean_chroot: bool = False,
):
    """Resolve PKGBUILD directory paths to packages and build them for `arch`."""
    # Accept a single path string as convenience shorthand.
    path_list = [paths] if isinstance(paths, str) else paths
    matched = self.filter_packages_by_paths(path_list, allow_empty_results=False)
    return self.build_packages(
        matched,
        arch,
        force=force,
        enable_crosscompile=enable_crosscompile,
        enable_crossdirect=enable_crossdirect,
        enable_ccache=enable_ccache,
        clean_chroot=clean_chroot,
    )