1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
|
import contextlib
import hashlib
import logging
import os
from types import TracebackType
from typing import Dict, Generator, Optional, Type, Union
from pip._internal.req.req_install import InstallRequirement
from pip._internal.utils.temp_dir import TempDirectory
logger = logging.getLogger(__name__)
@contextlib.contextmanager
def update_env_context_manager(**changes: str) -> Generator[None, None, None]:
target = os.environ
# Save values from the target and change them.
non_existent_marker = object()
saved_values: Dict[str, Union[object, str]] = {}
for name, new_value in changes.items():
try:
saved_values[name] = target[name]
except KeyError:
saved_values[name] = non_existent_marker
target[name] = new_value
try:
yield
finally:
# Restore original values in the target.
for name, original_value in saved_values.items():
if original_value is non_existent_marker:
del target[name]
else:
assert isinstance(original_value, str) # for mypy
target[name] = original_value
@contextlib.contextmanager
def get_build_tracker() -> Generator["BuildTracker", None, None]:
root = os.environ.get("PIP_BUILD_TRACKER")
with contextlib.ExitStack() as ctx:
if root is None:
root = ctx.enter_context(TempDirectory(kind="build-tracker")).path
ctx.enter_context(update_env_context_manager(PIP_BUILD_TRACKER=root))
logger.debug("Initialized build tracking at %s", root)
with BuildTracker(root) as tracker:
yield tracker
class TrackerId(str):
"""Uniquely identifying string provided to the build tracker."""
class BuildTracker:
"""Ensure that an sdist cannot request itself as a setup requirement.
When an sdist is prepared, it identifies its setup requirements in the
context of ``BuildTracker.track()``. If a requirement shows up recursively, this
raises an exception.
This stops fork bombs embedded in malicious packages."""
def __init__(self, root: str) -> None:
self._root = root
self._entries: Dict[TrackerId, InstallRequirement] = {}
logger.debug("Created build tracker: %s", self._root)
def __enter__(self) -> "BuildTracker":
logger.debug("Entered build tracker: %s", self._root)
return self
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
self.cleanup()
def _entry_path(self, key: TrackerId) -> str:
hashed = hashlib.sha224(key.encode()).hexdigest()
return os.path.join(self._root, hashed)
def add(self, req: InstallRequirement, key: TrackerId) -> None:
"""Add an InstallRequirement to build tracking."""
# Get the file to write information about this requirement.
entry_path = self._entry_path(key)
# Try reading from the file. If it exists and can be read from, a build
# is already in progress, so a LookupError is raised.
try:
with open(entry_path) as fp:
contents = fp.read()
except FileNotFoundError:
pass
else:
message = f"{req.link} is already being built: {contents}"
raise LookupError(message)
# If we're here, req should really not be building already.
assert key not in self._entries
# Start tracking this requirement.
with open(entry_path, "w", encoding="utf-8") as fp:
fp.write(str(req))
self._entries[key] = req
logger.debug("Added %s to build tracker %r", req, self._root)
def remove(self, req: InstallRequirement, key: TrackerId) -> None:
"""Remove an InstallRequirement from build tracking."""
# Delete the created file and the corresponding entry.
os.unlink(self._entry_path(key))
del self._entries[key]
logger.debug("Removed %s from build tracker %r", req, self._root)
def cleanup(self) -> None:
for key, req in list(self._entries.items()):
self.remove(req, key)
logger.debug("Removed build tracker: %r", self._root)
@contextlib.contextmanager
def track(self, req: InstallRequirement, key: str) -> Generator[None, None, None]:
"""Ensure that `key` cannot install itself as a setup requirement.
:raises LookupError: If `key` was already provided in a parent invocation of
the context introduced by this method."""
tracker_id = TrackerId(key)
self.add(req, tracker_id)
yield
self.remove(req, tracker_id)
|