about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/pip/_vendor/resolvelib/resolvers.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/pip/_vendor/resolvelib/resolvers.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/pip/_vendor/resolvelib/resolvers.py')
-rw-r--r--.venv/lib/python3.12/site-packages/pip/_vendor/resolvelib/resolvers.py547
1 files changed, 547 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/pip/_vendor/resolvelib/resolvers.py b/.venv/lib/python3.12/site-packages/pip/_vendor/resolvelib/resolvers.py
new file mode 100644
index 00000000..2c3d0e30
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/pip/_vendor/resolvelib/resolvers.py
@@ -0,0 +1,547 @@
+import collections
+import itertools
+import operator
+
+from .providers import AbstractResolver
+from .structs import DirectedGraph, IteratorMapping, build_iter_view
+
+RequirementInformation = collections.namedtuple(
+    "RequirementInformation", ["requirement", "parent"]
+)
+
+
+class ResolverException(Exception):
+    """A base class for all exceptions raised by this module.
+
+    Exceptions derived by this class should all be handled in this module. Any
+    bubbling pass the resolver should be treated as a bug.
+    """
+
+
+class RequirementsConflicted(ResolverException):
+    def __init__(self, criterion):
+        super(RequirementsConflicted, self).__init__(criterion)
+        self.criterion = criterion
+
+    def __str__(self):
+        return "Requirements conflict: {}".format(
+            ", ".join(repr(r) for r in self.criterion.iter_requirement()),
+        )
+
+
+class InconsistentCandidate(ResolverException):
+    def __init__(self, candidate, criterion):
+        super(InconsistentCandidate, self).__init__(candidate, criterion)
+        self.candidate = candidate
+        self.criterion = criterion
+
+    def __str__(self):
+        return "Provided candidate {!r} does not satisfy {}".format(
+            self.candidate,
+            ", ".join(repr(r) for r in self.criterion.iter_requirement()),
+        )
+
+
+class Criterion(object):
+    """Representation of possible resolution results of a package.
+
+    This holds three attributes:
+
+    * `information` is a collection of `RequirementInformation` pairs.
+      Each pair is a requirement contributing to this criterion, and the
+      candidate that provides the requirement.
+    * `incompatibilities` is a collection of all known not-to-work candidates
+      to exclude from consideration.
+    * `candidates` is a collection containing all possible candidates deducted
+      from the union of contributing requirements and known incompatibilities.
+      It should never be empty, except when the criterion is an attribute of a
+      raised `RequirementsConflicted` (in which case it is always empty).
+
+    .. note::
+        This class is intended to be externally immutable. **Do not** mutate
+        any of its attribute containers.
+    """
+
+    def __init__(self, candidates, information, incompatibilities):
+        self.candidates = candidates
+        self.information = information
+        self.incompatibilities = incompatibilities
+
+    def __repr__(self):
+        requirements = ", ".join(
+            "({!r}, via={!r})".format(req, parent)
+            for req, parent in self.information
+        )
+        return "Criterion({})".format(requirements)
+
+    def iter_requirement(self):
+        return (i.requirement for i in self.information)
+
+    def iter_parent(self):
+        return (i.parent for i in self.information)
+
+
+class ResolutionError(ResolverException):
+    pass
+
+
+class ResolutionImpossible(ResolutionError):
+    def __init__(self, causes):
+        super(ResolutionImpossible, self).__init__(causes)
+        # causes is a list of RequirementInformation objects
+        self.causes = causes
+
+
+class ResolutionTooDeep(ResolutionError):
+    def __init__(self, round_count):
+        super(ResolutionTooDeep, self).__init__(round_count)
+        self.round_count = round_count
+
+
+# Resolution state in a round.
+State = collections.namedtuple("State", "mapping criteria backtrack_causes")
+
+
+class Resolution(object):
+    """Stateful resolution object.
+
+    This is designed as a one-off object that holds information to kick start
+    the resolution process, and holds the results afterwards.
+    """
+
+    def __init__(self, provider, reporter):
+        self._p = provider
+        self._r = reporter
+        self._states = []
+
+    @property
+    def state(self):
+        try:
+            return self._states[-1]
+        except IndexError:
+            raise AttributeError("state")
+
+    def _push_new_state(self):
+        """Push a new state into history.
+
+        This new state will be used to hold resolution results of the next
+        coming round.
+        """
+        base = self._states[-1]
+        state = State(
+            mapping=base.mapping.copy(),
+            criteria=base.criteria.copy(),
+            backtrack_causes=base.backtrack_causes[:],
+        )
+        self._states.append(state)
+
+    def _add_to_criteria(self, criteria, requirement, parent):
+        self._r.adding_requirement(requirement=requirement, parent=parent)
+
+        identifier = self._p.identify(requirement_or_candidate=requirement)
+        criterion = criteria.get(identifier)
+        if criterion:
+            incompatibilities = list(criterion.incompatibilities)
+        else:
+            incompatibilities = []
+
+        matches = self._p.find_matches(
+            identifier=identifier,
+            requirements=IteratorMapping(
+                criteria,
+                operator.methodcaller("iter_requirement"),
+                {identifier: [requirement]},
+            ),
+            incompatibilities=IteratorMapping(
+                criteria,
+                operator.attrgetter("incompatibilities"),
+                {identifier: incompatibilities},
+            ),
+        )
+
+        if criterion:
+            information = list(criterion.information)
+            information.append(RequirementInformation(requirement, parent))
+        else:
+            information = [RequirementInformation(requirement, parent)]
+
+        criterion = Criterion(
+            candidates=build_iter_view(matches),
+            information=information,
+            incompatibilities=incompatibilities,
+        )
+        if not criterion.candidates:
+            raise RequirementsConflicted(criterion)
+        criteria[identifier] = criterion
+
+    def _remove_information_from_criteria(self, criteria, parents):
+        """Remove information from parents of criteria.
+
+        Concretely, removes all values from each criterion's ``information``
+        field that have one of ``parents`` as provider of the requirement.
+
+        :param criteria: The criteria to update.
+        :param parents: Identifiers for which to remove information from all criteria.
+        """
+        if not parents:
+            return
+        for key, criterion in criteria.items():
+            criteria[key] = Criterion(
+                criterion.candidates,
+                [
+                    information
+                    for information in criterion.information
+                    if (
+                        information.parent is None
+                        or self._p.identify(information.parent) not in parents
+                    )
+                ],
+                criterion.incompatibilities,
+            )
+
+    def _get_preference(self, name):
+        return self._p.get_preference(
+            identifier=name,
+            resolutions=self.state.mapping,
+            candidates=IteratorMapping(
+                self.state.criteria,
+                operator.attrgetter("candidates"),
+            ),
+            information=IteratorMapping(
+                self.state.criteria,
+                operator.attrgetter("information"),
+            ),
+            backtrack_causes=self.state.backtrack_causes,
+        )
+
+    def _is_current_pin_satisfying(self, name, criterion):
+        try:
+            current_pin = self.state.mapping[name]
+        except KeyError:
+            return False
+        return all(
+            self._p.is_satisfied_by(requirement=r, candidate=current_pin)
+            for r in criterion.iter_requirement()
+        )
+
+    def _get_updated_criteria(self, candidate):
+        criteria = self.state.criteria.copy()
+        for requirement in self._p.get_dependencies(candidate=candidate):
+            self._add_to_criteria(criteria, requirement, parent=candidate)
+        return criteria
+
+    def _attempt_to_pin_criterion(self, name):
+        criterion = self.state.criteria[name]
+
+        causes = []
+        for candidate in criterion.candidates:
+            try:
+                criteria = self._get_updated_criteria(candidate)
+            except RequirementsConflicted as e:
+                self._r.rejecting_candidate(e.criterion, candidate)
+                causes.append(e.criterion)
+                continue
+
+            # Check the newly-pinned candidate actually works. This should
+            # always pass under normal circumstances, but in the case of a
+            # faulty provider, we will raise an error to notify the implementer
+            # to fix find_matches() and/or is_satisfied_by().
+            satisfied = all(
+                self._p.is_satisfied_by(requirement=r, candidate=candidate)
+                for r in criterion.iter_requirement()
+            )
+            if not satisfied:
+                raise InconsistentCandidate(candidate, criterion)
+
+            self._r.pinning(candidate=candidate)
+            self.state.criteria.update(criteria)
+
+            # Put newly-pinned candidate at the end. This is essential because
+            # backtracking looks at this mapping to get the last pin.
+            self.state.mapping.pop(name, None)
+            self.state.mapping[name] = candidate
+
+            return []
+
+        # All candidates tried, nothing works. This criterion is a dead
+        # end, signal for backtracking.
+        return causes
+
+    def _backjump(self, causes):
+        """Perform backjumping.
+
+        When we enter here, the stack is like this::
+
+            [ state Z ]
+            [ state Y ]
+            [ state X ]
+            .... earlier states are irrelevant.
+
+        1. No pins worked for Z, so it does not have a pin.
+        2. We want to reset state Y to unpinned, and pin another candidate.
+        3. State X holds what state Y was before the pin, but does not
+           have the incompatibility information gathered in state Y.
+
+        Each iteration of the loop will:
+
+        1.  Identify Z. The incompatibility is not always caused by the latest
+            state. For example, given three requirements A, B and C, with
+            dependencies A1, B1 and C1, where A1 and B1 are incompatible: the
+            last state might be related to C, so we want to discard the
+            previous state.
+        2.  Discard Z.
+        3.  Discard Y but remember its incompatibility information gathered
+            previously, and the failure we're dealing with right now.
+        4.  Push a new state Y' based on X, and apply the incompatibility
+            information from Y to Y'.
+        5a. If this causes Y' to conflict, we need to backtrack again. Make Y'
+            the new Z and go back to step 2.
+        5b. If the incompatibilities apply cleanly, end backtracking.
+        """
+        incompatible_reqs = itertools.chain(
+            (c.parent for c in causes if c.parent is not None),
+            (c.requirement for c in causes),
+        )
+        incompatible_deps = {self._p.identify(r) for r in incompatible_reqs}
+        while len(self._states) >= 3:
+            # Remove the state that triggered backtracking.
+            del self._states[-1]
+
+            # Ensure to backtrack to a state that caused the incompatibility
+            incompatible_state = False
+            while not incompatible_state:
+                # Retrieve the last candidate pin and known incompatibilities.
+                try:
+                    broken_state = self._states.pop()
+                    name, candidate = broken_state.mapping.popitem()
+                except (IndexError, KeyError):
+                    raise ResolutionImpossible(causes)
+                current_dependencies = {
+                    self._p.identify(d)
+                    for d in self._p.get_dependencies(candidate)
+                }
+                incompatible_state = not current_dependencies.isdisjoint(
+                    incompatible_deps
+                )
+
+            incompatibilities_from_broken = [
+                (k, list(v.incompatibilities))
+                for k, v in broken_state.criteria.items()
+            ]
+
+            # Also mark the newly known incompatibility.
+            incompatibilities_from_broken.append((name, [candidate]))
+
+            # Create a new state from the last known-to-work one, and apply
+            # the previously gathered incompatibility information.
+            def _patch_criteria():
+                for k, incompatibilities in incompatibilities_from_broken:
+                    if not incompatibilities:
+                        continue
+                    try:
+                        criterion = self.state.criteria[k]
+                    except KeyError:
+                        continue
+                    matches = self._p.find_matches(
+                        identifier=k,
+                        requirements=IteratorMapping(
+                            self.state.criteria,
+                            operator.methodcaller("iter_requirement"),
+                        ),
+                        incompatibilities=IteratorMapping(
+                            self.state.criteria,
+                            operator.attrgetter("incompatibilities"),
+                            {k: incompatibilities},
+                        ),
+                    )
+                    candidates = build_iter_view(matches)
+                    if not candidates:
+                        return False
+                    incompatibilities.extend(criterion.incompatibilities)
+                    self.state.criteria[k] = Criterion(
+                        candidates=candidates,
+                        information=list(criterion.information),
+                        incompatibilities=incompatibilities,
+                    )
+                return True
+
+            self._push_new_state()
+            success = _patch_criteria()
+
+            # It works! Let's work on this new state.
+            if success:
+                return True
+
+            # State does not work after applying known incompatibilities.
+            # Try the still previous state.
+
+        # No way to backtrack anymore.
+        return False
+
+    def resolve(self, requirements, max_rounds):
+        if self._states:
+            raise RuntimeError("already resolved")
+
+        self._r.starting()
+
+        # Initialize the root state.
+        self._states = [
+            State(
+                mapping=collections.OrderedDict(),
+                criteria={},
+                backtrack_causes=[],
+            )
+        ]
+        for r in requirements:
+            try:
+                self._add_to_criteria(self.state.criteria, r, parent=None)
+            except RequirementsConflicted as e:
+                raise ResolutionImpossible(e.criterion.information)
+
+        # The root state is saved as a sentinel so the first ever pin can have
+        # something to backtrack to if it fails. The root state is basically
+        # pinning the virtual "root" package in the graph.
+        self._push_new_state()
+
+        for round_index in range(max_rounds):
+            self._r.starting_round(index=round_index)
+
+            unsatisfied_names = [
+                key
+                for key, criterion in self.state.criteria.items()
+                if not self._is_current_pin_satisfying(key, criterion)
+            ]
+
+            # All criteria are accounted for. Nothing more to pin, we are done!
+            if not unsatisfied_names:
+                self._r.ending(state=self.state)
+                return self.state
+
+            # keep track of satisfied names to calculate diff after pinning
+            satisfied_names = set(self.state.criteria.keys()) - set(
+                unsatisfied_names
+            )
+
+            # Choose the most preferred unpinned criterion to try.
+            name = min(unsatisfied_names, key=self._get_preference)
+            failure_causes = self._attempt_to_pin_criterion(name)
+
+            if failure_causes:
+                causes = [i for c in failure_causes for i in c.information]
+                # Backjump if pinning fails. The backjump process puts us in
+                # an unpinned state, so we can work on it in the next round.
+                self._r.resolving_conflicts(causes=causes)
+                success = self._backjump(causes)
+                self.state.backtrack_causes[:] = causes
+
+                # Dead ends everywhere. Give up.
+                if not success:
+                    raise ResolutionImpossible(self.state.backtrack_causes)
+            else:
+                # discard as information sources any invalidated names
+                # (unsatisfied names that were previously satisfied)
+                newly_unsatisfied_names = {
+                    key
+                    for key, criterion in self.state.criteria.items()
+                    if key in satisfied_names
+                    and not self._is_current_pin_satisfying(key, criterion)
+                }
+                self._remove_information_from_criteria(
+                    self.state.criteria, newly_unsatisfied_names
+                )
+                # Pinning was successful. Push a new state to do another pin.
+                self._push_new_state()
+
+            self._r.ending_round(index=round_index, state=self.state)
+
+        raise ResolutionTooDeep(max_rounds)
+
+
+def _has_route_to_root(criteria, key, all_keys, connected):
+    if key in connected:
+        return True
+    if key not in criteria:
+        return False
+    for p in criteria[key].iter_parent():
+        try:
+            pkey = all_keys[id(p)]
+        except KeyError:
+            continue
+        if pkey in connected:
+            connected.add(key)
+            return True
+        if _has_route_to_root(criteria, pkey, all_keys, connected):
+            connected.add(key)
+            return True
+    return False
+
+
+Result = collections.namedtuple("Result", "mapping graph criteria")
+
+
+def _build_result(state):
+    mapping = state.mapping
+    all_keys = {id(v): k for k, v in mapping.items()}
+    all_keys[id(None)] = None
+
+    graph = DirectedGraph()
+    graph.add(None)  # Sentinel as root dependencies' parent.
+
+    connected = {None}
+    for key, criterion in state.criteria.items():
+        if not _has_route_to_root(state.criteria, key, all_keys, connected):
+            continue
+        if key not in graph:
+            graph.add(key)
+        for p in criterion.iter_parent():
+            try:
+                pkey = all_keys[id(p)]
+            except KeyError:
+                continue
+            if pkey not in graph:
+                graph.add(pkey)
+            graph.connect(pkey, key)
+
+    return Result(
+        mapping={k: v for k, v in mapping.items() if k in connected},
+        graph=graph,
+        criteria=state.criteria,
+    )
+
+
+class Resolver(AbstractResolver):
+    """The thing that performs the actual resolution work."""
+
+    base_exception = ResolverException
+
+    def resolve(self, requirements, max_rounds=100):
+        """Take a collection of constraints, spit out the resolution result.
+
+        The return value is a representation to the final resolution result. It
+        is a tuple subclass with three public members:
+
+        * `mapping`: A dict of resolved candidates. Each key is an identifier
+            of a requirement (as returned by the provider's `identify` method),
+            and the value is the resolved candidate.
+        * `graph`: A `DirectedGraph` instance representing the dependency tree.
+            The vertices are keys of `mapping`, and each edge represents *why*
+            a particular package is included. A special vertex `None` is
+            included to represent parents of user-supplied requirements.
+        * `criteria`: A dict of "criteria" that hold detailed information on
+            how edges in the graph are derived. Each key is an identifier of a
+            requirement, and the value is a `Criterion` instance.
+
+        The following exceptions may be raised if a resolution cannot be found:
+
+        * `ResolutionImpossible`: A resolution cannot be found for the given
+            combination of requirements. The `causes` attribute of the
+            exception is a list of (requirement, parent), giving the
+            requirements that could not be satisfied.
+        * `ResolutionTooDeep`: The dependency tree is too deeply nested and
+            the resolver gave up. This is usually caused by a circular
+            dependency, but you can try to resolve this by increasing the
+            `max_rounds` argument.
+        """
+        resolution = Resolution(self.provider, self.reporter)
+        state = resolution.resolve(requirements, max_rounds=max_rounds)
+        return _build_result(state)