aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/pydantic/v1/networks.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/pydantic/v1/networks.py')
-rw-r--r--.venv/lib/python3.12/site-packages/pydantic/v1/networks.py747
1 files changed, 747 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/pydantic/v1/networks.py b/.venv/lib/python3.12/site-packages/pydantic/v1/networks.py
new file mode 100644
index 00000000..ba07b748
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/pydantic/v1/networks.py
@@ -0,0 +1,747 @@
+import re
+from ipaddress import (
+ IPv4Address,
+ IPv4Interface,
+ IPv4Network,
+ IPv6Address,
+ IPv6Interface,
+ IPv6Network,
+ _BaseAddress,
+ _BaseNetwork,
+)
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ Collection,
+ Dict,
+ Generator,
+ List,
+ Match,
+ Optional,
+ Pattern,
+ Set,
+ Tuple,
+ Type,
+ Union,
+ cast,
+ no_type_check,
+)
+
+from pydantic.v1 import errors
+from pydantic.v1.utils import Representation, update_not_none
+from pydantic.v1.validators import constr_length_validator, str_validator
+
+if TYPE_CHECKING:
+ import email_validator
+ from typing_extensions import TypedDict
+
+ from pydantic.v1.config import BaseConfig
+ from pydantic.v1.fields import ModelField
+ from pydantic.v1.typing import AnyCallable
+
+ CallableGenerator = Generator[AnyCallable, None, None]
+
+ class Parts(TypedDict, total=False):
+ scheme: str
+ user: Optional[str]
+ password: Optional[str]
+ ipv4: Optional[str]
+ ipv6: Optional[str]
+ domain: Optional[str]
+ port: Optional[str]
+ path: Optional[str]
+ query: Optional[str]
+ fragment: Optional[str]
+
+ class HostParts(TypedDict, total=False):
+ host: str
+ tld: Optional[str]
+ host_type: Optional[str]
+ port: Optional[str]
+ rebuild: bool
+
+else:
+ email_validator = None
+
+ class Parts(dict):
+ pass
+
+
+NetworkType = Union[str, bytes, int, Tuple[Union[str, bytes, int], Union[str, int]]]
+
+__all__ = [
+ 'AnyUrl',
+ 'AnyHttpUrl',
+ 'FileUrl',
+ 'HttpUrl',
+ 'stricturl',
+ 'EmailStr',
+ 'NameEmail',
+ 'IPvAnyAddress',
+ 'IPvAnyInterface',
+ 'IPvAnyNetwork',
+ 'PostgresDsn',
+ 'CockroachDsn',
+ 'AmqpDsn',
+ 'RedisDsn',
+ 'MongoDsn',
+ 'KafkaDsn',
+ 'validate_email',
+]
+
+_url_regex_cache = None
+_multi_host_url_regex_cache = None
+_ascii_domain_regex_cache = None
+_int_domain_regex_cache = None
+_host_regex_cache = None
+
+_host_regex = (
+ r'(?:'
+ r'(?P<ipv4>(?:\d{1,3}\.){3}\d{1,3})(?=$|[/:#?])|' # ipv4
+ r'(?P<ipv6>\[[A-F0-9]*:[A-F0-9:]+\])(?=$|[/:#?])|' # ipv6
+ r'(?P<domain>[^\s/:?#]+)' # domain, validation occurs later
+ r')?'
+ r'(?::(?P<port>\d+))?' # port
+)
+_scheme_regex = r'(?:(?P<scheme>[a-z][a-z0-9+\-.]+)://)?' # scheme https://tools.ietf.org/html/rfc3986#appendix-A
+_user_info_regex = r'(?:(?P<user>[^\s:/]*)(?::(?P<password>[^\s/]*))?@)?'
+_path_regex = r'(?P<path>/[^\s?#]*)?'
+_query_regex = r'(?:\?(?P<query>[^\s#]*))?'
+_fragment_regex = r'(?:#(?P<fragment>[^\s#]*))?'
+
+
+def url_regex() -> Pattern[str]:
+ global _url_regex_cache
+ if _url_regex_cache is None:
+ _url_regex_cache = re.compile(
+ rf'{_scheme_regex}{_user_info_regex}{_host_regex}{_path_regex}{_query_regex}{_fragment_regex}',
+ re.IGNORECASE,
+ )
+ return _url_regex_cache
+
+
+def multi_host_url_regex() -> Pattern[str]:
+ """
+ Compiled multi host url regex.
+
+ Additionally to `url_regex` it allows to match multiple hosts.
+ E.g. host1.db.net,host2.db.net
+ """
+ global _multi_host_url_regex_cache
+ if _multi_host_url_regex_cache is None:
+ _multi_host_url_regex_cache = re.compile(
+ rf'{_scheme_regex}{_user_info_regex}'
+ r'(?P<hosts>([^/]*))' # validation occurs later
+ rf'{_path_regex}{_query_regex}{_fragment_regex}',
+ re.IGNORECASE,
+ )
+ return _multi_host_url_regex_cache
+
+
+def ascii_domain_regex() -> Pattern[str]:
+ global _ascii_domain_regex_cache
+ if _ascii_domain_regex_cache is None:
+ ascii_chunk = r'[_0-9a-z](?:[-_0-9a-z]{0,61}[_0-9a-z])?'
+ ascii_domain_ending = r'(?P<tld>\.[a-z]{2,63})?\.?'
+ _ascii_domain_regex_cache = re.compile(
+ fr'(?:{ascii_chunk}\.)*?{ascii_chunk}{ascii_domain_ending}', re.IGNORECASE
+ )
+ return _ascii_domain_regex_cache
+
+
+def int_domain_regex() -> Pattern[str]:
+ global _int_domain_regex_cache
+ if _int_domain_regex_cache is None:
+ int_chunk = r'[_0-9a-\U00040000](?:[-_0-9a-\U00040000]{0,61}[_0-9a-\U00040000])?'
+ int_domain_ending = r'(?P<tld>(\.[^\W\d_]{2,63})|(\.(?:xn--)[_0-9a-z-]{2,63}))?\.?'
+ _int_domain_regex_cache = re.compile(fr'(?:{int_chunk}\.)*?{int_chunk}{int_domain_ending}', re.IGNORECASE)
+ return _int_domain_regex_cache
+
+
+def host_regex() -> Pattern[str]:
+ global _host_regex_cache
+ if _host_regex_cache is None:
+ _host_regex_cache = re.compile(
+ _host_regex,
+ re.IGNORECASE,
+ )
+ return _host_regex_cache
+
+
+class AnyUrl(str):
+ strip_whitespace = True
+ min_length = 1
+ max_length = 2**16
+ allowed_schemes: Optional[Collection[str]] = None
+ tld_required: bool = False
+ user_required: bool = False
+ host_required: bool = True
+ hidden_parts: Set[str] = set()
+
+ __slots__ = ('scheme', 'user', 'password', 'host', 'tld', 'host_type', 'port', 'path', 'query', 'fragment')
+
+ @no_type_check
+ def __new__(cls, url: Optional[str], **kwargs) -> object:
+ return str.__new__(cls, cls.build(**kwargs) if url is None else url)
+
+ def __init__(
+ self,
+ url: str,
+ *,
+ scheme: str,
+ user: Optional[str] = None,
+ password: Optional[str] = None,
+ host: Optional[str] = None,
+ tld: Optional[str] = None,
+ host_type: str = 'domain',
+ port: Optional[str] = None,
+ path: Optional[str] = None,
+ query: Optional[str] = None,
+ fragment: Optional[str] = None,
+ ) -> None:
+ str.__init__(url)
+ self.scheme = scheme
+ self.user = user
+ self.password = password
+ self.host = host
+ self.tld = tld
+ self.host_type = host_type
+ self.port = port
+ self.path = path
+ self.query = query
+ self.fragment = fragment
+
+ @classmethod
+ def build(
+ cls,
+ *,
+ scheme: str,
+ user: Optional[str] = None,
+ password: Optional[str] = None,
+ host: str,
+ port: Optional[str] = None,
+ path: Optional[str] = None,
+ query: Optional[str] = None,
+ fragment: Optional[str] = None,
+ **_kwargs: str,
+ ) -> str:
+ parts = Parts(
+ scheme=scheme,
+ user=user,
+ password=password,
+ host=host,
+ port=port,
+ path=path,
+ query=query,
+ fragment=fragment,
+ **_kwargs, # type: ignore[misc]
+ )
+
+ url = scheme + '://'
+ if user:
+ url += user
+ if password:
+ url += ':' + password
+ if user or password:
+ url += '@'
+ url += host
+ if port and ('port' not in cls.hidden_parts or cls.get_default_parts(parts).get('port') != port):
+ url += ':' + port
+ if path:
+ url += path
+ if query:
+ url += '?' + query
+ if fragment:
+ url += '#' + fragment
+ return url
+
+ @classmethod
+ def __modify_schema__(cls, field_schema: Dict[str, Any]) -> None:
+ update_not_none(field_schema, minLength=cls.min_length, maxLength=cls.max_length, format='uri')
+
+ @classmethod
+ def __get_validators__(cls) -> 'CallableGenerator':
+ yield cls.validate
+
+ @classmethod
+ def validate(cls, value: Any, field: 'ModelField', config: 'BaseConfig') -> 'AnyUrl':
+ if value.__class__ == cls:
+ return value
+ value = str_validator(value)
+ if cls.strip_whitespace:
+ value = value.strip()
+ url: str = cast(str, constr_length_validator(value, field, config))
+
+ m = cls._match_url(url)
+ # the regex should always match, if it doesn't please report with details of the URL tried
+ assert m, 'URL regex failed unexpectedly'
+
+ original_parts = cast('Parts', m.groupdict())
+ parts = cls.apply_default_parts(original_parts)
+ parts = cls.validate_parts(parts)
+
+ if m.end() != len(url):
+ raise errors.UrlExtraError(extra=url[m.end() :])
+
+ return cls._build_url(m, url, parts)
+
+ @classmethod
+ def _build_url(cls, m: Match[str], url: str, parts: 'Parts') -> 'AnyUrl':
+ """
+ Validate hosts and build the AnyUrl object. Split from `validate` so this method
+ can be altered in `MultiHostDsn`.
+ """
+ host, tld, host_type, rebuild = cls.validate_host(parts)
+
+ return cls(
+ None if rebuild else url,
+ scheme=parts['scheme'],
+ user=parts['user'],
+ password=parts['password'],
+ host=host,
+ tld=tld,
+ host_type=host_type,
+ port=parts['port'],
+ path=parts['path'],
+ query=parts['query'],
+ fragment=parts['fragment'],
+ )
+
+ @staticmethod
+ def _match_url(url: str) -> Optional[Match[str]]:
+ return url_regex().match(url)
+
+ @staticmethod
+ def _validate_port(port: Optional[str]) -> None:
+ if port is not None and int(port) > 65_535:
+ raise errors.UrlPortError()
+
+ @classmethod
+ def validate_parts(cls, parts: 'Parts', validate_port: bool = True) -> 'Parts':
+ """
+ A method used to validate parts of a URL.
+ Could be overridden to set default values for parts if missing
+ """
+ scheme = parts['scheme']
+ if scheme is None:
+ raise errors.UrlSchemeError()
+
+ if cls.allowed_schemes and scheme.lower() not in cls.allowed_schemes:
+ raise errors.UrlSchemePermittedError(set(cls.allowed_schemes))
+
+ if validate_port:
+ cls._validate_port(parts['port'])
+
+ user = parts['user']
+ if cls.user_required and user is None:
+ raise errors.UrlUserInfoError()
+
+ return parts
+
+ @classmethod
+ def validate_host(cls, parts: 'Parts') -> Tuple[str, Optional[str], str, bool]:
+ tld, host_type, rebuild = None, None, False
+ for f in ('domain', 'ipv4', 'ipv6'):
+ host = parts[f] # type: ignore[literal-required]
+ if host:
+ host_type = f
+ break
+
+ if host is None:
+ if cls.host_required:
+ raise errors.UrlHostError()
+ elif host_type == 'domain':
+ is_international = False
+ d = ascii_domain_regex().fullmatch(host)
+ if d is None:
+ d = int_domain_regex().fullmatch(host)
+ if d is None:
+ raise errors.UrlHostError()
+ is_international = True
+
+ tld = d.group('tld')
+ if tld is None and not is_international:
+ d = int_domain_regex().fullmatch(host)
+ assert d is not None
+ tld = d.group('tld')
+ is_international = True
+
+ if tld is not None:
+ tld = tld[1:]
+ elif cls.tld_required:
+ raise errors.UrlHostTldError()
+
+ if is_international:
+ host_type = 'int_domain'
+ rebuild = True
+ host = host.encode('idna').decode('ascii')
+ if tld is not None:
+ tld = tld.encode('idna').decode('ascii')
+
+ return host, tld, host_type, rebuild # type: ignore
+
+ @staticmethod
+ def get_default_parts(parts: 'Parts') -> 'Parts':
+ return {}
+
+ @classmethod
+ def apply_default_parts(cls, parts: 'Parts') -> 'Parts':
+ for key, value in cls.get_default_parts(parts).items():
+ if not parts[key]: # type: ignore[literal-required]
+ parts[key] = value # type: ignore[literal-required]
+ return parts
+
+ def __repr__(self) -> str:
+ extra = ', '.join(f'{n}={getattr(self, n)!r}' for n in self.__slots__ if getattr(self, n) is not None)
+ return f'{self.__class__.__name__}({super().__repr__()}, {extra})'
+
+
+class AnyHttpUrl(AnyUrl):
+ allowed_schemes = {'http', 'https'}
+
+ __slots__ = ()
+
+
+class HttpUrl(AnyHttpUrl):
+ tld_required = True
+ # https://stackoverflow.com/questions/417142/what-is-the-maximum-length-of-a-url-in-different-browsers
+ max_length = 2083
+ hidden_parts = {'port'}
+
+ @staticmethod
+ def get_default_parts(parts: 'Parts') -> 'Parts':
+ return {'port': '80' if parts['scheme'] == 'http' else '443'}
+
+
+class FileUrl(AnyUrl):
+ allowed_schemes = {'file'}
+ host_required = False
+
+ __slots__ = ()
+
+
+class MultiHostDsn(AnyUrl):
+ __slots__ = AnyUrl.__slots__ + ('hosts',)
+
+ def __init__(self, *args: Any, hosts: Optional[List['HostParts']] = None, **kwargs: Any):
+ super().__init__(*args, **kwargs)
+ self.hosts = hosts
+
+ @staticmethod
+ def _match_url(url: str) -> Optional[Match[str]]:
+ return multi_host_url_regex().match(url)
+
+ @classmethod
+ def validate_parts(cls, parts: 'Parts', validate_port: bool = True) -> 'Parts':
+ return super().validate_parts(parts, validate_port=False)
+
+ @classmethod
+ def _build_url(cls, m: Match[str], url: str, parts: 'Parts') -> 'MultiHostDsn':
+ hosts_parts: List['HostParts'] = []
+ host_re = host_regex()
+ for host in m.groupdict()['hosts'].split(','):
+ d: Parts = host_re.match(host).groupdict() # type: ignore
+ host, tld, host_type, rebuild = cls.validate_host(d)
+ port = d.get('port')
+ cls._validate_port(port)
+ hosts_parts.append(
+ {
+ 'host': host,
+ 'host_type': host_type,
+ 'tld': tld,
+ 'rebuild': rebuild,
+ 'port': port,
+ }
+ )
+
+ if len(hosts_parts) > 1:
+ return cls(
+ None if any([hp['rebuild'] for hp in hosts_parts]) else url,
+ scheme=parts['scheme'],
+ user=parts['user'],
+ password=parts['password'],
+ path=parts['path'],
+ query=parts['query'],
+ fragment=parts['fragment'],
+ host_type=None,
+ hosts=hosts_parts,
+ )
+ else:
+ # backwards compatibility with single host
+ host_part = hosts_parts[0]
+ return cls(
+ None if host_part['rebuild'] else url,
+ scheme=parts['scheme'],
+ user=parts['user'],
+ password=parts['password'],
+ host=host_part['host'],
+ tld=host_part['tld'],
+ host_type=host_part['host_type'],
+ port=host_part.get('port'),
+ path=parts['path'],
+ query=parts['query'],
+ fragment=parts['fragment'],
+ )
+
+
+class PostgresDsn(MultiHostDsn):
+ allowed_schemes = {
+ 'postgres',
+ 'postgresql',
+ 'postgresql+asyncpg',
+ 'postgresql+pg8000',
+ 'postgresql+psycopg',
+ 'postgresql+psycopg2',
+ 'postgresql+psycopg2cffi',
+ 'postgresql+py-postgresql',
+ 'postgresql+pygresql',
+ }
+ user_required = True
+
+ __slots__ = ()
+
+
+class CockroachDsn(AnyUrl):
+ allowed_schemes = {
+ 'cockroachdb',
+ 'cockroachdb+psycopg2',
+ 'cockroachdb+asyncpg',
+ }
+ user_required = True
+
+
+class AmqpDsn(AnyUrl):
+ allowed_schemes = {'amqp', 'amqps'}
+ host_required = False
+
+
+class RedisDsn(AnyUrl):
+ __slots__ = ()
+ allowed_schemes = {'redis', 'rediss'}
+ host_required = False
+
+ @staticmethod
+ def get_default_parts(parts: 'Parts') -> 'Parts':
+ return {
+ 'domain': 'localhost' if not (parts['ipv4'] or parts['ipv6']) else '',
+ 'port': '6379',
+ 'path': '/0',
+ }
+
+
+class MongoDsn(AnyUrl):
+ allowed_schemes = {'mongodb'}
+
+ # TODO: Needed to generic "Parts" for "Replica Set", "Sharded Cluster", and other mongodb deployment modes
+ @staticmethod
+ def get_default_parts(parts: 'Parts') -> 'Parts':
+ return {
+ 'port': '27017',
+ }
+
+
+class KafkaDsn(AnyUrl):
+ allowed_schemes = {'kafka'}
+
+ @staticmethod
+ def get_default_parts(parts: 'Parts') -> 'Parts':
+ return {
+ 'domain': 'localhost',
+ 'port': '9092',
+ }
+
+
+def stricturl(
+ *,
+ strip_whitespace: bool = True,
+ min_length: int = 1,
+ max_length: int = 2**16,
+ tld_required: bool = True,
+ host_required: bool = True,
+ allowed_schemes: Optional[Collection[str]] = None,
+) -> Type[AnyUrl]:
+ # use kwargs then define conf in a dict to aid with IDE type hinting
+ namespace = dict(
+ strip_whitespace=strip_whitespace,
+ min_length=min_length,
+ max_length=max_length,
+ tld_required=tld_required,
+ host_required=host_required,
+ allowed_schemes=allowed_schemes,
+ )
+ return type('UrlValue', (AnyUrl,), namespace)
+
+
+def import_email_validator() -> None:
+ global email_validator
+ try:
+ import email_validator
+ except ImportError as e:
+ raise ImportError('email-validator is not installed, run `pip install pydantic[email]`') from e
+
+
+class EmailStr(str):
+ @classmethod
+ def __modify_schema__(cls, field_schema: Dict[str, Any]) -> None:
+ field_schema.update(type='string', format='email')
+
+ @classmethod
+ def __get_validators__(cls) -> 'CallableGenerator':
+ # included here and below so the error happens straight away
+ import_email_validator()
+
+ yield str_validator
+ yield cls.validate
+
+ @classmethod
+ def validate(cls, value: Union[str]) -> str:
+ return validate_email(value)[1]
+
+
+class NameEmail(Representation):
+ __slots__ = 'name', 'email'
+
+ def __init__(self, name: str, email: str):
+ self.name = name
+ self.email = email
+
+ def __eq__(self, other: Any) -> bool:
+ return isinstance(other, NameEmail) and (self.name, self.email) == (other.name, other.email)
+
+ @classmethod
+ def __modify_schema__(cls, field_schema: Dict[str, Any]) -> None:
+ field_schema.update(type='string', format='name-email')
+
+ @classmethod
+ def __get_validators__(cls) -> 'CallableGenerator':
+ import_email_validator()
+
+ yield cls.validate
+
+ @classmethod
+ def validate(cls, value: Any) -> 'NameEmail':
+ if value.__class__ == cls:
+ return value
+ value = str_validator(value)
+ return cls(*validate_email(value))
+
+ def __str__(self) -> str:
+ return f'{self.name} <{self.email}>'
+
+
+class IPvAnyAddress(_BaseAddress):
+ __slots__ = ()
+
+ @classmethod
+ def __modify_schema__(cls, field_schema: Dict[str, Any]) -> None:
+ field_schema.update(type='string', format='ipvanyaddress')
+
+ @classmethod
+ def __get_validators__(cls) -> 'CallableGenerator':
+ yield cls.validate
+
+ @classmethod
+ def validate(cls, value: Union[str, bytes, int]) -> Union[IPv4Address, IPv6Address]:
+ try:
+ return IPv4Address(value)
+ except ValueError:
+ pass
+
+ try:
+ return IPv6Address(value)
+ except ValueError:
+ raise errors.IPvAnyAddressError()
+
+
+class IPvAnyInterface(_BaseAddress):
+ __slots__ = ()
+
+ @classmethod
+ def __modify_schema__(cls, field_schema: Dict[str, Any]) -> None:
+ field_schema.update(type='string', format='ipvanyinterface')
+
+ @classmethod
+ def __get_validators__(cls) -> 'CallableGenerator':
+ yield cls.validate
+
+ @classmethod
+ def validate(cls, value: NetworkType) -> Union[IPv4Interface, IPv6Interface]:
+ try:
+ return IPv4Interface(value)
+ except ValueError:
+ pass
+
+ try:
+ return IPv6Interface(value)
+ except ValueError:
+ raise errors.IPvAnyInterfaceError()
+
+
+class IPvAnyNetwork(_BaseNetwork): # type: ignore
+ @classmethod
+ def __modify_schema__(cls, field_schema: Dict[str, Any]) -> None:
+ field_schema.update(type='string', format='ipvanynetwork')
+
+ @classmethod
+ def __get_validators__(cls) -> 'CallableGenerator':
+ yield cls.validate
+
+ @classmethod
+ def validate(cls, value: NetworkType) -> Union[IPv4Network, IPv6Network]:
+ # Assume IP Network is defined with a default value for ``strict`` argument.
+ # Define your own class if you want to specify network address check strictness.
+ try:
+ return IPv4Network(value)
+ except ValueError:
+ pass
+
+ try:
+ return IPv6Network(value)
+ except ValueError:
+ raise errors.IPvAnyNetworkError()
+
+
+pretty_email_regex = re.compile(r'([\w ]*?) *<(.*)> *')
+MAX_EMAIL_LENGTH = 2048
+"""Maximum length for an email.
+A somewhat arbitrary but very generous number compared to what is allowed by most implementations.
+"""
+
+
+def validate_email(value: Union[str]) -> Tuple[str, str]:
+ """
+ Email address validation using https://pypi.org/project/email-validator/
+ Notes:
+ * raw ip address (literal) domain parts are not allowed.
+ * "John Doe <local_part@domain.com>" style "pretty" email addresses are processed
+ * spaces are striped from the beginning and end of addresses but no error is raised
+ """
+ if email_validator is None:
+ import_email_validator()
+
+ if len(value) > MAX_EMAIL_LENGTH:
+ raise errors.EmailError()
+
+ m = pretty_email_regex.fullmatch(value)
+ name: Union[str, None] = None
+ if m:
+ name, value = m.groups()
+ email = value.strip()
+ try:
+ parts = email_validator.validate_email(email, check_deliverability=False)
+ except email_validator.EmailNotValidError as e:
+ raise errors.EmailError from e
+
+ if hasattr(parts, 'normalized'):
+ # email-validator >= 2
+ email = parts.normalized
+ assert email is not None
+ name = name or parts.local_part
+ return name, email
+ else:
+ # email-validator >1, <2
+ at_index = email.index('@')
+ local_part = email[:at_index] # RFC 5321, local part must be case-sensitive.
+ global_part = email[at_index:].lower()
+
+ return name or local_part, local_part + global_part