diff --git a/exec/__init__.py b/exec/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/exec/cmd.py b/exec/cmd.py index 4a3646d..8d99f61 100644 --- a/exec/cmd.py +++ b/exec/cmd.py @@ -54,17 +54,23 @@ def generate_cmd_elevated(cmd: list[str], elevation_method: ElevationMethod): return ELEVATION_METHODS[elevation_method] + cmd -def generate_cmd_su(cmd: list[str], switch_user: str, elevation_method: Optional[ElevationMethod] = None): +def generate_cmd_su( + cmd: list[str], + switch_user: str, + elevation_method: Optional[ElevationMethod] = None, + force_su: bool = False, + force_elevate: bool = False, +): """ returns cmd to escalate (e.g. sudo) and switch users (su) to run `cmd` as `switch_user` as necessary. If `switch_user` is neither the current user nor root, cmd will have to be flattened into a single string. A result might look like `['sudo', '--', 'su', '-s', '/bin/bash', '-c', cmd_as_a_string]`. """ current_uid = os.getuid() - if pwd.getpwuid(current_uid).pw_name != switch_user: - if switch_user != 'root': + if pwd.getpwuid(current_uid).pw_name != switch_user or force_su: + if switch_user != 'root' or force_su: cmd = ['/bin/su', switch_user, '-s', '/bin/bash', '-c', flatten_shell_script(cmd, shell_quote_items=True)] - if current_uid != 0: # in order to use `/bin/su`, we have to be root first. + if current_uid != 0 or force_elevate: # in order to use `/bin/su`, we have to be root first. cmd = generate_cmd_elevated(cmd, elevation_method or ELEVATION_METHOD_DEFAULT) return cmd diff --git a/exec/test_cmd.py b/exec/test_cmd.py new file mode 100644 index 0000000..0316491 --- /dev/null +++ b/exec/test_cmd.py @@ -0,0 +1,70 @@ +import logging +import os +import pwd +import subprocess + +from .cmd import run_cmd, run_root_cmd, generate_cmd_su + + +def get_username(id: int): + return pwd.getpwuid(id).pw_name + + +def run_func(f, expected_user: str = None, **kwargs): + current_uid = os.getuid() + current_username = get_username(current_uid) + target_uid = current_uid + result = f(['id', '-u'], capture_output=True, **kwargs) + assert isinstance(result, subprocess.CompletedProcess) + result.check_returncode() + if expected_user and current_username != expected_user: + target_uid = pwd.getpwnam(expected_user).pw_uid + result_uid = result.stdout.decode() + assert int(result_uid) == target_uid + + +def run_generate_and_exec(script, generate_args={}, switch_user=None, **kwargs): + "runs generate_cmd_su() and executes the resulting argv" + if not switch_user: + switch_user = get_username(os.getuid()) + cmd = generate_cmd_su(script, switch_user=switch_user, **generate_args) + logging.debug(f'run_generate_and_exec: running {cmd}') + return subprocess.run( + cmd, + **kwargs, + ) + + +def test_generate_su_force_su(): + run_func(run_generate_and_exec, generate_args={'force_su': True}) + + +def test_generate_su_force_elevate(): + run_func(run_generate_and_exec, generate_args={'force_elevate': True}, expected_user='root', switch_user='root') + + +def test_generate_su_nobody_force_su(): + user = 'nobody' + run_func(run_generate_and_exec, expected_user=user, switch_user=user, generate_args={'force_su': True}) + + +def test_generate_su_nobody_force_su_and_elevate(): + user = 'nobody' + run_func(run_generate_and_exec, expected_user=user, switch_user=user, generate_args={'force_su': True, 'force_elevate': True}) + + +def test_run_cmd(): + run_func(run_cmd) + + +def test_run_cmd_su_nobody(): + user = 'nobody' + run_func(run_cmd, expected_user=user, switch_user=user) + + +def test_run_cmd_as_root(): + run_func(run_cmd, expected_user='root', switch_user='root') + + +def test_run_root_cmd(): + run_func(run_root_cmd, expected_user='root')