exec/file: add get_temp_dir() (for tempdirs without sticky bits) and chmod()

This commit is contained in:
InsanePrawn 2022-08-16 16:37:53 +02:00
parent 1f15d6705c
commit 0924ea298a
2 changed files with 60 additions and 15 deletions

View file

@ -1,9 +1,11 @@
import atexit
import logging import logging
import os import os
import stat import stat
import subprocess import subprocess
from shutil import rmtree from shutil import rmtree
from tempfile import mkdtemp
from typing import Optional, Union from typing import Optional, Union
from .cmd import run_root_cmd, elevation_noop, generate_cmd_su, wrap_in_bash, shell_quote from .cmd import run_root_cmd, elevation_noop, generate_cmd_su, wrap_in_bash, shell_quote
@ -39,6 +41,25 @@ def chown(path: str, user: Optional[Union[str, int]] = None, group: Optional[Uni
raise Exception(f"Failed to change owner of '{path}' to '{owner}'") raise Exception(f"Failed to change owner of '{path}' to '{owner}'")
def chmod(path, mode: Union[int, str] = 0o0755, force_sticky=True):
if not isinstance(mode, str):
octal = oct(mode)[2:]
else:
octal = mode
assert octal.isnumeric()
octal = octal.rjust(3, '0')
if force_sticky:
octal = octal.rjust(4, '0')
try:
os.chmod(path, mode=octal) # type: ignore
except:
cmd = ["chmod", octal, path]
result = run_root_cmd(cmd)
assert isinstance(result, subprocess.CompletedProcess)
if result.returncode:
raise Exception(f"Failed to set mode of '{path}' to '{chmod}'")
def root_check_exists(path): def root_check_exists(path):
return os.path.exists(path) or run_root_cmd(['[', '-e', path, ']']).returncode == 0 return os.path.exists(path) or run_root_cmd(['[', '-e', path, ']']).returncode == 0
@ -55,7 +76,7 @@ def write_file(
user: Optional[str] = None, user: Optional[str] = None,
group: Optional[str] = None, group: Optional[str] = None,
): ):
chmod = '' chmod_mode = ''
chown_user = get_user_name(user) if user else None chown_user = get_user_name(user) if user else None
chown_group = get_group_name(group) if group else None chown_group = get_group_name(group) if group else None
fstat: os.stat_result fstat: os.stat_result
@ -74,8 +95,8 @@ def write_file(
if not mode.isnumeric(): if not mode.isnumeric():
raise Exception(f"Unknown file mode '{mode}' (must be numeric): {path}") raise Exception(f"Unknown file mode '{mode}' (must be numeric): {path}")
if not exists or stat.filemode(int(mode, 8)) != stat.filemode(fstat.st_mode): if not exists or stat.filemode(int(mode, 8)) != stat.filemode(fstat.st_mode):
chmod = mode chmod_mode = mode
failed = try_native_filewrite(path, content, chmod) failed = try_native_filewrite(path, content, chmod_mode)
if exists or failed: if exists or failed:
if failed: if failed:
try: try:
@ -95,11 +116,8 @@ def write_file(
except Exception as ex: except Exception as ex:
logging.fatal(f"Writing to file '{path}' with elevated privileges failed") logging.fatal(f"Writing to file '{path}' with elevated privileges failed")
raise ex raise ex
if chmod: if chmod_mode:
result = run_root_cmd(["chmod", chmod, path]) chmod(path, chmod_mode)
assert isinstance(result, subprocess.CompletedProcess)
if result.returncode:
raise Exception(f"Failed to set mode of '{path}' to '{chmod}'")
chown(path, chown_user, chown_group) chown(path, chown_user, chown_group)
@ -142,3 +160,12 @@ def symlink(source, target):
os.symlink(source, target) os.symlink(source, target)
except: except:
run_root_cmd(['ln', '-s', source, target]) run_root_cmd(['ln', '-s', source, target])
def get_temp_dir(register_cleanup=True, mode: int = 0o0755):
"create a new tempdir and sanitize ownership so root can access user files as god intended"
t = mkdtemp()
chmod(t, mode)
if register_cleanup:
atexit.register(remove_file, t, recursive=True)
return t

View file

@ -1,15 +1,17 @@
import pytest import pytest
import os import os
import tempfile import stat
from typing import Union, Generator from typing import Union, Generator
from dataclasses import dataclass from dataclasses import dataclass
from .cmd import run_root_cmd from .cmd import run_root_cmd
from .file import write_file, chown from .file import chmod, chown, get_temp_dir, write_file
from utils import get_gid, get_uid from utils import get_gid, get_uid
TEMPDIR_MODE = 0o755
@dataclass @dataclass
class TempdirFillInfo(): class TempdirFillInfo():
@ -17,8 +19,8 @@ class TempdirFillInfo():
files: dict[str, str] files: dict[str, str]
def get_tempdir(): def _get_tempdir():
d = tempfile.mkdtemp() d = get_temp_dir(register_cleanup=False, mode=TEMPDIR_MODE)
assert os.path.exists(d) assert os.path.exists(d)
return d return d
@ -35,15 +37,21 @@ def create_file(filepath, owner='root', group='root'):
@pytest.fixture @pytest.fixture
def tempdir(): def tempdir():
d = get_tempdir() d = _get_tempdir()
yield d yield d
# cleanup, gets run after the test since we yield above # cleanup, gets run after the test since we yield above
remove_dir(d) remove_dir(d)
def test_get_tempdir(tempdir):
mode = os.stat(tempdir).st_mode
assert stat.S_ISDIR(mode)
assert stat.S_IMODE(mode) == TEMPDIR_MODE
@pytest.fixture @pytest.fixture
def tempdir_filled() -> Generator[TempdirFillInfo, None, None]: def tempdir_filled() -> Generator[TempdirFillInfo, None, None]:
d = get_tempdir() d = _get_tempdir()
contents = { contents = {
'rootfile': { 'rootfile': {
'owner': 'root', 'owner': 'root',
@ -73,6 +81,10 @@ def verify_ownership(filepath, user: Union[str, int], group: Union[str, int]):
assert fstat.st_gid == gid assert fstat.st_gid == gid
def verify_mode(filepath, mode: int = TEMPDIR_MODE):
assert stat.S_IMODE(os.stat(filepath).st_mode) == mode
def verify_content(filepath, content): def verify_content(filepath, content):
assert os.path.exists(filepath) assert os.path.exists(filepath)
with open(filepath, 'r') as f: with open(filepath, 'r') as f:
@ -88,6 +100,13 @@ def test_chown(tempdir: str, user: str, group: str):
verify_ownership(tempdir, target_uid, target_gid) verify_ownership(tempdir, target_uid, target_gid)
@pytest.mark.parametrize("mode", [0, 0o700, 0o755, 0o600, 0o555])
def test_chmod(tempdir_filled, mode: int):
for filepath in tempdir_filled.files.values():
chmod(filepath, mode)
verify_mode(filepath, mode)
def test_tempdir_filled_fixture(tempdir_filled: TempdirFillInfo): def test_tempdir_filled_fixture(tempdir_filled: TempdirFillInfo):
files = tempdir_filled.files files = tempdir_filled.files
assert files assert files
@ -133,7 +152,6 @@ def test_write_new_file_user(tempdir: str):
def test_write_new_file_user_in_root_dir(tempdir: str): def test_write_new_file_user_in_root_dir(tempdir: str):
assert os.path.exists(tempdir) assert os.path.exists(tempdir)
chown(tempdir, user='root', group='root') chown(tempdir, user='root', group='root')
run_root_cmd(['chmod', '755', tempdir])
verify_ownership(tempdir, 'root', 'root') verify_ownership(tempdir, 'root', 'root')
test_write_new_file_user(tempdir) test_write_new_file_user(tempdir)