PATH:
opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
defence360agent
/
utils
/
Editing: fd_ops.py
"""fd-based file operations for symlink-attack mitigation.

All helpers in this module use O_NOFOLLOW and dir_fd-relative syscalls so
that no path-based resolution can be redirected by a concurrent symlink swap.

This module is intentionally kept separate from utils/__init__.py to avoid
loading these OS-specific helpers into every agent component.
"""

import errno
import logging
import os
import stat
from contextlib import contextmanager, suppress
from pathlib import Path

logger = logging.getLogger(__name__)


def rmtree_fd(dir_fd) -> None:
    """Remove all contents of a directory using fd-relative operations.

    Every entry is opened with ``O_NOFOLLOW`` so symlinks inside the tree
    are unlinked rather than followed.  The directory referenced by
    *dir_fd* itself is **not** removed — the caller should ``os.rmdir()``
    the parent entry after this call returns.

    Uses an iterative approach with an explicit stack to avoid hitting
    Python's recursion limit on adversarial deeply-nested trees.

    *dir_fd* must be an open ``O_RDONLY | O_DIRECTORY`` descriptor.  It is
    never closed by this function, neither on success nor on error.

    Raises:
        OSError: propagated from the underlying scandir/open/unlink/rmdir
            calls; descriptors opened by this function are closed first.
    """
    # Each stack frame is (fd, name_to_rmdir_after_close) where
    # name_to_rmdir_after_close is the entry name that should be
    # rmdir'd from the parent once this fd is fully processed.
    # The initial fd is managed by the caller, so its rmdir entry is None.
    stack = [(dir_fd, None)]
    try:
        while stack:
            current_fd, _ = stack[-1]
            pushed = False
            with os.scandir(current_fd) as entries:
                for entry in entries:
                    if entry.is_dir(follow_symlinks=False):
                        child_fd = os.open(
                            entry.name,
                            os.O_RDONLY | os.O_DIRECTORY | os.O_NOFOLLOW,
                            dir_fd=current_fd,
                        )
                        stack.append((child_fd, entry.name))
                        pushed = True
                        break  # restart scan from the new directory
                    else:
                        os.unlink(entry.name, dir_fd=current_fd)
            if not pushed:
                # All entries in current directory have been removed.
                fd, name = stack.pop()
                if name is not None:
                    # Close the child fd and rmdir it from the parent.
                    os.close(fd)
                    parent_fd, _ = stack[-1]
                    os.rmdir(name, dir_fd=parent_fd)
    except BaseException:
        # On error, close any fds we opened (but not the caller's dir_fd).
        for fd, name in stack:
            if name is not None:
                os.close(fd)
        raise


def open_dir_no_symlinks(path) -> int:
    """Open a directory, refusing symlinks at every path component.

    Walks the absolute *path* one component at a time, opening each with
    ``O_NOFOLLOW | O_DIRECTORY`` relative to the parent fd.  This guards
    against symlink attacks at *any* depth in the hierarchy, not just the
    leaf.

    Returns an ``O_RDONLY`` file descriptor for the final directory.
    The caller is responsible for closing it.

    Raises:
        OSError: if any component cannot be opened as a directory without
            following a symlink.  No descriptor is left open in that case.
    """
    path = os.path.abspath(os.fspath(path))
    parts = Path(path).parts  # ('/', 'home', 'user', ...)
    fd = os.open(parts[0], os.O_RDONLY | os.O_DIRECTORY)
    try:
        for part in parts[1:]:
            new_fd = os.open(
                part,
                os.O_RDONLY | os.O_DIRECTORY | os.O_NOFOLLOW,
                dir_fd=fd,
            )
            # Rebind before closing the parent: if os.close() raises, the
            # handler below then releases the child instead of leaking it
            # and closing the parent a second time.
            parent_fd, fd = fd, new_fd
            os.close(parent_fd)
        return fd
    except BaseException:
        os.close(fd)
        raise


@contextmanager
def open_nofollow(path, flags=os.O_RDONLY, *, dir_fd=None):
    """Open a file with O_NOFOLLOW, closing the fd on exit.

    Yields the raw file descriptor.  Rejects symlinks at the leaf
    component (raises ELOOP).  When *dir_fd* is provided, *path* is
    resolved relative to that directory descriptor.
    """
    kw = {"dir_fd": dir_fd} if dir_fd is not None else {}
    fd = os.open(str(path), flags | os.O_NOFOLLOW, **kw)
    try:
        yield fd
    finally:
        os.close(fd)


@contextmanager
def safe_dir(path):
    """Open a directory with symlink protection, closing the fd on exit.

    Walks every path component with O_NOFOLLOW via open_dir_no_symlinks
    and yields the resulting fd.
    """
    fd = open_dir_no_symlinks(path)
    try:
        yield fd
    finally:
        os.close(fd)


def atomic_rewrite_fd(
    filename,
    data: bytes,
    *,
    uid,
    gid,
    allow_empty_content,
    permissions,
    dir_fd: int,
) -> bool:
    """dir_fd-relative implementation of atomic_rewrite.

    The caller opens the directory with O_NOFOLLOW before any file I/O
    begins.  All file operations use dir_fd so that a concurrent rename of
    the directory to a symlink cannot redirect writes to a privileged path.

    Args:
        filename: path of the target file; only its final component is
            used, resolved relative to *dir_fd*.
        data: the new file content.
        uid, gid: ownership applied to the new file; applied only when
            both are not None.
        allow_empty_content: when false, empty *data* is refused (an error
            is logged and False is returned).
        permissions: mode bits for the new file.  When None, the mode of
            the existing entry is reused, or ``0o666 & ~umask`` if the
            entry does not exist.
        dir_fd: open descriptor of the directory containing the file.

    Returns:
        True if the file was rewritten, False if the existing content
        already equals *data* or empty content was refused.

    Raises:
        OSError: on any failing syscall.  This includes ELOOP when
            *permissions* is None and the existing entry is a symlink, and
            IsADirectoryError when the existing entry is a directory.
        FileExistsError: if no unused temporary name was found in 100
            attempts.
    """
    _, basename = os.path.split(filename)

    # Read current content without following symlinks.
    try:
        content_fd = os.open(
            basename, os.O_RDONLY | os.O_NOFOLLOW, dir_fd=dir_fd
        )
        try:
            f = os.fdopen(content_fd, "rb")
        except BaseException:
            # os.fdopen() does not take ownership of the descriptor when
            # it fails (e.g. the entry is a directory), so close it here
            # instead of leaking it.
            os.close(content_fd)
            raise
        with f:
            # One extra byte is enough to detect "longer than data".
            old_content = f.read(len(data) + 1)
        if old_content == data:
            return False
    except FileNotFoundError:
        pass  # file does not exist yet; will be created
    except OSError as exc:
        if exc.errno != errno.ELOOP:
            raise
        # existing entry is a symlink; overwrite it

    if not allow_empty_content and not data:
        logger.error("empty content: %r for file: %s", data, filename)
        return False

    if permissions is None:
        try:
            st = os.stat(basename, dir_fd=dir_fd, follow_symlinks=False)
            if stat.S_ISLNK(st.st_mode):
                raise OSError(errno.ELOOP, os.strerror(errno.ELOOP), basename)
            permissions = stat.S_IMODE(st.st_mode)
        except FileNotFoundError:
            # os.umask() can only be read by setting it; restore at once.
            current_umask = os.umask(0)
            os.umask(current_umask)
            permissions = 0o666 & ~current_umask

    # Create temp file atomically inside the directory referenced by dir_fd.
    # O_NOFOLLOW + O_EXCL ensures the name cannot be a pre-existing symlink.
    tmp_basename = None
    tmp_fd = -1
    for _ in range(100):
        tmp_basename = f"{basename}_{os.urandom(4).hex()}.i360edit"
        try:
            tmp_fd = os.open(
                tmp_basename,
                os.O_WRONLY | os.O_CREAT | os.O_EXCL | os.O_NOFOLLOW,
                0o600,
                dir_fd=dir_fd,
            )
            break
        except FileExistsError:
            continue
    else:
        raise FileExistsError("Could not create temporary file (100 attempts)")

    try:
        # os.write() may write fewer bytes than requested; loop until done.
        view = memoryview(data)
        written = 0
        while written < len(data):
            written += os.write(tmp_fd, view[written:])
        if uid is not None and gid is not None:
            os.chown(tmp_fd, uid, gid)
        os.chmod(tmp_fd, permissions)
        os.fsync(tmp_fd)
        os.close(tmp_fd)
        tmp_fd = -1
        # Atomic rename entirely within the directory we hold open.
        os.rename(tmp_basename, basename, src_dir_fd=dir_fd, dst_dir_fd=dir_fd)
        tmp_basename = None  # rename succeeded; no cleanup needed
    finally:
        if tmp_fd >= 0:
            os.close(tmp_fd)
        if tmp_basename is not None:
            with suppress(FileNotFoundError):
                os.unlink(tmp_basename, dir_fd=dir_fd)
    return True
SAVE
CANCEL