fix linter issues

This commit is contained in:
InsanePrawn 2025-03-29 20:25:03 +01:00
parent 35ef46332b
commit b8c072e5ee
10 changed files with 33 additions and 30 deletions

View file

@ -254,10 +254,10 @@ class Chroot(AbstractChroot):
# make sure paths start with '/'. Important: also copies the collection and casts to list, which will be sorted! # make sure paths start with '/'. Important: also copies the collection and casts to list, which will be sorted!
mounts = [make_abs_path(path) for path in relative_paths] mounts = [make_abs_path(path) for path in relative_paths]
mounts.sort(reverse=True) mounts.sort(reverse=True)
for mount in mounts: for _mount in mounts:
if mount == "/proc": if _mount == "/proc":
continue continue
self.umount(mount) self.umount(_mount)
if "/proc" in mounts: if "/proc" in mounts:
self.umount("/proc") self.umount("/proc")

View file

@ -84,7 +84,7 @@ def get_device_chroot(
**kwargs, **kwargs,
) -> DeviceChroot: ) -> DeviceChroot:
name = f"rootfs_{device}-{flavour}" name = f"rootfs_{device}-{flavour}"
repos: dict[str, RepoInfo] = ( repos: dict[str, RepoInfo] = dict(
get_kupfer_local(arch).repos get_kupfer_local(arch).repos
if use_local_repos if use_local_repos
else get_kupfer_https(arch).repos else get_kupfer_https(arch).repos

View file

@ -57,14 +57,14 @@ def prompt_config(
to_check or to_check is zero to_check or to_check is zero
) # can't do == due to boolean<->int casting ) # can't do == due to boolean<->int casting
if type(None) == field_type: if type(None) is field_type:
field_type = str field_type = str
if field_type == dict: if field_type is dict:
raise Exception( raise Exception(
"Dictionaries not supported by config_prompt, this is likely a bug in kupferbootstrap" "Dictionaries not supported by config_prompt, this is likely a bug in kupferbootstrap"
) )
elif field_type == list: elif field_type is list:
default = list_to_comma_str(default) default = list_to_comma_str(default)
value_conv = comma_str_to_list value_conv = comma_str_to_list
else: else:
@ -83,7 +83,7 @@ def prompt_config(
show_choices=show_choices, show_choices=show_choices,
) # type: ignore ) # type: ignore
changed = result != ( changed = result != (
original_default if field_type == list else default original_default if field_type is list else default
) and (true_or_zero(default) or true_or_zero(result)) ) and (true_or_zero(default) or true_or_zero(result))
if changed and echo_changes: if changed and echo_changes:
print(f'value changed: "{text}" = "{result}"') print(f'value changed: "{text}" = "{result}"')
@ -411,7 +411,7 @@ def cmd_config_set(
key: str = split_pair[0] key: str = split_pair[0]
value: Any = split_pair[1] value: Any = split_pair[1]
value_type = type(config_dot_name_get(key, CONFIG_DEFAULTS)) value_type = type(config_dot_name_get(key, CONFIG_DEFAULTS))
if value_type != list: if value_type is not list:
value = click.types.convert_type(value_type)(value) value = click.types.convert_type(value_type)(value)
else: else:
value = comma_str_to_list(value, default=[]) value = comma_str_to_list(value, default=[])

View file

@ -95,7 +95,7 @@ def resolve_profile(
for key, value in PROFILE_DEFAULTS_DICT.items(): for key, value in PROFILE_DEFAULTS_DICT.items():
if key not in full.keys(): if key not in full.keys():
full[key] = value # type: ignore[literal-required] full[key] = value # type: ignore[literal-required]
if type(value) == list: if type(value) is list:
full[key] = [] # type: ignore[literal-required] full[key] = [] # type: ignore[literal-required]
full["size_extra_mb"] = int(full["size_extra_mb"] or 0) full["size_extra_mb"] = int(full["size_extra_mb"] or 0)

View file

@ -274,7 +274,7 @@ class ConfigStateHolder:
raise ConfigLoadException(Exception(m)) raise ConfigLoadException(Exception(m))
ex = self.file_state.exception ex = self.file_state.exception
if ex: if ex:
if type(ex) == FileNotFoundError: if isinstance(ex, FileNotFoundError):
ex = Exception( ex = Exception(
"Config file doesn't exist. Try running `kupferbootstrap config init` first?" "Config file doesn't exist. Try running `kupferbootstrap config init` first?"
) )

View file

@ -50,7 +50,7 @@ def resolve_dict_hints(hints: Any) -> Generator[tuple[Any, ...], None, None]:
for hint in flatten_hints(hints): for hint in flatten_hints(hints):
t_origin = get_origin(hint) t_origin = get_origin(hint)
t_args = get_args(hint) t_args = get_args(hint)
if t_origin == dict: if t_origin is dict:
yield t_args yield t_args
continue continue
if t_origin in [NoneType, Optional, Union, UnionType] and t_args: if t_origin in [NoneType, Optional, Union, UnionType] and t_args:
@ -294,7 +294,7 @@ class DictScheme(Munch):
hints_flat = list(flatten_hints(_hints)) hints_flat = list(flatten_hints(_hints))
subclass = DictScheme subclass = DictScheme
for hint in hints_flat: for hint in hints_flat:
if get_origin(hint) == dict: if get_origin(hint) is dict:
_valtype = get_args(hint)[1] _valtype = get_args(hint)[1]
_subhints = {n: _valtype for n in v.keys()} _subhints = {n: _valtype for n in v.keys()}
break break

View file

@ -71,7 +71,7 @@ def chmod(
octal = octal.rjust(4, "0") octal = octal.rjust(4, "0")
try: try:
os.chmod(path, mode=octal) # type: ignore os.chmod(path, mode=octal) # type: ignore
except: except Exception:
cmd = ["chmod", octal, path] cmd = ["chmod", octal, path]
result = run_cmd(cmd, switch_user="root" if privileged else None) result = run_cmd(cmd, switch_user="root" if privileged else None)
assert isinstance(result, subprocess.CompletedProcess) assert isinstance(result, subprocess.CompletedProcess)
@ -177,7 +177,7 @@ def remove_file(path: str, recursive=False):
try: try:
rm = rmtree if recursive else os.unlink rm = rmtree if recursive else os.unlink
rm(path) # type: ignore rm(path) # type: ignore
except: except Exception:
cmd = ["rm"] + (["-r"] if recursive else []) + [path] cmd = ["rm"] + (["-r"] if recursive else []) + [path]
rc = run_root_cmd(cmd).returncode rc = run_root_cmd(cmd).returncode
if rc: if rc:
@ -197,7 +197,7 @@ def makedir(
os.makedirs(path, exist_ok=True) os.makedirs(path, exist_ok=True)
else: else:
os.mkdir(path) os.mkdir(path)
except: except Exception:
run_root_cmd(["mkdir"] + (["-p"] if parents else []) + [path]) run_root_cmd(["mkdir"] + (["-p"] if parents else []) + [path])
if mode is not None: if mode is not None:
chmod(path, mode=mode) chmod(path, mode=mode)
@ -212,7 +212,7 @@ def symlink(source, target):
"Create a symlink at `target`, pointing at `source`" "Create a symlink at `target`, pointing at `source`"
try: try:
os.symlink(source, target) os.symlink(source, target)
except: except Exception:
result = run_root_cmd(["ln", "-s", source, target]) result = run_root_cmd(["ln", "-s", source, target])
assert isinstance(result, subprocess.CompletedProcess) assert isinstance(result, subprocess.CompletedProcess)
if result.returncode: if result.returncode:

View file

@ -78,16 +78,17 @@ def get_fs_size(partition: str) -> tuple[int, int]:
blocks_cmd.stdout.decode("utf-8") if blocks_cmd.stdout else "" blocks_cmd.stdout.decode("utf-8") if blocks_cmd.stdout else ""
) )
try: try:
fs_blocks = int( re_blocks = re.search(
re.search(
"\\nBlock count:[ ]+([0-9]+)\\n", "\\nBlock count:[ ]+([0-9]+)\\n",
blocks_text, blocks_text,
flags=re.MULTILINE, flags=re.MULTILINE,
).group(1) )
) # type: ignore[union-attr] assert re_blocks, "Block output should contain block count"
fs_block_size = int( fs_blocks = int(re_blocks.group(1))
re.search("\\nBlock size:[ ]+([0-9]+)\\n", blocks_text).group(1)
) # type: ignore[union-attr] re_blocksize = re.search("\\nBlock size:[ ]+([0-9]+)\\n", blocks_text)
assert re_blocksize, "Block output should contain block size"
fs_block_size = int(re_blocksize.group(1))
except Exception as ex: except Exception as ex:
logging.debug(f"dumpe2fs stdout:\n {blocks_text}") logging.debug(f"dumpe2fs stdout:\n {blocks_text}")
logging.debug(f"dumpe2fs stderr:\n: {blocks_cmd.stderr}") logging.debug(f"dumpe2fs stderr:\n: {blocks_cmd.stderr}")
@ -133,6 +134,7 @@ def shrink_fs(loop_device: str, file: str, sector_size: int):
), # type: ignore ), # type: ignore
stdin=subprocess.PIPE, stdin=subprocess.PIPE,
) )
assert child_proccess.stdin is not None
child_proccess.stdin.write( child_proccess.stdin.write(
"\n".join( "\n".join(
[ # type: ignore [ # type: ignore

View file

@ -747,7 +747,7 @@ def build_package(
failed_deps = [ failed_deps = [
name name
for name, res in dep_install.items() for name, res in dep_install.items()
if res.returncode != 0 if (res.returncode if not isinstance(res, int) else res) != 0
] # type: ignore[union-attr] ] # type: ignore[union-attr]
if failed_deps: if failed_deps:
raise Exception( raise Exception(
@ -1092,7 +1092,8 @@ def build_enable_qemu_binfmt(
.packages .packages
) )
pkgfiles = [ pkgfiles = [
os.path.join(crossrepo[pkg].resolved_url.split("file://")[1]) # we want this to crash on unresolved URL
os.path.join((crossrepo[pkg].resolved_url or "").split("file://")[1])
for pkg in QEMU_BINFMT_PKGS for pkg in QEMU_BINFMT_PKGS
] # type: ignore ] # type: ignore
runcmd = run_root_cmd runcmd = run_root_cmd

View file

@ -34,7 +34,7 @@ try:
integration_enabled = bool( integration_enabled = bool(
int(os.environ.get(INTEGRATION_TESTS_ENABLE_ENV_VAR, 0)) > 0 int(os.environ.get(INTEGRATION_TESTS_ENABLE_ENV_VAR, 0)) > 0
) )
except: except Exception:
pass pass