This commit is contained in:
arcolinuxz 2024-06-25 11:31:02 +02:00
parent b09f56fa6c
commit 92433c3100
10 changed files with 332 additions and 196 deletions

View file

@ -1,4 +1,4 @@
# Store kernel data taken from
# Store kernel data from ALA, pacman sync db
import datetime
from datetime import datetime

View file

@ -14,6 +14,7 @@ import datetime
import psutil
import queue
import pathlib
import locale
import tomlkit
from tomlkit import dumps, load
from datetime import timedelta
@ -26,17 +27,17 @@ from libs.Kernel import Kernel, InstalledKernel, CommunityKernel
gi.require_version("Gtk", "4.0")
from gi.repository import GLib
base_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
latest_archlinux_package_search_url = (
"https://archlinux.org/packages/search/json?name=${PACKAGE_NAME}"
)
archlinux_mirror_archive_url = "https://archive.archlinux.org"
headers = {"Content-Type": "text/plain;charset=UTF-8"}
headers = {
"Content-Type": "text/plain;charset=UTF-8",
"User-Agent": "Mozilla/5.0 (Linux x86_64) Gecko Firefox",
}
dist_id = distro.id()
dist_name = distro.name()
cache_days = 5
fetched_kernels_dict = {}
@ -102,6 +103,11 @@ ch.setFormatter(formatter)
# add ch to logger
logger.addHandler(ch)
# set locale
locale.setlocale(locale.LC_ALL, "C.utf8")
locale_env = os.environ
locale_env["LC_ALL"] = "C.utf8"
# =====================================================
# CHECK FOR KERNEL UPDATES
@ -274,12 +280,17 @@ def permissions(dst):
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env=locale_env,
)
for x in groups.stdout.decode().split(" "):
if "gid" in x:
g = x.split("(")[1]
group = g.replace(")", "").strip()
subprocess.call(["chown", "-R", sudo_username + ":" + group, dst], shell=False)
subprocess.call(
["chown", "-R", sudo_username + ":" + group, dst],
shell=False,
env=locale_env,
)
except Exception as e:
logger.error("Exception in permissions(): %s" % e)
@ -422,6 +433,7 @@ def install_archive_kernel(self):
stderr=subprocess.STDOUT,
bufsize=1,
universal_newlines=True,
env=locale_env,
) as process:
while True:
if process.poll() is not None:
@ -592,23 +604,23 @@ def read_cache(self):
# any kernels older than 2 years
# (currently linux v4.x or earlier) are deemed eol so ignore them
# if (
# datetime.datetime.now().year
# - datetime.datetime.strptime(
# k["last_modified"], "%d-%b-%Y %H:%M"
# ).year
# <= 2
# ):
cached_kernels_list.append(
Kernel(
k["name"],
k["headers"],
k["version"],
k["size"],
k["last_modified"],
k["file_format"],
if (
datetime.datetime.now().year
- datetime.datetime.strptime(
k["last_modified"], "%d-%b-%Y %H:%M"
).year
<= 2
):
cached_kernels_list.append(
Kernel(
k["name"],
k["headers"],
k["version"],
k["size"],
k["last_modified"],
k["file_format"],
)
)
)
name = None
headers = None
@ -634,25 +646,30 @@ def read_cache(self):
# get latest versions of the official kernels
def get_latest_versions(self):
logger.info("Getting latest kernel information")
kernel_versions = {}
try:
kernel_versions = {}
for kernel in supported_kernels_dict:
check_cmd_str = ["pacman", "-Si", kernel]
process_kernel_query = subprocess.Popen(
with subprocess.Popen(
check_cmd_str,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
out, err = process_kernel_query.communicate(timeout=process_timeout)
if process_kernel_query.returncode == 0:
for line in out.decode("utf-8").splitlines():
if line.startswith("Version :"):
kernel_versions[kernel] = line.split("Version :")[1]
stderr=subprocess.STDOUT,
bufsize=1,
universal_newlines=True,
env=locale_env,
) as process:
while True:
if process.poll() is not None:
break
for line in process.stdout:
if line.strip().replace(" ", "").startswith("Version:"):
kernel_versions[kernel] = (
line.strip().replace(" ", "").split("Version:")[1]
)
break
self.kernel_versions_queue.put(kernel_versions)
@ -721,7 +738,6 @@ def parse_archive_html(response, linux_kernel):
def wait_for_response(response_queue):
while True:
# time.sleep(0.1)
items = response_queue.get()
# error break from loop
@ -782,15 +798,11 @@ def get_official_kernels(self):
for kernel in response_content:
parse_archive_html(response_content[kernel], kernel)
if len(fetched_kernels_dict) > 0: # and self.refresh_cache is True:
if len(fetched_kernels_dict) > 0:
write_cache()
read_cache(self)
self.queue_kernels.put(cached_kernels_list)
# elif self.refresh_cache is False:
# logger.info("Cache already processed")
# read_cache(self)
# self.queue_kernels.put(cached_kernels_list)
else:
logger.error("Failed to retrieve Linux Kernel list")
@ -887,18 +899,22 @@ def check_kernel_installed(name):
check_cmd_str = ["pacman", "-Q", name]
logger.debug("Running cmd = %s" % check_cmd_str)
process_kernel_query = subprocess.Popen(
check_cmd_str, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE
check_cmd_str,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=locale_env,
)
out, err = process_kernel_query.communicate(timeout=process_timeout)
logger.debug(out.decode("utf-8"))
if process_kernel_query.returncode == 0:
for line in out.decode("utf-8").splitlines():
if line.split(" ")[0] == name:
# logger.debug("Kernel installed")
return True
else:
# logger.debug("Kernel is not installed")
return False
return False
@ -971,6 +987,7 @@ def uninstall(self):
stderr=subprocess.STDOUT,
bufsize=1,
universal_newlines=True,
env=locale_env,
) as process:
while True:
if process.poll() is not None:
@ -1069,6 +1086,7 @@ def get_community_kernels(self):
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=locale_env,
)
out, err = process_kernel_query.communicate(timeout=process_timeout)
version = None
@ -1139,6 +1157,7 @@ def install_community_kernel(self):
stderr=subprocess.STDOUT,
bufsize=1,
universal_newlines=True,
env=locale_env,
) as process:
while True:
if process.poll() is not None:
@ -1159,8 +1178,6 @@ def install_community_kernel(self):
error = True
if check_kernel_installed(self.kernel.name) and error is False:
logger.info("Kernel = installed")
self.kernel_state_queue.put((0, "install", self.kernel.name))
event = "%s [INFO]: Installation of %s completed\n" % (
@ -1171,8 +1188,6 @@ def install_community_kernel(self):
self.messages_queue.put(event)
else:
logger.error("Kernel = install failed")
self.kernel_state_queue.put((1, "install", self.kernel.name))
event = "%s [ERROR]: Installation of %s failed\n" % (
@ -1213,6 +1228,7 @@ def get_pacman_repos():
stderr=subprocess.STDOUT,
bufsize=1,
universal_newlines=True,
env=locale_env,
) as process:
while True:
if process.poll() is not None:
@ -1233,8 +1249,13 @@ def get_installed_kernel_info(package_name):
query_str = ["pacman", "-Qi", package_name]
try:
process_kernel_query = subprocess.Popen(
query_str, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE
query_str,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=locale_env,
)
out, err = process_kernel_query.communicate(timeout=process_timeout)
install_size = None
@ -1264,36 +1285,41 @@ def get_installed_kernels():
installed_kernels = []
try:
process_kernel_query = subprocess.Popen(
query_str, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
with subprocess.Popen(
query_str,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
env=locale_env,
) as process:
while True:
if process.poll() is not None:
break
for line in process.stdout:
if line.lower().strip().startswith("linux"):
package_name = line.split(" ")[0].strip()
package_version = line.split(" ")[1].strip()
out, err = process_kernel_query.communicate(timeout=process_timeout)
if process_kernel_query.returncode == 0:
for line in out.decode("utf-8").splitlines():
if line.lower().strip().startswith("linux"):
package_name = line.split(" ")[0]
package_version = line.split(" ")[1]
if (
package_name in supported_kernels_dict
or package_name in community_kernels_dict
):
install_size, install_date = get_installed_kernel_info(
package_name
)
installed_kernel = InstalledKernel(
package_name,
package_version,
install_date,
install_size,
)
if (
package_name in supported_kernels_dict
or package_name in community_kernels_dict
):
install_size, install_date = get_installed_kernel_info(
package_name
)
installed_kernel = InstalledKernel(
package_name,
package_version,
install_date,
install_size,
)
installed_kernels.append(installed_kernel)
return installed_kernels
installed_kernels.append(installed_kernel)
except Exception as e:
logger.error("Exception in get_installed_kernels(): %s" % e)
finally:
return installed_kernels
# ======================================================================
@ -1307,7 +1333,11 @@ def get_active_kernel():
try:
process_kernel_query = subprocess.Popen(
query_str, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE
query_str,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=locale_env,
)
out, err = process_kernel_query.communicate(timeout=process_timeout)
@ -1339,6 +1369,7 @@ def sync_package_db():
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
timeout=process_timeout,
env=locale_env,
)
if process_sync.returncode == 0:
@ -1367,6 +1398,7 @@ def get_boot_loader():
timeout=process_timeout,
universal_newlines=True,
bufsize=1,
env=locale_env,
)
if process.returncode == 0:
@ -1376,9 +1408,14 @@ def get_boot_loader():
if "grub" in product.lower():
logger.info("bootctl product reports booted with grub")
return "grub"
if "systemd-boot" in product.lower():
logger.info("bootcl product reports booted with systemd-boot")
elif "systemd-boot" in product.lower():
logger.info("bootctl product reports booted with systemd-boot")
return "systemd-boot"
else:
# "n/a" in product
logger.info("bootctl product reports n/a, using default grub")
return "grub"
elif line.strip().startswith("Not booted with EFI"): # noqa
# bios
logger.info(
@ -1411,6 +1448,7 @@ def get_kernel_version(kernel):
timeout=process_timeout,
universal_newlines=True,
bufsize=1,
env=locale_env,
)
if process.returncode == 0:
@ -1440,14 +1478,20 @@ def get_kernel_version(kernel):
def run_process(self):
error = False
self.stdout_lines = []
logger.debug("Running process = %s" % " ".join(self.cmd))
event = "%s [INFO]: Running %s\n" % (
datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S"),
" ".join(self.cmd),
)
self.messages_queue.put(event)
with subprocess.Popen(
self.cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=1,
universal_newlines=True,
env=locale_env,
) as process:
while True:
if process.poll() is not None:
@ -1458,9 +1502,8 @@ def run_process(self):
print(line.strip())
for log in self.stdout_lines:
if "error" in log or "errors" in log:
if "error" in log or "errors" in log or "failed" in log:
self.errors_found = True
error = True
if error is True:
@ -1485,29 +1528,49 @@ def run_process(self):
# grub - grub-mkconfig /boot/grub/grub.cfg
# systemd-boot - bootctl update
def update_bootloader(self):
logger.info("Updating bootloader")
cmds = []
error = False
self.stdout_lines = []
if self.action == "install":
image = "images/48x48/akm-install.png"
if self.installed_kernel_version is not None:
for self.cmd in [
["kernel-install", "add-all"],
["kernel-install", "remove", self.installed_kernel_version],
# ["kernel-install", "add-all", "--verbose"],
[
"kernel-install",
"add",
self.installed_kernel_version,
"/lib/modules/%s/vmlinuz" % self.installed_kernel_version,
],
[
"kernel-install",
"remove",
self.installed_kernel_version,
],
]:
run_process(self)
else:
self.cmd = ["kernel-install", "add-all"]
# self.cmd = ["kernel-install", "add-all", "--verbose"].
self.installed_kernel_version = get_kernel_version(self.kernel.name)
self.cmd = [
"kernel-install",
"add",
self.installed_kernel_version,
"/lib/modules/%s/vmlinuz" % self.installed_kernel_version,
]
run_process(self)
else:
image = "images/48x48/akm-remove.png"
if self.installed_kernel_version is not None:
self.cmd = ["kernel-install", "remove", self.installed_kernel_version]
self.cmd = [
"kernel-install",
"remove",
self.installed_kernel_version,
]
run_process(self)
try:
@ -1559,6 +1622,7 @@ def update_bootloader(self):
stderr=subprocess.STDOUT,
bufsize=1,
universal_newlines=True,
env=locale_env,
) as process:
while True:
if process.poll() is not None:
@ -1661,10 +1725,14 @@ def update_bootloader(self):
image,
priority=GLib.PRIORITY_DEFAULT,
)
if os.path.exists(self.lockfile):
os.unlink(self.lockfile)
# else:
# logger.error("Bootloader update cannot continue, failed to set command.")
except Exception as e:
logger.error("Exception in update_bootloader(): %s" % e)
if os.path.exists(self.lockfile):
os.unlink(self.lockfile)
# ======================================================================