aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/pydantic/_internal/_generate_schema.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/pydantic/_internal/_generate_schema.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/pydantic/_internal/_generate_schema.py')
-rw-r--r--.venv/lib/python3.12/site-packages/pydantic/_internal/_generate_schema.py2522
1 files changed, 2522 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/pydantic/_internal/_generate_schema.py b/.venv/lib/python3.12/site-packages/pydantic/_internal/_generate_schema.py
new file mode 100644
index 00000000..4d4a6a63
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/pydantic/_internal/_generate_schema.py
@@ -0,0 +1,2522 @@
+"""Convert python types to pydantic-core schema."""
+
+from __future__ import annotations as _annotations
+
+import collections.abc
+import dataclasses
+import datetime
+import inspect
+import os
+import pathlib
+import re
+import sys
+import typing
+import warnings
+from contextlib import contextmanager
+from copy import copy, deepcopy
+from decimal import Decimal
+from enum import Enum
+from fractions import Fraction
+from functools import partial
+from inspect import Parameter, _ParameterKind, signature
+from ipaddress import IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network
+from itertools import chain
+from operator import attrgetter
+from types import FunctionType, LambdaType, MethodType
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ Callable,
+ Dict,
+ Final,
+ ForwardRef,
+ Iterable,
+ Iterator,
+ Mapping,
+ Type,
+ TypeVar,
+ Union,
+ cast,
+ overload,
+)
+from uuid import UUID
+from warnings import warn
+
+import typing_extensions
+from pydantic_core import (
+ CoreSchema,
+ MultiHostUrl,
+ PydanticCustomError,
+ PydanticSerializationUnexpectedValue,
+ PydanticUndefined,
+ Url,
+ core_schema,
+ to_jsonable_python,
+)
+from typing_extensions import Literal, TypeAliasType, TypedDict, get_args, get_origin, is_typeddict
+
+from ..aliases import AliasChoices, AliasGenerator, AliasPath
+from ..annotated_handlers import GetCoreSchemaHandler, GetJsonSchemaHandler
+from ..config import ConfigDict, JsonDict, JsonEncoder, JsonSchemaExtraCallable
+from ..errors import PydanticSchemaGenerationError, PydanticUndefinedAnnotation, PydanticUserError
+from ..functional_validators import AfterValidator, BeforeValidator, FieldValidatorModes, PlainValidator, WrapValidator
+from ..json_schema import JsonSchemaValue
+from ..version import version_short
+from ..warnings import PydanticDeprecatedSince20
+from . import _core_utils, _decorators, _discriminated_union, _known_annotated_metadata, _typing_extra
+from ._config import ConfigWrapper, ConfigWrapperStack
+from ._core_metadata import update_core_metadata
+from ._core_utils import (
+ collect_invalid_schemas,
+ define_expected_missing_refs,
+ get_ref,
+ get_type_ref,
+ is_function_with_inner_schema,
+ is_list_like_schema_with_items_schema,
+ simplify_schema_references,
+ validate_core_schema,
+)
+from ._decorators import (
+ Decorator,
+ DecoratorInfos,
+ FieldSerializerDecoratorInfo,
+ FieldValidatorDecoratorInfo,
+ ModelSerializerDecoratorInfo,
+ ModelValidatorDecoratorInfo,
+ RootValidatorDecoratorInfo,
+ ValidatorDecoratorInfo,
+ get_attribute_from_bases,
+ inspect_field_serializer,
+ inspect_model_serializer,
+ inspect_validator,
+)
+from ._docs_extraction import extract_docstrings_from_cls
+from ._fields import collect_dataclass_fields, takes_validated_data_argument
+from ._forward_ref import PydanticRecursiveRef
+from ._generics import get_standard_typevars_map, has_instance_in_type, recursively_defined_type_refs, replace_types
+from ._import_utils import import_cached_base_model, import_cached_field_info
+from ._mock_val_ser import MockCoreSchema
+from ._namespace_utils import NamespacesTuple, NsResolver
+from ._schema_generation_shared import CallbackGetCoreSchemaHandler
+from ._utils import lenient_issubclass, smart_deepcopy
+
+if TYPE_CHECKING:
+ from ..fields import ComputedFieldInfo, FieldInfo
+ from ..main import BaseModel
+ from ..types import Discriminator
+ from ._dataclasses import StandardDataclass
+ from ._schema_generation_shared import GetJsonSchemaFunction
+
+_SUPPORTS_TYPEDDICT = sys.version_info >= (3, 12)
+
+FieldDecoratorInfo = Union[ValidatorDecoratorInfo, FieldValidatorDecoratorInfo, FieldSerializerDecoratorInfo]
+FieldDecoratorInfoType = TypeVar('FieldDecoratorInfoType', bound=FieldDecoratorInfo)
+AnyFieldDecorator = Union[
+ Decorator[ValidatorDecoratorInfo],
+ Decorator[FieldValidatorDecoratorInfo],
+ Decorator[FieldSerializerDecoratorInfo],
+]
+
+ModifyCoreSchemaWrapHandler = GetCoreSchemaHandler
+GetCoreSchemaFunction = Callable[[Any, ModifyCoreSchemaWrapHandler], core_schema.CoreSchema]
+
+TUPLE_TYPES: list[type] = [tuple, typing.Tuple]
+LIST_TYPES: list[type] = [list, typing.List, collections.abc.MutableSequence]
+SET_TYPES: list[type] = [set, typing.Set, collections.abc.MutableSet]
+FROZEN_SET_TYPES: list[type] = [frozenset, typing.FrozenSet, collections.abc.Set]
+DICT_TYPES: list[type] = [dict, typing.Dict]
+IP_TYPES: list[type] = [IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network]
+SEQUENCE_TYPES: list[type] = [typing.Sequence, collections.abc.Sequence]
+PATH_TYPES: list[type] = [
+ os.PathLike,
+ pathlib.Path,
+ pathlib.PurePath,
+ pathlib.PosixPath,
+ pathlib.PurePosixPath,
+ pathlib.PureWindowsPath,
+]
+MAPPING_TYPES = [
+ typing.Mapping,
+ typing.MutableMapping,
+ collections.abc.Mapping,
+ collections.abc.MutableMapping,
+ collections.OrderedDict,
+ typing_extensions.OrderedDict,
+ typing.DefaultDict,
+ collections.defaultdict,
+ collections.Counter,
+ typing.Counter,
+]
+DEQUE_TYPES: list[type] = [collections.deque, typing.Deque]
+
+# Note: This does not play very well with type checkers. For example,
+# `a: LambdaType = lambda x: x` will raise a type error by Pyright.
+ValidateCallSupportedTypes = Union[
+ LambdaType,
+ FunctionType,
+ MethodType,
+ partial,
+]
+
+VALIDATE_CALL_SUPPORTED_TYPES = get_args(ValidateCallSupportedTypes)
+
+_mode_to_validator: dict[
+ FieldValidatorModes, type[BeforeValidator | AfterValidator | PlainValidator | WrapValidator]
+] = {'before': BeforeValidator, 'after': AfterValidator, 'plain': PlainValidator, 'wrap': WrapValidator}
+
+
+def check_validator_fields_against_field_name(
+ info: FieldDecoratorInfo,
+ field: str,
+) -> bool:
+ """Check if field name is in validator fields.
+
+ Args:
+ info: The field info.
+ field: The field name to check.
+
+ Returns:
+ `True` if field name is in validator fields, `False` otherwise.
+ """
+ if '*' in info.fields:
+ return True
+ for v_field_name in info.fields:
+ if v_field_name == field:
+ return True
+ return False
+
+
+def check_decorator_fields_exist(decorators: Iterable[AnyFieldDecorator], fields: Iterable[str]) -> None:
+ """Check if the defined fields in decorators exist in `fields` param.
+
+ It ignores the check for a decorator if the decorator has `*` as field or `check_fields=False`.
+
+ Args:
+ decorators: An iterable of decorators.
+ fields: An iterable of fields name.
+
+ Raises:
+ PydanticUserError: If one of the field names does not exist in `fields` param.
+ """
+ fields = set(fields)
+ for dec in decorators:
+ if '*' in dec.info.fields:
+ continue
+ if dec.info.check_fields is False:
+ continue
+ for field in dec.info.fields:
+ if field not in fields:
+ raise PydanticUserError(
+ f'Decorators defined with incorrect fields: {dec.cls_ref}.{dec.cls_var_name}'
+ " (use check_fields=False if you're inheriting from the model and intended this)",
+ code='decorator-missing-field',
+ )
+
+
+def filter_field_decorator_info_by_field(
+ validator_functions: Iterable[Decorator[FieldDecoratorInfoType]], field: str
+) -> list[Decorator[FieldDecoratorInfoType]]:
+ return [dec for dec in validator_functions if check_validator_fields_against_field_name(dec.info, field)]
+
+
+def apply_each_item_validators(
+ schema: core_schema.CoreSchema,
+ each_item_validators: list[Decorator[ValidatorDecoratorInfo]],
+ field_name: str | None,
+) -> core_schema.CoreSchema:
+ # This V1 compatibility shim should eventually be removed
+
+ # fail early if each_item_validators is empty
+ if not each_item_validators:
+ return schema
+
+ # push down any `each_item=True` validators
+ # note that this won't work for any Annotated types that get wrapped by a function validator
+ # but that's okay because that didn't exist in V1
+ if schema['type'] == 'nullable':
+ schema['schema'] = apply_each_item_validators(schema['schema'], each_item_validators, field_name)
+ return schema
+ elif schema['type'] == 'tuple':
+ if (variadic_item_index := schema.get('variadic_item_index')) is not None:
+ schema['items_schema'][variadic_item_index] = apply_validators(
+ schema['items_schema'][variadic_item_index],
+ each_item_validators,
+ field_name,
+ )
+ elif is_list_like_schema_with_items_schema(schema):
+ inner_schema = schema.get('items_schema', core_schema.any_schema())
+ schema['items_schema'] = apply_validators(inner_schema, each_item_validators, field_name)
+ elif schema['type'] == 'dict':
+ inner_schema = schema.get('values_schema', core_schema.any_schema())
+ schema['values_schema'] = apply_validators(inner_schema, each_item_validators, field_name)
+ else:
+ raise TypeError(
+ f"`@validator(..., each_item=True)` cannot be applied to fields with a schema of {schema['type']}"
+ )
+ return schema
+
+
+def _extract_json_schema_info_from_field_info(
+ info: FieldInfo | ComputedFieldInfo,
+) -> tuple[JsonDict | None, JsonDict | JsonSchemaExtraCallable | None]:
+ json_schema_updates = {
+ 'title': info.title,
+ 'description': info.description,
+ 'deprecated': bool(info.deprecated) or info.deprecated == '' or None,
+ 'examples': to_jsonable_python(info.examples),
+ }
+ json_schema_updates = {k: v for k, v in json_schema_updates.items() if v is not None}
+ return (json_schema_updates or None, info.json_schema_extra)
+
+
+JsonEncoders = Dict[Type[Any], JsonEncoder]
+
+
+def _add_custom_serialization_from_json_encoders(
+ json_encoders: JsonEncoders | None, tp: Any, schema: CoreSchema
+) -> CoreSchema:
+ """Iterate over the json_encoders and add the first matching encoder to the schema.
+
+ Args:
+ json_encoders: A dictionary of types and their encoder functions.
+ tp: The type to check for a matching encoder.
+ schema: The schema to add the encoder to.
+ """
+ if not json_encoders:
+ return schema
+ if 'serialization' in schema:
+ return schema
+ # Check the class type and its superclasses for a matching encoder
+ # Decimal.__class__.__mro__ (and probably other cases) doesn't include Decimal itself
+ # if the type is a GenericAlias (e.g. from list[int]) we need to use __class__ instead of .__mro__
+ for base in (tp, *getattr(tp, '__mro__', tp.__class__.__mro__)[:-1]):
+ encoder = json_encoders.get(base)
+ if encoder is None:
+ continue
+
+ warnings.warn(
+ f'`json_encoders` is deprecated. See https://docs.pydantic.dev/{version_short()}/concepts/serialization/#custom-serializers for alternatives',
+ PydanticDeprecatedSince20,
+ )
+
+ # TODO: in theory we should check that the schema accepts a serialization key
+ schema['serialization'] = core_schema.plain_serializer_function_ser_schema(encoder, when_used='json')
+ return schema
+
+ return schema
+
+
+def _get_first_non_null(a: Any, b: Any) -> Any:
+ """Return the first argument if it is not None, otherwise return the second argument.
+
+ Use case: serialization_alias (argument a) and alias (argument b) are both defined, and serialization_alias is ''.
+ This function will return serialization_alias, which is the first argument, even though it is an empty string.
+ """
+ return a if a is not None else b
+
+
+class GenerateSchema:
+ """Generate core schema for a Pydantic model, dataclass and types like `str`, `datetime`, ... ."""
+
+ __slots__ = (
+ '_config_wrapper_stack',
+ '_ns_resolver',
+ '_typevars_map',
+ 'field_name_stack',
+ 'model_type_stack',
+ 'defs',
+ )
+
+ def __init__(
+ self,
+ config_wrapper: ConfigWrapper,
+ ns_resolver: NsResolver | None = None,
+ typevars_map: dict[Any, Any] | None = None,
+ ) -> None:
+ # we need a stack for recursing into nested models
+ self._config_wrapper_stack = ConfigWrapperStack(config_wrapper)
+ self._ns_resolver = ns_resolver or NsResolver()
+ self._typevars_map = typevars_map
+ self.field_name_stack = _FieldNameStack()
+ self.model_type_stack = _ModelTypeStack()
+ self.defs = _Definitions()
+
+ def __init_subclass__(cls) -> None:
+ super().__init_subclass__()
+ warnings.warn(
+ 'Subclassing `GenerateSchema` is not supported. The API is highly subject to change in minor versions.',
+ UserWarning,
+ stacklevel=2,
+ )
+
+ @property
+ def _config_wrapper(self) -> ConfigWrapper:
+ return self._config_wrapper_stack.tail
+
+ @property
+ def _types_namespace(self) -> NamespacesTuple:
+ return self._ns_resolver.types_namespace
+
+ @property
+ def _arbitrary_types(self) -> bool:
+ return self._config_wrapper.arbitrary_types_allowed
+
+ # the following methods can be overridden but should be considered
+ # unstable / private APIs
+ def _list_schema(self, items_type: Any) -> CoreSchema:
+ return core_schema.list_schema(self.generate_schema(items_type))
+
+ def _dict_schema(self, keys_type: Any, values_type: Any) -> CoreSchema:
+ return core_schema.dict_schema(self.generate_schema(keys_type), self.generate_schema(values_type))
+
+ def _set_schema(self, items_type: Any) -> CoreSchema:
+ return core_schema.set_schema(self.generate_schema(items_type))
+
+ def _frozenset_schema(self, items_type: Any) -> CoreSchema:
+ return core_schema.frozenset_schema(self.generate_schema(items_type))
+
+ def _enum_schema(self, enum_type: type[Enum]) -> CoreSchema:
+ cases: list[Any] = list(enum_type.__members__.values())
+
+ enum_ref = get_type_ref(enum_type)
+ description = None if not enum_type.__doc__ else inspect.cleandoc(enum_type.__doc__)
+ if (
+ description == 'An enumeration.'
+ ): # This is the default value provided by enum.EnumMeta.__new__; don't use it
+ description = None
+ js_updates = {'title': enum_type.__name__, 'description': description}
+ js_updates = {k: v for k, v in js_updates.items() if v is not None}
+
+ sub_type: Literal['str', 'int', 'float'] | None = None
+ if issubclass(enum_type, int):
+ sub_type = 'int'
+ value_ser_type: core_schema.SerSchema = core_schema.simple_ser_schema('int')
+ elif issubclass(enum_type, str):
+ # this handles `StrEnum` (3.11 only), and also `Foobar(str, Enum)`
+ sub_type = 'str'
+ value_ser_type = core_schema.simple_ser_schema('str')
+ elif issubclass(enum_type, float):
+ sub_type = 'float'
+ value_ser_type = core_schema.simple_ser_schema('float')
+ else:
+ # TODO this is an ugly hack, how do we trigger an Any schema for serialization?
+ value_ser_type = core_schema.plain_serializer_function_ser_schema(lambda x: x)
+
+ if cases:
+
+ def get_json_schema(schema: CoreSchema, handler: GetJsonSchemaHandler) -> JsonSchemaValue:
+ json_schema = handler(schema)
+ original_schema = handler.resolve_ref_schema(json_schema)
+ original_schema.update(js_updates)
+ return json_schema
+
+ # we don't want to add the missing to the schema if it's the default one
+ default_missing = getattr(enum_type._missing_, '__func__', None) is Enum._missing_.__func__ # pyright: ignore[reportFunctionMemberAccess]
+ enum_schema = core_schema.enum_schema(
+ enum_type,
+ cases,
+ sub_type=sub_type,
+ missing=None if default_missing else enum_type._missing_,
+ ref=enum_ref,
+ metadata={'pydantic_js_functions': [get_json_schema]},
+ )
+
+ if self._config_wrapper.use_enum_values:
+ enum_schema = core_schema.no_info_after_validator_function(
+ attrgetter('value'), enum_schema, serialization=value_ser_type
+ )
+
+ return enum_schema
+
+ else:
+
+ def get_json_schema_no_cases(_, handler: GetJsonSchemaHandler) -> JsonSchemaValue:
+ json_schema = handler(core_schema.enum_schema(enum_type, cases, sub_type=sub_type, ref=enum_ref))
+ original_schema = handler.resolve_ref_schema(json_schema)
+ original_schema.update(js_updates)
+ return json_schema
+
+ # Use an isinstance check for enums with no cases.
+ # The most important use case for this is creating TypeVar bounds for generics that should
+ # be restricted to enums. This is more consistent than it might seem at first, since you can only
+ # subclass enum.Enum (or subclasses of enum.Enum) if all parent classes have no cases.
+ # We use the get_json_schema function when an Enum subclass has been declared with no cases
+ # so that we can still generate a valid json schema.
+ return core_schema.is_instance_schema(
+ enum_type,
+ metadata={'pydantic_js_functions': [get_json_schema_no_cases]},
+ )
+
+ def _ip_schema(self, tp: Any) -> CoreSchema:
+ from ._validators import IP_VALIDATOR_LOOKUP, IpType
+
+ ip_type_json_schema_format: dict[type[IpType], str] = {
+ IPv4Address: 'ipv4',
+ IPv4Network: 'ipv4network',
+ IPv4Interface: 'ipv4interface',
+ IPv6Address: 'ipv6',
+ IPv6Network: 'ipv6network',
+ IPv6Interface: 'ipv6interface',
+ }
+
+ def ser_ip(ip: Any, info: core_schema.SerializationInfo) -> str | IpType:
+ if not isinstance(ip, (tp, str)):
+ raise PydanticSerializationUnexpectedValue(
+ f"Expected `{tp}` but got `{type(ip)}` with value `'{ip}'` - serialized value may not be as expected."
+ )
+ if info.mode == 'python':
+ return ip
+ return str(ip)
+
+ return core_schema.lax_or_strict_schema(
+ lax_schema=core_schema.no_info_plain_validator_function(IP_VALIDATOR_LOOKUP[tp]),
+ strict_schema=core_schema.json_or_python_schema(
+ json_schema=core_schema.no_info_after_validator_function(tp, core_schema.str_schema()),
+ python_schema=core_schema.is_instance_schema(tp),
+ ),
+ serialization=core_schema.plain_serializer_function_ser_schema(ser_ip, info_arg=True, when_used='always'),
+ metadata={
+ 'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': ip_type_json_schema_format[tp]}]
+ },
+ )
+
+ def _fraction_schema(self) -> CoreSchema:
+ """Support for [`fractions.Fraction`][fractions.Fraction]."""
+ from ._validators import fraction_validator
+
+ # TODO: note, this is a fairly common pattern, re lax / strict for attempted type coercion,
+ # can we use a helper function to reduce boilerplate?
+ return core_schema.lax_or_strict_schema(
+ lax_schema=core_schema.no_info_plain_validator_function(fraction_validator),
+ strict_schema=core_schema.json_or_python_schema(
+ json_schema=core_schema.no_info_plain_validator_function(fraction_validator),
+ python_schema=core_schema.is_instance_schema(Fraction),
+ ),
+ # use str serialization to guarantee round trip behavior
+ serialization=core_schema.to_string_ser_schema(when_used='always'),
+ metadata={'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': 'fraction'}]},
+ )
+
+ def _arbitrary_type_schema(self, tp: Any) -> CoreSchema:
+ if not isinstance(tp, type):
+ warn(
+ f'{tp!r} is not a Python type (it may be an instance of an object),'
+ ' Pydantic will allow any object with no validation since we cannot even'
+ ' enforce that the input is an instance of the given type.'
+ ' To get rid of this error wrap the type with `pydantic.SkipValidation`.',
+ UserWarning,
+ )
+ return core_schema.any_schema()
+ return core_schema.is_instance_schema(tp)
+
+ def _unknown_type_schema(self, obj: Any) -> CoreSchema:
+ raise PydanticSchemaGenerationError(
+ f'Unable to generate pydantic-core schema for {obj!r}. '
+ 'Set `arbitrary_types_allowed=True` in the model_config to ignore this error'
+ ' or implement `__get_pydantic_core_schema__` on your type to fully support it.'
+ '\n\nIf you got this error by calling handler(<some type>) within'
+ ' `__get_pydantic_core_schema__` then you likely need to call'
+ ' `handler.generate_schema(<some type>)` since we do not call'
+ ' `__get_pydantic_core_schema__` on `<some type>` otherwise to avoid infinite recursion.'
+ )
+
+ def _apply_discriminator_to_union(
+ self, schema: CoreSchema, discriminator: str | Discriminator | None
+ ) -> CoreSchema:
+ if discriminator is None:
+ return schema
+ try:
+ return _discriminated_union.apply_discriminator(
+ schema,
+ discriminator,
+ )
+ except _discriminated_union.MissingDefinitionForUnionRef:
+ # defer until defs are resolved
+ _discriminated_union.set_discriminator_in_metadata(
+ schema,
+ discriminator,
+ )
+ return schema
+
+ class CollectedInvalid(Exception):
+ pass
+
+ def clean_schema(self, schema: CoreSchema) -> CoreSchema:
+ schema = self.collect_definitions(schema)
+ schema = simplify_schema_references(schema)
+ if collect_invalid_schemas(schema):
+ raise self.CollectedInvalid()
+ schema = _discriminated_union.apply_discriminators(schema)
+ schema = validate_core_schema(schema)
+ return schema
+
+ def collect_definitions(self, schema: CoreSchema) -> CoreSchema:
+ ref = cast('str | None', schema.get('ref', None))
+ if ref:
+ self.defs.definitions[ref] = schema
+ if 'ref' in schema:
+ schema = core_schema.definition_reference_schema(schema['ref'])
+ return core_schema.definitions_schema(
+ schema,
+ list(self.defs.definitions.values()),
+ )
+
+ def _add_js_function(self, metadata_schema: CoreSchema, js_function: Callable[..., Any]) -> None:
+ metadata = metadata_schema.get('metadata', {})
+ pydantic_js_functions = metadata.setdefault('pydantic_js_functions', [])
+ # because of how we generate core schemas for nested generic models
+ # we can end up adding `BaseModel.__get_pydantic_json_schema__` multiple times
+ # this check may fail to catch duplicates if the function is a `functools.partial`
+ # or something like that, but if it does it'll fail by inserting the duplicate
+ if js_function not in pydantic_js_functions:
+ pydantic_js_functions.append(js_function)
+ metadata_schema['metadata'] = metadata
+
+ def generate_schema(
+ self,
+ obj: Any,
+ from_dunder_get_core_schema: bool = True,
+ ) -> core_schema.CoreSchema:
+ """Generate core schema.
+
+ Args:
+ obj: The object to generate core schema for.
+ from_dunder_get_core_schema: Whether to generate schema from either the
+ `__get_pydantic_core_schema__` function or `__pydantic_core_schema__` property.
+
+ Returns:
+ The generated core schema.
+
+ Raises:
+ PydanticUndefinedAnnotation:
+ If it is not possible to evaluate forward reference.
+ PydanticSchemaGenerationError:
+ If it is not possible to generate pydantic-core schema.
+ TypeError:
+ - If `alias_generator` returns a disallowed type (must be str, AliasPath or AliasChoices).
+ - If V1 style validator with `each_item=True` applied on a wrong field.
+ PydanticUserError:
+ - If `typing.TypedDict` is used instead of `typing_extensions.TypedDict` on Python < 3.12.
+ - If `__modify_schema__` method is used instead of `__get_pydantic_json_schema__`.
+ """
+ schema: CoreSchema | None = None
+
+ if from_dunder_get_core_schema:
+ from_property = self._generate_schema_from_property(obj, obj)
+ if from_property is not None:
+ schema = from_property
+
+ if schema is None:
+ schema = self._generate_schema_inner(obj)
+
+ metadata_js_function = _extract_get_pydantic_json_schema(obj, schema)
+ if metadata_js_function is not None:
+ metadata_schema = resolve_original_schema(schema, self.defs.definitions)
+ if metadata_schema:
+ self._add_js_function(metadata_schema, metadata_js_function)
+
+ schema = _add_custom_serialization_from_json_encoders(self._config_wrapper.json_encoders, obj, schema)
+
+ return schema
+
+ def _model_schema(self, cls: type[BaseModel]) -> core_schema.CoreSchema:
+ """Generate schema for a Pydantic model."""
+ with self.defs.get_schema_or_ref(cls) as (model_ref, maybe_schema):
+ if maybe_schema is not None:
+ return maybe_schema
+
+ fields = getattr(cls, '__pydantic_fields__', {})
+ decorators = cls.__pydantic_decorators__
+ computed_fields = decorators.computed_fields
+ check_decorator_fields_exist(
+ chain(
+ decorators.field_validators.values(),
+ decorators.field_serializers.values(),
+ decorators.validators.values(),
+ ),
+ {*fields.keys(), *computed_fields.keys()},
+ )
+ config_wrapper = ConfigWrapper(cls.model_config, check=False)
+ core_config = config_wrapper.core_config(title=cls.__name__)
+ model_validators = decorators.model_validators.values()
+
+ with self._config_wrapper_stack.push(config_wrapper), self._ns_resolver.push(cls):
+ extras_schema = None
+ if core_config.get('extra_fields_behavior') == 'allow':
+ assert cls.__mro__[0] is cls
+ assert cls.__mro__[-1] is object
+ for candidate_cls in cls.__mro__[:-1]:
+ extras_annotation = getattr(candidate_cls, '__annotations__', {}).get(
+ '__pydantic_extra__', None
+ )
+ if extras_annotation is not None:
+ if isinstance(extras_annotation, str):
+ extras_annotation = _typing_extra.eval_type_backport(
+ _typing_extra._make_forward_ref(
+ extras_annotation, is_argument=False, is_class=True
+ ),
+ *self._types_namespace,
+ )
+ tp = get_origin(extras_annotation)
+ if tp not in (Dict, dict):
+ raise PydanticSchemaGenerationError(
+ 'The type annotation for `__pydantic_extra__` must be `Dict[str, ...]`'
+ )
+ extra_items_type = self._get_args_resolving_forward_refs(
+ extras_annotation,
+ required=True,
+ )[1]
+ if not _typing_extra.is_any(extra_items_type):
+ extras_schema = self.generate_schema(extra_items_type)
+ break
+
+ generic_origin: type[BaseModel] | None = getattr(cls, '__pydantic_generic_metadata__', {}).get('origin')
+
+ if cls.__pydantic_root_model__:
+ root_field = self._common_field_schema('root', fields['root'], decorators)
+ inner_schema = root_field['schema']
+ inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')
+ model_schema = core_schema.model_schema(
+ cls,
+ inner_schema,
+ generic_origin=generic_origin,
+ custom_init=getattr(cls, '__pydantic_custom_init__', None),
+ root_model=True,
+ post_init=getattr(cls, '__pydantic_post_init__', None),
+ config=core_config,
+ ref=model_ref,
+ )
+ else:
+ fields_schema: core_schema.CoreSchema = core_schema.model_fields_schema(
+ {k: self._generate_md_field_schema(k, v, decorators) for k, v in fields.items()},
+ computed_fields=[
+ self._computed_field_schema(d, decorators.field_serializers)
+ for d in computed_fields.values()
+ ],
+ extras_schema=extras_schema,
+ model_name=cls.__name__,
+ )
+ inner_schema = apply_validators(fields_schema, decorators.root_validators.values(), None)
+ new_inner_schema = define_expected_missing_refs(inner_schema, recursively_defined_type_refs())
+ if new_inner_schema is not None:
+ inner_schema = new_inner_schema
+ inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')
+
+ model_schema = core_schema.model_schema(
+ cls,
+ inner_schema,
+ generic_origin=generic_origin,
+ custom_init=getattr(cls, '__pydantic_custom_init__', None),
+ root_model=False,
+ post_init=getattr(cls, '__pydantic_post_init__', None),
+ config=core_config,
+ ref=model_ref,
+ )
+
+ schema = self._apply_model_serializers(model_schema, decorators.model_serializers.values())
+ schema = apply_model_validators(schema, model_validators, 'outer')
+ self.defs.definitions[model_ref] = schema
+ return core_schema.definition_reference_schema(model_ref)
+
+ def _unpack_refs_defs(self, schema: CoreSchema) -> CoreSchema:
+ """Unpack all 'definitions' schemas into `GenerateSchema.defs.definitions`
+ and return the inner schema.
+ """
+ if schema['type'] == 'definitions':
+ definitions = self.defs.definitions
+ for s in schema['definitions']:
+ definitions[s['ref']] = s # type: ignore
+ return schema['schema']
+ return schema
+
+ def _resolve_self_type(self, obj: Any) -> Any:
+ obj = self.model_type_stack.get()
+ if obj is None:
+ raise PydanticUserError('`typing.Self` is invalid in this context', code='invalid-self-type')
+ return obj
+
+ def _generate_schema_from_property(self, obj: Any, source: Any) -> core_schema.CoreSchema | None:
+ """Try to generate schema from either the `__get_pydantic_core_schema__` function or
+ `__pydantic_core_schema__` property.
+
+ Note: `__get_pydantic_core_schema__` takes priority so it can
+ decide whether to use a `__pydantic_core_schema__` attribute, or generate a fresh schema.
+ """
+ # avoid calling `__get_pydantic_core_schema__` if we've already visited this object
+ if _typing_extra.is_self(obj):
+ obj = self._resolve_self_type(obj)
+ with self.defs.get_schema_or_ref(obj) as (_, maybe_schema):
+ if maybe_schema is not None:
+ return maybe_schema
+ if obj is source:
+ ref_mode = 'unpack'
+ else:
+ ref_mode = 'to-def'
+
+ schema: CoreSchema
+
+ if (get_schema := getattr(obj, '__get_pydantic_core_schema__', None)) is not None:
+ schema = get_schema(
+ source, CallbackGetCoreSchemaHandler(self._generate_schema_inner, self, ref_mode=ref_mode)
+ )
+ elif (
+ hasattr(obj, '__dict__')
+ # In some cases (e.g. a stdlib dataclass subclassing a Pydantic dataclass),
+ # doing an attribute access to get the schema will result in the parent schema
+ # being fetched. Thus, only look for the current obj's dict:
+ and (existing_schema := obj.__dict__.get('__pydantic_core_schema__')) is not None
+ and not isinstance(existing_schema, MockCoreSchema)
+ ):
+ schema = existing_schema
+ elif (validators := getattr(obj, '__get_validators__', None)) is not None:
+ from pydantic.v1 import BaseModel as BaseModelV1
+
+ if issubclass(obj, BaseModelV1):
+ warn(
+ f'Mixing V1 models and V2 models (or constructs, like `TypeAdapter`) is not supported. Please upgrade `{obj.__name__}` to V2.',
+ UserWarning,
+ )
+ else:
+ warn(
+ '`__get_validators__` is deprecated and will be removed, use `__get_pydantic_core_schema__` instead.',
+ PydanticDeprecatedSince20,
+ )
+ schema = core_schema.chain_schema([core_schema.with_info_plain_validator_function(v) for v in validators()])
+ else:
+ # we have no existing schema information on the property, exit early so that we can go generate a schema
+ return None
+
+ schema = self._unpack_refs_defs(schema)
+
+ if is_function_with_inner_schema(schema):
+ ref = schema['schema'].pop('ref', None) # pyright: ignore[reportCallIssue, reportArgumentType]
+ if ref:
+ schema['ref'] = ref
+ else:
+ ref = get_ref(schema)
+
+ if ref:
+ self.defs.definitions[ref] = schema
+ return core_schema.definition_reference_schema(ref)
+
+ return schema
+
+ def _resolve_forward_ref(self, obj: Any) -> Any:
+ # we assume that types_namespace has the target of forward references in its scope,
+ # but this could fail, for example, if calling Validator on an imported type which contains
+ # forward references to other types only defined in the module from which it was imported
+ # `Validator(SomeImportedTypeAliasWithAForwardReference)`
+ # or the equivalent for BaseModel
+ # class Model(BaseModel):
+ # x: SomeImportedTypeAliasWithAForwardReference
+ try:
+ obj = _typing_extra.eval_type_backport(obj, *self._types_namespace)
+ except NameError as e:
+ raise PydanticUndefinedAnnotation.from_name_error(e) from e
+
+ # if obj is still a ForwardRef, it means we can't evaluate it, raise PydanticUndefinedAnnotation
+ if isinstance(obj, ForwardRef):
+ raise PydanticUndefinedAnnotation(obj.__forward_arg__, f'Unable to evaluate forward reference {obj}')
+
+ if self._typevars_map:
+ obj = replace_types(obj, self._typevars_map)
+
+ return obj
+
+ @overload
+ def _get_args_resolving_forward_refs(self, obj: Any, required: Literal[True]) -> tuple[Any, ...]: ...
+
+ @overload
+ def _get_args_resolving_forward_refs(self, obj: Any) -> tuple[Any, ...] | None: ...
+
+ def _get_args_resolving_forward_refs(self, obj: Any, required: bool = False) -> tuple[Any, ...] | None:
+ args = get_args(obj)
+ if args:
+ if sys.version_info >= (3, 9):
+ from types import GenericAlias
+
+ if isinstance(obj, GenericAlias):
+ # PEP 585 generic aliases don't convert args to ForwardRefs, unlike `typing.List/Dict` etc.
+ args = (_typing_extra._make_forward_ref(a) if isinstance(a, str) else a for a in args)
+ args = tuple(self._resolve_forward_ref(a) if isinstance(a, ForwardRef) else a for a in args)
+ elif required: # pragma: no cover
+ raise TypeError(f'Expected {obj} to have generic parameters but it had none')
+ return args
+
+ def _get_first_arg_or_any(self, obj: Any) -> Any:
+ args = self._get_args_resolving_forward_refs(obj)
+ if not args:
+ return Any
+ return args[0]
+
+ def _get_first_two_args_or_any(self, obj: Any) -> tuple[Any, Any]:
+ args = self._get_args_resolving_forward_refs(obj)
+ if not args:
+ return (Any, Any)
+ if len(args) < 2:
+ origin = get_origin(obj)
+ raise TypeError(f'Expected two type arguments for {origin}, got 1')
+ return args[0], args[1]
+
+ def _generate_schema_inner(self, obj: Any) -> core_schema.CoreSchema:
+ if _typing_extra.is_annotated(obj):
+ return self._annotated_schema(obj)
+
+ if isinstance(obj, dict):
+ # we assume this is already a valid schema
+ return obj # type: ignore[return-value]
+
+ if isinstance(obj, str):
+ obj = ForwardRef(obj)
+
+ if isinstance(obj, ForwardRef):
+ return self.generate_schema(self._resolve_forward_ref(obj))
+
+ BaseModel = import_cached_base_model()
+
+ if lenient_issubclass(obj, BaseModel):
+ with self.model_type_stack.push(obj):
+ return self._model_schema(obj)
+
+ if isinstance(obj, PydanticRecursiveRef):
+ return core_schema.definition_reference_schema(schema_ref=obj.type_ref)
+
+ return self.match_type(obj)
+
+ def match_type(self, obj: Any) -> core_schema.CoreSchema: # noqa: C901
+ """Main mapping of types to schemas.
+
+ The general structure is a series of if statements starting with the simple cases
+ (non-generic primitive types) and then handling generics and other more complex cases.
+
+ Each case either generates a schema directly, calls into a public user-overridable method
+ (like `GenerateSchema.tuple_variable_schema`) or calls into a private method that handles some
+ boilerplate before calling into the user-facing method (e.g. `GenerateSchema._tuple_schema`).
+
+ The idea is that we'll evolve this into adding more and more user facing methods over time
+ as they get requested and we figure out what the right API for them is.
+ """
+ if obj is str:
+ return core_schema.str_schema()
+ elif obj is bytes:
+ return core_schema.bytes_schema()
+ elif obj is int:
+ return core_schema.int_schema()
+ elif obj is float:
+ return core_schema.float_schema()
+ elif obj is bool:
+ return core_schema.bool_schema()
+ elif obj is complex:
+ return core_schema.complex_schema()
+ elif _typing_extra.is_any(obj) or obj is object:
+ return core_schema.any_schema()
+ elif obj is datetime.date:
+ return core_schema.date_schema()
+ elif obj is datetime.datetime:
+ return core_schema.datetime_schema()
+ elif obj is datetime.time:
+ return core_schema.time_schema()
+ elif obj is datetime.timedelta:
+ return core_schema.timedelta_schema()
+ elif obj is Decimal:
+ return core_schema.decimal_schema()
+ elif obj is UUID:
+ return core_schema.uuid_schema()
+ elif obj is Url:
+ return core_schema.url_schema()
+ elif obj is Fraction:
+ return self._fraction_schema()
+ elif obj is MultiHostUrl:
+ return core_schema.multi_host_url_schema()
+ elif obj is None or obj is _typing_extra.NoneType:
+ return core_schema.none_schema()
+ elif obj in IP_TYPES:
+ return self._ip_schema(obj)
+ elif obj in TUPLE_TYPES:
+ return self._tuple_schema(obj)
+ elif obj in LIST_TYPES:
+ return self._list_schema(Any)
+ elif obj in SET_TYPES:
+ return self._set_schema(Any)
+ elif obj in FROZEN_SET_TYPES:
+ return self._frozenset_schema(Any)
+ elif obj in SEQUENCE_TYPES:
+ return self._sequence_schema(Any)
+ elif obj in DICT_TYPES:
+ return self._dict_schema(Any, Any)
+ elif _typing_extra.is_type_alias_type(obj):
+ return self._type_alias_type_schema(obj)
+ elif obj is type:
+ return self._type_schema()
+ elif _typing_extra.is_callable(obj):
+ return core_schema.callable_schema()
+ elif _typing_extra.is_literal(obj):
+ return self._literal_schema(obj)
+ elif is_typeddict(obj):
+ return self._typed_dict_schema(obj, None)
+ elif _typing_extra.is_namedtuple(obj):
+ return self._namedtuple_schema(obj, None)
+ elif _typing_extra.is_new_type(obj):
+ # NewType, can't use isinstance because it fails <3.10
+ return self.generate_schema(obj.__supertype__)
+ elif obj is re.Pattern:
+ return self._pattern_schema(obj)
+ elif _typing_extra.is_hashable(obj):
+ return self._hashable_schema()
+ elif isinstance(obj, typing.TypeVar):
+ return self._unsubstituted_typevar_schema(obj)
+ elif _typing_extra.is_finalvar(obj):
+ if obj is Final:
+ return core_schema.any_schema()
+ return self.generate_schema(
+ self._get_first_arg_or_any(obj),
+ )
+ elif isinstance(obj, VALIDATE_CALL_SUPPORTED_TYPES):
+ return self._call_schema(obj)
+ elif inspect.isclass(obj) and issubclass(obj, Enum):
+ return self._enum_schema(obj)
+ elif _typing_extra.is_zoneinfo_type(obj):
+ return self._zoneinfo_schema()
+
+ if dataclasses.is_dataclass(obj):
+ return self._dataclass_schema(obj, None)
+
+ origin = get_origin(obj)
+ if origin is not None:
+ return self._match_generic_type(obj, origin)
+
+ res = self._get_prepare_pydantic_annotations_for_known_type(obj, ())
+ if res is not None:
+ source_type, annotations = res
+ return self._apply_annotations(source_type, annotations)
+
+ if self._arbitrary_types:
+ return self._arbitrary_type_schema(obj)
+ return self._unknown_type_schema(obj)
+
+ def _match_generic_type(self, obj: Any, origin: Any) -> CoreSchema: # noqa: C901
+ # Need to handle generic dataclasses before looking for the schema properties because attribute accesses
+ # on _GenericAlias delegate to the origin type, so lose the information about the concrete parametrization
+ # As a result, currently, there is no way to cache the schema for generic dataclasses. This may be possible
+ # to resolve by modifying the value returned by `Generic.__class_getitem__`, but that is a dangerous game.
+ if dataclasses.is_dataclass(origin):
+ return self._dataclass_schema(obj, origin) # pyright: ignore[reportArgumentType]
+ if _typing_extra.is_namedtuple(origin):
+ return self._namedtuple_schema(obj, origin)
+
+ from_property = self._generate_schema_from_property(origin, obj)
+ if from_property is not None:
+ return from_property
+
+ if _typing_extra.is_type_alias_type(origin):
+ return self._type_alias_type_schema(obj)
+ elif _typing_extra.origin_is_union(origin):
+ return self._union_schema(obj)
+ elif origin in TUPLE_TYPES:
+ return self._tuple_schema(obj)
+ elif origin in LIST_TYPES:
+ return self._list_schema(self._get_first_arg_or_any(obj))
+ elif origin in SET_TYPES:
+ return self._set_schema(self._get_first_arg_or_any(obj))
+ elif origin in FROZEN_SET_TYPES:
+ return self._frozenset_schema(self._get_first_arg_or_any(obj))
+ elif origin in DICT_TYPES:
+ return self._dict_schema(*self._get_first_two_args_or_any(obj))
+ elif is_typeddict(origin):
+ return self._typed_dict_schema(obj, origin)
+ elif origin in (typing.Type, type):
+ return self._subclass_schema(obj)
+ elif origin in SEQUENCE_TYPES:
+ return self._sequence_schema(self._get_first_arg_or_any(obj))
+ elif origin in {typing.Iterable, collections.abc.Iterable, typing.Generator, collections.abc.Generator}:
+ return self._iterable_schema(obj)
+ elif origin in (re.Pattern, typing.Pattern):
+ return self._pattern_schema(obj)
+
+ res = self._get_prepare_pydantic_annotations_for_known_type(obj, ())
+ if res is not None:
+ source_type, annotations = res
+ return self._apply_annotations(source_type, annotations)
+
+ if self._arbitrary_types:
+ return self._arbitrary_type_schema(origin)
+ return self._unknown_type_schema(obj)
+
+ def _generate_td_field_schema(
+ self,
+ name: str,
+ field_info: FieldInfo,
+ decorators: DecoratorInfos,
+ *,
+ required: bool = True,
+ ) -> core_schema.TypedDictField:
+ """Prepare a TypedDictField to represent a model or typeddict field."""
+ common_field = self._common_field_schema(name, field_info, decorators)
+ return core_schema.typed_dict_field(
+ common_field['schema'],
+ required=False if not field_info.is_required() else required,
+ serialization_exclude=common_field['serialization_exclude'],
+ validation_alias=common_field['validation_alias'],
+ serialization_alias=common_field['serialization_alias'],
+ metadata=common_field['metadata'],
+ )
+
+ def _generate_md_field_schema(
+ self,
+ name: str,
+ field_info: FieldInfo,
+ decorators: DecoratorInfos,
+ ) -> core_schema.ModelField:
+ """Prepare a ModelField to represent a model field."""
+ common_field = self._common_field_schema(name, field_info, decorators)
+ return core_schema.model_field(
+ common_field['schema'],
+ serialization_exclude=common_field['serialization_exclude'],
+ validation_alias=common_field['validation_alias'],
+ serialization_alias=common_field['serialization_alias'],
+ frozen=common_field['frozen'],
+ metadata=common_field['metadata'],
+ )
+
+ def _generate_dc_field_schema(
+ self,
+ name: str,
+ field_info: FieldInfo,
+ decorators: DecoratorInfos,
+ ) -> core_schema.DataclassField:
+ """Prepare a DataclassField to represent the parameter/field, of a dataclass."""
+ common_field = self._common_field_schema(name, field_info, decorators)
+ return core_schema.dataclass_field(
+ name,
+ common_field['schema'],
+ init=field_info.init,
+ init_only=field_info.init_var or None,
+ kw_only=None if field_info.kw_only else False,
+ serialization_exclude=common_field['serialization_exclude'],
+ validation_alias=common_field['validation_alias'],
+ serialization_alias=common_field['serialization_alias'],
+ frozen=common_field['frozen'],
+ metadata=common_field['metadata'],
+ )
+
+ @staticmethod
+ def _apply_alias_generator_to_field_info(
+ alias_generator: Callable[[str], str] | AliasGenerator, field_info: FieldInfo, field_name: str
+ ) -> None:
+ """Apply an alias_generator to aliases on a FieldInfo instance if appropriate.
+
+ Args:
+ alias_generator: A callable that takes a string and returns a string, or an AliasGenerator instance.
+ field_info: The FieldInfo instance to which the alias_generator is (maybe) applied.
+ field_name: The name of the field from which to generate the alias.
+ """
+ # Apply an alias_generator if
+ # 1. An alias is not specified
+ # 2. An alias is specified, but the priority is <= 1
+ if (
+ field_info.alias_priority is None
+ or field_info.alias_priority <= 1
+ or field_info.alias is None
+ or field_info.validation_alias is None
+ or field_info.serialization_alias is None
+ ):
+ alias, validation_alias, serialization_alias = None, None, None
+
+ if isinstance(alias_generator, AliasGenerator):
+ alias, validation_alias, serialization_alias = alias_generator.generate_aliases(field_name)
+ elif isinstance(alias_generator, Callable):
+ alias = alias_generator(field_name)
+ if not isinstance(alias, str):
+ raise TypeError(f'alias_generator {alias_generator} must return str, not {alias.__class__}')
+
+ # if priority is not set, we set to 1
+ # which supports the case where the alias_generator from a child class is used
+ # to generate an alias for a field in a parent class
+ if field_info.alias_priority is None or field_info.alias_priority <= 1:
+ field_info.alias_priority = 1
+
+ # if the priority is 1, then we set the aliases to the generated alias
+ if field_info.alias_priority == 1:
+ field_info.serialization_alias = _get_first_non_null(serialization_alias, alias)
+ field_info.validation_alias = _get_first_non_null(validation_alias, alias)
+ field_info.alias = alias
+
+ # if any of the aliases are not set, then we set them to the corresponding generated alias
+ if field_info.alias is None:
+ field_info.alias = alias
+ if field_info.serialization_alias is None:
+ field_info.serialization_alias = _get_first_non_null(serialization_alias, alias)
+ if field_info.validation_alias is None:
+ field_info.validation_alias = _get_first_non_null(validation_alias, alias)
+
+ @staticmethod
+ def _apply_alias_generator_to_computed_field_info(
+ alias_generator: Callable[[str], str] | AliasGenerator,
+ computed_field_info: ComputedFieldInfo,
+ computed_field_name: str,
+ ):
+ """Apply an alias_generator to alias on a ComputedFieldInfo instance if appropriate.
+
+ Args:
+ alias_generator: A callable that takes a string and returns a string, or an AliasGenerator instance.
+ computed_field_info: The ComputedFieldInfo instance to which the alias_generator is (maybe) applied.
+ computed_field_name: The name of the computed field from which to generate the alias.
+ """
+ # Apply an alias_generator if
+ # 1. An alias is not specified
+ # 2. An alias is specified, but the priority is <= 1
+
+ if (
+ computed_field_info.alias_priority is None
+ or computed_field_info.alias_priority <= 1
+ or computed_field_info.alias is None
+ ):
+ alias, validation_alias, serialization_alias = None, None, None
+
+ if isinstance(alias_generator, AliasGenerator):
+ alias, validation_alias, serialization_alias = alias_generator.generate_aliases(computed_field_name)
+ elif isinstance(alias_generator, Callable):
+ alias = alias_generator(computed_field_name)
+ if not isinstance(alias, str):
+ raise TypeError(f'alias_generator {alias_generator} must return str, not {alias.__class__}')
+
+ # if priority is not set, we set to 1
+ # which supports the case where the alias_generator from a child class is used
+ # to generate an alias for a field in a parent class
+ if computed_field_info.alias_priority is None or computed_field_info.alias_priority <= 1:
+ computed_field_info.alias_priority = 1
+
+ # if the priority is 1, then we set the aliases to the generated alias
+ # note that we use the serialization_alias with priority over alias, as computed_field
+ # aliases are used for serialization only (not validation)
+ if computed_field_info.alias_priority == 1:
+ computed_field_info.alias = _get_first_non_null(serialization_alias, alias)
+
+ @staticmethod
+ def _apply_field_title_generator_to_field_info(
+ config_wrapper: ConfigWrapper, field_info: FieldInfo | ComputedFieldInfo, field_name: str
+ ) -> None:
+ """Apply a field_title_generator on a FieldInfo or ComputedFieldInfo instance if appropriate
+ Args:
+ config_wrapper: The config of the model
+ field_info: The FieldInfo or ComputedField instance to which the title_generator is (maybe) applied.
+ field_name: The name of the field from which to generate the title.
+ """
+ field_title_generator = field_info.field_title_generator or config_wrapper.field_title_generator
+
+ if field_title_generator is None:
+ return
+
+ if field_info.title is None:
+ title = field_title_generator(field_name, field_info) # type: ignore
+ if not isinstance(title, str):
+ raise TypeError(f'field_title_generator {field_title_generator} must return str, not {title.__class__}')
+
+ field_info.title = title
+
+ def _common_field_schema( # C901
+ self, name: str, field_info: FieldInfo, decorators: DecoratorInfos
+ ) -> _CommonField:
+ # Update FieldInfo annotation if appropriate:
+ FieldInfo = import_cached_field_info()
+ if not field_info.evaluated:
+ # TODO Can we use field_info.apply_typevars_map here?
+ try:
+ evaluated_type = _typing_extra.eval_type(field_info.annotation, *self._types_namespace)
+ except NameError as e:
+ raise PydanticUndefinedAnnotation.from_name_error(e) from e
+ evaluated_type = replace_types(evaluated_type, self._typevars_map)
+ field_info.evaluated = True
+ if not has_instance_in_type(evaluated_type, PydanticRecursiveRef):
+ new_field_info = FieldInfo.from_annotation(evaluated_type)
+ field_info.annotation = new_field_info.annotation
+
+ # Handle any field info attributes that may have been obtained from now-resolved annotations
+ for k, v in new_field_info._attributes_set.items():
+ # If an attribute is already set, it means it was set by assigning to a call to Field (or just a
+ # default value), and that should take the highest priority. So don't overwrite existing attributes.
+ # We skip over "attributes" that are present in the metadata_lookup dict because these won't
+ # actually end up as attributes of the `FieldInfo` instance.
+ if k not in field_info._attributes_set and k not in field_info.metadata_lookup:
+ setattr(field_info, k, v)
+
+ # Finally, ensure the field info also reflects all the `_attributes_set` that are actually metadata.
+ field_info.metadata = [*new_field_info.metadata, *field_info.metadata]
+
+ source_type, annotations = field_info.annotation, field_info.metadata
+
+ def set_discriminator(schema: CoreSchema) -> CoreSchema:
+ schema = self._apply_discriminator_to_union(schema, field_info.discriminator)
+ return schema
+
+ # Convert `@field_validator` decorators to `Before/After/Plain/WrapValidator` instances:
+ validators_from_decorators = []
+ for decorator in filter_field_decorator_info_by_field(decorators.field_validators.values(), name):
+ validators_from_decorators.append(_mode_to_validator[decorator.info.mode]._from_decorator(decorator))
+
+ with self.field_name_stack.push(name):
+ if field_info.discriminator is not None:
+ schema = self._apply_annotations(
+ source_type, annotations + validators_from_decorators, transform_inner_schema=set_discriminator
+ )
+ else:
+ schema = self._apply_annotations(
+ source_type,
+ annotations + validators_from_decorators,
+ )
+
+ # This V1 compatibility shim should eventually be removed
+ # push down any `each_item=True` validators
+ # note that this won't work for any Annotated types that get wrapped by a function validator
+ # but that's okay because that didn't exist in V1
+ this_field_validators = filter_field_decorator_info_by_field(decorators.validators.values(), name)
+ if _validators_require_validate_default(this_field_validators):
+ field_info.validate_default = True
+ each_item_validators = [v for v in this_field_validators if v.info.each_item is True]
+ this_field_validators = [v for v in this_field_validators if v not in each_item_validators]
+ schema = apply_each_item_validators(schema, each_item_validators, name)
+
+ schema = apply_validators(schema, this_field_validators, name)
+
+ # the default validator needs to go outside of any other validators
+ # so that it is the topmost validator for the field validator
+ # which uses it to check if the field has a default value or not
+ if not field_info.is_required():
+ schema = wrap_default(field_info, schema)
+
+ schema = self._apply_field_serializers(
+ schema, filter_field_decorator_info_by_field(decorators.field_serializers.values(), name)
+ )
+ self._apply_field_title_generator_to_field_info(self._config_wrapper, field_info, name)
+
+ pydantic_js_updates, pydantic_js_extra = _extract_json_schema_info_from_field_info(field_info)
+ core_metadata: dict[str, Any] = {}
+ update_core_metadata(
+ core_metadata, pydantic_js_updates=pydantic_js_updates, pydantic_js_extra=pydantic_js_extra
+ )
+
+ alias_generator = self._config_wrapper.alias_generator
+ if alias_generator is not None:
+ self._apply_alias_generator_to_field_info(alias_generator, field_info, name)
+
+ if isinstance(field_info.validation_alias, (AliasChoices, AliasPath)):
+ validation_alias = field_info.validation_alias.convert_to_aliases()
+ else:
+ validation_alias = field_info.validation_alias
+
+ return _common_field(
+ schema,
+ serialization_exclude=True if field_info.exclude else None,
+ validation_alias=validation_alias,
+ serialization_alias=field_info.serialization_alias,
+ frozen=field_info.frozen,
+ metadata=core_metadata,
+ )
+
+ def _union_schema(self, union_type: Any) -> core_schema.CoreSchema:
+ """Generate schema for a Union."""
+ args = self._get_args_resolving_forward_refs(union_type, required=True)
+ choices: list[CoreSchema] = []
+ nullable = False
+ for arg in args:
+ if arg is None or arg is _typing_extra.NoneType:
+ nullable = True
+ else:
+ choices.append(self.generate_schema(arg))
+
+ if len(choices) == 1:
+ s = choices[0]
+ else:
+ choices_with_tags: list[CoreSchema | tuple[CoreSchema, str]] = []
+ for choice in choices:
+ tag = choice.get('metadata', {}).get(_core_utils.TAGGED_UNION_TAG_KEY)
+ if tag is not None:
+ choices_with_tags.append((choice, tag))
+ else:
+ choices_with_tags.append(choice)
+ s = core_schema.union_schema(choices_with_tags)
+
+ if nullable:
+ s = core_schema.nullable_schema(s)
+ return s
+
+ def _type_alias_type_schema(self, obj: TypeAliasType) -> CoreSchema:
+ with self.defs.get_schema_or_ref(obj) as (ref, maybe_schema):
+ if maybe_schema is not None:
+ return maybe_schema
+
+ origin: TypeAliasType = get_origin(obj) or obj
+ typevars_map = get_standard_typevars_map(obj)
+
+ with self._ns_resolver.push(origin):
+ try:
+ annotation = _typing_extra.eval_type(origin.__value__, *self._types_namespace)
+ except NameError as e:
+ raise PydanticUndefinedAnnotation.from_name_error(e) from e
+ annotation = replace_types(annotation, typevars_map)
+ schema = self.generate_schema(annotation)
+ assert schema['type'] != 'definitions'
+ schema['ref'] = ref # type: ignore
+ self.defs.definitions[ref] = schema
+ return core_schema.definition_reference_schema(ref)
+
+ def _literal_schema(self, literal_type: Any) -> CoreSchema:
+ """Generate schema for a Literal."""
+ expected = _typing_extra.literal_values(literal_type)
+ assert expected, f'literal "expected" cannot be empty, obj={literal_type}'
+ schema = core_schema.literal_schema(expected)
+
+ if self._config_wrapper.use_enum_values and any(isinstance(v, Enum) for v in expected):
+ schema = core_schema.no_info_after_validator_function(
+ lambda v: v.value if isinstance(v, Enum) else v, schema
+ )
+
+ return schema
+
+ def _typed_dict_schema(self, typed_dict_cls: Any, origin: Any) -> core_schema.CoreSchema:
+ """Generate schema for a TypedDict.
+
+ It is not possible to track required/optional keys in TypedDict without __required_keys__
+ since TypedDict.__new__ erases the base classes (it replaces them with just `dict`)
+ and thus we can track usage of total=True/False
+ __required_keys__ was added in Python 3.9
+ (https://github.com/miss-islington/cpython/blob/1e9939657dd1f8eb9f596f77c1084d2d351172fc/Doc/library/typing.rst?plain=1#L1546-L1548)
+ however it is buggy
+ (https://github.com/python/typing_extensions/blob/ac52ac5f2cb0e00e7988bae1e2a1b8257ac88d6d/src/typing_extensions.py#L657-L666).
+
+ On 3.11 but < 3.12 TypedDict does not preserve inheritance information.
+
+ Hence to avoid creating validators that do not do what users expect we only
+ support typing.TypedDict on Python >= 3.12 or typing_extension.TypedDict on all versions
+ """
+ FieldInfo = import_cached_field_info()
+
+ with self.model_type_stack.push(typed_dict_cls), self.defs.get_schema_or_ref(typed_dict_cls) as (
+ typed_dict_ref,
+ maybe_schema,
+ ):
+ if maybe_schema is not None:
+ return maybe_schema
+
+ typevars_map = get_standard_typevars_map(typed_dict_cls)
+ if origin is not None:
+ typed_dict_cls = origin
+
+ if not _SUPPORTS_TYPEDDICT and type(typed_dict_cls).__module__ == 'typing':
+ raise PydanticUserError(
+ 'Please use `typing_extensions.TypedDict` instead of `typing.TypedDict` on Python < 3.12.',
+ code='typed-dict-version',
+ )
+
+ try:
+ # if a typed dictionary class doesn't have config, we use the parent's config, hence a default of `None`
+ # see https://github.com/pydantic/pydantic/issues/10917
+ config: ConfigDict | None = get_attribute_from_bases(typed_dict_cls, '__pydantic_config__')
+ except AttributeError:
+ config = None
+
+ with self._config_wrapper_stack.push(config):
+ core_config = self._config_wrapper.core_config(title=typed_dict_cls.__name__)
+
+ required_keys: frozenset[str] = typed_dict_cls.__required_keys__
+
+ fields: dict[str, core_schema.TypedDictField] = {}
+
+ decorators = DecoratorInfos.build(typed_dict_cls)
+
+ if self._config_wrapper.use_attribute_docstrings:
+ field_docstrings = extract_docstrings_from_cls(typed_dict_cls, use_inspect=True)
+ else:
+ field_docstrings = None
+
+ try:
+ annotations = _typing_extra.get_cls_type_hints(typed_dict_cls, ns_resolver=self._ns_resolver)
+ except NameError as e:
+ raise PydanticUndefinedAnnotation.from_name_error(e) from e
+
+ for field_name, annotation in annotations.items():
+ annotation = replace_types(annotation, typevars_map)
+ required = field_name in required_keys
+
+ if _typing_extra.is_required(annotation):
+ required = True
+ annotation = self._get_args_resolving_forward_refs(
+ annotation,
+ required=True,
+ )[0]
+ elif _typing_extra.is_not_required(annotation):
+ required = False
+ annotation = self._get_args_resolving_forward_refs(
+ annotation,
+ required=True,
+ )[0]
+
+ field_info = FieldInfo.from_annotation(annotation)
+ if (
+ field_docstrings is not None
+ and field_info.description is None
+ and field_name in field_docstrings
+ ):
+ field_info.description = field_docstrings[field_name]
+ self._apply_field_title_generator_to_field_info(self._config_wrapper, field_info, field_name)
+ fields[field_name] = self._generate_td_field_schema(
+ field_name, field_info, decorators, required=required
+ )
+
+ td_schema = core_schema.typed_dict_schema(
+ fields,
+ cls=typed_dict_cls,
+ computed_fields=[
+ self._computed_field_schema(d, decorators.field_serializers)
+ for d in decorators.computed_fields.values()
+ ],
+ ref=typed_dict_ref,
+ config=core_config,
+ )
+
+ schema = self._apply_model_serializers(td_schema, decorators.model_serializers.values())
+ schema = apply_model_validators(schema, decorators.model_validators.values(), 'all')
+ self.defs.definitions[typed_dict_ref] = schema
+ return core_schema.definition_reference_schema(typed_dict_ref)
+
+ def _namedtuple_schema(self, namedtuple_cls: Any, origin: Any) -> core_schema.CoreSchema:
+ """Generate schema for a NamedTuple."""
+ with self.model_type_stack.push(namedtuple_cls), self.defs.get_schema_or_ref(namedtuple_cls) as (
+ namedtuple_ref,
+ maybe_schema,
+ ):
+ if maybe_schema is not None:
+ return maybe_schema
+ typevars_map = get_standard_typevars_map(namedtuple_cls)
+ if origin is not None:
+ namedtuple_cls = origin
+
+ try:
+ annotations = _typing_extra.get_cls_type_hints(namedtuple_cls, ns_resolver=self._ns_resolver)
+ except NameError as e:
+ raise PydanticUndefinedAnnotation.from_name_error(e) from e
+ if not annotations:
+ # annotations is empty, happens if namedtuple_cls defined via collections.namedtuple(...)
+ annotations: dict[str, Any] = {k: Any for k in namedtuple_cls._fields}
+
+ if typevars_map:
+ annotations = {
+ field_name: replace_types(annotation, typevars_map)
+ for field_name, annotation in annotations.items()
+ }
+
+ arguments_schema = core_schema.arguments_schema(
+ [
+ self._generate_parameter_schema(
+ field_name,
+ annotation,
+ default=namedtuple_cls._field_defaults.get(field_name, Parameter.empty),
+ )
+ for field_name, annotation in annotations.items()
+ ],
+ metadata={'pydantic_js_prefer_positional_arguments': True},
+ )
+ return core_schema.call_schema(arguments_schema, namedtuple_cls, ref=namedtuple_ref)
+
+ def _generate_parameter_schema(
+ self,
+ name: str,
+ annotation: type[Any],
+ default: Any = Parameter.empty,
+ mode: Literal['positional_only', 'positional_or_keyword', 'keyword_only'] | None = None,
+ ) -> core_schema.ArgumentsParameter:
+ """Prepare a ArgumentsParameter to represent a field in a namedtuple or function signature."""
+ FieldInfo = import_cached_field_info()
+
+ if default is Parameter.empty:
+ field = FieldInfo.from_annotation(annotation)
+ else:
+ field = FieldInfo.from_annotated_attribute(annotation, default)
+ assert field.annotation is not None, 'field.annotation should not be None when generating a schema'
+ with self.field_name_stack.push(name):
+ schema = self._apply_annotations(field.annotation, [field])
+
+ if not field.is_required():
+ schema = wrap_default(field, schema)
+
+ parameter_schema = core_schema.arguments_parameter(name, schema)
+ if mode is not None:
+ parameter_schema['mode'] = mode
+ if field.alias is not None:
+ parameter_schema['alias'] = field.alias
+ else:
+ alias_generator = self._config_wrapper.alias_generator
+ if isinstance(alias_generator, AliasGenerator) and alias_generator.alias is not None:
+ parameter_schema['alias'] = alias_generator.alias(name)
+ elif isinstance(alias_generator, Callable):
+ parameter_schema['alias'] = alias_generator(name)
+ return parameter_schema
+
+ def _tuple_schema(self, tuple_type: Any) -> core_schema.CoreSchema:
+ """Generate schema for a Tuple, e.g. `tuple[int, str]` or `tuple[int, ...]`."""
+ # TODO: do we really need to resolve type vars here?
+ typevars_map = get_standard_typevars_map(tuple_type)
+ params = self._get_args_resolving_forward_refs(tuple_type)
+
+ if typevars_map and params:
+ params = tuple(replace_types(param, typevars_map) for param in params)
+
+ # NOTE: subtle difference: `tuple[()]` gives `params=()`, whereas `typing.Tuple[()]` gives `params=((),)`
+ # This is only true for <3.11, on Python 3.11+ `typing.Tuple[()]` gives `params=()`
+ if not params:
+ if tuple_type in TUPLE_TYPES:
+ return core_schema.tuple_schema([core_schema.any_schema()], variadic_item_index=0)
+ else:
+ # special case for `tuple[()]` which means `tuple[]` - an empty tuple
+ return core_schema.tuple_schema([])
+ elif params[-1] is Ellipsis:
+ if len(params) == 2:
+ return core_schema.tuple_schema([self.generate_schema(params[0])], variadic_item_index=0)
+ else:
+ # TODO: something like https://github.com/pydantic/pydantic/issues/5952
+ raise ValueError('Variable tuples can only have one type')
+ elif len(params) == 1 and params[0] == ():
+ # special case for `Tuple[()]` which means `Tuple[]` - an empty tuple
+ # NOTE: This conditional can be removed when we drop support for Python 3.10.
+ return core_schema.tuple_schema([])
+ else:
+ return core_schema.tuple_schema([self.generate_schema(param) for param in params])
+
+ def _type_schema(self) -> core_schema.CoreSchema:
+ return core_schema.custom_error_schema(
+ core_schema.is_instance_schema(type),
+ custom_error_type='is_type',
+ custom_error_message='Input should be a type',
+ )
+
+ def _zoneinfo_schema(self) -> core_schema.CoreSchema:
+ """Generate schema for a zone_info.ZoneInfo object"""
+ # we're def >=py3.9 if ZoneInfo was included in input
+ if sys.version_info < (3, 9):
+ assert False, 'Unreachable'
+
+ # import in this path is safe
+ from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
+
+ def validate_str_is_valid_iana_tz(value: Any, /) -> ZoneInfo:
+ if isinstance(value, ZoneInfo):
+ return value
+ try:
+ return ZoneInfo(value)
+ except (ZoneInfoNotFoundError, ValueError, TypeError):
+ raise PydanticCustomError('zoneinfo_str', 'invalid timezone: {value}', {'value': value})
+
+ metadata = {'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': 'zoneinfo'}]}
+ return core_schema.no_info_plain_validator_function(
+ validate_str_is_valid_iana_tz,
+ serialization=core_schema.to_string_ser_schema(),
+ metadata=metadata,
+ )
+
+ def _union_is_subclass_schema(self, union_type: Any) -> core_schema.CoreSchema:
+ """Generate schema for `Type[Union[X, ...]]`."""
+ args = self._get_args_resolving_forward_refs(union_type, required=True)
+ return core_schema.union_schema([self.generate_schema(typing.Type[args]) for args in args])
+
+ def _subclass_schema(self, type_: Any) -> core_schema.CoreSchema:
+ """Generate schema for a Type, e.g. `Type[int]`."""
+ type_param = self._get_first_arg_or_any(type_)
+
+ # Assume `type[Annotated[<typ>, ...]]` is equivalent to `type[<typ>]`:
+ type_param = _typing_extra.annotated_type(type_param) or type_param
+
+ if _typing_extra.is_any(type_param):
+ return self._type_schema()
+ elif _typing_extra.is_type_alias_type(type_param):
+ return self.generate_schema(typing.Type[type_param.__value__])
+ elif isinstance(type_param, typing.TypeVar):
+ if type_param.__bound__:
+ if _typing_extra.origin_is_union(get_origin(type_param.__bound__)):
+ return self._union_is_subclass_schema(type_param.__bound__)
+ return core_schema.is_subclass_schema(type_param.__bound__)
+ elif type_param.__constraints__:
+ return core_schema.union_schema(
+ [self.generate_schema(typing.Type[c]) for c in type_param.__constraints__]
+ )
+ else:
+ return self._type_schema()
+ elif _typing_extra.origin_is_union(get_origin(type_param)):
+ return self._union_is_subclass_schema(type_param)
+ else:
+ if _typing_extra.is_self(type_param):
+ type_param = self._resolve_self_type(type_param)
+
+ if not inspect.isclass(type_param):
+ raise TypeError(f'Expected a class, got {type_param!r}')
+ return core_schema.is_subclass_schema(type_param)
+
+ def _sequence_schema(self, items_type: Any) -> core_schema.CoreSchema:
+ """Generate schema for a Sequence, e.g. `Sequence[int]`."""
+ from ._serializers import serialize_sequence_via_list
+
+ item_type_schema = self.generate_schema(items_type)
+ list_schema = core_schema.list_schema(item_type_schema)
+
+ json_schema = smart_deepcopy(list_schema)
+ python_schema = core_schema.is_instance_schema(typing.Sequence, cls_repr='Sequence')
+ if not _typing_extra.is_any(items_type):
+ from ._validators import sequence_validator
+
+ python_schema = core_schema.chain_schema(
+ [python_schema, core_schema.no_info_wrap_validator_function(sequence_validator, list_schema)],
+ )
+
+ serialization = core_schema.wrap_serializer_function_ser_schema(
+ serialize_sequence_via_list, schema=item_type_schema, info_arg=True
+ )
+ return core_schema.json_or_python_schema(
+ json_schema=json_schema, python_schema=python_schema, serialization=serialization
+ )
+
+ def _iterable_schema(self, type_: Any) -> core_schema.GeneratorSchema:
+ """Generate a schema for an `Iterable`."""
+ item_type = self._get_first_arg_or_any(type_)
+
+ return core_schema.generator_schema(self.generate_schema(item_type))
+
+ def _pattern_schema(self, pattern_type: Any) -> core_schema.CoreSchema:
+ from . import _validators
+
+ metadata = {'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': 'regex'}]}
+ ser = core_schema.plain_serializer_function_ser_schema(
+ attrgetter('pattern'), when_used='json', return_schema=core_schema.str_schema()
+ )
+ if pattern_type is typing.Pattern or pattern_type is re.Pattern:
+ # bare type
+ return core_schema.no_info_plain_validator_function(
+ _validators.pattern_either_validator, serialization=ser, metadata=metadata
+ )
+
+ param = self._get_args_resolving_forward_refs(
+ pattern_type,
+ required=True,
+ )[0]
+ if param is str:
+ return core_schema.no_info_plain_validator_function(
+ _validators.pattern_str_validator, serialization=ser, metadata=metadata
+ )
+ elif param is bytes:
+ return core_schema.no_info_plain_validator_function(
+ _validators.pattern_bytes_validator, serialization=ser, metadata=metadata
+ )
+ else:
+ raise PydanticSchemaGenerationError(f'Unable to generate pydantic-core schema for {pattern_type!r}.')
+
+ def _hashable_schema(self) -> core_schema.CoreSchema:
+ return core_schema.custom_error_schema(
+ schema=core_schema.json_or_python_schema(
+ json_schema=core_schema.chain_schema(
+ [core_schema.any_schema(), core_schema.is_instance_schema(collections.abc.Hashable)]
+ ),
+ python_schema=core_schema.is_instance_schema(collections.abc.Hashable),
+ ),
+ custom_error_type='is_hashable',
+ custom_error_message='Input should be hashable',
+ )
+
+ def _dataclass_schema(
+ self, dataclass: type[StandardDataclass], origin: type[StandardDataclass] | None
+ ) -> core_schema.CoreSchema:
+ """Generate schema for a dataclass."""
+ with self.model_type_stack.push(dataclass), self.defs.get_schema_or_ref(dataclass) as (
+ dataclass_ref,
+ maybe_schema,
+ ):
+ if maybe_schema is not None:
+ return maybe_schema
+
+ typevars_map = get_standard_typevars_map(dataclass)
+ if origin is not None:
+ dataclass = origin
+
+ # if (plain) dataclass doesn't have config, we use the parent's config, hence a default of `None`
+ # (Pydantic dataclasses have an empty dict config by default).
+ # see https://github.com/pydantic/pydantic/issues/10917
+ config = getattr(dataclass, '__pydantic_config__', None)
+
+ from ..dataclasses import is_pydantic_dataclass
+
+ with self._ns_resolver.push(dataclass), self._config_wrapper_stack.push(config):
+ if is_pydantic_dataclass(dataclass):
+ fields = deepcopy(dataclass.__pydantic_fields__)
+ if typevars_map:
+ for field in fields.values():
+ field.apply_typevars_map(typevars_map, *self._types_namespace)
+ else:
+ fields = collect_dataclass_fields(
+ dataclass,
+ typevars_map=typevars_map,
+ )
+
+ if self._config_wrapper.extra == 'allow':
+ # disallow combination of init=False on a dataclass field and extra='allow' on a dataclass
+ for field_name, field in fields.items():
+ if field.init is False:
+ raise PydanticUserError(
+ f'Field {field_name} has `init=False` and dataclass has config setting `extra="allow"`. '
+ f'This combination is not allowed.',
+ code='dataclass-init-false-extra-allow',
+ )
+
+ decorators = dataclass.__dict__.get('__pydantic_decorators__') or DecoratorInfos.build(dataclass)
+ # Move kw_only=False args to the start of the list, as this is how vanilla dataclasses work.
+ # Note that when kw_only is missing or None, it is treated as equivalent to kw_only=True
+ args = sorted(
+ (self._generate_dc_field_schema(k, v, decorators) for k, v in fields.items()),
+ key=lambda a: a.get('kw_only') is not False,
+ )
+ has_post_init = hasattr(dataclass, '__post_init__')
+ has_slots = hasattr(dataclass, '__slots__')
+
+ args_schema = core_schema.dataclass_args_schema(
+ dataclass.__name__,
+ args,
+ computed_fields=[
+ self._computed_field_schema(d, decorators.field_serializers)
+ for d in decorators.computed_fields.values()
+ ],
+ collect_init_only=has_post_init,
+ )
+
+ inner_schema = apply_validators(args_schema, decorators.root_validators.values(), None)
+
+ model_validators = decorators.model_validators.values()
+ inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')
+
+ core_config = self._config_wrapper.core_config(title=dataclass.__name__)
+
+ dc_schema = core_schema.dataclass_schema(
+ dataclass,
+ inner_schema,
+ generic_origin=origin,
+ post_init=has_post_init,
+ ref=dataclass_ref,
+ fields=[field.name for field in dataclasses.fields(dataclass)],
+ slots=has_slots,
+ config=core_config,
+ # we don't use a custom __setattr__ for dataclasses, so we must
+ # pass along the frozen config setting to the pydantic-core schema
+ frozen=self._config_wrapper_stack.tail.frozen,
+ )
+ schema = self._apply_model_serializers(dc_schema, decorators.model_serializers.values())
+ schema = apply_model_validators(schema, model_validators, 'outer')
+ self.defs.definitions[dataclass_ref] = schema
+ return core_schema.definition_reference_schema(dataclass_ref)
+
+ def _call_schema(self, function: ValidateCallSupportedTypes) -> core_schema.CallSchema:
+ """Generate schema for a Callable.
+
+ TODO support functional validators once we support them in Config
+ """
+ sig = signature(function)
+ globalns, localns = self._types_namespace
+ type_hints = _typing_extra.get_function_type_hints(function, globalns=globalns, localns=localns)
+
+ mode_lookup: dict[_ParameterKind, Literal['positional_only', 'positional_or_keyword', 'keyword_only']] = {
+ Parameter.POSITIONAL_ONLY: 'positional_only',
+ Parameter.POSITIONAL_OR_KEYWORD: 'positional_or_keyword',
+ Parameter.KEYWORD_ONLY: 'keyword_only',
+ }
+
+ arguments_list: list[core_schema.ArgumentsParameter] = []
+ var_args_schema: core_schema.CoreSchema | None = None
+ var_kwargs_schema: core_schema.CoreSchema | None = None
+ var_kwargs_mode: core_schema.VarKwargsMode | None = None
+
+ for name, p in sig.parameters.items():
+ if p.annotation is sig.empty:
+ annotation = typing.cast(Any, Any)
+ else:
+ annotation = type_hints[name]
+
+ parameter_mode = mode_lookup.get(p.kind)
+ if parameter_mode is not None:
+ arg_schema = self._generate_parameter_schema(name, annotation, p.default, parameter_mode)
+ arguments_list.append(arg_schema)
+ elif p.kind == Parameter.VAR_POSITIONAL:
+ var_args_schema = self.generate_schema(annotation)
+ else:
+ assert p.kind == Parameter.VAR_KEYWORD, p.kind
+
+ unpack_type = _typing_extra.unpack_type(annotation)
+ if unpack_type is not None:
+ if not is_typeddict(unpack_type):
+ raise PydanticUserError(
+ f'Expected a `TypedDict` class, got {unpack_type.__name__!r}', code='unpack-typed-dict'
+ )
+ non_pos_only_param_names = {
+ name for name, p in sig.parameters.items() if p.kind != Parameter.POSITIONAL_ONLY
+ }
+ overlapping_params = non_pos_only_param_names.intersection(unpack_type.__annotations__)
+ if overlapping_params:
+ raise PydanticUserError(
+ f'Typed dictionary {unpack_type.__name__!r} overlaps with parameter'
+ f"{'s' if len(overlapping_params) >= 2 else ''} "
+ f"{', '.join(repr(p) for p in sorted(overlapping_params))}",
+ code='overlapping-unpack-typed-dict',
+ )
+
+ var_kwargs_mode = 'unpacked-typed-dict'
+ var_kwargs_schema = self._typed_dict_schema(unpack_type, None)
+ else:
+ var_kwargs_mode = 'uniform'
+ var_kwargs_schema = self.generate_schema(annotation)
+
+ return_schema: core_schema.CoreSchema | None = None
+ config_wrapper = self._config_wrapper
+ if config_wrapper.validate_return:
+ return_hint = sig.return_annotation
+ if return_hint is not sig.empty:
+ return_schema = self.generate_schema(return_hint)
+
+ return core_schema.call_schema(
+ core_schema.arguments_schema(
+ arguments_list,
+ var_args_schema=var_args_schema,
+ var_kwargs_mode=var_kwargs_mode,
+ var_kwargs_schema=var_kwargs_schema,
+ populate_by_name=config_wrapper.populate_by_name,
+ ),
+ function,
+ return_schema=return_schema,
+ )
+
+ def _unsubstituted_typevar_schema(self, typevar: typing.TypeVar) -> core_schema.CoreSchema:
+ assert isinstance(typevar, typing.TypeVar)
+
+ bound = typevar.__bound__
+ constraints = typevar.__constraints__
+
+ try:
+ typevar_has_default = typevar.has_default() # type: ignore
+ except AttributeError:
+ # could still have a default if it's an old version of typing_extensions.TypeVar
+ typevar_has_default = getattr(typevar, '__default__', None) is not None
+
+ if (bound is not None) + (len(constraints) != 0) + typevar_has_default > 1:
+ raise NotImplementedError(
+ 'Pydantic does not support mixing more than one of TypeVar bounds, constraints and defaults'
+ )
+
+ if typevar_has_default:
+ return self.generate_schema(typevar.__default__) # type: ignore
+ elif constraints:
+ return self._union_schema(typing.Union[constraints]) # type: ignore
+ elif bound:
+ schema = self.generate_schema(bound)
+ schema['serialization'] = core_schema.wrap_serializer_function_ser_schema(
+ lambda x, h: h(x), schema=core_schema.any_schema()
+ )
+ return schema
+ else:
+ return core_schema.any_schema()
+
+ def _computed_field_schema(
+ self,
+ d: Decorator[ComputedFieldInfo],
+ field_serializers: dict[str, Decorator[FieldSerializerDecoratorInfo]],
+ ) -> core_schema.ComputedField:
+ try:
+ # Do not pass in globals as the function could be defined in a different module.
+ # Instead, let `get_function_return_type` infer the globals to use, but still pass
+ # in locals that may contain a parent/rebuild namespace:
+ return_type = _decorators.get_function_return_type(
+ d.func, d.info.return_type, localns=self._types_namespace.locals
+ )
+ except NameError as e:
+ raise PydanticUndefinedAnnotation.from_name_error(e) from e
+ if return_type is PydanticUndefined:
+ raise PydanticUserError(
+ 'Computed field is missing return type annotation or specifying `return_type`'
+ ' to the `@computed_field` decorator (e.g. `@computed_field(return_type=int|str)`)',
+ code='model-field-missing-annotation',
+ )
+
+ return_type = replace_types(return_type, self._typevars_map)
+ # Create a new ComputedFieldInfo so that different type parametrizations of the same
+ # generic model's computed field can have different return types.
+ d.info = dataclasses.replace(d.info, return_type=return_type)
+ return_type_schema = self.generate_schema(return_type)
+ # Apply serializers to computed field if there exist
+ return_type_schema = self._apply_field_serializers(
+ return_type_schema,
+ filter_field_decorator_info_by_field(field_serializers.values(), d.cls_var_name),
+ )
+
+ alias_generator = self._config_wrapper.alias_generator
+ if alias_generator is not None:
+ self._apply_alias_generator_to_computed_field_info(
+ alias_generator=alias_generator, computed_field_info=d.info, computed_field_name=d.cls_var_name
+ )
+ self._apply_field_title_generator_to_field_info(self._config_wrapper, d.info, d.cls_var_name)
+
+ pydantic_js_updates, pydantic_js_extra = _extract_json_schema_info_from_field_info(d.info)
+ core_metadata: dict[str, Any] = {}
+ update_core_metadata(
+ core_metadata,
+ pydantic_js_updates={'readOnly': True, **(pydantic_js_updates if pydantic_js_updates else {})},
+ pydantic_js_extra=pydantic_js_extra,
+ )
+ return core_schema.computed_field(
+ d.cls_var_name, return_schema=return_type_schema, alias=d.info.alias, metadata=core_metadata
+ )
+
+ def _annotated_schema(self, annotated_type: Any) -> core_schema.CoreSchema:
+ """Generate schema for an Annotated type, e.g. `Annotated[int, Field(...)]` or `Annotated[int, Gt(0)]`."""
+ FieldInfo = import_cached_field_info()
+
+ source_type, *annotations = self._get_args_resolving_forward_refs(
+ annotated_type,
+ required=True,
+ )
+ schema = self._apply_annotations(source_type, annotations)
+ # put the default validator last so that TypeAdapter.get_default_value() works
+ # even if there are function validators involved
+ for annotation in annotations:
+ if isinstance(annotation, FieldInfo):
+ schema = wrap_default(annotation, schema)
+ return schema
+
+ def _get_prepare_pydantic_annotations_for_known_type(
+ self, obj: Any, annotations: tuple[Any, ...]
+ ) -> tuple[Any, list[Any]] | None:
+ from ._std_types_schema import (
+ deque_schema_prepare_pydantic_annotations,
+ mapping_like_prepare_pydantic_annotations,
+ path_schema_prepare_pydantic_annotations,
+ )
+
+ # Check for hashability
+ try:
+ hash(obj)
+ except TypeError:
+ # obj is definitely not a known type if this fails
+ return None
+
+ # TODO: I'd rather we didn't handle the generic nature in the annotations prep, but the same way we do other
+ # generic types like list[str] via _match_generic_type, but I'm not sure if we can do that because this is
+ # not always called from match_type, but sometimes from _apply_annotations
+ obj_origin = get_origin(obj) or obj
+
+ if obj_origin in PATH_TYPES:
+ return path_schema_prepare_pydantic_annotations(obj, annotations)
+ elif obj_origin in DEQUE_TYPES:
+ return deque_schema_prepare_pydantic_annotations(obj, annotations)
+ elif obj_origin in MAPPING_TYPES:
+ return mapping_like_prepare_pydantic_annotations(obj, annotations)
+ else:
+ return None
+
+ def _apply_annotations(
+ self,
+ source_type: Any,
+ annotations: list[Any],
+ transform_inner_schema: Callable[[CoreSchema], CoreSchema] = lambda x: x,
+ ) -> CoreSchema:
+ """Apply arguments from `Annotated` or from `FieldInfo` to a schema.
+
+ This gets called by `GenerateSchema._annotated_schema` but differs from it in that it does
+ not expect `source_type` to be an `Annotated` object, it expects it to be the first argument of that
+ (in other words, `GenerateSchema._annotated_schema` just unpacks `Annotated`, this process it).
+ """
+ annotations = list(_known_annotated_metadata.expand_grouped_metadata(annotations))
+ res = self._get_prepare_pydantic_annotations_for_known_type(source_type, tuple(annotations))
+ if res is not None:
+ source_type, annotations = res
+
+ pydantic_js_annotation_functions: list[GetJsonSchemaFunction] = []
+
+ def inner_handler(obj: Any) -> CoreSchema:
+ from_property = self._generate_schema_from_property(obj, source_type)
+ if from_property is None:
+ schema = self._generate_schema_inner(obj)
+ else:
+ schema = from_property
+ metadata_js_function = _extract_get_pydantic_json_schema(obj, schema)
+ if metadata_js_function is not None:
+ metadata_schema = resolve_original_schema(schema, self.defs.definitions)
+ if metadata_schema is not None:
+ self._add_js_function(metadata_schema, metadata_js_function)
+ return transform_inner_schema(schema)
+
+ get_inner_schema = CallbackGetCoreSchemaHandler(inner_handler, self)
+
+ for annotation in annotations:
+ if annotation is None:
+ continue
+ get_inner_schema = self._get_wrapped_inner_schema(
+ get_inner_schema, annotation, pydantic_js_annotation_functions
+ )
+
+ schema = get_inner_schema(source_type)
+ if pydantic_js_annotation_functions:
+ core_metadata = schema.setdefault('metadata', {})
+ update_core_metadata(core_metadata, pydantic_js_annotation_functions=pydantic_js_annotation_functions)
+ return _add_custom_serialization_from_json_encoders(self._config_wrapper.json_encoders, source_type, schema)
+
+ def _apply_single_annotation(self, schema: core_schema.CoreSchema, metadata: Any) -> core_schema.CoreSchema:
+ FieldInfo = import_cached_field_info()
+
+ if isinstance(metadata, FieldInfo):
+ for field_metadata in metadata.metadata:
+ schema = self._apply_single_annotation(schema, field_metadata)
+
+ if metadata.discriminator is not None:
+ schema = self._apply_discriminator_to_union(schema, metadata.discriminator)
+ return schema
+
+ if schema['type'] == 'nullable':
+ # for nullable schemas, metadata is automatically applied to the inner schema
+ inner = schema.get('schema', core_schema.any_schema())
+ inner = self._apply_single_annotation(inner, metadata)
+ if inner:
+ schema['schema'] = inner
+ return schema
+
+ original_schema = schema
+ ref = schema.get('ref', None)
+ if ref is not None:
+ schema = schema.copy()
+ new_ref = ref + f'_{repr(metadata)}'
+ if new_ref in self.defs.definitions:
+ return self.defs.definitions[new_ref]
+ schema['ref'] = new_ref # type: ignore
+ elif schema['type'] == 'definition-ref':
+ ref = schema['schema_ref']
+ if ref in self.defs.definitions:
+ schema = self.defs.definitions[ref].copy()
+ new_ref = ref + f'_{repr(metadata)}'
+ if new_ref in self.defs.definitions:
+ return self.defs.definitions[new_ref]
+ schema['ref'] = new_ref # type: ignore
+
+ maybe_updated_schema = _known_annotated_metadata.apply_known_metadata(metadata, schema.copy())
+
+ if maybe_updated_schema is not None:
+ return maybe_updated_schema
+ return original_schema
+
+ def _apply_single_annotation_json_schema(
+ self, schema: core_schema.CoreSchema, metadata: Any
+ ) -> core_schema.CoreSchema:
+ FieldInfo = import_cached_field_info()
+
+ if isinstance(metadata, FieldInfo):
+ for field_metadata in metadata.metadata:
+ schema = self._apply_single_annotation_json_schema(schema, field_metadata)
+
+ pydantic_js_updates, pydantic_js_extra = _extract_json_schema_info_from_field_info(metadata)
+ core_metadata = schema.setdefault('metadata', {})
+ update_core_metadata(
+ core_metadata, pydantic_js_updates=pydantic_js_updates, pydantic_js_extra=pydantic_js_extra
+ )
+ return schema
+
+ def _get_wrapped_inner_schema(
+ self,
+ get_inner_schema: GetCoreSchemaHandler,
+ annotation: Any,
+ pydantic_js_annotation_functions: list[GetJsonSchemaFunction],
+ ) -> CallbackGetCoreSchemaHandler:
+ metadata_get_schema: GetCoreSchemaFunction = getattr(annotation, '__get_pydantic_core_schema__', None) or (
+ lambda source, handler: handler(source)
+ )
+
+ def new_handler(source: Any) -> core_schema.CoreSchema:
+ schema = metadata_get_schema(source, get_inner_schema)
+ schema = self._apply_single_annotation(schema, annotation)
+ schema = self._apply_single_annotation_json_schema(schema, annotation)
+
+ metadata_js_function = _extract_get_pydantic_json_schema(annotation, schema)
+ if metadata_js_function is not None:
+ pydantic_js_annotation_functions.append(metadata_js_function)
+ return schema
+
+ return CallbackGetCoreSchemaHandler(new_handler, self)
+
+ def _apply_field_serializers(
+ self,
+ schema: core_schema.CoreSchema,
+ serializers: list[Decorator[FieldSerializerDecoratorInfo]],
+ ) -> core_schema.CoreSchema:
+ """Apply field serializers to a schema."""
+ if serializers:
+ schema = copy(schema)
+ if schema['type'] == 'definitions':
+ inner_schema = schema['schema']
+ schema['schema'] = self._apply_field_serializers(inner_schema, serializers)
+ return schema
+ else:
+ ref = typing.cast('str|None', schema.get('ref', None))
+ if ref is not None:
+ self.defs.definitions[ref] = schema
+ schema = core_schema.definition_reference_schema(ref)
+
+ # use the last serializer to make it easy to override a serializer set on a parent model
+ serializer = serializers[-1]
+ is_field_serializer, info_arg = inspect_field_serializer(serializer.func, serializer.info.mode)
+
+ try:
+ # Do not pass in globals as the function could be defined in a different module.
+ # Instead, let `get_function_return_type` infer the globals to use, but still pass
+ # in locals that may contain a parent/rebuild namespace:
+ return_type = _decorators.get_function_return_type(
+ serializer.func, serializer.info.return_type, localns=self._types_namespace.locals
+ )
+ except NameError as e:
+ raise PydanticUndefinedAnnotation.from_name_error(e) from e
+
+ if return_type is PydanticUndefined:
+ return_schema = None
+ else:
+ return_schema = self.generate_schema(return_type)
+
+ if serializer.info.mode == 'wrap':
+ schema['serialization'] = core_schema.wrap_serializer_function_ser_schema(
+ serializer.func,
+ is_field_serializer=is_field_serializer,
+ info_arg=info_arg,
+ return_schema=return_schema,
+ when_used=serializer.info.when_used,
+ )
+ else:
+ assert serializer.info.mode == 'plain'
+ schema['serialization'] = core_schema.plain_serializer_function_ser_schema(
+ serializer.func,
+ is_field_serializer=is_field_serializer,
+ info_arg=info_arg,
+ return_schema=return_schema,
+ when_used=serializer.info.when_used,
+ )
+ return schema
+
+ def _apply_model_serializers(
+ self, schema: core_schema.CoreSchema, serializers: Iterable[Decorator[ModelSerializerDecoratorInfo]]
+ ) -> core_schema.CoreSchema:
+ """Apply model serializers to a schema."""
+ ref: str | None = schema.pop('ref', None) # type: ignore
+ if serializers:
+ serializer = list(serializers)[-1]
+ info_arg = inspect_model_serializer(serializer.func, serializer.info.mode)
+
+ try:
+ # Do not pass in globals as the function could be defined in a different module.
+ # Instead, let `get_function_return_type` infer the globals to use, but still pass
+ # in locals that may contain a parent/rebuild namespace:
+ return_type = _decorators.get_function_return_type(
+ serializer.func, serializer.info.return_type, localns=self._types_namespace.locals
+ )
+ except NameError as e:
+ raise PydanticUndefinedAnnotation.from_name_error(e) from e
+ if return_type is PydanticUndefined:
+ return_schema = None
+ else:
+ return_schema = self.generate_schema(return_type)
+
+ if serializer.info.mode == 'wrap':
+ ser_schema: core_schema.SerSchema = core_schema.wrap_serializer_function_ser_schema(
+ serializer.func,
+ info_arg=info_arg,
+ return_schema=return_schema,
+ when_used=serializer.info.when_used,
+ )
+ else:
+ # plain
+ ser_schema = core_schema.plain_serializer_function_ser_schema(
+ serializer.func,
+ info_arg=info_arg,
+ return_schema=return_schema,
+ when_used=serializer.info.when_used,
+ )
+ schema['serialization'] = ser_schema
+ if ref:
+ schema['ref'] = ref # type: ignore
+ return schema
+
+
+_VALIDATOR_F_MATCH: Mapping[
+ tuple[FieldValidatorModes, Literal['no-info', 'with-info']],
+ Callable[[Callable[..., Any], core_schema.CoreSchema, str | None], core_schema.CoreSchema],
+] = {
+ ('before', 'no-info'): lambda f, schema, _: core_schema.no_info_before_validator_function(f, schema),
+ ('after', 'no-info'): lambda f, schema, _: core_schema.no_info_after_validator_function(f, schema),
+ ('plain', 'no-info'): lambda f, _1, _2: core_schema.no_info_plain_validator_function(f),
+ ('wrap', 'no-info'): lambda f, schema, _: core_schema.no_info_wrap_validator_function(f, schema),
+ ('before', 'with-info'): lambda f, schema, field_name: core_schema.with_info_before_validator_function(
+ f, schema, field_name=field_name
+ ),
+ ('after', 'with-info'): lambda f, schema, field_name: core_schema.with_info_after_validator_function(
+ f, schema, field_name=field_name
+ ),
+ ('plain', 'with-info'): lambda f, _, field_name: core_schema.with_info_plain_validator_function(
+ f, field_name=field_name
+ ),
+ ('wrap', 'with-info'): lambda f, schema, field_name: core_schema.with_info_wrap_validator_function(
+ f, schema, field_name=field_name
+ ),
+}
+
+
+# TODO V3: this function is only used for deprecated decorators. It should
+# be removed once we drop support for those.
+def apply_validators(
+ schema: core_schema.CoreSchema,
+ validators: Iterable[Decorator[RootValidatorDecoratorInfo]]
+ | Iterable[Decorator[ValidatorDecoratorInfo]]
+ | Iterable[Decorator[FieldValidatorDecoratorInfo]],
+ field_name: str | None,
+) -> core_schema.CoreSchema:
+ """Apply validators to a schema.
+
+ Args:
+ schema: The schema to apply validators on.
+ validators: An iterable of validators.
+ field_name: The name of the field if validators are being applied to a model field.
+
+ Returns:
+ The updated schema.
+ """
+ for validator in validators:
+ info_arg = inspect_validator(validator.func, validator.info.mode)
+ val_type = 'with-info' if info_arg else 'no-info'
+
+ schema = _VALIDATOR_F_MATCH[(validator.info.mode, val_type)](validator.func, schema, field_name)
+ return schema
+
+
+def _validators_require_validate_default(validators: Iterable[Decorator[ValidatorDecoratorInfo]]) -> bool:
+ """In v1, if any of the validators for a field had `always=True`, the default value would be validated.
+
+ This serves as an auxiliary function for re-implementing that logic, by looping over a provided
+ collection of (v1-style) ValidatorDecoratorInfo's and checking if any of them have `always=True`.
+
+ We should be able to drop this function and the associated logic calling it once we drop support
+ for v1-style validator decorators. (Or we can extend it and keep it if we add something equivalent
+ to the v1-validator `always` kwarg to `field_validator`.)
+ """
+ for validator in validators:
+ if validator.info.always:
+ return True
+ return False
+
+
+def apply_model_validators(
+ schema: core_schema.CoreSchema,
+ validators: Iterable[Decorator[ModelValidatorDecoratorInfo]],
+ mode: Literal['inner', 'outer', 'all'],
+) -> core_schema.CoreSchema:
+ """Apply model validators to a schema.
+
+ If mode == 'inner', only "before" validators are applied
+ If mode == 'outer', validators other than "before" are applied
+ If mode == 'all', all validators are applied
+
+ Args:
+ schema: The schema to apply validators on.
+ validators: An iterable of validators.
+ mode: The validator mode.
+
+ Returns:
+ The updated schema.
+ """
+ ref: str | None = schema.pop('ref', None) # type: ignore
+ for validator in validators:
+ if mode == 'inner' and validator.info.mode != 'before':
+ continue
+ if mode == 'outer' and validator.info.mode == 'before':
+ continue
+ info_arg = inspect_validator(validator.func, validator.info.mode)
+ if validator.info.mode == 'wrap':
+ if info_arg:
+ schema = core_schema.with_info_wrap_validator_function(function=validator.func, schema=schema)
+ else:
+ schema = core_schema.no_info_wrap_validator_function(function=validator.func, schema=schema)
+ elif validator.info.mode == 'before':
+ if info_arg:
+ schema = core_schema.with_info_before_validator_function(function=validator.func, schema=schema)
+ else:
+ schema = core_schema.no_info_before_validator_function(function=validator.func, schema=schema)
+ else:
+ assert validator.info.mode == 'after'
+ if info_arg:
+ schema = core_schema.with_info_after_validator_function(function=validator.func, schema=schema)
+ else:
+ schema = core_schema.no_info_after_validator_function(function=validator.func, schema=schema)
+ if ref:
+ schema['ref'] = ref # type: ignore
+ return schema
+
+
+def wrap_default(field_info: FieldInfo, schema: core_schema.CoreSchema) -> core_schema.CoreSchema:
+ """Wrap schema with default schema if default value or `default_factory` are available.
+
+ Args:
+ field_info: The field info object.
+ schema: The schema to apply default on.
+
+ Returns:
+ Updated schema by default value or `default_factory`.
+ """
+ if field_info.default_factory:
+ return core_schema.with_default_schema(
+ schema,
+ default_factory=field_info.default_factory,
+ default_factory_takes_data=takes_validated_data_argument(field_info.default_factory),
+ validate_default=field_info.validate_default,
+ )
+ elif field_info.default is not PydanticUndefined:
+ return core_schema.with_default_schema(
+ schema, default=field_info.default, validate_default=field_info.validate_default
+ )
+ else:
+ return schema
+
+
+def _extract_get_pydantic_json_schema(tp: Any, schema: CoreSchema) -> GetJsonSchemaFunction | None:
+ """Extract `__get_pydantic_json_schema__` from a type, handling the deprecated `__modify_schema__`."""
+ js_modify_function = getattr(tp, '__get_pydantic_json_schema__', None)
+
+ if hasattr(tp, '__modify_schema__'):
+ BaseModel = import_cached_base_model()
+
+ has_custom_v2_modify_js_func = (
+ js_modify_function is not None
+ and BaseModel.__get_pydantic_json_schema__.__func__ # type: ignore
+ not in (js_modify_function, getattr(js_modify_function, '__func__', None))
+ )
+
+ if not has_custom_v2_modify_js_func:
+ cls_name = getattr(tp, '__name__', None)
+ raise PydanticUserError(
+ f'The `__modify_schema__` method is not supported in Pydantic v2. '
+ f'Use `__get_pydantic_json_schema__` instead{f" in class `{cls_name}`" if cls_name else ""}.',
+ code='custom-json-schema',
+ )
+
+ # handle GenericAlias' but ignore Annotated which "lies" about its origin (in this case it would be `int`)
+ if hasattr(tp, '__origin__') and not _typing_extra.is_annotated(tp):
+ return _extract_get_pydantic_json_schema(tp.__origin__, schema)
+
+ if js_modify_function is None:
+ return None
+
+ return js_modify_function
+
+
+class _CommonField(TypedDict):
+ schema: core_schema.CoreSchema
+ validation_alias: str | list[str | int] | list[list[str | int]] | None
+ serialization_alias: str | None
+ serialization_exclude: bool | None
+ frozen: bool | None
+ metadata: dict[str, Any]
+
+
+def _common_field(
+ schema: core_schema.CoreSchema,
+ *,
+ validation_alias: str | list[str | int] | list[list[str | int]] | None = None,
+ serialization_alias: str | None = None,
+ serialization_exclude: bool | None = None,
+ frozen: bool | None = None,
+ metadata: Any = None,
+) -> _CommonField:
+ return {
+ 'schema': schema,
+ 'validation_alias': validation_alias,
+ 'serialization_alias': serialization_alias,
+ 'serialization_exclude': serialization_exclude,
+ 'frozen': frozen,
+ 'metadata': metadata,
+ }
+
+
+class _Definitions:
+ """Keeps track of references and definitions."""
+
+ def __init__(self) -> None:
+ self.seen: set[str] = set()
+ self.definitions: dict[str, core_schema.CoreSchema] = {}
+
+ @contextmanager
+ def get_schema_or_ref(self, tp: Any) -> Iterator[tuple[str, None] | tuple[str, CoreSchema]]:
+ """Get a definition for `tp` if one exists.
+
+ If a definition exists, a tuple of `(ref_string, CoreSchema)` is returned.
+ If no definition exists yet, a tuple of `(ref_string, None)` is returned.
+
+ Note that the returned `CoreSchema` will always be a `DefinitionReferenceSchema`,
+ not the actual definition itself.
+
+ This should be called for any type that can be identified by reference.
+ This includes any recursive types.
+
+ At present the following types can be named/recursive:
+
+ - BaseModel
+ - Dataclasses
+ - TypedDict
+ - TypeAliasType
+ """
+ ref = get_type_ref(tp)
+ # return the reference if we're either (1) in a cycle or (2) it was already defined
+ if ref in self.seen or ref in self.definitions:
+ yield (ref, core_schema.definition_reference_schema(ref))
+ else:
+ self.seen.add(ref)
+ try:
+ yield (ref, None)
+ finally:
+ self.seen.discard(ref)
+
+
+def resolve_original_schema(schema: CoreSchema, definitions: dict[str, CoreSchema]) -> CoreSchema | None:
+ if schema['type'] == 'definition-ref':
+ return definitions.get(schema['schema_ref'], None)
+ elif schema['type'] == 'definitions':
+ return schema['schema']
+ else:
+ return schema
+
+
+class _FieldNameStack:
+ __slots__ = ('_stack',)
+
+ def __init__(self) -> None:
+ self._stack: list[str] = []
+
+ @contextmanager
+ def push(self, field_name: str) -> Iterator[None]:
+ self._stack.append(field_name)
+ yield
+ self._stack.pop()
+
+ def get(self) -> str | None:
+ if self._stack:
+ return self._stack[-1]
+ else:
+ return None
+
+
+class _ModelTypeStack:
+ __slots__ = ('_stack',)
+
+ def __init__(self) -> None:
+ self._stack: list[type] = []
+
+ @contextmanager
+ def push(self, type_obj: type) -> Iterator[None]:
+ self._stack.append(type_obj)
+ yield
+ self._stack.pop()
+
+ def get(self) -> type | None:
+ if self._stack:
+ return self._stack[-1]
+ else:
+ return None