about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/pydantic/_internal/_generics.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/pydantic/_internal/_generics.py')
-rw-r--r--.venv/lib/python3.12/site-packages/pydantic/_internal/_generics.py536
1 files changed, 536 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/pydantic/_internal/_generics.py b/.venv/lib/python3.12/site-packages/pydantic/_internal/_generics.py
new file mode 100644
index 00000000..8a9de221
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/pydantic/_internal/_generics.py
@@ -0,0 +1,536 @@
+from __future__ import annotations
+
+import sys
+import types
+import typing
+from collections import ChainMap
+from contextlib import contextmanager
+from contextvars import ContextVar
+from types import prepare_class
+from typing import TYPE_CHECKING, Any, Iterator, Mapping, MutableMapping, Tuple, TypeVar
+from weakref import WeakValueDictionary
+
+import typing_extensions
+
+from . import _typing_extra
+from ._core_utils import get_type_ref
+from ._forward_ref import PydanticRecursiveRef
+from ._utils import all_identical, is_model_class
+
+if sys.version_info >= (3, 10):
+    from typing import _UnionGenericAlias  # type: ignore[attr-defined]
+
+if TYPE_CHECKING:
+    from ..main import BaseModel
+
# Cache key for parametrized generic models; produced by `_early_cache_key` and `_late_cache_key` below.
GenericTypesCacheKey = Tuple[Any, Any, Tuple[Any, ...]]

# Note: We want to remove LimitedDict, but to do this, we'd need to improve the handling of generics caching.
#   Right now, to handle recursive generics, some types must remain cached for brief periods without references.
#   By chaining the WeakValueDictionary with a LimitedDict, we have a way to retain caching for all types with
#   references, while also retaining a limited number of types even without references. This is generally enough
#   to build specific recursive generic models without losing required items out of the cache.

KT = TypeVar('KT')
VT = TypeVar('VT')
# Default capacity of `LimitedDict` below.
_LIMITED_DICT_SIZE = 100
if TYPE_CHECKING:

    class LimitedDict(dict, MutableMapping[KT, VT]):
        def __init__(self, size_limit: int = _LIMITED_DICT_SIZE): ...

else:

    class LimitedDict(dict):
        """Limit the size/length of a dict used for caching to avoid unlimited increase in memory usage.

        Since the dict is ordered, and we always remove elements from the beginning, this is effectively a FIFO cache.
        """

        def __init__(self, size_limit: int = _LIMITED_DICT_SIZE):
            # Maximum number of entries retained before the oldest ones are evicted.
            self.size_limit = size_limit
            super().__init__()

        def __setitem__(self, key: Any, value: Any, /) -> None:
            super().__setitem__(key, value)
            if len(self) > self.size_limit:
                # Evict the overflow plus ~10% extra headroom, so eviction doesn't run on every insert.
                excess = len(self) - self.size_limit + self.size_limit // 10
                to_remove = list(self.keys())[:excess]
                for k in to_remove:
                    del self[k]
+
+
# weak dictionaries allow the dynamically created parametrized versions of generic models to get collected
# once they are no longer referenced by the caller.
if sys.version_info >= (3, 9):  # Typing for weak dictionaries available at 3.9
    GenericTypesCache = WeakValueDictionary[GenericTypesCacheKey, 'type[BaseModel]']
else:
    # WeakValueDictionary is not subscriptable before 3.9, so fall back to the unparametrized class.
    GenericTypesCache = WeakValueDictionary
+
if TYPE_CHECKING:

    class DeepChainMap(ChainMap[KT, VT]):  # type: ignore
        ...

else:

    class DeepChainMap(ChainMap):
        """Variant of ChainMap that allows direct updates to inner scopes.

        Taken from https://docs.python.org/3/library/collections.html#collections.ChainMap,
        with some light modifications for this use case.
        """

        def clear(self) -> None:
            # Empty every underlying mapping, not just the first one.
            for inner in self.maps:
                inner.clear()

        def __setitem__(self, key: KT, value: VT) -> None:
            # Write the value into *all* underlying mappings.
            for inner in self.maps:
                inner[key] = value

        def __delitem__(self, key: KT) -> None:
            # Remove the key from every mapping that contains it; KeyError if none did.
            removed = 0
            for inner in self.maps:
                if key in inner:
                    del inner[key]
                    removed += 1
            if removed == 0:
                raise KeyError(key)
+
+
# Despite the fact that LimitedDict _seems_ no longer necessary, I'm very nervous to actually remove it
# and discover later on that we need to re-add all this infrastructure...
# _GENERIC_TYPES_CACHE = DeepChainMap(GenericTypesCache(), LimitedDict())

# Global cache of concrete parametrized generic models; weak values let unreferenced models be collected.
_GENERIC_TYPES_CACHE = GenericTypesCache()
+
+
class PydanticGenericMetadata(typing_extensions.TypedDict):
    """Generic metadata stored on a `BaseModel` subclass under `__pydantic_generic_metadata__`."""

    origin: type[BaseModel] | None  # analogous to typing._GenericAlias.__origin__
    args: tuple[Any, ...]  # analogous to typing._GenericAlias.__args__
    parameters: tuple[TypeVar, ...]  # analogous to typing.Generic.__parameters__
+
+
def create_generic_submodel(
    model_name: str, origin: type[BaseModel], args: tuple[Any, ...], params: tuple[Any, ...]
) -> type[BaseModel]:
    """Dynamically create a submodel of a provided (generic) BaseModel.

    This is used when producing concrete parametrizations of generic models. This function
    only *creates* the new subclass; the schema/validators/serialization must be updated to
    reflect a concrete parametrization elsewhere.

    Args:
        model_name: The name of the newly created model.
        origin: The base class for the new model to inherit from.
        args: A tuple of generic metadata arguments.
        params: A tuple of generic metadata parameters.

    Returns:
        The created submodel.
    """
    # Keep the new class in the same module as its origin.
    namespace: dict[str, Any] = {'__module__': origin.__module__}
    bases = (origin,)
    # prepare_class resolves the (meta)class machinery for these bases, so the origin's
    # metaclass and any namespace it prepares are honored when creating the subclass.
    meta, ns, kwds = prepare_class(model_name, bases)
    namespace.update(ns)
    created_model = meta(
        model_name,
        bases,
        namespace,
        __pydantic_generic_metadata__={
            'origin': origin,
            'args': args,
            'parameters': params,
        },
        __pydantic_reset_parent_namespace__=False,
        **kwds,
    )

    # NOTE(review): depth=3 assumes exactly two intermediate frames between the user's code and
    # this call (presumably via BaseModel.__class_getitem__) — confirm against callers.
    model_module, called_globally = _get_caller_frame_info(depth=3)
    if called_globally:  # create global reference and therefore allow pickling
        object_by_reference = None
        reference_name = model_name
        reference_module_globals = sys.modules[created_model.__module__].__dict__
        # Probe for a free (or already-matching) global name, appending underscores until
        # setdefault stores or finds this exact class; pickle can then locate it by reference.
        while object_by_reference is not created_model:
            object_by_reference = reference_module_globals.setdefault(reference_name, created_model)
            reference_name += '_'

    return created_model
+
+
def _get_caller_frame_info(depth: int = 2) -> tuple[str | None, bool]:
    """Used inside a function to check whether it was called globally.

    Args:
        depth: The depth to get the frame.

    Returns:
        A tuple contains `module_name` and `called_globally`.

    Raises:
        RuntimeError: If the function is not called inside a function.
    """
    getframe = getattr(sys, '_getframe', None)
    if getframe is None:
        # Some Python implementations don't provide sys._getframe; nothing we can do about it.
        return None, False
    try:
        caller_frame = getframe(depth)
    except ValueError as exc:
        raise RuntimeError('This function must be used inside another function') from exc
    caller_globals = caller_frame.f_globals
    # At module level a frame's locals *are* its globals, which is how a global call is detected.
    return caller_globals.get('__name__'), caller_frame.f_locals is caller_globals
+
+
+DictValues: type[Any] = {}.values().__class__
+
+
def iter_contained_typevars(v: Any) -> Iterator[TypeVar]:
    """Recursively iterate through all subtypes and type args of `v` and yield any typevars that are found.

    This is inspired as an alternative to directly accessing the `__parameters__` attribute of a GenericAlias,
    since __parameters__ of (nested) generic BaseModel subclasses won't show up in that list.
    """
    if isinstance(v, TypeVar):
        yield v
        return
    if is_model_class(v):
        # Generic BaseModel subclasses record their typevars in the pydantic generic metadata.
        yield from v.__pydantic_generic_metadata__['parameters']
    elif isinstance(v, (DictValues, list)):
        for item in v:
            yield from iter_contained_typevars(item)
    else:
        for arg in get_args(v):
            yield from iter_contained_typevars(arg)
+
+
def get_args(v: Any) -> Any:
    """Return the type arguments of `v`, preferring pydantic generic metadata when present."""
    metadata: PydanticGenericMetadata | None = getattr(v, '__pydantic_generic_metadata__', None)
    if not metadata:
        return typing_extensions.get_args(v)
    return metadata.get('args')
+
+
def get_origin(v: Any) -> Any:
    """Return the origin type of `v`, preferring pydantic generic metadata when present."""
    metadata: PydanticGenericMetadata | None = getattr(v, '__pydantic_generic_metadata__', None)
    if not metadata:
        return typing_extensions.get_origin(v)
    return metadata.get('origin')
+
+
def get_standard_typevars_map(cls: Any) -> dict[TypeVar, Any] | None:
    """Package a generic type's typevars and parametrization (if present) into a dictionary compatible with the
    `replace_types` function. Specifically, this works with standard typing generics and typing._GenericAlias.
    """
    origin = get_origin(cls)
    if origin is None or not hasattr(origin, '__parameters__'):
        return None

    # In this case, we know that cls is a _GenericAlias, and origin is the generic type
    # So it is safe to access cls.__args__ and origin.__parameters__
    typevars: tuple[TypeVar, ...] = origin.__parameters__
    return dict(zip(typevars, cls.__args__))  # type: ignore
+
+
def get_model_typevars_map(cls: type[BaseModel]) -> dict[TypeVar, Any] | None:
    """Package a generic BaseModel's typevars and concrete parametrization (if present) into a dictionary compatible
    with the `replace_types` function.

    Since BaseModel.__class_getitem__ does not produce a typing._GenericAlias, and the BaseModel generic info is
    stored in the __pydantic_generic_metadata__ attribute, we need special handling here.
    """
    # TODO: This could be unified with `get_standard_typevars_map` if we stored the generic metadata
    #   in the __origin__, __args__, and __parameters__ attributes of the model.
    metadata = cls.__pydantic_generic_metadata__
    typevars = iter_contained_typevars(metadata['origin'])
    return dict(zip(typevars, metadata['args']))
+
+
def replace_types(type_: Any, type_map: Mapping[Any, Any] | None) -> Any:
    """Return type with all occurrences of `type_map` keys recursively replaced with their values.

    Args:
        type_: The class or generic alias.
        type_map: Mapping from `TypeVar` instance to concrete types.

    Returns:
        A new type representing the basic structure of `type_` with all
        `typevar_map` keys recursively replaced.

    Example:
        ```python
        from typing import List, Tuple, Union

        from pydantic._internal._generics import replace_types

        replace_types(Tuple[str, Union[List[str], float]], {str: int})
        #> Tuple[int, Union[List[int], float]]
        ```
    """
    if not type_map:
        # An empty (or None) map can't change anything; return the input unchanged.
        return type_

    type_args = get_args(type_)

    if _typing_extra.is_annotated(type_):
        # Substitute only within the annotated type itself; the metadata is re-wrapped unchanged.
        annotated_type, *annotations = type_args
        annotated = replace_types(annotated_type, type_map)
        for annotation in annotations:
            annotated = typing_extensions.Annotated[annotated, annotation]
        return annotated

    origin_type = get_origin(type_)

    # Having type args is a good indicator that this is a typing special form
    # instance or a generic alias of some sort.
    if type_args:
        resolved_type_args = tuple(replace_types(arg, type_map) for arg in type_args)
        if all_identical(type_args, resolved_type_args):
            # If all arguments are the same, there is no need to modify the
            # type or create a new object at all
            return type_

        if (
            origin_type is not None
            and isinstance(type_, _typing_extra.typing_base)
            and not isinstance(origin_type, _typing_extra.typing_base)
            and getattr(type_, '_name', None) is not None
        ):
            # In python < 3.9 generic aliases don't exist so any of these like `list`,
            # `type` or `collections.abc.Callable` need to be translated.
            # See: https://www.python.org/dev/peps/pep-0585
            origin_type = getattr(typing, type_._name)
        assert origin_type is not None

        if _typing_extra.origin_is_union(origin_type):
            if any(_typing_extra.is_any(arg) for arg in resolved_type_args):
                # `Any | T` ~ `Any`:
                resolved_type_args = (Any,)
            # `Never | T` ~ `T`:
            resolved_type_args = tuple(
                arg
                for arg in resolved_type_args
                if not (_typing_extra.is_no_return(arg) or _typing_extra.is_never(arg))
            )

        # PEP-604 syntax (Ex.: list | str) is represented with a types.UnionType object that does not have __getitem__.
        # We also cannot use isinstance() since we have to compare types.
        if sys.version_info >= (3, 10) and origin_type is types.UnionType:
            return _UnionGenericAlias(origin_type, resolved_type_args)
        # NotRequired[T] and Required[T] don't support tuple type resolved_type_args, hence the condition below
        return origin_type[resolved_type_args[0] if len(resolved_type_args) == 1 else resolved_type_args]

    # We handle pydantic generic models separately as they don't have the same
    # semantics as "typing" classes or generic aliases

    if not origin_type and is_model_class(type_):
        parameters = type_.__pydantic_generic_metadata__['parameters']
        if not parameters:
            # The model has no remaining typevars, so there is nothing to replace.
            return type_
        resolved_type_args = tuple(replace_types(t, type_map) for t in parameters)
        if all_identical(parameters, resolved_type_args):
            return type_
        return type_[resolved_type_args]

    # Handle special case for typehints that can have lists as arguments.
    # `typing.Callable[[int, str], int]` is an example for this.
    if isinstance(type_, list):
        resolved_list = [replace_types(element, type_map) for element in type_]
        if all_identical(type_, resolved_list):
            return type_
        return resolved_list

    # If all else fails, we try to resolve the type directly and otherwise just
    # return the input with no modifications.
    return type_map.get(type_, type_)
+
+
def has_instance_in_type(type_: Any, isinstance_target: Any) -> bool:
    """Checks if the type, or any of its arbitrary nested args, satisfy
    `isinstance(<type>, isinstance_target)`.
    """
    if isinstance(type_, isinstance_target):
        return True
    if _typing_extra.is_annotated(type_):
        # Only the annotated type itself is inspected; the metadata is ignored.
        return has_instance_in_type(type_.__origin__, isinstance_target)
    if _typing_extra.is_literal(type_):
        return False

    # Having type args is a good indicator that this is a typing module
    # class instantiation or a generic alias of some sort.
    if any(has_instance_in_type(arg, isinstance_target) for arg in get_args(type_)):
        return True

    # Handle special case for typehints that can have lists as arguments.
    # `typing.Callable[[int, str], int]` is an example for this.
    # On Python < 3.10, typing_extensions implements `ParamSpec` as a subclass of `list`,
    # which is why it must be excluded here.
    if isinstance(type_, list) and not isinstance(type_, typing_extensions.ParamSpec):
        return any(has_instance_in_type(element, isinstance_target) for element in type_)

    return False
+
+
def check_parameters_count(cls: type[BaseModel], parameters: tuple[Any, ...]) -> None:
    """Check the generic model parameters count is equal.

    Args:
        cls: The generic model.
        parameters: A tuple of passed parameters to the generic model.

    Raises:
        TypeError: If the passed parameters count is not equal to generic model parameters count.
    """
    actual = len(parameters)
    expected = len(cls.__pydantic_generic_metadata__['parameters'])
    if actual == expected:
        return
    description = 'many' if actual > expected else 'few'
    raise TypeError(f'Too {description} parameters for {cls}; actual {actual}, expected {expected}')
+
+
+_generic_recursion_cache: ContextVar[set[str] | None] = ContextVar('_generic_recursion_cache', default=None)
+
+
@contextmanager
def generic_recursion_self_type(
    origin: type[BaseModel], args: tuple[Any, ...]
) -> Iterator[PydanticRecursiveRef | None]:
    """This contextmanager should be placed around the recursive calls used to build a generic type,
    and accept as arguments the generic origin type and the type arguments being passed to it.

    If the same origin and arguments are observed twice, it implies that a self-reference placeholder
    can be used while building the core schema, and will produce a schema_ref that will be valid in the
    final parent schema.
    """
    previously_seen_type_refs = _generic_recursion_cache.get()
    if previously_seen_type_refs is None:
        # Outermost call: create the set and keep the token so the context var is reset on exit.
        previously_seen_type_refs = set()
        token = _generic_recursion_cache.set(previously_seen_type_refs)
    else:
        # Nested call: reuse the existing set; the outermost frame owns the reset.
        token = None

    try:
        type_ref = get_type_ref(origin, args_override=args)
        if type_ref in previously_seen_type_refs:
            # This exact parametrization is already being built higher up the stack,
            # so yield a self-reference placeholder instead of recursing forever.
            self_type = PydanticRecursiveRef(type_ref=type_ref)
            yield self_type
        else:
            previously_seen_type_refs.add(type_ref)
            yield
            # NOTE(review): this removal is skipped if the body raises; presumably acceptable
            # because the outermost frame resets the whole cache below — confirm.
            previously_seen_type_refs.remove(type_ref)
    finally:
        if token:
            # Only the outermost call (which set the context var) resets it.
            _generic_recursion_cache.reset(token)
+
+
def recursively_defined_type_refs() -> set[str]:
    """Return a copy of the type refs seen by the in-progress generic recursion (empty if none)."""
    seen = _generic_recursion_cache.get()
    # Copying prevents callers from mutating the active recursion state.
    return seen.copy() if seen else set()
+
+
def get_cached_generic_type_early(parent: type[BaseModel], typevar_values: Any) -> type[BaseModel] | None:
    """First stage of the two-stage cache lookup for parametrized generic models.

    Repeated calls to `__class_getitem__` on generic types can happen in tight loops at runtime, so this
    lookup must be as cheap as possible, while certain alternative parametrizations must still ultimately
    resolve to the same type. As a concrete example, this two-stage approach is what makes
    Model[List[T]][int] equal to Model[List[int]].

    The `_early_cache_key` used here is optimized for speed of repeated access; the complementary
    `_late_cache_key` (see `get_cached_generic_type_late`) is optimized for correctness, so that two
    parametrizations that ultimately resolve to the same type always produce cache hits. Using only a
    single cache key would mean either always paying the cost of the late key, or accepting that
    functionally equivalent parametrizations produce distinct types — and since subclass relationships
    are relied on during validation, functionally equivalent types should be actually equal.
    """
    early_key = _early_cache_key(parent, typevar_values)
    return _GENERIC_TYPES_CACHE.get(early_key)
+
+
def get_cached_generic_type_late(
    parent: type[BaseModel], typevar_values: Any, origin: type[BaseModel], args: tuple[Any, ...]
) -> type[BaseModel] | None:
    """See the docstring of `get_cached_generic_type_early` for more information about the two-stage cache lookup."""
    cached_type = _GENERIC_TYPES_CACHE.get(_late_cache_key(origin, args, typevar_values))
    if cached_type is None:
        return None
    # Also populate the early cache so subsequent lookups for these inputs hit the fast path.
    set_cached_generic_type(parent, typevar_values, cached_type, origin, args)
    return cached_type
+
+
def set_cached_generic_type(
    parent: type[BaseModel],
    typevar_values: tuple[Any, ...],
    type_: type[BaseModel],
    origin: type[BaseModel] | None = None,
    args: tuple[Any, ...] | None = None,
) -> None:
    """See the docstring of `get_cached_generic_type_early` for more information about why items are cached with
    two different keys.
    """
    _GENERIC_TYPES_CACHE[_early_cache_key(parent, typevar_values)] = type_
    if len(typevar_values) == 1:
        # `Model[(arg,)]` and `Model[arg]` should resolve to the same cached type; cache both spellings.
        _GENERIC_TYPES_CACHE[_early_cache_key(parent, typevar_values[0])] = type_
    if origin and args:
        _GENERIC_TYPES_CACHE[_late_cache_key(origin, args, typevar_values)] = type_
+
+
def _union_orderings_key(typevar_values: Any) -> Any:
    """This is intended to help differentiate between Union types with the same arguments in different order.

    Thanks to caching internal to the `typing` module, it is not possible to distinguish between
    List[Union[int, float]] and List[Union[float, int]] (and similarly for other "parent" origins besides List)
    because `typing` considers Union[int, float] to be equal to Union[float, int].

    However, you _can_ distinguish between (top-level) Union[int, float] vs. Union[float, int].
    Because we parse items as the first Union type that is successful, we get slightly more consistent behavior
    if we make an effort to distinguish the ordering of items in a union. It would be best if we could _always_
    get the exact-correct order of items in the union, but that would require a change to the `typing` module itself.
    (See https://github.com/python/cpython/issues/86483 for reference.)
    """
    if isinstance(typevar_values, tuple):
        # Recurse so that unions nested anywhere within the parametrization contribute their ordering.
        return tuple(_union_orderings_key(value) for value in typevar_values)
    if _typing_extra.is_union(typevar_values):
        return get_args(typevar_values)
    return ()
+
+
def _early_cache_key(cls: type[BaseModel], typevar_values: Any) -> GenericTypesCacheKey:
    """This is intended for minimal computational overhead during lookups of cached types.

    Note that this is overly simplistic, and it's possible that two different cls/typevar_values
    inputs would ultimately result in the same type being created in BaseModel.__class_getitem__.
    To handle this, we have a fallback _late_cache_key that is checked later if the _early_cache_key
    lookup fails, and should result in a cache hit _precisely_ when the inputs to __class_getitem__
    would result in the same type.
    """
    ordering_key = _union_orderings_key(typevar_values)
    return cls, typevar_values, ordering_key
+
+
def _late_cache_key(origin: type[BaseModel], args: tuple[Any, ...], typevar_values: Any) -> GenericTypesCacheKey:
    """This is intended for use later in the process of creating a new type, when we have more information
    about the exact args that will be passed. If it turns out that a different set of inputs to
    __class_getitem__ resulted in the same inputs to the generic type creation process, we can still
    return the cached type, and update the cache with the _early_cache_key as well.
    """
    # The _union_orderings_key is placed at the start here to ensure there cannot be a collision with an
    # _early_cache_key, as that function will always produce a BaseModel subclass as the first item in the key,
    # whereas this function will always produce a tuple as the first item in the key.
    ordering_key = _union_orderings_key(typevar_values)
    return ordering_key, origin, args