aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/httpx/_auth.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/httpx/_auth.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/httpx/_auth.py')
-rw-r--r--.venv/lib/python3.12/site-packages/httpx/_auth.py348
1 files changed, 348 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/httpx/_auth.py b/.venv/lib/python3.12/site-packages/httpx/_auth.py
new file mode 100644
index 00000000..b03971ab
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/httpx/_auth.py
@@ -0,0 +1,348 @@
+from __future__ import annotations
+
+import hashlib
+import os
+import re
+import time
+import typing
+from base64 import b64encode
+from urllib.request import parse_http_list
+
+from ._exceptions import ProtocolError
+from ._models import Cookies, Request, Response
+from ._utils import to_bytes, to_str, unquote
+
+if typing.TYPE_CHECKING: # pragma: no cover
+ from hashlib import _Hash
+
+
+__all__ = ["Auth", "BasicAuth", "DigestAuth", "NetRCAuth"]
+
+
+class Auth:
+ """
+ Base class for all authentication schemes.
+
+ To implement a custom authentication scheme, subclass `Auth` and override
+ the `.auth_flow()` method.
+
+ If the authentication scheme does I/O such as disk access or network calls, or uses
+ synchronization primitives such as locks, you should override `.sync_auth_flow()`
+ and/or `.async_auth_flow()` instead of `.auth_flow()` to provide specialized
+ implementations that will be used by `Client` and `AsyncClient` respectively.
+ """
+
+ requires_request_body = False
+ requires_response_body = False
+
+ def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
+ """
+ Execute the authentication flow.
+
+ To dispatch a request, `yield` it:
+
+ ```
+ yield request
+ ```
+
+ The client will `.send()` the response back into the flow generator. You can
+ access it like so:
+
+ ```
+ response = yield request
+ ```
+
+ A `return` (or reaching the end of the generator) will result in the
+ client returning the last response obtained from the server.
+
+ You can dispatch as many requests as is necessary.
+ """
+ yield request
+
+ def sync_auth_flow(
+ self, request: Request
+ ) -> typing.Generator[Request, Response, None]:
+ """
+ Execute the authentication flow synchronously.
+
+ By default, this defers to `.auth_flow()`. You should override this method
+ when the authentication scheme does I/O and/or uses concurrency primitives.
+ """
+ if self.requires_request_body:
+ request.read()
+
+ flow = self.auth_flow(request)
+ request = next(flow)
+
+ while True:
+ response = yield request
+ if self.requires_response_body:
+ response.read()
+
+ try:
+ request = flow.send(response)
+ except StopIteration:
+ break
+
+ async def async_auth_flow(
+ self, request: Request
+ ) -> typing.AsyncGenerator[Request, Response]:
+ """
+ Execute the authentication flow asynchronously.
+
+ By default, this defers to `.auth_flow()`. You should override this method
+ when the authentication scheme does I/O and/or uses concurrency primitives.
+ """
+ if self.requires_request_body:
+ await request.aread()
+
+ flow = self.auth_flow(request)
+ request = next(flow)
+
+ while True:
+ response = yield request
+ if self.requires_response_body:
+ await response.aread()
+
+ try:
+ request = flow.send(response)
+ except StopIteration:
+ break
+
+
+class FunctionAuth(Auth):
+ """
+ Allows the 'auth' argument to be passed as a simple callable function,
+ that takes the request, and returns a new, modified request.
+ """
+
+ def __init__(self, func: typing.Callable[[Request], Request]) -> None:
+ self._func = func
+
+ def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
+ yield self._func(request)
+
+
+class BasicAuth(Auth):
+ """
+ Allows the 'auth' argument to be passed as a (username, password) pair,
+ and uses HTTP Basic authentication.
+ """
+
+ def __init__(self, username: str | bytes, password: str | bytes) -> None:
+ self._auth_header = self._build_auth_header(username, password)
+
+ def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
+ request.headers["Authorization"] = self._auth_header
+ yield request
+
+ def _build_auth_header(self, username: str | bytes, password: str | bytes) -> str:
+ userpass = b":".join((to_bytes(username), to_bytes(password)))
+ token = b64encode(userpass).decode()
+ return f"Basic {token}"
+
+
+class NetRCAuth(Auth):
+ """
+ Use a 'netrc' file to lookup basic auth credentials based on the url host.
+ """
+
+ def __init__(self, file: str | None = None) -> None:
+ # Lazily import 'netrc'.
+ # There's no need for us to load this module unless 'NetRCAuth' is being used.
+ import netrc
+
+ self._netrc_info = netrc.netrc(file)
+
+ def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
+ auth_info = self._netrc_info.authenticators(request.url.host)
+ if auth_info is None or not auth_info[2]:
+ # The netrc file did not have authentication credentials for this host.
+ yield request
+ else:
+ # Build a basic auth header with credentials from the netrc file.
+ request.headers["Authorization"] = self._build_auth_header(
+ username=auth_info[0], password=auth_info[2]
+ )
+ yield request
+
+ def _build_auth_header(self, username: str | bytes, password: str | bytes) -> str:
+ userpass = b":".join((to_bytes(username), to_bytes(password)))
+ token = b64encode(userpass).decode()
+ return f"Basic {token}"
+
+
+class DigestAuth(Auth):
+ _ALGORITHM_TO_HASH_FUNCTION: dict[str, typing.Callable[[bytes], _Hash]] = {
+ "MD5": hashlib.md5,
+ "MD5-SESS": hashlib.md5,
+ "SHA": hashlib.sha1,
+ "SHA-SESS": hashlib.sha1,
+ "SHA-256": hashlib.sha256,
+ "SHA-256-SESS": hashlib.sha256,
+ "SHA-512": hashlib.sha512,
+ "SHA-512-SESS": hashlib.sha512,
+ }
+
+ def __init__(self, username: str | bytes, password: str | bytes) -> None:
+ self._username = to_bytes(username)
+ self._password = to_bytes(password)
+ self._last_challenge: _DigestAuthChallenge | None = None
+ self._nonce_count = 1
+
+ def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
+ if self._last_challenge:
+ request.headers["Authorization"] = self._build_auth_header(
+ request, self._last_challenge
+ )
+
+ response = yield request
+
+ if response.status_code != 401 or "www-authenticate" not in response.headers:
+ # If the response is not a 401 then we don't
+ # need to build an authenticated request.
+ return
+
+ for auth_header in response.headers.get_list("www-authenticate"):
+ if auth_header.lower().startswith("digest "):
+ break
+ else:
+ # If the response does not include a 'WWW-Authenticate: Digest ...'
+ # header, then we don't need to build an authenticated request.
+ return
+
+ self._last_challenge = self._parse_challenge(request, response, auth_header)
+ self._nonce_count = 1
+
+ request.headers["Authorization"] = self._build_auth_header(
+ request, self._last_challenge
+ )
+ if response.cookies:
+ Cookies(response.cookies).set_cookie_header(request=request)
+ yield request
+
+ def _parse_challenge(
+ self, request: Request, response: Response, auth_header: str
+ ) -> _DigestAuthChallenge:
+ """
+ Returns a challenge from a Digest WWW-Authenticate header.
+ These take the form of:
+ `Digest realm="realm@host.com",qop="auth,auth-int",nonce="abc",opaque="xyz"`
+ """
+ scheme, _, fields = auth_header.partition(" ")
+
+ # This method should only ever have been called with a Digest auth header.
+ assert scheme.lower() == "digest"
+
+ header_dict: dict[str, str] = {}
+ for field in parse_http_list(fields):
+ key, value = field.strip().split("=", 1)
+ header_dict[key] = unquote(value)
+
+ try:
+ realm = header_dict["realm"].encode()
+ nonce = header_dict["nonce"].encode()
+ algorithm = header_dict.get("algorithm", "MD5")
+ opaque = header_dict["opaque"].encode() if "opaque" in header_dict else None
+ qop = header_dict["qop"].encode() if "qop" in header_dict else None
+ return _DigestAuthChallenge(
+ realm=realm, nonce=nonce, algorithm=algorithm, opaque=opaque, qop=qop
+ )
+ except KeyError as exc:
+ message = "Malformed Digest WWW-Authenticate header"
+ raise ProtocolError(message, request=request) from exc
+
+ def _build_auth_header(
+ self, request: Request, challenge: _DigestAuthChallenge
+ ) -> str:
+ hash_func = self._ALGORITHM_TO_HASH_FUNCTION[challenge.algorithm.upper()]
+
+ def digest(data: bytes) -> bytes:
+ return hash_func(data).hexdigest().encode()
+
+ A1 = b":".join((self._username, challenge.realm, self._password))
+
+ path = request.url.raw_path
+ A2 = b":".join((request.method.encode(), path))
+ # TODO: implement auth-int
+ HA2 = digest(A2)
+
+ nc_value = b"%08x" % self._nonce_count
+ cnonce = self._get_client_nonce(self._nonce_count, challenge.nonce)
+ self._nonce_count += 1
+
+ HA1 = digest(A1)
+ if challenge.algorithm.lower().endswith("-sess"):
+ HA1 = digest(b":".join((HA1, challenge.nonce, cnonce)))
+
+ qop = self._resolve_qop(challenge.qop, request=request)
+ if qop is None:
+ # Following RFC 2069
+ digest_data = [HA1, challenge.nonce, HA2]
+ else:
+ # Following RFC 2617/7616
+ digest_data = [HA1, challenge.nonce, nc_value, cnonce, qop, HA2]
+
+ format_args = {
+ "username": self._username,
+ "realm": challenge.realm,
+ "nonce": challenge.nonce,
+ "uri": path,
+ "response": digest(b":".join(digest_data)),
+ "algorithm": challenge.algorithm.encode(),
+ }
+ if challenge.opaque:
+ format_args["opaque"] = challenge.opaque
+ if qop:
+ format_args["qop"] = b"auth"
+ format_args["nc"] = nc_value
+ format_args["cnonce"] = cnonce
+
+ return "Digest " + self._get_header_value(format_args)
+
+ def _get_client_nonce(self, nonce_count: int, nonce: bytes) -> bytes:
+ s = str(nonce_count).encode()
+ s += nonce
+ s += time.ctime().encode()
+ s += os.urandom(8)
+
+ return hashlib.sha1(s).hexdigest()[:16].encode()
+
+ def _get_header_value(self, header_fields: dict[str, bytes]) -> str:
+ NON_QUOTED_FIELDS = ("algorithm", "qop", "nc")
+ QUOTED_TEMPLATE = '{}="{}"'
+ NON_QUOTED_TEMPLATE = "{}={}"
+
+ header_value = ""
+ for i, (field, value) in enumerate(header_fields.items()):
+ if i > 0:
+ header_value += ", "
+ template = (
+ QUOTED_TEMPLATE
+ if field not in NON_QUOTED_FIELDS
+ else NON_QUOTED_TEMPLATE
+ )
+ header_value += template.format(field, to_str(value))
+
+ return header_value
+
+ def _resolve_qop(self, qop: bytes | None, request: Request) -> bytes | None:
+ if qop is None:
+ return None
+ qops = re.split(b", ?", qop)
+ if b"auth" in qops:
+ return b"auth"
+
+ if qops == [b"auth-int"]:
+ raise NotImplementedError("Digest auth-int support is not yet implemented")
+
+ message = f'Unexpected qop value "{qop!r}" in digest auth'
+ raise ProtocolError(message, request=request)
+
+
+class _DigestAuthChallenge(typing.NamedTuple):
+ realm: bytes
+ nonce: bytes
+ algorithm: str
+ opaque: bytes | None
+ qop: bytes | None