diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/pydantic/v1/networks.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/pydantic/v1/networks.py | 747 |
1 files changed, 747 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/pydantic/v1/networks.py b/.venv/lib/python3.12/site-packages/pydantic/v1/networks.py new file mode 100644 index 00000000..ba07b748 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/pydantic/v1/networks.py @@ -0,0 +1,747 @@ +import re +from ipaddress import ( + IPv4Address, + IPv4Interface, + IPv4Network, + IPv6Address, + IPv6Interface, + IPv6Network, + _BaseAddress, + _BaseNetwork, +) +from typing import ( + TYPE_CHECKING, + Any, + Collection, + Dict, + Generator, + List, + Match, + Optional, + Pattern, + Set, + Tuple, + Type, + Union, + cast, + no_type_check, +) + +from pydantic.v1 import errors +from pydantic.v1.utils import Representation, update_not_none +from pydantic.v1.validators import constr_length_validator, str_validator + +if TYPE_CHECKING: + import email_validator + from typing_extensions import TypedDict + + from pydantic.v1.config import BaseConfig + from pydantic.v1.fields import ModelField + from pydantic.v1.typing import AnyCallable + + CallableGenerator = Generator[AnyCallable, None, None] + + class Parts(TypedDict, total=False): + scheme: str + user: Optional[str] + password: Optional[str] + ipv4: Optional[str] + ipv6: Optional[str] + domain: Optional[str] + port: Optional[str] + path: Optional[str] + query: Optional[str] + fragment: Optional[str] + + class HostParts(TypedDict, total=False): + host: str + tld: Optional[str] + host_type: Optional[str] + port: Optional[str] + rebuild: bool + +else: + email_validator = None + + class Parts(dict): + pass + + +NetworkType = Union[str, bytes, int, Tuple[Union[str, bytes, int], Union[str, int]]] + +__all__ = [ + 'AnyUrl', + 'AnyHttpUrl', + 'FileUrl', + 'HttpUrl', + 'stricturl', + 'EmailStr', + 'NameEmail', + 'IPvAnyAddress', + 'IPvAnyInterface', + 'IPvAnyNetwork', + 'PostgresDsn', + 'CockroachDsn', + 'AmqpDsn', + 'RedisDsn', + 'MongoDsn', + 'KafkaDsn', + 'validate_email', +] + +_url_regex_cache = None +_multi_host_url_regex_cache = None +_ascii_domain_regex_cache = None +_int_domain_regex_cache = None +_host_regex_cache = None + +_host_regex = ( + r'(?:' + r'(?P<ipv4>(?:\d{1,3}\.){3}\d{1,3})(?=$|[/:#?])|' # ipv4 + r'(?P<ipv6>\[[A-F0-9]*:[A-F0-9:]+\])(?=$|[/:#?])|' # ipv6 + r'(?P<domain>[^\s/:?#]+)' # domain, validation occurs later + r')?' + r'(?::(?P<port>\d+))?' # port +) +_scheme_regex = r'(?:(?P<scheme>[a-z][a-z0-9+\-.]+)://)?' # scheme https://tools.ietf.org/html/rfc3986#appendix-A +_user_info_regex = r'(?:(?P<user>[^\s:/]*)(?::(?P<password>[^\s/]*))?@)?' +_path_regex = r'(?P<path>/[^\s?#]*)?' +_query_regex = r'(?:\?(?P<query>[^\s#]*))?' +_fragment_regex = r'(?:#(?P<fragment>[^\s#]*))?' + + +def url_regex() -> Pattern[str]: + global _url_regex_cache + if _url_regex_cache is None: + _url_regex_cache = re.compile( + rf'{_scheme_regex}{_user_info_regex}{_host_regex}{_path_regex}{_query_regex}{_fragment_regex}', + re.IGNORECASE, + ) + return _url_regex_cache + + +def multi_host_url_regex() -> Pattern[str]: + """ + Compiled multi host url regex. + + Additionally to `url_regex` it allows to match multiple hosts. + E.g. host1.db.net,host2.db.net + """ + global _multi_host_url_regex_cache + if _multi_host_url_regex_cache is None: + _multi_host_url_regex_cache = re.compile( + rf'{_scheme_regex}{_user_info_regex}' + r'(?P<hosts>([^/]*))' # validation occurs later + rf'{_path_regex}{_query_regex}{_fragment_regex}', + re.IGNORECASE, + ) + return _multi_host_url_regex_cache + + +def ascii_domain_regex() -> Pattern[str]: + global _ascii_domain_regex_cache + if _ascii_domain_regex_cache is None: + ascii_chunk = r'[_0-9a-z](?:[-_0-9a-z]{0,61}[_0-9a-z])?' + ascii_domain_ending = r'(?P<tld>\.[a-z]{2,63})?\.?' + _ascii_domain_regex_cache = re.compile( + fr'(?:{ascii_chunk}\.)*?{ascii_chunk}{ascii_domain_ending}', re.IGNORECASE + ) + return _ascii_domain_regex_cache + + +def int_domain_regex() -> Pattern[str]: + global _int_domain_regex_cache + if _int_domain_regex_cache is None: + int_chunk = r'[_0-9a-\U00040000](?:[-_0-9a-\U00040000]{0,61}[_0-9a-\U00040000])?' + int_domain_ending = r'(?P<tld>(\.[^\W\d_]{2,63})|(\.(?:xn--)[_0-9a-z-]{2,63}))?\.?' + _int_domain_regex_cache = re.compile(fr'(?:{int_chunk}\.)*?{int_chunk}{int_domain_ending}', re.IGNORECASE) + return _int_domain_regex_cache + + +def host_regex() -> Pattern[str]: + global _host_regex_cache + if _host_regex_cache is None: + _host_regex_cache = re.compile( + _host_regex, + re.IGNORECASE, + ) + return _host_regex_cache + + +class AnyUrl(str): + strip_whitespace = True + min_length = 1 + max_length = 2**16 + allowed_schemes: Optional[Collection[str]] = None + tld_required: bool = False + user_required: bool = False + host_required: bool = True + hidden_parts: Set[str] = set() + + __slots__ = ('scheme', 'user', 'password', 'host', 'tld', 'host_type', 'port', 'path', 'query', 'fragment') + + @no_type_check + def __new__(cls, url: Optional[str], **kwargs) -> object: + return str.__new__(cls, cls.build(**kwargs) if url is None else url) + + def __init__( + self, + url: str, + *, + scheme: str, + user: Optional[str] = None, + password: Optional[str] = None, + host: Optional[str] = None, + tld: Optional[str] = None, + host_type: str = 'domain', + port: Optional[str] = None, + path: Optional[str] = None, + query: Optional[str] = None, + fragment: Optional[str] = None, + ) -> None: + str.__init__(url) + self.scheme = scheme + self.user = user + self.password = password + self.host = host + self.tld = tld + self.host_type = host_type + self.port = port + self.path = path + self.query = query + self.fragment = fragment + + @classmethod + def build( + cls, + *, + scheme: str, + user: Optional[str] = None, + password: Optional[str] = None, + host: str, + port: Optional[str] = None, + path: Optional[str] = None, + query: Optional[str] = None, + fragment: Optional[str] = None, + **_kwargs: str, + ) -> str: + parts = Parts( + scheme=scheme, + user=user, + password=password, + host=host, + port=port, + path=path, + query=query, + fragment=fragment, + **_kwargs, # type: ignore[misc] + ) + + url = scheme + '://' + if user: + url += user + if password: + url += ':' + password + if user or password: + url += '@' + url += host + if port and ('port' not in cls.hidden_parts or cls.get_default_parts(parts).get('port') != port): + url += ':' + port + if path: + url += path + if query: + url += '?' + query + if fragment: + url += '#' + fragment + return url + + @classmethod + def __modify_schema__(cls, field_schema: Dict[str, Any]) -> None: + update_not_none(field_schema, minLength=cls.min_length, maxLength=cls.max_length, format='uri') + + @classmethod + def __get_validators__(cls) -> 'CallableGenerator': + yield cls.validate + + @classmethod + def validate(cls, value: Any, field: 'ModelField', config: 'BaseConfig') -> 'AnyUrl': + if value.__class__ == cls: + return value + value = str_validator(value) + if cls.strip_whitespace: + value = value.strip() + url: str = cast(str, constr_length_validator(value, field, config)) + + m = cls._match_url(url) + # the regex should always match, if it doesn't please report with details of the URL tried + assert m, 'URL regex failed unexpectedly' + + original_parts = cast('Parts', m.groupdict()) + parts = cls.apply_default_parts(original_parts) + parts = cls.validate_parts(parts) + + if m.end() != len(url): + raise errors.UrlExtraError(extra=url[m.end() :]) + + return cls._build_url(m, url, parts) + + @classmethod + def _build_url(cls, m: Match[str], url: str, parts: 'Parts') -> 'AnyUrl': + """ + Validate hosts and build the AnyUrl object. Split from `validate` so this method + can be altered in `MultiHostDsn`. + """ + host, tld, host_type, rebuild = cls.validate_host(parts) + + return cls( + None if rebuild else url, + scheme=parts['scheme'], + user=parts['user'], + password=parts['password'], + host=host, + tld=tld, + host_type=host_type, + port=parts['port'], + path=parts['path'], + query=parts['query'], + fragment=parts['fragment'], + ) + + @staticmethod + def _match_url(url: str) -> Optional[Match[str]]: + return url_regex().match(url) + + @staticmethod + def _validate_port(port: Optional[str]) -> None: + if port is not None and int(port) > 65_535: + raise errors.UrlPortError() + + @classmethod + def validate_parts(cls, parts: 'Parts', validate_port: bool = True) -> 'Parts': + """ + A method used to validate parts of a URL. + Could be overridden to set default values for parts if missing + """ + scheme = parts['scheme'] + if scheme is None: + raise errors.UrlSchemeError() + + if cls.allowed_schemes and scheme.lower() not in cls.allowed_schemes: + raise errors.UrlSchemePermittedError(set(cls.allowed_schemes)) + + if validate_port: + cls._validate_port(parts['port']) + + user = parts['user'] + if cls.user_required and user is None: + raise errors.UrlUserInfoError() + + return parts + + @classmethod + def validate_host(cls, parts: 'Parts') -> Tuple[str, Optional[str], str, bool]: + tld, host_type, rebuild = None, None, False + for f in ('domain', 'ipv4', 'ipv6'): + host = parts[f] # type: ignore[literal-required] + if host: + host_type = f + break + + if host is None: + if cls.host_required: + raise errors.UrlHostError() + elif host_type == 'domain': + is_international = False + d = ascii_domain_regex().fullmatch(host) + if d is None: + d = int_domain_regex().fullmatch(host) + if d is None: + raise errors.UrlHostError() + is_international = True + + tld = d.group('tld') + if tld is None and not is_international: + d = int_domain_regex().fullmatch(host) + assert d is not None + tld = d.group('tld') + is_international = True + + if tld is not None: + tld = tld[1:] + elif cls.tld_required: + raise errors.UrlHostTldError() + + if is_international: + host_type = 'int_domain' + rebuild = True + host = host.encode('idna').decode('ascii') + if tld is not None: + tld = tld.encode('idna').decode('ascii') + + return host, tld, host_type, rebuild # type: ignore + + @staticmethod + def get_default_parts(parts: 'Parts') -> 'Parts': + return {} + + @classmethod + def apply_default_parts(cls, parts: 'Parts') -> 'Parts': + for key, value in cls.get_default_parts(parts).items(): + if not parts[key]: # type: ignore[literal-required] + parts[key] = value # type: ignore[literal-required] + return parts + + def __repr__(self) -> str: + extra = ', '.join(f'{n}={getattr(self, n)!r}' for n in self.__slots__ if getattr(self, n) is not None) + return f'{self.__class__.__name__}({super().__repr__()}, {extra})' + + +class AnyHttpUrl(AnyUrl): + allowed_schemes = {'http', 'https'} + + __slots__ = () + + +class HttpUrl(AnyHttpUrl): + tld_required = True + # https://stackoverflow.com/questions/417142/what-is-the-maximum-length-of-a-url-in-different-browsers + max_length = 2083 + hidden_parts = {'port'} + + @staticmethod + def get_default_parts(parts: 'Parts') -> 'Parts': + return {'port': '80' if parts['scheme'] == 'http' else '443'} + + +class FileUrl(AnyUrl): + allowed_schemes = {'file'} + host_required = False + + __slots__ = () + + +class MultiHostDsn(AnyUrl): + __slots__ = AnyUrl.__slots__ + ('hosts',) + + def __init__(self, *args: Any, hosts: Optional[List['HostParts']] = None, **kwargs: Any): + super().__init__(*args, **kwargs) + self.hosts = hosts + + @staticmethod + def _match_url(url: str) -> Optional[Match[str]]: + return multi_host_url_regex().match(url) + + @classmethod + def validate_parts(cls, parts: 'Parts', validate_port: bool = True) -> 'Parts': + return super().validate_parts(parts, validate_port=False) + + @classmethod + def _build_url(cls, m: Match[str], url: str, parts: 'Parts') -> 'MultiHostDsn': + hosts_parts: List['HostParts'] = [] + host_re = host_regex() + for host in m.groupdict()['hosts'].split(','): + d: Parts = host_re.match(host).groupdict() # type: ignore + host, tld, host_type, rebuild = cls.validate_host(d) + port = d.get('port') + cls._validate_port(port) + hosts_parts.append( + { + 'host': host, + 'host_type': host_type, + 'tld': tld, + 'rebuild': rebuild, + 'port': port, + } + ) + + if len(hosts_parts) > 1: + return cls( + None if any([hp['rebuild'] for hp in hosts_parts]) else url, + scheme=parts['scheme'], + user=parts['user'], + password=parts['password'], + path=parts['path'], + query=parts['query'], + fragment=parts['fragment'], + host_type=None, + hosts=hosts_parts, + ) + else: + # backwards compatibility with single host + host_part = hosts_parts[0] + return cls( + None if host_part['rebuild'] else url, + scheme=parts['scheme'], + user=parts['user'], + password=parts['password'], + host=host_part['host'], + tld=host_part['tld'], + host_type=host_part['host_type'], + port=host_part.get('port'), + path=parts['path'], + query=parts['query'], + fragment=parts['fragment'], + ) + + +class PostgresDsn(MultiHostDsn): + allowed_schemes = { + 'postgres', + 'postgresql', + 'postgresql+asyncpg', + 'postgresql+pg8000', + 'postgresql+psycopg', + 'postgresql+psycopg2', + 'postgresql+psycopg2cffi', + 'postgresql+py-postgresql', + 'postgresql+pygresql', + } + user_required = True + + __slots__ = () + + +class CockroachDsn(AnyUrl): + allowed_schemes = { + 'cockroachdb', + 'cockroachdb+psycopg2', + 'cockroachdb+asyncpg', + } + user_required = True + + +class AmqpDsn(AnyUrl): + allowed_schemes = {'amqp', 'amqps'} + host_required = False + + +class RedisDsn(AnyUrl): + __slots__ = () + allowed_schemes = {'redis', 'rediss'} + host_required = False + + @staticmethod + def get_default_parts(parts: 'Parts') -> 'Parts': + return { + 'domain': 'localhost' if not (parts['ipv4'] or parts['ipv6']) else '', + 'port': '6379', + 'path': '/0', + } + + +class MongoDsn(AnyUrl): + allowed_schemes = {'mongodb'} + + # TODO: Needed to generic "Parts" for "Replica Set", "Sharded Cluster", and other mongodb deployment modes + @staticmethod + def get_default_parts(parts: 'Parts') -> 'Parts': + return { + 'port': '27017', + } + + +class KafkaDsn(AnyUrl): + allowed_schemes = {'kafka'} + + @staticmethod + def get_default_parts(parts: 'Parts') -> 'Parts': + return { + 'domain': 'localhost', + 'port': '9092', + } + + +def stricturl( + *, + strip_whitespace: bool = True, + min_length: int = 1, + max_length: int = 2**16, + tld_required: bool = True, + host_required: bool = True, + allowed_schemes: Optional[Collection[str]] = None, +) -> Type[AnyUrl]: + # use kwargs then define conf in a dict to aid with IDE type hinting + namespace = dict( + strip_whitespace=strip_whitespace, + min_length=min_length, + max_length=max_length, + tld_required=tld_required, + host_required=host_required, + allowed_schemes=allowed_schemes, + ) + return type('UrlValue', (AnyUrl,), namespace) + + +def import_email_validator() -> None: + global email_validator + try: + import email_validator + except ImportError as e: + raise ImportError('email-validator is not installed, run `pip install pydantic[email]`') from e + + +class EmailStr(str): + @classmethod + def __modify_schema__(cls, field_schema: Dict[str, Any]) -> None: + field_schema.update(type='string', format='email') + + @classmethod + def __get_validators__(cls) -> 'CallableGenerator': + # included here and below so the error happens straight away + import_email_validator() + + yield str_validator + yield cls.validate + + @classmethod + def validate(cls, value: Union[str]) -> str: + return validate_email(value)[1] + + +class NameEmail(Representation): + __slots__ = 'name', 'email' + + def __init__(self, name: str, email: str): + self.name = name + self.email = email + + def __eq__(self, other: Any) -> bool: + return isinstance(other, NameEmail) and (self.name, self.email) == (other.name, other.email) + + @classmethod + def __modify_schema__(cls, field_schema: Dict[str, Any]) -> None: + field_schema.update(type='string', format='name-email') + + @classmethod + def __get_validators__(cls) -> 'CallableGenerator': + import_email_validator() + + yield cls.validate + + @classmethod + def validate(cls, value: Any) -> 'NameEmail': + if value.__class__ == cls: + return value + value = str_validator(value) + return cls(*validate_email(value)) + + def __str__(self) -> str: + return f'{self.name} <{self.email}>' + + +class IPvAnyAddress(_BaseAddress): + __slots__ = () + + @classmethod + def __modify_schema__(cls, field_schema: Dict[str, Any]) -> None: + field_schema.update(type='string', format='ipvanyaddress') + + @classmethod + def __get_validators__(cls) -> 'CallableGenerator': + yield cls.validate + + @classmethod + def validate(cls, value: Union[str, bytes, int]) -> Union[IPv4Address, IPv6Address]: + try: + return IPv4Address(value) + except ValueError: + pass + + try: + return IPv6Address(value) + except ValueError: + raise errors.IPvAnyAddressError() + + +class IPvAnyInterface(_BaseAddress): + __slots__ = () + + @classmethod + def __modify_schema__(cls, field_schema: Dict[str, Any]) -> None: + field_schema.update(type='string', format='ipvanyinterface') + + @classmethod + def __get_validators__(cls) -> 'CallableGenerator': + yield cls.validate + + @classmethod + def validate(cls, value: NetworkType) -> Union[IPv4Interface, IPv6Interface]: + try: + return IPv4Interface(value) + except ValueError: + pass + + try: + return IPv6Interface(value) + except ValueError: + raise errors.IPvAnyInterfaceError() + + +class IPvAnyNetwork(_BaseNetwork): # type: ignore + @classmethod + def __modify_schema__(cls, field_schema: Dict[str, Any]) -> None: + field_schema.update(type='string', format='ipvanynetwork') + + @classmethod + def __get_validators__(cls) -> 'CallableGenerator': + yield cls.validate + + @classmethod + def validate(cls, value: NetworkType) -> Union[IPv4Network, IPv6Network]: + # Assume IP Network is defined with a default value for ``strict`` argument. + # Define your own class if you want to specify network address check strictness. + try: + return IPv4Network(value) + except ValueError: + pass + + try: + return IPv6Network(value) + except ValueError: + raise errors.IPvAnyNetworkError() + + +pretty_email_regex = re.compile(r'([\w ]*?) *<(.*)> *') +MAX_EMAIL_LENGTH = 2048 +"""Maximum length for an email. +A somewhat arbitrary but very generous number compared to what is allowed by most implementations. +""" + + +def validate_email(value: Union[str]) -> Tuple[str, str]: + """ + Email address validation using https://pypi.org/project/email-validator/ + Notes: + * raw ip address (literal) domain parts are not allowed. + * "John Doe <local_part@domain.com>" style "pretty" email addresses are processed + * spaces are striped from the beginning and end of addresses but no error is raised + """ + if email_validator is None: + import_email_validator() + + if len(value) > MAX_EMAIL_LENGTH: + raise errors.EmailError() + + m = pretty_email_regex.fullmatch(value) + name: Union[str, None] = None + if m: + name, value = m.groups() + email = value.strip() + try: + parts = email_validator.validate_email(email, check_deliverability=False) + except email_validator.EmailNotValidError as e: + raise errors.EmailError from e + + if hasattr(parts, 'normalized'): + # email-validator >= 2 + email = parts.normalized + assert email is not None + name = name or parts.local_part + return name, email + else: + # email-validator >1, <2 + at_index = email.index('@') + local_part = email[:at_index] # RFC 5321, local part must be case-sensitive. + global_part = email[at_index:].lower() + + return name or local_part, local_part + global_part |