Compare commits

..

6 Commits

Author SHA1 Message Date
InsanePrawn
d274ec0f8b WIP checkin 5 or smth 2022-03-13 22:17:17 +01:00
InsanePrawn
af067b2cfa WIP 4 2022-03-06 17:11:32 +01:00
InsanePrawn
2f3005e446 dirty commit #3 2022-03-06 17:09:51 +01:00
InsanePrawn
e57d3d09ea WIP commit #2 2022-03-06 17:09:51 +01:00
InsanePrawn
5488b39a4c WIP checkin 2022-03-06 17:09:49 +01:00
InsanePrawn
9269a68e74 distro: add version.py for version-comparison by pacman rules 2022-03-06 17:09:14 +01:00
30 changed files with 1663 additions and 760 deletions

View File

@@ -3,11 +3,11 @@
Kupfer Linux bootstrapping tool - drives pacstrap, makepkg, mkfs and fastboot, just to name a few.
## Installation
Install Docker, Python 3 with libraries `click`, `appdirs`, `joblib`, `toml`, `typing_extentions`, and `coloredlogs` and put `bin/` into your `PATH`.
Install Docker, Python 3 with libraries `click`, `appdirs`, `joblib`, `toml` and put `bin/` into your `PATH`.
Then use `kupferbootstrap`.
## Usage
1. Initialize config with defaults: `kupferbootstrap config init -N`
1. Initialise config with defaults: `kupferbootstrap config init -N`
1. Configure your device profile: `kupferbootstrap config profile init`
1. Build an image and packages along the way: `kupferbootstrap image build`

View File

@@ -8,6 +8,7 @@ from wrapper import enforce_wrap
from .abstract import Chroot
from .base import get_base_chroot
from .build import get_build_chroot, BuildChroot
# from .device import get_device_chroot, DeviceChroot
from .helpers import get_chroot_path
# export Chroot class

View File

@@ -5,7 +5,6 @@ import subprocess
from copy import deepcopy
from shlex import quote as shell_quote
from typing import Protocol, Union, Optional, Mapping
from uuid import uuid4
from config import config
from constants import Arch, CHROOT_PATHS
@@ -21,11 +20,11 @@ class AbstractChroot(Protocol):
arch: Arch
path: str
copy_base: bool
initialized: bool
active: bool
initialized: bool = False
active: bool = False
active_mounts: list[str]
extra_repos: Mapping[str, RepoInfo]
base_packages: list[str]
base_packages: list[str] = ['base']
def __init__(
self,
@@ -89,12 +88,9 @@ class Chroot(AbstractChroot):
base_packages: list[str] = ['base', 'base-devel', 'git'],
path_override: str = None,
):
self.uuid = uuid4()
if copy_base is None:
logging.debug(f'{name}: copy_base is none!')
copy_base = (name == base_chroot_name(arch))
self.active = False
self.initialized = False
self.active_mounts = list[str]()
self.name = name
self.arch = arch
@@ -116,12 +112,11 @@ class Chroot(AbstractChroot):
if self.initialized and not reset:
# chroot must have been initialized already!
if fail_if_initialized:
raise Exception(f"Chroot {self.name} ({self.uuid}) is already initialized, this seems like a bug")
logging.debug(f"Base chroot {self.name} ({self.uuid}) already initialized")
raise Exception(f"Chroot {self.name} is already initialized, this seems like a bug")
return
active_previously = self.active
self.deactivate(fail_if_inactive=False, ignore_rootfs=True)
self.deactivate_core()
self.create_rootfs(reset, pacman_conf_target, active_previously)
@@ -204,11 +199,11 @@ class Chroot(AbstractChroot):
# additional mounts like crossdirect are intentionally left intact. Is such a chroot still `active` afterwards?
self.active = False
def deactivate(self, fail_if_inactive: bool = False, ignore_rootfs: bool = False):
def deactivate(self, fail_if_inactive: bool = False):
if not self.active:
if fail_if_inactive:
raise Exception(f"Chroot {self.name} not activated, can't deactivate!")
self.umount_many([mnt for mnt in self.active_mounts if mnt not in ['/', '/boot'] or not ignore_rootfs])
self.umount_many(self.active_mounts)
self.active = False
def run_cmd(
@@ -352,13 +347,10 @@ def get_chroot(
) -> Chroot:
global chroots
if default and name not in chroots:
logging.debug(f'Adding chroot {name} to chroot map: {default.uuid}')
logging.debug(f'Adding chroot {name} to chroot map')
chroots[name] = default
else:
existing = chroots[name]
if fail_if_exists:
raise Exception(f'chroot {name} already exists: {existing.uuid}')
logging.debug(f"returning existing chroot {name}: {existing.uuid}")
elif fail_if_exists:
raise Exception(f'chroot {name} already exists')
chroot = chroots[name]
if extra_repos is not None:
chroot.extra_repos = dict(extra_repos) # copy to new dict

View File

@@ -24,12 +24,20 @@ class BuildChroot(Chroot):
raise Exception('base_chroot == self, bailing out. this is a bug')
base_chroot.initialize()
logging.info(f'Copying {base_chroot.name} chroot to {self.name}')
cmd = ['rsync', '-a', '--delete', '-q', '-W', '-x']
for mountpoint in CHROOT_PATHS.values():
cmd += ['--exclude', mountpoint.rstrip('/')]
cmd += [f'{base_chroot.path}/', f'{self.path}/']
logging.debug(f"running rsync: {cmd}")
result = subprocess.run(cmd)
result = subprocess.run([
'rsync',
'-a',
'--delete',
'-q',
'-W',
'-x',
'--exclude',
CHROOT_PATHS['pkgbuilds'].strip('/'),
'--exclude',
CHROOT_PATHS['packages'].strip('/'),
f'{base_chroot.path}/',
f'{self.path}/',
])
if result.returncode != 0:
raise Exception(f'Failed to copy {base_chroot.name} to {self.name}')

View File

@@ -6,8 +6,6 @@ import logging
from copy import deepcopy
from typing import Optional, Union, TypedDict, Any, Mapping
from constants import DEFAULT_PACKAGE_BRANCH
CONFIG_DIR = appdirs.user_config_dir('kupfer')
CACHE_DIR = appdirs.user_cache_dir('kupfer')
@@ -53,11 +51,10 @@ CONFIG_DEFAULTS: dict = {
},
'pkgbuilds': {
'git_repo': 'https://gitlab.com/kupfer/packages/pkgbuilds.git',
'git_branch': DEFAULT_PACKAGE_BRANCH,
'git_branch': 'dev',
},
'pacman': {
'parallel_downloads': 4,
'repo_branch': DEFAULT_PACKAGE_BRANCH,
},
'paths': {
'cache_dir': CACHE_DIR,
@@ -147,9 +144,9 @@ def resolve_profile(
# now init missing keys
for key, value in PROFILE_DEFAULTS.items():
if key not in full.keys():
full[key] = None # type: ignore[literal-required]
full[key] = None # type: ignore[misc]
if type(value) == list:
full[key] = [] # type: ignore[literal-required]
full[key] = [] # type: ignore[misc]
full['size_extra_mb'] = int(full['size_extra_mb'] or 0)

View File

@@ -58,7 +58,7 @@ FLAVOURS: dict[str, Flavour] = {
'phosh': {
'packages': [
'phosh',
'phosh-osk-stub', # temporary replacement for 'squeekboard',
# 'squeekboard', #temporarily disabled
'gnome-control-center',
'gnome-software',
'gnome-software-packagekit-plugin',
@@ -82,8 +82,7 @@ REPOSITORIES = [
'phosh',
]
DEFAULT_PACKAGE_BRANCH = 'dev'
KUPFER_HTTPS = 'https://gitlab.com/kupfer/packages/prebuilts/-/raw/%branch%/$arch/$repo'
KUPFER_HTTPS = 'https://gitlab.com/kupfer/packages/prebuilts/-/raw/main/$repo'
Arch: TypeAlias = str
ARCHES = [

142
distro/abstract.py Normal file
View File

@@ -0,0 +1,142 @@
from copy import deepcopy
from typing import Optional, Mapping, ChainMap, Any
from .version import compare_package_versions
class PackageInfo:
name: str
version: str
_filename: Optional[str]
depends: list[str]
provides: list[str]
replaces: list[str]
def __init__(self, name: str, version: str, filename: str = None):
self.name = name
self.version = version
self._filename = filename
self.depends = []
self.provides = []
self.replaces = []
def __repr__(self):
return f'{self.name}@{self.version}'
def compare_version(self, other: str) -> int:
"""Returns -1 if `other` is newer than `self`, 0 if `self == other`, 1 if `self` is newer than `other`"""
return compare_package_versions(self.version, other)
def get_filename(self, ext='.zst') -> str:
assert self._filename
return self._filename
def acquire(self) -> Optional[str]:
"""
Acquires the package through either build or download.
Returns the downloaded file's path.
"""
raise NotImplementedError()
def is_remote(self) -> bool:
raise NotImplementedError()
class RepoSearchResult:
"""Repo search results split along qualifier. Truthy value is calculated on whether all members are empty"""
exact_name: list[PackageInfo]
provides: list[PackageInfo]
replaces: list[PackageInfo]
def __init__(self):
self.exact_name = []
self.provides = []
self.replaces = []
def __bool__(self):
return self.exact_name and self.provides and self.replaces
ResultSource = Any
ResultSources = Mapping[ResultSource, RepoSearchResult]
class MergedResults:
results: ResultSources
exact_name: Mapping[PackageInfo, ResultSource]
replaces: Mapping[PackageInfo, ResultSource]
provides: Mapping[PackageInfo, ResultSource]
def __init__(self, sources: ResultSources = {}):
self.results = {}
self.update(sources)
def update(self, additional_sources: ResultSources = {}):
assert isinstance(self.results, dict)
self.results.update(additional_sources)
self.exact_name = {}
self.replaces = {}
self.provides = {}
for source, results in self.results.items():
for source_category, target_category in [
(results.exact_name, self.exact_name),
(results.replaces, self.replaces),
(results.provides, self.provides),
]:
for pkg in source_category:
target_category[pkg] = source
class RepoInfo:
name: str
options: dict[str, str] = {}
url_template: str
packages: dict[str, PackageInfo]
remote: bool
def __init__(self, name: str, url_template: str, options: dict[str, str] = {}):
self.name = name
self.url_template = url_template
self.options = deepcopy(options)
self.remote = not url_template.startswith('file://')
def acquire_package(self, package: PackageInfo) -> Optional[str]:
if package not in self.packages.values():
raise NotImplementedError(f'Package {package} did not come from our repo')
return package.acquire()
def config_snippet(self) -> str:
options = {'Server': self.url_template} | self.options
return ('[%s]\n' % self.name) + '\n'.join([f"{key} = {value}" for key, value in options.items()])
def scan(self, refresh: bool = False):
pass
def get_providers(self, name: str) -> RepoSearchResult:
results = RepoSearchResult()
for package in self.packages.values():
if name == package.name:
results.exact_name.append(package)
if name in package.provides:
results.provides.append(package)
if name in package.replaces:
results.replaces.append(package)
return results
class DistroInfo:
repos: Mapping[str, RepoInfo]
def get_packages(self) -> Mapping[str, PackageInfo]:
""" get packages from all repos, semantically overlaying them"""
# results = {}
# for repo in list(self.repos.values())[::-1]: # TODO: figure if the list even needs to be reversed
# assert repo.packages is not None
# for package in repo.packages.values():
# results[package.name] = package
# return results
return ChainMap[str, PackageInfo](*[repo.packages for repo in list(self.repos.values())])
def get_providers(self, name: str, allow_empty: bool = False) -> MergedResults:
"""Returns a mapping from repo.name to RepoSearchResult"""
return MergedResults({name: repo.get_providers(name) for name, repo in list(self.repos.items())})

View File

@@ -1,18 +1,18 @@
from typing import Optional, Mapping
from constants import ARCHES, BASE_DISTROS, REPOSITORIES, KUPFER_HTTPS, CHROOT_PATHS
from constants import Arch, ARCHES, BASE_DISTROS, REPOSITORIES, KUPFER_HTTPS, CHROOT_PATHS
from generator import generate_pacman_conf_body
from config import config
from .package import PackageInfo
from .repo import RepoInfo, Repo
from .abstract import RepoInfo, DistroInfo
from .repo import Repo
class Distro:
repos: Mapping[str, Repo]
class Distro(DistroInfo):
arch: str
repos: Mapping[str, Repo]
def __init__(self, arch: str, repo_infos: dict[str, RepoInfo], scan=False):
def __init__(self, arch: str, repo_infos: Mapping[str, RepoInfo], scan=False):
assert (arch in ARCHES)
self.arch = arch
self.repos = dict[str, Repo]()
@@ -25,17 +25,9 @@ class Distro:
scan=scan,
)
def get_packages(self):
""" get packages from all repos, semantically overlaying them"""
results = dict[str, PackageInfo]()
for repo in self.repos.values().reverse():
assert (repo.packages is not None)
for package in repo.packages:
results[package.name] = package
def repos_config_snippet(self, extra_repos: Mapping[str, RepoInfo] = {}) -> str:
extras = [Repo(name, url_template=info.url_template, arch=self.arch, options=info.options, scan=False) for name, info in extra_repos.items()]
return '\n\n'.join(repo.config_snippet() for repo in (extras + list(self.repos.values())))
return '\n\n'.join(repo.config_snippet() for repo in (list(self.repos.values()) + extras))
def get_pacman_conf(self, extra_repos: Mapping[str, RepoInfo] = {}, check_space: bool = True):
body = generate_pacman_conf_body(self.arch, check_space=check_space)
@@ -43,23 +35,36 @@ class Distro:
def get_base_distro(arch: str) -> Distro:
repos = {name: RepoInfo(url_template=url) for name, url in BASE_DISTROS[arch]['repos'].items()}
repos = {name: RepoInfo(name, url_template=url) for name, url in BASE_DISTROS[arch]['repos'].items()}
return Distro(arch=arch, repo_infos=repos, scan=False)
def get_kupfer(arch: str, url_template: str) -> Distro:
repos = {name: RepoInfo(url_template=url_template, options={'SigLevel': 'Never'}) for name in REPOSITORIES}
repos: Mapping[str, Repo] = {name: Repo(name, url_template=url_template, arch=arch, options={'SigLevel': 'Never'}) for name in REPOSITORIES}
return Distro(
arch=arch,
repo_infos=repos,
)
def get_kupfer_https(arch: str) -> Distro:
return get_kupfer(arch, KUPFER_HTTPS.replace('%branch%', config.file['pacman']['repo_branch']))
kupfer_https: dict[Arch, Distro]
kupfer_local: dict[Arch, dict[bool, Distro]]
def get_kupfer_local(arch: Optional[str] = None, in_chroot: bool = True) -> Distro:
def get_kupfer_https(arch: Arch) -> Distro:
global kupfer_https
if arch not in kupfer_https or not kupfer_https[arch]:
kupfer_https[arch] = get_kupfer(arch, KUPFER_HTTPS)
return kupfer_https[arch]
def get_kupfer_local(arch: Optional[Arch] = None, in_chroot: bool = True) -> Distro:
global kupfer_local
arch = arch or config.runtime['arch']
dir = CHROOT_PATHS['packages'] if in_chroot else config.get_path('packages')
return get_kupfer(arch, f"file://{dir}/$arch/$repo")
if arch not in kupfer_local:
kupfer_local[arch] = {}
locals = kupfer_local[arch]
if in_chroot not in locals or not locals[in_chroot]:
locals[in_chroot] = get_kupfer(arch, f"file://{dir}/$arch/$repo")
return locals[in_chroot]

View File

@@ -1,33 +1,48 @@
from __future__ import annotations
from typing import Optional
import logging
from constants import Arch
from .abstract import PackageInfo
class PackageInfo:
name: str
version: str
filename: str
resolved_url: Optional[str]
class Package(PackageInfo):
arch: Arch
resolved_url: Optional[str] = None
repo_name: str
md5sum: Optional[str]
def __init__(
self,
name: str,
version: str,
filename: str,
resolved_url: str = None,
):
self.name = name
self.version = version
self.filename = filename
def __init__(self, arch: Arch, repo_name: str, *args, resolved_url: Optional[str] = None, **kwargs):
self.repo_name = repo_name
self.resolved_url = resolved_url
super().__init__(*args, **kwargs)
def __repr__(self):
return f'{self.name}@{self.version}'
def get_filename(self, ext='.zst') -> str:
return self._filename or f'{self.name}-{self.version}-{self.arch}.pkg.tar{ext}'
def is_remote(self) -> bool:
return bool(self.resolved_url and not self.resolved_url.startswith('file://'))
@staticmethod
def parse_desc(desc_str: str, resolved_url=None):
"""Parses a desc file, returning a PackageInfo"""
def parse_desc(desc_str: str, repo_name: str, resolved_url=None) -> Package:
"""Parses a desc file, returning a Package"""
pruned_lines = ([line.strip() for line in desc_str.split('%') if line.strip()])
desc = {}
for key, value in zip(pruned_lines[0::2], pruned_lines[1::2]):
desc[key.strip()] = value.strip()
return PackageInfo(desc['NAME'], desc['VERSION'], desc['FILENAME'], resolved_url=resolved_url)
package = Package(name=desc['NAME'],
version=desc['VERSION'],
arch=desc['ARCH'],
filename=desc['FILENAME'],
resolved_url=resolved_url,
repo_name=repo_name)
package.md5sum = desc.get('MD5SUM', None)
return package
def split_version_str(version_str) -> tuple[str, str]:
pkgver, pkgrel = version_str.rsplit('-', maxsplit=1)
logging.debug('Split versions: pkgver: {pkgver}; pkgrel: {pkgrel}')
return pkgver, pkgrel

11
distro/remote/package.py Normal file
View File

@@ -0,0 +1,11 @@
from utils import download_file
from .package import Package
class RemotePackage(Package):
def acquire(self):
assert self.resolved_url
assert self.is_remote()
return download_file(f'{self.resolved_url}/{self.get_filename()}')

View File

@@ -1,13 +1,13 @@
from copy import deepcopy
from io import BufferedReader
import logging
import os
import tarfile
import tempfile
import urllib.request
from config import config
from utils import download_file
from .package import PackageInfo
from .abstract import RepoInfo
from .package import Package
def resolve_url(url_template, repo_name: str, arch: str):
@@ -17,58 +17,44 @@ def resolve_url(url_template, repo_name: str, arch: str):
return result
class RepoInfo:
options: dict[str, str] = {}
url_template: str
def __init__(self, url_template: str, options: dict[str, str] = {}):
self.url_template = url_template
self.options.update(options)
class Repo(RepoInfo):
name: str
resolved_url: str
arch: str
packages: dict[str, PackageInfo]
remote: bool
scanned: bool = False
scanned: bool
def scan(self):
self.resolved_url = resolve_url(self.url_template, repo_name=self.name, arch=self.arch)
self.remote = not self.resolved_url.startswith('file://')
uri = f'{self.resolved_url}/{self.name}.db'
path = ''
if self.remote:
logging.debug(f'Downloading repo file from {uri}')
with urllib.request.urlopen(uri) as request:
fd, path = tempfile.mkstemp()
with open(fd, 'wb') as writable:
writable.write(request.read())
else:
path = uri.split('file://')[1]
logging.debug(f'Parsing repo file at {path}')
with tarfile.open(path) as index:
for node in index.getmembers():
if os.path.basename(node.name) == 'desc':
logging.debug(f'Parsing desc file for {os.path.dirname(node.name)}')
pkg = PackageInfo.parse_desc(index.extractfile(node).read().decode(), self.resolved_url)
self.packages[pkg.name] = pkg
self.scanned = True
def __init__(self, name: str, url_template: str, arch: str, options={}, scan=False):
def __init__(self, name: str, url_template: str, arch: str, options: dict[str, str] = {}, scan=False):
self.scanned = False
self.packages = {}
self.name = name
self.url_template = url_template
self.arch = arch
self.options = deepcopy(options)
super().__init__(name, url_template=url_template, options=options)
if scan:
self.scan()
def config_snippet(self) -> str:
options = {'Server': self.url_template} | self.options
return ('[%s]\n' % self.name) + '\n'.join([f"{key} = {value}" for key, value in options.items()])
def acquire_index(self) -> str:
"""[Download and] return local file path to repo .db file"""
self.resolved_url = resolve_url(self.url_template, repo_name=self.name, arch=self.arch)
self.remote = not self.resolved_url.startswith('file://')
uri = f'{self.resolved_url}/{self.name}.db'
if self.remote:
logging.debug(f'Downloading repo file from {uri}')
path = download_file(uri)
else:
path = uri.split('file://')[1]
return path
def get_RepoInfo(self):
return RepoInfo(url_template=self.url_template, options=self.options)
def scan(self, refresh: bool = False):
if refresh or not self.scanned:
path = self.acquire_index()
logging.debug(f'Parsing repo file at {path}')
with tarfile.open(path) as index:
for node in index.getmembers():
if os.path.basename(node.name) == 'desc':
logging.debug(f'Parsing desc file for {os.path.dirname(node.name)}')
with index.extractfile(node) as reader: # type: ignore
assert isinstance(reader, BufferedReader)
desc = reader.read().decode()
pkg = Package.parse_desc(desc, repo_name=self.name, resolved_url=self.resolved_url)
self.packages[pkg.name] = pkg
self.scanned = True

176
distro/version.py Normal file
View File

@@ -0,0 +1,176 @@
from enum import IntEnum
from typing import NamedTuple, Sequence, Union
# free-form python port of https://gitlab.archlinux.org/pacman/pacman/-/blob/master/lib/libalpm/version.c
Version = Union[str, int]
class VerComp(IntEnum):
RIGHT_NEWER = -1
EQUAL = 0
RIGHT_OLDER = 1
class EVR(NamedTuple):
epoch: int
version: str
release: int
subrelease: int
def parseEVR(input: str) -> EVR:
"""Parse `Epoch`, `Version` and `Release` from version-string `[Epoch:]Version[-Release]`"""
epoch = 0
version = ''
release = 1
subrelease = 0
rest = input
if ':' in rest:
split, rest = rest.split(':', maxsplit=1)
if split.isdigit():
epoch = int(split)
version = rest
if '-' in rest:
version, _release = rest.rsplit('-', maxsplit=1)
if _release.isnumeric():
release = int(_release)
else:
splits = _release.split('.')
assert len(splits) == 2
for split in splits:
assert split.isnumeric()
release, subrelease = (int(i) for i in splits)
return EVR(epoch, version, release, subrelease)
def int_compare(a: int, b: int) -> VerComp:
if b > a:
return VerComp.RIGHT_NEWER
if a > b:
return VerComp.RIGHT_OLDER
return VerComp.EQUAL
def rpm_version_compare(a: Version, b: Version) -> VerComp:
"""return -1: `b` is newer than `a`, 0: `a == b`, +1: `a` is newer than `b`"""
if a == b:
return VerComp.EQUAL
if isinstance(a, int) and isinstance(b, int):
return int_compare(a, b)
a = str(a)
b = str(b)
is_num: bool
one = 0
two = 0
offset1 = 0
offset2 = 0
def is_valid(index: int, sequence: Sequence) -> bool:
"""checks whether `index` is in range for `sequence`"""
return index < len(sequence)
def valid_one():
return is_valid(one, a)
def valid_two():
return is_valid(two, b)
# loop through each version segment of `a` and `b` and compare them
while valid_one() and valid_two():
while valid_one() and not a[one].isalnum():
one += 1
while valid_two() and not b[two].isalnum():
two += 1
# If we ran to the end of either, we are finished with the loop
if not (valid_one() and valid_two()):
break
# If the separator lengths were different, we are also finished
if (one - offset1) != (two - offset2):
return VerComp.RIGHT_NEWER if (one - offset1) < (two - offset2) else VerComp.RIGHT_OLDER
offset1 = one
offset2 = two
# grab first completely alpha or completely numeric segment
# leave `one` and `two` pointing to the start of the alpha or numeric
# segment and walk `offset1` and `offset2` to end of segment
if (a[offset1].isdigit()):
is_num = True
str_function = str.isdigit
else:
is_num = False
str_function = str.isalpha
while is_valid(offset1, a) and str_function(a[offset1]):
offset1 += 1
while is_valid(offset2, b) and str_function(b[offset2]):
offset2 += 1
# this cannot happen, as we previously tested to make sure that
# the first string has a non-empty segment
assert one != offset1
one_cut = a[one:offset1]
two_cut = b[two:offset2]
# take care of the case where the two version segments are
# different types: one numeric, the other alpha (i.e. empty)
# numeric segments are always newer than alpha segments
if two == offset2:
return VerComp.RIGHT_OLDER if is_num else VerComp.RIGHT_NEWER
if is_num:
# throw away any leading zeros - it's a number, right?
one_cut.lstrip('0')
two_cut.lstrip('0')
# whichever number has more digits wins
len_one, len_two = len(one_cut), len(two_cut)
if len_one != len_two:
return VerComp.RIGHT_OLDER if len_one > len_two else VerComp.RIGHT_NEWER
if two_cut > one_cut:
return VerComp.RIGHT_NEWER
if one_cut > two_cut:
return VerComp.RIGHT_OLDER
one = offset1
two = offset2
# this catches the case where all numeric and alpha segments have compared
# identically but the segment separating characters were different
if not valid_one() and not valid_two():
return VerComp.EQUAL
# the final showdown. we never want a remaining alpha string to beat an empty string.
# the logic is a bit weird, but:
# - if one is empty and two is not an alpha, two is newer.
# - if one is an alpha, two is newer.
# - otherwise one is newer.
if a[one].isalpha() or (not valid_one() and not b[two].isalpha()):
return VerComp.RIGHT_NEWER
else:
return VerComp.RIGHT_OLDER
def compare_package_versions(ver_a: str, ver_b: str) -> VerComp:
"""return -1: `b` is newer than `a`, 0: `a == b`, +1: `a` is newer than `b`"""
parsed_a, parsed_b = parseEVR(ver_a), parseEVR(ver_b)
for a, b in zip(parsed_a, parsed_b):
assert isinstance(a, (str, int))
assert isinstance(b, (str, int))
result = rpm_version_compare(a, b)
if result != VerComp.EQUAL:
return result
return VerComp.EQUAL

View File

@@ -18,9 +18,9 @@ ROOTFS = FLASH_PARTS['ROOTFS']
@click.command(name='flash')
@click.argument('what', type=click.Choice(list(FLASH_PARTS.values())))
@click.argument('location', type=str, required=False)
def cmd_flash(what: str, location: str):
"""Flash a partition onto a device. `location` takes either a path to a block device or one of emmc, sdcard"""
@click.argument('location', required=False, type=click.Choice(LOCATIONS))
def cmd_flash(what, location):
"""Flash a partition onto a device"""
enforce_wrap()
device, flavour = get_device_and_flavour()
device_image_name = get_image_name(device, flavour)

View File

@@ -13,7 +13,7 @@ from chroot.device import DeviceChroot, get_device_chroot
from constants import Arch, BASE_PACKAGES, DEVICES, FLAVOURS
from config import config, Profile
from distro.distro import get_base_distro, get_kupfer_https
from packages import build_enable_qemu_binfmt, discover_packages, build_packages
#from packages.local_repo import get_repo, LocalRepo
from ssh import copy_ssh_keys
from wrapper import enforce_wrap
@@ -23,7 +23,7 @@ IMG_FILE_BOOT_DEFAULT_SIZE = "90M"
def dd_image(input: str, output: str, blocksize='1M') -> CompletedProcess:
cmd = [
return subprocess.run([
'dd',
f'if={input}',
f'of={output}',
@@ -32,9 +32,7 @@ def dd_image(input: str, output: str, blocksize='1M') -> CompletedProcess:
'oflag=direct',
'status=progress',
'conv=sync,noerror',
]
logging.debug(f'running dd cmd: {cmd}')
return subprocess.run(cmd)
])
def partprobe(device: str):
@@ -355,11 +353,10 @@ def cmd_image():
@cmd_image.command(name='build')
@click.argument('profile_name', required=False)
@click.option('--local-repos/--no-local-repos', '-l/-L', default=True, help='Whether to use local packages. Defaults to true.')
@click.option('--build-pkgs/--no-build-pkgs', '-p/-P', default=True, help='Whether to build missing/outdated local packages. Defaults to true.')
@click.option('--build-pkgs/--no-build-pkgs', '-p/-P', default=True, help='Whether to build missing/outdated packages. Defaults to true.')
@click.option('--block-target', default=None, help='Override the block device file to target')
@click.option('--skip-part-images', default=False, help='Skip creating image files for the partitions and directly work on the target block device.')
def cmd_build(profile_name: str = None, local_repos: bool = True, build_pkgs: bool = True, block_target: str = None, skip_part_images: bool = False):
def cmd_build(profile_name: str = None, build_pkgs: bool = True, block_target: str = None, skip_part_images: bool = False):
"""Build a device image"""
enforce_wrap()
profile: Profile = config.get_profile(profile_name)
@@ -371,15 +368,16 @@ def cmd_build(profile_name: str = None, local_repos: bool = True, build_pkgs: bo
sector_size = 4096
rootfs_size_mb = FLAVOURS[flavour].get('size', 2) * 1000
repo = get_repo()
repo.init(arch)
repo.build_enable_qemu_binfmt(arch)
packages_dir = config.get_package_dir(arch)
use_local_repos = os.path.exists(os.path.join(packages_dir, 'main'))
packages = BASE_PACKAGES + DEVICES[device] + FLAVOURS[flavour]['packages'] + profile['pkgs_include']
if arch != config.runtime['arch']:
build_enable_qemu_binfmt(arch)
if local_repos and build_pkgs:
logging.info("Making sure all packages are built")
repo = discover_packages()
build_packages(repo, [p for name, p in repo.items() if name in packages], arch)
if build_pkgs:
repo.build_packages([p for name, p in repo.pkgbuilds.pkgbuilds.items() if name in packages], arch)
image_path = block_target or get_image_path(device, flavour)
@@ -415,7 +413,7 @@ def cmd_build(profile_name: str = None, local_repos: bool = True, build_pkgs: bo
flavour,
arch,
packages,
local_repos,
use_local_repos,
profile,
)
@@ -453,7 +451,7 @@ def cmd_inspect(profile: str = None, shell: bool = False):
chroot.activate()
if arch != config.runtime['arch']:
logging.info('Installing requisites for foreign-arch shell')
build_enable_qemu_binfmt(arch)
get_repo().build_enable_qemu_binfmt(arch)
logging.info('Starting inspection shell')
chroot.run_cmd('/bin/bash')
else:

View File

@@ -1,20 +1,15 @@
import click
import coloredlogs
import logging
import sys
def setup_logging(verbose: bool):
level_colors = coloredlogs.DEFAULT_LEVEL_STYLES | {'info': {'color': 'magenta', 'bright': True}, 'debug': {'color': 'blue', 'bright': True}}
field_colors = coloredlogs.DEFAULT_FIELD_STYLES | {'asctime': {'color': 'white', 'faint': True}}
level = logging.DEBUG if verbose else logging.INFO
coloredlogs.install(
logging.basicConfig(
stream=sys.stdout,
fmt='%(asctime)s %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
format='%(asctime)s %(levelname)s: %(message)s',
datefmt='%m/%d/%Y %H:%M:%S',
level=level,
level_styles=level_colors,
field_styles=field_colors,
)
logging.debug('Logging set up.')

View File

@@ -1,576 +1,43 @@
import click
import logging
import multiprocessing
import os
import shutil
import subprocess
from copy import deepcopy
from joblib import Parallel, delayed
from glob import glob
from shutil import rmtree
from typing import Iterable, Iterator, Any, Optional
from typing import Iterable, Optional
from constants import REPOSITORIES, CROSSDIRECT_PKGS, QEMU_BINFMT_PKGS, GCC_HOSTSPECS, ARCHES, Arch, CHROOT_PATHS, MAKEPKG_CMD
from config import config
from chroot.build import get_build_chroot, BuildChroot
from constants import REPOSITORIES, ARCHES, Arch
from ssh import run_ssh_command, scp_put_files
from wrapper import enforce_wrap
from utils import git
from binfmt import register as binfmt_register
from .pkgbuild import Pkgbuild, parse_pkgbuild
pacman_cmd = [
'pacman',
'-Syuu',
'--noconfirm',
'--overwrite=*',
'--needed',
]
# from .pkgbuild import Pkgbuild
#from .local_repo import get_local_repo
def get_makepkg_env():
# has to be a function because calls to `config` must be done after config file was read
threads = config.file['build']['threads'] or multiprocessing.cpu_count()
return {key: val for key, val in os.environ.items() if not key.split('_', maxsplit=1)[0] in ['CI', 'GITLAB', 'FF']} | {
'LANG': 'C',
'CARGO_BUILD_JOBS': str(threads),
'MAKEFLAGS': f"-j{threads}",
'QEMU_LD_PREFIX': '/usr/aarch64-unknown-linux-gnu',
}
def build(paths: Iterable[str], force: bool, arch: Optional[Arch]):
# TODO: arch = config.get_profile()...
arch = arch or 'aarch64'
def clone_pkbuilds(pkgbuilds_dir: str, repo_url: str, branch: str, interactive=False, update=True):
git_dir = os.path.join(pkgbuilds_dir, '.git')
if not os.path.exists(git_dir):
logging.info('Cloning branch {branch} from {repo}')
result = git(['clone', '-b', branch, repo_url, pkgbuilds_dir])
if result.returncode != 0:
raise Exception('Error cloning pkgbuilds')
else:
result = git(['--git-dir', git_dir, 'branch', '--show-current'], capture_output=True)
current_branch = result.stdout.decode().strip()
if current_branch != branch:
logging.warning(f'pkgbuilds repository is on the wrong branch: {current_branch}, requested: {branch}')
if interactive and click.confirm('Would you like to switch branches?', default=False):
result = git(['switch', branch], dir=pkgbuilds_dir)
if result.returncode != 0:
raise Exception('failed switching branches')
if update:
if interactive:
if not click.confirm('Would you like to try updating the PKGBUILDs repo?'):
return
result = git(['pull'], pkgbuilds_dir)
if result.returncode != 0:
raise Exception('failed to update pkgbuilds')
def init_pkgbuilds(interactive=False):
pkgbuilds_dir = config.get_path('pkgbuilds')
repo_url = config.file['pkgbuilds']['git_repo']
branch = config.file['pkgbuilds']['git_branch']
clone_pkbuilds(pkgbuilds_dir, repo_url, branch, interactive=interactive, update=False)
def init_prebuilts(arch: Arch, dir: str = None):
"""Ensure that all `constants.REPOSITORIES` inside `dir` exist"""
prebuilts_dir = dir if dir else config.get_package_dir(arch)
os.makedirs(prebuilts_dir, exist_ok=True)
for repo in REPOSITORIES:
os.makedirs(os.path.join(prebuilts_dir, repo), exist_ok=True)
for ext1 in ['db', 'files']:
for ext2 in ['', '.tar.xz']:
if not os.path.exists(os.path.join(prebuilts_dir, repo, f'{repo}.{ext1}{ext2}')):
result = subprocess.run(
[
'tar',
'-czf',
f'{repo}.{ext1}{ext2}',
'-T',
'/dev/null',
],
cwd=os.path.join(prebuilts_dir, repo),
)
if result.returncode != 0:
logging.fatal('Failed to create prebuilt repos')
exit(1)
def discover_packages(parallel: bool = True) -> dict[str, Pkgbuild]:
pkgbuilds_dir = config.get_path('pkgbuilds')
packages: dict[str, Pkgbuild] = {}
paths = []
init_pkgbuilds(interactive=False)
for repo in REPOSITORIES:
for dir in os.listdir(os.path.join(pkgbuilds_dir, repo)):
paths.append(os.path.join(repo, dir))
native_chroot = setup_build_chroot(config.runtime['arch'], add_kupfer_repos=False)
results = []
if parallel:
chunks = (Parallel(n_jobs=multiprocessing.cpu_count() * 4)(delayed(parse_pkgbuild)(path, native_chroot) for path in paths))
else:
chunks = (parse_pkgbuild(path, native_chroot) for path in paths)
for pkglist in chunks:
results += pkglist
logging.debug('Building package dictionary!')
for package in results:
for name in [package.name] + package.replaces:
if name in packages:
logging.warn(f'Overriding {packages[package.name]} with {package}')
packages[name] = package
# This filters the deps to only include the ones that are provided in this repo
for package in packages.values():
package.local_depends = package.depends.copy()
for dep in package.depends.copy():
found = dep in packages
for p in packages.values():
if found:
break
for name in p.names():
if dep == name:
logging.debug(f'Found {p.name} that provides {dep}')
found = True
break
if not found:
logging.debug(f'Removing {dep} from dependencies')
package.local_depends.remove(dep)
return packages
def filter_packages_by_paths(repo: dict[str, Pkgbuild], paths: Iterable[str], allow_empty_results=True) -> Iterable[Pkgbuild]:
if 'all' in paths:
return list(repo.values())
result = []
for pkg in repo.values():
if pkg.path in paths:
result += [pkg]
if not allow_empty_results and not result:
raise Exception('No packages matched by paths: ' + ', '.join([f'"{p}"' for p in paths]))
return result
def generate_dependency_chain(package_repo: dict[str, Pkgbuild], to_build: Iterable[Pkgbuild]) -> list[set[Pkgbuild]]:
"""
This figures out all dependencies and their sub-dependencies for the selection and adds those packages to the selection.
First the top-level packages get selected by searching the paths.
Then their dependencies and sub-dependencies and so on get added to the selection.
"""
visited = set[Pkgbuild]()
visited_names = set[str]()
dep_levels: list[set[Pkgbuild]] = [set(), set()]
def visit(package: Pkgbuild, visited=visited, visited_names=visited_names):
visited.add(package)
visited_names.update(package.names())
def join_levels(levels: list[set[Pkgbuild]]) -> dict[Pkgbuild, int]:
result = dict[Pkgbuild, int]()
for i, level in enumerate(levels):
for pkg in level:
result[pkg] = i
return result
def get_dependencies(package: Pkgbuild, package_repo: dict[str, Pkgbuild] = package_repo) -> Iterator[Pkgbuild]:
for dep_name in package.depends:
if dep_name in visited_names:
continue
elif dep_name in package_repo:
dep_pkg = package_repo[dep_name]
visit(dep_pkg)
yield dep_pkg
def get_recursive_dependencies(package: Pkgbuild, package_repo: dict[str, Pkgbuild] = package_repo) -> Iterator[Pkgbuild]:
for pkg in get_dependencies(package, package_repo):
yield pkg
for sub_pkg in get_recursive_dependencies(pkg, package_repo):
yield sub_pkg
logging.debug('Generating dependency chain:')
# init level 0
for package in to_build:
visit(package)
dep_levels[0].add(package)
logging.debug(f'Adding requested package {package.name}')
# add dependencies of our requested builds to level 0
for dep_pkg in get_recursive_dependencies(package):
logging.debug(f"Adding {package.name}'s dependency {dep_pkg.name} to level 0")
dep_levels[0].add(dep_pkg)
visit(dep_pkg)
"""
Starting with `level` = 0, iterate over the packages in `dep_levels[level]`:
1. Moving packages that are dependencies of other packages up to `level`+1
2. Adding yet unadded local dependencies of all pkgs on `level` to `level`+1
3. increment level
"""
level = 0
# protect against dependency cycles
repeat_count = 0
_last_level: Optional[set[Pkgbuild]] = None
while dep_levels[level]:
level_copy = dep_levels[level].copy()
modified = False
logging.debug(f'Scanning dependency level {level}')
if level > 100:
raise Exception('Dependency chain reached 100 levels depth, this is probably a bug. Aborting!')
for pkg in level_copy:
pkg_done = False
if pkg not in dep_levels[level]:
# pkg has been moved, move on
continue
# move pkg to level+1 if something else depends on it
for other_pkg in level_copy:
if pkg == other_pkg:
continue
if pkg_done:
break
if not issubclass(type(other_pkg), Pkgbuild):
raise Exception('Not a Pkgbuild object:' + repr(other_pkg))
for dep_name in other_pkg.depends:
if dep_name in pkg.names():
dep_levels[level].remove(pkg)
dep_levels[level + 1].add(pkg)
logging.debug(f'Moving {pkg.name} to level {level+1} because {other_pkg.name} depends on it as {dep_name}')
modified = True
pkg_done = True
break
for dep_name in pkg.depends:
if dep_name in visited_names:
continue
elif dep_name in package_repo:
dep_pkg = package_repo[dep_name]
logging.debug(f"Adding {pkg.name}'s dependency {dep_name} to level {level}")
dep_levels[level].add(dep_pkg)
visit(dep_pkg)
modified = True
if _last_level == dep_levels[level]:
repeat_count += 1
else:
repeat_count = 0
if repeat_count > 10:
raise Exception(f'Probable dependency cycle detected: Level has been passed on unmodifed multiple times: #{level}: {_last_level}')
_last_level = dep_levels[level].copy()
if not modified: # if the level was modified, make another pass.
level += 1
dep_levels.append(set[Pkgbuild]())
# reverse level list into buildorder (deps first!), prune empty levels
return list([lvl for lvl in dep_levels[::-1] if lvl])
def add_file_to_repo(file_path: str, repo_name: str, arch: Arch):
repo_dir = os.path.join(config.get_package_dir(arch), repo_name)
pacman_cache_dir = os.path.join(config.get_path('pacman'), arch)
file_name = os.path.basename(file_path)
target_file = os.path.join(repo_dir, file_name)
os.makedirs(repo_dir, exist_ok=True)
if file_path != target_file:
logging.debug(f'moving {file_path} to {target_file} ({repo_dir})')
shutil.copy(
file_path,
repo_dir,
)
os.unlink(file_path)
# clean up same name package from pacman cache
cache_file = os.path.join(pacman_cache_dir, file_name)
if os.path.exists(cache_file):
os.unlink(cache_file)
cmd = [
'repo-add',
'--remove',
os.path.join(
repo_dir,
f'{repo_name}.db.tar.xz',
),
target_file,
]
logging.debug(f'repo: running cmd: {cmd}')
result = subprocess.run(cmd)
if result.returncode != 0:
raise Exception(f'Failed add package {target_file} to repo {repo_name}')
for ext in ['db', 'files']:
file = os.path.join(repo_dir, f'{repo_name}.{ext}')
if os.path.exists(file + '.tar.xz'):
os.unlink(file)
shutil.copyfile(file + '.tar.xz', file)
old = file + '.tar.xz.old'
if os.path.exists(old):
os.unlink(old)
def add_package_to_repo(package: Pkgbuild, arch: Arch):
logging.info(f'Adding {package.path} to repo {package.repo}')
pkgbuild_dir = os.path.join(config.get_path('pkgbuilds'), package.path)
files = []
for file in os.listdir(pkgbuild_dir):
# Forced extension by makepkg.conf
if file.endswith('.pkg.tar.xz') or file.endswith('.pkg.tar.zst'):
repo_dir = os.path.join(config.get_package_dir(arch), package.repo)
files.append(os.path.join(repo_dir, file))
add_file_to_repo(os.path.join(pkgbuild_dir, file), package.repo, arch)
return files
def check_package_version_built(package: Pkgbuild, arch: Arch) -> bool:
native_chroot = setup_build_chroot(config.runtime['arch'])
config_path = '/' + native_chroot.write_makepkg_conf(
target_arch=arch,
cross_chroot_relative=os.path.join('chroot', arch),
cross=True,
)
cmd = ['cd', os.path.join(CHROOT_PATHS['pkgbuilds'], package.path), '&&'] + MAKEPKG_CMD + [
'--config',
config_path,
'--nobuild',
'--noprepare',
'--skippgpcheck',
'--packagelist',
]
result: Any = native_chroot.run_cmd(
cmd,
capture_output=True,
)
if result.returncode != 0:
raise Exception(f'Failed to get package list for {package.path}:' + '\n' + result.stdout.decode() + '\n' + result.stderr.decode())
missing = False
for line in result.stdout.decode('utf-8').split('\n'):
if line != "":
file = os.path.join(config.get_package_dir(arch), package.repo, os.path.basename(line))
logging.debug(f'Checking if {file} is built')
if os.path.exists(file):
add_file_to_repo(file, repo_name=package.repo, arch=arch)
else:
missing = True
return not missing
def setup_build_chroot(
arch: Arch,
extra_packages: list[str] = [],
add_kupfer_repos: bool = True,
clean_chroot: bool = False,
) -> BuildChroot:
init_prebuilts(arch)
chroot = get_build_chroot(arch, add_kupfer_repos=add_kupfer_repos)
chroot.mount_packages()
logging.info(f'Initializing {arch} build chroot')
chroot.initialize(reset=clean_chroot)
chroot.write_pacman_conf() # in case it was initialized with different repos
chroot.activate()
chroot.mount_pacman_cache()
chroot.mount_pkgbuilds()
if extra_packages:
chroot.try_install_packages(extra_packages, allow_fail=False)
return chroot
def setup_sources(package: Pkgbuild, chroot: BuildChroot, makepkg_conf_path='/etc/makepkg.conf', pkgbuilds_dir: str = None):
pkgbuilds_dir = pkgbuilds_dir if pkgbuilds_dir else CHROOT_PATHS['pkgbuilds']
makepkg_setup_args = [
'--config',
makepkg_conf_path,
'--nobuild',
'--holdver',
'--nodeps',
'--skippgpcheck',
]
logging.info(f'Setting up sources for {package.path} in {chroot.name}')
result = chroot.run_cmd(MAKEPKG_CMD + makepkg_setup_args, cwd=os.path.join(CHROOT_PATHS['pkgbuilds'], package.path))
assert isinstance(result, subprocess.CompletedProcess)
if result.returncode != 0:
raise Exception(f'Failed to check sources for {package.path}')
def build_package(
package: Pkgbuild,
arch: Arch,
repo_dir: str = None,
enable_crosscompile: bool = True,
enable_crossdirect: bool = True,
enable_ccache: bool = True,
clean_chroot: bool = False,
):
makepkg_compile_opts = ['--holdver']
makepkg_conf_path = 'etc/makepkg.conf'
repo_dir = repo_dir if repo_dir else config.get_path('pkgbuilds')
foreign_arch = config.runtime['arch'] != arch
deps = (list(set(package.depends) - set(package.names())))
target_chroot = setup_build_chroot(
arch=arch,
extra_packages=deps,
clean_chroot=clean_chroot,
)
native_chroot = target_chroot if not foreign_arch else setup_build_chroot(
arch=config.runtime['arch'],
extra_packages=['base-devel'] + CROSSDIRECT_PKGS,
clean_chroot=clean_chroot,
)
cross = foreign_arch and package.mode == 'cross' and enable_crosscompile
target_chroot.initialize()
if cross:
logging.info(f'Cross-compiling {package.path}')
build_root = native_chroot
makepkg_compile_opts += ['--nodeps']
env = deepcopy(get_makepkg_env())
if enable_ccache:
env['PATH'] = f"/usr/lib/ccache:{env['PATH']}"
logging.info('Setting up dependencies for cross-compilation')
# include crossdirect for ccache symlinks and qemu-user
results = native_chroot.try_install_packages(package.depends + CROSSDIRECT_PKGS + [f"{GCC_HOSTSPECS[native_chroot.arch][arch]}-gcc"])
res_crossdirect = results['crossdirect']
assert isinstance(res_crossdirect, subprocess.CompletedProcess)
if res_crossdirect.returncode != 0:
raise Exception('Unable to install crossdirect')
# mount foreign arch chroot inside native chroot
chroot_relative = os.path.join(CHROOT_PATHS['chroots'], target_chroot.name)
makepkg_path_absolute = native_chroot.write_makepkg_conf(target_arch=arch, cross_chroot_relative=chroot_relative, cross=True)
makepkg_conf_path = os.path.join('etc', os.path.basename(makepkg_path_absolute))
native_chroot.mount_crosscompile(target_chroot)
else:
logging.info(f'Host-compiling {package.path}')
build_root = target_chroot
makepkg_compile_opts += ['--syncdeps']
env = deepcopy(get_makepkg_env())
if foreign_arch and enable_crossdirect and package.name not in CROSSDIRECT_PKGS:
env['PATH'] = f"/native/usr/lib/crossdirect/{arch}:{env['PATH']}"
target_chroot.mount_crossdirect(native_chroot)
else:
if enable_ccache:
logging.debug('ccache enabled')
env['PATH'] = f"/usr/lib/ccache:{env['PATH']}"
deps += ['ccache']
logging.debug(('Building for native arch. ' if not foreign_arch else '') + 'Skipping crossdirect.')
dep_install = target_chroot.try_install_packages(deps, allow_fail=False)
failed_deps = [name for name, res in dep_install.items() if res.returncode != 0] # type: ignore[union-attr]
if failed_deps:
raise Exception(f'Dependencies failed to install: {failed_deps}')
makepkg_conf_absolute = os.path.join('/', makepkg_conf_path)
setup_sources(package, build_root, makepkg_conf_path=makepkg_conf_absolute)
build_cmd = f'makepkg --config {makepkg_conf_absolute} --skippgpcheck --needed --noconfirm --ignorearch {" ".join(makepkg_compile_opts)}'
logging.debug(f'Building: Running {build_cmd}')
result = build_root.run_cmd(build_cmd, inner_env=env, cwd=os.path.join(CHROOT_PATHS['pkgbuilds'], package.path))
assert isinstance(result, subprocess.CompletedProcess)
if result.returncode != 0:
raise Exception(f'Failed to compile package {package.path}')
def get_unbuilt_package_levels(repo: dict[str, Pkgbuild], packages: Iterable[Pkgbuild], arch: Arch, force: bool = False) -> list[set[Pkgbuild]]:
package_levels = generate_dependency_chain(repo, packages)
build_names = set[str]()
build_levels = list[set[Pkgbuild]]()
i = 0
for level_packages in package_levels:
level = set[Pkgbuild]()
for package in level_packages:
if ((not check_package_version_built(package, arch)) or set.intersection(set(package.depends), set(build_names)) or
(force and package in packages)):
level.add(package)
build_names.update(package.names())
if level:
build_levels.append(level)
logging.debug(f'Adding to level {i}:' + '\n' + ('\n'.join([p.name for p in level])))
i += 1
return build_levels
def build_packages(
repo: dict[str, Pkgbuild],
packages: Iterable[Pkgbuild],
arch: Arch,
force: bool = False,
enable_crosscompile: bool = True,
enable_crossdirect: bool = True,
enable_ccache: bool = True,
clean_chroot: bool = False,
):
build_levels = get_unbuilt_package_levels(repo, packages, arch, force=force)
if not build_levels:
logging.info('Everything built already')
return
files = []
for level, need_build in enumerate(build_levels):
logging.info(f"(Level {level}) Building {', '.join([x.name for x in need_build])}")
for package in need_build:
build_package(
package,
arch=arch,
enable_crosscompile=enable_crosscompile,
enable_crossdirect=enable_crossdirect,
enable_ccache=enable_ccache,
clean_chroot=clean_chroot,
)
files += add_package_to_repo(package, arch)
return files
def build_packages_by_paths(
paths: Iterable[str],
arch: Arch,
repo: dict[str, Pkgbuild],
force=False,
enable_crosscompile: bool = True,
enable_crossdirect: bool = True,
enable_ccache: bool = True,
clean_chroot: bool = False,
):
if isinstance(paths, str):
paths = [paths]
for _arch in set([arch, config.runtime['arch']]):
init_prebuilts(_arch)
packages = filter_packages_by_paths(repo, paths, allow_empty_results=False)
return build_packages(
repo,
packages,
arch,
force=force,
enable_crosscompile=enable_crosscompile,
enable_crossdirect=enable_crossdirect,
enable_ccache=enable_ccache,
clean_chroot=clean_chroot,
)
def build_enable_qemu_binfmt(arch: Arch, repo: dict[str, Pkgbuild] = None):
if arch not in ARCHES:
raise Exception(f'Unknown architecture "{arch}". Choices: {", ".join(ARCHES)}')
logging.info('Installing qemu-user (building if necessary)')
enforce_wrap()
if not repo:
repo = discover_packages()
native = config.runtime['arch']
# build qemu-user, binfmt, crossdirect
chroot = setup_build_chroot(native)
build_packages_by_paths(
['cross/' + pkg for pkg in CROSSDIRECT_PKGS],
native,
repo,
enable_crosscompile=False,
enable_crossdirect=False,
enable_ccache=False,
config.enforce_config_loaded()
local_repo = get_local_repo()
local_repo.init(arch)
# repo: dict[str, Pkgbuild] = local_repo.discover_packages()
if arch != config.runtime['arch']:
local_repo.build_enable_qemu_binfmt(arch)
return local_repo.pkgbuilds.build_packages_by_paths(
paths,
arch,
force=force,
enable_crosscompile=config.file['build']['crosscompile'],
enable_crossdirect=config.file['build']['crossdirect'],
enable_ccache=config.file['build']['ccache'],
clean_chroot=config.file['build']['clean_mode'],
)
subprocess.run(['pacman', '-Syy', '--noconfirm', '--needed', '--config', os.path.join(chroot.path, 'etc/pacman.conf')] + QEMU_BINFMT_PKGS)
if arch != native:
binfmt_register(arch)
@click.group(name='packages')
@@ -583,7 +50,7 @@ def cmd_packages():
def cmd_update(non_interactive: bool = False):
"""Update PKGBUILDs git repo"""
enforce_wrap()
init_pkgbuilds(interactive=not non_interactive)
get_local_repo().pkgbuilds.init(interactive=not non_interactive)
@cmd_packages.command(name='build')
@@ -601,30 +68,6 @@ def cmd_build(paths: list[str], force=False, arch=None):
build(paths, force, arch)
def build(paths: Iterable[str], force: bool, arch: Optional[Arch]):
# TODO: arch = config.get_profile()...
arch = arch or 'aarch64'
if arch not in ARCHES:
raise Exception(f'Unknown architecture "{arch}". Choices: {", ".join(ARCHES)}')
enforce_wrap()
config.enforce_config_loaded()
repo: dict[str, Pkgbuild] = discover_packages()
if arch != config.runtime['arch']:
build_enable_qemu_binfmt(arch, repo=repo)
return build_packages_by_paths(
paths,
arch,
repo,
force=force,
enable_crosscompile=config.file['build']['crosscompile'],
enable_crossdirect=config.file['build']['crossdirect'],
enable_ccache=config.file['build']['ccache'],
clean_chroot=config.file['build']['clean_mode'],
)
@cmd_packages.command(name='sideload')
@click.argument('paths', nargs=-1)
def cmd_sideload(paths: Iterable[str]):
@@ -647,14 +90,12 @@ def cmd_sideload(paths: Iterable[str]):
@click.option('-n', '--noop', is_flag=True, default=False, help="Print what would be removed but dont execute")
@click.argument('what', type=click.Choice(['all', 'src', 'pkg']), nargs=-1)
def cmd_clean(what: Iterable[str] = ['all'], force: bool = False, noop: bool = False):
"""Remove files and directories not tracked in PKGBUILDs.git. Passing in an empty `what` defaults it to `['all']`"""
"""Remove files and directories not tracked in PKGBUILDs.git"""
enforce_wrap()
if noop:
logging.debug('Running in noop mode!')
if force:
logging.debug('Running in FORCE mode!')
what = what or ['all']
logging.debug(f'Clearing {what} from PKGBUILDs')
pkgbuilds = config.get_path('pkgbuilds')
if 'all' in what:
warning = "Really reset PKGBUILDs to git state completely?\nThis will erase any untracked changes to your PKGBUILDs directory."
@@ -679,7 +120,7 @@ def cmd_clean(what: Iterable[str] = ['all'], force: bool = False, noop: bool = F
dirs += glob(os.path.join(pkgbuilds, '*', '*', loc))
dir_lines = '\n'.join(dirs)
verb = 'Would remove' if noop else 'Removing'
verb = 'Would remove' if noop or force else 'Removing'
logging.info(verb + ' directories:\n' + dir_lines)
if not (noop or force):
@@ -694,8 +135,9 @@ def cmd_clean(what: Iterable[str] = ['all'], force: bool = False, noop: bool = F
@cmd_packages.command(name='list')
def cmd_list():
enforce_wrap()
repo = get_local_repo()
logging.info('Discovering packages.')
packages = discover_packages()
packages = repo.discover_packages()
logging.info('Done! Pkgbuilds:')
for p in set(packages.values()):
print(
@@ -705,11 +147,12 @@ def cmd_list():
@cmd_packages.command(name='check')
@click.argument('paths', nargs=-1)
def cmd_check(paths):
def cmd_check(paths: list[str]):
"""Check that specified PKGBUILDs are formatted correctly"""
enforce_wrap()
paths = list(paths)
packages = filter_packages_by_paths(discover_packages(), paths, allow_empty_results=False)
repo = get_local_repo()
packages = repo.pkgbuilds.filter_packages_by_paths(paths, allow_empty_results=False)
for package in packages:
name = package.name
@@ -719,7 +162,7 @@ def cmd_check(paths):
is_git_package = True
required_arches = ''
provided_arches = []
provided_arches: list[str] = []
mode_key = '_mode'
pkgbase_key = 'pkgbase'

37
packages/helpers.py Normal file
View File

@@ -0,0 +1,37 @@
import logging
import os
import multiprocessing
from config import config
from chroot.build import get_build_chroot, BuildChroot
from constants import Arch
def setup_build_chroot(
arch: Arch,
extra_packages: list[str] = [],
add_kupfer_repos: bool = True,
clean_chroot: bool = False,
) -> BuildChroot:
chroot = get_build_chroot(arch, add_kupfer_repos=add_kupfer_repos)
chroot.mount_packages()
logging.info(f'Initializing {arch} build chroot')
chroot.initialize(reset=clean_chroot)
chroot.write_pacman_conf() # in case it was initialized with different repos
chroot.activate()
chroot.mount_pacman_cache()
chroot.mount_pkgbuilds()
if extra_packages:
chroot.try_install_packages(extra_packages, allow_fail=False)
return chroot
def get_makepkg_env():
# has to be a function because calls to `config` must be done after config file was read
threads = config.file['build']['threads'] or multiprocessing.cpu_count()
return os.environ.copy() | {
'LANG': 'C',
'CARGO_BUILD_JOBS': str(threads),
'MAKEFLAGS': f"-j{threads}",
'QEMU_LD_PREFIX': '/usr/aarch64-unknown-linux-gnu',
}

34
packages/local_distro.py Normal file
View File

@@ -0,0 +1,34 @@
import logging
import subprocess
import os
from typing import Optional
from binfmt import register as binfmt_register
from config import config
from chroot.build import setup_build_chroot
from constants import Arch, ARCHES, QEMU_BINFMT_PKGS, REPOSITORIES
from wrapper import enforce_wrap
from distro.distro import Distro
from .local_repo import LocalRepo
class LocalDistro(Distro):
pass
_local_distros = dict[Arch, LocalDistro]()
def get_local_distro(arch: Arch, repo_names: list[str] = REPOSITORIES) -> LocalDistro:
global _local_distros
if arch not in _local_distros or not _local_distros[arch]:
repos = dict[str, LocalRepo]()
for name in repo_names:
repos[name] = LocalRepo(name, arch)
_local_distros[arch] = LocalDistro(arch, repos)
return _local_distros[arch]
def get_local_distro_flat(arch: Arch, flat_repo_name: str = "local"):
return get_local_distro(arch, [flat_repo_name])

28
packages/local_package.py Normal file
View File

@@ -0,0 +1,28 @@
import os
from typing import Optional
from config import config
from distro.package import RemotePackage
from .pkgbuild import SourcePackage
#from .pkgbuild import Pkgbuild
class LocalPackage(RemotePackage):
source_package: SourcePackage
remote_package: RemotePackage
local_package: RemotePackage
def __init__(self, source_package: SourcePackage, remote_package: Optional[RemotePackage], local_package: Optional[RemotePackage]):
self.source_package = source_package
self.remote_package = remote_package
def acquire(self) -> Optional[str]:
file_name = self.get_filename()
assert file_name
file_path = os.path.join(config.get_package_dir(self.arch), self.repo_name, file_name)
if os.path.exists(file_path):
return file_path
# not found: invalidate version
self.version = None
return None

121
packages/local_repo.py Normal file
View File

@@ -0,0 +1,121 @@
import logging
import os
import shutil
import subprocess
from typing import Optional
from config import config
from constants import Arch, CHROOT_PATHS
from distro.repo import Repo
from distro.abstract import PackageInfo
from utils import md5sum_file
from .pkgbuild import Pkgbuild, Pkgbase, SubPkgbuild
class LocalRepo(Repo):
initialized: bool = False
repo_dir: str
def __init__(self, name: str, arch: Arch, repo_dir: Optional[str] = None, options: dict[str, str] = {'SigLevel': 'Never'}, scan=False):
self.repo_dir = repo_dir or config.get_path('packages')
self.full_path = os.path.join(self.repo_dir, arch, name)
super().__init__(name=name, url_template=f'file://{CHROOT_PATHS["packages"]}/$arch/$repo', arch=arch, options=options, scan=scan)
def init(self):
"""Create repo database files"""
if not self.initialized:
repo = self.name
repo_dir = os.path.join(self.repo_dir, self.arch, repo)
os.makedirs(repo_dir, exist_ok=True)
for ext1 in ['db', 'files']:
for ext2 in ['', '.tar.xz']:
if not os.path.exists(os.path.join(repo_dir, f'{repo}.{ext1}{ext2}')):
result = subprocess.run(
[
'tar',
'-czf',
f'{repo}.{ext1}{ext2}',
'-T',
'/dev/null',
],
cwd=repo_dir,
)
if result.returncode != 0:
raise Exception('Failed to create prebuilt repos')
self.initialized = True
def scan(self, refresh: bool = False):
if not self.initialized:
self.init()
super().scan(refresh=refresh)
def copy_file_to_repo(self, file_path: str) -> str:
file_name = os.path.basename(file_path)
repo_dir = self.full_path
target_file = os.path.join(repo_dir, file_name)
os.makedirs(repo_dir, exist_ok=True)
md5sum = md5sum_file(file_path)
if file_path != target_file:
if md5sum_file(target_file) != md5sum:
logging.debug(f'moving {file_path} to {target_file} ({repo_dir})')
shutil.copy(
file_path,
repo_dir,
)
else:
logging.warning('Exact package file (confirmed by hash) was already in the repo. Skipped and deleted.')
os.unlink(file_path)
return os.path.join(repo_dir, file_name), md5sum
def run_repo_add(self, target_file: str):
cmd = [
'repo-add',
'--remove',
os.path.join(
self.full_path,
f'{self.name}.db.tar.xz',
),
target_file,
]
logging.debug(f'repo: running cmd: {cmd}')
result = subprocess.run(cmd)
if result.returncode != 0:
raise Exception(f'Failed to add package {target_file} to repo {self.name}')
for ext in ['db', 'files']:
file = os.path.join(self.full_path, f'{self.name}.{ext}')
if os.path.exists(file + '.tar.xz'):
os.unlink(file)
shutil.copyfile(file + '.tar.xz', file)
old = file + '.tar.xz.old'
if os.path.exists(old):
os.unlink(old)
def add_file_to_repo(self, file_path: str):
pacman_cache_dir = os.path.join(config.get_path('pacman'), self.arch)
file_name = os.path.basename(file_path)
# copy file to repo dir
target_file, md5sum = self.copy_file_to_repo(file_path)
# clean up same name package from pacman cache
cache_file = os.path.join(pacman_cache_dir, file_name)
if os.path.exists(cache_file) and md5sum_file(cache_file) != md5sum:
logging.debug(f'Removing stale cache file (checksum mismatch): {cache_file}')
os.unlink(cache_file)
self.run_repo_add(target_file)
return target_file
def add_package_to_repo(self, package: Pkgbuild):
logging.info(f'Adding {package.name} at {package.path} to repo {self.name}')
pkgbuild_dir = package.path
assert package.path
files = []
for file in os.listdir(pkgbuild_dir):
# Forced extension by makepkg.conf
if file.endswith('.pkg.tar.xz') or file.endswith('.pkg.tar.zst'):
files.append(self.add_file_to_repo(os.path.join(pkgbuild_dir, file)))
return files

0
packages/management.py Normal file
View File

204
packages/meta_distro.py Normal file
View File

@@ -0,0 +1,204 @@
import logging
import subprocess
import os
from typing import Iterable, Iterator, Mapping, Optional
from binfmt import register as binfmt_register
from config import config
from chroot.build import setup_build_chroot
from distro.abstract import DistroInfo, PackageInfo
#from distro.distro import Distro
from constants import Arch, ARCHES, QEMU_BINFMT_PKGS
from wrapper import enforce_wrap
from .pkgbuild import Pkgbuild
from .local_distro import LocalDistro
from .source_distro import SourceDistro
from .meta_package import MetaPackage
class MetaDistro(DistroInfo):
def __init__(
self,
source_distro: SourceDistro,
remote_distro: DistroInfo,
local_distro: LocalDistro,
):
pass
def get_unbuilt_package_levels(self, packages: Iterable[PackageInfo], arch: Arch, force: bool = False) -> list[set[Pkgbuild]]:
package_levels = self.pkgbuilds.generate_dependency_chain(packages)
build_names = set[str]()
build_levels = list[set[Pkgbuild]]()
i = 0
for level_packages in package_levels:
level = set[Pkgbuild]()
for package in level_packages:
if ((not self.check_package_version_built(package, arch)) or set.intersection(set(package.depends), set(build_names)) or
(force and package in packages)):
level.add(package)
build_names.update(package.names())
if level:
build_levels.append(level)
logging.debug(f'Adding to level {i}:' + '\n' + ('\n'.join([p.name for p in level])))
i += 1
return build_levels
def generate_dependency_chain(self, to_build: Iterable[MetaPackage]) -> list[set[Pkgbuild]]:
"""
This figures out all dependencies and their sub-dependencies for the selection and adds those packages to the selection.
First the top-level packages get selected by searching the paths.
Then their dependencies and sub-dependencies and so on get added to the selection.
"""
visited = set[Pkgbuild]()
visited_names = set[str]()
dep_levels: list[set[Pkgbuild]] = [set(), set()]
package_repo = self.pkgbuilds
def visit(package: Pkgbuild, visited=visited, visited_names=visited_names):
visited.add(package)
visited_names.update(package.names())
def join_levels(levels: list[set[Pkgbuild]]) -> dict[Pkgbuild, int]:
result = dict[Pkgbuild, int]()
for i, level in enumerate(levels):
for pkg in level:
result[pkg] = i
return result
def get_dependencies(package: Pkgbuild, package_repo: dict[str, Pkgbuild] = package_repo) -> Iterator[Pkgbuild]:
for dep_name in package.depends:
if dep_name in visited_names:
continue
elif dep_name in package_repo:
dep_pkg = package_repo[dep_name]
visit(dep_pkg)
yield dep_pkg
def get_recursive_dependencies(package: Pkgbuild, package_repo: dict[str, Pkgbuild] = package_repo) -> Iterator[Pkgbuild]:
for pkg in get_dependencies(package, package_repo):
yield pkg
for sub_pkg in get_recursive_dependencies(pkg, package_repo):
yield sub_pkg
logging.debug('Generating dependency chain:')
# init level 0
for package in to_build:
visit(package)
dep_levels[0].add(package)
logging.debug(f'Adding requested package {package.name}')
# add dependencies of our requested builds to level 0
for dep_pkg in get_recursive_dependencies(package):
logging.debug(f"Adding {package.name}'s dependency {dep_pkg.name} to level 0")
dep_levels[0].add(dep_pkg)
visit(dep_pkg)
"""
Starting with `level` = 0, iterate over the packages in `dep_levels[level]`:
1. Moving packages that are dependencies of other packages up to `level`+1
2. Adding yet unadded local dependencies of all pkgs on `level` to `level`+1
3. increment level
"""
level = 0
# protect against dependency cycles
repeat_count = 0
_last_level: Optional[set[Pkgbuild]] = None
while dep_levels[level]:
level_copy = dep_levels[level].copy()
modified = False
logging.debug(f'Scanning dependency level {level}')
if level > 100:
raise Exception('Dependency chain reached 100 levels depth, this is probably a bug. Aborting!')
for pkg in level_copy:
pkg_done = False
if pkg not in dep_levels[level]:
# pkg has been moved, move on
continue
# move pkg to level+1 if something else depends on it
for other_pkg in level_copy:
if pkg == other_pkg:
continue
if pkg_done:
break
if not issubclass(type(other_pkg), Pkgbuild):
raise Exception('Not a Pkgbuild object:' + repr(other_pkg))
for dep_name in other_pkg.depends:
if dep_name in pkg.names():
dep_levels[level].remove(pkg)
dep_levels[level + 1].add(pkg)
logging.debug(f'Moving {pkg.name} to level {level+1} because {other_pkg.name} depends on it as {dep_name}')
modified = True
pkg_done = True
break
for dep_name in pkg.depends:
if dep_name in visited_names:
continue
elif dep_name in package_repo:
dep_pkg = package_repo[dep_name]
logging.debug(f"Adding {pkg.name}'s dependency {dep_name} to level {level}")
dep_levels[level].add(dep_pkg)
visit(dep_pkg)
modified = True
if _last_level == dep_levels[level]:
repeat_count += 1
else:
repeat_count = 0
if repeat_count > 10:
raise Exception(f'Probable dependency cycle detected: Level has been passed on unmodifed multiple times: #{level}: {_last_level}')
_last_level = dep_levels[level].copy()
if not modified: # if the level was modified, make another pass.
level += 1
dep_levels.append(set[Pkgbuild]())
# reverse level list into buildorder (deps first!), prune empty levels
return list([lvl for lvl in dep_levels[::-1] if lvl])
def build_packages(
self,
packages: Iterable[Pkgbuild],
arch: Arch,
force: bool = False,
enable_crosscompile: bool = True,
enable_crossdirect: bool = True,
enable_ccache: bool = True,
clean_chroot: bool = False,
):
build_levels = self.get_unbuilt_package_levels(packages, arch, force=force)
if not build_levels:
logging.info('Everything built already')
return
self.pkgbuilds.build_package_levels(
build_levels,
arch=arch,
force=force,
enable_crosscompile=enable_crosscompile,
enable_crossdirect=enable_crossdirect,
enable_ccache=enable_ccache,
clean_chroot=clean_chroot,
)
def get_packages(self) -> Mapping[str, MetaPackage]:
return super().get_packages()
def build_enable_qemu_binfmt(self, foreign_arch: Arch):
if foreign_arch not in ARCHES:
raise Exception(f'Unknown architecture "{foreign_arch}". Choices: {", ".join(ARCHES)}')
enforce_wrap()
native = config.runtime['arch']
assert self.arch == native
self.init()
# build qemu-user, binfmt, crossdirect
chroot = setup_build_chroot(native)
logging.info('Installing qemu-user (building if necessary)')
qemu_pkgs = [pkg for pkgname, pkg in self.get_packages().items() if pkgname in QEMU_BINFMT_PKGS]
self.build_packages(
qemu_pkgs,
native,
enable_crosscompile=False,
enable_crossdirect=False,
enable_ccache=False,
)
subprocess.run(['pacman', '-Syy', '--noconfirm', '--needed', '--config', os.path.join(chroot.path, 'etc/pacman.conf')] + QEMU_BINFMT_PKGS)
if foreign_arch != native:
binfmt_register(foreign_arch)

24
packages/meta_package.py Normal file
View File

@@ -0,0 +1,24 @@
from typing import Optional
from distro.package import Package, PackageInfo
from distro.version import compare_package_versions, VerComp
from .source_repo import SourceRepo, SourcePackage
from .local_package import LocalPackage
from distro.remote.package import RemotePackage
from .pkgbuild import Pkgbuild
class MetaPackage(PackageInfo):
pkgbuild: Optional[Pkgbuild]
local_package: Optional[LocalPackage]
remote_package: Optional[RemotePackage]
def __init__(self, source_pkgbuild: Optional[Pkgbuild], local_package: Optional[PackageInfo], remote_package: Optional[PackageInfo]):
self.pkgbuild = source_pkgbuild
self.local_package = local_package
self.remote_package = remote_package
def acquire(self, download=True, build=True) -> str:
version_comparison = self.binary_package.compare_version(Pkgbuild.version)

64
packages/meta_repo.py Normal file
View File

@@ -0,0 +1,64 @@
from hashlib import md5
import logging
import os
import shutil
import subprocess
from typing import Iterable
from config import config
from constants import Arch
from distro.repo import RepoInfo
from .pkgbuild import Pkgbuild
from .source_repo import SourceRepo, get_repo as get_source_repo
from .local_distro import get_local_distro, LocalRepo
from .helpers import setup_build_chroot
class MetaRepo(LocalRepo):
def __init__(self, name, local_repo: LocalRepo):
self.name = name
self.local_repo = local_repo
self.arch = local_repo.arch
def init(self, discover_packages: bool = True, parallel: bool = True):
self.pkgbuilds.init()
if discover_packages:
self.pkgbuilds.discover_packages(refresh=False, parallel=parallel)
self.local_repo.init()
def add_package_to_repo(self, package: Pkgbuild, arch: Arch):
logging.info(f'Adding {package.path} to repo {package.repo}')
pkgbuild_dir = self.pkgbuilds.pkgbuilds_dir
files = []
for file in os.listdir(pkgbuild_dir):
# Forced extension by makepkg.conf
if file.endswith('.pkg.tar.xz') or file.endswith('.pkg.tar.zst'):
assert package.repo and package.repo.name
repo_name = package.repo.name
repo_dir = os.path.join(self.repo_dir, arch, repo_name)
files.append(os.path.join(repo_dir, file))
self.local_repo.add_file_to_repo(os.path.join(pkgbuild_dir, file), repo_name, arch)
if files and self.local_repo.scanned:
self.scan(refresh=True)
return files
def check_package_version_built(self, package: Pkgbuild) -> bool:
native_chroot = setup_build_chroot(config.runtime['arch'])
missing = False
for line in package.get_pkg_filenames(self.arch, native_chroot):
if not line:
continue
assert package.repo and package.repo.name
file = os.path.join(self.repo_dir, self.arch, package.repo.name, os.path.basename(line))
logging.debug(f'Checking if {file} is built')
if os.path.exists(file):
self.add_file_to_repo(file, repo_name=package.repo.name, arch=self.arch)
else:
missing = True
return not missing

View File

@@ -1,36 +1,51 @@
from copy import deepcopy
from __future__ import annotations
import logging
import os
import subprocess
from chroot import Chroot
from constants import CHROOT_PATHS, MAKEPKG_CMD
from copy import deepcopy
from typing import Any, Iterable, Optional, Sequence
from distro.package import PackageInfo
from chroot.build import BuildChroot
from config import config
from constants import Arch, CHROOT_PATHS, MAKEPKG_CMD, CROSSDIRECT_PKGS, GCC_HOSTSPECS
from distro.abstract import PackageInfo
from .helpers import setup_build_chroot, get_makepkg_env
class Pkgbuild(PackageInfo):
name: str
version: str
arches: list[Arch]
depends: list[str]
provides: list[str]
replaces: list[str]
local_depends: list[str]
repo = ''
mode = ''
path = ''
pkgver = ''
pkgrel = ''
source_packages: dict[Arch, SourcePackage]
def __init__(
self,
relative_path: str,
arches: list[Arch] = [],
depends: list[str] = [],
provides: list[str] = [],
replaces: list[str] = [],
) -> None:
"""Create new Pkgbuild representation for file located at `relative_path/PKGBUILD`. `relative_path` will be written to `self.path`"""
self.name = os.path.basename(relative_path)
self.version = ''
self.path = relative_path
self.depends = deepcopy(depends)
self.provides = deepcopy(provides)
self.replaces = deepcopy(replaces)
self.arches = deepcopy(arches)
self.source_packages = {}
def __repr__(self):
return f'Pkgbuild({self.name},{repr(self.path)},{self.version},{self.mode})'
@@ -38,16 +53,182 @@ class Pkgbuild(PackageInfo):
def names(self):
return list(set([self.name] + self.provides + self.replaces))
def get_pkg_filenames(self, arch: Arch, native_chroot: BuildChroot) -> Iterable[str]:
config_path = '/' + native_chroot.write_makepkg_conf(
target_arch=arch,
cross_chroot_relative=os.path.join('chroot', f'base_{arch}'),
cross=True,
)
cmd = ['cd', os.path.join(CHROOT_PATHS['pkgbuilds'], self.path), '&&'] + MAKEPKG_CMD + [
'--config',
config_path,
'--nobuild',
'--noprepare',
'--skippgpcheck',
'--packagelist',
]
result: Any = native_chroot.run_cmd(
cmd,
capture_output=True,
)
if result.returncode != 0:
raise Exception(f'Failed to get package list for {self.path}:' + '\n' + result.stdout.decode() + '\n' + result.stderr.decode())
return result.stdout.decode('utf-8').split('\n')
def setup_sources(self, chroot: BuildChroot, makepkg_conf_path='/etc/makepkg.conf'):
makepkg_setup_args = [
'--config',
makepkg_conf_path,
'--nobuild',
'--holdver',
'--nodeps',
'--skippgpcheck',
]
logging.info(f'Setting up sources for {self.path} in {chroot.name}')
result = chroot.run_cmd(MAKEPKG_CMD + makepkg_setup_args, cwd=os.path.join(CHROOT_PATHS['pkgbuilds'], self.path))
assert isinstance(result, subprocess.CompletedProcess)
if result.returncode != 0:
raise Exception(f'Failed to check sources for {self.path}')
def build(
self,
arch: Arch,
enable_crosscompile: bool = True,
enable_crossdirect: bool = True,
enable_ccache: bool = True,
clean_chroot: bool = False,
repo_dir: str = None,
):
"""build the PKGBUILD for the given Architecture. Returns the directory in which the PKGBUILD and the resulting packages reside"""
makepkg_compile_opts = ['--holdver']
makepkg_conf_path = 'etc/makepkg.conf'
repo_dir = repo_dir or config.get_path('pkgbuilds')
foreign_arch = config.runtime['arch'] != arch
deps = (list(set(self.depends) - set(self.names())))
target_chroot = setup_build_chroot(
arch=arch,
extra_packages=deps,
clean_chroot=clean_chroot,
)
native_chroot = target_chroot if not foreign_arch else setup_build_chroot(
arch=config.runtime['arch'],
extra_packages=['base-devel'] + CROSSDIRECT_PKGS,
clean_chroot=clean_chroot,
)
cross = foreign_arch and self.mode == 'cross' and enable_crosscompile
target_chroot.initialize()
if cross:
logging.info(f'Cross-compiling {self.path}')
build_root = native_chroot
makepkg_compile_opts += ['--nodeps']
env = deepcopy(get_makepkg_env())
if enable_ccache:
env['PATH'] = f"/usr/lib/ccache:{env['PATH']}"
logging.info('Setting up dependencies for cross-compilation')
# include crossdirect for ccache symlinks and qemu-user
results = native_chroot.try_install_packages(self.depends + CROSSDIRECT_PKGS + [f"{GCC_HOSTSPECS[native_chroot.arch][arch]}-gcc"])
res_crossdirect = results['crossdirect']
assert isinstance(res_crossdirect, subprocess.CompletedProcess)
if res_crossdirect.returncode != 0:
raise Exception('Unable to install crossdirect')
# mount foreign arch chroot inside native chroot
chroot_relative = os.path.join(CHROOT_PATHS['chroots'], target_chroot.name)
makepkg_path_absolute = native_chroot.write_makepkg_conf(target_arch=arch, cross_chroot_relative=chroot_relative, cross=True)
makepkg_conf_path = os.path.join('etc', os.path.basename(makepkg_path_absolute))
native_chroot.mount_crosscompile(target_chroot)
else:
logging.info(f'Host-compiling {self.path}')
build_root = target_chroot
makepkg_compile_opts += ['--syncdeps']
env = deepcopy(get_makepkg_env())
if foreign_arch and enable_crossdirect and self.name not in CROSSDIRECT_PKGS:
env['PATH'] = f"/native/usr/lib/crossdirect/{arch}:{env['PATH']}"
target_chroot.mount_crossdirect(native_chroot)
else:
if enable_ccache:
logging.debug('ccache enabled')
env['PATH'] = f"/usr/lib/ccache:{env['PATH']}"
deps += ['ccache']
logging.debug(('Building for native arch. ' if not foreign_arch else '') + 'Skipping crossdirect.')
dep_install = target_chroot.try_install_packages(deps, allow_fail=False)
failed_deps = [name for name, res in dep_install.items() if res.returncode != 0] # type: ignore[union-attr]
if failed_deps:
raise Exception(f'Dependencies failed to install: {failed_deps}')
makepkg_conf_absolute = os.path.join('/', makepkg_conf_path)
self.setup_sources(build_root, makepkg_conf_path=makepkg_conf_absolute)
build_cmd = f'makepkg --config {makepkg_conf_absolute} --skippgpcheck --needed --noconfirm --ignorearch {" ".join(makepkg_compile_opts)}'
logging.debug(f'Building: Running {build_cmd}')
pkgbuild_dir = os.path.join(CHROOT_PATHS['pkgbuilds'], self.path)
result = build_root.run_cmd(build_cmd, inner_env=env, cwd=pkgbuild_dir)
assert isinstance(result, subprocess.CompletedProcess)
if result.returncode != 0:
raise Exception(f'Failed to compile package {self.path}')
return pkgbuild_dir
def update_version(self):
"""updates `self.version` from `self.pkgver` and `self.pkgrel`"""
self.version = f'{self.pkgver}-{self.pkgrel}'
def update(self, pkgbuild: Pkgbuild):
self.depends = pkgbuild.depends
self.provides = pkgbuild.provides
self.replaces = pkgbuild.replaces
self.pkgver = pkgbuild.pkgver
self.pkgrel = pkgbuild.pkgrel
self.local_depends = pkgbuild.local_depends
self.path = pkgbuild.path
self.mode = pkgbuild.mode
self.update_version()
for arch, package in self.source_packages.items():
if package.pkgbuild is not self:
self.source_packages.pop(arch)
logging.warning(
f'Pkgbuild {self.name} held reference package {package.name} for arch {arch} that references Pkgbuild {package.pkgbuild} instead')
continue
package.update()
def get_source_repo(self, arch: Arch) -> 'SourcePackage':
if not self.source_packages.get(arch, None):
self.source_packages[arch] = SourcePackage(arch=arch, pkgbuild=self)
return self.source_packages[arch]
class Pkgbase(Pkgbuild):
subpackages: list[Pkgbuild]
subpackages: Sequence[SubPkgbuild]
def __init__(self, relative_path: str, subpackages: list[Pkgbuild] = [], **args):
self.subpackages = deepcopy(subpackages)
def __init__(self, relative_path: str, subpackages: Sequence[SubPkgbuild] = [], **args):
self.subpackages = list(subpackages)
super().__init__(relative_path, **args)
def parse_pkgbuild(relative_pkg_dir: str, native_chroot: Chroot) -> list[Pkgbuild]:
class SubPkgbuild(Pkgbuild):
pkgbase: Pkgbase
def __init__(self, name: str, pkgbase: Pkgbase):
self.depends = []
self.provides = []
self.replaces = []
self.local_depends = []
self.name = name
self.pkgbase = pkgbase
self.arches = pkgbase.arches
self.version = pkgbase.version
self.mode = pkgbase.mode
self.path = pkgbase.path
self.pkgver = pkgbase.pkgver
self.pkgrel = pkgbase.pkgrel
def parse_pkgbuild(relative_pkg_dir: str, native_chroot: BuildChroot) -> Sequence[Pkgbuild]:
mode = None
with open(os.path.join(native_chroot.get_path(CHROOT_PATHS['pkgbuilds']), relative_pkg_dir, 'PKGBUILD'), 'r') as file:
for line in file.read().split('\n'):
@@ -60,7 +241,7 @@ def parse_pkgbuild(relative_pkg_dir: str, native_chroot: Chroot) -> list[Pkgbuil
base_package = Pkgbase(relative_pkg_dir)
base_package.mode = mode
base_package.repo = relative_pkg_dir.split('/')[0]
#base_package.repo = relative_pkg_dir.split('/')[0]
srcinfo = native_chroot.run_cmd(
MAKEPKG_CMD + ['--printsrcinfo'],
cwd=os.path.join(CHROOT_PATHS['pkgbuilds'], base_package.path),
@@ -69,7 +250,7 @@ def parse_pkgbuild(relative_pkg_dir: str, native_chroot: Chroot) -> list[Pkgbuil
assert (isinstance(srcinfo, subprocess.CompletedProcess))
lines = srcinfo.stdout.decode('utf-8').split('\n')
current = base_package
current: Pkgbuild = base_package
multi_pkgs = False
for line_raw in lines:
line = line_raw.strip()
@@ -81,26 +262,52 @@ def parse_pkgbuild(relative_pkg_dir: str, native_chroot: Chroot) -> list[Pkgbuil
multi_pkgs = True
elif line.startswith('pkgname'):
if multi_pkgs:
if current is not base_package:
base_package.subpackages.append(current)
current = deepcopy(base_package)
current.name = splits[1]
current = SubPkgbuild(splits[1], base_package)
assert isinstance(base_package.subpackages, list)
base_package.subpackages.append(current)
else:
current.name = splits[1]
elif line.startswith('pkgver'):
current.pkgver = splits[1]
elif line.startswith('pkgrel'):
current.pkgrel = splits[1]
elif line.startswith('arch'):
current.arches.append(splits[1])
elif line.startswith('provides'):
current.provides.append(splits[1])
elif line.startswith('replaces'):
current.replaces.append(splits[1])
elif line.startswith('depends') or line.startswith('makedepends') or line.startswith('checkdepends') or line.startswith('optdepends'):
current.depends.append(splits[1].split('=')[0].split(': ')[0])
current.depends = list(set(current.depends))
results = base_package.subpackages or [base_package]
results: Sequence[Pkgbuild] = list(base_package.subpackages) or [base_package]
for pkg in results:
pkg.version = f'{pkg.pkgver}-{pkg.pkgrel}'
assert isinstance(pkg, Pkgbuild)
pkg.depends = list(set(pkg.depends))
pkg.update_version()
if not (pkg.pkgver == base_package.pkgver and pkg.pkgrel == base_package.pkgrel):
raise Exception('subpackage malformed! pkgver differs!')
return results
class SourcePackage(PackageInfo):
pkgbuild: Pkgbuild
def __init__(self, arch: Arch, pkgbuild: Pkgbuild):
self.arch = arch
self.pkgbuild = pkgbuild
self.update()
def update(self):
self.name = self.pkgbuild.name
self.depends = self.pkgbuild.depends
self.provides = self.pkgbuild.provides
self.replaces = self.pkgbuild.replaces
self.local_depends = self.pkgbuild.local_depends
self.path = self.pkgbuild.path
self.pkgbuild.update_version()
self.version = self.pkgbuild.version
def acquire(self):
return os.path.join(self.pkgbuild.build(arch=self.arch), self.get_filename())

76
packages/source_distro.py Normal file
View File

@@ -0,0 +1,76 @@
import logging
from typing import Iterable
from constants import Arch
from distro.abstract import DistroInfo
from .source_repo import SourceRepo, Pkgbuild
class SourceDistro(DistroInfo):
repos: dict[str, SourceRepo]
def build_package_levels(
self,
build_levels: list[set[Pkgbuild]],
arch: Arch,
force: bool = False,
enable_crosscompile: bool = True,
enable_crossdirect: bool = True,
enable_ccache: bool = True,
clean_chroot: bool = False,
):
for level, packages in enumerate(build_levels):
logging.info(f"(Level {level}) Building {', '.join([x.name for x in packages])}")
for package in packages:
package.build(
arch=arch,
enable_crosscompile=enable_crosscompile,
enable_crossdirect=enable_crossdirect,
enable_ccache=enable_ccache,
clean_chroot=clean_chroot,
)
def build_packages(
self,
packages: Iterable[Pkgbuild],
arch: Arch,
force: bool = False,
enable_crosscompile: bool = True,
enable_crossdirect: bool = True,
enable_ccache: bool = True,
clean_chroot: bool = False,
):
self.build_package_levels(
[set(packages)],
arch=arch,
force=force,
enable_crosscompile=enable_crosscompile,
enable_crossdirect=enable_crossdirect,
enable_ccache=enable_ccache,
clean_chroot=clean_chroot,
)
def build_packages_by_paths(
self,
paths: Iterable[str],
arch: Arch,
force=False,
enable_crosscompile: bool = True,
enable_crossdirect: bool = True,
enable_ccache: bool = True,
clean_chroot: bool = False,
):
if isinstance(paths, str):
paths = [paths]
packages = self.filter_packages_by_paths(paths, allow_empty_results=False)
return self.build_packages(
packages,
arch,
force=force,
enable_crosscompile=enable_crosscompile,
enable_crossdirect=enable_crossdirect,
enable_ccache=enable_ccache,
clean_chroot=clean_chroot,
)

318
packages/source_repo.py Normal file
View File

@@ -0,0 +1,318 @@
import click
import logging
import multiprocessing
import os
from joblib import Parallel, delayed
from typing import Iterable, Optional, Iterator
from config import config
from constants import Arch, REPOSITORIES
from utils import git
from .pkgbuild import Pkgbuild, parse_pkgbuild
from .helpers import setup_build_chroot
pacman_cmd = [
'pacman',
'-Syuu',
'--noconfirm',
'--overwrite=*',
'--needed',
]
class SourceRepo:
pkgbuilds_dir: str
pkgbuilds = dict[str, Pkgbuild]()
initialized: bool = False
def __init__(self, pkgbuilds_dir: Optional[str] = None):
self.pkgbuilds_dir = pkgbuilds_dir or config.get_path('pkgbuilds')
def git_get_pkgbuilds(self, repo_url: str, branch: str, interactive=False, update=True):
git_dir = os.path.join(self.pkgbuilds_dir, '.git')
if not os.path.exists(git_dir):
logging.info('Cloning branch {branch} from {repo}')
result = git(['clone', '-b', branch, repo_url, self.pkgbuilds_dir])
if result.returncode != 0:
raise Exception('Error cloning pkgbuilds')
else:
result = git(['--git-dir', git_dir, 'branch', '--show-current'], capture_output=True)
current_branch = result.stdout.decode().strip()
if current_branch != branch:
logging.warning(f'pkgbuilds repository is on the wrong branch: {current_branch}, requested: {branch}')
if interactive and click.confirm('Would you like to switch branches?', default=False):
result = git(['switch', branch], dir=self.pkgbuilds_dir)
if result.returncode != 0:
raise Exception('failed switching branches')
if update:
if interactive:
if not click.confirm('Would you like to try updating the PKGBUILDs repo?'):
return
result = git(['pull'], self.pkgbuilds_dir)
if result.returncode != 0:
raise Exception('failed to update pkgbuilds')
def init(self, interactive=False):
if (not self.initialized) or interactive:
pkgbuilds_dir = self.pkgbuilds_dir
repo_url = config.file['pkgbuilds']['git_repo']
branch = config.file['pkgbuilds']['git_branch']
self.git_get_pkgbuilds(
repo_url,
branch,
interactive=interactive,
update=False,
)
def discover_packages(self, parallel: bool = True, refresh: bool = False) -> dict[str, Pkgbuild]:
pkgbuilds_dir = self.pkgbuilds_dir
packages: dict[str, Pkgbuild] = {}
paths = []
self.init(interactive=False)
if self.pkgbuilds and not refresh:
return self.pkgbuilds.copy()
for repo in REPOSITORIES:
for dir in os.listdir(os.path.join(pkgbuilds_dir, repo)):
paths.append(os.path.join(repo, dir))
native_chroot = setup_build_chroot(config.runtime['arch'], add_kupfer_repos=False)
results = []
if parallel:
chunks = (Parallel(n_jobs=multiprocessing.cpu_count() * 4)(delayed(parse_pkgbuild)(path, native_chroot) for path in paths))
else:
chunks = (parse_pkgbuild(path, native_chroot) for path in paths)
for pkglist in chunks:
results += pkglist
logging.debug('Building package dictionary!')
for package in results:
for name in [package.name] + package.replaces:
if name in packages:
logging.warn(f'Overriding {packages[package.name]} with {package}')
packages[name] = package
# This filters the deps to only include the ones that are provided in this repo
for package in packages.values():
package.local_depends = package.depends.copy()
for dep in package.depends.copy():
found = dep in packages
for p in packages.values():
if found:
break
for name in p.names():
if dep == name:
logging.debug(f'Found {p.name} that provides {dep}')
found = True
break
if not found:
logging.debug(f'Removing {dep} from dependencies')
package.local_depends.remove(dep)
if package.name not in self.pkgbuilds:
self.pkgbuilds[package.name] = package
self.pkgbuilds[package.name].update(package)
# clean up dict entries that previously were defined but are no longer (i.e. name change)
for stale_name in set(self.pkgbuilds.keys()) - set(packages.keys()):
self.pkgbuilds.pop(stale_name)
return packages
def filter_packages_by_paths(self, paths: Iterable[str], allow_empty_results=True) -> Iterable[Pkgbuild]:
if 'all' in paths:
return list(self.pkgbuilds.values())
result = []
for pkg in self.pkgbuilds.values():
if pkg.path in paths:
result += [pkg]
if not allow_empty_results and not result:
raise Exception('No packages matched by paths: ' + ', '.join([f'"{p}"' for p in paths]))
return result
def generate_dependency_chain(self, to_build: Iterable[Pkgbuild]) -> list[set[Pkgbuild]]:
"""
This figures out all dependencies and their sub-dependencies for the selection and adds those packages to the selection.
First the top-level packages get selected by searching the paths.
Then their dependencies and sub-dependencies and so on get added to the selection.
"""
visited = set[Pkgbuild]()
visited_names = set[str]()
dep_levels: list[set[Pkgbuild]] = [set(), set()]
package_repo = self.pkgbuilds
def visit(package: Pkgbuild, visited=visited, visited_names=visited_names):
visited.add(package)
visited_names.update(package.names())
def join_levels(levels: list[set[Pkgbuild]]) -> dict[Pkgbuild, int]:
result = dict[Pkgbuild, int]()
for i, level in enumerate(levels):
for pkg in level:
result[pkg] = i
return result
def get_dependencies(package: Pkgbuild, package_repo: dict[str, Pkgbuild] = package_repo) -> Iterator[Pkgbuild]:
for dep_name in package.depends:
if dep_name in visited_names:
continue
elif dep_name in package_repo:
dep_pkg = package_repo[dep_name]
visit(dep_pkg)
yield dep_pkg
def get_recursive_dependencies(package: Pkgbuild, package_repo: dict[str, Pkgbuild] = package_repo) -> Iterator[Pkgbuild]:
for pkg in get_dependencies(package, package_repo):
yield pkg
for sub_pkg in get_recursive_dependencies(pkg, package_repo):
yield sub_pkg
logging.debug('Generating dependency chain:')
# init level 0
for package in to_build:
visit(package)
dep_levels[0].add(package)
logging.debug(f'Adding requested package {package.name}')
# add dependencies of our requested builds to level 0
for dep_pkg in get_recursive_dependencies(package):
logging.debug(f"Adding {package.name}'s dependency {dep_pkg.name} to level 0")
dep_levels[0].add(dep_pkg)
visit(dep_pkg)
"""
Starting with `level` = 0, iterate over the packages in `dep_levels[level]`:
1. Moving packages that are dependencies of other packages up to `level`+1
2. Adding yet unadded local dependencies of all pkgs on `level` to `level`+1
3. increment level
"""
level = 0
# protect against dependency cycles
repeat_count = 0
_last_level: Optional[set[Pkgbuild]] = None
while dep_levels[level]:
level_copy = dep_levels[level].copy()
modified = False
logging.debug(f'Scanning dependency level {level}')
if level > 100:
raise Exception('Dependency chain reached 100 levels depth, this is probably a bug. Aborting!')
for pkg in level_copy:
pkg_done = False
if pkg not in dep_levels[level]:
# pkg has been moved, move on
continue
# move pkg to level+1 if something else depends on it
for other_pkg in level_copy:
if pkg == other_pkg:
continue
if pkg_done:
break
if not issubclass(type(other_pkg), Pkgbuild):
raise Exception('Not a Pkgbuild object:' + repr(other_pkg))
for dep_name in other_pkg.depends:
if dep_name in pkg.names():
dep_levels[level].remove(pkg)
dep_levels[level + 1].add(pkg)
logging.debug(f'Moving {pkg.name} to level {level+1} because {other_pkg.name} depends on it as {dep_name}')
modified = True
pkg_done = True
break
for dep_name in pkg.depends:
if dep_name in visited_names:
continue
elif dep_name in package_repo:
dep_pkg = package_repo[dep_name]
logging.debug(f"Adding {pkg.name}'s dependency {dep_name} to level {level}")
dep_levels[level].add(dep_pkg)
visit(dep_pkg)
modified = True
if _last_level == dep_levels[level]:
repeat_count += 1
else:
repeat_count = 0
if repeat_count > 10:
raise Exception(f'Probable dependency cycle detected: Level has been passed on unmodifed multiple times: #{level}: {_last_level}')
_last_level = dep_levels[level].copy()
if not modified: # if the level was modified, make another pass.
level += 1
dep_levels.append(set[Pkgbuild]())
# reverse level list into buildorder (deps first!), prune empty levels
return list([lvl for lvl in dep_levels[::-1] if lvl])
def build_package_levels(
self,
build_levels: list[set[Pkgbuild]],
arch: Arch,
force: bool = False,
enable_crosscompile: bool = True,
enable_crossdirect: bool = True,
enable_ccache: bool = True,
clean_chroot: bool = False,
):
for level, need_build in enumerate(build_levels):
logging.info(f"(Level {level}) Building {', '.join([x.name for x in need_build])}")
for package in need_build:
package.build(
arch=arch,
enable_crosscompile=enable_crosscompile,
enable_crossdirect=enable_crossdirect,
enable_ccache=enable_ccache,
clean_chroot=clean_chroot,
)
def build_packages(
self,
packages: Iterable[Pkgbuild],
arch: Arch,
force: bool = False,
enable_crosscompile: bool = True,
enable_crossdirect: bool = True,
enable_ccache: bool = True,
clean_chroot: bool = False,
):
self.build_package_levels(
[set(packages)],
arch=arch,
force=force,
enable_crosscompile=enable_crosscompile,
enable_crossdirect=enable_crossdirect,
enable_ccache=enable_ccache,
clean_chroot=clean_chroot,
)
def build_packages_by_paths(
self,
paths: Iterable[str],
arch: Arch,
force=False,
enable_crosscompile: bool = True,
enable_crossdirect: bool = True,
enable_ccache: bool = True,
clean_chroot: bool = False,
):
if isinstance(paths, str):
paths = [paths]
packages = self.filter_packages_by_paths(paths, allow_empty_results=False)
return self.build_packages(
packages,
arch,
force=force,
enable_crosscompile=enable_crosscompile,
enable_crossdirect=enable_crossdirect,
enable_ccache=enable_ccache,
clean_chroot=clean_chroot,
)
_src_repo: SourceRepo
def get_repo():
global _src_repo
if not _src_repo:
_src_repo = SourceRepo()
return _src_repo

View File

@@ -3,4 +3,3 @@ appdirs==1.4.4
joblib==1.0.1
toml
typing_extensions
coloredlogs

View File

@@ -1,7 +1,11 @@
import atexit
import logging
import os
import subprocess
from hashlib import md5
import urllib.request
from shutil import which
from tempfile import mkstemp
from typing import Optional, Union, Sequence
@@ -68,3 +72,22 @@ def log_or_exception(raise_exception: bool, msg: str, exc_class=Exception, log_l
raise exc_class(msg)
else:
logging.log(log_level, msg)
def md5sum_file(file_path: str) -> str:
with open(file_path, 'rb') as file:
return md5(file.read()).hexdigest()
def download_file(file_url: str, destination_file: Optional[str] = None) -> str:
fd: Union[int, str]
path: str
with urllib.request.urlopen(file_url) as request:
if destination_file:
fd, path = destination_file, destination_file
os.makedirs(os.path.dirname(destination_file), exist_ok=True)
else:
fd, path = mkstemp()
with open(fd, 'wb') as writable:
writable.write(request.read())
return path

View File

@@ -93,7 +93,7 @@ class BaseWrapper(Wrapper):
raise NotImplementedError()
def is_wrapped(self):
return os.getenv('KUPFERBOOTSTRAP_WRAPPED') == self.type.upper()
return os.getenv('KUPFERBOOTSTRAP_WRAPPED') == self.type.capitalize()
def get_bind_mounts_default(self, wrapped_config_path: str = None, ssh_dir: str = None, target_home: str = '/root'):
wrapped_config_path = wrapped_config_path or self.wrapped_config_path