diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/yarl/_url.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/yarl/_url.py | 1584 |
1 files changed, 1584 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/yarl/_url.py b/.venv/lib/python3.12/site-packages/yarl/_url.py new file mode 100644 index 00000000..4e4b8a37 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/yarl/_url.py @@ -0,0 +1,1584 @@ +import re +import sys +import warnings +from collections.abc import Mapping, Sequence +from enum import Enum +from functools import _CacheInfo, lru_cache +from ipaddress import ip_address +from typing import TYPE_CHECKING, Any, TypedDict, TypeVar, Union, overload +from urllib.parse import SplitResult, parse_qsl, uses_relative + +import idna +from multidict import MultiDict, MultiDictProxy +from propcache.api import under_cached_property as cached_property + +from ._parse import ( + USES_AUTHORITY, + SplitURLType, + make_netloc, + split_netloc, + split_url, + unsplit_result, +) +from ._path import normalize_path, normalize_path_segments +from ._query import ( + Query, + QueryVariable, + SimpleQuery, + get_str_query, + get_str_query_from_iterable, + get_str_query_from_sequence_iterable, +) +from ._quoters import ( + FRAGMENT_QUOTER, + FRAGMENT_REQUOTER, + PATH_QUOTER, + PATH_REQUOTER, + PATH_SAFE_UNQUOTER, + PATH_UNQUOTER, + QS_UNQUOTER, + QUERY_QUOTER, + QUERY_REQUOTER, + QUOTER, + REQUOTER, + UNQUOTER, + human_quote, +) + +DEFAULT_PORTS = {"http": 80, "https": 443, "ws": 80, "wss": 443, "ftp": 21} +USES_RELATIVE = frozenset(uses_relative) + +# Special schemes https://url.spec.whatwg.org/#special-scheme +# are not allowed to have an empty host https://url.spec.whatwg.org/#url-representation +SCHEME_REQUIRES_HOST = frozenset(("http", "https", "ws", "wss", "ftp")) + + +# reg-name: unreserved / pct-encoded / sub-delims +# this pattern matches anything that is *not* in those classes. and is only used +# on lower-cased ASCII values. +NOT_REG_NAME = re.compile( + r""" + # any character not in the unreserved or sub-delims sets, plus % + # (validated with the additional check for pct-encoded sequences below) + [^a-z0-9\-._~!$&'()*+,;=%] + | + # % only allowed if it is part of a pct-encoded + # sequence of 2 hex digits. + %(?![0-9a-f]{2}) + """, + re.VERBOSE, +) + +_T = TypeVar("_T") + +if sys.version_info >= (3, 11): + from typing import Self +else: + Self = Any + + +class UndefinedType(Enum): + """Singleton type for use with not set sentinel values.""" + + _singleton = 0 + + +UNDEFINED = UndefinedType._singleton + + +class CacheInfo(TypedDict): + """Host encoding cache.""" + + idna_encode: _CacheInfo + idna_decode: _CacheInfo + ip_address: _CacheInfo + host_validate: _CacheInfo + encode_host: _CacheInfo + + +class _InternalURLCache(TypedDict, total=False): + _val: SplitURLType + _origin: "URL" + absolute: bool + scheme: str + raw_authority: str + authority: str + raw_user: Union[str, None] + user: Union[str, None] + raw_password: Union[str, None] + password: Union[str, None] + raw_host: Union[str, None] + host: Union[str, None] + host_subcomponent: Union[str, None] + host_port_subcomponent: Union[str, None] + port: Union[int, None] + explicit_port: Union[int, None] + raw_path: str + path: str + _parsed_query: list[tuple[str, str]] + query: "MultiDictProxy[str]" + raw_query_string: str + query_string: str + path_qs: str + raw_path_qs: str + raw_fragment: str + fragment: str + raw_parts: tuple[str, ...] + parts: tuple[str, ...] + parent: "URL" + raw_name: str + name: str + raw_suffix: str + suffix: str + raw_suffixes: tuple[str, ...] + suffixes: tuple[str, ...] + + +def rewrite_module(obj: _T) -> _T: + obj.__module__ = "yarl" + return obj + + +@lru_cache +def encode_url(url_str: str) -> "URL": + """Parse unencoded URL.""" + cache: _InternalURLCache = {} + host: Union[str, None] + scheme, netloc, path, query, fragment = split_url(url_str) + if not netloc: # netloc + host = "" + else: + if ":" in netloc or "@" in netloc or "[" in netloc: + # Complex netloc + username, password, host, port = split_netloc(netloc) + else: + username = password = port = None + host = netloc + if host is None: + if scheme in SCHEME_REQUIRES_HOST: + msg = ( + "Invalid URL: host is required for " + f"absolute urls with the {scheme} scheme" + ) + raise ValueError(msg) + else: + host = "" + host = _encode_host(host, validate_host=False) + # Remove brackets as host encoder adds back brackets for IPv6 addresses + cache["raw_host"] = host[1:-1] if "[" in host else host + cache["explicit_port"] = port + if password is None and username is None: + # Fast path for URLs without user, password + netloc = host if port is None else f"{host}:{port}" + cache["raw_user"] = None + cache["raw_password"] = None + else: + raw_user = REQUOTER(username) if username else username + raw_password = REQUOTER(password) if password else password + netloc = make_netloc(raw_user, raw_password, host, port) + cache["raw_user"] = raw_user + cache["raw_password"] = raw_password + + if path: + path = PATH_REQUOTER(path) + if netloc and "." in path: + path = normalize_path(path) + if query: + query = QUERY_REQUOTER(query) + if fragment: + fragment = FRAGMENT_REQUOTER(fragment) + + cache["scheme"] = scheme + cache["raw_path"] = "/" if not path and netloc else path + cache["raw_query_string"] = query + cache["raw_fragment"] = fragment + + self = object.__new__(URL) + self._scheme = scheme + self._netloc = netloc + self._path = path + self._query = query + self._fragment = fragment + self._cache = cache + return self + + +@lru_cache +def pre_encoded_url(url_str: str) -> "URL": + """Parse pre-encoded URL.""" + self = object.__new__(URL) + val = split_url(url_str) + self._scheme, self._netloc, self._path, self._query, self._fragment = val + self._cache = {} + return self + + +@lru_cache +def build_pre_encoded_url( + scheme: str, + authority: str, + user: Union[str, None], + password: Union[str, None], + host: str, + port: Union[int, None], + path: str, + query_string: str, + fragment: str, +) -> "URL": + """Build a pre-encoded URL from parts.""" + self = object.__new__(URL) + self._scheme = scheme + if authority: + self._netloc = authority + elif host: + if port is not None: + port = None if port == DEFAULT_PORTS.get(scheme) else port + if user is None and password is None: + self._netloc = host if port is None else f"{host}:{port}" + else: + self._netloc = make_netloc(user, password, host, port) + else: + self._netloc = "" + self._path = path + self._query = query_string + self._fragment = fragment + self._cache = {} + return self + + +def from_parts_uncached( + scheme: str, netloc: str, path: str, query: str, fragment: str +) -> "URL": + """Create a new URL from parts.""" + self = object.__new__(URL) + self._scheme = scheme + self._netloc = netloc + self._path = path + self._query = query + self._fragment = fragment + self._cache = {} + return self + + +from_parts = lru_cache(from_parts_uncached) + + +@rewrite_module +class URL: + # Don't derive from str + # follow pathlib.Path design + # probably URL will not suffer from pathlib problems: + # it's intended for libraries like aiohttp, + # not to be passed into standard library functions like os.open etc. + + # URL grammar (RFC 3986) + # pct-encoded = "%" HEXDIG HEXDIG + # reserved = gen-delims / sub-delims + # gen-delims = ":" / "/" / "?" / "#" / "[" / "]" / "@" + # sub-delims = "!" / "$" / "&" / "'" / "(" / ")" + # / "*" / "+" / "," / ";" / "=" + # unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~" + # URI = scheme ":" hier-part [ "?" query ] [ "#" fragment ] + # hier-part = "//" authority path-abempty + # / path-absolute + # / path-rootless + # / path-empty + # scheme = ALPHA *( ALPHA / DIGIT / "+" / "-" / "." ) + # authority = [ userinfo "@" ] host [ ":" port ] + # userinfo = *( unreserved / pct-encoded / sub-delims / ":" ) + # host = IP-literal / IPv4address / reg-name + # IP-literal = "[" ( IPv6address / IPvFuture ) "]" + # IPvFuture = "v" 1*HEXDIG "." 1*( unreserved / sub-delims / ":" ) + # IPv6address = 6( h16 ":" ) ls32 + # / "::" 5( h16 ":" ) ls32 + # / [ h16 ] "::" 4( h16 ":" ) ls32 + # / [ *1( h16 ":" ) h16 ] "::" 3( h16 ":" ) ls32 + # / [ *2( h16 ":" ) h16 ] "::" 2( h16 ":" ) ls32 + # / [ *3( h16 ":" ) h16 ] "::" h16 ":" ls32 + # / [ *4( h16 ":" ) h16 ] "::" ls32 + # / [ *5( h16 ":" ) h16 ] "::" h16 + # / [ *6( h16 ":" ) h16 ] "::" + # ls32 = ( h16 ":" h16 ) / IPv4address + # ; least-significant 32 bits of address + # h16 = 1*4HEXDIG + # ; 16 bits of address represented in hexadecimal + # IPv4address = dec-octet "." dec-octet "." dec-octet "." dec-octet + # dec-octet = DIGIT ; 0-9 + # / %x31-39 DIGIT ; 10-99 + # / "1" 2DIGIT ; 100-199 + # / "2" %x30-34 DIGIT ; 200-249 + # / "25" %x30-35 ; 250-255 + # reg-name = *( unreserved / pct-encoded / sub-delims ) + # port = *DIGIT + # path = path-abempty ; begins with "/" or is empty + # / path-absolute ; begins with "/" but not "//" + # / path-noscheme ; begins with a non-colon segment + # / path-rootless ; begins with a segment + # / path-empty ; zero characters + # path-abempty = *( "/" segment ) + # path-absolute = "/" [ segment-nz *( "/" segment ) ] + # path-noscheme = segment-nz-nc *( "/" segment ) + # path-rootless = segment-nz *( "/" segment ) + # path-empty = 0<pchar> + # segment = *pchar + # segment-nz = 1*pchar + # segment-nz-nc = 1*( unreserved / pct-encoded / sub-delims / "@" ) + # ; non-zero-length segment without any colon ":" + # pchar = unreserved / pct-encoded / sub-delims / ":" / "@" + # query = *( pchar / "/" / "?" ) + # fragment = *( pchar / "/" / "?" ) + # URI-reference = URI / relative-ref + # relative-ref = relative-part [ "?" query ] [ "#" fragment ] + # relative-part = "//" authority path-abempty + # / path-absolute + # / path-noscheme + # / path-empty + # absolute-URI = scheme ":" hier-part [ "?" query ] + __slots__ = ("_cache", "_scheme", "_netloc", "_path", "_query", "_fragment") + + _scheme: str + _netloc: str + _path: str + _query: str + _fragment: str + + def __new__( + cls, + val: Union[str, SplitResult, "URL", UndefinedType] = UNDEFINED, + *, + encoded: bool = False, + strict: Union[bool, None] = None, + ) -> "URL": + if strict is not None: # pragma: no cover + warnings.warn("strict parameter is ignored") + if type(val) is str: + return pre_encoded_url(val) if encoded else encode_url(val) + if type(val) is cls: + return val + if type(val) is SplitResult: + if not encoded: + raise ValueError("Cannot apply decoding to SplitResult") + return from_parts(*val) + if isinstance(val, str): + return pre_encoded_url(str(val)) if encoded else encode_url(str(val)) + if val is UNDEFINED: + # Special case for UNDEFINED since it might be unpickling and we do + # not want to cache as the `__set_state__` call would mutate the URL + # object in the `pre_encoded_url` or `encoded_url` caches. + self = object.__new__(URL) + self._scheme = self._netloc = self._path = self._query = self._fragment = "" + self._cache = {} + return self + raise TypeError("Constructor parameter should be str") + + @classmethod + def build( + cls, + *, + scheme: str = "", + authority: str = "", + user: Union[str, None] = None, + password: Union[str, None] = None, + host: str = "", + port: Union[int, None] = None, + path: str = "", + query: Union[Query, None] = None, + query_string: str = "", + fragment: str = "", + encoded: bool = False, + ) -> "URL": + """Creates and returns a new URL""" + + if authority and (user or password or host or port): + raise ValueError( + 'Can\'t mix "authority" with "user", "password", "host" or "port".' + ) + if port is not None and not isinstance(port, int): + raise TypeError(f"The port is required to be int, got {type(port)!r}.") + if port and not host: + raise ValueError('Can\'t build URL with "port" but without "host".') + if query and query_string: + raise ValueError('Only one of "query" or "query_string" should be passed') + if ( + scheme is None + or authority is None + or host is None + or path is None + or query_string is None + or fragment is None + ): + raise TypeError( + 'NoneType is illegal for "scheme", "authority", "host", "path", ' + '"query_string", and "fragment" args, use empty string instead.' + ) + + if query: + query_string = get_str_query(query) or "" + + if encoded: + return build_pre_encoded_url( + scheme, + authority, + user, + password, + host, + port, + path, + query_string, + fragment, + ) + + self = object.__new__(URL) + self._scheme = scheme + _host: Union[str, None] = None + if authority: + user, password, _host, port = split_netloc(authority) + _host = _encode_host(_host, validate_host=False) if _host else "" + elif host: + _host = _encode_host(host, validate_host=True) + else: + self._netloc = "" + + if _host is not None: + if port is not None: + port = None if port == DEFAULT_PORTS.get(scheme) else port + if user is None and password is None: + self._netloc = _host if port is None else f"{_host}:{port}" + else: + self._netloc = make_netloc(user, password, _host, port, True) + + path = PATH_QUOTER(path) if path else path + if path and self._netloc: + if "." in path: + path = normalize_path(path) + if path[0] != "/": + msg = ( + "Path in a URL with authority should " + "start with a slash ('/') if set" + ) + raise ValueError(msg) + + self._path = path + if not query and query_string: + query_string = QUERY_QUOTER(query_string) + self._query = query_string + self._fragment = FRAGMENT_QUOTER(fragment) if fragment else fragment + self._cache = {} + return self + + def __init_subclass__(cls): + raise TypeError(f"Inheriting a class {cls!r} from URL is forbidden") + + def __str__(self) -> str: + if not self._path and self._netloc and (self._query or self._fragment): + path = "/" + else: + path = self._path + if (port := self.explicit_port) is not None and port == DEFAULT_PORTS.get( + self._scheme + ): + # port normalization - using None for default ports to remove from rendering + # https://datatracker.ietf.org/doc/html/rfc3986.html#section-6.2.3 + host = self.host_subcomponent + netloc = make_netloc(self.raw_user, self.raw_password, host, None) + else: + netloc = self._netloc + return unsplit_result(self._scheme, netloc, path, self._query, self._fragment) + + def __repr__(self) -> str: + return f"{self.__class__.__name__}('{str(self)}')" + + def __bytes__(self) -> bytes: + return str(self).encode("ascii") + + def __eq__(self, other: object) -> bool: + if type(other) is not URL: + return NotImplemented + + path1 = "/" if not self._path and self._netloc else self._path + path2 = "/" if not other._path and other._netloc else other._path + return ( + self._scheme == other._scheme + and self._netloc == other._netloc + and path1 == path2 + and self._query == other._query + and self._fragment == other._fragment + ) + + def __hash__(self) -> int: + if (ret := self._cache.get("hash")) is None: + path = "/" if not self._path and self._netloc else self._path + ret = self._cache["hash"] = hash( + (self._scheme, self._netloc, path, self._query, self._fragment) + ) + return ret + + def __le__(self, other: object) -> bool: + if type(other) is not URL: + return NotImplemented + return self._val <= other._val + + def __lt__(self, other: object) -> bool: + if type(other) is not URL: + return NotImplemented + return self._val < other._val + + def __ge__(self, other: object) -> bool: + if type(other) is not URL: + return NotImplemented + return self._val >= other._val + + def __gt__(self, other: object) -> bool: + if type(other) is not URL: + return NotImplemented + return self._val > other._val + + def __truediv__(self, name: str) -> "URL": + if not isinstance(name, str): + return NotImplemented + return self._make_child((str(name),)) + + def __mod__(self, query: Query) -> "URL": + return self.update_query(query) + + def __bool__(self) -> bool: + return bool(self._netloc or self._path or self._query or self._fragment) + + def __getstate__(self) -> tuple[SplitResult]: + return (tuple.__new__(SplitResult, self._val),) + + def __setstate__(self, state): + if state[0] is None and isinstance(state[1], dict): + # default style pickle + val = state[1]["_val"] + else: + val, *unused = state + self._scheme, self._netloc, self._path, self._query, self._fragment = val + self._cache = {} + + def _cache_netloc(self) -> None: + """Cache the netloc parts of the URL.""" + c = self._cache + split_loc = split_netloc(self._netloc) + c["raw_user"], c["raw_password"], c["raw_host"], c["explicit_port"] = split_loc + + def is_absolute(self) -> bool: + """A check for absolute URLs. + + Return True for absolute ones (having scheme or starting + with //), False otherwise. + + Is is preferred to call the .absolute property instead + as it is cached. + """ + return self.absolute + + def is_default_port(self) -> bool: + """A check for default port. + + Return True if port is default for specified scheme, + e.g. 'http://python.org' or 'http://python.org:80', False + otherwise. + + Return False for relative URLs. + + """ + if (explicit := self.explicit_port) is None: + # If the explicit port is None, then the URL must be + # using the default port unless its a relative URL + # which does not have an implicit port / default port + return self._netloc != "" + return explicit == DEFAULT_PORTS.get(self._scheme) + + def origin(self) -> "URL": + """Return an URL with scheme, host and port parts only. + + user, password, path, query and fragment are removed. + + """ + # TODO: add a keyword-only option for keeping user/pass maybe? + return self._origin + + @cached_property + def _val(self) -> SplitURLType: + return (self._scheme, self._netloc, self._path, self._query, self._fragment) + + @cached_property + def _origin(self) -> "URL": + """Return an URL with scheme, host and port parts only. + + user, password, path, query and fragment are removed. + """ + if not (netloc := self._netloc): + raise ValueError("URL should be absolute") + if not (scheme := self._scheme): + raise ValueError("URL should have scheme") + if "@" in netloc: + encoded_host = self.host_subcomponent + netloc = make_netloc(None, None, encoded_host, self.explicit_port) + elif not self._path and not self._query and not self._fragment: + return self + return from_parts(scheme, netloc, "", "", "") + + def relative(self) -> "URL": + """Return a relative part of the URL. + + scheme, user, password, host and port are removed. + + """ + if not self._netloc: + raise ValueError("URL should be absolute") + return from_parts("", "", self._path, self._query, self._fragment) + + @cached_property + def absolute(self) -> bool: + """A check for absolute URLs. + + Return True for absolute ones (having scheme or starting + with //), False otherwise. + + """ + # `netloc`` is an empty string for relative URLs + # Checking `netloc` is faster than checking `hostname` + # because `hostname` is a property that does some extra work + # to parse the host from the `netloc` + return self._netloc != "" + + @cached_property + def scheme(self) -> str: + """Scheme for absolute URLs. + + Empty string for relative URLs or URLs starting with // + + """ + return self._scheme + + @cached_property + def raw_authority(self) -> str: + """Encoded authority part of URL. + + Empty string for relative URLs. + + """ + return self._netloc + + @cached_property + def authority(self) -> str: + """Decoded authority part of URL. + + Empty string for relative URLs. + + """ + return make_netloc(self.user, self.password, self.host, self.port) + + @cached_property + def raw_user(self) -> Union[str, None]: + """Encoded user part of URL. + + None if user is missing. + + """ + # not .username + self._cache_netloc() + return self._cache["raw_user"] + + @cached_property + def user(self) -> Union[str, None]: + """Decoded user part of URL. + + None if user is missing. + + """ + if (raw_user := self.raw_user) is None: + return None + return UNQUOTER(raw_user) + + @cached_property + def raw_password(self) -> Union[str, None]: + """Encoded password part of URL. + + None if password is missing. + + """ + self._cache_netloc() + return self._cache["raw_password"] + + @cached_property + def password(self) -> Union[str, None]: + """Decoded password part of URL. + + None if password is missing. + + """ + if (raw_password := self.raw_password) is None: + return None + return UNQUOTER(raw_password) + + @cached_property + def raw_host(self) -> Union[str, None]: + """Encoded host part of URL. + + None for relative URLs. + + When working with IPv6 addresses, use the `host_subcomponent` property instead + as it will return the host subcomponent with brackets. + """ + # Use host instead of hostname for sake of shortness + # May add .hostname prop later + self._cache_netloc() + return self._cache["raw_host"] + + @cached_property + def host(self) -> Union[str, None]: + """Decoded host part of URL. + + None for relative URLs. + + """ + if (raw := self.raw_host) is None: + return None + if raw and raw[-1].isdigit() or ":" in raw: + # IP addresses are never IDNA encoded + return raw + return _idna_decode(raw) + + @cached_property + def host_subcomponent(self) -> Union[str, None]: + """Return the host subcomponent part of URL. + + None for relative URLs. + + https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.2 + + `IP-literal = "[" ( IPv6address / IPvFuture ) "]"` + + Examples: + - `http://example.com:8080` -> `example.com` + - `http://example.com:80` -> `example.com` + - `https://127.0.0.1:8443` -> `127.0.0.1` + - `https://[::1]:8443` -> `[::1]` + - `http://[::1]` -> `[::1]` + + """ + if (raw := self.raw_host) is None: + return None + return f"[{raw}]" if ":" in raw else raw + + @cached_property + def host_port_subcomponent(self) -> Union[str, None]: + """Return the host and port subcomponent part of URL. + + Trailing dots are removed from the host part. + + This value is suitable for use in the Host header of an HTTP request. + + None for relative URLs. + + https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.2 + `IP-literal = "[" ( IPv6address / IPvFuture ) "]"` + https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.3 + port = *DIGIT + + Examples: + - `http://example.com:8080` -> `example.com:8080` + - `http://example.com:80` -> `example.com` + - `http://example.com.:80` -> `example.com` + - `https://127.0.0.1:8443` -> `127.0.0.1:8443` + - `https://[::1]:8443` -> `[::1]:8443` + - `http://[::1]` -> `[::1]` + + """ + if (raw := self.raw_host) is None: + return None + if raw[-1] == ".": + # Remove all trailing dots from the netloc as while + # they are valid FQDNs in DNS, TLS validation fails. + # See https://github.com/aio-libs/aiohttp/issues/3636. + # To avoid string manipulation we only call rstrip if + # the last character is a dot. + raw = raw.rstrip(".") + port = self.explicit_port + if port is None or port == DEFAULT_PORTS.get(self._scheme): + return f"[{raw}]" if ":" in raw else raw + return f"[{raw}]:{port}" if ":" in raw else f"{raw}:{port}" + + @cached_property + def port(self) -> Union[int, None]: + """Port part of URL, with scheme-based fallback. + + None for relative URLs or URLs without explicit port and + scheme without default port substitution. + + """ + if (explicit_port := self.explicit_port) is not None: + return explicit_port + return DEFAULT_PORTS.get(self._scheme) + + @cached_property + def explicit_port(self) -> Union[int, None]: + """Port part of URL, without scheme-based fallback. + + None for relative URLs or URLs without explicit port. + + """ + self._cache_netloc() + return self._cache["explicit_port"] + + @cached_property + def raw_path(self) -> str: + """Encoded path of URL. + + / for absolute URLs without path part. + + """ + return self._path if self._path or not self._netloc else "/" + + @cached_property + def path(self) -> str: + """Decoded path of URL. + + / for absolute URLs without path part. + + """ + return PATH_UNQUOTER(self._path) if self._path else "/" if self._netloc else "" + + @cached_property + def path_safe(self) -> str: + """Decoded path of URL. + + / for absolute URLs without path part. + + / (%2F) and % (%25) are not decoded + + """ + if self._path: + return PATH_SAFE_UNQUOTER(self._path) + return "/" if self._netloc else "" + + @cached_property + def _parsed_query(self) -> list[tuple[str, str]]: + """Parse query part of URL.""" + return parse_qsl(self._query, keep_blank_values=True) + + @cached_property + def query(self) -> "MultiDictProxy[str]": + """A MultiDictProxy representing parsed query parameters in decoded + representation. + + Empty value if URL has no query part. + + """ + return MultiDictProxy(MultiDict(self._parsed_query)) + + @cached_property + def raw_query_string(self) -> str: + """Encoded query part of URL. + + Empty string if query is missing. + + """ + return self._query + + @cached_property + def query_string(self) -> str: + """Decoded query part of URL. + + Empty string if query is missing. + + """ + return QS_UNQUOTER(self._query) if self._query else "" + + @cached_property + def path_qs(self) -> str: + """Decoded path of URL with query.""" + return self.path if not (q := self.query_string) else f"{self.path}?{q}" + + @cached_property + def raw_path_qs(self) -> str: + """Encoded path of URL with query.""" + if q := self._query: + return f"{self._path}?{q}" if self._path or not self._netloc else f"/?{q}" + return self._path if self._path or not self._netloc else "/" + + @cached_property + def raw_fragment(self) -> str: + """Encoded fragment part of URL. + + Empty string if fragment is missing. + + """ + return self._fragment + + @cached_property + def fragment(self) -> str: + """Decoded fragment part of URL. + + Empty string if fragment is missing. + + """ + return UNQUOTER(self._fragment) if self._fragment else "" + + @cached_property + def raw_parts(self) -> tuple[str, ...]: + """A tuple containing encoded *path* parts. + + ('/',) for absolute URLs if *path* is missing. + + """ + path = self._path + if self._netloc: + return ("/", *path[1:].split("/")) if path else ("/",) + if path and path[0] == "/": + return ("/", *path[1:].split("/")) + return tuple(path.split("/")) + + @cached_property + def parts(self) -> tuple[str, ...]: + """A tuple containing decoded *path* parts. + + ('/',) for absolute URLs if *path* is missing. + + """ + return tuple(UNQUOTER(part) for part in self.raw_parts) + + @cached_property + def parent(self) -> "URL": + """A new URL with last part of path removed and cleaned up query and + fragment. + + """ + path = self._path + if not path or path == "/": + if self._fragment or self._query: + return from_parts(self._scheme, self._netloc, path, "", "") + return self + parts = path.split("/") + return from_parts(self._scheme, self._netloc, "/".join(parts[:-1]), "", "") + + @cached_property + def raw_name(self) -> str: + """The last part of raw_parts.""" + parts = self.raw_parts + if not self._netloc: + return parts[-1] + parts = parts[1:] + return parts[-1] if parts else "" + + @cached_property + def name(self) -> str: + """The last part of parts.""" + return UNQUOTER(self.raw_name) + + @cached_property + def raw_suffix(self) -> str: + name = self.raw_name + i = name.rfind(".") + return name[i:] if 0 < i < len(name) - 1 else "" + + @cached_property + def suffix(self) -> str: + return UNQUOTER(self.raw_suffix) + + @cached_property + def raw_suffixes(self) -> tuple[str, ...]: + name = self.raw_name + if name.endswith("."): + return () + name = name.lstrip(".") + return tuple("." + suffix for suffix in name.split(".")[1:]) + + @cached_property + def suffixes(self) -> tuple[str, ...]: + return tuple(UNQUOTER(suffix) for suffix in self.raw_suffixes) + + def _make_child(self, paths: "Sequence[str]", encoded: bool = False) -> "URL": + """ + add paths to self._path, accounting for absolute vs relative paths, + keep existing, but do not create new, empty segments + """ + parsed: list[str] = [] + needs_normalize: bool = False + for idx, path in enumerate(reversed(paths)): + # empty segment of last is not removed + last = idx == 0 + if path and path[0] == "/": + raise ValueError( + f"Appending path {path!r} starting from slash is forbidden" + ) + # We need to quote the path if it is not already encoded + # This cannot be done at the end because the existing + # path is already quoted and we do not want to double quote + # the existing path. + path = path if encoded else PATH_QUOTER(path) + needs_normalize |= "." in path + segments = path.split("/") + segments.reverse() + # remove trailing empty segment for all but the last path + parsed += segments[1:] if not last and segments[0] == "" else segments + + if (path := self._path) and (old_segments := path.split("/")): + # If the old path ends with a slash, the last segment is an empty string + # and should be removed before adding the new path segments. + old = old_segments[:-1] if old_segments[-1] == "" else old_segments + old.reverse() + parsed += old + + # If the netloc is present, inject a leading slash when adding a + # path to an absolute URL where there was none before. + if (netloc := self._netloc) and parsed and parsed[-1] != "": + parsed.append("") + + parsed.reverse() + if not netloc or not needs_normalize: + return from_parts(self._scheme, netloc, "/".join(parsed), "", "") + + path = "/".join(normalize_path_segments(parsed)) + # If normalizing the path segments removed the leading slash, add it back. + if path and path[0] != "/": + path = f"/{path}" + return from_parts(self._scheme, netloc, path, "", "") + + def with_scheme(self, scheme: str) -> "URL": + """Return a new URL with scheme replaced.""" + # N.B. doesn't cleanup query/fragment + if not isinstance(scheme, str): + raise TypeError("Invalid scheme type") + lower_scheme = scheme.lower() + netloc = self._netloc + if not netloc and lower_scheme in SCHEME_REQUIRES_HOST: + msg = ( + "scheme replacement is not allowed for " + f"relative URLs for the {lower_scheme} scheme" + ) + raise ValueError(msg) + return from_parts(lower_scheme, netloc, self._path, self._query, self._fragment) + + def with_user(self, user: Union[str, None]) -> "URL": + """Return a new URL with user replaced. + + Autoencode user if needed. + + Clear user/password if user is None. + + """ + # N.B. doesn't cleanup query/fragment + if user is None: + password = None + elif isinstance(user, str): + user = QUOTER(user) + password = self.raw_password + else: + raise TypeError("Invalid user type") + if not (netloc := self._netloc): + raise ValueError("user replacement is not allowed for relative URLs") + encoded_host = self.host_subcomponent or "" + netloc = make_netloc(user, password, encoded_host, self.explicit_port) + return from_parts(self._scheme, netloc, self._path, self._query, self._fragment) + + def with_password(self, password: Union[str, None]) -> "URL": + """Return a new URL with password replaced. + + Autoencode password if needed. + + Clear password if argument is None. + + """ + # N.B. doesn't cleanup query/fragment + if password is None: + pass + elif isinstance(password, str): + password = QUOTER(password) + else: + raise TypeError("Invalid password type") + if not (netloc := self._netloc): + raise ValueError("password replacement is not allowed for relative URLs") + encoded_host = self.host_subcomponent or "" + port = self.explicit_port + netloc = make_netloc(self.raw_user, password, encoded_host, port) + return from_parts(self._scheme, netloc, self._path, self._query, self._fragment) + + def with_host(self, host: str) -> "URL": + """Return a new URL with host replaced. + + Autoencode host if needed. + + Changing host for relative URLs is not allowed, use .join() + instead. + + """ + # N.B. doesn't cleanup query/fragment + if not isinstance(host, str): + raise TypeError("Invalid host type") + if not (netloc := self._netloc): + raise ValueError("host replacement is not allowed for relative URLs") + if not host: + raise ValueError("host removing is not allowed") + encoded_host = _encode_host(host, validate_host=True) if host else "" + port = self.explicit_port + netloc = make_netloc(self.raw_user, self.raw_password, encoded_host, port) + return from_parts(self._scheme, netloc, self._path, self._query, self._fragment) + + def with_port(self, port: Union[int, None]) -> "URL": + """Return a new URL with port replaced. + + Clear port to default if None is passed. + + """ + # N.B. doesn't cleanup query/fragment + if port is not None: + if isinstance(port, bool) or not isinstance(port, int): + raise TypeError(f"port should be int or None, got {type(port)}") + if not (0 <= port <= 65535): + raise ValueError(f"port must be between 0 and 65535, got {port}") + if not (netloc := self._netloc): + raise ValueError("port replacement is not allowed for relative URLs") + encoded_host = self.host_subcomponent or "" + netloc = make_netloc(self.raw_user, self.raw_password, encoded_host, port) + return from_parts(self._scheme, netloc, self._path, self._query, self._fragment) + + def with_path( + self, + path: str, + *, + encoded: bool = False, + keep_query: bool = False, + keep_fragment: bool = False, + ) -> "URL": + """Return a new URL with path replaced.""" + netloc = self._netloc + if not encoded: + path = PATH_QUOTER(path) + if netloc: + path = normalize_path(path) if "." in path else path + if path and path[0] != "/": + path = f"/{path}" + query = self._query if keep_query else "" + fragment = self._fragment if keep_fragment else "" + return from_parts(self._scheme, netloc, path, query, fragment) + + @overload + def with_query(self, query: Query) -> "URL": ... + + @overload + def with_query(self, **kwargs: QueryVariable) -> "URL": ... + + def with_query(self, *args: Any, **kwargs: Any) -> "URL": + """Return a new URL with query part replaced. + + Accepts any Mapping (e.g. dict, multidict.MultiDict instances) + or str, autoencode the argument if needed. + + A sequence of (key, value) pairs is supported as well. + + It also can take an arbitrary number of keyword arguments. + + Clear query if None is passed. + + """ + # N.B. doesn't cleanup query/fragment + query = get_str_query(*args, **kwargs) or "" + return from_parts_uncached( + self._scheme, self._netloc, self._path, query, self._fragment + ) + + @overload + def extend_query(self, query: Query) -> "URL": ... + + @overload + def extend_query(self, **kwargs: QueryVariable) -> "URL": ... + + def extend_query(self, *args: Any, **kwargs: Any) -> "URL": + """Return a new URL with query part combined with the existing. + + This method will not remove existing query parameters. + + Example: + >>> url = URL('http://example.com/?a=1&b=2') + >>> url.extend_query(a=3, c=4) + URL('http://example.com/?a=1&b=2&a=3&c=4') + """ + if not (new_query := get_str_query(*args, **kwargs)): + return self + if query := self._query: + # both strings are already encoded so we can use a simple + # string join + query += new_query if query[-1] == "&" else f"&{new_query}" + else: + query = new_query + return from_parts_uncached( + self._scheme, self._netloc, self._path, query, self._fragment + ) + + @overload + def update_query(self, query: Query) -> "URL": ... + + @overload + def update_query(self, **kwargs: QueryVariable) -> "URL": ... + + def update_query(self, *args: Any, **kwargs: Any) -> "URL": + """Return a new URL with query part updated. + + This method will overwrite existing query parameters. + + Example: + >>> url = URL('http://example.com/?a=1&b=2') + >>> url.update_query(a=3, c=4) + URL('http://example.com/?a=3&b=2&c=4') + """ + in_query: Union[str, Mapping[str, QueryVariable], None] + if kwargs: + if args: + msg = "Either kwargs or single query parameter must be present" + raise ValueError(msg) + in_query = kwargs + elif len(args) == 1: + in_query = args[0] + else: + raise ValueError("Either kwargs or single query parameter must be present") + + if in_query is None: + query = "" + elif not in_query: + query = self._query + elif isinstance(in_query, Mapping): + qm: MultiDict[QueryVariable] = MultiDict(self._parsed_query) + qm.update(in_query) + query = get_str_query_from_sequence_iterable(qm.items()) + elif isinstance(in_query, str): + qstr: MultiDict[str] = MultiDict(self._parsed_query) + qstr.update(parse_qsl(in_query, keep_blank_values=True)) + query = get_str_query_from_iterable(qstr.items()) + elif isinstance(in_query, (bytes, bytearray, memoryview)): + msg = "Invalid query type: bytes, bytearray and memoryview are forbidden" + raise TypeError(msg) + elif isinstance(in_query, Sequence): + # We don't expect sequence values if we're given a list of pairs + # already; only mappings like builtin `dict` which can't have the + # same key pointing to multiple values are allowed to use + # `_query_seq_pairs`. + qs: MultiDict[SimpleQuery] = MultiDict(self._parsed_query) + qs.update(in_query) + query = get_str_query_from_iterable(qs.items()) + else: + raise TypeError( + "Invalid query type: only str, mapping or " + "sequence of (key, value) pairs is allowed" + ) + return from_parts_uncached( + self._scheme, self._netloc, self._path, query, self._fragment + ) + + def without_query_params(self, *query_params: str) -> "URL": + """Remove some keys from query part and return new URL.""" + params_to_remove = set(query_params) & self.query.keys() + if not params_to_remove: + return self + return self.with_query( + tuple( + (name, value) + for name, value in self.query.items() + if name not in params_to_remove + ) + ) + + def with_fragment(self, fragment: Union[str, None]) -> "URL": + """Return a new URL with fragment replaced. + + Autoencode fragment if needed. + + Clear fragment to default if None is passed. + + """ + # N.B. doesn't cleanup query/fragment + if fragment is None: + raw_fragment = "" + elif not isinstance(fragment, str): + raise TypeError("Invalid fragment type") + else: + raw_fragment = FRAGMENT_QUOTER(fragment) + if self._fragment == raw_fragment: + return self + return from_parts( + self._scheme, self._netloc, self._path, self._query, raw_fragment + ) + + def with_name( + self, + name: str, + *, + keep_query: bool = False, + keep_fragment: bool = False, + ) -> "URL": + """Return a new URL with name (last part of path) replaced. + + Query and fragment parts are cleaned up. + + Name is encoded if needed. + + """ + # N.B. DOES cleanup query/fragment + if not isinstance(name, str): + raise TypeError("Invalid name type") + if "/" in name: + raise ValueError("Slash in name is not allowed") + name = PATH_QUOTER(name) + if name in (".", ".."): + raise ValueError(". and .. values are forbidden") + parts = list(self.raw_parts) + if netloc := self._netloc: + if len(parts) == 1: + parts.append(name) + else: + parts[-1] = name + parts[0] = "" # replace leading '/' + else: + parts[-1] = name + if parts[0] == "/": + parts[0] = "" # replace leading '/' + + query = self._query if keep_query else "" + fragment = self._fragment if keep_fragment else "" + return from_parts(self._scheme, netloc, "/".join(parts), query, fragment) + + def with_suffix( + self, + suffix: str, + *, + keep_query: bool = False, + keep_fragment: bool = False, + ) -> "URL": + """Return a new URL with suffix (file extension of name) replaced. + + Query and fragment parts are cleaned up. + + suffix is encoded if needed. + """ + if not isinstance(suffix, str): + raise TypeError("Invalid suffix type") + if suffix and not suffix[0] == "." or suffix == ".": + raise ValueError(f"Invalid suffix {suffix!r}") + name = self.raw_name + if not name: + raise ValueError(f"{self!r} has an empty name") + old_suffix = self.raw_suffix + name = name + suffix if not old_suffix else name[: -len(old_suffix)] + suffix + + return self.with_name(name, keep_query=keep_query, keep_fragment=keep_fragment) + + def join(self, url: "URL") -> "URL": + """Join URLs + + Construct a full (“absolute”) URL by combining a “base URL” + (self) with another URL (url). + + Informally, this uses components of the base URL, in + particular the addressing scheme, the network location and + (part of) the path, to provide missing components in the + relative URL. + + """ + if type(url) is not URL: + raise TypeError("url should be URL") + + scheme = url._scheme or self._scheme + if scheme != self._scheme or scheme not in USES_RELATIVE: + return url + + # scheme is in uses_authority as uses_authority is a superset of uses_relative + if (join_netloc := url._netloc) and scheme in USES_AUTHORITY: + return from_parts(scheme, join_netloc, url._path, url._query, url._fragment) + + orig_path = self._path + if join_path := url._path: + if join_path[0] == "/": + path = join_path + elif not orig_path: + path = f"/{join_path}" + elif orig_path[-1] == "/": + path = f"{orig_path}{join_path}" + else: + # … + # and relativizing ".." + # parts[0] is / for absolute urls, + # this join will add a double slash there + path = "/".join([*self.parts[:-1], ""]) + join_path + # which has to be removed + if orig_path[0] == "/": + path = path[1:] + path = normalize_path(path) if "." in path else path + else: + path = orig_path + + return from_parts( + scheme, + self._netloc, + path, + url._query if join_path or url._query else self._query, + url._fragment if join_path or url._fragment else self._fragment, + ) + + def joinpath(self, *other: str, encoded: bool = False) -> "URL": + """Return a new URL with the elements in other appended to the path.""" + return self._make_child(other, encoded=encoded) + + def human_repr(self) -> str: + """Return decoded human readable string for URL representation.""" + user = human_quote(self.user, "#/:?@[]") + password = human_quote(self.password, "#/:?@[]") + if (host := self.host) and ":" in host: + host = f"[{host}]" + path = human_quote(self.path, "#?") + if TYPE_CHECKING: + assert path is not None + query_string = "&".join( + "{}={}".format(human_quote(k, "#&+;="), human_quote(v, "#&+;=")) + for k, v in self.query.items() + ) + fragment = human_quote(self.fragment, "") + if TYPE_CHECKING: + assert fragment is not None + netloc = make_netloc(user, password, host, self.explicit_port) + return unsplit_result(self._scheme, netloc, path, query_string, fragment) + + +_DEFAULT_IDNA_SIZE = 256 +_DEFAULT_ENCODE_SIZE = 512 + + +@lru_cache(_DEFAULT_IDNA_SIZE) +def _idna_decode(raw: str) -> str: + try: + return idna.decode(raw.encode("ascii")) + except UnicodeError: # e.g. '::1' + return raw.encode("ascii").decode("idna") + + +@lru_cache(_DEFAULT_IDNA_SIZE) +def _idna_encode(host: str) -> str: + try: + return idna.encode(host, uts46=True).decode("ascii") + except UnicodeError: + return host.encode("idna").decode("ascii") + + +@lru_cache(_DEFAULT_ENCODE_SIZE) +def _encode_host(host: str, validate_host: bool) -> str: + """Encode host part of URL.""" + # If the host ends with a digit or contains a colon, its likely + # an IP address. + if host and (host[-1].isdigit() or ":" in host): + raw_ip, sep, zone = host.partition("%") + # If it looks like an IP, we check with _ip_compressed_version + # and fall-through if its not an IP address. This is a performance + # optimization to avoid parsing IP addresses as much as possible + # because it is orders of magnitude slower than almost any other + # operation this library does. + # Might be an IP address, check it + # + # IP Addresses can look like: + # https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.2 + # - 127.0.0.1 (last character is a digit) + # - 2001:db8::ff00:42:8329 (contains a colon) + # - 2001:db8::ff00:42:8329%eth0 (contains a colon) + # - [2001:db8::ff00:42:8329] (contains a colon -- brackets should + # have been removed before it gets here) + # Rare IP Address formats are not supported per: + # https://datatracker.ietf.org/doc/html/rfc3986#section-7.4 + # + # IP parsing is slow, so its wrapped in an LRU + try: + ip = ip_address(raw_ip) + except ValueError: + pass + else: + # These checks should not happen in the + # LRU to keep the cache size small + host = ip.compressed + if ip.version == 6: + return f"[{host}%{zone}]" if sep else f"[{host}]" + return f"{host}%{zone}" if sep else host + + # IDNA encoding is slow, skip it for ASCII-only strings + if host.isascii(): + # Check for invalid characters explicitly; _idna_encode() does this + # for non-ascii host names. + host = host.lower() + if validate_host and (invalid := NOT_REG_NAME.search(host)): + value, pos, extra = invalid.group(), invalid.start(), "" + if value == "@" or (value == ":" and "@" in host[pos:]): + # this looks like an authority string + extra = ( + ", if the value includes a username or password, " + "use 'authority' instead of 'host'" + ) + raise ValueError( + f"Host {host!r} cannot contain {value!r} (at position {pos}){extra}" + ) from None + return host + + return _idna_encode(host) + + +@rewrite_module +def cache_clear() -> None: + """Clear all LRU caches.""" + _idna_encode.cache_clear() + _idna_decode.cache_clear() + _encode_host.cache_clear() + + +@rewrite_module +def cache_info() -> CacheInfo: + """Report cache statistics.""" + return { + "idna_encode": _idna_encode.cache_info(), + "idna_decode": _idna_decode.cache_info(), + "ip_address": _encode_host.cache_info(), + "host_validate": _encode_host.cache_info(), + "encode_host": _encode_host.cache_info(), + } + + +@rewrite_module +def cache_configure( + *, + idna_encode_size: Union[int, None] = _DEFAULT_IDNA_SIZE, + idna_decode_size: Union[int, None] = _DEFAULT_IDNA_SIZE, + ip_address_size: Union[int, None, UndefinedType] = UNDEFINED, + host_validate_size: Union[int, None, UndefinedType] = UNDEFINED, + encode_host_size: Union[int, None, UndefinedType] = UNDEFINED, +) -> None: + """Configure LRU cache sizes.""" + global _idna_decode, _idna_encode, _encode_host + # ip_address_size, host_validate_size are no longer + # used, but are kept for backwards compatibility. + if ip_address_size is not UNDEFINED or host_validate_size is not UNDEFINED: + warnings.warn( + "cache_configure() no longer accepts the " + "ip_address_size or host_validate_size arguments, " + "they are used to set the encode_host_size instead " + "and will be removed in the future", + DeprecationWarning, + stacklevel=2, + ) + + if encode_host_size is not None: + for size in (ip_address_size, host_validate_size): + if size is None: + encode_host_size = None + elif encode_host_size is UNDEFINED: + if size is not UNDEFINED: + encode_host_size = size + elif size is not UNDEFINED: + if TYPE_CHECKING: + assert isinstance(size, int) + assert isinstance(encode_host_size, int) + encode_host_size = max(size, encode_host_size) + if encode_host_size is UNDEFINED: + encode_host_size = _DEFAULT_ENCODE_SIZE + + if TYPE_CHECKING: + assert not isinstance(encode_host_size, object) + _encode_host = lru_cache(encode_host_size)(_encode_host.__wrapped__) + _idna_decode = lru_cache(idna_decode_size)(_idna_decode.__wrapped__) + _idna_encode = lru_cache(idna_encode_size)(_idna_encode.__wrapped__) |