aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/pip/_vendor/resolvelib/resolvers.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/pip/_vendor/resolvelib/resolvers.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/pip/_vendor/resolvelib/resolvers.py')
-rw-r--r--.venv/lib/python3.12/site-packages/pip/_vendor/resolvelib/resolvers.py547
1 files changed, 547 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/pip/_vendor/resolvelib/resolvers.py b/.venv/lib/python3.12/site-packages/pip/_vendor/resolvelib/resolvers.py
new file mode 100644
index 00000000..2c3d0e30
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/pip/_vendor/resolvelib/resolvers.py
@@ -0,0 +1,547 @@
+import collections
+import itertools
+import operator
+
+from .providers import AbstractResolver
+from .structs import DirectedGraph, IteratorMapping, build_iter_view
+
+RequirementInformation = collections.namedtuple(
+ "RequirementInformation", ["requirement", "parent"]
+)
+
+
+class ResolverException(Exception):
+ """A base class for all exceptions raised by this module.
+
+ Exceptions derived by this class should all be handled in this module. Any
+ bubbling pass the resolver should be treated as a bug.
+ """
+
+
+class RequirementsConflicted(ResolverException):
+ def __init__(self, criterion):
+ super(RequirementsConflicted, self).__init__(criterion)
+ self.criterion = criterion
+
+ def __str__(self):
+ return "Requirements conflict: {}".format(
+ ", ".join(repr(r) for r in self.criterion.iter_requirement()),
+ )
+
+
+class InconsistentCandidate(ResolverException):
+ def __init__(self, candidate, criterion):
+ super(InconsistentCandidate, self).__init__(candidate, criterion)
+ self.candidate = candidate
+ self.criterion = criterion
+
+ def __str__(self):
+ return "Provided candidate {!r} does not satisfy {}".format(
+ self.candidate,
+ ", ".join(repr(r) for r in self.criterion.iter_requirement()),
+ )
+
+
+class Criterion(object):
+ """Representation of possible resolution results of a package.
+
+ This holds three attributes:
+
+ * `information` is a collection of `RequirementInformation` pairs.
+ Each pair is a requirement contributing to this criterion, and the
+ candidate that provides the requirement.
+ * `incompatibilities` is a collection of all known not-to-work candidates
+ to exclude from consideration.
+ * `candidates` is a collection containing all possible candidates deducted
+ from the union of contributing requirements and known incompatibilities.
+ It should never be empty, except when the criterion is an attribute of a
+ raised `RequirementsConflicted` (in which case it is always empty).
+
+ .. note::
+ This class is intended to be externally immutable. **Do not** mutate
+ any of its attribute containers.
+ """
+
+ def __init__(self, candidates, information, incompatibilities):
+ self.candidates = candidates
+ self.information = information
+ self.incompatibilities = incompatibilities
+
+ def __repr__(self):
+ requirements = ", ".join(
+ "({!r}, via={!r})".format(req, parent)
+ for req, parent in self.information
+ )
+ return "Criterion({})".format(requirements)
+
+ def iter_requirement(self):
+ return (i.requirement for i in self.information)
+
+ def iter_parent(self):
+ return (i.parent for i in self.information)
+
+
+class ResolutionError(ResolverException):
+ pass
+
+
+class ResolutionImpossible(ResolutionError):
+ def __init__(self, causes):
+ super(ResolutionImpossible, self).__init__(causes)
+ # causes is a list of RequirementInformation objects
+ self.causes = causes
+
+
+class ResolutionTooDeep(ResolutionError):
+ def __init__(self, round_count):
+ super(ResolutionTooDeep, self).__init__(round_count)
+ self.round_count = round_count
+
+
+# Resolution state in a round.
+State = collections.namedtuple("State", "mapping criteria backtrack_causes")
+
+
+class Resolution(object):
+ """Stateful resolution object.
+
+ This is designed as a one-off object that holds information to kick start
+ the resolution process, and holds the results afterwards.
+ """
+
+ def __init__(self, provider, reporter):
+ self._p = provider
+ self._r = reporter
+ self._states = []
+
+ @property
+ def state(self):
+ try:
+ return self._states[-1]
+ except IndexError:
+ raise AttributeError("state")
+
+ def _push_new_state(self):
+ """Push a new state into history.
+
+ This new state will be used to hold resolution results of the next
+ coming round.
+ """
+ base = self._states[-1]
+ state = State(
+ mapping=base.mapping.copy(),
+ criteria=base.criteria.copy(),
+ backtrack_causes=base.backtrack_causes[:],
+ )
+ self._states.append(state)
+
+ def _add_to_criteria(self, criteria, requirement, parent):
+ self._r.adding_requirement(requirement=requirement, parent=parent)
+
+ identifier = self._p.identify(requirement_or_candidate=requirement)
+ criterion = criteria.get(identifier)
+ if criterion:
+ incompatibilities = list(criterion.incompatibilities)
+ else:
+ incompatibilities = []
+
+ matches = self._p.find_matches(
+ identifier=identifier,
+ requirements=IteratorMapping(
+ criteria,
+ operator.methodcaller("iter_requirement"),
+ {identifier: [requirement]},
+ ),
+ incompatibilities=IteratorMapping(
+ criteria,
+ operator.attrgetter("incompatibilities"),
+ {identifier: incompatibilities},
+ ),
+ )
+
+ if criterion:
+ information = list(criterion.information)
+ information.append(RequirementInformation(requirement, parent))
+ else:
+ information = [RequirementInformation(requirement, parent)]
+
+ criterion = Criterion(
+ candidates=build_iter_view(matches),
+ information=information,
+ incompatibilities=incompatibilities,
+ )
+ if not criterion.candidates:
+ raise RequirementsConflicted(criterion)
+ criteria[identifier] = criterion
+
+ def _remove_information_from_criteria(self, criteria, parents):
+ """Remove information from parents of criteria.
+
+ Concretely, removes all values from each criterion's ``information``
+ field that have one of ``parents`` as provider of the requirement.
+
+ :param criteria: The criteria to update.
+ :param parents: Identifiers for which to remove information from all criteria.
+ """
+ if not parents:
+ return
+ for key, criterion in criteria.items():
+ criteria[key] = Criterion(
+ criterion.candidates,
+ [
+ information
+ for information in criterion.information
+ if (
+ information.parent is None
+ or self._p.identify(information.parent) not in parents
+ )
+ ],
+ criterion.incompatibilities,
+ )
+
+ def _get_preference(self, name):
+ return self._p.get_preference(
+ identifier=name,
+ resolutions=self.state.mapping,
+ candidates=IteratorMapping(
+ self.state.criteria,
+ operator.attrgetter("candidates"),
+ ),
+ information=IteratorMapping(
+ self.state.criteria,
+ operator.attrgetter("information"),
+ ),
+ backtrack_causes=self.state.backtrack_causes,
+ )
+
+ def _is_current_pin_satisfying(self, name, criterion):
+ try:
+ current_pin = self.state.mapping[name]
+ except KeyError:
+ return False
+ return all(
+ self._p.is_satisfied_by(requirement=r, candidate=current_pin)
+ for r in criterion.iter_requirement()
+ )
+
+ def _get_updated_criteria(self, candidate):
+ criteria = self.state.criteria.copy()
+ for requirement in self._p.get_dependencies(candidate=candidate):
+ self._add_to_criteria(criteria, requirement, parent=candidate)
+ return criteria
+
+ def _attempt_to_pin_criterion(self, name):
+ criterion = self.state.criteria[name]
+
+ causes = []
+ for candidate in criterion.candidates:
+ try:
+ criteria = self._get_updated_criteria(candidate)
+ except RequirementsConflicted as e:
+ self._r.rejecting_candidate(e.criterion, candidate)
+ causes.append(e.criterion)
+ continue
+
+ # Check the newly-pinned candidate actually works. This should
+ # always pass under normal circumstances, but in the case of a
+ # faulty provider, we will raise an error to notify the implementer
+ # to fix find_matches() and/or is_satisfied_by().
+ satisfied = all(
+ self._p.is_satisfied_by(requirement=r, candidate=candidate)
+ for r in criterion.iter_requirement()
+ )
+ if not satisfied:
+ raise InconsistentCandidate(candidate, criterion)
+
+ self._r.pinning(candidate=candidate)
+ self.state.criteria.update(criteria)
+
+ # Put newly-pinned candidate at the end. This is essential because
+ # backtracking looks at this mapping to get the last pin.
+ self.state.mapping.pop(name, None)
+ self.state.mapping[name] = candidate
+
+ return []
+
+ # All candidates tried, nothing works. This criterion is a dead
+ # end, signal for backtracking.
+ return causes
+
+ def _backjump(self, causes):
+ """Perform backjumping.
+
+ When we enter here, the stack is like this::
+
+ [ state Z ]
+ [ state Y ]
+ [ state X ]
+ .... earlier states are irrelevant.
+
+ 1. No pins worked for Z, so it does not have a pin.
+ 2. We want to reset state Y to unpinned, and pin another candidate.
+ 3. State X holds what state Y was before the pin, but does not
+ have the incompatibility information gathered in state Y.
+
+ Each iteration of the loop will:
+
+ 1. Identify Z. The incompatibility is not always caused by the latest
+ state. For example, given three requirements A, B and C, with
+ dependencies A1, B1 and C1, where A1 and B1 are incompatible: the
+ last state might be related to C, so we want to discard the
+ previous state.
+ 2. Discard Z.
+ 3. Discard Y but remember its incompatibility information gathered
+ previously, and the failure we're dealing with right now.
+ 4. Push a new state Y' based on X, and apply the incompatibility
+ information from Y to Y'.
+ 5a. If this causes Y' to conflict, we need to backtrack again. Make Y'
+ the new Z and go back to step 2.
+ 5b. If the incompatibilities apply cleanly, end backtracking.
+ """
+ incompatible_reqs = itertools.chain(
+ (c.parent for c in causes if c.parent is not None),
+ (c.requirement for c in causes),
+ )
+ incompatible_deps = {self._p.identify(r) for r in incompatible_reqs}
+ while len(self._states) >= 3:
+ # Remove the state that triggered backtracking.
+ del self._states[-1]
+
+ # Ensure to backtrack to a state that caused the incompatibility
+ incompatible_state = False
+ while not incompatible_state:
+ # Retrieve the last candidate pin and known incompatibilities.
+ try:
+ broken_state = self._states.pop()
+ name, candidate = broken_state.mapping.popitem()
+ except (IndexError, KeyError):
+ raise ResolutionImpossible(causes)
+ current_dependencies = {
+ self._p.identify(d)
+ for d in self._p.get_dependencies(candidate)
+ }
+ incompatible_state = not current_dependencies.isdisjoint(
+ incompatible_deps
+ )
+
+ incompatibilities_from_broken = [
+ (k, list(v.incompatibilities))
+ for k, v in broken_state.criteria.items()
+ ]
+
+ # Also mark the newly known incompatibility.
+ incompatibilities_from_broken.append((name, [candidate]))
+
+ # Create a new state from the last known-to-work one, and apply
+ # the previously gathered incompatibility information.
+ def _patch_criteria():
+ for k, incompatibilities in incompatibilities_from_broken:
+ if not incompatibilities:
+ continue
+ try:
+ criterion = self.state.criteria[k]
+ except KeyError:
+ continue
+ matches = self._p.find_matches(
+ identifier=k,
+ requirements=IteratorMapping(
+ self.state.criteria,
+ operator.methodcaller("iter_requirement"),
+ ),
+ incompatibilities=IteratorMapping(
+ self.state.criteria,
+ operator.attrgetter("incompatibilities"),
+ {k: incompatibilities},
+ ),
+ )
+ candidates = build_iter_view(matches)
+ if not candidates:
+ return False
+ incompatibilities.extend(criterion.incompatibilities)
+ self.state.criteria[k] = Criterion(
+ candidates=candidates,
+ information=list(criterion.information),
+ incompatibilities=incompatibilities,
+ )
+ return True
+
+ self._push_new_state()
+ success = _patch_criteria()
+
+ # It works! Let's work on this new state.
+ if success:
+ return True
+
+ # State does not work after applying known incompatibilities.
+ # Try the still previous state.
+
+ # No way to backtrack anymore.
+ return False
+
+ def resolve(self, requirements, max_rounds):
+ if self._states:
+ raise RuntimeError("already resolved")
+
+ self._r.starting()
+
+ # Initialize the root state.
+ self._states = [
+ State(
+ mapping=collections.OrderedDict(),
+ criteria={},
+ backtrack_causes=[],
+ )
+ ]
+ for r in requirements:
+ try:
+ self._add_to_criteria(self.state.criteria, r, parent=None)
+ except RequirementsConflicted as e:
+ raise ResolutionImpossible(e.criterion.information)
+
+ # The root state is saved as a sentinel so the first ever pin can have
+ # something to backtrack to if it fails. The root state is basically
+ # pinning the virtual "root" package in the graph.
+ self._push_new_state()
+
+ for round_index in range(max_rounds):
+ self._r.starting_round(index=round_index)
+
+ unsatisfied_names = [
+ key
+ for key, criterion in self.state.criteria.items()
+ if not self._is_current_pin_satisfying(key, criterion)
+ ]
+
+ # All criteria are accounted for. Nothing more to pin, we are done!
+ if not unsatisfied_names:
+ self._r.ending(state=self.state)
+ return self.state
+
+ # keep track of satisfied names to calculate diff after pinning
+ satisfied_names = set(self.state.criteria.keys()) - set(
+ unsatisfied_names
+ )
+
+ # Choose the most preferred unpinned criterion to try.
+ name = min(unsatisfied_names, key=self._get_preference)
+ failure_causes = self._attempt_to_pin_criterion(name)
+
+ if failure_causes:
+ causes = [i for c in failure_causes for i in c.information]
+ # Backjump if pinning fails. The backjump process puts us in
+ # an unpinned state, so we can work on it in the next round.
+ self._r.resolving_conflicts(causes=causes)
+ success = self._backjump(causes)
+ self.state.backtrack_causes[:] = causes
+
+ # Dead ends everywhere. Give up.
+ if not success:
+ raise ResolutionImpossible(self.state.backtrack_causes)
+ else:
+ # discard as information sources any invalidated names
+ # (unsatisfied names that were previously satisfied)
+ newly_unsatisfied_names = {
+ key
+ for key, criterion in self.state.criteria.items()
+ if key in satisfied_names
+ and not self._is_current_pin_satisfying(key, criterion)
+ }
+ self._remove_information_from_criteria(
+ self.state.criteria, newly_unsatisfied_names
+ )
+ # Pinning was successful. Push a new state to do another pin.
+ self._push_new_state()
+
+ self._r.ending_round(index=round_index, state=self.state)
+
+ raise ResolutionTooDeep(max_rounds)
+
+
+def _has_route_to_root(criteria, key, all_keys, connected):
+ if key in connected:
+ return True
+ if key not in criteria:
+ return False
+ for p in criteria[key].iter_parent():
+ try:
+ pkey = all_keys[id(p)]
+ except KeyError:
+ continue
+ if pkey in connected:
+ connected.add(key)
+ return True
+ if _has_route_to_root(criteria, pkey, all_keys, connected):
+ connected.add(key)
+ return True
+ return False
+
+
+Result = collections.namedtuple("Result", "mapping graph criteria")
+
+
+def _build_result(state):
+ mapping = state.mapping
+ all_keys = {id(v): k for k, v in mapping.items()}
+ all_keys[id(None)] = None
+
+ graph = DirectedGraph()
+ graph.add(None) # Sentinel as root dependencies' parent.
+
+ connected = {None}
+ for key, criterion in state.criteria.items():
+ if not _has_route_to_root(state.criteria, key, all_keys, connected):
+ continue
+ if key not in graph:
+ graph.add(key)
+ for p in criterion.iter_parent():
+ try:
+ pkey = all_keys[id(p)]
+ except KeyError:
+ continue
+ if pkey not in graph:
+ graph.add(pkey)
+ graph.connect(pkey, key)
+
+ return Result(
+ mapping={k: v for k, v in mapping.items() if k in connected},
+ graph=graph,
+ criteria=state.criteria,
+ )
+
+
+class Resolver(AbstractResolver):
+ """The thing that performs the actual resolution work."""
+
+ base_exception = ResolverException
+
+ def resolve(self, requirements, max_rounds=100):
+ """Take a collection of constraints, spit out the resolution result.
+
+ The return value is a representation to the final resolution result. It
+ is a tuple subclass with three public members:
+
+ * `mapping`: A dict of resolved candidates. Each key is an identifier
+ of a requirement (as returned by the provider's `identify` method),
+ and the value is the resolved candidate.
+ * `graph`: A `DirectedGraph` instance representing the dependency tree.
+ The vertices are keys of `mapping`, and each edge represents *why*
+ a particular package is included. A special vertex `None` is
+ included to represent parents of user-supplied requirements.
+ * `criteria`: A dict of "criteria" that hold detailed information on
+ how edges in the graph are derived. Each key is an identifier of a
+ requirement, and the value is a `Criterion` instance.
+
+ The following exceptions may be raised if a resolution cannot be found:
+
+ * `ResolutionImpossible`: A resolution cannot be found for the given
+ combination of requirements. The `causes` attribute of the
+ exception is a list of (requirement, parent), giving the
+ requirements that could not be satisfied.
+ * `ResolutionTooDeep`: The dependency tree is too deeply nested and
+ the resolver gave up. This is usually caused by a circular
+ dependency, but you can try to resolve this by increasing the
+ `max_rounds` argument.
+ """
+ resolution = Resolution(self.provider, self.reporter)
+ state = resolution.resolve(requirements, max_rounds=max_rounds)
+ return _build_result(state)