diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/pip/_internal/operations/build/build_tracker.py | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/pip/_internal/operations/build/build_tracker.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/pip/_internal/operations/build/build_tracker.py | 138 |
1 files changed, 138 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/pip/_internal/operations/build/build_tracker.py b/.venv/lib/python3.12/site-packages/pip/_internal/operations/build/build_tracker.py new file mode 100644 index 00000000..0ed8dd23 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/pip/_internal/operations/build/build_tracker.py @@ -0,0 +1,138 @@ +import contextlib +import hashlib +import logging +import os +from types import TracebackType +from typing import Dict, Generator, Optional, Type, Union + +from pip._internal.req.req_install import InstallRequirement +from pip._internal.utils.temp_dir import TempDirectory + +logger = logging.getLogger(__name__) + + +@contextlib.contextmanager +def update_env_context_manager(**changes: str) -> Generator[None, None, None]: + target = os.environ + + # Save values from the target and change them. + non_existent_marker = object() + saved_values: Dict[str, Union[object, str]] = {} + for name, new_value in changes.items(): + try: + saved_values[name] = target[name] + except KeyError: + saved_values[name] = non_existent_marker + target[name] = new_value + + try: + yield + finally: + # Restore original values in the target. + for name, original_value in saved_values.items(): + if original_value is non_existent_marker: + del target[name] + else: + assert isinstance(original_value, str) # for mypy + target[name] = original_value + + +@contextlib.contextmanager +def get_build_tracker() -> Generator["BuildTracker", None, None]: + root = os.environ.get("PIP_BUILD_TRACKER") + with contextlib.ExitStack() as ctx: + if root is None: + root = ctx.enter_context(TempDirectory(kind="build-tracker")).path + ctx.enter_context(update_env_context_manager(PIP_BUILD_TRACKER=root)) + logger.debug("Initialized build tracking at %s", root) + + with BuildTracker(root) as tracker: + yield tracker + + +class TrackerId(str): + """Uniquely identifying string provided to the build tracker.""" + + +class BuildTracker: + """Ensure that an sdist cannot request itself as a setup requirement. + + When an sdist is prepared, it identifies its setup requirements in the + context of ``BuildTracker.track()``. If a requirement shows up recursively, this + raises an exception. + + This stops fork bombs embedded in malicious packages.""" + + def __init__(self, root: str) -> None: + self._root = root + self._entries: Dict[TrackerId, InstallRequirement] = {} + logger.debug("Created build tracker: %s", self._root) + + def __enter__(self) -> "BuildTracker": + logger.debug("Entered build tracker: %s", self._root) + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + self.cleanup() + + def _entry_path(self, key: TrackerId) -> str: + hashed = hashlib.sha224(key.encode()).hexdigest() + return os.path.join(self._root, hashed) + + def add(self, req: InstallRequirement, key: TrackerId) -> None: + """Add an InstallRequirement to build tracking.""" + + # Get the file to write information about this requirement. + entry_path = self._entry_path(key) + + # Try reading from the file. If it exists and can be read from, a build + # is already in progress, so a LookupError is raised. + try: + with open(entry_path) as fp: + contents = fp.read() + except FileNotFoundError: + pass + else: + message = f"{req.link} is already being built: {contents}" + raise LookupError(message) + + # If we're here, req should really not be building already. + assert key not in self._entries + + # Start tracking this requirement. + with open(entry_path, "w", encoding="utf-8") as fp: + fp.write(str(req)) + self._entries[key] = req + + logger.debug("Added %s to build tracker %r", req, self._root) + + def remove(self, req: InstallRequirement, key: TrackerId) -> None: + """Remove an InstallRequirement from build tracking.""" + + # Delete the created file and the corresponding entry. + os.unlink(self._entry_path(key)) + del self._entries[key] + + logger.debug("Removed %s from build tracker %r", req, self._root) + + def cleanup(self) -> None: + for key, req in list(self._entries.items()): + self.remove(req, key) + + logger.debug("Removed build tracker: %r", self._root) + + @contextlib.contextmanager + def track(self, req: InstallRequirement, key: str) -> Generator[None, None, None]: + """Ensure that `key` cannot install itself as a setup requirement. + + :raises LookupError: If `key` was already provided in a parent invocation of + the context introduced by this method.""" + tracker_id = TrackerId(key) + self.add(req, tracker_id) + yield + self.remove(req, tracker_id) |