diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/referencing')
15 files changed, 3402 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/referencing/__init__.py b/.venv/lib/python3.12/site-packages/referencing/__init__.py new file mode 100644 index 00000000..e09207d7 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/referencing/__init__.py @@ -0,0 +1,7 @@ +""" +Cross-specification, implementation-agnostic JSON referencing. +""" + +from referencing._core import Anchor, Registry, Resource, Specification + +__all__ = ["Anchor", "Registry", "Resource", "Specification"] diff --git a/.venv/lib/python3.12/site-packages/referencing/_attrs.py b/.venv/lib/python3.12/site-packages/referencing/_attrs.py new file mode 100644 index 00000000..ae85b865 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/referencing/_attrs.py @@ -0,0 +1,31 @@ +from __future__ import annotations + +from typing import NoReturn, TypeVar + +from attrs import define as _define, frozen as _frozen + +_T = TypeVar("_T") + + +def define(cls: type[_T]) -> type[_T]: # pragma: no cover + cls.__init_subclass__ = _do_not_subclass + return _define(cls) + + +def frozen(cls: type[_T]) -> type[_T]: + cls.__init_subclass__ = _do_not_subclass + return _frozen(cls) + + +class UnsupportedSubclassing(Exception): + def __str__(self): + return ( + "Subclassing is not part of referencing's public API. " + "If no other suitable API exists for what you're trying to do, " + "feel free to file an issue asking for one." + ) + + +@staticmethod +def _do_not_subclass() -> NoReturn: # pragma: no cover + raise UnsupportedSubclassing() diff --git a/.venv/lib/python3.12/site-packages/referencing/_attrs.pyi b/.venv/lib/python3.12/site-packages/referencing/_attrs.pyi new file mode 100644 index 00000000..278e4109 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/referencing/_attrs.pyi @@ -0,0 +1,20 @@ +from typing import Any, Callable, TypeVar, Union + +from attr import attrib, field + +class UnsupportedSubclassing(Exception): ... + +_T = TypeVar("_T") + +def __dataclass_transform__( + *, + frozen_default: bool = False, + field_descriptors: tuple[Union[type, Callable[..., Any]], ...] = ..., +) -> Callable[[_T], _T]: ... +@__dataclass_transform__(field_descriptors=(attrib, field)) +def define(cls: type[_T]) -> type[_T]: ... +@__dataclass_transform__( + frozen_default=True, + field_descriptors=(attrib, field), +) +def frozen(cls: type[_T]) -> type[_T]: ... diff --git a/.venv/lib/python3.12/site-packages/referencing/_core.py b/.venv/lib/python3.12/site-packages/referencing/_core.py new file mode 100644 index 00000000..ec2d51bd --- /dev/null +++ b/.venv/lib/python3.12/site-packages/referencing/_core.py @@ -0,0 +1,739 @@ +from __future__ import annotations + +from collections.abc import Iterable, Iterator, Sequence +from enum import Enum +from typing import Any, Callable, ClassVar, Generic, Protocol +from urllib.parse import unquote, urldefrag, urljoin + +from attrs import evolve, field +from rpds import HashTrieMap, HashTrieSet, List + +try: + from typing_extensions import TypeVar +except ImportError: # pragma: no cover + from typing import TypeVar + +from referencing import exceptions +from referencing._attrs import frozen +from referencing.typing import URI, Anchor as AnchorType, D, Mapping, Retrieve + +EMPTY_UNCRAWLED: HashTrieSet[URI] = HashTrieSet() +EMPTY_PREVIOUS_RESOLVERS: List[URI] = List() + + +class _Unset(Enum): + """ + What sillyness... + """ + + SENTINEL = 1 + + +_UNSET = _Unset.SENTINEL + + +class _MaybeInSubresource(Protocol[D]): + def __call__( + self, + segments: Sequence[int | str], + resolver: Resolver[D], + subresource: Resource[D], + ) -> Resolver[D]: ... + + +def _detect_or_error(contents: D) -> Specification[D]: + if not isinstance(contents, Mapping): + raise exceptions.CannotDetermineSpecification(contents) + + jsonschema_dialect_id = contents.get("$schema") # type: ignore[reportUnknownMemberType] + if not isinstance(jsonschema_dialect_id, str): + raise exceptions.CannotDetermineSpecification(contents) + + from referencing.jsonschema import specification_with + + return specification_with(jsonschema_dialect_id) + + +def _detect_or_default( + default: Specification[D], +) -> Callable[[D], Specification[D]]: + def _detect(contents: D) -> Specification[D]: + if not isinstance(contents, Mapping): + return default + + jsonschema_dialect_id = contents.get("$schema") # type: ignore[reportUnknownMemberType] + if jsonschema_dialect_id is None: + return default + + from referencing.jsonschema import specification_with + + return specification_with( + jsonschema_dialect_id, # type: ignore[reportUnknownArgumentType] + default=default, + ) + + return _detect + + +class _SpecificationDetector: + def __get__( + self, + instance: Specification[D] | None, + cls: type[Specification[D]], + ) -> Callable[[D], Specification[D]]: + if instance is None: + return _detect_or_error + else: + return _detect_or_default(instance) + + +@frozen +class Specification(Generic[D]): + """ + A specification which defines referencing behavior. + + The various methods of a `Specification` allow for varying referencing + behavior across JSON Schema specification versions, etc. + """ + + #: A short human-readable name for the specification, used for debugging. + name: str + + #: Find the ID of a given document. + id_of: Callable[[D], URI | None] + + #: Retrieve the subresources of the given document (without traversing into + #: the subresources themselves). + subresources_of: Callable[[D], Iterable[D]] + + #: While resolving a JSON pointer, conditionally enter a subresource + #: (if e.g. we have just entered a keyword whose value is a subresource) + maybe_in_subresource: _MaybeInSubresource[D] + + #: Retrieve the anchors contained in the given document. + _anchors_in: Callable[ + [Specification[D], D], + Iterable[AnchorType[D]], + ] = field(alias="anchors_in") + + #: An opaque specification where resources have no subresources + #: nor internal identifiers. + OPAQUE: ClassVar[Specification[Any]] + + #: Attempt to discern which specification applies to the given contents. + #: + #: May be called either as an instance method or as a class method, with + #: slightly different behavior in the following case: + #: + #: Recall that not all contents contains enough internal information about + #: which specification it is written for -- the JSON Schema ``{}``, + #: for instance, is valid under many different dialects and may be + #: interpreted as any one of them. + #: + #: When this method is used as an instance method (i.e. called on a + #: specific specification), that specification is used as the default + #: if the given contents are unidentifiable. + #: + #: On the other hand when called as a class method, an error is raised. + #: + #: To reiterate, ``DRAFT202012.detect({})`` will return ``DRAFT202012`` + #: whereas the class method ``Specification.detect({})`` will raise an + #: error. + #: + #: (Note that of course ``DRAFT202012.detect(...)`` may return some other + #: specification when given a schema which *does* identify as being for + #: another version). + #: + #: Raises: + #: + #: `CannotDetermineSpecification` + #: + #: if the given contents don't have any discernible + #: information which could be used to guess which + #: specification they identify as + detect = _SpecificationDetector() + + def __repr__(self) -> str: + return f"<Specification name={self.name!r}>" + + def anchors_in(self, contents: D): + """ + Retrieve the anchors contained in the given document. + """ + return self._anchors_in(self, contents) + + def create_resource(self, contents: D) -> Resource[D]: + """ + Create a resource which is interpreted using this specification. + """ + return Resource(contents=contents, specification=self) + + +Specification.OPAQUE = Specification( + name="opaque", + id_of=lambda contents: None, + subresources_of=lambda contents: [], + anchors_in=lambda specification, contents: [], + maybe_in_subresource=lambda segments, resolver, subresource: resolver, +) + + +@frozen +class Resource(Generic[D]): + r""" + A document (deserialized JSON) with a concrete interpretation under a spec. + + In other words, a Python object, along with an instance of `Specification` + which describes how the document interacts with referencing -- both + internally (how it refers to other `Resource`\ s) and externally (how it + should be identified such that it is referenceable by other documents). + """ + + contents: D + _specification: Specification[D] = field(alias="specification") + + @classmethod + def from_contents( + cls, + contents: D, + default_specification: ( + type[Specification[D]] | Specification[D] + ) = Specification, + ) -> Resource[D]: + """ + Create a resource guessing which specification applies to the contents. + + Raises: + + `CannotDetermineSpecification` + + if the given contents don't have any discernible + information which could be used to guess which + specification they identify as + + """ + specification = default_specification.detect(contents) + return specification.create_resource(contents=contents) + + @classmethod + def opaque(cls, contents: D) -> Resource[D]: + """ + Create an opaque `Resource` -- i.e. one with opaque specification. + + See `Specification.OPAQUE` for details. + """ + return Specification.OPAQUE.create_resource(contents=contents) + + def id(self) -> URI | None: + """ + Retrieve this resource's (specification-specific) identifier. + """ + id = self._specification.id_of(self.contents) + if id is None: + return + return id.rstrip("#") + + def subresources(self) -> Iterable[Resource[D]]: + """ + Retrieve this resource's subresources. + """ + return ( + Resource.from_contents( + each, + default_specification=self._specification, + ) + for each in self._specification.subresources_of(self.contents) + ) + + def anchors(self) -> Iterable[AnchorType[D]]: + """ + Retrieve this resource's (specification-specific) identifier. + """ + return self._specification.anchors_in(self.contents) + + def pointer(self, pointer: str, resolver: Resolver[D]) -> Resolved[D]: + """ + Resolve the given JSON pointer. + + Raises: + + `exceptions.PointerToNowhere` + + if the pointer points to a location not present in the document + + """ + if not pointer: + return Resolved(contents=self.contents, resolver=resolver) + + contents = self.contents + segments: list[int | str] = [] + for segment in unquote(pointer[1:]).split("/"): + if isinstance(contents, Sequence): + segment = int(segment) + else: + segment = segment.replace("~1", "/").replace("~0", "~") + try: + contents = contents[segment] # type: ignore[reportUnknownArgumentType] + except LookupError as lookup_error: + error = exceptions.PointerToNowhere(ref=pointer, resource=self) + raise error from lookup_error + + segments.append(segment) + last = resolver + resolver = self._specification.maybe_in_subresource( + segments=segments, + resolver=resolver, + subresource=self._specification.create_resource(contents), + ) + if resolver is not last: + segments = [] + return Resolved(contents=contents, resolver=resolver) # type: ignore[reportUnknownArgumentType] + + +def _fail_to_retrieve(uri: URI): + raise exceptions.NoSuchResource(ref=uri) + + +@frozen +class Registry(Mapping[URI, Resource[D]]): + r""" + A registry of `Resource`\ s, each identified by their canonical URIs. + + Registries store a collection of in-memory resources, and optionally + enable additional resources which may be stored elsewhere (e.g. in a + database, a separate set of files, over the network, etc.). + + They also lazily walk their known resources, looking for subresources + within them. In other words, subresources contained within any added + resources will be retrievable via their own IDs (though this discovery of + subresources will be delayed until necessary). + + Registries are immutable, and their methods return new instances of the + registry with the additional resources added to them. + + The ``retrieve`` argument can be used to configure retrieval of resources + dynamically, either over the network, from a database, or the like. + Pass it a callable which will be called if any URI not present in the + registry is accessed. It must either return a `Resource` or else raise a + `NoSuchResource` exception indicating that the resource does not exist + even according to the retrieval logic. + """ + + _resources: HashTrieMap[URI, Resource[D]] = field( + default=HashTrieMap(), + converter=HashTrieMap.convert, # type: ignore[reportGeneralTypeIssues] + alias="resources", + ) + _anchors: HashTrieMap[tuple[URI, str], AnchorType[D]] = HashTrieMap() + _uncrawled: HashTrieSet[URI] = EMPTY_UNCRAWLED + _retrieve: Retrieve[D] = field(default=_fail_to_retrieve, alias="retrieve") + + def __getitem__(self, uri: URI) -> Resource[D]: + """ + Return the (already crawled) `Resource` identified by the given URI. + """ + try: + return self._resources[uri.rstrip("#")] + except KeyError: + raise exceptions.NoSuchResource(ref=uri) from None + + def __iter__(self) -> Iterator[URI]: + """ + Iterate over all crawled URIs in the registry. + """ + return iter(self._resources) + + def __len__(self) -> int: + """ + Count the total number of fully crawled resources in this registry. + """ + return len(self._resources) + + def __rmatmul__( + self, + new: Resource[D] | Iterable[Resource[D]], + ) -> Registry[D]: + """ + Create a new registry with resource(s) added using their internal IDs. + + Resources must have a internal IDs (e.g. the :kw:`$id` keyword in + modern JSON Schema versions), otherwise an error will be raised. + + Both a single resource as well as an iterable of resources works, i.e.: + + * ``resource @ registry`` or + + * ``[iterable, of, multiple, resources] @ registry`` + + which -- again, assuming the resources have internal IDs -- is + equivalent to calling `Registry.with_resources` as such: + + .. code:: python + + registry.with_resources( + (resource.id(), resource) for resource in new_resources + ) + + Raises: + + `NoInternalID` + + if the resource(s) in fact do not have IDs + + """ + if isinstance(new, Resource): + new = (new,) + + resources = self._resources + uncrawled = self._uncrawled + for resource in new: + id = resource.id() + if id is None: + raise exceptions.NoInternalID(resource=resource) + uncrawled = uncrawled.insert(id) + resources = resources.insert(id, resource) + return evolve(self, resources=resources, uncrawled=uncrawled) + + def __repr__(self) -> str: + size = len(self) + pluralized = "resource" if size == 1 else "resources" + if self._uncrawled: + uncrawled = len(self._uncrawled) + if uncrawled == size: + summary = f"uncrawled {pluralized}" + else: + summary = f"{pluralized}, {uncrawled} uncrawled" + else: + summary = f"{pluralized}" + return f"<Registry ({size} {summary})>" + + def get_or_retrieve(self, uri: URI) -> Retrieved[D, Resource[D]]: + """ + Get a resource from the registry, crawling or retrieving if necessary. + + May involve crawling to find the given URI if it is not already known, + so the returned object is a `Retrieved` object which contains both the + resource value as well as the registry which ultimately contained it. + """ + resource = self._resources.get(uri) + if resource is not None: + return Retrieved(registry=self, value=resource) + + registry = self.crawl() + resource = registry._resources.get(uri) + if resource is not None: + return Retrieved(registry=registry, value=resource) + + try: + resource = registry._retrieve(uri) + except ( + exceptions.CannotDetermineSpecification, + exceptions.NoSuchResource, + ): + raise + except Exception as error: + raise exceptions.Unretrievable(ref=uri) from error + else: + registry = registry.with_resource(uri, resource) + return Retrieved(registry=registry, value=resource) + + def remove(self, uri: URI): + """ + Return a registry with the resource identified by a given URI removed. + """ + if uri not in self._resources: + raise exceptions.NoSuchResource(ref=uri) + + return evolve( + self, + resources=self._resources.remove(uri), + uncrawled=self._uncrawled.discard(uri), + anchors=HashTrieMap( + (k, v) for k, v in self._anchors.items() if k[0] != uri + ), + ) + + def anchor(self, uri: URI, name: str): + """ + Retrieve a given anchor from a resource which must already be crawled. + """ + value = self._anchors.get((uri, name)) + if value is not None: + return Retrieved(value=value, registry=self) + + registry = self.crawl() + value = registry._anchors.get((uri, name)) + if value is not None: + return Retrieved(value=value, registry=registry) + + resource = self[uri] + canonical_uri = resource.id() + if canonical_uri is not None: + value = registry._anchors.get((canonical_uri, name)) + if value is not None: + return Retrieved(value=value, registry=registry) + + if "/" in name: + raise exceptions.InvalidAnchor( + ref=uri, + resource=resource, + anchor=name, + ) + raise exceptions.NoSuchAnchor(ref=uri, resource=resource, anchor=name) + + def contents(self, uri: URI) -> D: + """ + Retrieve the (already crawled) contents identified by the given URI. + """ + return self[uri].contents + + def crawl(self) -> Registry[D]: + """ + Crawl all added resources, discovering subresources. + """ + resources = self._resources + anchors = self._anchors + uncrawled = [(uri, resources[uri]) for uri in self._uncrawled] + while uncrawled: + uri, resource = uncrawled.pop() + + id = resource.id() + if id is not None: + uri = urljoin(uri, id) + resources = resources.insert(uri, resource) + for each in resource.anchors(): + anchors = anchors.insert((uri, each.name), each) + uncrawled.extend((uri, each) for each in resource.subresources()) + return evolve( + self, + resources=resources, + anchors=anchors, + uncrawled=EMPTY_UNCRAWLED, + ) + + def with_resource(self, uri: URI, resource: Resource[D]): + """ + Add the given `Resource` to the registry, without crawling it. + """ + return self.with_resources([(uri, resource)]) + + def with_resources( + self, + pairs: Iterable[tuple[URI, Resource[D]]], + ) -> Registry[D]: + r""" + Add the given `Resource`\ s to the registry, without crawling them. + """ + resources = self._resources + uncrawled = self._uncrawled + for uri, resource in pairs: + # Empty fragment URIs are equivalent to URIs without the fragment. + # TODO: Is this true for non JSON Schema resources? Probably not. + uri = uri.rstrip("#") + uncrawled = uncrawled.insert(uri) + resources = resources.insert(uri, resource) + return evolve(self, resources=resources, uncrawled=uncrawled) + + def with_contents( + self, + pairs: Iterable[tuple[URI, D]], + **kwargs: Any, + ) -> Registry[D]: + r""" + Add the given contents to the registry, autodetecting when necessary. + """ + return self.with_resources( + (uri, Resource.from_contents(each, **kwargs)) + for uri, each in pairs + ) + + def combine(self, *registries: Registry[D]) -> Registry[D]: + """ + Combine together one or more other registries, producing a unified one. + """ + if registries == (self,): + return self + resources = self._resources + anchors = self._anchors + uncrawled = self._uncrawled + retrieve = self._retrieve + for registry in registries: + resources = resources.update(registry._resources) + anchors = anchors.update(registry._anchors) + uncrawled = uncrawled.update(registry._uncrawled) + + if registry._retrieve is not _fail_to_retrieve: # type: ignore[reportUnnecessaryComparison] ??? + if registry._retrieve is not retrieve is not _fail_to_retrieve: # type: ignore[reportUnnecessaryComparison] ??? + raise ValueError( # noqa: TRY003 + "Cannot combine registries with conflicting retrieval " + "functions.", + ) + retrieve = registry._retrieve + return evolve( + self, + anchors=anchors, + resources=resources, + uncrawled=uncrawled, + retrieve=retrieve, + ) + + def resolver(self, base_uri: URI = "") -> Resolver[D]: + """ + Return a `Resolver` which resolves references against this registry. + """ + return Resolver(base_uri=base_uri, registry=self) + + def resolver_with_root(self, resource: Resource[D]) -> Resolver[D]: + """ + Return a `Resolver` with a specific root resource. + """ + uri = resource.id() or "" + return Resolver( + base_uri=uri, + registry=self.with_resource(uri, resource), + ) + + +#: An anchor or resource. +AnchorOrResource = TypeVar( + "AnchorOrResource", + AnchorType[Any], + Resource[Any], + default=Resource[Any], +) + + +@frozen +class Retrieved(Generic[D, AnchorOrResource]): + """ + A value retrieved from a `Registry`. + """ + + value: AnchorOrResource + registry: Registry[D] + + +@frozen +class Resolved(Generic[D]): + """ + A reference resolved to its contents by a `Resolver`. + """ + + contents: D + resolver: Resolver[D] + + +@frozen +class Resolver(Generic[D]): + """ + A reference resolver. + + Resolvers help resolve references (including relative ones) by + pairing a fixed base URI with a `Registry`. + + This object, under normal circumstances, is expected to be used by + *implementers of libraries* built on top of `referencing` (e.g. JSON Schema + implementations or other libraries resolving JSON references), + not directly by end-users populating registries or while writing + schemas or other resources. + + References are resolved against the base URI, and the combined URI + is then looked up within the registry. + + The process of resolving a reference may itself involve calculating + a *new* base URI for future reference resolution (e.g. if an + intermediate resource sets a new base URI), or may involve encountering + additional subresources and adding them to a new registry. + """ + + _base_uri: URI = field(alias="base_uri") + _registry: Registry[D] = field(alias="registry") + _previous: List[URI] = field(default=List(), repr=False, alias="previous") + + def lookup(self, ref: URI) -> Resolved[D]: + """ + Resolve the given reference to the resource it points to. + + Raises: + + `exceptions.Unresolvable` + + or a subclass thereof (see below) if the reference isn't + resolvable + + `exceptions.NoSuchAnchor` + + if the reference is to a URI where a resource exists but + contains a plain name fragment which does not exist within + the resource + + `exceptions.PointerToNowhere` + + if the reference is to a URI where a resource exists but + contains a JSON pointer to a location within the resource + that does not exist + + """ + if ref.startswith("#"): + uri, fragment = self._base_uri, ref[1:] + else: + uri, fragment = urldefrag(urljoin(self._base_uri, ref)) + try: + retrieved = self._registry.get_or_retrieve(uri) + except exceptions.NoSuchResource: + raise exceptions.Unresolvable(ref=ref) from None + except exceptions.Unretrievable as error: + raise exceptions.Unresolvable(ref=ref) from error + + if fragment.startswith("/"): + resolver = self._evolve(registry=retrieved.registry, base_uri=uri) + return retrieved.value.pointer(pointer=fragment, resolver=resolver) + + if fragment: + retrieved = retrieved.registry.anchor(uri, fragment) + resolver = self._evolve(registry=retrieved.registry, base_uri=uri) + return retrieved.value.resolve(resolver=resolver) + + resolver = self._evolve(registry=retrieved.registry, base_uri=uri) + return Resolved(contents=retrieved.value.contents, resolver=resolver) + + def in_subresource(self, subresource: Resource[D]) -> Resolver[D]: + """ + Create a resolver for a subresource (which may have a new base URI). + """ + id = subresource.id() + if id is None: + return self + return evolve(self, base_uri=urljoin(self._base_uri, id)) + + def dynamic_scope(self) -> Iterable[tuple[URI, Registry[D]]]: + """ + In specs with such a notion, return the URIs in the dynamic scope. + """ + for uri in self._previous: + yield uri, self._registry + + def _evolve(self, base_uri: URI, **kwargs: Any): + """ + Evolve, appending to the dynamic scope. + """ + previous = self._previous + if self._base_uri and (not previous or base_uri != self._base_uri): + previous = previous.push_front(self._base_uri) + return evolve(self, base_uri=base_uri, previous=previous, **kwargs) + + +@frozen +class Anchor(Generic[D]): + """ + A simple anchor in a `Resource`. + """ + + name: str + resource: Resource[D] + + def resolve(self, resolver: Resolver[D]): + """ + Return the resource for this anchor. + """ + return Resolved(contents=self.resource.contents, resolver=resolver) diff --git a/.venv/lib/python3.12/site-packages/referencing/exceptions.py b/.venv/lib/python3.12/site-packages/referencing/exceptions.py new file mode 100644 index 00000000..3267fc70 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/referencing/exceptions.py @@ -0,0 +1,165 @@ +""" +Errors, oh no! +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +import attrs + +from referencing._attrs import frozen + +if TYPE_CHECKING: + from referencing import Resource + from referencing.typing import URI + + +@frozen +class NoSuchResource(KeyError): + """ + The given URI is not present in a registry. + + Unlike most exceptions, this class *is* intended to be publicly + instantiable and *is* part of the public API of the package. + """ + + ref: URI + + def __eq__(self, other: object) -> bool: + if self.__class__ is not other.__class__: + return NotImplemented + return attrs.astuple(self) == attrs.astuple(other) + + def __hash__(self) -> int: + return hash(attrs.astuple(self)) + + +@frozen +class NoInternalID(Exception): + """ + A resource has no internal ID, but one is needed. + + E.g. in modern JSON Schema drafts, this is the :kw:`$id` keyword. + + One might be needed if a resource was to-be added to a registry but no + other URI is available, and the resource doesn't declare its canonical URI. + """ + + resource: Resource[Any] + + def __eq__(self, other: object) -> bool: + if self.__class__ is not other.__class__: + return NotImplemented + return attrs.astuple(self) == attrs.astuple(other) + + def __hash__(self) -> int: + return hash(attrs.astuple(self)) + + +@frozen +class Unretrievable(KeyError): + """ + The given URI is not present in a registry, and retrieving it failed. + """ + + ref: URI + + def __eq__(self, other: object) -> bool: + if self.__class__ is not other.__class__: + return NotImplemented + return attrs.astuple(self) == attrs.astuple(other) + + def __hash__(self) -> int: + return hash(attrs.astuple(self)) + + +@frozen +class CannotDetermineSpecification(Exception): + """ + Attempting to detect the appropriate `Specification` failed. + + This happens if no discernible information is found in the contents of the + new resource which would help identify it. + """ + + contents: Any + + def __eq__(self, other: object) -> bool: + if self.__class__ is not other.__class__: + return NotImplemented + return attrs.astuple(self) == attrs.astuple(other) + + def __hash__(self) -> int: + return hash(attrs.astuple(self)) + + +@attrs.frozen # Because here we allow subclassing below. +class Unresolvable(Exception): + """ + A reference was unresolvable. + """ + + ref: URI + + def __eq__(self, other: object) -> bool: + if self.__class__ is not other.__class__: + return NotImplemented + return attrs.astuple(self) == attrs.astuple(other) + + def __hash__(self) -> int: + return hash(attrs.astuple(self)) + + +@frozen +class PointerToNowhere(Unresolvable): + """ + A JSON Pointer leads to a part of a document that does not exist. + """ + + resource: Resource[Any] + + def __str__(self) -> str: + msg = f"{self.ref!r} does not exist within {self.resource.contents!r}" + if self.ref == "/": + msg += ( + ". The pointer '/' is a valid JSON Pointer but it points to " + "an empty string property ''. If you intended to point " + "to the entire resource, you should use '#'." + ) + return msg + + +@frozen +class NoSuchAnchor(Unresolvable): + """ + An anchor does not exist within a particular resource. + """ + + resource: Resource[Any] + anchor: str + + def __str__(self) -> str: + return ( + f"{self.anchor!r} does not exist within {self.resource.contents!r}" + ) + + +@frozen +class InvalidAnchor(Unresolvable): + """ + An anchor which could never exist in a resource was dereferenced. + + It is somehow syntactically invalid. + """ + + resource: Resource[Any] + anchor: str + + def __str__(self) -> str: + return ( + f"'#{self.anchor}' is not a valid anchor, neither as a " + "plain name anchor nor as a JSON Pointer. You may have intended " + f"to use '#/{self.anchor}', as the slash is required *before each " + "segment* of a JSON pointer." + ) diff --git a/.venv/lib/python3.12/site-packages/referencing/jsonschema.py b/.venv/lib/python3.12/site-packages/referencing/jsonschema.py new file mode 100644 index 00000000..169e109d --- /dev/null +++ b/.venv/lib/python3.12/site-packages/referencing/jsonschema.py @@ -0,0 +1,642 @@ +""" +Referencing implementations for JSON Schema specs (historic & current). +""" + +from __future__ import annotations + +from collections.abc import Iterable, Sequence, Set +from typing import Any, Union + +from referencing import Anchor, Registry, Resource, Specification, exceptions +from referencing._attrs import frozen +from referencing._core import ( + _UNSET, # type: ignore[reportPrivateUsage] + Resolved as _Resolved, + Resolver as _Resolver, + _Unset, # type: ignore[reportPrivateUsage] +) +from referencing.typing import URI, Anchor as AnchorType, Mapping + +#: A JSON Schema which is a JSON object +ObjectSchema = Mapping[str, Any] + +#: A JSON Schema of any kind +Schema = Union[bool, ObjectSchema] + +#: A Resource whose contents are JSON Schemas +SchemaResource = Resource[Schema] + +#: A JSON Schema Registry +SchemaRegistry = Registry[Schema] + +#: The empty JSON Schema Registry +EMPTY_REGISTRY: SchemaRegistry = Registry() + + +@frozen +class UnknownDialect(Exception): + """ + A dialect identifier was found for a dialect unknown by this library. + + If it's a custom ("unofficial") dialect, be sure you've registered it. + """ + + uri: URI + + +def _dollar_id(contents: Schema) -> URI | None: + if isinstance(contents, bool): + return + return contents.get("$id") + + +def _legacy_dollar_id(contents: Schema) -> URI | None: + if isinstance(contents, bool) or "$ref" in contents: + return + id = contents.get("$id") + if id is not None and not id.startswith("#"): + return id + + +def _legacy_id(contents: ObjectSchema) -> URI | None: + if "$ref" in contents: + return + id = contents.get("id") + if id is not None and not id.startswith("#"): + return id + + +def _anchor( + specification: Specification[Schema], + contents: Schema, +) -> Iterable[AnchorType[Schema]]: + if isinstance(contents, bool): + return + anchor = contents.get("$anchor") + if anchor is not None: + yield Anchor( + name=anchor, + resource=specification.create_resource(contents), + ) + + dynamic_anchor = contents.get("$dynamicAnchor") + if dynamic_anchor is not None: + yield DynamicAnchor( + name=dynamic_anchor, + resource=specification.create_resource(contents), + ) + + +def _anchor_2019( + specification: Specification[Schema], + contents: Schema, +) -> Iterable[Anchor[Schema]]: + if isinstance(contents, bool): + return [] + anchor = contents.get("$anchor") + if anchor is None: + return [] + return [ + Anchor( + name=anchor, + resource=specification.create_resource(contents), + ), + ] + + +def _legacy_anchor_in_dollar_id( + specification: Specification[Schema], + contents: Schema, +) -> Iterable[Anchor[Schema]]: + if isinstance(contents, bool): + return [] + id = contents.get("$id", "") + if not id.startswith("#"): + return [] + return [ + Anchor( + name=id[1:], + resource=specification.create_resource(contents), + ), + ] + + +def _legacy_anchor_in_id( + specification: Specification[ObjectSchema], + contents: ObjectSchema, +) -> Iterable[Anchor[ObjectSchema]]: + id = contents.get("id", "") + if not id.startswith("#"): + return [] + return [ + Anchor( + name=id[1:], + resource=specification.create_resource(contents), + ), + ] + + +def _subresources_of( + in_value: Set[str] = frozenset(), + in_subvalues: Set[str] = frozenset(), + in_subarray: Set[str] = frozenset(), +): + """ + Create a callable returning JSON Schema specification-style subschemas. + + Relies on specifying the set of keywords containing subschemas in their + values, in a subobject's values, or in a subarray. + """ + + def subresources_of(contents: Schema) -> Iterable[ObjectSchema]: + if isinstance(contents, bool): + return + for each in in_value: + if each in contents: + yield contents[each] + for each in in_subarray: + if each in contents: + yield from contents[each] + for each in in_subvalues: + if each in contents: + yield from contents[each].values() + + return subresources_of + + +def _subresources_of_with_crazy_items( + in_value: Set[str] = frozenset(), + in_subvalues: Set[str] = frozenset(), + in_subarray: Set[str] = frozenset(), +): + """ + Specifically handle older drafts where there are some funky keywords. + """ + + def subresources_of(contents: Schema) -> Iterable[ObjectSchema]: + if isinstance(contents, bool): + return + for each in in_value: + if each in contents: + yield contents[each] + for each in in_subarray: + if each in contents: + yield from contents[each] + for each in in_subvalues: + if each in contents: + yield from contents[each].values() + + items = contents.get("items") + if items is not None: + if isinstance(items, Sequence): + yield from items + else: + yield items + + return subresources_of + + +def _subresources_of_with_crazy_items_dependencies( + in_value: Set[str] = frozenset(), + in_subvalues: Set[str] = frozenset(), + in_subarray: Set[str] = frozenset(), +): + """ + Specifically handle older drafts where there are some funky keywords. + """ + + def subresources_of(contents: Schema) -> Iterable[ObjectSchema]: + if isinstance(contents, bool): + return + for each in in_value: + if each in contents: + yield contents[each] + for each in in_subarray: + if each in contents: + yield from contents[each] + for each in in_subvalues: + if each in contents: + yield from contents[each].values() + + items = contents.get("items") + if items is not None: + if isinstance(items, Sequence): + yield from items + else: + yield items + dependencies = contents.get("dependencies") + if dependencies is not None: + values = iter(dependencies.values()) + value = next(values, None) + if isinstance(value, Mapping): + yield value + yield from values + + return subresources_of + + +def _subresources_of_with_crazy_aP_items_dependencies( + in_value: Set[str] = frozenset(), + in_subvalues: Set[str] = frozenset(), + in_subarray: Set[str] = frozenset(), +): + """ + Specifically handle even older drafts where there are some funky keywords. + """ + + def subresources_of(contents: ObjectSchema) -> Iterable[ObjectSchema]: + for each in in_value: + if each in contents: + yield contents[each] + for each in in_subarray: + if each in contents: + yield from contents[each] + for each in in_subvalues: + if each in contents: + yield from contents[each].values() + + items = contents.get("items") + if items is not None: + if isinstance(items, Sequence): + yield from items + else: + yield items + dependencies = contents.get("dependencies") + if dependencies is not None: + values = iter(dependencies.values()) + value = next(values, None) + if isinstance(value, Mapping): + yield value + yield from values + + for each in "additionalItems", "additionalProperties": + value = contents.get(each) + if isinstance(value, Mapping): + yield value + + return subresources_of + + +def _maybe_in_subresource( + in_value: Set[str] = frozenset(), + in_subvalues: Set[str] = frozenset(), + in_subarray: Set[str] = frozenset(), +): + in_child = in_subvalues | in_subarray + + def maybe_in_subresource( + segments: Sequence[int | str], + resolver: _Resolver[Any], + subresource: Resource[Any], + ) -> _Resolver[Any]: + _segments = iter(segments) + for segment in _segments: + if segment not in in_value and ( + segment not in in_child or next(_segments, None) is None + ): + return resolver + return resolver.in_subresource(subresource) + + return maybe_in_subresource + + +def _maybe_in_subresource_crazy_items( + in_value: Set[str] = frozenset(), + in_subvalues: Set[str] = frozenset(), + in_subarray: Set[str] = frozenset(), +): + in_child = in_subvalues | in_subarray + + def maybe_in_subresource( + segments: Sequence[int | str], + resolver: _Resolver[Any], + subresource: Resource[Any], + ) -> _Resolver[Any]: + _segments = iter(segments) + for segment in _segments: + if segment == "items" and isinstance( + subresource.contents, + Mapping, + ): + return resolver.in_subresource(subresource) + if segment not in in_value and ( + segment not in in_child or next(_segments, None) is None + ): + return resolver + return resolver.in_subresource(subresource) + + return maybe_in_subresource + + +def _maybe_in_subresource_crazy_items_dependencies( + in_value: Set[str] = frozenset(), + in_subvalues: Set[str] = frozenset(), + in_subarray: Set[str] = frozenset(), +): + in_child = in_subvalues | in_subarray + + def maybe_in_subresource( + segments: Sequence[int | str], + resolver: _Resolver[Any], + subresource: Resource[Any], + ) -> _Resolver[Any]: + _segments = iter(segments) + for segment in _segments: + if segment in {"items", "dependencies"} and isinstance( + subresource.contents, + Mapping, + ): + return resolver.in_subresource(subresource) + if segment not in in_value and ( + segment not in in_child or next(_segments, None) is None + ): + return resolver + return resolver.in_subresource(subresource) + + return maybe_in_subresource + + +#: JSON Schema draft 2020-12 +DRAFT202012 = Specification( + name="draft2020-12", + id_of=_dollar_id, + subresources_of=_subresources_of( + in_value={ + "additionalProperties", + "contains", + "contentSchema", + "else", + "if", + "items", + "not", + "propertyNames", + "then", + "unevaluatedItems", + "unevaluatedProperties", + }, + in_subarray={"allOf", "anyOf", "oneOf", "prefixItems"}, + in_subvalues={ + "$defs", + "definitions", + "dependentSchemas", + "patternProperties", + "properties", + }, + ), + anchors_in=_anchor, + maybe_in_subresource=_maybe_in_subresource( + in_value={ + "additionalProperties", + "contains", + "contentSchema", + "else", + "if", + "items", + "not", + "propertyNames", + "then", + "unevaluatedItems", + "unevaluatedProperties", + }, + in_subarray={"allOf", "anyOf", "oneOf", "prefixItems"}, + in_subvalues={ + "$defs", + "definitions", + "dependentSchemas", + "patternProperties", + "properties", + }, + ), +) +#: JSON Schema draft 2019-09 +DRAFT201909 = Specification( + name="draft2019-09", + id_of=_dollar_id, + subresources_of=_subresources_of_with_crazy_items( + in_value={ + "additionalItems", + "additionalProperties", + "contains", + "contentSchema", + "else", + "if", + "not", + "propertyNames", + "then", + "unevaluatedItems", + "unevaluatedProperties", + }, + in_subarray={"allOf", "anyOf", "oneOf"}, + in_subvalues={ + "$defs", + "definitions", + "dependentSchemas", + "patternProperties", + "properties", + }, + ), + anchors_in=_anchor_2019, + maybe_in_subresource=_maybe_in_subresource_crazy_items( + in_value={ + "additionalItems", + "additionalProperties", + "contains", + "contentSchema", + "else", + "if", + "not", + "propertyNames", + "then", + "unevaluatedItems", + "unevaluatedProperties", + }, + in_subarray={"allOf", "anyOf", "oneOf"}, + in_subvalues={ + "$defs", + "definitions", + "dependentSchemas", + "patternProperties", + "properties", + }, + ), +) +#: JSON Schema draft 7 +DRAFT7 = Specification( + name="draft-07", + id_of=_legacy_dollar_id, + subresources_of=_subresources_of_with_crazy_items_dependencies( + in_value={ + "additionalItems", + "additionalProperties", + "contains", + "else", + "if", + "not", + "propertyNames", + "then", + }, + in_subarray={"allOf", "anyOf", "oneOf"}, + in_subvalues={"definitions", "patternProperties", "properties"}, + ), + anchors_in=_legacy_anchor_in_dollar_id, + maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies( + in_value={ + "additionalItems", + "additionalProperties", + "contains", + "else", + "if", + "not", + "propertyNames", + "then", + }, + in_subarray={"allOf", "anyOf", "oneOf"}, + in_subvalues={"definitions", "patternProperties", "properties"}, + ), +) +#: JSON Schema draft 6 +DRAFT6 = Specification( + name="draft-06", + id_of=_legacy_dollar_id, + subresources_of=_subresources_of_with_crazy_items_dependencies( + in_value={ + "additionalItems", + "additionalProperties", + "contains", + "not", + "propertyNames", + }, + in_subarray={"allOf", "anyOf", "oneOf"}, + in_subvalues={"definitions", "patternProperties", "properties"}, + ), + anchors_in=_legacy_anchor_in_dollar_id, + maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies( + in_value={ + "additionalItems", + "additionalProperties", + "contains", + "not", + "propertyNames", + }, + in_subarray={"allOf", "anyOf", "oneOf"}, + in_subvalues={"definitions", "patternProperties", "properties"}, + ), +) +#: JSON Schema draft 4 +DRAFT4 = Specification( + name="draft-04", + id_of=_legacy_id, + subresources_of=_subresources_of_with_crazy_aP_items_dependencies( + in_value={"not"}, + in_subarray={"allOf", "anyOf", "oneOf"}, + in_subvalues={"definitions", "patternProperties", "properties"}, + ), + anchors_in=_legacy_anchor_in_id, + maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies( + in_value={"additionalItems", "additionalProperties", "not"}, + in_subarray={"allOf", "anyOf", "oneOf"}, + in_subvalues={"definitions", "patternProperties", "properties"}, + ), +) +#: JSON Schema draft 3 +DRAFT3 = Specification( + name="draft-03", + id_of=_legacy_id, + subresources_of=_subresources_of_with_crazy_aP_items_dependencies( + in_subarray={"extends"}, + in_subvalues={"definitions", "patternProperties", "properties"}, + ), + anchors_in=_legacy_anchor_in_id, + maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies( + in_value={"additionalItems", "additionalProperties"}, + in_subarray={"extends"}, + in_subvalues={"definitions", "patternProperties", "properties"}, + ), +) + + +_SPECIFICATIONS: Registry[Specification[Schema]] = Registry( + { + dialect_id: Resource.opaque(specification) + for dialect_id, specification in [ + ("https://json-schema.org/draft/2020-12/schema", DRAFT202012), + ("https://json-schema.org/draft/2019-09/schema", DRAFT201909), + ("http://json-schema.org/draft-07/schema", DRAFT7), + ("http://json-schema.org/draft-06/schema", DRAFT6), + ("http://json-schema.org/draft-04/schema", DRAFT4), + ("http://json-schema.org/draft-03/schema", DRAFT3), + ] + }, +) + + +def specification_with( + dialect_id: URI, + default: Specification[Any] | _Unset = _UNSET, +) -> Specification[Any]: + """ + Retrieve the `Specification` with the given dialect identifier. + + Raises: + + `UnknownDialect` + + if the given ``dialect_id`` isn't known + + """ + resource = _SPECIFICATIONS.get(dialect_id.rstrip("#")) + if resource is not None: + return resource.contents + if default is _UNSET: + raise UnknownDialect(dialect_id) + return default + + +@frozen +class DynamicAnchor: + """ + Dynamic anchors, introduced in draft 2020. + """ + + name: str + resource: SchemaResource + + def resolve(self, resolver: _Resolver[Schema]) -> _Resolved[Schema]: + """ + Resolve this anchor dynamically. + """ + last = self.resource + for uri, registry in resolver.dynamic_scope(): + try: + anchor = registry.anchor(uri, self.name).value + except exceptions.NoSuchAnchor: + continue + if isinstance(anchor, DynamicAnchor): + last = anchor.resource + return _Resolved( + contents=last.contents, + resolver=resolver.in_subresource(last), + ) + + +def lookup_recursive_ref(resolver: _Resolver[Schema]) -> _Resolved[Schema]: + """ + Recursive references (via recursive anchors), present only in draft 2019. + + As per the 2019 specification (§ 8.2.4.2.1), only the ``#`` recursive + reference is supported (and is therefore assumed to be the relevant + reference). + """ + resolved = resolver.lookup("#") + if isinstance(resolved.contents, Mapping) and resolved.contents.get( + "$recursiveAnchor", + ): + for uri, _ in resolver.dynamic_scope(): + next_resolved = resolver.lookup(uri) + if not isinstance( + next_resolved.contents, + Mapping, + ) or not next_resolved.contents.get("$recursiveAnchor"): + break + resolved = next_resolved + return resolved diff --git a/.venv/lib/python3.12/site-packages/referencing/py.typed b/.venv/lib/python3.12/site-packages/referencing/py.typed new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/referencing/py.typed diff --git a/.venv/lib/python3.12/site-packages/referencing/retrieval.py b/.venv/lib/python3.12/site-packages/referencing/retrieval.py new file mode 100644 index 00000000..53e0512b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/referencing/retrieval.py @@ -0,0 +1,92 @@ +""" +Helpers related to (dynamic) resource retrieval. +""" + +from __future__ import annotations + +from functools import lru_cache +from typing import TYPE_CHECKING, Callable +import json + +try: + from typing_extensions import TypeVar +except ImportError: # pragma: no cover + from typing import TypeVar + +from referencing import Resource + +if TYPE_CHECKING: + from referencing.typing import URI, D, Retrieve + +#: A serialized document (e.g. a JSON string) +_T = TypeVar("_T", default=str) + + +def to_cached_resource( + cache: Callable[[Retrieve[D]], Retrieve[D]] | None = None, + loads: Callable[[_T], D] = json.loads, + from_contents: Callable[[D], Resource[D]] = Resource.from_contents, +) -> Callable[[Callable[[URI], _T]], Retrieve[D]]: + """ + Create a retriever which caches its return values from a simpler callable. + + Takes a function which returns things like serialized JSON (strings) and + returns something suitable for passing to `Registry` as a retrieve + function. + + This decorator both reduces a small bit of boilerplate for a common case + (deserializing JSON from strings and creating `Resource` objects from the + result) as well as makes the probable need for caching a bit easier. + Retrievers which otherwise do expensive operations (like hitting the + network) might otherwise be called repeatedly. + + Examples + -------- + + .. testcode:: + + from referencing import Registry + from referencing.typing import URI + import referencing.retrieval + + + @referencing.retrieval.to_cached_resource() + def retrieve(uri: URI): + print(f"Retrieved {uri}") + + # Normally, go get some expensive JSON from the network, a file ... + return ''' + { + "$schema": "https://json-schema.org/draft/2020-12/schema", + "foo": "bar" + } + ''' + + one = Registry(retrieve=retrieve).get_or_retrieve("urn:example:foo") + print(one.value.contents["foo"]) + + # Retrieving the same URI again reuses the same value (and thus doesn't + # print another retrieval message here) + two = Registry(retrieve=retrieve).get_or_retrieve("urn:example:foo") + print(two.value.contents["foo"]) + + .. testoutput:: + + Retrieved urn:example:foo + bar + bar + + """ + if cache is None: + cache = lru_cache(maxsize=None) + + def decorator(retrieve: Callable[[URI], _T]): + @cache + def cached_retrieve(uri: URI): + response = retrieve(uri) + contents = loads(response) + return from_contents(contents) + + return cached_retrieve + + return decorator diff --git a/.venv/lib/python3.12/site-packages/referencing/tests/__init__.py b/.venv/lib/python3.12/site-packages/referencing/tests/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/referencing/tests/__init__.py diff --git a/.venv/lib/python3.12/site-packages/referencing/tests/test_core.py b/.venv/lib/python3.12/site-packages/referencing/tests/test_core.py new file mode 100644 index 00000000..3edddbc3 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/referencing/tests/test_core.py @@ -0,0 +1,1057 @@ +from rpds import HashTrieMap +import pytest + +from referencing import Anchor, Registry, Resource, Specification, exceptions +from referencing.jsonschema import DRAFT202012 + +ID_AND_CHILDREN = Specification( + name="id-and-children", + id_of=lambda contents: contents.get("ID"), + subresources_of=lambda contents: contents.get("children", []), + anchors_in=lambda specification, contents: [ + Anchor( + name=name, + resource=specification.create_resource(contents=each), + ) + for name, each in contents.get("anchors", {}).items() + ], + maybe_in_subresource=lambda segments, resolver, subresource: ( + resolver.in_subresource(subresource) + if not len(segments) % 2 + and all(each == "children" for each in segments[::2]) + else resolver + ), +) + + +def blow_up(uri): # pragma: no cover + """ + A retriever suitable for use in tests which expect it never to be used. + """ + raise RuntimeError("This retrieve function expects to never be called!") + + +class TestRegistry: + def test_with_resource(self): + """ + Adding a resource to the registry then allows re-retrieving it. + """ + + resource = Resource.opaque(contents={"foo": "bar"}) + uri = "urn:example" + registry = Registry().with_resource(uri=uri, resource=resource) + assert registry[uri] is resource + + def test_with_resources(self): + """ + Adding multiple resources to the registry is like adding each one. + """ + + one = Resource.opaque(contents={}) + two = Resource(contents={"foo": "bar"}, specification=ID_AND_CHILDREN) + registry = Registry().with_resources( + [ + ("http://example.com/1", one), + ("http://example.com/foo/bar", two), + ], + ) + assert registry == Registry().with_resource( + uri="http://example.com/1", + resource=one, + ).with_resource( + uri="http://example.com/foo/bar", + resource=two, + ) + + def test_matmul_resource(self): + uri = "urn:example:resource" + resource = ID_AND_CHILDREN.create_resource({"ID": uri, "foo": 12}) + registry = resource @ Registry() + assert registry == Registry().with_resource(uri, resource) + + def test_matmul_many_resources(self): + one_uri = "urn:example:one" + one = ID_AND_CHILDREN.create_resource({"ID": one_uri, "foo": 12}) + + two_uri = "urn:example:two" + two = ID_AND_CHILDREN.create_resource({"ID": two_uri, "foo": 12}) + + registry = [one, two] @ Registry() + assert registry == Registry().with_resources( + [(one_uri, one), (two_uri, two)], + ) + + def test_matmul_resource_without_id(self): + resource = Resource.opaque(contents={"foo": "bar"}) + with pytest.raises(exceptions.NoInternalID) as e: + resource @ Registry() + assert e.value == exceptions.NoInternalID(resource=resource) + + def test_with_contents_from_json_schema(self): + uri = "urn:example" + schema = {"$schema": "https://json-schema.org/draft/2020-12/schema"} + registry = Registry().with_contents([(uri, schema)]) + + expected = Resource(contents=schema, specification=DRAFT202012) + assert registry[uri] == expected + + def test_with_contents_and_default_specification(self): + uri = "urn:example" + registry = Registry().with_contents( + [(uri, {"foo": "bar"})], + default_specification=Specification.OPAQUE, + ) + assert registry[uri] == Resource.opaque({"foo": "bar"}) + + def test_len(self): + total = 5 + registry = Registry().with_contents( + [(str(i), {"foo": "bar"}) for i in range(total)], + default_specification=Specification.OPAQUE, + ) + assert len(registry) == total + + def test_bool_empty(self): + assert not Registry() + + def test_bool_not_empty(self): + registry = Registry().with_contents( + [(str(i), {"foo": "bar"}) for i in range(3)], + default_specification=Specification.OPAQUE, + ) + assert registry + + def test_iter(self): + registry = Registry().with_contents( + [(str(i), {"foo": "bar"}) for i in range(8)], + default_specification=Specification.OPAQUE, + ) + assert set(registry) == {str(i) for i in range(8)} + + def test_crawl_still_has_top_level_resource(self): + resource = Resource.opaque({"foo": "bar"}) + uri = "urn:example" + registry = Registry({uri: resource}).crawl() + assert registry[uri] is resource + + def test_crawl_finds_a_subresource(self): + child_id = "urn:child" + root = ID_AND_CHILDREN.create_resource( + {"ID": "urn:root", "children": [{"ID": child_id, "foo": 12}]}, + ) + registry = root @ Registry() + with pytest.raises(LookupError): + registry[child_id] + + expected = ID_AND_CHILDREN.create_resource({"ID": child_id, "foo": 12}) + assert registry.crawl()[child_id] == expected + + def test_crawl_finds_anchors_with_id(self): + resource = ID_AND_CHILDREN.create_resource( + {"ID": "urn:bar", "anchors": {"foo": 12}}, + ) + registry = resource @ Registry() + + assert registry.crawl().anchor(resource.id(), "foo").value == Anchor( + name="foo", + resource=ID_AND_CHILDREN.create_resource(12), + ) + + def test_crawl_finds_anchors_no_id(self): + resource = ID_AND_CHILDREN.create_resource({"anchors": {"foo": 12}}) + registry = Registry().with_resource("urn:root", resource) + + assert registry.crawl().anchor("urn:root", "foo").value == Anchor( + name="foo", + resource=ID_AND_CHILDREN.create_resource(12), + ) + + def test_contents(self): + resource = Resource.opaque({"foo": "bar"}) + uri = "urn:example" + registry = Registry().with_resource(uri, resource) + assert registry.contents(uri) == {"foo": "bar"} + + def test_getitem_strips_empty_fragments(self): + uri = "http://example.com/" + resource = ID_AND_CHILDREN.create_resource({"ID": uri + "#"}) + registry = resource @ Registry() + assert registry[uri] == registry[uri + "#"] == resource + + def test_contents_strips_empty_fragments(self): + uri = "http://example.com/" + resource = ID_AND_CHILDREN.create_resource({"ID": uri + "#"}) + registry = resource @ Registry() + assert ( + registry.contents(uri) + == registry.contents(uri + "#") + == {"ID": uri + "#"} + ) + + def test_contents_nonexistent_resource(self): + registry = Registry() + with pytest.raises(exceptions.NoSuchResource) as e: + registry.contents("urn:example") + assert e.value == exceptions.NoSuchResource(ref="urn:example") + + def test_crawled_anchor(self): + resource = ID_AND_CHILDREN.create_resource({"anchors": {"foo": "bar"}}) + registry = Registry().with_resource("urn:example", resource) + retrieved = registry.anchor("urn:example", "foo") + assert retrieved.value == Anchor( + name="foo", + resource=ID_AND_CHILDREN.create_resource("bar"), + ) + assert retrieved.registry == registry.crawl() + + def test_anchor_in_nonexistent_resource(self): + registry = Registry() + with pytest.raises(exceptions.NoSuchResource) as e: + registry.anchor("urn:example", "foo") + assert e.value == exceptions.NoSuchResource(ref="urn:example") + + def test_init(self): + one = Resource.opaque(contents={}) + two = ID_AND_CHILDREN.create_resource({"foo": "bar"}) + registry = Registry( + { + "http://example.com/1": one, + "http://example.com/foo/bar": two, + }, + ) + assert ( + registry + == Registry() + .with_resources( + [ + ("http://example.com/1", one), + ("http://example.com/foo/bar", two), + ], + ) + .crawl() + ) + + def test_dict_conversion(self): + """ + Passing a `dict` to `Registry` gets converted to a `HashTrieMap`. + + So continuing to use the registry works. + """ + + one = Resource.opaque(contents={}) + two = ID_AND_CHILDREN.create_resource({"foo": "bar"}) + registry = Registry( + {"http://example.com/1": one}, + ).with_resource("http://example.com/foo/bar", two) + assert ( + registry.crawl() + == Registry() + .with_resources( + [ + ("http://example.com/1", one), + ("http://example.com/foo/bar", two), + ], + ) + .crawl() + ) + + def test_no_such_resource(self): + registry = Registry() + with pytest.raises(exceptions.NoSuchResource) as e: + registry["urn:bigboom"] + assert e.value == exceptions.NoSuchResource(ref="urn:bigboom") + + def test_combine(self): + one = Resource.opaque(contents={}) + two = ID_AND_CHILDREN.create_resource({"foo": "bar"}) + three = ID_AND_CHILDREN.create_resource({"baz": "quux"}) + four = ID_AND_CHILDREN.create_resource({"anchors": {"foo": 12}}) + + first = Registry({"http://example.com/1": one}) + second = Registry().with_resource("http://example.com/foo/bar", two) + third = Registry( + { + "http://example.com/1": one, + "http://example.com/baz": three, + }, + ) + fourth = ( + Registry() + .with_resource( + "http://example.com/foo/quux", + four, + ) + .crawl() + ) + assert first.combine(second, third, fourth) == Registry( + [ + ("http://example.com/1", one), + ("http://example.com/baz", three), + ("http://example.com/foo/quux", four), + ], + anchors=HashTrieMap( + { + ("http://example.com/foo/quux", "foo"): Anchor( + name="foo", + resource=ID_AND_CHILDREN.create_resource(12), + ), + }, + ), + ).with_resource("http://example.com/foo/bar", two) + + def test_combine_self(self): + """ + Combining a registry with itself short-circuits. + + This is a performance optimization -- otherwise we do lots more work + (in jsonschema this seems to correspond to making the test suite take + *3x* longer). + """ + + registry = Registry({"urn:foo": "bar"}) + assert registry.combine(registry) is registry + + def test_combine_with_uncrawled_resources(self): + one = Resource.opaque(contents={}) + two = ID_AND_CHILDREN.create_resource({"foo": "bar"}) + three = ID_AND_CHILDREN.create_resource({"baz": "quux"}) + + first = Registry().with_resource("http://example.com/1", one) + second = Registry().with_resource("http://example.com/foo/bar", two) + third = Registry( + { + "http://example.com/1": one, + "http://example.com/baz": three, + }, + ) + expected = Registry( + [ + ("http://example.com/1", one), + ("http://example.com/foo/bar", two), + ("http://example.com/baz", three), + ], + ) + combined = first.combine(second, third) + assert combined != expected + assert combined.crawl() == expected + + def test_combine_with_single_retrieve(self): + one = Resource.opaque(contents={}) + two = ID_AND_CHILDREN.create_resource({"foo": "bar"}) + three = ID_AND_CHILDREN.create_resource({"baz": "quux"}) + + def retrieve(uri): # pragma: no cover + pass + + first = Registry().with_resource("http://example.com/1", one) + second = Registry( + retrieve=retrieve, + ).with_resource("http://example.com/2", two) + third = Registry().with_resource("http://example.com/3", three) + + assert first.combine(second, third) == Registry( + retrieve=retrieve, + ).with_resources( + [ + ("http://example.com/1", one), + ("http://example.com/2", two), + ("http://example.com/3", three), + ], + ) + assert second.combine(first, third) == Registry( + retrieve=retrieve, + ).with_resources( + [ + ("http://example.com/1", one), + ("http://example.com/2", two), + ("http://example.com/3", three), + ], + ) + + def test_combine_with_common_retrieve(self): + one = Resource.opaque(contents={}) + two = ID_AND_CHILDREN.create_resource({"foo": "bar"}) + three = ID_AND_CHILDREN.create_resource({"baz": "quux"}) + + def retrieve(uri): # pragma: no cover + pass + + first = Registry(retrieve=retrieve).with_resource( + "http://example.com/1", + one, + ) + second = Registry( + retrieve=retrieve, + ).with_resource("http://example.com/2", two) + third = Registry(retrieve=retrieve).with_resource( + "http://example.com/3", + three, + ) + + assert first.combine(second, third) == Registry( + retrieve=retrieve, + ).with_resources( + [ + ("http://example.com/1", one), + ("http://example.com/2", two), + ("http://example.com/3", three), + ], + ) + assert second.combine(first, third) == Registry( + retrieve=retrieve, + ).with_resources( + [ + ("http://example.com/1", one), + ("http://example.com/2", two), + ("http://example.com/3", three), + ], + ) + + def test_combine_conflicting_retrieve(self): + one = Resource.opaque(contents={}) + two = ID_AND_CHILDREN.create_resource({"foo": "bar"}) + three = ID_AND_CHILDREN.create_resource({"baz": "quux"}) + + def foo_retrieve(uri): # pragma: no cover + pass + + def bar_retrieve(uri): # pragma: no cover + pass + + first = Registry(retrieve=foo_retrieve).with_resource( + "http://example.com/1", + one, + ) + second = Registry().with_resource("http://example.com/2", two) + third = Registry(retrieve=bar_retrieve).with_resource( + "http://example.com/3", + three, + ) + + with pytest.raises(Exception, match="conflict.*retriev"): + first.combine(second, third) + + def test_remove(self): + one = Resource.opaque(contents={}) + two = ID_AND_CHILDREN.create_resource({"foo": "bar"}) + registry = Registry({"urn:foo": one, "urn:bar": two}) + assert registry.remove("urn:foo") == Registry({"urn:bar": two}) + + def test_remove_uncrawled(self): + one = Resource.opaque(contents={}) + two = ID_AND_CHILDREN.create_resource({"foo": "bar"}) + registry = Registry().with_resources( + [("urn:foo", one), ("urn:bar", two)], + ) + assert registry.remove("urn:foo") == Registry().with_resource( + "urn:bar", + two, + ) + + def test_remove_with_anchors(self): + one = Resource.opaque(contents={}) + two = ID_AND_CHILDREN.create_resource({"anchors": {"foo": "bar"}}) + registry = ( + Registry() + .with_resources( + [("urn:foo", one), ("urn:bar", two)], + ) + .crawl() + ) + assert ( + registry.remove("urn:bar") + == Registry() + .with_resource( + "urn:foo", + one, + ) + .crawl() + ) + + def test_remove_nonexistent_uri(self): + with pytest.raises(exceptions.NoSuchResource) as e: + Registry().remove("urn:doesNotExist") + assert e.value == exceptions.NoSuchResource(ref="urn:doesNotExist") + + def test_retrieve(self): + foo = Resource.opaque({"foo": "bar"}) + registry = Registry(retrieve=lambda uri: foo) + assert registry.get_or_retrieve("urn:example").value == foo + + def test_retrieve_arbitrary_exception(self): + foo = Resource.opaque({"foo": "bar"}) + + def retrieve(uri): + if uri == "urn:succeed": + return foo + raise Exception("Oh no!") + + registry = Registry(retrieve=retrieve) + assert registry.get_or_retrieve("urn:succeed").value == foo + with pytest.raises(exceptions.Unretrievable): + registry.get_or_retrieve("urn:uhoh") + + def test_retrieve_no_such_resource(self): + foo = Resource.opaque({"foo": "bar"}) + + def retrieve(uri): + if uri == "urn:succeed": + return foo + raise exceptions.NoSuchResource(ref=uri) + + registry = Registry(retrieve=retrieve) + assert registry.get_or_retrieve("urn:succeed").value == foo + with pytest.raises(exceptions.NoSuchResource): + registry.get_or_retrieve("urn:uhoh") + + def test_retrieve_cannot_determine_specification(self): + def retrieve(uri): + return Resource.from_contents({}) + + registry = Registry(retrieve=retrieve) + with pytest.raises(exceptions.CannotDetermineSpecification): + registry.get_or_retrieve("urn:uhoh") + + def test_retrieve_already_available_resource(self): + foo = Resource.opaque({"foo": "bar"}) + registry = Registry({"urn:example": foo}, retrieve=blow_up) + assert registry["urn:example"] == foo + assert registry.get_or_retrieve("urn:example").value == foo + + def test_retrieve_first_checks_crawlable_resource(self): + child = ID_AND_CHILDREN.create_resource({"ID": "urn:child", "foo": 12}) + root = ID_AND_CHILDREN.create_resource({"children": [child.contents]}) + registry = Registry(retrieve=blow_up).with_resource("urn:root", root) + assert registry.crawl()["urn:child"] == child + + def test_resolver(self): + one = Resource.opaque(contents={}) + registry = Registry({"http://example.com": one}) + resolver = registry.resolver(base_uri="http://example.com") + assert resolver.lookup("#").contents == {} + + def test_resolver_with_root_identified(self): + root = ID_AND_CHILDREN.create_resource({"ID": "http://example.com"}) + resolver = Registry().resolver_with_root(root) + assert resolver.lookup("http://example.com").contents == root.contents + assert resolver.lookup("#").contents == root.contents + + def test_resolver_with_root_unidentified(self): + root = Resource.opaque(contents={}) + resolver = Registry().resolver_with_root(root) + assert resolver.lookup("#").contents == root.contents + + def test_repr(self): + one = Resource.opaque(contents={}) + two = ID_AND_CHILDREN.create_resource({"foo": "bar"}) + registry = Registry().with_resources( + [ + ("http://example.com/1", one), + ("http://example.com/foo/bar", two), + ], + ) + assert repr(registry) == "<Registry (2 uncrawled resources)>" + assert repr(registry.crawl()) == "<Registry (2 resources)>" + + def test_repr_mixed_crawled(self): + one = Resource.opaque(contents={}) + two = ID_AND_CHILDREN.create_resource({"foo": "bar"}) + registry = ( + Registry( + {"http://example.com/1": one}, + ) + .crawl() + .with_resource(uri="http://example.com/foo/bar", resource=two) + ) + assert repr(registry) == "<Registry (2 resources, 1 uncrawled)>" + + def test_repr_one_resource(self): + registry = Registry().with_resource( + uri="http://example.com/1", + resource=Resource.opaque(contents={}), + ) + assert repr(registry) == "<Registry (1 uncrawled resource)>" + + def test_repr_empty(self): + assert repr(Registry()) == "<Registry (0 resources)>" + + +class TestResource: + def test_from_contents_from_json_schema(self): + schema = {"$schema": "https://json-schema.org/draft/2020-12/schema"} + resource = Resource.from_contents(schema) + assert resource == Resource(contents=schema, specification=DRAFT202012) + + def test_from_contents_with_no_discernible_information(self): + """ + Creating a resource with no discernible way to see what + specification it belongs to (e.g. no ``$schema`` keyword for JSON + Schema) raises an error. + """ + + with pytest.raises(exceptions.CannotDetermineSpecification): + Resource.from_contents({"foo": "bar"}) + + def test_from_contents_with_no_discernible_information_and_default(self): + resource = Resource.from_contents( + {"foo": "bar"}, + default_specification=Specification.OPAQUE, + ) + assert resource == Resource.opaque(contents={"foo": "bar"}) + + def test_from_contents_unneeded_default(self): + schema = {"$schema": "https://json-schema.org/draft/2020-12/schema"} + resource = Resource.from_contents( + schema, + default_specification=Specification.OPAQUE, + ) + assert resource == Resource( + contents=schema, + specification=DRAFT202012, + ) + + def test_non_mapping_from_contents(self): + resource = Resource.from_contents( + True, + default_specification=ID_AND_CHILDREN, + ) + assert resource == Resource( + contents=True, + specification=ID_AND_CHILDREN, + ) + + def test_from_contents_with_fallback(self): + resource = Resource.from_contents( + {"foo": "bar"}, + default_specification=Specification.OPAQUE, + ) + assert resource == Resource.opaque(contents={"foo": "bar"}) + + def test_id_delegates_to_specification(self): + specification = Specification( + name="", + id_of=lambda contents: "urn:fixedID", + subresources_of=lambda contents: [], + anchors_in=lambda specification, contents: [], + maybe_in_subresource=( + lambda segments, resolver, subresource: resolver + ), + ) + resource = Resource( + contents={"foo": "baz"}, + specification=specification, + ) + assert resource.id() == "urn:fixedID" + + def test_id_strips_empty_fragment(self): + uri = "http://example.com/" + root = ID_AND_CHILDREN.create_resource({"ID": uri + "#"}) + assert root.id() == uri + + def test_subresources_delegates_to_specification(self): + resource = ID_AND_CHILDREN.create_resource({"children": [{}, 12]}) + assert list(resource.subresources()) == [ + ID_AND_CHILDREN.create_resource(each) for each in [{}, 12] + ] + + def test_subresource_with_different_specification(self): + schema = {"$schema": "https://json-schema.org/draft/2020-12/schema"} + resource = ID_AND_CHILDREN.create_resource({"children": [schema]}) + assert list(resource.subresources()) == [ + DRAFT202012.create_resource(schema), + ] + + def test_anchors_delegates_to_specification(self): + resource = ID_AND_CHILDREN.create_resource( + {"anchors": {"foo": {}, "bar": 1, "baz": ""}}, + ) + assert list(resource.anchors()) == [ + Anchor(name="foo", resource=ID_AND_CHILDREN.create_resource({})), + Anchor(name="bar", resource=ID_AND_CHILDREN.create_resource(1)), + Anchor(name="baz", resource=ID_AND_CHILDREN.create_resource("")), + ] + + def test_pointer_to_mapping(self): + resource = Resource.opaque(contents={"foo": "baz"}) + resolver = Registry().resolver() + assert resource.pointer("/foo", resolver=resolver).contents == "baz" + + def test_pointer_to_array(self): + resource = Resource.opaque(contents={"foo": {"bar": [3]}}) + resolver = Registry().resolver() + assert resource.pointer("/foo/bar/0", resolver=resolver).contents == 3 + + def test_root_pointer(self): + contents = {"foo": "baz"} + resource = Resource.opaque(contents=contents) + resolver = Registry().resolver() + assert resource.pointer("", resolver=resolver).contents == contents + + def test_opaque(self): + contents = {"foo": "bar"} + assert Resource.opaque(contents) == Resource( + contents=contents, + specification=Specification.OPAQUE, + ) + + +class TestResolver: + def test_lookup_exact_uri(self): + resource = Resource.opaque(contents={"foo": "baz"}) + resolver = Registry({"http://example.com/1": resource}).resolver() + resolved = resolver.lookup("http://example.com/1") + assert resolved.contents == resource.contents + + def test_lookup_subresource(self): + root = ID_AND_CHILDREN.create_resource( + { + "ID": "http://example.com/", + "children": [ + {"ID": "http://example.com/a", "foo": 12}, + ], + }, + ) + registry = root @ Registry() + resolved = registry.resolver().lookup("http://example.com/a") + assert resolved.contents == {"ID": "http://example.com/a", "foo": 12} + + def test_lookup_anchor_with_id(self): + root = ID_AND_CHILDREN.create_resource( + { + "ID": "http://example.com/", + "anchors": {"foo": 12}, + }, + ) + registry = root @ Registry() + resolved = registry.resolver().lookup("http://example.com/#foo") + assert resolved.contents == 12 + + def test_lookup_anchor_without_id(self): + root = ID_AND_CHILDREN.create_resource({"anchors": {"foo": 12}}) + resolver = Registry().with_resource("urn:example", root).resolver() + resolved = resolver.lookup("urn:example#foo") + assert resolved.contents == 12 + + def test_lookup_unknown_reference(self): + resolver = Registry().resolver() + ref = "http://example.com/does/not/exist" + with pytest.raises(exceptions.Unresolvable) as e: + resolver.lookup(ref) + assert e.value == exceptions.Unresolvable(ref=ref) + + def test_lookup_non_existent_pointer(self): + resource = Resource.opaque({"foo": {}}) + resolver = Registry({"http://example.com/1": resource}).resolver() + ref = "http://example.com/1#/foo/bar" + with pytest.raises(exceptions.Unresolvable) as e: + resolver.lookup(ref) + assert e.value == exceptions.PointerToNowhere( + ref="/foo/bar", + resource=resource, + ) + assert str(e.value) == "'/foo/bar' does not exist within {'foo': {}}" + + def test_lookup_non_existent_pointer_to_array_index(self): + resource = Resource.opaque([1, 2, 4, 8]) + resolver = Registry({"http://example.com/1": resource}).resolver() + ref = "http://example.com/1#/10" + with pytest.raises(exceptions.Unresolvable) as e: + resolver.lookup(ref) + assert e.value == exceptions.PointerToNowhere( + ref="/10", + resource=resource, + ) + + def test_lookup_pointer_to_empty_string(self): + resolver = Registry().resolver_with_root(Resource.opaque({"": {}})) + assert resolver.lookup("#/").contents == {} + + def test_lookup_non_existent_pointer_to_empty_string(self): + resource = Resource.opaque({"foo": {}}) + resolver = Registry().resolver_with_root(resource) + with pytest.raises( + exceptions.Unresolvable, + match="^'/' does not exist within {'foo': {}}.*'#'", + ) as e: + resolver.lookup("#/") + assert e.value == exceptions.PointerToNowhere( + ref="/", + resource=resource, + ) + + def test_lookup_non_existent_anchor(self): + root = ID_AND_CHILDREN.create_resource({"anchors": {}}) + resolver = Registry().with_resource("urn:example", root).resolver() + resolved = resolver.lookup("urn:example") + assert resolved.contents == root.contents + + ref = "urn:example#noSuchAnchor" + with pytest.raises(exceptions.Unresolvable) as e: + resolver.lookup(ref) + assert "'noSuchAnchor' does not exist" in str(e.value) + assert e.value == exceptions.NoSuchAnchor( + ref="urn:example", + resource=root, + anchor="noSuchAnchor", + ) + + def test_lookup_invalid_JSON_pointerish_anchor(self): + resolver = Registry().resolver_with_root( + ID_AND_CHILDREN.create_resource( + { + "ID": "http://example.com/", + "foo": {"bar": 12}, + }, + ), + ) + + valid = resolver.lookup("#/foo/bar") + assert valid.contents == 12 + + with pytest.raises(exceptions.InvalidAnchor) as e: + resolver.lookup("#foo/bar") + assert " '#/foo/bar'" in str(e.value) + + def test_lookup_retrieved_resource(self): + resource = Resource.opaque(contents={"foo": "baz"}) + resolver = Registry(retrieve=lambda uri: resource).resolver() + resolved = resolver.lookup("http://example.com/") + assert resolved.contents == resource.contents + + def test_lookup_failed_retrieved_resource(self): + """ + Unretrievable exceptions are also wrapped in Unresolvable. + """ + + uri = "http://example.com/" + + registry = Registry(retrieve=blow_up) + with pytest.raises(exceptions.Unretrievable): + registry.get_or_retrieve(uri) + + resolver = registry.resolver() + with pytest.raises(exceptions.Unresolvable): + resolver.lookup(uri) + + def test_repeated_lookup_from_retrieved_resource(self): + """ + A (custom-)retrieved resource is added to the registry returned by + looking it up. + """ + resource = Resource.opaque(contents={"foo": "baz"}) + once = [resource] + + def retrieve(uri): + return once.pop() + + resolver = Registry(retrieve=retrieve).resolver() + resolved = resolver.lookup("http://example.com/") + assert resolved.contents == resource.contents + + resolved = resolved.resolver.lookup("http://example.com/") + assert resolved.contents == resource.contents + + def test_repeated_anchor_lookup_from_retrieved_resource(self): + resource = Resource.opaque(contents={"foo": "baz"}) + once = [resource] + + def retrieve(uri): + return once.pop() + + resolver = Registry(retrieve=retrieve).resolver() + resolved = resolver.lookup("http://example.com/") + assert resolved.contents == resource.contents + + resolved = resolved.resolver.lookup("#") + assert resolved.contents == resource.contents + + # FIXME: The tests below aren't really representable in the current + # suite, though we should probably think of ways to do so. + + def test_in_subresource(self): + root = ID_AND_CHILDREN.create_resource( + { + "ID": "http://example.com/", + "children": [ + { + "ID": "child/", + "children": [{"ID": "grandchild"}], + }, + ], + }, + ) + registry = root @ Registry() + + resolver = registry.resolver() + first = resolver.lookup("http://example.com/") + assert first.contents == root.contents + + with pytest.raises(exceptions.Unresolvable): + first.resolver.lookup("grandchild") + + sub = first.resolver.in_subresource( + ID_AND_CHILDREN.create_resource(first.contents["children"][0]), + ) + second = sub.lookup("grandchild") + assert second.contents == {"ID": "grandchild"} + + def test_in_pointer_subresource(self): + root = ID_AND_CHILDREN.create_resource( + { + "ID": "http://example.com/", + "children": [ + { + "ID": "child/", + "children": [{"ID": "grandchild"}], + }, + ], + }, + ) + registry = root @ Registry() + + resolver = registry.resolver() + first = resolver.lookup("http://example.com/") + assert first.contents == root.contents + + with pytest.raises(exceptions.Unresolvable): + first.resolver.lookup("grandchild") + + second = first.resolver.lookup("#/children/0") + third = second.resolver.lookup("grandchild") + assert third.contents == {"ID": "grandchild"} + + def test_dynamic_scope(self): + one = ID_AND_CHILDREN.create_resource( + { + "ID": "http://example.com/", + "children": [ + { + "ID": "child/", + "children": [{"ID": "grandchild"}], + }, + ], + }, + ) + two = ID_AND_CHILDREN.create_resource( + { + "ID": "http://example.com/two", + "children": [{"ID": "two-child/"}], + }, + ) + registry = [one, two] @ Registry() + + resolver = registry.resolver() + first = resolver.lookup("http://example.com/") + second = first.resolver.lookup("#/children/0") + third = second.resolver.lookup("grandchild") + fourth = third.resolver.lookup("http://example.com/two") + assert list(fourth.resolver.dynamic_scope()) == [ + ("http://example.com/child/grandchild", fourth.resolver._registry), + ("http://example.com/child/", fourth.resolver._registry), + ("http://example.com/", fourth.resolver._registry), + ] + assert list(third.resolver.dynamic_scope()) == [ + ("http://example.com/child/", third.resolver._registry), + ("http://example.com/", third.resolver._registry), + ] + assert list(second.resolver.dynamic_scope()) == [ + ("http://example.com/", second.resolver._registry), + ] + assert list(first.resolver.dynamic_scope()) == [] + + +class TestSpecification: + def test_create_resource(self): + specification = Specification( + name="", + id_of=lambda contents: "urn:fixedID", + subresources_of=lambda contents: [], + anchors_in=lambda specification, contents: [], + maybe_in_subresource=( + lambda segments, resolver, subresource: resolver + ), + ) + resource = specification.create_resource(contents={"foo": "baz"}) + assert resource == Resource( + contents={"foo": "baz"}, + specification=specification, + ) + assert resource.id() == "urn:fixedID" + + def test_detect_from_json_schema(self): + schema = {"$schema": "https://json-schema.org/draft/2020-12/schema"} + specification = Specification.detect(schema) + assert specification == DRAFT202012 + + def test_detect_with_no_discernible_information(self): + with pytest.raises(exceptions.CannotDetermineSpecification): + Specification.detect({"foo": "bar"}) + + def test_detect_with_non_URI_schema(self): + with pytest.raises(exceptions.CannotDetermineSpecification): + Specification.detect({"$schema": 37}) + + def test_detect_with_no_discernible_information_and_default(self): + specification = Specification.OPAQUE.detect({"foo": "bar"}) + assert specification is Specification.OPAQUE + + def test_detect_unneeded_default(self): + schema = {"$schema": "https://json-schema.org/draft/2020-12/schema"} + specification = Specification.OPAQUE.detect(schema) + assert specification == DRAFT202012 + + def test_non_mapping_detect(self): + with pytest.raises(exceptions.CannotDetermineSpecification): + Specification.detect(True) + + def test_non_mapping_detect_with_default(self): + specification = ID_AND_CHILDREN.detect(True) + assert specification is ID_AND_CHILDREN + + def test_detect_with_fallback(self): + specification = Specification.OPAQUE.detect({"foo": "bar"}) + assert specification is Specification.OPAQUE + + def test_repr(self): + assert ( + repr(ID_AND_CHILDREN) == "<Specification name='id-and-children'>" + ) + + +class TestOpaqueSpecification: + THINGS = [{"foo": "bar"}, True, 37, "foo", object()] + + @pytest.mark.parametrize("thing", THINGS) + def test_no_id(self, thing): + """ + An arbitrary thing has no ID. + """ + + assert Specification.OPAQUE.id_of(thing) is None + + @pytest.mark.parametrize("thing", THINGS) + def test_no_subresources(self, thing): + """ + An arbitrary thing has no subresources. + """ + + assert list(Specification.OPAQUE.subresources_of(thing)) == [] + + @pytest.mark.parametrize("thing", THINGS) + def test_no_anchors(self, thing): + """ + An arbitrary thing has no anchors. + """ + + assert list(Specification.OPAQUE.anchors_in(thing)) == [] + + +@pytest.mark.parametrize( + "cls", + [Anchor, Registry, Resource, Specification, exceptions.PointerToNowhere], +) +def test_nonsubclassable(cls): + with pytest.raises(Exception, match="(?i)subclassing"): + + class Boom(cls): # pragma: no cover + pass diff --git a/.venv/lib/python3.12/site-packages/referencing/tests/test_exceptions.py b/.venv/lib/python3.12/site-packages/referencing/tests/test_exceptions.py new file mode 100644 index 00000000..85cf99ec --- /dev/null +++ b/.venv/lib/python3.12/site-packages/referencing/tests/test_exceptions.py @@ -0,0 +1,34 @@ +import itertools + +import pytest + +from referencing import Resource, exceptions + + +def pairs(choices): + return itertools.combinations(choices, 2) + + +TRUE = Resource.opaque(True) + + +thunks = ( + lambda: exceptions.CannotDetermineSpecification(TRUE), + lambda: exceptions.NoSuchResource("urn:example:foo"), + lambda: exceptions.NoInternalID(TRUE), + lambda: exceptions.InvalidAnchor(resource=TRUE, anchor="foo", ref="a#b"), + lambda: exceptions.NoSuchAnchor(resource=TRUE, anchor="foo", ref="a#b"), + lambda: exceptions.PointerToNowhere(resource=TRUE, ref="urn:example:foo"), + lambda: exceptions.Unresolvable("urn:example:foo"), + lambda: exceptions.Unretrievable("urn:example:foo"), +) + + +@pytest.mark.parametrize("one, two", pairs(each() for each in thunks)) +def test_eq_incompatible_types(one, two): + assert one != two + + +@pytest.mark.parametrize("thunk", thunks) +def test_hash(thunk): + assert thunk() in {thunk()} diff --git a/.venv/lib/python3.12/site-packages/referencing/tests/test_jsonschema.py b/.venv/lib/python3.12/site-packages/referencing/tests/test_jsonschema.py new file mode 100644 index 00000000..c80714d0 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/referencing/tests/test_jsonschema.py @@ -0,0 +1,382 @@ +import pytest + +from referencing import Registry, Resource, Specification +import referencing.jsonschema + + +@pytest.mark.parametrize( + "uri, expected", + [ + ( + "https://json-schema.org/draft/2020-12/schema", + referencing.jsonschema.DRAFT202012, + ), + ( + "https://json-schema.org/draft/2019-09/schema", + referencing.jsonschema.DRAFT201909, + ), + ( + "http://json-schema.org/draft-07/schema#", + referencing.jsonschema.DRAFT7, + ), + ( + "http://json-schema.org/draft-06/schema#", + referencing.jsonschema.DRAFT6, + ), + ( + "http://json-schema.org/draft-04/schema#", + referencing.jsonschema.DRAFT4, + ), + ( + "http://json-schema.org/draft-03/schema#", + referencing.jsonschema.DRAFT3, + ), + ], +) +def test_schemas_with_explicit_schema_keywords_are_detected(uri, expected): + """ + The $schema keyword in JSON Schema is a dialect identifier. + """ + contents = {"$schema": uri} + resource = Resource.from_contents(contents) + assert resource == Resource(contents=contents, specification=expected) + + +def test_unknown_dialect(): + dialect_id = "http://example.com/unknown-json-schema-dialect-id" + with pytest.raises(referencing.jsonschema.UnknownDialect) as excinfo: + Resource.from_contents({"$schema": dialect_id}) + assert excinfo.value.uri == dialect_id + + +@pytest.mark.parametrize( + "id, specification", + [ + ("$id", referencing.jsonschema.DRAFT202012), + ("$id", referencing.jsonschema.DRAFT201909), + ("$id", referencing.jsonschema.DRAFT7), + ("$id", referencing.jsonschema.DRAFT6), + ("id", referencing.jsonschema.DRAFT4), + ("id", referencing.jsonschema.DRAFT3), + ], +) +def test_id_of_mapping(id, specification): + uri = "http://example.com/some-schema" + assert specification.id_of({id: uri}) == uri + + +@pytest.mark.parametrize( + "specification", + [ + referencing.jsonschema.DRAFT202012, + referencing.jsonschema.DRAFT201909, + referencing.jsonschema.DRAFT7, + referencing.jsonschema.DRAFT6, + ], +) +@pytest.mark.parametrize("value", [True, False]) +def test_id_of_bool(specification, value): + assert specification.id_of(value) is None + + +@pytest.mark.parametrize( + "specification", + [ + referencing.jsonschema.DRAFT202012, + referencing.jsonschema.DRAFT201909, + referencing.jsonschema.DRAFT7, + referencing.jsonschema.DRAFT6, + ], +) +@pytest.mark.parametrize("value", [True, False]) +def test_anchors_in_bool(specification, value): + assert list(specification.anchors_in(value)) == [] + + +@pytest.mark.parametrize( + "specification", + [ + referencing.jsonschema.DRAFT202012, + referencing.jsonschema.DRAFT201909, + referencing.jsonschema.DRAFT7, + referencing.jsonschema.DRAFT6, + ], +) +@pytest.mark.parametrize("value", [True, False]) +def test_subresources_of_bool(specification, value): + assert list(specification.subresources_of(value)) == [] + + +@pytest.mark.parametrize( + "uri, expected", + [ + ( + "https://json-schema.org/draft/2020-12/schema", + referencing.jsonschema.DRAFT202012, + ), + ( + "https://json-schema.org/draft/2019-09/schema", + referencing.jsonschema.DRAFT201909, + ), + ( + "http://json-schema.org/draft-07/schema#", + referencing.jsonschema.DRAFT7, + ), + ( + "http://json-schema.org/draft-06/schema#", + referencing.jsonschema.DRAFT6, + ), + ( + "http://json-schema.org/draft-04/schema#", + referencing.jsonschema.DRAFT4, + ), + ( + "http://json-schema.org/draft-03/schema#", + referencing.jsonschema.DRAFT3, + ), + ], +) +def test_specification_with(uri, expected): + assert referencing.jsonschema.specification_with(uri) == expected + + +@pytest.mark.parametrize( + "uri, expected", + [ + ( + "http://json-schema.org/draft-07/schema", + referencing.jsonschema.DRAFT7, + ), + ( + "http://json-schema.org/draft-06/schema", + referencing.jsonschema.DRAFT6, + ), + ( + "http://json-schema.org/draft-04/schema", + referencing.jsonschema.DRAFT4, + ), + ( + "http://json-schema.org/draft-03/schema", + referencing.jsonschema.DRAFT3, + ), + ], +) +def test_specification_with_no_empty_fragment(uri, expected): + assert referencing.jsonschema.specification_with(uri) == expected + + +def test_specification_with_unknown_dialect(): + dialect_id = "http://example.com/unknown-json-schema-dialect-id" + with pytest.raises(referencing.jsonschema.UnknownDialect) as excinfo: + referencing.jsonschema.specification_with(dialect_id) + assert excinfo.value.uri == dialect_id + + +def test_specification_with_default(): + dialect_id = "http://example.com/unknown-json-schema-dialect-id" + specification = referencing.jsonschema.specification_with( + dialect_id, + default=Specification.OPAQUE, + ) + assert specification is Specification.OPAQUE + + +# FIXME: The tests below should move to the referencing suite but I haven't yet +# figured out how to represent dynamic (& recursive) ref lookups in it. +def test_lookup_trivial_dynamic_ref(): + one = referencing.jsonschema.DRAFT202012.create_resource( + {"$dynamicAnchor": "foo"}, + ) + resolver = Registry().with_resource("http://example.com", one).resolver() + resolved = resolver.lookup("http://example.com#foo") + assert resolved.contents == one.contents + + +def test_multiple_lookup_trivial_dynamic_ref(): + TRUE = referencing.jsonschema.DRAFT202012.create_resource(True) + root = referencing.jsonschema.DRAFT202012.create_resource( + { + "$id": "http://example.com", + "$dynamicAnchor": "fooAnchor", + "$defs": { + "foo": { + "$id": "foo", + "$dynamicAnchor": "fooAnchor", + "$defs": { + "bar": True, + "baz": { + "$dynamicAnchor": "fooAnchor", + }, + }, + }, + }, + }, + ) + resolver = ( + Registry() + .with_resources( + [ + ("http://example.com", root), + ("http://example.com/foo/", TRUE), + ("http://example.com/foo/bar", root), + ], + ) + .resolver() + ) + + first = resolver.lookup("http://example.com") + second = first.resolver.lookup("foo/") + resolver = second.resolver.lookup("bar").resolver + fourth = resolver.lookup("#fooAnchor") + assert fourth.contents == root.contents + + +def test_multiple_lookup_dynamic_ref_to_nondynamic_ref(): + one = referencing.jsonschema.DRAFT202012.create_resource( + {"$anchor": "fooAnchor"}, + ) + two = referencing.jsonschema.DRAFT202012.create_resource( + { + "$id": "http://example.com", + "$dynamicAnchor": "fooAnchor", + "$defs": { + "foo": { + "$id": "foo", + "$dynamicAnchor": "fooAnchor", + "$defs": { + "bar": True, + "baz": { + "$dynamicAnchor": "fooAnchor", + }, + }, + }, + }, + }, + ) + resolver = ( + Registry() + .with_resources( + [ + ("http://example.com", two), + ("http://example.com/foo/", one), + ("http://example.com/foo/bar", two), + ], + ) + .resolver() + ) + + first = resolver.lookup("http://example.com") + second = first.resolver.lookup("foo/") + resolver = second.resolver.lookup("bar").resolver + fourth = resolver.lookup("#fooAnchor") + assert fourth.contents == two.contents + + +def test_lookup_trivial_recursive_ref(): + one = referencing.jsonschema.DRAFT201909.create_resource( + {"$recursiveAnchor": True}, + ) + resolver = Registry().with_resource("http://example.com", one).resolver() + first = resolver.lookup("http://example.com") + resolved = referencing.jsonschema.lookup_recursive_ref( + resolver=first.resolver, + ) + assert resolved.contents == one.contents + + +def test_lookup_recursive_ref_to_bool(): + TRUE = referencing.jsonschema.DRAFT201909.create_resource(True) + registry = Registry({"http://example.com": TRUE}) + resolved = referencing.jsonschema.lookup_recursive_ref( + resolver=registry.resolver(base_uri="http://example.com"), + ) + assert resolved.contents == TRUE.contents + + +def test_multiple_lookup_recursive_ref_to_bool(): + TRUE = referencing.jsonschema.DRAFT201909.create_resource(True) + root = referencing.jsonschema.DRAFT201909.create_resource( + { + "$id": "http://example.com", + "$recursiveAnchor": True, + "$defs": { + "foo": { + "$id": "foo", + "$recursiveAnchor": True, + "$defs": { + "bar": True, + "baz": { + "$recursiveAnchor": True, + "$anchor": "fooAnchor", + }, + }, + }, + }, + }, + ) + resolver = ( + Registry() + .with_resources( + [ + ("http://example.com", root), + ("http://example.com/foo/", TRUE), + ("http://example.com/foo/bar", root), + ], + ) + .resolver() + ) + + first = resolver.lookup("http://example.com") + second = first.resolver.lookup("foo/") + resolver = second.resolver.lookup("bar").resolver + fourth = referencing.jsonschema.lookup_recursive_ref(resolver=resolver) + assert fourth.contents == root.contents + + +def test_multiple_lookup_recursive_ref_with_nonrecursive_ref(): + one = referencing.jsonschema.DRAFT201909.create_resource( + {"$recursiveAnchor": True}, + ) + two = referencing.jsonschema.DRAFT201909.create_resource( + { + "$id": "http://example.com", + "$recursiveAnchor": True, + "$defs": { + "foo": { + "$id": "foo", + "$recursiveAnchor": True, + "$defs": { + "bar": True, + "baz": { + "$recursiveAnchor": True, + "$anchor": "fooAnchor", + }, + }, + }, + }, + }, + ) + three = referencing.jsonschema.DRAFT201909.create_resource( + {"$recursiveAnchor": False}, + ) + resolver = ( + Registry() + .with_resources( + [ + ("http://example.com", three), + ("http://example.com/foo/", two), + ("http://example.com/foo/bar", one), + ], + ) + .resolver() + ) + + first = resolver.lookup("http://example.com") + second = first.resolver.lookup("foo/") + resolver = second.resolver.lookup("bar").resolver + fourth = referencing.jsonschema.lookup_recursive_ref(resolver=resolver) + assert fourth.contents == two.contents + + +def test_empty_registry(): + assert referencing.jsonschema.EMPTY_REGISTRY == Registry() diff --git a/.venv/lib/python3.12/site-packages/referencing/tests/test_referencing_suite.py b/.venv/lib/python3.12/site-packages/referencing/tests/test_referencing_suite.py new file mode 100644 index 00000000..4b8ae917 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/referencing/tests/test_referencing_suite.py @@ -0,0 +1,66 @@ +from pathlib import Path +import json +import os + +import pytest + +from referencing import Registry +from referencing.exceptions import Unresolvable +import referencing.jsonschema + + +class SuiteNotFound(Exception): + def __str__(self): # pragma: no cover + return ( + "Cannot find the referencing suite. " + "Set the REFERENCING_SUITE environment variable to the path to " + "the suite, or run the test suite from alongside a full checkout " + "of the git repository." + ) + + +if "REFERENCING_SUITE" in os.environ: # pragma: no cover + SUITE = Path(os.environ["REFERENCING_SUITE"]) / "tests" +else: + SUITE = Path(__file__).parent.parent.parent / "suite/tests" +if not SUITE.is_dir(): # pragma: no cover + raise SuiteNotFound() +DIALECT_IDS = json.loads(SUITE.joinpath("specifications.json").read_text()) + + +@pytest.mark.parametrize( + "test_path", + [ + pytest.param(each, id=f"{each.parent.name}-{each.stem}") + for each in SUITE.glob("*/**/*.json") + ], +) +def test_referencing_suite(test_path, subtests): + dialect_id = DIALECT_IDS[test_path.relative_to(SUITE).parts[0]] + specification = referencing.jsonschema.specification_with(dialect_id) + loaded = json.loads(test_path.read_text()) + registry = loaded["registry"] + registry = Registry().with_resources( + (uri, specification.create_resource(contents)) + for uri, contents in loaded["registry"].items() + ) + for test in loaded["tests"]: + with subtests.test(test=test): + if "normalization" in test_path.stem: + pytest.xfail("APIs need to change for proper URL support.") + + resolver = registry.resolver(base_uri=test.get("base_uri", "")) + + if test.get("error"): + with pytest.raises(Unresolvable): + resolver.lookup(test["ref"]) + else: + resolved = resolver.lookup(test["ref"]) + assert resolved.contents == test["target"] + + then = test.get("then") + while then: # pragma: no cover + with subtests.test(test=test, then=then): + resolved = resolved.resolver.lookup(then["ref"]) + assert resolved.contents == then["target"] + then = then.get("then") diff --git a/.venv/lib/python3.12/site-packages/referencing/tests/test_retrieval.py b/.venv/lib/python3.12/site-packages/referencing/tests/test_retrieval.py new file mode 100644 index 00000000..d0a8f8ad --- /dev/null +++ b/.venv/lib/python3.12/site-packages/referencing/tests/test_retrieval.py @@ -0,0 +1,106 @@ +from functools import lru_cache +import json + +import pytest + +from referencing import Registry, Resource, exceptions +from referencing.jsonschema import DRAFT202012 +from referencing.retrieval import to_cached_resource + + +class TestToCachedResource: + def test_it_caches_retrieved_resources(self): + contents = {"$schema": "https://json-schema.org/draft/2020-12/schema"} + stack = [json.dumps(contents)] + + @to_cached_resource() + def retrieve(uri): + return stack.pop() + + registry = Registry(retrieve=retrieve) + + expected = Resource.from_contents(contents) + + got = registry.get_or_retrieve("urn:example:schema") + assert got.value == expected + + # And a second time we get the same value. + again = registry.get_or_retrieve("urn:example:schema") + assert again.value is got.value + + def test_custom_loader(self): + contents = {"$schema": "https://json-schema.org/draft/2020-12/schema"} + stack = [json.dumps(contents)[::-1]] + + @to_cached_resource(loads=lambda s: json.loads(s[::-1])) + def retrieve(uri): + return stack.pop() + + registry = Registry(retrieve=retrieve) + + expected = Resource.from_contents(contents) + + got = registry.get_or_retrieve("urn:example:schema") + assert got.value == expected + + # And a second time we get the same value. + again = registry.get_or_retrieve("urn:example:schema") + assert again.value is got.value + + def test_custom_from_contents(self): + contents = {} + stack = [json.dumps(contents)] + + @to_cached_resource(from_contents=DRAFT202012.create_resource) + def retrieve(uri): + return stack.pop() + + registry = Registry(retrieve=retrieve) + + expected = DRAFT202012.create_resource(contents) + + got = registry.get_or_retrieve("urn:example:schema") + assert got.value == expected + + # And a second time we get the same value. + again = registry.get_or_retrieve("urn:example:schema") + assert again.value is got.value + + def test_custom_cache(self): + schema = {"$schema": "https://json-schema.org/draft/2020-12/schema"} + mapping = { + "urn:example:1": dict(schema, foo=1), + "urn:example:2": dict(schema, foo=2), + "urn:example:3": dict(schema, foo=3), + } + + resources = { + uri: Resource.from_contents(contents) + for uri, contents in mapping.items() + } + + @to_cached_resource(cache=lru_cache(maxsize=2)) + def retrieve(uri): + return json.dumps(mapping.pop(uri)) + + registry = Registry(retrieve=retrieve) + + got = registry.get_or_retrieve("urn:example:1") + assert got.value == resources["urn:example:1"] + assert registry.get_or_retrieve("urn:example:1").value is got.value + assert registry.get_or_retrieve("urn:example:1").value is got.value + + got = registry.get_or_retrieve("urn:example:2") + assert got.value == resources["urn:example:2"] + assert registry.get_or_retrieve("urn:example:2").value is got.value + assert registry.get_or_retrieve("urn:example:2").value is got.value + + # This still succeeds, but evicts the first URI + got = registry.get_or_retrieve("urn:example:3") + assert got.value == resources["urn:example:3"] + assert registry.get_or_retrieve("urn:example:3").value is got.value + assert registry.get_or_retrieve("urn:example:3").value is got.value + + # And now this fails (as we popped the value out of `mapping`) + with pytest.raises(exceptions.Unretrievable): + registry.get_or_retrieve("urn:example:1") diff --git a/.venv/lib/python3.12/site-packages/referencing/typing.py b/.venv/lib/python3.12/site-packages/referencing/typing.py new file mode 100644 index 00000000..a6144641 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/referencing/typing.py @@ -0,0 +1,61 @@ +""" +Type-annotation related support for the referencing library. +""" + +from __future__ import annotations + +from collections.abc import Mapping as Mapping +from typing import TYPE_CHECKING, Any, Protocol + +try: + from typing_extensions import TypeVar +except ImportError: # pragma: no cover + from typing import TypeVar + +if TYPE_CHECKING: + from referencing._core import Resolved, Resolver, Resource + +#: A URI which identifies a `Resource`. +URI = str + +#: The type of documents within a registry. +D = TypeVar("D", default=Any) + + +class Retrieve(Protocol[D]): + """ + A retrieval callable, usable within a `Registry` for resource retrieval. + + Does not make assumptions about where the resource might be coming from. + """ + + def __call__(self, uri: URI) -> Resource[D]: + """ + Retrieve the resource with the given URI. + + Raise `referencing.exceptions.NoSuchResource` if you wish to indicate + the retriever cannot lookup the given URI. + """ + ... + + +class Anchor(Protocol[D]): + """ + An anchor within a `Resource`. + + Beyond "simple" anchors, some specifications like JSON Schema's 2020 + version have dynamic anchors. + """ + + @property + def name(self) -> str: + """ + Return the name of this anchor. + """ + ... + + def resolve(self, resolver: Resolver[D]) -> Resolved[D]: + """ + Return the resource for this anchor. + """ + ... |