about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/httpx/_transports/default.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/httpx/_transports/default.py')
-rw-r--r--.venv/lib/python3.12/site-packages/httpx/_transports/default.py389
1 files changed, 389 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/httpx/_transports/default.py b/.venv/lib/python3.12/site-packages/httpx/_transports/default.py
new file mode 100644
index 00000000..33db416d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/httpx/_transports/default.py
@@ -0,0 +1,389 @@
+"""
+Custom transports, with nicely configured defaults.
+
+The following additional keyword arguments are currently supported by httpcore...
+
+* uds: str
+* local_address: str
+* retries: int
+
+Example usages...
+
+# Disable HTTP/2 on a single specific domain.
+mounts = {
+    "all://": httpx.HTTPTransport(http2=True),
+    "all://*example.org": httpx.HTTPTransport()
+}
+
+# Using advanced httpcore configuration, with connection retries.
+transport = httpx.HTTPTransport(retries=1)
+client = httpx.Client(transport=transport)
+
+# Using advanced httpcore configuration, with unix domain sockets.
+transport = httpx.HTTPTransport(uds="socket.uds")
+client = httpx.Client(transport=transport)
+"""
+
+from __future__ import annotations
+
+import contextlib
+import typing
+from types import TracebackType
+
+import httpcore
+
+from .._config import DEFAULT_LIMITS, Limits, Proxy, create_ssl_context
+from .._exceptions import (
+    ConnectError,
+    ConnectTimeout,
+    LocalProtocolError,
+    NetworkError,
+    PoolTimeout,
+    ProtocolError,
+    ProxyError,
+    ReadError,
+    ReadTimeout,
+    RemoteProtocolError,
+    TimeoutException,
+    UnsupportedProtocol,
+    WriteError,
+    WriteTimeout,
+)
+from .._models import Request, Response
+from .._types import AsyncByteStream, CertTypes, ProxyTypes, SyncByteStream, VerifyTypes
+from .._urls import URL
+from .base import AsyncBaseTransport, BaseTransport
+
+T = typing.TypeVar("T", bound="HTTPTransport")
+A = typing.TypeVar("A", bound="AsyncHTTPTransport")
+
+SOCKET_OPTION = typing.Union[
+    typing.Tuple[int, int, int],
+    typing.Tuple[int, int, typing.Union[bytes, bytearray]],
+    typing.Tuple[int, int, None, int],
+]
+
+__all__ = ["AsyncHTTPTransport", "HTTPTransport"]
+
+
+@contextlib.contextmanager
+def map_httpcore_exceptions() -> typing.Iterator[None]:
+    try:
+        yield
+    except Exception as exc:
+        mapped_exc = None
+
+        for from_exc, to_exc in HTTPCORE_EXC_MAP.items():
+            if not isinstance(exc, from_exc):
+                continue
+            # We want to map to the most specific exception we can find.
+            # Eg if `exc` is an `httpcore.ReadTimeout`, we want to map to
+            # `httpx.ReadTimeout`, not just `httpx.TimeoutException`.
+            if mapped_exc is None or issubclass(to_exc, mapped_exc):
+                mapped_exc = to_exc
+
+        if mapped_exc is None:  # pragma: no cover
+            raise
+
+        message = str(exc)
+        raise mapped_exc(message) from exc
+
+
+HTTPCORE_EXC_MAP = {
+    httpcore.TimeoutException: TimeoutException,
+    httpcore.ConnectTimeout: ConnectTimeout,
+    httpcore.ReadTimeout: ReadTimeout,
+    httpcore.WriteTimeout: WriteTimeout,
+    httpcore.PoolTimeout: PoolTimeout,
+    httpcore.NetworkError: NetworkError,
+    httpcore.ConnectError: ConnectError,
+    httpcore.ReadError: ReadError,
+    httpcore.WriteError: WriteError,
+    httpcore.ProxyError: ProxyError,
+    httpcore.UnsupportedProtocol: UnsupportedProtocol,
+    httpcore.ProtocolError: ProtocolError,
+    httpcore.LocalProtocolError: LocalProtocolError,
+    httpcore.RemoteProtocolError: RemoteProtocolError,
+}
+
+
+class ResponseStream(SyncByteStream):
+    def __init__(self, httpcore_stream: typing.Iterable[bytes]) -> None:
+        self._httpcore_stream = httpcore_stream
+
+    def __iter__(self) -> typing.Iterator[bytes]:
+        with map_httpcore_exceptions():
+            for part in self._httpcore_stream:
+                yield part
+
+    def close(self) -> None:
+        if hasattr(self._httpcore_stream, "close"):
+            self._httpcore_stream.close()
+
+
+class HTTPTransport(BaseTransport):
+    def __init__(
+        self,
+        verify: VerifyTypes = True,
+        cert: CertTypes | None = None,
+        http1: bool = True,
+        http2: bool = False,
+        limits: Limits = DEFAULT_LIMITS,
+        trust_env: bool = True,
+        proxy: ProxyTypes | None = None,
+        uds: str | None = None,
+        local_address: str | None = None,
+        retries: int = 0,
+        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
+    ) -> None:
+        ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env)
+        proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy
+
+        if proxy is None:
+            self._pool = httpcore.ConnectionPool(
+                ssl_context=ssl_context,
+                max_connections=limits.max_connections,
+                max_keepalive_connections=limits.max_keepalive_connections,
+                keepalive_expiry=limits.keepalive_expiry,
+                http1=http1,
+                http2=http2,
+                uds=uds,
+                local_address=local_address,
+                retries=retries,
+                socket_options=socket_options,
+            )
+        elif proxy.url.scheme in ("http", "https"):
+            self._pool = httpcore.HTTPProxy(
+                proxy_url=httpcore.URL(
+                    scheme=proxy.url.raw_scheme,
+                    host=proxy.url.raw_host,
+                    port=proxy.url.port,
+                    target=proxy.url.raw_path,
+                ),
+                proxy_auth=proxy.raw_auth,
+                proxy_headers=proxy.headers.raw,
+                ssl_context=ssl_context,
+                proxy_ssl_context=proxy.ssl_context,
+                max_connections=limits.max_connections,
+                max_keepalive_connections=limits.max_keepalive_connections,
+                keepalive_expiry=limits.keepalive_expiry,
+                http1=http1,
+                http2=http2,
+                socket_options=socket_options,
+            )
+        elif proxy.url.scheme == "socks5":
+            try:
+                import socksio  # noqa
+            except ImportError:  # pragma: no cover
+                raise ImportError(
+                    "Using SOCKS proxy, but the 'socksio' package is not installed. "
+                    "Make sure to install httpx using `pip install httpx[socks]`."
+                ) from None
+
+            self._pool = httpcore.SOCKSProxy(
+                proxy_url=httpcore.URL(
+                    scheme=proxy.url.raw_scheme,
+                    host=proxy.url.raw_host,
+                    port=proxy.url.port,
+                    target=proxy.url.raw_path,
+                ),
+                proxy_auth=proxy.raw_auth,
+                ssl_context=ssl_context,
+                max_connections=limits.max_connections,
+                max_keepalive_connections=limits.max_keepalive_connections,
+                keepalive_expiry=limits.keepalive_expiry,
+                http1=http1,
+                http2=http2,
+            )
+        else:  # pragma: no cover
+            raise ValueError(
+                "Proxy protocol must be either 'http', 'https', or 'socks5',"
+                f" but got {proxy.url.scheme!r}."
+            )
+
+    def __enter__(self: T) -> T:  # Use generics for subclass support.
+        self._pool.__enter__()
+        return self
+
+    def __exit__(
+        self,
+        exc_type: type[BaseException] | None = None,
+        exc_value: BaseException | None = None,
+        traceback: TracebackType | None = None,
+    ) -> None:
+        with map_httpcore_exceptions():
+            self._pool.__exit__(exc_type, exc_value, traceback)
+
+    def handle_request(
+        self,
+        request: Request,
+    ) -> Response:
+        assert isinstance(request.stream, SyncByteStream)
+
+        req = httpcore.Request(
+            method=request.method,
+            url=httpcore.URL(
+                scheme=request.url.raw_scheme,
+                host=request.url.raw_host,
+                port=request.url.port,
+                target=request.url.raw_path,
+            ),
+            headers=request.headers.raw,
+            content=request.stream,
+            extensions=request.extensions,
+        )
+        with map_httpcore_exceptions():
+            resp = self._pool.handle_request(req)
+
+        assert isinstance(resp.stream, typing.Iterable)
+
+        return Response(
+            status_code=resp.status,
+            headers=resp.headers,
+            stream=ResponseStream(resp.stream),
+            extensions=resp.extensions,
+        )
+
+    def close(self) -> None:
+        self._pool.close()
+
+
+class AsyncResponseStream(AsyncByteStream):
+    def __init__(self, httpcore_stream: typing.AsyncIterable[bytes]) -> None:
+        self._httpcore_stream = httpcore_stream
+
+    async def __aiter__(self) -> typing.AsyncIterator[bytes]:
+        with map_httpcore_exceptions():
+            async for part in self._httpcore_stream:
+                yield part
+
+    async def aclose(self) -> None:
+        if hasattr(self._httpcore_stream, "aclose"):
+            await self._httpcore_stream.aclose()
+
+
+class AsyncHTTPTransport(AsyncBaseTransport):
+    def __init__(
+        self,
+        verify: VerifyTypes = True,
+        cert: CertTypes | None = None,
+        http1: bool = True,
+        http2: bool = False,
+        limits: Limits = DEFAULT_LIMITS,
+        trust_env: bool = True,
+        proxy: ProxyTypes | None = None,
+        uds: str | None = None,
+        local_address: str | None = None,
+        retries: int = 0,
+        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
+    ) -> None:
+        ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env)
+        proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy
+
+        if proxy is None:
+            self._pool = httpcore.AsyncConnectionPool(
+                ssl_context=ssl_context,
+                max_connections=limits.max_connections,
+                max_keepalive_connections=limits.max_keepalive_connections,
+                keepalive_expiry=limits.keepalive_expiry,
+                http1=http1,
+                http2=http2,
+                uds=uds,
+                local_address=local_address,
+                retries=retries,
+                socket_options=socket_options,
+            )
+        elif proxy.url.scheme in ("http", "https"):
+            self._pool = httpcore.AsyncHTTPProxy(
+                proxy_url=httpcore.URL(
+                    scheme=proxy.url.raw_scheme,
+                    host=proxy.url.raw_host,
+                    port=proxy.url.port,
+                    target=proxy.url.raw_path,
+                ),
+                proxy_auth=proxy.raw_auth,
+                proxy_headers=proxy.headers.raw,
+                proxy_ssl_context=proxy.ssl_context,
+                ssl_context=ssl_context,
+                max_connections=limits.max_connections,
+                max_keepalive_connections=limits.max_keepalive_connections,
+                keepalive_expiry=limits.keepalive_expiry,
+                http1=http1,
+                http2=http2,
+                socket_options=socket_options,
+            )
+        elif proxy.url.scheme == "socks5":
+            try:
+                import socksio  # noqa
+            except ImportError:  # pragma: no cover
+                raise ImportError(
+                    "Using SOCKS proxy, but the 'socksio' package is not installed. "
+                    "Make sure to install httpx using `pip install httpx[socks]`."
+                ) from None
+
+            self._pool = httpcore.AsyncSOCKSProxy(
+                proxy_url=httpcore.URL(
+                    scheme=proxy.url.raw_scheme,
+                    host=proxy.url.raw_host,
+                    port=proxy.url.port,
+                    target=proxy.url.raw_path,
+                ),
+                proxy_auth=proxy.raw_auth,
+                ssl_context=ssl_context,
+                max_connections=limits.max_connections,
+                max_keepalive_connections=limits.max_keepalive_connections,
+                keepalive_expiry=limits.keepalive_expiry,
+                http1=http1,
+                http2=http2,
+            )
+        else:  # pragma: no cover
+            raise ValueError(
+                "Proxy protocol must be either 'http', 'https', or 'socks5',"
+                " but got {proxy.url.scheme!r}."
+            )
+
+    async def __aenter__(self: A) -> A:  # Use generics for subclass support.
+        await self._pool.__aenter__()
+        return self
+
+    async def __aexit__(
+        self,
+        exc_type: type[BaseException] | None = None,
+        exc_value: BaseException | None = None,
+        traceback: TracebackType | None = None,
+    ) -> None:
+        with map_httpcore_exceptions():
+            await self._pool.__aexit__(exc_type, exc_value, traceback)
+
+    async def handle_async_request(
+        self,
+        request: Request,
+    ) -> Response:
+        assert isinstance(request.stream, AsyncByteStream)
+
+        req = httpcore.Request(
+            method=request.method,
+            url=httpcore.URL(
+                scheme=request.url.raw_scheme,
+                host=request.url.raw_host,
+                port=request.url.port,
+                target=request.url.raw_path,
+            ),
+            headers=request.headers.raw,
+            content=request.stream,
+            extensions=request.extensions,
+        )
+        with map_httpcore_exceptions():
+            resp = await self._pool.handle_async_request(req)
+
+        assert isinstance(resp.stream, typing.AsyncIterable)
+
+        return Response(
+            status_code=resp.status,
+            headers=resp.headers,
+            stream=AsyncResponseStream(resp.stream),
+            extensions=resp.extensions,
+        )
+
+    async def aclose(self) -> None:
+        await self._pool.aclose()