aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/pydantic/networks.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/pydantic/networks.py')
-rw-r--r--.venv/lib/python3.12/site-packages/pydantic/networks.py1291
1 files changed, 1291 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/pydantic/networks.py b/.venv/lib/python3.12/site-packages/pydantic/networks.py
new file mode 100644
index 00000000..3d2b842d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/pydantic/networks.py
@@ -0,0 +1,1291 @@
+"""The networks module contains types for common network-related fields."""
+
+from __future__ import annotations as _annotations
+
+import dataclasses as _dataclasses
+import re
+from dataclasses import fields
+from functools import lru_cache
+from importlib.metadata import version
+from ipaddress import IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network
+from typing import TYPE_CHECKING, Any, ClassVar
+
+from pydantic_core import (
+ MultiHostHost,
+ PydanticCustomError,
+ PydanticSerializationUnexpectedValue,
+ SchemaSerializer,
+ core_schema,
+)
+from pydantic_core import MultiHostUrl as _CoreMultiHostUrl
+from pydantic_core import Url as _CoreUrl
+from typing_extensions import Annotated, Self, TypeAlias
+
+from pydantic.errors import PydanticUserError
+
+from ._internal import _repr, _schema_generation_shared
+from ._migration import getattr_migration
+from .annotated_handlers import GetCoreSchemaHandler
+from .json_schema import JsonSchemaValue
+from .type_adapter import TypeAdapter
+
+if TYPE_CHECKING:
+ import email_validator
+
+ NetworkType: TypeAlias = 'str | bytes | int | tuple[str | bytes | int, str | int]'
+
+else:
+ email_validator = None
+
+
+__all__ = [
+ 'AnyUrl',
+ 'AnyHttpUrl',
+ 'FileUrl',
+ 'FtpUrl',
+ 'HttpUrl',
+ 'WebsocketUrl',
+ 'AnyWebsocketUrl',
+ 'UrlConstraints',
+ 'EmailStr',
+ 'NameEmail',
+ 'IPvAnyAddress',
+ 'IPvAnyInterface',
+ 'IPvAnyNetwork',
+ 'PostgresDsn',
+ 'CockroachDsn',
+ 'AmqpDsn',
+ 'RedisDsn',
+ 'MongoDsn',
+ 'KafkaDsn',
+ 'NatsDsn',
+ 'validate_email',
+ 'MySQLDsn',
+ 'MariaDBDsn',
+ 'ClickHouseDsn',
+ 'SnowflakeDsn',
+]
+
+
+@_dataclasses.dataclass
+class UrlConstraints:
+ """Url constraints.
+
+ Attributes:
+ max_length: The maximum length of the url. Defaults to `None`.
+ allowed_schemes: The allowed schemes. Defaults to `None`.
+ host_required: Whether the host is required. Defaults to `None`.
+ default_host: The default host. Defaults to `None`.
+ default_port: The default port. Defaults to `None`.
+ default_path: The default path. Defaults to `None`.
+ """
+
+ max_length: int | None = None
+ allowed_schemes: list[str] | None = None
+ host_required: bool | None = None
+ default_host: str | None = None
+ default_port: int | None = None
+ default_path: str | None = None
+
+ def __hash__(self) -> int:
+ return hash(
+ (
+ self.max_length,
+ tuple(self.allowed_schemes) if self.allowed_schemes is not None else None,
+ self.host_required,
+ self.default_host,
+ self.default_port,
+ self.default_path,
+ )
+ )
+
+ @property
+ def defined_constraints(self) -> dict[str, Any]:
+ """Fetch a key / value mapping of constraints to values that are not None. Used for core schema updates."""
+ return {field.name: value for field in fields(self) if (value := getattr(self, field.name)) is not None}
+
+ def __get_pydantic_core_schema__(self, source: Any, handler: GetCoreSchemaHandler) -> core_schema.CoreSchema:
+ schema = handler(source)
+
+ # for function-wrap schemas, url constraints is applied to the inner schema
+ # because when we generate schemas for urls, we wrap a core_schema.url_schema() with a function-wrap schema
+ # that helps with validation on initialization, see _BaseUrl and _BaseMultiHostUrl below.
+ schema_to_mutate = schema['schema'] if schema['type'] == 'function-wrap' else schema
+ if annotated_type := schema_to_mutate['type'] not in ('url', 'multi-host-url'):
+ raise PydanticUserError(
+ f"'UrlConstraints' cannot annotate '{annotated_type}'.", code='invalid-annotated-type'
+ )
+ for constraint_key, constraint_value in self.defined_constraints.items():
+ schema_to_mutate[constraint_key] = constraint_value
+ return schema
+
+
+class _BaseUrl:
+ _constraints: ClassVar[UrlConstraints] = UrlConstraints()
+ _url: _CoreUrl
+
+ def __init__(self, url: str | _CoreUrl | _BaseUrl) -> None:
+ self._url = _build_type_adapter(self.__class__).validate_python(url)._url
+
+ @property
+ def scheme(self) -> str:
+ """The scheme part of the URL.
+
+ e.g. `https` in `https://user:pass@host:port/path?query#fragment`
+ """
+ return self._url.scheme
+
+ @property
+ def username(self) -> str | None:
+ """The username part of the URL, or `None`.
+
+ e.g. `user` in `https://user:pass@host:port/path?query#fragment`
+ """
+ return self._url.username
+
+ @property
+ def password(self) -> str | None:
+ """The password part of the URL, or `None`.
+
+ e.g. `pass` in `https://user:pass@host:port/path?query#fragment`
+ """
+ return self._url.password
+
+ @property
+ def host(self) -> str | None:
+ """The host part of the URL, or `None`.
+
+ If the URL must be punycode encoded, this is the encoded host, e.g if the input URL is `https://£££.com`,
+ `host` will be `xn--9aaa.com`
+ """
+ return self._url.host
+
+ def unicode_host(self) -> str | None:
+ """The host part of the URL as a unicode string, or `None`.
+
+ e.g. `host` in `https://user:pass@host:port/path?query#fragment`
+
+ If the URL must be punycode encoded, this is the decoded host, e.g if the input URL is `https://£££.com`,
+ `unicode_host()` will be `£££.com`
+ """
+ return self._url.unicode_host()
+
+ @property
+ def port(self) -> int | None:
+ """The port part of the URL, or `None`.
+
+ e.g. `port` in `https://user:pass@host:port/path?query#fragment`
+ """
+ return self._url.port
+
+ @property
+ def path(self) -> str | None:
+ """The path part of the URL, or `None`.
+
+ e.g. `/path` in `https://user:pass@host:port/path?query#fragment`
+ """
+ return self._url.path
+
+ @property
+ def query(self) -> str | None:
+ """The query part of the URL, or `None`.
+
+ e.g. `query` in `https://user:pass@host:port/path?query#fragment`
+ """
+ return self._url.query
+
+ def query_params(self) -> list[tuple[str, str]]:
+ """The query part of the URL as a list of key-value pairs.
+
+ e.g. `[('foo', 'bar')]` in `https://user:pass@host:port/path?foo=bar#fragment`
+ """
+ return self._url.query_params()
+
+ @property
+ def fragment(self) -> str | None:
+ """The fragment part of the URL, or `None`.
+
+ e.g. `fragment` in `https://user:pass@host:port/path?query#fragment`
+ """
+ return self._url.fragment
+
+ def unicode_string(self) -> str:
+ """The URL as a unicode string, unlike `__str__()` this will not punycode encode the host.
+
+ If the URL must be punycode encoded, this is the decoded string, e.g if the input URL is `https://£££.com`,
+ `unicode_string()` will be `https://£££.com`
+ """
+ return self._url.unicode_string()
+
+ def __str__(self) -> str:
+ """The URL as a string, this will punycode encode the host if required."""
+ return str(self._url)
+
+ def __repr__(self) -> str:
+ return f'{self.__class__.__name__}({str(self._url)!r})'
+
+ def __deepcopy__(self, memo: dict) -> Self:
+ return self.__class__(self._url)
+
+ def __eq__(self, other: Any) -> bool:
+ return self.__class__ is other.__class__ and self._url == other._url
+
+ def __lt__(self, other: Any) -> bool:
+ return self.__class__ is other.__class__ and self._url < other._url
+
+ def __gt__(self, other: Any) -> bool:
+ return self.__class__ is other.__class__ and self._url > other._url
+
+ def __le__(self, other: Any) -> bool:
+ return self.__class__ is other.__class__ and self._url <= other._url
+
+ def __ge__(self, other: Any) -> bool:
+ return self.__class__ is other.__class__ and self._url >= other._url
+
+ def __hash__(self) -> int:
+ return hash(self._url)
+
+ def __len__(self) -> int:
+ return len(str(self._url))
+
+ @classmethod
+ def build(
+ cls,
+ *,
+ scheme: str,
+ username: str | None = None,
+ password: str | None = None,
+ host: str,
+ port: int | None = None,
+ path: str | None = None,
+ query: str | None = None,
+ fragment: str | None = None,
+ ) -> Self:
+ """Build a new `Url` instance from its component parts.
+
+ Args:
+ scheme: The scheme part of the URL.
+ username: The username part of the URL, or omit for no username.
+ password: The password part of the URL, or omit for no password.
+ host: The host part of the URL.
+ port: The port part of the URL, or omit for no port.
+ path: The path part of the URL, or omit for no path.
+ query: The query part of the URL, or omit for no query.
+ fragment: The fragment part of the URL, or omit for no fragment.
+
+ Returns:
+ An instance of URL
+ """
+ return cls(
+ _CoreUrl.build(
+ scheme=scheme,
+ username=username,
+ password=password,
+ host=host,
+ port=port,
+ path=path,
+ query=query,
+ fragment=fragment,
+ )
+ )
+
+ @classmethod
+ def serialize_url(cls, url: Any, info: core_schema.SerializationInfo) -> str | Self:
+ if not isinstance(url, cls):
+ raise PydanticSerializationUnexpectedValue(
+ f"Expected `{cls}` but got `{type(url)}` with value `'{url}'` - serialized value may not be as expected."
+ )
+ if info.mode == 'json':
+ return str(url)
+ return url
+
+ @classmethod
+ def __get_pydantic_core_schema__(
+ cls, source: type[_BaseUrl], handler: GetCoreSchemaHandler
+ ) -> core_schema.CoreSchema:
+ def wrap_val(v, h):
+ if isinstance(v, source):
+ return v
+ if isinstance(v, _BaseUrl):
+ v = str(v)
+ core_url = h(v)
+ instance = source.__new__(source)
+ instance._url = core_url
+ return instance
+
+ return core_schema.no_info_wrap_validator_function(
+ wrap_val,
+ schema=core_schema.url_schema(**cls._constraints.defined_constraints),
+ serialization=core_schema.plain_serializer_function_ser_schema(
+ cls.serialize_url, info_arg=True, when_used='always'
+ ),
+ )
+
+ @classmethod
+ def __get_pydantic_json_schema__(
+ cls, core_schema: core_schema.CoreSchema, handler: _schema_generation_shared.GetJsonSchemaHandler
+ ) -> JsonSchemaValue:
+ # we use the url schema for json schema generation, but we might have to extract it from
+ # the function-wrap schema we use as a tool for validation on initialization
+ inner_schema = core_schema['schema'] if core_schema['type'] == 'function-wrap' else core_schema
+ return handler(inner_schema)
+
+ __pydantic_serializer__ = SchemaSerializer(core_schema.any_schema(serialization=core_schema.to_string_ser_schema()))
+
+
+class _BaseMultiHostUrl:
+ _constraints: ClassVar[UrlConstraints] = UrlConstraints()
+ _url: _CoreMultiHostUrl
+
+ def __init__(self, url: str | _CoreMultiHostUrl | _BaseMultiHostUrl) -> None:
+ self._url = _build_type_adapter(self.__class__).validate_python(url)._url
+
+ @property
+ def scheme(self) -> str:
+ """The scheme part of the URL.
+
+ e.g. `https` in `https://foo.com,bar.com/path?query#fragment`
+ """
+ return self._url.scheme
+
+ @property
+ def path(self) -> str | None:
+ """The path part of the URL, or `None`.
+
+ e.g. `/path` in `https://foo.com,bar.com/path?query#fragment`
+ """
+ return self._url.path
+
+ @property
+ def query(self) -> str | None:
+ """The query part of the URL, or `None`.
+
+ e.g. `query` in `https://foo.com,bar.com/path?query#fragment`
+ """
+ return self._url.query
+
+ def query_params(self) -> list[tuple[str, str]]:
+ """The query part of the URL as a list of key-value pairs.
+
+ e.g. `[('foo', 'bar')]` in `https://foo.com,bar.com/path?query#fragment`
+ """
+ return self._url.query_params()
+
+ @property
+ def fragment(self) -> str | None:
+ """The fragment part of the URL, or `None`.
+
+ e.g. `fragment` in `https://foo.com,bar.com/path?query#fragment`
+ """
+ return self._url.fragment
+
+ def hosts(self) -> list[MultiHostHost]:
+ '''The hosts of the `MultiHostUrl` as [`MultiHostHost`][pydantic_core.MultiHostHost] typed dicts.
+
+ ```python
+ from pydantic_core import MultiHostUrl
+
+ mhu = MultiHostUrl('https://foo.com:123,foo:bar@bar.com/path')
+ print(mhu.hosts())
+ """
+ [
+ {'username': None, 'password': None, 'host': 'foo.com', 'port': 123},
+ {'username': 'foo', 'password': 'bar', 'host': 'bar.com', 'port': 443}
+ ]
+ ```
+ Returns:
+ A list of dicts, each representing a host.
+ '''
+ return self._url.hosts()
+
+ def unicode_string(self) -> str:
+ """The URL as a unicode string, unlike `__str__()` this will not punycode encode the hosts."""
+ return self._url.unicode_string()
+
+ def __str__(self) -> str:
+ """The URL as a string, this will punycode encode the host if required."""
+ return str(self._url)
+
+ def __repr__(self) -> str:
+ return f'{self.__class__.__name__}({str(self._url)!r})'
+
+ def __deepcopy__(self, memo: dict) -> Self:
+ return self.__class__(self._url)
+
+ def __eq__(self, other: Any) -> bool:
+ return self.__class__ is other.__class__ and self._url == other._url
+
+ def __hash__(self) -> int:
+ return hash(self._url)
+
+ def __len__(self) -> int:
+ return len(str(self._url))
+
+ @classmethod
+ def build(
+ cls,
+ *,
+ scheme: str,
+ hosts: list[MultiHostHost] | None = None,
+ username: str | None = None,
+ password: str | None = None,
+ host: str | None = None,
+ port: int | None = None,
+ path: str | None = None,
+ query: str | None = None,
+ fragment: str | None = None,
+ ) -> Self:
+ """Build a new `MultiHostUrl` instance from its component parts.
+
+ This method takes either `hosts` - a list of `MultiHostHost` typed dicts, or the individual components
+ `username`, `password`, `host` and `port`.
+
+ Args:
+ scheme: The scheme part of the URL.
+ hosts: Multiple hosts to build the URL from.
+ username: The username part of the URL.
+ password: The password part of the URL.
+ host: The host part of the URL.
+ port: The port part of the URL.
+ path: The path part of the URL.
+ query: The query part of the URL, or omit for no query.
+ fragment: The fragment part of the URL, or omit for no fragment.
+
+ Returns:
+ An instance of `MultiHostUrl`
+ """
+ return cls(
+ _CoreMultiHostUrl.build(
+ scheme=scheme,
+ hosts=hosts,
+ username=username,
+ password=password,
+ host=host,
+ port=port,
+ path=path,
+ query=query,
+ fragment=fragment,
+ )
+ )
+
+ @classmethod
+ def serialize_url(cls, url: Any, info: core_schema.SerializationInfo) -> str | Self:
+ if not isinstance(url, cls):
+ raise PydanticSerializationUnexpectedValue(
+ f"Expected `{cls}` but got `{type(url)}` with value `'{url}'` - serialized value may not be as expected."
+ )
+ if info.mode == 'json':
+ return str(url)
+ return url
+
+ @classmethod
+ def __get_pydantic_core_schema__(
+ cls, source: type[_BaseMultiHostUrl], handler: GetCoreSchemaHandler
+ ) -> core_schema.CoreSchema:
+ def wrap_val(v, h):
+ if isinstance(v, source):
+ return v
+ if isinstance(v, _BaseMultiHostUrl):
+ v = str(v)
+ core_url = h(v)
+ instance = source.__new__(source)
+ instance._url = core_url
+ return instance
+
+ return core_schema.no_info_wrap_validator_function(
+ wrap_val,
+ schema=core_schema.multi_host_url_schema(**cls._constraints.defined_constraints),
+ serialization=core_schema.plain_serializer_function_ser_schema(
+ cls.serialize_url, info_arg=True, when_used='always'
+ ),
+ )
+
+ @classmethod
+ def __get_pydantic_json_schema__(
+ cls, core_schema: core_schema.CoreSchema, handler: _schema_generation_shared.GetJsonSchemaHandler
+ ) -> JsonSchemaValue:
+ # we use the url schema for json schema generation, but we might have to extract it from
+ # the function-wrap schema we use as a tool for validation on initialization
+ inner_schema = core_schema['schema'] if core_schema['type'] == 'function-wrap' else core_schema
+ return handler(inner_schema)
+
+ __pydantic_serializer__ = SchemaSerializer(core_schema.any_schema(serialization=core_schema.to_string_ser_schema()))
+
+
+@lru_cache
+def _build_type_adapter(cls: type[_BaseUrl | _BaseMultiHostUrl]) -> TypeAdapter:
+ return TypeAdapter(cls)
+
+
+class AnyUrl(_BaseUrl):
+ """Base type for all URLs.
+
+ * Any scheme allowed
+ * Top-level domain (TLD) not required
+ * Host not required
+
+ Assuming an input URL of `http://samuel:pass@example.com:8000/the/path/?query=here#fragment=is;this=bit`,
+ the types export the following properties:
+
+ - `scheme`: the URL scheme (`http`), always set.
+ - `host`: the URL host (`example.com`).
+ - `username`: optional username if included (`samuel`).
+ - `password`: optional password if included (`pass`).
+ - `port`: optional port (`8000`).
+ - `path`: optional path (`/the/path/`).
+ - `query`: optional URL query (for example, `GET` arguments or "search string", such as `query=here`).
+ - `fragment`: optional fragment (`fragment=is;this=bit`).
+ """
+
+
+# Note: all single host urls inherit from `AnyUrl` to preserve compatibility with pre-v2.10 code
+# Where urls were annotated variants of `AnyUrl`, which was an alias to `pydantic_core.Url`
+
+
+class AnyHttpUrl(AnyUrl):
+ """A type that will accept any http or https URL.
+
+ * TLD not required
+ * Host not required
+ """
+
+ _constraints = UrlConstraints(allowed_schemes=['http', 'https'])
+
+
+class HttpUrl(AnyUrl):
+ """A type that will accept any http or https URL.
+
+ * TLD not required
+ * Host not required
+ * Max length 2083
+
+ ```python
+ from pydantic import BaseModel, HttpUrl, ValidationError
+
+ class MyModel(BaseModel):
+ url: HttpUrl
+
+ m = MyModel(url='http://www.example.com') # (1)!
+ print(m.url)
+ #> http://www.example.com/
+
+ try:
+ MyModel(url='ftp://invalid.url')
+ except ValidationError as e:
+ print(e)
+ '''
+ 1 validation error for MyModel
+ url
+ URL scheme should be 'http' or 'https' [type=url_scheme, input_value='ftp://invalid.url', input_type=str]
+ '''
+
+ try:
+ MyModel(url='not a url')
+ except ValidationError as e:
+ print(e)
+ '''
+ 1 validation error for MyModel
+ url
+ Input should be a valid URL, relative URL without a base [type=url_parsing, input_value='not a url', input_type=str]
+ '''
+ ```
+
+ 1. Note: mypy would prefer `m = MyModel(url=HttpUrl('http://www.example.com'))`, but Pydantic will convert the string to an HttpUrl instance anyway.
+
+ "International domains" (e.g. a URL where the host or TLD includes non-ascii characters) will be encoded via
+ [punycode](https://en.wikipedia.org/wiki/Punycode) (see
+ [this article](https://www.xudongz.com/blog/2017/idn-phishing/) for a good description of why this is important):
+
+ ```python
+ from pydantic import BaseModel, HttpUrl
+
+ class MyModel(BaseModel):
+ url: HttpUrl
+
+ m1 = MyModel(url='http://puny£code.com')
+ print(m1.url)
+ #> http://xn--punycode-eja.com/
+ m2 = MyModel(url='https://www.аррӏе.com/')
+ print(m2.url)
+ #> https://www.xn--80ak6aa92e.com/
+ m3 = MyModel(url='https://www.example.珠宝/')
+ print(m3.url)
+ #> https://www.example.xn--pbt977c/
+ ```
+
+
+ !!! warning "Underscores in Hostnames"
+ In Pydantic, underscores are allowed in all parts of a domain except the TLD.
+ Technically this might be wrong - in theory the hostname cannot have underscores, but subdomains can.
+
+ To explain this; consider the following two cases:
+
+ - `exam_ple.co.uk`: the hostname is `exam_ple`, which should not be allowed since it contains an underscore.
+ - `foo_bar.example.com` the hostname is `example`, which should be allowed since the underscore is in the subdomain.
+
+ Without having an exhaustive list of TLDs, it would be impossible to differentiate between these two. Therefore
+ underscores are allowed, but you can always do further validation in a validator if desired.
+
+ Also, Chrome, Firefox, and Safari all currently accept `http://exam_ple.com` as a URL, so we're in good
+ (or at least big) company.
+ """
+
+ _constraints = UrlConstraints(max_length=2083, allowed_schemes=['http', 'https'])
+
+
+class AnyWebsocketUrl(AnyUrl):
+ """A type that will accept any ws or wss URL.
+
+ * TLD not required
+ * Host not required
+ """
+
+ _constraints = UrlConstraints(allowed_schemes=['ws', 'wss'])
+
+
+class WebsocketUrl(AnyUrl):
+ """A type that will accept any ws or wss URL.
+
+ * TLD not required
+ * Host not required
+ * Max length 2083
+ """
+
+ _constraints = UrlConstraints(max_length=2083, allowed_schemes=['ws', 'wss'])
+
+
+class FileUrl(AnyUrl):
+ """A type that will accept any file URL.
+
+ * Host not required
+ """
+
+ _constraints = UrlConstraints(allowed_schemes=['file'])
+
+
+class FtpUrl(AnyUrl):
+ """A type that will accept ftp URL.
+
+ * TLD not required
+ * Host not required
+ """
+
+ _constraints = UrlConstraints(allowed_schemes=['ftp'])
+
+
+class PostgresDsn(_BaseMultiHostUrl):
+ """A type that will accept any Postgres DSN.
+
+ * User info required
+ * TLD not required
+ * Host required
+ * Supports multiple hosts
+
+ If further validation is required, these properties can be used by validators to enforce specific behaviour:
+
+ ```python
+ from pydantic import (
+ BaseModel,
+ HttpUrl,
+ PostgresDsn,
+ ValidationError,
+ field_validator,
+ )
+
+ class MyModel(BaseModel):
+ url: HttpUrl
+
+ m = MyModel(url='http://www.example.com')
+
+ # the repr() method for a url will display all properties of the url
+ print(repr(m.url))
+ #> HttpUrl('http://www.example.com/')
+ print(m.url.scheme)
+ #> http
+ print(m.url.host)
+ #> www.example.com
+ print(m.url.port)
+ #> 80
+
+ class MyDatabaseModel(BaseModel):
+ db: PostgresDsn
+
+ @field_validator('db')
+ def check_db_name(cls, v):
+ assert v.path and len(v.path) > 1, 'database must be provided'
+ return v
+
+ m = MyDatabaseModel(db='postgres://user:pass@localhost:5432/foobar')
+ print(m.db)
+ #> postgres://user:pass@localhost:5432/foobar
+
+ try:
+ MyDatabaseModel(db='postgres://user:pass@localhost:5432')
+ except ValidationError as e:
+ print(e)
+ '''
+ 1 validation error for MyDatabaseModel
+ db
+ Assertion failed, database must be provided
+ assert (None)
+ + where None = PostgresDsn('postgres://user:pass@localhost:5432').path [type=assertion_error, input_value='postgres://user:pass@localhost:5432', input_type=str]
+ '''
+ ```
+ """
+
+ _constraints = UrlConstraints(
+ host_required=True,
+ allowed_schemes=[
+ 'postgres',
+ 'postgresql',
+ 'postgresql+asyncpg',
+ 'postgresql+pg8000',
+ 'postgresql+psycopg',
+ 'postgresql+psycopg2',
+ 'postgresql+psycopg2cffi',
+ 'postgresql+py-postgresql',
+ 'postgresql+pygresql',
+ ],
+ )
+
+ @property
+ def host(self) -> str:
+ """The required URL host."""
+ return self._url.host # pyright: ignore[reportAttributeAccessIssue]
+
+
+class CockroachDsn(AnyUrl):
+ """A type that will accept any Cockroach DSN.
+
+ * User info required
+ * TLD not required
+ * Host required
+ """
+
+ _constraints = UrlConstraints(
+ host_required=True,
+ allowed_schemes=[
+ 'cockroachdb',
+ 'cockroachdb+psycopg2',
+ 'cockroachdb+asyncpg',
+ ],
+ )
+
+ @property
+ def host(self) -> str:
+ """The required URL host."""
+ return self._url.host # pyright: ignore[reportReturnType]
+
+
+class AmqpDsn(AnyUrl):
+ """A type that will accept any AMQP DSN.
+
+ * User info required
+ * TLD not required
+ * Host not required
+ """
+
+ _constraints = UrlConstraints(allowed_schemes=['amqp', 'amqps'])
+
+
+class RedisDsn(AnyUrl):
+ """A type that will accept any Redis DSN.
+
+ * User info required
+ * TLD not required
+ * Host required (e.g., `rediss://:pass@localhost`)
+ """
+
+ _constraints = UrlConstraints(
+ allowed_schemes=['redis', 'rediss'],
+ default_host='localhost',
+ default_port=6379,
+ default_path='/0',
+ host_required=True,
+ )
+
+ @property
+ def host(self) -> str:
+ """The required URL host."""
+ return self._url.host # pyright: ignore[reportReturnType]
+
+
+class MongoDsn(_BaseMultiHostUrl):
+ """A type that will accept any MongoDB DSN.
+
+ * User info not required
+ * Database name not required
+ * Port not required
+ * User info may be passed without user part (e.g., `mongodb://mongodb0.example.com:27017`).
+ """
+
+ _constraints = UrlConstraints(allowed_schemes=['mongodb', 'mongodb+srv'], default_port=27017)
+
+
+class KafkaDsn(AnyUrl):
+ """A type that will accept any Kafka DSN.
+
+ * User info required
+ * TLD not required
+ * Host not required
+ """
+
+ _constraints = UrlConstraints(allowed_schemes=['kafka'], default_host='localhost', default_port=9092)
+
+
+class NatsDsn(_BaseMultiHostUrl):
+ """A type that will accept any NATS DSN.
+
+ NATS is a connective technology built for the ever increasingly hyper-connected world.
+ It is a single technology that enables applications to securely communicate across
+ any combination of cloud vendors, on-premise, edge, web and mobile, and devices.
+ More: https://nats.io
+ """
+
+ _constraints = UrlConstraints(
+ allowed_schemes=['nats', 'tls', 'ws', 'wss'], default_host='localhost', default_port=4222
+ )
+
+
+class MySQLDsn(AnyUrl):
+ """A type that will accept any MySQL DSN.
+
+ * User info required
+ * TLD not required
+ * Host not required
+ """
+
+ _constraints = UrlConstraints(
+ allowed_schemes=[
+ 'mysql',
+ 'mysql+mysqlconnector',
+ 'mysql+aiomysql',
+ 'mysql+asyncmy',
+ 'mysql+mysqldb',
+ 'mysql+pymysql',
+ 'mysql+cymysql',
+ 'mysql+pyodbc',
+ ],
+ default_port=3306,
+ host_required=True,
+ )
+
+
+class MariaDBDsn(AnyUrl):
+ """A type that will accept any MariaDB DSN.
+
+ * User info required
+ * TLD not required
+ * Host not required
+ """
+
+ _constraints = UrlConstraints(
+ allowed_schemes=['mariadb', 'mariadb+mariadbconnector', 'mariadb+pymysql'],
+ default_port=3306,
+ )
+
+
+class ClickHouseDsn(AnyUrl):
+ """A type that will accept any ClickHouse DSN.
+
+ * User info required
+ * TLD not required
+ * Host not required
+ """
+
+ _constraints = UrlConstraints(
+ allowed_schemes=['clickhouse+native', 'clickhouse+asynch'],
+ default_host='localhost',
+ default_port=9000,
+ )
+
+
+class SnowflakeDsn(AnyUrl):
+ """A type that will accept any Snowflake DSN.
+
+ * User info required
+ * TLD not required
+ * Host required
+ """
+
+ _constraints = UrlConstraints(
+ allowed_schemes=['snowflake'],
+ host_required=True,
+ )
+
+ @property
+ def host(self) -> str:
+ """The required URL host."""
+ return self._url.host # pyright: ignore[reportReturnType]
+
+
+def import_email_validator() -> None:
+ global email_validator
+ try:
+ import email_validator
+ except ImportError as e:
+ raise ImportError('email-validator is not installed, run `pip install pydantic[email]`') from e
+ if not version('email-validator').partition('.')[0] == '2':
+ raise ImportError('email-validator version >= 2.0 required, run pip install -U email-validator')
+
+
+if TYPE_CHECKING:
+ EmailStr = Annotated[str, ...]
+else:
+
+ class EmailStr:
+ """
+ Info:
+ To use this type, you need to install the optional
+ [`email-validator`](https://github.com/JoshData/python-email-validator) package:
+
+ ```bash
+ pip install email-validator
+ ```
+
+ Validate email addresses.
+
+ ```python
+ from pydantic import BaseModel, EmailStr
+
+ class Model(BaseModel):
+ email: EmailStr
+
+ print(Model(email='contact@mail.com'))
+ #> email='contact@mail.com'
+ ```
+ """ # noqa: D212
+
+ @classmethod
+ def __get_pydantic_core_schema__(
+ cls,
+ _source: type[Any],
+ _handler: GetCoreSchemaHandler,
+ ) -> core_schema.CoreSchema:
+ import_email_validator()
+ return core_schema.no_info_after_validator_function(cls._validate, core_schema.str_schema())
+
+ @classmethod
+ def __get_pydantic_json_schema__(
+ cls, core_schema: core_schema.CoreSchema, handler: _schema_generation_shared.GetJsonSchemaHandler
+ ) -> JsonSchemaValue:
+ field_schema = handler(core_schema)
+ field_schema.update(type='string', format='email')
+ return field_schema
+
+ @classmethod
+ def _validate(cls, input_value: str, /) -> str:
+ return validate_email(input_value)[1]
+
+
+class NameEmail(_repr.Representation):
+ """
+ Info:
+ To use this type, you need to install the optional
+ [`email-validator`](https://github.com/JoshData/python-email-validator) package:
+
+ ```bash
+ pip install email-validator
+ ```
+
+ Validate a name and email address combination, as specified by
+ [RFC 5322](https://datatracker.ietf.org/doc/html/rfc5322#section-3.4).
+
+ The `NameEmail` has two properties: `name` and `email`.
+ In case the `name` is not provided, it's inferred from the email address.
+
+ ```python
+ from pydantic import BaseModel, NameEmail
+
+ class User(BaseModel):
+ email: NameEmail
+
+ user = User(email='Fred Bloggs <fred.bloggs@example.com>')
+ print(user.email)
+ #> Fred Bloggs <fred.bloggs@example.com>
+ print(user.email.name)
+ #> Fred Bloggs
+
+ user = User(email='fred.bloggs@example.com')
+ print(user.email)
+ #> fred.bloggs <fred.bloggs@example.com>
+ print(user.email.name)
+ #> fred.bloggs
+ ```
+ """ # noqa: D212
+
+ __slots__ = 'name', 'email'
+
+ def __init__(self, name: str, email: str):
+ self.name = name
+ self.email = email
+
+ def __eq__(self, other: Any) -> bool:
+ return isinstance(other, NameEmail) and (self.name, self.email) == (other.name, other.email)
+
+ @classmethod
+ def __get_pydantic_json_schema__(
+ cls, core_schema: core_schema.CoreSchema, handler: _schema_generation_shared.GetJsonSchemaHandler
+ ) -> JsonSchemaValue:
+ field_schema = handler(core_schema)
+ field_schema.update(type='string', format='name-email')
+ return field_schema
+
+ @classmethod
+ def __get_pydantic_core_schema__(
+ cls,
+ _source: type[Any],
+ _handler: GetCoreSchemaHandler,
+ ) -> core_schema.CoreSchema:
+ import_email_validator()
+
+ return core_schema.no_info_after_validator_function(
+ cls._validate,
+ core_schema.json_or_python_schema(
+ json_schema=core_schema.str_schema(),
+ python_schema=core_schema.union_schema(
+ [core_schema.is_instance_schema(cls), core_schema.str_schema()],
+ custom_error_type='name_email_type',
+ custom_error_message='Input is not a valid NameEmail',
+ ),
+ serialization=core_schema.to_string_ser_schema(),
+ ),
+ )
+
+ @classmethod
+ def _validate(cls, input_value: Self | str, /) -> Self:
+ if isinstance(input_value, str):
+ name, email = validate_email(input_value)
+ return cls(name, email)
+ else:
+ return input_value
+
+ def __str__(self) -> str:
+ if '@' in self.name:
+ return f'"{self.name}" <{self.email}>'
+
+ return f'{self.name} <{self.email}>'
+
+
+IPvAnyAddressType: TypeAlias = 'IPv4Address | IPv6Address'
+IPvAnyInterfaceType: TypeAlias = 'IPv4Interface | IPv6Interface'
+IPvAnyNetworkType: TypeAlias = 'IPv4Network | IPv6Network'
+
+if TYPE_CHECKING:
+ IPvAnyAddress = IPvAnyAddressType
+ IPvAnyInterface = IPvAnyInterfaceType
+ IPvAnyNetwork = IPvAnyNetworkType
+else:
+
+ class IPvAnyAddress:
+ """Validate an IPv4 or IPv6 address.
+
+ ```python
+ from pydantic import BaseModel
+ from pydantic.networks import IPvAnyAddress
+
+ class IpModel(BaseModel):
+ ip: IPvAnyAddress
+
+ print(IpModel(ip='127.0.0.1'))
+ #> ip=IPv4Address('127.0.0.1')
+
+ try:
+ IpModel(ip='http://www.example.com')
+ except ValueError as e:
+ print(e.errors())
+ '''
+ [
+ {
+ 'type': 'ip_any_address',
+ 'loc': ('ip',),
+ 'msg': 'value is not a valid IPv4 or IPv6 address',
+ 'input': 'http://www.example.com',
+ }
+ ]
+ '''
+ ```
+ """
+
+ __slots__ = ()
+
+ def __new__(cls, value: Any) -> IPvAnyAddressType:
+ """Validate an IPv4 or IPv6 address."""
+ try:
+ return IPv4Address(value)
+ except ValueError:
+ pass
+
+ try:
+ return IPv6Address(value)
+ except ValueError:
+ raise PydanticCustomError('ip_any_address', 'value is not a valid IPv4 or IPv6 address')
+
+ @classmethod
+ def __get_pydantic_json_schema__(
+ cls, core_schema: core_schema.CoreSchema, handler: _schema_generation_shared.GetJsonSchemaHandler
+ ) -> JsonSchemaValue:
+ field_schema = {}
+ field_schema.update(type='string', format='ipvanyaddress')
+ return field_schema
+
+ @classmethod
+ def __get_pydantic_core_schema__(
+ cls,
+ _source: type[Any],
+ _handler: GetCoreSchemaHandler,
+ ) -> core_schema.CoreSchema:
+ return core_schema.no_info_plain_validator_function(
+ cls._validate, serialization=core_schema.to_string_ser_schema()
+ )
+
+ @classmethod
+ def _validate(cls, input_value: Any, /) -> IPvAnyAddressType:
+ return cls(input_value) # type: ignore[return-value]
+
+ class IPvAnyInterface:
+ """Validate an IPv4 or IPv6 interface."""
+
+ __slots__ = ()
+
+ def __new__(cls, value: NetworkType) -> IPvAnyInterfaceType:
+ """Validate an IPv4 or IPv6 interface."""
+ try:
+ return IPv4Interface(value)
+ except ValueError:
+ pass
+
+ try:
+ return IPv6Interface(value)
+ except ValueError:
+ raise PydanticCustomError('ip_any_interface', 'value is not a valid IPv4 or IPv6 interface')
+
+ @classmethod
+ def __get_pydantic_json_schema__(
+ cls, core_schema: core_schema.CoreSchema, handler: _schema_generation_shared.GetJsonSchemaHandler
+ ) -> JsonSchemaValue:
+ field_schema = {}
+ field_schema.update(type='string', format='ipvanyinterface')
+ return field_schema
+
+ @classmethod
+ def __get_pydantic_core_schema__(
+ cls,
+ _source: type[Any],
+ _handler: GetCoreSchemaHandler,
+ ) -> core_schema.CoreSchema:
+ return core_schema.no_info_plain_validator_function(
+ cls._validate, serialization=core_schema.to_string_ser_schema()
+ )
+
+ @classmethod
+ def _validate(cls, input_value: NetworkType, /) -> IPvAnyInterfaceType:
+ return cls(input_value) # type: ignore[return-value]
+
+ class IPvAnyNetwork:
+ """Validate an IPv4 or IPv6 network."""
+
+ __slots__ = ()
+
+ def __new__(cls, value: NetworkType) -> IPvAnyNetworkType:
+ """Validate an IPv4 or IPv6 network."""
+ # Assume IP Network is defined with a default value for `strict` argument.
+ # Define your own class if you want to specify network address check strictness.
+ try:
+ return IPv4Network(value)
+ except ValueError:
+ pass
+
+ try:
+ return IPv6Network(value)
+ except ValueError:
+ raise PydanticCustomError('ip_any_network', 'value is not a valid IPv4 or IPv6 network')
+
+ @classmethod
+ def __get_pydantic_json_schema__(
+ cls, core_schema: core_schema.CoreSchema, handler: _schema_generation_shared.GetJsonSchemaHandler
+ ) -> JsonSchemaValue:
+ field_schema = {}
+ field_schema.update(type='string', format='ipvanynetwork')
+ return field_schema
+
+ @classmethod
+ def __get_pydantic_core_schema__(
+ cls,
+ _source: type[Any],
+ _handler: GetCoreSchemaHandler,
+ ) -> core_schema.CoreSchema:
+ return core_schema.no_info_plain_validator_function(
+ cls._validate, serialization=core_schema.to_string_ser_schema()
+ )
+
+ @classmethod
+ def _validate(cls, input_value: NetworkType, /) -> IPvAnyNetworkType:
+ return cls(input_value) # type: ignore[return-value]
+
+
+def _build_pretty_email_regex() -> re.Pattern[str]:
+ name_chars = r'[\w!#$%&\'*+\-/=?^_`{|}~]'
+ unquoted_name_group = rf'((?:{name_chars}+\s+)*{name_chars}+)'
+ quoted_name_group = r'"((?:[^"]|\")+)"'
+ email_group = r'<(.+)>'
+ return re.compile(rf'\s*(?:{unquoted_name_group}|{quoted_name_group})?\s*{email_group}\s*')
+
+
+pretty_email_regex = _build_pretty_email_regex()
+
+MAX_EMAIL_LENGTH = 2048
+"""Maximum length for an email.
+A somewhat arbitrary but very generous number compared to what is allowed by most implementations.
+"""
+
+
+def validate_email(value: str) -> tuple[str, str]:
+ """Email address validation using [email-validator](https://pypi.org/project/email-validator/).
+
+ Returns:
+ A tuple containing the local part of the email (or the name for "pretty" email addresses)
+ and the normalized email.
+
+ Raises:
+ PydanticCustomError: If the email is invalid.
+
+ Note:
+ Note that:
+
+ * Raw IP address (literal) domain parts are not allowed.
+ * `"John Doe <local_part@domain.com>"` style "pretty" email addresses are processed.
+ * Spaces are striped from the beginning and end of addresses, but no error is raised.
+ """
+ if email_validator is None:
+ import_email_validator()
+
+ if len(value) > MAX_EMAIL_LENGTH:
+ raise PydanticCustomError(
+ 'value_error',
+ 'value is not a valid email address: {reason}',
+ {'reason': f'Length must not exceed {MAX_EMAIL_LENGTH} characters'},
+ )
+
+ m = pretty_email_regex.fullmatch(value)
+ name: str | None = None
+ if m:
+ unquoted_name, quoted_name, value = m.groups()
+ name = unquoted_name or quoted_name
+
+ email = value.strip()
+
+ try:
+ parts = email_validator.validate_email(email, check_deliverability=False)
+ except email_validator.EmailNotValidError as e:
+ raise PydanticCustomError(
+ 'value_error', 'value is not a valid email address: {reason}', {'reason': str(e.args[0])}
+ ) from e
+
+ email = parts.normalized
+ assert email is not None
+ name = name or parts.local_part
+ return name, email
+
+
+__getattr__ = getattr_migration(__name__)