diff --git a/exec/file.py b/exec/file.py new file mode 100644 index 0000000..2772c86 --- /dev/null +++ b/exec/file.py @@ -0,0 +1,108 @@ +import logging +import os +import stat +import subprocess + +from typing import Optional, Union + +from .cmd import run_root_cmd, elevation_noop, generate_cmd_su, wrap_in_bash, shell_quote +from utils import get_user_name, get_group_name + + +def try_native_filewrite(path: str, content: Union[str, bytes], chmod: Optional[str] = None) -> Optional[Exception]: + "try writing with python open(), return None on success, return(!) Exception on failure" + bflag = 'b' if isinstance(content, bytes) else '' + try: + kwargs = {} + if chmod: + kwargs['mode'] = chmod + descriptor = os.open(path, **kwargs) # type: ignore + with open(descriptor, 'w' + bflag) as f: + f.write(content) + except Exception as ex: + return ex + return None + + +def chown(path: str, user: Optional[Union[str, int]] = None, group: Optional[Union[str, int]] = None): + owner = '' + if user is not None: + owner += get_user_name(user) + if group is not None: + owner += f':{get_group_name(group)}' + if owner: + result = run_root_cmd(["chown", owner, path]) + assert isinstance(result, subprocess.CompletedProcess) + if result.returncode: + raise Exception(f"Failed to change owner of '{path}' to '{owner}'") + + +def root_check_exists(path): + return os.path.exists(path) or run_root_cmd(['[', '-e', path, ']']).returncode == 0 + + +def root_check_is_dir(path): + return os.path.isdir(path) or run_root_cmd(['[', '-d', path, ']']) + + +def write_file( + path: str, + content: Union[str, bytes], + lazy: bool = True, + mode: Optional[str] = None, + user: Optional[str] = None, + group: Optional[str] = None, +): + chmod = '' + chown_user = get_user_name(user) if user else None + chown_group = get_group_name(group) if group else None + fstat: os.stat_result + exists = root_check_exists(path) + dirname = os.path.dirname(path) + if exists: + fstat = os.stat(path) + else: + chown_user = chown_user or get_user_name(os.getuid()) + chown_group = chown_group or get_group_name(os.getgid()) + dir_exists = root_check_exists(dirname) + if not dir_exists or not root_check_is_dir(dirname): + reason = "is not a directory" if dir_exists else "does not exist" + raise Exception(f"Error writing file {path}, parent dir {reason}") + if mode: + if not mode.isnumeric(): + raise Exception(f"Unknown file mode '{mode}' (must be numeric): {path}") + if not exists or stat.filemode(int(mode, 8)) != stat.filemode(fstat.st_mode): + chmod = mode + failed = try_native_filewrite(path, content, chmod) + if exists or failed: + if failed: + try: + elevation_noop(attach_tty=True) # avoid password prompt while writing file + logging.debug(f"Writing to {path} using elevated /bin/tee") + cmd: list[str] = generate_cmd_su(wrap_in_bash(f'tee {shell_quote(path)} >/dev/null', flatten_result=False), 'root') # type: ignore + assert isinstance(cmd, list) + s = subprocess.Popen( + cmd, + text=(not isinstance(content, bytes)), + stdin=subprocess.PIPE, + ) + s.communicate(content) + s.wait(300) # 5 minute timeout + if s.returncode: + raise Exception(f"Write command excited non-zero: {s.returncode}") + except Exception as ex: + logging.fatal(f"Writing to file '{path}' with elevated privileges failed") + raise ex + if chmod: + result = run_root_cmd(["chmod", chmod, path]) + assert isinstance(result, subprocess.CompletedProcess) + if result.returncode: + raise Exception(f"Failed to set mode of '{path}' to '{chmod}'") + + chown(path, chown_user, chown_group) + + +def root_write_file(*args, **kwargs): + kwargs['user'] = 'root' + kwargs['group'] = 'root' + return write_file(*args, **kwargs) diff --git a/exec/test_file.py b/exec/test_file.py new file mode 100644 index 0000000..5b2b606 --- /dev/null +++ b/exec/test_file.py @@ -0,0 +1,163 @@ +import pytest + +import os +import tempfile + +from typing import Union, Generator +from dataclasses import dataclass + +from .cmd import run_root_cmd +from .file import write_file, chown +from utils import get_gid, get_uid + + +@dataclass +class TempdirFillInfo(): + path: str + files: dict[str, str] + + +def get_tempdir(): + d = tempfile.mkdtemp() + assert os.path.exists(d) + return d + + +def remove_dir(d): + run_root_cmd(['rm', '-rf', d]).check_returncode() + + +def create_file(filepath, owner='root', group='root'): + assert not os.path.exists(filepath) + run_root_cmd(['touch', filepath]).check_returncode() + run_root_cmd(['chown', f'{owner}:{group}', filepath]).check_returncode() + + +@pytest.fixture +def tempdir(): + d = get_tempdir() + yield d + # cleanup, gets run after the test since we yield above + remove_dir(d) + + +@pytest.fixture +def tempdir_filled() -> Generator[TempdirFillInfo, None, None]: + d = get_tempdir() + contents = { + 'rootfile': { + 'owner': 'root', + 'group': 'root', + }, + 'userfile': { + 'owner': 'nobody', + 'group': 'nobody', + }, + } + res = TempdirFillInfo(path=d, files={}) + for p, opts in contents.items(): + path = os.path.join(d, p) + res.files[p] = path + create_file(path, **opts) + yield res + # cleanup, gets run after the test since we yield above + remove_dir(d) + + +def verify_ownership(filepath, user: Union[str, int], group: Union[str, int]): + uid = get_uid(user) + gid = get_gid(group) + assert os.path.exists(filepath) + fstat = os.stat(filepath) + assert fstat.st_uid == uid + assert fstat.st_gid == gid + + +def verify_content(filepath, content): + assert os.path.exists(filepath) + with open(filepath, 'r') as f: + assert f.read().strip() == content.strip() + + +@pytest.mark.parametrize("user,group", [('root', 'root'), ('nobody', 'nobody')]) +def test_chown(tempdir: str, user: str, group: str): + assert os.path.exists(tempdir) + target_uid = get_uid(user) + target_gid = get_gid(group) + chown(tempdir, target_uid, target_gid) + verify_ownership(tempdir, target_uid, target_gid) + + +def test_tempdir_filled_fixture(tempdir_filled: TempdirFillInfo): + files = tempdir_filled.files + assert files + assert 'rootfile' in files + assert 'userfile' in files + verify_ownership(files['rootfile'], 'root', 'root') + verify_ownership(files['userfile'], 'nobody', 'nobody') + + +def test_write_new_file_naive(tempdir: str): + assert os.path.exists(tempdir) + new = os.path.join(tempdir, 'newfiletest') + content = 'test12345' + assert not os.path.exists(new) + write_file(new, content) + verify_content(new, content) + verify_ownership(new, user=os.getuid(), group=os.getgid()) + + +def test_write_new_file_root(tempdir: str): + assert os.path.exists(tempdir) + new = os.path.join(tempdir, 'newfiletest') + content = 'test12345' + assert not os.path.exists(new) + write_file(new, content, user='root', group='root') + verify_content(new, content) + verify_ownership(new, user=0, group=0) + + +def test_write_new_file_user(tempdir: str): + user = 'nobody' + group = 'nobody' + assert os.path.exists(tempdir) + new = os.path.join(tempdir, 'newfiletest') + content = 'test12345' + assert not os.path.exists(new) + write_file(new, content, user=user, group=group) + assert os.path.exists(new) + verify_content(new, content) + verify_ownership(new, user=user, group=group) + + +def test_write_new_file_user_in_root_dir(tempdir: str): + assert os.path.exists(tempdir) + chown(tempdir, user='root', group='root') + run_root_cmd(['chmod', '755', tempdir]) + verify_ownership(tempdir, 'root', 'root') + test_write_new_file_user(tempdir) + + +def test_write_rootfile_naive(tempdir_filled: TempdirFillInfo): + files = tempdir_filled.files + assert 'rootfile' in files + p = files['rootfile'] + assert os.path.exists(p) + verify_ownership(p, 'root', 'root') + content = 'test123' + write_file(p, content) + verify_content(p, 'test123') + verify_ownership(p, 'root', 'root') + + +@pytest.mark.parametrize("user,group", [('root', 'root'), ('nobody', 'nobody')]) +def test_write_rootfile(tempdir_filled: TempdirFillInfo, user: str, group: str): + files = tempdir_filled.files + assert 'rootfile' in files + p = files['rootfile'] + assert os.path.exists(p) + verify_ownership(p, 'root', 'root') + content = 'test123' + write_file(p, content) + verify_content(p, 'test123') + verify_ownership(p, 'root', 'root')