mirror of
https://gitlab.com/kupfer/kupferbootstrap.git
synced 2025-02-22 21:25:43 -05:00
chroot: extract run_cmd() to new exec.py, use in utils.py and chroot/abstract.py
This commit is contained in:
parent
e3ad2edc69
commit
66ac56d715
3 changed files with 123 additions and 22 deletions
|
@ -7,6 +7,7 @@ from shlex import quote as shell_quote
|
|||
from typing import Protocol, Union, Optional, Mapping
|
||||
from uuid import uuid4
|
||||
|
||||
from exec import run_root_cmd, generate_env_cmd, flatten_shell_script, wrap_in_bash
|
||||
from config import config
|
||||
from constants import Arch, CHROOT_PATHS
|
||||
from distro.distro import get_base_distro, get_kupfer_local, RepoInfo
|
||||
|
@ -226,27 +227,15 @@ class Chroot(AbstractChroot):
|
|||
raise Exception(f'Chroot {self.name} is inactive, not running command! Hint: pass `fail_inactive=False`')
|
||||
if outer_env is None:
|
||||
outer_env = os.environ.copy()
|
||||
env_cmd = ['/usr/bin/env'] + [f'{shell_quote(key)}={shell_quote(value)}' for key, value in inner_env.items()]
|
||||
kwargs: dict = {
|
||||
'env': outer_env,
|
||||
}
|
||||
if not attach_tty:
|
||||
kwargs |= {'stdout': stdout} if stdout else {'capture_output': capture_output}
|
||||
env_cmd = generate_env_cmd(inner_env)
|
||||
|
||||
if not isinstance(script, str) and isinstance(script, list):
|
||||
script = ' '.join(script)
|
||||
script = flatten_shell_script(script, shell_quote_items=False, wrap_in_shell_quote=False)
|
||||
if cwd:
|
||||
script = f"cd {shell_quote(cwd)} && ( {script} )"
|
||||
cmd = ['chroot', self.path] + env_cmd + [
|
||||
'/bin/bash',
|
||||
'-c',
|
||||
script,
|
||||
]
|
||||
logging.debug(f'{self.name}: Running cmd: "{cmd}"')
|
||||
if attach_tty:
|
||||
return subprocess.call(cmd, **kwargs)
|
||||
else:
|
||||
return subprocess.run(cmd, **kwargs)
|
||||
cmd = flatten_shell_script(['chroot', self.path] + env_cmd + wrap_in_bash(script, flatten_result=False), shell_quote_items=True)
|
||||
|
||||
return run_root_cmd(cmd, env=outer_env, attach_tty=attach_tty, capture_output=capture_output, stdout=stdout)
|
||||
|
||||
def mount_pkgbuilds(self, fail_if_mounted: bool = False) -> str:
|
||||
return self.mount(
|
||||
|
|
108
exec.py
Normal file
108
exec.py
Normal file
|
@ -0,0 +1,108 @@
|
|||
import logging
|
||||
import os
|
||||
import pwd
|
||||
import subprocess
|
||||
|
||||
from shlex import quote as shell_quote
|
||||
from typing import Optional, Union, TypeAlias
|
||||
|
||||
ElevationMethod: TypeAlias = str
|
||||
|
||||
# as long as **only** sudo is supported, hardcode the default into ELEVATION_METHOD_DEFAULT.
|
||||
# when other methods are added, all mentions of ELEVATION_METHOD_DEFAULT should be replaced by a config key.
|
||||
|
||||
ELEVATION_METHOD_DEFAULT = "sudo"
|
||||
|
||||
ELEVATION_METHODS: dict[ElevationMethod, list[str]] = {
|
||||
"sudo": ['sudo', '--'],
|
||||
}
|
||||
|
||||
|
||||
def generate_env_cmd(env: dict[str, str]):
|
||||
return ['/usr/bin/env'] + [f'{key}={value}' for key, value in env.items()]
|
||||
|
||||
|
||||
def flatten_shell_script(script: Union[list[str], str], shell_quote_items: bool = False, wrap_in_shell_quote=False) -> str:
|
||||
"""
|
||||
takes a shell-script and returns a flattened string for consumption with `sh -c`.
|
||||
|
||||
`shell_quote_items` should only be used on `script` arrays that have no shell magic anymore,
|
||||
e.g. `['bash', '-c', 'echo $USER']`, which would return the string `'bash' '-c' 'echo user'`,
|
||||
which is suited for consumption by another bash -c process.
|
||||
"""
|
||||
if not isinstance(script, str) and isinstance(script, list):
|
||||
cmds = script
|
||||
if shell_quote_items:
|
||||
cmds = [shell_quote(i) for i in cmds]
|
||||
script = " ".join(cmds)
|
||||
if wrap_in_shell_quote:
|
||||
script = shell_quote(script)
|
||||
return script
|
||||
|
||||
|
||||
def wrap_in_bash(cmd: Union[list[str], str], flatten_result=True) -> Union[str, list[str]]:
|
||||
res: Union[str, list[str]] = ['/bin/bash', '-c', flatten_shell_script(cmd, shell_quote_items=False, wrap_in_shell_quote=False)]
|
||||
if flatten_result:
|
||||
res = flatten_shell_script(res, shell_quote_items=True, wrap_in_shell_quote=False)
|
||||
return res
|
||||
|
||||
|
||||
def generate_cmd_elevated(cmd: list[str], elevation_method: ElevationMethod):
|
||||
"wraps `cmd` in the necessary commands to escalate, e.g. `['sudo', '--', cmd]`."
|
||||
if elevation_method not in ELEVATION_METHODS:
|
||||
raise Exception(f"Unknown elevation method {elevation_method}")
|
||||
return ELEVATION_METHODS[elevation_method] + cmd
|
||||
|
||||
|
||||
def generate_cmd_su(cmd: list[str], switch_user: str, elevation_method: Optional[ElevationMethod] = None):
|
||||
"""
|
||||
returns cmd to escalate (e.g. sudo) and switch users (su) to run `cmd` as `switch_user` as necessary.
|
||||
If `switch_user` is neither the current user nor root, cmd will have to be flattened into a single string.
|
||||
A result might look like `['sudo', '--', 'su', '-s', '/bin/bash', '-c', cmd_as_a_string]`.
|
||||
"""
|
||||
current_uid = os.getuid()
|
||||
if pwd.getpwuid(current_uid).pw_name != switch_user:
|
||||
if switch_user != 'root':
|
||||
cmd = ['/bin/su', switch_user, '-s', '/bin/bash', '-c', flatten_shell_script(cmd, shell_quote_items=True)]
|
||||
if current_uid != 0: # in order to use `/bin/su`, we have to be root first.
|
||||
cmd = generate_cmd_elevated(cmd, elevation_method or ELEVATION_METHOD_DEFAULT)
|
||||
|
||||
return cmd
|
||||
|
||||
|
||||
def run_cmd(
|
||||
script: Union[str, list[str]],
|
||||
env: dict[str, str] = {},
|
||||
attach_tty: bool = False,
|
||||
capture_output: bool = False,
|
||||
cwd: Optional[str] = None,
|
||||
stdout: Optional[int] = None,
|
||||
switch_user: Optional[str] = None,
|
||||
elevation_method: Optional[ElevationMethod] = None,
|
||||
) -> Union[int, subprocess.CompletedProcess]:
|
||||
"execute `script` as `switch_user`, elevating and su'ing as necessary"
|
||||
kwargs: dict = {}
|
||||
env_cmd = []
|
||||
if env:
|
||||
env_cmd = generate_env_cmd(env)
|
||||
kwargs['env'] = env
|
||||
if not attach_tty:
|
||||
kwargs |= {'stdout': stdout} if stdout else {'capture_output': capture_output}
|
||||
|
||||
script = flatten_shell_script(script)
|
||||
if cwd:
|
||||
kwargs['cwd'] = cwd
|
||||
wrapped_script: list[str] = wrap_in_bash(script, flatten_result=False) # type: ignore
|
||||
cmd = env_cmd + wrapped_script
|
||||
if switch_user:
|
||||
cmd = generate_cmd_su(cmd, switch_user, elevation_method=elevation_method)
|
||||
logging.debug(f'Running cmd: "{cmd}"')
|
||||
if attach_tty:
|
||||
return subprocess.call(cmd, **kwargs)
|
||||
else:
|
||||
return subprocess.run(cmd, **kwargs)
|
||||
|
||||
|
||||
def run_root_cmd(*kargs, **kwargs):
|
||||
kwargs['switch_user'] = 'root'
|
||||
return run_cmd(*kargs, **kwargs)
|
14
utils.py
14
utils.py
|
@ -4,6 +4,8 @@ import subprocess
|
|||
from shutil import which
|
||||
from typing import Optional, Union, Sequence
|
||||
|
||||
from exec import run_cmd, run_root_cmd
|
||||
|
||||
|
||||
def programs_available(programs: Union[str, Sequence[str]]) -> bool:
|
||||
if type(programs) is str:
|
||||
|
@ -15,7 +17,7 @@ def programs_available(programs: Union[str, Sequence[str]]) -> bool:
|
|||
|
||||
|
||||
def umount(dest: str, lazy=False):
|
||||
return subprocess.run(
|
||||
return run_root_cmd(
|
||||
[
|
||||
'umount',
|
||||
'-c' + ('l' if lazy else ''),
|
||||
|
@ -33,7 +35,7 @@ def mount(src: str, dest: str, options: list[str] = ['bind'], fs_type: Optional[
|
|||
if fs_type:
|
||||
opts += ['-t', fs_type]
|
||||
|
||||
result = subprocess.run(
|
||||
result = run_root_cmd(
|
||||
['mount'] + opts + [
|
||||
src,
|
||||
dest,
|
||||
|
@ -46,7 +48,7 @@ def mount(src: str, dest: str, options: list[str] = ['bind'], fs_type: Optional[
|
|||
|
||||
|
||||
def check_findmnt(path: str):
|
||||
result = subprocess.run(
|
||||
result = run_root_cmd(
|
||||
[
|
||||
'findmnt',
|
||||
'-n',
|
||||
|
@ -59,8 +61,10 @@ def check_findmnt(path: str):
|
|||
return result.stdout.decode().strip()
|
||||
|
||||
|
||||
def git(cmd: list[str], dir='.', capture_output=False) -> subprocess.CompletedProcess:
|
||||
return subprocess.run(['git'] + cmd, cwd=dir, capture_output=capture_output)
|
||||
def git(cmd: list[str], dir='.', capture_output=False, user: Optional[str] = None) -> subprocess.CompletedProcess:
|
||||
result = run_cmd(['git'] + cmd, cwd=dir, capture_output=capture_output, switch_user=user)
|
||||
assert isinstance(result, subprocess.CompletedProcess)
|
||||
return result
|
||||
|
||||
|
||||
def log_or_exception(raise_exception: bool, msg: str, exc_class=Exception, log_level=logging.WARNING):
|
||||
|
|
Loading…
Add table
Reference in a new issue