Here We Go

This commit is contained in:
2025-10-30 12:19:15 +03:30
parent 2097b13137
commit a0f40266ab
11 changed files with 1844 additions and 2 deletions

28
PKGBUILD Normal file
View File

@@ -0,0 +1,28 @@
# Maintainer: Your Name <your.email@example.com>
pkgname=mirrorman
pkgver=0.1
pkgrel=1
pkgdesc="A GUI tool for managing Arch Linux mirrors and repositories (Parch Repository Manager)"
arch=('any')
url="https://git.parchlinux.com/applications/mirrorman"
license=('GPL-3')
depends=('python' 'python-gobject' 'libadwaita' 'polkit' )
makedepends=('meson' 'ninja')
source=("$git+https://git.parchlinux.com/applications/mirrorman.git")
sha256sums=('SKIP')
prepare() {
cd "$srcdir/mirrorman"
}
build() {
cd "$srcdir/mirrorman"
meson setup --prefix=/usr --buildtype=plain build
meson compile -C build
}
package() {
cd "$srcdir/mirrorman"
meson install -C build --destdir "$pkgdir"
}

115
README.md
View File

@@ -1,3 +1,114 @@
# mirrorman
# MirrorMan
Parch Linux Mirror Manager
A modern GTK4/Libadwaita repository manager for Parch Gnu/Linux. MirrorMan provides an intuitive interface for managing Pacman mirrors, testing connection speeds, and configuring system repositories.
## Features
### Mirror Management
- Fetch mirrors from the Arch Linux mirror status API
- Filter by country, protocol (HTTP/HTTPS), and IP version (IPv4/IPv6)
- Test mirror response times and rank by speed
- Enable or disable mirrors individually
- Sort mirrors by speed, country, or last sync time
### Repository Configuration
- Toggle standard Arch repositories
- Manage third-party repositories (Chaotic-AUR, BlackArch, ArchLinuxCN)
- Sync package databases with a single click
- System update integration
### Pacman Settings
- Configure parallel downloads
- Adjust download timeout values
- Enable verbose package lists
- Toggle color output
- Modify package cache settings
## Requirements
- Python 3.8+
- GTK 4
- Libadwaita 1.0+
- PyGObject
- pkexec (for privilege escalation)
## Installation
```bash
# Clone the repository
git clone https://github.com/yourusername/mirrorman.git
cd mirrorman
# Install dependencies (Arch-based systems)
sudo pacman -S python-gobject gtk4 libadwaita
# Run the application
python main.py
# Or just simply do:
makepkg -sic
```
## Usage
### Basic Workflow
1. **Configure Filters**: Select your preferred country, protocols, and IP versions
2. **Fetch Mirrors**: Click the Fetch button to retrieve available mirrors
3. **Test Speed**: Use Test & Rank to measure mirror response times
4. **Enable Mirrors**: Select the fastest mirrors and enable them
5. **Sync**: Save your configuration and sync package databases
### Privilege Escalation
MirrorMan runs as a normal user and only requests elevated privileges when necessary. Operations requiring root access (saving mirrorlist, syncing databases) use pkexec for secure authentication.
### Repository Management
Enable or disable repositories through the sidebar interface. Changes are applied immediately to your pacman configuration. The application supports both standard Arch repositories and popular third-party options.
## Architecture
The application is structured into modular components:
- `main.py`: GTK4 interface and application logic
- `mirror_manager.py`: Mirror fetching, testing, and persistence
- `repo_config.py`: Repository configuration management
- `sync_manager.py`: Package database synchronization
- `pacman_util.py`: Pacman settings interface
## Development
### Project Structure
```
mirrorman
├── LICENSE
├── meson.build
├── mirrorman.in
├── PKGBUILD
├── README.md
└── src
├── main.py
├── mirror_manager.py
├── pacman_util.py
├── repo_config.py
├── sync_manager.py
└── utils.py
```
### Contributing
Contributions are welcome. Please ensure your code follows Python best practices and maintains compatibility with GTK4/Libadwaita design patterns.
## License
This project is licensed under the GPL-3 License. See LICENSE file for details.
## Acknowledgments
Built for Parch Linux and compatible with all Arch-based distributions. Mirror data sourced from the official Arch Linux mirror status API.
## Support
For bug reports and feature requests, please open an issue on the project repository or Parch Linux Forum.

24
meson.build Normal file
View File

@@ -0,0 +1,24 @@
project('mirrorman',
version: '0.1.0',
default_options: ['prefix=/usr']
)
datadir = get_option('prefix') / get_option('datadir') / meson.project_name()
install_subdir('src', install_dir: datadir)
conf = configuration_data()
conf.set_quoted('DATADIR', datadir.full_path())
configure_file(
input: 'mirrorman.in',
output: 'mirrorman',
configuration: conf,
install: true,
install_dir: get_option('bindir'),
install_mode: 'rwxr-xr-x'
)
install_data('mirrorman.desktop',
install_dir: get_option('datadir') / 'applications'
)

9
mirrorman.desktop Normal file
View File

@@ -0,0 +1,9 @@
[Desktop Entry]
Name=Parch Repository Manager
Comment=Manage repositories and mirrors for Parch Linux
Exec=mirrorman
Icon=network-server-symbolic
Terminal=false
Type=Application
Categories=System;Settings;
StartupNotify=true

10
mirrorman.in Normal file
View File

@@ -0,0 +1,10 @@
#!/usr/bin/env python3
import sys
import os
sys.path.insert(0, os.path.join('@DATADIR@', 'src'))
from main import main
if __name__ == '__main__':
main()

801
src/main.py Normal file
View File

@@ -0,0 +1,801 @@
import gi
gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")
from gi.repository import Gtk, Adw, Gio, GLib
import sys
import os
import threading
from mirror_manager import MirrorManager
from repo_config import RepoConfig
from sync_manager import SyncManager
import subprocess
from pacman_util import PacmanOptionsWindow
import tempfile
import urllib.request
def get_suitable_terminal():
desktop = os.environ.get('XDG_CURRENT_DESKTOP', '').upper()
if 'GNOME' in desktop:
if subprocess.run(['which', 'ptyxis'], capture_output=True).returncode == 0:
return 'ptyxis -x '
elif subprocess.run(['which', 'gnome-terminal'], capture_output=True).returncode == 0:
return 'gnome-terminal -- '
elif 'KDE' in desktop or 'PLASMA' in desktop:
if subprocess.run(['which', 'konsole'], capture_output=True).returncode == 0:
return 'konsole -e '
elif 'XFCE' in desktop:
if subprocess.run(['which', 'xfce4-terminal'], capture_output=True).returncode == 0:
return 'xfce4-terminal -e '
for term in ['alacritty', 'kitty', 'xterm']:
if subprocess.run(['which', term], capture_output=True).returncode == 0:
return f'{term} -e '
return None
def open_terminal_with_repo_command(command, parent_window=None):
escaped_cmd = command.replace("'", "'\"'\"'")
terminal_cmd = get_suitable_terminal()
if terminal_cmd is None:
if parent_window:
parent_window.show_error_dialog("Terminal Error", "No suitable terminal emulator found.")
else:
print("Error: No suitable terminal emulator found.")
return
full_cmd = f"{terminal_cmd} bash -c '{escaped_cmd}; echo; echo Press ENTER to close...; read'"
try:
subprocess.Popen(full_cmd, shell=True)
except Exception as e:
if parent_window:
parent_window.show_error_dialog("Terminal Error", str(e))
else:
print(f"Error opening terminal: {e}")
class MainWindow(Adw.ApplicationWindow):
def __init__(self, app):
super().__init__(application=app)
self.set_title("Parch Repository Manager")
self.set_default_size(1200, 800)
self.mirror_manager = MirrorManager()
self.repo_config = RepoConfig()
self.sync_manager = SyncManager()
self.selected_mirror = None
self.is_loading = False
self.is_syncing = False
toolbar_view = Adw.ToolbarView()
self.set_content(toolbar_view)
header = Adw.HeaderBar()
toolbar_view.add_top_bar(header)
self.header_refresh_btn = Gtk.Button()
self.header_refresh_btn.set_icon_name("view-refresh-symbolic")
self.header_refresh_btn.set_tooltip_text("Refresh Mirrors")
self.header_refresh_btn.connect("clicked", self.on_refresh_mirrors)
header.pack_start(self.header_refresh_btn)
self.settings_btn = Gtk.Button()
self.settings_btn.set_icon_name("preferences-system-symbolic")
self.settings_btn.set_tooltip_text("Pacman Settings")
self.settings_btn.connect("clicked", self.on_settings_clicked)
header.pack_end(self.settings_btn)
paned = Gtk.Paned(orientation=Gtk.Orientation.HORIZONTAL)
paned.set_position(320)
paned.set_shrink_start_child(False)
paned.set_shrink_end_child(False)
toolbar_view.set_content(paned)
left_sidebar = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
left_sidebar.add_css_class("sidebar")
paned.set_start_child(left_sidebar)
sidebar_scroll = Gtk.ScrolledWindow()
sidebar_scroll.set_policy(Gtk.PolicyType.NEVER, Gtk.PolicyType.AUTOMATIC)
sidebar_scroll.set_vexpand(True)
left_sidebar.append(sidebar_scroll)
sidebar_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=18)
sidebar_box.set_margin_top(18)
sidebar_box.set_margin_bottom(18)
sidebar_box.set_margin_start(12)
sidebar_box.set_margin_end(12)
sidebar_scroll.set_child(sidebar_box)
filter_clamp = Adw.Clamp()
filter_clamp.set_maximum_size(400)
sidebar_box.append(filter_clamp)
filter_group = Adw.PreferencesGroup()
filter_group.set_title("Mirror Filters")
filter_group.set_description("Configure mirror selection criteria")
filter_clamp.set_child(filter_group)
self.country_row = Adw.ComboRow()
self.country_row.set_title("Country")
self.country_row.set_icon_name("mark-location-symbolic")
self.country_store = Gtk.StringList()
self.country_store.append("Worldwide")
self.country_row.set_model(self.country_store)
self.country_row.set_selected(0)
filter_group.add(self.country_row)
protocol_row = Adw.ActionRow()
protocol_row.set_title("Protocol")
protocol_row.set_icon_name("network-wired-symbolic")
protocol_box = Gtk.Box(spacing=12)
protocol_box.set_margin_top(6)
protocol_box.set_margin_bottom(6)
self.http_check = Gtk.CheckButton(label="HTTP")
self.http_check.set_active(True)
self.https_check = Gtk.CheckButton(label="HTTPS")
self.https_check.set_active(True)
protocol_box.append(self.http_check)
protocol_box.append(self.https_check)
protocol_row.add_suffix(protocol_box)
filter_group.add(protocol_row)
ip_row = Adw.ActionRow()
ip_row.set_title("IP Version")
ip_row.set_icon_name("network-transmit-receive-symbolic")
ip_box = Gtk.Box(spacing=12)
ip_box.set_margin_top(6)
ip_box.set_margin_bottom(6)
self.ipv4_check = Gtk.CheckButton(label="IPv4")
self.ipv4_check.set_active(True)
self.ipv6_check = Gtk.CheckButton(label="IPv6")
ip_box.append(self.ipv4_check)
ip_box.append(self.ipv6_check)
ip_row.add_suffix(ip_box)
filter_group.add(ip_row)
status_row = Adw.ActionRow()
status_row.set_title("Up-to-date only")
status_row.set_subtitle("Show only synchronized mirrors")
status_row.set_icon_name("emblem-synchronizing-symbolic")
self.status_check = Gtk.Switch()
self.status_check.set_valign(Gtk.Align.CENTER)
self.status_check.set_active(True)
status_row.add_suffix(self.status_check)
status_row.set_activatable_widget(self.status_check)
filter_group.add(status_row)
btn_box = Gtk.Box(spacing=8)
btn_box.set_margin_top(12)
btn_box.set_homogeneous(True)
sidebar_box.append(btn_box)
self.refresh_btn = Gtk.Button()
refresh_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8)
refresh_box.set_halign(Gtk.Align.CENTER)
refresh_icon = Gtk.Image.new_from_icon_name("view-refresh-symbolic")
refresh_label = Gtk.Label(label="Fetch")
refresh_box.append(refresh_icon)
refresh_box.append(refresh_label)
self.refresh_btn.set_child(refresh_box)
self.refresh_btn.add_css_class("suggested-action")
self.refresh_btn.connect("clicked", self.on_refresh_mirrors)
btn_box.append(self.refresh_btn)
self.rank_btn = Gtk.Button()
rank_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8)
rank_box.set_halign(Gtk.Align.CENTER)
rank_icon = Gtk.Image.new_from_icon_name("emblem-default-symbolic")
rank_label = Gtk.Label(label="Test & Rank")
rank_box.append(rank_icon)
rank_box.append(rank_label)
self.rank_btn.set_child(rank_box)
self.rank_btn.connect("clicked", self.on_rank_mirrors)
self.rank_btn.set_sensitive(False)
btn_box.append(self.rank_btn)
loading_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=10)
loading_box.set_margin_top(8)
loading_box.set_halign(Gtk.Align.CENTER)
self.loading_spinner = Gtk.Spinner()
self.loading_label = Gtk.Label(label="")
self.loading_label.add_css_class("dim-label")
self.loading_label.add_css_class("caption")
loading_box.append(self.loading_spinner)
loading_box.append(self.loading_label)
sidebar_box.append(loading_box)
separator1 = Gtk.Separator()
separator1.set_margin_top(6)
separator1.set_margin_bottom(6)
sidebar_box.append(separator1)
repo_clamp = Adw.Clamp()
repo_clamp.set_maximum_size(400)
sidebar_box.append(repo_clamp)
repo_group = Adw.PreferencesGroup()
repo_group.set_title("Repositories")
repo_group.set_description("Enable or disable repositories")
repo_clamp.set_child(repo_group)
self.repo_list = Gtk.ListBox()
self.repo_list.set_selection_mode(Gtk.SelectionMode.NONE)
self.repo_list.add_css_class("boxed-list")
repo_group.add(self.repo_list)
self.update_repo_list()
separator2 = Gtk.Separator()
separator2.set_margin_top(6)
separator2.set_margin_bottom(6)
sidebar_box.append(separator2)
third_clamp = Adw.Clamp()
third_clamp.set_maximum_size(400)
sidebar_box.append(third_clamp)
third_group = Adw.PreferencesGroup()
third_group.set_title("Third-Party Repositories")
third_group.set_description("Enable or disable additional repositories")
third_clamp.set_child(third_group)
self.third_list = Gtk.ListBox()
self.third_list.set_selection_mode(Gtk.SelectionMode.NONE)
self.third_list.add_css_class("boxed-list")
third_group.add(self.third_list)
self.third_party_repos = ["chaotic-aur", "blackarch", "archlinuxcn"]
self.third_party_configs = {
"chaotic-aur": {
"key": "3056513887B78AEB",
"keyring_url": "https://cdn-mirror.chaotic.cx/chaotic-aur/chaotic-keyring.pkg.tar.zst",
"mirrorlist_url": "https://cdn-mirror.chaotic.cx/chaotic-aur/chaotic-mirrorlist.pkg.tar.zst",
"section": "[chaotic-aur]\nInclude = /etc/pacman.d/chaotic-mirrorlist\n"
},
"blackarch": {
"strap_url": "https://blackarch.org/strap.sh",
"section": "[blackarch]\nServer = https://blackarch.org/blackarch/$repo/os/$arch\n"
},
"archlinuxcn": {
"key": "4D41FD3D9E72E7966A573093E8CA6AEB220E236C",
"section": "[archlinuxcn]\nServer = https://repo.archlinuxcn.org/$arch\n"
}
}
display_names = ["Chaotic-AUR", "BlackArch", "ArchLinuxCN"]
for i, repo_name in enumerate(self.third_party_repos):
row = Adw.ActionRow()
row.set_title(display_names[i])
row.set_icon_name("folder-symbolic")
switch = Gtk.Switch()
switch.set_valign(Gtk.Align.CENTER)
switch.set_active(self.repo_config.repositories[repo_name])
switch.connect("state-set", self.on_third_party_toggle, repo_name)
row.add_suffix(switch)
row.set_activatable_widget(switch)
self.third_list.append(row)
sys_box = Gtk.Box(spacing=8)
sys_box.set_margin_top(12)
sys_box.set_homogeneous(True)
sidebar_box.append(sys_box)
sync_btn = Gtk.Button()
sync_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8)
sync_box.set_halign(Gtk.Align.CENTER)
sync_icon = Gtk.Image.new_from_icon_name("emblem-synchronizing-symbolic")
sync_label = Gtk.Label(label="Sync")
sync_box.append(sync_icon)
sync_box.append(sync_label)
sync_btn.set_child(sync_box)
sync_btn.set_tooltip_text("Save mirrorlist and sync repositories")
sync_btn.connect("clicked", self.on_sync_repos)
sys_box.append(sync_btn)
update_btn = Gtk.Button()
update_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8)
update_box.set_halign(Gtk.Align.CENTER)
update_icon = Gtk.Image.new_from_icon_name("system-software-update-symbolic")
update_label = Gtk.Label(label="Update")
update_box.append(update_icon)
update_box.append(update_label)
update_btn.set_child(update_box)
update_btn.add_css_class("destructive-action")
update_btn.set_tooltip_text("Update all system packages")
update_btn.connect("clicked", self.on_update_system)
sys_box.append(update_btn)
right_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
right_box.add_css_class("view")
paned.set_end_child(right_box)
mirror_toolbar = Gtk.Box(spacing=6)
mirror_toolbar.add_css_class("toolbar")
mirror_toolbar.set_margin_top(12)
mirror_toolbar.set_margin_bottom(12)
mirror_toolbar.set_margin_start(12)
mirror_toolbar.set_margin_end(12)
right_box.append(mirror_toolbar)
left_controls = Gtk.Box(spacing=6)
mirror_toolbar.append(left_controls)
self.enable_btn = Gtk.Button()
enable_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
enable_icon = Gtk.Image.new_from_icon_name("emblem-ok-symbolic")
enable_label = Gtk.Label(label="Enable")
enable_box.append(enable_icon)
enable_box.append(enable_label)
self.enable_btn.set_child(enable_box)
self.enable_btn.add_css_class("suggested-action")
self.enable_btn.connect("clicked", self.on_enable_mirror)
self.enable_btn.set_sensitive(False)
self.enable_btn.set_tooltip_text("Enable selected mirror")
left_controls.append(self.enable_btn)
self.disable_btn = Gtk.Button()
disable_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
disable_icon = Gtk.Image.new_from_icon_name("process-stop-symbolic")
disable_label = Gtk.Label(label="Disable")
disable_box.append(disable_icon)
disable_box.append(disable_label)
self.disable_btn.set_child(disable_box)
self.disable_btn.connect("clicked", self.on_disable_mirror)
self.disable_btn.set_sensitive(False)
self.disable_btn.set_tooltip_text("Disable selected mirror")
left_controls.append(self.disable_btn)
spacer = Gtk.Box()
spacer.set_hexpand(True)
mirror_toolbar.append(spacer)
sort_label = Gtk.Label(label="Sort by:")
sort_label.add_css_class("dim-label")
mirror_toolbar.append(sort_label)
self.sort_speed_btn = Gtk.Button()
speed_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
speed_icon = Gtk.Image.new_from_icon_name("speedometer-symbolic")
speed_label = Gtk.Label(label="Speed")
speed_box.append(speed_icon)
speed_box.append(speed_label)
self.sort_speed_btn.set_child(speed_box)
self.sort_speed_btn.connect("clicked", self.on_sort_speed)
self.sort_speed_btn.set_sensitive(False)
self.sort_speed_btn.set_tooltip_text("Sort mirrors by response time")
mirror_toolbar.append(self.sort_speed_btn)
self.sort_country_btn = Gtk.Button()
country_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
country_icon = Gtk.Image.new_from_icon_name("mark-location-symbolic")
country_label = Gtk.Label(label="Country")
country_box.append(country_icon)
country_box.append(country_label)
self.sort_country_btn.set_child(country_box)
self.sort_country_btn.connect("clicked", self.on_sort_country)
self.sort_country_btn.set_sensitive(False)
self.sort_country_btn.set_tooltip_text("Sort mirrors by country")
mirror_toolbar.append(self.sort_country_btn)
self.sort_age_btn = Gtk.Button()
age_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
age_icon = Gtk.Image.new_from_icon_name("document-open-recent-symbolic")
age_label = Gtk.Label(label="Age")
age_box.append(age_icon)
age_box.append(age_label)
self.sort_age_btn.set_child(age_box)
self.sort_age_btn.connect("clicked", self.on_sort_age)
self.sort_age_btn.set_sensitive(False)
self.sort_age_btn.set_tooltip_text("Sort mirrors by last sync time")
mirror_toolbar.append(self.sort_age_btn)
self.mirror_scroll = Gtk.ScrolledWindow()
self.mirror_scroll.set_vexpand(True)
self.mirror_scroll.set_policy(Gtk.PolicyType.NEVER, Gtk.PolicyType.AUTOMATIC)
right_box.append(self.mirror_scroll)
self.status_bar = Adw.StatusPage()
self.status_bar.set_title("No Mirrors Loaded")
self.status_bar.set_description("Configure your filters and click 'Fetch' to load available mirrors")
self.status_bar.set_icon_name("network-server-symbolic")
self.mirror_scroll.set_child(self.status_bar)
self.mirror_list = Gtk.ListBox()
self.mirror_list.set_selection_mode(Gtk.SelectionMode.SINGLE)
self.mirror_list.add_css_class("boxed-list")
self.mirror_list.set_margin_top(6)
self.mirror_list.set_margin_bottom(12)
self.mirror_list.set_margin_start(12)
self.mirror_list.set_margin_end(12)
self.mirror_list.connect("row-selected", self.on_mirror_selected)
GLib.idle_add(self.load_country_list)
def toggle_repo_config(self, repo_name, enable, is_third_party=False):
try:
with open('/etc/pacman.conf', 'r') as f:
config_text = f.read()
section_snippet = None
if is_third_party:
config = self.third_party_configs.get(repo_name, {})
section_snippet = config.get('section')
modified = self.toggle_repo(config_text, repo_name, enable, section_snippet)
temp_path = None
with tempfile.NamedTemporaryFile(mode='w', delete=False) as temp:
temp.write(modified)
temp_path = temp.name
subprocess.check_call(['pkexec', 'cp', temp_path, '/etc/pacman.conf'])
return True
except Exception as e:
self.show_error_dialog("Config Update Failed", str(e))
return False
finally:
if temp_path:
os.unlink(temp_path)
def toggle_repo(self, config_text, repo_name, enable, section_snippet=None):
lines = config_text.splitlines()
new_lines = []
found_section = False
in_section = False
for line in lines:
stripped = line.strip()
if stripped == f'[{repo_name}]':
found_section = True
in_section = True
if enable:
new_lines.append(line.lstrip('#'))
else:
new_lines.append('#' + line if not line.startswith('#') else line)
continue
if in_section:
if stripped.startswith('[') and not stripped == f'[{repo_name}]':
in_section = False
new_lines.append(line)
continue
if stripped.startswith('Include =') or stripped.startswith('Server ='):
if enable:
new_lines.append(line.lstrip('#'))
else:
new_lines.append('#' + line if not line.startswith('#') else line)
else:
new_lines.append(line)
else:
new_lines.append(line)
if enable and not found_section and section_snippet:
new_lines.append('')
new_lines += section_snippet.splitlines()
return '\n'.join(new_lines) + '\n'
def load_country_list(self):
def fetch_countries():
try:
countries = self.mirror_manager.fetch_countries_only()
GLib.idle_add(self.update_country_list, countries)
except:
pass
thread = threading.Thread(target=fetch_countries, daemon=True)
thread.start()
return False
def update_country_list(self, countries):
while self.country_store.get_n_items() > 1:
self.country_store.remove(1)
for country in sorted(countries):
self.country_store.append(country)
return False
def set_loading_state(self, loading, message=""):
self.is_loading = loading
self.loading_spinner.set_spinning(loading)
self.loading_label.set_text(message)
self.refresh_btn.set_sensitive(not loading)
self.header_refresh_btn.set_sensitive(not loading)
self.rank_btn.set_sensitive(not loading and len(self.mirror_manager.mirrors) > 0)
self.http_check.set_sensitive(not loading)
self.https_check.set_sensitive(not loading)
self.ipv4_check.set_sensitive(not loading)
self.ipv6_check.set_sensitive(not loading)
self.status_check.set_sensitive(not loading)
self.country_row.set_sensitive(not loading)
def show_error_dialog(self, title, message):
dialog = Adw.AlertDialog(heading=title, body=message)
dialog.add_response("ok", "OK")
dialog.present(self)
return False
def show_info_dialog(self, title, message):
dialog = Adw.AlertDialog(heading=title, body=message)
dialog.add_response("ok", "OK")
dialog.set_response_appearance("ok", Adw.ResponseAppearance.SUGGESTED)
dialog.present(self)
return False
def on_refresh_mirrors(self, button):
if self.is_loading:
return
protocols = []
if self.http_check.get_active():
protocols.append("http")
if self.https_check.get_active():
protocols.append("https")
if not protocols:
self.show_error_dialog("No Protocols", "Select at least one protocol")
return
ip_versions = []
if self.ipv4_check.get_active():
ip_versions.append("4")
if self.ipv6_check.get_active():
ip_versions.append("6")
if not ip_versions:
self.show_error_dialog("No IP Versions", "Select at least one IP version")
return
selected = self.country_row.get_selected()
country = self.country_store.get_string(selected) if selected < self.country_store.get_n_items() else None
if country == "Worldwide":
country = None
use_status = self.status_check.get_active()
def fetch_in_background():
try:
GLib.idle_add(self.set_loading_state, True, "Fetching mirrors...")
self.mirror_manager.fetch_mirrors(country, protocols, ip_versions, use_status)
GLib.idle_add(self.on_fetch_complete)
except Exception as e:
GLib.idle_add(self.on_fetch_error, str(e))
thread = threading.Thread(target=fetch_in_background, daemon=True)
thread.start()
def on_fetch_complete(self):
self.set_loading_state(False)
self.update_mirror_list()
self.enable_mirror_controls()
self.mirror_scroll.set_child(self.mirror_list)
count = len(self.mirror_manager.mirrors)
self.show_info_dialog("Success", f"Loaded {count} mirror(s)")
return False
def on_fetch_error(self, error_message):
self.set_loading_state(False)
self.show_error_dialog("Fetch Failed", error_message)
return False
def on_rank_mirrors(self, button):
if self.is_loading or not self.mirror_manager.mirrors:
return
def rank_in_background():
try:
total = len(self.mirror_manager.mirrors)
for i, mirror in enumerate(self.mirror_manager.mirrors):
GLib.idle_add(self.set_loading_state, True, f"Testing mirror {i+1}/{total}...")
self.mirror_manager.test_mirror_speed(mirror)
GLib.idle_add(self.update_mirror_list)
self.mirror_manager.sort_by_speed()
GLib.idle_add(self.on_rank_complete)
except Exception as e:
GLib.idle_add(self.on_rank_error, str(e))
thread = threading.Thread(target=rank_in_background, daemon=True)
thread.start()
def on_rank_complete(self):
self.set_loading_state(False)
self.update_mirror_list()
self.show_info_dialog("Ranking Complete", "Mirrors sorted by speed.\n\nEnable/disable mirrors, then use 'Sync' to save changes.")
return False
def on_rank_error(self, error_message):
self.set_loading_state(False)
self.show_error_dialog("Ranking Error", error_message)
return False
def enable_mirror_controls(self):
has_mirrors = len(self.mirror_manager.mirrors) > 0
self.rank_btn.set_sensitive(has_mirrors)
self.sort_speed_btn.set_sensitive(has_mirrors)
self.sort_country_btn.set_sensitive(has_mirrors)
self.sort_age_btn.set_sensitive(has_mirrors)
def on_mirror_selected(self, listbox, row):
self.selected_mirror = row.mirror if row else None
has_selection = self.selected_mirror is not None
self.enable_btn.set_sensitive(has_selection)
self.disable_btn.set_sensitive(has_selection)
def on_enable_mirror(self, button):
if self.selected_mirror:
self.selected_mirror.enabled = True
self.update_mirror_list()
def on_disable_mirror(self, button):
if self.selected_mirror:
self.selected_mirror.enabled = False
self.update_mirror_list()
def on_sort_speed(self, button):
self.mirror_manager.sort_by_speed()
self.update_mirror_list()
def on_sort_country(self, button):
self.mirror_manager.sort_by_country()
self.update_mirror_list()
def on_sort_age(self, button):
self.mirror_manager.sort_by_age()
self.update_mirror_list()
def update_mirror_list(self):
while self.mirror_list.get_first_child():
self.mirror_list.remove(self.mirror_list.get_first_child())
for mirror in self.mirror_manager.mirrors:
row = Adw.ActionRow()
row.set_title(mirror.url)
subtitle_parts = []
country_display = f"📍 {mirror.country}"
subtitle_parts.append(country_display)
protocol_display = f"🔗 {mirror.protocol.upper()}"
subtitle_parts.append(protocol_display)
if mirror.speed is not None:
speed_text = f"{mirror.speed:.0f}ms"
if mirror.speed < 100:
speed_display = f"🟢 {speed_text}"
elif mirror.speed < 300:
speed_display = f"🟡 {speed_text}"
else:
speed_display = f"🔴 {speed_text}"
else:
speed_display = "⚪ Not tested"
subtitle_parts.append(speed_display)
sync_text = mirror.last_sync.split('T')[0] if mirror.last_sync else "Unknown"
subtitle_parts.append(f"🕒 {sync_text}")
row.set_subtitle("".join(subtitle_parts))
status_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
if mirror.enabled:
status_icon = Gtk.Image.new_from_icon_name("emblem-ok-symbolic")
status_icon.add_css_class("success")
status_label = Gtk.Label(label="Enabled")
status_label.add_css_class("success")
else:
status_icon = Gtk.Image.new_from_icon_name("window-close-symbolic")
status_icon.add_css_class("error")
status_label = Gtk.Label(label="Disabled")
status_label.add_css_class("dim-label")
status_box.append(status_icon)
status_box.append(status_label)
row.add_suffix(status_box)
row.mirror = mirror
self.mirror_list.append(row)
return False
def on_sync_repos(self, button):
mirrorlist_content = "## Parch Mirrorlist\n\n" + '\n'.join(f"Server = {m.url}$repo/os/$arch" for m in self.mirror_manager.mirrors if m.enabled) + '\n'
temp_path = None
try:
with tempfile.NamedTemporaryFile(mode='w', delete=False) as temp:
temp.write(mirrorlist_content)
temp_path = temp.name
subprocess.check_call(['pkexec', 'cp', temp_path, '/etc/pacman.d/mirrorlist'])
self.show_sync_progress()
except Exception as e:
self.show_error_dialog("Save Failed", str(e))
finally:
if temp_path:
os.unlink(temp_path)
def show_sync_progress(self):
dialog = Adw.AlertDialog(heading="Syncing Repositories")
progress = Gtk.ProgressBar()
progress.set_show_text(True)
progress.set_text("Syncing...")
content_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=12)
content_box.append(progress)
dialog.set_extra_child(content_box)
dialog.add_response("ok", "OK")
dialog.set_response_appearance("ok", Adw.ResponseAppearance.SUGGESTED)
dialog.present(self)
def pulse():
progress.pulse()
return self.is_syncing
self.is_syncing = True
timeout_id = GLib.timeout_add(100, pulse)
def sync_thread():
try:
subprocess.check_call(['pkexec', 'pacman', '-Sy'])
GLib.idle_add(self.on_sync_complete, dialog, timeout_id)
except Exception as e:
GLib.idle_add(self.on_sync_error, dialog, timeout_id, str(e))
thread = threading.Thread(target=sync_thread, daemon=True)
thread.start()
def on_sync_complete(self, dialog, timeout_id):
GLib.source_remove(timeout_id)
self.is_syncing = False
dialog.close()
self.show_info_dialog("Success", "Mirrorlist saved and repositories synced successfully")
return False
def on_sync_error(self, dialog, timeout_id, error):
GLib.source_remove(timeout_id)
self.is_syncing = False
dialog.close()
self.show_error_dialog("Sync Failed", error)
return False
def on_update_system(self, button):
dialog = Adw.AlertDialog(
heading="Update System?",
body="This will update all system packages. This operation may take some time. Continue?"
)
dialog.add_response("cancel", "Cancel")
dialog.add_response("update", "Update")
dialog.set_response_appearance("update", Adw.ResponseAppearance.DESTRUCTIVE)
dialog.connect("response", self.on_update_confirmed)
dialog.present(self)
def on_update_confirmed(self, dialog, response):
if response == "update":
open_terminal_with_repo_command('pkexec pacman -Syu', self)
def update_repo_list(self):
while self.repo_list.get_first_child():
self.repo_list.remove(self.repo_list.get_first_child())
repositories = self.repo_config.get_repositories()
for repo_name in self.repo_config.standard_repos:
enabled = repositories.get(repo_name, False)
row = Adw.ActionRow()
row.set_title(repo_name)
row.set_icon_name("folder-symbolic")
switch = Gtk.Switch()
switch.set_active(enabled)
switch.set_valign(Gtk.Align.CENTER)
switch.connect("state-set", self.on_repo_toggle, repo_name)
row.add_suffix(switch)
row.set_activatable_widget(switch)
self.repo_list.append(row)
def update_third_list(self):
row = self.third_list.get_first_child()
i = 0
while row:
repo_name = self.third_party_repos[i]
switch = row.get_activatable_widget()
switch.set_active(self.repo_config.repositories[repo_name])
row = row.get_next_sibling()
i += 1
def on_repo_toggle(self, switch, state, repo_name):
success = self.toggle_repo_config(repo_name, state)
if success:
self.repo_config.repositories[repo_name] = state
self.update_repo_list()
else:
switch.set_state(not state)
return False
def on_third_party_toggle(self, switch, state, repo_name):
if state:
try:
config = self.third_party_configs[repo_name]
if 'strap_url' in config:
strap_path = f'/tmp/strap_{repo_name}.sh'
urllib.request.urlretrieve(config['strap_url'], strap_path)
os.chmod(strap_path, 0o755)
subprocess.check_call(['pkexec', 'bash', strap_path])
os.unlink(strap_path)
else:
if 'key' in config:
subprocess.check_call(['pkexec', 'pacman-key', '--recv-key', config['key'], '--keyserver', 'keyserver.ubuntu.com'])
subprocess.check_call(['pkexec', 'pacman-key', '--lsign-key', config['key']])
if 'keyring_url' in config:
subprocess.check_call(['pkexec', 'pacman', '-U', '--noconfirm', config['keyring_url'], config['mirrorlist_url']])
except Exception as e:
self.show_error_dialog("Enable Failed", str(e))
switch.set_state(False)
return False
success = self.toggle_repo_config(repo_name, state, is_third_party=True)
if success:
self.repo_config.repositories[repo_name] = state
self.update_third_list()
else:
switch.set_state(not state)
return False
def on_settings_clicked(self, button):
options_window = PacmanOptionsWindow(parent=self)
options_window.present()
class ParchRepoManagerApp(Adw.Application):
def __init__(self):
super().__init__(application_id="org.parchlinux.ParchRepoManager")
self.connect("activate", self.on_activate)
def on_activate(self, app):
win = MainWindow(self)
win.present()
def main():
if not os.environ.get('DISPLAY') and not os.environ.get('WAYLAND_DISPLAY'):
print("Error: No display environment detected.")
print("Ensure you are running with 'sudo -E'.")
sys.exit(1)
if not Gtk.init_check():
print("Error: Failed to initialize GTK.")
sys.exit(1)
Adw.StyleManager.get_default().set_color_scheme(Adw.ColorScheme.DEFAULT)
app = ParchRepoManagerApp()
app.run(sys.argv)
if __name__ == "__main__":
main()

255
src/mirror_manager.py Normal file
View File

@@ -0,0 +1,255 @@
import urllib.request
import os
import json
import time
import ssl
from urllib.error import URLError, HTTPError
from datetime import datetime, timezone
class MirrorManager:
class Mirror:
def __init__(self, url, country, protocol, speed=None, last_sync=None, enabled=True, ipv4=True, ipv6=False):
self.url = url
self.country = country
self.protocol = protocol
self.speed = speed
self.last_sync = last_sync
self.enabled = enabled
self.ipv4 = ipv4
self.ipv6 = ipv6
def __init__(self):
self.mirrors = []
# ONLY touch mirrorlist, NEVER pacman.conf
self.mirrorlist_file = "/etc/pacman.d/mirrorlist"
self.mirrorlist_backup = "/etc/pacman.d/mirrorlist.backup"
self.countries = set()
# Create SSL context that handles modern SSL properly
self.ssl_context = ssl.create_default_context()
# For testing/development, you can uncomment this line:
# self.ssl_context.check_hostname = False
# self.ssl_context.verify_mode = ssl.CERT_NONE
def fetch_countries_only(self):
"""Fetch just the list of countries without loading all mirrors"""
try:
request = urllib.request.Request(
"https://archlinux.org/mirrors/status/json/",
headers={'User-Agent': 'Arch-Repository-Manager/1.0'}
)
with urllib.request.urlopen(request, timeout=10, context=self.ssl_context) as response:
data = json.loads(response.read().decode())
countries = set()
for mirror in data.get("urls", []):
country = mirror.get("country", "Unknown")
if country and country != "Unknown":
countries.add(country)
self.countries = countries
return countries
except Exception as e:
# Return empty set on error, don't crash
return set()
def fetch_mirrors(self, country=None, protocols=None, ip_versions=None, use_status=False):
"""Fetch mirror list from Arch Linux mirror status API"""
try:
# Use proper SSL context
request = urllib.request.Request(
"https://archlinux.org/mirrors/status/json/",
headers={'User-Agent': 'Arch-Repository-Manager/1.0'}
)
with urllib.request.urlopen(request, timeout=10, context=self.ssl_context) as response:
data = json.loads(response.read().decode())
self.mirrors = []
self.countries = set()
for mirror in data.get("urls", []):
mirror_country = mirror.get("country", "Unknown")
self.countries.add(mirror_country)
# Apply country filter
if country and country != mirror_country:
continue
# Apply protocol filter
protocol = mirror.get("protocol", "unknown")
if protocols and protocol.lower() not in protocols:
continue
# Get mirror details
url = mirror.get("url")
if not url:
continue
# Apply IP version
ipv4 = "4" in ip_versions if ip_versions else True
ipv6 = "6" in ip_versions if ip_versions else False
last_sync = mirror.get("last_sync", None)
# Apply status filter
if use_status and not self.is_mirror_up_to_date(mirror):
continue
# Create mirror object
self.mirrors.append(
self.Mirror(
url=url,
country=mirror_country,
protocol=protocol,
last_sync=last_sync,
ipv4=ipv4,
ipv6=ipv6
)
)
self.countries.add("Worldwide")
return True
except HTTPError as e:
raise Exception(f"HTTP Error {e.code}: {e.reason}")
except URLError as e:
if hasattr(e.reason, 'errno'):
raise Exception(f"Network error: {e.reason}")
else:
raise Exception(f"Failed to connect: {e.reason}")
except ssl.SSLError as e:
raise Exception(f"SSL Error: {str(e)}\n\nTry updating your system certificates:\nsudo pacman -S ca-certificates")
except json.JSONDecodeError as e:
raise Exception(f"Invalid response from mirror status API: {str(e)}")
except Exception as e:
raise Exception(f"Unexpected error: {str(e)}")
def is_mirror_up_to_date(self, mirror):
"""Check if mirror was synced within the last 24 hours"""
last_sync = mirror.get("last_sync")
if not last_sync:
return False
try:
# Parse ISO format timestamp
sync_time = datetime.fromisoformat(last_sync.replace("Z", "+00:00"))
now = datetime.now(timezone.utc)
hours_old = (now - sync_time).total_seconds() / 3600
# Consider up-to-date if synced within 24 hours
return hours_old < 24
except (ValueError, AttributeError):
return False
def test_mirror_speed(self, mirror):
"""Test mirror response time"""
if not mirror.url:
mirror.speed = None
return
# Only test HTTP/HTTPS mirrors
if not mirror.url.startswith(('http://', 'https://')):
mirror.speed = None
return
start_time = time.time()
try:
# Test with a small core database file
test_url = mirror.url.rstrip('/') + '/core/os/x86_64/core.db'
request = urllib.request.Request(
test_url,
headers={'User-Agent': 'Arch-Repository-Manager/1.0'}
)
with urllib.request.urlopen(request, timeout=5, context=self.ssl_context) as response:
# Just read headers, don't download entire file
response.read(1024)
elapsed = (time.time() - start_time) * 1000 # Convert to ms
mirror.speed = elapsed
except (URLError, HTTPError, ssl.SSLError, TimeoutError):
# Mirror unreachable or slow
mirror.speed = None
except Exception:
# Any other error
mirror.speed = None
def refresh_mirrors(self, country=None, protocols=None, ip_versions=None, use_status=False):
"""Refresh mirror list and test speeds - DOES NOT SAVE"""
if self.fetch_mirrors(country, protocols, ip_versions, use_status):
# Test mirror speeds (this can take a while)
for i, mirror in enumerate(self.mirrors):
self.test_mirror_speed(mirror)
# Optional: Add progress callback here
def enable_mirror(self, mirror):
"""Enable a specific mirror - DOES NOT SAVE"""
mirror.enabled = True
def disable_mirror(self, mirror):
"""Disable a specific mirror - DOES NOT SAVE"""
mirror.enabled = False
def rank_mirrors(self):
"""Sort mirrors by speed - DOES NOT SAVE"""
self.sort_by_speed()
def sort_by_speed(self):
"""Sort mirrors by response time (fastest first)"""
# Put mirrors with no speed at the end
self.mirrors.sort(key=lambda m: (m.speed is None, m.speed if m.speed else float('inf')))
def sort_by_country(self):
"""Sort mirrors alphabetically by country"""
self.mirrors.sort(key=lambda m: m.country)
def sort_by_age(self):
"""Sort mirrors by last sync time (newest first)"""
def sort_key(m):
if not m.last_sync:
return "9999-12-31" # Put unknowns at the end
return m.last_sync
self.mirrors.sort(key=sort_key, reverse=True)
def get_countries(self):
"""Get sorted list of available countries"""
# Remove "Worldwide" if it exists, we'll add it separately
countries = self.countries - {"Worldwide"}
return sorted(countries)
def save_mirrorlist(self):
"""Save enabled mirrors to /etc/pacman.d/mirrorlist - NEVER TOUCHES pacman.conf"""
try:
# SAFETY CHECK - verify we're writing to the correct file
if self.mirrorlist_file != "/etc/pacman.d/mirrorlist":
raise Exception("SAFETY ERROR: Attempting to write to wrong file!")
# Create backup of existing mirrorlist
import shutil
if os.path.exists(self.mirrorlist_file):
shutil.copy2(self.mirrorlist_file, self.mirrorlist_backup)
# Write new mirrorlist
with open(self.mirrorlist_file, "w") as f:
f.write("##\n")
f.write("## Parch Linux repository mirrorlist\n")
f.write(f"## Generated by Parch Repository Manager on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write("##\n\n")
enabled_count = sum(1 for m in self.mirrors if m.enabled)
f.write(f"## {enabled_count} enabled mirror(s)\n\n")
for mirror in self.mirrors:
if mirror.enabled:
url = mirror.url.rstrip('/') + '/$repo/os/$arch'
f.write(f"Server = {url}\n")
except PermissionError:
raise Exception("Permission denied. Ensure you are running as root.")
except IOError as e:
raise Exception(f"Failed to write mirrorlist: {str(e)}")

307
src/pacman_util.py Normal file
View File

@@ -0,0 +1,307 @@
import configparser
import os
import gi
gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")
from gi.repository import Gtk, Adw, Gio, GLib
import tempfile
import subprocess
class PacmanOptionsWindow(Adw.Window):
__gtype_name__ = 'PacmanOptionsWindow'
def __init__(self, parent=None):
super().__init__(transient_for=parent, modal=True)
self.set_title("Pacman Options")
self.set_default_size(600, 600)
self.config = configparser.ConfigParser(allow_no_value=True)
self.pacman_conf = "/etc/pacman.conf"
self.load_config()
toolbar_view = Adw.ToolbarView()
self.set_content(toolbar_view)
header = Adw.HeaderBar()
toolbar_view.add_top_bar(header)
save_btn = Gtk.Button(label="Save")
save_btn.add_css_class("suggested-action")
save_btn.connect("clicked", self.on_save)
header.pack_end(save_btn)
cancel_btn = Gtk.Button(label="Cancel")
cancel_btn.connect("clicked", lambda btn: self.destroy())
header.pack_start(cancel_btn)
scroll = Gtk.ScrolledWindow()
scroll.set_policy(Gtk.PolicyType.NEVER, Gtk.PolicyType.AUTOMATIC)
scroll.set_vexpand(True)
toolbar_view.set_content(scroll)
box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=12)
box.set_margin_top(12)
box.set_margin_bottom(12)
box.set_margin_start(12)
box.set_margin_end(12)
scroll.set_child(box)
group = Adw.PreferencesGroup(title="Package Management")
box.append(group)
ignore_row = Adw.ActionRow(title="IgnorePkg")
ignore_row.set_subtitle("Space-separated packages to ignore during upgrades")
self.ignore_entry = Gtk.Entry(text=' '.join(self.get_list('options', 'IgnorePkg')))
ignore_row.add_suffix(self.ignore_entry)
group.add(ignore_row)
hold_row = Adw.ActionRow(title="HoldPkg")
hold_row.set_subtitle("Space-separated packages to hold during upgrades")
self.hold_entry = Gtk.Entry(text=' '.join(self.get_list('options', 'HoldPkg')))
hold_row.add_suffix(self.hold_entry)
group.add(hold_row)
noupgrade_row = Adw.ActionRow(title="NoUpgrade")
noupgrade_row.set_subtitle("Space-separated files to protect from upgrade")
self.noupgrade_entry = Gtk.Entry(text=' '.join(self.get_list('options', 'NoUpgrade')))
noupgrade_row.add_suffix(self.noupgrade_entry)
group.add(noupgrade_row)
noextract_row = Adw.ActionRow(title="NoExtract")
noextract_row.set_subtitle("Space-separated files to skip during extraction")
self.noextract_entry = Gtk.Entry(text=' '.join(self.get_list('options', 'NoExtract')))
noextract_row.add_suffix(self.noextract_entry)
group.add(noextract_row)
syncfirst_row = Adw.ActionRow(title="SyncFirst")
syncfirst_row.set_subtitle("Space-separated packages to sync first")
self.syncfirst_entry = Gtk.Entry(text=' '.join(self.get_list('options', 'SyncFirst')))
syncfirst_row.add_suffix(self.syncfirst_entry)
group.add(syncfirst_row)
checkspace_row = Adw.ActionRow(title="CheckSpace")
checkspace_row.set_subtitle("Check for sufficient disk space before installing")
self.checkspace_switch = Gtk.Switch()
self.checkspace_switch.set_valign(Gtk.Align.CENTER)
self.checkspace_switch.set_active(self.get_bool('options', 'CheckSpace'))
checkspace_row.add_suffix(self.checkspace_switch)
checkspace_row.set_activatable_widget(self.checkspace_switch)
group.add(checkspace_row)
candy_row = Adw.ActionRow(title="ILoveCandy")
candy_row.set_subtitle("Display a candy cane progress bar")
self.candy_switch = Gtk.Switch()
self.candy_switch.set_valign(Gtk.Align.CENTER)
self.candy_switch.set_active(self.get_bool('options', 'ILoveCandy'))
candy_row.add_suffix(self.candy_switch)
candy_row.set_activatable_widget(self.candy_switch)
group.add(candy_row)
parallel_row = Adw.ActionRow(title="ParallelDownloads")
parallel_row.set_subtitle("Number of parallel downloads (1-100)")
self.parallel_spin = Gtk.SpinButton()
self.parallel_spin.set_adjustment(Gtk.Adjustment(lower=1, upper=100, step_increment=1))
self.parallel_spin.set_value(self.get_int('options', 'ParallelDownloads', 5))
parallel_row.add_suffix(self.parallel_spin)
group.add(parallel_row)
cleanmethod_row = Adw.ComboRow(title="CleanMethod")
cleanmethod_row.set_subtitle("Cache cleaning method")
cleanmethod_store = Gtk.StringList()
cleanmethod_store.append("KeepInstalled")
cleanmethod_store.append("KeepCurrent")
cleanmethod_row.set_model(cleanmethod_store)
cleanmethod_value = self.get_string('options', 'CleanMethod', 'KeepInstalled')
cleanmethod_row.set_selected(0 if cleanmethod_value == 'KeepInstalled' else 1)
group.add(cleanmethod_row)
self.cleanmethod_row = cleanmethod_row
arch_row = Adw.ComboRow(title="Architecture")
arch_row.set_subtitle("System architecture")
arch_store = Gtk.StringList()
arch_store.append("auto")
arch_store.append("x86_64")
arch_store.append("x86_64_v3")
arch_row.set_model(arch_store)
arch_value = self.get_string('options', 'Architecture', 'auto')
if arch_value == 'auto':
arch_row.set_selected(0)
elif arch_value == 'x86_64':
arch_row.set_selected(1)
elif arch_value == 'x86_64_v3':
arch_row.set_selected(2)
else:
arch_row.set_selected(0)
group.add(arch_row)
self.arch_row = arch_row
def load_config(self):
if os.path.exists(self.pacman_conf):
try:
self.config.read(self.pacman_conf)
except configparser.Error as e:
dialog = Adw.AlertDialog(heading="Error", body=f"Failed to parse pacman.conf: {str(e)}")
dialog.add_response("ok", "OK")
dialog.present(self)
def get_list(self, section, option):
if self.config.has_option(section, option):
return self.config.get(section, option).split()
return []
def get_bool(self, section, option):
if self.config.has_option(section, option):
try:
value = self.config.get(section, option)
if value is None or value == '':
return True
return self.config.getboolean(section, option)
except (ValueError, configparser.Error):
return False
return False
def get_int(self, section, option, default=0):
if self.config.has_option(section, option):
try:
return self.config.getint(section, option)
except ValueError:
return default
return default
def get_string(self, section, option, default=''):
if self.config.has_option(section, option):
return self.config.get(section, option)
return default
def get_key_from_line(self, line):
stripped = line.strip()
if not stripped or stripped.startswith('#'):
return None
if '=' in stripped:
return stripped.split('=')[0].strip()
else:
return stripped
def on_save(self, button):
updates = {}
# IgnorePkg
ignore_list = [x for x in self.ignore_entry.get_text().split() if x]
if ignore_list:
updates['IgnorePkg'] = f"IgnorePkg = {' '.join(ignore_list)}\n"
else:
updates['IgnorePkg'] = None
# HoldPkg
hold_list = [x for x in self.hold_entry.get_text().split() if x]
if hold_list:
updates['HoldPkg'] = f"HoldPkg = {' '.join(hold_list)}\n"
else:
updates['HoldPkg'] = None
# NoUpgrade
noupgrade_list = [x for x in self.noupgrade_entry.get_text().split() if x]
if noupgrade_list:
updates['NoUpgrade'] = f"NoUpgrade = {' '.join(noupgrade_list)}\n"
else:
updates['NoUpgrade'] = None
# NoExtract
noextract_list = [x for x in self.noextract_entry.get_text().split() if x]
if noextract_list:
updates['NoExtract'] = f"NoExtract = {' '.join(noextract_list)}\n"
else:
updates['NoExtract'] = None
# SyncFirst
syncfirst_list = [x for x in self.syncfirst_entry.get_text().split() if x]
if syncfirst_list:
updates['SyncFirst'] = f"SyncFirst = {' '.join(syncfirst_list)}\n"
else:
updates['SyncFirst'] = None
# CheckSpace
if self.checkspace_switch.get_active():
updates['CheckSpace'] = "CheckSpace\n"
else:
updates['CheckSpace'] = None
# ILoveCandy
if self.candy_switch.get_active():
updates['ILoveCandy'] = "ILoveCandy\n"
else:
updates['ILoveCandy'] = None
# ParallelDownloads
parallel_val = int(self.parallel_spin.get_value())
updates['ParallelDownloads'] = f"ParallelDownloads = {parallel_val}\n"
# CleanMethod
cleanmethod_value = 'KeepInstalled' if self.cleanmethod_row.get_selected() == 0 else 'KeepCurrent'
updates['CleanMethod'] = f"CleanMethod = {cleanmethod_value}\n"
# Architecture
arch_selected = self.arch_row.get_selected()
arch_value = 'auto' if arch_selected == 0 else 'x86_64' if arch_selected == 1 else 'x86_64_v3'
updates['Architecture'] = f"Architecture = {arch_value}\n"
temp_path = None
try:
with open(self.pacman_conf, 'r') as f:
lines = f.readlines()
new_lines = []
in_options = False
added = set()
for line in lines:
added_this = False
if line.strip() == '[options]':
in_options = True
elif line.strip().startswith('[') and line.strip().endswith(']') and line.strip() != '[options]':
# Add pending updates before leaving section
for key, val in updates.items():
if val and key not in added:
new_lines.append(val)
added.add(key)
in_options = False
if in_options:
key = self.get_key_from_line(line)
if key in updates:
if updates[key]:
new_lines.append(updates[key])
added.add(key)
added_this = True
# else skip to remove
if not (in_options and added_this):
new_lines.append(line)
# If file ended while still in options section, add remaining
if in_options:
for key, val in updates.items():
if val and key not in added:
new_lines.append(val)
added.add(key)
# Write to temp file
with tempfile.NamedTemporaryFile(mode='w', delete=False) as temp:
for line in new_lines:
temp.write(line)
temp_path = temp.name
# Use pkexec to copy
subprocess.check_call(['pkexec', 'cp', temp_path, self.pacman_conf])
dialog = Adw.AlertDialog(heading="Success", body="Settings saved successfully.")
dialog.add_response("ok", "OK")
dialog.present(self)
except Exception as e:
dialog = Adw.AlertDialog(heading="Error", body=str(e))
dialog.add_response("ok", "OK")
dialog.present(self)
finally:
if temp_path:
os.unlink(temp_path)
self.destroy()

184
src/repo_config.py Normal file
View File

@@ -0,0 +1,184 @@
import subprocess
import re
import os
def get_suitable_terminal():
"""Detect and return the command to open a terminal based on desktop environment."""
desktop = os.environ.get('XDG_CURRENT_DESKTOP', '').upper()
if 'GNOME' in desktop:
if subprocess.run(['which', 'ptyxis'], capture_output=True).returncode == 0:
return 'ptyxis -x '
elif subprocess.run(['which', 'gnome-terminal'], capture_output=True).returncode == 0:
return 'gnome-terminal -- '
elif 'KDE' in desktop or 'PLASMA' in desktop:
if subprocess.run(['which', 'konsole'], capture_output=True).returncode == 0:
return 'konsole -e '
elif 'XFCE' in desktop:
if subprocess.run(['which', 'xfce4-terminal'], capture_output=True).returncode == 0:
return 'xfce4-terminal -e '
for term in ['alacritty', 'kitty', 'xterm']:
if subprocess.run(['which', term], capture_output=True).returncode == 0:
return f'{term} -e '
return None
def open_terminal_with_repo_command(command, parent_window=None):
"""Open a terminal with the given command."""
escaped_cmd = command.replace("'", "'\"'\"'")
terminal_cmd = get_suitable_terminal()
if terminal_cmd is None:
if parent_window:
parent_window.show_error_dialog("Terminal Error", "No suitable terminal emulator found.")
else:
print("Error: No suitable terminal emulator found.")
return
full_cmd = f"{terminal_cmd} bash -c '{escaped_cmd}; echo; echo Press ENTER to close...; read'"
try:
subprocess.Popen(full_cmd, shell=True)
except Exception as e:
if parent_window:
parent_window.show_error_dialog("Terminal Error", str(e))
else:
print(f"Error opening terminal: {e}")
class RepoConfig:
def __init__(self):
self.pacman_conf = "/etc/pacman.conf"
self.standard_repos = ["core", "extra", "multilib"]
self.third_party_repos = ["chaotic-aur", "blackarch", "archlinuxcn"]
self.repositories = {repo: False for repo in self.standard_repos + self.third_party_repos}
self.load_pacman_conf()
def load_pacman_conf(self):
"""Load and parse pacman.conf to determine enabled repositories."""
try:
with open(self.pacman_conf, "r") as f:
lines = f.readlines()
repo_pattern = re.compile(r'^\s*(#?)\s*\[([^]]+)\]\s*$')
current_repo = None
for line in lines:
match = repo_pattern.match(line)
if match:
comment, repo_name = match.groups()
if repo_name == 'options':
continue
current_repo = repo_name
enabled = not bool(comment)
if repo_name in self.repositories:
self.repositories[repo_name] = enabled
except Exception as e:
print(f"Error loading pacman.conf: {e}")
def get_repositories(self):
"""Return the dictionary of repositories and their enabled status."""
return self.repositories
def set_repository_enabled(self, repo_name, enabled, parent_window=None):
"""Enable or disable a repository."""
if repo_name not in self.repositories:
print(f"Repository {repo_name} not supported.")
return
current_enabled = self.repositories[repo_name]
if current_enabled == enabled:
return # Already in desired state
if repo_name in self.standard_repos:
if enabled:
sed_cmd = f"sudo sed -i '/^#\\[{repo_name}\\]/,/^\\[/s/^#//g' {self.pacman_conf}"
else:
sed_cmd = f"sudo sed -i '/^\\[{repo_name}\\]/,/^\\[/s/^/#/' {self.pacman_conf}"
commands = [
sed_cmd,
"sudo pacman -Syy"
]
cmd_str = " && ".join(commands)
open_terminal_with_repo_command(cmd_str, parent_window)
elif repo_name in self.third_party_repos:
func_name = repo_name.replace("-", "_")
if enabled:
getattr(self, f"enable_{func_name}")(parent_window)
else:
getattr(self, f"disable_{func_name}")(parent_window)
def add_repository(self, repo_name, repo_url, parent_window=None):
"""Add a new repository to pacman.conf."""
if repo_name in self.repositories:
print(f"Repository {repo_name} already exists or is reserved.")
return
commands = [
"echo '' | sudo tee -a /etc/pacman.conf",
f"echo '[{repo_name}]' | sudo tee -a /etc/pacman.conf",
f"echo 'Server = {repo_url}' | sudo tee -a /etc/pacman.conf",
"sudo pacman -Syy"
]
cmd_str = " && ".join(commands)
open_terminal_with_repo_command(cmd_str, parent_window)
self.repositories[repo_name] = True
def enable_chaotic_aur(self, parent_window=None):
"""Enable Chaotic-AUR repository."""
commands = [
"sudo pacman-key --recv-key 3056513887B78AEB --keyserver keyserver.ubuntu.com",
"sudo pacman-key --lsign-key 3056513887B78AEB",
"sudo pacman -U 'https://cdn-mirror.chaotic.cx/chaotic-aur/chaotic-keyring.pkg.tar.zst' --noconfirm",
"sudo pacman -U 'https://cdn-mirror.chaotic.cx/chaotic-aur/chaotic-mirrorlist.pkg.tar.zst' --noconfirm",
"echo '' | sudo tee -a /etc/pacman.conf",
"echo '[chaotic-aur]' | sudo tee -a /etc/pacman.conf",
"echo 'Include = /etc/pacman.d/chaotic-mirrorlist' | sudo tee -a /etc/pacman.conf",
"sudo pacman -Syy"
]
cmd_str = " && ".join(commands)
open_terminal_with_repo_command(cmd_str, parent_window)
def disable_chaotic_aur(self, parent_window=None):
"""Disable Chaotic-AUR repository."""
commands = [
# Remove the entire [chaotic-aur] section until the next section or end of file
r"sudo sed -i '/\[chaotic-aur\]/,/^\[/{/^\[/!d; /^\[chaotic-aur\]/d}' /etc/pacman.conf",
"sudo pacman -Syy"
]
cmd_str = " && ".join(commands)
open_terminal_with_repo_command(cmd_str, parent_window)
def enable_blackarch(self, parent_window=None):
"""Enable BlackArch repository."""
commands = [
"cd /tmp",
"curl -O https://blackarch.org/strap.sh",
"echo '26849980b35a42e6e192c6d9ed8c46f0d6d06047 strap.sh' | sha1sum -c",
"if [ $? -eq 0 ]; then chmod +x strap.sh && sudo ./strap.sh; else echo 'SHA1 verification failed!'; exit 1; fi",
"rm -f strap.sh",
"sudo pacman -Syy"
]
cmd_str = " && ".join(commands)
open_terminal_with_repo_command(cmd_str, parent_window)
def disable_blackarch(self, parent_window=None):
"""Disable BlackArch repository."""
commands = [
# Remove the entire [blackarch] section until the next section or end of file
r"sudo sed -i '/\[blackarch\]/,/^\[/{/^\[/!d; /^\[blackarch\]/d}' /etc/pacman.conf",
"sudo pacman -Syy"
]
cmd_str = " && ".join(commands)
open_terminal_with_repo_command(cmd_str, parent_window)
def enable_archlinuxcn(self, parent_window=None):
"""Enable ArchLinuxCN repository."""
commands = [
"echo '' | sudo tee -a /etc/pacman.conf",
"echo '[archlinuxcn]' | sudo tee -a /etc/pacman.conf",
"echo 'Server = https://repo.archlinuxcn.org/$arch' | sudo tee -a /etc/pacman.conf",
"sudo pacman -Syy",
"sudo pacman -S archlinuxcn-keyring --noconfirm"
]
cmd_str = " && ".join(commands)
open_terminal_with_repo_command(cmd_str, parent_window)
def disable_archlinuxcn(self, parent_window=None):
"""Disable ArchLinuxCN repository."""
commands = [
# Remove the entire [archlinuxcn] section until the next section or end of file
r"sudo sed -i '/\[archlinuxcn\]/,/^\[/{/^\[/!d; /^\[archlinuxcn\]/d}' /etc/pacman.conf",
"sudo pacman -Syy"
]
cmd_str = " && ".join(commands)
open_terminal_with_repo_command(cmd_str, parent_window)

104
src/sync_manager.py Normal file
View File

@@ -0,0 +1,104 @@
import subprocess
import os
class SyncManager:
"""Manages pacman repository sync and system updates"""
def __init__(self):
self.pacman_bin = "/usr/bin/pacman"
self._verify_pacman()
def _verify_pacman(self):
"""Verify pacman is available"""
if not os.path.exists(self.pacman_bin):
raise Exception("Pacman not found. Is this an Arch-based system?")
def sync_repositories(self):
"""
Sync package databases (pacman -Syy)
Forces refresh of all package databases
"""
try:
result = subprocess.run(
[self.pacman_bin, "-Syy", "--noconfirm"],
capture_output=True,
text=True,
check=True
)
return result.stdout
except subprocess.CalledProcessError as e:
error_msg = e.stderr if e.stderr else str(e)
raise Exception(f"Failed to sync repositories:\n{error_msg}")
except FileNotFoundError:
raise Exception("Pacman executable not found")
except Exception as e:
raise Exception(f"Unexpected error during sync: {str(e)}")
def update_system(self):
"""
Update system packages (pacman -Syu)
Syncs databases and upgrades packages
"""
try:
result = subprocess.run(
[self.pacman_bin, "-Syu", "--noconfirm"],
capture_output=True,
text=True,
check=True
)
return result.stdout
except subprocess.CalledProcessError as e:
error_msg = e.stderr if e.stderr else str(e)
raise Exception(f"Failed to update system:\n{error_msg}")
except FileNotFoundError:
raise Exception("Pacman executable not found")
except Exception as e:
raise Exception(f"Unexpected error during update: {str(e)}")
def check_updates(self):
"""
Check for available updates without installing them
Returns list of packages that can be updated
"""
try:
result = subprocess.run(
[self.pacman_bin, "-Qu"],
capture_output=True,
text=True,
check=False # Non-zero exit is normal when no updates
)
if result.returncode == 0:
# Parse output to get list of updateable packages
updates = []
for line in result.stdout.strip().split('\n'):
if line:
updates.append(line)
return updates
else:
return []
except Exception as e:
raise Exception(f"Failed to check for updates: {str(e)}")
def clean_cache(self):
"""
Clean package cache (pacman -Sc)
Removes old package files from cache
"""
try:
result = subprocess.run(
[self.pacman_bin, "-Sc", "--noconfirm"],
capture_output=True,
text=True,
check=True
)
return result.stdout
except subprocess.CalledProcessError as e:
error_msg = e.stderr if e.stderr else str(e)
raise Exception(f"Failed to clean cache:\n{error_msg}")
except Exception as e:
raise Exception(f"Unexpected error during cache clean: {str(e)}")

9
src/utils.py Normal file
View File

@@ -0,0 +1,9 @@
import urllib.request
from urllib.error import URLError
def check_url(url):
try:
with urllib.request.urlopen(url, timeout=5) as response:
return response.getcode() == 200
except URLError:
return False