about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/httpx/_transports/wsgi.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/httpx/_transports/wsgi.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/httpx/_transports/wsgi.py')
-rw-r--r--.venv/lib/python3.12/site-packages/httpx/_transports/wsgi.py149
1 files changed, 149 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/httpx/_transports/wsgi.py b/.venv/lib/python3.12/site-packages/httpx/_transports/wsgi.py
new file mode 100644
index 00000000..8592ffe0
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/httpx/_transports/wsgi.py
@@ -0,0 +1,149 @@
+from __future__ import annotations
+
+import io
+import itertools
+import sys
+import typing
+
+from .._models import Request, Response
+from .._types import SyncByteStream
+from .base import BaseTransport
+
+if typing.TYPE_CHECKING:
+    from _typeshed import OptExcInfo  # pragma: no cover
+    from _typeshed.wsgi import WSGIApplication  # pragma: no cover
+
+_T = typing.TypeVar("_T")
+
+
+__all__ = ["WSGITransport"]
+
+
+def _skip_leading_empty_chunks(body: typing.Iterable[_T]) -> typing.Iterable[_T]:
+    body = iter(body)
+    for chunk in body:
+        if chunk:
+            return itertools.chain([chunk], body)
+    return []
+
+
+class WSGIByteStream(SyncByteStream):
+    def __init__(self, result: typing.Iterable[bytes]) -> None:
+        self._close = getattr(result, "close", None)
+        self._result = _skip_leading_empty_chunks(result)
+
+    def __iter__(self) -> typing.Iterator[bytes]:
+        for part in self._result:
+            yield part
+
+    def close(self) -> None:
+        if self._close is not None:
+            self._close()
+
+
+class WSGITransport(BaseTransport):
+    """
+    A custom transport that handles sending requests directly to an WSGI app.
+    The simplest way to use this functionality is to use the `app` argument.
+
+    ```
+    client = httpx.Client(app=app)
+    ```
+
+    Alternatively, you can setup the transport instance explicitly.
+    This allows you to include any additional configuration arguments specific
+    to the WSGITransport class:
+
+    ```
+    transport = httpx.WSGITransport(
+        app=app,
+        script_name="/submount",
+        remote_addr="1.2.3.4"
+    )
+    client = httpx.Client(transport=transport)
+    ```
+
+    Arguments:
+
+    * `app` - The WSGI application.
+    * `raise_app_exceptions` - Boolean indicating if exceptions in the application
+       should be raised. Default to `True`. Can be set to `False` for use cases
+       such as testing the content of a client 500 response.
+    * `script_name` - The root path on which the WSGI application should be mounted.
+    * `remote_addr` - A string indicating the client IP of incoming requests.
+    ```
+    """
+
+    def __init__(
+        self,
+        app: WSGIApplication,
+        raise_app_exceptions: bool = True,
+        script_name: str = "",
+        remote_addr: str = "127.0.0.1",
+        wsgi_errors: typing.TextIO | None = None,
+    ) -> None:
+        self.app = app
+        self.raise_app_exceptions = raise_app_exceptions
+        self.script_name = script_name
+        self.remote_addr = remote_addr
+        self.wsgi_errors = wsgi_errors
+
+    def handle_request(self, request: Request) -> Response:
+        request.read()
+        wsgi_input = io.BytesIO(request.content)
+
+        port = request.url.port or {"http": 80, "https": 443}[request.url.scheme]
+        environ = {
+            "wsgi.version": (1, 0),
+            "wsgi.url_scheme": request.url.scheme,
+            "wsgi.input": wsgi_input,
+            "wsgi.errors": self.wsgi_errors or sys.stderr,
+            "wsgi.multithread": True,
+            "wsgi.multiprocess": False,
+            "wsgi.run_once": False,
+            "REQUEST_METHOD": request.method,
+            "SCRIPT_NAME": self.script_name,
+            "PATH_INFO": request.url.path,
+            "QUERY_STRING": request.url.query.decode("ascii"),
+            "SERVER_NAME": request.url.host,
+            "SERVER_PORT": str(port),
+            "SERVER_PROTOCOL": "HTTP/1.1",
+            "REMOTE_ADDR": self.remote_addr,
+        }
+        for header_key, header_value in request.headers.raw:
+            key = header_key.decode("ascii").upper().replace("-", "_")
+            if key not in ("CONTENT_TYPE", "CONTENT_LENGTH"):
+                key = "HTTP_" + key
+            environ[key] = header_value.decode("ascii")
+
+        seen_status = None
+        seen_response_headers = None
+        seen_exc_info = None
+
+        def start_response(
+            status: str,
+            response_headers: list[tuple[str, str]],
+            exc_info: OptExcInfo | None = None,
+        ) -> typing.Callable[[bytes], typing.Any]:
+            nonlocal seen_status, seen_response_headers, seen_exc_info
+            seen_status = status
+            seen_response_headers = response_headers
+            seen_exc_info = exc_info
+            return lambda _: None
+
+        result = self.app(environ, start_response)
+
+        stream = WSGIByteStream(result)
+
+        assert seen_status is not None
+        assert seen_response_headers is not None
+        if seen_exc_info and seen_exc_info[0] and self.raise_app_exceptions:
+            raise seen_exc_info[1]
+
+        status_code = int(seen_status.split()[0])
+        headers = [
+            (key.encode("ascii"), value.encode("ascii"))
+            for key, value in seen_response_headers
+        ]
+
+        return Response(status_code, headers=headers, stream=stream)