about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/urllib3/contrib/emscripten/fetch.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/urllib3/contrib/emscripten/fetch.py')
-rw-r--r--.venv/lib/python3.12/site-packages/urllib3/contrib/emscripten/fetch.py708
1 files changed, 708 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/urllib3/contrib/emscripten/fetch.py b/.venv/lib/python3.12/site-packages/urllib3/contrib/emscripten/fetch.py
new file mode 100644
index 00000000..a514306e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/urllib3/contrib/emscripten/fetch.py
@@ -0,0 +1,708 @@
+"""
+Support for streaming http requests in emscripten.
+
+A few caveats -
+
+If your browser (or Node.js) has WebAssembly JavaScript Promise Integration enabled
+https://github.com/WebAssembly/js-promise-integration/blob/main/proposals/js-promise-integration/Overview.md
+*and* you launch pyodide using `pyodide.runPythonAsync`, this will fetch data using the
+JavaScript asynchronous fetch api (wrapped via `pyodide.ffi.call_sync`). In this case
+timeouts and streaming should just work.
+
+Otherwise, it uses a combination of XMLHttpRequest and a web-worker for streaming.
+
+This approach has several caveats:
+
+Firstly, you can't do streaming http in the main UI thread, because atomics.wait isn't allowed.
+Streaming only works if you're running pyodide in a web worker.
+
+Secondly, this uses an extra web worker and SharedArrayBuffer to do the asynchronous fetch
+operation, so it requires that you have crossOriginIsolation enabled, by serving over https
+(or from localhost) with the two headers below set:
+
+    Cross-Origin-Opener-Policy: same-origin
+    Cross-Origin-Embedder-Policy: require-corp
+
+You can tell if cross origin isolation is successfully enabled by looking at the global crossOriginIsolated variable in
+JavaScript console. If it isn't, streaming requests will fallback to XMLHttpRequest, i.e. getting the whole
+request into a buffer and then returning it. it shows a warning in the JavaScript console in this case.
+
+Finally, the webworker which does the streaming fetch is created on initial import, but will only be started once
+control is returned to javascript. Call `await wait_for_streaming_ready()` to wait for streaming fetch.
+
+NB: in this code, there are a lot of JavaScript objects. They are named js_*
+to make it clear what type of object they are.
+"""
+
+from __future__ import annotations
+
+import io
+import json
+from email.parser import Parser
+from importlib.resources import files
+from typing import TYPE_CHECKING, Any
+
+import js  # type: ignore[import-not-found]
+from pyodide.ffi import (  # type: ignore[import-not-found]
+    JsArray,
+    JsException,
+    JsProxy,
+    to_js,
+)
+
+if TYPE_CHECKING:
+    from typing_extensions import Buffer
+
+from .request import EmscriptenRequest
+from .response import EmscriptenResponse
+
+"""
+There are some headers that trigger unintended CORS preflight requests.
+See also https://github.com/koenvo/pyodide-http/issues/22
+"""
+HEADERS_TO_IGNORE = ("user-agent",)
+
+SUCCESS_HEADER = -1
+SUCCESS_EOF = -2
+ERROR_TIMEOUT = -3
+ERROR_EXCEPTION = -4
+
+_STREAMING_WORKER_CODE = (
+    files(__package__)
+    .joinpath("emscripten_fetch_worker.js")
+    .read_text(encoding="utf-8")
+)
+
+
+class _RequestError(Exception):
+    def __init__(
+        self,
+        message: str | None = None,
+        *,
+        request: EmscriptenRequest | None = None,
+        response: EmscriptenResponse | None = None,
+    ):
+        self.request = request
+        self.response = response
+        self.message = message
+        super().__init__(self.message)
+
+
+class _StreamingError(_RequestError):
+    pass
+
+
+class _TimeoutError(_RequestError):
+    pass
+
+
+def _obj_from_dict(dict_val: dict[str, Any]) -> JsProxy:
+    return to_js(dict_val, dict_converter=js.Object.fromEntries)
+
+
+class _ReadStream(io.RawIOBase):
+    def __init__(
+        self,
+        int_buffer: JsArray,
+        byte_buffer: JsArray,
+        timeout: float,
+        worker: JsProxy,
+        connection_id: int,
+        request: EmscriptenRequest,
+    ):
+        self.int_buffer = int_buffer
+        self.byte_buffer = byte_buffer
+        self.read_pos = 0
+        self.read_len = 0
+        self.connection_id = connection_id
+        self.worker = worker
+        self.timeout = int(1000 * timeout) if timeout > 0 else None
+        self.is_live = True
+        self._is_closed = False
+        self.request: EmscriptenRequest | None = request
+
+    def __del__(self) -> None:
+        self.close()
+
+    # this is compatible with _base_connection
+    def is_closed(self) -> bool:
+        return self._is_closed
+
+    # for compatibility with RawIOBase
+    @property
+    def closed(self) -> bool:
+        return self.is_closed()
+
+    def close(self) -> None:
+        if self.is_closed():
+            return
+        self.read_len = 0
+        self.read_pos = 0
+        self.int_buffer = None
+        self.byte_buffer = None
+        self._is_closed = True
+        self.request = None
+        if self.is_live:
+            self.worker.postMessage(_obj_from_dict({"close": self.connection_id}))
+            self.is_live = False
+        super().close()
+
+    def readable(self) -> bool:
+        return True
+
+    def writable(self) -> bool:
+        return False
+
+    def seekable(self) -> bool:
+        return False
+
+    def readinto(self, byte_obj: Buffer) -> int:
+        if not self.int_buffer:
+            raise _StreamingError(
+                "No buffer for stream in _ReadStream.readinto",
+                request=self.request,
+                response=None,
+            )
+        if self.read_len == 0:
+            # wait for the worker to send something
+            js.Atomics.store(self.int_buffer, 0, ERROR_TIMEOUT)
+            self.worker.postMessage(_obj_from_dict({"getMore": self.connection_id}))
+            if (
+                js.Atomics.wait(self.int_buffer, 0, ERROR_TIMEOUT, self.timeout)
+                == "timed-out"
+            ):
+                raise _TimeoutError
+            data_len = self.int_buffer[0]
+            if data_len > 0:
+                self.read_len = data_len
+                self.read_pos = 0
+            elif data_len == ERROR_EXCEPTION:
+                string_len = self.int_buffer[1]
+                # decode the error string
+                js_decoder = js.TextDecoder.new()
+                json_str = js_decoder.decode(self.byte_buffer.slice(0, string_len))
+                raise _StreamingError(
+                    f"Exception thrown in fetch: {json_str}",
+                    request=self.request,
+                    response=None,
+                )
+            else:
+                # EOF, free the buffers and return zero
+                # and free the request
+                self.is_live = False
+                self.close()
+                return 0
+        # copy from int32array to python bytes
+        ret_length = min(self.read_len, len(memoryview(byte_obj)))
+        subarray = self.byte_buffer.subarray(
+            self.read_pos, self.read_pos + ret_length
+        ).to_py()
+        memoryview(byte_obj)[0:ret_length] = subarray
+        self.read_len -= ret_length
+        self.read_pos += ret_length
+        return ret_length
+
+
+class _StreamingFetcher:
+    def __init__(self) -> None:
+        # make web-worker and data buffer on startup
+        self.streaming_ready = False
+
+        js_data_blob = js.Blob.new(
+            to_js([_STREAMING_WORKER_CODE], create_pyproxies=False),
+            _obj_from_dict({"type": "application/javascript"}),
+        )
+
+        def promise_resolver(js_resolve_fn: JsProxy, js_reject_fn: JsProxy) -> None:
+            def onMsg(e: JsProxy) -> None:
+                self.streaming_ready = True
+                js_resolve_fn(e)
+
+            def onErr(e: JsProxy) -> None:
+                js_reject_fn(e)  # Defensive: never happens in ci
+
+            self.js_worker.onmessage = onMsg
+            self.js_worker.onerror = onErr
+
+        js_data_url = js.URL.createObjectURL(js_data_blob)
+        self.js_worker = js.globalThis.Worker.new(js_data_url)
+        self.js_worker_ready_promise = js.globalThis.Promise.new(promise_resolver)
+
+    def send(self, request: EmscriptenRequest) -> EmscriptenResponse:
+        headers = {
+            k: v for k, v in request.headers.items() if k not in HEADERS_TO_IGNORE
+        }
+
+        body = request.body
+        fetch_data = {"headers": headers, "body": to_js(body), "method": request.method}
+        # start the request off in the worker
+        timeout = int(1000 * request.timeout) if request.timeout > 0 else None
+        js_shared_buffer = js.SharedArrayBuffer.new(1048576)
+        js_int_buffer = js.Int32Array.new(js_shared_buffer)
+        js_byte_buffer = js.Uint8Array.new(js_shared_buffer, 8)
+
+        js.Atomics.store(js_int_buffer, 0, ERROR_TIMEOUT)
+        js.Atomics.notify(js_int_buffer, 0)
+        js_absolute_url = js.URL.new(request.url, js.location).href
+        self.js_worker.postMessage(
+            _obj_from_dict(
+                {
+                    "buffer": js_shared_buffer,
+                    "url": js_absolute_url,
+                    "fetchParams": fetch_data,
+                }
+            )
+        )
+        # wait for the worker to send something
+        js.Atomics.wait(js_int_buffer, 0, ERROR_TIMEOUT, timeout)
+        if js_int_buffer[0] == ERROR_TIMEOUT:
+            raise _TimeoutError(
+                "Timeout connecting to streaming request",
+                request=request,
+                response=None,
+            )
+        elif js_int_buffer[0] == SUCCESS_HEADER:
+            # got response
+            # header length is in second int of intBuffer
+            string_len = js_int_buffer[1]
+            # decode the rest to a JSON string
+            js_decoder = js.TextDecoder.new()
+            # this does a copy (the slice) because decode can't work on shared array
+            # for some silly reason
+            json_str = js_decoder.decode(js_byte_buffer.slice(0, string_len))
+            # get it as an object
+            response_obj = json.loads(json_str)
+            return EmscriptenResponse(
+                request=request,
+                status_code=response_obj["status"],
+                headers=response_obj["headers"],
+                body=_ReadStream(
+                    js_int_buffer,
+                    js_byte_buffer,
+                    request.timeout,
+                    self.js_worker,
+                    response_obj["connectionID"],
+                    request,
+                ),
+            )
+        elif js_int_buffer[0] == ERROR_EXCEPTION:
+            string_len = js_int_buffer[1]
+            # decode the error string
+            js_decoder = js.TextDecoder.new()
+            json_str = js_decoder.decode(js_byte_buffer.slice(0, string_len))
+            raise _StreamingError(
+                f"Exception thrown in fetch: {json_str}", request=request, response=None
+            )
+        else:
+            raise _StreamingError(
+                f"Unknown status from worker in fetch: {js_int_buffer[0]}",
+                request=request,
+                response=None,
+            )
+
+
+class _JSPIReadStream(io.RawIOBase):
+    """
+    A read stream that uses pyodide.ffi.run_sync to read from a JavaScript fetch
+    response. This requires support for WebAssembly JavaScript Promise Integration
+    in the containing browser, and for pyodide to be launched via runPythonAsync.
+
+    :param js_read_stream:
+        The JavaScript stream reader
+
+    :param timeout:
+        Timeout in seconds
+
+    :param request:
+        The request we're handling
+
+    :param response:
+        The response this stream relates to
+
+    :param js_abort_controller:
+        A JavaScript AbortController object, used for timeouts
+    """
+
+    def __init__(
+        self,
+        js_read_stream: Any,
+        timeout: float,
+        request: EmscriptenRequest,
+        response: EmscriptenResponse,
+        js_abort_controller: Any,  # JavaScript AbortController for timeouts
+    ):
+        self.js_read_stream = js_read_stream
+        self.timeout = timeout
+        self._is_closed = False
+        self._is_done = False
+        self.request: EmscriptenRequest | None = request
+        self.response: EmscriptenResponse | None = response
+        self.current_buffer = None
+        self.current_buffer_pos = 0
+        self.js_abort_controller = js_abort_controller
+
+    def __del__(self) -> None:
+        self.close()
+
+    # this is compatible with _base_connection
+    def is_closed(self) -> bool:
+        return self._is_closed
+
+    # for compatibility with RawIOBase
+    @property
+    def closed(self) -> bool:
+        return self.is_closed()
+
+    def close(self) -> None:
+        if self.is_closed():
+            return
+        self.read_len = 0
+        self.read_pos = 0
+        self.js_read_stream.cancel()
+        self.js_read_stream = None
+        self._is_closed = True
+        self._is_done = True
+        self.request = None
+        self.response = None
+        super().close()
+
+    def readable(self) -> bool:
+        return True
+
+    def writable(self) -> bool:
+        return False
+
+    def seekable(self) -> bool:
+        return False
+
+    def _get_next_buffer(self) -> bool:
+        result_js = _run_sync_with_timeout(
+            self.js_read_stream.read(),
+            self.timeout,
+            self.js_abort_controller,
+            request=self.request,
+            response=self.response,
+        )
+        if result_js.done:
+            self._is_done = True
+            return False
+        else:
+            self.current_buffer = result_js.value.to_py()
+            self.current_buffer_pos = 0
+            return True
+
+    def readinto(self, byte_obj: Buffer) -> int:
+        if self.current_buffer is None:
+            if not self._get_next_buffer() or self.current_buffer is None:
+                self.close()
+                return 0
+        ret_length = min(
+            len(byte_obj), len(self.current_buffer) - self.current_buffer_pos
+        )
+        byte_obj[0:ret_length] = self.current_buffer[
+            self.current_buffer_pos : self.current_buffer_pos + ret_length
+        ]
+        self.current_buffer_pos += ret_length
+        if self.current_buffer_pos == len(self.current_buffer):
+            self.current_buffer = None
+        return ret_length
+
+
+# check if we are in a worker or not
+def is_in_browser_main_thread() -> bool:
+    return hasattr(js, "window") and hasattr(js, "self") and js.self == js.window
+
+
+def is_cross_origin_isolated() -> bool:
+    return hasattr(js, "crossOriginIsolated") and js.crossOriginIsolated
+
+
+def is_in_node() -> bool:
+    return (
+        hasattr(js, "process")
+        and hasattr(js.process, "release")
+        and hasattr(js.process.release, "name")
+        and js.process.release.name == "node"
+    )
+
+
+def is_worker_available() -> bool:
+    return hasattr(js, "Worker") and hasattr(js, "Blob")
+
+
+_fetcher: _StreamingFetcher | None = None
+
+if is_worker_available() and (
+    (is_cross_origin_isolated() and not is_in_browser_main_thread())
+    and (not is_in_node())
+):
+    _fetcher = _StreamingFetcher()
+else:
+    _fetcher = None
+
+
+NODE_JSPI_ERROR = (
+    "urllib3 only works in Node.js with pyodide.runPythonAsync"
+    " and requires the flag --experimental-wasm-stack-switching in "
+    " versions of node <24."
+)
+
+
+def send_streaming_request(request: EmscriptenRequest) -> EmscriptenResponse | None:
+    if has_jspi():
+        return send_jspi_request(request, True)
+    elif is_in_node():
+        raise _RequestError(
+            message=NODE_JSPI_ERROR,
+            request=request,
+            response=None,
+        )
+
+    if _fetcher and streaming_ready():
+        return _fetcher.send(request)
+    else:
+        _show_streaming_warning()
+        return None
+
+
+_SHOWN_TIMEOUT_WARNING = False
+
+
+def _show_timeout_warning() -> None:
+    global _SHOWN_TIMEOUT_WARNING
+    if not _SHOWN_TIMEOUT_WARNING:
+        _SHOWN_TIMEOUT_WARNING = True
+        message = "Warning: Timeout is not available on main browser thread"
+        js.console.warn(message)
+
+
+_SHOWN_STREAMING_WARNING = False
+
+
+def _show_streaming_warning() -> None:
+    global _SHOWN_STREAMING_WARNING
+    if not _SHOWN_STREAMING_WARNING:
+        _SHOWN_STREAMING_WARNING = True
+        message = "Can't stream HTTP requests because: \n"
+        if not is_cross_origin_isolated():
+            message += "  Page is not cross-origin isolated\n"
+        if is_in_browser_main_thread():
+            message += "  Python is running in main browser thread\n"
+        if not is_worker_available():
+            message += " Worker or Blob classes are not available in this environment."  # Defensive: this is always False in browsers that we test in
+        if streaming_ready() is False:
+            message += """ Streaming fetch worker isn't ready. If you want to be sure that streaming fetch
+is working, you need to call: 'await urllib3.contrib.emscripten.fetch.wait_for_streaming_ready()`"""
+        from js import console
+
+        console.warn(message)
+
+
+def send_request(request: EmscriptenRequest) -> EmscriptenResponse:
+    if has_jspi():
+        return send_jspi_request(request, False)
+    elif is_in_node():
+        raise _RequestError(
+            message=NODE_JSPI_ERROR,
+            request=request,
+            response=None,
+        )
+    try:
+        js_xhr = js.XMLHttpRequest.new()
+
+        if not is_in_browser_main_thread():
+            js_xhr.responseType = "arraybuffer"
+            if request.timeout:
+                js_xhr.timeout = int(request.timeout * 1000)
+        else:
+            js_xhr.overrideMimeType("text/plain; charset=ISO-8859-15")
+            if request.timeout:
+                # timeout isn't available on the main thread - show a warning in console
+                # if it is set
+                _show_timeout_warning()
+
+        js_xhr.open(request.method, request.url, False)
+        for name, value in request.headers.items():
+            if name.lower() not in HEADERS_TO_IGNORE:
+                js_xhr.setRequestHeader(name, value)
+
+        js_xhr.send(to_js(request.body))
+
+        headers = dict(Parser().parsestr(js_xhr.getAllResponseHeaders()))
+
+        if not is_in_browser_main_thread():
+            body = js_xhr.response.to_py().tobytes()
+        else:
+            body = js_xhr.response.encode("ISO-8859-15")
+        return EmscriptenResponse(
+            status_code=js_xhr.status, headers=headers, body=body, request=request
+        )
+    except JsException as err:
+        if err.name == "TimeoutError":
+            raise _TimeoutError(err.message, request=request)
+        elif err.name == "NetworkError":
+            raise _RequestError(err.message, request=request)
+        else:
+            # general http error
+            raise _RequestError(err.message, request=request)
+
+
+def send_jspi_request(
+    request: EmscriptenRequest, streaming: bool
+) -> EmscriptenResponse:
+    """
+    Send a request using WebAssembly JavaScript Promise Integration
+    to wrap the asynchronous JavaScript fetch api (experimental).
+
+    :param request:
+        Request to send
+
+    :param streaming:
+        Whether to stream the response
+
+    :return: The response object
+    :rtype: EmscriptenResponse
+    """
+    timeout = request.timeout
+    js_abort_controller = js.AbortController.new()
+    headers = {k: v for k, v in request.headers.items() if k not in HEADERS_TO_IGNORE}
+    req_body = request.body
+    fetch_data = {
+        "headers": headers,
+        "body": to_js(req_body),
+        "method": request.method,
+        "signal": js_abort_controller.signal,
+    }
+    # Call JavaScript fetch (async api, returns a promise)
+    fetcher_promise_js = js.fetch(request.url, _obj_from_dict(fetch_data))
+    # Now suspend WebAssembly until we resolve that promise
+    # or time out.
+    response_js = _run_sync_with_timeout(
+        fetcher_promise_js,
+        timeout,
+        js_abort_controller,
+        request=request,
+        response=None,
+    )
+    headers = {}
+    header_iter = response_js.headers.entries()
+    while True:
+        iter_value_js = header_iter.next()
+        if getattr(iter_value_js, "done", False):
+            break
+        else:
+            headers[str(iter_value_js.value[0])] = str(iter_value_js.value[1])
+    status_code = response_js.status
+    body: bytes | io.RawIOBase = b""
+
+    response = EmscriptenResponse(
+        status_code=status_code, headers=headers, body=b"", request=request
+    )
+    if streaming:
+        # get via inputstream
+        if response_js.body is not None:
+            # get a reader from the fetch response
+            body_stream_js = response_js.body.getReader()
+            body = _JSPIReadStream(
+                body_stream_js, timeout, request, response, js_abort_controller
+            )
+    else:
+        # get directly via arraybuffer
+        # n.b. this is another async JavaScript call.
+        body = _run_sync_with_timeout(
+            response_js.arrayBuffer(),
+            timeout,
+            js_abort_controller,
+            request=request,
+            response=response,
+        ).to_py()
+    response.body = body
+    return response
+
+
+def _run_sync_with_timeout(
+    promise: Any,
+    timeout: float,
+    js_abort_controller: Any,
+    request: EmscriptenRequest | None,
+    response: EmscriptenResponse | None,
+) -> Any:
+    """
+    Await a JavaScript promise synchronously with a timeout which is implemented
+    via the AbortController
+
+    :param promise:
+        Javascript promise to await
+
+    :param timeout:
+        Timeout in seconds
+
+    :param js_abort_controller:
+        A JavaScript AbortController object, used on timeout
+
+    :param request:
+        The request being handled
+
+    :param response:
+        The response being handled (if it exists yet)
+
+    :raises _TimeoutError: If the request times out
+    :raises _RequestError: If the request raises a JavaScript exception
+
+    :return: The result of awaiting the promise.
+    """
+    timer_id = None
+    if timeout > 0:
+        timer_id = js.setTimeout(
+            js_abort_controller.abort.bind(js_abort_controller), int(timeout * 1000)
+        )
+    try:
+        from pyodide.ffi import run_sync
+
+        # run_sync here uses WebAssembly JavaScript Promise Integration to
+        # suspend python until the JavaScript promise resolves.
+        return run_sync(promise)
+    except JsException as err:
+        if err.name == "AbortError":
+            raise _TimeoutError(
+                message="Request timed out", request=request, response=response
+            )
+        else:
+            raise _RequestError(message=err.message, request=request, response=response)
+    finally:
+        if timer_id is not None:
+            js.clearTimeout(timer_id)
+
+
+def has_jspi() -> bool:
+    """
+    Return true if jspi can be used.
+
+    This requires both browser support and also WebAssembly
+    to be in the correct state - i.e. that the javascript
+    call into python was async not sync.
+
+    :return: True if jspi can be used.
+    :rtype: bool
+    """
+    try:
+        from pyodide.ffi import can_run_sync, run_sync  # noqa: F401
+
+        return bool(can_run_sync())
+    except ImportError:
+        return False
+
+
+def streaming_ready() -> bool | None:
+    if _fetcher:
+        return _fetcher.streaming_ready
+    else:
+        return None  # no fetcher, return None to signify that
+
+
+async def wait_for_streaming_ready() -> bool:
+    if _fetcher:
+        await _fetcher.js_worker_ready_promise
+        return True
+    else:
+        return False