aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/pip/_internal/resolution/resolvelib/resolver.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/pip/_internal/resolution/resolvelib/resolver.py')
-rw-r--r--.venv/lib/python3.12/site-packages/pip/_internal/resolution/resolvelib/resolver.py317
1 files changed, 317 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/pip/_internal/resolution/resolvelib/resolver.py b/.venv/lib/python3.12/site-packages/pip/_internal/resolution/resolvelib/resolver.py
new file mode 100644
index 00000000..c12beef0
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/pip/_internal/resolution/resolvelib/resolver.py
@@ -0,0 +1,317 @@
+import contextlib
+import functools
+import logging
+import os
+from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, cast
+
+from pip._vendor.packaging.utils import canonicalize_name
+from pip._vendor.resolvelib import BaseReporter, ResolutionImpossible
+from pip._vendor.resolvelib import Resolver as RLResolver
+from pip._vendor.resolvelib.structs import DirectedGraph
+
+from pip._internal.cache import WheelCache
+from pip._internal.index.package_finder import PackageFinder
+from pip._internal.operations.prepare import RequirementPreparer
+from pip._internal.req.constructors import install_req_extend_extras
+from pip._internal.req.req_install import InstallRequirement
+from pip._internal.req.req_set import RequirementSet
+from pip._internal.resolution.base import BaseResolver, InstallRequirementProvider
+from pip._internal.resolution.resolvelib.provider import PipProvider
+from pip._internal.resolution.resolvelib.reporter import (
+ PipDebuggingReporter,
+ PipReporter,
+)
+from pip._internal.utils.packaging import get_requirement
+
+from .base import Candidate, Requirement
+from .factory import Factory
+
+if TYPE_CHECKING:
+ from pip._vendor.resolvelib.resolvers import Result as RLResult
+
+ Result = RLResult[Requirement, Candidate, str]
+
+
+logger = logging.getLogger(__name__)
+
+
+class Resolver(BaseResolver):
+ _allowed_strategies = {"eager", "only-if-needed", "to-satisfy-only"}
+
+ def __init__(
+ self,
+ preparer: RequirementPreparer,
+ finder: PackageFinder,
+ wheel_cache: Optional[WheelCache],
+ make_install_req: InstallRequirementProvider,
+ use_user_site: bool,
+ ignore_dependencies: bool,
+ ignore_installed: bool,
+ ignore_requires_python: bool,
+ force_reinstall: bool,
+ upgrade_strategy: str,
+ py_version_info: Optional[Tuple[int, ...]] = None,
+ ):
+ super().__init__()
+ assert upgrade_strategy in self._allowed_strategies
+
+ self.factory = Factory(
+ finder=finder,
+ preparer=preparer,
+ make_install_req=make_install_req,
+ wheel_cache=wheel_cache,
+ use_user_site=use_user_site,
+ force_reinstall=force_reinstall,
+ ignore_installed=ignore_installed,
+ ignore_requires_python=ignore_requires_python,
+ py_version_info=py_version_info,
+ )
+ self.ignore_dependencies = ignore_dependencies
+ self.upgrade_strategy = upgrade_strategy
+ self._result: Optional[Result] = None
+
+ def resolve(
+ self, root_reqs: List[InstallRequirement], check_supported_wheels: bool
+ ) -> RequirementSet:
+ collected = self.factory.collect_root_requirements(root_reqs)
+ provider = PipProvider(
+ factory=self.factory,
+ constraints=collected.constraints,
+ ignore_dependencies=self.ignore_dependencies,
+ upgrade_strategy=self.upgrade_strategy,
+ user_requested=collected.user_requested,
+ )
+ if "PIP_RESOLVER_DEBUG" in os.environ:
+ reporter: BaseReporter = PipDebuggingReporter()
+ else:
+ reporter = PipReporter()
+ resolver: RLResolver[Requirement, Candidate, str] = RLResolver(
+ provider,
+ reporter,
+ )
+
+ try:
+ limit_how_complex_resolution_can_be = 200000
+ result = self._result = resolver.resolve(
+ collected.requirements, max_rounds=limit_how_complex_resolution_can_be
+ )
+
+ except ResolutionImpossible as e:
+ error = self.factory.get_installation_error(
+ cast("ResolutionImpossible[Requirement, Candidate]", e),
+ collected.constraints,
+ )
+ raise error from e
+
+ req_set = RequirementSet(check_supported_wheels=check_supported_wheels)
+ # process candidates with extras last to ensure their base equivalent is
+ # already in the req_set if appropriate.
+ # Python's sort is stable so using a binary key function keeps relative order
+ # within both subsets.
+ for candidate in sorted(
+ result.mapping.values(), key=lambda c: c.name != c.project_name
+ ):
+ ireq = candidate.get_install_requirement()
+ if ireq is None:
+ if candidate.name != candidate.project_name:
+ # extend existing req's extras
+ with contextlib.suppress(KeyError):
+ req = req_set.get_requirement(candidate.project_name)
+ req_set.add_named_requirement(
+ install_req_extend_extras(
+ req, get_requirement(candidate.name).extras
+ )
+ )
+ continue
+
+ # Check if there is already an installation under the same name,
+ # and set a flag for later stages to uninstall it, if needed.
+ installed_dist = self.factory.get_dist_to_uninstall(candidate)
+ if installed_dist is None:
+ # There is no existing installation -- nothing to uninstall.
+ ireq.should_reinstall = False
+ elif self.factory.force_reinstall:
+ # The --force-reinstall flag is set -- reinstall.
+ ireq.should_reinstall = True
+ elif installed_dist.version != candidate.version:
+ # The installation is different in version -- reinstall.
+ ireq.should_reinstall = True
+ elif candidate.is_editable or installed_dist.editable:
+ # The incoming distribution is editable, or different in
+ # editable-ness to installation -- reinstall.
+ ireq.should_reinstall = True
+ elif candidate.source_link and candidate.source_link.is_file:
+ # The incoming distribution is under file://
+ if candidate.source_link.is_wheel:
+ # is a local wheel -- do nothing.
+ logger.info(
+ "%s is already installed with the same version as the "
+ "provided wheel. Use --force-reinstall to force an "
+ "installation of the wheel.",
+ ireq.name,
+ )
+ continue
+
+ # is a local sdist or path -- reinstall
+ ireq.should_reinstall = True
+ else:
+ continue
+
+ link = candidate.source_link
+ if link and link.is_yanked:
+ # The reason can contain non-ASCII characters, Unicode
+ # is required for Python 2.
+ msg = (
+ "The candidate selected for download or install is a "
+ "yanked version: {name!r} candidate (version {version} "
+ "at {link})\nReason for being yanked: {reason}"
+ ).format(
+ name=candidate.name,
+ version=candidate.version,
+ link=link,
+ reason=link.yanked_reason or "<none given>",
+ )
+ logger.warning(msg)
+
+ req_set.add_named_requirement(ireq)
+
+ reqs = req_set.all_requirements
+ self.factory.preparer.prepare_linked_requirements_more(reqs)
+ for req in reqs:
+ req.prepared = True
+ req.needs_more_preparation = False
+ return req_set
+
+ def get_installation_order(
+ self, req_set: RequirementSet
+ ) -> List[InstallRequirement]:
+ """Get order for installation of requirements in RequirementSet.
+
+ The returned list contains a requirement before another that depends on
+ it. This helps ensure that the environment is kept consistent as they
+ get installed one-by-one.
+
+ The current implementation creates a topological ordering of the
+ dependency graph, giving more weight to packages with less
+ or no dependencies, while breaking any cycles in the graph at
+ arbitrary points. We make no guarantees about where the cycle
+ would be broken, other than it *would* be broken.
+ """
+ assert self._result is not None, "must call resolve() first"
+
+ if not req_set.requirements:
+ # Nothing is left to install, so we do not need an order.
+ return []
+
+ graph = self._result.graph
+ weights = get_topological_weights(graph, set(req_set.requirements.keys()))
+
+ sorted_items = sorted(
+ req_set.requirements.items(),
+ key=functools.partial(_req_set_item_sorter, weights=weights),
+ reverse=True,
+ )
+ return [ireq for _, ireq in sorted_items]
+
+
+def get_topological_weights(
+ graph: "DirectedGraph[Optional[str]]", requirement_keys: Set[str]
+) -> Dict[Optional[str], int]:
+ """Assign weights to each node based on how "deep" they are.
+
+ This implementation may change at any point in the future without prior
+ notice.
+
+ We first simplify the dependency graph by pruning any leaves and giving them
+ the highest weight: a package without any dependencies should be installed
+ first. This is done again and again in the same way, giving ever less weight
+ to the newly found leaves. The loop stops when no leaves are left: all
+ remaining packages have at least one dependency left in the graph.
+
+ Then we continue with the remaining graph, by taking the length for the
+ longest path to any node from root, ignoring any paths that contain a single
+ node twice (i.e. cycles). This is done through a depth-first search through
+ the graph, while keeping track of the path to the node.
+
+ Cycles in the graph result would result in node being revisited while also
+ being on its own path. In this case, take no action. This helps ensure we
+ don't get stuck in a cycle.
+
+ When assigning weight, the longer path (i.e. larger length) is preferred.
+
+ We are only interested in the weights of packages that are in the
+ requirement_keys.
+ """
+ path: Set[Optional[str]] = set()
+ weights: Dict[Optional[str], int] = {}
+
+ def visit(node: Optional[str]) -> None:
+ if node in path:
+ # We hit a cycle, so we'll break it here.
+ return
+
+ # Time to visit the children!
+ path.add(node)
+ for child in graph.iter_children(node):
+ visit(child)
+ path.remove(node)
+
+ if node not in requirement_keys:
+ return
+
+ last_known_parent_count = weights.get(node, 0)
+ weights[node] = max(last_known_parent_count, len(path))
+
+ # Simplify the graph, pruning leaves that have no dependencies.
+ # This is needed for large graphs (say over 200 packages) because the
+ # `visit` function is exponentially slower then, taking minutes.
+ # See https://github.com/pypa/pip/issues/10557
+ # We will loop until we explicitly break the loop.
+ while True:
+ leaves = set()
+ for key in graph:
+ if key is None:
+ continue
+ for _child in graph.iter_children(key):
+ # This means we have at least one child
+ break
+ else:
+ # No child.
+ leaves.add(key)
+ if not leaves:
+ # We are done simplifying.
+ break
+ # Calculate the weight for the leaves.
+ weight = len(graph) - 1
+ for leaf in leaves:
+ if leaf not in requirement_keys:
+ continue
+ weights[leaf] = weight
+ # Remove the leaves from the graph, making it simpler.
+ for leaf in leaves:
+ graph.remove(leaf)
+
+ # Visit the remaining graph.
+ # `None` is guaranteed to be the root node by resolvelib.
+ visit(None)
+
+ # Sanity check: all requirement keys should be in the weights,
+ # and no other keys should be in the weights.
+ difference = set(weights.keys()).difference(requirement_keys)
+ assert not difference, difference
+
+ return weights
+
+
+def _req_set_item_sorter(
+ item: Tuple[str, InstallRequirement],
+ weights: Dict[Optional[str], int],
+) -> Tuple[int, str]:
+ """Key function used to sort install requirements for installation.
+
+ Based on the "weight" mapping calculated in ``get_installation_order()``.
+ The canonical package name is returned as the second member as a tie-
+ breaker to ensure the result is predictable, which is useful in tests.
+ """
+ name = canonicalize_name(item[0])
+ return weights[name], name