aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/referencing
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/referencing')
-rw-r--r--.venv/lib/python3.12/site-packages/referencing/__init__.py7
-rw-r--r--.venv/lib/python3.12/site-packages/referencing/_attrs.py31
-rw-r--r--.venv/lib/python3.12/site-packages/referencing/_attrs.pyi20
-rw-r--r--.venv/lib/python3.12/site-packages/referencing/_core.py739
-rw-r--r--.venv/lib/python3.12/site-packages/referencing/exceptions.py165
-rw-r--r--.venv/lib/python3.12/site-packages/referencing/jsonschema.py642
-rw-r--r--.venv/lib/python3.12/site-packages/referencing/py.typed0
-rw-r--r--.venv/lib/python3.12/site-packages/referencing/retrieval.py92
-rw-r--r--.venv/lib/python3.12/site-packages/referencing/tests/__init__.py0
-rw-r--r--.venv/lib/python3.12/site-packages/referencing/tests/test_core.py1057
-rw-r--r--.venv/lib/python3.12/site-packages/referencing/tests/test_exceptions.py34
-rw-r--r--.venv/lib/python3.12/site-packages/referencing/tests/test_jsonschema.py382
-rw-r--r--.venv/lib/python3.12/site-packages/referencing/tests/test_referencing_suite.py66
-rw-r--r--.venv/lib/python3.12/site-packages/referencing/tests/test_retrieval.py106
-rw-r--r--.venv/lib/python3.12/site-packages/referencing/typing.py61
15 files changed, 3402 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/referencing/__init__.py b/.venv/lib/python3.12/site-packages/referencing/__init__.py
new file mode 100644
index 00000000..e09207d7
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/referencing/__init__.py
@@ -0,0 +1,7 @@
+"""
+Cross-specification, implementation-agnostic JSON referencing.
+"""
+
+from referencing._core import Anchor, Registry, Resource, Specification
+
+__all__ = ["Anchor", "Registry", "Resource", "Specification"]
diff --git a/.venv/lib/python3.12/site-packages/referencing/_attrs.py b/.venv/lib/python3.12/site-packages/referencing/_attrs.py
new file mode 100644
index 00000000..ae85b865
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/referencing/_attrs.py
@@ -0,0 +1,31 @@
+from __future__ import annotations
+
+from typing import NoReturn, TypeVar
+
+from attrs import define as _define, frozen as _frozen
+
+_T = TypeVar("_T")
+
+
+def define(cls: type[_T]) -> type[_T]: # pragma: no cover
+ cls.__init_subclass__ = _do_not_subclass
+ return _define(cls)
+
+
+def frozen(cls: type[_T]) -> type[_T]:
+ cls.__init_subclass__ = _do_not_subclass
+ return _frozen(cls)
+
+
+class UnsupportedSubclassing(Exception):
+ def __str__(self):
+ return (
+ "Subclassing is not part of referencing's public API. "
+ "If no other suitable API exists for what you're trying to do, "
+ "feel free to file an issue asking for one."
+ )
+
+
+@staticmethod
+def _do_not_subclass() -> NoReturn: # pragma: no cover
+ raise UnsupportedSubclassing()
diff --git a/.venv/lib/python3.12/site-packages/referencing/_attrs.pyi b/.venv/lib/python3.12/site-packages/referencing/_attrs.pyi
new file mode 100644
index 00000000..278e4109
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/referencing/_attrs.pyi
@@ -0,0 +1,20 @@
+from typing import Any, Callable, TypeVar, Union
+
+from attr import attrib, field
+
+class UnsupportedSubclassing(Exception): ...
+
+_T = TypeVar("_T")
+
+def __dataclass_transform__(
+ *,
+ frozen_default: bool = False,
+ field_descriptors: tuple[Union[type, Callable[..., Any]], ...] = ...,
+) -> Callable[[_T], _T]: ...
+@__dataclass_transform__(field_descriptors=(attrib, field))
+def define(cls: type[_T]) -> type[_T]: ...
+@__dataclass_transform__(
+ frozen_default=True,
+ field_descriptors=(attrib, field),
+)
+def frozen(cls: type[_T]) -> type[_T]: ...
diff --git a/.venv/lib/python3.12/site-packages/referencing/_core.py b/.venv/lib/python3.12/site-packages/referencing/_core.py
new file mode 100644
index 00000000..ec2d51bd
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/referencing/_core.py
@@ -0,0 +1,739 @@
+from __future__ import annotations
+
+from collections.abc import Iterable, Iterator, Sequence
+from enum import Enum
+from typing import Any, Callable, ClassVar, Generic, Protocol
+from urllib.parse import unquote, urldefrag, urljoin
+
+from attrs import evolve, field
+from rpds import HashTrieMap, HashTrieSet, List
+
+try:
+ from typing_extensions import TypeVar
+except ImportError: # pragma: no cover
+ from typing import TypeVar
+
+from referencing import exceptions
+from referencing._attrs import frozen
+from referencing.typing import URI, Anchor as AnchorType, D, Mapping, Retrieve
+
+EMPTY_UNCRAWLED: HashTrieSet[URI] = HashTrieSet()
+EMPTY_PREVIOUS_RESOLVERS: List[URI] = List()
+
+
+class _Unset(Enum):
+ """
+ What sillyness...
+ """
+
+ SENTINEL = 1
+
+
+_UNSET = _Unset.SENTINEL
+
+
+class _MaybeInSubresource(Protocol[D]):
+ def __call__(
+ self,
+ segments: Sequence[int | str],
+ resolver: Resolver[D],
+ subresource: Resource[D],
+ ) -> Resolver[D]: ...
+
+
+def _detect_or_error(contents: D) -> Specification[D]:
+ if not isinstance(contents, Mapping):
+ raise exceptions.CannotDetermineSpecification(contents)
+
+ jsonschema_dialect_id = contents.get("$schema") # type: ignore[reportUnknownMemberType]
+ if not isinstance(jsonschema_dialect_id, str):
+ raise exceptions.CannotDetermineSpecification(contents)
+
+ from referencing.jsonschema import specification_with
+
+ return specification_with(jsonschema_dialect_id)
+
+
+def _detect_or_default(
+ default: Specification[D],
+) -> Callable[[D], Specification[D]]:
+ def _detect(contents: D) -> Specification[D]:
+ if not isinstance(contents, Mapping):
+ return default
+
+ jsonschema_dialect_id = contents.get("$schema") # type: ignore[reportUnknownMemberType]
+ if jsonschema_dialect_id is None:
+ return default
+
+ from referencing.jsonschema import specification_with
+
+ return specification_with(
+ jsonschema_dialect_id, # type: ignore[reportUnknownArgumentType]
+ default=default,
+ )
+
+ return _detect
+
+
+class _SpecificationDetector:
+ def __get__(
+ self,
+ instance: Specification[D] | None,
+ cls: type[Specification[D]],
+ ) -> Callable[[D], Specification[D]]:
+ if instance is None:
+ return _detect_or_error
+ else:
+ return _detect_or_default(instance)
+
+
+@frozen
+class Specification(Generic[D]):
+ """
+ A specification which defines referencing behavior.
+
+ The various methods of a `Specification` allow for varying referencing
+ behavior across JSON Schema specification versions, etc.
+ """
+
+ #: A short human-readable name for the specification, used for debugging.
+ name: str
+
+ #: Find the ID of a given document.
+ id_of: Callable[[D], URI | None]
+
+ #: Retrieve the subresources of the given document (without traversing into
+ #: the subresources themselves).
+ subresources_of: Callable[[D], Iterable[D]]
+
+ #: While resolving a JSON pointer, conditionally enter a subresource
+ #: (if e.g. we have just entered a keyword whose value is a subresource)
+ maybe_in_subresource: _MaybeInSubresource[D]
+
+ #: Retrieve the anchors contained in the given document.
+ _anchors_in: Callable[
+ [Specification[D], D],
+ Iterable[AnchorType[D]],
+ ] = field(alias="anchors_in")
+
+ #: An opaque specification where resources have no subresources
+ #: nor internal identifiers.
+ OPAQUE: ClassVar[Specification[Any]]
+
+ #: Attempt to discern which specification applies to the given contents.
+ #:
+ #: May be called either as an instance method or as a class method, with
+ #: slightly different behavior in the following case:
+ #:
+ #: Recall that not all contents contains enough internal information about
+ #: which specification it is written for -- the JSON Schema ``{}``,
+ #: for instance, is valid under many different dialects and may be
+ #: interpreted as any one of them.
+ #:
+ #: When this method is used as an instance method (i.e. called on a
+ #: specific specification), that specification is used as the default
+ #: if the given contents are unidentifiable.
+ #:
+ #: On the other hand when called as a class method, an error is raised.
+ #:
+ #: To reiterate, ``DRAFT202012.detect({})`` will return ``DRAFT202012``
+ #: whereas the class method ``Specification.detect({})`` will raise an
+ #: error.
+ #:
+ #: (Note that of course ``DRAFT202012.detect(...)`` may return some other
+ #: specification when given a schema which *does* identify as being for
+ #: another version).
+ #:
+ #: Raises:
+ #:
+ #: `CannotDetermineSpecification`
+ #:
+ #: if the given contents don't have any discernible
+ #: information which could be used to guess which
+ #: specification they identify as
+ detect = _SpecificationDetector()
+
+ def __repr__(self) -> str:
+ return f"<Specification name={self.name!r}>"
+
+ def anchors_in(self, contents: D):
+ """
+ Retrieve the anchors contained in the given document.
+ """
+ return self._anchors_in(self, contents)
+
+ def create_resource(self, contents: D) -> Resource[D]:
+ """
+ Create a resource which is interpreted using this specification.
+ """
+ return Resource(contents=contents, specification=self)
+
+
+Specification.OPAQUE = Specification(
+ name="opaque",
+ id_of=lambda contents: None,
+ subresources_of=lambda contents: [],
+ anchors_in=lambda specification, contents: [],
+ maybe_in_subresource=lambda segments, resolver, subresource: resolver,
+)
+
+
+@frozen
+class Resource(Generic[D]):
+ r"""
+ A document (deserialized JSON) with a concrete interpretation under a spec.
+
+ In other words, a Python object, along with an instance of `Specification`
+ which describes how the document interacts with referencing -- both
+ internally (how it refers to other `Resource`\ s) and externally (how it
+ should be identified such that it is referenceable by other documents).
+ """
+
+ contents: D
+ _specification: Specification[D] = field(alias="specification")
+
+ @classmethod
+ def from_contents(
+ cls,
+ contents: D,
+ default_specification: (
+ type[Specification[D]] | Specification[D]
+ ) = Specification,
+ ) -> Resource[D]:
+ """
+ Create a resource guessing which specification applies to the contents.
+
+ Raises:
+
+ `CannotDetermineSpecification`
+
+ if the given contents don't have any discernible
+ information which could be used to guess which
+ specification they identify as
+
+ """
+ specification = default_specification.detect(contents)
+ return specification.create_resource(contents=contents)
+
+ @classmethod
+ def opaque(cls, contents: D) -> Resource[D]:
+ """
+ Create an opaque `Resource` -- i.e. one with opaque specification.
+
+ See `Specification.OPAQUE` for details.
+ """
+ return Specification.OPAQUE.create_resource(contents=contents)
+
+ def id(self) -> URI | None:
+ """
+ Retrieve this resource's (specification-specific) identifier.
+ """
+ id = self._specification.id_of(self.contents)
+ if id is None:
+ return
+ return id.rstrip("#")
+
+ def subresources(self) -> Iterable[Resource[D]]:
+ """
+ Retrieve this resource's subresources.
+ """
+ return (
+ Resource.from_contents(
+ each,
+ default_specification=self._specification,
+ )
+ for each in self._specification.subresources_of(self.contents)
+ )
+
+ def anchors(self) -> Iterable[AnchorType[D]]:
+ """
+ Retrieve this resource's (specification-specific) identifier.
+ """
+ return self._specification.anchors_in(self.contents)
+
+ def pointer(self, pointer: str, resolver: Resolver[D]) -> Resolved[D]:
+ """
+ Resolve the given JSON pointer.
+
+ Raises:
+
+ `exceptions.PointerToNowhere`
+
+ if the pointer points to a location not present in the document
+
+ """
+ if not pointer:
+ return Resolved(contents=self.contents, resolver=resolver)
+
+ contents = self.contents
+ segments: list[int | str] = []
+ for segment in unquote(pointer[1:]).split("/"):
+ if isinstance(contents, Sequence):
+ segment = int(segment)
+ else:
+ segment = segment.replace("~1", "/").replace("~0", "~")
+ try:
+ contents = contents[segment] # type: ignore[reportUnknownArgumentType]
+ except LookupError as lookup_error:
+ error = exceptions.PointerToNowhere(ref=pointer, resource=self)
+ raise error from lookup_error
+
+ segments.append(segment)
+ last = resolver
+ resolver = self._specification.maybe_in_subresource(
+ segments=segments,
+ resolver=resolver,
+ subresource=self._specification.create_resource(contents),
+ )
+ if resolver is not last:
+ segments = []
+ return Resolved(contents=contents, resolver=resolver) # type: ignore[reportUnknownArgumentType]
+
+
+def _fail_to_retrieve(uri: URI):
+ raise exceptions.NoSuchResource(ref=uri)
+
+
+@frozen
+class Registry(Mapping[URI, Resource[D]]):
+ r"""
+ A registry of `Resource`\ s, each identified by their canonical URIs.
+
+ Registries store a collection of in-memory resources, and optionally
+ enable additional resources which may be stored elsewhere (e.g. in a
+ database, a separate set of files, over the network, etc.).
+
+ They also lazily walk their known resources, looking for subresources
+ within them. In other words, subresources contained within any added
+ resources will be retrievable via their own IDs (though this discovery of
+ subresources will be delayed until necessary).
+
+ Registries are immutable, and their methods return new instances of the
+ registry with the additional resources added to them.
+
+ The ``retrieve`` argument can be used to configure retrieval of resources
+ dynamically, either over the network, from a database, or the like.
+ Pass it a callable which will be called if any URI not present in the
+ registry is accessed. It must either return a `Resource` or else raise a
+ `NoSuchResource` exception indicating that the resource does not exist
+ even according to the retrieval logic.
+ """
+
+ _resources: HashTrieMap[URI, Resource[D]] = field(
+ default=HashTrieMap(),
+ converter=HashTrieMap.convert, # type: ignore[reportGeneralTypeIssues]
+ alias="resources",
+ )
+ _anchors: HashTrieMap[tuple[URI, str], AnchorType[D]] = HashTrieMap()
+ _uncrawled: HashTrieSet[URI] = EMPTY_UNCRAWLED
+ _retrieve: Retrieve[D] = field(default=_fail_to_retrieve, alias="retrieve")
+
+ def __getitem__(self, uri: URI) -> Resource[D]:
+ """
+ Return the (already crawled) `Resource` identified by the given URI.
+ """
+ try:
+ return self._resources[uri.rstrip("#")]
+ except KeyError:
+ raise exceptions.NoSuchResource(ref=uri) from None
+
+ def __iter__(self) -> Iterator[URI]:
+ """
+ Iterate over all crawled URIs in the registry.
+ """
+ return iter(self._resources)
+
+ def __len__(self) -> int:
+ """
+ Count the total number of fully crawled resources in this registry.
+ """
+ return len(self._resources)
+
+ def __rmatmul__(
+ self,
+ new: Resource[D] | Iterable[Resource[D]],
+ ) -> Registry[D]:
+ """
+ Create a new registry with resource(s) added using their internal IDs.
+
+ Resources must have a internal IDs (e.g. the :kw:`$id` keyword in
+ modern JSON Schema versions), otherwise an error will be raised.
+
+ Both a single resource as well as an iterable of resources works, i.e.:
+
+ * ``resource @ registry`` or
+
+ * ``[iterable, of, multiple, resources] @ registry``
+
+ which -- again, assuming the resources have internal IDs -- is
+ equivalent to calling `Registry.with_resources` as such:
+
+ .. code:: python
+
+ registry.with_resources(
+ (resource.id(), resource) for resource in new_resources
+ )
+
+ Raises:
+
+ `NoInternalID`
+
+ if the resource(s) in fact do not have IDs
+
+ """
+ if isinstance(new, Resource):
+ new = (new,)
+
+ resources = self._resources
+ uncrawled = self._uncrawled
+ for resource in new:
+ id = resource.id()
+ if id is None:
+ raise exceptions.NoInternalID(resource=resource)
+ uncrawled = uncrawled.insert(id)
+ resources = resources.insert(id, resource)
+ return evolve(self, resources=resources, uncrawled=uncrawled)
+
+ def __repr__(self) -> str:
+ size = len(self)
+ pluralized = "resource" if size == 1 else "resources"
+ if self._uncrawled:
+ uncrawled = len(self._uncrawled)
+ if uncrawled == size:
+ summary = f"uncrawled {pluralized}"
+ else:
+ summary = f"{pluralized}, {uncrawled} uncrawled"
+ else:
+ summary = f"{pluralized}"
+ return f"<Registry ({size} {summary})>"
+
+ def get_or_retrieve(self, uri: URI) -> Retrieved[D, Resource[D]]:
+ """
+ Get a resource from the registry, crawling or retrieving if necessary.
+
+ May involve crawling to find the given URI if it is not already known,
+ so the returned object is a `Retrieved` object which contains both the
+ resource value as well as the registry which ultimately contained it.
+ """
+ resource = self._resources.get(uri)
+ if resource is not None:
+ return Retrieved(registry=self, value=resource)
+
+ registry = self.crawl()
+ resource = registry._resources.get(uri)
+ if resource is not None:
+ return Retrieved(registry=registry, value=resource)
+
+ try:
+ resource = registry._retrieve(uri)
+ except (
+ exceptions.CannotDetermineSpecification,
+ exceptions.NoSuchResource,
+ ):
+ raise
+ except Exception as error:
+ raise exceptions.Unretrievable(ref=uri) from error
+ else:
+ registry = registry.with_resource(uri, resource)
+ return Retrieved(registry=registry, value=resource)
+
+ def remove(self, uri: URI):
+ """
+ Return a registry with the resource identified by a given URI removed.
+ """
+ if uri not in self._resources:
+ raise exceptions.NoSuchResource(ref=uri)
+
+ return evolve(
+ self,
+ resources=self._resources.remove(uri),
+ uncrawled=self._uncrawled.discard(uri),
+ anchors=HashTrieMap(
+ (k, v) for k, v in self._anchors.items() if k[0] != uri
+ ),
+ )
+
+ def anchor(self, uri: URI, name: str):
+ """
+ Retrieve a given anchor from a resource which must already be crawled.
+ """
+ value = self._anchors.get((uri, name))
+ if value is not None:
+ return Retrieved(value=value, registry=self)
+
+ registry = self.crawl()
+ value = registry._anchors.get((uri, name))
+ if value is not None:
+ return Retrieved(value=value, registry=registry)
+
+ resource = self[uri]
+ canonical_uri = resource.id()
+ if canonical_uri is not None:
+ value = registry._anchors.get((canonical_uri, name))
+ if value is not None:
+ return Retrieved(value=value, registry=registry)
+
+ if "/" in name:
+ raise exceptions.InvalidAnchor(
+ ref=uri,
+ resource=resource,
+ anchor=name,
+ )
+ raise exceptions.NoSuchAnchor(ref=uri, resource=resource, anchor=name)
+
+ def contents(self, uri: URI) -> D:
+ """
+ Retrieve the (already crawled) contents identified by the given URI.
+ """
+ return self[uri].contents
+
+ def crawl(self) -> Registry[D]:
+ """
+ Crawl all added resources, discovering subresources.
+ """
+ resources = self._resources
+ anchors = self._anchors
+ uncrawled = [(uri, resources[uri]) for uri in self._uncrawled]
+ while uncrawled:
+ uri, resource = uncrawled.pop()
+
+ id = resource.id()
+ if id is not None:
+ uri = urljoin(uri, id)
+ resources = resources.insert(uri, resource)
+ for each in resource.anchors():
+ anchors = anchors.insert((uri, each.name), each)
+ uncrawled.extend((uri, each) for each in resource.subresources())
+ return evolve(
+ self,
+ resources=resources,
+ anchors=anchors,
+ uncrawled=EMPTY_UNCRAWLED,
+ )
+
+ def with_resource(self, uri: URI, resource: Resource[D]):
+ """
+ Add the given `Resource` to the registry, without crawling it.
+ """
+ return self.with_resources([(uri, resource)])
+
+ def with_resources(
+ self,
+ pairs: Iterable[tuple[URI, Resource[D]]],
+ ) -> Registry[D]:
+ r"""
+ Add the given `Resource`\ s to the registry, without crawling them.
+ """
+ resources = self._resources
+ uncrawled = self._uncrawled
+ for uri, resource in pairs:
+ # Empty fragment URIs are equivalent to URIs without the fragment.
+ # TODO: Is this true for non JSON Schema resources? Probably not.
+ uri = uri.rstrip("#")
+ uncrawled = uncrawled.insert(uri)
+ resources = resources.insert(uri, resource)
+ return evolve(self, resources=resources, uncrawled=uncrawled)
+
+ def with_contents(
+ self,
+ pairs: Iterable[tuple[URI, D]],
+ **kwargs: Any,
+ ) -> Registry[D]:
+ r"""
+ Add the given contents to the registry, autodetecting when necessary.
+ """
+ return self.with_resources(
+ (uri, Resource.from_contents(each, **kwargs))
+ for uri, each in pairs
+ )
+
+ def combine(self, *registries: Registry[D]) -> Registry[D]:
+ """
+ Combine together one or more other registries, producing a unified one.
+ """
+ if registries == (self,):
+ return self
+ resources = self._resources
+ anchors = self._anchors
+ uncrawled = self._uncrawled
+ retrieve = self._retrieve
+ for registry in registries:
+ resources = resources.update(registry._resources)
+ anchors = anchors.update(registry._anchors)
+ uncrawled = uncrawled.update(registry._uncrawled)
+
+ if registry._retrieve is not _fail_to_retrieve: # type: ignore[reportUnnecessaryComparison] ???
+ if registry._retrieve is not retrieve is not _fail_to_retrieve: # type: ignore[reportUnnecessaryComparison] ???
+ raise ValueError( # noqa: TRY003
+ "Cannot combine registries with conflicting retrieval "
+ "functions.",
+ )
+ retrieve = registry._retrieve
+ return evolve(
+ self,
+ anchors=anchors,
+ resources=resources,
+ uncrawled=uncrawled,
+ retrieve=retrieve,
+ )
+
+ def resolver(self, base_uri: URI = "") -> Resolver[D]:
+ """
+ Return a `Resolver` which resolves references against this registry.
+ """
+ return Resolver(base_uri=base_uri, registry=self)
+
+ def resolver_with_root(self, resource: Resource[D]) -> Resolver[D]:
+ """
+ Return a `Resolver` with a specific root resource.
+ """
+ uri = resource.id() or ""
+ return Resolver(
+ base_uri=uri,
+ registry=self.with_resource(uri, resource),
+ )
+
+
+#: An anchor or resource.
+AnchorOrResource = TypeVar(
+ "AnchorOrResource",
+ AnchorType[Any],
+ Resource[Any],
+ default=Resource[Any],
+)
+
+
+@frozen
+class Retrieved(Generic[D, AnchorOrResource]):
+ """
+ A value retrieved from a `Registry`.
+ """
+
+ value: AnchorOrResource
+ registry: Registry[D]
+
+
+@frozen
+class Resolved(Generic[D]):
+ """
+ A reference resolved to its contents by a `Resolver`.
+ """
+
+ contents: D
+ resolver: Resolver[D]
+
+
+@frozen
+class Resolver(Generic[D]):
+ """
+ A reference resolver.
+
+ Resolvers help resolve references (including relative ones) by
+ pairing a fixed base URI with a `Registry`.
+
+ This object, under normal circumstances, is expected to be used by
+ *implementers of libraries* built on top of `referencing` (e.g. JSON Schema
+ implementations or other libraries resolving JSON references),
+ not directly by end-users populating registries or while writing
+ schemas or other resources.
+
+ References are resolved against the base URI, and the combined URI
+ is then looked up within the registry.
+
+ The process of resolving a reference may itself involve calculating
+ a *new* base URI for future reference resolution (e.g. if an
+ intermediate resource sets a new base URI), or may involve encountering
+ additional subresources and adding them to a new registry.
+ """
+
+ _base_uri: URI = field(alias="base_uri")
+ _registry: Registry[D] = field(alias="registry")
+ _previous: List[URI] = field(default=List(), repr=False, alias="previous")
+
+ def lookup(self, ref: URI) -> Resolved[D]:
+ """
+ Resolve the given reference to the resource it points to.
+
+ Raises:
+
+ `exceptions.Unresolvable`
+
+ or a subclass thereof (see below) if the reference isn't
+ resolvable
+
+ `exceptions.NoSuchAnchor`
+
+ if the reference is to a URI where a resource exists but
+ contains a plain name fragment which does not exist within
+ the resource
+
+ `exceptions.PointerToNowhere`
+
+ if the reference is to a URI where a resource exists but
+ contains a JSON pointer to a location within the resource
+ that does not exist
+
+ """
+ if ref.startswith("#"):
+ uri, fragment = self._base_uri, ref[1:]
+ else:
+ uri, fragment = urldefrag(urljoin(self._base_uri, ref))
+ try:
+ retrieved = self._registry.get_or_retrieve(uri)
+ except exceptions.NoSuchResource:
+ raise exceptions.Unresolvable(ref=ref) from None
+ except exceptions.Unretrievable as error:
+ raise exceptions.Unresolvable(ref=ref) from error
+
+ if fragment.startswith("/"):
+ resolver = self._evolve(registry=retrieved.registry, base_uri=uri)
+ return retrieved.value.pointer(pointer=fragment, resolver=resolver)
+
+ if fragment:
+ retrieved = retrieved.registry.anchor(uri, fragment)
+ resolver = self._evolve(registry=retrieved.registry, base_uri=uri)
+ return retrieved.value.resolve(resolver=resolver)
+
+ resolver = self._evolve(registry=retrieved.registry, base_uri=uri)
+ return Resolved(contents=retrieved.value.contents, resolver=resolver)
+
+ def in_subresource(self, subresource: Resource[D]) -> Resolver[D]:
+ """
+ Create a resolver for a subresource (which may have a new base URI).
+ """
+ id = subresource.id()
+ if id is None:
+ return self
+ return evolve(self, base_uri=urljoin(self._base_uri, id))
+
+ def dynamic_scope(self) -> Iterable[tuple[URI, Registry[D]]]:
+ """
+ In specs with such a notion, return the URIs in the dynamic scope.
+ """
+ for uri in self._previous:
+ yield uri, self._registry
+
+ def _evolve(self, base_uri: URI, **kwargs: Any):
+ """
+ Evolve, appending to the dynamic scope.
+ """
+ previous = self._previous
+ if self._base_uri and (not previous or base_uri != self._base_uri):
+ previous = previous.push_front(self._base_uri)
+ return evolve(self, base_uri=base_uri, previous=previous, **kwargs)
+
+
+@frozen
+class Anchor(Generic[D]):
+ """
+ A simple anchor in a `Resource`.
+ """
+
+ name: str
+ resource: Resource[D]
+
+ def resolve(self, resolver: Resolver[D]):
+ """
+ Return the resource for this anchor.
+ """
+ return Resolved(contents=self.resource.contents, resolver=resolver)
diff --git a/.venv/lib/python3.12/site-packages/referencing/exceptions.py b/.venv/lib/python3.12/site-packages/referencing/exceptions.py
new file mode 100644
index 00000000..3267fc70
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/referencing/exceptions.py
@@ -0,0 +1,165 @@
+"""
+Errors, oh no!
+"""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+import attrs
+
+from referencing._attrs import frozen
+
+if TYPE_CHECKING:
+ from referencing import Resource
+ from referencing.typing import URI
+
+
+@frozen
+class NoSuchResource(KeyError):
+ """
+ The given URI is not present in a registry.
+
+ Unlike most exceptions, this class *is* intended to be publicly
+ instantiable and *is* part of the public API of the package.
+ """
+
+ ref: URI
+
+ def __eq__(self, other: object) -> bool:
+ if self.__class__ is not other.__class__:
+ return NotImplemented
+ return attrs.astuple(self) == attrs.astuple(other)
+
+ def __hash__(self) -> int:
+ return hash(attrs.astuple(self))
+
+
+@frozen
+class NoInternalID(Exception):
+ """
+ A resource has no internal ID, but one is needed.
+
+ E.g. in modern JSON Schema drafts, this is the :kw:`$id` keyword.
+
+ One might be needed if a resource was to-be added to a registry but no
+ other URI is available, and the resource doesn't declare its canonical URI.
+ """
+
+ resource: Resource[Any]
+
+ def __eq__(self, other: object) -> bool:
+ if self.__class__ is not other.__class__:
+ return NotImplemented
+ return attrs.astuple(self) == attrs.astuple(other)
+
+ def __hash__(self) -> int:
+ return hash(attrs.astuple(self))
+
+
+@frozen
+class Unretrievable(KeyError):
+ """
+ The given URI is not present in a registry, and retrieving it failed.
+ """
+
+ ref: URI
+
+ def __eq__(self, other: object) -> bool:
+ if self.__class__ is not other.__class__:
+ return NotImplemented
+ return attrs.astuple(self) == attrs.astuple(other)
+
+ def __hash__(self) -> int:
+ return hash(attrs.astuple(self))
+
+
+@frozen
+class CannotDetermineSpecification(Exception):
+ """
+ Attempting to detect the appropriate `Specification` failed.
+
+ This happens if no discernible information is found in the contents of the
+ new resource which would help identify it.
+ """
+
+ contents: Any
+
+ def __eq__(self, other: object) -> bool:
+ if self.__class__ is not other.__class__:
+ return NotImplemented
+ return attrs.astuple(self) == attrs.astuple(other)
+
+ def __hash__(self) -> int:
+ return hash(attrs.astuple(self))
+
+
+@attrs.frozen # Because here we allow subclassing below.
+class Unresolvable(Exception):
+ """
+ A reference was unresolvable.
+ """
+
+ ref: URI
+
+ def __eq__(self, other: object) -> bool:
+ if self.__class__ is not other.__class__:
+ return NotImplemented
+ return attrs.astuple(self) == attrs.astuple(other)
+
+ def __hash__(self) -> int:
+ return hash(attrs.astuple(self))
+
+
+@frozen
+class PointerToNowhere(Unresolvable):
+ """
+ A JSON Pointer leads to a part of a document that does not exist.
+ """
+
+ resource: Resource[Any]
+
+ def __str__(self) -> str:
+ msg = f"{self.ref!r} does not exist within {self.resource.contents!r}"
+ if self.ref == "/":
+ msg += (
+ ". The pointer '/' is a valid JSON Pointer but it points to "
+ "an empty string property ''. If you intended to point "
+ "to the entire resource, you should use '#'."
+ )
+ return msg
+
+
+@frozen
+class NoSuchAnchor(Unresolvable):
+ """
+ An anchor does not exist within a particular resource.
+ """
+
+ resource: Resource[Any]
+ anchor: str
+
+ def __str__(self) -> str:
+ return (
+ f"{self.anchor!r} does not exist within {self.resource.contents!r}"
+ )
+
+
+@frozen
+class InvalidAnchor(Unresolvable):
+ """
+ An anchor which could never exist in a resource was dereferenced.
+
+ It is somehow syntactically invalid.
+ """
+
+ resource: Resource[Any]
+ anchor: str
+
+ def __str__(self) -> str:
+ return (
+ f"'#{self.anchor}' is not a valid anchor, neither as a "
+ "plain name anchor nor as a JSON Pointer. You may have intended "
+ f"to use '#/{self.anchor}', as the slash is required *before each "
+ "segment* of a JSON pointer."
+ )
diff --git a/.venv/lib/python3.12/site-packages/referencing/jsonschema.py b/.venv/lib/python3.12/site-packages/referencing/jsonschema.py
new file mode 100644
index 00000000..169e109d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/referencing/jsonschema.py
@@ -0,0 +1,642 @@
+"""
+Referencing implementations for JSON Schema specs (historic & current).
+"""
+
+from __future__ import annotations
+
+from collections.abc import Iterable, Sequence, Set
+from typing import Any, Union
+
+from referencing import Anchor, Registry, Resource, Specification, exceptions
+from referencing._attrs import frozen
+from referencing._core import (
+ _UNSET, # type: ignore[reportPrivateUsage]
+ Resolved as _Resolved,
+ Resolver as _Resolver,
+ _Unset, # type: ignore[reportPrivateUsage]
+)
+from referencing.typing import URI, Anchor as AnchorType, Mapping
+
+#: A JSON Schema which is a JSON object
+ObjectSchema = Mapping[str, Any]
+
+#: A JSON Schema of any kind
+Schema = Union[bool, ObjectSchema]
+
+#: A Resource whose contents are JSON Schemas
+SchemaResource = Resource[Schema]
+
+#: A JSON Schema Registry
+SchemaRegistry = Registry[Schema]
+
+#: The empty JSON Schema Registry
+EMPTY_REGISTRY: SchemaRegistry = Registry()
+
+
+@frozen
+class UnknownDialect(Exception):
+ """
+ A dialect identifier was found for a dialect unknown by this library.
+
+ If it's a custom ("unofficial") dialect, be sure you've registered it.
+ """
+
+ uri: URI
+
+
+def _dollar_id(contents: Schema) -> URI | None:
+ if isinstance(contents, bool):
+ return
+ return contents.get("$id")
+
+
+def _legacy_dollar_id(contents: Schema) -> URI | None:
+ if isinstance(contents, bool) or "$ref" in contents:
+ return
+ id = contents.get("$id")
+ if id is not None and not id.startswith("#"):
+ return id
+
+
+def _legacy_id(contents: ObjectSchema) -> URI | None:
+ if "$ref" in contents:
+ return
+ id = contents.get("id")
+ if id is not None and not id.startswith("#"):
+ return id
+
+
+def _anchor(
+ specification: Specification[Schema],
+ contents: Schema,
+) -> Iterable[AnchorType[Schema]]:
+ if isinstance(contents, bool):
+ return
+ anchor = contents.get("$anchor")
+ if anchor is not None:
+ yield Anchor(
+ name=anchor,
+ resource=specification.create_resource(contents),
+ )
+
+ dynamic_anchor = contents.get("$dynamicAnchor")
+ if dynamic_anchor is not None:
+ yield DynamicAnchor(
+ name=dynamic_anchor,
+ resource=specification.create_resource(contents),
+ )
+
+
+def _anchor_2019(
+ specification: Specification[Schema],
+ contents: Schema,
+) -> Iterable[Anchor[Schema]]:
+ if isinstance(contents, bool):
+ return []
+ anchor = contents.get("$anchor")
+ if anchor is None:
+ return []
+ return [
+ Anchor(
+ name=anchor,
+ resource=specification.create_resource(contents),
+ ),
+ ]
+
+
+def _legacy_anchor_in_dollar_id(
+ specification: Specification[Schema],
+ contents: Schema,
+) -> Iterable[Anchor[Schema]]:
+ if isinstance(contents, bool):
+ return []
+ id = contents.get("$id", "")
+ if not id.startswith("#"):
+ return []
+ return [
+ Anchor(
+ name=id[1:],
+ resource=specification.create_resource(contents),
+ ),
+ ]
+
+
+def _legacy_anchor_in_id(
+ specification: Specification[ObjectSchema],
+ contents: ObjectSchema,
+) -> Iterable[Anchor[ObjectSchema]]:
+ id = contents.get("id", "")
+ if not id.startswith("#"):
+ return []
+ return [
+ Anchor(
+ name=id[1:],
+ resource=specification.create_resource(contents),
+ ),
+ ]
+
+
+def _subresources_of(
+ in_value: Set[str] = frozenset(),
+ in_subvalues: Set[str] = frozenset(),
+ in_subarray: Set[str] = frozenset(),
+):
+ """
+ Create a callable returning JSON Schema specification-style subschemas.
+
+ Relies on specifying the set of keywords containing subschemas in their
+ values, in a subobject's values, or in a subarray.
+ """
+
+ def subresources_of(contents: Schema) -> Iterable[ObjectSchema]:
+ if isinstance(contents, bool):
+ return
+ for each in in_value:
+ if each in contents:
+ yield contents[each]
+ for each in in_subarray:
+ if each in contents:
+ yield from contents[each]
+ for each in in_subvalues:
+ if each in contents:
+ yield from contents[each].values()
+
+ return subresources_of
+
+
+def _subresources_of_with_crazy_items(
+ in_value: Set[str] = frozenset(),
+ in_subvalues: Set[str] = frozenset(),
+ in_subarray: Set[str] = frozenset(),
+):
+ """
+ Specifically handle older drafts where there are some funky keywords.
+ """
+
+ def subresources_of(contents: Schema) -> Iterable[ObjectSchema]:
+ if isinstance(contents, bool):
+ return
+ for each in in_value:
+ if each in contents:
+ yield contents[each]
+ for each in in_subarray:
+ if each in contents:
+ yield from contents[each]
+ for each in in_subvalues:
+ if each in contents:
+ yield from contents[each].values()
+
+ items = contents.get("items")
+ if items is not None:
+ if isinstance(items, Sequence):
+ yield from items
+ else:
+ yield items
+
+ return subresources_of
+
+
+def _subresources_of_with_crazy_items_dependencies(
+ in_value: Set[str] = frozenset(),
+ in_subvalues: Set[str] = frozenset(),
+ in_subarray: Set[str] = frozenset(),
+):
+ """
+ Specifically handle older drafts where there are some funky keywords.
+ """
+
+ def subresources_of(contents: Schema) -> Iterable[ObjectSchema]:
+ if isinstance(contents, bool):
+ return
+ for each in in_value:
+ if each in contents:
+ yield contents[each]
+ for each in in_subarray:
+ if each in contents:
+ yield from contents[each]
+ for each in in_subvalues:
+ if each in contents:
+ yield from contents[each].values()
+
+ items = contents.get("items")
+ if items is not None:
+ if isinstance(items, Sequence):
+ yield from items
+ else:
+ yield items
+ dependencies = contents.get("dependencies")
+ if dependencies is not None:
+ values = iter(dependencies.values())
+ value = next(values, None)
+ if isinstance(value, Mapping):
+ yield value
+ yield from values
+
+ return subresources_of
+
+
+def _subresources_of_with_crazy_aP_items_dependencies(
+ in_value: Set[str] = frozenset(),
+ in_subvalues: Set[str] = frozenset(),
+ in_subarray: Set[str] = frozenset(),
+):
+ """
+ Specifically handle even older drafts where there are some funky keywords.
+ """
+
+ def subresources_of(contents: ObjectSchema) -> Iterable[ObjectSchema]:
+ for each in in_value:
+ if each in contents:
+ yield contents[each]
+ for each in in_subarray:
+ if each in contents:
+ yield from contents[each]
+ for each in in_subvalues:
+ if each in contents:
+ yield from contents[each].values()
+
+ items = contents.get("items")
+ if items is not None:
+ if isinstance(items, Sequence):
+ yield from items
+ else:
+ yield items
+ dependencies = contents.get("dependencies")
+ if dependencies is not None:
+ values = iter(dependencies.values())
+ value = next(values, None)
+ if isinstance(value, Mapping):
+ yield value
+ yield from values
+
+ for each in "additionalItems", "additionalProperties":
+ value = contents.get(each)
+ if isinstance(value, Mapping):
+ yield value
+
+ return subresources_of
+
+
+def _maybe_in_subresource(
+ in_value: Set[str] = frozenset(),
+ in_subvalues: Set[str] = frozenset(),
+ in_subarray: Set[str] = frozenset(),
+):
+ in_child = in_subvalues | in_subarray
+
+ def maybe_in_subresource(
+ segments: Sequence[int | str],
+ resolver: _Resolver[Any],
+ subresource: Resource[Any],
+ ) -> _Resolver[Any]:
+ _segments = iter(segments)
+ for segment in _segments:
+ if segment not in in_value and (
+ segment not in in_child or next(_segments, None) is None
+ ):
+ return resolver
+ return resolver.in_subresource(subresource)
+
+ return maybe_in_subresource
+
+
+def _maybe_in_subresource_crazy_items(
+ in_value: Set[str] = frozenset(),
+ in_subvalues: Set[str] = frozenset(),
+ in_subarray: Set[str] = frozenset(),
+):
+ in_child = in_subvalues | in_subarray
+
+ def maybe_in_subresource(
+ segments: Sequence[int | str],
+ resolver: _Resolver[Any],
+ subresource: Resource[Any],
+ ) -> _Resolver[Any]:
+ _segments = iter(segments)
+ for segment in _segments:
+ if segment == "items" and isinstance(
+ subresource.contents,
+ Mapping,
+ ):
+ return resolver.in_subresource(subresource)
+ if segment not in in_value and (
+ segment not in in_child or next(_segments, None) is None
+ ):
+ return resolver
+ return resolver.in_subresource(subresource)
+
+ return maybe_in_subresource
+
+
+def _maybe_in_subresource_crazy_items_dependencies(
+ in_value: Set[str] = frozenset(),
+ in_subvalues: Set[str] = frozenset(),
+ in_subarray: Set[str] = frozenset(),
+):
+ in_child = in_subvalues | in_subarray
+
+ def maybe_in_subresource(
+ segments: Sequence[int | str],
+ resolver: _Resolver[Any],
+ subresource: Resource[Any],
+ ) -> _Resolver[Any]:
+ _segments = iter(segments)
+ for segment in _segments:
+ if segment in {"items", "dependencies"} and isinstance(
+ subresource.contents,
+ Mapping,
+ ):
+ return resolver.in_subresource(subresource)
+ if segment not in in_value and (
+ segment not in in_child or next(_segments, None) is None
+ ):
+ return resolver
+ return resolver.in_subresource(subresource)
+
+ return maybe_in_subresource
+
+
+#: JSON Schema draft 2020-12
+DRAFT202012 = Specification(
+ name="draft2020-12",
+ id_of=_dollar_id,
+ subresources_of=_subresources_of(
+ in_value={
+ "additionalProperties",
+ "contains",
+ "contentSchema",
+ "else",
+ "if",
+ "items",
+ "not",
+ "propertyNames",
+ "then",
+ "unevaluatedItems",
+ "unevaluatedProperties",
+ },
+ in_subarray={"allOf", "anyOf", "oneOf", "prefixItems"},
+ in_subvalues={
+ "$defs",
+ "definitions",
+ "dependentSchemas",
+ "patternProperties",
+ "properties",
+ },
+ ),
+ anchors_in=_anchor,
+ maybe_in_subresource=_maybe_in_subresource(
+ in_value={
+ "additionalProperties",
+ "contains",
+ "contentSchema",
+ "else",
+ "if",
+ "items",
+ "not",
+ "propertyNames",
+ "then",
+ "unevaluatedItems",
+ "unevaluatedProperties",
+ },
+ in_subarray={"allOf", "anyOf", "oneOf", "prefixItems"},
+ in_subvalues={
+ "$defs",
+ "definitions",
+ "dependentSchemas",
+ "patternProperties",
+ "properties",
+ },
+ ),
+)
+#: JSON Schema draft 2019-09
+DRAFT201909 = Specification(
+ name="draft2019-09",
+ id_of=_dollar_id,
+ subresources_of=_subresources_of_with_crazy_items(
+ in_value={
+ "additionalItems",
+ "additionalProperties",
+ "contains",
+ "contentSchema",
+ "else",
+ "if",
+ "not",
+ "propertyNames",
+ "then",
+ "unevaluatedItems",
+ "unevaluatedProperties",
+ },
+ in_subarray={"allOf", "anyOf", "oneOf"},
+ in_subvalues={
+ "$defs",
+ "definitions",
+ "dependentSchemas",
+ "patternProperties",
+ "properties",
+ },
+ ),
+ anchors_in=_anchor_2019,
+ maybe_in_subresource=_maybe_in_subresource_crazy_items(
+ in_value={
+ "additionalItems",
+ "additionalProperties",
+ "contains",
+ "contentSchema",
+ "else",
+ "if",
+ "not",
+ "propertyNames",
+ "then",
+ "unevaluatedItems",
+ "unevaluatedProperties",
+ },
+ in_subarray={"allOf", "anyOf", "oneOf"},
+ in_subvalues={
+ "$defs",
+ "definitions",
+ "dependentSchemas",
+ "patternProperties",
+ "properties",
+ },
+ ),
+)
+#: JSON Schema draft 7
+DRAFT7 = Specification(
+ name="draft-07",
+ id_of=_legacy_dollar_id,
+ subresources_of=_subresources_of_with_crazy_items_dependencies(
+ in_value={
+ "additionalItems",
+ "additionalProperties",
+ "contains",
+ "else",
+ "if",
+ "not",
+ "propertyNames",
+ "then",
+ },
+ in_subarray={"allOf", "anyOf", "oneOf"},
+ in_subvalues={"definitions", "patternProperties", "properties"},
+ ),
+ anchors_in=_legacy_anchor_in_dollar_id,
+ maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
+ in_value={
+ "additionalItems",
+ "additionalProperties",
+ "contains",
+ "else",
+ "if",
+ "not",
+ "propertyNames",
+ "then",
+ },
+ in_subarray={"allOf", "anyOf", "oneOf"},
+ in_subvalues={"definitions", "patternProperties", "properties"},
+ ),
+)
+#: JSON Schema draft 6
+DRAFT6 = Specification(
+ name="draft-06",
+ id_of=_legacy_dollar_id,
+ subresources_of=_subresources_of_with_crazy_items_dependencies(
+ in_value={
+ "additionalItems",
+ "additionalProperties",
+ "contains",
+ "not",
+ "propertyNames",
+ },
+ in_subarray={"allOf", "anyOf", "oneOf"},
+ in_subvalues={"definitions", "patternProperties", "properties"},
+ ),
+ anchors_in=_legacy_anchor_in_dollar_id,
+ maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
+ in_value={
+ "additionalItems",
+ "additionalProperties",
+ "contains",
+ "not",
+ "propertyNames",
+ },
+ in_subarray={"allOf", "anyOf", "oneOf"},
+ in_subvalues={"definitions", "patternProperties", "properties"},
+ ),
+)
+#: JSON Schema draft 4
+DRAFT4 = Specification(
+ name="draft-04",
+ id_of=_legacy_id,
+ subresources_of=_subresources_of_with_crazy_aP_items_dependencies(
+ in_value={"not"},
+ in_subarray={"allOf", "anyOf", "oneOf"},
+ in_subvalues={"definitions", "patternProperties", "properties"},
+ ),
+ anchors_in=_legacy_anchor_in_id,
+ maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
+ in_value={"additionalItems", "additionalProperties", "not"},
+ in_subarray={"allOf", "anyOf", "oneOf"},
+ in_subvalues={"definitions", "patternProperties", "properties"},
+ ),
+)
+#: JSON Schema draft 3
+DRAFT3 = Specification(
+ name="draft-03",
+ id_of=_legacy_id,
+ subresources_of=_subresources_of_with_crazy_aP_items_dependencies(
+ in_subarray={"extends"},
+ in_subvalues={"definitions", "patternProperties", "properties"},
+ ),
+ anchors_in=_legacy_anchor_in_id,
+ maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
+ in_value={"additionalItems", "additionalProperties"},
+ in_subarray={"extends"},
+ in_subvalues={"definitions", "patternProperties", "properties"},
+ ),
+)
+
+
+_SPECIFICATIONS: Registry[Specification[Schema]] = Registry(
+ {
+ dialect_id: Resource.opaque(specification)
+ for dialect_id, specification in [
+ ("https://json-schema.org/draft/2020-12/schema", DRAFT202012),
+ ("https://json-schema.org/draft/2019-09/schema", DRAFT201909),
+ ("http://json-schema.org/draft-07/schema", DRAFT7),
+ ("http://json-schema.org/draft-06/schema", DRAFT6),
+ ("http://json-schema.org/draft-04/schema", DRAFT4),
+ ("http://json-schema.org/draft-03/schema", DRAFT3),
+ ]
+ },
+)
+
+
+def specification_with(
+ dialect_id: URI,
+ default: Specification[Any] | _Unset = _UNSET,
+) -> Specification[Any]:
+ """
+ Retrieve the `Specification` with the given dialect identifier.
+
+ Raises:
+
+ `UnknownDialect`
+
+ if the given ``dialect_id`` isn't known
+
+ """
+ resource = _SPECIFICATIONS.get(dialect_id.rstrip("#"))
+ if resource is not None:
+ return resource.contents
+ if default is _UNSET:
+ raise UnknownDialect(dialect_id)
+ return default
+
+
+@frozen
+class DynamicAnchor:
+ """
+ Dynamic anchors, introduced in draft 2020.
+ """
+
+ name: str
+ resource: SchemaResource
+
+ def resolve(self, resolver: _Resolver[Schema]) -> _Resolved[Schema]:
+ """
+ Resolve this anchor dynamically.
+ """
+ last = self.resource
+ for uri, registry in resolver.dynamic_scope():
+ try:
+ anchor = registry.anchor(uri, self.name).value
+ except exceptions.NoSuchAnchor:
+ continue
+ if isinstance(anchor, DynamicAnchor):
+ last = anchor.resource
+ return _Resolved(
+ contents=last.contents,
+ resolver=resolver.in_subresource(last),
+ )
+
+
+def lookup_recursive_ref(resolver: _Resolver[Schema]) -> _Resolved[Schema]:
+ """
+ Recursive references (via recursive anchors), present only in draft 2019.
+
+ As per the 2019 specification (§ 8.2.4.2.1), only the ``#`` recursive
+ reference is supported (and is therefore assumed to be the relevant
+ reference).
+ """
+ resolved = resolver.lookup("#")
+ if isinstance(resolved.contents, Mapping) and resolved.contents.get(
+ "$recursiveAnchor",
+ ):
+ for uri, _ in resolver.dynamic_scope():
+ next_resolved = resolver.lookup(uri)
+ if not isinstance(
+ next_resolved.contents,
+ Mapping,
+ ) or not next_resolved.contents.get("$recursiveAnchor"):
+ break
+ resolved = next_resolved
+ return resolved
diff --git a/.venv/lib/python3.12/site-packages/referencing/py.typed b/.venv/lib/python3.12/site-packages/referencing/py.typed
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/referencing/py.typed
diff --git a/.venv/lib/python3.12/site-packages/referencing/retrieval.py b/.venv/lib/python3.12/site-packages/referencing/retrieval.py
new file mode 100644
index 00000000..53e0512b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/referencing/retrieval.py
@@ -0,0 +1,92 @@
+"""
+Helpers related to (dynamic) resource retrieval.
+"""
+
+from __future__ import annotations
+
+from functools import lru_cache
+from typing import TYPE_CHECKING, Callable
+import json
+
+try:
+ from typing_extensions import TypeVar
+except ImportError: # pragma: no cover
+ from typing import TypeVar
+
+from referencing import Resource
+
+if TYPE_CHECKING:
+ from referencing.typing import URI, D, Retrieve
+
+#: A serialized document (e.g. a JSON string)
+_T = TypeVar("_T", default=str)
+
+
+def to_cached_resource(
+ cache: Callable[[Retrieve[D]], Retrieve[D]] | None = None,
+ loads: Callable[[_T], D] = json.loads,
+ from_contents: Callable[[D], Resource[D]] = Resource.from_contents,
+) -> Callable[[Callable[[URI], _T]], Retrieve[D]]:
+ """
+ Create a retriever which caches its return values from a simpler callable.
+
+ Takes a function which returns things like serialized JSON (strings) and
+ returns something suitable for passing to `Registry` as a retrieve
+ function.
+
+ This decorator both reduces a small bit of boilerplate for a common case
+ (deserializing JSON from strings and creating `Resource` objects from the
+ result) as well as makes the probable need for caching a bit easier.
+ Retrievers which otherwise do expensive operations (like hitting the
+ network) might otherwise be called repeatedly.
+
+ Examples
+ --------
+
+ .. testcode::
+
+ from referencing import Registry
+ from referencing.typing import URI
+ import referencing.retrieval
+
+
+ @referencing.retrieval.to_cached_resource()
+ def retrieve(uri: URI):
+ print(f"Retrieved {uri}")
+
+ # Normally, go get some expensive JSON from the network, a file ...
+ return '''
+ {
+ "$schema": "https://json-schema.org/draft/2020-12/schema",
+ "foo": "bar"
+ }
+ '''
+
+ one = Registry(retrieve=retrieve).get_or_retrieve("urn:example:foo")
+ print(one.value.contents["foo"])
+
+ # Retrieving the same URI again reuses the same value (and thus doesn't
+ # print another retrieval message here)
+ two = Registry(retrieve=retrieve).get_or_retrieve("urn:example:foo")
+ print(two.value.contents["foo"])
+
+ .. testoutput::
+
+ Retrieved urn:example:foo
+ bar
+ bar
+
+ """
+ if cache is None:
+ cache = lru_cache(maxsize=None)
+
+ def decorator(retrieve: Callable[[URI], _T]):
+ @cache
+ def cached_retrieve(uri: URI):
+ response = retrieve(uri)
+ contents = loads(response)
+ return from_contents(contents)
+
+ return cached_retrieve
+
+ return decorator
diff --git a/.venv/lib/python3.12/site-packages/referencing/tests/__init__.py b/.venv/lib/python3.12/site-packages/referencing/tests/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/referencing/tests/__init__.py
diff --git a/.venv/lib/python3.12/site-packages/referencing/tests/test_core.py b/.venv/lib/python3.12/site-packages/referencing/tests/test_core.py
new file mode 100644
index 00000000..3edddbc3
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/referencing/tests/test_core.py
@@ -0,0 +1,1057 @@
+from rpds import HashTrieMap
+import pytest
+
+from referencing import Anchor, Registry, Resource, Specification, exceptions
+from referencing.jsonschema import DRAFT202012
+
+ID_AND_CHILDREN = Specification(
+ name="id-and-children",
+ id_of=lambda contents: contents.get("ID"),
+ subresources_of=lambda contents: contents.get("children", []),
+ anchors_in=lambda specification, contents: [
+ Anchor(
+ name=name,
+ resource=specification.create_resource(contents=each),
+ )
+ for name, each in contents.get("anchors", {}).items()
+ ],
+ maybe_in_subresource=lambda segments, resolver, subresource: (
+ resolver.in_subresource(subresource)
+ if not len(segments) % 2
+ and all(each == "children" for each in segments[::2])
+ else resolver
+ ),
+)
+
+
+def blow_up(uri): # pragma: no cover
+ """
+ A retriever suitable for use in tests which expect it never to be used.
+ """
+ raise RuntimeError("This retrieve function expects to never be called!")
+
+
+class TestRegistry:
+ def test_with_resource(self):
+ """
+ Adding a resource to the registry then allows re-retrieving it.
+ """
+
+ resource = Resource.opaque(contents={"foo": "bar"})
+ uri = "urn:example"
+ registry = Registry().with_resource(uri=uri, resource=resource)
+ assert registry[uri] is resource
+
+ def test_with_resources(self):
+ """
+ Adding multiple resources to the registry is like adding each one.
+ """
+
+ one = Resource.opaque(contents={})
+ two = Resource(contents={"foo": "bar"}, specification=ID_AND_CHILDREN)
+ registry = Registry().with_resources(
+ [
+ ("http://example.com/1", one),
+ ("http://example.com/foo/bar", two),
+ ],
+ )
+ assert registry == Registry().with_resource(
+ uri="http://example.com/1",
+ resource=one,
+ ).with_resource(
+ uri="http://example.com/foo/bar",
+ resource=two,
+ )
+
+ def test_matmul_resource(self):
+ uri = "urn:example:resource"
+ resource = ID_AND_CHILDREN.create_resource({"ID": uri, "foo": 12})
+ registry = resource @ Registry()
+ assert registry == Registry().with_resource(uri, resource)
+
+ def test_matmul_many_resources(self):
+ one_uri = "urn:example:one"
+ one = ID_AND_CHILDREN.create_resource({"ID": one_uri, "foo": 12})
+
+ two_uri = "urn:example:two"
+ two = ID_AND_CHILDREN.create_resource({"ID": two_uri, "foo": 12})
+
+ registry = [one, two] @ Registry()
+ assert registry == Registry().with_resources(
+ [(one_uri, one), (two_uri, two)],
+ )
+
+ def test_matmul_resource_without_id(self):
+ resource = Resource.opaque(contents={"foo": "bar"})
+ with pytest.raises(exceptions.NoInternalID) as e:
+ resource @ Registry()
+ assert e.value == exceptions.NoInternalID(resource=resource)
+
+ def test_with_contents_from_json_schema(self):
+ uri = "urn:example"
+ schema = {"$schema": "https://json-schema.org/draft/2020-12/schema"}
+ registry = Registry().with_contents([(uri, schema)])
+
+ expected = Resource(contents=schema, specification=DRAFT202012)
+ assert registry[uri] == expected
+
+ def test_with_contents_and_default_specification(self):
+ uri = "urn:example"
+ registry = Registry().with_contents(
+ [(uri, {"foo": "bar"})],
+ default_specification=Specification.OPAQUE,
+ )
+ assert registry[uri] == Resource.opaque({"foo": "bar"})
+
+ def test_len(self):
+ total = 5
+ registry = Registry().with_contents(
+ [(str(i), {"foo": "bar"}) for i in range(total)],
+ default_specification=Specification.OPAQUE,
+ )
+ assert len(registry) == total
+
+ def test_bool_empty(self):
+ assert not Registry()
+
+ def test_bool_not_empty(self):
+ registry = Registry().with_contents(
+ [(str(i), {"foo": "bar"}) for i in range(3)],
+ default_specification=Specification.OPAQUE,
+ )
+ assert registry
+
+ def test_iter(self):
+ registry = Registry().with_contents(
+ [(str(i), {"foo": "bar"}) for i in range(8)],
+ default_specification=Specification.OPAQUE,
+ )
+ assert set(registry) == {str(i) for i in range(8)}
+
+ def test_crawl_still_has_top_level_resource(self):
+ resource = Resource.opaque({"foo": "bar"})
+ uri = "urn:example"
+ registry = Registry({uri: resource}).crawl()
+ assert registry[uri] is resource
+
+ def test_crawl_finds_a_subresource(self):
+ child_id = "urn:child"
+ root = ID_AND_CHILDREN.create_resource(
+ {"ID": "urn:root", "children": [{"ID": child_id, "foo": 12}]},
+ )
+ registry = root @ Registry()
+ with pytest.raises(LookupError):
+ registry[child_id]
+
+ expected = ID_AND_CHILDREN.create_resource({"ID": child_id, "foo": 12})
+ assert registry.crawl()[child_id] == expected
+
+ def test_crawl_finds_anchors_with_id(self):
+ resource = ID_AND_CHILDREN.create_resource(
+ {"ID": "urn:bar", "anchors": {"foo": 12}},
+ )
+ registry = resource @ Registry()
+
+ assert registry.crawl().anchor(resource.id(), "foo").value == Anchor(
+ name="foo",
+ resource=ID_AND_CHILDREN.create_resource(12),
+ )
+
+ def test_crawl_finds_anchors_no_id(self):
+ resource = ID_AND_CHILDREN.create_resource({"anchors": {"foo": 12}})
+ registry = Registry().with_resource("urn:root", resource)
+
+ assert registry.crawl().anchor("urn:root", "foo").value == Anchor(
+ name="foo",
+ resource=ID_AND_CHILDREN.create_resource(12),
+ )
+
+ def test_contents(self):
+ resource = Resource.opaque({"foo": "bar"})
+ uri = "urn:example"
+ registry = Registry().with_resource(uri, resource)
+ assert registry.contents(uri) == {"foo": "bar"}
+
+ def test_getitem_strips_empty_fragments(self):
+ uri = "http://example.com/"
+ resource = ID_AND_CHILDREN.create_resource({"ID": uri + "#"})
+ registry = resource @ Registry()
+ assert registry[uri] == registry[uri + "#"] == resource
+
+ def test_contents_strips_empty_fragments(self):
+ uri = "http://example.com/"
+ resource = ID_AND_CHILDREN.create_resource({"ID": uri + "#"})
+ registry = resource @ Registry()
+ assert (
+ registry.contents(uri)
+ == registry.contents(uri + "#")
+ == {"ID": uri + "#"}
+ )
+
+ def test_contents_nonexistent_resource(self):
+ registry = Registry()
+ with pytest.raises(exceptions.NoSuchResource) as e:
+ registry.contents("urn:example")
+ assert e.value == exceptions.NoSuchResource(ref="urn:example")
+
+ def test_crawled_anchor(self):
+ resource = ID_AND_CHILDREN.create_resource({"anchors": {"foo": "bar"}})
+ registry = Registry().with_resource("urn:example", resource)
+ retrieved = registry.anchor("urn:example", "foo")
+ assert retrieved.value == Anchor(
+ name="foo",
+ resource=ID_AND_CHILDREN.create_resource("bar"),
+ )
+ assert retrieved.registry == registry.crawl()
+
+ def test_anchor_in_nonexistent_resource(self):
+ registry = Registry()
+ with pytest.raises(exceptions.NoSuchResource) as e:
+ registry.anchor("urn:example", "foo")
+ assert e.value == exceptions.NoSuchResource(ref="urn:example")
+
+ def test_init(self):
+ one = Resource.opaque(contents={})
+ two = ID_AND_CHILDREN.create_resource({"foo": "bar"})
+ registry = Registry(
+ {
+ "http://example.com/1": one,
+ "http://example.com/foo/bar": two,
+ },
+ )
+ assert (
+ registry
+ == Registry()
+ .with_resources(
+ [
+ ("http://example.com/1", one),
+ ("http://example.com/foo/bar", two),
+ ],
+ )
+ .crawl()
+ )
+
+ def test_dict_conversion(self):
+ """
+ Passing a `dict` to `Registry` gets converted to a `HashTrieMap`.
+
+ So continuing to use the registry works.
+ """
+
+ one = Resource.opaque(contents={})
+ two = ID_AND_CHILDREN.create_resource({"foo": "bar"})
+ registry = Registry(
+ {"http://example.com/1": one},
+ ).with_resource("http://example.com/foo/bar", two)
+ assert (
+ registry.crawl()
+ == Registry()
+ .with_resources(
+ [
+ ("http://example.com/1", one),
+ ("http://example.com/foo/bar", two),
+ ],
+ )
+ .crawl()
+ )
+
+ def test_no_such_resource(self):
+ registry = Registry()
+ with pytest.raises(exceptions.NoSuchResource) as e:
+ registry["urn:bigboom"]
+ assert e.value == exceptions.NoSuchResource(ref="urn:bigboom")
+
+ def test_combine(self):
+ one = Resource.opaque(contents={})
+ two = ID_AND_CHILDREN.create_resource({"foo": "bar"})
+ three = ID_AND_CHILDREN.create_resource({"baz": "quux"})
+ four = ID_AND_CHILDREN.create_resource({"anchors": {"foo": 12}})
+
+ first = Registry({"http://example.com/1": one})
+ second = Registry().with_resource("http://example.com/foo/bar", two)
+ third = Registry(
+ {
+ "http://example.com/1": one,
+ "http://example.com/baz": three,
+ },
+ )
+ fourth = (
+ Registry()
+ .with_resource(
+ "http://example.com/foo/quux",
+ four,
+ )
+ .crawl()
+ )
+ assert first.combine(second, third, fourth) == Registry(
+ [
+ ("http://example.com/1", one),
+ ("http://example.com/baz", three),
+ ("http://example.com/foo/quux", four),
+ ],
+ anchors=HashTrieMap(
+ {
+ ("http://example.com/foo/quux", "foo"): Anchor(
+ name="foo",
+ resource=ID_AND_CHILDREN.create_resource(12),
+ ),
+ },
+ ),
+ ).with_resource("http://example.com/foo/bar", two)
+
+ def test_combine_self(self):
+ """
+ Combining a registry with itself short-circuits.
+
+ This is a performance optimization -- otherwise we do lots more work
+ (in jsonschema this seems to correspond to making the test suite take
+ *3x* longer).
+ """
+
+ registry = Registry({"urn:foo": "bar"})
+ assert registry.combine(registry) is registry
+
+ def test_combine_with_uncrawled_resources(self):
+ one = Resource.opaque(contents={})
+ two = ID_AND_CHILDREN.create_resource({"foo": "bar"})
+ three = ID_AND_CHILDREN.create_resource({"baz": "quux"})
+
+ first = Registry().with_resource("http://example.com/1", one)
+ second = Registry().with_resource("http://example.com/foo/bar", two)
+ third = Registry(
+ {
+ "http://example.com/1": one,
+ "http://example.com/baz": three,
+ },
+ )
+ expected = Registry(
+ [
+ ("http://example.com/1", one),
+ ("http://example.com/foo/bar", two),
+ ("http://example.com/baz", three),
+ ],
+ )
+ combined = first.combine(second, third)
+ assert combined != expected
+ assert combined.crawl() == expected
+
+ def test_combine_with_single_retrieve(self):
+ one = Resource.opaque(contents={})
+ two = ID_AND_CHILDREN.create_resource({"foo": "bar"})
+ three = ID_AND_CHILDREN.create_resource({"baz": "quux"})
+
+ def retrieve(uri): # pragma: no cover
+ pass
+
+ first = Registry().with_resource("http://example.com/1", one)
+ second = Registry(
+ retrieve=retrieve,
+ ).with_resource("http://example.com/2", two)
+ third = Registry().with_resource("http://example.com/3", three)
+
+ assert first.combine(second, third) == Registry(
+ retrieve=retrieve,
+ ).with_resources(
+ [
+ ("http://example.com/1", one),
+ ("http://example.com/2", two),
+ ("http://example.com/3", three),
+ ],
+ )
+ assert second.combine(first, third) == Registry(
+ retrieve=retrieve,
+ ).with_resources(
+ [
+ ("http://example.com/1", one),
+ ("http://example.com/2", two),
+ ("http://example.com/3", three),
+ ],
+ )
+
+ def test_combine_with_common_retrieve(self):
+ one = Resource.opaque(contents={})
+ two = ID_AND_CHILDREN.create_resource({"foo": "bar"})
+ three = ID_AND_CHILDREN.create_resource({"baz": "quux"})
+
+ def retrieve(uri): # pragma: no cover
+ pass
+
+ first = Registry(retrieve=retrieve).with_resource(
+ "http://example.com/1",
+ one,
+ )
+ second = Registry(
+ retrieve=retrieve,
+ ).with_resource("http://example.com/2", two)
+ third = Registry(retrieve=retrieve).with_resource(
+ "http://example.com/3",
+ three,
+ )
+
+ assert first.combine(second, third) == Registry(
+ retrieve=retrieve,
+ ).with_resources(
+ [
+ ("http://example.com/1", one),
+ ("http://example.com/2", two),
+ ("http://example.com/3", three),
+ ],
+ )
+ assert second.combine(first, third) == Registry(
+ retrieve=retrieve,
+ ).with_resources(
+ [
+ ("http://example.com/1", one),
+ ("http://example.com/2", two),
+ ("http://example.com/3", three),
+ ],
+ )
+
+ def test_combine_conflicting_retrieve(self):
+ one = Resource.opaque(contents={})
+ two = ID_AND_CHILDREN.create_resource({"foo": "bar"})
+ three = ID_AND_CHILDREN.create_resource({"baz": "quux"})
+
+ def foo_retrieve(uri): # pragma: no cover
+ pass
+
+ def bar_retrieve(uri): # pragma: no cover
+ pass
+
+ first = Registry(retrieve=foo_retrieve).with_resource(
+ "http://example.com/1",
+ one,
+ )
+ second = Registry().with_resource("http://example.com/2", two)
+ third = Registry(retrieve=bar_retrieve).with_resource(
+ "http://example.com/3",
+ three,
+ )
+
+ with pytest.raises(Exception, match="conflict.*retriev"):
+ first.combine(second, third)
+
+ def test_remove(self):
+ one = Resource.opaque(contents={})
+ two = ID_AND_CHILDREN.create_resource({"foo": "bar"})
+ registry = Registry({"urn:foo": one, "urn:bar": two})
+ assert registry.remove("urn:foo") == Registry({"urn:bar": two})
+
+ def test_remove_uncrawled(self):
+ one = Resource.opaque(contents={})
+ two = ID_AND_CHILDREN.create_resource({"foo": "bar"})
+ registry = Registry().with_resources(
+ [("urn:foo", one), ("urn:bar", two)],
+ )
+ assert registry.remove("urn:foo") == Registry().with_resource(
+ "urn:bar",
+ two,
+ )
+
+ def test_remove_with_anchors(self):
+ one = Resource.opaque(contents={})
+ two = ID_AND_CHILDREN.create_resource({"anchors": {"foo": "bar"}})
+ registry = (
+ Registry()
+ .with_resources(
+ [("urn:foo", one), ("urn:bar", two)],
+ )
+ .crawl()
+ )
+ assert (
+ registry.remove("urn:bar")
+ == Registry()
+ .with_resource(
+ "urn:foo",
+ one,
+ )
+ .crawl()
+ )
+
+ def test_remove_nonexistent_uri(self):
+ with pytest.raises(exceptions.NoSuchResource) as e:
+ Registry().remove("urn:doesNotExist")
+ assert e.value == exceptions.NoSuchResource(ref="urn:doesNotExist")
+
+ def test_retrieve(self):
+ foo = Resource.opaque({"foo": "bar"})
+ registry = Registry(retrieve=lambda uri: foo)
+ assert registry.get_or_retrieve("urn:example").value == foo
+
+ def test_retrieve_arbitrary_exception(self):
+ foo = Resource.opaque({"foo": "bar"})
+
+ def retrieve(uri):
+ if uri == "urn:succeed":
+ return foo
+ raise Exception("Oh no!")
+
+ registry = Registry(retrieve=retrieve)
+ assert registry.get_or_retrieve("urn:succeed").value == foo
+ with pytest.raises(exceptions.Unretrievable):
+ registry.get_or_retrieve("urn:uhoh")
+
+ def test_retrieve_no_such_resource(self):
+ foo = Resource.opaque({"foo": "bar"})
+
+ def retrieve(uri):
+ if uri == "urn:succeed":
+ return foo
+ raise exceptions.NoSuchResource(ref=uri)
+
+ registry = Registry(retrieve=retrieve)
+ assert registry.get_or_retrieve("urn:succeed").value == foo
+ with pytest.raises(exceptions.NoSuchResource):
+ registry.get_or_retrieve("urn:uhoh")
+
+ def test_retrieve_cannot_determine_specification(self):
+ def retrieve(uri):
+ return Resource.from_contents({})
+
+ registry = Registry(retrieve=retrieve)
+ with pytest.raises(exceptions.CannotDetermineSpecification):
+ registry.get_or_retrieve("urn:uhoh")
+
+ def test_retrieve_already_available_resource(self):
+ foo = Resource.opaque({"foo": "bar"})
+ registry = Registry({"urn:example": foo}, retrieve=blow_up)
+ assert registry["urn:example"] == foo
+ assert registry.get_or_retrieve("urn:example").value == foo
+
+ def test_retrieve_first_checks_crawlable_resource(self):
+ child = ID_AND_CHILDREN.create_resource({"ID": "urn:child", "foo": 12})
+ root = ID_AND_CHILDREN.create_resource({"children": [child.contents]})
+ registry = Registry(retrieve=blow_up).with_resource("urn:root", root)
+ assert registry.crawl()["urn:child"] == child
+
+ def test_resolver(self):
+ one = Resource.opaque(contents={})
+ registry = Registry({"http://example.com": one})
+ resolver = registry.resolver(base_uri="http://example.com")
+ assert resolver.lookup("#").contents == {}
+
+ def test_resolver_with_root_identified(self):
+ root = ID_AND_CHILDREN.create_resource({"ID": "http://example.com"})
+ resolver = Registry().resolver_with_root(root)
+ assert resolver.lookup("http://example.com").contents == root.contents
+ assert resolver.lookup("#").contents == root.contents
+
+ def test_resolver_with_root_unidentified(self):
+ root = Resource.opaque(contents={})
+ resolver = Registry().resolver_with_root(root)
+ assert resolver.lookup("#").contents == root.contents
+
+ def test_repr(self):
+ one = Resource.opaque(contents={})
+ two = ID_AND_CHILDREN.create_resource({"foo": "bar"})
+ registry = Registry().with_resources(
+ [
+ ("http://example.com/1", one),
+ ("http://example.com/foo/bar", two),
+ ],
+ )
+ assert repr(registry) == "<Registry (2 uncrawled resources)>"
+ assert repr(registry.crawl()) == "<Registry (2 resources)>"
+
+ def test_repr_mixed_crawled(self):
+ one = Resource.opaque(contents={})
+ two = ID_AND_CHILDREN.create_resource({"foo": "bar"})
+ registry = (
+ Registry(
+ {"http://example.com/1": one},
+ )
+ .crawl()
+ .with_resource(uri="http://example.com/foo/bar", resource=two)
+ )
+ assert repr(registry) == "<Registry (2 resources, 1 uncrawled)>"
+
+ def test_repr_one_resource(self):
+ registry = Registry().with_resource(
+ uri="http://example.com/1",
+ resource=Resource.opaque(contents={}),
+ )
+ assert repr(registry) == "<Registry (1 uncrawled resource)>"
+
+ def test_repr_empty(self):
+ assert repr(Registry()) == "<Registry (0 resources)>"
+
+
+class TestResource:
+ def test_from_contents_from_json_schema(self):
+ schema = {"$schema": "https://json-schema.org/draft/2020-12/schema"}
+ resource = Resource.from_contents(schema)
+ assert resource == Resource(contents=schema, specification=DRAFT202012)
+
+ def test_from_contents_with_no_discernible_information(self):
+ """
+ Creating a resource with no discernible way to see what
+ specification it belongs to (e.g. no ``$schema`` keyword for JSON
+ Schema) raises an error.
+ """
+
+ with pytest.raises(exceptions.CannotDetermineSpecification):
+ Resource.from_contents({"foo": "bar"})
+
+ def test_from_contents_with_no_discernible_information_and_default(self):
+ resource = Resource.from_contents(
+ {"foo": "bar"},
+ default_specification=Specification.OPAQUE,
+ )
+ assert resource == Resource.opaque(contents={"foo": "bar"})
+
+ def test_from_contents_unneeded_default(self):
+ schema = {"$schema": "https://json-schema.org/draft/2020-12/schema"}
+ resource = Resource.from_contents(
+ schema,
+ default_specification=Specification.OPAQUE,
+ )
+ assert resource == Resource(
+ contents=schema,
+ specification=DRAFT202012,
+ )
+
+ def test_non_mapping_from_contents(self):
+ resource = Resource.from_contents(
+ True,
+ default_specification=ID_AND_CHILDREN,
+ )
+ assert resource == Resource(
+ contents=True,
+ specification=ID_AND_CHILDREN,
+ )
+
+ def test_from_contents_with_fallback(self):
+ resource = Resource.from_contents(
+ {"foo": "bar"},
+ default_specification=Specification.OPAQUE,
+ )
+ assert resource == Resource.opaque(contents={"foo": "bar"})
+
+ def test_id_delegates_to_specification(self):
+ specification = Specification(
+ name="",
+ id_of=lambda contents: "urn:fixedID",
+ subresources_of=lambda contents: [],
+ anchors_in=lambda specification, contents: [],
+ maybe_in_subresource=(
+ lambda segments, resolver, subresource: resolver
+ ),
+ )
+ resource = Resource(
+ contents={"foo": "baz"},
+ specification=specification,
+ )
+ assert resource.id() == "urn:fixedID"
+
+ def test_id_strips_empty_fragment(self):
+ uri = "http://example.com/"
+ root = ID_AND_CHILDREN.create_resource({"ID": uri + "#"})
+ assert root.id() == uri
+
+ def test_subresources_delegates_to_specification(self):
+ resource = ID_AND_CHILDREN.create_resource({"children": [{}, 12]})
+ assert list(resource.subresources()) == [
+ ID_AND_CHILDREN.create_resource(each) for each in [{}, 12]
+ ]
+
+ def test_subresource_with_different_specification(self):
+ schema = {"$schema": "https://json-schema.org/draft/2020-12/schema"}
+ resource = ID_AND_CHILDREN.create_resource({"children": [schema]})
+ assert list(resource.subresources()) == [
+ DRAFT202012.create_resource(schema),
+ ]
+
+ def test_anchors_delegates_to_specification(self):
+ resource = ID_AND_CHILDREN.create_resource(
+ {"anchors": {"foo": {}, "bar": 1, "baz": ""}},
+ )
+ assert list(resource.anchors()) == [
+ Anchor(name="foo", resource=ID_AND_CHILDREN.create_resource({})),
+ Anchor(name="bar", resource=ID_AND_CHILDREN.create_resource(1)),
+ Anchor(name="baz", resource=ID_AND_CHILDREN.create_resource("")),
+ ]
+
+ def test_pointer_to_mapping(self):
+ resource = Resource.opaque(contents={"foo": "baz"})
+ resolver = Registry().resolver()
+ assert resource.pointer("/foo", resolver=resolver).contents == "baz"
+
+ def test_pointer_to_array(self):
+ resource = Resource.opaque(contents={"foo": {"bar": [3]}})
+ resolver = Registry().resolver()
+ assert resource.pointer("/foo/bar/0", resolver=resolver).contents == 3
+
+ def test_root_pointer(self):
+ contents = {"foo": "baz"}
+ resource = Resource.opaque(contents=contents)
+ resolver = Registry().resolver()
+ assert resource.pointer("", resolver=resolver).contents == contents
+
+ def test_opaque(self):
+ contents = {"foo": "bar"}
+ assert Resource.opaque(contents) == Resource(
+ contents=contents,
+ specification=Specification.OPAQUE,
+ )
+
+
+class TestResolver:
+ def test_lookup_exact_uri(self):
+ resource = Resource.opaque(contents={"foo": "baz"})
+ resolver = Registry({"http://example.com/1": resource}).resolver()
+ resolved = resolver.lookup("http://example.com/1")
+ assert resolved.contents == resource.contents
+
+ def test_lookup_subresource(self):
+ root = ID_AND_CHILDREN.create_resource(
+ {
+ "ID": "http://example.com/",
+ "children": [
+ {"ID": "http://example.com/a", "foo": 12},
+ ],
+ },
+ )
+ registry = root @ Registry()
+ resolved = registry.resolver().lookup("http://example.com/a")
+ assert resolved.contents == {"ID": "http://example.com/a", "foo": 12}
+
+ def test_lookup_anchor_with_id(self):
+ root = ID_AND_CHILDREN.create_resource(
+ {
+ "ID": "http://example.com/",
+ "anchors": {"foo": 12},
+ },
+ )
+ registry = root @ Registry()
+ resolved = registry.resolver().lookup("http://example.com/#foo")
+ assert resolved.contents == 12
+
+ def test_lookup_anchor_without_id(self):
+ root = ID_AND_CHILDREN.create_resource({"anchors": {"foo": 12}})
+ resolver = Registry().with_resource("urn:example", root).resolver()
+ resolved = resolver.lookup("urn:example#foo")
+ assert resolved.contents == 12
+
+ def test_lookup_unknown_reference(self):
+ resolver = Registry().resolver()
+ ref = "http://example.com/does/not/exist"
+ with pytest.raises(exceptions.Unresolvable) as e:
+ resolver.lookup(ref)
+ assert e.value == exceptions.Unresolvable(ref=ref)
+
+ def test_lookup_non_existent_pointer(self):
+ resource = Resource.opaque({"foo": {}})
+ resolver = Registry({"http://example.com/1": resource}).resolver()
+ ref = "http://example.com/1#/foo/bar"
+ with pytest.raises(exceptions.Unresolvable) as e:
+ resolver.lookup(ref)
+ assert e.value == exceptions.PointerToNowhere(
+ ref="/foo/bar",
+ resource=resource,
+ )
+ assert str(e.value) == "'/foo/bar' does not exist within {'foo': {}}"
+
+ def test_lookup_non_existent_pointer_to_array_index(self):
+ resource = Resource.opaque([1, 2, 4, 8])
+ resolver = Registry({"http://example.com/1": resource}).resolver()
+ ref = "http://example.com/1#/10"
+ with pytest.raises(exceptions.Unresolvable) as e:
+ resolver.lookup(ref)
+ assert e.value == exceptions.PointerToNowhere(
+ ref="/10",
+ resource=resource,
+ )
+
+ def test_lookup_pointer_to_empty_string(self):
+ resolver = Registry().resolver_with_root(Resource.opaque({"": {}}))
+ assert resolver.lookup("#/").contents == {}
+
+ def test_lookup_non_existent_pointer_to_empty_string(self):
+ resource = Resource.opaque({"foo": {}})
+ resolver = Registry().resolver_with_root(resource)
+ with pytest.raises(
+ exceptions.Unresolvable,
+ match="^'/' does not exist within {'foo': {}}.*'#'",
+ ) as e:
+ resolver.lookup("#/")
+ assert e.value == exceptions.PointerToNowhere(
+ ref="/",
+ resource=resource,
+ )
+
+ def test_lookup_non_existent_anchor(self):
+ root = ID_AND_CHILDREN.create_resource({"anchors": {}})
+ resolver = Registry().with_resource("urn:example", root).resolver()
+ resolved = resolver.lookup("urn:example")
+ assert resolved.contents == root.contents
+
+ ref = "urn:example#noSuchAnchor"
+ with pytest.raises(exceptions.Unresolvable) as e:
+ resolver.lookup(ref)
+ assert "'noSuchAnchor' does not exist" in str(e.value)
+ assert e.value == exceptions.NoSuchAnchor(
+ ref="urn:example",
+ resource=root,
+ anchor="noSuchAnchor",
+ )
+
+ def test_lookup_invalid_JSON_pointerish_anchor(self):
+ resolver = Registry().resolver_with_root(
+ ID_AND_CHILDREN.create_resource(
+ {
+ "ID": "http://example.com/",
+ "foo": {"bar": 12},
+ },
+ ),
+ )
+
+ valid = resolver.lookup("#/foo/bar")
+ assert valid.contents == 12
+
+ with pytest.raises(exceptions.InvalidAnchor) as e:
+ resolver.lookup("#foo/bar")
+ assert " '#/foo/bar'" in str(e.value)
+
+ def test_lookup_retrieved_resource(self):
+ resource = Resource.opaque(contents={"foo": "baz"})
+ resolver = Registry(retrieve=lambda uri: resource).resolver()
+ resolved = resolver.lookup("http://example.com/")
+ assert resolved.contents == resource.contents
+
+ def test_lookup_failed_retrieved_resource(self):
+ """
+ Unretrievable exceptions are also wrapped in Unresolvable.
+ """
+
+ uri = "http://example.com/"
+
+ registry = Registry(retrieve=blow_up)
+ with pytest.raises(exceptions.Unretrievable):
+ registry.get_or_retrieve(uri)
+
+ resolver = registry.resolver()
+ with pytest.raises(exceptions.Unresolvable):
+ resolver.lookup(uri)
+
+ def test_repeated_lookup_from_retrieved_resource(self):
+ """
+ A (custom-)retrieved resource is added to the registry returned by
+ looking it up.
+ """
+ resource = Resource.opaque(contents={"foo": "baz"})
+ once = [resource]
+
+ def retrieve(uri):
+ return once.pop()
+
+ resolver = Registry(retrieve=retrieve).resolver()
+ resolved = resolver.lookup("http://example.com/")
+ assert resolved.contents == resource.contents
+
+ resolved = resolved.resolver.lookup("http://example.com/")
+ assert resolved.contents == resource.contents
+
+ def test_repeated_anchor_lookup_from_retrieved_resource(self):
+ resource = Resource.opaque(contents={"foo": "baz"})
+ once = [resource]
+
+ def retrieve(uri):
+ return once.pop()
+
+ resolver = Registry(retrieve=retrieve).resolver()
+ resolved = resolver.lookup("http://example.com/")
+ assert resolved.contents == resource.contents
+
+ resolved = resolved.resolver.lookup("#")
+ assert resolved.contents == resource.contents
+
+ # FIXME: The tests below aren't really representable in the current
+ # suite, though we should probably think of ways to do so.
+
+ def test_in_subresource(self):
+ root = ID_AND_CHILDREN.create_resource(
+ {
+ "ID": "http://example.com/",
+ "children": [
+ {
+ "ID": "child/",
+ "children": [{"ID": "grandchild"}],
+ },
+ ],
+ },
+ )
+ registry = root @ Registry()
+
+ resolver = registry.resolver()
+ first = resolver.lookup("http://example.com/")
+ assert first.contents == root.contents
+
+ with pytest.raises(exceptions.Unresolvable):
+ first.resolver.lookup("grandchild")
+
+ sub = first.resolver.in_subresource(
+ ID_AND_CHILDREN.create_resource(first.contents["children"][0]),
+ )
+ second = sub.lookup("grandchild")
+ assert second.contents == {"ID": "grandchild"}
+
+ def test_in_pointer_subresource(self):
+ root = ID_AND_CHILDREN.create_resource(
+ {
+ "ID": "http://example.com/",
+ "children": [
+ {
+ "ID": "child/",
+ "children": [{"ID": "grandchild"}],
+ },
+ ],
+ },
+ )
+ registry = root @ Registry()
+
+ resolver = registry.resolver()
+ first = resolver.lookup("http://example.com/")
+ assert first.contents == root.contents
+
+ with pytest.raises(exceptions.Unresolvable):
+ first.resolver.lookup("grandchild")
+
+ second = first.resolver.lookup("#/children/0")
+ third = second.resolver.lookup("grandchild")
+ assert third.contents == {"ID": "grandchild"}
+
+ def test_dynamic_scope(self):
+ one = ID_AND_CHILDREN.create_resource(
+ {
+ "ID": "http://example.com/",
+ "children": [
+ {
+ "ID": "child/",
+ "children": [{"ID": "grandchild"}],
+ },
+ ],
+ },
+ )
+ two = ID_AND_CHILDREN.create_resource(
+ {
+ "ID": "http://example.com/two",
+ "children": [{"ID": "two-child/"}],
+ },
+ )
+ registry = [one, two] @ Registry()
+
+ resolver = registry.resolver()
+ first = resolver.lookup("http://example.com/")
+ second = first.resolver.lookup("#/children/0")
+ third = second.resolver.lookup("grandchild")
+ fourth = third.resolver.lookup("http://example.com/two")
+ assert list(fourth.resolver.dynamic_scope()) == [
+ ("http://example.com/child/grandchild", fourth.resolver._registry),
+ ("http://example.com/child/", fourth.resolver._registry),
+ ("http://example.com/", fourth.resolver._registry),
+ ]
+ assert list(third.resolver.dynamic_scope()) == [
+ ("http://example.com/child/", third.resolver._registry),
+ ("http://example.com/", third.resolver._registry),
+ ]
+ assert list(second.resolver.dynamic_scope()) == [
+ ("http://example.com/", second.resolver._registry),
+ ]
+ assert list(first.resolver.dynamic_scope()) == []
+
+
+class TestSpecification:
+ def test_create_resource(self):
+ specification = Specification(
+ name="",
+ id_of=lambda contents: "urn:fixedID",
+ subresources_of=lambda contents: [],
+ anchors_in=lambda specification, contents: [],
+ maybe_in_subresource=(
+ lambda segments, resolver, subresource: resolver
+ ),
+ )
+ resource = specification.create_resource(contents={"foo": "baz"})
+ assert resource == Resource(
+ contents={"foo": "baz"},
+ specification=specification,
+ )
+ assert resource.id() == "urn:fixedID"
+
+ def test_detect_from_json_schema(self):
+ schema = {"$schema": "https://json-schema.org/draft/2020-12/schema"}
+ specification = Specification.detect(schema)
+ assert specification == DRAFT202012
+
+ def test_detect_with_no_discernible_information(self):
+ with pytest.raises(exceptions.CannotDetermineSpecification):
+ Specification.detect({"foo": "bar"})
+
+ def test_detect_with_non_URI_schema(self):
+ with pytest.raises(exceptions.CannotDetermineSpecification):
+ Specification.detect({"$schema": 37})
+
+ def test_detect_with_no_discernible_information_and_default(self):
+ specification = Specification.OPAQUE.detect({"foo": "bar"})
+ assert specification is Specification.OPAQUE
+
+ def test_detect_unneeded_default(self):
+ schema = {"$schema": "https://json-schema.org/draft/2020-12/schema"}
+ specification = Specification.OPAQUE.detect(schema)
+ assert specification == DRAFT202012
+
+ def test_non_mapping_detect(self):
+ with pytest.raises(exceptions.CannotDetermineSpecification):
+ Specification.detect(True)
+
+ def test_non_mapping_detect_with_default(self):
+ specification = ID_AND_CHILDREN.detect(True)
+ assert specification is ID_AND_CHILDREN
+
+ def test_detect_with_fallback(self):
+ specification = Specification.OPAQUE.detect({"foo": "bar"})
+ assert specification is Specification.OPAQUE
+
+ def test_repr(self):
+ assert (
+ repr(ID_AND_CHILDREN) == "<Specification name='id-and-children'>"
+ )
+
+
+class TestOpaqueSpecification:
+ THINGS = [{"foo": "bar"}, True, 37, "foo", object()]
+
+ @pytest.mark.parametrize("thing", THINGS)
+ def test_no_id(self, thing):
+ """
+ An arbitrary thing has no ID.
+ """
+
+ assert Specification.OPAQUE.id_of(thing) is None
+
+ @pytest.mark.parametrize("thing", THINGS)
+ def test_no_subresources(self, thing):
+ """
+ An arbitrary thing has no subresources.
+ """
+
+ assert list(Specification.OPAQUE.subresources_of(thing)) == []
+
+ @pytest.mark.parametrize("thing", THINGS)
+ def test_no_anchors(self, thing):
+ """
+ An arbitrary thing has no anchors.
+ """
+
+ assert list(Specification.OPAQUE.anchors_in(thing)) == []
+
+
+@pytest.mark.parametrize(
+ "cls",
+ [Anchor, Registry, Resource, Specification, exceptions.PointerToNowhere],
+)
+def test_nonsubclassable(cls):
+ with pytest.raises(Exception, match="(?i)subclassing"):
+
+ class Boom(cls): # pragma: no cover
+ pass
diff --git a/.venv/lib/python3.12/site-packages/referencing/tests/test_exceptions.py b/.venv/lib/python3.12/site-packages/referencing/tests/test_exceptions.py
new file mode 100644
index 00000000..85cf99ec
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/referencing/tests/test_exceptions.py
@@ -0,0 +1,34 @@
+import itertools
+
+import pytest
+
+from referencing import Resource, exceptions
+
+
+def pairs(choices):
+ return itertools.combinations(choices, 2)
+
+
+TRUE = Resource.opaque(True)
+
+
+thunks = (
+ lambda: exceptions.CannotDetermineSpecification(TRUE),
+ lambda: exceptions.NoSuchResource("urn:example:foo"),
+ lambda: exceptions.NoInternalID(TRUE),
+ lambda: exceptions.InvalidAnchor(resource=TRUE, anchor="foo", ref="a#b"),
+ lambda: exceptions.NoSuchAnchor(resource=TRUE, anchor="foo", ref="a#b"),
+ lambda: exceptions.PointerToNowhere(resource=TRUE, ref="urn:example:foo"),
+ lambda: exceptions.Unresolvable("urn:example:foo"),
+ lambda: exceptions.Unretrievable("urn:example:foo"),
+)
+
+
+@pytest.mark.parametrize("one, two", pairs(each() for each in thunks))
+def test_eq_incompatible_types(one, two):
+ assert one != two
+
+
+@pytest.mark.parametrize("thunk", thunks)
+def test_hash(thunk):
+ assert thunk() in {thunk()}
diff --git a/.venv/lib/python3.12/site-packages/referencing/tests/test_jsonschema.py b/.venv/lib/python3.12/site-packages/referencing/tests/test_jsonschema.py
new file mode 100644
index 00000000..c80714d0
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/referencing/tests/test_jsonschema.py
@@ -0,0 +1,382 @@
+import pytest
+
+from referencing import Registry, Resource, Specification
+import referencing.jsonschema
+
+
+@pytest.mark.parametrize(
+ "uri, expected",
+ [
+ (
+ "https://json-schema.org/draft/2020-12/schema",
+ referencing.jsonschema.DRAFT202012,
+ ),
+ (
+ "https://json-schema.org/draft/2019-09/schema",
+ referencing.jsonschema.DRAFT201909,
+ ),
+ (
+ "http://json-schema.org/draft-07/schema#",
+ referencing.jsonschema.DRAFT7,
+ ),
+ (
+ "http://json-schema.org/draft-06/schema#",
+ referencing.jsonschema.DRAFT6,
+ ),
+ (
+ "http://json-schema.org/draft-04/schema#",
+ referencing.jsonschema.DRAFT4,
+ ),
+ (
+ "http://json-schema.org/draft-03/schema#",
+ referencing.jsonschema.DRAFT3,
+ ),
+ ],
+)
+def test_schemas_with_explicit_schema_keywords_are_detected(uri, expected):
+ """
+ The $schema keyword in JSON Schema is a dialect identifier.
+ """
+ contents = {"$schema": uri}
+ resource = Resource.from_contents(contents)
+ assert resource == Resource(contents=contents, specification=expected)
+
+
+def test_unknown_dialect():
+ dialect_id = "http://example.com/unknown-json-schema-dialect-id"
+ with pytest.raises(referencing.jsonschema.UnknownDialect) as excinfo:
+ Resource.from_contents({"$schema": dialect_id})
+ assert excinfo.value.uri == dialect_id
+
+
+@pytest.mark.parametrize(
+ "id, specification",
+ [
+ ("$id", referencing.jsonschema.DRAFT202012),
+ ("$id", referencing.jsonschema.DRAFT201909),
+ ("$id", referencing.jsonschema.DRAFT7),
+ ("$id", referencing.jsonschema.DRAFT6),
+ ("id", referencing.jsonschema.DRAFT4),
+ ("id", referencing.jsonschema.DRAFT3),
+ ],
+)
+def test_id_of_mapping(id, specification):
+ uri = "http://example.com/some-schema"
+ assert specification.id_of({id: uri}) == uri
+
+
+@pytest.mark.parametrize(
+ "specification",
+ [
+ referencing.jsonschema.DRAFT202012,
+ referencing.jsonschema.DRAFT201909,
+ referencing.jsonschema.DRAFT7,
+ referencing.jsonschema.DRAFT6,
+ ],
+)
+@pytest.mark.parametrize("value", [True, False])
+def test_id_of_bool(specification, value):
+ assert specification.id_of(value) is None
+
+
+@pytest.mark.parametrize(
+ "specification",
+ [
+ referencing.jsonschema.DRAFT202012,
+ referencing.jsonschema.DRAFT201909,
+ referencing.jsonschema.DRAFT7,
+ referencing.jsonschema.DRAFT6,
+ ],
+)
+@pytest.mark.parametrize("value", [True, False])
+def test_anchors_in_bool(specification, value):
+ assert list(specification.anchors_in(value)) == []
+
+
+@pytest.mark.parametrize(
+ "specification",
+ [
+ referencing.jsonschema.DRAFT202012,
+ referencing.jsonschema.DRAFT201909,
+ referencing.jsonschema.DRAFT7,
+ referencing.jsonschema.DRAFT6,
+ ],
+)
+@pytest.mark.parametrize("value", [True, False])
+def test_subresources_of_bool(specification, value):
+ assert list(specification.subresources_of(value)) == []
+
+
+@pytest.mark.parametrize(
+ "uri, expected",
+ [
+ (
+ "https://json-schema.org/draft/2020-12/schema",
+ referencing.jsonschema.DRAFT202012,
+ ),
+ (
+ "https://json-schema.org/draft/2019-09/schema",
+ referencing.jsonschema.DRAFT201909,
+ ),
+ (
+ "http://json-schema.org/draft-07/schema#",
+ referencing.jsonschema.DRAFT7,
+ ),
+ (
+ "http://json-schema.org/draft-06/schema#",
+ referencing.jsonschema.DRAFT6,
+ ),
+ (
+ "http://json-schema.org/draft-04/schema#",
+ referencing.jsonschema.DRAFT4,
+ ),
+ (
+ "http://json-schema.org/draft-03/schema#",
+ referencing.jsonschema.DRAFT3,
+ ),
+ ],
+)
+def test_specification_with(uri, expected):
+ assert referencing.jsonschema.specification_with(uri) == expected
+
+
+@pytest.mark.parametrize(
+ "uri, expected",
+ [
+ (
+ "http://json-schema.org/draft-07/schema",
+ referencing.jsonschema.DRAFT7,
+ ),
+ (
+ "http://json-schema.org/draft-06/schema",
+ referencing.jsonschema.DRAFT6,
+ ),
+ (
+ "http://json-schema.org/draft-04/schema",
+ referencing.jsonschema.DRAFT4,
+ ),
+ (
+ "http://json-schema.org/draft-03/schema",
+ referencing.jsonschema.DRAFT3,
+ ),
+ ],
+)
+def test_specification_with_no_empty_fragment(uri, expected):
+ assert referencing.jsonschema.specification_with(uri) == expected
+
+
+def test_specification_with_unknown_dialect():
+ dialect_id = "http://example.com/unknown-json-schema-dialect-id"
+ with pytest.raises(referencing.jsonschema.UnknownDialect) as excinfo:
+ referencing.jsonschema.specification_with(dialect_id)
+ assert excinfo.value.uri == dialect_id
+
+
+def test_specification_with_default():
+ dialect_id = "http://example.com/unknown-json-schema-dialect-id"
+ specification = referencing.jsonschema.specification_with(
+ dialect_id,
+ default=Specification.OPAQUE,
+ )
+ assert specification is Specification.OPAQUE
+
+
+# FIXME: The tests below should move to the referencing suite but I haven't yet
+# figured out how to represent dynamic (& recursive) ref lookups in it.
+def test_lookup_trivial_dynamic_ref():
+ one = referencing.jsonschema.DRAFT202012.create_resource(
+ {"$dynamicAnchor": "foo"},
+ )
+ resolver = Registry().with_resource("http://example.com", one).resolver()
+ resolved = resolver.lookup("http://example.com#foo")
+ assert resolved.contents == one.contents
+
+
+def test_multiple_lookup_trivial_dynamic_ref():
+ TRUE = referencing.jsonschema.DRAFT202012.create_resource(True)
+ root = referencing.jsonschema.DRAFT202012.create_resource(
+ {
+ "$id": "http://example.com",
+ "$dynamicAnchor": "fooAnchor",
+ "$defs": {
+ "foo": {
+ "$id": "foo",
+ "$dynamicAnchor": "fooAnchor",
+ "$defs": {
+ "bar": True,
+ "baz": {
+ "$dynamicAnchor": "fooAnchor",
+ },
+ },
+ },
+ },
+ },
+ )
+ resolver = (
+ Registry()
+ .with_resources(
+ [
+ ("http://example.com", root),
+ ("http://example.com/foo/", TRUE),
+ ("http://example.com/foo/bar", root),
+ ],
+ )
+ .resolver()
+ )
+
+ first = resolver.lookup("http://example.com")
+ second = first.resolver.lookup("foo/")
+ resolver = second.resolver.lookup("bar").resolver
+ fourth = resolver.lookup("#fooAnchor")
+ assert fourth.contents == root.contents
+
+
+def test_multiple_lookup_dynamic_ref_to_nondynamic_ref():
+ one = referencing.jsonschema.DRAFT202012.create_resource(
+ {"$anchor": "fooAnchor"},
+ )
+ two = referencing.jsonschema.DRAFT202012.create_resource(
+ {
+ "$id": "http://example.com",
+ "$dynamicAnchor": "fooAnchor",
+ "$defs": {
+ "foo": {
+ "$id": "foo",
+ "$dynamicAnchor": "fooAnchor",
+ "$defs": {
+ "bar": True,
+ "baz": {
+ "$dynamicAnchor": "fooAnchor",
+ },
+ },
+ },
+ },
+ },
+ )
+ resolver = (
+ Registry()
+ .with_resources(
+ [
+ ("http://example.com", two),
+ ("http://example.com/foo/", one),
+ ("http://example.com/foo/bar", two),
+ ],
+ )
+ .resolver()
+ )
+
+ first = resolver.lookup("http://example.com")
+ second = first.resolver.lookup("foo/")
+ resolver = second.resolver.lookup("bar").resolver
+ fourth = resolver.lookup("#fooAnchor")
+ assert fourth.contents == two.contents
+
+
+def test_lookup_trivial_recursive_ref():
+ one = referencing.jsonschema.DRAFT201909.create_resource(
+ {"$recursiveAnchor": True},
+ )
+ resolver = Registry().with_resource("http://example.com", one).resolver()
+ first = resolver.lookup("http://example.com")
+ resolved = referencing.jsonschema.lookup_recursive_ref(
+ resolver=first.resolver,
+ )
+ assert resolved.contents == one.contents
+
+
+def test_lookup_recursive_ref_to_bool():
+ TRUE = referencing.jsonschema.DRAFT201909.create_resource(True)
+ registry = Registry({"http://example.com": TRUE})
+ resolved = referencing.jsonschema.lookup_recursive_ref(
+ resolver=registry.resolver(base_uri="http://example.com"),
+ )
+ assert resolved.contents == TRUE.contents
+
+
+def test_multiple_lookup_recursive_ref_to_bool():
+ TRUE = referencing.jsonschema.DRAFT201909.create_resource(True)
+ root = referencing.jsonschema.DRAFT201909.create_resource(
+ {
+ "$id": "http://example.com",
+ "$recursiveAnchor": True,
+ "$defs": {
+ "foo": {
+ "$id": "foo",
+ "$recursiveAnchor": True,
+ "$defs": {
+ "bar": True,
+ "baz": {
+ "$recursiveAnchor": True,
+ "$anchor": "fooAnchor",
+ },
+ },
+ },
+ },
+ },
+ )
+ resolver = (
+ Registry()
+ .with_resources(
+ [
+ ("http://example.com", root),
+ ("http://example.com/foo/", TRUE),
+ ("http://example.com/foo/bar", root),
+ ],
+ )
+ .resolver()
+ )
+
+ first = resolver.lookup("http://example.com")
+ second = first.resolver.lookup("foo/")
+ resolver = second.resolver.lookup("bar").resolver
+ fourth = referencing.jsonschema.lookup_recursive_ref(resolver=resolver)
+ assert fourth.contents == root.contents
+
+
+def test_multiple_lookup_recursive_ref_with_nonrecursive_ref():
+ one = referencing.jsonschema.DRAFT201909.create_resource(
+ {"$recursiveAnchor": True},
+ )
+ two = referencing.jsonschema.DRAFT201909.create_resource(
+ {
+ "$id": "http://example.com",
+ "$recursiveAnchor": True,
+ "$defs": {
+ "foo": {
+ "$id": "foo",
+ "$recursiveAnchor": True,
+ "$defs": {
+ "bar": True,
+ "baz": {
+ "$recursiveAnchor": True,
+ "$anchor": "fooAnchor",
+ },
+ },
+ },
+ },
+ },
+ )
+ three = referencing.jsonschema.DRAFT201909.create_resource(
+ {"$recursiveAnchor": False},
+ )
+ resolver = (
+ Registry()
+ .with_resources(
+ [
+ ("http://example.com", three),
+ ("http://example.com/foo/", two),
+ ("http://example.com/foo/bar", one),
+ ],
+ )
+ .resolver()
+ )
+
+ first = resolver.lookup("http://example.com")
+ second = first.resolver.lookup("foo/")
+ resolver = second.resolver.lookup("bar").resolver
+ fourth = referencing.jsonschema.lookup_recursive_ref(resolver=resolver)
+ assert fourth.contents == two.contents
+
+
+def test_empty_registry():
+ assert referencing.jsonschema.EMPTY_REGISTRY == Registry()
diff --git a/.venv/lib/python3.12/site-packages/referencing/tests/test_referencing_suite.py b/.venv/lib/python3.12/site-packages/referencing/tests/test_referencing_suite.py
new file mode 100644
index 00000000..4b8ae917
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/referencing/tests/test_referencing_suite.py
@@ -0,0 +1,66 @@
+from pathlib import Path
+import json
+import os
+
+import pytest
+
+from referencing import Registry
+from referencing.exceptions import Unresolvable
+import referencing.jsonschema
+
+
+class SuiteNotFound(Exception):
+ def __str__(self): # pragma: no cover
+ return (
+ "Cannot find the referencing suite. "
+ "Set the REFERENCING_SUITE environment variable to the path to "
+ "the suite, or run the test suite from alongside a full checkout "
+ "of the git repository."
+ )
+
+
+if "REFERENCING_SUITE" in os.environ: # pragma: no cover
+ SUITE = Path(os.environ["REFERENCING_SUITE"]) / "tests"
+else:
+ SUITE = Path(__file__).parent.parent.parent / "suite/tests"
+if not SUITE.is_dir(): # pragma: no cover
+ raise SuiteNotFound()
+DIALECT_IDS = json.loads(SUITE.joinpath("specifications.json").read_text())
+
+
+@pytest.mark.parametrize(
+ "test_path",
+ [
+ pytest.param(each, id=f"{each.parent.name}-{each.stem}")
+ for each in SUITE.glob("*/**/*.json")
+ ],
+)
+def test_referencing_suite(test_path, subtests):
+ dialect_id = DIALECT_IDS[test_path.relative_to(SUITE).parts[0]]
+ specification = referencing.jsonschema.specification_with(dialect_id)
+ loaded = json.loads(test_path.read_text())
+ registry = loaded["registry"]
+ registry = Registry().with_resources(
+ (uri, specification.create_resource(contents))
+ for uri, contents in loaded["registry"].items()
+ )
+ for test in loaded["tests"]:
+ with subtests.test(test=test):
+ if "normalization" in test_path.stem:
+ pytest.xfail("APIs need to change for proper URL support.")
+
+ resolver = registry.resolver(base_uri=test.get("base_uri", ""))
+
+ if test.get("error"):
+ with pytest.raises(Unresolvable):
+ resolver.lookup(test["ref"])
+ else:
+ resolved = resolver.lookup(test["ref"])
+ assert resolved.contents == test["target"]
+
+ then = test.get("then")
+ while then: # pragma: no cover
+ with subtests.test(test=test, then=then):
+ resolved = resolved.resolver.lookup(then["ref"])
+ assert resolved.contents == then["target"]
+ then = then.get("then")
diff --git a/.venv/lib/python3.12/site-packages/referencing/tests/test_retrieval.py b/.venv/lib/python3.12/site-packages/referencing/tests/test_retrieval.py
new file mode 100644
index 00000000..d0a8f8ad
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/referencing/tests/test_retrieval.py
@@ -0,0 +1,106 @@
+from functools import lru_cache
+import json
+
+import pytest
+
+from referencing import Registry, Resource, exceptions
+from referencing.jsonschema import DRAFT202012
+from referencing.retrieval import to_cached_resource
+
+
+class TestToCachedResource:
+ def test_it_caches_retrieved_resources(self):
+ contents = {"$schema": "https://json-schema.org/draft/2020-12/schema"}
+ stack = [json.dumps(contents)]
+
+ @to_cached_resource()
+ def retrieve(uri):
+ return stack.pop()
+
+ registry = Registry(retrieve=retrieve)
+
+ expected = Resource.from_contents(contents)
+
+ got = registry.get_or_retrieve("urn:example:schema")
+ assert got.value == expected
+
+ # And a second time we get the same value.
+ again = registry.get_or_retrieve("urn:example:schema")
+ assert again.value is got.value
+
+ def test_custom_loader(self):
+ contents = {"$schema": "https://json-schema.org/draft/2020-12/schema"}
+ stack = [json.dumps(contents)[::-1]]
+
+ @to_cached_resource(loads=lambda s: json.loads(s[::-1]))
+ def retrieve(uri):
+ return stack.pop()
+
+ registry = Registry(retrieve=retrieve)
+
+ expected = Resource.from_contents(contents)
+
+ got = registry.get_or_retrieve("urn:example:schema")
+ assert got.value == expected
+
+ # And a second time we get the same value.
+ again = registry.get_or_retrieve("urn:example:schema")
+ assert again.value is got.value
+
+ def test_custom_from_contents(self):
+ contents = {}
+ stack = [json.dumps(contents)]
+
+ @to_cached_resource(from_contents=DRAFT202012.create_resource)
+ def retrieve(uri):
+ return stack.pop()
+
+ registry = Registry(retrieve=retrieve)
+
+ expected = DRAFT202012.create_resource(contents)
+
+ got = registry.get_or_retrieve("urn:example:schema")
+ assert got.value == expected
+
+ # And a second time we get the same value.
+ again = registry.get_or_retrieve("urn:example:schema")
+ assert again.value is got.value
+
+ def test_custom_cache(self):
+ schema = {"$schema": "https://json-schema.org/draft/2020-12/schema"}
+ mapping = {
+ "urn:example:1": dict(schema, foo=1),
+ "urn:example:2": dict(schema, foo=2),
+ "urn:example:3": dict(schema, foo=3),
+ }
+
+ resources = {
+ uri: Resource.from_contents(contents)
+ for uri, contents in mapping.items()
+ }
+
+ @to_cached_resource(cache=lru_cache(maxsize=2))
+ def retrieve(uri):
+ return json.dumps(mapping.pop(uri))
+
+ registry = Registry(retrieve=retrieve)
+
+ got = registry.get_or_retrieve("urn:example:1")
+ assert got.value == resources["urn:example:1"]
+ assert registry.get_or_retrieve("urn:example:1").value is got.value
+ assert registry.get_or_retrieve("urn:example:1").value is got.value
+
+ got = registry.get_or_retrieve("urn:example:2")
+ assert got.value == resources["urn:example:2"]
+ assert registry.get_or_retrieve("urn:example:2").value is got.value
+ assert registry.get_or_retrieve("urn:example:2").value is got.value
+
+ # This still succeeds, but evicts the first URI
+ got = registry.get_or_retrieve("urn:example:3")
+ assert got.value == resources["urn:example:3"]
+ assert registry.get_or_retrieve("urn:example:3").value is got.value
+ assert registry.get_or_retrieve("urn:example:3").value is got.value
+
+ # And now this fails (as we popped the value out of `mapping`)
+ with pytest.raises(exceptions.Unretrievable):
+ registry.get_or_retrieve("urn:example:1")
diff --git a/.venv/lib/python3.12/site-packages/referencing/typing.py b/.venv/lib/python3.12/site-packages/referencing/typing.py
new file mode 100644
index 00000000..a6144641
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/referencing/typing.py
@@ -0,0 +1,61 @@
+"""
+Type-annotation related support for the referencing library.
+"""
+
+from __future__ import annotations
+
+from collections.abc import Mapping as Mapping
+from typing import TYPE_CHECKING, Any, Protocol
+
+try:
+ from typing_extensions import TypeVar
+except ImportError: # pragma: no cover
+ from typing import TypeVar
+
+if TYPE_CHECKING:
+ from referencing._core import Resolved, Resolver, Resource
+
+#: A URI which identifies a `Resource`.
+URI = str
+
+#: The type of documents within a registry.
+D = TypeVar("D", default=Any)
+
+
+class Retrieve(Protocol[D]):
+ """
+ A retrieval callable, usable within a `Registry` for resource retrieval.
+
+ Does not make assumptions about where the resource might be coming from.
+ """
+
+ def __call__(self, uri: URI) -> Resource[D]:
+ """
+ Retrieve the resource with the given URI.
+
+ Raise `referencing.exceptions.NoSuchResource` if you wish to indicate
+ the retriever cannot lookup the given URI.
+ """
+ ...
+
+
+class Anchor(Protocol[D]):
+ """
+ An anchor within a `Resource`.
+
+ Beyond "simple" anchors, some specifications like JSON Schema's 2020
+ version have dynamic anchors.
+ """
+
+ @property
+ def name(self) -> str:
+ """
+ Return the name of this anchor.
+ """
+ ...
+
+ def resolve(self, resolver: Resolver[D]) -> Resolved[D]:
+ """
+ Return the resource for this anchor.
+ """
+ ...