mirror of
https://gitlab.com/kupfer/kupferbootstrap.git
synced 2025-02-23 21:55:43 -05:00
with check_space=True, build chroots fail to pacman -S anything, because their rootfs is not a mountpoint, hence pacman's space calculations will fail and return not enough free space. We do want these space checks when pacstrapping device rootfilesystems though.
163 lines
5.5 KiB
Python
163 lines
5.5 KiB
Python
from copy import deepcopy
|
|
import urllib.request
|
|
import tempfile
|
|
import os
|
|
import tarfile
|
|
import logging
|
|
|
|
from constants import ARCHES, BASE_DISTROS, REPOSITORIES, KUPFER_HTTPS, CHROOT_PATHS
|
|
from generator import generate_pacman_conf_body
|
|
from config import config
|
|
|
|
|
|
def resolve_url(url_template, repo_name: str, arch: str):
|
|
result = url_template
|
|
for template, replacement in {'$repo': repo_name, '$arch': config.runtime['arch']}.items():
|
|
result = result.replace(template, replacement)
|
|
return result
|
|
|
|
|
|
class PackageInfo:
|
|
name: str
|
|
version: str
|
|
filename: str
|
|
resolved_url: str
|
|
|
|
def __init__(
|
|
self,
|
|
name: str,
|
|
version: str,
|
|
filename: str,
|
|
resolved_url: str = None,
|
|
):
|
|
self.name = name
|
|
self.version = version
|
|
self.filename = filename
|
|
self.resolved_url = resolved_url
|
|
|
|
def __repr__(self):
|
|
return f'{self.name}@{self.version}'
|
|
|
|
def parse_desc(desc_str: str, resolved_url=None):
|
|
"""Parses a desc file, returning a PackageInfo"""
|
|
|
|
pruned_lines = ([line.strip() for line in desc_str.split('%') if line.strip()])
|
|
desc = {}
|
|
for key, value in zip(pruned_lines[0::2], pruned_lines[1::2]):
|
|
desc[key.strip()] = value.strip()
|
|
return PackageInfo(desc['NAME'], desc['VERSION'], desc['FILENAME'], resolved_url=resolved_url)
|
|
|
|
|
|
class RepoInfo:
|
|
options: dict[str, str] = {}
|
|
url_template: str
|
|
|
|
def __init__(self, url_template: str, options: dict[str, str] = {}):
|
|
self.url_template = url_template
|
|
self.options.update(options)
|
|
|
|
|
|
class Repo(RepoInfo):
|
|
name: str
|
|
resolved_url: str
|
|
arch: str
|
|
packages: dict[str, PackageInfo]
|
|
remote: bool
|
|
scanned: bool = False
|
|
|
|
def scan(self):
|
|
self.resolved_url = resolve_url(self.url_template, repo_name=self.name, arch=self.arch)
|
|
self.remote = not self.resolved_url.startswith('file://')
|
|
uri = f'{self.resolved_url}/{self.name}.db'
|
|
path = ''
|
|
if self.remote:
|
|
logging.debug(f'Downloading repo file from {uri}')
|
|
with urllib.request.urlopen(uri) as request:
|
|
fd, path = tempfile.mkstemp()
|
|
with open(fd, 'wb') as writable:
|
|
writable.write(request.read())
|
|
else:
|
|
path = uri.split('file://')[1]
|
|
logging.debug(f'Parsing repo file at {path}')
|
|
with tarfile.open(path) as index:
|
|
for node in index.getmembers():
|
|
if os.path.basename(node.name) == 'desc':
|
|
logging.debug(f'Parsing desc file for {os.path.dirname(node.name)}')
|
|
pkg = PackageInfo.parse_desc(index.extractfile(node).read().decode(), self.resolved_url)
|
|
self.packages[pkg.name] = pkg
|
|
|
|
self.scanned = True
|
|
|
|
def __init__(self, name: str, url_template: str, arch: str, options={}, scan=False):
|
|
self.packages = {}
|
|
self.name = name
|
|
self.url_template = url_template
|
|
self.arch = arch
|
|
self.options = deepcopy(options)
|
|
if scan:
|
|
self.scan()
|
|
|
|
def config_snippet(self) -> str:
|
|
options = {'Server': self.url_template} | self.options
|
|
return ('[%s]\n' % self.name) + '\n'.join([f"{key} = {value}" for key, value in options.items()])
|
|
|
|
def get_RepoInfo(self):
|
|
return RepoInfo(url_template=self.url_template, options=self.options)
|
|
|
|
|
|
class Distro:
|
|
repos: dict[str, Repo]
|
|
arch: str
|
|
|
|
def __init__(self, arch: str, repo_infos: dict[str, RepoInfo], scan=False):
|
|
assert (arch in ARCHES)
|
|
self.arch = arch
|
|
self.repos = dict[str, Repo]()
|
|
for repo_name, repo_info in repo_infos.items():
|
|
self.repos[repo_name] = Repo(
|
|
name=repo_name,
|
|
arch=arch,
|
|
url_template=repo_info.url_template,
|
|
options=repo_info.options,
|
|
scan=scan,
|
|
)
|
|
|
|
def get_packages(self):
|
|
""" get packages from all repos, semantically overlaying them"""
|
|
results = dict[str, PackageInfo]()
|
|
for repo in self.repos.values().reverse():
|
|
assert (repo.packages is not None)
|
|
for package in repo.packages:
|
|
results[package.name] = package
|
|
|
|
def repos_config_snippet(self, extra_repos: dict[str, RepoInfo] = {}) -> str:
|
|
extras = [Repo(name, url_template=info.url_template, arch=self.arch, options=info.options, scan=False) for name, info in extra_repos.items()]
|
|
return '\n\n'.join(repo.config_snippet() for repo in (list(self.repos.values()) + extras))
|
|
|
|
def get_pacman_conf(self, extra_repos: dict[str, RepoInfo] = {}, check_space: bool = True):
|
|
body = generate_pacman_conf_body(self.arch, check_space=check_space)
|
|
return body + self.repos_config_snippet(extra_repos)
|
|
|
|
|
|
def get_base_distro(arch: str) -> Distro:
|
|
repos = {name: RepoInfo(url_template=url) for name, url in BASE_DISTROS[arch]['repos'].items()}
|
|
return Distro(arch=arch, repo_infos=repos, scan=False)
|
|
|
|
|
|
def get_kupfer(arch: str, url_template: str) -> Distro:
|
|
repos = {name: RepoInfo(url_template=url_template, options={'SigLevel': 'Never'}) for name in REPOSITORIES}
|
|
return Distro(
|
|
arch=arch,
|
|
repo_infos=repos,
|
|
)
|
|
|
|
|
|
def get_kupfer_https(arch: str) -> Distro:
|
|
return get_kupfer(arch, KUPFER_HTTPS)
|
|
|
|
|
|
def get_kupfer_local(arch: str = None, in_chroot: bool = True) -> Distro:
|
|
if not arch:
|
|
arch = config.runtime['arch']
|
|
dir = CHROOT_PATHS['packages'] if in_chroot else config.get_path('packages')
|
|
return get_kupfer(arch, f"file://{dir}/$arch/$repo")
|