about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/msrest/universal_http/async_requests.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/msrest/universal_http/async_requests.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/msrest/universal_http/async_requests.py')
-rw-r--r--.venv/lib/python3.12/site-packages/msrest/universal_http/async_requests.py240
1 files changed, 240 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/msrest/universal_http/async_requests.py b/.venv/lib/python3.12/site-packages/msrest/universal_http/async_requests.py
new file mode 100644
index 00000000..40a3a352
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/msrest/universal_http/async_requests.py
@@ -0,0 +1,240 @@
+# --------------------------------------------------------------------------
+#
+# Copyright (c) Microsoft Corporation. All rights reserved.
+#
+# The MIT License (MIT)
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the ""Software""), to
+# deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+# sell copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+#
+# --------------------------------------------------------------------------
+import asyncio
+from collections.abc import AsyncIterator
+import functools
+import logging
+from typing import Any, Callable, Optional, AsyncIterator as AsyncIteratorType
+
+from oauthlib import oauth2
+import requests
+from requests.models import CONTENT_CHUNK_SIZE
+
+from ..exceptions import (
+    TokenExpiredError,
+    ClientRequestError,
+    raise_with_traceback)
+from . import AsyncHTTPSender, ClientRequest, AsyncClientResponse
+from .requests import (
+    BasicRequestsHTTPSender,
+    RequestsHTTPSender,
+    HTTPRequestsClientResponse
+)
+
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class AsyncBasicRequestsHTTPSender(BasicRequestsHTTPSender, AsyncHTTPSender):  # type: ignore
+
+    async def __aenter__(self):
+        return super(AsyncBasicRequestsHTTPSender, self).__enter__()
+
+    async def __aexit__(self, *exc_details):  # pylint: disable=arguments-differ
+        return super(AsyncBasicRequestsHTTPSender, self).__exit__()
+
+    async def send(self, request: ClientRequest, **kwargs: Any) -> AsyncClientResponse:  # type: ignore
+        """Send the request using this HTTP sender.
+        """
+        # It's not recommended to provide its own session, and is mostly
+        # to enable some legacy code to plug correctly
+        session = kwargs.pop('session', self.session)
+
+        loop = kwargs.get("loop", asyncio.get_event_loop())
+        future = loop.run_in_executor(
+            None,
+            functools.partial(
+                session.request,
+                request.method,
+                request.url,
+                **kwargs
+            )
+        )
+        try:
+            return AsyncRequestsClientResponse(
+                request,
+                await future
+            )
+        except requests.RequestException as err:
+            msg = "Error occurred in request."
+            raise_with_traceback(ClientRequestError, msg, err)
+
+class AsyncRequestsHTTPSender(AsyncBasicRequestsHTTPSender, RequestsHTTPSender):  # type: ignore
+
+    async def send(self, request: ClientRequest, **kwargs: Any) -> AsyncClientResponse:  # type: ignore
+        """Send the request using this HTTP sender.
+        """
+        requests_kwargs = self._configure_send(request, **kwargs)
+        return await super(AsyncRequestsHTTPSender, self).send(request, **requests_kwargs)
+
+
+class _MsrestStopIteration(Exception):
+    pass
+
+def _msrest_next(iterator):
+    """"To avoid:
+    TypeError: StopIteration interacts badly with generators and cannot be raised into a Future
+    """
+    try:
+        return next(iterator)
+    except StopIteration:
+        raise _MsrestStopIteration()
+
+class StreamDownloadGenerator(AsyncIterator):
+
+    def __init__(self, response: requests.Response, user_callback: Optional[Callable] = None, block: Optional[int] = None) -> None:
+        self.response = response
+        self.block = block or CONTENT_CHUNK_SIZE
+        self.user_callback = user_callback
+        self.iter_content_func = self.response.iter_content(self.block)
+
+    async def __anext__(self):
+        loop = asyncio.get_event_loop()
+        try:
+            chunk = await loop.run_in_executor(
+                None,
+                _msrest_next,
+                self.iter_content_func,
+            )
+            if not chunk:
+                raise _MsrestStopIteration()
+            if self.user_callback and callable(self.user_callback):
+                self.user_callback(chunk, self.response)
+            return chunk
+        except _MsrestStopIteration:
+            self.response.close()
+            raise StopAsyncIteration()
+        except Exception as err:
+            _LOGGER.warning("Unable to stream download: %s", err)
+            self.response.close()
+            raise
+
+class AsyncRequestsClientResponse(AsyncClientResponse, HTTPRequestsClientResponse):
+
+    def stream_download(self, chunk_size: Optional[int] = None, callback: Optional[Callable] = None) -> AsyncIteratorType[bytes]:
+        """Generator for streaming request body data.
+
+        :param callback: Custom callback for monitoring progress.
+        :param int chunk_size:
+        """
+        return StreamDownloadGenerator(
+            self.internal_response,
+            callback,
+            chunk_size
+        )
+
+
+# Trio support
+try:
+    import trio
+
+    class TrioStreamDownloadGenerator(AsyncIterator):
+
+        def __init__(self, response: requests.Response, user_callback: Optional[Callable] = None, block: Optional[int] = None) -> None:
+            self.response = response
+            self.block = block or CONTENT_CHUNK_SIZE
+            self.user_callback = user_callback
+            self.iter_content_func = self.response.iter_content(self.block)
+
+        async def __anext__(self):
+            try:
+                chunk = await trio.to_thread.run_sync(
+                    _msrest_next,
+                    self.iter_content_func,
+                )
+                if not chunk:
+                    raise _MsrestStopIteration()
+                if self.user_callback and callable(self.user_callback):
+                    self.user_callback(chunk, self.response)
+                return chunk
+            except _MsrestStopIteration:
+                self.response.close()
+                raise StopAsyncIteration()
+            except Exception as err:
+                _LOGGER.warning("Unable to stream download: %s", err)
+                self.response.close()
+                raise
+
+    class TrioAsyncRequestsClientResponse(AsyncClientResponse, HTTPRequestsClientResponse):
+
+        def stream_download(self, chunk_size: Optional[int] = None, callback: Optional[Callable] = None) -> AsyncIteratorType[bytes]:
+            """Generator for streaming request body data.
+
+            :param callback: Custom callback for monitoring progress.
+            :param int chunk_size:
+            """
+            return TrioStreamDownloadGenerator(
+                self.internal_response,
+                callback,
+                chunk_size
+            )
+
+
+    class AsyncTrioBasicRequestsHTTPSender(BasicRequestsHTTPSender, AsyncHTTPSender):  # type: ignore
+
+        async def __aenter__(self):
+            return super(AsyncTrioBasicRequestsHTTPSender, self).__enter__()
+
+        async def __aexit__(self, *exc_details):  # pylint: disable=arguments-differ
+            return super(AsyncTrioBasicRequestsHTTPSender, self).__exit__()
+
+        async def send(self, request: ClientRequest, **kwargs: Any) -> AsyncClientResponse:  # type: ignore
+            """Send the request using this HTTP sender.
+            """
+            # It's not recommended to provide its own session, and is mostly
+            # to enable some legacy code to plug correctly
+            session = kwargs.pop('session', self.session)
+
+            trio_limiter = kwargs.get("trio_limiter", None)
+            future = trio.to_thread.run_sync(
+                functools.partial(
+                    session.request,
+                    request.method,
+                    request.url,
+                    **kwargs
+                ),
+                limiter=trio_limiter
+            )
+            try:
+                return TrioAsyncRequestsClientResponse(
+                    request,
+                    await future
+                )
+            except requests.RequestException as err:
+                msg = "Error occurred in request."
+                raise_with_traceback(ClientRequestError, msg, err)
+
+    class AsyncTrioRequestsHTTPSender(AsyncTrioBasicRequestsHTTPSender, RequestsHTTPSender):  # type: ignore
+
+        async def send(self, request: ClientRequest, **kwargs: Any) -> AsyncClientResponse:  # type: ignore
+            """Send the request using this HTTP sender.
+            """
+            requests_kwargs = self._configure_send(request, **kwargs)
+            return await super(AsyncTrioRequestsHTTPSender, self).send(request, **requests_kwargs)
+
+except ImportError:
+    # trio not installed
+    pass
\ No newline at end of file