diff --git a/chroot/abstract.py b/chroot/abstract.py index d9905d9..0d43c95 100644 --- a/chroot/abstract.py +++ b/chroot/abstract.py @@ -7,6 +7,7 @@ from shlex import quote as shell_quote from typing import Protocol, Union, Optional, Mapping from uuid import uuid4 +from exec import run_root_cmd, generate_env_cmd, flatten_shell_script, wrap_in_bash from config import config from constants import Arch, CHROOT_PATHS from distro.distro import get_base_distro, get_kupfer_local, RepoInfo @@ -226,27 +227,15 @@ class Chroot(AbstractChroot): raise Exception(f'Chroot {self.name} is inactive, not running command! Hint: pass `fail_inactive=False`') if outer_env is None: outer_env = os.environ.copy() - env_cmd = ['/usr/bin/env'] + [f'{shell_quote(key)}={shell_quote(value)}' for key, value in inner_env.items()] - kwargs: dict = { - 'env': outer_env, - } - if not attach_tty: - kwargs |= {'stdout': stdout} if stdout else {'capture_output': capture_output} + env_cmd = generate_env_cmd(inner_env) if not isinstance(script, str) and isinstance(script, list): - script = ' '.join(script) + script = flatten_shell_script(script, shell_quote_items=False, wrap_in_shell_quote=False) if cwd: script = f"cd {shell_quote(cwd)} && ( {script} )" - cmd = ['chroot', self.path] + env_cmd + [ - '/bin/bash', - '-c', - script, - ] - logging.debug(f'{self.name}: Running cmd: "{cmd}"') - if attach_tty: - return subprocess.call(cmd, **kwargs) - else: - return subprocess.run(cmd, **kwargs) + cmd = flatten_shell_script(['chroot', self.path] + env_cmd + wrap_in_bash(script, flatten_result=False), shell_quote_items=True) + + return run_root_cmd(cmd, env=outer_env, attach_tty=attach_tty, capture_output=capture_output, stdout=stdout) def mount_pkgbuilds(self, fail_if_mounted: bool = False) -> str: return self.mount( diff --git a/exec.py b/exec.py new file mode 100644 index 0000000..79cf53b --- /dev/null +++ b/exec.py @@ -0,0 +1,108 @@ +import logging +import os +import pwd +import subprocess + +from shlex import quote as shell_quote +from typing import Optional, Union, TypeAlias + +ElevationMethod: TypeAlias = str + +# as long as **only** sudo is supported, hardcode the default into ELEVATION_METHOD_DEFAULT. +# when other methods are added, all mentions of ELEVATION_METHOD_DEFAULT should be replaced by a config key. + +ELEVATION_METHOD_DEFAULT = "sudo" + +ELEVATION_METHODS: dict[ElevationMethod, list[str]] = { + "sudo": ['sudo', '--'], +} + + +def generate_env_cmd(env: dict[str, str]): + return ['/usr/bin/env'] + [f'{key}={value}' for key, value in env.items()] + + +def flatten_shell_script(script: Union[list[str], str], shell_quote_items: bool = False, wrap_in_shell_quote=False) -> str: + """ + takes a shell-script and returns a flattened string for consumption with `sh -c`. + + `shell_quote_items` should only be used on `script` arrays that have no shell magic anymore, + e.g. `['bash', '-c', 'echo $USER']`, which would return the string `'bash' '-c' 'echo user'`, + which is suited for consumption by another bash -c process. + """ + if not isinstance(script, str) and isinstance(script, list): + cmds = script + if shell_quote_items: + cmds = [shell_quote(i) for i in cmds] + script = " ".join(cmds) + if wrap_in_shell_quote: + script = shell_quote(script) + return script + + +def wrap_in_bash(cmd: Union[list[str], str], flatten_result=True) -> Union[str, list[str]]: + res: Union[str, list[str]] = ['/bin/bash', '-c', flatten_shell_script(cmd, shell_quote_items=False, wrap_in_shell_quote=False)] + if flatten_result: + res = flatten_shell_script(res, shell_quote_items=True, wrap_in_shell_quote=False) + return res + + +def generate_cmd_elevated(cmd: list[str], elevation_method: ElevationMethod): + "wraps `cmd` in the necessary commands to escalate, e.g. `['sudo', '--', cmd]`." + if elevation_method not in ELEVATION_METHODS: + raise Exception(f"Unknown elevation method {elevation_method}") + return ELEVATION_METHODS[elevation_method] + cmd + + +def generate_cmd_su(cmd: list[str], switch_user: str, elevation_method: Optional[ElevationMethod] = None): + """ + returns cmd to escalate (e.g. sudo) and switch users (su) to run `cmd` as `switch_user` as necessary. + If `switch_user` is neither the current user nor root, cmd will have to be flattened into a single string. + A result might look like `['sudo', '--', 'su', '-s', '/bin/bash', '-c', cmd_as_a_string]`. + """ + current_uid = os.getuid() + if pwd.getpwuid(current_uid).pw_name != switch_user: + if switch_user != 'root': + cmd = ['/bin/su', switch_user, '-s', '/bin/bash', '-c', flatten_shell_script(cmd, shell_quote_items=True)] + if current_uid != 0: # in order to use `/bin/su`, we have to be root first. + cmd = generate_cmd_elevated(cmd, elevation_method or ELEVATION_METHOD_DEFAULT) + + return cmd + + +def run_cmd( + script: Union[str, list[str]], + env: dict[str, str] = {}, + attach_tty: bool = False, + capture_output: bool = False, + cwd: Optional[str] = None, + stdout: Optional[int] = None, + switch_user: Optional[str] = None, + elevation_method: Optional[ElevationMethod] = None, +) -> Union[int, subprocess.CompletedProcess]: + "execute `script` as `switch_user`, elevating and su'ing as necessary" + kwargs: dict = {} + env_cmd = [] + if env: + env_cmd = generate_env_cmd(env) + kwargs['env'] = env + if not attach_tty: + kwargs |= {'stdout': stdout} if stdout else {'capture_output': capture_output} + + script = flatten_shell_script(script) + if cwd: + kwargs['cwd'] = cwd + wrapped_script: list[str] = wrap_in_bash(script, flatten_result=False) # type: ignore + cmd = env_cmd + wrapped_script + if switch_user: + cmd = generate_cmd_su(cmd, switch_user, elevation_method=elevation_method) + logging.debug(f'Running cmd: "{cmd}"') + if attach_tty: + return subprocess.call(cmd, **kwargs) + else: + return subprocess.run(cmd, **kwargs) + + +def run_root_cmd(*kargs, **kwargs): + kwargs['switch_user'] = 'root' + return run_cmd(*kargs, **kwargs) diff --git a/utils.py b/utils.py index ec04d87..2bda44e 100644 --- a/utils.py +++ b/utils.py @@ -4,6 +4,8 @@ import subprocess from shutil import which from typing import Optional, Union, Sequence +from exec import run_cmd, run_root_cmd + def programs_available(programs: Union[str, Sequence[str]]) -> bool: if type(programs) is str: @@ -15,7 +17,7 @@ def programs_available(programs: Union[str, Sequence[str]]) -> bool: def umount(dest: str, lazy=False): - return subprocess.run( + return run_root_cmd( [ 'umount', '-c' + ('l' if lazy else ''), @@ -33,7 +35,7 @@ def mount(src: str, dest: str, options: list[str] = ['bind'], fs_type: Optional[ if fs_type: opts += ['-t', fs_type] - result = subprocess.run( + result = run_root_cmd( ['mount'] + opts + [ src, dest, @@ -46,7 +48,7 @@ def mount(src: str, dest: str, options: list[str] = ['bind'], fs_type: Optional[ def check_findmnt(path: str): - result = subprocess.run( + result = run_root_cmd( [ 'findmnt', '-n', @@ -59,8 +61,10 @@ def check_findmnt(path: str): return result.stdout.decode().strip() -def git(cmd: list[str], dir='.', capture_output=False) -> subprocess.CompletedProcess: - return subprocess.run(['git'] + cmd, cwd=dir, capture_output=capture_output) +def git(cmd: list[str], dir='.', capture_output=False, user: Optional[str] = None) -> subprocess.CompletedProcess: + result = run_cmd(['git'] + cmd, cwd=dir, capture_output=capture_output, switch_user=user) + assert isinstance(result, subprocess.CompletedProcess) + return result def log_or_exception(raise_exception: bool, msg: str, exc_class=Exception, log_level=logging.WARNING):