2022-08-15 02:13:55 +02:00
|
|
|
import logging
|
|
|
|
import os
|
|
|
|
import pwd
|
|
|
|
import subprocess
|
|
|
|
|
2022-12-09 05:32:31 +01:00
|
|
|
from subprocess import CompletedProcess # make it easy for users of this module
|
2022-08-15 02:13:55 +02:00
|
|
|
from shlex import quote as shell_quote
|
|
|
|
from typing import Optional, Union, TypeAlias
|
|
|
|
|
|
|
|
# An elevation method is identified by its name, which is a key of ELEVATION_METHODS.
ElevationMethod: TypeAlias = str

# As long as **only** sudo is supported, the default is hardcoded in ELEVATION_METHOD_DEFAULT.
# When other methods are added, all mentions of ELEVATION_METHOD_DEFAULT should be replaced by a config key.
ELEVATION_METHOD_DEFAULT = "sudo"

# Maps each elevation method name to the argv prefix that gets prepended to a command to elevate it.
ELEVATION_METHODS: dict[ElevationMethod, list[str]] = {
    "none": [],  # empty prefix: the command is left unchanged
    "sudo": ['sudo', '--'],  # `--` stops sudo from parsing the wrapped command's arguments as its own options
}
|
|
|
|
|
|
|
|
|
|
|
|
def generate_env_cmd(env: dict[str, str]):
    """
    builds an `/usr/bin/env` argv prefix that sets every variable in `env`.

    The result starts with `/usr/bin/env`, followed by one `KEY=value` entry per
    item of `env` in the dict's iteration order. Append the actual command to it.
    """
    return ['/usr/bin/env', *(f'{name}={val}' for name, val in env.items())]
|
|
|
|
|
|
|
|
|
|
|
|
def flatten_shell_script(script: Union[list[str], str], shell_quote_items: bool = False, wrap_in_shell_quote=False) -> str:
    """
    flattens a shell-script into a single string for consumption with `sh -c`.

    A `list` is joined with single spaces; anything else is passed through as-is.

    `shell_quote_items` quotes each list item individually before joining. It has no
    effect on a `str`. Only use it on `script` lists that contain no shell magic
    that still has to be interpreted, e.g. `['bash', '-c', 'echo $USER']`, which
    yields the string `bash -c 'echo $USER'`, suited for consumption by another
    `bash -c` process.

    `wrap_in_shell_quote` additionally quotes the final, flattened string as a whole.
    """
    if isinstance(script, list):
        parts = map(shell_quote, script) if shell_quote_items else script
        script = " ".join(parts)
    return shell_quote(script) if wrap_in_shell_quote else script
|
|
|
|
|
|
|
|
|
|
|
|
def wrap_in_bash(cmd: Union[list[str], str], flatten_result=True) -> Union[str, list[str]]:
    """
    wraps `cmd` into a `/bin/bash -c <script>` invocation.

    `cmd` is first flattened (unquoted) into a single script string.
    With `flatten_result=False` the argv list `['/bin/bash', '-c', script]` is returned.
    With `flatten_result=True` (the default) that argv is itself flattened into one
    string, with every element shell-quoted.
    """
    inner_script = flatten_shell_script(cmd, shell_quote_items=False, wrap_in_shell_quote=False)
    bash_argv = ['/bin/bash', '-c', inner_script]
    if not flatten_result:
        return bash_argv
    return flatten_shell_script(bash_argv, shell_quote_items=True, wrap_in_shell_quote=False)
|
|
|
|
|
|
|
|
|
2022-08-28 02:12:05 +02:00
|
|
|
def generate_cmd_elevated(cmd: Union[list[str], str], elevation_method: ElevationMethod):
    """
    wraps `cmd` in the necessary commands to escalate, e.g. `['sudo', '--', *cmd]`.

    A `str` cmd is first turned into a `['/bin/bash', '-c', cmd]` argv list.
    The prefix for `elevation_method` is looked up in `ELEVATION_METHODS` and a
    new list `prefix + argv` is returned.

    Raises `Exception` if `elevation_method` is not a key of `ELEVATION_METHODS`.
    """
    argv = wrap_in_bash(cmd, flatten_result=False) if isinstance(cmd, str) else cmd
    assert not isinstance(argv, str)  # narrows argv to list[str] for type checkers
    if elevation_method in ELEVATION_METHODS:
        return ELEVATION_METHODS[elevation_method] + argv
    raise Exception(f"Unknown elevation method {elevation_method}")
|
|
|
|
|
|
|
|
|
2022-08-15 18:54:25 +02:00
|
|
|
def generate_cmd_su(
    cmd: Union[list[str], str],
    switch_user: str,
    elevation_method: Optional[ElevationMethod] = None,
    force_su: bool = False,
    force_elevate: bool = False,
):
    """
    returns the cmd needed to run `cmd` as `switch_user`, escalating (e.g. sudo) and switching users (su) as necessary.

    If the current user already is `switch_user` and `force_su` is unset, `cmd` is returned unchanged.
    Otherwise:
    - unless `switch_user` is root (or if `force_su` is set), cmd is flattened into a single,
      item-quoted string and wrapped in `/bin/su <switch_user> -s /bin/bash -c <string>`.
    - unless we're already running as uid 0 (or if `force_elevate` is set), the result is
      wrapped via `generate_cmd_elevated()`, using `elevation_method` or `ELEVATION_METHOD_DEFAULT`.

    A result might look like `['sudo', '--', '/bin/su', user, '-s', '/bin/bash', '-c', cmd_as_a_string]`.
    """
    current_uid = os.getuid()
    already_target_user = pwd.getpwuid(current_uid).pw_name == switch_user
    if already_target_user and not force_su:
        return cmd

    if force_su or switch_user != 'root':
        flattened = flatten_shell_script(cmd, shell_quote_items=True)
        cmd = ['/bin/su', switch_user, '-s', '/bin/bash', '-c', flattened]
    if force_elevate or current_uid != 0:  # in order to use `/bin/su`, we have to be root first.
        method = elevation_method or ELEVATION_METHOD_DEFAULT
        cmd = generate_cmd_elevated(cmd, method)

    return cmd
|
|
|
|
|
|
|
|
|
|
|
|
def run_cmd(
    script: Union[str, list[str]],
    env: Optional[dict[str, str]] = None,
    attach_tty: bool = False,
    capture_output: bool = False,
    cwd: Optional[str] = None,
    switch_user: Optional[str] = None,
    elevation_method: Optional[ElevationMethod] = None,
    stdout: Optional[int] = None,
    stderr=None,
) -> Union[CompletedProcess, int]:
    """
    execute `script` as `switch_user`, elevating and su'ing as necessary.

    `script` is flattened into a single string and run via `/bin/bash -c`.

    Parameters:
    - `env`: if non-empty, it is set both through an `/usr/bin/env KEY=value ...` prefix and
      as the subprocess' `env`. NOTE: subprocess' `env` replaces the inherited environment.
    - `attach_tty`: run via `subprocess.call()` without any output redirection;
      `capture_output`, `stdout` and `stderr` are ignored in that case.
    - `capture_output`, `stdout`, `stderr`: output handling for `subprocess.run()`.
      An explicit `stdout` takes precedence over `capture_output`. If `capture_output` is
      combined with only an explicit `stderr`, stdout is piped and `stderr` is honoured.
    - `cwd`: working directory for the subprocess, if set.
    - `switch_user`: if set, the cmd is wrapped via `generate_cmd_su()`.
    - `elevation_method`: passed on to `generate_cmd_su()`.

    Returns the exit code (`int`) if `attach_tty` is set, otherwise a `CompletedProcess`.
    """
    kwargs: dict = {}
    env_cmd: list[str] = []
    if env:
        env_cmd = generate_env_cmd(env)
        kwargs['env'] = env
    if not attach_tty:
        # compare against None instead of truthiness so that fd 0 is not silently dropped
        if stdout is not None:
            kwargs['stdout'] = stdout
        elif capture_output and stderr is not None:
            # subprocess.run() raises ValueError if capture_output=True is combined with an
            # explicit stderr, so request the stdout pipe directly instead.
            kwargs['stdout'] = subprocess.PIPE
        else:
            kwargs['capture_output'] = capture_output
        if stderr is not None:
            kwargs['stderr'] = stderr
    if cwd:
        kwargs['cwd'] = cwd

    script = flatten_shell_script(script)
    wrapped_script: list[str] = wrap_in_bash(script, flatten_result=False)  # type: ignore
    cmd = env_cmd + wrapped_script
    if switch_user:
        cmd = generate_cmd_su(cmd, switch_user, elevation_method=elevation_method)
    logging.debug(f'Running cmd: "{cmd}"' + (f' (path: {repr(cwd)})' if cwd else ''))
    if attach_tty:
        return subprocess.call(cmd, **kwargs)
    return subprocess.run(cmd, **kwargs)
|
|
|
|
|
|
|
|
|
|
|
|
def run_root_cmd(*kargs, **kwargs):
    """
    calls `run_cmd()` with `switch_user` forced to `'root'`.

    All other positional and keyword arguments are passed through unchanged;
    a `switch_user` keyword given by the caller is overridden.
    """
    root_kwargs = {**kwargs, 'switch_user': 'root'}
    return run_cmd(*kargs, **root_kwargs)
|
2022-08-15 17:36:28 +02:00
|
|
|
|
|
|
|
|
|
|
|
def elevation_noop(**kwargs):
    """
    runs `/bin/true` as root via `run_root_cmd()`, passing `kwargs` through.

    The result of the call is discarded; this function returns `None`.
    NOTE(review): presumably used to trigger the elevation prompt ahead of time; confirm with callers.
    """
    noop_script = '/bin/true'
    run_root_cmd(noop_script, **kwargs)
|