aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/pip/_internal/operations/build/build_tracker.py
blob: 0ed8dd235960bf406e26d5c0069b12c7f2d463e9 (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import contextlib
import hashlib
import logging
import os
from types import TracebackType
from typing import Dict, Generator, Optional, Type, Union

from pip._internal.req.req_install import InstallRequirement
from pip._internal.utils.temp_dir import TempDirectory

logger = logging.getLogger(__name__)


@contextlib.contextmanager
def update_env_context_manager(**changes: str) -> Generator[None, None, None]:
    target = os.environ

    # Save values from the target and change them.
    non_existent_marker = object()
    saved_values: Dict[str, Union[object, str]] = {}
    for name, new_value in changes.items():
        try:
            saved_values[name] = target[name]
        except KeyError:
            saved_values[name] = non_existent_marker
        target[name] = new_value

    try:
        yield
    finally:
        # Restore original values in the target.
        for name, original_value in saved_values.items():
            if original_value is non_existent_marker:
                del target[name]
            else:
                assert isinstance(original_value, str)  # for mypy
                target[name] = original_value


@contextlib.contextmanager
def get_build_tracker() -> Generator["BuildTracker", None, None]:
    root = os.environ.get("PIP_BUILD_TRACKER")
    with contextlib.ExitStack() as ctx:
        if root is None:
            root = ctx.enter_context(TempDirectory(kind="build-tracker")).path
            ctx.enter_context(update_env_context_manager(PIP_BUILD_TRACKER=root))
            logger.debug("Initialized build tracking at %s", root)

        with BuildTracker(root) as tracker:
            yield tracker


class TrackerId(str):
    """Uniquely identifying string provided to the build tracker."""


class BuildTracker:
    """Ensure that an sdist cannot request itself as a setup requirement.

    When an sdist is prepared, it identifies its setup requirements in the
    context of ``BuildTracker.track()``. If a requirement shows up recursively, this
    raises an exception.

    This stops fork bombs embedded in malicious packages."""

    def __init__(self, root: str) -> None:
        self._root = root
        self._entries: Dict[TrackerId, InstallRequirement] = {}
        logger.debug("Created build tracker: %s", self._root)

    def __enter__(self) -> "BuildTracker":
        logger.debug("Entered build tracker: %s", self._root)
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        self.cleanup()

    def _entry_path(self, key: TrackerId) -> str:
        hashed = hashlib.sha224(key.encode()).hexdigest()
        return os.path.join(self._root, hashed)

    def add(self, req: InstallRequirement, key: TrackerId) -> None:
        """Add an InstallRequirement to build tracking."""

        # Get the file to write information about this requirement.
        entry_path = self._entry_path(key)

        # Try reading from the file. If it exists and can be read from, a build
        # is already in progress, so a LookupError is raised.
        try:
            with open(entry_path) as fp:
                contents = fp.read()
        except FileNotFoundError:
            pass
        else:
            message = f"{req.link} is already being built: {contents}"
            raise LookupError(message)

        # If we're here, req should really not be building already.
        assert key not in self._entries

        # Start tracking this requirement.
        with open(entry_path, "w", encoding="utf-8") as fp:
            fp.write(str(req))
        self._entries[key] = req

        logger.debug("Added %s to build tracker %r", req, self._root)

    def remove(self, req: InstallRequirement, key: TrackerId) -> None:
        """Remove an InstallRequirement from build tracking."""

        # Delete the created file and the corresponding entry.
        os.unlink(self._entry_path(key))
        del self._entries[key]

        logger.debug("Removed %s from build tracker %r", req, self._root)

    def cleanup(self) -> None:
        for key, req in list(self._entries.items()):
            self.remove(req, key)

        logger.debug("Removed build tracker: %r", self._root)

    @contextlib.contextmanager
    def track(self, req: InstallRequirement, key: str) -> Generator[None, None, None]:
        """Ensure that `key` cannot install itself as a setup requirement.

        :raises LookupError: If `key` was already provided in a parent invocation of
                             the context introduced by this method."""
        tracker_id = TrackerId(key)
        self.add(req, tracker_id)
        yield
        self.remove(req, tracker_id)