about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/pip/_internal/index
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/pip/_internal/index
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/pip/_internal/index')
-rw-r--r--.venv/lib/python3.12/site-packages/pip/_internal/index/__init__.py2
-rw-r--r--.venv/lib/python3.12/site-packages/pip/_internal/index/collector.py494
-rw-r--r--.venv/lib/python3.12/site-packages/pip/_internal/index/package_finder.py1029
-rw-r--r--.venv/lib/python3.12/site-packages/pip/_internal/index/sources.py284
4 files changed, 1809 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/pip/_internal/index/__init__.py b/.venv/lib/python3.12/site-packages/pip/_internal/index/__init__.py
new file mode 100644
index 00000000..7a17b7b3
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/pip/_internal/index/__init__.py
@@ -0,0 +1,2 @@
+"""Index interaction code
+"""
diff --git a/.venv/lib/python3.12/site-packages/pip/_internal/index/collector.py b/.venv/lib/python3.12/site-packages/pip/_internal/index/collector.py
new file mode 100644
index 00000000..5f8fdee3
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/pip/_internal/index/collector.py
@@ -0,0 +1,494 @@
+"""
+The main purpose of this module is to expose LinkCollector.collect_sources().
+"""
+
+import collections
+import email.message
+import functools
+import itertools
+import json
+import logging
+import os
+import urllib.parse
+import urllib.request
+from dataclasses import dataclass
+from html.parser import HTMLParser
+from optparse import Values
+from typing import (
+    Callable,
+    Dict,
+    Iterable,
+    List,
+    MutableMapping,
+    NamedTuple,
+    Optional,
+    Protocol,
+    Sequence,
+    Tuple,
+    Union,
+)
+
+from pip._vendor import requests
+from pip._vendor.requests import Response
+from pip._vendor.requests.exceptions import RetryError, SSLError
+
+from pip._internal.exceptions import NetworkConnectionError
+from pip._internal.models.link import Link
+from pip._internal.models.search_scope import SearchScope
+from pip._internal.network.session import PipSession
+from pip._internal.network.utils import raise_for_status
+from pip._internal.utils.filetypes import is_archive_file
+from pip._internal.utils.misc import redact_auth_from_url
+from pip._internal.vcs import vcs
+
+from .sources import CandidatesFromPage, LinkSource, build_source
+
+logger = logging.getLogger(__name__)
+
+ResponseHeaders = MutableMapping[str, str]
+
+
+def _match_vcs_scheme(url: str) -> Optional[str]:
+    """Look for VCS schemes in the URL.
+
+    Returns the matched VCS scheme, or None if there's no match.
+    """
+    for scheme in vcs.schemes:
+        if url.lower().startswith(scheme) and url[len(scheme)] in "+:":
+            return scheme
+    return None
+
+
+class _NotAPIContent(Exception):
+    def __init__(self, content_type: str, request_desc: str) -> None:
+        super().__init__(content_type, request_desc)
+        self.content_type = content_type
+        self.request_desc = request_desc
+
+
+def _ensure_api_header(response: Response) -> None:
+    """
+    Check the Content-Type header to ensure the response contains a Simple
+    API Response.
+
+    Raises `_NotAPIContent` if the content type is not a valid content-type.
+    """
+    content_type = response.headers.get("Content-Type", "Unknown")
+
+    content_type_l = content_type.lower()
+    if content_type_l.startswith(
+        (
+            "text/html",
+            "application/vnd.pypi.simple.v1+html",
+            "application/vnd.pypi.simple.v1+json",
+        )
+    ):
+        return
+
+    raise _NotAPIContent(content_type, response.request.method)
+
+
+class _NotHTTP(Exception):
+    pass
+
+
+def _ensure_api_response(url: str, session: PipSession) -> None:
+    """
+    Send a HEAD request to the URL, and ensure the response contains a simple
+    API Response.
+
+    Raises `_NotHTTP` if the URL is not available for a HEAD request, or
+    `_NotAPIContent` if the content type is not a valid content type.
+    """
+    scheme, netloc, path, query, fragment = urllib.parse.urlsplit(url)
+    if scheme not in {"http", "https"}:
+        raise _NotHTTP()
+
+    resp = session.head(url, allow_redirects=True)
+    raise_for_status(resp)
+
+    _ensure_api_header(resp)
+
+
+def _get_simple_response(url: str, session: PipSession) -> Response:
+    """Access an Simple API response with GET, and return the response.
+
+    This consists of three parts:
+
+    1. If the URL looks suspiciously like an archive, send a HEAD first to
+       check the Content-Type is HTML or Simple API, to avoid downloading a
+       large file. Raise `_NotHTTP` if the content type cannot be determined, or
+       `_NotAPIContent` if it is not HTML or a Simple API.
+    2. Actually perform the request. Raise HTTP exceptions on network failures.
+    3. Check the Content-Type header to make sure we got a Simple API response,
+       and raise `_NotAPIContent` otherwise.
+    """
+    if is_archive_file(Link(url).filename):
+        _ensure_api_response(url, session=session)
+
+    logger.debug("Getting page %s", redact_auth_from_url(url))
+
+    resp = session.get(
+        url,
+        headers={
+            "Accept": ", ".join(
+                [
+                    "application/vnd.pypi.simple.v1+json",
+                    "application/vnd.pypi.simple.v1+html; q=0.1",
+                    "text/html; q=0.01",
+                ]
+            ),
+            # We don't want to blindly returned cached data for
+            # /simple/, because authors generally expecting that
+            # twine upload && pip install will function, but if
+            # they've done a pip install in the last ~10 minutes
+            # it won't. Thus by setting this to zero we will not
+            # blindly use any cached data, however the benefit of
+            # using max-age=0 instead of no-cache, is that we will
+            # still support conditional requests, so we will still
+            # minimize traffic sent in cases where the page hasn't
+            # changed at all, we will just always incur the round
+            # trip for the conditional GET now instead of only
+            # once per 10 minutes.
+            # For more information, please see pypa/pip#5670.
+            "Cache-Control": "max-age=0",
+        },
+    )
+    raise_for_status(resp)
+
+    # The check for archives above only works if the url ends with
+    # something that looks like an archive. However that is not a
+    # requirement of an url. Unless we issue a HEAD request on every
+    # url we cannot know ahead of time for sure if something is a
+    # Simple API response or not. However we can check after we've
+    # downloaded it.
+    _ensure_api_header(resp)
+
+    logger.debug(
+        "Fetched page %s as %s",
+        redact_auth_from_url(url),
+        resp.headers.get("Content-Type", "Unknown"),
+    )
+
+    return resp
+
+
+def _get_encoding_from_headers(headers: ResponseHeaders) -> Optional[str]:
+    """Determine if we have any encoding information in our headers."""
+    if headers and "Content-Type" in headers:
+        m = email.message.Message()
+        m["content-type"] = headers["Content-Type"]
+        charset = m.get_param("charset")
+        if charset:
+            return str(charset)
+    return None
+
+
+class CacheablePageContent:
+    def __init__(self, page: "IndexContent") -> None:
+        assert page.cache_link_parsing
+        self.page = page
+
+    def __eq__(self, other: object) -> bool:
+        return isinstance(other, type(self)) and self.page.url == other.page.url
+
+    def __hash__(self) -> int:
+        return hash(self.page.url)
+
+
+class ParseLinks(Protocol):
+    def __call__(self, page: "IndexContent") -> Iterable[Link]: ...
+
+
+def with_cached_index_content(fn: ParseLinks) -> ParseLinks:
+    """
+    Given a function that parses an Iterable[Link] from an IndexContent, cache the
+    function's result (keyed by CacheablePageContent), unless the IndexContent
+    `page` has `page.cache_link_parsing == False`.
+    """
+
+    @functools.lru_cache(maxsize=None)
+    def wrapper(cacheable_page: CacheablePageContent) -> List[Link]:
+        return list(fn(cacheable_page.page))
+
+    @functools.wraps(fn)
+    def wrapper_wrapper(page: "IndexContent") -> List[Link]:
+        if page.cache_link_parsing:
+            return wrapper(CacheablePageContent(page))
+        return list(fn(page))
+
+    return wrapper_wrapper
+
+
+@with_cached_index_content
+def parse_links(page: "IndexContent") -> Iterable[Link]:
+    """
+    Parse a Simple API's Index Content, and yield its anchor elements as Link objects.
+    """
+
+    content_type_l = page.content_type.lower()
+    if content_type_l.startswith("application/vnd.pypi.simple.v1+json"):
+        data = json.loads(page.content)
+        for file in data.get("files", []):
+            link = Link.from_json(file, page.url)
+            if link is None:
+                continue
+            yield link
+        return
+
+    parser = HTMLLinkParser(page.url)
+    encoding = page.encoding or "utf-8"
+    parser.feed(page.content.decode(encoding))
+
+    url = page.url
+    base_url = parser.base_url or url
+    for anchor in parser.anchors:
+        link = Link.from_element(anchor, page_url=url, base_url=base_url)
+        if link is None:
+            continue
+        yield link
+
+
+@dataclass(frozen=True)
+class IndexContent:
+    """Represents one response (or page), along with its URL.
+
+    :param encoding: the encoding to decode the given content.
+    :param url: the URL from which the HTML was downloaded.
+    :param cache_link_parsing: whether links parsed from this page's url
+                               should be cached. PyPI index urls should
+                               have this set to False, for example.
+    """
+
+    content: bytes
+    content_type: str
+    encoding: Optional[str]
+    url: str
+    cache_link_parsing: bool = True
+
+    def __str__(self) -> str:
+        return redact_auth_from_url(self.url)
+
+
+class HTMLLinkParser(HTMLParser):
+    """
+    HTMLParser that keeps the first base HREF and a list of all anchor
+    elements' attributes.
+    """
+
+    def __init__(self, url: str) -> None:
+        super().__init__(convert_charrefs=True)
+
+        self.url: str = url
+        self.base_url: Optional[str] = None
+        self.anchors: List[Dict[str, Optional[str]]] = []
+
+    def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]) -> None:
+        if tag == "base" and self.base_url is None:
+            href = self.get_href(attrs)
+            if href is not None:
+                self.base_url = href
+        elif tag == "a":
+            self.anchors.append(dict(attrs))
+
+    def get_href(self, attrs: List[Tuple[str, Optional[str]]]) -> Optional[str]:
+        for name, value in attrs:
+            if name == "href":
+                return value
+        return None
+
+
+def _handle_get_simple_fail(
+    link: Link,
+    reason: Union[str, Exception],
+    meth: Optional[Callable[..., None]] = None,
+) -> None:
+    if meth is None:
+        meth = logger.debug
+    meth("Could not fetch URL %s: %s - skipping", link, reason)
+
+
+def _make_index_content(
+    response: Response, cache_link_parsing: bool = True
+) -> IndexContent:
+    encoding = _get_encoding_from_headers(response.headers)
+    return IndexContent(
+        response.content,
+        response.headers["Content-Type"],
+        encoding=encoding,
+        url=response.url,
+        cache_link_parsing=cache_link_parsing,
+    )
+
+
+def _get_index_content(link: Link, *, session: PipSession) -> Optional["IndexContent"]:
+    url = link.url.split("#", 1)[0]
+
+    # Check for VCS schemes that do not support lookup as web pages.
+    vcs_scheme = _match_vcs_scheme(url)
+    if vcs_scheme:
+        logger.warning(
+            "Cannot look at %s URL %s because it does not support lookup as web pages.",
+            vcs_scheme,
+            link,
+        )
+        return None
+
+    # Tack index.html onto file:// URLs that point to directories
+    scheme, _, path, _, _, _ = urllib.parse.urlparse(url)
+    if scheme == "file" and os.path.isdir(urllib.request.url2pathname(path)):
+        # add trailing slash if not present so urljoin doesn't trim
+        # final segment
+        if not url.endswith("/"):
+            url += "/"
+        # TODO: In the future, it would be nice if pip supported PEP 691
+        #       style responses in the file:// URLs, however there's no
+        #       standard file extension for application/vnd.pypi.simple.v1+json
+        #       so we'll need to come up with something on our own.
+        url = urllib.parse.urljoin(url, "index.html")
+        logger.debug(" file: URL is directory, getting %s", url)
+
+    try:
+        resp = _get_simple_response(url, session=session)
+    except _NotHTTP:
+        logger.warning(
+            "Skipping page %s because it looks like an archive, and cannot "
+            "be checked by a HTTP HEAD request.",
+            link,
+        )
+    except _NotAPIContent as exc:
+        logger.warning(
+            "Skipping page %s because the %s request got Content-Type: %s. "
+            "The only supported Content-Types are application/vnd.pypi.simple.v1+json, "
+            "application/vnd.pypi.simple.v1+html, and text/html",
+            link,
+            exc.request_desc,
+            exc.content_type,
+        )
+    except NetworkConnectionError as exc:
+        _handle_get_simple_fail(link, exc)
+    except RetryError as exc:
+        _handle_get_simple_fail(link, exc)
+    except SSLError as exc:
+        reason = "There was a problem confirming the ssl certificate: "
+        reason += str(exc)
+        _handle_get_simple_fail(link, reason, meth=logger.info)
+    except requests.ConnectionError as exc:
+        _handle_get_simple_fail(link, f"connection error: {exc}")
+    except requests.Timeout:
+        _handle_get_simple_fail(link, "timed out")
+    else:
+        return _make_index_content(resp, cache_link_parsing=link.cache_link_parsing)
+    return None
+
+
+class CollectedSources(NamedTuple):
+    find_links: Sequence[Optional[LinkSource]]
+    index_urls: Sequence[Optional[LinkSource]]
+
+
+class LinkCollector:
+    """
+    Responsible for collecting Link objects from all configured locations,
+    making network requests as needed.
+
+    The class's main method is its collect_sources() method.
+    """
+
+    def __init__(
+        self,
+        session: PipSession,
+        search_scope: SearchScope,
+    ) -> None:
+        self.search_scope = search_scope
+        self.session = session
+
+    @classmethod
+    def create(
+        cls,
+        session: PipSession,
+        options: Values,
+        suppress_no_index: bool = False,
+    ) -> "LinkCollector":
+        """
+        :param session: The Session to use to make requests.
+        :param suppress_no_index: Whether to ignore the --no-index option
+            when constructing the SearchScope object.
+        """
+        index_urls = [options.index_url] + options.extra_index_urls
+        if options.no_index and not suppress_no_index:
+            logger.debug(
+                "Ignoring indexes: %s",
+                ",".join(redact_auth_from_url(url) for url in index_urls),
+            )
+            index_urls = []
+
+        # Make sure find_links is a list before passing to create().
+        find_links = options.find_links or []
+
+        search_scope = SearchScope.create(
+            find_links=find_links,
+            index_urls=index_urls,
+            no_index=options.no_index,
+        )
+        link_collector = LinkCollector(
+            session=session,
+            search_scope=search_scope,
+        )
+        return link_collector
+
+    @property
+    def find_links(self) -> List[str]:
+        return self.search_scope.find_links
+
+    def fetch_response(self, location: Link) -> Optional[IndexContent]:
+        """
+        Fetch an HTML page containing package links.
+        """
+        return _get_index_content(location, session=self.session)
+
+    def collect_sources(
+        self,
+        project_name: str,
+        candidates_from_page: CandidatesFromPage,
+    ) -> CollectedSources:
+        # The OrderedDict calls deduplicate sources by URL.
+        index_url_sources = collections.OrderedDict(
+            build_source(
+                loc,
+                candidates_from_page=candidates_from_page,
+                page_validator=self.session.is_secure_origin,
+                expand_dir=False,
+                cache_link_parsing=False,
+                project_name=project_name,
+            )
+            for loc in self.search_scope.get_index_urls_locations(project_name)
+        ).values()
+        find_links_sources = collections.OrderedDict(
+            build_source(
+                loc,
+                candidates_from_page=candidates_from_page,
+                page_validator=self.session.is_secure_origin,
+                expand_dir=True,
+                cache_link_parsing=True,
+                project_name=project_name,
+            )
+            for loc in self.find_links
+        ).values()
+
+        if logger.isEnabledFor(logging.DEBUG):
+            lines = [
+                f"* {s.link}"
+                for s in itertools.chain(find_links_sources, index_url_sources)
+                if s is not None and s.link is not None
+            ]
+            lines = [
+                f"{len(lines)} location(s) to search "
+                f"for versions of {project_name}:"
+            ] + lines
+            logger.debug("\n".join(lines))
+
+        return CollectedSources(
+            find_links=list(find_links_sources),
+            index_urls=list(index_url_sources),
+        )
diff --git a/.venv/lib/python3.12/site-packages/pip/_internal/index/package_finder.py b/.venv/lib/python3.12/site-packages/pip/_internal/index/package_finder.py
new file mode 100644
index 00000000..85628ee5
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/pip/_internal/index/package_finder.py
@@ -0,0 +1,1029 @@
+"""Routines related to PyPI, indexes"""
+
+import enum
+import functools
+import itertools
+import logging
+import re
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, FrozenSet, Iterable, List, Optional, Set, Tuple, Union
+
+from pip._vendor.packaging import specifiers
+from pip._vendor.packaging.tags import Tag
+from pip._vendor.packaging.utils import canonicalize_name
+from pip._vendor.packaging.version import InvalidVersion, _BaseVersion
+from pip._vendor.packaging.version import parse as parse_version
+
+from pip._internal.exceptions import (
+    BestVersionAlreadyInstalled,
+    DistributionNotFound,
+    InvalidWheelFilename,
+    UnsupportedWheel,
+)
+from pip._internal.index.collector import LinkCollector, parse_links
+from pip._internal.models.candidate import InstallationCandidate
+from pip._internal.models.format_control import FormatControl
+from pip._internal.models.link import Link
+from pip._internal.models.search_scope import SearchScope
+from pip._internal.models.selection_prefs import SelectionPreferences
+from pip._internal.models.target_python import TargetPython
+from pip._internal.models.wheel import Wheel
+from pip._internal.req import InstallRequirement
+from pip._internal.utils._log import getLogger
+from pip._internal.utils.filetypes import WHEEL_EXTENSION
+from pip._internal.utils.hashes import Hashes
+from pip._internal.utils.logging import indent_log
+from pip._internal.utils.misc import build_netloc
+from pip._internal.utils.packaging import check_requires_python
+from pip._internal.utils.unpacking import SUPPORTED_EXTENSIONS
+
+if TYPE_CHECKING:
+    from pip._vendor.typing_extensions import TypeGuard
+
+__all__ = ["FormatControl", "BestCandidateResult", "PackageFinder"]
+
+
+logger = getLogger(__name__)
+
+BuildTag = Union[Tuple[()], Tuple[int, str]]
+CandidateSortingKey = Tuple[int, int, int, _BaseVersion, Optional[int], BuildTag]
+
+
+def _check_link_requires_python(
+    link: Link,
+    version_info: Tuple[int, int, int],
+    ignore_requires_python: bool = False,
+) -> bool:
+    """
+    Return whether the given Python version is compatible with a link's
+    "Requires-Python" value.
+
+    :param version_info: A 3-tuple of ints representing the Python
+        major-minor-micro version to check.
+    :param ignore_requires_python: Whether to ignore the "Requires-Python"
+        value if the given Python version isn't compatible.
+    """
+    try:
+        is_compatible = check_requires_python(
+            link.requires_python,
+            version_info=version_info,
+        )
+    except specifiers.InvalidSpecifier:
+        logger.debug(
+            "Ignoring invalid Requires-Python (%r) for link: %s",
+            link.requires_python,
+            link,
+        )
+    else:
+        if not is_compatible:
+            version = ".".join(map(str, version_info))
+            if not ignore_requires_python:
+                logger.verbose(
+                    "Link requires a different Python (%s not in: %r): %s",
+                    version,
+                    link.requires_python,
+                    link,
+                )
+                return False
+
+            logger.debug(
+                "Ignoring failed Requires-Python check (%s not in: %r) for link: %s",
+                version,
+                link.requires_python,
+                link,
+            )
+
+    return True
+
+
+class LinkType(enum.Enum):
+    candidate = enum.auto()
+    different_project = enum.auto()
+    yanked = enum.auto()
+    format_unsupported = enum.auto()
+    format_invalid = enum.auto()
+    platform_mismatch = enum.auto()
+    requires_python_mismatch = enum.auto()
+
+
+class LinkEvaluator:
+    """
+    Responsible for evaluating links for a particular project.
+    """
+
+    _py_version_re = re.compile(r"-py([123]\.?[0-9]?)$")
+
+    # Don't include an allow_yanked default value to make sure each call
+    # site considers whether yanked releases are allowed. This also causes
+    # that decision to be made explicit in the calling code, which helps
+    # people when reading the code.
+    def __init__(
+        self,
+        project_name: str,
+        canonical_name: str,
+        formats: FrozenSet[str],
+        target_python: TargetPython,
+        allow_yanked: bool,
+        ignore_requires_python: Optional[bool] = None,
+    ) -> None:
+        """
+        :param project_name: The user supplied package name.
+        :param canonical_name: The canonical package name.
+        :param formats: The formats allowed for this package. Should be a set
+            with 'binary' or 'source' or both in it.
+        :param target_python: The target Python interpreter to use when
+            evaluating link compatibility. This is used, for example, to
+            check wheel compatibility, as well as when checking the Python
+            version, e.g. the Python version embedded in a link filename
+            (or egg fragment) and against an HTML link's optional PEP 503
+            "data-requires-python" attribute.
+        :param allow_yanked: Whether files marked as yanked (in the sense
+            of PEP 592) are permitted to be candidates for install.
+        :param ignore_requires_python: Whether to ignore incompatible
+            PEP 503 "data-requires-python" values in HTML links. Defaults
+            to False.
+        """
+        if ignore_requires_python is None:
+            ignore_requires_python = False
+
+        self._allow_yanked = allow_yanked
+        self._canonical_name = canonical_name
+        self._ignore_requires_python = ignore_requires_python
+        self._formats = formats
+        self._target_python = target_python
+
+        self.project_name = project_name
+
+    def evaluate_link(self, link: Link) -> Tuple[LinkType, str]:
+        """
+        Determine whether a link is a candidate for installation.
+
+        :return: A tuple (result, detail), where *result* is an enum
+            representing whether the evaluation found a candidate, or the reason
+            why one is not found. If a candidate is found, *detail* will be the
+            candidate's version string; if one is not found, it contains the
+            reason the link fails to qualify.
+        """
+        version = None
+        if link.is_yanked and not self._allow_yanked:
+            reason = link.yanked_reason or "<none given>"
+            return (LinkType.yanked, f"yanked for reason: {reason}")
+
+        if link.egg_fragment:
+            egg_info = link.egg_fragment
+            ext = link.ext
+        else:
+            egg_info, ext = link.splitext()
+            if not ext:
+                return (LinkType.format_unsupported, "not a file")
+            if ext not in SUPPORTED_EXTENSIONS:
+                return (
+                    LinkType.format_unsupported,
+                    f"unsupported archive format: {ext}",
+                )
+            if "binary" not in self._formats and ext == WHEEL_EXTENSION:
+                reason = f"No binaries permitted for {self.project_name}"
+                return (LinkType.format_unsupported, reason)
+            if "macosx10" in link.path and ext == ".zip":
+                return (LinkType.format_unsupported, "macosx10 one")
+            if ext == WHEEL_EXTENSION:
+                try:
+                    wheel = Wheel(link.filename)
+                except InvalidWheelFilename:
+                    return (
+                        LinkType.format_invalid,
+                        "invalid wheel filename",
+                    )
+                if canonicalize_name(wheel.name) != self._canonical_name:
+                    reason = f"wrong project name (not {self.project_name})"
+                    return (LinkType.different_project, reason)
+
+                supported_tags = self._target_python.get_unsorted_tags()
+                if not wheel.supported(supported_tags):
+                    # Include the wheel's tags in the reason string to
+                    # simplify troubleshooting compatibility issues.
+                    file_tags = ", ".join(wheel.get_formatted_file_tags())
+                    reason = (
+                        f"none of the wheel's tags ({file_tags}) are compatible "
+                        f"(run pip debug --verbose to show compatible tags)"
+                    )
+                    return (LinkType.platform_mismatch, reason)
+
+                version = wheel.version
+
+        # This should be up by the self.ok_binary check, but see issue 2700.
+        if "source" not in self._formats and ext != WHEEL_EXTENSION:
+            reason = f"No sources permitted for {self.project_name}"
+            return (LinkType.format_unsupported, reason)
+
+        if not version:
+            version = _extract_version_from_fragment(
+                egg_info,
+                self._canonical_name,
+            )
+        if not version:
+            reason = f"Missing project version for {self.project_name}"
+            return (LinkType.format_invalid, reason)
+
+        match = self._py_version_re.search(version)
+        if match:
+            version = version[: match.start()]
+            py_version = match.group(1)
+            if py_version != self._target_python.py_version:
+                return (
+                    LinkType.platform_mismatch,
+                    "Python version is incorrect",
+                )
+
+        supports_python = _check_link_requires_python(
+            link,
+            version_info=self._target_python.py_version_info,
+            ignore_requires_python=self._ignore_requires_python,
+        )
+        if not supports_python:
+            reason = f"{version} Requires-Python {link.requires_python}"
+            return (LinkType.requires_python_mismatch, reason)
+
+        logger.debug("Found link %s, version: %s", link, version)
+
+        return (LinkType.candidate, version)
+
+
+def filter_unallowed_hashes(
+    candidates: List[InstallationCandidate],
+    hashes: Optional[Hashes],
+    project_name: str,
+) -> List[InstallationCandidate]:
+    """
+    Filter out candidates whose hashes aren't allowed, and return a new
+    list of candidates.
+
+    If at least one candidate has an allowed hash, then all candidates with
+    either an allowed hash or no hash specified are returned.  Otherwise,
+    the given candidates are returned.
+
+    Including the candidates with no hash specified when there is a match
+    allows a warning to be logged if there is a more preferred candidate
+    with no hash specified.  Returning all candidates in the case of no
+    matches lets pip report the hash of the candidate that would otherwise
+    have been installed (e.g. permitting the user to more easily update
+    their requirements file with the desired hash).
+    """
+    if not hashes:
+        logger.debug(
+            "Given no hashes to check %s links for project %r: "
+            "discarding no candidates",
+            len(candidates),
+            project_name,
+        )
+        # Make sure we're not returning back the given value.
+        return list(candidates)
+
+    matches_or_no_digest = []
+    # Collect the non-matches for logging purposes.
+    non_matches = []
+    match_count = 0
+    for candidate in candidates:
+        link = candidate.link
+        if not link.has_hash:
+            pass
+        elif link.is_hash_allowed(hashes=hashes):
+            match_count += 1
+        else:
+            non_matches.append(candidate)
+            continue
+
+        matches_or_no_digest.append(candidate)
+
+    if match_count:
+        filtered = matches_or_no_digest
+    else:
+        # Make sure we're not returning back the given value.
+        filtered = list(candidates)
+
+    if len(filtered) == len(candidates):
+        discard_message = "discarding no candidates"
+    else:
+        discard_message = "discarding {} non-matches:\n  {}".format(
+            len(non_matches),
+            "\n  ".join(str(candidate.link) for candidate in non_matches),
+        )
+
+    logger.debug(
+        "Checked %s links for project %r against %s hashes "
+        "(%s matches, %s no digest): %s",
+        len(candidates),
+        project_name,
+        hashes.digest_count,
+        match_count,
+        len(matches_or_no_digest) - match_count,
+        discard_message,
+    )
+
+    return filtered
+
+
+@dataclass
+class CandidatePreferences:
+    """
+    Encapsulates some of the preferences for filtering and sorting
+    InstallationCandidate objects.
+    """
+
+    prefer_binary: bool = False
+    allow_all_prereleases: bool = False
+
+
+@dataclass(frozen=True)
+class BestCandidateResult:
+    """A collection of candidates, returned by `PackageFinder.find_best_candidate`.
+
+    This class is only intended to be instantiated by CandidateEvaluator's
+    `compute_best_candidate()` method.
+
+    :param all_candidates: A sequence of all available candidates found.
+    :param applicable_candidates: The applicable candidates.
+    :param best_candidate: The most preferred candidate found, or None
+        if no applicable candidates were found.
+    """
+
+    all_candidates: List[InstallationCandidate]
+    applicable_candidates: List[InstallationCandidate]
+    best_candidate: Optional[InstallationCandidate]
+
+    def __post_init__(self) -> None:
+        assert set(self.applicable_candidates) <= set(self.all_candidates)
+
+        if self.best_candidate is None:
+            assert not self.applicable_candidates
+        else:
+            assert self.best_candidate in self.applicable_candidates
+
+
+class CandidateEvaluator:
+    """
+    Responsible for filtering and sorting candidates for installation based
+    on what tags are valid.
+    """
+
+    @classmethod
+    def create(
+        cls,
+        project_name: str,
+        target_python: Optional[TargetPython] = None,
+        prefer_binary: bool = False,
+        allow_all_prereleases: bool = False,
+        specifier: Optional[specifiers.BaseSpecifier] = None,
+        hashes: Optional[Hashes] = None,
+    ) -> "CandidateEvaluator":
+        """Create a CandidateEvaluator object.
+
+        :param target_python: The target Python interpreter to use when
+            checking compatibility. If None (the default), a TargetPython
+            object will be constructed from the running Python.
+        :param specifier: An optional object implementing `filter`
+            (e.g. `packaging.specifiers.SpecifierSet`) to filter applicable
+            versions.
+        :param hashes: An optional collection of allowed hashes.
+        """
+        if target_python is None:
+            target_python = TargetPython()
+        if specifier is None:
+            specifier = specifiers.SpecifierSet()
+
+        supported_tags = target_python.get_sorted_tags()
+
+        return cls(
+            project_name=project_name,
+            supported_tags=supported_tags,
+            specifier=specifier,
+            prefer_binary=prefer_binary,
+            allow_all_prereleases=allow_all_prereleases,
+            hashes=hashes,
+        )
+
+    def __init__(
+        self,
+        project_name: str,
+        supported_tags: List[Tag],
+        specifier: specifiers.BaseSpecifier,
+        prefer_binary: bool = False,
+        allow_all_prereleases: bool = False,
+        hashes: Optional[Hashes] = None,
+    ) -> None:
+        """
+        :param supported_tags: The PEP 425 tags supported by the target
+            Python in order of preference (most preferred first).
+        """
+        self._allow_all_prereleases = allow_all_prereleases
+        self._hashes = hashes
+        self._prefer_binary = prefer_binary
+        self._project_name = project_name
+        self._specifier = specifier
+        self._supported_tags = supported_tags
+        # Since the index of the tag in the _supported_tags list is used
+        # as a priority, precompute a map from tag to index/priority to be
+        # used in wheel.find_most_preferred_tag.
+        self._wheel_tag_preferences = {
+            tag: idx for idx, tag in enumerate(supported_tags)
+        }
+
+    def get_applicable_candidates(
+        self,
+        candidates: List[InstallationCandidate],
+    ) -> List[InstallationCandidate]:
+        """
+        Return the applicable candidates from a list of candidates.
+        """
+        # Using None infers from the specifier instead.
+        allow_prereleases = self._allow_all_prereleases or None
+        specifier = self._specifier
+
+        # We turn the version object into a str here because otherwise
+        # when we're debundled but setuptools isn't, Python will see
+        # packaging.version.Version and
+        # pkg_resources._vendor.packaging.version.Version as different
+        # types. This way we'll use a str as a common data interchange
+        # format. If we stop using the pkg_resources provided specifier
+        # and start using our own, we can drop the cast to str().
+        candidates_and_versions = [(c, str(c.version)) for c in candidates]
+        versions = set(
+            specifier.filter(
+                (v for _, v in candidates_and_versions),
+                prereleases=allow_prereleases,
+            )
+        )
+
+        applicable_candidates = [c for c, v in candidates_and_versions if v in versions]
+        filtered_applicable_candidates = filter_unallowed_hashes(
+            candidates=applicable_candidates,
+            hashes=self._hashes,
+            project_name=self._project_name,
+        )
+
+        return sorted(filtered_applicable_candidates, key=self._sort_key)
+
+    def _sort_key(self, candidate: InstallationCandidate) -> CandidateSortingKey:
+        """
+        Function to pass as the `key` argument to a call to sorted() to sort
+        InstallationCandidates by preference.
+
+        Returns a tuple such that tuples sorting as greater using Python's
+        default comparison operator are more preferred.
+
+        The preference is as follows:
+
+        First and foremost, candidates with allowed (matching) hashes are
+        always preferred over candidates without matching hashes. This is
+        because e.g. if the only candidate with an allowed hash is yanked,
+        we still want to use that candidate.
+
+        Second, excepting hash considerations, candidates that have been
+        yanked (in the sense of PEP 592) are always less preferred than
+        candidates that haven't been yanked. Then:
+
+        If not finding wheels, they are sorted by version only.
+        If finding wheels, then the sort order is by version, then:
+          1. existing installs
+          2. wheels ordered via Wheel.support_index_min(self._supported_tags)
+          3. source archives
+        If prefer_binary was set, then all wheels are sorted above sources.
+
+        Note: it was considered to embed this logic into the Link
+              comparison operators, but then different sdist links
+              with the same version, would have to be considered equal
+        """
+        valid_tags = self._supported_tags
+        support_num = len(valid_tags)
+        build_tag: BuildTag = ()
+        binary_preference = 0
+        link = candidate.link
+        if link.is_wheel:
+            # can raise InvalidWheelFilename
+            wheel = Wheel(link.filename)
+            try:
+                pri = -(
+                    wheel.find_most_preferred_tag(
+                        valid_tags, self._wheel_tag_preferences
+                    )
+                )
+            except ValueError:
+                raise UnsupportedWheel(
+                    f"{wheel.filename} is not a supported wheel for this platform. It "
+                    "can't be sorted."
+                )
+            if self._prefer_binary:
+                binary_preference = 1
+            if wheel.build_tag is not None:
+                match = re.match(r"^(\d+)(.*)$", wheel.build_tag)
+                assert match is not None, "guaranteed by filename validation"
+                build_tag_groups = match.groups()
+                build_tag = (int(build_tag_groups[0]), build_tag_groups[1])
+        else:  # sdist
+            pri = -(support_num)
+        has_allowed_hash = int(link.is_hash_allowed(self._hashes))
+        yank_value = -1 * int(link.is_yanked)  # -1 for yanked.
+        return (
+            has_allowed_hash,
+            yank_value,
+            binary_preference,
+            candidate.version,
+            pri,
+            build_tag,
+        )
+
+    def sort_best_candidate(
+        self,
+        candidates: List[InstallationCandidate],
+    ) -> Optional[InstallationCandidate]:
+        """
+        Return the best candidate per the instance's sort order, or None if
+        no candidate is acceptable.
+        """
+        if not candidates:
+            return None
+        best_candidate = max(candidates, key=self._sort_key)
+        return best_candidate
+
+    def compute_best_candidate(
+        self,
+        candidates: List[InstallationCandidate],
+    ) -> BestCandidateResult:
+        """
+        Compute and return a `BestCandidateResult` instance.
+        """
+        applicable_candidates = self.get_applicable_candidates(candidates)
+
+        best_candidate = self.sort_best_candidate(applicable_candidates)
+
+        return BestCandidateResult(
+            candidates,
+            applicable_candidates=applicable_candidates,
+            best_candidate=best_candidate,
+        )
+
+
+class PackageFinder:
+    """This finds packages.
+
+    This is meant to match easy_install's technique for looking for
+    packages, by reading pages and looking for appropriate links.
+    """
+
+    def __init__(
+        self,
+        link_collector: LinkCollector,
+        target_python: TargetPython,
+        allow_yanked: bool,
+        format_control: Optional[FormatControl] = None,
+        candidate_prefs: Optional[CandidatePreferences] = None,
+        ignore_requires_python: Optional[bool] = None,
+    ) -> None:
+        """
+        This constructor is primarily meant to be used by the create() class
+        method and from tests.
+
+        :param format_control: A FormatControl object, used to control
+            the selection of source packages / binary packages when consulting
+            the index and links.
+        :param candidate_prefs: Options to use when creating a
+            CandidateEvaluator object.
+        """
+        if candidate_prefs is None:
+            candidate_prefs = CandidatePreferences()
+
+        format_control = format_control or FormatControl(set(), set())
+
+        self._allow_yanked = allow_yanked
+        self._candidate_prefs = candidate_prefs
+        self._ignore_requires_python = ignore_requires_python
+        self._link_collector = link_collector
+        self._target_python = target_python
+
+        self.format_control = format_control
+
+        # These are boring links that have already been logged somehow.
+        self._logged_links: Set[Tuple[Link, LinkType, str]] = set()
+
+    # Don't include an allow_yanked default value to make sure each call
+    # site considers whether yanked releases are allowed. This also causes
+    # that decision to be made explicit in the calling code, which helps
+    # people when reading the code.
+    @classmethod
+    def create(
+        cls,
+        link_collector: LinkCollector,
+        selection_prefs: SelectionPreferences,
+        target_python: Optional[TargetPython] = None,
+    ) -> "PackageFinder":
+        """Create a PackageFinder.
+
+        :param selection_prefs: The candidate selection preferences, as a
+            SelectionPreferences object.
+        :param target_python: The target Python interpreter to use when
+            checking compatibility. If None (the default), a TargetPython
+            object will be constructed from the running Python.
+        """
+        if target_python is None:
+            target_python = TargetPython()
+
+        candidate_prefs = CandidatePreferences(
+            prefer_binary=selection_prefs.prefer_binary,
+            allow_all_prereleases=selection_prefs.allow_all_prereleases,
+        )
+
+        return cls(
+            candidate_prefs=candidate_prefs,
+            link_collector=link_collector,
+            target_python=target_python,
+            allow_yanked=selection_prefs.allow_yanked,
+            format_control=selection_prefs.format_control,
+            ignore_requires_python=selection_prefs.ignore_requires_python,
+        )
+
+    @property
+    def target_python(self) -> TargetPython:
+        return self._target_python
+
+    @property
+    def search_scope(self) -> SearchScope:
+        return self._link_collector.search_scope
+
+    @search_scope.setter
+    def search_scope(self, search_scope: SearchScope) -> None:
+        self._link_collector.search_scope = search_scope
+
+    @property
+    def find_links(self) -> List[str]:
+        return self._link_collector.find_links
+
+    @property
+    def index_urls(self) -> List[str]:
+        return self.search_scope.index_urls
+
+    @property
+    def proxy(self) -> Optional[str]:
+        return self._link_collector.session.pip_proxy
+
+    @property
+    def trusted_hosts(self) -> Iterable[str]:
+        for host_port in self._link_collector.session.pip_trusted_origins:
+            yield build_netloc(*host_port)
+
+    @property
+    def custom_cert(self) -> Optional[str]:
+        # session.verify is either a boolean (use default bundle/no SSL
+        # verification) or a string path to a custom CA bundle to use. We only
+        # care about the latter.
+        verify = self._link_collector.session.verify
+        return verify if isinstance(verify, str) else None
+
+    @property
+    def client_cert(self) -> Optional[str]:
+        cert = self._link_collector.session.cert
+        assert not isinstance(cert, tuple), "pip only supports PEM client certs"
+        return cert
+
+    @property
+    def allow_all_prereleases(self) -> bool:
+        return self._candidate_prefs.allow_all_prereleases
+
+    def set_allow_all_prereleases(self) -> None:
+        self._candidate_prefs.allow_all_prereleases = True
+
+    @property
+    def prefer_binary(self) -> bool:
+        return self._candidate_prefs.prefer_binary
+
+    def set_prefer_binary(self) -> None:
+        self._candidate_prefs.prefer_binary = True
+
+    def requires_python_skipped_reasons(self) -> List[str]:
+        reasons = {
+            detail
+            for _, result, detail in self._logged_links
+            if result == LinkType.requires_python_mismatch
+        }
+        return sorted(reasons)
+
+    def make_link_evaluator(self, project_name: str) -> LinkEvaluator:
+        canonical_name = canonicalize_name(project_name)
+        formats = self.format_control.get_allowed_formats(canonical_name)
+
+        return LinkEvaluator(
+            project_name=project_name,
+            canonical_name=canonical_name,
+            formats=formats,
+            target_python=self._target_python,
+            allow_yanked=self._allow_yanked,
+            ignore_requires_python=self._ignore_requires_python,
+        )
+
+    def _sort_links(self, links: Iterable[Link]) -> List[Link]:
+        """
+        Returns elements of links in order, non-egg links first, egg links
+        second, while eliminating duplicates
+        """
+        eggs, no_eggs = [], []
+        seen: Set[Link] = set()
+        for link in links:
+            if link not in seen:
+                seen.add(link)
+                if link.egg_fragment:
+                    eggs.append(link)
+                else:
+                    no_eggs.append(link)
+        return no_eggs + eggs
+
+    def _log_skipped_link(self, link: Link, result: LinkType, detail: str) -> None:
+        # This is a hot method so don't waste time hashing links unless we're
+        # actually going to log 'em.
+        if not logger.isEnabledFor(logging.DEBUG):
+            return
+
+        entry = (link, result, detail)
+        if entry not in self._logged_links:
+            # Put the link at the end so the reason is more visible and because
+            # the link string is usually very long.
+            logger.debug("Skipping link: %s: %s", detail, link)
+            self._logged_links.add(entry)
+
+    def get_install_candidate(
+        self, link_evaluator: LinkEvaluator, link: Link
+    ) -> Optional[InstallationCandidate]:
+        """
+        If the link is a candidate for install, convert it to an
+        InstallationCandidate and return it. Otherwise, return None.
+        """
+        result, detail = link_evaluator.evaluate_link(link)
+        if result != LinkType.candidate:
+            self._log_skipped_link(link, result, detail)
+            return None
+
+        try:
+            return InstallationCandidate(
+                name=link_evaluator.project_name,
+                link=link,
+                version=detail,
+            )
+        except InvalidVersion:
+            return None
+
+    def evaluate_links(
+        self, link_evaluator: LinkEvaluator, links: Iterable[Link]
+    ) -> List[InstallationCandidate]:
+        """
+        Convert links that are candidates to InstallationCandidate objects.
+        """
+        candidates = []
+        for link in self._sort_links(links):
+            candidate = self.get_install_candidate(link_evaluator, link)
+            if candidate is not None:
+                candidates.append(candidate)
+
+        return candidates
+
+    def process_project_url(
+        self, project_url: Link, link_evaluator: LinkEvaluator
+    ) -> List[InstallationCandidate]:
+        logger.debug(
+            "Fetching project page and analyzing links: %s",
+            project_url,
+        )
+        index_response = self._link_collector.fetch_response(project_url)
+        if index_response is None:
+            return []
+
+        page_links = list(parse_links(index_response))
+
+        with indent_log():
+            package_links = self.evaluate_links(
+                link_evaluator,
+                links=page_links,
+            )
+
+        return package_links
+
+    @functools.lru_cache(maxsize=None)
+    def find_all_candidates(self, project_name: str) -> List[InstallationCandidate]:
+        """Find all available InstallationCandidate for project_name
+
+        This checks index_urls and find_links.
+        All versions found are returned as an InstallationCandidate list.
+
+        See LinkEvaluator.evaluate_link() for details on which files
+        are accepted.
+        """
+        link_evaluator = self.make_link_evaluator(project_name)
+
+        collected_sources = self._link_collector.collect_sources(
+            project_name=project_name,
+            candidates_from_page=functools.partial(
+                self.process_project_url,
+                link_evaluator=link_evaluator,
+            ),
+        )
+
+        page_candidates_it = itertools.chain.from_iterable(
+            source.page_candidates()
+            for sources in collected_sources
+            for source in sources
+            if source is not None
+        )
+        page_candidates = list(page_candidates_it)
+
+        file_links_it = itertools.chain.from_iterable(
+            source.file_links()
+            for sources in collected_sources
+            for source in sources
+            if source is not None
+        )
+        file_candidates = self.evaluate_links(
+            link_evaluator,
+            sorted(file_links_it, reverse=True),
+        )
+
+        if logger.isEnabledFor(logging.DEBUG) and file_candidates:
+            paths = []
+            for candidate in file_candidates:
+                assert candidate.link.url  # we need to have a URL
+                try:
+                    paths.append(candidate.link.file_path)
+                except Exception:
+                    paths.append(candidate.link.url)  # it's not a local file
+
+            logger.debug("Local files found: %s", ", ".join(paths))
+
+        # This is an intentional priority ordering
+        return file_candidates + page_candidates
+
+    def make_candidate_evaluator(
+        self,
+        project_name: str,
+        specifier: Optional[specifiers.BaseSpecifier] = None,
+        hashes: Optional[Hashes] = None,
+    ) -> CandidateEvaluator:
+        """Create a CandidateEvaluator object to use."""
+        candidate_prefs = self._candidate_prefs
+        return CandidateEvaluator.create(
+            project_name=project_name,
+            target_python=self._target_python,
+            prefer_binary=candidate_prefs.prefer_binary,
+            allow_all_prereleases=candidate_prefs.allow_all_prereleases,
+            specifier=specifier,
+            hashes=hashes,
+        )
+
+    @functools.lru_cache(maxsize=None)
+    def find_best_candidate(
+        self,
+        project_name: str,
+        specifier: Optional[specifiers.BaseSpecifier] = None,
+        hashes: Optional[Hashes] = None,
+    ) -> BestCandidateResult:
+        """Find matches for the given project and specifier.
+
+        :param specifier: An optional object implementing `filter`
+            (e.g. `packaging.specifiers.SpecifierSet`) to filter applicable
+            versions.
+
+        :return: A `BestCandidateResult` instance.
+        """
+        candidates = self.find_all_candidates(project_name)
+        candidate_evaluator = self.make_candidate_evaluator(
+            project_name=project_name,
+            specifier=specifier,
+            hashes=hashes,
+        )
+        return candidate_evaluator.compute_best_candidate(candidates)
+
+    def find_requirement(
+        self, req: InstallRequirement, upgrade: bool
+    ) -> Optional[InstallationCandidate]:
+        """Try to find a Link matching req
+
+        Expects req, an InstallRequirement and upgrade, a boolean
+        Returns a InstallationCandidate if found,
+        Raises DistributionNotFound or BestVersionAlreadyInstalled otherwise
+        """
+        hashes = req.hashes(trust_internet=False)
+        best_candidate_result = self.find_best_candidate(
+            req.name,
+            specifier=req.specifier,
+            hashes=hashes,
+        )
+        best_candidate = best_candidate_result.best_candidate
+
+        installed_version: Optional[_BaseVersion] = None
+        if req.satisfied_by is not None:
+            installed_version = req.satisfied_by.version
+
+        def _format_versions(cand_iter: Iterable[InstallationCandidate]) -> str:
+            # This repeated parse_version and str() conversion is needed to
+            # handle different vendoring sources from pip and pkg_resources.
+            # If we stop using the pkg_resources provided specifier and start
+            # using our own, we can drop the cast to str().
+            return (
+                ", ".join(
+                    sorted(
+                        {str(c.version) for c in cand_iter},
+                        key=parse_version,
+                    )
+                )
+                or "none"
+            )
+
+        if installed_version is None and best_candidate is None:
+            logger.critical(
+                "Could not find a version that satisfies the requirement %s "
+                "(from versions: %s)",
+                req,
+                _format_versions(best_candidate_result.all_candidates),
+            )
+
+            raise DistributionNotFound(f"No matching distribution found for {req}")
+
+        def _should_install_candidate(
+            candidate: Optional[InstallationCandidate],
+        ) -> "TypeGuard[InstallationCandidate]":
+            if installed_version is None:
+                return True
+            if best_candidate is None:
+                return False
+            return best_candidate.version > installed_version
+
+        if not upgrade and installed_version is not None:
+            if _should_install_candidate(best_candidate):
+                logger.debug(
+                    "Existing installed version (%s) satisfies requirement "
+                    "(most up-to-date version is %s)",
+                    installed_version,
+                    best_candidate.version,
+                )
+            else:
+                logger.debug(
+                    "Existing installed version (%s) is most up-to-date and "
+                    "satisfies requirement",
+                    installed_version,
+                )
+            return None
+
+        if _should_install_candidate(best_candidate):
+            logger.debug(
+                "Using version %s (newest of versions: %s)",
+                best_candidate.version,
+                _format_versions(best_candidate_result.applicable_candidates),
+            )
+            return best_candidate
+
+        # We have an existing version, and its the best version
+        logger.debug(
+            "Installed version (%s) is most up-to-date (past versions: %s)",
+            installed_version,
+            _format_versions(best_candidate_result.applicable_candidates),
+        )
+        raise BestVersionAlreadyInstalled
+
+
+def _find_name_version_sep(fragment: str, canonical_name: str) -> int:
+    """Find the separator's index based on the package's canonical name.
+
+    :param fragment: A <package>+<version> filename "fragment" (stem) or
+        egg fragment.
+    :param canonical_name: The package's canonical name.
+
+    This function is needed since the canonicalized name does not necessarily
+    have the same length as the egg info's name part. An example::
+
+    >>> fragment = 'foo__bar-1.0'
+    >>> canonical_name = 'foo-bar'
+    >>> _find_name_version_sep(fragment, canonical_name)
+    8
+    """
+    # Project name and version must be separated by one single dash. Find all
+    # occurrences of dashes; if the string in front of it matches the canonical
+    # name, this is the one separating the name and version parts.
+    for i, c in enumerate(fragment):
+        if c != "-":
+            continue
+        if canonicalize_name(fragment[:i]) == canonical_name:
+            return i
+    raise ValueError(f"{fragment} does not match {canonical_name}")
+
+
+def _extract_version_from_fragment(fragment: str, canonical_name: str) -> Optional[str]:
+    """Parse the version string from a <package>+<version> filename
+    "fragment" (stem) or egg fragment.
+
+    :param fragment: The string to parse. E.g. foo-2.1
+    :param canonical_name: The canonicalized name of the package this
+        belongs to.
+    """
+    try:
+        version_start = _find_name_version_sep(fragment, canonical_name) + 1
+    except ValueError:
+        return None
+    version = fragment[version_start:]
+    if not version:
+        return None
+    return version
diff --git a/.venv/lib/python3.12/site-packages/pip/_internal/index/sources.py b/.venv/lib/python3.12/site-packages/pip/_internal/index/sources.py
new file mode 100644
index 00000000..3dafb30e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/pip/_internal/index/sources.py
@@ -0,0 +1,284 @@
+import logging
+import mimetypes
+import os
+from collections import defaultdict
+from typing import Callable, Dict, Iterable, List, Optional, Tuple
+
+from pip._vendor.packaging.utils import (
+    InvalidSdistFilename,
+    InvalidWheelFilename,
+    canonicalize_name,
+    parse_sdist_filename,
+    parse_wheel_filename,
+)
+
+from pip._internal.models.candidate import InstallationCandidate
+from pip._internal.models.link import Link
+from pip._internal.utils.urls import path_to_url, url_to_path
+from pip._internal.vcs import is_url
+
+logger = logging.getLogger(__name__)
+
+FoundCandidates = Iterable[InstallationCandidate]
+FoundLinks = Iterable[Link]
+CandidatesFromPage = Callable[[Link], Iterable[InstallationCandidate]]
+PageValidator = Callable[[Link], bool]
+
+
+class LinkSource:
+    @property
+    def link(self) -> Optional[Link]:
+        """Returns the underlying link, if there's one."""
+        raise NotImplementedError()
+
+    def page_candidates(self) -> FoundCandidates:
+        """Candidates found by parsing an archive listing HTML file."""
+        raise NotImplementedError()
+
+    def file_links(self) -> FoundLinks:
+        """Links found by specifying archives directly."""
+        raise NotImplementedError()
+
+
+def _is_html_file(file_url: str) -> bool:
+    return mimetypes.guess_type(file_url, strict=False)[0] == "text/html"
+
+
+class _FlatDirectoryToUrls:
+    """Scans directory and caches results"""
+
+    def __init__(self, path: str) -> None:
+        self._path = path
+        self._page_candidates: List[str] = []
+        self._project_name_to_urls: Dict[str, List[str]] = defaultdict(list)
+        self._scanned_directory = False
+
+    def _scan_directory(self) -> None:
+        """Scans directory once and populates both page_candidates
+        and project_name_to_urls at the same time
+        """
+        for entry in os.scandir(self._path):
+            url = path_to_url(entry.path)
+            if _is_html_file(url):
+                self._page_candidates.append(url)
+                continue
+
+            # File must have a valid wheel or sdist name,
+            # otherwise not worth considering as a package
+            try:
+                project_filename = parse_wheel_filename(entry.name)[0]
+            except InvalidWheelFilename:
+                try:
+                    project_filename = parse_sdist_filename(entry.name)[0]
+                except InvalidSdistFilename:
+                    continue
+
+            self._project_name_to_urls[project_filename].append(url)
+        self._scanned_directory = True
+
+    @property
+    def page_candidates(self) -> List[str]:
+        if not self._scanned_directory:
+            self._scan_directory()
+
+        return self._page_candidates
+
+    @property
+    def project_name_to_urls(self) -> Dict[str, List[str]]:
+        if not self._scanned_directory:
+            self._scan_directory()
+
+        return self._project_name_to_urls
+
+
+class _FlatDirectorySource(LinkSource):
+    """Link source specified by ``--find-links=<path-to-dir>``.
+
+    This looks the content of the directory, and returns:
+
+    * ``page_candidates``: Links listed on each HTML file in the directory.
+    * ``file_candidates``: Archives in the directory.
+    """
+
+    _paths_to_urls: Dict[str, _FlatDirectoryToUrls] = {}
+
+    def __init__(
+        self,
+        candidates_from_page: CandidatesFromPage,
+        path: str,
+        project_name: str,
+    ) -> None:
+        self._candidates_from_page = candidates_from_page
+        self._project_name = canonicalize_name(project_name)
+
+        # Get existing instance of _FlatDirectoryToUrls if it exists
+        if path in self._paths_to_urls:
+            self._path_to_urls = self._paths_to_urls[path]
+        else:
+            self._path_to_urls = _FlatDirectoryToUrls(path=path)
+            self._paths_to_urls[path] = self._path_to_urls
+
+    @property
+    def link(self) -> Optional[Link]:
+        return None
+
+    def page_candidates(self) -> FoundCandidates:
+        for url in self._path_to_urls.page_candidates:
+            yield from self._candidates_from_page(Link(url))
+
+    def file_links(self) -> FoundLinks:
+        for url in self._path_to_urls.project_name_to_urls[self._project_name]:
+            yield Link(url)
+
+
+class _LocalFileSource(LinkSource):
+    """``--find-links=<path-or-url>`` or ``--[extra-]index-url=<path-or-url>``.
+
+    If a URL is supplied, it must be a ``file:`` URL. If a path is supplied to
+    the option, it is converted to a URL first. This returns:
+
+    * ``page_candidates``: Links listed on an HTML file.
+    * ``file_candidates``: The non-HTML file.
+    """
+
+    def __init__(
+        self,
+        candidates_from_page: CandidatesFromPage,
+        link: Link,
+    ) -> None:
+        self._candidates_from_page = candidates_from_page
+        self._link = link
+
+    @property
+    def link(self) -> Optional[Link]:
+        return self._link
+
+    def page_candidates(self) -> FoundCandidates:
+        if not _is_html_file(self._link.url):
+            return
+        yield from self._candidates_from_page(self._link)
+
+    def file_links(self) -> FoundLinks:
+        if _is_html_file(self._link.url):
+            return
+        yield self._link
+
+
+class _RemoteFileSource(LinkSource):
+    """``--find-links=<url>`` or ``--[extra-]index-url=<url>``.
+
+    This returns:
+
+    * ``page_candidates``: Links listed on an HTML file.
+    * ``file_candidates``: The non-HTML file.
+    """
+
+    def __init__(
+        self,
+        candidates_from_page: CandidatesFromPage,
+        page_validator: PageValidator,
+        link: Link,
+    ) -> None:
+        self._candidates_from_page = candidates_from_page
+        self._page_validator = page_validator
+        self._link = link
+
+    @property
+    def link(self) -> Optional[Link]:
+        return self._link
+
+    def page_candidates(self) -> FoundCandidates:
+        if not self._page_validator(self._link):
+            return
+        yield from self._candidates_from_page(self._link)
+
+    def file_links(self) -> FoundLinks:
+        yield self._link
+
+
+class _IndexDirectorySource(LinkSource):
+    """``--[extra-]index-url=<path-to-directory>``.
+
+    This is treated like a remote URL; ``candidates_from_page`` contains logic
+    for this by appending ``index.html`` to the link.
+    """
+
+    def __init__(
+        self,
+        candidates_from_page: CandidatesFromPage,
+        link: Link,
+    ) -> None:
+        self._candidates_from_page = candidates_from_page
+        self._link = link
+
+    @property
+    def link(self) -> Optional[Link]:
+        return self._link
+
+    def page_candidates(self) -> FoundCandidates:
+        yield from self._candidates_from_page(self._link)
+
+    def file_links(self) -> FoundLinks:
+        return ()
+
+
+def build_source(
+    location: str,
+    *,
+    candidates_from_page: CandidatesFromPage,
+    page_validator: PageValidator,
+    expand_dir: bool,
+    cache_link_parsing: bool,
+    project_name: str,
+) -> Tuple[Optional[str], Optional[LinkSource]]:
+    path: Optional[str] = None
+    url: Optional[str] = None
+    if os.path.exists(location):  # Is a local path.
+        url = path_to_url(location)
+        path = location
+    elif location.startswith("file:"):  # A file: URL.
+        url = location
+        path = url_to_path(location)
+    elif is_url(location):
+        url = location
+
+    if url is None:
+        msg = (
+            "Location '%s' is ignored: "
+            "it is either a non-existing path or lacks a specific scheme."
+        )
+        logger.warning(msg, location)
+        return (None, None)
+
+    if path is None:
+        source: LinkSource = _RemoteFileSource(
+            candidates_from_page=candidates_from_page,
+            page_validator=page_validator,
+            link=Link(url, cache_link_parsing=cache_link_parsing),
+        )
+        return (url, source)
+
+    if os.path.isdir(path):
+        if expand_dir:
+            source = _FlatDirectorySource(
+                candidates_from_page=candidates_from_page,
+                path=path,
+                project_name=project_name,
+            )
+        else:
+            source = _IndexDirectorySource(
+                candidates_from_page=candidates_from_page,
+                link=Link(url, cache_link_parsing=cache_link_parsing),
+            )
+        return (url, source)
+    elif os.path.isfile(path):
+        source = _LocalFileSource(
+            candidates_from_page=candidates_from_page,
+            link=Link(url, cache_link_parsing=cache_link_parsing),
+        )
+        return (url, source)
+    logger.warning(
+        "Location '%s' is ignored: it is neither a file nor a directory.",
+        location,
+    )
+    return (url, None)