about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/referencing/_core.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/referencing/_core.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/referencing/_core.py')
-rw-r--r--.venv/lib/python3.12/site-packages/referencing/_core.py739
1 files changed, 739 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/referencing/_core.py b/.venv/lib/python3.12/site-packages/referencing/_core.py
new file mode 100644
index 00000000..ec2d51bd
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/referencing/_core.py
@@ -0,0 +1,739 @@
+from __future__ import annotations
+
+from collections.abc import Iterable, Iterator, Sequence
+from enum import Enum
+from typing import Any, Callable, ClassVar, Generic, Protocol
+from urllib.parse import unquote, urldefrag, urljoin
+
+from attrs import evolve, field
+from rpds import HashTrieMap, HashTrieSet, List
+
+try:
+    from typing_extensions import TypeVar
+except ImportError:  # pragma: no cover
+    from typing import TypeVar
+
+from referencing import exceptions
+from referencing._attrs import frozen
+from referencing.typing import URI, Anchor as AnchorType, D, Mapping, Retrieve
+
+EMPTY_UNCRAWLED: HashTrieSet[URI] = HashTrieSet()
+EMPTY_PREVIOUS_RESOLVERS: List[URI] = List()
+
+
+class _Unset(Enum):
+    """
+    What sillyness...
+    """
+
+    SENTINEL = 1
+
+
+_UNSET = _Unset.SENTINEL
+
+
+class _MaybeInSubresource(Protocol[D]):
+    def __call__(
+        self,
+        segments: Sequence[int | str],
+        resolver: Resolver[D],
+        subresource: Resource[D],
+    ) -> Resolver[D]: ...
+
+
+def _detect_or_error(contents: D) -> Specification[D]:
+    if not isinstance(contents, Mapping):
+        raise exceptions.CannotDetermineSpecification(contents)
+
+    jsonschema_dialect_id = contents.get("$schema")  # type: ignore[reportUnknownMemberType]
+    if not isinstance(jsonschema_dialect_id, str):
+        raise exceptions.CannotDetermineSpecification(contents)
+
+    from referencing.jsonschema import specification_with
+
+    return specification_with(jsonschema_dialect_id)
+
+
+def _detect_or_default(
+    default: Specification[D],
+) -> Callable[[D], Specification[D]]:
+    def _detect(contents: D) -> Specification[D]:
+        if not isinstance(contents, Mapping):
+            return default
+
+        jsonschema_dialect_id = contents.get("$schema")  # type: ignore[reportUnknownMemberType]
+        if jsonschema_dialect_id is None:
+            return default
+
+        from referencing.jsonschema import specification_with
+
+        return specification_with(
+            jsonschema_dialect_id,  # type: ignore[reportUnknownArgumentType]
+            default=default,
+        )
+
+    return _detect
+
+
+class _SpecificationDetector:
+    def __get__(
+        self,
+        instance: Specification[D] | None,
+        cls: type[Specification[D]],
+    ) -> Callable[[D], Specification[D]]:
+        if instance is None:
+            return _detect_or_error
+        else:
+            return _detect_or_default(instance)
+
+
+@frozen
+class Specification(Generic[D]):
+    """
+    A specification which defines referencing behavior.
+
+    The various methods of a `Specification` allow for varying referencing
+    behavior across JSON Schema specification versions, etc.
+    """
+
+    #: A short human-readable name for the specification, used for debugging.
+    name: str
+
+    #: Find the ID of a given document.
+    id_of: Callable[[D], URI | None]
+
+    #: Retrieve the subresources of the given document (without traversing into
+    #: the subresources themselves).
+    subresources_of: Callable[[D], Iterable[D]]
+
+    #: While resolving a JSON pointer, conditionally enter a subresource
+    #: (if e.g. we have just entered a keyword whose value is a subresource)
+    maybe_in_subresource: _MaybeInSubresource[D]
+
+    #: Retrieve the anchors contained in the given document.
+    _anchors_in: Callable[
+        [Specification[D], D],
+        Iterable[AnchorType[D]],
+    ] = field(alias="anchors_in")
+
+    #: An opaque specification where resources have no subresources
+    #: nor internal identifiers.
+    OPAQUE: ClassVar[Specification[Any]]
+
+    #: Attempt to discern which specification applies to the given contents.
+    #:
+    #: May be called either as an instance method or as a class method, with
+    #: slightly different behavior in the following case:
+    #:
+    #: Recall that not all contents contains enough internal information about
+    #: which specification it is written for -- the JSON Schema ``{}``,
+    #: for instance, is valid under many different dialects and may be
+    #: interpreted as any one of them.
+    #:
+    #: When this method is used as an instance method (i.e. called on a
+    #: specific specification), that specification is used as the default
+    #: if the given contents are unidentifiable.
+    #:
+    #: On the other hand when called as a class method, an error is raised.
+    #:
+    #: To reiterate, ``DRAFT202012.detect({})`` will return ``DRAFT202012``
+    #: whereas the class method ``Specification.detect({})`` will raise an
+    #: error.
+    #:
+    #: (Note that of course ``DRAFT202012.detect(...)`` may return some other
+    #: specification when given a schema which *does* identify as being for
+    #: another version).
+    #:
+    #: Raises:
+    #:
+    #:     `CannotDetermineSpecification`
+    #:
+    #:         if the given contents don't have any discernible
+    #:         information which could be used to guess which
+    #:         specification they identify as
+    detect = _SpecificationDetector()
+
+    def __repr__(self) -> str:
+        return f"<Specification name={self.name!r}>"
+
+    def anchors_in(self, contents: D):
+        """
+        Retrieve the anchors contained in the given document.
+        """
+        return self._anchors_in(self, contents)
+
+    def create_resource(self, contents: D) -> Resource[D]:
+        """
+        Create a resource which is interpreted using this specification.
+        """
+        return Resource(contents=contents, specification=self)
+
+
+Specification.OPAQUE = Specification(
+    name="opaque",
+    id_of=lambda contents: None,
+    subresources_of=lambda contents: [],
+    anchors_in=lambda specification, contents: [],
+    maybe_in_subresource=lambda segments, resolver, subresource: resolver,
+)
+
+
+@frozen
+class Resource(Generic[D]):
+    r"""
+    A document (deserialized JSON) with a concrete interpretation under a spec.
+
+    In other words, a Python object, along with an instance of `Specification`
+    which describes how the document interacts with referencing -- both
+    internally (how it refers to other `Resource`\ s) and externally (how it
+    should be identified such that it is referenceable by other documents).
+    """
+
+    contents: D
+    _specification: Specification[D] = field(alias="specification")
+
+    @classmethod
+    def from_contents(
+        cls,
+        contents: D,
+        default_specification: (
+            type[Specification[D]] | Specification[D]
+        ) = Specification,
+    ) -> Resource[D]:
+        """
+        Create a resource guessing which specification applies to the contents.
+
+        Raises:
+
+            `CannotDetermineSpecification`
+
+                if the given contents don't have any discernible
+                information which could be used to guess which
+                specification they identify as
+
+        """
+        specification = default_specification.detect(contents)
+        return specification.create_resource(contents=contents)
+
+    @classmethod
+    def opaque(cls, contents: D) -> Resource[D]:
+        """
+        Create an opaque `Resource` -- i.e. one with opaque specification.
+
+        See `Specification.OPAQUE` for details.
+        """
+        return Specification.OPAQUE.create_resource(contents=contents)
+
+    def id(self) -> URI | None:
+        """
+        Retrieve this resource's (specification-specific) identifier.
+        """
+        id = self._specification.id_of(self.contents)
+        if id is None:
+            return
+        return id.rstrip("#")
+
+    def subresources(self) -> Iterable[Resource[D]]:
+        """
+        Retrieve this resource's subresources.
+        """
+        return (
+            Resource.from_contents(
+                each,
+                default_specification=self._specification,
+            )
+            for each in self._specification.subresources_of(self.contents)
+        )
+
+    def anchors(self) -> Iterable[AnchorType[D]]:
+        """
+        Retrieve this resource's (specification-specific) identifier.
+        """
+        return self._specification.anchors_in(self.contents)
+
+    def pointer(self, pointer: str, resolver: Resolver[D]) -> Resolved[D]:
+        """
+        Resolve the given JSON pointer.
+
+        Raises:
+
+            `exceptions.PointerToNowhere`
+
+                if the pointer points to a location not present in the document
+
+        """
+        if not pointer:
+            return Resolved(contents=self.contents, resolver=resolver)
+
+        contents = self.contents
+        segments: list[int | str] = []
+        for segment in unquote(pointer[1:]).split("/"):
+            if isinstance(contents, Sequence):
+                segment = int(segment)
+            else:
+                segment = segment.replace("~1", "/").replace("~0", "~")
+            try:
+                contents = contents[segment]  # type: ignore[reportUnknownArgumentType]
+            except LookupError as lookup_error:
+                error = exceptions.PointerToNowhere(ref=pointer, resource=self)
+                raise error from lookup_error
+
+            segments.append(segment)
+            last = resolver
+            resolver = self._specification.maybe_in_subresource(
+                segments=segments,
+                resolver=resolver,
+                subresource=self._specification.create_resource(contents),
+            )
+            if resolver is not last:
+                segments = []
+        return Resolved(contents=contents, resolver=resolver)  # type: ignore[reportUnknownArgumentType]
+
+
+def _fail_to_retrieve(uri: URI):
+    raise exceptions.NoSuchResource(ref=uri)
+
+
+@frozen
+class Registry(Mapping[URI, Resource[D]]):
+    r"""
+    A registry of `Resource`\ s, each identified by their canonical URIs.
+
+    Registries store a collection of in-memory resources, and optionally
+    enable additional resources which may be stored elsewhere (e.g. in a
+    database, a separate set of files, over the network, etc.).
+
+    They also lazily walk their known resources, looking for subresources
+    within them. In other words, subresources contained within any added
+    resources will be retrievable via their own IDs (though this discovery of
+    subresources will be delayed until necessary).
+
+    Registries are immutable, and their methods return new instances of the
+    registry with the additional resources added to them.
+
+    The ``retrieve`` argument can be used to configure retrieval of resources
+    dynamically, either over the network, from a database, or the like.
+    Pass it a callable which will be called if any URI not present in the
+    registry is accessed. It must either return a `Resource` or else raise a
+    `NoSuchResource` exception indicating that the resource does not exist
+    even according to the retrieval logic.
+    """
+
+    _resources: HashTrieMap[URI, Resource[D]] = field(
+        default=HashTrieMap(),
+        converter=HashTrieMap.convert,  # type: ignore[reportGeneralTypeIssues]
+        alias="resources",
+    )
+    _anchors: HashTrieMap[tuple[URI, str], AnchorType[D]] = HashTrieMap()
+    _uncrawled: HashTrieSet[URI] = EMPTY_UNCRAWLED
+    _retrieve: Retrieve[D] = field(default=_fail_to_retrieve, alias="retrieve")
+
+    def __getitem__(self, uri: URI) -> Resource[D]:
+        """
+        Return the (already crawled) `Resource` identified by the given URI.
+        """
+        try:
+            return self._resources[uri.rstrip("#")]
+        except KeyError:
+            raise exceptions.NoSuchResource(ref=uri) from None
+
+    def __iter__(self) -> Iterator[URI]:
+        """
+        Iterate over all crawled URIs in the registry.
+        """
+        return iter(self._resources)
+
+    def __len__(self) -> int:
+        """
+        Count the total number of fully crawled resources in this registry.
+        """
+        return len(self._resources)
+
+    def __rmatmul__(
+        self,
+        new: Resource[D] | Iterable[Resource[D]],
+    ) -> Registry[D]:
+        """
+        Create a new registry with resource(s) added using their internal IDs.
+
+        Resources must have a internal IDs (e.g. the :kw:`$id` keyword in
+        modern JSON Schema versions), otherwise an error will be raised.
+
+        Both a single resource as well as an iterable of resources works, i.e.:
+
+            * ``resource @ registry`` or
+
+            * ``[iterable, of, multiple, resources] @ registry``
+
+        which -- again, assuming the resources have internal IDs -- is
+        equivalent to calling `Registry.with_resources` as such:
+
+        .. code:: python
+
+            registry.with_resources(
+                (resource.id(), resource) for resource in new_resources
+            )
+
+        Raises:
+
+            `NoInternalID`
+
+                if the resource(s) in fact do not have IDs
+
+        """
+        if isinstance(new, Resource):
+            new = (new,)
+
+        resources = self._resources
+        uncrawled = self._uncrawled
+        for resource in new:
+            id = resource.id()
+            if id is None:
+                raise exceptions.NoInternalID(resource=resource)
+            uncrawled = uncrawled.insert(id)
+            resources = resources.insert(id, resource)
+        return evolve(self, resources=resources, uncrawled=uncrawled)
+
+    def __repr__(self) -> str:
+        size = len(self)
+        pluralized = "resource" if size == 1 else "resources"
+        if self._uncrawled:
+            uncrawled = len(self._uncrawled)
+            if uncrawled == size:
+                summary = f"uncrawled {pluralized}"
+            else:
+                summary = f"{pluralized}, {uncrawled} uncrawled"
+        else:
+            summary = f"{pluralized}"
+        return f"<Registry ({size} {summary})>"
+
+    def get_or_retrieve(self, uri: URI) -> Retrieved[D, Resource[D]]:
+        """
+        Get a resource from the registry, crawling or retrieving if necessary.
+
+        May involve crawling to find the given URI if it is not already known,
+        so the returned object is a `Retrieved` object which contains both the
+        resource value as well as the registry which ultimately contained it.
+        """
+        resource = self._resources.get(uri)
+        if resource is not None:
+            return Retrieved(registry=self, value=resource)
+
+        registry = self.crawl()
+        resource = registry._resources.get(uri)
+        if resource is not None:
+            return Retrieved(registry=registry, value=resource)
+
+        try:
+            resource = registry._retrieve(uri)
+        except (
+            exceptions.CannotDetermineSpecification,
+            exceptions.NoSuchResource,
+        ):
+            raise
+        except Exception as error:
+            raise exceptions.Unretrievable(ref=uri) from error
+        else:
+            registry = registry.with_resource(uri, resource)
+            return Retrieved(registry=registry, value=resource)
+
+    def remove(self, uri: URI):
+        """
+        Return a registry with the resource identified by a given URI removed.
+        """
+        if uri not in self._resources:
+            raise exceptions.NoSuchResource(ref=uri)
+
+        return evolve(
+            self,
+            resources=self._resources.remove(uri),
+            uncrawled=self._uncrawled.discard(uri),
+            anchors=HashTrieMap(
+                (k, v) for k, v in self._anchors.items() if k[0] != uri
+            ),
+        )
+
+    def anchor(self, uri: URI, name: str):
+        """
+        Retrieve a given anchor from a resource which must already be crawled.
+        """
+        value = self._anchors.get((uri, name))
+        if value is not None:
+            return Retrieved(value=value, registry=self)
+
+        registry = self.crawl()
+        value = registry._anchors.get((uri, name))
+        if value is not None:
+            return Retrieved(value=value, registry=registry)
+
+        resource = self[uri]
+        canonical_uri = resource.id()
+        if canonical_uri is not None:
+            value = registry._anchors.get((canonical_uri, name))
+            if value is not None:
+                return Retrieved(value=value, registry=registry)
+
+        if "/" in name:
+            raise exceptions.InvalidAnchor(
+                ref=uri,
+                resource=resource,
+                anchor=name,
+            )
+        raise exceptions.NoSuchAnchor(ref=uri, resource=resource, anchor=name)
+
+    def contents(self, uri: URI) -> D:
+        """
+        Retrieve the (already crawled) contents identified by the given URI.
+        """
+        return self[uri].contents
+
+    def crawl(self) -> Registry[D]:
+        """
+        Crawl all added resources, discovering subresources.
+        """
+        resources = self._resources
+        anchors = self._anchors
+        uncrawled = [(uri, resources[uri]) for uri in self._uncrawled]
+        while uncrawled:
+            uri, resource = uncrawled.pop()
+
+            id = resource.id()
+            if id is not None:
+                uri = urljoin(uri, id)
+                resources = resources.insert(uri, resource)
+            for each in resource.anchors():
+                anchors = anchors.insert((uri, each.name), each)
+            uncrawled.extend((uri, each) for each in resource.subresources())
+        return evolve(
+            self,
+            resources=resources,
+            anchors=anchors,
+            uncrawled=EMPTY_UNCRAWLED,
+        )
+
+    def with_resource(self, uri: URI, resource: Resource[D]):
+        """
+        Add the given `Resource` to the registry, without crawling it.
+        """
+        return self.with_resources([(uri, resource)])
+
+    def with_resources(
+        self,
+        pairs: Iterable[tuple[URI, Resource[D]]],
+    ) -> Registry[D]:
+        r"""
+        Add the given `Resource`\ s to the registry, without crawling them.
+        """
+        resources = self._resources
+        uncrawled = self._uncrawled
+        for uri, resource in pairs:
+            # Empty fragment URIs are equivalent to URIs without the fragment.
+            # TODO: Is this true for non JSON Schema resources? Probably not.
+            uri = uri.rstrip("#")
+            uncrawled = uncrawled.insert(uri)
+            resources = resources.insert(uri, resource)
+        return evolve(self, resources=resources, uncrawled=uncrawled)
+
+    def with_contents(
+        self,
+        pairs: Iterable[tuple[URI, D]],
+        **kwargs: Any,
+    ) -> Registry[D]:
+        r"""
+        Add the given contents to the registry, autodetecting when necessary.
+        """
+        return self.with_resources(
+            (uri, Resource.from_contents(each, **kwargs))
+            for uri, each in pairs
+        )
+
+    def combine(self, *registries: Registry[D]) -> Registry[D]:
+        """
+        Combine together one or more other registries, producing a unified one.
+        """
+        if registries == (self,):
+            return self
+        resources = self._resources
+        anchors = self._anchors
+        uncrawled = self._uncrawled
+        retrieve = self._retrieve
+        for registry in registries:
+            resources = resources.update(registry._resources)
+            anchors = anchors.update(registry._anchors)
+            uncrawled = uncrawled.update(registry._uncrawled)
+
+            if registry._retrieve is not _fail_to_retrieve:  # type: ignore[reportUnnecessaryComparison] ???
+                if registry._retrieve is not retrieve is not _fail_to_retrieve:  # type: ignore[reportUnnecessaryComparison] ???
+                    raise ValueError(  # noqa: TRY003
+                        "Cannot combine registries with conflicting retrieval "
+                        "functions.",
+                    )
+                retrieve = registry._retrieve
+        return evolve(
+            self,
+            anchors=anchors,
+            resources=resources,
+            uncrawled=uncrawled,
+            retrieve=retrieve,
+        )
+
+    def resolver(self, base_uri: URI = "") -> Resolver[D]:
+        """
+        Return a `Resolver` which resolves references against this registry.
+        """
+        return Resolver(base_uri=base_uri, registry=self)
+
+    def resolver_with_root(self, resource: Resource[D]) -> Resolver[D]:
+        """
+        Return a `Resolver` with a specific root resource.
+        """
+        uri = resource.id() or ""
+        return Resolver(
+            base_uri=uri,
+            registry=self.with_resource(uri, resource),
+        )
+
+
+#: An anchor or resource.
+AnchorOrResource = TypeVar(
+    "AnchorOrResource",
+    AnchorType[Any],
+    Resource[Any],
+    default=Resource[Any],
+)
+
+
+@frozen
+class Retrieved(Generic[D, AnchorOrResource]):
+    """
+    A value retrieved from a `Registry`.
+    """
+
+    value: AnchorOrResource
+    registry: Registry[D]
+
+
+@frozen
+class Resolved(Generic[D]):
+    """
+    A reference resolved to its contents by a `Resolver`.
+    """
+
+    contents: D
+    resolver: Resolver[D]
+
+
+@frozen
+class Resolver(Generic[D]):
+    """
+    A reference resolver.
+
+    Resolvers help resolve references (including relative ones) by
+    pairing a fixed base URI with a `Registry`.
+
+    This object, under normal circumstances, is expected to be used by
+    *implementers of libraries* built on top of `referencing` (e.g. JSON Schema
+    implementations or other libraries resolving JSON references),
+    not directly by end-users populating registries or while writing
+    schemas or other resources.
+
+    References are resolved against the base URI, and the combined URI
+    is then looked up within the registry.
+
+    The process of resolving a reference may itself involve calculating
+    a *new* base URI for future reference resolution (e.g. if an
+    intermediate resource sets a new base URI), or may involve encountering
+    additional subresources and adding them to a new registry.
+    """
+
+    _base_uri: URI = field(alias="base_uri")
+    _registry: Registry[D] = field(alias="registry")
+    _previous: List[URI] = field(default=List(), repr=False, alias="previous")
+
+    def lookup(self, ref: URI) -> Resolved[D]:
+        """
+        Resolve the given reference to the resource it points to.
+
+        Raises:
+
+            `exceptions.Unresolvable`
+
+                or a subclass thereof (see below) if the reference isn't
+                resolvable
+
+            `exceptions.NoSuchAnchor`
+
+                if the reference is to a URI where a resource exists but
+                contains a plain name fragment which does not exist within
+                the resource
+
+            `exceptions.PointerToNowhere`
+
+                if the reference is to a URI where a resource exists but
+                contains a JSON pointer to a location within the resource
+                that does not exist
+
+        """
+        if ref.startswith("#"):
+            uri, fragment = self._base_uri, ref[1:]
+        else:
+            uri, fragment = urldefrag(urljoin(self._base_uri, ref))
+        try:
+            retrieved = self._registry.get_or_retrieve(uri)
+        except exceptions.NoSuchResource:
+            raise exceptions.Unresolvable(ref=ref) from None
+        except exceptions.Unretrievable as error:
+            raise exceptions.Unresolvable(ref=ref) from error
+
+        if fragment.startswith("/"):
+            resolver = self._evolve(registry=retrieved.registry, base_uri=uri)
+            return retrieved.value.pointer(pointer=fragment, resolver=resolver)
+
+        if fragment:
+            retrieved = retrieved.registry.anchor(uri, fragment)
+            resolver = self._evolve(registry=retrieved.registry, base_uri=uri)
+            return retrieved.value.resolve(resolver=resolver)
+
+        resolver = self._evolve(registry=retrieved.registry, base_uri=uri)
+        return Resolved(contents=retrieved.value.contents, resolver=resolver)
+
+    def in_subresource(self, subresource: Resource[D]) -> Resolver[D]:
+        """
+        Create a resolver for a subresource (which may have a new base URI).
+        """
+        id = subresource.id()
+        if id is None:
+            return self
+        return evolve(self, base_uri=urljoin(self._base_uri, id))
+
+    def dynamic_scope(self) -> Iterable[tuple[URI, Registry[D]]]:
+        """
+        In specs with such a notion, return the URIs in the dynamic scope.
+        """
+        for uri in self._previous:
+            yield uri, self._registry
+
+    def _evolve(self, base_uri: URI, **kwargs: Any):
+        """
+        Evolve, appending to the dynamic scope.
+        """
+        previous = self._previous
+        if self._base_uri and (not previous or base_uri != self._base_uri):
+            previous = previous.push_front(self._base_uri)
+        return evolve(self, base_uri=base_uri, previous=previous, **kwargs)
+
+
+@frozen
+class Anchor(Generic[D]):
+    """
+    A simple anchor in a `Resource`.
+    """
+
+    name: str
+    resource: Resource[D]
+
+    def resolve(self, resolver: Resolver[D]):
+        """
+        Return the resource for this anchor.
+        """
+        return Resolved(contents=self.resource.contents, resolver=resolver)