From 7359b447e78b3ac4c030693844a142a591bc0f60 Mon Sep 17 00:00:00 2001 From: InsanePrawn Date: Thu, 17 Feb 2022 03:11:33 +0100 Subject: [PATCH] distro: refactor Repo- and PackageInfo into separate files --- distro/__init__.py | 103 +-------------------------------------------- distro/package.py | 29 +++++++++++++ distro/repo.py | 74 ++++++++++++++++++++++++++++++++ 3 files changed, 105 insertions(+), 101 deletions(-) create mode 100644 distro/package.py create mode 100644 distro/repo.py diff --git a/distro/__init__.py b/distro/__init__.py index 9782124..2d5bf62 100644 --- a/distro/__init__.py +++ b/distro/__init__.py @@ -1,108 +1,9 @@ -from copy import deepcopy -import urllib.request -import tempfile -import os -import tarfile -import logging - from constants import ARCHES, BASE_DISTROS, REPOSITORIES, KUPFER_HTTPS, CHROOT_PATHS from generator import generate_pacman_conf_body from config import config - -def resolve_url(url_template, repo_name: str, arch: str): - result = url_template - for template, replacement in {'$repo': repo_name, '$arch': config.runtime['arch']}.items(): - result = result.replace(template, replacement) - return result - - -class PackageInfo: - name: str - version: str - filename: str - resolved_url: str - - def __init__( - self, - name: str, - version: str, - filename: str, - resolved_url: str = None, - ): - self.name = name - self.version = version - self.filename = filename - self.resolved_url = resolved_url - - def __repr__(self): - return f'{self.name}@{self.version}' - - def parse_desc(desc_str: str, resolved_url=None): - """Parses a desc file, returning a PackageInfo""" - - pruned_lines = ([line.strip() for line in desc_str.split('%') if line.strip()]) - desc = {} - for key, value in zip(pruned_lines[0::2], pruned_lines[1::2]): - desc[key.strip()] = value.strip() - return PackageInfo(desc['NAME'], desc['VERSION'], desc['FILENAME'], resolved_url=resolved_url) - - -class RepoInfo: - options: dict[str, str] = {} - url_template: str - - def __init__(self, url_template: str, options: dict[str, str] = {}): - self.url_template = url_template - self.options.update(options) - - -class Repo(RepoInfo): - name: str - resolved_url: str - arch: str - packages: dict[str, PackageInfo] - remote: bool - scanned: bool = False - - def scan(self): - self.resolved_url = resolve_url(self.url_template, repo_name=self.name, arch=self.arch) - self.remote = not self.resolved_url.startswith('file://') - uri = f'{self.resolved_url}/{self.name}.db' - path = '' - if self.remote: - logging.debug(f'Downloading repo file from {uri}') - with urllib.request.urlopen(uri) as request: - fd, path = tempfile.mkstemp() - with open(fd, 'wb') as writable: - writable.write(request.read()) - else: - path = uri.split('file://')[1] - logging.debug(f'Parsing repo file at {path}') - with tarfile.open(path) as index: - for node in index.getmembers(): - if os.path.basename(node.name) == 'desc': - logging.debug(f'Parsing desc file for {os.path.dirname(node.name)}') - pkg = PackageInfo.parse_desc(index.extractfile(node).read().decode(), self.resolved_url) - self.packages[pkg.name] = pkg - - self.scanned = True - - def __init__(self, name: str, url_template: str, arch: str, options={}, scan=False): - self.packages = {} - self.name = name - self.url_template = url_template - self.arch = arch - self.options = deepcopy(options) - if scan: - self.scan() - - def config_snippet(self) -> str: - options = {'Server': self.url_template} | self.options - return ('[%s]\n' % self.name) + '\n'.join([f"{key} = {value}" for key, value in options.items()]) - - def get_RepoInfo(self): - return RepoInfo(url_template=self.url_template, options=self.options) +from .package import PackageInfo +from .repo import RepoInfo, Repo class Distro: diff --git a/distro/package.py b/distro/package.py new file mode 100644 index 0000000..537d79c --- /dev/null +++ b/distro/package.py @@ -0,0 +1,29 @@ +class PackageInfo: + name: str + version: str + filename: str + resolved_url: str + + def __init__( + self, + name: str, + version: str, + filename: str, + resolved_url: str = None, + ): + self.name = name + self.version = version + self.filename = filename + self.resolved_url = resolved_url + + def __repr__(self): + return f'{self.name}@{self.version}' + + def parse_desc(desc_str: str, resolved_url=None): + """Parses a desc file, returning a PackageInfo""" + + pruned_lines = ([line.strip() for line in desc_str.split('%') if line.strip()]) + desc = {} + for key, value in zip(pruned_lines[0::2], pruned_lines[1::2]): + desc[key.strip()] = value.strip() + return PackageInfo(desc['NAME'], desc['VERSION'], desc['FILENAME'], resolved_url=resolved_url) diff --git a/distro/repo.py b/distro/repo.py new file mode 100644 index 0000000..dfbee76 --- /dev/null +++ b/distro/repo.py @@ -0,0 +1,74 @@ +from copy import deepcopy +import logging +import os +import tarfile +import tempfile +import urllib.request + +from config import config + +from .package import PackageInfo + + +def resolve_url(url_template, repo_name: str, arch: str): + result = url_template + for template, replacement in {'$repo': repo_name, '$arch': config.runtime['arch']}.items(): + result = result.replace(template, replacement) + return result + + +class RepoInfo: + options: dict[str, str] = {} + url_template: str + + def __init__(self, url_template: str, options: dict[str, str] = {}): + self.url_template = url_template + self.options.update(options) + + +class Repo(RepoInfo): + name: str + resolved_url: str + arch: str + packages: dict[str, PackageInfo] + remote: bool + scanned: bool = False + + def scan(self): + self.resolved_url = resolve_url(self.url_template, repo_name=self.name, arch=self.arch) + self.remote = not self.resolved_url.startswith('file://') + uri = f'{self.resolved_url}/{self.name}.db' + path = '' + if self.remote: + logging.debug(f'Downloading repo file from {uri}') + with urllib.request.urlopen(uri) as request: + fd, path = tempfile.mkstemp() + with open(fd, 'wb') as writable: + writable.write(request.read()) + else: + path = uri.split('file://')[1] + logging.debug(f'Parsing repo file at {path}') + with tarfile.open(path) as index: + for node in index.getmembers(): + if os.path.basename(node.name) == 'desc': + logging.debug(f'Parsing desc file for {os.path.dirname(node.name)}') + pkg = PackageInfo.parse_desc(index.extractfile(node).read().decode(), self.resolved_url) + self.packages[pkg.name] = pkg + + self.scanned = True + + def __init__(self, name: str, url_template: str, arch: str, options={}, scan=False): + self.packages = {} + self.name = name + self.url_template = url_template + self.arch = arch + self.options = deepcopy(options) + if scan: + self.scan() + + def config_snippet(self) -> str: + options = {'Server': self.url_template} | self.options + return ('[%s]\n' % self.name) + '\n'.join([f"{key} = {value}" for key, value in options.items()]) + + def get_RepoInfo(self): + return RepoInfo(url_template=self.url_template, options=self.options)